diff options
Diffstat (limited to 'ndb/src/ndbapi/NdbEventOperationImpl.cpp')
-rw-r--r-- | ndb/src/ndbapi/NdbEventOperationImpl.cpp | 1305 |
1 files changed, 1305 insertions, 0 deletions
diff --git a/ndb/src/ndbapi/NdbEventOperationImpl.cpp b/ndb/src/ndbapi/NdbEventOperationImpl.cpp new file mode 100644 index 00000000000..d167b8205a2 --- /dev/null +++ b/ndb/src/ndbapi/NdbEventOperationImpl.cpp @@ -0,0 +1,1305 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + + +#include <stdio.h> + + +#include "NdbDictionaryImpl.hpp" +#include "API.hpp" +#include <NdbOut.hpp> +#include <AttrType.hpp> +#include "NdbApiSignal.hpp" +#include "TransporterFacade.hpp" +#include <signaldata/GetTabInfo.hpp> +#include <signaldata/DictTabInfo.hpp> +#include <signaldata/CreateTable.hpp> +#include <signaldata/CreateIndx.hpp> +#include <signaldata/CreateEvnt.hpp> +#include <signaldata/SumaImpl.hpp> +#include <signaldata/DropTable.hpp> +#include <signaldata/DropIndx.hpp> +#include <signaldata/ListTables.hpp> +#include <SimpleProperties.hpp> +#include <Bitmask.hpp> +#include <AttributeHeader.hpp> +#include <AttributeList.hpp> +#include <ndb_types.h> +#include <kernel_types.h> +#include <NdbError.hpp> +#include <BaseString.hpp> +#include <UtilBuffer.hpp> +#include <NdbDictionary.hpp> +#include <Ndb.hpp> +#include "NdbImpl.hpp" +#include "DictCache.hpp" +#include <portlib/NdbMem.h> +#include <NdbRecAttr.hpp> +#include <NdbEventOperation.hpp> +#include "NdbEventOperationImpl.hpp" + +/* + * Class NdbEventOperationImpl + * + * + */ + +//#define EVENT_DEBUG + + +NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N, + Ndb *theNdb, + const char* eventName, + const int bufferLength) + : NdbEventOperation(*this), m_ndb(theNdb), + m_state(ERROR), m_bufferL(bufferLength) +{ + + m_eventId = 0; + theFirstRecAttrs[0] = NULL; + theCurrentRecAttrs[0] = NULL; + theFirstRecAttrs[1] = NULL; + theCurrentRecAttrs[1] = NULL; + sdata = NULL; + ptr[0].p = NULL; + ptr[1].p = NULL; + ptr[2].p = NULL; + + // we should lookup id in Dictionary, TODO + // also make sure we only have one listener on each event + + if (!m_ndb) { ndbout_c("m_ndb=NULL"); return; } + + NdbDictionary::Dictionary *myDict = m_ndb->getDictionary(); + if (!myDict) { ndbout_c("getDictionary=NULL"); return; } + + const NdbDictionary::Event *myEvnt = myDict->getEvent(eventName); + if (!myEvnt) { ndbout_c("getEvent()=NULL"); return; } + + m_eventImpl = &myEvnt->m_impl; + if (!m_eventImpl) { ndbout_c("m_impl=NULL"); return; } + + m_bufferHandle = m_ndb->getGlobalEventBufferHandle(); + if (m_bufferHandle->m_bufferL > 0) + m_bufferL =m_bufferHandle->m_bufferL; + else + m_bufferHandle->m_bufferL = m_bufferL; + + m_state = CREATED; +} + +NdbEventOperationImpl::~NdbEventOperationImpl() +{ + if (sdata) NdbMem_Free(sdata); + for (int i=0 ; i<3; i++) { + if (ptr[i].p) NdbMem_Free(ptr[i].p); + } + for (int i=0 ; i<2; i++) { + NdbRecAttr *p = theFirstRecAttrs[i]; + while (p) { + NdbRecAttr *p_next = p->next(); + m_ndb->releaseRecAttr(p); + p = p_next; + } + } + if (m_state == NdbEventOperation::EXECUTING) { + stop(); + // m_bufferHandle->dropSubscribeEvent(m_bufferId); + ; // We should send stop signal here + } +} + +NdbEventOperation::State +NdbEventOperationImpl::getState() +{ + return m_state; +} + +NdbRecAttr* +NdbEventOperationImpl::getValue(const char *colName, char *aValue, int n) +{ + if (m_state != NdbEventOperation::CREATED) { + ndbout_c("NdbEventOperationImpl::getValue may only be called between instantiation and execute()"); + return NULL; + } + + NdbColumnImpl *tAttrInfo = m_eventImpl->m_tableImpl->getColumn(colName); + + if (tAttrInfo == NULL) { + ndbout_c("NdbEventOperationImpl::getValue attribute %s not found",colName); + return NULL; + } + + return NdbEventOperationImpl::getValue(tAttrInfo, aValue, n); +} + +NdbRecAttr* +NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, int n) +{ + // Insert Attribute Id into ATTRINFO part. + NdbRecAttr *&theFirstRecAttr = theFirstRecAttrs[n]; + NdbRecAttr *&theCurrentRecAttr = theCurrentRecAttrs[n]; + + /************************************************************************ + * Get a Receive Attribute object and link it into the operation object. + ************************************************************************/ + NdbRecAttr *tRecAttr = m_ndb->getRecAttr(); + if (tRecAttr == NULL) { + exit(-1); + //setErrorCodeAbort(4000); + return NULL; + } + + /********************************************************************** + * Now set the attribute identity and the pointer to the data in + * the RecAttr object + * Also set attribute size, array size and attribute type + ********************************************************************/ + if (tRecAttr->setup(tAttrInfo, aValue)) { + //setErrorCodeAbort(4000); + m_ndb->releaseRecAttr(tRecAttr); + exit(-1); + return NULL; + } + //theErrorLine++; + + tRecAttr->setUNDEFINED(); + + // We want to keep the list sorted to make data insertion easier later + if (theFirstRecAttr == NULL) { + theFirstRecAttr = tRecAttr; + theCurrentRecAttr = tRecAttr; + tRecAttr->next(NULL); + } else { + Uint32 tAttrId = tAttrInfo->m_attrId; + if (tAttrId > theCurrentRecAttr->attrId()) { // right order + theCurrentRecAttr->next(tRecAttr); + tRecAttr->next(NULL); + theCurrentRecAttr = tRecAttr; + } else if (theFirstRecAttr->next() == NULL || // only one in list + theFirstRecAttr->attrId() > tAttrId) {// or first + tRecAttr->next(theFirstRecAttr); + theFirstRecAttr = tRecAttr; + } else { // at least 2 in list and not first and not last + NdbRecAttr *p = theFirstRecAttr; + NdbRecAttr *p_next = p->next(); + while (tAttrId > p_next->attrId()) { + p = p_next; + p_next = p->next(); + } + if (tAttrId == p_next->attrId()) { // Using same attribute twice + tRecAttr->release(); // do I need to do this? + m_ndb->releaseRecAttr(tRecAttr); + exit(-1); + return NULL; + } + // this is it, between p and p_next + p->next(tRecAttr); + tRecAttr->next(p_next); + } + } + + return tRecAttr; +} + +int +NdbEventOperationImpl::execute() +{ + NdbDictionary::Dictionary *myDict = m_ndb->getDictionary(); + if (!myDict) { + ndbout_c("NdbEventOperation::execute(): getDictionary=NULL"); + return 0; + } + + if (theFirstRecAttrs[0] == NULL) { // defaults to get all + + } + + NdbDictionaryImpl & myDictImpl = NdbDictionaryImpl::getImpl(*myDict); + + + int hasSubscriber; + m_bufferId = + m_bufferHandle->prepareAddSubscribeEvent(m_eventImpl->m_eventId, + hasSubscriber /* return value */); + + m_eventImpl->m_bufferId = m_bufferId; + + int r = -1; + if (m_bufferId >= 0) { + // now we check if there's already a subscriber + + if (hasSubscriber == 0) { // only excute if there's no other subscribers + r = myDictImpl.executeSubscribeEvent(*m_eventImpl); + } else { + r = 0; + } + if (r) { + //Error + m_bufferHandle->unprepareAddSubscribeEvent(m_bufferId); + m_state = NdbEventOperation::ERROR; + } else { + m_bufferHandle->addSubscribeEvent(m_bufferId, this); + m_state = NdbEventOperation::EXECUTING; + } + } else { + //Error + m_state = NdbEventOperation::ERROR; + } + return r; +} + +int +NdbEventOperationImpl::stop() +{ + if (m_state != NdbEventOperation::EXECUTING) + return -1; + + // ndbout_c("NdbEventOperation::stopping()"); + + NdbDictionary::Dictionary *myDict = m_ndb->getDictionary(); + if (!myDict) { + ndbout_c("NdbEventOperation::stop(): getDictionary=NULL"); + return 0; + } + + NdbDictionaryImpl & myDictImpl = NdbDictionaryImpl::getImpl(*myDict); + + int hasSubscriber; + int ret = + m_bufferHandle->prepareDropSubscribeEvent(m_bufferId, + hasSubscriber /* return value */); + + if (ret < 0) { + ndbout_c("prepareDropSubscribeEvent failed"); + return -1; + } + // m_eventImpl->m_bufferId = m_bufferId; + + int r = -1; + + if (hasSubscriber == 0) { // only excute if there's no other subscribers + r = myDictImpl.stopSubscribeEvent(*m_eventImpl); +#ifdef EVENT_DEBUG + ndbout_c("NdbEventOperation::stopping() done"); +#endif + } else + r = 0; + + if (r) { + //Error + m_bufferHandle->unprepareDropSubscribeEvent(m_bufferId); + m_state = NdbEventOperation::ERROR; + } else { +#ifdef EVENT_DEBUG + ndbout_c("NdbEventOperation::dropping()"); +#endif + m_bufferHandle->dropSubscribeEvent(m_bufferId); + m_state = NdbEventOperation::CREATED; + } + + + return r; +} + +bool +NdbEventOperationImpl::isConsistent() +{ + return sdata->isGCIConsistent(); +} + +Uint32 +NdbEventOperationImpl::getGCI() +{ + return sdata->gci; +} + +Uint32 +NdbEventOperationImpl::getLatestGCI() +{ + return NdbGlobalEventBufferHandle::getLatestGCI(); +} + +int +NdbEventOperationImpl::next(int *pOverrun) +{ + int nr = 10000; // a high value + int tmpOverrun = 0; + int *ptmpOverrun; + if (pOverrun) { + ptmpOverrun = &tmpOverrun; + } else + ptmpOverrun = NULL; + + while (nr > 0) { + int r=NdbGlobalEventBufferHandle::getDataL(m_bufferId, sdata, + ptr, pOverrun); + if (pOverrun) { + tmpOverrun += *pOverrun; + *pOverrun = tmpOverrun; + } + + if (r <= 0) return r; // no data + + if (r < nr) r = nr; else nr--; // we don't want to be stuck here forever + +#ifdef EVENT_DEBUG + ndbout_c("!!!!!!!sdata->operation %u", (Uint32)sdata->operation); +#endif + + // now move the data into the RecAttrs + if ((theFirstRecAttrs[0] == NULL) && + (theFirstRecAttrs[1] == NULL)) return r; + // no copying since no RecAttr's + + + Uint32 *aAttrPtr = ptr[0].p; + Uint32 *aAttrEndPtr = aAttrPtr + ptr[0].sz; + Uint32 *aDataPtr = ptr[1].p; + +#ifdef EVENT_DEBUG + printf("after values sz=%u\n", ptr[1].sz); + for(int i=0; i < ptr[1].sz; i++) + printf ("H'%.8X ",ptr[1].p[i]); + printf("\n"); + printf("before values sz=%u\n", ptr[2].sz); + for(int i=0; i < ptr[2].sz; i++) + printf ("H'%.8X ",ptr[2].p[i]); + printf("\n"); +#endif + + NdbRecAttr *tWorkingRecAttr = theFirstRecAttrs[0]; + + // copy data into the RecAttr's + // we assume that the respective attribute lists are sorted + + Uint32 tRecAttrId; + Uint32 tAttrId; + Uint32 tDataSz; + int hasSomeData=0; + while ((aAttrPtr < aAttrEndPtr) && (tWorkingRecAttr != NULL)) { + tRecAttrId = tWorkingRecAttr->attrId(); + tAttrId = AttributeHeader(*aAttrPtr).getAttributeId(); + tDataSz = AttributeHeader(*aAttrPtr).getDataSize(); + + while (tAttrId > tRecAttrId) { + //printf("[%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId); + tWorkingRecAttr->setUNDEFINED(); + tWorkingRecAttr = tWorkingRecAttr->next(); + if (tWorkingRecAttr == NULL) + break; + tRecAttrId = tWorkingRecAttr->attrId(); + } + if (tWorkingRecAttr == NULL) + break; + + //printf("[%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId); + + if (tAttrId == tRecAttrId) { + tWorkingRecAttr->setNotNULL(); + if (!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey()) + hasSomeData++; + + //printf("set!\n"); + + Uint32 *theRef = (Uint32*)tWorkingRecAttr->aRef(); + Uint32 *theEndRef = theRef + tDataSz; + while (theRef < theEndRef) + *theRef++ = *aDataPtr++; + + // move forward, data has already moved forward + aAttrPtr++; + tWorkingRecAttr = tWorkingRecAttr->next(); + } else { + // move only attr forward + aAttrPtr++; + aDataPtr += tDataSz; + } + } + + while (tWorkingRecAttr != NULL) { + tRecAttrId = tWorkingRecAttr->attrId(); + //printf("set undefined [%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId); + tWorkingRecAttr->setUNDEFINED(); + tWorkingRecAttr = tWorkingRecAttr->next(); + } + + tWorkingRecAttr = theFirstRecAttrs[1]; + aDataPtr = ptr[2].p; + Uint32 *aDataEndPtr = aDataPtr + ptr[2].sz; + while ((aDataPtr < aDataEndPtr) && (tWorkingRecAttr != NULL)) { + tRecAttrId = tWorkingRecAttr->attrId(); + tAttrId = AttributeHeader(*aDataPtr).getAttributeId(); + tDataSz = AttributeHeader(*aDataPtr).getDataSize(); + aDataPtr++; + while (tAttrId > tRecAttrId) { + tWorkingRecAttr->setUNDEFINED(); + tWorkingRecAttr = tWorkingRecAttr->next(); + if (tWorkingRecAttr == NULL) + break; + tRecAttrId = tWorkingRecAttr->attrId(); + } + if (tWorkingRecAttr == NULL) + break; + if (tAttrId == tRecAttrId) { + tWorkingRecAttr->setNotNULL(); + + if (!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey()) + hasSomeData++; + + Uint32 *theRef = (Uint32*)tWorkingRecAttr->aRef(); + Uint32 *theEndRef = theRef + tDataSz; + while (theRef < theEndRef) + *theRef++ = *aDataPtr++; + + // move forward, data+attr has already moved forward + tWorkingRecAttr = tWorkingRecAttr->next(); + } else { + // move only data+attr forward + aDataPtr += tDataSz; + } + } + while (tWorkingRecAttr != NULL) { + tWorkingRecAttr->setUNDEFINED(); + tWorkingRecAttr = tWorkingRecAttr->next(); + } + + if (hasSomeData) + return r; + } + return 0; +} + +NdbDictionary::Event::TableEvent +NdbEventOperationImpl::getEventType() +{ + switch (sdata->operation) { + case TriggerEvent::TE_INSERT: + return NdbDictionary::Event::TE_INSERT; + case TriggerEvent::TE_DELETE: + return NdbDictionary::Event::TE_DELETE; + case TriggerEvent::TE_UPDATE: + return NdbDictionary::Event::TE_UPDATE; + default: + return NdbDictionary::Event::TE_ALL; + } +} + +void +NdbEventOperationImpl::printRecAttr(NdbRecAttr *p) +{ + int size = p->attrSize(); + int aSize = p->arraySize(); + + switch(p->attrType()){ + case UnSigned: + switch(size) { + case 8: ndbout << p->u_64_value(); break; + case 4: ndbout << p->u_32_value(); break; + case 2: ndbout << p->u_short_value(); break; + case 1: ndbout << (unsigned) p->u_char_value(); break; + default: ndbout << "Unknown size" << endl; + } + break; + + case Signed: + switch(size) { + case 8: ndbout << p->int64_value(); break; + case 4: ndbout << p->int32_value(); break; + case 2: ndbout << p->short_value(); break; + case 1: ndbout << (int) p->char_value(); break; + default: ndbout << "Unknown size" << endl; + } + break; + + case String: + { + char* buf = new char[aSize+1]; + memcpy(buf, p->aRef(), aSize); + buf[aSize] = 0; + ndbout << buf; + delete [] buf; + } + break; + + case Float: + ndbout << p->float_value(); + break; + + default: + ndbout << "Unknown"; + break; + } +} + +void +NdbEventOperationImpl::print() +{ + ndbout << "EventId " << m_eventId << "\n"; + + for (int i = 0; i < 2; i++) { + NdbRecAttr *p = theFirstRecAttrs[i]; + ndbout << " %u " << i; + while (p) { + ndbout << " : " << p->attrId() << " = "; + printRecAttr(p); + p = p->next(); + } + ndbout << "\n"; + } +} + +void +NdbEventOperationImpl::printAll() +{ + Uint32 *aAttrPtr = ptr[0].p; + Uint32 *aAttrEndPtr = aAttrPtr + ptr[0].sz; + Uint32 *aDataPtr = ptr[1].p; + + //tRecAttr->setup(tAttrInfo, aValue)) { + + Uint32 tAttrId; + Uint32 tDataSz; + for (; aAttrPtr < aAttrEndPtr; ) { + tAttrId = AttributeHeader(*aAttrPtr).getAttributeId(); + tDataSz = AttributeHeader(*aAttrPtr).getDataSize(); + + aAttrPtr++; + aDataPtr += tDataSz; + } +} + + +int NdbEventOperationImpl::wait(void *p, int aMillisecondNumber) +{ + return ((NdbGlobalEventBufferHandle*)p)->wait(aMillisecondNumber); +} + +/* + * Global variable ndbGlobalEventBuffer + * Class NdbGlobalEventBufferHandle + * Class NdbGlobalEventBuffer + * + */ + +#define ADD_DROP_LOCK_GUARDR(TYPE, FN) \ +{ \ + ndbGlobalEventBuffer->add_drop_lock(); \ + ndbGlobalEventBuffer->lock(); \ + TYPE r = ndbGlobalEventBuffer->FN; \ + ndbGlobalEventBuffer->unlock(); \ + if (r < 0) { \ + ndbGlobalEventBuffer->add_drop_unlock(); \ + } \ + return r;\ +} +#define GUARDR(TYPE, FN) \ +{ \ + ndbGlobalEventBuffer->lock(); \ + TYPE r = ndbGlobalEventBuffer->FN; \ + ndbGlobalEventBuffer->unlock(); \ + return r;\ +} +#define GUARD(FN) \ +{ \ + ndbGlobalEventBuffer->lock(); \ + ndbGlobalEventBuffer->FN; \ + ndbGlobalEventBuffer->unlock(); \ +} +#define ADD_DROP_UNLOCK_GUARD(FN) \ +{ \ + GUARD(FN); \ + ndbGlobalEventBuffer->add_drop_unlock(); \ +} +#define GUARDBLOCK(BLOCK) \ +{ \ + ndbGlobalEventBuffer->lock(); \ + BLOCK \ + ndbGlobalEventBuffer->unlock(); \ +} + +/* + * Global variable ndbGlobalEventBuffer + * + */ + +static NdbGlobalEventBuffer *ndbGlobalEventBuffer=NULL; +#ifdef NDB_WIN32 +static NdbMutex & ndbGlobalEventBufferMutex = * NdbMutex_Create(); +#else +static NdbMutex ndbGlobalEventBufferMutex = NDB_MUTEX_INITIALIZER; +#endif + +/* + * Class NdbGlobalEventBufferHandle + * Each Ndb object has a Handle. This Handle is used to access the + * global NdbGlobalEventBuffer instance ndbGlobalEventBuffer + */ + +NdbGlobalEventBufferHandle * +NdbGlobalEventBuffer_init(int n) +{ + return new NdbGlobalEventBufferHandle(n); + // return NdbGlobalEventBufferHandle::init(n); +} + +void +NdbGlobalEventBuffer_drop(NdbGlobalEventBufferHandle *h) +{ + delete h; +} + +NdbGlobalEventBufferHandle::NdbGlobalEventBufferHandle +(int MAX_NUMBER_ACTIVE_EVENTS) : m_bufferL(0), m_nids(0) +{ + if ((p_cond = NdbCondition_Create()) == NULL) { + ndbout_c("NdbGlobalEventBufferHandle: NdbCondition_Create() failed"); + exit(-1); + } + + NdbMutex_Lock(&ndbGlobalEventBufferMutex); + if (ndbGlobalEventBuffer == NULL) { + if (ndbGlobalEventBuffer == NULL) { + ndbGlobalEventBuffer = new NdbGlobalEventBuffer(); + if (!ndbGlobalEventBuffer) { + NdbMutex_Unlock(&ndbGlobalEventBufferMutex); + ndbout_c("NdbGlobalEventBufferHandle:: failed to allocate ndbGlobalEventBuffer"); + exit(-1); + } + } + } + NdbMutex_Unlock(&ndbGlobalEventBufferMutex); + + GUARD(real_init(this,MAX_NUMBER_ACTIVE_EVENTS)); +} + +NdbGlobalEventBufferHandle::~NdbGlobalEventBufferHandle() +{ + NdbCondition_Destroy(p_cond); + + ndbGlobalEventBuffer->lock(); + ndbGlobalEventBuffer->real_remove(this); + ndbGlobalEventBuffer->unlock(); + + NdbMutex_Lock(&ndbGlobalEventBufferMutex); + if (ndbGlobalEventBuffer->m_handlers.size() == 0) { + delete ndbGlobalEventBuffer; + ndbGlobalEventBuffer = NULL; + } + NdbMutex_Unlock(&ndbGlobalEventBufferMutex); +} + +void +NdbGlobalEventBufferHandle::addBufferId(int bufferId) +{ + if (m_nids >= NDB_MAX_ACTIVE_EVENTS) { + ndbout_c("NdbGlobalEventBufferHandle::addBufferId error in paramerer setting"); + exit(-1); + } + m_bufferIds[m_nids] = bufferId; + m_nids++; +} + +void +NdbGlobalEventBufferHandle::dropBufferId(int bufferId) +{ + for (int i = 0; i < m_nids; i++) + if (m_bufferIds[i] == bufferId) { + m_nids--; + for (; i < m_nids; i++) + m_bufferIds[i] = m_bufferIds[i+1]; + return; + } + ndbout_c("NdbGlobalEventBufferHandle::dropBufferId %d does not exist", + bufferId); + exit(-1); +} +/* +NdbGlobalEventBufferHandle * +NdbGlobalEventBufferHandle::init (int MAX_NUMBER_ACTIVE_EVENTS) +{ + return new NdbGlobalEventBufferHandle(); +} +void +NdbGlobalEventBufferHandle::drop(NdbGlobalEventBufferHandle *handle) +{ + delete handle; +} +*/ +int +NdbGlobalEventBufferHandle::prepareAddSubscribeEvent(Uint32 eventId, + int& hasSubscriber) +{ + ADD_DROP_LOCK_GUARDR(int,real_prepareAddSubscribeEvent(this, eventId, hasSubscriber)); +} +void +NdbGlobalEventBufferHandle::addSubscribeEvent +(int bufferId, NdbEventOperationImpl *ndbEventOperationImpl) +{ + ADD_DROP_UNLOCK_GUARD(real_addSubscribeEvent(bufferId, ndbEventOperationImpl)); +} +void +NdbGlobalEventBufferHandle::unprepareAddSubscribeEvent(int bufferId) +{ + ADD_DROP_UNLOCK_GUARD(real_unprepareAddSubscribeEvent(bufferId)); +} + +int +NdbGlobalEventBufferHandle::prepareDropSubscribeEvent(int bufferId, + int& hasSubscriber) +{ + ADD_DROP_LOCK_GUARDR(int,real_prepareDropSubscribeEvent(bufferId, hasSubscriber)); +} + +void +NdbGlobalEventBufferHandle::unprepareDropSubscribeEvent(int bufferId) +{ + ADD_DROP_UNLOCK_GUARD(real_unprepareDropSubscribeEvent(bufferId)); +} + +void +NdbGlobalEventBufferHandle::dropSubscribeEvent(int bufferId) +{ + ADD_DROP_UNLOCK_GUARD(real_dropSubscribeEvent(bufferId)); +} + +int +NdbGlobalEventBufferHandle::insertDataL(int bufferId, + const SubTableData * const sdata, + LinearSectionPtr ptr[3]) +{ + GUARDR(int,real_insertDataL(bufferId,sdata,ptr)); +} + +void +NdbGlobalEventBufferHandle::latestGCI(int bufferId, Uint32 gci) +{ + GUARD(real_latestGCI(bufferId,gci)); +} + +Uint32 +NdbGlobalEventBufferHandle::getLatestGCI() +{ + GUARDR(Uint32, real_getLatestGCI()); +} + +inline void +NdbGlobalEventBufferHandle::group_lock() +{ + ndbGlobalEventBuffer->group_lock(); +} + +inline void +NdbGlobalEventBufferHandle::group_unlock() +{ + ndbGlobalEventBuffer->group_unlock(); +} + +int +NdbGlobalEventBufferHandle::wait(int aMillisecondNumber) +{ + GUARDR(int, real_wait(this, aMillisecondNumber)); +} + +int NdbGlobalEventBufferHandle::getDataL(const int bufferId, + SubTableData * &sdata, + LinearSectionPtr ptr[3], + int *pOverrun) +{ + GUARDR(int,real_getDataL(bufferId,sdata,ptr,pOverrun)); +} + +/* + * Class NdbGlobalEventBuffer + * + * + */ + + +void +NdbGlobalEventBuffer::lock() +{ + if (!m_group_lock_flag) + NdbMutex_Lock(&ndbGlobalEventBufferMutex); +} +void +NdbGlobalEventBuffer::unlock() +{ + if (!m_group_lock_flag) + NdbMutex_Unlock(&ndbGlobalEventBufferMutex); +} +void +NdbGlobalEventBuffer::add_drop_lock() +{ + NdbMutex_Lock(p_add_drop_mutex); +} +void +NdbGlobalEventBuffer::add_drop_unlock() +{ + NdbMutex_Unlock(p_add_drop_mutex); +} +inline void +NdbGlobalEventBuffer::group_lock() +{ + lock(); + m_group_lock_flag = 1; +} + +inline void +NdbGlobalEventBuffer::group_unlock() +{ + m_group_lock_flag = 0; + unlock(); +} + +void +NdbGlobalEventBuffer::lockB(int bufferId) +{ + NdbMutex_Lock(m_buf[ID(bufferId)].p_buf_mutex); +} +void +NdbGlobalEventBuffer::unlockB(int bufferId) +{ + NdbMutex_Lock(m_buf[ID(bufferId)].p_buf_mutex); +} + +// Private methods + +NdbGlobalEventBuffer::NdbGlobalEventBuffer() : + m_handlers(), + m_group_lock_flag(0), + m_latestGCI(0), + m_no(0) // must start at ZERO! +{ + if ((p_add_drop_mutex = NdbMutex_Create()) == NULL) { + ndbout_c("NdbGlobalEventBuffer: NdbMutex_Create() failed"); + exit(-1); + } +} + +NdbGlobalEventBuffer::~NdbGlobalEventBuffer() +{ + NdbMutex_Destroy(p_add_drop_mutex); + // NdbMem_Deallocate(m_eventBufferIdToEventId); +} +void +NdbGlobalEventBuffer::real_init (NdbGlobalEventBufferHandle *h, + int MAX_NUMBER_ACTIVE_EVENTS) +{ + if (m_handlers.size() == 0) { // First init + m_max = MAX_NUMBER_ACTIVE_EVENTS; + m_buf = new BufItem[m_max]; + // (BufItem *)NdbMem_Allocate(m_max*sizeof(BufItem)); + + for (int i=0; i<m_max; i++) { + m_buf[i].gId = 0; + } + } + // TODO make sure we don't hit roof + // m_handlers[m_nhandlers] = h; + m_handlers.push_back(h); + // ndbout_c("NdbGlobalEventBuffer::real_init(), m_handles=%u %u", m_nhandlers, h); +} +void +NdbGlobalEventBuffer::real_remove(NdbGlobalEventBufferHandle *h) +{ + // ndbout_c("NdbGlobalEventBuffer::real_init_remove(), m_handles=%u %u", m_nhandlers, h); + for (Uint32 i=0 ; i < m_handlers.size(); i++) { + // ndbout_c("%u %u %u", i, m_handlers[i], h); + if (m_handlers[i] == h) { + m_handlers.erase(i); + if (m_handlers.size() == 0) { + // ndbout_c("last to go"); + delete[] m_buf; + m_buf = NULL; + // NdbMem_Free((char*)m_buf); + } + return; + } + } + ndbout_c("NdbGlobalEventBuffer::real_init_remove() non-existing handle"); + exit(-1); +} + +int +NdbGlobalEventBuffer::real_prepareAddSubscribeEvent +(NdbGlobalEventBufferHandle *aHandle, Uint32 eventId, int& hasSubscriber) +{ + int i; + int bufferId = -1; + + // add_drop_lock(); // only one thread can do add or drop at a time + + // Find place where eventId already set + for (i=0; i<m_no; i++) { + if (m_buf[i].gId == eventId) { + bufferId = i; + break; + } + } + if (bufferId < 0) { + // find space for new bufferId + for (i=0; i<m_no; i++) { + if (m_buf[i].gId == 0) { + bufferId = i; // we found an empty spot + break; + } + } + if (bufferId < 0 && + m_no < m_max) { + // room for more so get that + bufferId=m_no; + m_buf[m_no].gId = 0; + m_no++; + } else { + ndbout_c("prepareAddSubscribeEvent: Can't accept more subscribers"); + // add_drop_unlock(); + return -1; + } + } + + BufItem &b = m_buf[ID(bufferId)]; + + if (b.gId == 0) { // first subscriber needs some initialization + + bufferId = NO_ID(0, bufferId); + + b.gId = eventId; + + if ((b.p_buf_mutex = NdbMutex_Create()) == NULL) { + ndbout_c("NdbGlobalEventBuffer: NdbMutex_Create() failed"); + exit(-1); + } + + b.subs = 0; + b.f = 0; + b.sz = 0; + b.max_sz = aHandle->m_bufferL; + b.data = + (BufItem::Data *)NdbMem_Allocate(b.max_sz*sizeof(BufItem::Data)); + for (int i = 0; i < b.max_sz; i++) { + b.data[i].sdata = NULL; + b.data[i].ptr[0].p = NULL; + b.data[i].ptr[1].p = NULL; + b.data[i].ptr[2].p = NULL; + } + } else { +#ifdef EVENT_DEBUG + ndbout_c("NdbGlobalEventBuffer::prepareAddSubscribeEvent: TRYING handle one subscriber per event b.subs = %u", b.subs); +#endif + + int ni = -1; + for(int i=0; i < b.subs;i++) { + if (b.ps[i].theHandle == NULL) { + ni = i; + break; + } + } + if (ni < 0) { + if (b.subs < MAX_SUBSCRIBERS_PER_EVENT) { + ni = b.subs; + } else { + ndbout_c("prepareAddSubscribeEvent: Can't accept more subscribers"); + // add_drop_unlock(); + return -1; + } + } + bufferId = NO_ID(ni, bufferId); + } + + // initialize BufItem::Ps + { + int n = NO(bufferId); + NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[n]; + e.theHandle = aHandle; + e.b=0; + e.bufferempty = 1; + e.overrun=0; // set to -1 to handle first insert + } + + if (b.subs > 0) + hasSubscriber = 1; + else + hasSubscriber = 0; + +#ifdef EVENT_DEBUG + ndbout_c("prepareAddSubscribeEvent: handed out bufferId %d for eventId %d", + bufferId, eventId); +#endif + + /* we now have a lock on the prepare so that no one can mess with this + * unlock comes in unprepareAddSubscribeEvent or addSubscribeEvent + */ + return bufferId; +} + +void +NdbGlobalEventBuffer::real_unprepareAddSubscribeEvent(int bufferId) +{ + BufItem &b = m_buf[ID(bufferId)]; + int n = NO(bufferId); + + b.ps[n].theHandle = NULL; + + // remove subscribers from the end, + // we have to keep gaps since the position + // has been handed out in bufferId + for (int i = b.subs-1; i >= 0; i--) + if (b.ps[i].theHandle == NULL) + b.subs--; + else + break; + + if (b.subs == 0) { +#ifdef EVENT_DEBUG + ndbout_c("unprepareAddSubscribeEvent: no more subscribers left on eventId %d", b.gId); +#endif + b.gId = 0; // We don't have any subscribers, reuse BufItem + if (b.data) { + NdbMem_Free((void *)b.data); + b.data = NULL; + } + if (b.p_buf_mutex) { + NdbMutex_Destroy(b.p_buf_mutex); + b.p_buf_mutex = NULL; + } + } + // add_drop_unlock(); +} + +void +NdbGlobalEventBuffer::real_addSubscribeEvent(int bufferId, + void *ndbEventOperation) +{ + BufItem &b = m_buf[ID(bufferId)]; + int n = NO(bufferId); + + b.subs++; + b.ps[n].theHandle->addBufferId(bufferId); + + // add_drop_unlock(); +#ifdef EVENT_DEBUG + ndbout_c("addSubscribeEvent:: added bufferId %d", bufferId); +#endif +} + +void +NdbGlobalEventBuffer::real_unprepareDropSubscribeEvent(int bufferId) +{ + // add_drop_unlock(); // only one thread can do add or drop at a time +} + +int +NdbGlobalEventBuffer::real_prepareDropSubscribeEvent(int bufferId, + int& hasSubscriber) +{ + // add_drop_lock(); // only one thread can do add or drop at a time + + BufItem &b = m_buf[ID(bufferId)]; + + int n = 0; + for(int i=0; i < b.subs;i++) { + if (b.ps[i].theHandle != NULL) + n++; + } + + if (n > 1) + hasSubscriber = 1; + else if (n == 1) + hasSubscriber = 0; + else + return -1; + + return 0; +} + +void +NdbGlobalEventBuffer::real_dropSubscribeEvent(int bufferId) +{ + // add_drop_lock(); // only one thread can do add-drop at a time + + BufItem &b = m_buf[ID(bufferId)]; + int n = NO(bufferId); + + b.ps[n].overrun=0; + b.ps[n].bufferempty=1; + b.ps[n].b=0; + b.ps[n].theHandle->dropBufferId(bufferId); + + real_unprepareAddSubscribeEvent(bufferId); // does add_drop_unlock(); + +#ifdef EVENT_DEBUG + ndbout_c("dropSubscribeEvent:: dropped bufferId %d", bufferId); +#endif +} + +void +NdbGlobalEventBuffer::real_latestGCI(int bufferId, Uint32 gci) +{ + if (gci > m_latestGCI) + m_latestGCI = gci; + else if ((m_latestGCI-gci) > 0xffff) // If NDB stays up :-) + m_latestGCI = gci; +} + +Uint32 +NdbGlobalEventBuffer::real_getLatestGCI() +{ + return m_latestGCI; +} + +int +NdbGlobalEventBuffer::real_insertDataL(int bufferId, + const SubTableData * const sdata, + LinearSectionPtr ptr[3]) +{ + BufItem &b = m_buf[ID(bufferId)]; +#ifdef EVENT_DEBUG + int n = NO(bufferId); +#endif + { + if (b.subs) { +#ifdef EVENT_DEBUG + ndbout_c("data insertion in buffer %d with eventId %d", bufferId, b.gId); +#endif + // move front forward + if (copy_data_alloc(sdata, ptr, + b.data[b.f].sdata, b.data[b.f].ptr)) + return -1; + for (int i=0; i < b.subs; i++) { + NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[i]; + if (e.theHandle) { // active subscriber + if (b.f == e.b) { // next-to-read == written + if (e.bufferempty == 0) { + e.overrun++; // another item has been overwritten + e.b++; // move next-to-read next since old item was overwritten + if (e.b == b.max_sz) e.b = 0; // start from beginning + } + } + e.bufferempty = 0; + // signal subscriber that there's more to get + NdbCondition_Signal(e.theHandle->p_cond); + } + } + b.f++; // move next-to-write + if (b.f == b.max_sz) b.f = 0; // start from beginning +#ifdef EVENT_DEBUG + ndbout_c("Front= %d Back = %d overun = %d", b.f, + b.ps[n].b, b.ps[n].overrun); +#endif + } else { +#ifdef EVENT_DEBUG + ndbout_c("Data arrived before ready eventId", b.gId); +#endif + } + } + return 0; +} + +int NdbGlobalEventBuffer::hasData(int bufferId) { + BufItem &b = m_buf[ID(bufferId)]; + int n = NO(bufferId); + NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[n]; + + if(e.bufferempty) + return 0; + + if (b.f <= e.b) + return b.max_sz-e.b + b.f; + else + return b.f-e.b; +} + +int NdbGlobalEventBuffer::real_getDataL(const int bufferId, + SubTableData * &sdata, + LinearSectionPtr ptr[3], + int *pOverrun) +{ + BufItem &b = m_buf[ID(bufferId)]; + int n = NO(bufferId); + NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[n]; + + if (pOverrun) { + *pOverrun = e.overrun; + e.overrun = 0; // if pOverrun is returned to user reset e.overrun + } + + if (e.bufferempty) + return 0; // nothing to get + + if (copy_data_alloc(b.data[e.b].sdata, b.data[e.b].ptr, + sdata, ptr)) + return -1; + + e.b++; if (e.b == b.max_sz) e.b = 0; // move next-to-read forward + + if (b.f == e.b) // back has cought up with front + e.bufferempty = 1; + +#ifdef EVENT_DEBUG + ndbout_c("getting data from buffer %d with eventId %d", bufferId, b.gId); +#endif + + return hasData(bufferId)+1; +} +int +NdbGlobalEventBuffer::copy_data_alloc(const SubTableData * const f_sdata, + LinearSectionPtr f_ptr[3], + SubTableData * &t_sdata, + LinearSectionPtr t_ptr[3]) +{ + if (t_sdata == NULL) { + t_sdata = (SubTableData *)NdbMem_Allocate(sizeof(SubTableData)); + } + memcpy(t_sdata,f_sdata,sizeof(SubTableData)); + for (int i = 0; i < 3; i++) { + LinearSectionPtr & f_p = f_ptr[i]; + LinearSectionPtr & t_p = t_ptr[i]; + if (f_p.sz > 0) { + if (t_p.p == NULL) { + t_p.p = (Uint32 *)NdbMem_Allocate(sizeof(Uint32)*f_p.sz); + } else if (t_p.sz != f_p.sz) { + NdbMem_Free(t_p.p); + t_p.p = (Uint32 *)NdbMem_Allocate(sizeof(Uint32)*f_p.sz); + } + memcpy(t_p.p, f_p.p, sizeof(Uint32)*f_p.sz); + } else if (t_p.p != NULL) { + NdbMem_Free(t_p.p); + t_p.p = NULL; + } + t_p.sz = f_p.sz; + } + return 0; +} +int +NdbGlobalEventBuffer::real_wait(NdbGlobalEventBufferHandle *h, + int aMillisecondNumber) +{ + // check if there are anything in any of the buffers + int n = 0; + for (int i = 0; i < h->m_nids; i++) + n += hasData(h->m_bufferIds[i]); + if (n) return n; + + int r = NdbCondition_WaitTimeout(h->p_cond, &ndbGlobalEventBufferMutex, aMillisecondNumber); + if (r > 0) + return -1; + + n = 0; + for (int i = 0; i < h->m_nids; i++) + n += hasData(h->m_bufferIds[i]); + return n; +} |