summaryrefslogtreecommitdiff
path: root/storage/ndb/src/kernel/blocks/trix/Trix.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/ndb/src/kernel/blocks/trix/Trix.cpp')
-rw-r--r--storage/ndb/src/kernel/blocks/trix/Trix.cpp967
1 files changed, 967 insertions, 0 deletions
diff --git a/storage/ndb/src/kernel/blocks/trix/Trix.cpp b/storage/ndb/src/kernel/blocks/trix/Trix.cpp
new file mode 100644
index 00000000000..cd11cb4d575
--- /dev/null
+++ b/storage/ndb/src/kernel/blocks/trix/Trix.cpp
@@ -0,0 +1,967 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include "Trix.hpp"
+
+#include <string.h>
+#include <kernel_types.h>
+#include <NdbOut.hpp>
+
+#include <signaldata/ReadNodesConf.hpp>
+#include <signaldata/NodeFailRep.hpp>
+#include <signaldata/DumpStateOrd.hpp>
+#include <signaldata/GetTabInfo.hpp>
+#include <signaldata/DictTabInfo.hpp>
+#include <signaldata/BuildIndx.hpp>
+#include <signaldata/SumaImpl.hpp>
+#include <signaldata/UtilPrepare.hpp>
+#include <signaldata/UtilExecute.hpp>
+#include <signaldata/UtilRelease.hpp>
+#include <SectionReader.hpp>
+#include <AttributeHeader.hpp>
+
+#define CONSTRAINT_VIOLATION 893
+
+#define DEBUG(x) { ndbout << "TRIX::" << x << endl; }
+
+/**
+ *
+ */
+Trix::Trix(const Configuration & conf) :
+ SimulatedBlock(TRIX, conf),
+ c_theNodes(c_theNodeRecPool),
+ c_masterNodeId(0),
+ c_masterTrixRef(0),
+ c_noNodesFailed(0),
+ c_noActiveNodes(0),
+ c_theSubscriptions(c_theSubscriptionRecPool)
+{
+ BLOCK_CONSTRUCTOR(Trix);
+
+ // Add received signals
+ addRecSignal(GSN_STTOR, &Trix::execSTTOR);
+ addRecSignal(GSN_NDB_STTOR, &Trix::execNDB_STTOR); // Forwarded from DICT
+ addRecSignal(GSN_READ_NODESCONF, &Trix::execREAD_NODESCONF);
+ addRecSignal(GSN_READ_NODESREF, &Trix::execREAD_NODESREF);
+ addRecSignal(GSN_NODE_FAILREP, &Trix::execNODE_FAILREP);
+ addRecSignal(GSN_INCL_NODEREQ, &Trix::execINCL_NODEREQ);
+ addRecSignal(GSN_DUMP_STATE_ORD, &Trix::execDUMP_STATE_ORD);
+
+ // Index build
+ addRecSignal(GSN_BUILDINDXREQ, &Trix::execBUILDINDXREQ);
+ // Dump testing
+ addRecSignal(GSN_BUILDINDXCONF, &Trix::execBUILDINDXCONF);
+ addRecSignal(GSN_BUILDINDXREF, &Trix::execBUILDINDXREF);
+
+
+ addRecSignal(GSN_UTIL_PREPARE_CONF, &Trix::execUTIL_PREPARE_CONF);
+ addRecSignal(GSN_UTIL_PREPARE_REF, &Trix::execUTIL_PREPARE_REF);
+ addRecSignal(GSN_UTIL_EXECUTE_CONF, &Trix::execUTIL_EXECUTE_CONF);
+ addRecSignal(GSN_UTIL_EXECUTE_REF, &Trix::execUTIL_EXECUTE_REF);
+ addRecSignal(GSN_UTIL_RELEASE_CONF, &Trix::execUTIL_RELEASE_CONF);
+ addRecSignal(GSN_UTIL_RELEASE_REF, &Trix::execUTIL_RELEASE_REF);
+
+
+ // Suma signals
+ addRecSignal(GSN_SUB_CREATE_CONF, &Trix::execSUB_CREATE_CONF);
+ addRecSignal(GSN_SUB_CREATE_REF, &Trix::execSUB_CREATE_REF);
+ addRecSignal(GSN_SUB_REMOVE_CONF, &Trix::execSUB_REMOVE_CONF);
+ addRecSignal(GSN_SUB_REMOVE_REF, &Trix::execSUB_REMOVE_REF);
+ addRecSignal(GSN_SUB_SYNC_CONF, &Trix::execSUB_SYNC_CONF);
+ addRecSignal(GSN_SUB_SYNC_REF, &Trix::execSUB_SYNC_REF);
+ addRecSignal(GSN_SUB_SYNC_CONTINUE_REQ, &Trix::execSUB_SYNC_CONTINUE_REQ);
+ addRecSignal(GSN_SUB_META_DATA, &Trix::execSUB_META_DATA);
+ addRecSignal(GSN_SUB_TABLE_DATA, &Trix::execSUB_TABLE_DATA);
+
+ // Allocate pool sizes
+ c_theAttrOrderBufferPool.setSize(100);
+ c_theSubscriptionRecPool.setSize(100);
+
+ ArrayList<SubscriptionRecord> subscriptions(c_theSubscriptionRecPool);
+ SubscriptionRecPtr subptr;
+ while(subscriptions.seize(subptr) == true) {
+ new (subptr.p) SubscriptionRecord(c_theAttrOrderBufferPool);
+ }
+ subscriptions.release();
+}
+
+/**
+ *
+ */
+Trix::~Trix()
+{
+}
+
+/**
+ *
+ */
+void Trix::execSTTOR(Signal* signal)
+{
+ jamEntry();
+
+ //const Uint32 startphase = signal->theData[1];
+ const Uint32 theSignalKey = signal->theData[6];
+
+ signal->theData[0] = theSignalKey;
+ signal->theData[3] = 1;
+ signal->theData[4] = 255; // No more start phases from missra
+ sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 5, JBB);
+ return;
+}//Trix::execSTTOR()
+
+/**
+ *
+ */
+void Trix::execNDB_STTOR(Signal* signal)
+{
+ jamEntry();
+ BlockReference ndbcntrRef = signal->theData[0];
+ Uint16 startphase = signal->theData[2]; /* RESTART PHASE */
+ Uint16 mynode = signal->theData[1];
+ //Uint16 restarttype = signal->theData[3];
+ //UintR configInfo1 = signal->theData[6]; /* CONFIGRATION INFO PART 1 */
+ //UintR configInfo2 = signal->theData[7]; /* CONFIGRATION INFO PART 2 */
+ switch (startphase) {
+ case 3:
+ jam();
+ /* SYMBOLIC START PHASE 4 */
+ /* ABSOLUTE PHASE 5 */
+ /* REQUEST NODE IDENTITIES FROM DBDIH */
+ signal->theData[0] = calcTrixBlockRef(mynode);
+ sendSignal(ndbcntrRef, GSN_READ_NODESREQ, signal, 1, JBB);
+ return;
+ break;
+ case 6:
+ break;
+ default:
+ break;
+ }
+}
+
+/**
+ *
+ */
+void Trix::execREAD_NODESCONF(Signal* signal)
+{
+ jamEntry();
+
+ ReadNodesConf * const readNodes = (ReadNodesConf *)signal->getDataPtr();
+ //Uint32 noOfNodes = readNodes->noOfNodes;
+ NodeRecPtr nodeRecPtr;
+
+ c_masterNodeId = readNodes->masterNodeId;
+ c_masterTrixRef = RNIL;
+ c_noNodesFailed = 0;
+
+ for(unsigned i = 0; i < MAX_NDB_NODES; i++) {
+ jam();
+ if(NodeBitmask::get(readNodes->allNodes, i)) {
+ // Node is defined
+ jam();
+ ndbrequire(c_theNodes.seizeId(nodeRecPtr, i));
+ nodeRecPtr.p->trixRef = calcTrixBlockRef(i);
+ if (i == c_masterNodeId) {
+ c_masterTrixRef = nodeRecPtr.p->trixRef;
+ }
+ if(NodeBitmask::get(readNodes->inactiveNodes, i)){
+ // Node is not active
+ jam();
+ /**-----------------------------------------------------------------
+ * THIS NODE IS DEFINED IN THE CLUSTER BUT IS NOT ALIVE CURRENTLY.
+ * WE ADD THE NODE TO THE SET OF FAILED NODES AND ALSO SET THE
+ * BLOCKSTATE TO BUSY TO AVOID ADDING TRIGGERS OR INDEXES WHILE
+ * NOT ALL NODES ARE ALIVE.
+ *------------------------------------------------------------------*/
+ arrGuard(c_noNodesFailed, MAX_NDB_NODES);
+ nodeRecPtr.p->alive = false;
+ c_noNodesFailed++;
+ c_blockState = Trix::NODE_FAILURE;
+ }
+ else {
+ // Node is active
+ jam();
+ c_noActiveNodes++;
+ nodeRecPtr.p->alive = true;
+ }
+ }
+ }
+ if (c_noNodesFailed == 0) {
+ c_blockState = Trix::STARTED;
+ }
+}
+
+/**
+ *
+ */
+void Trix::execREAD_NODESREF(Signal* signal)
+{
+ // NYI
+}
+
+/**
+ *
+ */
+void Trix::execNODE_FAILREP(Signal* signal)
+{
+ jamEntry();
+ NodeFailRep * const nodeFail = (NodeFailRep *) signal->getDataPtr();
+
+ //Uint32 failureNr = nodeFail->failNo;
+ //Uint32 numberNodes = nodeFail->noOfNodes;
+ Uint32 masterNodeId = nodeFail->masterNodeId;
+
+ NodeRecPtr nodeRecPtr;
+
+ for(c_theNodes.first(nodeRecPtr);
+ nodeRecPtr.i != RNIL;
+ c_theNodes.next(nodeRecPtr)) {
+ if(NodeBitmask::get(nodeFail->theNodes, nodeRecPtr.i)) {
+ nodeRecPtr.p->alive = false;
+ c_noNodesFailed++;
+ c_noActiveNodes--;
+ }
+ }
+ if (c_masterNodeId != masterNodeId) {
+ c_masterNodeId = masterNodeId;
+ NodeRecord* nodeRec = c_theNodes.getPtr(masterNodeId);
+ c_masterTrixRef = nodeRec->trixRef;
+ }
+}
+
+/**
+ *
+ */
+void Trix::execINCL_NODEREQ(Signal* signal)
+{
+ jamEntry();
+ UintR node_id = signal->theData[1];
+ NodeRecord* nodeRec = c_theNodes.getPtr(node_id);
+ nodeRec->alive = true;
+ c_noNodesFailed--;
+ c_noActiveNodes++;
+ nodeRec->trixRef = calcTrixBlockRef(node_id);
+ if (c_noNodesFailed == 0) {
+ c_blockState = Trix::STARTED;
+ }
+}
+
+// Debugging
+void
+Trix::execDUMP_STATE_ORD(Signal* signal)
+{
+ jamEntry();
+
+ DumpStateOrd * dumpStateOrd = (DumpStateOrd *)signal->getDataPtr();
+
+ switch(dumpStateOrd->args[0]) {
+ case(300): {// ok
+ // index2 -T; index2 -I -n10000; index2 -c
+ // all dump 300 0 0 0 0 0 4 2
+ // select_count INDEX0000
+ BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend();
+
+ MEMCOPY_NO_WORDS(buildIndxReq,
+ signal->theData + 1,
+ BuildIndxReq::SignalLength);
+ buildIndxReq->setUserRef(reference()); // return to me
+ buildIndxReq->setParallelism(10);
+ Uint32 indexColumns[1] = {1};
+ Uint32 keyColumns[1] = {0};
+ struct LinearSectionPtr orderPtr[2];
+ buildIndxReq->setColumnOrder(indexColumns, 1, keyColumns, 1, orderPtr);
+ sendSignal(reference(),
+ GSN_BUILDINDXREQ,
+ signal,
+ BuildIndxReq::SignalLength,
+ JBB,
+ orderPtr,
+ BuildIndxReq::NoOfSections);
+ break;
+ }
+ case(301): { // ok
+ // index2 -T; index2 -I -n10000; index2 -c -p
+ // all dump 301 0 0 0 0 0 4 2
+ // select_count INDEX0000
+ BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend();
+
+ MEMCOPY_NO_WORDS(buildIndxReq,
+ signal->theData + 1,
+ BuildIndxReq::SignalLength);
+ buildIndxReq->setUserRef(reference()); // return to me
+ buildIndxReq->setParallelism(10);
+ Uint32 indexColumns[2] = {0, 1};
+ Uint32 keyColumns[1] = {0};
+ struct LinearSectionPtr orderPtr[2];
+ buildIndxReq->setColumnOrder(indexColumns, 2, keyColumns, 1, orderPtr);
+ sendSignal(reference(),
+ GSN_BUILDINDXREQ,
+ signal,
+ BuildIndxReq::SignalLength,
+ JBB,
+ orderPtr,
+ BuildIndxReq::NoOfSections);
+ break;
+ }
+ case(302): { // ok
+ // index -T; index -I -n1000; index -c -p
+ // all dump 302 0 0 0 0 0 4 2
+ // select_count PNUMINDEX0000
+ BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend();
+
+ MEMCOPY_NO_WORDS(buildIndxReq,
+ signal->theData + 1,
+ BuildIndxReq::SignalLength);
+ buildIndxReq->setUserRef(reference()); // return to me
+ buildIndxReq->setParallelism(10);
+ Uint32 indexColumns[3] = {0, 3, 5};
+ Uint32 keyColumns[1] = {0};
+ struct LinearSectionPtr orderPtr[2];
+ buildIndxReq->setColumnOrder(indexColumns, 3, keyColumns, 1, orderPtr);
+ sendSignal(reference(),
+ GSN_BUILDINDXREQ,
+ signal,
+ BuildIndxReq::SignalLength,
+ JBB,
+ orderPtr,
+ BuildIndxReq::NoOfSections);
+ break;
+ }
+ case(303): { // ok
+ // index -T -2; index -I -2 -n1000; index -c -p
+ // all dump 303 0 0 0 0 0 4 2
+ // select_count PNUMINDEX0000
+ BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend();
+
+ MEMCOPY_NO_WORDS(buildIndxReq,
+ signal->theData + 1,
+ BuildIndxReq::SignalLength);
+ buildIndxReq->setUserRef(reference()); // return to me
+ buildIndxReq->setParallelism(10);
+ Uint32 indexColumns[3] = {0, 3, 5};
+ Uint32 keyColumns[2] = {0, 1};
+ struct LinearSectionPtr orderPtr[2];
+ buildIndxReq->setColumnOrder(indexColumns, 3, keyColumns, 2, orderPtr);
+ sendSignal(reference(),
+ GSN_BUILDINDXREQ,
+ signal,
+ BuildIndxReq::SignalLength,
+ JBB,
+ orderPtr,
+ BuildIndxReq::NoOfSections);
+ break;
+ }
+ case(304): { // ok
+ // index -T -L; index -I -L -n1000; index -c -p
+ // all dump 304 0 0 0 0 0 4 2
+ // select_count PNUMINDEX0000
+ BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend();
+
+ MEMCOPY_NO_WORDS(buildIndxReq,
+ signal->theData + 1,
+ BuildIndxReq::SignalLength);
+ buildIndxReq->setUserRef(reference()); // return to me
+ buildIndxReq->setParallelism(10);
+ Uint32 indexColumns[3] = {0, 3, 5};
+ Uint32 keyColumns[1] = {0};
+ struct LinearSectionPtr orderPtr[2];
+ buildIndxReq->setColumnOrder(indexColumns, 3, keyColumns, 1, orderPtr);
+ sendSignal(reference(),
+ GSN_BUILDINDXREQ,
+ signal,
+ BuildIndxReq::SignalLength,
+ JBB,
+ orderPtr,
+ BuildIndxReq::NoOfSections);
+ break;
+ }
+ case(305): { // ok
+ // index -T -2 -L; index -I -2 -L -n1000; index -c -p
+ // all dump 305 0 0 0 0 0 4 2
+ // select_count PNUMINDEX0000
+ BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend();
+
+ MEMCOPY_NO_WORDS(buildIndxReq,
+ signal->theData + 1,
+ BuildIndxReq::SignalLength);
+ buildIndxReq->setUserRef(reference()); // return to me
+ buildIndxReq->setParallelism(10);
+ Uint32 indexColumns[3] = {0, 3, 5};
+ Uint32 keyColumns[2] = {0, 1};
+ struct LinearSectionPtr orderPtr[2];
+ buildIndxReq->setColumnOrder(indexColumns, 3, keyColumns, 2, orderPtr);
+ sendSignal(reference(),
+ GSN_BUILDINDXREQ,
+ signal,
+ BuildIndxReq::SignalLength,
+ JBB,
+ orderPtr,
+ BuildIndxReq::NoOfSections);
+ break;
+ }
+ default: {
+ // Ignore
+ }
+ }
+}
+
+// Build index
+/**
+ *
+ */
+void Trix:: execBUILDINDXREQ(Signal* signal)
+{
+ jamEntry();
+ BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtr();
+
+ // Seize a subscription record
+ SubscriptionRecPtr subRecPtr;
+ SubscriptionRecord* subRec;
+
+ if (!c_theSubscriptions.seizeId(subRecPtr, buildIndxReq->getBuildId())) {
+ // Failed to allocate subscription record
+ BuildIndxRef * buildIndxRef = (BuildIndxRef *)signal->getDataPtrSend();
+
+ buildIndxRef->setErrorCode(BuildIndxRef::AllocationFailure);
+ releaseSections(signal);
+ sendSignal(buildIndxReq->getUserRef(),
+ GSN_BUILDINDXREF, signal, BuildIndxRef::SignalLength, JBB);
+ return;
+ }
+ subRec = subRecPtr.p;
+ subRec->errorCode = BuildIndxRef::NoError;
+ subRec->userReference = buildIndxReq->getUserRef();
+ subRec->connectionPtr = buildIndxReq->getConnectionPtr();
+ subRec->subscriptionId = buildIndxReq->getBuildId();
+ subRec->subscriptionKey = buildIndxReq->getBuildKey();
+ subRec->indexType = buildIndxReq->getIndexType();
+ subRec->sourceTableId = buildIndxReq->getTableId();
+ subRec->targetTableId = buildIndxReq->getIndexId();
+ subRec->parallelism = buildIndxReq->getParallelism();
+ subRec->expectedConf = 0;
+ subRec->subscriptionCreated = false;
+ subRec->pendingSubSyncContinueConf = false;
+ subRec->prepareId = RNIL;
+
+ // Get column order segments
+ Uint32 noOfSections = signal->getNoOfSections();
+ if(noOfSections > 0) {
+ SegmentedSectionPtr ptr;
+ signal->getSection(ptr, BuildIndxReq::INDEX_COLUMNS);
+ append(subRec->attributeOrder, ptr, getSectionSegmentPool());
+ subRec->noOfIndexColumns = ptr.sz;
+ }
+ if(noOfSections > 1) {
+ SegmentedSectionPtr ptr;
+ signal->getSection(ptr, BuildIndxReq::KEY_COLUMNS);
+ append(subRec->attributeOrder, ptr, getSectionSegmentPool());
+ subRec->noOfKeyColumns = ptr.sz;
+ }
+#if 0
+ // Debugging
+ printf("Trix:: execBUILDINDXREQ: Attribute order:\n");
+ subRec->attributeOrder.print(stdout);
+#endif
+ releaseSections(signal);
+ prepareInsertTransactions(signal, subRecPtr);
+}
+
+void Trix:: execBUILDINDXCONF(Signal* signal)
+{
+ printf("Trix:: execBUILDINDXCONF\n");
+}
+
+void Trix:: execBUILDINDXREF(Signal* signal)
+{
+ printf("Trix:: execBUILDINDXREF\n");
+}
+
+void Trix::execUTIL_PREPARE_CONF(Signal* signal)
+{
+ jamEntry();
+ UtilPrepareConf * utilPrepareConf = (UtilPrepareConf *)signal->getDataPtr();
+ SubscriptionRecPtr subRecPtr;
+ SubscriptionRecord* subRec;
+
+ subRecPtr.i = utilPrepareConf->senderData;
+ if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
+ printf("Trix::execUTIL_PREPARE_CONF: Failed to find subscription data %u\n", subRecPtr.i);
+ return;
+ }
+ subRecPtr.p = subRec;
+ subRec->prepareId = utilPrepareConf->prepareId;
+ setupSubscription(signal, subRecPtr);
+}
+
+void Trix::execUTIL_PREPARE_REF(Signal* signal)
+{
+ jamEntry();
+ UtilPrepareRef * utilPrepareRef = (UtilPrepareRef *)signal->getDataPtr();
+ SubscriptionRecPtr subRecPtr;
+ SubscriptionRecord* subRec;
+
+ subRecPtr.i = utilPrepareRef->senderData;
+ if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
+ printf("Trix::execUTIL_PREPARE_REF: Failed to find subscription data %u\n", subRecPtr.i);
+ return;
+ }
+ subRecPtr.p = subRec;
+ subRec->errorCode = BuildIndxRef::InternalError;
+}
+
+void Trix::execUTIL_EXECUTE_CONF(Signal* signal)
+{
+ jamEntry();
+ UtilExecuteConf * utilExecuteConf = (UtilExecuteConf *)signal->getDataPtr();
+ SubscriptionRecPtr subRecPtr;
+ SubscriptionRecord* subRec;
+
+ subRecPtr.i = utilExecuteConf->senderData;
+ if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
+ printf("rix::execUTIL_EXECUTE_CONF: Failed to find subscription data %u\n", subRecPtr.i);
+ return;
+ }
+ subRecPtr.p = subRec;
+ subRec->expectedConf--;
+ checkParallelism(signal, subRec);
+ if (subRec->expectedConf == 0)
+ buildComplete(signal, subRecPtr);
+}
+
+void Trix::execUTIL_EXECUTE_REF(Signal* signal)
+{
+ jamEntry();
+ UtilExecuteRef * utilExecuteRef = (UtilExecuteRef *)signal->getDataPtr();
+ SubscriptionRecPtr subRecPtr;
+ SubscriptionRecord* subRec;
+
+ subRecPtr.i = utilExecuteRef->senderData;
+ if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
+ printf("Trix::execUTIL_EXECUTE_REF: Failed to find subscription data %u\n", subRecPtr.i);
+ return;
+ }
+ subRecPtr.p = subRec;
+ ndbrequire(utilExecuteRef->errorCode == UtilExecuteRef::TCError);
+ if(utilExecuteRef->TCErrorCode == CONSTRAINT_VIOLATION)
+ buildFailed(signal, subRecPtr, BuildIndxRef::IndexNotUnique);
+ else
+ buildFailed(signal, subRecPtr, BuildIndxRef::InternalError);
+}
+
+void Trix::execSUB_CREATE_CONF(Signal* signal)
+{
+ jamEntry();
+ SubCreateConf * subCreateConf = (SubCreateConf *)signal->getDataPtr();
+ SubscriptionRecPtr subRecPtr;
+ SubscriptionRecord* subRec;
+
+ subRecPtr.i = subCreateConf->subscriberData;
+ if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
+ printf("Trix::execSUB_CREATE_CONF: Failed to find subscription data %u\n", subRecPtr.i);
+ return;
+ }
+ ndbrequire(subRec->subscriptionId == subCreateConf->subscriptionId);
+ ndbrequire(subRec->subscriptionKey == subCreateConf->subscriptionKey);
+ subRec->subscriptionCreated = true;
+ subRecPtr.p = subRec;
+ setupTableScan(signal, subRecPtr);
+}
+
+void Trix::execSUB_CREATE_REF(Signal* signal)
+{
+ jamEntry();
+ // THIS SIGNAL IS NEVER SENT FROM SUMA?
+ /*
+ SubCreateRef * subCreateRef = (SubCreateRef *)signal->getDataPtr();
+ SubscriptionRecPtr subRecPtr;
+ SubscriptionRecord* subRec;
+
+ subRecPtr.i = subCreateRef->subscriberData;
+ if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
+ printf("Trix::execSUB_CREATE_REF: Failed to find subscription data %u\n", subRecPtr.i);
+ return;
+ }
+ subRecPtr.p = subRec;
+ buildFailed(signal, subRecPtr, BuildIndxRef::InternalError);
+ */
+}
+
+void Trix::execSUB_SYNC_CONF(Signal* signal)
+{
+ jamEntry();
+ SubSyncConf * subSyncConf = (SubSyncConf *)signal->getDataPtr();
+ SubscriptionRecPtr subRecPtr;
+ SubscriptionRecord* subRec;
+
+ subRecPtr.i = subSyncConf->subscriberData;
+ if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
+ printf("Trix::execSUB_SYNC_CONF: Failed to find subscription data %u\n", subRecPtr.i);
+ return;
+ }
+ ndbrequire(subRec->subscriptionId == subSyncConf->subscriptionId);
+ ndbrequire(subRec->subscriptionKey == subSyncConf->subscriptionKey);
+ subRecPtr.p = subRec;
+ if(subSyncConf->part == SubscriptionData::MetaData)
+ startTableScan(signal, subRecPtr);
+ else {
+ subRec->expectedConf--;
+ checkParallelism(signal, subRec);
+ if (subRec->expectedConf == 0)
+ buildComplete(signal, subRecPtr);
+ }
+}
+
+void Trix::execSUB_SYNC_REF(Signal* signal)
+{
+ jamEntry();
+ SubSyncRef * subSyncRef = (SubSyncRef *)signal->getDataPtr();
+ SubscriptionRecPtr subRecPtr;
+ SubscriptionRecord* subRec;
+
+ subRecPtr.i = subSyncRef->subscriberData;
+ if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
+ printf("Trix::execSUB_SYNC_REF: Failed to find subscription data %u\n", subRecPtr.i);
+ return;
+ }
+ subRecPtr.p = subRec;
+ buildFailed(signal, subRecPtr, BuildIndxRef::InternalError);
+}
+
+void Trix::execSUB_SYNC_CONTINUE_REQ(Signal* signal)
+{
+ SubSyncContinueReq * subSyncContinueReq =
+ (SubSyncContinueReq *) signal->getDataPtr();
+
+ SubscriptionRecPtr subRecPtr;
+ SubscriptionRecord* subRec;
+ subRecPtr.i = subSyncContinueReq->subscriberData;
+ if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
+ printf("Trix::execSUB_SYNC_CONTINUE_REQ: Failed to find subscription data %u\n", subRecPtr.i);
+ return;
+ }
+ subRecPtr.p = subRec;
+ subRec->pendingSubSyncContinueConf = true;
+ checkParallelism(signal, subRec);
+}
+
+void Trix::execSUB_META_DATA(Signal* signal)
+{
+ jamEntry();
+}
+
+void Trix::execSUB_TABLE_DATA(Signal* signal)
+{
+ jamEntry();
+ SubTableData * subTableData = (SubTableData *)signal->getDataPtr();
+ SubscriptionRecPtr subRecPtr;
+ SubscriptionRecord* subRec;
+ subRecPtr.i = subTableData->subscriberData;
+ if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
+ printf("Trix::execSUB_TABLE_DATA: Failed to find subscription data %u\n", subRecPtr.i);
+ return;
+ }
+ subRecPtr.p = subRec;
+ SegmentedSectionPtr headerPtr, dataPtr;
+ if (!signal->getSection(headerPtr, 0)) {
+ printf("Trix::execSUB_TABLE_DATA: Failed to get header section\n");
+ }
+ if (!signal->getSection(dataPtr, 1)) {
+ printf("Trix::execSUB_TABLE_DATA: Failed to get data section\n");
+ }
+ executeInsertTransaction(signal, subRecPtr, headerPtr, dataPtr);
+}
+
+void Trix::setupSubscription(Signal* signal, SubscriptionRecPtr subRecPtr)
+{
+ Uint32 attributeList[MAX_ATTRIBUTES_IN_TABLE * 2];
+ SubCreateReq * subCreateReq = (SubCreateReq *)signal->getDataPtrSend();
+ SubscriptionRecord* subRec = subRecPtr.p;
+// Uint32 listLen = subRec->noOfIndexColumns + subRec->noOfKeyColumns;
+ AttrOrderBuffer::DataBufferIterator iter;
+ Uint32 i = 0;
+
+ jam();
+ bool moreAttributes = subRec->attributeOrder.first(iter);
+ while (moreAttributes) {
+ attributeList[i++] = *iter.data;
+ moreAttributes = subRec->attributeOrder.next(iter);
+ }
+ // Merge index and key column segments
+ struct LinearSectionPtr orderPtr[3];
+ orderPtr[0].p = attributeList;
+ orderPtr[0].sz = subRec->attributeOrder.getSize();
+
+
+ subCreateReq->subscriberRef = reference();
+ subCreateReq->subscriberData = subRecPtr.i;
+ subCreateReq->subscriptionId = subRec->subscriptionId;
+ subCreateReq->subscriptionKey = subRec->subscriptionKey;
+ subCreateReq->tableId = subRec->sourceTableId;
+ subCreateReq->subscriptionType = SubCreateReq::SingleTableScan;
+
+ sendSignal(SUMA_REF, GSN_SUB_CREATE_REQ,
+ signal, SubCreateReq::SignalLength+1, JBB, orderPtr, 1);
+}
+
+void Trix::setupTableScan(Signal* signal, SubscriptionRecPtr subRecPtr)
+{
+ SubSyncReq * subSyncReq = (SubSyncReq *)signal->getDataPtrSend();
+
+ jam();
+ subSyncReq->subscriptionId = subRecPtr.i;
+ subSyncReq->subscriptionKey = subRecPtr.p->subscriptionKey;
+ subSyncReq->part = SubscriptionData::MetaData;
+ sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ,
+ signal, SubSyncReq::SignalLength, JBB);
+}
+
+void Trix::startTableScan(Signal* signal, SubscriptionRecPtr subRecPtr)
+{
+ jam();
+ subRecPtr.p->expectedConf = 1;
+ SubSyncReq * subSyncReq = (SubSyncReq *)signal->getDataPtrSend();
+
+ subSyncReq->subscriptionId = subRecPtr.i;
+ subSyncReq->subscriptionKey = subRecPtr.p->subscriptionKey;
+ subSyncReq->part = SubscriptionData::TableData;
+ sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ,
+ signal, SubSyncReq::SignalLength, JBB);
+}
+
+void Trix::prepareInsertTransactions(Signal* signal,
+ SubscriptionRecPtr subRecPtr)
+{
+ SubscriptionRecord* subRec = subRecPtr.p;
+ UtilPrepareReq * utilPrepareReq =
+ (UtilPrepareReq *)signal->getDataPtrSend();
+
+ jam();
+ utilPrepareReq->senderRef = reference();
+ utilPrepareReq->senderData = subRecPtr.i;
+
+ const Uint32 pageSizeInWords = 128;
+ Uint32 propPage[pageSizeInWords];
+ LinearWriter w(&propPage[0],128);
+ w.first();
+ w.add(UtilPrepareReq::NoOfOperations, 1);
+ w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Write);
+ w.add(UtilPrepareReq::TableId, subRec->targetTableId);
+ // Add index attributes in increasing order and one PK attribute
+ for(Uint32 i = 0; i < subRec->noOfIndexColumns + 1; i++)
+ w.add(UtilPrepareReq::AttributeId, i);
+
+#if 0
+ // Debugging
+ SimplePropertiesLinearReader reader(propPage, w.getWordsUsed());
+ printf("Trix::prepareInsertTransactions: Sent SimpleProperties:\n");
+ reader.printAll(ndbout);
+#endif
+
+ struct LinearSectionPtr sectionsPtr[UtilPrepareReq::NoOfSections];
+ sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].p = propPage;
+ sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].sz = w.getWordsUsed();
+ sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ, signal,
+ UtilPrepareReq::SignalLength, JBB,
+ sectionsPtr, UtilPrepareReq::NoOfSections);
+}
+
+void Trix::executeInsertTransaction(Signal* signal,
+ SubscriptionRecPtr subRecPtr,
+ SegmentedSectionPtr headerPtr,
+ SegmentedSectionPtr dataPtr)
+{
+ jam();
+ SubscriptionRecord* subRec = subRecPtr.p;
+ UtilExecuteReq * utilExecuteReq =
+ (UtilExecuteReq *)signal->getDataPtrSend();
+ Uint32* headerBuffer = signal->theData + 25;
+ Uint32* dataBuffer = headerBuffer + headerPtr.sz;
+
+ utilExecuteReq->senderRef = reference();
+ utilExecuteReq->senderData = subRecPtr.i;
+ utilExecuteReq->prepareId = subRec->prepareId;
+#if 0
+ printf("Header size %u\n", headerPtr.sz);
+ for(int i = 0; i < headerPtr.sz; i++)
+ printf("H'%.8x ", headerBuffer[i]);
+ printf("\n");
+
+ printf("Data size %u\n", dataPtr.sz);
+ for(int i = 0; i < dataPtr.sz; i++)
+ printf("H'%.8x ", dataBuffer[i]);
+ printf("\n");
+#endif
+ // Save scan result in linear buffers
+ copy(headerBuffer, headerPtr);
+ copy(dataBuffer, dataPtr);
+
+ // Calculate packed key size
+ Uint32 noOfKeyData = 0;
+ for(Uint32 i = 0; i < headerPtr.sz; i++) {
+ AttributeHeader* keyAttrHead = (AttributeHeader *) headerBuffer + i;
+
+ // Filter out NULL attributes
+ if (keyAttrHead->isNULL())
+ return;
+
+ if (i < subRec->noOfIndexColumns)
+ // Renumber index attributes in consequtive order
+ keyAttrHead->setAttributeId(i);
+ else
+ // Calculate total size of PK attribute
+ noOfKeyData += keyAttrHead->getDataSize();
+ }
+ // Increase expected CONF count
+ subRec->expectedConf++;
+
+ // Pack key attributes
+ AttributeHeader::init(headerBuffer + subRec->noOfIndexColumns,
+ subRec->noOfIndexColumns,
+ noOfKeyData);
+
+ struct LinearSectionPtr sectionsPtr[UtilExecuteReq::NoOfSections];
+ sectionsPtr[UtilExecuteReq::HEADER_SECTION].p = headerBuffer;
+ sectionsPtr[UtilExecuteReq::HEADER_SECTION].sz =
+ subRec->noOfIndexColumns + 1;
+ sectionsPtr[UtilExecuteReq::DATA_SECTION].p = dataBuffer;
+ sectionsPtr[UtilExecuteReq::DATA_SECTION].sz = dataPtr.sz;
+ sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ, signal,
+ UtilExecuteReq::SignalLength, JBB,
+ sectionsPtr, UtilExecuteReq::NoOfSections);
+}
+
+void Trix::buildComplete(Signal* signal, SubscriptionRecPtr subRecPtr)
+{
+ SubRemoveReq * const req = (SubRemoveReq*)signal->getDataPtrSend();
+ req->senderRef = reference();
+ req->senderData = subRecPtr.i;
+ req->subscriptionId = subRecPtr.p->subscriptionId;
+ req->subscriptionKey = subRecPtr.p->subscriptionKey;
+ sendSignal(SUMA_REF, GSN_SUB_REMOVE_REQ, signal,
+ SubRemoveReq::SignalLength, JBB);
+}
+
+void Trix::buildFailed(Signal* signal,
+ SubscriptionRecPtr subRecPtr,
+ BuildIndxRef::ErrorCode errorCode)
+{
+ SubscriptionRecord* subRec = subRecPtr.p;
+
+ subRec->errorCode = errorCode;
+ // Continue accumulating since we currently cannot stop SUMA
+ subRec->expectedConf--;
+ checkParallelism(signal, subRec);
+ if (subRec->expectedConf == 0)
+ buildComplete(signal, subRecPtr);
+}
+
+void
+Trix::execSUB_REMOVE_REF(Signal* signal){
+ jamEntry();
+ //@todo
+ ndbrequire(false);
+}
+
+void
+Trix::execSUB_REMOVE_CONF(Signal* signal){
+ jamEntry();
+
+ SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend();
+
+ SubscriptionRecPtr subRecPtr;
+ c_theSubscriptions.getPtr(subRecPtr, conf->senderData);
+
+ if(subRecPtr.p->prepareId != RNIL){
+ jam();
+
+ UtilReleaseReq * const req = (UtilReleaseReq*)signal->getDataPtrSend();
+ req->prepareId = subRecPtr.p->prepareId;
+ req->senderData = subRecPtr.i;
+
+ sendSignal(DBUTIL_REF, GSN_UTIL_RELEASE_REQ, signal,
+ UtilReleaseReq::SignalLength , JBB);
+ return;
+ }
+
+ {
+ UtilReleaseConf * const conf = (UtilReleaseConf*)signal->getDataPtrSend();
+ conf->senderData = subRecPtr.i;
+ execUTIL_RELEASE_CONF(signal);
+ }
+}
+
+void
+Trix::execUTIL_RELEASE_REF(Signal* signal){
+ jamEntry();
+ ndbrequire(false);
+}
+
+void
+Trix::execUTIL_RELEASE_CONF(Signal* signal){
+
+ UtilReleaseConf * const conf = (UtilReleaseConf*)signal->getDataPtrSend();
+
+ SubscriptionRecPtr subRecPtr;
+ c_theSubscriptions.getPtr(subRecPtr, conf->senderData);
+
+ if(subRecPtr.p->errorCode == BuildIndxRef::NoError){
+ // Build is complete, reply to original sender
+ BuildIndxConf * buildIndxConf = (BuildIndxConf *)signal->getDataPtrSend();
+ buildIndxConf->setUserRef(subRecPtr.p->userReference);
+ buildIndxConf->setConnectionPtr(subRecPtr.p->connectionPtr);
+ buildIndxConf->setRequestType(BuildIndxReq::RT_TRIX);
+ buildIndxConf->setIndexType(subRecPtr.p->indexType);
+ buildIndxConf->setTableId(subRecPtr.p->sourceTableId);
+ buildIndxConf->setIndexId(subRecPtr.p->targetTableId);
+
+ sendSignal(subRecPtr.p->userReference, GSN_BUILDINDXCONF, signal,
+ BuildIndxConf::SignalLength , JBB);
+ } else {
+ // Build failed, reply to original sender
+ BuildIndxRef * buildIndxRef = (BuildIndxRef *)signal->getDataPtrSend();
+ buildIndxRef->setUserRef(subRecPtr.p->userReference);
+ buildIndxRef->setConnectionPtr(subRecPtr.p->connectionPtr);
+ buildIndxRef->setRequestType(BuildIndxReq::RT_TRIX);
+ buildIndxRef->setIndexType(subRecPtr.p->indexType);
+ buildIndxRef->setTableId(subRecPtr.p->sourceTableId);
+ buildIndxRef->setIndexId(subRecPtr.p->targetTableId);
+ buildIndxRef->setErrorCode(subRecPtr.p->errorCode);
+
+ sendSignal(subRecPtr.p->userReference, GSN_BUILDINDXREF, signal,
+ BuildIndxRef::SignalLength , JBB);
+ }
+
+ // Release subscription record
+ subRecPtr.p->attributeOrder.release();
+ c_theSubscriptions.release(subRecPtr.i);
+}
+
+void Trix::checkParallelism(Signal* signal, SubscriptionRecord* subRec)
+{
+ if ((subRec->pendingSubSyncContinueConf) &&
+ (subRec->expectedConf < subRec->parallelism)) {
+ SubSyncContinueConf * subSyncContinueConf =
+ (SubSyncContinueConf *) signal->getDataPtrSend();
+ subSyncContinueConf->subscriptionId = subRec->subscriptionId;
+ subSyncContinueConf->subscriptionKey = subRec->subscriptionKey;
+ sendSignal(SUMA_REF, GSN_SUB_SYNC_CONTINUE_CONF, signal,
+ SubSyncContinueConf::SignalLength , JBB);
+ subRec->pendingSubSyncContinueConf = false;
+ }
+}
+
+BLOCK_FUNCTIONS(Trix)
+
+template void append(DataBuffer<15>&,SegmentedSectionPtr,SectionSegmentPool&);