summaryrefslogtreecommitdiff
path: root/ndb/src/ndbapi
diff options
context:
space:
mode:
authortomas@poseidon.ndb.mysql.com <>2005-09-08 15:59:06 +0200
committertomas@poseidon.ndb.mysql.com <>2005-09-08 15:59:06 +0200
commit4d534ae24e3a26b5fabc0e96ee9cfaae84de340e (patch)
treeb65f4500389f114de9bb53641a8625d61d4e5cbf /ndb/src/ndbapi
parent9c7d9c8fbbebde235be968405ed31dac944d3118 (diff)
parenta6c9ebba470befa2ed5143ad10a8d0fc39ba1574 (diff)
downloadmariadb-git-4d534ae24e3a26b5fabc0e96ee9cfaae84de340e.tar.gz
Merge poseidon.ndb.mysql.com:/home/tomas/mysql-4.1
into poseidon.ndb.mysql.com:/home/tomas/mysql-5.0
Diffstat (limited to 'ndb/src/ndbapi')
-rw-r--r--ndb/src/ndbapi/Makefile.am3
-rw-r--r--ndb/src/ndbapi/SignalSender.cpp270
-rw-r--r--ndb/src/ndbapi/SignalSender.hpp83
3 files changed, 355 insertions, 1 deletions
diff --git a/ndb/src/ndbapi/Makefile.am b/ndb/src/ndbapi/Makefile.am
index b734e058b87..99b75ffbd53 100644
--- a/ndb/src/ndbapi/Makefile.am
+++ b/ndb/src/ndbapi/Makefile.am
@@ -34,7 +34,8 @@ libndbapi_la_SOURCES = \
NdbDictionaryImpl.cpp \
DictCache.cpp \
ndb_cluster_connection.cpp \
- NdbBlob.cpp
+ NdbBlob.cpp \
+ SignalSender.cpp
INCLUDES_LOC = -I$(top_srcdir)/ndb/src/mgmapi
diff --git a/ndb/src/ndbapi/SignalSender.cpp b/ndb/src/ndbapi/SignalSender.cpp
new file mode 100644
index 00000000000..0a23529dc73
--- /dev/null
+++ b/ndb/src/ndbapi/SignalSender.cpp
@@ -0,0 +1,270 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include "SignalSender.hpp"
+#include <NdbSleep.h>
+#include <SignalLoggerManager.hpp>
+#include <signaldata/NFCompleteRep.hpp>
+#include <signaldata/NodeFailRep.hpp>
+
+SimpleSignal::SimpleSignal(bool dealloc){
+ memset(this, 0, sizeof(* this));
+ deallocSections = dealloc;
+}
+
+SimpleSignal::~SimpleSignal(){
+ if(!deallocSections)
+ return;
+ if(ptr[0].p != 0) delete []ptr[0].p;
+ if(ptr[1].p != 0) delete []ptr[1].p;
+ if(ptr[2].p != 0) delete []ptr[2].p;
+}
+
+void
+SimpleSignal::set(class SignalSender& ss,
+ Uint8 trace, Uint16 recBlock, Uint16 gsn, Uint32 len){
+
+ header.theTrace = trace;
+ header.theReceiversBlockNumber = recBlock;
+ header.theVerId_signalNumber = gsn;
+ header.theLength = len;
+ header.theSendersBlockRef = refToBlock(ss.getOwnRef());
+}
+
+void
+SimpleSignal::print(FILE * out){
+ fprintf(out, "---- Signal ----------------\n");
+ SignalLoggerManager::printSignalHeader(out, header, 0, 0, false);
+ SignalLoggerManager::printSignalData(out, header, theData);
+ for(Uint32 i = 0; i<header.m_noOfSections; i++){
+ Uint32 len = ptr[i].sz;
+ fprintf(out, " --- Section %d size=%d ---\n", i, len);
+ Uint32 * signalData = ptr[i].p;
+ while(len >= 7){
+ fprintf(out,
+ " H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x\n",
+ signalData[0], signalData[1], signalData[2], signalData[3],
+ signalData[4], signalData[5], signalData[6]);
+ len -= 7;
+ signalData += 7;
+ }
+ if(len > 0){
+ fprintf(out, " H\'%.8x", signalData[0]);
+ for(Uint32 i = 1; i<len; i++)
+ fprintf(out, " H\'%.8x", signalData[i]);
+ fprintf(out, "\n");
+ }
+ }
+}
+
+SignalSender::SignalSender(TransporterFacade *facade)
+ : m_lock(0)
+{
+ m_cond = NdbCondition_Create();
+ theFacade = facade;
+ m_blockNo = theFacade->open(this, execSignal, execNodeStatus);
+ assert(m_blockNo > 0);
+}
+
+SignalSender::~SignalSender(){
+ int i;
+ if (m_lock)
+ unlock();
+ theFacade->close(m_blockNo,0);
+ // free these _after_ closing theFacade to ensure that
+ // we delete all signals
+ for (i= m_jobBuffer.size()-1; i>= 0; i--)
+ delete m_jobBuffer[i];
+ for (i= m_usedBuffer.size()-1; i>= 0; i--)
+ delete m_usedBuffer[i];
+ NdbCondition_Destroy(m_cond);
+}
+
+int SignalSender::lock()
+{
+ if (NdbMutex_Lock(theFacade->theMutexPtr))
+ return -1;
+ m_lock= 1;
+ return 0;
+}
+
+int SignalSender::unlock()
+{
+ if (NdbMutex_Unlock(theFacade->theMutexPtr))
+ return -1;
+ m_lock= 0;
+ return 0;
+}
+
+Uint32
+SignalSender::getOwnRef() const {
+ return numberToRef(m_blockNo, theFacade->ownId());
+}
+
+Uint32
+SignalSender::getAliveNode() const{
+ return theFacade->get_an_alive_node();
+}
+
+const ClusterMgr::Node &
+SignalSender::getNodeInfo(Uint16 nodeId) const {
+ return theFacade->theClusterMgr->getNodeInfo(nodeId);
+}
+
+Uint32
+SignalSender::getNoOfConnectedNodes() const {
+ return theFacade->theClusterMgr->getNoOfConnectedNodes();
+}
+
+SendStatus
+SignalSender::sendSignal(Uint16 nodeId, const SimpleSignal * s){
+ return theFacade->theTransporterRegistry->prepareSend(&s->header,
+ 1, // JBB
+ &s->theData[0],
+ nodeId,
+ &s->ptr[0]);
+}
+
+template<class T>
+SimpleSignal *
+SignalSender::waitFor(Uint32 timeOutMillis, T & t)
+{
+ SimpleSignal * s = t.check(m_jobBuffer);
+ if(s != 0){
+ return s;
+ }
+
+ NDB_TICKS now = NdbTick_CurrentMillisecond();
+ NDB_TICKS stop = now + timeOutMillis;
+ Uint32 wait = (timeOutMillis == 0 ? 10 : timeOutMillis);
+ do {
+ NdbCondition_WaitTimeout(m_cond,
+ theFacade->theMutexPtr,
+ wait);
+
+
+ SimpleSignal * s = t.check(m_jobBuffer);
+ if(s != 0){
+ m_usedBuffer.push_back(s);
+ return s;
+ }
+
+ now = NdbTick_CurrentMillisecond();
+ wait = (timeOutMillis == 0 ? 10 : stop - now);
+ } while(stop > now || timeOutMillis == 0);
+
+ return 0;
+}
+
+class WaitForAny {
+public:
+ SimpleSignal * check(Vector<SimpleSignal*> & m_jobBuffer){
+ if(m_jobBuffer.size() > 0){
+ SimpleSignal * s = m_jobBuffer[0];
+ m_jobBuffer.erase(0);
+ return s;
+ }
+ return 0;
+ }
+};
+
+SimpleSignal *
+SignalSender::waitFor(Uint32 timeOutMillis){
+
+ WaitForAny w;
+ return waitFor(timeOutMillis, w);
+}
+
+class WaitForNode {
+public:
+ Uint32 m_nodeId;
+ SimpleSignal * check(Vector<SimpleSignal*> & m_jobBuffer){
+ Uint32 len = m_jobBuffer.size();
+ for(Uint32 i = 0; i<len; i++){
+ if(refToNode(m_jobBuffer[i]->header.theSendersBlockRef) == m_nodeId){
+ SimpleSignal * s = m_jobBuffer[i];
+ m_jobBuffer.erase(i);
+ return s;
+ }
+ }
+ return 0;
+ }
+};
+
+SimpleSignal *
+SignalSender::waitFor(Uint16 nodeId, Uint32 timeOutMillis){
+
+ WaitForNode w;
+ w.m_nodeId = nodeId;
+ return waitFor(timeOutMillis, w);
+}
+
+#include <NdbApiSignal.hpp>
+
+void
+SignalSender::execSignal(void* signalSender,
+ NdbApiSignal* signal,
+ class LinearSectionPtr ptr[3]){
+ SimpleSignal * s = new SimpleSignal(true);
+ s->header = * signal;
+ memcpy(&s->theData[0], signal->getDataPtr(), 4 * s->header.theLength);
+ for(Uint32 i = 0; i<s->header.m_noOfSections; i++){
+ s->ptr[i].p = new Uint32[ptr[i].sz];
+ s->ptr[i].sz = ptr[i].sz;
+ memcpy(s->ptr[i].p, ptr[i].p, 4 * ptr[i].sz);
+ }
+ SignalSender * ss = (SignalSender*)signalSender;
+ ss->m_jobBuffer.push_back(s);
+ NdbCondition_Signal(ss->m_cond);
+}
+
+void
+SignalSender::execNodeStatus(void* signalSender,
+ Uint32 nodeId,
+ bool alive,
+ bool nfCompleted){
+ if (alive) {
+ // node connected
+ return;
+ }
+
+ SimpleSignal * s = new SimpleSignal(true);
+ SignalSender * ss = (SignalSender*)signalSender;
+
+ // node disconnected
+ if(nfCompleted)
+ {
+ // node shutdown complete
+ s->header.theVerId_signalNumber = GSN_NF_COMPLETEREP;
+ NFCompleteRep *rep = (NFCompleteRep *)s->getDataPtrSend();
+ rep->failedNodeId = nodeId;
+ }
+ else
+ {
+ // node failure
+ s->header.theVerId_signalNumber = GSN_NODE_FAILREP;
+ NodeFailRep *rep = (NodeFailRep *)s->getDataPtrSend();
+ rep->failNo = nodeId;
+ }
+
+ ss->m_jobBuffer.push_back(s);
+ NdbCondition_Signal(ss->m_cond);
+}
+
+template SimpleSignal* SignalSender::waitFor<WaitForNode>(unsigned, WaitForNode&);
+template SimpleSignal* SignalSender::waitFor<WaitForAny>(unsigned, WaitForAny&);
+template Vector<SimpleSignal*>;
+
diff --git a/ndb/src/ndbapi/SignalSender.hpp b/ndb/src/ndbapi/SignalSender.hpp
new file mode 100644
index 00000000000..4b991460034
--- /dev/null
+++ b/ndb/src/ndbapi/SignalSender.hpp
@@ -0,0 +1,83 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef SIGNAL_SENDER_HPP
+#define SIGNAL_SENDER_HPP
+
+#include <ndb_global.h>
+#include "TransporterFacade.hpp"
+#include <Vector.hpp>
+
+struct SimpleSignal {
+public:
+ SimpleSignal(bool dealloc = false);
+ ~SimpleSignal();
+
+ void set(class SignalSender&,
+ Uint8 trace, Uint16 recBlock, Uint16 gsn, Uint32 len);
+
+ struct SignalHeader header;
+ Uint32 theData[25];
+ LinearSectionPtr ptr[3];
+
+ int readSignalNumber() {return header.theVerId_signalNumber; }
+ Uint32 *getDataPtrSend() { return theData; }
+ const Uint32 *getDataPtr() const { return theData; }
+
+ void print(FILE * out = stdout);
+private:
+ bool deallocSections;
+};
+
+class SignalSender {
+public:
+ SignalSender(TransporterFacade *facade);
+ virtual ~SignalSender();
+
+ int lock();
+ int unlock();
+
+ Uint32 getOwnRef() const;
+ Uint32 getAliveNode() const;
+ const ClusterMgr::Node &getNodeInfo(Uint16 nodeId) const;
+ Uint32 getNoOfConnectedNodes() const;
+
+ SendStatus sendSignal(Uint16 nodeId, const SimpleSignal *);
+
+ SimpleSignal * waitFor(Uint32 timeOutMillis = 0);
+ SimpleSignal * waitFor(Uint16 nodeId, Uint32 timeOutMillis = 0);
+ SimpleSignal * waitFor(Uint16 nodeId, Uint16 gsn, Uint32 timeOutMillis = 0);
+private:
+ int m_blockNo;
+ TransporterFacade * theFacade;
+
+ static void execSignal(void* signalSender,
+ NdbApiSignal* signal,
+ class LinearSectionPtr ptr[3]);
+
+ static void execNodeStatus(void* signalSender, Uint32 nodeId,
+ bool alive, bool nfCompleted);
+
+ int m_lock;
+ struct NdbCondition * m_cond;
+ Vector<SimpleSignal *> m_jobBuffer;
+ Vector<SimpleSignal *> m_usedBuffer;
+
+ template<class T>
+ SimpleSignal * waitFor(Uint32 timeOutMillis, T & t);
+};
+
+#endif