summaryrefslogtreecommitdiff
path: root/storage/ndb/src/ndbapi/TransporterFacade.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/ndb/src/ndbapi/TransporterFacade.cpp')
-rw-r--r--storage/ndb/src/ndbapi/TransporterFacade.cpp1108
1 files changed, 1108 insertions, 0 deletions
diff --git a/storage/ndb/src/ndbapi/TransporterFacade.cpp b/storage/ndb/src/ndbapi/TransporterFacade.cpp
new file mode 100644
index 00000000000..7f1e68a42d3
--- /dev/null
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp
@@ -0,0 +1,1108 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <ndb_global.h>
+#include <my_pthread.h>
+#include <ndb_limits.h>
+#include "TransporterFacade.hpp"
+#include "ClusterMgr.hpp"
+#include <IPCConfig.hpp>
+#include <TransporterCallback.hpp>
+#include <TransporterRegistry.hpp>
+#include "NdbApiSignal.hpp"
+#include <NdbOut.hpp>
+#include <NdbEnv.h>
+#include <NdbSleep.h>
+
+#include "API.hpp"
+#include <ConfigRetriever.hpp>
+#include <mgmapi_config_parameters.h>
+#include <mgmapi_configuration.hpp>
+#include <NdbConfig.h>
+#include <ndb_version.h>
+#include <SignalLoggerManager.hpp>
+#include <kernel/ndb_limits.h>
+
+//#define REPORT_TRANSPORTER
+//#define API_TRACE;
+
+static int numberToIndex(int number)
+{
+ return number - MIN_API_BLOCK_NO;
+}
+
+static int indexToNumber(int index)
+{
+ return index + MIN_API_BLOCK_NO;
+}
+
+#if defined DEBUG_TRANSPORTER
+#define TRP_DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl;
+#else
+#define TRP_DEBUG(t)
+#endif
+
+TransporterFacade* TransporterFacade::theFacadeInstance = NULL;
+
+/*****************************************************************************
+ * Call back functions
+ *****************************************************************************/
+
+void
+reportError(void * callbackObj, NodeId nodeId, TransporterError errorCode){
+#ifdef REPORT_TRANSPORTER
+ ndbout_c("REPORT_TRANSP: reportError (nodeId=%d, errorCode=%d)",
+ (int)nodeId, (int)errorCode);
+#endif
+ if(errorCode & 0x8000) {
+ ndbout_c("reportError (%d, %d)\n", (int)nodeId, (int)errorCode);
+ ((TransporterFacade*)(callbackObj))->doDisconnect(nodeId);
+ }
+}
+
+/**
+ * Report average send length in bytes (4096 last sends)
+ */
+void
+reportSendLen(void * callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes){
+#ifdef REPORT_TRANSPORTER
+ ndbout_c("REPORT_TRANSP: reportSendLen (nodeId=%d, bytes/count=%d)",
+ (int)nodeId, (Uint32)(bytes/count));
+#endif
+ (void)nodeId;
+ (void)count;
+ (void)bytes;
+}
+
+/**
+ * Report average receive length in bytes (4096 last receives)
+ */
+void
+reportReceiveLen(void * callbackObj,
+ NodeId nodeId, Uint32 count, Uint64 bytes){
+#ifdef REPORT_TRANSPORTER
+ ndbout_c("REPORT_TRANSP: reportReceiveLen (nodeId=%d, bytes/count=%d)",
+ (int)nodeId, (Uint32)(bytes/count));
+#endif
+ (void)nodeId;
+ (void)count;
+ (void)bytes;
+}
+
+/**
+ * Report connection established
+ */
+void
+reportConnect(void * callbackObj, NodeId nodeId){
+#ifdef REPORT_TRANSPORTER
+ ndbout_c("REPORT_TRANSP: API reportConnect (nodeId=%d)", (int)nodeId);
+#endif
+ ((TransporterFacade*)(callbackObj))->reportConnected(nodeId);
+ // TransporterFacade::instance()->reportConnected(nodeId);
+}
+
+/**
+ * Report connection broken
+ */
+void
+reportDisconnect(void * callbackObj, NodeId nodeId, Uint32 error){
+#ifdef REPORT_TRANSPORTER
+ ndbout_c("REPORT_TRANSP: API reportDisconnect (nodeId=%d)", (int)nodeId);
+#endif
+ ((TransporterFacade*)(callbackObj))->reportDisconnected(nodeId);
+ //TransporterFacade::instance()->reportDisconnected(nodeId);
+}
+
+
+/****************************************************************************
+ *
+ *****************************************************************************/
+
+/**
+ * Report connection broken
+ */
+int checkJobBuffer() {
+ return 0;
+}
+
+#ifdef API_TRACE
+static const char * API_SIGNAL_LOG = "API_SIGNAL_LOG";
+static const char * apiSignalLog = 0;
+static SignalLoggerManager signalLogger;
+static
+inline
+bool
+setSignalLog(){
+ signalLogger.flushSignalLog();
+
+ const char * tmp = NdbEnv_GetEnv(API_SIGNAL_LOG, (char *)0, 0);
+ if(tmp != 0 && apiSignalLog != 0 && strcmp(tmp,apiSignalLog) == 0){
+ return true;
+ } else if(tmp == 0 && apiSignalLog == 0){
+ return false;
+ } else if(tmp == 0 && apiSignalLog != 0){
+ signalLogger.setOutputStream(0);
+ apiSignalLog = tmp;
+ return false;
+ } else if(tmp !=0){
+ if (strcmp(tmp, "-") == 0)
+ signalLogger.setOutputStream(stdout);
+ else
+ signalLogger.setOutputStream(fopen(tmp, "w"));
+ apiSignalLog = tmp;
+ return true;
+ }
+ return false;
+}
+#ifdef TRACE_APIREGREQ
+#define TRACE_GSN(gsn) true
+#else
+#define TRACE_GSN(gsn) (gsn != GSN_API_REGREQ && gsn != GSN_API_REGCONF)
+#endif
+#endif
+
+/**
+ * The execute function : Handle received signal
+ */
+void
+execute(void * callbackObj, SignalHeader * const header,
+ Uint8 prio, Uint32 * const theData,
+ LinearSectionPtr ptr[3]){
+
+ TransporterFacade * theFacade = (TransporterFacade*)callbackObj;
+ TransporterFacade::ThreadData::Object_Execute oe;
+ Uint32 tRecBlockNo = header->theReceiversBlockNumber;
+
+#ifdef API_TRACE
+ if(setSignalLog() && TRACE_GSN(header->theVerId_signalNumber)){
+ signalLogger.executeSignal(* header,
+ prio,
+ theData,
+ theFacade->ownId(),
+ ptr, header->m_noOfSections);
+ signalLogger.flushSignalLog();
+ }
+#endif
+
+ if (tRecBlockNo >= MIN_API_BLOCK_NO) {
+ oe = theFacade->m_threads.get(tRecBlockNo);
+ if (oe.m_object != 0 && oe.m_executeFunction != 0) {
+ /**
+ * Handle received signal immediately to avoid any unnecessary
+ * copying of data, allocation of memory and other things. Copying
+ * of data could be interesting to support several priority levels
+ * and to support a special memory structure when executing the
+ * signals. Neither of those are interesting when receiving data
+ * in the NDBAPI. The NDBAPI will thus read signal data directly as
+ * it was written by the sender (SCI sender is other node, Shared
+ * memory sender is other process and TCP/IP sender is the OS that
+ * writes the TCP/IP message into a message buffer).
+ */
+ NdbApiSignal tmpSignal(*header);
+ NdbApiSignal * tSignal = &tmpSignal;
+ tSignal->setDataPtr(theData);
+ (* oe.m_executeFunction) (oe.m_object, tSignal, ptr);
+ }//if
+ } else if (tRecBlockNo == API_PACKED) {
+ /**
+ * Block number == 2047 is used to signal a signal that consists of
+ * multiple instances of the same signal. This is an effort to
+ * package the signals so as to avoid unnecessary communication
+ * overhead since TCP/IP has a great performance impact.
+ */
+ Uint32 Tlength = header->theLength;
+ Uint32 Tsent = 0;
+ /**
+ * Since it contains at least two data packets we will first
+ * copy the signal data to safe place.
+ */
+ while (Tsent < Tlength) {
+ Uint32 Theader = theData[Tsent];
+ Tsent++;
+ Uint32 TpacketLen = (Theader & 0x1F) + 3;
+ tRecBlockNo = Theader >> 16;
+ if (TpacketLen <= 25) {
+ if ((TpacketLen + Tsent) <= Tlength) {
+ /**
+ * Set the data length of the signal and the receivers block
+ * reference and then call the API.
+ */
+ header->theLength = TpacketLen;
+ header->theReceiversBlockNumber = tRecBlockNo;
+ Uint32* tDataPtr = &theData[Tsent];
+ Tsent += TpacketLen;
+ if (tRecBlockNo >= MIN_API_BLOCK_NO) {
+ oe = theFacade->m_threads.get(tRecBlockNo);
+ if(oe.m_object != 0 && oe.m_executeFunction != 0){
+ NdbApiSignal tmpSignal(*header);
+ NdbApiSignal * tSignal = &tmpSignal;
+ tSignal->setDataPtr(tDataPtr);
+ (*oe.m_executeFunction)(oe.m_object, tSignal, 0);
+ }
+ }
+ }
+ }
+ }
+ return;
+ } else if (tRecBlockNo == API_CLUSTERMGR) {
+ /**
+ * The signal was aimed for the Cluster Manager.
+ * We handle it immediately here.
+ */
+ ClusterMgr * clusterMgr = theFacade->theClusterMgr;
+ const Uint32 gsn = header->theVerId_signalNumber;
+
+ switch (gsn){
+ case GSN_API_REGREQ:
+ clusterMgr->execAPI_REGREQ(theData);
+ break;
+
+ case GSN_API_REGCONF:
+ clusterMgr->execAPI_REGCONF(theData);
+ break;
+
+ case GSN_API_REGREF:
+ clusterMgr->execAPI_REGREF(theData);
+ break;
+
+ case GSN_NODE_FAILREP:
+ clusterMgr->execNODE_FAILREP(theData);
+ break;
+
+ case GSN_NF_COMPLETEREP:
+ clusterMgr->execNF_COMPLETEREP(theData);
+ break;
+
+ case GSN_ARBIT_STARTREQ:
+ if (theFacade->theArbitMgr != NULL)
+ theFacade->theArbitMgr->doStart(theData);
+ break;
+
+ case GSN_ARBIT_CHOOSEREQ:
+ if (theFacade->theArbitMgr != NULL)
+ theFacade->theArbitMgr->doChoose(theData);
+ break;
+
+ case GSN_ARBIT_STOPORD:
+ if(theFacade->theArbitMgr != NULL)
+ theFacade->theArbitMgr->doStop(theData);
+ break;
+
+ default:
+ break;
+
+ }
+ return;
+ } else {
+ ; // Ignore all other block numbers.
+ if(header->theVerId_signalNumber!=3) {
+ TRP_DEBUG( "TransporterFacade received signal to unknown block no." );
+ ndbout << "BLOCK NO: " << tRecBlockNo << " sig "
+ << header->theVerId_signalNumber << endl;
+ abort();
+ }
+ }
+}
+
+// These symbols are needed, but not used in the API
+void
+SignalLoggerManager::printSegmentedSection(FILE *, const SignalHeader &,
+ const SegmentedSectionPtr ptr[3],
+ unsigned i){
+ abort();
+}
+
+void
+copy(Uint32 * & insertPtr,
+ class SectionSegmentPool & thePool, const SegmentedSectionPtr & _ptr){
+ abort();
+}
+
+/**
+ * Note that this function need no locking since its
+ * only called from the constructor of Ndb (the NdbObject)
+ *
+ * Which is protected by a mutex
+ */
+
+int
+TransporterFacade::start_instance(int nodeId,
+ const ndb_mgm_configuration* props)
+{
+ if (! theFacadeInstance->init(nodeId, props)) {
+ return -1;
+ }
+
+ /**
+ * Install signal handler for SIGPIPE
+ *
+ * This due to the fact that a socket connection might have
+ * been closed in between a select and a corresponding send
+ */
+#if !defined NDB_OSE && !defined NDB_SOFTOSE && !defined NDB_WIN32
+ signal(SIGPIPE, SIG_IGN);
+#endif
+
+ return 0;
+}
+
+/**
+ * Note that this function need no locking since its
+ * only called from the destructor of Ndb (the NdbObject)
+ *
+ * Which is protected by a mutex
+ */
+void
+TransporterFacade::stop_instance(){
+ DBUG_ENTER("TransporterFacade::stop_instance");
+ if(theFacadeInstance)
+ theFacadeInstance->doStop();
+ DBUG_VOID_RETURN;
+}
+
+void
+TransporterFacade::doStop(){
+ DBUG_ENTER("TransporterFacade::doStop");
+ /**
+ * First stop the ClusterMgr because it needs to send one more signal
+ * and also uses theFacadeInstance to lock/unlock theMutexPtr
+ */
+ if (theClusterMgr != NULL) theClusterMgr->doStop();
+ if (theArbitMgr != NULL) theArbitMgr->doStop(NULL);
+
+ /**
+ * Now stop the send and receive threads
+ */
+ void *status;
+ theStopReceive = 1;
+ if (theReceiveThread) {
+ NdbThread_WaitFor(theReceiveThread, &status);
+ NdbThread_Destroy(&theReceiveThread);
+ theReceiveThread= 0;
+ }
+ if (theSendThread) {
+ NdbThread_WaitFor(theSendThread, &status);
+ NdbThread_Destroy(&theSendThread);
+ theSendThread= 0;
+ }
+ DBUG_VOID_RETURN;
+}
+
+extern "C"
+void*
+runSendRequest_C(void * me)
+{
+ ((TransporterFacade*) me)->threadMainSend();
+ return 0;
+}
+
+void TransporterFacade::threadMainSend(void)
+{
+ theTransporterRegistry->startSending();
+ if (!theTransporterRegistry->start_clients()){
+ ndbout_c("Unable to start theTransporterRegistry->start_clients");
+ exit(0);
+ }
+
+ m_socket_server.startServer();
+
+ while(!theStopReceive) {
+ NdbSleep_MilliSleep(10);
+ NdbMutex_Lock(theMutexPtr);
+ if (sendPerformedLastInterval == 0) {
+ theTransporterRegistry->performSend();
+ }
+ sendPerformedLastInterval = 0;
+ NdbMutex_Unlock(theMutexPtr);
+ }
+ theTransporterRegistry->stopSending();
+
+ m_socket_server.stopServer();
+ m_socket_server.stopSessions();
+
+ theTransporterRegistry->stop_clients();
+}
+
+extern "C"
+void*
+runReceiveResponse_C(void * me)
+{
+ ((TransporterFacade*) me)->threadMainReceive();
+ return 0;
+}
+
+void TransporterFacade::threadMainReceive(void)
+{
+ theTransporterRegistry->startReceiving();
+ NdbMutex_Lock(theMutexPtr);
+ theTransporterRegistry->update_connections();
+ NdbMutex_Unlock(theMutexPtr);
+ while(!theStopReceive) {
+ for(int i = 0; i<10; i++){
+ const int res = theTransporterRegistry->pollReceive(10);
+ if(res > 0){
+ NdbMutex_Lock(theMutexPtr);
+ theTransporterRegistry->performReceive();
+ NdbMutex_Unlock(theMutexPtr);
+ }
+ }
+ NdbMutex_Lock(theMutexPtr);
+ theTransporterRegistry->update_connections();
+ NdbMutex_Unlock(theMutexPtr);
+ }//while
+ theTransporterRegistry->stopReceiving();
+}
+
+TransporterFacade::TransporterFacade() :
+ theTransporterRegistry(0),
+ theStopReceive(0),
+ theSendThread(NULL),
+ theReceiveThread(NULL),
+ m_fragmented_signal_id(0)
+{
+ theOwnId = 0;
+
+ theMutexPtr = NdbMutex_Create();
+ sendPerformedLastInterval = 0;
+
+ checkCounter = 4;
+ currentSendLimit = 1;
+ theClusterMgr = NULL;
+ theArbitMgr = NULL;
+ theStartNodeId = 1;
+ m_scan_batch_size= MAX_SCAN_BATCH_SIZE;
+ m_batch_byte_size= SCAN_BATCH_SIZE;
+ m_batch_size= DEF_BATCH_SIZE;
+ m_max_trans_id = 0;
+
+ theClusterMgr = new ClusterMgr(* this);
+}
+
+bool
+TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
+{
+ theOwnId = nodeId;
+ theTransporterRegistry = new TransporterRegistry(this);
+
+ const int res = IPCConfig::configureTransporters(nodeId,
+ * props,
+ * theTransporterRegistry);
+ if(res <= 0){
+ TRP_DEBUG( "configureTransporters returned 0 or less" );
+ return false;
+ }
+
+ ndb_mgm_configuration_iterator iter(* props, CFG_SECTION_NODE);
+ iter.first();
+ theClusterMgr->init(iter);
+
+ iter.first();
+ if(iter.find(CFG_NODE_ID, nodeId)){
+ TRP_DEBUG( "Node info missing from config." );
+ return false;
+ }
+
+ Uint32 rank = 0;
+ if(!iter.get(CFG_NODE_ARBIT_RANK, &rank) && rank>0){
+ theArbitMgr = new ArbitMgr(* this);
+ theArbitMgr->setRank(rank);
+ Uint32 delay = 0;
+ iter.get(CFG_NODE_ARBIT_DELAY, &delay);
+ theArbitMgr->setDelay(delay);
+ }
+ Uint32 scan_batch_size= 0;
+ if (!iter.get(CFG_MAX_SCAN_BATCH_SIZE, &scan_batch_size)) {
+ m_scan_batch_size= scan_batch_size;
+ }
+ Uint32 batch_byte_size= 0;
+ if (!iter.get(CFG_BATCH_BYTE_SIZE, &batch_byte_size)) {
+ m_batch_byte_size= batch_byte_size;
+ }
+ Uint32 batch_size= 0;
+ if (!iter.get(CFG_BATCH_SIZE, &batch_size)) {
+ m_batch_size= batch_size;
+ }
+
+ if (!theTransporterRegistry->start_service(m_socket_server)){
+ ndbout_c("Unable to start theTransporterRegistry->start_service");
+ return false;
+ }
+
+ theReceiveThread = NdbThread_Create(runReceiveResponse_C,
+ (void**)this,
+ 32768,
+ "ndb_receive",
+ NDB_THREAD_PRIO_LOW);
+
+ theSendThread = NdbThread_Create(runSendRequest_C,
+ (void**)this,
+ 32768,
+ "ndb_send",
+ NDB_THREAD_PRIO_LOW);
+ theClusterMgr->startThread();
+
+#ifdef API_TRACE
+ signalLogger.logOn(true, 0, SignalLoggerManager::LogInOut);
+#endif
+
+ return true;
+}
+
+
+void
+TransporterFacade::connected()
+{
+ DBUG_ENTER("TransporterFacade::connected");
+ Uint32 sz = m_threads.m_statusNext.size();
+ for (Uint32 i = 0; i < sz ; i ++) {
+ if (m_threads.getInUse(i)){
+ void * obj = m_threads.m_objectExecute[i].m_object;
+ NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
+ (*RegPC) (obj, numberToRef(indexToNumber(i), theOwnId), true, true);
+ }
+ }
+ DBUG_VOID_RETURN;
+}
+
+void
+TransporterFacade::ReportNodeDead(NodeId tNodeId)
+{
+ /**
+ * When a node fails we must report this to each Ndb object.
+ * The function that is used for communicating node failures is called.
+ * This is to ensure that the Ndb objects do not think their connections
+ * are correct after a failure followed by a restart.
+ * After the restart the node is up again and the Ndb object
+ * might not have noticed the failure.
+ */
+ Uint32 sz = m_threads.m_statusNext.size();
+ for (Uint32 i = 0; i < sz ; i ++) {
+ if (m_threads.getInUse(i)){
+ void * obj = m_threads.m_objectExecute[i].m_object;
+ NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
+ (*RegPC) (obj, tNodeId, false, false);
+ }
+ }
+}
+
+void
+TransporterFacade::ReportNodeFailureComplete(NodeId tNodeId)
+{
+ /**
+ * When a node fails we must report this to each Ndb object.
+ * The function that is used for communicating node failures is called.
+ * This is to ensure that the Ndb objects do not think their connections
+ * are correct after a failure followed by a restart.
+ * After the restart the node is up again and the Ndb object
+ * might not have noticed the failure.
+ */
+
+ DBUG_ENTER("TransporterFacade::ReportNodeFailureComplete");
+ DBUG_PRINT("enter",("nodeid= %d", tNodeId));
+ Uint32 sz = m_threads.m_statusNext.size();
+ for (Uint32 i = 0; i < sz ; i ++) {
+ if (m_threads.getInUse(i)){
+ void * obj = m_threads.m_objectExecute[i].m_object;
+ NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
+ (*RegPC) (obj, tNodeId, false, true);
+ }
+ }
+ DBUG_VOID_RETURN;
+}
+
+void
+TransporterFacade::ReportNodeAlive(NodeId tNodeId)
+{
+ /**
+ * When a node fails we must report this to each Ndb object.
+ * The function that is used for communicating node failures is called.
+ * This is to ensure that the Ndb objects do not think there connections
+ * are correct after a failure
+ * followed by a restart.
+ * After the restart the node is up again and the Ndb object
+ * might not have noticed the failure.
+ */
+ Uint32 sz = m_threads.m_statusNext.size();
+ for (Uint32 i = 0; i < sz ; i ++) {
+ if (m_threads.getInUse(i)){
+ void * obj = m_threads.m_objectExecute[i].m_object;
+ NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
+ (*RegPC) (obj, tNodeId, true, false);
+ }
+ }
+}
+
+int
+TransporterFacade::close(BlockNumber blockNumber, Uint64 trans_id)
+{
+ NdbMutex_Lock(theMutexPtr);
+ Uint32 low_bits = (Uint32)trans_id;
+ m_max_trans_id = m_max_trans_id > low_bits ? m_max_trans_id : low_bits;
+ close_local(blockNumber);
+ NdbMutex_Unlock(theMutexPtr);
+ return 0;
+}
+
+int
+TransporterFacade::close_local(BlockNumber blockNumber){
+ m_threads.close(blockNumber);
+ return 0;
+}
+
+int
+TransporterFacade::open(void* objRef,
+ ExecuteFunction fun,
+ NodeStatusFunction statusFun)
+{
+ DBUG_ENTER("TransporterFacade::open");
+ int r= m_threads.open(objRef, fun, statusFun);
+ if (r < 0)
+ DBUG_RETURN(r);
+#if 1
+ if (theOwnId > 0) {
+ (*statusFun)(objRef, numberToRef(r, theOwnId), true, true);
+ }
+#endif
+ DBUG_RETURN(r);
+}
+
+TransporterFacade::~TransporterFacade(){
+
+ NdbMutex_Lock(theMutexPtr);
+ delete theClusterMgr;
+ delete theArbitMgr;
+ delete theTransporterRegistry;
+ NdbMutex_Unlock(theMutexPtr);
+ NdbMutex_Destroy(theMutexPtr);
+#ifdef API_TRACE
+ signalLogger.setOutputStream(0);
+#endif
+}
+
+void
+TransporterFacade::calculateSendLimit()
+{
+ Uint32 Ti;
+ Uint32 TthreadCount = 0;
+
+ Uint32 sz = m_threads.m_statusNext.size();
+ for (Ti = 0; Ti < sz; Ti++) {
+ if (m_threads.m_statusNext[Ti] == (ThreadData::ACTIVE)){
+ TthreadCount++;
+ m_threads.m_statusNext[Ti] = ThreadData::INACTIVE;
+ }
+ }
+ currentSendLimit = TthreadCount;
+ if (currentSendLimit == 0) {
+ currentSendLimit = 1;
+ }
+ checkCounter = currentSendLimit << 2;
+}
+
+
+//-------------------------------------------------
+// Force sending but still report the sending to the
+// adaptive algorithm.
+//-------------------------------------------------
+void TransporterFacade::forceSend(Uint32 block_number) {
+ checkCounter--;
+ m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
+ sendPerformedLastInterval = 1;
+ if (checkCounter < 0) {
+ calculateSendLimit();
+ }
+ theTransporterRegistry->forceSendCheck(0);
+}
+
+//-------------------------------------------------
+// Improving API performance
+//-------------------------------------------------
+void
+TransporterFacade::checkForceSend(Uint32 block_number) {
+ m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
+ //-------------------------------------------------
+ // This code is an adaptive algorithm to discover when
+ // the API should actually send its buffers. The reason
+ // is that the performance is highly dependent on the
+ // size of the writes over the communication network.
+ // Thus we try to ensure that the send size is as big
+ // as possible. At the same time we don't want response
+ // time to increase so therefore we have to keep track of
+ // how the users are performing adaptively.
+ //-------------------------------------------------
+
+ if (theTransporterRegistry->forceSendCheck(currentSendLimit) == 1) {
+ sendPerformedLastInterval = 1;
+ }
+ checkCounter--;
+ if (checkCounter < 0) {
+ calculateSendLimit();
+ }
+}
+
+
+/******************************************************************************
+ * SEND SIGNAL METHODS
+ *****************************************************************************/
+int
+TransporterFacade::sendSignal(NdbApiSignal * aSignal, NodeId aNode){
+ Uint32* tDataPtr = aSignal->getDataPtrSend();
+ Uint32 Tlen = aSignal->theLength;
+ Uint32 TBno = aSignal->theReceiversBlockNumber;
+ if(getIsNodeSendable(aNode) == true){
+#ifdef API_TRACE
+ if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
+ Uint32 tmp = aSignal->theSendersBlockRef;
+ aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId);
+ LinearSectionPtr ptr[3];
+ signalLogger.sendSignal(* aSignal,
+ 1,
+ aSignal->getDataPtr(),
+ aNode, ptr, 0);
+ signalLogger.flushSignalLog();
+ aSignal->theSendersBlockRef = tmp;
+ }
+#endif
+ if ((Tlen != 0) && (Tlen <= 25) && (TBno != 0)) {
+ SendStatus ss = theTransporterRegistry->prepareSend(aSignal,
+ 1, // JBB
+ tDataPtr,
+ aNode,
+ 0);
+ //if (ss != SEND_OK) ndbout << ss << endl;
+ return (ss == SEND_OK ? 0 : -1);
+ } else {
+ ndbout << "ERR: SigLen = " << Tlen << " BlockRec = " << TBno;
+ ndbout << " SignalNo = " << aSignal->theVerId_signalNumber << endl;
+ assert(0);
+ }//if
+ }
+ //const ClusterMgr::Node & node = theClusterMgr->getNodeInfo(aNode);
+ //const Uint32 startLevel = node.m_state.startLevel;
+ return -1; // Node Dead
+}
+
+int
+TransporterFacade::sendSignalUnCond(NdbApiSignal * aSignal, NodeId aNode){
+#ifdef API_TRACE
+ if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
+ Uint32 tmp = aSignal->theSendersBlockRef;
+ aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId);
+ LinearSectionPtr ptr[3];
+ signalLogger.sendSignal(* aSignal,
+ 0,
+ aSignal->getDataPtr(),
+ aNode, ptr, 0);
+ signalLogger.flushSignalLog();
+ aSignal->theSendersBlockRef = tmp;
+ }
+#endif
+ assert((aSignal->theLength != 0) &&
+ (aSignal->theLength <= 25) &&
+ (aSignal->theReceiversBlockNumber != 0));
+ SendStatus ss = theTransporterRegistry->prepareSend(aSignal,
+ 0,
+ aSignal->getDataPtr(),
+ aNode,
+ 0);
+
+ return (ss == SEND_OK ? 0 : -1);
+}
+
+#define CHUNK_SZ NDB_SECTION_SEGMENT_SZ*64 // related to MAX_MESSAGE_SIZE
+int
+TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode,
+ LinearSectionPtr ptr[3], Uint32 secs)
+{
+ if(getIsNodeSendable(aNode) != true)
+ return -1;
+
+#ifdef API_TRACE
+ if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
+ Uint32 tmp = aSignal->theSendersBlockRef;
+ aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId);
+ signalLogger.sendSignal(* aSignal,
+ 1,
+ aSignal->getDataPtrSend(),
+ aNode,
+ ptr, secs);
+ aSignal->theSendersBlockRef = tmp;
+ }
+#endif
+
+ NdbApiSignal tmp_signal(*(SignalHeader*)aSignal);
+ LinearSectionPtr tmp_ptr[3];
+ Uint32 unique_id= m_fragmented_signal_id++; // next unique id
+ unsigned i;
+ for (i= 0; i < secs; i++)
+ tmp_ptr[i]= ptr[i];
+
+ unsigned start_i= 0;
+ unsigned chunk_sz= 0;
+ unsigned fragment_info= 0;
+ Uint32 *tmp_data= tmp_signal.getDataPtrSend();
+ for (i= 0; i < secs;) {
+ unsigned save_sz= tmp_ptr[i].sz;
+ tmp_data[i-start_i]= i;
+ if (chunk_sz + save_sz > CHUNK_SZ) {
+ // truncate
+ unsigned send_sz= CHUNK_SZ - chunk_sz;
+ if (i != start_i) // first piece of a new section has to be a multiple of NDB_SECTION_SEGMENT_SZ
+ {
+ send_sz=
+ NDB_SECTION_SEGMENT_SZ
+ *(send_sz+NDB_SECTION_SEGMENT_SZ-1)
+ /NDB_SECTION_SEGMENT_SZ;
+ if (send_sz > save_sz)
+ send_sz= save_sz;
+ }
+ tmp_ptr[i].sz= send_sz;
+
+ if (fragment_info < 2) // 1 = first fragment, 2 = middle fragments
+ fragment_info++;
+
+ // send tmp_signal
+ tmp_data[i-start_i+1]= unique_id;
+ tmp_signal.setLength(i-start_i+2);
+ tmp_signal.m_fragmentInfo= fragment_info;
+ tmp_signal.m_noOfSections= i-start_i+1;
+ // do prepare send
+ {
+ SendStatus ss = theTransporterRegistry->prepareSend
+ (&tmp_signal,
+ 1, /*JBB*/
+ tmp_data,
+ aNode,
+ &tmp_ptr[start_i]);
+ assert(ss != SEND_MESSAGE_TOO_BIG);
+ if (ss != SEND_OK) return -1;
+ }
+ // setup variables for next signal
+ start_i= i;
+ chunk_sz= 0;
+ tmp_ptr[i].sz= save_sz-send_sz;
+ tmp_ptr[i].p+= send_sz;
+ if (tmp_ptr[i].sz == 0)
+ i++;
+ }
+ else
+ {
+ chunk_sz+=save_sz;
+ i++;
+ }
+ }
+
+ unsigned a_sz= aSignal->getLength();
+
+ if (fragment_info > 0) {
+ // update the original signal to include section info
+ Uint32 *a_data= aSignal->getDataPtrSend();
+ unsigned tmp_sz= i-start_i;
+ memcpy(a_data+a_sz,
+ tmp_data,
+ tmp_sz*sizeof(Uint32));
+ a_data[a_sz+tmp_sz]= unique_id;
+ aSignal->setLength(a_sz+tmp_sz+1);
+
+ // send last fragment
+ aSignal->m_fragmentInfo= 3; // 3 = last fragment
+ aSignal->m_noOfSections= i-start_i;
+ } else {
+ aSignal->m_noOfSections= secs;
+ }
+
+ // send aSignal
+ int ret;
+ {
+ SendStatus ss = theTransporterRegistry->prepareSend
+ (aSignal,
+ 1/*JBB*/,
+ aSignal->getDataPtrSend(),
+ aNode,
+ &tmp_ptr[start_i]);
+ assert(ss != SEND_MESSAGE_TOO_BIG);
+ ret = (ss == SEND_OK ? 0 : -1);
+ }
+ aSignal->m_noOfSections = 0;
+ aSignal->m_fragmentInfo = 0;
+ aSignal->setLength(a_sz);
+ return ret;
+}
+
+int
+TransporterFacade::sendSignal(NdbApiSignal* aSignal, NodeId aNode,
+ LinearSectionPtr ptr[3], Uint32 secs){
+ aSignal->m_noOfSections = secs;
+ if(getIsNodeSendable(aNode) == true){
+#ifdef API_TRACE
+ if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
+ Uint32 tmp = aSignal->theSendersBlockRef;
+ aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId);
+ signalLogger.sendSignal(* aSignal,
+ 1,
+ aSignal->getDataPtrSend(),
+ aNode,
+ ptr, secs);
+ signalLogger.flushSignalLog();
+ aSignal->theSendersBlockRef = tmp;
+ }
+#endif
+ SendStatus ss = theTransporterRegistry->prepareSend
+ (aSignal,
+ 1, // JBB
+ aSignal->getDataPtrSend(),
+ aNode,
+ ptr);
+ assert(ss != SEND_MESSAGE_TOO_BIG);
+ aSignal->m_noOfSections = 0;
+ return (ss == SEND_OK ? 0 : -1);
+ }
+ aSignal->m_noOfSections = 0;
+ return -1;
+}
+
+/******************************************************************************
+ * CONNECTION METHODS Etc
+ ******************************************************************************/
+
+void
+TransporterFacade::doConnect(int aNodeId){
+ theTransporterRegistry->setIOState(aNodeId, NoHalt);
+ theTransporterRegistry->do_connect(aNodeId);
+}
+
+void
+TransporterFacade::doDisconnect(int aNodeId)
+{
+ theTransporterRegistry->do_disconnect(aNodeId);
+}
+
+void
+TransporterFacade::reportConnected(int aNodeId)
+{
+ theClusterMgr->reportConnected(aNodeId);
+ return;
+}
+
+void
+TransporterFacade::reportDisconnected(int aNodeId)
+{
+ theClusterMgr->reportDisconnected(aNodeId);
+ return;
+}
+
+NodeId
+TransporterFacade::ownId() const
+{
+ return theOwnId;
+}
+
+bool
+TransporterFacade::isConnected(NodeId aNodeId){
+ return theTransporterRegistry->is_connected(aNodeId);
+}
+
+NodeId
+TransporterFacade::get_an_alive_node()
+{
+ DBUG_ENTER("TransporterFacade::get_an_alive_node");
+ DBUG_PRINT("enter", ("theStartNodeId: %d", theStartNodeId));
+#ifdef VM_TRACE
+ const char* p = NdbEnv_GetEnv("NDB_ALIVE_NODE_ID", (char*)0, 0);
+ if (p != 0 && *p != 0)
+ return atoi(p);
+#endif
+ NodeId i;
+ for (i = theStartNodeId; i < MAX_NDB_NODES; i++) {
+ if (get_node_alive(i)){
+ DBUG_PRINT("info", ("Node %d is alive", i));
+ theStartNodeId = ((i + 1) % MAX_NDB_NODES);
+ DBUG_RETURN(i);
+ }
+ }
+ for (i = 1; i < theStartNodeId; i++) {
+ if (get_node_alive(i)){
+ DBUG_PRINT("info", ("Node %d is alive", i));
+ theStartNodeId = ((i + 1) % MAX_NDB_NODES);
+ DBUG_RETURN(i);
+ }
+ }
+ DBUG_RETURN((NodeId)0);
+}
+
+TransporterFacade::ThreadData::ThreadData(Uint32 size){
+ m_firstFree = END_OF_LIST;
+ expand(size);
+}
+
+void
+TransporterFacade::ThreadData::expand(Uint32 size){
+ Object_Execute oe = { 0 ,0 };
+ NodeStatusFunction fun = 0;
+
+ const Uint32 sz = m_statusNext.size();
+ m_objectExecute.fill(sz + size, oe);
+ m_statusFunction.fill(sz + size, fun);
+ for(Uint32 i = 0; i<size; i++){
+ m_statusNext.push_back(sz + i + 1);
+ }
+
+ m_statusNext.back() = m_firstFree;
+ m_firstFree = m_statusNext.size() - size;
+}
+
+
+int
+TransporterFacade::ThreadData::open(void* objRef,
+ ExecuteFunction fun,
+ NodeStatusFunction fun2)
+{
+ Uint32 nextFree = m_firstFree;
+
+ if(m_statusNext.size() >= MAX_NO_THREADS && nextFree == END_OF_LIST){
+ return -1;
+ }
+
+ if(nextFree == END_OF_LIST){
+ expand(10);
+ nextFree = m_firstFree;
+ }
+
+ m_firstFree = m_statusNext[nextFree];
+
+ Object_Execute oe = { objRef , fun };
+
+ m_statusNext[nextFree] = INACTIVE;
+ m_objectExecute[nextFree] = oe;
+ m_statusFunction[nextFree] = fun2;
+
+ return indexToNumber(nextFree);
+}
+
+int
+TransporterFacade::ThreadData::close(int number){
+ number= numberToIndex(number);
+ assert(getInUse(number));
+ m_statusNext[number] = m_firstFree;
+ m_firstFree = number;
+ Object_Execute oe = { 0, 0 };
+ m_objectExecute[number] = oe;
+ m_statusFunction[number] = 0;
+ return 0;
+}
+
+template class Vector<NodeStatusFunction>;
+template class Vector<TransporterFacade::ThreadData::Object_Execute>;