diff options
Diffstat (limited to 'ndb')
-rw-r--r-- | ndb/include/kernel/ndb_limits.h | 2 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtux/Dbtux.hpp | 61 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp | 15 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp | 14 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp | 4 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp | 406 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp | 46 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp | 14 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp | 396 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtux/Times.txt | 22 | ||||
-rw-r--r-- | ndb/test/ndbapi/testOIBasic.cpp | 121 |
11 files changed, 690 insertions, 411 deletions
diff --git a/ndb/include/kernel/ndb_limits.h b/ndb/include/kernel/ndb_limits.h index 9411a98f091..00378bcab81 100644 --- a/ndb/include/kernel/ndb_limits.h +++ b/ndb/include/kernel/ndb_limits.h @@ -110,7 +110,7 @@ */ #define MAX_TTREE_NODE_SIZE 64 // total words in node #define MAX_TTREE_PREF_SIZE 4 // words in min prefix -#define MAX_TTREE_NODE_SLACK 3 // diff between max and min occupancy +#define MAX_TTREE_NODE_SLACK 2 // diff between max and min occupancy /* * Blobs. diff --git a/ndb/src/kernel/blocks/dbtux/Dbtux.hpp b/ndb/src/kernel/blocks/dbtux/Dbtux.hpp index ad748d2636f..dc95bf8e8ea 100644 --- a/ndb/src/kernel/blocks/dbtux/Dbtux.hpp +++ b/ndb/src/kernel/blocks/dbtux/Dbtux.hpp @@ -32,7 +32,6 @@ // signal classes #include <signaldata/DictTabInfo.hpp> #include <signaldata/TuxContinueB.hpp> -#include <signaldata/BuildIndx.hpp> #include <signaldata/TupFrag.hpp> #include <signaldata/AlterIndx.hpp> #include <signaldata/DropTab.hpp> @@ -478,7 +477,7 @@ private: Uint16 m_numAttrs; bool m_storeNullKey; TreeHead m_tree; - TupLoc m_freeLoc; // one node pre-allocated for insert + TupLoc m_freeLoc; // list of free index nodes DLList<ScanOp> m_scanList; // current scans on this fragment Uint32 m_tupIndexFragPtrI; Uint32 m_tupTableFragPtrI[2]; @@ -582,17 +581,24 @@ private: * DbtuxNode.cpp */ int allocNode(Signal* signal, NodeHandle& node); - void selectNode(Signal* signal, NodeHandle& node, TupLoc loc); - void insertNode(Signal* signal, NodeHandle& node); - void deleteNode(Signal* signal, NodeHandle& node); - void setNodePref(Signal* signal, NodeHandle& node); + void selectNode(NodeHandle& node, TupLoc loc); + void insertNode(NodeHandle& node); + void deleteNode(NodeHandle& node); + void setNodePref(NodeHandle& node); // node operations - void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent); - void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent); - void nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent); - void nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent); - void nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned i); + void nodePushUp(NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList); + void nodePushUpScans(NodeHandle& node, unsigned pos); + void nodePopDown(NodeHandle& node, unsigned pos, TreeEnt& en, Uint32* scanList); + void nodePopDownScans(NodeHandle& node, unsigned pos); + void nodePushDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList); + void nodePushDownScans(NodeHandle& node, unsigned pos); + void nodePopUp(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList); + void nodePopUpScans(NodeHandle& node, unsigned pos); + void nodeSlide(NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i); // scans linked to node + void addScanList(NodeHandle& node, unsigned pos, Uint32 scanList); + void removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList); + void moveScanList(NodeHandle& node, unsigned pos); void linkScan(NodeHandle& node, ScanOpPtr scanPtr); void unlinkScan(NodeHandle& node, ScanOpPtr scanPtr); bool islinkScan(NodeHandle& node, ScanOpPtr scanPtr); @@ -600,10 +606,21 @@ private: /* * DbtuxTree.cpp */ - void treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent); - void treeRemove(Signal* signal, Frag& frag, TreePos treePos); - void treeRotateSingle(Signal* signal, Frag& frag, NodeHandle& node, unsigned i); - void treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i); + // add entry + void treeAdd(Frag& frag, TreePos treePos, TreeEnt ent); + void treeAddFull(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent); + void treeAddNode(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent, NodeHandle parentNode, unsigned i); + void treeAddRebalance(Frag& frag, NodeHandle node, unsigned i); + // remove entry + void treeRemove(Frag& frag, TreePos treePos); + void treeRemoveInner(Frag& frag, NodeHandle lubNode, unsigned pos); + void treeRemoveSemi(Frag& frag, NodeHandle node, unsigned i); + void treeRemoveLeaf(Frag& frag, NodeHandle node); + void treeRemoveNode(Frag& frag, NodeHandle node); + void treeRemoveRebalance(Frag& frag, NodeHandle node, unsigned i); + // rotate + void treeRotateSingle(Frag& frag, NodeHandle& node, unsigned i); + void treeRotateDouble(Frag& frag, NodeHandle& node, unsigned i); /* * DbtuxScan.cpp @@ -615,9 +632,9 @@ private: void execACCKEYCONF(Signal* signal); void execACCKEYREF(Signal* signal); void execACC_ABORTCONF(Signal* signal); - void scanFirst(Signal* signal, ScanOpPtr scanPtr); - void scanNext(Signal* signal, ScanOpPtr scanPtr); - bool scanVisible(Signal* signal, ScanOpPtr scanPtr, TreeEnt ent); + void scanFirst(ScanOpPtr scanPtr); + void scanNext(ScanOpPtr scanPtr); + bool scanVisible(ScanOpPtr scanPtr, TreeEnt ent); void scanClose(Signal* signal, ScanOpPtr scanPtr); void addAccLockOp(ScanOp& scan, Uint32 accLockOp); void removeAccLockOp(ScanOp& scan, Uint32 accLockOp); @@ -626,9 +643,9 @@ private: /* * DbtuxSearch.cpp */ - void searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos); - void searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos); - void searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos); + void searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos); + void searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos); + void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos); /* * DbtuxCmp.cpp @@ -652,7 +669,7 @@ private: PrintPar(); }; void printTree(Signal* signal, Frag& frag, NdbOut& out); - void printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par); + void printNode(Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par); friend class NdbOut& operator<<(NdbOut&, const TupLoc&); friend class NdbOut& operator<<(NdbOut&, const TreeEnt&); friend class NdbOut& operator<<(NdbOut&, const TreeNode&); diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp index 63c7f5d34a1..6c2a29f54cc 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp @@ -98,7 +98,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out) strcpy(par.m_path, "."); par.m_side = 2; par.m_parent = NullTupLoc; - printNode(signal, frag, out, tree.m_root, par); + printNode(frag, out, tree.m_root, par); out.m_out->flush(); if (! par.m_ok) { if (debugFile == 0) { @@ -114,7 +114,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out) } void -Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par) +Dbtux::printNode(Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par) { if (loc == NullTupLoc) { par.m_depth = 0; @@ -122,7 +122,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& } TreeHead& tree = frag.m_tree; NodeHandle node(frag); - selectNode(signal, node, loc); + selectNode(node, loc); out << par.m_path << " " << node << endl; // check children PrintPar cpar[2]; @@ -132,7 +132,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& cpar[i].m_side = i; cpar[i].m_depth = 0; cpar[i].m_parent = loc; - printNode(signal, frag, out, node.getLink(i), cpar[i]); + printNode(frag, out, node.getLink(i), cpar[i]); if (! cpar[i].m_ok) { par.m_ok = false; } @@ -178,16 +178,19 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& out << "occupancy " << node.getOccup() << " of interior node"; out << " less than min " << tree.m_minOccup << endl; } - // check missed half-leaf/leaf merge +#ifdef dbtux_totally_groks_t_trees + // check missed semi-leaf/leaf merge for (unsigned i = 0; i <= 1; i++) { if (node.getLink(i) != NullTupLoc && node.getLink(1 - i) == NullTupLoc && - node.getOccup() + cpar[i].m_occup <= tree.m_maxOccup) { + // our semi-leaf seems to satify interior minOccup condition + node.getOccup() < tree.m_minOccup) { par.m_ok = false; out << par.m_path << sep; out << "missed merge with child " << i << endl; } } +#endif // check inline prefix { ConstData data1 = node.getPref(); Uint32 data2[MaxPrefSize]; diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp index 565e64f9aeb..30afb51e7d7 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp @@ -117,7 +117,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) switch (opCode) { case TuxMaintReq::OpAdd: jam(); - searchToAdd(signal, frag, c_searchKey, ent, treePos); + searchToAdd(frag, c_searchKey, ent, treePos); #ifdef VM_TRACE if (debugFlags & DebugMaint) { debugOut << treePos << (treePos.m_match ? " - error" : "") << endl; @@ -133,8 +133,8 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) break; } /* - * At most one new node is inserted in the operation. We keep one - * free node pre-allocated so the operation cannot fail. + * At most one new node is inserted in the operation. Pre-allocate + * it so that the operation cannot fail. */ if (frag.m_freeLoc == NullTupLoc) { jam(); @@ -144,14 +144,16 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) jam(); break; } + // link to freelist + node.setLink(0, frag.m_freeLoc); frag.m_freeLoc = node.m_loc; ndbrequire(frag.m_freeLoc != NullTupLoc); } - treeAdd(signal, frag, treePos, ent); + treeAdd(frag, treePos, ent); break; case TuxMaintReq::OpRemove: jam(); - searchToRemove(signal, frag, c_searchKey, ent, treePos); + searchToRemove(frag, c_searchKey, ent, treePos); #ifdef VM_TRACE if (debugFlags & DebugMaint) { debugOut << treePos << (! treePos.m_match ? " - error" : "") << endl; @@ -166,7 +168,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) } break; } - treeRemove(signal, frag, treePos); + treeRemove(frag, treePos); break; default: ndbrequire(false); diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp index e0b7fec19cf..1577c5045e0 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp @@ -211,11 +211,7 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal) // make these configurable later tree.m_nodeSize = MAX_TTREE_NODE_SIZE; tree.m_prefSize = MAX_TTREE_PREF_SIZE; -#ifdef dbtux_min_occup_less_max_occup const unsigned maxSlack = MAX_TTREE_NODE_SLACK; -#else - const unsigned maxSlack = 0; -#endif // size up to and including first 2 entries const unsigned pref = tree.getSize(AccPref); if (! (pref <= tree.m_nodeSize)) { diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp index f155917caf5..389192fd0cf 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp @@ -42,7 +42,7 @@ Dbtux::allocNode(Signal* signal, NodeHandle& node) * Set handle to point to existing node. */ void -Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc) +Dbtux::selectNode(NodeHandle& node, TupLoc loc) { Frag& frag = node.m_frag; ndbrequire(loc != NullTupLoc); @@ -57,15 +57,15 @@ Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc) } /* - * Set handle to point to new node. Uses the pre-allocated node. + * Set handle to point to new node. Uses a pre-allocated node. */ void -Dbtux::insertNode(Signal* signal, NodeHandle& node) +Dbtux::insertNode(NodeHandle& node) { Frag& frag = node.m_frag; - TupLoc loc = frag.m_freeLoc; - frag.m_freeLoc = NullTupLoc; - selectNode(signal, node, loc); + // unlink from freelist + selectNode(node, frag.m_freeLoc); + frag.m_freeLoc = node.getLink(0); new (node.m_node) TreeNode(); #ifdef VM_TRACE TreeHead& tree = frag.m_tree; @@ -76,20 +76,17 @@ Dbtux::insertNode(Signal* signal, NodeHandle& node) } /* - * Delete existing node. + * Delete existing node. Simply put it on the freelist. */ void -Dbtux::deleteNode(Signal* signal, NodeHandle& node) +Dbtux::deleteNode(NodeHandle& node) { Frag& frag = node.m_frag; ndbrequire(node.getOccup() == 0); - TupLoc loc = node.m_loc; - Uint32 pageId = loc.getPageId(); - Uint32 pageOffset = loc.getPageOffset(); - Uint32* node32 = reinterpret_cast<Uint32*>(node.m_node); - c_tup->tuxFreeNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32); - jamEntry(); - // invalidate handle and storage + // link to freelist + node.setLink(0, frag.m_freeLoc); + frag.m_freeLoc = node.m_loc; + // invalidate the handle node.m_loc = NullTupLoc; node.m_node = 0; } @@ -99,7 +96,7 @@ Dbtux::deleteNode(Signal* signal, NodeHandle& node) * attribute headers for now. XXX use null mask instead */ void -Dbtux::setNodePref(Signal* signal, NodeHandle& node) +Dbtux::setNodePref(NodeHandle& node) { const Frag& frag = node.m_frag; const TreeHead& tree = frag.m_tree; @@ -117,18 +114,45 @@ Dbtux::setNodePref(Signal* signal, NodeHandle& node) * v * A B C D E _ _ => A B C X D E _ * 0 1 2 3 4 5 6 0 1 2 3 4 5 6 + * + * Add list of scans at the new entry. */ void -Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent) +Dbtux::nodePushUp(NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList) { Frag& frag = node.m_frag; TreeHead& tree = frag.m_tree; const unsigned occup = node.getOccup(); ndbrequire(occup < tree.m_maxOccup && pos <= occup); - // fix scans + // fix old scans + if (node.getNodeScan() != RNIL) + nodePushUpScans(node, pos); + // fix node + TreeEnt* const entList = tree.getEntList(node.m_node); + entList[occup] = entList[0]; + TreeEnt* const tmpList = entList + 1; + for (unsigned i = occup; i > pos; i--) { + jam(); + tmpList[i] = tmpList[i - 1]; + } + tmpList[pos] = ent; + entList[0] = entList[occup + 1]; + node.setOccup(occup + 1); + // add new scans + if (scanList != RNIL) + addScanList(node, pos, scanList); + // fix prefix + if (occup == 0 || pos == 0) + setNodePref(node); +} + +void +Dbtux::nodePushUpScans(NodeHandle& node, unsigned pos) +{ + const unsigned occup = node.getOccup(); ScanOpPtr scanPtr; scanPtr.i = node.getNodeScan(); - while (scanPtr.i != RNIL) { + do { jam(); c_scanOpPool.getPtr(scanPtr); TreePos& scanPos = scanPtr.p->m_scanPos; @@ -144,21 +168,7 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& scanPos.m_pos++; } scanPtr.i = scanPtr.p->m_nodeScan; - } - // fix node - TreeEnt* const entList = tree.getEntList(node.m_node); - entList[occup] = entList[0]; - TreeEnt* const tmpList = entList + 1; - for (unsigned i = occup; i > pos; i--) { - jam(); - tmpList[i] = tmpList[i - 1]; - } - tmpList[pos] = ent; - entList[0] = entList[occup + 1]; - node.setOccup(occup + 1); - // fix prefix - if (occup == 0 || pos == 0) - setNodePref(signal, node); + } while (scanPtr.i != RNIL); } /* @@ -169,42 +179,55 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& * ^ ^ * A B C D E F _ => A B C E F _ _ * 0 1 2 3 4 5 6 0 1 2 3 4 5 6 + * + * Scans at removed entry are returned if non-zero location is passed or + * else moved forward. */ void -Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) +Dbtux::nodePopDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32* scanList) { Frag& frag = node.m_frag; TreeHead& tree = frag.m_tree; const unsigned occup = node.getOccup(); ndbrequire(occup <= tree.m_maxOccup && pos < occup); - ScanOpPtr scanPtr; - // move scans whose entry disappears - scanPtr.i = node.getNodeScan(); - while (scanPtr.i != RNIL) { + if (node.getNodeScan() != RNIL) { + // remove or move scans at this position + if (scanList == 0) + moveScanList(node, pos); + else + removeScanList(node, pos, *scanList); + // fix other scans + if (node.getNodeScan() != RNIL) + nodePopDownScans(node, pos); + } + // fix node + TreeEnt* const entList = tree.getEntList(node.m_node); + entList[occup] = entList[0]; + TreeEnt* const tmpList = entList + 1; + ent = tmpList[pos]; + for (unsigned i = pos; i < occup - 1; i++) { jam(); - c_scanOpPool.getPtr(scanPtr); - TreePos& scanPos = scanPtr.p->m_scanPos; - ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); - const Uint32 nextPtrI = scanPtr.p->m_nodeScan; - if (scanPos.m_pos == pos) { - jam(); -#ifdef VM_TRACE - if (debugFlags & DebugScan) { - debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl; - debugOut << "At popDown pos=" << pos << " " << node << endl; - } -#endif - scanNext(signal, scanPtr); - } - scanPtr.i = nextPtrI; + tmpList[i] = tmpList[i + 1]; } - // fix other scans + entList[0] = entList[occup - 1]; + node.setOccup(occup - 1); + // fix prefix + if (occup != 1 && pos == 0) + setNodePref(node); +} + +void +Dbtux::nodePopDownScans(NodeHandle& node, unsigned pos) +{ + const unsigned occup = node.getOccup(); + ScanOpPtr scanPtr; scanPtr.i = node.getNodeScan(); - while (scanPtr.i != RNIL) { + do { jam(); c_scanOpPool.getPtr(scanPtr); TreePos& scanPos = scanPtr.p->m_scanPos; ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); + // handled before ndbrequire(scanPos.m_pos != pos); if (scanPos.m_pos > pos) { jam(); @@ -217,21 +240,7 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) scanPos.m_pos--; } scanPtr.i = scanPtr.p->m_nodeScan; - } - // fix node - TreeEnt* const entList = tree.getEntList(node.m_node); - entList[occup] = entList[0]; - TreeEnt* const tmpList = entList + 1; - ent = tmpList[pos]; - for (unsigned i = pos; i < occup - 1; i++) { - jam(); - tmpList[i] = tmpList[i + 1]; - } - entList[0] = entList[occup - 1]; - node.setOccup(occup - 1); - // fix prefix - if (occup != 1 && pos == 0) - setNodePref(signal, node); + } while (scanPtr.i != RNIL); } /* @@ -242,43 +251,52 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) * ^ v ^ * A B C D E _ _ => B C D X E _ _ * 0 1 2 3 4 5 6 0 1 2 3 4 5 6 + * + * Return list of scans at the removed position 0. */ void -Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) +Dbtux::nodePushDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList) { Frag& frag = node.m_frag; TreeHead& tree = frag.m_tree; const unsigned occup = node.getOccup(); ndbrequire(occup <= tree.m_maxOccup && pos < occup); - ScanOpPtr scanPtr; - // move scans whose entry disappears - scanPtr.i = node.getNodeScan(); - while (scanPtr.i != RNIL) { + if (node.getNodeScan() != RNIL) { + // remove scans at 0 + removeScanList(node, 0, scanList); + // fix other scans + if (node.getNodeScan() != RNIL) + nodePushDownScans(node, pos); + } + // fix node + TreeEnt* const entList = tree.getEntList(node.m_node); + entList[occup] = entList[0]; + TreeEnt* const tmpList = entList + 1; + TreeEnt oldMin = tmpList[0]; + for (unsigned i = 0; i < pos; i++) { jam(); - c_scanOpPool.getPtr(scanPtr); - TreePos& scanPos = scanPtr.p->m_scanPos; - ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); - const Uint32 nextPtrI = scanPtr.p->m_nodeScan; - if (scanPos.m_pos == 0) { - jam(); -#ifdef VM_TRACE - if (debugFlags & DebugScan) { - debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl; - debugOut << "At pushDown pos=" << pos << " " << node << endl; - } -#endif - // here we may miss a valid entry "X" XXX known bug - scanNext(signal, scanPtr); - } - scanPtr.i = nextPtrI; + tmpList[i] = tmpList[i + 1]; } - // fix other scans + tmpList[pos] = ent; + ent = oldMin; + entList[0] = entList[occup]; + // fix prefix + if (true) + setNodePref(node); +} + +void +Dbtux::nodePushDownScans(NodeHandle& node, unsigned pos) +{ + const unsigned occup = node.getOccup(); + ScanOpPtr scanPtr; scanPtr.i = node.getNodeScan(); - while (scanPtr.i != RNIL) { + do { jam(); c_scanOpPool.getPtr(scanPtr); TreePos& scanPos = scanPtr.p->m_scanPos; ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); + // handled before ndbrequire(scanPos.m_pos != 0); if (scanPos.m_pos <= pos) { jam(); @@ -291,22 +309,7 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent scanPos.m_pos--; } scanPtr.i = scanPtr.p->m_nodeScan; - } - // fix node - TreeEnt* const entList = tree.getEntList(node.m_node); - entList[occup] = entList[0]; - TreeEnt* const tmpList = entList + 1; - TreeEnt oldMin = tmpList[0]; - for (unsigned i = 0; i < pos; i++) { - jam(); - tmpList[i] = tmpList[i + 1]; - } - tmpList[pos] = ent; - ent = oldMin; - entList[0] = entList[occup]; - // fix prefix - if (true) - setNodePref(signal, node); + } while (scanPtr.i != RNIL); } /* @@ -318,39 +321,50 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent * v ^ ^ * A B C D E _ _ => X A B C E _ _ * 0 1 2 3 4 5 6 0 1 2 3 4 5 6 + * + * Move scans at removed entry and add scans at the new entry. */ void -Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) +Dbtux::nodePopUp(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList) { Frag& frag = node.m_frag; TreeHead& tree = frag.m_tree; const unsigned occup = node.getOccup(); ndbrequire(occup <= tree.m_maxOccup && pos < occup); - ScanOpPtr scanPtr; - // move scans whose entry disappears - scanPtr.i = node.getNodeScan(); - while (scanPtr.i != RNIL) { + if (node.getNodeScan() != RNIL) { + // move scans whose entry disappears + moveScanList(node, pos); + // fix other scans + if (node.getNodeScan() != RNIL) + nodePopUpScans(node, pos); + } + // fix node + TreeEnt* const entList = tree.getEntList(node.m_node); + entList[occup] = entList[0]; + TreeEnt* const tmpList = entList + 1; + TreeEnt newMin = ent; + ent = tmpList[pos]; + for (unsigned i = pos; i > 0; i--) { jam(); - c_scanOpPool.getPtr(scanPtr); - TreePos& scanPos = scanPtr.p->m_scanPos; - ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); - const Uint32 nextPtrI = scanPtr.p->m_nodeScan; - if (scanPos.m_pos == pos) { - jam(); -#ifdef VM_TRACE - if (debugFlags & DebugScan) { - debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl; - debugOut << "At popUp pos=" << pos << " " << node << endl; - } -#endif - // here we may miss a valid entry "X" XXX known bug - scanNext(signal, scanPtr); - } - scanPtr.i = nextPtrI; + tmpList[i] = tmpList[i - 1]; } - // fix other scans + tmpList[0] = newMin; + entList[0] = entList[occup]; + // add scans + if (scanList != RNIL) + addScanList(node, 0, scanList); + // fix prefix + if (true) + setNodePref(node); +} + +void +Dbtux::nodePopUpScans(NodeHandle& node, unsigned pos) +{ + const unsigned occup = node.getOccup(); + ScanOpPtr scanPtr; scanPtr.i = node.getNodeScan(); - while (scanPtr.i != RNIL) { + do { jam(); c_scanOpPool.getPtr(scanPtr); TreePos& scanPos = scanPtr.p->m_scanPos; @@ -367,41 +381,123 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) scanPos.m_pos++; } scanPtr.i = scanPtr.p->m_nodeScan; - } - // fix node - TreeEnt* const entList = tree.getEntList(node.m_node); - entList[occup] = entList[0]; - TreeEnt* const tmpList = entList + 1; - TreeEnt newMin = ent; - ent = tmpList[pos]; - for (unsigned i = pos; i > 0; i--) { - jam(); - tmpList[i] = tmpList[i - 1]; - } - tmpList[0] = newMin; - entList[0] = entList[occup]; - // fix prefix - if (true) - setNodePref(signal, node); + } while (scanPtr.i != RNIL); } /* - * Move all possible entries from another node before the min (i=0) or - * after the max (i=1). XXX can be optimized + * Move number of entries from another node to this node before the min + * (i=0) or after the max (i=1). Expensive but not often used. */ void -Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned i) +Dbtux::nodeSlide(NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i) { Frag& frag = dstNode.m_frag; TreeHead& tree = frag.m_tree; ndbrequire(i <= 1); - while (dstNode.getOccup() < tree.m_maxOccup && srcNode.getOccup() != 0) { + while (cnt != 0) { TreeEnt ent; - nodePopDown(signal, srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent); - nodePushUp(signal, dstNode, i == 0 ? 0 : dstNode.getOccup(), ent); + Uint32 scanList = RNIL; + nodePopDown(srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent, &scanList); + nodePushUp(dstNode, i == 0 ? 0 : dstNode.getOccup(), ent, scanList); + cnt--; } } +// scans linked to node + + +/* + * Add list of scans to node at given position. + */ +void +Dbtux::addScanList(NodeHandle& node, unsigned pos, Uint32 scanList) +{ + ScanOpPtr scanPtr; + scanPtr.i = scanList; + do { + jam(); + c_scanOpPool.getPtr(scanPtr); +#ifdef VM_TRACE + if (debugFlags & DebugScan) { + debugOut << "Add scan " << scanPtr.i << " " << *scanPtr.p << endl; + debugOut << "To pos=" << pos << " " << node << endl; + } +#endif + const Uint32 nextPtrI = scanPtr.p->m_nodeScan; + scanPtr.p->m_nodeScan = RNIL; + linkScan(node, scanPtr); + TreePos& scanPos = scanPtr.p->m_scanPos; + // set position but leave direction alone + scanPos.m_loc = node.m_loc; + scanPos.m_pos = pos; + scanPtr.i = nextPtrI; + } while (scanPtr.i != RNIL); +} + +/* + * Remove list of scans from node at given position. The return + * location must point to existing list (in fact RNIL always). + */ +void +Dbtux::removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList) +{ + ScanOpPtr scanPtr; + scanPtr.i = node.getNodeScan(); + do { + jam(); + c_scanOpPool.getPtr(scanPtr); + const Uint32 nextPtrI = scanPtr.p->m_nodeScan; + TreePos& scanPos = scanPtr.p->m_scanPos; + ndbrequire(scanPos.m_loc == node.m_loc); + if (scanPos.m_pos == pos) { + jam(); +#ifdef VM_TRACE + if (debugFlags & DebugScan) { + debugOut << "Remove scan " << scanPtr.i << " " << *scanPtr.p << endl; + debugOut << "Fron pos=" << pos << " " << node << endl; + } +#endif + unlinkScan(node, scanPtr); + scanPtr.p->m_nodeScan = scanList; + scanList = scanPtr.i; + // unset position but leave direction alone + scanPos.m_loc = NullTupLoc; + scanPos.m_pos = ZNIL; + } + scanPtr.i = nextPtrI; + } while (scanPtr.i != RNIL); +} + +/* + * Move list of scans away from entry about to be removed. Uses scan + * method scanNext(). + */ +void +Dbtux::moveScanList(NodeHandle& node, unsigned pos) +{ + ScanOpPtr scanPtr; + scanPtr.i = node.getNodeScan(); + do { + jam(); + c_scanOpPool.getPtr(scanPtr); + TreePos& scanPos = scanPtr.p->m_scanPos; + const Uint32 nextPtrI = scanPtr.p->m_nodeScan; + ndbrequire(scanPos.m_loc == node.m_loc); + if (scanPos.m_pos == pos) { + jam(); +#ifdef VM_TRACE + if (debugFlags & DebugScan) { + debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl; + debugOut << "At pos=" << pos << " " << node << endl; + } +#endif + scanNext(scanPtr); + ndbrequire(! (scanPos.m_loc == node.m_loc && scanPos.m_pos == pos)); + } + scanPtr.i = nextPtrI; + } while (scanPtr.i != RNIL); +} + /* * Link scan to the list under the node. The list is single-linked and * ordering does not matter. diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp index 585d2dbe58a..c58dec2f102 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp @@ -275,7 +275,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal) jam(); const TupLoc loc = scan.m_scanPos.m_loc; NodeHandle node(frag); - selectNode(signal, node, loc); + selectNode(node, loc); unlinkScan(node, scanPtr); scan.m_scanPos.m_loc = NullTupLoc; } @@ -364,7 +364,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) if (scan.m_state == ScanOp::First) { jam(); // search is done only once in single range scan - scanFirst(signal, scanPtr); + scanFirst(scanPtr); #ifdef VM_TRACE if (debugFlags & DebugScan) { debugOut << "First scan " << scanPtr.i << " " << scan << endl; @@ -374,7 +374,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) if (scan.m_state == ScanOp::Next) { jam(); // look for next - scanNext(signal, scanPtr); + scanNext(scanPtr); } // for reading tuple key in Current or Locked state Data pkData = c_dataBuffer; @@ -680,7 +680,7 @@ Dbtux::execACC_ABORTCONF(Signal* signal) * by scanNext. */ void -Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) +Dbtux::scanFirst(ScanOpPtr scanPtr) { ScanOp& scan = *scanPtr.p; Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); @@ -698,7 +698,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) } // search for scan start position TreePos treePos; - searchToScan(signal, frag, c_dataBuffer, scan.m_boundCnt[0], treePos); + searchToScan(frag, c_dataBuffer, scan.m_boundCnt[0], treePos); if (treePos.m_loc == NullTupLoc) { // empty tree jam(); @@ -710,13 +710,13 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) scan.m_state = ScanOp::Next; // link the scan to node found NodeHandle node(frag); - selectNode(signal, node, treePos.m_loc); + selectNode(node, treePos.m_loc); linkScan(node, scanPtr); } /* * Move to next entry. The scan is already linked to some node. When - * we leave, if any entry was found, it will be linked to a possibly + * we leave, if an entry was found, it will be linked to a possibly * different node. The scan has a position, and a direction which tells * from where we came to this position. This is one of: * @@ -725,9 +725,12 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) * 2 - up from root (the scan ends) * 3 - left to right within node (at end proceed to right child) * 4 - down from parent (proceed to left child) + * + * If an entry was found, scan direction is 3. Therefore tree + * re-organizations need not worry about scan direction. */ void -Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) +Dbtux::scanNext(ScanOpPtr scanPtr) { ScanOp& scan = *scanPtr.p; Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); @@ -736,22 +739,8 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) debugOut << "Next in scan " << scanPtr.i << " " << scan << endl; } #endif - if (scan.m_state == ScanOp::Locked) { - jam(); - // version of a tuple locked by us cannot disappear (assert only) -#ifdef dbtux_wl_1942_is_done - ndbassert(false); -#endif - AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); - lockReq->returnCode = RNIL; - lockReq->requestInfo = AccLockReq::Unlock; - lockReq->accOpPtr = scan.m_accLockOp; - EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); - jamEntry(); - ndbrequire(lockReq->returnCode == AccLockReq::Success); - scan.m_accLockOp = RNIL; - scan.m_state = ScanOp::Current; - } + // cannot be moved away from tuple we have locked + ndbrequire(scan.m_state != ScanOp::Locked); // set up index keys for this operation setKeyAttrs(frag); // unpack upper bound into c_dataBuffer @@ -767,7 +756,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) TreePos pos = scan.m_scanPos; // get and remember original node NodeHandle origNode(frag); - selectNode(signal, origNode, pos.m_loc); + selectNode(origNode, pos.m_loc); ndbrequire(islinkScan(origNode, scanPtr)); // current node in loop NodeHandle node = origNode; @@ -784,7 +773,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) } if (node.m_loc != pos.m_loc) { jam(); - selectNode(signal, node, pos.m_loc); + selectNode(node, pos.m_loc); } if (pos.m_dir == 4) { // coming down from parent proceed to left child @@ -832,7 +821,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) break; } // can we see it - if (! scanVisible(signal, scanPtr, ent)) { + if (! scanVisible(scanPtr, ent)) { jam(); continue; } @@ -864,6 +853,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) scan.m_scanPos = pos; // relink if (scan.m_state == ScanOp::Current) { + ndbrequire(pos.m_match == true && pos.m_dir == 3); ndbrequire(pos.m_loc == node.m_loc); if (origNode.m_loc != node.m_loc) { jam(); @@ -894,7 +884,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) * which are not analyzed or handled yet. */ bool -Dbtux::scanVisible(Signal* signal, ScanOpPtr scanPtr, TreeEnt ent) +Dbtux::scanVisible(ScanOpPtr scanPtr, TreeEnt ent) { const ScanOp& scan = *scanPtr.p; const Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp index 1e20d3e3718..0f4b0862960 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp @@ -25,7 +25,7 @@ * TODO optimize for initial equal attrs in node min/max */ void -Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos) +Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos) { const TreeHead& tree = frag.m_tree; const unsigned numAttrs = frag.m_numAttrs; @@ -46,7 +46,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear NodeHandle bottomNode(frag); while (true) { jam(); - selectNode(signal, currNode, currNode.m_loc); + selectNode(currNode, currNode.m_loc); int ret; // compare prefix unsigned start = 0; @@ -159,12 +159,12 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear * * Compares search key to each node min. A move to right subtree can * overshoot target node. The last such node is saved. The final node - * is a half-leaf or leaf. If search key is less than final node min + * is a semi-leaf or leaf. If search key is less than final node min * then the saved node is the g.l.b of the final node and we move back * to it. */ void -Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos) +Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos) { const TreeHead& tree = frag.m_tree; const unsigned numAttrs = frag.m_numAttrs; @@ -182,7 +182,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s NodeHandle glbNode(frag); // potential g.l.b of final node while (true) { jam(); - selectNode(signal, currNode, currNode.m_loc); + selectNode(currNode, currNode.m_loc); int ret; // compare prefix unsigned start = 0; @@ -256,7 +256,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s * Similar to searchToAdd. */ void -Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos) +Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos) { const TreeHead& tree = frag.m_tree; NodeHandle currNode(frag); @@ -271,7 +271,7 @@ Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned bo NodeHandle bottomNode(frag); while (true) { jam(); - selectNode(signal, currNode, currNode.m_loc); + selectNode(currNode, currNode.m_loc); int ret; // compare prefix ret = cmpScanBound(frag, 0, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize); diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp index 84d26976e05..b9e3b593a00 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp @@ -18,71 +18,105 @@ #include "Dbtux.hpp" /* - * Add entry. + * Add entry. Handle the case when there is room for one more. This + * is the common case given slack in nodes. */ void -Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent) +Dbtux::treeAdd(Frag& frag, TreePos treePos, TreeEnt ent) { TreeHead& tree = frag.m_tree; - unsigned pos = treePos.m_pos; NodeHandle node(frag); - // check for empty tree - if (treePos.m_loc == NullTupLoc) { - jam(); - insertNode(signal, node); - nodePushUp(signal, node, 0, ent); - node.setSide(2); - tree.m_root = node.m_loc; - return; - } - selectNode(signal, node, treePos.m_loc); - // check if it is bounding node - if (pos != 0 && pos != node.getOccup()) { + if (treePos.m_loc != NullTupLoc) { + // non-empty tree jam(); - // check if room for one more + selectNode(node, treePos.m_loc); + unsigned pos = treePos.m_pos; if (node.getOccup() < tree.m_maxOccup) { + // node has room jam(); - nodePushUp(signal, node, pos, ent); + nodePushUp(node, pos, ent, RNIL); return; } - // returns min entry - nodePushDown(signal, node, pos - 1, ent); - // find position to add the removed min entry - TupLoc childLoc = node.getLink(0); - if (childLoc == NullTupLoc) { + treeAddFull(frag, node, pos, ent); + return; + } + jam(); + insertNode(node); + nodePushUp(node, 0, ent, RNIL); + node.setSide(2); + tree.m_root = node.m_loc; +} + +/* + * Add entry when node is full. Handle the case when there is g.l.b + * node in left subtree with room for one more. It will receive the min + * entry of this node. The min entry could be the entry to add. + */ +void +Dbtux::treeAddFull(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent) +{ + TreeHead& tree = frag.m_tree; + TupLoc loc = lubNode.getLink(0); + if (loc != NullTupLoc) { + // find g.l.b node + NodeHandle glbNode(frag); + do { jam(); - // left child will be added - pos = 0; - } else { + selectNode(glbNode, loc); + loc = glbNode.getLink(1); + } while (loc != NullTupLoc); + if (glbNode.getOccup() < tree.m_maxOccup) { + // g.l.b node has room jam(); - // find glb node - while (childLoc != NullTupLoc) { + Uint32 scanList = RNIL; + if (pos != 0) { jam(); - selectNode(signal, node, childLoc); - childLoc = node.getLink(1); + // add the new entry and return min entry + nodePushDown(lubNode, pos - 1, ent, scanList); } - pos = node.getOccup(); + // g.l.b node receives min entry from l.u.b node + nodePushUp(glbNode, glbNode.getOccup(), ent, scanList); + return; } - // fall thru to next case - } - // adding new min or max - unsigned i = (pos == 0 ? 0 : 1); - ndbrequire(node.getLink(i) == NullTupLoc); - // check if the half-leaf/leaf has room for one more - if (node.getOccup() < tree.m_maxOccup) { - jam(); - nodePushUp(signal, node, pos, ent); + treeAddNode(frag, lubNode, pos, ent, glbNode, 1); return; } - // add a new node - NodeHandle childNode(frag); - insertNode(signal, childNode); - nodePushUp(signal, childNode, 0, ent); + treeAddNode(frag, lubNode, pos, ent, lubNode, 0); +} + +/* + * Add entry when there is no g.l.b node in left subtree or the g.l.b + * node is full. We must add a new left or right child node which + * becomes the new g.l.b node. + */ +void +Dbtux::treeAddNode(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent, NodeHandle parentNode, unsigned i) +{ + NodeHandle glbNode(frag); + insertNode(glbNode); // connect parent and child - node.setLink(i, childNode.m_loc); - childNode.setLink(2, node.m_loc); - childNode.setSide(i); - // re-balance tree at each node + parentNode.setLink(i, glbNode.m_loc); + glbNode.setLink(2, parentNode.m_loc); + glbNode.setSide(i); + Uint32 scanList = RNIL; + if (pos != 0) { + jam(); + // add the new entry and return min entry + nodePushDown(lubNode, pos - 1, ent, scanList); + } + // g.l.b node receives min entry from l.u.b node + nodePushUp(glbNode, 0, ent, scanList); + // re-balance the tree + treeAddRebalance(frag, parentNode, i); +} + +/* + * Re-balance tree after adding a node. The process starts with the + * parent of the added node. + */ +void +Dbtux::treeAddRebalance(Frag& frag, NodeHandle node, unsigned i) +{ while (true) { // height of subtree i has increased by 1 int j = (i == 0 ? -1 : +1); @@ -102,14 +136,14 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent) // height of longer subtree increased jam(); NodeHandle childNode(frag); - selectNode(signal, childNode, node.getLink(i)); + selectNode(childNode, node.getLink(i)); int b2 = childNode.getBalance(); if (b2 == b) { jam(); - treeRotateSingle(signal, frag, node, i); + treeRotateSingle(frag, node, i); } else if (b2 == -b) { jam(); - treeRotateDouble(signal, frag, node, i); + treeRotateDouble(frag, node, i); } else { // height of subtree increased so it cannot be perfectly balanced ndbrequire(false); @@ -126,115 +160,169 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent) break; } i = node.getSide(); - selectNode(signal, node, parentLoc); + selectNode(node, parentLoc); } } /* - * Remove entry. + * Remove entry. Optimize for nodes with slack. Handle the case when + * there is no underflow i.e. occupancy remains at least minOccup. For + * interior nodes this is a requirement. For others it means that we do + * not need to consider merge of semi-leaf and leaf. */ void -Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) +Dbtux::treeRemove(Frag& frag, TreePos treePos) { TreeHead& tree = frag.m_tree; unsigned pos = treePos.m_pos; NodeHandle node(frag); - selectNode(signal, node, treePos.m_loc); + selectNode(node, treePos.m_loc); TreeEnt ent; - // check interior node first - if (node.getChilds() == 2) { + if (node.getOccup() > tree.m_minOccup) { + // no underflow in any node type jam(); - ndbrequire(node.getOccup() >= tree.m_minOccup); - // check if no underflow - if (node.getOccup() > tree.m_minOccup) { - jam(); - nodePopDown(signal, node, pos, ent); - return; - } - // save current handle - NodeHandle parentNode = node; - // find glb node - TupLoc childLoc = node.getLink(0); - while (childLoc != NullTupLoc) { - jam(); - selectNode(signal, node, childLoc); - childLoc = node.getLink(1); - } - // use glb max as new parent min - ent = node.getEnt(node.getOccup() - 1); - nodePopUp(signal, parentNode, pos, ent); - // set up to remove glb max - pos = node.getOccup() - 1; - // fall thru to next case + nodePopDown(node, pos, ent, 0); + return; } - // remove the element - nodePopDown(signal, node, pos, ent); - ndbrequire(node.getChilds() <= 1); - // handle half-leaf - unsigned i; - for (i = 0; i <= 1; i++) { + if (node.getChilds() == 2) { + // underflow in interior node jam(); - TupLoc childLoc = node.getLink(i); - if (childLoc != NullTupLoc) { - // move to child - selectNode(signal, node, childLoc); - // balance of half-leaf parent requires child to be leaf - break; - } + treeRemoveInner(frag, node, pos); + return; } - ndbrequire(node.getChilds() == 0); - // get parent if any - TupLoc parentLoc = node.getLink(2); - NodeHandle parentNode(frag); - i = node.getSide(); - // move all that fits into parent - if (parentLoc != NullTupLoc) { + // remove entry in semi/leaf + nodePopDown(node, pos, ent, 0); + if (node.getLink(0) != NullTupLoc) { jam(); - selectNode(signal, parentNode, node.getLink(2)); - nodeSlide(signal, parentNode, node, i); - // fall thru to next case + treeRemoveSemi(frag, node, 0); + return; } - // non-empty leaf - if (node.getOccup() >= 1) { + if (node.getLink(1) != NullTupLoc) { jam(); + treeRemoveSemi(frag, node, 1); return; } - // remove empty leaf - deleteNode(signal, node); - if (parentLoc == NullTupLoc) { + treeRemoveLeaf(frag, node); +} + +/* + * Remove entry when interior node underflows. There is g.l.b node in + * left subtree to borrow an entry from. The max entry of the g.l.b + * node becomes the min entry of this node. + */ +void +Dbtux::treeRemoveInner(Frag& frag, NodeHandle lubNode, unsigned pos) +{ + TreeHead& tree = frag.m_tree; + TreeEnt ent; + // find g.l.b node + NodeHandle glbNode(frag); + TupLoc loc = lubNode.getLink(0); + do { + jam(); + selectNode(glbNode, loc); + loc = glbNode.getLink(1); + } while (loc != NullTupLoc); + // borrow max entry from semi/leaf + Uint32 scanList = RNIL; + nodePopDown(glbNode, glbNode.getOccup() - 1, ent, &scanList); + nodePopUp(lubNode, pos, ent, scanList); + if (glbNode.getLink(0) != NullTupLoc) { jam(); - // tree is now empty - tree.m_root = NullTupLoc; + treeRemoveSemi(frag, glbNode, 0); return; } - node = parentNode; - node.setLink(i, NullTupLoc); -#ifdef dbtux_min_occup_less_max_occup - // check if we created a half-leaf - if (node.getBalance() == 0) { + treeRemoveLeaf(frag, glbNode); +} + +/* + * Handle semi-leaf after removing an entry. Move entries from leaf to + * semi-leaf to bring semi-leaf occupancy above minOccup, if possible. + * The leaf may become empty. + */ +void +Dbtux::treeRemoveSemi(Frag& frag, NodeHandle semiNode, unsigned i) +{ + TreeHead& tree = frag.m_tree; + ndbrequire(semiNode.getChilds() < 2); + TupLoc leafLoc = semiNode.getLink(i); + NodeHandle leafNode(frag); + selectNode(leafNode, leafLoc); + if (semiNode.getOccup() < tree.m_minOccup) { + jam(); + unsigned cnt = min(leafNode.getOccup(), tree.m_minOccup - semiNode.getOccup()); + nodeSlide(semiNode, leafNode, cnt, i); + if (leafNode.getOccup() == 0) { + // remove empty leaf + jam(); + treeRemoveNode(frag, leafNode); + } + } +} + +/* + * Handle leaf after removing an entry. If parent is semi-leaf, move + * entries to it as in the semi-leaf case. If parent is interior node, + * do nothing. + */ +void +Dbtux::treeRemoveLeaf(Frag& frag, NodeHandle leafNode) +{ + TreeHead& tree = frag.m_tree; + TupLoc parentLoc = leafNode.getLink(2); + if (parentLoc != NullTupLoc) { jam(); - // move entries from the other child - TupLoc childLoc = node.getLink(1 - i); - NodeHandle childNode(frag); - selectNode(signal, childNode, childLoc); - nodeSlide(signal, node, childNode, 1 - i); - if (childNode.getOccup() == 0) { + NodeHandle parentNode(frag); + selectNode(parentNode, parentLoc); + unsigned i = leafNode.getSide(); + if (parentNode.getLink(1 - i) == NullTupLoc) { + // parent is semi-leaf jam(); - deleteNode(signal, childNode); - node.setLink(1 - i, NullTupLoc); - // we are balanced again but our parent balance changes by -1 - parentLoc = node.getLink(2); - if (parentLoc == NullTupLoc) { + if (parentNode.getOccup() < tree.m_minOccup) { jam(); - return; + unsigned cnt = min(leafNode.getOccup(), tree.m_minOccup - parentNode.getOccup()); + nodeSlide(parentNode, leafNode, cnt, i); } - // fix side and become parent - i = node.getSide(); - selectNode(signal, node, parentLoc); } } -#endif - // re-balance tree at each node + if (leafNode.getOccup() == 0) { + jam(); + // remove empty leaf + treeRemoveNode(frag, leafNode); + } +} + +/* + * Remove empty leaf. + */ +void +Dbtux::treeRemoveNode(Frag& frag, NodeHandle leafNode) +{ + TreeHead& tree = frag.m_tree; + ndbrequire(leafNode.getChilds() == 0); + TupLoc parentLoc = leafNode.getLink(2); + unsigned i = leafNode.getSide(); + deleteNode(leafNode); + if (parentLoc != NullTupLoc) { + jam(); + NodeHandle parentNode(frag); + selectNode(parentNode, parentLoc); + parentNode.setLink(i, NullTupLoc); + // re-balance the tree + treeRemoveRebalance(frag, parentNode, i); + return; + } + // tree is now empty + tree.m_root = NullTupLoc; +} + +/* + * Re-balance tree after removing a node. The process starts with the + * parent of the removed node. + */ +void +Dbtux::treeRemoveRebalance(Frag& frag, NodeHandle node, unsigned i) +{ while (true) { // height of subtree i has decreased by 1 int j = (i == 0 ? -1 : +1); @@ -255,19 +343,19 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) jam(); // child on the other side NodeHandle childNode(frag); - selectNode(signal, childNode, node.getLink(1 - i)); + selectNode(childNode, node.getLink(1 - i)); int b2 = childNode.getBalance(); if (b2 == b) { jam(); - treeRotateSingle(signal, frag, node, 1 - i); + treeRotateSingle(frag, node, 1 - i); // height of tree decreased and propagates up } else if (b2 == -b) { jam(); - treeRotateDouble(signal, frag, node, 1 - i); + treeRotateDouble(frag, node, 1 - i); // height of tree decreased and propagates up } else { jam(); - treeRotateSingle(signal, frag, node, 1 - i); + treeRotateSingle(frag, node, 1 - i); // height of tree did not change - done return; } @@ -281,7 +369,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) return; } i = node.getSide(); - selectNode(signal, node, parentLoc); + selectNode(node, parentLoc); } } @@ -302,10 +390,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) * all optional. If 4 are there it changes side. */ void -Dbtux::treeRotateSingle(Signal* signal, - Frag& frag, - NodeHandle& node, - unsigned i) +Dbtux::treeRotateSingle(Frag& frag, NodeHandle& node, unsigned i) { ndbrequire(i <= 1); /* @@ -325,7 +410,7 @@ Dbtux::treeRotateSingle(Signal* signal, */ TupLoc loc3 = node5.getLink(i); NodeHandle node3(frag); - selectNode(signal, node3, loc3); + selectNode(node3, loc3); const int bal3 = node3.getBalance(); /* 2 must always be there but is not changed. Thus we mereley check that it @@ -342,7 +427,7 @@ Dbtux::treeRotateSingle(Signal* signal, NodeHandle node4(frag); if (loc4 != NullTupLoc) { jam(); - selectNode(signal, node4, loc4); + selectNode(node4, loc4); ndbrequire(node4.getSide() == (1 - i) && node4.getLink(2) == loc3); node4.setSide(i); @@ -377,7 +462,7 @@ Dbtux::treeRotateSingle(Signal* signal, if (loc0 != NullTupLoc) { jam(); NodeHandle node0(frag); - selectNode(signal, node0, loc0); + selectNode(node0, loc0); node0.setLink(side5, loc3); } else { jam(); @@ -514,8 +599,10 @@ Dbtux::treeRotateSingle(Signal* signal, * */ void -Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i) +Dbtux::treeRotateDouble(Frag& frag, NodeHandle& node, unsigned i) { + TreeHead& tree = frag.m_tree; + // old top node NodeHandle node6 = node; const TupLoc loc6 = node6.m_loc; @@ -526,13 +613,13 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i // level 1 TupLoc loc2 = node6.getLink(i); NodeHandle node2(frag); - selectNode(signal, node2, loc2); + selectNode(node2, loc2); const int bal2 = node2.getBalance(); // level 2 TupLoc loc4 = node2.getLink(1 - i); NodeHandle node4(frag); - selectNode(signal, node4, loc4); + selectNode(node4, loc4); const int bal4 = node4.getBalance(); ndbrequire(i <= 1); @@ -549,23 +636,26 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i // fill up leaf before it becomes internal if (loc3 == NullTupLoc && loc5 == NullTupLoc) { jam(); - TreeHead& tree = frag.m_tree; - nodeSlide(signal, node4, node2, i); - // implied by rule of merging half-leaves with leaves - ndbrequire(node4.getOccup() >= tree.m_minOccup); - ndbrequire(node2.getOccup() != 0); + if (node4.getOccup() < tree.m_minOccup) { + jam(); + unsigned cnt = tree.m_minOccup - node4.getOccup(); + ndbrequire(cnt < node2.getOccup()); + nodeSlide(node4, node2, cnt, i); + ndbrequire(node4.getOccup() >= tree.m_minOccup); + ndbrequire(node2.getOccup() != 0); + } } else { if (loc3 != NullTupLoc) { jam(); NodeHandle node3(frag); - selectNode(signal, node3, loc3); + selectNode(node3, loc3); node3.setLink(2, loc2); node3.setSide(1 - i); } if (loc5 != NullTupLoc) { jam(); NodeHandle node5(frag); - selectNode(signal, node5, loc5); + selectNode(node5, loc5); node5.setLink(2, node6.m_loc); node5.setSide(i); } @@ -588,7 +678,7 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i if (loc0 != NullTupLoc) { jam(); - selectNode(signal, node0, loc0); + selectNode(node0, loc0); node0.setLink(side6, loc4); } else { jam(); diff --git a/ndb/src/kernel/blocks/dbtux/Times.txt b/ndb/src/kernel/blocks/dbtux/Times.txt index 8fbb695ef82..bf466d12604 100644 --- a/ndb/src/kernel/blocks/dbtux/Times.txt +++ b/ndb/src/kernel/blocks/dbtux/Times.txt @@ -29,6 +29,7 @@ shows ms / 1000 rows for each and index time overhead samples 10% of all PKs (100,000 pk reads, 100,000 scans) the "pct" values are from more accurate total times (not shown) +comments [ ... ] are after the case 040616 mc02/a 40 ms 87 ms 114 pct mc02/b 51 ms 128 ms 148 pct @@ -76,13 +77,12 @@ optim 13 mc02/a 40 ms 57 ms 42 pct mc02/c 9 ms 13 ms 50 pct mc02/d 170 ms 256 ms 50 pct -after wl-1884 store all-NULL keys (the tests have pctnull=10 per column) - optim 13 mc02/a 39 ms 59 ms 50 pct mc02/b 47 ms 77 ms 61 pct mc02/c 9 ms 12 ms 44 pct mc02/d 246 ms 289 ms 17 pct +[ after wl-1884 store all-NULL keys (the tests have pctnull=10 per column) ] [ case d: bug in testOIBasic killed PK read performance ] optim 14 mc02/a 41 ms 60 ms 44 pct @@ -98,8 +98,7 @@ none mc02/a 35 ms 60 ms 71 pct mc02/c 5 ms 12 ms 106 pct mc02/d 165 ms 238 ms 44 pct -[ johan re-installed mc02 as fedora gcc-3.3.2 ] -[ case c: table scan has improved... ] +[ johan re-installed mc02 as fedora gcc-3.3.2, tux uses more C++ stuff than tup] charsets mc02/a 35 ms 60 ms 71 pct mc02/b 42 ms 84 ms 97 pct @@ -118,6 +117,19 @@ optim 15 mc02/a 34 ms 60 ms 72 pct optim 16 mc02/a 34 ms 53 ms 53 pct mc02/b 42 ms 75 ms 75 pct -[ case a, b: binary search of bounding node when adding entry ] +[ binary search of bounding node when adding entry ] + +none mc02/a 35 ms 53 ms 51 pct + mc02/b 42 ms 75 ms 76 pct + +[ rewrote treeAdd / treeRemove ] + +optim 17 mc02/a 35 ms 52 ms 49 pct + mc02/b 43 ms 75 ms 75 pct + +[ allow slack (2) in interior nodes - almost no effect?? ] + +wl-1942 mc02/a 35 ms 52 ms 49 pct + mc02/b 42 ms 75 ms 76 pct vim: set et: diff --git a/ndb/test/ndbapi/testOIBasic.cpp b/ndb/test/ndbapi/testOIBasic.cpp index 214816a1ba1..c94dbf0fb19 100644 --- a/ndb/test/ndbapi/testOIBasic.cpp +++ b/ndb/test/ndbapi/testOIBasic.cpp @@ -42,7 +42,7 @@ struct Opt { CHARSET_INFO* m_cs; bool m_dups; NdbDictionary::Object::FragmentType m_fragtype; - unsigned m_idxloop; + unsigned m_subsubloop; const char* m_index; unsigned m_loop; bool m_nologging; @@ -66,7 +66,7 @@ struct Opt { m_cs(0), m_dups(false), m_fragtype(NdbDictionary::Object::FragUndefined), - m_idxloop(4), + m_subsubloop(4), m_index(0), m_loop(1), m_nologging(false), @@ -79,7 +79,7 @@ struct Opt { m_seed(0), m_subloop(4), m_table(0), - m_threads(4), + m_threads(6), // table + 5 indexes m_v(1) { } }; @@ -208,6 +208,8 @@ struct Par : public Opt { Set& set() const { assert(m_set != 0); return *m_set; } Tmr* m_tmr; Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; } + unsigned m_lno; + unsigned m_slno; unsigned m_totrows; // value calculation unsigned m_range; @@ -226,6 +228,8 @@ struct Par : public Opt { m_tab(0), m_set(0), m_tmr(0), + m_lno(0), + m_slno(0), m_totrows(m_threads * m_rows), m_range(m_rows), m_pctrange(0), @@ -2069,7 +2073,8 @@ pkinsert(Par par) CHK(con.startTransaction() == 0); Lst lst; for (unsigned j = 0; j < par.m_rows; j++) { - unsigned i = thrrow(par, j); + unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows); + unsigned i = thrrow(par, j2); set.lock(); if (set.exist(i) || set.pending(i)) { set.unlock(); @@ -2174,7 +2179,8 @@ pkdelete(Par par) Lst lst; bool deadlock = false; for (unsigned j = 0; j < par.m_rows; j++) { - unsigned i = thrrow(par, j); + unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows); + unsigned i = thrrow(par, j2); set.lock(); if (! set.exist(i) || set.pending(i)) { set.unlock(); @@ -2398,7 +2404,7 @@ static int scanreadindex(Par par, const ITab& itab) { const Tab& tab = par.tab(); - for (unsigned i = 0; i < par.m_idxloop; i++) { + for (unsigned i = 0; i < par.m_subsubloop; i++) { BSet bset(tab, itab, par.m_rows); bset.calc(par); CHK(scanreadindex(par, itab, bset) == 0); @@ -2645,7 +2651,7 @@ static int scanupdateindex(Par par, const ITab& itab) { const Tab& tab = par.tab(); - for (unsigned i = 0; i < par.m_idxloop; i++) { + for (unsigned i = 0; i < par.m_subsubloop; i++) { BSet bset(tab, itab, par.m_rows); bset.calc(par); CHK(scanupdateindex(par, itab, bset) == 0); @@ -2686,6 +2692,53 @@ readverify(Par par) } static int +readverifyfull(Par par) +{ + par.m_verify = true; + if (par.m_no == 0) + CHK(scanreadtable(par) == 0); + else { + const Tab& tab = par.tab(); + unsigned i = par.m_no; + if (i <= tab.m_itabs && useindex(i)) { + const ITab& itab = tab.m_itab[i - 1]; + BSet bset(tab, itab, par.m_rows); + CHK(scanreadindex(par, itab, bset) == 0); + } + } + return 0; +} + +static int +pkops(Par par) +{ + par.m_randomkey = true; + for (unsigned i = 0; i < par.m_subsubloop; i++) { + unsigned sel = urandom(10); + if (par.m_slno % 2 == 0) { + // favor insert + if (sel < 8) { + CHK(pkinsert(par) == 0); + } else if (sel < 9) { + CHK(pkupdate(par) == 0); + } else { + CHK(pkdelete(par) == 0); + } + } else { + // favor delete + if (sel < 1) { + CHK(pkinsert(par) == 0); + } else if (sel < 2) { + CHK(pkupdate(par) == 0); + } else { + CHK(pkdelete(par) == 0); + } + } + } + return 0; +} + +static int pkupdatescanread(Par par) { par.m_dups = true; @@ -2930,6 +2983,8 @@ runstep(Par par, const char* fname, TFunc func, unsigned mode) thr.m_par.m_tab = par.m_tab; thr.m_par.m_set = par.m_set; thr.m_par.m_tmr = par.m_tmr; + thr.m_par.m_lno = par.m_lno; + thr.m_par.m_slno = par.m_slno; thr.m_func = func; thr.start(); } @@ -2953,8 +3008,8 @@ tbuild(Par par) RUNSTEP(par, droptable, ST); RUNSTEP(par, createtable, ST); RUNSTEP(par, invalidatetable, MT); - for (unsigned i = 0; i < par.m_subloop; i++) { - if (i % 2 == 0) { + for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) { + if (par.m_slno % 2 == 0) { RUNSTEP(par, createindex, ST); RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, pkinsert, MT); @@ -2963,9 +3018,10 @@ tbuild(Par par) RUNSTEP(par, createindex, ST); RUNSTEP(par, invalidateindex, MT); } - RUNSTEP(par, readverify, MT); + RUNSTEP(par, pkupdate, MT); + RUNSTEP(par, readverifyfull, MT); RUNSTEP(par, pkdelete, MT); - RUNSTEP(par, readverify, MT); + RUNSTEP(par, readverifyfull, MT); RUNSTEP(par, dropindex, ST); } return 0; @@ -2977,11 +3033,27 @@ tpkops(Par par) RUNSTEP(par, droptable, ST); RUNSTEP(par, createtable, ST); RUNSTEP(par, invalidatetable, MT); + RUNSTEP(par, createindex, ST); + RUNSTEP(par, invalidateindex, MT); + for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) { + RUNSTEP(par, pkops, MT); + LL2("rows=" << par.set().count()); + RUNSTEP(par, readverifyfull, MT); + } + return 0; +} + +static int +tpkopsread(Par par) +{ + RUNSTEP(par, droptable, ST); + RUNSTEP(par, createtable, ST); + RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, pkinsert, MT); RUNSTEP(par, createindex, ST); RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, readverify, ST); - for (unsigned i = 0; i < par.m_subloop; i++) { + for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) { RUNSTEP(par, pkupdatescanread, MT); RUNSTEP(par, readverify, ST); } @@ -3000,7 +3072,7 @@ tmixedops(Par par) RUNSTEP(par, createindex, ST); RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, readverify, ST); - for (unsigned i = 0; i < par.m_subloop; i++) { + for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) { RUNSTEP(par, mixedoperations, MT); RUNSTEP(par, readverify, ST); } @@ -3014,7 +3086,7 @@ tbusybuild(Par par) RUNSTEP(par, createtable, ST); RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, pkinsert, MT); - for (unsigned i = 0; i < par.m_subloop; i++) { + for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) { RUNSTEP(par, pkupdateindexbuild, MT); RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, readverify, ST); @@ -3030,7 +3102,7 @@ ttimebuild(Par par) RUNSTEP(par, droptable, ST); RUNSTEP(par, createtable, ST); RUNSTEP(par, invalidatetable, MT); - for (unsigned i = 0; i < par.m_subloop; i++) { + for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) { RUNSTEP(par, pkinsert, MT); t1.on(); RUNSTEP(par, createindex, ST); @@ -3049,7 +3121,7 @@ ttimemaint(Par par) RUNSTEP(par, droptable, ST); RUNSTEP(par, createtable, ST); RUNSTEP(par, invalidatetable, MT); - for (unsigned i = 0; i < par.m_subloop; i++) { + for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) { RUNSTEP(par, pkinsert, MT); t1.on(); RUNSTEP(par, pkupdate, MT); @@ -3074,7 +3146,7 @@ ttimescan(Par par) RUNSTEP(par, droptable, ST); RUNSTEP(par, createtable, ST); RUNSTEP(par, invalidatetable, MT); - for (unsigned i = 0; i < par.m_subloop; i++) { + for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) { RUNSTEP(par, pkinsert, MT); RUNSTEP(par, createindex, ST); par.m_tmr = &t1; @@ -3096,7 +3168,7 @@ ttimepkread(Par par) RUNSTEP(par, droptable, ST); RUNSTEP(par, createtable, ST); RUNSTEP(par, invalidatetable, MT); - for (unsigned i = 0; i < par.m_subloop; i++) { + for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) { RUNSTEP(par, pkinsert, MT); RUNSTEP(par, createindex, ST); par.m_tmr = &t1; @@ -3132,9 +3204,10 @@ struct TCase { static const TCase tcaselist[] = { TCase("a", tbuild, "index build"), - TCase("b", tpkops, "pk operations and scan reads"), - TCase("c", tmixedops, "pk operations and scan operations"), - TCase("d", tbusybuild, "pk operations and index build"), + TCase("b", tpkops, "pk operations"), + TCase("c", tpkopsread, "pk operations and scan reads"), + TCase("d", tmixedops, "pk operations and scan operations"), + TCase("e", tbusybuild, "pk operations and index build"), TCase("t", ttimebuild, "time index build"), TCase("u", ttimemaint, "time index maintenance"), TCase("v", ttimescan, "time full scan table vs index on pk"), @@ -3192,10 +3265,10 @@ runtest(Par par) Thr& thr = *g_thrlist[n]; assert(thr.m_thread != 0); } - for (unsigned l = 0; par.m_loop == 0 || l < par.m_loop; l++) { - LL1("loop " << l); + for (par.m_lno = 0; par.m_loop == 0 || par.m_lno < par.m_loop; par.m_lno++) { + LL1("loop " << par.m_lno); if (par.m_seed == 0) - srandom(l); + srandom(par.m_lno); for (unsigned i = 0; i < tcasecount; i++) { const TCase& tcase = tcaselist[i]; if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0) |