summaryrefslogtreecommitdiff
path: root/ndb
diff options
context:
space:
mode:
authorunknown <pekka@mysql.com>2004-10-18 15:39:20 +0200
committerunknown <pekka@mysql.com>2004-10-18 15:39:20 +0200
commit4cbb9917cb41d6ee2b258252224286e985380bfc (patch)
tree5cbdced5cdce5057e36fbfe81749f75256d981cf /ndb
parent012de9d742c73de66683997a904acb83d967575a (diff)
parent4a9f774560846fe45bd43c221ba276153f271b60 (diff)
downloadmariadb-git-4cbb9917cb41d6ee2b258252224286e985380bfc.tar.gz
Merge pnousiainen@bk-internal.mysql.com:/home/bk/mysql-4.1-ndb
into mysql.com:/space/pekka/ndb/version/my41-tux
Diffstat (limited to 'ndb')
-rw-r--r--ndb/include/kernel/ndb_limits.h2
-rw-r--r--ndb/src/kernel/blocks/dbtux/Dbtux.hpp61
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp15
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp14
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp4
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp406
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp46
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp14
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp396
-rw-r--r--ndb/src/kernel/blocks/dbtux/Times.txt22
-rw-r--r--ndb/test/ndbapi/testOIBasic.cpp121
11 files changed, 690 insertions, 411 deletions
diff --git a/ndb/include/kernel/ndb_limits.h b/ndb/include/kernel/ndb_limits.h
index 9411a98f091..00378bcab81 100644
--- a/ndb/include/kernel/ndb_limits.h
+++ b/ndb/include/kernel/ndb_limits.h
@@ -110,7 +110,7 @@
*/
#define MAX_TTREE_NODE_SIZE 64 // total words in node
#define MAX_TTREE_PREF_SIZE 4 // words in min prefix
-#define MAX_TTREE_NODE_SLACK 3 // diff between max and min occupancy
+#define MAX_TTREE_NODE_SLACK 2 // diff between max and min occupancy
/*
* Blobs.
diff --git a/ndb/src/kernel/blocks/dbtux/Dbtux.hpp b/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
index ad748d2636f..dc95bf8e8ea 100644
--- a/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
+++ b/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
@@ -32,7 +32,6 @@
// signal classes
#include <signaldata/DictTabInfo.hpp>
#include <signaldata/TuxContinueB.hpp>
-#include <signaldata/BuildIndx.hpp>
#include <signaldata/TupFrag.hpp>
#include <signaldata/AlterIndx.hpp>
#include <signaldata/DropTab.hpp>
@@ -478,7 +477,7 @@ private:
Uint16 m_numAttrs;
bool m_storeNullKey;
TreeHead m_tree;
- TupLoc m_freeLoc; // one node pre-allocated for insert
+ TupLoc m_freeLoc; // list of free index nodes
DLList<ScanOp> m_scanList; // current scans on this fragment
Uint32 m_tupIndexFragPtrI;
Uint32 m_tupTableFragPtrI[2];
@@ -582,17 +581,24 @@ private:
* DbtuxNode.cpp
*/
int allocNode(Signal* signal, NodeHandle& node);
- void selectNode(Signal* signal, NodeHandle& node, TupLoc loc);
- void insertNode(Signal* signal, NodeHandle& node);
- void deleteNode(Signal* signal, NodeHandle& node);
- void setNodePref(Signal* signal, NodeHandle& node);
+ void selectNode(NodeHandle& node, TupLoc loc);
+ void insertNode(NodeHandle& node);
+ void deleteNode(NodeHandle& node);
+ void setNodePref(NodeHandle& node);
// node operations
- void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent);
- void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
- void nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
- void nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
- void nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned i);
+ void nodePushUp(NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList);
+ void nodePushUpScans(NodeHandle& node, unsigned pos);
+ void nodePopDown(NodeHandle& node, unsigned pos, TreeEnt& en, Uint32* scanList);
+ void nodePopDownScans(NodeHandle& node, unsigned pos);
+ void nodePushDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList);
+ void nodePushDownScans(NodeHandle& node, unsigned pos);
+ void nodePopUp(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList);
+ void nodePopUpScans(NodeHandle& node, unsigned pos);
+ void nodeSlide(NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i);
// scans linked to node
+ void addScanList(NodeHandle& node, unsigned pos, Uint32 scanList);
+ void removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList);
+ void moveScanList(NodeHandle& node, unsigned pos);
void linkScan(NodeHandle& node, ScanOpPtr scanPtr);
void unlinkScan(NodeHandle& node, ScanOpPtr scanPtr);
bool islinkScan(NodeHandle& node, ScanOpPtr scanPtr);
@@ -600,10 +606,21 @@ private:
/*
* DbtuxTree.cpp
*/
- void treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent);
- void treeRemove(Signal* signal, Frag& frag, TreePos treePos);
- void treeRotateSingle(Signal* signal, Frag& frag, NodeHandle& node, unsigned i);
- void treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i);
+ // add entry
+ void treeAdd(Frag& frag, TreePos treePos, TreeEnt ent);
+ void treeAddFull(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent);
+ void treeAddNode(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent, NodeHandle parentNode, unsigned i);
+ void treeAddRebalance(Frag& frag, NodeHandle node, unsigned i);
+ // remove entry
+ void treeRemove(Frag& frag, TreePos treePos);
+ void treeRemoveInner(Frag& frag, NodeHandle lubNode, unsigned pos);
+ void treeRemoveSemi(Frag& frag, NodeHandle node, unsigned i);
+ void treeRemoveLeaf(Frag& frag, NodeHandle node);
+ void treeRemoveNode(Frag& frag, NodeHandle node);
+ void treeRemoveRebalance(Frag& frag, NodeHandle node, unsigned i);
+ // rotate
+ void treeRotateSingle(Frag& frag, NodeHandle& node, unsigned i);
+ void treeRotateDouble(Frag& frag, NodeHandle& node, unsigned i);
/*
* DbtuxScan.cpp
@@ -615,9 +632,9 @@ private:
void execACCKEYCONF(Signal* signal);
void execACCKEYREF(Signal* signal);
void execACC_ABORTCONF(Signal* signal);
- void scanFirst(Signal* signal, ScanOpPtr scanPtr);
- void scanNext(Signal* signal, ScanOpPtr scanPtr);
- bool scanVisible(Signal* signal, ScanOpPtr scanPtr, TreeEnt ent);
+ void scanFirst(ScanOpPtr scanPtr);
+ void scanNext(ScanOpPtr scanPtr);
+ bool scanVisible(ScanOpPtr scanPtr, TreeEnt ent);
void scanClose(Signal* signal, ScanOpPtr scanPtr);
void addAccLockOp(ScanOp& scan, Uint32 accLockOp);
void removeAccLockOp(ScanOp& scan, Uint32 accLockOp);
@@ -626,9 +643,9 @@ private:
/*
* DbtuxSearch.cpp
*/
- void searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
- void searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
- void searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
+ void searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
+ void searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
+ void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
/*
* DbtuxCmp.cpp
@@ -652,7 +669,7 @@ private:
PrintPar();
};
void printTree(Signal* signal, Frag& frag, NdbOut& out);
- void printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par);
+ void printNode(Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par);
friend class NdbOut& operator<<(NdbOut&, const TupLoc&);
friend class NdbOut& operator<<(NdbOut&, const TreeEnt&);
friend class NdbOut& operator<<(NdbOut&, const TreeNode&);
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
index 63c7f5d34a1..6c2a29f54cc 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
@@ -98,7 +98,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out)
strcpy(par.m_path, ".");
par.m_side = 2;
par.m_parent = NullTupLoc;
- printNode(signal, frag, out, tree.m_root, par);
+ printNode(frag, out, tree.m_root, par);
out.m_out->flush();
if (! par.m_ok) {
if (debugFile == 0) {
@@ -114,7 +114,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out)
}
void
-Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par)
+Dbtux::printNode(Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par)
{
if (loc == NullTupLoc) {
par.m_depth = 0;
@@ -122,7 +122,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
}
TreeHead& tree = frag.m_tree;
NodeHandle node(frag);
- selectNode(signal, node, loc);
+ selectNode(node, loc);
out << par.m_path << " " << node << endl;
// check children
PrintPar cpar[2];
@@ -132,7 +132,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
cpar[i].m_side = i;
cpar[i].m_depth = 0;
cpar[i].m_parent = loc;
- printNode(signal, frag, out, node.getLink(i), cpar[i]);
+ printNode(frag, out, node.getLink(i), cpar[i]);
if (! cpar[i].m_ok) {
par.m_ok = false;
}
@@ -178,16 +178,19 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
out << "occupancy " << node.getOccup() << " of interior node";
out << " less than min " << tree.m_minOccup << endl;
}
- // check missed half-leaf/leaf merge
+#ifdef dbtux_totally_groks_t_trees
+ // check missed semi-leaf/leaf merge
for (unsigned i = 0; i <= 1; i++) {
if (node.getLink(i) != NullTupLoc &&
node.getLink(1 - i) == NullTupLoc &&
- node.getOccup() + cpar[i].m_occup <= tree.m_maxOccup) {
+ // our semi-leaf seems to satify interior minOccup condition
+ node.getOccup() < tree.m_minOccup) {
par.m_ok = false;
out << par.m_path << sep;
out << "missed merge with child " << i << endl;
}
}
+#endif
// check inline prefix
{ ConstData data1 = node.getPref();
Uint32 data2[MaxPrefSize];
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp
index 565e64f9aeb..30afb51e7d7 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp
@@ -117,7 +117,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
switch (opCode) {
case TuxMaintReq::OpAdd:
jam();
- searchToAdd(signal, frag, c_searchKey, ent, treePos);
+ searchToAdd(frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE
if (debugFlags & DebugMaint) {
debugOut << treePos << (treePos.m_match ? " - error" : "") << endl;
@@ -133,8 +133,8 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
break;
}
/*
- * At most one new node is inserted in the operation. We keep one
- * free node pre-allocated so the operation cannot fail.
+ * At most one new node is inserted in the operation. Pre-allocate
+ * it so that the operation cannot fail.
*/
if (frag.m_freeLoc == NullTupLoc) {
jam();
@@ -144,14 +144,16 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
jam();
break;
}
+ // link to freelist
+ node.setLink(0, frag.m_freeLoc);
frag.m_freeLoc = node.m_loc;
ndbrequire(frag.m_freeLoc != NullTupLoc);
}
- treeAdd(signal, frag, treePos, ent);
+ treeAdd(frag, treePos, ent);
break;
case TuxMaintReq::OpRemove:
jam();
- searchToRemove(signal, frag, c_searchKey, ent, treePos);
+ searchToRemove(frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE
if (debugFlags & DebugMaint) {
debugOut << treePos << (! treePos.m_match ? " - error" : "") << endl;
@@ -166,7 +168,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
}
break;
}
- treeRemove(signal, frag, treePos);
+ treeRemove(frag, treePos);
break;
default:
ndbrequire(false);
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp
index e0b7fec19cf..1577c5045e0 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp
@@ -211,11 +211,7 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal)
// make these configurable later
tree.m_nodeSize = MAX_TTREE_NODE_SIZE;
tree.m_prefSize = MAX_TTREE_PREF_SIZE;
-#ifdef dbtux_min_occup_less_max_occup
const unsigned maxSlack = MAX_TTREE_NODE_SLACK;
-#else
- const unsigned maxSlack = 0;
-#endif
// size up to and including first 2 entries
const unsigned pref = tree.getSize(AccPref);
if (! (pref <= tree.m_nodeSize)) {
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
index f155917caf5..389192fd0cf 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
@@ -42,7 +42,7 @@ Dbtux::allocNode(Signal* signal, NodeHandle& node)
* Set handle to point to existing node.
*/
void
-Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc)
+Dbtux::selectNode(NodeHandle& node, TupLoc loc)
{
Frag& frag = node.m_frag;
ndbrequire(loc != NullTupLoc);
@@ -57,15 +57,15 @@ Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc)
}
/*
- * Set handle to point to new node. Uses the pre-allocated node.
+ * Set handle to point to new node. Uses a pre-allocated node.
*/
void
-Dbtux::insertNode(Signal* signal, NodeHandle& node)
+Dbtux::insertNode(NodeHandle& node)
{
Frag& frag = node.m_frag;
- TupLoc loc = frag.m_freeLoc;
- frag.m_freeLoc = NullTupLoc;
- selectNode(signal, node, loc);
+ // unlink from freelist
+ selectNode(node, frag.m_freeLoc);
+ frag.m_freeLoc = node.getLink(0);
new (node.m_node) TreeNode();
#ifdef VM_TRACE
TreeHead& tree = frag.m_tree;
@@ -76,20 +76,17 @@ Dbtux::insertNode(Signal* signal, NodeHandle& node)
}
/*
- * Delete existing node.
+ * Delete existing node. Simply put it on the freelist.
*/
void
-Dbtux::deleteNode(Signal* signal, NodeHandle& node)
+Dbtux::deleteNode(NodeHandle& node)
{
Frag& frag = node.m_frag;
ndbrequire(node.getOccup() == 0);
- TupLoc loc = node.m_loc;
- Uint32 pageId = loc.getPageId();
- Uint32 pageOffset = loc.getPageOffset();
- Uint32* node32 = reinterpret_cast<Uint32*>(node.m_node);
- c_tup->tuxFreeNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
- jamEntry();
- // invalidate handle and storage
+ // link to freelist
+ node.setLink(0, frag.m_freeLoc);
+ frag.m_freeLoc = node.m_loc;
+ // invalidate the handle
node.m_loc = NullTupLoc;
node.m_node = 0;
}
@@ -99,7 +96,7 @@ Dbtux::deleteNode(Signal* signal, NodeHandle& node)
* attribute headers for now. XXX use null mask instead
*/
void
-Dbtux::setNodePref(Signal* signal, NodeHandle& node)
+Dbtux::setNodePref(NodeHandle& node)
{
const Frag& frag = node.m_frag;
const TreeHead& tree = frag.m_tree;
@@ -117,18 +114,45 @@ Dbtux::setNodePref(Signal* signal, NodeHandle& node)
* v
* A B C D E _ _ => A B C X D E _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
+ *
+ * Add list of scans at the new entry.
*/
void
-Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent)
+Dbtux::nodePushUp(NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList)
{
Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup();
ndbrequire(occup < tree.m_maxOccup && pos <= occup);
- // fix scans
+ // fix old scans
+ if (node.getNodeScan() != RNIL)
+ nodePushUpScans(node, pos);
+ // fix node
+ TreeEnt* const entList = tree.getEntList(node.m_node);
+ entList[occup] = entList[0];
+ TreeEnt* const tmpList = entList + 1;
+ for (unsigned i = occup; i > pos; i--) {
+ jam();
+ tmpList[i] = tmpList[i - 1];
+ }
+ tmpList[pos] = ent;
+ entList[0] = entList[occup + 1];
+ node.setOccup(occup + 1);
+ // add new scans
+ if (scanList != RNIL)
+ addScanList(node, pos, scanList);
+ // fix prefix
+ if (occup == 0 || pos == 0)
+ setNodePref(node);
+}
+
+void
+Dbtux::nodePushUpScans(NodeHandle& node, unsigned pos)
+{
+ const unsigned occup = node.getOccup();
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
- while (scanPtr.i != RNIL) {
+ do {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
@@ -144,21 +168,7 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
scanPos.m_pos++;
}
scanPtr.i = scanPtr.p->m_nodeScan;
- }
- // fix node
- TreeEnt* const entList = tree.getEntList(node.m_node);
- entList[occup] = entList[0];
- TreeEnt* const tmpList = entList + 1;
- for (unsigned i = occup; i > pos; i--) {
- jam();
- tmpList[i] = tmpList[i - 1];
- }
- tmpList[pos] = ent;
- entList[0] = entList[occup + 1];
- node.setOccup(occup + 1);
- // fix prefix
- if (occup == 0 || pos == 0)
- setNodePref(signal, node);
+ } while (scanPtr.i != RNIL);
}
/*
@@ -169,42 +179,55 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
* ^ ^
* A B C D E F _ => A B C E F _ _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
+ *
+ * Scans at removed entry are returned if non-zero location is passed or
+ * else moved forward.
*/
void
-Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
+Dbtux::nodePopDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32* scanList)
{
Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup);
- ScanOpPtr scanPtr;
- // move scans whose entry disappears
- scanPtr.i = node.getNodeScan();
- while (scanPtr.i != RNIL) {
+ if (node.getNodeScan() != RNIL) {
+ // remove or move scans at this position
+ if (scanList == 0)
+ moveScanList(node, pos);
+ else
+ removeScanList(node, pos, *scanList);
+ // fix other scans
+ if (node.getNodeScan() != RNIL)
+ nodePopDownScans(node, pos);
+ }
+ // fix node
+ TreeEnt* const entList = tree.getEntList(node.m_node);
+ entList[occup] = entList[0];
+ TreeEnt* const tmpList = entList + 1;
+ ent = tmpList[pos];
+ for (unsigned i = pos; i < occup - 1; i++) {
jam();
- c_scanOpPool.getPtr(scanPtr);
- TreePos& scanPos = scanPtr.p->m_scanPos;
- ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
- const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
- if (scanPos.m_pos == pos) {
- jam();
-#ifdef VM_TRACE
- if (debugFlags & DebugScan) {
- debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
- debugOut << "At popDown pos=" << pos << " " << node << endl;
- }
-#endif
- scanNext(signal, scanPtr);
- }
- scanPtr.i = nextPtrI;
+ tmpList[i] = tmpList[i + 1];
}
- // fix other scans
+ entList[0] = entList[occup - 1];
+ node.setOccup(occup - 1);
+ // fix prefix
+ if (occup != 1 && pos == 0)
+ setNodePref(node);
+}
+
+void
+Dbtux::nodePopDownScans(NodeHandle& node, unsigned pos)
+{
+ const unsigned occup = node.getOccup();
+ ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
- while (scanPtr.i != RNIL) {
+ do {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
+ // handled before
ndbrequire(scanPos.m_pos != pos);
if (scanPos.m_pos > pos) {
jam();
@@ -217,21 +240,7 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
scanPos.m_pos--;
}
scanPtr.i = scanPtr.p->m_nodeScan;
- }
- // fix node
- TreeEnt* const entList = tree.getEntList(node.m_node);
- entList[occup] = entList[0];
- TreeEnt* const tmpList = entList + 1;
- ent = tmpList[pos];
- for (unsigned i = pos; i < occup - 1; i++) {
- jam();
- tmpList[i] = tmpList[i + 1];
- }
- entList[0] = entList[occup - 1];
- node.setOccup(occup - 1);
- // fix prefix
- if (occup != 1 && pos == 0)
- setNodePref(signal, node);
+ } while (scanPtr.i != RNIL);
}
/*
@@ -242,43 +251,52 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
* ^ v ^
* A B C D E _ _ => B C D X E _ _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
+ *
+ * Return list of scans at the removed position 0.
*/
void
-Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
+Dbtux::nodePushDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList)
{
Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup);
- ScanOpPtr scanPtr;
- // move scans whose entry disappears
- scanPtr.i = node.getNodeScan();
- while (scanPtr.i != RNIL) {
+ if (node.getNodeScan() != RNIL) {
+ // remove scans at 0
+ removeScanList(node, 0, scanList);
+ // fix other scans
+ if (node.getNodeScan() != RNIL)
+ nodePushDownScans(node, pos);
+ }
+ // fix node
+ TreeEnt* const entList = tree.getEntList(node.m_node);
+ entList[occup] = entList[0];
+ TreeEnt* const tmpList = entList + 1;
+ TreeEnt oldMin = tmpList[0];
+ for (unsigned i = 0; i < pos; i++) {
jam();
- c_scanOpPool.getPtr(scanPtr);
- TreePos& scanPos = scanPtr.p->m_scanPos;
- ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
- const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
- if (scanPos.m_pos == 0) {
- jam();
-#ifdef VM_TRACE
- if (debugFlags & DebugScan) {
- debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
- debugOut << "At pushDown pos=" << pos << " " << node << endl;
- }
-#endif
- // here we may miss a valid entry "X" XXX known bug
- scanNext(signal, scanPtr);
- }
- scanPtr.i = nextPtrI;
+ tmpList[i] = tmpList[i + 1];
}
- // fix other scans
+ tmpList[pos] = ent;
+ ent = oldMin;
+ entList[0] = entList[occup];
+ // fix prefix
+ if (true)
+ setNodePref(node);
+}
+
+void
+Dbtux::nodePushDownScans(NodeHandle& node, unsigned pos)
+{
+ const unsigned occup = node.getOccup();
+ ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
- while (scanPtr.i != RNIL) {
+ do {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
+ // handled before
ndbrequire(scanPos.m_pos != 0);
if (scanPos.m_pos <= pos) {
jam();
@@ -291,22 +309,7 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
scanPos.m_pos--;
}
scanPtr.i = scanPtr.p->m_nodeScan;
- }
- // fix node
- TreeEnt* const entList = tree.getEntList(node.m_node);
- entList[occup] = entList[0];
- TreeEnt* const tmpList = entList + 1;
- TreeEnt oldMin = tmpList[0];
- for (unsigned i = 0; i < pos; i++) {
- jam();
- tmpList[i] = tmpList[i + 1];
- }
- tmpList[pos] = ent;
- ent = oldMin;
- entList[0] = entList[occup];
- // fix prefix
- if (true)
- setNodePref(signal, node);
+ } while (scanPtr.i != RNIL);
}
/*
@@ -318,39 +321,50 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
* v ^ ^
* A B C D E _ _ => X A B C E _ _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
+ *
+ * Move scans at removed entry and add scans at the new entry.
*/
void
-Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
+Dbtux::nodePopUp(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList)
{
Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup);
- ScanOpPtr scanPtr;
- // move scans whose entry disappears
- scanPtr.i = node.getNodeScan();
- while (scanPtr.i != RNIL) {
+ if (node.getNodeScan() != RNIL) {
+ // move scans whose entry disappears
+ moveScanList(node, pos);
+ // fix other scans
+ if (node.getNodeScan() != RNIL)
+ nodePopUpScans(node, pos);
+ }
+ // fix node
+ TreeEnt* const entList = tree.getEntList(node.m_node);
+ entList[occup] = entList[0];
+ TreeEnt* const tmpList = entList + 1;
+ TreeEnt newMin = ent;
+ ent = tmpList[pos];
+ for (unsigned i = pos; i > 0; i--) {
jam();
- c_scanOpPool.getPtr(scanPtr);
- TreePos& scanPos = scanPtr.p->m_scanPos;
- ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
- const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
- if (scanPos.m_pos == pos) {
- jam();
-#ifdef VM_TRACE
- if (debugFlags & DebugScan) {
- debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
- debugOut << "At popUp pos=" << pos << " " << node << endl;
- }
-#endif
- // here we may miss a valid entry "X" XXX known bug
- scanNext(signal, scanPtr);
- }
- scanPtr.i = nextPtrI;
+ tmpList[i] = tmpList[i - 1];
}
- // fix other scans
+ tmpList[0] = newMin;
+ entList[0] = entList[occup];
+ // add scans
+ if (scanList != RNIL)
+ addScanList(node, 0, scanList);
+ // fix prefix
+ if (true)
+ setNodePref(node);
+}
+
+void
+Dbtux::nodePopUpScans(NodeHandle& node, unsigned pos)
+{
+ const unsigned occup = node.getOccup();
+ ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
- while (scanPtr.i != RNIL) {
+ do {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
@@ -367,41 +381,123 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
scanPos.m_pos++;
}
scanPtr.i = scanPtr.p->m_nodeScan;
- }
- // fix node
- TreeEnt* const entList = tree.getEntList(node.m_node);
- entList[occup] = entList[0];
- TreeEnt* const tmpList = entList + 1;
- TreeEnt newMin = ent;
- ent = tmpList[pos];
- for (unsigned i = pos; i > 0; i--) {
- jam();
- tmpList[i] = tmpList[i - 1];
- }
- tmpList[0] = newMin;
- entList[0] = entList[occup];
- // fix prefix
- if (true)
- setNodePref(signal, node);
+ } while (scanPtr.i != RNIL);
}
/*
- * Move all possible entries from another node before the min (i=0) or
- * after the max (i=1). XXX can be optimized
+ * Move number of entries from another node to this node before the min
+ * (i=0) or after the max (i=1). Expensive but not often used.
*/
void
-Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned i)
+Dbtux::nodeSlide(NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i)
{
Frag& frag = dstNode.m_frag;
TreeHead& tree = frag.m_tree;
ndbrequire(i <= 1);
- while (dstNode.getOccup() < tree.m_maxOccup && srcNode.getOccup() != 0) {
+ while (cnt != 0) {
TreeEnt ent;
- nodePopDown(signal, srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent);
- nodePushUp(signal, dstNode, i == 0 ? 0 : dstNode.getOccup(), ent);
+ Uint32 scanList = RNIL;
+ nodePopDown(srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent, &scanList);
+ nodePushUp(dstNode, i == 0 ? 0 : dstNode.getOccup(), ent, scanList);
+ cnt--;
}
}
+// scans linked to node
+
+
+/*
+ * Add list of scans to node at given position.
+ */
+void
+Dbtux::addScanList(NodeHandle& node, unsigned pos, Uint32 scanList)
+{
+ ScanOpPtr scanPtr;
+ scanPtr.i = scanList;
+ do {
+ jam();
+ c_scanOpPool.getPtr(scanPtr);
+#ifdef VM_TRACE
+ if (debugFlags & DebugScan) {
+ debugOut << "Add scan " << scanPtr.i << " " << *scanPtr.p << endl;
+ debugOut << "To pos=" << pos << " " << node << endl;
+ }
+#endif
+ const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
+ scanPtr.p->m_nodeScan = RNIL;
+ linkScan(node, scanPtr);
+ TreePos& scanPos = scanPtr.p->m_scanPos;
+ // set position but leave direction alone
+ scanPos.m_loc = node.m_loc;
+ scanPos.m_pos = pos;
+ scanPtr.i = nextPtrI;
+ } while (scanPtr.i != RNIL);
+}
+
+/*
+ * Remove list of scans from node at given position. The return
+ * location must point to existing list (in fact RNIL always).
+ */
+void
+Dbtux::removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList)
+{
+ ScanOpPtr scanPtr;
+ scanPtr.i = node.getNodeScan();
+ do {
+ jam();
+ c_scanOpPool.getPtr(scanPtr);
+ const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
+ TreePos& scanPos = scanPtr.p->m_scanPos;
+ ndbrequire(scanPos.m_loc == node.m_loc);
+ if (scanPos.m_pos == pos) {
+ jam();
+#ifdef VM_TRACE
+ if (debugFlags & DebugScan) {
+ debugOut << "Remove scan " << scanPtr.i << " " << *scanPtr.p << endl;
+ debugOut << "Fron pos=" << pos << " " << node << endl;
+ }
+#endif
+ unlinkScan(node, scanPtr);
+ scanPtr.p->m_nodeScan = scanList;
+ scanList = scanPtr.i;
+ // unset position but leave direction alone
+ scanPos.m_loc = NullTupLoc;
+ scanPos.m_pos = ZNIL;
+ }
+ scanPtr.i = nextPtrI;
+ } while (scanPtr.i != RNIL);
+}
+
+/*
+ * Move list of scans away from entry about to be removed. Uses scan
+ * method scanNext().
+ */
+void
+Dbtux::moveScanList(NodeHandle& node, unsigned pos)
+{
+ ScanOpPtr scanPtr;
+ scanPtr.i = node.getNodeScan();
+ do {
+ jam();
+ c_scanOpPool.getPtr(scanPtr);
+ TreePos& scanPos = scanPtr.p->m_scanPos;
+ const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
+ ndbrequire(scanPos.m_loc == node.m_loc);
+ if (scanPos.m_pos == pos) {
+ jam();
+#ifdef VM_TRACE
+ if (debugFlags & DebugScan) {
+ debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
+ debugOut << "At pos=" << pos << " " << node << endl;
+ }
+#endif
+ scanNext(scanPtr);
+ ndbrequire(! (scanPos.m_loc == node.m_loc && scanPos.m_pos == pos));
+ }
+ scanPtr.i = nextPtrI;
+ } while (scanPtr.i != RNIL);
+}
+
/*
* Link scan to the list under the node. The list is single-linked and
* ordering does not matter.
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
index 585d2dbe58a..c58dec2f102 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
@@ -275,7 +275,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
jam();
const TupLoc loc = scan.m_scanPos.m_loc;
NodeHandle node(frag);
- selectNode(signal, node, loc);
+ selectNode(node, loc);
unlinkScan(node, scanPtr);
scan.m_scanPos.m_loc = NullTupLoc;
}
@@ -364,7 +364,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
if (scan.m_state == ScanOp::First) {
jam();
// search is done only once in single range scan
- scanFirst(signal, scanPtr);
+ scanFirst(scanPtr);
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "First scan " << scanPtr.i << " " << scan << endl;
@@ -374,7 +374,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
if (scan.m_state == ScanOp::Next) {
jam();
// look for next
- scanNext(signal, scanPtr);
+ scanNext(scanPtr);
}
// for reading tuple key in Current or Locked state
Data pkData = c_dataBuffer;
@@ -680,7 +680,7 @@ Dbtux::execACC_ABORTCONF(Signal* signal)
* by scanNext.
*/
void
-Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
+Dbtux::scanFirst(ScanOpPtr scanPtr)
{
ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
@@ -698,7 +698,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
}
// search for scan start position
TreePos treePos;
- searchToScan(signal, frag, c_dataBuffer, scan.m_boundCnt[0], treePos);
+ searchToScan(frag, c_dataBuffer, scan.m_boundCnt[0], treePos);
if (treePos.m_loc == NullTupLoc) {
// empty tree
jam();
@@ -710,13 +710,13 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
scan.m_state = ScanOp::Next;
// link the scan to node found
NodeHandle node(frag);
- selectNode(signal, node, treePos.m_loc);
+ selectNode(node, treePos.m_loc);
linkScan(node, scanPtr);
}
/*
* Move to next entry. The scan is already linked to some node. When
- * we leave, if any entry was found, it will be linked to a possibly
+ * we leave, if an entry was found, it will be linked to a possibly
* different node. The scan has a position, and a direction which tells
* from where we came to this position. This is one of:
*
@@ -725,9 +725,12 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
* 2 - up from root (the scan ends)
* 3 - left to right within node (at end proceed to right child)
* 4 - down from parent (proceed to left child)
+ *
+ * If an entry was found, scan direction is 3. Therefore tree
+ * re-organizations need not worry about scan direction.
*/
void
-Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
+Dbtux::scanNext(ScanOpPtr scanPtr)
{
ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
@@ -736,22 +739,8 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
debugOut << "Next in scan " << scanPtr.i << " " << scan << endl;
}
#endif
- if (scan.m_state == ScanOp::Locked) {
- jam();
- // version of a tuple locked by us cannot disappear (assert only)
-#ifdef dbtux_wl_1942_is_done
- ndbassert(false);
-#endif
- AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
- lockReq->returnCode = RNIL;
- lockReq->requestInfo = AccLockReq::Unlock;
- lockReq->accOpPtr = scan.m_accLockOp;
- EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
- jamEntry();
- ndbrequire(lockReq->returnCode == AccLockReq::Success);
- scan.m_accLockOp = RNIL;
- scan.m_state = ScanOp::Current;
- }
+ // cannot be moved away from tuple we have locked
+ ndbrequire(scan.m_state != ScanOp::Locked);
// set up index keys for this operation
setKeyAttrs(frag);
// unpack upper bound into c_dataBuffer
@@ -767,7 +756,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
TreePos pos = scan.m_scanPos;
// get and remember original node
NodeHandle origNode(frag);
- selectNode(signal, origNode, pos.m_loc);
+ selectNode(origNode, pos.m_loc);
ndbrequire(islinkScan(origNode, scanPtr));
// current node in loop
NodeHandle node = origNode;
@@ -784,7 +773,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
}
if (node.m_loc != pos.m_loc) {
jam();
- selectNode(signal, node, pos.m_loc);
+ selectNode(node, pos.m_loc);
}
if (pos.m_dir == 4) {
// coming down from parent proceed to left child
@@ -832,7 +821,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
break;
}
// can we see it
- if (! scanVisible(signal, scanPtr, ent)) {
+ if (! scanVisible(scanPtr, ent)) {
jam();
continue;
}
@@ -864,6 +853,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
scan.m_scanPos = pos;
// relink
if (scan.m_state == ScanOp::Current) {
+ ndbrequire(pos.m_match == true && pos.m_dir == 3);
ndbrequire(pos.m_loc == node.m_loc);
if (origNode.m_loc != node.m_loc) {
jam();
@@ -894,7 +884,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
* which are not analyzed or handled yet.
*/
bool
-Dbtux::scanVisible(Signal* signal, ScanOpPtr scanPtr, TreeEnt ent)
+Dbtux::scanVisible(ScanOpPtr scanPtr, TreeEnt ent)
{
const ScanOp& scan = *scanPtr.p;
const Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp
index 1e20d3e3718..0f4b0862960 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp
@@ -25,7 +25,7 @@
* TODO optimize for initial equal attrs in node min/max
*/
void
-Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
+Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
@@ -46,7 +46,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
NodeHandle bottomNode(frag);
while (true) {
jam();
- selectNode(signal, currNode, currNode.m_loc);
+ selectNode(currNode, currNode.m_loc);
int ret;
// compare prefix
unsigned start = 0;
@@ -159,12 +159,12 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
*
* Compares search key to each node min. A move to right subtree can
* overshoot target node. The last such node is saved. The final node
- * is a half-leaf or leaf. If search key is less than final node min
+ * is a semi-leaf or leaf. If search key is less than final node min
* then the saved node is the g.l.b of the final node and we move back
* to it.
*/
void
-Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
+Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
@@ -182,7 +182,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s
NodeHandle glbNode(frag); // potential g.l.b of final node
while (true) {
jam();
- selectNode(signal, currNode, currNode.m_loc);
+ selectNode(currNode, currNode.m_loc);
int ret;
// compare prefix
unsigned start = 0;
@@ -256,7 +256,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s
* Similar to searchToAdd.
*/
void
-Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
+Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
NodeHandle currNode(frag);
@@ -271,7 +271,7 @@ Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned bo
NodeHandle bottomNode(frag);
while (true) {
jam();
- selectNode(signal, currNode, currNode.m_loc);
+ selectNode(currNode, currNode.m_loc);
int ret;
// compare prefix
ret = cmpScanBound(frag, 0, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize);
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp
index 84d26976e05..b9e3b593a00 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp
@@ -18,71 +18,105 @@
#include "Dbtux.hpp"
/*
- * Add entry.
+ * Add entry. Handle the case when there is room for one more. This
+ * is the common case given slack in nodes.
*/
void
-Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
+Dbtux::treeAdd(Frag& frag, TreePos treePos, TreeEnt ent)
{
TreeHead& tree = frag.m_tree;
- unsigned pos = treePos.m_pos;
NodeHandle node(frag);
- // check for empty tree
- if (treePos.m_loc == NullTupLoc) {
- jam();
- insertNode(signal, node);
- nodePushUp(signal, node, 0, ent);
- node.setSide(2);
- tree.m_root = node.m_loc;
- return;
- }
- selectNode(signal, node, treePos.m_loc);
- // check if it is bounding node
- if (pos != 0 && pos != node.getOccup()) {
+ if (treePos.m_loc != NullTupLoc) {
+ // non-empty tree
jam();
- // check if room for one more
+ selectNode(node, treePos.m_loc);
+ unsigned pos = treePos.m_pos;
if (node.getOccup() < tree.m_maxOccup) {
+ // node has room
jam();
- nodePushUp(signal, node, pos, ent);
+ nodePushUp(node, pos, ent, RNIL);
return;
}
- // returns min entry
- nodePushDown(signal, node, pos - 1, ent);
- // find position to add the removed min entry
- TupLoc childLoc = node.getLink(0);
- if (childLoc == NullTupLoc) {
+ treeAddFull(frag, node, pos, ent);
+ return;
+ }
+ jam();
+ insertNode(node);
+ nodePushUp(node, 0, ent, RNIL);
+ node.setSide(2);
+ tree.m_root = node.m_loc;
+}
+
+/*
+ * Add entry when node is full. Handle the case when there is g.l.b
+ * node in left subtree with room for one more. It will receive the min
+ * entry of this node. The min entry could be the entry to add.
+ */
+void
+Dbtux::treeAddFull(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent)
+{
+ TreeHead& tree = frag.m_tree;
+ TupLoc loc = lubNode.getLink(0);
+ if (loc != NullTupLoc) {
+ // find g.l.b node
+ NodeHandle glbNode(frag);
+ do {
jam();
- // left child will be added
- pos = 0;
- } else {
+ selectNode(glbNode, loc);
+ loc = glbNode.getLink(1);
+ } while (loc != NullTupLoc);
+ if (glbNode.getOccup() < tree.m_maxOccup) {
+ // g.l.b node has room
jam();
- // find glb node
- while (childLoc != NullTupLoc) {
+ Uint32 scanList = RNIL;
+ if (pos != 0) {
jam();
- selectNode(signal, node, childLoc);
- childLoc = node.getLink(1);
+ // add the new entry and return min entry
+ nodePushDown(lubNode, pos - 1, ent, scanList);
}
- pos = node.getOccup();
+ // g.l.b node receives min entry from l.u.b node
+ nodePushUp(glbNode, glbNode.getOccup(), ent, scanList);
+ return;
}
- // fall thru to next case
- }
- // adding new min or max
- unsigned i = (pos == 0 ? 0 : 1);
- ndbrequire(node.getLink(i) == NullTupLoc);
- // check if the half-leaf/leaf has room for one more
- if (node.getOccup() < tree.m_maxOccup) {
- jam();
- nodePushUp(signal, node, pos, ent);
+ treeAddNode(frag, lubNode, pos, ent, glbNode, 1);
return;
}
- // add a new node
- NodeHandle childNode(frag);
- insertNode(signal, childNode);
- nodePushUp(signal, childNode, 0, ent);
+ treeAddNode(frag, lubNode, pos, ent, lubNode, 0);
+}
+
+/*
+ * Add entry when there is no g.l.b node in left subtree or the g.l.b
+ * node is full. We must add a new left or right child node which
+ * becomes the new g.l.b node.
+ */
+void
+Dbtux::treeAddNode(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent, NodeHandle parentNode, unsigned i)
+{
+ NodeHandle glbNode(frag);
+ insertNode(glbNode);
// connect parent and child
- node.setLink(i, childNode.m_loc);
- childNode.setLink(2, node.m_loc);
- childNode.setSide(i);
- // re-balance tree at each node
+ parentNode.setLink(i, glbNode.m_loc);
+ glbNode.setLink(2, parentNode.m_loc);
+ glbNode.setSide(i);
+ Uint32 scanList = RNIL;
+ if (pos != 0) {
+ jam();
+ // add the new entry and return min entry
+ nodePushDown(lubNode, pos - 1, ent, scanList);
+ }
+ // g.l.b node receives min entry from l.u.b node
+ nodePushUp(glbNode, 0, ent, scanList);
+ // re-balance the tree
+ treeAddRebalance(frag, parentNode, i);
+}
+
+/*
+ * Re-balance tree after adding a node. The process starts with the
+ * parent of the added node.
+ */
+void
+Dbtux::treeAddRebalance(Frag& frag, NodeHandle node, unsigned i)
+{
while (true) {
// height of subtree i has increased by 1
int j = (i == 0 ? -1 : +1);
@@ -102,14 +136,14 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
// height of longer subtree increased
jam();
NodeHandle childNode(frag);
- selectNode(signal, childNode, node.getLink(i));
+ selectNode(childNode, node.getLink(i));
int b2 = childNode.getBalance();
if (b2 == b) {
jam();
- treeRotateSingle(signal, frag, node, i);
+ treeRotateSingle(frag, node, i);
} else if (b2 == -b) {
jam();
- treeRotateDouble(signal, frag, node, i);
+ treeRotateDouble(frag, node, i);
} else {
// height of subtree increased so it cannot be perfectly balanced
ndbrequire(false);
@@ -126,115 +160,169 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
break;
}
i = node.getSide();
- selectNode(signal, node, parentLoc);
+ selectNode(node, parentLoc);
}
}
/*
- * Remove entry.
+ * Remove entry. Optimize for nodes with slack. Handle the case when
+ * there is no underflow i.e. occupancy remains at least minOccup. For
+ * interior nodes this is a requirement. For others it means that we do
+ * not need to consider merge of semi-leaf and leaf.
*/
void
-Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
+Dbtux::treeRemove(Frag& frag, TreePos treePos)
{
TreeHead& tree = frag.m_tree;
unsigned pos = treePos.m_pos;
NodeHandle node(frag);
- selectNode(signal, node, treePos.m_loc);
+ selectNode(node, treePos.m_loc);
TreeEnt ent;
- // check interior node first
- if (node.getChilds() == 2) {
+ if (node.getOccup() > tree.m_minOccup) {
+ // no underflow in any node type
jam();
- ndbrequire(node.getOccup() >= tree.m_minOccup);
- // check if no underflow
- if (node.getOccup() > tree.m_minOccup) {
- jam();
- nodePopDown(signal, node, pos, ent);
- return;
- }
- // save current handle
- NodeHandle parentNode = node;
- // find glb node
- TupLoc childLoc = node.getLink(0);
- while (childLoc != NullTupLoc) {
- jam();
- selectNode(signal, node, childLoc);
- childLoc = node.getLink(1);
- }
- // use glb max as new parent min
- ent = node.getEnt(node.getOccup() - 1);
- nodePopUp(signal, parentNode, pos, ent);
- // set up to remove glb max
- pos = node.getOccup() - 1;
- // fall thru to next case
+ nodePopDown(node, pos, ent, 0);
+ return;
}
- // remove the element
- nodePopDown(signal, node, pos, ent);
- ndbrequire(node.getChilds() <= 1);
- // handle half-leaf
- unsigned i;
- for (i = 0; i <= 1; i++) {
+ if (node.getChilds() == 2) {
+ // underflow in interior node
jam();
- TupLoc childLoc = node.getLink(i);
- if (childLoc != NullTupLoc) {
- // move to child
- selectNode(signal, node, childLoc);
- // balance of half-leaf parent requires child to be leaf
- break;
- }
+ treeRemoveInner(frag, node, pos);
+ return;
}
- ndbrequire(node.getChilds() == 0);
- // get parent if any
- TupLoc parentLoc = node.getLink(2);
- NodeHandle parentNode(frag);
- i = node.getSide();
- // move all that fits into parent
- if (parentLoc != NullTupLoc) {
+ // remove entry in semi/leaf
+ nodePopDown(node, pos, ent, 0);
+ if (node.getLink(0) != NullTupLoc) {
jam();
- selectNode(signal, parentNode, node.getLink(2));
- nodeSlide(signal, parentNode, node, i);
- // fall thru to next case
+ treeRemoveSemi(frag, node, 0);
+ return;
}
- // non-empty leaf
- if (node.getOccup() >= 1) {
+ if (node.getLink(1) != NullTupLoc) {
jam();
+ treeRemoveSemi(frag, node, 1);
return;
}
- // remove empty leaf
- deleteNode(signal, node);
- if (parentLoc == NullTupLoc) {
+ treeRemoveLeaf(frag, node);
+}
+
+/*
+ * Remove entry when interior node underflows. There is g.l.b node in
+ * left subtree to borrow an entry from. The max entry of the g.l.b
+ * node becomes the min entry of this node.
+ */
+void
+Dbtux::treeRemoveInner(Frag& frag, NodeHandle lubNode, unsigned pos)
+{
+ TreeHead& tree = frag.m_tree;
+ TreeEnt ent;
+ // find g.l.b node
+ NodeHandle glbNode(frag);
+ TupLoc loc = lubNode.getLink(0);
+ do {
+ jam();
+ selectNode(glbNode, loc);
+ loc = glbNode.getLink(1);
+ } while (loc != NullTupLoc);
+ // borrow max entry from semi/leaf
+ Uint32 scanList = RNIL;
+ nodePopDown(glbNode, glbNode.getOccup() - 1, ent, &scanList);
+ nodePopUp(lubNode, pos, ent, scanList);
+ if (glbNode.getLink(0) != NullTupLoc) {
jam();
- // tree is now empty
- tree.m_root = NullTupLoc;
+ treeRemoveSemi(frag, glbNode, 0);
return;
}
- node = parentNode;
- node.setLink(i, NullTupLoc);
-#ifdef dbtux_min_occup_less_max_occup
- // check if we created a half-leaf
- if (node.getBalance() == 0) {
+ treeRemoveLeaf(frag, glbNode);
+}
+
+/*
+ * Handle semi-leaf after removing an entry. Move entries from leaf to
+ * semi-leaf to bring semi-leaf occupancy above minOccup, if possible.
+ * The leaf may become empty.
+ */
+void
+Dbtux::treeRemoveSemi(Frag& frag, NodeHandle semiNode, unsigned i)
+{
+ TreeHead& tree = frag.m_tree;
+ ndbrequire(semiNode.getChilds() < 2);
+ TupLoc leafLoc = semiNode.getLink(i);
+ NodeHandle leafNode(frag);
+ selectNode(leafNode, leafLoc);
+ if (semiNode.getOccup() < tree.m_minOccup) {
+ jam();
+ unsigned cnt = min(leafNode.getOccup(), tree.m_minOccup - semiNode.getOccup());
+ nodeSlide(semiNode, leafNode, cnt, i);
+ if (leafNode.getOccup() == 0) {
+ // remove empty leaf
+ jam();
+ treeRemoveNode(frag, leafNode);
+ }
+ }
+}
+
+/*
+ * Handle leaf after removing an entry. If parent is semi-leaf, move
+ * entries to it as in the semi-leaf case. If parent is interior node,
+ * do nothing.
+ */
+void
+Dbtux::treeRemoveLeaf(Frag& frag, NodeHandle leafNode)
+{
+ TreeHead& tree = frag.m_tree;
+ TupLoc parentLoc = leafNode.getLink(2);
+ if (parentLoc != NullTupLoc) {
jam();
- // move entries from the other child
- TupLoc childLoc = node.getLink(1 - i);
- NodeHandle childNode(frag);
- selectNode(signal, childNode, childLoc);
- nodeSlide(signal, node, childNode, 1 - i);
- if (childNode.getOccup() == 0) {
+ NodeHandle parentNode(frag);
+ selectNode(parentNode, parentLoc);
+ unsigned i = leafNode.getSide();
+ if (parentNode.getLink(1 - i) == NullTupLoc) {
+ // parent is semi-leaf
jam();
- deleteNode(signal, childNode);
- node.setLink(1 - i, NullTupLoc);
- // we are balanced again but our parent balance changes by -1
- parentLoc = node.getLink(2);
- if (parentLoc == NullTupLoc) {
+ if (parentNode.getOccup() < tree.m_minOccup) {
jam();
- return;
+ unsigned cnt = min(leafNode.getOccup(), tree.m_minOccup - parentNode.getOccup());
+ nodeSlide(parentNode, leafNode, cnt, i);
}
- // fix side and become parent
- i = node.getSide();
- selectNode(signal, node, parentLoc);
}
}
-#endif
- // re-balance tree at each node
+ if (leafNode.getOccup() == 0) {
+ jam();
+ // remove empty leaf
+ treeRemoveNode(frag, leafNode);
+ }
+}
+
+/*
+ * Remove empty leaf.
+ */
+void
+Dbtux::treeRemoveNode(Frag& frag, NodeHandle leafNode)
+{
+ TreeHead& tree = frag.m_tree;
+ ndbrequire(leafNode.getChilds() == 0);
+ TupLoc parentLoc = leafNode.getLink(2);
+ unsigned i = leafNode.getSide();
+ deleteNode(leafNode);
+ if (parentLoc != NullTupLoc) {
+ jam();
+ NodeHandle parentNode(frag);
+ selectNode(parentNode, parentLoc);
+ parentNode.setLink(i, NullTupLoc);
+ // re-balance the tree
+ treeRemoveRebalance(frag, parentNode, i);
+ return;
+ }
+ // tree is now empty
+ tree.m_root = NullTupLoc;
+}
+
+/*
+ * Re-balance tree after removing a node. The process starts with the
+ * parent of the removed node.
+ */
+void
+Dbtux::treeRemoveRebalance(Frag& frag, NodeHandle node, unsigned i)
+{
while (true) {
// height of subtree i has decreased by 1
int j = (i == 0 ? -1 : +1);
@@ -255,19 +343,19 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
jam();
// child on the other side
NodeHandle childNode(frag);
- selectNode(signal, childNode, node.getLink(1 - i));
+ selectNode(childNode, node.getLink(1 - i));
int b2 = childNode.getBalance();
if (b2 == b) {
jam();
- treeRotateSingle(signal, frag, node, 1 - i);
+ treeRotateSingle(frag, node, 1 - i);
// height of tree decreased and propagates up
} else if (b2 == -b) {
jam();
- treeRotateDouble(signal, frag, node, 1 - i);
+ treeRotateDouble(frag, node, 1 - i);
// height of tree decreased and propagates up
} else {
jam();
- treeRotateSingle(signal, frag, node, 1 - i);
+ treeRotateSingle(frag, node, 1 - i);
// height of tree did not change - done
return;
}
@@ -281,7 +369,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
return;
}
i = node.getSide();
- selectNode(signal, node, parentLoc);
+ selectNode(node, parentLoc);
}
}
@@ -302,10 +390,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
* all optional. If 4 are there it changes side.
*/
void
-Dbtux::treeRotateSingle(Signal* signal,
- Frag& frag,
- NodeHandle& node,
- unsigned i)
+Dbtux::treeRotateSingle(Frag& frag, NodeHandle& node, unsigned i)
{
ndbrequire(i <= 1);
/*
@@ -325,7 +410,7 @@ Dbtux::treeRotateSingle(Signal* signal,
*/
TupLoc loc3 = node5.getLink(i);
NodeHandle node3(frag);
- selectNode(signal, node3, loc3);
+ selectNode(node3, loc3);
const int bal3 = node3.getBalance();
/*
2 must always be there but is not changed. Thus we mereley check that it
@@ -342,7 +427,7 @@ Dbtux::treeRotateSingle(Signal* signal,
NodeHandle node4(frag);
if (loc4 != NullTupLoc) {
jam();
- selectNode(signal, node4, loc4);
+ selectNode(node4, loc4);
ndbrequire(node4.getSide() == (1 - i) &&
node4.getLink(2) == loc3);
node4.setSide(i);
@@ -377,7 +462,7 @@ Dbtux::treeRotateSingle(Signal* signal,
if (loc0 != NullTupLoc) {
jam();
NodeHandle node0(frag);
- selectNode(signal, node0, loc0);
+ selectNode(node0, loc0);
node0.setLink(side5, loc3);
} else {
jam();
@@ -514,8 +599,10 @@ Dbtux::treeRotateSingle(Signal* signal,
*
*/
void
-Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i)
+Dbtux::treeRotateDouble(Frag& frag, NodeHandle& node, unsigned i)
{
+ TreeHead& tree = frag.m_tree;
+
// old top node
NodeHandle node6 = node;
const TupLoc loc6 = node6.m_loc;
@@ -526,13 +613,13 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i
// level 1
TupLoc loc2 = node6.getLink(i);
NodeHandle node2(frag);
- selectNode(signal, node2, loc2);
+ selectNode(node2, loc2);
const int bal2 = node2.getBalance();
// level 2
TupLoc loc4 = node2.getLink(1 - i);
NodeHandle node4(frag);
- selectNode(signal, node4, loc4);
+ selectNode(node4, loc4);
const int bal4 = node4.getBalance();
ndbrequire(i <= 1);
@@ -549,23 +636,26 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i
// fill up leaf before it becomes internal
if (loc3 == NullTupLoc && loc5 == NullTupLoc) {
jam();
- TreeHead& tree = frag.m_tree;
- nodeSlide(signal, node4, node2, i);
- // implied by rule of merging half-leaves with leaves
- ndbrequire(node4.getOccup() >= tree.m_minOccup);
- ndbrequire(node2.getOccup() != 0);
+ if (node4.getOccup() < tree.m_minOccup) {
+ jam();
+ unsigned cnt = tree.m_minOccup - node4.getOccup();
+ ndbrequire(cnt < node2.getOccup());
+ nodeSlide(node4, node2, cnt, i);
+ ndbrequire(node4.getOccup() >= tree.m_minOccup);
+ ndbrequire(node2.getOccup() != 0);
+ }
} else {
if (loc3 != NullTupLoc) {
jam();
NodeHandle node3(frag);
- selectNode(signal, node3, loc3);
+ selectNode(node3, loc3);
node3.setLink(2, loc2);
node3.setSide(1 - i);
}
if (loc5 != NullTupLoc) {
jam();
NodeHandle node5(frag);
- selectNode(signal, node5, loc5);
+ selectNode(node5, loc5);
node5.setLink(2, node6.m_loc);
node5.setSide(i);
}
@@ -588,7 +678,7 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i
if (loc0 != NullTupLoc) {
jam();
- selectNode(signal, node0, loc0);
+ selectNode(node0, loc0);
node0.setLink(side6, loc4);
} else {
jam();
diff --git a/ndb/src/kernel/blocks/dbtux/Times.txt b/ndb/src/kernel/blocks/dbtux/Times.txt
index 8fbb695ef82..bf466d12604 100644
--- a/ndb/src/kernel/blocks/dbtux/Times.txt
+++ b/ndb/src/kernel/blocks/dbtux/Times.txt
@@ -29,6 +29,7 @@ shows ms / 1000 rows for each and index time overhead
samples 10% of all PKs (100,000 pk reads, 100,000 scans)
the "pct" values are from more accurate total times (not shown)
+comments [ ... ] are after the case
040616 mc02/a 40 ms 87 ms 114 pct
mc02/b 51 ms 128 ms 148 pct
@@ -76,13 +77,12 @@ optim 13 mc02/a 40 ms 57 ms 42 pct
mc02/c 9 ms 13 ms 50 pct
mc02/d 170 ms 256 ms 50 pct
-after wl-1884 store all-NULL keys (the tests have pctnull=10 per column)
-
optim 13 mc02/a 39 ms 59 ms 50 pct
mc02/b 47 ms 77 ms 61 pct
mc02/c 9 ms 12 ms 44 pct
mc02/d 246 ms 289 ms 17 pct
+[ after wl-1884 store all-NULL keys (the tests have pctnull=10 per column) ]
[ case d: bug in testOIBasic killed PK read performance ]
optim 14 mc02/a 41 ms 60 ms 44 pct
@@ -98,8 +98,7 @@ none mc02/a 35 ms 60 ms 71 pct
mc02/c 5 ms 12 ms 106 pct
mc02/d 165 ms 238 ms 44 pct
-[ johan re-installed mc02 as fedora gcc-3.3.2 ]
-[ case c: table scan has improved... ]
+[ johan re-installed mc02 as fedora gcc-3.3.2, tux uses more C++ stuff than tup]
charsets mc02/a 35 ms 60 ms 71 pct
mc02/b 42 ms 84 ms 97 pct
@@ -118,6 +117,19 @@ optim 15 mc02/a 34 ms 60 ms 72 pct
optim 16 mc02/a 34 ms 53 ms 53 pct
mc02/b 42 ms 75 ms 75 pct
-[ case a, b: binary search of bounding node when adding entry ]
+[ binary search of bounding node when adding entry ]
+
+none mc02/a 35 ms 53 ms 51 pct
+ mc02/b 42 ms 75 ms 76 pct
+
+[ rewrote treeAdd / treeRemove ]
+
+optim 17 mc02/a 35 ms 52 ms 49 pct
+ mc02/b 43 ms 75 ms 75 pct
+
+[ allow slack (2) in interior nodes - almost no effect?? ]
+
+wl-1942 mc02/a 35 ms 52 ms 49 pct
+ mc02/b 42 ms 75 ms 76 pct
vim: set et:
diff --git a/ndb/test/ndbapi/testOIBasic.cpp b/ndb/test/ndbapi/testOIBasic.cpp
index 214816a1ba1..c94dbf0fb19 100644
--- a/ndb/test/ndbapi/testOIBasic.cpp
+++ b/ndb/test/ndbapi/testOIBasic.cpp
@@ -42,7 +42,7 @@ struct Opt {
CHARSET_INFO* m_cs;
bool m_dups;
NdbDictionary::Object::FragmentType m_fragtype;
- unsigned m_idxloop;
+ unsigned m_subsubloop;
const char* m_index;
unsigned m_loop;
bool m_nologging;
@@ -66,7 +66,7 @@ struct Opt {
m_cs(0),
m_dups(false),
m_fragtype(NdbDictionary::Object::FragUndefined),
- m_idxloop(4),
+ m_subsubloop(4),
m_index(0),
m_loop(1),
m_nologging(false),
@@ -79,7 +79,7 @@ struct Opt {
m_seed(0),
m_subloop(4),
m_table(0),
- m_threads(4),
+ m_threads(6), // table + 5 indexes
m_v(1) {
}
};
@@ -208,6 +208,8 @@ struct Par : public Opt {
Set& set() const { assert(m_set != 0); return *m_set; }
Tmr* m_tmr;
Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; }
+ unsigned m_lno;
+ unsigned m_slno;
unsigned m_totrows;
// value calculation
unsigned m_range;
@@ -226,6 +228,8 @@ struct Par : public Opt {
m_tab(0),
m_set(0),
m_tmr(0),
+ m_lno(0),
+ m_slno(0),
m_totrows(m_threads * m_rows),
m_range(m_rows),
m_pctrange(0),
@@ -2069,7 +2073,8 @@ pkinsert(Par par)
CHK(con.startTransaction() == 0);
Lst lst;
for (unsigned j = 0; j < par.m_rows; j++) {
- unsigned i = thrrow(par, j);
+ unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
+ unsigned i = thrrow(par, j2);
set.lock();
if (set.exist(i) || set.pending(i)) {
set.unlock();
@@ -2174,7 +2179,8 @@ pkdelete(Par par)
Lst lst;
bool deadlock = false;
for (unsigned j = 0; j < par.m_rows; j++) {
- unsigned i = thrrow(par, j);
+ unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
+ unsigned i = thrrow(par, j2);
set.lock();
if (! set.exist(i) || set.pending(i)) {
set.unlock();
@@ -2398,7 +2404,7 @@ static int
scanreadindex(Par par, const ITab& itab)
{
const Tab& tab = par.tab();
- for (unsigned i = 0; i < par.m_idxloop; i++) {
+ for (unsigned i = 0; i < par.m_subsubloop; i++) {
BSet bset(tab, itab, par.m_rows);
bset.calc(par);
CHK(scanreadindex(par, itab, bset) == 0);
@@ -2645,7 +2651,7 @@ static int
scanupdateindex(Par par, const ITab& itab)
{
const Tab& tab = par.tab();
- for (unsigned i = 0; i < par.m_idxloop; i++) {
+ for (unsigned i = 0; i < par.m_subsubloop; i++) {
BSet bset(tab, itab, par.m_rows);
bset.calc(par);
CHK(scanupdateindex(par, itab, bset) == 0);
@@ -2686,6 +2692,53 @@ readverify(Par par)
}
static int
+readverifyfull(Par par)
+{
+ par.m_verify = true;
+ if (par.m_no == 0)
+ CHK(scanreadtable(par) == 0);
+ else {
+ const Tab& tab = par.tab();
+ unsigned i = par.m_no;
+ if (i <= tab.m_itabs && useindex(i)) {
+ const ITab& itab = tab.m_itab[i - 1];
+ BSet bset(tab, itab, par.m_rows);
+ CHK(scanreadindex(par, itab, bset) == 0);
+ }
+ }
+ return 0;
+}
+
+static int
+pkops(Par par)
+{
+ par.m_randomkey = true;
+ for (unsigned i = 0; i < par.m_subsubloop; i++) {
+ unsigned sel = urandom(10);
+ if (par.m_slno % 2 == 0) {
+ // favor insert
+ if (sel < 8) {
+ CHK(pkinsert(par) == 0);
+ } else if (sel < 9) {
+ CHK(pkupdate(par) == 0);
+ } else {
+ CHK(pkdelete(par) == 0);
+ }
+ } else {
+ // favor delete
+ if (sel < 1) {
+ CHK(pkinsert(par) == 0);
+ } else if (sel < 2) {
+ CHK(pkupdate(par) == 0);
+ } else {
+ CHK(pkdelete(par) == 0);
+ }
+ }
+ }
+ return 0;
+}
+
+static int
pkupdatescanread(Par par)
{
par.m_dups = true;
@@ -2930,6 +2983,8 @@ runstep(Par par, const char* fname, TFunc func, unsigned mode)
thr.m_par.m_tab = par.m_tab;
thr.m_par.m_set = par.m_set;
thr.m_par.m_tmr = par.m_tmr;
+ thr.m_par.m_lno = par.m_lno;
+ thr.m_par.m_slno = par.m_slno;
thr.m_func = func;
thr.start();
}
@@ -2953,8 +3008,8 @@ tbuild(Par par)
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
- for (unsigned i = 0; i < par.m_subloop; i++) {
- if (i % 2 == 0) {
+ for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ if (par.m_slno % 2 == 0) {
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, pkinsert, MT);
@@ -2963,9 +3018,10 @@ tbuild(Par par)
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
}
- RUNSTEP(par, readverify, MT);
+ RUNSTEP(par, pkupdate, MT);
+ RUNSTEP(par, readverifyfull, MT);
RUNSTEP(par, pkdelete, MT);
- RUNSTEP(par, readverify, MT);
+ RUNSTEP(par, readverifyfull, MT);
RUNSTEP(par, dropindex, ST);
}
return 0;
@@ -2977,11 +3033,27 @@ tpkops(Par par)
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
+ RUNSTEP(par, createindex, ST);
+ RUNSTEP(par, invalidateindex, MT);
+ for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ RUNSTEP(par, pkops, MT);
+ LL2("rows=" << par.set().count());
+ RUNSTEP(par, readverifyfull, MT);
+ }
+ return 0;
+}
+
+static int
+tpkopsread(Par par)
+{
+ RUNSTEP(par, droptable, ST);
+ RUNSTEP(par, createtable, ST);
+ RUNSTEP(par, invalidatetable, MT);
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST);
- for (unsigned i = 0; i < par.m_subloop; i++) {
+ for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkupdatescanread, MT);
RUNSTEP(par, readverify, ST);
}
@@ -3000,7 +3072,7 @@ tmixedops(Par par)
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST);
- for (unsigned i = 0; i < par.m_subloop; i++) {
+ for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, mixedoperations, MT);
RUNSTEP(par, readverify, ST);
}
@@ -3014,7 +3086,7 @@ tbusybuild(Par par)
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
RUNSTEP(par, pkinsert, MT);
- for (unsigned i = 0; i < par.m_subloop; i++) {
+ for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkupdateindexbuild, MT);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST);
@@ -3030,7 +3102,7 @@ ttimebuild(Par par)
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
- for (unsigned i = 0; i < par.m_subloop; i++) {
+ for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT);
t1.on();
RUNSTEP(par, createindex, ST);
@@ -3049,7 +3121,7 @@ ttimemaint(Par par)
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
- for (unsigned i = 0; i < par.m_subloop; i++) {
+ for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT);
t1.on();
RUNSTEP(par, pkupdate, MT);
@@ -3074,7 +3146,7 @@ ttimescan(Par par)
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
- for (unsigned i = 0; i < par.m_subloop; i++) {
+ for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
par.m_tmr = &t1;
@@ -3096,7 +3168,7 @@ ttimepkread(Par par)
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
- for (unsigned i = 0; i < par.m_subloop; i++) {
+ for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
par.m_tmr = &t1;
@@ -3132,9 +3204,10 @@ struct TCase {
static const TCase
tcaselist[] = {
TCase("a", tbuild, "index build"),
- TCase("b", tpkops, "pk operations and scan reads"),
- TCase("c", tmixedops, "pk operations and scan operations"),
- TCase("d", tbusybuild, "pk operations and index build"),
+ TCase("b", tpkops, "pk operations"),
+ TCase("c", tpkopsread, "pk operations and scan reads"),
+ TCase("d", tmixedops, "pk operations and scan operations"),
+ TCase("e", tbusybuild, "pk operations and index build"),
TCase("t", ttimebuild, "time index build"),
TCase("u", ttimemaint, "time index maintenance"),
TCase("v", ttimescan, "time full scan table vs index on pk"),
@@ -3192,10 +3265,10 @@ runtest(Par par)
Thr& thr = *g_thrlist[n];
assert(thr.m_thread != 0);
}
- for (unsigned l = 0; par.m_loop == 0 || l < par.m_loop; l++) {
- LL1("loop " << l);
+ for (par.m_lno = 0; par.m_loop == 0 || par.m_lno < par.m_loop; par.m_lno++) {
+ LL1("loop " << par.m_lno);
if (par.m_seed == 0)
- srandom(l);
+ srandom(par.m_lno);
for (unsigned i = 0; i < tcasecount; i++) {
const TCase& tcase = tcaselist[i];
if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0)