diff options
24 files changed, 701 insertions, 538 deletions
diff --git a/BitKeeper/etc/logging_ok b/BitKeeper/etc/logging_ok index c3ca14ab929..c6fbf5d23f5 100644 --- a/BitKeeper/etc/logging_ok +++ b/BitKeeper/etc/logging_ok @@ -100,6 +100,7 @@ miguel@hegel.txg.br miguel@light. miguel@light.local miguel@sartre.local +mikael@mc04.(none) mikron@c-fb0ae253.1238-1-64736c10.cust.bredbandsbolaget.se mikron@mikael-ronstr-ms-dator.local mmatthew@markslaptop. @@ -158,6 +159,7 @@ ram@ram.(none) ranger@regul.home.lan rburnett@build.mysql.com root@home.(none) +root@mc04.(none) root@x3.internalnet salle@banica.(none) salle@geopard.(none) diff --git a/acinclude.m4 b/acinclude.m4 index dff3b22ecec..c73f14b638c 100644 --- a/acinclude.m4 +++ b/acinclude.m4 @@ -1551,16 +1551,43 @@ dnl Sets HAVE_NDBCLUSTER_DB if --with-ndbcluster is used dnl --------------------------------------------------------------------------- AC_DEFUN([MYSQL_CHECK_NDB_OPTIONS], [ + AC_ARG_WITH([ndb-sci], + AC_HELP_STRING([--with-ndb-sci=DIR], + [Provide MySQL with a custom location of + sci library. Given DIR, sci library is + assumed to be in $DIR/lib and header files + in $DIR/include.]), + [mysql_sci_dir=${withval}], + [mysql_sci_dir=""]) + + case "$mysql_sci_dir" in + "no" ) + have_ndb_sci=no + AC_MSG_RESULT([-- not including sci transporter]) + ;; + * ) + if test -f "$mysql_sci_dir/lib/libsisci.a" -a \ + -f "$mysql_sci_dir/include/sisci_api.h"; then + NDB_SCI_INCLUDES="-I$mysql_sci_dir/include" + NDB_SCI_LIBS="-L$mysql_sci_dir/lib -lsisci" + AC_MSG_RESULT([-- including sci transporter]) + AC_DEFINE([NDB_SCI_TRANSPORTER], [1], + [Including Ndb Cluster DB sci transporter]) + AC_SUBST(NDB_SCI_INCLUDES) + AC_SUBST(NDB_SCI_LIBS) + have_ndb_sci="yes" + AC_MSG_RESULT([found sci transporter in $mysql_sci_dir/{include, lib}]) + else + AC_MSG_RESULT([could not find sci transporter in $mysql_sci_dir/{include, lib}]) + fi + ;; + esac + AC_ARG_WITH([ndb-shm], [ --with-ndb-shm Include the NDB Cluster shared memory transporter], [ndb_shm="$withval"], [ndb_shm=no]) - AC_ARG_WITH([ndb-sci], - [ - --with-ndb-sci Include the NDB Cluster sci transporter], - [ndb_sci="$withval"], - [ndb_sci=no]) AC_ARG_WITH([ndb-test], [ --with-ndb-test Include the NDB Cluster ndbapi test programs], @@ -1593,19 +1620,6 @@ AC_DEFUN([MYSQL_CHECK_NDB_OPTIONS], [ ;; esac - have_ndb_sci=no - case "$ndb_sci" in - yes ) - AC_MSG_RESULT([-- including sci transporter]) - AC_DEFINE([NDB_SCI_TRANSPORTER], [1], - [Including Ndb Cluster DB sci transporter]) - have_ndb_sci="yes" - ;; - * ) - AC_MSG_RESULT([-- not including sci transporter]) - ;; - esac - have_ndb_test=no case "$ndb_test" in yes ) diff --git a/configure.in b/configure.in index 9e23b6cf61c..bc05940b018 100644 --- a/configure.in +++ b/configure.in @@ -3024,11 +3024,11 @@ AC_SUBST([ndb_port_base]) ndb_transporter_opt_objs="" if test X"$have_ndb_shm" = Xyes then - ndb_transporter_opt_objs="$(ndb_transporter_opt_objs) SHM_Transporter.lo SHM_Transporter.unix.lo" + ndb_transporter_opt_objs="$ndb_transporter_opt_objs SHM_Transporter.lo SHM_Transporter.unix.lo" fi if test X"$have_ndb_sci" = Xyes then - ndb_transporter_opt_objs="$(ndb_transporter_opt_objs) SCI_Transporter.lo" + ndb_transporter_opt_objs="$ndb_transporter_opt_objs SCI_Transporter.lo" fi AC_SUBST([ndb_transporter_opt_objs]) diff --git a/ndb/config/type_ndbapitest.mk.am b/ndb/config/type_ndbapitest.mk.am index 8ac39aec8cf..f1fd8286337 100644 --- a/ndb/config/type_ndbapitest.mk.am +++ b/ndb/config/type_ndbapitest.mk.am @@ -3,7 +3,7 @@ LDADD += $(top_builddir)/ndb/test/src/libNDBT.a \ $(top_builddir)/ndb/src/libndbclient.la \ $(top_builddir)/dbug/libdbug.a \ $(top_builddir)/mysys/libmysys.a \ - $(top_builddir)/strings/libmystrings.a + $(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@ INCLUDES += -I$(srcdir) -I$(top_srcdir)/include \ -I$(top_srcdir)/ndb/include \ diff --git a/ndb/config/type_ndbapitools.mk.am b/ndb/config/type_ndbapitools.mk.am index 3b5d40874b2..ed6d8699e05 100644 --- a/ndb/config/type_ndbapitools.mk.am +++ b/ndb/config/type_ndbapitools.mk.am @@ -3,7 +3,7 @@ LDADD += \ $(top_builddir)/ndb/src/libndbclient.la \ $(top_builddir)/dbug/libdbug.a \ $(top_builddir)/mysys/libmysys.a \ - $(top_builddir)/strings/libmystrings.a + $(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@ INCLUDES += -I$(srcdir) -I$(top_srcdir)/include \ -I$(top_srcdir)/ndb/include \ diff --git a/ndb/include/mgmapi/mgmapi_config_parameters.h b/ndb/include/mgmapi/mgmapi_config_parameters.h index 4a4863298dd..68eff84dd03 100644 --- a/ndb/include/mgmapi/mgmapi_config_parameters.h +++ b/ndb/include/mgmapi/mgmapi_config_parameters.h @@ -117,16 +117,14 @@ #define CFG_SHM_KEY 502 #define CFG_SHM_BUFFER_MEM 503 -#define CFG_SCI_ID_0 550 -#define CFG_SCI_ID_1 551 -#define CFG_SCI_SEND_LIMIT 552 -#define CFG_SCI_BUFFER_MEM 553 -#define CFG_SCI_NODE1_ADAPTERS 554 -#define CFG_SCI_NODE1_ADAPTER0 555 -#define CFG_SCI_NODE1_ADAPTER1 556 -#define CFG_SCI_NODE2_ADAPTERS 554 -#define CFG_SCI_NODE2_ADAPTER0 555 -#define CFG_SCI_NODE2_ADAPTER1 556 +#define CFG_SCI_HOST1_ID_0 550 +#define CFG_SCI_HOST1_ID_1 551 +#define CFG_SCI_HOST2_ID_0 552 +#define CFG_SCI_HOST2_ID_1 553 +#define CFG_SCI_HOSTNAME_1 554 +#define CFG_SCI_HOSTNAME_2 555 +#define CFG_SCI_SEND_LIMIT 556 +#define CFG_SCI_BUFFER_MEM 557 #define CFG_OSE_HOSTNAME_1 600 #define CFG_OSE_HOSTNAME_2 601 diff --git a/ndb/include/transporter/TransporterDefinitions.hpp b/ndb/include/transporter/TransporterDefinitions.hpp index 445e8b889d2..a8da8068552 100644 --- a/ndb/include/transporter/TransporterDefinitions.hpp +++ b/ndb/include/transporter/TransporterDefinitions.hpp @@ -59,8 +59,6 @@ struct TCP_TransporterConfiguration { NodeId localNodeId; Uint32 sendBufferSize; // Size of SendBuffer of priority B Uint32 maxReceiveSize; // Maximum no of bytes to receive - Uint32 byteOrder; - bool compression; bool checksum; bool signalId; }; @@ -72,10 +70,8 @@ struct SHM_TransporterConfiguration { Uint32 port; NodeId remoteNodeId; NodeId localNodeId; - bool compression; bool checksum; bool signalId; - int byteOrder; Uint32 shmKey; Uint32 shmSize; @@ -89,10 +85,8 @@ struct OSE_TransporterConfiguration { const char *localHostName; NodeId remoteNodeId; NodeId localNodeId; - bool compression; bool checksum; bool signalId; - int byteOrder; Uint32 prioASignalSize; Uint32 prioBSignalSize; @@ -103,20 +97,20 @@ struct OSE_TransporterConfiguration { * SCI Transporter Configuration */ struct SCI_TransporterConfiguration { + const char *remoteHostName; + const char *localHostName; + Uint32 port; Uint32 sendLimit; // Packet size Uint32 bufferSize; // Buffer size Uint32 nLocalAdapters; // 1 or 2, the number of adapters on local host - Uint32 nRemoteAdapters; Uint32 remoteSciNodeId0; // SCInodeId for adapter 1 Uint32 remoteSciNodeId1; // SCInodeId for adapter 2 NodeId localNodeId; // Local node Id NodeId remoteNodeId; // Remote node Id - Uint32 byteOrder; - bool compression; bool checksum; bool signalId; diff --git a/ndb/src/common/mgmcommon/IPCConfig.cpp b/ndb/src/common/mgmcommon/IPCConfig.cpp index a76c541f3f6..83aa3e88b41 100644 --- a/ndb/src/common/mgmcommon/IPCConfig.cpp +++ b/ndb/src/common/mgmcommon/IPCConfig.cpp @@ -133,7 +133,6 @@ IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){ Uint32 compression; Uint32 checksum; if(!tmp->get("SendSignalId", &sendSignalId)) continue; - if(!tmp->get("Compression", &compression)) continue; if(!tmp->get("Checksum", &checksum)) continue; const char * type; @@ -143,8 +142,6 @@ IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){ SHM_TransporterConfiguration conf; conf.localNodeId = the_ownId; conf.remoteNodeId = (nodeId1 != the_ownId ? nodeId1 : nodeId2); - conf.byteOrder = 0; - conf.compression = compression; conf.checksum = checksum; conf.signalId = sendSignalId; @@ -164,8 +161,6 @@ IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){ SCI_TransporterConfiguration conf; conf.localNodeId = the_ownId; conf.remoteNodeId = (nodeId1 != the_ownId ? nodeId1 : nodeId2); - conf.byteOrder = 0; - conf.compression = compression; conf.checksum = checksum; conf.signalId = sendSignalId; @@ -174,18 +169,16 @@ IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){ if(the_ownId == nodeId1){ if(!tmp->get("Node1_NoOfAdapters", &conf.nLocalAdapters)) continue; - if(!tmp->get("Node2_NoOfAdapters", &conf.nRemoteAdapters)) continue; if(!tmp->get("Node2_Adapter", 0, &conf.remoteSciNodeId0)) continue; - if(conf.nRemoteAdapters > 1){ + if(conf.nLocalAdapters > 1){ if(!tmp->get("Node2_Adapter", 1, &conf.remoteSciNodeId1)) continue; } } else { if(!tmp->get("Node2_NoOfAdapters", &conf.nLocalAdapters)) continue; - if(!tmp->get("Node1_NoOfAdapters", &conf.nRemoteAdapters)) continue; if(!tmp->get("Node1_Adapter", 0, &conf.remoteSciNodeId0)) continue; - if(conf.nRemoteAdapters > 1){ + if(conf.nLocalAdapters > 1){ if(!tmp->get("Node1_Adapter", 1, &conf.remoteSciNodeId1)) continue; } } @@ -243,8 +236,6 @@ IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){ conf.localHostName = ownHostName; conf.remoteNodeId = remoteNodeId; conf.localNodeId = ownNodeId; - conf.byteOrder = 0; - conf.compression = compression; conf.checksum = checksum; conf.signalId = sendSignalId; @@ -270,8 +261,6 @@ IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){ conf.localHostName = ownHostName; conf.remoteNodeId = remoteNodeId; conf.localNodeId = ownNodeId; - conf.byteOrder = 0; - conf.compression = compression; conf.checksum = checksum; conf.signalId = sendSignalId; @@ -344,6 +333,7 @@ Uint32 IPCConfig::configureTransporters(Uint32 nodeId, const class ndb_mgm_configuration & config, class TransporterRegistry & tr){ + DBUG_ENTER("IPCConfig::configureTransporters"); Uint32 noOfTransportersCreated= 0, server_port= 0; ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION); @@ -374,14 +364,13 @@ IPCConfig::configureTransporters(Uint32 nodeId, } server_port= tmp_server_port; } - + DBUG_PRINT("info", ("Transporter between this node %d and node %d using port %d, signalId %d, checksum %d", + nodeId, remoteNodeId, tmp_server_port, sendSignalId, checksum)); switch(type){ case CONNECTION_TYPE_SHM:{ SHM_TransporterConfiguration conf; conf.localNodeId = nodeId; conf.remoteNodeId = remoteNodeId; - conf.byteOrder = 0; - conf.compression = 0; conf.checksum = checksum; conf.signalId = sendSignalId; @@ -391,45 +380,60 @@ IPCConfig::configureTransporters(Uint32 nodeId, conf.port= tmp_server_port; if(!tr.createTransporter(&conf)){ + DBUG_PRINT("error", ("Failed to create SCI Transporter from %d to %d", + conf.localNodeId, conf.remoteNodeId)); ndbout << "Failed to create SHM Transporter from: " << conf.localNodeId << " to: " << conf.remoteNodeId << endl; } else { noOfTransportersCreated++; } + DBUG_PRINT("info", ("Created SHM Transporter using shmkey %d, buf size = %d", + conf.shmKey, conf.shmSize)); break; } case CONNECTION_TYPE_SCI:{ SCI_TransporterConfiguration conf; + const char * host1, * host2; conf.localNodeId = nodeId; conf.remoteNodeId = remoteNodeId; - conf.byteOrder = 0; - conf.compression = 0; conf.checksum = checksum; conf.signalId = sendSignalId; + conf.port= tmp_server_port; + if(iter.get(CFG_SCI_HOSTNAME_1, &host1)) break; + if(iter.get(CFG_SCI_HOSTNAME_2, &host2)) break; + + conf.localHostName = (nodeId == nodeId1 ? host1 : host2); + conf.remoteHostName = (nodeId == nodeId1 ? host2 : host1); + if(iter.get(CFG_SCI_SEND_LIMIT, &conf.sendLimit)) break; if(iter.get(CFG_SCI_BUFFER_MEM, &conf.bufferSize)) break; - - if(nodeId == nodeId1){ - if(iter.get(CFG_SCI_NODE1_ADAPTERS, &conf.nLocalAdapters)) break; - if(iter.get(CFG_SCI_NODE2_ADAPTERS, &conf.nRemoteAdapters)) break; - if(iter.get(CFG_SCI_NODE2_ADAPTER0, &conf.remoteSciNodeId0)) break; - if(conf.nRemoteAdapters > 1){ - if(iter.get(CFG_SCI_NODE2_ADAPTER1, &conf.remoteSciNodeId1)) break; - } + if (nodeId == nodeId1) { + if(iter.get(CFG_SCI_HOST2_ID_0, &conf.remoteSciNodeId0)) break; + if(iter.get(CFG_SCI_HOST2_ID_1, &conf.remoteSciNodeId1)) break; } else { - if(iter.get(CFG_SCI_NODE2_ADAPTERS, &conf.nLocalAdapters)) break; - if(iter.get(CFG_SCI_NODE1_ADAPTERS, &conf.nRemoteAdapters)) break; - if(iter.get(CFG_SCI_NODE1_ADAPTER0, &conf.remoteSciNodeId0)) break; - if(conf.nRemoteAdapters > 1){ - if(iter.get(CFG_SCI_NODE1_ADAPTER1, &conf.remoteSciNodeId1)) break; - } + if(iter.get(CFG_SCI_HOST1_ID_0, &conf.remoteSciNodeId0)) break; + if(iter.get(CFG_SCI_HOST1_ID_1, &conf.remoteSciNodeId1)) break; } - - if(!tr.createTransporter(&conf)){ + if (conf.remoteSciNodeId1 == 0) { + conf.nLocalAdapters = 1; + } else { + conf.nLocalAdapters = 2; + } + if(!tr.createTransporter(&conf)){ + DBUG_PRINT("error", ("Failed to create SCI Transporter from %d to %d", + conf.localNodeId, conf.remoteNodeId)); ndbout << "Failed to create SCI Transporter from: " << conf.localNodeId << " to: " << conf.remoteNodeId << endl; } else { + DBUG_PRINT("info", ("Created SCI Transporter: Adapters = %d, remote SCI node id %d", + conf.nLocalAdapters, conf.remoteSciNodeId0)); + DBUG_PRINT("info", ("Host 1 = %s, Host 2 = %s, sendLimit = %d, buf size = %d", + conf.localHostName, conf.remoteHostName, conf.sendLimit, conf.bufferSize)); + if (conf.nLocalAdapters > 1) { + DBUG_PRINT("info", ("Fault-tolerant with 2 Remote Adapters, second remote SCI node id = %d", + conf.remoteSciNodeId1)); + } noOfTransportersCreated++; continue; } @@ -457,8 +461,6 @@ IPCConfig::configureTransporters(Uint32 nodeId, conf.remoteNodeId = remoteNodeId; conf.localHostName = (nodeId == nodeId1 ? host1 : host2); conf.remoteHostName = (nodeId == nodeId1 ? host2 : host1); - conf.byteOrder = 0; - conf.compression = 0; conf.checksum = checksum; conf.signalId = sendSignalId; @@ -468,6 +470,9 @@ IPCConfig::configureTransporters(Uint32 nodeId, } else { noOfTransportersCreated++; } + DBUG_PRINT("info", ("Created TCP Transporter: sendBufferSize = %d, maxReceiveSize = %d", + conf.sendBufferSize, conf.maxReceiveSize)); + break; case CONNECTION_TYPE_OSE:{ OSE_TransporterConfiguration conf; @@ -483,8 +488,6 @@ IPCConfig::configureTransporters(Uint32 nodeId, conf.remoteNodeId = remoteNodeId; conf.localHostName = (nodeId == nodeId1 ? host1 : host2); conf.remoteHostName = (nodeId == nodeId1 ? host2 : host1); - conf.byteOrder = 0; - conf.compression = 0; conf.checksum = checksum; conf.signalId = sendSignalId; @@ -505,6 +508,6 @@ IPCConfig::configureTransporters(Uint32 nodeId, tr.m_service_port= server_port; - return noOfTransportersCreated; + DBUG_RETURN(noOfTransportersCreated); } diff --git a/ndb/src/common/transporter/Makefile.am b/ndb/src/common/transporter/Makefile.am index 218b261606d..9d91a210d46 100644 --- a/ndb/src/common/transporter/Makefile.am +++ b/ndb/src/common/transporter/Makefile.am @@ -13,7 +13,7 @@ EXTRA_libtransporter_la_SOURCES = SHM_Transporter.cpp SHM_Transporter.unix.cpp S libtransporter_la_LIBADD = @ndb_transporter_opt_objs@ libtransporter_la_DEPENDENCIES = @ndb_transporter_opt_objs@ -INCLUDES_LOC = -I$(top_srcdir)/ndb/include/kernel -I$(top_srcdir)/ndb/include/transporter +INCLUDES_LOC = -I$(top_srcdir)/ndb/include/kernel -I$(top_srcdir)/ndb/include/transporter @NDB_SCI_INCLUDES@ include $(top_srcdir)/ndb/config/common.mk.am include $(top_srcdir)/ndb/config/type_util.mk.am diff --git a/ndb/src/common/transporter/SCI_Transporter.cpp b/ndb/src/common/transporter/SCI_Transporter.cpp index c52c8a9d8c0..465d7827069 100644 --- a/ndb/src/common/transporter/SCI_Transporter.cpp +++ b/ndb/src/common/transporter/SCI_Transporter.cpp @@ -24,23 +24,30 @@ #include "TransporterInternalDefinitions.hpp" #include <TransporterCallback.hpp> - + +#include <InputStream.hpp> +#include <OutputStream.hpp> + #define FLAGS 0 - -SCI_Transporter::SCI_Transporter(Uint32 packetSize, +#define DEBUG_TRANSPORTER +SCI_Transporter::SCI_Transporter(TransporterRegistry &t_reg, + const char *lHostName, + const char *rHostName, + int r_port, + Uint32 packetSize, Uint32 bufferSize, Uint32 nAdapters, Uint16 remoteSciNodeId0, Uint16 remoteSciNodeId1, NodeId _localNodeId, NodeId _remoteNodeId, - int byte_order, - bool compr, bool chksm, bool signalId, Uint32 reportFreq) : - Transporter(_localNodeId, _remoteNodeId, byte_order, compr, chksm, signalId) -{ + Transporter(t_reg, lHostName, rHostName, r_port, _localNodeId, + _remoteNodeId, 0, false, chksm, signalId) +{ + DBUG_ENTER("SCI_Transporter::SCI_Transporter"); m_PacketSize = (packetSize + 3)/4 ; m_BufferSize = bufferSize; m_sendBuffer.m_buffer = NULL; @@ -56,10 +63,6 @@ SCI_Transporter::SCI_Transporter(Uint32 packetSize, m_initLocal=false; - m_remoteNodes= new Uint16[m_numberOfRemoteNodes]; - if(m_remoteNodes == NULL) { - //DO WHAT?? - } m_swapCounter=0; m_failCounter=0; m_remoteNodes[0]=remoteSciNodeId0; @@ -94,20 +97,19 @@ SCI_Transporter::SCI_Transporter(Uint32 packetSize, i4096=0; i4097=0; #endif - + DBUG_VOID_RETURN; } void SCI_Transporter::disconnectImpl() { + DBUG_ENTER("SCI_Transporter::disconnectImpl"); sci_error_t err; if(m_mapped){ setDisconnect(); -#ifdef DEBUG_TRANSPORTER - ndbout << "DisconnectImpl " << getConnectionStatus() << endl; - ndbout << "remote node " << remoteNodeId << endl; -#endif + DBUG_PRINT("info", ("connect status = %d, remote node = %d", + (int)getConnectionStatus(), remoteNodeId)); disconnectRemote(); disconnectLocal(); } @@ -124,65 +126,56 @@ void SCI_Transporter::disconnectImpl() SCIClose(sciAdapters[i].scidesc, FLAGS, &err); if(err != SCI_ERR_OK) { - reportError(callbackObj, localNodeId, TE_SCI_UNABLE_TO_CLOSE_CHANNEL); -#ifdef DEBUG_TRANSPORTER - fprintf(stderr, - "\nCannot close channel to the driver. Error code 0x%x", - err); -#endif - } + report_error(TE_SCI_UNABLE_TO_CLOSE_CHANNEL); + DBUG_PRINT("error", ("Cannot close channel to the driver. Error code 0x%x", + err)); + } } } m_sciinit=false; #ifdef DEBUG_TRANSPORTER - ndbout << "total: " << i1024+ i10242048 + i2048+i2049 << endl; + ndbout << "total: " << i1024+ i10242048 + i2048+i2049 << endl; ndbout << "<1024: " << i1024 << endl; ndbout << "1024-2047: " << i10242048 << endl; ndbout << "==2048: " << i2048 << endl; ndbout << "2049-4096: " << i20484096 << endl; ndbout << "==4096: " << i4096 << endl; ndbout << ">4096: " << i4097 << endl; - #endif - + DBUG_VOID_RETURN; } bool SCI_Transporter::initTransporter() { - if(m_BufferSize < (2*MAX_MESSAGE_SIZE)){ - m_BufferSize = 2 * MAX_MESSAGE_SIZE; + DBUG_ENTER("SCI_Transporter::initTransporter"); + if(m_BufferSize < (2*MAX_MESSAGE_SIZE + 4096)){ + m_BufferSize = 2 * MAX_MESSAGE_SIZE + 4096; } - // Allocate buffers for sending - Uint32 sz = 0; - if(m_BufferSize < (m_PacketSize * 4)){ - sz = m_BufferSize + MAX_MESSAGE_SIZE; - } else { - /** - * 3 packages - */ - sz = (m_PacketSize * 4) * 3 + MAX_MESSAGE_SIZE; - } + // Allocate buffers for sending, send buffer size plus 2048 bytes for avoiding + // the need to send twice when a large message comes around. Send buffer size is + // measured in words. + Uint32 sz = 4 * m_PacketSize + MAX_MESSAGE_SIZE;; - m_sendBuffer.m_bufferSize = 4 * ((sz + 3) / 4); - m_sendBuffer.m_buffer = new Uint32[m_sendBuffer.m_bufferSize / 4]; + m_sendBuffer.m_sendBufferSize = 4 * ((sz + 3) / 4); + m_sendBuffer.m_buffer = new Uint32[m_sendBuffer.m_sendBufferSize / 4]; m_sendBuffer.m_dataSize = 0; - + + DBUG_PRINT("info", ("Created SCI Send Buffer with buffer size %d and packet size %d", + m_sendBuffer.m_sendBufferSize, m_PacketSize * 4)); if(!getLinkStatus(m_ActiveAdapterId) || - !getLinkStatus(m_StandbyAdapterId)) { -#ifdef DEBUG_TRANSPORTER - ndbout << "The link is not fully operational. " << endl; - ndbout << "Check the cables and the switches" << endl; -#endif + (m_adapters > 1 && + !getLinkStatus(m_StandbyAdapterId))) { + DBUG_PRINT("error", ("The link is not fully operational. Check the cables and the switches")); //reportDisconnect(remoteNodeId, 0); //doDisconnect(); //NDB should terminate - reportError(callbackObj, localNodeId, TE_SCI_LINK_ERROR); - return false; + report_error(TE_SCI_LINK_ERROR); + DBUG_RETURN(false); } - return true; + DBUG_RETURN(true); } // initTransporter() @@ -218,10 +211,8 @@ bool SCI_Transporter::getLinkStatus(Uint32 adapterNo) SCIQuery(SCI_Q_ADAPTER,(void*)(&queryAdapter),(Uint32)NULL,&error); if(error != SCI_ERR_OK) { -#ifdef DEBUG_TRANSPORTER - ndbout << "error querying adapter " << endl; -#endif - return false; + DBUG_PRINT("error", ("error %d querying adapter", error)); + return false; } if(linkstatus<=0) return false; @@ -231,6 +222,7 @@ bool SCI_Transporter::getLinkStatus(Uint32 adapterNo) sci_error_t SCI_Transporter::initLocalSegment() { + DBUG_ENTER("SCI_Transporter::initLocalSegment"); Uint32 segmentSize = m_BufferSize; Uint32 offset = 0; sci_error_t err; @@ -238,16 +230,12 @@ sci_error_t SCI_Transporter::initLocalSegment() { for(Uint32 i=0; i<m_adapters ; i++) { SCIOpen(&(sciAdapters[i].scidesc), FLAGS, &err); sciAdapters[i].localSciNodeId=getLocalNodeId(i); -#ifdef DEBUG_TRANSPORTER - ndbout_c("SCInode iD %d adapter %d\n", - sciAdapters[i].localSciNodeId, i); -#endif + DBUG_PRINT("info", ("SCInode iD %d adapter %d\n", + sciAdapters[i].localSciNodeId, i)); if(err != SCI_ERR_OK) { -#ifdef DEBUG_TRANSPORTER - ndbout_c("\nCannot open an SCI virtual device. Error code 0x%x", - err); -#endif - return err; + DBUG_PRINT("error", ("Cannot open an SCI virtual device. Error code 0x%x", + err)); + DBUG_RETURN(err); } } } @@ -264,12 +252,11 @@ sci_error_t SCI_Transporter::initLocalSegment() { &err); if(err != SCI_ERR_OK) { - return err; + DBUG_PRINT("error", ("Error creating segment, err = 0x%x", err)); + DBUG_RETURN(err); } else { -#ifdef DEBUG_TRANSPORTER - ndbout << "created segment id : " - << hostSegmentId(localNodeId, remoteNodeId) << endl; -#endif + DBUG_PRINT("info", ("created segment id : %d", + hostSegmentId(localNodeId, remoteNodeId))); } /** Prepare the segment*/ @@ -280,11 +267,9 @@ sci_error_t SCI_Transporter::initLocalSegment() { &err); if(err != SCI_ERR_OK) { -#ifdef DEBUG_TRANSPORTER - ndbout_c("Local Segment is not accessible by an SCI adapter."); - ndbout_c("Error code 0x%x\n", err); -#endif - return err; + DBUG_PRINT("error", ("Local Segment is not accessible by an SCI adapter. Error code 0x%x\n", + err)); + DBUG_RETURN(err); } } @@ -301,14 +286,10 @@ sci_error_t SCI_Transporter::initLocalSegment() { if(err != SCI_ERR_OK) { - -#ifdef DEBUG_TRANSPORTER - fprintf(stderr, "\nCannot map area of size %d. Error code 0x%x", - segmentSize,err); - ndbout << "initLocalSegment does a disConnect" << endl; -#endif + DBUG_PRINT("error", ("Cannot map area of size %d. Error code 0x%x", + segmentSize,err)); doDisconnect(); - return err; + DBUG_RETURN(err); } @@ -320,18 +301,16 @@ sci_error_t SCI_Transporter::initLocalSegment() { &err); if(err != SCI_ERR_OK) { -#ifdef DEBUG_TRANSPORTER - ndbout_c("\nLocal Segment is not available for remote connections."); - ndbout_c("Error code 0x%x\n", err); -#endif - return err; + DBUG_PRINT("error", ("Local Segment is not available for remote connections. Error code 0x%x\n", + err)); + DBUG_RETURN(err); } } setupLocalSegment(); - return err; + DBUG_RETURN(err); } // initLocalSegment() @@ -345,7 +324,7 @@ bool SCI_Transporter::doSend() { Uint32 retry=0; const char * const sendPtr = (char*)m_sendBuffer.m_buffer; - const Uint32 sizeToSend = m_sendBuffer.m_dataSize; + const Uint32 sizeToSend = 4 * m_sendBuffer.m_dataSize; //Convert to number of bytes if (sizeToSend > 0){ #ifdef DEBUG_TRANSPORTER @@ -363,15 +342,19 @@ bool SCI_Transporter::doSend() { i4097++; #endif if(startSequence(m_ActiveAdapterId)!=SCI_ERR_OK) { -#ifdef DEBUG_TRANSPORTER - ndbout << "Start sequence failed" << endl; -#endif - reportError(callbackObj, remoteNodeId, TE_SCI_UNABLE_TO_START_SEQUENCE); + DBUG_PRINT("error", ("Start sequence failed")); + report_error(TE_SCI_UNABLE_TO_START_SEQUENCE); return false; } - tryagain: + tryagain: + retry++; + if (retry > 3) { + DBUG_PRINT("error", ("SCI Transfer failed")); + report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR); + return false; + } Uint32 * insertPtr = (Uint32 *) (m_TargetSegm[m_ActiveAdapterId].writer)->getWritePtr(sizeToSend); @@ -390,44 +373,37 @@ bool SCI_Transporter::doSend() { &err); + if (err != SCI_ERR_OK) { if(err == SCI_ERR_OUT_OF_RANGE) { -#ifdef DEBUG_TRANSPORTER - ndbout << "Data transfer : out of range error \n" << endl; -#endif + DBUG_PRINT("error", ("Data transfer : out of range error")); goto tryagain; } if(err == SCI_ERR_SIZE_ALIGNMENT) { -#ifdef DEBUG_TRANSPORTER - ndbout << "Data transfer : aligne\n" << endl; -#endif + DBUG_PRINT("error", ("Data transfer : alignment error")); + DBUG_PRINT("info", ("sendPtr 0x%x, sizeToSend = %d", sendPtr, sizeToSend)); goto tryagain; } if(err == SCI_ERR_OFFSET_ALIGNMENT) { -#ifdef DEBUG_TRANSPORTER - ndbout << "Data transfer : offset alignment\n" << endl; -#endif + DBUG_PRINT("error", ("Data transfer : offset alignment")); goto tryagain; - } + } if(err == SCI_ERR_TRANSFER_FAILED) { //(m_TargetSegm[m_StandbyAdapterId].writer)->heavyLock(); if(getLinkStatus(m_ActiveAdapterId)) { - retry++; - if(retry>3) { - reportError(callbackObj, - remoteNodeId, TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR); - return false; - } goto tryagain; } + if (m_adapters == 1) { + DBUG_PRINT("error", ("SCI Transfer failed")); + report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR); + return false; + } m_failCounter++; Uint32 temp=m_ActiveAdapterId; switch(m_swapCounter) { case 0: /**swap from active (0) to standby (1)*/ if(getLinkStatus(m_StandbyAdapterId)) { -#ifdef DEBUG_TRANSPORTER - ndbout << "Swapping from 0 to 1 " << endl; -#endif + DBUG_PRINT("error", ("Swapping from adapter 0 to 1")); failoverShmWriter(); SCIStoreBarrier(m_TargetSegm[m_StandbyAdapterId].sequence,0); m_ActiveAdapterId=m_StandbyAdapterId; @@ -436,26 +412,21 @@ bool SCI_Transporter::doSend() { FLAGS, &err); if(err!=SCI_ERR_OK) { - reportError(callbackObj, - remoteNodeId, TE_SCI_UNABLE_TO_REMOVE_SEQUENCE); + report_error(TE_SCI_UNABLE_TO_REMOVE_SEQUENCE); + DBUG_PRINT("error", ("Unable to remove sequence")); return false; } if(startSequence(m_ActiveAdapterId)!=SCI_ERR_OK) { -#ifdef DEBUG_TRANSPORTER - ndbout << "Start sequence failed" << endl; -#endif - reportError(callbackObj, - remoteNodeId, TE_SCI_UNABLE_TO_START_SEQUENCE); + DBUG_PRINT("error", ("Start sequence failed")); + report_error(TE_SCI_UNABLE_TO_START_SEQUENCE); return false; } m_swapCounter++; -#ifdef DEBUG_TRANSPORTER - ndbout << "failover complete.." << endl; -#endif + DBUG_PRINT("info", ("failover complete")); goto tryagain; } else { - reportError(callbackObj, - remoteNodeId, TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR); + report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR); + DBUG_PRINT("error", ("SCI Transfer failed")); return false; } return false; @@ -468,20 +439,15 @@ bool SCI_Transporter::doSend() { failoverShmWriter(); m_ActiveAdapterId=m_StandbyAdapterId; m_StandbyAdapterId=temp; -#ifdef DEBUG_TRANSPORTER - ndbout << "Swapping from 1 to 0 " << endl; -#endif + DBUG_PRINT("info", ("Swapping from 1 to 0")); if(createSequence(m_ActiveAdapterId)!=SCI_ERR_OK) { - reportError(callbackObj, - remoteNodeId, TE_SCI_UNABLE_TO_CREATE_SEQUENCE); + DBUG_PRINT("error", ("Unable to create sequence")); + report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE); return false; } if(startSequence(m_ActiveAdapterId)!=SCI_ERR_OK) { -#ifdef DEBUG_TRANSPORTER - ndbout << "startSequence failed... disconnecting" << endl; -#endif - reportError(callbackObj, - remoteNodeId, TE_SCI_UNABLE_TO_START_SEQUENCE); + DBUG_PRINT("error", ("startSequence failed... disconnecting")); + report_error(TE_SCI_UNABLE_TO_START_SEQUENCE); return false; } @@ -489,37 +455,36 @@ bool SCI_Transporter::doSend() { , FLAGS, &err); if(err!=SCI_ERR_OK) { - reportError(callbackObj, - remoteNodeId, TE_SCI_UNABLE_TO_REMOVE_SEQUENCE); + DBUG_PRINT("error", ("Unable to remove sequence")); + report_error(TE_SCI_UNABLE_TO_REMOVE_SEQUENCE); return false; } if(createSequence(m_StandbyAdapterId)!=SCI_ERR_OK) { - reportError(callbackObj, - remoteNodeId, TE_SCI_UNABLE_TO_CREATE_SEQUENCE); + DBUG_PRINT("error", ("Unable to create sequence on standby")); + report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE); return false; } m_swapCounter=0; -#ifdef DEBUG_TRANSPORTER - ndbout << "failover complete.." << endl; -#endif + DBUG_PRINT("info", ("failover complete..")); goto tryagain; } else { - reportError(callbackObj, - remoteNodeId, TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR); + DBUG_PRINT("error", ("Unrecoverable data transfer error")); + report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR); return false; } break; default: - reportError(callbackObj, - remoteNodeId, TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR); + DBUG_PRINT("error", ("Unrecoverable data transfer error")); + report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR); return false; break; } + } } else { SHM_Writer * writer = (m_TargetSegm[m_ActiveAdapterId].writer); writer->updateWritePtr(sizeToSend); @@ -535,13 +500,10 @@ bool SCI_Transporter::doSend() { /** * If we end up here, the SCI segment is full. */ -#ifdef DEBUG_TRANSPORTER - ndbout << "the segment is full for some reason" << endl; -#endif + DBUG_PRINT("error", ("the segment is full for some reason")); return false; } //if } - return true; } // doSend() @@ -557,11 +519,8 @@ void SCI_Transporter::failoverShmWriter() { void SCI_Transporter::setupLocalSegment() { - + DBUG_ENTER("SCI_Transporter::setupLocalSegment"); Uint32 sharedSize = 0; - sharedSize += 16; //SHM_Reader::getSharedSize(); - sharedSize += 16; //SHM_Writer::getSharedSize(); - sharedSize += 32; //SHM_Writer::getSharedSize(); sharedSize =4096; //start of the buffer is page aligend Uint32 sizeOfBuffer = m_BufferSize; @@ -570,207 +529,265 @@ void SCI_Transporter::setupLocalSegment() Uint32 * localReadIndex = (Uint32*)m_SourceSegm[m_ActiveAdapterId].mappedMemory; - Uint32 * localWriteIndex = - (Uint32*)(localReadIndex+ 1); - - Uint32 * localEndOfDataIndex = (Uint32*) - (localReadIndex + 2); - + Uint32 * localWriteIndex = (Uint32*)(localReadIndex+ 1); + Uint32 * localEndWriteIndex = (Uint32*)(localReadIndex + 2); m_localStatusFlag = (Uint32*)(localReadIndex + 3); - Uint32 * sharedLockIndex = (Uint32*) - (localReadIndex + 4); - - Uint32 * sharedHeavyLock = (Uint32*) - (localReadIndex + 5); - char * localStartOfBuf = (char*) ((char*)m_SourceSegm[m_ActiveAdapterId].mappedMemory+sharedSize); - - * localReadIndex = * localWriteIndex = 0; - * localEndOfDataIndex = sizeOfBuffer -1; - + * localReadIndex = 0; + * localWriteIndex = 0; + * localEndWriteIndex = 0; + const Uint32 slack = MAX_MESSAGE_SIZE; reader = new SHM_Reader(localStartOfBuf, sizeOfBuffer, slack, localReadIndex, + localEndWriteIndex, localWriteIndex); - * localReadIndex = 0; - * localWriteIndex = 0; - reader->clear(); + DBUG_VOID_RETURN; } //setupLocalSegment void SCI_Transporter::setupRemoteSegment() { + DBUG_ENTER("SCI_Transporter::setupRemoteSegment"); Uint32 sharedSize = 0; - sharedSize += 16; //SHM_Reader::getSharedSize(); - sharedSize += 16; //SHM_Writer::getSharedSize(); - sharedSize += 32; - sharedSize =4096; //start of the buffer is page aligend + sharedSize =4096; //start of the buffer is page aligned Uint32 sizeOfBuffer = m_BufferSize; + const Uint32 slack = MAX_MESSAGE_SIZE; sizeOfBuffer -= sharedSize; - Uint32 * segPtr = (Uint32*) m_TargetSegm[m_StandbyAdapterId].mappedMemory ; - - Uint32 * remoteReadIndex2 = (Uint32*)segPtr; - Uint32 * remoteWriteIndex2 = (Uint32*) (segPtr + 1); - Uint32 * remoteEndOfDataIndex2 = (Uint32*) (segPtr + 2); - Uint32 * sharedLockIndex2 = (Uint32*) (segPtr + 3); - m_remoteStatusFlag2 = (Uint32*)(segPtr + 4); - Uint32 * sharedHeavyLock2 = (Uint32*) (segPtr + 5); - - - char * remoteStartOfBuf2 = ( char*)((char *)segPtr+sharedSize); - - segPtr = (Uint32*) m_TargetSegm[m_ActiveAdapterId].mappedMemory ; + + Uint32 *segPtr = (Uint32*) m_TargetSegm[m_ActiveAdapterId].mappedMemory ; Uint32 * remoteReadIndex = (Uint32*)segPtr; - Uint32 * remoteWriteIndex = (Uint32*) (segPtr + 1); - Uint32 * remoteEndOfDataIndex = (Uint32*) (segPtr + 2); - Uint32 * sharedLockIndex = (Uint32*) (segPtr + 3); - m_remoteStatusFlag = (Uint32*)(segPtr + 4); - Uint32 * sharedHeavyLock = (Uint32*) (segPtr + 5); + Uint32 * remoteWriteIndex = (Uint32*)(segPtr + 1); + Uint32 * remoteEndWriteIndex = (Uint32*) (segPtr + 2); + m_remoteStatusFlag = (Uint32*)(segPtr + 3); char * remoteStartOfBuf = ( char*)((char*)segPtr+(sharedSize)); - * remoteReadIndex = * remoteWriteIndex = 0; - * remoteReadIndex2 = * remoteWriteIndex2 = 0; - * remoteEndOfDataIndex = sizeOfBuffer - 1; - * remoteEndOfDataIndex2 = sizeOfBuffer - 1; - - /** - * setup two writers. writer2 is used to mirror the changes of - * writer on the standby - * segment, so that in the case of a failover, we can switch - * to the stdby seg. quickly.* - */ - const Uint32 slack = MAX_MESSAGE_SIZE; - writer = new SHM_Writer(remoteStartOfBuf, sizeOfBuffer, slack, remoteReadIndex, + remoteEndWriteIndex, remoteWriteIndex); - writer2 = new SHM_Writer(remoteStartOfBuf2, - sizeOfBuffer, - slack, - remoteReadIndex2, - remoteWriteIndex2); - - * remoteReadIndex = 0; - * remoteWriteIndex = 0; - writer->clear(); - writer2->clear(); m_TargetSegm[0].writer=writer; - m_TargetSegm[1].writer=writer2; m_sendBuffer.m_forceSendLimit = writer->getBufferSize(); if(createSequence(m_ActiveAdapterId)!=SCI_ERR_OK) { - reportThreadError(remoteNodeId, TE_SCI_UNABLE_TO_CREATE_SEQUENCE); + report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE); + DBUG_PRINT("error", ("Unable to create sequence on active")); doDisconnect(); } - if(createSequence(m_StandbyAdapterId)!=SCI_ERR_OK) { - reportThreadError(remoteNodeId, TE_SCI_UNABLE_TO_CREATE_SEQUENCE); - doDisconnect(); - } - - + if (m_adapters > 1) { + segPtr = (Uint32*) m_TargetSegm[m_StandbyAdapterId].mappedMemory ; + + Uint32 * remoteReadIndex2 = (Uint32*)segPtr; + Uint32 * remoteWriteIndex2 = (Uint32*) (segPtr + 1); + Uint32 * remoteEndWriteIndex2 = (Uint32*) (segPtr + 2); + m_remoteStatusFlag2 = (Uint32*)(segPtr + 3); + + char * remoteStartOfBuf2 = ( char*)((char *)segPtr+sharedSize); + + /** + * setup a writer. writer2 is used to mirror the changes of + * writer on the standby + * segment, so that in the case of a failover, we can switch + * to the stdby seg. quickly.* + */ + writer2 = new SHM_Writer(remoteStartOfBuf2, + sizeOfBuffer, + slack, + remoteReadIndex2, + remoteEndWriteIndex2, + remoteWriteIndex2); + + * remoteReadIndex = 0; + * remoteWriteIndex = 0; + * remoteEndWriteIndex = 0; + writer2->clear(); + m_TargetSegm[1].writer=writer2; + if(createSequence(m_StandbyAdapterId)!=SCI_ERR_OK) { + report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE); + DBUG_PRINT("error", ("Unable to create sequence on standby")); + doDisconnect(); + } + } + DBUG_VOID_RETURN; } //setupRemoteSegment - - -bool SCI_Transporter::connectImpl(Uint32 timeout) { - - sci_error_t err; - Uint32 offset = 0; - + +bool +SCI_Transporter::init_local() +{ + DBUG_ENTER("SCI_Transporter::init_local"); if(!m_initLocal) { if(initLocalSegment()!=SCI_ERR_OK){ - NdbSleep_MilliSleep(timeout); + NdbSleep_MilliSleep(10); //NDB SHOULD TERMINATE AND COMPUTER REBOOTED! - reportThreadError(localNodeId, TE_SCI_CANNOT_INIT_LOCALSEGMENT); - return false; + report_error(TE_SCI_CANNOT_INIT_LOCALSEGMENT); + DBUG_RETURN(false); } - m_initLocal=true; + m_initLocal=true; } - - if(!m_mapped ) { - - for(Uint32 i=0; i < m_adapters ; i++) { - m_TargetSegm[i].rhm[i].remoteHandle=0; - SCIConnectSegment(sciAdapters[i].scidesc, - &(m_TargetSegm[i].rhm[i].remoteHandle), - m_remoteNodes[i], - remoteSegmentId(localNodeId, remoteNodeId), - i, - 0, - 0, - 0, - 0, - &err); - - if(err != SCI_ERR_OK) { - NdbSleep_MilliSleep(timeout); - return false; - } - - } - - + DBUG_RETURN(true); +} + +bool +SCI_Transporter::init_remote() +{ + DBUG_ENTER("SCI_Transporter::init_remote"); + sci_error_t err; + Uint32 offset = 0; + if(!m_mapped ) { + DBUG_PRINT("info", ("Map remote segments")); + for(Uint32 i=0; i < m_adapters ; i++) { + m_TargetSegm[i].rhm[i].remoteHandle=0; + SCIConnectSegment(sciAdapters[i].scidesc, + &(m_TargetSegm[i].rhm[i].remoteHandle), + m_remoteNodes[i], + remoteSegmentId(localNodeId, remoteNodeId), + i, + 0, + 0, + 0, + 0, + &err); + + if(err != SCI_ERR_OK) { + NdbSleep_MilliSleep(10); + DBUG_PRINT("error", ("Error connecting segment, err 0x%x", err)); + DBUG_RETURN(false); + } + + } // Map the remote memory segment into program space - for(Uint32 i=0; i < m_adapters ; i++) { - m_TargetSegm[i].mappedMemory = - SCIMapRemoteSegment((m_TargetSegm[i].rhm[i].remoteHandle), - &(m_TargetSegm[i].rhm[i].map), - offset, - m_BufferSize, - NULL, - FLAGS, - &err); - - - if(err!= SCI_ERR_OK) { -#ifdef DEBUG_TRANSPORTER - ndbout_c("\nCannot map a segment to the remote node %d."); - ndbout_c("Error code 0x%x",m_RemoteSciNodeId, err); -#endif - //NDB SHOULD TERMINATE AND COMPUTER REBOOTED! - reportThreadError(remoteNodeId, TE_SCI_CANNOT_MAP_REMOTESEGMENT); - return false; - } - - - } - m_mapped=true; - setupRemoteSegment(); - setConnected(); -#ifdef DEBUG_TRANSPORTER - ndbout << "connected and mapped to segment : " << endl; - ndbout << "remoteNode: " << m_remoteNodes[0] << endl; - ndbout << "remoteNode: " << m_remotenodes[1] << endl; - ndbout << "remoteSegId: " - << remoteSegmentId(localNodeId, remoteNodeId) - << endl; -#endif - return true; - } - else { - return getConnectionStatus(); - } -} // connectImpl() - + for(Uint32 i=0; i < m_adapters ; i++) { + m_TargetSegm[i].mappedMemory = + SCIMapRemoteSegment((m_TargetSegm[i].rhm[i].remoteHandle), + &(m_TargetSegm[i].rhm[i].map), + offset, + m_BufferSize, + NULL, + FLAGS, + &err); + + if(err!= SCI_ERR_OK) { + DBUG_PRINT("error", ("Cannot map a segment to the remote node %d. Error code 0x%x",m_RemoteSciNodeId, err)); + //NDB SHOULD TERMINATE AND COMPUTER REBOOTED! + report_error(TE_SCI_CANNOT_MAP_REMOTESEGMENT); + DBUG_RETURN(false); + } + } + m_mapped=true; + setupRemoteSegment(); + setConnected(); + DBUG_PRINT("info", ("connected and mapped to segment, remoteNode: %d", + remoteNodeId)); + DBUG_PRINT("info", ("remoteSegId: %d", + remoteSegmentId(localNodeId, remoteNodeId))); + DBUG_RETURN(true); + } else { + DBUG_RETURN(getConnectionStatus()); + } +} + +bool +SCI_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd) +{ + SocketInputStream s_input(sockfd); + SocketOutputStream s_output(sockfd); + char buf[256]; + DBUG_ENTER("SCI_Transporter::connect_client_impl"); + // Wait for server to create and attach + if (s_input.gets(buf, 256) == 0) { + DBUG_PRINT("error", ("No initial response from server in SCI")); + NDB_CLOSE_SOCKET(sockfd); + DBUG_RETURN(false); + } + + if (!init_local()) { + NDB_CLOSE_SOCKET(sockfd); + DBUG_RETURN(false); + } + + // Send ok to server + s_output.println("sci client 1 ok"); + + if (!init_remote()) { + NDB_CLOSE_SOCKET(sockfd); + DBUG_RETURN(false); + } + // Wait for ok from server + if (s_input.gets(buf, 256) == 0) { + DBUG_PRINT("error", ("No second response from server in SCI")); + NDB_CLOSE_SOCKET(sockfd); + DBUG_RETURN(false); + } + // Send ok to server + s_output.println("sci client 2 ok"); + + NDB_CLOSE_SOCKET(sockfd); + DBUG_PRINT("info", ("Successfully connected client to node %d", + remoteNodeId)); + DBUG_RETURN(true); +} + +bool +SCI_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd) +{ + SocketOutputStream s_output(sockfd); + SocketInputStream s_input(sockfd); + char buf[256]; + DBUG_ENTER("SCI_Transporter::connect_server_impl"); + + if (!init_local()) { + NDB_CLOSE_SOCKET(sockfd); + DBUG_RETURN(false); + } + // Send ok to client + s_output.println("sci server 1 ok"); + + // Wait for ok from client + if (s_input.gets(buf, 256) == 0) { + DBUG_PRINT("error", ("No response from client in SCI")); + NDB_CLOSE_SOCKET(sockfd); + DBUG_RETURN(false); + } + + if (!init_remote()) { + NDB_CLOSE_SOCKET(sockfd); + DBUG_RETURN(false); + } + // Send ok to client + s_output.println("sci server 2 ok"); + // Wait for ok from client + if (s_input.gets(buf, 256) == 0) { + DBUG_PRINT("error", ("No second response from client in SCI")); + NDB_CLOSE_SOCKET(sockfd); + DBUG_RETURN(false); + } + + NDB_CLOSE_SOCKET(sockfd); + DBUG_PRINT("info", ("Successfully connected server to node %d", + remoteNodeId)); + DBUG_RETURN(true); +} + sci_error_t SCI_Transporter::createSequence(Uint32 adapterid) { sci_error_t err; SCICreateMapSequence((m_TargetSegm[adapterid].rhm[adapterid].map), @@ -795,13 +812,14 @@ sci_error_t SCI_Transporter::startSequence(Uint32 adapterid) { // If there still is an error then data cannot be safely send - return err; + return err; } // startSequence() bool SCI_Transporter::disconnectLocal() -{ +{ + DBUG_ENTER("SCI_Transporter::disconnectLocal"); sci_error_t err; m_ActiveAdapterId=0; @@ -809,31 +827,28 @@ bool SCI_Transporter::disconnectLocal() */ SCIUnmapSegment(m_SourceSegm[0].lhm[0].map,0,&err); - if(err!=SCI_ERR_OK) { - reportError(callbackObj, - remoteNodeId, TE_SCI_UNABLE_TO_UNMAP_SEGMENT); - return false; - } + if(err!=SCI_ERR_OK) { + report_error(TE_SCI_UNABLE_TO_UNMAP_SEGMENT); + DBUG_PRINT("error", ("Unable to unmap segment")); + DBUG_RETURN(false); + } SCIRemoveSegment((m_SourceSegm[m_ActiveAdapterId].localHandle), FLAGS, &err); if(err!=SCI_ERR_OK) { - reportError(callbackObj, remoteNodeId, TE_SCI_UNABLE_TO_REMOVE_SEGMENT); - return false; + report_error(TE_SCI_UNABLE_TO_REMOVE_SEGMENT); + DBUG_PRINT("error", ("Unable to remove segment")); + DBUG_RETURN(false); } - - if(err == SCI_ERR_OK) { -#ifdef DEBUG_TRANSPORTER - printf("Local memory segment is unmapped and removed\n" ); -#endif - } - return true; + DBUG_PRINT("info", ("Local memory segment is unmapped and removed")); + DBUG_RETURN(true); } // disconnectLocal() bool SCI_Transporter::disconnectRemote() { + DBUG_ENTER("SCI_Transporter::disconnectRemote"); sci_error_t err; for(Uint32 i=0; i<m_adapters; i++) { /** @@ -841,35 +856,32 @@ bool SCI_Transporter::disconnectRemote() { */ SCIUnmapSegment(m_TargetSegm[i].rhm[i].map,0,&err); if(err!=SCI_ERR_OK) { - reportError(callbackObj, - remoteNodeId, TE_SCI_UNABLE_TO_DISCONNECT_SEGMENT); - return false; - } + report_error(TE_SCI_UNABLE_TO_UNMAP_SEGMENT); + DBUG_PRINT("error", ("Unable to unmap segment")); + DBUG_RETURN(false); + } SCIDisconnectSegment(m_TargetSegm[i].rhm[i].remoteHandle, FLAGS, &err); if(err!=SCI_ERR_OK) { - reportError(callbackObj, - remoteNodeId, TE_SCI_UNABLE_TO_DISCONNECT_SEGMENT); - return false; + report_error(TE_SCI_UNABLE_TO_DISCONNECT_SEGMENT); + DBUG_PRINT("error", ("Unable to disconnect segment")); + DBUG_RETURN(false); } -#ifdef DEBUG_TRANSPORTER - ndbout_c("Remote memory segment is unmapped and disconnected\n" ); -#endif + DBUG_PRINT("info", ("Remote memory segment is unmapped and disconnected")); } - return true; + DBUG_RETURN(true); } // disconnectRemote() SCI_Transporter::~SCI_Transporter() { + DBUG_ENTER("SCI_Transporter::~SCI_Transporter"); // Close channel to the driver -#ifdef DEBUG_TRANSPORTER - ndbout << "~SCITransporter does a disConnect" << endl; -#endif doDisconnect(); if(m_sendBuffer.m_buffer != NULL) delete[] m_sendBuffer.m_buffer; + DBUG_VOID_RETURN; } // ~SCI_Transporter() @@ -878,7 +890,7 @@ SCI_Transporter::~SCI_Transporter() { void SCI_Transporter::closeSCI() { // Termination of SCI sci_error_t err; - printf("\nClosing SCI Transporter...\n"); + DBUG_ENTER("SCI_Transporter::closeSCI"); // Disconnect and remove remote segment disconnectRemote(); @@ -890,26 +902,42 @@ void SCI_Transporter::closeSCI() { // Closes an SCI virtual device SCIClose(activeSCIDescriptor, FLAGS, &err); - if(err != SCI_ERR_OK) - fprintf(stderr, - "\nCannot close SCI channel to the driver. Error code 0x%x", - err); + if(err != SCI_ERR_OK) { + DBUG_PRINT("error", ("Cannot close SCI channel to the driver. Error code 0x%x", + err)); + } SCITerminate(); + DBUG_VOID_RETURN; } // closeSCI() Uint32 * -SCI_Transporter::getWritePtr(Uint32 lenBytes, Uint32 prio){ +SCI_Transporter::getWritePtr(Uint32 lenBytes, Uint32 prio) +{ - if(m_sendBuffer.full()){ - /**------------------------------------------------- - * Buffer was completely full. We have severe problems. - * ------------------------------------------------- - */ - if(!doSend()){ + Uint32 sci_buffer_remaining = m_sendBuffer.m_forceSendLimit; + Uint32 send_buf_size = m_sendBuffer.m_sendBufferSize; + Uint32 curr_data_size = m_sendBuffer.m_dataSize << 2; + Uint32 new_curr_data_size = curr_data_size + lenBytes; + if ((new_curr_data_size >= send_buf_size) || + (curr_data_size >= sci_buffer_remaining)) { + /** + * The new message will not fit in the send buffer. We need to + * send the send buffer before filling it up with the new + * signal data. If current data size will spill over buffer edge + * we will also send to avoid writing larger than possible in + * buffer. + */ + if (!doSend()) { + /** + * We were not successfull sending, report 0 as meaning buffer full and + * upper levels handle retries and other recovery matters. + */ return 0; } } - + /** + * New signal fits, simply fill it up with more data. + */ Uint32 sz = m_sendBuffer.m_dataSize; return &m_sendBuffer.m_buffer[sz]; } @@ -918,10 +946,11 @@ void SCI_Transporter::updateWritePtr(Uint32 lenBytes, Uint32 prio){ Uint32 sz = m_sendBuffer.m_dataSize; - sz += (lenBytes / 4); + Uint32 packet_size = m_PacketSize; + sz += ((lenBytes + 3) >> 2); m_sendBuffer.m_dataSize = sz; - if(sz > m_PacketSize) { + if(sz > packet_size) { /**------------------------------------------------- * Buffer is full and we are ready to send. We will * not wait since the signal is already in the buffer. @@ -944,7 +973,8 @@ bool SCI_Transporter::getConnectionStatus() { if(*m_localStatusFlag == SCICONNECTED && (*m_remoteStatusFlag == SCICONNECTED || - *m_remoteStatusFlag2 == SCICONNECTED)) + ((m_adapters > 1) && + *m_remoteStatusFlag2 == SCICONNECTED))) return true; else return false; @@ -954,7 +984,9 @@ SCI_Transporter::getConnectionStatus() { void SCI_Transporter::setConnected() { *m_remoteStatusFlag = SCICONNECTED; - *m_remoteStatusFlag2 = SCICONNECTED; + if (m_adapters > 1) { + *m_remoteStatusFlag2 = SCICONNECTED; + } *m_localStatusFlag = SCICONNECTED; } @@ -963,8 +995,10 @@ void SCI_Transporter::setDisconnect() { if(getLinkStatus(m_ActiveAdapterId)) *m_remoteStatusFlag = SCIDISCONNECT; - if(getLinkStatus(m_StandbyAdapterId)) - *m_remoteStatusFlag2 = SCIDISCONNECT; + if (m_adapters > 1) { + if(getLinkStatus(m_StandbyAdapterId)) + *m_remoteStatusFlag2 = SCIDISCONNECT; + } } @@ -981,20 +1015,20 @@ static bool init = false; bool SCI_Transporter::initSCI() { + DBUG_ENTER("SCI_Transporter::initSCI"); if(!init){ sci_error_t error; // Initialize SISCI library SCIInitialize(0, &error); if(error != SCI_ERR_OK) { -#ifdef DEBUG_TRANSPORTER - ndbout_c("\nCannot initialize SISCI library."); - ndbout_c("\nInconsistency between SISCI library and SISCI driver.Error code 0x%x", error); -#endif - return false; + DBUG_PRINT("error", ("Cannot initialize SISCI library.")); + DBUG_PRINT("error", ("Inconsistency between SISCI library and SISCI driver. Error code 0x%x", + error)); + DBUG_RETURN(false); } init = true; } - return true; + DBUG_RETURN(true); } diff --git a/ndb/src/common/transporter/SCI_Transporter.hpp b/ndb/src/common/transporter/SCI_Transporter.hpp index 03496c2ce21..adc94f8bb4b 100644 --- a/ndb/src/common/transporter/SCI_Transporter.hpp +++ b/ndb/src/common/transporter/SCI_Transporter.hpp @@ -26,7 +26,7 @@ #include <ndb_types.h> - /** +/** * The SCI Transporter * * The design goal of the SCI transporter is to deliver high performance @@ -135,15 +135,17 @@ public: bool getConnectionStatus(); private: - SCI_Transporter(Uint32 packetSize, + SCI_Transporter(TransporterRegistry &t_reg, + const char *local_host, + const char *remote_host, + int port, + Uint32 packetSize, Uint32 bufferSize, Uint32 nAdapters, Uint16 remoteSciNodeId0, Uint16 remoteSciNodeId1, NodeId localNodeID, NodeId remoteNodeID, - int byteorder, - bool compression, bool checksum, bool signalId, Uint32 reportFreq = 4096); @@ -160,7 +162,8 @@ private: /** * For statistics on transfered packets */ -#ifdef DEBUG_TRANSPORTER +//#ifdef DEBUG_TRANSPORTER +#if 1 Uint32 i1024; Uint32 i2048; Uint32 i2049; @@ -177,10 +180,8 @@ private: struct { Uint32 * m_buffer; // The buffer Uint32 m_dataSize; // No of words in buffer - Uint32 m_bufferSize; // Buffer size + Uint32 m_sendBufferSize; // Buffer size Uint32 m_forceSendLimit; // Send when buffer is this full - - bool full() const { return (m_dataSize * 4) > m_forceSendLimit ;} } m_sendBuffer; SHM_Reader * reader; @@ -196,7 +197,7 @@ private: Uint32 m_adapters; Uint32 m_numberOfRemoteNodes; - Uint16* m_remoteNodes; + Uint16 m_remoteNodes[2]; typedef struct SciAdapter { sci_desc_t scidesc; @@ -297,12 +298,12 @@ private: bool sendIsPossible(struct timeval * timeout); - void getReceivePtr(Uint32 ** ptr, Uint32 ** eod){ - reader->getReadPtr(* ptr, * eod); + void getReceivePtr(Uint32 ** ptr, Uint32 &size){ + size = reader->getReadPtr(* ptr); } - void updateReceivePtr(Uint32 * ptr){ - reader->updateReadPtr(ptr); + void updateReceivePtr(Uint32 size){ + reader->updateReadPtr(size); } /** @@ -341,7 +342,9 @@ private: */ void failoverShmWriter(); - + bool init_local(); + bool init_remote(); + protected: /** Perform a connection between segment @@ -350,7 +353,8 @@ protected: * retrying. * @return Returns true on success, otherwize falser */ - bool connectImpl(Uint32 timeOutMillis); + bool connect_server_impl(NDB_SOCKET_TYPE sockfd); + bool connect_client_impl(NDB_SOCKET_TYPE sockfd); /** * We will disconnect if: diff --git a/ndb/src/common/transporter/SHM_Buffer.hpp b/ndb/src/common/transporter/SHM_Buffer.hpp index 32e59dd57a2..b0dbd3362a8 100644 --- a/ndb/src/common/transporter/SHM_Buffer.hpp +++ b/ndb/src/common/transporter/SHM_Buffer.hpp @@ -42,17 +42,19 @@ public: Uint32 _sizeOfBuffer, Uint32 _slack, Uint32 * _readIndex, + Uint32 * _endWriteIndex, Uint32 * _writeIndex) : m_startOfBuffer(_startOfBuffer), m_totalBufferSize(_sizeOfBuffer), m_bufferSize(_sizeOfBuffer - _slack), m_sharedReadIndex(_readIndex), + m_sharedEndWriteIndex(_endWriteIndex), m_sharedWriteIndex(_writeIndex) { } void clear() { - m_readIndex = * m_sharedReadIndex; + m_readIndex = 0; } /** @@ -66,12 +68,12 @@ public: * returns ptr - where to start reading * sz - how much can I read */ - inline void getReadPtr(Uint32 * & ptr, Uint32 * & eod); + inline Uint32 getReadPtr(Uint32 * & ptr); /** * Update read ptr */ - inline void updateReadPtr(Uint32 * readPtr); + inline void updateReadPtr(Uint32 size); private: char * const m_startOfBuffer; @@ -80,6 +82,7 @@ private: Uint32 m_readIndex; Uint32 * m_sharedReadIndex; + Uint32 * m_sharedEndWriteIndex; Uint32 * m_sharedWriteIndex; }; @@ -97,19 +100,22 @@ SHM_Reader::empty() const{ * sz - how much can I read */ inline -void -SHM_Reader::getReadPtr(Uint32 * & ptr, Uint32 * & eod){ - +Uint32 +SHM_Reader::getReadPtr(Uint32 * & ptr) +{ + Uint32 *eod; Uint32 tReadIndex = m_readIndex; Uint32 tWriteIndex = * m_sharedWriteIndex; + Uint32 tEndWriteIndex = * m_sharedEndWriteIndex; ptr = (Uint32*)&m_startOfBuffer[tReadIndex]; if(tReadIndex <= tWriteIndex){ eod = (Uint32*)&m_startOfBuffer[tWriteIndex]; } else { - eod = (Uint32*)&m_startOfBuffer[m_bufferSize]; + eod = (Uint32*)&m_startOfBuffer[tEndWriteIndex]; } + return (Uint32)((char*)eod - (char*)ptr); } /** @@ -117,14 +123,14 @@ SHM_Reader::getReadPtr(Uint32 * & ptr, Uint32 * & eod){ */ inline void -SHM_Reader::updateReadPtr(Uint32 * ptr){ - - Uint32 tReadIndex = ((char *)ptr) - m_startOfBuffer; - +SHM_Reader::updateReadPtr(Uint32 size) +{ + Uint32 tReadIndex = m_readIndex; + tReadIndex += size; assert(tReadIndex < m_totalBufferSize); if(tReadIndex >= m_bufferSize){ - tReadIndex = 0; //-= m_bufferSize; + tReadIndex = 0; } m_readIndex = tReadIndex; @@ -139,17 +145,19 @@ public: Uint32 _sizeOfBuffer, Uint32 _slack, Uint32 * _readIndex, + Uint32 * _endWriteIndex, Uint32 * _writeIndex) : m_startOfBuffer(_startOfBuffer), m_totalBufferSize(_sizeOfBuffer), m_bufferSize(_sizeOfBuffer - _slack), m_sharedReadIndex(_readIndex), + m_sharedEndWriteIndex(_endWriteIndex), m_sharedWriteIndex(_writeIndex) { } void clear() { - m_writeIndex = * m_sharedWriteIndex; + m_writeIndex = 0; } inline char * getWritePtr(Uint32 sz); @@ -168,6 +176,7 @@ private: Uint32 m_writeIndex; Uint32 * m_sharedReadIndex; + Uint32 * m_sharedEndWriteIndex; Uint32 * m_sharedWriteIndex; }; @@ -206,7 +215,8 @@ SHM_Writer::updateWritePtr(Uint32 sz){ assert(tWriteIndex < m_totalBufferSize); if(tWriteIndex >= m_bufferSize){ - tWriteIndex = 0; //-= m_bufferSize; + * m_sharedEndWriteIndex = tWriteIndex; + tWriteIndex = 0; } m_writeIndex = tWriteIndex; diff --git a/ndb/src/common/transporter/SHM_Transporter.cpp b/ndb/src/common/transporter/SHM_Transporter.cpp index aa6b650afa8..7c801658dbd 100644 --- a/ndb/src/common/transporter/SHM_Transporter.cpp +++ b/ndb/src/common/transporter/SHM_Transporter.cpp @@ -32,13 +32,12 @@ SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg, int r_port, NodeId lNodeId, NodeId rNodeId, - bool compression, bool checksum, bool signalId, key_t _shmKey, Uint32 _shmSize) : Transporter(t_reg, lHostName, rHostName, r_port, lNodeId, rNodeId, - 0, compression, checksum, signalId), + 0, false, checksum, signalId), shmKey(_shmKey), shmSize(_shmSize) { @@ -48,7 +47,7 @@ SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg, shmBuf = 0; reader = 0; writer = 0; - + setupBuffersDone=false; #ifdef DEBUG_TRANSPORTER printf("shm key (%d - %d) = %d\n", lNodeId, rNodeId, shmKey); @@ -83,36 +82,40 @@ SHM_Transporter::setupBuffers(){ Uint32 * sharedReadIndex1 = base1; Uint32 * sharedWriteIndex1 = base1 + 1; + Uint32 * sharedEndWriteIndex1 = base1 + 2; serverStatusFlag = base1 + 4; char * startOfBuf1 = shmBuf+sharedSize; Uint32 * base2 = (Uint32*)(shmBuf + sizeOfBuffer + sharedSize); Uint32 * sharedReadIndex2 = base2; Uint32 * sharedWriteIndex2 = base2 + 1; + Uint32 * sharedEndWriteIndex2 = base2 + 2; clientStatusFlag = base2 + 4; char * startOfBuf2 = ((char *)base2)+sharedSize; - * sharedReadIndex2 = * sharedWriteIndex2 = 0; - if(isServer){ * serverStatusFlag = 0; reader = new SHM_Reader(startOfBuf1, sizeOfBuffer, slack, sharedReadIndex1, + sharedEndWriteIndex1, sharedWriteIndex1); writer = new SHM_Writer(startOfBuf2, sizeOfBuffer, slack, sharedReadIndex2, + sharedEndWriteIndex2, sharedWriteIndex2); * sharedReadIndex1 = 0; - * sharedWriteIndex2 = 0; + * sharedWriteIndex1 = 0; + * sharedEndWriteIndex1 = 0; * sharedReadIndex2 = 0; - * sharedWriteIndex1 = 0; + * sharedWriteIndex2 = 0; + * sharedEndWriteIndex2 = 0; reader->clear(); writer->clear(); @@ -145,16 +148,19 @@ SHM_Transporter::setupBuffers(){ sizeOfBuffer, slack, sharedReadIndex2, + sharedEndWriteIndex2, sharedWriteIndex2); writer = new SHM_Writer(startOfBuf1, sizeOfBuffer, slack, sharedReadIndex1, + sharedEndWriteIndex1, sharedWriteIndex1); * sharedReadIndex2 = 0; * sharedWriteIndex1 = 0; + * sharedEndWriteIndex1 = 0; reader->clear(); writer->clear(); @@ -224,6 +230,7 @@ SHM_Transporter::prepareSend(const SignalHeader * const signalHeader, bool SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd) { + DBUG_ENTER("SHM_Transporter::connect_server_impl"); SocketOutputStream s_output(sockfd); SocketInputStream s_input(sockfd); char buf[256]; @@ -233,7 +240,7 @@ SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd) if (!ndb_shm_create()) { report_error(TE_SHM_UNABLE_TO_CREATE_SEGMENT); NDB_CLOSE_SOCKET(sockfd); - return false; + DBUG_RETURN(false); } _shmSegCreated = true; } @@ -243,7 +250,7 @@ SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd) if (!ndb_shm_attach()) { report_error(TE_SHM_UNABLE_TO_ATTACH_SEGMENT); NDB_CLOSE_SOCKET(sockfd); - return false; + DBUG_RETURN(false); } _attached = true; } @@ -254,7 +261,7 @@ SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd) // Wait for ok from client if (s_input.gets(buf, 256) == 0) { NDB_CLOSE_SOCKET(sockfd); - return false; + DBUG_RETURN(false); } int r= connect_common(sockfd); @@ -265,17 +272,20 @@ SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd) // Wait for ok from client if (s_input.gets(buf, 256) == 0) { NDB_CLOSE_SOCKET(sockfd); - return false; + DBUG_RETURN(false); } + DBUG_PRINT("info", ("Successfully connected server to node %d", + remoteNodeId)); } NDB_CLOSE_SOCKET(sockfd); - return r; + DBUG_RETURN(r); } bool SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd) { + DBUG_ENTER("SHM_Transporter::connect_client_impl"); SocketInputStream s_input(sockfd); SocketOutputStream s_output(sockfd); char buf[256]; @@ -283,14 +293,18 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd) // Wait for server to create and attach if (s_input.gets(buf, 256) == 0) { NDB_CLOSE_SOCKET(sockfd); - return false; + DBUG_PRINT("error", ("Server id %d did not attach", + remoteNodeId)); + DBUG_RETURN(false); } // Create if(!_shmSegCreated){ if (!ndb_shm_get()) { NDB_CLOSE_SOCKET(sockfd); - return false; + DBUG_PRINT("error", ("Failed create of shm seg to node %d", + remoteNodeId)); + DBUG_RETURN(false); } _shmSegCreated = true; } @@ -300,7 +314,9 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd) if (!ndb_shm_attach()) { report_error(TE_SHM_UNABLE_TO_ATTACH_SEGMENT); NDB_CLOSE_SOCKET(sockfd); - return false; + DBUG_PRINT("error", ("Failed attach of shm seg to node %d", + remoteNodeId)); + DBUG_RETURN(false); } _attached = true; } @@ -314,21 +330,28 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd) // Wait for ok from server if (s_input.gets(buf, 256) == 0) { NDB_CLOSE_SOCKET(sockfd); - return false; + DBUG_PRINT("error", ("No ok from server node %d", + remoteNodeId)); + DBUG_RETURN(false); } // Send ok to server s_output.println("shm client 2 ok"); + DBUG_PRINT("info", ("Successfully connected client to node %d", + remoteNodeId)); } NDB_CLOSE_SOCKET(sockfd); - return r; + DBUG_RETURN(r); } bool SHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd) { - if (!checkConnected()) + if (!checkConnected()) { + DBUG_PRINT("error", ("Already connected to node %d", + remoteNodeId)); return false; + } if(!setupBuffersDone) { setupBuffers(); @@ -341,5 +364,7 @@ SHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd) return true; } + DBUG_PRINT("error", ("Failed to set up buffers to node %d", + remoteNodeId)); return false; } diff --git a/ndb/src/common/transporter/SHM_Transporter.hpp b/ndb/src/common/transporter/SHM_Transporter.hpp index be54d0daa2a..892acbb7ac4 100644 --- a/ndb/src/common/transporter/SHM_Transporter.hpp +++ b/ndb/src/common/transporter/SHM_Transporter.hpp @@ -38,7 +38,6 @@ public: int r_port, NodeId lNodeId, NodeId rNodeId, - bool compression, bool checksum, bool signalId, key_t shmKey, @@ -62,12 +61,12 @@ public: writer->updateWritePtr(lenBytes); } - void getReceivePtr(Uint32 ** ptr, Uint32 ** eod){ - reader->getReadPtr(* ptr, * eod); + void getReceivePtr(Uint32 ** ptr, Uint32 sz){ + sz = reader->getReadPtr(* ptr); } - void updateReceivePtr(Uint32 * ptr){ - reader->updateReadPtr(ptr); + void updateReceivePtr(Uint32 sz){ + reader->updateReadPtr(sz); } protected: @@ -127,6 +126,7 @@ protected: private: bool _shmSegCreated; bool _attached; + bool m_connected; key_t shmKey; volatile Uint32 * serverStatusFlag; diff --git a/ndb/src/common/transporter/TCP_Transporter.cpp b/ndb/src/common/transporter/TCP_Transporter.cpp index 8833b51e236..b44afc7c136 100644 --- a/ndb/src/common/transporter/TCP_Transporter.cpp +++ b/ndb/src/common/transporter/TCP_Transporter.cpp @@ -70,11 +70,10 @@ TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg, int r_port, NodeId lNodeId, NodeId rNodeId, - int byte_order, - bool compr, bool chksm, bool signalId, + bool chksm, bool signalId, Uint32 _reportFreq) : Transporter(t_reg, lHostName, rHostName, r_port, lNodeId, rNodeId, - byte_order, compr, chksm, signalId), + 0, false, chksm, signalId), m_sendBuffer(sendBufSize) { maxReceiveSize = maxRecvSize; @@ -106,12 +105,14 @@ TCP_Transporter::~TCP_Transporter() { bool TCP_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd) { - return connect_common(sockfd); + DBUG_ENTER("TCP_Transpporter::connect_server_impl"); + DBUG_RETURN(connect_common(sockfd)); } bool TCP_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd) { - return connect_common(sockfd); + DBUG_ENTER("TCP_Transpporter::connect_client_impl"); + DBUG_RETURN(connect_common(sockfd)); } bool TCP_Transporter::connect_common(NDB_SOCKET_TYPE sockfd) @@ -119,6 +120,8 @@ bool TCP_Transporter::connect_common(NDB_SOCKET_TYPE sockfd) theSocket = sockfd; setSocketOptions(); setSocketNonBlocking(theSocket); + DBUG_PRINT("info", ("Successfully set-up TCP transporter to node %d", + remoteNodeId)); return true; } diff --git a/ndb/src/common/transporter/TCP_Transporter.hpp b/ndb/src/common/transporter/TCP_Transporter.hpp index 958cfde03a1..48046310bf8 100644 --- a/ndb/src/common/transporter/TCP_Transporter.hpp +++ b/ndb/src/common/transporter/TCP_Transporter.hpp @@ -52,8 +52,7 @@ private: int r_port, NodeId lHostId, NodeId rHostId, - int byteorder, - bool compression, bool checksum, bool signalId, + bool checksum, bool signalId, Uint32 reportFreq = 4096); // Disconnect, delete send buffers and receive buffer diff --git a/ndb/src/common/transporter/TransporterRegistry.cpp b/ndb/src/common/transporter/TransporterRegistry.cpp index 01f1f74f053..ad8a2729c26 100644 --- a/ndb/src/common/transporter/TransporterRegistry.cpp +++ b/ndb/src/common/transporter/TransporterRegistry.cpp @@ -15,6 +15,7 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include <ndb_global.h> +#include <my_pthread.h> #include <TransporterRegistry.hpp> #include "TransporterInternalDefinitions.hpp" @@ -48,9 +49,10 @@ SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd) { + DBUG_ENTER("SocketServer::Session * TransporterService::newSession"); if (m_auth && !m_auth->server_authenticate(sockfd)){ NDB_CLOSE_SOCKET(sockfd); - return 0; + DBUG_RETURN(0); } { @@ -60,27 +62,32 @@ SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd) char buf[256]; if (s_input.gets(buf, 256) == 0) { NDB_CLOSE_SOCKET(sockfd); - return 0; + DBUG_PRINT("error", ("Could not get node id from client")); + DBUG_RETURN(0); } if (sscanf(buf, "%d", &nodeId) != 1) { NDB_CLOSE_SOCKET(sockfd); - return 0; + DBUG_PRINT("error", ("Error in node id from client")); + DBUG_RETURN(0); } //check that nodeid is valid and that there is an allocated transporter - if ( nodeId < 0 || nodeId >= m_transporter_registry->maxTransporters) { - NDB_CLOSE_SOCKET(sockfd); - return 0; + if ( nodeId < 0 || nodeId >= (int)m_transporter_registry->maxTransporters) { + NDB_CLOSE_SOCKET(sockfd); + DBUG_PRINT("error", ("Node id out of range from client")); + DBUG_RETURN(0); } if (m_transporter_registry->theTransporters[nodeId] == 0) { NDB_CLOSE_SOCKET(sockfd); - return 0; + DBUG_PRINT("error", ("No transporter for this node id from client")); + DBUG_RETURN(0); } //check that the transporter should be connected if (m_transporter_registry->performStates[nodeId] != TransporterRegistry::CONNECTING) { NDB_CLOSE_SOCKET(sockfd); - return 0; + DBUG_PRINT("error", ("Transporter in wrong state for this node id from client")); + DBUG_RETURN(0); } Transporter *t= m_transporter_registry->theTransporters[nodeId]; @@ -93,7 +100,7 @@ SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd) t->connect_server(sockfd); } - return 0; + DBUG_RETURN(0); } TransporterRegistry::TransporterRegistry(void * callback, @@ -209,8 +216,6 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) { config->port, localNodeId, config->remoteNodeId, - config->byteOrder, - config->compression, config->checksum, config->signalId); if (t == NULL) @@ -264,8 +269,6 @@ TransporterRegistry::createTransporter(OSE_TransporterConfiguration *conf) { conf->localHostName, conf->remoteNodeId, conf->remoteHostName, - conf->byteOrder, - conf->compression, conf->checksum, conf->signalId); if (t == NULL) @@ -306,15 +309,17 @@ TransporterRegistry::createTransporter(SCI_TransporterConfiguration *config) { if(theTransporters[config->remoteNodeId] != NULL) return false; - SCI_Transporter * t = new SCI_Transporter(config->sendLimit, + SCI_Transporter * t = new SCI_Transporter(*this, + config->localHostName, + config->remoteHostName, + config->port, + config->sendLimit, config->bufferSize, config->nLocalAdapters, config->remoteSciNodeId0, config->remoteSciNodeId1, localNodeId, config->remoteNodeId, - config->byteOrder, - config->compression, config->checksum, config->signalId); @@ -357,7 +362,6 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) { config->port, localNodeId, config->remoteNodeId, - config->compression, config->checksum, config->signalId, config->shmKey, @@ -853,10 +857,11 @@ TransporterRegistry::performReceive(){ const NodeId nodeId = t->getRemoteNodeId(); if(is_connected(nodeId)){ if(t->isConnected() && t->checkConnected()){ - Uint32 * readPtr, * eodPtr; - t->getReceivePtr(&readPtr, &eodPtr); - readPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]); - t->updateReceivePtr(readPtr); + Uint32 * readPtr; + Uint32 sz = 0; + t->getReceivePtr(&readPtr, sz); + Uint32 szUsed = unpack(readPtr, sz, nodeId, ioStates[nodeId]); + t->updateReceivePtr(szUsed); } } } @@ -868,10 +873,11 @@ TransporterRegistry::performReceive(){ const NodeId nodeId = t->getRemoteNodeId(); if(is_connected(nodeId)){ if(t->isConnected() && t->checkConnected()){ - Uint32 * readPtr, * eodPtr; - t->getReceivePtr(&readPtr, &eodPtr); - readPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]); - t->updateReceivePtr(readPtr); + Uint32 * readPtr; + Uint32 sz = 0; + t->getReceivePtr(&readPtr, sz); + Uint32 szUsed = unpack(readPtr, sz, nodeId, ioStates[nodeId]); + t->updateReceivePtr(szUsed); } } } @@ -1023,7 +1029,9 @@ TransporterRegistry::setIOState(NodeId nodeId, IOState state) { static void * run_start_clients_C(void * me) { + my_thread_init(); ((TransporterRegistry*) me)->start_clients_thread(); + my_thread_end(); NdbThread_Exit(0); return me; } @@ -1106,6 +1114,7 @@ TransporterRegistry::update_connections() void TransporterRegistry::start_clients_thread() { + DBUG_ENTER("TransporterRegistry::start_clients_thread"); while (m_run_start_clients_thread) { NdbSleep_MilliSleep(100); for (int i= 0, n= 0; n < nTransporters && m_run_start_clients_thread; i++){ @@ -1129,6 +1138,7 @@ TransporterRegistry::start_clients_thread() } } } + DBUG_VOID_RETURN; } bool diff --git a/ndb/src/common/util/SocketServer.cpp b/ndb/src/common/util/SocketServer.cpp index 0cc06a54496..c432d686462 100644 --- a/ndb/src/common/util/SocketServer.cpp +++ b/ndb/src/common/util/SocketServer.cpp @@ -16,6 +16,7 @@ #include <ndb_global.h> +#include <my_pthread.h> #include <SocketServer.hpp> @@ -176,9 +177,9 @@ extern "C" void* socketServerThread_C(void* _ss){ SocketServer * ss = (SocketServer *)_ss; - + my_thread_init(); ss->doRun(); - + my_thread_end(); NdbThread_Exit(0); return 0; } @@ -287,8 +288,10 @@ void* sessionThread_C(void* _sc){ SocketServer::Session * si = (SocketServer::Session *)_sc; + my_thread_init(); if(!transfer(si->m_socket)){ si->m_stopped = true; + my_thread_end(); NdbThread_Exit(0); return 0; } @@ -301,6 +304,7 @@ sessionThread_C(void* _sc){ } si->m_stopped = true; + my_thread_end(); NdbThread_Exit(0); return 0; } diff --git a/ndb/src/cw/cpcd/Makefile.am b/ndb/src/cw/cpcd/Makefile.am index e276d1a766d..6af44a359fc 100644 --- a/ndb/src/cw/cpcd/Makefile.am +++ b/ndb/src/cw/cpcd/Makefile.am @@ -7,7 +7,7 @@ LDADD_LOC = \ $(top_builddir)/ndb/src/libndbclient.la \ $(top_builddir)/dbug/libdbug.a \ $(top_builddir)/mysys/libmysys.a \ - $(top_builddir)/strings/libmystrings.a + $(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@ include $(top_srcdir)/ndb/config/common.mk.am include $(top_srcdir)/ndb/config/type_util.mk.am diff --git a/ndb/src/kernel/Makefile.am b/ndb/src/kernel/Makefile.am index a6be3244b41..493ab4f9982 100644 --- a/ndb/src/kernel/Makefile.am +++ b/ndb/src/kernel/Makefile.am @@ -55,7 +55,7 @@ LDADD += \ $(top_builddir)/ndb/src/common/util/libgeneral.la \ $(top_builddir)/dbug/libdbug.a \ $(top_builddir)/mysys/libmysys.a \ - $(top_builddir)/strings/libmystrings.a + $(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@ # Don't update the files from bitkeeper %::SCCS/s.% diff --git a/ndb/src/kernel/blocks/backup/restore/Makefile.am b/ndb/src/kernel/blocks/backup/restore/Makefile.am index eef5bc5a203..16550f13546 100644 --- a/ndb/src/kernel/blocks/backup/restore/Makefile.am +++ b/ndb/src/kernel/blocks/backup/restore/Makefile.am @@ -7,7 +7,7 @@ LDADD_LOC = \ $(top_builddir)/ndb/src/libndbclient.la \ $(top_builddir)/dbug/libdbug.a \ $(top_builddir)/mysys/libmysys.a \ - $(top_builddir)/strings/libmystrings.a + $(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@ include $(top_srcdir)/ndb/config/common.mk.am diff --git a/ndb/src/mgmsrv/ConfigInfo.cpp b/ndb/src/mgmsrv/ConfigInfo.cpp index 948423f0109..b3afd57f6cd 100644 --- a/ndb/src/mgmsrv/ConfigInfo.cpp +++ b/ndb/src/mgmsrv/ConfigInfo.cpp @@ -125,11 +125,14 @@ ConfigInfo::m_SectionRules[] = { { "TCP", fixHostname, "HostName1" }, { "TCP", fixHostname, "HostName2" }, + { "SCI", fixHostname, "HostName1" }, + { "SCI", fixHostname, "HostName2" }, { "OSE", fixHostname, "HostName1" }, { "OSE", fixHostname, "HostName2" }, { "TCP", fixPortNumber, 0 }, // has to come after fixHostName { "SHM", fixPortNumber, 0 }, // has to come after fixHostName + { "SCI", fixPortNumber, 0 }, // has to come after fixHostName //{ "SHM", fixShmKey, 0 }, /** @@ -159,6 +162,8 @@ ConfigInfo::m_SectionRules[] = { { "TCP", checkTCPConstraints, "HostName1" }, { "TCP", checkTCPConstraints, "HostName2" }, + { "SCI", checkTCPConstraints, "HostName1" }, + { "SCI", checkTCPConstraints, "HostName2" }, { "*", checkMandatory, 0 }, @@ -1788,7 +1793,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { "Id of node ("DB_TOKEN", "API_TOKEN" or "MGM_TOKEN") on one side of the connection", ConfigInfo::USED, false, - ConfigInfo::INT, + ConfigInfo::STRING, MANDATORY, "0", STR_VALUE(MAX_INT_RNIL) }, @@ -1800,28 +1805,74 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { "Id of node ("DB_TOKEN", "API_TOKEN" or "MGM_TOKEN") on one side of the connection", ConfigInfo::USED, false, - ConfigInfo::INT, + ConfigInfo::STRING, MANDATORY, "0", STR_VALUE(MAX_INT_RNIL) }, { - CFG_SCI_ID_0, - "SciId0", + CFG_SCI_HOSTNAME_1, + "HostName1", + "SCI", + "Name/IP of computer on one side of the connection", + ConfigInfo::INTERNAL, + false, + ConfigInfo::STRING, + UNDEFINED, + 0, 0 }, + + { + CFG_SCI_HOSTNAME_2, + "HostName2", + "SCI", + "Name/IP of computer on one side of the connection", + ConfigInfo::INTERNAL, + false, + ConfigInfo::STRING, + UNDEFINED, + 0, 0 }, + + { + CFG_CONNECTION_SERVER_PORT, + "PortNumber", "SCI", - "Local SCI-node id for adapter 0 (a computer can have two adapters)", + "Port used for this transporter", ConfigInfo::USED, false, ConfigInfo::INT, MANDATORY, + "0", + STR_VALUE(MAX_INT_RNIL) }, + + { + CFG_SCI_HOST1_ID_0, + "Host1SciId0", + "SCI", + "SCI-node id for adapter 0 on Host1 (a computer can have two adapters)", + ConfigInfo::USED, + false, + ConfigInfo::INT, + MANDATORY, + "0", + STR_VALUE(MAX_INT_RNIL) }, + + { + CFG_SCI_HOST1_ID_1, + "Host1SciId1", + "SCI", + "SCI-node id for adapter 1 on Host1 (a computer can have two adapters)", + ConfigInfo::USED, + false, + ConfigInfo::INT, + "0", "0", STR_VALUE(MAX_INT_RNIL) }, { - CFG_SCI_ID_1, - "SciId1", + CFG_SCI_HOST2_ID_0, + "Host2SciId0", "SCI", - "Local SCI-node id for adapter 1 (a computer can have two adapters)", + "SCI-node id for adapter 0 on Host2 (a computer can have two adapters)", ConfigInfo::USED, false, ConfigInfo::INT, @@ -1830,6 +1881,18 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { STR_VALUE(MAX_INT_RNIL) }, { + CFG_SCI_HOST2_ID_1, + "Host2SciId1", + "SCI", + "SCI-node id for adapter 1 on Host2 (a computer can have two adapters)", + ConfigInfo::USED, + false, + ConfigInfo::INT, + "0", + "0", + STR_VALUE(MAX_INT_RNIL) }, + + { CFG_CONNECTION_SEND_SIGNAL_ID, "SendSignalId", "SCI", @@ -1862,8 +1925,8 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { false, ConfigInfo::INT, "2K", - "512", - STR_VALUE(MAX_INT_RNIL) }, + "128", + "32K" }, { CFG_SCI_BUFFER_MEM, @@ -1873,8 +1936,8 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ConfigInfo::USED, false, ConfigInfo::INT, - "1M", - "256K", + "192K", + "64K", STR_VALUE(MAX_INT_RNIL) }, { diff --git a/ndb/src/mgmsrv/Makefile.am b/ndb/src/mgmsrv/Makefile.am index 8fa9ec5f63e..5e048eb1418 100644 --- a/ndb/src/mgmsrv/Makefile.am +++ b/ndb/src/mgmsrv/Makefile.am @@ -29,7 +29,7 @@ LDADD_LOC = $(top_builddir)/ndb/src/libndbclient.la \ $(top_builddir)/ndb/src/common/editline/libeditline.a \ $(top_builddir)/dbug/libdbug.a \ $(top_builddir)/mysys/libmysys.a \ - $(top_builddir)/strings/libmystrings.a + $(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@ @TERMCAP_LIB@ DEFS_LOC = -DDEFAULT_MYSQL_HOME="\"$(MYSQLBASEdir)\"" \ diff --git a/sql/Makefile.am b/sql/Makefile.am index d951aae91e1..19bdf8055f3 100644 --- a/sql/Makefile.am +++ b/sql/Makefile.am @@ -37,7 +37,7 @@ LDADD = @isam_libs@ \ $(top_builddir)/mysys/libmysys.a \ $(top_builddir)/dbug/libdbug.a \ $(top_builddir)/regex/libregex.a \ - $(top_builddir)/strings/libmystrings.a @ZLIB_LIBS@ + $(top_builddir)/strings/libmystrings.a @ZLIB_LIBS@ @NDB_SCI_LIBS@ mysqld_LDADD = @MYSQLD_EXTRA_LDFLAGS@ \ @bdb_libs@ @innodb_libs@ @pstack_libs@ \ |