summaryrefslogtreecommitdiff
path: root/ndb/src/kernel/blocks
diff options
context:
space:
mode:
authorunknown <joreland@mysql.com>2005-04-12 17:54:34 +0200
committerunknown <joreland@mysql.com>2005-04-12 17:54:34 +0200
commit59acb039fbe99a1597fa67bbf95f00bd99c0ffb1 (patch)
treedc5a6eeca5c5e21e46f4c14c86d20ea7ec0720b1 /ndb/src/kernel/blocks
parent3fcbdd761c99a83fd48330d4e141fc3d59d53ab2 (diff)
downloadmariadb-git-59acb039fbe99a1597fa67bbf95f00bd99c0ffb1.tar.gz
bug#9749 - ndb lock upgrade
handle more cases... ndb/src/kernel/blocks/dbacc/Dbacc.hpp: bug#9749 - ndb lock upgrade ndb/src/kernel/blocks/dbacc/DbaccMain.cpp: bug#9749 - ndb lock upgrade ndb/test/ndbapi/testOperations.cpp: bug#9749 - ndb lock upgrade
Diffstat (limited to 'ndb/src/kernel/blocks')
-rw-r--r--ndb/src/kernel/blocks/dbacc/Dbacc.hpp2
-rw-r--r--ndb/src/kernel/blocks/dbacc/DbaccMain.cpp143
2 files changed, 144 insertions, 1 deletions
diff --git a/ndb/src/kernel/blocks/dbacc/Dbacc.hpp b/ndb/src/kernel/blocks/dbacc/Dbacc.hpp
index 64b947b5462..aaa4aca7b00 100644
--- a/ndb/src/kernel/blocks/dbacc/Dbacc.hpp
+++ b/ndb/src/kernel/blocks/dbacc/Dbacc.hpp
@@ -1100,6 +1100,8 @@ private:
Uint32 executeNextOperation(Signal* signal);
void releaselock(Signal* signal);
void takeOutFragWaitQue(Signal* signal);
+ void check_lock_upgrade(Signal* signal, OperationrecPtr lock_owner,
+ OperationrecPtr release_op);
void allocOverflowPage(Signal* signal);
bool getrootfragmentrec(Signal* signal, RootfragmentrecPtr&, Uint32 fragId);
void insertLockOwnersList(Signal* signal, const OperationrecPtr&);
diff --git a/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
index 17c5a31cbed..28956de198c 100644
--- a/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
+++ b/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
@@ -5802,9 +5802,148 @@ void Dbacc::commitOperation(Signal* signal)
ptrCheckGuard(tolqTmpPtr, coprecsize, operationrec);
tolqTmpPtr.p->prevParallelQue = operationRecPtr.p->prevParallelQue;
}//if
- }//if
+
+ /**
+ * Check possible lock upgrade
+ * 1) Find lock owner
+ * 2) Count transactions in parallel que
+ * 3) If count == 1 and TRANSID(next serial) == TRANSID(lock owner)
+ * upgrade next serial
+ */
+ if(operationRecPtr.p->lockMode)
+ {
+ jam();
+ /**
+ * Committing a non shared operation can't lead to lock upgrade
+ */
+ return;
+ }
+
+ OperationrecPtr lock_owner;
+ lock_owner.i = operationRecPtr.p->prevParallelQue;
+ ptrCheckGuard(lock_owner, coprecsize, operationrec);
+ Uint32 transid[2] = { lock_owner.p->transId1,
+ lock_owner.p->transId2 };
+
+
+ while(lock_owner.p->prevParallelQue != RNIL)
+ {
+ lock_owner.i = lock_owner.p->prevParallelQue;
+ ptrCheckGuard(lock_owner, coprecsize, operationrec);
+
+ if(lock_owner.p->transId1 != transid[0] ||
+ lock_owner.p->transId2 != transid[1])
+ {
+ jam();
+ /**
+ * If more than 1 trans in lock queue -> no lock upgrade
+ */
+ return;
+ }
+ }
+
+ check_lock_upgrade(signal, lock_owner, operationRecPtr);
+ }
}//Dbacc::commitOperation()
+void
+Dbacc::check_lock_upgrade(Signal* signal,
+ OperationrecPtr lock_owner,
+ OperationrecPtr release_op)
+{
+ if((lock_owner.p->transId1 == release_op.p->transId1 &&
+ lock_owner.p->transId2 == release_op.p->transId2) ||
+ release_op.p->lockMode ||
+ lock_owner.p->nextSerialQue == RNIL)
+ {
+ jam();
+ /**
+ * No lock upgrade if same trans or lock owner has no serial queue
+ * or releasing non shared op
+ */
+ return;
+ }
+
+ OperationrecPtr next;
+ next.i = lock_owner.p->nextSerialQue;
+ ptrCheckGuard(next, coprecsize, operationrec);
+
+ if(lock_owner.p->transId1 != next.p->transId1 ||
+ lock_owner.p->transId2 != next.p->transId2)
+ {
+ jam();
+ /**
+ * No lock upgrad if !same trans in serial queue
+ */
+ return;
+ }
+
+ tgnptMainOpPtr = lock_owner;
+ getNoParallelTransaction(signal);
+ if (tgnptNrTransaction > 1)
+ {
+ jam();
+ /**
+ * No lock upgrade if more than 1 transaction in parallell queue
+ */
+ return;
+ }
+
+ OperationrecPtr tmp;
+ tmp.i = lock_owner.p->nextSerialQue = next.p->nextSerialQue;
+ if(tmp.i != RNIL)
+ {
+ ptrCheckGuard(tmp, coprecsize, operationrec);
+ ndbassert(tmp.p->prevSerialQue == next.i);
+ tmp.p->prevSerialQue = lock_owner.i;
+ }
+ next.p->nextSerialQue = next.p->prevSerialQue = RNIL;
+
+ // Find end of parallell que
+ tmp = lock_owner;
+ tmp.p->lockMode= 1;
+ while(tmp.p->nextParallelQue != RNIL)
+ {
+ jam();
+ tmp.i = tmp.p->nextParallelQue;
+ ptrCheckGuard(tmp, coprecsize, operationrec);
+ tmp.p->lockMode= 1;
+ }
+
+ next.p->prevParallelQue = tmp.i;
+ tmp.p->nextParallelQue = next.i;
+
+ OperationrecPtr save = operationRecPtr;
+
+ Uint32 TelementIsDisappeared = 0; // lock upgrade = all reads
+ Uint32 ThashValue = lock_owner.p->hashValue;
+ Uint32 localdata[2];
+ localdata[0] = lock_owner.p->localdata[0];
+ localdata[1] = lock_owner.p->localdata[1];
+ do {
+ next.p->elementIsDisappeared = TelementIsDisappeared;
+ next.p->hashValue = ThashValue;
+ next.p->localdata[0] = localdata[0];
+ next.p->localdata[1] = localdata[1];
+
+ operationRecPtr = next;
+ ndbassert(next.p->lockMode);
+ TelementIsDisappeared = executeNextOperation(signal);
+ if (next.p->nextParallelQue != RNIL)
+ {
+ jam();
+ next.i = next.p->nextParallelQue;
+ ptrCheckGuard(next, coprecsize, operationrec);
+ } else {
+ jam();
+ break;
+ }//if
+ } while (1);
+
+ operationRecPtr = save;
+
+}
+
/* ------------------------------------------------------------------------- */
/* RELEASELOCK */
/* RESETS LOCK OF AN ELEMENT. */
@@ -5841,6 +5980,8 @@ void Dbacc::releaselock(Signal* signal)
ptrCheckGuard(trlTmpOperPtr, coprecsize, operationrec);
trlTmpOperPtr.p->prevSerialQue = trlOperPtr.i;
}//if
+
+ check_lock_upgrade(signal, copyInOperPtr, operationRecPtr);
/* --------------------------------------------------------------------------------- */
/* SINCE THERE ARE STILL ITEMS IN THE PARALLEL QUEUE WE NEED NOT WORRY ABOUT */
/* STARTING QUEUED OPERATIONS. THUS WE CAN END HERE. */