summaryrefslogtreecommitdiff
path: root/storage/ndb/src/ndbapi/signal-sender
diff options
context:
space:
mode:
Diffstat (limited to 'storage/ndb/src/ndbapi/signal-sender')
-rw-r--r--storage/ndb/src/ndbapi/signal-sender/Makefile19
-rw-r--r--storage/ndb/src/ndbapi/signal-sender/SignalSender.cpp237
-rw-r--r--storage/ndb/src/ndbapi/signal-sender/SignalSender.hpp82
3 files changed, 338 insertions, 0 deletions
diff --git a/storage/ndb/src/ndbapi/signal-sender/Makefile b/storage/ndb/src/ndbapi/signal-sender/Makefile
new file mode 100644
index 00000000000..56e6ce1eac0
--- /dev/null
+++ b/storage/ndb/src/ndbapi/signal-sender/Makefile
@@ -0,0 +1,19 @@
+include .defs.mk
+
+TYPE := ndbapi
+
+NONPIC_ARCHIVE := Y
+ARCHIVE_TARGET := signal-sender
+
+BIN_TARGET_LIBS := # -lkalle
+BIN_TARGET_ARCHIVES := portlib # $(NDB_TOP)/lib/libkalle.a
+
+# Source files of non-templated classes (.cpp files)
+SOURCES = SignalSender.cpp
+
+CCFLAGS_LOC += -I$(call fixpath,$(NDB_TOP)/src/ndbapi)
+
+include $(NDB_TOP)/Epilogue.mk
+
+###
+# Backward compatible
diff --git a/storage/ndb/src/ndbapi/signal-sender/SignalSender.cpp b/storage/ndb/src/ndbapi/signal-sender/SignalSender.cpp
new file mode 100644
index 00000000000..680d0c23b4a
--- /dev/null
+++ b/storage/ndb/src/ndbapi/signal-sender/SignalSender.cpp
@@ -0,0 +1,237 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include "SignalSender.hpp"
+#include "ConfigRetriever.hpp"
+#include <NdbSleep.h>
+#include <SignalLoggerManager.hpp>
+
+SimpleSignal::SimpleSignal(bool dealloc){
+ memset(this, 0, sizeof(* this));
+ deallocSections = dealloc;
+}
+
+SimpleSignal::~SimpleSignal(){
+ if(!deallocSections)
+ return;
+ if(ptr[0].p != 0) delete []ptr[0].p;
+ if(ptr[1].p != 0) delete []ptr[1].p;
+ if(ptr[2].p != 0) delete []ptr[2].p;
+}
+
+void
+SimpleSignal::set(class SignalSender& ss,
+ Uint8 trace, Uint16 recBlock, Uint16 gsn, Uint32 len){
+
+ header.theTrace = trace;
+ header.theReceiversBlockNumber = recBlock;
+ header.theVerId_signalNumber = gsn;
+ header.theLength = len;
+ header.theSendersBlockRef = refToBlock(ss.getOwnRef());
+}
+
+void
+SimpleSignal::print(FILE * out){
+ fprintf(out, "---- Signal ----------------\n");
+ SignalLoggerManager::printSignalHeader(out, header, 0, 0, false);
+ SignalLoggerManager::printSignalData(out, header, theData);
+ for(Uint32 i = 0; i<header.m_noOfSections; i++){
+ Uint32 len = ptr[i].sz;
+ fprintf(out, " --- Section %d size=%d ---\n", i, len);
+ Uint32 * signalData = ptr[i].p;
+ while(len >= 7){
+ fprintf(out,
+ " H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x\n",
+ signalData[0], signalData[1], signalData[2], signalData[3],
+ signalData[4], signalData[5], signalData[6]);
+ len -= 7;
+ signalData += 7;
+ }
+ if(len > 0){
+ fprintf(out, " H\'%.8x", signalData[0]);
+ for(Uint32 i = 1; i<len; i++)
+ fprintf(out, " H\'%.8x", signalData[i]);
+ fprintf(out, "\n");
+ }
+ }
+}
+
+SignalSender::SignalSender(const char * connectString){
+ m_cond = NdbCondition_Create();
+ theFacade = TransporterFacade::start_instance(connectString);
+ m_blockNo = theFacade->open(this, execSignal, execNodeStatus);
+ assert(m_blockNo > 0);
+}
+
+SignalSender::~SignalSender(){
+ theFacade->close(m_blockNo);
+ theFacade->stop_instance();
+ NdbCondition_Destroy(m_cond);
+}
+
+Uint32
+SignalSender::getOwnRef() const {
+ return numberToRef(m_blockNo, theFacade->ownId());
+}
+
+bool
+SignalSender::connectOne(Uint32 timeOutMillis){
+ NDB_TICKS start = NdbTick_CurrentMillisecond();
+ NDB_TICKS now = start;
+ while(theFacade->theClusterMgr->getNoOfConnectedNodes() == 0 &&
+ (timeOutMillis == 0 || (now - start) < timeOutMillis)){
+ NdbSleep_MilliSleep(100);
+ }
+ return theFacade->theClusterMgr->getNoOfConnectedNodes() > 0;
+}
+
+bool
+SignalSender::connectAll(Uint32 timeOutMillis){
+ NDB_TICKS start = NdbTick_CurrentMillisecond();
+ NDB_TICKS now = start;
+ while(theFacade->theClusterMgr->getNoOfConnectedNodes() < 1 &&
+ (timeOutMillis == 0 || (now - start) < timeOutMillis)){
+ NdbSleep_MilliSleep(100);
+ }
+ return theFacade->theClusterMgr->getNoOfConnectedNodes() >= 1;
+}
+
+
+Uint32
+SignalSender::getAliveNode(){
+ return theFacade->get_an_alive_node();
+}
+
+const ClusterMgr::Node &
+SignalSender::getNodeInfo(Uint16 nodeId) const {
+ return theFacade->theClusterMgr->getNodeInfo(nodeId);
+}
+
+Uint32
+SignalSender::getNoOfConnectedNodes() const {
+ return theFacade->theClusterMgr->getNoOfConnectedNodes();
+}
+
+SendStatus
+SignalSender::sendSignal(Uint16 nodeId, const SimpleSignal * s){
+ return theFacade->theTransporterRegistry->prepareSend(&s->header,
+ 1, // JBB
+ &s->theData[0],
+ nodeId,
+ &s->ptr[0]);
+}
+
+template<class T>
+SimpleSignal *
+SignalSender::waitFor(Uint32 timeOutMillis, T & t){
+
+ Guard g(theFacade->theMutexPtr);
+
+ SimpleSignal * s = t.check(m_jobBuffer);
+ if(s != 0){
+ return s;
+ }
+
+ NDB_TICKS now = NdbTick_CurrentMillisecond();
+ NDB_TICKS stop = now + timeOutMillis;
+ Uint32 wait = (timeOutMillis == 0 ? 10 : timeOutMillis);
+ do {
+ NdbCondition_WaitTimeout(m_cond,
+ theFacade->theMutexPtr,
+ wait);
+
+
+ SimpleSignal * s = t.check(m_jobBuffer);
+ if(s != 0){
+ return s;
+ }
+
+ now = NdbTick_CurrentMillisecond();
+ wait = (timeOutMillis == 0 ? 10 : stop - now);
+ } while(stop > now || timeOutMillis == 0);
+
+ return 0;
+}
+
+class WaitForAny {
+public:
+ SimpleSignal * check(Vector<SimpleSignal*> & m_jobBuffer){
+ if(m_jobBuffer.size() > 0){
+ SimpleSignal * s = m_jobBuffer[0];
+ m_jobBuffer.erase(0);
+ return s;
+ }
+ return 0;
+ }
+};
+
+SimpleSignal *
+SignalSender::waitFor(Uint32 timeOutMillis){
+
+ WaitForAny w;
+ return waitFor(timeOutMillis, w);
+}
+
+class WaitForNode {
+public:
+ Uint32 m_nodeId;
+ SimpleSignal * check(Vector<SimpleSignal*> & m_jobBuffer){
+ Uint32 len = m_jobBuffer.size();
+ for(Uint32 i = 0; i<len; i++){
+ if(refToNode(m_jobBuffer[i]->header.theSendersBlockRef) == m_nodeId){
+ SimpleSignal * s = m_jobBuffer[i];
+ m_jobBuffer.erase(i);
+ return s;
+ }
+ }
+ return 0;
+ }
+};
+
+SimpleSignal *
+SignalSender::waitFor(Uint16 nodeId, Uint32 timeOutMillis){
+
+ WaitForNode w;
+ w.m_nodeId = nodeId;
+ return waitFor(timeOutMillis, w);
+}
+
+#include <NdbApiSignal.hpp>
+
+void
+SignalSender::execSignal(void* signalSender,
+ NdbApiSignal* signal,
+ class LinearSectionPtr ptr[3]){
+ SimpleSignal * s = new SimpleSignal(true);
+ s->header = * signal;
+ memcpy(&s->theData[0], signal->getDataPtr(), 4 * s->header.theLength);
+ for(Uint32 i = 0; i<s->header.m_noOfSections; i++){
+ s->ptr[i].p = new Uint32[ptr[i].sz];
+ s->ptr[i].sz = ptr[i].sz;
+ memcpy(s->ptr[i].p, ptr[i].p, 4 * ptr[i].sz);
+ }
+ SignalSender * ss = (SignalSender*)signalSender;
+ ss->m_jobBuffer.push_back(s);
+ NdbCondition_Signal(ss->m_cond);
+}
+
+void
+SignalSender::execNodeStatus(void* signalSender,
+ Uint16 NodeId,
+ bool alive,
+ bool nfCompleted){
+}
+
diff --git a/storage/ndb/src/ndbapi/signal-sender/SignalSender.hpp b/storage/ndb/src/ndbapi/signal-sender/SignalSender.hpp
new file mode 100644
index 00000000000..e4e6c1931d2
--- /dev/null
+++ b/storage/ndb/src/ndbapi/signal-sender/SignalSender.hpp
@@ -0,0 +1,82 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef SIGNAL_SENDER_HPP
+#define SIGNAL_SENDER_HPP
+
+#include <ndb_global.h>
+#include <TransporterDefinitions.hpp>
+#include <TransporterFacade.hpp>
+#include <ClusterMgr.hpp>
+#include <Vector.hpp>
+
+struct SimpleSignal {
+public:
+ SimpleSignal(bool dealloc = false);
+ ~SimpleSignal();
+
+ void set(class SignalSender&,
+ Uint8 trace, Uint16 recBlock, Uint16 gsn, Uint32 len);
+
+ struct SignalHeader header;
+ Uint32 theData[25];
+ LinearSectionPtr ptr[3];
+
+ void print(FILE * out = stdout);
+private:
+ bool deallocSections;
+};
+
+class SignalSender {
+public:
+ SignalSender(const char * connectString = 0);
+ virtual ~SignalSender();
+
+ bool connectOne(Uint32 timeOutMillis = 0);
+ bool connectAll(Uint32 timeOutMillis = 0);
+ bool connect(Uint32 timeOutMillis = 0) { return connectAll(timeOutMillis);}
+
+ Uint32 getOwnRef() const;
+
+ Uint32 getAliveNode();
+ Uint32 getNoOfConnectedNodes() const;
+ const ClusterMgr::Node & getNodeInfo(Uint16 nodeId) const;
+
+ SendStatus sendSignal(Uint16 nodeId, const SimpleSignal *);
+
+ SimpleSignal * waitFor(Uint32 timeOutMillis = 0);
+ SimpleSignal * waitFor(Uint16 nodeId, Uint32 timeOutMillis = 0);
+ SimpleSignal * waitFor(Uint16 nodeId, Uint16 gsn, Uint32 timeOutMillis = 0);
+
+private:
+ int m_blockNo;
+ TransporterFacade * theFacade;
+
+ static void execSignal(void* signalSender,
+ NdbApiSignal* signal,
+ class LinearSectionPtr ptr[3]);
+
+ static void execNodeStatus(void* signalSender, NodeId,
+ bool alive, bool nfCompleted);
+
+ struct NdbCondition * m_cond;
+ Vector<SimpleSignal *> m_jobBuffer;
+
+ template<class T>
+ SimpleSignal * waitFor(Uint32 timeOutMillis, T & t);
+};
+
+#endif