diff options
author | unknown <jonas@perch.ndb.mysql.com> | 2006-05-18 10:17:53 +0200 |
---|---|---|
committer | unknown <jonas@perch.ndb.mysql.com> | 2006-05-18 10:17:53 +0200 |
commit | be0ab479b822a3b9d065dc659ef59d5450f2270b (patch) | |
tree | d32aff29244d0593c63fcd197f040618e049d1b9 | |
parent | 7d6ee98f1c0be068755cef4baa586b99fd98e604 (diff) | |
download | mariadb-git-be0ab479b822a3b9d065dc659ef59d5450f2270b.tar.gz |
ndb - bug#19293 and family
introduce acc per row logical mutex to fix difficult error handling cases
storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp:
1) Fix per row mutex so that only 1 op at a time is running on a row
2) Change TUP_ALLOC/DEALLOC semantic, so that a new record will be allocated if LQ = { T1(DEL) - T2(INS) }
3) Rewrite lock queus to be O(1) in all cases but a few abort cases where we scan parallell queue
4) Impl. a validate_lock_queue/dump_lock_queue test framework
storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp:
1) Fix per row mutex so that only 1 op at a time is running on a row
2) Change TUP_ALLOC/DEALLOC semantic, so that a new record will be allocated if LQ = { T1(DEL) - T2(INS) }
3) Rewrite lock queus to be O(1) in all cases but a few abort cases where we scan parallell queue
4) Impl. a validate_lock_queue/dump_lock_queue test framework
storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp:
1) Fix per row mutex so that only 1 op at a time is running on a row
2) Change TUP_ALLOC/DEALLOC semantic, so that a new record will be allocated if LQ = { T1(DEL) - T2(INS) }
3) Rewrite lock queus to be O(1) in all cases but a few abort cases where we scan parallell queue
4) Impl. a validate_lock_queue/dump_lock_queue test framework
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp:
1) impl. a new read key from operation record needed by acc
2) expand TRACE_OP toolkit
3) impl. ACCKEY_ORD as needed by ACC changes
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
1) impl. a new read key from operation record needed by acc
2) expand TRACE_OP toolkit
3) impl. ACCKEY_ORD as needed by ACC changes
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp:
remove unused states/methods
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp:
remove extremly tricky code that handles disk_insert_but_no_mem_insert
that is no long needed with current acc changes
storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp:
remove unused states/methods
storage/ndb/test/ndbapi/testOperations.cpp:
renable last 3 lock upgrade testcases since they now pass
-rw-r--r-- | storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp | 133 | ||||
-rw-r--r-- | storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp | 4 | ||||
-rw-r--r-- | storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp | 3098 | ||||
-rw-r--r-- | storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp | 29 | ||||
-rw-r--r-- | storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp | 417 | ||||
-rw-r--r-- | storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp | 3 | ||||
-rw-r--r-- | storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp | 120 | ||||
-rw-r--r-- | storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp | 1 | ||||
-rw-r--r-- | storage/ndb/test/ndbapi/testOperations.cpp | 4 |
9 files changed, 2396 insertions, 1413 deletions
diff --git a/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp b/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp index 8373004fd0c..b1638599dcc 100644 --- a/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp +++ b/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp @@ -17,14 +17,13 @@ #ifndef DBACC_H #define DBACC_H - +#ifdef VM_TRACE +#define ACC_SAFE_QUEUE +#endif #include <pc.hpp> #include <SimulatedBlock.hpp> -// primary key is stored in TUP -#include "../dbtup/Dbtup.hpp" - #ifdef DBACC_C // Debug Macros #define dbgWord32(ptr, ind, val) @@ -135,7 +134,10 @@ ndbout << "Ptr: " << ptr.p->word32 << " \tIndex: " << tmp_string << " \tValue: " #define ZRIGHT 2 #define ZROOTFRAGMENTSIZE 32 #define ZSCAN_LOCK_ALL 3 -#define ZSCAN_OP 5 +/** + * Check kernel_types for other operation types + */ +#define ZSCAN_OP 6 #define ZSCAN_REC_SIZE 256 #define ZSTAND_BY 2 #define ZTABLESIZE 16 @@ -477,6 +479,7 @@ struct Fragmentrec { /* OPERATIONREC */ /* --------------------------------------------------------------------------------- */ struct Operationrec { + Uint32 m_op_bits; Uint32 localdata[2]; Uint32 elementIsforward; Uint32 elementPage; @@ -485,36 +488,62 @@ struct Operationrec { Uint32 fragptr; Uint32 hashvaluePart; Uint32 hashValue; - Uint32 insertDeleteLen; Uint32 nextLockOwnerOp; Uint32 nextOp; Uint32 nextParallelQue; - Uint32 nextSerialQue; + union { + Uint32 nextSerialQue; + Uint32 m_lock_owner_ptr_i; // if nextParallelQue = RNIL, else undefined + }; Uint32 prevOp; Uint32 prevLockOwnerOp; - Uint32 prevParallelQue; - Uint32 prevSerialQue; + union { + Uint32 prevParallelQue; + Uint32 m_lo_last_parallel_op_ptr_i; + }; + union { + Uint32 prevSerialQue; + Uint32 m_lo_last_serial_op_ptr_i; + }; Uint32 scanRecPtr; Uint32 transId1; Uint32 transId2; - State opState; Uint32 userptr; - State transactionstate; Uint16 elementContainer; Uint16 tupkeylen; Uint32 xfrmtupkeylen; Uint32 userblockref; Uint32 scanBits; - Uint8 elementIsDisappeared; - Uint8 insertIsDone; - Uint8 lockMode; - Uint8 lockOwner; - Uint8 nodeType; - Uint8 operation; - Uint8 opSimple; - Uint8 dirtyRead; - Uint8 commitDeleteCheckFlag; - Uint8 isAccLockReq; + + enum OpBits { + OP_MASK = 0x0000F // 4 bits for operation type + ,OP_LOCK_MODE = 0x00010 // 0 - shared lock, 1 = exclusive lock + ,OP_ACC_LOCK_MODE = 0x00020 // Or:de lock mode of all operation + // before me + ,OP_LOCK_OWNER = 0x00040 + ,OP_RUN_QUEUE = 0x00080 // In parallell queue of lock owner + ,OP_DIRTY_READ = 0x00100 + ,OP_LOCK_REQ = 0x00200 // isAccLockReq + ,OP_COMMIT_DELETE_CHECK = 0x00400 + ,OP_INSERT_IS_DONE = 0x00800 + ,OP_ELEMENT_DISAPPEARED = 0x01000 + + ,OP_STATE_MASK = 0xF0000 + ,OP_STATE_IDLE = 0xF0000 + ,OP_STATE_WAITING = 0x00000 + ,OP_STATE_RUNNING = 0x10000 + ,OP_STATE_EXECUTED = 0x30000 + ,OP_STATE_RUNNING_ABORT = 0x20000 + + ,OP_EXECUTED_DIRTY_READ = 0x3050F + ,OP_INITIAL = ~(Uint32)0 + }; + + bool is_same_trans(const Operationrec* op) const { + return + transId1 == op->transId1 && transId2 == op->transId2; + } + }; /* p2c: size = 168 bytes */ typedef Ptr<Operationrec> OperationrecPtr; @@ -577,7 +606,6 @@ struct ScanRec { Uint32 scanUserblockref; Uint32 scanMask; Uint8 scanLockMode; - Uint8 scanKeyinfoFlag; Uint8 scanTimer; Uint8 scanContinuebCounter; Uint8 scanReadCommittedFlag; @@ -602,7 +630,8 @@ public: virtual ~Dbacc(); // pointer to TUP instance in this thread - Dbtup* c_tup; + class Dbtup* c_tup; + class Dblqh* c_lqh; void execACCMINUPDATE(Signal* signal); @@ -640,7 +669,8 @@ private: void ACCKEY_error(Uint32 fromWhere); void commitDeleteCheck(); - + void report_dealloc(Signal* signal, const Operationrec* opPtrP); + typedef void * RootfragmentrecPtr; void initRootFragPageZero(FragmentrecPtr, Page8Ptr); void initFragAdd(Signal*, FragmentrecPtr); @@ -679,14 +709,30 @@ private: bool addfragtotab(Signal* signal, Uint32 rootIndex, Uint32 fragId); void initOpRec(Signal* signal); void sendAcckeyconf(Signal* signal); - Uint32 placeReadInLockQueue(Signal* signal); - void placeSerialQueueRead(Signal* signal); - void checkOnlyReadEntry(Signal* signal); Uint32 getNoParallelTransaction(const Operationrec*); - void moveLastParallelQueue(Signal* signal); - void moveLastParallelQueueWrite(Signal* signal); - Uint32 placeWriteInLockQueue(Signal* signal); - void placeSerialQueueWrite(Signal* signal); + +#ifdef VM_TRACE + Uint32 getNoParallelTransactionFull(const Operationrec*); +#endif +#ifdef ACC_SAFE_QUEUE + bool validate_lock_queue(OperationrecPtr opPtr); + Uint32 get_parallel_head(OperationrecPtr opPtr); + void dump_lock_queue(OperationrecPtr loPtr); +#else + bool validate_lock_queue(OperationrecPtr) { return true;} +#endif + +public: + void execACCKEY_ORD(Signal* signal, Uint32 opPtrI); + void startNext(Signal* signal, OperationrecPtr lastOp); + +private: + Uint32 placeReadInLockQueue(OperationrecPtr lockOwnerPtr); + Uint32 placeWriteInLockQueue(OperationrecPtr lockOwnerPtr); + void placeSerialQueue(OperationrecPtr lockOwner, OperationrecPtr op); + void abortSerieQueueOperation(Signal* signal, OperationrecPtr op); + void abortParallelQueueOperation(Signal* signal, OperationrecPtr op); + void expandcontainer(Signal* signal); void shrinkcontainer(Signal* signal); void nextcontainerinfoExp(Signal* signal); @@ -716,8 +762,8 @@ private: void increaselistcont(Signal* signal); void seizeLeftlist(Signal* signal); void seizeRightlist(Signal* signal); - Uint32 readTablePk(Uint32 localkey1); - void getElement(Signal* signal); + Uint32 readTablePk(Uint32 localkey1, Uint32 eh, const Operationrec*); + Uint32 getElement(Signal* signal, OperationrecPtr& lockOwner); void getdirindex(Signal* signal); void commitdelete(Signal* signal); void deleteElement(Signal* signal); @@ -726,12 +772,17 @@ private: void releaseRightlist(Signal* signal); void checkoverfreelist(Signal* signal); void abortOperation(Signal* signal); - void accAbortReqLab(Signal* signal); void commitOperation(Signal* signal); - void copyOpInfo(Signal* signal); + void copyOpInfo(OperationrecPtr dst, OperationrecPtr src); Uint32 executeNextOperation(Signal* signal); void releaselock(Signal* signal); + void release_lockowner(Signal* signal, OperationrecPtr, bool commit); + void startNew(Signal* signal, OperationrecPtr newOwner); + void abortWaitingOperation(Signal*, OperationrecPtr); + void abortExecutedOperation(Signal*, OperationrecPtr); + void takeOutFragWaitQue(Signal* signal); + void check_lock_upgrade(Signal* signal, OperationrecPtr release_op, bool lo); void check_lock_upgrade(Signal* signal, OperationrecPtr lock_owner, OperationrecPtr release_op); void allocOverflowPage(Signal* signal); @@ -780,8 +831,8 @@ private: void senddatapagesLab(Signal* signal); void sttorrysignalLab(Signal* signal); void sendholdconfsignalLab(Signal* signal); - void accIsLockedLab(Signal* signal); - void insertExistElemLab(Signal* signal); + void accIsLockedLab(Signal* signal, OperationrecPtr lockOwnerPtr); + void insertExistElemLab(Signal* signal, OperationrecPtr lockOwnerPtr); void refaccConnectLab(Signal* signal); void releaseScanLab(Signal* signal); void ndbrestart1Lab(Signal* signal); @@ -840,8 +891,6 @@ private: Operationrec *operationrec; OperationrecPtr operationRecPtr; OperationrecPtr idrOperationRecPtr; - OperationrecPtr copyInOperPtr; - OperationrecPtr copyOperPtr; OperationrecPtr mlpqOperPtr; OperationrecPtr queOperPtr; OperationrecPtr readWriteOpPtr; @@ -885,8 +934,6 @@ private: Page8Ptr lcnPageptr; Page8Ptr lcnCopyPageptr; Page8Ptr lupPageptr; - Page8Ptr priPageptr; - Page8Ptr pwiPageptr; Page8Ptr ciPageidptr; Page8Ptr gsePageidptr; Page8Ptr isoPageptr; @@ -926,8 +973,6 @@ private: Tabrec *tabrec; TabrecPtr tabptr; Uint32 ctablesize; - Uint32 tpwiElementptr; - Uint32 tpriElementptr; Uint32 tgseElementptr; Uint32 tgseContainerptr; Uint32 trlHead; @@ -961,8 +1006,6 @@ private: Uint32 tdelForward; Uint32 tiopPageId; Uint32 tipPageId; - Uint32 tgeLocked; - Uint32 tgeResult; Uint32 tgeContainerptr; Uint32 tgeElementptr; Uint32 tgeForward; diff --git a/storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp b/storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp index 80fe82ed657..27355299a9c 100644 --- a/storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp +++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp @@ -130,8 +130,6 @@ Dbacc::Dbacc(Block_context& ctx): &fragrecptr, &operationRecPtr, &idrOperationRecPtr, - ©InOperPtr, - ©OperPtr, &mlpqOperPtr, &queOperPtr, &readWriteOpPtr, @@ -161,8 +159,6 @@ Dbacc::Dbacc(Block_context& ctx): &lcnPageptr, &lcnCopyPageptr, &lupPageptr, - &priPageptr, - &pwiPageptr, &ciPageidptr, &gsePageidptr, &isoPageptr, diff --git a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp index 3768609860b..5337c7c015d 100644 --- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp +++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp @@ -41,6 +41,19 @@ #define DEBUG(x) #endif +#ifdef ACC_SAFE_QUEUE +#define vlqrequire(x) do { if (unlikely(!(x))) {\ + dump_lock_queue(loPtr); \ + ndbrequire(false); } } while(0) +#else +#define vlqrequire(x) ndbrequire(x) +#endif + + +// primary key is stored in TUP +#include "../dbtup/Dbtup.hpp" +#include "../dblqh/Dblqh.hpp" + // Signal entries and statement blocks /* --------------------------------------------------------------------------------- */ @@ -208,8 +221,8 @@ void Dbacc::execSTTOR(Signal* signal) switch (tstartphase) { case 1: jam(); - c_tup = (Dbtup*)globalData.getBlock(DBTUP); - ndbrequire(c_tup != 0); + ndbrequire((c_tup = (Dbtup*)globalData.getBlock(DBTUP)) != 0); + ndbrequire((c_lqh = (Dblqh*)globalData.getBlock(DBLQH)) != 0); break; } tuserblockref = signal->theData[3]; @@ -435,9 +448,7 @@ void Dbacc::initialiseOperationRec(Signal* signal) for (operationRecPtr.i = 0; operationRecPtr.i < coprecsize; operationRecPtr.i++) { refresh_watch_dog(); ptrAss(operationRecPtr, operationrec); - operationRecPtr.p->transactionstate = IDLE; - operationRecPtr.p->operation = ZUNDEFINED_OP; - operationRecPtr.p->opState = FREE_OP; + operationRecPtr.p->m_op_bits = ~0; operationRecPtr.p->nextOp = operationRecPtr.i + 1; }//for operationRecPtr.i = coprecsize - 1; @@ -899,8 +910,7 @@ void Dbacc::execACCSEIZEREQ(Signal* signal) ptrGuard(operationRecPtr); operationRecPtr.p->userptr = tuserptr; operationRecPtr.p->userblockref = tuserblockref; - operationRecPtr.p->operation = ZUNDEFINED_OP; - operationRecPtr.p->transactionstate = IDLE; + operationRecPtr.p->m_op_bits = ~0; /* ******************************< */ /* ACCSEIZECONF */ /* ******************************< */ @@ -954,22 +964,19 @@ void Dbacc::initOpRec(Signal* signal) operationRecPtr.p->xfrmtupkeylen = signal->theData[4]; operationRecPtr.p->transId1 = signal->theData[5]; operationRecPtr.p->transId2 = signal->theData[6]; - operationRecPtr.p->transactionstate = ACTIVE; - operationRecPtr.p->commitDeleteCheckFlag = ZFALSE; - operationRecPtr.p->operation = Treqinfo & 0x7; - /* --------------------------------------------------------------------------------- */ - // opSimple is not used in this version. Is needed for deadlock handling later on. - /* --------------------------------------------------------------------------------- */ - // operationRecPtr.p->opSimple = (Treqinfo >> 3) & 0x1; - - operationRecPtr.p->lockMode = (Treqinfo >> 4) & 0x3; Uint32 readFlag = (((Treqinfo >> 4) & 0x3) == 0); // Only 1 if Read Uint32 dirtyFlag = (((Treqinfo >> 6) & 0x1) == 1); // Only 1 if Dirty Uint32 dirtyReadFlag = readFlag & dirtyFlag; - operationRecPtr.p->dirtyRead = dirtyReadFlag; - operationRecPtr.p->nodeType = (Treqinfo >> 7) & 0x3; + Uint32 opbits = 0; + opbits |= Treqinfo & 0x7; + opbits |= ((Treqinfo >> 4) & 0x3) ? Operationrec::OP_LOCK_MODE : 0; + opbits |= ((Treqinfo >> 4) & 0x3) ? Operationrec::OP_ACC_LOCK_MODE : 0; + opbits |= (dirtyReadFlag) ? Operationrec::OP_DIRTY_READ : 0; + opbits |= ((Treqinfo >> 31) & 0x1) ? Operationrec::OP_LOCK_REQ : 0; + + //operationRecPtr.p->nodeType = (Treqinfo >> 7) & 0x3; operationRecPtr.p->fid = fragrecptr.p->myfid; operationRecPtr.p->fragptr = fragrecptr.i; operationRecPtr.p->nextParallelQue = RNIL; @@ -977,14 +984,10 @@ void Dbacc::initOpRec(Signal* signal) operationRecPtr.p->nextSerialQue = RNIL; operationRecPtr.p->prevSerialQue = RNIL; operationRecPtr.p->elementPage = RNIL; - operationRecPtr.p->lockOwner = ZFALSE; - operationRecPtr.p->insertIsDone = ZFALSE; - operationRecPtr.p->elementIsDisappeared = ZFALSE; - operationRecPtr.p->insertDeleteLen = fragrecptr.p->elementLength; operationRecPtr.p->scanRecPtr = RNIL; + operationRecPtr.p->m_op_bits = opbits; // bit to mark lock operation - operationRecPtr.p->isAccLockReq = (Treqinfo >> 31) & 0x1; // undo log is not run via ACCKEYREQ }//Dbacc::initOpRec() @@ -995,7 +998,7 @@ void Dbacc::initOpRec(Signal* signal) void Dbacc::sendAcckeyconf(Signal* signal) { signal->theData[0] = operationRecPtr.p->userptr; - signal->theData[1] = operationRecPtr.p->operation; + signal->theData[1] = operationRecPtr.p->m_op_bits & Operationrec::OP_MASK; signal->theData[2] = operationRecPtr.p->fid; signal->theData[3] = operationRecPtr.p->localdata[0]; signal->theData[4] = operationRecPtr.p->localdata[1]; @@ -1003,7 +1006,8 @@ void Dbacc::sendAcckeyconf(Signal* signal) }//Dbacc::sendAcckeyconf() -void Dbacc::ACCKEY_error(Uint32 fromWhere) +void +Dbacc::ACCKEY_error(Uint32 fromWhere) { switch(fromWhere) { case 0: @@ -1057,7 +1061,8 @@ void Dbacc::execACCKEYREQ(Signal* signal) }//if ptrAss(operationRecPtr, operationrec); ptrAss(fragrecptr, fragmentrec); - ndbrequire(operationRecPtr.p->transactionstate == IDLE); + + ndbrequire(operationRecPtr.p->m_op_bits == Operationrec::OP_INITIAL); initOpRec(signal); // normalize key if any char attr @@ -1071,23 +1076,31 @@ void Dbacc::execACCKEYREQ(Signal* signal) /* WE REMEMBER THESE ADDRESS IF WE LATER NEED TO INSERT */ /* THE ITEM AFTER NOT FINDING THE ITEM. */ /*---------------------------------------------------------------*/ - getElement(signal); - - if (tgeResult == ZTRUE) { - switch (operationRecPtr.p->operation) { + OperationrecPtr lockOwnerPtr; + const Uint32 found = getElement(signal, lockOwnerPtr); + + Uint32 opbits = operationRecPtr.p->m_op_bits; + Uint32 op = opbits & Operationrec::OP_MASK; + if (found == ZTRUE) + { + switch (op) { case ZREAD: case ZUPDATE: case ZDELETE: case ZWRITE: case ZSCAN_OP: - if (!tgeLocked){ - if(operationRecPtr.p->operation == ZWRITE) + if (!lockOwnerPtr.p) + { + if(op == ZWRITE) { jam(); - operationRecPtr.p->operation = ZUPDATE; + opbits &= ~(Uint32)Operationrec::OP_MASK; + opbits |= (op = ZUPDATE); } + opbits |= Operationrec::OP_STATE_RUNNING; + opbits |= Operationrec::OP_RUN_QUEUE; sendAcckeyconf(signal); - if (operationRecPtr.p->dirtyRead == ZFALSE) { + if (! (opbits & Operationrec::OP_DIRTY_READ)) { /*---------------------------------------------------------------*/ // It is not a dirty read. We proceed by locking and continue with // the operation. @@ -1104,42 +1117,44 @@ void Dbacc::execACCKEYREQ(Signal* signal) dbgWord32(gePageptr, tgeElementptr, eh); gePageptr.p->word32[tgeElementptr] = eh; - insertLockOwnersList(signal , operationRecPtr); - return; + opbits |= Operationrec::OP_LOCK_OWNER; + insertLockOwnersList(signal, operationRecPtr); } else { jam(); /*---------------------------------------------------------------*/ // It is a dirty read. We do not lock anything. Set state to // IDLE since no COMMIT call will come. /*---------------------------------------------------------------*/ - operationRecPtr.p->transactionstate = IDLE; - operationRecPtr.p->operation = ZUNDEFINED_OP; - return; + opbits = Operationrec::OP_EXECUTED_DIRTY_READ; }//if + operationRecPtr.p->m_op_bits = opbits; + return; } else { jam(); - accIsLockedLab(signal); + accIsLockedLab(signal, lockOwnerPtr); return; }//if break; case ZINSERT: jam(); - insertExistElemLab(signal); + insertExistElemLab(signal, lockOwnerPtr); return; break; default: ndbrequire(false); break; }//switch - } else if (tgeResult == ZFALSE) { - switch (operationRecPtr.p->operation) { - case ZINSERT: + } else if (found == ZFALSE) { + switch (op){ case ZWRITE: + opbits &= ~(Uint32)Operationrec::OP_MASK; + opbits |= (op = ZINSERT); + case ZINSERT: jam(); - // If a write operation makes an insert we switch operation to ZINSERT so - // that the commit-method knows an insert has been made and updates noOfElements. - operationRecPtr.p->operation = ZINSERT; - operationRecPtr.p->insertIsDone = ZTRUE; + opbits |= Operationrec::OP_INSERT_IS_DONE; + opbits |= Operationrec::OP_STATE_RUNNING; + opbits |= Operationrec::OP_RUN_QUEUE; + operationRecPtr.p->m_op_bits = opbits; insertelementLab(signal); return; break; @@ -1157,13 +1172,285 @@ void Dbacc::execACCKEYREQ(Signal* signal) }//switch } else { jam(); - acckeyref1Lab(signal, tgeResult); + acckeyref1Lab(signal, found); return; }//if return; }//Dbacc::execACCKEYREQ() void +Dbacc::execACCKEY_ORD(Signal* signal, Uint32 opPtrI) +{ + OperationrecPtr lastOp; + lastOp.i = opPtrI; + ptrCheckGuard(lastOp, coprecsize, operationrec); + Uint32 opbits = lastOp.p->m_op_bits; + Uint32 opstate = opbits & Operationrec::OP_STATE_MASK; + + if (likely(opbits == Operationrec::OP_EXECUTED_DIRTY_READ)) + { + jam(); + lastOp.p->m_op_bits = Operationrec::OP_INITIAL; + return; + } + else if (likely(opstate == Operationrec::OP_STATE_RUNNING)) + { + opbits |= Operationrec::OP_STATE_EXECUTED; + lastOp.p->m_op_bits = opbits; + startNext(signal, lastOp); + return; + } + else if (opstate == Operationrec::OP_STATE_RUNNING_ABORT) + { + } + else + { + } + + ndbout_c("bits: %.8x state: %.8x", opbits, opstate); + ndbrequire(false); +} + +void +Dbacc::startNext(Signal* signal, OperationrecPtr lastOp) +{ + jam(); + OperationrecPtr nextOp; + OperationrecPtr loPtr; + nextOp.i = lastOp.p->nextParallelQue; + loPtr.i = lastOp.p->m_lock_owner_ptr_i; + Uint32 opbits = lastOp.p->m_op_bits; + + if ((opbits & Operationrec::OP_STATE_MASK)!= Operationrec::OP_STATE_EXECUTED) + { + jam(); + return; + } + + Uint32 nextbits; + if (nextOp.i != RNIL) + { + jam(); + ptrCheckGuard(nextOp, coprecsize, operationrec); + nextbits = nextOp.p->m_op_bits; + goto checkop; + } + + if ((opbits & Operationrec::OP_LOCK_OWNER) == 0) + { + jam(); + ptrCheckGuard(loPtr, coprecsize, operationrec); + nextOp.i = loPtr.p->nextSerialQue; + } + else + { + jam(); + nextOp.i = loPtr.i; + loPtr = lastOp; + } + + ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER); + + if (nextOp.i == RNIL) + { + jam(); + return; + } + + /** + * There is an op in serie queue... + * Check if it can run + */ + ptrCheckGuard(nextOp, coprecsize, operationrec); + nextbits = nextOp.p->m_op_bits; + + bool same = nextOp.p->is_same_trans(lastOp.p); + + if (!same && ((opbits & Operationrec::OP_ACC_LOCK_MODE) || + (nextbits & Operationrec::OP_LOCK_MODE))) + { + jam(); + /** + * Not same transaction + * and either last had exclusive lock + * or next had exclusive lock + */ + return; + } + + /** + * same trans and X-lock + */ + if (same && (opbits & Operationrec::OP_ACC_LOCK_MODE)) + { + jam(); + goto upgrade; + } + + /** + * all shared lock... + */ + if ((opbits & Operationrec::OP_ACC_LOCK_MODE) == 0 && + (nextbits & Operationrec::OP_LOCK_MODE) == 0) + { + jam(); + goto upgrade; + } + + /** + * There is a shared parallell queue & and exclusive op is first in queue + */ + ndbassert((opbits & Operationrec::OP_ACC_LOCK_MODE) == 0 && + (nextbits & Operationrec::OP_LOCK_MODE)); + + /** + * We must check if there are many transactions in parallel queue... + */ + OperationrecPtr tmp; + tmp.i = loPtr.p->nextParallelQue; + while (tmp.i != RNIL) + { + ptrCheckGuard(tmp, coprecsize, operationrec); + if (!nextOp.p->is_same_trans(tmp.p)) + { + jam(); + /** + * parallel queue contained another transaction, dont let it run + */ + return; + } + } + +upgrade: + /** + * Move first op in serie queue to end of parallell queue + */ + + tmp.i = loPtr.p->nextSerialQue = nextOp.p->nextSerialQue; + loPtr.p->m_lo_last_parallel_op_ptr_i = nextOp.i; + nextOp.p->nextSerialQue = RNIL; + nextOp.p->prevSerialQue = RNIL; + nextOp.p->m_lock_owner_ptr_i = loPtr.i; + nextOp.p->prevParallelQue = lastOp.i; + lastOp.p->nextParallelQue = nextOp.i; + + if (tmp.i != RNIL) + { + jam(); + ptrCheckGuard(tmp, coprecsize, operationrec); + tmp.p->prevSerialQue = loPtr.i; + } + else + { + jam(); + loPtr.p->m_lo_last_serial_op_ptr_i = RNIL; + } + + nextbits |= Operationrec::OP_RUN_QUEUE; + + /** + * Currently no grouping of ops in serie queue + */ + ndbrequire(nextOp.p->nextParallelQue == RNIL); + +checkop: + Uint32 errCode = 0; + OperationrecPtr save = operationRecPtr; + operationRecPtr = nextOp; + + Uint32 lastop = opbits & Operationrec::OP_MASK; + Uint32 nextop = nextbits & Operationrec::OP_MASK; + + nextbits &= nextbits & ~(Uint32)Operationrec::OP_STATE_MASK; + nextbits |= Operationrec::OP_STATE_RUNNING; + + if (lastop == ZDELETE) + { + jam(); + if (nextop != ZINSERT && nextop != ZWRITE) + { + errCode = ZREAD_ERROR; + goto ref; + } + + nextbits &= ~(Uint32)Operationrec::OP_MASK; + nextbits &= ~(Uint32)Operationrec::OP_ELEMENT_DISAPPEARED; + nextbits |= (nextop = ZINSERT); + nextbits |= Operationrec::OP_INSERT_IS_DONE; + goto conf; + } + else if (nextop == ZINSERT) + { + jam(); + errCode = ZWRITE_ERROR; + goto ref; + } + else if (nextop == ZWRITE) + { + jam(); + nextbits &= ~(Uint32)Operationrec::OP_MASK; + nextbits |= (nextop = ZUPDATE); + goto conf; + } + else + { + jam(); + } + +conf: + nextOp.p->m_op_bits = nextbits; + nextOp.p->localdata[0] = lastOp.p->localdata[0]; + nextOp.p->localdata[1] = lastOp.p->localdata[1]; + + if (nextop == ZSCAN_OP && (nextbits & Operationrec::OP_LOCK_REQ) == 0) + { + jam(); + takeOutScanLockQueue(nextOp.p->scanRecPtr); + putReadyScanQueue(signal, nextOp.p->scanRecPtr); + } + else + { + jam(); + sendAcckeyconf(signal); + sendSignal(nextOp.p->userblockref, GSN_ACCKEYCONF, + signal, 6, JBA); + } + + operationRecPtr = save; + return; + +ref: + nextOp.p->m_op_bits = nextbits; + + if (nextop == ZSCAN_OP && (nextbits & Operationrec::OP_LOCK_REQ) == 0) + { + jam(); + nextOp.p->m_op_bits |= Operationrec::OP_ELEMENT_DISAPPEARED; + takeOutScanLockQueue(nextOp.p->scanRecPtr); + putReadyScanQueue(signal, nextOp.p->scanRecPtr); + } + else + { + jam(); + signal->theData[0] = nextOp.p->userptr; + signal->theData[1] = errCode; + sendSignal(nextOp.p->userblockref, GSN_ACCKEYREF, signal, + 2, JBB); + } + + operationRecPtr = save; + return; +} + + +#if 0 +void +Dbacc::execACCKEY_REP_REF(Signal* signal, Uint32 opPtrI) +{ +} +#endif + +void Dbacc::xfrmKeyData(Signal* signal) { Uint32 table = fragrecptr.p->myTableId; @@ -1176,23 +1463,22 @@ Dbacc::xfrmKeyData(Signal* signal) operationRecPtr.p->xfrmtupkeylen = len; } -void Dbacc::accIsLockedLab(Signal* signal) +void +Dbacc::accIsLockedLab(Signal* signal, OperationrecPtr lockOwnerPtr) { ndbrequire(csystemRestart == ZFALSE); - queOperPtr.i = ElementHeader::getOpPtrI(gePageptr.p->word32[tgeElementptr]); - ptrCheckGuard(queOperPtr, coprecsize, operationrec); - if (operationRecPtr.p->dirtyRead == ZFALSE) { + + Uint32 bits = operationRecPtr.p->m_op_bits; + validate_lock_queue(lockOwnerPtr); + + if ((bits & Operationrec::OP_DIRTY_READ) == 0){ Uint32 return_result; - if (operationRecPtr.p->lockMode == ZREADLOCK) { + if ((bits & Operationrec::OP_LOCK_MODE) == ZREADLOCK) { jam(); - priPageptr = gePageptr; - tpriElementptr = tgeElementptr; - return_result = placeReadInLockQueue(signal); + return_result = placeReadInLockQueue(lockOwnerPtr); } else { jam(); - pwiPageptr = gePageptr; - tpwiElementptr = tgeElementptr; - return_result = placeWriteInLockQueue(signal); + return_result = placeWriteInLockQueue(lockOwnerPtr); }//if if (return_result == ZPARALLEL_QUEUE) { jam(); @@ -1202,24 +1488,29 @@ void Dbacc::accIsLockedLab(Signal* signal) jam(); signal->theData[0] = RNIL; return; - } else if (return_result == ZWRITE_ERROR) { + } else { jam(); acckeyref1Lab(signal, return_result); return; }//if ndbrequire(false); - } else { - if (queOperPtr.p->elementIsDisappeared == ZFALSE) { + } + else + { + if (!(lockOwnerPtr.p->m_op_bits & Operationrec::OP_ELEMENT_DISAPPEARED) && + lockOwnerPtr.p->localdata[0] != ~(Uint32)0) + { jam(); - /*---------------------------------------------------------------*/ - // It is a dirty read. We do not lock anything. Set state to - // IDLE since no COMMIT call will arrive. - /*---------------------------------------------------------------*/ + /* --------------------------------------------------------------- + * It is a dirty read. We do not lock anything. Set state to + *IDLE since no COMMIT call will arrive. + * ---------------------------------------------------------------*/ sendAcckeyconf(signal); - operationRecPtr.p->transactionstate = IDLE; - operationRecPtr.p->operation = ZUNDEFINED_OP; + operationRecPtr.p->m_op_bits = Operationrec::OP_EXECUTED_DIRTY_READ; return; - } else { + } + else + { jam(); /*---------------------------------------------------------------*/ // The tuple does not exist in the committed world currently. @@ -1231,17 +1522,18 @@ void Dbacc::accIsLockedLab(Signal* signal) }//if }//Dbacc::accIsLockedLab() -/* --------------------------------------------------------------------------------- */ -/* I N S E R T E X I S T E L E M E N T */ -/* --------------------------------------------------------------------------------- */ -void Dbacc::insertExistElemLab(Signal* signal) +/* ------------------------------------------------------------------------ */ +/* I N S E R T E X I S T E L E M E N T */ +/* ------------------------------------------------------------------------ */ +void Dbacc::insertExistElemLab(Signal* signal, OperationrecPtr lockOwnerPtr) { - if (!tgeLocked){ + if (!lockOwnerPtr.p) + { jam(); acckeyref1Lab(signal, ZWRITE_ERROR);/* THE ELEMENT ALREADY EXIST */ return; }//if - accIsLockedLab(signal); + accIsLockedLab(signal, lockOwnerPtr); }//Dbacc::insertExistElemLab() /* --------------------------------------------------------------------------------- */ @@ -1259,26 +1551,8 @@ void Dbacc::insertelementLab(Signal* signal) }//if }//if ndbrequire(operationRecPtr.p->tupkeylen <= fragrecptr.p->keyLength); - - Uint32 localKey; - if(!operationRecPtr.p->isAccLockReq) - { - signal->theData[0] = operationRecPtr.p->userptr; - Uint32 blockNo = refToBlock(operationRecPtr.p->userblockref); - EXECUTE_DIRECT(blockNo, GSN_LQH_ALLOCREQ, signal, 1); - jamEntry(); - if (signal->theData[0] != 0) { - jam(); - Uint32 result_code = signal->theData[0]; - acckeyref1Lab(signal, result_code); - return; - }//if - localKey = (signal->theData[1] << MAX_TUPLES_BITS) + signal->theData[2]; - } - else - { - localKey = signal->theData[7]; - } + ndbassert(!(operationRecPtr.p->m_op_bits & Operationrec::OP_LOCK_REQ)); + Uint32 localKey = ~(Uint32)0; insertLockOwnersList(signal, operationRecPtr); @@ -1293,196 +1567,18 @@ void Dbacc::insertelementLab(Signal* signal) idrOperationRecPtr = operationRecPtr; clocalkey[0] = localKey; operationRecPtr.p->localdata[0] = localKey; - /* --------------------------------------------------------------------------------- */ - /* WE SET THE LOCAL KEY TO MINUS ONE TO INDICATE IT IS NOT YET VALID. */ - /* --------------------------------------------------------------------------------- */ + /* ----------------------------------------------------------------------- */ + /* WE SET THE LOCAL KEY TO MINUS ONE TO INDICATE IT IS NOT YET VALID. */ + /* ----------------------------------------------------------------------- */ insertElement(signal); sendAcckeyconf(signal); return; }//Dbacc::insertelementLab() -/* --------------------------------------------------------------------------------- */ -/* PLACE_READ_IN_LOCK_QUEUE */ -/* INPUT: OPERATION_REC_PTR OUR OPERATION POINTER */ -/* QUE_OPER_PTR LOCK QUEUE OWNER OPERATION POINTER */ -/* PRI_PAGEPTR PAGE POINTER OF ELEMENT */ -/* TPRI_ELEMENTPTR ELEMENT POINTER OF ELEMENT */ -/* OUTPUT TRESULT = */ -/* ZPARALLEL_QUEUE OPERATION PLACED IN PARALLEL QUEUE */ -/* OPERATION CAN PROCEED NOW. */ -/* ZSERIAL_QUEUE OPERATION PLACED IN SERIAL QUEUE */ -/* ERROR CODE OPERATION NEEDS ABORTING */ -/* THE ELEMENT WAS LOCKED AND WE WANT TO READ THE TUPLE. WE WILL CHECK THE LOCK */ -/* QUEUES TO PERFORM THE PROPER ACTION. */ -/* */ -/* IN SOME PLACES IN THE CODE BELOW THAT HANDLES WHAT TO DO WHEN THE TUPLE IS LOCKED */ -/* WE DO ASSUME THAT NEXT_PARALLEL_QUEUE AND NEXT_SERIAL_QUEUE ON OPERATION_REC_PTR */ -/* HAVE BEEN INITIALISED TO RNIL. THUS WE DO NOT PERFORM THIS ONCE MORE EVEN IF IT */ -/* COULD BE NICE FOR READABILITY. */ -/* --------------------------------------------------------------------------------- */ -Uint32 Dbacc::placeReadInLockQueue(Signal* signal) -{ - if (getNoParallelTransaction(queOperPtr.p) == 1) { - if ((queOperPtr.p->transId1 == operationRecPtr.p->transId1) && - (queOperPtr.p->transId2 == operationRecPtr.p->transId2)) { - /* --------------------------------------------------------------------------------- */ - /* WE ARE PERFORMING A READ OPERATION AND THIS TRANSACTION ALREADY OWNS THE LOCK */ - /* ALONE. PUT THE OPERATION LAST IN THE PARALLEL QUEUE. */ - /* --------------------------------------------------------------------------------- */ - jam(); - mlpqOperPtr = queOperPtr; - moveLastParallelQueue(signal); - operationRecPtr.p->localdata[0] = queOperPtr.p->localdata[0]; - operationRecPtr.p->localdata[1] = queOperPtr.p->localdata[1]; - operationRecPtr.p->prevParallelQue = mlpqOperPtr.i; - mlpqOperPtr.p->nextParallelQue = operationRecPtr.i; - switch (queOperPtr.p->lockMode) { - case ZREADLOCK: - jam(); - /*empty*/; - break; - default: - jam(); - /* --------------------------------------------------------------------------------- */ - /* IF THE TRANSACTION PREVIOUSLY SET A WRITE LOCK WE MUST ENSURE THAT ALL */ - /* OPERATIONS IN THE PARALLEL QUEUE HAVE WRITE LOCK MODE TO AVOID STRANGE BUGS.*/ - /* --------------------------------------------------------------------------------- */ - operationRecPtr.p->lockMode = queOperPtr.p->lockMode; - break; - }//switch - return ZPARALLEL_QUEUE; - }//if - }//if - if (queOperPtr.p->nextSerialQue == RNIL) { - /* --------------------------------------------------------------------------------- */ - /* WE ARE PERFORMING A READ OPERATION AND THERE IS NO SERIAL QUEUE. IF THERE IS NO */ - /* WRITE OPERATION THAT OWNS THE LOCK OR ANY WRITE OPERATION IN THE PARALLEL QUEUE */ - /* IT IS ENOUGH TO CHECK THE LOCK MODE OF THE LEADER IN THE PARALLEL QUEUE. IF IT IS */ - /* A READ LOCK THEN WE PLACE OURSELVES IN THE PARALLEL QUEUE OTHERWISE WE GO ON TO */ - /* PLACE OURSELVES IN THE SERIAL QUEUE. */ - /* --------------------------------------------------------------------------------- */ - switch (queOperPtr.p->lockMode) { - case ZREADLOCK: - jam(); - mlpqOperPtr = queOperPtr; - moveLastParallelQueue(signal); - operationRecPtr.p->prevParallelQue = mlpqOperPtr.i; - mlpqOperPtr.p->nextParallelQue = operationRecPtr.i; - operationRecPtr.p->localdata[0] = queOperPtr.p->localdata[0]; - operationRecPtr.p->localdata[1] = queOperPtr.p->localdata[1]; - return ZPARALLEL_QUEUE; - default: - jam(); - queOperPtr.p->nextSerialQue = operationRecPtr.i; - operationRecPtr.p->prevSerialQue = queOperPtr.i; - break; - }//switch - } else { - jam(); - placeSerialQueueRead(signal); - }//if - return ZSERIAL_QUEUE; -}//Dbacc::placeReadInLockQueue() - -/* --------------------------------------------------------------------------------- */ -/* WE WILL CHECK IF THIS TRANSACTION IS ALREADY PLACED AT SOME SPOT IN THE PARALLEL */ -/* SERIAL QUEUE WITHOUT ANY NEIGHBORS FROM OTHER TRANSACTION. IF SO WE WILL INSERT */ -/* IT IN THAT PARALLEL QUEUE. */ -/* --------------------------------------------------------------------------------- */ -void Dbacc::placeSerialQueueRead(Signal* signal) -{ - readWriteOpPtr.i = queOperPtr.p->nextSerialQue; - ptrCheckGuard(readWriteOpPtr, coprecsize, operationrec); -PSQR_LOOP: - jam(); - if (readWriteOpPtr.p->nextSerialQue == RNIL) { - jam(); - /* --------------------------------------------------------------------------------- */ - /* THERE WAS NO PREVIOUS OPERATION IN THIS TRANSACTION WHICH WE COULD PUT IT */ - /* IN THE PARALLEL QUEUE TOGETHER WITH. */ - /* --------------------------------------------------------------------------------- */ - checkOnlyReadEntry(signal); - return; - }//if - if (getNoParallelTransaction(readWriteOpPtr.p) == 1) { - jam(); - /* --------------------------------------------------------------------------------- */ - /* THERE WAS ONLY ONE TRANSACTION INVOLVED IN THE PARALLEL QUEUE. IF THIS IS OUR */ - /* TRANSACTION WE CAN STILL GET HOLD OF THE LOCK. */ - /* --------------------------------------------------------------------------------- */ - if ((readWriteOpPtr.p->transId1 == operationRecPtr.p->transId1) && - (readWriteOpPtr.p->transId2 == operationRecPtr.p->transId2)) { - jam(); - /* --------------------------------------------------------------------------------- */ - /* WE ARE PERFORMING A READ IN THE SAME TRANSACTION WHERE WE ALREADY */ - /* PREVIOUSLY HAVE EXECUTED AN OPERATION. INSERT-DELETE, READ-UPDATE, READ-READ, */ - /* UPDATE-UPDATE, UPDATE-DELETE, READ-DELETE, INSERT-READ, INSERT-UPDATE ARE ALLOWED */ - /* COMBINATIONS. A NEW INSERT AFTER A DELETE IS NOT ALLOWED AND SUCH AN INSERT WILL */ - /* GO TO THE SERIAL LOCK QUEUE WHICH IT WILL NOT LEAVE UNTIL A TIME-OUT AND THE */ - /* TRANSACTION IS ABORTED. READS AND UPDATES AFTER DELETES IS ALSO NOT ALLOWED. */ - /* --------------------------------------------------------------------------------- */ - mlpqOperPtr = readWriteOpPtr; - moveLastParallelQueue(signal); - readWriteOpPtr = mlpqOperPtr; - operationRecPtr.p->prevParallelQue = readWriteOpPtr.i; - readWriteOpPtr.p->nextParallelQue = operationRecPtr.i; - operationRecPtr.p->localdata[0] = readWriteOpPtr.p->localdata[0]; - operationRecPtr.p->localdata[1] = readWriteOpPtr.p->localdata[1]; - switch (readWriteOpPtr.p->lockMode) { - case ZREADLOCK: - jam(); - /*empty*/; - break; - default: - jam(); - /* --------------------------------------------------------------------------------- */ - /* IF THE TRANSACTION PREVIOUSLY SET A WRITE LOCK WE MUST ENSURE THAT ALL */ - /* OPERATIONS IN THE PARALLEL QUEUE HAVE WRITE LOCK MODE TO AVOID STRANGE BUGS.*/ - /* --------------------------------------------------------------------------------- */ - operationRecPtr.p->lockMode = readWriteOpPtr.p->lockMode; - break; - }//switch - return; - }//if - }//if - readWriteOpPtr.i = readWriteOpPtr.p->nextSerialQue; - ptrCheckGuard(readWriteOpPtr, coprecsize, operationrec); - goto PSQR_LOOP; -}//Dbacc::placeSerialQueueRead() -/* --------------------------------------------------------------------------------- */ -/* WE WILL CHECK IF THE LAST ENTRY IN THE SERIAL QUEUE CONTAINS ONLY READ */ -/* OPERATIONS. IF SO WE WILL INSERT IT IN THAT PARALLEL QUEUE. OTHERWISE WE */ -/* WILL PLACE IT AT THE END OF THE SERIAL QUEUE. */ -/* --------------------------------------------------------------------------------- */ -void Dbacc::checkOnlyReadEntry(Signal* signal) -{ - switch (readWriteOpPtr.p->lockMode) { - case ZREADLOCK: - jam(); - /* --------------------------------------------------------------------------------- */ - /* SINCE THIS LAST QUEUE ONLY CONTAINS READ LOCKS WE CAN JOIN THE PARALLEL QUEUE AT */ - /* THE END. */ - /* --------------------------------------------------------------------------------- */ - mlpqOperPtr = readWriteOpPtr; - moveLastParallelQueue(signal); - readWriteOpPtr = mlpqOperPtr; - operationRecPtr.p->prevParallelQue = readWriteOpPtr.i; - readWriteOpPtr.p->nextParallelQue = operationRecPtr.i; - operationRecPtr.p->localdata[0] = readWriteOpPtr.p->localdata[0]; - operationRecPtr.p->localdata[1] = readWriteOpPtr.p->localdata[1]; - break; - default: - jam(); /* PUT THE OPERATION RECORD IN THE SERIAL QUEUE */ - readWriteOpPtr.p->nextSerialQue = operationRecPtr.i; - operationRecPtr.p->prevSerialQue = readWriteOpPtr.i; - break; - }//switch -}//Dbacc::checkOnlyReadEntry() - -/* --------------------------------------------------------------------------------- */ -/* GET_NO_PARALLEL_TRANSACTION */ -/* --------------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------ */ +/* GET_NO_PARALLEL_TRANSACTION */ +/* ------------------------------------------------------------------------ */ Uint32 Dbacc::getNoParallelTransaction(const Operationrec * op) { @@ -1502,135 +1598,641 @@ Dbacc::getNoParallelTransaction(const Operationrec * op) return 1; }//Dbacc::getNoParallelTransaction() -void Dbacc::moveLastParallelQueue(Signal* signal) +#ifdef VM_TRACE +Uint32 +Dbacc::getNoParallelTransactionFull(const Operationrec * op) { - while (mlpqOperPtr.p->nextParallelQue != RNIL) { - jam(); - mlpqOperPtr.i = mlpqOperPtr.p->nextParallelQue; - ptrCheckGuard(mlpqOperPtr, coprecsize, operationrec); - }//if -}//Dbacc::moveLastParallelQueue() + ConstPtr<Operationrec> tmp; + + tmp.p = op; + while ((tmp.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0) + { + tmp.i = tmp.p->prevParallelQue; + if (tmp.i != RNIL) + { + ptrCheckGuard(tmp, coprecsize, operationrec); + } + else + { + break; + } + } + + return getNoParallelTransaction(tmp.p); +} +#endif -void Dbacc::moveLastParallelQueueWrite(Signal* signal) +#ifdef ACC_SAFE_QUEUE + +Uint32 +Dbacc::get_parallel_head(OperationrecPtr opPtr) { - /* --------------------------------------------------------------------------------- */ - /* ENSURE THAT ALL OPERATIONS HAVE LOCK MODE SET TO WRITE SINCE WE INSERT A */ - /* WRITE LOCK INTO THE PARALLEL QUEUE. */ - /* --------------------------------------------------------------------------------- */ - while (mlpqOperPtr.p->nextParallelQue != RNIL) { - jam(); - mlpqOperPtr.p->lockMode = operationRecPtr.p->lockMode; - mlpqOperPtr.i = mlpqOperPtr.p->nextParallelQue; - ptrCheckGuard(mlpqOperPtr, coprecsize, operationrec); - }//if - mlpqOperPtr.p->lockMode = operationRecPtr.p->lockMode; -}//Dbacc::moveLastParallelQueueWrite() + while ((opPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0 && + opPtr.p->prevParallelQue != RNIL) + { + opPtr.i = opPtr.p->prevParallelQue; + ptrCheckGuard(opPtr, coprecsize, operationrec); + } + + return opPtr.i; +} -/* --------------------------------------------------------------------------------- */ -/* PLACE_WRITE_IN_LOCK_QUEUE */ -/* INPUT: OPERATION_REC_PTR OUR OPERATION POINTER */ -/* QUE_OPER_PTR LOCK QUEUE OWNER OPERATION POINTER */ -/* PWI_PAGEPTR PAGE POINTER OF ELEMENT */ -/* TPWI_ELEMENTPTR ELEMENT POINTER OF ELEMENT */ -/* OUTPUT TRESULT = */ -/* ZPARALLEL_QUEUE OPERATION PLACED IN PARALLEL QUEUE */ -/* OPERATION CAN PROCEED NOW. */ -/* ZSERIAL_QUEUE OPERATION PLACED IN SERIAL QUEUE */ -/* ERROR CODE OPERATION NEEDS ABORTING */ -/* --------------------------------------------------------------------------------- */ -Uint32 Dbacc::placeWriteInLockQueue(Signal* signal) +bool +Dbacc::validate_lock_queue(OperationrecPtr opPtr) +{ + OperationrecPtr loPtr; + loPtr.i = get_parallel_head(opPtr); + ptrCheckGuard(loPtr, coprecsize, operationrec); + + while((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0 && + loPtr.p->prevSerialQue != RNIL) + { + loPtr.i = loPtr.p->prevSerialQue; + ptrCheckGuard(loPtr, coprecsize, operationrec); + } + + // Now we have lock owner... + vlqrequire(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER); + vlqrequire(loPtr.p->m_op_bits & Operationrec::OP_RUN_QUEUE); + + // 1 Validate page pointer + { + Page8Ptr pagePtr; + pagePtr.i = loPtr.p->elementPage; + ptrCheckGuard(pagePtr, cpagesize, page8); + arrGuard(loPtr.p->elementPointer, 2048); + Uint32 eh = pagePtr.p->word32[loPtr.p->elementPointer]; + vlqrequire(ElementHeader::getLocked(eh)); + vlqrequire(ElementHeader::getOpPtrI(eh) == loPtr.i); + } + + // 2 Lock owner should always have same LOCK_MODE and ACC_LOCK_MODE + if (loPtr.p->m_op_bits & Operationrec::OP_LOCK_MODE) + { + vlqrequire(loPtr.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE); + } + else + { + vlqrequire((loPtr.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE) == 0); + } + + // 3 Lock owner should never be waiting... + bool running = false; + { + Uint32 opstate = loPtr.p->m_op_bits & Operationrec::OP_STATE_MASK; + if (opstate == Operationrec::OP_STATE_RUNNING || + opstate == Operationrec::OP_STATE_RUNNING_ABORT) + running = true; + else + { + vlqrequire(opstate == Operationrec::OP_STATE_EXECUTED); + } + } + + // Validate parallel queue + { + bool many = false; + bool orlockmode = loPtr.p->m_op_bits & Operationrec::OP_LOCK_MODE; + OperationrecPtr lastP = loPtr; + + while (lastP.p->nextParallelQue != RNIL) + { + Uint32 prev = lastP.i; + lastP.i = lastP.p->nextParallelQue; + ptrCheckGuard(lastP, coprecsize, operationrec); + + vlqrequire(lastP.p->prevParallelQue == prev); + + Uint32 opbits = lastP.p->m_op_bits; + many |= loPtr.p->is_same_trans(lastP.p) ? 0 : 1; + orlockmode |= !!(opbits & Operationrec::OP_LOCK_MODE); + + vlqrequire(opbits & Operationrec::OP_RUN_QUEUE); + vlqrequire((opbits & Operationrec::OP_LOCK_OWNER) == 0); + + Uint32 opstate = opbits & Operationrec::OP_STATE_MASK; + if (running) + { + // If I found a running operation, + // all following should be waiting + vlqrequire(opstate == Operationrec::OP_STATE_WAITING); + } + else + { + if (opstate == Operationrec::OP_STATE_RUNNING || + opstate == Operationrec::OP_STATE_RUNNING_ABORT) + running = true; + else + vlqrequire(opstate == Operationrec::OP_STATE_EXECUTED); + } + + if (lastP.p->m_op_bits & Operationrec::OP_LOCK_MODE) + { + vlqrequire(lastP.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE); + } + else + { + vlqrequire((lastP.p->m_op_bits && orlockmode) == orlockmode); + vlqrequire((lastP.p->m_op_bits & Operationrec::OP_MASK) == ZREAD || + (lastP.p->m_op_bits & Operationrec::OP_MASK) == ZSCAN_OP); + } + + if (many) + { + vlqrequire(orlockmode == 0); + } + } + + if (lastP.i != loPtr.i) + { + vlqrequire(loPtr.p->m_lo_last_parallel_op_ptr_i == lastP.i); + vlqrequire(lastP.p->m_lock_owner_ptr_i == loPtr.i); + } + else + { + vlqrequire(loPtr.p->m_lo_last_parallel_op_ptr_i == RNIL); + } + } + + // Validate serie queue + if (loPtr.p->nextSerialQue != RNIL) + { + Uint32 prev = loPtr.i; + OperationrecPtr lastS; + lastS.i = loPtr.p->nextSerialQue; + while (true) + { + ptrCheckGuard(lastS, coprecsize, operationrec); + vlqrequire(lastS.p->prevSerialQue == prev); + vlqrequire(getNoParallelTransaction(lastS.p) == 1); + vlqrequire((lastS.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0); + vlqrequire((lastS.p->m_op_bits & Operationrec::OP_RUN_QUEUE) == 0); + vlqrequire((lastS.p->m_op_bits & Operationrec::OP_STATE_MASK) == Operationrec::OP_STATE_WAITING); + if (lastS.p->nextSerialQue == RNIL) + break; + prev = lastS.i; + lastS.i = lastS.p->nextSerialQue; + } + + vlqrequire(loPtr.p->m_lo_last_serial_op_ptr_i == lastS.i); + } + else + { + vlqrequire(loPtr.p->m_lo_last_serial_op_ptr_i == RNIL); + } + return true; +} + +NdbOut& +operator<<(NdbOut & out, Dbacc::OperationrecPtr ptr) { - if (!((getNoParallelTransaction(queOperPtr.p) == 1) && - (queOperPtr.p->transId1 == operationRecPtr.p->transId1) && - (queOperPtr.p->transId2 == operationRecPtr.p->transId2))) { + Uint32 opbits = ptr.p->m_op_bits; + out << "[ " << dec << ptr.i + << " [ " << hex << ptr.p->transId1 + << " " << hex << ptr.p->transId2 << "] " + << " bits: H'" << hex << opbits << " "; + + bool read = false; + switch(opbits & Dbacc::Operationrec::OP_MASK){ + case ZREAD: out << "READ "; read = true; break; + case ZINSERT: out << "INSERT "; break; + case ZUPDATE: out << "UPDATE "; break; + case ZDELETE: out << "DELETE "; break; + case ZWRITE: out << "WRITE "; break; + case ZSCAN_OP: out << "SCAN "; read = true; break; + default: + out << "<Unknown: H'" + << hex << (opbits & Dbacc::Operationrec::OP_MASK) + << "> "; + } + + if (read) + { + if (opbits & Dbacc::Operationrec::OP_LOCK_MODE) + out << "(X)"; + else + out << "(S)"; + if (opbits & Dbacc::Operationrec::OP_ACC_LOCK_MODE) + out << "(X)"; + else + out << "(S)"; + } + + if (opbits) + { + out << "(RQ)"; + } + + switch(opbits & Dbacc::Operationrec::OP_STATE_MASK){ + case Dbacc::Operationrec::OP_STATE_WAITING: + out << " WAITING "; break; + case Dbacc::Operationrec::OP_STATE_RUNNING: + out << " RUNNING "; break; + case Dbacc::Operationrec::OP_STATE_EXECUTED: + out << " EXECUTED "; break; + case Dbacc::Operationrec::OP_STATE_RUNNING_ABORT: + out << " RUNNIG_ABORT "; break; + case Dbacc::Operationrec::OP_STATE_IDLE: + out << " IDLE "; break; + default: + out << " <Unknown: H'" + << hex << (opbits & Dbacc::Operationrec::OP_STATE_MASK) + << "> "; + } + +/* + OP_MASK = 0x000F // 4 bits for operation type + ,OP_LOCK_MODE = 0x0010 // 0 - shared lock, 1 = exclusive lock + ,OP_ACC_LOCK_MODE = 0x0020 // Or:de lock mode of all operation + // before me + ,OP_LOCK_OWNER = 0x0040 + ,OP_DIRTY_READ = 0x0080 + ,OP_LOCK_REQ = 0x0100 // isAccLockReq + ,OP_COMMIT_DELETE_CHECK = 0x0200 + ,OP_INSERT_IS_DONE = 0x0400 + ,OP_ELEMENT_DISAPPEARED = 0x0800 + + ,OP_STATE_MASK = 0xF000 + ,OP_STATE_IDLE = 0xF000 + ,OP_STATE_WAITING = 0x0000 + ,OP_STATE_RUNNING = 0x1000 + ,OP_STATE_EXECUTED = 0x3000 + ,OP_STATE_RUNNING_ABORT = 0x2000 + }; +*/ + if (opbits & Dbacc::Operationrec::OP_LOCK_OWNER) + out << "LO "; + + if (opbits & Dbacc::Operationrec::OP_DIRTY_READ) + out << "DR "; + + if (opbits & Dbacc::Operationrec::OP_LOCK_REQ) + out << "LOCK_REQ "; + + if (opbits & Dbacc::Operationrec::OP_COMMIT_DELETE_CHECK) + out << "COMMIT_DELETE_CHECK "; + + if (opbits & Dbacc::Operationrec::OP_INSERT_IS_DONE) + out << "INSERT_IS_DONE "; + + if (opbits & Dbacc::Operationrec::OP_ELEMENT_DISAPPEARED) + out << "ELEMENT_DISAPPEARED "; + + if (opbits & Dbacc::Operationrec::OP_LOCK_OWNER) + { + out << "last_parallel: " << dec << ptr.p->m_lo_last_parallel_op_ptr_i << " "; + out << "last_serial: " << dec << ptr.p->m_lo_last_serial_op_ptr_i << " "; + } + + out << "]"; + return out; +} + +void +Dbacc::dump_lock_queue(OperationrecPtr loPtr) +{ + if ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0) + { + while ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0 && + loPtr.p->prevParallelQue != RNIL) + { + loPtr.i = loPtr.p->prevParallelQue; + ptrCheckGuard(loPtr, coprecsize, operationrec); + } + + while ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0 && + loPtr.p->prevSerialQue != RNIL) + { + loPtr.i = loPtr.p->prevSerialQue; + ptrCheckGuard(loPtr, coprecsize, operationrec); + } + + ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER); + } + + ndbout << "-- HEAD --" << endl; + OperationrecPtr tmp = loPtr; + while (tmp.i != RNIL) + { + ptrCheckGuard(tmp, coprecsize, operationrec); + ndbout << tmp << " "; + tmp.i = tmp.p->nextParallelQue; + + if (tmp.i == loPtr.i) + { + ndbout << " <LOOP>"; + break; + } + } + ndbout << endl; + + tmp.i = loPtr.p->nextSerialQue; + while (tmp.i != RNIL) + { + ptrCheckGuard(tmp, coprecsize, operationrec); + OperationrecPtr tmp2 = tmp; + + if (tmp.i == loPtr.i) + { + ndbout << "<LOOP S>" << endl; + break; + } + + while (tmp2.i != RNIL) + { + ptrCheckGuard(tmp2, coprecsize, operationrec); + ndbout << tmp2 << " "; + tmp2.i = tmp2.p->nextParallelQue; + + if (tmp2.i == tmp.i) + { + ndbout << "<LOOP 3>"; + break; + } + } + ndbout << endl; + tmp.i = tmp.p->nextSerialQue; + } +} +#endif + +/* ------------------------------------------------------------------------- + * PLACE_WRITE_IN_LOCK_QUEUE + * INPUT: OPERATION_REC_PTR OUR OPERATION POINTER + * QUE_OPER_PTR LOCK QUEUE OWNER OPERATION POINTER + * PWI_PAGEPTR PAGE POINTER OF ELEMENT + * TPWI_ELEMENTPTR ELEMENT POINTER OF ELEMENT + * OUTPUT TRESULT = + * ZPARALLEL_QUEUE OPERATION PLACED IN PARALLEL QUEUE + * OPERATION CAN PROCEED NOW. + * ZSERIAL_QUEUE OPERATION PLACED IN SERIAL QUEUE + * ERROR CODE OPERATION NEEDS ABORTING + * ------------------------------------------------------------------------- */ +Uint32 +Dbacc::placeWriteInLockQueue(OperationrecPtr lockOwnerPtr) +{ + OperationrecPtr lastOpPtr; + lastOpPtr.i = lockOwnerPtr.p->m_lo_last_parallel_op_ptr_i; + Uint32 opbits = operationRecPtr.p->m_op_bits; + + if (lastOpPtr.i == RNIL) + { + lastOpPtr = lockOwnerPtr; + } + else + { + ptrCheckGuard(lastOpPtr, coprecsize, operationrec); + } + + ndbassert(get_parallel_head(lastOpPtr) == lockOwnerPtr.i); + + Uint32 lastbits = lastOpPtr.p->m_op_bits; + if (lastbits & Operationrec::OP_ACC_LOCK_MODE) + { + if(operationRecPtr.p->is_same_trans(lastOpPtr.p)) + { + goto checkop; + } + } + else + { + /** + * We dont have an exclusive lock on operation and + * + */ jam(); - placeSerialQueueWrite(signal); - return ZSERIAL_QUEUE; - }//if + + /** + * Scan parallell queue to see if we are the only one + */ + OperationrecPtr loopPtr = lockOwnerPtr; + do + { + ptrCheckGuard(loopPtr, coprecsize, operationrec); + if (!loopPtr.p->is_same_trans(operationRecPtr.p)) + { + goto serial; + } + loopPtr.i = loopPtr.p->nextParallelQue; + } while (loopPtr.i != RNIL); + + goto checkop; + } +serial: + jam(); + placeSerialQueue(lockOwnerPtr, operationRecPtr); + + validate_lock_queue(lockOwnerPtr); + + return ZSERIAL_QUEUE; + +checkop: /* - WE ARE PERFORMING AN READ EXCLUSIVE, INSERT, UPDATE OR DELETE IN THE SAME - TRANSACTION WHERE WE PREVIOUSLY HAVE EXECUTED AN OPERATION. - Read-All, Update-All, Insert-All and Delete-Insert are allowed - combinations. - Delete-Read, Delete-Update and Delete-Delete are not an allowed - combination and will result in tuple not found error. + WE ARE PERFORMING AN READ EXCLUSIVE, INSERT, UPDATE OR DELETE IN THE SAME + TRANSACTION WHERE WE PREVIOUSLY HAVE EXECUTED AN OPERATION. + Read-All, Update-All, Insert-All and Delete-Insert are allowed + combinations. + Delete-Read, Delete-Update and Delete-Delete are not an allowed + combination and will result in tuple not found error. */ - mlpqOperPtr = queOperPtr; - moveLastParallelQueueWrite(signal); + Uint32 lstate = lastbits & Operationrec::OP_STATE_MASK; - if (operationRecPtr.p->operation == ZINSERT && - mlpqOperPtr.p->operation != ZDELETE){ + Uint32 retValue = ZSERIAL_QUEUE; // So that it gets blocked... + if (lstate == Operationrec::OP_STATE_EXECUTED) + { jam(); - return ZWRITE_ERROR; - }//if - if(operationRecPtr.p->operation == ZWRITE) - { - operationRecPtr.p->operation = - (mlpqOperPtr.p->operation == ZDELETE) ? ZINSERT : ZUPDATE; + /** + * Since last operation has executed...we can now check operation types + * if not, we have to wait until it has executed + */ + Uint32 op = opbits & Operationrec::OP_MASK; + Uint32 lop = lastbits & Operationrec::OP_MASK; + if (op == ZINSERT && lop != ZDELETE) + { + jam(); + return ZWRITE_ERROR; + }//if + + /** + * NOTE. No checking op operation types, as one can read different save + * points... + */ +#if 0 + if (lop == ZDELETE && (op != ZINSERT && op != ZWRITE)) + { + jam(); + return ZREAD_ERROR; + } +#else + if (lop == ZDELETE && (op == ZUPDATE && op == ZDELETE)) + { + jam(); + return ZREAD_ERROR; + } +#endif + + if(op == ZWRITE) + { + opbits &= ~(Uint32)Operationrec::OP_MASK; + opbits |= (lop == ZDELETE) ? ZINSERT : ZUPDATE; + } + + opbits |= Operationrec::OP_STATE_RUNNING; + operationRecPtr.p->localdata[0] = lastOpPtr.p->localdata[0]; + operationRecPtr.p->localdata[1] = lastOpPtr.p->localdata[1]; + retValue = ZPARALLEL_QUEUE; } - operationRecPtr.p->localdata[0] = queOperPtr.p->localdata[0]; - operationRecPtr.p->localdata[1] = queOperPtr.p->localdata[1]; - operationRecPtr.p->prevParallelQue = mlpqOperPtr.i; - mlpqOperPtr.p->nextParallelQue = operationRecPtr.i; - return ZPARALLEL_QUEUE; + opbits |= Operationrec::OP_RUN_QUEUE; + operationRecPtr.p->m_op_bits = opbits; + operationRecPtr.p->prevParallelQue = lastOpPtr.i; + operationRecPtr.p->m_lock_owner_ptr_i = lockOwnerPtr.i; + lastOpPtr.p->nextParallelQue = operationRecPtr.i; + lockOwnerPtr.p->m_lo_last_parallel_op_ptr_i = operationRecPtr.i; + + validate_lock_queue(lockOwnerPtr); + + return retValue; }//Dbacc::placeWriteInLockQueue() -/* --------------------------------------------------------------------------------- */ -/* WE HAVE TO PLACE IT SOMEWHERE IN THE SERIAL QUEUE INSTEAD. */ -/* --------------------------------------------------------------------------------- */ -void Dbacc::placeSerialQueueWrite(Signal* signal) +Uint32 +Dbacc::placeReadInLockQueue(OperationrecPtr lockOwnerPtr) { - readWriteOpPtr = queOperPtr; -PSQW_LOOP: - if (readWriteOpPtr.p->nextSerialQue == RNIL) { + OperationrecPtr lastOpPtr; + OperationrecPtr loopPtr = lockOwnerPtr; + lastOpPtr.i = lockOwnerPtr.p->m_lo_last_parallel_op_ptr_i; + Uint32 opbits = operationRecPtr.p->m_op_bits; + + if (lastOpPtr.i == RNIL) + { + lastOpPtr = lockOwnerPtr; + } + else + { + ptrCheckGuard(lastOpPtr, coprecsize, operationrec); + } + + ndbassert(get_parallel_head(lastOpPtr) == lockOwnerPtr.i); + + /** + * Last operation in parallell queue of lock owner is same trans + * and ACC_LOCK_MODE is exlusive, then we can proceed + */ + Uint32 lastbits = lastOpPtr.p->m_op_bits; + bool same = operationRecPtr.p->is_same_trans(lastOpPtr.p); + if (same && (lastbits & Operationrec::OP_ACC_LOCK_MODE)) + { jam(); - /* --------------------------------------------------------------------------------- */ - /* WE COULD NOT PUT IN ANY PARALLEL QUEUE. WE MUST PUT IT LAST IN THE SERIAL QUEUE. */ - /* --------------------------------------------------------------------------------- */ - readWriteOpPtr.p->nextSerialQue = operationRecPtr.i; - operationRecPtr.p->prevSerialQue = readWriteOpPtr.i; - return; - }//if - readWriteOpPtr.i = readWriteOpPtr.p->nextSerialQue; - ptrCheckGuard(readWriteOpPtr, coprecsize, operationrec); - if (getNoParallelTransaction(readWriteOpPtr.p) == 1) { - /* --------------------------------------------------------------------------------- */ - /* THERE WAS ONLY ONE TRANSACTION INVOLVED IN THE PARALLEL QUEUE. IF THIS IS OUR */ - /* TRANSACTION WE CAN STILL GET HOLD OF THE LOCK. */ - /* --------------------------------------------------------------------------------- */ - if ((readWriteOpPtr.p->transId1 == operationRecPtr.p->transId1) && - (readWriteOpPtr.p->transId2 == operationRecPtr.p->transId2)) { + goto checkop; + } + + if ((lastbits & Operationrec::OP_ACC_LOCK_MODE) && !same) + { + jam(); + /** + * Last op in serial queue had X-lock and was not our transaction... + */ + goto serial; + } + + if (lockOwnerPtr.p->nextSerialQue == RNIL) + { + jam(); + goto checkop; + } + + /** + * Scan parallell queue to see if we are already there... + */ + do + { + ptrCheckGuard(loopPtr, coprecsize, operationrec); + if (loopPtr.p->is_same_trans(operationRecPtr.p)) + goto checkop; + loopPtr.i = loopPtr.p->nextParallelQue; + } while (loopPtr.i != RNIL); + +serial: + placeSerialQueue(lockOwnerPtr, operationRecPtr); + + validate_lock_queue(lockOwnerPtr); + + return ZSERIAL_QUEUE; + +checkop: + Uint32 lstate = lastbits & Operationrec::OP_STATE_MASK; + + Uint32 retValue = ZSERIAL_QUEUE; // So that it gets blocked... + if (lstate == Operationrec::OP_STATE_EXECUTED) + { + jam(); + + /** + * NOTE. No checking op operation types, as one can read different save + * points... + */ + +#if 0 + /** + * Since last operation has executed...we can now check operation types + * if not, we have to wait until it has executed + */ + if (lop == ZDELETE) + { jam(); - /* --------------------------------------------------------------------------------- */ - /* WE ARE PERFORMING AN UPDATE OR DELETE IN THE SAME TRANSACTION WHERE WE ALREADY */ - /* PREVIOUSLY HAVE EXECUTED AN OPERATION. INSERT-DELETE, READ-UPDATE, READ-READ, */ - /* UPDATE-UPDATE, UPDATE-DELETE, READ-DELETE, INSERT-READ, INSERT-UPDATE ARE ALLOWED */ - /* COMBINATIONS. A NEW INSERT AFTER A DELETE IS NOT ALLOWED AND SUCH AN INSERT WILL */ - /* GO TO THE SERIAL LOCK QUEUE WHICH IT WILL NOT LEAVE UNTIL A TIME-OUT AND THE */ - /* TRANSACTION IS ABORTED. READS AND UPDATES AFTER DELETES IS ALSO NOT ALLOWED. */ - /* --------------------------------------------------------------------------------- */ - mlpqOperPtr = readWriteOpPtr; - moveLastParallelQueueWrite(signal); - readWriteOpPtr = mlpqOperPtr; - operationRecPtr.p->prevParallelQue = readWriteOpPtr.i; - readWriteOpPtr.p->nextParallelQue = operationRecPtr.i; - operationRecPtr.p->localdata[0] = readWriteOpPtr.p->localdata[0]; - operationRecPtr.p->localdata[1] = readWriteOpPtr.p->localdata[1]; - return; - }//if - }//if - goto PSQW_LOOP; -}//Dbacc::placeSerialQueueWrite() + return ZREAD_ERROR; + } +#endif + + opbits |= Operationrec::OP_STATE_RUNNING; + operationRecPtr.p->localdata[0] = lastOpPtr.p->localdata[0]; + operationRecPtr.p->localdata[1] = lastOpPtr.p->localdata[1]; + retValue = ZPARALLEL_QUEUE; + } + opbits |= (lastbits & Operationrec::OP_ACC_LOCK_MODE); + opbits |= Operationrec::OP_RUN_QUEUE; + operationRecPtr.p->m_op_bits = opbits; + + operationRecPtr.p->prevParallelQue = lastOpPtr.i; + operationRecPtr.p->m_lock_owner_ptr_i = lockOwnerPtr.i; + lastOpPtr.p->nextParallelQue = operationRecPtr.i; + lockOwnerPtr.p->m_lo_last_parallel_op_ptr_i = operationRecPtr.i; + + validate_lock_queue(lockOwnerPtr); + + return retValue; +}//Dbacc::placeReadInLockQueue + +void Dbacc::placeSerialQueue(OperationrecPtr lockOwnerPtr, + OperationrecPtr opPtr) +{ + OperationrecPtr lastOpPtr; + lastOpPtr.i = lockOwnerPtr.p->m_lo_last_serial_op_ptr_i; + + if (lastOpPtr.i == RNIL) + { + // Lock owner is last... + ndbrequire(lockOwnerPtr.p->nextSerialQue == RNIL); + lastOpPtr = lockOwnerPtr; + } + else + { + ptrCheckGuard(lastOpPtr, coprecsize, operationrec); + } + + operationRecPtr.p->prevSerialQue = lastOpPtr.i; + lastOpPtr.p->nextSerialQue = opPtr.i; + lockOwnerPtr.p->m_lo_last_serial_op_ptr_i = opPtr.i; +} /* ------------------------------------------------------------------------- */ /* ACC KEYREQ END */ /* ------------------------------------------------------------------------- */ void Dbacc::acckeyref1Lab(Signal* signal, Uint32 result_code) { - operationRecPtr.p->transactionstate = WAIT_COMMIT_ABORT; + operationRecPtr.p->m_op_bits = ~0; /* ************************<< */ /* ACCKEYREF */ /* ************************<< */ @@ -1639,15 +2241,15 @@ void Dbacc::acckeyref1Lab(Signal* signal, Uint32 result_code) return; }//Dbacc::acckeyref1Lab() -/* ******************--------------------------------------------------------------- */ -/* ACCMINUPDATE UPDATE LOCAL KEY REQ */ -/* DESCRIPTION: UPDATES LOCAL KEY OF AN ELEMENTS IN THE HASH TABLE */ -/* THIS SIGNAL IS WAITED AFTER ANY INSERT REQ */ -/* ENTER ACCMINUPDATE WITH SENDER: LQH, LEVEL B */ -/* OPERATION_REC_PTR, OPERATION RECORD PTR */ -/* CLOCALKEY(0), LOCAL KEY 1 */ -/* CLOCALKEY(1) LOCAL KEY 2 */ -/* ******************--------------------------------------------------------------- */ +/* ******************----------------------------------------------------- */ +/* ACCMINUPDATE UPDATE LOCAL KEY REQ */ +/* DESCRIPTION: UPDATES LOCAL KEY OF AN ELEMENTS IN THE HASH TABLE */ +/* THIS SIGNAL IS WAITED AFTER ANY INSERT REQ */ +/* ENTER ACCMINUPDATE WITH SENDER: LQH, LEVEL B */ +/* OPERATION_REC_PTR, OPERATION RECORD PTR */ +/* CLOCALKEY(0), LOCAL KEY 1 */ +/* CLOCALKEY(1) LOCAL KEY 2 */ +/* ******************----------------------------------------------------- */ void Dbacc::execACCMINUPDATE(Signal* signal) { Page8Ptr ulkPageidptr; @@ -1660,19 +2262,26 @@ void Dbacc::execACCMINUPDATE(Signal* signal) tlocalkey1 = signal->theData[1]; tlocalkey2 = signal->theData[2]; ptrCheckGuard(operationRecPtr, coprecsize, operationrec); - if (operationRecPtr.p->transactionstate == ACTIVE) { - fragrecptr.i = operationRecPtr.p->fragptr; - ulkPageidptr.i = operationRecPtr.p->elementPage; - tulkLocalPtr = operationRecPtr.p->elementPointer + operationRecPtr.p->elementIsforward; + Uint32 opbits = operationRecPtr.p->m_op_bits; + fragrecptr.i = operationRecPtr.p->fragptr; + ulkPageidptr.i = operationRecPtr.p->elementPage; + tulkLocalPtr = operationRecPtr.p->elementPointer + + operationRecPtr.p->elementIsforward; + + if ((opbits & Operationrec::OP_STATE_MASK) == Operationrec::OP_STATE_RUNNING) + { ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); ptrCheckGuard(ulkPageidptr, cpagesize, page8); dbgWord32(ulkPageidptr, tulkLocalPtr, tlocalkey1); arrGuard(tulkLocalPtr, 2048); ulkPageidptr.p->word32[tulkLocalPtr] = tlocalkey1; operationRecPtr.p->localdata[0] = tlocalkey1; - if (fragrecptr.p->localkeylen == 1) { + if (likely(fragrecptr.p->localkeylen == 1)) + { return; - } else if (fragrecptr.p->localkeylen == 2) { + } + else if (fragrecptr.p->localkeylen == 2) + { jam(); tulkLocalPtr = tulkLocalPtr + operationRecPtr.p->elementIsforward; operationRecPtr.p->localdata[1] = tlocalkey2; @@ -1696,15 +2305,17 @@ void Dbacc::execACC_COMMITREQ(Signal* signal) { Uint8 Toperation; jamEntry(); - operationRecPtr.i = signal->theData[0]; + Uint32 tmp = operationRecPtr.i = signal->theData[0]; ptrCheckGuard(operationRecPtr, coprecsize, operationrec); - ndbrequire(operationRecPtr.p->transactionstate == ACTIVE); + void* ptr = operationRecPtr.p; + Uint32 opbits = operationRecPtr.p->m_op_bits; fragrecptr.i = operationRecPtr.p->fragptr; ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); + Toperation = opbits & Operationrec::OP_MASK; commitOperation(signal); - Toperation = operationRecPtr.p->operation; - operationRecPtr.p->transactionstate = IDLE; - operationRecPtr.p->operation = ZUNDEFINED_OP; + ndbassert(operationRecPtr.i == tmp); + ndbassert(operationRecPtr.p == ptr); + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; if(Toperation != ZREAD){ fragrecptr.p->m_commit_count++; if (Toperation != ZINSERT) { @@ -1713,7 +2324,7 @@ void Dbacc::execACC_COMMITREQ(Signal* signal) } else { jam(); fragrecptr.p->noOfElements--; - fragrecptr.p->slack += operationRecPtr.p->insertDeleteLen; + fragrecptr.p->slack += fragrecptr.p->elementLength; if (fragrecptr.p->slack > fragrecptr.p->slackCheck) { /* TIME FOR JOIN BUCKETS PROCESS */ if (fragrecptr.p->expandCounter > 0) { @@ -1732,7 +2343,7 @@ void Dbacc::execACC_COMMITREQ(Signal* signal) } else { jam(); /* EXPAND PROCESS HANDLING */ fragrecptr.p->noOfElements++; - fragrecptr.p->slack -= operationRecPtr.p->insertDeleteLen; + fragrecptr.p->slack -= fragrecptr.p->elementLength; if (fragrecptr.p->slack >= (1u << 31)) { /* IT MEANS THAT IF SLACK < ZERO */ if (fragrecptr.p->expandFlag == 0) { @@ -1749,45 +2360,57 @@ void Dbacc::execACC_COMMITREQ(Signal* signal) return; }//Dbacc::execACC_COMMITREQ() -/* ******************--------------------------------------------------------------- */ -/* ACC ABORT REQ ABORT ALL OPERATION OF THE TRANSACTION */ -/* ******************------------------------------+ */ -/* SENDER: LQH, LEVEL B */ -/* ******************--------------------------------------------------------------- */ -/* ACC ABORT REQ ABORT TRANSACTION */ -/* ******************------------------------------+ */ +/* ******************------------------------------------------------------- */ +/* ACC ABORT REQ ABORT ALL OPERATION OF THE TRANSACTION */ +/* ******************------------------------------+ */ +/* SENDER: LQH, LEVEL B */ +/* ******************------------------------------------------------------- */ +/* ACC ABORT REQ ABORT TRANSACTION */ +/* ******************------------------------------+ */ /* SENDER: LQH, LEVEL B */ void Dbacc::execACC_ABORTREQ(Signal* signal) { jamEntry(); - accAbortReqLab(signal); -}//Dbacc::execACC_ABORTREQ() - -void Dbacc::accAbortReqLab(Signal* signal) -{ operationRecPtr.i = signal->theData[0]; - bool sendConf = signal->theData[1]; + Uint32 sendConf = signal->theData[1]; ptrCheckGuard(operationRecPtr, coprecsize, operationrec); + fragrecptr.i = operationRecPtr.p->fragptr; + Uint32 opbits = operationRecPtr.p->m_op_bits; + Uint32 opstate = opbits & Operationrec::OP_STATE_MASK; tresult = 0; /* ZFALSE */ - if ((operationRecPtr.p->transactionstate == ACTIVE) || - (operationRecPtr.p->transactionstate == WAIT_COMMIT_ABORT)) { + + if (opbits == Operationrec::OP_EXECUTED_DIRTY_READ) + { + jam(); + } + else if (opstate == Operationrec::OP_STATE_EXECUTED || + opstate == Operationrec::OP_STATE_WAITING || + opstate == Operationrec::OP_STATE_RUNNING) + { jam(); - fragrecptr.i = operationRecPtr.p->fragptr; ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); - operationRecPtr.p->transactionstate = ABORT; abortOperation(signal); - } else { - ndbrequire(operationRecPtr.p->transactionstate == IDLE); - jam(); - }//if - operationRecPtr.p->transactionstate = IDLE; - operationRecPtr.p->operation = ZUNDEFINED_OP; - if (! sendConf) - return; + } + + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; + signal->theData[0] = operationRecPtr.p->userptr; - sendSignal(operationRecPtr.p->userblockref, GSN_ACC_ABORTCONF, signal, 1, JBB); - return; -}//Dbacc::accAbortReqLab() + signal->theData[1] = 0; + switch(sendConf){ + case 0: + return; + case 2: + if (opstate != Operationrec::OP_STATE_RUNNING) + { + return; + } + case 1: + sendSignal(operationRecPtr.p->userblockref, GSN_ACC_ABORTCONF, + signal, 1, JBB); + } + + signal->theData[1] = RNIL; +} /* * Lock or unlock tuple. @@ -1828,8 +2451,7 @@ void Dbacc::execACC_LOCKREQ(Signal* signal) // init as in ACCSEIZEREQ operationRecPtr.p->userptr = req->userPtr; operationRecPtr.p->userblockref = req->userRef; - operationRecPtr.p->operation = ZUNDEFINED_OP; - operationRecPtr.p->transactionstate = IDLE; + operationRecPtr.p->m_op_bits = ~0; operationRecPtr.p->scanRecPtr = RNIL; // do read with lock via ACCKEYREQ Uint32 lockMode = (lockOp == AccLockReq::LockShared) ? 0 : 1; @@ -1880,8 +2502,8 @@ void Dbacc::execACC_LOCKREQ(Signal* signal) jam(); // do abort via ACC_ABORTREQ (immediate) signal->theData[0] = req->accOpPtr; - signal->theData[1] = false; // Dont send abort - accAbortReqLab(signal); + signal->theData[1] = 0; // Dont send abort + execACC_ABORTREQ(signal); releaseOpRec(signal); req->returnCode = AccLockReq::Success; *sig = *req; @@ -1891,8 +2513,8 @@ void Dbacc::execACC_LOCKREQ(Signal* signal) jam(); // do abort via ACC_ABORTREQ (with conf signal) signal->theData[0] = req->accOpPtr; - signal->theData[1] = true; // send abort - accAbortReqLab(signal); + signal->theData[1] = 1; // send abort + execACC_ABORTREQ(signal); releaseOpRec(signal); req->returnCode = AccLockReq::Success; *sig = *req; @@ -2551,16 +3173,29 @@ void Dbacc::getdirindex(Signal* signal) }//Dbacc::getdirindex() Uint32 -Dbacc::readTablePk(Uint32 localkey1) +Dbacc::readTablePk(Uint32 localkey1, Uint32 eh, const Operationrec* op) { + int ret; Uint32 tableId = fragrecptr.p->myTableId; Uint32 fragId = fragrecptr.p->myfid; - Uint32 fragPageId = localkey1 >> MAX_TUPLES_BITS; - Uint32 pageIndex = localkey1 & ((1 << MAX_TUPLES_BITS ) - 1); + bool xfrm = fragrecptr.p->hasCharAttr; + #ifdef VM_TRACE memset(ckeys, 0x1f, (fragrecptr.p->keyLength * MAX_XFRM_MULTIPLY) << 2); #endif - int ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, ckeys, true); + + if (likely(localkey1 != ~(Uint32)0)) + { + Uint32 fragPageId = localkey1 >> MAX_TUPLES_BITS; + Uint32 pageIndex = localkey1 & ((1 << MAX_TUPLES_BITS ) - 1); + ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, + ckeys, true); + } + else + { + ndbrequire(ElementHeader::getLocked(eh)); + ret = c_lqh->readPrimaryKeys(op->userptr, ckeys, xfrm); + } jamEntry(); ndbrequire(ret >= 0); return ret; @@ -2594,11 +3229,12 @@ void ndb_acc_ia64_icc810_dummy_func() #endif #endif -void Dbacc::getElement(Signal* signal) +Uint32 +Dbacc::getElement(Signal* signal, OperationrecPtr& lockOwnerPtr) { + Uint32 errcode; DirRangePtr geOverflowrangeptr; DirectoryarrayPtr geOverflowDirptr; - OperationrecPtr geTmpOperationRecPtr; Uint32 tgeElementHeader; Uint32 tgeElemStep; Uint32 tgeContainerhead; @@ -2613,7 +3249,6 @@ void Dbacc::getElement(Signal* signal) getdirindex(signal); tgePageindex = tgdiPageindex; gePageptr = gdiPageptr; - tgeResult = ZFALSE; /* * The value seached is * - table key for ACCKEYREQ, stored in TUP @@ -2623,7 +3258,6 @@ void Dbacc::getElement(Signal* signal) ndbrequire(TelemLen == ZELEM_HEAD_SIZE + fragrecptr.p->localkeylen); tgeNextptrtype = ZLEFT; - tgeLocked = 0; const Uint32 tmp = fragrecptr.p->k + fragrecptr.p->lhfragbits; const Uint32 opHashValuePart = (operationRecPtr.p->hashValue >> tmp) &0xFFFF; @@ -2636,9 +3270,17 @@ void Dbacc::getElement(Signal* signal) tgeKeyptr = (tgeElementptr + ZELEM_HEAD_SIZE) + fragrecptr.p->localkeylen; tgeElemStep = TelemLen; tgeForward = 1; - if (tgeContainerptr >= 2048) { ACCKEY_error(4); return;} + if (unlikely(tgeContainerptr >= 2048)) + { + errcode = 4; + goto error; + } tgeRemLen = gePageptr.p->word32[tgeContainerptr] >> 26; - if ((tgeContainerptr + tgeRemLen - 1) >= 2048) { ACCKEY_error(5); return;} + if (unlikely(((tgeContainerptr + tgeRemLen - 1) >= 2048))) + { + errcode = 5; + goto error; + } } else if (tgeNextptrtype == ZRIGHT) { jam(); tgeContainerptr = tgeContainerptr + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE); @@ -2646,29 +3288,42 @@ void Dbacc::getElement(Signal* signal) tgeKeyptr = (tgeElementptr - ZELEM_HEAD_SIZE) - fragrecptr.p->localkeylen; tgeElemStep = 0 - TelemLen; tgeForward = (Uint32)-1; - if (tgeContainerptr >= 2048) { ACCKEY_error(4); return;} + if (unlikely(tgeContainerptr >= 2048)) + { + errcode = 4; + goto error; + } tgeRemLen = gePageptr.p->word32[tgeContainerptr] >> 26; - if ((tgeContainerptr - tgeRemLen) >= 2048) { ACCKEY_error(5); return;} + if (unlikely((tgeContainerptr - tgeRemLen) >= 2048)) + { + errcode = 5; + goto error; + } } else { - ACCKEY_error(6); return; + errcode = 6; + goto error; }//if if (tgeRemLen >= ZCON_HEAD_SIZE + TelemLen) { - if (tgeRemLen > ZBUF_SIZE) { - ACCKEY_error(7); return; + if (unlikely(tgeRemLen > ZBUF_SIZE)) + { + errcode = 7; + goto error; }//if - /* --------------------------------------------------------------------------------- */ - // There is at least one element in this container. Check if it is the element - // searched for. - /* --------------------------------------------------------------------------------- */ + /* ------------------------------------------------------------------- */ + // There is at least one element in this container. + // Check if it is the element searched for. + /* ------------------------------------------------------------------- */ do { tgeElementHeader = gePageptr.p->word32[tgeElementptr]; tgeRemLen = tgeRemLen - TelemLen; Uint32 hashValuePart; + lockOwnerPtr.i = RNIL; + lockOwnerPtr.p = NULL; if (ElementHeader::getLocked(tgeElementHeader)) { jam(); - geTmpOperationRecPtr.i = ElementHeader::getOpPtrI(tgeElementHeader); - ptrCheckGuard(geTmpOperationRecPtr, coprecsize, operationrec); - hashValuePart = geTmpOperationRecPtr.p->hashvaluePart; + lockOwnerPtr.i = ElementHeader::getOpPtrI(tgeElementHeader); + ptrCheckGuard(lockOwnerPtr, coprecsize, operationrec); + hashValuePart = lockOwnerPtr.p->hashvaluePart; } else { jam(); hashValuePart = ElementHeader::getHashValuePart(tgeElementHeader); @@ -2678,21 +3333,21 @@ void Dbacc::getElement(Signal* signal) Uint32 localkey1 = gePageptr.p->word32[tgeElementptr + tgeForward]; Uint32 localkey2 = 0; bool found; - if (! searchLocalKey) { - Uint32 len = readTablePk(localkey1); + if (! searchLocalKey) + { + Uint32 len = readTablePk(localkey1, tgeElementptr, lockOwnerPtr.p); found = (len == operationRecPtr.p->xfrmtupkeylen) && (memcmp(Tkeydata, ckeys, len << 2) == 0); } else { jam(); found = (localkey1 == Tkeydata[0]); } - if (found) { + if (found) + { jam(); - tgeLocked = ElementHeader::getLocked(tgeElementHeader); - tgeResult = ZTRUE; operationRecPtr.p->localdata[0] = localkey1; operationRecPtr.p->localdata[1] = localkey2; - return; + return ZTRUE; } } if (tgeRemLen <= ZCON_HEAD_SIZE) { @@ -2701,18 +3356,22 @@ void Dbacc::getElement(Signal* signal) tgeElementptr = tgeElementptr + tgeElemStep; } while (true); }//if - if (tgeRemLen != ZCON_HEAD_SIZE) { - ACCKEY_error(8); return; + if (unlikely(tgeRemLen != ZCON_HEAD_SIZE)) + { + errcode = 8; + goto error; }//if tgeContainerhead = gePageptr.p->word32[tgeContainerptr]; tgeNextptrtype = (tgeContainerhead >> 7) & 0x3; if (tgeNextptrtype == 0) { jam(); - return; /* NO MORE CONTAINER */ + return ZFALSE; /* NO MORE CONTAINER */ }//if tgePageindex = tgeContainerhead & 0x7f; /* NEXT CONTAINER PAGE INDEX 7 BITS */ - if (tgePageindex > ZEMPTYLIST) { - ACCKEY_error(9); return; + if (unlikely(tgePageindex > ZEMPTYLIST)) + { + errcode = 9; + goto error; }//if if (((tgeContainerhead >> 9) & 1) == ZFALSE) { jam(); @@ -2726,55 +3385,72 @@ void Dbacc::getElement(Signal* signal) ptrCheckGuard(gePageptr, cpagesize, page8); }//if } while (1); - return; + + return ZFALSE; + +error: + ACCKEY_error(errcode); + return ~0; }//Dbacc::getElement() -/* --------------------------------------------------------------------------------- */ -/* --------------------------------------------------------------------------------- */ -/* --------------------------------------------------------------------------------- */ -/* */ -/* END OF GET_ELEMENT MODULE */ -/* */ -/* --------------------------------------------------------------------------------- */ -/* --------------------------------------------------------------------------------- */ -/* --------------------------------------------------------------------------------- */ -/* --------------------------------------------------------------------------------- */ -/* --------------------------------------------------------------------------------- */ -/* */ -/* MODULE: DELETE */ -/* */ -/* --------------------------------------------------------------------------------- */ -/* --------------------------------------------------------------------------------- */ -/* --------------------------------------------------------------------------------- */ -/* COMMITDELETE */ -/* INPUT: OPERATION_REC_PTR, PTR TO AN OPERATION RECORD. */ -/* FRAGRECPTR, PTR TO A FRAGMENT RECORD */ -/* */ -/* OUTPUT: */ -/* NONE */ -/* DESCRIPTION: DELETE OPERATIONS WILL BE COMPLETED AT THE COMMIT OF TRANSA- */ -/* CTION. THIS SUBROUTINE SEARCHS FOR ELEMENT AND DELETES IT. IT DOES SO BY */ -/* REPLACING IT WITH THE LAST ELEMENT IN THE BUCKET. IF THE DELETED ELEMENT */ -/* IS ALSO THE LAST ELEMENT THEN IT IS ONLY NECESSARY TO REMOVE THE ELEMENT. */ -/* --------------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* */ +/* END OF GET_ELEMENT MODULE */ +/* */ +/* ------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* */ +/* MODULE: DELETE */ +/* */ +/* ------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* COMMITDELETE */ +/* INPUT: OPERATION_REC_PTR, PTR TO AN OPERATION RECORD. */ +/* FRAGRECPTR, PTR TO A FRAGMENT RECORD */ +/* */ +/* OUTPUT: */ +/* NONE */ +/* DESCRIPTION: DELETE OPERATIONS WILL BE COMPLETED AT THE + * COMMIT OF TRANSACTION. THIS SUBROUTINE SEARCHS FOR ELEMENT AND + * DELETES IT. IT DOES SO BY REPLACING IT WITH THE LAST + * ELEMENT IN THE BUCKET. IF THE DELETED ELEMENT IS ALSO THE LAST + * ELEMENT THEN IT IS ONLY NECESSARY TO REMOVE THE ELEMENT + * ------------------------------------------------------------------------- */ +void +Dbacc::report_dealloc(Signal* signal, const Operationrec* opPtrP) +{ + Uint32 localKey = opPtrP->localdata[0]; + Uint32 opbits = opPtrP->m_op_bits; + Uint32 userptr= opPtrP->userptr; + Uint32 scanInd = + ((opbits & Operationrec::OP_MASK) == ZSCAN_OP) || + (opbits & Operationrec::OP_LOCK_REQ); + + if (localKey != ~(Uint32)0) + { + signal->theData[0] = fragrecptr.p->myfid; + signal->theData[1] = fragrecptr.p->myTableId; + Uint32 pageId = localKey >> MAX_TUPLES_BITS; + Uint32 pageIndex = localKey & ((1 << MAX_TUPLES_BITS) - 1); + signal->theData[2] = pageId; + signal->theData[3] = pageIndex; + signal->theData[4] = userptr; + signal->theData[5] = scanInd; + EXECUTE_DIRECT(DBLQH, GSN_TUP_DEALLOCREQ, signal, 6); + jamEntry(); + } +} + void Dbacc::commitdelete(Signal* signal) { jam(); - Uint32 localKey = operationRecPtr.p->localdata[0]; - Uint32 userptr= operationRecPtr.p->userptr; - Uint32 scanInd = operationRecPtr.p->operation == ZSCAN_OP - || operationRecPtr.p->isAccLockReq; - - signal->theData[0] = fragrecptr.p->myfid; - signal->theData[1] = fragrecptr.p->myTableId; - Uint32 pageId = localKey >> MAX_TUPLES_BITS; - Uint32 pageIndex = localKey & ((1 << MAX_TUPLES_BITS) - 1); - signal->theData[2] = pageId; - signal->theData[3] = pageIndex; - signal->theData[4] = userptr; - signal->theData[5] = scanInd; - EXECUTE_DIRECT(DBLQH, GSN_TUP_DEALLOCREQ, signal, 6); - jamEntry(); + report_dealloc(signal, operationRecPtr.p); getdirindex(signal); tlastPageindex = tgdiPageindex; @@ -3263,31 +3939,316 @@ void Dbacc::checkoverfreelist(Signal* signal) /*BE CHECKED. THE OPERATION RECORD WILL BE REMOVED FROM THE QUEUE IF IT */ /*BELONGED TO ANY ONE, OTHERWISE THE ELEMENT HEAD WILL BE UPDATED. */ /* ------------------------------------------------------------------------- */ + +/** + * + * P0 - P1 - P2 - P3 + * S0 + * S1 + * S2 + */ +void +Dbacc::abortParallelQueueOperation(Signal* signal, OperationrecPtr opPtr) +{ + OperationrecPtr nextP; + OperationrecPtr prevP; + OperationrecPtr loPtr; + + Uint32 opbits = opPtr.p->m_op_bits; + Uint32 opstate = opbits & Operationrec::OP_STATE_MASK; + nextP.i = opPtr.p->nextParallelQue; + prevP.i = opPtr.p->prevParallelQue; + loPtr.i = opPtr.p->m_lock_owner_ptr_i; + + ndbassert(! (opbits & Operationrec::OP_LOCK_OWNER)); + ndbassert(opbits & Operationrec::OP_RUN_QUEUE); + + ptrCheckGuard(prevP, coprecsize, operationrec); + ndbassert(prevP.p->nextParallelQue == opPtr.i); + prevP.p->nextParallelQue = nextP.i; + + if (nextP.i != RNIL) + { + ptrCheckGuard(nextP, coprecsize, operationrec); + ndbassert(nextP.p->prevParallelQue == opPtr.i); + nextP.p->prevParallelQue = prevP.i; + } + else if (prevP.i != loPtr.i) + { + jam(); + ptrCheckGuard(loPtr, coprecsize, operationrec); + ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER); + ndbassert(loPtr.p->m_lo_last_parallel_op_ptr_i == opPtr.i); + loPtr.p->m_lo_last_parallel_op_ptr_i = prevP.i; + prevP.p->m_lock_owner_ptr_i = loPtr.i; + + /** + * Abort P3...check start next + */ + startNext(signal, prevP); + validate_lock_queue(prevP); + return; + } + else + { + jam(); + ndbassert(prevP.p->m_op_bits & Operationrec::OP_LOCK_OWNER); + prevP.p->m_lo_last_parallel_op_ptr_i = RNIL; + startNext(signal, prevP); + validate_lock_queue(prevP); + return; + } + + if (opbits & Operationrec::OP_LOCK_MODE) + { + Uint32 nextbits = nextP.p->m_op_bits; + while ((nextbits & Operationrec::OP_LOCK_MODE) == 0) + { + ndbassert(nextbits & Operationrec::OP_ACC_LOCK_MODE); + nextbits &= ~(Uint32)Operationrec::OP_ACC_LOCK_MODE; + nextP.p->m_op_bits = nextbits; + + if (nextP.p->nextParallelQue != RNIL) + { + nextP.i = nextP.p->nextParallelQue; + ptrCheckGuard(nextP, coprecsize, operationrec); + nextbits = nextP.p->m_op_bits; + } + else + { + break; + } + } + } + + /** + * Abort P1, P2 + */ + + /** + * Scan to last of run queue + */ + while (nextP.p->nextParallelQue != RNIL) + { + nextP.i = nextP.p->nextParallelQue; + ptrCheckGuard(nextP, coprecsize, operationrec); + } + +#ifdef VM_TRACE + loPtr.i = nextP.p->m_lock_owner_ptr_i; + ptrCheckGuard(loPtr, coprecsize, operationrec); + ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER); + ndbassert(loPtr.p->m_lo_last_parallel_op_ptr_i == nextP.i); +#endif + startNext(signal, nextP); + validate_lock_queue(nextP); + + return; +} + +void +Dbacc::abortSerieQueueOperation(Signal* signal, OperationrecPtr opPtr) +{ + OperationrecPtr prevS, nextS; + OperationrecPtr prevP, nextP; + OperationrecPtr loPtr; + + Uint32 opbits = opPtr.p->m_op_bits; + + prevS.i = opPtr.p->prevSerialQue; + nextS.i = opPtr.p->nextSerialQue; + + prevP.i = opPtr.p->prevParallelQue; + nextP.i = opPtr.p->nextParallelQue; + + ndbassert((opbits & Operationrec::OP_LOCK_OWNER) == 0); + ndbassert((opbits & Operationrec::OP_RUN_QUEUE) == 0); + + if (prevP.i != RNIL) + { + /** + * We're not list head... + */ + ptrCheckGuard(prevP, coprecsize, operationrec); + ndbassert(prevP.p->nextParallelQue == opPtr.i); + prevP.p->nextParallelQue = nextP.i; + + if (nextP.i != RNIL) + { + ptrCheckGuard(nextP, coprecsize, operationrec); + ndbassert(nextP.p->prevParallelQue == opPtr.i); + ndbassert((nextP.p->m_op_bits & Operationrec::OP_STATE_MASK) == + Operationrec::OP_STATE_WAITING); + nextP.p->prevParallelQue = prevP.i; + + if ((prevP.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE) == 0 && + opbits & Operationrec::OP_LOCK_MODE) + { + /** + * Scan right in parallel queue to fix OP_ACC_LOCK_MODE + */ + while ((nextP.p->m_op_bits & Operationrec::OP_LOCK_MODE) == 0) + { + ndbassert(nextP.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE); + nextP.p->m_op_bits &= ~(Uint32)Operationrec::OP_ACC_LOCK_MODE; + nextP.i = nextP.p->nextParallelQue; + if (nextP.i == RNIL) + break; + ptrCheckGuard(nextP, coprecsize, operationrec); + } + } + } + validate_lock_queue(prevP); + return; + } + else + { + /** + * We're a list head + */ + ptrCheckGuard(prevS, coprecsize, operationrec); + ndbassert(prevS.p->nextSerialQue == opPtr.i); + + if (nextP.i != RNIL) + { + /** + * Promote nextP to list head + */ + ptrCheckGuard(nextP, coprecsize, operationrec); + ndbassert(nextP.p->prevParallelQue == opPtr.i); + prevS.p->nextSerialQue = nextP.i; + nextP.p->prevParallelQue = RNIL; + nextP.p->nextSerialQue = nextS.i; + if (nextS.i != RNIL) + { + jam(); + ptrCheckGuard(nextS, coprecsize, operationrec); + ndbassert(nextS.p->prevSerialQue == opPtr.i); + nextS.p->prevSerialQue = nextP.i; + validate_lock_queue(prevS); + return; + } + else + { + // nextS is RNIL, i.e we're last in serie queue... + // we must update lockOwner.m_lo_last_serial_op_ptr_i + loPtr = prevS; + while ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0) + { + loPtr.i = loPtr.p->prevSerialQue; + ptrCheckGuard(loPtr, coprecsize, operationrec); + } + ndbassert(loPtr.p->m_lo_last_serial_op_ptr_i == opPtr.i); + loPtr.p->m_lo_last_serial_op_ptr_i = nextP.i; + validate_lock_queue(loPtr); + return; + } + } + + if (nextS.i == RNIL) + { + /** + * Abort S2 + */ + + // nextS is RNIL, i.e we're last in serie queue... + // and we have no parallel queue, + // we must update lockOwner.m_lo_last_serial_op_ptr_i + prevS.p->nextSerialQue = RNIL; + + loPtr = prevS; + while ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0) + { + loPtr.i = loPtr.p->prevSerialQue; + ptrCheckGuard(loPtr, coprecsize, operationrec); + } + ndbassert(loPtr.p->m_lo_last_serial_op_ptr_i == opPtr.i); + if (prevS.i != loPtr.i) + { + jam(); + loPtr.p->m_lo_last_serial_op_ptr_i = prevS.i; + } + else + { + loPtr.p->m_lo_last_serial_op_ptr_i = RNIL; + } + validate_lock_queue(loPtr); + } + else if (nextP.i == RNIL) + { + ptrCheckGuard(nextS, coprecsize, operationrec); + ndbassert(nextS.p->prevSerialQue == opPtr.i); + prevS.p->nextSerialQue = nextS.i; + nextS.p->prevSerialQue = prevS.i; + + if (prevS.p->m_op_bits & Operationrec::OP_LOCK_OWNER) + { + /** + * Abort S0 + */ + OperationrecPtr lastOp; + lastOp.i = prevS.p->m_lo_last_parallel_op_ptr_i; + if (lastOp.i != RNIL) + { + jam(); + ptrCheckGuard(lastOp, coprecsize, operationrec); + ndbassert(lastOp.p->m_lock_owner_ptr_i = prevS.i); + } + else + { + jam(); + lastOp = prevS; + } + startNext(signal, lastOp); + validate_lock_queue(lastOp); + } + else + { + validate_lock_queue(prevS); + } + } + } +} + + void Dbacc::abortOperation(Signal* signal) { - OperationrecPtr aboOperRecPtr; - OperationrecPtr TaboOperRecPtr; - Page8Ptr aboPageidptr; - Uint32 taboElementptr; - Uint32 tmp2Olq; + Uint32 opbits = operationRecPtr.p->m_op_bits; + + validate_lock_queue(operationRecPtr); - if (operationRecPtr.p->lockOwner == ZTRUE) { + if (opbits & Operationrec::OP_LOCK_OWNER) + { takeOutLockOwnersList(signal, operationRecPtr); - if (operationRecPtr.p->insertIsDone == ZTRUE) { + opbits &= ~(Uint32)Operationrec::OP_LOCK_OWNER; + if (opbits & Operationrec::OP_INSERT_IS_DONE) + { jam(); - operationRecPtr.p->elementIsDisappeared = ZTRUE; + opbits |= Operationrec::OP_ELEMENT_DISAPPEARED; }//if - if ((operationRecPtr.p->nextParallelQue != RNIL) || - (operationRecPtr.p->nextSerialQue != RNIL)) { + operationRecPtr.p->m_op_bits = opbits; + const bool queue = (operationRecPtr.p->nextParallelQue != RNIL || + operationRecPtr.p->nextSerialQue != RNIL); + + if (queue) + { jam(); - releaselock(signal); - } else { - /* --------------------------------------------------------------------------------- */ - /* WE ARE OWNER OF THE LOCK AND NO OTHER OPERATIONS ARE QUEUED. IF INSERT OR STANDBY */ - /* WE DELETE THE ELEMENT OTHERWISE WE REMOVE THE LOCK FROM THE ELEMENT. */ - /* --------------------------------------------------------------------------------- */ - if (operationRecPtr.p->elementIsDisappeared == ZFALSE) { + release_lockowner(signal, operationRecPtr, false); + } + else + { + /* ------------------------------------------------------------------- + * WE ARE OWNER OF THE LOCK AND NO OTHER OPERATIONS ARE QUEUED. + * IF INSERT OR STANDBY WE DELETE THE ELEMENT OTHERWISE WE REMOVE + * THE LOCK FROM THE ELEMENT. + * ------------------------------------------------------------------ */ + if ((opbits & Operationrec::OP_ELEMENT_DISAPPEARED) == 0) + { jam(); + Page8Ptr aboPageidptr; + Uint32 taboElementptr; + Uint32 tmp2Olq; + taboElementptr = operationRecPtr.p->elementPointer; aboPageidptr.i = operationRecPtr.p->elementPage; tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->hashvaluePart, @@ -3297,87 +4258,31 @@ void Dbacc::abortOperation(Signal* signal) arrGuard(taboElementptr, 2048); aboPageidptr.p->word32[taboElementptr] = tmp2Olq; return; - } else { + } + else + { jam(); commitdelete(signal); }//if }//if - } else { - /* --------------------------------------------------------------- */ - // We are not the lock owner. - /* --------------------------------------------------------------- */ - jam(); - if (operationRecPtr.p->prevParallelQue != RNIL) { - jam(); - /* ---------------------------------------------------------------------------------- */ - /* SINCE WE ARE NOT QUEUE LEADER WE NEED NOT CONSIDER IF THE ELEMENT IS TO BE DELETED.*/ - /* We will simply remove it from the parallel list without any other rearrangements. */ - /* ---------------------------------------------------------------------------------- */ - aboOperRecPtr.i = operationRecPtr.p->prevParallelQue; - ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec); - aboOperRecPtr.p->nextParallelQue = operationRecPtr.p->nextParallelQue; - if (operationRecPtr.p->nextParallelQue != RNIL) { - jam(); - aboOperRecPtr.i = operationRecPtr.p->nextParallelQue; - ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec); - aboOperRecPtr.p->prevParallelQue = operationRecPtr.p->prevParallelQue; - }//if - } else if (operationRecPtr.p->prevSerialQue != RNIL) { - /* ------------------------------------------------------------------------- */ - // We are not in the parallel queue owning the lock. Thus we are in another parallel - // queue longer down in the serial queue. We are however first since prevParallelQue - // == RNIL. - /* ------------------------------------------------------------------------- */ - if (operationRecPtr.p->nextParallelQue != RNIL) { - jam(); - /* ------------------------------------------------------------------------- */ - // We have an operation in the queue after us. We simply rearrange this parallel queue. - // The new leader of this parallel queue will be operation in the serial queue. - /* ------------------------------------------------------------------------- */ - aboOperRecPtr.i = operationRecPtr.p->nextParallelQue; - ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec); - aboOperRecPtr.p->nextSerialQue = operationRecPtr.p->nextSerialQue; - aboOperRecPtr.p->prevSerialQue = operationRecPtr.p->prevSerialQue; - aboOperRecPtr.p->prevParallelQue = RNIL; // Queue Leader - if (operationRecPtr.p->nextSerialQue != RNIL) { - jam(); - TaboOperRecPtr.i = operationRecPtr.p->nextSerialQue; - ptrCheckGuard(TaboOperRecPtr, coprecsize, operationrec); - TaboOperRecPtr.p->prevSerialQue = aboOperRecPtr.i; - }//if - TaboOperRecPtr.i = operationRecPtr.p->prevSerialQue; - ptrCheckGuard(TaboOperRecPtr, coprecsize, operationrec); - TaboOperRecPtr.p->nextSerialQue = aboOperRecPtr.i; - } else { - jam(); - /* ------------------------------------------------------------------------- */ - // We are the only operation in this parallel queue. We will thus shrink the serial - // queue. - /* ------------------------------------------------------------------------- */ - aboOperRecPtr.i = operationRecPtr.p->prevSerialQue; - ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec); - aboOperRecPtr.p->nextSerialQue = operationRecPtr.p->nextSerialQue; - if (operationRecPtr.p->nextSerialQue != RNIL) { - jam(); - aboOperRecPtr.i = operationRecPtr.p->nextSerialQue; - ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec); - aboOperRecPtr.p->prevSerialQue = operationRecPtr.p->prevSerialQue; - }//if - }//if - }//if - }//if - /* ------------------------------------------------------------------------- */ - // If prevParallelQue = RNIL and prevSerialQue = RNIL and we are not owner of the - // lock then we cannot be in any lock queue at all. - /* ------------------------------------------------------------------------- */ -}//Dbacc::abortOperation() + } + else if (opbits & Operationrec::OP_RUN_QUEUE) + { + abortParallelQueueOperation(signal, operationRecPtr); + } + else + { + abortSerieQueueOperation(signal, operationRecPtr); + } +} -void Dbacc::commitDeleteCheck() +void +Dbacc::commitDeleteCheck() { OperationrecPtr opPtr; OperationrecPtr lastOpPtr; OperationrecPtr deleteOpPtr; - bool elementDeleted = false; + Uint32 elementDeleted = 0; bool deleteCheckOngoing = true; Uint32 hashValue = 0; lastOpPtr = operationRecPtr; @@ -3390,7 +4295,9 @@ void Dbacc::commitDeleteCheck() }//while deleteOpPtr = lastOpPtr; do { - if (deleteOpPtr.p->operation == ZDELETE) { + Uint32 opbits = deleteOpPtr.p->m_op_bits; + Uint32 op = opbits & Operationrec::OP_MASK; + if (op == ZDELETE) { jam(); /* ------------------------------------------------------------------- * IF THE CURRENT OPERATION TO BE COMMITTED IS A DELETE OPERATION DUE TO @@ -3404,10 +4311,9 @@ void Dbacc::commitDeleteCheck() * DELETE-OPERATION THAT HAS A HASH VALUE. * ----------------------------------------------------------------- */ hashValue = deleteOpPtr.p->hashValue; - elementDeleted = true; + elementDeleted = Operationrec::OP_ELEMENT_DISAPPEARED; deleteCheckOngoing = false; - } else if ((deleteOpPtr.p->operation == ZREAD) || - (deleteOpPtr.p->operation == ZSCAN_OP)) { + } else if (op == ZREAD || op == ZSCAN_OP) { /* ------------------------------------------------------------------- * We are trying to find out whether the commit will in the end delete * the tuple. Normally the delete will be the last operation in the @@ -3418,7 +4324,7 @@ void Dbacc::commitDeleteCheck() * we have to continue scanning the list looking for a delete operation. */ deleteOpPtr.i = deleteOpPtr.p->prevParallelQue; - if (deleteOpPtr.i == RNIL) { + if (opbits & Operationrec::OP_LOCK_OWNER) { jam(); deleteCheckOngoing = false; } else { @@ -3437,14 +4343,14 @@ void Dbacc::commitDeleteCheck() opPtr = lastOpPtr; do { jam(); - opPtr.p->commitDeleteCheckFlag = ZTRUE; + opPtr.p->m_op_bits |= Operationrec::OP_COMMIT_DELETE_CHECK; if (elementDeleted) { jam(); - opPtr.p->elementIsDisappeared = ZTRUE; + opPtr.p->m_op_bits |= elementDeleted; opPtr.p->hashValue = hashValue; }//if opPtr.i = opPtr.p->prevParallelQue; - if (opPtr.i == RNIL) { + if (opPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) { jam(); break; }//if @@ -3460,14 +4366,13 @@ void Dbacc::commitDeleteCheck() /* ------------------------------------------------------------------------- */ void Dbacc::commitOperation(Signal* signal) { - OperationrecPtr tolqTmpPtr; - Page8Ptr coPageidptr; - Uint32 tcoElementptr; - Uint32 tmp2Olq; + validate_lock_queue(operationRecPtr); - if ((operationRecPtr.p->commitDeleteCheckFlag == ZFALSE) && - (operationRecPtr.p->operation != ZSCAN_OP) && - (operationRecPtr.p->operation != ZREAD)) { + Uint32 opbits = operationRecPtr.p->m_op_bits; + Uint32 op = opbits & Operationrec::OP_MASK; + ndbrequire((opbits & Operationrec::OP_STATE_MASK) == Operationrec::OP_STATE_EXECUTED); + if ((opbits & Operationrec::OP_COMMIT_DELETE_CHECK) == 0 && (op != ZREAD)) + { jam(); /* This method is used to check whether the end result of the transaction will be to delete the tuple. In this case all operation will be marked @@ -3479,16 +4384,30 @@ void Dbacc::commitOperation(Signal* signal) lock is released. */ commitDeleteCheck(); + opbits = operationRecPtr.p->m_op_bits; }//if - if (operationRecPtr.p->lockOwner == ZTRUE) { + + ndbassert(opbits & Operationrec::OP_RUN_QUEUE); + + if (opbits & Operationrec::OP_LOCK_OWNER) + { takeOutLockOwnersList(signal, operationRecPtr); - if ((operationRecPtr.p->nextParallelQue == RNIL) && - (operationRecPtr.p->nextSerialQue == RNIL) && - (operationRecPtr.p->elementIsDisappeared == ZFALSE)) { + opbits &= ~(Uint32)Operationrec::OP_LOCK_OWNER; + operationRecPtr.p->m_op_bits = opbits; + + const bool queue = (operationRecPtr.p->nextParallelQue != RNIL || + operationRecPtr.p->nextSerialQue != RNIL); + + if (!queue && (opbits & Operationrec::OP_ELEMENT_DISAPPEARED) == 0) + { /* - This is the normal path through the commit for operations owning the - lock without any queues and not a delete operation. - */ + * This is the normal path through the commit for operations owning the + * lock without any queues and not a delete operation. + */ + Page8Ptr coPageidptr; + Uint32 tcoElementptr; + Uint32 tmp2Olq; + coPageidptr.i = operationRecPtr.p->elementPage; tcoElementptr = operationRecPtr.p->elementPointer; tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->hashvaluePart, @@ -3498,445 +4417,421 @@ void Dbacc::commitOperation(Signal* signal) arrGuard(tcoElementptr, 2048); coPageidptr.p->word32[tcoElementptr] = tmp2Olq; return; - } else if ((operationRecPtr.p->nextParallelQue != RNIL) || - (operationRecPtr.p->nextSerialQue != RNIL)) { + } + else if (queue) + { jam(); /* - The case when there is a queue lined up. - Release the lock and pass it to the next operation lined up. - */ - releaselock(signal); + * The case when there is a queue lined up. + * Release the lock and pass it to the next operation lined up. + */ + release_lockowner(signal, operationRecPtr, true); return; - } else { + } + else + { jam(); /* - No queue and elementIsDisappeared is true. We perform the actual delete - operation. - */ + * No queue and elementIsDisappeared is true. + * We perform the actual delete operation. + */ commitdelete(signal); return; }//if - } else { - /* - THE OPERATION DOES NOT OWN THE LOCK. IT MUST BE IN A LOCK QUEUE OF THE - ELEMENT. - */ - ndbrequire(operationRecPtr.p->prevParallelQue != RNIL); + } + else + { + /** + * THE OPERATION DOES NOT OWN THE LOCK. IT MUST BE IN A LOCK QUEUE OF THE + * ELEMENT. + */ jam(); - tolqTmpPtr.i = operationRecPtr.p->prevParallelQue; - ptrCheckGuard(tolqTmpPtr, coprecsize, operationrec); - tolqTmpPtr.p->nextParallelQue = operationRecPtr.p->nextParallelQue; - if (operationRecPtr.p->nextParallelQue != RNIL) { + OperationrecPtr prev, next, lockOwner; + prev.i = operationRecPtr.p->prevParallelQue; + next.i = operationRecPtr.p->nextParallelQue; + lockOwner.i = operationRecPtr.p->m_lock_owner_ptr_i; + ptrCheckGuard(prev, coprecsize, operationrec); + + prev.p->nextParallelQue = next.i; + if (next.i != RNIL) + { jam(); - tolqTmpPtr.i = operationRecPtr.p->nextParallelQue; - ptrCheckGuard(tolqTmpPtr, coprecsize, operationrec); - tolqTmpPtr.p->prevParallelQue = operationRecPtr.p->prevParallelQue; - }//if - + ptrCheckGuard(next, coprecsize, operationrec); + next.p->prevParallelQue = prev.i; + } + else if (prev.p->m_op_bits & Operationrec::OP_LOCK_OWNER) + { + jam(); + ndbassert(lockOwner.i == prev.i); + prev.p->m_lo_last_parallel_op_ptr_i = RNIL; + next = prev; + } + else + { + jam(); + /** + * Last operation in parallell queue + */ + ndbassert(prev.i != lockOwner.i); + ptrCheckGuard(lockOwner, coprecsize, operationrec); + ndbassert(lockOwner.p->m_op_bits & Operationrec::OP_LOCK_OWNER); + lockOwner.p->m_lo_last_parallel_op_ptr_i = prev.i; + prev.p->m_lock_owner_ptr_i = lockOwner.i; + next = prev; + } + /** * Check possible lock upgrade - * 1) Find lock owner - * 2) Count transactions in parallel que - * 3) If count == 1 and TRANSID(next serial) == TRANSID(lock owner) - * upgrade next serial */ - if(operationRecPtr.p->lockMode) + if(opbits & Operationrec::OP_ACC_LOCK_MODE) { jam(); + /** - * Committing a non shared operation can't lead to lock upgrade + * Not lock owner...committing a exclusive operation... + * + * e.g + * T1(R) T1(X) + * T2(R/X) + * + * If T1(X) commits T2(R/X) is not supposed to run + * as T1(R) should also commit + * + * e.g + * T1(R) T1(X) T1*(R) + * T2(R/X) + * + * If T1*(R) commits T2(R/X) is not supposed to run + * as T1(R),T2(x) should also commit */ + validate_lock_queue(prev); return; } - - OperationrecPtr lock_owner; - lock_owner.i = operationRecPtr.p->prevParallelQue; - ptrCheckGuard(lock_owner, coprecsize, operationrec); - Uint32 transid[2] = { lock_owner.p->transId1, - lock_owner.p->transId2 }; - - - while(lock_owner.p->prevParallelQue != RNIL) + + /** + * We committed a shared lock + * Check if we can start next... + */ + while(next.p->nextParallelQue != RNIL) { - lock_owner.i = lock_owner.p->prevParallelQue; - ptrCheckGuard(lock_owner, coprecsize, operationrec); + jam(); + next.i = next.p->nextParallelQue; + ptrCheckGuard(next, coprecsize, operationrec); - if(lock_owner.p->transId1 != transid[0] || - lock_owner.p->transId2 != transid[1]) + if ((next.p->m_op_bits & Operationrec::OP_STATE_MASK) != + Operationrec::OP_STATE_EXECUTED) { jam(); - /** - * If more than 1 trans in lock queue -> no lock upgrade - */ return; } } - check_lock_upgrade(signal, lock_owner, operationRecPtr); + startNext(signal, next); + + validate_lock_queue(prev); } }//Dbacc::commitOperation() -void -Dbacc::check_lock_upgrade(Signal* signal, - OperationrecPtr lock_owner, - OperationrecPtr release_op) -{ - if((lock_owner.p->transId1 == release_op.p->transId1 && - lock_owner.p->transId2 == release_op.p->transId2) || - release_op.p->lockMode || - lock_owner.p->nextSerialQue == RNIL) - { - jam(); - /** - * No lock upgrade if same trans or lock owner has no serial queue - * or releasing non shared op - */ - return; - } - - OperationrecPtr next; - next.i = lock_owner.p->nextSerialQue; - ptrCheckGuard(next, coprecsize, operationrec); +void +Dbacc::release_lockowner(Signal* signal, OperationrecPtr opPtr, bool commit) +{ + OperationrecPtr nextP; + OperationrecPtr nextS; + OperationrecPtr newOwner; + OperationrecPtr lastP; - if(lock_owner.p->transId1 != next.p->transId1 || - lock_owner.p->transId2 != next.p->transId2) + Uint32 opbits = opPtr.p->m_op_bits; + nextP.i = opPtr.p->nextParallelQue; + nextS.i = opPtr.p->nextSerialQue; + lastP.i = opPtr.p->m_lo_last_parallel_op_ptr_i; + Uint32 lastS = opPtr.p->m_lo_last_serial_op_ptr_i; + + ndbassert(lastP.i != RNIL || lastS != RNIL); + ndbassert(nextP.i != RNIL || nextS.i != RNIL); + + enum { + NOTHING, + CHECK_LOCK_UPGRADE, + START_NEW + } action = NOTHING; + + if (nextP.i != RNIL) { jam(); - /** - * No lock upgrad if !same trans in serial queue - */ - return; + ptrCheckGuard(nextP, coprecsize, operationrec); + newOwner = nextP; + + if (lastP.i == newOwner.i) + { + newOwner.p->m_lo_last_parallel_op_ptr_i = RNIL; + lastP = nextP; + } + else + { + ptrCheckGuard(lastP, coprecsize, operationrec); + newOwner.p->m_lo_last_parallel_op_ptr_i = lastP.i; + lastP.p->m_lock_owner_ptr_i = newOwner.i; + } + + newOwner.p->m_lo_last_serial_op_ptr_i = lastS; + newOwner.p->nextSerialQue = nextS.i; + + if (nextS.i != RNIL) + { + jam(); + ptrCheckGuard(nextS, coprecsize, operationrec); + ndbassert(nextS.p->prevSerialQue == opPtr.i); + nextS.p->prevSerialQue = newOwner.i; + } + + if (commit) + { + if ((opbits & Operationrec::OP_ACC_LOCK_MODE) == ZREADLOCK) + { + jam(); + /** + * Lock owner...committing a shared operation... + * this can be a lock upgrade + * + * e.g + * T1(R) T2(R) + * T2(X) + * + * If T1(R) commits T2(X) is supposed to run + * + * e.g + * T1(X) T1(R) + * T2(R) + * + * If T1(X) commits, then T1(R) _should_ commit before T2(R) is + * allowed to proceed + */ + action = CHECK_LOCK_UPGRADE; + } + else + { + jam(); + newOwner.p->m_op_bits |= Operationrec::OP_LOCK_MODE; + } + } + else + { + /** + * Aborting an operation can *always* lead to lock upgrade + */ + action = CHECK_LOCK_UPGRADE; + + /** + * Update ACC_LOCK_MODE + */ + if (opbits & Operationrec::OP_LOCK_MODE) + { + Uint32 nextbits = nextP.p->m_op_bits; + while ((nextbits & Operationrec::OP_LOCK_MODE) == 0) + { + ndbassert(nextbits & Operationrec::OP_ACC_LOCK_MODE); + nextbits &= ~(Uint32)Operationrec::OP_ACC_LOCK_MODE; + nextP.p->m_op_bits = nextbits; + + if (nextP.p->nextParallelQue != RNIL) + { + nextP.i = nextP.p->nextParallelQue; + ptrCheckGuard(nextP, coprecsize, operationrec); + nextbits = nextP.p->m_op_bits; + } + else + { + break; + } + } + } + } } - - if (getNoParallelTransaction(lock_owner.p) > 1) + else { jam(); - /** - * No lock upgrade if more than 1 transaction in parallell queue - */ - return; - } + ptrCheckGuard(nextS, coprecsize, operationrec); + newOwner = nextS; + + newOwner.p->m_op_bits |= Operationrec::OP_RUN_QUEUE; + + if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED) + { + report_dealloc(signal, opPtr.p); + newOwner.p->localdata[0] = ~0; + } + else + { + jam(); + newOwner.p->localdata[0] = opPtr.p->localdata[0]; + newOwner.p->localdata[1] = opPtr.p->localdata[1]; + } + + lastP = newOwner; + while (lastP.p->nextParallelQue != RNIL) + { + lastP.i = lastP.p->nextParallelQue; + ptrCheckGuard(lastP, coprecsize, operationrec); + lastP.p->m_op_bits |= Operationrec::OP_RUN_QUEUE; + } + + if (newOwner.i != lastP.i) + { + jam(); + newOwner.p->m_lo_last_parallel_op_ptr_i = lastP.i; + } + else + { + jam(); + newOwner.p->m_lo_last_parallel_op_ptr_i = RNIL; + } - if (getNoParallelTransaction(next.p) > 1) - { - jam(); - /** - * No lock upgrade if more than 1 transaction in next's parallell queue - */ - return; + if (newOwner.i != lastS) + { + jam(); + newOwner.p->m_lo_last_serial_op_ptr_i = lastS; + } + else + { + jam(); + newOwner.p->m_lo_last_serial_op_ptr_i = RNIL; + } + + action = START_NEW; } - OperationrecPtr tmp; - tmp.i = lock_owner.p->nextSerialQue = next.p->nextSerialQue; - if(tmp.i != RNIL) - { - ptrCheckGuard(tmp, coprecsize, operationrec); - ndbassert(tmp.p->prevSerialQue == next.i); - tmp.p->prevSerialQue = lock_owner.i; - } - next.p->nextSerialQue = next.p->prevSerialQue = RNIL; + insertLockOwnersList(signal, newOwner); - // Find end of parallell que - tmp = lock_owner; - Uint32 lockMode = next.p->lockMode > lock_owner.p->lockMode ? - next.p->lockMode : lock_owner.p->lockMode; - while(tmp.p->nextParallelQue != RNIL) + /** + * Copy op info, and store op in element + * + */ { - jam(); - tmp.i = tmp.p->nextParallelQue; - tmp.p->lockMode = lockMode; - ptrCheckGuard(tmp, coprecsize, operationrec); - } - tmp.p->lockMode = lockMode; - - next.p->prevParallelQue = tmp.i; - tmp.p->nextParallelQue = next.i; - - OperationrecPtr save = operationRecPtr; - - Uint32 localdata[2]; - localdata[0] = lock_owner.p->localdata[0]; - localdata[1] = lock_owner.p->localdata[1]; - do { - next.p->localdata[0] = localdata[0]; - next.p->localdata[1] = localdata[1]; - next.p->lockMode = lockMode; - - operationRecPtr = next; - executeNextOperation(signal); - if (next.p->nextParallelQue != RNIL) + newOwner.p->elementPage = opPtr.p->elementPage; + newOwner.p->elementIsforward = opPtr.p->elementIsforward; + newOwner.p->elementPointer = opPtr.p->elementPointer; + newOwner.p->elementContainer = opPtr.p->elementContainer; + newOwner.p->scanBits = opPtr.p->scanBits; + newOwner.p->hashvaluePart = opPtr.p->hashvaluePart; + newOwner.p->m_op_bits |= (opbits & Operationrec::OP_ELEMENT_DISAPPEARED); + if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED) { + /* ------------------------------------------------------------------- */ + // If the elementIsDisappeared is set then we know that the + // hashValue is also set since it always originates from a + // committing abort or a aborting insert. + // Scans do not initialise the hashValue and must have this + // value initialised if they are + // to successfully commit the delete. + /* ------------------------------------------------------------------- */ jam(); - next.i = next.p->nextParallelQue; - ptrCheckGuard(next, coprecsize, operationrec); - } else { - jam(); - break; + newOwner.p->hashValue = opPtr.p->hashValue; }//if - } while (1); + + Page8Ptr pagePtr; + pagePtr.i = newOwner.p->elementPage; + ptrCheckGuard(pagePtr, cpagesize, page8); + const Uint32 tmp = ElementHeader::setLocked(newOwner.i); + arrGuard(newOwner.p->elementPointer, 2048); + pagePtr.p->word32[newOwner.p->elementPointer] = tmp; + } - operationRecPtr = save; + switch(action){ + case NOTHING: + validate_lock_queue(newOwner); + return; + case START_NEW: + startNew(signal, newOwner); + validate_lock_queue(newOwner); + return; + case CHECK_LOCK_UPGRADE: + startNext(signal, lastP); + validate_lock_queue(lastP); + break; + } } -/* ------------------------------------------------------------------------- */ -/* RELEASELOCK */ -/* RESETS LOCK OF AN ELEMENT. */ -/* INFORMATION ABOUT THE ELEMENT IS SAVED IN THE OPERATION RECORD */ -/* THESE INFORMATION IS USED TO UPDATE HEADER OF THE ELEMENT */ -/* ------------------------------------------------------------------------- */ -void Dbacc::releaselock(Signal* signal) -{ - OperationrecPtr rloOperPtr; - OperationrecPtr trlOperPtr; - OperationrecPtr trlTmpOperPtr; - Uint32 TelementIsDisappeared; - - trlOperPtr.i = RNIL; - if (operationRecPtr.p->nextParallelQue != RNIL) { - jam(); - /** --------------------------------------------------------------------- - * NEXT OPERATION TAKES OVER THE LOCK. - * We will simply move the info from the leader - * to the new queue leader. - * -------------------------------------------------------------------- */ - trlOperPtr.i = operationRecPtr.p->nextParallelQue; - ptrCheckGuard(trlOperPtr, coprecsize, operationrec); - copyInOperPtr = trlOperPtr; - copyOperPtr = operationRecPtr; - copyOpInfo(signal); - trlOperPtr.p->prevParallelQue = RNIL; - if (operationRecPtr.p->nextSerialQue != RNIL) { - jam(); - /* ----------------------------------------------------------------- - * THERE IS A SERIAL QUEUE. MOVE IT FROM RELEASED OP REC TO THE - * NEW LOCK OWNER. - * ------------------------------------------------------------------ */ - trlOperPtr.p->nextSerialQue = operationRecPtr.p->nextSerialQue; - trlTmpOperPtr.i = trlOperPtr.p->nextSerialQue; - ptrCheckGuard(trlTmpOperPtr, coprecsize, operationrec); - trlTmpOperPtr.p->prevSerialQue = trlOperPtr.i; - }//if - - check_lock_upgrade(signal, copyInOperPtr, operationRecPtr); - } else { - ndbrequire(operationRecPtr.p->nextSerialQue != RNIL); - jam(); - /** --------------------------------------------------------------------- - * THE PARALLEL QUEUE IS EMPTY AND THE SERIAL QUEUE IS NOT EMPTY. - * WE NEED TO REARRANGE LISTS AND START A NUMBER OF OPERATIONS. - * -------------------------------------------------------------------- */ - trlOperPtr.i = operationRecPtr.p->nextSerialQue; - ptrCheckGuard(trlOperPtr, coprecsize, operationrec); - copyOperPtr = operationRecPtr; - copyInOperPtr = trlOperPtr; - copyOpInfo(signal); - trlOperPtr.p->prevSerialQue = RNIL; - ndbrequire(trlOperPtr.p->prevParallelQue == RNIL); - /* --------------------------------------------------------------------- */ - /* WE HAVE MOVED TO THE NEXT PARALLEL QUEUE. WE MUST START ALL OF THOSE */ - /* OPERATIONS WHICH UP TILL NOW HAVE BEEN QUEUED WAITING FOR THE LOCK. */ - /* --------------------------------------------------------------------- */ - rloOperPtr = operationRecPtr; - trlTmpOperPtr = trlOperPtr; - TelementIsDisappeared = trlOperPtr.p->elementIsDisappeared; - Uint32 ThashValue = trlOperPtr.p->hashValue; - do { - /* ------------------------------------------------------------------ */ - // Ensure that all operations in the queue are assigned with the - // elementIsDisappeared to ensure that the element is removed after - // a previous delete. An insert does however revert this decision - // since the element is put back again. - // Local checkpoints complicate life here since they do not - // execute the next operation but simply change - // the state on the operation. - // We need to set-up the variable elementIsDisappeared - // properly even when local checkpoints and inserts/writes after - // deletes occur. - /* ------------------------------------------------------------------- */ - trlTmpOperPtr.p->elementIsDisappeared = TelementIsDisappeared; - if (TelementIsDisappeared == ZTRUE) { - /* ----------------------------------------------------------------- */ - // If the elementIsDisappeared is set then we know that the - // hashValue is also set since it always originates from a - // committing abort or a aborting insert. - // Scans do not initialise the hashValue and must have this - // value initialised if they are to successfully commit the delete. - /* ----------------------------------------------------------------- */ - jam(); - trlTmpOperPtr.p->hashValue = ThashValue; - }//if - trlTmpOperPtr.p->localdata[0] = trlOperPtr.p->localdata[0]; - trlTmpOperPtr.p->localdata[1] = trlOperPtr.p->localdata[1]; - /* ------------------------------------------------------------------- */ - // Restart the queued operation. - /* ------------------------------------------------------------------- */ - operationRecPtr = trlTmpOperPtr; - TelementIsDisappeared = executeNextOperation(signal); - ThashValue = operationRecPtr.p->hashValue; - if (trlTmpOperPtr.p->nextParallelQue != RNIL) { - jam(); - /* ----------------------------------------------------------------- */ - // We will continue with the next operation in the parallel - // queue and start this as well. - /* ----------------------------------------------------------------- */ - trlTmpOperPtr.i = trlTmpOperPtr.p->nextParallelQue; - ptrCheckGuard(trlTmpOperPtr, coprecsize, operationrec); - } else { - jam(); - break; - }//if - } while (1); - operationRecPtr = rloOperPtr; - }//if - - // Insert the next op into the lock owner list - insertLockOwnersList(signal, trlOperPtr); - return; -}//Dbacc::releaselock() - -/* --------------------------------------------------------------------------------- */ -/* COPY_OP_INFO */ -/* INPUT: COPY_IN_OPER_PTR AND COPY_OPER_PTR. */ -/* DESCRIPTION:INFORMATION ABOUT THE ELEMENT WILL BE MOVED FROM OPERATION */ -/* REC TO QUEUE OP REC. QUE OP REC TAKES OVER THE LOCK. */ -/* --------------------------------------------------------------------------------- */ -void Dbacc::copyOpInfo(Signal* signal) +void +Dbacc::startNew(Signal* signal, OperationrecPtr newOwner) { - Page8Ptr coiPageidptr; - - copyInOperPtr.p->elementPage = copyOperPtr.p->elementPage; - copyInOperPtr.p->elementIsforward = copyOperPtr.p->elementIsforward; - copyInOperPtr.p->elementContainer = copyOperPtr.p->elementContainer; - copyInOperPtr.p->elementPointer = copyOperPtr.p->elementPointer; - copyInOperPtr.p->scanBits = copyOperPtr.p->scanBits; - copyInOperPtr.p->hashvaluePart = copyOperPtr.p->hashvaluePart; - copyInOperPtr.p->elementIsDisappeared = copyOperPtr.p->elementIsDisappeared; - if (copyInOperPtr.p->elementIsDisappeared == ZTRUE) { - /* --------------------------------------------------------------------------------- */ - // If the elementIsDisappeared is set then we know that the hashValue is also set - // since it always originates from a committing abort or a aborting insert. Scans - // do not initialise the hashValue and must have this value initialised if they are - // to successfully commit the delete. - /* --------------------------------------------------------------------------------- */ - jam(); - copyInOperPtr.p->hashValue = copyOperPtr.p->hashValue; - }//if - coiPageidptr.i = copyOperPtr.p->elementPage; - ptrCheckGuard(coiPageidptr, cpagesize, page8); - const Uint32 tmp = ElementHeader::setLocked(copyInOperPtr.i); - dbgWord32(coiPageidptr, copyOperPtr.p->elementPointer, tmp); - arrGuard(copyOperPtr.p->elementPointer, 2048); - coiPageidptr.p->word32[copyOperPtr.p->elementPointer] = tmp; - copyInOperPtr.p->localdata[0] = copyOperPtr.p->localdata[0]; - copyInOperPtr.p->localdata[1] = copyOperPtr.p->localdata[1]; -}//Dbacc::copyOpInfo() + OperationrecPtr save = operationRecPtr; + operationRecPtr = newOwner; + + Uint32 opbits = newOwner.p->m_op_bits; + Uint32 op = opbits & Operationrec::OP_MASK; + Uint32 opstate = (opbits & Operationrec::OP_STATE_MASK); + ndbassert(opstate == Operationrec::OP_STATE_WAITING); + ndbassert(opbits & Operationrec::OP_LOCK_OWNER); + const bool deleted = opbits & Operationrec::OP_ELEMENT_DISAPPEARED; + Uint32 errCode = 0; + + opbits &= opbits & ~(Uint32)Operationrec::OP_STATE_MASK; + opbits |= Operationrec::OP_STATE_RUNNING; + + if (op == ZSCAN_OP && (opbits & Operationrec::OP_LOCK_REQ) == 0) + goto scan; -/* ******************--------------------------------------------------------------- */ -/* EXECUTE NEXT OPERATION */ -/* NEXT OPERATION IN A LOCK QUEUE WILL BE EXECUTED. */ -/* --------------------------------------------------------------------------------- */ -Uint32 Dbacc::executeNextOperation(Signal* signal) -{ - ndbrequire(operationRecPtr.p->transactionstate == ACTIVE); - if (operationRecPtr.p->elementIsDisappeared == ZTRUE) { - /* --------------------------------------------------------------------- */ - /* PREVIOUS OPERATION WAS DELETE OPERATION AND THE ELEMENT IS DELETED. */ - /* --------------------------------------------------------------------- */ - if (((operationRecPtr.p->operation != ZINSERT) && - (operationRecPtr.p->operation != ZWRITE)) || - (operationRecPtr.p->prevParallelQue != RNIL)) { - if (operationRecPtr.p->operation != ZSCAN_OP || - operationRecPtr.p->isAccLockReq) { - jam(); - /* ----------------------------------------------------------------- */ - // Updates and reads with a previous delete simply aborts with read - // error indicating that tuple did not exist. - // Also inserts and writes not being the first operation. - /* ----------------------------------------------------------------- */ - operationRecPtr.p->transactionstate = WAIT_COMMIT_ABORT; - signal->theData[0] = operationRecPtr.p->userptr; - signal->theData[1] = ZREAD_ERROR; - sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYREF, signal, - 2, JBB); - return operationRecPtr.p->elementIsDisappeared; - } else { - /* ----------------------------------------------------------------- */ - /* ABORT OF OPERATION NEEDED BUT THE OPERATION IS A - * SCAN => SPECIAL TREATMENT. - * IF THE SCAN WAITS IN QUEUE THEN WE MUST REMOVE THE OPERATION - * FROM THE SCAN LOCK QUEUE AND IF NO MORE OPERATIONS ARE QUEUED - * THEN WE SHOULD RESTART THE SCAN PROCESS. OTHERWISE WE SIMPLY - * RELEASE THE OPERATION AND DECREASE THE NUMBER OF LOCKS HELD. - * ----------------------------------------------------------------- */ - takeOutScanLockQueue(operationRecPtr.p->scanRecPtr); - putReadyScanQueue(signal, operationRecPtr.p->scanRecPtr); - return operationRecPtr.p->elementIsDisappeared; - }//if - }//if - /* --------------------------------------------------------------------- */ - // Insert and writes can continue but need to be converted to inserts. - /* --------------------------------------------------------------------- */ - jam(); - operationRecPtr.p->elementIsDisappeared = ZFALSE; - operationRecPtr.p->operation = ZINSERT; - operationRecPtr.p->insertIsDone = ZTRUE; - } else if (operationRecPtr.p->operation == ZINSERT) { - bool abortFlag = true; - if (operationRecPtr.p->prevParallelQue != RNIL) { - OperationrecPtr prevOpPtr; - jam(); - prevOpPtr.i = operationRecPtr.p->prevParallelQue; - ptrCheckGuard(prevOpPtr, coprecsize, operationrec); - if (prevOpPtr.p->operation == ZDELETE) { - jam(); - abortFlag = false; - }//if - }//if - if (abortFlag) { - jam(); - /* ------------------------------------------------------------------- */ - /* ELEMENT STILL REMAINS AND WE ARE TRYING TO INSERT IT AGAIN. */ - /* THIS IS CLEARLY NOT A GOOD IDEA. */ - /* ------------------------------------------------------------------- */ - operationRecPtr.p->transactionstate = WAIT_COMMIT_ABORT; - signal->theData[0] = operationRecPtr.p->userptr; - signal->theData[1] = ZWRITE_ERROR; - sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYREF, signal, - 2, JBB); - return operationRecPtr.p->elementIsDisappeared; - }//if - } - else if(operationRecPtr.p->operation == ZWRITE) + if (deleted) { jam(); - operationRecPtr.p->operation = ZUPDATE; - if (operationRecPtr.p->prevParallelQue != RNIL) { - OperationrecPtr prevOpPtr; - jam(); - prevOpPtr.i = operationRecPtr.p->prevParallelQue; - ptrCheckGuard(prevOpPtr, coprecsize, operationrec); - if (prevOpPtr.p->operation == ZDELETE) - { - jam(); - operationRecPtr.p->operation = ZINSERT; - } + if (op != ZINSERT && op != ZWRITE) + { + errCode = ZREAD_ERROR; + goto ref; } + + opbits &= ~(Uint32)Operationrec::OP_MASK; + opbits &= ~(Uint32)Operationrec::OP_ELEMENT_DISAPPEARED; + opbits |= (op = ZINSERT); + opbits |= Operationrec::OP_INSERT_IS_DONE; + goto conf; } - - if (operationRecPtr.p->operation == ZSCAN_OP && - ! operationRecPtr.p->isAccLockReq) { + else if (op == ZINSERT) + { jam(); - takeOutScanLockQueue(operationRecPtr.p->scanRecPtr); - putReadyScanQueue(signal, operationRecPtr.p->scanRecPtr); - } else { + errCode = ZWRITE_ERROR; + goto ref; + } + else if (op == ZWRITE) + { jam(); - sendAcckeyconf(signal); - sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYCONF, - signal, 6, JBB); - }//if - return operationRecPtr.p->elementIsDisappeared; -}//Dbacc::executeNextOperation() + opbits &= ~(Uint32)Operationrec::OP_MASK; + opbits |= (op = ZUPDATE); + goto conf; + } + +conf: + newOwner.p->m_op_bits = opbits; + + sendAcckeyconf(signal); + sendSignal(newOwner.p->userblockref, GSN_ACCKEYCONF, + signal, 6, JBA); + + operationRecPtr = save; + return; + +scan: + jam(); + newOwner.p->m_op_bits = opbits; + + takeOutScanLockQueue(newOwner.p->scanRecPtr); + putReadyScanQueue(signal, newOwner.p->scanRecPtr); + + operationRecPtr = save; + return; + +ref: + newOwner.p->m_op_bits = opbits; + + signal->theData[0] = newOwner.p->userptr; + signal->theData[1] = errCode; + sendSignal(newOwner.p->userblockref, GSN_ACCKEYREF, signal, + 2, JBB); + + operationRecPtr = save; + return; +} /** * takeOutLockOwnersList @@ -3950,7 +4845,6 @@ void Dbacc::takeOutLockOwnersList(Signal* signal, { const Uint32 Tprev = outOperPtr.p->prevLockOwnerOp; const Uint32 Tnext = outOperPtr.p->nextLockOwnerOp; - #ifdef VM_TRACE // Check that operation is already in the list OperationrecPtr tmpOperPtr; @@ -3965,8 +4859,7 @@ void Dbacc::takeOutLockOwnersList(Signal* signal, ndbrequire(inList == true); #endif - ndbrequire(outOperPtr.p->lockOwner == ZTRUE); - outOperPtr.p->lockOwner = ZFALSE; + ndbassert(outOperPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER); // Fast path through the code for the common case. if ((Tprev == RNIL) && (Tnext == RNIL)) { @@ -4007,7 +4900,6 @@ void Dbacc::insertLockOwnersList(Signal* signal, const OperationrecPtr& insOperPtr) { OperationrecPtr tmpOperPtr; - #ifdef VM_TRACE // Check that operation is not already in list tmpOperPtr.i = fragrecptr.p->lockOwnersList; @@ -4017,12 +4909,12 @@ void Dbacc::insertLockOwnersList(Signal* signal, tmpOperPtr.i = tmpOperPtr.p->nextLockOwnerOp; } #endif + tmpOperPtr.i = fragrecptr.p->lockOwnersList; + + ndbrequire(! (insOperPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER)); - ndbrequire(insOperPtr.p->lockOwner == ZFALSE); - - insOperPtr.p->lockOwner = ZTRUE; + insOperPtr.p->m_op_bits |= Operationrec::OP_LOCK_OWNER; insOperPtr.p->prevLockOwnerOp = RNIL; - tmpOperPtr.i = fragrecptr.p->lockOwnersList; insOperPtr.p->nextLockOwnerOp = tmpOperPtr.i; fragrecptr.p->lockOwnersList = insOperPtr.i; @@ -5385,6 +6277,7 @@ void Dbacc::execNEXT_SCANREQ(Signal* signal) if (!scanPtr.p->scanReadCommittedFlag) { commitOperation(signal); }//if + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; takeOutActiveScanOp(signal); releaseOpRec(signal); scanPtr.p->scanOpsAllocated--; @@ -5400,9 +6293,10 @@ void Dbacc::execNEXT_SCANREQ(Signal* signal) jam(); fragrecptr.i = scanPtr.p->activeLocalFrag; ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); - /* --------------------------------------------------------------------------------- */ - /* THE SCAN PROCESS IS FINISHED. RELOCK ALL LOCKED EL. RELESE ALL INVOLVED REC. */ - /* --------------------------------------------------------------------------------- */ + /* --------------------------------------------------------------------- + * THE SCAN PROCESS IS FINISHED. RELOCK ALL LOCKED EL. + * RELESE ALL INVOLVED REC. + * ------------------------------------------------------------------- */ releaseScanLab(signal); return; break; @@ -5452,24 +6346,26 @@ void Dbacc::checkNextBucketLab(Signal* signal) scanPtr.p->nextBucketIndex++; if (scanPtr.p->scanBucketState == ScanRec::SECOND_LAP) { if (scanPtr.p->nextBucketIndex > scanPtr.p->maxBucketIndexToRescan) { - /* --------------------------------------------------------------------------------- */ - // We have finished the rescan phase. We are ready to proceed with the next fragment part. - /* --------------------------------------------------------------------------------- */ + /* ---------------------------------------------------------------- */ + // We have finished the rescan phase. + // We are ready to proceed with the next fragment part. + /* ---------------------------------------------------------------- */ jam(); checkNextFragmentLab(signal); return; }//if } else if (scanPtr.p->scanBucketState == ScanRec::FIRST_LAP) { if ((fragrecptr.p->p + fragrecptr.p->maxp) < scanPtr.p->nextBucketIndex) { - /* --------------------------------------------------------------------------------- */ + /* ---------------------------------------------------------------- */ // All buckets have been scanned a first time. - /* --------------------------------------------------------------------------------- */ + /* ---------------------------------------------------------------- */ if (scanPtr.p->minBucketIndexToRescan == 0xFFFFFFFF) { jam(); - /* --------------------------------------------------------------------------------- */ - // We have not had any merges behind the scan. Thus it is not necessary to perform - // any rescan any buckets and we can proceed immediately with the next fragment part. - /* --------------------------------------------------------------------------------- */ + /* -------------------------------------------------------------- */ + // We have not had any merges behind the scan. + // Thus it is not necessary to perform any rescan any buckets + // and we can proceed immediately with the next fragment part. + /* --------------------------------------------------------------- */ checkNextFragmentLab(signal); return; } else { @@ -5561,18 +6457,23 @@ void Dbacc::checkNextBucketLab(Signal* signal) tslElementptr = tnsElementptr; setlock(signal); insertLockOwnersList(signal, operationRecPtr); + operationRecPtr.p->m_op_bits |= + Operationrec::OP_STATE_RUNNING | Operationrec::OP_RUN_QUEUE; }//if } else { arrGuard(tnsElementptr, 2048); queOperPtr.i = ElementHeader::getOpPtrI(nsPageptr.p->word32[tnsElementptr]); ptrCheckGuard(queOperPtr, coprecsize, operationrec); - if (queOperPtr.p->elementIsDisappeared == ZTRUE) { + if (queOperPtr.p->m_op_bits & Operationrec::OP_ELEMENT_DISAPPEARED || + queOperPtr.p->localdata[0] == ~(Uint32)0) + { jam(); - /* --------------------------------------------------------------------------------- */ - // If the lock owner indicates the element is disappeared then we will not report this - // tuple. We will continue with the next tuple. - /* --------------------------------------------------------------------------------- */ + /* ------------------------------------------------------------------ */ + // If the lock owner indicates the element is disappeared then + // we will not report this tuple. We will continue with the next tuple. + /* ------------------------------------------------------------------ */ + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; releaseOpRec(signal); scanPtr.p->scanOpsAllocated--; signal->theData[0] = scanPtr.i; @@ -5584,31 +6485,29 @@ void Dbacc::checkNextBucketLab(Signal* signal) Uint32 return_result; if (scanPtr.p->scanLockMode == ZREADLOCK) { jam(); - priPageptr = nsPageptr; - tpriElementptr = tnsElementptr; - return_result = placeReadInLockQueue(signal); + return_result = placeReadInLockQueue(queOperPtr); } else { jam(); - pwiPageptr = nsPageptr; - tpwiElementptr = tnsElementptr; - return_result = placeWriteInLockQueue(signal); + return_result = placeWriteInLockQueue(queOperPtr); }//if if (return_result == ZSERIAL_QUEUE) { - /* --------------------------------------------------------------------------------- */ - /* WE PLACED THE OPERATION INTO A SERIAL QUEUE AND THUS WE HAVE TO WAIT FOR */ - /* THE LOCK TO BE RELEASED. WE CONTINUE WITH THE NEXT ELEMENT. */ - /* --------------------------------------------------------------------------------- */ + /* ----------------------------------------------------------------- + * WE PLACED THE OPERATION INTO A SERIAL QUEUE AND THUS WE HAVE TO + * WAIT FOR THE LOCK TO BE RELEASED. WE CONTINUE WITH THE NEXT ELEMENT + * ----------------------------------------------------------------- */ putOpScanLockQue(); /* PUT THE OP IN A QUE IN THE SCAN REC */ signal->theData[0] = scanPtr.i; signal->theData[1] = AccCheckScan::ZCHECK_LCP_STOP; sendSignal(cownBlockref, GSN_ACC_CHECK_SCAN, signal, 2, JBB); return; - } else if (return_result == ZWRITE_ERROR) { + } else if (return_result != ZPARALLEL_QUEUE) { jam(); - /* --------------------------------------------------------------------------------- */ - // The tuple is either not committed yet or a delete in the same transaction (not - // possible here since we are a scan). Thus we simply continue with the next tuple. - /* --------------------------------------------------------------------------------- */ + /* ----------------------------------------------------------------- */ + // The tuple is either not committed yet or a delete in + // the same transaction (not possible here since we are a scan). + // Thus we simply continue with the next tuple. + /* ----------------------------------------------------------------- */ + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; releaseOpRec(signal); scanPtr.p->scanOpsAllocated--; signal->theData[0] = scanPtr.i; @@ -5619,10 +6518,11 @@ void Dbacc::checkNextBucketLab(Signal* signal) ndbassert(return_result == ZPARALLEL_QUEUE); }//if }//if - /* --------------------------------------------------------------------------------- */ - // Committed read proceed without caring for locks immediately down here except when - // the tuple was deleted permanently and no new operation has inserted it again. - /* --------------------------------------------------------------------------------- */ + /* ----------------------------------------------------------------------- */ + // Committed read proceed without caring for locks immediately + // down here except when the tuple was deleted permanently + // and no new operation has inserted it again. + /* ----------------------------------------------------------------------- */ putActiveScanOp(signal); sendNextScanConf(signal); return; @@ -5646,14 +6546,15 @@ void Dbacc::initScanFragmentPart(Signal* signal) DirRangePtr cnfDirRangePtr; DirectoryarrayPtr cnfDirptr; Page8Ptr cnfPageidptr; - /* --------------------------------------------------------------------------------- */ + /* ----------------------------------------------------------------------- */ // Set the active fragment part. // Set the current bucket scanned to the first. // Start with the first lap. // Remember the number of buckets at start of the scan. - // Set the minimum and maximum to values that will always be smaller and larger than. + // Set the minimum and maximum to values that will always be smaller and + // larger than. // Reset the scan indicator on the first bucket. - /* --------------------------------------------------------------------------------- */ + /* ----------------------------------------------------------------------- */ scanPtr.p->activeLocalFrag = fragrecptr.i; scanPtr.p->nextBucketIndex = 0; /* INDEX OF SCAN BUCKET */ scanPtr.p->scanBucketState = ScanRec::FIRST_LAP; @@ -5672,11 +6573,11 @@ void Dbacc::initScanFragmentPart(Signal* signal) releaseScanBucket(signal); }//Dbacc::initScanFragmentPart() -/* --------------------------------------------------------------------------------- */ -/* FLAG = 6 = ZCOPY_CLOSE THE SCAN PROCESS IS READY OR ABORTED. ALL OPERATION IN THE */ -/* ACTIVE OR WAIT QUEUE ARE RELEASED, SCAN FLAG OF ROOT FRAG IS RESET AND THE SCAN */ -/* RECORD IS RELEASED. */ -/* --------------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- + * FLAG = 6 = ZCOPY_CLOSE THE SCAN PROCESS IS READY OR ABORTED. + * ALL OPERATION IN THE ACTIVE OR WAIT QUEUE ARE RELEASED, + * SCAN FLAG OF ROOT FRAG IS RESET AND THE SCAN RECORD IS RELEASED. + * ------------------------------------------------------------------------ */ void Dbacc::releaseScanLab(Signal* signal) { releaseAndCommitActiveOps(signal); @@ -5715,8 +6616,17 @@ void Dbacc::releaseAndCommitActiveOps(Signal* signal) ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); if (!scanPtr.p->scanReadCommittedFlag) { jam(); - commitOperation(signal); + if ((operationRecPtr.p->m_op_bits & Operationrec::OP_STATE_MASK) == + Operationrec::OP_STATE_EXECUTED) + { + commitOperation(signal); + } + else + { + abortOperation(signal); + } }//if + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; takeOutActiveScanOp(signal); releaseOpRec(signal); scanPtr.p->scanOpsAllocated--; @@ -5737,8 +6647,17 @@ void Dbacc::releaseAndCommitQueuedOps(Signal* signal) ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); if (!scanPtr.p->scanReadCommittedFlag) { jam(); - commitOperation(signal); + if ((operationRecPtr.p->m_op_bits & Operationrec::OP_STATE_MASK) == + Operationrec::OP_STATE_EXECUTED) + { + commitOperation(signal); + } + else + { + abortOperation(signal); + } }//if + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; takeOutReadyScanQueue(signal); releaseOpRec(signal); scanPtr.p->scanOpsAllocated--; @@ -5761,6 +6680,7 @@ void Dbacc::releaseAndAbortLockedOps(Signal* signal) { abortOperation(signal); }//if takeOutScanLockQueue(scanPtr.i); + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; releaseOpRec(signal); scanPtr.p->scanOpsAllocated--; operationRecPtr.i = trsoOperPtr.i; @@ -5795,9 +6715,11 @@ void Dbacc::execACC_CHECK_SCAN(Signal* signal) takeOutReadyScanQueue(signal); fragrecptr.i = operationRecPtr.p->fragptr; ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); - if (operationRecPtr.p->elementIsDisappeared == ZTRUE) { + if (operationRecPtr.p->m_op_bits & Operationrec::OP_ELEMENT_DISAPPEARED) + { jam(); abortOperation(signal); + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; releaseOpRec(signal); scanPtr.p->scanOpsAllocated--; continue; @@ -5884,7 +6806,8 @@ void Dbacc::execACC_TO_REQ(Signal* signal) jamEntry(); tatrOpPtr.i = signal->theData[1]; /* OPER PTR OF ACC */ ptrCheckGuard(tatrOpPtr, coprecsize, operationrec); - if (tatrOpPtr.p->operation == ZSCAN_OP) { + if ((tatrOpPtr.p->m_op_bits & Operationrec::OP_MASK) == ZSCAN_OP) + { tatrOpPtr.p->transId1 = signal->theData[2]; tatrOpPtr.p->transId2 = signal->theData[3]; } else { @@ -5981,29 +6904,28 @@ void Dbacc::initScanOpRec(Signal* signal) scanPtr.p->scanOpsAllocated++; + Uint32 opbits = 0; + opbits |= ZSCAN_OP; + opbits |= scanPtr.p->scanLockMode ? Operationrec::OP_LOCK_MODE : 0; + opbits |= scanPtr.p->scanLockMode ? Operationrec::OP_ACC_LOCK_MODE : 0; + opbits |= scanPtr.p->scanReadCommittedFlag ? + Operationrec::OP_EXECUTED_DIRTY_READ : 0; + opbits |= Operationrec::OP_COMMIT_DELETE_CHECK; operationRecPtr.p->userptr = RNIL; operationRecPtr.p->scanRecPtr = scanPtr.i; - operationRecPtr.p->operation = ZSCAN_OP; - operationRecPtr.p->transactionstate = ACTIVE; - operationRecPtr.p->commitDeleteCheckFlag = ZFALSE; - operationRecPtr.p->lockMode = scanPtr.p->scanLockMode; operationRecPtr.p->fid = fragrecptr.p->myfid; operationRecPtr.p->fragptr = fragrecptr.i; - operationRecPtr.p->elementIsDisappeared = ZFALSE; operationRecPtr.p->nextParallelQue = RNIL; operationRecPtr.p->prevParallelQue = RNIL; operationRecPtr.p->nextSerialQue = RNIL; operationRecPtr.p->prevSerialQue = RNIL; operationRecPtr.p->transId1 = scanPtr.p->scanTrid1; operationRecPtr.p->transId2 = scanPtr.p->scanTrid2; - operationRecPtr.p->lockOwner = ZFALSE; - operationRecPtr.p->dirtyRead = 0; - operationRecPtr.p->nodeType = 0; // Not a stand-by node operationRecPtr.p->elementIsforward = tisoIsforward; operationRecPtr.p->elementContainer = tisoContainerptr; operationRecPtr.p->elementPointer = tisoElementptr; operationRecPtr.p->elementPage = isoPageptr.i; - operationRecPtr.p->isAccLockReq = ZFALSE; + operationRecPtr.p->m_op_bits = opbits; tisoLocalPtr = tisoElementptr + tisoIsforward; guard24 = fragrecptr.p->localkeylen - 1; for (tisoTmp = 0; tisoTmp <= guard24; tisoTmp++) { @@ -6868,14 +7790,12 @@ void Dbacc::releaseOpRec(Signal* signal) } ndbrequire(opInList == false); #endif - ndbrequire(operationRecPtr.p->lockOwner == ZFALSE); + ndbrequire(operationRecPtr.p->m_op_bits == Operationrec::OP_INITIAL); operationRecPtr.p->nextOp = cfreeopRec; cfreeopRec = operationRecPtr.i; /* UPDATE FREE LIST OF OP RECORDS */ operationRecPtr.p->prevOp = RNIL; - operationRecPtr.p->opState = FREE_OP; - operationRecPtr.p->transactionstate = IDLE; - operationRecPtr.p->operation = ZUNDEFINED_OP; + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; }//Dbacc::releaseOpRec() /* --------------------------------------------------------------------------------- */ @@ -7377,8 +8297,8 @@ Dbacc::execDUMP_STATE_ORD(Signal* signal) OperationrecPtr tmpOpPtr; tmpOpPtr.i = recordNo; ptrAss(tmpOpPtr, operationrec); - infoEvent("Dbacc::operationrec[%d]: opState=%d, transid(0x%x, 0x%x)", - tmpOpPtr.i, tmpOpPtr.p->opState, tmpOpPtr.p->transId1, + infoEvent("Dbacc::operationrec[%d]: transid(0x%x, 0x%x)", + tmpOpPtr.i, tmpOpPtr.p->transId1, tmpOpPtr.p->transId2); infoEvent("elementIsforward=%d, elementPage=%d, elementPointer=%d ", tmpOpPtr.p->elementIsforward, tmpOpPtr.p->elementPage, @@ -7386,8 +8306,7 @@ Dbacc::execDUMP_STATE_ORD(Signal* signal) infoEvent("fid=%d, fragptr=%d, hashvaluePart=%d ", tmpOpPtr.p->fid, tmpOpPtr.p->fragptr, tmpOpPtr.p->hashvaluePart); - infoEvent("hashValue=%d, insertDeleteLen=%d", - tmpOpPtr.p->hashValue, tmpOpPtr.p->insertDeleteLen); + infoEvent("hashValue=%d", tmpOpPtr.p->hashValue); infoEvent("nextLockOwnerOp=%d, nextOp=%d, nextParallelQue=%d ", tmpOpPtr.p->nextLockOwnerOp, tmpOpPtr.p->nextOp, tmpOpPtr.p->nextParallelQue); @@ -7398,15 +8317,8 @@ Dbacc::execDUMP_STATE_ORD(Signal* signal) tmpOpPtr.p->prevLockOwnerOp, tmpOpPtr.p->nextParallelQue); infoEvent("prevSerialQue=%d, scanRecPtr=%d", tmpOpPtr.p->prevSerialQue, tmpOpPtr.p->scanRecPtr); - infoEvent("transactionstate=%d, elementIsDisappeared=%d, insertIsDone=%d ", - tmpOpPtr.p->transactionstate, tmpOpPtr.p->elementIsDisappeared, - tmpOpPtr.p->insertIsDone); - infoEvent("lockMode=%d, lockOwner=%d, nodeType=%d ", - tmpOpPtr.p->lockMode, tmpOpPtr.p->lockOwner, - tmpOpPtr.p->nodeType); - infoEvent("operation=%d, opSimple=%d, dirtyRead=%d,scanBits=%d ", - tmpOpPtr.p->operation, tmpOpPtr.p->opSimple, - tmpOpPtr.p->dirtyRead, tmpOpPtr.p->scanBits); + infoEvent("m_op_bits=0x%x, scanBits=%d ", + tmpOpPtr.p->m_op_bits, tmpOpPtr.p->scanBits); return; } diff --git a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp index 407cd8074ab..d228d8ae819 100644 --- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp +++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp @@ -33,8 +33,6 @@ // primary key is stored in TUP #include "../dbtup/Dbtup.hpp" -#include "../dbacc/Dbacc.hpp" - #ifdef DBLQH_C // Constants /* ------------------------------------------------------------------------- */ @@ -2556,8 +2554,19 @@ private: Dbtup* c_tup; Dbacc* c_acc; + + /** + * Read primary key from tup + */ Uint32 readPrimaryKeys(ScanRecord*, TcConnectionrec*, Uint32 * dst); + /** + * Read primary key from operation + */ +public: + Uint32 readPrimaryKeys(Uint32 opPtrI, Uint32 * dst, bool xfrm); +private: + void acckeyconf_tupkeyreq(Signal*, TcConnectionrec*, Fragrecord*, Uint32, Uint32); void acckeyconf_load_diskpage(Signal*,TcConnectionrecPtr,Fragrecord*,Uint32); @@ -2924,6 +2933,11 @@ public: } DLHashTable<ScanRecord> c_scanTakeOverHash; + +#ifdef ERROR_INSERT + inline bool TRACE_OP_CHECK(const TcConnectionrec* regTcPtr); + void TRACE_OP_DUMP(const TcConnectionrec* regTcPtr, const char * pos); +#endif }; inline @@ -2991,10 +3005,19 @@ Dblqh::accminupdate(Signal* signal, Uint32 opId, const Local_key* key) signal->theData[1] = key->m_page_no << MAX_TUPLES_BITS | key->m_page_idx; c_acc->execACCMINUPDATE(signal); - if (ERROR_INSERTED(5712)) + if (ERROR_INSERTED(5712) || ERROR_INSERTED(5713)) ndbout << " LK: " << *key; regTcPtr.p->m_row_id = *key; } +inline +bool +Dblqh::TRACE_OP_CHECK(const TcConnectionrec* regTcPtr) +{ + return (ERROR_INSERTED(5712) && + (regTcPtr->operation == ZINSERT || + regTcPtr->operation == ZDELETE)) || + ERROR_INSERTED(5713); +} #endif diff --git a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index d031f9a00bf..18e5e6cd585 100644 --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -67,48 +67,70 @@ // seen only when we debug the product #ifdef VM_TRACE #define DEBUG(x) ndbout << "DBLQH: "<< x << endl; +static NdbOut & operator<<(NdbOut& out, Dblqh::TcConnectionrec::TransactionState state){ out << (int)state; return out; } +static NdbOut & operator<<(NdbOut& out, Dblqh::TcConnectionrec::LogWriteState state){ out << (int)state; return out; } +static NdbOut & operator<<(NdbOut& out, Dblqh::TcConnectionrec::ListState state){ out << (int)state; return out; } +static NdbOut & operator<<(NdbOut& out, Dblqh::TcConnectionrec::AbortState state){ out << (int)state; return out; } +static NdbOut & operator<<(NdbOut& out, Dblqh::ScanRecord::ScanState state){ out << (int)state; return out; } +static NdbOut & operator<<(NdbOut& out, Dblqh::LogFileOperationRecord::LfoState state){ out << (int)state; return out; } +static NdbOut & operator<<(NdbOut& out, Dblqh::ScanRecord::ScanType state){ out << (int)state; return out; } +static +NdbOut & +operator<<(NdbOut& out, Operation_t op) +{ + switch(op){ + case ZREAD: out << "READ"; break; + case ZREAD_EX: out << "READ-EX"; break; + case ZINSERT: out << "INSERT"; break; + case ZUPDATE: out << "UPDATE"; break; + case ZDELETE: out << "DELETE"; break; + case ZWRITE: out << "WRITE"; break; + } + return out; +} + #else #define DEBUG(x) #endif @@ -120,7 +142,7 @@ const Uint32 NR_ScanNo = 0; #if defined VM_TRACE || defined ERROR_INSERT || defined NDBD_TRACENR #include <NdbConfig.h> -NdbOut * tracenrout = 0; +static NdbOut * tracenrout = 0; static int TRACENR_FLAG = 0; #define TRACENR(x) (* tracenrout) << x #define SET_TRACENR_FLAG TRACENR_FLAG = 1 @@ -132,6 +154,13 @@ static int TRACENR_FLAG = 0; #define CLEAR_TRACENR_FLAG #endif +#ifdef ERROR_INSERT +static NdbOut * traceopout = 0; +#define TRACE_OP(regTcPtr, place) do { if (TRACE_OP_CHECK(regTcPtr)) TRACE_OP_DUMP(regTcPtr, place); } while(0) +#else +#define TRACE_OP(x) {} +#endif + /* ------------------------------------------------------------------------- */ /* ------- SEND SYSTEM ERROR ------- */ /* */ @@ -454,6 +483,10 @@ void Dblqh::execSTTOR(Signal* signal) name = NdbConfig_SignalLogFileName(getOwnNodeId()); tracenrout = new NdbOut(* new FileOutputStream(fopen(name, "w+"))); #endif + +#ifdef ERROR_INSERT + traceopout = &ndbout; +#endif return; break; @@ -2531,14 +2564,15 @@ void Dblqh::execTUPKEYCONF(Signal* signal) case TcConnectionrec::WAIT_ACC_ABORT: case TcConnectionrec::ABORT_QUEUED: jam(); -/* -------------------------------------------------------------------------- */ -/* IGNORE SINCE ABORT OF THIS OPERATION IS ONGOING ALREADY. */ -/* -------------------------------------------------------------------------- */ +/* ------------------------------------------------------------------------- */ +/* IGNORE SINCE ABORT OF THIS OPERATION IS ONGOING ALREADY. */ +/* ------------------------------------------------------------------------- */ break; default: ndbrequire(false); break; }//switch + }//Dblqh::execTUPKEYCONF() /* ************> */ @@ -2560,6 +2594,8 @@ void Dblqh::execTUPKEYREF(Signal* signal) c_fragment_pool.getPtr(regFragptr); fragptr = regFragptr; + TRACE_OP(regTcPtr, "TUPKEYREF"); + if (unlikely(activeCreat == Fragrecord::AC_NR_COPY)) { jam(); @@ -2568,7 +2604,12 @@ void Dblqh::execTUPKEYREF(Signal* signal) ndbassert(regTcPtr->transactionState == TcConnectionrec::WAIT_TUP || regTcPtr->transactionState ==TcConnectionrec::WAIT_TUP_TO_ABORT); } - + else if (getNodeState().startLevel == NodeState::SL_STARTED) + { + if (terrorCode == 899) + ndbout << "899: " << regTcPtr->m_row_id << endl; + } + switch (tcConnectptr.p->transactionState) { case TcConnectionrec::WAIT_TUP: jam(); @@ -3606,57 +3647,7 @@ void Dblqh::endgettupkeyLab(Signal* signal) regTcPtr->transactionState = TcConnectionrec::WAIT_ATTR; return; }//if -//#define TRACE_LQHKEYREQ -#ifdef TRACE_LQHKEYREQ - { - ndbout << (regTcPtr->operation == ZREAD ? "READ" : - regTcPtr->operation == ZUPDATE ? "UPDATE" : - regTcPtr->operation == ZINSERT ? "INSERT" : - regTcPtr->operation == ZDELETE ? "DELETE" : "<Other>") - << "(" << (int)regTcPtr->operation << ")" - << " from=(" << getBlockName(refToBlock(regTcPtr->clientBlockref)) - << ", " << refToNode(regTcPtr->clientBlockref) << ")" - << " table=" << regTcPtr->tableref << " "; - - ndbout << "hash: " << hex << regTcPtr->hashValue << endl; - - ndbout << "key=[" << hex; - Uint32 i; - for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){ - ndbout << hex << regTcPtr->tupkeyData[i] << " "; - } - - DatabufPtr regDatabufptr; - regDatabufptr.i = regTcPtr->firstTupkeybuf; - while(i < regTcPtr->primKeyLen) - { - ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf); - for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++) - ndbout << hex << regDatabufptr.p->data[j] << " "; - } - ndbout << "]" << endl; - - ndbout << "attr=[" << hex; - for(i = 0; i<regTcPtr->reclenAiLqhkey && i < 5; i++) - ndbout << hex << regTcPtr->firstAttrinfo[i] << " "; - - AttrbufPtr regAttrinbufptr; - regAttrinbufptr.i= regTcPtr->firstAttrinbuf; - while(i < regTcPtr->totReclenAi) - { - ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf); - Uint32 dataLen = regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN]; - ndbrequire(dataLen != 0); - ndbrequire(i + dataLen <= regTcPtr->totReclenAi); - for(Uint32 j= 0; j<dataLen; j++, i++) - ndbout << hex << regAttrinbufptr.p->attrbuf[j] << " "; - - regAttrinbufptr.i = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT]; - } - - ndbout << "]" << endl; - } -#endif + /* ---------------------------------------------------------------------- */ /* NOW RECEPTION OF LQHKEYREQ IS COMPLETED THE NEXT STEP IS TO START*/ /* PROCESSING THE MESSAGE. IF THE MESSAGE IS TO A STAND-BY NODE */ @@ -3763,6 +3754,7 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal) /* ----------------------------------------------------------------- */ if (TRACENR_FLAG) { + TRACE_OP(regTcPtr, "RECEIVED"); switch (regTcPtr->operation) { case ZREAD: TRACENR("READ"); break; case ZUPDATE: TRACENR("UPDATE"); break; @@ -3847,6 +3839,9 @@ Dblqh::exec_acckeyreq(Signal* signal, TcConnectionrecPtr regTcPtr) signal->theData[8] = sig2; signal->theData[9] = sig3; signal->theData[10] = sig4; + + TRACE_OP(regTcPtr.p, "ACC"); + if (regTcPtr.p->primKeyLen > 4) { sendKeyinfoAcc(signal, 11); }//if @@ -4133,7 +4128,7 @@ Dblqh::nr_copy_delete_row(Signal* signal, jam(); ndbrequire(rowid == 0); signal->theData[0] = accPtr; - signal->theData[1] = false; + signal->theData[1] = 0; EXECUTE_DIRECT(ref, GSN_ACC_ABORTREQ, signal, 2); jamEntry(); return; @@ -4144,16 +4139,18 @@ Dblqh::nr_copy_delete_row(Signal* signal, */ ndbrequire(regTcPtr.p->m_dealloc == 0); Local_key save = regTcPtr.p->m_row_id; - signal->theData[0] = regTcPtr.p->accConnectrec; + + c_acc->execACCKEY_ORD(signal, accPtr); + signal->theData[0] = accPtr; EXECUTE_DIRECT(ref, GSN_ACC_COMMITREQ, signal, 1); jamEntry(); - + ndbrequire(regTcPtr.p->m_dealloc == 1); int ret = c_tup->nr_delete(signal, regTcPtr.i, fragPtr.p->tupFragptr, ®TcPtr.p->m_row_id, regTcPtr.p->gci); jamEntry(); - + if (ret) { ndbassert(ret == 1); @@ -4167,7 +4164,7 @@ Dblqh::nr_copy_delete_row(Signal* signal, } TRACENR("DELETED: " << regTcPtr.p->m_row_id << endl); - + regTcPtr.p->m_dealloc = 0; regTcPtr.p->m_row_id = save; fragptr = fragPtr; @@ -4274,6 +4271,45 @@ Dblqh::nr_delete_complete(Signal* signal, Nr_op_info* op) } } +Uint32 +Dblqh::readPrimaryKeys(Uint32 opPtrI, Uint32 * dst, bool xfrm) +{ + TcConnectionrecPtr regTcPtr; + DatabufPtr regDatabufptr; + Uint64 Tmp[MAX_KEY_SIZE_IN_WORDS >> 1]; + + jamEntry(); + regTcPtr.i = opPtrI; + ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec); + + Uint32 tableId = regTcPtr.p->tableref; + Uint32 keyLen = regTcPtr.p->primKeyLen; + regDatabufptr.i = regTcPtr.p->firstTupkeybuf; + Uint32 * tmp = xfrm ? (Uint32*)Tmp : dst; + + memcpy(tmp, regTcPtr.p->tupkeyData, sizeof(regTcPtr.p->tupkeyData)); + if (keyLen > 4) + { + tmp += 4; + Uint32 pos = 4; + do { + ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf); + memcpy(tmp, regDatabufptr.p->data, sizeof(regDatabufptr.p->data)); + regDatabufptr.i = regDatabufptr.p->nextDatabuf; + tmp += sizeof(regDatabufptr.p->data) >> 2; + pos += sizeof(regDatabufptr.p->data) >> 2; + } while(pos < keyLen); + } + + if (xfrm) + { + jam(); + Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]; + return xfrm_key(tableId, (Uint32*)Tmp, dst, ~0, keyPartLen); + } + + return keyLen; +} /* =*======================================================================= */ /* ======= SEND KEYINFO TO ACC ======= */ @@ -4447,10 +4483,6 @@ Dblqh::acckeyconf_tupkeyreq(Signal* signal, TcConnectionrec* regTcPtr, * ----------------------------------------------------------------------- */ Uint32 page_idx = local_key & MAX_TUPLES_PER_PAGE; Uint32 page_no = local_key >> MAX_TUPLES_BITS; -#ifdef TRACE_LQHKEYREQ - ndbout << "localkey: [ " << hex << page_no << " " << page_idx << "]" - << endl; -#endif Uint32 Ttupreq = regTcPtr->dirtyOp; Ttupreq = Ttupreq + (regTcPtr->opSimple << 1); Ttupreq = Ttupreq + (op << 6); @@ -4506,70 +4538,13 @@ Dblqh::acckeyconf_tupkeyreq(Signal* signal, TcConnectionrec* regTcPtr, tupKeyReq->m_row_id_page_no = sig0; tupKeyReq->m_row_id_page_idx = sig1; - if (ERROR_INSERTED(5712) && regTcPtr->operation == ZINSERT) - { - ndbout << "INSERT " << regFragptrP->tabRef - << "(" << regFragptrP->fragId << ")"; - - { - ndbout << "key=[" << hex; - Uint32 i; - for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){ - ndbout << hex << regTcPtr->tupkeyData[i] << " "; - } - - DatabufPtr regDatabufptr; - regDatabufptr.i = regTcPtr->firstTupkeybuf; - while(i < regTcPtr->primKeyLen) - { - ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf); - for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++) - ndbout << hex << regDatabufptr.p->data[j] << " "; - } - ndbout << "] "; - } - - if(regTcPtr->m_use_rowid) - ndbout << " " << regTcPtr->m_row_id; - } - - if (ERROR_INSERTED(5712) && regTcPtr->operation == ZDELETE) - { - Local_key lk; lk.assref(local_key); - - ndbout << "DELETE " << regFragptrP->tabRef - << "(" << regFragptrP->fragId << ") " << lk; - - { - ndbout << "key=[" << hex; - Uint32 i; - for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){ - ndbout << hex << regTcPtr->tupkeyData[i] << " "; - } - - DatabufPtr regDatabufptr; - regDatabufptr.i = regTcPtr->firstTupkeybuf; - while(i < regTcPtr->primKeyLen) - { - ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf); - for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++) - ndbout << hex << regDatabufptr.p->data[j] << " "; - } - ndbout << "]" << endl; - } - - } - + TRACE_OP(regTcPtr, "TUPKEYREQ"); + regTcPtr->m_use_rowid |= (op == ZINSERT); regTcPtr->m_row_id.m_page_no = page_no; regTcPtr->m_row_id.m_page_idx = page_idx; EXECUTE_DIRECT(tup, GSN_TUPKEYREQ, signal, TupKeyReq::SignalLength); - - if (ERROR_INSERTED(5712) && regTcPtr->operation == ZINSERT) - { - ndbout << endl; - } }//Dblqh::execACCKEYCONF() void @@ -4654,27 +4629,37 @@ void Dblqh::tupkeyConfLab(Signal* signal) const TupKeyConf * const tupKeyConf = (TupKeyConf *)&signal->theData[0]; TcConnectionrec * const regTcPtr = tcConnectptr.p; Uint32 activeCreat = regTcPtr->activeCreat; + Uint32 readLen = tupKeyConf->readLength; + Uint32 writeLen = tupKeyConf->writeLength; + Uint32 accOp = regTcPtr->accConnectrec; + c_acc->execACCKEY_ORD(signal, accOp); + + TRACE_OP(regTcPtr, "TUPKEYCONF"); + if (regTcPtr->simpleRead) { jam(); /* ---------------------------------------------------------------------- - * THE OPERATION IS A SIMPLE READ. WE WILL IMMEDIATELY COMMIT THE OPERATION. - * SINCE WE HAVE NOT RELEASED THE FRAGMENT LOCK (FOR LOCAL CHECKPOINTS) YET + * THE OPERATION IS A SIMPLE READ. + * WE WILL IMMEDIATELY COMMIT THE OPERATION. + * SINCE WE HAVE NOT RELEASED THE FRAGMENT LOCK + * (FOR LOCAL CHECKPOINTS) YET * WE CAN GO IMMEDIATELY TO COMMIT_CONTINUE_AFTER_BLOCKED. - * WE HAVE ALREADY SENT THE RESPONSE SO WE ARE NOT INTERESTED IN READ LENGTH - * ---------------------------------------------------------------------- */ + * WE HAVE ALREADY SENT THE RESPONSE SO WE ARE NOT INTERESTED IN + * READ LENGTH + * --------------------------------------------------------------------- */ commitContinueAfterBlockedLab(signal); return; }//if - if (tupKeyConf->readLength != 0) { + if (readLen != 0) + { jam(); /* SET BIT 15 IN REQINFO */ LqhKeyReq::setApplicationAddressFlag(regTcPtr->reqinfo, 1); - - regTcPtr->readlenAi = tupKeyConf->readLength; + regTcPtr->readlenAi = readLen; }//if - regTcPtr->totSendlenAi = tupKeyConf->writeLength; + regTcPtr->totSendlenAi = writeLen; ndbrequire(regTcPtr->totSendlenAi == regTcPtr->currTupAiLen); if (unlikely(activeCreat == Fragrecord::AC_NR_COPY)) @@ -5597,6 +5582,8 @@ void Dblqh::releaseOprec(Signal* signal) if (TRACENR_FLAG) TRACENR("DELETED: " << regTcPtr->m_row_id << endl); + + TRACE_OP(regTcPtr, "DEALLOC"); signal->theData[0] = regTcPtr->fragmentid; signal->theData[1] = regTcPtr->tableref; @@ -5818,6 +5805,10 @@ void Dblqh::execCOMMIT(Signal* signal) ptrAss(tcConnectptr, regTcConnectionrec); if ((tcConnectptr.p->transid[0] == transid1) && (tcConnectptr.p->transid[1] == transid2)) { + + TcConnectionrec * const regTcPtr = tcConnectptr.p; + TRACE_OP(regTcPtr, "COMMIT"); + commitReqLab(signal, gci); return; }//if @@ -5937,6 +5928,10 @@ void Dblqh::execCOMPLETE(Signal* signal) if ((tcConnectptr.p->transactionState == TcConnectionrec::COMMITTED) && (tcConnectptr.p->transid[0] == transid1) && (tcConnectptr.p->transid[1] == transid2)) { + + TcConnectionrec * const regTcPtr = tcConnectptr.p; + TRACE_OP(regTcPtr, "COMPLETE"); + if (tcConnectptr.p->seqNoReplica != 0 && tcConnectptr.p->activeCreat == Fragrecord::AC_NORMAL) { jam(); @@ -6313,12 +6308,16 @@ void Dblqh::commitContinueAfterBlockedLab(Signal* signal) TRACENR(endl); } + TRACE_OP(regTcPtr.p, "ACC_COMMITREQ"); + Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref); signal->theData[0] = regTcPtr.p->accConnectrec; EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1); } else { if(!dirtyOp){ + TRACE_OP(regTcPtr.p, "ACC_COMMITREQ"); + Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref); signal->theData[0] = regTcPtr.p->accConnectrec; EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1); @@ -6362,6 +6361,8 @@ Dblqh::tupcommit_conf_callback(Signal* signal, Uint32 tcPtrI) c_fragment_pool.getPtr(regFragptr); fragptr = regFragptr; + TRACE_OP(tcPtr, "ACC_COMMITREQ"); + Uint32 acc = refToBlock(tcPtr->tcAccBlockref); signal->theData[0] = tcPtr->accConnectrec; EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1); @@ -6670,6 +6671,8 @@ void Dblqh::execABORT(Signal* signal) regTcPtr->commitAckMarker = RNIL; } + TRACE_OP(regTcPtr, "ABORT"); + abortStateHandlerLab(signal); return; @@ -7087,23 +7090,30 @@ void Dblqh::abortContinueAfterBlockedLab(Signal* signal, bool canBlock) * ALSO AS PART OF A NORMAL ABORT WITHOUT BLOCKING. * WE MUST ABORT TUP BEFORE ACC TO ENSURE THAT NO ONE RACES IN * AND SEES A STATE IN TUP. - * ------------------------------------------------------------------------ */ + * ----------------------------------------------------------------------- */ TcConnectionrec * const regTcPtr = tcConnectptr.p; - fragptr.i = regTcPtr->fragmentptr; - c_fragment_pool.getPtr(fragptr); - signal->theData[0] = regTcPtr->tupConnectrec; - EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1); + + TRACE_OP(regTcPtr, "ACC ABORT"); + regTcPtr->transactionState = TcConnectionrec::WAIT_ACC_ABORT; signal->theData[0] = regTcPtr->accConnectrec; - signal->theData[1] = true; + signal->theData[1] = 2; // JOB BUFFER IF NEEDED EXECUTE_DIRECT(DBACC, GSN_ACC_ABORTREQ, signal, 2); - /* ------------------------------------------------------------------------ - * We need to insert a real-time break by sending ACC_ABORTCONF through the - * job buffer to ensure that we catch any ACCKEYCONF or TUPKEYCONF or - * TUPKEYREF that are in the job buffer but not yet processed. Doing - * everything without that would race and create a state error when they - * are executed. - * ----------------------------------------------------------------------- */ + + if (signal->theData[1] == RNIL) + { + jam(); + /* ------------------------------------------------------------------------ + * We need to insert a real-time break by sending ACC_ABORTCONF through the + * job buffer to ensure that we catch any ACCKEYCONF or TUPKEYCONF or + * TUPKEYREF that are in the job buffer but not yet processed. Doing + * everything without that would race and create a state error when they + * are executed. + * --------------------------------------------------------------------- */ + return; + } + + execACC_ABORTCONF(signal); return; }//Dblqh::abortContinueAfterBlockedLab() @@ -7117,6 +7127,11 @@ void Dblqh::execACC_ABORTCONF(Signal* signal) ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec); TcConnectionrec * const regTcPtr = tcConnectptr.p; ndbrequire(regTcPtr->transactionState == TcConnectionrec::WAIT_ACC_ABORT); + + TRACE_OP(regTcPtr, "ACC_ABORTCONF"); + signal->theData[0] = regTcPtr->tupConnectrec; + EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1); + continueAbortLab(signal); return; }//Dblqh::execACC_ABORTCONF() @@ -8837,7 +8852,9 @@ void Dblqh::nextScanConfScanLab(Signal* signal) }//if // If accOperationPtr == RNIL no record was returned by ACC - if (nextScanConf->accOperationPtr == RNIL) { + Uint32 accOpPtr = nextScanConf->accOperationPtr; + if (accOpPtr == RNIL) + { jam(); /************************************************************* * STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED. @@ -8871,7 +8888,8 @@ void Dblqh::nextScanConfScanLab(Signal* signal) jam(); set_acc_ptr_in_scan_record(scanptr.p, scanptr.p->m_curr_batch_size_rows, - nextScanConf->accOperationPtr); + accOpPtr); + jam(); nextScanConfLoopLab(signal); }//Dblqh::nextScanConfScanLab() @@ -9071,12 +9089,24 @@ void Dblqh::scanTupkeyConfLab(Signal* signal) tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED; scanptr.i = tcConnectptr.p->tcScanRec; c_scanRecordPool.getPtr(scanptr); + + Uint32 rows = scanptr.p->m_curr_batch_size_rows; + Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, rows, false); + if (accOpPtr != (Uint32)-1) + { + c_acc->execACCKEY_ORD(signal, accOpPtr); + } + else + { + ndbassert(refToBlock(scanptr.p->scanBlockref) != DBACC); + } + if (scanptr.p->scanCompletedStatus == ZTRUE) { /* --------------------------------------------------------------------- * STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED. * --------------------------------------------------------------------- */ - if ((scanptr.p->scanLockHold == ZTRUE) && - (scanptr.p->m_curr_batch_size_rows > 0)) { + if ((scanptr.p->scanLockHold == ZTRUE) && rows) + { jam(); scanptr.p->scanReleaseCounter = 1; scanReleaseLocksLab(signal); @@ -9093,7 +9123,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal) }//if ndbrequire(scanptr.p->m_curr_batch_size_rows < MAX_PARALLEL_OP_PER_SCAN); scanptr.p->m_curr_batch_size_bytes+= tdata4; - scanptr.p->m_curr_batch_size_rows++; + scanptr.p->m_curr_batch_size_rows = rows + 1; scanptr.p->m_last_row = tdata5; if (scanptr.p->check_scan_batch_completed() | tdata5){ if (scanptr.p->scanLockHold == ZTRUE) { @@ -9103,7 +9133,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal) return; } else { jam(); - scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows; + scanptr.p->scanReleaseCounter = rows + 1; scanReleaseLocksLab(signal); return; } @@ -9187,12 +9217,24 @@ void Dblqh::scanTupkeyRefLab(Signal* signal) tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED; scanptr.i = tcConnectptr.p->tcScanRec; c_scanRecordPool.getPtr(scanptr); + + Uint32 rows = scanptr.p->m_curr_batch_size_rows; + Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, rows, false); + if (accOpPtr != (Uint32)-1) + { + c_acc->execACCKEY_ORD(signal, accOpPtr); + } + else + { + ndbassert(refToBlock(scanptr.p->scanBlockref) != DBACC); + } + if (scanptr.p->scanCompletedStatus == ZTRUE) { /* --------------------------------------------------------------------- * STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED. * --------------------------------------------------------------------- */ - if ((scanptr.p->scanLockHold == ZTRUE) && - (scanptr.p->m_curr_batch_size_rows > 0)) { + if ((scanptr.p->scanLockHold == ZTRUE) && rows) + { jam(); scanptr.p->scanReleaseCounter = 1; scanReleaseLocksLab(signal); @@ -9213,8 +9255,8 @@ void Dblqh::scanTupkeyRefLab(Signal* signal) scanptr.p->scanReleaseCounter = 1; } else { jam(); - scanptr.p->m_curr_batch_size_rows++; - scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows; + scanptr.p->m_curr_batch_size_rows = rows + 1; + scanptr.p->scanReleaseCounter = rows + 1; }//if /* -------------------------------------------------------------------- * WE NEED TO RELEASE ALL LOCKS CURRENTLY @@ -9224,7 +9266,7 @@ void Dblqh::scanTupkeyRefLab(Signal* signal) return; }//if Uint32 time_passed= tcConnectptr.p->tcTimer - cLqhTimeOutCount; - if (scanptr.p->m_curr_batch_size_rows > 0) { + if (rows) { if (time_passed > 1) { /* ----------------------------------------------------------------------- * WE NEED TO ENSURE THAT WE DO NOT SEARCH FOR THE NEXT TUPLE FOR A @@ -9232,7 +9274,7 @@ void Dblqh::scanTupkeyRefLab(Signal* signal) * THE FOUND TUPLE IF FOUND TUPLES ARE RARE. If more than 10 ms passed we * send the found tuples to the API. * ----------------------------------------------------------------------- */ - scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows + 1; + scanptr.p->scanReleaseCounter = rows + 1; scanReleaseLocksLab(signal); return; } @@ -9872,6 +9914,8 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal) fragptr.p->m_scanNumberMask.clear(NR_ScanNo); scanptr.p->scanBlockref = DBTUP_REF; scanptr.p->scanLockHold = ZFALSE; + scanptr.p->m_curr_batch_size_rows = 0; + scanptr.p->m_curr_batch_size_bytes= 0; initScanTc(0, 0, @@ -10074,7 +10118,7 @@ void Dblqh::nextScanConfCopyLab(Signal* signal) initCopyTc(signal, ZDELETE); set_acc_ptr_in_scan_record(scanptr.p, 0, RNIL); tcConP->gci = nextScanConf->gci; - + tcConP->primKeyLen = 0; tcConP->totSendlenAi = 0; tcConP->connectState = TcConnectionrec::COPY_CONNECTED; @@ -10197,6 +10241,12 @@ void Dblqh::copyTupkeyConfLab(Signal* signal) scanptr.i = tcConnectptr.p->tcScanRec; c_scanRecordPool.getPtr(scanptr); ScanRecord* scanP = scanptr.p; + + Uint32 rows = scanP->m_curr_batch_size_rows; + Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanP, rows, false); + ndbassert(accOpPtr != (Uint32)-1); + c_acc->execACCKEY_ORD(signal, accOpPtr); + if (tcConnectptr.p->errorCode != 0) { jam(); closeCopyLab(signal); @@ -18538,6 +18588,21 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal) } ndbrequire(arg != 2308); } + +#ifdef ERROR_INSERT + if (arg == 5712 || arg == 5713) + { + if (arg == 5712) + { + traceopout = &ndbout; + } + else if (arg == 5713) + { + traceopout = tracenrout; + } + SET_ERROR_INSERT_VALUE(arg); + } +#endif }//Dblqh::execDUMP_STATE_ORD() @@ -18702,3 +18767,39 @@ void Dblqh::writeDbgInfoPageHeader(LogPageRecordPtr logP, Uint32 place, logP.p->logPageWord[ZPOS_IN_WRITING]= 1; } +#if defined ERROR_INSERT +void +Dblqh::TRACE_OP_DUMP(const Dblqh::TcConnectionrec* regTcPtr, const char * pos) +{ + (* traceopout) + << "[ " << hex << regTcPtr->transid[0] + << " " << hex << regTcPtr->transid[1] << " ] " << dec + << pos + << " " << (Operation_t)regTcPtr->operation + << " " << regTcPtr->tableref + << "(" << regTcPtr->fragmentid << ")" + << "(" << (regTcPtr->seqNoReplica == 0 ? "P" : "B") << ")" ; + + { + (* traceopout) << "key=[" << hex; + Uint32 i; + for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){ + (* traceopout) << hex << regTcPtr->tupkeyData[i] << " "; + } + + DatabufPtr regDatabufptr; + regDatabufptr.i = regTcPtr->firstTupkeybuf; + while(i < regTcPtr->primKeyLen) + { + ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf); + for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++) + (* traceopout) << hex << regDatabufptr.p->data[j] << " "; + } + (* traceopout) << "] "; + } + + if (regTcPtr->m_use_rowid) + (* traceopout) << " " << regTcPtr->m_row_id; + (* traceopout) << endl; +} +#endif diff --git a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp index b319932aec7..4ff6e069963 100644 --- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp +++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp @@ -297,7 +297,6 @@ enum TransState { }; enum TupleState { - TUPLE_INITIAL_INSERT = 0, TUPLE_PREPARED = 1, TUPLE_ALREADY_ABORTED = 2, TUPLE_TO_BE_COMMITTED = 3 @@ -305,7 +304,6 @@ enum TupleState { enum State { NOT_INITIALIZED = 0, - COMMON_AREA_PAGES = 1, IDLE = 17, ACTIVE = 18, SYSTEM_RESTART = 19, @@ -1441,7 +1439,6 @@ private: void execSET_VAR_REQ(Signal* signal); void execDROP_TAB_REQ(Signal* signal); void execALTER_TAB_REQ(Signal* signal); - void execTUP_ALLOCREQ(Signal* signal); void execTUP_DEALLOCREQ(Signal* signal); void execTUP_WRITELOG_REQ(Signal* signal); diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp index 90abe2cb809..d45c5f9d988 100644 --- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp @@ -202,29 +202,6 @@ Dbtup::receive_attrinfo(Signal* signal, Uint32 op, } } -void Dbtup::execTUP_ALLOCREQ(Signal* signal) -{ - OperationrecPtr regOperPtr; - - jamEntry(); - - regOperPtr.i= signal->theData[0]; - c_operation_pool.getPtr(regOperPtr); - - regOperPtr.p->op_struct.tuple_state= TUPLE_INITIAL_INSERT; - //ndbout_c("execTUP_ALLOCREQ"); - - signal->theData[0]= 0; - signal->theData[1]= ~0 >> MAX_TUPLES_BITS; - signal->theData[2]= (1 << MAX_TUPLES_BITS) - 1; - return; - -mem_error: - jam(); - signal->theData[0]= ZMEM_NOMEM_ERROR; - return; -} - void Dbtup::setChecksum(Tuple_header* tuple_ptr, Tablerec* regTabPtr) @@ -455,13 +432,13 @@ Dbtup::load_diskpage(Signal* signal, ptrCheckGuard(tabptr, cnoOfTablerec, tablerec); Tablerec* regTabPtr = tabptr.p; - if(regOperPtr->op_struct.tuple_state == TUPLE_INITIAL_INSERT) + if(local_key == ~(Uint32)0) { jam(); regOperPtr->op_struct.m_wait_log_buffer= 1; regOperPtr->op_struct.m_load_diskpage_on_commit= 1; return 1; - } + } jam(); Uint32 page_idx= local_key & MAX_TUPLES_PER_PAGE; @@ -663,7 +640,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal) regOperPtr->savepointId= sig1; regOperPtr->op_struct.primary_replica= sig2; - regOperPtr->m_tuple_location.m_page_idx= sig3; + Uint32 pageidx = regOperPtr->m_tuple_location.m_page_idx= sig3; sig1= tupKeyReq->opRef; sig2= tupKeyReq->tcOpIndex; @@ -673,7 +650,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal) req_struct.tc_operation_ptr= sig1; req_struct.TC_index= sig2; req_struct.TC_ref= sig3; - req_struct.frag_page_id= sig4; + Uint32 pageid = req_struct.frag_page_id= sig4; req_struct.m_use_rowid = (TrequestInfo >> 11) & 1; sig1= tupKeyReq->attrBufLen; @@ -706,7 +683,8 @@ void Dbtup::execTUPKEYREQ(Signal* signal) copyAttrinfo(regOperPtr, &cinBuffer[0]); - if(Roptype == ZINSERT && get_tuple_state(regOperPtr)== TUPLE_INITIAL_INSERT) + Uint32 localkey = (pageid << MAX_TUPLES_BITS) + pageidx; + if(Roptype == ZINSERT && localkey == ~0) { // No tuple allocatated yet goto do_insert; @@ -1159,49 +1137,6 @@ Dbtup::prepare_initial_insert(KeyReqStruct *req_struct, disk_undo ? (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE) : 0; } -void -Dbtup::fix_disk_insert_no_mem_insert(KeyReqStruct *req_struct, - Operationrec* regOperPtr, - Tablerec* regTabPtr) -{ - regOperPtr->m_undo_buffer_space= sizeof(Dbtup::Disk_undo::Alloc); - req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD); - - const Uint32 cnt1= regTabPtr->m_attributes[MM].m_no_of_varsize; - const Uint32 cnt2= regTabPtr->m_attributes[DD].m_no_of_varsize; - Uint32 *ptr= req_struct->m_tuple_ptr->get_var_part_ptr(regTabPtr); - - if(cnt1) - { - // Disk part is 32-bit aligned - char *varptr = req_struct->m_var_data[MM].m_data_ptr; - ptr= ALIGN_WORD(varptr + regTabPtr->m_offsets[MM].m_max_var_offset); - } - else - { - ptr -= Tuple_header::HeaderSize; - } - - req_struct->m_disk_ptr= (Tuple_header*)ptr; - - if(cnt2) - { - KeyReqStruct::Var_data *dst= &req_struct->m_var_data[DD]; - ptr=((Tuple_header*)ptr)->m_data+regTabPtr->m_offsets[DD].m_varpart_offset; - dst->m_data_ptr= (char*)(((Uint16*)ptr)+cnt2+1); - dst->m_offset_array_ptr= req_struct->var_pos_array + (cnt1 << 1); - dst->m_var_len_offset= cnt2; - dst->m_max_var_offset= regTabPtr->m_offsets[DD].m_max_var_offset; - } - - // Set all null bits - memset(req_struct->m_disk_ptr->m_null_bits+ - regTabPtr->m_offsets[DD].m_null_offset, 0xFF, - 4*regTabPtr->m_offsets[DD].m_null_words); - req_struct->m_tuple_ptr->m_header_bits = - (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE); -} - int Dbtup::handleInsertReq(Signal* signal, Ptr<Operationrec> regOperPtr, Ptr<Fragrecord> fragPtr, @@ -1215,8 +1150,8 @@ int Dbtup::handleInsertReq(Signal* signal, Tuple_header *tuple_ptr; bool disk = regTabPtr->m_no_of_disk_attributes > 0; - bool mem_insert = get_tuple_state(regOperPtr.p) == TUPLE_INITIAL_INSERT; - bool disk_insert = regOperPtr.p->is_first_operation() && disk; + bool mem_insert = regOperPtr.p->is_first_operation(); + bool disk_insert = mem_insert && disk; bool varsize = regTabPtr->m_attributes[MM].m_no_of_varsize; bool rowid = req_struct->m_use_rowid; Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no; @@ -1244,21 +1179,16 @@ int Dbtup::handleInsertReq(Signal* signal, if(mem_insert) { jam(); - ndbassert(regOperPtr.p->is_first_operation()); // disk insert prepare_initial_insert(req_struct, regOperPtr.p, regTabPtr); } else { - if (!regOperPtr.p->is_first_operation()) - { - Operationrec* prevOp= req_struct->prevOpPtr.p; - ndbassert(prevOp->op_struct.op_type == ZDELETE); - tup_version= prevOp->tupVersion + 1; - - if(!prevOp->is_first_operation()) - org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location); - } - + Operationrec* prevOp= req_struct->prevOpPtr.p; + ndbassert(prevOp->op_struct.op_type == ZDELETE); + tup_version= prevOp->tupVersion + 1; + + if(!prevOp->is_first_operation()) + org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location); if (regTabPtr->need_expand()) expand_tuple(req_struct, sizes, org, regTabPtr, !disk_insert); else @@ -1268,11 +1198,6 @@ int Dbtup::handleInsertReq(Signal* signal, if (disk_insert) { int res; - if (unlikely(!mem_insert)) - { - sizes[DD] = sizes[DD+2] = regTabPtr->m_offsets[DD].m_fix_header_size; - fix_disk_insert_no_mem_insert(req_struct, regOperPtr.p, regTabPtr); - } if (ERROR_INSERTED(4015)) { @@ -1381,6 +1306,7 @@ int Dbtup::handleInsertReq(Signal* signal, } if (unlikely(ptr == 0)) { + jam(); goto alloc_rowid_error; } } @@ -1396,7 +1322,7 @@ int Dbtup::handleInsertReq(Signal* signal, (varsize ? Tuple_header::CHAINED_ROW : 0); regOperPtr.p->m_tuple_location.m_page_no = real_page_id; } - else if(!rowid || !regOperPtr.p->is_first_operation()) + else { int ret; if (ERROR_INSERTED(4020)) @@ -1417,20 +1343,6 @@ int Dbtup::handleInsertReq(Signal* signal, req_struct->m_use_rowid = false; base->m_header_bits &= ~(Uint32)Tuple_header::FREE; } - else - { - if ((req_struct->m_row_id.m_page_no == frag_page_id && - req_struct->m_row_id.m_page_idx == regOperPtr.p->m_tuple_location.m_page_idx)) - { - ndbout_c("no mem insert but rowid (same)"); - base->m_header_bits &= ~(Uint32)Tuple_header::FREE; - } - else - { - // no mem insert, but rowid - ndbrequire(false); - } - } base->m_header_bits |= Tuple_header::ALLOC & (regOperPtr.p->is_first_operation() ? ~0 : 1); diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp index d1580f8da54..8a68905cef9 100644 --- a/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp @@ -89,7 +89,6 @@ Dbtup::Dbtup(Block_context& ctx, Pgman* pgman) addRecSignal(GSN_DROP_TAB_REQ, &Dbtup::execDROP_TAB_REQ); - addRecSignal(GSN_TUP_ALLOCREQ, &Dbtup::execTUP_ALLOCREQ); addRecSignal(GSN_TUP_DEALLOCREQ, &Dbtup::execTUP_DEALLOCREQ); addRecSignal(GSN_TUP_WRITELOG_REQ, &Dbtup::execTUP_WRITELOG_REQ); diff --git a/storage/ndb/test/ndbapi/testOperations.cpp b/storage/ndb/test/ndbapi/testOperations.cpp index 65b406f155d..e116545f7e3 100644 --- a/storage/ndb/test/ndbapi/testOperations.cpp +++ b/storage/ndb/test/ndbapi/testOperations.cpp @@ -665,9 +665,9 @@ main(int argc, const char** argv){ for(Uint32 i = 0; i < 12; i++) { - if(i == 6 || i == 8 || i == 10) + if(false && (i == 6 || i == 8 || i == 10)) continue; - + BaseString name("bug_9749"); name.appfmt("_%d", i); NDBT_TestCaseImpl1 *pt = new NDBT_TestCaseImpl1(&ts, |