summaryrefslogtreecommitdiff
path: root/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp')
-rw-r--r--storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp3184
1 files changed, 2054 insertions, 1130 deletions
diff --git a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
index 2d6f579302c..dc8b123f10f 100644
--- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
+++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
@@ -41,6 +41,19 @@
#define DEBUG(x)
#endif
+#ifdef ACC_SAFE_QUEUE
+#define vlqrequire(x) do { if (unlikely(!(x))) {\
+ dump_lock_queue(loPtr); \
+ ndbrequire(false); } } while(0)
+#else
+#define vlqrequire(x) ndbrequire(x)
+#endif
+
+
+// primary key is stored in TUP
+#include "../dbtup/Dbtup.hpp"
+#include "../dblqh/Dblqh.hpp"
+
// Signal entries and statement blocks
/* --------------------------------------------------------------------------------- */
@@ -208,8 +221,8 @@ void Dbacc::execSTTOR(Signal* signal)
switch (tstartphase) {
case 1:
jam();
- c_tup = (Dbtup*)globalData.getBlock(DBTUP);
- ndbrequire(c_tup != 0);
+ ndbrequire((c_tup = (Dbtup*)globalData.getBlock(DBTUP)) != 0);
+ ndbrequire((c_lqh = (Dblqh*)globalData.getBlock(DBLQH)) != 0);
break;
}
tuserblockref = signal->theData[3];
@@ -435,9 +448,7 @@ void Dbacc::initialiseOperationRec(Signal* signal)
for (operationRecPtr.i = 0; operationRecPtr.i < coprecsize; operationRecPtr.i++) {
refresh_watch_dog();
ptrAss(operationRecPtr, operationrec);
- operationRecPtr.p->transactionstate = IDLE;
- operationRecPtr.p->operation = ZUNDEFINED_OP;
- operationRecPtr.p->opState = FREE_OP;
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
operationRecPtr.p->nextOp = operationRecPtr.i + 1;
}//for
operationRecPtr.i = coprecsize - 1;
@@ -899,8 +910,7 @@ void Dbacc::execACCSEIZEREQ(Signal* signal)
ptrGuard(operationRecPtr);
operationRecPtr.p->userptr = tuserptr;
operationRecPtr.p->userblockref = tuserblockref;
- operationRecPtr.p->operation = ZUNDEFINED_OP;
- operationRecPtr.p->transactionstate = IDLE;
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
/* ******************************< */
/* ACCSEIZECONF */
/* ******************************< */
@@ -954,51 +964,32 @@ void Dbacc::initOpRec(Signal* signal)
operationRecPtr.p->xfrmtupkeylen = signal->theData[4];
operationRecPtr.p->transId1 = signal->theData[5];
operationRecPtr.p->transId2 = signal->theData[6];
- operationRecPtr.p->transactionstate = ACTIVE;
- operationRecPtr.p->commitDeleteCheckFlag = ZFALSE;
- operationRecPtr.p->operation = Treqinfo & 0x7;
- /* --------------------------------------------------------------------------------- */
- // opSimple is not used in this version. Is needed for deadlock handling later on.
- /* --------------------------------------------------------------------------------- */
- // operationRecPtr.p->opSimple = (Treqinfo >> 3) & 0x1;
-
- operationRecPtr.p->lockMode = (Treqinfo >> 4) & 0x3;
Uint32 readFlag = (((Treqinfo >> 4) & 0x3) == 0); // Only 1 if Read
Uint32 dirtyFlag = (((Treqinfo >> 6) & 0x1) == 1); // Only 1 if Dirty
Uint32 dirtyReadFlag = readFlag & dirtyFlag;
- operationRecPtr.p->dirtyRead = dirtyReadFlag;
- operationRecPtr.p->nodeType = (Treqinfo >> 7) & 0x3;
+ Uint32 opbits = 0;
+ opbits |= Treqinfo & 0x7;
+ opbits |= ((Treqinfo >> 4) & 0x3) ? Operationrec::OP_LOCK_MODE : 0;
+ opbits |= ((Treqinfo >> 4) & 0x3) ? Operationrec::OP_ACC_LOCK_MODE : 0;
+ opbits |= (dirtyReadFlag) ? Operationrec::OP_DIRTY_READ : 0;
+ opbits |= ((Treqinfo >> 31) & 0x1) ? Operationrec::OP_LOCK_REQ : 0;
+
+ //operationRecPtr.p->nodeType = (Treqinfo >> 7) & 0x3;
operationRecPtr.p->fid = fragrecptr.p->myfid;
operationRecPtr.p->fragptr = fragrecptr.i;
operationRecPtr.p->nextParallelQue = RNIL;
operationRecPtr.p->prevParallelQue = RNIL;
- operationRecPtr.p->prevQueOp = RNIL;
- operationRecPtr.p->nextQueOp = RNIL;
operationRecPtr.p->nextSerialQue = RNIL;
operationRecPtr.p->prevSerialQue = RNIL;
operationRecPtr.p->elementPage = RNIL;
- operationRecPtr.p->keyinfoPage = RNIL;
- operationRecPtr.p->lockOwner = ZFALSE;
- operationRecPtr.p->insertIsDone = ZFALSE;
- operationRecPtr.p->elementIsDisappeared = ZFALSE;
- operationRecPtr.p->insertDeleteLen = fragrecptr.p->elementLength;
- operationRecPtr.p->longPagePtr = RNIL;
- operationRecPtr.p->longKeyPageIndex = RNIL;
operationRecPtr.p->scanRecPtr = RNIL;
+ operationRecPtr.p->m_op_bits = opbits;
// bit to mark lock operation
- operationRecPtr.p->isAccLockReq = (Treqinfo >> 31) & 0x1;
// undo log is not run via ACCKEYREQ
- if(ERROR_INSERTED(5900) || ERROR_INSERTED(5901))
- {
- for(unsigned i = 0; i<8 && i<signal->theData[4]; i++){
- operationRecPtr.p->keydata[i] = signal->theData[i+7];
- }
- }
-
}//Dbacc::initOpRec()
/* --------------------------------------------------------------------------------- */
@@ -1007,7 +998,7 @@ void Dbacc::initOpRec(Signal* signal)
void Dbacc::sendAcckeyconf(Signal* signal)
{
signal->theData[0] = operationRecPtr.p->userptr;
- signal->theData[1] = operationRecPtr.p->operation;
+ signal->theData[1] = operationRecPtr.p->m_op_bits & Operationrec::OP_MASK;
signal->theData[2] = operationRecPtr.p->fid;
signal->theData[3] = operationRecPtr.p->localdata[0];
signal->theData[4] = operationRecPtr.p->localdata[1];
@@ -1015,7 +1006,8 @@ void Dbacc::sendAcckeyconf(Signal* signal)
}//Dbacc::sendAcckeyconf()
-void Dbacc::ACCKEY_error(Uint32 fromWhere)
+void
+Dbacc::ACCKEY_error(Uint32 fromWhere)
{
switch(fromWhere) {
case 0:
@@ -1069,7 +1061,8 @@ void Dbacc::execACCKEYREQ(Signal* signal)
}//if
ptrAss(operationRecPtr, operationrec);
ptrAss(fragrecptr, fragmentrec);
- ndbrequire(operationRecPtr.p->transactionstate == IDLE);
+
+ ndbrequire(operationRecPtr.p->m_op_bits == Operationrec::OP_INITIAL);
initOpRec(signal);
// normalize key if any char attr
@@ -1083,23 +1076,32 @@ void Dbacc::execACCKEYREQ(Signal* signal)
/* WE REMEMBER THESE ADDRESS IF WE LATER NEED TO INSERT */
/* THE ITEM AFTER NOT FINDING THE ITEM. */
/*---------------------------------------------------------------*/
- getElement(signal);
-
- if (tgeResult == ZTRUE) {
- switch (operationRecPtr.p->operation) {
+ OperationrecPtr lockOwnerPtr;
+ const Uint32 found = getElement(signal, lockOwnerPtr);
+
+ Uint32 opbits = operationRecPtr.p->m_op_bits;
+ Uint32 op = opbits & Operationrec::OP_MASK;
+ if (found == ZTRUE)
+ {
+ switch (op) {
case ZREAD:
case ZUPDATE:
case ZDELETE:
case ZWRITE:
case ZSCAN_OP:
- if (!tgeLocked){
- if(operationRecPtr.p->operation == ZWRITE)
+ if (!lockOwnerPtr.p)
+ {
+ if(op == ZWRITE)
{
jam();
- operationRecPtr.p->operation = ZUPDATE;
+ opbits &= ~(Uint32)Operationrec::OP_MASK;
+ opbits |= (op = ZUPDATE);
+ operationRecPtr.p->m_op_bits = opbits; // store to get correct ACCKEYCONF
}
+ opbits |= Operationrec::OP_STATE_RUNNING;
+ opbits |= Operationrec::OP_RUN_QUEUE;
sendAcckeyconf(signal);
- if (operationRecPtr.p->dirtyRead == ZFALSE) {
+ if (! (opbits & Operationrec::OP_DIRTY_READ)) {
/*---------------------------------------------------------------*/
// It is not a dirty read. We proceed by locking and continue with
// the operation.
@@ -1116,42 +1118,44 @@ void Dbacc::execACCKEYREQ(Signal* signal)
dbgWord32(gePageptr, tgeElementptr, eh);
gePageptr.p->word32[tgeElementptr] = eh;
- insertLockOwnersList(signal , operationRecPtr);
- return;
+ opbits |= Operationrec::OP_LOCK_OWNER;
+ insertLockOwnersList(signal, operationRecPtr);
} else {
jam();
/*---------------------------------------------------------------*/
// It is a dirty read. We do not lock anything. Set state to
// IDLE since no COMMIT call will come.
/*---------------------------------------------------------------*/
- operationRecPtr.p->transactionstate = IDLE;
- operationRecPtr.p->operation = ZUNDEFINED_OP;
- return;
+ opbits = Operationrec::OP_EXECUTED_DIRTY_READ;
}//if
+ operationRecPtr.p->m_op_bits = opbits;
+ return;
} else {
jam();
- accIsLockedLab(signal);
+ accIsLockedLab(signal, lockOwnerPtr);
return;
}//if
break;
case ZINSERT:
jam();
- insertExistElemLab(signal);
+ insertExistElemLab(signal, lockOwnerPtr);
return;
break;
default:
ndbrequire(false);
break;
}//switch
- } else if (tgeResult == ZFALSE) {
- switch (operationRecPtr.p->operation) {
- case ZINSERT:
+ } else if (found == ZFALSE) {
+ switch (op){
case ZWRITE:
+ opbits &= ~(Uint32)Operationrec::OP_MASK;
+ opbits |= (op = ZINSERT);
+ case ZINSERT:
jam();
- // If a write operation makes an insert we switch operation to ZINSERT so
- // that the commit-method knows an insert has been made and updates noOfElements.
- operationRecPtr.p->operation = ZINSERT;
- operationRecPtr.p->insertIsDone = ZTRUE;
+ opbits |= Operationrec::OP_INSERT_IS_DONE;
+ opbits |= Operationrec::OP_STATE_RUNNING;
+ opbits |= Operationrec::OP_RUN_QUEUE;
+ operationRecPtr.p->m_op_bits = opbits;
insertelementLab(signal);
return;
break;
@@ -1169,13 +1173,287 @@ void Dbacc::execACCKEYREQ(Signal* signal)
}//switch
} else {
jam();
- acckeyref1Lab(signal, tgeResult);
+ acckeyref1Lab(signal, found);
return;
}//if
return;
}//Dbacc::execACCKEYREQ()
void
+Dbacc::execACCKEY_ORD(Signal* signal, Uint32 opPtrI)
+{
+ jamEntry();
+ OperationrecPtr lastOp;
+ lastOp.i = opPtrI;
+ ptrCheckGuard(lastOp, coprecsize, operationrec);
+ Uint32 opbits = lastOp.p->m_op_bits;
+ Uint32 opstate = opbits & Operationrec::OP_STATE_MASK;
+
+ if (likely(opbits == Operationrec::OP_EXECUTED_DIRTY_READ))
+ {
+ jam();
+ lastOp.p->m_op_bits = Operationrec::OP_INITIAL;
+ return;
+ }
+ else if (likely(opstate == Operationrec::OP_STATE_RUNNING))
+ {
+ opbits |= Operationrec::OP_STATE_EXECUTED;
+ lastOp.p->m_op_bits = opbits;
+ startNext(signal, lastOp);
+ return;
+ }
+ else
+ {
+ }
+
+ ndbout_c("bits: %.8x state: %.8x", opbits, opstate);
+ ndbrequire(false);
+}
+
+void
+Dbacc::startNext(Signal* signal, OperationrecPtr lastOp)
+{
+ jam();
+ OperationrecPtr nextOp;
+ OperationrecPtr loPtr;
+ nextOp.i = lastOp.p->nextParallelQue;
+ loPtr.i = lastOp.p->m_lock_owner_ptr_i;
+ Uint32 opbits = lastOp.p->m_op_bits;
+
+ if ((opbits & Operationrec::OP_STATE_MASK)!= Operationrec::OP_STATE_EXECUTED)
+ {
+ jam();
+ return;
+ }
+
+ Uint32 nextbits;
+ if (nextOp.i != RNIL)
+ {
+ jam();
+ ptrCheckGuard(nextOp, coprecsize, operationrec);
+ nextbits = nextOp.p->m_op_bits;
+ goto checkop;
+ }
+
+ if ((opbits & Operationrec::OP_LOCK_OWNER) == 0)
+ {
+ jam();
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+ }
+ else
+ {
+ jam();
+ loPtr = lastOp;
+ }
+
+ nextOp.i = loPtr.p->nextSerialQue;
+ ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
+
+ if (nextOp.i == RNIL)
+ {
+ jam();
+ return;
+ }
+
+ /**
+ * There is an op in serie queue...
+ * Check if it can run
+ */
+ ptrCheckGuard(nextOp, coprecsize, operationrec);
+ nextbits = nextOp.p->m_op_bits;
+
+ {
+ const bool same = nextOp.p->is_same_trans(lastOp.p);
+
+ if (!same && ((opbits & Operationrec::OP_ACC_LOCK_MODE) ||
+ (nextbits & Operationrec::OP_LOCK_MODE)))
+ {
+ jam();
+ /**
+ * Not same transaction
+ * and either last had exclusive lock
+ * or next had exclusive lock
+ */
+ return;
+ }
+
+ /**
+ * same trans and X-lock
+ */
+ if (same && (opbits & Operationrec::OP_ACC_LOCK_MODE))
+ {
+ jam();
+ goto upgrade;
+ }
+ }
+
+ /**
+ * all shared lock...
+ */
+ if ((opbits & Operationrec::OP_ACC_LOCK_MODE) == 0 &&
+ (nextbits & Operationrec::OP_LOCK_MODE) == 0)
+ {
+ jam();
+ goto upgrade;
+ }
+
+ /**
+ * There is a shared parallell queue & and exclusive op is first in queue
+ */
+ ndbassert((opbits & Operationrec::OP_ACC_LOCK_MODE) == 0 &&
+ (nextbits & Operationrec::OP_LOCK_MODE));
+
+ /**
+ * We must check if there are many transactions in parallel queue...
+ */
+ OperationrecPtr tmp;
+ tmp.i = loPtr.p->nextParallelQue;
+ while (tmp.i != RNIL)
+ {
+ ptrCheckGuard(tmp, coprecsize, operationrec);
+ if (!nextOp.p->is_same_trans(tmp.p))
+ {
+ jam();
+ /**
+ * parallel queue contained another transaction, dont let it run
+ */
+ return;
+ }
+ }
+
+upgrade:
+ /**
+ * Move first op in serie queue to end of parallell queue
+ */
+
+ tmp.i = loPtr.p->nextSerialQue = nextOp.p->nextSerialQue;
+ loPtr.p->m_lo_last_parallel_op_ptr_i = nextOp.i;
+ nextOp.p->nextSerialQue = RNIL;
+ nextOp.p->prevSerialQue = RNIL;
+ nextOp.p->m_lock_owner_ptr_i = loPtr.i;
+ nextOp.p->prevParallelQue = lastOp.i;
+ lastOp.p->nextParallelQue = nextOp.i;
+
+ if (tmp.i != RNIL)
+ {
+ jam();
+ ptrCheckGuard(tmp, coprecsize, operationrec);
+ tmp.p->prevSerialQue = loPtr.i;
+ }
+ else
+ {
+ jam();
+ loPtr.p->m_lo_last_serial_op_ptr_i = RNIL;
+ }
+
+ nextbits |= Operationrec::OP_RUN_QUEUE;
+
+ /**
+ * Currently no grouping of ops in serie queue
+ */
+ ndbrequire(nextOp.p->nextParallelQue == RNIL);
+
+checkop:
+ Uint32 errCode = 0;
+ OperationrecPtr save = operationRecPtr;
+ operationRecPtr = nextOp;
+
+ Uint32 lastop = opbits & Operationrec::OP_MASK;
+ Uint32 nextop = nextbits & Operationrec::OP_MASK;
+
+ nextbits &= nextbits & ~(Uint32)Operationrec::OP_STATE_MASK;
+ nextbits |= Operationrec::OP_STATE_RUNNING;
+
+ if (lastop == ZDELETE)
+ {
+ jam();
+ if (nextop != ZINSERT && nextop != ZWRITE)
+ {
+ errCode = ZREAD_ERROR;
+ goto ref;
+ }
+
+ nextbits &= ~(Uint32)Operationrec::OP_MASK;
+ nextbits &= ~(Uint32)Operationrec::OP_ELEMENT_DISAPPEARED;
+ nextbits |= (nextop = ZINSERT);
+ nextbits |= Operationrec::OP_INSERT_IS_DONE;
+ goto conf;
+ }
+ else if (nextop == ZINSERT)
+ {
+ jam();
+ errCode = ZWRITE_ERROR;
+ goto ref;
+ }
+ else if (nextop == ZWRITE)
+ {
+ jam();
+ nextbits &= ~(Uint32)Operationrec::OP_MASK;
+ nextbits |= (nextop = ZUPDATE);
+ goto conf;
+ }
+ else
+ {
+ jam();
+ }
+
+conf:
+ nextOp.p->m_op_bits = nextbits;
+ nextOp.p->localdata[0] = lastOp.p->localdata[0];
+ nextOp.p->localdata[1] = lastOp.p->localdata[1];
+
+ if (nextop == ZSCAN_OP && (nextbits & Operationrec::OP_LOCK_REQ) == 0)
+ {
+ jam();
+ takeOutScanLockQueue(nextOp.p->scanRecPtr);
+ putReadyScanQueue(signal, nextOp.p->scanRecPtr);
+ }
+ else
+ {
+ jam();
+ fragrecptr.i = nextOp.p->fragptr;
+ ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
+
+ sendAcckeyconf(signal);
+ sendSignal(nextOp.p->userblockref, GSN_ACCKEYCONF,
+ signal, 6, JBB);
+ }
+
+ operationRecPtr = save;
+ return;
+
+ref:
+ nextOp.p->m_op_bits = nextbits;
+
+ if (nextop == ZSCAN_OP && (nextbits & Operationrec::OP_LOCK_REQ) == 0)
+ {
+ jam();
+ nextOp.p->m_op_bits |= Operationrec::OP_ELEMENT_DISAPPEARED;
+ takeOutScanLockQueue(nextOp.p->scanRecPtr);
+ putReadyScanQueue(signal, nextOp.p->scanRecPtr);
+ }
+ else
+ {
+ jam();
+ signal->theData[0] = nextOp.p->userptr;
+ signal->theData[1] = errCode;
+ sendSignal(nextOp.p->userblockref, GSN_ACCKEYREF, signal,
+ 2, JBB);
+ }
+
+ operationRecPtr = save;
+ return;
+}
+
+
+#if 0
+void
+Dbacc::execACCKEY_REP_REF(Signal* signal, Uint32 opPtrI)
+{
+}
+#endif
+
+void
Dbacc::xfrmKeyData(Signal* signal)
{
Uint32 table = fragrecptr.p->myTableId;
@@ -1188,23 +1466,22 @@ Dbacc::xfrmKeyData(Signal* signal)
operationRecPtr.p->xfrmtupkeylen = len;
}
-void Dbacc::accIsLockedLab(Signal* signal)
+void
+Dbacc::accIsLockedLab(Signal* signal, OperationrecPtr lockOwnerPtr)
{
ndbrequire(csystemRestart == ZFALSE);
- queOperPtr.i = ElementHeader::getOpPtrI(gePageptr.p->word32[tgeElementptr]);
- ptrCheckGuard(queOperPtr, coprecsize, operationrec);
- if (operationRecPtr.p->dirtyRead == ZFALSE) {
+
+ Uint32 bits = operationRecPtr.p->m_op_bits;
+ validate_lock_queue(lockOwnerPtr);
+
+ if ((bits & Operationrec::OP_DIRTY_READ) == 0){
Uint32 return_result;
- if (operationRecPtr.p->lockMode == ZREADLOCK) {
+ if ((bits & Operationrec::OP_LOCK_MODE) == ZREADLOCK) {
jam();
- priPageptr = gePageptr;
- tpriElementptr = tgeElementptr;
- return_result = placeReadInLockQueue(signal);
+ return_result = placeReadInLockQueue(lockOwnerPtr);
} else {
jam();
- pwiPageptr = gePageptr;
- tpwiElementptr = tgeElementptr;
- return_result = placeWriteInLockQueue(signal);
+ return_result = placeWriteInLockQueue(lockOwnerPtr);
}//if
if (return_result == ZPARALLEL_QUEUE) {
jam();
@@ -1214,24 +1491,29 @@ void Dbacc::accIsLockedLab(Signal* signal)
jam();
signal->theData[0] = RNIL;
return;
- } else if (return_result == ZWRITE_ERROR) {
+ } else {
jam();
acckeyref1Lab(signal, return_result);
return;
}//if
ndbrequire(false);
- } else {
- if (queOperPtr.p->elementIsDisappeared == ZFALSE) {
+ }
+ else
+ {
+ if (!(lockOwnerPtr.p->m_op_bits & Operationrec::OP_ELEMENT_DISAPPEARED) &&
+ lockOwnerPtr.p->localdata[0] != ~(Uint32)0)
+ {
jam();
- /*---------------------------------------------------------------*/
- // It is a dirty read. We do not lock anything. Set state to
- // IDLE since no COMMIT call will arrive.
- /*---------------------------------------------------------------*/
+ /* ---------------------------------------------------------------
+ * It is a dirty read. We do not lock anything. Set state to
+ *IDLE since no COMMIT call will arrive.
+ * ---------------------------------------------------------------*/
sendAcckeyconf(signal);
- operationRecPtr.p->transactionstate = IDLE;
- operationRecPtr.p->operation = ZUNDEFINED_OP;
+ operationRecPtr.p->m_op_bits = Operationrec::OP_EXECUTED_DIRTY_READ;
return;
- } else {
+ }
+ else
+ {
jam();
/*---------------------------------------------------------------*/
// The tuple does not exist in the committed world currently.
@@ -1243,17 +1525,18 @@ void Dbacc::accIsLockedLab(Signal* signal)
}//if
}//Dbacc::accIsLockedLab()
-/* --------------------------------------------------------------------------------- */
-/* I N S E R T E X I S T E L E M E N T */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::insertExistElemLab(Signal* signal)
+/* ------------------------------------------------------------------------ */
+/* I N S E R T E X I S T E L E M E N T */
+/* ------------------------------------------------------------------------ */
+void Dbacc::insertExistElemLab(Signal* signal, OperationrecPtr lockOwnerPtr)
{
- if (!tgeLocked){
+ if (!lockOwnerPtr.p)
+ {
jam();
acckeyref1Lab(signal, ZWRITE_ERROR);/* THE ELEMENT ALREADY EXIST */
return;
}//if
- accIsLockedLab(signal);
+ accIsLockedLab(signal, lockOwnerPtr);
}//Dbacc::insertExistElemLab()
/* --------------------------------------------------------------------------------- */
@@ -1271,26 +1554,8 @@ void Dbacc::insertelementLab(Signal* signal)
}//if
}//if
ndbrequire(operationRecPtr.p->tupkeylen <= fragrecptr.p->keyLength);
-
- Uint32 localKey;
- if(!operationRecPtr.p->isAccLockReq)
- {
- signal->theData[0] = operationRecPtr.p->userptr;
- Uint32 blockNo = refToBlock(operationRecPtr.p->userblockref);
- EXECUTE_DIRECT(blockNo, GSN_LQH_ALLOCREQ, signal, 1);
- jamEntry();
- if (signal->theData[0] != 0) {
- jam();
- Uint32 result_code = signal->theData[0];
- acckeyref1Lab(signal, result_code);
- return;
- }//if
- localKey = (signal->theData[1] << MAX_TUPLES_BITS) + signal->theData[2];
- }
- else
- {
- localKey = signal->theData[7];
- }
+ ndbassert(!(operationRecPtr.p->m_op_bits & Operationrec::OP_LOCK_REQ));
+ Uint32 localKey = ~(Uint32)0;
insertLockOwnersList(signal, operationRecPtr);
@@ -1305,196 +1570,18 @@ void Dbacc::insertelementLab(Signal* signal)
idrOperationRecPtr = operationRecPtr;
clocalkey[0] = localKey;
operationRecPtr.p->localdata[0] = localKey;
- /* --------------------------------------------------------------------------------- */
- /* WE SET THE LOCAL KEY TO MINUS ONE TO INDICATE IT IS NOT YET VALID. */
- /* --------------------------------------------------------------------------------- */
+ /* ----------------------------------------------------------------------- */
+ /* WE SET THE LOCAL KEY TO MINUS ONE TO INDICATE IT IS NOT YET VALID. */
+ /* ----------------------------------------------------------------------- */
insertElement(signal);
sendAcckeyconf(signal);
return;
}//Dbacc::insertelementLab()
-/* --------------------------------------------------------------------------------- */
-/* PLACE_READ_IN_LOCK_QUEUE */
-/* INPUT: OPERATION_REC_PTR OUR OPERATION POINTER */
-/* QUE_OPER_PTR LOCK QUEUE OWNER OPERATION POINTER */
-/* PRI_PAGEPTR PAGE POINTER OF ELEMENT */
-/* TPRI_ELEMENTPTR ELEMENT POINTER OF ELEMENT */
-/* OUTPUT TRESULT = */
-/* ZPARALLEL_QUEUE OPERATION PLACED IN PARALLEL QUEUE */
-/* OPERATION CAN PROCEED NOW. */
-/* ZSERIAL_QUEUE OPERATION PLACED IN SERIAL QUEUE */
-/* ERROR CODE OPERATION NEEDS ABORTING */
-/* THE ELEMENT WAS LOCKED AND WE WANT TO READ THE TUPLE. WE WILL CHECK THE LOCK */
-/* QUEUES TO PERFORM THE PROPER ACTION. */
-/* */
-/* IN SOME PLACES IN THE CODE BELOW THAT HANDLES WHAT TO DO WHEN THE TUPLE IS LOCKED */
-/* WE DO ASSUME THAT NEXT_PARALLEL_QUEUE AND NEXT_SERIAL_QUEUE ON OPERATION_REC_PTR */
-/* HAVE BEEN INITIALISED TO RNIL. THUS WE DO NOT PERFORM THIS ONCE MORE EVEN IF IT */
-/* COULD BE NICE FOR READABILITY. */
-/* --------------------------------------------------------------------------------- */
-Uint32 Dbacc::placeReadInLockQueue(Signal* signal)
-{
- if (getNoParallelTransaction(queOperPtr.p) == 1) {
- if ((queOperPtr.p->transId1 == operationRecPtr.p->transId1) &&
- (queOperPtr.p->transId2 == operationRecPtr.p->transId2)) {
- /* --------------------------------------------------------------------------------- */
- /* WE ARE PERFORMING A READ OPERATION AND THIS TRANSACTION ALREADY OWNS THE LOCK */
- /* ALONE. PUT THE OPERATION LAST IN THE PARALLEL QUEUE. */
- /* --------------------------------------------------------------------------------- */
- jam();
- mlpqOperPtr = queOperPtr;
- moveLastParallelQueue(signal);
- operationRecPtr.p->localdata[0] = queOperPtr.p->localdata[0];
- operationRecPtr.p->localdata[1] = queOperPtr.p->localdata[1];
- operationRecPtr.p->prevParallelQue = mlpqOperPtr.i;
- mlpqOperPtr.p->nextParallelQue = operationRecPtr.i;
- switch (queOperPtr.p->lockMode) {
- case ZREADLOCK:
- jam();
- /*empty*/;
- break;
- default:
- jam();
- /* --------------------------------------------------------------------------------- */
- /* IF THE TRANSACTION PREVIOUSLY SET A WRITE LOCK WE MUST ENSURE THAT ALL */
- /* OPERATIONS IN THE PARALLEL QUEUE HAVE WRITE LOCK MODE TO AVOID STRANGE BUGS.*/
- /* --------------------------------------------------------------------------------- */
- operationRecPtr.p->lockMode = queOperPtr.p->lockMode;
- break;
- }//switch
- return ZPARALLEL_QUEUE;
- }//if
- }//if
- if (queOperPtr.p->nextSerialQue == RNIL) {
- /* --------------------------------------------------------------------------------- */
- /* WE ARE PERFORMING A READ OPERATION AND THERE IS NO SERIAL QUEUE. IF THERE IS NO */
- /* WRITE OPERATION THAT OWNS THE LOCK OR ANY WRITE OPERATION IN THE PARALLEL QUEUE */
- /* IT IS ENOUGH TO CHECK THE LOCK MODE OF THE LEADER IN THE PARALLEL QUEUE. IF IT IS */
- /* A READ LOCK THEN WE PLACE OURSELVES IN THE PARALLEL QUEUE OTHERWISE WE GO ON TO */
- /* PLACE OURSELVES IN THE SERIAL QUEUE. */
- /* --------------------------------------------------------------------------------- */
- switch (queOperPtr.p->lockMode) {
- case ZREADLOCK:
- jam();
- mlpqOperPtr = queOperPtr;
- moveLastParallelQueue(signal);
- operationRecPtr.p->prevParallelQue = mlpqOperPtr.i;
- mlpqOperPtr.p->nextParallelQue = operationRecPtr.i;
- operationRecPtr.p->localdata[0] = queOperPtr.p->localdata[0];
- operationRecPtr.p->localdata[1] = queOperPtr.p->localdata[1];
- return ZPARALLEL_QUEUE;
- default:
- jam();
- queOperPtr.p->nextSerialQue = operationRecPtr.i;
- operationRecPtr.p->prevSerialQue = queOperPtr.i;
- break;
- }//switch
- } else {
- jam();
- placeSerialQueueRead(signal);
- }//if
- return ZSERIAL_QUEUE;
-}//Dbacc::placeReadInLockQueue()
-
-/* --------------------------------------------------------------------------------- */
-/* WE WILL CHECK IF THIS TRANSACTION IS ALREADY PLACED AT SOME SPOT IN THE PARALLEL */
-/* SERIAL QUEUE WITHOUT ANY NEIGHBORS FROM OTHER TRANSACTION. IF SO WE WILL INSERT */
-/* IT IN THAT PARALLEL QUEUE. */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::placeSerialQueueRead(Signal* signal)
-{
- readWriteOpPtr.i = queOperPtr.p->nextSerialQue;
- ptrCheckGuard(readWriteOpPtr, coprecsize, operationrec);
- PSQR_LOOP:
- jam();
- if (readWriteOpPtr.p->nextSerialQue == RNIL) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* THERE WAS NO PREVIOUS OPERATION IN THIS TRANSACTION WHICH WE COULD PUT IT */
- /* IN THE PARALLEL QUEUE TOGETHER WITH. */
- /* --------------------------------------------------------------------------------- */
- checkOnlyReadEntry(signal);
- return;
- }//if
- if (getNoParallelTransaction(readWriteOpPtr.p) == 1) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* THERE WAS ONLY ONE TRANSACTION INVOLVED IN THE PARALLEL QUEUE. IF THIS IS OUR */
- /* TRANSACTION WE CAN STILL GET HOLD OF THE LOCK. */
- /* --------------------------------------------------------------------------------- */
- if ((readWriteOpPtr.p->transId1 == operationRecPtr.p->transId1) &&
- (readWriteOpPtr.p->transId2 == operationRecPtr.p->transId2)) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* WE ARE PERFORMING A READ IN THE SAME TRANSACTION WHERE WE ALREADY */
- /* PREVIOUSLY HAVE EXECUTED AN OPERATION. INSERT-DELETE, READ-UPDATE, READ-READ, */
- /* UPDATE-UPDATE, UPDATE-DELETE, READ-DELETE, INSERT-READ, INSERT-UPDATE ARE ALLOWED */
- /* COMBINATIONS. A NEW INSERT AFTER A DELETE IS NOT ALLOWED AND SUCH AN INSERT WILL */
- /* GO TO THE SERIAL LOCK QUEUE WHICH IT WILL NOT LEAVE UNTIL A TIME-OUT AND THE */
- /* TRANSACTION IS ABORTED. READS AND UPDATES AFTER DELETES IS ALSO NOT ALLOWED. */
- /* --------------------------------------------------------------------------------- */
- mlpqOperPtr = readWriteOpPtr;
- moveLastParallelQueue(signal);
- readWriteOpPtr = mlpqOperPtr;
- operationRecPtr.p->prevParallelQue = readWriteOpPtr.i;
- readWriteOpPtr.p->nextParallelQue = operationRecPtr.i;
- operationRecPtr.p->localdata[0] = readWriteOpPtr.p->localdata[0];
- operationRecPtr.p->localdata[1] = readWriteOpPtr.p->localdata[1];
- switch (readWriteOpPtr.p->lockMode) {
- case ZREADLOCK:
- jam();
- /*empty*/;
- break;
- default:
- jam();
- /* --------------------------------------------------------------------------------- */
- /* IF THE TRANSACTION PREVIOUSLY SET A WRITE LOCK WE MUST ENSURE THAT ALL */
- /* OPERATIONS IN THE PARALLEL QUEUE HAVE WRITE LOCK MODE TO AVOID STRANGE BUGS.*/
- /* --------------------------------------------------------------------------------- */
- operationRecPtr.p->lockMode = readWriteOpPtr.p->lockMode;
- break;
- }//switch
- return;
- }//if
- }//if
- readWriteOpPtr.i = readWriteOpPtr.p->nextSerialQue;
- ptrCheckGuard(readWriteOpPtr, coprecsize, operationrec);
- goto PSQR_LOOP;
-}//Dbacc::placeSerialQueueRead()
-
-/* --------------------------------------------------------------------------------- */
-/* WE WILL CHECK IF THE LAST ENTRY IN THE SERIAL QUEUE CONTAINS ONLY READ */
-/* OPERATIONS. IF SO WE WILL INSERT IT IN THAT PARALLEL QUEUE. OTHERWISE WE */
-/* WILL PLACE IT AT THE END OF THE SERIAL QUEUE. */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::checkOnlyReadEntry(Signal* signal)
-{
- switch (readWriteOpPtr.p->lockMode) {
- case ZREADLOCK:
- jam();
- /* --------------------------------------------------------------------------------- */
- /* SINCE THIS LAST QUEUE ONLY CONTAINS READ LOCKS WE CAN JOIN THE PARALLEL QUEUE AT */
- /* THE END. */
- /* --------------------------------------------------------------------------------- */
- mlpqOperPtr = readWriteOpPtr;
- moveLastParallelQueue(signal);
- readWriteOpPtr = mlpqOperPtr;
- operationRecPtr.p->prevParallelQue = readWriteOpPtr.i;
- readWriteOpPtr.p->nextParallelQue = operationRecPtr.i;
- operationRecPtr.p->localdata[0] = readWriteOpPtr.p->localdata[0];
- operationRecPtr.p->localdata[1] = readWriteOpPtr.p->localdata[1];
- break;
- default:
- jam(); /* PUT THE OPERATION RECORD IN THE SERIAL QUEUE */
- readWriteOpPtr.p->nextSerialQue = operationRecPtr.i;
- operationRecPtr.p->prevSerialQue = readWriteOpPtr.i;
- break;
- }//switch
-}//Dbacc::checkOnlyReadEntry()
-/* --------------------------------------------------------------------------------- */
-/* GET_NO_PARALLEL_TRANSACTION */
-/* --------------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------ */
+/* GET_NO_PARALLEL_TRANSACTION */
+/* ------------------------------------------------------------------------ */
Uint32
Dbacc::getNoParallelTransaction(const Operationrec * op)
{
@@ -1514,142 +1601,636 @@ Dbacc::getNoParallelTransaction(const Operationrec * op)
return 1;
}//Dbacc::getNoParallelTransaction()
-void Dbacc::moveLastParallelQueue(Signal* signal)
+#ifdef VM_TRACE
+Uint32
+Dbacc::getNoParallelTransactionFull(const Operationrec * op)
{
- while (mlpqOperPtr.p->nextParallelQue != RNIL) {
- jam();
- mlpqOperPtr.i = mlpqOperPtr.p->nextParallelQue;
- ptrCheckGuard(mlpqOperPtr, coprecsize, operationrec);
- }//if
-}//Dbacc::moveLastParallelQueue()
+ ConstPtr<Operationrec> tmp;
+
+ tmp.p = op;
+ while ((tmp.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0)
+ {
+ tmp.i = tmp.p->prevParallelQue;
+ if (tmp.i != RNIL)
+ {
+ ptrCheckGuard(tmp, coprecsize, operationrec);
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ return getNoParallelTransaction(tmp.p);
+}
+#endif
+
+#ifdef ACC_SAFE_QUEUE
-void Dbacc::moveLastParallelQueueWrite(Signal* signal)
+Uint32
+Dbacc::get_parallel_head(OperationrecPtr opPtr)
{
- /* --------------------------------------------------------------------------------- */
- /* ENSURE THAT ALL OPERATIONS HAVE LOCK MODE SET TO WRITE SINCE WE INSERT A */
- /* WRITE LOCK INTO THE PARALLEL QUEUE. */
- /* --------------------------------------------------------------------------------- */
- while (mlpqOperPtr.p->nextParallelQue != RNIL) {
- jam();
- mlpqOperPtr.p->lockMode = operationRecPtr.p->lockMode;
- mlpqOperPtr.i = mlpqOperPtr.p->nextParallelQue;
- ptrCheckGuard(mlpqOperPtr, coprecsize, operationrec);
- }//if
- mlpqOperPtr.p->lockMode = operationRecPtr.p->lockMode;
-}//Dbacc::moveLastParallelQueueWrite()
+ while ((opPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0 &&
+ opPtr.p->prevParallelQue != RNIL)
+ {
+ opPtr.i = opPtr.p->prevParallelQue;
+ ptrCheckGuard(opPtr, coprecsize, operationrec);
+ }
+
+ return opPtr.i;
+}
-/* --------------------------------------------------------------------------------- */
-/* PLACE_WRITE_IN_LOCK_QUEUE */
-/* INPUT: OPERATION_REC_PTR OUR OPERATION POINTER */
-/* QUE_OPER_PTR LOCK QUEUE OWNER OPERATION POINTER */
-/* PWI_PAGEPTR PAGE POINTER OF ELEMENT */
-/* TPWI_ELEMENTPTR ELEMENT POINTER OF ELEMENT */
-/* OUTPUT TRESULT = */
-/* ZPARALLEL_QUEUE OPERATION PLACED IN PARALLEL QUEUE */
-/* OPERATION CAN PROCEED NOW. */
-/* ZSERIAL_QUEUE OPERATION PLACED IN SERIAL QUEUE */
-/* ERROR CODE OPERATION NEEDS ABORTING */
-/* --------------------------------------------------------------------------------- */
-Uint32 Dbacc::placeWriteInLockQueue(Signal* signal)
+bool
+Dbacc::validate_lock_queue(OperationrecPtr opPtr)
+{
+ OperationrecPtr loPtr;
+ loPtr.i = get_parallel_head(opPtr);
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+
+ while((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0 &&
+ loPtr.p->prevSerialQue != RNIL)
+ {
+ loPtr.i = loPtr.p->prevSerialQue;
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+ }
+
+ // Now we have lock owner...
+ vlqrequire(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
+ vlqrequire(loPtr.p->m_op_bits & Operationrec::OP_RUN_QUEUE);
+
+ // 1 Validate page pointer
+ {
+ Page8Ptr pagePtr;
+ pagePtr.i = loPtr.p->elementPage;
+ ptrCheckGuard(pagePtr, cpagesize, page8);
+ arrGuard(loPtr.p->elementPointer, 2048);
+ Uint32 eh = pagePtr.p->word32[loPtr.p->elementPointer];
+ vlqrequire(ElementHeader::getLocked(eh));
+ vlqrequire(ElementHeader::getOpPtrI(eh) == loPtr.i);
+ }
+
+ // 2 Lock owner should always have same LOCK_MODE and ACC_LOCK_MODE
+ if (loPtr.p->m_op_bits & Operationrec::OP_LOCK_MODE)
+ {
+ vlqrequire(loPtr.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE);
+ }
+ else
+ {
+ vlqrequire((loPtr.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE) == 0);
+ }
+
+ // 3 Lock owner should never be waiting...
+ bool running = false;
+ {
+ Uint32 opstate = loPtr.p->m_op_bits & Operationrec::OP_STATE_MASK;
+ if (opstate == Operationrec::OP_STATE_RUNNING)
+ running = true;
+ else
+ {
+ vlqrequire(opstate == Operationrec::OP_STATE_EXECUTED);
+ }
+ }
+
+ // Validate parallel queue
+ {
+ bool many = false;
+ bool orlockmode = loPtr.p->m_op_bits & Operationrec::OP_LOCK_MODE;
+ OperationrecPtr lastP = loPtr;
+
+ while (lastP.p->nextParallelQue != RNIL)
+ {
+ Uint32 prev = lastP.i;
+ lastP.i = lastP.p->nextParallelQue;
+ ptrCheckGuard(lastP, coprecsize, operationrec);
+
+ vlqrequire(lastP.p->prevParallelQue == prev);
+
+ Uint32 opbits = lastP.p->m_op_bits;
+ many |= loPtr.p->is_same_trans(lastP.p) ? 0 : 1;
+ orlockmode |= !!(opbits & Operationrec::OP_LOCK_MODE);
+
+ vlqrequire(opbits & Operationrec::OP_RUN_QUEUE);
+ vlqrequire((opbits & Operationrec::OP_LOCK_OWNER) == 0);
+
+ Uint32 opstate = opbits & Operationrec::OP_STATE_MASK;
+ if (running)
+ {
+ // If I found a running operation,
+ // all following should be waiting
+ vlqrequire(opstate == Operationrec::OP_STATE_WAITING);
+ }
+ else
+ {
+ if (opstate == Operationrec::OP_STATE_RUNNING)
+ running = true;
+ else
+ vlqrequire(opstate == Operationrec::OP_STATE_EXECUTED);
+ }
+
+ if (lastP.p->m_op_bits & Operationrec::OP_LOCK_MODE)
+ {
+ vlqrequire(lastP.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE);
+ }
+ else
+ {
+ vlqrequire((lastP.p->m_op_bits && orlockmode) == orlockmode);
+ vlqrequire((lastP.p->m_op_bits & Operationrec::OP_MASK) == ZREAD ||
+ (lastP.p->m_op_bits & Operationrec::OP_MASK) == ZSCAN_OP);
+ }
+
+ if (many)
+ {
+ vlqrequire(orlockmode == 0);
+ }
+ }
+
+ if (lastP.i != loPtr.i)
+ {
+ vlqrequire(loPtr.p->m_lo_last_parallel_op_ptr_i == lastP.i);
+ vlqrequire(lastP.p->m_lock_owner_ptr_i == loPtr.i);
+ }
+ else
+ {
+ vlqrequire(loPtr.p->m_lo_last_parallel_op_ptr_i == RNIL);
+ }
+ }
+
+ // Validate serie queue
+ if (loPtr.p->nextSerialQue != RNIL)
+ {
+ Uint32 prev = loPtr.i;
+ OperationrecPtr lastS;
+ lastS.i = loPtr.p->nextSerialQue;
+ while (true)
+ {
+ ptrCheckGuard(lastS, coprecsize, operationrec);
+ vlqrequire(lastS.p->prevSerialQue == prev);
+ vlqrequire(getNoParallelTransaction(lastS.p) == 1);
+ vlqrequire((lastS.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0);
+ vlqrequire((lastS.p->m_op_bits & Operationrec::OP_RUN_QUEUE) == 0);
+ vlqrequire((lastS.p->m_op_bits & Operationrec::OP_STATE_MASK) == Operationrec::OP_STATE_WAITING);
+ if (lastS.p->nextSerialQue == RNIL)
+ break;
+ prev = lastS.i;
+ lastS.i = lastS.p->nextSerialQue;
+ }
+
+ vlqrequire(loPtr.p->m_lo_last_serial_op_ptr_i == lastS.i);
+ }
+ else
+ {
+ vlqrequire(loPtr.p->m_lo_last_serial_op_ptr_i == RNIL);
+ }
+ return true;
+}
+
+NdbOut&
+operator<<(NdbOut & out, Dbacc::OperationrecPtr ptr)
{
- if (!((getNoParallelTransaction(queOperPtr.p) == 1) &&
- (queOperPtr.p->transId1 == operationRecPtr.p->transId1) &&
- (queOperPtr.p->transId2 == operationRecPtr.p->transId2))) {
+ Uint32 opbits = ptr.p->m_op_bits;
+ out << "[ " << dec << ptr.i
+ << " [ " << hex << ptr.p->transId1
+ << " " << hex << ptr.p->transId2 << "] "
+ << " bits: H'" << hex << opbits << " ";
+
+ bool read = false;
+ switch(opbits & Dbacc::Operationrec::OP_MASK){
+ case ZREAD: out << "READ "; read = true; break;
+ case ZINSERT: out << "INSERT "; break;
+ case ZUPDATE: out << "UPDATE "; break;
+ case ZDELETE: out << "DELETE "; break;
+ case ZWRITE: out << "WRITE "; break;
+ case ZSCAN_OP: out << "SCAN "; read = true; break;
+ default:
+ out << "<Unknown: H'"
+ << hex << (opbits & Dbacc::Operationrec::OP_MASK)
+ << "> ";
+ }
+
+ if (read)
+ {
+ if (opbits & Dbacc::Operationrec::OP_LOCK_MODE)
+ out << "(X)";
+ else
+ out << "(S)";
+ if (opbits & Dbacc::Operationrec::OP_ACC_LOCK_MODE)
+ out << "(X)";
+ else
+ out << "(S)";
+ }
+
+ if (opbits)
+ {
+ out << "(RQ)";
+ }
+
+ switch(opbits & Dbacc::Operationrec::OP_STATE_MASK){
+ case Dbacc::Operationrec::OP_STATE_WAITING:
+ out << " WAITING "; break;
+ case Dbacc::Operationrec::OP_STATE_RUNNING:
+ out << " RUNNING "; break;
+ case Dbacc::Operationrec::OP_STATE_EXECUTED:
+ out << " EXECUTED "; break;
+ case Dbacc::Operationrec::OP_STATE_IDLE:
+ out << " IDLE "; break;
+ default:
+ out << " <Unknown: H'"
+ << hex << (opbits & Dbacc::Operationrec::OP_STATE_MASK)
+ << "> ";
+ }
+
+/*
+ OP_MASK = 0x000F // 4 bits for operation type
+ ,OP_LOCK_MODE = 0x0010 // 0 - shared lock, 1 = exclusive lock
+ ,OP_ACC_LOCK_MODE = 0x0020 // Or:de lock mode of all operation
+ // before me
+ ,OP_LOCK_OWNER = 0x0040
+ ,OP_DIRTY_READ = 0x0080
+ ,OP_LOCK_REQ = 0x0100 // isAccLockReq
+ ,OP_COMMIT_DELETE_CHECK = 0x0200
+ ,OP_INSERT_IS_DONE = 0x0400
+ ,OP_ELEMENT_DISAPPEARED = 0x0800
+
+ ,OP_STATE_MASK = 0xF000
+ ,OP_STATE_IDLE = 0xF000
+ ,OP_STATE_WAITING = 0x0000
+ ,OP_STATE_RUNNING = 0x1000
+ ,OP_STATE_EXECUTED = 0x3000
+ };
+*/
+ if (opbits & Dbacc::Operationrec::OP_LOCK_OWNER)
+ out << "LO ";
+
+ if (opbits & Dbacc::Operationrec::OP_DIRTY_READ)
+ out << "DR ";
+
+ if (opbits & Dbacc::Operationrec::OP_LOCK_REQ)
+ out << "LOCK_REQ ";
+
+ if (opbits & Dbacc::Operationrec::OP_COMMIT_DELETE_CHECK)
+ out << "COMMIT_DELETE_CHECK ";
+
+ if (opbits & Dbacc::Operationrec::OP_INSERT_IS_DONE)
+ out << "INSERT_IS_DONE ";
+
+ if (opbits & Dbacc::Operationrec::OP_ELEMENT_DISAPPEARED)
+ out << "ELEMENT_DISAPPEARED ";
+
+ if (opbits & Dbacc::Operationrec::OP_LOCK_OWNER)
+ {
+ out << "last_parallel: " << dec << ptr.p->m_lo_last_parallel_op_ptr_i << " ";
+ out << "last_serial: " << dec << ptr.p->m_lo_last_serial_op_ptr_i << " ";
+ }
+
+ out << "]";
+ return out;
+}
+
+void
+Dbacc::dump_lock_queue(OperationrecPtr loPtr)
+{
+ if ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0)
+ {
+ while ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0 &&
+ loPtr.p->prevParallelQue != RNIL)
+ {
+ loPtr.i = loPtr.p->prevParallelQue;
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+ }
+
+ while ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0 &&
+ loPtr.p->prevSerialQue != RNIL)
+ {
+ loPtr.i = loPtr.p->prevSerialQue;
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+ }
+
+ ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
+ }
+
+ ndbout << "-- HEAD --" << endl;
+ OperationrecPtr tmp = loPtr;
+ while (tmp.i != RNIL)
+ {
+ ptrCheckGuard(tmp, coprecsize, operationrec);
+ ndbout << tmp << " ";
+ tmp.i = tmp.p->nextParallelQue;
+
+ if (tmp.i == loPtr.i)
+ {
+ ndbout << " <LOOP>";
+ break;
+ }
+ }
+ ndbout << endl;
+
+ tmp.i = loPtr.p->nextSerialQue;
+ while (tmp.i != RNIL)
+ {
+ ptrCheckGuard(tmp, coprecsize, operationrec);
+ OperationrecPtr tmp2 = tmp;
+
+ if (tmp.i == loPtr.i)
+ {
+ ndbout << "<LOOP S>" << endl;
+ break;
+ }
+
+ while (tmp2.i != RNIL)
+ {
+ ptrCheckGuard(tmp2, coprecsize, operationrec);
+ ndbout << tmp2 << " ";
+ tmp2.i = tmp2.p->nextParallelQue;
+
+ if (tmp2.i == tmp.i)
+ {
+ ndbout << "<LOOP 3>";
+ break;
+ }
+ }
+ ndbout << endl;
+ tmp.i = tmp.p->nextSerialQue;
+ }
+}
+#endif
+
+/* -------------------------------------------------------------------------
+ * PLACE_WRITE_IN_LOCK_QUEUE
+ * INPUT: OPERATION_REC_PTR OUR OPERATION POINTER
+ * QUE_OPER_PTR LOCK QUEUE OWNER OPERATION POINTER
+ * PWI_PAGEPTR PAGE POINTER OF ELEMENT
+ * TPWI_ELEMENTPTR ELEMENT POINTER OF ELEMENT
+ * OUTPUT TRESULT =
+ * ZPARALLEL_QUEUE OPERATION PLACED IN PARALLEL QUEUE
+ * OPERATION CAN PROCEED NOW.
+ * ZSERIAL_QUEUE OPERATION PLACED IN SERIAL QUEUE
+ * ERROR CODE OPERATION NEEDS ABORTING
+ * ------------------------------------------------------------------------- */
+Uint32
+Dbacc::placeWriteInLockQueue(OperationrecPtr lockOwnerPtr)
+{
+ OperationrecPtr lastOpPtr;
+ lastOpPtr.i = lockOwnerPtr.p->m_lo_last_parallel_op_ptr_i;
+ Uint32 opbits = operationRecPtr.p->m_op_bits;
+
+ if (lastOpPtr.i == RNIL)
+ {
+ lastOpPtr = lockOwnerPtr;
+ }
+ else
+ {
+ ptrCheckGuard(lastOpPtr, coprecsize, operationrec);
+ }
+
+ ndbassert(get_parallel_head(lastOpPtr) == lockOwnerPtr.i);
+
+ Uint32 lastbits = lastOpPtr.p->m_op_bits;
+ if (lastbits & Operationrec::OP_ACC_LOCK_MODE)
+ {
+ if(operationRecPtr.p->is_same_trans(lastOpPtr.p))
+ {
+ goto checkop;
+ }
+ }
+ else
+ {
+ /**
+ * We dont have an exclusive lock on operation and
+ *
+ */
jam();
- placeSerialQueueWrite(signal);
- return ZSERIAL_QUEUE;
- }//if
+
+ /**
+ * Scan parallell queue to see if we are the only one
+ */
+ OperationrecPtr loopPtr = lockOwnerPtr;
+ do
+ {
+ ptrCheckGuard(loopPtr, coprecsize, operationrec);
+ if (!loopPtr.p->is_same_trans(operationRecPtr.p))
+ {
+ goto serial;
+ }
+ loopPtr.i = loopPtr.p->nextParallelQue;
+ } while (loopPtr.i != RNIL);
+
+ goto checkop;
+ }
+
+serial:
+ jam();
+ placeSerialQueue(lockOwnerPtr, operationRecPtr);
+
+ validate_lock_queue(lockOwnerPtr);
+ return ZSERIAL_QUEUE;
+
+checkop:
/*
- WE ARE PERFORMING AN READ EXCLUSIVE, INSERT, UPDATE OR DELETE IN THE SAME
- TRANSACTION WHERE WE PREVIOUSLY HAVE EXECUTED AN OPERATION.
- Read-All, Update-All, Insert-All and Delete-Insert are allowed
- combinations.
- Delete-Read, Delete-Update and Delete-Delete are not an allowed
- combination and will result in tuple not found error.
+ WE ARE PERFORMING AN READ EXCLUSIVE, INSERT, UPDATE OR DELETE IN THE SAME
+ TRANSACTION WHERE WE PREVIOUSLY HAVE EXECUTED AN OPERATION.
+ Read-All, Update-All, Insert-All and Delete-Insert are allowed
+ combinations.
+ Delete-Read, Delete-Update and Delete-Delete are not an allowed
+ combination and will result in tuple not found error.
*/
- mlpqOperPtr = queOperPtr;
- moveLastParallelQueueWrite(signal);
+ Uint32 lstate = lastbits & Operationrec::OP_STATE_MASK;
- if (operationRecPtr.p->operation == ZINSERT &&
- mlpqOperPtr.p->operation != ZDELETE){
+ Uint32 retValue = ZSERIAL_QUEUE; // So that it gets blocked...
+ if (lstate == Operationrec::OP_STATE_EXECUTED)
+ {
jam();
- return ZWRITE_ERROR;
- }//if
- if(operationRecPtr.p->operation == ZWRITE)
- {
- operationRecPtr.p->operation =
- (mlpqOperPtr.p->operation == ZDELETE) ? ZINSERT : ZUPDATE;
+ /**
+ * Since last operation has executed...we can now check operation types
+ * if not, we have to wait until it has executed
+ */
+ Uint32 op = opbits & Operationrec::OP_MASK;
+ Uint32 lop = lastbits & Operationrec::OP_MASK;
+ if (op == ZINSERT && lop != ZDELETE)
+ {
+ jam();
+ return ZWRITE_ERROR;
+ }//if
+
+ /**
+ * NOTE. No checking op operation types, as one can read different save
+ * points...
+ */
+#if 0
+ if (lop == ZDELETE && (op != ZINSERT && op != ZWRITE))
+ {
+ jam();
+ return ZREAD_ERROR;
+ }
+#else
+ if (lop == ZDELETE && (op == ZUPDATE && op == ZDELETE))
+ {
+ jam();
+ return ZREAD_ERROR;
+ }
+#endif
+
+ if(op == ZWRITE)
+ {
+ opbits &= ~(Uint32)Operationrec::OP_MASK;
+ opbits |= (lop == ZDELETE) ? ZINSERT : ZUPDATE;
+ }
+
+ opbits |= Operationrec::OP_STATE_RUNNING;
+ operationRecPtr.p->localdata[0] = lastOpPtr.p->localdata[0];
+ operationRecPtr.p->localdata[1] = lastOpPtr.p->localdata[1];
+ retValue = ZPARALLEL_QUEUE;
}
- operationRecPtr.p->localdata[0] = queOperPtr.p->localdata[0];
- operationRecPtr.p->localdata[1] = queOperPtr.p->localdata[1];
- operationRecPtr.p->prevParallelQue = mlpqOperPtr.i;
- mlpqOperPtr.p->nextParallelQue = operationRecPtr.i;
- return ZPARALLEL_QUEUE;
+ opbits |= Operationrec::OP_RUN_QUEUE;
+ operationRecPtr.p->m_op_bits = opbits;
+ operationRecPtr.p->prevParallelQue = lastOpPtr.i;
+ operationRecPtr.p->m_lock_owner_ptr_i = lockOwnerPtr.i;
+ lastOpPtr.p->nextParallelQue = operationRecPtr.i;
+ lockOwnerPtr.p->m_lo_last_parallel_op_ptr_i = operationRecPtr.i;
+
+ validate_lock_queue(lockOwnerPtr);
+
+ return retValue;
}//Dbacc::placeWriteInLockQueue()
-/* --------------------------------------------------------------------------------- */
-/* WE HAVE TO PLACE IT SOMEWHERE IN THE SERIAL QUEUE INSTEAD. */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::placeSerialQueueWrite(Signal* signal)
+Uint32
+Dbacc::placeReadInLockQueue(OperationrecPtr lockOwnerPtr)
{
- readWriteOpPtr = queOperPtr;
- PSQW_LOOP:
- if (readWriteOpPtr.p->nextSerialQue == RNIL) {
+ OperationrecPtr lastOpPtr;
+ OperationrecPtr loopPtr = lockOwnerPtr;
+ lastOpPtr.i = lockOwnerPtr.p->m_lo_last_parallel_op_ptr_i;
+ Uint32 opbits = operationRecPtr.p->m_op_bits;
+
+ if (lastOpPtr.i == RNIL)
+ {
+ lastOpPtr = lockOwnerPtr;
+ }
+ else
+ {
+ ptrCheckGuard(lastOpPtr, coprecsize, operationrec);
+ }
+
+ ndbassert(get_parallel_head(lastOpPtr) == lockOwnerPtr.i);
+
+ /**
+ * Last operation in parallell queue of lock owner is same trans
+ * and ACC_LOCK_MODE is exlusive, then we can proceed
+ */
+ Uint32 lastbits = lastOpPtr.p->m_op_bits;
+ bool same = operationRecPtr.p->is_same_trans(lastOpPtr.p);
+ if (same && (lastbits & Operationrec::OP_ACC_LOCK_MODE))
+ {
jam();
- /* --------------------------------------------------------------------------------- */
- /* WE COULD NOT PUT IN ANY PARALLEL QUEUE. WE MUST PUT IT LAST IN THE SERIAL QUEUE. */
- /* --------------------------------------------------------------------------------- */
- readWriteOpPtr.p->nextSerialQue = operationRecPtr.i;
- operationRecPtr.p->prevSerialQue = readWriteOpPtr.i;
- return;
- }//if
- readWriteOpPtr.i = readWriteOpPtr.p->nextSerialQue;
- ptrCheckGuard(readWriteOpPtr, coprecsize, operationrec);
- if (getNoParallelTransaction(readWriteOpPtr.p) == 1) {
- /* --------------------------------------------------------------------------------- */
- /* THERE WAS ONLY ONE TRANSACTION INVOLVED IN THE PARALLEL QUEUE. IF THIS IS OUR */
- /* TRANSACTION WE CAN STILL GET HOLD OF THE LOCK. */
- /* --------------------------------------------------------------------------------- */
- if ((readWriteOpPtr.p->transId1 == operationRecPtr.p->transId1) &&
- (readWriteOpPtr.p->transId2 == operationRecPtr.p->transId2)) {
+ goto checkop;
+ }
+
+ if ((lastbits & Operationrec::OP_ACC_LOCK_MODE) && !same)
+ {
+ jam();
+ /**
+ * Last op in serial queue had X-lock and was not our transaction...
+ */
+ goto serial;
+ }
+
+ if (lockOwnerPtr.p->nextSerialQue == RNIL)
+ {
+ jam();
+ goto checkop;
+ }
+
+ /**
+ * Scan parallell queue to see if we are already there...
+ */
+ do
+ {
+ ptrCheckGuard(loopPtr, coprecsize, operationrec);
+ if (loopPtr.p->is_same_trans(operationRecPtr.p))
+ goto checkop;
+ loopPtr.i = loopPtr.p->nextParallelQue;
+ } while (loopPtr.i != RNIL);
+
+serial:
+ placeSerialQueue(lockOwnerPtr, operationRecPtr);
+
+ validate_lock_queue(lockOwnerPtr);
+
+ return ZSERIAL_QUEUE;
+
+checkop:
+ Uint32 lstate = lastbits & Operationrec::OP_STATE_MASK;
+
+ Uint32 retValue = ZSERIAL_QUEUE; // So that it gets blocked...
+ if (lstate == Operationrec::OP_STATE_EXECUTED)
+ {
+ jam();
+
+ /**
+ * NOTE. No checking op operation types, as one can read different save
+ * points...
+ */
+
+#if 0
+ /**
+ * Since last operation has executed...we can now check operation types
+ * if not, we have to wait until it has executed
+ */
+ if (lop == ZDELETE)
+ {
jam();
- /* --------------------------------------------------------------------------------- */
- /* WE ARE PERFORMING AN UPDATE OR DELETE IN THE SAME TRANSACTION WHERE WE ALREADY */
- /* PREVIOUSLY HAVE EXECUTED AN OPERATION. INSERT-DELETE, READ-UPDATE, READ-READ, */
- /* UPDATE-UPDATE, UPDATE-DELETE, READ-DELETE, INSERT-READ, INSERT-UPDATE ARE ALLOWED */
- /* COMBINATIONS. A NEW INSERT AFTER A DELETE IS NOT ALLOWED AND SUCH AN INSERT WILL */
- /* GO TO THE SERIAL LOCK QUEUE WHICH IT WILL NOT LEAVE UNTIL A TIME-OUT AND THE */
- /* TRANSACTION IS ABORTED. READS AND UPDATES AFTER DELETES IS ALSO NOT ALLOWED. */
- /* --------------------------------------------------------------------------------- */
- mlpqOperPtr = readWriteOpPtr;
- moveLastParallelQueueWrite(signal);
- readWriteOpPtr = mlpqOperPtr;
- operationRecPtr.p->prevParallelQue = readWriteOpPtr.i;
- readWriteOpPtr.p->nextParallelQue = operationRecPtr.i;
- operationRecPtr.p->localdata[0] = readWriteOpPtr.p->localdata[0];
- operationRecPtr.p->localdata[1] = readWriteOpPtr.p->localdata[1];
- return;
- }//if
- }//if
- goto PSQW_LOOP;
-}//Dbacc::placeSerialQueueWrite()
+ return ZREAD_ERROR;
+ }
+#endif
+
+ opbits |= Operationrec::OP_STATE_RUNNING;
+ operationRecPtr.p->localdata[0] = lastOpPtr.p->localdata[0];
+ operationRecPtr.p->localdata[1] = lastOpPtr.p->localdata[1];
+ retValue = ZPARALLEL_QUEUE;
+ }
+ opbits |= (lastbits & Operationrec::OP_ACC_LOCK_MODE);
+ opbits |= Operationrec::OP_RUN_QUEUE;
+ operationRecPtr.p->m_op_bits = opbits;
+
+ operationRecPtr.p->prevParallelQue = lastOpPtr.i;
+ operationRecPtr.p->m_lock_owner_ptr_i = lockOwnerPtr.i;
+ lastOpPtr.p->nextParallelQue = operationRecPtr.i;
+ lockOwnerPtr.p->m_lo_last_parallel_op_ptr_i = operationRecPtr.i;
+
+ validate_lock_queue(lockOwnerPtr);
+
+ return retValue;
+}//Dbacc::placeReadInLockQueue
+
+void Dbacc::placeSerialQueue(OperationrecPtr lockOwnerPtr,
+ OperationrecPtr opPtr)
+{
+ OperationrecPtr lastOpPtr;
+ lastOpPtr.i = lockOwnerPtr.p->m_lo_last_serial_op_ptr_i;
+
+ if (lastOpPtr.i == RNIL)
+ {
+ // Lock owner is last...
+ ndbrequire(lockOwnerPtr.p->nextSerialQue == RNIL);
+ lastOpPtr = lockOwnerPtr;
+ }
+ else
+ {
+ ptrCheckGuard(lastOpPtr, coprecsize, operationrec);
+ }
+
+ operationRecPtr.p->prevSerialQue = lastOpPtr.i;
+ lastOpPtr.p->nextSerialQue = opPtr.i;
+ lockOwnerPtr.p->m_lo_last_serial_op_ptr_i = opPtr.i;
+}
/* ------------------------------------------------------------------------- */
/* ACC KEYREQ END */
/* ------------------------------------------------------------------------- */
void Dbacc::acckeyref1Lab(Signal* signal, Uint32 result_code)
{
- if (operationRecPtr.p->keyinfoPage != RNIL) {
- jam();
- rpPageptr.i = operationRecPtr.p->keyinfoPage;
- ptrCheckGuard(rpPageptr, cpagesize, page8);
- releasePage(signal);
- operationRecPtr.p->keyinfoPage = RNIL;
- }//if
- operationRecPtr.p->transactionstate = WAIT_COMMIT_ABORT;
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
/* ************************<< */
/* ACCKEYREF */
/* ************************<< */
@@ -1658,15 +2239,15 @@ void Dbacc::acckeyref1Lab(Signal* signal, Uint32 result_code)
return;
}//Dbacc::acckeyref1Lab()
-/* ******************--------------------------------------------------------------- */
-/* ACCMINUPDATE UPDATE LOCAL KEY REQ */
-/* DESCRIPTION: UPDATES LOCAL KEY OF AN ELEMENTS IN THE HASH TABLE */
-/* THIS SIGNAL IS WAITED AFTER ANY INSERT REQ */
-/* ENTER ACCMINUPDATE WITH SENDER: LQH, LEVEL B */
-/* OPERATION_REC_PTR, OPERATION RECORD PTR */
-/* CLOCALKEY(0), LOCAL KEY 1 */
-/* CLOCALKEY(1) LOCAL KEY 2 */
-/* ******************--------------------------------------------------------------- */
+/* ******************----------------------------------------------------- */
+/* ACCMINUPDATE UPDATE LOCAL KEY REQ */
+/* DESCRIPTION: UPDATES LOCAL KEY OF AN ELEMENTS IN THE HASH TABLE */
+/* THIS SIGNAL IS WAITED AFTER ANY INSERT REQ */
+/* ENTER ACCMINUPDATE WITH SENDER: LQH, LEVEL B */
+/* OPERATION_REC_PTR, OPERATION RECORD PTR */
+/* CLOCALKEY(0), LOCAL KEY 1 */
+/* CLOCALKEY(1) LOCAL KEY 2 */
+/* ******************----------------------------------------------------- */
void Dbacc::execACCMINUPDATE(Signal* signal)
{
Page8Ptr ulkPageidptr;
@@ -1679,19 +2260,26 @@ void Dbacc::execACCMINUPDATE(Signal* signal)
tlocalkey1 = signal->theData[1];
tlocalkey2 = signal->theData[2];
ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
- if (operationRecPtr.p->transactionstate == ACTIVE) {
- fragrecptr.i = operationRecPtr.p->fragptr;
- ulkPageidptr.i = operationRecPtr.p->elementPage;
- tulkLocalPtr = operationRecPtr.p->elementPointer + operationRecPtr.p->elementIsforward;
+ Uint32 opbits = operationRecPtr.p->m_op_bits;
+ fragrecptr.i = operationRecPtr.p->fragptr;
+ ulkPageidptr.i = operationRecPtr.p->elementPage;
+ tulkLocalPtr = operationRecPtr.p->elementPointer +
+ operationRecPtr.p->elementIsforward;
+
+ if ((opbits & Operationrec::OP_STATE_MASK) == Operationrec::OP_STATE_RUNNING)
+ {
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
ptrCheckGuard(ulkPageidptr, cpagesize, page8);
dbgWord32(ulkPageidptr, tulkLocalPtr, tlocalkey1);
arrGuard(tulkLocalPtr, 2048);
ulkPageidptr.p->word32[tulkLocalPtr] = tlocalkey1;
operationRecPtr.p->localdata[0] = tlocalkey1;
- if (fragrecptr.p->localkeylen == 1) {
+ if (likely(fragrecptr.p->localkeylen == 1))
+ {
return;
- } else if (fragrecptr.p->localkeylen == 2) {
+ }
+ else if (fragrecptr.p->localkeylen == 2)
+ {
jam();
tulkLocalPtr = tulkLocalPtr + operationRecPtr.p->elementIsforward;
operationRecPtr.p->localdata[1] = tlocalkey2;
@@ -1715,15 +2303,17 @@ void Dbacc::execACC_COMMITREQ(Signal* signal)
{
Uint8 Toperation;
jamEntry();
- operationRecPtr.i = signal->theData[0];
+ Uint32 tmp = operationRecPtr.i = signal->theData[0];
ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
- ndbrequire(operationRecPtr.p->transactionstate == ACTIVE);
+ void* ptr = operationRecPtr.p;
+ Uint32 opbits = operationRecPtr.p->m_op_bits;
fragrecptr.i = operationRecPtr.p->fragptr;
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
+ Toperation = opbits & Operationrec::OP_MASK;
commitOperation(signal);
- Toperation = operationRecPtr.p->operation;
- operationRecPtr.p->transactionstate = IDLE;
- operationRecPtr.p->operation = ZUNDEFINED_OP;
+ ndbassert(operationRecPtr.i == tmp);
+ ndbassert(operationRecPtr.p == ptr);
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
if(Toperation != ZREAD){
fragrecptr.p->m_commit_count++;
if (Toperation != ZINSERT) {
@@ -1732,7 +2322,7 @@ void Dbacc::execACC_COMMITREQ(Signal* signal)
} else {
jam();
fragrecptr.p->noOfElements--;
- fragrecptr.p->slack += operationRecPtr.p->insertDeleteLen;
+ fragrecptr.p->slack += fragrecptr.p->elementLength;
if (fragrecptr.p->slack > fragrecptr.p->slackCheck) {
/* TIME FOR JOIN BUCKETS PROCESS */
if (fragrecptr.p->expandCounter > 0) {
@@ -1751,7 +2341,7 @@ void Dbacc::execACC_COMMITREQ(Signal* signal)
} else {
jam(); /* EXPAND PROCESS HANDLING */
fragrecptr.p->noOfElements++;
- fragrecptr.p->slack -= operationRecPtr.p->insertDeleteLen;
+ fragrecptr.p->slack -= fragrecptr.p->elementLength;
if (fragrecptr.p->slack >= (1u << 31)) {
/* IT MEANS THAT IF SLACK < ZERO */
if (fragrecptr.p->expandFlag == 0) {
@@ -1768,45 +2358,57 @@ void Dbacc::execACC_COMMITREQ(Signal* signal)
return;
}//Dbacc::execACC_COMMITREQ()
-/* ******************--------------------------------------------------------------- */
-/* ACC ABORT REQ ABORT ALL OPERATION OF THE TRANSACTION */
-/* ******************------------------------------+ */
-/* SENDER: LQH, LEVEL B */
-/* ******************--------------------------------------------------------------- */
-/* ACC ABORT REQ ABORT TRANSACTION */
-/* ******************------------------------------+ */
+/* ******************------------------------------------------------------- */
+/* ACC ABORT REQ ABORT ALL OPERATION OF THE TRANSACTION */
+/* ******************------------------------------+ */
+/* SENDER: LQH, LEVEL B */
+/* ******************------------------------------------------------------- */
+/* ACC ABORT REQ ABORT TRANSACTION */
+/* ******************------------------------------+ */
/* SENDER: LQH, LEVEL B */
void Dbacc::execACC_ABORTREQ(Signal* signal)
{
jamEntry();
- accAbortReqLab(signal);
-}//Dbacc::execACC_ABORTREQ()
-
-void Dbacc::accAbortReqLab(Signal* signal)
-{
operationRecPtr.i = signal->theData[0];
- bool sendConf = signal->theData[1];
+ Uint32 sendConf = signal->theData[1];
ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
+ fragrecptr.i = operationRecPtr.p->fragptr;
+ Uint32 opbits = operationRecPtr.p->m_op_bits;
+ Uint32 opstate = opbits & Operationrec::OP_STATE_MASK;
tresult = 0; /* ZFALSE */
- if ((operationRecPtr.p->transactionstate == ACTIVE) ||
- (operationRecPtr.p->transactionstate == WAIT_COMMIT_ABORT)) {
+
+ if (opbits == Operationrec::OP_EXECUTED_DIRTY_READ)
+ {
+ jam();
+ }
+ else if (opstate == Operationrec::OP_STATE_EXECUTED ||
+ opstate == Operationrec::OP_STATE_WAITING ||
+ opstate == Operationrec::OP_STATE_RUNNING)
+ {
jam();
- fragrecptr.i = operationRecPtr.p->fragptr;
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- operationRecPtr.p->transactionstate = ABORT;
abortOperation(signal);
- } else {
- ndbrequire(operationRecPtr.p->transactionstate == IDLE);
- jam();
- }//if
- operationRecPtr.p->transactionstate = IDLE;
- operationRecPtr.p->operation = ZUNDEFINED_OP;
- if (! sendConf)
- return;
+ }
+
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
+
signal->theData[0] = operationRecPtr.p->userptr;
- sendSignal(operationRecPtr.p->userblockref, GSN_ACC_ABORTCONF, signal, 1, JBB);
- return;
-}//Dbacc::accAbortReqLab()
+ signal->theData[1] = 0;
+ switch(sendConf){
+ case 0:
+ return;
+ case 2:
+ if (opstate != Operationrec::OP_STATE_RUNNING)
+ {
+ return;
+ }
+ case 1:
+ sendSignal(operationRecPtr.p->userblockref, GSN_ACC_ABORTCONF,
+ signal, 1, JBB);
+ }
+
+ signal->theData[1] = RNIL;
+}
/*
* Lock or unlock tuple.
@@ -1847,8 +2449,7 @@ void Dbacc::execACC_LOCKREQ(Signal* signal)
// init as in ACCSEIZEREQ
operationRecPtr.p->userptr = req->userPtr;
operationRecPtr.p->userblockref = req->userRef;
- operationRecPtr.p->operation = ZUNDEFINED_OP;
- operationRecPtr.p->transactionstate = IDLE;
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
operationRecPtr.p->scanRecPtr = RNIL;
// do read with lock via ACCKEYREQ
Uint32 lockMode = (lockOp == AccLockReq::LockShared) ? 0 : 1;
@@ -1899,8 +2500,8 @@ void Dbacc::execACC_LOCKREQ(Signal* signal)
jam();
// do abort via ACC_ABORTREQ (immediate)
signal->theData[0] = req->accOpPtr;
- signal->theData[1] = false; // Dont send abort
- accAbortReqLab(signal);
+ signal->theData[1] = 0; // Dont send abort
+ execACC_ABORTREQ(signal);
releaseOpRec(signal);
req->returnCode = AccLockReq::Success;
*sig = *req;
@@ -1910,8 +2511,8 @@ void Dbacc::execACC_LOCKREQ(Signal* signal)
jam();
// do abort via ACC_ABORTREQ (with conf signal)
signal->theData[0] = req->accOpPtr;
- signal->theData[1] = true; // send abort
- accAbortReqLab(signal);
+ signal->theData[1] = 1; // send abort
+ execACC_ABORTREQ(signal);
releaseOpRec(signal);
req->returnCode = AccLockReq::Success;
*sig = *req;
@@ -2570,16 +3171,30 @@ void Dbacc::getdirindex(Signal* signal)
}//Dbacc::getdirindex()
Uint32
-Dbacc::readTablePk(Uint32 localkey1)
+Dbacc::readTablePk(Uint32 localkey1, Uint32 eh, const Operationrec* op)
{
+ int ret;
Uint32 tableId = fragrecptr.p->myTableId;
Uint32 fragId = fragrecptr.p->myfid;
- Uint32 fragPageId = localkey1 >> MAX_TUPLES_BITS;
- Uint32 pageIndex = localkey1 & ((1 << MAX_TUPLES_BITS ) - 1);
+ bool xfrm = fragrecptr.p->hasCharAttr;
+
#ifdef VM_TRACE
memset(ckeys, 0x1f, (fragrecptr.p->keyLength * MAX_XFRM_MULTIPLY) << 2);
#endif
- int ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, ckeys, true);
+
+ if (likely(localkey1 != ~(Uint32)0))
+ {
+ Uint32 fragPageId = localkey1 >> MAX_TUPLES_BITS;
+ Uint32 pageIndex = localkey1 & ((1 << MAX_TUPLES_BITS ) - 1);
+ ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex,
+ ckeys, true);
+ }
+ else
+ {
+ ndbrequire(ElementHeader::getLocked(eh));
+ ndbrequire((op->m_op_bits & Operationrec::OP_MASK) != ZSCAN_OP);
+ ret = c_lqh->readPrimaryKeys(op->userptr, ckeys, xfrm);
+ }
jamEntry();
ndbrequire(ret >= 0);
return ret;
@@ -2613,11 +3228,12 @@ void ndb_acc_ia64_icc810_dummy_func()
#endif
#endif
-void Dbacc::getElement(Signal* signal)
+Uint32
+Dbacc::getElement(Signal* signal, OperationrecPtr& lockOwnerPtr)
{
+ Uint32 errcode;
DirRangePtr geOverflowrangeptr;
DirectoryarrayPtr geOverflowDirptr;
- OperationrecPtr geTmpOperationRecPtr;
Uint32 tgeElementHeader;
Uint32 tgeElemStep;
Uint32 tgeContainerhead;
@@ -2632,7 +3248,6 @@ void Dbacc::getElement(Signal* signal)
getdirindex(signal);
tgePageindex = tgdiPageindex;
gePageptr = gdiPageptr;
- tgeResult = ZFALSE;
/*
* The value seached is
* - table key for ACCKEYREQ, stored in TUP
@@ -2642,7 +3257,6 @@ void Dbacc::getElement(Signal* signal)
ndbrequire(TelemLen == ZELEM_HEAD_SIZE + fragrecptr.p->localkeylen);
tgeNextptrtype = ZLEFT;
- tgeLocked = 0;
const Uint32 tmp = fragrecptr.p->k + fragrecptr.p->lhfragbits;
const Uint32 opHashValuePart = (operationRecPtr.p->hashValue >> tmp) &0xFFFF;
@@ -2655,9 +3269,17 @@ void Dbacc::getElement(Signal* signal)
tgeKeyptr = (tgeElementptr + ZELEM_HEAD_SIZE) + fragrecptr.p->localkeylen;
tgeElemStep = TelemLen;
tgeForward = 1;
- if (tgeContainerptr >= 2048) { ACCKEY_error(4); return;}
+ if (unlikely(tgeContainerptr >= 2048))
+ {
+ errcode = 4;
+ goto error;
+ }
tgeRemLen = gePageptr.p->word32[tgeContainerptr] >> 26;
- if ((tgeContainerptr + tgeRemLen - 1) >= 2048) { ACCKEY_error(5); return;}
+ if (unlikely(((tgeContainerptr + tgeRemLen - 1) >= 2048)))
+ {
+ errcode = 5;
+ goto error;
+ }
} else if (tgeNextptrtype == ZRIGHT) {
jam();
tgeContainerptr = tgeContainerptr + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
@@ -2665,53 +3287,69 @@ void Dbacc::getElement(Signal* signal)
tgeKeyptr = (tgeElementptr - ZELEM_HEAD_SIZE) - fragrecptr.p->localkeylen;
tgeElemStep = 0 - TelemLen;
tgeForward = (Uint32)-1;
- if (tgeContainerptr >= 2048) { ACCKEY_error(4); return;}
+ if (unlikely(tgeContainerptr >= 2048))
+ {
+ errcode = 4;
+ goto error;
+ }
tgeRemLen = gePageptr.p->word32[tgeContainerptr] >> 26;
- if ((tgeContainerptr - tgeRemLen) >= 2048) { ACCKEY_error(5); return;}
+ if (unlikely((tgeContainerptr - tgeRemLen) >= 2048))
+ {
+ errcode = 5;
+ goto error;
+ }
} else {
- ACCKEY_error(6); return;
+ errcode = 6;
+ goto error;
}//if
if (tgeRemLen >= ZCON_HEAD_SIZE + TelemLen) {
- if (tgeRemLen > ZBUF_SIZE) {
- ACCKEY_error(7); return;
+ if (unlikely(tgeRemLen > ZBUF_SIZE))
+ {
+ errcode = 7;
+ goto error;
}//if
- /* --------------------------------------------------------------------------------- */
- // There is at least one element in this container. Check if it is the element
- // searched for.
- /* --------------------------------------------------------------------------------- */
+ /* ------------------------------------------------------------------- */
+ // There is at least one element in this container.
+ // Check if it is the element searched for.
+ /* ------------------------------------------------------------------- */
do {
tgeElementHeader = gePageptr.p->word32[tgeElementptr];
tgeRemLen = tgeRemLen - TelemLen;
Uint32 hashValuePart;
+ Uint32 localkey1, localkey2;
+ lockOwnerPtr.i = RNIL;
+ lockOwnerPtr.p = NULL;
if (ElementHeader::getLocked(tgeElementHeader)) {
jam();
- geTmpOperationRecPtr.i = ElementHeader::getOpPtrI(tgeElementHeader);
- ptrCheckGuard(geTmpOperationRecPtr, coprecsize, operationrec);
- hashValuePart = geTmpOperationRecPtr.p->hashvaluePart;
+ lockOwnerPtr.i = ElementHeader::getOpPtrI(tgeElementHeader);
+ ptrCheckGuard(lockOwnerPtr, coprecsize, operationrec);
+ hashValuePart = lockOwnerPtr.p->hashvaluePart;
+ localkey1 = lockOwnerPtr.p->localdata[0];
+ localkey2 = lockOwnerPtr.p->localdata[1];
} else {
jam();
hashValuePart = ElementHeader::getHashValuePart(tgeElementHeader);
+ localkey1 = gePageptr.p->word32[tgeElementptr + tgeForward];
+ localkey2 = 0;
}
if (hashValuePart == opHashValuePart) {
jam();
- Uint32 localkey1 = gePageptr.p->word32[tgeElementptr + tgeForward];
- Uint32 localkey2 = 0;
bool found;
- if (! searchLocalKey) {
- Uint32 len = readTablePk(localkey1);
+ if (! searchLocalKey)
+ {
+ Uint32 len = readTablePk(localkey1, tgeElementptr, lockOwnerPtr.p);
found = (len == operationRecPtr.p->xfrmtupkeylen) &&
(memcmp(Tkeydata, ckeys, len << 2) == 0);
} else {
jam();
found = (localkey1 == Tkeydata[0]);
}
- if (found) {
+ if (found)
+ {
jam();
- tgeLocked = ElementHeader::getLocked(tgeElementHeader);
- tgeResult = ZTRUE;
operationRecPtr.p->localdata[0] = localkey1;
operationRecPtr.p->localdata[1] = localkey2;
- return;
+ return ZTRUE;
}
}
if (tgeRemLen <= ZCON_HEAD_SIZE) {
@@ -2720,18 +3358,22 @@ void Dbacc::getElement(Signal* signal)
tgeElementptr = tgeElementptr + tgeElemStep;
} while (true);
}//if
- if (tgeRemLen != ZCON_HEAD_SIZE) {
- ACCKEY_error(8); return;
+ if (unlikely(tgeRemLen != ZCON_HEAD_SIZE))
+ {
+ errcode = 8;
+ goto error;
}//if
tgeContainerhead = gePageptr.p->word32[tgeContainerptr];
tgeNextptrtype = (tgeContainerhead >> 7) & 0x3;
if (tgeNextptrtype == 0) {
jam();
- return; /* NO MORE CONTAINER */
+ return ZFALSE; /* NO MORE CONTAINER */
}//if
tgePageindex = tgeContainerhead & 0x7f; /* NEXT CONTAINER PAGE INDEX 7 BITS */
- if (tgePageindex > ZEMPTYLIST) {
- ACCKEY_error(9); return;
+ if (unlikely(tgePageindex > ZEMPTYLIST))
+ {
+ errcode = 9;
+ goto error;
}//if
if (((tgeContainerhead >> 9) & 1) == ZFALSE) {
jam();
@@ -2745,55 +3387,72 @@ void Dbacc::getElement(Signal* signal)
ptrCheckGuard(gePageptr, cpagesize, page8);
}//if
} while (1);
- return;
+
+ return ZFALSE;
+
+error:
+ ACCKEY_error(errcode);
+ return ~0;
}//Dbacc::getElement()
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* */
-/* END OF GET_ELEMENT MODULE */
-/* */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* */
-/* MODULE: DELETE */
-/* */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* COMMITDELETE */
-/* INPUT: OPERATION_REC_PTR, PTR TO AN OPERATION RECORD. */
-/* FRAGRECPTR, PTR TO A FRAGMENT RECORD */
-/* */
-/* OUTPUT: */
-/* NONE */
-/* DESCRIPTION: DELETE OPERATIONS WILL BE COMPLETED AT THE COMMIT OF TRANSA- */
-/* CTION. THIS SUBROUTINE SEARCHS FOR ELEMENT AND DELETES IT. IT DOES SO BY */
-/* REPLACING IT WITH THE LAST ELEMENT IN THE BUCKET. IF THE DELETED ELEMENT */
-/* IS ALSO THE LAST ELEMENT THEN IT IS ONLY NECESSARY TO REMOVE THE ELEMENT. */
-/* --------------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* */
+/* END OF GET_ELEMENT MODULE */
+/* */
+/* ------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* */
+/* MODULE: DELETE */
+/* */
+/* ------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* COMMITDELETE */
+/* INPUT: OPERATION_REC_PTR, PTR TO AN OPERATION RECORD. */
+/* FRAGRECPTR, PTR TO A FRAGMENT RECORD */
+/* */
+/* OUTPUT: */
+/* NONE */
+/* DESCRIPTION: DELETE OPERATIONS WILL BE COMPLETED AT THE
+ * COMMIT OF TRANSACTION. THIS SUBROUTINE SEARCHS FOR ELEMENT AND
+ * DELETES IT. IT DOES SO BY REPLACING IT WITH THE LAST
+ * ELEMENT IN THE BUCKET. IF THE DELETED ELEMENT IS ALSO THE LAST
+ * ELEMENT THEN IT IS ONLY NECESSARY TO REMOVE THE ELEMENT
+ * ------------------------------------------------------------------------- */
+void
+Dbacc::report_dealloc(Signal* signal, const Operationrec* opPtrP)
+{
+ Uint32 localKey = opPtrP->localdata[0];
+ Uint32 opbits = opPtrP->m_op_bits;
+ Uint32 userptr= opPtrP->userptr;
+ Uint32 scanInd =
+ ((opbits & Operationrec::OP_MASK) == ZSCAN_OP) ||
+ (opbits & Operationrec::OP_LOCK_REQ);
+
+ if (localKey != ~(Uint32)0)
+ {
+ signal->theData[0] = fragrecptr.p->myfid;
+ signal->theData[1] = fragrecptr.p->myTableId;
+ Uint32 pageId = localKey >> MAX_TUPLES_BITS;
+ Uint32 pageIndex = localKey & ((1 << MAX_TUPLES_BITS) - 1);
+ signal->theData[2] = pageId;
+ signal->theData[3] = pageIndex;
+ signal->theData[4] = userptr;
+ signal->theData[5] = scanInd;
+ EXECUTE_DIRECT(DBLQH, GSN_TUP_DEALLOCREQ, signal, 6);
+ jamEntry();
+ }
+}
+
void Dbacc::commitdelete(Signal* signal)
{
jam();
- Uint32 localKey = operationRecPtr.p->localdata[0];
- Uint32 userptr= operationRecPtr.p->userptr;
- Uint32 scanInd = operationRecPtr.p->operation == ZSCAN_OP
- || operationRecPtr.p->isAccLockReq;
-
- signal->theData[0] = fragrecptr.p->myfid;
- signal->theData[1] = fragrecptr.p->myTableId;
- Uint32 pageId = localKey >> MAX_TUPLES_BITS;
- Uint32 pageIndex = localKey & ((1 << MAX_TUPLES_BITS) - 1);
- signal->theData[2] = pageId;
- signal->theData[3] = pageIndex;
- signal->theData[4] = userptr;
- signal->theData[5] = scanInd;
- EXECUTE_DIRECT(DBLQH, GSN_TUP_DEALLOCREQ, signal, 6);
- jamEntry();
+ report_dealloc(signal, operationRecPtr.p);
getdirindex(signal);
tlastPageindex = tgdiPageindex;
@@ -3282,31 +3941,337 @@ void Dbacc::checkoverfreelist(Signal* signal)
/*BE CHECKED. THE OPERATION RECORD WILL BE REMOVED FROM THE QUEUE IF IT */
/*BELONGED TO ANY ONE, OTHERWISE THE ELEMENT HEAD WILL BE UPDATED. */
/* ------------------------------------------------------------------------- */
+
+/**
+ *
+ * P0 - P1 - P2 - P3
+ * S0
+ * S1
+ * S2
+ */
+void
+Dbacc::abortParallelQueueOperation(Signal* signal, OperationrecPtr opPtr)
+{
+ jam();
+ OperationrecPtr nextP;
+ OperationrecPtr prevP;
+ OperationrecPtr loPtr;
+
+ Uint32 opbits = opPtr.p->m_op_bits;
+ Uint32 opstate = opbits & Operationrec::OP_STATE_MASK;
+ nextP.i = opPtr.p->nextParallelQue;
+ prevP.i = opPtr.p->prevParallelQue;
+ loPtr.i = opPtr.p->m_lock_owner_ptr_i;
+
+ ndbassert(! (opbits & Operationrec::OP_LOCK_OWNER));
+ ndbassert(opbits & Operationrec::OP_RUN_QUEUE);
+
+ ptrCheckGuard(prevP, coprecsize, operationrec);
+ ndbassert(prevP.p->nextParallelQue == opPtr.i);
+ prevP.p->nextParallelQue = nextP.i;
+
+ if (nextP.i != RNIL)
+ {
+ ptrCheckGuard(nextP, coprecsize, operationrec);
+ ndbassert(nextP.p->prevParallelQue == opPtr.i);
+ nextP.p->prevParallelQue = prevP.i;
+ }
+ else if (prevP.i != loPtr.i)
+ {
+ jam();
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+ ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
+ ndbassert(loPtr.p->m_lo_last_parallel_op_ptr_i == opPtr.i);
+ loPtr.p->m_lo_last_parallel_op_ptr_i = prevP.i;
+ prevP.p->m_lock_owner_ptr_i = loPtr.i;
+
+ /**
+ * Abort P3...check start next
+ */
+ startNext(signal, prevP);
+ validate_lock_queue(prevP);
+ return;
+ }
+ else
+ {
+ jam();
+ /**
+ * P0 - P1
+ *
+ * Abort P1, check start next
+ */
+ ndbassert(prevP.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
+ prevP.p->m_lo_last_parallel_op_ptr_i = RNIL;
+ startNext(signal, prevP);
+ validate_lock_queue(prevP);
+ return;
+ }
+
+ /**
+ * Abort P1/P2
+ */
+ if (opbits & Operationrec::OP_LOCK_MODE)
+ {
+ Uint32 nextbits = nextP.p->m_op_bits;
+ while ((nextbits & Operationrec::OP_LOCK_MODE) == 0)
+ {
+ ndbassert(nextbits & Operationrec::OP_ACC_LOCK_MODE);
+ nextbits &= ~(Uint32)Operationrec::OP_ACC_LOCK_MODE;
+ nextP.p->m_op_bits = nextbits;
+
+ if (nextP.p->nextParallelQue != RNIL)
+ {
+ nextP.i = nextP.p->nextParallelQue;
+ ptrCheckGuard(nextP, coprecsize, operationrec);
+ nextbits = nextP.p->m_op_bits;
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Abort P1, P2
+ */
+ if (opstate == Operationrec::OP_STATE_RUNNING)
+ {
+ jam();
+ startNext(signal, prevP);
+ validate_lock_queue(prevP);
+ return;
+ }
+
+ ndbassert(opstate == Operationrec::OP_STATE_EXECUTED ||
+ opstate == Operationrec::OP_STATE_WAITING);
+
+ /**
+ * Scan to last of run queue
+ */
+ while (nextP.p->nextParallelQue != RNIL)
+ {
+ jam();
+ nextP.i = nextP.p->nextParallelQue;
+ ptrCheckGuard(nextP, coprecsize, operationrec);
+ }
+
+#ifdef VM_TRACE
+ loPtr.i = nextP.p->m_lock_owner_ptr_i;
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+ ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
+ ndbassert(loPtr.p->m_lo_last_parallel_op_ptr_i == nextP.i);
+#endif
+ startNext(signal, nextP);
+ validate_lock_queue(nextP);
+
+ return;
+}
+
+void
+Dbacc::abortSerieQueueOperation(Signal* signal, OperationrecPtr opPtr)
+{
+ jam();
+ OperationrecPtr prevS, nextS;
+ OperationrecPtr prevP, nextP;
+ OperationrecPtr loPtr;
+
+ Uint32 opbits = opPtr.p->m_op_bits;
+
+ prevS.i = opPtr.p->prevSerialQue;
+ nextS.i = opPtr.p->nextSerialQue;
+
+ prevP.i = opPtr.p->prevParallelQue;
+ nextP.i = opPtr.p->nextParallelQue;
+
+ ndbassert((opbits & Operationrec::OP_LOCK_OWNER) == 0);
+ ndbassert((opbits & Operationrec::OP_RUN_QUEUE) == 0);
+
+ if (prevP.i != RNIL)
+ {
+ /**
+ * We're not list head...
+ */
+ ptrCheckGuard(prevP, coprecsize, operationrec);
+ ndbassert(prevP.p->nextParallelQue == opPtr.i);
+ prevP.p->nextParallelQue = nextP.i;
+
+ if (nextP.i != RNIL)
+ {
+ ptrCheckGuard(nextP, coprecsize, operationrec);
+ ndbassert(nextP.p->prevParallelQue == opPtr.i);
+ ndbassert((nextP.p->m_op_bits & Operationrec::OP_STATE_MASK) ==
+ Operationrec::OP_STATE_WAITING);
+ nextP.p->prevParallelQue = prevP.i;
+
+ if ((prevP.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE) == 0 &&
+ opbits & Operationrec::OP_LOCK_MODE)
+ {
+ /**
+ * Scan right in parallel queue to fix OP_ACC_LOCK_MODE
+ */
+ while ((nextP.p->m_op_bits & Operationrec::OP_LOCK_MODE) == 0)
+ {
+ ndbassert(nextP.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE);
+ nextP.p->m_op_bits &= ~(Uint32)Operationrec::OP_ACC_LOCK_MODE;
+ nextP.i = nextP.p->nextParallelQue;
+ if (nextP.i == RNIL)
+ break;
+ ptrCheckGuard(nextP, coprecsize, operationrec);
+ }
+ }
+ }
+ validate_lock_queue(prevP);
+ return;
+ }
+ else
+ {
+ /**
+ * We're a list head
+ */
+ ptrCheckGuard(prevS, coprecsize, operationrec);
+ ndbassert(prevS.p->nextSerialQue == opPtr.i);
+
+ if (nextP.i != RNIL)
+ {
+ /**
+ * Promote nextP to list head
+ */
+ ptrCheckGuard(nextP, coprecsize, operationrec);
+ ndbassert(nextP.p->prevParallelQue == opPtr.i);
+ prevS.p->nextSerialQue = nextP.i;
+ nextP.p->prevParallelQue = RNIL;
+ nextP.p->nextSerialQue = nextS.i;
+ if (nextS.i != RNIL)
+ {
+ jam();
+ ptrCheckGuard(nextS, coprecsize, operationrec);
+ ndbassert(nextS.p->prevSerialQue == opPtr.i);
+ nextS.p->prevSerialQue = nextP.i;
+ validate_lock_queue(prevS);
+ return;
+ }
+ else
+ {
+ // nextS is RNIL, i.e we're last in serie queue...
+ // we must update lockOwner.m_lo_last_serial_op_ptr_i
+ loPtr = prevS;
+ while ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0)
+ {
+ loPtr.i = loPtr.p->prevSerialQue;
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+ }
+ ndbassert(loPtr.p->m_lo_last_serial_op_ptr_i == opPtr.i);
+ loPtr.p->m_lo_last_serial_op_ptr_i = nextP.i;
+ validate_lock_queue(loPtr);
+ return;
+ }
+ }
+
+ if (nextS.i == RNIL)
+ {
+ /**
+ * Abort S2
+ */
+
+ // nextS is RNIL, i.e we're last in serie queue...
+ // and we have no parallel queue,
+ // we must update lockOwner.m_lo_last_serial_op_ptr_i
+ prevS.p->nextSerialQue = RNIL;
+
+ loPtr = prevS;
+ while ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0)
+ {
+ loPtr.i = loPtr.p->prevSerialQue;
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+ }
+ ndbassert(loPtr.p->m_lo_last_serial_op_ptr_i == opPtr.i);
+ if (prevS.i != loPtr.i)
+ {
+ jam();
+ loPtr.p->m_lo_last_serial_op_ptr_i = prevS.i;
+ }
+ else
+ {
+ loPtr.p->m_lo_last_serial_op_ptr_i = RNIL;
+ }
+ validate_lock_queue(loPtr);
+ }
+ else if (nextP.i == RNIL)
+ {
+ ptrCheckGuard(nextS, coprecsize, operationrec);
+ ndbassert(nextS.p->prevSerialQue == opPtr.i);
+ prevS.p->nextSerialQue = nextS.i;
+ nextS.p->prevSerialQue = prevS.i;
+
+ if (prevS.p->m_op_bits & Operationrec::OP_LOCK_OWNER)
+ {
+ /**
+ * Abort S0
+ */
+ OperationrecPtr lastOp;
+ lastOp.i = prevS.p->m_lo_last_parallel_op_ptr_i;
+ if (lastOp.i != RNIL)
+ {
+ jam();
+ ptrCheckGuard(lastOp, coprecsize, operationrec);
+ ndbassert(lastOp.p->m_lock_owner_ptr_i = prevS.i);
+ }
+ else
+ {
+ jam();
+ lastOp = prevS;
+ }
+ startNext(signal, lastOp);
+ validate_lock_queue(lastOp);
+ }
+ else
+ {
+ validate_lock_queue(prevS);
+ }
+ }
+ }
+}
+
+
void Dbacc::abortOperation(Signal* signal)
{
- OperationrecPtr aboOperRecPtr;
- OperationrecPtr TaboOperRecPtr;
- Page8Ptr aboPageidptr;
- Uint32 taboElementptr;
- Uint32 tmp2Olq;
+ Uint32 opbits = operationRecPtr.p->m_op_bits;
+
+ validate_lock_queue(operationRecPtr);
- if (operationRecPtr.p->lockOwner == ZTRUE) {
+ if (opbits & Operationrec::OP_LOCK_OWNER)
+ {
takeOutLockOwnersList(signal, operationRecPtr);
- if (operationRecPtr.p->insertIsDone == ZTRUE) {
+ opbits &= ~(Uint32)Operationrec::OP_LOCK_OWNER;
+ if (opbits & Operationrec::OP_INSERT_IS_DONE)
+ {
jam();
- operationRecPtr.p->elementIsDisappeared = ZTRUE;
+ opbits |= Operationrec::OP_ELEMENT_DISAPPEARED;
}//if
- if ((operationRecPtr.p->nextParallelQue != RNIL) ||
- (operationRecPtr.p->nextSerialQue != RNIL)) {
+ operationRecPtr.p->m_op_bits = opbits;
+ const bool queue = (operationRecPtr.p->nextParallelQue != RNIL ||
+ operationRecPtr.p->nextSerialQue != RNIL);
+
+ if (queue)
+ {
jam();
- releaselock(signal);
- } else {
- /* --------------------------------------------------------------------------------- */
- /* WE ARE OWNER OF THE LOCK AND NO OTHER OPERATIONS ARE QUEUED. IF INSERT OR STANDBY */
- /* WE DELETE THE ELEMENT OTHERWISE WE REMOVE THE LOCK FROM THE ELEMENT. */
- /* --------------------------------------------------------------------------------- */
- if (operationRecPtr.p->elementIsDisappeared == ZFALSE) {
+ release_lockowner(signal, operationRecPtr, false);
+ }
+ else
+ {
+ /* -------------------------------------------------------------------
+ * WE ARE OWNER OF THE LOCK AND NO OTHER OPERATIONS ARE QUEUED.
+ * IF INSERT OR STANDBY WE DELETE THE ELEMENT OTHERWISE WE REMOVE
+ * THE LOCK FROM THE ELEMENT.
+ * ------------------------------------------------------------------ */
+ if ((opbits & Operationrec::OP_ELEMENT_DISAPPEARED) == 0)
+ {
jam();
+ Page8Ptr aboPageidptr;
+ Uint32 taboElementptr;
+ Uint32 tmp2Olq;
+
taboElementptr = operationRecPtr.p->elementPointer;
aboPageidptr.i = operationRecPtr.p->elementPage;
tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->hashvaluePart,
@@ -3316,87 +4281,31 @@ void Dbacc::abortOperation(Signal* signal)
arrGuard(taboElementptr, 2048);
aboPageidptr.p->word32[taboElementptr] = tmp2Olq;
return;
- } else {
+ }
+ else
+ {
jam();
commitdelete(signal);
}//if
}//if
- } else {
- /* --------------------------------------------------------------- */
- // We are not the lock owner.
- /* --------------------------------------------------------------- */
- jam();
- if (operationRecPtr.p->prevParallelQue != RNIL) {
- jam();
- /* ---------------------------------------------------------------------------------- */
- /* SINCE WE ARE NOT QUEUE LEADER WE NEED NOT CONSIDER IF THE ELEMENT IS TO BE DELETED.*/
- /* We will simply remove it from the parallel list without any other rearrangements. */
- /* ---------------------------------------------------------------------------------- */
- aboOperRecPtr.i = operationRecPtr.p->prevParallelQue;
- ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec);
- aboOperRecPtr.p->nextParallelQue = operationRecPtr.p->nextParallelQue;
- if (operationRecPtr.p->nextParallelQue != RNIL) {
- jam();
- aboOperRecPtr.i = operationRecPtr.p->nextParallelQue;
- ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec);
- aboOperRecPtr.p->prevParallelQue = operationRecPtr.p->prevParallelQue;
- }//if
- } else if (operationRecPtr.p->prevSerialQue != RNIL) {
- /* ------------------------------------------------------------------------- */
- // We are not in the parallel queue owning the lock. Thus we are in another parallel
- // queue longer down in the serial queue. We are however first since prevParallelQue
- // == RNIL.
- /* ------------------------------------------------------------------------- */
- if (operationRecPtr.p->nextParallelQue != RNIL) {
- jam();
- /* ------------------------------------------------------------------------- */
- // We have an operation in the queue after us. We simply rearrange this parallel queue.
- // The new leader of this parallel queue will be operation in the serial queue.
- /* ------------------------------------------------------------------------- */
- aboOperRecPtr.i = operationRecPtr.p->nextParallelQue;
- ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec);
- aboOperRecPtr.p->nextSerialQue = operationRecPtr.p->nextSerialQue;
- aboOperRecPtr.p->prevSerialQue = operationRecPtr.p->prevSerialQue;
- aboOperRecPtr.p->prevParallelQue = RNIL; // Queue Leader
- if (operationRecPtr.p->nextSerialQue != RNIL) {
- jam();
- TaboOperRecPtr.i = operationRecPtr.p->nextSerialQue;
- ptrCheckGuard(TaboOperRecPtr, coprecsize, operationrec);
- TaboOperRecPtr.p->prevSerialQue = aboOperRecPtr.i;
- }//if
- TaboOperRecPtr.i = operationRecPtr.p->prevSerialQue;
- ptrCheckGuard(TaboOperRecPtr, coprecsize, operationrec);
- TaboOperRecPtr.p->nextSerialQue = aboOperRecPtr.i;
- } else {
- jam();
- /* ------------------------------------------------------------------------- */
- // We are the only operation in this parallel queue. We will thus shrink the serial
- // queue.
- /* ------------------------------------------------------------------------- */
- aboOperRecPtr.i = operationRecPtr.p->prevSerialQue;
- ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec);
- aboOperRecPtr.p->nextSerialQue = operationRecPtr.p->nextSerialQue;
- if (operationRecPtr.p->nextSerialQue != RNIL) {
- jam();
- aboOperRecPtr.i = operationRecPtr.p->nextSerialQue;
- ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec);
- aboOperRecPtr.p->prevSerialQue = operationRecPtr.p->prevSerialQue;
- }//if
- }//if
- }//if
- }//if
- /* ------------------------------------------------------------------------- */
- // If prevParallelQue = RNIL and prevSerialQue = RNIL and we are not owner of the
- // lock then we cannot be in any lock queue at all.
- /* ------------------------------------------------------------------------- */
-}//Dbacc::abortOperation()
+ }
+ else if (opbits & Operationrec::OP_RUN_QUEUE)
+ {
+ abortParallelQueueOperation(signal, operationRecPtr);
+ }
+ else
+ {
+ abortSerieQueueOperation(signal, operationRecPtr);
+ }
+}
-void Dbacc::commitDeleteCheck()
+void
+Dbacc::commitDeleteCheck()
{
OperationrecPtr opPtr;
OperationrecPtr lastOpPtr;
OperationrecPtr deleteOpPtr;
- bool elementDeleted = false;
+ Uint32 elementDeleted = 0;
bool deleteCheckOngoing = true;
Uint32 hashValue = 0;
lastOpPtr = operationRecPtr;
@@ -3409,7 +4318,9 @@ void Dbacc::commitDeleteCheck()
}//while
deleteOpPtr = lastOpPtr;
do {
- if (deleteOpPtr.p->operation == ZDELETE) {
+ Uint32 opbits = deleteOpPtr.p->m_op_bits;
+ Uint32 op = opbits & Operationrec::OP_MASK;
+ if (op == ZDELETE) {
jam();
/* -------------------------------------------------------------------
* IF THE CURRENT OPERATION TO BE COMMITTED IS A DELETE OPERATION DUE TO
@@ -3423,10 +4334,9 @@ void Dbacc::commitDeleteCheck()
* DELETE-OPERATION THAT HAS A HASH VALUE.
* ----------------------------------------------------------------- */
hashValue = deleteOpPtr.p->hashValue;
- elementDeleted = true;
+ elementDeleted = Operationrec::OP_ELEMENT_DISAPPEARED;
deleteCheckOngoing = false;
- } else if ((deleteOpPtr.p->operation == ZREAD) ||
- (deleteOpPtr.p->operation == ZSCAN_OP)) {
+ } else if (op == ZREAD || op == ZSCAN_OP) {
/* -------------------------------------------------------------------
* We are trying to find out whether the commit will in the end delete
* the tuple. Normally the delete will be the last operation in the
@@ -3437,7 +4347,7 @@ void Dbacc::commitDeleteCheck()
* we have to continue scanning the list looking for a delete operation.
*/
deleteOpPtr.i = deleteOpPtr.p->prevParallelQue;
- if (deleteOpPtr.i == RNIL) {
+ if (opbits & Operationrec::OP_LOCK_OWNER) {
jam();
deleteCheckOngoing = false;
} else {
@@ -3456,14 +4366,14 @@ void Dbacc::commitDeleteCheck()
opPtr = lastOpPtr;
do {
jam();
- opPtr.p->commitDeleteCheckFlag = ZTRUE;
+ opPtr.p->m_op_bits |= Operationrec::OP_COMMIT_DELETE_CHECK;
if (elementDeleted) {
jam();
- opPtr.p->elementIsDisappeared = ZTRUE;
+ opPtr.p->m_op_bits |= elementDeleted;
opPtr.p->hashValue = hashValue;
}//if
opPtr.i = opPtr.p->prevParallelQue;
- if (opPtr.i == RNIL) {
+ if (opPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) {
jam();
break;
}//if
@@ -3479,14 +4389,13 @@ void Dbacc::commitDeleteCheck()
/* ------------------------------------------------------------------------- */
void Dbacc::commitOperation(Signal* signal)
{
- OperationrecPtr tolqTmpPtr;
- Page8Ptr coPageidptr;
- Uint32 tcoElementptr;
- Uint32 tmp2Olq;
+ validate_lock_queue(operationRecPtr);
- if ((operationRecPtr.p->commitDeleteCheckFlag == ZFALSE) &&
- (operationRecPtr.p->operation != ZSCAN_OP) &&
- (operationRecPtr.p->operation != ZREAD)) {
+ Uint32 opbits = operationRecPtr.p->m_op_bits;
+ Uint32 op = opbits & Operationrec::OP_MASK;
+ ndbrequire((opbits & Operationrec::OP_STATE_MASK) == Operationrec::OP_STATE_EXECUTED);
+ if ((opbits & Operationrec::OP_COMMIT_DELETE_CHECK) == 0 && (op != ZREAD))
+ {
jam();
/* This method is used to check whether the end result of the transaction
will be to delete the tuple. In this case all operation will be marked
@@ -3498,16 +4407,30 @@ void Dbacc::commitOperation(Signal* signal)
lock is released.
*/
commitDeleteCheck();
+ opbits = operationRecPtr.p->m_op_bits;
}//if
- if (operationRecPtr.p->lockOwner == ZTRUE) {
+
+ ndbassert(opbits & Operationrec::OP_RUN_QUEUE);
+
+ if (opbits & Operationrec::OP_LOCK_OWNER)
+ {
takeOutLockOwnersList(signal, operationRecPtr);
- if ((operationRecPtr.p->nextParallelQue == RNIL) &&
- (operationRecPtr.p->nextSerialQue == RNIL) &&
- (operationRecPtr.p->elementIsDisappeared == ZFALSE)) {
+ opbits &= ~(Uint32)Operationrec::OP_LOCK_OWNER;
+ operationRecPtr.p->m_op_bits = opbits;
+
+ const bool queue = (operationRecPtr.p->nextParallelQue != RNIL ||
+ operationRecPtr.p->nextSerialQue != RNIL);
+
+ if (!queue && (opbits & Operationrec::OP_ELEMENT_DISAPPEARED) == 0)
+ {
/*
- This is the normal path through the commit for operations owning the
- lock without any queues and not a delete operation.
- */
+ * This is the normal path through the commit for operations owning the
+ * lock without any queues and not a delete operation.
+ */
+ Page8Ptr coPageidptr;
+ Uint32 tcoElementptr;
+ Uint32 tmp2Olq;
+
coPageidptr.i = operationRecPtr.p->elementPage;
tcoElementptr = operationRecPtr.p->elementPointer;
tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->hashvaluePart,
@@ -3517,445 +4440,439 @@ void Dbacc::commitOperation(Signal* signal)
arrGuard(tcoElementptr, 2048);
coPageidptr.p->word32[tcoElementptr] = tmp2Olq;
return;
- } else if ((operationRecPtr.p->nextParallelQue != RNIL) ||
- (operationRecPtr.p->nextSerialQue != RNIL)) {
+ }
+ else if (queue)
+ {
jam();
/*
- The case when there is a queue lined up.
- Release the lock and pass it to the next operation lined up.
- */
- releaselock(signal);
+ * The case when there is a queue lined up.
+ * Release the lock and pass it to the next operation lined up.
+ */
+ release_lockowner(signal, operationRecPtr, true);
return;
- } else {
+ }
+ else
+ {
jam();
/*
- No queue and elementIsDisappeared is true. We perform the actual delete
- operation.
- */
+ * No queue and elementIsDisappeared is true.
+ * We perform the actual delete operation.
+ */
commitdelete(signal);
return;
}//if
- } else {
- /*
- THE OPERATION DOES NOT OWN THE LOCK. IT MUST BE IN A LOCK QUEUE OF THE
- ELEMENT.
- */
- ndbrequire(operationRecPtr.p->prevParallelQue != RNIL);
+ }
+ else
+ {
+ /**
+ * THE OPERATION DOES NOT OWN THE LOCK. IT MUST BE IN A LOCK QUEUE OF THE
+ * ELEMENT.
+ */
jam();
- tolqTmpPtr.i = operationRecPtr.p->prevParallelQue;
- ptrCheckGuard(tolqTmpPtr, coprecsize, operationrec);
- tolqTmpPtr.p->nextParallelQue = operationRecPtr.p->nextParallelQue;
- if (operationRecPtr.p->nextParallelQue != RNIL) {
+ OperationrecPtr prev, next, lockOwner;
+ prev.i = operationRecPtr.p->prevParallelQue;
+ next.i = operationRecPtr.p->nextParallelQue;
+ lockOwner.i = operationRecPtr.p->m_lock_owner_ptr_i;
+ ptrCheckGuard(prev, coprecsize, operationrec);
+
+ prev.p->nextParallelQue = next.i;
+ if (next.i != RNIL)
+ {
jam();
- tolqTmpPtr.i = operationRecPtr.p->nextParallelQue;
- ptrCheckGuard(tolqTmpPtr, coprecsize, operationrec);
- tolqTmpPtr.p->prevParallelQue = operationRecPtr.p->prevParallelQue;
- }//if
-
+ ptrCheckGuard(next, coprecsize, operationrec);
+ next.p->prevParallelQue = prev.i;
+ }
+ else if (prev.p->m_op_bits & Operationrec::OP_LOCK_OWNER)
+ {
+ jam();
+ ndbassert(lockOwner.i == prev.i);
+ prev.p->m_lo_last_parallel_op_ptr_i = RNIL;
+ next = prev;
+ }
+ else
+ {
+ jam();
+ /**
+ * Last operation in parallell queue
+ */
+ ndbassert(prev.i != lockOwner.i);
+ ptrCheckGuard(lockOwner, coprecsize, operationrec);
+ ndbassert(lockOwner.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
+ lockOwner.p->m_lo_last_parallel_op_ptr_i = prev.i;
+ prev.p->m_lock_owner_ptr_i = lockOwner.i;
+ next = prev;
+ }
+
/**
* Check possible lock upgrade
- * 1) Find lock owner
- * 2) Count transactions in parallel que
- * 3) If count == 1 and TRANSID(next serial) == TRANSID(lock owner)
- * upgrade next serial
*/
- if(operationRecPtr.p->lockMode)
+ if(opbits & Operationrec::OP_ACC_LOCK_MODE)
{
jam();
+
/**
- * Committing a non shared operation can't lead to lock upgrade
+ * Not lock owner...committing a exclusive operation...
+ *
+ * e.g
+ * T1(R) T1(X)
+ * T2(R/X)
+ *
+ * If T1(X) commits T2(R/X) is not supposed to run
+ * as T1(R) should also commit
+ *
+ * e.g
+ * T1(R) T1(X) T1*(R)
+ * T2(R/X)
+ *
+ * If T1*(R) commits T2(R/X) is not supposed to run
+ * as T1(R),T2(x) should also commit
*/
+ validate_lock_queue(prev);
return;
}
-
- OperationrecPtr lock_owner;
- lock_owner.i = operationRecPtr.p->prevParallelQue;
- ptrCheckGuard(lock_owner, coprecsize, operationrec);
- Uint32 transid[2] = { lock_owner.p->transId1,
- lock_owner.p->transId2 };
-
-
- while(lock_owner.p->prevParallelQue != RNIL)
+
+ /**
+ * We committed a shared lock
+ * Check if we can start next...
+ */
+ while(next.p->nextParallelQue != RNIL)
{
- lock_owner.i = lock_owner.p->prevParallelQue;
- ptrCheckGuard(lock_owner, coprecsize, operationrec);
+ jam();
+ next.i = next.p->nextParallelQue;
+ ptrCheckGuard(next, coprecsize, operationrec);
- if(lock_owner.p->transId1 != transid[0] ||
- lock_owner.p->transId2 != transid[1])
+ if ((next.p->m_op_bits & Operationrec::OP_STATE_MASK) !=
+ Operationrec::OP_STATE_EXECUTED)
{
jam();
- /**
- * If more than 1 trans in lock queue -> no lock upgrade
- */
return;
}
}
- check_lock_upgrade(signal, lock_owner, operationRecPtr);
+ startNext(signal, next);
+
+ validate_lock_queue(prev);
}
}//Dbacc::commitOperation()
-void
-Dbacc::check_lock_upgrade(Signal* signal,
- OperationrecPtr lock_owner,
- OperationrecPtr release_op)
-{
- if((lock_owner.p->transId1 == release_op.p->transId1 &&
- lock_owner.p->transId2 == release_op.p->transId2) ||
- release_op.p->lockMode ||
- lock_owner.p->nextSerialQue == RNIL)
- {
- jam();
- /**
- * No lock upgrade if same trans or lock owner has no serial queue
- * or releasing non shared op
- */
- return;
- }
-
- OperationrecPtr next;
- next.i = lock_owner.p->nextSerialQue;
- ptrCheckGuard(next, coprecsize, operationrec);
+void
+Dbacc::release_lockowner(Signal* signal, OperationrecPtr opPtr, bool commit)
+{
+ OperationrecPtr nextP;
+ OperationrecPtr nextS;
+ OperationrecPtr newOwner;
+ OperationrecPtr lastP;
- if(lock_owner.p->transId1 != next.p->transId1 ||
- lock_owner.p->transId2 != next.p->transId2)
+ Uint32 opbits = opPtr.p->m_op_bits;
+ nextP.i = opPtr.p->nextParallelQue;
+ nextS.i = opPtr.p->nextSerialQue;
+ lastP.i = opPtr.p->m_lo_last_parallel_op_ptr_i;
+ Uint32 lastS = opPtr.p->m_lo_last_serial_op_ptr_i;
+
+ ndbassert(lastP.i != RNIL || lastS != RNIL);
+ ndbassert(nextP.i != RNIL || nextS.i != RNIL);
+
+ enum {
+ NOTHING,
+ CHECK_LOCK_UPGRADE,
+ START_NEW
+ } action = NOTHING;
+
+ if (nextP.i != RNIL)
{
jam();
- /**
- * No lock upgrad if !same trans in serial queue
- */
- return;
+ ptrCheckGuard(nextP, coprecsize, operationrec);
+ newOwner = nextP;
+
+ if (lastP.i == newOwner.i)
+ {
+ newOwner.p->m_lo_last_parallel_op_ptr_i = RNIL;
+ lastP = nextP;
+ }
+ else
+ {
+ ptrCheckGuard(lastP, coprecsize, operationrec);
+ newOwner.p->m_lo_last_parallel_op_ptr_i = lastP.i;
+ lastP.p->m_lock_owner_ptr_i = newOwner.i;
+ }
+
+ newOwner.p->m_lo_last_serial_op_ptr_i = lastS;
+ newOwner.p->nextSerialQue = nextS.i;
+
+ if (nextS.i != RNIL)
+ {
+ jam();
+ ptrCheckGuard(nextS, coprecsize, operationrec);
+ ndbassert(nextS.p->prevSerialQue == opPtr.i);
+ nextS.p->prevSerialQue = newOwner.i;
+ }
+
+ if (commit)
+ {
+ if ((opbits & Operationrec::OP_ACC_LOCK_MODE) == ZREADLOCK)
+ {
+ jam();
+ /**
+ * Lock owner...committing a shared operation...
+ * this can be a lock upgrade
+ *
+ * e.g
+ * T1(R) T2(R)
+ * T2(X)
+ *
+ * If T1(R) commits T2(X) is supposed to run
+ *
+ * e.g
+ * T1(X) T1(R)
+ * T2(R)
+ *
+ * If T1(X) commits, then T1(R) _should_ commit before T2(R) is
+ * allowed to proceed
+ */
+ action = CHECK_LOCK_UPGRADE;
+ }
+ else
+ {
+ jam();
+ newOwner.p->m_op_bits |= Operationrec::OP_LOCK_MODE;
+ }
+ }
+ else
+ {
+ /**
+ * Aborting an operation can *always* lead to lock upgrade
+ */
+ action = CHECK_LOCK_UPGRADE;
+ Uint32 opstate = opbits & Operationrec::OP_STATE_MASK;
+ if (opstate != Operationrec::OP_STATE_EXECUTED)
+ {
+ ndbassert(opstate == Operationrec::OP_STATE_RUNNING);
+ if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED)
+ {
+ jam();
+ report_dealloc(signal, opPtr.p);
+ newOwner.p->localdata[0] = ~(Uint32)0;
+ }
+ else
+ {
+ jam();
+ newOwner.p->localdata[0] = opPtr.p->localdata[0];
+ newOwner.p->localdata[1] = opPtr.p->localdata[1];
+ }
+ action = START_NEW;
+ }
+
+ /**
+ * Update ACC_LOCK_MODE
+ */
+ if (opbits & Operationrec::OP_LOCK_MODE)
+ {
+ Uint32 nextbits = nextP.p->m_op_bits;
+ while ((nextbits & Operationrec::OP_LOCK_MODE) == 0)
+ {
+ ndbassert(nextbits & Operationrec::OP_ACC_LOCK_MODE);
+ nextbits &= ~(Uint32)Operationrec::OP_ACC_LOCK_MODE;
+ nextP.p->m_op_bits = nextbits;
+
+ if (nextP.p->nextParallelQue != RNIL)
+ {
+ nextP.i = nextP.p->nextParallelQue;
+ ptrCheckGuard(nextP, coprecsize, operationrec);
+ nextbits = nextP.p->m_op_bits;
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+ }
}
-
- if (getNoParallelTransaction(lock_owner.p) > 1)
+ else
{
jam();
- /**
- * No lock upgrade if more than 1 transaction in parallell queue
- */
- return;
- }
+ ptrCheckGuard(nextS, coprecsize, operationrec);
+ newOwner = nextS;
+
+ newOwner.p->m_op_bits |= Operationrec::OP_RUN_QUEUE;
+
+ if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED)
+ {
+ report_dealloc(signal, opPtr.p);
+ newOwner.p->localdata[0] = ~(Uint32)0;
+ }
+ else
+ {
+ jam();
+ newOwner.p->localdata[0] = opPtr.p->localdata[0];
+ newOwner.p->localdata[1] = opPtr.p->localdata[1];
+ }
+
+ lastP = newOwner;
+ while (lastP.p->nextParallelQue != RNIL)
+ {
+ lastP.i = lastP.p->nextParallelQue;
+ ptrCheckGuard(lastP, coprecsize, operationrec);
+ lastP.p->m_op_bits |= Operationrec::OP_RUN_QUEUE;
+ }
+
+ if (newOwner.i != lastP.i)
+ {
+ jam();
+ newOwner.p->m_lo_last_parallel_op_ptr_i = lastP.i;
+ }
+ else
+ {
+ jam();
+ newOwner.p->m_lo_last_parallel_op_ptr_i = RNIL;
+ }
- if (getNoParallelTransaction(next.p) > 1)
- {
- jam();
- /**
- * No lock upgrade if more than 1 transaction in next's parallell queue
- */
- return;
+ if (newOwner.i != lastS)
+ {
+ jam();
+ newOwner.p->m_lo_last_serial_op_ptr_i = lastS;
+ }
+ else
+ {
+ jam();
+ newOwner.p->m_lo_last_serial_op_ptr_i = RNIL;
+ }
+
+ action = START_NEW;
}
- OperationrecPtr tmp;
- tmp.i = lock_owner.p->nextSerialQue = next.p->nextSerialQue;
- if(tmp.i != RNIL)
- {
- ptrCheckGuard(tmp, coprecsize, operationrec);
- ndbassert(tmp.p->prevSerialQue == next.i);
- tmp.p->prevSerialQue = lock_owner.i;
- }
- next.p->nextSerialQue = next.p->prevSerialQue = RNIL;
+ insertLockOwnersList(signal, newOwner);
- // Find end of parallell que
- tmp = lock_owner;
- Uint32 lockMode = next.p->lockMode > lock_owner.p->lockMode ?
- next.p->lockMode : lock_owner.p->lockMode;
- while(tmp.p->nextParallelQue != RNIL)
+ /**
+ * Copy op info, and store op in element
+ *
+ */
{
- jam();
- tmp.i = tmp.p->nextParallelQue;
- tmp.p->lockMode = lockMode;
- ptrCheckGuard(tmp, coprecsize, operationrec);
- }
- tmp.p->lockMode = lockMode;
-
- next.p->prevParallelQue = tmp.i;
- tmp.p->nextParallelQue = next.i;
-
- OperationrecPtr save = operationRecPtr;
-
- Uint32 localdata[2];
- localdata[0] = lock_owner.p->localdata[0];
- localdata[1] = lock_owner.p->localdata[1];
- do {
- next.p->localdata[0] = localdata[0];
- next.p->localdata[1] = localdata[1];
- next.p->lockMode = lockMode;
-
- operationRecPtr = next;
- executeNextOperation(signal);
- if (next.p->nextParallelQue != RNIL)
+ newOwner.p->elementPage = opPtr.p->elementPage;
+ newOwner.p->elementIsforward = opPtr.p->elementIsforward;
+ newOwner.p->elementPointer = opPtr.p->elementPointer;
+ newOwner.p->elementContainer = opPtr.p->elementContainer;
+ newOwner.p->scanBits = opPtr.p->scanBits;
+ newOwner.p->hashvaluePart = opPtr.p->hashvaluePart;
+ newOwner.p->m_op_bits |= (opbits & Operationrec::OP_ELEMENT_DISAPPEARED);
+ if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED)
{
+ /* ------------------------------------------------------------------- */
+ // If the elementIsDisappeared is set then we know that the
+ // hashValue is also set since it always originates from a
+ // committing abort or a aborting insert.
+ // Scans do not initialise the hashValue and must have this
+ // value initialised if they are
+ // to successfully commit the delete.
+ /* ------------------------------------------------------------------- */
jam();
- next.i = next.p->nextParallelQue;
- ptrCheckGuard(next, coprecsize, operationrec);
- } else {
- jam();
- break;
+ newOwner.p->hashValue = opPtr.p->hashValue;
}//if
- } while (1);
+
+ Page8Ptr pagePtr;
+ pagePtr.i = newOwner.p->elementPage;
+ ptrCheckGuard(pagePtr, cpagesize, page8);
+ const Uint32 tmp = ElementHeader::setLocked(newOwner.i);
+ arrGuard(newOwner.p->elementPointer, 2048);
+ pagePtr.p->word32[newOwner.p->elementPointer] = tmp;
+ }
- operationRecPtr = save;
+ switch(action){
+ case NOTHING:
+ validate_lock_queue(newOwner);
+ return;
+ case START_NEW:
+ startNew(signal, newOwner);
+ validate_lock_queue(newOwner);
+ return;
+ case CHECK_LOCK_UPGRADE:
+ startNext(signal, lastP);
+ validate_lock_queue(lastP);
+ break;
+ }
}
-/* ------------------------------------------------------------------------- */
-/* RELEASELOCK */
-/* RESETS LOCK OF AN ELEMENT. */
-/* INFORMATION ABOUT THE ELEMENT IS SAVED IN THE OPERATION RECORD */
-/* THESE INFORMATION IS USED TO UPDATE HEADER OF THE ELEMENT */
-/* ------------------------------------------------------------------------- */
-void Dbacc::releaselock(Signal* signal)
-{
- OperationrecPtr rloOperPtr;
- OperationrecPtr trlOperPtr;
- OperationrecPtr trlTmpOperPtr;
- Uint32 TelementIsDisappeared;
-
- trlOperPtr.i = RNIL;
- if (operationRecPtr.p->nextParallelQue != RNIL) {
- jam();
- /** ---------------------------------------------------------------------
- * NEXT OPERATION TAKES OVER THE LOCK.
- * We will simply move the info from the leader
- * to the new queue leader.
- * -------------------------------------------------------------------- */
- trlOperPtr.i = operationRecPtr.p->nextParallelQue;
- ptrCheckGuard(trlOperPtr, coprecsize, operationrec);
- copyInOperPtr = trlOperPtr;
- copyOperPtr = operationRecPtr;
- copyOpInfo(signal);
- trlOperPtr.p->prevParallelQue = RNIL;
- if (operationRecPtr.p->nextSerialQue != RNIL) {
- jam();
- /* -----------------------------------------------------------------
- * THERE IS A SERIAL QUEUE. MOVE IT FROM RELEASED OP REC TO THE
- * NEW LOCK OWNER.
- * ------------------------------------------------------------------ */
- trlOperPtr.p->nextSerialQue = operationRecPtr.p->nextSerialQue;
- trlTmpOperPtr.i = trlOperPtr.p->nextSerialQue;
- ptrCheckGuard(trlTmpOperPtr, coprecsize, operationrec);
- trlTmpOperPtr.p->prevSerialQue = trlOperPtr.i;
- }//if
-
- check_lock_upgrade(signal, copyInOperPtr, operationRecPtr);
- } else {
- ndbrequire(operationRecPtr.p->nextSerialQue != RNIL);
- jam();
- /** ---------------------------------------------------------------------
- * THE PARALLEL QUEUE IS EMPTY AND THE SERIAL QUEUE IS NOT EMPTY.
- * WE NEED TO REARRANGE LISTS AND START A NUMBER OF OPERATIONS.
- * -------------------------------------------------------------------- */
- trlOperPtr.i = operationRecPtr.p->nextSerialQue;
- ptrCheckGuard(trlOperPtr, coprecsize, operationrec);
- copyOperPtr = operationRecPtr;
- copyInOperPtr = trlOperPtr;
- copyOpInfo(signal);
- trlOperPtr.p->prevSerialQue = RNIL;
- ndbrequire(trlOperPtr.p->prevParallelQue == RNIL);
- /* --------------------------------------------------------------------- */
- /* WE HAVE MOVED TO THE NEXT PARALLEL QUEUE. WE MUST START ALL OF THOSE */
- /* OPERATIONS WHICH UP TILL NOW HAVE BEEN QUEUED WAITING FOR THE LOCK. */
- /* --------------------------------------------------------------------- */
- rloOperPtr = operationRecPtr;
- trlTmpOperPtr = trlOperPtr;
- TelementIsDisappeared = trlOperPtr.p->elementIsDisappeared;
- Uint32 ThashValue = trlOperPtr.p->hashValue;
- do {
- /* ------------------------------------------------------------------ */
- // Ensure that all operations in the queue are assigned with the
- // elementIsDisappeared to ensure that the element is removed after
- // a previous delete. An insert does however revert this decision
- // since the element is put back again.
- // Local checkpoints complicate life here since they do not
- // execute the next operation but simply change
- // the state on the operation.
- // We need to set-up the variable elementIsDisappeared
- // properly even when local checkpoints and inserts/writes after
- // deletes occur.
- /* ------------------------------------------------------------------- */
- trlTmpOperPtr.p->elementIsDisappeared = TelementIsDisappeared;
- if (TelementIsDisappeared == ZTRUE) {
- /* ----------------------------------------------------------------- */
- // If the elementIsDisappeared is set then we know that the
- // hashValue is also set since it always originates from a
- // committing abort or a aborting insert.
- // Scans do not initialise the hashValue and must have this
- // value initialised if they are to successfully commit the delete.
- /* ----------------------------------------------------------------- */
- jam();
- trlTmpOperPtr.p->hashValue = ThashValue;
- }//if
- trlTmpOperPtr.p->localdata[0] = trlOperPtr.p->localdata[0];
- trlTmpOperPtr.p->localdata[1] = trlOperPtr.p->localdata[1];
- /* ------------------------------------------------------------------- */
- // Restart the queued operation.
- /* ------------------------------------------------------------------- */
- operationRecPtr = trlTmpOperPtr;
- TelementIsDisappeared = executeNextOperation(signal);
- ThashValue = operationRecPtr.p->hashValue;
- if (trlTmpOperPtr.p->nextParallelQue != RNIL) {
- jam();
- /* ----------------------------------------------------------------- */
- // We will continue with the next operation in the parallel
- // queue and start this as well.
- /* ----------------------------------------------------------------- */
- trlTmpOperPtr.i = trlTmpOperPtr.p->nextParallelQue;
- ptrCheckGuard(trlTmpOperPtr, coprecsize, operationrec);
- } else {
- jam();
- break;
- }//if
- } while (1);
- operationRecPtr = rloOperPtr;
- }//if
-
- // Insert the next op into the lock owner list
- insertLockOwnersList(signal, trlOperPtr);
- return;
-}//Dbacc::releaselock()
-
-/* --------------------------------------------------------------------------------- */
-/* COPY_OP_INFO */
-/* INPUT: COPY_IN_OPER_PTR AND COPY_OPER_PTR. */
-/* DESCRIPTION:INFORMATION ABOUT THE ELEMENT WILL BE MOVED FROM OPERATION */
-/* REC TO QUEUE OP REC. QUE OP REC TAKES OVER THE LOCK. */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::copyOpInfo(Signal* signal)
+void
+Dbacc::startNew(Signal* signal, OperationrecPtr newOwner)
{
- Page8Ptr coiPageidptr;
-
- copyInOperPtr.p->elementPage = copyOperPtr.p->elementPage;
- copyInOperPtr.p->elementIsforward = copyOperPtr.p->elementIsforward;
- copyInOperPtr.p->elementContainer = copyOperPtr.p->elementContainer;
- copyInOperPtr.p->elementPointer = copyOperPtr.p->elementPointer;
- copyInOperPtr.p->scanBits = copyOperPtr.p->scanBits;
- copyInOperPtr.p->hashvaluePart = copyOperPtr.p->hashvaluePart;
- copyInOperPtr.p->elementIsDisappeared = copyOperPtr.p->elementIsDisappeared;
- if (copyInOperPtr.p->elementIsDisappeared == ZTRUE) {
- /* --------------------------------------------------------------------------------- */
- // If the elementIsDisappeared is set then we know that the hashValue is also set
- // since it always originates from a committing abort or a aborting insert. Scans
- // do not initialise the hashValue and must have this value initialised if they are
- // to successfully commit the delete.
- /* --------------------------------------------------------------------------------- */
- jam();
- copyInOperPtr.p->hashValue = copyOperPtr.p->hashValue;
- }//if
- coiPageidptr.i = copyOperPtr.p->elementPage;
- ptrCheckGuard(coiPageidptr, cpagesize, page8);
- const Uint32 tmp = ElementHeader::setLocked(copyInOperPtr.i);
- dbgWord32(coiPageidptr, copyOperPtr.p->elementPointer, tmp);
- arrGuard(copyOperPtr.p->elementPointer, 2048);
- coiPageidptr.p->word32[copyOperPtr.p->elementPointer] = tmp;
- copyInOperPtr.p->localdata[0] = copyOperPtr.p->localdata[0];
- copyInOperPtr.p->localdata[1] = copyOperPtr.p->localdata[1];
-}//Dbacc::copyOpInfo()
+ OperationrecPtr save = operationRecPtr;
+ operationRecPtr = newOwner;
+
+ Uint32 opbits = newOwner.p->m_op_bits;
+ Uint32 op = opbits & Operationrec::OP_MASK;
+ Uint32 opstate = (opbits & Operationrec::OP_STATE_MASK);
+ ndbassert(opstate == Operationrec::OP_STATE_WAITING);
+ ndbassert(opbits & Operationrec::OP_LOCK_OWNER);
+ const bool deleted = opbits & Operationrec::OP_ELEMENT_DISAPPEARED;
+ Uint32 errCode = 0;
+
+ opbits &= opbits & ~(Uint32)Operationrec::OP_STATE_MASK;
+ opbits |= Operationrec::OP_STATE_RUNNING;
+
+ if (op == ZSCAN_OP && (opbits & Operationrec::OP_LOCK_REQ) == 0)
+ goto scan;
-/* ******************--------------------------------------------------------------- */
-/* EXECUTE NEXT OPERATION */
-/* NEXT OPERATION IN A LOCK QUEUE WILL BE EXECUTED. */
-/* --------------------------------------------------------------------------------- */
-Uint32 Dbacc::executeNextOperation(Signal* signal)
-{
- ndbrequire(operationRecPtr.p->transactionstate == ACTIVE);
- if (operationRecPtr.p->elementIsDisappeared == ZTRUE) {
- /* --------------------------------------------------------------------- */
- /* PREVIOUS OPERATION WAS DELETE OPERATION AND THE ELEMENT IS DELETED. */
- /* --------------------------------------------------------------------- */
- if (((operationRecPtr.p->operation != ZINSERT) &&
- (operationRecPtr.p->operation != ZWRITE)) ||
- (operationRecPtr.p->prevParallelQue != RNIL)) {
- if (operationRecPtr.p->operation != ZSCAN_OP ||
- operationRecPtr.p->isAccLockReq) {
- jam();
- /* ----------------------------------------------------------------- */
- // Updates and reads with a previous delete simply aborts with read
- // error indicating that tuple did not exist.
- // Also inserts and writes not being the first operation.
- /* ----------------------------------------------------------------- */
- operationRecPtr.p->transactionstate = WAIT_COMMIT_ABORT;
- signal->theData[0] = operationRecPtr.p->userptr;
- signal->theData[1] = ZREAD_ERROR;
- sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYREF, signal,
- 2, JBB);
- return operationRecPtr.p->elementIsDisappeared;
- } else {
- /* ----------------------------------------------------------------- */
- /* ABORT OF OPERATION NEEDED BUT THE OPERATION IS A
- * SCAN => SPECIAL TREATMENT.
- * IF THE SCAN WAITS IN QUEUE THEN WE MUST REMOVE THE OPERATION
- * FROM THE SCAN LOCK QUEUE AND IF NO MORE OPERATIONS ARE QUEUED
- * THEN WE SHOULD RESTART THE SCAN PROCESS. OTHERWISE WE SIMPLY
- * RELEASE THE OPERATION AND DECREASE THE NUMBER OF LOCKS HELD.
- * ----------------------------------------------------------------- */
- takeOutScanLockQueue(operationRecPtr.p->scanRecPtr);
- putReadyScanQueue(signal, operationRecPtr.p->scanRecPtr);
- return operationRecPtr.p->elementIsDisappeared;
- }//if
- }//if
- /* --------------------------------------------------------------------- */
- // Insert and writes can continue but need to be converted to inserts.
- /* --------------------------------------------------------------------- */
- jam();
- operationRecPtr.p->elementIsDisappeared = ZFALSE;
- operationRecPtr.p->operation = ZINSERT;
- operationRecPtr.p->insertIsDone = ZTRUE;
- } else if (operationRecPtr.p->operation == ZINSERT) {
- bool abortFlag = true;
- if (operationRecPtr.p->prevParallelQue != RNIL) {
- OperationrecPtr prevOpPtr;
- jam();
- prevOpPtr.i = operationRecPtr.p->prevParallelQue;
- ptrCheckGuard(prevOpPtr, coprecsize, operationrec);
- if (prevOpPtr.p->operation == ZDELETE) {
- jam();
- abortFlag = false;
- }//if
- }//if
- if (abortFlag) {
- jam();
- /* ------------------------------------------------------------------- */
- /* ELEMENT STILL REMAINS AND WE ARE TRYING TO INSERT IT AGAIN. */
- /* THIS IS CLEARLY NOT A GOOD IDEA. */
- /* ------------------------------------------------------------------- */
- operationRecPtr.p->transactionstate = WAIT_COMMIT_ABORT;
- signal->theData[0] = operationRecPtr.p->userptr;
- signal->theData[1] = ZWRITE_ERROR;
- sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYREF, signal,
- 2, JBB);
- return operationRecPtr.p->elementIsDisappeared;
- }//if
- }
- else if(operationRecPtr.p->operation == ZWRITE)
+ if (deleted)
{
jam();
- operationRecPtr.p->operation = ZUPDATE;
- if (operationRecPtr.p->prevParallelQue != RNIL) {
- OperationrecPtr prevOpPtr;
- jam();
- prevOpPtr.i = operationRecPtr.p->prevParallelQue;
- ptrCheckGuard(prevOpPtr, coprecsize, operationrec);
- if (prevOpPtr.p->operation == ZDELETE)
- {
- jam();
- operationRecPtr.p->operation = ZINSERT;
- }
+ if (op != ZINSERT && op != ZWRITE)
+ {
+ errCode = ZREAD_ERROR;
+ goto ref;
}
+
+ opbits &= ~(Uint32)Operationrec::OP_MASK;
+ opbits &= ~(Uint32)Operationrec::OP_ELEMENT_DISAPPEARED;
+ opbits |= (op = ZINSERT);
+ opbits |= Operationrec::OP_INSERT_IS_DONE;
+ goto conf;
}
-
- if (operationRecPtr.p->operation == ZSCAN_OP &&
- ! operationRecPtr.p->isAccLockReq) {
+ else if (op == ZINSERT)
+ {
jam();
- takeOutScanLockQueue(operationRecPtr.p->scanRecPtr);
- putReadyScanQueue(signal, operationRecPtr.p->scanRecPtr);
- } else {
+ errCode = ZWRITE_ERROR;
+ goto ref;
+ }
+ else if (op == ZWRITE)
+ {
jam();
- sendAcckeyconf(signal);
- sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYCONF,
- signal, 6, JBB);
- }//if
- return operationRecPtr.p->elementIsDisappeared;
-}//Dbacc::executeNextOperation()
+ opbits &= ~(Uint32)Operationrec::OP_MASK;
+ opbits |= (op = ZUPDATE);
+ goto conf;
+ }
+
+conf:
+ newOwner.p->m_op_bits = opbits;
+
+ sendAcckeyconf(signal);
+ sendSignal(newOwner.p->userblockref, GSN_ACCKEYCONF,
+ signal, 6, JBB);
+
+ operationRecPtr = save;
+ return;
+
+scan:
+ jam();
+ newOwner.p->m_op_bits = opbits;
+
+ takeOutScanLockQueue(newOwner.p->scanRecPtr);
+ putReadyScanQueue(signal, newOwner.p->scanRecPtr);
+
+ operationRecPtr = save;
+ return;
+
+ref:
+ newOwner.p->m_op_bits = opbits;
+
+ signal->theData[0] = newOwner.p->userptr;
+ signal->theData[1] = errCode;
+ sendSignal(newOwner.p->userblockref, GSN_ACCKEYREF, signal,
+ 2, JBB);
+
+ operationRecPtr = save;
+ return;
+}
/**
* takeOutLockOwnersList
@@ -3969,7 +4886,6 @@ void Dbacc::takeOutLockOwnersList(Signal* signal,
{
const Uint32 Tprev = outOperPtr.p->prevLockOwnerOp;
const Uint32 Tnext = outOperPtr.p->nextLockOwnerOp;
-
#ifdef VM_TRACE
// Check that operation is already in the list
OperationrecPtr tmpOperPtr;
@@ -3984,8 +4900,7 @@ void Dbacc::takeOutLockOwnersList(Signal* signal,
ndbrequire(inList == true);
#endif
- ndbrequire(outOperPtr.p->lockOwner == ZTRUE);
- outOperPtr.p->lockOwner = ZFALSE;
+ ndbassert(outOperPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
// Fast path through the code for the common case.
if ((Tprev == RNIL) && (Tnext == RNIL)) {
@@ -4026,7 +4941,6 @@ void Dbacc::insertLockOwnersList(Signal* signal,
const OperationrecPtr& insOperPtr)
{
OperationrecPtr tmpOperPtr;
-
#ifdef VM_TRACE
// Check that operation is not already in list
tmpOperPtr.i = fragrecptr.p->lockOwnersList;
@@ -4036,18 +4950,15 @@ void Dbacc::insertLockOwnersList(Signal* signal,
tmpOperPtr.i = tmpOperPtr.p->nextLockOwnerOp;
}
#endif
+ tmpOperPtr.i = fragrecptr.p->lockOwnersList;
+
+ ndbrequire(! (insOperPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER));
- ndbrequire(insOperPtr.p->lockOwner == ZFALSE);
-
- insOperPtr.p->lockOwner = ZTRUE;
+ insOperPtr.p->m_op_bits |= Operationrec::OP_LOCK_OWNER;
insOperPtr.p->prevLockOwnerOp = RNIL;
- tmpOperPtr.i = fragrecptr.p->lockOwnersList;
- const Uint32 seq = fragrecptr.p->m_current_sequence_no;
insOperPtr.p->nextLockOwnerOp = tmpOperPtr.i;
- insOperPtr.p->m_sequence_no = seq;
fragrecptr.p->lockOwnersList = insOperPtr.i;
- fragrecptr.p->m_current_sequence_no = seq+1;
if (tmpOperPtr.i == RNIL) {
return;
} else {
@@ -5407,6 +6318,7 @@ void Dbacc::execNEXT_SCANREQ(Signal* signal)
if (!scanPtr.p->scanReadCommittedFlag) {
commitOperation(signal);
}//if
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
takeOutActiveScanOp(signal);
releaseOpRec(signal);
scanPtr.p->scanOpsAllocated--;
@@ -5422,9 +6334,10 @@ void Dbacc::execNEXT_SCANREQ(Signal* signal)
jam();
fragrecptr.i = scanPtr.p->activeLocalFrag;
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- /* --------------------------------------------------------------------------------- */
- /* THE SCAN PROCESS IS FINISHED. RELOCK ALL LOCKED EL. RELESE ALL INVOLVED REC. */
- /* --------------------------------------------------------------------------------- */
+ /* ---------------------------------------------------------------------
+ * THE SCAN PROCESS IS FINISHED. RELOCK ALL LOCKED EL.
+ * RELESE ALL INVOLVED REC.
+ * ------------------------------------------------------------------- */
releaseScanLab(signal);
return;
break;
@@ -5474,24 +6387,26 @@ void Dbacc::checkNextBucketLab(Signal* signal)
scanPtr.p->nextBucketIndex++;
if (scanPtr.p->scanBucketState == ScanRec::SECOND_LAP) {
if (scanPtr.p->nextBucketIndex > scanPtr.p->maxBucketIndexToRescan) {
- /* --------------------------------------------------------------------------------- */
- // We have finished the rescan phase. We are ready to proceed with the next fragment part.
- /* --------------------------------------------------------------------------------- */
+ /* ---------------------------------------------------------------- */
+ // We have finished the rescan phase.
+ // We are ready to proceed with the next fragment part.
+ /* ---------------------------------------------------------------- */
jam();
checkNextFragmentLab(signal);
return;
}//if
} else if (scanPtr.p->scanBucketState == ScanRec::FIRST_LAP) {
if ((fragrecptr.p->p + fragrecptr.p->maxp) < scanPtr.p->nextBucketIndex) {
- /* --------------------------------------------------------------------------------- */
+ /* ---------------------------------------------------------------- */
// All buckets have been scanned a first time.
- /* --------------------------------------------------------------------------------- */
+ /* ---------------------------------------------------------------- */
if (scanPtr.p->minBucketIndexToRescan == 0xFFFFFFFF) {
jam();
- /* --------------------------------------------------------------------------------- */
- // We have not had any merges behind the scan. Thus it is not necessary to perform
- // any rescan any buckets and we can proceed immediately with the next fragment part.
- /* --------------------------------------------------------------------------------- */
+ /* -------------------------------------------------------------- */
+ // We have not had any merges behind the scan.
+ // Thus it is not necessary to perform any rescan any buckets
+ // and we can proceed immediately with the next fragment part.
+ /* --------------------------------------------------------------- */
checkNextFragmentLab(signal);
return;
} else {
@@ -5583,18 +6498,23 @@ void Dbacc::checkNextBucketLab(Signal* signal)
tslElementptr = tnsElementptr;
setlock(signal);
insertLockOwnersList(signal, operationRecPtr);
+ operationRecPtr.p->m_op_bits |=
+ Operationrec::OP_STATE_RUNNING | Operationrec::OP_RUN_QUEUE;
}//if
} else {
arrGuard(tnsElementptr, 2048);
queOperPtr.i =
ElementHeader::getOpPtrI(nsPageptr.p->word32[tnsElementptr]);
ptrCheckGuard(queOperPtr, coprecsize, operationrec);
- if (queOperPtr.p->elementIsDisappeared == ZTRUE) {
+ if (queOperPtr.p->m_op_bits & Operationrec::OP_ELEMENT_DISAPPEARED ||
+ queOperPtr.p->localdata[0] == ~(Uint32)0)
+ {
jam();
- /* --------------------------------------------------------------------------------- */
- // If the lock owner indicates the element is disappeared then we will not report this
- // tuple. We will continue with the next tuple.
- /* --------------------------------------------------------------------------------- */
+ /* ------------------------------------------------------------------ */
+ // If the lock owner indicates the element is disappeared then
+ // we will not report this tuple. We will continue with the next tuple.
+ /* ------------------------------------------------------------------ */
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
releaseOpRec(signal);
scanPtr.p->scanOpsAllocated--;
signal->theData[0] = scanPtr.i;
@@ -5606,31 +6526,29 @@ void Dbacc::checkNextBucketLab(Signal* signal)
Uint32 return_result;
if (scanPtr.p->scanLockMode == ZREADLOCK) {
jam();
- priPageptr = nsPageptr;
- tpriElementptr = tnsElementptr;
- return_result = placeReadInLockQueue(signal);
+ return_result = placeReadInLockQueue(queOperPtr);
} else {
jam();
- pwiPageptr = nsPageptr;
- tpwiElementptr = tnsElementptr;
- return_result = placeWriteInLockQueue(signal);
+ return_result = placeWriteInLockQueue(queOperPtr);
}//if
if (return_result == ZSERIAL_QUEUE) {
- /* --------------------------------------------------------------------------------- */
- /* WE PLACED THE OPERATION INTO A SERIAL QUEUE AND THUS WE HAVE TO WAIT FOR */
- /* THE LOCK TO BE RELEASED. WE CONTINUE WITH THE NEXT ELEMENT. */
- /* --------------------------------------------------------------------------------- */
+ /* -----------------------------------------------------------------
+ * WE PLACED THE OPERATION INTO A SERIAL QUEUE AND THUS WE HAVE TO
+ * WAIT FOR THE LOCK TO BE RELEASED. WE CONTINUE WITH THE NEXT ELEMENT
+ * ----------------------------------------------------------------- */
putOpScanLockQue(); /* PUT THE OP IN A QUE IN THE SCAN REC */
signal->theData[0] = scanPtr.i;
signal->theData[1] = AccCheckScan::ZCHECK_LCP_STOP;
sendSignal(cownBlockref, GSN_ACC_CHECK_SCAN, signal, 2, JBB);
return;
- } else if (return_result == ZWRITE_ERROR) {
+ } else if (return_result != ZPARALLEL_QUEUE) {
jam();
- /* --------------------------------------------------------------------------------- */
- // The tuple is either not committed yet or a delete in the same transaction (not
- // possible here since we are a scan). Thus we simply continue with the next tuple.
- /* --------------------------------------------------------------------------------- */
+ /* ----------------------------------------------------------------- */
+ // The tuple is either not committed yet or a delete in
+ // the same transaction (not possible here since we are a scan).
+ // Thus we simply continue with the next tuple.
+ /* ----------------------------------------------------------------- */
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
releaseOpRec(signal);
scanPtr.p->scanOpsAllocated--;
signal->theData[0] = scanPtr.i;
@@ -5641,10 +6559,11 @@ void Dbacc::checkNextBucketLab(Signal* signal)
ndbassert(return_result == ZPARALLEL_QUEUE);
}//if
}//if
- /* --------------------------------------------------------------------------------- */
- // Committed read proceed without caring for locks immediately down here except when
- // the tuple was deleted permanently and no new operation has inserted it again.
- /* --------------------------------------------------------------------------------- */
+ /* ----------------------------------------------------------------------- */
+ // Committed read proceed without caring for locks immediately
+ // down here except when the tuple was deleted permanently
+ // and no new operation has inserted it again.
+ /* ----------------------------------------------------------------------- */
putActiveScanOp(signal);
sendNextScanConf(signal);
return;
@@ -5668,14 +6587,15 @@ void Dbacc::initScanFragmentPart(Signal* signal)
DirRangePtr cnfDirRangePtr;
DirectoryarrayPtr cnfDirptr;
Page8Ptr cnfPageidptr;
- /* --------------------------------------------------------------------------------- */
+ /* ----------------------------------------------------------------------- */
// Set the active fragment part.
// Set the current bucket scanned to the first.
// Start with the first lap.
// Remember the number of buckets at start of the scan.
- // Set the minimum and maximum to values that will always be smaller and larger than.
+ // Set the minimum and maximum to values that will always be smaller and
+ // larger than.
// Reset the scan indicator on the first bucket.
- /* --------------------------------------------------------------------------------- */
+ /* ----------------------------------------------------------------------- */
scanPtr.p->activeLocalFrag = fragrecptr.i;
scanPtr.p->nextBucketIndex = 0; /* INDEX OF SCAN BUCKET */
scanPtr.p->scanBucketState = ScanRec::FIRST_LAP;
@@ -5694,11 +6614,11 @@ void Dbacc::initScanFragmentPart(Signal* signal)
releaseScanBucket(signal);
}//Dbacc::initScanFragmentPart()
-/* --------------------------------------------------------------------------------- */
-/* FLAG = 6 = ZCOPY_CLOSE THE SCAN PROCESS IS READY OR ABORTED. ALL OPERATION IN THE */
-/* ACTIVE OR WAIT QUEUE ARE RELEASED, SCAN FLAG OF ROOT FRAG IS RESET AND THE SCAN */
-/* RECORD IS RELEASED. */
-/* --------------------------------------------------------------------------------- */
+/* -------------------------------------------------------------------------
+ * FLAG = 6 = ZCOPY_CLOSE THE SCAN PROCESS IS READY OR ABORTED.
+ * ALL OPERATION IN THE ACTIVE OR WAIT QUEUE ARE RELEASED,
+ * SCAN FLAG OF ROOT FRAG IS RESET AND THE SCAN RECORD IS RELEASED.
+ * ------------------------------------------------------------------------ */
void Dbacc::releaseScanLab(Signal* signal)
{
releaseAndCommitActiveOps(signal);
@@ -5737,8 +6657,17 @@ void Dbacc::releaseAndCommitActiveOps(Signal* signal)
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
if (!scanPtr.p->scanReadCommittedFlag) {
jam();
- commitOperation(signal);
+ if ((operationRecPtr.p->m_op_bits & Operationrec::OP_STATE_MASK) ==
+ Operationrec::OP_STATE_EXECUTED)
+ {
+ commitOperation(signal);
+ }
+ else
+ {
+ abortOperation(signal);
+ }
}//if
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
takeOutActiveScanOp(signal);
releaseOpRec(signal);
scanPtr.p->scanOpsAllocated--;
@@ -5759,8 +6688,17 @@ void Dbacc::releaseAndCommitQueuedOps(Signal* signal)
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
if (!scanPtr.p->scanReadCommittedFlag) {
jam();
- commitOperation(signal);
+ if ((operationRecPtr.p->m_op_bits & Operationrec::OP_STATE_MASK) ==
+ Operationrec::OP_STATE_EXECUTED)
+ {
+ commitOperation(signal);
+ }
+ else
+ {
+ abortOperation(signal);
+ }
}//if
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
takeOutReadyScanQueue(signal);
releaseOpRec(signal);
scanPtr.p->scanOpsAllocated--;
@@ -5783,6 +6721,7 @@ void Dbacc::releaseAndAbortLockedOps(Signal* signal) {
abortOperation(signal);
}//if
takeOutScanLockQueue(scanPtr.i);
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
releaseOpRec(signal);
scanPtr.p->scanOpsAllocated--;
operationRecPtr.i = trsoOperPtr.i;
@@ -5817,9 +6756,11 @@ void Dbacc::execACC_CHECK_SCAN(Signal* signal)
takeOutReadyScanQueue(signal);
fragrecptr.i = operationRecPtr.p->fragptr;
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- if (operationRecPtr.p->elementIsDisappeared == ZTRUE) {
+ if (operationRecPtr.p->m_op_bits & Operationrec::OP_ELEMENT_DISAPPEARED)
+ {
jam();
abortOperation(signal);
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
releaseOpRec(signal);
scanPtr.p->scanOpsAllocated--;
continue;
@@ -5906,7 +6847,8 @@ void Dbacc::execACC_TO_REQ(Signal* signal)
jamEntry();
tatrOpPtr.i = signal->theData[1]; /* OPER PTR OF ACC */
ptrCheckGuard(tatrOpPtr, coprecsize, operationrec);
- if (tatrOpPtr.p->operation == ZSCAN_OP) {
+ if ((tatrOpPtr.p->m_op_bits & Operationrec::OP_MASK) == ZSCAN_OP)
+ {
tatrOpPtr.p->transId1 = signal->theData[2];
tatrOpPtr.p->transId2 = signal->theData[3];
} else {
@@ -6003,32 +6945,28 @@ void Dbacc::initScanOpRec(Signal* signal)
scanPtr.p->scanOpsAllocated++;
+ Uint32 opbits = 0;
+ opbits |= ZSCAN_OP;
+ opbits |= scanPtr.p->scanLockMode ? Operationrec::OP_LOCK_MODE : 0;
+ opbits |= scanPtr.p->scanLockMode ? Operationrec::OP_ACC_LOCK_MODE : 0;
+ opbits |= scanPtr.p->scanReadCommittedFlag ?
+ Operationrec::OP_EXECUTED_DIRTY_READ : 0;
+ opbits |= Operationrec::OP_COMMIT_DELETE_CHECK;
operationRecPtr.p->userptr = RNIL;
operationRecPtr.p->scanRecPtr = scanPtr.i;
- operationRecPtr.p->operation = ZSCAN_OP;
- operationRecPtr.p->transactionstate = ACTIVE;
- operationRecPtr.p->commitDeleteCheckFlag = ZFALSE;
- operationRecPtr.p->lockMode = scanPtr.p->scanLockMode;
operationRecPtr.p->fid = fragrecptr.p->myfid;
operationRecPtr.p->fragptr = fragrecptr.i;
- operationRecPtr.p->elementIsDisappeared = ZFALSE;
operationRecPtr.p->nextParallelQue = RNIL;
operationRecPtr.p->prevParallelQue = RNIL;
operationRecPtr.p->nextSerialQue = RNIL;
operationRecPtr.p->prevSerialQue = RNIL;
- operationRecPtr.p->prevQueOp = RNIL;
- operationRecPtr.p->nextQueOp = RNIL;
- operationRecPtr.p->keyinfoPage = RNIL; // Safety precaution
operationRecPtr.p->transId1 = scanPtr.p->scanTrid1;
operationRecPtr.p->transId2 = scanPtr.p->scanTrid2;
- operationRecPtr.p->lockOwner = ZFALSE;
- operationRecPtr.p->dirtyRead = 0;
- operationRecPtr.p->nodeType = 0; // Not a stand-by node
operationRecPtr.p->elementIsforward = tisoIsforward;
operationRecPtr.p->elementContainer = tisoContainerptr;
operationRecPtr.p->elementPointer = tisoElementptr;
operationRecPtr.p->elementPage = isoPageptr.i;
- operationRecPtr.p->isAccLockReq = ZFALSE;
+ operationRecPtr.p->m_op_bits = opbits;
tisoLocalPtr = tisoElementptr + tisoIsforward;
guard24 = fragrecptr.p->localkeylen - 1;
for (tisoTmp = 0; tisoTmp <= guard24; tisoTmp++) {
@@ -6038,7 +6976,6 @@ void Dbacc::initScanOpRec(Signal* signal)
tisoLocalPtr = tisoLocalPtr + tisoIsforward;
}//for
arrGuard(tisoLocalPtr, 2048);
- operationRecPtr.p->keydata[0] = isoPageptr.p->word32[tisoLocalPtr];
operationRecPtr.p->tupkeylen = fragrecptr.p->keyLength;
operationRecPtr.p->xfrmtupkeylen = 0; // not used
}//Dbacc::initScanOpRec()
@@ -6894,14 +7831,12 @@ void Dbacc::releaseOpRec(Signal* signal)
}
ndbrequire(opInList == false);
#endif
- ndbrequire(operationRecPtr.p->lockOwner == ZFALSE);
+ ndbrequire(operationRecPtr.p->m_op_bits == Operationrec::OP_INITIAL);
operationRecPtr.p->nextOp = cfreeopRec;
cfreeopRec = operationRecPtr.i; /* UPDATE FREE LIST OF OP RECORDS */
operationRecPtr.p->prevOp = RNIL;
- operationRecPtr.p->opState = FREE_OP;
- operationRecPtr.p->transactionstate = IDLE;
- operationRecPtr.p->operation = ZUNDEFINED_OP;
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
}//Dbacc::releaseOpRec()
/* --------------------------------------------------------------------------------- */
@@ -7403,8 +8338,8 @@ Dbacc::execDUMP_STATE_ORD(Signal* signal)
OperationrecPtr tmpOpPtr;
tmpOpPtr.i = recordNo;
ptrAss(tmpOpPtr, operationrec);
- infoEvent("Dbacc::operationrec[%d]: opState=%d, transid(0x%x, 0x%x)",
- tmpOpPtr.i, tmpOpPtr.p->opState, tmpOpPtr.p->transId1,
+ infoEvent("Dbacc::operationrec[%d]: transid(0x%x, 0x%x)",
+ tmpOpPtr.i, tmpOpPtr.p->transId1,
tmpOpPtr.p->transId2);
infoEvent("elementIsforward=%d, elementPage=%d, elementPointer=%d ",
tmpOpPtr.p->elementIsforward, tmpOpPtr.p->elementPage,
@@ -7412,30 +8347,19 @@ Dbacc::execDUMP_STATE_ORD(Signal* signal)
infoEvent("fid=%d, fragptr=%d, hashvaluePart=%d ",
tmpOpPtr.p->fid, tmpOpPtr.p->fragptr,
tmpOpPtr.p->hashvaluePart);
- infoEvent("hashValue=%d, insertDeleteLen=%d, keyinfoPage=%d ",
- tmpOpPtr.p->hashValue, tmpOpPtr.p->insertDeleteLen,
- tmpOpPtr.p->keyinfoPage);
+ infoEvent("hashValue=%d", tmpOpPtr.p->hashValue);
infoEvent("nextLockOwnerOp=%d, nextOp=%d, nextParallelQue=%d ",
tmpOpPtr.p->nextLockOwnerOp, tmpOpPtr.p->nextOp,
tmpOpPtr.p->nextParallelQue);
- infoEvent("nextQueOp=%d, nextSerialQue=%d, prevOp=%d ",
- tmpOpPtr.p->nextQueOp, tmpOpPtr.p->nextSerialQue,
+ infoEvent("nextSerialQue=%d, prevOp=%d ",
+ tmpOpPtr.p->nextSerialQue,
tmpOpPtr.p->prevOp);
- infoEvent("prevLockOwnerOp=%d, prevParallelQue=%d, prevQueOp=%d ",
- tmpOpPtr.p->prevLockOwnerOp, tmpOpPtr.p->nextParallelQue,
- tmpOpPtr.p->prevQueOp);
- infoEvent("prevSerialQue=%d, scanRecPtr=%d, longPagePtr=%d ",
- tmpOpPtr.p->prevSerialQue, tmpOpPtr.p->scanRecPtr,
- tmpOpPtr.p->longPagePtr);
- infoEvent("transactionstate=%d, elementIsDisappeared=%d, insertIsDone=%d ",
- tmpOpPtr.p->transactionstate, tmpOpPtr.p->elementIsDisappeared,
- tmpOpPtr.p->insertIsDone);
- infoEvent("lockMode=%d, lockOwner=%d, nodeType=%d ",
- tmpOpPtr.p->lockMode, tmpOpPtr.p->lockOwner,
- tmpOpPtr.p->nodeType);
- infoEvent("operation=%d, opSimple=%d, dirtyRead=%d,scanBits=%d ",
- tmpOpPtr.p->operation, tmpOpPtr.p->opSimple,
- tmpOpPtr.p->dirtyRead, tmpOpPtr.p->scanBits);
+ infoEvent("prevLockOwnerOp=%d, prevParallelQue=%d",
+ tmpOpPtr.p->prevLockOwnerOp, tmpOpPtr.p->nextParallelQue);
+ infoEvent("prevSerialQue=%d, scanRecPtr=%d",
+ tmpOpPtr.p->prevSerialQue, tmpOpPtr.p->scanRecPtr);
+ infoEvent("m_op_bits=0x%x, scanBits=%d ",
+ tmpOpPtr.p->m_op_bits, tmpOpPtr.p->scanBits);
return;
}