diff options
26 files changed, 9 insertions, 6580 deletions
diff --git a/ndb/include/Makefile.am b/ndb/include/Makefile.am index 240101c2004..842f4daabee 100644 --- a/ndb/include/Makefile.am +++ b/ndb/include/Makefile.am @@ -15,7 +15,6 @@ ndbapi/NdbApi.hpp \ ndbapi/NdbTransaction.hpp \ ndbapi/NdbDictionary.hpp \ ndbapi/NdbError.hpp \ -ndbapi/NdbEventOperation.hpp \ ndbapi/NdbIndexOperation.hpp \ ndbapi/NdbOperation.hpp \ ndbapi/ndb_cluster_connection.hpp \ diff --git a/ndb/include/kernel/GlobalSignalNumbers.h b/ndb/include/kernel/GlobalSignalNumbers.h index ca82806f4b1..ac8ac75dc41 100644 --- a/ndb/include/kernel/GlobalSignalNumbers.h +++ b/ndb/include/kernel/GlobalSignalNumbers.h @@ -731,9 +731,11 @@ extern const GlobalSignalNumber NO_OF_SIGNAL_NAMES; #define GSN_SUB_CREATE_REQ 576 #define GSN_SUB_CREATE_REF 577 #define GSN_SUB_CREATE_CONF 578 +/* #define GSN_SUB_START_REQ 579 #define GSN_SUB_START_REF 580 #define GSN_SUB_START_CONF 581 +*/ #define GSN_SUB_SYNC_REQ 582 #define GSN_SUB_SYNC_REF 583 #define GSN_SUB_SYNC_CONF 584 @@ -899,10 +901,11 @@ extern const GlobalSignalNumber NO_OF_SIGNAL_NAMES; /** * SUMA restart protocol */ +/* #define GSN_SUMA_START_ME 691 #define GSN_SUMA_HANDOVER_REQ 692 #define GSN_SUMA_HANDOVER_CONF 693 - +*/ /* not used 694 */ /* not used 695 */ /* not used 696 */ @@ -919,6 +922,7 @@ extern const GlobalSignalNumber NO_OF_SIGNAL_NAMES; /* * EVENT Signals */ +/* #define GSN_SUB_GCP_COMPLETE_ACC 699 #define GSN_CREATE_EVNT_REQ 700 @@ -928,7 +932,7 @@ extern const GlobalSignalNumber NO_OF_SIGNAL_NAMES; #define GSN_DROP_EVNT_REQ 703 #define GSN_DROP_EVNT_CONF 704 #define GSN_DROP_EVNT_REF 705 - +*/ #define GSN_TUX_BOUND_INFO 710 #define GSN_ACC_LOCKREQ 711 diff --git a/ndb/include/ndbapi/Ndb.hpp b/ndb/include/ndbapi/Ndb.hpp index f128a45f5bf..e7c1e85c02a 100644 --- a/ndb/include/ndbapi/Ndb.hpp +++ b/ndb/include/ndbapi/Ndb.hpp @@ -38,9 +38,6 @@ In addition, the NDB API defines a structure NdbError, which contains the specification for an error. - It is also possible to receive "events" triggered when data in the database in changed. - This is done through the NdbEventOperation class. - There are also some auxiliary classes, which are listed in the class hierarchy. The main structure of an application program is as follows: @@ -968,7 +965,6 @@ class NdbObjectIdMap; class NdbOperation; -class NdbEventOperationImpl; class NdbScanOperation; class NdbIndexScanOperation; class NdbIndexOperation; @@ -981,13 +977,11 @@ class NdbSubroutine; class NdbCall; class Table; class BaseString; -class NdbEventOperation; class NdbBlob; class NdbReceiver; class Ndb_local_table_info; template <class T> struct Ndb_free_list_t; -typedef void (* NdbEventCallback)(NdbEventOperation*, Ndb*, void*); #if defined NDB_OSE /** @@ -1049,7 +1043,6 @@ class Ndb #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL friend class NdbReceiver; friend class NdbOperation; - friend class NdbEventOperationImpl; friend class NdbTransaction; friend class Table; friend class NdbApiSignal; @@ -1196,46 +1189,6 @@ public: /** @} *********************************************************************/ /** - * @name Event subscriptions - * @{ - */ - - /** - * Create a subcription to an event defined in the database - * - * @param eventName - * unique identifier of the event - * @param bufferLength - * circular buffer size for storing event data - * - * @return Object representing an event, NULL on failure - */ - NdbEventOperation* createEventOperation(const char* eventName, - const int bufferLength); - /** - * Drop a subscription to an event - * - * @param eventOp - * Event operation - * - * @return 0 on success - */ - int dropEventOperation(NdbEventOperation* eventOp); - - /** - * Wait for an event to occur. Will return as soon as an event - * is detected on any of the created events. - * - * @param aMillisecondNumber - * maximum time to wait - * - * @return the number of events that has occured, -1 on failure - */ - int pollEvents(int aMillisecondNumber); - - /** @} *********************************************************************/ - - /** * @name Starting and Closing Transactions * @{ */ diff --git a/ndb/include/ndbapi/NdbApi.hpp b/ndb/include/ndbapi/NdbApi.hpp index aed4d5efbd7..c8400ed78ce 100644 --- a/ndb/include/ndbapi/NdbApi.hpp +++ b/ndb/include/ndbapi/NdbApi.hpp @@ -29,7 +29,6 @@ #include "NdbScanFilter.hpp" #include "NdbRecAttr.hpp" #include "NdbDictionary.hpp" -#include "NdbEventOperation.hpp" #include "NdbPool.hpp" #include "NdbBlob.hpp" #endif diff --git a/ndb/include/ndbapi/NdbDictionary.hpp b/ndb/include/ndbapi/NdbDictionary.hpp index e67a0253096..db84c3715a5 100644 --- a/ndb/include/ndbapi/NdbDictionary.hpp +++ b/ndb/include/ndbapi/NdbDictionary.hpp @@ -938,165 +938,6 @@ public: }; /** - * @brief Represents an Event in NDB Cluster - * - */ - class Event : public Object { - public: - /** - * Specifies the type of database operations an Event listens to - */ -#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL - /** TableEvent must match 1 << TriggerEvent */ -#endif - enum TableEvent { - TE_INSERT=1, ///< Insert event on table - TE_DELETE=2, ///< Delete event on table - TE_UPDATE=4, ///< Update event on table - TE_ALL=7 ///< Any/all event on table (not relevant when - ///< events are received) - }; - /** - * Specifies the durability of an event - * (future version may supply other types) - */ - enum EventDurability { - ED_UNDEFINED -#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL - = 0 -#endif -#if 0 // not supported - ,ED_SESSION = 1, - // Only this API can use it - // and it's deleted after api has disconnected or ndb has restarted - - ED_TEMPORARY = 2 - // All API's can use it, - // But's its removed when ndb is restarted -#endif - ,ED_PERMANENT ///< All API's can use it. - ///< It's still defined after a cluster system restart -#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL - = 3 -#endif - }; - - /** - * Constructor - * @param name Name of event - */ - Event(const char *name); - /** - * Constructor - * @param name Name of event - * @param table Reference retrieved from NdbDictionary - */ - Event(const char *name, const NdbDictionary::Table& table); - virtual ~Event(); - /** - * Set unique identifier for the event - */ - void setName(const char *name); - /** - * Get unique identifier for the event - */ - const char *getName() const; - /** - * Define table on which events should be detected - * - * @note calling this method will default to detection - * of events on all columns. Calling subsequent - * addEventColumn calls will override this. - * - * @param table reference retrieved from NdbDictionary - */ - void setTable(const NdbDictionary::Table& table); - /** - * Set table for which events should be detected - * - * @note preferred way is using setTable(const NdbDictionary::Table&) - * or constructor with table object parameter - */ - void setTable(const char *tableName); - /** - * Get table name for events - * - * @return table name - */ - const char* getTableName() const; - /** - * Add type of event that should be detected - */ - void addTableEvent(const TableEvent te); - /** - * Set durability of the event - */ - void setDurability(EventDurability); - /** - * Get durability of the event - */ - EventDurability getDurability() const; -#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL - void addColumn(const Column &c); -#endif - /** - * Add a column on which events should be detected - * - * @param attrId Column id - * - * @note errors will mot be detected until createEvent() is called - */ - void addEventColumn(unsigned attrId); - /** - * Add a column on which events should be detected - * - * @param columnName Column name - * - * @note errors will not be detected until createEvent() is called - */ - void addEventColumn(const char * columnName); - /** - * Add several columns on which events should be detected - * - * @param n Number of columns - * @param columnNames Column names - * - * @note errors will mot be detected until - * NdbDictionary::Dictionary::createEvent() is called - */ - void addEventColumns(int n, const char ** columnNames); - - /** - * Get no of columns defined in an Event - * - * @return Number of columns, -1 on error - */ - int getNoOfEventColumns() const; - - /** - * Get object status - */ - virtual Object::Status getObjectStatus() const; - - /** - * Get object version - */ - virtual int getObjectVersion() const; - -#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL - void print(); -#endif - - private: -#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL - friend class NdbEventImpl; - friend class NdbEventOperationImpl; -#endif - class NdbEventImpl & m_impl; - Event(NdbEventImpl&); - }; - - /** * @class Dictionary * @brief Dictionary for defining and retreiving meta data */ @@ -1215,33 +1056,6 @@ public: int listIndexes(List & list, const char * tableName) const; /** @} *******************************************************************/ - /** - * @name Events - * @{ - */ - - /** - * Create event given defined Event instance - * @param event Event to create - * @return 0 if successful otherwise -1. - */ - int createEvent(const Event &event); - - /** - * Drop event with given name - * @param eventName Name of event to drop. - * @return 0 if successful otherwise -1. - */ - int dropEvent(const char * eventName); - - /** - * Get event with given name. - * @param eventName Name of event to get. - * @return an Event if successful, otherwise NULL. - */ - const Event * getEvent(const char * eventName); - - /** @} *******************************************************************/ /** * @name Table creation diff --git a/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp b/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp index 34cae9f618f..572d8f6e3ca 100644 --- a/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp +++ b/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp @@ -156,12 +156,6 @@ SignalDataPrintFunctions[] = { { GSN_SUB_REMOVE_REQ, printSUB_REMOVE_REQ }, { GSN_SUB_REMOVE_REF, printSUB_REMOVE_REF }, { GSN_SUB_REMOVE_CONF, printSUB_REMOVE_CONF }, - { GSN_SUB_START_REQ, printSUB_START_REQ }, - { GSN_SUB_START_REF, printSUB_START_REF }, - { GSN_SUB_START_CONF, printSUB_START_CONF }, - { GSN_SUB_STOP_REQ, printSUB_STOP_REQ }, - { GSN_SUB_STOP_REF, printSUB_STOP_REF }, - { GSN_SUB_STOP_CONF, printSUB_STOP_CONF }, { GSN_SUB_SYNC_REQ, printSUB_SYNC_REQ }, { GSN_SUB_SYNC_REF, printSUB_SYNC_REF }, { GSN_SUB_SYNC_CONF, printSUB_SYNC_CONF }, diff --git a/ndb/src/common/debugger/signaldata/SignalNames.cpp b/ndb/src/common/debugger/signaldata/SignalNames.cpp index 5162679017a..8aedf35344f 100644 --- a/ndb/src/common/debugger/signaldata/SignalNames.cpp +++ b/ndb/src/common/debugger/signaldata/SignalNames.cpp @@ -502,18 +502,6 @@ const GsnName SignalNames [] = { //,{ GSN_TCINDEXNEXTCONF, "TCINDEXNEXTCONF" } //,{ GSN_TCINDEXNEXREF, "TCINDEXNEXREF" } - ,{ GSN_CREATE_EVNT_REQ, "CREATE_EVNT_REQ" } - ,{ GSN_CREATE_EVNT_CONF, "CREATE_EVNT_CONF" } - ,{ GSN_CREATE_EVNT_REF, "CREATE_EVNT_REF" } - - ,{ GSN_SUMA_START_ME, "SUMA_START_ME" } - ,{ GSN_SUMA_HANDOVER_REQ, "SUMA_HANDOVER_REQ"} - ,{ GSN_SUMA_HANDOVER_CONF, "SUMA_HANDOVER_CONF"} - - ,{ GSN_DROP_EVNT_REQ, "DROP_EVNT_REQ" } - ,{ GSN_DROP_EVNT_CONF, "DROP_EVNT_CONF" } - ,{ GSN_DROP_EVNT_REF, "DROP_EVNT_REF" } - ,{ GSN_BACKUP_TRIG_REQ, "BACKUP_TRIG_REQ" } ,{ GSN_BACKUP_REQ, "BACKUP_REQ" } ,{ GSN_BACKUP_DATA, "BACKUP_DATA" } @@ -581,12 +569,6 @@ const GsnName SignalNames [] = { ,{ GSN_SUB_REMOVE_REQ, "SUB_REMOVE_REQ" } ,{ GSN_SUB_REMOVE_REF, "SUB_REMOVE_REF" } ,{ GSN_SUB_REMOVE_CONF, "SUB_REMOVE_CONF" } - ,{ GSN_SUB_START_REQ, "SUB_START_REQ" } - ,{ GSN_SUB_START_REF, "SUB_START_REF" } - ,{ GSN_SUB_START_CONF, "SUB_START_CONF" } - ,{ GSN_SUB_STOP_REQ, "SUB_STOP_REQ" } - ,{ GSN_SUB_STOP_REF, "SUB_STOP_REF" } - ,{ GSN_SUB_STOP_CONF, "SUB_STOP_CONF" } ,{ GSN_SUB_SYNC_REQ, "SUB_SYNC_REQ" } ,{ GSN_SUB_SYNC_REF, "SUB_SYNC_REF" } ,{ GSN_SUB_SYNC_CONF, "SUB_SYNC_CONF" } @@ -596,7 +578,6 @@ const GsnName SignalNames [] = { ,{ GSN_SUB_SYNC_CONTINUE_REF, "SUB_SYNC_CONTINUE_REF" } ,{ GSN_SUB_SYNC_CONTINUE_CONF, "SUB_SYNC_CONTINUE_CONF" } ,{ GSN_SUB_GCP_COMPLETE_REP, "SUB_GCP_COMPLETE_REP" } - ,{ GSN_SUB_GCP_COMPLETE_ACC, "SUB_GCP_COMPLETE_ACC" } ,{ GSN_CREATE_SUBID_REQ, "CREATE_SUBID_REQ" } ,{ GSN_CREATE_SUBID_REF, "CREATE_SUBID_REF" } diff --git a/ndb/src/kernel/blocks/dbdict/Dbdict.cpp b/ndb/src/kernel/blocks/dbdict/Dbdict.cpp index 133b4d75d8e..efd519339f7 100644 --- a/ndb/src/kernel/blocks/dbdict/Dbdict.cpp +++ b/ndb/src/kernel/blocks/dbdict/Dbdict.cpp @@ -1210,10 +1210,6 @@ Dbdict::Dbdict(const class Configuration & conf): c_opDropIndex(c_opRecordPool), c_opAlterIndex(c_opRecordPool), c_opBuildIndex(c_opRecordPool), - c_opCreateEvent(c_opRecordPool), - c_opSubEvent(c_opRecordPool), - c_opDropEvent(c_opRecordPool), - c_opSignalUtil(c_opRecordPool), c_opCreateTrigger(c_opRecordPool), c_opDropTrigger(c_opRecordPool), c_opAlterTrigger(c_opRecordPool), @@ -1273,44 +1269,6 @@ Dbdict::Dbdict(const class Configuration & conf): addRecSignal(GSN_BUILDINDXCONF, &Dbdict::execBUILDINDXCONF); addRecSignal(GSN_BUILDINDXREF, &Dbdict::execBUILDINDXREF); - // Util signals - addRecSignal(GSN_UTIL_PREPARE_CONF, &Dbdict::execUTIL_PREPARE_CONF); - addRecSignal(GSN_UTIL_PREPARE_REF, &Dbdict::execUTIL_PREPARE_REF); - - addRecSignal(GSN_UTIL_EXECUTE_CONF, &Dbdict::execUTIL_EXECUTE_CONF); - addRecSignal(GSN_UTIL_EXECUTE_REF, &Dbdict::execUTIL_EXECUTE_REF); - - addRecSignal(GSN_UTIL_RELEASE_CONF, &Dbdict::execUTIL_RELEASE_CONF); - addRecSignal(GSN_UTIL_RELEASE_REF, &Dbdict::execUTIL_RELEASE_REF); - - // Event signals - addRecSignal(GSN_CREATE_EVNT_REQ, &Dbdict::execCREATE_EVNT_REQ); - addRecSignal(GSN_CREATE_EVNT_CONF, &Dbdict::execCREATE_EVNT_CONF); - addRecSignal(GSN_CREATE_EVNT_REF, &Dbdict::execCREATE_EVNT_REF); - - addRecSignal(GSN_CREATE_SUBID_CONF, &Dbdict::execCREATE_SUBID_CONF); - addRecSignal(GSN_CREATE_SUBID_REF, &Dbdict::execCREATE_SUBID_REF); - - addRecSignal(GSN_SUB_CREATE_CONF, &Dbdict::execSUB_CREATE_CONF); - addRecSignal(GSN_SUB_CREATE_REF, &Dbdict::execSUB_CREATE_REF); - - addRecSignal(GSN_SUB_START_REQ, &Dbdict::execSUB_START_REQ); - addRecSignal(GSN_SUB_START_CONF, &Dbdict::execSUB_START_CONF); - addRecSignal(GSN_SUB_START_REF, &Dbdict::execSUB_START_REF); - - addRecSignal(GSN_SUB_STOP_REQ, &Dbdict::execSUB_STOP_REQ); - addRecSignal(GSN_SUB_STOP_CONF, &Dbdict::execSUB_STOP_CONF); - addRecSignal(GSN_SUB_STOP_REF, &Dbdict::execSUB_STOP_REF); - - addRecSignal(GSN_SUB_SYNC_CONF, &Dbdict::execSUB_SYNC_CONF); - addRecSignal(GSN_SUB_SYNC_REF, &Dbdict::execSUB_SYNC_REF); - - addRecSignal(GSN_DROP_EVNT_REQ, &Dbdict::execDROP_EVNT_REQ); - - addRecSignal(GSN_SUB_REMOVE_REQ, &Dbdict::execSUB_REMOVE_REQ); - addRecSignal(GSN_SUB_REMOVE_CONF, &Dbdict::execSUB_REMOVE_CONF); - addRecSignal(GSN_SUB_REMOVE_REF, &Dbdict::execSUB_REMOVE_REF); - // Trigger signals addRecSignal(GSN_CREATE_TRIG_REQ, &Dbdict::execCREATE_TRIG_REQ); addRecSignal(GSN_CREATE_TRIG_CONF, &Dbdict::execCREATE_TRIG_CONF); @@ -1772,10 +1730,6 @@ void Dbdict::execREAD_CONFIG_REQ(Signal* signal) c_opCreateTable.setSize(8); c_opDropTable.setSize(8); c_opCreateIndex.setSize(8); - c_opCreateEvent.setSize(8); - c_opSubEvent.setSize(8); - c_opDropEvent.setSize(8); - c_opSignalUtil.setSize(8); c_opDropIndex.setSize(8); c_opAlterIndex.setSize(8); c_opBuildIndex.setSize(8); @@ -7449,2171 +7403,6 @@ Dbdict::dropIndex_sendReply(Signal* signal, OpDropIndexPtr opPtr, sendSignal(rep->getUserRef(), gsn, signal, length, JBB); } -/***************************************************** - * - * Util signalling - * - *****************************************************/ - -int -Dbdict::sendSignalUtilReq(Callback *pcallback, - BlockReference ref, - GlobalSignalNumber gsn, - Signal* signal, - Uint32 length, - JobBufferLevel jbuf, - LinearSectionPtr ptr[3], - Uint32 noOfSections) -{ - jam(); - EVENT_TRACE; - OpSignalUtilPtr utilRecPtr; - - // Seize a Util Send record - if (!c_opSignalUtil.seize(utilRecPtr)) { - // Failed to allocate util record - return -1; - } - utilRecPtr.p->m_callback = *pcallback; - - // should work for all util signal classes - UtilPrepareReq *req = (UtilPrepareReq*)signal->getDataPtrSend(); - utilRecPtr.p->m_userData = req->getSenderData(); - req->setSenderData(utilRecPtr.i); - - if (ptr) { - jam(); - sendSignal(ref, gsn, signal, length, jbuf, ptr, noOfSections); - } else { - jam(); - sendSignal(ref, gsn, signal, length, jbuf); - } - - return 0; -} - -int -Dbdict::recvSignalUtilReq(Signal* signal, Uint32 returnCode) -{ - jam(); - EVENT_TRACE; - UtilPrepareConf * const req = (UtilPrepareConf*)signal->getDataPtr(); - OpSignalUtilPtr utilRecPtr; - utilRecPtr.i = req->getSenderData(); - if ((utilRecPtr.p = c_opSignalUtil.getPtr(utilRecPtr.i)) == NULL) { - jam(); - return -1; - } - - req->setSenderData(utilRecPtr.p->m_userData); - Callback c = utilRecPtr.p->m_callback; - c_opSignalUtil.release(utilRecPtr); - - execute(signal, c, returnCode); - return 0; -} - -void Dbdict::execUTIL_PREPARE_CONF(Signal *signal) -{ - jamEntry(); - EVENT_TRACE; - ndbrequire(recvSignalUtilReq(signal, 0) == 0); -} - -void -Dbdict::execUTIL_PREPARE_REF(Signal *signal) -{ - jamEntry(); - EVENT_TRACE; - ndbrequire(recvSignalUtilReq(signal, 1) == 0); -} - -void Dbdict::execUTIL_EXECUTE_CONF(Signal *signal) -{ - jamEntry(); - EVENT_TRACE; - ndbrequire(recvSignalUtilReq(signal, 0) == 0); -} - -void Dbdict::execUTIL_EXECUTE_REF(Signal *signal) -{ - jamEntry(); - EVENT_TRACE; - -#ifdef EVENT_DEBUG - UtilExecuteRef * ref = (UtilExecuteRef *)signal->getDataPtrSend(); - - ndbout_c("execUTIL_EXECUTE_REF"); - ndbout_c("senderData %u",ref->getSenderData()); - ndbout_c("errorCode %u",ref->getErrorCode()); - ndbout_c("TCErrorCode %u",ref->getTCErrorCode()); -#endif - - ndbrequire(recvSignalUtilReq(signal, 1) == 0); -} -void Dbdict::execUTIL_RELEASE_CONF(Signal *signal) -{ - jamEntry(); - EVENT_TRACE; - ndbrequire(false); - ndbrequire(recvSignalUtilReq(signal, 0) == 0); -} -void Dbdict::execUTIL_RELEASE_REF(Signal *signal) -{ - jamEntry(); - EVENT_TRACE; - ndbrequire(false); - ndbrequire(recvSignalUtilReq(signal, 1) == 0); -} - -/** - * MODULE: Create event - * - * Create event in DICT. - * - * - * Request type in CREATE_EVNT signals: - * - * Signalflow see Dbdict.txt - * - */ - -/***************************************************************** - * - * Systable stuff - * - */ - -const Uint32 Dbdict::sysTab_NDBEVENTS_0_szs[EVENT_SYSTEM_TABLE_LENGTH] = { - sizeof(((sysTab_NDBEVENTS_0*)0)->NAME), - sizeof(((sysTab_NDBEVENTS_0*)0)->EVENT_TYPE), - sizeof(((sysTab_NDBEVENTS_0*)0)->TABLE_NAME), - sizeof(((sysTab_NDBEVENTS_0*)0)->ATTRIBUTE_MASK), - sizeof(((sysTab_NDBEVENTS_0*)0)->SUBID), - sizeof(((sysTab_NDBEVENTS_0*)0)->SUBKEY) -}; - -void -Dbdict::prepareTransactionEventSysTable (Callback *pcallback, - Signal* signal, - Uint32 senderData, - UtilPrepareReq::OperationTypeValue prepReq) -{ - // find table id for event system table - TableRecord keyRecord; - strcpy(keyRecord.tableName, EVENT_SYSTEM_TABLE_NAME); - - TableRecordPtr tablePtr; - c_tableRecordHash.find(tablePtr, keyRecord); - - ndbrequire(tablePtr.i != RNIL); // system table must exist - - Uint32 tableId = tablePtr.p->tableId; /* System table */ - Uint32 noAttr = tablePtr.p->noOfAttributes; - ndbrequire(noAttr == EVENT_SYSTEM_TABLE_LENGTH); - - switch (prepReq) { - case UtilPrepareReq::Update: - case UtilPrepareReq::Insert: - case UtilPrepareReq::Write: - case UtilPrepareReq::Read: - jam(); - break; - case UtilPrepareReq::Delete: - jam(); - noAttr = 1; // only involves Primary key which should be the first - break; - } - prepareUtilTransaction(pcallback, signal, senderData, tableId, NULL, - prepReq, noAttr, NULL, NULL); -} - -void -Dbdict::prepareUtilTransaction(Callback *pcallback, - Signal* signal, - Uint32 senderData, - Uint32 tableId, - const char* tableName, - UtilPrepareReq::OperationTypeValue prepReq, - Uint32 noAttr, - Uint32 attrIds[], - const char *attrNames[]) -{ - jam(); - EVENT_TRACE; - - UtilPrepareReq * utilPrepareReq = - (UtilPrepareReq *)signal->getDataPtrSend(); - - utilPrepareReq->setSenderRef(reference()); - utilPrepareReq->setSenderData(senderData); - - const Uint32 pageSizeInWords = 128; - Uint32 propPage[pageSizeInWords]; - LinearWriter w(&propPage[0],128); - w.first(); - w.add(UtilPrepareReq::NoOfOperations, 1); - w.add(UtilPrepareReq::OperationType, prepReq); - if (tableName) { - jam(); - w.add(UtilPrepareReq::TableName, tableName); - } else { - jam(); - w.add(UtilPrepareReq::TableId, tableId); - } - for(Uint32 i = 0; i < noAttr; i++) - if (tableName) { - jam(); - w.add(UtilPrepareReq::AttributeName, attrNames[i]); - } else { - if (attrIds) { - jam(); - w.add(UtilPrepareReq::AttributeId, attrIds[i]); - } else { - jam(); - w.add(UtilPrepareReq::AttributeId, i); - } - } -#ifdef EVENT_DEBUG - // Debugging - SimplePropertiesLinearReader reader(propPage, w.getWordsUsed()); - printf("Dict::prepareInsertTransactions: Sent SimpleProperties:\n"); - reader.printAll(ndbout); -#endif - - struct LinearSectionPtr sectionsPtr[UtilPrepareReq::NoOfSections]; - sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].p = propPage; - sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].sz = w.getWordsUsed(); - - sendSignalUtilReq(pcallback, DBUTIL_REF, GSN_UTIL_PREPARE_REQ, signal, - UtilPrepareReq::SignalLength, JBB, - sectionsPtr, UtilPrepareReq::NoOfSections); -} - -/***************************************************************** - * - * CREATE_EVNT_REQ has three types RT_CREATE, RT_GET (from user) - * and RT_DICT_AFTER_GET send from master DICT to slaves - * - * This function just dscpaches these to - * - * createEvent_RT_USER_CREATE - * createEvent_RT_USER_GET - * createEvent_RT_DICT_AFTER_GET - * - * repectively - * - */ - -void -Dbdict::execCREATE_EVNT_REQ(Signal* signal) -{ - jamEntry(); - -#if 0 - { - SafeCounterHandle handle; - { - SafeCounter tmp(c_counterMgr, handle); - tmp.init<CreateEvntRef>(CMVMI, GSN_DUMP_STATE_ORD, /* senderData */ 13); - tmp.clearWaitingFor(); - tmp.setWaitingFor(3); - ndbrequire(!tmp.done()); - ndbout_c("Allocted"); - } - ndbrequire(!handle.done()); - { - SafeCounter tmp(c_counterMgr, handle); - tmp.clearWaitingFor(3); - ndbrequire(tmp.done()); - ndbout_c("Deallocted"); - } - ndbrequire(handle.done()); - } - { - NodeBitmask nodes; - nodes.clear(); - - nodes.set(2); - nodes.set(3); - nodes.set(4); - nodes.set(5); - - { - Uint32 i = 0; - while((i = nodes.find(i)) != NodeBitmask::NotFound){ - ndbout_c("1 Node id = %u", i); - i++; - } - } - - NodeReceiverGroup rg(DBDICT, nodes); - RequestTracker rt2; - ndbrequire(rt2.done()); - ndbrequire(!rt2.hasRef()); - ndbrequire(!rt2.hasConf()); - rt2.init<CreateEvntRef>(c_counterMgr, rg, GSN_CREATE_EVNT_REF, 13); - - RequestTracker rt3; - rt3.init<CreateEvntRef>(c_counterMgr, rg, GSN_CREATE_EVNT_REF, 13); - - ndbrequire(!rt2.done()); - ndbrequire(!rt3.done()); - - rt2.reportRef(c_counterMgr, 2); - rt3.reportConf(c_counterMgr, 2); - - ndbrequire(!rt2.done()); - ndbrequire(!rt3.done()); - - rt2.reportConf(c_counterMgr, 3); - rt3.reportConf(c_counterMgr, 3); - - ndbrequire(!rt2.done()); - ndbrequire(!rt3.done()); - - rt2.reportConf(c_counterMgr, 4); - rt3.reportConf(c_counterMgr, 4); - - ndbrequire(!rt2.done()); - ndbrequire(!rt3.done()); - - rt2.reportConf(c_counterMgr, 5); - rt3.reportConf(c_counterMgr, 5); - - ndbrequire(rt2.done()); - ndbrequire(rt3.done()); - } -#endif - - if (! assembleFragments(signal)) { - jam(); - return; - } - - CreateEvntReq *req = (CreateEvntReq*)signal->getDataPtr(); - const CreateEvntReq::RequestType requestType = req->getRequestType(); - const Uint32 requestFlag = req->getRequestFlag(); - - OpCreateEventPtr evntRecPtr; - // Seize a Create Event record - if (!c_opCreateEvent.seize(evntRecPtr)) { - // Failed to allocate event record - jam(); - releaseSections(signal); - - CreateEvntRef * ret = (CreateEvntRef *)signal->getDataPtrSend(); - ret->senderRef = reference(); - ret->setErrorCode(CreateEvntRef::SeizeError); - ret->setErrorLine(__LINE__); - ret->setErrorNode(reference()); - sendSignal(signal->senderBlockRef(), GSN_CREATE_EVNT_REF, signal, - CreateEvntRef::SignalLength, JBB); - return; - } - -#ifdef EVENT_DEBUG - ndbout_c("DBDICT::execCREATE_EVNT_REQ from %u evntRecId = (%d)", refToNode(signal->getSendersBlockRef()), evntRecPtr.i); -#endif - - ndbrequire(req->getUserRef() == signal->getSendersBlockRef()); - - evntRecPtr.p->init(req,this); - - if (requestFlag & (Uint32)CreateEvntReq::RT_DICT_AFTER_GET) { - jam(); - EVENT_TRACE; - createEvent_RT_DICT_AFTER_GET(signal, evntRecPtr); - return; - } - if (requestType == CreateEvntReq::RT_USER_GET) { - jam(); - EVENT_TRACE; - createEvent_RT_USER_GET(signal, evntRecPtr); - return; - } - if (requestType == CreateEvntReq::RT_USER_CREATE) { - jam(); - EVENT_TRACE; - createEvent_RT_USER_CREATE(signal, evntRecPtr); - return; - } - -#ifdef EVENT_DEBUG - ndbout << "Dbdict.cpp: Dbdict::execCREATE_EVNT_REQ other" << endl; -#endif - jam(); - releaseSections(signal); - - evntRecPtr.p->m_errorCode = CreateEvntRef::Undefined; - evntRecPtr.p->m_errorLine = __LINE__; - evntRecPtr.p->m_errorNode = reference(); - - createEvent_sendReply(signal, evntRecPtr); -} - -/******************************************************************** - * - * Event creation - * - *****************************************************************/ - -void -Dbdict::createEvent_RT_USER_CREATE(Signal* signal, OpCreateEventPtr evntRecPtr){ - jam(); - evntRecPtr.p->m_request.setUserRef(signal->senderBlockRef()); - -#ifdef EVENT_DEBUG - ndbout << "Dbdict.cpp: Dbdict::execCREATE_EVNT_REQ RT_USER" << endl; - char buf[128] = {0}; - AttributeMask mask = evntRecPtr.p->m_request.getAttrListBitmask(); - mask.getText(buf); - ndbout_c("mask = %s", buf); -#endif - - // Interpret the long signal - - SegmentedSectionPtr ssPtr; - // save name and event properties - signal->getSection(ssPtr, CreateEvntReq::EVENT_NAME_SECTION); - - SimplePropertiesSectionReader r0(ssPtr, getSectionSegmentPool()); -#ifdef EVENT_DEBUG - r0.printAll(ndbout); -#endif - // event name - if ((!r0.first()) || - (r0.getValueType() != SimpleProperties::StringValue) || - (r0.getValueLen() <= 0)) { - jam(); - releaseSections(signal); - - evntRecPtr.p->m_errorCode = CreateEvntRef::Undefined; - evntRecPtr.p->m_errorLine = __LINE__; - evntRecPtr.p->m_errorNode = reference(); - - createEvent_sendReply(signal, evntRecPtr); - return; - } - r0.getString(evntRecPtr.p->m_eventRec.NAME); - { - int len = strlen(evntRecPtr.p->m_eventRec.NAME); - memset(evntRecPtr.p->m_eventRec.NAME+len, 0, MAX_TAB_NAME_SIZE-len); -#ifdef EVENT_DEBUG - printf("CreateEvntReq::RT_USER_CREATE; EventName %s, len %u\n", - evntRecPtr.p->m_eventRec.NAME, len); - for(int i = 0; i < MAX_TAB_NAME_SIZE/4; i++) - printf("H'%.8x ", ((Uint32*)evntRecPtr.p->m_eventRec.NAME)[i]); - printf("\n"); -#endif - } - // table name - if ((!r0.next()) || - (r0.getValueType() != SimpleProperties::StringValue) || - (r0.getValueLen() <= 0)) { - jam(); - releaseSections(signal); - - evntRecPtr.p->m_errorCode = CreateEvntRef::Undefined; - evntRecPtr.p->m_errorLine = __LINE__; - evntRecPtr.p->m_errorNode = reference(); - - createEvent_sendReply(signal, evntRecPtr); - return; - } - r0.getString(evntRecPtr.p->m_eventRec.TABLE_NAME); - { - int len = strlen(evntRecPtr.p->m_eventRec.TABLE_NAME); - memset(evntRecPtr.p->m_eventRec.TABLE_NAME+len, 0, MAX_TAB_NAME_SIZE-len); - } - -#ifdef EVENT_DEBUG - ndbout_c("event name: %s",evntRecPtr.p->m_eventRec.NAME); - ndbout_c("table name: %s",evntRecPtr.p->m_eventRec.TABLE_NAME); -#endif - - releaseSections(signal); - - // Send request to SUMA - - CreateSubscriptionIdReq * sumaIdReq = - (CreateSubscriptionIdReq *)signal->getDataPtrSend(); - - // make sure we save the original sender for later - sumaIdReq->senderData = evntRecPtr.i; -#ifdef EVENT_DEBUG - ndbout << "sumaIdReq->senderData = " << sumaIdReq->senderData << endl; -#endif - sendSignal(SUMA_REF, GSN_CREATE_SUBID_REQ, signal, - CreateSubscriptionIdReq::SignalLength, JBB); - // we should now return in either execCREATE_SUBID_CONF - // or execCREATE_SUBID_REF -} - -void Dbdict::execCREATE_SUBID_REF(Signal* signal) -{ - jamEntry(); - EVENT_TRACE; - CreateSubscriptionIdRef * const ref = - (CreateSubscriptionIdRef *)signal->getDataPtr(); - OpCreateEventPtr evntRecPtr; - - evntRecPtr.i = ref->senderData; - ndbrequire((evntRecPtr.p = c_opCreateEvent.getPtr(evntRecPtr.i)) != NULL); - - evntRecPtr.p->m_errorCode = CreateEvntRef::Undefined; - evntRecPtr.p->m_errorLine = __LINE__; - evntRecPtr.p->m_errorNode = reference(); - - createEvent_sendReply(signal, evntRecPtr); -} - -void Dbdict::execCREATE_SUBID_CONF(Signal* signal) -{ - jamEntry(); - EVENT_TRACE; - - CreateSubscriptionIdConf const * sumaIdConf = - (CreateSubscriptionIdConf *)signal->getDataPtr(); - - Uint32 evntRecId = sumaIdConf->senderData; - OpCreateEvent *evntRec; - - ndbrequire((evntRec = c_opCreateEvent.getPtr(evntRecId)) != NULL); - - evntRec->m_request.setEventId(sumaIdConf->subscriptionId); - evntRec->m_request.setEventKey(sumaIdConf->subscriptionKey); - - releaseSections(signal); - - Callback c = { safe_cast(&Dbdict::createEventUTIL_PREPARE), 0 }; - - prepareTransactionEventSysTable(&c, signal, evntRecId, - UtilPrepareReq::Insert); -} - -void -Dbdict::createEventComplete_RT_USER_CREATE(Signal* signal, - OpCreateEventPtr evntRecPtr){ - jam(); - createEvent_sendReply(signal, evntRecPtr); -} - -/********************************************************************* - * - * UTIL_PREPARE, UTIL_EXECUTE - * - * insert or read systable NDB$EVENTS_0 - */ - -void interpretUtilPrepareErrorCode(UtilPrepareRef::ErrorCode errorCode, - bool& temporary, Uint32& line) -{ - switch (errorCode) { - case UtilPrepareRef::NO_ERROR: - jam(); - line = __LINE__; - EVENT_TRACE; - break; - case UtilPrepareRef::PREPARE_SEIZE_ERROR: - jam(); - temporary = true; - line = __LINE__; - EVENT_TRACE; - break; - case UtilPrepareRef::PREPARE_PAGES_SEIZE_ERROR: - jam(); - line = __LINE__; - EVENT_TRACE; - break; - case UtilPrepareRef::PREPARED_OPERATION_SEIZE_ERROR: - jam(); - line = __LINE__; - EVENT_TRACE; - break; - case UtilPrepareRef::DICT_TAB_INFO_ERROR: - jam(); - line = __LINE__; - EVENT_TRACE; - break; - case UtilPrepareRef::MISSING_PROPERTIES_SECTION: - jam(); - line = __LINE__; - EVENT_TRACE; - break; - default: - jam(); - line = __LINE__; - EVENT_TRACE; - break; - } -} - -void -Dbdict::createEventUTIL_PREPARE(Signal* signal, - Uint32 callbackData, - Uint32 returnCode) -{ - jam(); - EVENT_TRACE; - if (returnCode == 0) { - UtilPrepareConf* const req = (UtilPrepareConf*)signal->getDataPtr(); - OpCreateEventPtr evntRecPtr; - jam(); - evntRecPtr.i = req->getSenderData(); - const Uint32 prepareId = req->getPrepareId(); - - ndbrequire((evntRecPtr.p = c_opCreateEvent.getPtr(evntRecPtr.i)) != NULL); - - Callback c = { safe_cast(&Dbdict::createEventUTIL_EXECUTE), 0 }; - - switch (evntRecPtr.p->m_requestType) { - case CreateEvntReq::RT_USER_GET: -#ifdef EVENT_DEBUG - printf("get type = %d\n", CreateEvntReq::RT_USER_GET); -#endif - jam(); - executeTransEventSysTable(&c, signal, - evntRecPtr.i, evntRecPtr.p->m_eventRec, - prepareId, UtilPrepareReq::Read); - break; - case CreateEvntReq::RT_USER_CREATE: -#ifdef EVENT_DEBUG - printf("create type = %d\n", CreateEvntReq::RT_USER_CREATE); -#endif - { - evntRecPtr.p->m_eventRec.EVENT_TYPE = evntRecPtr.p->m_request.getEventType(); - AttributeMask m = evntRecPtr.p->m_request.getAttrListBitmask(); - memcpy(evntRecPtr.p->m_eventRec.ATTRIBUTE_MASK, &m, - sizeof(evntRecPtr.p->m_eventRec.ATTRIBUTE_MASK)); - evntRecPtr.p->m_eventRec.SUBID = evntRecPtr.p->m_request.getEventId(); - evntRecPtr.p->m_eventRec.SUBKEY = evntRecPtr.p->m_request.getEventKey(); - } - jam(); - executeTransEventSysTable(&c, signal, - evntRecPtr.i, evntRecPtr.p->m_eventRec, - prepareId, UtilPrepareReq::Insert); - break; - default: -#ifdef EVENT_DEBUG - printf("type = %d\n", evntRecPtr.p->m_requestType); - printf("bet type = %d\n", CreateEvntReq::RT_USER_GET); - printf("create type = %d\n", CreateEvntReq::RT_USER_CREATE); -#endif - ndbrequire(false); - } - } else { // returnCode != 0 - UtilPrepareRef* const ref = (UtilPrepareRef*)signal->getDataPtr(); - - const UtilPrepareRef::ErrorCode errorCode = - (UtilPrepareRef::ErrorCode)ref->getErrorCode(); - - OpCreateEventPtr evntRecPtr; - evntRecPtr.i = ref->getSenderData(); - ndbrequire((evntRecPtr.p = c_opCreateEvent.getPtr(evntRecPtr.i)) != NULL); - - bool temporary = false; - interpretUtilPrepareErrorCode(errorCode, - temporary, evntRecPtr.p->m_errorLine); - if (temporary) { - evntRecPtr.p->m_errorCode = - CreateEvntRef::makeTemporary(CreateEvntRef::Undefined); - } - - if (evntRecPtr.p->m_errorCode == 0) { - evntRecPtr.p->m_errorCode = CreateEvntRef::Undefined; - } - evntRecPtr.p->m_errorNode = reference(); - - createEvent_sendReply(signal, evntRecPtr); - } -} - -void Dbdict::executeTransEventSysTable(Callback *pcallback, Signal *signal, - const Uint32 ptrI, - sysTab_NDBEVENTS_0& m_eventRec, - const Uint32 prepareId, - UtilPrepareReq::OperationTypeValue prepReq) -{ - jam(); - const Uint32 noAttr = EVENT_SYSTEM_TABLE_LENGTH; - Uint32 total_len = 0; - - Uint32* attrHdr = signal->theData + 25; - Uint32* attrPtr = attrHdr; - - Uint32 id=0; - // attribute 0 event name: Primary Key - { - AttributeHeader::init(attrPtr, id, sysTab_NDBEVENTS_0_szs[id]/4); - total_len += sysTab_NDBEVENTS_0_szs[id]; - attrPtr++; id++; - } - - switch (prepReq) { - case UtilPrepareReq::Read: - jam(); - EVENT_TRACE; - // no more - while ( id < noAttr ) - AttributeHeader::init(attrPtr++, id++, 0); - ndbrequire(id == (Uint32) noAttr); - break; - case UtilPrepareReq::Insert: - jam(); - EVENT_TRACE; - while ( id < noAttr ) { - AttributeHeader::init(attrPtr, id, sysTab_NDBEVENTS_0_szs[id]/4); - total_len += sysTab_NDBEVENTS_0_szs[id]; - attrPtr++; id++; - } - ndbrequire(id == (Uint32) noAttr); - break; - case UtilPrepareReq::Delete: - ndbrequire(id == 1); - break; - default: - ndbrequire(false); - } - - LinearSectionPtr headerPtr; - LinearSectionPtr dataPtr; - - headerPtr.p = attrHdr; - headerPtr.sz = noAttr; - - dataPtr.p = (Uint32*)&m_eventRec; - dataPtr.sz = total_len/4; - - ndbrequire((total_len == sysTab_NDBEVENTS_0_szs[0]) || - (total_len == sizeof(sysTab_NDBEVENTS_0))); - -#if 0 - printf("Header size %u\n", headerPtr.sz); - for(int i = 0; i < (int)headerPtr.sz; i++) - printf("H'%.8x ", attrHdr[i]); - printf("\n"); - - printf("Data size %u\n", dataPtr.sz); - for(int i = 0; i < (int)dataPtr.sz; i++) - printf("H'%.8x ", dataPage[i]); - printf("\n"); -#endif - - executeTransaction(pcallback, signal, - ptrI, - prepareId, - id, - headerPtr, - dataPtr); -} - -void Dbdict::executeTransaction(Callback *pcallback, - Signal* signal, - Uint32 senderData, - Uint32 prepareId, - Uint32 noAttr, - LinearSectionPtr headerPtr, - LinearSectionPtr dataPtr) -{ - jam(); - EVENT_TRACE; - - UtilExecuteReq * utilExecuteReq = - (UtilExecuteReq *)signal->getDataPtrSend(); - - utilExecuteReq->setSenderRef(reference()); - utilExecuteReq->setSenderData(senderData); - utilExecuteReq->setPrepareId(prepareId); - utilExecuteReq->setReleaseFlag(); // must be done after setting prepareId - -#if 0 - printf("Header size %u\n", headerPtr.sz); - for(int i = 0; i < (int)headerPtr.sz; i++) - printf("H'%.8x ", headerBuffer[i]); - printf("\n"); - - printf("Data size %u\n", dataPtr.sz); - for(int i = 0; i < (int)dataPtr.sz; i++) - printf("H'%.8x ", dataBuffer[i]); - printf("\n"); -#endif - - struct LinearSectionPtr sectionsPtr[UtilExecuteReq::NoOfSections]; - sectionsPtr[UtilExecuteReq::HEADER_SECTION].p = headerPtr.p; - sectionsPtr[UtilExecuteReq::HEADER_SECTION].sz = noAttr; - sectionsPtr[UtilExecuteReq::DATA_SECTION].p = dataPtr.p; - sectionsPtr[UtilExecuteReq::DATA_SECTION].sz = dataPtr.sz; - - sendSignalUtilReq(pcallback, DBUTIL_REF, GSN_UTIL_EXECUTE_REQ, signal, - UtilExecuteReq::SignalLength, JBB, - sectionsPtr, UtilExecuteReq::NoOfSections); -} - -void Dbdict::parseReadEventSys(Signal* signal, sysTab_NDBEVENTS_0& m_eventRec) -{ - SegmentedSectionPtr headerPtr, dataPtr; - jam(); - signal->getSection(headerPtr, UtilExecuteReq::HEADER_SECTION); - SectionReader headerReader(headerPtr, getSectionSegmentPool()); - - signal->getSection(dataPtr, UtilExecuteReq::DATA_SECTION); - SectionReader dataReader(dataPtr, getSectionSegmentPool()); - - AttributeHeader header; - Uint32 *dst = (Uint32*)&m_eventRec; - - for (int i = 0; i < EVENT_SYSTEM_TABLE_LENGTH; i++) { - headerReader.getWord((Uint32 *)&header); - int sz = header.getDataSize(); - for (int i=0; i < sz; i++) - dataReader.getWord(dst++); - } - - ndbrequire( ((char*)dst-(char*)&m_eventRec) == sizeof(m_eventRec) ); - - releaseSections(signal); -} - -void Dbdict::createEventUTIL_EXECUTE(Signal *signal, - Uint32 callbackData, - Uint32 returnCode) -{ - jam(); - EVENT_TRACE; - if (returnCode == 0) { - // Entry into system table all set - UtilExecuteConf* const conf = (UtilExecuteConf*)signal->getDataPtr(); - jam(); - OpCreateEventPtr evntRecPtr; - evntRecPtr.i = conf->getSenderData(); - - ndbrequire((evntRecPtr.p = c_opCreateEvent.getPtr(evntRecPtr.i)) != NULL); - OpCreateEvent *evntRec = evntRecPtr.p; - - switch (evntRec->m_requestType) { - case CreateEvntReq::RT_USER_GET: { -#ifdef EVENT_DEBUG - printf("get type = %d\n", CreateEvntReq::RT_USER_GET); -#endif - parseReadEventSys(signal, evntRecPtr.p->m_eventRec); - - evntRec->m_request.setEventType(evntRecPtr.p->m_eventRec.EVENT_TYPE); - evntRec->m_request.setAttrListBitmask(*(AttributeMask*)evntRecPtr.p->m_eventRec.ATTRIBUTE_MASK); - evntRec->m_request.setEventId(evntRecPtr.p->m_eventRec.SUBID); - evntRec->m_request.setEventKey(evntRecPtr.p->m_eventRec.SUBKEY); - -#ifdef EVENT_DEBUG - printf("EventName: %s\n", evntRec->m_eventRec.NAME); - printf("TableName: %s\n", evntRec->m_eventRec.TABLE_NAME); -#endif - - // find table id for event table - TableRecord keyRecord; - strcpy(keyRecord.tableName, evntRecPtr.p->m_eventRec.TABLE_NAME); - - TableRecordPtr tablePtr; - c_tableRecordHash.find(tablePtr, keyRecord); - - if (tablePtr.i == RNIL) { - jam(); - evntRecPtr.p->m_errorCode = CreateEvntRef::Undefined; - evntRecPtr.p->m_errorLine = __LINE__; - evntRecPtr.p->m_errorNode = reference(); - - createEvent_sendReply(signal, evntRecPtr); - return; - } - - evntRec->m_request.setTableId(tablePtr.p->tableId); - - createEventComplete_RT_USER_GET(signal, evntRecPtr); - return; - } - case CreateEvntReq::RT_USER_CREATE: { -#ifdef EVENT_DEBUG - printf("create type = %d\n", CreateEvntReq::RT_USER_CREATE); -#endif - jam(); - createEventComplete_RT_USER_CREATE(signal, evntRecPtr); - return; - } - break; - default: - ndbrequire(false); - } - } else { // returnCode != 0 - UtilExecuteRef * const ref = (UtilExecuteRef *)signal->getDataPtr(); - OpCreateEventPtr evntRecPtr; - evntRecPtr.i = ref->getSenderData(); - ndbrequire((evntRecPtr.p = c_opCreateEvent.getPtr(evntRecPtr.i)) != NULL); - jam(); - evntRecPtr.p->m_errorNode = reference(); - evntRecPtr.p->m_errorLine = __LINE__; - - switch (ref->getErrorCode()) { - case UtilExecuteRef::TCError: - switch (ref->getTCErrorCode()) { - case ZNOT_FOUND: - jam(); - evntRecPtr.p->m_errorCode = CreateEvntRef::EventNotFound; - break; - case ZALREADYEXIST: - jam(); - evntRecPtr.p->m_errorCode = CreateEvntRef::EventNameExists; - break; - default: - jam(); - evntRecPtr.p->m_errorCode = CreateEvntRef::UndefinedTCError; - break; - } - break; - default: - jam(); - evntRecPtr.p->m_errorCode = CreateEvntRef::Undefined; - break; - } - - createEvent_sendReply(signal, evntRecPtr); - } -} - -/*********************************************************************** - * - * NdbEventOperation, reading systable, creating event in suma - * - */ - -void -Dbdict::createEvent_RT_USER_GET(Signal* signal, OpCreateEventPtr evntRecPtr){ - jam(); - EVENT_TRACE; -#ifdef EVENT_PH2_DEBUG - ndbout_c("DBDICT(Coordinator) got GSN_CREATE_EVNT_REQ::RT_USER_GET evntRecPtr.i = (%d), ref = %u", evntRecPtr.i, evntRecPtr.p->m_request.getUserRef()); -#endif - - SegmentedSectionPtr ssPtr; - - signal->getSection(ssPtr, 0); - - SimplePropertiesSectionReader r0(ssPtr, getSectionSegmentPool()); -#ifdef EVENT_DEBUG - r0.printAll(ndbout); -#endif - if ((!r0.first()) || - (r0.getValueType() != SimpleProperties::StringValue) || - (r0.getValueLen() <= 0)) { - jam(); - releaseSections(signal); - - evntRecPtr.p->m_errorCode = CreateEvntRef::Undefined; - evntRecPtr.p->m_errorLine = __LINE__; - evntRecPtr.p->m_errorNode = reference(); - - createEvent_sendReply(signal, evntRecPtr); - return; - } - - r0.getString(evntRecPtr.p->m_eventRec.NAME); - int len = strlen(evntRecPtr.p->m_eventRec.NAME); - memset(evntRecPtr.p->m_eventRec.NAME+len, 0, MAX_TAB_NAME_SIZE-len); - - releaseSections(signal); - - Callback c = { safe_cast(&Dbdict::createEventUTIL_PREPARE), 0 }; - - prepareTransactionEventSysTable(&c, signal, evntRecPtr.i, - UtilPrepareReq::Read); - /* - * Will read systable and fill an OpCreateEventPtr - * and return below - */ -} - -void -Dbdict::createEventComplete_RT_USER_GET(Signal* signal, - OpCreateEventPtr evntRecPtr){ - jam(); - - // Send to oneself and the other DICT's - CreateEvntReq * req = (CreateEvntReq *)signal->getDataPtrSend(); - - *req = evntRecPtr.p->m_request; - req->senderRef = reference(); - req->senderData = evntRecPtr.i; - - req->addRequestFlag(CreateEvntReq::RT_DICT_AFTER_GET); - -#ifdef EVENT_PH2_DEBUG - ndbout_c("DBDICT(Coordinator) sending GSN_CREATE_EVNT_REQ::RT_DICT_AFTER_GET to DBDICT participants evntRecPtr.i = (%d)", evntRecPtr.i); -#endif - - NodeReceiverGroup rg(DBDICT, c_aliveNodes); - RequestTracker & p = evntRecPtr.p->m_reqTracker; - p.init<CreateEvntRef>(c_counterMgr, rg, GSN_CREATE_EVNT_REF, evntRecPtr.i); - - sendSignal(rg, GSN_CREATE_EVNT_REQ, signal, CreateEvntReq::SignalLength, JBB); -} - -void -Dbdict::createEvent_nodeFailCallback(Signal* signal, Uint32 eventRecPtrI, - Uint32 returnCode){ - OpCreateEventPtr evntRecPtr; - c_opCreateEvent.getPtr(evntRecPtr, eventRecPtrI); - createEvent_sendReply(signal, evntRecPtr); -} - -void Dbdict::execCREATE_EVNT_REF(Signal* signal) -{ - jamEntry(); - EVENT_TRACE; - CreateEvntRef * const ref = (CreateEvntRef *)signal->getDataPtr(); - OpCreateEventPtr evntRecPtr; - - evntRecPtr.i = ref->getUserData(); - - ndbrequire((evntRecPtr.p = c_opCreateEvent.getPtr(evntRecPtr.i)) != NULL); - -#ifdef EVENT_PH2_DEBUG - ndbout_c("DBDICT(Coordinator) got GSN_CREATE_EVNT_REF evntRecPtr.i = (%d)", evntRecPtr.i); -#endif - - if (ref->errorCode == CreateEvntRef::NF_FakeErrorREF){ - jam(); - evntRecPtr.p->m_reqTracker.ignoreRef(c_counterMgr, refToNode(ref->senderRef)); - } else { - jam(); - evntRecPtr.p->m_reqTracker.reportRef(c_counterMgr, refToNode(ref->senderRef)); - } - createEvent_sendReply(signal, evntRecPtr); - - return; -} - -void Dbdict::execCREATE_EVNT_CONF(Signal* signal) -{ - jamEntry(); - EVENT_TRACE; - CreateEvntConf * const conf = (CreateEvntConf *)signal->getDataPtr(); - OpCreateEventPtr evntRecPtr; - - evntRecPtr.i = conf->getUserData(); - - ndbrequire((evntRecPtr.p = c_opCreateEvent.getPtr(evntRecPtr.i)) != NULL); - -#ifdef EVENT_PH2_DEBUG - ndbout_c("DBDICT(Coordinator) got GSN_CREATE_EVNT_CONF evntRecPtr.i = (%d)", evntRecPtr.i); -#endif - - evntRecPtr.p->m_reqTracker.reportConf(c_counterMgr, refToNode(conf->senderRef)); - - // we will only have a valid tablename if it the master DICT sending this - // but that's ok - LinearSectionPtr ptr[1]; - ptr[0].p = (Uint32 *)evntRecPtr.p->m_eventRec.TABLE_NAME; - ptr[0].sz = - (strlen(evntRecPtr.p->m_eventRec.TABLE_NAME)+4)/4; // to make sure we have a null - - createEvent_sendReply(signal, evntRecPtr, ptr, 1); - - return; -} - -/************************************************ - * - * Participant stuff - * - */ - -void -Dbdict::createEvent_RT_DICT_AFTER_GET(Signal* signal, OpCreateEventPtr evntRecPtr){ - jam(); - evntRecPtr.p->m_request.setUserRef(signal->senderBlockRef()); - -#ifdef EVENT_PH2_DEBUG - ndbout_c("DBDICT(Participant) got CREATE_EVNT_REQ::RT_DICT_AFTER_GET evntRecPtr.i = (%d)", evntRecPtr.i); -#endif - - // the signal comes from the DICT block that got the first user request! - // This code runs on all DICT nodes, including oneself - - // Seize a Create Event record, the Coordinator will now have two seized - // but that's ok, it's like a recursion - - SubCreateReq * sumaReq = (SubCreateReq *)signal->getDataPtrSend(); - - sumaReq->subscriberRef = reference(); // reference to DICT - sumaReq->subscriberData = evntRecPtr.i; - sumaReq->subscriptionId = evntRecPtr.p->m_request.getEventId(); - sumaReq->subscriptionKey = evntRecPtr.p->m_request.getEventKey(); - sumaReq->subscriptionType = SubCreateReq::TableEvent; - sumaReq->tableId = evntRecPtr.p->m_request.getTableId(); - -#ifdef EVENT_PH2_DEBUG - ndbout_c("sending GSN_SUB_CREATE_REQ"); -#endif - - sendSignal(SUMA_REF, GSN_SUB_CREATE_REQ, signal, - SubCreateReq::SignalLength+1 /*to get table Id*/, JBB); -} - -void Dbdict::execSUB_CREATE_REF(Signal* signal) -{ - jamEntry(); - EVENT_TRACE; - SubCreateRef * const ref = (SubCreateRef *)signal->getDataPtr(); - OpCreateEventPtr evntRecPtr; - - evntRecPtr.i = ref->subscriberData; - ndbrequire((evntRecPtr.p = c_opCreateEvent.getPtr(evntRecPtr.i)) != NULL); - -#ifdef EVENT_PH2_DEBUG - ndbout_c("DBDICT(Participant) got SUB_CREATE_REF evntRecPtr.i = (%d)", evntRecPtr.i); -#endif - - if (ref->err == GrepError::SUBSCRIPTION_ID_NOT_UNIQUE) { - jam(); -#ifdef EVENT_PH2_DEBUG - ndbout_c("SUBSCRIPTION_ID_NOT_UNIQUE"); -#endif - createEvent_sendReply(signal, evntRecPtr); - return; - } - -#ifdef EVENT_PH2_DEBUG - ndbout_c("Other error"); -#endif - - evntRecPtr.p->m_errorCode = CreateEvntRef::Undefined; - evntRecPtr.p->m_errorLine = __LINE__; - evntRecPtr.p->m_errorNode = reference(); - - createEvent_sendReply(signal, evntRecPtr); -} - -void Dbdict::execSUB_CREATE_CONF(Signal* signal) -{ - jamEntry(); - EVENT_TRACE; - - SubCreateConf * const sumaConf = (SubCreateConf *)signal->getDataPtr(); - - const Uint32 subscriptionId = sumaConf->subscriptionId; - const Uint32 subscriptionKey = sumaConf->subscriptionKey; - const Uint32 evntRecId = sumaConf->subscriberData; - - OpCreateEvent *evntRec; - ndbrequire((evntRec = c_opCreateEvent.getPtr(evntRecId)) != NULL); - -#ifdef EVENT_PH2_DEBUG - ndbout_c("DBDICT(Participant) got SUB_CREATE_CONF evntRecPtr.i = (%d)", evntRecId); -#endif - - SubSyncReq *sumaSync = (SubSyncReq *)signal->getDataPtrSend(); - - sumaSync->subscriptionId = subscriptionId; - sumaSync->subscriptionKey = subscriptionKey; - sumaSync->part = (Uint32) SubscriptionData::MetaData; - sumaSync->subscriberData = evntRecId; - - sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ, signal, - SubSyncReq::SignalLength, JBB); -} - -void Dbdict::execSUB_SYNC_REF(Signal* signal) -{ - jamEntry(); - EVENT_TRACE; - SubSyncRef * const ref = (SubSyncRef *)signal->getDataPtr(); - OpCreateEventPtr evntRecPtr; - - evntRecPtr.i = ref->subscriberData; - ndbrequire((evntRecPtr.p = c_opCreateEvent.getPtr(evntRecPtr.i)) != NULL); - - evntRecPtr.p->m_errorCode = CreateEvntRef::Undefined; - evntRecPtr.p->m_errorLine = __LINE__; - evntRecPtr.p->m_errorNode = reference(); - - createEvent_sendReply(signal, evntRecPtr); -} - -void Dbdict::execSUB_SYNC_CONF(Signal* signal) -{ - jamEntry(); - EVENT_TRACE; - - SubSyncConf * const sumaSyncConf = (SubSyncConf *)signal->getDataPtr(); - - // Uint32 subscriptionId = sumaSyncConf->subscriptionId; - // Uint32 subscriptionKey = sumaSyncConf->subscriptionKey; - OpCreateEventPtr evntRecPtr; - - evntRecPtr.i = sumaSyncConf->subscriberData; - ndbrequire((evntRecPtr.p = c_opCreateEvent.getPtr(evntRecPtr.i)) != NULL); - - ndbrequire(sumaSyncConf->part == (Uint32)SubscriptionData::MetaData); - - createEvent_sendReply(signal, evntRecPtr); -} - -/**************************************************** - * - * common create reply method - * - *******************************************************/ - -void Dbdict::createEvent_sendReply(Signal* signal, - OpCreateEventPtr evntRecPtr, - LinearSectionPtr *ptr, int noLSP) -{ - jam(); - EVENT_TRACE; - - // check if we're ready to sent reply - // if we are the master dict we might be waiting for conf/ref - - if (!evntRecPtr.p->m_reqTracker.done()) { - jam(); - return; // there's more to come - } - - if (evntRecPtr.p->m_reqTracker.hasRef()) { - ptr = NULL; // we don't want to return anything if there's an error - if (!evntRecPtr.p->hasError()) { - evntRecPtr.p->m_errorCode = CreateEvntRef::Undefined; - evntRecPtr.p->m_errorLine = __LINE__; - evntRecPtr.p->m_errorNode = reference(); - jam(); - } else - jam(); - } - - // reference to API if master DICT - // else reference to master DICT - Uint32 senderRef = evntRecPtr.p->m_request.getUserRef(); - Uint32 signalLength; - Uint32 gsn; - - if (evntRecPtr.p->hasError()) { - jam(); - EVENT_TRACE; - CreateEvntRef * ret = (CreateEvntRef *)signal->getDataPtrSend(); - - ret->setEventId(evntRecPtr.p->m_request.getEventId()); - ret->setEventKey(evntRecPtr.p->m_request.getEventKey()); - ret->setUserData(evntRecPtr.p->m_request.getUserData()); - ret->senderRef = reference(); - ret->setTableId(evntRecPtr.p->m_request.getTableId()); - ret->setEventType(evntRecPtr.p->m_request.getEventType()); - ret->setRequestType(evntRecPtr.p->m_request.getRequestType()); - - ret->setErrorCode(evntRecPtr.p->m_errorCode); - ret->setErrorLine(evntRecPtr.p->m_errorLine); - ret->setErrorNode(evntRecPtr.p->m_errorNode); - - signalLength = CreateEvntRef::SignalLength; -#ifdef EVENT_PH2_DEBUG - ndbout_c("DBDICT sending GSN_CREATE_EVNT_REF to evntRecPtr.i = (%d) node = %u ref = %u", evntRecPtr.i, refToNode(senderRef), senderRef); - ndbout_c("errorCode = %u", evntRecPtr.p->m_errorCode); - ndbout_c("errorLine = %u", evntRecPtr.p->m_errorLine); -#endif - gsn = GSN_CREATE_EVNT_REF; - - } else { - jam(); - EVENT_TRACE; - CreateEvntConf * evntConf = (CreateEvntConf *)signal->getDataPtrSend(); - - evntConf->setEventId(evntRecPtr.p->m_request.getEventId()); - evntConf->setEventKey(evntRecPtr.p->m_request.getEventKey()); - evntConf->setUserData(evntRecPtr.p->m_request.getUserData()); - evntConf->senderRef = reference(); - evntConf->setTableId(evntRecPtr.p->m_request.getTableId()); - evntConf->setAttrListBitmask(evntRecPtr.p->m_request.getAttrListBitmask()); - evntConf->setEventType(evntRecPtr.p->m_request.getEventType()); - evntConf->setRequestType(evntRecPtr.p->m_request.getRequestType()); - - signalLength = CreateEvntConf::SignalLength; -#ifdef EVENT_PH2_DEBUG - ndbout_c("DBDICT sending GSN_CREATE_EVNT_CONF to evntRecPtr.i = (%d) node = %u ref = %u", evntRecPtr.i, refToNode(senderRef), senderRef); -#endif - gsn = GSN_CREATE_EVNT_CONF; - } - - if (ptr) { - jam(); - sendSignal(senderRef, gsn, signal, signalLength, JBB, ptr, noLSP); - } else { - jam(); - sendSignal(senderRef, gsn, signal, signalLength, JBB); - } - - c_opCreateEvent.release(evntRecPtr); -} - -/*************************************************************/ - -/******************************************************************** - * - * Start event - * - *******************************************************************/ - -void Dbdict::execSUB_START_REQ(Signal* signal) -{ - jamEntry(); - - Uint32 origSenderRef = signal->senderBlockRef(); - - OpSubEventPtr subbPtr; - if (!c_opSubEvent.seize(subbPtr)) { - SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend(); - { // fix - Uint32 subcriberRef = ((SubStartReq*)signal->getDataPtr())->subscriberRef; - ref->subscriberRef = subcriberRef; - } - jam(); - // ret->setErrorCode(SubStartRef::SeizeError); - // ret->setErrorLine(__LINE__); - // ret->setErrorNode(reference()); - ref->senderRef = reference(); - ref->setTemporary(SubStartRef::Busy); - - sendSignal(origSenderRef, GSN_SUB_START_REF, signal, - SubStartRef::SignalLength2, JBB); - return; - } - - { - const SubStartReq* req = (SubStartReq*) signal->getDataPtr(); - subbPtr.p->m_senderRef = req->senderRef; - subbPtr.p->m_senderData = req->senderData; - subbPtr.p->m_errorCode = 0; - } - - if (refToBlock(origSenderRef) != DBDICT) { - /* - * Coordinator - */ - jam(); - - subbPtr.p->m_senderRef = origSenderRef; // not sure if API sets correctly - NodeReceiverGroup rg(DBDICT, c_aliveNodes); - RequestTracker & p = subbPtr.p->m_reqTracker; - p.init<SubStartRef>(c_counterMgr, rg, GSN_SUB_START_REF, subbPtr.i); - - SubStartReq* req = (SubStartReq*) signal->getDataPtrSend(); - - req->senderRef = reference(); - req->senderData = subbPtr.i; - -#ifdef EVENT_PH3_DEBUG - ndbout_c("DBDICT(Coordinator) sending GSN_SUB_START_REQ to DBDICT participants subbPtr.i = (%d)", subbPtr.i); -#endif - - sendSignal(rg, GSN_SUB_START_REQ, signal, SubStartReq::SignalLength2, JBB); - return; - } - /* - * Participant - */ - ndbrequire(refToBlock(origSenderRef) == DBDICT); - - { - SubStartReq* req = (SubStartReq*) signal->getDataPtrSend(); - - req->senderRef = reference(); - req->senderData = subbPtr.i; - -#ifdef EVENT_PH3_DEBUG - ndbout_c("DBDICT(Participant) sending GSN_SUB_START_REQ to SUMA subbPtr.i = (%d)", subbPtr.i); -#endif - sendSignal(SUMA_REF, GSN_SUB_START_REQ, signal, SubStartReq::SignalLength2, JBB); - } -} - -void Dbdict::execSUB_START_REF(Signal* signal) -{ - jamEntry(); - - const SubStartRef* ref = (SubStartRef*) signal->getDataPtr(); - Uint32 senderRef = ref->senderRef; - - OpSubEventPtr subbPtr; - c_opSubEvent.getPtr(subbPtr, ref->senderData); - - if (refToBlock(senderRef) == SUMA) { - /* - * Participant - */ - jam(); - -#ifdef EVENT_PH3_DEBUG - ndbout_c("DBDICT(Participant) got GSN_SUB_START_REF = (%d)", subbPtr.i); -#endif - - if (ref->isTemporary()){ - jam(); - SubStartReq* req = (SubStartReq*)signal->getDataPtrSend(); - { // fix - Uint32 subscriberRef = ref->subscriberRef; - req->subscriberRef = subscriberRef; - } - req->senderRef = reference(); - req->senderData = subbPtr.i; - sendSignal(SUMA_REF, GSN_SUB_START_REQ, - signal, SubStartReq::SignalLength2, JBB); - } else { - jam(); - - SubStartRef* ref = (SubStartRef*) signal->getDataPtrSend(); - ref->senderRef = reference(); - ref->senderData = subbPtr.p->m_senderData; - sendSignal(subbPtr.p->m_senderRef, GSN_SUB_START_REF, - signal, SubStartRef::SignalLength2, JBB); - c_opSubEvent.release(subbPtr); - } - return; - } - /* - * Coordinator - */ - ndbrequire(refToBlock(senderRef) == DBDICT); -#ifdef EVENT_PH3_DEBUG - ndbout_c("DBDICT(Coordinator) got GSN_SUB_START_REF = (%d)", subbPtr.i); -#endif - if (ref->errorCode == SubStartRef::NF_FakeErrorREF){ - jam(); - subbPtr.p->m_reqTracker.ignoreRef(c_counterMgr, refToNode(senderRef)); - } else { - jam(); - subbPtr.p->m_reqTracker.reportRef(c_counterMgr, refToNode(senderRef)); - } - completeSubStartReq(signal,subbPtr.i,0); -} - -void Dbdict::execSUB_START_CONF(Signal* signal) -{ - jamEntry(); - - const SubStartConf* conf = (SubStartConf*) signal->getDataPtr(); - Uint32 senderRef = conf->senderRef; - - OpSubEventPtr subbPtr; - c_opSubEvent.getPtr(subbPtr, conf->senderData); - - if (refToBlock(senderRef) == SUMA) { - /* - * Participant - */ - jam(); - SubStartConf* conf = (SubStartConf*) signal->getDataPtrSend(); - -#ifdef EVENT_PH3_DEBUG - ndbout_c("DBDICT(Participant) got GSN_SUB_START_CONF = (%d)", subbPtr.i); -#endif - - conf->senderRef = reference(); - conf->senderData = subbPtr.p->m_senderData; - - sendSignal(subbPtr.p->m_senderRef, GSN_SUB_START_CONF, - signal, SubStartConf::SignalLength2, JBB); - c_opSubEvent.release(subbPtr); - return; - } - /* - * Coordinator - */ - ndbrequire(refToBlock(senderRef) == DBDICT); -#ifdef EVENT_PH3_DEBUG - ndbout_c("DBDICT(Coordinator) got GSN_SUB_START_CONF = (%d)", subbPtr.i); -#endif - subbPtr.p->m_reqTracker.reportConf(c_counterMgr, refToNode(senderRef)); - completeSubStartReq(signal,subbPtr.i,0); -} - -/* - * Coordinator - */ -void Dbdict::completeSubStartReq(Signal* signal, - Uint32 ptrI, - Uint32 returnCode){ - jam(); - - OpSubEventPtr subbPtr; - c_opSubEvent.getPtr(subbPtr, ptrI); - - if (!subbPtr.p->m_reqTracker.done()){ - jam(); - return; - } - - if (subbPtr.p->m_reqTracker.hasRef()) { - jam(); -#ifdef EVENT_DEBUG - ndbout_c("SUB_START_REF"); -#endif - sendSignal(subbPtr.p->m_senderRef, GSN_SUB_START_REF, - signal, SubStartRef::SignalLength, JBB); - if (subbPtr.p->m_reqTracker.hasConf()) { - // stopStartedNodes(signal); - } - c_opSubEvent.release(subbPtr); - return; - } -#ifdef EVENT_DEBUG - ndbout_c("SUB_START_CONF"); -#endif - sendSignal(subbPtr.p->m_senderRef, GSN_SUB_START_CONF, - signal, SubStartConf::SignalLength, JBB); - c_opSubEvent.release(subbPtr); -} - -/******************************************************************** - * - * Stop event - * - *******************************************************************/ - -void Dbdict::execSUB_STOP_REQ(Signal* signal) -{ - jamEntry(); - - Uint32 origSenderRef = signal->senderBlockRef(); - - OpSubEventPtr subbPtr; - if (!c_opSubEvent.seize(subbPtr)) { - SubStopRef * ref = (SubStopRef *)signal->getDataPtrSend(); - jam(); - // ret->setErrorCode(SubStartRef::SeizeError); - // ret->setErrorLine(__LINE__); - // ret->setErrorNode(reference()); - ref->senderRef = reference(); - ref->setTemporary(SubStopRef::Busy); - - sendSignal(origSenderRef, GSN_SUB_STOP_REF, signal, - SubStopRef::SignalLength, JBB); - return; - } - - { - const SubStopReq* req = (SubStopReq*) signal->getDataPtr(); - subbPtr.p->m_senderRef = req->senderRef; - subbPtr.p->m_senderData = req->senderData; - subbPtr.p->m_errorCode = 0; - } - - if (refToBlock(origSenderRef) != DBDICT) { - /* - * Coordinator - */ - jam(); -#ifdef EVENT_DEBUG - ndbout_c("SUB_STOP_REQ 1"); -#endif - subbPtr.p->m_senderRef = origSenderRef; // not sure if API sets correctly - NodeReceiverGroup rg(DBDICT, c_aliveNodes); - RequestTracker & p = subbPtr.p->m_reqTracker; - p.init<SubStopRef>(c_counterMgr, rg, GSN_SUB_STOP_REF, subbPtr.i); - - SubStopReq* req = (SubStopReq*) signal->getDataPtrSend(); - - req->senderRef = reference(); - req->senderData = subbPtr.i; - - sendSignal(rg, GSN_SUB_STOP_REQ, signal, SubStopReq::SignalLength, JBB); - return; - } - /* - * Participant - */ -#ifdef EVENT_DEBUG - ndbout_c("SUB_STOP_REQ 2"); -#endif - ndbrequire(refToBlock(origSenderRef) == DBDICT); - { - SubStopReq* req = (SubStopReq*) signal->getDataPtrSend(); - - req->senderRef = reference(); - req->senderData = subbPtr.i; - - sendSignal(SUMA_REF, GSN_SUB_STOP_REQ, signal, SubStopReq::SignalLength, JBB); - } -} - -void Dbdict::execSUB_STOP_REF(Signal* signal) -{ - jamEntry(); - const SubStopRef* ref = (SubStopRef*) signal->getDataPtr(); - Uint32 senderRef = ref->senderRef; - - OpSubEventPtr subbPtr; - c_opSubEvent.getPtr(subbPtr, ref->senderData); - - if (refToBlock(senderRef) == SUMA) { - /* - * Participant - */ - jam(); - if (ref->isTemporary()){ - jam(); - SubStopReq* req = (SubStopReq*)signal->getDataPtrSend(); - req->senderRef = reference(); - req->senderData = subbPtr.i; - sendSignal(SUMA_REF, GSN_SUB_STOP_REQ, - signal, SubStopReq::SignalLength, JBB); - } else { - jam(); - SubStopRef* ref = (SubStopRef*) signal->getDataPtrSend(); - ref->senderRef = reference(); - ref->senderData = subbPtr.p->m_senderData; - sendSignal(subbPtr.p->m_senderRef, GSN_SUB_STOP_REF, - signal, SubStopRef::SignalLength, JBB); - c_opSubEvent.release(subbPtr); - } - return; - } - /* - * Coordinator - */ - ndbrequire(refToBlock(senderRef) == DBDICT); - if (ref->errorCode == SubStopRef::NF_FakeErrorREF){ - jam(); - subbPtr.p->m_reqTracker.ignoreRef(c_counterMgr, refToNode(senderRef)); - } else { - jam(); - subbPtr.p->m_reqTracker.reportRef(c_counterMgr, refToNode(senderRef)); - } - completeSubStopReq(signal,subbPtr.i,0); -} - -void Dbdict::execSUB_STOP_CONF(Signal* signal) -{ - jamEntry(); - - const SubStopConf* conf = (SubStopConf*) signal->getDataPtr(); - Uint32 senderRef = conf->senderRef; - - OpSubEventPtr subbPtr; - c_opSubEvent.getPtr(subbPtr, conf->senderData); - - if (refToBlock(senderRef) == SUMA) { - /* - * Participant - */ - jam(); - SubStopConf* conf = (SubStopConf*) signal->getDataPtrSend(); - - conf->senderRef = reference(); - conf->senderData = subbPtr.p->m_senderData; - - sendSignal(subbPtr.p->m_senderRef, GSN_SUB_STOP_CONF, - signal, SubStopConf::SignalLength, JBB); - c_opSubEvent.release(subbPtr); - return; - } - /* - * Coordinator - */ - ndbrequire(refToBlock(senderRef) == DBDICT); - subbPtr.p->m_reqTracker.reportConf(c_counterMgr, refToNode(senderRef)); - completeSubStopReq(signal,subbPtr.i,0); -} - -/* - * Coordinator - */ -void Dbdict::completeSubStopReq(Signal* signal, - Uint32 ptrI, - Uint32 returnCode){ - OpSubEventPtr subbPtr; - c_opSubEvent.getPtr(subbPtr, ptrI); - - if (!subbPtr.p->m_reqTracker.done()){ - jam(); - return; - } - - if (subbPtr.p->m_reqTracker.hasRef()) { - jam(); -#ifdef EVENT_DEBUG - ndbout_c("SUB_STOP_REF"); -#endif - SubStopRef* ref = (SubStopRef*)signal->getDataPtrSend(); - - ref->senderRef = reference(); - ref->senderData = subbPtr.p->m_senderData; - /* - ref->subscriptionId = subbPtr.p->m_senderData; - ref->subscriptionKey = subbPtr.p->m_senderData; - ref->part = subbPtr.p->m_part; // SubscriptionData::Part - ref->subscriberData = subbPtr.p->m_subscriberData; - ref->subscriberRef = subbPtr.p->m_subscriberRef; - */ - ref->errorCode = subbPtr.p->m_errorCode; - - - sendSignal(subbPtr.p->m_senderRef, GSN_SUB_STOP_REF, - signal, SubStopRef::SignalLength, JBB); - if (subbPtr.p->m_reqTracker.hasConf()) { - // stopStartedNodes(signal); - } - c_opSubEvent.release(subbPtr); - return; - } -#ifdef EVENT_DEBUG - ndbout_c("SUB_STOP_CONF"); -#endif - sendSignal(subbPtr.p->m_senderRef, GSN_SUB_STOP_CONF, - signal, SubStopConf::SignalLength, JBB); - c_opSubEvent.release(subbPtr); -} - -/*************************************************************** - * MODULE: Drop event. - * - * Drop event. - * - * TODO - */ - -void -Dbdict::execDROP_EVNT_REQ(Signal* signal) -{ - jamEntry(); - EVENT_TRACE; - - DropEvntReq *req = (DropEvntReq*)signal->getDataPtr(); - const Uint32 senderRef = signal->senderBlockRef(); - OpDropEventPtr evntRecPtr; - - // Seize a Create Event record - if (!c_opDropEvent.seize(evntRecPtr)) { - // Failed to allocate event record - jam(); - releaseSections(signal); - - DropEvntRef * ret = (DropEvntRef *)signal->getDataPtrSend(); - ret->setErrorCode(DropEvntRef::SeizeError); - ret->setErrorLine(__LINE__); - ret->setErrorNode(reference()); - sendSignal(senderRef, GSN_DROP_EVNT_REF, signal, - DropEvntRef::SignalLength, JBB); - return; - } - -#ifdef EVENT_DEBUG - ndbout_c("DBDICT::execDROP_EVNT_REQ evntRecId = (%d)", evntRecPtr.i); -#endif - - OpDropEvent* evntRec = evntRecPtr.p; - evntRec->init(req); - - SegmentedSectionPtr ssPtr; - - signal->getSection(ssPtr, 0); - - SimplePropertiesSectionReader r0(ssPtr, getSectionSegmentPool()); -#ifdef EVENT_DEBUG - r0.printAll(ndbout); -#endif - // event name - if ((!r0.first()) || - (r0.getValueType() != SimpleProperties::StringValue) || - (r0.getValueLen() <= 0)) { - jam(); - releaseSections(signal); - - evntRecPtr.p->m_errorCode = DropEvntRef::Undefined; - evntRecPtr.p->m_errorLine = __LINE__; - evntRecPtr.p->m_errorNode = reference(); - - dropEvent_sendReply(signal, evntRecPtr); - return; - } - r0.getString(evntRecPtr.p->m_eventRec.NAME); - { - int len = strlen(evntRecPtr.p->m_eventRec.NAME); - memset(evntRecPtr.p->m_eventRec.NAME+len, 0, MAX_TAB_NAME_SIZE-len); -#ifdef EVENT_DEBUG - printf("DropEvntReq; EventName %s, len %u\n", - evntRecPtr.p->m_eventRec.NAME, len); - for(int i = 0; i < MAX_TAB_NAME_SIZE/4; i++) - printf("H'%.8x ", ((Uint32*)evntRecPtr.p->m_eventRec.NAME)[i]); - printf("\n"); -#endif - } - - releaseSections(signal); - - Callback c = { safe_cast(&Dbdict::dropEventUTIL_PREPARE_READ), 0 }; - - prepareTransactionEventSysTable(&c, signal, evntRecPtr.i, - UtilPrepareReq::Read); -} - -void -Dbdict::dropEventUTIL_PREPARE_READ(Signal* signal, - Uint32 callbackData, - Uint32 returnCode) -{ - jam(); - EVENT_TRACE; - if (returnCode != 0) { - EVENT_TRACE; - dropEventUtilPrepareRef(signal, callbackData, returnCode); - return; - } - - UtilPrepareConf* const req = (UtilPrepareConf*)signal->getDataPtr(); - OpDropEventPtr evntRecPtr; - evntRecPtr.i = req->getSenderData(); - const Uint32 prepareId = req->getPrepareId(); - - ndbrequire((evntRecPtr.p = c_opDropEvent.getPtr(evntRecPtr.i)) != NULL); - - Callback c = { safe_cast(&Dbdict::dropEventUTIL_EXECUTE_READ), 0 }; - - executeTransEventSysTable(&c, signal, - evntRecPtr.i, evntRecPtr.p->m_eventRec, - prepareId, UtilPrepareReq::Read); -} - -void -Dbdict::dropEventUTIL_EXECUTE_READ(Signal* signal, - Uint32 callbackData, - Uint32 returnCode) -{ - jam(); - EVENT_TRACE; - if (returnCode != 0) { - EVENT_TRACE; - dropEventUtilExecuteRef(signal, callbackData, returnCode); - return; - } - - OpDropEventPtr evntRecPtr; - UtilExecuteConf * const ref = (UtilExecuteConf *)signal->getDataPtr(); - jam(); - evntRecPtr.i = ref->getSenderData(); - ndbrequire((evntRecPtr.p = c_opDropEvent.getPtr(evntRecPtr.i)) != NULL); - - parseReadEventSys(signal, evntRecPtr.p->m_eventRec); - - NodeReceiverGroup rg(DBDICT, c_aliveNodes); - RequestTracker & p = evntRecPtr.p->m_reqTracker; - p.init<SubRemoveRef>(c_counterMgr, rg, GSN_SUB_REMOVE_REF, - evntRecPtr.i); - - SubRemoveReq* req = (SubRemoveReq*) signal->getDataPtrSend(); - - req->senderRef = reference(); - req->senderData = evntRecPtr.i; - req->subscriptionId = evntRecPtr.p->m_eventRec.SUBID; - req->subscriptionKey = evntRecPtr.p->m_eventRec.SUBKEY; - - sendSignal(rg, GSN_SUB_REMOVE_REQ, signal, SubRemoveReq::SignalLength, JBB); -} - -/* - * Participant - */ - -void -Dbdict::execSUB_REMOVE_REQ(Signal* signal) -{ - jamEntry(); - - Uint32 origSenderRef = signal->senderBlockRef(); - - OpSubEventPtr subbPtr; - if (!c_opSubEvent.seize(subbPtr)) { - SubRemoveRef * ref = (SubRemoveRef *)signal->getDataPtrSend(); - jam(); - ref->senderRef = reference(); - ref->setTemporary(SubRemoveRef::Busy); - - sendSignal(origSenderRef, GSN_SUB_REMOVE_REF, signal, - SubRemoveRef::SignalLength, JBB); - return; - } - - { - const SubRemoveReq* req = (SubRemoveReq*) signal->getDataPtr(); - subbPtr.p->m_senderRef = req->senderRef; - subbPtr.p->m_senderData = req->senderData; - subbPtr.p->m_errorCode = 0; - } - - SubRemoveReq* req = (SubRemoveReq*) signal->getDataPtrSend(); - req->senderRef = reference(); - req->senderData = subbPtr.i; - - sendSignal(SUMA_REF, GSN_SUB_REMOVE_REQ, signal, SubRemoveReq::SignalLength, JBB); -} - -/* - * Coordintor/Participant - */ - -void -Dbdict::execSUB_REMOVE_REF(Signal* signal) -{ - jamEntry(); - const SubRemoveRef* ref = (SubRemoveRef*) signal->getDataPtr(); - Uint32 senderRef = ref->senderRef; - - if (refToBlock(senderRef) == SUMA) { - /* - * Participant - */ - jam(); - OpSubEventPtr subbPtr; - c_opSubEvent.getPtr(subbPtr, ref->senderData); - if (ref->errorCode == (Uint32) GrepError::SUBSCRIPTION_ID_NOT_FOUND) { - // conf this since this may occur if a nodefailiure has occured - // earlier so that the systable was not cleared - SubRemoveConf* conf = (SubRemoveConf*) signal->getDataPtrSend(); - conf->senderRef = reference(); - conf->senderData = subbPtr.p->m_senderData; - sendSignal(subbPtr.p->m_senderRef, GSN_SUB_REMOVE_CONF, - signal, SubRemoveConf::SignalLength, JBB); - } else { - SubRemoveRef* ref = (SubRemoveRef*) signal->getDataPtrSend(); - ref->senderRef = reference(); - ref->senderData = subbPtr.p->m_senderData; - sendSignal(subbPtr.p->m_senderRef, GSN_SUB_REMOVE_REF, - signal, SubRemoveRef::SignalLength, JBB); - } - c_opSubEvent.release(subbPtr); - return; - } - /* - * Coordinator - */ - ndbrequire(refToBlock(senderRef) == DBDICT); - OpDropEventPtr eventRecPtr; - c_opDropEvent.getPtr(eventRecPtr, ref->senderData); - if (ref->errorCode == SubRemoveRef::NF_FakeErrorREF){ - jam(); - eventRecPtr.p->m_reqTracker.ignoreRef(c_counterMgr, refToNode(senderRef)); - } else { - jam(); - eventRecPtr.p->m_reqTracker.reportRef(c_counterMgr, refToNode(senderRef)); - } - completeSubRemoveReq(signal,eventRecPtr.i,0); -} - -void -Dbdict::execSUB_REMOVE_CONF(Signal* signal) -{ - jamEntry(); - const SubRemoveConf* conf = (SubRemoveConf*) signal->getDataPtr(); - Uint32 senderRef = conf->senderRef; - - if (refToBlock(senderRef) == SUMA) { - /* - * Participant - */ - jam(); - OpSubEventPtr subbPtr; - c_opSubEvent.getPtr(subbPtr, conf->senderData); - SubRemoveConf* conf = (SubRemoveConf*) signal->getDataPtrSend(); - conf->senderRef = reference(); - conf->senderData = subbPtr.p->m_senderData; - sendSignal(subbPtr.p->m_senderRef, GSN_SUB_REMOVE_CONF, - signal, SubRemoveConf::SignalLength, JBB); - c_opSubEvent.release(subbPtr); - return; - } - /* - * Coordinator - */ - ndbrequire(refToBlock(senderRef) == DBDICT); - OpDropEventPtr eventRecPtr; - c_opDropEvent.getPtr(eventRecPtr, conf->senderData); - eventRecPtr.p->m_reqTracker.reportConf(c_counterMgr, refToNode(senderRef)); - completeSubRemoveReq(signal,eventRecPtr.i,0); -} - -void -Dbdict::completeSubRemoveReq(Signal* signal, Uint32 ptrI, Uint32 xxx) -{ - OpDropEventPtr evntRecPtr; - c_opDropEvent.getPtr(evntRecPtr, ptrI); - - if (!evntRecPtr.p->m_reqTracker.done()){ - jam(); - return; - } - - if (evntRecPtr.p->m_reqTracker.hasRef()) { - jam(); - evntRecPtr.p->m_errorNode = reference(); - evntRecPtr.p->m_errorLine = __LINE__; - evntRecPtr.p->m_errorCode = DropEvntRef::Undefined; - dropEvent_sendReply(signal, evntRecPtr); - return; - } - - Callback c = { safe_cast(&Dbdict::dropEventUTIL_PREPARE_DELETE), 0 }; - - prepareTransactionEventSysTable(&c, signal, evntRecPtr.i, - UtilPrepareReq::Delete); -} - -void -Dbdict::dropEventUTIL_PREPARE_DELETE(Signal* signal, - Uint32 callbackData, - Uint32 returnCode) -{ - jam(); - EVENT_TRACE; - if (returnCode != 0) { - EVENT_TRACE; - dropEventUtilPrepareRef(signal, callbackData, returnCode); - return; - } - - UtilPrepareConf* const req = (UtilPrepareConf*)signal->getDataPtr(); - OpDropEventPtr evntRecPtr; - jam(); - evntRecPtr.i = req->getSenderData(); - const Uint32 prepareId = req->getPrepareId(); - - ndbrequire((evntRecPtr.p = c_opDropEvent.getPtr(evntRecPtr.i)) != NULL); -#ifdef EVENT_DEBUG - printf("DropEvntUTIL_PREPARE; evntRecPtr.i len %u\n",evntRecPtr.i); -#endif - - Callback c = { safe_cast(&Dbdict::dropEventUTIL_EXECUTE_DELETE), 0 }; - - executeTransEventSysTable(&c, signal, - evntRecPtr.i, evntRecPtr.p->m_eventRec, - prepareId, UtilPrepareReq::Delete); -} - -void -Dbdict::dropEventUTIL_EXECUTE_DELETE(Signal* signal, - Uint32 callbackData, - Uint32 returnCode) -{ - jam(); - EVENT_TRACE; - if (returnCode != 0) { - EVENT_TRACE; - dropEventUtilExecuteRef(signal, callbackData, returnCode); - return; - } - - OpDropEventPtr evntRecPtr; - UtilExecuteConf * const ref = (UtilExecuteConf *)signal->getDataPtr(); - jam(); - evntRecPtr.i = ref->getSenderData(); - ndbrequire((evntRecPtr.p = c_opDropEvent.getPtr(evntRecPtr.i)) != NULL); - - dropEvent_sendReply(signal, evntRecPtr); -} - -void -Dbdict::dropEventUtilPrepareRef(Signal* signal, - Uint32 callbackData, - Uint32 returnCode) -{ - jam(); - EVENT_TRACE; - UtilPrepareRef * const ref = (UtilPrepareRef *)signal->getDataPtr(); - OpDropEventPtr evntRecPtr; - evntRecPtr.i = ref->getSenderData(); - ndbrequire((evntRecPtr.p = c_opDropEvent.getPtr(evntRecPtr.i)) != NULL); - - bool temporary = false; - interpretUtilPrepareErrorCode((UtilPrepareRef::ErrorCode)ref->getErrorCode(), - temporary, evntRecPtr.p->m_errorLine); - if (temporary) { - evntRecPtr.p->m_errorCode = (DropEvntRef::ErrorCode) - ((Uint32) DropEvntRef::Undefined | (Uint32) DropEvntRef::Temporary); - } - - if (evntRecPtr.p->m_errorCode == 0) { - evntRecPtr.p->m_errorCode = DropEvntRef::Undefined; - evntRecPtr.p->m_errorLine = __LINE__; - } - evntRecPtr.p->m_errorNode = reference(); - - dropEvent_sendReply(signal, evntRecPtr); -} - -void -Dbdict::dropEventUtilExecuteRef(Signal* signal, - Uint32 callbackData, - Uint32 returnCode) -{ - jam(); - EVENT_TRACE; - OpDropEventPtr evntRecPtr; - UtilExecuteRef * const ref = (UtilExecuteRef *)signal->getDataPtr(); - jam(); - evntRecPtr.i = ref->getSenderData(); - ndbrequire((evntRecPtr.p = c_opDropEvent.getPtr(evntRecPtr.i)) != NULL); - - evntRecPtr.p->m_errorNode = reference(); - evntRecPtr.p->m_errorLine = __LINE__; - - switch (ref->getErrorCode()) { - case UtilExecuteRef::TCError: - switch (ref->getTCErrorCode()) { - case ZNOT_FOUND: - jam(); - evntRecPtr.p->m_errorCode = DropEvntRef::EventNotFound; - break; - default: - jam(); - evntRecPtr.p->m_errorCode = DropEvntRef::UndefinedTCError; - break; - } - break; - default: - jam(); - evntRecPtr.p->m_errorCode = DropEvntRef::Undefined; - break; - } - dropEvent_sendReply(signal, evntRecPtr); -} - -void Dbdict::dropEvent_sendReply(Signal* signal, - OpDropEventPtr evntRecPtr) -{ - jam(); - EVENT_TRACE; - Uint32 senderRef = evntRecPtr.p->m_request.getUserRef(); - - if (evntRecPtr.p->hasError()) { - jam(); - DropEvntRef * ret = (DropEvntRef *)signal->getDataPtrSend(); - - ret->setUserData(evntRecPtr.p->m_request.getUserData()); - ret->setUserRef(evntRecPtr.p->m_request.getUserRef()); - - ret->setErrorCode(evntRecPtr.p->m_errorCode); - ret->setErrorLine(evntRecPtr.p->m_errorLine); - ret->setErrorNode(evntRecPtr.p->m_errorNode); - - sendSignal(senderRef, GSN_DROP_EVNT_REF, signal, - DropEvntRef::SignalLength, JBB); - } else { - jam(); - DropEvntConf * evntConf = (DropEvntConf *)signal->getDataPtrSend(); - - evntConf->setUserData(evntRecPtr.p->m_request.getUserData()); - evntConf->setUserRef(evntRecPtr.p->m_request.getUserRef()); - - sendSignal(senderRef, GSN_DROP_EVNT_CONF, signal, - DropEvntConf::SignalLength, JBB); - } - - c_opDropEvent.release(evntRecPtr); -} /** * MODULE: Alter index diff --git a/ndb/src/kernel/blocks/dbdict/Dbdict.hpp b/ndb/src/kernel/blocks/dbdict/Dbdict.hpp index 0fa984a4c61..ed8b7e3b822 100644 --- a/ndb/src/kernel/blocks/dbdict/Dbdict.hpp +++ b/ndb/src/kernel/blocks/dbdict/Dbdict.hpp @@ -46,8 +46,6 @@ #include <signaldata/DropIndx.hpp> #include <signaldata/AlterIndx.hpp> #include <signaldata/BuildIndx.hpp> -#include <signaldata/UtilPrepare.hpp> -#include <signaldata/CreateEvnt.hpp> #include <signaldata/CreateTrig.hpp> #include <signaldata/DropTrig.hpp> #include <signaldata/AlterTrig.hpp> @@ -517,45 +515,6 @@ private: void execBACKUP_FRAGMENT_REQ(Signal*); - // Util signals used by Event code - void execUTIL_PREPARE_CONF(Signal* signal); - void execUTIL_PREPARE_REF (Signal* signal); - void execUTIL_EXECUTE_CONF(Signal* signal); - void execUTIL_EXECUTE_REF (Signal* signal); - void execUTIL_RELEASE_CONF(Signal* signal); - void execUTIL_RELEASE_REF (Signal* signal); - - - // Event signals from API - void execCREATE_EVNT_REQ (Signal* signal); - void execCREATE_EVNT_CONF(Signal* signal); - void execCREATE_EVNT_REF (Signal* signal); - - void execDROP_EVNT_REQ (Signal* signal); - - void execSUB_START_REQ (Signal* signal); - void execSUB_START_CONF (Signal* signal); - void execSUB_START_REF (Signal* signal); - - void execSUB_STOP_REQ (Signal* signal); - void execSUB_STOP_CONF (Signal* signal); - void execSUB_STOP_REF (Signal* signal); - - // Event signals from SUMA - - void execCREATE_SUBID_CONF(Signal* signal); - void execCREATE_SUBID_REF (Signal* signal); - - void execSUB_CREATE_CONF(Signal* signal); - void execSUB_CREATE_REF (Signal* signal); - - void execSUB_SYNC_CONF(Signal* signal); - void execSUB_SYNC_REF (Signal* signal); - - void execSUB_REMOVE_REQ(Signal* signal); - void execSUB_REMOVE_CONF(Signal* signal); - void execSUB_REMOVE_REF(Signal* signal); - // Trigger signals void execCREATE_TRIG_REQ(Signal* signal); void execCREATE_TRIG_CONF(Signal* signal); @@ -1386,119 +1345,6 @@ private: typedef Ptr<OpBuildIndex> OpBuildIndexPtr; /** - * Operation record for Util Signals. - */ - struct OpSignalUtil : OpRecordCommon{ - Callback m_callback; - Uint32 m_userData; - }; - typedef Ptr<OpSignalUtil> OpSignalUtilPtr; - - /** - * Operation record for subscribe-start-stop - */ - struct OpSubEvent : OpRecordCommon { - Uint32 m_senderRef; - Uint32 m_senderData; - Uint32 m_errorCode; - RequestTracker m_reqTracker; - }; - typedef Ptr<OpSubEvent> OpSubEventPtr; - - static const Uint32 sysTab_NDBEVENTS_0_szs[]; - - /** - * Operation record for create event. - */ - struct OpCreateEvent : OpRecordCommon { - // original request (event id will be added) - CreateEvntReq m_request; - //AttributeMask m_attrListBitmask; - // AttributeList m_attrList; - sysTab_NDBEVENTS_0 m_eventRec; - // char m_eventName[MAX_TAB_NAME_SIZE]; - // char m_tableName[MAX_TAB_NAME_SIZE]; - - // coordinator DICT - RequestTracker m_reqTracker; - // state info - CreateEvntReq::RequestType m_requestType; - Uint32 m_requestFlag; - // error info - CreateEvntRef::ErrorCode m_errorCode; - Uint32 m_errorLine; - Uint32 m_errorNode; - // ctor - OpCreateEvent() { - memset(&m_request, 0, sizeof(m_request)); - m_requestType = CreateEvntReq::RT_UNDEFINED; - m_requestFlag = 0; - m_errorCode = CreateEvntRef::NoError; - m_errorLine = 0; - m_errorNode = 0; - } - void init(const CreateEvntReq* req, Dbdict* dp) { - m_request = *req; - m_errorCode = CreateEvntRef::NoError; - m_errorLine = 0; - m_errorNode = 0; - m_requestType = req->getRequestType(); - m_requestFlag = req->getRequestFlag(); - } - bool hasError() { - return m_errorCode != CreateEvntRef::NoError; - } - void setError(const CreateEvntRef* ref) { - if (ref != 0 && ! hasError()) { - m_errorCode = ref->getErrorCode(); - m_errorLine = ref->getErrorLine(); - m_errorNode = ref->getErrorNode(); - } - } - - }; - typedef Ptr<OpCreateEvent> OpCreateEventPtr; - - /** - * Operation record for drop event. - */ - struct OpDropEvent : OpRecordCommon { - // original request - DropEvntReq m_request; - // char m_eventName[MAX_TAB_NAME_SIZE]; - sysTab_NDBEVENTS_0 m_eventRec; - RequestTracker m_reqTracker; - // error info - DropEvntRef::ErrorCode m_errorCode; - Uint32 m_errorLine; - Uint32 m_errorNode; - // ctor - OpDropEvent() { - memset(&m_request, 0, sizeof(m_request)); - m_errorCode = DropEvntRef::NoError; - m_errorLine = 0; - m_errorNode = 0; - } - void init(const DropEvntReq* req) { - m_request = *req; - m_errorCode = DropEvntRef::NoError; - m_errorLine = 0; - m_errorNode = 0; - } - bool hasError() { - return m_errorCode != DropEvntRef::NoError; - } - void setError(const DropEvntRef* ref) { - if (ref != 0 && ! hasError()) { - m_errorCode = ref->getErrorCode(); - m_errorLine = ref->getErrorLine(); - m_errorNode = ref->getErrorNode(); - } - } - }; - typedef Ptr<OpDropEvent> OpDropEventPtr; - - /** * Operation record for create trigger. */ struct OpCreateTrigger : OpRecordCommon { @@ -1718,10 +1564,6 @@ public: STATIC_CONST( opDropIndexSize = sizeof(OpDropIndex) ); STATIC_CONST( opAlterIndexSize = sizeof(OpAlterIndex) ); STATIC_CONST( opBuildIndexSize = sizeof(OpBuildIndex) ); - STATIC_CONST( opCreateEventSize = sizeof(OpCreateEvent) ); - STATIC_CONST( opSubEventSize = sizeof(OpSubEvent) ); - STATIC_CONST( opDropEventSize = sizeof(OpDropEvent) ); - STATIC_CONST( opSignalUtilSize = sizeof(OpSignalUtil) ); STATIC_CONST( opCreateTriggerSize = sizeof(OpCreateTrigger) ); STATIC_CONST( opDropTriggerSize = sizeof(OpDropTrigger) ); STATIC_CONST( opAlterTriggerSize = sizeof(OpAlterTrigger) ); @@ -1732,10 +1574,6 @@ private: Uint32 u_opDropTable [PTR_ALIGN(opDropTableSize)]; Uint32 u_opCreateIndex [PTR_ALIGN(opCreateIndexSize)]; Uint32 u_opDropIndex [PTR_ALIGN(opDropIndexSize)]; - Uint32 u_opCreateEvent [PTR_ALIGN(opCreateEventSize)]; - Uint32 u_opSubEvent [PTR_ALIGN(opSubEventSize)]; - Uint32 u_opDropEvent [PTR_ALIGN(opDropEventSize)]; - Uint32 u_opSignalUtil [PTR_ALIGN(opSignalUtilSize)]; Uint32 u_opAlterIndex [PTR_ALIGN(opAlterIndexSize)]; Uint32 u_opBuildIndex [PTR_ALIGN(opBuildIndexSize)]; Uint32 u_opCreateTrigger[PTR_ALIGN(opCreateTriggerSize)]; @@ -1752,10 +1590,6 @@ private: KeyTable2<OpDropIndex, OpRecordUnion> c_opDropIndex; KeyTable2<OpAlterIndex, OpRecordUnion> c_opAlterIndex; KeyTable2<OpBuildIndex, OpRecordUnion> c_opBuildIndex; - KeyTable2<OpCreateEvent, OpRecordUnion> c_opCreateEvent; - KeyTable2<OpSubEvent, OpRecordUnion> c_opSubEvent; - KeyTable2<OpDropEvent, OpRecordUnion> c_opDropEvent; - KeyTable2<OpSignalUtil, OpRecordUnion> c_opSignalUtil; KeyTable2<OpCreateTrigger, OpRecordUnion> c_opCreateTrigger; KeyTable2<OpDropTrigger, OpRecordUnion> c_opDropTrigger; KeyTable2<OpAlterTrigger, OpRecordUnion> c_opAlterTrigger; @@ -2019,101 +1853,6 @@ private: void buildIndex_sendSlaveReq(Signal* signal, OpBuildIndexPtr opPtr); void buildIndex_sendReply(Signal* signal, OpBuildIndexPtr opPtr, bool); - // Events - void - createEventUTIL_PREPARE(Signal* signal, - Uint32 callbackData, - Uint32 returnCode); - void - createEventUTIL_EXECUTE(Signal *signal, - Uint32 callbackData, - Uint32 returnCode); - void - dropEventUTIL_PREPARE_READ(Signal* signal, - Uint32 callbackData, - Uint32 returnCode); - void - dropEventUTIL_EXECUTE_READ(Signal* signal, - Uint32 callbackData, - Uint32 returnCode); - void - dropEventUTIL_PREPARE_DELETE(Signal* signal, - Uint32 callbackData, - Uint32 returnCode); - void - dropEventUTIL_EXECUTE_DELETE(Signal *signal, - Uint32 callbackData, - Uint32 returnCode); - void - dropEventUtilPrepareRef(Signal* signal, - Uint32 callbackData, - Uint32 returnCode); - void - dropEventUtilExecuteRef(Signal* signal, - Uint32 callbackData, - Uint32 returnCode); - int - sendSignalUtilReq(Callback *c, - BlockReference ref, - GlobalSignalNumber gsn, - Signal* signal, - Uint32 length, - JobBufferLevel jbuf, - LinearSectionPtr ptr[3], - Uint32 noOfSections); - int - recvSignalUtilReq(Signal* signal, Uint32 returnCode); - - void completeSubStartReq(Signal* signal, Uint32 ptrI, Uint32 returnCode); - void completeSubStopReq(Signal* signal, Uint32 ptrI, Uint32 returnCode); - void completeSubRemoveReq(Signal* signal, Uint32 ptrI, Uint32 returnCode); - - void dropEvent_sendReply(Signal* signal, - OpDropEventPtr evntRecPtr); - - void createEvent_RT_USER_CREATE(Signal* signal, OpCreateEventPtr evntRecPtr); - void createEventComplete_RT_USER_CREATE(Signal* signal, - OpCreateEventPtr evntRecPtr); - void createEvent_RT_USER_GET(Signal* signal, OpCreateEventPtr evntRecPtr); - void createEventComplete_RT_USER_GET(Signal* signal, OpCreateEventPtr evntRecPtr); - - void createEvent_RT_DICT_AFTER_GET(Signal* signal, OpCreateEventPtr evntRecPtr); - - void createEvent_nodeFailCallback(Signal* signal, Uint32 eventRecPtrI, - Uint32 returnCode); - void createEvent_sendReply(Signal* signal, OpCreateEventPtr evntRecPtr, - LinearSectionPtr *ptr = NULL, int noLSP = 0); - - void prepareTransactionEventSysTable (Callback *c, - Signal* signal, - Uint32 senderData, - UtilPrepareReq::OperationTypeValue prepReq); - void prepareUtilTransaction(Callback *c, - Signal* signal, - Uint32 senderData, - Uint32 tableId, - const char *tableName, - UtilPrepareReq::OperationTypeValue prepReq, - Uint32 noAttr, - Uint32 attrIds[], - const char *attrNames[]); - - void executeTransEventSysTable(Callback *c, - Signal *signal, - const Uint32 ptrI, - sysTab_NDBEVENTS_0& m_eventRec, - const Uint32 prepareId, - UtilPrepareReq::OperationTypeValue prepReq); - void executeTransaction(Callback *c, - Signal* signal, - Uint32 senderData, - Uint32 prepareId, - Uint32 noAttr, - LinearSectionPtr headerPtr, - LinearSectionPtr dataPtr); - - void parseReadEventSys(Signal *signal, sysTab_NDBEVENTS_0& m_eventRec); - // create trigger void createTrigger_recvReply(Signal* signal, const CreateTrigConf* conf, const CreateTrigRef* ref); diff --git a/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp b/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp index c8a33715b9c..b9ebbaf0f76 100644 --- a/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp +++ b/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp @@ -2018,9 +2018,6 @@ void Dbdih::execINCL_NODECONF(Signal* signal) signal->theData[0] = reference(); signal->theData[1] = c_nodeStartSlave.nodeId; sendSignal(BACKUP_REF, GSN_INCL_NODEREQ, signal, 2, JBB); - - // Suma will not send response to this for now, later... - sendSignal(SUMA_REF, GSN_INCL_NODEREQ, signal, 2, JBB); return; }//if if (TstartNode_or_blockref == numberToRef(BACKUP, getOwnNodeId())){ @@ -8019,12 +8016,6 @@ void Dbdih::writingCopyGciLab(Signal* signal, FileRecordPtr filePtr) if (reason == CopyGCIReq::GLOBAL_CHECKPOINT) { jam(); cgcpParticipantState = GCP_PARTICIPANT_READY; - - SubGcpCompleteRep * const rep = (SubGcpCompleteRep*)signal->getDataPtr(); - rep->gci = coldgcp; - rep->senderData = 0; - sendSignal(SUMA_REF, GSN_SUB_GCP_COMPLETE_REP, signal, - SubGcpCompleteRep::SignalLength, JBB); } jam(); diff --git a/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp b/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp index 176bab0d4bf..14efa8cd784 100644 --- a/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp +++ b/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp @@ -1461,9 +1461,6 @@ void Ndbcntr::execNODE_FAILREP(Signal* signal) sendSignal(BACKUP_REF, GSN_NODE_FAILREP, signal, NodeFailRep::SignalLength, JBB); - sendSignal(SUMA_REF, GSN_NODE_FAILREP, signal, - NodeFailRep::SignalLength, JBB); - if (c_stopRec.stopReq.senderRef) { jam(); diff --git a/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp b/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp index b9bf522f7c8..3cb07dae924 100644 --- a/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp +++ b/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp @@ -2340,7 +2340,6 @@ void Qmgr::sendApiFailReq(Signal* signal, Uint16 failedNodeNo) failedNodePtr.p->failState = WAITING_FOR_FAILCONF1; sendSignal(DBTC_REF, GSN_API_FAILREQ, signal, 2, JBA); sendSignal(DBDICT_REF, GSN_API_FAILREQ, signal, 2, JBA); - sendSignal(SUMA_REF, GSN_API_FAILREQ, signal, 2, JBA); /**------------------------------------------------------------------------- * THE OTHER NODE WAS AN API NODE. THE COMMUNICATION LINK IS ALREADY diff --git a/ndb/src/kernel/blocks/suma/Suma.cpp b/ndb/src/kernel/blocks/suma/Suma.cpp index 3644bc0a03f..449436331e4 100644 --- a/ndb/src/kernel/blocks/suma/Suma.cpp +++ b/ndb/src/kernel/blocks/suma/Suma.cpp @@ -82,44 +82,6 @@ static const Uint32 SUMA_SEQUENCE = 0xBABEBABE; #define PRINT_ONLY 0 static Uint32 g_TypeOfStart = NodeState::ST_ILLEGAL_TYPE; -void -Suma::getNodeGroupMembers(Signal* signal) { - jam(); - /** - * Ask DIH for nodeGroupMembers - */ - CheckNodeGroups * sd = (CheckNodeGroups*)signal->getDataPtrSend(); - sd->blockRef = reference(); - sd->requestType = - CheckNodeGroups::Direct | - CheckNodeGroups::GetNodeGroupMembers; - sd->nodeId = getOwnNodeId(); - EXECUTE_DIRECT(DBDIH, GSN_CHECKNODEGROUPSREQ, signal, - CheckNodeGroups::SignalLength); - jamEntry(); - - c_nodeGroup = sd->output; - c_noNodesInGroup = 0; - for (int i = 0; i < MAX_NDB_NODES; i++) { - if (sd->mask.get(i)) { - if (i == getOwnNodeId()) c_idInNodeGroup = c_noNodesInGroup; - c_nodesInGroup[c_noNodesInGroup] = i; - c_noNodesInGroup++; - } - } - - // ndbout_c("c_noNodesInGroup=%d", c_noNodesInGroup); - ndbrequire(c_noNodesInGroup > 0); // at least 1 node in the nodegroup - -#ifdef NODEFAIL_DEBUG - for (Uint32 i = 0; i < c_noNodesInGroup; i++) { - ndbout_c ("Suma: NodeGroup %u, me %u, me in group %u, member[%u] %u", - c_nodeGroup, getOwnNodeId(), c_idInNodeGroup, - i, c_nodesInGroup[i]); - } -#endif -} - void Suma::execREAD_CONFIG_REQ(Signal* signal) { @@ -188,11 +150,6 @@ Suma::execSTTOR(Signal* signal) { DBUG_PRINT("info",("startphase = %u, typeOfStart = %u", startphase, typeOfStart)); - if(startphase == 1){ - jam(); - c_restartLock = true; - } - if(startphase == 3){ jam(); g_TypeOfStart = typeOfStart; @@ -224,32 +181,7 @@ Suma::execSTTOR(Signal* signal) { DBUG_VOID_RETURN; } - if(startphase == 5) { - getNodeGroupMembers(signal); - if (g_TypeOfStart == NodeState::ST_NODE_RESTART) { - jam(); - for (Uint32 i = 0; i < c_noNodesInGroup; i++) { - Uint32 ref = calcSumaBlockRef(c_nodesInGroup[i]); - if (ref != reference()) - sendSignal(ref, GSN_SUMA_START_ME, signal, - 1 /*SumaStartMe::SignalLength*/, JBB); - } - } - } - if(startphase == 7) { - c_restartLock = false; // may be set false earlier with HANDOVER_REQ - - if (g_TypeOfStart != NodeState::ST_NODE_RESTART) { - for( int i = 0; i < NO_OF_BUCKETS; i++) { - if (getResponsibleSumaNodeId(i) == refToNode(reference())) { - // I'm running this bucket - DBUG_PRINT("info",("bucket %u set to true", i)); - c_buckets[i].active = true; - } - } - } - if(g_TypeOfStart == NodeState::ST_INITIAL_START && c_masterNodeId == getOwnNodeId()) { jam(); @@ -364,282 +296,6 @@ SumaParticipant::execCONTINUEB(Signal* signal) * *****************************************************************************/ -void Suma::execAPI_FAILREQ(Signal* signal) -{ - jamEntry(); - DBUG_ENTER("Suma::execAPI_FAILREQ"); - Uint32 failedApiNode = signal->theData[0]; - //BlockReference retRef = signal->theData[1]; - - c_failedApiNodes.set(failedApiNode); - bool found = removeSubscribersOnNode(signal, failedApiNode); - - if(!found){ - jam(); - c_failedApiNodes.clear(failedApiNode); - } - DBUG_VOID_RETURN; -}//execAPI_FAILREQ() - -bool -SumaParticipant::removeSubscribersOnNode(Signal *signal, Uint32 nodeId) -{ - DBUG_ENTER("SumaParticipant::removeSubscribersOnNode"); - bool found = false; - - SubscriberPtr i_subbPtr; - c_dataSubscribers.first(i_subbPtr); - while(!i_subbPtr.isNull()){ - SubscriberPtr subbPtr = i_subbPtr; - c_dataSubscribers.next(i_subbPtr); - jam(); - if (refToNode(subbPtr.p->m_subscriberRef) == nodeId) { - jam(); - c_dataSubscribers.remove(subbPtr); - c_removeDataSubscribers.add(subbPtr); - found = true; - } - } - if(found){ - jam(); - sendSubStopReq(signal); - } - DBUG_RETURN(found); -} - -void -SumaParticipant::sendSubStopReq(Signal *signal, bool unlock){ - DBUG_ENTER("SumaParticipant::sendSubStopReq"); - static bool remove_lock = false; - jam(); - - SubscriberPtr subbPtr; - c_removeDataSubscribers.first(subbPtr); - if (subbPtr.isNull()){ - jam(); -#if 0 - signal->theData[0] = failedApiNode; - signal->theData[1] = reference(); - sendSignal(retRef, GSN_API_FAILCONF, signal, 2, JBB); -#endif - c_failedApiNodes.clear(); - - remove_lock = false; - DBUG_VOID_RETURN; - } - - if(remove_lock && !unlock) { - jam(); - DBUG_VOID_RETURN; - } - remove_lock = true; - - SubscriptionPtr subPtr; - c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI); - - SubStopReq * const req = (SubStopReq*)signal->getDataPtrSend(); - req->senderRef = reference(); - req->senderData = subbPtr.i; - req->subscriberRef = subbPtr.p->m_subscriberRef; - req->subscriberData = subbPtr.p->m_subscriberData; - req->subscriptionId = subPtr.p->m_subscriptionId; - req->subscriptionKey = subPtr.p->m_subscriptionKey; - req->part = SubscriptionData::TableData; - - sendSignal(SUMA_REF, GSN_SUB_STOP_REQ, signal, SubStopReq::SignalLength, JBB); - DBUG_VOID_RETURN; -} - -void -SumaParticipant::execSUB_STOP_CONF(Signal* signal){ - jamEntry(); - DBUG_ENTER("SumaParticipant::execSUB_STOP_CONF"); - - SubStopConf * const conf = (SubStopConf*)signal->getDataPtr(); - - // Uint32 subscriberData = conf->subscriberData; - // Uint32 subscriberRef = conf->subscriberRef; - - Subscription key; - key.m_subscriptionId = conf->subscriptionId; - key.m_subscriptionKey = conf->subscriptionKey; - - SubscriptionPtr subPtr; - if(c_subscriptions.find(subPtr, key)) { - jam(); - if (subPtr.p->m_markRemove) { - jam(); - ndbrequire(false); - ndbrequire(subPtr.p->m_nSubscribers > 0); - subPtr.p->m_nSubscribers--; - if (subPtr.p->m_nSubscribers == 0){ - jam(); - completeSubRemoveReq(signal, subPtr); - } - } - } - - sendSubStopReq(signal,true); - DBUG_VOID_RETURN; -} - -void -SumaParticipant::execSUB_STOP_REF(Signal* signal){ - jamEntry(); - DBUG_ENTER("SumaParticipant::execSUB_STOP_REF"); - - SubStopRef * const ref = (SubStopRef*)signal->getDataPtr(); - - Uint32 subscriptionId = ref->subscriptionId; - Uint32 subscriptionKey = ref->subscriptionKey; - Uint32 part = ref->part; - Uint32 subscriberData = ref->subscriberData; - Uint32 subscriberRef = ref->subscriberRef; - // Uint32 err = ref->err; - - if(!ref->isTemporary()){ - ndbrequire(false); - } - - SubStopReq * const req = (SubStopReq*)signal->getDataPtrSend(); - req->subscriberRef = subscriberRef; - req->subscriberData = subscriberData; - req->subscriptionId = subscriptionId; - req->subscriptionKey = subscriptionKey; - req->part = part; - - sendSignal(SUMA_REF, GSN_SUB_STOP_REQ, signal, SubStopReq::SignalLength, JBB); - - DBUG_VOID_RETURN; -} - -void -Suma::execNODE_FAILREP(Signal* signal){ - jamEntry(); - DBUG_ENTER("Suma::execNODE_FAILREP"); - - NodeFailRep * const rep = (NodeFailRep*)signal->getDataPtr(); - - bool changed = false; - - NodePtr nodePtr; -#ifdef NODEFAIL_DEBUG - ndbout_c("Suma: nodefailrep"); -#endif - c_nodeFailGCI = getFirstGCI(signal); - - for(c_nodes.first(nodePtr); nodePtr.i != RNIL; c_nodes.next(nodePtr)){ - if(NodeBitmask::get(rep->theNodes, nodePtr.p->nodeId)){ - if(nodePtr.p->alive){ - ndbassert(c_aliveNodes.get(nodePtr.p->nodeId)); - changed = true; - jam(); - } else { - ndbassert(!c_aliveNodes.get(nodePtr.p->nodeId)); - jam(); - } - - if (c_preparingNodes.get(nodePtr.p->nodeId)) { - jam(); - // we are currently preparing this node that died - // it's ok just to clear and go back to waiting for it to start up - Restart.resetNode(calcSumaBlockRef(nodePtr.p->nodeId)); - c_preparingNodes.clear(nodePtr.p->nodeId); - } else if (c_handoverToDo) { - jam(); - // TODO what if I'm a SUMA that is currently restarting and the SUMA - // responsible for restarting me is the one that died? - - // a node has failed whilst handover is going on - // let's check if we're in the process of handover with that node - c_handoverToDo = false; - for( int i = 0; i < NO_OF_BUCKETS; i++) { - if (c_buckets[i].handover) { - // I'm doing handover, but is it with the dead node? - if (getResponsibleSumaNodeId(i) == nodePtr.p->nodeId) { - // so it was the dead node, has handover started? - if (c_buckets[i].handover_started) { - jam(); - // we're not ok and will have lost data! - // set not active to indicate this - - // this will generate takeover behaviour - c_buckets[i].active = false; - c_buckets[i].handover_started = false; - } // else we're ok to revert back to state before - c_buckets[i].handover = false; - } else { - jam(); - // ok, we're doing handover with a different node - c_handoverToDo = true; - } - } - } - } - - c_failoverBuffer.nodeFailRep(); - - nodePtr.p->alive = 0; - c_aliveNodes.clear(nodePtr.p->nodeId); // this has to be done after the loop above - } - } - DBUG_VOID_RETURN; -} - -void -Suma::execINCL_NODEREQ(Signal* signal){ - jamEntry(); - - //const Uint32 senderRef = signal->theData[0]; - const Uint32 inclNode = signal->theData[1]; - - NodePtr node; - for(c_nodes.first(node); node.i != RNIL; c_nodes.next(node)){ - jam(); - const Uint32 nodeId = node.p->nodeId; - if(inclNode == nodeId){ - jam(); - - ndbrequire(node.p->alive == 0); - ndbrequire(!c_aliveNodes.get(nodeId)); - - for (Uint32 j = 0; j < c_noNodesInGroup; j++) { - jam(); - if (c_nodesInGroup[j] == nodeId) { - // the starting node is part of my node group - jam(); - c_preparingNodes.set(nodeId); // set as being prepared - for (Uint32 i = 0; i < c_noNodesInGroup; i++) { - jam(); - if (i == c_idInNodeGroup) { - jam(); - // I'm responsible for restarting this SUMA - // ALL dict's should have meta data info so it is ok to start - Restart.startNode(signal, calcSumaBlockRef(nodeId)); - break; - }//if - if (c_aliveNodes.get(c_nodesInGroup[i])) { - jam(); - break; // another Suma takes care of this - }//if - }//for - break; - }//if - }//for - - node.p->alive = 1; - c_aliveNodes.set(nodeId); - - break; - }//if - }//for - -#if 0 // if we include this DIH's got to be prepared, later if needed... - signal->theData[0] = reference(); - - sendSignal(senderRef, GSN_INCL_NODECONF, signal, 1, JBB); -#endif -} - void Suma::execSIGNAL_DROPPED_REP(Signal* signal){ jamEntry(); @@ -685,10 +341,6 @@ Suma::execDUMP_STATE_ORD(Signal* signal){ syncPtr.p->startScan(signal); } - if(tCase == 8002){ - syncPtr.p->startTrigger(signal); - } - if(tCase == 8003){ subPtr.p->m_subscriptionType = SubCreateReq::SingleTableScan; LocalDataBuffer<15> attrs(c_dataBufferPool, syncPtr.p->m_attributeList); @@ -1154,26 +806,6 @@ SumaParticipant::sendSubCreateRef(Signal* signal, const SubCreateReq& req, Uint3 return; } - - - - - - - - - - - -Uint32 -SumaParticipant::getFirstGCI(Signal* signal) { - if (c_lastCompleteGCI == RNIL) { - ndbout_c("WARNING: c_lastCompleteGCI == RNIL"); - return 0; - } - return c_lastCompleteGCI+3; -} - /********************************************************** * * Setting upp trigger for subscription @@ -1219,27 +851,6 @@ SumaParticipant::execSUB_SYNC_REQ(Signal* signal) { case SubscriptionData::MetaData: ok = true; jam(); - if (subPtr.p->m_subscriptionType == SubCreateReq::DatabaseSnapshot) { - TableList::DataBufferIterator it; - syncPtr.p->m_tableList.first(it); - if(it.isNull()) { - /** - * Get all tables from dict - */ - ListTablesReq * req = (ListTablesReq*)signal->getDataPtrSend(); - req->senderRef = reference(); - req->senderData = syncPtr.i; - req->requestData = 0; - /** - * @todo: accomodate scan of index tables? - */ - req->setTableType(DictTabInfo::UserTable); - - sendSignal(DBDICT_REF, GSN_LIST_TABLES_REQ, signal, - ListTablesReq::SignalLength, JBB); - break; - } - } syncPtr.p->startMeta(signal); break; @@ -1274,16 +885,6 @@ SumaParticipant::sendSubSyncRef(Signal* signal, Uint32 errCode){ */ void -SumaParticipant::execLIST_TABLES_CONF(Signal* signal){ - jamEntry(); - CRASH_INSERTION(13005); - ListTablesConf* const conf = (ListTablesConf*)signal->getDataPtr(); - SyncRecord* tmp = c_syncPool.getPtr(conf->senderData); - tmp->runLIST_TABLES_CONF(signal); -} - - -void SumaParticipant::execGET_TABINFOREF(Signal* signal){ jamEntry(); GetTabInfoRef* const ref = (GetTabInfoRef*)signal->getDataPtr(); @@ -1491,113 +1092,12 @@ SumaParticipant::execDIGETPRIMCONF(Signal* signal){ tmp->runDIGETPRIMCONF(signal); } -void -SumaParticipant::execCREATE_TRIG_CONF(Signal* signal){ - jamEntry(); - DBUG_ENTER("SumaParticipant::execCREATE_TRIG_CONF"); - CRASH_INSERTION(13009); - - CreateTrigConf * const conf = (CreateTrigConf*)signal->getDataPtr(); - - const Uint32 senderData = conf->getConnectionPtr(); - SyncRecord* tmp = c_syncPool.getPtr(senderData); - tmp->runCREATE_TRIG_CONF(signal); - - /** - * dodido - * @todo: I (Johan) dont know what to do here. Jonas, what do you mean? - */ - DBUG_VOID_RETURN; -} - -void -SumaParticipant::execCREATE_TRIG_REF(Signal* signal){ - jamEntry(); - ndbrequire(false); -} - -void -SumaParticipant::execDROP_TRIG_CONF(Signal* signal){ - jamEntry(); - DBUG_ENTER("SumaParticipant::execDROP_TRIG_CONF"); - CRASH_INSERTION(13010); - - DropTrigConf * const conf = (DropTrigConf*)signal->getDataPtr(); - - const Uint32 senderData = conf->getConnectionPtr(); - SyncRecord* tmp = c_syncPool.getPtr(senderData); - tmp->runDROP_TRIG_CONF(signal); - DBUG_VOID_RETURN; -} - -void -SumaParticipant::execDROP_TRIG_REF(Signal* signal){ - jamEntry(); - DBUG_ENTER("SumaParticipant::execDROP_TRIG_CONF"); - DropTrigRef * const ref = (DropTrigRef*)signal->getDataPtr(); - - const Uint32 senderData = ref->getConnectionPtr(); - SyncRecord* tmp = c_syncPool.getPtr(senderData); - tmp->runDROP_TRIG_CONF(signal); - DBUG_VOID_RETURN; -} - /************************************************************************* * * */ void -SumaParticipant::SyncRecord::runLIST_TABLES_CONF(Signal* signal){ - jam(); - - ListTablesConf * const conf = (ListTablesConf*)signal->getDataPtr(); - const Uint32 len = signal->length() - ListTablesConf::HeaderLength; - - SubscriptionPtr subPtr; - suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); - - for (unsigned i = 0; i < len; i++) { - subPtr.p->m_maxTables++; - suma.addTableId(ListTablesConf::getTableId(conf->tableData[i]), subPtr, this); - } - - // for (unsigned i = 0; i < len; i++) - // conf->tableData[i] = ListTablesConf::getTableId(conf->tableData[i]); - // m_tableList.append(&conf->tableData[0], len); - -#if 0 - TableList::DataBufferIterator it; - int i = 0; - for(m_tableList.first(it);!it.isNull();m_tableList.next(it)) { - ndbout_c("%u listtableconf tableid %d", i++, *it.data); - } -#endif - - if(len == ListTablesConf::DataLength){ - jam(); - // we expect more LIST_TABLE_CONF - return; - } - -#if 0 - subPtr.p->m_currentTable = 0; - subPtr.p->m_maxTables = 0; - - TableList::DataBufferIterator it; - for(m_tableList.first(it); !it.isNull(); m_tableList.next(it)) { - subPtr.p->m_maxTables++; - suma.addTableId(*it.data, subPtr, NULL); -#ifdef NODEFAIL_DEBUG - ndbout_c(" listtableconf tableid %d",*it.data); -#endif - } -#endif - - startMeta(signal); -} - -void SumaParticipant::SyncRecord::startMeta(Signal* signal){ jam(); m_currentTable = 0; @@ -1696,18 +1196,6 @@ SumaParticipant::SyncRecord::runGET_TABINFO_CONF(Signal* signal){ SegmentedSectionPtr ptr; signal->getSection(ptr, GetTabInfoConf::DICT_TAB_INFO); - SubMetaData * data = (SubMetaData*)signal->getDataPtrSend(); - /** - * sending lastCompleteGCI. Used by Lars in interval calculations - * incremenet by one, since last_CompleteGCI is the not the current gci. - */ - data->gci = suma.c_lastCompleteGCI + 1; - data->tableId = tableId; - data->senderData = subPtr.p->m_subscriberData; -#if PRINT_ONLY - ndbout_c("GSN_SUB_META_DATA Table %d", tableId); -#else - bool okToSend = m_doSendSyncData; /* @@ -1737,7 +1225,6 @@ SumaParticipant::SyncRecord::runGET_TABINFO_CONF(Signal* signal){ SubMetaData::SignalLength, JBB); } } -#endif TablePtr tabPtr; ndbrequire(suma.c_tables.find(tabPtr, tableId)); @@ -2113,513 +1600,6 @@ SumaParticipant::execSCAN_HBREP(Signal* signal){ } /********************************************************** - * - * Suma participant interface - * - * Creation of subscriber - * - */ - -void -SumaParticipant::execSUB_START_REQ(Signal* signal){ - jamEntry(); - DBUG_ENTER("SumaParticipant::execSUB_START_REQ"); - - CRASH_INSERTION(13013); - - if (c_restartLock) { - jam(); - // ndbout_c("c_restartLock"); - if (RtoI(signal->getSendersBlockRef(), false) == RNIL) { - jam(); - sendSubStartRef(signal, /** Error Code */ 0, true); - DBUG_VOID_RETURN; - } - // only allow other Suma's in the nodegroup to come through for restart purposes - } - - Subscription key; - - SubStartReq * const req = (SubStartReq*)signal->getDataPtr(); - - Uint32 senderRef = req->senderRef; - Uint32 senderData = req->senderData; - Uint32 subscriberData = req->subscriberData; - Uint32 subscriberRef = req->subscriberRef; - SubscriptionData::Part part = (SubscriptionData::Part)req->part; - key.m_subscriptionId = req->subscriptionId; - key.m_subscriptionKey = req->subscriptionKey; - - SubscriptionPtr subPtr; - if(!c_subscriptions.find(subPtr, key)){ - jam(); - sendSubStartRef(signal, /** Error Code */ 0); - DBUG_VOID_RETURN; - } - - Ptr<SyncRecord> syncPtr; - c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI); - if (syncPtr.p->m_locked) { - jam(); -#if 0 - ndbout_c("Locked"); -#endif - sendSubStartRef(signal, /** Error Code */ 0, true); - DBUG_VOID_RETURN; - } - syncPtr.p->m_locked = true; - - SubscriberPtr subbPtr; - if(!c_subscriberPool.seize(subbPtr)){ - jam(); - syncPtr.p->m_locked = false; - sendSubStartRef(signal, /** Error Code */ 0); - DBUG_VOID_RETURN; - } - - Uint32 type = subPtr.p->m_subscriptionType; - - subbPtr.p->m_senderRef = senderRef; - subbPtr.p->m_senderData = senderData; - - switch (type) { - case SubCreateReq::TableEvent: - jam(); - // we want the data to return to the API not DICT - subbPtr.p->m_subscriberRef = subscriberRef; - // ndbout_c("start ref = %u", signal->getSendersBlockRef()); - // ndbout_c("ref = %u", subbPtr.p->m_subscriberRef); - // we use the subscription id for now, should really be API choice - subbPtr.p->m_subscriberData = subscriberData; - -#if 0 - if (RtoI(signal->getSendersBlockRef(), false) == RNIL) { - jam(); - for (Uint32 i = 0; i < c_noNodesInGroup; i++) { - Uint32 ref = calcSumaBlockRef(c_nodesInGroup[i]); - if (ref != reference()) { - jam(); - sendSubStartReq(subPtr, subbPtr, signal, ref); - } else - jam(); - } - } -#endif - break; - case SubCreateReq::DatabaseSnapshot: - case SubCreateReq::SelectiveTableSnapshot: - jam(); - ndbrequire(false); - //subbPtr.p->m_subscriberRef = GREP_REF; - subbPtr.p->m_subscriberData = subPtr.p->m_subscriberData; - break; - case SubCreateReq::SingleTableScan: - jam(); - subbPtr.p->m_subscriberRef = subPtr.p->m_subscriberRef; - subbPtr.p->m_subscriberData = subPtr.p->m_subscriberData; - } - - subbPtr.p->m_subPtrI = subPtr.i; - subbPtr.p->m_firstGCI = RNIL; - if (type == SubCreateReq::TableEvent) - subbPtr.p->m_lastGCI = 0; - else - subbPtr.p->m_lastGCI = RNIL; // disable usage of m_lastGCI - bool ok = false; - - switch(part){ - case SubscriptionData::MetaData: - ok = true; - jam(); - c_metaSubscribers.add(subbPtr); - sendSubStartComplete(signal, subbPtr, 0, part); - break; - case SubscriptionData::TableData: - ok = true; - jam(); - c_prepDataSubscribers.add(subbPtr); - syncPtr.p->startTrigger(signal); - break; - } - ndbrequire(ok); - DBUG_VOID_RETURN; -} - -void -SumaParticipant::sendSubStartComplete(Signal* signal, - SubscriberPtr subbPtr, - Uint32 firstGCI, - SubscriptionData::Part part){ - jam(); - - SubscriptionPtr subPtr; - c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI); - - Ptr<SyncRecord> syncPtr; - c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI); - syncPtr.p->m_locked = false; - - SubStartConf * const conf = (SubStartConf*)signal->getDataPtrSend(); - - conf->senderRef = reference(); - conf->senderData = subbPtr.p->m_senderData; - conf->subscriptionId = subPtr.p->m_subscriptionId; - conf->subscriptionKey = subPtr.p->m_subscriptionKey; - conf->firstGCI = firstGCI; - conf->part = (Uint32) part; - - conf->subscriberData = subPtr.p->m_subscriberData; - sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_START_CONF, signal, - SubStartConf::SignalLength, JBB); -} - -#if 0 -void -SumaParticipant::sendSubStartRef(SubscriptionPtr subPtr, - Signal* signal, Uint32 errCode, - bool temporary){ - jam(); - SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend(); - xxx ref->senderRef = reference(); - xxx ref->senderData = subPtr.p->m_senderData; - ref->subscriptionId = subPtr.p->m_subscriptionId; - ref->subscriptionKey = subPtr.p->m_subscriptionKey; - ref->part = (Uint32) subPtr.p->m_subscriptionType; - ref->subscriberData = subPtr.p->m_subscriberData; - ref->err = errCode; - if (temporary) { - jam(); - ref->setTemporary(); - } - releaseSections(signal); - sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_START_REF, signal, - SubStartRef::SignalLength, JBB); -} -#endif -void -SumaParticipant::sendSubStartRef(Signal* signal, Uint32 errCode, - bool temporary){ - jam(); - SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend(); - ref->senderRef = reference(); - ref->err = errCode; - if (temporary) { - jam(); - ref->setTemporary(); - } - releaseSections(signal); - sendSignal(signal->getSendersBlockRef(), GSN_SUB_START_REF, signal, - SubStartRef::SignalLength, JBB); -} - -/********************************************************** - * - * Trigger admin interface - * - */ - -void -SumaParticipant::SyncRecord::startTrigger(Signal* signal){ - jam(); - m_currentTable = 0; - m_latestTriggerId = RNIL; - nextTrigger(signal); -} - -void -SumaParticipant::SyncRecord::nextTrigger(Signal* signal){ - jam(); - - TableList::DataBufferIterator it; - - if(!m_tableList.position(it, m_currentTable)){ - completeTrigger(signal); - return; - } - - SubscriptionPtr subPtr; - suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); - ndbrequire(subPtr.p->m_syncPtrI == ptrI); - const Uint32 RT_BREAK = 48; - Uint32 latestTriggerId = 0; - for(Uint32 i = 0; i<RT_BREAK && !it.isNull(); i++, m_tableList.next(it)){ - TablePtr tabPtr; -#if 0 - ndbout_c("nextTrigger tableid %u", *it.data); -#endif - ndbrequire(suma.c_tables.find(tabPtr, *it.data)); - - AttributeMask attrMask; - createAttributeMask(attrMask, tabPtr.p); - - for(Uint32 j = 0; j<3; j++){ - i++; - latestTriggerId = (tabPtr.p->m_schemaVersion << 18) | - (j << 16) | tabPtr.p->m_tableId; - if(tabPtr.p->m_hasTriggerDefined[j] == 0) { - ndbrequire(tabPtr.p->m_triggerIds[j] == ILLEGAL_TRIGGER_ID); -#if 0 - ndbout_c("DEFINING trigger on table %u[%u]", tabPtr.p->m_tableId, j); -#endif - CreateTrigReq * const req = (CreateTrigReq*)signal->getDataPtrSend(); - req->setUserRef(SUMA_REF); - req->setConnectionPtr(ptrI); - req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE); - req->setTriggerActionTime(TriggerActionTime::TA_DETACHED); - req->setMonitorReplicas(true); - req->setMonitorAllAttributes(false); - req->setReceiverRef(SUMA_REF); - req->setTriggerId(latestTriggerId); - req->setTriggerEvent((TriggerEvent::Value)j); - req->setTableId(tabPtr.p->m_tableId); - req->setAttributeMask(attrMask); - suma.sendSignal(DBTUP_REF, GSN_CREATE_TRIG_REQ, - signal, CreateTrigReq::SignalLength, JBB); - - } else { - /** - * Faking that a trigger has been created in order to - * simulate the proper behaviour. - * Perhaps this should be a dummy signal instead of - * (ab)using CREATE_TRIG_CONF. - */ - CreateTrigConf * conf = (CreateTrigConf*)signal->getDataPtrSend(); - conf->setConnectionPtr(ptrI); - conf->setTableId(tabPtr.p->m_tableId); - conf->setTriggerId(latestTriggerId); - suma.sendSignal(SUMA_REF,GSN_CREATE_TRIG_CONF, - signal, CreateTrigConf::SignalLength, JBB); - - } - - } - m_currentTable++; - } - m_latestTriggerId = latestTriggerId; -} - -void -SumaParticipant::SyncRecord::createAttributeMask(AttributeMask& mask, - Table * table){ - jam(); - mask.clear(); - DataBuffer<15>::DataBufferIterator it; - LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, table->m_attributes); - for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it)){ - mask.set(* it.data); - } -} - -void -SumaParticipant::SyncRecord::runCREATE_TRIG_CONF(Signal* signal){ - jam(); - - CreateTrigConf * const conf = (CreateTrigConf*)signal->getDataPtr(); - const Uint32 triggerId = conf->getTriggerId(); - Uint32 type = (triggerId >> 16) & 0x3; - Uint32 tableId = conf->getTableId(); - - TablePtr tabPtr; - ndbrequire(suma.c_tables.find(tabPtr, tableId)); - - ndbrequire(type < 3); - tabPtr.p->m_triggerIds[type] = triggerId; - tabPtr.p->m_hasTriggerDefined[type]++; - - if(triggerId == m_latestTriggerId){ - jam(); - nextTrigger(signal); - } -} - -void -SumaParticipant::SyncRecord::completeTrigger(Signal* signal){ - jam(); - SubscriptionPtr subPtr; - CRASH_INSERTION(13013); -#ifdef EVENT_PH3_DEBUG - ndbout_c("SumaParticipant: trigger completed"); -#endif - Uint32 gci; - suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); - ndbrequire(subPtr.p->m_syncPtrI == ptrI); - - SubscriberPtr subbPtr; - { - bool found = false; - - for(suma.c_prepDataSubscribers.first(subbPtr); - !subbPtr.isNull(); suma.c_prepDataSubscribers.next(subbPtr)) { - jam(); - if(subbPtr.p->m_subPtrI == subPtr.i) { - jam(); - found = true; - break; - } - } - ndbrequire(found); - gci = suma.getFirstGCI(signal); - subbPtr.p->m_firstGCI = gci; - suma.c_prepDataSubscribers.remove(subbPtr); - suma.c_dataSubscribers.add(subbPtr); - } - suma.sendSubStartComplete(signal, subbPtr, gci, SubscriptionData::TableData); -} - -void -SumaParticipant::SyncRecord::startDropTrigger(Signal* signal){ - jam(); - m_currentTable = 0; - m_latestTriggerId = RNIL; - nextDropTrigger(signal); -} - -void -SumaParticipant::SyncRecord::nextDropTrigger(Signal* signal){ - jam(); - - TableList::DataBufferIterator it; - - if(!m_tableList.position(it, m_currentTable)){ - completeDropTrigger(signal); - return; - } - - SubscriptionPtr subPtr; - suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); - ndbrequire(subPtr.p->m_syncPtrI == ptrI); - - const Uint32 RT_BREAK = 48; - Uint32 latestTriggerId = 0; - for(Uint32 i = 0; i<RT_BREAK && !it.isNull(); i++, m_tableList.next(it)){ - jam(); - TablePtr tabPtr; -#if 0 - ndbout_c("nextDropTrigger tableid %u", *it.data); -#endif - ndbrequire(suma.c_tables.find(tabPtr, * it.data)); - - for(Uint32 j = 0; j<3; j++){ - jam(); - ndbrequire(tabPtr.p->m_triggerIds[j] != ILLEGAL_TRIGGER_ID); - i++; - latestTriggerId = tabPtr.p->m_triggerIds[j]; - if(tabPtr.p->m_hasTriggerDefined[j] == 1) { - jam(); - - DropTrigReq * const req = (DropTrigReq*)signal->getDataPtrSend(); - req->setConnectionPtr(ptrI); - req->setUserRef(SUMA_REF); // Sending to myself - req->setRequestType(DropTrigReq::RT_USER); - req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE); - req->setTriggerActionTime(TriggerActionTime::TA_DETACHED); - req->setIndexId(RNIL); - - req->setTableId(tabPtr.p->m_tableId); - req->setTriggerId(latestTriggerId); - req->setTriggerEvent((TriggerEvent::Value)j); - -#if 0 - ndbout_c("DROPPING trigger %u = %u %u %u on table %u[%u]", - latestTriggerId,TriggerType::SUBSCRIPTION_BEFORE, - TriggerActionTime::TA_DETACHED, j, tabPtr.p->m_tableId, j); -#endif - suma.sendSignal(DBTUP_REF, GSN_DROP_TRIG_REQ, - signal, DropTrigReq::SignalLength, JBB); - } else { - jam(); - ndbrequire(tabPtr.p->m_hasTriggerDefined[j] > 1); - /** - * Faking that a trigger has been dropped in order to - * simulate the proper behaviour. - * Perhaps this should be a dummy signal instead of - * (ab)using DROP_TRIG_CONF. - */ - DropTrigConf * conf = (DropTrigConf*)signal->getDataPtrSend(); - conf->setConnectionPtr(ptrI); - conf->setTableId(tabPtr.p->m_tableId); - conf->setTriggerId(latestTriggerId); - suma.sendSignal(SUMA_REF,GSN_DROP_TRIG_CONF, - signal, DropTrigConf::SignalLength, JBB); - } - } - m_currentTable++; - } - m_latestTriggerId = latestTriggerId; -} - -void -SumaParticipant::SyncRecord::runDROP_TRIG_REF(Signal* signal){ - jam(); - DropTrigRef * const ref = (DropTrigRef*)signal->getDataPtr(); - if (ref->getErrorCode() != DropTrigRef::TriggerNotFound){ - ndbrequire(false); - } - const Uint32 triggerId = ref->getTriggerId(); - Uint32 tableId = ref->getTableId(); - runDropTrig(signal, triggerId, tableId); -} - -void -SumaParticipant::SyncRecord::runDROP_TRIG_CONF(Signal* signal){ - jam(); - - DropTrigConf * const conf = (DropTrigConf*)signal->getDataPtr(); - const Uint32 triggerId = conf->getTriggerId(); - Uint32 tableId = conf->getTableId(); - runDropTrig(signal, triggerId, tableId); -} - -void -SumaParticipant::SyncRecord::runDropTrig(Signal* signal, - Uint32 triggerId, - Uint32 tableId){ - Uint32 type = (triggerId >> 16) & 0x3; - - TablePtr tabPtr; - ndbrequire(suma.c_tables.find(tabPtr, tableId)); - - ndbrequire(type < 3); - ndbrequire(tabPtr.p->m_triggerIds[type] == triggerId); - tabPtr.p->m_hasTriggerDefined[type]--; - if (tabPtr.p->m_hasTriggerDefined[type] == 0) { - jam(); - tabPtr.p->m_triggerIds[type] = ILLEGAL_TRIGGER_ID; - } - if(triggerId == m_latestTriggerId){ - jam(); - nextDropTrigger(signal); - } -} - -void -SumaParticipant::SyncRecord::completeDropTrigger(Signal* signal){ - jam(); - SubscriptionPtr subPtr; - CRASH_INSERTION(13014); -#if 0 - ndbout_c("trigger completed"); -#endif - - suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); - ndbrequire(subPtr.p->m_syncPtrI == ptrI); - - bool found = false; - SubscriberPtr subbPtr; - for(suma.c_prepDataSubscribers.first(subbPtr); - !subbPtr.isNull(); suma.c_prepDataSubscribers.next(subbPtr)) { - jam(); - if(subbPtr.p->m_subPtrI == subPtr.i) { - jam(); - found = true; - break; - } - } - ndbrequire(found); - suma.sendSubStopComplete(signal, subbPtr); -} - -/********************************************************** * Scan data interface * * Assumption: one execTRANSID_AI contains all attr info @@ -2712,710 +1692,6 @@ SumaParticipant::execTRANSID_AI(Signal* signal){ f_bufferLock = 0; } -/********************************************************** - * - * Trigger data interface - * - */ - -void -SumaParticipant::execTRIG_ATTRINFO(Signal* signal){ - jamEntry(); - - CRASH_INSERTION(13016); - TrigAttrInfo* const trg = (TrigAttrInfo*)signal->getDataPtr(); - const Uint32 trigId = trg->getTriggerId(); - - const Uint32 dataLen = signal->length() - TrigAttrInfo::StaticLength; - - if(trg->getAttrInfoType() == TrigAttrInfo::BEFORE_VALUES){ - jam(); - - ndbrequire(b_bufferLock == trigId); - - memcpy(b_buffer + b_trigBufferSize, trg->getData(), 4 * dataLen); - b_trigBufferSize += dataLen; - // printf("before values %u %u %u\n",trigId, dataLen, b_trigBufferSize); - } else { - jam(); - - if(f_bufferLock == 0){ - f_bufferLock = trigId; - f_trigBufferSize = 0; - b_bufferLock = trigId; - b_trigBufferSize = 0; - } else { - ndbrequire(f_bufferLock == trigId); - } - - memcpy(f_buffer + f_trigBufferSize, trg->getData(), 4 * dataLen); - f_trigBufferSize += dataLen; - } -} - -#ifdef NODEFAIL_DEBUG2 -static int theCounts[64] = {0}; -#endif - -Uint32 -Suma::getStoreBucket(Uint32 v) -{ - // id will contain id to responsible suma or - // RNIL if we don't have nodegroup info yet - - const Uint32 N = NO_OF_BUCKETS; - const Uint32 D = v % N; // Distibution key - return D; -} - -Uint32 -Suma::getResponsibleSumaNodeId(Uint32 D) -{ - // id will contain id to responsible suma or - // RNIL if we don't have nodegroup info yet - - Uint32 id; - - if (c_restartLock) { - jam(); - // ndbout_c("c_restartLock"); - id = RNIL; - } else { - jam(); - id = RNIL; - const Uint32 n = c_noNodesInGroup; // Number nodes in node group - const Uint32 C1 = D / n; - const Uint32 C2 = D - C1*n; // = D % n; - const Uint32 C = C2 + C1 % n; - for (Uint32 i = 0; i < n; i++) { - jam(); - id = c_nodesInGroup[(C + i) % n]; - if (c_aliveNodes.get(id) && - !c_preparingNodes.get(id)) { - jam(); - break; - }//if - } -#ifdef NODEFAIL_DEBUG2 - theCounts[id]++; - ndbout_c("Suma:responsible n=%u, D=%u, id = %u, count=%u", - n,D, id, theCounts[id]); -#endif - } - return id; -} - -Uint32 -SumaParticipant::decideWhoToSend(Uint32 nBucket, Uint32 gci){ - bool replicaFlag = true; - Uint32 nId = RNIL; - - // bucket active/not active set by GCP_COMPLETE - if (c_buckets[nBucket].active) { - if (c_buckets[nBucket].handover && c_buckets[nBucket].handoverGCI <= gci) { - jam(); - replicaFlag = true; // let the other node send this - nId = RNIL; - // mark this as started, if we get a node failiure now we have some lost stuff - c_buckets[nBucket].handover_started = true; - } else { - jam(); - replicaFlag = false; - nId = refToNode(reference()); - } - } else { - nId = getResponsibleSumaNodeId(nBucket); - replicaFlag = !(nId == refToNode(reference())); - - if (!replicaFlag) { - if (!c_buckets[nBucket].handover) { - jam(); - // appearently a node has failed and we are taking over sending - // from that bucket. Now we need to go back to latest completed - // GCI. Handling will depend on Subscriber and Subscription - - // TODO, for now we make an easy takeover - if (gci < c_nodeFailGCI) - c_lastInconsistentGCI = gci; - - // we now have responsability for this bucket and we're actively - // sending from that - c_buckets[nBucket].active = true; -#ifdef HANDOVER_DEBUG - ndbout_c("Takeover Bucket %u", nBucket); -#endif - } else if (c_buckets[nBucket].handoverGCI > gci) { - jam(); - replicaFlag = true; // handover going on, but don't start sending yet - nId = RNIL; - } else { - jam(); -#ifdef HANDOVER_DEBUG - ndbout_c("Possible error: Will send from GCI = %u", gci); -#endif - } - } - } - -#ifdef NODEFAIL_DEBUG2 - ndbout_c("Suma:bucket %u, responsible id = %u, replicaFlag = %u", - nBucket, nId, (Uint32)replicaFlag); -#endif - return replicaFlag; -} - -void -SumaParticipant::execFIRE_TRIG_ORD(Signal* signal){ - jamEntry(); - DBUG_ENTER("SumaParticipant::execFIRE_TRIG_ORD"); - CRASH_INSERTION(13016); - FireTrigOrd* const trg = (FireTrigOrd*)signal->getDataPtr(); - const Uint32 trigId = trg->getTriggerId(); - const Uint32 hashValue = trg->getHashValue(); - const Uint32 gci = trg->getGCI(); - const Uint32 event = trg->getTriggerEvent(); - const Uint32 triggerId = trg->getTriggerId(); - Uint32 tableId = triggerId & 0xFFFF; - - ndbrequire(f_bufferLock == trigId); - -#ifdef EVENT_DEBUG2 - ndbout_c("SumaParticipant::execFIRE_TRIG_ORD"); -#endif - - Uint32 sz = trg->getNoOfPrimaryKeyWords()+trg->getNoOfAfterValueWords(); - ndbrequire(sz == f_trigBufferSize); - - /** - * Reformat as "all headers" + "all data" - */ - Uint32 dataLen = 0; - Uint32 noOfAttrs = 0; - Uint32 * src = f_buffer; - Uint32 * headers = signal->theData + 25; - Uint32 * dst = signal->theData + 25 + MAX_ATTRIBUTES_IN_TABLE; - - LinearSectionPtr ptr[3]; - int nptr; - - ptr[0].p = headers; - ptr[1].p = dst; - - while(sz > 0){ - jam(); - Uint32 tmp = * src ++; - * headers ++ = tmp; - Uint32 len = AttributeHeader::getDataSize(tmp); - memcpy(dst, src, 4 * len); - dst += len; - src += len; - - noOfAttrs++; - dataLen += len; - sz -= (1 + len); - } - ndbrequire(sz == 0); - - ptr[0].sz = noOfAttrs; - ptr[1].sz = dataLen; - - if (b_trigBufferSize > 0) { - jam(); - ptr[2].p = b_buffer; - ptr[2].sz = b_trigBufferSize; - nptr = 3; - } else { - jam(); - nptr = 2; - } - - // right now only for tableEvent - bool replicaFlag = decideWhoToSend(getStoreBucket(hashValue), gci); - - /** - * Signal to subscriber(s) - */ - SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg; - data->gci = gci; - data->tableId = tableId; - data->operation = event; - data->noOfAttributes = noOfAttrs; - data->dataSize = dataLen; - - SubscriberPtr subbPtr; - for(c_dataSubscribers.first(subbPtr); !subbPtr.isNull(); - c_dataSubscribers.next(subbPtr)){ - if (subbPtr.p->m_firstGCI > gci) { -#ifdef EVENT_DEBUG - ndbout_c("m_firstGCI = %u, gci = %u", subbPtr.p->m_firstGCI, gci); -#endif - jam(); - // we're either restarting or it's a newly created subscriber - // and waiting for the right gci - continue; - } - - jam(); - - const Uint32 ref = subbPtr.p->m_subscriberRef; - // ndbout_c("ref = %u", ref); - const Uint32 subdata = subbPtr.p->m_subscriberData; - data->senderData = subdata; - /* - * get subscription ptr for this subscriber - */ - SubscriptionPtr subPtr; - c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI); - - if(!subPtr.p->m_tables[tableId]) { - jam(); - continue; - //continue in for-loop if the table is not part of - //the subscription. Otherwise, send data to subscriber. - } - - if (subPtr.p->m_subscriptionType == SubCreateReq::TableEvent) { - if (replicaFlag) { - jam(); - c_failoverBuffer.subTableData(gci,NULL,0); - continue; - } - jam(); - Uint32 tmp = data->logType; - if (c_lastInconsistentGCI == data->gci) { - data->setGCINotConsistent(); - } - -#ifdef HANDOVER_DEBUG - { - static int aLongGCIName = 0; - if (data->gci != aLongGCIName) { - aLongGCIName = data->gci; - ndbout_c("sent from GCI = %u", aLongGCIName); - } - } -#endif - DBUG_PRINT("info",("GSN_SUB_TABLE_DATA to node %d", refToNode(ref))); - sendSignal(ref, GSN_SUB_TABLE_DATA, signal, - SubTableData::SignalLength, JBB, ptr, nptr); - data->logType = tmp; - } else { - ndbassert(refToNode(ref) == 0 || refToNode(ref) == getOwnNodeId()); - jam(); -#if PRINT_ONLY - ndbout_c("GSN_SUB_TABLE_DATA to %s: op: %d #attr: %d len: %d", - getBlockName(refToBlock(ref)), - noOfAttrs, dataLen); - -#else -#ifdef HANDOVER_DEBUG - { - static int aLongGCIName2 = 0; - if (data->gci != aLongGCIName2) { - aLongGCIName2 = data->gci; - ndbout_c("(EXECUTE_DIRECT) sent from GCI = %u to %u", aLongGCIName2, ref); - } - } -#endif - EXECUTE_DIRECT(refToBlock(ref), GSN_SUB_TABLE_DATA, signal, - SubTableData::SignalLength); - jamEntry(); -#endif - } - } - - /** - * Reset f_bufferLock - */ - f_bufferLock = 0; - b_bufferLock = 0; - - DBUG_VOID_RETURN; -} - -void -SumaParticipant::execSUB_GCP_COMPLETE_REP(Signal* signal){ - jamEntry(); - - SubGcpCompleteRep * rep = (SubGcpCompleteRep*)signal->getDataPtrSend(); - - Uint32 gci = rep->gci; - c_lastCompleteGCI = gci; - - /** - * Signal to subscriber(s) - */ - - SubscriberPtr subbPtr; - SubscriptionPtr subPtr; - c_dataSubscribers.first(subbPtr); - for(; !subbPtr.isNull(); c_dataSubscribers.next(subbPtr)){ - - if (subbPtr.p->m_firstGCI > gci) { - jam(); - // we don't send SUB_GCP_COMPLETE_REP for incomplete GCI's - continue; - } - - const Uint32 ref = subbPtr.p->m_subscriberRef; - rep->senderRef = ref; - rep->senderData = subbPtr.p->m_subscriberData; - - c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI); -#if PRINT_ONLY - ndbout_c("GSN_SUB_GCP_COMPLETE_REP to %s:", - getBlockName(refToBlock(ref))); -#else - - CRASH_INSERTION(13018); - - if (subPtr.p->m_subscriptionType == SubCreateReq::TableEvent) - { - jam(); - sendSignal(ref, GSN_SUB_GCP_COMPLETE_REP, signal, - SubGcpCompleteRep::SignalLength, JBB); - } - else - { - jam(); - ndbassert(refToNode(ref) == 0 || refToNode(ref) == getOwnNodeId()); - EXECUTE_DIRECT(refToBlock(ref), GSN_SUB_GCP_COMPLETE_REP, signal, - SubGcpCompleteRep::SignalLength); - jamEntry(); - } -#endif - } - - if (c_handoverToDo) { - jam(); - c_handoverToDo = false; - for( int i = 0; i < NO_OF_BUCKETS; i++) { - if (c_buckets[i].handover) { - if (c_buckets[i].handoverGCI > gci) { - jam(); - c_handoverToDo = true; // still waiting for the right GCI - break; /* since all handover should happen at the same time - * we can break here - */ - } else { - c_buckets[i].handover = false; -#ifdef HANDOVER_DEBUG - ndbout_c("Handover Bucket %u", i); -#endif - if (getResponsibleSumaNodeId(i) == refToNode(reference())) { - // my bucket to be handed over to me - ndbrequire(!c_buckets[i].active); - jam(); - c_buckets[i].active = true; - } else { - // someone else's bucket to handover to - ndbrequire(c_buckets[i].active); - jam(); - c_buckets[i].active = false; - } - } - } - } - } -} - -/*********************************************************** - * - * Embryo to syncronize the Suma's so as to know if a subscriber - * has received a GCP_COMPLETE from all suma's or not - * - */ - -void -SumaParticipant::runSUB_GCP_COMPLETE_ACC(Signal* signal){ - jam(); - - SubGcpCompleteAcc * const acc = (SubGcpCompleteAcc*)signal->getDataPtr(); - - Uint32 gci = acc->rep.gci; - -#ifdef EVENT_DEBUG - ndbout_c("SumaParticipant::runSUB_GCP_COMPLETE_ACC gci = %u", gci); -#endif - - c_failoverBuffer.subGcpCompleteRep(gci); -} - -void -Suma::execSUB_GCP_COMPLETE_ACC(Signal* signal){ - jamEntry(); - - if (RtoI(signal->getSendersBlockRef(), false) != RNIL) { - jam(); - // Ack from other SUMA - runSUB_GCP_COMPLETE_ACC(signal); - return; - } - - jam(); - // Ack from User and not an acc from other SUMA, redistribute in nodegroup - - SubGcpCompleteAcc * const acc = (SubGcpCompleteAcc*)signal->getDataPtr(); - Uint32 gci = acc->rep.gci; - Uint32 senderRef = acc->rep.senderRef; - Uint32 subscriberData = acc->rep.subscriberData; - -#ifdef EVENT_DEBUG - ndbout_c("Suma::execSUB_GCP_COMPLETE_ACC gci = %u", gci); -#endif - bool moreToCome = false; - - SubscriberPtr subbPtr; - for(c_dataSubscribers.first(subbPtr); - !subbPtr.isNull(); c_dataSubscribers.next(subbPtr)){ -#ifdef EVENT_DEBUG - ndbout_c("Suma::execSUB_GCP_COMPLETE_ACC %u == %u && %u == %u", - subbPtr.p->m_subscriberRef, - senderRef, - subbPtr.p->m_subscriberData, - subscriberData); -#endif - if (subbPtr.p->m_subscriberRef == senderRef && - subbPtr.p->m_subscriberData == subscriberData) { - jam(); -#ifdef EVENT_DEBUG - ndbout_c("Suma::execSUB_GCP_COMPLETE_ACC gci = FOUND SUBSCRIBER"); -#endif - subbPtr.p->m_lastGCI = gci; - } else if (subbPtr.p->m_lastGCI < gci) { - jam(); - if (subbPtr.p->m_firstGCI <= gci) - moreToCome = true; - } else - jam(); - } - - if (!moreToCome) { - // tell the other SUMA's that I'm done with this GCI - jam(); - for (Uint32 i = 0; i < c_noNodesInGroup; i++) { - Uint32 id = c_nodesInGroup[i]; - Uint32 ref = calcSumaBlockRef(id); - if ((ref != reference()) && c_aliveNodes.get(id)) { - jam(); - sendSignal(ref, GSN_SUB_GCP_COMPLETE_ACC, signal, - SubGcpCompleteAcc::SignalLength, JBB); - } else - jam(); - } - } -} - -static Uint32 tmpFailoverBuffer[512]; -//SumaParticipant::FailoverBuffer::FailoverBuffer(DataBuffer<15>::DataBufferPool & p) -// : m_dataList(p), -SumaParticipant::FailoverBuffer::FailoverBuffer() - : - c_gcis(tmpFailoverBuffer), c_sz(512), c_first(0), c_next(0), c_full(false) -{ -} - -bool SumaParticipant::FailoverBuffer::subTableData(Uint32 gci, Uint32 *src, int sz) -{ - bool ok = true; - - if (c_full) { - ok = false; -#ifdef EVENT_DEBUG - ndbout_c("Suma::FailoverBuffer::SubTableData buffer full gci=%u"); -#endif - } else { - c_gcis[c_next] = gci; - c_next++; - if (c_next == c_sz) c_next = 0; - if (c_next == c_first) - c_full = true; - // ndbout_c("%u %u %u",c_first,c_next,c_sz); - } - return ok; -} -bool SumaParticipant::FailoverBuffer::subGcpCompleteRep(Uint32 gci) -{ - bool ok = true; - - // ndbout_c("Empty"); - while (true) { - if (c_first == c_next && !c_full) - break; - if (c_gcis[c_first] > gci) - break; - c_full = false; - c_first++; - if (c_first == c_sz) c_first = 0; - // ndbout_c("%u %u %u : ",c_first,c_next,c_sz); - } - - return ok; -} -bool SumaParticipant::FailoverBuffer::nodeFailRep() -{ - bool ok = true; - while (true) { - if (c_first == c_next && !c_full) - break; - -#ifdef EVENT_DEBUG - ndbout_c("Suma::FailoverBuffer::NodeFailRep resending gci=%u", c_gcis[c_first]); -#endif - c_full = false; - c_first++; - if (c_first == c_sz) c_first = 0; - } - return ok; -} - -/********************************************************** - * Suma participant interface - * - * Stopping and removing of subscriber - * - */ - -void -SumaParticipant::execSUB_STOP_REQ(Signal* signal){ - jamEntry(); - DBUG_ENTER("SumaParticipant::execSUB_STOP_REQ"); - - CRASH_INSERTION(13019); - - SubStopReq * const req = (SubStopReq*)signal->getDataPtr(); - Uint32 senderRef = signal->getSendersBlockRef(); - Uint32 senderData = req->senderData; - Uint32 subscriberRef = req->subscriberRef; - Uint32 subscriberData = req->subscriberData; - SubscriptionPtr subPtr; - Subscription key; - key.m_subscriptionId = req->subscriptionId; - key.m_subscriptionKey = req->subscriptionKey; - Uint32 part = req->part; - - if (key.m_subscriptionKey == 0 && - key.m_subscriptionId == 0 && - subscriberData == 0) { - SubStopConf* conf = (SubStopConf*)signal->getDataPtrSend(); - - conf->senderRef = reference(); - conf->senderData = senderData; - conf->subscriptionId = key.m_subscriptionId; - conf->subscriptionKey = key.m_subscriptionKey; - conf->subscriberData = subscriberData; - - sendSignal(senderRef, GSN_SUB_STOP_CONF, signal, - SubStopConf::SignalLength, JBB); - - removeSubscribersOnNode(signal, refToNode(subscriberRef)); - DBUG_VOID_RETURN; - } - - if(!c_subscriptions.find(subPtr, key)){ - jam(); - sendSubStopRef(signal, GrepError::SUBSCRIPTION_ID_NOT_FOUND); - return; - } - - ndbrequire(part == SubscriptionData::TableData); - - SubscriberPtr subbPtr; - if (senderRef == reference()){ - jam(); - c_subscriberPool.getPtr(subbPtr, senderData); - ndbrequire(subbPtr.p->m_subPtrI == subPtr.i && - subbPtr.p->m_subscriberRef == subscriberRef && - subbPtr.p->m_subscriberData == subscriberData); - c_removeDataSubscribers.remove(subbPtr); - } else { - bool found = false; - jam(); - c_dataSubscribers.first(subbPtr); - for (;!subbPtr.isNull(); c_dataSubscribers.next(subbPtr)){ - jam(); - if (subbPtr.p->m_subPtrI == subPtr.i && - refToNode(subbPtr.p->m_subscriberRef) == refToNode(subscriberRef) && - subbPtr.p->m_subscriberData == subscriberData){ - // ndbout_c("STOP_REQ: before c_dataSubscribers.release"); - jam(); - c_dataSubscribers.remove(subbPtr); - found = true; - break; - } - } - /** - * If we didn't find anyone, send ref - */ - if (!found) { - jam(); - sendSubStopRef(signal, GrepError::SUBSCRIBER_NOT_FOUND); - DBUG_VOID_RETURN; - } - } - - subbPtr.p->m_senderRef = senderRef; // store ref to requestor - subbPtr.p->m_senderData = senderData; // store ref to requestor - c_prepDataSubscribers.add(subbPtr); - - Ptr<SyncRecord> syncPtr; - c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI); - if (syncPtr.p->m_locked) { - jam(); - sendSubStopRef(signal, /** Error Code */ 0, true); - DBUG_VOID_RETURN; - } - syncPtr.p->m_locked = true; - - syncPtr.p->startDropTrigger(signal); - DBUG_VOID_RETURN; -} - -void -SumaParticipant::sendSubStopComplete(Signal* signal, SubscriberPtr subbPtr){ - jam(); - - CRASH_INSERTION(13020); - - SubscriptionPtr subPtr; - c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI); - - Ptr<SyncRecord> syncPtr; - c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI); - syncPtr.p->m_locked = false; - - SubStopConf * const conf = (SubStopConf*)signal->getDataPtrSend(); - - conf->senderRef = reference(); - conf->senderData = subbPtr.p->m_senderData; - conf->subscriptionId = subPtr.p->m_subscriptionId; - conf->subscriptionKey = subPtr.p->m_subscriptionKey; - conf->subscriberData = subbPtr.p->m_subscriberData; - Uint32 senderRef = subbPtr.p->m_senderRef; - - c_prepDataSubscribers.release(subbPtr); - sendSignal(senderRef, GSN_SUB_STOP_CONF, signal, - SubStopConf::SignalLength, JBB); -} - -void -SumaParticipant::sendSubStopRef(Signal* signal, Uint32 errCode, - bool temporary){ - jam(); - SubStopRef * ref = (SubStopRef *)signal->getDataPtrSend(); - ref->senderRef = reference(); - ref->errorCode = errCode; - if (temporary) { - ref->setTemporary(); - } - sendSignal(signal->getSendersBlockRef(), - GSN_SUB_STOP_REF, - signal, - SubStopRef::SignalLength, - JBB); - return; -} - /************************************************************** * * Removing subscription @@ -3446,36 +1722,6 @@ SumaParticipant::execSUB_REMOVE_REQ(Signal* signal) { { jam(); SubscriberPtr i_subbPtr; - for(c_prepDataSubscribers.first(i_subbPtr); - !i_subbPtr.isNull(); c_prepDataSubscribers.next(i_subbPtr)){ - jam(); - if( i_subbPtr.p->m_subPtrI == subPtr.i ) { - jam(); - sendSubRemoveRef(signal, req, /* ErrorCode */ 0, true); - return; - // c_prepDataSubscribers.release(subbPtr); - } - } - c_dataSubscribers.first(i_subbPtr); - while(!i_subbPtr.isNull()){ - jam(); - SubscriberPtr subbPtr = i_subbPtr; - c_dataSubscribers.next(i_subbPtr); - if( subbPtr.p->m_subPtrI == subPtr.i ) { - jam(); - sendSubRemoveRef(signal, req, /* ErrorCode */ 0, true); - return; - /* Unfinished/untested code. If remove should be possible - * even if subscribers are left these have to be stopped - * first. See m_markRemove, m_nSubscribers. We need also to - * block remove for this subscription so that multiple - * removes is not possible... - */ - c_dataSubscribers.remove(subbPtr); - c_removeDataSubscribers.add(subbPtr); - count++; - } - } c_metaSubscribers.first(i_subbPtr); while(!i_subbPtr.isNull()){ jam(); @@ -3491,15 +1737,7 @@ SumaParticipant::execSUB_REMOVE_REQ(Signal* signal) { subPtr.p->m_senderRef = senderRef; subPtr.p->m_senderData = req.senderData; - if (count > 0){ - jam(); - ndbrequire(false); // code not finalized - subPtr.p->m_markRemove = true; - subPtr.p->m_nSubscribers = count; - sendSubStopReq(signal); - } else { - completeSubRemoveReq(signal, subPtr); - } + completeSubRemoveReq(signal, subPtr); } void @@ -3596,486 +1834,5 @@ SumaParticipant::SyncRecord::release(){ attrBuf.release(); } - -/************************************************************** - * - * Restarting remote node functions, master functionality - * (slave does nothing special) - * - triggered on INCL_NODEREQ calling startNode - * - included node will issue START_ME when it's ready to start - * the subscribers - * - */ - -Suma::Restart::Restart(Suma& s) : suma(s) { - for (int i = 0; i < MAX_REPLICAS; i++) { - c_okToStart[i] = false; - c_waitingToStart[i] = false; - } -} - -void -Suma::Restart::resetNode(Uint32 sumaRef) -{ - jam(); - int I = suma.RtoI(sumaRef); - c_okToStart[I] = false; - c_waitingToStart[I] = false; -} - -void -Suma::Restart::startNode(Signal* signal, Uint32 sumaRef) -{ - jam(); - resetNode(sumaRef); - - // right now we can only handle restarting one node - // at a time in a node group - - createSubscription(signal, sumaRef); -} - -void -Suma::Restart::createSubscription(Signal* signal, Uint32 sumaRef) { - jam(); - suma.c_subscriptions.first(c_subPtr); - nextSubscription(signal, sumaRef); -} - -void -Suma::Restart::nextSubscription(Signal* signal, Uint32 sumaRef) { - jam(); - if (c_subPtr.isNull()) { - jam(); - completeSubscription(signal, sumaRef); - return; - } - SubscriptionPtr subPtr; - subPtr.i = c_subPtr.curr.i; - subPtr.p = suma.c_subscriptions.getPtr(subPtr.i); - - suma.c_subscriptions.next(c_subPtr); - - SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend(); - - req->subscriberRef = suma.reference(); - req->subscriberData = subPtr.i; - req->subscriptionId = subPtr.p->m_subscriptionId; - req->subscriptionKey = subPtr.p->m_subscriptionKey; - req->subscriptionType = subPtr.p->m_subscriptionType | - SubCreateReq::RestartFlag; - - switch (subPtr.p->m_subscriptionType) { - case SubCreateReq::TableEvent: - case SubCreateReq::SelectiveTableSnapshot: - case SubCreateReq::DatabaseSnapshot: { - jam(); - - Ptr<SyncRecord> syncPtr; - suma.c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI); - syncPtr.p->m_tableList.first(syncPtr.p->m_tableList_it); - - ndbrequire(!syncPtr.p->m_tableList_it.isNull()); - - req->tableId = *syncPtr.p->m_tableList_it.data; - -#if 0 - for (int i = 0; i < MAX_TABLES; i++) - if (subPtr.p->m_tables[i]) { - req->tableId = i; - break; - } -#endif - - suma.sendSignal(sumaRef, GSN_SUB_CREATE_REQ, signal, - SubCreateReq::SignalLength+1 /*to get table Id*/, JBB); - return; - } - case SubCreateReq::SingleTableScan : - // TODO - jam(); - return; - } - ndbrequire(false); -} - -void -Suma::execSUB_CREATE_CONF(Signal* signal) { - jamEntry(); -#ifdef NODEFAIL_DEBUG - ndbout_c("Suma::execSUB_CREATE_CONF"); -#endif - - const Uint32 senderRef = signal->senderBlockRef(); - - SubCreateConf * const conf = (SubCreateConf *)signal->getDataPtr(); - - Subscription key; - const Uint32 subscriberData = conf->subscriberData; - key.m_subscriptionId = conf->subscriptionId; - key.m_subscriptionKey = conf->subscriptionKey; - - SubscriptionPtr subPtr; - ndbrequire(c_subscriptions.find(subPtr, key)); - - switch(subPtr.p->m_subscriptionType) { - case SubCreateReq::TableEvent: - case SubCreateReq::SelectiveTableSnapshot: - case SubCreateReq::DatabaseSnapshot: - { - Ptr<SyncRecord> syncPtr; - c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI); - - syncPtr.p->m_tableList.next(syncPtr.p->m_tableList_it); - if (syncPtr.p->m_tableList_it.isNull()) { - jam(); - SubSyncReq *req = (SubSyncReq *)signal->getDataPtrSend(); - - req->subscriptionId = key.m_subscriptionId; - req->subscriptionKey = key.m_subscriptionKey; - req->subscriberData = subscriberData; - req->part = (Uint32) SubscriptionData::MetaData; - - sendSignal(senderRef, GSN_SUB_SYNC_REQ, signal, - SubSyncReq::SignalLength, JBB); - } else { - jam(); - SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend(); - - req->subscriberRef = reference(); - req->subscriberData = subPtr.i; - req->subscriptionId = subPtr.p->m_subscriptionId; - req->subscriptionKey = subPtr.p->m_subscriptionKey; - req->subscriptionType = subPtr.p->m_subscriptionType | - SubCreateReq::RestartFlag | - SubCreateReq::AddTableFlag; - - req->tableId = *syncPtr.p->m_tableList_it.data; - - sendSignal(senderRef, GSN_SUB_CREATE_REQ, signal, - SubCreateReq::SignalLength+1 /*to get table Id*/, JBB); - } - } - return; - case SubCreateReq::SingleTableScan: - ndbrequire(false); - } - ndbrequire(false); -} - -void -Suma::execSUB_CREATE_REF(Signal* signal) { - jamEntry(); -#ifdef NODEFAIL_DEBUG - ndbout_c("Suma::execSUB_CREATE_REF"); -#endif - //ndbrequire(false); -} - -void -Suma::execSUB_SYNC_CONF(Signal* signal) { - jamEntry(); -#ifdef NODEFAIL_DEBUG - ndbout_c("Suma::execSUB_SYNC_CONF"); -#endif - Uint32 sumaRef = signal->getSendersBlockRef(); - - SubSyncConf *conf = (SubSyncConf *)signal->getDataPtr(); - Subscription key; - - key.m_subscriptionId = conf->subscriptionId; - key.m_subscriptionKey = conf->subscriptionKey; - // SubscriptionData::Part part = (SubscriptionData::Part)conf->part; - // const Uint32 subscriberData = conf->subscriberData; - - SubscriptionPtr subPtr; - c_subscriptions.find(subPtr, key); - - switch(subPtr.p->m_subscriptionType) { - case SubCreateReq::TableEvent: - case SubCreateReq::SelectiveTableSnapshot: - case SubCreateReq::DatabaseSnapshot: - jam(); - Restart.nextSubscription(signal, sumaRef); - return; - case SubCreateReq::SingleTableScan: - ndbrequire(false); - return; - } - ndbrequire(false); -} - -void -Suma::execSUB_SYNC_REF(Signal* signal) { - jamEntry(); -#ifdef NODEFAIL_DEBUG - ndbout_c("Suma::execSUB_SYNC_REF"); -#endif - //ndbrequire(false); -} - -void -Suma::execSUMA_START_ME(Signal* signal) { - jamEntry(); -#ifdef NODEFAIL_DEBUG - ndbout_c("Suma::execSUMA_START_ME"); -#endif - - Restart.runSUMA_START_ME(signal, signal->getSendersBlockRef()); -} - -void -Suma::Restart::runSUMA_START_ME(Signal* signal, Uint32 sumaRef) { - int I = suma.RtoI(sumaRef); - - // restarting Suma is ready for SUB_START_REQ - if (c_waitingToStart[I]) { - // we've waited with startSubscriber since restarting suma was not ready - c_waitingToStart[I] = false; - startSubscriber(signal, sumaRef); - } else { - // do startSubscriber as soon as its time - c_okToStart[I] = true; - } -} - -void -Suma::Restart::completeSubscription(Signal* signal, Uint32 sumaRef) { - jam(); - int I = suma.RtoI(sumaRef); - - if (c_okToStart[I]) {// otherwise will start when START_ME comes - c_okToStart[I] = false; - startSubscriber(signal, sumaRef); - } else { - c_waitingToStart[I] = true; - } -} - -void -Suma::Restart::startSubscriber(Signal* signal, Uint32 sumaRef) { - jam(); - suma.c_dataSubscribers.first(c_subbPtr); - nextSubscriber(signal, sumaRef); -} - -void -Suma::Restart::sendSubStartReq(SubscriptionPtr subPtr, SubscriberPtr subbPtr, - Signal* signal, Uint32 sumaRef) -{ - jam(); - SubStartReq * req = (SubStartReq *)signal->getDataPtrSend(); - - req->senderRef = suma.reference(); - req->senderData = subbPtr.p->m_senderData; - req->subscriptionId = subPtr.p->m_subscriptionId; - req->subscriptionKey = subPtr.p->m_subscriptionKey; - req->part = SubscriptionData::TableData; - req->subscriberData = subbPtr.p->m_subscriberData; - req->subscriberRef = subbPtr.p->m_subscriberRef; - - // restarting suma will not respond to this until startphase 5 - // since it is not until then data copying has been completed -#ifdef NODEFAIL_DEBUG - ndbout_c("Suma::Restart::sendSubStartReq sending GSN_SUB_START_REQ id=%u key=%u", - req->subscriptionId, req->subscriptionKey); -#endif - suma.sendSignal(sumaRef, GSN_SUB_START_REQ, - signal, SubStartReq::SignalLength2, JBB); -} - -void -Suma::execSUB_START_CONF(Signal* signal) { - jamEntry(); -#ifdef NODEFAIL_DEBUG - ndbout_c("Suma::execSUB_START_CONF"); -#endif - Uint32 sumaRef = signal->getSendersBlockRef(); - Restart.nextSubscriber(signal, sumaRef); -} - -void -Suma::execSUB_START_REF(Signal* signal) { - jamEntry(); -#ifdef NODEFAIL_DEBUG - ndbout_c("Suma::execSUB_START_REF"); -#endif - //ndbrequire(false); -} - -void -Suma::Restart::nextSubscriber(Signal* signal, Uint32 sumaRef) { - jam(); - if (c_subbPtr.isNull()) { - jam(); - completeSubscriber(signal, sumaRef); - return; - } - - SubscriberPtr subbPtr = c_subbPtr; - suma.c_dataSubscribers.next(c_subbPtr); - - /* - * get subscription ptr for this subscriber - */ - - SubscriptionPtr subPtr; - suma.c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI); - switch (subPtr.p->m_subscriptionType) { - case SubCreateReq::TableEvent: - case SubCreateReq::SelectiveTableSnapshot: - case SubCreateReq::DatabaseSnapshot: - { - jam(); - sendSubStartReq(subPtr, subbPtr, signal, sumaRef); -#if 0 - SubStartReq * req = (SubStartReq *)signal->getDataPtrSend(); - - req->senderRef = reference(); - req->senderData = subbPtr.p->m_senderData; - req->subscriptionId = subPtr.p->m_subscriptionId; - req->subscriptionKey = subPtr.p->m_subscriptionKey; - req->part = SubscriptionData::TableData; - req->subscriberData = subbPtr.p->m_subscriberData; - req->subscriberRef = subbPtr.p->m_subscriberRef; - - // restarting suma will not respond to this until startphase 5 - // since it is not until then data copying has been completed -#ifdef NODEFAIL_DEBUG - ndbout_c("Suma::nextSubscriber sending GSN_SUB_START_REQ id=%u key=%u", - req->subscriptionId, req->subscriptionKey); -#endif - suma.sendSignal(sumaRef, GSN_SUB_START_REQ, - signal, SubStartReq::SignalLength2, JBB); -#endif - } - return; - case SubCreateReq::SingleTableScan: - ndbrequire(false); - return; - } - ndbrequire(false); -} - -void -Suma::Restart::completeSubscriber(Signal* signal, Uint32 sumaRef) { - completeRestartingNode(signal, sumaRef); -} - -void -Suma::Restart::completeRestartingNode(Signal* signal, Uint32 sumaRef) { - jam(); - SumaHandoverReq * req = (SumaHandoverReq *)signal->getDataPtrSend(); - - req->gci = suma.getFirstGCI(signal); - - suma.sendSignal(sumaRef, GSN_SUMA_HANDOVER_REQ, signal, - SumaHandoverReq::SignalLength, JBB); -} - -// only run on restarting suma - -void -Suma::execSUMA_HANDOVER_REQ(Signal* signal) -{ - jamEntry(); - // Uint32 sumaRef = signal->getSendersBlockRef(); - SumaHandoverReq const * req = (SumaHandoverReq *)signal->getDataPtr(); - - Uint32 gci = req->gci; - Uint32 new_gci = getFirstGCI(signal); - - if (new_gci > gci) { - gci = new_gci; - } - - { // all recreated subscribers at restarting SUMA start at same GCI - SubscriberPtr subbPtr; - for(c_dataSubscribers.first(subbPtr); - !subbPtr.isNull(); - c_dataSubscribers.next(subbPtr)){ - subbPtr.p->m_firstGCI = gci; - } - } - -#ifdef NODEFAIL_DEBUG - ndbout_c("Suma::execSUMA_HANDOVER_REQ, gci = %u", gci); -#endif - - c_handoverToDo = false; - c_restartLock = false; - { -#ifdef HANDOVER_DEBUG - int c = 0; -#endif - for( int i = 0; i < NO_OF_BUCKETS; i++) { - jam(); - if (getResponsibleSumaNodeId(i) == refToNode(reference())) { -#ifdef HANDOVER_DEBUG - c++; -#endif - jam(); - c_buckets[i].active = false; - c_buckets[i].handoverGCI = gci; - c_buckets[i].handover = true; - c_buckets[i].handover_started = false; - c_handoverToDo = true; - } - } -#ifdef HANDOVER_DEBUG - ndbout_c("prepared handover of bucket %u buckets", c); -#endif - } - - for (Uint32 i = 0; i < c_noNodesInGroup; i++) { - jam(); - Uint32 ref = calcSumaBlockRef(c_nodesInGroup[i]); - if (ref != reference()) { - jam(); - sendSignal(ref, GSN_SUMA_HANDOVER_CONF, signal, - SumaHandoverConf::SignalLength, JBB); - }//if - } -} - -// only run on all but restarting suma -void -Suma::execSUMA_HANDOVER_CONF(Signal* signal) { - jamEntry(); - Uint32 sumaRef = signal->getSendersBlockRef(); - SumaHandoverConf const * conf = (SumaHandoverConf *)signal->getDataPtr(); - - Uint32 gci = conf->gci; - -#ifdef HANDOVER_DEBUG - ndbout_c("Suma::execSUMA_HANDOVER_CONF, gci = %u", gci); -#endif - - /* TODO, if we are restarting several SUMA's (>2 in a nodegroup) - * we have to collect all these conf's before proceding - */ - - // restarting node is now prepared and ready - c_preparingNodes.clear(refToNode(sumaRef)); /* !! important to do before - * below since it affects - * getResponsibleSumaNodeId() - */ - - c_handoverToDo = false; - // mark all active buckets really belonging to restarting SUMA - for( int i = 0; i < NO_OF_BUCKETS; i++) { - if (c_buckets[i].active) { - // I'm running this bucket - if (getResponsibleSumaNodeId(i) == refToNode(sumaRef)) { - // but it should really be the restarted node - c_buckets[i].handoverGCI = gci; - c_buckets[i].handover = true; - c_buckets[i].handover_started = false; - c_handoverToDo = true; - } - } - } -} - template void append(DataBuffer<11>&,SegmentedSectionPtr,SectionSegmentPool&); diff --git a/ndb/src/kernel/blocks/suma/Suma.hpp b/ndb/src/kernel/blocks/suma/Suma.hpp index 3508c5b0e0f..5cf1c4d543f 100644 --- a/ndb/src/kernel/blocks/suma/Suma.hpp +++ b/ndb/src/kernel/blocks/suma/Suma.hpp @@ -77,14 +77,6 @@ protected: void execSUB_SYNC_CONTINUE_CONF(Signal* signal); /** - * Trigger logging - */ - void execTRIG_ATTRINFO(Signal* signal); - void execFIRE_TRIG_ORD(Signal* signal); - void execSUB_GCP_COMPLETE_REP(Signal* signal); - void runSUB_GCP_COMPLETE_ACC(Signal* signal); - - /** * DIH signals */ void execDI_FCOUNTREF(Signal* signal); @@ -93,14 +85,6 @@ protected: void execDIGETPRIMCONF(Signal* signal); /** - * Trigger administration - */ - void execCREATE_TRIG_REF(Signal* signal); - void execCREATE_TRIG_CONF(Signal* signal); - void execDROP_TRIG_REF(Signal* signal); - void execDROP_TRIG_CONF(Signal* signal); - - /** * continueb */ void execCONTINUEB(Signal* signal); @@ -190,22 +174,6 @@ public: void completeMeta(Signal*); /** - * Create triggers - */ - Uint32 m_latestTriggerId; - void startTrigger(Signal* signal); - void nextTrigger(Signal* signal); - void completeTrigger(Signal* signal); - void createAttributeMask(AttributeMask&, Table*); - - /** - * Drop triggers - */ - void startDropTrigger(Signal* signal); - void nextDropTrigger(Signal* signal); - void completeDropTrigger(Signal* signal); - - /** * Sync data */ Uint32 m_currentTable; // Index in m_tableList @@ -229,18 +197,12 @@ public: suma.progError(line, cause, extra); } - void runLIST_TABLES_CONF(Signal* signal); void runGET_TABINFO_CONF(Signal* signal); void runGET_TABINFOREF(Signal* signal); void runDI_FCOUNTCONF(Signal* signal); void runDIGETPRIMCONF(Signal* signal); - void runCREATE_TRIG_CONF(Signal* signal); - void runDROP_TRIG_CONF(Signal* signal); - void runDROP_TRIG_REF(Signal* signal); - void runDropTrig(Signal* signal, Uint32 triggerId, Uint32 tableId); - Uint32 ptrI; union { Uint32 nextPool; Uint32 nextList; }; }; @@ -294,24 +256,11 @@ public: Uint32 m_subscriberRef; Uint32 m_subscriberData; Uint32 m_subPtrI; //reference to subscription - Uint32 m_firstGCI; // first GCI to send - Uint32 m_lastGCI; // last acnowledged GCI Uint32 nextList; union { Uint32 nextPool; Uint32 prevList; }; }; typedef Ptr<Subscriber> SubscriberPtr; - struct Bucket { - bool active; - bool handover; - bool handover_started; - Uint32 handoverGCI; - }; -#define NO_OF_BUCKETS 24 - struct Bucket c_buckets[NO_OF_BUCKETS]; - bool c_handoverToDo; - Uint32 c_lastCompleteGCI; - /** * */ @@ -336,25 +285,8 @@ public: DataBuffer<15>::DataBufferPool c_dataBufferPool; /** - * for restarting Suma not to start sending data too early - */ - bool c_restartLock; - - /** - * for flagging that a GCI containg inconsistent data - * typically due to node failiure - */ - - Uint32 c_lastInconsistentGCI; - Uint32 c_nodeFailGCI; - - NodeBitmask c_failedApiNodes; - - /** * Functions */ - bool removeSubscribersOnNode(Signal *signal, Uint32 nodeId); - bool parseTable(Signal* signal, class GetTabInfoConf* conf, Uint32 tableId, SyncRecord* syncPtr_p); bool checkTableTriggers(SegmentedSectionPtr ptr); @@ -365,52 +297,11 @@ public: void sendSubIdRef(Signal* signal, Uint32 errorCode); void sendSubCreateConf(Signal* signal, Uint32 sender, SubscriptionPtr subPtr); void sendSubCreateRef(Signal* signal, const SubCreateReq& req, Uint32 errorCode); - void sendSubStartRef(SubscriptionPtr subPtr, Signal* signal, - Uint32 errorCode, bool temporary = false); - void sendSubStartRef(Signal* signal, - Uint32 errorCode, bool temporary = false); - void sendSubStopRef(Signal* signal, - Uint32 errorCode, bool temporary = false); void sendSubSyncRef(Signal* signal, Uint32 errorCode); void sendSubRemoveRef(Signal* signal, const SubRemoveReq& ref, Uint32 errorCode, bool temporary = false); - void sendSubStartComplete(Signal*, SubscriberPtr, Uint32, - SubscriptionData::Part); - void sendSubStopComplete(Signal*, SubscriberPtr); - void sendSubStopReq(Signal* signal, bool unlock= false); - void completeSubRemoveReq(Signal* signal, SubscriptionPtr subPtr); - Uint32 getFirstGCI(Signal* signal); - Uint32 decideWhoToSend(Uint32 nBucket, Uint32 gci); - - virtual Uint32 getStoreBucket(Uint32 v) = 0; - virtual Uint32 getResponsibleSumaNodeId(Uint32 D) = 0; - virtual Uint32 RtoI(Uint32 sumaRef, bool dieOnNotFound = true) = 0; - - struct FailoverBuffer { - // FailoverBuffer(DataBuffer<15>::DataBufferPool & p); - FailoverBuffer(); - - bool subTableData(Uint32 gci, Uint32 *src, int sz); - bool subGcpCompleteRep(Uint32 gci); - bool nodeFailRep(); - - // typedef DataBuffer<15> GCIDataBuffer; - // GCIDataBuffer m_GCIDataBuffer; - // GCIDataBuffer::DataBufferIterator m_GCIDataBuffer_it; - - Uint32 *c_gcis; - int c_sz; - - // Uint32 *c_buf; - // int c_buf_sz; - - int c_first; - int c_next; - bool c_full; - } c_failoverBuffer; - /** * Table admin */ @@ -441,8 +332,6 @@ private: * Framework signals */ - void getNodeGroupMembers(Signal* signal); - void execREAD_CONFIG_REQ(Signal* signal); void execSTTOR(Signal* signal); @@ -454,35 +343,13 @@ private: void execINCL_NODEREQ(Signal* signal); void execCONTINUEB(Signal* signal); void execSIGNAL_DROPPED_REP(Signal* signal); - void execAPI_FAILREQ(Signal* signal) ; - - void execSUB_GCP_COMPLETE_ACC(Signal* signal); /** * Controller interface */ - void execSUB_CREATE_REF(Signal* signal); - void execSUB_CREATE_CONF(Signal* signal); - - void execSUB_DROP_REF(Signal* signal); - void execSUB_DROP_CONF(Signal* signal); - - void execSUB_START_REF(Signal* signal); - void execSUB_START_CONF(Signal* signal); - - void execSUB_STOP_REF(Signal* signal); - void execSUB_STOP_CONF(Signal* signal); - - void execSUB_SYNC_REF(Signal* signal); - void execSUB_SYNC_CONF(Signal* signal); - void execSUB_ABORT_SYNC_REF(Signal* signal); void execSUB_ABORT_SYNC_CONF(Signal* signal); - void execSUMA_START_ME(Signal* signal); - void execSUMA_HANDOVER_REQ(Signal* signal); - void execSUMA_HANDOVER_CONF(Signal* signal); - /** * Subscription generation interface */ @@ -494,49 +361,6 @@ private: void execUTIL_SEQUENCE_REF(Signal* signal); void execCREATE_SUBID_REQ(Signal* signal); - Uint32 getStoreBucket(Uint32 v); - Uint32 getResponsibleSumaNodeId(Uint32 D); - - /** - * for Suma that is restarting another - */ - - struct Restart { - Restart(Suma& s); - - Suma & suma; - - bool c_okToStart[MAX_REPLICAS]; - bool c_waitingToStart[MAX_REPLICAS]; - - DLHashTable<SumaParticipant::Subscription>::Iterator c_subPtr; // TODO [MAX_REPLICAS] - SubscriberPtr c_subbPtr; // TODO [MAX_REPLICAS] - - void progError(int line, int cause, const char * extra) { - suma.progError(line, cause, extra); - } - - void resetNode(Uint32 sumaRef); - void runSUMA_START_ME(Signal*, Uint32 sumaRef); - void startNode(Signal*, Uint32 sumaRef); - - void createSubscription(Signal* signal, Uint32 sumaRef); - void nextSubscription(Signal* signal, Uint32 sumaRef); - void completeSubscription(Signal* signal, Uint32 sumaRef); - - void startSync(Signal* signal, Uint32 sumaRef); - void nextSync(Signal* signal, Uint32 sumaRef); - void completeSync(Signal* signal, Uint32 sumaRef); - - void sendSubStartReq(SubscriptionPtr subPtr, SubscriberPtr subbPtr, - Signal* signal, Uint32 sumaRef); - void startSubscriber(Signal* signal, Uint32 sumaRef); - void nextSubscriber(Signal* signal, Uint32 sumaRef); - void completeSubscriber(Signal* signal, Uint32 sumaRef); - - void completeRestartingNode(Signal* signal, Uint32 sumaRef); - } Restart; - private: friend class Restart; struct SubCoordinator { @@ -590,14 +414,4 @@ private: DLList<SubCoordinator> c_runningSubscriptions; }; -inline Uint32 -Suma::RtoI(Uint32 sumaRef, bool dieOnNotFound) { - for (Uint32 i = 0; i < c_noNodesInGroup; i++) { - if (sumaRef == calcSumaBlockRef(c_nodesInGroup[i])) - return i; - } - ndbrequire(!dieOnNotFound); - return RNIL; -} - #endif diff --git a/ndb/src/kernel/blocks/suma/SumaInit.cpp b/ndb/src/kernel/blocks/suma/SumaInit.cpp index ad8493ff908..ae7425da4bf 100644 --- a/ndb/src/kernel/blocks/suma/SumaInit.cpp +++ b/ndb/src/kernel/blocks/suma/SumaInit.cpp @@ -35,19 +35,11 @@ SumaParticipant::SumaParticipant(const Configuration & conf) : */ addRecSignal(GSN_SUB_CREATE_REQ, &SumaParticipant::execSUB_CREATE_REQ); addRecSignal(GSN_SUB_REMOVE_REQ, &SumaParticipant::execSUB_REMOVE_REQ); - addRecSignal(GSN_SUB_START_REQ, &SumaParticipant::execSUB_START_REQ); - addRecSignal(GSN_SUB_STOP_REQ, &SumaParticipant::execSUB_STOP_REQ); addRecSignal(GSN_SUB_SYNC_REQ, &SumaParticipant::execSUB_SYNC_REQ); - addRecSignal(GSN_SUB_STOP_CONF, &SumaParticipant::execSUB_STOP_CONF); - addRecSignal(GSN_SUB_STOP_REF, &SumaParticipant::execSUB_STOP_REF); - /** * Dict interface */ - //addRecSignal(GSN_LIST_TABLES_REF, &SumaParticipant::execLIST_TABLES_REF); - addRecSignal(GSN_LIST_TABLES_CONF, &SumaParticipant::execLIST_TABLES_CONF); - //addRecSignal(GSN_GET_TABINFOREF, &SumaParticipant::execGET_TABINFO_REF); addRecSignal(GSN_GET_TABINFO_CONF, &SumaParticipant::execGET_TABINFO_CONF); addRecSignal(GSN_GET_TABINFOREF, &SumaParticipant::execGET_TABINFOREF); #if 0 @@ -76,32 +68,6 @@ SumaParticipant::SumaParticipant(const Configuration & conf) : addRecSignal(GSN_SUB_SYNC_CONTINUE_CONF, &SumaParticipant::execSUB_SYNC_CONTINUE_CONF); - /** - * Trigger stuff - */ - addRecSignal(GSN_TRIG_ATTRINFO, &SumaParticipant::execTRIG_ATTRINFO); - addRecSignal(GSN_FIRE_TRIG_ORD, &SumaParticipant::execFIRE_TRIG_ORD); - - addRecSignal(GSN_CREATE_TRIG_REF, &Suma::execCREATE_TRIG_REF); - addRecSignal(GSN_CREATE_TRIG_CONF, &Suma::execCREATE_TRIG_CONF); - addRecSignal(GSN_DROP_TRIG_REF, &Suma::execDROP_TRIG_REF); - addRecSignal(GSN_DROP_TRIG_CONF, &Suma::execDROP_TRIG_CONF); - - addRecSignal(GSN_SUB_GCP_COMPLETE_REP, - &SumaParticipant::execSUB_GCP_COMPLETE_REP); - - for( int i = 0; i < NO_OF_BUCKETS; i++) { - c_buckets[i].active = false; - c_buckets[i].handover = false; - c_buckets[i].handover_started = false; - c_buckets[i].handoverGCI = 0; - } - c_handoverToDo = false; - c_lastInconsistentGCI = RNIL; - c_lastCompleteGCI = RNIL; - c_nodeFailGCI = 0; - - c_failedApiNodes.clear(); } SumaParticipant::~SumaParticipant() @@ -110,7 +76,6 @@ SumaParticipant::~SumaParticipant() Suma::Suma(const Configuration & conf) : SumaParticipant(conf), - Restart(*this), c_nodes(c_nodePool), c_runningSubscriptions(c_subCoordinatorPool) { @@ -120,29 +85,12 @@ Suma::Suma(const Configuration & conf) : addRecSignal(GSN_NDB_STTOR, &Suma::execNDB_STTOR); addRecSignal(GSN_DUMP_STATE_ORD, &Suma::execDUMP_STATE_ORD); addRecSignal(GSN_READ_NODESCONF, &Suma::execREAD_NODESCONF); - addRecSignal(GSN_API_FAILREQ, &Suma::execAPI_FAILREQ); - addRecSignal(GSN_NODE_FAILREP, &Suma::execNODE_FAILREP); - addRecSignal(GSN_INCL_NODEREQ, &Suma::execINCL_NODEREQ); addRecSignal(GSN_CONTINUEB, &Suma::execCONTINUEB); addRecSignal(GSN_SIGNAL_DROPPED_REP, &Suma::execSIGNAL_DROPPED_REP, true); addRecSignal(GSN_UTIL_SEQUENCE_CONF, &Suma::execUTIL_SEQUENCE_CONF); addRecSignal(GSN_UTIL_SEQUENCE_REF, &Suma::execUTIL_SEQUENCE_REF); addRecSignal(GSN_CREATE_SUBID_REQ, &Suma::execCREATE_SUBID_REQ); - - addRecSignal(GSN_SUB_CREATE_CONF, &Suma::execSUB_CREATE_CONF); - addRecSignal(GSN_SUB_CREATE_REF, &Suma::execSUB_CREATE_REF); - addRecSignal(GSN_SUB_SYNC_CONF, &Suma::execSUB_SYNC_CONF); - addRecSignal(GSN_SUB_SYNC_REF, &Suma::execSUB_SYNC_REF); - addRecSignal(GSN_SUB_START_CONF, &Suma::execSUB_START_CONF); - addRecSignal(GSN_SUB_START_REF, &Suma::execSUB_START_REF); - - addRecSignal(GSN_SUMA_START_ME, &Suma::execSUMA_START_ME); - addRecSignal(GSN_SUMA_HANDOVER_REQ, &Suma::execSUMA_HANDOVER_REQ); - addRecSignal(GSN_SUMA_HANDOVER_CONF, &Suma::execSUMA_HANDOVER_CONF); - - addRecSignal(GSN_SUB_GCP_COMPLETE_ACC, - &Suma::execSUB_GCP_COMPLETE_ACC); } Suma::~Suma() diff --git a/ndb/src/ndbapi/Makefile.am b/ndb/src/ndbapi/Makefile.am index 99b75ffbd53..522e78dd6e0 100644 --- a/ndb/src/ndbapi/Makefile.am +++ b/ndb/src/ndbapi/Makefile.am @@ -24,8 +24,6 @@ libndbapi_la_SOURCES = \ NdbOperationExec.cpp \ NdbScanOperation.cpp NdbScanFilter.cpp \ NdbIndexOperation.cpp \ - NdbEventOperation.cpp \ - NdbEventOperationImpl.cpp \ NdbApiSignal.cpp \ NdbRecAttr.cpp \ NdbUtil.cpp \ diff --git a/ndb/src/ndbapi/Ndb.cpp b/ndb/src/ndbapi/Ndb.cpp index 56d68503825..9d1c78a5972 100644 --- a/ndb/src/ndbapi/Ndb.cpp +++ b/ndb/src/ndbapi/Ndb.cpp @@ -28,7 +28,6 @@ Name: Ndb.cpp #include "NdbImpl.hpp" #include <NdbOperation.hpp> #include <NdbTransaction.hpp> -#include <NdbEventOperation.hpp> #include <NdbRecAttr.hpp> #include <md5_hash.hpp> #include <NdbSleep.h> @@ -1300,51 +1299,6 @@ Ndb::getSchemaFromInternalName(const char * internalName) return ret; } -NdbEventOperation* Ndb::createEventOperation(const char* eventName, - const int bufferLength) -{ - NdbEventOperation* tOp; - - tOp = new NdbEventOperation(this, eventName, bufferLength); - - if (tOp == 0) - { - theError.code= 4000; - return NULL; - } - - if (tOp->getState() != NdbEventOperation::EO_CREATED) { - theError.code= tOp->getNdbError().code; - delete tOp; - tOp = NULL; - } - - //now we have to look up this event in dict - - return tOp; -} - -int Ndb::dropEventOperation(NdbEventOperation* op) { - delete op; - return 0; -} - -NdbGlobalEventBufferHandle* Ndb::getGlobalEventBufferHandle() -{ - return theGlobalEventBufferHandle; -} - -//void Ndb::monitorEvent(NdbEventOperation *op, NdbEventCallback cb, void* rs) -//{ -//} - -int -Ndb::pollEvents(int aMillisecondNumber) -{ - return NdbEventOperation::wait(theGlobalEventBufferHandle, - aMillisecondNumber); -} - #ifdef VM_TRACE #include <NdbMutex.h> extern NdbMutex *ndb_print_state_mutex; diff --git a/ndb/src/ndbapi/NdbDictionary.cpp b/ndb/src/ndbapi/NdbDictionary.cpp index a0a3dd431b8..6c721b76ba0 100644 --- a/ndb/src/ndbapi/NdbDictionary.cpp +++ b/ndb/src/ndbapi/NdbDictionary.cpp @@ -611,132 +611,6 @@ NdbDictionary::Index::getObjectVersion() const { } /***************************************************************** - * Event facade - */ -NdbDictionary::Event::Event(const char * name) - : m_impl(* new NdbEventImpl(* this)) -{ - setName(name); -} - -NdbDictionary::Event::Event(const char * name, const Table& table) - : m_impl(* new NdbEventImpl(* this)) -{ - setName(name); - setTable(table); -} - -NdbDictionary::Event::Event(NdbEventImpl & impl) - : m_impl(impl) -{ -} - -NdbDictionary::Event::~Event() -{ - NdbEventImpl * tmp = &m_impl; - if(this != tmp){ - delete tmp; - } -} - -void -NdbDictionary::Event::setName(const char * name) -{ - m_impl.setName(name); -} - -const char * -NdbDictionary::Event::getName() const -{ - return m_impl.getName(); -} - -void -NdbDictionary::Event::setTable(const Table& table) -{ - m_impl.setTable(table); -} - -void -NdbDictionary::Event::setTable(const char * table) -{ - m_impl.setTable(table); -} - -const char* -NdbDictionary::Event::getTableName() const -{ - return m_impl.getTableName(); -} - -void -NdbDictionary::Event::addTableEvent(const TableEvent t) -{ - m_impl.addTableEvent(t); -} - -void -NdbDictionary::Event::setDurability(EventDurability d) -{ - m_impl.setDurability(d); -} - -NdbDictionary::Event::EventDurability -NdbDictionary::Event::getDurability() const -{ - return m_impl.getDurability(); -} - -void -NdbDictionary::Event::addColumn(const Column & c){ - NdbColumnImpl* col = new NdbColumnImpl; - (* col) = NdbColumnImpl::getImpl(c); - m_impl.m_columns.push_back(col); -} - -void -NdbDictionary::Event::addEventColumn(unsigned attrId) -{ - m_impl.m_attrIds.push_back(attrId); -} - -void -NdbDictionary::Event::addEventColumn(const char * name) -{ - const Column c(name); - addColumn(c); -} - -void -NdbDictionary::Event::addEventColumns(int n, const char ** names) -{ - for (int i = 0; i < n; i++) - addEventColumn(names[i]); -} - -int NdbDictionary::Event::getNoOfEventColumns() const -{ - return m_impl.getNoOfEventColumns(); -} - -NdbDictionary::Object::Status -NdbDictionary::Event::getObjectStatus() const -{ - return m_impl.m_status; -} - -int -NdbDictionary::Event::getObjectVersion() const -{ - return m_impl.m_version; -} - -void NdbDictionary::Event::print() -{ - m_impl.print(); -} - -/***************************************************************** * Dictionary facade */ NdbDictionary::Dictionary::Dictionary(Ndb & ndb) @@ -885,28 +759,6 @@ NdbDictionary::Dictionary::getIndexTable(const char * indexName, return 0; } - -int -NdbDictionary::Dictionary::createEvent(const Event & ev) -{ - return m_impl.createEvent(NdbEventImpl::getImpl(ev)); -} - -int -NdbDictionary::Dictionary::dropEvent(const char * eventName) -{ - return m_impl.dropEvent(eventName); -} - -const NdbDictionary::Event * -NdbDictionary::Dictionary::getEvent(const char * eventName) -{ - NdbEventImpl * t = m_impl.getEvent(eventName); - if(t) - return t->m_facade; - return 0; -} - int NdbDictionary::Dictionary::listObjects(List& list, Object::Type type) { diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/ndb/src/ndbapi/NdbDictionaryImpl.cpp index ce348b616c9..b91df24d8d7 100644 --- a/ndb/src/ndbapi/NdbDictionaryImpl.cpp +++ b/ndb/src/ndbapi/NdbDictionaryImpl.cpp @@ -32,8 +32,6 @@ #include <SimpleProperties.hpp> #include <Bitmask.hpp> #include <AttributeList.hpp> -#include <NdbEventOperation.hpp> -#include "NdbEventOperationImpl.hpp" #include <NdbBlob.hpp> #include "NdbBlobImpl.hpp" #include <AttributeHeader.hpp> @@ -586,99 +584,6 @@ NdbIndexImpl::getIndexTable() const } /** - * NdbEventImpl - */ - -NdbEventImpl::NdbEventImpl() : - NdbDictionary::Event(* this), - m_facade(this) -{ - init(); -} - -NdbEventImpl::NdbEventImpl(NdbDictionary::Event & f) : - NdbDictionary::Event(* this), - m_facade(&f) -{ - init(); -} - -void NdbEventImpl::init() -{ - m_eventId= RNIL; - m_eventKey= RNIL; - m_tableId= RNIL; - mi_type= 0; - m_dur= NdbDictionary::Event::ED_UNDEFINED; - m_tableImpl= NULL; - m_bufferId= RNIL; - eventOp= NULL; -} - -NdbEventImpl::~NdbEventImpl() -{ - for (unsigned i = 0; i < m_columns.size(); i++) - delete m_columns[i]; -} - -void NdbEventImpl::setName(const char * name) -{ - m_externalName.assign(name); -} - -const char *NdbEventImpl::getName() const -{ - return m_externalName.c_str(); -} - -void -NdbEventImpl::setTable(const NdbDictionary::Table& table) -{ - m_tableImpl= &NdbTableImpl::getImpl(table); - m_tableName.assign(m_tableImpl->getName()); -} - -void -NdbEventImpl::setTable(const char * table) -{ - m_tableName.assign(table); -} - -const char * -NdbEventImpl::getTableName() const -{ - return m_tableName.c_str(); -} - -void -NdbEventImpl::addTableEvent(const NdbDictionary::Event::TableEvent t = NdbDictionary::Event::TE_ALL) -{ - switch (t) { - case NdbDictionary::Event::TE_INSERT : mi_type |= 1; break; - case NdbDictionary::Event::TE_DELETE : mi_type |= 2; break; - case NdbDictionary::Event::TE_UPDATE : mi_type |= 4; break; - default: mi_type = 4 | 2 | 1; // all types - } -} - -void -NdbEventImpl::setDurability(NdbDictionary::Event::EventDurability d) -{ - m_dur = d; -} - -NdbDictionary::Event::EventDurability -NdbEventImpl::getDurability() const -{ - return m_dur; -} - -int NdbEventImpl::getNoOfEventColumns() const -{ - return m_attrIds.size() + m_columns.size(); -} - -/** * NdbDictionaryImpl */ @@ -901,36 +806,6 @@ NdbDictInterface::execSignal(void* dictImpl, case GSN_DROP_INDX_CONF: tmp->execDROP_INDX_CONF(signal, ptr); break; - case GSN_CREATE_EVNT_REF: - tmp->execCREATE_EVNT_REF(signal, ptr); - break; - case GSN_CREATE_EVNT_CONF: - tmp->execCREATE_EVNT_CONF(signal, ptr); - break; - case GSN_SUB_START_CONF: - tmp->execSUB_START_CONF(signal, ptr); - break; - case GSN_SUB_START_REF: - tmp->execSUB_START_REF(signal, ptr); - break; - case GSN_SUB_TABLE_DATA: - tmp->execSUB_TABLE_DATA(signal, ptr); - break; - case GSN_SUB_GCP_COMPLETE_REP: - tmp->execSUB_GCP_COMPLETE_REP(signal, ptr); - break; - case GSN_SUB_STOP_CONF: - tmp->execSUB_STOP_CONF(signal, ptr); - break; - case GSN_SUB_STOP_REF: - tmp->execSUB_STOP_REF(signal, ptr); - break; - case GSN_DROP_EVNT_REF: - tmp->execDROP_EVNT_REF(signal, ptr); - break; - case GSN_DROP_EVNT_CONF: - tmp->execDROP_EVNT_CONF(signal, ptr); - break; case GSN_LIST_TABLES_CONF: tmp->execLIST_TABLES_CONF(signal, ptr); break; @@ -2385,616 +2260,6 @@ NdbDictInterface::execDROP_INDX_REF(NdbApiSignal * signal, } /***************************************************************** - * Create event - */ - -int -NdbDictionaryImpl::createEvent(NdbEventImpl & evnt) -{ - int i; - NdbTableImpl* tab = getTable(evnt.getTableName()); - - if(tab == 0){ -#ifdef EVENT_DEBUG - ndbout_c("NdbDictionaryImpl::createEvent: table not found: %s", - evnt.getTableName()); -#endif - return -1; - } - - evnt.m_tableId = tab->m_tableId; - evnt.m_tableImpl = tab; -#ifdef EVENT_DEBUG - ndbout_c("Event on tableId=%d", evnt.m_tableId); -#endif - - NdbTableImpl &table = *evnt.m_tableImpl; - - - int attributeList_sz = evnt.m_attrIds.size(); - - for (i = 0; i < attributeList_sz; i++) { - NdbColumnImpl *col_impl = table.getColumn(evnt.m_attrIds[i]); - if (col_impl) { - evnt.m_facade->addColumn(*(col_impl->m_facade)); - } else { - ndbout_c("Attr id %u in table %s not found", evnt.m_attrIds[i], - evnt.getTableName()); - m_error.code= 4713; - return -1; - } - } - - evnt.m_attrIds.clear(); - - attributeList_sz = evnt.m_columns.size(); -#ifdef EVENT_DEBUG - ndbout_c("creating event %s", evnt.m_externalName.c_str()); - ndbout_c("no of columns %d", evnt.m_columns.size()); -#endif - int pk_count = 0; - evnt.m_attrListBitmask.clear(); - - for(i = 0; i<attributeList_sz; i++){ - const NdbColumnImpl* col = - table.getColumn(evnt.m_columns[i]->m_name.c_str()); - if(col == 0){ - m_error.code= 4247; - return -1; - } - // Copy column definition - *evnt.m_columns[i] = *col; - - if(col->m_pk){ - pk_count++; - } - - evnt.m_attrListBitmask.set(col->m_attrId); - } - - // Sort index attributes according to primary table (using insertion sort) - for(i = 1; i < attributeList_sz; i++) { - NdbColumnImpl* temp = evnt.m_columns[i]; - unsigned int j = i; - while((j > 0) && (evnt.m_columns[j - 1]->m_attrId > temp->m_attrId)) { - evnt.m_columns[j] = evnt.m_columns[j - 1]; - j--; - } - evnt.m_columns[j] = temp; - } - // Check for illegal duplicate attributes - for(i = 1; i<attributeList_sz; i++) { - if (evnt.m_columns[i-1]->m_attrId == evnt.m_columns[i]->m_attrId) { - m_error.code= 4258; - return -1; - } - } - -#ifdef EVENT_DEBUG - char buf[128] = {0}; - evnt.m_attrListBitmask.getText(buf); - ndbout_c("createEvent: mask = %s", buf); -#endif - - // NdbDictInterface m_receiver; - return m_receiver.createEvent(m_ndb, evnt, 0 /* getFlag unset */); -} - -int -NdbDictInterface::createEvent(class Ndb & ndb, - NdbEventImpl & evnt, - int getFlag) -{ - NdbApiSignal tSignal(m_reference); - tSignal.theReceiversBlockNumber = DBDICT; - tSignal.theVerId_signalNumber = GSN_CREATE_EVNT_REQ; - if (getFlag) - tSignal.theLength = CreateEvntReq::SignalLengthGet; - else - tSignal.theLength = CreateEvntReq::SignalLengthCreate; - - CreateEvntReq * const req = CAST_PTR(CreateEvntReq, tSignal.getDataPtrSend()); - - req->setUserRef(m_reference); - req->setUserData(0); - - if (getFlag) { - // getting event from Dictionary - req->setRequestType(CreateEvntReq::RT_USER_GET); - } else { - // creating event in Dictionary - req->setRequestType(CreateEvntReq::RT_USER_CREATE); - req->setTableId(evnt.m_tableId); - req->setAttrListBitmask(evnt.m_attrListBitmask); - req->setEventType(evnt.mi_type); - } - - UtilBufferWriter w(m_buffer); - - const size_t len = strlen(evnt.m_externalName.c_str()) + 1; - if(len > MAX_TAB_NAME_SIZE) { - m_error.code= 4241; - return -1; - } - - w.add(SimpleProperties::StringValue, evnt.m_externalName.c_str()); - - if (getFlag == 0) - { - const BaseString internal_tabname( - ndb.internalize_table_name(evnt.m_tableName.c_str())); - w.add(SimpleProperties::StringValue, - internal_tabname.c_str()); - } - - LinearSectionPtr ptr[1]; - ptr[0].p = (Uint32*)m_buffer.get_data(); - ptr[0].sz = (m_buffer.length()+3) >> 2; - - int ret = createEvent(&tSignal, ptr, 1); - - if (ret) { - return ret; - } - - char *dataPtr = (char *)m_buffer.get_data(); - unsigned int lenCreateEvntConf = *((unsigned int *)dataPtr); - dataPtr += sizeof(lenCreateEvntConf); - CreateEvntConf const * evntConf = (CreateEvntConf *)dataPtr; - dataPtr += lenCreateEvntConf; - - // NdbEventImpl *evntImpl = (NdbEventImpl *)evntConf->getUserData(); - - if (getFlag) { - evnt.m_tableId = evntConf->getTableId(); - evnt.m_attrListBitmask = evntConf->getAttrListBitmask(); - evnt.mi_type = evntConf->getEventType(); - evnt.setTable(dataPtr); - } else { - if (evnt.m_tableId != evntConf->getTableId() || - //evnt.m_attrListBitmask != evntConf->getAttrListBitmask() || - evnt.mi_type != evntConf->getEventType()) { - ndbout_c("ERROR*************"); - return 1; - } - } - - evnt.m_eventId = evntConf->getEventId(); - evnt.m_eventKey = evntConf->getEventKey(); - - return ret; -} - -int -NdbDictInterface::createEvent(NdbApiSignal* signal, - LinearSectionPtr ptr[3], int noLSP) -{ - const int noErrCodes = 1; - int errCodes[noErrCodes] = {CreateEvntRef::Busy}; - return dictSignal(signal,ptr,noLSP, - 1 /*use masternode id*/, - 100, - WAIT_CREATE_INDX_REQ /*WAIT_CREATE_EVNT_REQ*/, - -1, - errCodes,noErrCodes, CreateEvntRef::Temporary); -} - -int -NdbDictionaryImpl::executeSubscribeEvent(NdbEventImpl & ev) -{ - // NdbDictInterface m_receiver; - return m_receiver.executeSubscribeEvent(m_ndb, ev); -} - -int -NdbDictInterface::executeSubscribeEvent(class Ndb & ndb, - NdbEventImpl & evnt) -{ - DBUG_ENTER("NdbDictInterface::executeSubscribeEvent"); - NdbApiSignal tSignal(m_reference); - // tSignal.theReceiversBlockNumber = SUMA; - tSignal.theReceiversBlockNumber = DBDICT; - tSignal.theVerId_signalNumber = GSN_SUB_START_REQ; - tSignal.theLength = SubStartReq::SignalLength2; - - SubStartReq * sumaStart = CAST_PTR(SubStartReq, tSignal.getDataPtrSend()); - - sumaStart->subscriptionId = evnt.m_eventId; - sumaStart->subscriptionKey = evnt.m_eventKey; - sumaStart->part = SubscriptionData::TableData; - sumaStart->subscriberData = evnt.m_bufferId & 0xFF; - sumaStart->subscriberRef = m_reference; - - DBUG_RETURN(executeSubscribeEvent(&tSignal, NULL)); -} - -int -NdbDictInterface::executeSubscribeEvent(NdbApiSignal* signal, - LinearSectionPtr ptr[3]) -{ - return dictSignal(signal,NULL,0, - 1 /*use masternode id*/, - 100, - WAIT_CREATE_INDX_REQ /*WAIT_CREATE_EVNT_REQ*/, - -1, - NULL,0); -} - -int -NdbDictionaryImpl::stopSubscribeEvent(NdbEventImpl & ev) -{ - // NdbDictInterface m_receiver; - return m_receiver.stopSubscribeEvent(m_ndb, ev); -} - -int -NdbDictInterface::stopSubscribeEvent(class Ndb & ndb, - NdbEventImpl & evnt) -{ - DBUG_ENTER("NdbDictInterface::stopSubscribeEvent"); - - NdbApiSignal tSignal(m_reference); - // tSignal.theReceiversBlockNumber = SUMA; - tSignal.theReceiversBlockNumber = DBDICT; - tSignal.theVerId_signalNumber = GSN_SUB_STOP_REQ; - tSignal.theLength = SubStopReq::SignalLength; - - SubStopReq * sumaStop = CAST_PTR(SubStopReq, tSignal.getDataPtrSend()); - - sumaStop->subscriptionId = evnt.m_eventId; - sumaStop->subscriptionKey = evnt.m_eventKey; - sumaStop->subscriberData = evnt.m_bufferId & 0xFF; - sumaStop->part = (Uint32) SubscriptionData::TableData; - sumaStop->subscriberRef = m_reference; - - DBUG_RETURN(stopSubscribeEvent(&tSignal, NULL)); -} - -int -NdbDictInterface::stopSubscribeEvent(NdbApiSignal* signal, - LinearSectionPtr ptr[3]) -{ - return dictSignal(signal,NULL,0, - 1 /*use masternode id*/, - 100, - WAIT_CREATE_INDX_REQ /*WAIT_SUB_STOP__REQ*/, - -1, - NULL,0); -} - -NdbEventImpl * -NdbDictionaryImpl::getEvent(const char * eventName) -{ - NdbEventImpl *ev = new NdbEventImpl(); - - if (ev == NULL) { - return NULL; - } - - ev->setName(eventName); - - int ret = m_receiver.createEvent(m_ndb, *ev, 1 /* getFlag set */); - - if (ret) { - delete ev; - return NULL; - } - - // We only have the table name with internal name - ev->setTable(m_ndb.externalizeTableName(ev->getTableName())); - ev->m_tableImpl = getTable(ev->getTableName()); - - // get the columns from the attrListBitmask - - NdbTableImpl &table = *ev->m_tableImpl; - AttributeMask & mask = ev->m_attrListBitmask; - int attributeList_sz = mask.count(); - int id = -1; - -#ifdef EVENT_DEBUG - ndbout_c("NdbDictionaryImpl::getEvent attributeList_sz = %d", - attributeList_sz); - char buf[128] = {0}; - mask.getText(buf); - ndbout_c("mask = %s", buf); -#endif - - for(int i = 0; i < attributeList_sz; i++) { - id++; while (!mask.get(id)) id++; - - const NdbColumnImpl* col = table.getColumn(id); - if(col == 0) { -#ifdef EVENT_DEBUG - ndbout_c("NdbDictionaryImpl::getEvent could not find column id %d", id); -#endif - m_error.code= 4247; - delete ev; - return NULL; - } - NdbColumnImpl* new_col = new NdbColumnImpl; - // Copy column definition - *new_col = *col; - - ev->m_columns.push_back(new_col); - } - - return ev; -} - -void -NdbDictInterface::execCREATE_EVNT_CONF(NdbApiSignal * signal, - LinearSectionPtr ptr[3]) -{ - DBUG_ENTER("NdbDictInterface::execCREATE_EVNT_CONF"); - - m_buffer.clear(); - unsigned int len = signal->getLength() << 2; - m_buffer.append((char *)&len, sizeof(len)); - m_buffer.append(signal->getDataPtr(), len); - - if (signal->m_noOfSections > 0) { - m_buffer.append((char *)ptr[0].p, strlen((char *)ptr[0].p)+1); - } - - const CreateEvntConf * const createEvntConf= - CAST_CONSTPTR(CreateEvntConf, signal->getDataPtr()); - - Uint32 subscriptionId = createEvntConf->getEventId(); - Uint32 subscriptionKey = createEvntConf->getEventKey(); - - DBUG_PRINT("info",("subscriptionId=%d,subscriptionKey=%d", - subscriptionId,subscriptionKey)); - m_waiter.signal(NO_WAIT); - DBUG_VOID_RETURN; -} - -void -NdbDictInterface::execCREATE_EVNT_REF(NdbApiSignal * signal, - LinearSectionPtr ptr[3]) -{ - DBUG_ENTER("NdbDictInterface::execCREATE_EVNT_REF"); - - const CreateEvntRef* const ref= - CAST_CONSTPTR(CreateEvntRef, signal->getDataPtr()); - m_error.code= ref->getErrorCode(); - DBUG_PRINT("error",("error=%d,line=%d,node=%d",ref->getErrorCode(), - ref->getErrorLine(),ref->getErrorNode())); - m_waiter.signal(NO_WAIT); - DBUG_VOID_RETURN; -} - -void -NdbDictInterface::execSUB_STOP_CONF(NdbApiSignal * signal, - LinearSectionPtr ptr[3]) -{ - DBUG_ENTER("NdbDictInterface::execSUB_STOP_CONF"); - const SubStopConf * const subStopConf= - CAST_CONSTPTR(SubStopConf, signal->getDataPtr()); - - Uint32 subscriptionId = subStopConf->subscriptionId; - Uint32 subscriptionKey = subStopConf->subscriptionKey; - Uint32 subscriberData = subStopConf->subscriberData; - - DBUG_PRINT("info",("subscriptionId=%d,subscriptionKey=%d,subscriberData=%d", - subscriptionId,subscriptionKey,subscriberData)); - m_waiter.signal(NO_WAIT); - DBUG_VOID_RETURN; -} - -void -NdbDictInterface::execSUB_STOP_REF(NdbApiSignal * signal, - LinearSectionPtr ptr[3]) -{ - DBUG_ENTER("NdbDictInterface::execSUB_STOP_REF"); - const SubStopRef * const subStopRef= - CAST_CONSTPTR(SubStopRef, signal->getDataPtr()); - - Uint32 subscriptionId = subStopRef->subscriptionId; - Uint32 subscriptionKey = subStopRef->subscriptionKey; - Uint32 subscriberData = subStopRef->subscriberData; - m_error.code= subStopRef->errorCode; - - DBUG_PRINT("error",("subscriptionId=%d,subscriptionKey=%d,subscriberData=%d,error=%d", - subscriptionId,subscriptionKey,subscriberData,m_error.code)); - m_waiter.signal(NO_WAIT); - DBUG_VOID_RETURN; -} - -void -NdbDictInterface::execSUB_START_CONF(NdbApiSignal * signal, - LinearSectionPtr ptr[3]) -{ - DBUG_ENTER("NdbDictInterface::execSUB_START_CONF"); - const SubStartConf * const subStartConf= - CAST_CONSTPTR(SubStartConf, signal->getDataPtr()); - - Uint32 subscriptionId = subStartConf->subscriptionId; - Uint32 subscriptionKey = subStartConf->subscriptionKey; - SubscriptionData::Part part = - (SubscriptionData::Part)subStartConf->part; - Uint32 subscriberData = subStartConf->subscriberData; - - switch(part) { - case SubscriptionData::MetaData: { - DBUG_PRINT("error",("SubscriptionData::MetaData")); - m_error.code= 1; - break; - } - case SubscriptionData::TableData: { - DBUG_PRINT("info",("SubscriptionData::TableData")); - break; - } - default: { - DBUG_PRINT("error",("wrong data")); - m_error.code= 2; - break; - } - } - DBUG_PRINT("info",("subscriptionId=%d,subscriptionKey=%d,subscriberData=%d", - subscriptionId,subscriptionKey,subscriberData)); - m_waiter.signal(NO_WAIT); - DBUG_VOID_RETURN; -} - -void -NdbDictInterface::execSUB_START_REF(NdbApiSignal * signal, - LinearSectionPtr ptr[3]) -{ - DBUG_ENTER("NdbDictInterface::execSUB_START_REF"); - const SubStartRef * const subStartRef= - CAST_CONSTPTR(SubStartRef, signal->getDataPtr()); - m_error.code= subStartRef->errorCode; - m_waiter.signal(NO_WAIT); - DBUG_VOID_RETURN; -} -void -NdbDictInterface::execSUB_GCP_COMPLETE_REP(NdbApiSignal * signal, - LinearSectionPtr ptr[3]) -{ - const SubGcpCompleteRep * const rep= - CAST_CONSTPTR(SubGcpCompleteRep, signal->getDataPtr()); - - const Uint32 gci = rep->gci; - // const Uint32 senderRef = rep->senderRef; - const Uint32 subscriberData = rep->subscriberData; - - const Uint32 bufferId = subscriberData; - - const Uint32 ref = signal->theSendersBlockRef; - - NdbApiSignal tSignal(m_reference); - SubGcpCompleteAcc * acc= - CAST_PTR(SubGcpCompleteAcc, tSignal.getDataPtrSend()); - - acc->rep = *rep; - - tSignal.theReceiversBlockNumber = refToBlock(ref); - tSignal.theVerId_signalNumber = GSN_SUB_GCP_COMPLETE_ACC; - tSignal.theLength = SubGcpCompleteAcc::SignalLength; - - Uint32 aNodeId = refToNode(ref); - - // m_transporter->lock_mutex(); - int r; - r = m_transporter->sendSignal(&tSignal, aNodeId); - // m_transporter->unlock_mutex(); - - NdbGlobalEventBufferHandle::latestGCI(bufferId, gci); -} - -void -NdbDictInterface::execSUB_TABLE_DATA(NdbApiSignal * signal, - LinearSectionPtr ptr[3]) -{ -#ifdef EVENT_DEBUG - const char * FNAME = "NdbDictInterface::execSUB_TABLE_DATA"; -#endif - //TODO - const SubTableData * const sdata = CAST_CONSTPTR(SubTableData, signal->getDataPtr()); - - // const Uint32 gci = sdata->gci; - // const Uint32 operation = sdata->operation; - // const Uint32 tableId = sdata->tableId; - // const Uint32 noOfAttrs = sdata->noOfAttributes; - // const Uint32 dataLen = sdata->dataSize; - const Uint32 subscriberData = sdata->subscriberData; - // const Uint32 logType = sdata->logType; - - for (int i=signal->m_noOfSections;i < 3; i++) { - ptr[i].p = NULL; - ptr[i].sz = 0; - } -#ifdef EVENT_DEBUG - ndbout_c("%s: senderData %d, gci %d, operation %d, tableId %d, noOfAttrs %d, dataLen %d", - FNAME, subscriberData, gci, operation, tableId, noOfAttrs, dataLen); - ndbout_c("ptr[0] %u %u ptr[1] %u %u ptr[2] %u %u\n", - ptr[0].p,ptr[0].sz,ptr[1].p,ptr[1].sz,ptr[2].p,ptr[2].sz); -#endif - const Uint32 bufferId = subscriberData; - - NdbGlobalEventBufferHandle::insertDataL(bufferId, - sdata, ptr); -} - -/***************************************************************** - * Drop event - */ -int -NdbDictionaryImpl::dropEvent(const char * eventName) -{ - NdbEventImpl *ev= new NdbEventImpl(); - ev->setName(eventName); - int ret= m_receiver.dropEvent(*ev); - delete ev; - - // printf("__________________RET %u\n", ret); - return ret; -} - -int -NdbDictInterface::dropEvent(const NdbEventImpl &evnt) -{ - NdbApiSignal tSignal(m_reference); - tSignal.theReceiversBlockNumber = DBDICT; - tSignal.theVerId_signalNumber = GSN_DROP_EVNT_REQ; - tSignal.theLength = DropEvntReq::SignalLength; - - DropEvntReq * const req = CAST_PTR(DropEvntReq, tSignal.getDataPtrSend()); - - req->setUserRef(m_reference); - req->setUserData(0); - - UtilBufferWriter w(m_buffer); - - w.add(SimpleProperties::StringValue, evnt.m_externalName.c_str()); - - LinearSectionPtr ptr[1]; - ptr[0].p = (Uint32*)m_buffer.get_data(); - ptr[0].sz = (m_buffer.length()+3) >> 2; - - return dropEvent(&tSignal, ptr, 1); -} - -int -NdbDictInterface::dropEvent(NdbApiSignal* signal, - LinearSectionPtr ptr[3], int noLSP) -{ - //TODO - const int noErrCodes = 1; - int errCodes[noErrCodes] = {DropEvntRef::Busy}; - return dictSignal(signal,ptr,noLSP, - 1 /*use masternode id*/, - 100, - WAIT_CREATE_INDX_REQ /*WAIT_CREATE_EVNT_REQ*/, - -1, - errCodes,noErrCodes, DropEvntRef::Temporary); -} -void -NdbDictInterface::execDROP_EVNT_CONF(NdbApiSignal * signal, - LinearSectionPtr ptr[3]) -{ - DBUG_ENTER("NdbDictInterface::execDROP_EVNT_CONF"); - m_waiter.signal(NO_WAIT); - DBUG_VOID_RETURN; -} - -void -NdbDictInterface::execDROP_EVNT_REF(NdbApiSignal * signal, - LinearSectionPtr ptr[3]) -{ - DBUG_ENTER("NdbDictInterface::execDROP_EVNT_REF"); - const DropEvntRef* const ref= - CAST_CONSTPTR(DropEvntRef, signal->getDataPtr()); - m_error.code= ref->getErrorCode(); - - DBUG_PRINT("info",("ErrorCode=%u Errorline=%u ErrorNode=%u", - ref->getErrorCode(), ref->getErrorLine(), ref->getErrorNode())); - - m_waiter.signal(NO_WAIT); - DBUG_VOID_RETURN; -} - -/***************************************************************** * List objects or indexes */ int diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.hpp b/ndb/src/ndbapi/NdbDictionaryImpl.hpp index dfccf120228..6a86ee44bfb 100644 --- a/ndb/src/ndbapi/NdbDictionaryImpl.hpp +++ b/ndb/src/ndbapi/NdbDictionaryImpl.hpp @@ -208,55 +208,6 @@ public: NdbDictionary::Index * m_facade; }; -class NdbEventImpl : public NdbDictionary::Event, public NdbDictObjectImpl { -public: - NdbEventImpl(); - NdbEventImpl(NdbDictionary::Event &); - ~NdbEventImpl(); - - void init(); - void setName(const char * name); - const char * getName() const; - void setTable(const NdbDictionary::Table& table); - void setTable(const char * table); - const char * getTableName() const; - void addTableEvent(const NdbDictionary::Event::TableEvent t); - void setDurability(NdbDictionary::Event::EventDurability d); - NdbDictionary::Event::EventDurability getDurability() const; - void addEventColumn(const NdbColumnImpl &c); - int getNoOfEventColumns() const; - - void print() { - ndbout_c("NdbEventImpl: id=%d, key=%d", - m_eventId, - m_eventKey); - }; - - Uint32 m_eventId; - Uint32 m_eventKey; - Uint32 m_tableId; - AttributeMask m_attrListBitmask; - //BaseString m_internalName; - BaseString m_externalName; - Uint32 mi_type; - NdbDictionary::Event::EventDurability m_dur; - - - NdbTableImpl *m_tableImpl; - BaseString m_tableName; - Vector<NdbColumnImpl *> m_columns; - Vector<unsigned> m_attrIds; - - int m_bufferId; - - NdbEventOperation *eventOp; - - static NdbEventImpl & getImpl(NdbDictionary::Event & t); - static NdbEventImpl & getImpl(const NdbDictionary::Event & t); - NdbDictionary::Event * m_facade; -}; - - class NdbDictInterface { public: NdbDictInterface(NdbError& err) : m_error(err) { @@ -294,24 +245,12 @@ public: const NdbTableImpl &); int createIndex(NdbApiSignal* signal, LinearSectionPtr ptr[3]); - int createEvent(class Ndb & ndb, NdbEventImpl &, int getFlag); - int createEvent(NdbApiSignal* signal, LinearSectionPtr ptr[3], int noLSP); - int dropTable(const NdbTableImpl &); int dropTable(NdbApiSignal* signal, LinearSectionPtr ptr[3]); int dropIndex(const NdbIndexImpl &, const NdbTableImpl &); int dropIndex(NdbApiSignal* signal, LinearSectionPtr ptr[3]); - int dropEvent(const NdbEventImpl &); - int dropEvent(NdbApiSignal* signal, LinearSectionPtr ptr[3], int noLSP); - - int executeSubscribeEvent(class Ndb & ndb, NdbEventImpl &); - int executeSubscribeEvent(NdbApiSignal* signal, LinearSectionPtr ptr[3]); - - int stopSubscribeEvent(class Ndb & ndb, NdbEventImpl &); - int stopSubscribeEvent(NdbApiSignal* signal, LinearSectionPtr ptr[3]); - int listObjects(NdbDictionary::Dictionary::List& list, Uint32 requestData, bool fullyQualifiedNames); int listObjects(NdbApiSignal* signal); @@ -357,17 +296,6 @@ private: void execDROP_INDX_REF(NdbApiSignal *, LinearSectionPtr ptr[3]); void execDROP_INDX_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]); - void execCREATE_EVNT_REF(NdbApiSignal *, LinearSectionPtr ptr[3]); - void execCREATE_EVNT_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]); - void execSUB_START_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]); - void execSUB_START_REF(NdbApiSignal *, LinearSectionPtr ptr[3]); - void execSUB_TABLE_DATA(NdbApiSignal *, LinearSectionPtr ptr[3]); - void execSUB_GCP_COMPLETE_REP(NdbApiSignal *, LinearSectionPtr ptr[3]); - void execSUB_STOP_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]); - void execSUB_STOP_REF(NdbApiSignal *, LinearSectionPtr ptr[3]); - void execDROP_EVNT_REF(NdbApiSignal *, LinearSectionPtr ptr[3]); - void execDROP_EVNT_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]); - void execDROP_TABLE_REF(NdbApiSignal *, LinearSectionPtr ptr[3]); void execDROP_TABLE_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]); void execLIST_TABLES_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]); @@ -402,12 +330,6 @@ public: NdbTableImpl * getIndexTable(NdbIndexImpl * index, NdbTableImpl * table); - int createEvent(NdbEventImpl &); - int dropEvent(const char * eventName); - - int executeSubscribeEvent(NdbEventImpl &); - int stopSubscribeEvent(NdbEventImpl &); - int listObjects(List& list, NdbDictionary::Object::Type type); int listIndexes(List& list, Uint32 indexId); @@ -418,8 +340,6 @@ public: const char * tableName); NdbIndexImpl * getIndex(const char * indexName, NdbTableImpl * table); - NdbEventImpl * getEvent(const char * eventName); - NdbEventImpl * getEventImpl(const char * internalName); const NdbError & getNdbError() const; NdbError m_error; @@ -441,18 +361,6 @@ private: }; inline -NdbEventImpl & -NdbEventImpl::getImpl(const NdbDictionary::Event & t){ - return t.m_impl; -} - -inline -NdbEventImpl & -NdbEventImpl::getImpl(NdbDictionary::Event & t){ - return t.m_impl; -} - -inline NdbColumnImpl & NdbColumnImpl::getImpl(NdbDictionary::Column & t){ return t.m_impl; diff --git a/ndb/src/ndbapi/Ndberr.cpp b/ndb/src/ndbapi/Ndberr.cpp index b05818de6f1..ad0b4eafcb4 100644 --- a/ndb/src/ndbapi/Ndberr.cpp +++ b/ndb/src/ndbapi/Ndberr.cpp @@ -21,7 +21,6 @@ #include <NdbOperation.hpp> #include <NdbTransaction.hpp> #include <NdbBlob.hpp> -#include "NdbEventOperationImpl.hpp" static void update(const NdbError & _err){ @@ -73,10 +72,3 @@ NdbBlob::getNdbError() const { update(theError); return theError; } - -const -NdbError & -NdbEventOperationImpl::getNdbError() const { - update(m_error); - return m_error; -} diff --git a/ndb/src/ndbapi/Ndbif.cpp b/ndb/src/ndbapi/Ndbif.cpp index 4af070638d4..6aaf44d0168 100644 --- a/ndb/src/ndbapi/Ndbif.cpp +++ b/ndb/src/ndbapi/Ndbif.cpp @@ -661,29 +661,11 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) case GSN_CREATE_INDX_REF: case GSN_DROP_INDX_CONF: case GSN_DROP_INDX_REF: - case GSN_CREATE_EVNT_CONF: - case GSN_CREATE_EVNT_REF: - case GSN_DROP_EVNT_CONF: - case GSN_DROP_EVNT_REF: case GSN_LIST_TABLES_CONF: NdbDictInterface::execSignal(&theDictionary->m_receiver, aSignal, ptr); break; - case GSN_SUB_META_DATA: - case GSN_SUB_REMOVE_CONF: - case GSN_SUB_REMOVE_REF: - break; // ignore these signals - case GSN_SUB_GCP_COMPLETE_REP: - case GSN_SUB_START_CONF: - case GSN_SUB_START_REF: - case GSN_SUB_TABLE_DATA: - case GSN_SUB_STOP_CONF: - case GSN_SUB_STOP_REF: - NdbDictInterface::execSignal(&theDictionary->m_receiver, - aSignal, ptr); - break; - case GSN_DIHNDBTAMPER: { tFirstDataPtr = int2void(tFirstData); diff --git a/ndb/src/ndbapi/Ndbinit.cpp b/ndb/src/ndbapi/Ndbinit.cpp index d5ad7066273..40cac675b21 100644 --- a/ndb/src/ndbapi/Ndbinit.cpp +++ b/ndb/src/ndbapi/Ndbinit.cpp @@ -34,10 +34,6 @@ #include "NdbUtil.hpp" #include <NdbBlob.hpp> -class NdbGlobalEventBufferHandle; -NdbGlobalEventBufferHandle *NdbGlobalEventBuffer_init(int); -void NdbGlobalEventBuffer_drop(NdbGlobalEventBufferHandle *); - Ndb::Ndb( Ndb_cluster_connection *ndb_cluster_connection, const char* aDataBase , const char* aSchema) : theImpl(NULL) @@ -107,16 +103,6 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection, if (theInitState == NotConstructed) theInitState = NotInitialised; - { - NdbGlobalEventBufferHandle *h= - NdbGlobalEventBuffer_init(NDB_MAX_ACTIVE_EVENTS); - if (h == NULL) { - ndbout_c("Failed NdbGlobalEventBuffer_init(%d)",NDB_MAX_ACTIVE_EVENTS); - exit(-1); - } - theGlobalEventBufferHandle = h; - } - DBUG_VOID_RETURN; } @@ -132,8 +118,6 @@ Ndb::~Ndb() DBUG_PRINT("enter",("Ndb::~Ndb this=0x%x",this)); doDisconnect(); - NdbGlobalEventBuffer_drop(theGlobalEventBufferHandle); - if (TransporterFacade::instance() != NULL && theNdbBlockNumber > 0){ TransporterFacade::instance()->close(theNdbBlockNumber, theFirstTransId); } diff --git a/ndb/test/include/HugoTransactions.hpp b/ndb/test/include/HugoTransactions.hpp index 5795bbc94c9..7a15a2f977d 100644 --- a/ndb/test/include/HugoTransactions.hpp +++ b/ndb/test/include/HugoTransactions.hpp @@ -28,9 +28,6 @@ public: HugoTransactions(const NdbDictionary::Table&, const NdbDictionary::Index* idx = 0); ~HugoTransactions(); - int createEvent(Ndb*); - int eventOperation(Ndb*, void* stats, - int records); int loadTable(Ndb*, int records, int batch = 512, diff --git a/ndb/test/ndbapi/Makefile.am b/ndb/test/ndbapi/Makefile.am index 7dfa239cb66..19d3c4902a8 100644 --- a/ndb/test/ndbapi/Makefile.am +++ b/ndb/test/ndbapi/Makefile.am @@ -31,13 +31,11 @@ testSystemRestart \ testTimeout \ testTransactions \ testDeadlock \ -test_event ndbapi_slow_select testReadPerf testLcp \ +ndbapi_slow_select testReadPerf testLcp \ testPartitioning \ testBitfield \ DbCreate DbAsyncGenerator \ -test_event_multi_table \ -testSRBank \ -test_event_merge +testSRBank #flexTimedAsynch #testBlobs diff --git a/ndb/test/src/HugoTransactions.cpp b/ndb/test/src/HugoTransactions.cpp index 3260b921985..7616c93c9e3 100644 --- a/ndb/test/src/HugoTransactions.cpp +++ b/ndb/test/src/HugoTransactions.cpp @@ -768,285 +768,6 @@ HugoTransactions::fillTable(Ndb* pNdb, } int -HugoTransactions::createEvent(Ndb* pNdb){ - - char eventName[1024]; - sprintf(eventName,"%s_EVENT",tab.getName()); - - NdbDictionary::Dictionary *myDict = pNdb->getDictionary(); - - if (!myDict) { - g_err << "Dictionary not found " - << pNdb->getNdbError().code << " " - << pNdb->getNdbError().message << endl; - return NDBT_FAILED; - } - - NdbDictionary::Event myEvent(eventName); - myEvent.setTable(tab.getName()); - myEvent.addTableEvent(NdbDictionary::Event::TE_ALL); - // myEvent.addTableEvent(NdbDictionary::Event::TE_INSERT); - // myEvent.addTableEvent(NdbDictionary::Event::TE_UPDATE); - // myEvent.addTableEvent(NdbDictionary::Event::TE_DELETE); - - // const NdbDictionary::Table *_table = myDict->getTable(tab.getName()); - for(int a = 0; a < tab.getNoOfColumns(); a++){ - // myEvent.addEventColumn(_table->getColumn(a)->getName()); - myEvent.addEventColumn(a); - } - - int res = myDict->createEvent(myEvent); // Add event to database - - if (res == 0) - myEvent.print(); - else if (myDict->getNdbError().classification == - NdbError::SchemaObjectExists) - { - g_info << "Event creation failed event exists\n"; - res = myDict->dropEvent(eventName); - if (res) { - g_err << "Failed to drop event: " - << myDict->getNdbError().code << " : " - << myDict->getNdbError().message << endl; - return NDBT_FAILED; - } - // try again - res = myDict->createEvent(myEvent); // Add event to database - if (res) { - g_err << "Failed to create event (1): " - << myDict->getNdbError().code << " : " - << myDict->getNdbError().message << endl; - return NDBT_FAILED; - } - } - else - { - g_err << "Failed to create event (2): " - << myDict->getNdbError().code << " : " - << myDict->getNdbError().message << endl; - return NDBT_FAILED; - } - - return NDBT_OK; -} - -#include <NdbEventOperation.hpp> -#include "TestNdbEventOperation.hpp" -#include <NdbAutoPtr.hpp> - -struct receivedEvent { - Uint32 pk; - Uint32 count; - Uint32 event; -}; - -int XXXXX = 0; - -int -HugoTransactions::eventOperation(Ndb* pNdb, void* pstats, - int records) { - int myXXXXX = XXXXX++; - Uint32 i; - const char function[] = "HugoTransactions::eventOperation: "; - struct receivedEvent* recInsertEvent; - NdbAutoObjArrayPtr<struct receivedEvent> - p00( recInsertEvent = new struct receivedEvent[3*records] ); - struct receivedEvent* recUpdateEvent = &recInsertEvent[records]; - struct receivedEvent* recDeleteEvent = &recInsertEvent[2*records]; - - EventOperationStats &stats = *(EventOperationStats*)pstats; - - stats.n_inserts = 0; - stats.n_deletes = 0; - stats.n_updates = 0; - stats.n_consecutive = 0; - stats.n_duplicates = 0; - stats.n_inconsistent_gcis = 0; - - for (i = 0; i < records; i++) { - recInsertEvent[i].pk = 0xFFFFFFFF; - recInsertEvent[i].count = 0; - recInsertEvent[i].event = 0xFFFFFFFF; - - recUpdateEvent[i].pk = 0xFFFFFFFF; - recUpdateEvent[i].count = 0; - recUpdateEvent[i].event = 0xFFFFFFFF; - - recDeleteEvent[i].pk = 0xFFFFFFFF; - recDeleteEvent[i].count = 0; - recDeleteEvent[i].event = 0xFFFFFFFF; - } - - NdbDictionary::Dictionary *myDict = pNdb->getDictionary(); - - if (!myDict) { - g_err << function << "Event Creation failedDictionary not found\n"; - return NDBT_FAILED; - } - - int r = 0; - NdbEventOperation *pOp; - - char eventName[1024]; - sprintf(eventName,"%s_EVENT",tab.getName()); - int noEventColumnName = tab.getNoOfColumns(); - - g_info << function << "create EventOperation\n"; - pOp = pNdb->createEventOperation(eventName, 100); - if ( pOp == NULL ) { - g_err << function << "Event operation creation failed\n"; - return NDBT_FAILED; - } - - g_info << function << "get values\n"; - NdbRecAttr* recAttr[1024]; - NdbRecAttr* recAttrPre[1024]; - - const NdbDictionary::Table *_table = myDict->getTable(tab.getName()); - - for (int a = 0; a < noEventColumnName; a++) { - recAttr[a] = pOp->getValue(_table->getColumn(a)->getName()); - recAttrPre[a] = pOp->getPreValue(_table->getColumn(a)->getName()); - } - - // set up the callbacks - g_info << function << "execute\n"; - if (pOp->execute()) { // This starts changes to "start flowing" - g_err << function << "operation execution failed: \n"; - g_err << pOp->getNdbError().code << " " - << pOp->getNdbError().message << endl; - return NDBT_FAILED; - } - - g_info << function << "ok\n"; - - int count = 0; - Uint32 last_inconsitant_gci = 0xEFFFFFF0; - - while (r < records){ - //printf("now waiting for event...\n"); - int res = pNdb->pollEvents(1000); // wait for event or 1000 ms - - if (res > 0) { - //printf("got data! %d\n", r); - int overrun; - while (pOp->next(&overrun) > 0) { - r++; - r += overrun; - count++; - - Uint32 gci = pOp->getGCI(); - Uint32 pk = recAttr[0]->u_32_value(); - - if (!pOp->isConsistent()) { - if (last_inconsitant_gci != gci) { - last_inconsitant_gci = gci; - stats.n_inconsistent_gcis++; - } - g_warning << "A node failure has occured and events might be missing\n"; - } - g_info << function << "GCI " << gci << ": " << count; - struct receivedEvent* recEvent; - switch (pOp->getEventType()) { - case NdbDictionary::Event::TE_INSERT: - stats.n_inserts++; - g_info << " INSERT: "; - recEvent = recInsertEvent; - break; - case NdbDictionary::Event::TE_DELETE: - stats.n_deletes++; - g_info << " DELETE: "; - recEvent = recDeleteEvent; - break; - case NdbDictionary::Event::TE_UPDATE: - stats.n_updates++; - g_info << " UPDATE: "; - recEvent = recUpdateEvent; - break; - case NdbDictionary::Event::TE_ALL: - abort(); - } - - if ((int)pk < records) { - recEvent[pk].pk = pk; - recEvent[pk].count++; - } - - g_info << "overrun " << overrun << " pk " << pk; - for (i = 1; i < noEventColumnName; i++) { - if (recAttr[i]->isNULL() >= 0) { // we have a value - g_info << " post[" << i << "]="; - if (recAttr[i]->isNULL() == 0) // we have a non-null value - g_info << recAttr[i]->u_32_value(); - else // we have a null value - g_info << "NULL"; - } - if (recAttrPre[i]->isNULL() >= 0) { // we have a value - g_info << " pre[" << i << "]="; - if (recAttrPre[i]->isNULL() == 0) // we have a non-null value - g_info << recAttrPre[i]->u_32_value(); - else // we have a null value - g_info << "NULL"; - } - } - g_info << endl; - } - } else - ;//printf("timed out\n"); - } - - // sleep ((XXXXX-myXXXXX)*2); - - g_info << myXXXXX << "dropping event operation" << endl; - - int res = pNdb->dropEventOperation(pOp); - if (res != 0) { - g_err << "operation execution failed\n"; - return NDBT_FAILED; - } - - g_info << myXXXXX << " ok" << endl; - - if (stats.n_inserts > 0) { - stats.n_consecutive++; - } - if (stats.n_deletes > 0) { - stats.n_consecutive++; - } - if (stats.n_updates > 0) { - stats.n_consecutive++; - } - for (i = 0; i < (Uint32)records/3; i++) { - if (recInsertEvent[i].pk != i) { - stats.n_consecutive ++; - ndbout << "missing insert pk " << i << endl; - } else if (recInsertEvent[i].count > 1) { - ndbout << "duplicates insert pk " << i - << " count " << recInsertEvent[i].count << endl; - stats.n_duplicates += recInsertEvent[i].count-1; - } - if (recUpdateEvent[i].pk != i) { - stats.n_consecutive ++; - ndbout << "missing update pk " << i << endl; - } else if (recUpdateEvent[i].count > 1) { - ndbout << "duplicates update pk " << i - << " count " << recUpdateEvent[i].count << endl; - stats.n_duplicates += recUpdateEvent[i].count-1; - } - if (recDeleteEvent[i].pk != i) { - stats.n_consecutive ++; - ndbout << "missing delete pk " << i << endl; - } else if (recDeleteEvent[i].count > 1) { - ndbout << "duplicates delete pk " << i - << " count " << recDeleteEvent[i].count << endl; - stats.n_duplicates += recDeleteEvent[i].count-1; - } - } - - return NDBT_OK; -} - -int HugoTransactions::pkReadRecords(Ndb* pNdb, int records, int batch, |