summaryrefslogtreecommitdiff
path: root/storage/ndb/src/common/transporter/basictest/basicTransporterTest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/ndb/src/common/transporter/basictest/basicTransporterTest.cpp')
-rw-r--r--storage/ndb/src/common/transporter/basictest/basicTransporterTest.cpp535
1 files changed, 535 insertions, 0 deletions
diff --git a/storage/ndb/src/common/transporter/basictest/basicTransporterTest.cpp b/storage/ndb/src/common/transporter/basictest/basicTransporterTest.cpp
new file mode 100644
index 00000000000..c0a437c4907
--- /dev/null
+++ b/storage/ndb/src/common/transporter/basictest/basicTransporterTest.cpp
@@ -0,0 +1,535 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <ndb_global.h>
+
+#include "TransporterRegistry.hpp"
+#include "TransporterDefinitions.hpp"
+#include "TransporterCallback.hpp"
+#include <RefConvert.hpp>
+
+#include <NdbTick.h>
+#include <NdbMain.h>
+#include <NdbOut.hpp>
+#include <NdbSleep.h>
+
+int basePortTCP = 17000;
+
+SCI_TransporterConfiguration sciTemplate = {
+ 8000,
+ // Packet size
+ 2500000, // Buffer size
+ 2, // number of adapters
+ 1, // remote node id SCI
+ 2, // Remote node Id SCI
+ 0, // local ndb node id (server)
+ 0, // remote ndb node id (client)
+ 0, // byteOrder;
+ false, // compression;
+ true, // checksum;
+ true // signalId;
+};
+
+TCP_TransporterConfiguration tcpTemplate = {
+ 17000, // port;
+ "", // remoteHostName;
+ "", // localhostname
+ 2, // remoteNodeId;
+ 1, // localNodeId;
+ 10000, // sendBufferSize - Size of SendBuffer of priority B
+ 10000, // maxReceiveSize - Maximum no of bytes to receive
+ 0, // byteOrder;
+ false, // compression;
+ true, // checksum;
+ true // signalId;
+};
+
+OSE_TransporterConfiguration oseTemplate = {
+ "", // remoteHostName;
+ "", // localHostName;
+ 0, // remoteNodeId;
+ 0, // localNodeId;
+ false, // compression;
+ true, // checksum;
+ true, // signalId;
+ 0, // byteOrder;
+
+ 2000, // prioASignalSize;
+ 1000, // prioBSignalSize;
+ 10
+};
+
+SHM_TransporterConfiguration shmTemplate = {
+ 0, //remoteNodeId
+ 0, //localNodeId;
+ false, //compression
+ true, //checksum;
+ true, //signalId;
+ 0, //byteOrder;
+ 123, //shmKey;
+ 2500000 //shmSize;
+};
+
+TransporterRegistry *tReg = 0;
+
+#ifndef OSE_DELTA
+#include <signal.h>
+#endif
+
+extern "C"
+void
+signalHandler(int signo){
+#ifndef OSE_DELTA
+ ::signal(13, signalHandler);
+#endif
+ char buf[255];
+ sprintf(buf,"Signal: %d\n", signo);
+ ndbout << buf << endl;
+}
+
+void
+usage(const char * progName){
+ ndbout << "Usage: " << progName << " <type> localNodeId localHostName"
+ << " remoteHostName1 remoteHostName2" << endl;
+ ndbout << " type = shm tcp ose sci" << endl;
+ ndbout << " localNodeId - 1 to 3" << endl;
+}
+
+typedef void (* CreateTransporterFunc)(void * conf,
+ NodeId localNodeId,
+ NodeId remoteNodeId,
+ const char * localHostName,
+ const char * remoteHostName);
+
+void createOSETransporter(void *, NodeId, NodeId, const char *, const char *);
+void createSCITransporter(void *, NodeId, NodeId, const char *, const char *);
+void createTCPTransporter(void *, NodeId, NodeId, const char *, const char *);
+void createSHMTransporter(void *, NodeId, NodeId, const char *, const char *);
+
+int signalReceived[4];
+
+int
+main(int argc, const char **argv){
+
+ signalHandler(0);
+
+ for(int i = 0; i<4; i++)
+ signalReceived[i] = 0;
+
+ if(argc < 5){
+ usage(argv[0]);
+ return 0;
+ }
+
+ Uint32 noOfConnections = 0;
+ const char * progName = argv[0];
+ const char * type = argv[1];
+ const NodeId localNodeId = atoi(argv[2]);
+ const char * localHostName = argv[3];
+ const char * remoteHost1 = argv[4];
+ const char * remoteHost2 = NULL;
+
+ if(argc == 5)
+ noOfConnections = 1;
+ else {
+ noOfConnections = 2;
+ remoteHost2 = argv[5];
+ }
+
+ if(localNodeId < 1 || localNodeId > 3){
+ ndbout << "localNodeId = " << localNodeId << endl << endl;
+ usage(progName);
+ return 0;
+ }
+
+ ndbout << "-----------------" << endl;
+ ndbout << "localNodeId: " << localNodeId << endl;
+ ndbout << "localHostName: " << localHostName << endl;
+ ndbout << "remoteHost1 (node " << (localNodeId == 1?2:1) << "): "
+ << remoteHost1 << endl;
+ if(noOfConnections == 2){
+ ndbout << "remoteHost2 (node " << (localNodeId == 3?2:3) << "): "
+ << remoteHost2 << endl;
+ }
+ ndbout << "-----------------" << endl;
+
+ void * confTemplate = 0;
+ CreateTransporterFunc func = 0;
+
+ if(strcasecmp(type, "tcp") == 0){
+ func = createTCPTransporter;
+ confTemplate = &tcpTemplate;
+ } else if(strcasecmp(type, "ose") == 0){
+ func = createOSETransporter;
+ confTemplate = &oseTemplate;
+ } else if(strcasecmp(type, "sci") == 0){
+ func = createSCITransporter;
+ confTemplate = &sciTemplate;
+ } else if(strcasecmp(type, "shm") == 0){
+ func = createSHMTransporter;
+ confTemplate = &shmTemplate;
+ } else {
+ ndbout << "Unsupported transporter type" << endl;
+ return 0;
+ }
+
+ ndbout << "Creating transporter registry" << endl;
+ tReg = new TransporterRegistry;
+ tReg->init(localNodeId);
+
+ switch(localNodeId){
+ case 1:
+ (* func)(confTemplate, 1, 2, localHostName, remoteHost1);
+ if(noOfConnections == 2)
+ (* func)(confTemplate, 1, 3, localHostName, remoteHost2);
+ break;
+ case 2:
+ (* func)(confTemplate, 2, 1, localHostName, remoteHost1);
+ if(noOfConnections == 2)
+ (* func)(confTemplate, 2, 3, localHostName, remoteHost2);
+ break;
+ case 3:
+ (* func)(confTemplate, 3, 1, localHostName, remoteHost1);
+ if(noOfConnections == 2)
+ (* func)(confTemplate, 3, 2, localHostName, remoteHost2);
+ break;
+ }
+
+ ndbout << "Doing startSending/startReceiving" << endl;
+ tReg->startSending();
+ tReg->startReceiving();
+
+ ndbout << "Connecting" << endl;
+ tReg->setPerformState(PerformConnect);
+ tReg->checkConnections();
+
+ unsigned sum = 0;
+ do {
+ sum = 0;
+ for(int i = 0; i<4; i++)
+ sum += signalReceived[i];
+
+ tReg->checkConnections();
+
+ tReg->external_IO(500);
+ NdbSleep_MilliSleep(500);
+
+ ndbout << "In main loop" << endl;
+ } while(sum != 2*noOfConnections);
+
+ ndbout << "Doing setPerformState(Disconnect)" << endl;
+ tReg->setPerformState(PerformDisconnect);
+
+ ndbout << "Doing checkConnections()" << endl;
+ tReg->checkConnections();
+
+ ndbout << "Sleeping 3 secs" << endl;
+ NdbSleep_SecSleep(3);
+
+ ndbout << "Deleting transporter registry" << endl;
+ delete tReg; tReg = 0;
+
+ return 0;
+}
+
+void
+checkData(SignalHeader * const header, Uint8 prio, Uint32 * const theData,
+ LinearSectionPtr ptr[3]){
+ Uint32 expectedLength = 0;
+ if(prio == 0)
+ expectedLength = 17;
+ else
+ expectedLength = 19;
+
+ if(header->theLength != expectedLength){
+ ndbout << "Unexpected signal length: " << header->theLength
+ << " expected: " << expectedLength << endl;
+ abort();
+ }
+
+ if(header->theVerId_signalNumber != expectedLength + 1)
+ abort();
+
+ if(header->theReceiversBlockNumber != expectedLength + 2)
+ abort();
+
+ if(refToBlock(header->theSendersBlockRef) != expectedLength + 3)
+ abort();
+
+ if(header->theSendersSignalId != expectedLength + 5)
+ abort();
+
+ if(header->theTrace != expectedLength + 6)
+ abort();
+
+ if(header->m_noOfSections != (prio == 0 ? 0 : 1))
+ abort();
+
+ if(header->m_fragmentInfo != (prio + 1))
+ abort();
+
+ Uint32 dataWordStart = header->theLength ;
+ for(unsigned i = 0; i<header->theLength; i++){
+ if(theData[i] != i){ //dataWordStart){
+ ndbout << "data corrupt!\n" << endl;
+ abort();
+ }
+ dataWordStart ^= (~i*i);
+ }
+
+ if(prio != 0){
+ ndbout_c("Found section");
+ if(ptr[0].sz != header->theLength)
+ abort();
+
+ if(memcmp(ptr[0].p, theData, (ptr[0].sz * 4)) != 0)
+ abort();
+ }
+}
+
+void
+sendSignalTo(NodeId nodeId, int prio){
+ SignalHeader sh;
+ sh.theLength = (prio == 0 ? 17 : 19);
+ sh.theVerId_signalNumber = sh.theLength + 1;
+ sh.theReceiversBlockNumber = sh.theLength + 2;
+ sh.theSendersBlockRef = sh.theLength + 3;
+ sh.theSendersSignalId = sh.theLength + 4;
+ sh.theSignalId = sh.theLength + 5;
+ sh.theTrace = sh.theLength + 6;
+ sh.m_noOfSections = (prio == 0 ? 0 : 1);
+ sh.m_fragmentInfo = prio + 1;
+
+ Uint32 theData[25];
+
+ Uint32 dataWordStart = sh.theLength;
+ for(unsigned i = 0; i<sh.theLength; i++){
+ theData[i] = i;
+ dataWordStart ^= (~i*i);
+ }
+ ndbout << "Sending prio " << (int)prio << " signal to node: "
+ << nodeId
+ << " gsn = " << sh.theVerId_signalNumber << endl;
+
+ LinearSectionPtr ptr[3];
+ ptr[0].p = &theData[0];
+ ptr[0].sz = sh.theLength;
+
+ SendStatus s = tReg->prepareSend(&sh, prio, theData, nodeId, ptr);
+ if(s != SEND_OK){
+ ndbout << "Send was not ok. Send was: " << s << endl;
+ }
+}
+
+void
+execute(void* callbackObj,
+ SignalHeader * const header, Uint8 prio, Uint32 * const theData,
+ LinearSectionPtr ptr[3]){
+ const NodeId nodeId = refToNode(header->theSendersBlockRef);
+
+ ndbout << "Recieved prio " << (int)prio << " signal from node: "
+ << nodeId
+ << " gsn = " << header->theVerId_signalNumber << endl;
+ checkData(header, prio, theData, ptr);
+ ndbout << " Data is ok!\n" << endl;
+
+ signalReceived[nodeId]++;
+
+ if(prio == 0)
+ sendSignalTo(nodeId, 1);
+ else
+ tReg->setPerformState(nodeId, PerformDisconnect);
+}
+
+void
+copy(Uint32 * & insertPtr,
+ class SectionSegmentPool & thePool, const SegmentedSectionPtr & _ptr){
+ abort();
+}
+
+void
+reportError(void* callbackObj, NodeId nodeId, TransporterError errorCode){
+ char buf[255];
+ sprintf(buf, "reportError (%d, %x)", nodeId, errorCode);
+ ndbout << buf << endl;
+ if(errorCode & 0x8000){
+ tReg->setPerformState(nodeId, PerformDisconnect);
+ abort();
+ }
+}
+
+/**
+ * Report average send theLength in bytes (4096 last sends)
+ */
+void
+reportSendLen(void* callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes){
+ char buf[255];
+ sprintf(buf, "reportSendLen(%d, %d)", nodeId, (Uint32)(bytes/count));
+ ndbout << buf << endl;
+}
+
+/**
+ * Report average receive theLength in bytes (4096 last receives)
+ */
+void
+reportReceiveLen(void* callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes){
+ char buf[255];
+ sprintf(buf, "reportReceiveLen(%d, %d)", nodeId, (Uint32)(bytes/count));
+ ndbout << buf << endl;
+}
+
+/**
+ * Report connection established
+ */
+void
+reportConnect(void* callbackObj, NodeId nodeId){
+ char buf[255];
+ sprintf(buf, "reportConnect(%d)", nodeId);
+ ndbout << buf << endl;
+ tReg->setPerformState(nodeId, PerformIO);
+
+ sendSignalTo(nodeId, 0);
+}
+
+/**
+ * Report connection broken
+ */
+void
+reportDisconnect(void* callbackObj, NodeId nodeId, Uint32 errNo){
+ char buf[255];
+ sprintf(buf, "reportDisconnect(%d)", nodeId);
+ ndbout << buf << endl;
+ if(signalReceived[nodeId] < 2)
+ tReg->setPerformState(nodeId, PerformConnect);
+}
+
+int
+checkJobBuffer() {
+ /**
+ * Check to see if jobbbuffers are starting to get full
+ * and if so call doJob
+ */
+ return 0;
+}
+
+void
+createOSETransporter(void * _conf,
+ NodeId localNodeId,
+ NodeId remoteNodeId,
+ const char * localHostName,
+ const char * remoteHostName){
+ ndbout << "Creating OSE transporter from node "
+ << localNodeId << "(" << localHostName << ") to "
+ << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
+
+ OSE_TransporterConfiguration * conf = (OSE_TransporterConfiguration*)_conf;
+
+ conf->localNodeId = localNodeId;
+ conf->localHostName = localHostName;
+ conf->remoteNodeId = remoteNodeId;
+ conf->remoteHostName = remoteHostName;
+ bool res = tReg->createTransporter(conf);
+ if(res)
+ ndbout << "... -- Success " << endl;
+ else
+ ndbout << "... -- Failure " << endl;
+}
+
+void
+createTCPTransporter(void * _conf,
+ NodeId localNodeId,
+ NodeId remoteNodeId,
+ const char * localHostName,
+ const char * remoteHostName){
+ ndbout << "Creating TCP transporter from node "
+ << localNodeId << "(" << localHostName << ") to "
+ << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
+
+ TCP_TransporterConfiguration * conf = (TCP_TransporterConfiguration*)_conf;
+
+ int port;
+ if(localNodeId == 1 && remoteNodeId == 2) port = basePortTCP + 0;
+ if(localNodeId == 1 && remoteNodeId == 3) port = basePortTCP + 1;
+ if(localNodeId == 2 && remoteNodeId == 1) port = basePortTCP + 0;
+ if(localNodeId == 2 && remoteNodeId == 3) port = basePortTCP + 2;
+ if(localNodeId == 3 && remoteNodeId == 1) port = basePortTCP + 1;
+ if(localNodeId == 3 && remoteNodeId == 2) port = basePortTCP + 2;
+
+ conf->localNodeId = localNodeId;
+ conf->localHostName = localHostName;
+ conf->remoteNodeId = remoteNodeId;
+ conf->remoteHostName = remoteHostName;
+ conf->port = port;
+ bool res = tReg->createTransporter(conf);
+ if(res)
+ ndbout << "... -- Success " << endl;
+ else
+ ndbout << "... -- Failure " << endl;
+}
+
+void
+createSCITransporter(void * _conf,
+ NodeId localNodeId,
+ NodeId remoteNodeId,
+ const char * localHostName,
+ const char * remoteHostName){
+
+
+ ndbout << "Creating SCI transporter from node "
+ << localNodeId << "(" << localHostName << ") to "
+ << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
+
+
+ SCI_TransporterConfiguration * conf = (SCI_TransporterConfiguration*)_conf;
+
+ conf->remoteSciNodeId0= (Uint16)atoi(localHostName);
+ conf->remoteSciNodeId1= (Uint16)atoi(remoteHostName);
+
+
+ conf->localNodeId = localNodeId;
+ conf->remoteNodeId = remoteNodeId;
+
+ bool res = tReg->createTransporter(conf);
+ if(res)
+ ndbout << "... -- Success " << endl;
+ else
+ ndbout << "... -- Failure " << endl;
+}
+
+void
+createSHMTransporter(void * _conf,
+ NodeId localNodeId,
+ NodeId remoteNodeId,
+ const char * localHostName,
+ const char * remoteHostName){
+
+
+ ndbout << "Creating SHM transporter from node "
+ << localNodeId << "(" << localHostName << ") to "
+ << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
+
+
+ SHM_TransporterConfiguration * conf = (SHM_TransporterConfiguration*)_conf;
+
+ conf->localNodeId = localNodeId;
+ conf->remoteNodeId = remoteNodeId;
+
+ bool res = tReg->createTransporter(conf);
+ if(res)
+ ndbout << "... -- Success " << endl;
+ else
+ ndbout << "... -- Failure " << endl;
+}