summaryrefslogtreecommitdiff
path: root/ndb/src/ndbapi/NdbEventOperationImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ndb/src/ndbapi/NdbEventOperationImpl.cpp')
-rw-r--r--ndb/src/ndbapi/NdbEventOperationImpl.cpp1305
1 files changed, 1305 insertions, 0 deletions
diff --git a/ndb/src/ndbapi/NdbEventOperationImpl.cpp b/ndb/src/ndbapi/NdbEventOperationImpl.cpp
new file mode 100644
index 00000000000..d167b8205a2
--- /dev/null
+++ b/ndb/src/ndbapi/NdbEventOperationImpl.cpp
@@ -0,0 +1,1305 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+
+#include <stdio.h>
+
+
+#include "NdbDictionaryImpl.hpp"
+#include "API.hpp"
+#include <NdbOut.hpp>
+#include <AttrType.hpp>
+#include "NdbApiSignal.hpp"
+#include "TransporterFacade.hpp"
+#include <signaldata/GetTabInfo.hpp>
+#include <signaldata/DictTabInfo.hpp>
+#include <signaldata/CreateTable.hpp>
+#include <signaldata/CreateIndx.hpp>
+#include <signaldata/CreateEvnt.hpp>
+#include <signaldata/SumaImpl.hpp>
+#include <signaldata/DropTable.hpp>
+#include <signaldata/DropIndx.hpp>
+#include <signaldata/ListTables.hpp>
+#include <SimpleProperties.hpp>
+#include <Bitmask.hpp>
+#include <AttributeHeader.hpp>
+#include <AttributeList.hpp>
+#include <ndb_types.h>
+#include <kernel_types.h>
+#include <NdbError.hpp>
+#include <BaseString.hpp>
+#include <UtilBuffer.hpp>
+#include <NdbDictionary.hpp>
+#include <Ndb.hpp>
+#include "NdbImpl.hpp"
+#include "DictCache.hpp"
+#include <portlib/NdbMem.h>
+#include <NdbRecAttr.hpp>
+#include <NdbEventOperation.hpp>
+#include "NdbEventOperationImpl.hpp"
+
+/*
+ * Class NdbEventOperationImpl
+ *
+ *
+ */
+
+//#define EVENT_DEBUG
+
+
+NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
+ Ndb *theNdb,
+ const char* eventName,
+ const int bufferLength)
+ : NdbEventOperation(*this), m_ndb(theNdb),
+ m_state(ERROR), m_bufferL(bufferLength)
+{
+
+ m_eventId = 0;
+ theFirstRecAttrs[0] = NULL;
+ theCurrentRecAttrs[0] = NULL;
+ theFirstRecAttrs[1] = NULL;
+ theCurrentRecAttrs[1] = NULL;
+ sdata = NULL;
+ ptr[0].p = NULL;
+ ptr[1].p = NULL;
+ ptr[2].p = NULL;
+
+ // we should lookup id in Dictionary, TODO
+ // also make sure we only have one listener on each event
+
+ if (!m_ndb) { ndbout_c("m_ndb=NULL"); return; }
+
+ NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
+ if (!myDict) { ndbout_c("getDictionary=NULL"); return; }
+
+ const NdbDictionary::Event *myEvnt = myDict->getEvent(eventName);
+ if (!myEvnt) { ndbout_c("getEvent()=NULL"); return; }
+
+ m_eventImpl = &myEvnt->m_impl;
+ if (!m_eventImpl) { ndbout_c("m_impl=NULL"); return; }
+
+ m_bufferHandle = m_ndb->getGlobalEventBufferHandle();
+ if (m_bufferHandle->m_bufferL > 0)
+ m_bufferL =m_bufferHandle->m_bufferL;
+ else
+ m_bufferHandle->m_bufferL = m_bufferL;
+
+ m_state = CREATED;
+}
+
+NdbEventOperationImpl::~NdbEventOperationImpl()
+{
+ if (sdata) NdbMem_Free(sdata);
+ for (int i=0 ; i<3; i++) {
+ if (ptr[i].p) NdbMem_Free(ptr[i].p);
+ }
+ for (int i=0 ; i<2; i++) {
+ NdbRecAttr *p = theFirstRecAttrs[i];
+ while (p) {
+ NdbRecAttr *p_next = p->next();
+ m_ndb->releaseRecAttr(p);
+ p = p_next;
+ }
+ }
+ if (m_state == NdbEventOperation::EXECUTING) {
+ stop();
+ // m_bufferHandle->dropSubscribeEvent(m_bufferId);
+ ; // We should send stop signal here
+ }
+}
+
+NdbEventOperation::State
+NdbEventOperationImpl::getState()
+{
+ return m_state;
+}
+
+NdbRecAttr*
+NdbEventOperationImpl::getValue(const char *colName, char *aValue, int n)
+{
+ if (m_state != NdbEventOperation::CREATED) {
+ ndbout_c("NdbEventOperationImpl::getValue may only be called between instantiation and execute()");
+ return NULL;
+ }
+
+ NdbColumnImpl *tAttrInfo = m_eventImpl->m_tableImpl->getColumn(colName);
+
+ if (tAttrInfo == NULL) {
+ ndbout_c("NdbEventOperationImpl::getValue attribute %s not found",colName);
+ return NULL;
+ }
+
+ return NdbEventOperationImpl::getValue(tAttrInfo, aValue, n);
+}
+
+NdbRecAttr*
+NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, int n)
+{
+ // Insert Attribute Id into ATTRINFO part.
+ NdbRecAttr *&theFirstRecAttr = theFirstRecAttrs[n];
+ NdbRecAttr *&theCurrentRecAttr = theCurrentRecAttrs[n];
+
+ /************************************************************************
+ * Get a Receive Attribute object and link it into the operation object.
+ ************************************************************************/
+ NdbRecAttr *tRecAttr = m_ndb->getRecAttr();
+ if (tRecAttr == NULL) {
+ exit(-1);
+ //setErrorCodeAbort(4000);
+ return NULL;
+ }
+
+ /**********************************************************************
+ * Now set the attribute identity and the pointer to the data in
+ * the RecAttr object
+ * Also set attribute size, array size and attribute type
+ ********************************************************************/
+ if (tRecAttr->setup(tAttrInfo, aValue)) {
+ //setErrorCodeAbort(4000);
+ m_ndb->releaseRecAttr(tRecAttr);
+ exit(-1);
+ return NULL;
+ }
+ //theErrorLine++;
+
+ tRecAttr->setUNDEFINED();
+
+ // We want to keep the list sorted to make data insertion easier later
+ if (theFirstRecAttr == NULL) {
+ theFirstRecAttr = tRecAttr;
+ theCurrentRecAttr = tRecAttr;
+ tRecAttr->next(NULL);
+ } else {
+ Uint32 tAttrId = tAttrInfo->m_attrId;
+ if (tAttrId > theCurrentRecAttr->attrId()) { // right order
+ theCurrentRecAttr->next(tRecAttr);
+ tRecAttr->next(NULL);
+ theCurrentRecAttr = tRecAttr;
+ } else if (theFirstRecAttr->next() == NULL || // only one in list
+ theFirstRecAttr->attrId() > tAttrId) {// or first
+ tRecAttr->next(theFirstRecAttr);
+ theFirstRecAttr = tRecAttr;
+ } else { // at least 2 in list and not first and not last
+ NdbRecAttr *p = theFirstRecAttr;
+ NdbRecAttr *p_next = p->next();
+ while (tAttrId > p_next->attrId()) {
+ p = p_next;
+ p_next = p->next();
+ }
+ if (tAttrId == p_next->attrId()) { // Using same attribute twice
+ tRecAttr->release(); // do I need to do this?
+ m_ndb->releaseRecAttr(tRecAttr);
+ exit(-1);
+ return NULL;
+ }
+ // this is it, between p and p_next
+ p->next(tRecAttr);
+ tRecAttr->next(p_next);
+ }
+ }
+
+ return tRecAttr;
+}
+
+int
+NdbEventOperationImpl::execute()
+{
+ NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
+ if (!myDict) {
+ ndbout_c("NdbEventOperation::execute(): getDictionary=NULL");
+ return 0;
+ }
+
+ if (theFirstRecAttrs[0] == NULL) { // defaults to get all
+
+ }
+
+ NdbDictionaryImpl & myDictImpl = NdbDictionaryImpl::getImpl(*myDict);
+
+
+ int hasSubscriber;
+ m_bufferId =
+ m_bufferHandle->prepareAddSubscribeEvent(m_eventImpl->m_eventId,
+ hasSubscriber /* return value */);
+
+ m_eventImpl->m_bufferId = m_bufferId;
+
+ int r = -1;
+ if (m_bufferId >= 0) {
+ // now we check if there's already a subscriber
+
+ if (hasSubscriber == 0) { // only excute if there's no other subscribers
+ r = myDictImpl.executeSubscribeEvent(*m_eventImpl);
+ } else {
+ r = 0;
+ }
+ if (r) {
+ //Error
+ m_bufferHandle->unprepareAddSubscribeEvent(m_bufferId);
+ m_state = NdbEventOperation::ERROR;
+ } else {
+ m_bufferHandle->addSubscribeEvent(m_bufferId, this);
+ m_state = NdbEventOperation::EXECUTING;
+ }
+ } else {
+ //Error
+ m_state = NdbEventOperation::ERROR;
+ }
+ return r;
+}
+
+int
+NdbEventOperationImpl::stop()
+{
+ if (m_state != NdbEventOperation::EXECUTING)
+ return -1;
+
+ // ndbout_c("NdbEventOperation::stopping()");
+
+ NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
+ if (!myDict) {
+ ndbout_c("NdbEventOperation::stop(): getDictionary=NULL");
+ return 0;
+ }
+
+ NdbDictionaryImpl & myDictImpl = NdbDictionaryImpl::getImpl(*myDict);
+
+ int hasSubscriber;
+ int ret =
+ m_bufferHandle->prepareDropSubscribeEvent(m_bufferId,
+ hasSubscriber /* return value */);
+
+ if (ret < 0) {
+ ndbout_c("prepareDropSubscribeEvent failed");
+ return -1;
+ }
+ // m_eventImpl->m_bufferId = m_bufferId;
+
+ int r = -1;
+
+ if (hasSubscriber == 0) { // only excute if there's no other subscribers
+ r = myDictImpl.stopSubscribeEvent(*m_eventImpl);
+#ifdef EVENT_DEBUG
+ ndbout_c("NdbEventOperation::stopping() done");
+#endif
+ } else
+ r = 0;
+
+ if (r) {
+ //Error
+ m_bufferHandle->unprepareDropSubscribeEvent(m_bufferId);
+ m_state = NdbEventOperation::ERROR;
+ } else {
+#ifdef EVENT_DEBUG
+ ndbout_c("NdbEventOperation::dropping()");
+#endif
+ m_bufferHandle->dropSubscribeEvent(m_bufferId);
+ m_state = NdbEventOperation::CREATED;
+ }
+
+
+ return r;
+}
+
+bool
+NdbEventOperationImpl::isConsistent()
+{
+ return sdata->isGCIConsistent();
+}
+
+Uint32
+NdbEventOperationImpl::getGCI()
+{
+ return sdata->gci;
+}
+
+Uint32
+NdbEventOperationImpl::getLatestGCI()
+{
+ return NdbGlobalEventBufferHandle::getLatestGCI();
+}
+
+int
+NdbEventOperationImpl::next(int *pOverrun)
+{
+ int nr = 10000; // a high value
+ int tmpOverrun = 0;
+ int *ptmpOverrun;
+ if (pOverrun) {
+ ptmpOverrun = &tmpOverrun;
+ } else
+ ptmpOverrun = NULL;
+
+ while (nr > 0) {
+ int r=NdbGlobalEventBufferHandle::getDataL(m_bufferId, sdata,
+ ptr, pOverrun);
+ if (pOverrun) {
+ tmpOverrun += *pOverrun;
+ *pOverrun = tmpOverrun;
+ }
+
+ if (r <= 0) return r; // no data
+
+ if (r < nr) r = nr; else nr--; // we don't want to be stuck here forever
+
+#ifdef EVENT_DEBUG
+ ndbout_c("!!!!!!!sdata->operation %u", (Uint32)sdata->operation);
+#endif
+
+ // now move the data into the RecAttrs
+ if ((theFirstRecAttrs[0] == NULL) &&
+ (theFirstRecAttrs[1] == NULL)) return r;
+ // no copying since no RecAttr's
+
+
+ Uint32 *aAttrPtr = ptr[0].p;
+ Uint32 *aAttrEndPtr = aAttrPtr + ptr[0].sz;
+ Uint32 *aDataPtr = ptr[1].p;
+
+#ifdef EVENT_DEBUG
+ printf("after values sz=%u\n", ptr[1].sz);
+ for(int i=0; i < ptr[1].sz; i++)
+ printf ("H'%.8X ",ptr[1].p[i]);
+ printf("\n");
+ printf("before values sz=%u\n", ptr[2].sz);
+ for(int i=0; i < ptr[2].sz; i++)
+ printf ("H'%.8X ",ptr[2].p[i]);
+ printf("\n");
+#endif
+
+ NdbRecAttr *tWorkingRecAttr = theFirstRecAttrs[0];
+
+ // copy data into the RecAttr's
+ // we assume that the respective attribute lists are sorted
+
+ Uint32 tRecAttrId;
+ Uint32 tAttrId;
+ Uint32 tDataSz;
+ int hasSomeData=0;
+ while ((aAttrPtr < aAttrEndPtr) && (tWorkingRecAttr != NULL)) {
+ tRecAttrId = tWorkingRecAttr->attrId();
+ tAttrId = AttributeHeader(*aAttrPtr).getAttributeId();
+ tDataSz = AttributeHeader(*aAttrPtr).getDataSize();
+
+ while (tAttrId > tRecAttrId) {
+ //printf("[%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId);
+ tWorkingRecAttr->setUNDEFINED();
+ tWorkingRecAttr = tWorkingRecAttr->next();
+ if (tWorkingRecAttr == NULL)
+ break;
+ tRecAttrId = tWorkingRecAttr->attrId();
+ }
+ if (tWorkingRecAttr == NULL)
+ break;
+
+ //printf("[%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId);
+
+ if (tAttrId == tRecAttrId) {
+ tWorkingRecAttr->setNotNULL();
+ if (!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey())
+ hasSomeData++;
+
+ //printf("set!\n");
+
+ Uint32 *theRef = (Uint32*)tWorkingRecAttr->aRef();
+ Uint32 *theEndRef = theRef + tDataSz;
+ while (theRef < theEndRef)
+ *theRef++ = *aDataPtr++;
+
+ // move forward, data has already moved forward
+ aAttrPtr++;
+ tWorkingRecAttr = tWorkingRecAttr->next();
+ } else {
+ // move only attr forward
+ aAttrPtr++;
+ aDataPtr += tDataSz;
+ }
+ }
+
+ while (tWorkingRecAttr != NULL) {
+ tRecAttrId = tWorkingRecAttr->attrId();
+ //printf("set undefined [%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId);
+ tWorkingRecAttr->setUNDEFINED();
+ tWorkingRecAttr = tWorkingRecAttr->next();
+ }
+
+ tWorkingRecAttr = theFirstRecAttrs[1];
+ aDataPtr = ptr[2].p;
+ Uint32 *aDataEndPtr = aDataPtr + ptr[2].sz;
+ while ((aDataPtr < aDataEndPtr) && (tWorkingRecAttr != NULL)) {
+ tRecAttrId = tWorkingRecAttr->attrId();
+ tAttrId = AttributeHeader(*aDataPtr).getAttributeId();
+ tDataSz = AttributeHeader(*aDataPtr).getDataSize();
+ aDataPtr++;
+ while (tAttrId > tRecAttrId) {
+ tWorkingRecAttr->setUNDEFINED();
+ tWorkingRecAttr = tWorkingRecAttr->next();
+ if (tWorkingRecAttr == NULL)
+ break;
+ tRecAttrId = tWorkingRecAttr->attrId();
+ }
+ if (tWorkingRecAttr == NULL)
+ break;
+ if (tAttrId == tRecAttrId) {
+ tWorkingRecAttr->setNotNULL();
+
+ if (!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey())
+ hasSomeData++;
+
+ Uint32 *theRef = (Uint32*)tWorkingRecAttr->aRef();
+ Uint32 *theEndRef = theRef + tDataSz;
+ while (theRef < theEndRef)
+ *theRef++ = *aDataPtr++;
+
+ // move forward, data+attr has already moved forward
+ tWorkingRecAttr = tWorkingRecAttr->next();
+ } else {
+ // move only data+attr forward
+ aDataPtr += tDataSz;
+ }
+ }
+ while (tWorkingRecAttr != NULL) {
+ tWorkingRecAttr->setUNDEFINED();
+ tWorkingRecAttr = tWorkingRecAttr->next();
+ }
+
+ if (hasSomeData)
+ return r;
+ }
+ return 0;
+}
+
+NdbDictionary::Event::TableEvent
+NdbEventOperationImpl::getEventType()
+{
+ switch (sdata->operation) {
+ case TriggerEvent::TE_INSERT:
+ return NdbDictionary::Event::TE_INSERT;
+ case TriggerEvent::TE_DELETE:
+ return NdbDictionary::Event::TE_DELETE;
+ case TriggerEvent::TE_UPDATE:
+ return NdbDictionary::Event::TE_UPDATE;
+ default:
+ return NdbDictionary::Event::TE_ALL;
+ }
+}
+
+void
+NdbEventOperationImpl::printRecAttr(NdbRecAttr *p)
+{
+ int size = p->attrSize();
+ int aSize = p->arraySize();
+
+ switch(p->attrType()){
+ case UnSigned:
+ switch(size) {
+ case 8: ndbout << p->u_64_value(); break;
+ case 4: ndbout << p->u_32_value(); break;
+ case 2: ndbout << p->u_short_value(); break;
+ case 1: ndbout << (unsigned) p->u_char_value(); break;
+ default: ndbout << "Unknown size" << endl;
+ }
+ break;
+
+ case Signed:
+ switch(size) {
+ case 8: ndbout << p->int64_value(); break;
+ case 4: ndbout << p->int32_value(); break;
+ case 2: ndbout << p->short_value(); break;
+ case 1: ndbout << (int) p->char_value(); break;
+ default: ndbout << "Unknown size" << endl;
+ }
+ break;
+
+ case String:
+ {
+ char* buf = new char[aSize+1];
+ memcpy(buf, p->aRef(), aSize);
+ buf[aSize] = 0;
+ ndbout << buf;
+ delete [] buf;
+ }
+ break;
+
+ case Float:
+ ndbout << p->float_value();
+ break;
+
+ default:
+ ndbout << "Unknown";
+ break;
+ }
+}
+
+void
+NdbEventOperationImpl::print()
+{
+ ndbout << "EventId " << m_eventId << "\n";
+
+ for (int i = 0; i < 2; i++) {
+ NdbRecAttr *p = theFirstRecAttrs[i];
+ ndbout << " %u " << i;
+ while (p) {
+ ndbout << " : " << p->attrId() << " = ";
+ printRecAttr(p);
+ p = p->next();
+ }
+ ndbout << "\n";
+ }
+}
+
+void
+NdbEventOperationImpl::printAll()
+{
+ Uint32 *aAttrPtr = ptr[0].p;
+ Uint32 *aAttrEndPtr = aAttrPtr + ptr[0].sz;
+ Uint32 *aDataPtr = ptr[1].p;
+
+ //tRecAttr->setup(tAttrInfo, aValue)) {
+
+ Uint32 tAttrId;
+ Uint32 tDataSz;
+ for (; aAttrPtr < aAttrEndPtr; ) {
+ tAttrId = AttributeHeader(*aAttrPtr).getAttributeId();
+ tDataSz = AttributeHeader(*aAttrPtr).getDataSize();
+
+ aAttrPtr++;
+ aDataPtr += tDataSz;
+ }
+}
+
+
+int NdbEventOperationImpl::wait(void *p, int aMillisecondNumber)
+{
+ return ((NdbGlobalEventBufferHandle*)p)->wait(aMillisecondNumber);
+}
+
+/*
+ * Global variable ndbGlobalEventBuffer
+ * Class NdbGlobalEventBufferHandle
+ * Class NdbGlobalEventBuffer
+ *
+ */
+
+#define ADD_DROP_LOCK_GUARDR(TYPE, FN) \
+{ \
+ ndbGlobalEventBuffer->add_drop_lock(); \
+ ndbGlobalEventBuffer->lock(); \
+ TYPE r = ndbGlobalEventBuffer->FN; \
+ ndbGlobalEventBuffer->unlock(); \
+ if (r < 0) { \
+ ndbGlobalEventBuffer->add_drop_unlock(); \
+ } \
+ return r;\
+}
+#define GUARDR(TYPE, FN) \
+{ \
+ ndbGlobalEventBuffer->lock(); \
+ TYPE r = ndbGlobalEventBuffer->FN; \
+ ndbGlobalEventBuffer->unlock(); \
+ return r;\
+}
+#define GUARD(FN) \
+{ \
+ ndbGlobalEventBuffer->lock(); \
+ ndbGlobalEventBuffer->FN; \
+ ndbGlobalEventBuffer->unlock(); \
+}
+#define ADD_DROP_UNLOCK_GUARD(FN) \
+{ \
+ GUARD(FN); \
+ ndbGlobalEventBuffer->add_drop_unlock(); \
+}
+#define GUARDBLOCK(BLOCK) \
+{ \
+ ndbGlobalEventBuffer->lock(); \
+ BLOCK \
+ ndbGlobalEventBuffer->unlock(); \
+}
+
+/*
+ * Global variable ndbGlobalEventBuffer
+ *
+ */
+
+static NdbGlobalEventBuffer *ndbGlobalEventBuffer=NULL;
+#ifdef NDB_WIN32
+static NdbMutex & ndbGlobalEventBufferMutex = * NdbMutex_Create();
+#else
+static NdbMutex ndbGlobalEventBufferMutex = NDB_MUTEX_INITIALIZER;
+#endif
+
+/*
+ * Class NdbGlobalEventBufferHandle
+ * Each Ndb object has a Handle. This Handle is used to access the
+ * global NdbGlobalEventBuffer instance ndbGlobalEventBuffer
+ */
+
+NdbGlobalEventBufferHandle *
+NdbGlobalEventBuffer_init(int n)
+{
+ return new NdbGlobalEventBufferHandle(n);
+ // return NdbGlobalEventBufferHandle::init(n);
+}
+
+void
+NdbGlobalEventBuffer_drop(NdbGlobalEventBufferHandle *h)
+{
+ delete h;
+}
+
+NdbGlobalEventBufferHandle::NdbGlobalEventBufferHandle
+(int MAX_NUMBER_ACTIVE_EVENTS) : m_bufferL(0), m_nids(0)
+{
+ if ((p_cond = NdbCondition_Create()) == NULL) {
+ ndbout_c("NdbGlobalEventBufferHandle: NdbCondition_Create() failed");
+ exit(-1);
+ }
+
+ NdbMutex_Lock(&ndbGlobalEventBufferMutex);
+ if (ndbGlobalEventBuffer == NULL) {
+ if (ndbGlobalEventBuffer == NULL) {
+ ndbGlobalEventBuffer = new NdbGlobalEventBuffer();
+ if (!ndbGlobalEventBuffer) {
+ NdbMutex_Unlock(&ndbGlobalEventBufferMutex);
+ ndbout_c("NdbGlobalEventBufferHandle:: failed to allocate ndbGlobalEventBuffer");
+ exit(-1);
+ }
+ }
+ }
+ NdbMutex_Unlock(&ndbGlobalEventBufferMutex);
+
+ GUARD(real_init(this,MAX_NUMBER_ACTIVE_EVENTS));
+}
+
+NdbGlobalEventBufferHandle::~NdbGlobalEventBufferHandle()
+{
+ NdbCondition_Destroy(p_cond);
+
+ ndbGlobalEventBuffer->lock();
+ ndbGlobalEventBuffer->real_remove(this);
+ ndbGlobalEventBuffer->unlock();
+
+ NdbMutex_Lock(&ndbGlobalEventBufferMutex);
+ if (ndbGlobalEventBuffer->m_handlers.size() == 0) {
+ delete ndbGlobalEventBuffer;
+ ndbGlobalEventBuffer = NULL;
+ }
+ NdbMutex_Unlock(&ndbGlobalEventBufferMutex);
+}
+
+void
+NdbGlobalEventBufferHandle::addBufferId(int bufferId)
+{
+ if (m_nids >= NDB_MAX_ACTIVE_EVENTS) {
+ ndbout_c("NdbGlobalEventBufferHandle::addBufferId error in paramerer setting");
+ exit(-1);
+ }
+ m_bufferIds[m_nids] = bufferId;
+ m_nids++;
+}
+
+void
+NdbGlobalEventBufferHandle::dropBufferId(int bufferId)
+{
+ for (int i = 0; i < m_nids; i++)
+ if (m_bufferIds[i] == bufferId) {
+ m_nids--;
+ for (; i < m_nids; i++)
+ m_bufferIds[i] = m_bufferIds[i+1];
+ return;
+ }
+ ndbout_c("NdbGlobalEventBufferHandle::dropBufferId %d does not exist",
+ bufferId);
+ exit(-1);
+}
+/*
+NdbGlobalEventBufferHandle *
+NdbGlobalEventBufferHandle::init (int MAX_NUMBER_ACTIVE_EVENTS)
+{
+ return new NdbGlobalEventBufferHandle();
+}
+void
+NdbGlobalEventBufferHandle::drop(NdbGlobalEventBufferHandle *handle)
+{
+ delete handle;
+}
+*/
+int
+NdbGlobalEventBufferHandle::prepareAddSubscribeEvent(Uint32 eventId,
+ int& hasSubscriber)
+{
+ ADD_DROP_LOCK_GUARDR(int,real_prepareAddSubscribeEvent(this, eventId, hasSubscriber));
+}
+void
+NdbGlobalEventBufferHandle::addSubscribeEvent
+(int bufferId, NdbEventOperationImpl *ndbEventOperationImpl)
+{
+ ADD_DROP_UNLOCK_GUARD(real_addSubscribeEvent(bufferId, ndbEventOperationImpl));
+}
+void
+NdbGlobalEventBufferHandle::unprepareAddSubscribeEvent(int bufferId)
+{
+ ADD_DROP_UNLOCK_GUARD(real_unprepareAddSubscribeEvent(bufferId));
+}
+
+int
+NdbGlobalEventBufferHandle::prepareDropSubscribeEvent(int bufferId,
+ int& hasSubscriber)
+{
+ ADD_DROP_LOCK_GUARDR(int,real_prepareDropSubscribeEvent(bufferId, hasSubscriber));
+}
+
+void
+NdbGlobalEventBufferHandle::unprepareDropSubscribeEvent(int bufferId)
+{
+ ADD_DROP_UNLOCK_GUARD(real_unprepareDropSubscribeEvent(bufferId));
+}
+
+void
+NdbGlobalEventBufferHandle::dropSubscribeEvent(int bufferId)
+{
+ ADD_DROP_UNLOCK_GUARD(real_dropSubscribeEvent(bufferId));
+}
+
+int
+NdbGlobalEventBufferHandle::insertDataL(int bufferId,
+ const SubTableData * const sdata,
+ LinearSectionPtr ptr[3])
+{
+ GUARDR(int,real_insertDataL(bufferId,sdata,ptr));
+}
+
+void
+NdbGlobalEventBufferHandle::latestGCI(int bufferId, Uint32 gci)
+{
+ GUARD(real_latestGCI(bufferId,gci));
+}
+
+Uint32
+NdbGlobalEventBufferHandle::getLatestGCI()
+{
+ GUARDR(Uint32, real_getLatestGCI());
+}
+
+inline void
+NdbGlobalEventBufferHandle::group_lock()
+{
+ ndbGlobalEventBuffer->group_lock();
+}
+
+inline void
+NdbGlobalEventBufferHandle::group_unlock()
+{
+ ndbGlobalEventBuffer->group_unlock();
+}
+
+int
+NdbGlobalEventBufferHandle::wait(int aMillisecondNumber)
+{
+ GUARDR(int, real_wait(this, aMillisecondNumber));
+}
+
+int NdbGlobalEventBufferHandle::getDataL(const int bufferId,
+ SubTableData * &sdata,
+ LinearSectionPtr ptr[3],
+ int *pOverrun)
+{
+ GUARDR(int,real_getDataL(bufferId,sdata,ptr,pOverrun));
+}
+
+/*
+ * Class NdbGlobalEventBuffer
+ *
+ *
+ */
+
+
+void
+NdbGlobalEventBuffer::lock()
+{
+ if (!m_group_lock_flag)
+ NdbMutex_Lock(&ndbGlobalEventBufferMutex);
+}
+void
+NdbGlobalEventBuffer::unlock()
+{
+ if (!m_group_lock_flag)
+ NdbMutex_Unlock(&ndbGlobalEventBufferMutex);
+}
+void
+NdbGlobalEventBuffer::add_drop_lock()
+{
+ NdbMutex_Lock(p_add_drop_mutex);
+}
+void
+NdbGlobalEventBuffer::add_drop_unlock()
+{
+ NdbMutex_Unlock(p_add_drop_mutex);
+}
+inline void
+NdbGlobalEventBuffer::group_lock()
+{
+ lock();
+ m_group_lock_flag = 1;
+}
+
+inline void
+NdbGlobalEventBuffer::group_unlock()
+{
+ m_group_lock_flag = 0;
+ unlock();
+}
+
+void
+NdbGlobalEventBuffer::lockB(int bufferId)
+{
+ NdbMutex_Lock(m_buf[ID(bufferId)].p_buf_mutex);
+}
+void
+NdbGlobalEventBuffer::unlockB(int bufferId)
+{
+ NdbMutex_Lock(m_buf[ID(bufferId)].p_buf_mutex);
+}
+
+// Private methods
+
+NdbGlobalEventBuffer::NdbGlobalEventBuffer() :
+ m_handlers(),
+ m_group_lock_flag(0),
+ m_latestGCI(0),
+ m_no(0) // must start at ZERO!
+{
+ if ((p_add_drop_mutex = NdbMutex_Create()) == NULL) {
+ ndbout_c("NdbGlobalEventBuffer: NdbMutex_Create() failed");
+ exit(-1);
+ }
+}
+
+NdbGlobalEventBuffer::~NdbGlobalEventBuffer()
+{
+ NdbMutex_Destroy(p_add_drop_mutex);
+ // NdbMem_Deallocate(m_eventBufferIdToEventId);
+}
+void
+NdbGlobalEventBuffer::real_init (NdbGlobalEventBufferHandle *h,
+ int MAX_NUMBER_ACTIVE_EVENTS)
+{
+ if (m_handlers.size() == 0) { // First init
+ m_max = MAX_NUMBER_ACTIVE_EVENTS;
+ m_buf = new BufItem[m_max];
+ // (BufItem *)NdbMem_Allocate(m_max*sizeof(BufItem));
+
+ for (int i=0; i<m_max; i++) {
+ m_buf[i].gId = 0;
+ }
+ }
+ // TODO make sure we don't hit roof
+ // m_handlers[m_nhandlers] = h;
+ m_handlers.push_back(h);
+ // ndbout_c("NdbGlobalEventBuffer::real_init(), m_handles=%u %u", m_nhandlers, h);
+}
+void
+NdbGlobalEventBuffer::real_remove(NdbGlobalEventBufferHandle *h)
+{
+ // ndbout_c("NdbGlobalEventBuffer::real_init_remove(), m_handles=%u %u", m_nhandlers, h);
+ for (Uint32 i=0 ; i < m_handlers.size(); i++) {
+ // ndbout_c("%u %u %u", i, m_handlers[i], h);
+ if (m_handlers[i] == h) {
+ m_handlers.erase(i);
+ if (m_handlers.size() == 0) {
+ // ndbout_c("last to go");
+ delete[] m_buf;
+ m_buf = NULL;
+ // NdbMem_Free((char*)m_buf);
+ }
+ return;
+ }
+ }
+ ndbout_c("NdbGlobalEventBuffer::real_init_remove() non-existing handle");
+ exit(-1);
+}
+
+int
+NdbGlobalEventBuffer::real_prepareAddSubscribeEvent
+(NdbGlobalEventBufferHandle *aHandle, Uint32 eventId, int& hasSubscriber)
+{
+ int i;
+ int bufferId = -1;
+
+ // add_drop_lock(); // only one thread can do add or drop at a time
+
+ // Find place where eventId already set
+ for (i=0; i<m_no; i++) {
+ if (m_buf[i].gId == eventId) {
+ bufferId = i;
+ break;
+ }
+ }
+ if (bufferId < 0) {
+ // find space for new bufferId
+ for (i=0; i<m_no; i++) {
+ if (m_buf[i].gId == 0) {
+ bufferId = i; // we found an empty spot
+ break;
+ }
+ }
+ if (bufferId < 0 &&
+ m_no < m_max) {
+ // room for more so get that
+ bufferId=m_no;
+ m_buf[m_no].gId = 0;
+ m_no++;
+ } else {
+ ndbout_c("prepareAddSubscribeEvent: Can't accept more subscribers");
+ // add_drop_unlock();
+ return -1;
+ }
+ }
+
+ BufItem &b = m_buf[ID(bufferId)];
+
+ if (b.gId == 0) { // first subscriber needs some initialization
+
+ bufferId = NO_ID(0, bufferId);
+
+ b.gId = eventId;
+
+ if ((b.p_buf_mutex = NdbMutex_Create()) == NULL) {
+ ndbout_c("NdbGlobalEventBuffer: NdbMutex_Create() failed");
+ exit(-1);
+ }
+
+ b.subs = 0;
+ b.f = 0;
+ b.sz = 0;
+ b.max_sz = aHandle->m_bufferL;
+ b.data =
+ (BufItem::Data *)NdbMem_Allocate(b.max_sz*sizeof(BufItem::Data));
+ for (int i = 0; i < b.max_sz; i++) {
+ b.data[i].sdata = NULL;
+ b.data[i].ptr[0].p = NULL;
+ b.data[i].ptr[1].p = NULL;
+ b.data[i].ptr[2].p = NULL;
+ }
+ } else {
+#ifdef EVENT_DEBUG
+ ndbout_c("NdbGlobalEventBuffer::prepareAddSubscribeEvent: TRYING handle one subscriber per event b.subs = %u", b.subs);
+#endif
+
+ int ni = -1;
+ for(int i=0; i < b.subs;i++) {
+ if (b.ps[i].theHandle == NULL) {
+ ni = i;
+ break;
+ }
+ }
+ if (ni < 0) {
+ if (b.subs < MAX_SUBSCRIBERS_PER_EVENT) {
+ ni = b.subs;
+ } else {
+ ndbout_c("prepareAddSubscribeEvent: Can't accept more subscribers");
+ // add_drop_unlock();
+ return -1;
+ }
+ }
+ bufferId = NO_ID(ni, bufferId);
+ }
+
+ // initialize BufItem::Ps
+ {
+ int n = NO(bufferId);
+ NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[n];
+ e.theHandle = aHandle;
+ e.b=0;
+ e.bufferempty = 1;
+ e.overrun=0; // set to -1 to handle first insert
+ }
+
+ if (b.subs > 0)
+ hasSubscriber = 1;
+ else
+ hasSubscriber = 0;
+
+#ifdef EVENT_DEBUG
+ ndbout_c("prepareAddSubscribeEvent: handed out bufferId %d for eventId %d",
+ bufferId, eventId);
+#endif
+
+ /* we now have a lock on the prepare so that no one can mess with this
+ * unlock comes in unprepareAddSubscribeEvent or addSubscribeEvent
+ */
+ return bufferId;
+}
+
+void
+NdbGlobalEventBuffer::real_unprepareAddSubscribeEvent(int bufferId)
+{
+ BufItem &b = m_buf[ID(bufferId)];
+ int n = NO(bufferId);
+
+ b.ps[n].theHandle = NULL;
+
+ // remove subscribers from the end,
+ // we have to keep gaps since the position
+ // has been handed out in bufferId
+ for (int i = b.subs-1; i >= 0; i--)
+ if (b.ps[i].theHandle == NULL)
+ b.subs--;
+ else
+ break;
+
+ if (b.subs == 0) {
+#ifdef EVENT_DEBUG
+ ndbout_c("unprepareAddSubscribeEvent: no more subscribers left on eventId %d", b.gId);
+#endif
+ b.gId = 0; // We don't have any subscribers, reuse BufItem
+ if (b.data) {
+ NdbMem_Free((void *)b.data);
+ b.data = NULL;
+ }
+ if (b.p_buf_mutex) {
+ NdbMutex_Destroy(b.p_buf_mutex);
+ b.p_buf_mutex = NULL;
+ }
+ }
+ // add_drop_unlock();
+}
+
+void
+NdbGlobalEventBuffer::real_addSubscribeEvent(int bufferId,
+ void *ndbEventOperation)
+{
+ BufItem &b = m_buf[ID(bufferId)];
+ int n = NO(bufferId);
+
+ b.subs++;
+ b.ps[n].theHandle->addBufferId(bufferId);
+
+ // add_drop_unlock();
+#ifdef EVENT_DEBUG
+ ndbout_c("addSubscribeEvent:: added bufferId %d", bufferId);
+#endif
+}
+
+void
+NdbGlobalEventBuffer::real_unprepareDropSubscribeEvent(int bufferId)
+{
+ // add_drop_unlock(); // only one thread can do add or drop at a time
+}
+
+int
+NdbGlobalEventBuffer::real_prepareDropSubscribeEvent(int bufferId,
+ int& hasSubscriber)
+{
+ // add_drop_lock(); // only one thread can do add or drop at a time
+
+ BufItem &b = m_buf[ID(bufferId)];
+
+ int n = 0;
+ for(int i=0; i < b.subs;i++) {
+ if (b.ps[i].theHandle != NULL)
+ n++;
+ }
+
+ if (n > 1)
+ hasSubscriber = 1;
+ else if (n == 1)
+ hasSubscriber = 0;
+ else
+ return -1;
+
+ return 0;
+}
+
+void
+NdbGlobalEventBuffer::real_dropSubscribeEvent(int bufferId)
+{
+ // add_drop_lock(); // only one thread can do add-drop at a time
+
+ BufItem &b = m_buf[ID(bufferId)];
+ int n = NO(bufferId);
+
+ b.ps[n].overrun=0;
+ b.ps[n].bufferempty=1;
+ b.ps[n].b=0;
+ b.ps[n].theHandle->dropBufferId(bufferId);
+
+ real_unprepareAddSubscribeEvent(bufferId); // does add_drop_unlock();
+
+#ifdef EVENT_DEBUG
+ ndbout_c("dropSubscribeEvent:: dropped bufferId %d", bufferId);
+#endif
+}
+
+void
+NdbGlobalEventBuffer::real_latestGCI(int bufferId, Uint32 gci)
+{
+ if (gci > m_latestGCI)
+ m_latestGCI = gci;
+ else if ((m_latestGCI-gci) > 0xffff) // If NDB stays up :-)
+ m_latestGCI = gci;
+}
+
+Uint32
+NdbGlobalEventBuffer::real_getLatestGCI()
+{
+ return m_latestGCI;
+}
+
+int
+NdbGlobalEventBuffer::real_insertDataL(int bufferId,
+ const SubTableData * const sdata,
+ LinearSectionPtr ptr[3])
+{
+ BufItem &b = m_buf[ID(bufferId)];
+#ifdef EVENT_DEBUG
+ int n = NO(bufferId);
+#endif
+ {
+ if (b.subs) {
+#ifdef EVENT_DEBUG
+ ndbout_c("data insertion in buffer %d with eventId %d", bufferId, b.gId);
+#endif
+ // move front forward
+ if (copy_data_alloc(sdata, ptr,
+ b.data[b.f].sdata, b.data[b.f].ptr))
+ return -1;
+ for (int i=0; i < b.subs; i++) {
+ NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[i];
+ if (e.theHandle) { // active subscriber
+ if (b.f == e.b) { // next-to-read == written
+ if (e.bufferempty == 0) {
+ e.overrun++; // another item has been overwritten
+ e.b++; // move next-to-read next since old item was overwritten
+ if (e.b == b.max_sz) e.b = 0; // start from beginning
+ }
+ }
+ e.bufferempty = 0;
+ // signal subscriber that there's more to get
+ NdbCondition_Signal(e.theHandle->p_cond);
+ }
+ }
+ b.f++; // move next-to-write
+ if (b.f == b.max_sz) b.f = 0; // start from beginning
+#ifdef EVENT_DEBUG
+ ndbout_c("Front= %d Back = %d overun = %d", b.f,
+ b.ps[n].b, b.ps[n].overrun);
+#endif
+ } else {
+#ifdef EVENT_DEBUG
+ ndbout_c("Data arrived before ready eventId", b.gId);
+#endif
+ }
+ }
+ return 0;
+}
+
+int NdbGlobalEventBuffer::hasData(int bufferId) {
+ BufItem &b = m_buf[ID(bufferId)];
+ int n = NO(bufferId);
+ NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[n];
+
+ if(e.bufferempty)
+ return 0;
+
+ if (b.f <= e.b)
+ return b.max_sz-e.b + b.f;
+ else
+ return b.f-e.b;
+}
+
+int NdbGlobalEventBuffer::real_getDataL(const int bufferId,
+ SubTableData * &sdata,
+ LinearSectionPtr ptr[3],
+ int *pOverrun)
+{
+ BufItem &b = m_buf[ID(bufferId)];
+ int n = NO(bufferId);
+ NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[n];
+
+ if (pOverrun) {
+ *pOverrun = e.overrun;
+ e.overrun = 0; // if pOverrun is returned to user reset e.overrun
+ }
+
+ if (e.bufferempty)
+ return 0; // nothing to get
+
+ if (copy_data_alloc(b.data[e.b].sdata, b.data[e.b].ptr,
+ sdata, ptr))
+ return -1;
+
+ e.b++; if (e.b == b.max_sz) e.b = 0; // move next-to-read forward
+
+ if (b.f == e.b) // back has cought up with front
+ e.bufferempty = 1;
+
+#ifdef EVENT_DEBUG
+ ndbout_c("getting data from buffer %d with eventId %d", bufferId, b.gId);
+#endif
+
+ return hasData(bufferId)+1;
+}
+int
+NdbGlobalEventBuffer::copy_data_alloc(const SubTableData * const f_sdata,
+ LinearSectionPtr f_ptr[3],
+ SubTableData * &t_sdata,
+ LinearSectionPtr t_ptr[3])
+{
+ if (t_sdata == NULL) {
+ t_sdata = (SubTableData *)NdbMem_Allocate(sizeof(SubTableData));
+ }
+ memcpy(t_sdata,f_sdata,sizeof(SubTableData));
+ for (int i = 0; i < 3; i++) {
+ LinearSectionPtr & f_p = f_ptr[i];
+ LinearSectionPtr & t_p = t_ptr[i];
+ if (f_p.sz > 0) {
+ if (t_p.p == NULL) {
+ t_p.p = (Uint32 *)NdbMem_Allocate(sizeof(Uint32)*f_p.sz);
+ } else if (t_p.sz != f_p.sz) {
+ NdbMem_Free(t_p.p);
+ t_p.p = (Uint32 *)NdbMem_Allocate(sizeof(Uint32)*f_p.sz);
+ }
+ memcpy(t_p.p, f_p.p, sizeof(Uint32)*f_p.sz);
+ } else if (t_p.p != NULL) {
+ NdbMem_Free(t_p.p);
+ t_p.p = NULL;
+ }
+ t_p.sz = f_p.sz;
+ }
+ return 0;
+}
+int
+NdbGlobalEventBuffer::real_wait(NdbGlobalEventBufferHandle *h,
+ int aMillisecondNumber)
+{
+ // check if there are anything in any of the buffers
+ int n = 0;
+ for (int i = 0; i < h->m_nids; i++)
+ n += hasData(h->m_bufferIds[i]);
+ if (n) return n;
+
+ int r = NdbCondition_WaitTimeout(h->p_cond, &ndbGlobalEventBufferMutex, aMillisecondNumber);
+ if (r > 0)
+ return -1;
+
+ n = 0;
+ for (int i = 0; i < h->m_nids; i++)
+ n += hasData(h->m_bufferIds[i]);
+ return n;
+}