diff options
author | tomas@poseidon.ndb.mysql.com <> | 2005-08-23 12:16:16 +0200 |
---|---|---|
committer | tomas@poseidon.ndb.mysql.com <> | 2005-08-23 12:16:16 +0200 |
commit | 653e99dea2099a1d47c82ad8142c0ac3cf4c74cd (patch) | |
tree | 3db751368d871e126914b592709ad3b7e4fc678f /ndb | |
parent | 28e3fe8759fe36c5db7eeb0765ea921eb252dfce (diff) | |
download | mariadb-git-653e99dea2099a1d47c82ad8142c0ac3cf4c74cd.tar.gz |
removed ndb grep from configure .in
Diffstat (limited to 'ndb')
-rw-r--r-- | ndb/src/kernel/blocks/dblqh/redoLogReader/Makefile | 9 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/grep/Grep.cpp | 2010 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/grep/Grep.hpp | 535 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/grep/GrepInit.cpp | 164 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/grep/Makefile.am | 23 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/grep/systab_test/Makefile | 12 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/grep/systab_test/grep_systab_test.cpp | 138 | ||||
-rw-r--r-- | ndb/test/ndbapi/testGrep.cpp | 540 |
8 files changed, 0 insertions, 3431 deletions
diff --git a/ndb/src/kernel/blocks/dblqh/redoLogReader/Makefile b/ndb/src/kernel/blocks/dblqh/redoLogReader/Makefile deleted file mode 100644 index a89b648de77..00000000000 --- a/ndb/src/kernel/blocks/dblqh/redoLogReader/Makefile +++ /dev/null @@ -1,9 +0,0 @@ -include .defs.mk - -BIN_TARGET := redoLogFileReader - -SOURCES := records.cpp redoLogFileReader.cpp - -TYPE := util - -include $(NDB_TOP)/Epilogue.mk diff --git a/ndb/src/kernel/blocks/grep/Grep.cpp b/ndb/src/kernel/blocks/grep/Grep.cpp deleted file mode 100644 index e89361dab06..00000000000 --- a/ndb/src/kernel/blocks/grep/Grep.cpp +++ /dev/null @@ -1,2010 +0,0 @@ -/* Copyright (C) 2003 MySQL AB - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 2 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ - -#include "Grep.hpp" -#include <ndb_version.h> - -#include <NdbTCP.h> -#include <Bitmask.hpp> - -#include <signaldata/NodeFailRep.hpp> -#include <signaldata/ReadNodesConf.hpp> -#include <signaldata/CheckNodeGroups.hpp> -#include <signaldata/GrepImpl.hpp> -#include <signaldata/RepImpl.hpp> -#include <signaldata/EventReport.hpp> -#include <signaldata/DictTabInfo.hpp> -#include <signaldata/GetTabInfo.hpp> -#include <signaldata/WaitGCP.hpp> -#include <GrepEvent.hpp> -#include <AttributeHeader.hpp> - -#define CONTINUEB_DELAY 500 -#define SSREPBLOCKNO 2 -#define PSREPBLOCKNO 2 - -//#define DEBUG_GREP -//#define DEBUG_GREP_SUBSCRIPTION -//#define DEBUG_GREP_TRANSFER -//#define DEBUG_GREP_APPLY -//#define DEBUG_GREP_DELETE - -/************************************************************************** - * ------------------------------------------------------------------------ - * MODULE: STARTUP of GREP Block, etc - * ------------------------------------------------------------------------ - **************************************************************************/ -static Uint32 g_TypeOfStart = NodeState::ST_ILLEGAL_TYPE; -void -Grep::getNodeGroupMembers(Signal* signal) { - jam(); - /** - * Ask DIH for nodeGroupMembers - */ - CheckNodeGroups * sd = (CheckNodeGroups*)signal->getDataPtrSend(); - sd->blockRef = reference(); - sd->requestType = - CheckNodeGroups::Direct | - CheckNodeGroups::GetNodeGroupMembers; - sd->nodeId = getOwnNodeId(); - EXECUTE_DIRECT(DBDIH, GSN_CHECKNODEGROUPSREQ, signal, - CheckNodeGroups::SignalLength); - jamEntry(); - - c_nodeGroup = sd->output; - c_noNodesInGroup = 0; - for (int i = 0; i < MAX_NDB_NODES; i++) { - if (sd->mask.get(i)) { - if (i == getOwnNodeId()) c_idInNodeGroup = c_noNodesInGroup; - c_nodesInGroup[c_noNodesInGroup] = i; - c_noNodesInGroup++; - } - } - ndbrequire(c_noNodesInGroup > 0); // at least 1 node in the nodegroup - -#ifdef NODEFAIL_DEBUG - for (Uint32 i = 0; i < c_noNodesInGroup; i++) { - ndbout_c ("Grep: NodeGroup %u, me %u, me in group %u, member[%u] %u", - c_nodeGroup, getOwnNodeId(), c_idInNodeGroup, - i, c_nodesInGroup[i]); - } -#endif -} - - -void -Grep::execSTTOR(Signal* signal) -{ - jamEntry(); - const Uint32 startphase = signal->theData[1]; - const Uint32 typeOfStart = signal->theData[7]; - if (startphase == 3) - { - jam(); - signal->theData[0] = reference(); - g_TypeOfStart = typeOfStart; - sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB); - return; - } - if(startphase == 5) { - jam(); - /** - * we don't want any log/meta records comming to use - * until we are done with the recovery. - */ - if (g_TypeOfStart == NodeState::ST_NODE_RESTART) { - jam(); - pspart.m_recoveryMode = true; - getNodeGroupMembers(signal); - for (Uint32 i = 0; i < c_noNodesInGroup; i++) { - Uint32 ref =numberToRef(GREP, c_nodesInGroup[i]); - if (ref != reference()) - sendSignal(ref, GSN_GREP_START_ME, signal, - 1 /*SumaStartMe::SignalLength*/, JBB); - } - } else pspart.m_recoveryMode = false; - - } - - if(startphase == 7) { - jam(); - if (g_TypeOfStart == NodeState::ST_NODE_RESTART) { - pspart.m_recoveryMode = false; - } - } - - sendSTTORRY(signal); -} - - -void -Grep::PSPart::execSTART_ME(Signal* signal) -{ - jamEntry(); - GrepStartMe * me =(GrepStartMe*)signal->getDataPtr(); - BlockReference ref = me->senderRef; - GrepAddSubReq* const addReq = (GrepAddSubReq *)signal->getDataPtr(); - - - SubscriptionPtr subPtr; - c_subscriptions.first(c_subPtr); - for(; !c_subPtr.isNull(); c_subscriptions.next(c_subPtr)) { - jam(); - subPtr.i = c_subPtr.curr.i; - subPtr.p = c_subscriptions.getPtr(subPtr.i); - addReq->subscriptionId = subPtr.p->m_subscriptionId; - addReq->subscriptionKey = subPtr.p->m_subscriptionKey; - addReq->subscriberData = subPtr.p->m_subscriberData; - addReq->subscriptionType = subPtr.p->m_subscriptionType; - addReq->senderRef = subPtr.p->m_coordinatorRef; - addReq->subscriberRef =subPtr.p->m_subscriberRef; - - sendSignal(ref, - GSN_GREP_ADD_SUB_REQ, - signal, - GrepAddSubReq::SignalLength, - JBB); - } - - addReq->subscriptionId = 0; - addReq->subscriptionKey = 0; - addReq->subscriberData = 0; - addReq->subscriptionType = 0; - addReq->senderRef = 0; - addReq->subscriberRef = 0; - - sendSignal(ref, - GSN_GREP_ADD_SUB_REQ, - signal, - GrepAddSubReq::SignalLength, - JBB); -} - -void -Grep::PSPart::execGREP_ADD_SUB_REQ(Signal* signal) -{ - jamEntry(); - GrepAddSubReq * const grepReq = (GrepAddSubReq *)signal->getDataPtr(); - const Uint32 subId = grepReq->subscriptionId; - const Uint32 subKey = grepReq->subscriptionKey; - const Uint32 subData = grepReq->subscriberData; - const Uint32 subType = grepReq->subscriptionType; - const Uint32 coordinatorRef = grepReq->senderRef; - - /** - * this is ref to the REP node for this subscription. - */ - const Uint32 subRef = grepReq->subscriberRef; - - if(subId!=0 && subKey!=0) { - jam(); - SubscriptionPtr subPtr; - ndbrequire( c_subscriptionPool.seize(subPtr)); - subPtr.p->m_coordinatorRef = coordinatorRef; - subPtr.p->m_subscriptionId = subId; - subPtr.p->m_subscriptionKey = subKey; - subPtr.p->m_subscriberRef = subRef; - subPtr.p->m_subscriberData = subData; - subPtr.p->m_subscriptionType = subType; - - c_subscriptions.add(subPtr); - } - else { - jam(); - GrepAddSubConf * conf = (GrepAddSubConf *)grepReq; - conf->noOfSub = - c_subscriptionPool.getSize()-c_subscriptionPool.getNoOfFree(); - sendSignal(signal->getSendersBlockRef(), - GSN_GREP_ADD_SUB_CONF, - signal, - GrepAddSubConf::SignalLength, - JBB); - } -} - -void -Grep::PSPart::execGREP_ADD_SUB_REF(Signal* signal) -{ - /** - * @todo fix error stuff - */ -} - -void -Grep::PSPart::execGREP_ADD_SUB_CONF(Signal* signal) -{ - jamEntry(); - GrepAddSubConf* const conf = (GrepAddSubConf *)signal->getDataPtr(); - Uint32 noOfSubscriptions = conf->noOfSub; - Uint32 noOfRestoredSubscriptions = - c_subscriptionPool.getSize()-c_subscriptionPool.getNoOfFree(); - if(noOfSubscriptions!=noOfRestoredSubscriptions) { - jam(); - /** - *@todo send ref signal - */ - ndbrequire(false); - } -} - -void -Grep::execREAD_NODESCONF(Signal* signal) -{ - jamEntry(); - ReadNodesConf * conf = (ReadNodesConf *)signal->getDataPtr(); - -#if 0 - ndbout_c("Grep: Recd READ_NODESCONF"); -#endif - - /****************************** - * Check which REP nodes exist - ******************************/ - Uint32 i; - for (i = 1; i < MAX_NODES; i++) - { - jam(); -#if 0 - ndbout_c("Grep: Found node %d of type %d", i, getNodeInfo(i).getType()); -#endif - if (getNodeInfo(i).getType() == NodeInfo::REP) - { - jam(); - /** - * @todo This should work for more than ONE rep node! - */ - pscoord.m_repRef = numberToRef(PSREPBLOCKNO, i); - pspart.m_repRef = numberToRef(PSREPBLOCKNO, i); -#if 0 - ndbout_c("Grep: REP node %d detected", i); -#endif - } - } - - /***************************** - * Check which DB nodes exist - *****************************/ - m_aliveNodes.clear(); - - Uint32 count = 0; - for(i = 0; i<MAX_NDB_NODES; i++) - { - if (NodeBitmask::get(conf->allNodes, i)) - { - jam(); - count++; - - NodePtr node; - ndbrequire(m_nodes.seize(node)); - - node.p->nodeId = i; - if (NodeBitmask::get(conf->inactiveNodes, i)) - { - node.p->alive = 0; - } - else - { - node.p->alive = 1; - m_aliveNodes.set(i); - } - } - } - m_masterNodeId = conf->masterNodeId; - ndbrequire(count == conf->noOfNodes); - sendSTTORRY(signal); -} - -void -Grep::sendSTTORRY(Signal* signal) -{ - signal->theData[0] = 0; - signal->theData[3] = 1; - signal->theData[4] = 3; - signal->theData[5] = 5; - signal->theData[6] = 7; - signal->theData[7] = 255; // No more start phases from missra - sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 8, JBB); -} - -void -Grep::execNDB_STTOR(Signal* signal) -{ - jamEntry(); -} - -void -Grep::execDUMP_STATE_ORD(Signal* signal) -{ - jamEntry(); - //Uint32 tCase = signal->theData[0]; - -#if 0 - if(sscoord.m_repRef == 0) - { - ndbout << "Grep: Recd DUMP signal but has no connection with REP node" - << endl; - return; - } -#endif - - /* - switch (tCase) - { - case 8100: sscoord.grepReq(signal, GrepReq::START_SUBSCR); break; - case 8102: sscoord.grepReq(signal, GrepReq::START_METALOG); break; - case 8104: sscoord.grepReq(signal, GrepReq::START_METASCAN); break; - case 8106: sscoord.grepReq(signal, GrepReq::START_DATALOG); break; - case 8108: sscoord.grepReq(signal, GrepReq::START_DATASCAN); break; - case 8110: sscoord.grepReq(signal, GrepReq::STOP_SUBSCR); break; - case 8500: sscoord.grepReq(signal, GrepReq::REMOVE_BUFFERS); break; - case 8300: sscoord.grepReq(signal, GrepReq::SLOWSTOP); break; - case 8400: sscoord.grepReq(signal, GrepReq::FASTSTOP); break; - case 8600: sscoord.grepReq(signal, GrepReq::CREATE_SUBSCR); break; - case 8700: sscoord.dropTable(signal,(Uint32)signal->theData[1]);break; - default: break; - } - */ -} - -/** - * Signal received when REP node has failed - */ -void -Grep::execAPI_FAILREQ(Signal* signal) -{ - jamEntry(); - //Uint32 failedApiNode = signal->theData[0]; - //BlockReference retRef = signal->theData[1]; - - /** - * @todo We should probably do something smart if the - * PS REP node fails???? /Lars - */ - -#if 0 - ndbout_c("Grep: API_FAILREQ received for API node %d.", failedApiNode); -#endif - - /** - * @note This signal received is NOT allowed to send any CONF - * signal, since this would screw up TC/DICT to API - * "connections". - */ -} - -/************************************************************************** - * ------------------------------------------------------------------------ - * MODULE: GREP Control - * ------------------------------------------------------------------------ - **************************************************************************/ -void -Grep::execGREP_REQ(Signal* signal) -{ - jamEntry(); - - //GrepReq * req = (GrepReq *)signal->getDataPtr(); - - /** - * @todo Fix so that request is redirected to REP Server - * Obsolete? - * Was: sscoord.grepReq(signal, req->request); - */ - ndbout_c("Warning! REP commands can only be executed at REP SERVER prompt!"); -} - - -/************************************************************************** - * ------------------------------------------------------------------------ - * MODULE: NODE STATE HANDLING - * ------------------------------------------------------------------------ - **************************************************************************/ -void -Grep::execNODE_FAILREP(Signal* signal) -{ - jamEntry(); - NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr(); - bool changed = false; - - NodePtr nodePtr; - for(m_nodes.first(nodePtr); nodePtr.i != RNIL; m_nodes.next(nodePtr)) - { - jam(); - if (NodeBitmask::get(rep->theNodes, nodePtr.p->nodeId)) - { - jam(); - - if (nodePtr.p->alive) - { - jam(); - ndbassert(m_aliveNodes.get(nodePtr.p->nodeId)); - changed = true; - } - else - { - ndbassert(!m_aliveNodes.get(nodePtr.p->nodeId)); - } - - nodePtr.p->alive = 0; - m_aliveNodes.clear(nodePtr.p->nodeId); - } - } - - - /** - * Problem: Fix a node failure running a protocol - * - * 1. Coordinator node of a protocol dies - * - Elect a new coordinator - * - send ref to user - * - * 2. Non-coordinator dies. - * - make coordinator aware of this - * so that coordinator does not wait for - * conf from faulty node - * - node recovery will restore the non-coordinator. - * - */ -} - -void -Grep::execINCL_NODEREQ(Signal* signal) -{ - jamEntry(); - - //const Uint32 senderRef = signal->theData[0]; - const Uint32 inclNode = signal->theData[1]; - - NodePtr node; - for(m_nodes.first(node); node.i != RNIL; m_nodes.next(node)) - { - jam(); - const Uint32 nodeId = node.p->nodeId; - if (inclNode == nodeId) { - jam(); - - ndbrequire(node.p->alive == 0); - ndbassert(!m_aliveNodes.get(nodeId)); - - node.p->alive = 1; - m_aliveNodes.set(nodeId); - - break; - } - } - - /** - * @todo: if we include this DIH's got to be prepared, later if needed... - */ -#if 0 - signal->theData[0] = reference(); - - sendSignal(senderRef, GSN_INCL_NODECONF, signal, 1, JBB); -#endif -} - - -/** - * Helper methods - */ -void -Grep::PSCoord::prepareOperationRec(SubCoordinatorPtr subPtr, - BlockReference subscriber, - Uint32 subId, - Uint32 subKey, - Uint32 request) -{ - subPtr.p->m_coordinatorRef = reference(); - subPtr.p->m_subscriberRef = subscriber; - subPtr.p->m_subscriberData = subPtr.i; - subPtr.p->m_subscriptionId = subId; - subPtr.p->m_subscriptionKey = subKey; - subPtr.p->m_outstandingRequest = request; -} - - -/************************************************************************** - * ------------------------------------------------------------------------ - * MODULE: CREATE SUBSCRIPTION ID - * ------------------------------------------------------------------------ - * - * Requests SUMA to create a unique subscription id - **************************************************************************/ - -void -Grep::PSCoord::execGREP_CREATE_SUBID_REQ(Signal* signal) -{ - jamEntry(); - - CreateSubscriptionIdReq * req = - (CreateSubscriptionIdReq*)signal->getDataPtr(); - BlockReference ref = signal->getSendersBlockRef(); - - SubCoordinatorPtr subPtr; - if( !c_subCoordinatorPool.seize(subPtr)) { - jam(); - SubCoordinator sub; - sub.m_subscriberRef = ref; - sub.m_subscriptionId = 0; - sub.m_subscriptionKey = 0; - sendRefToSS(signal, sub, GrepError::SUBSCRIPTION_ID_NOMEM ); - return; - } - prepareOperationRec(subPtr, - ref, - 0,0, - GSN_CREATE_SUBID_REQ); - - - ndbout_c("SUBID_REQ Ref %d",ref); - req->senderData=subPtr.p->m_subscriberData; - - sendSignal(SUMA_REF, GSN_CREATE_SUBID_REQ, signal, - SubCreateReq::SignalLength, JBB); - -#if 1 //def DEBUG_GREP_SUBSCRIPTION - ndbout_c("Grep::PSCoord: Sent CREATE_SUBID_REQ to SUMA"); -#endif -} - -void -Grep::PSCoord::execCREATE_SUBID_CONF(Signal* signal) -{ - jamEntry(); - CreateSubscriptionIdConf const * conf = - (CreateSubscriptionIdConf *)signal->getDataPtr(); - Uint32 subId = conf->subscriptionId; - Uint32 subKey = conf->subscriptionKey; - Uint32 subData = conf->subscriberData; - -#if 1 //def DEBUG_GREP_SUBSCRIPTION - ndbout_c("Grep::PSCoord: Recd GREP_SUBID_CONF (subId:%d, subKey:%d)", - subId, subKey); -#endif - - SubCoordinatorPtr subPtr; - c_subCoordinatorPool.getPtr(subPtr, subData); - BlockReference repRef = subPtr.p->m_subscriberRef; - - { // Check that id/key is unique - SubCoordinator key; - SubCoordinatorPtr tmp; - key.m_subscriptionId = subId; - key.m_subscriptionKey = subKey; - if(c_runningSubscriptions.find(tmp, key)){ - jam(); - SubCoordinator sub; - sub.m_subscriberRef=repRef; - sub.m_subscriptionId = subId; - sub.m_subscriptionKey = subKey; - sendRefToSS(signal,sub, GrepError::SUBSCRIPTION_ID_NOT_UNIQUE ); - return; - } - } - - sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_CREATE_SUBID_CONF, signal, - CreateSubscriptionIdConf::SignalLength, JBB); - c_subCoordinatorPool.release(subData); - - m_grep->sendEventRep(signal, - EventReport::GrepSubscriptionInfo, - GrepEvent::GrepPS_CreateSubIdConf, - subId, - subKey, - (Uint32)GrepError::GE_NO_ERROR); -} - -void -Grep::PSCoord::execCREATE_SUBID_REF(Signal* signal) { - jamEntry(); - CreateSubscriptionIdRef const * ref = - (CreateSubscriptionIdRef *)signal->getDataPtr(); - Uint32 subData = ref->subscriberData; - GrepError::GE_Code err; - - Uint32 sendersBlockRef = signal->getSendersBlockRef(); - if(sendersBlockRef == SUMA_REF) - { - jam(); - err = GrepError::SUBSCRIPTION_ID_SUMA_FAILED_CREATE; - } else { - jam(); - ndbrequire(false); /* Added since errorcode err unhandled - * TODO: fix correct errorcode - */ - err= GrepError::GE_NO_ERROR; // remove compiler warning - } - - SubCoordinatorPtr subPtr; - c_runningSubscriptions.getPtr(subPtr, subData); - BlockReference repref = subPtr.p->m_subscriberRef; - - SubCoordinator sub; - sub.m_subscriberRef = repref; - sub.m_subscriptionId = 0; - sub.m_subscriptionKey = 0; - sendRefToSS(signal,sub, err); - -} - - -/************************************************************************** - * ------------------------------------------------------------------------ - * MODULE: CREATE SUBSCRIPTION - * ------------------------------------------------------------------------ - * - * Creates a subscription for every GREP to its local SUMA. - * GREP node that executes createSubscription becomes the GREP Coord. - **************************************************************************/ - -/** - * Request to create a subscription (sent from SS) - */ -void -Grep::PSCoord::execGREP_SUB_CREATE_REQ(Signal* signal) -{ - jamEntry(); - GrepSubCreateReq const * grepReq = (GrepSubCreateReq *)signal->getDataPtr(); - Uint32 subId = grepReq->subscriptionId; - Uint32 subKey = grepReq->subscriptionKey; - Uint32 subType = grepReq->subscriptionType; - BlockReference rep = signal->getSendersBlockRef(); - - GrepCreateReq * req =(GrepCreateReq*)grepReq; - - SubCoordinatorPtr subPtr; - - if( !c_subCoordinatorPool.seize(subPtr)) { - jam(); - SubCoordinator sub; - sub.m_subscriberRef = rep; - sub.m_subscriptionId = 0; - sub.m_subscriptionKey = 0; - sub.m_outstandingRequest = GSN_GREP_CREATE_REQ; - sendRefToSS(signal, sub, GrepError::NOSPACE_IN_POOL); - return; - } - prepareOperationRec(subPtr, - numberToRef(PSREPBLOCKNO, refToNode(rep)), subId, subKey, - GSN_GREP_CREATE_REQ); - - /* Get the payload of the signal. - */ - SegmentedSectionPtr selectedTablesPtr; - if(subType == SubCreateReq::SelectiveTableSnapshot) { - jam(); - ndbrequire(signal->getNoOfSections()==1); - signal->getSection(selectedTablesPtr,0); - signal->header.m_noOfSections = 0; - } - /** - * Prepare the signal to be sent to Grep participatns - */ - subPtr.p->m_subscriptionType = subType; - req->senderRef = reference(); - req->subscriberRef = numberToRef(PSREPBLOCKNO, refToNode(rep)); - req->subscriberData = subPtr.p->m_subscriberData; - req->subscriptionId = subId; - req->subscriptionKey = subKey; - req->subscriptionType = subType; - - /*add payload if it is a selectivetablesnap*/ - if(subType == SubCreateReq::SelectiveTableSnapshot) { - jam(); - signal->setSection(selectedTablesPtr, 0); - } - - /****************************** - * Send to all PS participants - ******************************/ - NodeReceiverGroup rg(GREP, m_grep->m_aliveNodes); - subPtr.p->m_outstandingParticipants = rg; - sendSignal(rg, - GSN_GREP_CREATE_REQ, signal, - GrepCreateReq::SignalLength, JBB); - - -#ifdef DEBUG_GREP_SUBSCRIPTION - ndbout_c("Grep::PSCoord: Sent GREP_CREATE_REQ " - "(subId:%d, subKey:%d, subData:%d, subType:%d) to parts", - subId, subKey, subPtr.p->m_subscriberData, subType); -#endif -} - -void -Grep::PSPart::execGREP_CREATE_REQ(Signal* signal) -{ - jamEntry(); - GrepCreateReq * const grepReq = (GrepCreateReq *)signal->getDataPtr(); - const Uint32 subId = grepReq->subscriptionId; - const Uint32 subKey = grepReq->subscriptionKey; - const Uint32 subData = grepReq->subscriberData; - const Uint32 subType = grepReq->subscriptionType; - const Uint32 coordinatorRef = grepReq->senderRef; - const Uint32 subRef = grepReq->subscriberRef; //this is ref to the - //REP node for this - //subscription. - - SubscriptionPtr subPtr; - ndbrequire( c_subscriptionPool.seize(subPtr)); - subPtr.p->m_coordinatorRef = coordinatorRef; - subPtr.p->m_subscriptionId = subId; - subPtr.p->m_subscriptionKey = subKey; - subPtr.p->m_subscriberRef = subRef; - subPtr.p->m_subscriberData = subPtr.i; - subPtr.p->m_subscriptionType = subType; - subPtr.p->m_outstandingRequest = GSN_GREP_CREATE_REQ; - subPtr.p->m_operationPtrI = subData; - - c_subscriptions.add(subPtr); - - SegmentedSectionPtr selectedTablesPtr; - if(subType == SubCreateReq::SelectiveTableSnapshot) { - jam(); - ndbrequire(signal->getNoOfSections()==1); - signal->getSection(selectedTablesPtr,0);// SubCreateReq::TABLE_LIST); - signal->header.m_noOfSections = 0; - } - - /** - * Prepare signal to be sent to SUMA - */ - SubCreateReq * sumaReq = (SubCreateReq *)grepReq; - sumaReq->subscriberRef = GREP_REF; - sumaReq->subscriberData = subPtr.p->m_subscriberData; - sumaReq->subscriptionId = subPtr.p->m_subscriptionId; - sumaReq->subscriptionKey = subPtr.p->m_subscriptionKey; - sumaReq->subscriptionType = subPtr.p->m_subscriptionType; - /*add payload if it is a selectivetablesnap*/ - if(subType == SubCreateReq::SelectiveTableSnapshot) { - jam(); - signal->setSection(selectedTablesPtr, 0); - } - sendSignal(SUMA_REF, - GSN_SUB_CREATE_REQ, - signal, - SubCreateReq::SignalLength, - JBB); -} - -void -Grep::PSPart::execSUB_CREATE_CONF(Signal* signal) -{ - jamEntry(); - - SubCreateConf * const conf = (SubCreateConf *)signal->getDataPtr(); - Uint32 subData = conf->subscriberData; - - SubscriptionPtr subPtr; - c_subscriptions.getPtr(subPtr, subData); - /** - @todo check why this can fuck up -johan - - ndbrequire(subPtr.p->m_subscriptionId == conf->subscriptionId); - ndbrequire(subPtr.p->m_subscriptionKey == conf->subscriptionKey); - */ -#ifdef DEBUG_GREP_SUBSCRIPTION - ndbout_c("Grep::PSPart: Recd SUB_CREATE_CONF " - "(subId:%d, subKey:%d) from SUMA", - conf->subscriptionId, conf->subscriptionKey); -#endif - - /********************* - * Send conf to coord - *********************/ - GrepCreateConf * grepConf = (GrepCreateConf*)conf; - grepConf->senderNodeId = getOwnNodeId(); - grepConf->senderData = subPtr.p->m_operationPtrI; - sendSignal(subPtr.p->m_coordinatorRef, GSN_GREP_CREATE_CONF, signal, - GrepCreateConf::SignalLength, JBB); - subPtr.p->m_outstandingRequest = 0; -} - -/** - * Handle errors that either occured in: - * 1) PSPart - * or - * 2) propagated from local SUMA - */ -void -Grep::PSPart::execSUB_CREATE_REF(Signal* signal) -{ - jamEntry(); - SubCreateRef * const ref = (SubCreateRef *)signal->getDataPtr(); - Uint32 subData = ref->subscriberData; - GrepError::GE_Code err = (GrepError::GE_Code)ref->err; - SubscriptionPtr subPtr; - c_subscriptions.getPtr(subPtr, subData); - sendRefToPSCoord(signal, *subPtr.p, err /*error*/); - subPtr.p->m_outstandingRequest = 0; -} - -void -Grep::PSCoord::execGREP_CREATE_CONF(Signal* signal) -{ - jamEntry(); - GrepCreateConf const * conf = (GrepCreateConf *)signal->getDataPtr(); - Uint32 subData = conf->senderData; - Uint32 nodeId = conf->senderNodeId; - - SubCoordinatorPtr subPtr; - c_subCoordinatorPool.getPtr(subPtr, subData); - - ndbrequire(subPtr.p->m_outstandingRequest == GSN_GREP_CREATE_REQ); - - subPtr.p->m_outstandingParticipants.clearWaitingFor(nodeId); - - if(!subPtr.p->m_outstandingParticipants.done()) return; - /******************************** - * All participants have CONF:ed - ********************************/ - Uint32 subId = subPtr.p->m_subscriptionId; - Uint32 subKey = subPtr.p->m_subscriptionKey; - - GrepSubCreateConf * grepConf = (GrepSubCreateConf *)signal->getDataPtr(); - grepConf->subscriptionId = subId; - grepConf->subscriptionKey = subKey; - sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_SUB_CREATE_CONF, signal, - GrepSubCreateConf::SignalLength, JBB); - - /** - * Send event report - */ - m_grep->sendEventRep(signal, - EventReport::GrepSubscriptionInfo, - GrepEvent::GrepPS_SubCreateConf, - subId, - subKey, - (Uint32)GrepError::GE_NO_ERROR); - - c_subCoordinatorPool.release(subPtr); - -} - -/** - * Handle errors that either occured in: - * 1) PSCoord - * or - * 2) propagated from PSPart - */ -void -Grep::PSCoord::execGREP_CREATE_REF(Signal* signal) -{ - jamEntry(); - GrepCreateRef * const ref = (GrepCreateRef *)signal->getDataPtr(); - Uint32 subData = ref->senderData; - Uint32 err = ref->err; - SubCoordinatorPtr subPtr; - c_runningSubscriptions.getPtr(subPtr, subData); - - sendRefToSS(signal, *subPtr.p, (GrepError::GE_Code)err /*error*/); -} - - -/************************************************************************** - * ------------------------------------------------------------------------ - * MODULE: START SUBSCRIPTION - * ------------------------------------------------------------------------ - * - * Starts a subscription at SUMA. - * Each participant starts its own subscription. - **************************************************************************/ - -/** - * Request to start subscription (Sent from SS) - */ -void -Grep::PSCoord::execGREP_SUB_START_REQ(Signal* signal) -{ - jamEntry(); - GrepSubStartReq * const subReq = (GrepSubStartReq *)signal->getDataPtr(); - SubscriptionData::Part part = (SubscriptionData::Part) subReq->part; - Uint32 subId = subReq->subscriptionId; - Uint32 subKey = subReq->subscriptionKey; - BlockReference rep = signal->getSendersBlockRef(); - - SubCoordinatorPtr subPtr; - - if(!c_subCoordinatorPool.seize(subPtr)) { - jam(); - SubCoordinator sub; - sub.m_subscriberRef = rep; - sub.m_subscriptionId = 0; - sub.m_subscriptionKey = 0; - sub.m_outstandingRequest = GSN_GREP_START_REQ; - sendRefToSS(signal, sub, GrepError::NOSPACE_IN_POOL); - return; - } - - prepareOperationRec(subPtr, - numberToRef(PSREPBLOCKNO, refToNode(rep)), - subId, subKey, - GSN_GREP_START_REQ); - - GrepStartReq * const req = (GrepStartReq *) subReq; - req->part = (Uint32) part; - req->subscriptionId = subPtr.p->m_subscriptionId; - req->subscriptionKey = subPtr.p->m_subscriptionKey; - req->senderData = subPtr.p->m_subscriberData; - - /*************************** - * Send to all participants - ***************************/ - NodeReceiverGroup rg(GREP, m_grep->m_aliveNodes); - subPtr.p->m_outstandingParticipants = rg; - sendSignal(rg, - GSN_GREP_START_REQ, - signal, - GrepStartReq::SignalLength, JBB); - -#ifdef DEBUG_GREP_SUBSCRIPTION - ndbout_c("Grep::PSCoord: Sent GREP_START_REQ " - "(subId:%d, subKey:%d, senderData:%d, part:%d) to all participants", - req->subscriptionId, req->subscriptionKey, req->senderData, part); -#endif -} - - -void -Grep::PSPart::execGREP_START_REQ(Signal* signal) -{ - jamEntry(); - GrepStartReq * const grepReq = (GrepStartReq *) signal->getDataPtr(); - SubscriptionData::Part part = (SubscriptionData::Part)grepReq->part; - Uint32 subId = grepReq->subscriptionId; - Uint32 subKey = grepReq->subscriptionKey; - Uint32 operationPtrI = grepReq->senderData; - - Subscription key; - key.m_subscriptionId = subId; - key.m_subscriptionKey = subKey; - SubscriptionPtr subPtr; - ndbrequire(c_subscriptions.find(subPtr, key));; - subPtr.p->m_outstandingRequest = GSN_GREP_START_REQ; - subPtr.p->m_operationPtrI = operationPtrI; - /** - * send SUB_START_REQ to local SUMA - */ - SubStartReq * sumaReq = (SubStartReq *) grepReq; - sumaReq->subscriptionId = subId; - sumaReq->subscriptionKey = subKey; - sumaReq->subscriberData = subPtr.i; - sumaReq->part = (Uint32) part; - - sendSignal(SUMA_REF, GSN_SUB_START_REQ, signal, - SubStartReq::SignalLength, JBB); -#ifdef DEBUG_GREP_SUBSCRIPTION - ndbout_c("Grep::PSPart: Sent SUB_START_REQ (subId:%d, subKey:%d, part:%d)", - subId, subKey, (Uint32)part); -#endif -} - - -void -Grep::PSPart::execSUB_START_CONF(Signal* signal) -{ - jamEntry(); - - SubStartConf * const conf = (SubStartConf *) signal->getDataPtr(); - SubscriptionData::Part part = (SubscriptionData::Part)conf->part; - Uint32 subId = conf->subscriptionId; - Uint32 subKey = conf->subscriptionKey; - Uint32 subData = conf->subscriberData; - Uint32 firstGCI = conf->firstGCI; -#ifdef DEBUG_GREP_SUBSCRIPTION - ndbout_c("Grep::PSPart: Recd SUB_START_CONF " - "(subId:%d, subKey:%d, subData:%d)", - subId, subKey, subData); -#endif - - SubscriptionPtr subPtr; - c_subscriptions.getPtr(subPtr, subData); - ndbrequire(subPtr.p->m_subscriptionId == subId); - ndbrequire(subPtr.p->m_subscriptionKey == subKey); - - GrepStartConf * grepConf = (GrepStartConf *)conf; - grepConf->senderData = subPtr.p->m_operationPtrI; - grepConf->part = (Uint32) part; - grepConf->subscriptionKey = subKey; - grepConf->subscriptionId = subId; - grepConf->firstGCI = firstGCI; - grepConf->senderNodeId = getOwnNodeId(); - sendSignal(subPtr.p->m_coordinatorRef, GSN_GREP_START_CONF, signal, - GrepStartConf::SignalLength, JBB); - subPtr.p->m_outstandingRequest = 0; - -#ifdef DEBUG_GREP_SUBSCRIPTION - ndbout_c("Grep::PSPart: Sent GREP_START_CONF " - "(subId:%d, subKey:%d, subData:%d, part:%d)", - subId, subKey, subData, part); -#endif -} - - -/** - * Handle errors that either occured in: - * 1) PSPart - * or - * 2) propagated from local SUMA - * - * Propagates REF signal to PSCoord - */ -void -Grep::PSPart::execSUB_START_REF(Signal* signal) -{ - SubStartRef * const ref = (SubStartRef *)signal->getDataPtr(); - Uint32 subData = ref->subscriberData; - GrepError::GE_Code err = (GrepError::GE_Code)ref->err; - SubscriptionData::Part part = (SubscriptionData::Part)ref->part; - SubscriptionPtr subPtr; - c_subscriptions.getPtr(subPtr, subData); - sendRefToPSCoord(signal, *subPtr.p, err /*error*/, part); - subPtr.p->m_outstandingRequest = 0; -} - - -/** - * Logging has started... (says PS Participant) - */ -void -Grep::PSCoord::execGREP_START_CONF(Signal* signal) -{ - jamEntry(); - - GrepStartConf * const conf = (GrepStartConf *) signal->getDataPtr(); - Uint32 subData = conf->senderData; - SubscriptionData::Part part = (SubscriptionData::Part)conf->part; - Uint32 subId = conf->subscriptionId; - Uint32 subKey = conf->subscriptionKey; - Uint32 firstGCI = conf->firstGCI; - - SubCoordinatorPtr subPtr; - c_subCoordinatorPool.getPtr(subPtr, subData); - ndbrequire(subPtr.p->m_outstandingRequest == GSN_GREP_START_REQ); - - subPtr.p->m_outstandingParticipants.clearWaitingFor(conf->senderNodeId); - - if(!subPtr.p->m_outstandingParticipants.done()) return; - jam(); - - /************************* - * All participants ready - *************************/ - GrepSubStartConf * grepConf = (GrepSubStartConf *) conf; - grepConf->part = part; - grepConf->subscriptionId = subId; - grepConf->subscriptionKey = subKey; - grepConf->firstGCI = firstGCI; - - bool ok = false; - switch(part) { - case SubscriptionData::MetaData: - ok = true; - sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_SUB_START_CONF, signal, - GrepSubStartConf::SignalLength, JBB); - - /** - * Send event report - */ - m_grep->sendEventRep(signal, - EventReport::GrepSubscriptionInfo, - GrepEvent::GrepPS_SubStartMetaConf, - subId, subKey, - (Uint32)GrepError::GE_NO_ERROR); - - c_subCoordinatorPool.release(subPtr); - break; - case SubscriptionData::TableData: - ok = true; - sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_SUB_START_CONF, signal, - GrepSubStartConf::SignalLength, JBB); - - /** - * Send event report - */ - m_grep->sendEventRep(signal, - EventReport::GrepSubscriptionInfo, - GrepEvent::GrepPS_SubStartDataConf, - subId, subKey, - (Uint32)GrepError::GE_NO_ERROR); - - - c_subCoordinatorPool.release(subPtr); - break; - } - ndbrequire(ok); - -#ifdef DEBUG_GREP_SUBSCRIPTION - ndbout_c("Grep::PSCoord: Recd SUB_START_CONF (subId:%d, subKey:%d, part:%d) " - "from all slaves", - subId, subKey, (Uint32)part); -#endif -} - -/** - * Handle errors that either occured in: - * 1) PSCoord - * or - * 2) propagated from PSPart - */ -void -Grep::PSCoord::execGREP_START_REF(Signal* signal) -{ - jamEntry(); - GrepStartRef * const ref = (GrepStartRef *)signal->getDataPtr(); - Uint32 subData = ref->senderData; - GrepError::GE_Code err = (GrepError::GE_Code)ref->err; - SubscriptionData::Part part = (SubscriptionData::Part)ref->part; - - SubCoordinatorPtr subPtr; - c_runningSubscriptions.getPtr(subPtr, subData); - sendRefToSS(signal, *subPtr.p, err /*error*/, part); -} - -/************************************************************************** - * ------------------------------------------------------------------------ - * MODULE: REMOVE SUBSCRIPTION - * ------------------------------------------------------------------------ - * - * Remove a subscription at SUMA. - * Each participant removes its own subscription. - * We start by deleting the subscription inside the requestor - * since, we don't know if nodes (REP nodes or DB nodes) - * have disconnected after we sent out this and - * if we dont delete the sub in the requestor now, - * we won't be able to create a new subscription - **************************************************************************/ - -/** - * Request to abort subscription (Sent from SS) - */ -void -Grep::PSCoord::execGREP_SUB_REMOVE_REQ(Signal* signal) -{ - jamEntry(); - GrepSubRemoveReq * const subReq = (GrepSubRemoveReq *)signal->getDataPtr(); - Uint32 subId = subReq->subscriptionId; - Uint32 subKey = subReq->subscriptionKey; - BlockReference rep = signal->getSendersBlockRef(); - - SubCoordinatorPtr subPtr; - if( !c_subCoordinatorPool.seize(subPtr)) { - jam(); - SubCoordinator sub; - sub.m_subscriberRef = rep; - sub.m_subscriptionId = 0; - sub.m_subscriptionKey = 0; - sub.m_outstandingRequest = GSN_GREP_REMOVE_REQ; - sendRefToSS(signal, sub, GrepError::NOSPACE_IN_POOL); - return; - } - - - prepareOperationRec(subPtr, - numberToRef(PSREPBLOCKNO, refToNode(rep)), - subId, subKey, - GSN_GREP_REMOVE_REQ); - - c_runningSubscriptions.add(subPtr); - - GrepRemoveReq * req = (GrepRemoveReq *) subReq; - req->subscriptionId = subPtr.p->m_subscriptionId; - req->subscriptionKey = subPtr.p->m_subscriptionKey; - req->senderData = subPtr.p->m_subscriberData; - req->senderRef = subPtr.p->m_coordinatorRef; - - /*************************** - * Send to all participants - ***************************/ - NodeReceiverGroup rg(GREP, m_grep->m_aliveNodes); - subPtr.p->m_outstandingParticipants = rg; - sendSignal(rg, - GSN_GREP_REMOVE_REQ, signal, - GrepRemoveReq::SignalLength, JBB); -} - - -void -Grep::PSPart::execGREP_REMOVE_REQ(Signal* signal) -{ - jamEntry(); - GrepRemoveReq * const grepReq = (GrepRemoveReq *) signal->getDataPtr(); - Uint32 subId = grepReq->subscriptionId; - Uint32 subKey = grepReq->subscriptionKey; - Uint32 subData = grepReq->senderData; - Uint32 coordinator = grepReq->senderRef; - - Subscription key; - key.m_subscriptionId = subId; - key.m_subscriptionKey = subKey; - SubscriptionPtr subPtr; - - if(!c_subscriptions.find(subPtr, key)) - { - /** - * The subscription was not found, so it must be deleted. - * Send CONF back, since it does not exist (thus, it is removed) - */ - GrepRemoveConf * grepConf = (GrepRemoveConf *)grepReq; - grepConf->subscriptionKey = subKey; - grepConf->subscriptionId = subId; - grepConf->senderData = subData; - grepConf->senderNodeId = getOwnNodeId(); - sendSignal(coordinator, GSN_GREP_REMOVE_CONF, signal, - GrepRemoveConf::SignalLength, JBB); - return; - } - - subPtr.p->m_operationPtrI = subData; - subPtr.p->m_coordinatorRef = coordinator; - subPtr.p->m_outstandingRequest = GSN_GREP_REMOVE_REQ; - - /** - * send SUB_REMOVE_REQ to local SUMA - */ - SubRemoveReq * sumaReq = (SubRemoveReq *) grepReq; - sumaReq->subscriptionId = subId; - sumaReq->subscriptionKey = subKey; - sumaReq->senderData = subPtr.i; - sendSignal(SUMA_REF, GSN_SUB_REMOVE_REQ, signal, - SubStartReq::SignalLength, JBB); -} - - -/** - * SUB_REMOVE_CONF (from local SUMA) - */ -void -Grep::PSPart::execSUB_REMOVE_CONF(Signal* signal) -{ - jamEntry(); - SubRemoveConf * const conf = (SubRemoveConf *) signal->getDataPtr(); - Uint32 subId = conf->subscriptionId; - Uint32 subKey = conf->subscriptionKey; - Uint32 subData = conf->subscriberData; - - SubscriptionPtr subPtr; - c_subscriptions.getPtr(subPtr, subData); - ndbrequire(subPtr.p->m_subscriptionId == subId); - ndbrequire(subPtr.p->m_subscriptionKey == subKey); - subPtr.p->m_outstandingRequest = 0; - GrepRemoveConf * grepConf = (GrepRemoveConf *)conf; - grepConf->subscriptionKey = subKey; - grepConf->subscriptionId = subId; - grepConf->senderData = subPtr.p->m_operationPtrI; - grepConf->senderNodeId = getOwnNodeId(); - sendSignal(subPtr.p->m_coordinatorRef, GSN_GREP_REMOVE_CONF, signal, - GrepRemoveConf::SignalLength, JBB); - c_subscriptions.release(subPtr); - -} - - -/** - * SUB_REMOVE_CONF (from local SUMA) - */ -void -Grep::PSPart::execSUB_REMOVE_REF(Signal* signal) -{ - jamEntry(); - SubRemoveRef * const ref = (SubRemoveRef *)signal->getDataPtr(); - Uint32 subData = ref->subscriberData; - /* GrepError::GE_Code err = (GrepError::GE_Code)ref->err;*/ - SubscriptionPtr subPtr; - c_subscriptions.getPtr(subPtr, subData); - - //sendSubRemoveRef_PSCoord(signal, *subPtr.p, err /*error*/); -} - - -/** - * Aborting has been carried out (says Participants) - */ -void -Grep::PSCoord::execGREP_REMOVE_CONF(Signal* signal) -{ - jamEntry(); - GrepRemoveConf * const conf = (GrepRemoveConf *) signal->getDataPtr(); - Uint32 subId = conf->subscriptionId; - Uint32 subKey = conf->subscriptionKey; - Uint32 senderNodeId = conf->senderNodeId; - Uint32 subData = conf->senderData; - SubCoordinatorPtr subPtr; - c_subCoordinatorPool.getPtr(subPtr, subData); - - ndbrequire(subPtr.p->m_outstandingRequest == GSN_GREP_REMOVE_REQ); - - subPtr.p->m_outstandingParticipants.clearWaitingFor(senderNodeId); - - if(!subPtr.p->m_outstandingParticipants.done()) { - jam(); - return; - } - jam(); - - /************************* - * All participants ready - *************************/ - - m_grep->sendEventRep(signal, - EventReport::GrepSubscriptionInfo, - GrepEvent::GrepPS_SubRemoveConf, - subId, subKey, - GrepError::GE_NO_ERROR); - - GrepSubRemoveConf * grepConf = (GrepSubRemoveConf *) conf; - grepConf->subscriptionId = subId; - grepConf->subscriptionKey = subKey; - sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_SUB_REMOVE_CONF, signal, - GrepSubRemoveConf::SignalLength, JBB); - - c_subCoordinatorPool.release(subPtr); -} - - - -void -Grep::PSCoord::execGREP_REMOVE_REF(Signal* signal) -{ - jamEntry(); - GrepRemoveRef * const ref = (GrepRemoveRef *)signal->getDataPtr(); - Uint32 subData = ref->senderData; - Uint32 err = ref->err; - SubCoordinatorPtr subPtr; - - /** - * Get the operationrecord matching subdata and remove it. Subsequent - * execGREP_REMOVE_REF will simply be ignored at this stage. - */ - for( c_runningSubscriptions.first(c_subPtr); - !c_subPtr.isNull(); c_runningSubscriptions.next(c_subPtr)) { - jam(); - subPtr.i = c_subPtr.curr.i; - subPtr.p = c_runningSubscriptions.getPtr(subPtr.i); - if(subData == subPtr.i) - { - sendRefToSS(signal, *subPtr.p, (GrepError::GE_Code)err /*error*/); - c_runningSubscriptions.release(subPtr); - return; - } - } - return; -} - - -/************************************************************************** - * ------------------------------------------------------------------------ - * MODULE: LOG RECORDS (COMING IN FROM LOCAL SUMA) - * ------------------------------------------------------------------------ - * - * After the subscription is started, we get log records from SUMA. - * Both table data and meta data log records are received. - * - * TODO: - * @todo Changes in meta data is currently not - * allowed during global replication - **************************************************************************/ - -void -Grep::PSPart::execSUB_META_DATA(Signal* signal) -{ - jamEntry(); - if(m_recoveryMode) { - jam(); - return; - } - /** - * METASCAN and METALOG - */ - SubMetaData * data = (SubMetaData *) signal->getDataPtrSend(); - SubscriptionPtr subPtr; - c_subscriptions.getPtr(subPtr, data->subscriberData); - - /*************************** - * Forward data to REP node - ***************************/ - sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_META_DATA, signal, - SubMetaData::SignalLength, JBB); -#ifdef DEBUG_GREP_SUBSCRIPTION - ndbout_c("Grep::PSPart: Sent SUB_META_DATA to REP " - "(TableId: %d, SenderData: %d, GCI: %d)", - data->tableId, data->senderData, data->gci); -#endif -} - -/** - * Receive table data from SUMA and dispatches it to REP node. - */ -void -Grep::PSPart::execSUB_TABLE_DATA(Signal* signal) -{ - jamEntry(); - if(m_recoveryMode) { - jam(); - return; - } - ndbrequire(m_repRef!=0); - - if(!assembleFragments(signal)) { jam(); return; } - - /** - * Check if it is SCAN or LOG data that has arrived - */ - if(signal->getNoOfSections() == 2) - { - jam(); - /** - * DATASCAN - Not marked with GCI, so mark with latest seen GCI - */ - if(m_firstScanGCI == 1 && m_lastScanGCI == 0) { - m_firstScanGCI = m_latestSeenGCI; - m_lastScanGCI = m_latestSeenGCI; - } - SubTableData * data = (SubTableData*)signal->getDataPtrSend(); - Uint32 subData = data->senderData; - data->gci = m_latestSeenGCI; - data->logType = SubTableData::SCAN; - - SubscriptionPtr subPtr; - c_subscriptions.getPtr(subPtr, subData); - sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_TABLE_DATA, signal, - SubTableData::SignalLength, JBB); -#ifdef DEBUG_GREP - ndbout_c("Grep::PSPart: Sent SUB_TABLE_DATA (Scan, GCI: %d)", - data->gci); -#endif - } - else - { - jam(); - /** - * DATALOG (TRIGGER) - Already marked with GCI - */ - SubTableData * data = (SubTableData*)signal->getDataPtrSend(); - data->logType = SubTableData::LOG; - Uint32 subData = data->senderData; - if (data->gci > m_latestSeenGCI) m_latestSeenGCI = data->gci; - - // Reformat to sections and send to replication node. - LinearSectionPtr ptr[3]; - ptr[0].p = signal->theData + 25; - ptr[0].sz = data->noOfAttributes; - ptr[1].p = signal->theData + 25 + MAX_ATTRIBUTES_IN_TABLE; - ptr[1].sz = data->dataSize; - - SubscriptionPtr subPtr; - c_subscriptions.getPtr(subPtr, subData); - sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_TABLE_DATA, - signal, SubTableData::SignalLength, JBB, ptr, 2); -#ifdef DEBUG_GREP - ndbout_c("Grep::PSPart: Sent SUB_TABLE_DATA (Log, GCI: %d)", - data->gci); -#endif - } -} - - -/************************************************************************** - * ------------------------------------------------------------------------ - * MODULE: START SYNCHRONIZATION - * ------------------------------------------------------------------------ - * - * - **************************************************************************/ - -/** - * Request to start sync (from Rep SS) - */ -void -Grep::PSCoord::execGREP_SUB_SYNC_REQ(Signal* signal) -{ - jamEntry(); - GrepSubSyncReq * const subReq = (GrepSubSyncReq*)signal->getDataPtr(); - SubscriptionData::Part part = (SubscriptionData::Part) subReq->part; - Uint32 subId = subReq->subscriptionId; - Uint32 subKey = subReq->subscriptionKey; - BlockReference rep = signal->getSendersBlockRef(); - - SubCoordinatorPtr subPtr; - if( !c_subCoordinatorPool.seize(subPtr)) { - jam(); - SubCoordinator sub; - sub.m_subscriberRef = rep; - sub.m_subscriptionId = 0; - sub.m_subscriptionKey = 0; - sub.m_outstandingRequest = GSN_GREP_SYNC_REQ; - sendRefToSS(signal, sub, GrepError::NOSPACE_IN_POOL); - return; - } - - prepareOperationRec(subPtr, - numberToRef(PSREPBLOCKNO, refToNode(rep)), - subId, subKey, - GSN_GREP_SYNC_REQ); - - GrepSyncReq * req = (GrepSyncReq *)subReq; - req->subscriptionId = subPtr.p->m_subscriptionId; - req->subscriptionKey = subPtr.p->m_subscriptionKey; - req->senderData = subPtr.p->m_subscriberData; - req->part = (Uint32)part; - - /*************************** - * Send to all participants - ***************************/ - NodeReceiverGroup rg(GREP, m_grep->m_aliveNodes); - subPtr.p->m_outstandingParticipants = rg; - sendSignal(rg, - GSN_GREP_SYNC_REQ, signal, GrepSyncReq::SignalLength, JBB); -} - - -/** - * Sync req from Grep::PSCoord to PS particpant - */ -void -Grep::PSPart::execGREP_SYNC_REQ(Signal* signal) -{ - jamEntry(); - - GrepSyncReq * const grepReq = (GrepSyncReq *) signal->getDataPtr(); - Uint32 part = grepReq->part; - Uint32 subId = grepReq->subscriptionId; - Uint32 subKey = grepReq->subscriptionKey; - Uint32 subData = grepReq->senderData; - - Subscription key; - key.m_subscriptionId = subId; - key.m_subscriptionKey = subKey; - SubscriptionPtr subPtr; - ndbrequire(c_subscriptions.find(subPtr, key)); - subPtr.p->m_operationPtrI = subData; - subPtr.p->m_outstandingRequest = GSN_GREP_SYNC_REQ; - /********************************** - * Send SUB_SYNC_REQ to local SUMA - **********************************/ - SubSyncReq * sumaReq = (SubSyncReq *)grepReq; - sumaReq->subscriptionId = subId; - sumaReq->subscriptionKey = subKey; - sumaReq->subscriberData = subPtr.i; - sumaReq->part = part; - sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ, signal, - SubSyncReq::SignalLength, JBB); -} - - -/** - * SYNC conf from SUMA - */ -void -Grep::PSPart::execSUB_SYNC_CONF(Signal* signal) -{ - jamEntry(); - - SubSyncConf * const conf = (SubSyncConf *) signal->getDataPtr(); - Uint32 part = conf->part; - Uint32 subId = conf->subscriptionId; - Uint32 subKey = conf->subscriptionKey; - Uint32 subData = conf->subscriberData; - - SubscriptionPtr subPtr; - c_subscriptions.getPtr(subPtr, subData); - - ndbrequire(subPtr.p->m_subscriptionId == subId); - ndbrequire(subPtr.p->m_subscriptionKey == subKey); - - GrepSyncConf * grepConf = (GrepSyncConf *)conf; - grepConf->senderNodeId = getOwnNodeId(); - grepConf->part = part; - grepConf->firstGCI = m_firstScanGCI; - grepConf->lastGCI = m_lastScanGCI; - grepConf->subscriptionId = subId; - grepConf->subscriptionKey = subKey; - grepConf->senderData = subPtr.p->m_operationPtrI; - sendSignal(subPtr.p->m_coordinatorRef, GSN_GREP_SYNC_CONF, signal, - GrepSyncConf::SignalLength, JBB); - - m_firstScanGCI = 1; - m_lastScanGCI = 0; - subPtr.p->m_outstandingRequest = 0; -} - -/** - * Handle errors that either occured in: - * 1) PSPart - * or - * 2) propagated from local SUMA - * - * Propagates REF signal to PSCoord - */ -void -Grep::PSPart::execSUB_SYNC_REF(Signal* signal) { - jamEntry(); - SubSyncRef * const ref = (SubSyncRef *)signal->getDataPtr(); - Uint32 subData = ref->subscriberData; - GrepError::GE_Code err = (GrepError::GE_Code)ref->err; - SubscriptionData::Part part = (SubscriptionData::Part)ref->part; - - SubscriptionPtr subPtr; - c_subscriptions.getPtr(subPtr, subData); - sendRefToPSCoord(signal, *subPtr.p, err /*error*/ ,part); - subPtr.p->m_outstandingRequest = 0; -} - -/** - * Syncing has started... (says PS Participant) - */ -void -Grep::PSCoord::execGREP_SYNC_CONF(Signal* signal) -{ - jamEntry(); - - GrepSyncConf const * conf = (GrepSyncConf *)signal->getDataPtr(); - Uint32 part = conf->part; - Uint32 firstGCI = conf->firstGCI; - Uint32 lastGCI = conf->lastGCI; - Uint32 subId = conf->subscriptionId; - Uint32 subKey = conf->subscriptionKey; - Uint32 subData = conf->senderData; - - SubCoordinatorPtr subPtr; - c_subCoordinatorPool.getPtr(subPtr, subData); - ndbrequire(subPtr.p->m_outstandingRequest == GSN_GREP_SYNC_REQ); - - subPtr.p->m_outstandingParticipants.clearWaitingFor(conf->senderNodeId); - if(!subPtr.p->m_outstandingParticipants.done()) return; - - /** - * Send event - */ - GrepEvent::Subscription event; - if(part == SubscriptionData::MetaData) - event = GrepEvent::GrepPS_SubSyncMetaConf; - else - event = GrepEvent::GrepPS_SubSyncDataConf; - - /* @todo Johan: Add firstGCI here. /Lars */ - m_grep->sendEventRep(signal, EventReport::GrepSubscriptionInfo, - event, subId, subKey, - (Uint32)GrepError::GE_NO_ERROR, - lastGCI); - - /************************* - * All participants ready - *************************/ - GrepSubSyncConf * grepConf = (GrepSubSyncConf *)conf; - grepConf->part = part; - grepConf->firstGCI = firstGCI; - grepConf->lastGCI = lastGCI; - grepConf->subscriptionId = subId; - grepConf->subscriptionKey = subKey; - - sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_SUB_SYNC_CONF, signal, - GrepSubSyncConf::SignalLength, JBB); - c_subCoordinatorPool.release(subPtr); -} - -/** - * Handle errors that either occured in: - * 1) PSCoord - * or - * 2) propagated from PSPart - */ -void -Grep::PSCoord::execGREP_SYNC_REF(Signal* signal) { - jamEntry(); - GrepSyncRef * const ref = (GrepSyncRef *)signal->getDataPtr(); - Uint32 subData = ref->senderData; - SubscriptionData::Part part = (SubscriptionData::Part)ref->part; - GrepError::GE_Code err = (GrepError::GE_Code)ref->err; - SubCoordinatorPtr subPtr; - c_runningSubscriptions.getPtr(subPtr, subData); - sendRefToSS(signal, *subPtr.p, err /*error*/, part); -} - - - -void -Grep::PSCoord::sendRefToSS(Signal * signal, - SubCoordinator sub, - GrepError::GE_Code err, - SubscriptionData::Part part) { - /** - - GrepCreateRef * ref = (GrepCreateRef*)signal->getDataPtrSend(); - ref->senderData = sub.m_subscriberData; - ref->subscriptionId = sub.m_subscriptionId; - ref->subscriptionKey = sub.m_subscriptionKey; - ref->err = err; - sendSignal(sub.m_coordinatorRef, GSN_GREP_CREATE_REF, signal, - GrepCreateRef::SignalLength, JBB); -*/ - - jam(); - GrepEvent::Subscription event; - switch(sub.m_outstandingRequest) { - case GSN_GREP_CREATE_SUBID_REQ: - { - jam(); - CreateSubscriptionIdRef * ref = - (CreateSubscriptionIdRef*)signal->getDataPtrSend(); - ref->err = (Uint32)err; - ref->subscriptionId = sub.m_subscriptionId; - ref->subscriptionKey = sub.m_subscriptionKey; - sendSignal(sub.m_subscriberRef, - GSN_GREP_CREATE_SUBID_REF, - signal, - CreateSubscriptionIdRef::SignalLength, - JBB); - event = GrepEvent::GrepPS_CreateSubIdRef; - } - break; - case GSN_GREP_CREATE_REQ: - { - jam(); - GrepSubCreateRef * ref = (GrepSubCreateRef*)signal->getDataPtrSend(); - ref->err = (Uint32)err; - ref->subscriptionId = sub.m_subscriptionId; - ref->subscriptionKey = sub.m_subscriptionKey; - sendSignal(sub.m_subscriberRef, GSN_GREP_SUB_CREATE_REF, signal, - GrepSubCreateRef::SignalLength, JBB); - event = GrepEvent::GrepPS_SubCreateRef; - } - break; - case GSN_GREP_SYNC_REQ: - { - jam(); - GrepSubSyncRef * ref = (GrepSubSyncRef*)signal->getDataPtrSend(); - ref->err = (Uint32)err; - ref->subscriptionId = sub.m_subscriptionId; - ref->subscriptionKey = sub.m_subscriptionKey; - ref->part = (SubscriptionData::Part) part; - sendSignal(sub.m_subscriberRef, - GSN_GREP_SUB_SYNC_REF, - signal, - GrepSubSyncRef::SignalLength, - JBB); - if(part == SubscriptionData::MetaData) - event = GrepEvent::GrepPS_SubSyncMetaRef; - else - event = GrepEvent::GrepPS_SubSyncDataRef; - } - break; - case GSN_GREP_START_REQ: - { - jam(); - GrepSubStartRef * ref = (GrepSubStartRef*)signal->getDataPtrSend(); - ref->err = (Uint32)err; - ref->subscriptionId = sub.m_subscriptionId; - ref->subscriptionKey = sub.m_subscriptionKey; - - sendSignal(sub.m_subscriberRef, GSN_GREP_SUB_START_REF, - signal, GrepSubStartRef::SignalLength, JBB); - if(part == SubscriptionData::MetaData) - event = GrepEvent::GrepPS_SubStartMetaRef; - else - event = GrepEvent::GrepPS_SubStartDataRef; - /** - * Send event report - */ - m_grep->sendEventRep(signal, - EventReport::GrepSubscriptionAlert, - event, - sub.m_subscriptionId, - sub.m_subscriptionKey, - (Uint32)err); - } - break; - case GSN_GREP_REMOVE_REQ: - { - jam(); - GrepSubRemoveRef * ref = (GrepSubRemoveRef*)signal->getDataPtrSend(); - ref->subscriptionId = sub.m_subscriptionId; - ref->subscriptionKey = sub.m_subscriptionKey; - ref->err = (Uint32)err; - - sendSignal(sub.m_subscriberRef, - GSN_GREP_SUB_REMOVE_REF, - signal, - GrepSubRemoveRef::SignalLength, - JBB); - - event = GrepEvent::GrepPS_SubRemoveRef; - } - break; - default: - ndbrequire(false); - event= GrepEvent::Rep_Disconnect; // remove compiler warning - } - /** - * Finally, send an event. - */ - m_grep->sendEventRep(signal, - EventReport::GrepSubscriptionAlert, - event, - sub.m_subscriptionId, - sub.m_subscriptionKey, - err); - -} - - -void -Grep::PSPart::sendRefToPSCoord(Signal * signal, - Subscription sub, - GrepError::GE_Code err, - SubscriptionData::Part part) { - - jam(); - GrepEvent::Subscription event; - switch(sub.m_outstandingRequest) { - - case GSN_GREP_CREATE_REQ: - { - GrepCreateRef * ref = (GrepCreateRef*)signal->getDataPtrSend(); - ref->senderData = sub.m_subscriberData; - ref->subscriptionId = sub.m_subscriptionId; - ref->subscriptionKey = sub.m_subscriptionKey; - ref->err = err; - sendSignal(sub.m_coordinatorRef, GSN_GREP_CREATE_REF, signal, - GrepCreateRef::SignalLength, JBB); - - event = GrepEvent::GrepPS_SubCreateRef; - } - break; - case GSN_GREP_SYNC_REQ: - { - GrepSyncRef * ref = (GrepSyncRef*)signal->getDataPtrSend(); - ref->senderData = sub.m_subscriberData; - ref->subscriptionId = sub.m_subscriptionId; - ref->subscriptionKey = sub.m_subscriptionKey; - ref->part = part; - ref->err = err; - sendSignal(sub.m_coordinatorRef, - GSN_GREP_SYNC_REF, signal, - GrepSyncRef::SignalLength, JBB); - if(part == SubscriptionData::MetaData) - event = GrepEvent::GrepPS_SubSyncMetaRef; - else - event = GrepEvent::GrepPS_SubSyncDataRef; - } - break; - case GSN_GREP_START_REQ: - { - jam(); - GrepStartRef * ref = (GrepStartRef*)signal->getDataPtrSend(); - ref->senderData = sub.m_subscriberData; - ref->subscriptionId = sub.m_subscriptionId; - ref->subscriptionKey = sub.m_subscriptionKey; - ref->part = (Uint32) part; - ref->err = err; - sendSignal(sub.m_coordinatorRef, GSN_GREP_START_REF, signal, - GrepStartRef::SignalLength, JBB); - if(part == SubscriptionData::MetaData) - event = GrepEvent::GrepPS_SubStartMetaRef; - else - event = GrepEvent::GrepPS_SubStartDataRef; - } - break; - - case GSN_GREP_REMOVE_REQ: - { - jamEntry(); - GrepRemoveRef * ref = (GrepRemoveRef*)signal->getDataPtrSend(); - ref->senderData = sub.m_operationPtrI; - ref->subscriptionId = sub.m_subscriptionId; - ref->subscriptionKey = sub.m_subscriptionKey; - ref->err = err; - sendSignal(sub.m_coordinatorRef, GSN_GREP_REMOVE_REF, signal, - GrepCreateRef::SignalLength, JBB); - - } - break; - default: - ndbrequire(false); - event= GrepEvent::Rep_Disconnect; // remove compiler warning - } - - /** - * Finally, send an event. - */ - m_grep->sendEventRep(signal, - EventReport::GrepSubscriptionAlert, - event, - sub.m_subscriptionId, - sub.m_subscriptionKey, - err); - -} - -/************************************************************************** - * ------------------------------------------------------------------------ - * MODULE: GREP PS Coordinator GCP - * ------------------------------------------------------------------------ - * - * - **************************************************************************/ - -void -Grep::PSPart::execSUB_GCP_COMPLETE_REP(Signal* signal) -{ - jamEntry(); - if(m_recoveryMode) { - jam(); - return; - } - SubGcpCompleteRep * rep = (SubGcpCompleteRep *)signal->getDataPtrSend(); - rep->senderRef = reference(); - - if (rep->gci > m_latestSeenGCI) m_latestSeenGCI = rep->gci; - SubscriptionPtr subPtr; - c_subscriptions.first(c_subPtr); - for(; !c_subPtr.isNull(); c_subscriptions.next(c_subPtr)) { - - subPtr.i = c_subPtr.curr.i; - subPtr.p = c_subscriptions.getPtr(subPtr.i); - sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_GCP_COMPLETE_REP, signal, - SubGcpCompleteRep::SignalLength, JBB); - } - -#ifdef DEBUG_GREP - ndbout_c("Grep::PSPart: Recd SUB_GCP_COMPLETE_REP " - "(GCI: %d, nodeId: %d) from SUMA", - rep->gci, refToNode(rep->senderRef)); -#endif -} - - -void -Grep::PSPart::execSUB_SYNC_CONTINUE_REQ(Signal* signal) -{ - jamEntry(); - SubSyncContinueReq * const req = (SubSyncContinueReq*)signal->getDataPtr(); - Uint32 subData = req->subscriberData; - - SubscriptionPtr subPtr; - c_subscriptions.getPtr(subPtr,subData); - - /** - * @todo Figure out how to control how much data we can receive? - */ - SubSyncContinueConf * conf = (SubSyncContinueConf*)req; - conf->subscriptionId = subPtr.p->m_subscriptionId; - conf->subscriptionKey = subPtr.p->m_subscriptionKey; - sendSignal(SUMA_REF, GSN_SUB_SYNC_CONTINUE_CONF, signal, - SubSyncContinueConf::SignalLength, JBB); -} - -void -Grep::sendEventRep(Signal * signal, - EventReport::EventType type, - GrepEvent::Subscription event, - Uint32 subId, - Uint32 subKey, - Uint32 err, - Uint32 other) { - jam(); - signal->theData[0] = type; - signal->theData[1] = event; - signal->theData[2] = subId; - signal->theData[3] = subKey; - signal->theData[4] = err; - - if(other==0) - sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 5 ,JBB); - else { - signal->theData[5] = other; - sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 6 ,JBB); - } -} diff --git a/ndb/src/kernel/blocks/grep/Grep.hpp b/ndb/src/kernel/blocks/grep/Grep.hpp deleted file mode 100644 index 7d3dd916ecc..00000000000 --- a/ndb/src/kernel/blocks/grep/Grep.hpp +++ /dev/null @@ -1,535 +0,0 @@ -/* Copyright (C) 2003 MySQL AB - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 2 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ - -#ifndef GREP_HPP -#define GREP_HPP - -#include <ndb_limits.h> -#include <SimulatedBlock.hpp> - -#include <NodeBitmask.hpp> -#include <SignalCounter.hpp> -#include <SLList.hpp> - -#include <DLList.hpp> - -#include <GrepError.hpp> -#include <GrepEvent.hpp> - -#include <signaldata/EventReport.hpp> -#include <signaldata/SumaImpl.hpp> - - -/** - * Module in block (Should be placed elsewhere) - */ -class BlockComponent { -public: - BlockComponent(SimulatedBlock *); - BlockReference reference() { return m_sb->reference(); }; - BlockNumber number() { return m_sb->number(); }; - - void sendSignal(NodeReceiverGroup rg, - GlobalSignalNumber gsn, - Signal* signal, - Uint32 length, - JobBufferLevel jbuf ) const { - m_sb->sendSignal(rg, gsn, signal, length, jbuf); - } - - void sendSignal(BlockReference ref, - GlobalSignalNumber gsn, - Signal* signal, - Uint32 length, - JobBufferLevel jbuf ) const { - m_sb->sendSignal(ref, gsn, signal, length, jbuf); - } - - void sendSignal(BlockReference ref, - GlobalSignalNumber gsn, - Signal* signal, - Uint32 length, - JobBufferLevel jbuf, - LinearSectionPtr ptr[3], - Uint32 noOfSections) const { - m_sb->sendSignal(ref, gsn, signal, length, jbuf, ptr, noOfSections); - } - - void sendSignalWithDelay(BlockReference ref, - GlobalSignalNumber gsn, - Signal* signal, - Uint32 delayInMilliSeconds, - Uint32 length) const { - - m_sb->sendSignalWithDelay(ref, gsn, signal, delayInMilliSeconds, length); - } - - NodeId getOwnNodeId() const { - return m_sb->getOwnNodeId(); - } - - bool assembleFragments(Signal * signal) { - return m_sb->assembleFragments(signal); - } - - void progError(int line, int err_code, const char* extra) { - m_sb->progError(line, err_code, extra); - } - -private: - SimulatedBlock * m_sb; -}; - - - -/** - * Participant of GREP Protocols (not necessarily a protocol coordinator) - * - * This object is only used on primary system - */ -#if 0 -class GrepParticipant : public SimulatedBlock -{ -protected: - GrepParticipant(const Configuration & conf); - virtual ~GrepParticipant(); - BLOCK_DEFINES(GrepParticipant); - -protected: - /*************************************************************************** - * SUMA Signal Interface - ***************************************************************************/ - void execSUB_CREATE_CONF(Signal*); - void execSUB_STARTCONF(Signal*); - void execSUB_REMOVE_CONF(Signal*); - - void execSUB_META_DATA(Signal*); - void execSUB_TABLE_DATA(Signal*); - - void execSUB_SYNC_CONF(Signal*); - - void execSUB_GCP_COMPLETE_REP(Signal*); - void execSUB_SYNC_CONTINUE_REQ(Signal*); - - /*************************************************************************** - * GREP Coordinator Signal Interface - ***************************************************************************/ - void execGREP_CREATE_REQ(Signal*); - void execGREP_START_REQ(Signal*); - void execGREP_SYNC_REQ(Signal*); - void execGREP_REMOVE_REQ(Signal*); - - -protected: - BlockReference m_repRef; ///< Replication node (only one rep node per grep) - -private: - BlockReference m_coordinator; - Uint32 m_latestSeenGCI; -}; -#endif - - -/** - * GREP Coordinator - */ -class Grep : public SimulatedBlock //GrepParticipant -{ - BLOCK_DEFINES(Grep); - -public: - Grep(const Configuration & conf); - virtual ~Grep(); - -private: - /*************************************************************************** - * General Signal Recivers - ***************************************************************************/ - void execSTTOR(Signal*); - void sendSTTORRY(Signal*); - void execNDB_STTOR(Signal*); - void execDUMP_STATE_ORD(Signal*); - void execREAD_NODESCONF(Signal*); - void execNODE_FAILREP(Signal*); - void execINCL_NODEREQ(Signal*); - void execGREP_REQ(Signal*); - void execAPI_FAILREQ(Signal*); - /** - * Forwarded to PSCoord - */ - //CONF - void fwdGREP_CREATE_CONF(Signal* s) { - pscoord.execGREP_CREATE_CONF(s); }; - void fwdGREP_START_CONF(Signal* s) { - pscoord.execGREP_START_CONF(s); }; - void fwdGREP_SYNC_CONF(Signal* s) { - pscoord.execGREP_SYNC_CONF(s); }; - void fwdGREP_REMOVE_CONF(Signal* s) { - pscoord.execGREP_REMOVE_CONF(s); }; - void fwdCREATE_SUBID_CONF(Signal* s) { - pscoord.execCREATE_SUBID_CONF(s); }; - - //REF - - void fwdGREP_CREATE_REF(Signal* s) { - pscoord.execGREP_CREATE_REF(s); }; - void fwdGREP_START_REF(Signal* s) { - pscoord.execGREP_START_REF(s); }; - void fwdGREP_SYNC_REF(Signal* s) { - pscoord.execGREP_SYNC_REF(s); }; - - void fwdGREP_REMOVE_REF(Signal* s) { - pscoord.execGREP_REMOVE_REF(s); }; - - void fwdCREATE_SUBID_REF(Signal* s) { - pscoord.execCREATE_SUBID_REF(s); }; - - //REQ - void fwdGREP_SUB_CREATE_REQ(Signal* s) { - pscoord.execGREP_SUB_CREATE_REQ(s); }; - void fwdGREP_SUB_START_REQ(Signal* s) { - pscoord.execGREP_SUB_START_REQ(s); }; - void fwdGREP_SUB_SYNC_REQ(Signal* s) { - pscoord.execGREP_SUB_SYNC_REQ(s); }; - void fwdGREP_SUB_REMOVE_REQ(Signal* s) { - pscoord.execGREP_SUB_REMOVE_REQ(s); }; - void fwdGREP_CREATE_SUBID_REQ(Signal* s) { - pscoord.execGREP_CREATE_SUBID_REQ(s); }; - - /** - * Forwarded to PSPart - */ - - void fwdSTART_ME(Signal* s){ - pspart.execSTART_ME(s); - }; - void fwdGREP_ADD_SUB_REQ(Signal* s){ - pspart.execGREP_ADD_SUB_REQ(s); - }; - void fwdGREP_ADD_SUB_REF(Signal* s){ - pspart.execGREP_ADD_SUB_REF(s); - }; - void fwdGREP_ADD_SUB_CONF(Signal* s){ - pspart.execGREP_ADD_SUB_CONF(s); - }; - - //CONF - void fwdSUB_CREATE_CONF(Signal* s) { - pspart.execSUB_CREATE_CONF(s); }; - void fwdSUB_START_CONF(Signal* s) { - pspart.execSUB_START_CONF(s); }; - void fwdSUB_REMOVE_CONF(Signal* s) { - pspart.execSUB_REMOVE_CONF(s); }; - void fwdSUB_SYNC_CONF(Signal* s) { - pspart.execSUB_SYNC_CONF(s); }; - - //REF - - void fwdSUB_CREATE_REF(Signal* s) { - pspart.execSUB_CREATE_REF(s); }; - void fwdSUB_START_REF(Signal* s) { - pspart.execSUB_START_REF(s); }; - void fwdSUB_REMOVE_REF(Signal* s) { - pspart.execSUB_REMOVE_REF(s); }; - void fwdSUB_SYNC_REF(Signal* s) { - pspart.execSUB_SYNC_REF(s); }; - - //REQ - void fwdSUB_SYNC_CONTINUE_REQ(Signal* s) { - pspart.execSUB_SYNC_CONTINUE_REQ(s); }; - void fwdGREP_CREATE_REQ(Signal* s) { - pspart.execGREP_CREATE_REQ(s); }; - void fwdGREP_START_REQ(Signal* s) { - pspart.execGREP_START_REQ(s); }; - void fwdGREP_SYNC_REQ(Signal* s) { - pspart.execGREP_SYNC_REQ(s); }; - void fwdGREP_REMOVE_REQ(Signal* s) { - pspart.execGREP_REMOVE_REQ(s); }; - - void fwdSUB_META_DATA(Signal* s) { - pspart.execSUB_META_DATA(s); }; - void fwdSUB_TABLE_DATA(Signal* s) { - pspart.execSUB_TABLE_DATA(s); }; - - void fwdSUB_GCP_COMPLETE_REP(Signal* s) { - pspart.execSUB_GCP_COMPLETE_REP(s); }; - - void sendEventRep(Signal * signal, - EventReport::EventType type, - GrepEvent::Subscription event, - Uint32 subId, - Uint32 subKey, - Uint32 err, - Uint32 gci=0); - - void getNodeGroupMembers(Signal* signal); - - - /*************************************************************************** - * Block Data - ***************************************************************************/ - struct Node { - Uint32 nodeId; - Uint32 alive; - Uint32 nextList; - union { Uint32 prevList; Uint32 nextPool; }; - }; - typedef Ptr<Node> NodePtr; - - NodeId m_masterNodeId; - SLList<Node> m_nodes; - NdbNodeBitmask m_aliveNodes; - ArrayPool<Node> m_nodePool; - - /** - * for all Suma's to keep track of other Suma's in Node group - */ - Uint32 c_nodeGroup; - Uint32 c_noNodesInGroup; - Uint32 c_idInNodeGroup; - NodeId c_nodesInGroup[4]; - - -public: - /*************************************************************************** - * GREP PS Coordinator - ***************************************************************************/ - class PSCoord : public BlockComponent { - - private: - - struct SubCoordinator { - Uint32 m_subscriberRef; - Uint32 m_subscriberData; - Uint32 m_coordinatorRef; - Uint32 m_subscriptionId; - Uint32 m_subscriptionKey; - Uint32 m_subscriptionType; - NdbNodeBitmask m_participants; - Uint32 m_outstandingRequest; - SignalCounter m_outstandingParticipants; - - Uint32 nextHash; - union { Uint32 prevHash; Uint32 nextPool; }; - - Uint32 hashValue() const { - return m_subscriptionId + m_subscriptionKey; - } - - bool equal(const SubCoordinator & s) const { - return - m_subscriptionId == s.m_subscriptionId && - m_subscriptionKey == s.m_subscriptionKey; - } - - }; - - typedef Ptr<SubCoordinator> SubCoordinatorPtr; - ArrayPool<SubCoordinator> c_subCoordinatorPool; - DLHashTable<SubCoordinator>::Iterator c_subPtr; - DLHashTable<SubCoordinator> c_runningSubscriptions; - - void prepareOperationRec(SubCoordinatorPtr ptr, - BlockReference subscriber, - Uint32 subId, - Uint32 subKey, - Uint32 request); - - public: - PSCoord(class Grep *); - - void execGREP_CREATE_CONF(Signal*); - void execGREP_START_CONF(Signal*); - void execGREP_SYNC_CONF(Signal*); - void execGREP_REMOVE_CONF(Signal*); - - void execGREP_CREATE_REF(Signal*); - void execGREP_START_REF(Signal*); - void execGREP_SYNC_REF(Signal*); - void execGREP_REMOVE_REF(Signal*); - - - void execCREATE_SUBID_CONF(Signal*); //comes from SUMA - void execGREP_CREATE_SUBID_REQ(Signal*); - - void execGREP_SUB_CREATE_REQ(Signal*); - void execGREP_SUB_START_REQ(Signal*); - void execGREP_SUB_SYNC_REQ(Signal*); - void execGREP_SUB_REMOVE_REQ(Signal*); - - - - void execCREATE_SUBID_REF(Signal*); - - - - void sendCreateSubIdRef_SS(Signal * signal, - Uint32 subId, - Uint32 subKey, - BlockReference to, - GrepError::GE_Code err); - - - void sendSubRemoveRef_SS(Signal * signal, - SubCoordinator sub, - GrepError::GE_Code err); - - void sendRefToSS(Signal * signal, - SubCoordinator sub, - GrepError::GE_Code err, - SubscriptionData::Part part = (SubscriptionData::Part)0); - - void setRepRef(BlockReference rr) { m_repRef = rr; }; - //void setAliveNodes(NdbNodeBitmask an) { m_aliveNodes = an; }; - - BlockReference m_repRef; ///< Rep node (only one rep node per grep) - // NdbNodeBitmask m_aliveNodes; - - Uint32 m_outstandingRequest; - SignalCounter m_outstandingParticipants; - - Grep * m_grep; - } pscoord; - friend class PSCoord; - - /*************************************************************************** - * GREP PS Participant - *************************************************************************** - * Participant of GREP Protocols (not necessarily a protocol coordinator) - * - * This object is only used on primary system - ***************************************************************************/ - class PSPart: public BlockComponent - { - //protected: - //GrepParticipant(const Configuration & conf); - //virtual ~GrepParticipant(); - //BLOCK_DEFINES(GrepParticipant); - - struct Subscription { - Uint32 m_subscriberRef; - Uint32 m_subscriberData; - Uint32 m_subscriptionId; - Uint32 m_subscriptionKey; - Uint32 m_subscriptionType; - Uint32 m_coordinatorRef; - Uint32 m_outstandingRequest; - Uint32 m_operationPtrI; - Uint32 nextHash; - union { Uint32 prevHash; Uint32 nextPool; }; - - Uint32 hashValue() const { - return m_subscriptionId + m_subscriptionKey; - } - - bool equal(const Subscription & s) const { - return - m_subscriptionId == s.m_subscriptionId && - m_subscriptionKey == s.m_subscriptionKey; - } - - }; - typedef Ptr<Subscription> SubscriptionPtr; - - DLHashTable<Subscription> c_subscriptions; - DLHashTable<Subscription>::Iterator c_subPtr; - ArrayPool<Subscription> c_subscriptionPool; - - public: - PSPart(class Grep *); - - - //protected: - /************************************************************************* - * SUMA Signal Interface - *************************************************************************/ - void execSUB_CREATE_CONF(Signal*); - void execSUB_START_CONF(Signal*); - void execSUB_SYNC_CONF(Signal*); - void execSUB_REMOVE_CONF(Signal*); - - void execSUB_CREATE_REF(Signal*); - void execSUB_START_REF(Signal*); - void execSUB_SYNC_REF(Signal*); - void execSUB_REMOVE_REF(Signal*); - - - void execSUB_META_DATA(Signal*); - void execSUB_TABLE_DATA(Signal*); - - - void execSUB_GCP_COMPLETE_REP(Signal*); - void execSUB_SYNC_CONTINUE_REQ(Signal*); - - /************************************************************************* - * GREP Coordinator Signal Interface - *************************************************************************/ - void execGREP_CREATE_REQ(Signal*); - void execGREP_START_REQ(Signal*); - void execGREP_SYNC_REQ(Signal*); - void execGREP_REMOVE_REQ(Signal*); - - /** - * NR/NF signals - */ - void execSTART_ME(Signal *); - void execGREP_ADD_SUB_REQ(Signal *); - void execGREP_ADD_SUB_REF(Signal *); - void execGREP_ADD_SUB_CONF(Signal *); - - /************************************************************************* - * GREP Coordinator error handling interface - *************************************************************************/ - - void sendRefToPSCoord(Signal * signal, - Subscription sub, - GrepError::GE_Code err, - SubscriptionData::Part part = (SubscriptionData::Part)0); - - //protected: - BlockReference m_repRef; ///< Replication node - ///< (only one rep node per grep) - bool m_recoveryMode; - - private: - BlockReference m_coordinator; - Uint32 m_firstScanGCI; - Uint32 m_lastScanGCI; - Uint32 m_latestSeenGCI; - Grep * m_grep; - } pspart; - friend class PSPart; - - /*************************************************************************** - * AddRecSignal Stuff (should maybe be gerneralized) - ***************************************************************************/ - typedef void (Grep::* ExecSignalLocal1) (Signal* signal); - typedef void (Grep::PSCoord::* ExecSignalLocal2) (Signal* signal); - typedef void (Grep::PSPart::* ExecSignalLocal4) (Signal* signal); -}; - - -/************************************************************************* - * Requestor - * - * The following methods are callbacks (registered functions) - * for the Requestor. The Requestor calls these when it needs - * something to be done. - *************************************************************************/ -void startSubscription(void * cbObj, Signal*, int type); -void scanSubscription(void * cbObj, Signal*, int type); - -#endif diff --git a/ndb/src/kernel/blocks/grep/GrepInit.cpp b/ndb/src/kernel/blocks/grep/GrepInit.cpp deleted file mode 100644 index d764fb1f473..00000000000 --- a/ndb/src/kernel/blocks/grep/GrepInit.cpp +++ /dev/null @@ -1,164 +0,0 @@ -/* Copyright (C) 2003 MySQL AB - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 2 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ - -#include "Grep.hpp" -#include <Properties.hpp> -#include <Configuration.hpp> - -/***************************************************************************** - * Grep Participant - *****************************************************************************/ -#if 0 -GrepParticipant::GrepParticipant(const Configuration & conf) : - SimulatedBlock(GREP, conf) -{ - BLOCK_CONSTRUCTOR(Grep); - //m_repRef = 0; - m_latestSeenGCI = 0; -} - -GrepParticipant::~GrepParticipant() -{ -} - -BLOCK_FUNCTIONS(GrepParticipant); -#endif - -/***************************************************************************** - * Grep Coordinator - *****************************************************************************/ -Grep::Grep(const Configuration & conf) : - // GrepParticipant(conf), - SimulatedBlock(GREP, conf), - m_nodes(m_nodePool), - pscoord(this), - pspart(this) -{ - m_nodePool.setSize(MAX_NDB_NODES); - m_masterNodeId = getOwnNodeId(); - - /*************************************************************************** - * General Signals - ***************************************************************************/ - addRecSignal(GSN_STTOR, &Grep::execSTTOR); - addRecSignal(GSN_NDB_STTOR, &Grep::execNDB_STTOR); - addRecSignal(GSN_DUMP_STATE_ORD, &Grep::execDUMP_STATE_ORD); - addRecSignal(GSN_READ_NODESCONF, &Grep::execREAD_NODESCONF); - addRecSignal(GSN_NODE_FAILREP, &Grep::execNODE_FAILREP); - addRecSignal(GSN_INCL_NODEREQ, &Grep::execINCL_NODEREQ); - - addRecSignal(GSN_GREP_REQ, &Grep::execGREP_REQ); - addRecSignal(GSN_API_FAILREQ, &Grep::execAPI_FAILREQ); - - - /*************************************************************************** - * Grep::PSCoord Signal Interface - ***************************************************************************/ - /** - * From Grep::PSPart - */ - addRecSignal(GSN_GREP_CREATE_CONF, &Grep::fwdGREP_CREATE_CONF); - addRecSignal(GSN_GREP_START_CONF, &Grep::fwdGREP_START_CONF); - addRecSignal(GSN_GREP_SYNC_CONF, &Grep::fwdGREP_SYNC_CONF); - addRecSignal(GSN_GREP_REMOVE_CONF, &Grep::fwdGREP_REMOVE_CONF); - - addRecSignal(GSN_GREP_CREATE_REF, &Grep::fwdGREP_CREATE_REF); - addRecSignal(GSN_GREP_START_REF, &Grep::fwdGREP_START_REF); - addRecSignal(GSN_GREP_REMOVE_REF, &Grep::fwdGREP_REMOVE_REF); - - /** - * From Grep::SSCoord to Grep::PSCoord - */ - addRecSignal(GSN_GREP_SUB_START_REQ, &Grep::fwdGREP_SUB_START_REQ); - addRecSignal(GSN_GREP_SUB_CREATE_REQ, &Grep::fwdGREP_SUB_CREATE_REQ); - addRecSignal(GSN_GREP_SUB_SYNC_REQ, &Grep::fwdGREP_SUB_SYNC_REQ); - addRecSignal(GSN_GREP_SUB_REMOVE_REQ, &Grep::fwdGREP_SUB_REMOVE_REQ); - addRecSignal(GSN_GREP_CREATE_SUBID_REQ, &Grep::fwdGREP_CREATE_SUBID_REQ); - - /**************************************************************************** - * PSPart - ***************************************************************************/ - /** - * From SUMA to GREP PS Participant. If suma is not a coodinator - */ - addRecSignal(GSN_SUB_START_CONF, &Grep::fwdSUB_START_CONF); - addRecSignal(GSN_SUB_CREATE_CONF, &Grep::fwdSUB_CREATE_CONF); - addRecSignal(GSN_SUB_SYNC_CONF, &Grep::fwdSUB_SYNC_CONF); - addRecSignal(GSN_SUB_REMOVE_CONF, &Grep::fwdSUB_REMOVE_CONF); - addRecSignal(GSN_SUB_CREATE_REF, &Grep::fwdSUB_CREATE_REF); - addRecSignal(GSN_SUB_START_REF, &Grep::fwdSUB_START_REF); - addRecSignal(GSN_SUB_SYNC_REF, &Grep::fwdSUB_SYNC_REF); - addRecSignal(GSN_SUB_REMOVE_REF, &Grep::fwdSUB_REMOVE_REF); - - addRecSignal(GSN_SUB_SYNC_CONTINUE_REQ, - &Grep::fwdSUB_SYNC_CONTINUE_REQ); - - /** - * From Suma to Grep::PSPart. Data signals. - */ - addRecSignal(GSN_SUB_META_DATA, &Grep::fwdSUB_META_DATA); - addRecSignal(GSN_SUB_TABLE_DATA, &Grep::fwdSUB_TABLE_DATA); - addRecSignal(GSN_SUB_GCP_COMPLETE_REP, &Grep::fwdSUB_GCP_COMPLETE_REP); - - /** - * From Grep::PSCoord to Grep::PSPart - */ - addRecSignal(GSN_GREP_CREATE_REQ, &Grep::fwdGREP_CREATE_REQ); - addRecSignal(GSN_GREP_START_REQ, &Grep::fwdGREP_START_REQ); - addRecSignal(GSN_GREP_REMOVE_REQ, &Grep::fwdGREP_REMOVE_REQ); - addRecSignal(GSN_GREP_SYNC_REQ, &Grep::fwdGREP_SYNC_REQ); - addRecSignal(GSN_CREATE_SUBID_CONF, &Grep::fwdCREATE_SUBID_CONF); - addRecSignal(GSN_GREP_START_ME, &Grep::fwdSTART_ME); - addRecSignal(GSN_GREP_ADD_SUB_REQ, &Grep::fwdGREP_ADD_SUB_REQ); - addRecSignal(GSN_GREP_ADD_SUB_REF, &Grep::fwdGREP_ADD_SUB_REF); - addRecSignal(GSN_GREP_ADD_SUB_CONF, &Grep::fwdGREP_ADD_SUB_CONF); -} - -Grep::~Grep() -{ -} - -BLOCK_FUNCTIONS(Grep) - -Grep::PSPart::PSPart(Grep * sb) : - BlockComponent(sb), - c_subscriptions(c_subscriptionPool) -{ - m_grep = sb; - - m_firstScanGCI = 1; // Empty interval = [1,0] - m_lastScanGCI = 0; - - m_latestSeenGCI = 0; - - c_subscriptions.setSize(10); - c_subscriptionPool.setSize(10); -} - -Grep::PSCoord::PSCoord(Grep * sb) : - BlockComponent(sb), - c_runningSubscriptions(c_subCoordinatorPool) -{ - m_grep = sb; - c_runningSubscriptions.setSize(10); - c_subCoordinatorPool.setSize(2); -} - -//BLOCK_FUNCTIONS(Grep::PSCoord); - -BlockComponent::BlockComponent(SimulatedBlock * sb) { - m_sb = sb; -} diff --git a/ndb/src/kernel/blocks/grep/Makefile.am b/ndb/src/kernel/blocks/grep/Makefile.am deleted file mode 100644 index 6d2b422784b..00000000000 --- a/ndb/src/kernel/blocks/grep/Makefile.am +++ /dev/null @@ -1,23 +0,0 @@ -noinst_LIBRARIES = libgrep.a - -libgrep_a_SOURCES = Grep.cpp GrepInit.cpp - -include $(top_srcdir)/ndb/config/common.mk.am -include $(top_srcdir)/ndb/config/type_kernel.mk.am - -# Don't update the files from bitkeeper -%::SCCS/s.% - -windoze-dsp: libgrep.dsp - -libgrep.dsp: Makefile \ - $(top_srcdir)/ndb/config/win-lib.am \ - $(top_srcdir)/ndb/config/win-name \ - $(top_srcdir)/ndb/config/win-includes \ - $(top_srcdir)/ndb/config/win-sources \ - $(top_srcdir)/ndb/config/win-libraries - cat $(top_srcdir)/ndb/config/win-lib.am > $@ - @$(top_srcdir)/ndb/config/win-name $@ $(noinst_LIBRARIES) - @$(top_srcdir)/ndb/config/win-includes $@ $(INCLUDES) - @$(top_srcdir)/ndb/config/win-sources $@ $(libgrep_a_SOURCES) - @$(top_srcdir)/ndb/config/win-libraries $@ LIB $(LDADD) diff --git a/ndb/src/kernel/blocks/grep/systab_test/Makefile b/ndb/src/kernel/blocks/grep/systab_test/Makefile deleted file mode 100644 index bd69e0f3799..00000000000 --- a/ndb/src/kernel/blocks/grep/systab_test/Makefile +++ /dev/null @@ -1,12 +0,0 @@ -include .defs.mk - -TYPE := kernel - -BIN_TARGET := grep_systab_test -BIN_TARGET_ARCHIVES := portlib general - -CCFLAGS_LOC += -I.. - -SOURCES = ../GrepSystemTable.cpp grep_systab_test.cpp - -include $(NDB_TOP)/Epilogue.mk diff --git a/ndb/src/kernel/blocks/grep/systab_test/grep_systab_test.cpp b/ndb/src/kernel/blocks/grep/systab_test/grep_systab_test.cpp deleted file mode 100644 index e3a77af4e4e..00000000000 --- a/ndb/src/kernel/blocks/grep/systab_test/grep_systab_test.cpp +++ /dev/null @@ -1,138 +0,0 @@ -/* Copyright (C) 2003 MySQL AB - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 2 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ - -/** - * Unit Test for GrepSystemTable - */ - -#include "../GrepSystemTable.hpp" -#include <SimulatedBlock.hpp> - -#define EXEC(X) ( ndbout << endl, ndbout_c(#X), X ) - -int -main () { - GrepSystemTable st; - - Uint32 f, l; - - ndbout_c("*************************************"); - ndbout_c("* GrepSystemTable Unit Test Program *"); - ndbout_c("*************************************"); - - ndbout_c("--------------------------------------------------------"); - ndbout_c("Test 1: Clear"); - ndbout_c("--------------------------------------------------------"); - - EXEC(st.set(GrepSystemTable::PS, 22, 26)); - st.print(); - st.require(GrepSystemTable::PS, 22, 26); - - EXEC(st.clear(GrepSystemTable::PS, 20, 24)); - st.print(); - st.require(GrepSystemTable::PS, 25, 26); - - EXEC(st.clear(GrepSystemTable::PS, 0, 100)); - st.print(); - st.require(GrepSystemTable::PS, 1, 0); - - EXEC(st.set(GrepSystemTable::PS, 22, 26)); - st.print(); - st.require(GrepSystemTable::PS, 22, 26); - - EXEC(st.clear(GrepSystemTable::PS, 24, 28)); - st.print(); - st.require(GrepSystemTable::PS, 22, 23); - - EXEC(st.clear(GrepSystemTable::PS, 0, 100)); - st.print(); - st.require(GrepSystemTable::PS, 1, 0); - - EXEC(st.set(GrepSystemTable::PS, 22, 26)); - st.print(); - st.require(GrepSystemTable::PS, 22, 26); - - EXEC(st.clear(GrepSystemTable::PS, 24, 26)); - st.print(); - st.require(GrepSystemTable::PS, 22, 23); - - EXEC(st.clear(GrepSystemTable::PS, 0, 100)); - st.print(); - st.require(GrepSystemTable::PS, 1, 0); - - EXEC(st.set(GrepSystemTable::PS, 22, 26)); - st.print(); - st.require(GrepSystemTable::PS, 22, 26); - - EXEC(st.clear(GrepSystemTable::PS, 22, 24)); - st.print(); - st.require(GrepSystemTable::PS, 25, 26); - - ndbout_c("--------------------------------------------------------"); - ndbout_c("Test 2: PS --> SSreq"); - ndbout_c("--------------------------------------------------------"); - - EXEC(st.set(GrepSystemTable::PS, 22, 26)); - st.print(); - st.require(GrepSystemTable::PS, 22, 26); - st.require(GrepSystemTable::SSReq, 1, 0); - - if (!EXEC(st.copy(GrepSystemTable::PS, GrepSystemTable::SSReq, 3, &f, &l))) - ndbout_c("%s:%d: Illegal copy!", __FILE__, __FILE__); - ndbout_c("f=%d, l=%d", f, l); - st.print(); - st.require(GrepSystemTable::PS, 22, 26); - st.require(GrepSystemTable::SSReq, 22, 24); - - EXEC(st.clear(GrepSystemTable::PS, 22, 22)); - st.print(); - st.require(GrepSystemTable::PS, 23, 26); - st.require(GrepSystemTable::SSReq, 22, 24); - - if (!EXEC(st.copy(GrepSystemTable::PS, GrepSystemTable::SSReq, 2, &f, &l))) - ndbout_c("%s:%d: Illegal copy!", __FILE__, __LINE__); - ndbout_c("f=%d, l=%d", f, l); - st.print(); - st.require(GrepSystemTable::PS, 23, 26); - st.require(GrepSystemTable::SSReq, 22, 26); - - st.set(GrepSystemTable::SS, 7, 9); - st.set(GrepSystemTable::InsReq, 7, 9); - if (EXEC(st.movable(GrepSystemTable::SS, GrepSystemTable::InsReq))) - ndbout_c("%s:%d: Illegal move!", __FILE__, __LINE__); - st.print(); - st.require(GrepSystemTable::SS, 7, 9); - st.require(GrepSystemTable::InsReq, 7, 9); - - EXEC(st.intervalMinus(7, 9, 7, 7, &f, &l)); - ndbout_c("f=%d, l=%d", f, l); - - st.clear(GrepSystemTable::InsReq, 8, 9); - st.require(GrepSystemTable::SS, 7, 9); - st.require(GrepSystemTable::InsReq, 7, 7); - if (EXEC(st.movable(GrepSystemTable::SS, GrepSystemTable::InsReq)) != 2) - ndbout_c("%s:%d: Illegal move!", __FILE__, __LINE__); - st.print(); - - EXEC(st.copy(GrepSystemTable::SS, GrepSystemTable::InsReq, &f)); - st.print(); - st.require(GrepSystemTable::SS, 7, 9); - st.require(GrepSystemTable::InsReq, 7, 8); - - ndbout_c("--------------------------------------------------------"); - ndbout_c("Test completed"); - ndbout_c("--------------------------------------------------------"); -} diff --git a/ndb/test/ndbapi/testGrep.cpp b/ndb/test/ndbapi/testGrep.cpp deleted file mode 100644 index 713aefbeafa..00000000000 --- a/ndb/test/ndbapi/testGrep.cpp +++ /dev/null @@ -1,540 +0,0 @@ -/* Copyright (C) 2003 MySQL AB - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 2 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ - -#include <NDBT.hpp> -#include <NDBT_Test.hpp> -#include <HugoTransactions.hpp> -#include <UtilTransactions.hpp> -#include <NdbGrep.hpp> - - -#define CHECK(b) if (!(b)) { \ - g_err << "ERR: "<< step->getName() \ - << " failed on line " << __LINE__ << endl; \ - result = NDBT_FAILED; \ - continue; } - - -int runLoadTable(NDBT_Context* ctx, NDBT_Step* step){ - - int records = ctx->getNumRecords(); - HugoTransactions hugoTrans(*ctx->getTab()); - if (hugoTrans.loadTable(GETNDB(step), records) != 0){ - return NDBT_FAILED; - } - return NDBT_OK; -} - -int runPkUpdate(NDBT_Context* ctx, NDBT_Step* step){ - int loops = ctx->getNumLoops(); - int records = ctx->getNumRecords(); - int batchSize = ctx->getProperty("BatchSize", 1); - int i = 0; - HugoTransactions hugoTrans(*ctx->getTab()); - while (i<loops) { - g_info << "|- " << i << ": "; - if (hugoTrans.pkUpdateRecords(GETNDB(step), records, batchSize) != 0){ - g_info << endl; - return NDBT_FAILED; - } - i++; - } - g_info << endl; - return NDBT_OK; -} - -int runRestartInitial(NDBT_Context* ctx, NDBT_Step* step){ - NdbRestarter restarter; - - Ndb* pNdb = GETNDB(step); - - const NdbDictionary::Table *tab = ctx->getTab(); - pNdb->getDictionary()->dropTable(tab->getName()); - - if (restarter.restartAll(true) != 0) - return NDBT_FAILED; - - return NDBT_OK; -} - -int runRestarter(NDBT_Context* ctx, NDBT_Step* step){ - int result = NDBT_OK; - int loops = ctx->getNumLoops(); - NdbRestarter restarter; - int i = 0; - int lastId = 0; - - if (restarter.getNumDbNodes() < 2){ - ctx->stopTest(); - return NDBT_OK; - } - - if(restarter.waitClusterStarted(60) != 0){ - g_err << "Cluster failed to start" << endl; - return NDBT_FAILED; - } - - loops *= restarter.getNumDbNodes(); - while(i<loops && result != NDBT_FAILED && !ctx->isTestStopped()){ - - int id = lastId % restarter.getNumDbNodes(); - int nodeId = restarter.getDbNodeId(id); - ndbout << "Restart node " << nodeId << endl; - if(restarter.restartOneDbNode(nodeId) != 0){ - g_err << "Failed to restartNextDbNode" << endl; - result = NDBT_FAILED; - break; - } - - if(restarter.waitClusterStarted(60) != 0){ - g_err << "Cluster failed to start" << endl; - result = NDBT_FAILED; - break; - } - - NdbSleep_SecSleep(1); - - lastId++; - i++; - } - - ctx->stopTest(); - - return result; -} - -int runCheckAllNodesStarted(NDBT_Context* ctx, NDBT_Step* step){ - NdbRestarter restarter; - - if(restarter.waitClusterStarted(1) != 0){ - g_err << "All nodes was not started " << endl; - return NDBT_FAILED; - } - - return NDBT_OK; -} - - -bool testMaster = true; -bool testSlave = false; - -int setMaster(NDBT_Context* ctx, NDBT_Step* step){ - testMaster = true; - testSlave = false; - return NDBT_OK; -} -int setMasterAsSlave(NDBT_Context* ctx, NDBT_Step* step){ - testMaster = true; - testSlave = true; - return NDBT_OK; -} -int setSlave(NDBT_Context* ctx, NDBT_Step* step){ - testMaster = false; - testSlave = true; - return NDBT_OK; -} - -int runAbort(NDBT_Context* ctx, NDBT_Step* step){ - - - NdbGrep grep(GETNDB(step)->getNodeId()+1); - NdbRestarter restarter; - - if (restarter.getNumDbNodes() < 2){ - ctx->stopTest(); - return NDBT_OK; - } - - if(restarter.waitClusterStarted(60) != 0){ - g_err << "Cluster failed to start" << endl; - return NDBT_FAILED; - } - - if (testMaster) { - if (testSlave) { - if (grep.NFMasterAsSlave(restarter) == -1){ - return NDBT_FAILED; - } - } else { - if (grep.NFMaster(restarter) == -1){ - return NDBT_FAILED; - } - } - } else { - if (grep.NFSlave(restarter) == -1){ - return NDBT_FAILED; - } - } - - return NDBT_OK; -} - -int runFail(NDBT_Context* ctx, NDBT_Step* step){ - NdbGrep grep(GETNDB(step)->getNodeId()+1); - - NdbRestarter restarter; - - if (restarter.getNumDbNodes() < 2){ - ctx->stopTest(); - return NDBT_OK; - } - - if(restarter.waitClusterStarted(60) != 0){ - g_err << "Cluster failed to start" << endl; - return NDBT_FAILED; - } - - if (testMaster) { - if (testSlave) { - if (grep.FailMasterAsSlave(restarter) == -1){ - return NDBT_FAILED; - } - } else { - if (grep.FailMaster(restarter) == -1){ - return NDBT_FAILED; - } - } - } else { - if (grep.FailSlave(restarter) == -1){ - return NDBT_FAILED; - } - } - - return NDBT_OK; -} - -int runGrepBasic(NDBT_Context* ctx, NDBT_Step* step){ - NdbGrep grep(GETNDB(step)->getNodeId()+1); - unsigned grepId = 0; - - if (grep.start() == -1){ - return NDBT_FAILED; - } - ndbout << "Started grep " << grepId << endl; - ctx->setProperty("GrepId", grepId); - - return NDBT_OK; -} - - - - -int runVerifyBasic(NDBT_Context* ctx, NDBT_Step* step){ - NdbGrep grep(GETNDB(step)->getNodeId()+1, ctx->getRemoteMgm()); - ndbout_c("no of nodes %d" ,grep.getNumDbNodes()); - int result; - if ((result = grep.verify(ctx)) == -1){ - return NDBT_FAILED; - } - return result; -} - - - -int runClearTable(NDBT_Context* ctx, NDBT_Step* step){ - int records = ctx->getNumRecords(); - - UtilTransactions utilTrans(*ctx->getTab()); - if (utilTrans.clearTable2(GETNDB(step), records) != 0){ - return NDBT_FAILED; - } - return NDBT_OK; -} - -#include "bank/Bank.hpp" - -int runCreateBank(NDBT_Context* ctx, NDBT_Step* step){ - Bank bank; - int overWriteExisting = true; - if (bank.createAndLoadBank(overWriteExisting) != NDBT_OK) - return NDBT_FAILED; - return NDBT_OK; -} - -int runBankTimer(NDBT_Context* ctx, NDBT_Step* step){ - Bank bank; - int wait = 30; // Max seconds between each "day" - int yield = 1; // Loops before bank returns - - while (ctx->isTestStopped() == false) { - bank.performIncreaseTime(wait, yield); - } - return NDBT_OK; -} - -int runBankTransactions(NDBT_Context* ctx, NDBT_Step* step){ - Bank bank; - int wait = 10; // Max ms between each transaction - int yield = 100; // Loops before bank returns - - while (ctx->isTestStopped() == false) { - bank.performTransactions(wait, yield); - } - return NDBT_OK; -} - -int runBankGL(NDBT_Context* ctx, NDBT_Step* step){ - Bank bank; - int yield = 20; // Loops before bank returns - int result = NDBT_OK; - - while (ctx->isTestStopped() == false) { - if (bank.performMakeGLs(yield) != NDBT_OK){ - ndbout << "bank.performMakeGLs FAILED" << endl; - result = NDBT_FAILED; - } - } - return NDBT_OK; -} - -int runBankSum(NDBT_Context* ctx, NDBT_Step* step){ - Bank bank; - int wait = 2000; // Max ms between each sum of accounts - int yield = 1; // Loops before bank returns - int result = NDBT_OK; - - while (ctx->isTestStopped() == false) { - if (bank.performSumAccounts(wait, yield) != NDBT_OK){ - ndbout << "bank.performSumAccounts FAILED" << endl; - result = NDBT_FAILED; - } - } - return result ; -} - -int runDropBank(NDBT_Context* ctx, NDBT_Step* step){ - Bank bank; - if (bank.dropBank() != NDBT_OK) - return NDBT_FAILED; - return NDBT_OK; -} - -int runGrepBank(NDBT_Context* ctx, NDBT_Step* step){ - int loops = ctx->getNumLoops(); - int l = 0; - int maxSleep = 30; // Max seconds between each grep - Ndb* pNdb = GETNDB(step); - NdbGrep grep(GETNDB(step)->getNodeId()+1); - unsigned minGrepId = ~0; - unsigned maxGrepId = 0; - unsigned grepId = 0; - int result = NDBT_OK; - - while (l < loops && result != NDBT_FAILED){ - - if (pNdb->waitUntilReady() != 0){ - result = NDBT_FAILED; - continue; - } - - // Sleep for a while - NdbSleep_SecSleep(maxSleep); - - // Perform grep - if (grep.start() != 0){ - ndbout << "grep.start failed" << endl; - result = NDBT_FAILED; - continue; - } - ndbout << "Started grep " << grepId << endl; - - // Remember min and max grepid - if (grepId < minGrepId) - minGrepId = grepId; - - if (grepId > maxGrepId) - maxGrepId = grepId; - - ndbout << " maxGrepId = " << maxGrepId - << ", minGrepId = " << minGrepId << endl; - ctx->setProperty("MinGrepId", minGrepId); - ctx->setProperty("MaxGrepId", maxGrepId); - - l++; - } - - ctx->stopTest(); - - return result; -} -/* -int runRestoreBankAndVerify(NDBT_Context* ctx, NDBT_Step* step){ - NdbRestarter restarter; - NdbGrep grep(GETNDB(step)->getNodeId()+1); - unsigned minGrepId = ctx->getProperty("MinGrepId"); - unsigned maxGrepId = ctx->getProperty("MaxGrepId"); - unsigned grepId = minGrepId; - int result = NDBT_OK; - int errSumAccounts = 0; - int errValidateGL = 0; - - ndbout << " maxGrepId = " << maxGrepId << endl; - ndbout << " minGrepId = " << minGrepId << endl; - - while (grepId <= maxGrepId){ - - // TEMPORARY FIX - // To erase all tables from cache(s) - // To be removed, maybe replaced by ndb.invalidate(); - { - Bank bank; - - if (bank.dropBank() != NDBT_OK){ - result = NDBT_FAILED; - break; - } - } - // END TEMPORARY FIX - - ndbout << "Performing initial restart" << endl; - if (restarter.restartAll(true) != 0) - return NDBT_FAILED; - - if (restarter.waitClusterStarted() != 0) - return NDBT_FAILED; - - ndbout << "Restoring grep " << grepId << endl; - if (grep.restore(grepId) == -1){ - return NDBT_FAILED; - } - ndbout << "Grep " << grepId << " restored" << endl; - - // Let bank verify - Bank bank; - - int wait = 0; - int yield = 1; - if (bank.performSumAccounts(wait, yield) != 0){ - ndbout << "bank.performSumAccounts FAILED" << endl; - ndbout << " grepId = " << grepId << endl << endl; - result = NDBT_FAILED; - errSumAccounts++; - } - - if (bank.performValidateAllGLs() != 0){ - ndbout << "bank.performValidateAllGLs FAILED" << endl; - ndbout << " grepId = " << grepId << endl << endl; - result = NDBT_FAILED; - errValidateGL++; - } - - grepId++; - } - - if (result != NDBT_OK){ - ndbout << "Verification of grep failed" << endl - << " errValidateGL="<<errValidateGL<<endl - << " errSumAccounts="<<errSumAccounts<<endl << endl; - } - - return result; -} -*/ - -NDBT_TESTSUITE(testGrep); -TESTCASE("GrepBasic", - "Test that Global Replication works on one table \n" - "1. Load table\n" - "2. Grep\n" - "3. Restart -i\n" - "4. Restore\n" - "5. Verify count and content of table\n"){ - INITIALIZER(runLoadTable); - VERIFIER(runVerifyBasic); - FINALIZER(runClearTable); - -} - -TESTCASE("GrepNodeRestart", - "Test that Global Replication works on one table \n" - "1. Load table\n" - "2. Grep\n" - "3. Restart -i\n" - "4. Restore\n" - "5. Verify count and content of table\n"){ - INITIALIZER(runLoadTable); - STEP(runPkUpdate); - STEP(runRestarter); - VERIFIER(runVerifyBasic); - FINALIZER(runClearTable); -} - - -TESTCASE("GrepBank", - "Test that grep and restore works during transaction load\n" - " by backing up the bank" - "1. Create bank\n" - "2a. Start bank and let it run\n" - "2b. Perform loop number of greps of the bank\n" - " when greps are finished tell bank to close\n" - "3. Restart ndb -i and reload each grep\n" - " let bank verify that the grep is consistent\n" - "4. Drop bank\n"){ - INITIALIZER(runCreateBank); - STEP(runBankTimer); - STEP(runBankTransactions); - STEP(runBankGL); - // TODO STEP(runBankSum); - STEP(runGrepBank); - // VERIFIER(runRestoreBankAndVerify); - // FINALIZER(runDropBank); - -} - -TESTCASE("NFMaster", - "Test that grep behaves during node failiure\n"){ - INITIALIZER(setMaster); - STEP(runAbort); - -} -TESTCASE("NFMasterAsSlave", - "Test that grep behaves during node failiure\n"){ - INITIALIZER(setMasterAsSlave); - STEP(runAbort); - -} -TESTCASE("NFSlave", - "Test that grep behaves during node failiure\n"){ - INITIALIZER(setSlave); - STEP(runAbort); - -} -TESTCASE("FailMaster", - "Test that grep behaves during node failiure\n"){ - INITIALIZER(setMaster); - STEP(runFail); - -} -TESTCASE("FailMasterAsSlave", - "Test that grep behaves during node failiure\n"){ - INITIALIZER(setMasterAsSlave); - STEP(runFail); - -} -TESTCASE("FailSlave", - "Test that grep behaves during node failiure\n"){ - INITIALIZER(setSlave); - STEP(runFail); - -} -NDBT_TESTSUITE_END(testGrep); - -int main(int argc, const char** argv){ - ndb_init(); - return testGrep.execute(argc, argv); -} - - |