Diffstat (limited to 'storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp')
-rw-r--r--  storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp  1209
1 file changed, 0 insertions, 1209 deletions
diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
deleted file mode 100644
index 572be897a13..00000000000
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
+++ /dev/null
@@ -1,1209 +0,0 @@
-/* Copyright (c) 2003, 2005-2008 MySQL AB
- Use is subject to license terms
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; version 2 of the License.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
-
-#define DBTUP_C
-#define DBTUP_SCAN_CPP
-#include "Dbtup.hpp"
-#include <signaldata/AccScan.hpp>
-#include <signaldata/NextScan.hpp>
-#include <signaldata/AccLock.hpp>
-#include <md5_hash.hpp>
-
-#undef jam
-#undef jamEntry
-#define jam() { jamLine(32000 + __LINE__); }
-#define jamEntry() { jamEntryLine(32000 + __LINE__); }
-
-#ifdef VM_TRACE
-#define dbg(x) globalSignalLoggers.log x
-#else
-#define dbg(x)
-#endif
-
-void
-Dbtup::execACC_SCANREQ(Signal* signal)
-{
- jamEntry();
- const AccScanReq reqCopy = *(const AccScanReq*)signal->getDataPtr();
- const AccScanReq* const req = &reqCopy;
- ScanOpPtr scanPtr;
- scanPtr.i = RNIL;
- do {
- // find table and fragment
- TablerecPtr tablePtr;
- tablePtr.i = req->tableId;
- ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
- FragrecordPtr fragPtr;
- Uint32 fragId = req->fragmentNo;
- fragPtr.i = RNIL;
- getFragmentrec(fragPtr, fragId, tablePtr.p);
- ndbrequire(fragPtr.i != RNIL);
- Fragrecord& frag = *fragPtr.p;
- // flags
- Uint32 bits = 0;
-
-
- if (AccScanReq::getLcpScanFlag(req->requestInfo))
- {
- jam();
- bits |= ScanOp::SCAN_LCP;
- c_scanOpPool.getPtr(scanPtr, c_lcp_scan_op);
- }
- else
- {
- // seize from pool and link to per-fragment list
- LocalDLList<ScanOp> list(c_scanOpPool, frag.m_scanList);
- if (! list.seize(scanPtr)) {
- jam();
- break;
- }
- }
-
- if (!AccScanReq::getNoDiskScanFlag(req->requestInfo)
- && tablePtr.p->m_no_of_disk_attributes)
- {
- bits |= ScanOp::SCAN_DD;
- }
-
- bool mm = (bits & ScanOp::SCAN_DD);
- if (tablePtr.p->m_attributes[mm].m_no_of_varsize > 0) {
- bits |= ScanOp::SCAN_VS;
-
- // disk pages have fixed page format
- ndbrequire(! (bits & ScanOp::SCAN_DD));
- }
- if (! AccScanReq::getReadCommittedFlag(req->requestInfo)) {
- if (AccScanReq::getLockMode(req->requestInfo) == 0)
- bits |= ScanOp::SCAN_LOCK_SH;
- else
- bits |= ScanOp::SCAN_LOCK_EX;
- }
-
- if (AccScanReq::getNRScanFlag(req->requestInfo))
- {
- jam();
- bits |= ScanOp::SCAN_NR;
- scanPtr.p->m_endPage = req->maxPage;
- if (req->maxPage != RNIL && req->maxPage > frag.noOfPages)
- {
- ndbout_c("%u %u endPage: %u (noOfPages: %u)",
- tablePtr.i, fragId,
- req->maxPage, fragPtr.p->noOfPages);
- }
- }
- else
- {
- jam();
- scanPtr.p->m_endPage = RNIL;
- }
-
- if (AccScanReq::getLcpScanFlag(req->requestInfo))
- {
- jam();
- ndbrequire((bits & ScanOp::SCAN_DD) == 0);
- ndbrequire((bits & ScanOp::SCAN_LOCK) == 0);
- }
-
- // set up scan op
- new (scanPtr.p) ScanOp();
- ScanOp& scan = *scanPtr.p;
- scan.m_state = ScanOp::First;
- scan.m_bits = bits;
- scan.m_userPtr = req->senderData;
- scan.m_userRef = req->senderRef;
- scan.m_tableId = tablePtr.i;
- scan.m_fragId = frag.fragmentId;
- scan.m_fragPtrI = fragPtr.i;
- scan.m_transId1 = req->transId1;
- scan.m_transId2 = req->transId2;
- scan.m_savePointId = req->savePointId;
-
- // conf
- AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
- conf->scanPtr = req->senderData;
- conf->accPtr = scanPtr.i;
- conf->flag = AccScanConf::ZNOT_EMPTY_FRAGMENT;
- sendSignal(req->senderRef, GSN_ACC_SCANCONF,
- signal, AccScanConf::SignalLength, JBB);
- return;
- } while (0);
- if (scanPtr.i != RNIL) {
- jam();
- releaseScanOp(scanPtr);
- }
- // LQH does not handle REF
- signal->theData[0] = 0x313;
- sendSignal(req->senderRef, GSN_ACC_SCANREF, signal, 1, JBB);
-}
-
-void
-Dbtup::execNEXT_SCANREQ(Signal* signal)
-{
- jamEntry();
- const NextScanReq reqCopy = *(const NextScanReq*)signal->getDataPtr();
- const NextScanReq* const req = &reqCopy;
- ScanOpPtr scanPtr;
- c_scanOpPool.getPtr(scanPtr, req->accPtr);
- ScanOp& scan = *scanPtr.p;
- switch (req->scanFlag) {
- case NextScanReq::ZSCAN_NEXT:
- jam();
- break;
- case NextScanReq::ZSCAN_NEXT_COMMIT:
- jam();
- case NextScanReq::ZSCAN_COMMIT:
- jam();
- if ((scan.m_bits & ScanOp::SCAN_LOCK) != 0) {
- jam();
- AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
- lockReq->returnCode = RNIL;
- lockReq->requestInfo = AccLockReq::Unlock;
- lockReq->accOpPtr = req->accOperationPtr;
- EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
- signal, AccLockReq::UndoSignalLength);
- jamEntry();
- ndbrequire(lockReq->returnCode == AccLockReq::Success);
- removeAccLockOp(scan, req->accOperationPtr);
- }
- if (req->scanFlag == NextScanReq::ZSCAN_COMMIT) {
- NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
- conf->scanPtr = scan.m_userPtr;
- unsigned signalLength = 1;
- sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
- signal, signalLength, JBB);
- return;
- }
- break;
- case NextScanReq::ZSCAN_CLOSE:
- jam();
- if (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) {
- jam();
- ndbrequire(scan.m_accLockOp != RNIL);
- // use ACC_ABORTCONF to flush out any reply in job buffer
- AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
- lockReq->returnCode = RNIL;
- lockReq->requestInfo = AccLockReq::AbortWithConf;
- lockReq->accOpPtr = scan.m_accLockOp;
- EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
- signal, AccLockReq::UndoSignalLength);
- jamEntry();
- ndbrequire(lockReq->returnCode == AccLockReq::Success);
- scan.m_state = ScanOp::Aborting;
- return;
- }
- if (scan.m_state == ScanOp::Locked) {
- jam();
- ndbrequire(scan.m_accLockOp != RNIL);
- AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
- lockReq->returnCode = RNIL;
- lockReq->requestInfo = AccLockReq::Abort;
- lockReq->accOpPtr = scan.m_accLockOp;
- EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
- signal, AccLockReq::UndoSignalLength);
- jamEntry();
- ndbrequire(lockReq->returnCode == AccLockReq::Success);
- scan.m_accLockOp = RNIL;
- }
- scan.m_state = ScanOp::Aborting;
- scanClose(signal, scanPtr);
- return;
- case NextScanReq::ZSCAN_NEXT_ABORT:
- jam();
- default:
- jam();
- ndbrequire(false);
- break;
- }
- // start looking for next scan result
- AccCheckScan* checkReq = (AccCheckScan*)signal->getDataPtrSend();
- checkReq->accPtr = scanPtr.i;
- checkReq->checkLcpStop = AccCheckScan::ZNOT_CHECK_LCP_STOP;
- EXECUTE_DIRECT(DBTUP, GSN_ACC_CHECK_SCAN, signal, AccCheckScan::SignalLength);
- jamEntry();
-}
-
-void
-Dbtup::execACC_CHECK_SCAN(Signal* signal)
-{
- jamEntry();
- const AccCheckScan reqCopy = *(const AccCheckScan*)signal->getDataPtr();
- const AccCheckScan* const req = &reqCopy;
- ScanOpPtr scanPtr;
- c_scanOpPool.getPtr(scanPtr, req->accPtr);
- ScanOp& scan = *scanPtr.p;
- // fragment
- FragrecordPtr fragPtr;
- fragPtr.i = scan.m_fragPtrI;
- ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
- Fragrecord& frag = *fragPtr.p;
- if (req->checkLcpStop == AccCheckScan::ZCHECK_LCP_STOP) {
- jam();
- signal->theData[0] = scan.m_userPtr;
- signal->theData[1] = true;
- EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
- jamEntry();
- return;
- }
- if (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) {
- jam();
- // LQH asks if we are waiting for lock and we tell it to ask again
- NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
- conf->scanPtr = scan.m_userPtr;
- conf->accOperationPtr = RNIL; // no tuple returned
- conf->fragId = frag.fragmentId;
- unsigned signalLength = 3;
- // if TC has ordered scan close, it will be detected here
- sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
- signal, signalLength, JBB);
- return; // stop
- }
- if (scan.m_state == ScanOp::First) {
- jam();
- scanFirst(signal, scanPtr);
- }
- if (scan.m_state == ScanOp::Next) {
- jam();
- bool immediate = scanNext(signal, scanPtr);
- if (! immediate) {
- jam();
- // time-slicing via TUP or PGMAN
- return;
- }
- }
- scanReply(signal, scanPtr);
-}
-
-void
-Dbtup::scanReply(Signal* signal, ScanOpPtr scanPtr)
-{
- ScanOp& scan = *scanPtr.p;
- FragrecordPtr fragPtr;
- fragPtr.i = scan.m_fragPtrI;
- ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
- Fragrecord& frag = *fragPtr.p;
- // for reading tuple key in Current state
- Uint32* pkData = (Uint32*)c_dataBuffer;
- unsigned pkSize = 0;
- if (scan.m_state == ScanOp::Current) {
- // found an entry to return
- jam();
- ndbrequire(scan.m_accLockOp == RNIL);
- if (scan.m_bits & ScanOp::SCAN_LOCK) {
- jam();
- // read tuple key - use TUX routine
- const ScanPos& pos = scan.m_scanPos;
- const Local_key& key_mm = pos.m_key_mm;
- int ret = tuxReadPk(fragPtr.i, pos.m_realpid_mm, key_mm.m_page_idx,
- pkData, true);
- ndbrequire(ret > 0);
- pkSize = ret;
- dbg((DBTUP, "PK size=%d data=%08x", pkSize, pkData[0]));
- // get read lock or exclusive lock
- AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
- lockReq->returnCode = RNIL;
- lockReq->requestInfo = (scan.m_bits & ScanOp::SCAN_LOCK_SH) ?
- AccLockReq::LockShared : AccLockReq::LockExclusive;
- lockReq->accOpPtr = RNIL;
- lockReq->userPtr = scanPtr.i;
- lockReq->userRef = reference();
- lockReq->tableId = scan.m_tableId;
- lockReq->fragId = frag.fragmentId;
- lockReq->fragPtrI = RNIL; // no cached frag ptr yet
- lockReq->hashValue = md5_hash((Uint64*)pkData, pkSize);
- lockReq->tupAddr = key_mm.ref();
- lockReq->transId1 = scan.m_transId1;
- lockReq->transId2 = scan.m_transId2;
- EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
- signal, AccLockReq::LockSignalLength);
- jamEntry();
- switch (lockReq->returnCode) {
- case AccLockReq::Success:
- jam();
- scan.m_state = ScanOp::Locked;
- scan.m_accLockOp = lockReq->accOpPtr;
- break;
- case AccLockReq::IsBlocked:
- jam();
- // normal lock wait
- scan.m_state = ScanOp::Blocked;
- scan.m_bits |= ScanOp::SCAN_LOCK_WAIT;
- scan.m_accLockOp = lockReq->accOpPtr;
- // LQH will wake us up
- signal->theData[0] = scan.m_userPtr;
- signal->theData[1] = true;
- EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
- jamEntry();
- return;
- break;
- case AccLockReq::Refused:
- jam();
- // we cannot see deleted tuple (assert only)
- ndbassert(false);
- // skip it
- scan.m_state = ScanOp::Next;
- signal->theData[0] = scan.m_userPtr;
- signal->theData[1] = true;
- EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
- jamEntry();
- return;
- break;
- case AccLockReq::NoFreeOp:
- jam();
- // max ops should depend on max scans (assert only)
- ndbassert(false);
- // stay in Current state
- scan.m_state = ScanOp::Current;
- signal->theData[0] = scan.m_userPtr;
- signal->theData[1] = true;
- EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
- jamEntry();
- return;
- break;
- default:
- ndbrequire(false);
- break;
- }
- } else {
- scan.m_state = ScanOp::Locked;
- }
- }
-
- if (scan.m_state == ScanOp::Locked) {
- // we have lock or do not need one
- jam();
- // conf signal
- NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
- conf->scanPtr = scan.m_userPtr;
- // the lock is passed to LQH
- Uint32 accLockOp = scan.m_accLockOp;
- if (accLockOp != RNIL) {
- scan.m_accLockOp = RNIL;
- // remember it until LQH unlocks it
- addAccLockOp(scan, accLockOp);
- } else {
- ndbrequire(! (scan.m_bits & ScanOp::SCAN_LOCK));
- // operation RNIL in LQH would signal no tuple returned
- accLockOp = (Uint32)-1;
- }
- const ScanPos& pos = scan.m_scanPos;
- conf->accOperationPtr = accLockOp;
- conf->fragId = frag.fragmentId;
- conf->localKey[0] = pos.m_key_mm.ref();
- conf->localKey[1] = 0;
- conf->localKeyLength = 1;
- unsigned signalLength = 6;
- if (scan.m_bits & ScanOp::SCAN_LOCK) {
- sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
- signal, signalLength, JBB);
- } else {
- Uint32 blockNo = refToBlock(scan.m_userRef);
- EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, signalLength);
- jamEntry();
- }
- // next time look for next entry
- scan.m_state = ScanOp::Next;
- return;
- }
- if (scan.m_state == ScanOp::Last ||
- scan.m_state == ScanOp::Invalid) {
- jam();
- NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
- conf->scanPtr = scan.m_userPtr;
- conf->accOperationPtr = RNIL;
- conf->fragId = RNIL;
- unsigned signalLength = 3;
- sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
- signal, signalLength, JBB);
- return;
- }
- ndbrequire(false);
-}
-
-/*
- * Lock succeeded (after delay) in ACC. If the lock is for current
- * entry, set state to Locked. If the lock is for an entry we were
- * moved away from, simply unlock it. Finally, if we are closing the
- * scan, do nothing since we have already sent an abort request.
- */
-void
-Dbtup::execACCKEYCONF(Signal* signal)
-{
- jamEntry();
- ScanOpPtr scanPtr;
- scanPtr.i = signal->theData[0];
- c_scanOpPool.getPtr(scanPtr);
- ScanOp& scan = *scanPtr.p;
- ndbrequire(scan.m_bits & ScanOp::SCAN_LOCK_WAIT && scan.m_accLockOp != RNIL);
- scan.m_bits &= ~ ScanOp::SCAN_LOCK_WAIT;
- if (scan.m_state == ScanOp::Blocked) {
- // the lock wait was for current entry
- jam();
- scan.m_state = ScanOp::Locked;
- // LQH has the ball
- return;
- }
- if (scan.m_state != ScanOp::Aborting) {
- // we were moved, release lock
- jam();
- AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
- lockReq->returnCode = RNIL;
- lockReq->requestInfo = AccLockReq::Abort;
- lockReq->accOpPtr = scan.m_accLockOp;
- EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
- jamEntry();
- ndbrequire(lockReq->returnCode == AccLockReq::Success);
- scan.m_accLockOp = RNIL;
- // LQH has the ball
- return;
- }
- // lose the lock
- scan.m_accLockOp = RNIL;
- // continue at ACC_ABORTCONF
-}
-
-/*
- * Lock failed (after delay) in ACC. Probably means somebody ahead of
- * us in lock queue deleted the tuple.
- */
-void
-Dbtup::execACCKEYREF(Signal* signal)
-{
- jamEntry();
- ScanOpPtr scanPtr;
- scanPtr.i = signal->theData[0];
- c_scanOpPool.getPtr(scanPtr);
- ScanOp& scan = *scanPtr.p;
- ndbrequire(scan.m_bits & ScanOp::SCAN_LOCK_WAIT && scan.m_accLockOp != RNIL);
- scan.m_bits &= ~ ScanOp::SCAN_LOCK_WAIT;
- if (scan.m_state != ScanOp::Aborting) {
- jam();
- // release the operation
- AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
- lockReq->returnCode = RNIL;
- lockReq->requestInfo = AccLockReq::Abort;
- lockReq->accOpPtr = scan.m_accLockOp;
- EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
- jamEntry();
- ndbrequire(lockReq->returnCode == AccLockReq::Success);
- scan.m_accLockOp = RNIL;
- // scan position should already have been moved (assert only)
- if (scan.m_state == ScanOp::Blocked) {
- jam();
- //ndbassert(false);
- if (scan.m_bits & ScanOp::SCAN_NR)
- {
- jam();
- scan.m_state = ScanOp::Next;
- scan.m_scanPos.m_get = ScanPos::Get_tuple;
- ndbout_c("Ignoring scan.m_state == ScanOp::Blocked, refetch");
- }
- else
- {
- jam();
- scan.m_state = ScanOp::Next;
- ndbout_c("Ignoring scan.m_state == ScanOp::Blocked");
- }
- }
- // LQH has the ball
- return;
- }
- // lose the lock
- scan.m_accLockOp = RNIL;
- // continue at ACC_ABORTCONF
-}
-
-/*
- * Received when scan is closing. This signal arrives after any
- * ACCKEYCON or ACCKEYREF which may have been in job buffer.
- */
-void
-Dbtup::execACC_ABORTCONF(Signal* signal)
-{
- jamEntry();
- ScanOpPtr scanPtr;
- scanPtr.i = signal->theData[0];
- c_scanOpPool.getPtr(scanPtr);
- ScanOp& scan = *scanPtr.p;
- ndbrequire(scan.m_state == ScanOp::Aborting);
- // most likely we are still in lock wait
- if (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) {
- jam();
- scan.m_bits &= ~ ScanOp::SCAN_LOCK_WAIT;
- scan.m_accLockOp = RNIL;
- }
- scanClose(signal, scanPtr);
-}
-
-void
-Dbtup::scanFirst(Signal*, ScanOpPtr scanPtr)
-{
- ScanOp& scan = *scanPtr.p;
- ScanPos& pos = scan.m_scanPos;
- Local_key& key = pos.m_key;
- const Uint32 bits = scan.m_bits;
- // fragment
- FragrecordPtr fragPtr;
- fragPtr.i = scan.m_fragPtrI;
- ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
- Fragrecord& frag = *fragPtr.p;
- // in the future should not pre-allocate pages
- if (frag.noOfPages == 0 && ((bits & ScanOp::SCAN_NR) == 0)) {
- jam();
- scan.m_state = ScanOp::Last;
- return;
- }
- if (! (bits & ScanOp::SCAN_DD)) {
- key.m_file_no = ZNIL;
- key.m_page_no = 0;
- pos.m_get = ScanPos::Get_page_mm;
- // for MM scan real page id is cached for efficiency
- pos.m_realpid_mm = RNIL;
- } else {
- Disk_alloc_info& alloc = frag.m_disk_alloc_info;
- // for now must check disk part explicitly
- if (alloc.m_extent_list.firstItem == RNIL) {
- jam();
- scan.m_state = ScanOp::Last;
- return;
- }
- pos.m_extent_info_ptr_i = alloc.m_extent_list.firstItem;
- Extent_info* ext = c_extent_pool.getPtr(pos.m_extent_info_ptr_i);
- key.m_file_no = ext->m_key.m_file_no;
- key.m_page_no = ext->m_first_page_no;
- pos.m_get = ScanPos::Get_page_dd;
- }
- key.m_page_idx = 0;
- // let scanNext() do the work
- scan.m_state = ScanOp::Next;
-}
-
-bool
-Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
-{
- ScanOp& scan = *scanPtr.p;
- ScanPos& pos = scan.m_scanPos;
- Local_key& key = pos.m_key;
- const Uint32 bits = scan.m_bits;
- // table
- TablerecPtr tablePtr;
- tablePtr.i = scan.m_tableId;
- ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
- Tablerec& table = *tablePtr.p;
- // fragment
- FragrecordPtr fragPtr;
- fragPtr.i = scan.m_fragPtrI;
- ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
- Fragrecord& frag = *fragPtr.p;
- // tuple found
- Tuple_header* th = 0;
- Uint32 thbits = 0;
- Uint32 loop_count = 0;
- Uint32 scanGCI = scanPtr.p->m_scanGCI;
- Uint32 foundGCI;
-
- const bool mm = (bits & ScanOp::SCAN_DD);
- const bool lcp = (bits & ScanOp::SCAN_LCP);
-
- Uint32 lcp_list = fragPtr.p->m_lcp_keep_list;
- Uint32 size = table.m_offsets[mm].m_fix_header_size;
-
- if (lcp && lcp_list != RNIL)
- goto found_lcp_keep;
-
- switch(pos.m_get){
- case ScanPos::Get_next_tuple:
- case ScanPos::Get_next_tuple_fs:
- jam();
- key.m_page_idx += size;
- // fall through
- case ScanPos::Get_tuple:
- case ScanPos::Get_tuple_fs:
- jam();
- /**
- * We need to refetch page after timeslice
- */
- pos.m_get = ScanPos::Get_page;
- break;
- default:
- break;
- }
-
- while (true) {
- switch (pos.m_get) {
- case ScanPos::Get_next_page:
- // move to next page
- jam();
- {
- if (! (bits & ScanOp::SCAN_DD))
- pos.m_get = ScanPos::Get_next_page_mm;
- else
- pos.m_get = ScanPos::Get_next_page_dd;
- }
- continue;
- case ScanPos::Get_page:
- // get real page
- jam();
- {
- if (! (bits & ScanOp::SCAN_DD))
- pos.m_get = ScanPos::Get_page_mm;
- else
- pos.m_get = ScanPos::Get_page_dd;
- }
- continue;
- case ScanPos::Get_next_page_mm:
- // move to next logical TUP page
- jam();
- {
- key.m_page_no++;
- if (key.m_page_no >= frag.noOfPages) {
- jam();
-
- if ((bits & ScanOp::SCAN_NR) && (scan.m_endPage != RNIL))
- {
- jam();
- if (key.m_page_no < scan.m_endPage)
- {
- jam();
- ndbout_c("scanning page %u", key.m_page_no);
- goto cont;
- }
- }
- // no more pages, scan ends
- pos.m_get = ScanPos::Get_undef;
- scan.m_state = ScanOp::Last;
- return true;
- }
- cont:
- key.m_page_idx = 0;
- pos.m_get = ScanPos::Get_page_mm;
- // clear cached value
- pos.m_realpid_mm = RNIL;
- }
- /*FALLTHRU*/
- case ScanPos::Get_page_mm:
- // get TUP real page
- jam();
- {
- if (pos.m_realpid_mm == RNIL) {
- jam();
- if (key.m_page_no < frag.noOfPages)
- pos.m_realpid_mm = getRealpid(fragPtr.p, key.m_page_no);
- else
- {
- ndbassert(bits & ScanOp::SCAN_NR);
- goto nopage;
- }
- }
- PagePtr pagePtr;
- c_page_pool.getPtr(pagePtr, pos.m_realpid_mm);
-
- if (pagePtr.p->page_state == ZEMPTY_MM) {
- // skip empty page
- jam();
- if (! (bits & ScanOp::SCAN_NR))
- {
- pos.m_get = ScanPos::Get_next_page_mm;
- break; // incr loop count
- }
- else
- {
- jam();
- pos.m_realpid_mm = RNIL;
- }
- }
- nopage:
- pos.m_page = pagePtr.p;
- pos.m_get = ScanPos::Get_tuple;
- }
- continue;
- case ScanPos::Get_next_page_dd:
- // move to next disk page
- jam();
- {
- Disk_alloc_info& alloc = frag.m_disk_alloc_info;
- Local_fragment_extent_list list(c_extent_pool, alloc.m_extent_list);
- Ptr<Extent_info> ext_ptr;
- c_extent_pool.getPtr(ext_ptr, pos.m_extent_info_ptr_i);
- Extent_info* ext = ext_ptr.p;
- key.m_page_no++;
- if (key.m_page_no >= ext->m_first_page_no + alloc.m_extent_size) {
- // no more pages in this extent
- jam();
- if (! list.next(ext_ptr)) {
- // no more extents, scan ends
- jam();
- pos.m_get = ScanPos::Get_undef;
- scan.m_state = ScanOp::Last;
- return true;
- } else {
- // move to next extent
- jam();
- pos.m_extent_info_ptr_i = ext_ptr.i;
- ext = c_extent_pool.getPtr(pos.m_extent_info_ptr_i);
- key.m_file_no = ext->m_key.m_file_no;
- key.m_page_no = ext->m_first_page_no;
- }
- }
- key.m_page_idx = 0;
- pos.m_get = ScanPos::Get_page_dd;
- /*
- read ahead for scan in disk order
- do read ahead every 8:th page
- */
- if ((bits & ScanOp::SCAN_DD) &&
- (((key.m_page_no - ext->m_first_page_no) & 7) == 0))
- {
- jam();
- // initialize PGMAN request
- Page_cache_client::Request preq;
- preq.m_page = pos.m_key;
- preq.m_callback = TheNULLCallback;
-
- // set maximum read ahead
- Uint32 read_ahead = m_max_page_read_ahead;
-
- while (true)
- {
- // prepare page read ahead in current extent
- Uint32 page_no = preq.m_page.m_page_no;
- Uint32 page_no_limit = page_no + read_ahead;
- Uint32 limit = ext->m_first_page_no + alloc.m_extent_size;
- if (page_no_limit > limit)
- {
- jam();
- // read ahead crosses extent, set limit for this extent
- read_ahead = page_no_limit - limit;
- page_no_limit = limit;
- // and make sure we only read one extra extent next time around
- if (read_ahead > alloc.m_extent_size)
- read_ahead = alloc.m_extent_size;
- }
- else
- {
- jam();
- read_ahead = 0; // no more to read ahead after this
- }
- // do read ahead pages for this extent
- while (page_no < page_no_limit)
- {
- // page request to PGMAN
- jam();
- preq.m_page.m_page_no = page_no;
- int flags = 0;
- // ignore result
- m_pgman.get_page(signal, preq, flags);
- jamEntry();
- page_no++;
- }
- if (!read_ahead || !list.next(ext_ptr))
- {
- // no more extents after this or read ahead done
- jam();
- break;
- }
- // move to next extent and initialize PGMAN request accordingly
- Extent_info* ext = c_extent_pool.getPtr(ext_ptr.i);
- preq.m_page.m_file_no = ext->m_key.m_file_no;
- preq.m_page.m_page_no = ext->m_first_page_no;
- }
- } // if ScanOp::SCAN_DD read ahead
- }
- /*FALLTHRU*/
- case ScanPos::Get_page_dd:
- // get global page in PGMAN cache
- jam();
- {
- // check if page is un-allocated or empty
- if (likely(! (bits & ScanOp::SCAN_NR)))
- {
- Tablespace_client tsman(signal, c_tsman,
- frag.fragTableId,
- frag.fragmentId,
- frag.m_tablespace_id);
- unsigned uncommitted, committed;
- uncommitted = committed = ~(unsigned)0;
- int ret = tsman.get_page_free_bits(&key, &uncommitted, &committed);
- ndbrequire(ret == 0);
- if (committed == 0 && uncommitted == 0) {
- // skip empty page
- jam();
- pos.m_get = ScanPos::Get_next_page_dd;
- break; // incr loop count
- }
- }
- // page request to PGMAN
- Page_cache_client::Request preq;
- preq.m_page = pos.m_key;
- preq.m_callback.m_callbackData = scanPtr.i;
- preq.m_callback.m_callbackFunction =
- safe_cast(&Dbtup::disk_page_tup_scan_callback);
- int flags = 0;
- int res = m_pgman.get_page(signal, preq, flags);
- jamEntry();
- if (res == 0) {
- jam();
- // request queued
- pos.m_get = ScanPos::Get_tuple;
- return false;
- }
- ndbrequire(res > 0);
- pos.m_page = (Page*)m_pgman.m_ptr.p;
- }
- pos.m_get = ScanPos::Get_tuple;
- continue;
- // get tuple
- // move to next tuple
- case ScanPos::Get_next_tuple:
- case ScanPos::Get_next_tuple_fs:
- // move to next fixed size tuple
- jam();
- {
- key.m_page_idx += size;
- pos.m_get = ScanPos::Get_tuple_fs;
- }
- /*FALLTHRU*/
- case ScanPos::Get_tuple:
- case ScanPos::Get_tuple_fs:
- // get fixed size tuple
- jam();
- {
- Fix_page* page = (Fix_page*)pos.m_page;
- if (key.m_page_idx + size <= Fix_page::DATA_WORDS)
- {
- pos.m_get = ScanPos::Get_next_tuple_fs;
- th = (Tuple_header*)&page->m_data[key.m_page_idx];
-
- if (likely(! (bits & ScanOp::SCAN_NR)))
- {
- jam();
- thbits = th->m_header_bits;
- if (! (thbits & Tuple_header::FREE))
- {
- goto found_tuple;
- }
- }
- else
- {
- if (pos.m_realpid_mm == RNIL)
- {
- jam();
- foundGCI = 0;
- goto found_deleted_rowid;
- }
- thbits = th->m_header_bits;
- if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI ||
- foundGCI == 0)
- {
- if (! (thbits & Tuple_header::FREE))
- {
- jam();
- goto found_tuple;
- }
- else
- {
- goto found_deleted_rowid;
- }
- }
- else if (thbits != Fix_page::FREE_RECORD &&
- th->m_operation_ptr_i != RNIL)
- {
- jam();
- goto found_tuple; // Locked tuple...
- // skip free tuple
- }
- }
- } else {
- jam();
- // no more tuples on this page
- pos.m_get = ScanPos::Get_next_page;
- }
- }
- break; // incr loop count
- found_tuple:
- // found possible tuple to return
- jam();
- {
- // caller has already set pos.m_get to next tuple
- if (! (bits & ScanOp::SCAN_LCP && thbits & Tuple_header::LCP_SKIP)) {
- Local_key& key_mm = pos.m_key_mm;
- if (! (bits & ScanOp::SCAN_DD)) {
- key_mm = pos.m_key;
- // real page id is already set
- } else {
- key_mm.assref(th->m_base_record_ref);
- // recompute for each disk tuple
- pos.m_realpid_mm = getRealpid(fragPtr.p, key_mm.m_page_no);
- }
- // TUPKEYREQ handles savepoint stuff
- scan.m_state = ScanOp::Current;
- return true;
- } else {
- jam();
- // clear it so that it will show up in next LCP
- th->m_header_bits = thbits & ~(Uint32)Tuple_header::LCP_SKIP;
- if (tablePtr.p->m_bits & Tablerec::TR_Checksum) {
- jam();
- setChecksum(th, tablePtr.p);
- }
- }
- }
- break;
- found_deleted_rowid:
- jam();
- {
- ndbassert(bits & ScanOp::SCAN_NR);
- Local_key& key_mm = pos.m_key_mm;
- if (! (bits & ScanOp::SCAN_DD)) {
- key_mm = pos.m_key;
- // caller has already set pos.m_get to next tuple
- // real page id is already set
- } else {
- key_mm.assref(th->m_base_record_ref);
- // recompute for each disk tuple
- pos.m_realpid_mm = getRealpid(fragPtr.p, key_mm.m_page_no);
-
- Fix_page *mmpage = (Fix_page*)c_page_pool.getPtr(pos.m_realpid_mm);
- th = (Tuple_header*)(mmpage->m_data + key_mm.m_page_idx);
- if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI ||
- foundGCI == 0)
- {
- if (! (thbits & Tuple_header::FREE))
- break;
- }
- }
-
- NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
- conf->scanPtr = scan.m_userPtr;
- conf->accOperationPtr = RNIL;
- conf->fragId = frag.fragmentId;
- conf->localKey[0] = pos.m_key_mm.ref();
- conf->localKey[1] = 0;
- conf->localKeyLength = 1;
- conf->gci = foundGCI;
- Uint32 blockNo = refToBlock(scan.m_userRef);
- EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, 7);
- jamEntry();
-
- // TUPKEYREQ handles savepoint stuff
- loop_count = 32;
- scan.m_state = ScanOp::Next;
- return false;
- }
- break; // incr loop count
- default:
- ndbrequire(false);
- break;
- }
- if (++loop_count >= 32)
- break;
- }
- // TODO: at drop table we have to flush and terminate these
- jam();
- signal->theData[0] = ZTUP_SCAN;
- signal->theData[1] = scanPtr.i;
- sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
- return false;
-
-found_lcp_keep:
- Local_key tmp;
- tmp.assref(lcp_list);
- tmp.m_page_no = getRealpid(fragPtr.p, tmp.m_page_no);
-
- Ptr<Page> pagePtr;
- c_page_pool.getPtr(pagePtr, tmp.m_page_no);
- Tuple_header* ptr = (Tuple_header*)
- ((Fix_page*)pagePtr.p)->get_ptr(tmp.m_page_idx, 0);
- Uint32 headerbits = ptr->m_header_bits;
- ndbrequire(headerbits & Tuple_header::LCP_KEEP);
-
- Uint32 next = ptr->m_operation_ptr_i;
- ptr->m_operation_ptr_i = RNIL;
- ptr->m_header_bits = headerbits & ~(Uint32)Tuple_header::FREE;
-
- if (tablePtr.p->m_bits & Tablerec::TR_Checksum) {
- jam();
- setChecksum(ptr, tablePtr.p);
- }
-
- NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
- conf->scanPtr = scan.m_userPtr;
- conf->accOperationPtr = (Uint32)-1;
- conf->fragId = frag.fragmentId;
- conf->localKey[0] = lcp_list;
- conf->localKey[1] = 0;
- conf->localKeyLength = 1;
- conf->gci = 0;
- Uint32 blockNo = refToBlock(scan.m_userRef);
- EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, 7);
-
- fragPtr.p->m_lcp_keep_list = next;
- ptr->m_header_bits |= Tuple_header::FREED; // RESTORE free flag
- if (headerbits & Tuple_header::FREED)
- {
- if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
- {
- jam();
- free_var_rec(fragPtr.p, tablePtr.p, &tmp, pagePtr);
- } else {
- jam();
- free_fix_rec(fragPtr.p, tablePtr.p, &tmp, (Fix_page*)pagePtr.p);
- }
- }
- return false;
-}
-
-void
-Dbtup::scanCont(Signal* signal, ScanOpPtr scanPtr)
-{
- bool immediate = scanNext(signal, scanPtr);
- if (! immediate) {
- jam();
- // time-slicing again
- return;
- }
- scanReply(signal, scanPtr);
-}
-
-void
-Dbtup::disk_page_tup_scan_callback(Signal* signal, Uint32 scanPtrI, Uint32 page_i)
-{
- ScanOpPtr scanPtr;
- c_scanOpPool.getPtr(scanPtr, scanPtrI);
- ScanOp& scan = *scanPtr.p;
- ScanPos& pos = scan.m_scanPos;
- // get cache page
- Ptr<GlobalPage> gptr;
- m_global_page_pool.getPtr(gptr, page_i);
- pos.m_page = (Page*)gptr.p;
- // continue
- scanCont(signal, scanPtr);
-}
-
-void
-Dbtup::scanClose(Signal* signal, ScanOpPtr scanPtr)
-{
- ScanOp& scan = *scanPtr.p;
- ndbrequire(! (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) && scan.m_accLockOp == RNIL);
- // unlock all not unlocked by LQH
- LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
- ScanLockPtr lockPtr;
- while (list.first(lockPtr)) {
- jam();
- AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
- lockReq->returnCode = RNIL;
- lockReq->requestInfo = AccLockReq::Abort;
- lockReq->accOpPtr = lockPtr.p->m_accLockOp;
- EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
- jamEntry();
- ndbrequire(lockReq->returnCode == AccLockReq::Success);
- list.release(lockPtr);
- }
- // send conf
- NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
- conf->scanPtr = scanPtr.p->m_userPtr;
- conf->accOperationPtr = RNIL;
- conf->fragId = RNIL;
- unsigned signalLength = 3;
- sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
- signal, signalLength, JBB);
- releaseScanOp(scanPtr);
-}
-
-void
-Dbtup::addAccLockOp(ScanOp& scan, Uint32 accLockOp)
-{
- LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
- ScanLockPtr lockPtr;
-#ifdef VM_TRACE
- list.first(lockPtr);
- while (lockPtr.i != RNIL) {
- ndbrequire(lockPtr.p->m_accLockOp != accLockOp);
- list.next(lockPtr);
- }
-#endif
- bool ok = list.seize(lockPtr);
- ndbrequire(ok);
- lockPtr.p->m_accLockOp = accLockOp;
-}
-
-void
-Dbtup::removeAccLockOp(ScanOp& scan, Uint32 accLockOp)
-{
- LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
- ScanLockPtr lockPtr;
- list.first(lockPtr);
- while (lockPtr.i != RNIL) {
- if (lockPtr.p->m_accLockOp == accLockOp) {
- jam();
- break;
- }
- list.next(lockPtr);
- }
- ndbrequire(lockPtr.i != RNIL);
- list.release(lockPtr);
-}
-
-void
-Dbtup::releaseScanOp(ScanOpPtr& scanPtr)
-{
- FragrecordPtr fragPtr;
- fragPtr.i = scanPtr.p->m_fragPtrI;
- ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
-
- if(scanPtr.p->m_bits & ScanOp::SCAN_LCP)
- {
- jam();
- fragPtr.p->m_lcp_scan_op = RNIL;
- scanPtr.p->m_fragPtrI = RNIL;
- }
- else
- {
- jam();
- LocalDLList<ScanOp> list(c_scanOpPool, fragPtr.p->m_scanList);
- list.release(scanPtr);
- }
-}
-
-void
-Dbtup::execLCP_FRAG_ORD(Signal* signal)
-{
- LcpFragOrd* req= (LcpFragOrd*)signal->getDataPtr();
-
- TablerecPtr tablePtr;
- tablePtr.i = req->tableId;
- ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
-
- if (tablePtr.p->m_no_of_disk_attributes)
- {
- jam();
- FragrecordPtr fragPtr;
- Uint32 fragId = req->fragmentId;
- fragPtr.i = RNIL;
- getFragmentrec(fragPtr, fragId, tablePtr.p);
- ndbrequire(fragPtr.i != RNIL);
- Fragrecord& frag = *fragPtr.p;
-
- ndbrequire(frag.m_lcp_scan_op == RNIL && c_lcp_scan_op != RNIL);
- frag.m_lcp_scan_op = c_lcp_scan_op;
- ScanOpPtr scanPtr;
- c_scanOpPool.getPtr(scanPtr, frag.m_lcp_scan_op);
- ndbrequire(scanPtr.p->m_fragPtrI == RNIL);
- scanPtr.p->m_fragPtrI = fragPtr.i;
-
- scanFirst(signal, scanPtr);
- scanPtr.p->m_state = ScanOp::First;
- }
-}