path: root/ndb/test/ndbapi/testOIBasic.cpp
diff options
authorunknown <>2004-12-15 13:12:54 +0100
committerunknown <>2004-12-15 13:12:54 +0100
commitae971ecd9702e5ef6d78a51dd5a015da98202ed8 (patch)
tree4b4ae0a6afb23da4a7a062c9e46cf1517e5d1cd8 /ndb/test/ndbapi/testOIBasic.cpp
parent78a4cebee8566d52fc3944f5c7212724826c44e5 (diff)
parent7a7ac8f63ad6e1ba7f74ac2c85bcb4b8df3b16ee (diff)
ndb/include/kernel/AttributeHeader.hpp: Auto merged ndb/src/kernel/blocks/dblqh/DblqhMain.cpp: Auto merged ndb/src/kernel/blocks/dbtc/DbtcMain.cpp: Auto merged ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp: Auto merged ndb/src/ndbapi/NdbScanOperation.cpp: Auto merged ndb/test/include/HugoOperations.hpp: Auto merged ndb/test/ndbapi/testOIBasic.cpp: Auto merged ndb/test/src/HugoOperations.cpp: Auto merged ndb/test/src/HugoTransactions.cpp: Auto merged ndb/test/src/UtilTransactions.cpp: Auto merged ndb/tools/select_count.cpp: Auto merged sql/ Auto merged ndb/include/ndbapi/NdbDictionary.hpp: SCCS merged ndb/src/ndbapi/NdbDictionary.cpp: SCCS merged ndb/src/ndbapi/NdbDictionaryImpl.cpp: SCCS merged
Diffstat (limited to 'ndb/test/ndbapi/testOIBasic.cpp')
1 files changed, 826 insertions, 365 deletions
diff --git a/ndb/test/ndbapi/testOIBasic.cpp b/ndb/test/ndbapi/testOIBasic.cpp
index 3485f6d91fc..7a73ba872b2 100644
--- a/ndb/test/ndbapi/testOIBasic.cpp
+++ b/ndb/test/ndbapi/testOIBasic.cpp
@@ -37,6 +37,7 @@ struct Opt {
unsigned m_batch;
const char* m_bound;
const char* m_case;
+ bool m_collsp;
bool m_core;
const char* m_csname;
@@ -55,17 +56,18 @@ struct Opt {
unsigned m_scanbat;
unsigned m_scanpar;
unsigned m_scanstop;
- unsigned m_seed;
+ int m_seed;
unsigned m_subloop;
const char* m_table;
unsigned m_threads;
- unsigned m_v;
+ int m_v; // int for lint
Opt() :
+ m_collsp(false),
- m_csname("latin1_bin"),
+ m_csname("random"),
@@ -82,7 +84,7 @@ struct Opt {
- m_seed(0),
+ m_seed(-1),
@@ -104,12 +106,13 @@ printhelp()
<< " -batch N pk operations in batch [" << d.m_batch << "]" << endl
<< " -bound xyz use only these bound types 0-4 [" << d.m_bound << "]" << endl
<< " -case abc only given test cases (letters a-z)" << endl
+ << " -collsp use strnncollsp instead of strnxfrm" << endl
<< " -core core dump on error [" << d.m_core << "]" << endl
- << " -csname S charset (collation) of non-pk char column [" << d.m_csname << "]" << endl
+ << " -csname S charset or collation [" << d.m_csname << "]" << endl
<< " -die nnn exit immediately on NDB error code nnn" << endl
<< " -dups allow duplicate tuples from index scan [" << d.m_dups << "]" << endl
<< " -fragtype T fragment type single/small/medium/large" << endl
- << " -index xyz only given index numbers (digits 1-9)" << endl
+ << " -index xyz only given index numbers (digits 0-9)" << endl
<< " -loop N loop count full suite 0=forever [" << d.m_loop << "]" << endl
<< " -nologging create tables in no-logging mode" << endl
<< " -noverify skip index verifications" << endl
@@ -118,9 +121,9 @@ printhelp()
<< " -samples N samples for some timings (0=all) [" << d.m_samples << "]" << endl
<< " -scanbat N scan batch per fragment (ignored by ndb api) [" << d.m_scanbat << "]" << endl
<< " -scanpar N scan parallelism [" << d.m_scanpar << "]" << endl
- << " -seed N srandom seed 0=loop number[" << d.m_seed << "]" << endl
+ << " -seed N srandom seed 0=loop number -1=random [" << d.m_seed << "]" << endl
<< " -subloop N subtest loop count [" << d.m_subloop << "]" << endl
- << " -table xyz only given table numbers (digits 1-9)" << endl
+ << " -table xyz only given table numbers (digits 0-9)" << endl
<< " -threads N number of threads [" << d.m_threads << "]" << endl
<< " -vN verbosity [" << d.m_v << "]" << endl
<< " -h or -help print this help text" << endl
@@ -135,9 +138,33 @@ static const bool g_store_null_key = true;
// compare NULL like normal value (NULL < not NULL, NULL == NULL)
static const bool g_compare_null = true;
+static const char* hexstr = "0123456789abcdef";
+// random ints
+static unsigned
+urandom(unsigned n)
+ if (n == 0)
+ return 0;
+ unsigned i = random() % n;
+ return i;
+static int
+irandom(unsigned n)
+ if (n == 0)
+ return 0;
+ int i = random() % n;
+ if (random() & 0x1)
+ i = -i;
+ return i;
// log and error macros
-static NdbMutex *ndbout_mutex= NULL;
+static NdbMutex *ndbout_mutex = NULL;
static unsigned getthrno();
@@ -198,7 +225,7 @@ getthrstr()
return -1; \
} while (0)
-// method parameters base class
+// method parameters
class Thr;
class Con;
@@ -222,6 +249,8 @@ struct Par : public Opt {
// value calculation
unsigned m_range;
unsigned m_pctrange;
+ unsigned m_pctbrange;
+ int m_bdir;
// choice of key
bool m_randomkey;
// do verify after read
@@ -240,7 +269,9 @@ struct Par : public Opt {
m_totrows(m_threads * m_rows),
- m_pctrange(0),
+ m_pctrange(40),
+ m_pctbrange(80),
+ m_bdir(0),
m_deadlock(false) {
@@ -248,15 +279,15 @@ struct Par : public Opt {
static bool
-usetable(unsigned i)
+usetable(Par par, unsigned i)
- return g_opt.m_table == 0 || strchr(g_opt.m_table, '1' + i) != 0;
+ return par.m_table == 0 || strchr(par.m_table, '0' + i) != 0;
static bool
-useindex(unsigned i)
+useindex(Par par, unsigned i)
- return g_opt.m_index == 0 || strchr(g_opt.m_index, '1' + i) != 0;
+ return par.m_index == 0 || strchr(par.m_index, '0' + i) != 0;
static unsigned
@@ -382,38 +413,196 @@ Lst::reset()
m_cnt = 0;
+// character sets
+static const unsigned maxcsnumber = 512;
+static const unsigned maxcharcount = 32;
+static const unsigned maxcharsize = 4;
+static const unsigned maxxmulsize = 8;
+// single mb char
+struct Chr {
+ unsigned char m_bytes[maxcharsize];
+ unsigned char m_xbytes[maxxmulsize * maxcharsize];
+ unsigned m_size;
+ Chr();
+ memset(m_bytes, 0, sizeof(m_bytes));
+ memset(m_xbytes, 0, sizeof(m_xbytes));
+ m_size = 0;
+// charset and random valid chars to use
+struct Chs {
+ unsigned m_xmul;
+ Chr* m_chr;
+ Chs(CHARSET_INFO* cs);
+ ~Chs();
+Chs::Chs(CHARSET_INFO* cs) :
+ m_cs(cs)
+ m_xmul = m_cs->strxfrm_multiply;
+ if (m_xmul == 0)
+ m_xmul = 1;
+ assert(m_xmul <= maxxmulsize);
+ m_chr = new Chr [maxcharcount];
+ unsigned i = 0;
+ unsigned miss1 = 0;
+ unsigned miss2 = 0;
+ while (i < maxcharcount) {
+ unsigned char* bytes = m_chr[i].m_bytes;
+ unsigned char* xbytes = m_chr[i].m_xbytes;
+ unsigned size = m_cs->mbminlen + urandom(m_cs->mbmaxlen - m_cs->mbminlen + 1);
+ assert(m_cs->mbminlen <= size && size <= m_cs->mbmaxlen);
+ // prefer longer chars
+ if (size == m_cs->mbminlen && m_cs->mbminlen < m_cs->mbmaxlen && urandom(5) != 0)
+ continue;
+ for (unsigned j = 0; j < size; j++) {
+ bytes[j] = urandom(256);
+ }
+ const char* sbytes = (const char*)bytes;
+ if ((*cs->cset->well_formed_len)(cs, sbytes, sbytes + size, 1) != size) {
+ miss1++;
+ continue;
+ }
+ // do not trust well_formed_len currently
+ memset(xbytes, 0, sizeof(xbytes));
+ // currently returns buffer size always
+ int xlen = (*cs->coll->strnxfrm)(cs, xbytes, m_xmul * size, bytes, size);
+ // check we got something
+ bool xok = false;
+ for (unsigned j = 0; j < xlen; j++) {
+ if (xbytes[j] != 0) {
+ xok = true;
+ break;
+ }
+ }
+ if (! xok) {
+ miss2++;
+ continue;
+ }
+ // occasional duplicate char is ok
+ m_chr[i].m_size = size;
+ i++;
+ }
+ bool disorder = true;
+ unsigned bubbels = 0;
+ while (disorder) {
+ disorder = false;
+ for (unsigned i = 1; i < maxcharcount; i++) {
+ unsigned len = sizeof(m_chr[i].m_xbytes);
+ if (memcmp(m_chr[i-1].m_xbytes, m_chr[i].m_xbytes, len) > 0) {
+ Chr chr = m_chr[i];
+ m_chr[i] = m_chr[i-1];
+ m_chr[i-1] = chr;
+ disorder = true;
+ bubbels++;
+ }
+ }
+ }
+ LL3("inited charset " << cs->name << " miss1=" << miss1 << " miss2=" << miss2 << " bubbels=" << bubbels);
+ delete [] m_chr;
+static Chs* cslist[maxcsnumber];
+static void
+ for (unsigned i = 0; i < maxcsnumber; i++) {
+ delete cslist[i];
+ cslist[i] = 0;
+ }
+static Chs*
+getcs(Par par)
+ if (par.m_cs != 0) {
+ cs = par.m_cs;
+ } else {
+ while (1) {
+ unsigned n = urandom(maxcsnumber);
+ cs = get_charset(n, MYF(0));
+ if (cs != 0) {
+ // prefer complex charsets
+ if (cs->mbmaxlen != 1 || urandom(5) == 0)
+ break;
+ }
+ }
+ }
+ if (cslist[cs->number] == 0)
+ cslist[cs->number] = new Chs(cs);
+ return cslist[cs->number];
// tables and indexes
// Col - table column
struct Col {
+ const class Tab& m_tab;
unsigned m_num;
const char* m_name;
bool m_pk;
NdbDictionary::Column::Type m_type;
unsigned m_length;
+ unsigned m_bytelength;
bool m_nullable;
+ const Chs* m_chs;
+ Col(const class Tab& tab, unsigned num, const char* name, bool pk, NdbDictionary::Column::Type type, unsigned length, bool nullable, const Chs* chs);
+ ~Col();
+ bool equal(const Col& col2) const;
void verify(const void* addr) const;
+Col::Col(const class Tab& tab, unsigned num, const char* name, bool pk, NdbDictionary::Column::Type type, unsigned length, bool nullable, const Chs* chs) :
+ m_tab(tab),
+ m_num(num),
+ m_name(strcpy(new char [strlen(name) + 1], name)),
+ m_pk(pk),
+ m_type(type),
+ m_length(length),
+ m_bytelength(length * (chs == 0 ? 1 : chs->m_cs->mbmaxlen)),
+ m_nullable(nullable),
+ m_chs(chs)
+ delete [] m_name;
+Col::equal(const Col& col2) const
+ return m_type == col2.m_type && m_length == col2.m_length && m_chs == col2.m_chs;
Col::verify(const void* addr) const
switch (m_type) {
case NdbDictionary::Column::Unsigned:
- case NdbDictionary::Column::Varchar:
+ case NdbDictionary::Column::Char:
- const unsigned char* p = (const unsigned char*)addr;
- unsigned n = (p[0] << 8) | p[1];
- assert(n <= m_length);
- unsigned i;
- for (i = 0; i < n; i++) {
- assert(p[2 + i] != 0);
- }
- for (i = n; i < m_length; i++) {
- assert(p[2 + i] == 0);
- }
+ CHARSET_INFO* cs = m_chs->m_cs;
+ const char* src = (const char*)addr;
+ unsigned len = m_bytelength;
+ assert((*cs->cset->well_formed_len)(cs, src, src + len, 0xffff) == len);
@@ -425,14 +614,16 @@ Col::verify(const void* addr) const
static NdbOut&
operator<<(NdbOut& out, const Col& col)
- out << "col " << col.m_num;
- out << " " << col.m_name;
+ out << "col[" << col.m_num << "] " << col.m_name;
switch (col.m_type) {
case NdbDictionary::Column::Unsigned:
out << " unsigned";
- case NdbDictionary::Column::Varchar:
- out << " varchar(" << col.m_length << ")";
+ case NdbDictionary::Column::Char:
+ {
+ CHARSET_INFO* cs = col.m_chs->m_cs;
+ out << " char(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")";
+ }
out << "type" << (int)col.m_type;
@@ -447,25 +638,60 @@ operator<<(NdbOut& out, const Col& col)
// ICol - index column
struct ICol {
+ const class ITab& m_itab;
unsigned m_num;
- struct Col m_col;
+ const Col& m_col;
+ ICol(const class ITab& itab, unsigned num, const Col& col);
+ ~ICol();
+ICol::ICol(const class ITab& itab, unsigned num, const Col& col) :
+ m_itab(itab),
+ m_num(num),
+ m_col(col)
// ITab - index
struct ITab {
+ const class Tab& m_tab;
const char* m_name;
unsigned m_icols;
- const ICol* m_icol;
+ const ICol** m_icol;
+ ITab(const class Tab& tab, const char* name, unsigned icols);
+ ~ITab();
+ITab::ITab(const class Tab& tab, const char* name, unsigned icols) :
+ m_tab(tab),
+ m_name(strcpy(new char [strlen(name) + 1], name)),
+ m_icols(icols),
+ m_icol(new const ICol* [icols + 1])
+ for (unsigned i = 0; i <= m_icols; i++)
+ m_icol[0] = 0;
+ delete [] m_name;
+ for (unsigned i = 0; i < m_icols; i++)
+ delete m_icol[i];
+ delete [] m_icol;
static NdbOut&
operator<<(NdbOut& out, const ITab& itab)
- out << "itab " << itab.m_name << " " << itab.m_icols;
+ out << "itab " << itab.m_name << " icols=" << itab.m_icols;
for (unsigned k = 0; k < itab.m_icols; k++) {
out << endl;
- out << "icol " << k << " " << itab.m_icol[k].m_col;
+ out << "icol[" << k << "] " << itab.m_icol[k]->m_col;
return out;
@@ -475,200 +701,241 @@ operator<<(NdbOut& out, const ITab& itab)
struct Tab {
const char* m_name;
unsigned m_cols;
- const Col* m_col;
+ const Col** m_col;
unsigned m_itabs;
- const ITab* m_itab;
+ const ITab** m_itab;
+ // pk must contain an Unsigned column
+ unsigned m_keycol;
+ Tab(const char* name, unsigned cols, unsigned itabs, unsigned keycol);
+ ~Tab();
+Tab::Tab(const char* name, unsigned cols, unsigned itabs, unsigned keycol) :
+ m_name(strcpy(new char [strlen(name) + 1], name)),
+ m_cols(cols),
+ m_col(new const Col* [cols + 1]),
+ m_itabs(itabs),
+ m_itab(new const ITab* [itabs + 1]),
+ m_keycol(keycol)
+ for (unsigned i = 0; i <= cols; i++)
+ m_col[i] = 0;
+ for (unsigned i = 0; i <= itabs; i++)
+ m_itab[i] = 0;
+ delete [] m_name;
+ for (unsigned i = 0; i < m_cols; i++)
+ delete m_col[i];
+ delete [] m_col;
+ for (unsigned i = 0; i < m_itabs; i++)
+ delete m_itab[i];
+ delete [] m_itab;
static NdbOut&
operator<<(NdbOut& out, const Tab& tab)
- out << "tab " << tab.m_name << " " << tab.m_cols;
+ out << "tab " << tab.m_name << " cols=" << tab.m_cols;
for (unsigned k = 0; k < tab.m_cols; k++) {
out << endl;
- out << tab.m_col[k];
+ out << *tab.m_col[k];
for (unsigned i = 0; i < tab.m_itabs; i++) {
- if (! useindex(i))
+ if (tab.m_itab[i] == 0)
out << endl;
- out << tab.m_itab[i];
+ out << *tab.m_itab[i];
return out;
-// tt1 + tt1x1 tt1x2 tt1x3 tt1x4 tt1x5
-static const Col
-tt1col[] = {
- { 0, "A", 1, NdbDictionary::Column::Unsigned, 1, 0 },
- { 1, "B", 0, NdbDictionary::Column::Unsigned, 1, 1 },
- { 2, "C", 0, NdbDictionary::Column::Unsigned, 1, 1 },
- { 3, "D", 0, NdbDictionary::Column::Unsigned, 1, 1 },
- { 4, "E", 0, NdbDictionary::Column::Unsigned, 1, 1 }
-static const ICol
-tt1x1col[] = {
- { 0, tt1col[0] }
-static const ICol
-tt1x2col[] = {
- { 0, tt1col[1] }
-static const ICol
-tt1x3col[] = {
- { 0, tt1col[1] },
- { 1, tt1col[2] }
-static const ICol
-tt1x4col[] = {
- { 0, tt1col[3] },
- { 1, tt1col[2] },
- { 2, tt1col[1] }
-static const ICol
-tt1x5col[] = {
- { 0, tt1col[1] },
- { 1, tt1col[4] },
- { 2, tt1col[2] },
- { 3, tt1col[3] }
-static const ITab
-tt1x1 = {
- "TT1X1", 1, tt1x1col
-static const ITab
-tt1x2 = {
- "TT1X2", 1, tt1x2col
-static const ITab
-tt1x3 = {
- "TT1X3", 2, tt1x3col
-static const ITab
-tt1x4 = {
- "TT1X4", 3, tt1x4col
-static const ITab
-tt1x5 = {
- "TT1X5", 4, tt1x5col
-static const ITab
-tt1itab[] = {
- tt1x1,
- tt1x2,
- tt1x3,
- tt1x4,
- tt1x5
-static const Tab
-tt1 = {
- "TT1", 5, tt1col, 5, tt1itab
-// tt2 + tt2x1 tt2x2 tt2x3 tt2x4 tt2x5
-static const Col
-tt2col[] = {
- { 0, "A", 1, NdbDictionary::Column::Unsigned, 1, 0 },
- { 1, "B", 0, NdbDictionary::Column::Unsigned, 1, 1 },
- { 2, "C", 0, NdbDictionary::Column::Varchar, 20, 1 },
- { 3, "D", 0, NdbDictionary::Column::Varchar, 5, 1 },
- { 4, "E", 0, NdbDictionary::Column::Varchar, 5, 1 }
-static const ICol
-tt2x1col[] = {
- { 0, tt2col[0] }
-static const ICol
-tt2x2col[] = {
- { 0, tt2col[1] },
- { 1, tt2col[2] }
-static const ICol
-tt2x3col[] = {
- { 0, tt2col[2] },
- { 1, tt2col[1] }
-static const ICol
-tt2x4col[] = {
- { 0, tt2col[3] },
- { 1, tt2col[4] }
-static const ICol
-tt2x5col[] = {
- { 0, tt2col[4] },
- { 1, tt2col[3] },
- { 2, tt2col[2] },
- { 3, tt2col[1] }
-static const ITab
-tt2x1 = {
- "TT2X1", 1, tt2x1col
-static const ITab
-tt2x2 = {
- "TT2X2", 2, tt2x2col
+// make table structs
-static const ITab
-tt2x3 = {
- "TT2X3", 2, tt2x3col
-static const ITab
-tt2x4 = {
- "TT2X4", 2, tt2x4col
+static const Tab** tablist = 0;
+static unsigned tabcount = 0;
-static const ITab
-tt2x5 = {
- "TT2X5", 4, tt2x5col
-static const ITab
-tt2itab[] = {
- tt2x1,
- tt2x2,
- tt2x3,
- tt2x4,
- tt2x5
-static const Tab
-tt2 = {
- "TT2", 5, tt2col, 5, tt2itab
-// all tables
-static const Tab
-tablist[] = {
- tt1,
- tt2
+static void
+ for (unsigned j = 0; j < tabcount; j++) {
+ const Tab* t = tablist[j];
+ if (t == 0)
+ continue;
+ assert(t->m_cols != 0 && t->m_col != 0);
+ for (unsigned k = 0; k < t->m_cols; k++) {
+ const Col* c = t->m_col[k];
+ assert(c != 0 && c->m_num == k);
+ assert(! (c->m_pk && c->m_nullable));
+ }
+ assert(t->m_col[t->m_cols] == 0);
+ {
+ assert(t->m_keycol < t->m_cols);
+ const Col* c = t->m_col[t->m_keycol];
+ assert(c->m_pk && c->m_type == NdbDictionary::Column::Unsigned);
+ }
+ assert(t->m_itabs != 0 && t->m_itab != 0);
+ for (unsigned i = 0; i < t->m_itabs; i++) {
+ const ITab* x = t->m_itab[i];
+ if (x == 0)
+ continue;
+ assert(x != 0 && x->m_icols != 0 && x->m_icol != 0);
+ for (unsigned k = 0; k < x->m_icols; k++) {
+ const ICol* c = x->m_icol[k];
+ assert(c != 0 && c->m_num == k && c->m_col.m_num < t->m_cols);
+ }
+ }
+ assert(t->m_itab[t->m_itabs] == 0);
+ }
-static const unsigned
-tabcount = sizeof(tablist) / sizeof(tablist[0]);
+static void
+makebuiltintables(Par par)
+ LL2("makebuiltintables");
+ resetcslist();
+ tabcount = 3;
+ if (tablist == 0) {
+ tablist = new const Tab* [tabcount];
+ for (unsigned j = 0; j < tabcount; j++) {
+ tablist[j] = 0;
+ }
+ } else {
+ for (unsigned j = 0; j < tabcount; j++) {
+ delete tablist[j];
+ tablist[j] = 0;
+ }
+ }
+ // ti0 - basic
+ if (usetable(par, 0)) {
+ const Tab* t = new Tab("ti0", 5, 5, 0);
+ // name - pk - type - length - nullable - cs
+ t->m_col[0] = new Col(*t, 0, "a", 1, NdbDictionary::Column::Unsigned, 1, 0, 0);
+ t->m_col[1] = new Col(*t, 1, "b", 0, NdbDictionary::Column::Unsigned, 1, 1, 0);
+ t->m_col[2] = new Col(*t, 2, "c", 0, NdbDictionary::Column::Unsigned, 1, 1, 0);
+ t->m_col[3] = new Col(*t, 3, "d", 0, NdbDictionary::Column::Unsigned, 1, 1, 0);
+ t->m_col[4] = new Col(*t, 4, "e", 0, NdbDictionary::Column::Unsigned, 1, 1, 0);
+ if (useindex(par, 0)) {
+ // a
+ const ITab* x = t->m_itab[0] = new ITab(*t, "ti0x0", 1);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[0]);
+ }
+ if (useindex(par, 1)) {
+ // b
+ const ITab* x = t->m_itab[1] = new ITab(*t, "ti0x1", 1);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]);
+ }
+ if (useindex(par, 2)) {
+ // b, c
+ const ITab* x = t->m_itab[2] = new ITab(*t, "ti0x2", 2);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]);
+ x->m_icol[1] = new ICol(*x, 1, *t->m_col[2]);
+ }
+ if (useindex(par, 3)) {
+ // d, c, b
+ const ITab* x = t->m_itab[3] = new ITab(*t, "ti0x3", 3);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[3]);
+ x->m_icol[1] = new ICol(*x, 1, *t->m_col[2]);
+ x->m_icol[2] = new ICol(*x, 2, *t->m_col[1]);
+ }
+ if (useindex(par, 4)) {
+ // b, e, c, d
+ const ITab* x = t->m_itab[4] = new ITab(*t, "ti0x4", 4);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]);
+ x->m_icol[1] = new ICol(*x, 1, *t->m_col[4]);
+ x->m_icol[2] = new ICol(*x, 2, *t->m_col[2]);
+ x->m_icol[3] = new ICol(*x, 3, *t->m_col[3]);
+ }
+ tablist[0] = t;
+ }
+ // ti1 - simple char fields
+ if (usetable(par, 1)) {
+ const Tab* t = new Tab("ti1", 5, 5, 1);
+ // name - pk - type - length - nullable - cs
+ t->m_col[0] = new Col(*t, 0, "a", 0, NdbDictionary::Column::Unsigned, 1, 1, 0);
+ t->m_col[1] = new Col(*t, 1, "b", 1, NdbDictionary::Column::Unsigned, 1, 0, 0);
+ t->m_col[2] = new Col(*t, 2, "c", 0, NdbDictionary::Column::Char, 20, 1, getcs(par));
+ t->m_col[3] = new Col(*t, 3, "d", 0, NdbDictionary::Column::Char, 5, 1, getcs(par));
+ t->m_col[4] = new Col(*t, 4, "e", 0, NdbDictionary::Column::Char, 5, 1, getcs(par));
+ if (useindex(par, 0)) {
+ // b
+ const ITab* x = t->m_itab[0] = new ITab(*t, "ti1x0", 1);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]);
+ }
+ if (useindex(par, 1)) {
+ // a, c
+ const ITab* x = t->m_itab[1] = new ITab(*t, "ti1x1", 2);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[0]);
+ x->m_icol[1] = new ICol(*x, 1, *t->m_col[2]);
+ }
+ if (useindex(par, 2)) {
+ // c, a
+ const ITab* x = t->m_itab[2] = new ITab(*t, "ti1x2", 2);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[2]);
+ x->m_icol[1] = new ICol(*x, 1, *t->m_col[0]);
+ }
+ if (useindex(par, 3)) {
+ // e
+ const ITab* x = t->m_itab[3] = new ITab(*t, "ti1x3", 1);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[4]);
+ }
+ if (useindex(par, 4)) {
+ // e, d, c, b
+ const ITab* x = t->m_itab[4] = new ITab(*t, "ti1x4", 4);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[4]);
+ x->m_icol[1] = new ICol(*x, 1, *t->m_col[3]);
+ x->m_icol[2] = new ICol(*x, 2, *t->m_col[2]);
+ x->m_icol[3] = new ICol(*x, 3, *t->m_col[1]);
+ }
+ tablist[1] = t;
+ }
+ // ti2 - complex char fields
+ if (usetable(par, 2)) {
+ const Tab* t = new Tab("ti2", 5, 5, 2);
+ // name - pk - type - length - nullable - cs
+ t->m_col[0] = new Col(*t, 0, "a", 1, NdbDictionary::Column::Char, 101, 0, getcs(par));
+ t->m_col[1] = new Col(*t, 1, "b", 0, NdbDictionary::Column::Char, 4, 1, getcs(par));
+ t->m_col[2] = new Col(*t, 2, "c", 1, NdbDictionary::Column::Unsigned, 1, 0, 0);
+ t->m_col[3] = new Col(*t, 3, "d", 1, NdbDictionary::Column::Char, 3, 0, getcs(par));
+ t->m_col[4] = new Col(*t, 4, "e", 0, NdbDictionary::Column::Char, 101, 0, getcs(par));
+ if (useindex(par, 0)) {
+ // a, c, d
+ const ITab* x = t->m_itab[0] = new ITab(*t, "ti2x0", 3);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[0]);
+ x->m_icol[1] = new ICol(*x, 1, *t->m_col[2]);
+ x->m_icol[2] = new ICol(*x, 2, *t->m_col[3]);
+ }
+ if (useindex(par, 1)) {
+ // e, d, c, b, a
+ const ITab* x = t->m_itab[1] = new ITab(*t, "ti2x1", 5);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[4]);
+ x->m_icol[1] = new ICol(*x, 1, *t->m_col[3]);
+ x->m_icol[2] = new ICol(*x, 2, *t->m_col[2]);
+ x->m_icol[3] = new ICol(*x, 3, *t->m_col[1]);
+ x->m_icol[4] = new ICol(*x, 4, *t->m_col[0]);
+ }
+ if (useindex(par, 2)) {
+ // d
+ const ITab* x = t->m_itab[2] = new ITab(*t, "ti2x2", 1);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[3]);
+ }
+ if (useindex(par, 3)) {
+ // b
+ const ITab* x = t->m_itab[3] = new ITab(*t, "ti2x3", 1);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]);
+ }
+ if (useindex(par, 4)) {
+ // a, e
+ const ITab* x = t->m_itab[4] = new ITab(*t, "ti2x4", 2);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[0]);
+ x->m_icol[1] = new ICol(*x, 1, *t->m_col[4]);
+ }
+ tablist[2] = t;
+ }
+ verifytables();
// connections
@@ -833,7 +1100,7 @@ Con::execute(ExecType t, bool& deadlock)
Con::openScanRead(unsigned scanbat, unsigned scanpar)
- assert(m_tx != 0 && m_op != 0);
+ assert(m_tx != 0 && m_scanop != 0);
NdbOperation::LockMode lm = NdbOperation::LM_Read;
CHKCON(m_scanop->readTuples(lm, scanbat, scanpar) == 0, *this);
return 0;
@@ -842,7 +1109,7 @@ Con::openScanRead(unsigned scanbat, unsigned scanpar)
Con::openScanExclusive(unsigned scanbat, unsigned scanpar)
- assert(m_tx != 0 && m_op != 0);
+ assert(m_tx != 0 && m_scanop != 0);
NdbOperation::LockMode lm = NdbOperation::LM_Exclusive;
CHKCON(m_scanop->readTuples(lm, scanbat, scanpar) == 0, *this);
return 0;
@@ -935,7 +1202,7 @@ Con::printerror(NdbOut& out)
if ((code = m_tx->getNdbError().code) != 0) {
LL0(++any << " con: error " << m_tx->getNdbError());
die += (code == g_opt.m_die);
- if (code == 266 || code == 274 || code == 296 || code == 297 || code == 499)
+ if (code == 266 || code == 274 || code == 296 || code == 297 || code == 499 || code == 631)
m_errtype = ErrDeadlock;
if (m_op && m_op->getNdbError().code != 0) {
@@ -971,9 +1238,9 @@ invalidateindex(Par par)
Con& con = par.con();
const Tab& tab =;
for (unsigned i = 0; i < tab.m_itabs; i++) {
- if (! useindex(i))
+ if (tab.m_itab[i] == 0)
- const ITab& itab = tab.m_itab[i];
+ const ITab& itab = *tab.m_itab[i];
invalidateindex(par, itab);
return 0;
@@ -1021,16 +1288,14 @@ createtable(Par par)
for (unsigned k = 0; k < tab.m_cols; k++) {
- const Col& col = tab.m_col[k];
+ const Col& col = *tab.m_col[k];
NdbDictionary::Column c(col.m_name);
- c.setLength(col.m_length);
+ c.setLength(col.m_bytelength); // NDB API uses length in bytes
- if (c.getCharset()) { // test if char type
- if (! col.m_pk)
- c.setCharset(par.m_cs);
- }
+ if (col.m_chs != 0)
+ c.setCharset(col.m_chs->m_cs);
con.m_dic = con.m_ndb->getDictionary();
@@ -1061,9 +1326,9 @@ dropindex(Par par)
const Tab& tab =;
for (unsigned i = 0; i < tab.m_itabs; i++) {
- if (! useindex(i))
+ if (tab.m_itab[i] == 0)
- const ITab& itab = tab.m_itab[i];
+ const ITab& itab = *tab.m_itab[i];
CHK(dropindex(par, itab) == 0);
return 0;
@@ -1081,7 +1346,8 @@ createindex(Par par, const ITab& itab)
for (unsigned k = 0; k < itab.m_icols; k++) {
- const Col& col = itab.m_icol[k].m_col;
+ const ICol& icol = *itab.m_icol[k];
+ const Col& col = icol.m_col;
con.m_dic = con.m_ndb->getDictionary();
@@ -1095,9 +1361,9 @@ createindex(Par par)
const Tab& tab =;
for (unsigned i = 0; i < tab.m_itabs; i++) {
- if (! useindex(i))
+ if (tab.m_itab[i] == 0)
- const ITab& itab = tab.m_itab[i];
+ const ITab& itab = *tab.m_itab[i];
CHK(createindex(par, itab) == 0);
return 0;
@@ -1105,33 +1371,13 @@ createindex(Par par)
// data sets
-static unsigned
-urandom(unsigned n)
- if (n == 0)
- return 0;
- unsigned i = random() % n;
- return i;
-static int
-irandom(unsigned n)
- if (n == 0)
- return 0;
- int i = random() % n;
- if (random() & 0x1)
- i = -i;
- return i;
// Val - typed column value
struct Val {
const Col& m_col;
union {
Uint32 m_uint32;
- char* m_varchar;
+ unsigned char* m_char;
Val(const Col& col);
@@ -1141,6 +1387,8 @@ struct Val {
bool m_null;
int setval(Par par) const;
void calc(Par par, unsigned i);
+ void calckey(Par par, unsigned i);
+ void calcnokey(Par par);
int verify(const Val& val2) const;
int cmp(const Val& val2) const;
@@ -1156,8 +1404,8 @@ Val::Val(const Col& col) :
switch (col.m_type) {
case NdbDictionary::Column::Unsigned:
- case NdbDictionary::Column::Varchar:
- m_varchar = new char [2 + col.m_length];
+ case NdbDictionary::Column::Char:
+ m_char = new unsigned char [col.m_bytelength];
@@ -1171,8 +1419,8 @@ Val::~Val()
switch (col.m_type) {
case NdbDictionary::Column::Unsigned:
- case NdbDictionary::Column::Varchar:
- delete [] m_varchar;
+ case NdbDictionary::Column::Char:
+ delete [] m_char;
@@ -1201,8 +1449,8 @@ Val::copy(const void* addr)
case NdbDictionary::Column::Unsigned:
m_uint32 = *(const Uint32*)addr;
- case NdbDictionary::Column::Varchar:
- memcpy(m_varchar, addr, 2 + col.m_length);
+ case NdbDictionary::Column::Char:
+ memcpy(m_char, addr, col.m_bytelength);
@@ -1218,8 +1466,8 @@ Val::dataaddr() const
switch (col.m_type) {
case NdbDictionary::Column::Unsigned:
return &m_uint32;
- case NdbDictionary::Column::Varchar:
- return m_varchar;
+ case NdbDictionary::Column::Char:
+ return m_char;
@@ -1235,11 +1483,11 @@ Val::setval(Par par) const
const char* addr = (const char*)dataaddr();
if (m_null)
addr = 0;
+ LL5("setval [" << m_col << "] " << *this);
if (col.m_pk)
CHK(con.equal(col.m_num, addr) == 0);
CHK(con.setValue(col.m_num, addr) == 0);
- LL5("setval [" << m_col << "] " << *this);
return 0;
@@ -1247,43 +1495,93 @@ void
Val::calc(Par par, unsigned i)
const Col& col = m_col;
+ col.m_pk ? calckey(par, i) : calcnokey(par);
+ if (! m_null)
+ col.verify(dataaddr());
+Val::calckey(Par par, unsigned i)
+ const Col& col = m_col;
m_null = false;
- if (col.m_pk) {
+ switch (col.m_type) {
+ case NdbDictionary::Column::Unsigned:
m_uint32 = i;
- return;
+ break;
+ case NdbDictionary::Column::Char:
+ {
+ const Chs* chs = col.m_chs;
+ CHARSET_INFO* cs = chs->m_cs;
+ unsigned n = 0;
+ // our random chars may not fill value exactly
+ while (n + cs->mbmaxlen <= col.m_bytelength) {
+ if (i % (1 + n) == 0) {
+ break;
+ }
+ const Chr& chr = chs->m_chr[i % maxcharcount];
+ memcpy(&m_char[n], chr.m_bytes, chr.m_size);
+ n += chr.m_size;
+ }
+ // this will extend by appropriate space
+ (*cs->cset->fill)(cs, (char*)&m_char[n], col.m_bytelength - n, 0x20);
+ }
+ break;
+ default:
+ assert(false);
+ break;
+Val::calcnokey(Par par)
+ const Col& col = m_col;
+ m_null = false;
if (col.m_nullable && urandom(100) < par.m_pctnull) {
m_null = true;
- unsigned v = par.m_range + irandom((par.m_pctrange * par.m_range) / 100);
+ int r = irandom((par.m_pctrange * par.m_range) / 100);
+ if (par.m_bdir != 0 && urandom(10) != 0) {
+ if (r < 0 && par.m_bdir > 0 || r > 0 && par.m_bdir < 0)
+ r = -r;
+ }
+ unsigned v = par.m_range + r;
switch (col.m_type) {
case NdbDictionary::Column::Unsigned:
m_uint32 = v;
- case NdbDictionary::Column::Varchar:
+ case NdbDictionary::Column::Char:
+ const Chs* chs = col.m_chs;
+ CHARSET_INFO* cs = chs->m_cs;
unsigned n = 0;
- while (n < col.m_length) {
- if (urandom(1 + col.m_length) == 0) {
- // nice distribution on lengths
+ // our random chars may not fill value exactly
+ while (n + cs->mbmaxlen <= col.m_bytelength) {
+ if (urandom(1 + col.m_bytelength) == 0) {
- m_varchar[2 + n++] = 'a' + urandom((par.m_pctrange * 10) / 100);
- }
- m_varchar[0] = (n >> 8);
- m_varchar[1] = (n & 0xff);
- while (n < col.m_length) {
- m_varchar[2 + n++] = 0;
+ unsigned half = maxcharcount / 2;
+ int r = irandom((par.m_pctrange * half) / 100);
+ if (par.m_bdir != 0 && urandom(10) != 0) {
+ if (r < 0 && par.m_bdir > 0 || r > 0 && par.m_bdir < 0)
+ r = -r;
+ }
+ unsigned i = half + r;
+ assert(i < maxcharcount);
+ const Chr& chr = chs->m_chr[i];
+ memcpy(&m_char[n], chr.m_bytes, chr.m_size);
+ n += chr.m_size;
+ // this will extend by appropriate space
+ (*cs->cset->fill)(cs, (char*)&m_char[n], col.m_bytelength - n, 0x20);
- // verify format
- col.verify(dataaddr());
@@ -1298,7 +1596,7 @@ Val::cmp(const Val& val2) const
const Col& col = m_col;
const Col& col2 = val2.m_col;
- assert(col.m_type == col2.m_type && col.m_length == col2.m_length);
+ assert(col.equal(col2));
if (m_null || val2.m_null) {
if (! m_null)
return +1;
@@ -1312,13 +1610,37 @@ Val::cmp(const Val& val2) const
// compare
switch (col.m_type) {
case NdbDictionary::Column::Unsigned:
- if (m_uint32 < val2.m_uint32)
- return -1;
- if (m_uint32 > val2.m_uint32)
- return +1;
- return 0;
- case NdbDictionary::Column::Varchar:
- return memcmp(&m_varchar[2], &val2.m_varchar[2], col.m_length);
+ {
+ if (m_uint32 < val2.m_uint32)
+ return -1;
+ if (m_uint32 > val2.m_uint32)
+ return +1;
+ return 0;
+ }
+ break;
+ case NdbDictionary::Column::Char:
+ {
+ const Chs* chs = col.m_chs;
+ CHARSET_INFO* cs = chs->m_cs;
+ unsigned len = col.m_bytelength;
+ int k;
+ if (! g_opt.m_collsp) {
+ unsigned char x1[maxxmulsize * 8000];
+ unsigned char x2[maxxmulsize * 8000];
+ int n1 = (*cs->coll->strnxfrm)(cs, x1, chs->m_xmul * len, m_char, len);
+ int n2 = (*cs->coll->strnxfrm)(cs, x2, chs->m_xmul * len, val2.m_char, len);
+ // currently same but do not assume it
+ unsigned n = (n1 > n2 ? n1 : n2);
+ // assume null padding
+ memset(x1 + n1, 0x0, n - n1);
+ memset(x2 + n2, 0x0, n - n2);
+ k = memcmp(x1, x2, n);
+ } else {
+ k = (*cs->coll->strnncollsp)(cs, m_char, len, val2.m_char, len, false);
+ }
+ return k < 0 ? -1 : k > 0 ? +1 : 0;
+ }
+ break;
@@ -1338,12 +1660,26 @@ operator<<(NdbOut& out, const Val& val)
case NdbDictionary::Column::Unsigned:
out << val.m_uint32;
- case NdbDictionary::Column::Varchar:
+ case NdbDictionary::Column::Char:
- char buf[8000];
- unsigned n = (val.m_varchar[0] << 8) | val.m_varchar[1];
- assert(n <= col.m_length);
- sprintf(buf, "'%.*s'[%d]", n, &val.m_varchar[2], n);
+ char buf[4 * 8000];
+ char *p = buf;
+ *p++ = '[';
+ for (unsigned i = 0; i < col.m_bytelength; i++) {
+ unsigned char c = val.m_char[i];
+ if (c == '\\') {
+ *p++ = '\\';
+ *p++ = '\\';
+ } else if (0x20 <= c && c < 0x7e) {
+ *p++ = c;
+ } else {
+ *p++ = '\\';
+ *p++ = hexstr[c >> 4];
+ *p++ = hexstr[c & 15];
+ }
+ }
+ *p++ = ']';
+ *p = 0;
out << buf;
@@ -1383,7 +1719,7 @@ Row::Row(const Tab& tab) :
m_val = new Val* [tab.m_cols];
for (unsigned k = 0; k < tab.m_cols; k++) {
- const Col& col = tab.m_col[k];
+ const Col& col = *tab.m_col[k];
m_val[k] = new Val(col);
m_exist = false;
@@ -1460,6 +1796,16 @@ Row::updrow(Par par)
CHKCON(con.m_op->updateTuple() == 0, con);
for (unsigned k = 0; k < tab.m_cols; k++) {
const Val& val = *m_val[k];
+ const Col& col = val.m_col;
+ if (! col.m_pk)
+ continue;
+ CHK(val.setval(par) == 0);
+ }
+ for (unsigned k = 0; k < tab.m_cols; k++) {
+ const Val& val = *m_val[k];
+ const Col& col = val.m_col;
+ if (col.m_pk)
+ continue;
CHK(val.setval(par) == 0);
m_pending = UpdOp;
@@ -1667,14 +2013,33 @@ Set::pending(unsigned i) const
+Set::notpending(unsigned i)
+ assert(m_row[i] != 0);
+ Row& row = *m_row[i];
+ if (row.m_pending == Row::InsOp)
+ row.m_exist = true;
+ if (row.m_pending == Row::DelOp)
+ row.m_exist = false;
+ row.m_pending = Row::NoOp;
+Set::notpending(const Lst& lst)
+ for (unsigned j = 0; j < lst.m_cnt; j++) {
+ unsigned i = lst.m_arr[j];
+ notpending(i);
+ }
Set::calc(Par par, unsigned i)
const Tab& tab = m_tab;
if (m_row[i] == 0)
m_row[i] = new Row(tab);
Row& row = *m_row[i];
- // value generation parameters
- par.m_pctrange = 40;
row.calc(par, i);
@@ -1738,9 +2103,11 @@ Set::getval(Par par)
Set::getkey(Par par, unsigned* i)
- assert(m_rec[0] != 0);
- const char* aRef0 = m_rec[0]->aRef();
- Uint32 key = *(const Uint32*)aRef0;
+ const Tab& tab = m_tab;
+ unsigned k = tab.m_keycol;
+ assert(m_rec[k] != 0);
+ const char* aRef = m_rec[k]->aRef();
+ Uint32 key = *(const Uint32*)aRef;
CHK(key < m_rows);
*i = key;
return 0;
@@ -1771,32 +2138,12 @@ Set::putval(unsigned i, bool force)
return 0;
-Set::notpending(unsigned i)
- assert(m_row[i] != 0);
- Row& row = *m_row[i];
- if (row.m_pending == Row::InsOp)
- row.m_exist = true;
- if (row.m_pending == Row::DelOp)
- row.m_exist = false;
- row.m_pending = Row::NoOp;
-Set::notpending(const Lst& lst)
- for (unsigned j = 0; j < lst.m_cnt; j++) {
- unsigned i = lst.m_arr[j];
- notpending(i);
- }
Set::verify(const Set& set2) const
const Tab& tab = m_tab;
assert(&tab == &set2.m_tab && m_rows == set2.m_rows);
+ LL3("verify set1 count=" << count() << " vs set2 count=" << set2.count());
for (unsigned i = 0; i < m_rows; i++) {
CHK(exist(i) == set2.exist(i));
if (! exist(i))
@@ -1883,10 +2230,10 @@ operator<<(NdbOut& out, const BVal& bval)
const ICol& icol = bval.m_icol;
const Col& col = icol.m_col;
const Val& val = bval;
- out << "type " << bval.m_type;
- out << " icol " << icol.m_num;
- out << " col " << col.m_name << "(" << col.m_num << ")";
- out << " value " << val;
+ out << "type=" << bval.m_type;
+ out << " icol=" << icol.m_num;
+ out << " col=" << col.m_num << "," << col.m_name;
+ out << " value=" << val;
return out;
@@ -1938,12 +2285,15 @@ void
BSet::calc(Par par)
const ITab& itab = m_itab;
+ par.m_pctrange = par.m_pctbrange;
for (unsigned k = 0; k < itab.m_icols; k++) {
- const ICol& icol = itab.m_icol[k];
+ const ICol& icol = *itab.m_icol[k];
const Col& col = icol.m_col;
for (unsigned i = 0; i <= 1; i++) {
- if (urandom(10) == 0)
+ if (m_bvals == 0 && urandom(100) == 0)
+ return;
+ if (m_bvals != 0 && urandom(3) == 0)
assert(m_bvals < m_alloc);
BVal& bval = *new BVal(icol);
@@ -1962,12 +2312,14 @@ BSet::calc(Par par)
bval.m_type = 4;
if (k + 1 < itab.m_icols)
bval.m_type = 4;
- // value generation parammeters
if (! g_compare_null)
par.m_pctnull = 0;
- par.m_pctrange = 50; // bit higher
+ if (bval.m_type == 0 || bval.m_type == 1)
+ par.m_bdir = -1;
+ if (bval.m_type == 2 || bval.m_type == 3)
+ par.m_bdir = +1;
do {
- bval.calc(par, 0);
+ bval.calcnokey(par);
if (i == 1) {
assert(m_bvals >= 2);
const BVal& bv1 = *m_bval[m_bvals - 2];
@@ -1989,7 +2341,7 @@ BSet::calcpk(Par par, unsigned i)
const ITab& itab = m_itab;
for (unsigned k = 0; k < itab.m_icols; k++) {
- const ICol& icol = itab.m_icol[k];
+ const ICol& icol = *itab.m_icol[k];
const Col& col = icol.m_col;
assert(m_bvals < m_alloc);
@@ -2036,7 +2388,7 @@ BSet::filter(const Set& set, Set& set2) const
if (! g_store_null_key) {
bool ok1 = false;
for (unsigned k = 0; k < itab.m_icols; k++) {
- const ICol& icol = itab.m_icol[k];
+ const ICol& icol = *itab.m_icol[k];
const Col& col = icol.m_col;
const Val& val = *row.m_val[col.m_num];
if (! val.m_null) {
@@ -2054,6 +2406,7 @@ BSet::filter(const Set& set, Set& set2) const
const Col& col = icol.m_col;
const Val& val = *row.m_val[col.m_num];
int ret = bval.cmp(val);
+ LL5("cmp: ret=" << ret << " " << bval << " vs " << val);
if (bval.m_type == 0)
ok2 = (ret <= 0);
else if (bval.m_type == 1)
@@ -2086,9 +2439,8 @@ operator<<(NdbOut& out, const BSet& bset)
out << "bounds=" << bset.m_bvals;
for (unsigned j = 0; j < bset.m_bvals; j++) {
- out << endl;
const BVal& bval = *bset.m_bval[j];
- out << "bound " << j << ": " << bval;
+ out << " [bound " << j << ": " << bval << "]";
return out;
@@ -2317,26 +2669,36 @@ scanreadtable(Par par)
const Set& set = par.set();
// expected
const Set& set1 = set;
- LL3((par.m_verify ? "scanverify " : "scanread ") << tab.m_name);
+ LL3("scanread " << tab.m_name << " verify=" << par.m_verify);
+ LL4("expect " << set.count() << " rows");
Set set2(tab, set.m_rows);
CHK(con.startTransaction() == 0);
CHK(con.getNdbScanOperation(tab) == 0);
CHK(con.openScanRead(par.m_scanbat, par.m_scanpar) == 0);
CHK(con.executeScan() == 0);
+ unsigned n = 0;
+ bool deadlock = false;
while (1) {
int ret;
- CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
+ deadlock = par.m_deadlock;
+ CHK((ret = con.nextScanResult(true, deadlock)) == 0 || ret == 1);
if (ret == 1)
+ if (deadlock) {
+ LL1("scanreadtable: stop on deadlock");
+ break;
+ }
unsigned i = (unsigned)-1;
CHK(set2.getkey(par, &i) == 0);
CHK(set2.putval(i, false) == 0);
- LL4("row " << set2.count() << ": " << *set2.m_row[i]);
+ LL4("row " << n << ": " << *set2.m_row[i]);
+ n++;
if (par.m_verify)
CHK(set1.verify(set2) == 0);
+ LL3("scanread " << tab.m_name << " rows=" << n);
return 0;
@@ -2368,16 +2730,30 @@ scanreadtablefast(Par par, unsigned countcheck)
static int
-scanreadindex(Par par, const ITab& itab, const BSet& bset)
+scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
Con& con = par.con();
const Tab& tab =;
const Set& set = par.set();
- // expected
- Set set1(tab, set.m_rows);
- bset.filter(set, set1);
- LL3((par.m_verify ? "scanverify " : "scanread ") << itab.m_name << " bounds=" << bset.m_bvals);
+ Set set1(tab, set.m_rows);
+ if (calc) {
+ while (true) {
+ bset.calc(par);
+ bset.filter(set, set1);
+ unsigned n = set1.count();
+ // prefer proper subset
+ if (0 < n && n < set.m_rows)
+ break;
+ if (urandom(5) == 0)
+ break;
+ set1.reset();
+ }
+ } else {
+ bset.filter(set, set1);
+ }
+ LL3("scanread " << itab.m_name << " bounds=" << bset.m_bvals << " verify=" << par.m_verify);
+ LL4("expect " << set1.count() << " rows");
Set set2(tab, set.m_rows);
CHK(con.startTransaction() == 0);
CHK(con.getNdbScanOperation(itab, tab) == 0);
@@ -2385,20 +2761,29 @@ scanreadindex(Par par, const ITab& itab, const BSet& bset)
CHK(bset.setbnd(par) == 0);
CHK(con.executeScan() == 0);
+ unsigned n = 0;
+ bool deadlock = false;
while (1) {
int ret;
- CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
+ deadlock = par.m_deadlock;
+ CHK((ret = con.nextScanResult(true, deadlock)) == 0 || ret == 1);
if (ret == 1)
+ if (deadlock) {
+ LL1("scanreadindex: stop on deadlock");
+ break;
+ }
unsigned i = (unsigned)-1;
CHK(set2.getkey(par, &i) == 0);
LL4("key " << i);
CHK(set2.putval(i, par.m_dups) == 0);
- LL4("row " << set2.count() << ": " << *set2.m_row[i]);
+ LL4("row " << n << ": " << *set2.m_row[i]);
+ n++;
if (par.m_verify)
CHK(set1.verify(set2) == 0);
+ LL3("scanread " << itab.m_name << " rows=" << n);
return 0;
@@ -2437,8 +2822,7 @@ scanreadindex(Par par, const ITab& itab)
const Tab& tab =;
for (unsigned i = 0; i < par.m_subsubloop; i++) {
BSet bset(tab, itab, par.m_rows);
- bset.calc(par);
- CHK(scanreadindex(par, itab, bset) == 0);
+ CHK(scanreadindex(par, itab, bset, true) == 0);
return 0;
@@ -2448,9 +2832,9 @@ scanreadindex(Par par)
const Tab& tab =;
for (unsigned i = 0; i < tab.m_itabs; i++) {
- if (! useindex(i))
+ if (tab.m_itab[i] == 0)
- const ITab& itab = tab.m_itab[i];
+ const ITab& itab = *tab.m_itab[i];
CHK(scanreadindex(par, itab) == 0);
return 0;
@@ -2480,7 +2864,7 @@ static int
timescanpkindex(Par par)
const Tab& tab =;
- const ITab& itab = tab.m_itab[0]; // 1st index is on PK
+ const ITab& itab = *tab.m_itab[0]; // 1st index is on PK
BSet bset(tab, itab, par.m_rows);
CHK(scanreadindexfast(par, itab, bset, par.m_totrows) == 0);
@@ -2504,7 +2888,7 @@ static int
timepkreadindex(Par par)
const Tab& tab =;
- const ITab& itab = tab.m_itab[0]; // 1st index is on PK
+ const ITab& itab = *tab.m_itab[0]; // 1st index is on PK
BSet bset(tab, itab, par.m_rows);
unsigned count = par.m_samples;
if (count == 0)
@@ -2574,7 +2958,12 @@ scanupdatetable(Par par)
if (lst.cnt() == par.m_batch) {
- CHK(con2.execute(Commit) == 0);
+ deadlock = par.m_deadlock;
+ CHK(con2.execute(Commit, deadlock) == 0);
+ if (deadlock) {
+ LL1("scanupdateindex: stop on deadlock");
+ goto out;
+ }
@@ -2585,7 +2974,12 @@ scanupdatetable(Par par)
CHK((ret = con.nextScanResult(false)) == 0 || ret == 1 || ret == 2);
if (ret == 2 && lst.cnt() != 0) {
- CHK(con2.execute(Commit) == 0);
+ deadlock = par.m_deadlock;
+ CHK(con2.execute(Commit, deadlock) == 0);
+ if (deadlock) {
+ LL1("scanupdateindex: stop on deadlock");
+ goto out;
+ }
@@ -2598,6 +2992,7 @@ scanupdatetable(Par par)
if (ret == 1)
LL3("scan update " << tab.m_name << " rows updated=" << count);
@@ -2658,7 +3053,12 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
if (lst.cnt() == par.m_batch) {
- CHK(con2.execute(Commit) == 0);
+ deadlock = par.m_deadlock;
+ CHK(con2.execute(Commit, deadlock) == 0);
+ if (deadlock) {
+ LL1("scanupdateindex: stop on deadlock");
+ goto out;
+ }
@@ -2669,7 +3069,12 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
CHK((ret = con.nextScanResult(false)) == 0 || ret == 1 || ret == 2);
if (ret == 2 && lst.cnt() != 0) {
- CHK(con2.execute(Commit) == 0);
+ deadlock = par.m_deadlock;
+ CHK(con2.execute(Commit, deadlock) == 0);
+ if (deadlock) {
+ LL1("scanupdateindex: stop on deadlock");
+ goto out;
+ }
@@ -2680,6 +3085,7 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
} while (ret == 0);
LL3("scan update " << itab.m_name << " rows updated=" << count);
@@ -2703,9 +3109,9 @@ scanupdateindex(Par par)
const Tab& tab =;
for (unsigned i = 0; i < tab.m_itabs; i++) {
- if (! useindex(i))
+ if (tab.m_itab[i] == 0)
- const ITab& itab = tab.m_itab[i];
+ const ITab& itab = *tab.m_itab[i];
CHK(scanupdateindex(par, itab) == 0);
return 0;
@@ -2742,17 +3148,25 @@ readverifyfull(Par par)
CHK(scanreadtable(par) == 0);
else {
const Tab& tab =;
- unsigned i = par.m_no;
- if (i <= tab.m_itabs && useindex(i)) {
- const ITab& itab = tab.m_itab[i - 1];
+ unsigned i = par.m_no - 1;
+ if (i < tab.m_itabs && tab.m_itab[i] != 0) {
+ const ITab& itab = *tab.m_itab[i];
BSet bset(tab, itab, par.m_rows);
- CHK(scanreadindex(par, itab, bset) == 0);
+ CHK(scanreadindex(par, itab, bset, false) == 0);
return 0;
static int
+readverifyindex(Par par)
+ par.m_verify = true;
+ CHK(scanreadindex(par) == 0);
+ return 0;
+static int
pkops(Par par)
par.m_randomkey = true;
@@ -2785,6 +3199,7 @@ static int
pkupdatescanread(Par par)
par.m_dups = true;
+ par.m_deadlock = true;
unsigned sel = urandom(10);
if (sel < 5) {
CHK(pkupdate(par) == 0);
@@ -3073,6 +3488,24 @@ tbuild(Par par)
static int
+tindexscan(Par par)
+ RUNSTEP(par, droptable, ST);
+ RUNSTEP(par, createtable, ST);
+ RUNSTEP(par, invalidatetable, MT);
+ RUNSTEP(par, createindex, ST);
+ RUNSTEP(par, invalidateindex, MT);
+ RUNSTEP(par, pkinsert, MT);
+ RUNSTEP(par, readverifyfull, MT);
+ for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ LL4("subloop " << par.m_slno);
+ RUNSTEP(par, readverifyindex, MT);
+ }
+ return 0;
+static int
tpkops(Par par)
RUNSTEP(par, droptable, ST);
@@ -3187,6 +3620,10 @@ ttimemaint(Par par)
static int
ttimescan(Par par)
+ if ([0] == 0) {
+ LL1("ttimescan - no index 0, skipped");
+ return 0;
+ }
Tmr t1, t2;
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
@@ -3209,6 +3646,10 @@ ttimescan(Par par)
static int
ttimepkread(Par par)
+ if ([0] == 0) {
+ LL1("ttimescan - no index 0, skipped");
+ return 0;
+ }
Tmr t1, t2;
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
@@ -3249,10 +3690,11 @@ struct TCase {
static const TCase
tcaselist[] = {
TCase("a", tbuild, "index build"),
- TCase("b", tpkops, "pk operations"),
- TCase("c", tpkopsread, "pk operations and scan reads"),
- TCase("d", tmixedops, "pk operations and scan operations"),
- TCase("e", tbusybuild, "pk operations and index build"),
+ TCase("b", tindexscan, "index scans"),
+ TCase("c", tpkops, "pk operations"),
+ TCase("d", tpkopsread, "pk operations and scan reads"),
+ TCase("e", tmixedops, "pk operations and scan operations"),
+ TCase("f", tbusybuild, "pk operations and index build"),
TCase("t", ttimebuild, "time index build"),
TCase("u", ttimemaint, "time index maintenance"),
TCase("v", ttimescan, "time full scan table vs index on pk"),
@@ -3276,12 +3718,16 @@ printcases()
static void
- ndbout << "tables and indexes (X1 is on table PK):" << endl;
+ Par par(g_opt);
+ makebuiltintables(par);
+ ndbout << "builtin tables (index x0 is on table pk):" << endl;
for (unsigned j = 0; j < tabcount; j++) {
- const Tab& tab = tablist[j];
+ if (tablist[j] == 0)
+ continue;
+ const Tab& tab = *tablist[j];
ndbout << " " << tab.m_name;
for (unsigned i = 0; i < tab.m_itabs; i++) {
- const ITab& itab = tab.m_itab[i];
+ const ITab& itab = *tab.m_itab[i];
ndbout << " " << itab.m_name;
ndbout << endl;
@@ -3292,15 +3738,25 @@ static int
runtest(Par par)
- if (par.m_seed != 0)
+ if (par.m_seed == -1) {
+ // good enough for daily run
+ unsigned short seed = (getpid() ^ time(0));
+ LL1("random seed: " << seed);
+ srandom((unsigned)seed);
+ } else if (par.m_seed != 0)
+ // cs
assert(par.m_csname != 0);
- CHK((cs = get_charset_by_name(par.m_csname, MYF(0))) != 0 || (cs = get_charset_by_csname(par.m_csname, MY_CS_PRIMARY, MYF(0))) != 0);
- par.m_cs = cs;
+ if (strcmp(par.m_csname, "random") != 0) {
+ CHK((cs = get_charset_by_name(par.m_csname, MYF(0))) != 0 || (cs = get_charset_by_csname(par.m_csname, MY_CS_PRIMARY, MYF(0))) != 0);
+ par.m_cs = cs;
+ }
+ // con
Con con;
CHK(con.connect() == 0);
par.m_con = &con;
+ // threads
g_thrlist = new Thr* [par.m_threads];
unsigned n;
for (n = 0; n < par.m_threads; n++) {
@@ -3319,16 +3775,18 @@ runtest(Par par)
const TCase& tcase = tcaselist[i];
if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0)
+ makebuiltintables(par);
LL1("case " << tcase.m_name << " - " << tcase.m_desc);
for (unsigned j = 0; j < tabcount; j++) {
- if (! usetable(j))
+ if (tablist[j] == 0)
- const Tab& tab = tablist[j];
+ const Tab& tab = *tablist[j];
par.m_tab = &tab;
- delete par.m_set;
par.m_set = new Set(tab, par.m_totrows);
LL1("table " << tab.m_name);
CHK(tcase.m_func(par) == 0);
+ delete par.m_set;
+ par.m_set = 0;
@@ -3352,7 +3810,7 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
if (ndbout_mutex == NULL)
- ndbout_mutex= NdbMutex_Create();
+ ndbout_mutex = NdbMutex_Create();
while (++argv, --argc > 0) {
const char* arg = argv[0];
if (*arg != '-') {
@@ -3380,6 +3838,10 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
+ if (strcmp(arg, "-collsp") == 0) {
+ g_opt.m_collsp = true;
+ continue;
+ }
if (strcmp(arg, "-core") == 0) {
g_opt.m_core = true;
@@ -3516,7 +3978,6 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
if (runtest(par) < 0)
goto failed;
- // always exit with NDBT code
return NDBT_ProgramExit(NDBT_OK);