diff options
Diffstat (limited to 'ndb/src/ndbapi/NdbEventOperationImpl.hpp')
-rw-r--r-- | ndb/src/ndbapi/NdbEventOperationImpl.hpp | 206 |
1 files changed, 206 insertions, 0 deletions
diff --git a/ndb/src/ndbapi/NdbEventOperationImpl.hpp b/ndb/src/ndbapi/NdbEventOperationImpl.hpp new file mode 100644 index 00000000000..b7dee084a9f --- /dev/null +++ b/ndb/src/ndbapi/NdbEventOperationImpl.hpp @@ -0,0 +1,206 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +/***************************************************************************** + * Name: NdbEventOperationImpl.hpp + * Include: + * Link: + * Author: Tomas Ulin MySQL AB + * Date: 2003-11-21 + * Version: 0.1 + * Description: Event support + * Documentation: + * Adjust: 2003-11-21 Tomas Ulin First version. + ****************************************************************************/ + +#ifndef NdbEventOperationImpl_H +#define NdbEventOperationImpl_H + +class NdbGlobalEventBufferHandle; +class NdbEventOperationImpl : public NdbEventOperation { +public: + NdbEventOperationImpl(NdbEventOperation &N, + Ndb *theNdb, + const char* eventName, + const int bufferLength); + ~NdbEventOperationImpl(); + + NdbEventOperation::State getState(); + + int execute(); + int stop(); + NdbRecAttr *getValue(const char *colName, char *aValue, int n); + NdbRecAttr *getValue(const NdbColumnImpl *, char *aValue, int n); + static int wait(void *p, int aMillisecondNumber); + int next(int *pOverrun); + bool isConsistent(); + Uint32 getGCI(); + Uint32 getLatestGCI(); + + NdbDictionary::Event::TableEvent getEventType(); + + /* + getOperation(); + getGCI(); + getLogType(); + */ + + void print(); + void printAll(); + void printRecAttr(NdbRecAttr *); + + Ndb *m_ndb; + NdbEventImpl *m_eventImpl; + NdbGlobalEventBufferHandle *m_bufferHandle; + + NdbRecAttr *theFirstRecAttrs[2]; + NdbRecAttr *theCurrentRecAttrs[2]; + + NdbEventOperation::State m_state; + Uint32 m_eventId; + int m_bufferId; + int m_bufferL; + SubTableData *sdata; + LinearSectionPtr ptr[3]; +}; + +class NdbGlobalEventBuffer; +class NdbGlobalEventBufferHandle { +public: + NdbGlobalEventBufferHandle (int MAX_NUMBER_ACTIVE_EVENTS); + ~NdbGlobalEventBufferHandle (); + //static NdbGlobalEventBufferHandle *init(int MAX_NUMBER_ACTIVE_EVENTS); + + // returns bufferId 0-N if ok otherwise -1 + int prepareAddSubscribeEvent(Uint32 eventId, int& hasSubscriber); + void unprepareAddSubscribeEvent(int bufferId); + void addSubscribeEvent(int bufferId, + NdbEventOperationImpl *ndbEventOperationImpl); + + void unprepareDropSubscribeEvent(int bufferId); + int prepareDropSubscribeEvent(int bufferId, int& hasSubscriber); + void dropSubscribeEvent(int bufferId); + + static int getDataL(const int bufferId, + SubTableData * &sdata, + LinearSectionPtr ptr[3], + int *pOverrun); + static int insertDataL(int bufferId, + const SubTableData * const sdata, + LinearSectionPtr ptr[3]); + static void latestGCI(int bufferId, Uint32 gci); + static Uint32 getLatestGCI(); + static Uint32 getEventId(int bufferId); + + void group_lock(); + void group_unlock(); + int wait(int aMillisecondNumber); + int m_bufferL; +private: + friend class NdbGlobalEventBuffer; + void addBufferId(int bufferId); + void dropBufferId(int bufferId); + + struct NdbCondition *p_cond; + int m_nids; + int m_bufferIds[NDB_MAX_ACTIVE_EVENTS]; +}; + +class NdbGlobalEventBuffer { +private: + friend class NdbGlobalEventBufferHandle; + void lockB(int bufferId); + void unlockB(int bufferId); + void group_lock(); + void group_unlock(); + void lock(); + void unlock(); + void add_drop_lock(); + void add_drop_unlock(); + + NdbGlobalEventBuffer(); + ~NdbGlobalEventBuffer(); + + void real_remove(NdbGlobalEventBufferHandle *h); + void real_init(NdbGlobalEventBufferHandle *h, + int MAX_NUMBER_ACTIVE_EVENTS); + + int real_prepareAddSubscribeEvent(NdbGlobalEventBufferHandle *h, + Uint32 eventId, int& hasSubscriber); + void real_unprepareAddSubscribeEvent(int bufferId); + void real_addSubscribeEvent(int bufferId, void *ndbEventOperation); + + void real_unprepareDropSubscribeEvent(int bufferId); + int real_prepareDropSubscribeEvent(int bufferId, + int& hasSubscriber); + void real_dropSubscribeEvent(int bufferId); + + int real_getDataL(const int bufferId, + SubTableData * &sdata, + LinearSectionPtr ptr[3], + int *pOverrun); + int real_insertDataL(int bufferId, + const SubTableData * const sdata, + LinearSectionPtr ptr[3]); + void real_latestGCI(int bufferId, Uint32 gci); + Uint32 real_getLatestGCI(); + int copy_data_alloc(const SubTableData * const f_sdata, + LinearSectionPtr f_ptr[3], + SubTableData * &t_sdata, + LinearSectionPtr t_ptr[3]); + + int real_wait(NdbGlobalEventBufferHandle *, int aMillisecondNumber); + int hasData(int bufferId); + int ID (int bufferId) {return bufferId & 0xFF;}; + int NO (int bufferId) {return bufferId >> 16;}; + int NO_ID (int n, int bufferId) {return (n << 16) | ID(bufferId);}; + + Vector<NdbGlobalEventBufferHandle*> m_handlers; + + // Global Mutex used for some things + NdbMutex *p_add_drop_mutex; + + int m_group_lock_flag; + Uint32 m_latestGCI; + + int m_no; + int m_max; +#define MAX_SUBSCRIBERS_PER_EVENT 16 + struct BufItem { + // local mutex for each event/buffer + NdbMutex *p_buf_mutex; + Uint32 gId; + struct Data { + SubTableData *sdata; + LinearSectionPtr ptr[3]; + } * data; + + struct Ps { + NdbGlobalEventBufferHandle *theHandle; + int b; + int overrun; + int bufferempty; + //void *ndbEventOperation; + } ps[MAX_SUBSCRIBERS_PER_EVENT]; // only supports 1 subscriber so far + + int subs; + int f; + int sz; + int max_sz; + }; + BufItem *m_buf; +}; +#endif |