diff options
author | unknown <pekka@orca.ndb.mysql.com> | 2006-10-09 16:13:32 +0200 |
---|---|---|
committer | unknown <pekka@orca.ndb.mysql.com> | 2006-10-09 16:13:32 +0200 |
commit | eb97edf4f0d7e920c2c915662a48a0933b83604b (patch) | |
tree | 8e7539317dfd903b753f38a137824291daaef363 /ndb | |
parent | a82f6781af744c98ee483f366b8ab7606d7d0541 (diff) | |
download | mariadb-git-eb97edf4f0d7e920c2c915662a48a0933b83604b.tar.gz |
ndb - bug#20446: fix + other TUX scan improvements
ndb/src/kernel/blocks/dbtux/Dbtux.hpp:
fixes to move scan when entry removed: 1. do not check visibility to transaction 2. keep Blocked if moved to another version of same tuple 3. general improvements (remove m_match etc)
ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp:
fixes to move scan when entry removed: 1. do not check visibility to transaction 2. keep Blocked if moved to another version of same tuple 3. general improvements (remove m_match etc)
ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp:
fixes to move scan when entry removed: 1. do not check visibility to transaction 2. keep Blocked if moved to another version of same tuple 3. general improvements (remove m_match etc)
ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp:
fixes to move scan when entry removed: 1. do not check visibility to transaction 2. keep Blocked if moved to another version of same tuple 3. general improvements (remove m_match etc)
ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp:
fixes to move scan when entry removed: 1. do not check visibility to transaction 2. keep Blocked if moved to another version of same tuple 3. general improvements (remove m_match etc)
ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp:
fixes to move scan when entry removed: 1. do not check visibility to transaction 2. keep Blocked if moved to another version of same tuple 3. general improvements (remove m_match etc)
Diffstat (limited to 'ndb')
-rw-r--r-- | ndb/src/kernel/blocks/dbtux/Dbtux.hpp | 65 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp | 1 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp | 13 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp | 13 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp | 221 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp | 57 |
6 files changed, 225 insertions, 145 deletions
diff --git a/ndb/src/kernel/blocks/dbtux/Dbtux.hpp b/ndb/src/kernel/blocks/dbtux/Dbtux.hpp index 2f177da9eb0..6333206a311 100644 --- a/ndb/src/kernel/blocks/dbtux/Dbtux.hpp +++ b/ndb/src/kernel/blocks/dbtux/Dbtux.hpp @@ -188,6 +188,7 @@ private: unsigned m_fragBit : 1; // which duplicated table fragment TreeEnt(); // methods + bool eqtuple(const TreeEnt ent) const; bool eq(const TreeEnt ent) const; int cmp(const TreeEnt ent) const; }; @@ -265,8 +266,7 @@ private: struct TreePos { TupLoc m_loc; // physical node address Uint16 m_pos; // position 0 to m_occup - Uint8 m_match; // at an existing entry - Uint8 m_dir; // see scanNext() + Uint8 m_dir; // see scanNext TreePos(); }; @@ -357,12 +357,13 @@ private: enum { Undef = 0, First = 1, // before first entry - Current = 2, // at current before locking - Blocked = 3, // at current waiting for ACC lock - Locked = 4, // at current and locked or no lock needed - Next = 5, // looking for next extry - Last = 6, // after last entry - Aborting = 7, // lock wait at scan close + Current = 2, // at some entry + Found = 3, // return current as next scan result + Blocked = 4, // found and waiting for ACC lock + Locked = 5, // found and locked or no lock needed + Next = 6, // looking for next extry + Last = 7, // after last entry + Aborting = 8, // lock wait at scan close Invalid = 9 // cannot return REF to LQH currently }; Uint16 m_state; @@ -539,6 +540,7 @@ private: void readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData); void readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize); void copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2 = MaxAttrDataSize); + void unpackBound(const ScanBound& bound, Data data); /* * DbtuxMeta.cpp @@ -613,7 +615,9 @@ private: void execACCKEYREF(Signal* signal); void execACC_ABORTCONF(Signal* signal); void scanFirst(ScanOpPtr scanPtr); + void scanFind(ScanOpPtr scanPtr); void scanNext(ScanOpPtr scanPtr, bool fromMaintReq); + bool scanCheck(ScanOpPtr scanPtr, TreeEnt ent); bool scanVisible(ScanOpPtr scanPtr, TreeEnt ent); void scanClose(Signal* signal, ScanOpPtr scanPtr); void addAccLockOp(ScanOp& scan, Uint32 accLockOp); @@ -623,8 +627,8 @@ private: /* * DbtuxSearch.cpp */ - void searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos); - void searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos); + bool searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos); + bool searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos); void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos); void searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos); void searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos); @@ -782,6 +786,14 @@ Dbtux::TreeEnt::TreeEnt() : } inline bool +Dbtux::TreeEnt::eqtuple(const TreeEnt ent) const +{ + return + m_tupLoc == ent.m_tupLoc && + m_fragBit == ent.m_fragBit; +} + +inline bool Dbtux::TreeEnt::eq(const TreeEnt ent) const { return @@ -793,6 +805,11 @@ Dbtux::TreeEnt::eq(const TreeEnt ent) const inline int Dbtux::TreeEnt::cmp(const TreeEnt ent) const { + // compare frag first to improve cacheing in 5.0 + if (m_fragBit < ent.m_fragBit) + return -1; + if (m_fragBit > ent.m_fragBit) + return +1; if (m_tupLoc.getPageId() < ent.m_tupLoc.getPageId()) return -1; if (m_tupLoc.getPageId() > ent.m_tupLoc.getPageId()) @@ -801,14 +818,25 @@ Dbtux::TreeEnt::cmp(const TreeEnt ent) const return -1; if (m_tupLoc.getPageOffset() > ent.m_tupLoc.getPageOffset()) return +1; - if (m_tupVersion < ent.m_tupVersion) - return -1; - if (m_tupVersion > ent.m_tupVersion) - return +1; - if (m_fragBit < ent.m_fragBit) - return -1; - if (m_fragBit > ent.m_fragBit) - return +1; + /* + * Guess if one tuple version has wrapped around. This is well + * defined ordering on existing versions since versions are assigned + * consecutively and different versions exists only on uncommitted + * tuple. Assuming max 2**14 uncommitted ops on same tuple. + */ + const unsigned version_wrap_limit = (1 << (ZTUP_VERSION_BITS - 1)); + if (m_tupVersion < ent.m_tupVersion) { + if (ent.m_tupVersion - m_tupVersion < version_wrap_limit) + return -1; + else + return +1; + } + if (m_tupVersion > ent.m_tupVersion) { + if (m_tupVersion - ent.m_tupVersion < version_wrap_limit) + return +1; + else + return -1; + } return 0; } @@ -875,7 +903,6 @@ inline Dbtux::TreePos::TreePos() : m_loc(), m_pos(ZNIL), - m_match(false), m_dir(255) { } diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp index ed29dc57915..708e7d6f246 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp @@ -311,7 +311,6 @@ operator<<(NdbOut& out, const Dbtux::TreePos& pos) out << "[TreePos " << hex << &pos; out << " [loc " << pos.m_loc << "]"; out << " [pos " << dec << pos.m_pos << "]"; - out << " [match " << dec << pos.m_match << "]"; out << " [dir " << dec << pos.m_dir << "]"; out << "]"; return out; diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp index 806eac1dcba..b2ec2b1f2df 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp @@ -314,4 +314,17 @@ Dbtux::copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2 #endif } +void +Dbtux::unpackBound(const ScanBound& bound, Data dest) +{ + ScanBoundIterator iter; + bound.first(iter); + const unsigned n = bound.getSize(); + unsigned j; + for (j = 0; j < n; j++) { + dest[j] = *iter.data; + bound.next(iter); + } +} + BLOCK_FUNCTIONS(Dbtux) diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp index 4b568badc67..e7138b4110d 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp @@ -113,16 +113,17 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) // do the operation req->errorCode = 0; TreePos treePos; + bool ok; switch (opCode) { case TuxMaintReq::OpAdd: jam(); - searchToAdd(frag, c_searchKey, ent, treePos); + ok = searchToAdd(frag, c_searchKey, ent, treePos); #ifdef VM_TRACE if (debugFlags & DebugMaint) { - debugOut << treePos << (treePos.m_match ? " - error" : "") << endl; + debugOut << treePos << (! ok ? " - error" : "") << endl; } #endif - if (treePos.m_match) { + if (! ok) { jam(); // there is no "Building" state so this will have to do if (indexPtr.p->m_state == Index::Online) { @@ -152,13 +153,13 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) break; case TuxMaintReq::OpRemove: jam(); - searchToRemove(frag, c_searchKey, ent, treePos); + ok = searchToRemove(frag, c_searchKey, ent, treePos); #ifdef VM_TRACE if (debugFlags & DebugMaint) { - debugOut << treePos << (! treePos.m_match ? " - error" : "") << endl; + debugOut << treePos << (! ok ? " - error" : "") << endl; } #endif - if (! treePos.m_match) { + if (! ok) { jam(); // there is no "Building" state so this will have to do if (indexPtr.p->m_state == Index::Online) { diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp index a61b7c1f5ca..8e01c1abccd 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp @@ -421,21 +421,17 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) jam(); // search is done only once in single range scan scanFirst(scanPtr); -#ifdef VM_TRACE - if (debugFlags & DebugScan) { - debugOut << "First scan " << scanPtr.i << " " << scan << endl; - } -#endif } - if (scan.m_state == ScanOp::Next) { + if (scan.m_state == ScanOp::Current || + scan.m_state == ScanOp::Next) { jam(); // look for next - scanNext(scanPtr, false); + scanFind(scanPtr); } - // for reading tuple key in Current or Locked state + // for reading tuple key in Found or Locked state Data pkData = c_dataBuffer; unsigned pkSize = 0; // indicates not yet done - if (scan.m_state == ScanOp::Current) { + if (scan.m_state == ScanOp::Found) { // found an entry to return jam(); ndbrequire(scan.m_accLockOp == RNIL); @@ -509,8 +505,8 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) jam(); // max ops should depend on max scans (assert only) ndbassert(false); - // stay in Current state - scan.m_state = ScanOp::Current; + // stay in Found state + scan.m_state = ScanOp::Found; signal->theData[0] = scan.m_userPtr; signal->theData[1] = true; EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2); @@ -697,44 +693,95 @@ Dbtux::execACC_ABORTCONF(Signal* signal) } /* - * Find start position for single range scan. If it exists, sets state - * to Next and links the scan to the node. The first entry is returned - * by scanNext. + * Find start position for single range scan. */ void Dbtux::scanFirst(ScanOpPtr scanPtr) { ScanOp& scan = *scanPtr.p; Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); +#ifdef VM_TRACE + if (debugFlags & DebugScan) { + debugOut << "Enter first scan " << scanPtr.i << " " << scan << endl; + } +#endif TreeHead& tree = frag.m_tree; // set up index keys for this operation setKeyAttrs(frag); // scan direction 0, 1 const unsigned idir = scan.m_descending; - // unpack start key into c_dataBuffer - const ScanBound& bound = *scan.m_bound[idir]; - ScanBoundIterator iter; - bound.first(iter); - for (unsigned j = 0; j < bound.getSize(); j++) { - jam(); - c_dataBuffer[j] = *iter.data; - bound.next(iter); - } + unpackBound(*scan.m_bound[idir], c_dataBuffer); TreePos treePos; searchToScan(frag, c_dataBuffer, scan.m_boundCnt[idir], scan.m_descending, treePos); - if (treePos.m_loc == NullTupLoc) { - // empty result set + if (treePos.m_loc != NullTupLoc) { + scan.m_scanPos = treePos; + // link the scan to node found + NodeHandle node(frag); + selectNode(node, treePos.m_loc); + linkScan(node, scanPtr); + if (treePos.m_dir == 3) { + jam(); + // check upper bound + TreeEnt ent = node.getEnt(treePos.m_pos); + if (scanCheck(scanPtr, ent)) + scan.m_state = ScanOp::Current; + else + scan.m_state = ScanOp::Last; + } else { + scan.m_state = ScanOp::Next; + } + } else { jam(); scan.m_state = ScanOp::Last; - return; } - // set position and state - scan.m_scanPos = treePos; - scan.m_state = ScanOp::Next; - // link the scan to node found - NodeHandle node(frag); - selectNode(node, treePos.m_loc); - linkScan(node, scanPtr); +#ifdef VM_TRACE + if (debugFlags & DebugScan) { + debugOut << "Leave first scan " << scanPtr.i << " " << scan << endl; + } +#endif +} + +/* + * Look for entry to return as scan result. + */ +void +Dbtux::scanFind(ScanOpPtr scanPtr) +{ + ScanOp& scan = *scanPtr.p; + Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); +#ifdef VM_TRACE + if (debugFlags & DebugScan) { + debugOut << "Enter find scan " << scanPtr.i << " " << scan << endl; + } +#endif + ndbrequire(scan.m_state == ScanOp::Current || scan.m_state == ScanOp::Next); + while (1) { + jam(); + if (scan.m_state == ScanOp::Next) + scanNext(scanPtr, false); + if (scan.m_state == ScanOp::Current) { + jam(); + const TreePos pos = scan.m_scanPos; + NodeHandle node(frag); + selectNode(node, pos.m_loc); + const TreeEnt ent = node.getEnt(pos.m_pos); + if (scanVisible(scanPtr, ent)) { + jam(); + scan.m_state = ScanOp::Found; + scan.m_scanEnt = ent; + break; + } + } else { + jam(); + break; + } + scan.m_state = ScanOp::Next; + } +#ifdef VM_TRACE + if (debugFlags & DebugScan) { + debugOut << "Leave find scan " << scanPtr.i << " " << scan << endl; + } +#endif } /* @@ -752,6 +799,11 @@ Dbtux::scanFirst(ScanOpPtr scanPtr) * * If an entry was found, scan direction is 3. Therefore tree * re-organizations need not worry about scan direction. + * + * This method is also used to move a scan when its entry is removed + * (see moveScanList). If the scan is Blocked, we check if it remains + * Blocked on a different version of the tuple. Otherwise the tuple is + * lost and state becomes Current. */ void Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq) @@ -759,8 +811,8 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq) ScanOp& scan = *scanPtr.p; Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); #ifdef VM_TRACE - if (debugFlags & DebugScan) { - debugOut << "Next in scan " << scanPtr.i << " " << scan << endl; + if (debugFlags & (DebugMaint | DebugScan)) { + debugOut << "Enter next scan " << scanPtr.i << " " << scan << endl; } #endif // cannot be moved away from tuple we have locked @@ -770,15 +822,7 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq) // scan direction const unsigned idir = scan.m_descending; // 0, 1 const int jdir = 1 - 2 * (int)idir; // 1, -1 - // unpack end key into c_dataBuffer - const ScanBound& bound = *scan.m_bound[1 - idir]; - ScanBoundIterator iter; - bound.first(iter); - for (unsigned j = 0; j < bound.getSize(); j++) { - jam(); - c_dataBuffer[j] = *iter.data; - bound.next(iter); - } + unpackBound(*scan.m_bound[1 - idir], c_dataBuffer); // use copy of position TreePos pos = scan.m_scanPos; // get and remember original node @@ -792,15 +836,14 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq) while (true) { jam(); #ifdef VM_TRACE - if (debugFlags & DebugScan) { - debugOut << "Scan next pos " << pos << " " << node << endl; + if (debugFlags & (DebugMaint | DebugScan)) { + debugOut << "Current scan " << scanPtr.i << " pos " << pos << " node " << node << endl; } #endif if (pos.m_dir == 2) { // coming up from root ends the scan jam(); pos.m_loc = NullTupLoc; - scan.m_state = ScanOp::Last; break; } if (node.m_loc != pos.m_loc) { @@ -832,41 +875,22 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq) if (pos.m_dir == idir) { // coming up from left child scan current node jam(); - pos.m_pos = idir == 0 ? 0 : occup - 1; - pos.m_match = false; + pos.m_pos = idir == 0 ? (Uint16)-1 : occup; pos.m_dir = 3; } if (pos.m_dir == 3) { - // within node + // before or within node jam(); - // advance position - if (! pos.m_match) - pos.m_match = true; - else - // becomes ZNIL (which is > occup) if 0 and scan descending - pos.m_pos += jdir; + // advance position - becomes ZNIL (> occup) if 0 and descending + pos.m_pos += jdir; if (pos.m_pos < occup) { jam(); - ent = node.getEnt(pos.m_pos); pos.m_dir = 3; // unchanged - // read and compare all attributes - readKeyAttrs(frag, ent, 0, c_entryKey); - int ret = cmpScanBound(frag, 1 - idir, c_dataBuffer, scan.m_boundCnt[1 - idir], c_entryKey); - ndbrequire(ret != NdbSqlUtil::CmpUnknown); - if (jdir * ret < 0) { + ent = node.getEnt(pos.m_pos); + if (! scanCheck(scanPtr, ent)) { jam(); - // hit upper bound of single range scan pos.m_loc = NullTupLoc; - scan.m_state = ScanOp::Last; - break; - } - // can we see it - if (! scanVisible(scanPtr, ent)) { - jam(); - continue; } - // found entry - scan.m_state = ScanOp::Current; break; } // after node proceed to right child @@ -892,31 +916,64 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq) // copy back position scan.m_scanPos = pos; // relink - if (scan.m_state == ScanOp::Current) { - ndbrequire(pos.m_match == true && pos.m_dir == 3); + if (pos.m_loc != NullTupLoc) { + ndbrequire(pos.m_dir == 3); ndbrequire(pos.m_loc == node.m_loc); if (origNode.m_loc != node.m_loc) { jam(); unlinkScan(origNode, scanPtr); linkScan(node, scanPtr); } - // copy found entry - scan.m_scanEnt = ent; - } else if (scan.m_state == ScanOp::Last) { + if (scan.m_state != ScanOp::Blocked) { + scan.m_state = ScanOp::Current; + } else { + jam(); + ndbrequire(fromMaintReq); + TreeEnt& scanEnt = scan.m_scanEnt; + ndbrequire(scanEnt.m_tupLoc != NullTupLoc); + if (scanEnt.eqtuple(ent)) { + // remains blocked on another version + scanEnt = ent; + } else { + jam(); + scanEnt.m_tupLoc = NullTupLoc; + scan.m_state = ScanOp::Current; + } + } + } else { jam(); - ndbrequire(pos.m_loc == NullTupLoc); unlinkScan(origNode, scanPtr); - } else { - ndbrequire(false); + scan.m_state = ScanOp::Last; } #ifdef VM_TRACE - if (debugFlags & DebugScan) { - debugOut << "Next out scan " << scanPtr.i << " " << scan << endl; + if (debugFlags & (DebugMaint | DebugScan)) { + debugOut << "Leave next scan " << scanPtr.i << " " << scan << endl; } #endif } /* + * Check end key. Return true if scan is still within range. + */ +bool +Dbtux::scanCheck(ScanOpPtr scanPtr, TreeEnt ent) +{ + ScanOp& scan = *scanPtr.p; + Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); + const unsigned idir = scan.m_descending; + const int jdir = 1 - 2 * (int)idir; + unpackBound(*scan.m_bound[1 - idir], c_dataBuffer); + unsigned boundCnt = scan.m_boundCnt[1 - idir]; + readKeyAttrs(frag, ent, 0, c_entryKey); + int ret = cmpScanBound(frag, 1 - idir, c_dataBuffer, boundCnt, c_entryKey); + ndbrequire(ret != NdbSqlUtil::CmpUnknown); + if (jdir * ret > 0) + return true; + // hit upper bound of single range scan + return false; +} + +/* * Check if an entry is visible to the scan. * * There is a special check to never accept same tuple twice in a row. diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp index b0e2a664bfd..4b5c0b791f9 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp @@ -21,22 +21,18 @@ * Search for entry to add. * * Similar to searchToRemove (see below). - * - * TODO optimize for initial equal attrs in node min/max */ -void +bool Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos) { const TreeHead& tree = frag.m_tree; const unsigned numAttrs = frag.m_numAttrs; NodeHandle currNode(frag); currNode.m_loc = tree.m_root; - // assume success - treePos.m_match = false; if (currNode.m_loc == NullTupLoc) { // empty tree jam(); - return; + return true; } NodeHandle glbNode(frag); // potential g.l.b of final node /* @@ -94,9 +90,8 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& jam(); treePos.m_loc = currNode.m_loc; treePos.m_pos = 0; - // failed - treePos.m_match = true; - return; + // entry found - error + return false; } break; } @@ -104,7 +99,7 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos.m_loc = currNode.m_loc; // binary search int lo = -1; - unsigned hi = currNode.getOccup(); + int hi = currNode.getOccup(); int ret; while (1) { jam(); @@ -126,9 +121,8 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& lo = j; else { treePos.m_pos = j; - // failed - treePos.m_match = true; - return; + // entry found - error + return false; } if (hi - lo == 1) break; @@ -136,22 +130,23 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& if (ret < 0) { jam(); treePos.m_pos = hi; - return; + return true; } if (hi < currNode.getOccup()) { jam(); treePos.m_pos = hi; - return; + return true; } if (bottomNode.isNull()) { jam(); treePos.m_pos = hi; - return; + return true; } jam(); // backwards compatible for now treePos.m_loc = bottomNode.m_loc; treePos.m_pos = 0; + return true; } /* @@ -163,21 +158,17 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& * then the saved node is the g.l.b of the final node and we move back * to it. */ -void +bool Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos) { const TreeHead& tree = frag.m_tree; const unsigned numAttrs = frag.m_numAttrs; NodeHandle currNode(frag); currNode.m_loc = tree.m_root; - // assume success - treePos.m_match = true; if (currNode.m_loc == NullTupLoc) { - // empty tree + // empty tree - failed jam(); - // failed - treePos.m_match = false; - return; + return false; } NodeHandle glbNode(frag); // potential g.l.b of final node while (true) { @@ -229,7 +220,7 @@ Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePo jam(); treePos.m_loc = currNode.m_loc; treePos.m_pos = 0; - return; + return true; } break; } @@ -242,12 +233,12 @@ Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePo if (searchEnt.eq(currNode.getEnt(j))) { jam(); treePos.m_pos = j; - return; + return true; } } treePos.m_pos = currNode.getOccup(); - // failed - treePos.m_match = false; + // not found - failed + return false; } /* @@ -278,8 +269,6 @@ Dbtux::searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCoun currNode.m_loc = tree.m_root; NodeHandle glbNode(frag); // potential g.l.b of final node NodeHandle bottomNode(frag); - // always before entry - treePos.m_match = false; while (true) { jam(); selectNode(currNode, currNode.m_loc); @@ -315,7 +304,7 @@ Dbtux::searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCoun treePos.m_dir = 3; return; } - } else if (ret > 0) { + } else { // bound is at or right of this node jam(); const TupLoc loc = currNode.getLink(1); @@ -327,8 +316,6 @@ Dbtux::searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCoun currNode.m_loc = loc; continue; } - } else { - ndbrequire(false); } break; } @@ -369,8 +356,6 @@ Dbtux::searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCou currNode.m_loc = tree.m_root; NodeHandle glbNode(frag); // potential g.l.b of final node NodeHandle bottomNode(frag); - // always before entry - treePos.m_match = false; while (true) { jam(); selectNode(currNode, currNode.m_loc); @@ -403,7 +388,7 @@ Dbtux::searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCou // empty result set return; } - } else if (ret > 0) { + } else { // bound is at or right of this node jam(); const TupLoc loc = currNode.getLink(1); @@ -415,8 +400,6 @@ Dbtux::searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCou currNode.m_loc = loc; continue; } - } else { - ndbrequire(false); } break; } |