summaryrefslogtreecommitdiff
path: root/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp')
-rw-r--r--storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp581
1 files changed, 581 insertions, 0 deletions
diff --git a/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp b/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
new file mode 100644
index 00000000000..855a8ed1c29
--- /dev/null
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
@@ -0,0 +1,581 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#define DBTUX_NODE_CPP
+#include "Dbtux.hpp"
+
+/*
+ * Allocate index node in TUP.
+ */
+int
+Dbtux::allocNode(Signal* signal, NodeHandle& node)
+{
+ Frag& frag = node.m_frag;
+ Uint32 pageId = NullTupLoc.getPageId();
+ Uint32 pageOffset = NullTupLoc.getPageOffset();
+ Uint32* node32 = 0;
+ int errorCode = c_tup->tuxAllocNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
+ jamEntry();
+ if (errorCode == 0) {
+ jam();
+ node.m_loc = TupLoc(pageId, pageOffset);
+ node.m_node = reinterpret_cast<TreeNode*>(node32);
+ ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0);
+ }
+ return errorCode;
+}
+
+/*
+ * Set handle to point to existing node.
+ */
+void
+Dbtux::selectNode(NodeHandle& node, TupLoc loc)
+{
+ Frag& frag = node.m_frag;
+ ndbrequire(loc != NullTupLoc);
+ Uint32 pageId = loc.getPageId();
+ Uint32 pageOffset = loc.getPageOffset();
+ Uint32* node32 = 0;
+ c_tup->tuxGetNode(frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
+ jamEntry();
+ node.m_loc = loc;
+ node.m_node = reinterpret_cast<TreeNode*>(node32);
+ ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0);
+}
+
+/*
+ * Set handle to point to new node. Uses a pre-allocated node.
+ */
+void
+Dbtux::insertNode(NodeHandle& node)
+{
+ Frag& frag = node.m_frag;
+ // unlink from freelist
+ selectNode(node, frag.m_freeLoc);
+ frag.m_freeLoc = node.getLink(0);
+ new (node.m_node) TreeNode();
+#ifdef VM_TRACE
+ TreeHead& tree = frag.m_tree;
+ memset(node.getPref(), DataFillByte, tree.m_prefSize << 2);
+ TreeEnt* entList = tree.getEntList(node.m_node);
+ memset(entList, NodeFillByte, (tree.m_maxOccup + 1) * (TreeEntSize << 2));
+#endif
+}
+
+/*
+ * Delete existing node. Simply put it on the freelist.
+ */
+void
+Dbtux::deleteNode(NodeHandle& node)
+{
+ Frag& frag = node.m_frag;
+ ndbrequire(node.getOccup() == 0);
+ // link to freelist
+ node.setLink(0, frag.m_freeLoc);
+ frag.m_freeLoc = node.m_loc;
+ // invalidate the handle
+ node.m_loc = NullTupLoc;
+ node.m_node = 0;
+}
+
+/*
+ * Set prefix. Copies the number of words that fits. Includes
+ * attribute headers for now. XXX use null mask instead
+ */
+void
+Dbtux::setNodePref(NodeHandle& node)
+{
+ const Frag& frag = node.m_frag;
+ const TreeHead& tree = frag.m_tree;
+ readKeyAttrs(frag, node.getMinMax(0), 0, c_entryKey);
+ copyAttrs(frag, c_entryKey, node.getPref(), tree.m_prefSize);
+}
+
+// node operations
+
+/*
+ * Add entry at position. Move entries greater than or equal to the old
+ * one (if any) to the right.
+ *
+ * X
+ * v
+ * A B C D E _ _ => A B C X D E _
+ * 0 1 2 3 4 5 6 0 1 2 3 4 5 6
+ *
+ * Add list of scans at the new entry.
+ */
+void
+Dbtux::nodePushUp(NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList)
+{
+ Frag& frag = node.m_frag;
+ TreeHead& tree = frag.m_tree;
+ const unsigned occup = node.getOccup();
+ ndbrequire(occup < tree.m_maxOccup && pos <= occup);
+ // fix old scans
+ if (node.getNodeScan() != RNIL)
+ nodePushUpScans(node, pos);
+ // fix node
+ TreeEnt* const entList = tree.getEntList(node.m_node);
+ entList[occup] = entList[0];
+ TreeEnt* const tmpList = entList + 1;
+ for (unsigned i = occup; i > pos; i--) {
+ jam();
+ tmpList[i] = tmpList[i - 1];
+ }
+ tmpList[pos] = ent;
+ entList[0] = entList[occup + 1];
+ node.setOccup(occup + 1);
+ // add new scans
+ if (scanList != RNIL)
+ addScanList(node, pos, scanList);
+ // fix prefix
+ if (occup == 0 || pos == 0)
+ setNodePref(node);
+}
+
+void
+Dbtux::nodePushUpScans(NodeHandle& node, unsigned pos)
+{
+ const unsigned occup = node.getOccup();
+ ScanOpPtr scanPtr;
+ scanPtr.i = node.getNodeScan();
+ do {
+ jam();
+ c_scanOpPool.getPtr(scanPtr);
+ TreePos& scanPos = scanPtr.p->m_scanPos;
+ ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
+ if (scanPos.m_pos >= pos) {
+ jam();
+#ifdef VM_TRACE
+ if (debugFlags & DebugScan) {
+ debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
+ debugOut << "At pushUp pos=" << pos << " " << node << endl;
+ }
+#endif
+ scanPos.m_pos++;
+ }
+ scanPtr.i = scanPtr.p->m_nodeScan;
+ } while (scanPtr.i != RNIL);
+}
+
+/*
+ * Remove and return entry at position. Move entries greater than the
+ * removed one to the left. This is the opposite of nodePushUp.
+ *
+ * D
+ * ^ ^
+ * A B C D E F _ => A B C E F _ _
+ * 0 1 2 3 4 5 6 0 1 2 3 4 5 6
+ *
+ * Scans at removed entry are returned if non-zero location is passed or
+ * else moved forward.
+ */
+void
+Dbtux::nodePopDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32* scanList)
+{
+ Frag& frag = node.m_frag;
+ TreeHead& tree = frag.m_tree;
+ const unsigned occup = node.getOccup();
+ ndbrequire(occup <= tree.m_maxOccup && pos < occup);
+ if (node.getNodeScan() != RNIL) {
+ // remove or move scans at this position
+ if (scanList == 0)
+ moveScanList(node, pos);
+ else
+ removeScanList(node, pos, *scanList);
+ // fix other scans
+ if (node.getNodeScan() != RNIL)
+ nodePopDownScans(node, pos);
+ }
+ // fix node
+ TreeEnt* const entList = tree.getEntList(node.m_node);
+ entList[occup] = entList[0];
+ TreeEnt* const tmpList = entList + 1;
+ ent = tmpList[pos];
+ for (unsigned i = pos; i < occup - 1; i++) {
+ jam();
+ tmpList[i] = tmpList[i + 1];
+ }
+ entList[0] = entList[occup - 1];
+ node.setOccup(occup - 1);
+ // fix prefix
+ if (occup != 1 && pos == 0)
+ setNodePref(node);
+}
+
+void
+Dbtux::nodePopDownScans(NodeHandle& node, unsigned pos)
+{
+ const unsigned occup = node.getOccup();
+ ScanOpPtr scanPtr;
+ scanPtr.i = node.getNodeScan();
+ do {
+ jam();
+ c_scanOpPool.getPtr(scanPtr);
+ TreePos& scanPos = scanPtr.p->m_scanPos;
+ ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
+ // handled before
+ ndbrequire(scanPos.m_pos != pos);
+ if (scanPos.m_pos > pos) {
+ jam();
+#ifdef VM_TRACE
+ if (debugFlags & DebugScan) {
+ debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
+ debugOut << "At popDown pos=" << pos << " " << node << endl;
+ }
+#endif
+ scanPos.m_pos--;
+ }
+ scanPtr.i = scanPtr.p->m_nodeScan;
+ } while (scanPtr.i != RNIL);
+}
+
+/*
+ * Add entry at existing position. Move entries less than or equal to
+ * the old one to the left. Remove and return old min entry.
+ *
+ * X A
+ * ^ v ^
+ * A B C D E _ _ => B C D X E _ _
+ * 0 1 2 3 4 5 6 0 1 2 3 4 5 6
+ *
+ * Return list of scans at the removed position 0.
+ */
+void
+Dbtux::nodePushDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList)
+{
+ Frag& frag = node.m_frag;
+ TreeHead& tree = frag.m_tree;
+ const unsigned occup = node.getOccup();
+ ndbrequire(occup <= tree.m_maxOccup && pos < occup);
+ if (node.getNodeScan() != RNIL) {
+ // remove scans at 0
+ removeScanList(node, 0, scanList);
+ // fix other scans
+ if (node.getNodeScan() != RNIL)
+ nodePushDownScans(node, pos);
+ }
+ // fix node
+ TreeEnt* const entList = tree.getEntList(node.m_node);
+ entList[occup] = entList[0];
+ TreeEnt* const tmpList = entList + 1;
+ TreeEnt oldMin = tmpList[0];
+ for (unsigned i = 0; i < pos; i++) {
+ jam();
+ tmpList[i] = tmpList[i + 1];
+ }
+ tmpList[pos] = ent;
+ ent = oldMin;
+ entList[0] = entList[occup];
+ // fix prefix
+ if (true)
+ setNodePref(node);
+}
+
+void
+Dbtux::nodePushDownScans(NodeHandle& node, unsigned pos)
+{
+ const unsigned occup = node.getOccup();
+ ScanOpPtr scanPtr;
+ scanPtr.i = node.getNodeScan();
+ do {
+ jam();
+ c_scanOpPool.getPtr(scanPtr);
+ TreePos& scanPos = scanPtr.p->m_scanPos;
+ ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
+ // handled before
+ ndbrequire(scanPos.m_pos != 0);
+ if (scanPos.m_pos <= pos) {
+ jam();
+#ifdef VM_TRACE
+ if (debugFlags & DebugScan) {
+ debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
+ debugOut << "At pushDown pos=" << pos << " " << node << endl;
+ }
+#endif
+ scanPos.m_pos--;
+ }
+ scanPtr.i = scanPtr.p->m_nodeScan;
+ } while (scanPtr.i != RNIL);
+}
+
+/*
+ * Remove and return entry at position. Move entries less than the
+ * removed one to the right. Replace min entry by the input entry.
+ * This is the opposite of nodePushDown.
+ *
+ * X D
+ * v ^ ^
+ * A B C D E _ _ => X A B C E _ _
+ * 0 1 2 3 4 5 6 0 1 2 3 4 5 6
+ *
+ * Move scans at removed entry and add scans at the new entry.
+ */
+void
+Dbtux::nodePopUp(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList)
+{
+ Frag& frag = node.m_frag;
+ TreeHead& tree = frag.m_tree;
+ const unsigned occup = node.getOccup();
+ ndbrequire(occup <= tree.m_maxOccup && pos < occup);
+ if (node.getNodeScan() != RNIL) {
+ // move scans whose entry disappears
+ moveScanList(node, pos);
+ // fix other scans
+ if (node.getNodeScan() != RNIL)
+ nodePopUpScans(node, pos);
+ }
+ // fix node
+ TreeEnt* const entList = tree.getEntList(node.m_node);
+ entList[occup] = entList[0];
+ TreeEnt* const tmpList = entList + 1;
+ TreeEnt newMin = ent;
+ ent = tmpList[pos];
+ for (unsigned i = pos; i > 0; i--) {
+ jam();
+ tmpList[i] = tmpList[i - 1];
+ }
+ tmpList[0] = newMin;
+ entList[0] = entList[occup];
+ // add scans
+ if (scanList != RNIL)
+ addScanList(node, 0, scanList);
+ // fix prefix
+ if (true)
+ setNodePref(node);
+}
+
+void
+Dbtux::nodePopUpScans(NodeHandle& node, unsigned pos)
+{
+ const unsigned occup = node.getOccup();
+ ScanOpPtr scanPtr;
+ scanPtr.i = node.getNodeScan();
+ do {
+ jam();
+ c_scanOpPool.getPtr(scanPtr);
+ TreePos& scanPos = scanPtr.p->m_scanPos;
+ ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
+ ndbrequire(scanPos.m_pos != pos);
+ if (scanPos.m_pos < pos) {
+ jam();
+#ifdef VM_TRACE
+ if (debugFlags & DebugScan) {
+ debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
+ debugOut << "At popUp pos=" << pos << " " << node << endl;
+ }
+#endif
+ scanPos.m_pos++;
+ }
+ scanPtr.i = scanPtr.p->m_nodeScan;
+ } while (scanPtr.i != RNIL);
+}
+
+/*
+ * Move number of entries from another node to this node before the min
+ * (i=0) or after the max (i=1). Expensive but not often used.
+ */
+void
+Dbtux::nodeSlide(NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i)
+{
+ Frag& frag = dstNode.m_frag;
+ TreeHead& tree = frag.m_tree;
+ ndbrequire(i <= 1);
+ while (cnt != 0) {
+ TreeEnt ent;
+ Uint32 scanList = RNIL;
+ nodePopDown(srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent, &scanList);
+ nodePushUp(dstNode, i == 0 ? 0 : dstNode.getOccup(), ent, scanList);
+ cnt--;
+ }
+}
+
+// scans linked to node
+
+
+/*
+ * Add list of scans to node at given position.
+ */
+void
+Dbtux::addScanList(NodeHandle& node, unsigned pos, Uint32 scanList)
+{
+ ScanOpPtr scanPtr;
+ scanPtr.i = scanList;
+ do {
+ jam();
+ c_scanOpPool.getPtr(scanPtr);
+#ifdef VM_TRACE
+ if (debugFlags & DebugScan) {
+ debugOut << "Add scan " << scanPtr.i << " " << *scanPtr.p << endl;
+ debugOut << "To pos=" << pos << " " << node << endl;
+ }
+#endif
+ const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
+ scanPtr.p->m_nodeScan = RNIL;
+ linkScan(node, scanPtr);
+ TreePos& scanPos = scanPtr.p->m_scanPos;
+ // set position but leave direction alone
+ scanPos.m_loc = node.m_loc;
+ scanPos.m_pos = pos;
+ scanPtr.i = nextPtrI;
+ } while (scanPtr.i != RNIL);
+}
+
+/*
+ * Remove list of scans from node at given position. The return
+ * location must point to existing list (in fact RNIL always).
+ */
+void
+Dbtux::removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList)
+{
+ ScanOpPtr scanPtr;
+ scanPtr.i = node.getNodeScan();
+ do {
+ jam();
+ c_scanOpPool.getPtr(scanPtr);
+ const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
+ TreePos& scanPos = scanPtr.p->m_scanPos;
+ ndbrequire(scanPos.m_loc == node.m_loc);
+ if (scanPos.m_pos == pos) {
+ jam();
+#ifdef VM_TRACE
+ if (debugFlags & DebugScan) {
+ debugOut << "Remove scan " << scanPtr.i << " " << *scanPtr.p << endl;
+ debugOut << "Fron pos=" << pos << " " << node << endl;
+ }
+#endif
+ unlinkScan(node, scanPtr);
+ scanPtr.p->m_nodeScan = scanList;
+ scanList = scanPtr.i;
+ // unset position but leave direction alone
+ scanPos.m_loc = NullTupLoc;
+ scanPos.m_pos = ZNIL;
+ }
+ scanPtr.i = nextPtrI;
+ } while (scanPtr.i != RNIL);
+}
+
+/*
+ * Move list of scans away from entry about to be removed. Uses scan
+ * method scanNext().
+ */
+void
+Dbtux::moveScanList(NodeHandle& node, unsigned pos)
+{
+ ScanOpPtr scanPtr;
+ scanPtr.i = node.getNodeScan();
+ do {
+ jam();
+ c_scanOpPool.getPtr(scanPtr);
+ TreePos& scanPos = scanPtr.p->m_scanPos;
+ const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
+ ndbrequire(scanPos.m_loc == node.m_loc);
+ if (scanPos.m_pos == pos) {
+ jam();
+#ifdef VM_TRACE
+ if (debugFlags & DebugScan) {
+ debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
+ debugOut << "At pos=" << pos << " " << node << endl;
+ }
+#endif
+ scanNext(scanPtr, true);
+ ndbrequire(! (scanPos.m_loc == node.m_loc && scanPos.m_pos == pos));
+ }
+ scanPtr.i = nextPtrI;
+ } while (scanPtr.i != RNIL);
+}
+
+/*
+ * Link scan to the list under the node. The list is single-linked and
+ * ordering does not matter.
+ */
+void
+Dbtux::linkScan(NodeHandle& node, ScanOpPtr scanPtr)
+{
+#ifdef VM_TRACE
+ if (debugFlags & DebugScan) {
+ debugOut << "Link scan " << scanPtr.i << " " << *scanPtr.p << endl;
+ debugOut << "To node " << node << endl;
+ }
+#endif
+ ndbrequire(! islinkScan(node, scanPtr) && scanPtr.p->m_nodeScan == RNIL);
+ scanPtr.p->m_nodeScan = node.getNodeScan();
+ node.setNodeScan(scanPtr.i);
+}
+
+/*
+ * Unlink a scan from the list under the node.
+ */
+void
+Dbtux::unlinkScan(NodeHandle& node, ScanOpPtr scanPtr)
+{
+#ifdef VM_TRACE
+ if (debugFlags & DebugScan) {
+ debugOut << "Unlink scan " << scanPtr.i << " " << *scanPtr.p << endl;
+ debugOut << "From node " << node << endl;
+ }
+#endif
+ ScanOpPtr currPtr;
+ currPtr.i = node.getNodeScan();
+ ScanOpPtr prevPtr;
+ prevPtr.i = RNIL;
+ while (true) {
+ jam();
+ c_scanOpPool.getPtr(currPtr);
+ Uint32 nextPtrI = currPtr.p->m_nodeScan;
+ if (currPtr.i == scanPtr.i) {
+ jam();
+ if (prevPtr.i == RNIL) {
+ node.setNodeScan(nextPtrI);
+ } else {
+ jam();
+ prevPtr.p->m_nodeScan = nextPtrI;
+ }
+ scanPtr.p->m_nodeScan = RNIL;
+ // check for duplicates
+ ndbrequire(! islinkScan(node, scanPtr));
+ return;
+ }
+ prevPtr = currPtr;
+ currPtr.i = nextPtrI;
+ }
+}
+
+/*
+ * Check if a scan is linked to this node. Only for ndbrequire.
+ */
+bool
+Dbtux::islinkScan(NodeHandle& node, ScanOpPtr scanPtr)
+{
+ ScanOpPtr currPtr;
+ currPtr.i = node.getNodeScan();
+ while (currPtr.i != RNIL) {
+ jam();
+ c_scanOpPool.getPtr(currPtr);
+ if (currPtr.i == scanPtr.i) {
+ jam();
+ return true;
+ }
+ currPtr.i = currPtr.p->m_nodeScan;
+ }
+ return false;
+}
+
+void
+Dbtux::NodeHandle::progError(int line, int cause, const char* file)
+{
+ ErrorReporter::handleAssert("Dbtux::NodeHandle: assert failed", file, line);
+}