diff options
Diffstat (limited to 'storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp')
-rw-r--r-- | storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp | 581 |
1 files changed, 581 insertions, 0 deletions
diff --git a/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp b/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp new file mode 100644 index 00000000000..855a8ed1c29 --- /dev/null +++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp @@ -0,0 +1,581 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#define DBTUX_NODE_CPP +#include "Dbtux.hpp" + +/* + * Allocate index node in TUP. + */ +int +Dbtux::allocNode(Signal* signal, NodeHandle& node) +{ + Frag& frag = node.m_frag; + Uint32 pageId = NullTupLoc.getPageId(); + Uint32 pageOffset = NullTupLoc.getPageOffset(); + Uint32* node32 = 0; + int errorCode = c_tup->tuxAllocNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32); + jamEntry(); + if (errorCode == 0) { + jam(); + node.m_loc = TupLoc(pageId, pageOffset); + node.m_node = reinterpret_cast<TreeNode*>(node32); + ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0); + } + return errorCode; +} + +/* + * Set handle to point to existing node. + */ +void +Dbtux::selectNode(NodeHandle& node, TupLoc loc) +{ + Frag& frag = node.m_frag; + ndbrequire(loc != NullTupLoc); + Uint32 pageId = loc.getPageId(); + Uint32 pageOffset = loc.getPageOffset(); + Uint32* node32 = 0; + c_tup->tuxGetNode(frag.m_tupIndexFragPtrI, pageId, pageOffset, node32); + jamEntry(); + node.m_loc = loc; + node.m_node = reinterpret_cast<TreeNode*>(node32); + ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0); +} + +/* + * Set handle to point to new node. Uses a pre-allocated node. + */ +void +Dbtux::insertNode(NodeHandle& node) +{ + Frag& frag = node.m_frag; + // unlink from freelist + selectNode(node, frag.m_freeLoc); + frag.m_freeLoc = node.getLink(0); + new (node.m_node) TreeNode(); +#ifdef VM_TRACE + TreeHead& tree = frag.m_tree; + memset(node.getPref(), DataFillByte, tree.m_prefSize << 2); + TreeEnt* entList = tree.getEntList(node.m_node); + memset(entList, NodeFillByte, (tree.m_maxOccup + 1) * (TreeEntSize << 2)); +#endif +} + +/* + * Delete existing node. Simply put it on the freelist. + */ +void +Dbtux::deleteNode(NodeHandle& node) +{ + Frag& frag = node.m_frag; + ndbrequire(node.getOccup() == 0); + // link to freelist + node.setLink(0, frag.m_freeLoc); + frag.m_freeLoc = node.m_loc; + // invalidate the handle + node.m_loc = NullTupLoc; + node.m_node = 0; +} + +/* + * Set prefix. Copies the number of words that fits. Includes + * attribute headers for now. XXX use null mask instead + */ +void +Dbtux::setNodePref(NodeHandle& node) +{ + const Frag& frag = node.m_frag; + const TreeHead& tree = frag.m_tree; + readKeyAttrs(frag, node.getMinMax(0), 0, c_entryKey); + copyAttrs(frag, c_entryKey, node.getPref(), tree.m_prefSize); +} + +// node operations + +/* + * Add entry at position. Move entries greater than or equal to the old + * one (if any) to the right. + * + * X + * v + * A B C D E _ _ => A B C X D E _ + * 0 1 2 3 4 5 6 0 1 2 3 4 5 6 + * + * Add list of scans at the new entry. + */ +void +Dbtux::nodePushUp(NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList) +{ + Frag& frag = node.m_frag; + TreeHead& tree = frag.m_tree; + const unsigned occup = node.getOccup(); + ndbrequire(occup < tree.m_maxOccup && pos <= occup); + // fix old scans + if (node.getNodeScan() != RNIL) + nodePushUpScans(node, pos); + // fix node + TreeEnt* const entList = tree.getEntList(node.m_node); + entList[occup] = entList[0]; + TreeEnt* const tmpList = entList + 1; + for (unsigned i = occup; i > pos; i--) { + jam(); + tmpList[i] = tmpList[i - 1]; + } + tmpList[pos] = ent; + entList[0] = entList[occup + 1]; + node.setOccup(occup + 1); + // add new scans + if (scanList != RNIL) + addScanList(node, pos, scanList); + // fix prefix + if (occup == 0 || pos == 0) + setNodePref(node); +} + +void +Dbtux::nodePushUpScans(NodeHandle& node, unsigned pos) +{ + const unsigned occup = node.getOccup(); + ScanOpPtr scanPtr; + scanPtr.i = node.getNodeScan(); + do { + jam(); + c_scanOpPool.getPtr(scanPtr); + TreePos& scanPos = scanPtr.p->m_scanPos; + ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); + if (scanPos.m_pos >= pos) { + jam(); +#ifdef VM_TRACE + if (debugFlags & DebugScan) { + debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl; + debugOut << "At pushUp pos=" << pos << " " << node << endl; + } +#endif + scanPos.m_pos++; + } + scanPtr.i = scanPtr.p->m_nodeScan; + } while (scanPtr.i != RNIL); +} + +/* + * Remove and return entry at position. Move entries greater than the + * removed one to the left. This is the opposite of nodePushUp. + * + * D + * ^ ^ + * A B C D E F _ => A B C E F _ _ + * 0 1 2 3 4 5 6 0 1 2 3 4 5 6 + * + * Scans at removed entry are returned if non-zero location is passed or + * else moved forward. + */ +void +Dbtux::nodePopDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32* scanList) +{ + Frag& frag = node.m_frag; + TreeHead& tree = frag.m_tree; + const unsigned occup = node.getOccup(); + ndbrequire(occup <= tree.m_maxOccup && pos < occup); + if (node.getNodeScan() != RNIL) { + // remove or move scans at this position + if (scanList == 0) + moveScanList(node, pos); + else + removeScanList(node, pos, *scanList); + // fix other scans + if (node.getNodeScan() != RNIL) + nodePopDownScans(node, pos); + } + // fix node + TreeEnt* const entList = tree.getEntList(node.m_node); + entList[occup] = entList[0]; + TreeEnt* const tmpList = entList + 1; + ent = tmpList[pos]; + for (unsigned i = pos; i < occup - 1; i++) { + jam(); + tmpList[i] = tmpList[i + 1]; + } + entList[0] = entList[occup - 1]; + node.setOccup(occup - 1); + // fix prefix + if (occup != 1 && pos == 0) + setNodePref(node); +} + +void +Dbtux::nodePopDownScans(NodeHandle& node, unsigned pos) +{ + const unsigned occup = node.getOccup(); + ScanOpPtr scanPtr; + scanPtr.i = node.getNodeScan(); + do { + jam(); + c_scanOpPool.getPtr(scanPtr); + TreePos& scanPos = scanPtr.p->m_scanPos; + ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); + // handled before + ndbrequire(scanPos.m_pos != pos); + if (scanPos.m_pos > pos) { + jam(); +#ifdef VM_TRACE + if (debugFlags & DebugScan) { + debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl; + debugOut << "At popDown pos=" << pos << " " << node << endl; + } +#endif + scanPos.m_pos--; + } + scanPtr.i = scanPtr.p->m_nodeScan; + } while (scanPtr.i != RNIL); +} + +/* + * Add entry at existing position. Move entries less than or equal to + * the old one to the left. Remove and return old min entry. + * + * X A + * ^ v ^ + * A B C D E _ _ => B C D X E _ _ + * 0 1 2 3 4 5 6 0 1 2 3 4 5 6 + * + * Return list of scans at the removed position 0. + */ +void +Dbtux::nodePushDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList) +{ + Frag& frag = node.m_frag; + TreeHead& tree = frag.m_tree; + const unsigned occup = node.getOccup(); + ndbrequire(occup <= tree.m_maxOccup && pos < occup); + if (node.getNodeScan() != RNIL) { + // remove scans at 0 + removeScanList(node, 0, scanList); + // fix other scans + if (node.getNodeScan() != RNIL) + nodePushDownScans(node, pos); + } + // fix node + TreeEnt* const entList = tree.getEntList(node.m_node); + entList[occup] = entList[0]; + TreeEnt* const tmpList = entList + 1; + TreeEnt oldMin = tmpList[0]; + for (unsigned i = 0; i < pos; i++) { + jam(); + tmpList[i] = tmpList[i + 1]; + } + tmpList[pos] = ent; + ent = oldMin; + entList[0] = entList[occup]; + // fix prefix + if (true) + setNodePref(node); +} + +void +Dbtux::nodePushDownScans(NodeHandle& node, unsigned pos) +{ + const unsigned occup = node.getOccup(); + ScanOpPtr scanPtr; + scanPtr.i = node.getNodeScan(); + do { + jam(); + c_scanOpPool.getPtr(scanPtr); + TreePos& scanPos = scanPtr.p->m_scanPos; + ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); + // handled before + ndbrequire(scanPos.m_pos != 0); + if (scanPos.m_pos <= pos) { + jam(); +#ifdef VM_TRACE + if (debugFlags & DebugScan) { + debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl; + debugOut << "At pushDown pos=" << pos << " " << node << endl; + } +#endif + scanPos.m_pos--; + } + scanPtr.i = scanPtr.p->m_nodeScan; + } while (scanPtr.i != RNIL); +} + +/* + * Remove and return entry at position. Move entries less than the + * removed one to the right. Replace min entry by the input entry. + * This is the opposite of nodePushDown. + * + * X D + * v ^ ^ + * A B C D E _ _ => X A B C E _ _ + * 0 1 2 3 4 5 6 0 1 2 3 4 5 6 + * + * Move scans at removed entry and add scans at the new entry. + */ +void +Dbtux::nodePopUp(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList) +{ + Frag& frag = node.m_frag; + TreeHead& tree = frag.m_tree; + const unsigned occup = node.getOccup(); + ndbrequire(occup <= tree.m_maxOccup && pos < occup); + if (node.getNodeScan() != RNIL) { + // move scans whose entry disappears + moveScanList(node, pos); + // fix other scans + if (node.getNodeScan() != RNIL) + nodePopUpScans(node, pos); + } + // fix node + TreeEnt* const entList = tree.getEntList(node.m_node); + entList[occup] = entList[0]; + TreeEnt* const tmpList = entList + 1; + TreeEnt newMin = ent; + ent = tmpList[pos]; + for (unsigned i = pos; i > 0; i--) { + jam(); + tmpList[i] = tmpList[i - 1]; + } + tmpList[0] = newMin; + entList[0] = entList[occup]; + // add scans + if (scanList != RNIL) + addScanList(node, 0, scanList); + // fix prefix + if (true) + setNodePref(node); +} + +void +Dbtux::nodePopUpScans(NodeHandle& node, unsigned pos) +{ + const unsigned occup = node.getOccup(); + ScanOpPtr scanPtr; + scanPtr.i = node.getNodeScan(); + do { + jam(); + c_scanOpPool.getPtr(scanPtr); + TreePos& scanPos = scanPtr.p->m_scanPos; + ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); + ndbrequire(scanPos.m_pos != pos); + if (scanPos.m_pos < pos) { + jam(); +#ifdef VM_TRACE + if (debugFlags & DebugScan) { + debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl; + debugOut << "At popUp pos=" << pos << " " << node << endl; + } +#endif + scanPos.m_pos++; + } + scanPtr.i = scanPtr.p->m_nodeScan; + } while (scanPtr.i != RNIL); +} + +/* + * Move number of entries from another node to this node before the min + * (i=0) or after the max (i=1). Expensive but not often used. + */ +void +Dbtux::nodeSlide(NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i) +{ + Frag& frag = dstNode.m_frag; + TreeHead& tree = frag.m_tree; + ndbrequire(i <= 1); + while (cnt != 0) { + TreeEnt ent; + Uint32 scanList = RNIL; + nodePopDown(srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent, &scanList); + nodePushUp(dstNode, i == 0 ? 0 : dstNode.getOccup(), ent, scanList); + cnt--; + } +} + +// scans linked to node + + +/* + * Add list of scans to node at given position. + */ +void +Dbtux::addScanList(NodeHandle& node, unsigned pos, Uint32 scanList) +{ + ScanOpPtr scanPtr; + scanPtr.i = scanList; + do { + jam(); + c_scanOpPool.getPtr(scanPtr); +#ifdef VM_TRACE + if (debugFlags & DebugScan) { + debugOut << "Add scan " << scanPtr.i << " " << *scanPtr.p << endl; + debugOut << "To pos=" << pos << " " << node << endl; + } +#endif + const Uint32 nextPtrI = scanPtr.p->m_nodeScan; + scanPtr.p->m_nodeScan = RNIL; + linkScan(node, scanPtr); + TreePos& scanPos = scanPtr.p->m_scanPos; + // set position but leave direction alone + scanPos.m_loc = node.m_loc; + scanPos.m_pos = pos; + scanPtr.i = nextPtrI; + } while (scanPtr.i != RNIL); +} + +/* + * Remove list of scans from node at given position. The return + * location must point to existing list (in fact RNIL always). + */ +void +Dbtux::removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList) +{ + ScanOpPtr scanPtr; + scanPtr.i = node.getNodeScan(); + do { + jam(); + c_scanOpPool.getPtr(scanPtr); + const Uint32 nextPtrI = scanPtr.p->m_nodeScan; + TreePos& scanPos = scanPtr.p->m_scanPos; + ndbrequire(scanPos.m_loc == node.m_loc); + if (scanPos.m_pos == pos) { + jam(); +#ifdef VM_TRACE + if (debugFlags & DebugScan) { + debugOut << "Remove scan " << scanPtr.i << " " << *scanPtr.p << endl; + debugOut << "Fron pos=" << pos << " " << node << endl; + } +#endif + unlinkScan(node, scanPtr); + scanPtr.p->m_nodeScan = scanList; + scanList = scanPtr.i; + // unset position but leave direction alone + scanPos.m_loc = NullTupLoc; + scanPos.m_pos = ZNIL; + } + scanPtr.i = nextPtrI; + } while (scanPtr.i != RNIL); +} + +/* + * Move list of scans away from entry about to be removed. Uses scan + * method scanNext(). + */ +void +Dbtux::moveScanList(NodeHandle& node, unsigned pos) +{ + ScanOpPtr scanPtr; + scanPtr.i = node.getNodeScan(); + do { + jam(); + c_scanOpPool.getPtr(scanPtr); + TreePos& scanPos = scanPtr.p->m_scanPos; + const Uint32 nextPtrI = scanPtr.p->m_nodeScan; + ndbrequire(scanPos.m_loc == node.m_loc); + if (scanPos.m_pos == pos) { + jam(); +#ifdef VM_TRACE + if (debugFlags & DebugScan) { + debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl; + debugOut << "At pos=" << pos << " " << node << endl; + } +#endif + scanNext(scanPtr, true); + ndbrequire(! (scanPos.m_loc == node.m_loc && scanPos.m_pos == pos)); + } + scanPtr.i = nextPtrI; + } while (scanPtr.i != RNIL); +} + +/* + * Link scan to the list under the node. The list is single-linked and + * ordering does not matter. + */ +void +Dbtux::linkScan(NodeHandle& node, ScanOpPtr scanPtr) +{ +#ifdef VM_TRACE + if (debugFlags & DebugScan) { + debugOut << "Link scan " << scanPtr.i << " " << *scanPtr.p << endl; + debugOut << "To node " << node << endl; + } +#endif + ndbrequire(! islinkScan(node, scanPtr) && scanPtr.p->m_nodeScan == RNIL); + scanPtr.p->m_nodeScan = node.getNodeScan(); + node.setNodeScan(scanPtr.i); +} + +/* + * Unlink a scan from the list under the node. + */ +void +Dbtux::unlinkScan(NodeHandle& node, ScanOpPtr scanPtr) +{ +#ifdef VM_TRACE + if (debugFlags & DebugScan) { + debugOut << "Unlink scan " << scanPtr.i << " " << *scanPtr.p << endl; + debugOut << "From node " << node << endl; + } +#endif + ScanOpPtr currPtr; + currPtr.i = node.getNodeScan(); + ScanOpPtr prevPtr; + prevPtr.i = RNIL; + while (true) { + jam(); + c_scanOpPool.getPtr(currPtr); + Uint32 nextPtrI = currPtr.p->m_nodeScan; + if (currPtr.i == scanPtr.i) { + jam(); + if (prevPtr.i == RNIL) { + node.setNodeScan(nextPtrI); + } else { + jam(); + prevPtr.p->m_nodeScan = nextPtrI; + } + scanPtr.p->m_nodeScan = RNIL; + // check for duplicates + ndbrequire(! islinkScan(node, scanPtr)); + return; + } + prevPtr = currPtr; + currPtr.i = nextPtrI; + } +} + +/* + * Check if a scan is linked to this node. Only for ndbrequire. + */ +bool +Dbtux::islinkScan(NodeHandle& node, ScanOpPtr scanPtr) +{ + ScanOpPtr currPtr; + currPtr.i = node.getNodeScan(); + while (currPtr.i != RNIL) { + jam(); + c_scanOpPool.getPtr(currPtr); + if (currPtr.i == scanPtr.i) { + jam(); + return true; + } + currPtr.i = currPtr.p->m_nodeScan; + } + return false; +} + +void +Dbtux::NodeHandle::progError(int line, int cause, const char* file) +{ + ErrorReporter::handleAssert("Dbtux::NodeHandle: assert failed", file, line); +} |