diff options
Diffstat (limited to 'storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp')
-rw-r--r-- | storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp | 3184 |
1 files changed, 2054 insertions, 1130 deletions
diff --git a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp index 2d6f579302c..dc8b123f10f 100644 --- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp +++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp @@ -41,6 +41,19 @@ #define DEBUG(x) #endif +#ifdef ACC_SAFE_QUEUE +#define vlqrequire(x) do { if (unlikely(!(x))) {\ + dump_lock_queue(loPtr); \ + ndbrequire(false); } } while(0) +#else +#define vlqrequire(x) ndbrequire(x) +#endif + + +// primary key is stored in TUP +#include "../dbtup/Dbtup.hpp" +#include "../dblqh/Dblqh.hpp" + // Signal entries and statement blocks /* --------------------------------------------------------------------------------- */ @@ -208,8 +221,8 @@ void Dbacc::execSTTOR(Signal* signal) switch (tstartphase) { case 1: jam(); - c_tup = (Dbtup*)globalData.getBlock(DBTUP); - ndbrequire(c_tup != 0); + ndbrequire((c_tup = (Dbtup*)globalData.getBlock(DBTUP)) != 0); + ndbrequire((c_lqh = (Dblqh*)globalData.getBlock(DBLQH)) != 0); break; } tuserblockref = signal->theData[3]; @@ -435,9 +448,7 @@ void Dbacc::initialiseOperationRec(Signal* signal) for (operationRecPtr.i = 0; operationRecPtr.i < coprecsize; operationRecPtr.i++) { refresh_watch_dog(); ptrAss(operationRecPtr, operationrec); - operationRecPtr.p->transactionstate = IDLE; - operationRecPtr.p->operation = ZUNDEFINED_OP; - operationRecPtr.p->opState = FREE_OP; + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; operationRecPtr.p->nextOp = operationRecPtr.i + 1; }//for operationRecPtr.i = coprecsize - 1; @@ -899,8 +910,7 @@ void Dbacc::execACCSEIZEREQ(Signal* signal) ptrGuard(operationRecPtr); operationRecPtr.p->userptr = tuserptr; operationRecPtr.p->userblockref = tuserblockref; - operationRecPtr.p->operation = ZUNDEFINED_OP; - operationRecPtr.p->transactionstate = IDLE; + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; /* ******************************< */ /* ACCSEIZECONF */ /* ******************************< */ @@ -954,51 +964,32 @@ void Dbacc::initOpRec(Signal* signal) operationRecPtr.p->xfrmtupkeylen = signal->theData[4]; operationRecPtr.p->transId1 = signal->theData[5]; operationRecPtr.p->transId2 = signal->theData[6]; - operationRecPtr.p->transactionstate = ACTIVE; - operationRecPtr.p->commitDeleteCheckFlag = ZFALSE; - operationRecPtr.p->operation = Treqinfo & 0x7; - /* --------------------------------------------------------------------------------- */ - // opSimple is not used in this version. Is needed for deadlock handling later on. - /* --------------------------------------------------------------------------------- */ - // operationRecPtr.p->opSimple = (Treqinfo >> 3) & 0x1; - - operationRecPtr.p->lockMode = (Treqinfo >> 4) & 0x3; Uint32 readFlag = (((Treqinfo >> 4) & 0x3) == 0); // Only 1 if Read Uint32 dirtyFlag = (((Treqinfo >> 6) & 0x1) == 1); // Only 1 if Dirty Uint32 dirtyReadFlag = readFlag & dirtyFlag; - operationRecPtr.p->dirtyRead = dirtyReadFlag; - operationRecPtr.p->nodeType = (Treqinfo >> 7) & 0x3; + Uint32 opbits = 0; + opbits |= Treqinfo & 0x7; + opbits |= ((Treqinfo >> 4) & 0x3) ? Operationrec::OP_LOCK_MODE : 0; + opbits |= ((Treqinfo >> 4) & 0x3) ? Operationrec::OP_ACC_LOCK_MODE : 0; + opbits |= (dirtyReadFlag) ? Operationrec::OP_DIRTY_READ : 0; + opbits |= ((Treqinfo >> 31) & 0x1) ? Operationrec::OP_LOCK_REQ : 0; + + //operationRecPtr.p->nodeType = (Treqinfo >> 7) & 0x3; operationRecPtr.p->fid = fragrecptr.p->myfid; operationRecPtr.p->fragptr = fragrecptr.i; operationRecPtr.p->nextParallelQue = RNIL; operationRecPtr.p->prevParallelQue = RNIL; - operationRecPtr.p->prevQueOp = RNIL; - operationRecPtr.p->nextQueOp = RNIL; operationRecPtr.p->nextSerialQue = RNIL; operationRecPtr.p->prevSerialQue = RNIL; operationRecPtr.p->elementPage = RNIL; - operationRecPtr.p->keyinfoPage = RNIL; - operationRecPtr.p->lockOwner = ZFALSE; - operationRecPtr.p->insertIsDone = ZFALSE; - operationRecPtr.p->elementIsDisappeared = ZFALSE; - operationRecPtr.p->insertDeleteLen = fragrecptr.p->elementLength; - operationRecPtr.p->longPagePtr = RNIL; - operationRecPtr.p->longKeyPageIndex = RNIL; operationRecPtr.p->scanRecPtr = RNIL; + operationRecPtr.p->m_op_bits = opbits; // bit to mark lock operation - operationRecPtr.p->isAccLockReq = (Treqinfo >> 31) & 0x1; // undo log is not run via ACCKEYREQ - if(ERROR_INSERTED(5900) || ERROR_INSERTED(5901)) - { - for(unsigned i = 0; i<8 && i<signal->theData[4]; i++){ - operationRecPtr.p->keydata[i] = signal->theData[i+7]; - } - } - }//Dbacc::initOpRec() /* --------------------------------------------------------------------------------- */ @@ -1007,7 +998,7 @@ void Dbacc::initOpRec(Signal* signal) void Dbacc::sendAcckeyconf(Signal* signal) { signal->theData[0] = operationRecPtr.p->userptr; - signal->theData[1] = operationRecPtr.p->operation; + signal->theData[1] = operationRecPtr.p->m_op_bits & Operationrec::OP_MASK; signal->theData[2] = operationRecPtr.p->fid; signal->theData[3] = operationRecPtr.p->localdata[0]; signal->theData[4] = operationRecPtr.p->localdata[1]; @@ -1015,7 +1006,8 @@ void Dbacc::sendAcckeyconf(Signal* signal) }//Dbacc::sendAcckeyconf() -void Dbacc::ACCKEY_error(Uint32 fromWhere) +void +Dbacc::ACCKEY_error(Uint32 fromWhere) { switch(fromWhere) { case 0: @@ -1069,7 +1061,8 @@ void Dbacc::execACCKEYREQ(Signal* signal) }//if ptrAss(operationRecPtr, operationrec); ptrAss(fragrecptr, fragmentrec); - ndbrequire(operationRecPtr.p->transactionstate == IDLE); + + ndbrequire(operationRecPtr.p->m_op_bits == Operationrec::OP_INITIAL); initOpRec(signal); // normalize key if any char attr @@ -1083,23 +1076,32 @@ void Dbacc::execACCKEYREQ(Signal* signal) /* WE REMEMBER THESE ADDRESS IF WE LATER NEED TO INSERT */ /* THE ITEM AFTER NOT FINDING THE ITEM. */ /*---------------------------------------------------------------*/ - getElement(signal); - - if (tgeResult == ZTRUE) { - switch (operationRecPtr.p->operation) { + OperationrecPtr lockOwnerPtr; + const Uint32 found = getElement(signal, lockOwnerPtr); + + Uint32 opbits = operationRecPtr.p->m_op_bits; + Uint32 op = opbits & Operationrec::OP_MASK; + if (found == ZTRUE) + { + switch (op) { case ZREAD: case ZUPDATE: case ZDELETE: case ZWRITE: case ZSCAN_OP: - if (!tgeLocked){ - if(operationRecPtr.p->operation == ZWRITE) + if (!lockOwnerPtr.p) + { + if(op == ZWRITE) { jam(); - operationRecPtr.p->operation = ZUPDATE; + opbits &= ~(Uint32)Operationrec::OP_MASK; + opbits |= (op = ZUPDATE); + operationRecPtr.p->m_op_bits = opbits; // store to get correct ACCKEYCONF } + opbits |= Operationrec::OP_STATE_RUNNING; + opbits |= Operationrec::OP_RUN_QUEUE; sendAcckeyconf(signal); - if (operationRecPtr.p->dirtyRead == ZFALSE) { + if (! (opbits & Operationrec::OP_DIRTY_READ)) { /*---------------------------------------------------------------*/ // It is not a dirty read. We proceed by locking and continue with // the operation. @@ -1116,42 +1118,44 @@ void Dbacc::execACCKEYREQ(Signal* signal) dbgWord32(gePageptr, tgeElementptr, eh); gePageptr.p->word32[tgeElementptr] = eh; - insertLockOwnersList(signal , operationRecPtr); - return; + opbits |= Operationrec::OP_LOCK_OWNER; + insertLockOwnersList(signal, operationRecPtr); } else { jam(); /*---------------------------------------------------------------*/ // It is a dirty read. We do not lock anything. Set state to // IDLE since no COMMIT call will come. /*---------------------------------------------------------------*/ - operationRecPtr.p->transactionstate = IDLE; - operationRecPtr.p->operation = ZUNDEFINED_OP; - return; + opbits = Operationrec::OP_EXECUTED_DIRTY_READ; }//if + operationRecPtr.p->m_op_bits = opbits; + return; } else { jam(); - accIsLockedLab(signal); + accIsLockedLab(signal, lockOwnerPtr); return; }//if break; case ZINSERT: jam(); - insertExistElemLab(signal); + insertExistElemLab(signal, lockOwnerPtr); return; break; default: ndbrequire(false); break; }//switch - } else if (tgeResult == ZFALSE) { - switch (operationRecPtr.p->operation) { - case ZINSERT: + } else if (found == ZFALSE) { + switch (op){ case ZWRITE: + opbits &= ~(Uint32)Operationrec::OP_MASK; + opbits |= (op = ZINSERT); + case ZINSERT: jam(); - // If a write operation makes an insert we switch operation to ZINSERT so - // that the commit-method knows an insert has been made and updates noOfElements. - operationRecPtr.p->operation = ZINSERT; - operationRecPtr.p->insertIsDone = ZTRUE; + opbits |= Operationrec::OP_INSERT_IS_DONE; + opbits |= Operationrec::OP_STATE_RUNNING; + opbits |= Operationrec::OP_RUN_QUEUE; + operationRecPtr.p->m_op_bits = opbits; insertelementLab(signal); return; break; @@ -1169,13 +1173,287 @@ void Dbacc::execACCKEYREQ(Signal* signal) }//switch } else { jam(); - acckeyref1Lab(signal, tgeResult); + acckeyref1Lab(signal, found); return; }//if return; }//Dbacc::execACCKEYREQ() void +Dbacc::execACCKEY_ORD(Signal* signal, Uint32 opPtrI) +{ + jamEntry(); + OperationrecPtr lastOp; + lastOp.i = opPtrI; + ptrCheckGuard(lastOp, coprecsize, operationrec); + Uint32 opbits = lastOp.p->m_op_bits; + Uint32 opstate = opbits & Operationrec::OP_STATE_MASK; + + if (likely(opbits == Operationrec::OP_EXECUTED_DIRTY_READ)) + { + jam(); + lastOp.p->m_op_bits = Operationrec::OP_INITIAL; + return; + } + else if (likely(opstate == Operationrec::OP_STATE_RUNNING)) + { + opbits |= Operationrec::OP_STATE_EXECUTED; + lastOp.p->m_op_bits = opbits; + startNext(signal, lastOp); + return; + } + else + { + } + + ndbout_c("bits: %.8x state: %.8x", opbits, opstate); + ndbrequire(false); +} + +void +Dbacc::startNext(Signal* signal, OperationrecPtr lastOp) +{ + jam(); + OperationrecPtr nextOp; + OperationrecPtr loPtr; + nextOp.i = lastOp.p->nextParallelQue; + loPtr.i = lastOp.p->m_lock_owner_ptr_i; + Uint32 opbits = lastOp.p->m_op_bits; + + if ((opbits & Operationrec::OP_STATE_MASK)!= Operationrec::OP_STATE_EXECUTED) + { + jam(); + return; + } + + Uint32 nextbits; + if (nextOp.i != RNIL) + { + jam(); + ptrCheckGuard(nextOp, coprecsize, operationrec); + nextbits = nextOp.p->m_op_bits; + goto checkop; + } + + if ((opbits & Operationrec::OP_LOCK_OWNER) == 0) + { + jam(); + ptrCheckGuard(loPtr, coprecsize, operationrec); + } + else + { + jam(); + loPtr = lastOp; + } + + nextOp.i = loPtr.p->nextSerialQue; + ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER); + + if (nextOp.i == RNIL) + { + jam(); + return; + } + + /** + * There is an op in serie queue... + * Check if it can run + */ + ptrCheckGuard(nextOp, coprecsize, operationrec); + nextbits = nextOp.p->m_op_bits; + + { + const bool same = nextOp.p->is_same_trans(lastOp.p); + + if (!same && ((opbits & Operationrec::OP_ACC_LOCK_MODE) || + (nextbits & Operationrec::OP_LOCK_MODE))) + { + jam(); + /** + * Not same transaction + * and either last had exclusive lock + * or next had exclusive lock + */ + return; + } + + /** + * same trans and X-lock + */ + if (same && (opbits & Operationrec::OP_ACC_LOCK_MODE)) + { + jam(); + goto upgrade; + } + } + + /** + * all shared lock... + */ + if ((opbits & Operationrec::OP_ACC_LOCK_MODE) == 0 && + (nextbits & Operationrec::OP_LOCK_MODE) == 0) + { + jam(); + goto upgrade; + } + + /** + * There is a shared parallell queue & and exclusive op is first in queue + */ + ndbassert((opbits & Operationrec::OP_ACC_LOCK_MODE) == 0 && + (nextbits & Operationrec::OP_LOCK_MODE)); + + /** + * We must check if there are many transactions in parallel queue... + */ + OperationrecPtr tmp; + tmp.i = loPtr.p->nextParallelQue; + while (tmp.i != RNIL) + { + ptrCheckGuard(tmp, coprecsize, operationrec); + if (!nextOp.p->is_same_trans(tmp.p)) + { + jam(); + /** + * parallel queue contained another transaction, dont let it run + */ + return; + } + } + +upgrade: + /** + * Move first op in serie queue to end of parallell queue + */ + + tmp.i = loPtr.p->nextSerialQue = nextOp.p->nextSerialQue; + loPtr.p->m_lo_last_parallel_op_ptr_i = nextOp.i; + nextOp.p->nextSerialQue = RNIL; + nextOp.p->prevSerialQue = RNIL; + nextOp.p->m_lock_owner_ptr_i = loPtr.i; + nextOp.p->prevParallelQue = lastOp.i; + lastOp.p->nextParallelQue = nextOp.i; + + if (tmp.i != RNIL) + { + jam(); + ptrCheckGuard(tmp, coprecsize, operationrec); + tmp.p->prevSerialQue = loPtr.i; + } + else + { + jam(); + loPtr.p->m_lo_last_serial_op_ptr_i = RNIL; + } + + nextbits |= Operationrec::OP_RUN_QUEUE; + + /** + * Currently no grouping of ops in serie queue + */ + ndbrequire(nextOp.p->nextParallelQue == RNIL); + +checkop: + Uint32 errCode = 0; + OperationrecPtr save = operationRecPtr; + operationRecPtr = nextOp; + + Uint32 lastop = opbits & Operationrec::OP_MASK; + Uint32 nextop = nextbits & Operationrec::OP_MASK; + + nextbits &= nextbits & ~(Uint32)Operationrec::OP_STATE_MASK; + nextbits |= Operationrec::OP_STATE_RUNNING; + + if (lastop == ZDELETE) + { + jam(); + if (nextop != ZINSERT && nextop != ZWRITE) + { + errCode = ZREAD_ERROR; + goto ref; + } + + nextbits &= ~(Uint32)Operationrec::OP_MASK; + nextbits &= ~(Uint32)Operationrec::OP_ELEMENT_DISAPPEARED; + nextbits |= (nextop = ZINSERT); + nextbits |= Operationrec::OP_INSERT_IS_DONE; + goto conf; + } + else if (nextop == ZINSERT) + { + jam(); + errCode = ZWRITE_ERROR; + goto ref; + } + else if (nextop == ZWRITE) + { + jam(); + nextbits &= ~(Uint32)Operationrec::OP_MASK; + nextbits |= (nextop = ZUPDATE); + goto conf; + } + else + { + jam(); + } + +conf: + nextOp.p->m_op_bits = nextbits; + nextOp.p->localdata[0] = lastOp.p->localdata[0]; + nextOp.p->localdata[1] = lastOp.p->localdata[1]; + + if (nextop == ZSCAN_OP && (nextbits & Operationrec::OP_LOCK_REQ) == 0) + { + jam(); + takeOutScanLockQueue(nextOp.p->scanRecPtr); + putReadyScanQueue(signal, nextOp.p->scanRecPtr); + } + else + { + jam(); + fragrecptr.i = nextOp.p->fragptr; + ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); + + sendAcckeyconf(signal); + sendSignal(nextOp.p->userblockref, GSN_ACCKEYCONF, + signal, 6, JBB); + } + + operationRecPtr = save; + return; + +ref: + nextOp.p->m_op_bits = nextbits; + + if (nextop == ZSCAN_OP && (nextbits & Operationrec::OP_LOCK_REQ) == 0) + { + jam(); + nextOp.p->m_op_bits |= Operationrec::OP_ELEMENT_DISAPPEARED; + takeOutScanLockQueue(nextOp.p->scanRecPtr); + putReadyScanQueue(signal, nextOp.p->scanRecPtr); + } + else + { + jam(); + signal->theData[0] = nextOp.p->userptr; + signal->theData[1] = errCode; + sendSignal(nextOp.p->userblockref, GSN_ACCKEYREF, signal, + 2, JBB); + } + + operationRecPtr = save; + return; +} + + +#if 0 +void +Dbacc::execACCKEY_REP_REF(Signal* signal, Uint32 opPtrI) +{ +} +#endif + +void Dbacc::xfrmKeyData(Signal* signal) { Uint32 table = fragrecptr.p->myTableId; @@ -1188,23 +1466,22 @@ Dbacc::xfrmKeyData(Signal* signal) operationRecPtr.p->xfrmtupkeylen = len; } -void Dbacc::accIsLockedLab(Signal* signal) +void +Dbacc::accIsLockedLab(Signal* signal, OperationrecPtr lockOwnerPtr) { ndbrequire(csystemRestart == ZFALSE); - queOperPtr.i = ElementHeader::getOpPtrI(gePageptr.p->word32[tgeElementptr]); - ptrCheckGuard(queOperPtr, coprecsize, operationrec); - if (operationRecPtr.p->dirtyRead == ZFALSE) { + + Uint32 bits = operationRecPtr.p->m_op_bits; + validate_lock_queue(lockOwnerPtr); + + if ((bits & Operationrec::OP_DIRTY_READ) == 0){ Uint32 return_result; - if (operationRecPtr.p->lockMode == ZREADLOCK) { + if ((bits & Operationrec::OP_LOCK_MODE) == ZREADLOCK) { jam(); - priPageptr = gePageptr; - tpriElementptr = tgeElementptr; - return_result = placeReadInLockQueue(signal); + return_result = placeReadInLockQueue(lockOwnerPtr); } else { jam(); - pwiPageptr = gePageptr; - tpwiElementptr = tgeElementptr; - return_result = placeWriteInLockQueue(signal); + return_result = placeWriteInLockQueue(lockOwnerPtr); }//if if (return_result == ZPARALLEL_QUEUE) { jam(); @@ -1214,24 +1491,29 @@ void Dbacc::accIsLockedLab(Signal* signal) jam(); signal->theData[0] = RNIL; return; - } else if (return_result == ZWRITE_ERROR) { + } else { jam(); acckeyref1Lab(signal, return_result); return; }//if ndbrequire(false); - } else { - if (queOperPtr.p->elementIsDisappeared == ZFALSE) { + } + else + { + if (!(lockOwnerPtr.p->m_op_bits & Operationrec::OP_ELEMENT_DISAPPEARED) && + lockOwnerPtr.p->localdata[0] != ~(Uint32)0) + { jam(); - /*---------------------------------------------------------------*/ - // It is a dirty read. We do not lock anything. Set state to - // IDLE since no COMMIT call will arrive. - /*---------------------------------------------------------------*/ + /* --------------------------------------------------------------- + * It is a dirty read. We do not lock anything. Set state to + *IDLE since no COMMIT call will arrive. + * ---------------------------------------------------------------*/ sendAcckeyconf(signal); - operationRecPtr.p->transactionstate = IDLE; - operationRecPtr.p->operation = ZUNDEFINED_OP; + operationRecPtr.p->m_op_bits = Operationrec::OP_EXECUTED_DIRTY_READ; return; - } else { + } + else + { jam(); /*---------------------------------------------------------------*/ // The tuple does not exist in the committed world currently. @@ -1243,17 +1525,18 @@ void Dbacc::accIsLockedLab(Signal* signal) }//if }//Dbacc::accIsLockedLab() -/* --------------------------------------------------------------------------------- */ -/* I N S E R T E X I S T E L E M E N T */ -/* --------------------------------------------------------------------------------- */ -void Dbacc::insertExistElemLab(Signal* signal) +/* ------------------------------------------------------------------------ */ +/* I N S E R T E X I S T E L E M E N T */ +/* ------------------------------------------------------------------------ */ +void Dbacc::insertExistElemLab(Signal* signal, OperationrecPtr lockOwnerPtr) { - if (!tgeLocked){ + if (!lockOwnerPtr.p) + { jam(); acckeyref1Lab(signal, ZWRITE_ERROR);/* THE ELEMENT ALREADY EXIST */ return; }//if - accIsLockedLab(signal); + accIsLockedLab(signal, lockOwnerPtr); }//Dbacc::insertExistElemLab() /* --------------------------------------------------------------------------------- */ @@ -1271,26 +1554,8 @@ void Dbacc::insertelementLab(Signal* signal) }//if }//if ndbrequire(operationRecPtr.p->tupkeylen <= fragrecptr.p->keyLength); - - Uint32 localKey; - if(!operationRecPtr.p->isAccLockReq) - { - signal->theData[0] = operationRecPtr.p->userptr; - Uint32 blockNo = refToBlock(operationRecPtr.p->userblockref); - EXECUTE_DIRECT(blockNo, GSN_LQH_ALLOCREQ, signal, 1); - jamEntry(); - if (signal->theData[0] != 0) { - jam(); - Uint32 result_code = signal->theData[0]; - acckeyref1Lab(signal, result_code); - return; - }//if - localKey = (signal->theData[1] << MAX_TUPLES_BITS) + signal->theData[2]; - } - else - { - localKey = signal->theData[7]; - } + ndbassert(!(operationRecPtr.p->m_op_bits & Operationrec::OP_LOCK_REQ)); + Uint32 localKey = ~(Uint32)0; insertLockOwnersList(signal, operationRecPtr); @@ -1305,196 +1570,18 @@ void Dbacc::insertelementLab(Signal* signal) idrOperationRecPtr = operationRecPtr; clocalkey[0] = localKey; operationRecPtr.p->localdata[0] = localKey; - /* --------------------------------------------------------------------------------- */ - /* WE SET THE LOCAL KEY TO MINUS ONE TO INDICATE IT IS NOT YET VALID. */ - /* --------------------------------------------------------------------------------- */ + /* ----------------------------------------------------------------------- */ + /* WE SET THE LOCAL KEY TO MINUS ONE TO INDICATE IT IS NOT YET VALID. */ + /* ----------------------------------------------------------------------- */ insertElement(signal); sendAcckeyconf(signal); return; }//Dbacc::insertelementLab() -/* --------------------------------------------------------------------------------- */ -/* PLACE_READ_IN_LOCK_QUEUE */ -/* INPUT: OPERATION_REC_PTR OUR OPERATION POINTER */ -/* QUE_OPER_PTR LOCK QUEUE OWNER OPERATION POINTER */ -/* PRI_PAGEPTR PAGE POINTER OF ELEMENT */ -/* TPRI_ELEMENTPTR ELEMENT POINTER OF ELEMENT */ -/* OUTPUT TRESULT = */ -/* ZPARALLEL_QUEUE OPERATION PLACED IN PARALLEL QUEUE */ -/* OPERATION CAN PROCEED NOW. */ -/* ZSERIAL_QUEUE OPERATION PLACED IN SERIAL QUEUE */ -/* ERROR CODE OPERATION NEEDS ABORTING */ -/* THE ELEMENT WAS LOCKED AND WE WANT TO READ THE TUPLE. WE WILL CHECK THE LOCK */ -/* QUEUES TO PERFORM THE PROPER ACTION. */ -/* */ -/* IN SOME PLACES IN THE CODE BELOW THAT HANDLES WHAT TO DO WHEN THE TUPLE IS LOCKED */ -/* WE DO ASSUME THAT NEXT_PARALLEL_QUEUE AND NEXT_SERIAL_QUEUE ON OPERATION_REC_PTR */ -/* HAVE BEEN INITIALISED TO RNIL. THUS WE DO NOT PERFORM THIS ONCE MORE EVEN IF IT */ -/* COULD BE NICE FOR READABILITY. */ -/* --------------------------------------------------------------------------------- */ -Uint32 Dbacc::placeReadInLockQueue(Signal* signal) -{ - if (getNoParallelTransaction(queOperPtr.p) == 1) { - if ((queOperPtr.p->transId1 == operationRecPtr.p->transId1) && - (queOperPtr.p->transId2 == operationRecPtr.p->transId2)) { - /* --------------------------------------------------------------------------------- */ - /* WE ARE PERFORMING A READ OPERATION AND THIS TRANSACTION ALREADY OWNS THE LOCK */ - /* ALONE. PUT THE OPERATION LAST IN THE PARALLEL QUEUE. */ - /* --------------------------------------------------------------------------------- */ - jam(); - mlpqOperPtr = queOperPtr; - moveLastParallelQueue(signal); - operationRecPtr.p->localdata[0] = queOperPtr.p->localdata[0]; - operationRecPtr.p->localdata[1] = queOperPtr.p->localdata[1]; - operationRecPtr.p->prevParallelQue = mlpqOperPtr.i; - mlpqOperPtr.p->nextParallelQue = operationRecPtr.i; - switch (queOperPtr.p->lockMode) { - case ZREADLOCK: - jam(); - /*empty*/; - break; - default: - jam(); - /* --------------------------------------------------------------------------------- */ - /* IF THE TRANSACTION PREVIOUSLY SET A WRITE LOCK WE MUST ENSURE THAT ALL */ - /* OPERATIONS IN THE PARALLEL QUEUE HAVE WRITE LOCK MODE TO AVOID STRANGE BUGS.*/ - /* --------------------------------------------------------------------------------- */ - operationRecPtr.p->lockMode = queOperPtr.p->lockMode; - break; - }//switch - return ZPARALLEL_QUEUE; - }//if - }//if - if (queOperPtr.p->nextSerialQue == RNIL) { - /* --------------------------------------------------------------------------------- */ - /* WE ARE PERFORMING A READ OPERATION AND THERE IS NO SERIAL QUEUE. IF THERE IS NO */ - /* WRITE OPERATION THAT OWNS THE LOCK OR ANY WRITE OPERATION IN THE PARALLEL QUEUE */ - /* IT IS ENOUGH TO CHECK THE LOCK MODE OF THE LEADER IN THE PARALLEL QUEUE. IF IT IS */ - /* A READ LOCK THEN WE PLACE OURSELVES IN THE PARALLEL QUEUE OTHERWISE WE GO ON TO */ - /* PLACE OURSELVES IN THE SERIAL QUEUE. */ - /* --------------------------------------------------------------------------------- */ - switch (queOperPtr.p->lockMode) { - case ZREADLOCK: - jam(); - mlpqOperPtr = queOperPtr; - moveLastParallelQueue(signal); - operationRecPtr.p->prevParallelQue = mlpqOperPtr.i; - mlpqOperPtr.p->nextParallelQue = operationRecPtr.i; - operationRecPtr.p->localdata[0] = queOperPtr.p->localdata[0]; - operationRecPtr.p->localdata[1] = queOperPtr.p->localdata[1]; - return ZPARALLEL_QUEUE; - default: - jam(); - queOperPtr.p->nextSerialQue = operationRecPtr.i; - operationRecPtr.p->prevSerialQue = queOperPtr.i; - break; - }//switch - } else { - jam(); - placeSerialQueueRead(signal); - }//if - return ZSERIAL_QUEUE; -}//Dbacc::placeReadInLockQueue() - -/* --------------------------------------------------------------------------------- */ -/* WE WILL CHECK IF THIS TRANSACTION IS ALREADY PLACED AT SOME SPOT IN THE PARALLEL */ -/* SERIAL QUEUE WITHOUT ANY NEIGHBORS FROM OTHER TRANSACTION. IF SO WE WILL INSERT */ -/* IT IN THAT PARALLEL QUEUE. */ -/* --------------------------------------------------------------------------------- */ -void Dbacc::placeSerialQueueRead(Signal* signal) -{ - readWriteOpPtr.i = queOperPtr.p->nextSerialQue; - ptrCheckGuard(readWriteOpPtr, coprecsize, operationrec); - PSQR_LOOP: - jam(); - if (readWriteOpPtr.p->nextSerialQue == RNIL) { - jam(); - /* --------------------------------------------------------------------------------- */ - /* THERE WAS NO PREVIOUS OPERATION IN THIS TRANSACTION WHICH WE COULD PUT IT */ - /* IN THE PARALLEL QUEUE TOGETHER WITH. */ - /* --------------------------------------------------------------------------------- */ - checkOnlyReadEntry(signal); - return; - }//if - if (getNoParallelTransaction(readWriteOpPtr.p) == 1) { - jam(); - /* --------------------------------------------------------------------------------- */ - /* THERE WAS ONLY ONE TRANSACTION INVOLVED IN THE PARALLEL QUEUE. IF THIS IS OUR */ - /* TRANSACTION WE CAN STILL GET HOLD OF THE LOCK. */ - /* --------------------------------------------------------------------------------- */ - if ((readWriteOpPtr.p->transId1 == operationRecPtr.p->transId1) && - (readWriteOpPtr.p->transId2 == operationRecPtr.p->transId2)) { - jam(); - /* --------------------------------------------------------------------------------- */ - /* WE ARE PERFORMING A READ IN THE SAME TRANSACTION WHERE WE ALREADY */ - /* PREVIOUSLY HAVE EXECUTED AN OPERATION. INSERT-DELETE, READ-UPDATE, READ-READ, */ - /* UPDATE-UPDATE, UPDATE-DELETE, READ-DELETE, INSERT-READ, INSERT-UPDATE ARE ALLOWED */ - /* COMBINATIONS. A NEW INSERT AFTER A DELETE IS NOT ALLOWED AND SUCH AN INSERT WILL */ - /* GO TO THE SERIAL LOCK QUEUE WHICH IT WILL NOT LEAVE UNTIL A TIME-OUT AND THE */ - /* TRANSACTION IS ABORTED. READS AND UPDATES AFTER DELETES IS ALSO NOT ALLOWED. */ - /* --------------------------------------------------------------------------------- */ - mlpqOperPtr = readWriteOpPtr; - moveLastParallelQueue(signal); - readWriteOpPtr = mlpqOperPtr; - operationRecPtr.p->prevParallelQue = readWriteOpPtr.i; - readWriteOpPtr.p->nextParallelQue = operationRecPtr.i; - operationRecPtr.p->localdata[0] = readWriteOpPtr.p->localdata[0]; - operationRecPtr.p->localdata[1] = readWriteOpPtr.p->localdata[1]; - switch (readWriteOpPtr.p->lockMode) { - case ZREADLOCK: - jam(); - /*empty*/; - break; - default: - jam(); - /* --------------------------------------------------------------------------------- */ - /* IF THE TRANSACTION PREVIOUSLY SET A WRITE LOCK WE MUST ENSURE THAT ALL */ - /* OPERATIONS IN THE PARALLEL QUEUE HAVE WRITE LOCK MODE TO AVOID STRANGE BUGS.*/ - /* --------------------------------------------------------------------------------- */ - operationRecPtr.p->lockMode = readWriteOpPtr.p->lockMode; - break; - }//switch - return; - }//if - }//if - readWriteOpPtr.i = readWriteOpPtr.p->nextSerialQue; - ptrCheckGuard(readWriteOpPtr, coprecsize, operationrec); - goto PSQR_LOOP; -}//Dbacc::placeSerialQueueRead() - -/* --------------------------------------------------------------------------------- */ -/* WE WILL CHECK IF THE LAST ENTRY IN THE SERIAL QUEUE CONTAINS ONLY READ */ -/* OPERATIONS. IF SO WE WILL INSERT IT IN THAT PARALLEL QUEUE. OTHERWISE WE */ -/* WILL PLACE IT AT THE END OF THE SERIAL QUEUE. */ -/* --------------------------------------------------------------------------------- */ -void Dbacc::checkOnlyReadEntry(Signal* signal) -{ - switch (readWriteOpPtr.p->lockMode) { - case ZREADLOCK: - jam(); - /* --------------------------------------------------------------------------------- */ - /* SINCE THIS LAST QUEUE ONLY CONTAINS READ LOCKS WE CAN JOIN THE PARALLEL QUEUE AT */ - /* THE END. */ - /* --------------------------------------------------------------------------------- */ - mlpqOperPtr = readWriteOpPtr; - moveLastParallelQueue(signal); - readWriteOpPtr = mlpqOperPtr; - operationRecPtr.p->prevParallelQue = readWriteOpPtr.i; - readWriteOpPtr.p->nextParallelQue = operationRecPtr.i; - operationRecPtr.p->localdata[0] = readWriteOpPtr.p->localdata[0]; - operationRecPtr.p->localdata[1] = readWriteOpPtr.p->localdata[1]; - break; - default: - jam(); /* PUT THE OPERATION RECORD IN THE SERIAL QUEUE */ - readWriteOpPtr.p->nextSerialQue = operationRecPtr.i; - operationRecPtr.p->prevSerialQue = readWriteOpPtr.i; - break; - }//switch -}//Dbacc::checkOnlyReadEntry() -/* --------------------------------------------------------------------------------- */ -/* GET_NO_PARALLEL_TRANSACTION */ -/* --------------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------ */ +/* GET_NO_PARALLEL_TRANSACTION */ +/* ------------------------------------------------------------------------ */ Uint32 Dbacc::getNoParallelTransaction(const Operationrec * op) { @@ -1514,142 +1601,636 @@ Dbacc::getNoParallelTransaction(const Operationrec * op) return 1; }//Dbacc::getNoParallelTransaction() -void Dbacc::moveLastParallelQueue(Signal* signal) +#ifdef VM_TRACE +Uint32 +Dbacc::getNoParallelTransactionFull(const Operationrec * op) { - while (mlpqOperPtr.p->nextParallelQue != RNIL) { - jam(); - mlpqOperPtr.i = mlpqOperPtr.p->nextParallelQue; - ptrCheckGuard(mlpqOperPtr, coprecsize, operationrec); - }//if -}//Dbacc::moveLastParallelQueue() + ConstPtr<Operationrec> tmp; + + tmp.p = op; + while ((tmp.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0) + { + tmp.i = tmp.p->prevParallelQue; + if (tmp.i != RNIL) + { + ptrCheckGuard(tmp, coprecsize, operationrec); + } + else + { + break; + } + } + + return getNoParallelTransaction(tmp.p); +} +#endif + +#ifdef ACC_SAFE_QUEUE -void Dbacc::moveLastParallelQueueWrite(Signal* signal) +Uint32 +Dbacc::get_parallel_head(OperationrecPtr opPtr) { - /* --------------------------------------------------------------------------------- */ - /* ENSURE THAT ALL OPERATIONS HAVE LOCK MODE SET TO WRITE SINCE WE INSERT A */ - /* WRITE LOCK INTO THE PARALLEL QUEUE. */ - /* --------------------------------------------------------------------------------- */ - while (mlpqOperPtr.p->nextParallelQue != RNIL) { - jam(); - mlpqOperPtr.p->lockMode = operationRecPtr.p->lockMode; - mlpqOperPtr.i = mlpqOperPtr.p->nextParallelQue; - ptrCheckGuard(mlpqOperPtr, coprecsize, operationrec); - }//if - mlpqOperPtr.p->lockMode = operationRecPtr.p->lockMode; -}//Dbacc::moveLastParallelQueueWrite() + while ((opPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0 && + opPtr.p->prevParallelQue != RNIL) + { + opPtr.i = opPtr.p->prevParallelQue; + ptrCheckGuard(opPtr, coprecsize, operationrec); + } + + return opPtr.i; +} -/* --------------------------------------------------------------------------------- */ -/* PLACE_WRITE_IN_LOCK_QUEUE */ -/* INPUT: OPERATION_REC_PTR OUR OPERATION POINTER */ -/* QUE_OPER_PTR LOCK QUEUE OWNER OPERATION POINTER */ -/* PWI_PAGEPTR PAGE POINTER OF ELEMENT */ -/* TPWI_ELEMENTPTR ELEMENT POINTER OF ELEMENT */ -/* OUTPUT TRESULT = */ -/* ZPARALLEL_QUEUE OPERATION PLACED IN PARALLEL QUEUE */ -/* OPERATION CAN PROCEED NOW. */ -/* ZSERIAL_QUEUE OPERATION PLACED IN SERIAL QUEUE */ -/* ERROR CODE OPERATION NEEDS ABORTING */ -/* --------------------------------------------------------------------------------- */ -Uint32 Dbacc::placeWriteInLockQueue(Signal* signal) +bool +Dbacc::validate_lock_queue(OperationrecPtr opPtr) +{ + OperationrecPtr loPtr; + loPtr.i = get_parallel_head(opPtr); + ptrCheckGuard(loPtr, coprecsize, operationrec); + + while((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0 && + loPtr.p->prevSerialQue != RNIL) + { + loPtr.i = loPtr.p->prevSerialQue; + ptrCheckGuard(loPtr, coprecsize, operationrec); + } + + // Now we have lock owner... + vlqrequire(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER); + vlqrequire(loPtr.p->m_op_bits & Operationrec::OP_RUN_QUEUE); + + // 1 Validate page pointer + { + Page8Ptr pagePtr; + pagePtr.i = loPtr.p->elementPage; + ptrCheckGuard(pagePtr, cpagesize, page8); + arrGuard(loPtr.p->elementPointer, 2048); + Uint32 eh = pagePtr.p->word32[loPtr.p->elementPointer]; + vlqrequire(ElementHeader::getLocked(eh)); + vlqrequire(ElementHeader::getOpPtrI(eh) == loPtr.i); + } + + // 2 Lock owner should always have same LOCK_MODE and ACC_LOCK_MODE + if (loPtr.p->m_op_bits & Operationrec::OP_LOCK_MODE) + { + vlqrequire(loPtr.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE); + } + else + { + vlqrequire((loPtr.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE) == 0); + } + + // 3 Lock owner should never be waiting... + bool running = false; + { + Uint32 opstate = loPtr.p->m_op_bits & Operationrec::OP_STATE_MASK; + if (opstate == Operationrec::OP_STATE_RUNNING) + running = true; + else + { + vlqrequire(opstate == Operationrec::OP_STATE_EXECUTED); + } + } + + // Validate parallel queue + { + bool many = false; + bool orlockmode = loPtr.p->m_op_bits & Operationrec::OP_LOCK_MODE; + OperationrecPtr lastP = loPtr; + + while (lastP.p->nextParallelQue != RNIL) + { + Uint32 prev = lastP.i; + lastP.i = lastP.p->nextParallelQue; + ptrCheckGuard(lastP, coprecsize, operationrec); + + vlqrequire(lastP.p->prevParallelQue == prev); + + Uint32 opbits = lastP.p->m_op_bits; + many |= loPtr.p->is_same_trans(lastP.p) ? 0 : 1; + orlockmode |= !!(opbits & Operationrec::OP_LOCK_MODE); + + vlqrequire(opbits & Operationrec::OP_RUN_QUEUE); + vlqrequire((opbits & Operationrec::OP_LOCK_OWNER) == 0); + + Uint32 opstate = opbits & Operationrec::OP_STATE_MASK; + if (running) + { + // If I found a running operation, + // all following should be waiting + vlqrequire(opstate == Operationrec::OP_STATE_WAITING); + } + else + { + if (opstate == Operationrec::OP_STATE_RUNNING) + running = true; + else + vlqrequire(opstate == Operationrec::OP_STATE_EXECUTED); + } + + if (lastP.p->m_op_bits & Operationrec::OP_LOCK_MODE) + { + vlqrequire(lastP.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE); + } + else + { + vlqrequire((lastP.p->m_op_bits && orlockmode) == orlockmode); + vlqrequire((lastP.p->m_op_bits & Operationrec::OP_MASK) == ZREAD || + (lastP.p->m_op_bits & Operationrec::OP_MASK) == ZSCAN_OP); + } + + if (many) + { + vlqrequire(orlockmode == 0); + } + } + + if (lastP.i != loPtr.i) + { + vlqrequire(loPtr.p->m_lo_last_parallel_op_ptr_i == lastP.i); + vlqrequire(lastP.p->m_lock_owner_ptr_i == loPtr.i); + } + else + { + vlqrequire(loPtr.p->m_lo_last_parallel_op_ptr_i == RNIL); + } + } + + // Validate serie queue + if (loPtr.p->nextSerialQue != RNIL) + { + Uint32 prev = loPtr.i; + OperationrecPtr lastS; + lastS.i = loPtr.p->nextSerialQue; + while (true) + { + ptrCheckGuard(lastS, coprecsize, operationrec); + vlqrequire(lastS.p->prevSerialQue == prev); + vlqrequire(getNoParallelTransaction(lastS.p) == 1); + vlqrequire((lastS.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0); + vlqrequire((lastS.p->m_op_bits & Operationrec::OP_RUN_QUEUE) == 0); + vlqrequire((lastS.p->m_op_bits & Operationrec::OP_STATE_MASK) == Operationrec::OP_STATE_WAITING); + if (lastS.p->nextSerialQue == RNIL) + break; + prev = lastS.i; + lastS.i = lastS.p->nextSerialQue; + } + + vlqrequire(loPtr.p->m_lo_last_serial_op_ptr_i == lastS.i); + } + else + { + vlqrequire(loPtr.p->m_lo_last_serial_op_ptr_i == RNIL); + } + return true; +} + +NdbOut& +operator<<(NdbOut & out, Dbacc::OperationrecPtr ptr) { - if (!((getNoParallelTransaction(queOperPtr.p) == 1) && - (queOperPtr.p->transId1 == operationRecPtr.p->transId1) && - (queOperPtr.p->transId2 == operationRecPtr.p->transId2))) { + Uint32 opbits = ptr.p->m_op_bits; + out << "[ " << dec << ptr.i + << " [ " << hex << ptr.p->transId1 + << " " << hex << ptr.p->transId2 << "] " + << " bits: H'" << hex << opbits << " "; + + bool read = false; + switch(opbits & Dbacc::Operationrec::OP_MASK){ + case ZREAD: out << "READ "; read = true; break; + case ZINSERT: out << "INSERT "; break; + case ZUPDATE: out << "UPDATE "; break; + case ZDELETE: out << "DELETE "; break; + case ZWRITE: out << "WRITE "; break; + case ZSCAN_OP: out << "SCAN "; read = true; break; + default: + out << "<Unknown: H'" + << hex << (opbits & Dbacc::Operationrec::OP_MASK) + << "> "; + } + + if (read) + { + if (opbits & Dbacc::Operationrec::OP_LOCK_MODE) + out << "(X)"; + else + out << "(S)"; + if (opbits & Dbacc::Operationrec::OP_ACC_LOCK_MODE) + out << "(X)"; + else + out << "(S)"; + } + + if (opbits) + { + out << "(RQ)"; + } + + switch(opbits & Dbacc::Operationrec::OP_STATE_MASK){ + case Dbacc::Operationrec::OP_STATE_WAITING: + out << " WAITING "; break; + case Dbacc::Operationrec::OP_STATE_RUNNING: + out << " RUNNING "; break; + case Dbacc::Operationrec::OP_STATE_EXECUTED: + out << " EXECUTED "; break; + case Dbacc::Operationrec::OP_STATE_IDLE: + out << " IDLE "; break; + default: + out << " <Unknown: H'" + << hex << (opbits & Dbacc::Operationrec::OP_STATE_MASK) + << "> "; + } + +/* + OP_MASK = 0x000F // 4 bits for operation type + ,OP_LOCK_MODE = 0x0010 // 0 - shared lock, 1 = exclusive lock + ,OP_ACC_LOCK_MODE = 0x0020 // Or:de lock mode of all operation + // before me + ,OP_LOCK_OWNER = 0x0040 + ,OP_DIRTY_READ = 0x0080 + ,OP_LOCK_REQ = 0x0100 // isAccLockReq + ,OP_COMMIT_DELETE_CHECK = 0x0200 + ,OP_INSERT_IS_DONE = 0x0400 + ,OP_ELEMENT_DISAPPEARED = 0x0800 + + ,OP_STATE_MASK = 0xF000 + ,OP_STATE_IDLE = 0xF000 + ,OP_STATE_WAITING = 0x0000 + ,OP_STATE_RUNNING = 0x1000 + ,OP_STATE_EXECUTED = 0x3000 + }; +*/ + if (opbits & Dbacc::Operationrec::OP_LOCK_OWNER) + out << "LO "; + + if (opbits & Dbacc::Operationrec::OP_DIRTY_READ) + out << "DR "; + + if (opbits & Dbacc::Operationrec::OP_LOCK_REQ) + out << "LOCK_REQ "; + + if (opbits & Dbacc::Operationrec::OP_COMMIT_DELETE_CHECK) + out << "COMMIT_DELETE_CHECK "; + + if (opbits & Dbacc::Operationrec::OP_INSERT_IS_DONE) + out << "INSERT_IS_DONE "; + + if (opbits & Dbacc::Operationrec::OP_ELEMENT_DISAPPEARED) + out << "ELEMENT_DISAPPEARED "; + + if (opbits & Dbacc::Operationrec::OP_LOCK_OWNER) + { + out << "last_parallel: " << dec << ptr.p->m_lo_last_parallel_op_ptr_i << " "; + out << "last_serial: " << dec << ptr.p->m_lo_last_serial_op_ptr_i << " "; + } + + out << "]"; + return out; +} + +void +Dbacc::dump_lock_queue(OperationrecPtr loPtr) +{ + if ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0) + { + while ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0 && + loPtr.p->prevParallelQue != RNIL) + { + loPtr.i = loPtr.p->prevParallelQue; + ptrCheckGuard(loPtr, coprecsize, operationrec); + } + + while ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0 && + loPtr.p->prevSerialQue != RNIL) + { + loPtr.i = loPtr.p->prevSerialQue; + ptrCheckGuard(loPtr, coprecsize, operationrec); + } + + ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER); + } + + ndbout << "-- HEAD --" << endl; + OperationrecPtr tmp = loPtr; + while (tmp.i != RNIL) + { + ptrCheckGuard(tmp, coprecsize, operationrec); + ndbout << tmp << " "; + tmp.i = tmp.p->nextParallelQue; + + if (tmp.i == loPtr.i) + { + ndbout << " <LOOP>"; + break; + } + } + ndbout << endl; + + tmp.i = loPtr.p->nextSerialQue; + while (tmp.i != RNIL) + { + ptrCheckGuard(tmp, coprecsize, operationrec); + OperationrecPtr tmp2 = tmp; + + if (tmp.i == loPtr.i) + { + ndbout << "<LOOP S>" << endl; + break; + } + + while (tmp2.i != RNIL) + { + ptrCheckGuard(tmp2, coprecsize, operationrec); + ndbout << tmp2 << " "; + tmp2.i = tmp2.p->nextParallelQue; + + if (tmp2.i == tmp.i) + { + ndbout << "<LOOP 3>"; + break; + } + } + ndbout << endl; + tmp.i = tmp.p->nextSerialQue; + } +} +#endif + +/* ------------------------------------------------------------------------- + * PLACE_WRITE_IN_LOCK_QUEUE + * INPUT: OPERATION_REC_PTR OUR OPERATION POINTER + * QUE_OPER_PTR LOCK QUEUE OWNER OPERATION POINTER + * PWI_PAGEPTR PAGE POINTER OF ELEMENT + * TPWI_ELEMENTPTR ELEMENT POINTER OF ELEMENT + * OUTPUT TRESULT = + * ZPARALLEL_QUEUE OPERATION PLACED IN PARALLEL QUEUE + * OPERATION CAN PROCEED NOW. + * ZSERIAL_QUEUE OPERATION PLACED IN SERIAL QUEUE + * ERROR CODE OPERATION NEEDS ABORTING + * ------------------------------------------------------------------------- */ +Uint32 +Dbacc::placeWriteInLockQueue(OperationrecPtr lockOwnerPtr) +{ + OperationrecPtr lastOpPtr; + lastOpPtr.i = lockOwnerPtr.p->m_lo_last_parallel_op_ptr_i; + Uint32 opbits = operationRecPtr.p->m_op_bits; + + if (lastOpPtr.i == RNIL) + { + lastOpPtr = lockOwnerPtr; + } + else + { + ptrCheckGuard(lastOpPtr, coprecsize, operationrec); + } + + ndbassert(get_parallel_head(lastOpPtr) == lockOwnerPtr.i); + + Uint32 lastbits = lastOpPtr.p->m_op_bits; + if (lastbits & Operationrec::OP_ACC_LOCK_MODE) + { + if(operationRecPtr.p->is_same_trans(lastOpPtr.p)) + { + goto checkop; + } + } + else + { + /** + * We dont have an exclusive lock on operation and + * + */ jam(); - placeSerialQueueWrite(signal); - return ZSERIAL_QUEUE; - }//if + + /** + * Scan parallell queue to see if we are the only one + */ + OperationrecPtr loopPtr = lockOwnerPtr; + do + { + ptrCheckGuard(loopPtr, coprecsize, operationrec); + if (!loopPtr.p->is_same_trans(operationRecPtr.p)) + { + goto serial; + } + loopPtr.i = loopPtr.p->nextParallelQue; + } while (loopPtr.i != RNIL); + + goto checkop; + } + +serial: + jam(); + placeSerialQueue(lockOwnerPtr, operationRecPtr); + + validate_lock_queue(lockOwnerPtr); + return ZSERIAL_QUEUE; + +checkop: /* - WE ARE PERFORMING AN READ EXCLUSIVE, INSERT, UPDATE OR DELETE IN THE SAME - TRANSACTION WHERE WE PREVIOUSLY HAVE EXECUTED AN OPERATION. - Read-All, Update-All, Insert-All and Delete-Insert are allowed - combinations. - Delete-Read, Delete-Update and Delete-Delete are not an allowed - combination and will result in tuple not found error. + WE ARE PERFORMING AN READ EXCLUSIVE, INSERT, UPDATE OR DELETE IN THE SAME + TRANSACTION WHERE WE PREVIOUSLY HAVE EXECUTED AN OPERATION. + Read-All, Update-All, Insert-All and Delete-Insert are allowed + combinations. + Delete-Read, Delete-Update and Delete-Delete are not an allowed + combination and will result in tuple not found error. */ - mlpqOperPtr = queOperPtr; - moveLastParallelQueueWrite(signal); + Uint32 lstate = lastbits & Operationrec::OP_STATE_MASK; - if (operationRecPtr.p->operation == ZINSERT && - mlpqOperPtr.p->operation != ZDELETE){ + Uint32 retValue = ZSERIAL_QUEUE; // So that it gets blocked... + if (lstate == Operationrec::OP_STATE_EXECUTED) + { jam(); - return ZWRITE_ERROR; - }//if - if(operationRecPtr.p->operation == ZWRITE) - { - operationRecPtr.p->operation = - (mlpqOperPtr.p->operation == ZDELETE) ? ZINSERT : ZUPDATE; + /** + * Since last operation has executed...we can now check operation types + * if not, we have to wait until it has executed + */ + Uint32 op = opbits & Operationrec::OP_MASK; + Uint32 lop = lastbits & Operationrec::OP_MASK; + if (op == ZINSERT && lop != ZDELETE) + { + jam(); + return ZWRITE_ERROR; + }//if + + /** + * NOTE. No checking op operation types, as one can read different save + * points... + */ +#if 0 + if (lop == ZDELETE && (op != ZINSERT && op != ZWRITE)) + { + jam(); + return ZREAD_ERROR; + } +#else + if (lop == ZDELETE && (op == ZUPDATE && op == ZDELETE)) + { + jam(); + return ZREAD_ERROR; + } +#endif + + if(op == ZWRITE) + { + opbits &= ~(Uint32)Operationrec::OP_MASK; + opbits |= (lop == ZDELETE) ? ZINSERT : ZUPDATE; + } + + opbits |= Operationrec::OP_STATE_RUNNING; + operationRecPtr.p->localdata[0] = lastOpPtr.p->localdata[0]; + operationRecPtr.p->localdata[1] = lastOpPtr.p->localdata[1]; + retValue = ZPARALLEL_QUEUE; } - operationRecPtr.p->localdata[0] = queOperPtr.p->localdata[0]; - operationRecPtr.p->localdata[1] = queOperPtr.p->localdata[1]; - operationRecPtr.p->prevParallelQue = mlpqOperPtr.i; - mlpqOperPtr.p->nextParallelQue = operationRecPtr.i; - return ZPARALLEL_QUEUE; + opbits |= Operationrec::OP_RUN_QUEUE; + operationRecPtr.p->m_op_bits = opbits; + operationRecPtr.p->prevParallelQue = lastOpPtr.i; + operationRecPtr.p->m_lock_owner_ptr_i = lockOwnerPtr.i; + lastOpPtr.p->nextParallelQue = operationRecPtr.i; + lockOwnerPtr.p->m_lo_last_parallel_op_ptr_i = operationRecPtr.i; + + validate_lock_queue(lockOwnerPtr); + + return retValue; }//Dbacc::placeWriteInLockQueue() -/* --------------------------------------------------------------------------------- */ -/* WE HAVE TO PLACE IT SOMEWHERE IN THE SERIAL QUEUE INSTEAD. */ -/* --------------------------------------------------------------------------------- */ -void Dbacc::placeSerialQueueWrite(Signal* signal) +Uint32 +Dbacc::placeReadInLockQueue(OperationrecPtr lockOwnerPtr) { - readWriteOpPtr = queOperPtr; - PSQW_LOOP: - if (readWriteOpPtr.p->nextSerialQue == RNIL) { + OperationrecPtr lastOpPtr; + OperationrecPtr loopPtr = lockOwnerPtr; + lastOpPtr.i = lockOwnerPtr.p->m_lo_last_parallel_op_ptr_i; + Uint32 opbits = operationRecPtr.p->m_op_bits; + + if (lastOpPtr.i == RNIL) + { + lastOpPtr = lockOwnerPtr; + } + else + { + ptrCheckGuard(lastOpPtr, coprecsize, operationrec); + } + + ndbassert(get_parallel_head(lastOpPtr) == lockOwnerPtr.i); + + /** + * Last operation in parallell queue of lock owner is same trans + * and ACC_LOCK_MODE is exlusive, then we can proceed + */ + Uint32 lastbits = lastOpPtr.p->m_op_bits; + bool same = operationRecPtr.p->is_same_trans(lastOpPtr.p); + if (same && (lastbits & Operationrec::OP_ACC_LOCK_MODE)) + { jam(); - /* --------------------------------------------------------------------------------- */ - /* WE COULD NOT PUT IN ANY PARALLEL QUEUE. WE MUST PUT IT LAST IN THE SERIAL QUEUE. */ - /* --------------------------------------------------------------------------------- */ - readWriteOpPtr.p->nextSerialQue = operationRecPtr.i; - operationRecPtr.p->prevSerialQue = readWriteOpPtr.i; - return; - }//if - readWriteOpPtr.i = readWriteOpPtr.p->nextSerialQue; - ptrCheckGuard(readWriteOpPtr, coprecsize, operationrec); - if (getNoParallelTransaction(readWriteOpPtr.p) == 1) { - /* --------------------------------------------------------------------------------- */ - /* THERE WAS ONLY ONE TRANSACTION INVOLVED IN THE PARALLEL QUEUE. IF THIS IS OUR */ - /* TRANSACTION WE CAN STILL GET HOLD OF THE LOCK. */ - /* --------------------------------------------------------------------------------- */ - if ((readWriteOpPtr.p->transId1 == operationRecPtr.p->transId1) && - (readWriteOpPtr.p->transId2 == operationRecPtr.p->transId2)) { + goto checkop; + } + + if ((lastbits & Operationrec::OP_ACC_LOCK_MODE) && !same) + { + jam(); + /** + * Last op in serial queue had X-lock and was not our transaction... + */ + goto serial; + } + + if (lockOwnerPtr.p->nextSerialQue == RNIL) + { + jam(); + goto checkop; + } + + /** + * Scan parallell queue to see if we are already there... + */ + do + { + ptrCheckGuard(loopPtr, coprecsize, operationrec); + if (loopPtr.p->is_same_trans(operationRecPtr.p)) + goto checkop; + loopPtr.i = loopPtr.p->nextParallelQue; + } while (loopPtr.i != RNIL); + +serial: + placeSerialQueue(lockOwnerPtr, operationRecPtr); + + validate_lock_queue(lockOwnerPtr); + + return ZSERIAL_QUEUE; + +checkop: + Uint32 lstate = lastbits & Operationrec::OP_STATE_MASK; + + Uint32 retValue = ZSERIAL_QUEUE; // So that it gets blocked... + if (lstate == Operationrec::OP_STATE_EXECUTED) + { + jam(); + + /** + * NOTE. No checking op operation types, as one can read different save + * points... + */ + +#if 0 + /** + * Since last operation has executed...we can now check operation types + * if not, we have to wait until it has executed + */ + if (lop == ZDELETE) + { jam(); - /* --------------------------------------------------------------------------------- */ - /* WE ARE PERFORMING AN UPDATE OR DELETE IN THE SAME TRANSACTION WHERE WE ALREADY */ - /* PREVIOUSLY HAVE EXECUTED AN OPERATION. INSERT-DELETE, READ-UPDATE, READ-READ, */ - /* UPDATE-UPDATE, UPDATE-DELETE, READ-DELETE, INSERT-READ, INSERT-UPDATE ARE ALLOWED */ - /* COMBINATIONS. A NEW INSERT AFTER A DELETE IS NOT ALLOWED AND SUCH AN INSERT WILL */ - /* GO TO THE SERIAL LOCK QUEUE WHICH IT WILL NOT LEAVE UNTIL A TIME-OUT AND THE */ - /* TRANSACTION IS ABORTED. READS AND UPDATES AFTER DELETES IS ALSO NOT ALLOWED. */ - /* --------------------------------------------------------------------------------- */ - mlpqOperPtr = readWriteOpPtr; - moveLastParallelQueueWrite(signal); - readWriteOpPtr = mlpqOperPtr; - operationRecPtr.p->prevParallelQue = readWriteOpPtr.i; - readWriteOpPtr.p->nextParallelQue = operationRecPtr.i; - operationRecPtr.p->localdata[0] = readWriteOpPtr.p->localdata[0]; - operationRecPtr.p->localdata[1] = readWriteOpPtr.p->localdata[1]; - return; - }//if - }//if - goto PSQW_LOOP; -}//Dbacc::placeSerialQueueWrite() + return ZREAD_ERROR; + } +#endif + + opbits |= Operationrec::OP_STATE_RUNNING; + operationRecPtr.p->localdata[0] = lastOpPtr.p->localdata[0]; + operationRecPtr.p->localdata[1] = lastOpPtr.p->localdata[1]; + retValue = ZPARALLEL_QUEUE; + } + opbits |= (lastbits & Operationrec::OP_ACC_LOCK_MODE); + opbits |= Operationrec::OP_RUN_QUEUE; + operationRecPtr.p->m_op_bits = opbits; + + operationRecPtr.p->prevParallelQue = lastOpPtr.i; + operationRecPtr.p->m_lock_owner_ptr_i = lockOwnerPtr.i; + lastOpPtr.p->nextParallelQue = operationRecPtr.i; + lockOwnerPtr.p->m_lo_last_parallel_op_ptr_i = operationRecPtr.i; + + validate_lock_queue(lockOwnerPtr); + + return retValue; +}//Dbacc::placeReadInLockQueue + +void Dbacc::placeSerialQueue(OperationrecPtr lockOwnerPtr, + OperationrecPtr opPtr) +{ + OperationrecPtr lastOpPtr; + lastOpPtr.i = lockOwnerPtr.p->m_lo_last_serial_op_ptr_i; + + if (lastOpPtr.i == RNIL) + { + // Lock owner is last... + ndbrequire(lockOwnerPtr.p->nextSerialQue == RNIL); + lastOpPtr = lockOwnerPtr; + } + else + { + ptrCheckGuard(lastOpPtr, coprecsize, operationrec); + } + + operationRecPtr.p->prevSerialQue = lastOpPtr.i; + lastOpPtr.p->nextSerialQue = opPtr.i; + lockOwnerPtr.p->m_lo_last_serial_op_ptr_i = opPtr.i; +} /* ------------------------------------------------------------------------- */ /* ACC KEYREQ END */ /* ------------------------------------------------------------------------- */ void Dbacc::acckeyref1Lab(Signal* signal, Uint32 result_code) { - if (operationRecPtr.p->keyinfoPage != RNIL) { - jam(); - rpPageptr.i = operationRecPtr.p->keyinfoPage; - ptrCheckGuard(rpPageptr, cpagesize, page8); - releasePage(signal); - operationRecPtr.p->keyinfoPage = RNIL; - }//if - operationRecPtr.p->transactionstate = WAIT_COMMIT_ABORT; + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; /* ************************<< */ /* ACCKEYREF */ /* ************************<< */ @@ -1658,15 +2239,15 @@ void Dbacc::acckeyref1Lab(Signal* signal, Uint32 result_code) return; }//Dbacc::acckeyref1Lab() -/* ******************--------------------------------------------------------------- */ -/* ACCMINUPDATE UPDATE LOCAL KEY REQ */ -/* DESCRIPTION: UPDATES LOCAL KEY OF AN ELEMENTS IN THE HASH TABLE */ -/* THIS SIGNAL IS WAITED AFTER ANY INSERT REQ */ -/* ENTER ACCMINUPDATE WITH SENDER: LQH, LEVEL B */ -/* OPERATION_REC_PTR, OPERATION RECORD PTR */ -/* CLOCALKEY(0), LOCAL KEY 1 */ -/* CLOCALKEY(1) LOCAL KEY 2 */ -/* ******************--------------------------------------------------------------- */ +/* ******************----------------------------------------------------- */ +/* ACCMINUPDATE UPDATE LOCAL KEY REQ */ +/* DESCRIPTION: UPDATES LOCAL KEY OF AN ELEMENTS IN THE HASH TABLE */ +/* THIS SIGNAL IS WAITED AFTER ANY INSERT REQ */ +/* ENTER ACCMINUPDATE WITH SENDER: LQH, LEVEL B */ +/* OPERATION_REC_PTR, OPERATION RECORD PTR */ +/* CLOCALKEY(0), LOCAL KEY 1 */ +/* CLOCALKEY(1) LOCAL KEY 2 */ +/* ******************----------------------------------------------------- */ void Dbacc::execACCMINUPDATE(Signal* signal) { Page8Ptr ulkPageidptr; @@ -1679,19 +2260,26 @@ void Dbacc::execACCMINUPDATE(Signal* signal) tlocalkey1 = signal->theData[1]; tlocalkey2 = signal->theData[2]; ptrCheckGuard(operationRecPtr, coprecsize, operationrec); - if (operationRecPtr.p->transactionstate == ACTIVE) { - fragrecptr.i = operationRecPtr.p->fragptr; - ulkPageidptr.i = operationRecPtr.p->elementPage; - tulkLocalPtr = operationRecPtr.p->elementPointer + operationRecPtr.p->elementIsforward; + Uint32 opbits = operationRecPtr.p->m_op_bits; + fragrecptr.i = operationRecPtr.p->fragptr; + ulkPageidptr.i = operationRecPtr.p->elementPage; + tulkLocalPtr = operationRecPtr.p->elementPointer + + operationRecPtr.p->elementIsforward; + + if ((opbits & Operationrec::OP_STATE_MASK) == Operationrec::OP_STATE_RUNNING) + { ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); ptrCheckGuard(ulkPageidptr, cpagesize, page8); dbgWord32(ulkPageidptr, tulkLocalPtr, tlocalkey1); arrGuard(tulkLocalPtr, 2048); ulkPageidptr.p->word32[tulkLocalPtr] = tlocalkey1; operationRecPtr.p->localdata[0] = tlocalkey1; - if (fragrecptr.p->localkeylen == 1) { + if (likely(fragrecptr.p->localkeylen == 1)) + { return; - } else if (fragrecptr.p->localkeylen == 2) { + } + else if (fragrecptr.p->localkeylen == 2) + { jam(); tulkLocalPtr = tulkLocalPtr + operationRecPtr.p->elementIsforward; operationRecPtr.p->localdata[1] = tlocalkey2; @@ -1715,15 +2303,17 @@ void Dbacc::execACC_COMMITREQ(Signal* signal) { Uint8 Toperation; jamEntry(); - operationRecPtr.i = signal->theData[0]; + Uint32 tmp = operationRecPtr.i = signal->theData[0]; ptrCheckGuard(operationRecPtr, coprecsize, operationrec); - ndbrequire(operationRecPtr.p->transactionstate == ACTIVE); + void* ptr = operationRecPtr.p; + Uint32 opbits = operationRecPtr.p->m_op_bits; fragrecptr.i = operationRecPtr.p->fragptr; ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); + Toperation = opbits & Operationrec::OP_MASK; commitOperation(signal); - Toperation = operationRecPtr.p->operation; - operationRecPtr.p->transactionstate = IDLE; - operationRecPtr.p->operation = ZUNDEFINED_OP; + ndbassert(operationRecPtr.i == tmp); + ndbassert(operationRecPtr.p == ptr); + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; if(Toperation != ZREAD){ fragrecptr.p->m_commit_count++; if (Toperation != ZINSERT) { @@ -1732,7 +2322,7 @@ void Dbacc::execACC_COMMITREQ(Signal* signal) } else { jam(); fragrecptr.p->noOfElements--; - fragrecptr.p->slack += operationRecPtr.p->insertDeleteLen; + fragrecptr.p->slack += fragrecptr.p->elementLength; if (fragrecptr.p->slack > fragrecptr.p->slackCheck) { /* TIME FOR JOIN BUCKETS PROCESS */ if (fragrecptr.p->expandCounter > 0) { @@ -1751,7 +2341,7 @@ void Dbacc::execACC_COMMITREQ(Signal* signal) } else { jam(); /* EXPAND PROCESS HANDLING */ fragrecptr.p->noOfElements++; - fragrecptr.p->slack -= operationRecPtr.p->insertDeleteLen; + fragrecptr.p->slack -= fragrecptr.p->elementLength; if (fragrecptr.p->slack >= (1u << 31)) { /* IT MEANS THAT IF SLACK < ZERO */ if (fragrecptr.p->expandFlag == 0) { @@ -1768,45 +2358,57 @@ void Dbacc::execACC_COMMITREQ(Signal* signal) return; }//Dbacc::execACC_COMMITREQ() -/* ******************--------------------------------------------------------------- */ -/* ACC ABORT REQ ABORT ALL OPERATION OF THE TRANSACTION */ -/* ******************------------------------------+ */ -/* SENDER: LQH, LEVEL B */ -/* ******************--------------------------------------------------------------- */ -/* ACC ABORT REQ ABORT TRANSACTION */ -/* ******************------------------------------+ */ +/* ******************------------------------------------------------------- */ +/* ACC ABORT REQ ABORT ALL OPERATION OF THE TRANSACTION */ +/* ******************------------------------------+ */ +/* SENDER: LQH, LEVEL B */ +/* ******************------------------------------------------------------- */ +/* ACC ABORT REQ ABORT TRANSACTION */ +/* ******************------------------------------+ */ /* SENDER: LQH, LEVEL B */ void Dbacc::execACC_ABORTREQ(Signal* signal) { jamEntry(); - accAbortReqLab(signal); -}//Dbacc::execACC_ABORTREQ() - -void Dbacc::accAbortReqLab(Signal* signal) -{ operationRecPtr.i = signal->theData[0]; - bool sendConf = signal->theData[1]; + Uint32 sendConf = signal->theData[1]; ptrCheckGuard(operationRecPtr, coprecsize, operationrec); + fragrecptr.i = operationRecPtr.p->fragptr; + Uint32 opbits = operationRecPtr.p->m_op_bits; + Uint32 opstate = opbits & Operationrec::OP_STATE_MASK; tresult = 0; /* ZFALSE */ - if ((operationRecPtr.p->transactionstate == ACTIVE) || - (operationRecPtr.p->transactionstate == WAIT_COMMIT_ABORT)) { + + if (opbits == Operationrec::OP_EXECUTED_DIRTY_READ) + { + jam(); + } + else if (opstate == Operationrec::OP_STATE_EXECUTED || + opstate == Operationrec::OP_STATE_WAITING || + opstate == Operationrec::OP_STATE_RUNNING) + { jam(); - fragrecptr.i = operationRecPtr.p->fragptr; ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); - operationRecPtr.p->transactionstate = ABORT; abortOperation(signal); - } else { - ndbrequire(operationRecPtr.p->transactionstate == IDLE); - jam(); - }//if - operationRecPtr.p->transactionstate = IDLE; - operationRecPtr.p->operation = ZUNDEFINED_OP; - if (! sendConf) - return; + } + + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; + signal->theData[0] = operationRecPtr.p->userptr; - sendSignal(operationRecPtr.p->userblockref, GSN_ACC_ABORTCONF, signal, 1, JBB); - return; -}//Dbacc::accAbortReqLab() + signal->theData[1] = 0; + switch(sendConf){ + case 0: + return; + case 2: + if (opstate != Operationrec::OP_STATE_RUNNING) + { + return; + } + case 1: + sendSignal(operationRecPtr.p->userblockref, GSN_ACC_ABORTCONF, + signal, 1, JBB); + } + + signal->theData[1] = RNIL; +} /* * Lock or unlock tuple. @@ -1847,8 +2449,7 @@ void Dbacc::execACC_LOCKREQ(Signal* signal) // init as in ACCSEIZEREQ operationRecPtr.p->userptr = req->userPtr; operationRecPtr.p->userblockref = req->userRef; - operationRecPtr.p->operation = ZUNDEFINED_OP; - operationRecPtr.p->transactionstate = IDLE; + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; operationRecPtr.p->scanRecPtr = RNIL; // do read with lock via ACCKEYREQ Uint32 lockMode = (lockOp == AccLockReq::LockShared) ? 0 : 1; @@ -1899,8 +2500,8 @@ void Dbacc::execACC_LOCKREQ(Signal* signal) jam(); // do abort via ACC_ABORTREQ (immediate) signal->theData[0] = req->accOpPtr; - signal->theData[1] = false; // Dont send abort - accAbortReqLab(signal); + signal->theData[1] = 0; // Dont send abort + execACC_ABORTREQ(signal); releaseOpRec(signal); req->returnCode = AccLockReq::Success; *sig = *req; @@ -1910,8 +2511,8 @@ void Dbacc::execACC_LOCKREQ(Signal* signal) jam(); // do abort via ACC_ABORTREQ (with conf signal) signal->theData[0] = req->accOpPtr; - signal->theData[1] = true; // send abort - accAbortReqLab(signal); + signal->theData[1] = 1; // send abort + execACC_ABORTREQ(signal); releaseOpRec(signal); req->returnCode = AccLockReq::Success; *sig = *req; @@ -2570,16 +3171,30 @@ void Dbacc::getdirindex(Signal* signal) }//Dbacc::getdirindex() Uint32 -Dbacc::readTablePk(Uint32 localkey1) +Dbacc::readTablePk(Uint32 localkey1, Uint32 eh, const Operationrec* op) { + int ret; Uint32 tableId = fragrecptr.p->myTableId; Uint32 fragId = fragrecptr.p->myfid; - Uint32 fragPageId = localkey1 >> MAX_TUPLES_BITS; - Uint32 pageIndex = localkey1 & ((1 << MAX_TUPLES_BITS ) - 1); + bool xfrm = fragrecptr.p->hasCharAttr; + #ifdef VM_TRACE memset(ckeys, 0x1f, (fragrecptr.p->keyLength * MAX_XFRM_MULTIPLY) << 2); #endif - int ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, ckeys, true); + + if (likely(localkey1 != ~(Uint32)0)) + { + Uint32 fragPageId = localkey1 >> MAX_TUPLES_BITS; + Uint32 pageIndex = localkey1 & ((1 << MAX_TUPLES_BITS ) - 1); + ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, + ckeys, true); + } + else + { + ndbrequire(ElementHeader::getLocked(eh)); + ndbrequire((op->m_op_bits & Operationrec::OP_MASK) != ZSCAN_OP); + ret = c_lqh->readPrimaryKeys(op->userptr, ckeys, xfrm); + } jamEntry(); ndbrequire(ret >= 0); return ret; @@ -2613,11 +3228,12 @@ void ndb_acc_ia64_icc810_dummy_func() #endif #endif -void Dbacc::getElement(Signal* signal) +Uint32 +Dbacc::getElement(Signal* signal, OperationrecPtr& lockOwnerPtr) { + Uint32 errcode; DirRangePtr geOverflowrangeptr; DirectoryarrayPtr geOverflowDirptr; - OperationrecPtr geTmpOperationRecPtr; Uint32 tgeElementHeader; Uint32 tgeElemStep; Uint32 tgeContainerhead; @@ -2632,7 +3248,6 @@ void Dbacc::getElement(Signal* signal) getdirindex(signal); tgePageindex = tgdiPageindex; gePageptr = gdiPageptr; - tgeResult = ZFALSE; /* * The value seached is * - table key for ACCKEYREQ, stored in TUP @@ -2642,7 +3257,6 @@ void Dbacc::getElement(Signal* signal) ndbrequire(TelemLen == ZELEM_HEAD_SIZE + fragrecptr.p->localkeylen); tgeNextptrtype = ZLEFT; - tgeLocked = 0; const Uint32 tmp = fragrecptr.p->k + fragrecptr.p->lhfragbits; const Uint32 opHashValuePart = (operationRecPtr.p->hashValue >> tmp) &0xFFFF; @@ -2655,9 +3269,17 @@ void Dbacc::getElement(Signal* signal) tgeKeyptr = (tgeElementptr + ZELEM_HEAD_SIZE) + fragrecptr.p->localkeylen; tgeElemStep = TelemLen; tgeForward = 1; - if (tgeContainerptr >= 2048) { ACCKEY_error(4); return;} + if (unlikely(tgeContainerptr >= 2048)) + { + errcode = 4; + goto error; + } tgeRemLen = gePageptr.p->word32[tgeContainerptr] >> 26; - if ((tgeContainerptr + tgeRemLen - 1) >= 2048) { ACCKEY_error(5); return;} + if (unlikely(((tgeContainerptr + tgeRemLen - 1) >= 2048))) + { + errcode = 5; + goto error; + } } else if (tgeNextptrtype == ZRIGHT) { jam(); tgeContainerptr = tgeContainerptr + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE); @@ -2665,53 +3287,69 @@ void Dbacc::getElement(Signal* signal) tgeKeyptr = (tgeElementptr - ZELEM_HEAD_SIZE) - fragrecptr.p->localkeylen; tgeElemStep = 0 - TelemLen; tgeForward = (Uint32)-1; - if (tgeContainerptr >= 2048) { ACCKEY_error(4); return;} + if (unlikely(tgeContainerptr >= 2048)) + { + errcode = 4; + goto error; + } tgeRemLen = gePageptr.p->word32[tgeContainerptr] >> 26; - if ((tgeContainerptr - tgeRemLen) >= 2048) { ACCKEY_error(5); return;} + if (unlikely((tgeContainerptr - tgeRemLen) >= 2048)) + { + errcode = 5; + goto error; + } } else { - ACCKEY_error(6); return; + errcode = 6; + goto error; }//if if (tgeRemLen >= ZCON_HEAD_SIZE + TelemLen) { - if (tgeRemLen > ZBUF_SIZE) { - ACCKEY_error(7); return; + if (unlikely(tgeRemLen > ZBUF_SIZE)) + { + errcode = 7; + goto error; }//if - /* --------------------------------------------------------------------------------- */ - // There is at least one element in this container. Check if it is the element - // searched for. - /* --------------------------------------------------------------------------------- */ + /* ------------------------------------------------------------------- */ + // There is at least one element in this container. + // Check if it is the element searched for. + /* ------------------------------------------------------------------- */ do { tgeElementHeader = gePageptr.p->word32[tgeElementptr]; tgeRemLen = tgeRemLen - TelemLen; Uint32 hashValuePart; + Uint32 localkey1, localkey2; + lockOwnerPtr.i = RNIL; + lockOwnerPtr.p = NULL; if (ElementHeader::getLocked(tgeElementHeader)) { jam(); - geTmpOperationRecPtr.i = ElementHeader::getOpPtrI(tgeElementHeader); - ptrCheckGuard(geTmpOperationRecPtr, coprecsize, operationrec); - hashValuePart = geTmpOperationRecPtr.p->hashvaluePart; + lockOwnerPtr.i = ElementHeader::getOpPtrI(tgeElementHeader); + ptrCheckGuard(lockOwnerPtr, coprecsize, operationrec); + hashValuePart = lockOwnerPtr.p->hashvaluePart; + localkey1 = lockOwnerPtr.p->localdata[0]; + localkey2 = lockOwnerPtr.p->localdata[1]; } else { jam(); hashValuePart = ElementHeader::getHashValuePart(tgeElementHeader); + localkey1 = gePageptr.p->word32[tgeElementptr + tgeForward]; + localkey2 = 0; } if (hashValuePart == opHashValuePart) { jam(); - Uint32 localkey1 = gePageptr.p->word32[tgeElementptr + tgeForward]; - Uint32 localkey2 = 0; bool found; - if (! searchLocalKey) { - Uint32 len = readTablePk(localkey1); + if (! searchLocalKey) + { + Uint32 len = readTablePk(localkey1, tgeElementptr, lockOwnerPtr.p); found = (len == operationRecPtr.p->xfrmtupkeylen) && (memcmp(Tkeydata, ckeys, len << 2) == 0); } else { jam(); found = (localkey1 == Tkeydata[0]); } - if (found) { + if (found) + { jam(); - tgeLocked = ElementHeader::getLocked(tgeElementHeader); - tgeResult = ZTRUE; operationRecPtr.p->localdata[0] = localkey1; operationRecPtr.p->localdata[1] = localkey2; - return; + return ZTRUE; } } if (tgeRemLen <= ZCON_HEAD_SIZE) { @@ -2720,18 +3358,22 @@ void Dbacc::getElement(Signal* signal) tgeElementptr = tgeElementptr + tgeElemStep; } while (true); }//if - if (tgeRemLen != ZCON_HEAD_SIZE) { - ACCKEY_error(8); return; + if (unlikely(tgeRemLen != ZCON_HEAD_SIZE)) + { + errcode = 8; + goto error; }//if tgeContainerhead = gePageptr.p->word32[tgeContainerptr]; tgeNextptrtype = (tgeContainerhead >> 7) & 0x3; if (tgeNextptrtype == 0) { jam(); - return; /* NO MORE CONTAINER */ + return ZFALSE; /* NO MORE CONTAINER */ }//if tgePageindex = tgeContainerhead & 0x7f; /* NEXT CONTAINER PAGE INDEX 7 BITS */ - if (tgePageindex > ZEMPTYLIST) { - ACCKEY_error(9); return; + if (unlikely(tgePageindex > ZEMPTYLIST)) + { + errcode = 9; + goto error; }//if if (((tgeContainerhead >> 9) & 1) == ZFALSE) { jam(); @@ -2745,55 +3387,72 @@ void Dbacc::getElement(Signal* signal) ptrCheckGuard(gePageptr, cpagesize, page8); }//if } while (1); - return; + + return ZFALSE; + +error: + ACCKEY_error(errcode); + return ~0; }//Dbacc::getElement() -/* --------------------------------------------------------------------------------- */ -/* --------------------------------------------------------------------------------- */ -/* --------------------------------------------------------------------------------- */ -/* */ -/* END OF GET_ELEMENT MODULE */ -/* */ -/* --------------------------------------------------------------------------------- */ -/* --------------------------------------------------------------------------------- */ -/* --------------------------------------------------------------------------------- */ -/* --------------------------------------------------------------------------------- */ -/* --------------------------------------------------------------------------------- */ -/* */ -/* MODULE: DELETE */ -/* */ -/* --------------------------------------------------------------------------------- */ -/* --------------------------------------------------------------------------------- */ -/* --------------------------------------------------------------------------------- */ -/* COMMITDELETE */ -/* INPUT: OPERATION_REC_PTR, PTR TO AN OPERATION RECORD. */ -/* FRAGRECPTR, PTR TO A FRAGMENT RECORD */ -/* */ -/* OUTPUT: */ -/* NONE */ -/* DESCRIPTION: DELETE OPERATIONS WILL BE COMPLETED AT THE COMMIT OF TRANSA- */ -/* CTION. THIS SUBROUTINE SEARCHS FOR ELEMENT AND DELETES IT. IT DOES SO BY */ -/* REPLACING IT WITH THE LAST ELEMENT IN THE BUCKET. IF THE DELETED ELEMENT */ -/* IS ALSO THE LAST ELEMENT THEN IT IS ONLY NECESSARY TO REMOVE THE ELEMENT. */ -/* --------------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* */ +/* END OF GET_ELEMENT MODULE */ +/* */ +/* ------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* */ +/* MODULE: DELETE */ +/* */ +/* ------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* COMMITDELETE */ +/* INPUT: OPERATION_REC_PTR, PTR TO AN OPERATION RECORD. */ +/* FRAGRECPTR, PTR TO A FRAGMENT RECORD */ +/* */ +/* OUTPUT: */ +/* NONE */ +/* DESCRIPTION: DELETE OPERATIONS WILL BE COMPLETED AT THE + * COMMIT OF TRANSACTION. THIS SUBROUTINE SEARCHS FOR ELEMENT AND + * DELETES IT. IT DOES SO BY REPLACING IT WITH THE LAST + * ELEMENT IN THE BUCKET. IF THE DELETED ELEMENT IS ALSO THE LAST + * ELEMENT THEN IT IS ONLY NECESSARY TO REMOVE THE ELEMENT + * ------------------------------------------------------------------------- */ +void +Dbacc::report_dealloc(Signal* signal, const Operationrec* opPtrP) +{ + Uint32 localKey = opPtrP->localdata[0]; + Uint32 opbits = opPtrP->m_op_bits; + Uint32 userptr= opPtrP->userptr; + Uint32 scanInd = + ((opbits & Operationrec::OP_MASK) == ZSCAN_OP) || + (opbits & Operationrec::OP_LOCK_REQ); + + if (localKey != ~(Uint32)0) + { + signal->theData[0] = fragrecptr.p->myfid; + signal->theData[1] = fragrecptr.p->myTableId; + Uint32 pageId = localKey >> MAX_TUPLES_BITS; + Uint32 pageIndex = localKey & ((1 << MAX_TUPLES_BITS) - 1); + signal->theData[2] = pageId; + signal->theData[3] = pageIndex; + signal->theData[4] = userptr; + signal->theData[5] = scanInd; + EXECUTE_DIRECT(DBLQH, GSN_TUP_DEALLOCREQ, signal, 6); + jamEntry(); + } +} + void Dbacc::commitdelete(Signal* signal) { jam(); - Uint32 localKey = operationRecPtr.p->localdata[0]; - Uint32 userptr= operationRecPtr.p->userptr; - Uint32 scanInd = operationRecPtr.p->operation == ZSCAN_OP - || operationRecPtr.p->isAccLockReq; - - signal->theData[0] = fragrecptr.p->myfid; - signal->theData[1] = fragrecptr.p->myTableId; - Uint32 pageId = localKey >> MAX_TUPLES_BITS; - Uint32 pageIndex = localKey & ((1 << MAX_TUPLES_BITS) - 1); - signal->theData[2] = pageId; - signal->theData[3] = pageIndex; - signal->theData[4] = userptr; - signal->theData[5] = scanInd; - EXECUTE_DIRECT(DBLQH, GSN_TUP_DEALLOCREQ, signal, 6); - jamEntry(); + report_dealloc(signal, operationRecPtr.p); getdirindex(signal); tlastPageindex = tgdiPageindex; @@ -3282,31 +3941,337 @@ void Dbacc::checkoverfreelist(Signal* signal) /*BE CHECKED. THE OPERATION RECORD WILL BE REMOVED FROM THE QUEUE IF IT */ /*BELONGED TO ANY ONE, OTHERWISE THE ELEMENT HEAD WILL BE UPDATED. */ /* ------------------------------------------------------------------------- */ + +/** + * + * P0 - P1 - P2 - P3 + * S0 + * S1 + * S2 + */ +void +Dbacc::abortParallelQueueOperation(Signal* signal, OperationrecPtr opPtr) +{ + jam(); + OperationrecPtr nextP; + OperationrecPtr prevP; + OperationrecPtr loPtr; + + Uint32 opbits = opPtr.p->m_op_bits; + Uint32 opstate = opbits & Operationrec::OP_STATE_MASK; + nextP.i = opPtr.p->nextParallelQue; + prevP.i = opPtr.p->prevParallelQue; + loPtr.i = opPtr.p->m_lock_owner_ptr_i; + + ndbassert(! (opbits & Operationrec::OP_LOCK_OWNER)); + ndbassert(opbits & Operationrec::OP_RUN_QUEUE); + + ptrCheckGuard(prevP, coprecsize, operationrec); + ndbassert(prevP.p->nextParallelQue == opPtr.i); + prevP.p->nextParallelQue = nextP.i; + + if (nextP.i != RNIL) + { + ptrCheckGuard(nextP, coprecsize, operationrec); + ndbassert(nextP.p->prevParallelQue == opPtr.i); + nextP.p->prevParallelQue = prevP.i; + } + else if (prevP.i != loPtr.i) + { + jam(); + ptrCheckGuard(loPtr, coprecsize, operationrec); + ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER); + ndbassert(loPtr.p->m_lo_last_parallel_op_ptr_i == opPtr.i); + loPtr.p->m_lo_last_parallel_op_ptr_i = prevP.i; + prevP.p->m_lock_owner_ptr_i = loPtr.i; + + /** + * Abort P3...check start next + */ + startNext(signal, prevP); + validate_lock_queue(prevP); + return; + } + else + { + jam(); + /** + * P0 - P1 + * + * Abort P1, check start next + */ + ndbassert(prevP.p->m_op_bits & Operationrec::OP_LOCK_OWNER); + prevP.p->m_lo_last_parallel_op_ptr_i = RNIL; + startNext(signal, prevP); + validate_lock_queue(prevP); + return; + } + + /** + * Abort P1/P2 + */ + if (opbits & Operationrec::OP_LOCK_MODE) + { + Uint32 nextbits = nextP.p->m_op_bits; + while ((nextbits & Operationrec::OP_LOCK_MODE) == 0) + { + ndbassert(nextbits & Operationrec::OP_ACC_LOCK_MODE); + nextbits &= ~(Uint32)Operationrec::OP_ACC_LOCK_MODE; + nextP.p->m_op_bits = nextbits; + + if (nextP.p->nextParallelQue != RNIL) + { + nextP.i = nextP.p->nextParallelQue; + ptrCheckGuard(nextP, coprecsize, operationrec); + nextbits = nextP.p->m_op_bits; + } + else + { + break; + } + } + } + + /** + * Abort P1, P2 + */ + if (opstate == Operationrec::OP_STATE_RUNNING) + { + jam(); + startNext(signal, prevP); + validate_lock_queue(prevP); + return; + } + + ndbassert(opstate == Operationrec::OP_STATE_EXECUTED || + opstate == Operationrec::OP_STATE_WAITING); + + /** + * Scan to last of run queue + */ + while (nextP.p->nextParallelQue != RNIL) + { + jam(); + nextP.i = nextP.p->nextParallelQue; + ptrCheckGuard(nextP, coprecsize, operationrec); + } + +#ifdef VM_TRACE + loPtr.i = nextP.p->m_lock_owner_ptr_i; + ptrCheckGuard(loPtr, coprecsize, operationrec); + ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER); + ndbassert(loPtr.p->m_lo_last_parallel_op_ptr_i == nextP.i); +#endif + startNext(signal, nextP); + validate_lock_queue(nextP); + + return; +} + +void +Dbacc::abortSerieQueueOperation(Signal* signal, OperationrecPtr opPtr) +{ + jam(); + OperationrecPtr prevS, nextS; + OperationrecPtr prevP, nextP; + OperationrecPtr loPtr; + + Uint32 opbits = opPtr.p->m_op_bits; + + prevS.i = opPtr.p->prevSerialQue; + nextS.i = opPtr.p->nextSerialQue; + + prevP.i = opPtr.p->prevParallelQue; + nextP.i = opPtr.p->nextParallelQue; + + ndbassert((opbits & Operationrec::OP_LOCK_OWNER) == 0); + ndbassert((opbits & Operationrec::OP_RUN_QUEUE) == 0); + + if (prevP.i != RNIL) + { + /** + * We're not list head... + */ + ptrCheckGuard(prevP, coprecsize, operationrec); + ndbassert(prevP.p->nextParallelQue == opPtr.i); + prevP.p->nextParallelQue = nextP.i; + + if (nextP.i != RNIL) + { + ptrCheckGuard(nextP, coprecsize, operationrec); + ndbassert(nextP.p->prevParallelQue == opPtr.i); + ndbassert((nextP.p->m_op_bits & Operationrec::OP_STATE_MASK) == + Operationrec::OP_STATE_WAITING); + nextP.p->prevParallelQue = prevP.i; + + if ((prevP.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE) == 0 && + opbits & Operationrec::OP_LOCK_MODE) + { + /** + * Scan right in parallel queue to fix OP_ACC_LOCK_MODE + */ + while ((nextP.p->m_op_bits & Operationrec::OP_LOCK_MODE) == 0) + { + ndbassert(nextP.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE); + nextP.p->m_op_bits &= ~(Uint32)Operationrec::OP_ACC_LOCK_MODE; + nextP.i = nextP.p->nextParallelQue; + if (nextP.i == RNIL) + break; + ptrCheckGuard(nextP, coprecsize, operationrec); + } + } + } + validate_lock_queue(prevP); + return; + } + else + { + /** + * We're a list head + */ + ptrCheckGuard(prevS, coprecsize, operationrec); + ndbassert(prevS.p->nextSerialQue == opPtr.i); + + if (nextP.i != RNIL) + { + /** + * Promote nextP to list head + */ + ptrCheckGuard(nextP, coprecsize, operationrec); + ndbassert(nextP.p->prevParallelQue == opPtr.i); + prevS.p->nextSerialQue = nextP.i; + nextP.p->prevParallelQue = RNIL; + nextP.p->nextSerialQue = nextS.i; + if (nextS.i != RNIL) + { + jam(); + ptrCheckGuard(nextS, coprecsize, operationrec); + ndbassert(nextS.p->prevSerialQue == opPtr.i); + nextS.p->prevSerialQue = nextP.i; + validate_lock_queue(prevS); + return; + } + else + { + // nextS is RNIL, i.e we're last in serie queue... + // we must update lockOwner.m_lo_last_serial_op_ptr_i + loPtr = prevS; + while ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0) + { + loPtr.i = loPtr.p->prevSerialQue; + ptrCheckGuard(loPtr, coprecsize, operationrec); + } + ndbassert(loPtr.p->m_lo_last_serial_op_ptr_i == opPtr.i); + loPtr.p->m_lo_last_serial_op_ptr_i = nextP.i; + validate_lock_queue(loPtr); + return; + } + } + + if (nextS.i == RNIL) + { + /** + * Abort S2 + */ + + // nextS is RNIL, i.e we're last in serie queue... + // and we have no parallel queue, + // we must update lockOwner.m_lo_last_serial_op_ptr_i + prevS.p->nextSerialQue = RNIL; + + loPtr = prevS; + while ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0) + { + loPtr.i = loPtr.p->prevSerialQue; + ptrCheckGuard(loPtr, coprecsize, operationrec); + } + ndbassert(loPtr.p->m_lo_last_serial_op_ptr_i == opPtr.i); + if (prevS.i != loPtr.i) + { + jam(); + loPtr.p->m_lo_last_serial_op_ptr_i = prevS.i; + } + else + { + loPtr.p->m_lo_last_serial_op_ptr_i = RNIL; + } + validate_lock_queue(loPtr); + } + else if (nextP.i == RNIL) + { + ptrCheckGuard(nextS, coprecsize, operationrec); + ndbassert(nextS.p->prevSerialQue == opPtr.i); + prevS.p->nextSerialQue = nextS.i; + nextS.p->prevSerialQue = prevS.i; + + if (prevS.p->m_op_bits & Operationrec::OP_LOCK_OWNER) + { + /** + * Abort S0 + */ + OperationrecPtr lastOp; + lastOp.i = prevS.p->m_lo_last_parallel_op_ptr_i; + if (lastOp.i != RNIL) + { + jam(); + ptrCheckGuard(lastOp, coprecsize, operationrec); + ndbassert(lastOp.p->m_lock_owner_ptr_i = prevS.i); + } + else + { + jam(); + lastOp = prevS; + } + startNext(signal, lastOp); + validate_lock_queue(lastOp); + } + else + { + validate_lock_queue(prevS); + } + } + } +} + + void Dbacc::abortOperation(Signal* signal) { - OperationrecPtr aboOperRecPtr; - OperationrecPtr TaboOperRecPtr; - Page8Ptr aboPageidptr; - Uint32 taboElementptr; - Uint32 tmp2Olq; + Uint32 opbits = operationRecPtr.p->m_op_bits; + + validate_lock_queue(operationRecPtr); - if (operationRecPtr.p->lockOwner == ZTRUE) { + if (opbits & Operationrec::OP_LOCK_OWNER) + { takeOutLockOwnersList(signal, operationRecPtr); - if (operationRecPtr.p->insertIsDone == ZTRUE) { + opbits &= ~(Uint32)Operationrec::OP_LOCK_OWNER; + if (opbits & Operationrec::OP_INSERT_IS_DONE) + { jam(); - operationRecPtr.p->elementIsDisappeared = ZTRUE; + opbits |= Operationrec::OP_ELEMENT_DISAPPEARED; }//if - if ((operationRecPtr.p->nextParallelQue != RNIL) || - (operationRecPtr.p->nextSerialQue != RNIL)) { + operationRecPtr.p->m_op_bits = opbits; + const bool queue = (operationRecPtr.p->nextParallelQue != RNIL || + operationRecPtr.p->nextSerialQue != RNIL); + + if (queue) + { jam(); - releaselock(signal); - } else { - /* --------------------------------------------------------------------------------- */ - /* WE ARE OWNER OF THE LOCK AND NO OTHER OPERATIONS ARE QUEUED. IF INSERT OR STANDBY */ - /* WE DELETE THE ELEMENT OTHERWISE WE REMOVE THE LOCK FROM THE ELEMENT. */ - /* --------------------------------------------------------------------------------- */ - if (operationRecPtr.p->elementIsDisappeared == ZFALSE) { + release_lockowner(signal, operationRecPtr, false); + } + else + { + /* ------------------------------------------------------------------- + * WE ARE OWNER OF THE LOCK AND NO OTHER OPERATIONS ARE QUEUED. + * IF INSERT OR STANDBY WE DELETE THE ELEMENT OTHERWISE WE REMOVE + * THE LOCK FROM THE ELEMENT. + * ------------------------------------------------------------------ */ + if ((opbits & Operationrec::OP_ELEMENT_DISAPPEARED) == 0) + { jam(); + Page8Ptr aboPageidptr; + Uint32 taboElementptr; + Uint32 tmp2Olq; + taboElementptr = operationRecPtr.p->elementPointer; aboPageidptr.i = operationRecPtr.p->elementPage; tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->hashvaluePart, @@ -3316,87 +4281,31 @@ void Dbacc::abortOperation(Signal* signal) arrGuard(taboElementptr, 2048); aboPageidptr.p->word32[taboElementptr] = tmp2Olq; return; - } else { + } + else + { jam(); commitdelete(signal); }//if }//if - } else { - /* --------------------------------------------------------------- */ - // We are not the lock owner. - /* --------------------------------------------------------------- */ - jam(); - if (operationRecPtr.p->prevParallelQue != RNIL) { - jam(); - /* ---------------------------------------------------------------------------------- */ - /* SINCE WE ARE NOT QUEUE LEADER WE NEED NOT CONSIDER IF THE ELEMENT IS TO BE DELETED.*/ - /* We will simply remove it from the parallel list without any other rearrangements. */ - /* ---------------------------------------------------------------------------------- */ - aboOperRecPtr.i = operationRecPtr.p->prevParallelQue; - ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec); - aboOperRecPtr.p->nextParallelQue = operationRecPtr.p->nextParallelQue; - if (operationRecPtr.p->nextParallelQue != RNIL) { - jam(); - aboOperRecPtr.i = operationRecPtr.p->nextParallelQue; - ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec); - aboOperRecPtr.p->prevParallelQue = operationRecPtr.p->prevParallelQue; - }//if - } else if (operationRecPtr.p->prevSerialQue != RNIL) { - /* ------------------------------------------------------------------------- */ - // We are not in the parallel queue owning the lock. Thus we are in another parallel - // queue longer down in the serial queue. We are however first since prevParallelQue - // == RNIL. - /* ------------------------------------------------------------------------- */ - if (operationRecPtr.p->nextParallelQue != RNIL) { - jam(); - /* ------------------------------------------------------------------------- */ - // We have an operation in the queue after us. We simply rearrange this parallel queue. - // The new leader of this parallel queue will be operation in the serial queue. - /* ------------------------------------------------------------------------- */ - aboOperRecPtr.i = operationRecPtr.p->nextParallelQue; - ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec); - aboOperRecPtr.p->nextSerialQue = operationRecPtr.p->nextSerialQue; - aboOperRecPtr.p->prevSerialQue = operationRecPtr.p->prevSerialQue; - aboOperRecPtr.p->prevParallelQue = RNIL; // Queue Leader - if (operationRecPtr.p->nextSerialQue != RNIL) { - jam(); - TaboOperRecPtr.i = operationRecPtr.p->nextSerialQue; - ptrCheckGuard(TaboOperRecPtr, coprecsize, operationrec); - TaboOperRecPtr.p->prevSerialQue = aboOperRecPtr.i; - }//if - TaboOperRecPtr.i = operationRecPtr.p->prevSerialQue; - ptrCheckGuard(TaboOperRecPtr, coprecsize, operationrec); - TaboOperRecPtr.p->nextSerialQue = aboOperRecPtr.i; - } else { - jam(); - /* ------------------------------------------------------------------------- */ - // We are the only operation in this parallel queue. We will thus shrink the serial - // queue. - /* ------------------------------------------------------------------------- */ - aboOperRecPtr.i = operationRecPtr.p->prevSerialQue; - ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec); - aboOperRecPtr.p->nextSerialQue = operationRecPtr.p->nextSerialQue; - if (operationRecPtr.p->nextSerialQue != RNIL) { - jam(); - aboOperRecPtr.i = operationRecPtr.p->nextSerialQue; - ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec); - aboOperRecPtr.p->prevSerialQue = operationRecPtr.p->prevSerialQue; - }//if - }//if - }//if - }//if - /* ------------------------------------------------------------------------- */ - // If prevParallelQue = RNIL and prevSerialQue = RNIL and we are not owner of the - // lock then we cannot be in any lock queue at all. - /* ------------------------------------------------------------------------- */ -}//Dbacc::abortOperation() + } + else if (opbits & Operationrec::OP_RUN_QUEUE) + { + abortParallelQueueOperation(signal, operationRecPtr); + } + else + { + abortSerieQueueOperation(signal, operationRecPtr); + } +} -void Dbacc::commitDeleteCheck() +void +Dbacc::commitDeleteCheck() { OperationrecPtr opPtr; OperationrecPtr lastOpPtr; OperationrecPtr deleteOpPtr; - bool elementDeleted = false; + Uint32 elementDeleted = 0; bool deleteCheckOngoing = true; Uint32 hashValue = 0; lastOpPtr = operationRecPtr; @@ -3409,7 +4318,9 @@ void Dbacc::commitDeleteCheck() }//while deleteOpPtr = lastOpPtr; do { - if (deleteOpPtr.p->operation == ZDELETE) { + Uint32 opbits = deleteOpPtr.p->m_op_bits; + Uint32 op = opbits & Operationrec::OP_MASK; + if (op == ZDELETE) { jam(); /* ------------------------------------------------------------------- * IF THE CURRENT OPERATION TO BE COMMITTED IS A DELETE OPERATION DUE TO @@ -3423,10 +4334,9 @@ void Dbacc::commitDeleteCheck() * DELETE-OPERATION THAT HAS A HASH VALUE. * ----------------------------------------------------------------- */ hashValue = deleteOpPtr.p->hashValue; - elementDeleted = true; + elementDeleted = Operationrec::OP_ELEMENT_DISAPPEARED; deleteCheckOngoing = false; - } else if ((deleteOpPtr.p->operation == ZREAD) || - (deleteOpPtr.p->operation == ZSCAN_OP)) { + } else if (op == ZREAD || op == ZSCAN_OP) { /* ------------------------------------------------------------------- * We are trying to find out whether the commit will in the end delete * the tuple. Normally the delete will be the last operation in the @@ -3437,7 +4347,7 @@ void Dbacc::commitDeleteCheck() * we have to continue scanning the list looking for a delete operation. */ deleteOpPtr.i = deleteOpPtr.p->prevParallelQue; - if (deleteOpPtr.i == RNIL) { + if (opbits & Operationrec::OP_LOCK_OWNER) { jam(); deleteCheckOngoing = false; } else { @@ -3456,14 +4366,14 @@ void Dbacc::commitDeleteCheck() opPtr = lastOpPtr; do { jam(); - opPtr.p->commitDeleteCheckFlag = ZTRUE; + opPtr.p->m_op_bits |= Operationrec::OP_COMMIT_DELETE_CHECK; if (elementDeleted) { jam(); - opPtr.p->elementIsDisappeared = ZTRUE; + opPtr.p->m_op_bits |= elementDeleted; opPtr.p->hashValue = hashValue; }//if opPtr.i = opPtr.p->prevParallelQue; - if (opPtr.i == RNIL) { + if (opPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) { jam(); break; }//if @@ -3479,14 +4389,13 @@ void Dbacc::commitDeleteCheck() /* ------------------------------------------------------------------------- */ void Dbacc::commitOperation(Signal* signal) { - OperationrecPtr tolqTmpPtr; - Page8Ptr coPageidptr; - Uint32 tcoElementptr; - Uint32 tmp2Olq; + validate_lock_queue(operationRecPtr); - if ((operationRecPtr.p->commitDeleteCheckFlag == ZFALSE) && - (operationRecPtr.p->operation != ZSCAN_OP) && - (operationRecPtr.p->operation != ZREAD)) { + Uint32 opbits = operationRecPtr.p->m_op_bits; + Uint32 op = opbits & Operationrec::OP_MASK; + ndbrequire((opbits & Operationrec::OP_STATE_MASK) == Operationrec::OP_STATE_EXECUTED); + if ((opbits & Operationrec::OP_COMMIT_DELETE_CHECK) == 0 && (op != ZREAD)) + { jam(); /* This method is used to check whether the end result of the transaction will be to delete the tuple. In this case all operation will be marked @@ -3498,16 +4407,30 @@ void Dbacc::commitOperation(Signal* signal) lock is released. */ commitDeleteCheck(); + opbits = operationRecPtr.p->m_op_bits; }//if - if (operationRecPtr.p->lockOwner == ZTRUE) { + + ndbassert(opbits & Operationrec::OP_RUN_QUEUE); + + if (opbits & Operationrec::OP_LOCK_OWNER) + { takeOutLockOwnersList(signal, operationRecPtr); - if ((operationRecPtr.p->nextParallelQue == RNIL) && - (operationRecPtr.p->nextSerialQue == RNIL) && - (operationRecPtr.p->elementIsDisappeared == ZFALSE)) { + opbits &= ~(Uint32)Operationrec::OP_LOCK_OWNER; + operationRecPtr.p->m_op_bits = opbits; + + const bool queue = (operationRecPtr.p->nextParallelQue != RNIL || + operationRecPtr.p->nextSerialQue != RNIL); + + if (!queue && (opbits & Operationrec::OP_ELEMENT_DISAPPEARED) == 0) + { /* - This is the normal path through the commit for operations owning the - lock without any queues and not a delete operation. - */ + * This is the normal path through the commit for operations owning the + * lock without any queues and not a delete operation. + */ + Page8Ptr coPageidptr; + Uint32 tcoElementptr; + Uint32 tmp2Olq; + coPageidptr.i = operationRecPtr.p->elementPage; tcoElementptr = operationRecPtr.p->elementPointer; tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->hashvaluePart, @@ -3517,445 +4440,439 @@ void Dbacc::commitOperation(Signal* signal) arrGuard(tcoElementptr, 2048); coPageidptr.p->word32[tcoElementptr] = tmp2Olq; return; - } else if ((operationRecPtr.p->nextParallelQue != RNIL) || - (operationRecPtr.p->nextSerialQue != RNIL)) { + } + else if (queue) + { jam(); /* - The case when there is a queue lined up. - Release the lock and pass it to the next operation lined up. - */ - releaselock(signal); + * The case when there is a queue lined up. + * Release the lock and pass it to the next operation lined up. + */ + release_lockowner(signal, operationRecPtr, true); return; - } else { + } + else + { jam(); /* - No queue and elementIsDisappeared is true. We perform the actual delete - operation. - */ + * No queue and elementIsDisappeared is true. + * We perform the actual delete operation. + */ commitdelete(signal); return; }//if - } else { - /* - THE OPERATION DOES NOT OWN THE LOCK. IT MUST BE IN A LOCK QUEUE OF THE - ELEMENT. - */ - ndbrequire(operationRecPtr.p->prevParallelQue != RNIL); + } + else + { + /** + * THE OPERATION DOES NOT OWN THE LOCK. IT MUST BE IN A LOCK QUEUE OF THE + * ELEMENT. + */ jam(); - tolqTmpPtr.i = operationRecPtr.p->prevParallelQue; - ptrCheckGuard(tolqTmpPtr, coprecsize, operationrec); - tolqTmpPtr.p->nextParallelQue = operationRecPtr.p->nextParallelQue; - if (operationRecPtr.p->nextParallelQue != RNIL) { + OperationrecPtr prev, next, lockOwner; + prev.i = operationRecPtr.p->prevParallelQue; + next.i = operationRecPtr.p->nextParallelQue; + lockOwner.i = operationRecPtr.p->m_lock_owner_ptr_i; + ptrCheckGuard(prev, coprecsize, operationrec); + + prev.p->nextParallelQue = next.i; + if (next.i != RNIL) + { jam(); - tolqTmpPtr.i = operationRecPtr.p->nextParallelQue; - ptrCheckGuard(tolqTmpPtr, coprecsize, operationrec); - tolqTmpPtr.p->prevParallelQue = operationRecPtr.p->prevParallelQue; - }//if - + ptrCheckGuard(next, coprecsize, operationrec); + next.p->prevParallelQue = prev.i; + } + else if (prev.p->m_op_bits & Operationrec::OP_LOCK_OWNER) + { + jam(); + ndbassert(lockOwner.i == prev.i); + prev.p->m_lo_last_parallel_op_ptr_i = RNIL; + next = prev; + } + else + { + jam(); + /** + * Last operation in parallell queue + */ + ndbassert(prev.i != lockOwner.i); + ptrCheckGuard(lockOwner, coprecsize, operationrec); + ndbassert(lockOwner.p->m_op_bits & Operationrec::OP_LOCK_OWNER); + lockOwner.p->m_lo_last_parallel_op_ptr_i = prev.i; + prev.p->m_lock_owner_ptr_i = lockOwner.i; + next = prev; + } + /** * Check possible lock upgrade - * 1) Find lock owner - * 2) Count transactions in parallel que - * 3) If count == 1 and TRANSID(next serial) == TRANSID(lock owner) - * upgrade next serial */ - if(operationRecPtr.p->lockMode) + if(opbits & Operationrec::OP_ACC_LOCK_MODE) { jam(); + /** - * Committing a non shared operation can't lead to lock upgrade + * Not lock owner...committing a exclusive operation... + * + * e.g + * T1(R) T1(X) + * T2(R/X) + * + * If T1(X) commits T2(R/X) is not supposed to run + * as T1(R) should also commit + * + * e.g + * T1(R) T1(X) T1*(R) + * T2(R/X) + * + * If T1*(R) commits T2(R/X) is not supposed to run + * as T1(R),T2(x) should also commit */ + validate_lock_queue(prev); return; } - - OperationrecPtr lock_owner; - lock_owner.i = operationRecPtr.p->prevParallelQue; - ptrCheckGuard(lock_owner, coprecsize, operationrec); - Uint32 transid[2] = { lock_owner.p->transId1, - lock_owner.p->transId2 }; - - - while(lock_owner.p->prevParallelQue != RNIL) + + /** + * We committed a shared lock + * Check if we can start next... + */ + while(next.p->nextParallelQue != RNIL) { - lock_owner.i = lock_owner.p->prevParallelQue; - ptrCheckGuard(lock_owner, coprecsize, operationrec); + jam(); + next.i = next.p->nextParallelQue; + ptrCheckGuard(next, coprecsize, operationrec); - if(lock_owner.p->transId1 != transid[0] || - lock_owner.p->transId2 != transid[1]) + if ((next.p->m_op_bits & Operationrec::OP_STATE_MASK) != + Operationrec::OP_STATE_EXECUTED) { jam(); - /** - * If more than 1 trans in lock queue -> no lock upgrade - */ return; } } - check_lock_upgrade(signal, lock_owner, operationRecPtr); + startNext(signal, next); + + validate_lock_queue(prev); } }//Dbacc::commitOperation() -void -Dbacc::check_lock_upgrade(Signal* signal, - OperationrecPtr lock_owner, - OperationrecPtr release_op) -{ - if((lock_owner.p->transId1 == release_op.p->transId1 && - lock_owner.p->transId2 == release_op.p->transId2) || - release_op.p->lockMode || - lock_owner.p->nextSerialQue == RNIL) - { - jam(); - /** - * No lock upgrade if same trans or lock owner has no serial queue - * or releasing non shared op - */ - return; - } - - OperationrecPtr next; - next.i = lock_owner.p->nextSerialQue; - ptrCheckGuard(next, coprecsize, operationrec); +void +Dbacc::release_lockowner(Signal* signal, OperationrecPtr opPtr, bool commit) +{ + OperationrecPtr nextP; + OperationrecPtr nextS; + OperationrecPtr newOwner; + OperationrecPtr lastP; - if(lock_owner.p->transId1 != next.p->transId1 || - lock_owner.p->transId2 != next.p->transId2) + Uint32 opbits = opPtr.p->m_op_bits; + nextP.i = opPtr.p->nextParallelQue; + nextS.i = opPtr.p->nextSerialQue; + lastP.i = opPtr.p->m_lo_last_parallel_op_ptr_i; + Uint32 lastS = opPtr.p->m_lo_last_serial_op_ptr_i; + + ndbassert(lastP.i != RNIL || lastS != RNIL); + ndbassert(nextP.i != RNIL || nextS.i != RNIL); + + enum { + NOTHING, + CHECK_LOCK_UPGRADE, + START_NEW + } action = NOTHING; + + if (nextP.i != RNIL) { jam(); - /** - * No lock upgrad if !same trans in serial queue - */ - return; + ptrCheckGuard(nextP, coprecsize, operationrec); + newOwner = nextP; + + if (lastP.i == newOwner.i) + { + newOwner.p->m_lo_last_parallel_op_ptr_i = RNIL; + lastP = nextP; + } + else + { + ptrCheckGuard(lastP, coprecsize, operationrec); + newOwner.p->m_lo_last_parallel_op_ptr_i = lastP.i; + lastP.p->m_lock_owner_ptr_i = newOwner.i; + } + + newOwner.p->m_lo_last_serial_op_ptr_i = lastS; + newOwner.p->nextSerialQue = nextS.i; + + if (nextS.i != RNIL) + { + jam(); + ptrCheckGuard(nextS, coprecsize, operationrec); + ndbassert(nextS.p->prevSerialQue == opPtr.i); + nextS.p->prevSerialQue = newOwner.i; + } + + if (commit) + { + if ((opbits & Operationrec::OP_ACC_LOCK_MODE) == ZREADLOCK) + { + jam(); + /** + * Lock owner...committing a shared operation... + * this can be a lock upgrade + * + * e.g + * T1(R) T2(R) + * T2(X) + * + * If T1(R) commits T2(X) is supposed to run + * + * e.g + * T1(X) T1(R) + * T2(R) + * + * If T1(X) commits, then T1(R) _should_ commit before T2(R) is + * allowed to proceed + */ + action = CHECK_LOCK_UPGRADE; + } + else + { + jam(); + newOwner.p->m_op_bits |= Operationrec::OP_LOCK_MODE; + } + } + else + { + /** + * Aborting an operation can *always* lead to lock upgrade + */ + action = CHECK_LOCK_UPGRADE; + Uint32 opstate = opbits & Operationrec::OP_STATE_MASK; + if (opstate != Operationrec::OP_STATE_EXECUTED) + { + ndbassert(opstate == Operationrec::OP_STATE_RUNNING); + if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED) + { + jam(); + report_dealloc(signal, opPtr.p); + newOwner.p->localdata[0] = ~(Uint32)0; + } + else + { + jam(); + newOwner.p->localdata[0] = opPtr.p->localdata[0]; + newOwner.p->localdata[1] = opPtr.p->localdata[1]; + } + action = START_NEW; + } + + /** + * Update ACC_LOCK_MODE + */ + if (opbits & Operationrec::OP_LOCK_MODE) + { + Uint32 nextbits = nextP.p->m_op_bits; + while ((nextbits & Operationrec::OP_LOCK_MODE) == 0) + { + ndbassert(nextbits & Operationrec::OP_ACC_LOCK_MODE); + nextbits &= ~(Uint32)Operationrec::OP_ACC_LOCK_MODE; + nextP.p->m_op_bits = nextbits; + + if (nextP.p->nextParallelQue != RNIL) + { + nextP.i = nextP.p->nextParallelQue; + ptrCheckGuard(nextP, coprecsize, operationrec); + nextbits = nextP.p->m_op_bits; + } + else + { + break; + } + } + } + } } - - if (getNoParallelTransaction(lock_owner.p) > 1) + else { jam(); - /** - * No lock upgrade if more than 1 transaction in parallell queue - */ - return; - } + ptrCheckGuard(nextS, coprecsize, operationrec); + newOwner = nextS; + + newOwner.p->m_op_bits |= Operationrec::OP_RUN_QUEUE; + + if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED) + { + report_dealloc(signal, opPtr.p); + newOwner.p->localdata[0] = ~(Uint32)0; + } + else + { + jam(); + newOwner.p->localdata[0] = opPtr.p->localdata[0]; + newOwner.p->localdata[1] = opPtr.p->localdata[1]; + } + + lastP = newOwner; + while (lastP.p->nextParallelQue != RNIL) + { + lastP.i = lastP.p->nextParallelQue; + ptrCheckGuard(lastP, coprecsize, operationrec); + lastP.p->m_op_bits |= Operationrec::OP_RUN_QUEUE; + } + + if (newOwner.i != lastP.i) + { + jam(); + newOwner.p->m_lo_last_parallel_op_ptr_i = lastP.i; + } + else + { + jam(); + newOwner.p->m_lo_last_parallel_op_ptr_i = RNIL; + } - if (getNoParallelTransaction(next.p) > 1) - { - jam(); - /** - * No lock upgrade if more than 1 transaction in next's parallell queue - */ - return; + if (newOwner.i != lastS) + { + jam(); + newOwner.p->m_lo_last_serial_op_ptr_i = lastS; + } + else + { + jam(); + newOwner.p->m_lo_last_serial_op_ptr_i = RNIL; + } + + action = START_NEW; } - OperationrecPtr tmp; - tmp.i = lock_owner.p->nextSerialQue = next.p->nextSerialQue; - if(tmp.i != RNIL) - { - ptrCheckGuard(tmp, coprecsize, operationrec); - ndbassert(tmp.p->prevSerialQue == next.i); - tmp.p->prevSerialQue = lock_owner.i; - } - next.p->nextSerialQue = next.p->prevSerialQue = RNIL; + insertLockOwnersList(signal, newOwner); - // Find end of parallell que - tmp = lock_owner; - Uint32 lockMode = next.p->lockMode > lock_owner.p->lockMode ? - next.p->lockMode : lock_owner.p->lockMode; - while(tmp.p->nextParallelQue != RNIL) + /** + * Copy op info, and store op in element + * + */ { - jam(); - tmp.i = tmp.p->nextParallelQue; - tmp.p->lockMode = lockMode; - ptrCheckGuard(tmp, coprecsize, operationrec); - } - tmp.p->lockMode = lockMode; - - next.p->prevParallelQue = tmp.i; - tmp.p->nextParallelQue = next.i; - - OperationrecPtr save = operationRecPtr; - - Uint32 localdata[2]; - localdata[0] = lock_owner.p->localdata[0]; - localdata[1] = lock_owner.p->localdata[1]; - do { - next.p->localdata[0] = localdata[0]; - next.p->localdata[1] = localdata[1]; - next.p->lockMode = lockMode; - - operationRecPtr = next; - executeNextOperation(signal); - if (next.p->nextParallelQue != RNIL) + newOwner.p->elementPage = opPtr.p->elementPage; + newOwner.p->elementIsforward = opPtr.p->elementIsforward; + newOwner.p->elementPointer = opPtr.p->elementPointer; + newOwner.p->elementContainer = opPtr.p->elementContainer; + newOwner.p->scanBits = opPtr.p->scanBits; + newOwner.p->hashvaluePart = opPtr.p->hashvaluePart; + newOwner.p->m_op_bits |= (opbits & Operationrec::OP_ELEMENT_DISAPPEARED); + if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED) { + /* ------------------------------------------------------------------- */ + // If the elementIsDisappeared is set then we know that the + // hashValue is also set since it always originates from a + // committing abort or a aborting insert. + // Scans do not initialise the hashValue and must have this + // value initialised if they are + // to successfully commit the delete. + /* ------------------------------------------------------------------- */ jam(); - next.i = next.p->nextParallelQue; - ptrCheckGuard(next, coprecsize, operationrec); - } else { - jam(); - break; + newOwner.p->hashValue = opPtr.p->hashValue; }//if - } while (1); + + Page8Ptr pagePtr; + pagePtr.i = newOwner.p->elementPage; + ptrCheckGuard(pagePtr, cpagesize, page8); + const Uint32 tmp = ElementHeader::setLocked(newOwner.i); + arrGuard(newOwner.p->elementPointer, 2048); + pagePtr.p->word32[newOwner.p->elementPointer] = tmp; + } - operationRecPtr = save; + switch(action){ + case NOTHING: + validate_lock_queue(newOwner); + return; + case START_NEW: + startNew(signal, newOwner); + validate_lock_queue(newOwner); + return; + case CHECK_LOCK_UPGRADE: + startNext(signal, lastP); + validate_lock_queue(lastP); + break; + } } -/* ------------------------------------------------------------------------- */ -/* RELEASELOCK */ -/* RESETS LOCK OF AN ELEMENT. */ -/* INFORMATION ABOUT THE ELEMENT IS SAVED IN THE OPERATION RECORD */ -/* THESE INFORMATION IS USED TO UPDATE HEADER OF THE ELEMENT */ -/* ------------------------------------------------------------------------- */ -void Dbacc::releaselock(Signal* signal) -{ - OperationrecPtr rloOperPtr; - OperationrecPtr trlOperPtr; - OperationrecPtr trlTmpOperPtr; - Uint32 TelementIsDisappeared; - - trlOperPtr.i = RNIL; - if (operationRecPtr.p->nextParallelQue != RNIL) { - jam(); - /** --------------------------------------------------------------------- - * NEXT OPERATION TAKES OVER THE LOCK. - * We will simply move the info from the leader - * to the new queue leader. - * -------------------------------------------------------------------- */ - trlOperPtr.i = operationRecPtr.p->nextParallelQue; - ptrCheckGuard(trlOperPtr, coprecsize, operationrec); - copyInOperPtr = trlOperPtr; - copyOperPtr = operationRecPtr; - copyOpInfo(signal); - trlOperPtr.p->prevParallelQue = RNIL; - if (operationRecPtr.p->nextSerialQue != RNIL) { - jam(); - /* ----------------------------------------------------------------- - * THERE IS A SERIAL QUEUE. MOVE IT FROM RELEASED OP REC TO THE - * NEW LOCK OWNER. - * ------------------------------------------------------------------ */ - trlOperPtr.p->nextSerialQue = operationRecPtr.p->nextSerialQue; - trlTmpOperPtr.i = trlOperPtr.p->nextSerialQue; - ptrCheckGuard(trlTmpOperPtr, coprecsize, operationrec); - trlTmpOperPtr.p->prevSerialQue = trlOperPtr.i; - }//if - - check_lock_upgrade(signal, copyInOperPtr, operationRecPtr); - } else { - ndbrequire(operationRecPtr.p->nextSerialQue != RNIL); - jam(); - /** --------------------------------------------------------------------- - * THE PARALLEL QUEUE IS EMPTY AND THE SERIAL QUEUE IS NOT EMPTY. - * WE NEED TO REARRANGE LISTS AND START A NUMBER OF OPERATIONS. - * -------------------------------------------------------------------- */ - trlOperPtr.i = operationRecPtr.p->nextSerialQue; - ptrCheckGuard(trlOperPtr, coprecsize, operationrec); - copyOperPtr = operationRecPtr; - copyInOperPtr = trlOperPtr; - copyOpInfo(signal); - trlOperPtr.p->prevSerialQue = RNIL; - ndbrequire(trlOperPtr.p->prevParallelQue == RNIL); - /* --------------------------------------------------------------------- */ - /* WE HAVE MOVED TO THE NEXT PARALLEL QUEUE. WE MUST START ALL OF THOSE */ - /* OPERATIONS WHICH UP TILL NOW HAVE BEEN QUEUED WAITING FOR THE LOCK. */ - /* --------------------------------------------------------------------- */ - rloOperPtr = operationRecPtr; - trlTmpOperPtr = trlOperPtr; - TelementIsDisappeared = trlOperPtr.p->elementIsDisappeared; - Uint32 ThashValue = trlOperPtr.p->hashValue; - do { - /* ------------------------------------------------------------------ */ - // Ensure that all operations in the queue are assigned with the - // elementIsDisappeared to ensure that the element is removed after - // a previous delete. An insert does however revert this decision - // since the element is put back again. - // Local checkpoints complicate life here since they do not - // execute the next operation but simply change - // the state on the operation. - // We need to set-up the variable elementIsDisappeared - // properly even when local checkpoints and inserts/writes after - // deletes occur. - /* ------------------------------------------------------------------- */ - trlTmpOperPtr.p->elementIsDisappeared = TelementIsDisappeared; - if (TelementIsDisappeared == ZTRUE) { - /* ----------------------------------------------------------------- */ - // If the elementIsDisappeared is set then we know that the - // hashValue is also set since it always originates from a - // committing abort or a aborting insert. - // Scans do not initialise the hashValue and must have this - // value initialised if they are to successfully commit the delete. - /* ----------------------------------------------------------------- */ - jam(); - trlTmpOperPtr.p->hashValue = ThashValue; - }//if - trlTmpOperPtr.p->localdata[0] = trlOperPtr.p->localdata[0]; - trlTmpOperPtr.p->localdata[1] = trlOperPtr.p->localdata[1]; - /* ------------------------------------------------------------------- */ - // Restart the queued operation. - /* ------------------------------------------------------------------- */ - operationRecPtr = trlTmpOperPtr; - TelementIsDisappeared = executeNextOperation(signal); - ThashValue = operationRecPtr.p->hashValue; - if (trlTmpOperPtr.p->nextParallelQue != RNIL) { - jam(); - /* ----------------------------------------------------------------- */ - // We will continue with the next operation in the parallel - // queue and start this as well. - /* ----------------------------------------------------------------- */ - trlTmpOperPtr.i = trlTmpOperPtr.p->nextParallelQue; - ptrCheckGuard(trlTmpOperPtr, coprecsize, operationrec); - } else { - jam(); - break; - }//if - } while (1); - operationRecPtr = rloOperPtr; - }//if - - // Insert the next op into the lock owner list - insertLockOwnersList(signal, trlOperPtr); - return; -}//Dbacc::releaselock() - -/* --------------------------------------------------------------------------------- */ -/* COPY_OP_INFO */ -/* INPUT: COPY_IN_OPER_PTR AND COPY_OPER_PTR. */ -/* DESCRIPTION:INFORMATION ABOUT THE ELEMENT WILL BE MOVED FROM OPERATION */ -/* REC TO QUEUE OP REC. QUE OP REC TAKES OVER THE LOCK. */ -/* --------------------------------------------------------------------------------- */ -void Dbacc::copyOpInfo(Signal* signal) +void +Dbacc::startNew(Signal* signal, OperationrecPtr newOwner) { - Page8Ptr coiPageidptr; - - copyInOperPtr.p->elementPage = copyOperPtr.p->elementPage; - copyInOperPtr.p->elementIsforward = copyOperPtr.p->elementIsforward; - copyInOperPtr.p->elementContainer = copyOperPtr.p->elementContainer; - copyInOperPtr.p->elementPointer = copyOperPtr.p->elementPointer; - copyInOperPtr.p->scanBits = copyOperPtr.p->scanBits; - copyInOperPtr.p->hashvaluePart = copyOperPtr.p->hashvaluePart; - copyInOperPtr.p->elementIsDisappeared = copyOperPtr.p->elementIsDisappeared; - if (copyInOperPtr.p->elementIsDisappeared == ZTRUE) { - /* --------------------------------------------------------------------------------- */ - // If the elementIsDisappeared is set then we know that the hashValue is also set - // since it always originates from a committing abort or a aborting insert. Scans - // do not initialise the hashValue and must have this value initialised if they are - // to successfully commit the delete. - /* --------------------------------------------------------------------------------- */ - jam(); - copyInOperPtr.p->hashValue = copyOperPtr.p->hashValue; - }//if - coiPageidptr.i = copyOperPtr.p->elementPage; - ptrCheckGuard(coiPageidptr, cpagesize, page8); - const Uint32 tmp = ElementHeader::setLocked(copyInOperPtr.i); - dbgWord32(coiPageidptr, copyOperPtr.p->elementPointer, tmp); - arrGuard(copyOperPtr.p->elementPointer, 2048); - coiPageidptr.p->word32[copyOperPtr.p->elementPointer] = tmp; - copyInOperPtr.p->localdata[0] = copyOperPtr.p->localdata[0]; - copyInOperPtr.p->localdata[1] = copyOperPtr.p->localdata[1]; -}//Dbacc::copyOpInfo() + OperationrecPtr save = operationRecPtr; + operationRecPtr = newOwner; + + Uint32 opbits = newOwner.p->m_op_bits; + Uint32 op = opbits & Operationrec::OP_MASK; + Uint32 opstate = (opbits & Operationrec::OP_STATE_MASK); + ndbassert(opstate == Operationrec::OP_STATE_WAITING); + ndbassert(opbits & Operationrec::OP_LOCK_OWNER); + const bool deleted = opbits & Operationrec::OP_ELEMENT_DISAPPEARED; + Uint32 errCode = 0; + + opbits &= opbits & ~(Uint32)Operationrec::OP_STATE_MASK; + opbits |= Operationrec::OP_STATE_RUNNING; + + if (op == ZSCAN_OP && (opbits & Operationrec::OP_LOCK_REQ) == 0) + goto scan; -/* ******************--------------------------------------------------------------- */ -/* EXECUTE NEXT OPERATION */ -/* NEXT OPERATION IN A LOCK QUEUE WILL BE EXECUTED. */ -/* --------------------------------------------------------------------------------- */ -Uint32 Dbacc::executeNextOperation(Signal* signal) -{ - ndbrequire(operationRecPtr.p->transactionstate == ACTIVE); - if (operationRecPtr.p->elementIsDisappeared == ZTRUE) { - /* --------------------------------------------------------------------- */ - /* PREVIOUS OPERATION WAS DELETE OPERATION AND THE ELEMENT IS DELETED. */ - /* --------------------------------------------------------------------- */ - if (((operationRecPtr.p->operation != ZINSERT) && - (operationRecPtr.p->operation != ZWRITE)) || - (operationRecPtr.p->prevParallelQue != RNIL)) { - if (operationRecPtr.p->operation != ZSCAN_OP || - operationRecPtr.p->isAccLockReq) { - jam(); - /* ----------------------------------------------------------------- */ - // Updates and reads with a previous delete simply aborts with read - // error indicating that tuple did not exist. - // Also inserts and writes not being the first operation. - /* ----------------------------------------------------------------- */ - operationRecPtr.p->transactionstate = WAIT_COMMIT_ABORT; - signal->theData[0] = operationRecPtr.p->userptr; - signal->theData[1] = ZREAD_ERROR; - sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYREF, signal, - 2, JBB); - return operationRecPtr.p->elementIsDisappeared; - } else { - /* ----------------------------------------------------------------- */ - /* ABORT OF OPERATION NEEDED BUT THE OPERATION IS A - * SCAN => SPECIAL TREATMENT. - * IF THE SCAN WAITS IN QUEUE THEN WE MUST REMOVE THE OPERATION - * FROM THE SCAN LOCK QUEUE AND IF NO MORE OPERATIONS ARE QUEUED - * THEN WE SHOULD RESTART THE SCAN PROCESS. OTHERWISE WE SIMPLY - * RELEASE THE OPERATION AND DECREASE THE NUMBER OF LOCKS HELD. - * ----------------------------------------------------------------- */ - takeOutScanLockQueue(operationRecPtr.p->scanRecPtr); - putReadyScanQueue(signal, operationRecPtr.p->scanRecPtr); - return operationRecPtr.p->elementIsDisappeared; - }//if - }//if - /* --------------------------------------------------------------------- */ - // Insert and writes can continue but need to be converted to inserts. - /* --------------------------------------------------------------------- */ - jam(); - operationRecPtr.p->elementIsDisappeared = ZFALSE; - operationRecPtr.p->operation = ZINSERT; - operationRecPtr.p->insertIsDone = ZTRUE; - } else if (operationRecPtr.p->operation == ZINSERT) { - bool abortFlag = true; - if (operationRecPtr.p->prevParallelQue != RNIL) { - OperationrecPtr prevOpPtr; - jam(); - prevOpPtr.i = operationRecPtr.p->prevParallelQue; - ptrCheckGuard(prevOpPtr, coprecsize, operationrec); - if (prevOpPtr.p->operation == ZDELETE) { - jam(); - abortFlag = false; - }//if - }//if - if (abortFlag) { - jam(); - /* ------------------------------------------------------------------- */ - /* ELEMENT STILL REMAINS AND WE ARE TRYING TO INSERT IT AGAIN. */ - /* THIS IS CLEARLY NOT A GOOD IDEA. */ - /* ------------------------------------------------------------------- */ - operationRecPtr.p->transactionstate = WAIT_COMMIT_ABORT; - signal->theData[0] = operationRecPtr.p->userptr; - signal->theData[1] = ZWRITE_ERROR; - sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYREF, signal, - 2, JBB); - return operationRecPtr.p->elementIsDisappeared; - }//if - } - else if(operationRecPtr.p->operation == ZWRITE) + if (deleted) { jam(); - operationRecPtr.p->operation = ZUPDATE; - if (operationRecPtr.p->prevParallelQue != RNIL) { - OperationrecPtr prevOpPtr; - jam(); - prevOpPtr.i = operationRecPtr.p->prevParallelQue; - ptrCheckGuard(prevOpPtr, coprecsize, operationrec); - if (prevOpPtr.p->operation == ZDELETE) - { - jam(); - operationRecPtr.p->operation = ZINSERT; - } + if (op != ZINSERT && op != ZWRITE) + { + errCode = ZREAD_ERROR; + goto ref; } + + opbits &= ~(Uint32)Operationrec::OP_MASK; + opbits &= ~(Uint32)Operationrec::OP_ELEMENT_DISAPPEARED; + opbits |= (op = ZINSERT); + opbits |= Operationrec::OP_INSERT_IS_DONE; + goto conf; } - - if (operationRecPtr.p->operation == ZSCAN_OP && - ! operationRecPtr.p->isAccLockReq) { + else if (op == ZINSERT) + { jam(); - takeOutScanLockQueue(operationRecPtr.p->scanRecPtr); - putReadyScanQueue(signal, operationRecPtr.p->scanRecPtr); - } else { + errCode = ZWRITE_ERROR; + goto ref; + } + else if (op == ZWRITE) + { jam(); - sendAcckeyconf(signal); - sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYCONF, - signal, 6, JBB); - }//if - return operationRecPtr.p->elementIsDisappeared; -}//Dbacc::executeNextOperation() + opbits &= ~(Uint32)Operationrec::OP_MASK; + opbits |= (op = ZUPDATE); + goto conf; + } + +conf: + newOwner.p->m_op_bits = opbits; + + sendAcckeyconf(signal); + sendSignal(newOwner.p->userblockref, GSN_ACCKEYCONF, + signal, 6, JBB); + + operationRecPtr = save; + return; + +scan: + jam(); + newOwner.p->m_op_bits = opbits; + + takeOutScanLockQueue(newOwner.p->scanRecPtr); + putReadyScanQueue(signal, newOwner.p->scanRecPtr); + + operationRecPtr = save; + return; + +ref: + newOwner.p->m_op_bits = opbits; + + signal->theData[0] = newOwner.p->userptr; + signal->theData[1] = errCode; + sendSignal(newOwner.p->userblockref, GSN_ACCKEYREF, signal, + 2, JBB); + + operationRecPtr = save; + return; +} /** * takeOutLockOwnersList @@ -3969,7 +4886,6 @@ void Dbacc::takeOutLockOwnersList(Signal* signal, { const Uint32 Tprev = outOperPtr.p->prevLockOwnerOp; const Uint32 Tnext = outOperPtr.p->nextLockOwnerOp; - #ifdef VM_TRACE // Check that operation is already in the list OperationrecPtr tmpOperPtr; @@ -3984,8 +4900,7 @@ void Dbacc::takeOutLockOwnersList(Signal* signal, ndbrequire(inList == true); #endif - ndbrequire(outOperPtr.p->lockOwner == ZTRUE); - outOperPtr.p->lockOwner = ZFALSE; + ndbassert(outOperPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER); // Fast path through the code for the common case. if ((Tprev == RNIL) && (Tnext == RNIL)) { @@ -4026,7 +4941,6 @@ void Dbacc::insertLockOwnersList(Signal* signal, const OperationrecPtr& insOperPtr) { OperationrecPtr tmpOperPtr; - #ifdef VM_TRACE // Check that operation is not already in list tmpOperPtr.i = fragrecptr.p->lockOwnersList; @@ -4036,18 +4950,15 @@ void Dbacc::insertLockOwnersList(Signal* signal, tmpOperPtr.i = tmpOperPtr.p->nextLockOwnerOp; } #endif + tmpOperPtr.i = fragrecptr.p->lockOwnersList; + + ndbrequire(! (insOperPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER)); - ndbrequire(insOperPtr.p->lockOwner == ZFALSE); - - insOperPtr.p->lockOwner = ZTRUE; + insOperPtr.p->m_op_bits |= Operationrec::OP_LOCK_OWNER; insOperPtr.p->prevLockOwnerOp = RNIL; - tmpOperPtr.i = fragrecptr.p->lockOwnersList; - const Uint32 seq = fragrecptr.p->m_current_sequence_no; insOperPtr.p->nextLockOwnerOp = tmpOperPtr.i; - insOperPtr.p->m_sequence_no = seq; fragrecptr.p->lockOwnersList = insOperPtr.i; - fragrecptr.p->m_current_sequence_no = seq+1; if (tmpOperPtr.i == RNIL) { return; } else { @@ -5407,6 +6318,7 @@ void Dbacc::execNEXT_SCANREQ(Signal* signal) if (!scanPtr.p->scanReadCommittedFlag) { commitOperation(signal); }//if + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; takeOutActiveScanOp(signal); releaseOpRec(signal); scanPtr.p->scanOpsAllocated--; @@ -5422,9 +6334,10 @@ void Dbacc::execNEXT_SCANREQ(Signal* signal) jam(); fragrecptr.i = scanPtr.p->activeLocalFrag; ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); - /* --------------------------------------------------------------------------------- */ - /* THE SCAN PROCESS IS FINISHED. RELOCK ALL LOCKED EL. RELESE ALL INVOLVED REC. */ - /* --------------------------------------------------------------------------------- */ + /* --------------------------------------------------------------------- + * THE SCAN PROCESS IS FINISHED. RELOCK ALL LOCKED EL. + * RELESE ALL INVOLVED REC. + * ------------------------------------------------------------------- */ releaseScanLab(signal); return; break; @@ -5474,24 +6387,26 @@ void Dbacc::checkNextBucketLab(Signal* signal) scanPtr.p->nextBucketIndex++; if (scanPtr.p->scanBucketState == ScanRec::SECOND_LAP) { if (scanPtr.p->nextBucketIndex > scanPtr.p->maxBucketIndexToRescan) { - /* --------------------------------------------------------------------------------- */ - // We have finished the rescan phase. We are ready to proceed with the next fragment part. - /* --------------------------------------------------------------------------------- */ + /* ---------------------------------------------------------------- */ + // We have finished the rescan phase. + // We are ready to proceed with the next fragment part. + /* ---------------------------------------------------------------- */ jam(); checkNextFragmentLab(signal); return; }//if } else if (scanPtr.p->scanBucketState == ScanRec::FIRST_LAP) { if ((fragrecptr.p->p + fragrecptr.p->maxp) < scanPtr.p->nextBucketIndex) { - /* --------------------------------------------------------------------------------- */ + /* ---------------------------------------------------------------- */ // All buckets have been scanned a first time. - /* --------------------------------------------------------------------------------- */ + /* ---------------------------------------------------------------- */ if (scanPtr.p->minBucketIndexToRescan == 0xFFFFFFFF) { jam(); - /* --------------------------------------------------------------------------------- */ - // We have not had any merges behind the scan. Thus it is not necessary to perform - // any rescan any buckets and we can proceed immediately with the next fragment part. - /* --------------------------------------------------------------------------------- */ + /* -------------------------------------------------------------- */ + // We have not had any merges behind the scan. + // Thus it is not necessary to perform any rescan any buckets + // and we can proceed immediately with the next fragment part. + /* --------------------------------------------------------------- */ checkNextFragmentLab(signal); return; } else { @@ -5583,18 +6498,23 @@ void Dbacc::checkNextBucketLab(Signal* signal) tslElementptr = tnsElementptr; setlock(signal); insertLockOwnersList(signal, operationRecPtr); + operationRecPtr.p->m_op_bits |= + Operationrec::OP_STATE_RUNNING | Operationrec::OP_RUN_QUEUE; }//if } else { arrGuard(tnsElementptr, 2048); queOperPtr.i = ElementHeader::getOpPtrI(nsPageptr.p->word32[tnsElementptr]); ptrCheckGuard(queOperPtr, coprecsize, operationrec); - if (queOperPtr.p->elementIsDisappeared == ZTRUE) { + if (queOperPtr.p->m_op_bits & Operationrec::OP_ELEMENT_DISAPPEARED || + queOperPtr.p->localdata[0] == ~(Uint32)0) + { jam(); - /* --------------------------------------------------------------------------------- */ - // If the lock owner indicates the element is disappeared then we will not report this - // tuple. We will continue with the next tuple. - /* --------------------------------------------------------------------------------- */ + /* ------------------------------------------------------------------ */ + // If the lock owner indicates the element is disappeared then + // we will not report this tuple. We will continue with the next tuple. + /* ------------------------------------------------------------------ */ + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; releaseOpRec(signal); scanPtr.p->scanOpsAllocated--; signal->theData[0] = scanPtr.i; @@ -5606,31 +6526,29 @@ void Dbacc::checkNextBucketLab(Signal* signal) Uint32 return_result; if (scanPtr.p->scanLockMode == ZREADLOCK) { jam(); - priPageptr = nsPageptr; - tpriElementptr = tnsElementptr; - return_result = placeReadInLockQueue(signal); + return_result = placeReadInLockQueue(queOperPtr); } else { jam(); - pwiPageptr = nsPageptr; - tpwiElementptr = tnsElementptr; - return_result = placeWriteInLockQueue(signal); + return_result = placeWriteInLockQueue(queOperPtr); }//if if (return_result == ZSERIAL_QUEUE) { - /* --------------------------------------------------------------------------------- */ - /* WE PLACED THE OPERATION INTO A SERIAL QUEUE AND THUS WE HAVE TO WAIT FOR */ - /* THE LOCK TO BE RELEASED. WE CONTINUE WITH THE NEXT ELEMENT. */ - /* --------------------------------------------------------------------------------- */ + /* ----------------------------------------------------------------- + * WE PLACED THE OPERATION INTO A SERIAL QUEUE AND THUS WE HAVE TO + * WAIT FOR THE LOCK TO BE RELEASED. WE CONTINUE WITH THE NEXT ELEMENT + * ----------------------------------------------------------------- */ putOpScanLockQue(); /* PUT THE OP IN A QUE IN THE SCAN REC */ signal->theData[0] = scanPtr.i; signal->theData[1] = AccCheckScan::ZCHECK_LCP_STOP; sendSignal(cownBlockref, GSN_ACC_CHECK_SCAN, signal, 2, JBB); return; - } else if (return_result == ZWRITE_ERROR) { + } else if (return_result != ZPARALLEL_QUEUE) { jam(); - /* --------------------------------------------------------------------------------- */ - // The tuple is either not committed yet or a delete in the same transaction (not - // possible here since we are a scan). Thus we simply continue with the next tuple. - /* --------------------------------------------------------------------------------- */ + /* ----------------------------------------------------------------- */ + // The tuple is either not committed yet or a delete in + // the same transaction (not possible here since we are a scan). + // Thus we simply continue with the next tuple. + /* ----------------------------------------------------------------- */ + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; releaseOpRec(signal); scanPtr.p->scanOpsAllocated--; signal->theData[0] = scanPtr.i; @@ -5641,10 +6559,11 @@ void Dbacc::checkNextBucketLab(Signal* signal) ndbassert(return_result == ZPARALLEL_QUEUE); }//if }//if - /* --------------------------------------------------------------------------------- */ - // Committed read proceed without caring for locks immediately down here except when - // the tuple was deleted permanently and no new operation has inserted it again. - /* --------------------------------------------------------------------------------- */ + /* ----------------------------------------------------------------------- */ + // Committed read proceed without caring for locks immediately + // down here except when the tuple was deleted permanently + // and no new operation has inserted it again. + /* ----------------------------------------------------------------------- */ putActiveScanOp(signal); sendNextScanConf(signal); return; @@ -5668,14 +6587,15 @@ void Dbacc::initScanFragmentPart(Signal* signal) DirRangePtr cnfDirRangePtr; DirectoryarrayPtr cnfDirptr; Page8Ptr cnfPageidptr; - /* --------------------------------------------------------------------------------- */ + /* ----------------------------------------------------------------------- */ // Set the active fragment part. // Set the current bucket scanned to the first. // Start with the first lap. // Remember the number of buckets at start of the scan. - // Set the minimum and maximum to values that will always be smaller and larger than. + // Set the minimum and maximum to values that will always be smaller and + // larger than. // Reset the scan indicator on the first bucket. - /* --------------------------------------------------------------------------------- */ + /* ----------------------------------------------------------------------- */ scanPtr.p->activeLocalFrag = fragrecptr.i; scanPtr.p->nextBucketIndex = 0; /* INDEX OF SCAN BUCKET */ scanPtr.p->scanBucketState = ScanRec::FIRST_LAP; @@ -5694,11 +6614,11 @@ void Dbacc::initScanFragmentPart(Signal* signal) releaseScanBucket(signal); }//Dbacc::initScanFragmentPart() -/* --------------------------------------------------------------------------------- */ -/* FLAG = 6 = ZCOPY_CLOSE THE SCAN PROCESS IS READY OR ABORTED. ALL OPERATION IN THE */ -/* ACTIVE OR WAIT QUEUE ARE RELEASED, SCAN FLAG OF ROOT FRAG IS RESET AND THE SCAN */ -/* RECORD IS RELEASED. */ -/* --------------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- + * FLAG = 6 = ZCOPY_CLOSE THE SCAN PROCESS IS READY OR ABORTED. + * ALL OPERATION IN THE ACTIVE OR WAIT QUEUE ARE RELEASED, + * SCAN FLAG OF ROOT FRAG IS RESET AND THE SCAN RECORD IS RELEASED. + * ------------------------------------------------------------------------ */ void Dbacc::releaseScanLab(Signal* signal) { releaseAndCommitActiveOps(signal); @@ -5737,8 +6657,17 @@ void Dbacc::releaseAndCommitActiveOps(Signal* signal) ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); if (!scanPtr.p->scanReadCommittedFlag) { jam(); - commitOperation(signal); + if ((operationRecPtr.p->m_op_bits & Operationrec::OP_STATE_MASK) == + Operationrec::OP_STATE_EXECUTED) + { + commitOperation(signal); + } + else + { + abortOperation(signal); + } }//if + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; takeOutActiveScanOp(signal); releaseOpRec(signal); scanPtr.p->scanOpsAllocated--; @@ -5759,8 +6688,17 @@ void Dbacc::releaseAndCommitQueuedOps(Signal* signal) ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); if (!scanPtr.p->scanReadCommittedFlag) { jam(); - commitOperation(signal); + if ((operationRecPtr.p->m_op_bits & Operationrec::OP_STATE_MASK) == + Operationrec::OP_STATE_EXECUTED) + { + commitOperation(signal); + } + else + { + abortOperation(signal); + } }//if + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; takeOutReadyScanQueue(signal); releaseOpRec(signal); scanPtr.p->scanOpsAllocated--; @@ -5783,6 +6721,7 @@ void Dbacc::releaseAndAbortLockedOps(Signal* signal) { abortOperation(signal); }//if takeOutScanLockQueue(scanPtr.i); + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; releaseOpRec(signal); scanPtr.p->scanOpsAllocated--; operationRecPtr.i = trsoOperPtr.i; @@ -5817,9 +6756,11 @@ void Dbacc::execACC_CHECK_SCAN(Signal* signal) takeOutReadyScanQueue(signal); fragrecptr.i = operationRecPtr.p->fragptr; ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); - if (operationRecPtr.p->elementIsDisappeared == ZTRUE) { + if (operationRecPtr.p->m_op_bits & Operationrec::OP_ELEMENT_DISAPPEARED) + { jam(); abortOperation(signal); + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; releaseOpRec(signal); scanPtr.p->scanOpsAllocated--; continue; @@ -5906,7 +6847,8 @@ void Dbacc::execACC_TO_REQ(Signal* signal) jamEntry(); tatrOpPtr.i = signal->theData[1]; /* OPER PTR OF ACC */ ptrCheckGuard(tatrOpPtr, coprecsize, operationrec); - if (tatrOpPtr.p->operation == ZSCAN_OP) { + if ((tatrOpPtr.p->m_op_bits & Operationrec::OP_MASK) == ZSCAN_OP) + { tatrOpPtr.p->transId1 = signal->theData[2]; tatrOpPtr.p->transId2 = signal->theData[3]; } else { @@ -6003,32 +6945,28 @@ void Dbacc::initScanOpRec(Signal* signal) scanPtr.p->scanOpsAllocated++; + Uint32 opbits = 0; + opbits |= ZSCAN_OP; + opbits |= scanPtr.p->scanLockMode ? Operationrec::OP_LOCK_MODE : 0; + opbits |= scanPtr.p->scanLockMode ? Operationrec::OP_ACC_LOCK_MODE : 0; + opbits |= scanPtr.p->scanReadCommittedFlag ? + Operationrec::OP_EXECUTED_DIRTY_READ : 0; + opbits |= Operationrec::OP_COMMIT_DELETE_CHECK; operationRecPtr.p->userptr = RNIL; operationRecPtr.p->scanRecPtr = scanPtr.i; - operationRecPtr.p->operation = ZSCAN_OP; - operationRecPtr.p->transactionstate = ACTIVE; - operationRecPtr.p->commitDeleteCheckFlag = ZFALSE; - operationRecPtr.p->lockMode = scanPtr.p->scanLockMode; operationRecPtr.p->fid = fragrecptr.p->myfid; operationRecPtr.p->fragptr = fragrecptr.i; - operationRecPtr.p->elementIsDisappeared = ZFALSE; operationRecPtr.p->nextParallelQue = RNIL; operationRecPtr.p->prevParallelQue = RNIL; operationRecPtr.p->nextSerialQue = RNIL; operationRecPtr.p->prevSerialQue = RNIL; - operationRecPtr.p->prevQueOp = RNIL; - operationRecPtr.p->nextQueOp = RNIL; - operationRecPtr.p->keyinfoPage = RNIL; // Safety precaution operationRecPtr.p->transId1 = scanPtr.p->scanTrid1; operationRecPtr.p->transId2 = scanPtr.p->scanTrid2; - operationRecPtr.p->lockOwner = ZFALSE; - operationRecPtr.p->dirtyRead = 0; - operationRecPtr.p->nodeType = 0; // Not a stand-by node operationRecPtr.p->elementIsforward = tisoIsforward; operationRecPtr.p->elementContainer = tisoContainerptr; operationRecPtr.p->elementPointer = tisoElementptr; operationRecPtr.p->elementPage = isoPageptr.i; - operationRecPtr.p->isAccLockReq = ZFALSE; + operationRecPtr.p->m_op_bits = opbits; tisoLocalPtr = tisoElementptr + tisoIsforward; guard24 = fragrecptr.p->localkeylen - 1; for (tisoTmp = 0; tisoTmp <= guard24; tisoTmp++) { @@ -6038,7 +6976,6 @@ void Dbacc::initScanOpRec(Signal* signal) tisoLocalPtr = tisoLocalPtr + tisoIsforward; }//for arrGuard(tisoLocalPtr, 2048); - operationRecPtr.p->keydata[0] = isoPageptr.p->word32[tisoLocalPtr]; operationRecPtr.p->tupkeylen = fragrecptr.p->keyLength; operationRecPtr.p->xfrmtupkeylen = 0; // not used }//Dbacc::initScanOpRec() @@ -6894,14 +7831,12 @@ void Dbacc::releaseOpRec(Signal* signal) } ndbrequire(opInList == false); #endif - ndbrequire(operationRecPtr.p->lockOwner == ZFALSE); + ndbrequire(operationRecPtr.p->m_op_bits == Operationrec::OP_INITIAL); operationRecPtr.p->nextOp = cfreeopRec; cfreeopRec = operationRecPtr.i; /* UPDATE FREE LIST OF OP RECORDS */ operationRecPtr.p->prevOp = RNIL; - operationRecPtr.p->opState = FREE_OP; - operationRecPtr.p->transactionstate = IDLE; - operationRecPtr.p->operation = ZUNDEFINED_OP; + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; }//Dbacc::releaseOpRec() /* --------------------------------------------------------------------------------- */ @@ -7403,8 +8338,8 @@ Dbacc::execDUMP_STATE_ORD(Signal* signal) OperationrecPtr tmpOpPtr; tmpOpPtr.i = recordNo; ptrAss(tmpOpPtr, operationrec); - infoEvent("Dbacc::operationrec[%d]: opState=%d, transid(0x%x, 0x%x)", - tmpOpPtr.i, tmpOpPtr.p->opState, tmpOpPtr.p->transId1, + infoEvent("Dbacc::operationrec[%d]: transid(0x%x, 0x%x)", + tmpOpPtr.i, tmpOpPtr.p->transId1, tmpOpPtr.p->transId2); infoEvent("elementIsforward=%d, elementPage=%d, elementPointer=%d ", tmpOpPtr.p->elementIsforward, tmpOpPtr.p->elementPage, @@ -7412,30 +8347,19 @@ Dbacc::execDUMP_STATE_ORD(Signal* signal) infoEvent("fid=%d, fragptr=%d, hashvaluePart=%d ", tmpOpPtr.p->fid, tmpOpPtr.p->fragptr, tmpOpPtr.p->hashvaluePart); - infoEvent("hashValue=%d, insertDeleteLen=%d, keyinfoPage=%d ", - tmpOpPtr.p->hashValue, tmpOpPtr.p->insertDeleteLen, - tmpOpPtr.p->keyinfoPage); + infoEvent("hashValue=%d", tmpOpPtr.p->hashValue); infoEvent("nextLockOwnerOp=%d, nextOp=%d, nextParallelQue=%d ", tmpOpPtr.p->nextLockOwnerOp, tmpOpPtr.p->nextOp, tmpOpPtr.p->nextParallelQue); - infoEvent("nextQueOp=%d, nextSerialQue=%d, prevOp=%d ", - tmpOpPtr.p->nextQueOp, tmpOpPtr.p->nextSerialQue, + infoEvent("nextSerialQue=%d, prevOp=%d ", + tmpOpPtr.p->nextSerialQue, tmpOpPtr.p->prevOp); - infoEvent("prevLockOwnerOp=%d, prevParallelQue=%d, prevQueOp=%d ", - tmpOpPtr.p->prevLockOwnerOp, tmpOpPtr.p->nextParallelQue, - tmpOpPtr.p->prevQueOp); - infoEvent("prevSerialQue=%d, scanRecPtr=%d, longPagePtr=%d ", - tmpOpPtr.p->prevSerialQue, tmpOpPtr.p->scanRecPtr, - tmpOpPtr.p->longPagePtr); - infoEvent("transactionstate=%d, elementIsDisappeared=%d, insertIsDone=%d ", - tmpOpPtr.p->transactionstate, tmpOpPtr.p->elementIsDisappeared, - tmpOpPtr.p->insertIsDone); - infoEvent("lockMode=%d, lockOwner=%d, nodeType=%d ", - tmpOpPtr.p->lockMode, tmpOpPtr.p->lockOwner, - tmpOpPtr.p->nodeType); - infoEvent("operation=%d, opSimple=%d, dirtyRead=%d,scanBits=%d ", - tmpOpPtr.p->operation, tmpOpPtr.p->opSimple, - tmpOpPtr.p->dirtyRead, tmpOpPtr.p->scanBits); + infoEvent("prevLockOwnerOp=%d, prevParallelQue=%d", + tmpOpPtr.p->prevLockOwnerOp, tmpOpPtr.p->nextParallelQue); + infoEvent("prevSerialQue=%d, scanRecPtr=%d", + tmpOpPtr.p->prevSerialQue, tmpOpPtr.p->scanRecPtr); + infoEvent("m_op_bits=0x%x, scanBits=%d ", + tmpOpPtr.p->m_op_bits, tmpOpPtr.p->scanBits); return; } |