summaryrefslogtreecommitdiff
path: root/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ndb/src/kernel/blocks/dbtup/DbtupScan.cpp')
-rw-r--r--ndb/src/kernel/blocks/dbtup/DbtupScan.cpp315
1 files changed, 315 insertions, 0 deletions
diff --git a/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp b/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
new file mode 100644
index 00000000000..396404faa8c
--- /dev/null
+++ b/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
@@ -0,0 +1,315 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#define DBTUP_C
+#include "Dbtup.hpp"
+#include <signaldata/AccScan.hpp>
+#include <signaldata/NextScan.hpp>
+
+#undef jam
+#undef jamEntry
+#define jam() { jamLine(32000 + __LINE__); }
+#define jamEntry() { jamEntryLine(32000 + __LINE__); }
+
+void
+Dbtup::execACC_SCANREQ(Signal* signal)
+{
+ jamEntry();
+ const AccScanReq reqCopy = *(const AccScanReq*)signal->getDataPtr();
+ const AccScanReq* const req = &reqCopy;
+ ScanOpPtr scanPtr;
+ scanPtr.i = RNIL;
+ do {
+ // find table and fragments
+ TablerecPtr tablePtr;
+ tablePtr.i = req->tableId;
+ ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
+ FragrecordPtr fragPtr[2];
+ Uint32 fragId = req->fragmentNo << 1;
+ fragPtr[0].i = fragPtr[1].i = RNIL;
+ getFragmentrec(fragPtr[0], fragId | 0, tablePtr.p);
+ getFragmentrec(fragPtr[1], fragId | 1, tablePtr.p);
+ ndbrequire(fragPtr[0].i != RNIL && fragPtr[1].i != RNIL);
+ Fragrecord& frag = *fragPtr[0].p;
+ // seize from pool and link to per-fragment list
+ if (! frag.m_scanList.seize(scanPtr)) {
+ jam();
+ break;
+ }
+ new (scanPtr.p) ScanOp();
+ ScanOp& scan = *scanPtr.p;
+ scan.m_state = ScanOp::First;
+ scan.m_userPtr = req->senderData;
+ scan.m_userRef = req->senderRef;
+ scan.m_tableId = tablePtr.i;
+ scan.m_fragId = frag.fragmentId;
+ scan.m_fragPtrI[0] = fragPtr[0].i;
+ scan.m_fragPtrI[1] = fragPtr[1].i;
+ scan.m_transId1 = req->transId1;
+ scan.m_transId2 = req->transId2;
+ // conf
+ AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
+ conf->scanPtr = req->senderData;
+ conf->accPtr = scanPtr.i;
+ conf->flag = AccScanConf::ZNOT_EMPTY_FRAGMENT;
+ sendSignal(req->senderRef, GSN_ACC_SCANCONF, signal,
+ AccScanConf::SignalLength, JBB);
+ return;
+ } while (0);
+ if (scanPtr.i != RNIL) {
+ jam();
+ releaseScanOp(scanPtr);
+ }
+ // LQH does not handle REF
+ signal->theData[0] = 0x313;
+ sendSignal(req->senderRef, GSN_ACC_SCANREF, signal, 1, JBB);
+}
+
+void
+Dbtup::execNEXT_SCANREQ(Signal* signal)
+{
+ jamEntry();
+ const NextScanReq reqCopy = *(const NextScanReq*)signal->getDataPtr();
+ const NextScanReq* const req = &reqCopy;
+ ScanOpPtr scanPtr;
+ c_scanOpPool.getPtr(scanPtr, req->accPtr);
+ ScanOp& scan = *scanPtr.p;
+ FragrecordPtr fragPtr;
+ fragPtr.i = scan.m_fragPtrI[0];
+ ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
+ Fragrecord& frag = *fragPtr.p;
+ switch (req->scanFlag) {
+ case NextScanReq::ZSCAN_NEXT:
+ jam();
+ break;
+ case NextScanReq::ZSCAN_NEXT_COMMIT:
+ jam();
+ break;
+ case NextScanReq::ZSCAN_COMMIT:
+ jam();
+ {
+ NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
+ conf->scanPtr = scan.m_userPtr;
+ unsigned signalLength = 1;
+ sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
+ signal, signalLength, JBB);
+ return;
+ }
+ break;
+ case NextScanReq::ZSCAN_CLOSE:
+ jam();
+ scanClose(signal, scanPtr);
+ return;
+ case NextScanReq::ZSCAN_NEXT_ABORT:
+ jam();
+ default:
+ jam();
+ ndbrequire(false);
+ break;
+ }
+ // start looking for next scan result
+ AccCheckScan* checkReq = (AccCheckScan*)signal->getDataPtrSend();
+ checkReq->accPtr = scanPtr.i;
+ checkReq->checkLcpStop = AccCheckScan::ZNOT_CHECK_LCP_STOP;
+ EXECUTE_DIRECT(DBTUP, GSN_ACC_CHECK_SCAN, signal, AccCheckScan::SignalLength);
+ jamEntry();
+}
+
+void
+Dbtup::execACC_CHECK_SCAN(Signal* signal)
+{
+ jamEntry();
+ const AccCheckScan reqCopy = *(const AccCheckScan*)signal->getDataPtr();
+ const AccCheckScan* const req = &reqCopy;
+ ScanOpPtr scanPtr;
+ c_scanOpPool.getPtr(scanPtr, req->accPtr);
+ ScanOp& scan = *scanPtr.p;
+ FragrecordPtr fragPtr;
+ fragPtr.i = scan.m_fragPtrI[0];
+ ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
+ Fragrecord& frag = *fragPtr.p;
+ if (req->checkLcpStop == AccCheckScan::ZCHECK_LCP_STOP) {
+ jam();
+ signal->theData[0] = scan.m_userPtr;
+ signal->theData[1] = true;
+ EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
+ jamEntry();
+ return;
+ }
+ if (scan.m_state == ScanOp::First) {
+ jam();
+ scanFirst(signal, scanPtr);
+ }
+ if (scan.m_state == ScanOp::Next) {
+ jam();
+ scanNext(signal, scanPtr);
+ }
+ if (scan.m_state == ScanOp::Locked) {
+ jam();
+ const PagePos& pos = scan.m_scanPos;
+ NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
+ conf->scanPtr = scan.m_userPtr;
+ conf->accOperationPtr = (Uint32)-1; // no lock returned
+ conf->fragId = frag.fragmentId | pos.m_fragBit;
+ conf->localKey[0] = (pos.m_pageId << MAX_TUPLES_BITS) |
+ (pos.m_tupleNo << 1);
+ conf->localKey[1] = 0;
+ conf->localKeyLength = 1;
+ unsigned signalLength = 6;
+ Uint32 blockNo = refToBlock(scan.m_userRef);
+ EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, signalLength);
+ jamEntry();
+ // next time look for next entry
+ scan.m_state = ScanOp::Next;
+ return;
+ }
+ if (scan.m_state == ScanOp::Last ||
+ scan.m_state == ScanOp::Invalid) {
+ jam();
+ NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
+ conf->scanPtr = scan.m_userPtr;
+ conf->accOperationPtr = RNIL;
+ conf->fragId = RNIL;
+ unsigned signalLength = 3;
+ sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
+ signal, signalLength, JBB);
+ return;
+ }
+ ndbrequire(false);
+}
+
+void
+Dbtup::scanFirst(Signal* signal, ScanOpPtr scanPtr)
+{
+ ScanOp& scan = *scanPtr.p;
+ // set to first fragment, first page, first tuple
+ PagePos& pos = scan.m_scanPos;
+ pos.m_fragId = scan.m_fragId;
+ pos.m_fragBit = 0;
+ pos.m_pageId = 0;
+ pos.m_tupleNo = 0;
+ // just before
+ pos.m_match = false;
+ // let scanNext() do the work
+ scan.m_state = ScanOp::Next;
+}
+
+// TODO optimize this + index build
+void
+Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
+{
+ ScanOp& scan = *scanPtr.p;
+ PagePos& pos = scan.m_scanPos;
+ TablerecPtr tablePtr;
+ tablePtr.i = scan.m_tableId;
+ ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
+ while (true) {
+ // TODO time-slice here after X loops
+ jam();
+ // get fragment
+ if (pos.m_fragBit == 2) {
+ jam();
+ scan.m_state = ScanOp::Last;
+ break;
+ }
+ ndbrequire(pos.m_fragBit <= 1);
+ FragrecordPtr fragPtr;
+ fragPtr.i = scan.m_fragPtrI[pos.m_fragBit];
+ ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
+ Fragrecord& frag = *fragPtr.p;
+ // get page
+ PagePtr pagePtr;
+ if (pos.m_pageId >= frag.noOfPages) {
+ jam();
+ pos.m_fragBit++;
+ pos.m_pageId = 0;
+ pos.m_tupleNo = 0;
+ pos.m_match = false;
+ continue;
+ }
+ Uint32 realPageId = getRealpid(fragPtr.p, pos.m_pageId);
+ pagePtr.i = realPageId;
+ ptrCheckGuard(pagePtr, cnoOfPage, page);
+ const Uint32 pageState = pagePtr.p->pageWord[ZPAGE_STATE_POS];
+ if (pageState != ZTH_MM_FREE &&
+ pageState != ZTH_MM_FULL) {
+ jam();
+ pos.m_pageId++;
+ pos.m_tupleNo = 0;
+ pos.m_match = false;
+ continue;
+ }
+ // get next tuple
+ if (pos.m_match)
+ pos.m_tupleNo++;
+ pos.m_match = true;
+ const Uint32 tupheadsize = tablePtr.p->tupheadsize;
+ Uint32 pageOffset = ZPAGE_HEADER_SIZE + pos.m_tupleNo * tupheadsize;
+ if (pageOffset + tupheadsize > ZWORDS_ON_PAGE) {
+ jam();
+ pos.m_pageId++;
+ pos.m_tupleNo = 0;
+ pos.m_match = false;
+ continue;
+ }
+ // skip over free tuple
+ bool isFree = false;
+ if (pageState == ZTH_MM_FREE) {
+ jam();
+ if ((pagePtr.p->pageWord[pageOffset] >> 16) == tupheadsize) {
+ Uint32 nextTuple = pagePtr.p->pageWord[ZFREELIST_HEADER_POS] >> 16;
+ while (nextTuple != 0) {
+ jam();
+ if (nextTuple == pageOffset) {
+ jam();
+ isFree = true;
+ break;
+ }
+ nextTuple = pagePtr.p->pageWord[nextTuple] & 0xffff;
+ }
+ }
+ }
+ if (isFree) {
+ jam();
+ continue;
+ }
+ // TODO check for operation and return latest in own tx
+ scan.m_state = ScanOp::Locked;
+ break;
+ }
+}
+
+void
+Dbtup::scanClose(Signal* signal, ScanOpPtr scanPtr)
+{
+ NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
+ conf->scanPtr = scanPtr.p->m_userPtr;
+ conf->accOperationPtr = RNIL;
+ conf->fragId = RNIL;
+ unsigned signalLength = 3;
+ sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
+ signal, signalLength, JBB);
+ releaseScanOp(scanPtr);
+}
+
+void
+Dbtup::releaseScanOp(ScanOpPtr& scanPtr)
+{
+ FragrecordPtr fragPtr;
+ fragPtr.i = scanPtr.p->m_fragPtrI[0];
+ ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
+ fragPtr.p->m_scanList.release(scanPtr);
+}