summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <pekka@mysql.com>2005-11-18 00:09:24 +0100
committerunknown <pekka@mysql.com>2005-11-18 00:09:24 +0100
commit5db598890e878c9da62465856d943b3d5efeb795 (patch)
tree79027866307115423ae19509e894b22d9a871fcd
parent081cc1558734f2ee7a674ddacaa2436b2a46d039 (diff)
downloadmariadb-git-5db598890e878c9da62465856d943b3d5efeb795.tar.gz
ndb - wl#1497 LinearPool seize_index
storage/ndb/src/kernel/vm/LinearPool.hpp: wl#1497 LinearPool seize_index storage/ndb/src/kernel/vm/testSuperPool.cpp: wl#1497 LinearPool seize_index
-rw-r--r--storage/ndb/src/kernel/vm/LinearPool.hpp326
-rw-r--r--storage/ndb/src/kernel/vm/testSuperPool.cpp53
2 files changed, 300 insertions, 79 deletions
diff --git a/storage/ndb/src/kernel/vm/LinearPool.hpp b/storage/ndb/src/kernel/vm/LinearPool.hpp
index 188185701f5..bb95a1e0016 100644
--- a/storage/ndb/src/kernel/vm/LinearPool.hpp
+++ b/storage/ndb/src/kernel/vm/LinearPool.hpp
@@ -17,7 +17,8 @@
#ifndef LINEAR_POOL_HPP
#define LINEAR_POOL_HPP
-#include <SuperPool.hpp>
+#include <Bitmask.hpp>
+#include "SuperPool.hpp"
/*
* LinearPool - indexed record pool
@@ -38,22 +39,26 @@
*
* This works exactly like numbers in a given base. Each map has base
* size entries. For implementation convenience the base must be power
- * of 2 and is given as its log2 value.
+ * of 2 between 2^1 and 2^15. It is given by its log2 value (1-15).
+ *
+ * A position in a map is also called a "digit".
*
* There is a doubly linked list of available maps (some free entries)
- * on each level. There is a singly linked free list within each map.
+ * on each level. There is a doubly linked freelist within each map.
+ * There is also a bitmask of used entries in each map.
*
* Level 0 free entry has space for one record. Level N free entry
* implies space for base^N records. The implied levels are created and
- * removed on demand. Completely empty maps are removed.
+ * removed on demand. Empty maps are usually removed.
*
* Default base is 256 (log2 = 8) which requires maximum 4 levels or
- * "digits" (similar to ip address).
+ * digits (similar to ip address).
*
* TODO
*
* - move most of the inline code to LinearPool.cpp
- * - add methods to check / seize user-specified index
+ * - optimize for common case
+ * - add optimized 2-level implementation (?)
*/
#include "SuperPool.hpp"
@@ -71,6 +76,9 @@ class LinearPool {
// Max possible levels (0 to max root level).
STATIC_CONST( MaxLevels = (32 + LogBase - 1) / LogBase );
+ // Number of words in map used bit mask.
+ STATIC_CONST( BitmaskSize = (Base + 31) / 32 );
+
// Map.
struct Map {
Uint32 m_level;
@@ -80,6 +88,7 @@ class LinearPool {
Uint32 m_index; // from root to here
PtrI m_nextavail;
PtrI m_prevavail;
+ Uint32 m_bitmask[BitmaskSize];
PtrI m_entry[Base];
};
@@ -97,9 +106,15 @@ public:
// Allocate record from the pool. Reuses free index if possible.
bool seize(Ptr<T>& ptr);
+ // Allocate given index. Like seize but returns -1 if in use.
+ int seize_index(Ptr<T>& ptr, Uint32 index);
+
// Return record to the pool.
void release(Ptr<T>& ptr);
+ // Return number of used records (may require 1 page scan).
+ Uint32 count();
+
// Verify (debugging).
void verify();
@@ -114,6 +129,12 @@ private:
// Add new non-root map.
bool add_map(Ptr<Map>& map_ptr, Ptr<Map> parent_ptr, Uint32 digit);
+ // Subroutine to initialize map free lists.
+ void init_free(Ptr<Map> map_ptr);
+
+ // Add entry at given free position.
+ void add_entry(Ptr<Map> map_ptr, Uint32 digit, PtrI ptr_i);
+
// Remove entry and map if it becomes empty.
void remove_entry(Ptr<Map> map_ptr, Uint32 digit);
@@ -126,8 +147,11 @@ private:
// Remove map from available list.
void remove_avail(Ptr<Map> map_ptr);
+ // Verify available lists
+ void verify_avail();
+
// Verify map (recursive).
- void verify(Ptr<Map> map_ptr, Uint32 level);
+ void verify_map(Ptr<Map> map_ptr, Uint32 level, Uint32* count);
RecordPool<T> m_records;
RecordPool<Map> m_maps;
@@ -166,6 +190,7 @@ LinearPool<T, LogBase>::getPtr(Ptr<T>& ptr)
// get record
Ptr<T> rec_ptr;
Uint32 digit = index & DigitMask;
+ assert(BitmaskImpl::get(BitmaskSize, map_ptr.p->m_bitmask, digit));
rec_ptr.i = map_ptr.p->m_entry[digit];
m_records.getPtr(rec_ptr);
ptr.p = rec_ptr.p;
@@ -193,26 +218,20 @@ LinearPool<T, LogBase>::seize(Ptr<T>& ptr)
}
m_maps.getPtr(map_ptr);
// walk down creating missing levels and using an entry on each
- Uint32 firstfree;
+ Uint32 digit;
+ Ptr<Map> new_ptr;
+ new_ptr.i = RNIL;
while (true) {
- assert(map_ptr.p->m_occup < Base);
- map_ptr.p->m_occup++;
- firstfree = map_ptr.p->m_firstfree;
- assert(firstfree < Base);
- map_ptr.p->m_firstfree = map_ptr.p->m_entry[firstfree];
- if (map_ptr.p->m_occup == Base) {
- assert(map_ptr.p->m_firstfree == Base);
- // remove from available list
- remove_avail(map_ptr);
- }
+ digit = map_ptr.p->m_firstfree;
if (n == 0)
break;
Ptr<Map> child_ptr;
- if (! add_map(child_ptr, map_ptr, firstfree)) {
- remove_entry(map_ptr, firstfree);
+ if (! add_map(child_ptr, map_ptr, digit)) {
+ if (new_ptr.i != RNIL)
+ remove_map(new_ptr);
return false;
}
- map_ptr.p->m_entry[firstfree] = child_ptr.i;
+ new_ptr = child_ptr;
map_ptr = child_ptr;
n--;
}
@@ -220,11 +239,71 @@ LinearPool<T, LogBase>::seize(Ptr<T>& ptr)
assert(map_ptr.p->m_level == 0);
Ptr<T> rec_ptr;
if (! m_records.seize(rec_ptr)) {
- remove_entry(map_ptr, firstfree);
+ if (new_ptr.i != RNIL)
+ remove_map(new_ptr);
return false;
}
- map_ptr.p->m_entry[firstfree] = rec_ptr.i;
- ptr.i = firstfree + (map_ptr.p->m_index << LogBase);
+ add_entry(map_ptr, digit, rec_ptr.i);
+ ptr.i = digit + (map_ptr.p->m_index << LogBase);
+ ptr.p = rec_ptr.p;
+ return true;
+}
+
+template <class T, Uint32 LogBase>
+inline int
+LinearPool<T, LogBase>::seize_index(Ptr<T>& ptr, Uint32 index)
+{
+ // extract all digits at least up to current root level
+ Uint32 digits[MaxLevels];
+ Uint32 n = 0;
+ Uint32 tmp = index;
+ do {
+ digits[n] = tmp & DigitMask;
+ tmp >>= LogBase;
+ } while (++n < m_levels || tmp != 0);
+ // add any new root levels
+ while (n > m_levels) {
+ if (! add_root())
+ return false;
+ }
+ // start from root
+ Ptr<Map> map_ptr;
+ map_ptr.i = m_root;
+ m_maps.getPtr(map_ptr);
+ // walk down creating or re-using existing levels
+ Uint32 digit;
+ bool used;
+ Ptr<Map> new_ptr;
+ new_ptr.i = RNIL;
+ while (true) {
+ digit = digits[--n];
+ used = BitmaskImpl::get(BitmaskSize, map_ptr.p->m_bitmask, digit);
+ if (n == 0)
+ break;
+ if (used) {
+ map_ptr.i = map_ptr.p->m_entry[digit];
+ m_maps.getPtr(map_ptr);
+ } else {
+ Ptr<Map> child_ptr;
+ if (! add_map(child_ptr, map_ptr, digit)) {
+ if (new_ptr.i != RNIL)
+ remove_map(new_ptr);
+ }
+ new_ptr = child_ptr;
+ map_ptr = child_ptr;
+ }
+ }
+ // now at level 0
+ assert(map_ptr.p->m_level == 0);
+ Ptr<T> rec_ptr;
+ if (used || ! m_records.seize(rec_ptr)) {
+ if (new_ptr.i != RNIL)
+ remove_map(new_ptr);
+ return used ? -1 : false;
+ }
+ add_entry(map_ptr, digit, rec_ptr.i);
+ assert(index == digit + (map_ptr.p->m_index << LogBase));
+ ptr.i = index;
ptr.p = rec_ptr.p;
return true;
}
@@ -250,16 +329,31 @@ LinearPool<T, LogBase>::release(Ptr<T>& ptr)
}
template <class T, Uint32 LogBase>
+inline Uint32
+LinearPool<T, LogBase>::count()
+{
+ SuperPool& sp = m_records.m_superPool;
+ Uint32 count1 = sp.getRecUseCount(m_records.m_recInfo);
+ return count1;
+}
+
+template <class T, Uint32 LogBase>
inline void
LinearPool<T, LogBase>::verify()
{
- if (m_root == RNIL)
+ verify_avail();
+ if (m_root == RNIL) {
+ assert(m_levels == 0);
return;
+ }
assert(m_levels != 0);
Ptr<Map> map_ptr;
map_ptr.i = m_root;
m_maps.getPtr(map_ptr);
- verify(map_ptr, m_levels - 1);
+ Uint32 count1 = count();
+ Uint32 count2 = 0;
+ verify_map(map_ptr, m_levels - 1, &count2);
+ assert(count1 == count2);
}
// private methods
@@ -303,26 +397,18 @@ LinearPool<T, LogBase>::add_root()
assert(n < MaxLevels);
// set up
map_ptr.p->m_level = n;
- if (n == 0) {
- map_ptr.p->m_occup = 0;
- map_ptr.p->m_firstfree = 0;
- } else {
- // on level > 0 digit 0 points to old root
- map_ptr.p->m_occup = 1;
- map_ptr.p->m_firstfree = 1;
+ map_ptr.p->m_parent = RNIL;
+ map_ptr.p->m_index = 0;
+ init_free(map_ptr);
+ // on level > 0 digit 0 points to old root
+ if (n > 0) {
Ptr<Map> old_ptr;
old_ptr.i = m_root;
m_maps.getPtr(old_ptr);
assert(old_ptr.p->m_parent == RNIL);
old_ptr.p->m_parent = map_ptr.i;
- map_ptr.p->m_entry[0] = old_ptr.i;
+ add_entry(map_ptr, 0, old_ptr.i);
}
- // set up free list with Base as terminator
- for (Uint32 j = map_ptr.p->m_firstfree; j < Base; j++)
- map_ptr.p->m_entry[j] = j + 1;
- map_ptr.p->m_parent = RNIL;
- map_ptr.p->m_index = 0;
- add_avail(map_ptr);
// set new root
m_root = map_ptr.i;
return true;
@@ -337,25 +423,81 @@ LinearPool<T, LogBase>::add_map(Ptr<Map>& map_ptr, Ptr<Map> parent_ptr, Uint32 d
assert(parent_ptr.p->m_level != 0);
// set up
map_ptr.p->m_level = parent_ptr.p->m_level - 1;
- map_ptr.p->m_occup = 0;
- map_ptr.p->m_firstfree = 0;
- // set up free list with Base as terminator
- for (Uint32 j = map_ptr.p->m_firstfree; j < Base; j++)
- map_ptr.p->m_entry[j] = j + 1;
map_ptr.p->m_parent = parent_ptr.i;
map_ptr.p->m_index = digit + (parent_ptr.p->m_index << LogBase);
- add_avail(map_ptr);
+ init_free(map_ptr);
+ add_entry(parent_ptr, digit, map_ptr.i);
return true;
}
template <class T, Uint32 LogBase>
inline void
+LinearPool<T, LogBase>::init_free(Ptr<Map> map_ptr)
+{
+ map_ptr.p->m_occup = 0;
+ map_ptr.p->m_firstfree = 0;
+ // freelist
+ Uint32 j;
+ Uint16 back = ZNIL;
+ for (j = 0; j < Base - 1; j++) {
+ map_ptr.p->m_entry[j] = back | ((j + 1) << 16);
+ back = j;
+ }
+ map_ptr.p->m_entry[j] = back | (ZNIL << 16);
+ // bitmask
+ BitmaskImpl::clear(BitmaskSize, map_ptr.p->m_bitmask);
+ // add to available
+ add_avail(map_ptr);
+}
+
+template <class T, Uint32 LogBase>
+inline void
+LinearPool<T, LogBase>::add_entry(Ptr<Map> map_ptr, Uint32 digit, PtrI ptr_i)
+{
+ assert(map_ptr.p->m_occup < Base && digit < Base);
+ assert(! BitmaskImpl::get(BitmaskSize, map_ptr.p->m_bitmask, digit));
+ // unlink from freelist
+ Uint32 val = map_ptr.p->m_entry[digit];
+ Uint16 back = val & ZNIL;
+ Uint16 forw = val >> 16;
+ if (back != ZNIL) {
+ assert(back < Base);
+ map_ptr.p->m_entry[back] &= ZNIL;
+ map_ptr.p->m_entry[back] |= (forw << 16);
+ }
+ if (forw != ZNIL) {
+ assert(forw < Base);
+ map_ptr.p->m_entry[forw] &= (ZNIL << 16);
+ map_ptr.p->m_entry[forw] |= back;
+ }
+ if (back == ZNIL) {
+ map_ptr.p->m_firstfree = forw;
+ }
+ // set new value
+ map_ptr.p->m_entry[digit] = ptr_i;
+ map_ptr.p->m_occup++;
+ BitmaskImpl::set(BitmaskSize, map_ptr.p->m_bitmask, digit);
+ if (map_ptr.p->m_occup == Base)
+ remove_avail(map_ptr);
+}
+
+template <class T, Uint32 LogBase>
+inline void
LinearPool<T, LogBase>::remove_entry(Ptr<Map> map_ptr, Uint32 digit)
{
assert(map_ptr.p->m_occup != 0 && digit < Base);
- map_ptr.p->m_occup--;
- map_ptr.p->m_entry[digit] = map_ptr.p->m_firstfree;
+ assert(BitmaskImpl::get(BitmaskSize, map_ptr.p->m_bitmask, digit));
+ // add to freelist
+ Uint32 firstfree = map_ptr.p->m_firstfree;
+ map_ptr.p->m_entry[digit] = ZNIL | (firstfree << 16);
+ if (firstfree != ZNIL) {
+ assert(firstfree < Base);
+ map_ptr.p->m_entry[firstfree] &= (ZNIL << 16);
+ map_ptr.p->m_entry[firstfree] |= digit;
+ }
map_ptr.p->m_firstfree = digit;
+ map_ptr.p->m_occup--;
+ BitmaskImpl::clear(BitmaskSize, map_ptr.p->m_bitmask, digit);
if (map_ptr.p->m_occup + 1 == Base)
add_avail(map_ptr);
else if (map_ptr.p->m_occup == 0)
@@ -375,7 +517,7 @@ LinearPool<T, LogBase>::remove_map(Ptr<Map> map_ptr)
m_maps.release(map_ptr);
if (m_root == map_ptr_i) {
assert(parent_ptr.i == RNIL);
- Uint32 used = m_maps.m_superPool.getRecUseCount(m_maps.m_recInfo);
+ Uint32 used = count();
assert(used == 0);
m_root = RNIL;
m_levels = 0;
@@ -431,49 +573,83 @@ LinearPool<T, LogBase>::remove_avail(Ptr<Map> map_ptr)
template <class T, Uint32 LogBase>
inline void
-LinearPool<T, LogBase>::verify(Ptr<Map> map_ptr, Uint32 level)
+LinearPool<T, LogBase>::verify_avail()
+{
+ // check available lists
+ for (Uint32 n = 0; n < MaxLevels; n++) {
+ Ptr<Map> map_ptr;
+ map_ptr.i = m_avail[n];
+ Uint32 back = RNIL;
+ while (map_ptr.i != RNIL) {
+ m_maps.getPtr(map_ptr);
+ assert(map_ptr.p->m_occup < Base);
+ assert(back == map_ptr.p->m_prevavail);
+ back = map_ptr.i;
+ map_ptr.i = map_ptr.p->m_nextavail;
+ }
+ }
+}
+
+template <class T, Uint32 LogBase>
+inline void
+LinearPool<T, LogBase>::verify_map(Ptr<Map> map_ptr, Uint32 level, Uint32* count)
{
assert(level < MaxLevels);
assert(map_ptr.p->m_level == level);
- Uint32 j = 0;
- while (j < Base) {
- bool free = false;
- Uint32 j2 = map_ptr.p->m_firstfree;
- while (j2 != Base) {
- if (j2 == j) {
- free = true;
- break;
- }
- assert(j2 < Base);
- j2 = map_ptr.p->m_entry[j2];
+ // check freelist
+ {
+ Uint32 nused = BitmaskImpl::count(BitmaskSize, map_ptr.p->m_bitmask);
+ assert(nused <= Base);
+ assert(map_ptr.p->m_occup == nused);
+ Uint32 nfree = 0;
+ Uint32 j = map_ptr.p->m_firstfree;
+ Uint16 back = ZNIL;
+ while (j != ZNIL) {
+ assert(j < Base);
+ assert(! BitmaskImpl::get(BitmaskSize, map_ptr.p->m_bitmask, j));
+ Uint32 val = map_ptr.p->m_entry[j];
+ assert(back == (val & ZNIL));
+ back = j;
+ j = (val >> 16);
+ nfree++;
}
- if (! free) {
+ assert(nused + nfree == Base);
+ }
+ // check entries
+ {
+ for (Uint32 j = 0; j < Base; j++) {
+ bool free = ! BitmaskImpl::get(BitmaskSize, map_ptr.p->m_bitmask, j);
+ if (free)
+ continue;
if (level != 0) {
Ptr<Map> child_ptr;
child_ptr.i = map_ptr.p->m_entry[j];
m_maps.getPtr(child_ptr);
assert(child_ptr.p->m_parent == map_ptr.i);
assert(child_ptr.p->m_index == j + (map_ptr.p->m_index << LogBase));
- verify(child_ptr, level - 1);
+ verify_map(child_ptr, level - 1, count);
} else {
Ptr<T> rec_ptr;
rec_ptr.i = map_ptr.p->m_entry[j];
m_records.getPtr(rec_ptr);
+ (*count)++;
}
- Ptr<Map> avail_ptr;
- avail_ptr.i = m_avail[map_ptr.p->m_level];
- bool found = false;
- while (avail_ptr.i != RNIL) {
- if (avail_ptr.i == map_ptr.i) {
- found = true;
- break;
- }
- m_maps.getPtr(avail_ptr);
- avail_ptr.i = avail_ptr.p->m_nextavail;
+ }
+ }
+ // check membership on available list
+ {
+ Ptr<Map> avail_ptr;
+ avail_ptr.i = m_avail[map_ptr.p->m_level];
+ bool found = false;
+ while (avail_ptr.i != RNIL) {
+ if (avail_ptr.i == map_ptr.i) {
+ found = true;
+ break;
}
- assert(found == (map_ptr.p->m_occup < Base));
+ m_maps.getPtr(avail_ptr);
+ avail_ptr.i = avail_ptr.p->m_nextavail;
}
- j++;
+ assert(found == (map_ptr.p->m_occup < Base));
}
}
diff --git a/storage/ndb/src/kernel/vm/testSuperPool.cpp b/storage/ndb/src/kernel/vm/testSuperPool.cpp
index 78a1a29c883..3438c12323b 100644
--- a/storage/ndb/src/kernel/vm/testSuperPool.cpp
+++ b/storage/ndb/src/kernel/vm/testSuperPool.cpp
@@ -1,6 +1,6 @@
#if 0
make -f Makefile -f - testSuperPool <<'_eof_'
-testSuperPool: testSuperPool.cpp libkernel.a
+testSuperPool: testSuperPool.cpp libkernel.a LinearPool.hpp
$(CXXCOMPILE) -o $@ $@.cpp libkernel.a -L../../common/util/.libs -lgeneral
_eof_
exit $?
@@ -57,6 +57,7 @@ random_coprime(Uint32 n)
{
Uint32 prime[] = { 101, 211, 307, 401, 503, 601, 701, 809, 907 };
Uint32 count = sizeof(prime) / sizeof(prime[0]);
+ assert(n != 0);
while (1) {
Uint32 i = urandom(count);
if (n % prime[i] != 0)
@@ -208,7 +209,7 @@ lp_test(GroupPool& gp)
ndbout << "linear pool test" << endl;
Ptr<T> ptr;
Uint32 loop;
- for (loop = 0; loop < 3 * loopcount; loop++) {
+ for (loop = 0; loop < loopcount; loop++) {
int count = 0;
while (1) {
bool ret = lp.seize(ptr);
@@ -223,6 +224,7 @@ lp_test(GroupPool& gp)
assert(ptr.p == ptr2.p);
count++;
}
+ assert(count != 0);
ndbout << "seized " << count << endl;
switch (loop % 3) {
case 0:
@@ -264,6 +266,47 @@ lp_test(GroupPool& gp)
}
break;
}
+ { Uint32 cnt = lp.count(); assert(cnt == 0); }
+ // seize_index test
+ char *used = new char [10 * count];
+ memset(used, false, sizeof(used));
+ Uint32 i, ns = 0, nr = 0;
+ for (i = 0; i < count; i++) {
+ Uint32 index = urandom(10 * count);
+ if (used[index]) {
+ ptr.i = index;
+ lp.release(ptr);
+ lp.verify();
+ nr++;
+ } else {
+ int i = lp.seize_index(ptr, index);
+ assert(i >= 0);
+ lp.verify();
+ if (i == 0) // no space
+ continue;
+ assert(ptr.i == index);
+ Ptr<T> ptr2;
+ ptr2.i = ptr.i;
+ ptr2.p = 0;
+ lp.getPtr(ptr2);
+ assert(ptr.p == ptr2.p);
+ ns++;
+ }
+ used[index] = ! used[index];
+ }
+ ndbout << "random sparse seize " << ns << " release " << nr << endl;
+ nr = 0;
+ for (i = 0; i < 10 * count; i++) {
+ if (used[i]) {
+ ptr.i = i;
+ lp.release(ptr);
+ lp.verify();
+ used[i] = false;
+ nr++;
+ }
+ }
+ ndbout << "released " << nr << endl;
+ { Uint32 cnt = lp.count(); assert(cnt == 0); }
}
}
@@ -291,8 +334,10 @@ template static void sp_test<T5>(GroupPool& sp);
template static void lp_test<T3>(GroupPool& sp);
int
-main()
+main(int argc, char** argv)
{
+ if (argc > 1 && strncmp(argv[1], "-l", 2) == 0)
+ loopcount = atoi(argv[1] + 2);
HeapPool sp(pageSize, pageBits);
sp.setInitPages(7);
sp.setMaxPages(7);
@@ -304,7 +349,7 @@ main()
ndbout << "rand " << s << endl;
int count;
count = 0;
- while (++count <= 0) {
+ while (++count <= 0) { // change to 1 to find new bug
sp_test<T1>(gp);
sp_test<T2>(gp);
sp_test<T3>(gp);