summaryrefslogtreecommitdiff
path: root/ndb
diff options
context:
space:
mode:
Diffstat (limited to 'ndb')
-rw-r--r--ndb/src/kernel/blocks/dbtux/Dbtux.hpp39
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp94
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp216
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp16
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp22
-rw-r--r--ndb/src/kernel/blocks/dbtux/Times.txt5
6 files changed, 153 insertions, 239 deletions
diff --git a/ndb/src/kernel/blocks/dbtux/Dbtux.hpp b/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
index aa0c9b41c3c..45ea5f847d4 100644
--- a/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
+++ b/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
@@ -505,17 +505,15 @@ private:
struct NodeHandle;
friend struct NodeHandle;
struct NodeHandle {
- Dbtux& m_tux; // this block
Frag& m_frag; // fragment using the node
TupLoc m_loc; // physical node address
+ TreeNode* m_node; // pointer to node storage
AccSize m_acc; // accessed size
union {
Uint32 m_next; // next active node under fragment
Uint32 nextPool;
};
- TreeNode* m_node; // pointer to node storage
- Uint32 m_cache[MaxTreeNodeSize];
- NodeHandle(Dbtux& tux, Frag& frag);
+ NodeHandle(Frag& frag);
// getters
TupLoc getLink(unsigned i);
unsigned getChilds(); // cannot spell
@@ -532,17 +530,8 @@ private:
void setOccup(unsigned n);
void setBalance(int b);
void setNodeScan(Uint32 scanPtrI);
- // operations XXX maybe these should move to Dbtux level
- void pushUp(Signal* signal, unsigned pos, const TreeEnt& ent);
- void popDown(Signal* signal, unsigned pos, TreeEnt& ent);
- void pushDown(Signal* signal, unsigned pos, TreeEnt& ent);
- void popUp(Signal* signal, unsigned pos, TreeEnt& ent);
- void slide(Signal* signal, Ptr<NodeHandle> nodePtr, unsigned i);
- void linkScan(Dbtux::ScanOpPtr scanPtr);
- void unlinkScan(Dbtux::ScanOpPtr scanPtr);
- bool islinkScan(Dbtux::ScanOpPtr scanPtr);
- // for ndbrequire
- void progError(int line, int cause, const char* extra);
+ // for ndbrequire and ndbassert
+ void progError(int line, int cause, const char* file);
};
typedef Ptr<NodeHandle> NodeHandlePtr;
ArrayPool<NodeHandle> c_nodeHandlePool;
@@ -656,7 +645,6 @@ private:
void execTUX_MAINT_REQ(Signal* signal);
void tupReadAttrs(Signal* signal, const Frag& frag, ReadPar& readPar);
void tupReadKeys(Signal* signal, const Frag& frag, ReadPar& readPar);
- void tupStoreTh(Signal* signal, const Frag& frag, NodeHandlePtr nodePtr, StorePar storePar);
/*
* DbtuxNode.cpp
@@ -668,8 +656,18 @@ private:
void insertNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc);
void deleteNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr);
void accessNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc);
- void setNodePref(Signal* signal, Frag& frag, NodeHandle& node, unsigned i);
+ void setNodePref(Signal* signal, NodeHandle& node, unsigned i);
void commitNodes(Signal* signal, Frag& frag, bool updateOk);
+ // node operations
+ void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent);
+ void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
+ void nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
+ void nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
+ void nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned i);
+ // scans linked to node
+ void linkScan(NodeHandle& node, ScanOpPtr scanPtr);
+ void unlinkScan(NodeHandle& node, ScanOpPtr scanPtr);
+ bool islinkScan(NodeHandle& node, ScanOpPtr scanPtr);
/*
* DbtuxTree.cpp
@@ -1084,13 +1082,12 @@ Dbtux::FragOp::FragOp() :
// Dbtux::NodeHandle
inline
-Dbtux::NodeHandle::NodeHandle(Dbtux& tux, Frag& frag) :
- m_tux(tux),
+Dbtux::NodeHandle::NodeHandle(Frag& frag) :
m_frag(frag),
m_loc(),
+ m_node(0),
m_acc(AccNone),
- m_next(RNIL),
- m_node(0)
+ m_next(RNIL)
{
}
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp
index 0d867b6c501..75e942adbba 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp
@@ -270,97 +270,3 @@ Dbtux::tupReadKeys(Signal* signal, const Frag& frag, ReadPar& readPar)
readPar.m_count = numKeys;
readPar.m_size = copyPar.m_numwords;
}
-
-/*
- * Operate on index node tuple in TUP. The data is copied between node
- * cache and index storage via signal data.
- */
-void
-Dbtux::tupStoreTh(Signal* signal, const Frag& frag, NodeHandlePtr nodePtr, StorePar storePar)
-{
- const TreeHead& tree = frag.m_tree;
- // define the direct signal
- TupStoreTh* req = (TupStoreTh*)signal->getDataPtrSend();
- req->errorCode = RNIL;
- req->tableId = frag.m_indexId;
- req->fragId = frag.m_fragId;
- req->fragPtrI = frag.m_tupIndexFragPtrI;
- req->tupAddr = RNIL; // no longer used
- req->tupVersion = 0; // no longer used
- req->pageId = nodePtr.p->m_loc.m_pageId;
- req->pageOffset = nodePtr.p->m_loc.m_pageOffset;
- req->bufferId = 0;
- req->opCode = storePar.m_opCode;
- ndbrequire(storePar.m_offset + storePar.m_size <= tree.m_nodeSize);
- req->dataOffset = storePar.m_offset;
- req->dataSize = storePar.m_size;
- // the node cache
- ndbrequire(nodePtr.p->m_node != 0);
- // the buffer in signal data
- Uint32* const buffer = (Uint32*)req + TupStoreTh::SignalLength;
- // copy in data
- switch (storePar.m_opCode) {
- case TupStoreTh::OpRead:
- jam();
- #ifdef VM_TRACE
- {
- Uint32* dst = buffer + storePar.m_offset;
- memset(dst, 0xa9, storePar.m_size << 2);
- }
- #endif
- break;
- case TupStoreTh::OpInsert:
- jam();
- // fallthru
- case TupStoreTh::OpUpdate:
- jam();
- // copy from cache to signal data
- {
- Uint32* dst = buffer + storePar.m_offset;
- const Uint32* src = (const Uint32*)nodePtr.p->m_node + storePar.m_offset;
- memcpy(dst, src, storePar.m_size << 2);
- }
- break;
- case TupStoreTh::OpDelete:
- jam();
- break;
- default:
- ndbrequire(false);
- break;
- }
- // execute
- EXECUTE_DIRECT(DBTUP, GSN_TUP_STORE_TH, signal, TupStoreTh::SignalLength);
- jamEntry();
- if (req->errorCode != 0) {
- jam();
- storePar.m_errorCode = req->errorCode;
- return;
- }
- ndbrequire(req->errorCode == 0);
- // copy out data
- switch (storePar.m_opCode) {
- case TupStoreTh::OpRead:
- jam();
- {
- Uint32* dst = (Uint32*)nodePtr.p->m_node + storePar.m_offset;
- const Uint32* src = (const Uint32*)buffer + storePar.m_offset;
- memcpy(dst, src, storePar.m_size << 2);
- }
- break;
- case TupStoreTh::OpInsert:
- jam();
- nodePtr.p->m_loc.m_pageId = req->pageId;
- nodePtr.p->m_loc.m_pageOffset = req->pageOffset;
- break;
- case TupStoreTh::OpUpdate:
- jam();
- break;
- case TupStoreTh::OpDelete:
- jam();
- nodePtr.p->m_loc = NullTupLoc;
- break;
- default:
- ndbrequire(false);
- break;
- }
-}
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
index fb5bc92adc9..4a81dc60316 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
@@ -32,7 +32,7 @@ Dbtux::seizeNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr)
jam();
return;
}
- new (nodePtr.p) NodeHandle(*this, frag);
+ new (nodePtr.p) NodeHandle(frag);
nodePtr.p->m_next = frag.m_nodeList;
frag.m_nodeList = nodePtr.i;
}
@@ -158,7 +158,7 @@ Dbtux::deleteNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr)
// invalidate handle and storage
tmpPtr.p->m_loc = NullTupLoc;
tmpPtr.p->m_node = 0;
- // scans have already been moved by popDown or popUp
+ // scans have already been moved by nodePopDown or nodePopUp
}
/*
@@ -179,8 +179,9 @@ Dbtux::accessNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize ac
* Set prefix.
*/
void
-Dbtux::setNodePref(Signal* signal, Frag& frag, NodeHandle& node, unsigned i)
+Dbtux::setNodePref(Signal* signal, NodeHandle& node, unsigned i)
{
+ Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
ReadPar readPar;
ndbrequire(i <= 1);
@@ -219,7 +220,7 @@ Dbtux::commitNodes(Signal* signal, Frag& frag, bool updateOk)
}
}
-// Dbtux::NodeHandle
+// node operations
/*
* Add entry at position. Move entries greater than or equal to the old
@@ -231,25 +232,26 @@ Dbtux::commitNodes(Signal* signal, Frag& frag, bool updateOk)
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*/
void
-Dbtux::NodeHandle::pushUp(Signal* signal, unsigned pos, const TreeEnt& ent)
+Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent)
{
- TreeHead& tree = m_frag.m_tree;
- const unsigned occup = getOccup();
+ Frag& frag = node.m_frag;
+ TreeHead& tree = frag.m_tree;
+ const unsigned occup = node.getOccup();
ndbrequire(occup < tree.m_maxOccup && pos <= occup);
// fix scans
ScanOpPtr scanPtr;
- scanPtr.i = getNodeScan();
+ scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
jam();
- m_tux.c_scanOpPool.getPtr(scanPtr);
+ c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
- ndbrequire(scanPos.m_loc == m_loc && scanPos.m_pos < occup);
+ ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
if (scanPos.m_pos >= pos) {
jam();
#ifdef VM_TRACE
- if (m_tux.debugFlags & m_tux.DebugScan) {
- m_tux.debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
- m_tux.debugOut << "At pushUp pos=" << pos << " " << *this << endl;
+ if (debugFlags & DebugScan) {
+ debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
+ debugOut << "At pushUp pos=" << pos << " " << node << endl;
}
#endif
scanPos.m_pos++;
@@ -257,7 +259,7 @@ Dbtux::NodeHandle::pushUp(Signal* signal, unsigned pos, const TreeEnt& ent)
scanPtr.i = scanPtr.p->m_nodeScan;
}
// fix node
- TreeEnt* const entList = tree.getEntList(m_node);
+ TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
for (unsigned i = occup; i > pos; i--) {
@@ -266,17 +268,17 @@ Dbtux::NodeHandle::pushUp(Signal* signal, unsigned pos, const TreeEnt& ent)
}
tmpList[pos] = ent;
entList[0] = entList[occup + 1];
- setOccup(occup + 1);
+ node.setOccup(occup + 1);
// fix prefixes
if (occup == 0 || pos == 0)
- m_tux.setNodePref(signal, m_frag, *this, 0);
+ setNodePref(signal, node, 0);
if (occup == 0 || pos == occup)
- m_tux.setNodePref(signal, m_frag, *this, 1);
+ setNodePref(signal, node, 1);
}
/*
* Remove and return entry at position. Move entries greater than the
- * removed one to the left. This is the opposite of pushUp.
+ * removed one to the left. This is the opposite of nodePushUp.
*
* D
* ^ ^
@@ -284,46 +286,47 @@ Dbtux::NodeHandle::pushUp(Signal* signal, unsigned pos, const TreeEnt& ent)
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*/
void
-Dbtux::NodeHandle::popDown(Signal* signal, unsigned pos, TreeEnt& ent)
+Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
{
- TreeHead& tree = m_frag.m_tree;
- const unsigned occup = getOccup();
+ Frag& frag = node.m_frag;
+ TreeHead& tree = frag.m_tree;
+ const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup);
ScanOpPtr scanPtr;
// move scans whose entry disappears
- scanPtr.i = getNodeScan();
+ scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
jam();
- m_tux.c_scanOpPool.getPtr(scanPtr);
+ c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
- ndbrequire(scanPos.m_loc == m_loc && scanPos.m_pos < occup);
+ ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
if (scanPos.m_pos == pos) {
jam();
#ifdef VM_TRACE
- if (m_tux.debugFlags & m_tux.DebugScan) {
- m_tux.debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
- m_tux.debugOut << "At popDown pos=" << pos << " " << *this << endl;
+ if (debugFlags & DebugScan) {
+ debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
+ debugOut << "At popDown pos=" << pos << " " << node << endl;
}
#endif
- m_tux.scanNext(signal, scanPtr);
+ scanNext(signal, scanPtr);
}
scanPtr.i = nextPtrI;
}
// fix other scans
- scanPtr.i = getNodeScan();
+ scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
jam();
- m_tux.c_scanOpPool.getPtr(scanPtr);
+ c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
- ndbrequire(scanPos.m_loc == m_loc && scanPos.m_pos < occup);
+ ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
ndbrequire(scanPos.m_pos != pos);
if (scanPos.m_pos > pos) {
jam();
#ifdef VM_TRACE
- if (m_tux.debugFlags & m_tux.DebugScan) {
- m_tux.debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
- m_tux.debugOut << "At popDown pos=" << pos << " " << *this << endl;
+ if (debugFlags & DebugScan) {
+ debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
+ debugOut << "At popDown pos=" << pos << " " << node << endl;
}
#endif
scanPos.m_pos--;
@@ -331,7 +334,7 @@ Dbtux::NodeHandle::popDown(Signal* signal, unsigned pos, TreeEnt& ent)
scanPtr.i = scanPtr.p->m_nodeScan;
}
// fix node
- TreeEnt* const entList = tree.getEntList(m_node);
+ TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
ent = tmpList[pos];
@@ -340,12 +343,12 @@ Dbtux::NodeHandle::popDown(Signal* signal, unsigned pos, TreeEnt& ent)
tmpList[i] = tmpList[i + 1];
}
entList[0] = entList[occup - 1];
- setOccup(occup - 1);
+ node.setOccup(occup - 1);
// fix prefixes
if (occup != 1 && pos == 0)
- m_tux.setNodePref(signal, m_frag, *this, 0);
+ setNodePref(signal, node, 0);
if (occup != 1 && pos == occup - 1)
- m_tux.setNodePref(signal, m_frag, *this, 1);
+ setNodePref(signal, node, 1);
}
/*
@@ -358,47 +361,48 @@ Dbtux::NodeHandle::popDown(Signal* signal, unsigned pos, TreeEnt& ent)
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*/
void
-Dbtux::NodeHandle::pushDown(Signal* signal, unsigned pos, TreeEnt& ent)
+Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
{
- TreeHead& tree = m_frag.m_tree;
- const unsigned occup = getOccup();
+ Frag& frag = node.m_frag;
+ TreeHead& tree = frag.m_tree;
+ const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup);
ScanOpPtr scanPtr;
// move scans whose entry disappears
- scanPtr.i = getNodeScan();
+ scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
jam();
- m_tux.c_scanOpPool.getPtr(scanPtr);
+ c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
- ndbrequire(scanPos.m_loc == m_loc && scanPos.m_pos < occup);
+ ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
if (scanPos.m_pos == 0) {
jam();
#ifdef VM_TRACE
- if (m_tux.debugFlags & m_tux.DebugScan) {
- m_tux.debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
- m_tux.debugOut << "At pushDown pos=" << pos << " " << *this << endl;
+ if (debugFlags & DebugScan) {
+ debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
+ debugOut << "At pushDown pos=" << pos << " " << node << endl;
}
#endif
// here we may miss a valid entry "X" XXX known bug
- m_tux.scanNext(signal, scanPtr);
+ scanNext(signal, scanPtr);
}
scanPtr.i = nextPtrI;
}
// fix other scans
- scanPtr.i = getNodeScan();
+ scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
jam();
- m_tux.c_scanOpPool.getPtr(scanPtr);
+ c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
- ndbrequire(scanPos.m_loc == m_loc && scanPos.m_pos < occup);
+ ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
ndbrequire(scanPos.m_pos != 0);
if (scanPos.m_pos <= pos) {
jam();
#ifdef VM_TRACE
- if (m_tux.debugFlags & m_tux.DebugScan) {
- m_tux.debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
- m_tux.debugOut << "At pushDown pos=" << pos << " " << *this << endl;
+ if (debugFlags & DebugScan) {
+ debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
+ debugOut << "At pushDown pos=" << pos << " " << node << endl;
}
#endif
scanPos.m_pos--;
@@ -406,7 +410,7 @@ Dbtux::NodeHandle::pushDown(Signal* signal, unsigned pos, TreeEnt& ent)
scanPtr.i = scanPtr.p->m_nodeScan;
}
// fix node
- TreeEnt* const entList = tree.getEntList(m_node);
+ TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
TreeEnt oldMin = tmpList[0];
@@ -419,15 +423,15 @@ Dbtux::NodeHandle::pushDown(Signal* signal, unsigned pos, TreeEnt& ent)
entList[0] = entList[occup];
// fix prefixes
if (true)
- m_tux.setNodePref(signal, m_frag, *this, 0);
+ setNodePref(signal, node, 0);
if (occup == 1 || pos == occup - 1)
- m_tux.setNodePref(signal, m_frag, *this, 1);
+ setNodePref(signal, node, 1);
}
/*
* Remove and return entry at position. Move entries less than the
* removed one to the right. Replace min entry by the input entry.
- * This is the opposite of pushDown.
+ * This is the opposite of nodePushDown.
*
* X D
* v ^ ^
@@ -435,47 +439,48 @@ Dbtux::NodeHandle::pushDown(Signal* signal, unsigned pos, TreeEnt& ent)
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*/
void
-Dbtux::NodeHandle::popUp(Signal* signal, unsigned pos, TreeEnt& ent)
+Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
{
- TreeHead& tree = m_frag.m_tree;
- const unsigned occup = getOccup();
+ Frag& frag = node.m_frag;
+ TreeHead& tree = frag.m_tree;
+ const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup);
ScanOpPtr scanPtr;
// move scans whose entry disappears
- scanPtr.i = getNodeScan();
+ scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
jam();
- m_tux.c_scanOpPool.getPtr(scanPtr);
+ c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
- ndbrequire(scanPos.m_loc == m_loc && scanPos.m_pos < occup);
+ ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
if (scanPos.m_pos == pos) {
jam();
#ifdef VM_TRACE
- if (m_tux.debugFlags & m_tux.DebugScan) {
- m_tux.debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
- m_tux.debugOut << "At popUp pos=" << pos << " " << *this << endl;
+ if (debugFlags & DebugScan) {
+ debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
+ debugOut << "At popUp pos=" << pos << " " << node << endl;
}
#endif
// here we may miss a valid entry "X" XXX known bug
- m_tux.scanNext(signal, scanPtr);
+ scanNext(signal, scanPtr);
}
scanPtr.i = nextPtrI;
}
// fix other scans
- scanPtr.i = getNodeScan();
+ scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
jam();
- m_tux.c_scanOpPool.getPtr(scanPtr);
+ c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
- ndbrequire(scanPos.m_loc == m_loc && scanPos.m_pos < occup);
+ ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
ndbrequire(scanPos.m_pos != pos);
if (scanPos.m_pos < pos) {
jam();
#ifdef VM_TRACE
- if (m_tux.debugFlags & m_tux.DebugScan) {
- m_tux.debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
- m_tux.debugOut << "At popUp pos=" << pos << " " << *this << endl;
+ if (debugFlags & DebugScan) {
+ debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
+ debugOut << "At popUp pos=" << pos << " " << node << endl;
}
#endif
scanPos.m_pos++;
@@ -483,7 +488,7 @@ Dbtux::NodeHandle::popUp(Signal* signal, unsigned pos, TreeEnt& ent)
scanPtr.i = scanPtr.p->m_nodeScan;
}
// fix node
- TreeEnt* const entList = tree.getEntList(m_node);
+ TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
TreeEnt newMin = ent;
@@ -496,9 +501,9 @@ Dbtux::NodeHandle::popUp(Signal* signal, unsigned pos, TreeEnt& ent)
entList[0] = entList[occup];
// fix prefixes
if (true)
- m_tux.setNodePref(signal, m_frag, *this, 0);
+ setNodePref(signal, node, 0);
if (occup == 1 || pos == occup - 1)
- m_tux.setNodePref(signal, m_frag, *this, 1);
+ setNodePref(signal, node, 1);
}
/*
@@ -506,14 +511,15 @@ Dbtux::NodeHandle::popUp(Signal* signal, unsigned pos, TreeEnt& ent)
* after the max (i=1). XXX can be optimized
*/
void
-Dbtux::NodeHandle::slide(Signal* signal, NodeHandlePtr nodePtr, unsigned i)
+Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned i)
{
+ Frag& frag = dstNode.m_frag;
+ TreeHead& tree = frag.m_tree;
ndbrequire(i <= 1);
- TreeHead& tree = m_frag.m_tree;
- while (getOccup() < tree.m_maxOccup && nodePtr.p->getOccup() != 0) {
+ while (dstNode.getOccup() < tree.m_maxOccup && srcNode.getOccup() != 0) {
TreeEnt ent;
- nodePtr.p->popDown(signal, i == 0 ? nodePtr.p->getOccup() - 1 : 0, ent);
- pushUp(signal, i == 0 ? 0 : getOccup(), ent);
+ nodePopDown(signal, srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent);
+ nodePushUp(signal, dstNode, i == 0 ? 0 : dstNode.getOccup(), ent);
}
}
@@ -522,50 +528,50 @@ Dbtux::NodeHandle::slide(Signal* signal, NodeHandlePtr nodePtr, unsigned i)
* ordering does not matter.
*/
void
-Dbtux::NodeHandle::linkScan(Dbtux::ScanOpPtr scanPtr)
+Dbtux::linkScan(NodeHandle& node, ScanOpPtr scanPtr)
{
#ifdef VM_TRACE
- if (m_tux.debugFlags & m_tux.DebugScan) {
- m_tux.debugOut << "Link scan " << scanPtr.i << " " << *scanPtr.p << endl;
- m_tux.debugOut << "To node " << *this << endl;
+ if (debugFlags & DebugScan) {
+ debugOut << "Link scan " << scanPtr.i << " " << *scanPtr.p << endl;
+ debugOut << "To node " << node << endl;
}
#endif
- ndbrequire(! islinkScan(scanPtr) && scanPtr.p->m_nodeScan == RNIL);
- scanPtr.p->m_nodeScan = getNodeScan();
- setNodeScan(scanPtr.i);
+ ndbrequire(! islinkScan(node, scanPtr) && scanPtr.p->m_nodeScan == RNIL);
+ scanPtr.p->m_nodeScan = node.getNodeScan();
+ node.setNodeScan(scanPtr.i);
}
/*
* Unlink a scan from the list under the node.
*/
void
-Dbtux::NodeHandle::unlinkScan(Dbtux::ScanOpPtr scanPtr)
+Dbtux::unlinkScan(NodeHandle& node, ScanOpPtr scanPtr)
{
#ifdef VM_TRACE
- if (m_tux.debugFlags & m_tux.DebugScan) {
- m_tux.debugOut << "Unlink scan " << scanPtr.i << " " << *scanPtr.p << endl;
- m_tux.debugOut << "From node " << *this << endl;
+ if (debugFlags & DebugScan) {
+ debugOut << "Unlink scan " << scanPtr.i << " " << *scanPtr.p << endl;
+ debugOut << "From node " << node << endl;
}
#endif
- Dbtux::ScanOpPtr currPtr;
- currPtr.i = getNodeScan();
- Dbtux::ScanOpPtr prevPtr;
+ ScanOpPtr currPtr;
+ currPtr.i = node.getNodeScan();
+ ScanOpPtr prevPtr;
prevPtr.i = RNIL;
while (true) {
jam();
- m_tux.c_scanOpPool.getPtr(currPtr);
+ c_scanOpPool.getPtr(currPtr);
Uint32 nextPtrI = currPtr.p->m_nodeScan;
if (currPtr.i == scanPtr.i) {
jam();
if (prevPtr.i == RNIL) {
- setNodeScan(nextPtrI);
+ node.setNodeScan(nextPtrI);
} else {
jam();
prevPtr.p->m_nodeScan = nextPtrI;
}
scanPtr.p->m_nodeScan = RNIL;
// check for duplicates
- ndbrequire(! islinkScan(scanPtr));
+ ndbrequire(! islinkScan(node, scanPtr));
return;
}
prevPtr = currPtr;
@@ -577,13 +583,13 @@ Dbtux::NodeHandle::unlinkScan(Dbtux::ScanOpPtr scanPtr)
* Check if a scan is linked to this node. Only for ndbrequire.
*/
bool
-Dbtux::NodeHandle::islinkScan(Dbtux::ScanOpPtr scanPtr)
+Dbtux::islinkScan(NodeHandle& node, ScanOpPtr scanPtr)
{
- Dbtux::ScanOpPtr currPtr;
- currPtr.i = getNodeScan();
+ ScanOpPtr currPtr;
+ currPtr.i = node.getNodeScan();
while (currPtr.i != RNIL) {
jam();
- m_tux.c_scanOpPool.getPtr(currPtr);
+ c_scanOpPool.getPtr(currPtr);
if (currPtr.i == scanPtr.i) {
jam();
return true;
@@ -594,7 +600,7 @@ Dbtux::NodeHandle::islinkScan(Dbtux::ScanOpPtr scanPtr)
}
void
-Dbtux::NodeHandle::progError(int line, int cause, const char* extra)
+Dbtux::NodeHandle::progError(int line, int cause, const char* file)
{
- m_tux.progError(line, cause, extra);
+ ErrorReporter::handleAssert("Dbtux::NodeHandle: assert failed", file, line);
}
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
index f97175666f9..376af4ff5bf 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
@@ -280,7 +280,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
const TupLoc loc = scan.m_scanPos.m_loc;
NodeHandlePtr nodePtr;
selectNode(signal, frag, nodePtr, loc, AccHead);
- nodePtr.p->unlinkScan(scanPtr);
+ unlinkScan(*nodePtr.p, scanPtr);
scan.m_scanPos.m_loc = NullTupLoc;
}
if (scan.m_lockwait) {
@@ -763,7 +763,7 @@ loop: {
pos.m_dir = 3;
scan.m_scanPos = pos;
scan.m_state = ScanOp::Next;
- nodePtr.p->linkScan(scanPtr);
+ linkScan(*nodePtr.p, scanPtr);
return;
}
if (i == 1 && ret > 0) {
@@ -779,7 +779,7 @@ loop: {
pos.m_dir = 1;
scan.m_scanPos = pos;
scan.m_state = ScanOp::Next;
- nodePtr.p->linkScan(scanPtr);
+ linkScan(*nodePtr.p, scanPtr);
return;
}
}
@@ -808,7 +808,7 @@ loop: {
pos.m_dir = 3;
scan.m_scanPos = pos;
scan.m_state = ScanOp::Next;
- nodePtr.p->linkScan(scanPtr);
+ linkScan(*nodePtr.p, scanPtr);
return;
}
}
@@ -870,7 +870,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
// get and remember original node
NodeHandlePtr origNodePtr;
selectNode(signal, frag, origNodePtr, pos.m_loc, AccHead);
- ndbrequire(origNodePtr.p->islinkScan(scanPtr));
+ ndbrequire(islinkScan(*origNodePtr.p, scanPtr));
// current node in loop
NodeHandlePtr nodePtr = origNodePtr;
while (true) {
@@ -977,13 +977,13 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
ndbrequire(pos.m_loc == nodePtr.p->m_loc);
if (origNodePtr.i != nodePtr.i) {
jam();
- origNodePtr.p->unlinkScan(scanPtr);
- nodePtr.p->linkScan(scanPtr);
+ unlinkScan(*origNodePtr.p, scanPtr);
+ linkScan(*nodePtr.p, scanPtr);
}
} else if (scan.m_state == ScanOp::Last) {
jam();
ndbrequire(pos.m_loc == NullTupLoc);
- origNodePtr.p->unlinkScan(scanPtr);
+ unlinkScan(*origNodePtr.p, scanPtr);
} else {
ndbrequire(false);
}
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp
index 98c2aa75e11..9af2c6672c0 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp
@@ -161,7 +161,7 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
if (treePos.m_loc == NullTupLoc) {
jam();
insertNode(signal, frag, nodePtr, AccPref);
- nodePtr.p->pushUp(signal, 0, ent);
+ nodePushUp(signal, *nodePtr.p, 0, ent);
nodePtr.p->setSide(2);
tree.m_root = nodePtr.p->m_loc;
return;
@@ -174,11 +174,11 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
// check if room for one more
if (nodePtr.p->getOccup() < tree.m_maxOccup) {
jam();
- nodePtr.p->pushUp(signal, pos, ent);
+ nodePushUp(signal, *nodePtr.p, pos, ent);
return;
}
// returns min entry
- nodePtr.p->pushDown(signal, pos - 1, ent);
+ nodePushDown(signal, *nodePtr.p, pos - 1, ent);
// find position to add the removed min entry
TupLoc childLoc = nodePtr.p->getLink(0);
if (childLoc == NullTupLoc) {
@@ -205,13 +205,13 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
// check if the half-leaf/leaf has room for one more
if (nodePtr.p->getOccup() < tree.m_maxOccup) {
jam();
- nodePtr.p->pushUp(signal, pos, ent);
+ nodePushUp(signal, *nodePtr.p, pos, ent);
return;
}
// add a new node
NodeHandlePtr childPtr;
insertNode(signal, frag, childPtr, AccPref);
- childPtr.p->pushUp(signal, 0, ent);
+ nodePushUp(signal, *childPtr.p, 0, ent);
// connect parent and child
nodePtr.p->setLink(i, childPtr.p->m_loc);
childPtr.p->setLink(2, nodePtr.p->m_loc);
@@ -283,7 +283,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
// check if no underflow
if (nodePtr.p->getOccup() > tree.m_minOccup) {
jam();
- nodePtr.p->popDown(signal, pos, ent);
+ nodePopDown(signal, *nodePtr.p, pos, ent);
return;
}
// save current handle
@@ -299,13 +299,13 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
accessNode(signal, frag, nodePtr, AccFull);
// use glb max as new parent min
ent = nodePtr.p->getEnt(nodePtr.p->getOccup() - 1);
- parentPtr.p->popUp(signal, pos, ent);
+ nodePopUp(signal, *parentPtr.p, pos, ent);
// set up to remove glb max
pos = nodePtr.p->getOccup() - 1;
// fall thru to next case
}
// remove the element
- nodePtr.p->popDown(signal, pos, ent);
+ nodePopDown(signal, *nodePtr.p, pos, ent);
ndbrequire(nodePtr.p->getChilds() <= 1);
// handle half-leaf
for (unsigned i = 0; i <= 1; i++) {
@@ -327,7 +327,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
if (parentLoc != NullTupLoc) {
jam();
selectNode(signal, frag, parentPtr, nodePtr.p->getLink(2), AccFull);
- parentPtr.p->slide(signal, nodePtr, i);
+ nodeSlide(signal, *parentPtr.p, *nodePtr.p, i);
// fall thru to next case
}
// non-empty leaf
@@ -353,7 +353,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
TupLoc childLoc = nodePtr.p->getLink(1 - i);
NodeHandlePtr childPtr;
selectNode(signal, frag, childPtr, childLoc, AccFull);
- nodePtr.p->slide(signal, childPtr, 1 - i);
+ nodeSlide(signal, *nodePtr.p, *childPtr.i, 1 - i);
if (childPtr.p->getOccup() == 0) {
jam();
deleteNode(signal, frag, childPtr);
@@ -688,7 +688,7 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsi
TreeHead& tree = frag.m_tree;
accessNode(signal, frag, n2Ptr, AccFull);
accessNode(signal, frag, n4Ptr, AccFull);
- n4Ptr.p->slide(signal, n2Ptr, i);
+ nodeSlide(signal, *n4Ptr.p, *n2Ptr.p, i);
// implied by rule of merging half-leaves with leaves
ndbrequire(n4Ptr.p->getOccup() >= tree.m_minOccup);
ndbrequire(n2Ptr.p->getOccup() != 0);
diff --git a/ndb/src/kernel/blocks/dbtux/Times.txt b/ndb/src/kernel/blocks/dbtux/Times.txt
index d2b6fafa7a0..7b2e9c7a425 100644
--- a/ndb/src/kernel/blocks/dbtux/Times.txt
+++ b/ndb/src/kernel/blocks/dbtux/Times.txt
@@ -11,6 +11,7 @@ testOIBasic -case u -table 2 -index 4 -fragtype small -threads 10 -rows 100000 -
1 million rows, pk update without index, pk update with index
shows ms / 1000 rows for each and pct overhead
+the figures are based on single run on idle machine
040616 mc02/a 40 ms 87 ms 114 pct
mc02/b 51 ms 128 ms 148 pct
@@ -27,5 +28,9 @@ optim 3 mc02/a 43 ms 80 ms 85 pct
optim 4 mc02/a 42 ms 80 ms 87 pct
mc02/b 51 ms 119 ms 129 pct
+optim 5 mc02/a 43 ms 77 ms 77 pct
+ mc02/b 54 ms 118 ms 117 pct
+
+
vim: set et: