diff options
Diffstat (limited to 'storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp')
-rw-r--r-- | storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp | 2052 |
1 files changed, 2052 insertions, 0 deletions
diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp new file mode 100644 index 00000000000..761f959acdc --- /dev/null +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp @@ -0,0 +1,2052 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + + +#define DBTUP_C +#include "Dbtup.hpp" +#include <RefConvert.hpp> +#include <ndb_limits.h> +#include <pc.hpp> +#include <AttributeDescriptor.hpp> +#include "AttributeOffset.hpp" +#include <AttributeHeader.hpp> +#include <Interpreter.hpp> +#include <signaldata/TupCommit.hpp> +#include <signaldata/TupKey.hpp> +#include <NdbSqlUtil.hpp> + +/* ----------------------------------------------------------------- */ +/* ----------- INIT_STORED_OPERATIONREC -------------- */ +/* ----------------------------------------------------------------- */ +int Dbtup::initStoredOperationrec(Operationrec* const regOperPtr, + Uint32 storedId) +{ + jam(); + StoredProcPtr storedPtr; + c_storedProcPool.getPtr(storedPtr, storedId); + if (storedPtr.i != RNIL) { + if (storedPtr.p->storedCode == ZSCAN_PROCEDURE) { + storedPtr.p->storedCounter++; + regOperPtr->firstAttrinbufrec = storedPtr.p->storedLinkFirst; + regOperPtr->lastAttrinbufrec = storedPtr.p->storedLinkLast; + regOperPtr->attrinbufLen = storedPtr.p->storedProcLength; + regOperPtr->currentAttrinbufLen = storedPtr.p->storedProcLength; + return ZOK; + }//if + }//if + terrorCode = ZSTORED_PROC_ID_ERROR; + return terrorCode; +}//Dbtup::initStoredOperationrec() + +void Dbtup::copyAttrinfo(Signal* signal, + Operationrec * const regOperPtr, + Uint32* inBuffer) +{ + AttrbufrecPtr copyAttrBufPtr; + Uint32 RnoOfAttrBufrec = cnoOfAttrbufrec; + int RbufLen; + Uint32 RinBufIndex = 0; + Uint32 Rnext; + Uint32 Rfirst; + Uint32 TstoredProcedure = (regOperPtr->storedProcedureId != ZNIL); + Uint32 RnoFree = cnoFreeAttrbufrec; + +//------------------------------------------------------------------------- +// As a prelude to the execution of the TUPKEYREQ we will copy the program +// into the inBuffer to enable easy execution without any complex jumping +// between the buffers. In particular this will make the interpreter less +// complex. Hopefully it does also improve performance. +//------------------------------------------------------------------------- + copyAttrBufPtr.i = regOperPtr->firstAttrinbufrec; + while (copyAttrBufPtr.i != RNIL) { + jam(); + ndbrequire(copyAttrBufPtr.i < RnoOfAttrBufrec); + ptrAss(copyAttrBufPtr, attrbufrec); + RbufLen = copyAttrBufPtr.p->attrbuf[ZBUF_DATA_LEN]; + Rnext = copyAttrBufPtr.p->attrbuf[ZBUF_NEXT]; + Rfirst = cfirstfreeAttrbufrec; + MEMCOPY_NO_WORDS(&inBuffer[RinBufIndex], + ©AttrBufPtr.p->attrbuf[0], + RbufLen); + RinBufIndex += RbufLen; + if (!TstoredProcedure) { + copyAttrBufPtr.p->attrbuf[ZBUF_NEXT] = Rfirst; + cfirstfreeAttrbufrec = copyAttrBufPtr.i; + RnoFree++; + }//if + copyAttrBufPtr.i = Rnext; + }//while + cnoFreeAttrbufrec = RnoFree; + if (TstoredProcedure) { + jam(); + StoredProcPtr storedPtr; + c_storedProcPool.getPtr(storedPtr, (Uint32)regOperPtr->storedProcedureId); + ndbrequire(storedPtr.p->storedCode == ZSCAN_PROCEDURE); + storedPtr.p->storedCounter--; + regOperPtr->storedProcedureId = ZNIL; + }//if + // Release the ATTRINFO buffers + regOperPtr->firstAttrinbufrec = RNIL; + regOperPtr->lastAttrinbufrec = RNIL; +}//Dbtup::copyAttrinfo() + +void Dbtup::handleATTRINFOforTUPKEYREQ(Signal* signal, + Uint32 length, + Operationrec * const regOperPtr) +{ + AttrbufrecPtr TAttrinbufptr; + TAttrinbufptr.i = cfirstfreeAttrbufrec; + if ((cfirstfreeAttrbufrec < cnoOfAttrbufrec) && + (cnoFreeAttrbufrec > MIN_ATTRBUF)) { + ptrAss(TAttrinbufptr, attrbufrec); + MEMCOPY_NO_WORDS(&TAttrinbufptr.p->attrbuf[0], + &signal->theData[3], + length); + Uint32 RnoFree = cnoFreeAttrbufrec; + Uint32 Rnext = TAttrinbufptr.p->attrbuf[ZBUF_NEXT]; + TAttrinbufptr.p->attrbuf[ZBUF_DATA_LEN] = length; + TAttrinbufptr.p->attrbuf[ZBUF_NEXT] = RNIL; + + AttrbufrecPtr locAttrinbufptr; + Uint32 RnewLen = regOperPtr->currentAttrinbufLen; + + locAttrinbufptr.i = regOperPtr->lastAttrinbufrec; + cfirstfreeAttrbufrec = Rnext; + cnoFreeAttrbufrec = RnoFree - 1; + RnewLen += length; + regOperPtr->lastAttrinbufrec = TAttrinbufptr.i; + regOperPtr->currentAttrinbufLen = RnewLen; + if (locAttrinbufptr.i == RNIL) { + regOperPtr->firstAttrinbufrec = TAttrinbufptr.i; + return; + } else { + jam(); + ptrCheckGuard(locAttrinbufptr, cnoOfAttrbufrec, attrbufrec); + locAttrinbufptr.p->attrbuf[ZBUF_NEXT] = TAttrinbufptr.i; + }//if + if (RnewLen < ZATTR_BUFFER_SIZE) { + return; + } else { + jam(); + regOperPtr->transstate = TOO_MUCH_AI; + return; + }//if + } else if (cnoFreeAttrbufrec <= MIN_ATTRBUF) { + jam(); + regOperPtr->transstate = ERROR_WAIT_TUPKEYREQ; + } else { + ndbrequire(false); + }//if +}//Dbtup::handleATTRINFOforTUPKEYREQ() + +void Dbtup::execATTRINFO(Signal* signal) +{ + OperationrecPtr regOpPtr; + Uint32 Rsig0 = signal->theData[0]; + Uint32 Rlen = signal->length(); + regOpPtr.i = Rsig0; + + jamEntry(); + + ptrCheckGuard(regOpPtr, cnoOfOprec, operationrec); + if (regOpPtr.p->transstate == IDLE) { + handleATTRINFOforTUPKEYREQ(signal, Rlen - 3, regOpPtr.p); + return; + } else if (regOpPtr.p->transstate == WAIT_STORED_PROCEDURE_ATTR_INFO) { + storedProcedureAttrInfo(signal, regOpPtr.p, Rlen - 3, 3, false); + return; + }//if + switch (regOpPtr.p->transstate) { + case ERROR_WAIT_STORED_PROCREQ: + jam(); + case TOO_MUCH_AI: + jam(); + case ERROR_WAIT_TUPKEYREQ: + jam(); + return; /* IGNORE ATTRINFO IN THOSE STATES, WAITING FOR ABORT SIGNAL */ + break; + case DISCONNECTED: + jam(); + case STARTED: + jam(); + default: + ndbrequire(false); + }//switch +}//Dbtup::execATTRINFO() + +void Dbtup::execTUP_ALLOCREQ(Signal* signal) +{ + OperationrecPtr regOperPtr; + TablerecPtr regTabPtr; + FragrecordPtr regFragPtr; + + jamEntry(); + + regOperPtr.i = signal->theData[0]; + regFragPtr.i = signal->theData[1]; + regTabPtr.i = signal->theData[2]; + + if (!((regOperPtr.i < cnoOfOprec) && + (regFragPtr.i < cnoOfFragrec) && + (regTabPtr.i < cnoOfTablerec))) { + ndbrequire(false); + }//if + ptrAss(regOperPtr, operationrec); + ptrAss(regFragPtr, fragrecord); + ptrAss(regTabPtr, tablerec); + +//--------------------------------------------------- +/* --- Allocate a tuple as requested by ACC --- */ +//--------------------------------------------------- + PagePtr pagePtr; + Uint32 pageOffset; + if (!allocTh(regFragPtr.p, + regTabPtr.p, + NORMAL_PAGE, + signal, + pageOffset, + pagePtr)) { + signal->theData[0] = terrorCode; // Indicate failure + return; + }//if + Uint32 fragPageId = pagePtr.p->pageWord[ZPAGE_FRAG_PAGE_ID_POS]; + Uint32 pageIndex = ((pageOffset - ZPAGE_HEADER_SIZE) / + regTabPtr.p->tupheadsize) << 1; + regOperPtr.p->tableRef = regTabPtr.i; + regOperPtr.p->fragId = regFragPtr.p->fragmentId; + regOperPtr.p->realPageId = pagePtr.i; + regOperPtr.p->fragPageId = fragPageId; + regOperPtr.p->pageOffset = pageOffset; + regOperPtr.p->pageIndex = pageIndex; + /* -------------------------------------------------------------- */ + /* AN INSERT IS UNDONE BY FREEING THE DATA OCCUPIED BY THE INSERT */ + /* THE ONLY DATA WE HAVE TO LOG EXCEPT THE TYPE, PAGE AND INDEX */ + /* IS THE AMOUNT OF DATA TO FREE */ + /* -------------------------------------------------------------- */ + if (isUndoLoggingNeeded(regFragPtr.p, fragPageId)) { + jam(); + cprAddUndoLogRecord(signal, + ZLCPR_TYPE_DELETE_TH, + fragPageId, + pageIndex, + regTabPtr.i, + regFragPtr.p->fragmentId, + regFragPtr.p->checkpointVersion); + }//if + + //--------------------------------------------------------------- + // Initialise Active operation list by setting the list to empty + //--------------------------------------------------------------- + ndbrequire(pageOffset < ZWORDS_ON_PAGE); + pagePtr.p->pageWord[pageOffset] = RNIL; + + signal->theData[0] = 0; + signal->theData[1] = fragPageId; + signal->theData[2] = pageIndex; +}//Dbtup::execTUP_ALLOCREQ() + +void +Dbtup::setChecksum(Page* const pagePtr, Uint32 tupHeadOffset, Uint32 tupHeadSize) +{ + // 2 == regTabPtr.p->tupChecksumIndex + pagePtr->pageWord[tupHeadOffset + 2] = 0; + Uint32 checksum = calculateChecksum(pagePtr, tupHeadOffset, tupHeadSize); + pagePtr->pageWord[tupHeadOffset + 2] = checksum; +}//Dbtup::setChecksum() + +Uint32 +Dbtup::calculateChecksum(Page* pagePtr, + Uint32 tupHeadOffset, + Uint32 tupHeadSize) +{ + Uint32 checksum = 0; + Uint32 loopStop = tupHeadOffset + tupHeadSize; + ndbrequire(loopStop <= ZWORDS_ON_PAGE); + // includes tupVersion + for (Uint32 i = tupHeadOffset + 1; i < loopStop; i++) { + checksum ^= pagePtr->pageWord[i]; + }//if + return checksum; +}//Dbtup::calculateChecksum() + +/* ----------------------------------------------------------------- */ +/* ----------- INSERT_ACTIVE_OP_LIST -------------- */ +/* ----------------------------------------------------------------- */ +void Dbtup::insertActiveOpList(Signal* signal, + OperationrecPtr regOperPtr, + Page* const pagePtr, + Uint32 pageOffset) +{ + OperationrecPtr iaoPrevOpPtr; + ndbrequire(regOperPtr.p->inActiveOpList == ZFALSE); + regOperPtr.p->inActiveOpList = ZTRUE; + ndbrequire(pageOffset < ZWORDS_ON_PAGE); + iaoPrevOpPtr.i = pagePtr->pageWord[pageOffset]; + pagePtr->pageWord[pageOffset] = regOperPtr.i; + regOperPtr.p->prevActiveOp = RNIL; + regOperPtr.p->nextActiveOp = iaoPrevOpPtr.i; + if (iaoPrevOpPtr.i == RNIL) { + return; + } else { + jam(); + ptrCheckGuard(iaoPrevOpPtr, cnoOfOprec, operationrec); + iaoPrevOpPtr.p->prevActiveOp = regOperPtr.i; + if (iaoPrevOpPtr.p->optype == ZDELETE && + regOperPtr.p->optype == ZINSERT) { + jam(); + // mark both + iaoPrevOpPtr.p->deleteInsertFlag = 1; + regOperPtr.p->deleteInsertFlag = 1; + } + return; + }//if +}//Dbtup::insertActiveOpList() + +void Dbtup::linkOpIntoFragList(OperationrecPtr regOperPtr, + Fragrecord* const regFragPtr) +{ + OperationrecPtr sopTmpOperPtr; + Uint32 tail = regFragPtr->lastusedOprec; + ndbrequire(regOperPtr.p->inFragList == ZFALSE); + regOperPtr.p->inFragList = ZTRUE; + regOperPtr.p->prevOprecInList = tail; + regOperPtr.p->nextOprecInList = RNIL; + sopTmpOperPtr.i = tail; + if (tail == RNIL) { + regFragPtr->firstusedOprec = regOperPtr.i; + } else { + jam(); + ptrCheckGuard(sopTmpOperPtr, cnoOfOprec, operationrec); + sopTmpOperPtr.p->nextOprecInList = regOperPtr.i; + }//if + regFragPtr->lastusedOprec = regOperPtr.i; +}//Dbtup::linkOpIntoFragList() + +/* +This routine is optimised for use from TUPKEYREQ. +This means that a lot of input data is stored in the operation record. +The routine expects the following data in the operation record to be +set-up properly. +Transaction data +1) transid1 +2) transid2 +3) savePointId + +Operation data +4) optype +5) dirtyOp + +Tuple address +6) fragPageId +7) pageIndex + +regFragPtr and regTabPtr are references to the table and fragment data and +is read-only. + +The routine will set up the following data in the operation record if +returned with success. + +Tuple address data +1) realPageId +2) fragPageId +3) pageOffset +4) pageIndex + +Also the pagePtr is an output variable if the routine returns with success. +It's input value can be undefined. +*/ +bool +Dbtup::getPage(PagePtr& pagePtr, + Operationrec* const regOperPtr, + Fragrecord* const regFragPtr, + Tablerec* const regTabPtr) +{ +/* ------------------------------------------------------------------------- */ +// GET THE REFERENCE TO THE TUPLE HEADER BY TRANSLATING THE FRAGMENT PAGE ID +// INTO A REAL PAGE ID AND BY USING THE PAGE INDEX TO DERIVE THE PROPER INDEX +// IN THE REAL PAGE. +/* ------------------------------------------------------------------------- */ + pagePtr.i = getRealpid(regFragPtr, regOperPtr->fragPageId); + regOperPtr->realPageId = pagePtr.i; + Uint32 RpageIndex = regOperPtr->pageIndex; + Uint32 Rtupheadsize = regTabPtr->tupheadsize; + ptrCheckGuard(pagePtr, cnoOfPage, page); + Uint32 RpageIndexScaled = RpageIndex >> 1; + ndbrequire((RpageIndex & 1) == 0); + regOperPtr->pageOffset = ZPAGE_HEADER_SIZE + + (Rtupheadsize * RpageIndexScaled); + + OperationrecPtr leaderOpPtr; + ndbrequire(regOperPtr->pageOffset < ZWORDS_ON_PAGE); + leaderOpPtr.i = pagePtr.p->pageWord[regOperPtr->pageOffset]; + if (leaderOpPtr.i == RNIL) { + return true; + }//if + ptrCheckGuard(leaderOpPtr, cnoOfOprec, operationrec); + bool dirtyRead = ((regOperPtr->optype == ZREAD) && + (regOperPtr->dirtyOp == 1)); + if (dirtyRead) { + bool sameTrans = ((regOperPtr->transid1 == leaderOpPtr.p->transid1) && + (regOperPtr->transid2 == leaderOpPtr.p->transid2)); + if (!sameTrans) { + if (!getPageLastCommitted(regOperPtr, leaderOpPtr.p)) { + return false; + }//if + pagePtr.i = regOperPtr->realPageId; + ptrCheckGuard(pagePtr, cnoOfPage, page); + return true; + }//if + }//if + if (regOperPtr->optype == ZREAD) { + /* + Read uses savepoint id's to find the correct tuple version. + */ + if (getPageThroughSavePoint(regOperPtr, leaderOpPtr.p)) { + jam(); + pagePtr.i = regOperPtr->realPageId; + ptrCheckGuard(pagePtr, cnoOfPage, page); + return true; + } + return false; + } +//---------------------------------------------------------------------- +// Check that no other operation is already active on the tuple. Also +// that abort or commit is not ongoing. +//---------------------------------------------------------------------- + if (leaderOpPtr.p->tupleState == NO_OTHER_OP) { + jam(); + if ((leaderOpPtr.p->optype == ZDELETE) && + (regOperPtr->optype != ZINSERT)) { + jam(); + terrorCode = ZTUPLE_DELETED_ERROR; + return false; + }//if + return true; + } else if (leaderOpPtr.p->tupleState == ALREADY_ABORTED) { + jam(); + terrorCode = ZMUST_BE_ABORTED_ERROR; + return false; + } else { + ndbrequire(false); + }//if + return true; +}//Dbtup::getPage() + +bool +Dbtup::getPageThroughSavePoint(Operationrec* regOperPtr, + Operationrec* leaderOpPtr) +{ + bool found = false; + OperationrecPtr loopOpPtr; + loopOpPtr.p = leaderOpPtr; + while(true) { + if (regOperPtr->savePointId > loopOpPtr.p->savePointId) { + jam(); + found = true; + break; + } + if (loopOpPtr.p->nextActiveOp == RNIL) { + break; + } + loopOpPtr.i = loopOpPtr.p->nextActiveOp; + ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec); + jam(); + } + if (!found) { + return getPageLastCommitted(regOperPtr, loopOpPtr.p); + } else { + if (loopOpPtr.p->optype == ZDELETE) { + jam(); + terrorCode = ZTUPLE_DELETED_ERROR; + return false; + } + if (loopOpPtr.p->tupleState == ALREADY_ABORTED) { + /* + Requested tuple version has already been aborted + */ + jam(); + terrorCode = ZMUST_BE_ABORTED_ERROR; + return false; + } + bool use_copy; + if (loopOpPtr.p->prevActiveOp == RNIL) { + jam(); + /* + Use original tuple since we are reading from the last written tuple. + We are the + */ + use_copy = false; + } else { + /* + Go forward in time to find a copy of the tuple which this operation + produced + */ + loopOpPtr.i = loopOpPtr.p->prevActiveOp; + ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec); + if (loopOpPtr.p->optype == ZDELETE) { + /* + This operation was a Delete and thus have no copy tuple attached to + it. We will move forward to the next that either doesn't exist in + which case we will return the original tuple of any operation and + otherwise it must be an insert which contains a copy record. + */ + if (loopOpPtr.p->prevActiveOp == RNIL) { + jam(); + use_copy = false; + } else { + jam(); + loopOpPtr.i = loopOpPtr.p->prevActiveOp; + ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec); + ndbrequire(loopOpPtr.p->optype == ZINSERT); + use_copy = true; + } + } else if (loopOpPtr.p->optype == ZUPDATE) { + jam(); + /* + This operation which was the next in time have a copy which was the + result of the previous operation which we want to use. Thus use + the copy tuple of this operation. + */ + use_copy = true; + } else { + /* + This operation was an insert that happened after an insert or update. + This is not a possible case. + */ + ndbrequire(false); + return false; + } + } + if (use_copy) { + regOperPtr->realPageId = loopOpPtr.p->realPageIdC; + regOperPtr->fragPageId = loopOpPtr.p->fragPageIdC; + regOperPtr->pageIndex = loopOpPtr.p->pageIndexC; + regOperPtr->pageOffset = loopOpPtr.p->pageOffsetC; + } else { + regOperPtr->realPageId = loopOpPtr.p->realPageId; + regOperPtr->fragPageId = loopOpPtr.p->fragPageId; + regOperPtr->pageIndex = loopOpPtr.p->pageIndex; + regOperPtr->pageOffset = loopOpPtr.p->pageOffset; + } + return true; + } +} + +bool +Dbtup::getPageLastCommitted(Operationrec* const regOperPtr, + Operationrec* const leaderOpPtr) +{ +//---------------------------------------------------------------------- +// Dirty reads wants to read the latest committed tuple. The latest +// tuple value could be not existing or else we have to find the copy +// tuple. Start by finding the end of the list to find the first operation +// on the record in the ongoing transaction. +//---------------------------------------------------------------------- + jam(); + OperationrecPtr loopOpPtr; + loopOpPtr.p = leaderOpPtr; + while (loopOpPtr.p->nextActiveOp != RNIL) { + jam(); + loopOpPtr.i = loopOpPtr.p->nextActiveOp; + ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec); + }//while + if (loopOpPtr.p->optype == ZINSERT) { + jam(); +//---------------------------------------------------------------------- +// With an insert in the start of the list we know that the tuple did not +// exist before this transaction was started. We don't care if the current +// transaction is in the commit phase since the commit is not really +// completed until the operation is gone from TUP. +//---------------------------------------------------------------------- + terrorCode = ZTUPLE_DELETED_ERROR; + return false; + } else { +//---------------------------------------------------------------------- +// A successful update and delete as first in the queue means that a tuple +// exist in the committed world. We need to find it. +//---------------------------------------------------------------------- + if (loopOpPtr.p->optype == ZUPDATE) { + jam(); +//---------------------------------------------------------------------- +// The first operation was a delete we set our tuple reference to the +// copy tuple of this operation. +//---------------------------------------------------------------------- + regOperPtr->realPageId = loopOpPtr.p->realPageIdC; + regOperPtr->fragPageId = loopOpPtr.p->fragPageIdC; + regOperPtr->pageIndex = loopOpPtr.p->pageIndexC; + regOperPtr->pageOffset = loopOpPtr.p->pageOffsetC; + } else if ((loopOpPtr.p->optype == ZDELETE) && + (loopOpPtr.p->prevActiveOp == RNIL)) { + jam(); +//---------------------------------------------------------------------- +// There was only a delete. The original tuple still is ok. +//---------------------------------------------------------------------- + } else { + jam(); +//---------------------------------------------------------------------- +// There was another operation after the delete, this must be an insert +// and we have found our copy tuple there. +//---------------------------------------------------------------------- + loopOpPtr.i = loopOpPtr.p->prevActiveOp; + ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec); + ndbrequire(loopOpPtr.p->optype == ZINSERT); + regOperPtr->realPageId = loopOpPtr.p->realPageIdC; + regOperPtr->fragPageId = loopOpPtr.p->fragPageIdC; + regOperPtr->pageIndex = loopOpPtr.p->pageIndexC; + regOperPtr->pageOffset = loopOpPtr.p->pageOffsetC; + }//if + }//if + return true; +}//Dbtup::getPageLastCommitted() + +void Dbtup::execTUPKEYREQ(Signal* signal) +{ + TupKeyReq * const tupKeyReq = (TupKeyReq *)signal->getDataPtr(); + Uint32 RoperPtr = tupKeyReq->connectPtr; + Uint32 Rtabptr = tupKeyReq->tableRef; + Uint32 RfragId = tupKeyReq->fragId; + Uint32 Rstoredid = tupKeyReq->storedProcedure; + Uint32 Rfragptr = tupKeyReq->fragPtr; + + Uint32 RnoOfOprec = cnoOfOprec; + Uint32 RnoOfTablerec = cnoOfTablerec; + Uint32 RnoOfFragrec = cnoOfFragrec; + + operPtr.i = RoperPtr; + fragptr.i = Rfragptr; + tabptr.i = Rtabptr; + jamEntry(); + + ndbrequire(((RoperPtr < RnoOfOprec) && + (Rtabptr < RnoOfTablerec) && + (Rfragptr < RnoOfFragrec))); + ptrAss(operPtr, operationrec); + Operationrec * const regOperPtr = operPtr.p; + ptrAss(fragptr, fragrecord); + Fragrecord * const regFragPtr = fragptr.p; + ptrAss(tabptr, tablerec); + Tablerec* const regTabPtr = tabptr.p; + + Uint32 TrequestInfo = tupKeyReq->request; + + if (regOperPtr->transstate != IDLE) { + TUPKEY_abort(signal, 39); + return; + }//if +/* ----------------------------------------------------------------- */ +// Operation is ZREAD when we arrive here so no need to worry about the +// abort process. +/* ----------------------------------------------------------------- */ +/* ----------- INITIATE THE OPERATION RECORD -------------- */ +/* ----------------------------------------------------------------- */ + regOperPtr->fragmentPtr = Rfragptr; + regOperPtr->dirtyOp = TrequestInfo & 1; + regOperPtr->opSimple = (TrequestInfo >> 1) & 1; + regOperPtr->interpretedExec = (TrequestInfo >> 10) & 1; + regOperPtr->optype = (TrequestInfo >> 6) & 0xf; + + // Attributes needed by trigger execution + regOperPtr->noFiredTriggers = 0; + regOperPtr->tableRef = Rtabptr; + regOperPtr->tcOperationPtr = tupKeyReq->opRef; + regOperPtr->primaryReplica = tupKeyReq->primaryReplica; + regOperPtr->coordinatorTC = tupKeyReq->coordinatorTC; + regOperPtr->tcOpIndex = tupKeyReq->tcOpIndex; + regOperPtr->savePointId = tupKeyReq->savePointId; + + regOperPtr->fragId = RfragId; + + regOperPtr->fragPageId = tupKeyReq->keyRef1; + regOperPtr->pageIndex = tupKeyReq->keyRef2; + regOperPtr->attrinbufLen = regOperPtr->logSize = tupKeyReq->attrBufLen; + regOperPtr->recBlockref = tupKeyReq->applRef; + +// Schema Version in tupKeyReq->schemaVersion not used in this version + regOperPtr->storedProcedureId = Rstoredid; + regOperPtr->transid1 = tupKeyReq->transId1; + regOperPtr->transid2 = tupKeyReq->transId2; + + regOperPtr->attroutbufLen = 0; +/* ----------------------------------------------------------------------- */ +// INITIALISE TO DEFAULT VALUE +// INIT THE COPY REFERENCE RECORDS TO RNIL TO ENSURE THAT THEIR VALUES +// ARE VALID IF THEY EXISTS +// NO PENDING CHECKPOINT WHEN COPY CREATED (DEFAULT) +// NO TUPLE HAS BEEN ALLOCATED YET +// NO COPY HAS BEEN CREATED YET +/* ----------------------------------------------------------------------- */ + regOperPtr->undoLogged = false; + regOperPtr->realPageId = RNIL; + regOperPtr->realPageIdC = RNIL; + regOperPtr->fragPageIdC = RNIL; + + regOperPtr->pageOffset = ZNIL; + regOperPtr->pageOffsetC = ZNIL; + + regOperPtr->pageIndexC = ZNIL; + + // version not yet known + regOperPtr->tupVersion = ZNIL; + regOperPtr->deleteInsertFlag = 0; + + regOperPtr->tupleState = TUPLE_BLOCKED; + regOperPtr->changeMask.clear(); + + if (Rstoredid != ZNIL) { + ndbrequire(initStoredOperationrec(regOperPtr, Rstoredid) == ZOK); + }//if + copyAttrinfo(signal, regOperPtr, &cinBuffer[0]); + + PagePtr pagePtr; + if (!getPage(pagePtr, regOperPtr, regFragPtr, regTabPtr)) { + tupkeyErrorLab(signal); + return; + }//if + + Uint32 Roptype = regOperPtr->optype; + if (Roptype == ZREAD) { + jam(); + if (handleReadReq(signal, regOperPtr, regTabPtr, pagePtr.p) != -1) { + sendTUPKEYCONF(signal, regOperPtr, 0); +/* ------------------------------------------------------------------------- */ +// Read Operations need not to be taken out of any lists. We also do not +// need to wait for commit since there is no changes to commit. Thus we +// prepare the operation record already now for the next operation. +// Write operations have set the state to STARTED above indicating that +// they are waiting for the Commit or Abort decision. +/* ------------------------------------------------------------------------- */ + regOperPtr->transstate = IDLE; + regOperPtr->currentAttrinbufLen = 0; + }//if + return; + }//if + linkOpIntoFragList(operPtr, regFragPtr); + insertActiveOpList(signal, + operPtr, + pagePtr.p, + regOperPtr->pageOffset); + if (isUndoLoggingBlocked(regFragPtr)) { + TUPKEY_abort(signal, 38); + return; + }//if +/* ---------------------------------------------------------------------- */ +// WE SET THE CURRENT ACTIVE OPERATION IN THE TUPLE TO POINT TO OUR +//OPERATION RECORD. IF SEVERAL OPERATIONS WORK ON THIS TUPLE THEY ARE +// LINKED TO OUR OPERATION RECORD. DIRTY READS CAN ACCESS THE COPY +// TUPLE THROUGH OUR OPERATION RECORD. +/* ---------------------------------------------------------------------- */ + if (Roptype == ZINSERT) { + jam(); + if (handleInsertReq(signal, regOperPtr, + regFragPtr, regTabPtr, pagePtr.p) == -1) { + return; + }//if + if (!regTabPtr->tuxCustomTriggers.isEmpty()) { + jam(); + if (executeTuxInsertTriggers(signal, regOperPtr, regTabPtr) != 0) { + jam(); + tupkeyErrorLab(signal); + return; + } + } + checkImmediateTriggersAfterInsert(signal, + regOperPtr, + regTabPtr); + sendTUPKEYCONF(signal, regOperPtr, regOperPtr->logSize); + return; + }//if + if (regTabPtr->checksumIndicator && + (calculateChecksum(pagePtr.p, + regOperPtr->pageOffset, + regTabPtr->tupheadsize) != 0)) { + jam(); + terrorCode = ZTUPLE_CORRUPTED_ERROR; + tupkeyErrorLab(signal); + return; + }//if + if (Roptype == ZUPDATE) { + jam(); + if (handleUpdateReq(signal, regOperPtr, + regFragPtr, regTabPtr, pagePtr.p) == -1) { + return; + }//if + // If update operation is done on primary, + // check any after op triggers + terrorCode = 0; + if (!regTabPtr->tuxCustomTriggers.isEmpty()) { + jam(); + if (executeTuxUpdateTriggers(signal, regOperPtr, regTabPtr) != 0) { + jam(); + tupkeyErrorLab(signal); + return; + } + } + checkImmediateTriggersAfterUpdate(signal, + regOperPtr, + regTabPtr); + // XXX use terrorCode for now since all methods are void + if (terrorCode != 0) { + tupkeyErrorLab(signal); + return; + } + sendTUPKEYCONF(signal, regOperPtr, regOperPtr->logSize); + return; + } else if (Roptype == ZDELETE) { + jam(); + if (handleDeleteReq(signal, regOperPtr, + regFragPtr, regTabPtr, pagePtr.p) == -1) { + return; + }//if + // If delete operation is done on primary, + // check any after op triggers + if (!regTabPtr->tuxCustomTriggers.isEmpty()) { + jam(); + if (executeTuxDeleteTriggers(signal, regOperPtr, regTabPtr) != 0) { + jam(); + tupkeyErrorLab(signal); + return; + } + } + checkImmediateTriggersAfterDelete(signal, + regOperPtr, + regTabPtr); + sendTUPKEYCONF(signal, regOperPtr, 0); + return; + } else { + ndbrequire(false); + }//if +}//Dbtup::execTUPKEYREQ() + +/* ---------------------------------------------------------------- */ +/* ------------------------ CONFIRM REQUEST ----------------------- */ +/* ---------------------------------------------------------------- */ +void Dbtup::sendTUPKEYCONF(Signal* signal, + Operationrec * const regOperPtr, + Uint32 TlogSize) +{ + TupKeyConf * const tupKeyConf = (TupKeyConf *)signal->getDataPtrSend(); + + Uint32 RuserPointer = regOperPtr->userpointer; + Uint32 RattroutbufLen = regOperPtr->attroutbufLen; + Uint32 RnoFiredTriggers = regOperPtr->noFiredTriggers; + BlockReference Ruserblockref = regOperPtr->userblockref; + Uint32 lastRow = regOperPtr->lastRow; + + regOperPtr->transstate = STARTED; + regOperPtr->tupleState = NO_OTHER_OP; + tupKeyConf->userPtr = RuserPointer; + tupKeyConf->readLength = RattroutbufLen; + tupKeyConf->writeLength = TlogSize; + tupKeyConf->noFiredTriggers = RnoFiredTriggers; + tupKeyConf->lastRow = lastRow; + + EXECUTE_DIRECT(refToBlock(Ruserblockref), GSN_TUPKEYCONF, signal, + TupKeyConf::SignalLength); + return; +}//Dbtup::sendTUPKEYCONF() + +#define MAX_READ (sizeof(signal->theData) > MAX_MESSAGE_SIZE ? MAX_MESSAGE_SIZE : sizeof(signal->theData)) + +/* ---------------------------------------------------------------- */ +/* ----------------------------- READ ---------------------------- */ +/* ---------------------------------------------------------------- */ +int Dbtup::handleReadReq(Signal* signal, + Operationrec* const regOperPtr, + Tablerec* const regTabPtr, + Page* pagePtr) +{ + Uint32 Ttupheadoffset = regOperPtr->pageOffset; + const BlockReference sendBref = regOperPtr->recBlockref; + if (regTabPtr->checksumIndicator && + (calculateChecksum(pagePtr, Ttupheadoffset, + regTabPtr->tupheadsize) != 0)) { + jam(); + terrorCode = ZTUPLE_CORRUPTED_ERROR; + tupkeyErrorLab(signal); + return -1; + }//if + + Uint32 * dst = &signal->theData[25]; + Uint32 dstLen = (MAX_READ / 4) - 25; + const Uint32 node = refToNode(sendBref); + if(node != 0 && node != getOwnNodeId()) { + ; + } else { + jam(); + /** + * execute direct + */ + dst = &signal->theData[3]; + dstLen = (MAX_READ / 4) - 3; + } + + if (regOperPtr->interpretedExec != 1) { + jam(); + int ret = readAttributes(pagePtr, + Ttupheadoffset, + &cinBuffer[0], + regOperPtr->attrinbufLen, + dst, + dstLen, + false); + if (ret != -1) { +/* ------------------------------------------------------------------------- */ +// We have read all data into coutBuffer. Now send it to the API. +/* ------------------------------------------------------------------------- */ + jam(); + Uint32 TnoOfDataRead= (Uint32) ret; + regOperPtr->attroutbufLen = TnoOfDataRead; + sendReadAttrinfo(signal, TnoOfDataRead, regOperPtr); + return 0; + }//if + jam(); + tupkeyErrorLab(signal); + return -1; + } else { + jam(); + regOperPtr->lastRow = 0; + if (interpreterStartLab(signal, pagePtr, Ttupheadoffset) != -1) { + return 0; + }//if + return -1; + }//if +}//Dbtup::handleReadReq() + +/* ---------------------------------------------------------------- */ +/* ---------------------------- UPDATE ---------------------------- */ +/* ---------------------------------------------------------------- */ +int Dbtup::handleUpdateReq(Signal* signal, + Operationrec* const regOperPtr, + Fragrecord* const regFragPtr, + Tablerec* const regTabPtr, + Page* const pagePtr) +{ + PagePtr copyPagePtr; + Uint32 tuple_size = regTabPtr->tupheadsize; + +//--------------------------------------------------- +/* --- MAKE A COPY OF THIS TUPLE ON A COPY PAGE --- */ +//--------------------------------------------------- + Uint32 RpageOffsetC; + if (!allocTh(regFragPtr, + regTabPtr, + COPY_PAGE, + signal, + RpageOffsetC, + copyPagePtr)) { + TUPKEY_abort(signal, 1); + return -1; + }//if + Uint32 RpageIdC = copyPagePtr.i; + Uint32 RfragPageIdC = copyPagePtr.p->pageWord[ZPAGE_FRAG_PAGE_ID_POS]; + Uint32 indexC = ((RpageOffsetC - ZPAGE_HEADER_SIZE) / tuple_size) << 1; + regOperPtr->pageIndexC = indexC; + regOperPtr->fragPageIdC = RfragPageIdC; + regOperPtr->realPageIdC = RpageIdC; + regOperPtr->pageOffsetC = RpageOffsetC; + /* -------------------------------------------------------------- */ + /* IF WE HAVE AN ONGING CHECKPOINT WE HAVE TO LOG THE ALLOCATION */ + /* OF THE TUPLE HEADER TO BE ABLE TO DELETE IT UPON RESTART */ + /* THE ONLY DATA EXCEPT THE TYPE, PAGE, INDEX IS THE SIZE TO FREE */ + /* -------------------------------------------------------------- */ + if (isUndoLoggingActive(regFragPtr)) { + if (isPageUndoLogged(regFragPtr, RfragPageIdC)) { + jam(); + regOperPtr->undoLogged = true; + cprAddUndoLogRecord(signal, + ZLCPR_TYPE_DELETE_TH, + RfragPageIdC, + indexC, + regOperPtr->tableRef, + regOperPtr->fragId, + regFragPtr->checkpointVersion); + }//if + if (isPageUndoLogged(regFragPtr, regOperPtr->fragPageId)) { + jam(); + cprAddUndoLogRecord(signal, + ZLCPR_TYPE_UPDATE_TH, + regOperPtr->fragPageId, + regOperPtr->pageIndex, + regOperPtr->tableRef, + regOperPtr->fragId, + regFragPtr->checkpointVersion); + cprAddData(signal, + regFragPtr, + regOperPtr->realPageId, + tuple_size, + regOperPtr->pageOffset); + }//if + }//if + Uint32 RwordCount = tuple_size - 1; + Uint32 end_dest = RpageOffsetC + tuple_size; + Uint32 offset = regOperPtr->pageOffset; + Uint32 end_source = offset + tuple_size; + ndbrequire(end_dest <= ZWORDS_ON_PAGE && end_source <= ZWORDS_ON_PAGE); + void* Tdestination = (void*)©PagePtr.p->pageWord[RpageOffsetC + 1]; + const void* Tsource = (void*)&pagePtr->pageWord[offset + 1]; + MEMCOPY_NO_WORDS(Tdestination, Tsource, RwordCount); + + Uint32 prev_tup_version; + // nextActiveOp is before this op in event order + if (regOperPtr->nextActiveOp == RNIL) { + jam(); + prev_tup_version = ((const Uint32*)Tsource)[0]; + } else { + OperationrecPtr prevOperPtr; + jam(); + prevOperPtr.i = regOperPtr->nextActiveOp; + ptrCheckGuard(prevOperPtr, cnoOfOprec, operationrec); + prev_tup_version = prevOperPtr.p->tupVersion; + }//if + regOperPtr->tupVersion = (prev_tup_version + 1) & + ((1 << ZTUP_VERSION_BITS) - 1); + // global variable alert + ndbassert(operationrec + operPtr.i == regOperPtr); + copyPagePtr.p->pageWord[RpageOffsetC] = operPtr.i; + + return updateStartLab(signal, regOperPtr, regTabPtr, pagePtr); +}//Dbtup::handleUpdateReq() + +/* ---------------------------------------------------------------- */ +/* ----------------------------- INSERT --------------------------- */ +/* ---------------------------------------------------------------- */ +int Dbtup::handleInsertReq(Signal* signal, + Operationrec* const regOperPtr, + Fragrecord* const regFragPtr, + Tablerec* const regTabPtr, + Page* const pagePtr) +{ + Uint32 ret_value; + + if (regOperPtr->nextActiveOp != RNIL) { + jam(); + OperationrecPtr prevExecOpPtr; + prevExecOpPtr.i = regOperPtr->nextActiveOp; + ptrCheckGuard(prevExecOpPtr, cnoOfOprec, operationrec); + if (prevExecOpPtr.p->optype != ZDELETE) { + terrorCode = ZINSERT_ERROR; + tupkeyErrorLab(signal); + return -1; + }//if + ret_value = handleUpdateReq(signal, regOperPtr, + regFragPtr, regTabPtr, pagePtr); + } else { + jam(); + regOperPtr->tupVersion = 0; + ret_value = updateStartLab(signal, regOperPtr, regTabPtr, pagePtr); + }//if + if (ret_value != (Uint32)-1) { + if (checkNullAttributes(regOperPtr, regTabPtr)) { + jam(); + return 0; + }//if + TUPKEY_abort(signal, 17); + }//if + return -1; +}//Dbtup::handleInsertReq() + +/* ---------------------------------------------------------------- */ +/* ---------------------------- DELETE ---------------------------- */ +/* ---------------------------------------------------------------- */ +int Dbtup::handleDeleteReq(Signal* signal, + Operationrec* const regOperPtr, + Fragrecord* const regFragPtr, + Tablerec* const regTabPtr, + Page* const pagePtr) +{ + // delete must set but not increment tupVersion + if (regOperPtr->nextActiveOp != RNIL) { + OperationrecPtr prevExecOpPtr; + prevExecOpPtr.i = regOperPtr->nextActiveOp; + ptrCheckGuard(prevExecOpPtr, cnoOfOprec, operationrec); + regOperPtr->tupVersion = prevExecOpPtr.p->tupVersion; + } else { + jam(); + regOperPtr->tupVersion = pagePtr->pageWord[regOperPtr->pageOffset + 1]; + } + if (isUndoLoggingNeeded(regFragPtr, regOperPtr->fragPageId)) { + jam(); + cprAddUndoLogRecord(signal, + ZINDICATE_NO_OP_ACTIVE, + regOperPtr->fragPageId, + regOperPtr->pageIndex, + regOperPtr->tableRef, + regOperPtr->fragId, + regFragPtr->checkpointVersion); + }//if + if (regOperPtr->attrinbufLen == 0) { + return 0; + }//if +/* ------------------------------------------------------------------------ */ +/* THE APPLICATION WANTS TO READ THE TUPLE BEFORE IT IS DELETED. */ +/* ------------------------------------------------------------------------ */ + return handleReadReq(signal, regOperPtr, regTabPtr, pagePtr); +}//Dbtup::handleDeleteReq() + +int +Dbtup::updateStartLab(Signal* signal, + Operationrec* const regOperPtr, + Tablerec* const regTabPtr, + Page* const pagePtr) +{ + int retValue; + if (regOperPtr->optype == ZINSERT) { + jam(); + setNullBits(pagePtr, regTabPtr, regOperPtr->pageOffset); + } + if (regOperPtr->interpretedExec != 1) { + jam(); + retValue = updateAttributes(pagePtr, + regOperPtr->pageOffset, + &cinBuffer[0], + regOperPtr->attrinbufLen); + if (retValue == -1) { + tupkeyErrorLab(signal); + return -1; + }//if + } else { + jam(); + retValue = interpreterStartLab(signal, pagePtr, regOperPtr->pageOffset); + }//if + ndbrequire(regOperPtr->tupVersion != ZNIL); + pagePtr->pageWord[regOperPtr->pageOffset + 1] = regOperPtr->tupVersion; + if (regTabPtr->checksumIndicator) { + jam(); + setChecksum(pagePtr, regOperPtr->pageOffset, regTabPtr->tupheadsize); + }//if + return retValue; +}//Dbtup::updateStartLab() + +void +Dbtup::setNullBits(Page* const regPage, Tablerec* const regTabPtr, Uint32 pageOffset) +{ + Uint32 noOfExtraNullWords = regTabPtr->tupNullWords; + Uint32 nullOffsetStart = regTabPtr->tupNullIndex + pageOffset; + ndbrequire((noOfExtraNullWords + nullOffsetStart) < ZWORDS_ON_PAGE); + for (Uint32 i = 0; i < noOfExtraNullWords; i++) { + regPage->pageWord[nullOffsetStart + i] = 0xFFFFFFFF; + }//for +}//Dbtup::setNullBits() + +bool +Dbtup::checkNullAttributes(Operationrec* const regOperPtr, + Tablerec* const regTabPtr) +{ +// Implement checking of updating all not null attributes in an insert here. + Bitmask<MAXNROFATTRIBUTESINWORDS> attributeMask; + /* + * The idea here is maybe that changeMask is not-null attributes + * and must contain notNullAttributeMask. But: + * + * 1. changeMask has all bits set on insert + * 2. not-null is checked in each UpdateFunction + * 3. the code below does not work except trivially due to 1. + * + * XXX remove or fix + */ + attributeMask.clear(); + attributeMask.bitOR(regOperPtr->changeMask); + attributeMask.bitAND(regTabPtr->notNullAttributeMask); + attributeMask.bitXOR(regTabPtr->notNullAttributeMask); + if (!attributeMask.isclear()) { + return false; + }//if + return true; +}//Dbtup::checkNullAttributes() + +/* ---------------------------------------------------------------- */ +/* THIS IS THE START OF THE INTERPRETED EXECUTION OF UPDATES. WE */ +/* START BY LINKING ALL ATTRINFO'S IN A DOUBLY LINKED LIST (THEY ARE*/ +/* ALREADY IN A LINKED LIST). WE ALLOCATE A REGISTER MEMORY (EQUAL */ +/* TO AN ATTRINFO RECORD). THE INTERPRETER GOES THROUGH FOUR PHASES*/ +/* DURING THE FIRST PHASE IT IS ONLY ALLOWED TO READ ATTRIBUTES THAT*/ +/* ARE SENT TO THE CLIENT APPLICATION. DURING THE SECOND PHASE IT IS*/ +/* ALLOWED TO READ FROM ATTRIBUTES INTO REGISTERS, TO UPDATE */ +/* ATTRIBUTES BASED ON EITHER A CONSTANT VALUE OR A REGISTER VALUE, */ +/* A DIVERSE SET OF OPERATIONS ON REGISTERS ARE AVAILABLE AS WELL. */ +/* IT IS ALSO POSSIBLE TO PERFORM JUMPS WITHIN THE INSTRUCTIONS THAT*/ +/* BELONGS TO THE SECOND PHASE. ALSO SUBROUTINES CAN BE CALLED IN */ +/* THIS PHASE. THE THIRD PHASE IS TO AGAIN READ ATTRIBUTES AND */ +/* FINALLY THE FOURTH PHASE READS SELECTED REGISTERS AND SEND THEM */ +/* TO THE CLIENT APPLICATION. */ +/* THERE IS A FIFTH REGION WHICH CONTAINS SUBROUTINES CALLABLE FROM */ +/* THE INTERPRETER EXECUTION REGION. */ +/* THE FIRST FIVE WORDS WILL GIVE THE LENGTH OF THE FIVEE REGIONS */ +/* */ +/* THIS MEANS THAT FROM THE APPLICATIONS POINT OF VIEW THE DATABASE */ +/* CAN HANDLE SUBROUTINE CALLS WHERE THE CODE IS SENT IN THE REQUEST*/ +/* THE RETURN PARAMETERS ARE FIXED AND CAN EITHER BE GENERATED */ +/* BEFORE THE EXECUTION OF THE ROUTINE OR AFTER. */ +/* */ +/* IN LATER VERSIONS WE WILL ADD MORE THINGS LIKE THE POSSIBILITY */ +/* TO ALLOCATE MEMORY AND USE THIS AS LOCAL STORAGE. IT IS ALSO */ +/* IMAGINABLE TO HAVE SPECIAL ROUTINES THAT CAN PERFORM CERTAIN */ +/* OPERATIONS ON BLOB'S DEPENDENT ON WHAT THE BLOB REPRESENTS. */ +/* */ +/* */ +/* ----------------------------------------- */ +/* + INITIAL READ REGION + */ +/* ----------------------------------------- */ +/* + INTERPRETED EXECUTE REGION + */ +/* ----------------------------------------- */ +/* + FINAL UPDATE REGION + */ +/* ----------------------------------------- */ +/* + FINAL READ REGION + */ +/* ----------------------------------------- */ +/* + SUBROUTINE REGION + */ +/* ----------------------------------------- */ +/* ---------------------------------------------------------------- */ +/* ---------------------------------------------------------------- */ +/* ----------------- INTERPRETED EXECUTION ----------------------- */ +/* ---------------------------------------------------------------- */ +int Dbtup::interpreterStartLab(Signal* signal, + Page* const pagePtr, + Uint32 TupHeadOffset) +{ + Operationrec * const regOperPtr = operPtr.p; + Uint32 RtotalLen; + int TnoDataRW; + + Uint32 RinitReadLen = cinBuffer[0]; + Uint32 RexecRegionLen = cinBuffer[1]; + Uint32 RfinalUpdateLen = cinBuffer[2]; + Uint32 RfinalRLen = cinBuffer[3]; + Uint32 RsubLen = cinBuffer[4]; + + Uint32 RattrinbufLen = regOperPtr->attrinbufLen; + const BlockReference sendBref = regOperPtr->recBlockref; + + Uint32 * dst = &signal->theData[25]; + Uint32 dstLen = (MAX_READ / 4) - 25; + const Uint32 node = refToNode(sendBref); + if(node != 0 && node != getOwnNodeId()) { + ; + } else { + jam(); + /** + * execute direct + */ + dst = &signal->theData[3]; + dstLen = (MAX_READ / 4) - 3; + } + + RtotalLen = RinitReadLen; + RtotalLen += RexecRegionLen; + RtotalLen += RfinalUpdateLen; + RtotalLen += RfinalRLen; + RtotalLen += RsubLen; + + Uint32 RattroutCounter = 0; + Uint32 RinstructionCounter = 5; + Uint32 RlogSize = 0; + + if (((RtotalLen + 5) == RattrinbufLen) && + (RattrinbufLen >= 5) && + (RattrinbufLen < ZATTR_BUFFER_SIZE)) { + /* ---------------------------------------------------------------- */ + // We start by checking consistency. We must have the first five + // words of the ATTRINFO to give us the length of the regions. The + // size of these regions must be the same as the total ATTRINFO + // length and finally the total length must be within the limits. + /* ---------------------------------------------------------------- */ + + if (RinitReadLen > 0) { + jam(); + /* ---------------------------------------------------------------- */ + // The first step that can be taken in the interpreter is to read + // data of the tuple before any updates have been applied. + /* ---------------------------------------------------------------- */ + TnoDataRW = readAttributes(pagePtr, + TupHeadOffset, + &cinBuffer[5], + RinitReadLen, + &dst[0], + dstLen, + false); + if (TnoDataRW != -1) { + RattroutCounter = TnoDataRW; + RinstructionCounter += RinitReadLen; + } else { + jam(); + tupkeyErrorLab(signal); + return -1; + }//if + }//if + if (RexecRegionLen > 0) { + jam(); + /* ---------------------------------------------------------------- */ + // The next step is the actual interpreted execution. This executes + // a register-based virtual machine which can read and write attributes + // to and from registers. + /* ---------------------------------------------------------------- */ + Uint32 RsubPC = RinstructionCounter + RfinalUpdateLen + RfinalRLen; + TnoDataRW = interpreterNextLab(signal, + pagePtr, + TupHeadOffset, + &clogMemBuffer[0], + &cinBuffer[RinstructionCounter], + RexecRegionLen, + &cinBuffer[RsubPC], + RsubLen, + &coutBuffer[0], + sizeof(coutBuffer) / 4); + if (TnoDataRW != -1) { + RinstructionCounter += RexecRegionLen; + RlogSize = TnoDataRW; + } else { + jam(); + return -1; + }//if + }//if + if (RfinalUpdateLen > 0) { + jam(); + /* ---------------------------------------------------------------- */ + // We can also apply a set of updates without any conditions as part + // of the interpreted execution. + /* ---------------------------------------------------------------- */ + if (regOperPtr->optype == ZUPDATE) { + TnoDataRW = updateAttributes(pagePtr, + TupHeadOffset, + &cinBuffer[RinstructionCounter], + RfinalUpdateLen); + if (TnoDataRW != -1) { + MEMCOPY_NO_WORDS(&clogMemBuffer[RlogSize], + &cinBuffer[RinstructionCounter], + RfinalUpdateLen); + RinstructionCounter += RfinalUpdateLen; + RlogSize += RfinalUpdateLen; + } else { + jam(); + tupkeyErrorLab(signal); + return -1; + }//if + } else { + return TUPKEY_abort(signal, 19); + }//if + }//if + if (RfinalRLen > 0) { + jam(); + /* ---------------------------------------------------------------- */ + // The final action is that we can also read the tuple after it has + // been updated. + /* ---------------------------------------------------------------- */ + TnoDataRW = readAttributes(pagePtr, + TupHeadOffset, + &cinBuffer[RinstructionCounter], + RfinalRLen, + &dst[RattroutCounter], + (dstLen - RattroutCounter), + false); + if (TnoDataRW != -1) { + RattroutCounter += TnoDataRW; + } else { + jam(); + tupkeyErrorLab(signal); + return -1; + }//if + }//if + regOperPtr->logSize = RlogSize; + regOperPtr->attroutbufLen = RattroutCounter; + sendReadAttrinfo(signal, RattroutCounter, regOperPtr); + if (RlogSize > 0) { + sendLogAttrinfo(signal, RlogSize, regOperPtr); + }//if + return 0; + } else { + return TUPKEY_abort(signal, 22); + }//if +}//Dbtup::interpreterStartLab() + +/* ---------------------------------------------------------------- */ +/* WHEN EXECUTION IS INTERPRETED WE NEED TO SEND SOME ATTRINFO*/ +/* BACK TO LQH FOR LOGGING AND SENDING TO BACKUP AND STANDBY */ +/* NODES. */ +/* INPUT: LOG_ATTRINFOPTR WHERE TO FETCH DATA FROM */ +/* TLOG_START FIRST INDEX TO LOG */ +/* TLOG_END LAST INDEX + 1 TO LOG */ +/* ---------------------------------------------------------------- */ +void Dbtup::sendLogAttrinfo(Signal* signal, + Uint32 TlogSize, + Operationrec * const regOperPtr) + +{ + Uint32 TbufferIndex = 0; + signal->theData[0] = regOperPtr->userpointer; + while (TlogSize > 22) { + MEMCOPY_NO_WORDS(&signal->theData[3], + &clogMemBuffer[TbufferIndex], + 22); + EXECUTE_DIRECT(refToBlock(regOperPtr->userblockref), + GSN_TUP_ATTRINFO, signal, 25); + TbufferIndex += 22; + TlogSize -= 22; + }//while + MEMCOPY_NO_WORDS(&signal->theData[3], + &clogMemBuffer[TbufferIndex], + TlogSize); + EXECUTE_DIRECT(refToBlock(regOperPtr->userblockref), + GSN_TUP_ATTRINFO, signal, 3 + TlogSize); +}//Dbtup::sendLogAttrinfo() + +inline +Uint32 +brancher(Uint32 TheInstruction, Uint32 TprogramCounter) +{ + Uint32 TbranchDirection = TheInstruction >> 31; + Uint32 TbranchLength = (TheInstruction >> 16) & 0x7fff; + TprogramCounter--; + if (TbranchDirection == 1) { + jam(); + /* ---------------------------------------------------------------- */ + /* WE JUMP BACKWARDS. */ + /* ---------------------------------------------------------------- */ + return (TprogramCounter - TbranchLength); + } else { + jam(); + /* ---------------------------------------------------------------- */ + /* WE JUMP FORWARD. */ + /* ---------------------------------------------------------------- */ + return (TprogramCounter + TbranchLength); + }//if +}//brancher() + +int Dbtup::interpreterNextLab(Signal* signal, + Page* const pagePtr, + Uint32 TupHeadOffset, + Uint32* logMemory, + Uint32* mainProgram, + Uint32 TmainProgLen, + Uint32* subroutineProg, + Uint32 TsubroutineLen, + Uint32 * tmpArea, + Uint32 tmpAreaSz) +{ + register Uint32* TcurrentProgram = mainProgram; + register Uint32 TcurrentSize = TmainProgLen; + register Uint32 RnoOfInstructions = 0; + register Uint32 TprogramCounter = 0; + register Uint32 theInstruction; + register Uint32 theRegister; + Uint32 TdataWritten = 0; + Uint32 RstackPtr = 0; + union { + Uint32 TregMemBuffer[32]; + Uint64 Tdummy[16]; + }; + Uint32 TstackMemBuffer[32]; + + /* ---------------------------------------------------------------- */ + // Initialise all 8 registers to contain the NULL value. + // In this version we can handle 32 and 64 bit unsigned integers. + // They are handled as 64 bit values. Thus the 32 most significant + // bits are zeroed for 32 bit values. + /* ---------------------------------------------------------------- */ + TregMemBuffer[0] = 0; + TregMemBuffer[4] = 0; + TregMemBuffer[8] = 0; + TregMemBuffer[12] = 0; + TregMemBuffer[16] = 0; + TregMemBuffer[20] = 0; + TregMemBuffer[24] = 0; + TregMemBuffer[28] = 0; + Uint32 tmpHabitant = ~0; + + while (RnoOfInstructions < 8000) { + /* ---------------------------------------------------------------- */ + /* EXECUTE THE NEXT INTERPRETER INSTRUCTION. */ + /* ---------------------------------------------------------------- */ + RnoOfInstructions++; + theInstruction = TcurrentProgram[TprogramCounter]; + theRegister = Interpreter::getReg1(theInstruction) << 2; + if (TprogramCounter < TcurrentSize) { + TprogramCounter++; + switch (Interpreter::getOpCode(theInstruction)) { + case Interpreter::READ_ATTR_INTO_REG: + jam(); + /* ---------------------------------------------------------------- */ + // Read an attribute from the tuple into a register. + // While reading an attribute we allow the attribute to be an array + // as long as it fits in the 64 bits of the register. + /* ---------------------------------------------------------------- */ + { + Uint32 theAttrinfo = theInstruction; + int TnoDataRW= readAttributes(pagePtr, + TupHeadOffset, + &theAttrinfo, + (Uint32)1, + &TregMemBuffer[theRegister], + (Uint32)3, + false); + if (TnoDataRW == 2) { + /* ------------------------------------------------------------- */ + // Two words read means that we get the instruction plus one 32 + // word read. Thus we set the register to be a 32 bit register. + /* ------------------------------------------------------------- */ + TregMemBuffer[theRegister] = 0x50; + * (Int64*)(TregMemBuffer+theRegister+2) = TregMemBuffer[theRegister+1]; + } else if (TnoDataRW == 3) { + /* ------------------------------------------------------------- */ + // Three words read means that we get the instruction plus two + // 32 words read. Thus we set the register to be a 64 bit register. + /* ------------------------------------------------------------- */ + TregMemBuffer[theRegister] = 0x60; + TregMemBuffer[theRegister+3] = TregMemBuffer[theRegister+2]; + TregMemBuffer[theRegister+2] = TregMemBuffer[theRegister+1]; + } else if (TnoDataRW == 1) { + /* ------------------------------------------------------------- */ + // One word read means that we must have read a NULL value. We set + // the register to indicate a NULL value. + /* ------------------------------------------------------------- */ + TregMemBuffer[theRegister] = 0; + TregMemBuffer[theRegister + 2] = 0; + TregMemBuffer[theRegister + 3] = 0; + } else if (TnoDataRW == -1) { + jam(); + tupkeyErrorLab(signal); + return -1; + } else { + /* ------------------------------------------------------------- */ + // Any other return value from the read attribute here is not + // allowed and will lead to a system crash. + /* ------------------------------------------------------------- */ + ndbrequire(false); + }//if + break; + } + + case Interpreter::WRITE_ATTR_FROM_REG: + jam(); + { + Uint32 TattrId = theInstruction >> 16; + Uint32 TattrDescrIndex = tabptr.p->tabDescriptor + + (TattrId << ZAD_LOG_SIZE); + Uint32 TattrDesc1 = tableDescriptor[TattrDescrIndex].tabDescr; + Uint32 TregType = TregMemBuffer[theRegister]; + + /* --------------------------------------------------------------- */ + // Calculate the number of words of this attribute. + // We allow writes into arrays as long as they fit into the 64 bit + // register size. + /* --------------------------------------------------------------- */ + Uint32 TattrNoOfWords = AttributeDescriptor::getSizeInWords(TattrDesc1); + Uint32 Toptype = operPtr.p->optype; + + Uint32 TdataForUpdate[3]; + Uint32 Tlen; + + AttributeHeader& ah = AttributeHeader::init(&TdataForUpdate[0], + TattrId, TattrNoOfWords); + TdataForUpdate[1] = TregMemBuffer[theRegister + 2]; + TdataForUpdate[2] = TregMemBuffer[theRegister + 3]; + Tlen = TattrNoOfWords + 1; + if (Toptype == ZUPDATE) { + if (TattrNoOfWords <= 2) { + if (TregType == 0) { + /* --------------------------------------------------------- */ + // Write a NULL value into the attribute + /* --------------------------------------------------------- */ + ah.setNULL(); + Tlen = 1; + }//if + int TnoDataRW= updateAttributes(pagePtr, + TupHeadOffset, + &TdataForUpdate[0], + Tlen); + if (TnoDataRW != -1) { + /* --------------------------------------------------------- */ + // Write the written data also into the log buffer so that it + // will be logged. + /* --------------------------------------------------------- */ + logMemory[TdataWritten + 0] = TdataForUpdate[0]; + logMemory[TdataWritten + 1] = TdataForUpdate[1]; + logMemory[TdataWritten + 2] = TdataForUpdate[2]; + TdataWritten += Tlen; + } else { + tupkeyErrorLab(signal); + return -1; + }//if + } else { + return TUPKEY_abort(signal, 15); + }//if + } else { + return TUPKEY_abort(signal, 16); + }//if + break; + } + + case Interpreter::LOAD_CONST_NULL: + jam(); + TregMemBuffer[theRegister] = 0; /* NULL INDICATOR */ + break; + + case Interpreter::LOAD_CONST16: + jam(); + TregMemBuffer[theRegister] = 0x50; /* 32 BIT UNSIGNED CONSTANT */ + * (Int64*)(TregMemBuffer+theRegister+2) = theInstruction >> 16; + break; + + case Interpreter::LOAD_CONST32: + jam(); + TregMemBuffer[theRegister] = 0x50; /* 32 BIT UNSIGNED CONSTANT */ + * (Int64*)(TregMemBuffer+theRegister+2) = * + (TcurrentProgram+TprogramCounter); + TprogramCounter++; + break; + + case Interpreter::LOAD_CONST64: + jam(); + TregMemBuffer[theRegister] = 0x60; /* 64 BIT UNSIGNED CONSTANT */ + TregMemBuffer[theRegister + 2 ] = * (TcurrentProgram + TprogramCounter++); + TregMemBuffer[theRegister + 3 ] = * (TcurrentProgram + TprogramCounter++); + break; + + case Interpreter::ADD_REG_REG: + jam(); + { + Uint32 TrightRegister = Interpreter::getReg2(theInstruction) << 2; + Uint32 TdestRegister = Interpreter::getReg3(theInstruction) << 2; + + Uint32 TrightType = TregMemBuffer[TrightRegister]; + Int64 Tright0 = * (Int64*)(TregMemBuffer + TrightRegister + 2); + + + Uint32 TleftType = TregMemBuffer[theRegister]; + Int64 Tleft0 = * (Int64*)(TregMemBuffer + theRegister + 2); + + if ((TleftType | TrightType) != 0) { + Uint64 Tdest0 = Tleft0 + Tright0; + * (Int64*)(TregMemBuffer+TdestRegister+2) = Tdest0; + TregMemBuffer[TdestRegister] = 0x60; + } else { + return TUPKEY_abort(signal, 20); + } + break; + } + + case Interpreter::SUB_REG_REG: + jam(); + { + Uint32 TrightRegister = Interpreter::getReg2(theInstruction) << 2; + Uint32 TdestRegister = Interpreter::getReg3(theInstruction) << 2; + + Uint32 TrightType = TregMemBuffer[TrightRegister]; + Int64 Tright0 = * (Int64*)(TregMemBuffer + TrightRegister + 2); + + Uint32 TleftType = TregMemBuffer[theRegister]; + Int64 Tleft0 = * (Int64*)(TregMemBuffer + theRegister + 2); + + if ((TleftType | TrightType) != 0) { + Int64 Tdest0 = Tleft0 - Tright0; + * (Int64*)(TregMemBuffer+TdestRegister+2) = Tdest0; + TregMemBuffer[TdestRegister] = 0x60; + } else { + return TUPKEY_abort(signal, 20); + } + break; + } + + case Interpreter::BRANCH: + TprogramCounter = brancher(theInstruction, TprogramCounter); + break; + + case Interpreter::BRANCH_REG_EQ_NULL: + if (TregMemBuffer[theRegister] != 0) { + jam(); + continue; + } else { + jam(); + TprogramCounter = brancher(theInstruction, TprogramCounter); + }//if + break; + + case Interpreter::BRANCH_REG_NE_NULL: + if (TregMemBuffer[theRegister] == 0) { + jam(); + continue; + } else { + jam(); + TprogramCounter = brancher(theInstruction, TprogramCounter); + }//if + break; + + + case Interpreter::BRANCH_EQ_REG_REG: + { + Uint32 TrightRegister = Interpreter::getReg2(theInstruction) << 2; + + Uint32 TleftType = TregMemBuffer[theRegister]; + Uint32 Tleft0 = TregMemBuffer[theRegister + 2]; + Uint32 Tleft1 = TregMemBuffer[theRegister + 3]; + + Uint32 TrightType = TregMemBuffer[TrightRegister]; + Uint32 Tright0 = TregMemBuffer[TrightRegister + 2]; + Uint32 Tright1 = TregMemBuffer[TrightRegister + 3]; + if ((TrightType | TleftType) != 0) { + jam(); + if ((Tleft0 == Tright0) && (Tleft1 == Tright1)) { + TprogramCounter = brancher(theInstruction, TprogramCounter); + }//if + } else { + return TUPKEY_abort(signal, 23); + }//if + break; + } + + case Interpreter::BRANCH_NE_REG_REG: + { + Uint32 TrightRegister = Interpreter::getReg2(theInstruction) << 2; + + Uint32 TleftType = TregMemBuffer[theRegister]; + Uint32 Tleft0 = TregMemBuffer[theRegister + 2]; + Uint32 Tleft1 = TregMemBuffer[theRegister + 3]; + + Uint32 TrightType = TregMemBuffer[TrightRegister]; + Uint32 Tright0 = TregMemBuffer[TrightRegister + 2]; + Uint32 Tright1 = TregMemBuffer[TrightRegister + 3]; + if ((TrightType | TleftType) != 0) { + jam(); + if ((Tleft0 != Tright0) || (Tleft1 != Tright1)) { + TprogramCounter = brancher(theInstruction, TprogramCounter); + }//if + } else { + return TUPKEY_abort(signal, 24); + }//if + break; + } + + case Interpreter::BRANCH_LT_REG_REG: + { + Uint32 TrightRegister = Interpreter::getReg2(theInstruction) << 2; + + Uint32 TrightType = TregMemBuffer[TrightRegister]; + Int64 Tright0 = * (Int64*)(TregMemBuffer + TrightRegister + 2); + + Uint32 TleftType = TregMemBuffer[theRegister]; + Int64 Tleft0 = * (Int64*)(TregMemBuffer + theRegister + 2); + + + if ((TrightType | TleftType) != 0) { + jam(); + if (Tleft0 < Tright0) { + TprogramCounter = brancher(theInstruction, TprogramCounter); + }//if + } else { + return TUPKEY_abort(signal, 24); + }//if + break; + } + + case Interpreter::BRANCH_LE_REG_REG: + { + Uint32 TrightRegister = Interpreter::getReg2(theInstruction) << 2; + + Uint32 TrightType = TregMemBuffer[TrightRegister]; + Int64 Tright0 = * (Int64*)(TregMemBuffer + TrightRegister + 2); + + Uint32 TleftType = TregMemBuffer[theRegister]; + Int64 Tleft0 = * (Int64*)(TregMemBuffer + theRegister + 2); + + + if ((TrightType | TleftType) != 0) { + jam(); + if (Tleft0 <= Tright0) { + TprogramCounter = brancher(theInstruction, TprogramCounter); + }//if + } else { + return TUPKEY_abort(signal, 26); + }//if + break; + } + + case Interpreter::BRANCH_GT_REG_REG: + { + Uint32 TrightRegister = Interpreter::getReg2(theInstruction) << 2; + + Uint32 TrightType = TregMemBuffer[TrightRegister]; + Int64 Tright0 = * (Int64*)(TregMemBuffer + TrightRegister + 2); + + Uint32 TleftType = TregMemBuffer[theRegister]; + Int64 Tleft0 = * (Int64*)(TregMemBuffer + theRegister + 2); + + + if ((TrightType | TleftType) != 0) { + jam(); + if (Tleft0 > Tright0){ + TprogramCounter = brancher(theInstruction, TprogramCounter); + }//if + } else { + return TUPKEY_abort(signal, 27); + }//if + break; + } + + case Interpreter::BRANCH_GE_REG_REG: + { + Uint32 TrightRegister = Interpreter::getReg2(theInstruction) << 2; + + Uint32 TrightType = TregMemBuffer[TrightRegister]; + Int64 Tright0 = * (Int64*)(TregMemBuffer + TrightRegister + 2); + + Uint32 TleftType = TregMemBuffer[theRegister]; + Int64 Tleft0 = * (Int64*)(TregMemBuffer + theRegister + 2); + + + if ((TrightType | TleftType) != 0) { + jam(); + if (Tleft0 >= Tright0){ + TprogramCounter = brancher(theInstruction, TprogramCounter); + }//if + } else { + return TUPKEY_abort(signal, 28); + }//if + break; + } + + case Interpreter::BRANCH_ATTR_OP_ARG:{ + jam(); + Uint32 cond = Interpreter::getBinaryCondition(theInstruction); + Uint32 ins2 = TcurrentProgram[TprogramCounter]; + Uint32 attrId = Interpreter::getBranchCol_AttrId(ins2) << 16; + Uint32 argLen = Interpreter::getBranchCol_Len(ins2); + + if(tmpHabitant != attrId){ + Int32 TnoDataR = readAttributes(pagePtr, + TupHeadOffset, + &attrId, 1, + tmpArea, tmpAreaSz, + false); + + if (TnoDataR == -1) { + jam(); + tupkeyErrorLab(signal); + return -1; + } + tmpHabitant = attrId; + } + + // get type + attrId >>= 16; + Uint32 TattrDescrIndex = tabptr.p->tabDescriptor + + (attrId << ZAD_LOG_SIZE); + Uint32 TattrDesc1 = tableDescriptor[TattrDescrIndex].tabDescr; + Uint32 TattrDesc2 = tableDescriptor[TattrDescrIndex+1].tabDescr; + Uint32 typeId = AttributeDescriptor::getType(TattrDesc1); + void * cs = 0; + if(AttributeOffset::getCharsetFlag(TattrDesc2)) + { + Uint32 pos = AttributeOffset::getCharsetPos(TattrDesc2); + cs = tabptr.p->charsetArray[pos]; + } + const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(typeId); + + // get data + AttributeHeader ah(tmpArea[0]); + const char* s1 = (char*)&tmpArea[1]; + const char* s2 = (char*)&TcurrentProgram[TprogramCounter+1]; + // fixed length in 5.0 + Uint32 attrLen = AttributeDescriptor::getSizeInBytes(TattrDesc1); + + bool r1_null = ah.isNULL(); + bool r2_null = argLen == 0; + int res1; + if (cond != Interpreter::LIKE && + cond != Interpreter::NOT_LIKE) { + if (r1_null || r2_null) { + // NULL==NULL and NULL<not-NULL + res1 = r1_null && r2_null ? 0 : r1_null ? -1 : 1; + } else { + res1 = (*sqlType.m_cmp)(cs, s1, attrLen, s2, argLen, true); + } + } else { + if (r1_null || r2_null) { + // NULL like NULL is true (has no practical use) + res1 = r1_null && r2_null ? 0 : -1; + } else { + res1 = (*sqlType.m_like)(cs, s1, attrLen, s2, argLen); + } + } + + int res = 0; + switch ((Interpreter::BinaryCondition)cond) { + case Interpreter::EQ: + res = (res1 == 0); + break; + case Interpreter::NE: + res = (res1 != 0); + break; + // note the condition is backwards + case Interpreter::LT: + res = (res1 > 0); + break; + case Interpreter::LE: + res = (res1 >= 0); + break; + case Interpreter::GT: + res = (res1 < 0); + break; + case Interpreter::GE: + res = (res1 <= 0); + break; + case Interpreter::LIKE: + res = (res1 == 0); + break; + case Interpreter::NOT_LIKE: + res = (res1 == 1); + break; + // XXX handle invalid value + } +#ifdef TRACE_INTERPRETER + ndbout_c("cond=%u attr(%d)='%.*s'(%d) str='%.*s'(%d) res1=%d res=%d", + cond, attrId >> 16, + attrLen, s1, attrLen, argLen, s2, argLen, res1, res); +#endif + if (res) + TprogramCounter = brancher(theInstruction, TprogramCounter); + else + { + Uint32 tmp = ((argLen + 3) >> 2) + 1; + TprogramCounter += tmp; + } + break; + } + + case Interpreter::BRANCH_ATTR_EQ_NULL:{ + jam(); + Uint32 ins2 = TcurrentProgram[TprogramCounter]; + Uint32 attrId = Interpreter::getBranchCol_AttrId(ins2) << 16; + + if(tmpHabitant != attrId){ + Int32 TnoDataR = readAttributes(pagePtr, + TupHeadOffset, + &attrId, 1, + tmpArea, tmpAreaSz, + false); + + if (TnoDataR == -1) { + jam(); + tupkeyErrorLab(signal); + return -1; + } + tmpHabitant = attrId; + } + + AttributeHeader ah(tmpArea[0]); + if(ah.isNULL()){ + TprogramCounter = brancher(theInstruction, TprogramCounter); + } else { + TprogramCounter ++; + } + break; + } + + case Interpreter::BRANCH_ATTR_NE_NULL:{ + jam(); + Uint32 ins2 = TcurrentProgram[TprogramCounter]; + Uint32 attrId = Interpreter::getBranchCol_AttrId(ins2) << 16; + + if(tmpHabitant != attrId){ + Int32 TnoDataR = readAttributes(pagePtr, + TupHeadOffset, + &attrId, 1, + tmpArea, tmpAreaSz, + false); + + if (TnoDataR == -1) { + jam(); + tupkeyErrorLab(signal); + return -1; + } + tmpHabitant = attrId; + } + + AttributeHeader ah(tmpArea[0]); + if(ah.isNULL()){ + TprogramCounter ++; + } else { + TprogramCounter = brancher(theInstruction, TprogramCounter); + } + break; + } + + case Interpreter::EXIT_OK: + jam(); +#ifdef TRACE_INTERPRETER + ndbout_c(" - exit_ok"); +#endif + return TdataWritten; + + case Interpreter::EXIT_OK_LAST: + jam(); +#ifdef TRACE_INTERPRETER + ndbout_c(" - exit_ok_last"); +#endif + operPtr.p->lastRow = 1; + return TdataWritten; + + case Interpreter::EXIT_REFUSE: + jam(); +#ifdef TRACE_INTERPRETER + ndbout_c(" - exit_nok"); +#endif + terrorCode = theInstruction >> 16; + return TUPKEY_abort(signal, 29); + + case Interpreter::CALL: + jam(); + RstackPtr++; + if (RstackPtr < 32) { + TstackMemBuffer[RstackPtr] = TprogramCounter + 1; + TprogramCounter = theInstruction >> 16; + if (TprogramCounter < TsubroutineLen) { + TcurrentProgram = subroutineProg; + TcurrentSize = TsubroutineLen; + } else { + return TUPKEY_abort(signal, 30); + }//if + } else { + return TUPKEY_abort(signal, 31); + }//if + break; + + case Interpreter::RETURN: + jam(); + if (RstackPtr > 0) { + TprogramCounter = TstackMemBuffer[RstackPtr]; + RstackPtr--; + if (RstackPtr == 0) { + jam(); + /* ------------------------------------------------------------- */ + // We are back to the main program. + /* ------------------------------------------------------------- */ + TcurrentProgram = mainProgram; + TcurrentSize = TmainProgLen; + }//if + } else { + return TUPKEY_abort(signal, 32); + }//if + break; + + default: + return TUPKEY_abort(signal, 33); + }//switch + } else { + return TUPKEY_abort(signal, 34); + }//if + }//while + return TUPKEY_abort(signal, 35); +}//Dbtup::interpreterNextLab() + + |