summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <jonas@perch.ndb.mysql.com>2006-05-18 10:17:53 +0200
committerunknown <jonas@perch.ndb.mysql.com>2006-05-18 10:17:53 +0200
commitbe0ab479b822a3b9d065dc659ef59d5450f2270b (patch)
treed32aff29244d0593c63fcd197f040618e049d1b9
parent7d6ee98f1c0be068755cef4baa586b99fd98e604 (diff)
downloadmariadb-git-be0ab479b822a3b9d065dc659ef59d5450f2270b.tar.gz
ndb - bug#19293 and family
introduce acc per row logical mutex to fix difficult error handling cases storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp: 1) Fix per row mutex so that only 1 op at a time is running on a row 2) Change TUP_ALLOC/DEALLOC semantic, so that a new record will be allocated if LQ = { T1(DEL) - T2(INS) } 3) Rewrite lock queus to be O(1) in all cases but a few abort cases where we scan parallell queue 4) Impl. a validate_lock_queue/dump_lock_queue test framework storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp: 1) Fix per row mutex so that only 1 op at a time is running on a row 2) Change TUP_ALLOC/DEALLOC semantic, so that a new record will be allocated if LQ = { T1(DEL) - T2(INS) } 3) Rewrite lock queus to be O(1) in all cases but a few abort cases where we scan parallell queue 4) Impl. a validate_lock_queue/dump_lock_queue test framework storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp: 1) Fix per row mutex so that only 1 op at a time is running on a row 2) Change TUP_ALLOC/DEALLOC semantic, so that a new record will be allocated if LQ = { T1(DEL) - T2(INS) } 3) Rewrite lock queus to be O(1) in all cases but a few abort cases where we scan parallell queue 4) Impl. a validate_lock_queue/dump_lock_queue test framework storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp: 1) impl. a new read key from operation record needed by acc 2) expand TRACE_OP toolkit 3) impl. ACCKEY_ORD as needed by ACC changes storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp: 1) impl. a new read key from operation record needed by acc 2) expand TRACE_OP toolkit 3) impl. ACCKEY_ORD as needed by ACC changes storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp: remove unused states/methods storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp: remove extremly tricky code that handles disk_insert_but_no_mem_insert that is no long needed with current acc changes storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp: remove unused states/methods storage/ndb/test/ndbapi/testOperations.cpp: renable last 3 lock upgrade testcases since they now pass
-rw-r--r--storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp133
-rw-r--r--storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp4
-rw-r--r--storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp3098
-rw-r--r--storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp29
-rw-r--r--storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp417
-rw-r--r--storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp3
-rw-r--r--storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp120
-rw-r--r--storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp1
-rw-r--r--storage/ndb/test/ndbapi/testOperations.cpp4
9 files changed, 2396 insertions, 1413 deletions
diff --git a/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp b/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp
index 8373004fd0c..b1638599dcc 100644
--- a/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp
+++ b/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp
@@ -17,14 +17,13 @@
#ifndef DBACC_H
#define DBACC_H
-
+#ifdef VM_TRACE
+#define ACC_SAFE_QUEUE
+#endif
#include <pc.hpp>
#include <SimulatedBlock.hpp>
-// primary key is stored in TUP
-#include "../dbtup/Dbtup.hpp"
-
#ifdef DBACC_C
// Debug Macros
#define dbgWord32(ptr, ind, val)
@@ -135,7 +134,10 @@ ndbout << "Ptr: " << ptr.p->word32 << " \tIndex: " << tmp_string << " \tValue: "
#define ZRIGHT 2
#define ZROOTFRAGMENTSIZE 32
#define ZSCAN_LOCK_ALL 3
-#define ZSCAN_OP 5
+/**
+ * Check kernel_types for other operation types
+ */
+#define ZSCAN_OP 6
#define ZSCAN_REC_SIZE 256
#define ZSTAND_BY 2
#define ZTABLESIZE 16
@@ -477,6 +479,7 @@ struct Fragmentrec {
/* OPERATIONREC */
/* --------------------------------------------------------------------------------- */
struct Operationrec {
+ Uint32 m_op_bits;
Uint32 localdata[2];
Uint32 elementIsforward;
Uint32 elementPage;
@@ -485,36 +488,62 @@ struct Operationrec {
Uint32 fragptr;
Uint32 hashvaluePart;
Uint32 hashValue;
- Uint32 insertDeleteLen;
Uint32 nextLockOwnerOp;
Uint32 nextOp;
Uint32 nextParallelQue;
- Uint32 nextSerialQue;
+ union {
+ Uint32 nextSerialQue;
+ Uint32 m_lock_owner_ptr_i; // if nextParallelQue = RNIL, else undefined
+ };
Uint32 prevOp;
Uint32 prevLockOwnerOp;
- Uint32 prevParallelQue;
- Uint32 prevSerialQue;
+ union {
+ Uint32 prevParallelQue;
+ Uint32 m_lo_last_parallel_op_ptr_i;
+ };
+ union {
+ Uint32 prevSerialQue;
+ Uint32 m_lo_last_serial_op_ptr_i;
+ };
Uint32 scanRecPtr;
Uint32 transId1;
Uint32 transId2;
- State opState;
Uint32 userptr;
- State transactionstate;
Uint16 elementContainer;
Uint16 tupkeylen;
Uint32 xfrmtupkeylen;
Uint32 userblockref;
Uint32 scanBits;
- Uint8 elementIsDisappeared;
- Uint8 insertIsDone;
- Uint8 lockMode;
- Uint8 lockOwner;
- Uint8 nodeType;
- Uint8 operation;
- Uint8 opSimple;
- Uint8 dirtyRead;
- Uint8 commitDeleteCheckFlag;
- Uint8 isAccLockReq;
+
+ enum OpBits {
+ OP_MASK = 0x0000F // 4 bits for operation type
+ ,OP_LOCK_MODE = 0x00010 // 0 - shared lock, 1 = exclusive lock
+ ,OP_ACC_LOCK_MODE = 0x00020 // Or:de lock mode of all operation
+ // before me
+ ,OP_LOCK_OWNER = 0x00040
+ ,OP_RUN_QUEUE = 0x00080 // In parallell queue of lock owner
+ ,OP_DIRTY_READ = 0x00100
+ ,OP_LOCK_REQ = 0x00200 // isAccLockReq
+ ,OP_COMMIT_DELETE_CHECK = 0x00400
+ ,OP_INSERT_IS_DONE = 0x00800
+ ,OP_ELEMENT_DISAPPEARED = 0x01000
+
+ ,OP_STATE_MASK = 0xF0000
+ ,OP_STATE_IDLE = 0xF0000
+ ,OP_STATE_WAITING = 0x00000
+ ,OP_STATE_RUNNING = 0x10000
+ ,OP_STATE_EXECUTED = 0x30000
+ ,OP_STATE_RUNNING_ABORT = 0x20000
+
+ ,OP_EXECUTED_DIRTY_READ = 0x3050F
+ ,OP_INITIAL = ~(Uint32)0
+ };
+
+ bool is_same_trans(const Operationrec* op) const {
+ return
+ transId1 == op->transId1 && transId2 == op->transId2;
+ }
+
}; /* p2c: size = 168 bytes */
typedef Ptr<Operationrec> OperationrecPtr;
@@ -577,7 +606,6 @@ struct ScanRec {
Uint32 scanUserblockref;
Uint32 scanMask;
Uint8 scanLockMode;
- Uint8 scanKeyinfoFlag;
Uint8 scanTimer;
Uint8 scanContinuebCounter;
Uint8 scanReadCommittedFlag;
@@ -602,7 +630,8 @@ public:
virtual ~Dbacc();
// pointer to TUP instance in this thread
- Dbtup* c_tup;
+ class Dbtup* c_tup;
+ class Dblqh* c_lqh;
void execACCMINUPDATE(Signal* signal);
@@ -640,7 +669,8 @@ private:
void ACCKEY_error(Uint32 fromWhere);
void commitDeleteCheck();
-
+ void report_dealloc(Signal* signal, const Operationrec* opPtrP);
+
typedef void * RootfragmentrecPtr;
void initRootFragPageZero(FragmentrecPtr, Page8Ptr);
void initFragAdd(Signal*, FragmentrecPtr);
@@ -679,14 +709,30 @@ private:
bool addfragtotab(Signal* signal, Uint32 rootIndex, Uint32 fragId);
void initOpRec(Signal* signal);
void sendAcckeyconf(Signal* signal);
- Uint32 placeReadInLockQueue(Signal* signal);
- void placeSerialQueueRead(Signal* signal);
- void checkOnlyReadEntry(Signal* signal);
Uint32 getNoParallelTransaction(const Operationrec*);
- void moveLastParallelQueue(Signal* signal);
- void moveLastParallelQueueWrite(Signal* signal);
- Uint32 placeWriteInLockQueue(Signal* signal);
- void placeSerialQueueWrite(Signal* signal);
+
+#ifdef VM_TRACE
+ Uint32 getNoParallelTransactionFull(const Operationrec*);
+#endif
+#ifdef ACC_SAFE_QUEUE
+ bool validate_lock_queue(OperationrecPtr opPtr);
+ Uint32 get_parallel_head(OperationrecPtr opPtr);
+ void dump_lock_queue(OperationrecPtr loPtr);
+#else
+ bool validate_lock_queue(OperationrecPtr) { return true;}
+#endif
+
+public:
+ void execACCKEY_ORD(Signal* signal, Uint32 opPtrI);
+ void startNext(Signal* signal, OperationrecPtr lastOp);
+
+private:
+ Uint32 placeReadInLockQueue(OperationrecPtr lockOwnerPtr);
+ Uint32 placeWriteInLockQueue(OperationrecPtr lockOwnerPtr);
+ void placeSerialQueue(OperationrecPtr lockOwner, OperationrecPtr op);
+ void abortSerieQueueOperation(Signal* signal, OperationrecPtr op);
+ void abortParallelQueueOperation(Signal* signal, OperationrecPtr op);
+
void expandcontainer(Signal* signal);
void shrinkcontainer(Signal* signal);
void nextcontainerinfoExp(Signal* signal);
@@ -716,8 +762,8 @@ private:
void increaselistcont(Signal* signal);
void seizeLeftlist(Signal* signal);
void seizeRightlist(Signal* signal);
- Uint32 readTablePk(Uint32 localkey1);
- void getElement(Signal* signal);
+ Uint32 readTablePk(Uint32 localkey1, Uint32 eh, const Operationrec*);
+ Uint32 getElement(Signal* signal, OperationrecPtr& lockOwner);
void getdirindex(Signal* signal);
void commitdelete(Signal* signal);
void deleteElement(Signal* signal);
@@ -726,12 +772,17 @@ private:
void releaseRightlist(Signal* signal);
void checkoverfreelist(Signal* signal);
void abortOperation(Signal* signal);
- void accAbortReqLab(Signal* signal);
void commitOperation(Signal* signal);
- void copyOpInfo(Signal* signal);
+ void copyOpInfo(OperationrecPtr dst, OperationrecPtr src);
Uint32 executeNextOperation(Signal* signal);
void releaselock(Signal* signal);
+ void release_lockowner(Signal* signal, OperationrecPtr, bool commit);
+ void startNew(Signal* signal, OperationrecPtr newOwner);
+ void abortWaitingOperation(Signal*, OperationrecPtr);
+ void abortExecutedOperation(Signal*, OperationrecPtr);
+
void takeOutFragWaitQue(Signal* signal);
+ void check_lock_upgrade(Signal* signal, OperationrecPtr release_op, bool lo);
void check_lock_upgrade(Signal* signal, OperationrecPtr lock_owner,
OperationrecPtr release_op);
void allocOverflowPage(Signal* signal);
@@ -780,8 +831,8 @@ private:
void senddatapagesLab(Signal* signal);
void sttorrysignalLab(Signal* signal);
void sendholdconfsignalLab(Signal* signal);
- void accIsLockedLab(Signal* signal);
- void insertExistElemLab(Signal* signal);
+ void accIsLockedLab(Signal* signal, OperationrecPtr lockOwnerPtr);
+ void insertExistElemLab(Signal* signal, OperationrecPtr lockOwnerPtr);
void refaccConnectLab(Signal* signal);
void releaseScanLab(Signal* signal);
void ndbrestart1Lab(Signal* signal);
@@ -840,8 +891,6 @@ private:
Operationrec *operationrec;
OperationrecPtr operationRecPtr;
OperationrecPtr idrOperationRecPtr;
- OperationrecPtr copyInOperPtr;
- OperationrecPtr copyOperPtr;
OperationrecPtr mlpqOperPtr;
OperationrecPtr queOperPtr;
OperationrecPtr readWriteOpPtr;
@@ -885,8 +934,6 @@ private:
Page8Ptr lcnPageptr;
Page8Ptr lcnCopyPageptr;
Page8Ptr lupPageptr;
- Page8Ptr priPageptr;
- Page8Ptr pwiPageptr;
Page8Ptr ciPageidptr;
Page8Ptr gsePageidptr;
Page8Ptr isoPageptr;
@@ -926,8 +973,6 @@ private:
Tabrec *tabrec;
TabrecPtr tabptr;
Uint32 ctablesize;
- Uint32 tpwiElementptr;
- Uint32 tpriElementptr;
Uint32 tgseElementptr;
Uint32 tgseContainerptr;
Uint32 trlHead;
@@ -961,8 +1006,6 @@ private:
Uint32 tdelForward;
Uint32 tiopPageId;
Uint32 tipPageId;
- Uint32 tgeLocked;
- Uint32 tgeResult;
Uint32 tgeContainerptr;
Uint32 tgeElementptr;
Uint32 tgeForward;
diff --git a/storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp b/storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp
index 80fe82ed657..27355299a9c 100644
--- a/storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp
+++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp
@@ -130,8 +130,6 @@ Dbacc::Dbacc(Block_context& ctx):
&fragrecptr,
&operationRecPtr,
&idrOperationRecPtr,
- &copyInOperPtr,
- &copyOperPtr,
&mlpqOperPtr,
&queOperPtr,
&readWriteOpPtr,
@@ -161,8 +159,6 @@ Dbacc::Dbacc(Block_context& ctx):
&lcnPageptr,
&lcnCopyPageptr,
&lupPageptr,
- &priPageptr,
- &pwiPageptr,
&ciPageidptr,
&gsePageidptr,
&isoPageptr,
diff --git a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
index 3768609860b..5337c7c015d 100644
--- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
+++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
@@ -41,6 +41,19 @@
#define DEBUG(x)
#endif
+#ifdef ACC_SAFE_QUEUE
+#define vlqrequire(x) do { if (unlikely(!(x))) {\
+ dump_lock_queue(loPtr); \
+ ndbrequire(false); } } while(0)
+#else
+#define vlqrequire(x) ndbrequire(x)
+#endif
+
+
+// primary key is stored in TUP
+#include "../dbtup/Dbtup.hpp"
+#include "../dblqh/Dblqh.hpp"
+
// Signal entries and statement blocks
/* --------------------------------------------------------------------------------- */
@@ -208,8 +221,8 @@ void Dbacc::execSTTOR(Signal* signal)
switch (tstartphase) {
case 1:
jam();
- c_tup = (Dbtup*)globalData.getBlock(DBTUP);
- ndbrequire(c_tup != 0);
+ ndbrequire((c_tup = (Dbtup*)globalData.getBlock(DBTUP)) != 0);
+ ndbrequire((c_lqh = (Dblqh*)globalData.getBlock(DBLQH)) != 0);
break;
}
tuserblockref = signal->theData[3];
@@ -435,9 +448,7 @@ void Dbacc::initialiseOperationRec(Signal* signal)
for (operationRecPtr.i = 0; operationRecPtr.i < coprecsize; operationRecPtr.i++) {
refresh_watch_dog();
ptrAss(operationRecPtr, operationrec);
- operationRecPtr.p->transactionstate = IDLE;
- operationRecPtr.p->operation = ZUNDEFINED_OP;
- operationRecPtr.p->opState = FREE_OP;
+ operationRecPtr.p->m_op_bits = ~0;
operationRecPtr.p->nextOp = operationRecPtr.i + 1;
}//for
operationRecPtr.i = coprecsize - 1;
@@ -899,8 +910,7 @@ void Dbacc::execACCSEIZEREQ(Signal* signal)
ptrGuard(operationRecPtr);
operationRecPtr.p->userptr = tuserptr;
operationRecPtr.p->userblockref = tuserblockref;
- operationRecPtr.p->operation = ZUNDEFINED_OP;
- operationRecPtr.p->transactionstate = IDLE;
+ operationRecPtr.p->m_op_bits = ~0;
/* ******************************< */
/* ACCSEIZECONF */
/* ******************************< */
@@ -954,22 +964,19 @@ void Dbacc::initOpRec(Signal* signal)
operationRecPtr.p->xfrmtupkeylen = signal->theData[4];
operationRecPtr.p->transId1 = signal->theData[5];
operationRecPtr.p->transId2 = signal->theData[6];
- operationRecPtr.p->transactionstate = ACTIVE;
- operationRecPtr.p->commitDeleteCheckFlag = ZFALSE;
- operationRecPtr.p->operation = Treqinfo & 0x7;
- /* --------------------------------------------------------------------------------- */
- // opSimple is not used in this version. Is needed for deadlock handling later on.
- /* --------------------------------------------------------------------------------- */
- // operationRecPtr.p->opSimple = (Treqinfo >> 3) & 0x1;
-
- operationRecPtr.p->lockMode = (Treqinfo >> 4) & 0x3;
Uint32 readFlag = (((Treqinfo >> 4) & 0x3) == 0); // Only 1 if Read
Uint32 dirtyFlag = (((Treqinfo >> 6) & 0x1) == 1); // Only 1 if Dirty
Uint32 dirtyReadFlag = readFlag & dirtyFlag;
- operationRecPtr.p->dirtyRead = dirtyReadFlag;
- operationRecPtr.p->nodeType = (Treqinfo >> 7) & 0x3;
+ Uint32 opbits = 0;
+ opbits |= Treqinfo & 0x7;
+ opbits |= ((Treqinfo >> 4) & 0x3) ? Operationrec::OP_LOCK_MODE : 0;
+ opbits |= ((Treqinfo >> 4) & 0x3) ? Operationrec::OP_ACC_LOCK_MODE : 0;
+ opbits |= (dirtyReadFlag) ? Operationrec::OP_DIRTY_READ : 0;
+ opbits |= ((Treqinfo >> 31) & 0x1) ? Operationrec::OP_LOCK_REQ : 0;
+
+ //operationRecPtr.p->nodeType = (Treqinfo >> 7) & 0x3;
operationRecPtr.p->fid = fragrecptr.p->myfid;
operationRecPtr.p->fragptr = fragrecptr.i;
operationRecPtr.p->nextParallelQue = RNIL;
@@ -977,14 +984,10 @@ void Dbacc::initOpRec(Signal* signal)
operationRecPtr.p->nextSerialQue = RNIL;
operationRecPtr.p->prevSerialQue = RNIL;
operationRecPtr.p->elementPage = RNIL;
- operationRecPtr.p->lockOwner = ZFALSE;
- operationRecPtr.p->insertIsDone = ZFALSE;
- operationRecPtr.p->elementIsDisappeared = ZFALSE;
- operationRecPtr.p->insertDeleteLen = fragrecptr.p->elementLength;
operationRecPtr.p->scanRecPtr = RNIL;
+ operationRecPtr.p->m_op_bits = opbits;
// bit to mark lock operation
- operationRecPtr.p->isAccLockReq = (Treqinfo >> 31) & 0x1;
// undo log is not run via ACCKEYREQ
}//Dbacc::initOpRec()
@@ -995,7 +998,7 @@ void Dbacc::initOpRec(Signal* signal)
void Dbacc::sendAcckeyconf(Signal* signal)
{
signal->theData[0] = operationRecPtr.p->userptr;
- signal->theData[1] = operationRecPtr.p->operation;
+ signal->theData[1] = operationRecPtr.p->m_op_bits & Operationrec::OP_MASK;
signal->theData[2] = operationRecPtr.p->fid;
signal->theData[3] = operationRecPtr.p->localdata[0];
signal->theData[4] = operationRecPtr.p->localdata[1];
@@ -1003,7 +1006,8 @@ void Dbacc::sendAcckeyconf(Signal* signal)
}//Dbacc::sendAcckeyconf()
-void Dbacc::ACCKEY_error(Uint32 fromWhere)
+void
+Dbacc::ACCKEY_error(Uint32 fromWhere)
{
switch(fromWhere) {
case 0:
@@ -1057,7 +1061,8 @@ void Dbacc::execACCKEYREQ(Signal* signal)
}//if
ptrAss(operationRecPtr, operationrec);
ptrAss(fragrecptr, fragmentrec);
- ndbrequire(operationRecPtr.p->transactionstate == IDLE);
+
+ ndbrequire(operationRecPtr.p->m_op_bits == Operationrec::OP_INITIAL);
initOpRec(signal);
// normalize key if any char attr
@@ -1071,23 +1076,31 @@ void Dbacc::execACCKEYREQ(Signal* signal)
/* WE REMEMBER THESE ADDRESS IF WE LATER NEED TO INSERT */
/* THE ITEM AFTER NOT FINDING THE ITEM. */
/*---------------------------------------------------------------*/
- getElement(signal);
-
- if (tgeResult == ZTRUE) {
- switch (operationRecPtr.p->operation) {
+ OperationrecPtr lockOwnerPtr;
+ const Uint32 found = getElement(signal, lockOwnerPtr);
+
+ Uint32 opbits = operationRecPtr.p->m_op_bits;
+ Uint32 op = opbits & Operationrec::OP_MASK;
+ if (found == ZTRUE)
+ {
+ switch (op) {
case ZREAD:
case ZUPDATE:
case ZDELETE:
case ZWRITE:
case ZSCAN_OP:
- if (!tgeLocked){
- if(operationRecPtr.p->operation == ZWRITE)
+ if (!lockOwnerPtr.p)
+ {
+ if(op == ZWRITE)
{
jam();
- operationRecPtr.p->operation = ZUPDATE;
+ opbits &= ~(Uint32)Operationrec::OP_MASK;
+ opbits |= (op = ZUPDATE);
}
+ opbits |= Operationrec::OP_STATE_RUNNING;
+ opbits |= Operationrec::OP_RUN_QUEUE;
sendAcckeyconf(signal);
- if (operationRecPtr.p->dirtyRead == ZFALSE) {
+ if (! (opbits & Operationrec::OP_DIRTY_READ)) {
/*---------------------------------------------------------------*/
// It is not a dirty read. We proceed by locking and continue with
// the operation.
@@ -1104,42 +1117,44 @@ void Dbacc::execACCKEYREQ(Signal* signal)
dbgWord32(gePageptr, tgeElementptr, eh);
gePageptr.p->word32[tgeElementptr] = eh;
- insertLockOwnersList(signal , operationRecPtr);
- return;
+ opbits |= Operationrec::OP_LOCK_OWNER;
+ insertLockOwnersList(signal, operationRecPtr);
} else {
jam();
/*---------------------------------------------------------------*/
// It is a dirty read. We do not lock anything. Set state to
// IDLE since no COMMIT call will come.
/*---------------------------------------------------------------*/
- operationRecPtr.p->transactionstate = IDLE;
- operationRecPtr.p->operation = ZUNDEFINED_OP;
- return;
+ opbits = Operationrec::OP_EXECUTED_DIRTY_READ;
}//if
+ operationRecPtr.p->m_op_bits = opbits;
+ return;
} else {
jam();
- accIsLockedLab(signal);
+ accIsLockedLab(signal, lockOwnerPtr);
return;
}//if
break;
case ZINSERT:
jam();
- insertExistElemLab(signal);
+ insertExistElemLab(signal, lockOwnerPtr);
return;
break;
default:
ndbrequire(false);
break;
}//switch
- } else if (tgeResult == ZFALSE) {
- switch (operationRecPtr.p->operation) {
- case ZINSERT:
+ } else if (found == ZFALSE) {
+ switch (op){
case ZWRITE:
+ opbits &= ~(Uint32)Operationrec::OP_MASK;
+ opbits |= (op = ZINSERT);
+ case ZINSERT:
jam();
- // If a write operation makes an insert we switch operation to ZINSERT so
- // that the commit-method knows an insert has been made and updates noOfElements.
- operationRecPtr.p->operation = ZINSERT;
- operationRecPtr.p->insertIsDone = ZTRUE;
+ opbits |= Operationrec::OP_INSERT_IS_DONE;
+ opbits |= Operationrec::OP_STATE_RUNNING;
+ opbits |= Operationrec::OP_RUN_QUEUE;
+ operationRecPtr.p->m_op_bits = opbits;
insertelementLab(signal);
return;
break;
@@ -1157,13 +1172,285 @@ void Dbacc::execACCKEYREQ(Signal* signal)
}//switch
} else {
jam();
- acckeyref1Lab(signal, tgeResult);
+ acckeyref1Lab(signal, found);
return;
}//if
return;
}//Dbacc::execACCKEYREQ()
void
+Dbacc::execACCKEY_ORD(Signal* signal, Uint32 opPtrI)
+{
+ OperationrecPtr lastOp;
+ lastOp.i = opPtrI;
+ ptrCheckGuard(lastOp, coprecsize, operationrec);
+ Uint32 opbits = lastOp.p->m_op_bits;
+ Uint32 opstate = opbits & Operationrec::OP_STATE_MASK;
+
+ if (likely(opbits == Operationrec::OP_EXECUTED_DIRTY_READ))
+ {
+ jam();
+ lastOp.p->m_op_bits = Operationrec::OP_INITIAL;
+ return;
+ }
+ else if (likely(opstate == Operationrec::OP_STATE_RUNNING))
+ {
+ opbits |= Operationrec::OP_STATE_EXECUTED;
+ lastOp.p->m_op_bits = opbits;
+ startNext(signal, lastOp);
+ return;
+ }
+ else if (opstate == Operationrec::OP_STATE_RUNNING_ABORT)
+ {
+ }
+ else
+ {
+ }
+
+ ndbout_c("bits: %.8x state: %.8x", opbits, opstate);
+ ndbrequire(false);
+}
+
+void
+Dbacc::startNext(Signal* signal, OperationrecPtr lastOp)
+{
+ jam();
+ OperationrecPtr nextOp;
+ OperationrecPtr loPtr;
+ nextOp.i = lastOp.p->nextParallelQue;
+ loPtr.i = lastOp.p->m_lock_owner_ptr_i;
+ Uint32 opbits = lastOp.p->m_op_bits;
+
+ if ((opbits & Operationrec::OP_STATE_MASK)!= Operationrec::OP_STATE_EXECUTED)
+ {
+ jam();
+ return;
+ }
+
+ Uint32 nextbits;
+ if (nextOp.i != RNIL)
+ {
+ jam();
+ ptrCheckGuard(nextOp, coprecsize, operationrec);
+ nextbits = nextOp.p->m_op_bits;
+ goto checkop;
+ }
+
+ if ((opbits & Operationrec::OP_LOCK_OWNER) == 0)
+ {
+ jam();
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+ nextOp.i = loPtr.p->nextSerialQue;
+ }
+ else
+ {
+ jam();
+ nextOp.i = loPtr.i;
+ loPtr = lastOp;
+ }
+
+ ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
+
+ if (nextOp.i == RNIL)
+ {
+ jam();
+ return;
+ }
+
+ /**
+ * There is an op in serie queue...
+ * Check if it can run
+ */
+ ptrCheckGuard(nextOp, coprecsize, operationrec);
+ nextbits = nextOp.p->m_op_bits;
+
+ bool same = nextOp.p->is_same_trans(lastOp.p);
+
+ if (!same && ((opbits & Operationrec::OP_ACC_LOCK_MODE) ||
+ (nextbits & Operationrec::OP_LOCK_MODE)))
+ {
+ jam();
+ /**
+ * Not same transaction
+ * and either last had exclusive lock
+ * or next had exclusive lock
+ */
+ return;
+ }
+
+ /**
+ * same trans and X-lock
+ */
+ if (same && (opbits & Operationrec::OP_ACC_LOCK_MODE))
+ {
+ jam();
+ goto upgrade;
+ }
+
+ /**
+ * all shared lock...
+ */
+ if ((opbits & Operationrec::OP_ACC_LOCK_MODE) == 0 &&
+ (nextbits & Operationrec::OP_LOCK_MODE) == 0)
+ {
+ jam();
+ goto upgrade;
+ }
+
+ /**
+ * There is a shared parallell queue & and exclusive op is first in queue
+ */
+ ndbassert((opbits & Operationrec::OP_ACC_LOCK_MODE) == 0 &&
+ (nextbits & Operationrec::OP_LOCK_MODE));
+
+ /**
+ * We must check if there are many transactions in parallel queue...
+ */
+ OperationrecPtr tmp;
+ tmp.i = loPtr.p->nextParallelQue;
+ while (tmp.i != RNIL)
+ {
+ ptrCheckGuard(tmp, coprecsize, operationrec);
+ if (!nextOp.p->is_same_trans(tmp.p))
+ {
+ jam();
+ /**
+ * parallel queue contained another transaction, dont let it run
+ */
+ return;
+ }
+ }
+
+upgrade:
+ /**
+ * Move first op in serie queue to end of parallell queue
+ */
+
+ tmp.i = loPtr.p->nextSerialQue = nextOp.p->nextSerialQue;
+ loPtr.p->m_lo_last_parallel_op_ptr_i = nextOp.i;
+ nextOp.p->nextSerialQue = RNIL;
+ nextOp.p->prevSerialQue = RNIL;
+ nextOp.p->m_lock_owner_ptr_i = loPtr.i;
+ nextOp.p->prevParallelQue = lastOp.i;
+ lastOp.p->nextParallelQue = nextOp.i;
+
+ if (tmp.i != RNIL)
+ {
+ jam();
+ ptrCheckGuard(tmp, coprecsize, operationrec);
+ tmp.p->prevSerialQue = loPtr.i;
+ }
+ else
+ {
+ jam();
+ loPtr.p->m_lo_last_serial_op_ptr_i = RNIL;
+ }
+
+ nextbits |= Operationrec::OP_RUN_QUEUE;
+
+ /**
+ * Currently no grouping of ops in serie queue
+ */
+ ndbrequire(nextOp.p->nextParallelQue == RNIL);
+
+checkop:
+ Uint32 errCode = 0;
+ OperationrecPtr save = operationRecPtr;
+ operationRecPtr = nextOp;
+
+ Uint32 lastop = opbits & Operationrec::OP_MASK;
+ Uint32 nextop = nextbits & Operationrec::OP_MASK;
+
+ nextbits &= nextbits & ~(Uint32)Operationrec::OP_STATE_MASK;
+ nextbits |= Operationrec::OP_STATE_RUNNING;
+
+ if (lastop == ZDELETE)
+ {
+ jam();
+ if (nextop != ZINSERT && nextop != ZWRITE)
+ {
+ errCode = ZREAD_ERROR;
+ goto ref;
+ }
+
+ nextbits &= ~(Uint32)Operationrec::OP_MASK;
+ nextbits &= ~(Uint32)Operationrec::OP_ELEMENT_DISAPPEARED;
+ nextbits |= (nextop = ZINSERT);
+ nextbits |= Operationrec::OP_INSERT_IS_DONE;
+ goto conf;
+ }
+ else if (nextop == ZINSERT)
+ {
+ jam();
+ errCode = ZWRITE_ERROR;
+ goto ref;
+ }
+ else if (nextop == ZWRITE)
+ {
+ jam();
+ nextbits &= ~(Uint32)Operationrec::OP_MASK;
+ nextbits |= (nextop = ZUPDATE);
+ goto conf;
+ }
+ else
+ {
+ jam();
+ }
+
+conf:
+ nextOp.p->m_op_bits = nextbits;
+ nextOp.p->localdata[0] = lastOp.p->localdata[0];
+ nextOp.p->localdata[1] = lastOp.p->localdata[1];
+
+ if (nextop == ZSCAN_OP && (nextbits & Operationrec::OP_LOCK_REQ) == 0)
+ {
+ jam();
+ takeOutScanLockQueue(nextOp.p->scanRecPtr);
+ putReadyScanQueue(signal, nextOp.p->scanRecPtr);
+ }
+ else
+ {
+ jam();
+ sendAcckeyconf(signal);
+ sendSignal(nextOp.p->userblockref, GSN_ACCKEYCONF,
+ signal, 6, JBA);
+ }
+
+ operationRecPtr = save;
+ return;
+
+ref:
+ nextOp.p->m_op_bits = nextbits;
+
+ if (nextop == ZSCAN_OP && (nextbits & Operationrec::OP_LOCK_REQ) == 0)
+ {
+ jam();
+ nextOp.p->m_op_bits |= Operationrec::OP_ELEMENT_DISAPPEARED;
+ takeOutScanLockQueue(nextOp.p->scanRecPtr);
+ putReadyScanQueue(signal, nextOp.p->scanRecPtr);
+ }
+ else
+ {
+ jam();
+ signal->theData[0] = nextOp.p->userptr;
+ signal->theData[1] = errCode;
+ sendSignal(nextOp.p->userblockref, GSN_ACCKEYREF, signal,
+ 2, JBB);
+ }
+
+ operationRecPtr = save;
+ return;
+}
+
+
+#if 0
+void
+Dbacc::execACCKEY_REP_REF(Signal* signal, Uint32 opPtrI)
+{
+}
+#endif
+
+void
Dbacc::xfrmKeyData(Signal* signal)
{
Uint32 table = fragrecptr.p->myTableId;
@@ -1176,23 +1463,22 @@ Dbacc::xfrmKeyData(Signal* signal)
operationRecPtr.p->xfrmtupkeylen = len;
}
-void Dbacc::accIsLockedLab(Signal* signal)
+void
+Dbacc::accIsLockedLab(Signal* signal, OperationrecPtr lockOwnerPtr)
{
ndbrequire(csystemRestart == ZFALSE);
- queOperPtr.i = ElementHeader::getOpPtrI(gePageptr.p->word32[tgeElementptr]);
- ptrCheckGuard(queOperPtr, coprecsize, operationrec);
- if (operationRecPtr.p->dirtyRead == ZFALSE) {
+
+ Uint32 bits = operationRecPtr.p->m_op_bits;
+ validate_lock_queue(lockOwnerPtr);
+
+ if ((bits & Operationrec::OP_DIRTY_READ) == 0){
Uint32 return_result;
- if (operationRecPtr.p->lockMode == ZREADLOCK) {
+ if ((bits & Operationrec::OP_LOCK_MODE) == ZREADLOCK) {
jam();
- priPageptr = gePageptr;
- tpriElementptr = tgeElementptr;
- return_result = placeReadInLockQueue(signal);
+ return_result = placeReadInLockQueue(lockOwnerPtr);
} else {
jam();
- pwiPageptr = gePageptr;
- tpwiElementptr = tgeElementptr;
- return_result = placeWriteInLockQueue(signal);
+ return_result = placeWriteInLockQueue(lockOwnerPtr);
}//if
if (return_result == ZPARALLEL_QUEUE) {
jam();
@@ -1202,24 +1488,29 @@ void Dbacc::accIsLockedLab(Signal* signal)
jam();
signal->theData[0] = RNIL;
return;
- } else if (return_result == ZWRITE_ERROR) {
+ } else {
jam();
acckeyref1Lab(signal, return_result);
return;
}//if
ndbrequire(false);
- } else {
- if (queOperPtr.p->elementIsDisappeared == ZFALSE) {
+ }
+ else
+ {
+ if (!(lockOwnerPtr.p->m_op_bits & Operationrec::OP_ELEMENT_DISAPPEARED) &&
+ lockOwnerPtr.p->localdata[0] != ~(Uint32)0)
+ {
jam();
- /*---------------------------------------------------------------*/
- // It is a dirty read. We do not lock anything. Set state to
- // IDLE since no COMMIT call will arrive.
- /*---------------------------------------------------------------*/
+ /* ---------------------------------------------------------------
+ * It is a dirty read. We do not lock anything. Set state to
+ *IDLE since no COMMIT call will arrive.
+ * ---------------------------------------------------------------*/
sendAcckeyconf(signal);
- operationRecPtr.p->transactionstate = IDLE;
- operationRecPtr.p->operation = ZUNDEFINED_OP;
+ operationRecPtr.p->m_op_bits = Operationrec::OP_EXECUTED_DIRTY_READ;
return;
- } else {
+ }
+ else
+ {
jam();
/*---------------------------------------------------------------*/
// The tuple does not exist in the committed world currently.
@@ -1231,17 +1522,18 @@ void Dbacc::accIsLockedLab(Signal* signal)
}//if
}//Dbacc::accIsLockedLab()
-/* --------------------------------------------------------------------------------- */
-/* I N S E R T E X I S T E L E M E N T */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::insertExistElemLab(Signal* signal)
+/* ------------------------------------------------------------------------ */
+/* I N S E R T E X I S T E L E M E N T */
+/* ------------------------------------------------------------------------ */
+void Dbacc::insertExistElemLab(Signal* signal, OperationrecPtr lockOwnerPtr)
{
- if (!tgeLocked){
+ if (!lockOwnerPtr.p)
+ {
jam();
acckeyref1Lab(signal, ZWRITE_ERROR);/* THE ELEMENT ALREADY EXIST */
return;
}//if
- accIsLockedLab(signal);
+ accIsLockedLab(signal, lockOwnerPtr);
}//Dbacc::insertExistElemLab()
/* --------------------------------------------------------------------------------- */
@@ -1259,26 +1551,8 @@ void Dbacc::insertelementLab(Signal* signal)
}//if
}//if
ndbrequire(operationRecPtr.p->tupkeylen <= fragrecptr.p->keyLength);
-
- Uint32 localKey;
- if(!operationRecPtr.p->isAccLockReq)
- {
- signal->theData[0] = operationRecPtr.p->userptr;
- Uint32 blockNo = refToBlock(operationRecPtr.p->userblockref);
- EXECUTE_DIRECT(blockNo, GSN_LQH_ALLOCREQ, signal, 1);
- jamEntry();
- if (signal->theData[0] != 0) {
- jam();
- Uint32 result_code = signal->theData[0];
- acckeyref1Lab(signal, result_code);
- return;
- }//if
- localKey = (signal->theData[1] << MAX_TUPLES_BITS) + signal->theData[2];
- }
- else
- {
- localKey = signal->theData[7];
- }
+ ndbassert(!(operationRecPtr.p->m_op_bits & Operationrec::OP_LOCK_REQ));
+ Uint32 localKey = ~(Uint32)0;
insertLockOwnersList(signal, operationRecPtr);
@@ -1293,196 +1567,18 @@ void Dbacc::insertelementLab(Signal* signal)
idrOperationRecPtr = operationRecPtr;
clocalkey[0] = localKey;
operationRecPtr.p->localdata[0] = localKey;
- /* --------------------------------------------------------------------------------- */
- /* WE SET THE LOCAL KEY TO MINUS ONE TO INDICATE IT IS NOT YET VALID. */
- /* --------------------------------------------------------------------------------- */
+ /* ----------------------------------------------------------------------- */
+ /* WE SET THE LOCAL KEY TO MINUS ONE TO INDICATE IT IS NOT YET VALID. */
+ /* ----------------------------------------------------------------------- */
insertElement(signal);
sendAcckeyconf(signal);
return;
}//Dbacc::insertelementLab()
-/* --------------------------------------------------------------------------------- */
-/* PLACE_READ_IN_LOCK_QUEUE */
-/* INPUT: OPERATION_REC_PTR OUR OPERATION POINTER */
-/* QUE_OPER_PTR LOCK QUEUE OWNER OPERATION POINTER */
-/* PRI_PAGEPTR PAGE POINTER OF ELEMENT */
-/* TPRI_ELEMENTPTR ELEMENT POINTER OF ELEMENT */
-/* OUTPUT TRESULT = */
-/* ZPARALLEL_QUEUE OPERATION PLACED IN PARALLEL QUEUE */
-/* OPERATION CAN PROCEED NOW. */
-/* ZSERIAL_QUEUE OPERATION PLACED IN SERIAL QUEUE */
-/* ERROR CODE OPERATION NEEDS ABORTING */
-/* THE ELEMENT WAS LOCKED AND WE WANT TO READ THE TUPLE. WE WILL CHECK THE LOCK */
-/* QUEUES TO PERFORM THE PROPER ACTION. */
-/* */
-/* IN SOME PLACES IN THE CODE BELOW THAT HANDLES WHAT TO DO WHEN THE TUPLE IS LOCKED */
-/* WE DO ASSUME THAT NEXT_PARALLEL_QUEUE AND NEXT_SERIAL_QUEUE ON OPERATION_REC_PTR */
-/* HAVE BEEN INITIALISED TO RNIL. THUS WE DO NOT PERFORM THIS ONCE MORE EVEN IF IT */
-/* COULD BE NICE FOR READABILITY. */
-/* --------------------------------------------------------------------------------- */
-Uint32 Dbacc::placeReadInLockQueue(Signal* signal)
-{
- if (getNoParallelTransaction(queOperPtr.p) == 1) {
- if ((queOperPtr.p->transId1 == operationRecPtr.p->transId1) &&
- (queOperPtr.p->transId2 == operationRecPtr.p->transId2)) {
- /* --------------------------------------------------------------------------------- */
- /* WE ARE PERFORMING A READ OPERATION AND THIS TRANSACTION ALREADY OWNS THE LOCK */
- /* ALONE. PUT THE OPERATION LAST IN THE PARALLEL QUEUE. */
- /* --------------------------------------------------------------------------------- */
- jam();
- mlpqOperPtr = queOperPtr;
- moveLastParallelQueue(signal);
- operationRecPtr.p->localdata[0] = queOperPtr.p->localdata[0];
- operationRecPtr.p->localdata[1] = queOperPtr.p->localdata[1];
- operationRecPtr.p->prevParallelQue = mlpqOperPtr.i;
- mlpqOperPtr.p->nextParallelQue = operationRecPtr.i;
- switch (queOperPtr.p->lockMode) {
- case ZREADLOCK:
- jam();
- /*empty*/;
- break;
- default:
- jam();
- /* --------------------------------------------------------------------------------- */
- /* IF THE TRANSACTION PREVIOUSLY SET A WRITE LOCK WE MUST ENSURE THAT ALL */
- /* OPERATIONS IN THE PARALLEL QUEUE HAVE WRITE LOCK MODE TO AVOID STRANGE BUGS.*/
- /* --------------------------------------------------------------------------------- */
- operationRecPtr.p->lockMode = queOperPtr.p->lockMode;
- break;
- }//switch
- return ZPARALLEL_QUEUE;
- }//if
- }//if
- if (queOperPtr.p->nextSerialQue == RNIL) {
- /* --------------------------------------------------------------------------------- */
- /* WE ARE PERFORMING A READ OPERATION AND THERE IS NO SERIAL QUEUE. IF THERE IS NO */
- /* WRITE OPERATION THAT OWNS THE LOCK OR ANY WRITE OPERATION IN THE PARALLEL QUEUE */
- /* IT IS ENOUGH TO CHECK THE LOCK MODE OF THE LEADER IN THE PARALLEL QUEUE. IF IT IS */
- /* A READ LOCK THEN WE PLACE OURSELVES IN THE PARALLEL QUEUE OTHERWISE WE GO ON TO */
- /* PLACE OURSELVES IN THE SERIAL QUEUE. */
- /* --------------------------------------------------------------------------------- */
- switch (queOperPtr.p->lockMode) {
- case ZREADLOCK:
- jam();
- mlpqOperPtr = queOperPtr;
- moveLastParallelQueue(signal);
- operationRecPtr.p->prevParallelQue = mlpqOperPtr.i;
- mlpqOperPtr.p->nextParallelQue = operationRecPtr.i;
- operationRecPtr.p->localdata[0] = queOperPtr.p->localdata[0];
- operationRecPtr.p->localdata[1] = queOperPtr.p->localdata[1];
- return ZPARALLEL_QUEUE;
- default:
- jam();
- queOperPtr.p->nextSerialQue = operationRecPtr.i;
- operationRecPtr.p->prevSerialQue = queOperPtr.i;
- break;
- }//switch
- } else {
- jam();
- placeSerialQueueRead(signal);
- }//if
- return ZSERIAL_QUEUE;
-}//Dbacc::placeReadInLockQueue()
-
-/* --------------------------------------------------------------------------------- */
-/* WE WILL CHECK IF THIS TRANSACTION IS ALREADY PLACED AT SOME SPOT IN THE PARALLEL */
-/* SERIAL QUEUE WITHOUT ANY NEIGHBORS FROM OTHER TRANSACTION. IF SO WE WILL INSERT */
-/* IT IN THAT PARALLEL QUEUE. */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::placeSerialQueueRead(Signal* signal)
-{
- readWriteOpPtr.i = queOperPtr.p->nextSerialQue;
- ptrCheckGuard(readWriteOpPtr, coprecsize, operationrec);
-PSQR_LOOP:
- jam();
- if (readWriteOpPtr.p->nextSerialQue == RNIL) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* THERE WAS NO PREVIOUS OPERATION IN THIS TRANSACTION WHICH WE COULD PUT IT */
- /* IN THE PARALLEL QUEUE TOGETHER WITH. */
- /* --------------------------------------------------------------------------------- */
- checkOnlyReadEntry(signal);
- return;
- }//if
- if (getNoParallelTransaction(readWriteOpPtr.p) == 1) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* THERE WAS ONLY ONE TRANSACTION INVOLVED IN THE PARALLEL QUEUE. IF THIS IS OUR */
- /* TRANSACTION WE CAN STILL GET HOLD OF THE LOCK. */
- /* --------------------------------------------------------------------------------- */
- if ((readWriteOpPtr.p->transId1 == operationRecPtr.p->transId1) &&
- (readWriteOpPtr.p->transId2 == operationRecPtr.p->transId2)) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* WE ARE PERFORMING A READ IN THE SAME TRANSACTION WHERE WE ALREADY */
- /* PREVIOUSLY HAVE EXECUTED AN OPERATION. INSERT-DELETE, READ-UPDATE, READ-READ, */
- /* UPDATE-UPDATE, UPDATE-DELETE, READ-DELETE, INSERT-READ, INSERT-UPDATE ARE ALLOWED */
- /* COMBINATIONS. A NEW INSERT AFTER A DELETE IS NOT ALLOWED AND SUCH AN INSERT WILL */
- /* GO TO THE SERIAL LOCK QUEUE WHICH IT WILL NOT LEAVE UNTIL A TIME-OUT AND THE */
- /* TRANSACTION IS ABORTED. READS AND UPDATES AFTER DELETES IS ALSO NOT ALLOWED. */
- /* --------------------------------------------------------------------------------- */
- mlpqOperPtr = readWriteOpPtr;
- moveLastParallelQueue(signal);
- readWriteOpPtr = mlpqOperPtr;
- operationRecPtr.p->prevParallelQue = readWriteOpPtr.i;
- readWriteOpPtr.p->nextParallelQue = operationRecPtr.i;
- operationRecPtr.p->localdata[0] = readWriteOpPtr.p->localdata[0];
- operationRecPtr.p->localdata[1] = readWriteOpPtr.p->localdata[1];
- switch (readWriteOpPtr.p->lockMode) {
- case ZREADLOCK:
- jam();
- /*empty*/;
- break;
- default:
- jam();
- /* --------------------------------------------------------------------------------- */
- /* IF THE TRANSACTION PREVIOUSLY SET A WRITE LOCK WE MUST ENSURE THAT ALL */
- /* OPERATIONS IN THE PARALLEL QUEUE HAVE WRITE LOCK MODE TO AVOID STRANGE BUGS.*/
- /* --------------------------------------------------------------------------------- */
- operationRecPtr.p->lockMode = readWriteOpPtr.p->lockMode;
- break;
- }//switch
- return;
- }//if
- }//if
- readWriteOpPtr.i = readWriteOpPtr.p->nextSerialQue;
- ptrCheckGuard(readWriteOpPtr, coprecsize, operationrec);
- goto PSQR_LOOP;
-}//Dbacc::placeSerialQueueRead()
-/* --------------------------------------------------------------------------------- */
-/* WE WILL CHECK IF THE LAST ENTRY IN THE SERIAL QUEUE CONTAINS ONLY READ */
-/* OPERATIONS. IF SO WE WILL INSERT IT IN THAT PARALLEL QUEUE. OTHERWISE WE */
-/* WILL PLACE IT AT THE END OF THE SERIAL QUEUE. */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::checkOnlyReadEntry(Signal* signal)
-{
- switch (readWriteOpPtr.p->lockMode) {
- case ZREADLOCK:
- jam();
- /* --------------------------------------------------------------------------------- */
- /* SINCE THIS LAST QUEUE ONLY CONTAINS READ LOCKS WE CAN JOIN THE PARALLEL QUEUE AT */
- /* THE END. */
- /* --------------------------------------------------------------------------------- */
- mlpqOperPtr = readWriteOpPtr;
- moveLastParallelQueue(signal);
- readWriteOpPtr = mlpqOperPtr;
- operationRecPtr.p->prevParallelQue = readWriteOpPtr.i;
- readWriteOpPtr.p->nextParallelQue = operationRecPtr.i;
- operationRecPtr.p->localdata[0] = readWriteOpPtr.p->localdata[0];
- operationRecPtr.p->localdata[1] = readWriteOpPtr.p->localdata[1];
- break;
- default:
- jam(); /* PUT THE OPERATION RECORD IN THE SERIAL QUEUE */
- readWriteOpPtr.p->nextSerialQue = operationRecPtr.i;
- operationRecPtr.p->prevSerialQue = readWriteOpPtr.i;
- break;
- }//switch
-}//Dbacc::checkOnlyReadEntry()
-
-/* --------------------------------------------------------------------------------- */
-/* GET_NO_PARALLEL_TRANSACTION */
-/* --------------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------ */
+/* GET_NO_PARALLEL_TRANSACTION */
+/* ------------------------------------------------------------------------ */
Uint32
Dbacc::getNoParallelTransaction(const Operationrec * op)
{
@@ -1502,135 +1598,641 @@ Dbacc::getNoParallelTransaction(const Operationrec * op)
return 1;
}//Dbacc::getNoParallelTransaction()
-void Dbacc::moveLastParallelQueue(Signal* signal)
+#ifdef VM_TRACE
+Uint32
+Dbacc::getNoParallelTransactionFull(const Operationrec * op)
{
- while (mlpqOperPtr.p->nextParallelQue != RNIL) {
- jam();
- mlpqOperPtr.i = mlpqOperPtr.p->nextParallelQue;
- ptrCheckGuard(mlpqOperPtr, coprecsize, operationrec);
- }//if
-}//Dbacc::moveLastParallelQueue()
+ ConstPtr<Operationrec> tmp;
+
+ tmp.p = op;
+ while ((tmp.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0)
+ {
+ tmp.i = tmp.p->prevParallelQue;
+ if (tmp.i != RNIL)
+ {
+ ptrCheckGuard(tmp, coprecsize, operationrec);
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ return getNoParallelTransaction(tmp.p);
+}
+#endif
-void Dbacc::moveLastParallelQueueWrite(Signal* signal)
+#ifdef ACC_SAFE_QUEUE
+
+Uint32
+Dbacc::get_parallel_head(OperationrecPtr opPtr)
{
- /* --------------------------------------------------------------------------------- */
- /* ENSURE THAT ALL OPERATIONS HAVE LOCK MODE SET TO WRITE SINCE WE INSERT A */
- /* WRITE LOCK INTO THE PARALLEL QUEUE. */
- /* --------------------------------------------------------------------------------- */
- while (mlpqOperPtr.p->nextParallelQue != RNIL) {
- jam();
- mlpqOperPtr.p->lockMode = operationRecPtr.p->lockMode;
- mlpqOperPtr.i = mlpqOperPtr.p->nextParallelQue;
- ptrCheckGuard(mlpqOperPtr, coprecsize, operationrec);
- }//if
- mlpqOperPtr.p->lockMode = operationRecPtr.p->lockMode;
-}//Dbacc::moveLastParallelQueueWrite()
+ while ((opPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0 &&
+ opPtr.p->prevParallelQue != RNIL)
+ {
+ opPtr.i = opPtr.p->prevParallelQue;
+ ptrCheckGuard(opPtr, coprecsize, operationrec);
+ }
+
+ return opPtr.i;
+}
-/* --------------------------------------------------------------------------------- */
-/* PLACE_WRITE_IN_LOCK_QUEUE */
-/* INPUT: OPERATION_REC_PTR OUR OPERATION POINTER */
-/* QUE_OPER_PTR LOCK QUEUE OWNER OPERATION POINTER */
-/* PWI_PAGEPTR PAGE POINTER OF ELEMENT */
-/* TPWI_ELEMENTPTR ELEMENT POINTER OF ELEMENT */
-/* OUTPUT TRESULT = */
-/* ZPARALLEL_QUEUE OPERATION PLACED IN PARALLEL QUEUE */
-/* OPERATION CAN PROCEED NOW. */
-/* ZSERIAL_QUEUE OPERATION PLACED IN SERIAL QUEUE */
-/* ERROR CODE OPERATION NEEDS ABORTING */
-/* --------------------------------------------------------------------------------- */
-Uint32 Dbacc::placeWriteInLockQueue(Signal* signal)
+bool
+Dbacc::validate_lock_queue(OperationrecPtr opPtr)
+{
+ OperationrecPtr loPtr;
+ loPtr.i = get_parallel_head(opPtr);
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+
+ while((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0 &&
+ loPtr.p->prevSerialQue != RNIL)
+ {
+ loPtr.i = loPtr.p->prevSerialQue;
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+ }
+
+ // Now we have lock owner...
+ vlqrequire(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
+ vlqrequire(loPtr.p->m_op_bits & Operationrec::OP_RUN_QUEUE);
+
+ // 1 Validate page pointer
+ {
+ Page8Ptr pagePtr;
+ pagePtr.i = loPtr.p->elementPage;
+ ptrCheckGuard(pagePtr, cpagesize, page8);
+ arrGuard(loPtr.p->elementPointer, 2048);
+ Uint32 eh = pagePtr.p->word32[loPtr.p->elementPointer];
+ vlqrequire(ElementHeader::getLocked(eh));
+ vlqrequire(ElementHeader::getOpPtrI(eh) == loPtr.i);
+ }
+
+ // 2 Lock owner should always have same LOCK_MODE and ACC_LOCK_MODE
+ if (loPtr.p->m_op_bits & Operationrec::OP_LOCK_MODE)
+ {
+ vlqrequire(loPtr.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE);
+ }
+ else
+ {
+ vlqrequire((loPtr.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE) == 0);
+ }
+
+ // 3 Lock owner should never be waiting...
+ bool running = false;
+ {
+ Uint32 opstate = loPtr.p->m_op_bits & Operationrec::OP_STATE_MASK;
+ if (opstate == Operationrec::OP_STATE_RUNNING ||
+ opstate == Operationrec::OP_STATE_RUNNING_ABORT)
+ running = true;
+ else
+ {
+ vlqrequire(opstate == Operationrec::OP_STATE_EXECUTED);
+ }
+ }
+
+ // Validate parallel queue
+ {
+ bool many = false;
+ bool orlockmode = loPtr.p->m_op_bits & Operationrec::OP_LOCK_MODE;
+ OperationrecPtr lastP = loPtr;
+
+ while (lastP.p->nextParallelQue != RNIL)
+ {
+ Uint32 prev = lastP.i;
+ lastP.i = lastP.p->nextParallelQue;
+ ptrCheckGuard(lastP, coprecsize, operationrec);
+
+ vlqrequire(lastP.p->prevParallelQue == prev);
+
+ Uint32 opbits = lastP.p->m_op_bits;
+ many |= loPtr.p->is_same_trans(lastP.p) ? 0 : 1;
+ orlockmode |= !!(opbits & Operationrec::OP_LOCK_MODE);
+
+ vlqrequire(opbits & Operationrec::OP_RUN_QUEUE);
+ vlqrequire((opbits & Operationrec::OP_LOCK_OWNER) == 0);
+
+ Uint32 opstate = opbits & Operationrec::OP_STATE_MASK;
+ if (running)
+ {
+ // If I found a running operation,
+ // all following should be waiting
+ vlqrequire(opstate == Operationrec::OP_STATE_WAITING);
+ }
+ else
+ {
+ if (opstate == Operationrec::OP_STATE_RUNNING ||
+ opstate == Operationrec::OP_STATE_RUNNING_ABORT)
+ running = true;
+ else
+ vlqrequire(opstate == Operationrec::OP_STATE_EXECUTED);
+ }
+
+ if (lastP.p->m_op_bits & Operationrec::OP_LOCK_MODE)
+ {
+ vlqrequire(lastP.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE);
+ }
+ else
+ {
+ vlqrequire((lastP.p->m_op_bits && orlockmode) == orlockmode);
+ vlqrequire((lastP.p->m_op_bits & Operationrec::OP_MASK) == ZREAD ||
+ (lastP.p->m_op_bits & Operationrec::OP_MASK) == ZSCAN_OP);
+ }
+
+ if (many)
+ {
+ vlqrequire(orlockmode == 0);
+ }
+ }
+
+ if (lastP.i != loPtr.i)
+ {
+ vlqrequire(loPtr.p->m_lo_last_parallel_op_ptr_i == lastP.i);
+ vlqrequire(lastP.p->m_lock_owner_ptr_i == loPtr.i);
+ }
+ else
+ {
+ vlqrequire(loPtr.p->m_lo_last_parallel_op_ptr_i == RNIL);
+ }
+ }
+
+ // Validate serie queue
+ if (loPtr.p->nextSerialQue != RNIL)
+ {
+ Uint32 prev = loPtr.i;
+ OperationrecPtr lastS;
+ lastS.i = loPtr.p->nextSerialQue;
+ while (true)
+ {
+ ptrCheckGuard(lastS, coprecsize, operationrec);
+ vlqrequire(lastS.p->prevSerialQue == prev);
+ vlqrequire(getNoParallelTransaction(lastS.p) == 1);
+ vlqrequire((lastS.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0);
+ vlqrequire((lastS.p->m_op_bits & Operationrec::OP_RUN_QUEUE) == 0);
+ vlqrequire((lastS.p->m_op_bits & Operationrec::OP_STATE_MASK) == Operationrec::OP_STATE_WAITING);
+ if (lastS.p->nextSerialQue == RNIL)
+ break;
+ prev = lastS.i;
+ lastS.i = lastS.p->nextSerialQue;
+ }
+
+ vlqrequire(loPtr.p->m_lo_last_serial_op_ptr_i == lastS.i);
+ }
+ else
+ {
+ vlqrequire(loPtr.p->m_lo_last_serial_op_ptr_i == RNIL);
+ }
+ return true;
+}
+
+NdbOut&
+operator<<(NdbOut & out, Dbacc::OperationrecPtr ptr)
{
- if (!((getNoParallelTransaction(queOperPtr.p) == 1) &&
- (queOperPtr.p->transId1 == operationRecPtr.p->transId1) &&
- (queOperPtr.p->transId2 == operationRecPtr.p->transId2))) {
+ Uint32 opbits = ptr.p->m_op_bits;
+ out << "[ " << dec << ptr.i
+ << " [ " << hex << ptr.p->transId1
+ << " " << hex << ptr.p->transId2 << "] "
+ << " bits: H'" << hex << opbits << " ";
+
+ bool read = false;
+ switch(opbits & Dbacc::Operationrec::OP_MASK){
+ case ZREAD: out << "READ "; read = true; break;
+ case ZINSERT: out << "INSERT "; break;
+ case ZUPDATE: out << "UPDATE "; break;
+ case ZDELETE: out << "DELETE "; break;
+ case ZWRITE: out << "WRITE "; break;
+ case ZSCAN_OP: out << "SCAN "; read = true; break;
+ default:
+ out << "<Unknown: H'"
+ << hex << (opbits & Dbacc::Operationrec::OP_MASK)
+ << "> ";
+ }
+
+ if (read)
+ {
+ if (opbits & Dbacc::Operationrec::OP_LOCK_MODE)
+ out << "(X)";
+ else
+ out << "(S)";
+ if (opbits & Dbacc::Operationrec::OP_ACC_LOCK_MODE)
+ out << "(X)";
+ else
+ out << "(S)";
+ }
+
+ if (opbits)
+ {
+ out << "(RQ)";
+ }
+
+ switch(opbits & Dbacc::Operationrec::OP_STATE_MASK){
+ case Dbacc::Operationrec::OP_STATE_WAITING:
+ out << " WAITING "; break;
+ case Dbacc::Operationrec::OP_STATE_RUNNING:
+ out << " RUNNING "; break;
+ case Dbacc::Operationrec::OP_STATE_EXECUTED:
+ out << " EXECUTED "; break;
+ case Dbacc::Operationrec::OP_STATE_RUNNING_ABORT:
+ out << " RUNNIG_ABORT "; break;
+ case Dbacc::Operationrec::OP_STATE_IDLE:
+ out << " IDLE "; break;
+ default:
+ out << " <Unknown: H'"
+ << hex << (opbits & Dbacc::Operationrec::OP_STATE_MASK)
+ << "> ";
+ }
+
+/*
+ OP_MASK = 0x000F // 4 bits for operation type
+ ,OP_LOCK_MODE = 0x0010 // 0 - shared lock, 1 = exclusive lock
+ ,OP_ACC_LOCK_MODE = 0x0020 // Or:de lock mode of all operation
+ // before me
+ ,OP_LOCK_OWNER = 0x0040
+ ,OP_DIRTY_READ = 0x0080
+ ,OP_LOCK_REQ = 0x0100 // isAccLockReq
+ ,OP_COMMIT_DELETE_CHECK = 0x0200
+ ,OP_INSERT_IS_DONE = 0x0400
+ ,OP_ELEMENT_DISAPPEARED = 0x0800
+
+ ,OP_STATE_MASK = 0xF000
+ ,OP_STATE_IDLE = 0xF000
+ ,OP_STATE_WAITING = 0x0000
+ ,OP_STATE_RUNNING = 0x1000
+ ,OP_STATE_EXECUTED = 0x3000
+ ,OP_STATE_RUNNING_ABORT = 0x2000
+ };
+*/
+ if (opbits & Dbacc::Operationrec::OP_LOCK_OWNER)
+ out << "LO ";
+
+ if (opbits & Dbacc::Operationrec::OP_DIRTY_READ)
+ out << "DR ";
+
+ if (opbits & Dbacc::Operationrec::OP_LOCK_REQ)
+ out << "LOCK_REQ ";
+
+ if (opbits & Dbacc::Operationrec::OP_COMMIT_DELETE_CHECK)
+ out << "COMMIT_DELETE_CHECK ";
+
+ if (opbits & Dbacc::Operationrec::OP_INSERT_IS_DONE)
+ out << "INSERT_IS_DONE ";
+
+ if (opbits & Dbacc::Operationrec::OP_ELEMENT_DISAPPEARED)
+ out << "ELEMENT_DISAPPEARED ";
+
+ if (opbits & Dbacc::Operationrec::OP_LOCK_OWNER)
+ {
+ out << "last_parallel: " << dec << ptr.p->m_lo_last_parallel_op_ptr_i << " ";
+ out << "last_serial: " << dec << ptr.p->m_lo_last_serial_op_ptr_i << " ";
+ }
+
+ out << "]";
+ return out;
+}
+
+void
+Dbacc::dump_lock_queue(OperationrecPtr loPtr)
+{
+ if ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0)
+ {
+ while ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0 &&
+ loPtr.p->prevParallelQue != RNIL)
+ {
+ loPtr.i = loPtr.p->prevParallelQue;
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+ }
+
+ while ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0 &&
+ loPtr.p->prevSerialQue != RNIL)
+ {
+ loPtr.i = loPtr.p->prevSerialQue;
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+ }
+
+ ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
+ }
+
+ ndbout << "-- HEAD --" << endl;
+ OperationrecPtr tmp = loPtr;
+ while (tmp.i != RNIL)
+ {
+ ptrCheckGuard(tmp, coprecsize, operationrec);
+ ndbout << tmp << " ";
+ tmp.i = tmp.p->nextParallelQue;
+
+ if (tmp.i == loPtr.i)
+ {
+ ndbout << " <LOOP>";
+ break;
+ }
+ }
+ ndbout << endl;
+
+ tmp.i = loPtr.p->nextSerialQue;
+ while (tmp.i != RNIL)
+ {
+ ptrCheckGuard(tmp, coprecsize, operationrec);
+ OperationrecPtr tmp2 = tmp;
+
+ if (tmp.i == loPtr.i)
+ {
+ ndbout << "<LOOP S>" << endl;
+ break;
+ }
+
+ while (tmp2.i != RNIL)
+ {
+ ptrCheckGuard(tmp2, coprecsize, operationrec);
+ ndbout << tmp2 << " ";
+ tmp2.i = tmp2.p->nextParallelQue;
+
+ if (tmp2.i == tmp.i)
+ {
+ ndbout << "<LOOP 3>";
+ break;
+ }
+ }
+ ndbout << endl;
+ tmp.i = tmp.p->nextSerialQue;
+ }
+}
+#endif
+
+/* -------------------------------------------------------------------------
+ * PLACE_WRITE_IN_LOCK_QUEUE
+ * INPUT: OPERATION_REC_PTR OUR OPERATION POINTER
+ * QUE_OPER_PTR LOCK QUEUE OWNER OPERATION POINTER
+ * PWI_PAGEPTR PAGE POINTER OF ELEMENT
+ * TPWI_ELEMENTPTR ELEMENT POINTER OF ELEMENT
+ * OUTPUT TRESULT =
+ * ZPARALLEL_QUEUE OPERATION PLACED IN PARALLEL QUEUE
+ * OPERATION CAN PROCEED NOW.
+ * ZSERIAL_QUEUE OPERATION PLACED IN SERIAL QUEUE
+ * ERROR CODE OPERATION NEEDS ABORTING
+ * ------------------------------------------------------------------------- */
+Uint32
+Dbacc::placeWriteInLockQueue(OperationrecPtr lockOwnerPtr)
+{
+ OperationrecPtr lastOpPtr;
+ lastOpPtr.i = lockOwnerPtr.p->m_lo_last_parallel_op_ptr_i;
+ Uint32 opbits = operationRecPtr.p->m_op_bits;
+
+ if (lastOpPtr.i == RNIL)
+ {
+ lastOpPtr = lockOwnerPtr;
+ }
+ else
+ {
+ ptrCheckGuard(lastOpPtr, coprecsize, operationrec);
+ }
+
+ ndbassert(get_parallel_head(lastOpPtr) == lockOwnerPtr.i);
+
+ Uint32 lastbits = lastOpPtr.p->m_op_bits;
+ if (lastbits & Operationrec::OP_ACC_LOCK_MODE)
+ {
+ if(operationRecPtr.p->is_same_trans(lastOpPtr.p))
+ {
+ goto checkop;
+ }
+ }
+ else
+ {
+ /**
+ * We dont have an exclusive lock on operation and
+ *
+ */
jam();
- placeSerialQueueWrite(signal);
- return ZSERIAL_QUEUE;
- }//if
+
+ /**
+ * Scan parallell queue to see if we are the only one
+ */
+ OperationrecPtr loopPtr = lockOwnerPtr;
+ do
+ {
+ ptrCheckGuard(loopPtr, coprecsize, operationrec);
+ if (!loopPtr.p->is_same_trans(operationRecPtr.p))
+ {
+ goto serial;
+ }
+ loopPtr.i = loopPtr.p->nextParallelQue;
+ } while (loopPtr.i != RNIL);
+
+ goto checkop;
+ }
+serial:
+ jam();
+ placeSerialQueue(lockOwnerPtr, operationRecPtr);
+
+ validate_lock_queue(lockOwnerPtr);
+
+ return ZSERIAL_QUEUE;
+
+checkop:
/*
- WE ARE PERFORMING AN READ EXCLUSIVE, INSERT, UPDATE OR DELETE IN THE SAME
- TRANSACTION WHERE WE PREVIOUSLY HAVE EXECUTED AN OPERATION.
- Read-All, Update-All, Insert-All and Delete-Insert are allowed
- combinations.
- Delete-Read, Delete-Update and Delete-Delete are not an allowed
- combination and will result in tuple not found error.
+ WE ARE PERFORMING AN READ EXCLUSIVE, INSERT, UPDATE OR DELETE IN THE SAME
+ TRANSACTION WHERE WE PREVIOUSLY HAVE EXECUTED AN OPERATION.
+ Read-All, Update-All, Insert-All and Delete-Insert are allowed
+ combinations.
+ Delete-Read, Delete-Update and Delete-Delete are not an allowed
+ combination and will result in tuple not found error.
*/
- mlpqOperPtr = queOperPtr;
- moveLastParallelQueueWrite(signal);
+ Uint32 lstate = lastbits & Operationrec::OP_STATE_MASK;
- if (operationRecPtr.p->operation == ZINSERT &&
- mlpqOperPtr.p->operation != ZDELETE){
+ Uint32 retValue = ZSERIAL_QUEUE; // So that it gets blocked...
+ if (lstate == Operationrec::OP_STATE_EXECUTED)
+ {
jam();
- return ZWRITE_ERROR;
- }//if
- if(operationRecPtr.p->operation == ZWRITE)
- {
- operationRecPtr.p->operation =
- (mlpqOperPtr.p->operation == ZDELETE) ? ZINSERT : ZUPDATE;
+ /**
+ * Since last operation has executed...we can now check operation types
+ * if not, we have to wait until it has executed
+ */
+ Uint32 op = opbits & Operationrec::OP_MASK;
+ Uint32 lop = lastbits & Operationrec::OP_MASK;
+ if (op == ZINSERT && lop != ZDELETE)
+ {
+ jam();
+ return ZWRITE_ERROR;
+ }//if
+
+ /**
+ * NOTE. No checking op operation types, as one can read different save
+ * points...
+ */
+#if 0
+ if (lop == ZDELETE && (op != ZINSERT && op != ZWRITE))
+ {
+ jam();
+ return ZREAD_ERROR;
+ }
+#else
+ if (lop == ZDELETE && (op == ZUPDATE && op == ZDELETE))
+ {
+ jam();
+ return ZREAD_ERROR;
+ }
+#endif
+
+ if(op == ZWRITE)
+ {
+ opbits &= ~(Uint32)Operationrec::OP_MASK;
+ opbits |= (lop == ZDELETE) ? ZINSERT : ZUPDATE;
+ }
+
+ opbits |= Operationrec::OP_STATE_RUNNING;
+ operationRecPtr.p->localdata[0] = lastOpPtr.p->localdata[0];
+ operationRecPtr.p->localdata[1] = lastOpPtr.p->localdata[1];
+ retValue = ZPARALLEL_QUEUE;
}
- operationRecPtr.p->localdata[0] = queOperPtr.p->localdata[0];
- operationRecPtr.p->localdata[1] = queOperPtr.p->localdata[1];
- operationRecPtr.p->prevParallelQue = mlpqOperPtr.i;
- mlpqOperPtr.p->nextParallelQue = operationRecPtr.i;
- return ZPARALLEL_QUEUE;
+ opbits |= Operationrec::OP_RUN_QUEUE;
+ operationRecPtr.p->m_op_bits = opbits;
+ operationRecPtr.p->prevParallelQue = lastOpPtr.i;
+ operationRecPtr.p->m_lock_owner_ptr_i = lockOwnerPtr.i;
+ lastOpPtr.p->nextParallelQue = operationRecPtr.i;
+ lockOwnerPtr.p->m_lo_last_parallel_op_ptr_i = operationRecPtr.i;
+
+ validate_lock_queue(lockOwnerPtr);
+
+ return retValue;
}//Dbacc::placeWriteInLockQueue()
-/* --------------------------------------------------------------------------------- */
-/* WE HAVE TO PLACE IT SOMEWHERE IN THE SERIAL QUEUE INSTEAD. */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::placeSerialQueueWrite(Signal* signal)
+Uint32
+Dbacc::placeReadInLockQueue(OperationrecPtr lockOwnerPtr)
{
- readWriteOpPtr = queOperPtr;
-PSQW_LOOP:
- if (readWriteOpPtr.p->nextSerialQue == RNIL) {
+ OperationrecPtr lastOpPtr;
+ OperationrecPtr loopPtr = lockOwnerPtr;
+ lastOpPtr.i = lockOwnerPtr.p->m_lo_last_parallel_op_ptr_i;
+ Uint32 opbits = operationRecPtr.p->m_op_bits;
+
+ if (lastOpPtr.i == RNIL)
+ {
+ lastOpPtr = lockOwnerPtr;
+ }
+ else
+ {
+ ptrCheckGuard(lastOpPtr, coprecsize, operationrec);
+ }
+
+ ndbassert(get_parallel_head(lastOpPtr) == lockOwnerPtr.i);
+
+ /**
+ * Last operation in parallell queue of lock owner is same trans
+ * and ACC_LOCK_MODE is exlusive, then we can proceed
+ */
+ Uint32 lastbits = lastOpPtr.p->m_op_bits;
+ bool same = operationRecPtr.p->is_same_trans(lastOpPtr.p);
+ if (same && (lastbits & Operationrec::OP_ACC_LOCK_MODE))
+ {
jam();
- /* --------------------------------------------------------------------------------- */
- /* WE COULD NOT PUT IN ANY PARALLEL QUEUE. WE MUST PUT IT LAST IN THE SERIAL QUEUE. */
- /* --------------------------------------------------------------------------------- */
- readWriteOpPtr.p->nextSerialQue = operationRecPtr.i;
- operationRecPtr.p->prevSerialQue = readWriteOpPtr.i;
- return;
- }//if
- readWriteOpPtr.i = readWriteOpPtr.p->nextSerialQue;
- ptrCheckGuard(readWriteOpPtr, coprecsize, operationrec);
- if (getNoParallelTransaction(readWriteOpPtr.p) == 1) {
- /* --------------------------------------------------------------------------------- */
- /* THERE WAS ONLY ONE TRANSACTION INVOLVED IN THE PARALLEL QUEUE. IF THIS IS OUR */
- /* TRANSACTION WE CAN STILL GET HOLD OF THE LOCK. */
- /* --------------------------------------------------------------------------------- */
- if ((readWriteOpPtr.p->transId1 == operationRecPtr.p->transId1) &&
- (readWriteOpPtr.p->transId2 == operationRecPtr.p->transId2)) {
+ goto checkop;
+ }
+
+ if ((lastbits & Operationrec::OP_ACC_LOCK_MODE) && !same)
+ {
+ jam();
+ /**
+ * Last op in serial queue had X-lock and was not our transaction...
+ */
+ goto serial;
+ }
+
+ if (lockOwnerPtr.p->nextSerialQue == RNIL)
+ {
+ jam();
+ goto checkop;
+ }
+
+ /**
+ * Scan parallell queue to see if we are already there...
+ */
+ do
+ {
+ ptrCheckGuard(loopPtr, coprecsize, operationrec);
+ if (loopPtr.p->is_same_trans(operationRecPtr.p))
+ goto checkop;
+ loopPtr.i = loopPtr.p->nextParallelQue;
+ } while (loopPtr.i != RNIL);
+
+serial:
+ placeSerialQueue(lockOwnerPtr, operationRecPtr);
+
+ validate_lock_queue(lockOwnerPtr);
+
+ return ZSERIAL_QUEUE;
+
+checkop:
+ Uint32 lstate = lastbits & Operationrec::OP_STATE_MASK;
+
+ Uint32 retValue = ZSERIAL_QUEUE; // So that it gets blocked...
+ if (lstate == Operationrec::OP_STATE_EXECUTED)
+ {
+ jam();
+
+ /**
+ * NOTE. No checking op operation types, as one can read different save
+ * points...
+ */
+
+#if 0
+ /**
+ * Since last operation has executed...we can now check operation types
+ * if not, we have to wait until it has executed
+ */
+ if (lop == ZDELETE)
+ {
jam();
- /* --------------------------------------------------------------------------------- */
- /* WE ARE PERFORMING AN UPDATE OR DELETE IN THE SAME TRANSACTION WHERE WE ALREADY */
- /* PREVIOUSLY HAVE EXECUTED AN OPERATION. INSERT-DELETE, READ-UPDATE, READ-READ, */
- /* UPDATE-UPDATE, UPDATE-DELETE, READ-DELETE, INSERT-READ, INSERT-UPDATE ARE ALLOWED */
- /* COMBINATIONS. A NEW INSERT AFTER A DELETE IS NOT ALLOWED AND SUCH AN INSERT WILL */
- /* GO TO THE SERIAL LOCK QUEUE WHICH IT WILL NOT LEAVE UNTIL A TIME-OUT AND THE */
- /* TRANSACTION IS ABORTED. READS AND UPDATES AFTER DELETES IS ALSO NOT ALLOWED. */
- /* --------------------------------------------------------------------------------- */
- mlpqOperPtr = readWriteOpPtr;
- moveLastParallelQueueWrite(signal);
- readWriteOpPtr = mlpqOperPtr;
- operationRecPtr.p->prevParallelQue = readWriteOpPtr.i;
- readWriteOpPtr.p->nextParallelQue = operationRecPtr.i;
- operationRecPtr.p->localdata[0] = readWriteOpPtr.p->localdata[0];
- operationRecPtr.p->localdata[1] = readWriteOpPtr.p->localdata[1];
- return;
- }//if
- }//if
- goto PSQW_LOOP;
-}//Dbacc::placeSerialQueueWrite()
+ return ZREAD_ERROR;
+ }
+#endif
+
+ opbits |= Operationrec::OP_STATE_RUNNING;
+ operationRecPtr.p->localdata[0] = lastOpPtr.p->localdata[0];
+ operationRecPtr.p->localdata[1] = lastOpPtr.p->localdata[1];
+ retValue = ZPARALLEL_QUEUE;
+ }
+ opbits |= (lastbits & Operationrec::OP_ACC_LOCK_MODE);
+ opbits |= Operationrec::OP_RUN_QUEUE;
+ operationRecPtr.p->m_op_bits = opbits;
+
+ operationRecPtr.p->prevParallelQue = lastOpPtr.i;
+ operationRecPtr.p->m_lock_owner_ptr_i = lockOwnerPtr.i;
+ lastOpPtr.p->nextParallelQue = operationRecPtr.i;
+ lockOwnerPtr.p->m_lo_last_parallel_op_ptr_i = operationRecPtr.i;
+
+ validate_lock_queue(lockOwnerPtr);
+
+ return retValue;
+}//Dbacc::placeReadInLockQueue
+
+void Dbacc::placeSerialQueue(OperationrecPtr lockOwnerPtr,
+ OperationrecPtr opPtr)
+{
+ OperationrecPtr lastOpPtr;
+ lastOpPtr.i = lockOwnerPtr.p->m_lo_last_serial_op_ptr_i;
+
+ if (lastOpPtr.i == RNIL)
+ {
+ // Lock owner is last...
+ ndbrequire(lockOwnerPtr.p->nextSerialQue == RNIL);
+ lastOpPtr = lockOwnerPtr;
+ }
+ else
+ {
+ ptrCheckGuard(lastOpPtr, coprecsize, operationrec);
+ }
+
+ operationRecPtr.p->prevSerialQue = lastOpPtr.i;
+ lastOpPtr.p->nextSerialQue = opPtr.i;
+ lockOwnerPtr.p->m_lo_last_serial_op_ptr_i = opPtr.i;
+}
/* ------------------------------------------------------------------------- */
/* ACC KEYREQ END */
/* ------------------------------------------------------------------------- */
void Dbacc::acckeyref1Lab(Signal* signal, Uint32 result_code)
{
- operationRecPtr.p->transactionstate = WAIT_COMMIT_ABORT;
+ operationRecPtr.p->m_op_bits = ~0;
/* ************************<< */
/* ACCKEYREF */
/* ************************<< */
@@ -1639,15 +2241,15 @@ void Dbacc::acckeyref1Lab(Signal* signal, Uint32 result_code)
return;
}//Dbacc::acckeyref1Lab()
-/* ******************--------------------------------------------------------------- */
-/* ACCMINUPDATE UPDATE LOCAL KEY REQ */
-/* DESCRIPTION: UPDATES LOCAL KEY OF AN ELEMENTS IN THE HASH TABLE */
-/* THIS SIGNAL IS WAITED AFTER ANY INSERT REQ */
-/* ENTER ACCMINUPDATE WITH SENDER: LQH, LEVEL B */
-/* OPERATION_REC_PTR, OPERATION RECORD PTR */
-/* CLOCALKEY(0), LOCAL KEY 1 */
-/* CLOCALKEY(1) LOCAL KEY 2 */
-/* ******************--------------------------------------------------------------- */
+/* ******************----------------------------------------------------- */
+/* ACCMINUPDATE UPDATE LOCAL KEY REQ */
+/* DESCRIPTION: UPDATES LOCAL KEY OF AN ELEMENTS IN THE HASH TABLE */
+/* THIS SIGNAL IS WAITED AFTER ANY INSERT REQ */
+/* ENTER ACCMINUPDATE WITH SENDER: LQH, LEVEL B */
+/* OPERATION_REC_PTR, OPERATION RECORD PTR */
+/* CLOCALKEY(0), LOCAL KEY 1 */
+/* CLOCALKEY(1) LOCAL KEY 2 */
+/* ******************----------------------------------------------------- */
void Dbacc::execACCMINUPDATE(Signal* signal)
{
Page8Ptr ulkPageidptr;
@@ -1660,19 +2262,26 @@ void Dbacc::execACCMINUPDATE(Signal* signal)
tlocalkey1 = signal->theData[1];
tlocalkey2 = signal->theData[2];
ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
- if (operationRecPtr.p->transactionstate == ACTIVE) {
- fragrecptr.i = operationRecPtr.p->fragptr;
- ulkPageidptr.i = operationRecPtr.p->elementPage;
- tulkLocalPtr = operationRecPtr.p->elementPointer + operationRecPtr.p->elementIsforward;
+ Uint32 opbits = operationRecPtr.p->m_op_bits;
+ fragrecptr.i = operationRecPtr.p->fragptr;
+ ulkPageidptr.i = operationRecPtr.p->elementPage;
+ tulkLocalPtr = operationRecPtr.p->elementPointer +
+ operationRecPtr.p->elementIsforward;
+
+ if ((opbits & Operationrec::OP_STATE_MASK) == Operationrec::OP_STATE_RUNNING)
+ {
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
ptrCheckGuard(ulkPageidptr, cpagesize, page8);
dbgWord32(ulkPageidptr, tulkLocalPtr, tlocalkey1);
arrGuard(tulkLocalPtr, 2048);
ulkPageidptr.p->word32[tulkLocalPtr] = tlocalkey1;
operationRecPtr.p->localdata[0] = tlocalkey1;
- if (fragrecptr.p->localkeylen == 1) {
+ if (likely(fragrecptr.p->localkeylen == 1))
+ {
return;
- } else if (fragrecptr.p->localkeylen == 2) {
+ }
+ else if (fragrecptr.p->localkeylen == 2)
+ {
jam();
tulkLocalPtr = tulkLocalPtr + operationRecPtr.p->elementIsforward;
operationRecPtr.p->localdata[1] = tlocalkey2;
@@ -1696,15 +2305,17 @@ void Dbacc::execACC_COMMITREQ(Signal* signal)
{
Uint8 Toperation;
jamEntry();
- operationRecPtr.i = signal->theData[0];
+ Uint32 tmp = operationRecPtr.i = signal->theData[0];
ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
- ndbrequire(operationRecPtr.p->transactionstate == ACTIVE);
+ void* ptr = operationRecPtr.p;
+ Uint32 opbits = operationRecPtr.p->m_op_bits;
fragrecptr.i = operationRecPtr.p->fragptr;
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
+ Toperation = opbits & Operationrec::OP_MASK;
commitOperation(signal);
- Toperation = operationRecPtr.p->operation;
- operationRecPtr.p->transactionstate = IDLE;
- operationRecPtr.p->operation = ZUNDEFINED_OP;
+ ndbassert(operationRecPtr.i == tmp);
+ ndbassert(operationRecPtr.p == ptr);
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
if(Toperation != ZREAD){
fragrecptr.p->m_commit_count++;
if (Toperation != ZINSERT) {
@@ -1713,7 +2324,7 @@ void Dbacc::execACC_COMMITREQ(Signal* signal)
} else {
jam();
fragrecptr.p->noOfElements--;
- fragrecptr.p->slack += operationRecPtr.p->insertDeleteLen;
+ fragrecptr.p->slack += fragrecptr.p->elementLength;
if (fragrecptr.p->slack > fragrecptr.p->slackCheck) {
/* TIME FOR JOIN BUCKETS PROCESS */
if (fragrecptr.p->expandCounter > 0) {
@@ -1732,7 +2343,7 @@ void Dbacc::execACC_COMMITREQ(Signal* signal)
} else {
jam(); /* EXPAND PROCESS HANDLING */
fragrecptr.p->noOfElements++;
- fragrecptr.p->slack -= operationRecPtr.p->insertDeleteLen;
+ fragrecptr.p->slack -= fragrecptr.p->elementLength;
if (fragrecptr.p->slack >= (1u << 31)) {
/* IT MEANS THAT IF SLACK < ZERO */
if (fragrecptr.p->expandFlag == 0) {
@@ -1749,45 +2360,57 @@ void Dbacc::execACC_COMMITREQ(Signal* signal)
return;
}//Dbacc::execACC_COMMITREQ()
-/* ******************--------------------------------------------------------------- */
-/* ACC ABORT REQ ABORT ALL OPERATION OF THE TRANSACTION */
-/* ******************------------------------------+ */
-/* SENDER: LQH, LEVEL B */
-/* ******************--------------------------------------------------------------- */
-/* ACC ABORT REQ ABORT TRANSACTION */
-/* ******************------------------------------+ */
+/* ******************------------------------------------------------------- */
+/* ACC ABORT REQ ABORT ALL OPERATION OF THE TRANSACTION */
+/* ******************------------------------------+ */
+/* SENDER: LQH, LEVEL B */
+/* ******************------------------------------------------------------- */
+/* ACC ABORT REQ ABORT TRANSACTION */
+/* ******************------------------------------+ */
/* SENDER: LQH, LEVEL B */
void Dbacc::execACC_ABORTREQ(Signal* signal)
{
jamEntry();
- accAbortReqLab(signal);
-}//Dbacc::execACC_ABORTREQ()
-
-void Dbacc::accAbortReqLab(Signal* signal)
-{
operationRecPtr.i = signal->theData[0];
- bool sendConf = signal->theData[1];
+ Uint32 sendConf = signal->theData[1];
ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
+ fragrecptr.i = operationRecPtr.p->fragptr;
+ Uint32 opbits = operationRecPtr.p->m_op_bits;
+ Uint32 opstate = opbits & Operationrec::OP_STATE_MASK;
tresult = 0; /* ZFALSE */
- if ((operationRecPtr.p->transactionstate == ACTIVE) ||
- (operationRecPtr.p->transactionstate == WAIT_COMMIT_ABORT)) {
+
+ if (opbits == Operationrec::OP_EXECUTED_DIRTY_READ)
+ {
+ jam();
+ }
+ else if (opstate == Operationrec::OP_STATE_EXECUTED ||
+ opstate == Operationrec::OP_STATE_WAITING ||
+ opstate == Operationrec::OP_STATE_RUNNING)
+ {
jam();
- fragrecptr.i = operationRecPtr.p->fragptr;
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- operationRecPtr.p->transactionstate = ABORT;
abortOperation(signal);
- } else {
- ndbrequire(operationRecPtr.p->transactionstate == IDLE);
- jam();
- }//if
- operationRecPtr.p->transactionstate = IDLE;
- operationRecPtr.p->operation = ZUNDEFINED_OP;
- if (! sendConf)
- return;
+ }
+
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
+
signal->theData[0] = operationRecPtr.p->userptr;
- sendSignal(operationRecPtr.p->userblockref, GSN_ACC_ABORTCONF, signal, 1, JBB);
- return;
-}//Dbacc::accAbortReqLab()
+ signal->theData[1] = 0;
+ switch(sendConf){
+ case 0:
+ return;
+ case 2:
+ if (opstate != Operationrec::OP_STATE_RUNNING)
+ {
+ return;
+ }
+ case 1:
+ sendSignal(operationRecPtr.p->userblockref, GSN_ACC_ABORTCONF,
+ signal, 1, JBB);
+ }
+
+ signal->theData[1] = RNIL;
+}
/*
* Lock or unlock tuple.
@@ -1828,8 +2451,7 @@ void Dbacc::execACC_LOCKREQ(Signal* signal)
// init as in ACCSEIZEREQ
operationRecPtr.p->userptr = req->userPtr;
operationRecPtr.p->userblockref = req->userRef;
- operationRecPtr.p->operation = ZUNDEFINED_OP;
- operationRecPtr.p->transactionstate = IDLE;
+ operationRecPtr.p->m_op_bits = ~0;
operationRecPtr.p->scanRecPtr = RNIL;
// do read with lock via ACCKEYREQ
Uint32 lockMode = (lockOp == AccLockReq::LockShared) ? 0 : 1;
@@ -1880,8 +2502,8 @@ void Dbacc::execACC_LOCKREQ(Signal* signal)
jam();
// do abort via ACC_ABORTREQ (immediate)
signal->theData[0] = req->accOpPtr;
- signal->theData[1] = false; // Dont send abort
- accAbortReqLab(signal);
+ signal->theData[1] = 0; // Dont send abort
+ execACC_ABORTREQ(signal);
releaseOpRec(signal);
req->returnCode = AccLockReq::Success;
*sig = *req;
@@ -1891,8 +2513,8 @@ void Dbacc::execACC_LOCKREQ(Signal* signal)
jam();
// do abort via ACC_ABORTREQ (with conf signal)
signal->theData[0] = req->accOpPtr;
- signal->theData[1] = true; // send abort
- accAbortReqLab(signal);
+ signal->theData[1] = 1; // send abort
+ execACC_ABORTREQ(signal);
releaseOpRec(signal);
req->returnCode = AccLockReq::Success;
*sig = *req;
@@ -2551,16 +3173,29 @@ void Dbacc::getdirindex(Signal* signal)
}//Dbacc::getdirindex()
Uint32
-Dbacc::readTablePk(Uint32 localkey1)
+Dbacc::readTablePk(Uint32 localkey1, Uint32 eh, const Operationrec* op)
{
+ int ret;
Uint32 tableId = fragrecptr.p->myTableId;
Uint32 fragId = fragrecptr.p->myfid;
- Uint32 fragPageId = localkey1 >> MAX_TUPLES_BITS;
- Uint32 pageIndex = localkey1 & ((1 << MAX_TUPLES_BITS ) - 1);
+ bool xfrm = fragrecptr.p->hasCharAttr;
+
#ifdef VM_TRACE
memset(ckeys, 0x1f, (fragrecptr.p->keyLength * MAX_XFRM_MULTIPLY) << 2);
#endif
- int ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, ckeys, true);
+
+ if (likely(localkey1 != ~(Uint32)0))
+ {
+ Uint32 fragPageId = localkey1 >> MAX_TUPLES_BITS;
+ Uint32 pageIndex = localkey1 & ((1 << MAX_TUPLES_BITS ) - 1);
+ ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex,
+ ckeys, true);
+ }
+ else
+ {
+ ndbrequire(ElementHeader::getLocked(eh));
+ ret = c_lqh->readPrimaryKeys(op->userptr, ckeys, xfrm);
+ }
jamEntry();
ndbrequire(ret >= 0);
return ret;
@@ -2594,11 +3229,12 @@ void ndb_acc_ia64_icc810_dummy_func()
#endif
#endif
-void Dbacc::getElement(Signal* signal)
+Uint32
+Dbacc::getElement(Signal* signal, OperationrecPtr& lockOwnerPtr)
{
+ Uint32 errcode;
DirRangePtr geOverflowrangeptr;
DirectoryarrayPtr geOverflowDirptr;
- OperationrecPtr geTmpOperationRecPtr;
Uint32 tgeElementHeader;
Uint32 tgeElemStep;
Uint32 tgeContainerhead;
@@ -2613,7 +3249,6 @@ void Dbacc::getElement(Signal* signal)
getdirindex(signal);
tgePageindex = tgdiPageindex;
gePageptr = gdiPageptr;
- tgeResult = ZFALSE;
/*
* The value seached is
* - table key for ACCKEYREQ, stored in TUP
@@ -2623,7 +3258,6 @@ void Dbacc::getElement(Signal* signal)
ndbrequire(TelemLen == ZELEM_HEAD_SIZE + fragrecptr.p->localkeylen);
tgeNextptrtype = ZLEFT;
- tgeLocked = 0;
const Uint32 tmp = fragrecptr.p->k + fragrecptr.p->lhfragbits;
const Uint32 opHashValuePart = (operationRecPtr.p->hashValue >> tmp) &0xFFFF;
@@ -2636,9 +3270,17 @@ void Dbacc::getElement(Signal* signal)
tgeKeyptr = (tgeElementptr + ZELEM_HEAD_SIZE) + fragrecptr.p->localkeylen;
tgeElemStep = TelemLen;
tgeForward = 1;
- if (tgeContainerptr >= 2048) { ACCKEY_error(4); return;}
+ if (unlikely(tgeContainerptr >= 2048))
+ {
+ errcode = 4;
+ goto error;
+ }
tgeRemLen = gePageptr.p->word32[tgeContainerptr] >> 26;
- if ((tgeContainerptr + tgeRemLen - 1) >= 2048) { ACCKEY_error(5); return;}
+ if (unlikely(((tgeContainerptr + tgeRemLen - 1) >= 2048)))
+ {
+ errcode = 5;
+ goto error;
+ }
} else if (tgeNextptrtype == ZRIGHT) {
jam();
tgeContainerptr = tgeContainerptr + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
@@ -2646,29 +3288,42 @@ void Dbacc::getElement(Signal* signal)
tgeKeyptr = (tgeElementptr - ZELEM_HEAD_SIZE) - fragrecptr.p->localkeylen;
tgeElemStep = 0 - TelemLen;
tgeForward = (Uint32)-1;
- if (tgeContainerptr >= 2048) { ACCKEY_error(4); return;}
+ if (unlikely(tgeContainerptr >= 2048))
+ {
+ errcode = 4;
+ goto error;
+ }
tgeRemLen = gePageptr.p->word32[tgeContainerptr] >> 26;
- if ((tgeContainerptr - tgeRemLen) >= 2048) { ACCKEY_error(5); return;}
+ if (unlikely((tgeContainerptr - tgeRemLen) >= 2048))
+ {
+ errcode = 5;
+ goto error;
+ }
} else {
- ACCKEY_error(6); return;
+ errcode = 6;
+ goto error;
}//if
if (tgeRemLen >= ZCON_HEAD_SIZE + TelemLen) {
- if (tgeRemLen > ZBUF_SIZE) {
- ACCKEY_error(7); return;
+ if (unlikely(tgeRemLen > ZBUF_SIZE))
+ {
+ errcode = 7;
+ goto error;
}//if
- /* --------------------------------------------------------------------------------- */
- // There is at least one element in this container. Check if it is the element
- // searched for.
- /* --------------------------------------------------------------------------------- */
+ /* ------------------------------------------------------------------- */
+ // There is at least one element in this container.
+ // Check if it is the element searched for.
+ /* ------------------------------------------------------------------- */
do {
tgeElementHeader = gePageptr.p->word32[tgeElementptr];
tgeRemLen = tgeRemLen - TelemLen;
Uint32 hashValuePart;
+ lockOwnerPtr.i = RNIL;
+ lockOwnerPtr.p = NULL;
if (ElementHeader::getLocked(tgeElementHeader)) {
jam();
- geTmpOperationRecPtr.i = ElementHeader::getOpPtrI(tgeElementHeader);
- ptrCheckGuard(geTmpOperationRecPtr, coprecsize, operationrec);
- hashValuePart = geTmpOperationRecPtr.p->hashvaluePart;
+ lockOwnerPtr.i = ElementHeader::getOpPtrI(tgeElementHeader);
+ ptrCheckGuard(lockOwnerPtr, coprecsize, operationrec);
+ hashValuePart = lockOwnerPtr.p->hashvaluePart;
} else {
jam();
hashValuePart = ElementHeader::getHashValuePart(tgeElementHeader);
@@ -2678,21 +3333,21 @@ void Dbacc::getElement(Signal* signal)
Uint32 localkey1 = gePageptr.p->word32[tgeElementptr + tgeForward];
Uint32 localkey2 = 0;
bool found;
- if (! searchLocalKey) {
- Uint32 len = readTablePk(localkey1);
+ if (! searchLocalKey)
+ {
+ Uint32 len = readTablePk(localkey1, tgeElementptr, lockOwnerPtr.p);
found = (len == operationRecPtr.p->xfrmtupkeylen) &&
(memcmp(Tkeydata, ckeys, len << 2) == 0);
} else {
jam();
found = (localkey1 == Tkeydata[0]);
}
- if (found) {
+ if (found)
+ {
jam();
- tgeLocked = ElementHeader::getLocked(tgeElementHeader);
- tgeResult = ZTRUE;
operationRecPtr.p->localdata[0] = localkey1;
operationRecPtr.p->localdata[1] = localkey2;
- return;
+ return ZTRUE;
}
}
if (tgeRemLen <= ZCON_HEAD_SIZE) {
@@ -2701,18 +3356,22 @@ void Dbacc::getElement(Signal* signal)
tgeElementptr = tgeElementptr + tgeElemStep;
} while (true);
}//if
- if (tgeRemLen != ZCON_HEAD_SIZE) {
- ACCKEY_error(8); return;
+ if (unlikely(tgeRemLen != ZCON_HEAD_SIZE))
+ {
+ errcode = 8;
+ goto error;
}//if
tgeContainerhead = gePageptr.p->word32[tgeContainerptr];
tgeNextptrtype = (tgeContainerhead >> 7) & 0x3;
if (tgeNextptrtype == 0) {
jam();
- return; /* NO MORE CONTAINER */
+ return ZFALSE; /* NO MORE CONTAINER */
}//if
tgePageindex = tgeContainerhead & 0x7f; /* NEXT CONTAINER PAGE INDEX 7 BITS */
- if (tgePageindex > ZEMPTYLIST) {
- ACCKEY_error(9); return;
+ if (unlikely(tgePageindex > ZEMPTYLIST))
+ {
+ errcode = 9;
+ goto error;
}//if
if (((tgeContainerhead >> 9) & 1) == ZFALSE) {
jam();
@@ -2726,55 +3385,72 @@ void Dbacc::getElement(Signal* signal)
ptrCheckGuard(gePageptr, cpagesize, page8);
}//if
} while (1);
- return;
+
+ return ZFALSE;
+
+error:
+ ACCKEY_error(errcode);
+ return ~0;
}//Dbacc::getElement()
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* */
-/* END OF GET_ELEMENT MODULE */
-/* */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* */
-/* MODULE: DELETE */
-/* */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* COMMITDELETE */
-/* INPUT: OPERATION_REC_PTR, PTR TO AN OPERATION RECORD. */
-/* FRAGRECPTR, PTR TO A FRAGMENT RECORD */
-/* */
-/* OUTPUT: */
-/* NONE */
-/* DESCRIPTION: DELETE OPERATIONS WILL BE COMPLETED AT THE COMMIT OF TRANSA- */
-/* CTION. THIS SUBROUTINE SEARCHS FOR ELEMENT AND DELETES IT. IT DOES SO BY */
-/* REPLACING IT WITH THE LAST ELEMENT IN THE BUCKET. IF THE DELETED ELEMENT */
-/* IS ALSO THE LAST ELEMENT THEN IT IS ONLY NECESSARY TO REMOVE THE ELEMENT. */
-/* --------------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* */
+/* END OF GET_ELEMENT MODULE */
+/* */
+/* ------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* */
+/* MODULE: DELETE */
+/* */
+/* ------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* COMMITDELETE */
+/* INPUT: OPERATION_REC_PTR, PTR TO AN OPERATION RECORD. */
+/* FRAGRECPTR, PTR TO A FRAGMENT RECORD */
+/* */
+/* OUTPUT: */
+/* NONE */
+/* DESCRIPTION: DELETE OPERATIONS WILL BE COMPLETED AT THE
+ * COMMIT OF TRANSACTION. THIS SUBROUTINE SEARCHS FOR ELEMENT AND
+ * DELETES IT. IT DOES SO BY REPLACING IT WITH THE LAST
+ * ELEMENT IN THE BUCKET. IF THE DELETED ELEMENT IS ALSO THE LAST
+ * ELEMENT THEN IT IS ONLY NECESSARY TO REMOVE THE ELEMENT
+ * ------------------------------------------------------------------------- */
+void
+Dbacc::report_dealloc(Signal* signal, const Operationrec* opPtrP)
+{
+ Uint32 localKey = opPtrP->localdata[0];
+ Uint32 opbits = opPtrP->m_op_bits;
+ Uint32 userptr= opPtrP->userptr;
+ Uint32 scanInd =
+ ((opbits & Operationrec::OP_MASK) == ZSCAN_OP) ||
+ (opbits & Operationrec::OP_LOCK_REQ);
+
+ if (localKey != ~(Uint32)0)
+ {
+ signal->theData[0] = fragrecptr.p->myfid;
+ signal->theData[1] = fragrecptr.p->myTableId;
+ Uint32 pageId = localKey >> MAX_TUPLES_BITS;
+ Uint32 pageIndex = localKey & ((1 << MAX_TUPLES_BITS) - 1);
+ signal->theData[2] = pageId;
+ signal->theData[3] = pageIndex;
+ signal->theData[4] = userptr;
+ signal->theData[5] = scanInd;
+ EXECUTE_DIRECT(DBLQH, GSN_TUP_DEALLOCREQ, signal, 6);
+ jamEntry();
+ }
+}
+
void Dbacc::commitdelete(Signal* signal)
{
jam();
- Uint32 localKey = operationRecPtr.p->localdata[0];
- Uint32 userptr= operationRecPtr.p->userptr;
- Uint32 scanInd = operationRecPtr.p->operation == ZSCAN_OP
- || operationRecPtr.p->isAccLockReq;
-
- signal->theData[0] = fragrecptr.p->myfid;
- signal->theData[1] = fragrecptr.p->myTableId;
- Uint32 pageId = localKey >> MAX_TUPLES_BITS;
- Uint32 pageIndex = localKey & ((1 << MAX_TUPLES_BITS) - 1);
- signal->theData[2] = pageId;
- signal->theData[3] = pageIndex;
- signal->theData[4] = userptr;
- signal->theData[5] = scanInd;
- EXECUTE_DIRECT(DBLQH, GSN_TUP_DEALLOCREQ, signal, 6);
- jamEntry();
+ report_dealloc(signal, operationRecPtr.p);
getdirindex(signal);
tlastPageindex = tgdiPageindex;
@@ -3263,31 +3939,316 @@ void Dbacc::checkoverfreelist(Signal* signal)
/*BE CHECKED. THE OPERATION RECORD WILL BE REMOVED FROM THE QUEUE IF IT */
/*BELONGED TO ANY ONE, OTHERWISE THE ELEMENT HEAD WILL BE UPDATED. */
/* ------------------------------------------------------------------------- */
+
+/**
+ *
+ * P0 - P1 - P2 - P3
+ * S0
+ * S1
+ * S2
+ */
+void
+Dbacc::abortParallelQueueOperation(Signal* signal, OperationrecPtr opPtr)
+{
+ OperationrecPtr nextP;
+ OperationrecPtr prevP;
+ OperationrecPtr loPtr;
+
+ Uint32 opbits = opPtr.p->m_op_bits;
+ Uint32 opstate = opbits & Operationrec::OP_STATE_MASK;
+ nextP.i = opPtr.p->nextParallelQue;
+ prevP.i = opPtr.p->prevParallelQue;
+ loPtr.i = opPtr.p->m_lock_owner_ptr_i;
+
+ ndbassert(! (opbits & Operationrec::OP_LOCK_OWNER));
+ ndbassert(opbits & Operationrec::OP_RUN_QUEUE);
+
+ ptrCheckGuard(prevP, coprecsize, operationrec);
+ ndbassert(prevP.p->nextParallelQue == opPtr.i);
+ prevP.p->nextParallelQue = nextP.i;
+
+ if (nextP.i != RNIL)
+ {
+ ptrCheckGuard(nextP, coprecsize, operationrec);
+ ndbassert(nextP.p->prevParallelQue == opPtr.i);
+ nextP.p->prevParallelQue = prevP.i;
+ }
+ else if (prevP.i != loPtr.i)
+ {
+ jam();
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+ ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
+ ndbassert(loPtr.p->m_lo_last_parallel_op_ptr_i == opPtr.i);
+ loPtr.p->m_lo_last_parallel_op_ptr_i = prevP.i;
+ prevP.p->m_lock_owner_ptr_i = loPtr.i;
+
+ /**
+ * Abort P3...check start next
+ */
+ startNext(signal, prevP);
+ validate_lock_queue(prevP);
+ return;
+ }
+ else
+ {
+ jam();
+ ndbassert(prevP.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
+ prevP.p->m_lo_last_parallel_op_ptr_i = RNIL;
+ startNext(signal, prevP);
+ validate_lock_queue(prevP);
+ return;
+ }
+
+ if (opbits & Operationrec::OP_LOCK_MODE)
+ {
+ Uint32 nextbits = nextP.p->m_op_bits;
+ while ((nextbits & Operationrec::OP_LOCK_MODE) == 0)
+ {
+ ndbassert(nextbits & Operationrec::OP_ACC_LOCK_MODE);
+ nextbits &= ~(Uint32)Operationrec::OP_ACC_LOCK_MODE;
+ nextP.p->m_op_bits = nextbits;
+
+ if (nextP.p->nextParallelQue != RNIL)
+ {
+ nextP.i = nextP.p->nextParallelQue;
+ ptrCheckGuard(nextP, coprecsize, operationrec);
+ nextbits = nextP.p->m_op_bits;
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Abort P1, P2
+ */
+
+ /**
+ * Scan to last of run queue
+ */
+ while (nextP.p->nextParallelQue != RNIL)
+ {
+ nextP.i = nextP.p->nextParallelQue;
+ ptrCheckGuard(nextP, coprecsize, operationrec);
+ }
+
+#ifdef VM_TRACE
+ loPtr.i = nextP.p->m_lock_owner_ptr_i;
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+ ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
+ ndbassert(loPtr.p->m_lo_last_parallel_op_ptr_i == nextP.i);
+#endif
+ startNext(signal, nextP);
+ validate_lock_queue(nextP);
+
+ return;
+}
+
+void
+Dbacc::abortSerieQueueOperation(Signal* signal, OperationrecPtr opPtr)
+{
+ OperationrecPtr prevS, nextS;
+ OperationrecPtr prevP, nextP;
+ OperationrecPtr loPtr;
+
+ Uint32 opbits = opPtr.p->m_op_bits;
+
+ prevS.i = opPtr.p->prevSerialQue;
+ nextS.i = opPtr.p->nextSerialQue;
+
+ prevP.i = opPtr.p->prevParallelQue;
+ nextP.i = opPtr.p->nextParallelQue;
+
+ ndbassert((opbits & Operationrec::OP_LOCK_OWNER) == 0);
+ ndbassert((opbits & Operationrec::OP_RUN_QUEUE) == 0);
+
+ if (prevP.i != RNIL)
+ {
+ /**
+ * We're not list head...
+ */
+ ptrCheckGuard(prevP, coprecsize, operationrec);
+ ndbassert(prevP.p->nextParallelQue == opPtr.i);
+ prevP.p->nextParallelQue = nextP.i;
+
+ if (nextP.i != RNIL)
+ {
+ ptrCheckGuard(nextP, coprecsize, operationrec);
+ ndbassert(nextP.p->prevParallelQue == opPtr.i);
+ ndbassert((nextP.p->m_op_bits & Operationrec::OP_STATE_MASK) ==
+ Operationrec::OP_STATE_WAITING);
+ nextP.p->prevParallelQue = prevP.i;
+
+ if ((prevP.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE) == 0 &&
+ opbits & Operationrec::OP_LOCK_MODE)
+ {
+ /**
+ * Scan right in parallel queue to fix OP_ACC_LOCK_MODE
+ */
+ while ((nextP.p->m_op_bits & Operationrec::OP_LOCK_MODE) == 0)
+ {
+ ndbassert(nextP.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE);
+ nextP.p->m_op_bits &= ~(Uint32)Operationrec::OP_ACC_LOCK_MODE;
+ nextP.i = nextP.p->nextParallelQue;
+ if (nextP.i == RNIL)
+ break;
+ ptrCheckGuard(nextP, coprecsize, operationrec);
+ }
+ }
+ }
+ validate_lock_queue(prevP);
+ return;
+ }
+ else
+ {
+ /**
+ * We're a list head
+ */
+ ptrCheckGuard(prevS, coprecsize, operationrec);
+ ndbassert(prevS.p->nextSerialQue == opPtr.i);
+
+ if (nextP.i != RNIL)
+ {
+ /**
+ * Promote nextP to list head
+ */
+ ptrCheckGuard(nextP, coprecsize, operationrec);
+ ndbassert(nextP.p->prevParallelQue == opPtr.i);
+ prevS.p->nextSerialQue = nextP.i;
+ nextP.p->prevParallelQue = RNIL;
+ nextP.p->nextSerialQue = nextS.i;
+ if (nextS.i != RNIL)
+ {
+ jam();
+ ptrCheckGuard(nextS, coprecsize, operationrec);
+ ndbassert(nextS.p->prevSerialQue == opPtr.i);
+ nextS.p->prevSerialQue = nextP.i;
+ validate_lock_queue(prevS);
+ return;
+ }
+ else
+ {
+ // nextS is RNIL, i.e we're last in serie queue...
+ // we must update lockOwner.m_lo_last_serial_op_ptr_i
+ loPtr = prevS;
+ while ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0)
+ {
+ loPtr.i = loPtr.p->prevSerialQue;
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+ }
+ ndbassert(loPtr.p->m_lo_last_serial_op_ptr_i == opPtr.i);
+ loPtr.p->m_lo_last_serial_op_ptr_i = nextP.i;
+ validate_lock_queue(loPtr);
+ return;
+ }
+ }
+
+ if (nextS.i == RNIL)
+ {
+ /**
+ * Abort S2
+ */
+
+ // nextS is RNIL, i.e we're last in serie queue...
+ // and we have no parallel queue,
+ // we must update lockOwner.m_lo_last_serial_op_ptr_i
+ prevS.p->nextSerialQue = RNIL;
+
+ loPtr = prevS;
+ while ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0)
+ {
+ loPtr.i = loPtr.p->prevSerialQue;
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+ }
+ ndbassert(loPtr.p->m_lo_last_serial_op_ptr_i == opPtr.i);
+ if (prevS.i != loPtr.i)
+ {
+ jam();
+ loPtr.p->m_lo_last_serial_op_ptr_i = prevS.i;
+ }
+ else
+ {
+ loPtr.p->m_lo_last_serial_op_ptr_i = RNIL;
+ }
+ validate_lock_queue(loPtr);
+ }
+ else if (nextP.i == RNIL)
+ {
+ ptrCheckGuard(nextS, coprecsize, operationrec);
+ ndbassert(nextS.p->prevSerialQue == opPtr.i);
+ prevS.p->nextSerialQue = nextS.i;
+ nextS.p->prevSerialQue = prevS.i;
+
+ if (prevS.p->m_op_bits & Operationrec::OP_LOCK_OWNER)
+ {
+ /**
+ * Abort S0
+ */
+ OperationrecPtr lastOp;
+ lastOp.i = prevS.p->m_lo_last_parallel_op_ptr_i;
+ if (lastOp.i != RNIL)
+ {
+ jam();
+ ptrCheckGuard(lastOp, coprecsize, operationrec);
+ ndbassert(lastOp.p->m_lock_owner_ptr_i = prevS.i);
+ }
+ else
+ {
+ jam();
+ lastOp = prevS;
+ }
+ startNext(signal, lastOp);
+ validate_lock_queue(lastOp);
+ }
+ else
+ {
+ validate_lock_queue(prevS);
+ }
+ }
+ }
+}
+
+
void Dbacc::abortOperation(Signal* signal)
{
- OperationrecPtr aboOperRecPtr;
- OperationrecPtr TaboOperRecPtr;
- Page8Ptr aboPageidptr;
- Uint32 taboElementptr;
- Uint32 tmp2Olq;
+ Uint32 opbits = operationRecPtr.p->m_op_bits;
+
+ validate_lock_queue(operationRecPtr);
- if (operationRecPtr.p->lockOwner == ZTRUE) {
+ if (opbits & Operationrec::OP_LOCK_OWNER)
+ {
takeOutLockOwnersList(signal, operationRecPtr);
- if (operationRecPtr.p->insertIsDone == ZTRUE) {
+ opbits &= ~(Uint32)Operationrec::OP_LOCK_OWNER;
+ if (opbits & Operationrec::OP_INSERT_IS_DONE)
+ {
jam();
- operationRecPtr.p->elementIsDisappeared = ZTRUE;
+ opbits |= Operationrec::OP_ELEMENT_DISAPPEARED;
}//if
- if ((operationRecPtr.p->nextParallelQue != RNIL) ||
- (operationRecPtr.p->nextSerialQue != RNIL)) {
+ operationRecPtr.p->m_op_bits = opbits;
+ const bool queue = (operationRecPtr.p->nextParallelQue != RNIL ||
+ operationRecPtr.p->nextSerialQue != RNIL);
+
+ if (queue)
+ {
jam();
- releaselock(signal);
- } else {
- /* --------------------------------------------------------------------------------- */
- /* WE ARE OWNER OF THE LOCK AND NO OTHER OPERATIONS ARE QUEUED. IF INSERT OR STANDBY */
- /* WE DELETE THE ELEMENT OTHERWISE WE REMOVE THE LOCK FROM THE ELEMENT. */
- /* --------------------------------------------------------------------------------- */
- if (operationRecPtr.p->elementIsDisappeared == ZFALSE) {
+ release_lockowner(signal, operationRecPtr, false);
+ }
+ else
+ {
+ /* -------------------------------------------------------------------
+ * WE ARE OWNER OF THE LOCK AND NO OTHER OPERATIONS ARE QUEUED.
+ * IF INSERT OR STANDBY WE DELETE THE ELEMENT OTHERWISE WE REMOVE
+ * THE LOCK FROM THE ELEMENT.
+ * ------------------------------------------------------------------ */
+ if ((opbits & Operationrec::OP_ELEMENT_DISAPPEARED) == 0)
+ {
jam();
+ Page8Ptr aboPageidptr;
+ Uint32 taboElementptr;
+ Uint32 tmp2Olq;
+
taboElementptr = operationRecPtr.p->elementPointer;
aboPageidptr.i = operationRecPtr.p->elementPage;
tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->hashvaluePart,
@@ -3297,87 +4258,31 @@ void Dbacc::abortOperation(Signal* signal)
arrGuard(taboElementptr, 2048);
aboPageidptr.p->word32[taboElementptr] = tmp2Olq;
return;
- } else {
+ }
+ else
+ {
jam();
commitdelete(signal);
}//if
}//if
- } else {
- /* --------------------------------------------------------------- */
- // We are not the lock owner.
- /* --------------------------------------------------------------- */
- jam();
- if (operationRecPtr.p->prevParallelQue != RNIL) {
- jam();
- /* ---------------------------------------------------------------------------------- */
- /* SINCE WE ARE NOT QUEUE LEADER WE NEED NOT CONSIDER IF THE ELEMENT IS TO BE DELETED.*/
- /* We will simply remove it from the parallel list without any other rearrangements. */
- /* ---------------------------------------------------------------------------------- */
- aboOperRecPtr.i = operationRecPtr.p->prevParallelQue;
- ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec);
- aboOperRecPtr.p->nextParallelQue = operationRecPtr.p->nextParallelQue;
- if (operationRecPtr.p->nextParallelQue != RNIL) {
- jam();
- aboOperRecPtr.i = operationRecPtr.p->nextParallelQue;
- ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec);
- aboOperRecPtr.p->prevParallelQue = operationRecPtr.p->prevParallelQue;
- }//if
- } else if (operationRecPtr.p->prevSerialQue != RNIL) {
- /* ------------------------------------------------------------------------- */
- // We are not in the parallel queue owning the lock. Thus we are in another parallel
- // queue longer down in the serial queue. We are however first since prevParallelQue
- // == RNIL.
- /* ------------------------------------------------------------------------- */
- if (operationRecPtr.p->nextParallelQue != RNIL) {
- jam();
- /* ------------------------------------------------------------------------- */
- // We have an operation in the queue after us. We simply rearrange this parallel queue.
- // The new leader of this parallel queue will be operation in the serial queue.
- /* ------------------------------------------------------------------------- */
- aboOperRecPtr.i = operationRecPtr.p->nextParallelQue;
- ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec);
- aboOperRecPtr.p->nextSerialQue = operationRecPtr.p->nextSerialQue;
- aboOperRecPtr.p->prevSerialQue = operationRecPtr.p->prevSerialQue;
- aboOperRecPtr.p->prevParallelQue = RNIL; // Queue Leader
- if (operationRecPtr.p->nextSerialQue != RNIL) {
- jam();
- TaboOperRecPtr.i = operationRecPtr.p->nextSerialQue;
- ptrCheckGuard(TaboOperRecPtr, coprecsize, operationrec);
- TaboOperRecPtr.p->prevSerialQue = aboOperRecPtr.i;
- }//if
- TaboOperRecPtr.i = operationRecPtr.p->prevSerialQue;
- ptrCheckGuard(TaboOperRecPtr, coprecsize, operationrec);
- TaboOperRecPtr.p->nextSerialQue = aboOperRecPtr.i;
- } else {
- jam();
- /* ------------------------------------------------------------------------- */
- // We are the only operation in this parallel queue. We will thus shrink the serial
- // queue.
- /* ------------------------------------------------------------------------- */
- aboOperRecPtr.i = operationRecPtr.p->prevSerialQue;
- ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec);
- aboOperRecPtr.p->nextSerialQue = operationRecPtr.p->nextSerialQue;
- if (operationRecPtr.p->nextSerialQue != RNIL) {
- jam();
- aboOperRecPtr.i = operationRecPtr.p->nextSerialQue;
- ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec);
- aboOperRecPtr.p->prevSerialQue = operationRecPtr.p->prevSerialQue;
- }//if
- }//if
- }//if
- }//if
- /* ------------------------------------------------------------------------- */
- // If prevParallelQue = RNIL and prevSerialQue = RNIL and we are not owner of the
- // lock then we cannot be in any lock queue at all.
- /* ------------------------------------------------------------------------- */
-}//Dbacc::abortOperation()
+ }
+ else if (opbits & Operationrec::OP_RUN_QUEUE)
+ {
+ abortParallelQueueOperation(signal, operationRecPtr);
+ }
+ else
+ {
+ abortSerieQueueOperation(signal, operationRecPtr);
+ }
+}
-void Dbacc::commitDeleteCheck()
+void
+Dbacc::commitDeleteCheck()
{
OperationrecPtr opPtr;
OperationrecPtr lastOpPtr;
OperationrecPtr deleteOpPtr;
- bool elementDeleted = false;
+ Uint32 elementDeleted = 0;
bool deleteCheckOngoing = true;
Uint32 hashValue = 0;
lastOpPtr = operationRecPtr;
@@ -3390,7 +4295,9 @@ void Dbacc::commitDeleteCheck()
}//while
deleteOpPtr = lastOpPtr;
do {
- if (deleteOpPtr.p->operation == ZDELETE) {
+ Uint32 opbits = deleteOpPtr.p->m_op_bits;
+ Uint32 op = opbits & Operationrec::OP_MASK;
+ if (op == ZDELETE) {
jam();
/* -------------------------------------------------------------------
* IF THE CURRENT OPERATION TO BE COMMITTED IS A DELETE OPERATION DUE TO
@@ -3404,10 +4311,9 @@ void Dbacc::commitDeleteCheck()
* DELETE-OPERATION THAT HAS A HASH VALUE.
* ----------------------------------------------------------------- */
hashValue = deleteOpPtr.p->hashValue;
- elementDeleted = true;
+ elementDeleted = Operationrec::OP_ELEMENT_DISAPPEARED;
deleteCheckOngoing = false;
- } else if ((deleteOpPtr.p->operation == ZREAD) ||
- (deleteOpPtr.p->operation == ZSCAN_OP)) {
+ } else if (op == ZREAD || op == ZSCAN_OP) {
/* -------------------------------------------------------------------
* We are trying to find out whether the commit will in the end delete
* the tuple. Normally the delete will be the last operation in the
@@ -3418,7 +4324,7 @@ void Dbacc::commitDeleteCheck()
* we have to continue scanning the list looking for a delete operation.
*/
deleteOpPtr.i = deleteOpPtr.p->prevParallelQue;
- if (deleteOpPtr.i == RNIL) {
+ if (opbits & Operationrec::OP_LOCK_OWNER) {
jam();
deleteCheckOngoing = false;
} else {
@@ -3437,14 +4343,14 @@ void Dbacc::commitDeleteCheck()
opPtr = lastOpPtr;
do {
jam();
- opPtr.p->commitDeleteCheckFlag = ZTRUE;
+ opPtr.p->m_op_bits |= Operationrec::OP_COMMIT_DELETE_CHECK;
if (elementDeleted) {
jam();
- opPtr.p->elementIsDisappeared = ZTRUE;
+ opPtr.p->m_op_bits |= elementDeleted;
opPtr.p->hashValue = hashValue;
}//if
opPtr.i = opPtr.p->prevParallelQue;
- if (opPtr.i == RNIL) {
+ if (opPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) {
jam();
break;
}//if
@@ -3460,14 +4366,13 @@ void Dbacc::commitDeleteCheck()
/* ------------------------------------------------------------------------- */
void Dbacc::commitOperation(Signal* signal)
{
- OperationrecPtr tolqTmpPtr;
- Page8Ptr coPageidptr;
- Uint32 tcoElementptr;
- Uint32 tmp2Olq;
+ validate_lock_queue(operationRecPtr);
- if ((operationRecPtr.p->commitDeleteCheckFlag == ZFALSE) &&
- (operationRecPtr.p->operation != ZSCAN_OP) &&
- (operationRecPtr.p->operation != ZREAD)) {
+ Uint32 opbits = operationRecPtr.p->m_op_bits;
+ Uint32 op = opbits & Operationrec::OP_MASK;
+ ndbrequire((opbits & Operationrec::OP_STATE_MASK) == Operationrec::OP_STATE_EXECUTED);
+ if ((opbits & Operationrec::OP_COMMIT_DELETE_CHECK) == 0 && (op != ZREAD))
+ {
jam();
/* This method is used to check whether the end result of the transaction
will be to delete the tuple. In this case all operation will be marked
@@ -3479,16 +4384,30 @@ void Dbacc::commitOperation(Signal* signal)
lock is released.
*/
commitDeleteCheck();
+ opbits = operationRecPtr.p->m_op_bits;
}//if
- if (operationRecPtr.p->lockOwner == ZTRUE) {
+
+ ndbassert(opbits & Operationrec::OP_RUN_QUEUE);
+
+ if (opbits & Operationrec::OP_LOCK_OWNER)
+ {
takeOutLockOwnersList(signal, operationRecPtr);
- if ((operationRecPtr.p->nextParallelQue == RNIL) &&
- (operationRecPtr.p->nextSerialQue == RNIL) &&
- (operationRecPtr.p->elementIsDisappeared == ZFALSE)) {
+ opbits &= ~(Uint32)Operationrec::OP_LOCK_OWNER;
+ operationRecPtr.p->m_op_bits = opbits;
+
+ const bool queue = (operationRecPtr.p->nextParallelQue != RNIL ||
+ operationRecPtr.p->nextSerialQue != RNIL);
+
+ if (!queue && (opbits & Operationrec::OP_ELEMENT_DISAPPEARED) == 0)
+ {
/*
- This is the normal path through the commit for operations owning the
- lock without any queues and not a delete operation.
- */
+ * This is the normal path through the commit for operations owning the
+ * lock without any queues and not a delete operation.
+ */
+ Page8Ptr coPageidptr;
+ Uint32 tcoElementptr;
+ Uint32 tmp2Olq;
+
coPageidptr.i = operationRecPtr.p->elementPage;
tcoElementptr = operationRecPtr.p->elementPointer;
tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->hashvaluePart,
@@ -3498,445 +4417,421 @@ void Dbacc::commitOperation(Signal* signal)
arrGuard(tcoElementptr, 2048);
coPageidptr.p->word32[tcoElementptr] = tmp2Olq;
return;
- } else if ((operationRecPtr.p->nextParallelQue != RNIL) ||
- (operationRecPtr.p->nextSerialQue != RNIL)) {
+ }
+ else if (queue)
+ {
jam();
/*
- The case when there is a queue lined up.
- Release the lock and pass it to the next operation lined up.
- */
- releaselock(signal);
+ * The case when there is a queue lined up.
+ * Release the lock and pass it to the next operation lined up.
+ */
+ release_lockowner(signal, operationRecPtr, true);
return;
- } else {
+ }
+ else
+ {
jam();
/*
- No queue and elementIsDisappeared is true. We perform the actual delete
- operation.
- */
+ * No queue and elementIsDisappeared is true.
+ * We perform the actual delete operation.
+ */
commitdelete(signal);
return;
}//if
- } else {
- /*
- THE OPERATION DOES NOT OWN THE LOCK. IT MUST BE IN A LOCK QUEUE OF THE
- ELEMENT.
- */
- ndbrequire(operationRecPtr.p->prevParallelQue != RNIL);
+ }
+ else
+ {
+ /**
+ * THE OPERATION DOES NOT OWN THE LOCK. IT MUST BE IN A LOCK QUEUE OF THE
+ * ELEMENT.
+ */
jam();
- tolqTmpPtr.i = operationRecPtr.p->prevParallelQue;
- ptrCheckGuard(tolqTmpPtr, coprecsize, operationrec);
- tolqTmpPtr.p->nextParallelQue = operationRecPtr.p->nextParallelQue;
- if (operationRecPtr.p->nextParallelQue != RNIL) {
+ OperationrecPtr prev, next, lockOwner;
+ prev.i = operationRecPtr.p->prevParallelQue;
+ next.i = operationRecPtr.p->nextParallelQue;
+ lockOwner.i = operationRecPtr.p->m_lock_owner_ptr_i;
+ ptrCheckGuard(prev, coprecsize, operationrec);
+
+ prev.p->nextParallelQue = next.i;
+ if (next.i != RNIL)
+ {
jam();
- tolqTmpPtr.i = operationRecPtr.p->nextParallelQue;
- ptrCheckGuard(tolqTmpPtr, coprecsize, operationrec);
- tolqTmpPtr.p->prevParallelQue = operationRecPtr.p->prevParallelQue;
- }//if
-
+ ptrCheckGuard(next, coprecsize, operationrec);
+ next.p->prevParallelQue = prev.i;
+ }
+ else if (prev.p->m_op_bits & Operationrec::OP_LOCK_OWNER)
+ {
+ jam();
+ ndbassert(lockOwner.i == prev.i);
+ prev.p->m_lo_last_parallel_op_ptr_i = RNIL;
+ next = prev;
+ }
+ else
+ {
+ jam();
+ /**
+ * Last operation in parallell queue
+ */
+ ndbassert(prev.i != lockOwner.i);
+ ptrCheckGuard(lockOwner, coprecsize, operationrec);
+ ndbassert(lockOwner.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
+ lockOwner.p->m_lo_last_parallel_op_ptr_i = prev.i;
+ prev.p->m_lock_owner_ptr_i = lockOwner.i;
+ next = prev;
+ }
+
/**
* Check possible lock upgrade
- * 1) Find lock owner
- * 2) Count transactions in parallel que
- * 3) If count == 1 and TRANSID(next serial) == TRANSID(lock owner)
- * upgrade next serial
*/
- if(operationRecPtr.p->lockMode)
+ if(opbits & Operationrec::OP_ACC_LOCK_MODE)
{
jam();
+
/**
- * Committing a non shared operation can't lead to lock upgrade
+ * Not lock owner...committing a exclusive operation...
+ *
+ * e.g
+ * T1(R) T1(X)
+ * T2(R/X)
+ *
+ * If T1(X) commits T2(R/X) is not supposed to run
+ * as T1(R) should also commit
+ *
+ * e.g
+ * T1(R) T1(X) T1*(R)
+ * T2(R/X)
+ *
+ * If T1*(R) commits T2(R/X) is not supposed to run
+ * as T1(R),T2(x) should also commit
*/
+ validate_lock_queue(prev);
return;
}
-
- OperationrecPtr lock_owner;
- lock_owner.i = operationRecPtr.p->prevParallelQue;
- ptrCheckGuard(lock_owner, coprecsize, operationrec);
- Uint32 transid[2] = { lock_owner.p->transId1,
- lock_owner.p->transId2 };
-
-
- while(lock_owner.p->prevParallelQue != RNIL)
+
+ /**
+ * We committed a shared lock
+ * Check if we can start next...
+ */
+ while(next.p->nextParallelQue != RNIL)
{
- lock_owner.i = lock_owner.p->prevParallelQue;
- ptrCheckGuard(lock_owner, coprecsize, operationrec);
+ jam();
+ next.i = next.p->nextParallelQue;
+ ptrCheckGuard(next, coprecsize, operationrec);
- if(lock_owner.p->transId1 != transid[0] ||
- lock_owner.p->transId2 != transid[1])
+ if ((next.p->m_op_bits & Operationrec::OP_STATE_MASK) !=
+ Operationrec::OP_STATE_EXECUTED)
{
jam();
- /**
- * If more than 1 trans in lock queue -> no lock upgrade
- */
return;
}
}
- check_lock_upgrade(signal, lock_owner, operationRecPtr);
+ startNext(signal, next);
+
+ validate_lock_queue(prev);
}
}//Dbacc::commitOperation()
-void
-Dbacc::check_lock_upgrade(Signal* signal,
- OperationrecPtr lock_owner,
- OperationrecPtr release_op)
-{
- if((lock_owner.p->transId1 == release_op.p->transId1 &&
- lock_owner.p->transId2 == release_op.p->transId2) ||
- release_op.p->lockMode ||
- lock_owner.p->nextSerialQue == RNIL)
- {
- jam();
- /**
- * No lock upgrade if same trans or lock owner has no serial queue
- * or releasing non shared op
- */
- return;
- }
-
- OperationrecPtr next;
- next.i = lock_owner.p->nextSerialQue;
- ptrCheckGuard(next, coprecsize, operationrec);
+void
+Dbacc::release_lockowner(Signal* signal, OperationrecPtr opPtr, bool commit)
+{
+ OperationrecPtr nextP;
+ OperationrecPtr nextS;
+ OperationrecPtr newOwner;
+ OperationrecPtr lastP;
- if(lock_owner.p->transId1 != next.p->transId1 ||
- lock_owner.p->transId2 != next.p->transId2)
+ Uint32 opbits = opPtr.p->m_op_bits;
+ nextP.i = opPtr.p->nextParallelQue;
+ nextS.i = opPtr.p->nextSerialQue;
+ lastP.i = opPtr.p->m_lo_last_parallel_op_ptr_i;
+ Uint32 lastS = opPtr.p->m_lo_last_serial_op_ptr_i;
+
+ ndbassert(lastP.i != RNIL || lastS != RNIL);
+ ndbassert(nextP.i != RNIL || nextS.i != RNIL);
+
+ enum {
+ NOTHING,
+ CHECK_LOCK_UPGRADE,
+ START_NEW
+ } action = NOTHING;
+
+ if (nextP.i != RNIL)
{
jam();
- /**
- * No lock upgrad if !same trans in serial queue
- */
- return;
+ ptrCheckGuard(nextP, coprecsize, operationrec);
+ newOwner = nextP;
+
+ if (lastP.i == newOwner.i)
+ {
+ newOwner.p->m_lo_last_parallel_op_ptr_i = RNIL;
+ lastP = nextP;
+ }
+ else
+ {
+ ptrCheckGuard(lastP, coprecsize, operationrec);
+ newOwner.p->m_lo_last_parallel_op_ptr_i = lastP.i;
+ lastP.p->m_lock_owner_ptr_i = newOwner.i;
+ }
+
+ newOwner.p->m_lo_last_serial_op_ptr_i = lastS;
+ newOwner.p->nextSerialQue = nextS.i;
+
+ if (nextS.i != RNIL)
+ {
+ jam();
+ ptrCheckGuard(nextS, coprecsize, operationrec);
+ ndbassert(nextS.p->prevSerialQue == opPtr.i);
+ nextS.p->prevSerialQue = newOwner.i;
+ }
+
+ if (commit)
+ {
+ if ((opbits & Operationrec::OP_ACC_LOCK_MODE) == ZREADLOCK)
+ {
+ jam();
+ /**
+ * Lock owner...committing a shared operation...
+ * this can be a lock upgrade
+ *
+ * e.g
+ * T1(R) T2(R)
+ * T2(X)
+ *
+ * If T1(R) commits T2(X) is supposed to run
+ *
+ * e.g
+ * T1(X) T1(R)
+ * T2(R)
+ *
+ * If T1(X) commits, then T1(R) _should_ commit before T2(R) is
+ * allowed to proceed
+ */
+ action = CHECK_LOCK_UPGRADE;
+ }
+ else
+ {
+ jam();
+ newOwner.p->m_op_bits |= Operationrec::OP_LOCK_MODE;
+ }
+ }
+ else
+ {
+ /**
+ * Aborting an operation can *always* lead to lock upgrade
+ */
+ action = CHECK_LOCK_UPGRADE;
+
+ /**
+ * Update ACC_LOCK_MODE
+ */
+ if (opbits & Operationrec::OP_LOCK_MODE)
+ {
+ Uint32 nextbits = nextP.p->m_op_bits;
+ while ((nextbits & Operationrec::OP_LOCK_MODE) == 0)
+ {
+ ndbassert(nextbits & Operationrec::OP_ACC_LOCK_MODE);
+ nextbits &= ~(Uint32)Operationrec::OP_ACC_LOCK_MODE;
+ nextP.p->m_op_bits = nextbits;
+
+ if (nextP.p->nextParallelQue != RNIL)
+ {
+ nextP.i = nextP.p->nextParallelQue;
+ ptrCheckGuard(nextP, coprecsize, operationrec);
+ nextbits = nextP.p->m_op_bits;
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+ }
}
-
- if (getNoParallelTransaction(lock_owner.p) > 1)
+ else
{
jam();
- /**
- * No lock upgrade if more than 1 transaction in parallell queue
- */
- return;
- }
+ ptrCheckGuard(nextS, coprecsize, operationrec);
+ newOwner = nextS;
+
+ newOwner.p->m_op_bits |= Operationrec::OP_RUN_QUEUE;
+
+ if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED)
+ {
+ report_dealloc(signal, opPtr.p);
+ newOwner.p->localdata[0] = ~0;
+ }
+ else
+ {
+ jam();
+ newOwner.p->localdata[0] = opPtr.p->localdata[0];
+ newOwner.p->localdata[1] = opPtr.p->localdata[1];
+ }
+
+ lastP = newOwner;
+ while (lastP.p->nextParallelQue != RNIL)
+ {
+ lastP.i = lastP.p->nextParallelQue;
+ ptrCheckGuard(lastP, coprecsize, operationrec);
+ lastP.p->m_op_bits |= Operationrec::OP_RUN_QUEUE;
+ }
+
+ if (newOwner.i != lastP.i)
+ {
+ jam();
+ newOwner.p->m_lo_last_parallel_op_ptr_i = lastP.i;
+ }
+ else
+ {
+ jam();
+ newOwner.p->m_lo_last_parallel_op_ptr_i = RNIL;
+ }
- if (getNoParallelTransaction(next.p) > 1)
- {
- jam();
- /**
- * No lock upgrade if more than 1 transaction in next's parallell queue
- */
- return;
+ if (newOwner.i != lastS)
+ {
+ jam();
+ newOwner.p->m_lo_last_serial_op_ptr_i = lastS;
+ }
+ else
+ {
+ jam();
+ newOwner.p->m_lo_last_serial_op_ptr_i = RNIL;
+ }
+
+ action = START_NEW;
}
- OperationrecPtr tmp;
- tmp.i = lock_owner.p->nextSerialQue = next.p->nextSerialQue;
- if(tmp.i != RNIL)
- {
- ptrCheckGuard(tmp, coprecsize, operationrec);
- ndbassert(tmp.p->prevSerialQue == next.i);
- tmp.p->prevSerialQue = lock_owner.i;
- }
- next.p->nextSerialQue = next.p->prevSerialQue = RNIL;
+ insertLockOwnersList(signal, newOwner);
- // Find end of parallell que
- tmp = lock_owner;
- Uint32 lockMode = next.p->lockMode > lock_owner.p->lockMode ?
- next.p->lockMode : lock_owner.p->lockMode;
- while(tmp.p->nextParallelQue != RNIL)
+ /**
+ * Copy op info, and store op in element
+ *
+ */
{
- jam();
- tmp.i = tmp.p->nextParallelQue;
- tmp.p->lockMode = lockMode;
- ptrCheckGuard(tmp, coprecsize, operationrec);
- }
- tmp.p->lockMode = lockMode;
-
- next.p->prevParallelQue = tmp.i;
- tmp.p->nextParallelQue = next.i;
-
- OperationrecPtr save = operationRecPtr;
-
- Uint32 localdata[2];
- localdata[0] = lock_owner.p->localdata[0];
- localdata[1] = lock_owner.p->localdata[1];
- do {
- next.p->localdata[0] = localdata[0];
- next.p->localdata[1] = localdata[1];
- next.p->lockMode = lockMode;
-
- operationRecPtr = next;
- executeNextOperation(signal);
- if (next.p->nextParallelQue != RNIL)
+ newOwner.p->elementPage = opPtr.p->elementPage;
+ newOwner.p->elementIsforward = opPtr.p->elementIsforward;
+ newOwner.p->elementPointer = opPtr.p->elementPointer;
+ newOwner.p->elementContainer = opPtr.p->elementContainer;
+ newOwner.p->scanBits = opPtr.p->scanBits;
+ newOwner.p->hashvaluePart = opPtr.p->hashvaluePart;
+ newOwner.p->m_op_bits |= (opbits & Operationrec::OP_ELEMENT_DISAPPEARED);
+ if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED)
{
+ /* ------------------------------------------------------------------- */
+ // If the elementIsDisappeared is set then we know that the
+ // hashValue is also set since it always originates from a
+ // committing abort or a aborting insert.
+ // Scans do not initialise the hashValue and must have this
+ // value initialised if they are
+ // to successfully commit the delete.
+ /* ------------------------------------------------------------------- */
jam();
- next.i = next.p->nextParallelQue;
- ptrCheckGuard(next, coprecsize, operationrec);
- } else {
- jam();
- break;
+ newOwner.p->hashValue = opPtr.p->hashValue;
}//if
- } while (1);
+
+ Page8Ptr pagePtr;
+ pagePtr.i = newOwner.p->elementPage;
+ ptrCheckGuard(pagePtr, cpagesize, page8);
+ const Uint32 tmp = ElementHeader::setLocked(newOwner.i);
+ arrGuard(newOwner.p->elementPointer, 2048);
+ pagePtr.p->word32[newOwner.p->elementPointer] = tmp;
+ }
- operationRecPtr = save;
+ switch(action){
+ case NOTHING:
+ validate_lock_queue(newOwner);
+ return;
+ case START_NEW:
+ startNew(signal, newOwner);
+ validate_lock_queue(newOwner);
+ return;
+ case CHECK_LOCK_UPGRADE:
+ startNext(signal, lastP);
+ validate_lock_queue(lastP);
+ break;
+ }
}
-/* ------------------------------------------------------------------------- */
-/* RELEASELOCK */
-/* RESETS LOCK OF AN ELEMENT. */
-/* INFORMATION ABOUT THE ELEMENT IS SAVED IN THE OPERATION RECORD */
-/* THESE INFORMATION IS USED TO UPDATE HEADER OF THE ELEMENT */
-/* ------------------------------------------------------------------------- */
-void Dbacc::releaselock(Signal* signal)
-{
- OperationrecPtr rloOperPtr;
- OperationrecPtr trlOperPtr;
- OperationrecPtr trlTmpOperPtr;
- Uint32 TelementIsDisappeared;
-
- trlOperPtr.i = RNIL;
- if (operationRecPtr.p->nextParallelQue != RNIL) {
- jam();
- /** ---------------------------------------------------------------------
- * NEXT OPERATION TAKES OVER THE LOCK.
- * We will simply move the info from the leader
- * to the new queue leader.
- * -------------------------------------------------------------------- */
- trlOperPtr.i = operationRecPtr.p->nextParallelQue;
- ptrCheckGuard(trlOperPtr, coprecsize, operationrec);
- copyInOperPtr = trlOperPtr;
- copyOperPtr = operationRecPtr;
- copyOpInfo(signal);
- trlOperPtr.p->prevParallelQue = RNIL;
- if (operationRecPtr.p->nextSerialQue != RNIL) {
- jam();
- /* -----------------------------------------------------------------
- * THERE IS A SERIAL QUEUE. MOVE IT FROM RELEASED OP REC TO THE
- * NEW LOCK OWNER.
- * ------------------------------------------------------------------ */
- trlOperPtr.p->nextSerialQue = operationRecPtr.p->nextSerialQue;
- trlTmpOperPtr.i = trlOperPtr.p->nextSerialQue;
- ptrCheckGuard(trlTmpOperPtr, coprecsize, operationrec);
- trlTmpOperPtr.p->prevSerialQue = trlOperPtr.i;
- }//if
-
- check_lock_upgrade(signal, copyInOperPtr, operationRecPtr);
- } else {
- ndbrequire(operationRecPtr.p->nextSerialQue != RNIL);
- jam();
- /** ---------------------------------------------------------------------
- * THE PARALLEL QUEUE IS EMPTY AND THE SERIAL QUEUE IS NOT EMPTY.
- * WE NEED TO REARRANGE LISTS AND START A NUMBER OF OPERATIONS.
- * -------------------------------------------------------------------- */
- trlOperPtr.i = operationRecPtr.p->nextSerialQue;
- ptrCheckGuard(trlOperPtr, coprecsize, operationrec);
- copyOperPtr = operationRecPtr;
- copyInOperPtr = trlOperPtr;
- copyOpInfo(signal);
- trlOperPtr.p->prevSerialQue = RNIL;
- ndbrequire(trlOperPtr.p->prevParallelQue == RNIL);
- /* --------------------------------------------------------------------- */
- /* WE HAVE MOVED TO THE NEXT PARALLEL QUEUE. WE MUST START ALL OF THOSE */
- /* OPERATIONS WHICH UP TILL NOW HAVE BEEN QUEUED WAITING FOR THE LOCK. */
- /* --------------------------------------------------------------------- */
- rloOperPtr = operationRecPtr;
- trlTmpOperPtr = trlOperPtr;
- TelementIsDisappeared = trlOperPtr.p->elementIsDisappeared;
- Uint32 ThashValue = trlOperPtr.p->hashValue;
- do {
- /* ------------------------------------------------------------------ */
- // Ensure that all operations in the queue are assigned with the
- // elementIsDisappeared to ensure that the element is removed after
- // a previous delete. An insert does however revert this decision
- // since the element is put back again.
- // Local checkpoints complicate life here since they do not
- // execute the next operation but simply change
- // the state on the operation.
- // We need to set-up the variable elementIsDisappeared
- // properly even when local checkpoints and inserts/writes after
- // deletes occur.
- /* ------------------------------------------------------------------- */
- trlTmpOperPtr.p->elementIsDisappeared = TelementIsDisappeared;
- if (TelementIsDisappeared == ZTRUE) {
- /* ----------------------------------------------------------------- */
- // If the elementIsDisappeared is set then we know that the
- // hashValue is also set since it always originates from a
- // committing abort or a aborting insert.
- // Scans do not initialise the hashValue and must have this
- // value initialised if they are to successfully commit the delete.
- /* ----------------------------------------------------------------- */
- jam();
- trlTmpOperPtr.p->hashValue = ThashValue;
- }//if
- trlTmpOperPtr.p->localdata[0] = trlOperPtr.p->localdata[0];
- trlTmpOperPtr.p->localdata[1] = trlOperPtr.p->localdata[1];
- /* ------------------------------------------------------------------- */
- // Restart the queued operation.
- /* ------------------------------------------------------------------- */
- operationRecPtr = trlTmpOperPtr;
- TelementIsDisappeared = executeNextOperation(signal);
- ThashValue = operationRecPtr.p->hashValue;
- if (trlTmpOperPtr.p->nextParallelQue != RNIL) {
- jam();
- /* ----------------------------------------------------------------- */
- // We will continue with the next operation in the parallel
- // queue and start this as well.
- /* ----------------------------------------------------------------- */
- trlTmpOperPtr.i = trlTmpOperPtr.p->nextParallelQue;
- ptrCheckGuard(trlTmpOperPtr, coprecsize, operationrec);
- } else {
- jam();
- break;
- }//if
- } while (1);
- operationRecPtr = rloOperPtr;
- }//if
-
- // Insert the next op into the lock owner list
- insertLockOwnersList(signal, trlOperPtr);
- return;
-}//Dbacc::releaselock()
-
-/* --------------------------------------------------------------------------------- */
-/* COPY_OP_INFO */
-/* INPUT: COPY_IN_OPER_PTR AND COPY_OPER_PTR. */
-/* DESCRIPTION:INFORMATION ABOUT THE ELEMENT WILL BE MOVED FROM OPERATION */
-/* REC TO QUEUE OP REC. QUE OP REC TAKES OVER THE LOCK. */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::copyOpInfo(Signal* signal)
+void
+Dbacc::startNew(Signal* signal, OperationrecPtr newOwner)
{
- Page8Ptr coiPageidptr;
-
- copyInOperPtr.p->elementPage = copyOperPtr.p->elementPage;
- copyInOperPtr.p->elementIsforward = copyOperPtr.p->elementIsforward;
- copyInOperPtr.p->elementContainer = copyOperPtr.p->elementContainer;
- copyInOperPtr.p->elementPointer = copyOperPtr.p->elementPointer;
- copyInOperPtr.p->scanBits = copyOperPtr.p->scanBits;
- copyInOperPtr.p->hashvaluePart = copyOperPtr.p->hashvaluePart;
- copyInOperPtr.p->elementIsDisappeared = copyOperPtr.p->elementIsDisappeared;
- if (copyInOperPtr.p->elementIsDisappeared == ZTRUE) {
- /* --------------------------------------------------------------------------------- */
- // If the elementIsDisappeared is set then we know that the hashValue is also set
- // since it always originates from a committing abort or a aborting insert. Scans
- // do not initialise the hashValue and must have this value initialised if they are
- // to successfully commit the delete.
- /* --------------------------------------------------------------------------------- */
- jam();
- copyInOperPtr.p->hashValue = copyOperPtr.p->hashValue;
- }//if
- coiPageidptr.i = copyOperPtr.p->elementPage;
- ptrCheckGuard(coiPageidptr, cpagesize, page8);
- const Uint32 tmp = ElementHeader::setLocked(copyInOperPtr.i);
- dbgWord32(coiPageidptr, copyOperPtr.p->elementPointer, tmp);
- arrGuard(copyOperPtr.p->elementPointer, 2048);
- coiPageidptr.p->word32[copyOperPtr.p->elementPointer] = tmp;
- copyInOperPtr.p->localdata[0] = copyOperPtr.p->localdata[0];
- copyInOperPtr.p->localdata[1] = copyOperPtr.p->localdata[1];
-}//Dbacc::copyOpInfo()
+ OperationrecPtr save = operationRecPtr;
+ operationRecPtr = newOwner;
+
+ Uint32 opbits = newOwner.p->m_op_bits;
+ Uint32 op = opbits & Operationrec::OP_MASK;
+ Uint32 opstate = (opbits & Operationrec::OP_STATE_MASK);
+ ndbassert(opstate == Operationrec::OP_STATE_WAITING);
+ ndbassert(opbits & Operationrec::OP_LOCK_OWNER);
+ const bool deleted = opbits & Operationrec::OP_ELEMENT_DISAPPEARED;
+ Uint32 errCode = 0;
+
+ opbits &= opbits & ~(Uint32)Operationrec::OP_STATE_MASK;
+ opbits |= Operationrec::OP_STATE_RUNNING;
+
+ if (op == ZSCAN_OP && (opbits & Operationrec::OP_LOCK_REQ) == 0)
+ goto scan;
-/* ******************--------------------------------------------------------------- */
-/* EXECUTE NEXT OPERATION */
-/* NEXT OPERATION IN A LOCK QUEUE WILL BE EXECUTED. */
-/* --------------------------------------------------------------------------------- */
-Uint32 Dbacc::executeNextOperation(Signal* signal)
-{
- ndbrequire(operationRecPtr.p->transactionstate == ACTIVE);
- if (operationRecPtr.p->elementIsDisappeared == ZTRUE) {
- /* --------------------------------------------------------------------- */
- /* PREVIOUS OPERATION WAS DELETE OPERATION AND THE ELEMENT IS DELETED. */
- /* --------------------------------------------------------------------- */
- if (((operationRecPtr.p->operation != ZINSERT) &&
- (operationRecPtr.p->operation != ZWRITE)) ||
- (operationRecPtr.p->prevParallelQue != RNIL)) {
- if (operationRecPtr.p->operation != ZSCAN_OP ||
- operationRecPtr.p->isAccLockReq) {
- jam();
- /* ----------------------------------------------------------------- */
- // Updates and reads with a previous delete simply aborts with read
- // error indicating that tuple did not exist.
- // Also inserts and writes not being the first operation.
- /* ----------------------------------------------------------------- */
- operationRecPtr.p->transactionstate = WAIT_COMMIT_ABORT;
- signal->theData[0] = operationRecPtr.p->userptr;
- signal->theData[1] = ZREAD_ERROR;
- sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYREF, signal,
- 2, JBB);
- return operationRecPtr.p->elementIsDisappeared;
- } else {
- /* ----------------------------------------------------------------- */
- /* ABORT OF OPERATION NEEDED BUT THE OPERATION IS A
- * SCAN => SPECIAL TREATMENT.
- * IF THE SCAN WAITS IN QUEUE THEN WE MUST REMOVE THE OPERATION
- * FROM THE SCAN LOCK QUEUE AND IF NO MORE OPERATIONS ARE QUEUED
- * THEN WE SHOULD RESTART THE SCAN PROCESS. OTHERWISE WE SIMPLY
- * RELEASE THE OPERATION AND DECREASE THE NUMBER OF LOCKS HELD.
- * ----------------------------------------------------------------- */
- takeOutScanLockQueue(operationRecPtr.p->scanRecPtr);
- putReadyScanQueue(signal, operationRecPtr.p->scanRecPtr);
- return operationRecPtr.p->elementIsDisappeared;
- }//if
- }//if
- /* --------------------------------------------------------------------- */
- // Insert and writes can continue but need to be converted to inserts.
- /* --------------------------------------------------------------------- */
- jam();
- operationRecPtr.p->elementIsDisappeared = ZFALSE;
- operationRecPtr.p->operation = ZINSERT;
- operationRecPtr.p->insertIsDone = ZTRUE;
- } else if (operationRecPtr.p->operation == ZINSERT) {
- bool abortFlag = true;
- if (operationRecPtr.p->prevParallelQue != RNIL) {
- OperationrecPtr prevOpPtr;
- jam();
- prevOpPtr.i = operationRecPtr.p->prevParallelQue;
- ptrCheckGuard(prevOpPtr, coprecsize, operationrec);
- if (prevOpPtr.p->operation == ZDELETE) {
- jam();
- abortFlag = false;
- }//if
- }//if
- if (abortFlag) {
- jam();
- /* ------------------------------------------------------------------- */
- /* ELEMENT STILL REMAINS AND WE ARE TRYING TO INSERT IT AGAIN. */
- /* THIS IS CLEARLY NOT A GOOD IDEA. */
- /* ------------------------------------------------------------------- */
- operationRecPtr.p->transactionstate = WAIT_COMMIT_ABORT;
- signal->theData[0] = operationRecPtr.p->userptr;
- signal->theData[1] = ZWRITE_ERROR;
- sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYREF, signal,
- 2, JBB);
- return operationRecPtr.p->elementIsDisappeared;
- }//if
- }
- else if(operationRecPtr.p->operation == ZWRITE)
+ if (deleted)
{
jam();
- operationRecPtr.p->operation = ZUPDATE;
- if (operationRecPtr.p->prevParallelQue != RNIL) {
- OperationrecPtr prevOpPtr;
- jam();
- prevOpPtr.i = operationRecPtr.p->prevParallelQue;
- ptrCheckGuard(prevOpPtr, coprecsize, operationrec);
- if (prevOpPtr.p->operation == ZDELETE)
- {
- jam();
- operationRecPtr.p->operation = ZINSERT;
- }
+ if (op != ZINSERT && op != ZWRITE)
+ {
+ errCode = ZREAD_ERROR;
+ goto ref;
}
+
+ opbits &= ~(Uint32)Operationrec::OP_MASK;
+ opbits &= ~(Uint32)Operationrec::OP_ELEMENT_DISAPPEARED;
+ opbits |= (op = ZINSERT);
+ opbits |= Operationrec::OP_INSERT_IS_DONE;
+ goto conf;
}
-
- if (operationRecPtr.p->operation == ZSCAN_OP &&
- ! operationRecPtr.p->isAccLockReq) {
+ else if (op == ZINSERT)
+ {
jam();
- takeOutScanLockQueue(operationRecPtr.p->scanRecPtr);
- putReadyScanQueue(signal, operationRecPtr.p->scanRecPtr);
- } else {
+ errCode = ZWRITE_ERROR;
+ goto ref;
+ }
+ else if (op == ZWRITE)
+ {
jam();
- sendAcckeyconf(signal);
- sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYCONF,
- signal, 6, JBB);
- }//if
- return operationRecPtr.p->elementIsDisappeared;
-}//Dbacc::executeNextOperation()
+ opbits &= ~(Uint32)Operationrec::OP_MASK;
+ opbits |= (op = ZUPDATE);
+ goto conf;
+ }
+
+conf:
+ newOwner.p->m_op_bits = opbits;
+
+ sendAcckeyconf(signal);
+ sendSignal(newOwner.p->userblockref, GSN_ACCKEYCONF,
+ signal, 6, JBA);
+
+ operationRecPtr = save;
+ return;
+
+scan:
+ jam();
+ newOwner.p->m_op_bits = opbits;
+
+ takeOutScanLockQueue(newOwner.p->scanRecPtr);
+ putReadyScanQueue(signal, newOwner.p->scanRecPtr);
+
+ operationRecPtr = save;
+ return;
+
+ref:
+ newOwner.p->m_op_bits = opbits;
+
+ signal->theData[0] = newOwner.p->userptr;
+ signal->theData[1] = errCode;
+ sendSignal(newOwner.p->userblockref, GSN_ACCKEYREF, signal,
+ 2, JBB);
+
+ operationRecPtr = save;
+ return;
+}
/**
* takeOutLockOwnersList
@@ -3950,7 +4845,6 @@ void Dbacc::takeOutLockOwnersList(Signal* signal,
{
const Uint32 Tprev = outOperPtr.p->prevLockOwnerOp;
const Uint32 Tnext = outOperPtr.p->nextLockOwnerOp;
-
#ifdef VM_TRACE
// Check that operation is already in the list
OperationrecPtr tmpOperPtr;
@@ -3965,8 +4859,7 @@ void Dbacc::takeOutLockOwnersList(Signal* signal,
ndbrequire(inList == true);
#endif
- ndbrequire(outOperPtr.p->lockOwner == ZTRUE);
- outOperPtr.p->lockOwner = ZFALSE;
+ ndbassert(outOperPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
// Fast path through the code for the common case.
if ((Tprev == RNIL) && (Tnext == RNIL)) {
@@ -4007,7 +4900,6 @@ void Dbacc::insertLockOwnersList(Signal* signal,
const OperationrecPtr& insOperPtr)
{
OperationrecPtr tmpOperPtr;
-
#ifdef VM_TRACE
// Check that operation is not already in list
tmpOperPtr.i = fragrecptr.p->lockOwnersList;
@@ -4017,12 +4909,12 @@ void Dbacc::insertLockOwnersList(Signal* signal,
tmpOperPtr.i = tmpOperPtr.p->nextLockOwnerOp;
}
#endif
+ tmpOperPtr.i = fragrecptr.p->lockOwnersList;
+
+ ndbrequire(! (insOperPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER));
- ndbrequire(insOperPtr.p->lockOwner == ZFALSE);
-
- insOperPtr.p->lockOwner = ZTRUE;
+ insOperPtr.p->m_op_bits |= Operationrec::OP_LOCK_OWNER;
insOperPtr.p->prevLockOwnerOp = RNIL;
- tmpOperPtr.i = fragrecptr.p->lockOwnersList;
insOperPtr.p->nextLockOwnerOp = tmpOperPtr.i;
fragrecptr.p->lockOwnersList = insOperPtr.i;
@@ -5385,6 +6277,7 @@ void Dbacc::execNEXT_SCANREQ(Signal* signal)
if (!scanPtr.p->scanReadCommittedFlag) {
commitOperation(signal);
}//if
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
takeOutActiveScanOp(signal);
releaseOpRec(signal);
scanPtr.p->scanOpsAllocated--;
@@ -5400,9 +6293,10 @@ void Dbacc::execNEXT_SCANREQ(Signal* signal)
jam();
fragrecptr.i = scanPtr.p->activeLocalFrag;
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- /* --------------------------------------------------------------------------------- */
- /* THE SCAN PROCESS IS FINISHED. RELOCK ALL LOCKED EL. RELESE ALL INVOLVED REC. */
- /* --------------------------------------------------------------------------------- */
+ /* ---------------------------------------------------------------------
+ * THE SCAN PROCESS IS FINISHED. RELOCK ALL LOCKED EL.
+ * RELESE ALL INVOLVED REC.
+ * ------------------------------------------------------------------- */
releaseScanLab(signal);
return;
break;
@@ -5452,24 +6346,26 @@ void Dbacc::checkNextBucketLab(Signal* signal)
scanPtr.p->nextBucketIndex++;
if (scanPtr.p->scanBucketState == ScanRec::SECOND_LAP) {
if (scanPtr.p->nextBucketIndex > scanPtr.p->maxBucketIndexToRescan) {
- /* --------------------------------------------------------------------------------- */
- // We have finished the rescan phase. We are ready to proceed with the next fragment part.
- /* --------------------------------------------------------------------------------- */
+ /* ---------------------------------------------------------------- */
+ // We have finished the rescan phase.
+ // We are ready to proceed with the next fragment part.
+ /* ---------------------------------------------------------------- */
jam();
checkNextFragmentLab(signal);
return;
}//if
} else if (scanPtr.p->scanBucketState == ScanRec::FIRST_LAP) {
if ((fragrecptr.p->p + fragrecptr.p->maxp) < scanPtr.p->nextBucketIndex) {
- /* --------------------------------------------------------------------------------- */
+ /* ---------------------------------------------------------------- */
// All buckets have been scanned a first time.
- /* --------------------------------------------------------------------------------- */
+ /* ---------------------------------------------------------------- */
if (scanPtr.p->minBucketIndexToRescan == 0xFFFFFFFF) {
jam();
- /* --------------------------------------------------------------------------------- */
- // We have not had any merges behind the scan. Thus it is not necessary to perform
- // any rescan any buckets and we can proceed immediately with the next fragment part.
- /* --------------------------------------------------------------------------------- */
+ /* -------------------------------------------------------------- */
+ // We have not had any merges behind the scan.
+ // Thus it is not necessary to perform any rescan any buckets
+ // and we can proceed immediately with the next fragment part.
+ /* --------------------------------------------------------------- */
checkNextFragmentLab(signal);
return;
} else {
@@ -5561,18 +6457,23 @@ void Dbacc::checkNextBucketLab(Signal* signal)
tslElementptr = tnsElementptr;
setlock(signal);
insertLockOwnersList(signal, operationRecPtr);
+ operationRecPtr.p->m_op_bits |=
+ Operationrec::OP_STATE_RUNNING | Operationrec::OP_RUN_QUEUE;
}//if
} else {
arrGuard(tnsElementptr, 2048);
queOperPtr.i =
ElementHeader::getOpPtrI(nsPageptr.p->word32[tnsElementptr]);
ptrCheckGuard(queOperPtr, coprecsize, operationrec);
- if (queOperPtr.p->elementIsDisappeared == ZTRUE) {
+ if (queOperPtr.p->m_op_bits & Operationrec::OP_ELEMENT_DISAPPEARED ||
+ queOperPtr.p->localdata[0] == ~(Uint32)0)
+ {
jam();
- /* --------------------------------------------------------------------------------- */
- // If the lock owner indicates the element is disappeared then we will not report this
- // tuple. We will continue with the next tuple.
- /* --------------------------------------------------------------------------------- */
+ /* ------------------------------------------------------------------ */
+ // If the lock owner indicates the element is disappeared then
+ // we will not report this tuple. We will continue with the next tuple.
+ /* ------------------------------------------------------------------ */
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
releaseOpRec(signal);
scanPtr.p->scanOpsAllocated--;
signal->theData[0] = scanPtr.i;
@@ -5584,31 +6485,29 @@ void Dbacc::checkNextBucketLab(Signal* signal)
Uint32 return_result;
if (scanPtr.p->scanLockMode == ZREADLOCK) {
jam();
- priPageptr = nsPageptr;
- tpriElementptr = tnsElementptr;
- return_result = placeReadInLockQueue(signal);
+ return_result = placeReadInLockQueue(queOperPtr);
} else {
jam();
- pwiPageptr = nsPageptr;
- tpwiElementptr = tnsElementptr;
- return_result = placeWriteInLockQueue(signal);
+ return_result = placeWriteInLockQueue(queOperPtr);
}//if
if (return_result == ZSERIAL_QUEUE) {
- /* --------------------------------------------------------------------------------- */
- /* WE PLACED THE OPERATION INTO A SERIAL QUEUE AND THUS WE HAVE TO WAIT FOR */
- /* THE LOCK TO BE RELEASED. WE CONTINUE WITH THE NEXT ELEMENT. */
- /* --------------------------------------------------------------------------------- */
+ /* -----------------------------------------------------------------
+ * WE PLACED THE OPERATION INTO A SERIAL QUEUE AND THUS WE HAVE TO
+ * WAIT FOR THE LOCK TO BE RELEASED. WE CONTINUE WITH THE NEXT ELEMENT
+ * ----------------------------------------------------------------- */
putOpScanLockQue(); /* PUT THE OP IN A QUE IN THE SCAN REC */
signal->theData[0] = scanPtr.i;
signal->theData[1] = AccCheckScan::ZCHECK_LCP_STOP;
sendSignal(cownBlockref, GSN_ACC_CHECK_SCAN, signal, 2, JBB);
return;
- } else if (return_result == ZWRITE_ERROR) {
+ } else if (return_result != ZPARALLEL_QUEUE) {
jam();
- /* --------------------------------------------------------------------------------- */
- // The tuple is either not committed yet or a delete in the same transaction (not
- // possible here since we are a scan). Thus we simply continue with the next tuple.
- /* --------------------------------------------------------------------------------- */
+ /* ----------------------------------------------------------------- */
+ // The tuple is either not committed yet or a delete in
+ // the same transaction (not possible here since we are a scan).
+ // Thus we simply continue with the next tuple.
+ /* ----------------------------------------------------------------- */
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
releaseOpRec(signal);
scanPtr.p->scanOpsAllocated--;
signal->theData[0] = scanPtr.i;
@@ -5619,10 +6518,11 @@ void Dbacc::checkNextBucketLab(Signal* signal)
ndbassert(return_result == ZPARALLEL_QUEUE);
}//if
}//if
- /* --------------------------------------------------------------------------------- */
- // Committed read proceed without caring for locks immediately down here except when
- // the tuple was deleted permanently and no new operation has inserted it again.
- /* --------------------------------------------------------------------------------- */
+ /* ----------------------------------------------------------------------- */
+ // Committed read proceed without caring for locks immediately
+ // down here except when the tuple was deleted permanently
+ // and no new operation has inserted it again.
+ /* ----------------------------------------------------------------------- */
putActiveScanOp(signal);
sendNextScanConf(signal);
return;
@@ -5646,14 +6546,15 @@ void Dbacc::initScanFragmentPart(Signal* signal)
DirRangePtr cnfDirRangePtr;
DirectoryarrayPtr cnfDirptr;
Page8Ptr cnfPageidptr;
- /* --------------------------------------------------------------------------------- */
+ /* ----------------------------------------------------------------------- */
// Set the active fragment part.
// Set the current bucket scanned to the first.
// Start with the first lap.
// Remember the number of buckets at start of the scan.
- // Set the minimum and maximum to values that will always be smaller and larger than.
+ // Set the minimum and maximum to values that will always be smaller and
+ // larger than.
// Reset the scan indicator on the first bucket.
- /* --------------------------------------------------------------------------------- */
+ /* ----------------------------------------------------------------------- */
scanPtr.p->activeLocalFrag = fragrecptr.i;
scanPtr.p->nextBucketIndex = 0; /* INDEX OF SCAN BUCKET */
scanPtr.p->scanBucketState = ScanRec::FIRST_LAP;
@@ -5672,11 +6573,11 @@ void Dbacc::initScanFragmentPart(Signal* signal)
releaseScanBucket(signal);
}//Dbacc::initScanFragmentPart()
-/* --------------------------------------------------------------------------------- */
-/* FLAG = 6 = ZCOPY_CLOSE THE SCAN PROCESS IS READY OR ABORTED. ALL OPERATION IN THE */
-/* ACTIVE OR WAIT QUEUE ARE RELEASED, SCAN FLAG OF ROOT FRAG IS RESET AND THE SCAN */
-/* RECORD IS RELEASED. */
-/* --------------------------------------------------------------------------------- */
+/* -------------------------------------------------------------------------
+ * FLAG = 6 = ZCOPY_CLOSE THE SCAN PROCESS IS READY OR ABORTED.
+ * ALL OPERATION IN THE ACTIVE OR WAIT QUEUE ARE RELEASED,
+ * SCAN FLAG OF ROOT FRAG IS RESET AND THE SCAN RECORD IS RELEASED.
+ * ------------------------------------------------------------------------ */
void Dbacc::releaseScanLab(Signal* signal)
{
releaseAndCommitActiveOps(signal);
@@ -5715,8 +6616,17 @@ void Dbacc::releaseAndCommitActiveOps(Signal* signal)
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
if (!scanPtr.p->scanReadCommittedFlag) {
jam();
- commitOperation(signal);
+ if ((operationRecPtr.p->m_op_bits & Operationrec::OP_STATE_MASK) ==
+ Operationrec::OP_STATE_EXECUTED)
+ {
+ commitOperation(signal);
+ }
+ else
+ {
+ abortOperation(signal);
+ }
}//if
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
takeOutActiveScanOp(signal);
releaseOpRec(signal);
scanPtr.p->scanOpsAllocated--;
@@ -5737,8 +6647,17 @@ void Dbacc::releaseAndCommitQueuedOps(Signal* signal)
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
if (!scanPtr.p->scanReadCommittedFlag) {
jam();
- commitOperation(signal);
+ if ((operationRecPtr.p->m_op_bits & Operationrec::OP_STATE_MASK) ==
+ Operationrec::OP_STATE_EXECUTED)
+ {
+ commitOperation(signal);
+ }
+ else
+ {
+ abortOperation(signal);
+ }
}//if
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
takeOutReadyScanQueue(signal);
releaseOpRec(signal);
scanPtr.p->scanOpsAllocated--;
@@ -5761,6 +6680,7 @@ void Dbacc::releaseAndAbortLockedOps(Signal* signal) {
abortOperation(signal);
}//if
takeOutScanLockQueue(scanPtr.i);
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
releaseOpRec(signal);
scanPtr.p->scanOpsAllocated--;
operationRecPtr.i = trsoOperPtr.i;
@@ -5795,9 +6715,11 @@ void Dbacc::execACC_CHECK_SCAN(Signal* signal)
takeOutReadyScanQueue(signal);
fragrecptr.i = operationRecPtr.p->fragptr;
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- if (operationRecPtr.p->elementIsDisappeared == ZTRUE) {
+ if (operationRecPtr.p->m_op_bits & Operationrec::OP_ELEMENT_DISAPPEARED)
+ {
jam();
abortOperation(signal);
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
releaseOpRec(signal);
scanPtr.p->scanOpsAllocated--;
continue;
@@ -5884,7 +6806,8 @@ void Dbacc::execACC_TO_REQ(Signal* signal)
jamEntry();
tatrOpPtr.i = signal->theData[1]; /* OPER PTR OF ACC */
ptrCheckGuard(tatrOpPtr, coprecsize, operationrec);
- if (tatrOpPtr.p->operation == ZSCAN_OP) {
+ if ((tatrOpPtr.p->m_op_bits & Operationrec::OP_MASK) == ZSCAN_OP)
+ {
tatrOpPtr.p->transId1 = signal->theData[2];
tatrOpPtr.p->transId2 = signal->theData[3];
} else {
@@ -5981,29 +6904,28 @@ void Dbacc::initScanOpRec(Signal* signal)
scanPtr.p->scanOpsAllocated++;
+ Uint32 opbits = 0;
+ opbits |= ZSCAN_OP;
+ opbits |= scanPtr.p->scanLockMode ? Operationrec::OP_LOCK_MODE : 0;
+ opbits |= scanPtr.p->scanLockMode ? Operationrec::OP_ACC_LOCK_MODE : 0;
+ opbits |= scanPtr.p->scanReadCommittedFlag ?
+ Operationrec::OP_EXECUTED_DIRTY_READ : 0;
+ opbits |= Operationrec::OP_COMMIT_DELETE_CHECK;
operationRecPtr.p->userptr = RNIL;
operationRecPtr.p->scanRecPtr = scanPtr.i;
- operationRecPtr.p->operation = ZSCAN_OP;
- operationRecPtr.p->transactionstate = ACTIVE;
- operationRecPtr.p->commitDeleteCheckFlag = ZFALSE;
- operationRecPtr.p->lockMode = scanPtr.p->scanLockMode;
operationRecPtr.p->fid = fragrecptr.p->myfid;
operationRecPtr.p->fragptr = fragrecptr.i;
- operationRecPtr.p->elementIsDisappeared = ZFALSE;
operationRecPtr.p->nextParallelQue = RNIL;
operationRecPtr.p->prevParallelQue = RNIL;
operationRecPtr.p->nextSerialQue = RNIL;
operationRecPtr.p->prevSerialQue = RNIL;
operationRecPtr.p->transId1 = scanPtr.p->scanTrid1;
operationRecPtr.p->transId2 = scanPtr.p->scanTrid2;
- operationRecPtr.p->lockOwner = ZFALSE;
- operationRecPtr.p->dirtyRead = 0;
- operationRecPtr.p->nodeType = 0; // Not a stand-by node
operationRecPtr.p->elementIsforward = tisoIsforward;
operationRecPtr.p->elementContainer = tisoContainerptr;
operationRecPtr.p->elementPointer = tisoElementptr;
operationRecPtr.p->elementPage = isoPageptr.i;
- operationRecPtr.p->isAccLockReq = ZFALSE;
+ operationRecPtr.p->m_op_bits = opbits;
tisoLocalPtr = tisoElementptr + tisoIsforward;
guard24 = fragrecptr.p->localkeylen - 1;
for (tisoTmp = 0; tisoTmp <= guard24; tisoTmp++) {
@@ -6868,14 +7790,12 @@ void Dbacc::releaseOpRec(Signal* signal)
}
ndbrequire(opInList == false);
#endif
- ndbrequire(operationRecPtr.p->lockOwner == ZFALSE);
+ ndbrequire(operationRecPtr.p->m_op_bits == Operationrec::OP_INITIAL);
operationRecPtr.p->nextOp = cfreeopRec;
cfreeopRec = operationRecPtr.i; /* UPDATE FREE LIST OF OP RECORDS */
operationRecPtr.p->prevOp = RNIL;
- operationRecPtr.p->opState = FREE_OP;
- operationRecPtr.p->transactionstate = IDLE;
- operationRecPtr.p->operation = ZUNDEFINED_OP;
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
}//Dbacc::releaseOpRec()
/* --------------------------------------------------------------------------------- */
@@ -7377,8 +8297,8 @@ Dbacc::execDUMP_STATE_ORD(Signal* signal)
OperationrecPtr tmpOpPtr;
tmpOpPtr.i = recordNo;
ptrAss(tmpOpPtr, operationrec);
- infoEvent("Dbacc::operationrec[%d]: opState=%d, transid(0x%x, 0x%x)",
- tmpOpPtr.i, tmpOpPtr.p->opState, tmpOpPtr.p->transId1,
+ infoEvent("Dbacc::operationrec[%d]: transid(0x%x, 0x%x)",
+ tmpOpPtr.i, tmpOpPtr.p->transId1,
tmpOpPtr.p->transId2);
infoEvent("elementIsforward=%d, elementPage=%d, elementPointer=%d ",
tmpOpPtr.p->elementIsforward, tmpOpPtr.p->elementPage,
@@ -7386,8 +8306,7 @@ Dbacc::execDUMP_STATE_ORD(Signal* signal)
infoEvent("fid=%d, fragptr=%d, hashvaluePart=%d ",
tmpOpPtr.p->fid, tmpOpPtr.p->fragptr,
tmpOpPtr.p->hashvaluePart);
- infoEvent("hashValue=%d, insertDeleteLen=%d",
- tmpOpPtr.p->hashValue, tmpOpPtr.p->insertDeleteLen);
+ infoEvent("hashValue=%d", tmpOpPtr.p->hashValue);
infoEvent("nextLockOwnerOp=%d, nextOp=%d, nextParallelQue=%d ",
tmpOpPtr.p->nextLockOwnerOp, tmpOpPtr.p->nextOp,
tmpOpPtr.p->nextParallelQue);
@@ -7398,15 +8317,8 @@ Dbacc::execDUMP_STATE_ORD(Signal* signal)
tmpOpPtr.p->prevLockOwnerOp, tmpOpPtr.p->nextParallelQue);
infoEvent("prevSerialQue=%d, scanRecPtr=%d",
tmpOpPtr.p->prevSerialQue, tmpOpPtr.p->scanRecPtr);
- infoEvent("transactionstate=%d, elementIsDisappeared=%d, insertIsDone=%d ",
- tmpOpPtr.p->transactionstate, tmpOpPtr.p->elementIsDisappeared,
- tmpOpPtr.p->insertIsDone);
- infoEvent("lockMode=%d, lockOwner=%d, nodeType=%d ",
- tmpOpPtr.p->lockMode, tmpOpPtr.p->lockOwner,
- tmpOpPtr.p->nodeType);
- infoEvent("operation=%d, opSimple=%d, dirtyRead=%d,scanBits=%d ",
- tmpOpPtr.p->operation, tmpOpPtr.p->opSimple,
- tmpOpPtr.p->dirtyRead, tmpOpPtr.p->scanBits);
+ infoEvent("m_op_bits=0x%x, scanBits=%d ",
+ tmpOpPtr.p->m_op_bits, tmpOpPtr.p->scanBits);
return;
}
diff --git a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
index 407cd8074ab..d228d8ae819 100644
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
@@ -33,8 +33,6 @@
// primary key is stored in TUP
#include "../dbtup/Dbtup.hpp"
-#include "../dbacc/Dbacc.hpp"
-
#ifdef DBLQH_C
// Constants
/* ------------------------------------------------------------------------- */
@@ -2556,8 +2554,19 @@ private:
Dbtup* c_tup;
Dbacc* c_acc;
+
+ /**
+ * Read primary key from tup
+ */
Uint32 readPrimaryKeys(ScanRecord*, TcConnectionrec*, Uint32 * dst);
+ /**
+ * Read primary key from operation
+ */
+public:
+ Uint32 readPrimaryKeys(Uint32 opPtrI, Uint32 * dst, bool xfrm);
+private:
+
void acckeyconf_tupkeyreq(Signal*, TcConnectionrec*, Fragrecord*, Uint32, Uint32);
void acckeyconf_load_diskpage(Signal*,TcConnectionrecPtr,Fragrecord*,Uint32);
@@ -2924,6 +2933,11 @@ public:
}
DLHashTable<ScanRecord> c_scanTakeOverHash;
+
+#ifdef ERROR_INSERT
+ inline bool TRACE_OP_CHECK(const TcConnectionrec* regTcPtr);
+ void TRACE_OP_DUMP(const TcConnectionrec* regTcPtr, const char * pos);
+#endif
};
inline
@@ -2991,10 +3005,19 @@ Dblqh::accminupdate(Signal* signal, Uint32 opId, const Local_key* key)
signal->theData[1] = key->m_page_no << MAX_TUPLES_BITS | key->m_page_idx;
c_acc->execACCMINUPDATE(signal);
- if (ERROR_INSERTED(5712))
+ if (ERROR_INSERTED(5712) || ERROR_INSERTED(5713))
ndbout << " LK: " << *key;
regTcPtr.p->m_row_id = *key;
}
+inline
+bool
+Dblqh::TRACE_OP_CHECK(const TcConnectionrec* regTcPtr)
+{
+ return (ERROR_INSERTED(5712) &&
+ (regTcPtr->operation == ZINSERT ||
+ regTcPtr->operation == ZDELETE)) ||
+ ERROR_INSERTED(5713);
+}
#endif
diff --git a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
index d031f9a00bf..18e5e6cd585 100644
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
@@ -67,48 +67,70 @@
// seen only when we debug the product
#ifdef VM_TRACE
#define DEBUG(x) ndbout << "DBLQH: "<< x << endl;
+static
NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::TransactionState state){
out << (int)state;
return out;
}
+static
NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::LogWriteState state){
out << (int)state;
return out;
}
+static
NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::ListState state){
out << (int)state;
return out;
}
+static
NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::AbortState state){
out << (int)state;
return out;
}
+static
NdbOut &
operator<<(NdbOut& out, Dblqh::ScanRecord::ScanState state){
out << (int)state;
return out;
}
+static
NdbOut &
operator<<(NdbOut& out, Dblqh::LogFileOperationRecord::LfoState state){
out << (int)state;
return out;
}
+static
NdbOut &
operator<<(NdbOut& out, Dblqh::ScanRecord::ScanType state){
out << (int)state;
return out;
}
+static
+NdbOut &
+operator<<(NdbOut& out, Operation_t op)
+{
+ switch(op){
+ case ZREAD: out << "READ"; break;
+ case ZREAD_EX: out << "READ-EX"; break;
+ case ZINSERT: out << "INSERT"; break;
+ case ZUPDATE: out << "UPDATE"; break;
+ case ZDELETE: out << "DELETE"; break;
+ case ZWRITE: out << "WRITE"; break;
+ }
+ return out;
+}
+
#else
#define DEBUG(x)
#endif
@@ -120,7 +142,7 @@ const Uint32 NR_ScanNo = 0;
#if defined VM_TRACE || defined ERROR_INSERT || defined NDBD_TRACENR
#include <NdbConfig.h>
-NdbOut * tracenrout = 0;
+static NdbOut * tracenrout = 0;
static int TRACENR_FLAG = 0;
#define TRACENR(x) (* tracenrout) << x
#define SET_TRACENR_FLAG TRACENR_FLAG = 1
@@ -132,6 +154,13 @@ static int TRACENR_FLAG = 0;
#define CLEAR_TRACENR_FLAG
#endif
+#ifdef ERROR_INSERT
+static NdbOut * traceopout = 0;
+#define TRACE_OP(regTcPtr, place) do { if (TRACE_OP_CHECK(regTcPtr)) TRACE_OP_DUMP(regTcPtr, place); } while(0)
+#else
+#define TRACE_OP(x) {}
+#endif
+
/* ------------------------------------------------------------------------- */
/* ------- SEND SYSTEM ERROR ------- */
/* */
@@ -454,6 +483,10 @@ void Dblqh::execSTTOR(Signal* signal)
name = NdbConfig_SignalLogFileName(getOwnNodeId());
tracenrout = new NdbOut(* new FileOutputStream(fopen(name, "w+")));
#endif
+
+#ifdef ERROR_INSERT
+ traceopout = &ndbout;
+#endif
return;
break;
@@ -2531,14 +2564,15 @@ void Dblqh::execTUPKEYCONF(Signal* signal)
case TcConnectionrec::WAIT_ACC_ABORT:
case TcConnectionrec::ABORT_QUEUED:
jam();
-/* -------------------------------------------------------------------------- */
-/* IGNORE SINCE ABORT OF THIS OPERATION IS ONGOING ALREADY. */
-/* -------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* IGNORE SINCE ABORT OF THIS OPERATION IS ONGOING ALREADY. */
+/* ------------------------------------------------------------------------- */
break;
default:
ndbrequire(false);
break;
}//switch
+
}//Dblqh::execTUPKEYCONF()
/* ************> */
@@ -2560,6 +2594,8 @@ void Dblqh::execTUPKEYREF(Signal* signal)
c_fragment_pool.getPtr(regFragptr);
fragptr = regFragptr;
+ TRACE_OP(regTcPtr, "TUPKEYREF");
+
if (unlikely(activeCreat == Fragrecord::AC_NR_COPY))
{
jam();
@@ -2568,7 +2604,12 @@ void Dblqh::execTUPKEYREF(Signal* signal)
ndbassert(regTcPtr->transactionState == TcConnectionrec::WAIT_TUP ||
regTcPtr->transactionState ==TcConnectionrec::WAIT_TUP_TO_ABORT);
}
-
+ else if (getNodeState().startLevel == NodeState::SL_STARTED)
+ {
+ if (terrorCode == 899)
+ ndbout << "899: " << regTcPtr->m_row_id << endl;
+ }
+
switch (tcConnectptr.p->transactionState) {
case TcConnectionrec::WAIT_TUP:
jam();
@@ -3606,57 +3647,7 @@ void Dblqh::endgettupkeyLab(Signal* signal)
regTcPtr->transactionState = TcConnectionrec::WAIT_ATTR;
return;
}//if
-//#define TRACE_LQHKEYREQ
-#ifdef TRACE_LQHKEYREQ
- {
- ndbout << (regTcPtr->operation == ZREAD ? "READ" :
- regTcPtr->operation == ZUPDATE ? "UPDATE" :
- regTcPtr->operation == ZINSERT ? "INSERT" :
- regTcPtr->operation == ZDELETE ? "DELETE" : "<Other>")
- << "(" << (int)regTcPtr->operation << ")"
- << " from=(" << getBlockName(refToBlock(regTcPtr->clientBlockref))
- << ", " << refToNode(regTcPtr->clientBlockref) << ")"
- << " table=" << regTcPtr->tableref << " ";
-
- ndbout << "hash: " << hex << regTcPtr->hashValue << endl;
-
- ndbout << "key=[" << hex;
- Uint32 i;
- for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
- ndbout << hex << regTcPtr->tupkeyData[i] << " ";
- }
-
- DatabufPtr regDatabufptr;
- regDatabufptr.i = regTcPtr->firstTupkeybuf;
- while(i < regTcPtr->primKeyLen)
- {
- ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
- for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
- ndbout << hex << regDatabufptr.p->data[j] << " ";
- }
- ndbout << "]" << endl;
-
- ndbout << "attr=[" << hex;
- for(i = 0; i<regTcPtr->reclenAiLqhkey && i < 5; i++)
- ndbout << hex << regTcPtr->firstAttrinfo[i] << " ";
-
- AttrbufPtr regAttrinbufptr;
- regAttrinbufptr.i= regTcPtr->firstAttrinbuf;
- while(i < regTcPtr->totReclenAi)
- {
- ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
- Uint32 dataLen = regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN];
- ndbrequire(dataLen != 0);
- ndbrequire(i + dataLen <= regTcPtr->totReclenAi);
- for(Uint32 j= 0; j<dataLen; j++, i++)
- ndbout << hex << regAttrinbufptr.p->attrbuf[j] << " ";
-
- regAttrinbufptr.i = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT];
- }
-
- ndbout << "]" << endl;
- }
-#endif
+
/* ---------------------------------------------------------------------- */
/* NOW RECEPTION OF LQHKEYREQ IS COMPLETED THE NEXT STEP IS TO START*/
/* PROCESSING THE MESSAGE. IF THE MESSAGE IS TO A STAND-BY NODE */
@@ -3763,6 +3754,7 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal)
/* ----------------------------------------------------------------- */
if (TRACENR_FLAG)
{
+ TRACE_OP(regTcPtr, "RECEIVED");
switch (regTcPtr->operation) {
case ZREAD: TRACENR("READ"); break;
case ZUPDATE: TRACENR("UPDATE"); break;
@@ -3847,6 +3839,9 @@ Dblqh::exec_acckeyreq(Signal* signal, TcConnectionrecPtr regTcPtr)
signal->theData[8] = sig2;
signal->theData[9] = sig3;
signal->theData[10] = sig4;
+
+ TRACE_OP(regTcPtr.p, "ACC");
+
if (regTcPtr.p->primKeyLen > 4) {
sendKeyinfoAcc(signal, 11);
}//if
@@ -4133,7 +4128,7 @@ Dblqh::nr_copy_delete_row(Signal* signal,
jam();
ndbrequire(rowid == 0);
signal->theData[0] = accPtr;
- signal->theData[1] = false;
+ signal->theData[1] = 0;
EXECUTE_DIRECT(ref, GSN_ACC_ABORTREQ, signal, 2);
jamEntry();
return;
@@ -4144,16 +4139,18 @@ Dblqh::nr_copy_delete_row(Signal* signal,
*/
ndbrequire(regTcPtr.p->m_dealloc == 0);
Local_key save = regTcPtr.p->m_row_id;
- signal->theData[0] = regTcPtr.p->accConnectrec;
+
+ c_acc->execACCKEY_ORD(signal, accPtr);
+ signal->theData[0] = accPtr;
EXECUTE_DIRECT(ref, GSN_ACC_COMMITREQ, signal, 1);
jamEntry();
-
+
ndbrequire(regTcPtr.p->m_dealloc == 1);
int ret = c_tup->nr_delete(signal, regTcPtr.i,
fragPtr.p->tupFragptr, &regTcPtr.p->m_row_id,
regTcPtr.p->gci);
jamEntry();
-
+
if (ret)
{
ndbassert(ret == 1);
@@ -4167,7 +4164,7 @@ Dblqh::nr_copy_delete_row(Signal* signal,
}
TRACENR("DELETED: " << regTcPtr.p->m_row_id << endl);
-
+
regTcPtr.p->m_dealloc = 0;
regTcPtr.p->m_row_id = save;
fragptr = fragPtr;
@@ -4274,6 +4271,45 @@ Dblqh::nr_delete_complete(Signal* signal, Nr_op_info* op)
}
}
+Uint32
+Dblqh::readPrimaryKeys(Uint32 opPtrI, Uint32 * dst, bool xfrm)
+{
+ TcConnectionrecPtr regTcPtr;
+ DatabufPtr regDatabufptr;
+ Uint64 Tmp[MAX_KEY_SIZE_IN_WORDS >> 1];
+
+ jamEntry();
+ regTcPtr.i = opPtrI;
+ ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec);
+
+ Uint32 tableId = regTcPtr.p->tableref;
+ Uint32 keyLen = regTcPtr.p->primKeyLen;
+ regDatabufptr.i = regTcPtr.p->firstTupkeybuf;
+ Uint32 * tmp = xfrm ? (Uint32*)Tmp : dst;
+
+ memcpy(tmp, regTcPtr.p->tupkeyData, sizeof(regTcPtr.p->tupkeyData));
+ if (keyLen > 4)
+ {
+ tmp += 4;
+ Uint32 pos = 4;
+ do {
+ ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
+ memcpy(tmp, regDatabufptr.p->data, sizeof(regDatabufptr.p->data));
+ regDatabufptr.i = regDatabufptr.p->nextDatabuf;
+ tmp += sizeof(regDatabufptr.p->data) >> 2;
+ pos += sizeof(regDatabufptr.p->data) >> 2;
+ } while(pos < keyLen);
+ }
+
+ if (xfrm)
+ {
+ jam();
+ Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX];
+ return xfrm_key(tableId, (Uint32*)Tmp, dst, ~0, keyPartLen);
+ }
+
+ return keyLen;
+}
/* =*======================================================================= */
/* ======= SEND KEYINFO TO ACC ======= */
@@ -4447,10 +4483,6 @@ Dblqh::acckeyconf_tupkeyreq(Signal* signal, TcConnectionrec* regTcPtr,
* ----------------------------------------------------------------------- */
Uint32 page_idx = local_key & MAX_TUPLES_PER_PAGE;
Uint32 page_no = local_key >> MAX_TUPLES_BITS;
-#ifdef TRACE_LQHKEYREQ
- ndbout << "localkey: [ " << hex << page_no << " " << page_idx << "]"
- << endl;
-#endif
Uint32 Ttupreq = regTcPtr->dirtyOp;
Ttupreq = Ttupreq + (regTcPtr->opSimple << 1);
Ttupreq = Ttupreq + (op << 6);
@@ -4506,70 +4538,13 @@ Dblqh::acckeyconf_tupkeyreq(Signal* signal, TcConnectionrec* regTcPtr,
tupKeyReq->m_row_id_page_no = sig0;
tupKeyReq->m_row_id_page_idx = sig1;
- if (ERROR_INSERTED(5712) && regTcPtr->operation == ZINSERT)
- {
- ndbout << "INSERT " << regFragptrP->tabRef
- << "(" << regFragptrP->fragId << ")";
-
- {
- ndbout << "key=[" << hex;
- Uint32 i;
- for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
- ndbout << hex << regTcPtr->tupkeyData[i] << " ";
- }
-
- DatabufPtr regDatabufptr;
- regDatabufptr.i = regTcPtr->firstTupkeybuf;
- while(i < regTcPtr->primKeyLen)
- {
- ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
- for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
- ndbout << hex << regDatabufptr.p->data[j] << " ";
- }
- ndbout << "] ";
- }
-
- if(regTcPtr->m_use_rowid)
- ndbout << " " << regTcPtr->m_row_id;
- }
-
- if (ERROR_INSERTED(5712) && regTcPtr->operation == ZDELETE)
- {
- Local_key lk; lk.assref(local_key);
-
- ndbout << "DELETE " << regFragptrP->tabRef
- << "(" << regFragptrP->fragId << ") " << lk;
-
- {
- ndbout << "key=[" << hex;
- Uint32 i;
- for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
- ndbout << hex << regTcPtr->tupkeyData[i] << " ";
- }
-
- DatabufPtr regDatabufptr;
- regDatabufptr.i = regTcPtr->firstTupkeybuf;
- while(i < regTcPtr->primKeyLen)
- {
- ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
- for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
- ndbout << hex << regDatabufptr.p->data[j] << " ";
- }
- ndbout << "]" << endl;
- }
-
- }
-
+ TRACE_OP(regTcPtr, "TUPKEYREQ");
+
regTcPtr->m_use_rowid |= (op == ZINSERT);
regTcPtr->m_row_id.m_page_no = page_no;
regTcPtr->m_row_id.m_page_idx = page_idx;
EXECUTE_DIRECT(tup, GSN_TUPKEYREQ, signal, TupKeyReq::SignalLength);
-
- if (ERROR_INSERTED(5712) && regTcPtr->operation == ZINSERT)
- {
- ndbout << endl;
- }
}//Dblqh::execACCKEYCONF()
void
@@ -4654,27 +4629,37 @@ void Dblqh::tupkeyConfLab(Signal* signal)
const TupKeyConf * const tupKeyConf = (TupKeyConf *)&signal->theData[0];
TcConnectionrec * const regTcPtr = tcConnectptr.p;
Uint32 activeCreat = regTcPtr->activeCreat;
+ Uint32 readLen = tupKeyConf->readLength;
+ Uint32 writeLen = tupKeyConf->writeLength;
+ Uint32 accOp = regTcPtr->accConnectrec;
+ c_acc->execACCKEY_ORD(signal, accOp);
+
+ TRACE_OP(regTcPtr, "TUPKEYCONF");
+
if (regTcPtr->simpleRead) {
jam();
/* ----------------------------------------------------------------------
- * THE OPERATION IS A SIMPLE READ. WE WILL IMMEDIATELY COMMIT THE OPERATION.
- * SINCE WE HAVE NOT RELEASED THE FRAGMENT LOCK (FOR LOCAL CHECKPOINTS) YET
+ * THE OPERATION IS A SIMPLE READ.
+ * WE WILL IMMEDIATELY COMMIT THE OPERATION.
+ * SINCE WE HAVE NOT RELEASED THE FRAGMENT LOCK
+ * (FOR LOCAL CHECKPOINTS) YET
* WE CAN GO IMMEDIATELY TO COMMIT_CONTINUE_AFTER_BLOCKED.
- * WE HAVE ALREADY SENT THE RESPONSE SO WE ARE NOT INTERESTED IN READ LENGTH
- * ---------------------------------------------------------------------- */
+ * WE HAVE ALREADY SENT THE RESPONSE SO WE ARE NOT INTERESTED IN
+ * READ LENGTH
+ * --------------------------------------------------------------------- */
commitContinueAfterBlockedLab(signal);
return;
}//if
- if (tupKeyConf->readLength != 0) {
+ if (readLen != 0)
+ {
jam();
/* SET BIT 15 IN REQINFO */
LqhKeyReq::setApplicationAddressFlag(regTcPtr->reqinfo, 1);
-
- regTcPtr->readlenAi = tupKeyConf->readLength;
+ regTcPtr->readlenAi = readLen;
}//if
- regTcPtr->totSendlenAi = tupKeyConf->writeLength;
+ regTcPtr->totSendlenAi = writeLen;
ndbrequire(regTcPtr->totSendlenAi == regTcPtr->currTupAiLen);
if (unlikely(activeCreat == Fragrecord::AC_NR_COPY))
@@ -5597,6 +5582,8 @@ void Dblqh::releaseOprec(Signal* signal)
if (TRACENR_FLAG)
TRACENR("DELETED: " << regTcPtr->m_row_id << endl);
+
+ TRACE_OP(regTcPtr, "DEALLOC");
signal->theData[0] = regTcPtr->fragmentid;
signal->theData[1] = regTcPtr->tableref;
@@ -5818,6 +5805,10 @@ void Dblqh::execCOMMIT(Signal* signal)
ptrAss(tcConnectptr, regTcConnectionrec);
if ((tcConnectptr.p->transid[0] == transid1) &&
(tcConnectptr.p->transid[1] == transid2)) {
+
+ TcConnectionrec * const regTcPtr = tcConnectptr.p;
+ TRACE_OP(regTcPtr, "COMMIT");
+
commitReqLab(signal, gci);
return;
}//if
@@ -5937,6 +5928,10 @@ void Dblqh::execCOMPLETE(Signal* signal)
if ((tcConnectptr.p->transactionState == TcConnectionrec::COMMITTED) &&
(tcConnectptr.p->transid[0] == transid1) &&
(tcConnectptr.p->transid[1] == transid2)) {
+
+ TcConnectionrec * const regTcPtr = tcConnectptr.p;
+ TRACE_OP(regTcPtr, "COMPLETE");
+
if (tcConnectptr.p->seqNoReplica != 0 &&
tcConnectptr.p->activeCreat == Fragrecord::AC_NORMAL) {
jam();
@@ -6313,12 +6308,16 @@ void Dblqh::commitContinueAfterBlockedLab(Signal* signal)
TRACENR(endl);
}
+ TRACE_OP(regTcPtr.p, "ACC_COMMITREQ");
+
Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref);
signal->theData[0] = regTcPtr.p->accConnectrec;
EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
} else {
if(!dirtyOp){
+ TRACE_OP(regTcPtr.p, "ACC_COMMITREQ");
+
Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref);
signal->theData[0] = regTcPtr.p->accConnectrec;
EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
@@ -6362,6 +6361,8 @@ Dblqh::tupcommit_conf_callback(Signal* signal, Uint32 tcPtrI)
c_fragment_pool.getPtr(regFragptr);
fragptr = regFragptr;
+ TRACE_OP(tcPtr, "ACC_COMMITREQ");
+
Uint32 acc = refToBlock(tcPtr->tcAccBlockref);
signal->theData[0] = tcPtr->accConnectrec;
EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
@@ -6670,6 +6671,8 @@ void Dblqh::execABORT(Signal* signal)
regTcPtr->commitAckMarker = RNIL;
}
+ TRACE_OP(regTcPtr, "ABORT");
+
abortStateHandlerLab(signal);
return;
@@ -7087,23 +7090,30 @@ void Dblqh::abortContinueAfterBlockedLab(Signal* signal, bool canBlock)
* ALSO AS PART OF A NORMAL ABORT WITHOUT BLOCKING.
* WE MUST ABORT TUP BEFORE ACC TO ENSURE THAT NO ONE RACES IN
* AND SEES A STATE IN TUP.
- * ------------------------------------------------------------------------ */
+ * ----------------------------------------------------------------------- */
TcConnectionrec * const regTcPtr = tcConnectptr.p;
- fragptr.i = regTcPtr->fragmentptr;
- c_fragment_pool.getPtr(fragptr);
- signal->theData[0] = regTcPtr->tupConnectrec;
- EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1);
+
+ TRACE_OP(regTcPtr, "ACC ABORT");
+
regTcPtr->transactionState = TcConnectionrec::WAIT_ACC_ABORT;
signal->theData[0] = regTcPtr->accConnectrec;
- signal->theData[1] = true;
+ signal->theData[1] = 2; // JOB BUFFER IF NEEDED
EXECUTE_DIRECT(DBACC, GSN_ACC_ABORTREQ, signal, 2);
- /* ------------------------------------------------------------------------
- * We need to insert a real-time break by sending ACC_ABORTCONF through the
- * job buffer to ensure that we catch any ACCKEYCONF or TUPKEYCONF or
- * TUPKEYREF that are in the job buffer but not yet processed. Doing
- * everything without that would race and create a state error when they
- * are executed.
- * ----------------------------------------------------------------------- */
+
+ if (signal->theData[1] == RNIL)
+ {
+ jam();
+ /* ------------------------------------------------------------------------
+ * We need to insert a real-time break by sending ACC_ABORTCONF through the
+ * job buffer to ensure that we catch any ACCKEYCONF or TUPKEYCONF or
+ * TUPKEYREF that are in the job buffer but not yet processed. Doing
+ * everything without that would race and create a state error when they
+ * are executed.
+ * --------------------------------------------------------------------- */
+ return;
+ }
+
+ execACC_ABORTCONF(signal);
return;
}//Dblqh::abortContinueAfterBlockedLab()
@@ -7117,6 +7127,11 @@ void Dblqh::execACC_ABORTCONF(Signal* signal)
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
TcConnectionrec * const regTcPtr = tcConnectptr.p;
ndbrequire(regTcPtr->transactionState == TcConnectionrec::WAIT_ACC_ABORT);
+
+ TRACE_OP(regTcPtr, "ACC_ABORTCONF");
+ signal->theData[0] = regTcPtr->tupConnectrec;
+ EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1);
+
continueAbortLab(signal);
return;
}//Dblqh::execACC_ABORTCONF()
@@ -8837,7 +8852,9 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
}//if
// If accOperationPtr == RNIL no record was returned by ACC
- if (nextScanConf->accOperationPtr == RNIL) {
+ Uint32 accOpPtr = nextScanConf->accOperationPtr;
+ if (accOpPtr == RNIL)
+ {
jam();
/*************************************************************
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
@@ -8871,7 +8888,8 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
jam();
set_acc_ptr_in_scan_record(scanptr.p,
scanptr.p->m_curr_batch_size_rows,
- nextScanConf->accOperationPtr);
+ accOpPtr);
+
jam();
nextScanConfLoopLab(signal);
}//Dblqh::nextScanConfScanLab()
@@ -9071,12 +9089,24 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED;
scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr);
+
+ Uint32 rows = scanptr.p->m_curr_batch_size_rows;
+ Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, rows, false);
+ if (accOpPtr != (Uint32)-1)
+ {
+ c_acc->execACCKEY_ORD(signal, accOpPtr);
+ }
+ else
+ {
+ ndbassert(refToBlock(scanptr.p->scanBlockref) != DBACC);
+ }
+
if (scanptr.p->scanCompletedStatus == ZTRUE) {
/* ---------------------------------------------------------------------
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
* --------------------------------------------------------------------- */
- if ((scanptr.p->scanLockHold == ZTRUE) &&
- (scanptr.p->m_curr_batch_size_rows > 0)) {
+ if ((scanptr.p->scanLockHold == ZTRUE) && rows)
+ {
jam();
scanptr.p->scanReleaseCounter = 1;
scanReleaseLocksLab(signal);
@@ -9093,7 +9123,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
}//if
ndbrequire(scanptr.p->m_curr_batch_size_rows < MAX_PARALLEL_OP_PER_SCAN);
scanptr.p->m_curr_batch_size_bytes+= tdata4;
- scanptr.p->m_curr_batch_size_rows++;
+ scanptr.p->m_curr_batch_size_rows = rows + 1;
scanptr.p->m_last_row = tdata5;
if (scanptr.p->check_scan_batch_completed() | tdata5){
if (scanptr.p->scanLockHold == ZTRUE) {
@@ -9103,7 +9133,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
return;
} else {
jam();
- scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows;
+ scanptr.p->scanReleaseCounter = rows + 1;
scanReleaseLocksLab(signal);
return;
}
@@ -9187,12 +9217,24 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED;
scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr);
+
+ Uint32 rows = scanptr.p->m_curr_batch_size_rows;
+ Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, rows, false);
+ if (accOpPtr != (Uint32)-1)
+ {
+ c_acc->execACCKEY_ORD(signal, accOpPtr);
+ }
+ else
+ {
+ ndbassert(refToBlock(scanptr.p->scanBlockref) != DBACC);
+ }
+
if (scanptr.p->scanCompletedStatus == ZTRUE) {
/* ---------------------------------------------------------------------
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
* --------------------------------------------------------------------- */
- if ((scanptr.p->scanLockHold == ZTRUE) &&
- (scanptr.p->m_curr_batch_size_rows > 0)) {
+ if ((scanptr.p->scanLockHold == ZTRUE) && rows)
+ {
jam();
scanptr.p->scanReleaseCounter = 1;
scanReleaseLocksLab(signal);
@@ -9213,8 +9255,8 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
scanptr.p->scanReleaseCounter = 1;
} else {
jam();
- scanptr.p->m_curr_batch_size_rows++;
- scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows;
+ scanptr.p->m_curr_batch_size_rows = rows + 1;
+ scanptr.p->scanReleaseCounter = rows + 1;
}//if
/* --------------------------------------------------------------------
* WE NEED TO RELEASE ALL LOCKS CURRENTLY
@@ -9224,7 +9266,7 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
return;
}//if
Uint32 time_passed= tcConnectptr.p->tcTimer - cLqhTimeOutCount;
- if (scanptr.p->m_curr_batch_size_rows > 0) {
+ if (rows) {
if (time_passed > 1) {
/* -----------------------------------------------------------------------
* WE NEED TO ENSURE THAT WE DO NOT SEARCH FOR THE NEXT TUPLE FOR A
@@ -9232,7 +9274,7 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
* THE FOUND TUPLE IF FOUND TUPLES ARE RARE. If more than 10 ms passed we
* send the found tuples to the API.
* ----------------------------------------------------------------------- */
- scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows + 1;
+ scanptr.p->scanReleaseCounter = rows + 1;
scanReleaseLocksLab(signal);
return;
}
@@ -9872,6 +9914,8 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
fragptr.p->m_scanNumberMask.clear(NR_ScanNo);
scanptr.p->scanBlockref = DBTUP_REF;
scanptr.p->scanLockHold = ZFALSE;
+ scanptr.p->m_curr_batch_size_rows = 0;
+ scanptr.p->m_curr_batch_size_bytes= 0;
initScanTc(0,
0,
@@ -10074,7 +10118,7 @@ void Dblqh::nextScanConfCopyLab(Signal* signal)
initCopyTc(signal, ZDELETE);
set_acc_ptr_in_scan_record(scanptr.p, 0, RNIL);
tcConP->gci = nextScanConf->gci;
-
+
tcConP->primKeyLen = 0;
tcConP->totSendlenAi = 0;
tcConP->connectState = TcConnectionrec::COPY_CONNECTED;
@@ -10197,6 +10241,12 @@ void Dblqh::copyTupkeyConfLab(Signal* signal)
scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr);
ScanRecord* scanP = scanptr.p;
+
+ Uint32 rows = scanP->m_curr_batch_size_rows;
+ Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanP, rows, false);
+ ndbassert(accOpPtr != (Uint32)-1);
+ c_acc->execACCKEY_ORD(signal, accOpPtr);
+
if (tcConnectptr.p->errorCode != 0) {
jam();
closeCopyLab(signal);
@@ -18538,6 +18588,21 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal)
}
ndbrequire(arg != 2308);
}
+
+#ifdef ERROR_INSERT
+ if (arg == 5712 || arg == 5713)
+ {
+ if (arg == 5712)
+ {
+ traceopout = &ndbout;
+ }
+ else if (arg == 5713)
+ {
+ traceopout = tracenrout;
+ }
+ SET_ERROR_INSERT_VALUE(arg);
+ }
+#endif
}//Dblqh::execDUMP_STATE_ORD()
@@ -18702,3 +18767,39 @@ void Dblqh::writeDbgInfoPageHeader(LogPageRecordPtr logP, Uint32 place,
logP.p->logPageWord[ZPOS_IN_WRITING]= 1;
}
+#if defined ERROR_INSERT
+void
+Dblqh::TRACE_OP_DUMP(const Dblqh::TcConnectionrec* regTcPtr, const char * pos)
+{
+ (* traceopout)
+ << "[ " << hex << regTcPtr->transid[0]
+ << " " << hex << regTcPtr->transid[1] << " ] " << dec
+ << pos
+ << " " << (Operation_t)regTcPtr->operation
+ << " " << regTcPtr->tableref
+ << "(" << regTcPtr->fragmentid << ")"
+ << "(" << (regTcPtr->seqNoReplica == 0 ? "P" : "B") << ")" ;
+
+ {
+ (* traceopout) << "key=[" << hex;
+ Uint32 i;
+ for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
+ (* traceopout) << hex << regTcPtr->tupkeyData[i] << " ";
+ }
+
+ DatabufPtr regDatabufptr;
+ regDatabufptr.i = regTcPtr->firstTupkeybuf;
+ while(i < regTcPtr->primKeyLen)
+ {
+ ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
+ for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
+ (* traceopout) << hex << regDatabufptr.p->data[j] << " ";
+ }
+ (* traceopout) << "] ";
+ }
+
+ if (regTcPtr->m_use_rowid)
+ (* traceopout) << " " << regTcPtr->m_row_id;
+ (* traceopout) << endl;
+}
+#endif
diff --git a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
index b319932aec7..4ff6e069963 100644
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
@@ -297,7 +297,6 @@ enum TransState {
};
enum TupleState {
- TUPLE_INITIAL_INSERT = 0,
TUPLE_PREPARED = 1,
TUPLE_ALREADY_ABORTED = 2,
TUPLE_TO_BE_COMMITTED = 3
@@ -305,7 +304,6 @@ enum TupleState {
enum State {
NOT_INITIALIZED = 0,
- COMMON_AREA_PAGES = 1,
IDLE = 17,
ACTIVE = 18,
SYSTEM_RESTART = 19,
@@ -1441,7 +1439,6 @@ private:
void execSET_VAR_REQ(Signal* signal);
void execDROP_TAB_REQ(Signal* signal);
void execALTER_TAB_REQ(Signal* signal);
- void execTUP_ALLOCREQ(Signal* signal);
void execTUP_DEALLOCREQ(Signal* signal);
void execTUP_WRITELOG_REQ(Signal* signal);
diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
index 90abe2cb809..d45c5f9d988 100644
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
@@ -202,29 +202,6 @@ Dbtup::receive_attrinfo(Signal* signal, Uint32 op,
}
}
-void Dbtup::execTUP_ALLOCREQ(Signal* signal)
-{
- OperationrecPtr regOperPtr;
-
- jamEntry();
-
- regOperPtr.i= signal->theData[0];
- c_operation_pool.getPtr(regOperPtr);
-
- regOperPtr.p->op_struct.tuple_state= TUPLE_INITIAL_INSERT;
- //ndbout_c("execTUP_ALLOCREQ");
-
- signal->theData[0]= 0;
- signal->theData[1]= ~0 >> MAX_TUPLES_BITS;
- signal->theData[2]= (1 << MAX_TUPLES_BITS) - 1;
- return;
-
-mem_error:
- jam();
- signal->theData[0]= ZMEM_NOMEM_ERROR;
- return;
-}
-
void
Dbtup::setChecksum(Tuple_header* tuple_ptr,
Tablerec* regTabPtr)
@@ -455,13 +432,13 @@ Dbtup::load_diskpage(Signal* signal,
ptrCheckGuard(tabptr, cnoOfTablerec, tablerec);
Tablerec* regTabPtr = tabptr.p;
- if(regOperPtr->op_struct.tuple_state == TUPLE_INITIAL_INSERT)
+ if(local_key == ~(Uint32)0)
{
jam();
regOperPtr->op_struct.m_wait_log_buffer= 1;
regOperPtr->op_struct.m_load_diskpage_on_commit= 1;
return 1;
- }
+ }
jam();
Uint32 page_idx= local_key & MAX_TUPLES_PER_PAGE;
@@ -663,7 +640,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal)
regOperPtr->savepointId= sig1;
regOperPtr->op_struct.primary_replica= sig2;
- regOperPtr->m_tuple_location.m_page_idx= sig3;
+ Uint32 pageidx = regOperPtr->m_tuple_location.m_page_idx= sig3;
sig1= tupKeyReq->opRef;
sig2= tupKeyReq->tcOpIndex;
@@ -673,7 +650,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal)
req_struct.tc_operation_ptr= sig1;
req_struct.TC_index= sig2;
req_struct.TC_ref= sig3;
- req_struct.frag_page_id= sig4;
+ Uint32 pageid = req_struct.frag_page_id= sig4;
req_struct.m_use_rowid = (TrequestInfo >> 11) & 1;
sig1= tupKeyReq->attrBufLen;
@@ -706,7 +683,8 @@ void Dbtup::execTUPKEYREQ(Signal* signal)
copyAttrinfo(regOperPtr, &cinBuffer[0]);
- if(Roptype == ZINSERT && get_tuple_state(regOperPtr)== TUPLE_INITIAL_INSERT)
+ Uint32 localkey = (pageid << MAX_TUPLES_BITS) + pageidx;
+ if(Roptype == ZINSERT && localkey == ~0)
{
// No tuple allocatated yet
goto do_insert;
@@ -1159,49 +1137,6 @@ Dbtup::prepare_initial_insert(KeyReqStruct *req_struct,
disk_undo ? (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE) : 0;
}
-void
-Dbtup::fix_disk_insert_no_mem_insert(KeyReqStruct *req_struct,
- Operationrec* regOperPtr,
- Tablerec* regTabPtr)
-{
- regOperPtr->m_undo_buffer_space= sizeof(Dbtup::Disk_undo::Alloc);
- req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
-
- const Uint32 cnt1= regTabPtr->m_attributes[MM].m_no_of_varsize;
- const Uint32 cnt2= regTabPtr->m_attributes[DD].m_no_of_varsize;
- Uint32 *ptr= req_struct->m_tuple_ptr->get_var_part_ptr(regTabPtr);
-
- if(cnt1)
- {
- // Disk part is 32-bit aligned
- char *varptr = req_struct->m_var_data[MM].m_data_ptr;
- ptr= ALIGN_WORD(varptr + regTabPtr->m_offsets[MM].m_max_var_offset);
- }
- else
- {
- ptr -= Tuple_header::HeaderSize;
- }
-
- req_struct->m_disk_ptr= (Tuple_header*)ptr;
-
- if(cnt2)
- {
- KeyReqStruct::Var_data *dst= &req_struct->m_var_data[DD];
- ptr=((Tuple_header*)ptr)->m_data+regTabPtr->m_offsets[DD].m_varpart_offset;
- dst->m_data_ptr= (char*)(((Uint16*)ptr)+cnt2+1);
- dst->m_offset_array_ptr= req_struct->var_pos_array + (cnt1 << 1);
- dst->m_var_len_offset= cnt2;
- dst->m_max_var_offset= regTabPtr->m_offsets[DD].m_max_var_offset;
- }
-
- // Set all null bits
- memset(req_struct->m_disk_ptr->m_null_bits+
- regTabPtr->m_offsets[DD].m_null_offset, 0xFF,
- 4*regTabPtr->m_offsets[DD].m_null_words);
- req_struct->m_tuple_ptr->m_header_bits =
- (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE);
-}
-
int Dbtup::handleInsertReq(Signal* signal,
Ptr<Operationrec> regOperPtr,
Ptr<Fragrecord> fragPtr,
@@ -1215,8 +1150,8 @@ int Dbtup::handleInsertReq(Signal* signal,
Tuple_header *tuple_ptr;
bool disk = regTabPtr->m_no_of_disk_attributes > 0;
- bool mem_insert = get_tuple_state(regOperPtr.p) == TUPLE_INITIAL_INSERT;
- bool disk_insert = regOperPtr.p->is_first_operation() && disk;
+ bool mem_insert = regOperPtr.p->is_first_operation();
+ bool disk_insert = mem_insert && disk;
bool varsize = regTabPtr->m_attributes[MM].m_no_of_varsize;
bool rowid = req_struct->m_use_rowid;
Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
@@ -1244,21 +1179,16 @@ int Dbtup::handleInsertReq(Signal* signal,
if(mem_insert)
{
jam();
- ndbassert(regOperPtr.p->is_first_operation()); // disk insert
prepare_initial_insert(req_struct, regOperPtr.p, regTabPtr);
}
else
{
- if (!regOperPtr.p->is_first_operation())
- {
- Operationrec* prevOp= req_struct->prevOpPtr.p;
- ndbassert(prevOp->op_struct.op_type == ZDELETE);
- tup_version= prevOp->tupVersion + 1;
-
- if(!prevOp->is_first_operation())
- org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
- }
-
+ Operationrec* prevOp= req_struct->prevOpPtr.p;
+ ndbassert(prevOp->op_struct.op_type == ZDELETE);
+ tup_version= prevOp->tupVersion + 1;
+
+ if(!prevOp->is_first_operation())
+ org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
if (regTabPtr->need_expand())
expand_tuple(req_struct, sizes, org, regTabPtr, !disk_insert);
else
@@ -1268,11 +1198,6 @@ int Dbtup::handleInsertReq(Signal* signal,
if (disk_insert)
{
int res;
- if (unlikely(!mem_insert))
- {
- sizes[DD] = sizes[DD+2] = regTabPtr->m_offsets[DD].m_fix_header_size;
- fix_disk_insert_no_mem_insert(req_struct, regOperPtr.p, regTabPtr);
- }
if (ERROR_INSERTED(4015))
{
@@ -1381,6 +1306,7 @@ int Dbtup::handleInsertReq(Signal* signal,
}
if (unlikely(ptr == 0))
{
+ jam();
goto alloc_rowid_error;
}
}
@@ -1396,7 +1322,7 @@ int Dbtup::handleInsertReq(Signal* signal,
(varsize ? Tuple_header::CHAINED_ROW : 0);
regOperPtr.p->m_tuple_location.m_page_no = real_page_id;
}
- else if(!rowid || !regOperPtr.p->is_first_operation())
+ else
{
int ret;
if (ERROR_INSERTED(4020))
@@ -1417,20 +1343,6 @@ int Dbtup::handleInsertReq(Signal* signal,
req_struct->m_use_rowid = false;
base->m_header_bits &= ~(Uint32)Tuple_header::FREE;
}
- else
- {
- if ((req_struct->m_row_id.m_page_no == frag_page_id &&
- req_struct->m_row_id.m_page_idx == regOperPtr.p->m_tuple_location.m_page_idx))
- {
- ndbout_c("no mem insert but rowid (same)");
- base->m_header_bits &= ~(Uint32)Tuple_header::FREE;
- }
- else
- {
- // no mem insert, but rowid
- ndbrequire(false);
- }
- }
base->m_header_bits |= Tuple_header::ALLOC &
(regOperPtr.p->is_first_operation() ? ~0 : 1);
diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp
index d1580f8da54..8a68905cef9 100644
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp
@@ -89,7 +89,6 @@ Dbtup::Dbtup(Block_context& ctx, Pgman* pgman)
addRecSignal(GSN_DROP_TAB_REQ, &Dbtup::execDROP_TAB_REQ);
- addRecSignal(GSN_TUP_ALLOCREQ, &Dbtup::execTUP_ALLOCREQ);
addRecSignal(GSN_TUP_DEALLOCREQ, &Dbtup::execTUP_DEALLOCREQ);
addRecSignal(GSN_TUP_WRITELOG_REQ, &Dbtup::execTUP_WRITELOG_REQ);
diff --git a/storage/ndb/test/ndbapi/testOperations.cpp b/storage/ndb/test/ndbapi/testOperations.cpp
index 65b406f155d..e116545f7e3 100644
--- a/storage/ndb/test/ndbapi/testOperations.cpp
+++ b/storage/ndb/test/ndbapi/testOperations.cpp
@@ -665,9 +665,9 @@ main(int argc, const char** argv){
for(Uint32 i = 0; i < 12; i++)
{
- if(i == 6 || i == 8 || i == 10)
+ if(false && (i == 6 || i == 8 || i == 10))
continue;
-
+
BaseString name("bug_9749");
name.appfmt("_%d", i);
NDBT_TestCaseImpl1 *pt = new NDBT_TestCaseImpl1(&ts,