summaryrefslogtreecommitdiff
path: root/storage
diff options
context:
space:
mode:
authorunknown <jonas@perch.ndb.mysql.com>2006-05-22 17:11:05 +0200
committerunknown <jonas@perch.ndb.mysql.com>2006-05-22 17:11:05 +0200
commit3283735a2a5d3a32abc7e2aca730c81dd4e710be (patch)
tree94926fe6859105f8304905c09f7ce41e86ebda66 /storage
parent27a213f3e0e0ef770a7fffcfc4df53e90cd96760 (diff)
downloadmariadb-git-3283735a2a5d3a32abc7e2aca730c81dd4e710be.tar.gz
ndb - dbacc rewamp
fix so that getElement read localkey from lockowner instead of from page plus some cleanups storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp: 1) Use OP_INITIAL instead of ~0 2) Use JBB instead of JBA (once that I temporary changed...) 3) Add more validation to validate_lock_queue (insert/delete) 4) make getElement read localkey from lockowner instead of from page
Diffstat (limited to 'storage')
-rw-r--r--storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp58
1 files changed, 48 insertions, 10 deletions
diff --git a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
index 62a786d9f0e..4ee8a91f611 100644
--- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
+++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
@@ -448,7 +448,7 @@ void Dbacc::initialiseOperationRec(Signal* signal)
for (operationRecPtr.i = 0; operationRecPtr.i < coprecsize; operationRecPtr.i++) {
refresh_watch_dog();
ptrAss(operationRecPtr, operationrec);
- operationRecPtr.p->m_op_bits = ~0;
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
operationRecPtr.p->nextOp = operationRecPtr.i + 1;
}//for
operationRecPtr.i = coprecsize - 1;
@@ -910,7 +910,7 @@ void Dbacc::execACCSEIZEREQ(Signal* signal)
ptrGuard(operationRecPtr);
operationRecPtr.p->userptr = tuserptr;
operationRecPtr.p->userblockref = tuserblockref;
- operationRecPtr.p->m_op_bits = ~0;
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
/* ******************************< */
/* ACCSEIZECONF */
/* ******************************< */
@@ -1413,7 +1413,7 @@ conf:
sendAcckeyconf(signal);
sendSignal(nextOp.p->userblockref, GSN_ACCKEYCONF,
- signal, 6, JBA);
+ signal, 6, JBB);
}
operationRecPtr = save;
@@ -1688,6 +1688,21 @@ Dbacc::validate_lock_queue(OperationrecPtr opPtr)
}
}
+ bool exists = true;
+ switch (loPtr.p->m_op_bits & Operationrec::OP_MASK){
+ case ZREAD:
+ case ZINSERT:
+ case ZUPDATE:
+ case ZSCAN_OP:
+ exists = true;
+ break;
+ case ZDELETE:
+ exists = false;
+ break;
+ case ZWRITE:
+ vlqrequire(false);
+ }
+
// Validate parallel queue
{
bool many = false;
@@ -1739,6 +1754,26 @@ Dbacc::validate_lock_queue(OperationrecPtr opPtr)
{
vlqrequire(orlockmode == 0);
}
+
+ if (opstate == Operationrec::OP_STATE_RUNNING ||
+ opstate == Operationrec::OP_STATE_EXECUTED)
+ {
+ switch (lastP.p->m_op_bits & Operationrec::OP_MASK){
+ case ZREAD:
+ case ZUPDATE:
+ case ZSCAN_OP:
+ vlqrequire(exists);
+ break;
+ case ZDELETE:
+ vlqrequire(exists);
+ exists = false;
+ break;
+ case ZINSERT:
+ vlqrequire(!exists);
+ exists = true;
+ break;
+ }
+ }
}
if (lastP.i != loPtr.i)
@@ -2227,7 +2262,7 @@ void Dbacc::placeSerialQueue(OperationrecPtr lockOwnerPtr,
/* ------------------------------------------------------------------------- */
void Dbacc::acckeyref1Lab(Signal* signal, Uint32 result_code)
{
- operationRecPtr.p->m_op_bits = ~0;
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
/* ************************<< */
/* ACCKEYREF */
/* ************************<< */
@@ -2446,7 +2481,7 @@ void Dbacc::execACC_LOCKREQ(Signal* signal)
// init as in ACCSEIZEREQ
operationRecPtr.p->userptr = req->userPtr;
operationRecPtr.p->userblockref = req->userRef;
- operationRecPtr.p->m_op_bits = ~0;
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
operationRecPtr.p->scanRecPtr = RNIL;
// do read with lock via ACCKEYREQ
Uint32 lockMode = (lockOp == AccLockReq::LockShared) ? 0 : 1;
@@ -3312,6 +3347,7 @@ Dbacc::getElement(Signal* signal, OperationrecPtr& lockOwnerPtr)
tgeElementHeader = gePageptr.p->word32[tgeElementptr];
tgeRemLen = tgeRemLen - TelemLen;
Uint32 hashValuePart;
+ Uint32 localkey1, localkey2;
lockOwnerPtr.i = RNIL;
lockOwnerPtr.p = NULL;
if (ElementHeader::getLocked(tgeElementHeader)) {
@@ -3319,14 +3355,16 @@ Dbacc::getElement(Signal* signal, OperationrecPtr& lockOwnerPtr)
lockOwnerPtr.i = ElementHeader::getOpPtrI(tgeElementHeader);
ptrCheckGuard(lockOwnerPtr, coprecsize, operationrec);
hashValuePart = lockOwnerPtr.p->hashvaluePart;
+ localkey1 = lockOwnerPtr.p->localdata[0];
+ localkey2 = lockOwnerPtr.p->localdata[1];
} else {
jam();
hashValuePart = ElementHeader::getHashValuePart(tgeElementHeader);
+ localkey1 = gePageptr.p->word32[tgeElementptr + tgeForward];
+ localkey2 = 0;
}
if (hashValuePart == opHashValuePart) {
jam();
- Uint32 localkey1 = gePageptr.p->word32[tgeElementptr + tgeForward];
- Uint32 localkey2 = 0;
bool found;
if (! searchLocalKey)
{
@@ -4644,7 +4682,7 @@ Dbacc::release_lockowner(Signal* signal, OperationrecPtr opPtr, bool commit)
{
jam();
report_dealloc(signal, opPtr.p);
- newOwner.p->localdata[0] = ~0;
+ newOwner.p->localdata[0] = ~(Uint32)0;
}
else
{
@@ -4692,7 +4730,7 @@ Dbacc::release_lockowner(Signal* signal, OperationrecPtr opPtr, bool commit)
if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED)
{
report_dealloc(signal, opPtr.p);
- newOwner.p->localdata[0] = ~0;
+ newOwner.p->localdata[0] = ~(Uint32)0;
}
else
{
@@ -4840,7 +4878,7 @@ conf:
sendAcckeyconf(signal);
sendSignal(newOwner.p->userblockref, GSN_ACCKEYCONF,
- signal, 6, JBA);
+ signal, 6, JBB);
operationRecPtr = save;
return;