summaryrefslogtreecommitdiff
path: root/storage/ndb/src/kernel/blocks/suma/Suma.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/ndb/src/kernel/blocks/suma/Suma.cpp')
-rw-r--r--storage/ndb/src/kernel/blocks/suma/Suma.cpp4073
1 files changed, 4073 insertions, 0 deletions
diff --git a/storage/ndb/src/kernel/blocks/suma/Suma.cpp b/storage/ndb/src/kernel/blocks/suma/Suma.cpp
new file mode 100644
index 00000000000..ed54505b729
--- /dev/null
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp
@@ -0,0 +1,4073 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include "Suma.hpp"
+
+#include <ndb_version.h>
+
+#include <NdbTCP.h>
+#include <Bitmask.hpp>
+#include <SimpleProperties.hpp>
+
+#include <signaldata/NodeFailRep.hpp>
+#include <signaldata/ReadNodesConf.hpp>
+
+#include <signaldata/ListTables.hpp>
+#include <signaldata/GetTabInfo.hpp>
+#include <signaldata/GetTableId.hpp>
+#include <signaldata/DictTabInfo.hpp>
+#include <signaldata/SumaImpl.hpp>
+#include <signaldata/ScanFrag.hpp>
+#include <signaldata/TransIdAI.hpp>
+#include <signaldata/CreateTrig.hpp>
+#include <signaldata/AlterTrig.hpp>
+#include <signaldata/DropTrig.hpp>
+#include <signaldata/FireTrigOrd.hpp>
+#include <signaldata/TrigAttrInfo.hpp>
+#include <signaldata/CheckNodeGroups.hpp>
+#include <signaldata/GCPSave.hpp>
+#include <GrepError.hpp>
+
+#include <DebuggerNames.hpp>
+
+//#define HANDOVER_DEBUG
+//#define NODEFAIL_DEBUG
+//#define NODEFAIL_DEBUG2
+//#define DEBUG_SUMA_SEQUENCE
+//#define EVENT_DEBUG
+//#define EVENT_PH3_DEBUG
+//#define EVENT_DEBUG2
+#if 0
+#undef DBUG_ENTER
+#undef DBUG_PRINT
+#undef DBUG_RETURN
+#undef DBUG_VOID_RETURN
+
+#define DBUG_ENTER(a) {ndbout_c("%s:%d >%s", __FILE__, __LINE__, a);}
+#define DBUG_PRINT(a,b) {ndbout << __FILE__ << ":" << __LINE__ << " " << a << ": "; ndbout_c b ;}
+#define DBUG_RETURN(a) { ndbout_c("%s:%d <", __FILE__, __LINE__); return(a); }
+#define DBUG_VOID_RETURN { ndbout_c("%s:%d <", __FILE__, __LINE__); return; }
+#endif
+
+/**
+ * @todo:
+ * SUMA crashes if an index is created at the same time as
+ * global replication. Very easy to reproduce using testIndex.
+ * Note: This only happens occasionally, but is quite easy to reprod.
+ */
+
+Uint32 g_subPtrI = RNIL;
+static const Uint32 SUMA_SEQUENCE = 0xBABEBABE;
+
+
+/**************************************************************
+ *
+ * Start of suma
+ *
+ */
+
+#define PRINT_ONLY 0
+static Uint32 g_TypeOfStart = NodeState::ST_ILLEGAL_TYPE;
+
+void
+Suma::getNodeGroupMembers(Signal* signal) {
+ jam();
+ /**
+ * Ask DIH for nodeGroupMembers
+ */
+ CheckNodeGroups * sd = (CheckNodeGroups*)signal->getDataPtrSend();
+ sd->blockRef = reference();
+ sd->requestType =
+ CheckNodeGroups::Direct |
+ CheckNodeGroups::GetNodeGroupMembers;
+ sd->nodeId = getOwnNodeId();
+ EXECUTE_DIRECT(DBDIH, GSN_CHECKNODEGROUPSREQ, signal,
+ CheckNodeGroups::SignalLength);
+ jamEntry();
+
+ c_nodeGroup = sd->output;
+ c_noNodesInGroup = 0;
+ for (int i = 0; i < MAX_NDB_NODES; i++) {
+ if (sd->mask.get(i)) {
+ if (i == getOwnNodeId()) c_idInNodeGroup = c_noNodesInGroup;
+ c_nodesInGroup[c_noNodesInGroup] = i;
+ c_noNodesInGroup++;
+ }
+ }
+
+ // ndbout_c("c_noNodesInGroup=%d", c_noNodesInGroup);
+ ndbrequire(c_noNodesInGroup > 0); // at least 1 node in the nodegroup
+
+#ifdef NODEFAIL_DEBUG
+ for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
+ ndbout_c ("Suma: NodeGroup %u, me %u, me in group %u, member[%u] %u",
+ c_nodeGroup, getOwnNodeId(), c_idInNodeGroup,
+ i, c_nodesInGroup[i]);
+ }
+#endif
+}
+
+void
+Suma::execSTTOR(Signal* signal) {
+ jamEntry();
+
+ DBUG_ENTER("Suma::execSTTOR");
+ const Uint32 startphase = signal->theData[1];
+ const Uint32 typeOfStart = signal->theData[7];
+
+ DBUG_PRINT("info",("startphase = %u, typeOfStart = %u", startphase, typeOfStart));
+
+ if(startphase == 1){
+ jam();
+ c_restartLock = true;
+ }
+
+ if(startphase == 3){
+ jam();
+ g_TypeOfStart = typeOfStart;
+ signal->theData[0] = reference();
+ sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
+
+#if 0
+
+ /**
+ * Debug
+ */
+
+
+ SubscriptionPtr subPtr;
+ Ptr<SyncRecord> syncPtr;
+ ndbrequire(c_subscriptions.seize(subPtr));
+ ndbrequire(c_syncPool.seize(syncPtr));
+
+
+ ndbout_c("Suma: subPtr.i = %d syncPtr.i = %d", subPtr.i, syncPtr.i);
+
+ subPtr.p->m_syncPtrI = syncPtr.i;
+ subPtr.p->m_subscriptionType = SubCreateReq::DatabaseSnapshot;
+ syncPtr.p->m_subscriptionPtrI = subPtr.i;
+ syncPtr.p->ptrI = syncPtr.i;
+ g_subPtrI = subPtr.i;
+ // sendSTTORRY(signal);
+#endif
+ DBUG_VOID_RETURN;
+ }
+
+ if(startphase == 5) {
+ getNodeGroupMembers(signal);
+ if (g_TypeOfStart == NodeState::ST_NODE_RESTART) {
+ jam();
+ for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
+ Uint32 ref = calcSumaBlockRef(c_nodesInGroup[i]);
+ if (ref != reference())
+ sendSignal(ref, GSN_SUMA_START_ME, signal,
+ 1 /*SumaStartMe::SignalLength*/, JBB);
+ }
+ }
+ }
+
+ if(startphase == 7) {
+ c_restartLock = false; // may be set false earlier with HANDOVER_REQ
+
+ if (g_TypeOfStart != NodeState::ST_NODE_RESTART) {
+ for( int i = 0; i < NO_OF_BUCKETS; i++) {
+ if (getResponsibleSumaNodeId(i) == refToNode(reference())) {
+ // I'm running this bucket
+ DBUG_PRINT("info",("bucket %u set to true", i));
+ c_buckets[i].active = true;
+ }
+ }
+ }
+
+ if(g_TypeOfStart == NodeState::ST_INITIAL_START &&
+ c_masterNodeId == getOwnNodeId()) {
+ jam();
+ createSequence(signal);
+ DBUG_VOID_RETURN;
+ }//if
+ }//if
+
+
+ sendSTTORRY(signal);
+
+ DBUG_VOID_RETURN;
+}
+
+void
+Suma::createSequence(Signal* signal)
+{
+ jam();
+ DBUG_ENTER("Suma::createSequence");
+
+ UtilSequenceReq * req = (UtilSequenceReq*)signal->getDataPtrSend();
+
+ req->senderData = RNIL;
+ req->sequenceId = SUMA_SEQUENCE;
+ req->requestType = UtilSequenceReq::Create;
+ sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ,
+ signal, UtilSequenceReq::SignalLength, JBB);
+ // execUTIL_SEQUENCE_CONF will call createSequenceReply()
+ DBUG_VOID_RETURN;
+}
+
+void
+Suma::createSequenceReply(Signal* signal,
+ UtilSequenceConf * conf,
+ UtilSequenceRef * ref)
+{
+ jam();
+
+ if (ref != NULL)
+ ndbrequire(false);
+
+ sendSTTORRY(signal);
+}
+
+void
+Suma::execREAD_NODESCONF(Signal* signal){
+ jamEntry();
+ ReadNodesConf * const conf = (ReadNodesConf *)signal->getDataPtr();
+
+ c_aliveNodes.clear();
+ c_preparingNodes.clear();
+
+ Uint32 count = 0;
+ for(Uint32 i = 0; i < MAX_NDB_NODES; i++){
+ if(NodeBitmask::get(conf->allNodes, i)){
+ jam();
+
+ count++;
+
+ NodePtr node;
+ ndbrequire(c_nodes.seize(node));
+
+ node.p->nodeId = i;
+ if(NodeBitmask::get(conf->inactiveNodes, i)){
+ jam();
+ node.p->alive = 0;
+ } else {
+ jam();
+ node.p->alive = 1;
+ c_aliveNodes.set(i);
+ }
+ } else
+ jam();
+ }
+ c_masterNodeId = conf->masterNodeId;
+ ndbrequire(count == conf->noOfNodes);
+
+ sendSTTORRY(signal);
+}
+
+#if 0
+void
+Suma::execREAD_CONFIG_REQ(Signal* signal)
+{
+ const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
+ Uint32 ref = req->senderRef;
+ Uint32 senderData = req->senderData;
+ ndbrequire(req->noOfParameters == 0);
+
+ jamEntry();
+
+ const ndb_mgm_configuration_iterator * p =
+ theConfiguration.getOwnConfigIterator();
+ ndbrequire(p != 0);
+
+ ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_NO_REDOLOG_FILES,
+ &cnoLogFiles));
+ ndbrequire(cnoLogFiles > 0);
+
+ ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_FRAG, &cfragrecFileSize));
+ ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_TABLE, &ctabrecFileSize));
+ ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_TC_CONNECT,
+ &ctcConnectrecFileSize));
+ clogFileFileSize = 4 * cnoLogFiles;
+ ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_SCAN, &cscanrecFileSize));
+ cmaxAccOps = cscanrecFileSize * MAX_PARALLEL_SCANS_PER_FRAG;
+
+ initRecords();
+ initialiseRecordsLab(signal, 0, ref, senderData);
+
+ return;
+}//Dblqh::execSIZEALT_REP()
+#endif
+
+void
+Suma::sendSTTORRY(Signal* signal){
+ signal->theData[0] = 0;
+ signal->theData[3] = 1;
+ signal->theData[4] = 3;
+ signal->theData[5] = 5;
+ signal->theData[6] = 7;
+ signal->theData[7] = 255; // No more start phases from missra
+ sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 8, JBB);
+}
+
+void
+Suma::execNDB_STTOR(Signal* signal)
+{
+ jamEntry();
+}
+
+void
+Suma::execCONTINUEB(Signal* signal){
+ jamEntry();
+}
+
+void
+SumaParticipant::execCONTINUEB(Signal* signal)
+{
+ jamEntry();
+}
+
+/*****************************************************************************
+ *
+ * Node state handling
+ *
+ *****************************************************************************/
+
+void Suma::execAPI_FAILREQ(Signal* signal)
+{
+ jamEntry();
+ DBUG_ENTER("Suma::execAPI_FAILREQ");
+ Uint32 failedApiNode = signal->theData[0];
+ //BlockReference retRef = signal->theData[1];
+
+ c_failedApiNodes.set(failedApiNode);
+ bool found = removeSubscribersOnNode(signal, failedApiNode);
+
+ if(!found){
+ jam();
+ c_failedApiNodes.clear(failedApiNode);
+ }
+ DBUG_VOID_RETURN;
+}//execAPI_FAILREQ()
+
+bool
+SumaParticipant::removeSubscribersOnNode(Signal *signal, Uint32 nodeId)
+{
+ DBUG_ENTER("SumaParticipant::removeSubscribersOnNode");
+ bool found = false;
+
+ SubscriberPtr i_subbPtr;
+ c_dataSubscribers.first(i_subbPtr);
+ while(!i_subbPtr.isNull()){
+ SubscriberPtr subbPtr = i_subbPtr;
+ c_dataSubscribers.next(i_subbPtr);
+ jam();
+ if (refToNode(subbPtr.p->m_subscriberRef) == nodeId) {
+ jam();
+ c_dataSubscribers.remove(subbPtr);
+ c_removeDataSubscribers.add(subbPtr);
+ found = true;
+ }
+ }
+ if(found){
+ jam();
+ sendSubStopReq(signal);
+ }
+ DBUG_RETURN(found);
+}
+
+void
+SumaParticipant::sendSubStopReq(Signal *signal, bool unlock){
+ DBUG_ENTER("SumaParticipant::sendSubStopReq");
+ static bool remove_lock = false;
+ jam();
+
+ SubscriberPtr subbPtr;
+ c_removeDataSubscribers.first(subbPtr);
+ if (subbPtr.isNull()){
+ jam();
+#if 0
+ signal->theData[0] = failedApiNode;
+ signal->theData[1] = reference();
+ sendSignal(retRef, GSN_API_FAILCONF, signal, 2, JBB);
+#endif
+ c_failedApiNodes.clear();
+
+ remove_lock = false;
+ DBUG_VOID_RETURN;
+ }
+
+ if(remove_lock && !unlock) {
+ jam();
+ DBUG_VOID_RETURN;
+ }
+ remove_lock = true;
+
+ SubscriptionPtr subPtr;
+ c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
+
+ SubStopReq * const req = (SubStopReq*)signal->getDataPtrSend();
+ req->senderRef = reference();
+ req->senderData = subbPtr.i;
+ req->subscriberRef = subbPtr.p->m_subscriberRef;
+ req->subscriberData = subbPtr.p->m_subscriberData;
+ req->subscriptionId = subPtr.p->m_subscriptionId;
+ req->subscriptionKey = subPtr.p->m_subscriptionKey;
+ req->part = SubscriptionData::TableData;
+
+ sendSignal(SUMA_REF, GSN_SUB_STOP_REQ, signal, SubStopReq::SignalLength, JBB);
+ DBUG_VOID_RETURN;
+}
+
+void
+SumaParticipant::execSUB_STOP_CONF(Signal* signal){
+ jamEntry();
+ DBUG_ENTER("SumaParticipant::execSUB_STOP_CONF");
+
+ SubStopConf * const conf = (SubStopConf*)signal->getDataPtr();
+
+ // Uint32 subscriberData = conf->subscriberData;
+ // Uint32 subscriberRef = conf->subscriberRef;
+
+ Subscription key;
+ key.m_subscriptionId = conf->subscriptionId;
+ key.m_subscriptionKey = conf->subscriptionKey;
+
+ SubscriptionPtr subPtr;
+ if(c_subscriptions.find(subPtr, key)) {
+ jam();
+ if (subPtr.p->m_markRemove) {
+ jam();
+ ndbrequire(false);
+ ndbrequire(subPtr.p->m_nSubscribers > 0);
+ subPtr.p->m_nSubscribers--;
+ if (subPtr.p->m_nSubscribers == 0){
+ jam();
+ completeSubRemoveReq(signal, subPtr);
+ }
+ }
+ }
+
+ sendSubStopReq(signal,true);
+ DBUG_VOID_RETURN;
+}
+
+void
+SumaParticipant::execSUB_STOP_REF(Signal* signal){
+ jamEntry();
+ DBUG_ENTER("SumaParticipant::execSUB_STOP_REF");
+
+ SubStopRef * const ref = (SubStopRef*)signal->getDataPtr();
+
+ Uint32 subscriptionId = ref->subscriptionId;
+ Uint32 subscriptionKey = ref->subscriptionKey;
+ Uint32 part = ref->part;
+ Uint32 subscriberData = ref->subscriberData;
+ Uint32 subscriberRef = ref->subscriberRef;
+ // Uint32 err = ref->err;
+
+ if(!ref->isTemporary()){
+ ndbrequire(false);
+ }
+
+ SubStopReq * const req = (SubStopReq*)signal->getDataPtrSend();
+ req->subscriberRef = subscriberRef;
+ req->subscriberData = subscriberData;
+ req->subscriptionId = subscriptionId;
+ req->subscriptionKey = subscriptionKey;
+ req->part = part;
+
+ sendSignal(SUMA_REF, GSN_SUB_STOP_REQ, signal, SubStopReq::SignalLength, JBB);
+
+ DBUG_VOID_RETURN;
+}
+
+void
+Suma::execNODE_FAILREP(Signal* signal){
+ jamEntry();
+ DBUG_ENTER("Suma::execNODE_FAILREP");
+
+ NodeFailRep * const rep = (NodeFailRep*)signal->getDataPtr();
+
+ bool changed = false;
+
+ NodePtr nodePtr;
+#ifdef NODEFAIL_DEBUG
+ ndbout_c("Suma: nodefailrep");
+#endif
+ c_nodeFailGCI = getFirstGCI(signal);
+
+ for(c_nodes.first(nodePtr); nodePtr.i != RNIL; c_nodes.next(nodePtr)){
+ if(NodeBitmask::get(rep->theNodes, nodePtr.p->nodeId)){
+ if(nodePtr.p->alive){
+ ndbassert(c_aliveNodes.get(nodePtr.p->nodeId));
+ changed = true;
+ jam();
+ } else {
+ ndbassert(!c_aliveNodes.get(nodePtr.p->nodeId));
+ jam();
+ }
+
+ if (c_preparingNodes.get(nodePtr.p->nodeId)) {
+ jam();
+ // we are currently preparing this node that died
+ // it's ok just to clear and go back to waiting for it to start up
+ Restart.resetNode(calcSumaBlockRef(nodePtr.p->nodeId));
+ c_preparingNodes.clear(nodePtr.p->nodeId);
+ } else if (c_handoverToDo) {
+ jam();
+ // TODO what if I'm a SUMA that is currently restarting and the SUMA
+ // responsible for restarting me is the one that died?
+
+ // a node has failed whilst handover is going on
+ // let's check if we're in the process of handover with that node
+ c_handoverToDo = false;
+ for( int i = 0; i < NO_OF_BUCKETS; i++) {
+ if (c_buckets[i].handover) {
+ // I'm doing handover, but is it with the dead node?
+ if (getResponsibleSumaNodeId(i) == nodePtr.p->nodeId) {
+ // so it was the dead node, has handover started?
+ if (c_buckets[i].handover_started) {
+ jam();
+ // we're not ok and will have lost data!
+ // set not active to indicate this -
+ // this will generate takeover behaviour
+ c_buckets[i].active = false;
+ c_buckets[i].handover_started = false;
+ } // else we're ok to revert back to state before
+ c_buckets[i].handover = false;
+ } else {
+ jam();
+ // ok, we're doing handover with a different node
+ c_handoverToDo = true;
+ }
+ }
+ }
+ }
+
+ c_failoverBuffer.nodeFailRep();
+
+ nodePtr.p->alive = 0;
+ c_aliveNodes.clear(nodePtr.p->nodeId); // this has to be done after the loop above
+ }
+ }
+ DBUG_VOID_RETURN;
+}
+
+void
+Suma::execINCL_NODEREQ(Signal* signal){
+ jamEntry();
+
+ //const Uint32 senderRef = signal->theData[0];
+ const Uint32 inclNode = signal->theData[1];
+
+ NodePtr node;
+ for(c_nodes.first(node); node.i != RNIL; c_nodes.next(node)){
+ jam();
+ const Uint32 nodeId = node.p->nodeId;
+ if(inclNode == nodeId){
+ jam();
+
+ ndbrequire(node.p->alive == 0);
+ ndbrequire(!c_aliveNodes.get(nodeId));
+
+ for (Uint32 j = 0; j < c_noNodesInGroup; j++) {
+ jam();
+ if (c_nodesInGroup[j] == nodeId) {
+ // the starting node is part of my node group
+ jam();
+ c_preparingNodes.set(nodeId); // set as being prepared
+ for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
+ jam();
+ if (i == c_idInNodeGroup) {
+ jam();
+ // I'm responsible for restarting this SUMA
+ // ALL dict's should have meta data info so it is ok to start
+ Restart.startNode(signal, calcSumaBlockRef(nodeId));
+ break;
+ }//if
+ if (c_aliveNodes.get(c_nodesInGroup[i])) {
+ jam();
+ break; // another Suma takes care of this
+ }//if
+ }//for
+ break;
+ }//if
+ }//for
+
+ node.p->alive = 1;
+ c_aliveNodes.set(nodeId);
+
+ break;
+ }//if
+ }//for
+
+#if 0 // if we include this DIH's got to be prepared, later if needed...
+ signal->theData[0] = reference();
+
+ sendSignal(senderRef, GSN_INCL_NODECONF, signal, 1, JBB);
+#endif
+}
+
+void
+Suma::execSIGNAL_DROPPED_REP(Signal* signal){
+ jamEntry();
+ ndbrequire(false);
+}
+
+/********************************************************************
+ *
+ * Dump state
+ *
+ */
+
+static unsigned
+count_subscribers(const DLList<SumaParticipant::Subscriber> &subs)
+{
+ unsigned n= 0;
+ SumaParticipant::SubscriberPtr i_subbPtr;
+ subs.first(i_subbPtr);
+ while(!i_subbPtr.isNull()){
+ n++;
+ subs.next(i_subbPtr);
+ }
+ return n;
+}
+
+void
+Suma::execDUMP_STATE_ORD(Signal* signal){
+ jamEntry();
+
+ Uint32 tCase = signal->theData[0];
+ if(tCase >= 8000 && tCase <= 8003){
+ SubscriptionPtr subPtr;
+ c_subscriptions.getPtr(subPtr, g_subPtrI);
+
+ Ptr<SyncRecord> syncPtr;
+ c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
+
+ if(tCase == 8000){
+ syncPtr.p->startMeta(signal);
+ }
+
+ if(tCase == 8001){
+ syncPtr.p->startScan(signal);
+ }
+
+ if(tCase == 8002){
+ syncPtr.p->startTrigger(signal);
+ }
+
+ if(tCase == 8003){
+ subPtr.p->m_subscriptionType = SubCreateReq::SingleTableScan;
+ LocalDataBuffer<15> attrs(c_dataBufferPool, syncPtr.p->m_attributeList);
+ Uint32 tab = 0;
+ Uint32 att[] = { 0, 1, 1 };
+ syncPtr.p->m_tableList.append(&tab, 1);
+ attrs.append(att, 3);
+ }
+ }
+
+ if(tCase == 8004){
+ infoEvent("Suma: c_subscriberPool size: %d free: %d",
+ c_subscriberPool.getSize(),
+ c_subscriberPool.getNoOfFree());
+
+ infoEvent("Suma: c_tablePool size: %d free: %d",
+ c_tablePool_.getSize(),
+ c_tablePool_.getNoOfFree());
+
+ infoEvent("Suma: c_subscriptionPool size: %d free: %d",
+ c_subscriptionPool.getSize(),
+ c_subscriptionPool.getNoOfFree());
+
+ infoEvent("Suma: c_syncPool size: %d free: %d",
+ c_syncPool.getSize(),
+ c_syncPool.getNoOfFree());
+
+ infoEvent("Suma: c_dataBufferPool size: %d free: %d",
+ c_dataBufferPool.getSize(),
+ c_dataBufferPool.getNoOfFree());
+
+ infoEvent("Suma: c_metaSubscribers count: %d",
+ count_subscribers(c_metaSubscribers));
+ infoEvent("Suma: c_dataSubscribers count: %d",
+ count_subscribers(c_dataSubscribers));
+ infoEvent("Suma: c_prepDataSubscribers count: %d",
+ count_subscribers(c_prepDataSubscribers));
+ infoEvent("Suma: c_removeDataSubscribers count: %d",
+ count_subscribers(c_removeDataSubscribers));
+ }
+}
+
+/********************************************************************
+ *
+ * Convert a table name (db+schema+tablename) to tableId
+ *
+ */
+
+#if 0
+void
+SumaParticipant::convertNameToId(SubscriptionPtr subPtr, Signal * signal)
+{
+ jam();
+ if(subPtr.p->m_currentTable < subPtr.p->m_maxTables) {
+ jam();
+
+ GetTableIdReq * req = (GetTableIdReq *)signal->getDataPtrSend();
+ char * tableName = subPtr.p->m_tableNames[subPtr.p->m_currentTable];
+ const Uint32 strLen = strlen(tableName) + 1; // NULL Terminated
+ req->senderRef = reference();
+ req->senderData = subPtr.i;
+ req->len = strLen;
+
+ LinearSectionPtr ptr[1];
+ ptr[0].p = (Uint32*)tableName;
+ ptr[0].sz = strLen;
+
+ sendSignal(DBDICT_REF,
+ GSN_GET_TABLEID_REQ,
+ signal,
+ GetTableIdReq::SignalLength,
+ JBB,
+ ptr,
+ 1);
+ } else {
+ jam();
+ sendSubCreateConf(signal, subPtr.p->m_subscriberRef, subPtr);
+ }
+}
+#endif
+
+
+void
+SumaParticipant::addTableId(Uint32 tableId,
+ SubscriptionPtr subPtr, SyncRecord *psyncRec)
+{
+#ifdef NODEFAIL_DEBUG
+ ndbout_c("SumaParticipant::addTableId(%u,%u,%u), current_table=%u",
+ tableId, subPtr.i, psyncRec, subPtr.p->m_currentTable);
+#endif
+ subPtr.p->m_tables[tableId] = 1;
+ subPtr.p->m_currentTable++;
+ if(psyncRec != NULL)
+ psyncRec->m_tableList.append(&tableId, 1);
+}
+
+#if 0
+void
+SumaParticipant::execGET_TABLEID_CONF(Signal * signal)
+{
+ jamEntry();
+
+ GetTableIdConf* conf = (GetTableIdConf *)signal->getDataPtr();
+ Uint32 tableId = conf->tableId;
+ //Uint32 schemaVersion = conf->schemaVersion;
+ Uint32 senderData = conf->senderData;
+
+ SubscriptionPtr subPtr;
+ Ptr<SyncRecord> syncPtr;
+
+ c_subscriptions.getPtr(subPtr, senderData);
+ c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
+
+ /*
+ * add to m_tableList
+ */
+ addTableId(tableId, subPtr, syncPtr.p);
+
+ convertNameToId(subPtr, signal);
+}
+
+void
+SumaParticipant::execGET_TABLEID_REF(Signal * signal)
+{
+ jamEntry();
+ GetTableIdRef const * ref = (GetTableIdRef *)signal->getDataPtr();
+ Uint32 senderData = ref->senderData;
+ // Uint32 err = ref->err;
+
+ SubscriptionPtr subPtr;
+ c_subscriptions.getPtr(subPtr, senderData);
+ Uint32 subData = subPtr.p->m_subscriberData;
+ SubCreateRef * reff = (SubCreateRef*)ref;
+ /**
+ * @todo: map ref->err to GrepError.
+ */
+ reff->err = GrepError::SELECTED_TABLE_NOT_FOUND;
+ reff->subscriberData = subData;
+ sendSignal(subPtr.p->m_subscriberRef,
+ GSN_SUB_CREATE_REF,
+ signal,
+ SubCreateRef::SignalLength,
+ JBB);
+}
+#endif
+
+
+/*************************************************************
+ *
+ * Creation of subscription id's
+ *
+ ************************************************************/
+
+void
+Suma::execCREATE_SUBID_REQ(Signal* signal)
+{
+ jamEntry();
+
+ CRASH_INSERTION(13001);
+
+ CreateSubscriptionIdReq const * req =
+ (CreateSubscriptionIdReq*)signal->getDataPtr();
+ SubscriberPtr subbPtr;
+ if(!c_subscriberPool.seize(subbPtr)){
+ jam();
+ sendSubIdRef(signal, GrepError::SUBSCRIPTION_ID_NOMEM);
+ return;
+ }
+
+ subbPtr.p->m_subscriberRef = signal->getSendersBlockRef();
+ subbPtr.p->m_senderData = req->senderData;
+ subbPtr.p->m_subscriberData = subbPtr.i;
+
+ UtilSequenceReq * utilReq = (UtilSequenceReq*)signal->getDataPtrSend();
+
+ utilReq->senderData = subbPtr.p->m_subscriberData;
+ utilReq->sequenceId = SUMA_SEQUENCE;
+ utilReq->requestType = UtilSequenceReq::NextVal;
+ sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ,
+ signal, UtilSequenceReq::SignalLength, JBB);
+}
+
+void
+Suma::execUTIL_SEQUENCE_CONF(Signal* signal)
+{
+ jamEntry();
+
+ DBUG_ENTER("Suma::execUTIL_SEQUENCE_CONF");
+ CRASH_INSERTION(13002);
+
+ UtilSequenceConf * conf = (UtilSequenceConf*)signal->getDataPtr();
+ if(conf->requestType == UtilSequenceReq::Create) {
+ jam();
+ createSequenceReply(signal, conf, NULL);
+ DBUG_VOID_RETURN;
+ }
+
+ Uint64 subId;
+ memcpy(&subId,conf->sequenceValue,8);
+ Uint32 subData = conf->senderData;
+
+ SubscriberPtr subbPtr;
+ c_subscriberPool.getPtr(subbPtr,subData);
+
+
+ CreateSubscriptionIdConf * subconf = (CreateSubscriptionIdConf*)conf;
+ subconf->subscriptionId = (Uint32)subId;
+ subconf->subscriptionKey =(getOwnNodeId() << 16) | (Uint32)(subId & 0xFFFF);
+ subconf->subscriberData = subbPtr.p->m_senderData;
+
+ sendSignal(subbPtr.p->m_subscriberRef, GSN_CREATE_SUBID_CONF, signal,
+ CreateSubscriptionIdConf::SignalLength, JBB);
+
+ c_subscriberPool.release(subbPtr);
+
+ DBUG_VOID_RETURN;
+}
+
+void
+Suma::execUTIL_SEQUENCE_REF(Signal* signal)
+{
+ jamEntry();
+ DBUG_ENTER("Suma::execUTIL_SEQUENCE_REF");
+ UtilSequenceRef * ref = (UtilSequenceRef*)signal->getDataPtr();
+
+ if(ref->requestType == UtilSequenceReq::Create) {
+ jam();
+ createSequenceReply(signal, NULL, ref);
+ DBUG_VOID_RETURN;
+ }
+
+ Uint32 subData = ref->senderData;
+
+ SubscriberPtr subbPtr;
+ c_subscriberPool.getPtr(subbPtr,subData);
+ sendSubIdRef(signal, GrepError::SEQUENCE_ERROR);
+ c_subscriberPool.release(subbPtr);
+ DBUG_VOID_RETURN;
+}//execUTIL_SEQUENCE_REF()
+
+
+void
+SumaParticipant::sendSubIdRef(Signal* signal, Uint32 errCode){
+ jam();
+ CreateSubscriptionIdRef * ref =
+ (CreateSubscriptionIdRef *)signal->getDataPtrSend();
+
+ ref->err = errCode;
+ sendSignal(signal->getSendersBlockRef(),
+ GSN_CREATE_SUBID_REF,
+ signal,
+ CreateSubscriptionIdRef::SignalLength,
+ JBB);
+
+ releaseSections(signal);
+ return;
+}
+
+/**********************************************************
+ * Suma participant interface
+ *
+ * Creation of subscriptions
+ */
+
+void
+SumaParticipant::execSUB_CREATE_REQ(Signal* signal) {
+#ifdef NODEFAIL_DEBUG
+ ndbout_c("SumaParticipant::execSUB_CREATE_REQ");
+#endif
+ jamEntry();
+
+ CRASH_INSERTION(13003);
+
+ const SubCreateReq req = *(SubCreateReq*)signal->getDataPtr();
+
+ const Uint32 subId = req.subscriptionId;
+ const Uint32 subKey = req.subscriptionKey;
+ const Uint32 subRef = req.subscriberRef;
+ const Uint32 subData = req.subscriberData;
+ const Uint32 type = req.subscriptionType & SubCreateReq::RemoveFlags;
+ const Uint32 flags = req.subscriptionType & SubCreateReq::GetFlags;
+ const bool addTableFlag = (flags & SubCreateReq::AddTableFlag) != 0;
+ const bool restartFlag = (flags & SubCreateReq::RestartFlag) != 0;
+
+ const Uint32 sender = signal->getSendersBlockRef();
+
+ Subscription key;
+ key.m_subscriptionId = subId;
+ key.m_subscriptionKey = subKey;
+
+ SubscriptionPtr subPtr;
+ Ptr<SyncRecord> syncPtr;
+
+ if (addTableFlag) {
+ ndbrequire(restartFlag); //TODO remove this
+
+ if(!c_subscriptions.find(subPtr, key)) {
+ jam();
+ sendSubCreateRef(signal, req, GrepError::SUBSCRIPTION_NOT_FOUND);
+ return;
+ }
+ jam();
+ c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
+ } else {
+ // Check that id/key is unique
+ if(c_subscriptions.find(subPtr, key)) {
+ jam();
+ sendSubCreateRef(signal, req, GrepError::SUBSCRIPTION_ID_NOT_UNIQUE);
+ return;
+ }
+ if(!c_subscriptions.seize(subPtr)) {
+ jam();
+ sendSubCreateRef(signal, req, GrepError::NOSPACE_IN_POOL);
+ return;
+ }
+ if(!c_syncPool.seize(syncPtr)) {
+ jam();
+ sendSubCreateRef(signal, req, GrepError::NOSPACE_IN_POOL);
+ return;
+ }
+ jam();
+ subPtr.p->m_subscriberRef = subRef;
+ subPtr.p->m_subscriberData = subData;
+ subPtr.p->m_subscriptionId = subId;
+ subPtr.p->m_subscriptionKey = subKey;
+ subPtr.p->m_subscriptionType = type;
+
+ /**
+ * ok to memset? Support on all compilers
+ * @todo find out if memset is supported by all compilers
+ */
+ memset(subPtr.p->m_tables,0,MAX_TABLES);
+ subPtr.p->m_maxTables = 0;
+ subPtr.p->m_currentTable = 0;
+ subPtr.p->m_syncPtrI = syncPtr.i;
+ subPtr.p->m_markRemove = false;
+ subPtr.p->m_nSubscribers = 0;
+
+ c_subscriptions.add(subPtr);
+
+ syncPtr.p->m_subscriptionPtrI = subPtr.i;
+ syncPtr.p->m_doSendSyncData = true;
+ syncPtr.p->ptrI = syncPtr.i;
+ syncPtr.p->m_locked = false;
+ syncPtr.p->m_error = false;
+ }
+
+ if (restartFlag ||
+ type == SubCreateReq::TableEvent) {
+
+ syncPtr.p->m_doSendSyncData = false;
+
+ ndbrequire(type != SubCreateReq::SingleTableScan);
+ jam();
+
+ if (subPtr.p->m_tables[req.tableId] != 0) {
+ ndbrequire(false); //TODO remove
+ jam();
+ sendSubCreateRef(signal, req, GrepError::SELECTED_TABLE_ALREADY_ADDED);
+ return;
+ }
+ if (addTableFlag) {
+ ndbrequire(type != SubCreateReq::TableEvent);
+ jam();
+ }
+ subPtr.p->m_maxTables++;
+ addTableId(req.tableId, subPtr, syncPtr.p);
+ } else {
+ switch(type){
+ case SubCreateReq::SingleTableScan:
+ {
+ jam();
+ syncPtr.p->m_tableList.append(&req.tableId, 1);
+ if(signal->getNoOfSections() > 0){
+ SegmentedSectionPtr ptr;
+ signal->getSection(ptr, SubCreateReq::ATTRIBUTE_LIST);
+ LocalDataBuffer<15> attrBuf(c_dataBufferPool,syncPtr.p->m_attributeList);
+ append(attrBuf, ptr, getSectionSegmentPool());
+ }
+ }
+ break;
+#if 0
+ case SubCreateReq::SelectiveTableSnapshot:
+ /**
+ * Tables specified by the user that does not exist
+ * in the database are just ignored. No error message
+ * is given, nor does the db nodes crash
+ * @todo: Memory is not release here (used tableBuf)
+ */
+ {
+ if(signal->getNoOfSections() == 0 ){
+ jam();
+ sendSubCreateRef(signal, req, GrepError::WRONG_NO_OF_SECTIONS);
+ return;
+ }
+
+ jam();
+ SegmentedSectionPtr ptr;
+ signal->getSection(ptr,0);// SubCreateReq::TABLE_LIST);
+ SimplePropertiesSectionReader r0(ptr, getSectionSegmentPool());
+ Uint32 i=0;
+ char table[MAX_TAB_NAME_SIZE];
+ r0.reset();
+ r0.first();
+ while(true){
+ if ((r0.getValueType() != SimpleProperties::StringValue) ||
+ (r0.getValueLen() <= 0)) {
+ releaseSections(signal);
+ ndbrequire(false);
+ }
+ r0.getString(table);
+ strcpy(subPtr.p->m_tableNames[i],table);
+ i++;
+ if(!r0.next())
+ break;
+ }
+ releaseSections(signal);
+ subPtr.p->m_maxTables = i;
+ subPtr.p->m_currentTable = 0;
+ releaseSections(signal);
+ convertNameToId(subPtr, signal);
+ return;
+ }
+ break;
+#endif
+ case SubCreateReq::DatabaseSnapshot:
+ {
+ jam();
+ }
+ break;
+ default:
+ ndbrequire(false);
+ }
+ }
+
+ sendSubCreateConf(signal, sender, subPtr);
+
+ return;
+}
+
+void
+SumaParticipant::sendSubCreateConf(Signal* signal, Uint32 sender,
+ SubscriptionPtr subPtr)
+{
+ SubCreateConf * const conf = (SubCreateConf*)signal->getDataPtrSend();
+ conf->subscriptionId = subPtr.p->m_subscriptionId;
+ conf->subscriptionKey = subPtr.p->m_subscriptionKey;
+ conf->subscriberData = subPtr.p->m_subscriberData;
+ sendSignal(sender, GSN_SUB_CREATE_CONF, signal,
+ SubCreateConf::SignalLength, JBB);
+}
+
+void
+SumaParticipant::sendSubCreateRef(Signal* signal, const SubCreateReq& req, Uint32 errCode){
+ jam();
+ SubCreateRef * ref = (SubCreateRef *)signal->getDataPtrSend();
+ ref->subscriberRef = reference();
+ ref->subscriberData = req.subscriberData;
+ ref->err = errCode;
+ releaseSections(signal);
+ sendSignal(signal->getSendersBlockRef(), GSN_SUB_CREATE_REF, signal,
+ SubCreateRef::SignalLength, JBB);
+ return;
+}
+
+
+
+
+
+
+
+
+
+
+
+
+Uint32
+SumaParticipant::getFirstGCI(Signal* signal) {
+ if (c_lastCompleteGCI == RNIL) {
+ ndbout_c("WARNING: c_lastCompleteGCI == RNIL");
+ return 0;
+ }
+ return c_lastCompleteGCI+3;
+}
+
+/**********************************************************
+ *
+ * Setting upp trigger for subscription
+ *
+ */
+
+void
+SumaParticipant::execSUB_SYNC_REQ(Signal* signal) {
+ jamEntry();
+
+ CRASH_INSERTION(13004);
+#ifdef EVENT_PH3_DEBUG
+ ndbout_c("SumaParticipant::execSUB_SYNC_REQ");
+#endif
+
+ SubSyncReq * const req = (SubSyncReq*)signal->getDataPtr();
+
+ SubscriptionPtr subPtr;
+ Subscription key;
+ key.m_subscriptionId = req->subscriptionId;
+ key.m_subscriptionKey = req->subscriptionKey;
+
+ if(!c_subscriptions.find(subPtr, key)){
+ jam();
+ sendSubSyncRef(signal, GrepError::SUBSCRIPTION_ID_NOT_FOUND);
+ return;
+ }
+
+ /**
+ * @todo Tomas, do you really need to do this?
+ */
+ if(subPtr.p->m_subscriptionType == SubCreateReq::TableEvent) {
+ jam();
+ subPtr.p->m_subscriberData = req->subscriberData;
+ }
+
+ bool ok = false;
+ SubscriptionData::Part part = (SubscriptionData::Part)req->part;
+
+ Ptr<SyncRecord> syncPtr;
+ c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
+ switch(part){
+ case SubscriptionData::MetaData:
+ ok = true;
+ jam();
+ if (subPtr.p->m_subscriptionType == SubCreateReq::DatabaseSnapshot) {
+ TableList::DataBufferIterator it;
+ syncPtr.p->m_tableList.first(it);
+ if(it.isNull()) {
+ /**
+ * Get all tables from dict
+ */
+ ListTablesReq * req = (ListTablesReq*)signal->getDataPtrSend();
+ req->senderRef = reference();
+ req->senderData = syncPtr.i;
+ req->requestData = 0;
+ /**
+ * @todo: accomodate scan of index tables?
+ */
+ req->setTableType(DictTabInfo::UserTable);
+
+ sendSignal(DBDICT_REF, GSN_LIST_TABLES_REQ, signal,
+ ListTablesReq::SignalLength, JBB);
+ break;
+ }
+ }
+
+ syncPtr.p->startMeta(signal);
+ break;
+ case SubscriptionData::TableData: {
+ ok = true;
+ jam();
+ syncPtr.p->startScan(signal);
+ break;
+ }
+ }
+ ndbrequire(ok);
+}
+
+void
+SumaParticipant::sendSubSyncRef(Signal* signal, Uint32 errCode){
+ jam();
+ SubSyncRef * ref =
+ (SubSyncRef *)signal->getDataPtrSend();
+ ref->err = errCode;
+ sendSignal(signal->getSendersBlockRef(),
+ GSN_SUB_SYNC_REF,
+ signal,
+ SubSyncRef::SignalLength,
+ JBB);
+
+ releaseSections(signal);
+ return;
+}
+
+/**********************************************************
+ * Dict interface
+ */
+
+void
+SumaParticipant::execLIST_TABLES_CONF(Signal* signal){
+ jamEntry();
+ CRASH_INSERTION(13005);
+ ListTablesConf* const conf = (ListTablesConf*)signal->getDataPtr();
+ SyncRecord* tmp = c_syncPool.getPtr(conf->senderData);
+ tmp->runLIST_TABLES_CONF(signal);
+}
+
+
+void
+SumaParticipant::execGET_TABINFOREF(Signal* signal){
+ jamEntry();
+ GetTabInfoRef* const ref = (GetTabInfoRef*)signal->getDataPtr();
+ SyncRecord* tmp = c_syncPool.getPtr(ref->senderData);
+ tmp->runGET_TABINFOREF(signal);
+}
+
+void
+SumaParticipant::execGET_TABINFO_CONF(Signal* signal){
+ jamEntry();
+
+ CRASH_INSERTION(13006);
+
+ if(!assembleFragments(signal)){
+ return;
+ }
+
+ GetTabInfoConf* conf = (GetTabInfoConf*)signal->getDataPtr();
+
+ Uint32 tableId = conf->tableId;
+ Uint32 senderData = conf->senderData;
+
+ SyncRecord* tmp = c_syncPool.getPtr(senderData);
+ ndbrequire(parseTable(signal, conf, tableId, tmp));
+ tmp->runGET_TABINFO_CONF(signal);
+}
+
+bool
+SumaParticipant::parseTable(Signal* signal, GetTabInfoConf* conf, Uint32 tableId,
+ SyncRecord* syncPtr_p){
+
+ SegmentedSectionPtr ptr;
+ signal->getSection(ptr, GetTabInfoConf::DICT_TAB_INFO);
+
+ SimplePropertiesSectionReader it(ptr, getSectionSegmentPool());
+
+ SimpleProperties::UnpackStatus s;
+ DictTabInfo::Table tableDesc; tableDesc.init();
+ s = SimpleProperties::unpack(it, &tableDesc,
+ DictTabInfo::TableMapping,
+ DictTabInfo::TableMappingSize,
+ true, true);
+
+ ndbrequire(s == SimpleProperties::Break);
+
+ TablePtr tabPtr;
+ c_tables.find(tabPtr, tableId);
+
+ if(!tabPtr.isNull() &&
+ tabPtr.p->m_schemaVersion != tableDesc.TableVersion){
+ jam();
+
+ tabPtr.p->release(* this);
+
+ // oops wrong schema version in stored tabledesc
+ // we need to find all subscriptions with old table desc
+ // and all subscribers to this
+ // hopefully none
+ c_tables.release(tabPtr);
+ tabPtr.setNull();
+ DLHashTable<SumaParticipant::Subscription>::Iterator i_subPtr;
+ c_subscriptions.first(i_subPtr);
+ SubscriptionPtr subPtr;
+ for(;!i_subPtr.isNull();c_subscriptions.next(i_subPtr)){
+ jam();
+ c_subscriptions.getPtr(subPtr, i_subPtr.curr.i);
+ SyncRecord* tmp = c_syncPool.getPtr(subPtr.p->m_syncPtrI);
+ if (tmp == syncPtr_p) {
+ jam();
+ continue;
+ }
+ if (subPtr.p->m_tables[tableId]) {
+ jam();
+ subPtr.p->m_tables[tableId] = 0; // remove this old table reference
+ TableList::DataBufferIterator it;
+ for(tmp->m_tableList.first(it);!it.isNull();tmp->m_tableList.next(it)) {
+ jam();
+ if (*it.data == tableId){
+ jam();
+ Uint32 *pdata = it.data;
+ tmp->m_tableList.next(it);
+ for(;!it.isNull();tmp->m_tableList.next(it)) {
+ jam();
+ *pdata = *it.data;
+ pdata = it.data;
+ }
+ *pdata = RNIL; // todo remove this last item...
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ if (tabPtr.isNull()) {
+ jam();
+ /**
+ * Uninitialized table record
+ */
+ ndbrequire(c_tables.seize(tabPtr));
+ new (tabPtr.p) Table;
+ tabPtr.p->m_schemaVersion = RNIL;
+ tabPtr.p->m_tableId = tableId;
+ tabPtr.p->m_hasTriggerDefined[0] = 0;
+ tabPtr.p->m_hasTriggerDefined[1] = 0;
+ tabPtr.p->m_hasTriggerDefined[2] = 0;
+ tabPtr.p->m_triggerIds[0] = ILLEGAL_TRIGGER_ID;
+ tabPtr.p->m_triggerIds[1] = ILLEGAL_TRIGGER_ID;
+ tabPtr.p->m_triggerIds[2] = ILLEGAL_TRIGGER_ID;
+#if 0
+ ndbout_c("Get tab info conf %d", tableId);
+#endif
+ c_tables.add(tabPtr);
+ }
+
+ if(tabPtr.p->m_attributes.getSize() != 0){
+ jam();
+ return true;
+ }
+
+ /**
+ * Initialize table object
+ */
+ Uint32 noAttribs = tableDesc.NoOfAttributes;
+ Uint32 notFixed = (tableDesc.NoOfNullable+tableDesc.NoOfVariable);
+ tabPtr.p->m_schemaVersion = tableDesc.TableVersion;
+
+ // The attribute buffer
+ LocalDataBuffer<15> attrBuf(c_dataBufferPool, tabPtr.p->m_attributes);
+
+ // Temporary buffer
+ DataBuffer<15> theRest(c_dataBufferPool);
+
+ if(!attrBuf.seize(noAttribs)){
+ ndbrequire(false);
+ return false;
+ }
+
+ if(!theRest.seize(notFixed)){
+ ndbrequire(false);
+ return false;
+ }
+
+ DataBuffer<15>::DataBufferIterator attrIt; // Fixed not nullable
+ DataBuffer<15>::DataBufferIterator restIt; // variable + nullable
+ attrBuf.first(attrIt);
+ theRest.first(restIt);
+
+ for(Uint32 i = 0; i < noAttribs; i++) {
+ DictTabInfo::Attribute attrDesc; attrDesc.init();
+ s = SimpleProperties::unpack(it, &attrDesc,
+ DictTabInfo::AttributeMapping,
+ DictTabInfo::AttributeMappingSize,
+ true, true);
+ ndbrequire(s == SimpleProperties::Break);
+
+ if (!attrDesc.AttributeNullableFlag
+ /* && !attrDesc.AttributeVariableFlag */) {
+ jam();
+ * attrIt.data = attrDesc.AttributeId;
+ attrBuf.next(attrIt);
+ } else {
+ jam();
+ * restIt.data = attrDesc.AttributeId;
+ theRest.next(restIt);
+ }
+
+ // Move to next attribute
+ it.next();
+ }
+
+ /**
+ * Put the rest in end of attrBuf
+ */
+ theRest.first(restIt);
+ for(; !restIt.isNull(); theRest.next(restIt)){
+ * attrIt.data = * restIt.data;
+ attrBuf.next(attrIt);
+ }
+
+ theRest.release();
+
+ return true;
+}
+
+void
+SumaParticipant::execDI_FCOUNTCONF(Signal* signal){
+ jamEntry();
+
+ CRASH_INSERTION(13007);
+
+ const Uint32 senderData = signal->theData[3];
+ SyncRecord* tmp = c_syncPool.getPtr(senderData);
+ tmp->runDI_FCOUNTCONF(signal);
+}
+
+void
+SumaParticipant::execDIGETPRIMCONF(Signal* signal){
+ jamEntry();
+
+ CRASH_INSERTION(13008);
+
+ const Uint32 senderData = signal->theData[1];
+ SyncRecord* tmp = c_syncPool.getPtr(senderData);
+ tmp->runDIGETPRIMCONF(signal);
+}
+
+void
+SumaParticipant::execCREATE_TRIG_CONF(Signal* signal){
+ jamEntry();
+ DBUG_ENTER("SumaParticipant::execCREATE_TRIG_CONF");
+ CRASH_INSERTION(13009);
+
+ CreateTrigConf * const conf = (CreateTrigConf*)signal->getDataPtr();
+
+ const Uint32 senderData = conf->getConnectionPtr();
+ SyncRecord* tmp = c_syncPool.getPtr(senderData);
+ tmp->runCREATE_TRIG_CONF(signal);
+
+ /**
+ * dodido
+ * @todo: I (Johan) dont know what to do here. Jonas, what do you mean?
+ */
+ DBUG_VOID_RETURN;
+}
+
+void
+SumaParticipant::execCREATE_TRIG_REF(Signal* signal){
+ jamEntry();
+ ndbrequire(false);
+}
+
+void
+SumaParticipant::execDROP_TRIG_CONF(Signal* signal){
+ jamEntry();
+ DBUG_ENTER("SumaParticipant::execDROP_TRIG_CONF");
+ CRASH_INSERTION(13010);
+
+ DropTrigConf * const conf = (DropTrigConf*)signal->getDataPtr();
+
+ const Uint32 senderData = conf->getConnectionPtr();
+ SyncRecord* tmp = c_syncPool.getPtr(senderData);
+ tmp->runDROP_TRIG_CONF(signal);
+ DBUG_VOID_RETURN;
+}
+
+void
+SumaParticipant::execDROP_TRIG_REF(Signal* signal){
+ jamEntry();
+ DBUG_ENTER("SumaParticipant::execDROP_TRIG_CONF");
+ DropTrigRef * const ref = (DropTrigRef*)signal->getDataPtr();
+
+ const Uint32 senderData = ref->getConnectionPtr();
+ SyncRecord* tmp = c_syncPool.getPtr(senderData);
+ tmp->runDROP_TRIG_CONF(signal);
+ DBUG_VOID_RETURN;
+}
+
+/*************************************************************************
+ *
+ *
+ */
+
+void
+SumaParticipant::SyncRecord::runLIST_TABLES_CONF(Signal* signal){
+ jam();
+
+ ListTablesConf * const conf = (ListTablesConf*)signal->getDataPtr();
+ const Uint32 len = signal->length() - ListTablesConf::HeaderLength;
+
+ SubscriptionPtr subPtr;
+ suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
+
+ for (unsigned i = 0; i < len; i++) {
+ subPtr.p->m_maxTables++;
+ suma.addTableId(ListTablesConf::getTableId(conf->tableData[i]), subPtr, this);
+ }
+
+ // for (unsigned i = 0; i < len; i++)
+ // conf->tableData[i] = ListTablesConf::getTableId(conf->tableData[i]);
+ // m_tableList.append(&conf->tableData[0], len);
+
+#if 0
+ TableList::DataBufferIterator it;
+ int i = 0;
+ for(m_tableList.first(it);!it.isNull();m_tableList.next(it)) {
+ ndbout_c("%u listtableconf tableid %d", i++, *it.data);
+ }
+#endif
+
+ if(len == ListTablesConf::DataLength){
+ jam();
+ // we expect more LIST_TABLE_CONF
+ return;
+ }
+
+#if 0
+ subPtr.p->m_currentTable = 0;
+ subPtr.p->m_maxTables = 0;
+
+ TableList::DataBufferIterator it;
+ for(m_tableList.first(it); !it.isNull(); m_tableList.next(it)) {
+ subPtr.p->m_maxTables++;
+ suma.addTableId(*it.data, subPtr, NULL);
+#ifdef NODEFAIL_DEBUG
+ ndbout_c(" listtableconf tableid %d",*it.data);
+#endif
+ }
+#endif
+
+ startMeta(signal);
+}
+
+void
+SumaParticipant::SyncRecord::startMeta(Signal* signal){
+ jam();
+ m_currentTable = 0;
+ nextMeta(signal);
+}
+
+/**
+ * m_tableList only contains UserTables
+ */
+void
+SumaParticipant::SyncRecord::nextMeta(Signal* signal){
+ jam();
+
+ TableList::DataBufferIterator it;
+ if(!m_tableList.position(it, m_currentTable)){
+ completeMeta(signal);
+ return;
+ }
+
+ GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend();
+ req->senderRef = suma.reference();
+ req->senderData = ptrI;
+ req->requestType =
+ GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
+ req->tableId = * it.data;
+
+#if 0
+ ndbout_c("GET_TABINFOREQ id %d", req->tableId);
+#endif
+ suma.sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ, signal,
+ GetTabInfoReq::SignalLength, JBB);
+}
+
+void
+SumaParticipant::SyncRecord::runGET_TABINFOREF(Signal* signal)
+{
+ jam();
+
+ SubscriptionPtr subPtr;
+ suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
+ ndbrequire(subPtr.p->m_syncPtrI == ptrI);
+
+ Uint32 type = subPtr.p->m_subscriptionType;
+
+ bool do_continue = false;
+ switch (type) {
+ case SubCreateReq::TableEvent:
+ jam();
+ break;
+ case SubCreateReq::DatabaseSnapshot:
+ jam();
+ do_continue = true;
+ break;
+ case SubCreateReq::SelectiveTableSnapshot:
+ jam();
+ do_continue = true;
+ break;
+ case SubCreateReq::SingleTableScan:
+ jam();
+ break;
+ default:
+ ndbrequire(false);
+ break;
+ }
+
+ if (! do_continue) {
+ m_error = true;
+ completeMeta(signal);
+ return;
+ }
+
+ m_currentTable++;
+ nextMeta(signal);
+ return;
+
+ // now we need to clean-up
+}
+
+
+void
+SumaParticipant::SyncRecord::runGET_TABINFO_CONF(Signal* signal){
+ jam();
+
+ GetTabInfoConf * const conf = (GetTabInfoConf*)signal->getDataPtr();
+ // const Uint32 gci = conf->gci;
+ const Uint32 tableId = conf->tableId;
+ TableList::DataBufferIterator it;
+
+ ndbrequire(m_tableList.position(it, m_currentTable));
+ ndbrequire(* it.data == tableId);
+
+ SubscriptionPtr subPtr;
+ suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
+ ndbrequire(subPtr.p->m_syncPtrI == ptrI);
+
+ SegmentedSectionPtr ptr;
+ signal->getSection(ptr, GetTabInfoConf::DICT_TAB_INFO);
+
+ SubMetaData * data = (SubMetaData*)signal->getDataPtrSend();
+ /**
+ * sending lastCompleteGCI. Used by Lars in interval calculations
+ * incremenet by one, since last_CompleteGCI is the not the current gci.
+ */
+ data->gci = suma.c_lastCompleteGCI + 1;
+ data->tableId = tableId;
+ data->senderData = subPtr.p->m_subscriberData;
+#if PRINT_ONLY
+ ndbout_c("GSN_SUB_META_DATA Table %d", tableId);
+#else
+
+ bool okToSend = m_doSendSyncData;
+
+ /*
+ * If it is a selectivetablesnapshot and the table is not part of the
+ * subscription, then do not send anything, just continue.
+ * If it is a tablevent, don't send regardless since the APIs are not
+ * interested in meta data.
+ */
+ if(subPtr.p->m_subscriptionType == SubCreateReq::SelectiveTableSnapshot)
+ if(!subPtr.p->m_tables[tableId])
+ okToSend = false;
+
+ if(okToSend) {
+ if(refToNode(subPtr.p->m_subscriberRef) == 0){
+ jam();
+ suma.EXECUTE_DIRECT(refToBlock(subPtr.p->m_subscriberRef),
+ GSN_SUB_META_DATA,
+ signal,
+ SubMetaData::SignalLength);
+ jamEntry();
+ suma.releaseSections(signal);
+ } else {
+ jam();
+ suma.sendSignal(subPtr.p->m_subscriberRef,
+ GSN_SUB_META_DATA,
+ signal,
+ SubMetaData::SignalLength, JBB);
+ }
+ }
+#endif
+
+ TablePtr tabPtr;
+ ndbrequire(suma.c_tables.find(tabPtr, tableId));
+
+ LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, tabPtr.p->m_fragments);
+ if(fragBuf.getSize() == 0){
+ /**
+ * We need to gather fragment info
+ */
+ jam();
+ signal->theData[0] = RNIL;
+ signal->theData[1] = tableId;
+ signal->theData[2] = ptrI;
+ suma.sendSignal(DBDIH_REF, GSN_DI_FCOUNTREQ, signal, 3, JBB);
+ return;
+ }
+
+ m_currentTable++;
+ nextMeta(signal);
+}
+
+void
+SumaParticipant::SyncRecord::runDI_FCOUNTCONF(Signal* signal){
+ jam();
+
+ const Uint32 userPtr = signal->theData[0];
+ const Uint32 fragCount = signal->theData[1];
+ const Uint32 tableId = signal->theData[2];
+
+ ndbrequire(userPtr == RNIL && signal->length() == 5);
+
+ TablePtr tabPtr;
+ ndbrequire(suma.c_tables.find(tabPtr, tableId));
+
+ LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, tabPtr.p->m_fragments);
+ ndbrequire(fragBuf.getSize() == 0);
+
+ m_currentFragment = fragCount;
+ signal->theData[0] = RNIL;
+ signal->theData[1] = ptrI;
+ signal->theData[2] = tableId;
+ signal->theData[3] = 0; // Frag no
+ suma.sendSignal(DBDIH_REF, GSN_DIGETPRIMREQ, signal, 4, JBB);
+}
+
+void
+SumaParticipant::SyncRecord::runDIGETPRIMCONF(Signal* signal){
+ jam();
+
+ const Uint32 userPtr = signal->theData[0];
+ //const Uint32 senderData = signal->theData[1];
+ const Uint32 nodeCount = signal->theData[6];
+ const Uint32 tableId = signal->theData[7];
+ const Uint32 fragNo = signal->theData[8];
+
+ ndbrequire(userPtr == RNIL && signal->length() == 9);
+ ndbrequire(nodeCount > 0 && nodeCount <= MAX_REPLICAS);
+
+ TablePtr tabPtr;
+ ndbrequire(suma.c_tables.find(tabPtr, tableId));
+ LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, tabPtr.p->m_fragments);
+
+ /**
+ * Add primary node for fragment to list
+ */
+ FragmentDescriptor fd;
+ fd.m_fragDesc.m_nodeId = signal->theData[2];
+ fd.m_fragDesc.m_fragmentNo = fragNo;
+ signal->theData[2] = fd.m_dummy;
+ fragBuf.append(&signal->theData[2], 1);
+
+ const Uint32 nextFrag = fragNo + 1;
+ if(nextFrag == m_currentFragment){
+ /**
+ * Complete frag info for table
+ */
+ m_currentTable++;
+ nextMeta(signal);
+ return;
+ }
+ signal->theData[0] = RNIL;
+ signal->theData[1] = ptrI;
+ signal->theData[2] = tableId;
+ signal->theData[3] = nextFrag; // Frag no
+ suma.sendSignal(DBDIH_REF, GSN_DIGETPRIMREQ, signal, 4, JBB);
+}
+
+void
+SumaParticipant::SyncRecord::completeMeta(Signal* signal){
+ jam();
+ SubscriptionPtr subPtr;
+ suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
+ ndbrequire(subPtr.p->m_syncPtrI == ptrI);
+
+#if PRINT_ONLY
+ ndbout_c("GSN_SUB_SYNC_CONF (meta)");
+#else
+
+ suma.releaseSections(signal);
+
+ if (m_error) {
+ SubSyncRef * const ref = (SubSyncRef*)signal->getDataPtrSend();
+ ref->subscriptionId = subPtr.p->m_subscriptionId;
+ ref->subscriptionKey = subPtr.p->m_subscriptionKey;
+ ref->part = SubscriptionData::MetaData;
+ ref->subscriberData = subPtr.p->m_subscriberData;
+ ref->errorCode = SubSyncRef::Undefined;
+ suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_REF, signal,
+ SubSyncRef::SignalLength, JBB);
+ } else {
+ SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend();
+ conf->subscriptionId = subPtr.p->m_subscriptionId;
+ conf->subscriptionKey = subPtr.p->m_subscriptionKey;
+ conf->part = SubscriptionData::MetaData;
+ conf->subscriberData = subPtr.p->m_subscriberData;
+ suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_CONF, signal,
+ SubSyncConf::SignalLength, JBB);
+ }
+#endif
+}
+
+/**********************************************************
+ *
+ * Scan interface
+ *
+ */
+
+void
+SumaParticipant::SyncRecord::startScan(Signal* signal){
+ jam();
+
+ /**
+ * Get fraginfo
+ */
+ m_currentTable = 0;
+ m_currentFragment = 0;
+
+ nextScan(signal);
+}
+
+bool
+SumaParticipant::SyncRecord::getNextFragment(TablePtr * tab,
+ FragmentDescriptor * fd){
+ jam();
+ SubscriptionPtr subPtr;
+ suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
+ TableList::DataBufferIterator tabIt;
+ DataBuffer<15>::DataBufferIterator fragIt;
+
+ m_tableList.position(tabIt, m_currentTable);
+ for(; !tabIt.curr.isNull(); m_tableList.next(tabIt), m_currentTable++){
+ TablePtr tabPtr;
+ ndbrequire(suma.c_tables.find(tabPtr, * tabIt.data));
+ if(subPtr.p->m_subscriptionType == SubCreateReq::SelectiveTableSnapshot)
+ {
+ if(!subPtr.p->m_tables[tabPtr.p->m_tableId]) {
+ *tab = tabPtr;
+ return true;
+ }
+ }
+ LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, tabPtr.p->m_fragments);
+
+ fragBuf.position(fragIt, m_currentFragment);
+ for(; !fragIt.curr.isNull(); fragBuf.next(fragIt), m_currentFragment++){
+ FragmentDescriptor tmp;
+ tmp.m_dummy = * fragIt.data;
+ if(tmp.m_fragDesc.m_nodeId == suma.getOwnNodeId()){
+ * fd = tmp;
+ * tab = tabPtr;
+ return true;
+ }
+ }
+ m_currentFragment = 0;
+ }
+ return false;
+}
+
+void
+SumaParticipant::SyncRecord::nextScan(Signal* signal){
+ jam();
+ TablePtr tabPtr;
+ FragmentDescriptor fd;
+ SubscriptionPtr subPtr;
+ if(!getNextFragment(&tabPtr, &fd)){
+ jam();
+ completeScan(signal);
+ return;
+ }
+ suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
+ ndbrequire(subPtr.p->m_syncPtrI == ptrI);
+
+ if(subPtr.p->m_subscriptionType == SubCreateReq::SelectiveTableSnapshot) {
+ jam();
+ if(!subPtr.p->m_tables[tabPtr.p->m_tableId]) {
+ /*
+ * table is not part of the subscription. Check next table
+ */
+ m_currentTable++;
+ nextScan(signal);
+ return;
+ }
+ }
+
+ DataBuffer<15>::Head head = m_attributeList;
+ if(head.getSize() == 0){
+ head = tabPtr.p->m_attributes;
+ }
+ LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, head);
+
+ ScanFragReq * req = (ScanFragReq *)signal->getDataPtrSend();
+ const Uint32 parallelism = 16;
+ const Uint32 attrLen = 5 + attrBuf.getSize();
+
+ req->senderData = m_subscriptionPtrI;
+ req->resultRef = suma.reference();
+ req->tableId = tabPtr.p->m_tableId;
+ req->requestInfo = 0;
+ req->savePointId = 0;
+ ScanFragReq::setLockMode(req->requestInfo, 0);
+ ScanFragReq::setHoldLockFlag(req->requestInfo, 1);
+ ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);
+ ScanFragReq::setAttrLen(req->requestInfo, attrLen);
+ req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo;
+ req->schemaVersion = tabPtr.p->m_schemaVersion;
+ req->transId1 = 0;
+ req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8);
+ req->clientOpPtr = (ptrI << 16);
+ req->batch_size_rows= 16;
+ req->batch_size_bytes= 0;
+ suma.sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal,
+ ScanFragReq::SignalLength, JBB);
+
+ signal->theData[0] = ptrI;
+ signal->theData[1] = 0;
+ signal->theData[2] = (SUMA << 20) + (suma.getOwnNodeId() << 8);
+
+ // Return all
+ signal->theData[3] = attrBuf.getSize();
+ signal->theData[4] = 0;
+ signal->theData[5] = 0;
+ signal->theData[6] = 0;
+ signal->theData[7] = 0;
+
+ Uint32 dataPos = 8;
+ DataBuffer<15>::DataBufferIterator it;
+ for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it)){
+ AttributeHeader::init(&signal->theData[dataPos++], * it.data, 0);
+ if(dataPos == 25){
+ suma.sendSignal(DBLQH_REF, GSN_ATTRINFO, signal, 25, JBB);
+ dataPos = 3;
+ }
+ }
+ if(dataPos != 3){
+ suma.sendSignal(DBLQH_REF, GSN_ATTRINFO, signal, dataPos, JBB);
+ }
+
+ m_currentTableId = tabPtr.p->m_tableId;
+ m_currentNoOfAttributes = attrBuf.getSize();
+}
+
+
+void
+SumaParticipant::execSCAN_FRAGREF(Signal* signal){
+ jamEntry();
+
+// ScanFragRef * const ref = (ScanFragRef*)signal->getDataPtr();
+ ndbrequire(false);
+}
+
+void
+SumaParticipant::execSCAN_FRAGCONF(Signal* signal){
+ jamEntry();
+
+ CRASH_INSERTION(13011);
+
+ ScanFragConf * const conf = (ScanFragConf*)signal->getDataPtr();
+
+ const Uint32 completed = conf->fragmentCompleted;
+ const Uint32 senderData = conf->senderData;
+ const Uint32 completedOps = conf->completedOps;
+
+ SubscriptionPtr subPtr;
+ c_subscriptions.getPtr(subPtr, senderData);
+
+ if(completed != 2){
+ jam();
+
+#if PRINT_ONLY
+ SubSyncContinueConf * const conf =
+ (SubSyncContinueConf*)signal->getDataPtrSend();
+ conf->subscriptionId = subPtr.p->m_subscriptionId;
+ conf->subscriptionKey = subPtr.p->m_subscriptionKey;
+ execSUB_SYNC_CONTINUE_CONF(signal);
+#else
+ SubSyncContinueReq * const req = (SubSyncContinueReq*)signal->getDataPtrSend();
+ req->subscriberData = subPtr.p->m_subscriberData;
+ req->noOfRowsSent = completedOps;
+ sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_CONTINUE_REQ, signal,
+ SubSyncContinueReq::SignalLength, JBB);
+#endif
+ return;
+ }
+
+ ndbrequire(completedOps == 0);
+
+ SyncRecord* tmp = c_syncPool.getPtr(subPtr.p->m_syncPtrI);
+
+ tmp->m_currentFragment++;
+ tmp->nextScan(signal);
+}
+
+void
+SumaParticipant::execSUB_SYNC_CONTINUE_CONF(Signal* signal){
+ jamEntry();
+
+ CRASH_INSERTION(13012);
+
+ SubSyncContinueConf * const conf =
+ (SubSyncContinueConf*)signal->getDataPtr();
+
+ SubscriptionPtr subPtr;
+ Subscription key;
+ key.m_subscriptionId = conf->subscriptionId;
+ key.m_subscriptionKey = conf->subscriptionKey;
+
+ ndbrequire(c_subscriptions.find(subPtr, key));
+
+ ScanFragNextReq * req = (ScanFragNextReq *)signal->getDataPtrSend();
+ req->senderData = subPtr.i;
+ req->closeFlag = 0;
+ req->transId1 = 0;
+ req->transId2 = (SUMA << 20) + (getOwnNodeId() << 8);
+ req->batch_size_rows = 16;
+ req->batch_size_bytes = 0;
+ sendSignal(DBLQH_REF, GSN_SCAN_NEXTREQ, signal,
+ ScanFragNextReq::SignalLength, JBB);
+}
+
+void
+SumaParticipant::SyncRecord::completeScan(Signal* signal){
+ jam();
+ // m_tableList.release();
+
+ SubscriptionPtr subPtr;
+ suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
+ ndbrequire(subPtr.p->m_syncPtrI == ptrI);
+
+#if PRINT_ONLY
+ ndbout_c("GSN_SUB_SYNC_CONF (data)");
+#else
+ SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend();
+ conf->subscriptionId = subPtr.p->m_subscriptionId;
+ conf->subscriptionKey = subPtr.p->m_subscriptionKey;
+ conf->part = SubscriptionData::TableData;
+ conf->subscriberData = subPtr.p->m_subscriberData;
+ suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_CONF, signal,
+ SubSyncConf::SignalLength, JBB);
+#endif
+}
+
+void
+SumaParticipant::execSCAN_HBREP(Signal* signal){
+ jamEntry();
+#if 0
+ ndbout << "execSCAN_HBREP" << endl << hex;
+ for(int i = 0; i<signal->length(); i++){
+ ndbout << signal->theData[i] << " ";
+ if(((i + 1) % 8) == 0)
+ ndbout << endl << hex;
+ }
+ ndbout << endl;
+#endif
+}
+
+/**********************************************************
+ *
+ * Suma participant interface
+ *
+ * Creation of subscriber
+ *
+ */
+
+void
+SumaParticipant::execSUB_START_REQ(Signal* signal){
+ jamEntry();
+ DBUG_ENTER("SumaParticipant::execSUB_START_REQ");
+
+ CRASH_INSERTION(13013);
+
+ if (c_restartLock) {
+ jam();
+ // ndbout_c("c_restartLock");
+ if (RtoI(signal->getSendersBlockRef(), false) == RNIL) {
+ jam();
+ sendSubStartRef(signal, /** Error Code */ 0, true);
+ DBUG_VOID_RETURN;
+ }
+ // only allow other Suma's in the nodegroup to come through for restart purposes
+ }
+
+ Subscription key;
+
+ SubStartReq * const req = (SubStartReq*)signal->getDataPtr();
+
+ Uint32 senderRef = req->senderRef;
+ Uint32 senderData = req->senderData;
+ Uint32 subscriberData = req->subscriberData;
+ Uint32 subscriberRef = req->subscriberRef;
+ SubscriptionData::Part part = (SubscriptionData::Part)req->part;
+ key.m_subscriptionId = req->subscriptionId;
+ key.m_subscriptionKey = req->subscriptionKey;
+
+ SubscriptionPtr subPtr;
+ if(!c_subscriptions.find(subPtr, key)){
+ jam();
+ sendSubStartRef(signal, /** Error Code */ 0);
+ DBUG_VOID_RETURN;
+ }
+
+ Ptr<SyncRecord> syncPtr;
+ c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
+ if (syncPtr.p->m_locked) {
+ jam();
+#if 0
+ ndbout_c("Locked");
+#endif
+ sendSubStartRef(signal, /** Error Code */ 0, true);
+ DBUG_VOID_RETURN;
+ }
+ syncPtr.p->m_locked = true;
+
+ SubscriberPtr subbPtr;
+ if(!c_subscriberPool.seize(subbPtr)){
+ jam();
+ syncPtr.p->m_locked = false;
+ sendSubStartRef(signal, /** Error Code */ 0);
+ DBUG_VOID_RETURN;
+ }
+
+ Uint32 type = subPtr.p->m_subscriptionType;
+
+ subbPtr.p->m_senderRef = senderRef;
+ subbPtr.p->m_senderData = senderData;
+
+ switch (type) {
+ case SubCreateReq::TableEvent:
+ jam();
+ // we want the data to return to the API not DICT
+ subbPtr.p->m_subscriberRef = subscriberRef;
+ // ndbout_c("start ref = %u", signal->getSendersBlockRef());
+ // ndbout_c("ref = %u", subbPtr.p->m_subscriberRef);
+ // we use the subscription id for now, should really be API choice
+ subbPtr.p->m_subscriberData = subscriberData;
+
+#if 0
+ if (RtoI(signal->getSendersBlockRef(), false) == RNIL) {
+ jam();
+ for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
+ Uint32 ref = calcSumaBlockRef(c_nodesInGroup[i]);
+ if (ref != reference()) {
+ jam();
+ sendSubStartReq(subPtr, subbPtr, signal, ref);
+ } else
+ jam();
+ }
+ }
+#endif
+ break;
+ case SubCreateReq::DatabaseSnapshot:
+ case SubCreateReq::SelectiveTableSnapshot:
+ jam();
+ subbPtr.p->m_subscriberRef = GREP_REF;
+ subbPtr.p->m_subscriberData = subPtr.p->m_subscriberData;
+ break;
+ case SubCreateReq::SingleTableScan:
+ jam();
+ subbPtr.p->m_subscriberRef = subPtr.p->m_subscriberRef;
+ subbPtr.p->m_subscriberData = subPtr.p->m_subscriberData;
+ }
+
+ subbPtr.p->m_subPtrI = subPtr.i;
+ subbPtr.p->m_firstGCI = RNIL;
+ if (type == SubCreateReq::TableEvent)
+ subbPtr.p->m_lastGCI = 0;
+ else
+ subbPtr.p->m_lastGCI = RNIL; // disable usage of m_lastGCI
+ bool ok = false;
+
+ switch(part){
+ case SubscriptionData::MetaData:
+ ok = true;
+ jam();
+ c_metaSubscribers.add(subbPtr);
+ sendSubStartComplete(signal, subbPtr, 0, part);
+ break;
+ case SubscriptionData::TableData:
+ ok = true;
+ jam();
+ c_prepDataSubscribers.add(subbPtr);
+ syncPtr.p->startTrigger(signal);
+ break;
+ }
+ ndbrequire(ok);
+ DBUG_VOID_RETURN;
+}
+
+void
+SumaParticipant::sendSubStartComplete(Signal* signal,
+ SubscriberPtr subbPtr,
+ Uint32 firstGCI,
+ SubscriptionData::Part part){
+ jam();
+
+ SubscriptionPtr subPtr;
+ c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
+
+ Ptr<SyncRecord> syncPtr;
+ c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
+ syncPtr.p->m_locked = false;
+
+ SubStartConf * const conf = (SubStartConf*)signal->getDataPtrSend();
+
+ conf->senderRef = reference();
+ conf->senderData = subbPtr.p->m_senderData;
+ conf->subscriptionId = subPtr.p->m_subscriptionId;
+ conf->subscriptionKey = subPtr.p->m_subscriptionKey;
+ conf->firstGCI = firstGCI;
+ conf->part = (Uint32) part;
+
+ conf->subscriberData = subPtr.p->m_subscriberData;
+ sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_START_CONF, signal,
+ SubStartConf::SignalLength, JBB);
+}
+
+#if 0
+void
+SumaParticipant::sendSubStartRef(SubscriptionPtr subPtr,
+ Signal* signal, Uint32 errCode,
+ bool temporary){
+ jam();
+ SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend();
+ xxx ref->senderRef = reference();
+ xxx ref->senderData = subPtr.p->m_senderData;
+ ref->subscriptionId = subPtr.p->m_subscriptionId;
+ ref->subscriptionKey = subPtr.p->m_subscriptionKey;
+ ref->part = (Uint32) subPtr.p->m_subscriptionType;
+ ref->subscriberData = subPtr.p->m_subscriberData;
+ ref->err = errCode;
+ if (temporary) {
+ jam();
+ ref->setTemporary();
+ }
+ releaseSections(signal);
+ sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_START_REF, signal,
+ SubStartRef::SignalLength, JBB);
+}
+#endif
+void
+SumaParticipant::sendSubStartRef(Signal* signal, Uint32 errCode,
+ bool temporary){
+ jam();
+ SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend();
+ ref->senderRef = reference();
+ ref->err = errCode;
+ if (temporary) {
+ jam();
+ ref->setTemporary();
+ }
+ releaseSections(signal);
+ sendSignal(signal->getSendersBlockRef(), GSN_SUB_START_REF, signal,
+ SubStartRef::SignalLength, JBB);
+}
+
+/**********************************************************
+ *
+ * Trigger admin interface
+ *
+ */
+
+void
+SumaParticipant::SyncRecord::startTrigger(Signal* signal){
+ jam();
+ m_currentTable = 0;
+ m_latestTriggerId = RNIL;
+ nextTrigger(signal);
+}
+
+void
+SumaParticipant::SyncRecord::nextTrigger(Signal* signal){
+ jam();
+
+ TableList::DataBufferIterator it;
+
+ if(!m_tableList.position(it, m_currentTable)){
+ completeTrigger(signal);
+ return;
+ }
+
+ SubscriptionPtr subPtr;
+ suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
+ ndbrequire(subPtr.p->m_syncPtrI == ptrI);
+ const Uint32 RT_BREAK = 48;
+ Uint32 latestTriggerId = 0;
+ for(Uint32 i = 0; i<RT_BREAK && !it.isNull(); i++, m_tableList.next(it)){
+ TablePtr tabPtr;
+#if 0
+ ndbout_c("nextTrigger tableid %u", *it.data);
+#endif
+ ndbrequire(suma.c_tables.find(tabPtr, *it.data));
+
+ AttributeMask attrMask;
+ createAttributeMask(attrMask, tabPtr.p);
+
+ for(Uint32 j = 0; j<3; j++){
+ i++;
+ latestTriggerId = (tabPtr.p->m_schemaVersion << 18) |
+ (j << 16) | tabPtr.p->m_tableId;
+ if(tabPtr.p->m_hasTriggerDefined[j] == 0) {
+ ndbrequire(tabPtr.p->m_triggerIds[j] == ILLEGAL_TRIGGER_ID);
+#if 0
+ ndbout_c("DEFINING trigger on table %u[%u]", tabPtr.p->m_tableId, j);
+#endif
+ CreateTrigReq * const req = (CreateTrigReq*)signal->getDataPtrSend();
+ req->setUserRef(SUMA_REF);
+ req->setConnectionPtr(ptrI);
+ req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE);
+ req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
+ req->setMonitorReplicas(true);
+ req->setMonitorAllAttributes(false);
+ req->setReceiverRef(SUMA_REF);
+ req->setTriggerId(latestTriggerId);
+ req->setTriggerEvent((TriggerEvent::Value)j);
+ req->setTableId(tabPtr.p->m_tableId);
+ req->setAttributeMask(attrMask);
+ suma.sendSignal(DBTUP_REF, GSN_CREATE_TRIG_REQ,
+ signal, CreateTrigReq::SignalLength, JBB);
+
+ } else {
+ /**
+ * Faking that a trigger has been created in order to
+ * simulate the proper behaviour.
+ * Perhaps this should be a dummy signal instead of
+ * (ab)using CREATE_TRIG_CONF.
+ */
+ CreateTrigConf * conf = (CreateTrigConf*)signal->getDataPtrSend();
+ conf->setConnectionPtr(ptrI);
+ conf->setTableId(tabPtr.p->m_tableId);
+ conf->setTriggerId(latestTriggerId);
+ suma.sendSignal(SUMA_REF,GSN_CREATE_TRIG_CONF,
+ signal, CreateTrigConf::SignalLength, JBB);
+
+ }
+
+ }
+ m_currentTable++;
+ }
+ m_latestTriggerId = latestTriggerId;
+}
+
+void
+SumaParticipant::SyncRecord::createAttributeMask(AttributeMask& mask,
+ Table * table){
+ jam();
+ mask.clear();
+ DataBuffer<15>::DataBufferIterator it;
+ LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, table->m_attributes);
+ for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it)){
+ mask.set(* it.data);
+ }
+}
+
+void
+SumaParticipant::SyncRecord::runCREATE_TRIG_CONF(Signal* signal){
+ jam();
+
+ CreateTrigConf * const conf = (CreateTrigConf*)signal->getDataPtr();
+ const Uint32 triggerId = conf->getTriggerId();
+ Uint32 type = (triggerId >> 16) & 0x3;
+ Uint32 tableId = conf->getTableId();
+
+ TablePtr tabPtr;
+ ndbrequire(suma.c_tables.find(tabPtr, tableId));
+
+ ndbrequire(type < 3);
+ tabPtr.p->m_triggerIds[type] = triggerId;
+ tabPtr.p->m_hasTriggerDefined[type]++;
+
+ if(triggerId == m_latestTriggerId){
+ jam();
+ nextTrigger(signal);
+ }
+}
+
+void
+SumaParticipant::SyncRecord::completeTrigger(Signal* signal){
+ jam();
+ SubscriptionPtr subPtr;
+ CRASH_INSERTION(13013);
+#ifdef EVENT_PH3_DEBUG
+ ndbout_c("SumaParticipant: trigger completed");
+#endif
+ Uint32 gci;
+ suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
+ ndbrequire(subPtr.p->m_syncPtrI == ptrI);
+
+ SubscriberPtr subbPtr;
+ {
+ bool found = false;
+
+ for(suma.c_prepDataSubscribers.first(subbPtr);
+ !subbPtr.isNull(); suma.c_prepDataSubscribers.next(subbPtr)) {
+ jam();
+ if(subbPtr.p->m_subPtrI == subPtr.i) {
+ jam();
+ found = true;
+ break;
+ }
+ }
+ ndbrequire(found);
+ gci = suma.getFirstGCI(signal);
+ subbPtr.p->m_firstGCI = gci;
+ suma.c_prepDataSubscribers.remove(subbPtr);
+ suma.c_dataSubscribers.add(subbPtr);
+ }
+ suma.sendSubStartComplete(signal, subbPtr, gci, SubscriptionData::TableData);
+}
+
+void
+SumaParticipant::SyncRecord::startDropTrigger(Signal* signal){
+ jam();
+ m_currentTable = 0;
+ m_latestTriggerId = RNIL;
+ nextDropTrigger(signal);
+}
+
+void
+SumaParticipant::SyncRecord::nextDropTrigger(Signal* signal){
+ jam();
+
+ TableList::DataBufferIterator it;
+
+ if(!m_tableList.position(it, m_currentTable)){
+ completeDropTrigger(signal);
+ return;
+ }
+
+ SubscriptionPtr subPtr;
+ suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
+ ndbrequire(subPtr.p->m_syncPtrI == ptrI);
+
+ const Uint32 RT_BREAK = 48;
+ Uint32 latestTriggerId = 0;
+ for(Uint32 i = 0; i<RT_BREAK && !it.isNull(); i++, m_tableList.next(it)){
+ jam();
+ TablePtr tabPtr;
+#if 0
+ ndbout_c("nextDropTrigger tableid %u", *it.data);
+#endif
+ ndbrequire(suma.c_tables.find(tabPtr, * it.data));
+
+ for(Uint32 j = 0; j<3; j++){
+ jam();
+ ndbrequire(tabPtr.p->m_triggerIds[j] != ILLEGAL_TRIGGER_ID);
+ i++;
+ latestTriggerId = tabPtr.p->m_triggerIds[j];
+ if(tabPtr.p->m_hasTriggerDefined[j] == 1) {
+ jam();
+
+ DropTrigReq * const req = (DropTrigReq*)signal->getDataPtrSend();
+ req->setConnectionPtr(ptrI);
+ req->setUserRef(SUMA_REF); // Sending to myself
+ req->setRequestType(DropTrigReq::RT_USER);
+ req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE);
+ req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
+ req->setIndexId(RNIL);
+
+ req->setTableId(tabPtr.p->m_tableId);
+ req->setTriggerId(latestTriggerId);
+ req->setTriggerEvent((TriggerEvent::Value)j);
+
+#if 0
+ ndbout_c("DROPPING trigger %u = %u %u %u on table %u[%u]",
+ latestTriggerId,TriggerType::SUBSCRIPTION_BEFORE,
+ TriggerActionTime::TA_DETACHED, j, tabPtr.p->m_tableId, j);
+#endif
+ suma.sendSignal(DBTUP_REF, GSN_DROP_TRIG_REQ,
+ signal, DropTrigReq::SignalLength, JBB);
+ } else {
+ jam();
+ ndbrequire(tabPtr.p->m_hasTriggerDefined[j] > 1);
+ /**
+ * Faking that a trigger has been dropped in order to
+ * simulate the proper behaviour.
+ * Perhaps this should be a dummy signal instead of
+ * (ab)using DROP_TRIG_CONF.
+ */
+ DropTrigConf * conf = (DropTrigConf*)signal->getDataPtrSend();
+ conf->setConnectionPtr(ptrI);
+ conf->setTableId(tabPtr.p->m_tableId);
+ conf->setTriggerId(latestTriggerId);
+ suma.sendSignal(SUMA_REF,GSN_DROP_TRIG_CONF,
+ signal, DropTrigConf::SignalLength, JBB);
+ }
+ }
+ m_currentTable++;
+ }
+ m_latestTriggerId = latestTriggerId;
+}
+
+void
+SumaParticipant::SyncRecord::runDROP_TRIG_REF(Signal* signal){
+ jam();
+ DropTrigRef * const ref = (DropTrigRef*)signal->getDataPtr();
+ if (ref->getErrorCode() != DropTrigRef::TriggerNotFound){
+ ndbrequire(false);
+ }
+ const Uint32 triggerId = ref->getTriggerId();
+ Uint32 tableId = ref->getTableId();
+ runDropTrig(signal, triggerId, tableId);
+}
+
+void
+SumaParticipant::SyncRecord::runDROP_TRIG_CONF(Signal* signal){
+ jam();
+
+ DropTrigConf * const conf = (DropTrigConf*)signal->getDataPtr();
+ const Uint32 triggerId = conf->getTriggerId();
+ Uint32 tableId = conf->getTableId();
+ runDropTrig(signal, triggerId, tableId);
+}
+
+void
+SumaParticipant::SyncRecord::runDropTrig(Signal* signal,
+ Uint32 triggerId,
+ Uint32 tableId){
+ Uint32 type = (triggerId >> 16) & 0x3;
+
+ TablePtr tabPtr;
+ ndbrequire(suma.c_tables.find(tabPtr, tableId));
+
+ ndbrequire(type < 3);
+ ndbrequire(tabPtr.p->m_triggerIds[type] == triggerId);
+ tabPtr.p->m_hasTriggerDefined[type]--;
+ if (tabPtr.p->m_hasTriggerDefined[type] == 0) {
+ jam();
+ tabPtr.p->m_triggerIds[type] = ILLEGAL_TRIGGER_ID;
+ }
+ if(triggerId == m_latestTriggerId){
+ jam();
+ nextDropTrigger(signal);
+ }
+}
+
+void
+SumaParticipant::SyncRecord::completeDropTrigger(Signal* signal){
+ jam();
+ SubscriptionPtr subPtr;
+ CRASH_INSERTION(13014);
+#if 0
+ ndbout_c("trigger completed");
+#endif
+
+ suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
+ ndbrequire(subPtr.p->m_syncPtrI == ptrI);
+
+ bool found = false;
+ SubscriberPtr subbPtr;
+ for(suma.c_prepDataSubscribers.first(subbPtr);
+ !subbPtr.isNull(); suma.c_prepDataSubscribers.next(subbPtr)) {
+ jam();
+ if(subbPtr.p->m_subPtrI == subPtr.i) {
+ jam();
+ found = true;
+ break;
+ }
+ }
+ ndbrequire(found);
+ suma.sendSubStopComplete(signal, subbPtr);
+}
+
+/**********************************************************
+ * Scan data interface
+ *
+ * Assumption: one execTRANSID_AI contains all attr info
+ *
+ */
+
+#define SUMA_BUF_SZ1 MAX_KEY_SIZE_IN_WORDS + MAX_TUPLE_SIZE_IN_WORDS
+#define SUMA_BUF_SZ MAX_ATTRIBUTES_IN_TABLE + SUMA_BUF_SZ1
+
+static Uint32 f_bufferLock = 0;
+static Uint32 f_buffer[SUMA_BUF_SZ];
+static Uint32 f_trigBufferSize = 0;
+static Uint32 b_bufferLock = 0;
+static Uint32 b_buffer[SUMA_BUF_SZ];
+static Uint32 b_trigBufferSize = 0;
+
+void
+SumaParticipant::execTRANSID_AI(Signal* signal){
+ jamEntry();
+
+ CRASH_INSERTION(13015);
+ TransIdAI * const data = (TransIdAI*)signal->getDataPtr();
+ const Uint32 opPtrI = data->connectPtr;
+ const Uint32 length = signal->length() - 3;
+
+ if(f_bufferLock == 0){
+ f_bufferLock = opPtrI;
+ } else {
+ ndbrequire(f_bufferLock == opPtrI);
+ }
+
+ Ptr<SyncRecord> syncPtr;
+ c_syncPool.getPtr(syncPtr, (opPtrI >> 16));
+
+ Uint32 sum = 0;
+ Uint32 * dst = f_buffer + MAX_ATTRIBUTES_IN_TABLE;
+ Uint32 * headers = f_buffer;
+ const Uint32 * src = &data->attrData[0];
+ const Uint32 * const end = &src[length];
+
+ const Uint32 attribs = syncPtr.p->m_currentNoOfAttributes;
+ for(Uint32 i = 0; i<attribs; i++){
+ Uint32 tmp = * src++;
+ * headers++ = tmp;
+ Uint32 len = AttributeHeader::getDataSize(tmp);
+
+ memcpy(dst, src, 4 * len);
+ dst += len;
+ src += len;
+ sum += len;
+ }
+
+ ndbrequire(src == end);
+
+ /**
+ * Send data to subscriber
+ */
+ LinearSectionPtr ptr[3];
+ ptr[0].p = f_buffer;
+ ptr[0].sz = attribs;
+
+ ptr[1].p = f_buffer + MAX_ATTRIBUTES_IN_TABLE;
+ ptr[1].sz = sum;
+
+ SubscriptionPtr subPtr;
+ c_subscriptions.getPtr(subPtr, syncPtr.p->m_subscriptionPtrI);
+
+ /**
+ * Initialize signal
+ */
+ SubTableData * sdata = (SubTableData*)signal->getDataPtrSend();
+ Uint32 ref = subPtr.p->m_subscriberRef;
+ sdata->tableId = syncPtr.p->m_currentTableId;
+ sdata->senderData = subPtr.p->m_subscriberData;
+ sdata->operation = 3; // Scan
+ sdata->gci = 1; // Undefined
+#if PRINT_ONLY
+ ndbout_c("GSN_SUB_TABLE_DATA (scan) #attr: %d len: %d", attribs, sum);
+#else
+ sendSignal(ref,
+ GSN_SUB_TABLE_DATA,
+ signal,
+ SubTableData::SignalLength, JBB,
+ ptr, 2);
+#endif
+
+ /**
+ * Reset f_bufferLock
+ */
+ f_bufferLock = 0;
+}
+
+/**********************************************************
+ *
+ * Trigger data interface
+ *
+ */
+
+void
+SumaParticipant::execTRIG_ATTRINFO(Signal* signal){
+ jamEntry();
+
+ CRASH_INSERTION(13016);
+ TrigAttrInfo* const trg = (TrigAttrInfo*)signal->getDataPtr();
+ const Uint32 trigId = trg->getTriggerId();
+
+ const Uint32 dataLen = signal->length() - TrigAttrInfo::StaticLength;
+
+ if(trg->getAttrInfoType() == TrigAttrInfo::BEFORE_VALUES){
+ jam();
+
+ ndbrequire(b_bufferLock == trigId);
+
+ memcpy(b_buffer + b_trigBufferSize, trg->getData(), 4 * dataLen);
+ b_trigBufferSize += dataLen;
+ // printf("before values %u %u %u\n",trigId, dataLen, b_trigBufferSize);
+ } else {
+ jam();
+
+ if(f_bufferLock == 0){
+ f_bufferLock = trigId;
+ f_trigBufferSize = 0;
+ b_bufferLock = trigId;
+ b_trigBufferSize = 0;
+ } else {
+ ndbrequire(f_bufferLock == trigId);
+ }
+
+ memcpy(f_buffer + f_trigBufferSize, trg->getData(), 4 * dataLen);
+ f_trigBufferSize += dataLen;
+ }
+}
+
+#ifdef NODEFAIL_DEBUG2
+static int theCounts[64] = {0};
+#endif
+
+Uint32
+Suma::getStoreBucket(Uint32 v)
+{
+ // id will contain id to responsible suma or
+ // RNIL if we don't have nodegroup info yet
+
+ const Uint32 N = NO_OF_BUCKETS;
+ const Uint32 D = v % N; // Distibution key
+ return D;
+}
+
+Uint32
+Suma::getResponsibleSumaNodeId(Uint32 D)
+{
+ // id will contain id to responsible suma or
+ // RNIL if we don't have nodegroup info yet
+
+ Uint32 id;
+
+ if (c_restartLock) {
+ jam();
+ // ndbout_c("c_restartLock");
+ id = RNIL;
+ } else {
+ jam();
+ id = RNIL;
+ const Uint32 n = c_noNodesInGroup; // Number nodes in node group
+ const Uint32 C1 = D / n;
+ const Uint32 C2 = D - C1*n; // = D % n;
+ const Uint32 C = C2 + C1 % n;
+ for (Uint32 i = 0; i < n; i++) {
+ jam();
+ id = c_nodesInGroup[(C + i) % n];
+ if (c_aliveNodes.get(id) &&
+ !c_preparingNodes.get(id)) {
+ jam();
+ break;
+ }//if
+ }
+#ifdef NODEFAIL_DEBUG2
+ theCounts[id]++;
+ ndbout_c("Suma:responsible n=%u, D=%u, id = %u, count=%u",
+ n,D, id, theCounts[id]);
+#endif
+ }
+ return id;
+}
+
+Uint32
+SumaParticipant::decideWhoToSend(Uint32 nBucket, Uint32 gci){
+ bool replicaFlag = true;
+ Uint32 nId = RNIL;
+
+ // bucket active/not active set by GCP_COMPLETE
+ if (c_buckets[nBucket].active) {
+ if (c_buckets[nBucket].handover && c_buckets[nBucket].handoverGCI <= gci) {
+ jam();
+ replicaFlag = true; // let the other node send this
+ nId = RNIL;
+ // mark this as started, if we get a node failiure now we have some lost stuff
+ c_buckets[nBucket].handover_started = true;
+ } else {
+ jam();
+ replicaFlag = false;
+ nId = refToNode(reference());
+ }
+ } else {
+ nId = getResponsibleSumaNodeId(nBucket);
+ replicaFlag = !(nId == refToNode(reference()));
+
+ if (!replicaFlag) {
+ if (!c_buckets[nBucket].handover) {
+ jam();
+ // appearently a node has failed and we are taking over sending
+ // from that bucket. Now we need to go back to latest completed
+ // GCI. Handling will depend on Subscriber and Subscription
+
+ // TODO, for now we make an easy takeover
+ if (gci < c_nodeFailGCI)
+ c_lastInconsistentGCI = gci;
+
+ // we now have responsability for this bucket and we're actively
+ // sending from that
+ c_buckets[nBucket].active = true;
+#ifdef HANDOVER_DEBUG
+ ndbout_c("Takeover Bucket %u", nBucket);
+#endif
+ } else if (c_buckets[nBucket].handoverGCI > gci) {
+ jam();
+ replicaFlag = true; // handover going on, but don't start sending yet
+ nId = RNIL;
+ } else {
+ jam();
+#ifdef HANDOVER_DEBUG
+ ndbout_c("Possible error: Will send from GCI = %u", gci);
+#endif
+ }
+ }
+ }
+
+#ifdef NODEFAIL_DEBUG2
+ ndbout_c("Suma:bucket %u, responsible id = %u, replicaFlag = %u",
+ nBucket, nId, (Uint32)replicaFlag);
+#endif
+ return replicaFlag;
+}
+
+void
+SumaParticipant::execFIRE_TRIG_ORD(Signal* signal){
+ jamEntry();
+ DBUG_ENTER("SumaParticipant::execFIRE_TRIG_ORD");
+ CRASH_INSERTION(13016);
+ FireTrigOrd* const trg = (FireTrigOrd*)signal->getDataPtr();
+ const Uint32 trigId = trg->getTriggerId();
+ const Uint32 hashValue = trg->getHashValue();
+ const Uint32 gci = trg->getGCI();
+ const Uint32 event = trg->getTriggerEvent();
+ const Uint32 triggerId = trg->getTriggerId();
+ Uint32 tableId = triggerId & 0xFFFF;
+
+ ndbrequire(f_bufferLock == trigId);
+
+#ifdef EVENT_DEBUG2
+ ndbout_c("SumaParticipant::execFIRE_TRIG_ORD");
+#endif
+
+ Uint32 sz = trg->getNoOfPrimaryKeyWords()+trg->getNoOfAfterValueWords();
+ ndbrequire(sz == f_trigBufferSize);
+
+ /**
+ * Reformat as "all headers" + "all data"
+ */
+ Uint32 dataLen = 0;
+ Uint32 noOfAttrs = 0;
+ Uint32 * src = f_buffer;
+ Uint32 * headers = signal->theData + 25;
+ Uint32 * dst = signal->theData + 25 + MAX_ATTRIBUTES_IN_TABLE;
+
+ LinearSectionPtr ptr[3];
+ int nptr;
+
+ ptr[0].p = headers;
+ ptr[1].p = dst;
+
+ while(sz > 0){
+ jam();
+ Uint32 tmp = * src ++;
+ * headers ++ = tmp;
+ Uint32 len = AttributeHeader::getDataSize(tmp);
+ memcpy(dst, src, 4 * len);
+ dst += len;
+ src += len;
+
+ noOfAttrs++;
+ dataLen += len;
+ sz -= (1 + len);
+ }
+ ndbrequire(sz == 0);
+
+ ptr[0].sz = noOfAttrs;
+ ptr[1].sz = dataLen;
+
+ if (b_trigBufferSize > 0) {
+ jam();
+ ptr[2].p = b_buffer;
+ ptr[2].sz = b_trigBufferSize;
+ nptr = 3;
+ } else {
+ jam();
+ nptr = 2;
+ }
+
+ // right now only for tableEvent
+ bool replicaFlag = decideWhoToSend(getStoreBucket(hashValue), gci);
+
+ /**
+ * Signal to subscriber(s)
+ */
+ SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg;
+ data->gci = gci;
+ data->tableId = tableId;
+ data->operation = event;
+ data->noOfAttributes = noOfAttrs;
+ data->dataSize = dataLen;
+
+ SubscriberPtr subbPtr;
+ for(c_dataSubscribers.first(subbPtr); !subbPtr.isNull();
+ c_dataSubscribers.next(subbPtr)){
+ if (subbPtr.p->m_firstGCI > gci) {
+#ifdef EVENT_DEBUG
+ ndbout_c("m_firstGCI = %u, gci = %u", subbPtr.p->m_firstGCI, gci);
+#endif
+ jam();
+ // we're either restarting or it's a newly created subscriber
+ // and waiting for the right gci
+ continue;
+ }
+
+ jam();
+
+ const Uint32 ref = subbPtr.p->m_subscriberRef;
+ // ndbout_c("ref = %u", ref);
+ const Uint32 subdata = subbPtr.p->m_subscriberData;
+ data->senderData = subdata;
+ /*
+ * get subscription ptr for this subscriber
+ */
+ SubscriptionPtr subPtr;
+ c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
+
+ if(!subPtr.p->m_tables[tableId]) {
+ jam();
+ continue;
+ //continue in for-loop if the table is not part of
+ //the subscription. Otherwise, send data to subscriber.
+ }
+
+ if (subPtr.p->m_subscriptionType == SubCreateReq::TableEvent) {
+ if (replicaFlag) {
+ jam();
+ c_failoverBuffer.subTableData(gci,NULL,0);
+ continue;
+ }
+ jam();
+ Uint32 tmp = data->logType;
+ if (c_lastInconsistentGCI == data->gci) {
+ data->setGCINotConsistent();
+ }
+
+#ifdef HANDOVER_DEBUG
+ {
+ static int aLongGCIName = 0;
+ if (data->gci != aLongGCIName) {
+ aLongGCIName = data->gci;
+ ndbout_c("sent from GCI = %u", aLongGCIName);
+ }
+ }
+#endif
+ DBUG_PRINT("info",("GSN_SUB_TABLE_DATA to node %d", refToNode(ref)));
+ sendSignal(ref, GSN_SUB_TABLE_DATA, signal,
+ SubTableData::SignalLength, JBB, ptr, nptr);
+ data->logType = tmp;
+ } else {
+ ndbassert(refToNode(ref) == 0 || refToNode(ref) == getOwnNodeId());
+ jam();
+#if PRINT_ONLY
+ ndbout_c("GSN_SUB_TABLE_DATA to %s: op: %d #attr: %d len: %d",
+ getBlockName(refToBlock(ref)),
+ noOfAttrs, dataLen);
+
+#else
+#ifdef HANDOVER_DEBUG
+ {
+ static int aLongGCIName2 = 0;
+ if (data->gci != aLongGCIName2) {
+ aLongGCIName2 = data->gci;
+ ndbout_c("(EXECUTE_DIRECT) sent from GCI = %u to %u", aLongGCIName2, ref);
+ }
+ }
+#endif
+ EXECUTE_DIRECT(refToBlock(ref), GSN_SUB_TABLE_DATA, signal,
+ SubTableData::SignalLength);
+ jamEntry();
+#endif
+ }
+ }
+
+ /**
+ * Reset f_bufferLock
+ */
+ f_bufferLock = 0;
+ b_bufferLock = 0;
+
+ DBUG_VOID_RETURN;
+}
+
+void
+SumaParticipant::execSUB_GCP_COMPLETE_REP(Signal* signal){
+ jamEntry();
+
+ SubGcpCompleteRep * rep = (SubGcpCompleteRep*)signal->getDataPtrSend();
+
+ Uint32 gci = rep->gci;
+ c_lastCompleteGCI = gci;
+
+ /**
+ * always send SUB_GCP_COMPLETE_REP to Grep (so
+ * Lars can do funky stuff calculating intervals,
+ * even before the subscription is started
+ */
+ rep->senderRef = reference();
+ rep->senderData = 0; //ignored in grep
+ EXECUTE_DIRECT(refToBlock(GREP_REF), GSN_SUB_GCP_COMPLETE_REP, signal,
+ SubGcpCompleteRep::SignalLength);
+
+ /**
+ * Signal to subscriber(s)
+ */
+
+ SubscriberPtr subbPtr;
+ SubscriptionPtr subPtr;
+ c_dataSubscribers.first(subbPtr);
+ for(; !subbPtr.isNull(); c_dataSubscribers.next(subbPtr)){
+
+ if (subbPtr.p->m_firstGCI > gci) {
+ jam();
+ // we don't send SUB_GCP_COMPLETE_REP for incomplete GCI's
+ continue;
+ }
+
+ const Uint32 ref = subbPtr.p->m_subscriberRef;
+ rep->senderRef = ref;
+ rep->senderData = subbPtr.p->m_subscriberData;
+
+ c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
+#if PRINT_ONLY
+ ndbout_c("GSN_SUB_GCP_COMPLETE_REP to %s:",
+ getBlockName(refToBlock(ref)));
+#else
+ /**
+ * Ignore sending to GREP (since we sent earlier)
+ */
+ if (ref == GREP_REF) {
+ jam();
+ continue;
+ }
+
+ CRASH_INSERTION(13018);
+
+ if (subPtr.p->m_subscriptionType == SubCreateReq::TableEvent)
+ {
+ jam();
+ sendSignal(ref, GSN_SUB_GCP_COMPLETE_REP, signal,
+ SubGcpCompleteRep::SignalLength, JBB);
+ }
+ else
+ {
+ jam();
+ ndbassert(refToNode(ref) == 0 || refToNode(ref) == getOwnNodeId());
+ EXECUTE_DIRECT(refToBlock(ref), GSN_SUB_GCP_COMPLETE_REP, signal,
+ SubGcpCompleteRep::SignalLength);
+ jamEntry();
+ }
+#endif
+ }
+
+ if (c_handoverToDo) {
+ jam();
+ c_handoverToDo = false;
+ for( int i = 0; i < NO_OF_BUCKETS; i++) {
+ if (c_buckets[i].handover) {
+ if (c_buckets[i].handoverGCI > gci) {
+ jam();
+ c_handoverToDo = true; // still waiting for the right GCI
+ break; /* since all handover should happen at the same time
+ * we can break here
+ */
+ } else {
+ c_buckets[i].handover = false;
+#ifdef HANDOVER_DEBUG
+ ndbout_c("Handover Bucket %u", i);
+#endif
+ if (getResponsibleSumaNodeId(i) == refToNode(reference())) {
+ // my bucket to be handed over to me
+ ndbrequire(!c_buckets[i].active);
+ jam();
+ c_buckets[i].active = true;
+ } else {
+ // someone else's bucket to handover to
+ ndbrequire(c_buckets[i].active);
+ jam();
+ c_buckets[i].active = false;
+ }
+ }
+ }
+ }
+ }
+}
+
+/***********************************************************
+ *
+ * Embryo to syncronize the Suma's so as to know if a subscriber
+ * has received a GCP_COMPLETE from all suma's or not
+ *
+ */
+
+void
+SumaParticipant::runSUB_GCP_COMPLETE_ACC(Signal* signal){
+ jam();
+
+ SubGcpCompleteAcc * const acc = (SubGcpCompleteAcc*)signal->getDataPtr();
+
+ Uint32 gci = acc->rep.gci;
+
+#ifdef EVENT_DEBUG
+ ndbout_c("SumaParticipant::runSUB_GCP_COMPLETE_ACC gci = %u", gci);
+#endif
+
+ c_failoverBuffer.subGcpCompleteRep(gci);
+}
+
+void
+Suma::execSUB_GCP_COMPLETE_ACC(Signal* signal){
+ jamEntry();
+
+ if (RtoI(signal->getSendersBlockRef(), false) != RNIL) {
+ jam();
+ // Ack from other SUMA
+ runSUB_GCP_COMPLETE_ACC(signal);
+ return;
+ }
+
+ jam();
+ // Ack from User and not an acc from other SUMA, redistribute in nodegroup
+
+ SubGcpCompleteAcc * const acc = (SubGcpCompleteAcc*)signal->getDataPtr();
+ Uint32 gci = acc->rep.gci;
+ Uint32 senderRef = acc->rep.senderRef;
+ Uint32 subscriberData = acc->rep.subscriberData;
+
+#ifdef EVENT_DEBUG
+ ndbout_c("Suma::execSUB_GCP_COMPLETE_ACC gci = %u", gci);
+#endif
+ bool moreToCome = false;
+
+ SubscriberPtr subbPtr;
+ for(c_dataSubscribers.first(subbPtr);
+ !subbPtr.isNull(); c_dataSubscribers.next(subbPtr)){
+#ifdef EVENT_DEBUG
+ ndbout_c("Suma::execSUB_GCP_COMPLETE_ACC %u == %u && %u == %u",
+ subbPtr.p->m_subscriberRef,
+ senderRef,
+ subbPtr.p->m_subscriberData,
+ subscriberData);
+#endif
+ if (subbPtr.p->m_subscriberRef == senderRef &&
+ subbPtr.p->m_subscriberData == subscriberData) {
+ jam();
+#ifdef EVENT_DEBUG
+ ndbout_c("Suma::execSUB_GCP_COMPLETE_ACC gci = FOUND SUBSCRIBER");
+#endif
+ subbPtr.p->m_lastGCI = gci;
+ } else if (subbPtr.p->m_lastGCI < gci) {
+ jam();
+ if (subbPtr.p->m_firstGCI <= gci)
+ moreToCome = true;
+ } else
+ jam();
+ }
+
+ if (!moreToCome) {
+ // tell the other SUMA's that I'm done with this GCI
+ jam();
+ for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
+ Uint32 id = c_nodesInGroup[i];
+ Uint32 ref = calcSumaBlockRef(id);
+ if ((ref != reference()) && c_aliveNodes.get(id)) {
+ jam();
+ sendSignal(ref, GSN_SUB_GCP_COMPLETE_ACC, signal,
+ SubGcpCompleteAcc::SignalLength, JBB);
+ } else
+ jam();
+ }
+ }
+}
+
+static Uint32 tmpFailoverBuffer[512];
+//SumaParticipant::FailoverBuffer::FailoverBuffer(DataBuffer<15>::DataBufferPool & p)
+// : m_dataList(p),
+SumaParticipant::FailoverBuffer::FailoverBuffer()
+ :
+ c_gcis(tmpFailoverBuffer), c_sz(512), c_first(0), c_next(0), c_full(false)
+{
+}
+
+bool SumaParticipant::FailoverBuffer::subTableData(Uint32 gci, Uint32 *src, int sz)
+{
+ bool ok = true;
+
+ if (c_full) {
+ ok = false;
+#ifdef EVENT_DEBUG
+ ndbout_c("Suma::FailoverBuffer::SubTableData buffer full gci=%u");
+#endif
+ } else {
+ c_gcis[c_next] = gci;
+ c_next++;
+ if (c_next == c_sz) c_next = 0;
+ if (c_next == c_first)
+ c_full = true;
+ // ndbout_c("%u %u %u",c_first,c_next,c_sz);
+ }
+ return ok;
+}
+bool SumaParticipant::FailoverBuffer::subGcpCompleteRep(Uint32 gci)
+{
+ bool ok = true;
+
+ // ndbout_c("Empty");
+ while (true) {
+ if (c_first == c_next && !c_full)
+ break;
+ if (c_gcis[c_first] > gci)
+ break;
+ c_full = false;
+ c_first++;
+ if (c_first == c_sz) c_first = 0;
+ // ndbout_c("%u %u %u : ",c_first,c_next,c_sz);
+ }
+
+ return ok;
+}
+bool SumaParticipant::FailoverBuffer::nodeFailRep()
+{
+ bool ok = true;
+ while (true) {
+ if (c_first == c_next && !c_full)
+ break;
+
+#ifdef EVENT_DEBUG
+ ndbout_c("Suma::FailoverBuffer::NodeFailRep resending gci=%u", c_gcis[c_first]);
+#endif
+ c_full = false;
+ c_first++;
+ if (c_first == c_sz) c_first = 0;
+ }
+ return ok;
+}
+
+/**********************************************************
+ * Suma participant interface
+ *
+ * Stopping and removing of subscriber
+ *
+ */
+
+void
+SumaParticipant::execSUB_STOP_REQ(Signal* signal){
+ jamEntry();
+ DBUG_ENTER("SumaParticipant::execSUB_STOP_REQ");
+
+ CRASH_INSERTION(13019);
+
+ SubStopReq * const req = (SubStopReq*)signal->getDataPtr();
+ Uint32 senderRef = signal->getSendersBlockRef();
+ Uint32 senderData = req->senderData;
+ Uint32 subscriberRef = req->subscriberRef;
+ Uint32 subscriberData = req->subscriberData;
+ SubscriptionPtr subPtr;
+ Subscription key;
+ key.m_subscriptionId = req->subscriptionId;
+ key.m_subscriptionKey = req->subscriptionKey;
+ Uint32 part = req->part;
+
+ if (key.m_subscriptionKey == 0 &&
+ key.m_subscriptionId == 0 &&
+ subscriberData == 0) {
+ SubStopConf* conf = (SubStopConf*)signal->getDataPtrSend();
+
+ conf->senderRef = reference();
+ conf->senderData = senderData;
+ conf->subscriptionId = key.m_subscriptionId;
+ conf->subscriptionKey = key.m_subscriptionKey;
+ conf->subscriberData = subscriberData;
+
+ sendSignal(senderRef, GSN_SUB_STOP_CONF, signal,
+ SubStopConf::SignalLength, JBB);
+
+ removeSubscribersOnNode(signal, refToNode(subscriberRef));
+ DBUG_VOID_RETURN;
+ }
+
+ if(!c_subscriptions.find(subPtr, key)){
+ jam();
+ sendSubStopRef(signal, GrepError::SUBSCRIPTION_ID_NOT_FOUND);
+ return;
+ }
+
+ ndbrequire(part == SubscriptionData::TableData);
+
+ SubscriberPtr subbPtr;
+ if (senderRef == reference()){
+ jam();
+ c_subscriberPool.getPtr(subbPtr, senderData);
+ ndbrequire(subbPtr.p->m_subPtrI == subPtr.i &&
+ subbPtr.p->m_subscriberRef == subscriberRef &&
+ subbPtr.p->m_subscriberData == subscriberData);
+ c_removeDataSubscribers.remove(subbPtr);
+ } else {
+ bool found = false;
+ jam();
+ c_dataSubscribers.first(subbPtr);
+ for (;!subbPtr.isNull(); c_dataSubscribers.next(subbPtr)){
+ jam();
+ if (subbPtr.p->m_subPtrI == subPtr.i &&
+ refToNode(subbPtr.p->m_subscriberRef) == refToNode(subscriberRef) &&
+ subbPtr.p->m_subscriberData == subscriberData){
+ // ndbout_c("STOP_REQ: before c_dataSubscribers.release");
+ jam();
+ c_dataSubscribers.remove(subbPtr);
+ found = true;
+ break;
+ }
+ }
+ /**
+ * If we didn't find anyone, send ref
+ */
+ if (!found) {
+ jam();
+ sendSubStopRef(signal, GrepError::SUBSCRIBER_NOT_FOUND);
+ DBUG_VOID_RETURN;
+ }
+ }
+
+ subbPtr.p->m_senderRef = senderRef; // store ref to requestor
+ subbPtr.p->m_senderData = senderData; // store ref to requestor
+ c_prepDataSubscribers.add(subbPtr);
+
+ Ptr<SyncRecord> syncPtr;
+ c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
+ if (syncPtr.p->m_locked) {
+ jam();
+ sendSubStopRef(signal, /** Error Code */ 0, true);
+ DBUG_VOID_RETURN;
+ }
+ syncPtr.p->m_locked = true;
+
+ syncPtr.p->startDropTrigger(signal);
+ DBUG_VOID_RETURN;
+}
+
+void
+SumaParticipant::sendSubStopComplete(Signal* signal, SubscriberPtr subbPtr){
+ jam();
+
+ CRASH_INSERTION(13020);
+
+ SubscriptionPtr subPtr;
+ c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
+
+ Ptr<SyncRecord> syncPtr;
+ c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
+ syncPtr.p->m_locked = false;
+
+ SubStopConf * const conf = (SubStopConf*)signal->getDataPtrSend();
+
+ conf->senderRef = reference();
+ conf->senderData = subbPtr.p->m_senderData;
+ conf->subscriptionId = subPtr.p->m_subscriptionId;
+ conf->subscriptionKey = subPtr.p->m_subscriptionKey;
+ conf->subscriberData = subbPtr.p->m_subscriberData;
+ Uint32 senderRef = subbPtr.p->m_senderRef;
+
+ c_prepDataSubscribers.release(subbPtr);
+ sendSignal(senderRef, GSN_SUB_STOP_CONF, signal,
+ SubStopConf::SignalLength, JBB);
+}
+
+void
+SumaParticipant::sendSubStopRef(Signal* signal, Uint32 errCode,
+ bool temporary){
+ jam();
+ SubStopRef * ref = (SubStopRef *)signal->getDataPtrSend();
+ ref->senderRef = reference();
+ ref->errorCode = errCode;
+ if (temporary) {
+ ref->setTemporary();
+ }
+ sendSignal(signal->getSendersBlockRef(),
+ GSN_SUB_STOP_REF,
+ signal,
+ SubStopRef::SignalLength,
+ JBB);
+ return;
+}
+
+/**************************************************************
+ *
+ * Removing subscription
+ *
+ */
+
+void
+SumaParticipant::execSUB_REMOVE_REQ(Signal* signal) {
+ jamEntry();
+
+ Uint32 senderRef = signal->getSendersBlockRef();
+
+ CRASH_INSERTION(13021);
+
+ const SubRemoveReq req = *(SubRemoveReq*)signal->getDataPtr();
+ SubscriptionPtr subPtr;
+ Subscription key;
+ key.m_subscriptionId = req.subscriptionId;
+ key.m_subscriptionKey = req.subscriptionKey;
+
+ if(!c_subscriptions.find(subPtr, key)) {
+ jam();
+ sendSubRemoveRef(signal, req, (Uint32) GrepError::SUBSCRIPTION_ID_NOT_FOUND);
+ return;
+ }
+
+ int count = 0;
+ {
+ jam();
+ SubscriberPtr i_subbPtr;
+ for(c_prepDataSubscribers.first(i_subbPtr);
+ !i_subbPtr.isNull(); c_prepDataSubscribers.next(i_subbPtr)){
+ jam();
+ if( i_subbPtr.p->m_subPtrI == subPtr.i ) {
+ jam();
+ sendSubRemoveRef(signal, req, /* ErrorCode */ 0, true);
+ return;
+ // c_prepDataSubscribers.release(subbPtr);
+ }
+ }
+ c_dataSubscribers.first(i_subbPtr);
+ while(!i_subbPtr.isNull()){
+ jam();
+ SubscriberPtr subbPtr = i_subbPtr;
+ c_dataSubscribers.next(i_subbPtr);
+ if( subbPtr.p->m_subPtrI == subPtr.i ) {
+ jam();
+ sendSubRemoveRef(signal, req, /* ErrorCode */ 0, true);
+ return;
+ /* Unfinished/untested code. If remove should be possible
+ * even if subscribers are left these have to be stopped
+ * first. See m_markRemove, m_nSubscribers. We need also to
+ * block remove for this subscription so that multiple
+ * removes is not possible...
+ */
+ c_dataSubscribers.remove(subbPtr);
+ c_removeDataSubscribers.add(subbPtr);
+ count++;
+ }
+ }
+ c_metaSubscribers.first(i_subbPtr);
+ while(!i_subbPtr.isNull()){
+ jam();
+ SubscriberPtr subbPtr = i_subbPtr;
+ c_metaSubscribers.next(i_subbPtr);
+ if( subbPtr.p->m_subPtrI == subPtr.i ){
+ jam();
+ c_metaSubscribers.release(subbPtr);
+ }
+ }
+ }
+
+ subPtr.p->m_senderRef = senderRef;
+ subPtr.p->m_senderData = req.senderData;
+
+ if (count > 0){
+ jam();
+ ndbrequire(false); // code not finalized
+ subPtr.p->m_markRemove = true;
+ subPtr.p->m_nSubscribers = count;
+ sendSubStopReq(signal);
+ } else {
+ completeSubRemoveReq(signal, subPtr);
+ }
+}
+
+void
+SumaParticipant::completeSubRemoveReq(Signal* signal, SubscriptionPtr subPtr) {
+ Uint32 subscriptionId = subPtr.p->m_subscriptionId;
+ Uint32 subscriptionKey = subPtr.p->m_subscriptionKey;
+ Uint32 senderRef = subPtr.p->m_senderRef;
+ Uint32 senderData = subPtr.p->m_senderData;
+
+ {
+ Ptr<SyncRecord> syncPtr;
+ c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
+
+ syncPtr.p->release();
+ c_syncPool.release(syncPtr);
+ }
+
+ // if (subPtr.p->m_subscriptionType != SubCreateReq::TableEvent) {
+ // jam();
+ // senderRef = subPtr.p->m_subscriberRef;
+ // }
+ c_subscriptions.release(subPtr);
+
+ /**
+ * I was the last subscription to be remove so clear c_tables
+ */
+#if 0
+ ndbout_c("c_subscriptionPool.getSize() %d c_subscriptionPool.getNoOfFree()%d",
+ c_subscriptionPool.getSize(),c_subscriptionPool.getNoOfFree());
+#endif
+
+ if(c_subscriptionPool.getSize() == c_subscriptionPool.getNoOfFree()) {
+ jam();
+#if 0
+ ndbout_c("SUB_REMOVE_REQ:Clearing c_tables");
+#endif
+ KeyTable<Table>::Iterator it;
+ for(c_tables.first(it); !it.isNull(); ){
+
+ it.curr.p->release(* this);
+
+ TablePtr tabPtr = it.curr;
+
+ c_tables.next(it);
+ c_tables.release(tabPtr);
+ }
+ }
+
+ SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend();
+ conf->senderRef = reference();
+ conf->senderData = senderData;
+ conf->subscriptionId = subscriptionId;
+ conf->subscriptionKey = subscriptionKey;
+
+ sendSignal(senderRef, GSN_SUB_REMOVE_CONF, signal,
+ SubRemoveConf::SignalLength, JBB);
+}
+
+void
+SumaParticipant::sendSubRemoveRef(Signal* signal, const SubRemoveReq& req,
+ Uint32 errCode, bool temporary){
+ jam();
+ SubRemoveRef * ref = (SubRemoveRef *)signal->getDataPtrSend();
+ ref->senderRef = reference();
+ ref->subscriptionId = req.subscriptionId;
+ ref->subscriptionKey = req.subscriptionKey;
+ ref->senderData = req.senderData;
+ ref->err = errCode;
+ if (temporary)
+ ref->setTemporary();
+ releaseSections(signal);
+ sendSignal(signal->getSendersBlockRef(), GSN_SUB_REMOVE_REF,
+ signal, SubRemoveRef::SignalLength, JBB);
+ return;
+}
+
+void
+SumaParticipant::Table::release(SumaParticipant & suma){
+ jam();
+
+ LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributes);
+ attrBuf.release();
+
+ LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, m_fragments);
+ fragBuf.release();
+}
+
+void
+SumaParticipant::SyncRecord::release(){
+ jam();
+ m_tableList.release();
+
+ LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributeList);
+ attrBuf.release();
+}
+
+
+/**************************************************************
+ *
+ * Restarting remote node functions, master functionality
+ * (slave does nothing special)
+ * - triggered on INCL_NODEREQ calling startNode
+ * - included node will issue START_ME when it's ready to start
+ * the subscribers
+ *
+ */
+
+Suma::Restart::Restart(Suma& s) : suma(s) {
+ for (int i = 0; i < MAX_REPLICAS; i++) {
+ c_okToStart[i] = false;
+ c_waitingToStart[i] = false;
+ }
+}
+
+void
+Suma::Restart::resetNode(Uint32 sumaRef)
+{
+ jam();
+ int I = suma.RtoI(sumaRef);
+ c_okToStart[I] = false;
+ c_waitingToStart[I] = false;
+}
+
+void
+Suma::Restart::startNode(Signal* signal, Uint32 sumaRef)
+{
+ jam();
+ resetNode(sumaRef);
+
+ // right now we can only handle restarting one node
+ // at a time in a node group
+
+ createSubscription(signal, sumaRef);
+}
+
+void
+Suma::Restart::createSubscription(Signal* signal, Uint32 sumaRef) {
+ jam();
+ suma.c_subscriptions.first(c_subPtr);
+ nextSubscription(signal, sumaRef);
+}
+
+void
+Suma::Restart::nextSubscription(Signal* signal, Uint32 sumaRef) {
+ jam();
+ if (c_subPtr.isNull()) {
+ jam();
+ completeSubscription(signal, sumaRef);
+ return;
+ }
+ SubscriptionPtr subPtr;
+ subPtr.i = c_subPtr.curr.i;
+ subPtr.p = suma.c_subscriptions.getPtr(subPtr.i);
+
+ suma.c_subscriptions.next(c_subPtr);
+
+ SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend();
+
+ req->subscriberRef = suma.reference();
+ req->subscriberData = subPtr.i;
+ req->subscriptionId = subPtr.p->m_subscriptionId;
+ req->subscriptionKey = subPtr.p->m_subscriptionKey;
+ req->subscriptionType = subPtr.p->m_subscriptionType |
+ SubCreateReq::RestartFlag;
+
+ switch (subPtr.p->m_subscriptionType) {
+ case SubCreateReq::TableEvent:
+ case SubCreateReq::SelectiveTableSnapshot:
+ case SubCreateReq::DatabaseSnapshot: {
+ jam();
+
+ Ptr<SyncRecord> syncPtr;
+ suma.c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
+ syncPtr.p->m_tableList.first(syncPtr.p->m_tableList_it);
+
+ ndbrequire(!syncPtr.p->m_tableList_it.isNull());
+
+ req->tableId = *syncPtr.p->m_tableList_it.data;
+
+#if 0
+ for (int i = 0; i < MAX_TABLES; i++)
+ if (subPtr.p->m_tables[i]) {
+ req->tableId = i;
+ break;
+ }
+#endif
+
+ suma.sendSignal(sumaRef, GSN_SUB_CREATE_REQ, signal,
+ SubCreateReq::SignalLength+1 /*to get table Id*/, JBB);
+ return;
+ }
+ case SubCreateReq::SingleTableScan :
+ // TODO
+ jam();
+ return;
+ }
+ ndbrequire(false);
+}
+
+void
+Suma::execSUB_CREATE_CONF(Signal* signal) {
+ jamEntry();
+#ifdef NODEFAIL_DEBUG
+ ndbout_c("Suma::execSUB_CREATE_CONF");
+#endif
+
+ const Uint32 senderRef = signal->senderBlockRef();
+
+ SubCreateConf * const conf = (SubCreateConf *)signal->getDataPtr();
+
+ Subscription key;
+ const Uint32 subscriberData = conf->subscriberData;
+ key.m_subscriptionId = conf->subscriptionId;
+ key.m_subscriptionKey = conf->subscriptionKey;
+
+ SubscriptionPtr subPtr;
+ ndbrequire(c_subscriptions.find(subPtr, key));
+
+ switch(subPtr.p->m_subscriptionType) {
+ case SubCreateReq::TableEvent:
+ case SubCreateReq::SelectiveTableSnapshot:
+ case SubCreateReq::DatabaseSnapshot:
+ {
+ Ptr<SyncRecord> syncPtr;
+ c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
+
+ syncPtr.p->m_tableList.next(syncPtr.p->m_tableList_it);
+ if (syncPtr.p->m_tableList_it.isNull()) {
+ jam();
+ SubSyncReq *req = (SubSyncReq *)signal->getDataPtrSend();
+
+ req->subscriptionId = key.m_subscriptionId;
+ req->subscriptionKey = key.m_subscriptionKey;
+ req->subscriberData = subscriberData;
+ req->part = (Uint32) SubscriptionData::MetaData;
+
+ sendSignal(senderRef, GSN_SUB_SYNC_REQ, signal,
+ SubSyncReq::SignalLength, JBB);
+ } else {
+ jam();
+ SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend();
+
+ req->subscriberRef = reference();
+ req->subscriberData = subPtr.i;
+ req->subscriptionId = subPtr.p->m_subscriptionId;
+ req->subscriptionKey = subPtr.p->m_subscriptionKey;
+ req->subscriptionType = subPtr.p->m_subscriptionType |
+ SubCreateReq::RestartFlag |
+ SubCreateReq::AddTableFlag;
+
+ req->tableId = *syncPtr.p->m_tableList_it.data;
+
+ sendSignal(senderRef, GSN_SUB_CREATE_REQ, signal,
+ SubCreateReq::SignalLength+1 /*to get table Id*/, JBB);
+ }
+ }
+ return;
+ case SubCreateReq::SingleTableScan:
+ ndbrequire(false);
+ }
+ ndbrequire(false);
+}
+
+void
+Suma::execSUB_CREATE_REF(Signal* signal) {
+ jamEntry();
+#ifdef NODEFAIL_DEBUG
+ ndbout_c("Suma::execSUB_CREATE_REF");
+#endif
+ //ndbrequire(false);
+}
+
+void
+Suma::execSUB_SYNC_CONF(Signal* signal) {
+ jamEntry();
+#ifdef NODEFAIL_DEBUG
+ ndbout_c("Suma::execSUB_SYNC_CONF");
+#endif
+ Uint32 sumaRef = signal->getSendersBlockRef();
+
+ SubSyncConf *conf = (SubSyncConf *)signal->getDataPtr();
+ Subscription key;
+
+ key.m_subscriptionId = conf->subscriptionId;
+ key.m_subscriptionKey = conf->subscriptionKey;
+ // SubscriptionData::Part part = (SubscriptionData::Part)conf->part;
+ // const Uint32 subscriberData = conf->subscriberData;
+
+ SubscriptionPtr subPtr;
+ c_subscriptions.find(subPtr, key);
+
+ switch(subPtr.p->m_subscriptionType) {
+ case SubCreateReq::TableEvent:
+ case SubCreateReq::SelectiveTableSnapshot:
+ case SubCreateReq::DatabaseSnapshot:
+ jam();
+ Restart.nextSubscription(signal, sumaRef);
+ return;
+ case SubCreateReq::SingleTableScan:
+ ndbrequire(false);
+ return;
+ }
+ ndbrequire(false);
+}
+
+void
+Suma::execSUB_SYNC_REF(Signal* signal) {
+ jamEntry();
+#ifdef NODEFAIL_DEBUG
+ ndbout_c("Suma::execSUB_SYNC_REF");
+#endif
+ //ndbrequire(false);
+}
+
+void
+Suma::execSUMA_START_ME(Signal* signal) {
+ jamEntry();
+#ifdef NODEFAIL_DEBUG
+ ndbout_c("Suma::execSUMA_START_ME");
+#endif
+
+ Restart.runSUMA_START_ME(signal, signal->getSendersBlockRef());
+}
+
+void
+Suma::Restart::runSUMA_START_ME(Signal* signal, Uint32 sumaRef) {
+ int I = suma.RtoI(sumaRef);
+
+ // restarting Suma is ready for SUB_START_REQ
+ if (c_waitingToStart[I]) {
+ // we've waited with startSubscriber since restarting suma was not ready
+ c_waitingToStart[I] = false;
+ startSubscriber(signal, sumaRef);
+ } else {
+ // do startSubscriber as soon as its time
+ c_okToStart[I] = true;
+ }
+}
+
+void
+Suma::Restart::completeSubscription(Signal* signal, Uint32 sumaRef) {
+ jam();
+ int I = suma.RtoI(sumaRef);
+
+ if (c_okToStart[I]) {// otherwise will start when START_ME comes
+ c_okToStart[I] = false;
+ startSubscriber(signal, sumaRef);
+ } else {
+ c_waitingToStart[I] = true;
+ }
+}
+
+void
+Suma::Restart::startSubscriber(Signal* signal, Uint32 sumaRef) {
+ jam();
+ suma.c_dataSubscribers.first(c_subbPtr);
+ nextSubscriber(signal, sumaRef);
+}
+
+void
+Suma::Restart::sendSubStartReq(SubscriptionPtr subPtr, SubscriberPtr subbPtr,
+ Signal* signal, Uint32 sumaRef)
+{
+ jam();
+ SubStartReq * req = (SubStartReq *)signal->getDataPtrSend();
+
+ req->senderRef = suma.reference();
+ req->senderData = subbPtr.p->m_senderData;
+ req->subscriptionId = subPtr.p->m_subscriptionId;
+ req->subscriptionKey = subPtr.p->m_subscriptionKey;
+ req->part = SubscriptionData::TableData;
+ req->subscriberData = subbPtr.p->m_subscriberData;
+ req->subscriberRef = subbPtr.p->m_subscriberRef;
+
+ // restarting suma will not respond to this until startphase 5
+ // since it is not until then data copying has been completed
+#ifdef NODEFAIL_DEBUG
+ ndbout_c("Suma::Restart::sendSubStartReq sending GSN_SUB_START_REQ id=%u key=%u",
+ req->subscriptionId, req->subscriptionKey);
+#endif
+ suma.sendSignal(sumaRef, GSN_SUB_START_REQ,
+ signal, SubStartReq::SignalLength2, JBB);
+}
+
+void
+Suma::execSUB_START_CONF(Signal* signal) {
+ jamEntry();
+#ifdef NODEFAIL_DEBUG
+ ndbout_c("Suma::execSUB_START_CONF");
+#endif
+ Uint32 sumaRef = signal->getSendersBlockRef();
+ Restart.nextSubscriber(signal, sumaRef);
+}
+
+void
+Suma::execSUB_START_REF(Signal* signal) {
+ jamEntry();
+#ifdef NODEFAIL_DEBUG
+ ndbout_c("Suma::execSUB_START_REF");
+#endif
+ //ndbrequire(false);
+}
+
+void
+Suma::Restart::nextSubscriber(Signal* signal, Uint32 sumaRef) {
+ jam();
+ if (c_subbPtr.isNull()) {
+ jam();
+ completeSubscriber(signal, sumaRef);
+ return;
+ }
+
+ SubscriberPtr subbPtr = c_subbPtr;
+ suma.c_dataSubscribers.next(c_subbPtr);
+
+ /*
+ * get subscription ptr for this subscriber
+ */
+
+ SubscriptionPtr subPtr;
+ suma.c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
+ switch (subPtr.p->m_subscriptionType) {
+ case SubCreateReq::TableEvent:
+ case SubCreateReq::SelectiveTableSnapshot:
+ case SubCreateReq::DatabaseSnapshot:
+ {
+ jam();
+ sendSubStartReq(subPtr, subbPtr, signal, sumaRef);
+#if 0
+ SubStartReq * req = (SubStartReq *)signal->getDataPtrSend();
+
+ req->senderRef = reference();
+ req->senderData = subbPtr.p->m_senderData;
+ req->subscriptionId = subPtr.p->m_subscriptionId;
+ req->subscriptionKey = subPtr.p->m_subscriptionKey;
+ req->part = SubscriptionData::TableData;
+ req->subscriberData = subbPtr.p->m_subscriberData;
+ req->subscriberRef = subbPtr.p->m_subscriberRef;
+
+ // restarting suma will not respond to this until startphase 5
+ // since it is not until then data copying has been completed
+#ifdef NODEFAIL_DEBUG
+ ndbout_c("Suma::nextSubscriber sending GSN_SUB_START_REQ id=%u key=%u",
+ req->subscriptionId, req->subscriptionKey);
+#endif
+ suma.sendSignal(sumaRef, GSN_SUB_START_REQ,
+ signal, SubStartReq::SignalLength2, JBB);
+#endif
+ }
+ return;
+ case SubCreateReq::SingleTableScan:
+ ndbrequire(false);
+ return;
+ }
+ ndbrequire(false);
+}
+
+void
+Suma::Restart::completeSubscriber(Signal* signal, Uint32 sumaRef) {
+ completeRestartingNode(signal, sumaRef);
+}
+
+void
+Suma::Restart::completeRestartingNode(Signal* signal, Uint32 sumaRef) {
+ jam();
+ SumaHandoverReq * req = (SumaHandoverReq *)signal->getDataPtrSend();
+
+ req->gci = suma.getFirstGCI(signal);
+
+ suma.sendSignal(sumaRef, GSN_SUMA_HANDOVER_REQ, signal,
+ SumaHandoverReq::SignalLength, JBB);
+}
+
+// only run on restarting suma
+
+void
+Suma::execSUMA_HANDOVER_REQ(Signal* signal)
+{
+ jamEntry();
+ // Uint32 sumaRef = signal->getSendersBlockRef();
+ SumaHandoverReq const * req = (SumaHandoverReq *)signal->getDataPtr();
+
+ Uint32 gci = req->gci;
+ Uint32 new_gci = getFirstGCI(signal);
+
+ if (new_gci > gci) {
+ gci = new_gci;
+ }
+
+ { // all recreated subscribers at restarting SUMA start at same GCI
+ SubscriberPtr subbPtr;
+ for(c_dataSubscribers.first(subbPtr);
+ !subbPtr.isNull();
+ c_dataSubscribers.next(subbPtr)){
+ subbPtr.p->m_firstGCI = gci;
+ }
+ }
+
+#ifdef NODEFAIL_DEBUG
+ ndbout_c("Suma::execSUMA_HANDOVER_REQ, gci = %u", gci);
+#endif
+
+ c_handoverToDo = false;
+ c_restartLock = false;
+ {
+#ifdef HANDOVER_DEBUG
+ int c = 0;
+#endif
+ for( int i = 0; i < NO_OF_BUCKETS; i++) {
+ jam();
+ if (getResponsibleSumaNodeId(i) == refToNode(reference())) {
+#ifdef HANDOVER_DEBUG
+ c++;
+#endif
+ jam();
+ c_buckets[i].active = false;
+ c_buckets[i].handoverGCI = gci;
+ c_buckets[i].handover = true;
+ c_buckets[i].handover_started = false;
+ c_handoverToDo = true;
+ }
+ }
+#ifdef HANDOVER_DEBUG
+ ndbout_c("prepared handover of bucket %u buckets", c);
+#endif
+ }
+
+ for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
+ jam();
+ Uint32 ref = calcSumaBlockRef(c_nodesInGroup[i]);
+ if (ref != reference()) {
+ jam();
+ sendSignal(ref, GSN_SUMA_HANDOVER_CONF, signal,
+ SumaHandoverConf::SignalLength, JBB);
+ }//if
+ }
+}
+
+// only run on all but restarting suma
+void
+Suma::execSUMA_HANDOVER_CONF(Signal* signal) {
+ jamEntry();
+ Uint32 sumaRef = signal->getSendersBlockRef();
+ SumaHandoverConf const * conf = (SumaHandoverConf *)signal->getDataPtr();
+
+ Uint32 gci = conf->gci;
+
+#ifdef HANDOVER_DEBUG
+ ndbout_c("Suma::execSUMA_HANDOVER_CONF, gci = %u", gci);
+#endif
+
+ /* TODO, if we are restarting several SUMA's (>2 in a nodegroup)
+ * we have to collect all these conf's before proceding
+ */
+
+ // restarting node is now prepared and ready
+ c_preparingNodes.clear(refToNode(sumaRef)); /* !! important to do before
+ * below since it affects
+ * getResponsibleSumaNodeId()
+ */
+
+ c_handoverToDo = false;
+ // mark all active buckets really belonging to restarting SUMA
+ for( int i = 0; i < NO_OF_BUCKETS; i++) {
+ if (c_buckets[i].active) {
+ // I'm running this bucket
+ if (getResponsibleSumaNodeId(i) == refToNode(sumaRef)) {
+ // but it should really be the restarted node
+ c_buckets[i].handoverGCI = gci;
+ c_buckets[i].handover = true;
+ c_buckets[i].handover_started = false;
+ c_handoverToDo = true;
+ }
+ }
+ }
+}
+
+template void append(DataBuffer<11>&,SegmentedSectionPtr,SectionSegmentPool&);
+