diff options
Diffstat (limited to 'ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp')
-rw-r--r-- | ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp | 115 |
1 files changed, 69 insertions, 46 deletions
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp index 8e01c1abccd..bbc856af4c4 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp @@ -312,7 +312,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal) EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); jamEntry(); ndbrequire(lockReq->returnCode == AccLockReq::Success); - removeAccLockOp(scan, req->accOperationPtr); + removeAccLockOp(scanPtr, req->accOperationPtr); } if (req->scanFlag == NextScanReq::ZSCAN_COMMIT) { jam(); @@ -466,7 +466,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) scan.m_state = ScanOp::Locked; scan.m_accLockOp = lockReq->accOpPtr; #ifdef VM_TRACE - if (debugFlags & DebugScan) { + if (debugFlags & (DebugScan | DebugLock)) { debugOut << "Lock immediate scan " << scanPtr.i << " " << scan << endl; } #endif @@ -478,7 +478,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) scan.m_lockwait = true; scan.m_accLockOp = lockReq->accOpPtr; #ifdef VM_TRACE - if (debugFlags & DebugScan) { + if (debugFlags & (DebugScan | DebugLock)) { debugOut << "Lock wait scan " << scanPtr.i << " " << scan << endl; } #endif @@ -534,7 +534,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) if (accLockOp != RNIL) { scan.m_accLockOp = RNIL; // remember it until LQH unlocks it - addAccLockOp(scan, accLockOp); + addAccLockOp(scanPtr, accLockOp); } else { ndbrequire(scan.m_readCommitted); // operation RNIL in LQH would signal no tuple returned @@ -589,7 +589,7 @@ Dbtux::execACCKEYCONF(Signal* signal) c_scanOpPool.getPtr(scanPtr); ScanOp& scan = *scanPtr.p; #ifdef VM_TRACE - if (debugFlags & DebugScan) { + if (debugFlags & (DebugScan | DebugLock)) { debugOut << "Lock obtained scan " << scanPtr.i << " " << scan << endl; } #endif @@ -634,7 +634,7 @@ Dbtux::execACCKEYREF(Signal* signal) c_scanOpPool.getPtr(scanPtr); ScanOp& scan = *scanPtr.p; #ifdef VM_TRACE - if (debugFlags & DebugScan) { + if (debugFlags & (DebugScan | DebugLock)) { debugOut << "Lock refused scan " << scanPtr.i << " " << scan << endl; } #endif @@ -678,7 +678,7 @@ Dbtux::execACC_ABORTCONF(Signal* signal) c_scanOpPool.getPtr(scanPtr); ScanOp& scan = *scanPtr.p; #ifdef VM_TRACE - if (debugFlags & DebugScan) { + if (debugFlags & (DebugScan | DebugLock)) { debugOut << "ACC_ABORTCONF scan " << scanPtr.i << " " << scan << endl; } #endif @@ -1014,18 +1014,9 @@ Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr) ScanOp& scan = *scanPtr.p; ndbrequire(! scan.m_lockwait && scan.m_accLockOp == RNIL); // unlock all not unlocked by LQH - for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) { - if (scan.m_accLockOps[i] != RNIL) { - jam(); - AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); - lockReq->returnCode = RNIL; - lockReq->requestInfo = AccLockReq::Abort; - lockReq->accOpPtr = scan.m_accLockOps[i]; - EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); - jamEntry(); - ndbrequire(lockReq->returnCode == AccLockReq::Success); - scan.m_accLockOps[i] = RNIL; - } + if (! scan.m_accLockOps.isEmpty()) { + jam(); + abortAccLockOps(signal, scanPtr); } // send conf NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend(); @@ -1039,44 +1030,76 @@ Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr) } void -Dbtux::addAccLockOp(ScanOp& scan, Uint32 accLockOp) +Dbtux::abortAccLockOps(Signal* signal, ScanOpPtr scanPtr) { - ndbrequire(accLockOp != RNIL); - Uint32* list = scan.m_accLockOps; - bool ok = false; - for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) { - ndbrequire(list[i] != accLockOp); - if (! ok && list[i] == RNIL) { - list[i] = accLockOp; - ok = true; - // continue check for duplicates - } + ScanOp& scan = *scanPtr.p; +#ifdef VM_TRACE + if (debugFlags & (DebugScan | DebugLock)) { + debugOut << "Abort locks in scan " << scanPtr.i << " " << scan << endl; } - if (! ok) { - unsigned i = scan.m_maxAccLockOps; - if (i < MaxAccLockOps) { - list[i] = accLockOp; - ok = true; - scan.m_maxAccLockOps = i + 1; - } +#endif + LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps); + ScanLockPtr lockPtr; + while (list.first(lockPtr)) { + jam(); + AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); + lockReq->returnCode = RNIL; + lockReq->requestInfo = AccLockReq::Abort; + lockReq->accOpPtr = lockPtr.p->m_accLockOp; + EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); + jamEntry(); + ndbrequire(lockReq->returnCode == AccLockReq::Success); + list.release(lockPtr); } - ndbrequire(ok); } void -Dbtux::removeAccLockOp(ScanOp& scan, Uint32 accLockOp) +Dbtux::addAccLockOp(ScanOpPtr scanPtr, Uint32 accLockOp) { + ScanOp& scan = *scanPtr.p; +#ifdef VM_TRACE + if (debugFlags & (DebugScan | DebugLock)) { + debugOut << "Add lock " << hex << accLockOp << dec + << " to scan " << scanPtr.i << " " << scan << endl; + } +#endif + LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps); + ScanLockPtr lockPtr; +#ifdef VM_TRACE + list.first(lockPtr); + while (lockPtr.i != RNIL) { + ndbrequire(lockPtr.p->m_accLockOp != accLockOp); + list.next(lockPtr); + } +#endif + bool ok = list.seize(lockPtr); + ndbrequire(ok); ndbrequire(accLockOp != RNIL); - Uint32* list = scan.m_accLockOps; - bool ok = false; - for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) { - if (list[i] == accLockOp) { - list[i] = RNIL; - ok = true; + lockPtr.p->m_accLockOp = accLockOp; +} + +void +Dbtux::removeAccLockOp(ScanOpPtr scanPtr, Uint32 accLockOp) +{ + ScanOp& scan = *scanPtr.p; +#ifdef VM_TRACE + if (debugFlags & (DebugScan | DebugLock)) { + debugOut << "Remove lock " << hex << accLockOp << dec + << " from scan " << scanPtr.i << " " << scan << endl; + } +#endif + LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps); + ScanLockPtr lockPtr; + list.first(lockPtr); + while (lockPtr.i != RNIL) { + if (lockPtr.p->m_accLockOp == accLockOp) { + jam(); break; } + list.next(lockPtr); } - ndbrequire(ok); + ndbrequire(lockPtr.i != RNIL); + list.release(lockPtr); } /* |