summaryrefslogtreecommitdiff
path: root/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp')
-rw-r--r--storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp2052
1 files changed, 2052 insertions, 0 deletions
diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
new file mode 100644
index 00000000000..761f959acdc
--- /dev/null
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
@@ -0,0 +1,2052 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+
+#define DBTUP_C
+#include "Dbtup.hpp"
+#include <RefConvert.hpp>
+#include <ndb_limits.h>
+#include <pc.hpp>
+#include <AttributeDescriptor.hpp>
+#include "AttributeOffset.hpp"
+#include <AttributeHeader.hpp>
+#include <Interpreter.hpp>
+#include <signaldata/TupCommit.hpp>
+#include <signaldata/TupKey.hpp>
+#include <NdbSqlUtil.hpp>
+
+/* ----------------------------------------------------------------- */
+/* ----------- INIT_STORED_OPERATIONREC -------------- */
+/* ----------------------------------------------------------------- */
+int Dbtup::initStoredOperationrec(Operationrec* const regOperPtr,
+ Uint32 storedId)
+{
+ jam();
+ StoredProcPtr storedPtr;
+ c_storedProcPool.getPtr(storedPtr, storedId);
+ if (storedPtr.i != RNIL) {
+ if (storedPtr.p->storedCode == ZSCAN_PROCEDURE) {
+ storedPtr.p->storedCounter++;
+ regOperPtr->firstAttrinbufrec = storedPtr.p->storedLinkFirst;
+ regOperPtr->lastAttrinbufrec = storedPtr.p->storedLinkLast;
+ regOperPtr->attrinbufLen = storedPtr.p->storedProcLength;
+ regOperPtr->currentAttrinbufLen = storedPtr.p->storedProcLength;
+ return ZOK;
+ }//if
+ }//if
+ terrorCode = ZSTORED_PROC_ID_ERROR;
+ return terrorCode;
+}//Dbtup::initStoredOperationrec()
+
+void Dbtup::copyAttrinfo(Signal* signal,
+ Operationrec * const regOperPtr,
+ Uint32* inBuffer)
+{
+ AttrbufrecPtr copyAttrBufPtr;
+ Uint32 RnoOfAttrBufrec = cnoOfAttrbufrec;
+ int RbufLen;
+ Uint32 RinBufIndex = 0;
+ Uint32 Rnext;
+ Uint32 Rfirst;
+ Uint32 TstoredProcedure = (regOperPtr->storedProcedureId != ZNIL);
+ Uint32 RnoFree = cnoFreeAttrbufrec;
+
+//-------------------------------------------------------------------------
+// As a prelude to the execution of the TUPKEYREQ we will copy the program
+// into the inBuffer to enable easy execution without any complex jumping
+// between the buffers. In particular this will make the interpreter less
+// complex. Hopefully it does also improve performance.
+//-------------------------------------------------------------------------
+ copyAttrBufPtr.i = regOperPtr->firstAttrinbufrec;
+ while (copyAttrBufPtr.i != RNIL) {
+ jam();
+ ndbrequire(copyAttrBufPtr.i < RnoOfAttrBufrec);
+ ptrAss(copyAttrBufPtr, attrbufrec);
+ RbufLen = copyAttrBufPtr.p->attrbuf[ZBUF_DATA_LEN];
+ Rnext = copyAttrBufPtr.p->attrbuf[ZBUF_NEXT];
+ Rfirst = cfirstfreeAttrbufrec;
+ MEMCOPY_NO_WORDS(&inBuffer[RinBufIndex],
+ &copyAttrBufPtr.p->attrbuf[0],
+ RbufLen);
+ RinBufIndex += RbufLen;
+ if (!TstoredProcedure) {
+ copyAttrBufPtr.p->attrbuf[ZBUF_NEXT] = Rfirst;
+ cfirstfreeAttrbufrec = copyAttrBufPtr.i;
+ RnoFree++;
+ }//if
+ copyAttrBufPtr.i = Rnext;
+ }//while
+ cnoFreeAttrbufrec = RnoFree;
+ if (TstoredProcedure) {
+ jam();
+ StoredProcPtr storedPtr;
+ c_storedProcPool.getPtr(storedPtr, (Uint32)regOperPtr->storedProcedureId);
+ ndbrequire(storedPtr.p->storedCode == ZSCAN_PROCEDURE);
+ storedPtr.p->storedCounter--;
+ regOperPtr->storedProcedureId = ZNIL;
+ }//if
+ // Release the ATTRINFO buffers
+ regOperPtr->firstAttrinbufrec = RNIL;
+ regOperPtr->lastAttrinbufrec = RNIL;
+}//Dbtup::copyAttrinfo()
+
+void Dbtup::handleATTRINFOforTUPKEYREQ(Signal* signal,
+ Uint32 length,
+ Operationrec * const regOperPtr)
+{
+ AttrbufrecPtr TAttrinbufptr;
+ TAttrinbufptr.i = cfirstfreeAttrbufrec;
+ if ((cfirstfreeAttrbufrec < cnoOfAttrbufrec) &&
+ (cnoFreeAttrbufrec > MIN_ATTRBUF)) {
+ ptrAss(TAttrinbufptr, attrbufrec);
+ MEMCOPY_NO_WORDS(&TAttrinbufptr.p->attrbuf[0],
+ &signal->theData[3],
+ length);
+ Uint32 RnoFree = cnoFreeAttrbufrec;
+ Uint32 Rnext = TAttrinbufptr.p->attrbuf[ZBUF_NEXT];
+ TAttrinbufptr.p->attrbuf[ZBUF_DATA_LEN] = length;
+ TAttrinbufptr.p->attrbuf[ZBUF_NEXT] = RNIL;
+
+ AttrbufrecPtr locAttrinbufptr;
+ Uint32 RnewLen = regOperPtr->currentAttrinbufLen;
+
+ locAttrinbufptr.i = regOperPtr->lastAttrinbufrec;
+ cfirstfreeAttrbufrec = Rnext;
+ cnoFreeAttrbufrec = RnoFree - 1;
+ RnewLen += length;
+ regOperPtr->lastAttrinbufrec = TAttrinbufptr.i;
+ regOperPtr->currentAttrinbufLen = RnewLen;
+ if (locAttrinbufptr.i == RNIL) {
+ regOperPtr->firstAttrinbufrec = TAttrinbufptr.i;
+ return;
+ } else {
+ jam();
+ ptrCheckGuard(locAttrinbufptr, cnoOfAttrbufrec, attrbufrec);
+ locAttrinbufptr.p->attrbuf[ZBUF_NEXT] = TAttrinbufptr.i;
+ }//if
+ if (RnewLen < ZATTR_BUFFER_SIZE) {
+ return;
+ } else {
+ jam();
+ regOperPtr->transstate = TOO_MUCH_AI;
+ return;
+ }//if
+ } else if (cnoFreeAttrbufrec <= MIN_ATTRBUF) {
+ jam();
+ regOperPtr->transstate = ERROR_WAIT_TUPKEYREQ;
+ } else {
+ ndbrequire(false);
+ }//if
+}//Dbtup::handleATTRINFOforTUPKEYREQ()
+
+void Dbtup::execATTRINFO(Signal* signal)
+{
+ OperationrecPtr regOpPtr;
+ Uint32 Rsig0 = signal->theData[0];
+ Uint32 Rlen = signal->length();
+ regOpPtr.i = Rsig0;
+
+ jamEntry();
+
+ ptrCheckGuard(regOpPtr, cnoOfOprec, operationrec);
+ if (regOpPtr.p->transstate == IDLE) {
+ handleATTRINFOforTUPKEYREQ(signal, Rlen - 3, regOpPtr.p);
+ return;
+ } else if (regOpPtr.p->transstate == WAIT_STORED_PROCEDURE_ATTR_INFO) {
+ storedProcedureAttrInfo(signal, regOpPtr.p, Rlen - 3, 3, false);
+ return;
+ }//if
+ switch (regOpPtr.p->transstate) {
+ case ERROR_WAIT_STORED_PROCREQ:
+ jam();
+ case TOO_MUCH_AI:
+ jam();
+ case ERROR_WAIT_TUPKEYREQ:
+ jam();
+ return; /* IGNORE ATTRINFO IN THOSE STATES, WAITING FOR ABORT SIGNAL */
+ break;
+ case DISCONNECTED:
+ jam();
+ case STARTED:
+ jam();
+ default:
+ ndbrequire(false);
+ }//switch
+}//Dbtup::execATTRINFO()
+
+void Dbtup::execTUP_ALLOCREQ(Signal* signal)
+{
+ OperationrecPtr regOperPtr;
+ TablerecPtr regTabPtr;
+ FragrecordPtr regFragPtr;
+
+ jamEntry();
+
+ regOperPtr.i = signal->theData[0];
+ regFragPtr.i = signal->theData[1];
+ regTabPtr.i = signal->theData[2];
+
+ if (!((regOperPtr.i < cnoOfOprec) &&
+ (regFragPtr.i < cnoOfFragrec) &&
+ (regTabPtr.i < cnoOfTablerec))) {
+ ndbrequire(false);
+ }//if
+ ptrAss(regOperPtr, operationrec);
+ ptrAss(regFragPtr, fragrecord);
+ ptrAss(regTabPtr, tablerec);
+
+//---------------------------------------------------
+/* --- Allocate a tuple as requested by ACC --- */
+//---------------------------------------------------
+ PagePtr pagePtr;
+ Uint32 pageOffset;
+ if (!allocTh(regFragPtr.p,
+ regTabPtr.p,
+ NORMAL_PAGE,
+ signal,
+ pageOffset,
+ pagePtr)) {
+ signal->theData[0] = terrorCode; // Indicate failure
+ return;
+ }//if
+ Uint32 fragPageId = pagePtr.p->pageWord[ZPAGE_FRAG_PAGE_ID_POS];
+ Uint32 pageIndex = ((pageOffset - ZPAGE_HEADER_SIZE) /
+ regTabPtr.p->tupheadsize) << 1;
+ regOperPtr.p->tableRef = regTabPtr.i;
+ regOperPtr.p->fragId = regFragPtr.p->fragmentId;
+ regOperPtr.p->realPageId = pagePtr.i;
+ regOperPtr.p->fragPageId = fragPageId;
+ regOperPtr.p->pageOffset = pageOffset;
+ regOperPtr.p->pageIndex = pageIndex;
+ /* -------------------------------------------------------------- */
+ /* AN INSERT IS UNDONE BY FREEING THE DATA OCCUPIED BY THE INSERT */
+ /* THE ONLY DATA WE HAVE TO LOG EXCEPT THE TYPE, PAGE AND INDEX */
+ /* IS THE AMOUNT OF DATA TO FREE */
+ /* -------------------------------------------------------------- */
+ if (isUndoLoggingNeeded(regFragPtr.p, fragPageId)) {
+ jam();
+ cprAddUndoLogRecord(signal,
+ ZLCPR_TYPE_DELETE_TH,
+ fragPageId,
+ pageIndex,
+ regTabPtr.i,
+ regFragPtr.p->fragmentId,
+ regFragPtr.p->checkpointVersion);
+ }//if
+
+ //---------------------------------------------------------------
+ // Initialise Active operation list by setting the list to empty
+ //---------------------------------------------------------------
+ ndbrequire(pageOffset < ZWORDS_ON_PAGE);
+ pagePtr.p->pageWord[pageOffset] = RNIL;
+
+ signal->theData[0] = 0;
+ signal->theData[1] = fragPageId;
+ signal->theData[2] = pageIndex;
+}//Dbtup::execTUP_ALLOCREQ()
+
+void
+Dbtup::setChecksum(Page* const pagePtr, Uint32 tupHeadOffset, Uint32 tupHeadSize)
+{
+ // 2 == regTabPtr.p->tupChecksumIndex
+ pagePtr->pageWord[tupHeadOffset + 2] = 0;
+ Uint32 checksum = calculateChecksum(pagePtr, tupHeadOffset, tupHeadSize);
+ pagePtr->pageWord[tupHeadOffset + 2] = checksum;
+}//Dbtup::setChecksum()
+
+Uint32
+Dbtup::calculateChecksum(Page* pagePtr,
+ Uint32 tupHeadOffset,
+ Uint32 tupHeadSize)
+{
+ Uint32 checksum = 0;
+ Uint32 loopStop = tupHeadOffset + tupHeadSize;
+ ndbrequire(loopStop <= ZWORDS_ON_PAGE);
+ // includes tupVersion
+ for (Uint32 i = tupHeadOffset + 1; i < loopStop; i++) {
+ checksum ^= pagePtr->pageWord[i];
+ }//if
+ return checksum;
+}//Dbtup::calculateChecksum()
+
+/* ----------------------------------------------------------------- */
+/* ----------- INSERT_ACTIVE_OP_LIST -------------- */
+/* ----------------------------------------------------------------- */
+void Dbtup::insertActiveOpList(Signal* signal,
+ OperationrecPtr regOperPtr,
+ Page* const pagePtr,
+ Uint32 pageOffset)
+{
+ OperationrecPtr iaoPrevOpPtr;
+ ndbrequire(regOperPtr.p->inActiveOpList == ZFALSE);
+ regOperPtr.p->inActiveOpList = ZTRUE;
+ ndbrequire(pageOffset < ZWORDS_ON_PAGE);
+ iaoPrevOpPtr.i = pagePtr->pageWord[pageOffset];
+ pagePtr->pageWord[pageOffset] = regOperPtr.i;
+ regOperPtr.p->prevActiveOp = RNIL;
+ regOperPtr.p->nextActiveOp = iaoPrevOpPtr.i;
+ if (iaoPrevOpPtr.i == RNIL) {
+ return;
+ } else {
+ jam();
+ ptrCheckGuard(iaoPrevOpPtr, cnoOfOprec, operationrec);
+ iaoPrevOpPtr.p->prevActiveOp = regOperPtr.i;
+ if (iaoPrevOpPtr.p->optype == ZDELETE &&
+ regOperPtr.p->optype == ZINSERT) {
+ jam();
+ // mark both
+ iaoPrevOpPtr.p->deleteInsertFlag = 1;
+ regOperPtr.p->deleteInsertFlag = 1;
+ }
+ return;
+ }//if
+}//Dbtup::insertActiveOpList()
+
+void Dbtup::linkOpIntoFragList(OperationrecPtr regOperPtr,
+ Fragrecord* const regFragPtr)
+{
+ OperationrecPtr sopTmpOperPtr;
+ Uint32 tail = regFragPtr->lastusedOprec;
+ ndbrequire(regOperPtr.p->inFragList == ZFALSE);
+ regOperPtr.p->inFragList = ZTRUE;
+ regOperPtr.p->prevOprecInList = tail;
+ regOperPtr.p->nextOprecInList = RNIL;
+ sopTmpOperPtr.i = tail;
+ if (tail == RNIL) {
+ regFragPtr->firstusedOprec = regOperPtr.i;
+ } else {
+ jam();
+ ptrCheckGuard(sopTmpOperPtr, cnoOfOprec, operationrec);
+ sopTmpOperPtr.p->nextOprecInList = regOperPtr.i;
+ }//if
+ regFragPtr->lastusedOprec = regOperPtr.i;
+}//Dbtup::linkOpIntoFragList()
+
+/*
+This routine is optimised for use from TUPKEYREQ.
+This means that a lot of input data is stored in the operation record.
+The routine expects the following data in the operation record to be
+set-up properly.
+Transaction data
+1) transid1
+2) transid2
+3) savePointId
+
+Operation data
+4) optype
+5) dirtyOp
+
+Tuple address
+6) fragPageId
+7) pageIndex
+
+regFragPtr and regTabPtr are references to the table and fragment data and
+is read-only.
+
+The routine will set up the following data in the operation record if
+returned with success.
+
+Tuple address data
+1) realPageId
+2) fragPageId
+3) pageOffset
+4) pageIndex
+
+Also the pagePtr is an output variable if the routine returns with success.
+It's input value can be undefined.
+*/
+bool
+Dbtup::getPage(PagePtr& pagePtr,
+ Operationrec* const regOperPtr,
+ Fragrecord* const regFragPtr,
+ Tablerec* const regTabPtr)
+{
+/* ------------------------------------------------------------------------- */
+// GET THE REFERENCE TO THE TUPLE HEADER BY TRANSLATING THE FRAGMENT PAGE ID
+// INTO A REAL PAGE ID AND BY USING THE PAGE INDEX TO DERIVE THE PROPER INDEX
+// IN THE REAL PAGE.
+/* ------------------------------------------------------------------------- */
+ pagePtr.i = getRealpid(regFragPtr, regOperPtr->fragPageId);
+ regOperPtr->realPageId = pagePtr.i;
+ Uint32 RpageIndex = regOperPtr->pageIndex;
+ Uint32 Rtupheadsize = regTabPtr->tupheadsize;
+ ptrCheckGuard(pagePtr, cnoOfPage, page);
+ Uint32 RpageIndexScaled = RpageIndex >> 1;
+ ndbrequire((RpageIndex & 1) == 0);
+ regOperPtr->pageOffset = ZPAGE_HEADER_SIZE +
+ (Rtupheadsize * RpageIndexScaled);
+
+ OperationrecPtr leaderOpPtr;
+ ndbrequire(regOperPtr->pageOffset < ZWORDS_ON_PAGE);
+ leaderOpPtr.i = pagePtr.p->pageWord[regOperPtr->pageOffset];
+ if (leaderOpPtr.i == RNIL) {
+ return true;
+ }//if
+ ptrCheckGuard(leaderOpPtr, cnoOfOprec, operationrec);
+ bool dirtyRead = ((regOperPtr->optype == ZREAD) &&
+ (regOperPtr->dirtyOp == 1));
+ if (dirtyRead) {
+ bool sameTrans = ((regOperPtr->transid1 == leaderOpPtr.p->transid1) &&
+ (regOperPtr->transid2 == leaderOpPtr.p->transid2));
+ if (!sameTrans) {
+ if (!getPageLastCommitted(regOperPtr, leaderOpPtr.p)) {
+ return false;
+ }//if
+ pagePtr.i = regOperPtr->realPageId;
+ ptrCheckGuard(pagePtr, cnoOfPage, page);
+ return true;
+ }//if
+ }//if
+ if (regOperPtr->optype == ZREAD) {
+ /*
+ Read uses savepoint id's to find the correct tuple version.
+ */
+ if (getPageThroughSavePoint(regOperPtr, leaderOpPtr.p)) {
+ jam();
+ pagePtr.i = regOperPtr->realPageId;
+ ptrCheckGuard(pagePtr, cnoOfPage, page);
+ return true;
+ }
+ return false;
+ }
+//----------------------------------------------------------------------
+// Check that no other operation is already active on the tuple. Also
+// that abort or commit is not ongoing.
+//----------------------------------------------------------------------
+ if (leaderOpPtr.p->tupleState == NO_OTHER_OP) {
+ jam();
+ if ((leaderOpPtr.p->optype == ZDELETE) &&
+ (regOperPtr->optype != ZINSERT)) {
+ jam();
+ terrorCode = ZTUPLE_DELETED_ERROR;
+ return false;
+ }//if
+ return true;
+ } else if (leaderOpPtr.p->tupleState == ALREADY_ABORTED) {
+ jam();
+ terrorCode = ZMUST_BE_ABORTED_ERROR;
+ return false;
+ } else {
+ ndbrequire(false);
+ }//if
+ return true;
+}//Dbtup::getPage()
+
+bool
+Dbtup::getPageThroughSavePoint(Operationrec* regOperPtr,
+ Operationrec* leaderOpPtr)
+{
+ bool found = false;
+ OperationrecPtr loopOpPtr;
+ loopOpPtr.p = leaderOpPtr;
+ while(true) {
+ if (regOperPtr->savePointId > loopOpPtr.p->savePointId) {
+ jam();
+ found = true;
+ break;
+ }
+ if (loopOpPtr.p->nextActiveOp == RNIL) {
+ break;
+ }
+ loopOpPtr.i = loopOpPtr.p->nextActiveOp;
+ ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
+ jam();
+ }
+ if (!found) {
+ return getPageLastCommitted(regOperPtr, loopOpPtr.p);
+ } else {
+ if (loopOpPtr.p->optype == ZDELETE) {
+ jam();
+ terrorCode = ZTUPLE_DELETED_ERROR;
+ return false;
+ }
+ if (loopOpPtr.p->tupleState == ALREADY_ABORTED) {
+ /*
+ Requested tuple version has already been aborted
+ */
+ jam();
+ terrorCode = ZMUST_BE_ABORTED_ERROR;
+ return false;
+ }
+ bool use_copy;
+ if (loopOpPtr.p->prevActiveOp == RNIL) {
+ jam();
+ /*
+ Use original tuple since we are reading from the last written tuple.
+ We are the
+ */
+ use_copy = false;
+ } else {
+ /*
+ Go forward in time to find a copy of the tuple which this operation
+ produced
+ */
+ loopOpPtr.i = loopOpPtr.p->prevActiveOp;
+ ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
+ if (loopOpPtr.p->optype == ZDELETE) {
+ /*
+ This operation was a Delete and thus have no copy tuple attached to
+ it. We will move forward to the next that either doesn't exist in
+ which case we will return the original tuple of any operation and
+ otherwise it must be an insert which contains a copy record.
+ */
+ if (loopOpPtr.p->prevActiveOp == RNIL) {
+ jam();
+ use_copy = false;
+ } else {
+ jam();
+ loopOpPtr.i = loopOpPtr.p->prevActiveOp;
+ ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
+ ndbrequire(loopOpPtr.p->optype == ZINSERT);
+ use_copy = true;
+ }
+ } else if (loopOpPtr.p->optype == ZUPDATE) {
+ jam();
+ /*
+ This operation which was the next in time have a copy which was the
+ result of the previous operation which we want to use. Thus use
+ the copy tuple of this operation.
+ */
+ use_copy = true;
+ } else {
+ /*
+ This operation was an insert that happened after an insert or update.
+ This is not a possible case.
+ */
+ ndbrequire(false);
+ return false;
+ }
+ }
+ if (use_copy) {
+ regOperPtr->realPageId = loopOpPtr.p->realPageIdC;
+ regOperPtr->fragPageId = loopOpPtr.p->fragPageIdC;
+ regOperPtr->pageIndex = loopOpPtr.p->pageIndexC;
+ regOperPtr->pageOffset = loopOpPtr.p->pageOffsetC;
+ } else {
+ regOperPtr->realPageId = loopOpPtr.p->realPageId;
+ regOperPtr->fragPageId = loopOpPtr.p->fragPageId;
+ regOperPtr->pageIndex = loopOpPtr.p->pageIndex;
+ regOperPtr->pageOffset = loopOpPtr.p->pageOffset;
+ }
+ return true;
+ }
+}
+
+bool
+Dbtup::getPageLastCommitted(Operationrec* const regOperPtr,
+ Operationrec* const leaderOpPtr)
+{
+//----------------------------------------------------------------------
+// Dirty reads wants to read the latest committed tuple. The latest
+// tuple value could be not existing or else we have to find the copy
+// tuple. Start by finding the end of the list to find the first operation
+// on the record in the ongoing transaction.
+//----------------------------------------------------------------------
+ jam();
+ OperationrecPtr loopOpPtr;
+ loopOpPtr.p = leaderOpPtr;
+ while (loopOpPtr.p->nextActiveOp != RNIL) {
+ jam();
+ loopOpPtr.i = loopOpPtr.p->nextActiveOp;
+ ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
+ }//while
+ if (loopOpPtr.p->optype == ZINSERT) {
+ jam();
+//----------------------------------------------------------------------
+// With an insert in the start of the list we know that the tuple did not
+// exist before this transaction was started. We don't care if the current
+// transaction is in the commit phase since the commit is not really
+// completed until the operation is gone from TUP.
+//----------------------------------------------------------------------
+ terrorCode = ZTUPLE_DELETED_ERROR;
+ return false;
+ } else {
+//----------------------------------------------------------------------
+// A successful update and delete as first in the queue means that a tuple
+// exist in the committed world. We need to find it.
+//----------------------------------------------------------------------
+ if (loopOpPtr.p->optype == ZUPDATE) {
+ jam();
+//----------------------------------------------------------------------
+// The first operation was a delete we set our tuple reference to the
+// copy tuple of this operation.
+//----------------------------------------------------------------------
+ regOperPtr->realPageId = loopOpPtr.p->realPageIdC;
+ regOperPtr->fragPageId = loopOpPtr.p->fragPageIdC;
+ regOperPtr->pageIndex = loopOpPtr.p->pageIndexC;
+ regOperPtr->pageOffset = loopOpPtr.p->pageOffsetC;
+ } else if ((loopOpPtr.p->optype == ZDELETE) &&
+ (loopOpPtr.p->prevActiveOp == RNIL)) {
+ jam();
+//----------------------------------------------------------------------
+// There was only a delete. The original tuple still is ok.
+//----------------------------------------------------------------------
+ } else {
+ jam();
+//----------------------------------------------------------------------
+// There was another operation after the delete, this must be an insert
+// and we have found our copy tuple there.
+//----------------------------------------------------------------------
+ loopOpPtr.i = loopOpPtr.p->prevActiveOp;
+ ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
+ ndbrequire(loopOpPtr.p->optype == ZINSERT);
+ regOperPtr->realPageId = loopOpPtr.p->realPageIdC;
+ regOperPtr->fragPageId = loopOpPtr.p->fragPageIdC;
+ regOperPtr->pageIndex = loopOpPtr.p->pageIndexC;
+ regOperPtr->pageOffset = loopOpPtr.p->pageOffsetC;
+ }//if
+ }//if
+ return true;
+}//Dbtup::getPageLastCommitted()
+
+void Dbtup::execTUPKEYREQ(Signal* signal)
+{
+ TupKeyReq * const tupKeyReq = (TupKeyReq *)signal->getDataPtr();
+ Uint32 RoperPtr = tupKeyReq->connectPtr;
+ Uint32 Rtabptr = tupKeyReq->tableRef;
+ Uint32 RfragId = tupKeyReq->fragId;
+ Uint32 Rstoredid = tupKeyReq->storedProcedure;
+ Uint32 Rfragptr = tupKeyReq->fragPtr;
+
+ Uint32 RnoOfOprec = cnoOfOprec;
+ Uint32 RnoOfTablerec = cnoOfTablerec;
+ Uint32 RnoOfFragrec = cnoOfFragrec;
+
+ operPtr.i = RoperPtr;
+ fragptr.i = Rfragptr;
+ tabptr.i = Rtabptr;
+ jamEntry();
+
+ ndbrequire(((RoperPtr < RnoOfOprec) &&
+ (Rtabptr < RnoOfTablerec) &&
+ (Rfragptr < RnoOfFragrec)));
+ ptrAss(operPtr, operationrec);
+ Operationrec * const regOperPtr = operPtr.p;
+ ptrAss(fragptr, fragrecord);
+ Fragrecord * const regFragPtr = fragptr.p;
+ ptrAss(tabptr, tablerec);
+ Tablerec* const regTabPtr = tabptr.p;
+
+ Uint32 TrequestInfo = tupKeyReq->request;
+
+ if (regOperPtr->transstate != IDLE) {
+ TUPKEY_abort(signal, 39);
+ return;
+ }//if
+/* ----------------------------------------------------------------- */
+// Operation is ZREAD when we arrive here so no need to worry about the
+// abort process.
+/* ----------------------------------------------------------------- */
+/* ----------- INITIATE THE OPERATION RECORD -------------- */
+/* ----------------------------------------------------------------- */
+ regOperPtr->fragmentPtr = Rfragptr;
+ regOperPtr->dirtyOp = TrequestInfo & 1;
+ regOperPtr->opSimple = (TrequestInfo >> 1) & 1;
+ regOperPtr->interpretedExec = (TrequestInfo >> 10) & 1;
+ regOperPtr->optype = (TrequestInfo >> 6) & 0xf;
+
+ // Attributes needed by trigger execution
+ regOperPtr->noFiredTriggers = 0;
+ regOperPtr->tableRef = Rtabptr;
+ regOperPtr->tcOperationPtr = tupKeyReq->opRef;
+ regOperPtr->primaryReplica = tupKeyReq->primaryReplica;
+ regOperPtr->coordinatorTC = tupKeyReq->coordinatorTC;
+ regOperPtr->tcOpIndex = tupKeyReq->tcOpIndex;
+ regOperPtr->savePointId = tupKeyReq->savePointId;
+
+ regOperPtr->fragId = RfragId;
+
+ regOperPtr->fragPageId = tupKeyReq->keyRef1;
+ regOperPtr->pageIndex = tupKeyReq->keyRef2;
+ regOperPtr->attrinbufLen = regOperPtr->logSize = tupKeyReq->attrBufLen;
+ regOperPtr->recBlockref = tupKeyReq->applRef;
+
+// Schema Version in tupKeyReq->schemaVersion not used in this version
+ regOperPtr->storedProcedureId = Rstoredid;
+ regOperPtr->transid1 = tupKeyReq->transId1;
+ regOperPtr->transid2 = tupKeyReq->transId2;
+
+ regOperPtr->attroutbufLen = 0;
+/* ----------------------------------------------------------------------- */
+// INITIALISE TO DEFAULT VALUE
+// INIT THE COPY REFERENCE RECORDS TO RNIL TO ENSURE THAT THEIR VALUES
+// ARE VALID IF THEY EXISTS
+// NO PENDING CHECKPOINT WHEN COPY CREATED (DEFAULT)
+// NO TUPLE HAS BEEN ALLOCATED YET
+// NO COPY HAS BEEN CREATED YET
+/* ----------------------------------------------------------------------- */
+ regOperPtr->undoLogged = false;
+ regOperPtr->realPageId = RNIL;
+ regOperPtr->realPageIdC = RNIL;
+ regOperPtr->fragPageIdC = RNIL;
+
+ regOperPtr->pageOffset = ZNIL;
+ regOperPtr->pageOffsetC = ZNIL;
+
+ regOperPtr->pageIndexC = ZNIL;
+
+ // version not yet known
+ regOperPtr->tupVersion = ZNIL;
+ regOperPtr->deleteInsertFlag = 0;
+
+ regOperPtr->tupleState = TUPLE_BLOCKED;
+ regOperPtr->changeMask.clear();
+
+ if (Rstoredid != ZNIL) {
+ ndbrequire(initStoredOperationrec(regOperPtr, Rstoredid) == ZOK);
+ }//if
+ copyAttrinfo(signal, regOperPtr, &cinBuffer[0]);
+
+ PagePtr pagePtr;
+ if (!getPage(pagePtr, regOperPtr, regFragPtr, regTabPtr)) {
+ tupkeyErrorLab(signal);
+ return;
+ }//if
+
+ Uint32 Roptype = regOperPtr->optype;
+ if (Roptype == ZREAD) {
+ jam();
+ if (handleReadReq(signal, regOperPtr, regTabPtr, pagePtr.p) != -1) {
+ sendTUPKEYCONF(signal, regOperPtr, 0);
+/* ------------------------------------------------------------------------- */
+// Read Operations need not to be taken out of any lists. We also do not
+// need to wait for commit since there is no changes to commit. Thus we
+// prepare the operation record already now for the next operation.
+// Write operations have set the state to STARTED above indicating that
+// they are waiting for the Commit or Abort decision.
+/* ------------------------------------------------------------------------- */
+ regOperPtr->transstate = IDLE;
+ regOperPtr->currentAttrinbufLen = 0;
+ }//if
+ return;
+ }//if
+ linkOpIntoFragList(operPtr, regFragPtr);
+ insertActiveOpList(signal,
+ operPtr,
+ pagePtr.p,
+ regOperPtr->pageOffset);
+ if (isUndoLoggingBlocked(regFragPtr)) {
+ TUPKEY_abort(signal, 38);
+ return;
+ }//if
+/* ---------------------------------------------------------------------- */
+// WE SET THE CURRENT ACTIVE OPERATION IN THE TUPLE TO POINT TO OUR
+//OPERATION RECORD. IF SEVERAL OPERATIONS WORK ON THIS TUPLE THEY ARE
+// LINKED TO OUR OPERATION RECORD. DIRTY READS CAN ACCESS THE COPY
+// TUPLE THROUGH OUR OPERATION RECORD.
+/* ---------------------------------------------------------------------- */
+ if (Roptype == ZINSERT) {
+ jam();
+ if (handleInsertReq(signal, regOperPtr,
+ regFragPtr, regTabPtr, pagePtr.p) == -1) {
+ return;
+ }//if
+ if (!regTabPtr->tuxCustomTriggers.isEmpty()) {
+ jam();
+ if (executeTuxInsertTriggers(signal, regOperPtr, regTabPtr) != 0) {
+ jam();
+ tupkeyErrorLab(signal);
+ return;
+ }
+ }
+ checkImmediateTriggersAfterInsert(signal,
+ regOperPtr,
+ regTabPtr);
+ sendTUPKEYCONF(signal, regOperPtr, regOperPtr->logSize);
+ return;
+ }//if
+ if (regTabPtr->checksumIndicator &&
+ (calculateChecksum(pagePtr.p,
+ regOperPtr->pageOffset,
+ regTabPtr->tupheadsize) != 0)) {
+ jam();
+ terrorCode = ZTUPLE_CORRUPTED_ERROR;
+ tupkeyErrorLab(signal);
+ return;
+ }//if
+ if (Roptype == ZUPDATE) {
+ jam();
+ if (handleUpdateReq(signal, regOperPtr,
+ regFragPtr, regTabPtr, pagePtr.p) == -1) {
+ return;
+ }//if
+ // If update operation is done on primary,
+ // check any after op triggers
+ terrorCode = 0;
+ if (!regTabPtr->tuxCustomTriggers.isEmpty()) {
+ jam();
+ if (executeTuxUpdateTriggers(signal, regOperPtr, regTabPtr) != 0) {
+ jam();
+ tupkeyErrorLab(signal);
+ return;
+ }
+ }
+ checkImmediateTriggersAfterUpdate(signal,
+ regOperPtr,
+ regTabPtr);
+ // XXX use terrorCode for now since all methods are void
+ if (terrorCode != 0) {
+ tupkeyErrorLab(signal);
+ return;
+ }
+ sendTUPKEYCONF(signal, regOperPtr, regOperPtr->logSize);
+ return;
+ } else if (Roptype == ZDELETE) {
+ jam();
+ if (handleDeleteReq(signal, regOperPtr,
+ regFragPtr, regTabPtr, pagePtr.p) == -1) {
+ return;
+ }//if
+ // If delete operation is done on primary,
+ // check any after op triggers
+ if (!regTabPtr->tuxCustomTriggers.isEmpty()) {
+ jam();
+ if (executeTuxDeleteTriggers(signal, regOperPtr, regTabPtr) != 0) {
+ jam();
+ tupkeyErrorLab(signal);
+ return;
+ }
+ }
+ checkImmediateTriggersAfterDelete(signal,
+ regOperPtr,
+ regTabPtr);
+ sendTUPKEYCONF(signal, regOperPtr, 0);
+ return;
+ } else {
+ ndbrequire(false);
+ }//if
+}//Dbtup::execTUPKEYREQ()
+
+/* ---------------------------------------------------------------- */
+/* ------------------------ CONFIRM REQUEST ----------------------- */
+/* ---------------------------------------------------------------- */
+void Dbtup::sendTUPKEYCONF(Signal* signal,
+ Operationrec * const regOperPtr,
+ Uint32 TlogSize)
+{
+ TupKeyConf * const tupKeyConf = (TupKeyConf *)signal->getDataPtrSend();
+
+ Uint32 RuserPointer = regOperPtr->userpointer;
+ Uint32 RattroutbufLen = regOperPtr->attroutbufLen;
+ Uint32 RnoFiredTriggers = regOperPtr->noFiredTriggers;
+ BlockReference Ruserblockref = regOperPtr->userblockref;
+ Uint32 lastRow = regOperPtr->lastRow;
+
+ regOperPtr->transstate = STARTED;
+ regOperPtr->tupleState = NO_OTHER_OP;
+ tupKeyConf->userPtr = RuserPointer;
+ tupKeyConf->readLength = RattroutbufLen;
+ tupKeyConf->writeLength = TlogSize;
+ tupKeyConf->noFiredTriggers = RnoFiredTriggers;
+ tupKeyConf->lastRow = lastRow;
+
+ EXECUTE_DIRECT(refToBlock(Ruserblockref), GSN_TUPKEYCONF, signal,
+ TupKeyConf::SignalLength);
+ return;
+}//Dbtup::sendTUPKEYCONF()
+
+#define MAX_READ (sizeof(signal->theData) > MAX_MESSAGE_SIZE ? MAX_MESSAGE_SIZE : sizeof(signal->theData))
+
+/* ---------------------------------------------------------------- */
+/* ----------------------------- READ ---------------------------- */
+/* ---------------------------------------------------------------- */
+int Dbtup::handleReadReq(Signal* signal,
+ Operationrec* const regOperPtr,
+ Tablerec* const regTabPtr,
+ Page* pagePtr)
+{
+ Uint32 Ttupheadoffset = regOperPtr->pageOffset;
+ const BlockReference sendBref = regOperPtr->recBlockref;
+ if (regTabPtr->checksumIndicator &&
+ (calculateChecksum(pagePtr, Ttupheadoffset,
+ regTabPtr->tupheadsize) != 0)) {
+ jam();
+ terrorCode = ZTUPLE_CORRUPTED_ERROR;
+ tupkeyErrorLab(signal);
+ return -1;
+ }//if
+
+ Uint32 * dst = &signal->theData[25];
+ Uint32 dstLen = (MAX_READ / 4) - 25;
+ const Uint32 node = refToNode(sendBref);
+ if(node != 0 && node != getOwnNodeId()) {
+ ;
+ } else {
+ jam();
+ /**
+ * execute direct
+ */
+ dst = &signal->theData[3];
+ dstLen = (MAX_READ / 4) - 3;
+ }
+
+ if (regOperPtr->interpretedExec != 1) {
+ jam();
+ int ret = readAttributes(pagePtr,
+ Ttupheadoffset,
+ &cinBuffer[0],
+ regOperPtr->attrinbufLen,
+ dst,
+ dstLen,
+ false);
+ if (ret != -1) {
+/* ------------------------------------------------------------------------- */
+// We have read all data into coutBuffer. Now send it to the API.
+/* ------------------------------------------------------------------------- */
+ jam();
+ Uint32 TnoOfDataRead= (Uint32) ret;
+ regOperPtr->attroutbufLen = TnoOfDataRead;
+ sendReadAttrinfo(signal, TnoOfDataRead, regOperPtr);
+ return 0;
+ }//if
+ jam();
+ tupkeyErrorLab(signal);
+ return -1;
+ } else {
+ jam();
+ regOperPtr->lastRow = 0;
+ if (interpreterStartLab(signal, pagePtr, Ttupheadoffset) != -1) {
+ return 0;
+ }//if
+ return -1;
+ }//if
+}//Dbtup::handleReadReq()
+
+/* ---------------------------------------------------------------- */
+/* ---------------------------- UPDATE ---------------------------- */
+/* ---------------------------------------------------------------- */
+int Dbtup::handleUpdateReq(Signal* signal,
+ Operationrec* const regOperPtr,
+ Fragrecord* const regFragPtr,
+ Tablerec* const regTabPtr,
+ Page* const pagePtr)
+{
+ PagePtr copyPagePtr;
+ Uint32 tuple_size = regTabPtr->tupheadsize;
+
+//---------------------------------------------------
+/* --- MAKE A COPY OF THIS TUPLE ON A COPY PAGE --- */
+//---------------------------------------------------
+ Uint32 RpageOffsetC;
+ if (!allocTh(regFragPtr,
+ regTabPtr,
+ COPY_PAGE,
+ signal,
+ RpageOffsetC,
+ copyPagePtr)) {
+ TUPKEY_abort(signal, 1);
+ return -1;
+ }//if
+ Uint32 RpageIdC = copyPagePtr.i;
+ Uint32 RfragPageIdC = copyPagePtr.p->pageWord[ZPAGE_FRAG_PAGE_ID_POS];
+ Uint32 indexC = ((RpageOffsetC - ZPAGE_HEADER_SIZE) / tuple_size) << 1;
+ regOperPtr->pageIndexC = indexC;
+ regOperPtr->fragPageIdC = RfragPageIdC;
+ regOperPtr->realPageIdC = RpageIdC;
+ regOperPtr->pageOffsetC = RpageOffsetC;
+ /* -------------------------------------------------------------- */
+ /* IF WE HAVE AN ONGING CHECKPOINT WE HAVE TO LOG THE ALLOCATION */
+ /* OF THE TUPLE HEADER TO BE ABLE TO DELETE IT UPON RESTART */
+ /* THE ONLY DATA EXCEPT THE TYPE, PAGE, INDEX IS THE SIZE TO FREE */
+ /* -------------------------------------------------------------- */
+ if (isUndoLoggingActive(regFragPtr)) {
+ if (isPageUndoLogged(regFragPtr, RfragPageIdC)) {
+ jam();
+ regOperPtr->undoLogged = true;
+ cprAddUndoLogRecord(signal,
+ ZLCPR_TYPE_DELETE_TH,
+ RfragPageIdC,
+ indexC,
+ regOperPtr->tableRef,
+ regOperPtr->fragId,
+ regFragPtr->checkpointVersion);
+ }//if
+ if (isPageUndoLogged(regFragPtr, regOperPtr->fragPageId)) {
+ jam();
+ cprAddUndoLogRecord(signal,
+ ZLCPR_TYPE_UPDATE_TH,
+ regOperPtr->fragPageId,
+ regOperPtr->pageIndex,
+ regOperPtr->tableRef,
+ regOperPtr->fragId,
+ regFragPtr->checkpointVersion);
+ cprAddData(signal,
+ regFragPtr,
+ regOperPtr->realPageId,
+ tuple_size,
+ regOperPtr->pageOffset);
+ }//if
+ }//if
+ Uint32 RwordCount = tuple_size - 1;
+ Uint32 end_dest = RpageOffsetC + tuple_size;
+ Uint32 offset = regOperPtr->pageOffset;
+ Uint32 end_source = offset + tuple_size;
+ ndbrequire(end_dest <= ZWORDS_ON_PAGE && end_source <= ZWORDS_ON_PAGE);
+ void* Tdestination = (void*)&copyPagePtr.p->pageWord[RpageOffsetC + 1];
+ const void* Tsource = (void*)&pagePtr->pageWord[offset + 1];
+ MEMCOPY_NO_WORDS(Tdestination, Tsource, RwordCount);
+
+ Uint32 prev_tup_version;
+ // nextActiveOp is before this op in event order
+ if (regOperPtr->nextActiveOp == RNIL) {
+ jam();
+ prev_tup_version = ((const Uint32*)Tsource)[0];
+ } else {
+ OperationrecPtr prevOperPtr;
+ jam();
+ prevOperPtr.i = regOperPtr->nextActiveOp;
+ ptrCheckGuard(prevOperPtr, cnoOfOprec, operationrec);
+ prev_tup_version = prevOperPtr.p->tupVersion;
+ }//if
+ regOperPtr->tupVersion = (prev_tup_version + 1) &
+ ((1 << ZTUP_VERSION_BITS) - 1);
+ // global variable alert
+ ndbassert(operationrec + operPtr.i == regOperPtr);
+ copyPagePtr.p->pageWord[RpageOffsetC] = operPtr.i;
+
+ return updateStartLab(signal, regOperPtr, regTabPtr, pagePtr);
+}//Dbtup::handleUpdateReq()
+
+/* ---------------------------------------------------------------- */
+/* ----------------------------- INSERT --------------------------- */
+/* ---------------------------------------------------------------- */
+int Dbtup::handleInsertReq(Signal* signal,
+ Operationrec* const regOperPtr,
+ Fragrecord* const regFragPtr,
+ Tablerec* const regTabPtr,
+ Page* const pagePtr)
+{
+ Uint32 ret_value;
+
+ if (regOperPtr->nextActiveOp != RNIL) {
+ jam();
+ OperationrecPtr prevExecOpPtr;
+ prevExecOpPtr.i = regOperPtr->nextActiveOp;
+ ptrCheckGuard(prevExecOpPtr, cnoOfOprec, operationrec);
+ if (prevExecOpPtr.p->optype != ZDELETE) {
+ terrorCode = ZINSERT_ERROR;
+ tupkeyErrorLab(signal);
+ return -1;
+ }//if
+ ret_value = handleUpdateReq(signal, regOperPtr,
+ regFragPtr, regTabPtr, pagePtr);
+ } else {
+ jam();
+ regOperPtr->tupVersion = 0;
+ ret_value = updateStartLab(signal, regOperPtr, regTabPtr, pagePtr);
+ }//if
+ if (ret_value != (Uint32)-1) {
+ if (checkNullAttributes(regOperPtr, regTabPtr)) {
+ jam();
+ return 0;
+ }//if
+ TUPKEY_abort(signal, 17);
+ }//if
+ return -1;
+}//Dbtup::handleInsertReq()
+
+/* ---------------------------------------------------------------- */
+/* ---------------------------- DELETE ---------------------------- */
+/* ---------------------------------------------------------------- */
+int Dbtup::handleDeleteReq(Signal* signal,
+ Operationrec* const regOperPtr,
+ Fragrecord* const regFragPtr,
+ Tablerec* const regTabPtr,
+ Page* const pagePtr)
+{
+ // delete must set but not increment tupVersion
+ if (regOperPtr->nextActiveOp != RNIL) {
+ OperationrecPtr prevExecOpPtr;
+ prevExecOpPtr.i = regOperPtr->nextActiveOp;
+ ptrCheckGuard(prevExecOpPtr, cnoOfOprec, operationrec);
+ regOperPtr->tupVersion = prevExecOpPtr.p->tupVersion;
+ } else {
+ jam();
+ regOperPtr->tupVersion = pagePtr->pageWord[regOperPtr->pageOffset + 1];
+ }
+ if (isUndoLoggingNeeded(regFragPtr, regOperPtr->fragPageId)) {
+ jam();
+ cprAddUndoLogRecord(signal,
+ ZINDICATE_NO_OP_ACTIVE,
+ regOperPtr->fragPageId,
+ regOperPtr->pageIndex,
+ regOperPtr->tableRef,
+ regOperPtr->fragId,
+ regFragPtr->checkpointVersion);
+ }//if
+ if (regOperPtr->attrinbufLen == 0) {
+ return 0;
+ }//if
+/* ------------------------------------------------------------------------ */
+/* THE APPLICATION WANTS TO READ THE TUPLE BEFORE IT IS DELETED. */
+/* ------------------------------------------------------------------------ */
+ return handleReadReq(signal, regOperPtr, regTabPtr, pagePtr);
+}//Dbtup::handleDeleteReq()
+
+int
+Dbtup::updateStartLab(Signal* signal,
+ Operationrec* const regOperPtr,
+ Tablerec* const regTabPtr,
+ Page* const pagePtr)
+{
+ int retValue;
+ if (regOperPtr->optype == ZINSERT) {
+ jam();
+ setNullBits(pagePtr, regTabPtr, regOperPtr->pageOffset);
+ }
+ if (regOperPtr->interpretedExec != 1) {
+ jam();
+ retValue = updateAttributes(pagePtr,
+ regOperPtr->pageOffset,
+ &cinBuffer[0],
+ regOperPtr->attrinbufLen);
+ if (retValue == -1) {
+ tupkeyErrorLab(signal);
+ return -1;
+ }//if
+ } else {
+ jam();
+ retValue = interpreterStartLab(signal, pagePtr, regOperPtr->pageOffset);
+ }//if
+ ndbrequire(regOperPtr->tupVersion != ZNIL);
+ pagePtr->pageWord[regOperPtr->pageOffset + 1] = regOperPtr->tupVersion;
+ if (regTabPtr->checksumIndicator) {
+ jam();
+ setChecksum(pagePtr, regOperPtr->pageOffset, regTabPtr->tupheadsize);
+ }//if
+ return retValue;
+}//Dbtup::updateStartLab()
+
+void
+Dbtup::setNullBits(Page* const regPage, Tablerec* const regTabPtr, Uint32 pageOffset)
+{
+ Uint32 noOfExtraNullWords = regTabPtr->tupNullWords;
+ Uint32 nullOffsetStart = regTabPtr->tupNullIndex + pageOffset;
+ ndbrequire((noOfExtraNullWords + nullOffsetStart) < ZWORDS_ON_PAGE);
+ for (Uint32 i = 0; i < noOfExtraNullWords; i++) {
+ regPage->pageWord[nullOffsetStart + i] = 0xFFFFFFFF;
+ }//for
+}//Dbtup::setNullBits()
+
+bool
+Dbtup::checkNullAttributes(Operationrec* const regOperPtr,
+ Tablerec* const regTabPtr)
+{
+// Implement checking of updating all not null attributes in an insert here.
+ Bitmask<MAXNROFATTRIBUTESINWORDS> attributeMask;
+ /*
+ * The idea here is maybe that changeMask is not-null attributes
+ * and must contain notNullAttributeMask. But:
+ *
+ * 1. changeMask has all bits set on insert
+ * 2. not-null is checked in each UpdateFunction
+ * 3. the code below does not work except trivially due to 1.
+ *
+ * XXX remove or fix
+ */
+ attributeMask.clear();
+ attributeMask.bitOR(regOperPtr->changeMask);
+ attributeMask.bitAND(regTabPtr->notNullAttributeMask);
+ attributeMask.bitXOR(regTabPtr->notNullAttributeMask);
+ if (!attributeMask.isclear()) {
+ return false;
+ }//if
+ return true;
+}//Dbtup::checkNullAttributes()
+
+/* ---------------------------------------------------------------- */
+/* THIS IS THE START OF THE INTERPRETED EXECUTION OF UPDATES. WE */
+/* START BY LINKING ALL ATTRINFO'S IN A DOUBLY LINKED LIST (THEY ARE*/
+/* ALREADY IN A LINKED LIST). WE ALLOCATE A REGISTER MEMORY (EQUAL */
+/* TO AN ATTRINFO RECORD). THE INTERPRETER GOES THROUGH FOUR PHASES*/
+/* DURING THE FIRST PHASE IT IS ONLY ALLOWED TO READ ATTRIBUTES THAT*/
+/* ARE SENT TO THE CLIENT APPLICATION. DURING THE SECOND PHASE IT IS*/
+/* ALLOWED TO READ FROM ATTRIBUTES INTO REGISTERS, TO UPDATE */
+/* ATTRIBUTES BASED ON EITHER A CONSTANT VALUE OR A REGISTER VALUE, */
+/* A DIVERSE SET OF OPERATIONS ON REGISTERS ARE AVAILABLE AS WELL. */
+/* IT IS ALSO POSSIBLE TO PERFORM JUMPS WITHIN THE INSTRUCTIONS THAT*/
+/* BELONGS TO THE SECOND PHASE. ALSO SUBROUTINES CAN BE CALLED IN */
+/* THIS PHASE. THE THIRD PHASE IS TO AGAIN READ ATTRIBUTES AND */
+/* FINALLY THE FOURTH PHASE READS SELECTED REGISTERS AND SEND THEM */
+/* TO THE CLIENT APPLICATION. */
+/* THERE IS A FIFTH REGION WHICH CONTAINS SUBROUTINES CALLABLE FROM */
+/* THE INTERPRETER EXECUTION REGION. */
+/* THE FIRST FIVE WORDS WILL GIVE THE LENGTH OF THE FIVEE REGIONS */
+/* */
+/* THIS MEANS THAT FROM THE APPLICATIONS POINT OF VIEW THE DATABASE */
+/* CAN HANDLE SUBROUTINE CALLS WHERE THE CODE IS SENT IN THE REQUEST*/
+/* THE RETURN PARAMETERS ARE FIXED AND CAN EITHER BE GENERATED */
+/* BEFORE THE EXECUTION OF THE ROUTINE OR AFTER. */
+/* */
+/* IN LATER VERSIONS WE WILL ADD MORE THINGS LIKE THE POSSIBILITY */
+/* TO ALLOCATE MEMORY AND USE THIS AS LOCAL STORAGE. IT IS ALSO */
+/* IMAGINABLE TO HAVE SPECIAL ROUTINES THAT CAN PERFORM CERTAIN */
+/* OPERATIONS ON BLOB'S DEPENDENT ON WHAT THE BLOB REPRESENTS. */
+/* */
+/* */
+/* ----------------------------------------- */
+/* + INITIAL READ REGION + */
+/* ----------------------------------------- */
+/* + INTERPRETED EXECUTE REGION + */
+/* ----------------------------------------- */
+/* + FINAL UPDATE REGION + */
+/* ----------------------------------------- */
+/* + FINAL READ REGION + */
+/* ----------------------------------------- */
+/* + SUBROUTINE REGION + */
+/* ----------------------------------------- */
+/* ---------------------------------------------------------------- */
+/* ---------------------------------------------------------------- */
+/* ----------------- INTERPRETED EXECUTION ----------------------- */
+/* ---------------------------------------------------------------- */
+int Dbtup::interpreterStartLab(Signal* signal,
+ Page* const pagePtr,
+ Uint32 TupHeadOffset)
+{
+ Operationrec * const regOperPtr = operPtr.p;
+ Uint32 RtotalLen;
+ int TnoDataRW;
+
+ Uint32 RinitReadLen = cinBuffer[0];
+ Uint32 RexecRegionLen = cinBuffer[1];
+ Uint32 RfinalUpdateLen = cinBuffer[2];
+ Uint32 RfinalRLen = cinBuffer[3];
+ Uint32 RsubLen = cinBuffer[4];
+
+ Uint32 RattrinbufLen = regOperPtr->attrinbufLen;
+ const BlockReference sendBref = regOperPtr->recBlockref;
+
+ Uint32 * dst = &signal->theData[25];
+ Uint32 dstLen = (MAX_READ / 4) - 25;
+ const Uint32 node = refToNode(sendBref);
+ if(node != 0 && node != getOwnNodeId()) {
+ ;
+ } else {
+ jam();
+ /**
+ * execute direct
+ */
+ dst = &signal->theData[3];
+ dstLen = (MAX_READ / 4) - 3;
+ }
+
+ RtotalLen = RinitReadLen;
+ RtotalLen += RexecRegionLen;
+ RtotalLen += RfinalUpdateLen;
+ RtotalLen += RfinalRLen;
+ RtotalLen += RsubLen;
+
+ Uint32 RattroutCounter = 0;
+ Uint32 RinstructionCounter = 5;
+ Uint32 RlogSize = 0;
+
+ if (((RtotalLen + 5) == RattrinbufLen) &&
+ (RattrinbufLen >= 5) &&
+ (RattrinbufLen < ZATTR_BUFFER_SIZE)) {
+ /* ---------------------------------------------------------------- */
+ // We start by checking consistency. We must have the first five
+ // words of the ATTRINFO to give us the length of the regions. The
+ // size of these regions must be the same as the total ATTRINFO
+ // length and finally the total length must be within the limits.
+ /* ---------------------------------------------------------------- */
+
+ if (RinitReadLen > 0) {
+ jam();
+ /* ---------------------------------------------------------------- */
+ // The first step that can be taken in the interpreter is to read
+ // data of the tuple before any updates have been applied.
+ /* ---------------------------------------------------------------- */
+ TnoDataRW = readAttributes(pagePtr,
+ TupHeadOffset,
+ &cinBuffer[5],
+ RinitReadLen,
+ &dst[0],
+ dstLen,
+ false);
+ if (TnoDataRW != -1) {
+ RattroutCounter = TnoDataRW;
+ RinstructionCounter += RinitReadLen;
+ } else {
+ jam();
+ tupkeyErrorLab(signal);
+ return -1;
+ }//if
+ }//if
+ if (RexecRegionLen > 0) {
+ jam();
+ /* ---------------------------------------------------------------- */
+ // The next step is the actual interpreted execution. This executes
+ // a register-based virtual machine which can read and write attributes
+ // to and from registers.
+ /* ---------------------------------------------------------------- */
+ Uint32 RsubPC = RinstructionCounter + RfinalUpdateLen + RfinalRLen;
+ TnoDataRW = interpreterNextLab(signal,
+ pagePtr,
+ TupHeadOffset,
+ &clogMemBuffer[0],
+ &cinBuffer[RinstructionCounter],
+ RexecRegionLen,
+ &cinBuffer[RsubPC],
+ RsubLen,
+ &coutBuffer[0],
+ sizeof(coutBuffer) / 4);
+ if (TnoDataRW != -1) {
+ RinstructionCounter += RexecRegionLen;
+ RlogSize = TnoDataRW;
+ } else {
+ jam();
+ return -1;
+ }//if
+ }//if
+ if (RfinalUpdateLen > 0) {
+ jam();
+ /* ---------------------------------------------------------------- */
+ // We can also apply a set of updates without any conditions as part
+ // of the interpreted execution.
+ /* ---------------------------------------------------------------- */
+ if (regOperPtr->optype == ZUPDATE) {
+ TnoDataRW = updateAttributes(pagePtr,
+ TupHeadOffset,
+ &cinBuffer[RinstructionCounter],
+ RfinalUpdateLen);
+ if (TnoDataRW != -1) {
+ MEMCOPY_NO_WORDS(&clogMemBuffer[RlogSize],
+ &cinBuffer[RinstructionCounter],
+ RfinalUpdateLen);
+ RinstructionCounter += RfinalUpdateLen;
+ RlogSize += RfinalUpdateLen;
+ } else {
+ jam();
+ tupkeyErrorLab(signal);
+ return -1;
+ }//if
+ } else {
+ return TUPKEY_abort(signal, 19);
+ }//if
+ }//if
+ if (RfinalRLen > 0) {
+ jam();
+ /* ---------------------------------------------------------------- */
+ // The final action is that we can also read the tuple after it has
+ // been updated.
+ /* ---------------------------------------------------------------- */
+ TnoDataRW = readAttributes(pagePtr,
+ TupHeadOffset,
+ &cinBuffer[RinstructionCounter],
+ RfinalRLen,
+ &dst[RattroutCounter],
+ (dstLen - RattroutCounter),
+ false);
+ if (TnoDataRW != -1) {
+ RattroutCounter += TnoDataRW;
+ } else {
+ jam();
+ tupkeyErrorLab(signal);
+ return -1;
+ }//if
+ }//if
+ regOperPtr->logSize = RlogSize;
+ regOperPtr->attroutbufLen = RattroutCounter;
+ sendReadAttrinfo(signal, RattroutCounter, regOperPtr);
+ if (RlogSize > 0) {
+ sendLogAttrinfo(signal, RlogSize, regOperPtr);
+ }//if
+ return 0;
+ } else {
+ return TUPKEY_abort(signal, 22);
+ }//if
+}//Dbtup::interpreterStartLab()
+
+/* ---------------------------------------------------------------- */
+/* WHEN EXECUTION IS INTERPRETED WE NEED TO SEND SOME ATTRINFO*/
+/* BACK TO LQH FOR LOGGING AND SENDING TO BACKUP AND STANDBY */
+/* NODES. */
+/* INPUT: LOG_ATTRINFOPTR WHERE TO FETCH DATA FROM */
+/* TLOG_START FIRST INDEX TO LOG */
+/* TLOG_END LAST INDEX + 1 TO LOG */
+/* ---------------------------------------------------------------- */
+void Dbtup::sendLogAttrinfo(Signal* signal,
+ Uint32 TlogSize,
+ Operationrec * const regOperPtr)
+
+{
+ Uint32 TbufferIndex = 0;
+ signal->theData[0] = regOperPtr->userpointer;
+ while (TlogSize > 22) {
+ MEMCOPY_NO_WORDS(&signal->theData[3],
+ &clogMemBuffer[TbufferIndex],
+ 22);
+ EXECUTE_DIRECT(refToBlock(regOperPtr->userblockref),
+ GSN_TUP_ATTRINFO, signal, 25);
+ TbufferIndex += 22;
+ TlogSize -= 22;
+ }//while
+ MEMCOPY_NO_WORDS(&signal->theData[3],
+ &clogMemBuffer[TbufferIndex],
+ TlogSize);
+ EXECUTE_DIRECT(refToBlock(regOperPtr->userblockref),
+ GSN_TUP_ATTRINFO, signal, 3 + TlogSize);
+}//Dbtup::sendLogAttrinfo()
+
+inline
+Uint32
+brancher(Uint32 TheInstruction, Uint32 TprogramCounter)
+{
+ Uint32 TbranchDirection = TheInstruction >> 31;
+ Uint32 TbranchLength = (TheInstruction >> 16) & 0x7fff;
+ TprogramCounter--;
+ if (TbranchDirection == 1) {
+ jam();
+ /* ---------------------------------------------------------------- */
+ /* WE JUMP BACKWARDS. */
+ /* ---------------------------------------------------------------- */
+ return (TprogramCounter - TbranchLength);
+ } else {
+ jam();
+ /* ---------------------------------------------------------------- */
+ /* WE JUMP FORWARD. */
+ /* ---------------------------------------------------------------- */
+ return (TprogramCounter + TbranchLength);
+ }//if
+}//brancher()
+
+int Dbtup::interpreterNextLab(Signal* signal,
+ Page* const pagePtr,
+ Uint32 TupHeadOffset,
+ Uint32* logMemory,
+ Uint32* mainProgram,
+ Uint32 TmainProgLen,
+ Uint32* subroutineProg,
+ Uint32 TsubroutineLen,
+ Uint32 * tmpArea,
+ Uint32 tmpAreaSz)
+{
+ register Uint32* TcurrentProgram = mainProgram;
+ register Uint32 TcurrentSize = TmainProgLen;
+ register Uint32 RnoOfInstructions = 0;
+ register Uint32 TprogramCounter = 0;
+ register Uint32 theInstruction;
+ register Uint32 theRegister;
+ Uint32 TdataWritten = 0;
+ Uint32 RstackPtr = 0;
+ union {
+ Uint32 TregMemBuffer[32];
+ Uint64 Tdummy[16];
+ };
+ Uint32 TstackMemBuffer[32];
+
+ /* ---------------------------------------------------------------- */
+ // Initialise all 8 registers to contain the NULL value.
+ // In this version we can handle 32 and 64 bit unsigned integers.
+ // They are handled as 64 bit values. Thus the 32 most significant
+ // bits are zeroed for 32 bit values.
+ /* ---------------------------------------------------------------- */
+ TregMemBuffer[0] = 0;
+ TregMemBuffer[4] = 0;
+ TregMemBuffer[8] = 0;
+ TregMemBuffer[12] = 0;
+ TregMemBuffer[16] = 0;
+ TregMemBuffer[20] = 0;
+ TregMemBuffer[24] = 0;
+ TregMemBuffer[28] = 0;
+ Uint32 tmpHabitant = ~0;
+
+ while (RnoOfInstructions < 8000) {
+ /* ---------------------------------------------------------------- */
+ /* EXECUTE THE NEXT INTERPRETER INSTRUCTION. */
+ /* ---------------------------------------------------------------- */
+ RnoOfInstructions++;
+ theInstruction = TcurrentProgram[TprogramCounter];
+ theRegister = Interpreter::getReg1(theInstruction) << 2;
+ if (TprogramCounter < TcurrentSize) {
+ TprogramCounter++;
+ switch (Interpreter::getOpCode(theInstruction)) {
+ case Interpreter::READ_ATTR_INTO_REG:
+ jam();
+ /* ---------------------------------------------------------------- */
+ // Read an attribute from the tuple into a register.
+ // While reading an attribute we allow the attribute to be an array
+ // as long as it fits in the 64 bits of the register.
+ /* ---------------------------------------------------------------- */
+ {
+ Uint32 theAttrinfo = theInstruction;
+ int TnoDataRW= readAttributes(pagePtr,
+ TupHeadOffset,
+ &theAttrinfo,
+ (Uint32)1,
+ &TregMemBuffer[theRegister],
+ (Uint32)3,
+ false);
+ if (TnoDataRW == 2) {
+ /* ------------------------------------------------------------- */
+ // Two words read means that we get the instruction plus one 32
+ // word read. Thus we set the register to be a 32 bit register.
+ /* ------------------------------------------------------------- */
+ TregMemBuffer[theRegister] = 0x50;
+ * (Int64*)(TregMemBuffer+theRegister+2) = TregMemBuffer[theRegister+1];
+ } else if (TnoDataRW == 3) {
+ /* ------------------------------------------------------------- */
+ // Three words read means that we get the instruction plus two
+ // 32 words read. Thus we set the register to be a 64 bit register.
+ /* ------------------------------------------------------------- */
+ TregMemBuffer[theRegister] = 0x60;
+ TregMemBuffer[theRegister+3] = TregMemBuffer[theRegister+2];
+ TregMemBuffer[theRegister+2] = TregMemBuffer[theRegister+1];
+ } else if (TnoDataRW == 1) {
+ /* ------------------------------------------------------------- */
+ // One word read means that we must have read a NULL value. We set
+ // the register to indicate a NULL value.
+ /* ------------------------------------------------------------- */
+ TregMemBuffer[theRegister] = 0;
+ TregMemBuffer[theRegister + 2] = 0;
+ TregMemBuffer[theRegister + 3] = 0;
+ } else if (TnoDataRW == -1) {
+ jam();
+ tupkeyErrorLab(signal);
+ return -1;
+ } else {
+ /* ------------------------------------------------------------- */
+ // Any other return value from the read attribute here is not
+ // allowed and will lead to a system crash.
+ /* ------------------------------------------------------------- */
+ ndbrequire(false);
+ }//if
+ break;
+ }
+
+ case Interpreter::WRITE_ATTR_FROM_REG:
+ jam();
+ {
+ Uint32 TattrId = theInstruction >> 16;
+ Uint32 TattrDescrIndex = tabptr.p->tabDescriptor +
+ (TattrId << ZAD_LOG_SIZE);
+ Uint32 TattrDesc1 = tableDescriptor[TattrDescrIndex].tabDescr;
+ Uint32 TregType = TregMemBuffer[theRegister];
+
+ /* --------------------------------------------------------------- */
+ // Calculate the number of words of this attribute.
+ // We allow writes into arrays as long as they fit into the 64 bit
+ // register size.
+ /* --------------------------------------------------------------- */
+ Uint32 TattrNoOfWords = AttributeDescriptor::getSizeInWords(TattrDesc1);
+ Uint32 Toptype = operPtr.p->optype;
+
+ Uint32 TdataForUpdate[3];
+ Uint32 Tlen;
+
+ AttributeHeader& ah = AttributeHeader::init(&TdataForUpdate[0],
+ TattrId, TattrNoOfWords);
+ TdataForUpdate[1] = TregMemBuffer[theRegister + 2];
+ TdataForUpdate[2] = TregMemBuffer[theRegister + 3];
+ Tlen = TattrNoOfWords + 1;
+ if (Toptype == ZUPDATE) {
+ if (TattrNoOfWords <= 2) {
+ if (TregType == 0) {
+ /* --------------------------------------------------------- */
+ // Write a NULL value into the attribute
+ /* --------------------------------------------------------- */
+ ah.setNULL();
+ Tlen = 1;
+ }//if
+ int TnoDataRW= updateAttributes(pagePtr,
+ TupHeadOffset,
+ &TdataForUpdate[0],
+ Tlen);
+ if (TnoDataRW != -1) {
+ /* --------------------------------------------------------- */
+ // Write the written data also into the log buffer so that it
+ // will be logged.
+ /* --------------------------------------------------------- */
+ logMemory[TdataWritten + 0] = TdataForUpdate[0];
+ logMemory[TdataWritten + 1] = TdataForUpdate[1];
+ logMemory[TdataWritten + 2] = TdataForUpdate[2];
+ TdataWritten += Tlen;
+ } else {
+ tupkeyErrorLab(signal);
+ return -1;
+ }//if
+ } else {
+ return TUPKEY_abort(signal, 15);
+ }//if
+ } else {
+ return TUPKEY_abort(signal, 16);
+ }//if
+ break;
+ }
+
+ case Interpreter::LOAD_CONST_NULL:
+ jam();
+ TregMemBuffer[theRegister] = 0; /* NULL INDICATOR */
+ break;
+
+ case Interpreter::LOAD_CONST16:
+ jam();
+ TregMemBuffer[theRegister] = 0x50; /* 32 BIT UNSIGNED CONSTANT */
+ * (Int64*)(TregMemBuffer+theRegister+2) = theInstruction >> 16;
+ break;
+
+ case Interpreter::LOAD_CONST32:
+ jam();
+ TregMemBuffer[theRegister] = 0x50; /* 32 BIT UNSIGNED CONSTANT */
+ * (Int64*)(TregMemBuffer+theRegister+2) = *
+ (TcurrentProgram+TprogramCounter);
+ TprogramCounter++;
+ break;
+
+ case Interpreter::LOAD_CONST64:
+ jam();
+ TregMemBuffer[theRegister] = 0x60; /* 64 BIT UNSIGNED CONSTANT */
+ TregMemBuffer[theRegister + 2 ] = * (TcurrentProgram + TprogramCounter++);
+ TregMemBuffer[theRegister + 3 ] = * (TcurrentProgram + TprogramCounter++);
+ break;
+
+ case Interpreter::ADD_REG_REG:
+ jam();
+ {
+ Uint32 TrightRegister = Interpreter::getReg2(theInstruction) << 2;
+ Uint32 TdestRegister = Interpreter::getReg3(theInstruction) << 2;
+
+ Uint32 TrightType = TregMemBuffer[TrightRegister];
+ Int64 Tright0 = * (Int64*)(TregMemBuffer + TrightRegister + 2);
+
+
+ Uint32 TleftType = TregMemBuffer[theRegister];
+ Int64 Tleft0 = * (Int64*)(TregMemBuffer + theRegister + 2);
+
+ if ((TleftType | TrightType) != 0) {
+ Uint64 Tdest0 = Tleft0 + Tright0;
+ * (Int64*)(TregMemBuffer+TdestRegister+2) = Tdest0;
+ TregMemBuffer[TdestRegister] = 0x60;
+ } else {
+ return TUPKEY_abort(signal, 20);
+ }
+ break;
+ }
+
+ case Interpreter::SUB_REG_REG:
+ jam();
+ {
+ Uint32 TrightRegister = Interpreter::getReg2(theInstruction) << 2;
+ Uint32 TdestRegister = Interpreter::getReg3(theInstruction) << 2;
+
+ Uint32 TrightType = TregMemBuffer[TrightRegister];
+ Int64 Tright0 = * (Int64*)(TregMemBuffer + TrightRegister + 2);
+
+ Uint32 TleftType = TregMemBuffer[theRegister];
+ Int64 Tleft0 = * (Int64*)(TregMemBuffer + theRegister + 2);
+
+ if ((TleftType | TrightType) != 0) {
+ Int64 Tdest0 = Tleft0 - Tright0;
+ * (Int64*)(TregMemBuffer+TdestRegister+2) = Tdest0;
+ TregMemBuffer[TdestRegister] = 0x60;
+ } else {
+ return TUPKEY_abort(signal, 20);
+ }
+ break;
+ }
+
+ case Interpreter::BRANCH:
+ TprogramCounter = brancher(theInstruction, TprogramCounter);
+ break;
+
+ case Interpreter::BRANCH_REG_EQ_NULL:
+ if (TregMemBuffer[theRegister] != 0) {
+ jam();
+ continue;
+ } else {
+ jam();
+ TprogramCounter = brancher(theInstruction, TprogramCounter);
+ }//if
+ break;
+
+ case Interpreter::BRANCH_REG_NE_NULL:
+ if (TregMemBuffer[theRegister] == 0) {
+ jam();
+ continue;
+ } else {
+ jam();
+ TprogramCounter = brancher(theInstruction, TprogramCounter);
+ }//if
+ break;
+
+
+ case Interpreter::BRANCH_EQ_REG_REG:
+ {
+ Uint32 TrightRegister = Interpreter::getReg2(theInstruction) << 2;
+
+ Uint32 TleftType = TregMemBuffer[theRegister];
+ Uint32 Tleft0 = TregMemBuffer[theRegister + 2];
+ Uint32 Tleft1 = TregMemBuffer[theRegister + 3];
+
+ Uint32 TrightType = TregMemBuffer[TrightRegister];
+ Uint32 Tright0 = TregMemBuffer[TrightRegister + 2];
+ Uint32 Tright1 = TregMemBuffer[TrightRegister + 3];
+ if ((TrightType | TleftType) != 0) {
+ jam();
+ if ((Tleft0 == Tright0) && (Tleft1 == Tright1)) {
+ TprogramCounter = brancher(theInstruction, TprogramCounter);
+ }//if
+ } else {
+ return TUPKEY_abort(signal, 23);
+ }//if
+ break;
+ }
+
+ case Interpreter::BRANCH_NE_REG_REG:
+ {
+ Uint32 TrightRegister = Interpreter::getReg2(theInstruction) << 2;
+
+ Uint32 TleftType = TregMemBuffer[theRegister];
+ Uint32 Tleft0 = TregMemBuffer[theRegister + 2];
+ Uint32 Tleft1 = TregMemBuffer[theRegister + 3];
+
+ Uint32 TrightType = TregMemBuffer[TrightRegister];
+ Uint32 Tright0 = TregMemBuffer[TrightRegister + 2];
+ Uint32 Tright1 = TregMemBuffer[TrightRegister + 3];
+ if ((TrightType | TleftType) != 0) {
+ jam();
+ if ((Tleft0 != Tright0) || (Tleft1 != Tright1)) {
+ TprogramCounter = brancher(theInstruction, TprogramCounter);
+ }//if
+ } else {
+ return TUPKEY_abort(signal, 24);
+ }//if
+ break;
+ }
+
+ case Interpreter::BRANCH_LT_REG_REG:
+ {
+ Uint32 TrightRegister = Interpreter::getReg2(theInstruction) << 2;
+
+ Uint32 TrightType = TregMemBuffer[TrightRegister];
+ Int64 Tright0 = * (Int64*)(TregMemBuffer + TrightRegister + 2);
+
+ Uint32 TleftType = TregMemBuffer[theRegister];
+ Int64 Tleft0 = * (Int64*)(TregMemBuffer + theRegister + 2);
+
+
+ if ((TrightType | TleftType) != 0) {
+ jam();
+ if (Tleft0 < Tright0) {
+ TprogramCounter = brancher(theInstruction, TprogramCounter);
+ }//if
+ } else {
+ return TUPKEY_abort(signal, 24);
+ }//if
+ break;
+ }
+
+ case Interpreter::BRANCH_LE_REG_REG:
+ {
+ Uint32 TrightRegister = Interpreter::getReg2(theInstruction) << 2;
+
+ Uint32 TrightType = TregMemBuffer[TrightRegister];
+ Int64 Tright0 = * (Int64*)(TregMemBuffer + TrightRegister + 2);
+
+ Uint32 TleftType = TregMemBuffer[theRegister];
+ Int64 Tleft0 = * (Int64*)(TregMemBuffer + theRegister + 2);
+
+
+ if ((TrightType | TleftType) != 0) {
+ jam();
+ if (Tleft0 <= Tright0) {
+ TprogramCounter = brancher(theInstruction, TprogramCounter);
+ }//if
+ } else {
+ return TUPKEY_abort(signal, 26);
+ }//if
+ break;
+ }
+
+ case Interpreter::BRANCH_GT_REG_REG:
+ {
+ Uint32 TrightRegister = Interpreter::getReg2(theInstruction) << 2;
+
+ Uint32 TrightType = TregMemBuffer[TrightRegister];
+ Int64 Tright0 = * (Int64*)(TregMemBuffer + TrightRegister + 2);
+
+ Uint32 TleftType = TregMemBuffer[theRegister];
+ Int64 Tleft0 = * (Int64*)(TregMemBuffer + theRegister + 2);
+
+
+ if ((TrightType | TleftType) != 0) {
+ jam();
+ if (Tleft0 > Tright0){
+ TprogramCounter = brancher(theInstruction, TprogramCounter);
+ }//if
+ } else {
+ return TUPKEY_abort(signal, 27);
+ }//if
+ break;
+ }
+
+ case Interpreter::BRANCH_GE_REG_REG:
+ {
+ Uint32 TrightRegister = Interpreter::getReg2(theInstruction) << 2;
+
+ Uint32 TrightType = TregMemBuffer[TrightRegister];
+ Int64 Tright0 = * (Int64*)(TregMemBuffer + TrightRegister + 2);
+
+ Uint32 TleftType = TregMemBuffer[theRegister];
+ Int64 Tleft0 = * (Int64*)(TregMemBuffer + theRegister + 2);
+
+
+ if ((TrightType | TleftType) != 0) {
+ jam();
+ if (Tleft0 >= Tright0){
+ TprogramCounter = brancher(theInstruction, TprogramCounter);
+ }//if
+ } else {
+ return TUPKEY_abort(signal, 28);
+ }//if
+ break;
+ }
+
+ case Interpreter::BRANCH_ATTR_OP_ARG:{
+ jam();
+ Uint32 cond = Interpreter::getBinaryCondition(theInstruction);
+ Uint32 ins2 = TcurrentProgram[TprogramCounter];
+ Uint32 attrId = Interpreter::getBranchCol_AttrId(ins2) << 16;
+ Uint32 argLen = Interpreter::getBranchCol_Len(ins2);
+
+ if(tmpHabitant != attrId){
+ Int32 TnoDataR = readAttributes(pagePtr,
+ TupHeadOffset,
+ &attrId, 1,
+ tmpArea, tmpAreaSz,
+ false);
+
+ if (TnoDataR == -1) {
+ jam();
+ tupkeyErrorLab(signal);
+ return -1;
+ }
+ tmpHabitant = attrId;
+ }
+
+ // get type
+ attrId >>= 16;
+ Uint32 TattrDescrIndex = tabptr.p->tabDescriptor +
+ (attrId << ZAD_LOG_SIZE);
+ Uint32 TattrDesc1 = tableDescriptor[TattrDescrIndex].tabDescr;
+ Uint32 TattrDesc2 = tableDescriptor[TattrDescrIndex+1].tabDescr;
+ Uint32 typeId = AttributeDescriptor::getType(TattrDesc1);
+ void * cs = 0;
+ if(AttributeOffset::getCharsetFlag(TattrDesc2))
+ {
+ Uint32 pos = AttributeOffset::getCharsetPos(TattrDesc2);
+ cs = tabptr.p->charsetArray[pos];
+ }
+ const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(typeId);
+
+ // get data
+ AttributeHeader ah(tmpArea[0]);
+ const char* s1 = (char*)&tmpArea[1];
+ const char* s2 = (char*)&TcurrentProgram[TprogramCounter+1];
+ // fixed length in 5.0
+ Uint32 attrLen = AttributeDescriptor::getSizeInBytes(TattrDesc1);
+
+ bool r1_null = ah.isNULL();
+ bool r2_null = argLen == 0;
+ int res1;
+ if (cond != Interpreter::LIKE &&
+ cond != Interpreter::NOT_LIKE) {
+ if (r1_null || r2_null) {
+ // NULL==NULL and NULL<not-NULL
+ res1 = r1_null && r2_null ? 0 : r1_null ? -1 : 1;
+ } else {
+ res1 = (*sqlType.m_cmp)(cs, s1, attrLen, s2, argLen, true);
+ }
+ } else {
+ if (r1_null || r2_null) {
+ // NULL like NULL is true (has no practical use)
+ res1 = r1_null && r2_null ? 0 : -1;
+ } else {
+ res1 = (*sqlType.m_like)(cs, s1, attrLen, s2, argLen);
+ }
+ }
+
+ int res = 0;
+ switch ((Interpreter::BinaryCondition)cond) {
+ case Interpreter::EQ:
+ res = (res1 == 0);
+ break;
+ case Interpreter::NE:
+ res = (res1 != 0);
+ break;
+ // note the condition is backwards
+ case Interpreter::LT:
+ res = (res1 > 0);
+ break;
+ case Interpreter::LE:
+ res = (res1 >= 0);
+ break;
+ case Interpreter::GT:
+ res = (res1 < 0);
+ break;
+ case Interpreter::GE:
+ res = (res1 <= 0);
+ break;
+ case Interpreter::LIKE:
+ res = (res1 == 0);
+ break;
+ case Interpreter::NOT_LIKE:
+ res = (res1 == 1);
+ break;
+ // XXX handle invalid value
+ }
+#ifdef TRACE_INTERPRETER
+ ndbout_c("cond=%u attr(%d)='%.*s'(%d) str='%.*s'(%d) res1=%d res=%d",
+ cond, attrId >> 16,
+ attrLen, s1, attrLen, argLen, s2, argLen, res1, res);
+#endif
+ if (res)
+ TprogramCounter = brancher(theInstruction, TprogramCounter);
+ else
+ {
+ Uint32 tmp = ((argLen + 3) >> 2) + 1;
+ TprogramCounter += tmp;
+ }
+ break;
+ }
+
+ case Interpreter::BRANCH_ATTR_EQ_NULL:{
+ jam();
+ Uint32 ins2 = TcurrentProgram[TprogramCounter];
+ Uint32 attrId = Interpreter::getBranchCol_AttrId(ins2) << 16;
+
+ if(tmpHabitant != attrId){
+ Int32 TnoDataR = readAttributes(pagePtr,
+ TupHeadOffset,
+ &attrId, 1,
+ tmpArea, tmpAreaSz,
+ false);
+
+ if (TnoDataR == -1) {
+ jam();
+ tupkeyErrorLab(signal);
+ return -1;
+ }
+ tmpHabitant = attrId;
+ }
+
+ AttributeHeader ah(tmpArea[0]);
+ if(ah.isNULL()){
+ TprogramCounter = brancher(theInstruction, TprogramCounter);
+ } else {
+ TprogramCounter ++;
+ }
+ break;
+ }
+
+ case Interpreter::BRANCH_ATTR_NE_NULL:{
+ jam();
+ Uint32 ins2 = TcurrentProgram[TprogramCounter];
+ Uint32 attrId = Interpreter::getBranchCol_AttrId(ins2) << 16;
+
+ if(tmpHabitant != attrId){
+ Int32 TnoDataR = readAttributes(pagePtr,
+ TupHeadOffset,
+ &attrId, 1,
+ tmpArea, tmpAreaSz,
+ false);
+
+ if (TnoDataR == -1) {
+ jam();
+ tupkeyErrorLab(signal);
+ return -1;
+ }
+ tmpHabitant = attrId;
+ }
+
+ AttributeHeader ah(tmpArea[0]);
+ if(ah.isNULL()){
+ TprogramCounter ++;
+ } else {
+ TprogramCounter = brancher(theInstruction, TprogramCounter);
+ }
+ break;
+ }
+
+ case Interpreter::EXIT_OK:
+ jam();
+#ifdef TRACE_INTERPRETER
+ ndbout_c(" - exit_ok");
+#endif
+ return TdataWritten;
+
+ case Interpreter::EXIT_OK_LAST:
+ jam();
+#ifdef TRACE_INTERPRETER
+ ndbout_c(" - exit_ok_last");
+#endif
+ operPtr.p->lastRow = 1;
+ return TdataWritten;
+
+ case Interpreter::EXIT_REFUSE:
+ jam();
+#ifdef TRACE_INTERPRETER
+ ndbout_c(" - exit_nok");
+#endif
+ terrorCode = theInstruction >> 16;
+ return TUPKEY_abort(signal, 29);
+
+ case Interpreter::CALL:
+ jam();
+ RstackPtr++;
+ if (RstackPtr < 32) {
+ TstackMemBuffer[RstackPtr] = TprogramCounter + 1;
+ TprogramCounter = theInstruction >> 16;
+ if (TprogramCounter < TsubroutineLen) {
+ TcurrentProgram = subroutineProg;
+ TcurrentSize = TsubroutineLen;
+ } else {
+ return TUPKEY_abort(signal, 30);
+ }//if
+ } else {
+ return TUPKEY_abort(signal, 31);
+ }//if
+ break;
+
+ case Interpreter::RETURN:
+ jam();
+ if (RstackPtr > 0) {
+ TprogramCounter = TstackMemBuffer[RstackPtr];
+ RstackPtr--;
+ if (RstackPtr == 0) {
+ jam();
+ /* ------------------------------------------------------------- */
+ // We are back to the main program.
+ /* ------------------------------------------------------------- */
+ TcurrentProgram = mainProgram;
+ TcurrentSize = TmainProgLen;
+ }//if
+ } else {
+ return TUPKEY_abort(signal, 32);
+ }//if
+ break;
+
+ default:
+ return TUPKEY_abort(signal, 33);
+ }//switch
+ } else {
+ return TUPKEY_abort(signal, 34);
+ }//if
+ }//while
+ return TUPKEY_abort(signal, 35);
+}//Dbtup::interpreterNextLab()
+
+