path: root/storage/ndb/src/common/transporter/TransporterRegistry.cpp
diff options
Diffstat (limited to 'storage/ndb/src/common/transporter/TransporterRegistry.cpp')
1 files changed, 1625 insertions, 0 deletions
diff --git a/storage/ndb/src/common/transporter/TransporterRegistry.cpp b/storage/ndb/src/common/transporter/TransporterRegistry.cpp
new file mode 100644
index 00000000000..f331b1660c1
--- /dev/null
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp
@@ -0,0 +1,1625 @@
+/* Copyright (C) 2003 MySQL AB
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ GNU General Public License for more details.
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+#include <ndb_global.h>
+#include <my_pthread.h>
+#include <TransporterRegistry.hpp>
+#include "TransporterInternalDefinitions.hpp"
+#include "Transporter.hpp"
+#include <SocketAuthenticator.hpp>
+#include "TCP_Transporter.hpp"
+#include "OSE_Receiver.hpp"
+#include "OSE_Transporter.hpp"
+#include "SCI_Transporter.hpp"
+#include "SHM_Transporter.hpp"
+extern int g_ndb_shm_signum;
+#include "TransporterCallback.hpp"
+#include "NdbOut.hpp"
+#include <NdbSleep.h>
+#include <NdbTick.h>
+#include <InputStream.hpp>
+#include <OutputStream.hpp>
+#include <mgmapi/mgmapi.h>
+#include <mgmapi_internal.h>
+#include <mgmapi/mgmapi_debug.h>
+#include <EventLogger.hpp>
+extern EventLogger g_eventLogger;
+SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
+ DBUG_ENTER("SocketServer::Session * TransporterService::newSession");
+ if (m_auth && !m_auth->server_authenticate(sockfd)){
+ }
+ if (!m_transporter_registry->connect_server(sockfd))
+ {
+ }
+TransporterRegistry::TransporterRegistry(void * callback,
+ unsigned _maxTransporters,
+ unsigned sizeOfLongSignalMemory) {
+ nodeIdSpecified = false;
+ maxTransporters = _maxTransporters;
+ sendCounter = 1;
+ m_mgm_handle= 0;
+ callbackObj=callback;
+ theTCPTransporters = new TCP_Transporter * [maxTransporters];
+ theSCITransporters = new SCI_Transporter * [maxTransporters];
+ theSHMTransporters = new SHM_Transporter * [maxTransporters];
+ theOSETransporters = new OSE_Transporter * [maxTransporters];
+ theTransporterTypes = new TransporterType [maxTransporters];
+ theTransporters = new Transporter * [maxTransporters];
+ performStates = new PerformState [maxTransporters];
+ ioStates = new IOState [maxTransporters];
+ // Initialize member variables
+ nTransporters = 0;
+ nTCPTransporters = 0;
+ nSCITransporters = 0;
+ nSHMTransporters = 0;
+ nOSETransporters = 0;
+ // Initialize the transporter arrays
+ for (unsigned i=0; i<maxTransporters; i++) {
+ theTCPTransporters[i] = NULL;
+ theSCITransporters[i] = NULL;
+ theSHMTransporters[i] = NULL;
+ theOSETransporters[i] = NULL;
+ theTransporters[i] = NULL;
+ performStates[i] = DISCONNECTED;
+ ioStates[i] = NoHalt;
+ }
+ theOSEReceiver = 0;
+ theOSEJunkSocketSend = 0;
+ theOSEJunkSocketRecv = 0;
+void TransporterRegistry::set_mgm_handle(NdbMgmHandle h)
+ DBUG_ENTER("TransporterRegistry::set_mgm_handle");
+ if (m_mgm_handle)
+ ndb_mgm_destroy_handle(&m_mgm_handle);
+ m_mgm_handle= h;
+#ifndef DBUG_OFF
+ if (h)
+ {
+ char buf[256];
+ DBUG_PRINT("info",("handle set with connectstring: %s",
+ ndb_mgm_get_connectstring(h,buf, sizeof(buf))));
+ }
+ else
+ {
+ DBUG_PRINT("info",("handle set to NULL"));
+ }
+TransporterRegistry::~TransporterRegistry() {
+ removeAll();
+ delete[] theTCPTransporters;
+ delete[] theSCITransporters;
+ delete[] theSHMTransporters;
+ delete[] theOSETransporters;
+ delete[] theTransporterTypes;
+ delete[] theTransporters;
+ delete[] performStates;
+ delete[] ioStates;
+ if(theOSEReceiver != NULL){
+ theOSEReceiver->destroyPhantom();
+ delete theOSEReceiver;
+ theOSEReceiver = 0;
+ }
+ if (m_mgm_handle)
+ ndb_mgm_destroy_handle(&m_mgm_handle);
+ for(unsigned i = 0; i<maxTransporters; i++){
+ if(theTransporters[i] != NULL)
+ removeTransporter(theTransporters[i]->getRemoteNodeId());
+ }
+ for(unsigned i = 0; i<maxTransporters; i++){
+ if(theTransporters[i] != NULL)
+ theTransporters[i]->doDisconnect();
+ }
+TransporterRegistry::init(NodeId nodeId) {
+ DBUG_ENTER("TransporterRegistry::init");
+ nodeIdSpecified = true;
+ localNodeId = nodeId;
+ DEBUG("TransporterRegistry started node: " << localNodeId);
+ DBUG_RETURN(true);
+TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd)
+ DBUG_ENTER("TransporterRegistry::connect_server");
+ // read node id from client
+ // read transporter type
+ int nodeId, remote_transporter_type= -1;
+ SocketInputStream s_input(sockfd);
+ char buf[256];
+ if (s_input.gets(buf, 256) == 0) {
+ DBUG_PRINT("error", ("Could not get node id from client"));
+ DBUG_RETURN(false);
+ }
+ int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
+ switch (r) {
+ case 2:
+ break;
+ case 1:
+ // we're running version prior to 4.1.9
+ // ok, but with no checks on transporter configuration compatability
+ break;
+ default:
+ DBUG_PRINT("error", ("Error in node id from client"));
+ DBUG_RETURN(false);
+ }
+ DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d",
+ nodeId,remote_transporter_type));
+ //check that nodeid is valid and that there is an allocated transporter
+ if ( nodeId < 0 || nodeId >= (int)maxTransporters) {
+ DBUG_PRINT("error", ("Node id out of range from client"));
+ DBUG_RETURN(false);
+ }
+ if (theTransporters[nodeId] == 0) {
+ DBUG_PRINT("error", ("No transporter for this node id from client"));
+ DBUG_RETURN(false);
+ }
+ //check that the transporter should be connected
+ if (performStates[nodeId] != TransporterRegistry::CONNECTING) {
+ DBUG_PRINT("error", ("Transporter in wrong state for this node id from client"));
+ DBUG_RETURN(false);
+ }
+ Transporter *t= theTransporters[nodeId];
+ // send info about own id (just as response to acknowledge connection)
+ // send info on own transporter type
+ SocketOutputStream s_output(sockfd);
+ s_output.println("%d %d", t->getLocalNodeId(), t->m_type);
+ if (remote_transporter_type != -1)
+ {
+ if (remote_transporter_type != t->m_type)
+ {
+ DBUG_PRINT("error", ("Transporter types mismatch this=%d remote=%d",
+ t->m_type, remote_transporter_type));
+ g_eventLogger.error("Incompatible configuration: Transporter type "
+ "mismatch with node %d", nodeId);
+ // wait for socket close for 1 second to let message arrive at client
+ {
+ fd_set a_set;
+ FD_ZERO(&a_set);
+ FD_SET(sockfd, &a_set);
+ struct timeval timeout;
+ timeout.tv_sec = 1; timeout.tv_usec = 0;
+ select(sockfd+1, &a_set, 0, 0, &timeout);
+ }
+ DBUG_RETURN(false);
+ }
+ }
+ else if (t->m_type == tt_SHM_TRANSPORTER)
+ {
+ g_eventLogger.warning("Unable to verify transporter compatability with node %d", nodeId);
+ }
+ // setup transporter (transporter responsible for closing sockfd)
+ t->connect_server(sockfd);
+ DBUG_RETURN(true);
+TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
+ if(!nodeIdSpecified){
+ init(config->localNodeId);
+ }
+ if(config->localNodeId != localNodeId)
+ return false;
+ if(theTransporters[config->remoteNodeId] != NULL)
+ return false;
+ TCP_Transporter * t = new TCP_Transporter(*this,
+ config->tcp.sendBufferSize,
+ config->tcp.maxReceiveSize,
+ config->localHostName,
+ config->remoteHostName,
+ config->port,
+ config->isMgmConnection,
+ localNodeId,
+ config->remoteNodeId,
+ config->serverNodeId,
+ config->checksum,
+ config->signalId);
+ if (t == NULL)
+ return false;
+ else if (!t->initTransporter()) {
+ delete t;
+ return false;
+ }
+ // Put the transporter in the transporter arrays
+ theTCPTransporters[nTCPTransporters] = t;
+ theTransporters[t->getRemoteNodeId()] = t;
+ theTransporterTypes[t->getRemoteNodeId()] = tt_TCP_TRANSPORTER;
+ performStates[t->getRemoteNodeId()] = DISCONNECTED;
+ nTransporters++;
+ nTCPTransporters++;
+#if defined NDB_OSE || defined NDB_SOFTOSE
+ t->theReceiverPid = theReceiverPid;
+ return true;
+ return false;
+TransporterRegistry::createOSETransporter(TransporterConfiguration *conf) {
+ if(!nodeIdSpecified){
+ init(conf->localNodeId);
+ }
+ if(conf->localNodeId != localNodeId)
+ return false;
+ if(theTransporters[conf->remoteNodeId] != NULL)
+ return false;
+ if(theOSEReceiver == NULL){
+ theOSEReceiver = new OSE_Receiver(this,
+ 10,
+ localNodeId);
+ }
+ OSE_Transporter * t = new OSE_Transporter(conf->ose.prioASignalSize,
+ conf->ose.prioBSignalSize,
+ localNodeId,
+ conf->localHostName,
+ conf->remoteNodeId,
+ conf->serverNodeId,
+ conf->remoteHostName,
+ conf->checksum,
+ conf->signalId);
+ if (t == NULL)
+ return false;
+ else if (!t->initTransporter()) {
+ delete t;
+ return false;
+ }
+ // Put the transporter in the transporter arrays
+ theOSETransporters[nOSETransporters] = t;
+ theTransporters[t->getRemoteNodeId()] = t;
+ theTransporterTypes[t->getRemoteNodeId()] = tt_OSE_TRANSPORTER;
+ performStates[t->getRemoteNodeId()] = DISCONNECTED;
+ nTransporters++;
+ nOSETransporters++;
+ return true;
+ return false;
+TransporterRegistry::createSCITransporter(TransporterConfiguration *config) {
+ if(!SCI_Transporter::initSCI())
+ abort();
+ if(!nodeIdSpecified){
+ init(config->localNodeId);
+ }
+ if(config->localNodeId != localNodeId)
+ return false;
+ if(theTransporters[config->remoteNodeId] != NULL)
+ return false;
+ SCI_Transporter * t = new SCI_Transporter(*this,
+ config->localHostName,
+ config->remoteHostName,
+ config->port,
+ config->isMgmConnection,
+ config->sci.sendLimit,
+ config->sci.bufferSize,
+ config->sci.nLocalAdapters,
+ config->sci.remoteSciNodeId0,
+ config->sci.remoteSciNodeId1,
+ localNodeId,
+ config->remoteNodeId,
+ config->serverNodeId,
+ config->checksum,
+ config->signalId);
+ if (t == NULL)
+ return false;
+ else if (!t->initTransporter()) {
+ delete t;
+ return false;
+ }
+ // Put the transporter in the transporter arrays
+ theSCITransporters[nSCITransporters] = t;
+ theTransporters[t->getRemoteNodeId()] = t;
+ theTransporterTypes[t->getRemoteNodeId()] = tt_SCI_TRANSPORTER;
+ performStates[t->getRemoteNodeId()] = DISCONNECTED;
+ nTransporters++;
+ nSCITransporters++;
+ return true;
+ return false;
+TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) {
+ DBUG_ENTER("TransporterRegistry::createTransporter SHM");
+ if(!nodeIdSpecified){
+ init(config->localNodeId);
+ }
+ if(config->localNodeId != localNodeId)
+ return false;
+ if (!g_ndb_shm_signum) {
+ g_ndb_shm_signum= config->shm.signum;
+ DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
+ /**
+ * Make sure to block g_ndb_shm_signum
+ * TransporterRegistry::init is run from "main" thread
+ */
+ sigset_t mask;
+ sigemptyset(&mask);
+ sigaddset(&mask, g_ndb_shm_signum);
+ pthread_sigmask(SIG_BLOCK, &mask, 0);
+ }
+ if(config->shm.signum != g_ndb_shm_signum)
+ return false;
+ if(theTransporters[config->remoteNodeId] != NULL)
+ return false;
+ SHM_Transporter * t = new SHM_Transporter(*this,
+ config->localHostName,
+ config->remoteHostName,
+ config->port,
+ config->isMgmConnection,
+ localNodeId,
+ config->remoteNodeId,
+ config->serverNodeId,
+ config->checksum,
+ config->signalId,
+ config->shm.shmKey,
+ config->shm.shmSize
+ );
+ if (t == NULL)
+ return false;
+ else if (!t->initTransporter()) {
+ delete t;
+ return false;
+ }
+ // Put the transporter in the transporter arrays
+ theSHMTransporters[nSHMTransporters] = t;
+ theTransporters[t->getRemoteNodeId()] = t;
+ theTransporterTypes[t->getRemoteNodeId()] = tt_SHM_TRANSPORTER;
+ performStates[t->getRemoteNodeId()] = DISCONNECTED;
+ nTransporters++;
+ nSHMTransporters++;
+ DBUG_RETURN(true);
+ DBUG_RETURN(false);
+TransporterRegistry::removeTransporter(NodeId nodeId) {
+ DEBUG("Removing transporter from " << localNodeId
+ << " to " << nodeId);
+ if(theTransporters[nodeId] == NULL)
+ return;
+ theTransporters[nodeId]->doDisconnect();
+ const TransporterType type = theTransporterTypes[nodeId];
+ int ind = 0;
+ switch(type){
+ for(; ind < nTCPTransporters; ind++)
+ if(theTCPTransporters[ind]->getRemoteNodeId() == nodeId)
+ break;
+ ind++;
+ for(; ind<nTCPTransporters; ind++)
+ theTCPTransporters[ind-1] = theTCPTransporters[ind];
+ nTCPTransporters --;
+ break;
+ for(; ind < nSCITransporters; ind++)
+ if(theSCITransporters[ind]->getRemoteNodeId() == nodeId)
+ break;
+ ind++;
+ for(; ind<nSCITransporters; ind++)
+ theSCITransporters[ind-1] = theSCITransporters[ind];
+ nSCITransporters --;
+ break;
+ for(; ind < nSHMTransporters; ind++)
+ if(theSHMTransporters[ind]->getRemoteNodeId() == nodeId)
+ break;
+ ind++;
+ for(; ind<nSHMTransporters; ind++)
+ theSHMTransporters[ind-1] = theSHMTransporters[ind];
+ nSHMTransporters --;
+ break;
+ for(; ind < nOSETransporters; ind++)
+ if(theOSETransporters[ind]->getRemoteNodeId() == nodeId)
+ break;
+ ind++;
+ for(; ind<nOSETransporters; ind++)
+ theOSETransporters[ind-1] = theOSETransporters[ind];
+ nOSETransporters --;
+ break;
+ }
+ nTransporters--;
+ // Delete the transporter and remove it from theTransporters array
+ delete theTransporters[nodeId];
+ theTransporters[nodeId] = NULL;
+TransporterRegistry::prepareSend(const SignalHeader * const signalHeader,
+ Uint8 prio,
+ const Uint32 * const signalData,
+ NodeId nodeId,
+ const LinearSectionPtr ptr[3]){
+ Transporter *t = theTransporters[nodeId];
+ if(t != NULL &&
+ (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
+ ((signalHeader->theReceiversBlockNumber == 252) ||
+ (signalHeader->theReceiversBlockNumber == 4002)))) {
+ if(t->isConnected()){
+ Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
+ if(lenBytes <= MAX_MESSAGE_SIZE){
+ Uint32 * insertPtr = t->getWritePtr(lenBytes, prio);
+ if(insertPtr != 0){
+ t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
+ t->updateWritePtr(lenBytes, prio);
+ return SEND_OK;
+ }
+ int sleepTime = 2;
+ /**
+ * @note: on linux/i386 the granularity is 10ms
+ * so sleepTime = 2 generates a 10 ms sleep.
+ */
+ for(int i = 0; i<50; i++){
+ if((nSHMTransporters+nSCITransporters) == 0)
+ NdbSleep_MilliSleep(sleepTime);
+ insertPtr = t->getWritePtr(lenBytes, prio);
+ if(insertPtr != 0){
+ t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
+ t->updateWritePtr(lenBytes, prio);
+ break;
+ }
+ }
+ if(insertPtr != 0){
+ /**
+ * Send buffer full, but resend works
+ */
+ reportError(callbackObj, nodeId, TE_SEND_BUFFER_FULL);
+ return SEND_OK;
+ }
+ WARNING("Signal to " << nodeId << " lost(buffer)");
+ reportError(callbackObj, nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
+ } else {
+ }
+ } else {
+ DEBUG("Signal to " << nodeId << " lost(disconnect) ");
+ }
+ } else {
+ DEBUG("Discarding message to block: "
+ << signalHeader->theReceiversBlockNumber
+ << " node: " << nodeId);
+ if(t == NULL)
+ return SEND_BLOCKED;
+ }
+TransporterRegistry::prepareSend(const SignalHeader * const signalHeader,
+ Uint8 prio,
+ const Uint32 * const signalData,
+ NodeId nodeId,
+ class SectionSegmentPool & thePool,
+ const SegmentedSectionPtr ptr[3]){
+ Transporter *t = theTransporters[nodeId];
+ if(t != NULL &&
+ (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
+ ((signalHeader->theReceiversBlockNumber == 252)||
+ (signalHeader->theReceiversBlockNumber == 4002)))) {
+ if(t->isConnected()){
+ Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
+ if(lenBytes <= MAX_MESSAGE_SIZE){
+ Uint32 * insertPtr = t->getWritePtr(lenBytes, prio);
+ if(insertPtr != 0){
+ t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
+ t->updateWritePtr(lenBytes, prio);
+ return SEND_OK;
+ }
+ /**
+ * @note: on linux/i386 the granularity is 10ms
+ * so sleepTime = 2 generates a 10 ms sleep.
+ */
+ int sleepTime = 2;
+ for(int i = 0; i<50; i++){
+ if((nSHMTransporters+nSCITransporters) == 0)
+ NdbSleep_MilliSleep(sleepTime);
+ insertPtr = t->getWritePtr(lenBytes, prio);
+ if(insertPtr != 0){
+ t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
+ t->updateWritePtr(lenBytes, prio);
+ break;
+ }
+ }
+ if(insertPtr != 0){
+ /**
+ * Send buffer full, but resend works
+ */
+ reportError(callbackObj, nodeId, TE_SEND_BUFFER_FULL);
+ return SEND_OK;
+ }
+ WARNING("Signal to " << nodeId << " lost(buffer)");
+ reportError(callbackObj, nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
+ } else {
+ }
+ } else {
+ DEBUG("Signal to " << nodeId << " lost(disconnect) ");
+ }
+ } else {
+ DEBUG("Discarding message to block: "
+ << signalHeader->theReceiversBlockNumber
+ << " node: " << nodeId);
+ if(t == NULL)
+ return SEND_BLOCKED;
+ }
+TransporterRegistry::external_IO(Uint32 timeOutMillis) {
+ //-----------------------------------------------------------
+ // Most of the time we will send the buffers here and then wait
+ // for new signals. Thus we start by sending without timeout
+ // followed by the receive part where we expect to sleep for
+ // a while.
+ //-----------------------------------------------------------
+ if(pollReceive(timeOutMillis)){
+ performReceive();
+ }
+ performSend();
+TransporterRegistry::pollReceive(Uint32 timeOutMillis){
+ Uint32 retVal = 0;
+ retVal |= poll_OSE(timeOutMillis);
+ retVal |= poll_TCP(0);
+ return retVal;
+ if((nSCITransporters) > 0)
+ {
+ timeOutMillis=0;
+ }
+ if(nSHMTransporters > 0)
+ {
+ Uint32 res = poll_SHM(0);
+ if(res)
+ {
+ retVal |= res;
+ timeOutMillis = 0;
+ }
+ }
+ if(nTCPTransporters > 0 || retVal == 0)
+ {
+ retVal |= poll_TCP(timeOutMillis);
+ }
+ else
+ tcpReadSelectReply = 0;
+ if(nSCITransporters > 0)
+ retVal |= poll_SCI(timeOutMillis);
+ if(nSHMTransporters > 0 && retVal == 0)
+ {
+ int res = poll_SHM(0);
+ retVal |= res;
+ }
+ return retVal;
+TransporterRegistry::poll_SCI(Uint32 timeOutMillis)
+ for (int i=0; i<nSCITransporters; i++) {
+ SCI_Transporter * t = theSCITransporters[i];
+ if (t->isConnected()) {
+ if(t->hasDataToRead())
+ return 1;
+ }
+ }
+ return 0;
+static int g_shm_counter = 0;
+TransporterRegistry::poll_SHM(Uint32 timeOutMillis)
+ for(int j=0; j < 100; j++)
+ {
+ for (int i=0; i<nSHMTransporters; i++) {
+ SHM_Transporter * t = theSHMTransporters[i];
+ if (t->isConnected()) {
+ if(t->hasDataToRead()) {
+ return 1;
+ }
+ }
+ }
+ }
+ return 0;
+TransporterRegistry::poll_OSE(Uint32 timeOutMillis)
+ if(theOSEReceiver != NULL){
+ return theOSEReceiver->doReceive(timeOutMillis);
+ }
+ NdbSleep_MilliSleep(timeOutMillis);
+ return 0;
+TransporterRegistry::poll_TCP(Uint32 timeOutMillis)
+ if (false && nTCPTransporters == 0)
+ {
+ tcpReadSelectReply = 0;
+ return 0;
+ }
+ struct timeval timeout;
+#ifdef NDB_OSE
+ // Return directly if there are no TCP transporters configured
+ if(timeOutMillis <= 1){
+ timeout.tv_sec = 0;
+ timeout.tv_usec = 1025;
+ } else {
+ timeout.tv_sec = timeOutMillis / 1000;
+ timeout.tv_usec = (timeOutMillis % 1000) * 1000;
+ }
+ timeout.tv_sec = timeOutMillis / 1000;
+ timeout.tv_usec = (timeOutMillis % 1000) * 1000;
+ NDB_SOCKET_TYPE maxSocketValue = -1;
+ // Needed for TCP/IP connections
+ // The read- and writeset are used by select
+ FD_ZERO(&tcpReadset);
+ // Prepare for sending and receiving
+ for (int i = 0; i < nTCPTransporters; i++) {
+ TCP_Transporter * t = theTCPTransporters[i];
+ // If the transporter is connected
+ if (t->isConnected()) {
+ const NDB_SOCKET_TYPE socket = t->getSocket();
+ // Find the highest socket value. It will be used by select
+ if (socket > maxSocketValue)
+ maxSocketValue = socket;
+ // Put the connected transporters in the socket read-set
+ FD_SET(socket, &tcpReadset);
+ }
+ }
+ // The highest socket value plus one
+ maxSocketValue++;
+ tcpReadSelectReply = select(maxSocketValue, &tcpReadset, 0, 0, &timeout);
+ if(false && tcpReadSelectReply == -1 && errno == EINTR)
+ ndbout_c("woke-up by signal");
+#ifdef NDB_WIN32
+ if(tcpReadSelectReply == SOCKET_ERROR)
+ {
+ NdbSleep_MilliSleep(timeOutMillis);
+ }
+ return tcpReadSelectReply;
+ if(theOSEReceiver != 0)
+ {
+ while(theOSEReceiver->hasData())
+ {
+ NodeId remoteNodeId;
+ Uint32 * readPtr;
+ Uint32 sz = theOSEReceiver->getReceiveData(&remoteNodeId, &readPtr);
+ Uint32 szUsed = unpack(readPtr,
+ sz,
+ remoteNodeId,
+ ioStates[remoteNodeId]);
+ /**
+ * OSE transporter can handle executions of
+ * half signals
+ */
+ assert(sz == szUsed);
+ theOSEReceiver->updateReceiveDataPtr(szUsed);
+ theOSEReceiver->doReceive(0);
+ // checkJobBuffer();
+ }
+ }
+ if(tcpReadSelectReply > 0)
+ {
+ for (int i=0; i<nTCPTransporters; i++)
+ {
+ checkJobBuffer();
+ TCP_Transporter *t = theTCPTransporters[i];
+ const NodeId nodeId = t->getRemoteNodeId();
+ const NDB_SOCKET_TYPE socket = t->getSocket();
+ if(is_connected(nodeId)){
+ if(t->isConnected() && FD_ISSET(socket, &tcpReadset))
+ {
+ const int receiveSize = t->doReceive();
+ if(receiveSize > 0)
+ {
+ Uint32 * ptr;
+ Uint32 sz = t->getReceiveData(&ptr);
+ Uint32 szUsed = unpack(ptr, sz, nodeId, ioStates[nodeId]);
+ t->updateReceiveDataPtr(szUsed);
+ }
+ }
+ }
+ }
+ }
+ //performReceive
+ //do prepareReceive on the SCI transporters (prepareReceive(t,,,,))
+ for (int i=0; i<nSCITransporters; i++)
+ {
+ checkJobBuffer();
+ SCI_Transporter *t = theSCITransporters[i];
+ const NodeId nodeId = t->getRemoteNodeId();
+ if(is_connected(nodeId))
+ {
+ if(t->isConnected() && t->checkConnected())
+ {
+ Uint32 * readPtr, * eodPtr;
+ t->getReceivePtr(&readPtr, &eodPtr);
+ Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
+ t->updateReceivePtr(newPtr);
+ }
+ }
+ }
+ for (int i=0; i<nSHMTransporters; i++)
+ {
+ checkJobBuffer();
+ SHM_Transporter *t = theSHMTransporters[i];
+ const NodeId nodeId = t->getRemoteNodeId();
+ if(is_connected(nodeId)){
+ if(t->isConnected() && t->checkConnected())
+ {
+ Uint32 * readPtr, * eodPtr;
+ t->getReceivePtr(&readPtr, &eodPtr);
+ Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
+ t->updateReceivePtr(newPtr);
+ }
+ }
+ }
+static int x = 0;
+ int i;
+ sendCounter = 1;
+ for (int i = 0; i < nOSETransporters; i++)
+ {
+ OSE_Transporter *t = theOSETransporters[i];
+ if(is_connected(t->getRemoteNodeId()) &&& (t->isConnected()))
+ {
+ t->doSend();
+ }//if
+ }//for
+#ifdef NDB_OSE
+ {
+ int maxSocketValue = 0;
+ // Needed for TCP/IP connections
+ // The writeset are used by select
+ fd_set writeset;
+ FD_ZERO(&writeset);
+ // Prepare for sending and receiving
+ for (i = 0; i < nTCPTransporters; i++) {
+ TCP_Transporter * t = theTCPTransporters[i];
+ // If the transporter is connected
+ if ((t->hasDataToSend()) && (t->isConnected())) {
+ const int socket = t->getSocket();
+ // Find the highest socket value. It will be used by select
+ if (socket > maxSocketValue) {
+ maxSocketValue = socket;
+ }//if
+ FD_SET(socket, &writeset);
+ }//if
+ }//for
+ // The highest socket value plus one
+ if(maxSocketValue == 0)
+ return;
+ maxSocketValue++;
+ struct timeval timeout = { 0, 1025 };
+ Uint32 tmp = select(maxSocketValue, 0, &writeset, 0, &timeout);
+ if (tmp == 0)
+ {
+ return;
+ }//if
+ for (i = 0; i < nTCPTransporters; i++) {
+ TCP_Transporter *t = theTCPTransporters[i];
+ const NodeId nodeId = t->getRemoteNodeId();
+ const int socket = t->getSocket();
+ if(is_connected(nodeId)){
+ if(t->isConnected() && FD_ISSET(socket, &writeset)) {
+ t->doSend();
+ }//if
+ }//if
+ }//for
+ }
+ for (i = x; i < nTCPTransporters; i++)
+ {
+ TCP_Transporter *t = theTCPTransporters[i];
+ if (t && t->hasDataToSend() && t->isConnected() &&
+ is_connected(t->getRemoteNodeId()))
+ {
+ t->doSend();
+ }
+ }
+ for (i = 0; i < x && i < nTCPTransporters; i++)
+ {
+ TCP_Transporter *t = theTCPTransporters[i];
+ if (t && t->hasDataToSend() && t->isConnected() &&
+ is_connected(t->getRemoteNodeId()))
+ {
+ t->doSend();
+ }
+ }
+ x++;
+ if (x == nTCPTransporters) x = 0;
+ //scroll through the SCI transporters,
+ // get each transporter, check if connected, send data
+ for (i=0; i<nSCITransporters; i++) {
+ SCI_Transporter *t = theSCITransporters[i];
+ const NodeId nodeId = t->getRemoteNodeId();
+ if(is_connected(nodeId))
+ {
+ if(t->isConnected() && t->hasDataToSend()) {
+ t->doSend();
+ } //if
+ } //if
+ }
+ for (i=0; i<nSHMTransporters; i++)
+ {
+ SHM_Transporter *t = theSHMTransporters[i];
+ const NodeId nodeId = t->getRemoteNodeId();
+ if(is_connected(nodeId))
+ {
+ if(t->isConnected())
+ {
+ t->doSend();
+ }
+ }
+ }
+TransporterRegistry::forceSendCheck(int sendLimit){
+ int tSendCounter = sendCounter;
+ sendCounter = tSendCounter + 1;
+ if (tSendCounter >= sendLimit) {
+ performSend();
+ sendCounter = 1;
+ return 1;
+ }//if
+ return 0;
+ ndbout << "-- TransporterRegistry -- " << endl << endl
+ << "Transporters = " << nTransporters << endl;
+ for(int i = 0; i<maxTransporters; i++)
+ if(theTransporters[i] != NULL){
+ const NodeId remoteNodeId = theTransporters[i]->getRemoteNodeId();
+ ndbout << "Transporter: " << remoteNodeId
+ << " PerformState: " << performStates[remoteNodeId]
+ << " IOState: " << ioStates[remoteNodeId] << endl;
+ }
+TransporterRegistry::ioState(NodeId nodeId) {
+ return ioStates[nodeId];
+TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
+ DEBUG("TransporterRegistry::setIOState("
+ << nodeId << ", " << state << ")");
+ ioStates[nodeId] = state;
+static void *
+run_start_clients_C(void * me)
+ ((TransporterRegistry*) me)->start_clients_thread();
+ return 0;
+// Run by kernel thread
+TransporterRegistry::do_connect(NodeId node_id)
+ PerformState &curr_state = performStates[node_id];
+ switch(curr_state){
+ break;
+ return;
+ return;
+ break;
+ }
+ curr_state= CONNECTING;
+TransporterRegistry::do_disconnect(NodeId node_id)
+ PerformState &curr_state = performStates[node_id];
+ switch(curr_state){
+ return;
+ break;
+ break;
+ return;
+ }
+ curr_state= DISCONNECTING;
+TransporterRegistry::report_connect(NodeId node_id)
+ performStates[node_id] = CONNECTED;
+ reportConnect(callbackObj, node_id);
+TransporterRegistry::report_disconnect(NodeId node_id, int errnum)
+ performStates[node_id] = DISCONNECTED;
+ reportDisconnect(callbackObj, node_id, errnum);
+ for (int i= 0, n= 0; n < nTransporters; i++){
+ Transporter * t = theTransporters[i];
+ if (!t)
+ continue;
+ n++;
+ const NodeId nodeId = t->getRemoteNodeId();
+ switch(performStates[nodeId]){
+ break;
+ if(t->isConnected())
+ report_connect(nodeId);
+ break;
+ if(!t->isConnected())
+ report_disconnect(nodeId, 0);
+ break;
+ }
+ }
+// run as own thread
+ DBUG_ENTER("TransporterRegistry::start_clients_thread");
+ while (m_run_start_clients_thread) {
+ NdbSleep_MilliSleep(100);
+ for (int i= 0, n= 0; n < nTransporters && m_run_start_clients_thread; i++){
+ Transporter * t = theTransporters[i];
+ if (!t)
+ continue;
+ n++;
+ const NodeId nodeId = t->getRemoteNodeId();
+ switch(performStates[nodeId]){
+ if(!t->isConnected() && !t->isServer) {
+ bool connected= false;
+ /**
+ * First, we try to connect (if we have a port number).
+ */
+ if (t->get_s_port())
+ connected= t->connect_client();
+ /**
+ * If dynamic, get the port for connecting from the management server
+ */
+ if( !connected && t->get_s_port() <= 0) { // Port is dynamic
+ int server_port= 0;
+ struct ndb_mgm_reply mgm_reply;
+ if(!ndb_mgm_is_connected(m_mgm_handle))
+ ndb_mgm_connect(m_mgm_handle, 0, 0, 0);
+ if(ndb_mgm_is_connected(m_mgm_handle))
+ {
+ int res=
+ ndb_mgm_get_connection_int_parameter(m_mgm_handle,
+ t->getRemoteNodeId(),
+ t->getLocalNodeId(),
+ &server_port,
+ &mgm_reply);
+ DBUG_PRINT("info",("Got dynamic port %d for %d -> %d (ret: %d)",
+ server_port,t->getRemoteNodeId(),
+ t->getLocalNodeId(),res));
+ if( res >= 0 )
+ {
+ /**
+ * Server_port == 0 just means that that a mgmt server
+ * has not received a new port yet. Keep the old.
+ */
+ if (server_port)
+ t->set_s_port(server_port);
+ }
+ else
+ {
+ ndbout_c("Failed to get dynamic port to connect to: %d", res);
+ ndb_mgm_disconnect(m_mgm_handle);
+ }
+ }
+ /** else
+ * We will not be able to get a new port unless
+ * the m_mgm_handle is connected. Note that not
+ * being connected is an ok state, just continue
+ * until it is able to connect. Continue using the
+ * old port until we can connect again and get a
+ * new port.
+ */
+ }
+ }
+ break;
+ if(t->isConnected())
+ t->doDisconnect();
+ break;
+ default:
+ break;
+ }
+ }
+ }
+ m_run_start_clients_thread= true;
+ m_start_clients_thread= NdbThread_Create(run_start_clients_C,
+ (void**)this,
+ 32768,
+ "ndb_start_clients",
+ if (m_start_clients_thread == 0) {
+ m_run_start_clients_thread= false;
+ return false;
+ }
+ return true;
+ if (m_start_clients_thread) {
+ m_run_start_clients_thread= false;
+ void* status;
+ NdbThread_WaitFor(m_start_clients_thread, &status);
+ NdbThread_Destroy(&m_start_clients_thread);
+ }
+ return true;
+TransporterRegistry::add_transporter_interface(NodeId remoteNodeId,
+ const char *interf,
+ int s_port)
+ DBUG_ENTER("TransporterRegistry::add_transporter_interface");
+ DBUG_PRINT("enter",("interface=%s, s_port= %d", interf, s_port));
+ if (interf && strlen(interf) == 0)
+ interf= 0;
+ for (unsigned i= 0; i < m_transporter_interface.size(); i++)
+ {
+ Transporter_interface &tmp= m_transporter_interface[i];
+ if (s_port != tmp.m_s_service_port || tmp.m_s_service_port==0)
+ continue;
+ if (interf != 0 && tmp.m_interface != 0 &&
+ strcmp(interf, tmp.m_interface) == 0)
+ {
+ DBUG_VOID_RETURN; // found match, no need to insert
+ }
+ if (interf == 0 && tmp.m_interface == 0)
+ {
+ DBUG_VOID_RETURN; // found match, no need to insert
+ }
+ }
+ Transporter_interface t;
+ t.m_remote_nodeId= remoteNodeId;
+ t.m_s_service_port= s_port;
+ t.m_interface= interf;
+ m_transporter_interface.push_back(t);
+ DBUG_PRINT("exit",("interface and port added"));
+TransporterRegistry::start_service(SocketServer& socket_server)
+ struct ndb_mgm_reply mgm_reply;
+ DBUG_ENTER("TransporterRegistry::start_service");
+ if (m_transporter_interface.size() > 0 && !nodeIdSpecified)
+ {
+ ndbout_c("TransporterRegistry::startReceiving: localNodeId not specified");
+ DBUG_RETURN(false);
+ }
+ for (unsigned i= 0; i < m_transporter_interface.size(); i++)
+ {
+ Transporter_interface &t= m_transporter_interface[i];
+ unsigned short port= (unsigned short)t.m_s_service_port;
+ if(t.m_s_service_port<0)
+ port= -t.m_s_service_port; // is a dynamic port
+ TransporterService *transporter_service =
+ new TransporterService(new SocketAuthSimple("ndbd", "ndbd passwd"));
+ if(!socket_server.setup(transporter_service,
+ &port, t.m_interface))
+ {
+ DBUG_PRINT("info", ("Trying new port"));
+ port= 0;
+ if(t.m_s_service_port>0
+ || !socket_server.setup(transporter_service,
+ &port, t.m_interface))
+ {
+ /*
+ * If it wasn't a dynamically allocated port, or
+ * our attempts at getting a new dynamic port failed
+ */
+ ndbout_c("Unable to setup transporter service port: %s:%d!\n"
+ "Please check if the port is already used,\n"
+ "(perhaps the node is already running)",
+ t.m_interface ? t.m_interface : "*", t.m_s_service_port);
+ delete transporter_service;
+ DBUG_RETURN(false);
+ }
+ }
+ t.m_s_service_port= (t.m_s_service_port<=0)?-port:port; // -`ve if dynamic
+ DBUG_PRINT("info", ("t.m_s_service_port = %d",t.m_s_service_port));
+ transporter_service->setTransporterRegistry(this);
+ }
+ DBUG_RETURN(true);
+shm_sig_handler(int signo)
+ g_shm_counter++;
+ DBUG_ENTER("TransporterRegistry::startReceiving");
+ if(theOSEReceiver != NULL){
+ theOSEReceiver->createPhantom();
+ }
+#ifdef NDB_OSE
+ theOSEJunkSocketRecv = socket(AF_INET, SOCK_STREAM, 0);
+#if defined NDB_OSE || defined NDB_SOFTOSE
+ theReceiverPid = current_process();
+ for(int i = 0; i<nTCPTransporters; i++)
+ theTCPTransporters[i]->theReceiverPid = theReceiverPid;
+ m_shm_own_pid = getpid();
+ if (g_ndb_shm_signum)
+ {
+ DBUG_PRINT("info",("Install signal handler for signum %d",
+ g_ndb_shm_signum));
+ struct sigaction sa;
+ sigemptyset(&sa.sa_mask);
+ sigaddset(&sa.sa_mask, g_ndb_shm_signum);
+ pthread_sigmask(SIG_UNBLOCK, &sa.sa_mask, 0);
+ sa.sa_handler = shm_sig_handler;
+ sigemptyset(&sa.sa_mask);
+ sa.sa_flags = 0;
+ int ret;
+ while((ret = sigaction(g_ndb_shm_signum, &sa, 0)) == -1 && errno == EINTR);
+ if(ret != 0)
+ {
+ DBUG_PRINT("error",("Install failed"));
+ g_eventLogger.error("Failed to install signal handler for"
+ " SHM transporter errno: %d (%s)", errno,
+ strerror(errno));
+ }
+ }
+ if(theOSEReceiver != NULL){
+ theOSEReceiver->destroyPhantom();
+ }
+ /**
+ * Disconnect all transporters, this includes detach from remote node
+ * and since that must be done from the same process that called attach
+ * it's done here in the receive thread
+ */
+ disconnectAll();
+#if defined NDB_OSE || defined NDB_SOFTOSE
+ if(theOSEJunkSocketRecv > 0)
+ close(theOSEJunkSocketRecv);
+ theOSEJunkSocketRecv = -1;
+#if defined NDB_OSE || defined NDB_SOFTOSE
+ theOSEJunkSocketSend = socket(AF_INET, SOCK_STREAM, 0);
+#if defined NDB_OSE || defined NDB_SOFTOSE
+ if(theOSEJunkSocketSend > 0)
+ close(theOSEJunkSocketSend);
+ theOSEJunkSocketSend = -1;
+NdbOut & operator <<(NdbOut & out, SignalHeader & sh){
+ out << "-- Signal Header --" << endl;
+ out << "theLength: " << sh.theLength << endl;
+ out << "gsn: " << sh.theVerId_signalNumber << endl;
+ out << "recBlockNo: " << sh.theReceiversBlockNumber << endl;
+ out << "sendBlockRef: " << sh.theSendersBlockRef << endl;
+ out << "sendersSig: " << sh.theSendersSignalId << endl;
+ out << "theSignalId: " << sh.theSignalId << endl;
+ out << "trace: " << (int)sh.theTrace << endl;
+ return out;
+TransporterRegistry::get_transporter(NodeId nodeId) {
+ return theTransporters[nodeId];
+bool TransporterRegistry::connect_client(NdbMgmHandle *h)
+ DBUG_ENTER("TransporterRegistry::connect_client(NdbMgmHandle)");
+ Uint32 mgm_nodeid= ndb_mgm_get_mgmd_nodeid(*h);
+ if(!mgm_nodeid)
+ {
+ ndbout_c("%s: %d", __FILE__, __LINE__);
+ return false;
+ }
+ Transporter * t = theTransporters[mgm_nodeid];
+ if (!t)
+ {
+ ndbout_c("%s: %d", __FILE__, __LINE__);
+ return false;
+ }
+ DBUG_RETURN(t->connect_client(connect_ndb_mgmd(h)));
+ * Given a connected NdbMgmHandle, turns it into a transporter
+ * and returns the socket.
+ */
+NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle *h)
+ struct ndb_mgm_reply mgm_reply;
+ if ( h==NULL || *h == NULL )
+ {
+ ndbout_c("%s: %d", __FILE__, __LINE__);
+ }
+ for(unsigned int i=0;i < m_transporter_interface.size();i++)
+ if (m_transporter_interface[i].m_s_service_port < 0
+ && ndb_mgm_set_connection_int_parameter(*h,
+ get_localNodeId(),
+ m_transporter_interface[i].m_remote_nodeId,
+ m_transporter_interface[i].m_s_service_port,
+ &mgm_reply) < 0)
+ {
+ ndbout_c("Error: %s: %d",
+ ndb_mgm_get_latest_error_desc(*h),
+ ndb_mgm_get_latest_error(*h));
+ ndbout_c("%s: %d", __FILE__, __LINE__);
+ ndb_mgm_destroy_handle(h);
+ }
+ /**
+ * convert_to_transporter also disposes of the handle (i.e. we don't leak
+ * memory here.
+ */
+ NDB_SOCKET_TYPE sockfd= ndb_mgm_convert_to_transporter(h);
+ if ( sockfd == NDB_INVALID_SOCKET)
+ {
+ ndbout_c("Error: %s: %d",
+ ndb_mgm_get_latest_error_desc(*h),
+ ndb_mgm_get_latest_error(*h));
+ ndbout_c("%s: %d", __FILE__, __LINE__);
+ ndb_mgm_destroy_handle(h);
+ }
+ return sockfd;
+ * Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter
+ * and returns the socket.
+ */
+NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(SocketClient *sc)
+ NdbMgmHandle h= ndb_mgm_create_handle();
+ if ( h == NULL )
+ {
+ }
+ /**
+ * Set connectstring
+ */
+ {
+ BaseString cs;
+ cs.assfmt("%s:%u",sc->get_server_name(),sc->get_port());
+ ndb_mgm_set_connectstring(h, cs.c_str());
+ }
+ if(ndb_mgm_connect(h, 0, 0, 0)<0)
+ {
+ ndb_mgm_destroy_handle(&h);
+ }
+ return connect_ndb_mgmd(&h);
+template class Vector<TransporterRegistry::Transporter_interface>;