diff options
Diffstat (limited to 'storage/ndb/src/kernel/blocks/trix/Trix.cpp')
-rw-r--r-- | storage/ndb/src/kernel/blocks/trix/Trix.cpp | 967 |
1 files changed, 967 insertions, 0 deletions
diff --git a/storage/ndb/src/kernel/blocks/trix/Trix.cpp b/storage/ndb/src/kernel/blocks/trix/Trix.cpp new file mode 100644 index 00000000000..cd11cb4d575 --- /dev/null +++ b/storage/ndb/src/kernel/blocks/trix/Trix.cpp @@ -0,0 +1,967 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "Trix.hpp" + +#include <string.h> +#include <kernel_types.h> +#include <NdbOut.hpp> + +#include <signaldata/ReadNodesConf.hpp> +#include <signaldata/NodeFailRep.hpp> +#include <signaldata/DumpStateOrd.hpp> +#include <signaldata/GetTabInfo.hpp> +#include <signaldata/DictTabInfo.hpp> +#include <signaldata/BuildIndx.hpp> +#include <signaldata/SumaImpl.hpp> +#include <signaldata/UtilPrepare.hpp> +#include <signaldata/UtilExecute.hpp> +#include <signaldata/UtilRelease.hpp> +#include <SectionReader.hpp> +#include <AttributeHeader.hpp> + +#define CONSTRAINT_VIOLATION 893 + +#define DEBUG(x) { ndbout << "TRIX::" << x << endl; } + +/** + * + */ +Trix::Trix(const Configuration & conf) : + SimulatedBlock(TRIX, conf), + c_theNodes(c_theNodeRecPool), + c_masterNodeId(0), + c_masterTrixRef(0), + c_noNodesFailed(0), + c_noActiveNodes(0), + c_theSubscriptions(c_theSubscriptionRecPool) +{ + BLOCK_CONSTRUCTOR(Trix); + + // Add received signals + addRecSignal(GSN_STTOR, &Trix::execSTTOR); + addRecSignal(GSN_NDB_STTOR, &Trix::execNDB_STTOR); // Forwarded from DICT + addRecSignal(GSN_READ_NODESCONF, &Trix::execREAD_NODESCONF); + addRecSignal(GSN_READ_NODESREF, &Trix::execREAD_NODESREF); + addRecSignal(GSN_NODE_FAILREP, &Trix::execNODE_FAILREP); + addRecSignal(GSN_INCL_NODEREQ, &Trix::execINCL_NODEREQ); + addRecSignal(GSN_DUMP_STATE_ORD, &Trix::execDUMP_STATE_ORD); + + // Index build + addRecSignal(GSN_BUILDINDXREQ, &Trix::execBUILDINDXREQ); + // Dump testing + addRecSignal(GSN_BUILDINDXCONF, &Trix::execBUILDINDXCONF); + addRecSignal(GSN_BUILDINDXREF, &Trix::execBUILDINDXREF); + + + addRecSignal(GSN_UTIL_PREPARE_CONF, &Trix::execUTIL_PREPARE_CONF); + addRecSignal(GSN_UTIL_PREPARE_REF, &Trix::execUTIL_PREPARE_REF); + addRecSignal(GSN_UTIL_EXECUTE_CONF, &Trix::execUTIL_EXECUTE_CONF); + addRecSignal(GSN_UTIL_EXECUTE_REF, &Trix::execUTIL_EXECUTE_REF); + addRecSignal(GSN_UTIL_RELEASE_CONF, &Trix::execUTIL_RELEASE_CONF); + addRecSignal(GSN_UTIL_RELEASE_REF, &Trix::execUTIL_RELEASE_REF); + + + // Suma signals + addRecSignal(GSN_SUB_CREATE_CONF, &Trix::execSUB_CREATE_CONF); + addRecSignal(GSN_SUB_CREATE_REF, &Trix::execSUB_CREATE_REF); + addRecSignal(GSN_SUB_REMOVE_CONF, &Trix::execSUB_REMOVE_CONF); + addRecSignal(GSN_SUB_REMOVE_REF, &Trix::execSUB_REMOVE_REF); + addRecSignal(GSN_SUB_SYNC_CONF, &Trix::execSUB_SYNC_CONF); + addRecSignal(GSN_SUB_SYNC_REF, &Trix::execSUB_SYNC_REF); + addRecSignal(GSN_SUB_SYNC_CONTINUE_REQ, &Trix::execSUB_SYNC_CONTINUE_REQ); + addRecSignal(GSN_SUB_META_DATA, &Trix::execSUB_META_DATA); + addRecSignal(GSN_SUB_TABLE_DATA, &Trix::execSUB_TABLE_DATA); + + // Allocate pool sizes + c_theAttrOrderBufferPool.setSize(100); + c_theSubscriptionRecPool.setSize(100); + + ArrayList<SubscriptionRecord> subscriptions(c_theSubscriptionRecPool); + SubscriptionRecPtr subptr; + while(subscriptions.seize(subptr) == true) { + new (subptr.p) SubscriptionRecord(c_theAttrOrderBufferPool); + } + subscriptions.release(); +} + +/** + * + */ +Trix::~Trix() +{ +} + +/** + * + */ +void Trix::execSTTOR(Signal* signal) +{ + jamEntry(); + + //const Uint32 startphase = signal->theData[1]; + const Uint32 theSignalKey = signal->theData[6]; + + signal->theData[0] = theSignalKey; + signal->theData[3] = 1; + signal->theData[4] = 255; // No more start phases from missra + sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 5, JBB); + return; +}//Trix::execSTTOR() + +/** + * + */ +void Trix::execNDB_STTOR(Signal* signal) +{ + jamEntry(); + BlockReference ndbcntrRef = signal->theData[0]; + Uint16 startphase = signal->theData[2]; /* RESTART PHASE */ + Uint16 mynode = signal->theData[1]; + //Uint16 restarttype = signal->theData[3]; + //UintR configInfo1 = signal->theData[6]; /* CONFIGRATION INFO PART 1 */ + //UintR configInfo2 = signal->theData[7]; /* CONFIGRATION INFO PART 2 */ + switch (startphase) { + case 3: + jam(); + /* SYMBOLIC START PHASE 4 */ + /* ABSOLUTE PHASE 5 */ + /* REQUEST NODE IDENTITIES FROM DBDIH */ + signal->theData[0] = calcTrixBlockRef(mynode); + sendSignal(ndbcntrRef, GSN_READ_NODESREQ, signal, 1, JBB); + return; + break; + case 6: + break; + default: + break; + } +} + +/** + * + */ +void Trix::execREAD_NODESCONF(Signal* signal) +{ + jamEntry(); + + ReadNodesConf * const readNodes = (ReadNodesConf *)signal->getDataPtr(); + //Uint32 noOfNodes = readNodes->noOfNodes; + NodeRecPtr nodeRecPtr; + + c_masterNodeId = readNodes->masterNodeId; + c_masterTrixRef = RNIL; + c_noNodesFailed = 0; + + for(unsigned i = 0; i < MAX_NDB_NODES; i++) { + jam(); + if(NodeBitmask::get(readNodes->allNodes, i)) { + // Node is defined + jam(); + ndbrequire(c_theNodes.seizeId(nodeRecPtr, i)); + nodeRecPtr.p->trixRef = calcTrixBlockRef(i); + if (i == c_masterNodeId) { + c_masterTrixRef = nodeRecPtr.p->trixRef; + } + if(NodeBitmask::get(readNodes->inactiveNodes, i)){ + // Node is not active + jam(); + /**----------------------------------------------------------------- + * THIS NODE IS DEFINED IN THE CLUSTER BUT IS NOT ALIVE CURRENTLY. + * WE ADD THE NODE TO THE SET OF FAILED NODES AND ALSO SET THE + * BLOCKSTATE TO BUSY TO AVOID ADDING TRIGGERS OR INDEXES WHILE + * NOT ALL NODES ARE ALIVE. + *------------------------------------------------------------------*/ + arrGuard(c_noNodesFailed, MAX_NDB_NODES); + nodeRecPtr.p->alive = false; + c_noNodesFailed++; + c_blockState = Trix::NODE_FAILURE; + } + else { + // Node is active + jam(); + c_noActiveNodes++; + nodeRecPtr.p->alive = true; + } + } + } + if (c_noNodesFailed == 0) { + c_blockState = Trix::STARTED; + } +} + +/** + * + */ +void Trix::execREAD_NODESREF(Signal* signal) +{ + // NYI +} + +/** + * + */ +void Trix::execNODE_FAILREP(Signal* signal) +{ + jamEntry(); + NodeFailRep * const nodeFail = (NodeFailRep *) signal->getDataPtr(); + + //Uint32 failureNr = nodeFail->failNo; + //Uint32 numberNodes = nodeFail->noOfNodes; + Uint32 masterNodeId = nodeFail->masterNodeId; + + NodeRecPtr nodeRecPtr; + + for(c_theNodes.first(nodeRecPtr); + nodeRecPtr.i != RNIL; + c_theNodes.next(nodeRecPtr)) { + if(NodeBitmask::get(nodeFail->theNodes, nodeRecPtr.i)) { + nodeRecPtr.p->alive = false; + c_noNodesFailed++; + c_noActiveNodes--; + } + } + if (c_masterNodeId != masterNodeId) { + c_masterNodeId = masterNodeId; + NodeRecord* nodeRec = c_theNodes.getPtr(masterNodeId); + c_masterTrixRef = nodeRec->trixRef; + } +} + +/** + * + */ +void Trix::execINCL_NODEREQ(Signal* signal) +{ + jamEntry(); + UintR node_id = signal->theData[1]; + NodeRecord* nodeRec = c_theNodes.getPtr(node_id); + nodeRec->alive = true; + c_noNodesFailed--; + c_noActiveNodes++; + nodeRec->trixRef = calcTrixBlockRef(node_id); + if (c_noNodesFailed == 0) { + c_blockState = Trix::STARTED; + } +} + +// Debugging +void +Trix::execDUMP_STATE_ORD(Signal* signal) +{ + jamEntry(); + + DumpStateOrd * dumpStateOrd = (DumpStateOrd *)signal->getDataPtr(); + + switch(dumpStateOrd->args[0]) { + case(300): {// ok + // index2 -T; index2 -I -n10000; index2 -c + // all dump 300 0 0 0 0 0 4 2 + // select_count INDEX0000 + BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend(); + + MEMCOPY_NO_WORDS(buildIndxReq, + signal->theData + 1, + BuildIndxReq::SignalLength); + buildIndxReq->setUserRef(reference()); // return to me + buildIndxReq->setParallelism(10); + Uint32 indexColumns[1] = {1}; + Uint32 keyColumns[1] = {0}; + struct LinearSectionPtr orderPtr[2]; + buildIndxReq->setColumnOrder(indexColumns, 1, keyColumns, 1, orderPtr); + sendSignal(reference(), + GSN_BUILDINDXREQ, + signal, + BuildIndxReq::SignalLength, + JBB, + orderPtr, + BuildIndxReq::NoOfSections); + break; + } + case(301): { // ok + // index2 -T; index2 -I -n10000; index2 -c -p + // all dump 301 0 0 0 0 0 4 2 + // select_count INDEX0000 + BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend(); + + MEMCOPY_NO_WORDS(buildIndxReq, + signal->theData + 1, + BuildIndxReq::SignalLength); + buildIndxReq->setUserRef(reference()); // return to me + buildIndxReq->setParallelism(10); + Uint32 indexColumns[2] = {0, 1}; + Uint32 keyColumns[1] = {0}; + struct LinearSectionPtr orderPtr[2]; + buildIndxReq->setColumnOrder(indexColumns, 2, keyColumns, 1, orderPtr); + sendSignal(reference(), + GSN_BUILDINDXREQ, + signal, + BuildIndxReq::SignalLength, + JBB, + orderPtr, + BuildIndxReq::NoOfSections); + break; + } + case(302): { // ok + // index -T; index -I -n1000; index -c -p + // all dump 302 0 0 0 0 0 4 2 + // select_count PNUMINDEX0000 + BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend(); + + MEMCOPY_NO_WORDS(buildIndxReq, + signal->theData + 1, + BuildIndxReq::SignalLength); + buildIndxReq->setUserRef(reference()); // return to me + buildIndxReq->setParallelism(10); + Uint32 indexColumns[3] = {0, 3, 5}; + Uint32 keyColumns[1] = {0}; + struct LinearSectionPtr orderPtr[2]; + buildIndxReq->setColumnOrder(indexColumns, 3, keyColumns, 1, orderPtr); + sendSignal(reference(), + GSN_BUILDINDXREQ, + signal, + BuildIndxReq::SignalLength, + JBB, + orderPtr, + BuildIndxReq::NoOfSections); + break; + } + case(303): { // ok + // index -T -2; index -I -2 -n1000; index -c -p + // all dump 303 0 0 0 0 0 4 2 + // select_count PNUMINDEX0000 + BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend(); + + MEMCOPY_NO_WORDS(buildIndxReq, + signal->theData + 1, + BuildIndxReq::SignalLength); + buildIndxReq->setUserRef(reference()); // return to me + buildIndxReq->setParallelism(10); + Uint32 indexColumns[3] = {0, 3, 5}; + Uint32 keyColumns[2] = {0, 1}; + struct LinearSectionPtr orderPtr[2]; + buildIndxReq->setColumnOrder(indexColumns, 3, keyColumns, 2, orderPtr); + sendSignal(reference(), + GSN_BUILDINDXREQ, + signal, + BuildIndxReq::SignalLength, + JBB, + orderPtr, + BuildIndxReq::NoOfSections); + break; + } + case(304): { // ok + // index -T -L; index -I -L -n1000; index -c -p + // all dump 304 0 0 0 0 0 4 2 + // select_count PNUMINDEX0000 + BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend(); + + MEMCOPY_NO_WORDS(buildIndxReq, + signal->theData + 1, + BuildIndxReq::SignalLength); + buildIndxReq->setUserRef(reference()); // return to me + buildIndxReq->setParallelism(10); + Uint32 indexColumns[3] = {0, 3, 5}; + Uint32 keyColumns[1] = {0}; + struct LinearSectionPtr orderPtr[2]; + buildIndxReq->setColumnOrder(indexColumns, 3, keyColumns, 1, orderPtr); + sendSignal(reference(), + GSN_BUILDINDXREQ, + signal, + BuildIndxReq::SignalLength, + JBB, + orderPtr, + BuildIndxReq::NoOfSections); + break; + } + case(305): { // ok + // index -T -2 -L; index -I -2 -L -n1000; index -c -p + // all dump 305 0 0 0 0 0 4 2 + // select_count PNUMINDEX0000 + BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend(); + + MEMCOPY_NO_WORDS(buildIndxReq, + signal->theData + 1, + BuildIndxReq::SignalLength); + buildIndxReq->setUserRef(reference()); // return to me + buildIndxReq->setParallelism(10); + Uint32 indexColumns[3] = {0, 3, 5}; + Uint32 keyColumns[2] = {0, 1}; + struct LinearSectionPtr orderPtr[2]; + buildIndxReq->setColumnOrder(indexColumns, 3, keyColumns, 2, orderPtr); + sendSignal(reference(), + GSN_BUILDINDXREQ, + signal, + BuildIndxReq::SignalLength, + JBB, + orderPtr, + BuildIndxReq::NoOfSections); + break; + } + default: { + // Ignore + } + } +} + +// Build index +/** + * + */ +void Trix:: execBUILDINDXREQ(Signal* signal) +{ + jamEntry(); + BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtr(); + + // Seize a subscription record + SubscriptionRecPtr subRecPtr; + SubscriptionRecord* subRec; + + if (!c_theSubscriptions.seizeId(subRecPtr, buildIndxReq->getBuildId())) { + // Failed to allocate subscription record + BuildIndxRef * buildIndxRef = (BuildIndxRef *)signal->getDataPtrSend(); + + buildIndxRef->setErrorCode(BuildIndxRef::AllocationFailure); + releaseSections(signal); + sendSignal(buildIndxReq->getUserRef(), + GSN_BUILDINDXREF, signal, BuildIndxRef::SignalLength, JBB); + return; + } + subRec = subRecPtr.p; + subRec->errorCode = BuildIndxRef::NoError; + subRec->userReference = buildIndxReq->getUserRef(); + subRec->connectionPtr = buildIndxReq->getConnectionPtr(); + subRec->subscriptionId = buildIndxReq->getBuildId(); + subRec->subscriptionKey = buildIndxReq->getBuildKey(); + subRec->indexType = buildIndxReq->getIndexType(); + subRec->sourceTableId = buildIndxReq->getTableId(); + subRec->targetTableId = buildIndxReq->getIndexId(); + subRec->parallelism = buildIndxReq->getParallelism(); + subRec->expectedConf = 0; + subRec->subscriptionCreated = false; + subRec->pendingSubSyncContinueConf = false; + subRec->prepareId = RNIL; + + // Get column order segments + Uint32 noOfSections = signal->getNoOfSections(); + if(noOfSections > 0) { + SegmentedSectionPtr ptr; + signal->getSection(ptr, BuildIndxReq::INDEX_COLUMNS); + append(subRec->attributeOrder, ptr, getSectionSegmentPool()); + subRec->noOfIndexColumns = ptr.sz; + } + if(noOfSections > 1) { + SegmentedSectionPtr ptr; + signal->getSection(ptr, BuildIndxReq::KEY_COLUMNS); + append(subRec->attributeOrder, ptr, getSectionSegmentPool()); + subRec->noOfKeyColumns = ptr.sz; + } +#if 0 + // Debugging + printf("Trix:: execBUILDINDXREQ: Attribute order:\n"); + subRec->attributeOrder.print(stdout); +#endif + releaseSections(signal); + prepareInsertTransactions(signal, subRecPtr); +} + +void Trix:: execBUILDINDXCONF(Signal* signal) +{ + printf("Trix:: execBUILDINDXCONF\n"); +} + +void Trix:: execBUILDINDXREF(Signal* signal) +{ + printf("Trix:: execBUILDINDXREF\n"); +} + +void Trix::execUTIL_PREPARE_CONF(Signal* signal) +{ + jamEntry(); + UtilPrepareConf * utilPrepareConf = (UtilPrepareConf *)signal->getDataPtr(); + SubscriptionRecPtr subRecPtr; + SubscriptionRecord* subRec; + + subRecPtr.i = utilPrepareConf->senderData; + if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) { + printf("Trix::execUTIL_PREPARE_CONF: Failed to find subscription data %u\n", subRecPtr.i); + return; + } + subRecPtr.p = subRec; + subRec->prepareId = utilPrepareConf->prepareId; + setupSubscription(signal, subRecPtr); +} + +void Trix::execUTIL_PREPARE_REF(Signal* signal) +{ + jamEntry(); + UtilPrepareRef * utilPrepareRef = (UtilPrepareRef *)signal->getDataPtr(); + SubscriptionRecPtr subRecPtr; + SubscriptionRecord* subRec; + + subRecPtr.i = utilPrepareRef->senderData; + if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) { + printf("Trix::execUTIL_PREPARE_REF: Failed to find subscription data %u\n", subRecPtr.i); + return; + } + subRecPtr.p = subRec; + subRec->errorCode = BuildIndxRef::InternalError; +} + +void Trix::execUTIL_EXECUTE_CONF(Signal* signal) +{ + jamEntry(); + UtilExecuteConf * utilExecuteConf = (UtilExecuteConf *)signal->getDataPtr(); + SubscriptionRecPtr subRecPtr; + SubscriptionRecord* subRec; + + subRecPtr.i = utilExecuteConf->senderData; + if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) { + printf("rix::execUTIL_EXECUTE_CONF: Failed to find subscription data %u\n", subRecPtr.i); + return; + } + subRecPtr.p = subRec; + subRec->expectedConf--; + checkParallelism(signal, subRec); + if (subRec->expectedConf == 0) + buildComplete(signal, subRecPtr); +} + +void Trix::execUTIL_EXECUTE_REF(Signal* signal) +{ + jamEntry(); + UtilExecuteRef * utilExecuteRef = (UtilExecuteRef *)signal->getDataPtr(); + SubscriptionRecPtr subRecPtr; + SubscriptionRecord* subRec; + + subRecPtr.i = utilExecuteRef->senderData; + if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) { + printf("Trix::execUTIL_EXECUTE_REF: Failed to find subscription data %u\n", subRecPtr.i); + return; + } + subRecPtr.p = subRec; + ndbrequire(utilExecuteRef->errorCode == UtilExecuteRef::TCError); + if(utilExecuteRef->TCErrorCode == CONSTRAINT_VIOLATION) + buildFailed(signal, subRecPtr, BuildIndxRef::IndexNotUnique); + else + buildFailed(signal, subRecPtr, BuildIndxRef::InternalError); +} + +void Trix::execSUB_CREATE_CONF(Signal* signal) +{ + jamEntry(); + SubCreateConf * subCreateConf = (SubCreateConf *)signal->getDataPtr(); + SubscriptionRecPtr subRecPtr; + SubscriptionRecord* subRec; + + subRecPtr.i = subCreateConf->subscriberData; + if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) { + printf("Trix::execSUB_CREATE_CONF: Failed to find subscription data %u\n", subRecPtr.i); + return; + } + ndbrequire(subRec->subscriptionId == subCreateConf->subscriptionId); + ndbrequire(subRec->subscriptionKey == subCreateConf->subscriptionKey); + subRec->subscriptionCreated = true; + subRecPtr.p = subRec; + setupTableScan(signal, subRecPtr); +} + +void Trix::execSUB_CREATE_REF(Signal* signal) +{ + jamEntry(); + // THIS SIGNAL IS NEVER SENT FROM SUMA? + /* + SubCreateRef * subCreateRef = (SubCreateRef *)signal->getDataPtr(); + SubscriptionRecPtr subRecPtr; + SubscriptionRecord* subRec; + + subRecPtr.i = subCreateRef->subscriberData; + if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) { + printf("Trix::execSUB_CREATE_REF: Failed to find subscription data %u\n", subRecPtr.i); + return; + } + subRecPtr.p = subRec; + buildFailed(signal, subRecPtr, BuildIndxRef::InternalError); + */ +} + +void Trix::execSUB_SYNC_CONF(Signal* signal) +{ + jamEntry(); + SubSyncConf * subSyncConf = (SubSyncConf *)signal->getDataPtr(); + SubscriptionRecPtr subRecPtr; + SubscriptionRecord* subRec; + + subRecPtr.i = subSyncConf->subscriberData; + if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) { + printf("Trix::execSUB_SYNC_CONF: Failed to find subscription data %u\n", subRecPtr.i); + return; + } + ndbrequire(subRec->subscriptionId == subSyncConf->subscriptionId); + ndbrequire(subRec->subscriptionKey == subSyncConf->subscriptionKey); + subRecPtr.p = subRec; + if(subSyncConf->part == SubscriptionData::MetaData) + startTableScan(signal, subRecPtr); + else { + subRec->expectedConf--; + checkParallelism(signal, subRec); + if (subRec->expectedConf == 0) + buildComplete(signal, subRecPtr); + } +} + +void Trix::execSUB_SYNC_REF(Signal* signal) +{ + jamEntry(); + SubSyncRef * subSyncRef = (SubSyncRef *)signal->getDataPtr(); + SubscriptionRecPtr subRecPtr; + SubscriptionRecord* subRec; + + subRecPtr.i = subSyncRef->subscriberData; + if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) { + printf("Trix::execSUB_SYNC_REF: Failed to find subscription data %u\n", subRecPtr.i); + return; + } + subRecPtr.p = subRec; + buildFailed(signal, subRecPtr, BuildIndxRef::InternalError); +} + +void Trix::execSUB_SYNC_CONTINUE_REQ(Signal* signal) +{ + SubSyncContinueReq * subSyncContinueReq = + (SubSyncContinueReq *) signal->getDataPtr(); + + SubscriptionRecPtr subRecPtr; + SubscriptionRecord* subRec; + subRecPtr.i = subSyncContinueReq->subscriberData; + if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) { + printf("Trix::execSUB_SYNC_CONTINUE_REQ: Failed to find subscription data %u\n", subRecPtr.i); + return; + } + subRecPtr.p = subRec; + subRec->pendingSubSyncContinueConf = true; + checkParallelism(signal, subRec); +} + +void Trix::execSUB_META_DATA(Signal* signal) +{ + jamEntry(); +} + +void Trix::execSUB_TABLE_DATA(Signal* signal) +{ + jamEntry(); + SubTableData * subTableData = (SubTableData *)signal->getDataPtr(); + SubscriptionRecPtr subRecPtr; + SubscriptionRecord* subRec; + subRecPtr.i = subTableData->subscriberData; + if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) { + printf("Trix::execSUB_TABLE_DATA: Failed to find subscription data %u\n", subRecPtr.i); + return; + } + subRecPtr.p = subRec; + SegmentedSectionPtr headerPtr, dataPtr; + if (!signal->getSection(headerPtr, 0)) { + printf("Trix::execSUB_TABLE_DATA: Failed to get header section\n"); + } + if (!signal->getSection(dataPtr, 1)) { + printf("Trix::execSUB_TABLE_DATA: Failed to get data section\n"); + } + executeInsertTransaction(signal, subRecPtr, headerPtr, dataPtr); +} + +void Trix::setupSubscription(Signal* signal, SubscriptionRecPtr subRecPtr) +{ + Uint32 attributeList[MAX_ATTRIBUTES_IN_TABLE * 2]; + SubCreateReq * subCreateReq = (SubCreateReq *)signal->getDataPtrSend(); + SubscriptionRecord* subRec = subRecPtr.p; +// Uint32 listLen = subRec->noOfIndexColumns + subRec->noOfKeyColumns; + AttrOrderBuffer::DataBufferIterator iter; + Uint32 i = 0; + + jam(); + bool moreAttributes = subRec->attributeOrder.first(iter); + while (moreAttributes) { + attributeList[i++] = *iter.data; + moreAttributes = subRec->attributeOrder.next(iter); + } + // Merge index and key column segments + struct LinearSectionPtr orderPtr[3]; + orderPtr[0].p = attributeList; + orderPtr[0].sz = subRec->attributeOrder.getSize(); + + + subCreateReq->subscriberRef = reference(); + subCreateReq->subscriberData = subRecPtr.i; + subCreateReq->subscriptionId = subRec->subscriptionId; + subCreateReq->subscriptionKey = subRec->subscriptionKey; + subCreateReq->tableId = subRec->sourceTableId; + subCreateReq->subscriptionType = SubCreateReq::SingleTableScan; + + sendSignal(SUMA_REF, GSN_SUB_CREATE_REQ, + signal, SubCreateReq::SignalLength+1, JBB, orderPtr, 1); +} + +void Trix::setupTableScan(Signal* signal, SubscriptionRecPtr subRecPtr) +{ + SubSyncReq * subSyncReq = (SubSyncReq *)signal->getDataPtrSend(); + + jam(); + subSyncReq->subscriptionId = subRecPtr.i; + subSyncReq->subscriptionKey = subRecPtr.p->subscriptionKey; + subSyncReq->part = SubscriptionData::MetaData; + sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ, + signal, SubSyncReq::SignalLength, JBB); +} + +void Trix::startTableScan(Signal* signal, SubscriptionRecPtr subRecPtr) +{ + jam(); + subRecPtr.p->expectedConf = 1; + SubSyncReq * subSyncReq = (SubSyncReq *)signal->getDataPtrSend(); + + subSyncReq->subscriptionId = subRecPtr.i; + subSyncReq->subscriptionKey = subRecPtr.p->subscriptionKey; + subSyncReq->part = SubscriptionData::TableData; + sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ, + signal, SubSyncReq::SignalLength, JBB); +} + +void Trix::prepareInsertTransactions(Signal* signal, + SubscriptionRecPtr subRecPtr) +{ + SubscriptionRecord* subRec = subRecPtr.p; + UtilPrepareReq * utilPrepareReq = + (UtilPrepareReq *)signal->getDataPtrSend(); + + jam(); + utilPrepareReq->senderRef = reference(); + utilPrepareReq->senderData = subRecPtr.i; + + const Uint32 pageSizeInWords = 128; + Uint32 propPage[pageSizeInWords]; + LinearWriter w(&propPage[0],128); + w.first(); + w.add(UtilPrepareReq::NoOfOperations, 1); + w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Write); + w.add(UtilPrepareReq::TableId, subRec->targetTableId); + // Add index attributes in increasing order and one PK attribute + for(Uint32 i = 0; i < subRec->noOfIndexColumns + 1; i++) + w.add(UtilPrepareReq::AttributeId, i); + +#if 0 + // Debugging + SimplePropertiesLinearReader reader(propPage, w.getWordsUsed()); + printf("Trix::prepareInsertTransactions: Sent SimpleProperties:\n"); + reader.printAll(ndbout); +#endif + + struct LinearSectionPtr sectionsPtr[UtilPrepareReq::NoOfSections]; + sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].p = propPage; + sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].sz = w.getWordsUsed(); + sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ, signal, + UtilPrepareReq::SignalLength, JBB, + sectionsPtr, UtilPrepareReq::NoOfSections); +} + +void Trix::executeInsertTransaction(Signal* signal, + SubscriptionRecPtr subRecPtr, + SegmentedSectionPtr headerPtr, + SegmentedSectionPtr dataPtr) +{ + jam(); + SubscriptionRecord* subRec = subRecPtr.p; + UtilExecuteReq * utilExecuteReq = + (UtilExecuteReq *)signal->getDataPtrSend(); + Uint32* headerBuffer = signal->theData + 25; + Uint32* dataBuffer = headerBuffer + headerPtr.sz; + + utilExecuteReq->senderRef = reference(); + utilExecuteReq->senderData = subRecPtr.i; + utilExecuteReq->prepareId = subRec->prepareId; +#if 0 + printf("Header size %u\n", headerPtr.sz); + for(int i = 0; i < headerPtr.sz; i++) + printf("H'%.8x ", headerBuffer[i]); + printf("\n"); + + printf("Data size %u\n", dataPtr.sz); + for(int i = 0; i < dataPtr.sz; i++) + printf("H'%.8x ", dataBuffer[i]); + printf("\n"); +#endif + // Save scan result in linear buffers + copy(headerBuffer, headerPtr); + copy(dataBuffer, dataPtr); + + // Calculate packed key size + Uint32 noOfKeyData = 0; + for(Uint32 i = 0; i < headerPtr.sz; i++) { + AttributeHeader* keyAttrHead = (AttributeHeader *) headerBuffer + i; + + // Filter out NULL attributes + if (keyAttrHead->isNULL()) + return; + + if (i < subRec->noOfIndexColumns) + // Renumber index attributes in consequtive order + keyAttrHead->setAttributeId(i); + else + // Calculate total size of PK attribute + noOfKeyData += keyAttrHead->getDataSize(); + } + // Increase expected CONF count + subRec->expectedConf++; + + // Pack key attributes + AttributeHeader::init(headerBuffer + subRec->noOfIndexColumns, + subRec->noOfIndexColumns, + noOfKeyData); + + struct LinearSectionPtr sectionsPtr[UtilExecuteReq::NoOfSections]; + sectionsPtr[UtilExecuteReq::HEADER_SECTION].p = headerBuffer; + sectionsPtr[UtilExecuteReq::HEADER_SECTION].sz = + subRec->noOfIndexColumns + 1; + sectionsPtr[UtilExecuteReq::DATA_SECTION].p = dataBuffer; + sectionsPtr[UtilExecuteReq::DATA_SECTION].sz = dataPtr.sz; + sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ, signal, + UtilExecuteReq::SignalLength, JBB, + sectionsPtr, UtilExecuteReq::NoOfSections); +} + +void Trix::buildComplete(Signal* signal, SubscriptionRecPtr subRecPtr) +{ + SubRemoveReq * const req = (SubRemoveReq*)signal->getDataPtrSend(); + req->senderRef = reference(); + req->senderData = subRecPtr.i; + req->subscriptionId = subRecPtr.p->subscriptionId; + req->subscriptionKey = subRecPtr.p->subscriptionKey; + sendSignal(SUMA_REF, GSN_SUB_REMOVE_REQ, signal, + SubRemoveReq::SignalLength, JBB); +} + +void Trix::buildFailed(Signal* signal, + SubscriptionRecPtr subRecPtr, + BuildIndxRef::ErrorCode errorCode) +{ + SubscriptionRecord* subRec = subRecPtr.p; + + subRec->errorCode = errorCode; + // Continue accumulating since we currently cannot stop SUMA + subRec->expectedConf--; + checkParallelism(signal, subRec); + if (subRec->expectedConf == 0) + buildComplete(signal, subRecPtr); +} + +void +Trix::execSUB_REMOVE_REF(Signal* signal){ + jamEntry(); + //@todo + ndbrequire(false); +} + +void +Trix::execSUB_REMOVE_CONF(Signal* signal){ + jamEntry(); + + SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend(); + + SubscriptionRecPtr subRecPtr; + c_theSubscriptions.getPtr(subRecPtr, conf->senderData); + + if(subRecPtr.p->prepareId != RNIL){ + jam(); + + UtilReleaseReq * const req = (UtilReleaseReq*)signal->getDataPtrSend(); + req->prepareId = subRecPtr.p->prepareId; + req->senderData = subRecPtr.i; + + sendSignal(DBUTIL_REF, GSN_UTIL_RELEASE_REQ, signal, + UtilReleaseReq::SignalLength , JBB); + return; + } + + { + UtilReleaseConf * const conf = (UtilReleaseConf*)signal->getDataPtrSend(); + conf->senderData = subRecPtr.i; + execUTIL_RELEASE_CONF(signal); + } +} + +void +Trix::execUTIL_RELEASE_REF(Signal* signal){ + jamEntry(); + ndbrequire(false); +} + +void +Trix::execUTIL_RELEASE_CONF(Signal* signal){ + + UtilReleaseConf * const conf = (UtilReleaseConf*)signal->getDataPtrSend(); + + SubscriptionRecPtr subRecPtr; + c_theSubscriptions.getPtr(subRecPtr, conf->senderData); + + if(subRecPtr.p->errorCode == BuildIndxRef::NoError){ + // Build is complete, reply to original sender + BuildIndxConf * buildIndxConf = (BuildIndxConf *)signal->getDataPtrSend(); + buildIndxConf->setUserRef(subRecPtr.p->userReference); + buildIndxConf->setConnectionPtr(subRecPtr.p->connectionPtr); + buildIndxConf->setRequestType(BuildIndxReq::RT_TRIX); + buildIndxConf->setIndexType(subRecPtr.p->indexType); + buildIndxConf->setTableId(subRecPtr.p->sourceTableId); + buildIndxConf->setIndexId(subRecPtr.p->targetTableId); + + sendSignal(subRecPtr.p->userReference, GSN_BUILDINDXCONF, signal, + BuildIndxConf::SignalLength , JBB); + } else { + // Build failed, reply to original sender + BuildIndxRef * buildIndxRef = (BuildIndxRef *)signal->getDataPtrSend(); + buildIndxRef->setUserRef(subRecPtr.p->userReference); + buildIndxRef->setConnectionPtr(subRecPtr.p->connectionPtr); + buildIndxRef->setRequestType(BuildIndxReq::RT_TRIX); + buildIndxRef->setIndexType(subRecPtr.p->indexType); + buildIndxRef->setTableId(subRecPtr.p->sourceTableId); + buildIndxRef->setIndexId(subRecPtr.p->targetTableId); + buildIndxRef->setErrorCode(subRecPtr.p->errorCode); + + sendSignal(subRecPtr.p->userReference, GSN_BUILDINDXREF, signal, + BuildIndxRef::SignalLength , JBB); + } + + // Release subscription record + subRecPtr.p->attributeOrder.release(); + c_theSubscriptions.release(subRecPtr.i); +} + +void Trix::checkParallelism(Signal* signal, SubscriptionRecord* subRec) +{ + if ((subRec->pendingSubSyncContinueConf) && + (subRec->expectedConf < subRec->parallelism)) { + SubSyncContinueConf * subSyncContinueConf = + (SubSyncContinueConf *) signal->getDataPtrSend(); + subSyncContinueConf->subscriptionId = subRec->subscriptionId; + subSyncContinueConf->subscriptionKey = subRec->subscriptionKey; + sendSignal(SUMA_REF, GSN_SUB_SYNC_CONTINUE_CONF, signal, + SubSyncContinueConf::SignalLength , JBB); + subRec->pendingSubSyncContinueConf = false; + } +} + +BLOCK_FUNCTIONS(Trix) + +template void append(DataBuffer<15>&,SegmentedSectionPtr,SectionSegmentPool&); |