diff options
author | unknown <jonas@perch.ndb.mysql.com> | 2006-05-22 17:11:05 +0200 |
---|---|---|
committer | unknown <jonas@perch.ndb.mysql.com> | 2006-05-22 17:11:05 +0200 |
commit | 3283735a2a5d3a32abc7e2aca730c81dd4e710be (patch) | |
tree | 94926fe6859105f8304905c09f7ce41e86ebda66 /storage | |
parent | 27a213f3e0e0ef770a7fffcfc4df53e90cd96760 (diff) | |
download | mariadb-git-3283735a2a5d3a32abc7e2aca730c81dd4e710be.tar.gz |
ndb - dbacc rewamp
fix so that getElement read localkey from lockowner instead of from page
plus some cleanups
storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp:
1) Use OP_INITIAL instead of ~0
2) Use JBB instead of JBA (once that I temporary changed...)
3) Add more validation to validate_lock_queue (insert/delete)
4) make getElement read localkey from lockowner instead of from page
Diffstat (limited to 'storage')
-rw-r--r-- | storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp | 58 |
1 files changed, 48 insertions, 10 deletions
diff --git a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp index 62a786d9f0e..4ee8a91f611 100644 --- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp +++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp @@ -448,7 +448,7 @@ void Dbacc::initialiseOperationRec(Signal* signal) for (operationRecPtr.i = 0; operationRecPtr.i < coprecsize; operationRecPtr.i++) { refresh_watch_dog(); ptrAss(operationRecPtr, operationrec); - operationRecPtr.p->m_op_bits = ~0; + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; operationRecPtr.p->nextOp = operationRecPtr.i + 1; }//for operationRecPtr.i = coprecsize - 1; @@ -910,7 +910,7 @@ void Dbacc::execACCSEIZEREQ(Signal* signal) ptrGuard(operationRecPtr); operationRecPtr.p->userptr = tuserptr; operationRecPtr.p->userblockref = tuserblockref; - operationRecPtr.p->m_op_bits = ~0; + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; /* ******************************< */ /* ACCSEIZECONF */ /* ******************************< */ @@ -1413,7 +1413,7 @@ conf: sendAcckeyconf(signal); sendSignal(nextOp.p->userblockref, GSN_ACCKEYCONF, - signal, 6, JBA); + signal, 6, JBB); } operationRecPtr = save; @@ -1688,6 +1688,21 @@ Dbacc::validate_lock_queue(OperationrecPtr opPtr) } } + bool exists = true; + switch (loPtr.p->m_op_bits & Operationrec::OP_MASK){ + case ZREAD: + case ZINSERT: + case ZUPDATE: + case ZSCAN_OP: + exists = true; + break; + case ZDELETE: + exists = false; + break; + case ZWRITE: + vlqrequire(false); + } + // Validate parallel queue { bool many = false; @@ -1739,6 +1754,26 @@ Dbacc::validate_lock_queue(OperationrecPtr opPtr) { vlqrequire(orlockmode == 0); } + + if (opstate == Operationrec::OP_STATE_RUNNING || + opstate == Operationrec::OP_STATE_EXECUTED) + { + switch (lastP.p->m_op_bits & Operationrec::OP_MASK){ + case ZREAD: + case ZUPDATE: + case ZSCAN_OP: + vlqrequire(exists); + break; + case ZDELETE: + vlqrequire(exists); + exists = false; + break; + case ZINSERT: + vlqrequire(!exists); + exists = true; + break; + } + } } if (lastP.i != loPtr.i) @@ -2227,7 +2262,7 @@ void Dbacc::placeSerialQueue(OperationrecPtr lockOwnerPtr, /* ------------------------------------------------------------------------- */ void Dbacc::acckeyref1Lab(Signal* signal, Uint32 result_code) { - operationRecPtr.p->m_op_bits = ~0; + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; /* ************************<< */ /* ACCKEYREF */ /* ************************<< */ @@ -2446,7 +2481,7 @@ void Dbacc::execACC_LOCKREQ(Signal* signal) // init as in ACCSEIZEREQ operationRecPtr.p->userptr = req->userPtr; operationRecPtr.p->userblockref = req->userRef; - operationRecPtr.p->m_op_bits = ~0; + operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL; operationRecPtr.p->scanRecPtr = RNIL; // do read with lock via ACCKEYREQ Uint32 lockMode = (lockOp == AccLockReq::LockShared) ? 0 : 1; @@ -3312,6 +3347,7 @@ Dbacc::getElement(Signal* signal, OperationrecPtr& lockOwnerPtr) tgeElementHeader = gePageptr.p->word32[tgeElementptr]; tgeRemLen = tgeRemLen - TelemLen; Uint32 hashValuePart; + Uint32 localkey1, localkey2; lockOwnerPtr.i = RNIL; lockOwnerPtr.p = NULL; if (ElementHeader::getLocked(tgeElementHeader)) { @@ -3319,14 +3355,16 @@ Dbacc::getElement(Signal* signal, OperationrecPtr& lockOwnerPtr) lockOwnerPtr.i = ElementHeader::getOpPtrI(tgeElementHeader); ptrCheckGuard(lockOwnerPtr, coprecsize, operationrec); hashValuePart = lockOwnerPtr.p->hashvaluePart; + localkey1 = lockOwnerPtr.p->localdata[0]; + localkey2 = lockOwnerPtr.p->localdata[1]; } else { jam(); hashValuePart = ElementHeader::getHashValuePart(tgeElementHeader); + localkey1 = gePageptr.p->word32[tgeElementptr + tgeForward]; + localkey2 = 0; } if (hashValuePart == opHashValuePart) { jam(); - Uint32 localkey1 = gePageptr.p->word32[tgeElementptr + tgeForward]; - Uint32 localkey2 = 0; bool found; if (! searchLocalKey) { @@ -4644,7 +4682,7 @@ Dbacc::release_lockowner(Signal* signal, OperationrecPtr opPtr, bool commit) { jam(); report_dealloc(signal, opPtr.p); - newOwner.p->localdata[0] = ~0; + newOwner.p->localdata[0] = ~(Uint32)0; } else { @@ -4692,7 +4730,7 @@ Dbacc::release_lockowner(Signal* signal, OperationrecPtr opPtr, bool commit) if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED) { report_dealloc(signal, opPtr.p); - newOwner.p->localdata[0] = ~0; + newOwner.p->localdata[0] = ~(Uint32)0; } else { @@ -4840,7 +4878,7 @@ conf: sendAcckeyconf(signal); sendSignal(newOwner.p->userblockref, GSN_ACCKEYCONF, - signal, 6, JBA); + signal, 6, JBB); operationRecPtr = save; return; |