summaryrefslogtreecommitdiff
path: root/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp')
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp115
1 files changed, 69 insertions, 46 deletions
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
index 8e01c1abccd..bbc856af4c4 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
@@ -312,7 +312,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success);
- removeAccLockOp(scan, req->accOperationPtr);
+ removeAccLockOp(scanPtr, req->accOperationPtr);
}
if (req->scanFlag == NextScanReq::ZSCAN_COMMIT) {
jam();
@@ -466,7 +466,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
scan.m_state = ScanOp::Locked;
scan.m_accLockOp = lockReq->accOpPtr;
#ifdef VM_TRACE
- if (debugFlags & DebugScan) {
+ if (debugFlags & (DebugScan | DebugLock)) {
debugOut << "Lock immediate scan " << scanPtr.i << " " << scan << endl;
}
#endif
@@ -478,7 +478,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
scan.m_lockwait = true;
scan.m_accLockOp = lockReq->accOpPtr;
#ifdef VM_TRACE
- if (debugFlags & DebugScan) {
+ if (debugFlags & (DebugScan | DebugLock)) {
debugOut << "Lock wait scan " << scanPtr.i << " " << scan << endl;
}
#endif
@@ -534,7 +534,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
if (accLockOp != RNIL) {
scan.m_accLockOp = RNIL;
// remember it until LQH unlocks it
- addAccLockOp(scan, accLockOp);
+ addAccLockOp(scanPtr, accLockOp);
} else {
ndbrequire(scan.m_readCommitted);
// operation RNIL in LQH would signal no tuple returned
@@ -589,7 +589,7 @@ Dbtux::execACCKEYCONF(Signal* signal)
c_scanOpPool.getPtr(scanPtr);
ScanOp& scan = *scanPtr.p;
#ifdef VM_TRACE
- if (debugFlags & DebugScan) {
+ if (debugFlags & (DebugScan | DebugLock)) {
debugOut << "Lock obtained scan " << scanPtr.i << " " << scan << endl;
}
#endif
@@ -634,7 +634,7 @@ Dbtux::execACCKEYREF(Signal* signal)
c_scanOpPool.getPtr(scanPtr);
ScanOp& scan = *scanPtr.p;
#ifdef VM_TRACE
- if (debugFlags & DebugScan) {
+ if (debugFlags & (DebugScan | DebugLock)) {
debugOut << "Lock refused scan " << scanPtr.i << " " << scan << endl;
}
#endif
@@ -678,7 +678,7 @@ Dbtux::execACC_ABORTCONF(Signal* signal)
c_scanOpPool.getPtr(scanPtr);
ScanOp& scan = *scanPtr.p;
#ifdef VM_TRACE
- if (debugFlags & DebugScan) {
+ if (debugFlags & (DebugScan | DebugLock)) {
debugOut << "ACC_ABORTCONF scan " << scanPtr.i << " " << scan << endl;
}
#endif
@@ -1014,18 +1014,9 @@ Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr)
ScanOp& scan = *scanPtr.p;
ndbrequire(! scan.m_lockwait && scan.m_accLockOp == RNIL);
// unlock all not unlocked by LQH
- for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) {
- if (scan.m_accLockOps[i] != RNIL) {
- jam();
- AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
- lockReq->returnCode = RNIL;
- lockReq->requestInfo = AccLockReq::Abort;
- lockReq->accOpPtr = scan.m_accLockOps[i];
- EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
- jamEntry();
- ndbrequire(lockReq->returnCode == AccLockReq::Success);
- scan.m_accLockOps[i] = RNIL;
- }
+ if (! scan.m_accLockOps.isEmpty()) {
+ jam();
+ abortAccLockOps(signal, scanPtr);
}
// send conf
NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
@@ -1039,44 +1030,76 @@ Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr)
}
void
-Dbtux::addAccLockOp(ScanOp& scan, Uint32 accLockOp)
+Dbtux::abortAccLockOps(Signal* signal, ScanOpPtr scanPtr)
{
- ndbrequire(accLockOp != RNIL);
- Uint32* list = scan.m_accLockOps;
- bool ok = false;
- for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) {
- ndbrequire(list[i] != accLockOp);
- if (! ok && list[i] == RNIL) {
- list[i] = accLockOp;
- ok = true;
- // continue check for duplicates
- }
+ ScanOp& scan = *scanPtr.p;
+#ifdef VM_TRACE
+ if (debugFlags & (DebugScan | DebugLock)) {
+ debugOut << "Abort locks in scan " << scanPtr.i << " " << scan << endl;
}
- if (! ok) {
- unsigned i = scan.m_maxAccLockOps;
- if (i < MaxAccLockOps) {
- list[i] = accLockOp;
- ok = true;
- scan.m_maxAccLockOps = i + 1;
- }
+#endif
+ LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
+ ScanLockPtr lockPtr;
+ while (list.first(lockPtr)) {
+ jam();
+ AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
+ lockReq->returnCode = RNIL;
+ lockReq->requestInfo = AccLockReq::Abort;
+ lockReq->accOpPtr = lockPtr.p->m_accLockOp;
+ EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
+ jamEntry();
+ ndbrequire(lockReq->returnCode == AccLockReq::Success);
+ list.release(lockPtr);
}
- ndbrequire(ok);
}
void
-Dbtux::removeAccLockOp(ScanOp& scan, Uint32 accLockOp)
+Dbtux::addAccLockOp(ScanOpPtr scanPtr, Uint32 accLockOp)
{
+ ScanOp& scan = *scanPtr.p;
+#ifdef VM_TRACE
+ if (debugFlags & (DebugScan | DebugLock)) {
+ debugOut << "Add lock " << hex << accLockOp << dec
+ << " to scan " << scanPtr.i << " " << scan << endl;
+ }
+#endif
+ LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
+ ScanLockPtr lockPtr;
+#ifdef VM_TRACE
+ list.first(lockPtr);
+ while (lockPtr.i != RNIL) {
+ ndbrequire(lockPtr.p->m_accLockOp != accLockOp);
+ list.next(lockPtr);
+ }
+#endif
+ bool ok = list.seize(lockPtr);
+ ndbrequire(ok);
ndbrequire(accLockOp != RNIL);
- Uint32* list = scan.m_accLockOps;
- bool ok = false;
- for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) {
- if (list[i] == accLockOp) {
- list[i] = RNIL;
- ok = true;
+ lockPtr.p->m_accLockOp = accLockOp;
+}
+
+void
+Dbtux::removeAccLockOp(ScanOpPtr scanPtr, Uint32 accLockOp)
+{
+ ScanOp& scan = *scanPtr.p;
+#ifdef VM_TRACE
+ if (debugFlags & (DebugScan | DebugLock)) {
+ debugOut << "Remove lock " << hex << accLockOp << dec
+ << " from scan " << scanPtr.i << " " << scan << endl;
+ }
+#endif
+ LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
+ ScanLockPtr lockPtr;
+ list.first(lockPtr);
+ while (lockPtr.i != RNIL) {
+ if (lockPtr.p->m_accLockOp == accLockOp) {
+ jam();
break;
}
+ list.next(lockPtr);
}
- ndbrequire(ok);
+ ndbrequire(lockPtr.i != RNIL);
+ list.release(lockPtr);
}
/*