diff options
-rw-r--r-- | mysql-test/ndb/ndb_range_bounds.pl | 6 | ||||
-rw-r--r-- | mysql-test/r/ndb_index_ordered.result | 1 | ||||
-rw-r--r-- | mysql-test/t/ndb_index_ordered.test | 2 | ||||
-rw-r--r-- | ndb/test/ndbapi/testOIBasic.cpp | 1217 |
4 files changed, 961 insertions, 265 deletions
diff --git a/mysql-test/ndb/ndb_range_bounds.pl b/mysql-test/ndb/ndb_range_bounds.pl index 3b1844495b3..abe1ea28298 100644 --- a/mysql-test/ndb/ndb_range_bounds.pl +++ b/mysql-test/ndb/ndb_range_bounds.pl @@ -1,6 +1,7 @@ # # test range scan bounds # give option --all to test all cases +# set MYSQL_HOME to installation top # use strict; @@ -14,8 +15,9 @@ my $opt_verbose = 0; GetOptions("all" => \$opt_all, "cnt=i" => \$opt_cnt, "verbose" => \$opt_verbose) or die "options are: --all --cnt=N --verbose"; -my $mysql_top = $ENV{MYSQL_TOP}; -my $dsn = "dbi:mysql:database=test;host=localhost;mysql_read_default_file=$mysql_top/.target/var/my.cnf"; +my $mysql_home = $ENV{MYSQL_HOME}; +defined($mysql_home) or die "no MYSQL_HOME"; +my $dsn = "dbi:mysql:database=test;host=localhost;mysql_read_default_file=$mysql_home/var/my.cnf"; my $opts = { RaiseError => 0, PrintError => 0, AutoCommit => 1, }; my $dbh; diff --git a/mysql-test/r/ndb_index_ordered.result b/mysql-test/r/ndb_index_ordered.result index 00e3fbc5827..1cf2a97a6b3 100644 --- a/mysql-test/r/ndb_index_ordered.result +++ b/mysql-test/r/ndb_index_ordered.result @@ -383,6 +383,7 @@ b c select min(b), max(b) from t1; min(b) max(b) 1 5000000 +drop table t1; CREATE TABLE test1 ( SubscrID int(11) NOT NULL auto_increment, UsrID int(11) NOT NULL default '0', diff --git a/mysql-test/t/ndb_index_ordered.test b/mysql-test/t/ndb_index_ordered.test index 783ce99e739..42325e25ea3 100644 --- a/mysql-test/t/ndb_index_ordered.test +++ b/mysql-test/t/ndb_index_ordered.test @@ -175,6 +175,8 @@ select b, c from t1 where 1000<=b and b<=100000 and c<'j' order by b, c; select b, c from t1 where 1000<=b and b<=100000 and c<'j' order by b desc, c desc; # select min(b), max(b) from t1; +# +drop table t1; # # Bug #6435 diff --git a/ndb/test/ndbapi/testOIBasic.cpp b/ndb/test/ndbapi/testOIBasic.cpp index 7a73ba872b2..e3cc3d0d90c 100644 --- a/ndb/test/ndbapi/testOIBasic.cpp +++ b/ndb/test/ndbapi/testOIBasic.cpp @@ -72,7 +72,7 @@ struct Opt { m_die(0), m_dups(false), m_fragtype(NdbDictionary::Object::FragUndefined), - m_subsubloop(4), + m_subsubloop(2), m_index(0), m_loop(1), m_msglock(true), @@ -257,6 +257,9 @@ struct Par : public Opt { bool m_verify; // deadlock possible bool m_deadlock; + // ordered range scan + bool m_ordered; + bool m_descending; // timer location Par(const Opt& opt) : Opt(opt), @@ -274,7 +277,9 @@ struct Par : public Opt { m_bdir(0), m_randomkey(false), m_verify(false), - m_deadlock(false) { + m_deadlock(false), + m_ordered(false), + m_descending(false) { } }; @@ -444,6 +449,9 @@ struct Chs { ~Chs(); }; +static NdbOut& +operator<<(NdbOut& out, const Chs& chs); + Chs::Chs(CHARSET_INFO* cs) : m_cs(cs) { @@ -455,10 +463,14 @@ Chs::Chs(CHARSET_INFO* cs) : unsigned i = 0; unsigned miss1 = 0; unsigned miss2 = 0; + unsigned miss3 = 0; + unsigned miss4 = 0; while (i < maxcharcount) { unsigned char* bytes = m_chr[i].m_bytes; unsigned char* xbytes = m_chr[i].m_xbytes; - unsigned size = m_cs->mbminlen + urandom(m_cs->mbmaxlen - m_cs->mbminlen + 1); + unsigned& size = m_chr[i].m_size; + bool ok; + size = m_cs->mbminlen + urandom(m_cs->mbmaxlen - m_cs->mbminlen + 1); assert(m_cs->mbminlen <= size && size <= m_cs->mbmaxlen); // prefer longer chars if (size == m_cs->mbminlen && m_cs->mbminlen < m_cs->mbmaxlen && urandom(5) != 0) @@ -466,33 +478,57 @@ Chs::Chs(CHARSET_INFO* cs) : for (unsigned j = 0; j < size; j++) { bytes[j] = urandom(256); } + // check wellformed const char* sbytes = (const char*)bytes; if ((*cs->cset->well_formed_len)(cs, sbytes, sbytes + size, 1) != size) { miss1++; continue; } - // do not trust well_formed_len currently + // check no proper prefix wellformed + ok = true; + for (unsigned j = 1; j < size; j++) { + if ((*cs->cset->well_formed_len)(cs, sbytes, sbytes + j, 1) == j) { + ok = false; + break; + } + } + if (! ok) { + miss2++; + continue; + } + // normalize memset(xbytes, 0, sizeof(xbytes)); // currently returns buffer size always int xlen = (*cs->coll->strnxfrm)(cs, xbytes, m_xmul * size, bytes, size); // check we got something - bool xok = false; + ok = false; for (unsigned j = 0; j < xlen; j++) { if (xbytes[j] != 0) { - xok = true; + ok = true; break; } } - if (! xok) { - miss2++; + if (! ok) { + miss3++; + continue; + } + // check for duplicate (before normalize) + ok = true; + for (unsigned j = 0; j < i; j++) { + const Chr& chr = m_chr[j]; + if (chr.m_size == size && memcmp(chr.m_bytes, bytes, size) == 0) { + ok = false; + break; + } + } + if (! ok) { + miss4++; continue; } - // occasional duplicate char is ok - m_chr[i].m_size = size; i++; } bool disorder = true; - unsigned bubbels = 0; + unsigned bubbles = 0; while (disorder) { disorder = false; for (unsigned i = 1; i < maxcharcount; i++) { @@ -502,11 +538,11 @@ Chs::Chs(CHARSET_INFO* cs) : m_chr[i] = m_chr[i-1]; m_chr[i-1] = chr; disorder = true; - bubbels++; + bubbles++; } } } - LL3("inited charset " << cs->name << " miss1=" << miss1 << " miss2=" << miss2 << " bubbels=" << bubbels); + LL3("inited charset " << *this << " miss=" << miss1 << "," << miss2 << "," << miss3 << "," << miss4 << " bubbles=" << bubbles); } Chs::~Chs() @@ -514,6 +550,14 @@ Chs::~Chs() delete [] m_chr; } +static NdbOut& +operator<<(NdbOut& out, const Chs& chs) +{ + CHARSET_INFO* cs = chs.m_cs; + out << cs->name << "[" << cs->mbminlen << "-" << cs->mbmaxlen << "," << chs.m_xmul << "]"; + return out; +} + static Chs* cslist[maxcsnumber]; static void @@ -552,22 +596,26 @@ getcs(Par par) // Col - table column struct Col { + enum Type { + Unsigned = NdbDictionary::Column::Unsigned, + Char = NdbDictionary::Column::Char + }; const class Tab& m_tab; unsigned m_num; const char* m_name; bool m_pk; - NdbDictionary::Column::Type m_type; + Type m_type; unsigned m_length; unsigned m_bytelength; bool m_nullable; const Chs* m_chs; - Col(const class Tab& tab, unsigned num, const char* name, bool pk, NdbDictionary::Column::Type type, unsigned length, bool nullable, const Chs* chs); + Col(const class Tab& tab, unsigned num, const char* name, bool pk, Type type, unsigned length, bool nullable, const Chs* chs); ~Col(); bool equal(const Col& col2) const; void verify(const void* addr) const; }; -Col::Col(const class Tab& tab, unsigned num, const char* name, bool pk, NdbDictionary::Column::Type type, unsigned length, bool nullable, const Chs* chs) : +Col::Col(const class Tab& tab, unsigned num, const char* name, bool pk, Type type, unsigned length, bool nullable, const Chs* chs) : m_tab(tab), m_num(num), m_name(strcpy(new char [strlen(name) + 1], name)), @@ -595,9 +643,9 @@ void Col::verify(const void* addr) const { switch (m_type) { - case NdbDictionary::Column::Unsigned: + case Col::Unsigned: break; - case NdbDictionary::Column::Char: + case Col::Char: { CHARSET_INFO* cs = m_chs->m_cs; const char* src = (const char*)addr; @@ -616,10 +664,10 @@ operator<<(NdbOut& out, const Col& col) { out << "col[" << col.m_num << "] " << col.m_name; switch (col.m_type) { - case NdbDictionary::Column::Unsigned: + case Col::Unsigned: out << " unsigned"; break; - case NdbDictionary::Column::Char: + case Col::Char: { CHARSET_INFO* cs = col.m_chs->m_cs; out << " char(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")"; @@ -656,25 +704,41 @@ ICol::~ICol() { } +static NdbOut& +operator<<(NdbOut& out, const ICol& icol) +{ + out << "icol[" << icol.m_num << "] " << icol.m_col; + return out; +} + // ITab - index struct ITab { + enum Type { + OrderedIndex = NdbDictionary::Index::OrderedIndex, + UniqueHashIndex = NdbDictionary::Index::UniqueHashIndex + }; const class Tab& m_tab; const char* m_name; + Type m_type; unsigned m_icols; const ICol** m_icol; - ITab(const class Tab& tab, const char* name, unsigned icols); + unsigned m_colmask; + ITab(const class Tab& tab, const char* name, Type type, unsigned icols); ~ITab(); + void icoladd(unsigned k, const ICol* icolptr); }; -ITab::ITab(const class Tab& tab, const char* name, unsigned icols) : +ITab::ITab(const class Tab& tab, const char* name, Type type, unsigned icols) : m_tab(tab), m_name(strcpy(new char [strlen(name) + 1], name)), + m_type(type), m_icols(icols), - m_icol(new const ICol* [icols + 1]) + m_icol(new const ICol* [icols + 1]), + m_colmask(0) { - for (unsigned i = 0; i <= m_icols; i++) - m_icol[0] = 0; + for (unsigned k = 0; k <= m_icols; k++) + m_icol[k] = 0; } ITab::~ITab() @@ -685,13 +749,21 @@ ITab::~ITab() delete [] m_icol; } +void +ITab::icoladd(unsigned k, const ICol* icolptr) +{ + assert(k == icolptr->m_num && k < m_icols && m_icol[k] == 0); + m_icol[k] = icolptr; + m_colmask |= (1 << icolptr->m_col.m_num); +} + static NdbOut& operator<<(NdbOut& out, const ITab& itab) { out << "itab " << itab.m_name << " icols=" << itab.m_icols; for (unsigned k = 0; k < itab.m_icols; k++) { - out << endl; - out << "icol[" << k << "] " << itab.m_icol[k]->m_col; + const ICol& icol = *itab.m_icol[k]; + out << endl << icol; } return out; } @@ -706,6 +778,8 @@ struct Tab { const ITab** m_itab; // pk must contain an Unsigned column unsigned m_keycol; + void coladd(unsigned k, Col* colptr); + void itabadd(unsigned j, ITab* itab); Tab(const char* name, unsigned cols, unsigned itabs, unsigned keycol); ~Tab(); }; @@ -718,10 +792,10 @@ Tab::Tab(const char* name, unsigned cols, unsigned itabs, unsigned keycol) : m_itab(new const ITab* [itabs + 1]), m_keycol(keycol) { - for (unsigned i = 0; i <= cols; i++) - m_col[i] = 0; - for (unsigned i = 0; i <= itabs; i++) - m_itab[i] = 0; + for (unsigned k = 0; k <= cols; k++) + m_col[k] = 0; + for (unsigned j = 0; j <= itabs; j++) + m_itab[j] = 0; } Tab::~Tab() @@ -735,19 +809,33 @@ Tab::~Tab() delete [] m_itab; } +void +Tab::coladd(unsigned k, Col* colptr) +{ + assert(k == colptr->m_num && k < m_cols && m_col[k] == 0); + m_col[k] = colptr; +} + +void +Tab::itabadd(unsigned j, ITab* itabptr) +{ + assert(j < m_itabs && m_itab[j] == 0); + m_itab[j] = itabptr; +} + static NdbOut& operator<<(NdbOut& out, const Tab& tab) { out << "tab " << tab.m_name << " cols=" << tab.m_cols; for (unsigned k = 0; k < tab.m_cols; k++) { - out << endl; - out << *tab.m_col[k]; + const Col& col = *tab.m_col[k]; + out << endl << col; } for (unsigned i = 0; i < tab.m_itabs; i++) { if (tab.m_itab[i] == 0) continue; - out << endl; - out << *tab.m_itab[i]; + const ITab& itab = *tab.m_itab[i]; + out << endl << itab; } return out; } @@ -774,7 +862,7 @@ verifytables() { assert(t->m_keycol < t->m_cols); const Col* c = t->m_col[t->m_keycol]; - assert(c->m_pk && c->m_type == NdbDictionary::Column::Unsigned); + assert(c->m_pk && c->m_type == Col::Unsigned); } assert(t->m_itabs != 0 && t->m_itab != 0); for (unsigned i = 0; i < t->m_itabs; i++) { @@ -785,6 +873,9 @@ verifytables() for (unsigned k = 0; k < x->m_icols; k++) { const ICol* c = x->m_icol[k]; assert(c != 0 && c->m_num == k && c->m_col.m_num < t->m_cols); + if (x->m_type == ITab::UniqueHashIndex) { + assert(! c->m_col.m_nullable); + } } } assert(t->m_itab[t->m_itabs] == 0); @@ -810,127 +901,186 @@ makebuiltintables(Par par) } // ti0 - basic if (usetable(par, 0)) { - const Tab* t = new Tab("ti0", 5, 5, 0); + Tab* t = new Tab("ti0", 5, 7, 0); // name - pk - type - length - nullable - cs - t->m_col[0] = new Col(*t, 0, "a", 1, NdbDictionary::Column::Unsigned, 1, 0, 0); - t->m_col[1] = new Col(*t, 1, "b", 0, NdbDictionary::Column::Unsigned, 1, 1, 0); - t->m_col[2] = new Col(*t, 2, "c", 0, NdbDictionary::Column::Unsigned, 1, 1, 0); - t->m_col[3] = new Col(*t, 3, "d", 0, NdbDictionary::Column::Unsigned, 1, 1, 0); - t->m_col[4] = new Col(*t, 4, "e", 0, NdbDictionary::Column::Unsigned, 1, 1, 0); + t->coladd(0, new Col(*t, 0, "a", 1, Col::Unsigned, 1, 0, 0)); + t->coladd(1, new Col(*t, 1, "b", 0, Col::Unsigned, 1, 1, 0)); + t->coladd(2, new Col(*t, 2, "c", 0, Col::Unsigned, 1, 0, 0)); + t->coladd(3, new Col(*t, 3, "d", 0, Col::Unsigned, 1, 1, 0)); + t->coladd(4, new Col(*t, 4, "e", 0, Col::Unsigned, 1, 0, 0)); if (useindex(par, 0)) { // a - const ITab* x = t->m_itab[0] = new ITab(*t, "ti0x0", 1); - x->m_icol[0] = new ICol(*x, 0, *t->m_col[0]); + ITab* x = new ITab(*t, "ti0x0", ITab::OrderedIndex, 1); + x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); + t->itabadd(0, x); } if (useindex(par, 1)) { // b - const ITab* x = t->m_itab[1] = new ITab(*t, "ti0x1", 1); - x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]); + ITab* x = new ITab(*t, "ti0x1", ITab::OrderedIndex, 1); + x->icoladd(0, new ICol(*x, 0, *t->m_col[1])); + t->itabadd(1, x); } if (useindex(par, 2)) { // b, c - const ITab* x = t->m_itab[2] = new ITab(*t, "ti0x2", 2); - x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]); - x->m_icol[1] = new ICol(*x, 1, *t->m_col[2]); + ITab* x = new ITab(*t, "ti0x2", ITab::OrderedIndex, 2); + x->icoladd(0, new ICol(*x, 0, *t->m_col[1])); + x->icoladd(1, new ICol(*x, 1, *t->m_col[2])); + t->itabadd(2, x); } if (useindex(par, 3)) { // d, c, b - const ITab* x = t->m_itab[3] = new ITab(*t, "ti0x3", 3); - x->m_icol[0] = new ICol(*x, 0, *t->m_col[3]); - x->m_icol[1] = new ICol(*x, 1, *t->m_col[2]); - x->m_icol[2] = new ICol(*x, 2, *t->m_col[1]); + ITab* x = new ITab(*t, "ti0x3", ITab::OrderedIndex, 3); + x->icoladd(0, new ICol(*x, 0, *t->m_col[3])); + x->icoladd(1, new ICol(*x, 1, *t->m_col[2])); + x->icoladd(2, new ICol(*x, 2, *t->m_col[1])); + t->itabadd(3, x); } if (useindex(par, 4)) { // b, e, c, d - const ITab* x = t->m_itab[4] = new ITab(*t, "ti0x4", 4); - x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]); - x->m_icol[1] = new ICol(*x, 1, *t->m_col[4]); - x->m_icol[2] = new ICol(*x, 2, *t->m_col[2]); - x->m_icol[3] = new ICol(*x, 3, *t->m_col[3]); + ITab* x = new ITab(*t, "ti0x4", ITab::OrderedIndex, 4); + x->icoladd(0, new ICol(*x, 0, *t->m_col[1])); + x->icoladd(1, new ICol(*x, 1, *t->m_col[4])); + x->icoladd(2, new ICol(*x, 2, *t->m_col[2])); + x->icoladd(3, new ICol(*x, 3, *t->m_col[3])); + t->itabadd(4, x); + } + if (useindex(par, 5)) { + // a, c + ITab* x = new ITab(*t, "ti0z5", ITab::UniqueHashIndex, 2); + x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); + x->icoladd(1, new ICol(*x, 1, *t->m_col[2])); + t->itabadd(5, x); + } + if (useindex(par, 6)) { + // a, e + ITab* x = new ITab(*t, "ti0z6", ITab::UniqueHashIndex, 2); + x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); + x->icoladd(1, new ICol(*x, 1, *t->m_col[4])); + t->itabadd(6, x); } tablist[0] = t; } // ti1 - simple char fields if (usetable(par, 1)) { - const Tab* t = new Tab("ti1", 5, 5, 1); + Tab* t = new Tab("ti1", 5, 7, 1); // name - pk - type - length - nullable - cs - t->m_col[0] = new Col(*t, 0, "a", 0, NdbDictionary::Column::Unsigned, 1, 1, 0); - t->m_col[1] = new Col(*t, 1, "b", 1, NdbDictionary::Column::Unsigned, 1, 0, 0); - t->m_col[2] = new Col(*t, 2, "c", 0, NdbDictionary::Column::Char, 20, 1, getcs(par)); - t->m_col[3] = new Col(*t, 3, "d", 0, NdbDictionary::Column::Char, 5, 1, getcs(par)); - t->m_col[4] = new Col(*t, 4, "e", 0, NdbDictionary::Column::Char, 5, 1, getcs(par)); + t->coladd(0, new Col(*t, 0, "a", 0, Col::Unsigned, 1, 0, 0)); + t->coladd(1, new Col(*t, 1, "b", 1, Col::Unsigned, 1, 0, 0)); + t->coladd(2, new Col(*t, 2, "c", 0, Col::Char, 20, 1, getcs(par))); + t->coladd(3, new Col(*t, 3, "d", 0, Col::Char, 5, 0, getcs(par))); + t->coladd(4, new Col(*t, 4, "e", 0, Col::Char, 5, 1, getcs(par))); if (useindex(par, 0)) { // b - const ITab* x = t->m_itab[0] = new ITab(*t, "ti1x0", 1); - x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]); + ITab* x = new ITab(*t, "ti1x0", ITab::OrderedIndex, 1); + x->icoladd(0, new ICol(*x, 0, *t->m_col[1])); } if (useindex(par, 1)) { // a, c - const ITab* x = t->m_itab[1] = new ITab(*t, "ti1x1", 2); - x->m_icol[0] = new ICol(*x, 0, *t->m_col[0]); - x->m_icol[1] = new ICol(*x, 1, *t->m_col[2]); + ITab* x = new ITab(*t, "ti1x1", ITab::OrderedIndex, 2); + x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); + x->icoladd(1, new ICol(*x, 1, *t->m_col[2])); + t->itabadd(1, x); } if (useindex(par, 2)) { // c, a - const ITab* x = t->m_itab[2] = new ITab(*t, "ti1x2", 2); - x->m_icol[0] = new ICol(*x, 0, *t->m_col[2]); - x->m_icol[1] = new ICol(*x, 1, *t->m_col[0]); + ITab* x = new ITab(*t, "ti1x2", ITab::OrderedIndex, 2); + x->icoladd(0, new ICol(*x, 0, *t->m_col[2])); + x->icoladd(1, new ICol(*x, 1, *t->m_col[0])); + t->itabadd(2, x); } if (useindex(par, 3)) { // e - const ITab* x = t->m_itab[3] = new ITab(*t, "ti1x3", 1); - x->m_icol[0] = new ICol(*x, 0, *t->m_col[4]); + ITab* x = new ITab(*t, "ti1x3", ITab::OrderedIndex, 1); + x->icoladd(0, new ICol(*x, 0, *t->m_col[4])); + t->itabadd(3, x); } if (useindex(par, 4)) { // e, d, c, b - const ITab* x = t->m_itab[4] = new ITab(*t, "ti1x4", 4); - x->m_icol[0] = new ICol(*x, 0, *t->m_col[4]); - x->m_icol[1] = new ICol(*x, 1, *t->m_col[3]); - x->m_icol[2] = new ICol(*x, 2, *t->m_col[2]); - x->m_icol[3] = new ICol(*x, 3, *t->m_col[1]); + ITab* x = new ITab(*t, "ti1x4", ITab::OrderedIndex, 4); + x->icoladd(0, new ICol(*x, 0, *t->m_col[4])); + x->icoladd(1, new ICol(*x, 1, *t->m_col[3])); + x->icoladd(2, new ICol(*x, 2, *t->m_col[2])); + x->icoladd(3, new ICol(*x, 3, *t->m_col[1])); + t->itabadd(4, x); + } + if (useindex(par, 5)) { + // a, b + ITab* x = new ITab(*t, "ti1z5", ITab::UniqueHashIndex, 2); + x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); + x->icoladd(1, new ICol(*x, 1, *t->m_col[1])); + t->itabadd(5, x); + } + if (useindex(par, 6)) { + // a, b, d + ITab* x = new ITab(*t, "ti1z6", ITab::UniqueHashIndex, 3); + x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); + x->icoladd(1, new ICol(*x, 1, *t->m_col[1])); + x->icoladd(2, new ICol(*x, 2, *t->m_col[3])); + t->itabadd(6, x); } tablist[1] = t; } // ti2 - complex char fields if (usetable(par, 2)) { - const Tab* t = new Tab("ti2", 5, 5, 2); + Tab* t = new Tab("ti2", 5, 7, 2); // name - pk - type - length - nullable - cs - t->m_col[0] = new Col(*t, 0, "a", 1, NdbDictionary::Column::Char, 101, 0, getcs(par)); - t->m_col[1] = new Col(*t, 1, "b", 0, NdbDictionary::Column::Char, 4, 1, getcs(par)); - t->m_col[2] = new Col(*t, 2, "c", 1, NdbDictionary::Column::Unsigned, 1, 0, 0); - t->m_col[3] = new Col(*t, 3, "d", 1, NdbDictionary::Column::Char, 3, 0, getcs(par)); - t->m_col[4] = new Col(*t, 4, "e", 0, NdbDictionary::Column::Char, 101, 0, getcs(par)); + t->coladd(0, new Col(*t, 0, "a", 1, Col::Char, 31, 0, getcs(par))); + t->coladd(1, new Col(*t, 1, "b", 0, Col::Char, 4, 1, getcs(par))); + t->coladd(2, new Col(*t, 2, "c", 1, Col::Unsigned, 1, 0, 0)); + t->coladd(3, new Col(*t, 3, "d", 1, Col::Char, 3, 0, getcs(par))); + t->coladd(4, new Col(*t, 4, "e", 0, Col::Char, 17, 0, getcs(par))); if (useindex(par, 0)) { // a, c, d - const ITab* x = t->m_itab[0] = new ITab(*t, "ti2x0", 3); - x->m_icol[0] = new ICol(*x, 0, *t->m_col[0]); - x->m_icol[1] = new ICol(*x, 1, *t->m_col[2]); - x->m_icol[2] = new ICol(*x, 2, *t->m_col[3]); + ITab* x = new ITab(*t, "ti2x0", ITab::OrderedIndex, 3); + x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); + x->icoladd(1, new ICol(*x, 1, *t->m_col[2])); + x->icoladd(2, new ICol(*x, 2, *t->m_col[3])); + t->itabadd(0, x); } if (useindex(par, 1)) { // e, d, c, b, a - const ITab* x = t->m_itab[1] = new ITab(*t, "ti2x1", 5); - x->m_icol[0] = new ICol(*x, 0, *t->m_col[4]); - x->m_icol[1] = new ICol(*x, 1, *t->m_col[3]); - x->m_icol[2] = new ICol(*x, 2, *t->m_col[2]); - x->m_icol[3] = new ICol(*x, 3, *t->m_col[1]); - x->m_icol[4] = new ICol(*x, 4, *t->m_col[0]); + ITab* x = new ITab(*t, "ti2x1", ITab::OrderedIndex, 5); + x->icoladd(0, new ICol(*x, 0, *t->m_col[4])); + x->icoladd(1, new ICol(*x, 1, *t->m_col[3])); + x->icoladd(2, new ICol(*x, 2, *t->m_col[2])); + x->icoladd(3, new ICol(*x, 3, *t->m_col[1])); + x->icoladd(4, new ICol(*x, 4, *t->m_col[0])); + t->itabadd(1, x); } if (useindex(par, 2)) { // d - const ITab* x = t->m_itab[2] = new ITab(*t, "ti2x2", 1); - x->m_icol[0] = new ICol(*x, 0, *t->m_col[3]); + ITab* x = new ITab(*t, "ti2x2", ITab::OrderedIndex, 1); + x->icoladd(0, new ICol(*x, 0, *t->m_col[3])); + t->itabadd(2, x); } if (useindex(par, 3)) { // b - const ITab* x = t->m_itab[3] = new ITab(*t, "ti2x3", 1); - x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]); + ITab* x = new ITab(*t, "ti2x3", ITab::OrderedIndex, 1); + x->icoladd(0, new ICol(*x, 0, *t->m_col[1])); + t->itabadd(3, x); } if (useindex(par, 4)) { // a, e - const ITab* x = t->m_itab[4] = new ITab(*t, "ti2x4", 2); - x->m_icol[0] = new ICol(*x, 0, *t->m_col[0]); - x->m_icol[1] = new ICol(*x, 1, *t->m_col[4]); + ITab* x = new ITab(*t, "ti2x4", ITab::OrderedIndex, 2); + x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); + x->icoladd(1, new ICol(*x, 1, *t->m_col[4])); + t->itabadd(4, x); + } + if (useindex(par, 5)) { + // a, c + ITab* x = new ITab(*t, "ti2z5", ITab::UniqueHashIndex, 2); + x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); + x->icoladd(1, new ICol(*x, 1, *t->m_col[2])); + t->itabadd(5, x); + } + if (useindex(par, 6)) { + // a, c, d, e + ITab* x = new ITab(*t, "ti2z6", ITab::UniqueHashIndex, 4); + x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); + x->icoladd(1, new ICol(*x, 1, *t->m_col[2])); + x->icoladd(2, new ICol(*x, 2, *t->m_col[3])); + x->icoladd(3, new ICol(*x, 3, *t->m_col[4])); + t->itabadd(6, x); } tablist[2] = t; } @@ -944,6 +1094,7 @@ struct Con { NdbDictionary::Dictionary* m_dic; NdbConnection* m_tx; NdbOperation* m_op; + NdbIndexOperation* m_indexop; NdbScanOperation* m_scanop; NdbIndexScanOperation* m_indexscanop; enum ScanMode { ScanNo = 0, Committed, Latest, Exclusive }; @@ -951,7 +1102,7 @@ struct Con { enum ErrType { ErrNone = 0, ErrDeadlock, ErrOther }; ErrType m_errtype; Con() : - m_ndb(0), m_dic(0), m_tx(0), m_op(0), + m_ndb(0), m_dic(0), m_tx(0), m_op(0), m_indexop(0), m_scanop(0), m_indexscanop(0), m_scanmode(ScanNo), m_errtype(ErrNone) {} ~Con() { if (m_tx != 0) @@ -962,6 +1113,7 @@ struct Con { void disconnect(); int startTransaction(); int getNdbOperation(const Tab& tab); + int getNdbIndexOperation(const ITab& itab, const Tab& tab); int getNdbScanOperation(const Tab& tab); int getNdbScanOperation(const ITab& itab, const Tab& tab); int equal(int num, const char* addr); @@ -972,6 +1124,8 @@ struct Con { int execute(ExecType t, bool& deadlock); int openScanRead(unsigned scanbat, unsigned scanpar); int openScanExclusive(unsigned scanbat, unsigned scanpar); + int openScanOrdered(unsigned scanbat, unsigned scanpar, bool descending); + int openScanOrderedExclusive(unsigned scanbat, unsigned scanpar, bool descending); int executeScan(); int nextScanResult(bool fetchAllowed); int nextScanResult(bool fetchAllowed, bool& deadlock); @@ -1026,6 +1180,14 @@ Con::getNdbOperation(const Tab& tab) } int +Con::getNdbIndexOperation(const ITab& itab, const Tab& tab) +{ + assert(m_tx != 0); + CHKCON((m_op = m_indexop = m_tx->getNdbIndexOperation(itab.m_name, tab.m_name)) != 0, *this); + return 0; +} + +int Con::getNdbScanOperation(const Tab& tab) { assert(m_tx != 0); @@ -1116,6 +1278,25 @@ Con::openScanExclusive(unsigned scanbat, unsigned scanpar) } int +Con::openScanOrdered(unsigned scanbat, unsigned scanpar, bool descending) +{ + assert(m_tx != 0 && m_indexscanop != 0); + NdbOperation::LockMode lm = NdbOperation::LM_Read; + CHKCON(m_indexscanop->readTuples(lm, scanbat, scanpar, true, descending) == 0, *this); + return 0; +} + +int +Con::openScanOrderedExclusive(unsigned scanbat, unsigned scanpar, bool descending) +{ + assert(m_tx != 0 && m_indexscanop != 0); + NdbOperation::LockMode lm = NdbOperation::LM_Exclusive; + CHKCON(m_indexscanop->readTuples(lm, scanbat, scanpar, true, descending) == 0, *this); + return 0; +} + + +int Con::executeScan() { CHKCON(m_tx->execute(NoCommit) == 0, *this); @@ -1202,6 +1383,7 @@ Con::printerror(NdbOut& out) if ((code = m_tx->getNdbError().code) != 0) { LL0(++any << " con: error " << m_tx->getNdbError()); die += (code == g_opt.m_die); + // 631 is new, occurs only on 4 db nodes, needs to be checked out if (code == 266 || code == 274 || code == 296 || code == 297 || code == 499 || code == 631) m_errtype = ErrDeadlock; } @@ -1290,7 +1472,7 @@ createtable(Par par) for (unsigned k = 0; k < tab.m_cols; k++) { const Col& col = *tab.m_col[k]; NdbDictionary::Column c(col.m_name); - c.setType(col.m_type); + c.setType((NdbDictionary::Column::Type)col.m_type); c.setLength(col.m_bytelength); // NDB API uses length in bytes c.setPrimaryKey(col.m_pk); c.setNullable(col.m_nullable); @@ -1343,8 +1525,10 @@ createindex(Par par, const ITab& itab) LL4(itab); NdbDictionary::Index x(itab.m_name); x.setTable(tab.m_name); - x.setType(NdbDictionary::Index::OrderedIndex); - x.setLogging(false); + x.setType((NdbDictionary::Index::Type)itab.m_type); + if (par.m_nologging || itab.m_type == ITab::OrderedIndex) { + x.setLogging(false); + } for (unsigned k = 0; k < itab.m_icols; k++) { const ICol& icol = *itab.m_icol[k]; const Col& col = icol.m_col; @@ -1385,6 +1569,8 @@ struct Val { void copy(const void* addr); const void* dataaddr() const; bool m_null; + int equal(Par par) const; + int equal(Par par, const ICol& icol) const; int setval(Par par) const; void calc(Par par, unsigned i); void calckey(Par par, unsigned i); @@ -1402,9 +1588,9 @@ Val::Val(const Col& col) : m_col(col) { switch (col.m_type) { - case NdbDictionary::Column::Unsigned: + case Col::Unsigned: break; - case NdbDictionary::Column::Char: + case Col::Char: m_char = new unsigned char [col.m_bytelength]; break; default: @@ -1417,9 +1603,9 @@ Val::~Val() { const Col& col = m_col; switch (col.m_type) { - case NdbDictionary::Column::Unsigned: + case Col::Unsigned: break; - case NdbDictionary::Column::Char: + case Col::Char: delete [] m_char; break; default: @@ -1446,10 +1632,10 @@ Val::copy(const void* addr) { const Col& col = m_col; switch (col.m_type) { - case NdbDictionary::Column::Unsigned: + case Col::Unsigned: m_uint32 = *(const Uint32*)addr; break; - case NdbDictionary::Column::Char: + case Col::Char: memcpy(m_char, addr, col.m_bytelength); break; default: @@ -1464,9 +1650,9 @@ Val::dataaddr() const { const Col& col = m_col; switch (col.m_type) { - case NdbDictionary::Column::Unsigned: + case Col::Unsigned: return &m_uint32; - case NdbDictionary::Column::Char: + case Col::Char: return m_char; default: break; @@ -1476,18 +1662,37 @@ Val::dataaddr() const } int -Val::setval(Par par) const +Val::equal(Par par) const { Con& con = par.con(); const Col& col = m_col; + assert(col.m_pk && ! m_null); const char* addr = (const char*)dataaddr(); - if (m_null) - addr = 0; - LL5("setval [" << m_col << "] " << *this); - if (col.m_pk) - CHK(con.equal(col.m_num, addr) == 0); - else - CHK(con.setValue(col.m_num, addr) == 0); + LL5("equal [" << col << "] " << *this); + CHK(con.equal(col.m_num, addr) == 0); + return 0; +} + +int +Val::equal(Par par, const ICol& icol) const +{ + Con& con = par.con(); + assert(! m_null); + const char* addr = (const char*)dataaddr(); + LL5("equal [" << icol << "] " << *this); + CHK(con.equal(icol.m_num, addr) == 0); + return 0; +} + +int +Val::setval(Par par) const +{ + Con& con = par.con(); + const Col& col = m_col; + assert(! col.m_pk); + const char* addr = ! m_null ? (const char*)dataaddr() : 0; + LL5("setval [" << col << "] " << *this); + CHK(con.setValue(col.m_num, addr) == 0); return 0; } @@ -1506,10 +1711,10 @@ Val::calckey(Par par, unsigned i) const Col& col = m_col; m_null = false; switch (col.m_type) { - case NdbDictionary::Column::Unsigned: + case Col::Unsigned: m_uint32 = i; break; - case NdbDictionary::Column::Char: + case Col::Char: { const Chs* chs = col.m_chs; CHARSET_INFO* cs = chs->m_cs; @@ -1549,10 +1754,10 @@ Val::calcnokey(Par par) } unsigned v = par.m_range + r; switch (col.m_type) { - case NdbDictionary::Column::Unsigned: + case Col::Unsigned: m_uint32 = v; break; - case NdbDictionary::Column::Char: + case Col::Char: { const Chs* chs = col.m_chs; CHARSET_INFO* cs = chs->m_cs; @@ -1609,7 +1814,7 @@ Val::cmp(const Val& val2) const col.verify(val2.dataaddr()); // compare switch (col.m_type) { - case NdbDictionary::Column::Unsigned: + case Col::Unsigned: { if (m_uint32 < val2.m_uint32) return -1; @@ -1618,7 +1823,7 @@ Val::cmp(const Val& val2) const return 0; } break; - case NdbDictionary::Column::Char: + case Col::Char: { const Chs* chs = col.m_chs; CHARSET_INFO* cs = chs->m_cs; @@ -1657,10 +1862,10 @@ operator<<(NdbOut& out, const Val& val) return out; } switch (col.m_type) { - case NdbDictionary::Column::Unsigned: + case Col::Unsigned: out << val.m_uint32; break; - case NdbDictionary::Column::Char: + case Col::Char: { char buf[4 * 8000]; char *p = buf; @@ -1697,19 +1902,25 @@ struct Row { const Tab& m_tab; Val** m_val; bool m_exist; - enum Op { NoOp = 0, ReadOp, InsOp, UpdOp, DelOp }; + enum Op { NoOp = 0, ReadOp = 1, InsOp = 2, UpdOp = 4, DelOp = 8, AnyOp = 15 }; Op m_pending; + Row* m_dbrow; // copy of db row before update Row(const Tab& tab); ~Row(); void copy(const Row& row2); - void calc(Par par, unsigned i); + void calc(Par par, unsigned i, unsigned mask = 0); + const Row& dbrow() const; int verify(const Row& row2) const; int insrow(Par par); int updrow(Par par); + int updrow(Par par, const ITab& itab); int delrow(Par par); + int delrow(Par par, const ITab& itab); int selrow(Par par); + int selrow(Par par, const ITab& itab); int setrow(Par par); int cmp(const Row& row2) const; + int cmp(const Row& row2, const ITab& itab) const; private: Row& operator=(const Row& row2); }; @@ -1724,6 +1935,7 @@ Row::Row(const Tab& tab) : } m_exist = false; m_pending = NoOp; + m_dbrow = 0; } Row::~Row() @@ -1733,6 +1945,7 @@ Row::~Row() delete m_val[k]; } delete [] m_val; + delete m_dbrow; } void @@ -1745,27 +1958,49 @@ Row::copy(const Row& row2) const Val& val2 = *row2.m_val[k]; val.copy(val2); } + m_exist = row2.m_exist; + m_pending = row2.m_pending; + if (row2.m_dbrow == 0) { + m_dbrow = 0; + } else { + assert(row2.m_dbrow->m_dbrow == 0); + if (m_dbrow == 0) + m_dbrow = new Row(tab); + m_dbrow->copy(*row2.m_dbrow); + } } void -Row::calc(Par par, unsigned i) +Row::calc(Par par, unsigned i, unsigned mask) { const Tab& tab = m_tab; for (unsigned k = 0; k < tab.m_cols; k++) { - Val& val = *m_val[k]; - val.calc(par, i); + if (! (mask & (1 << k))) { + Val& val = *m_val[k]; + val.calc(par, i); + } } } +const Row& +Row::dbrow() const +{ + if (m_dbrow == 0) + return *this; + assert(m_pending == Row::UpdOp || m_pending == Row::DelOp); + return *m_dbrow; +} + int Row::verify(const Row& row2) const { const Tab& tab = m_tab; - assert(&tab == &row2.m_tab && m_exist && row2.m_exist); + const Row& row1 = *this; + assert(&row1.m_tab == &row2.m_tab && row1.m_exist && row2.m_exist); for (unsigned k = 0; k < tab.m_cols; k++) { - const Val& val = *m_val[k]; + const Val& val1 = *row1.m_val[k]; const Val& val2 = *row2.m_val[k]; - CHK(val.verify(val2) == 0); + CHK(val1.verify(val2) == 0); } return 0; } @@ -1780,7 +2015,15 @@ Row::insrow(Par par) CHKCON(con.m_op->insertTuple() == 0, con); for (unsigned k = 0; k < tab.m_cols; k++) { const Val& val = *m_val[k]; - CHK(val.setval(par) == 0); + const Col& col = val.m_col; + if (col.m_pk) + CHK(val.equal(par) == 0); + } + for (unsigned k = 0; k < tab.m_cols; k++) { + const Val& val = *m_val[k]; + const Col& col = val.m_col; + if (! col.m_pk) + CHK(val.setval(par) == 0); } m_pending = InsOp; return 0; @@ -1797,16 +2040,40 @@ Row::updrow(Par par) for (unsigned k = 0; k < tab.m_cols; k++) { const Val& val = *m_val[k]; const Col& col = val.m_col; + if (col.m_pk) + CHK(val.equal(par) == 0); + } + for (unsigned k = 0; k < tab.m_cols; k++) { + const Val& val = *m_val[k]; + const Col& col = val.m_col; if (! col.m_pk) - continue; - CHK(val.setval(par) == 0); + CHK(val.setval(par) == 0); + } + m_pending = UpdOp; + return 0; +} + +int +Row::updrow(Par par, const ITab& itab) +{ + Con& con = par.con(); + const Tab& tab = m_tab; + assert(itab.m_type == ITab::UniqueHashIndex && &itab.m_tab == &tab); + assert(m_exist); + CHK(con.getNdbIndexOperation(itab, tab) == 0); + CHKCON(con.m_op->updateTuple() == 0, con); + for (unsigned k = 0; k < itab.m_icols; k++) { + const ICol& icol = *itab.m_icol[k]; + const Col& col = icol.m_col; + unsigned m = col.m_num; + const Val& val = *m_val[m]; + CHK(val.equal(par, icol) == 0); } for (unsigned k = 0; k < tab.m_cols; k++) { const Val& val = *m_val[k]; const Col& col = val.m_col; - if (col.m_pk) - continue; - CHK(val.setval(par) == 0); + if (! col.m_pk) + CHK(val.setval(par) == 0); } m_pending = UpdOp; return 0; @@ -1824,7 +2091,27 @@ Row::delrow(Par par) const Val& val = *m_val[k]; const Col& col = val.m_col; if (col.m_pk) - CHK(val.setval(par) == 0); + CHK(val.equal(par) == 0); + } + m_pending = DelOp; + return 0; +} + +int +Row::delrow(Par par, const ITab& itab) +{ + Con& con = par.con(); + const Tab& tab = m_tab; + assert(itab.m_type == ITab::UniqueHashIndex && &itab.m_tab == &tab); + assert(m_exist); + CHK(con.getNdbIndexOperation(itab, tab) == 0); + CHKCON(con.m_op->deleteTuple() == 0, con); + for (unsigned k = 0; k < itab.m_icols; k++) { + const ICol& icol = *itab.m_icol[k]; + const Col& col = icol.m_col; + unsigned m = col.m_num; + const Val& val = *m_val[m]; + CHK(val.equal(par, icol) == 0); } m_pending = DelOp; return 0; @@ -1841,7 +2128,25 @@ Row::selrow(Par par) const Val& val = *m_val[k]; const Col& col = val.m_col; if (col.m_pk) - CHK(val.setval(par) == 0); + CHK(val.equal(par) == 0); + } + return 0; +} + +int +Row::selrow(Par par, const ITab& itab) +{ + Con& con = par.con(); + const Tab& tab = m_tab; + assert(itab.m_type == ITab::UniqueHashIndex && &itab.m_tab == &tab); + CHK(con.getNdbIndexOperation(itab, tab) == 0); + CHKCON(con.m_op->readTuple() == 0, con); + for (unsigned k = 0; k < itab.m_icols; k++) { + const ICol& icol = *itab.m_icol[k]; + const Col& col = icol.m_col; + unsigned m = col.m_num; + const Val& val = *m_val[m]; + CHK(val.equal(par, icol) == 0); } return 0; } @@ -1876,6 +2181,40 @@ Row::cmp(const Row& row2) const return c; } +int +Row::cmp(const Row& row2, const ITab& itab) const +{ + const Tab& tab = m_tab; + int c = 0; + for (unsigned i = 0; i < itab.m_icols; i++) { + const ICol& icol = *itab.m_icol[i]; + const Col& col = icol.m_col; + unsigned k = col.m_num; + assert(k < tab.m_cols); + const Val& val = *m_val[k]; + const Val& val2 = *row2.m_val[k]; + if ((c = val.cmp(val2)) != 0) + break; + } + return c; +} + +static NdbOut& +operator<<(NdbOut& out, const Row::Op op) +{ + if (op == Row::NoOp) + out << "NoOp"; + else if (op == Row::InsOp) + out << "InsOp"; + else if (op == Row::UpdOp) + out << "UpdOp"; + else if (op == Row::DelOp) + out << "DelOp"; + else + out << op; + return out; +} + static NdbOut& operator<<(NdbOut& out, const Row& row) { @@ -1885,10 +2224,21 @@ operator<<(NdbOut& out, const Row& row) out << " "; out << *row.m_val[i]; } - out << " [exist=" << row.m_exist; + out << " exist=" << row.m_exist; if (row.m_pending) out << " pending=" << row.m_pending; - out << "]"; + if (row.m_dbrow != 0) + out << " [dbrow=" << *row.m_dbrow << "]"; + return out; +} + +static NdbOut& +operator<<(NdbOut& out, const Row* rowptr) +{ + if (rowptr == 0) + out << "null"; + else + out << *rowptr; return out; } @@ -1898,38 +2248,47 @@ struct Set { const Tab& m_tab; unsigned m_rows; Row** m_row; - Row** m_saverow; + unsigned* m_rowkey; // maps row number (from 0) in scan to tuple key Row* m_keyrow; NdbRecAttr** m_rec; Set(const Tab& tab, unsigned rows); ~Set(); void reset(); unsigned count() const; - // row methods + // old and new values bool exist(unsigned i) const; - Row::Op pending(unsigned i) const; + void dbsave(unsigned i); + void calc(Par par, unsigned i, unsigned mask = 0); + bool pending(unsigned i, unsigned mask) const; void notpending(unsigned i); void notpending(const Lst& lst); - void calc(Par par, unsigned i); + void dbdiscard(unsigned i); + void dbdiscard(const Lst& lst); + const Row& dbrow(unsigned i) const; + // operations int insrow(Par par, unsigned i); int updrow(Par par, unsigned i); + int updrow(Par par, const ITab& itab, unsigned i); int delrow(Par par, unsigned i); - int selrow(Par par, unsigned i); + int delrow(Par par, const ITab& itab, unsigned i); + int selrow(Par par, const Row& keyrow); + int selrow(Par par, const ITab& itab, const Row& keyrow); + // set and get + void setkey(Par par, const Row& keyrow); + void setkey(Par par, const ITab& itab, const Row& keyrow); int setrow(Par par, unsigned i); int getval(Par par); int getkey(Par par, unsigned* i); - int putval(unsigned i, bool force); - // set methods + int putval(unsigned i, bool force, unsigned n = ~0); + // verify int verify(const Set& set2) const; - void savepoint(); - void commit(); - void rollback(); + int verifyorder(const ITab& itab, bool descending) const; // protect structure NdbMutex* m_mutex; - void lock() { + void lock() const { NdbMutex_Lock(m_mutex); } - void unlock() { + void unlock() const { NdbMutex_Unlock(m_mutex); } private: @@ -1945,7 +2304,11 @@ Set::Set(const Tab& tab, unsigned rows) : // allocate on need to save space m_row[i] = 0; } - m_saverow = 0; + m_rowkey = new unsigned [m_rows]; + for (unsigned n = 0; n < m_rows; n++) { + // initialize to null + m_rowkey[n] = ~0; + } m_keyrow = new Row(tab); m_rec = new NdbRecAttr* [tab.m_cols]; for (unsigned k = 0; k < tab.m_cols; k++) { @@ -1959,11 +2322,9 @@ Set::~Set() { for (unsigned i = 0; i < m_rows; i++) { delete m_row[i]; - if (m_saverow != 0) - delete m_saverow[i]; } delete [] m_row; - delete [] m_saverow; + delete [] m_rowkey; delete m_keyrow; delete [] m_rec; NdbMutex_Destroy(m_mutex); @@ -1994,6 +2355,8 @@ Set::count() const return count; } +// old and new values + bool Set::exist(unsigned i) const { @@ -2003,13 +2366,37 @@ Set::exist(unsigned i) const return m_row[i]->m_exist; } -Row::Op -Set::pending(unsigned i) const +void +Set::dbsave(unsigned i) +{ + const Tab& tab = m_tab; + assert(i < m_rows && m_row[i] != 0); + Row& row = *m_row[i]; + LL5("dbsave " << i << ": " << row); + assert(row.m_exist && ! row.m_pending && row.m_dbrow == 0); + // could swap pointers but making copy is safer + Row* rowptr = new Row(tab); + rowptr->copy(row); + row.m_dbrow = rowptr; +} + +void +Set::calc(Par par, unsigned i, unsigned mask) +{ + const Tab& tab = m_tab; + if (m_row[i] == 0) + m_row[i] = new Row(tab); + Row& row = *m_row[i]; + row.calc(par, i, mask); +} + +bool +Set::pending(unsigned i, unsigned mask) const { assert(i < m_rows); if (m_row[i] == 0) // not allocated => not pending return Row::NoOp; - return m_row[i]->m_pending; + return m_row[i]->m_pending & mask; } void @@ -2017,10 +2404,13 @@ Set::notpending(unsigned i) { assert(m_row[i] != 0); Row& row = *m_row[i]; - if (row.m_pending == Row::InsOp) + if (row.m_pending == Row::InsOp) { row.m_exist = true; - if (row.m_pending == Row::DelOp) + } else if (row.m_pending == Row::UpdOp) { + ; + } else if (row.m_pending == Row::DelOp) { row.m_exist = false; + } row.m_pending = Row::NoOp; } @@ -2034,15 +2424,35 @@ Set::notpending(const Lst& lst) } void -Set::calc(Par par, unsigned i) +Set::dbdiscard(unsigned i) { - const Tab& tab = m_tab; - if (m_row[i] == 0) - m_row[i] = new Row(tab); + assert(m_row[i] != 0); + Row& row = *m_row[i]; + LL5("dbdiscard " << i << ": " << row); + assert(row.m_dbrow != 0); + delete row.m_dbrow; + row.m_dbrow = 0; +} + +const Row& +Set::dbrow(unsigned i) const +{ + assert(m_row[i] != 0); Row& row = *m_row[i]; - row.calc(par, i); + return row.dbrow(); +} + +void +Set::dbdiscard(const Lst& lst) +{ + for (unsigned j = 0; j < lst.m_cnt; j++) { + unsigned i = lst.m_arr[j]; + dbdiscard(i); + } } +// operations + int Set::insrow(Par par, unsigned i) { @@ -2062,6 +2472,15 @@ Set::updrow(Par par, unsigned i) } int +Set::updrow(Par par, const ITab& itab, unsigned i) +{ + assert(m_row[i] != 0); + Row& row = *m_row[i]; + CHK(row.updrow(par, itab) == 0); + return 0; +} + +int Set::delrow(Par par, unsigned i) { assert(m_row[i] != 0); @@ -2071,16 +2490,68 @@ Set::delrow(Par par, unsigned i) } int -Set::selrow(Par par, unsigned i) +Set::delrow(Par par, const ITab& itab, unsigned i) +{ + assert(m_row[i] != 0); + Row& row = *m_row[i]; + CHK(row.delrow(par, itab) == 0); + return 0; +} + +int +Set::selrow(Par par, const Row& keyrow) { Con& con = par.con(); - m_keyrow->calc(par, i); + const Tab& tab = par.tab(); + setkey(par, keyrow); + LL5("selrow " << tab.m_name << ": keyrow: " << keyrow); CHK(m_keyrow->selrow(par) == 0); CHK(getval(par) == 0); return 0; } int +Set::selrow(Par par, const ITab& itab, const Row& keyrow) +{ + Con& con = par.con(); + setkey(par, itab, keyrow); + LL5("selrow " << itab.m_name << ": keyrow: " << keyrow); + CHK(m_keyrow->selrow(par, itab) == 0); + CHK(getval(par) == 0); + return 0; +} + +// set and get + +void +Set::setkey(Par par, const Row& keyrow) +{ + const Tab& tab = m_tab; + for (unsigned k = 0; k < tab.m_cols; k++) { + const Col& col = *tab.m_col[k]; + if (col.m_pk) { + Val& val1 = *m_keyrow->m_val[k]; + const Val& val2 = *keyrow.dbrow().m_val[k]; + val1.copy(val2); + } + } +} + +void +Set::setkey(Par par, const ITab& itab, const Row& keyrow) +{ + const Tab& tab = m_tab; + for (unsigned k = 0; k < itab.m_icols; k++) { + const ICol& icol = *itab.m_icol[k]; + const Col& col = icol.m_col; + unsigned m = col.m_num; + Val& val1 = *m_keyrow->m_val[m]; + const Val& val2 = *keyrow.dbrow().m_val[m]; + val1.copy(val2); + } +} + +int Set::setrow(Par par, unsigned i) { Con& con = par.con(); @@ -2114,7 +2585,7 @@ Set::getkey(Par par, unsigned* i) } int -Set::putval(unsigned i, bool force) +Set::putval(unsigned i, bool force, unsigned n) { const Tab& tab = m_tab; if (m_row[i] == 0) @@ -2135,55 +2606,55 @@ Set::putval(unsigned i, bool force) } if (! row.m_exist) row.m_exist = true; + if (n != ~0) + m_rowkey[n] = i; return 0; } +// verify + int Set::verify(const Set& set2) const { - const Tab& tab = m_tab; - assert(&tab == &set2.m_tab && m_rows == set2.m_rows); - LL3("verify set1 count=" << count() << " vs set2 count=" << set2.count()); + assert(&m_tab == &set2.m_tab && m_rows == set2.m_rows); + LL4("verify set1 count=" << count() << " vs set2 count=" << set2.count()); for (unsigned i = 0; i < m_rows; i++) { - CHK(exist(i) == set2.exist(i)); - if (! exist(i)) - continue; - Row& row = *m_row[i]; - Row& row2 = *set2.m_row[i]; - CHK(row.verify(row2) == 0); + bool ok = true; + if (exist(i) != set2.exist(i)) { + ok = false; + } else if (exist(i)) { + if (dbrow(i).verify(set2.dbrow(i)) != 0) + ok = false; + } + if (! ok) { + LL1("verify failed: key=" << i << " row1=" << m_row[i] << " row2=" << set2.m_row[i]); + CHK(0 == 1); + } } return 0; } -void -Set::savepoint() +int +Set::verifyorder(const ITab& itab, bool descending) const { const Tab& tab = m_tab; - assert(m_saverow == 0); - m_saverow = new Row* [m_rows]; - for (unsigned i = 0; i < m_rows; i++) { - if (m_row[i] == 0) - m_saverow[i] = 0; - else { - m_saverow[i] = new Row(tab); - m_saverow[i]->copy(*m_row[i]); - } + for (unsigned n = 0; n < m_rows; n++) { + unsigned i2 = m_rowkey[n]; + if (i2 == ~0) + break; + if (n == 0) + continue; + unsigned i1 = m_rowkey[n - 1]; + assert(i1 < m_rows && i2 < m_rows); + const Row& row1 = *m_row[i1]; + const Row& row2 = *m_row[i2]; + assert(row1.m_exist && row2.m_exist); + if (! descending) + CHK(row1.cmp(row2, itab) <= 0); + else + CHK(row1.cmp(row2, itab) >= 0); } -} - -void -Set::commit() -{ - delete [] m_saverow; - m_saverow = 0; -} - -void -Set::rollback() -{ - assert(m_saverow != 0); - m_row = m_saverow; - m_saverow = 0; + return 0; } static NdbOut& @@ -2384,7 +2855,9 @@ BSet::filter(const Set& set, Set& set2) const for (unsigned i = 0; i < set.m_rows; i++) { if (! set.exist(i)) continue; - const Row& row = *set.m_row[i]; + set.lock(); + const Row& row = set.dbrow(i); + set.unlock(); if (! g_store_null_key) { bool ok1 = false; for (unsigned k = 0; k < itab.m_icols; k++) { @@ -2430,7 +2903,6 @@ BSet::filter(const Set& set, Set& set2) const Row& row2 = *set2.m_row[i]; assert(! row2.m_exist); row2.copy(row); - row2.m_exist = true; } } @@ -2451,15 +2923,16 @@ static int pkinsert(Par par) { Con& con = par.con(); + const Tab& tab = par.tab(); Set& set = par.set(); - LL3("pkinsert"); + LL3("pkinsert " << tab.m_name); CHK(con.startTransaction() == 0); Lst lst; for (unsigned j = 0; j < par.m_rows; j++) { unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows); unsigned i = thrrow(par, j2); set.lock(); - if (set.exist(i) || set.pending(i)) { + if (set.exist(i) || set.pending(i, Row::AnyOp)) { set.unlock(); continue; } @@ -2473,7 +2946,7 @@ pkinsert(Par par) CHK(con.execute(Commit, deadlock) == 0); con.closeTransaction(); if (deadlock) { - LL1("pkinsert: stop on deadlock"); + LL1("pkinsert: stop on deadlock [at 1]"); return 0; } set.lock(); @@ -2488,7 +2961,7 @@ pkinsert(Par par) CHK(con.execute(Commit, deadlock) == 0); con.closeTransaction(); if (deadlock) { - LL1("pkinsert: stop on deadlock"); + LL1("pkinsert: stop on deadlock [at 2]"); return 0; } set.lock(); @@ -2504,8 +2977,9 @@ static int pkupdate(Par par) { Con& con = par.con(); + const Tab& tab = par.tab(); Set& set = par.set(); - LL3("pkupdate"); + LL3("pkupdate " << tab.m_name); CHK(con.startTransaction() == 0); Lst lst; bool deadlock = false; @@ -2513,10 +2987,11 @@ pkupdate(Par par) unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows); unsigned i = thrrow(par, j2); set.lock(); - if (! set.exist(i) || set.pending(i)) { + if (! set.exist(i) || set.pending(i, Row::AnyOp)) { set.unlock(); continue; } + set.dbsave(i); set.calc(par, i); CHK(set.updrow(par, i) == 0); set.unlock(); @@ -2526,12 +3001,13 @@ pkupdate(Par par) deadlock = par.m_deadlock; CHK(con.execute(Commit, deadlock) == 0); if (deadlock) { - LL1("pkupdate: stop on deadlock"); + LL1("pkupdate: stop on deadlock [at 1]"); break; } con.closeTransaction(); set.lock(); set.notpending(lst); + set.dbdiscard(lst); set.unlock(); lst.reset(); CHK(con.startTransaction() == 0); @@ -2541,10 +3017,11 @@ pkupdate(Par par) deadlock = par.m_deadlock; CHK(con.execute(Commit, deadlock) == 0); if (deadlock) { - LL1("pkupdate: stop on deadlock"); + LL1("pkupdate: stop on deadlock [at 1]"); } else { set.lock(); set.notpending(lst); + set.dbdiscard(lst); set.unlock(); } } @@ -2556,8 +3033,9 @@ static int pkdelete(Par par) { Con& con = par.con(); + const Tab& tab = par.tab(); Set& set = par.set(); - LL3("pkdelete"); + LL3("pkdelete " << tab.m_name); CHK(con.startTransaction() == 0); Lst lst; bool deadlock = false; @@ -2565,7 +3043,7 @@ pkdelete(Par par) unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows); unsigned i = thrrow(par, j2); set.lock(); - if (! set.exist(i) || set.pending(i)) { + if (! set.exist(i) || set.pending(i, Row::AnyOp)) { set.unlock(); continue; } @@ -2577,7 +3055,7 @@ pkdelete(Par par) deadlock = par.m_deadlock; CHK(con.execute(Commit, deadlock) == 0); if (deadlock) { - LL1("pkdelete: stop on deadlock"); + LL1("pkdelete: stop on deadlock [at 1]"); break; } con.closeTransaction(); @@ -2592,7 +3070,7 @@ pkdelete(Par par) deadlock = par.m_deadlock; CHK(con.execute(Commit, deadlock) == 0); if (deadlock) { - LL1("pkdelete: stop on deadlock"); + LL1("pkdelete: stop on deadlock [at 2]"); } else { set.lock(); set.notpending(lst); @@ -2609,19 +3087,19 @@ pkread(Par par) Con& con = par.con(); const Tab& tab = par.tab(); Set& set = par.set(); - LL3((par.m_verify ? "pkverify " : "pkread ") << tab.m_name); + LL3("pkread " << tab.m_name << " verify=" << par.m_verify); // expected const Set& set1 = set; Set set2(tab, set.m_rows); for (unsigned i = 0; i < set.m_rows; i++) { set.lock(); - if (! set.exist(i) || set.pending(i)) { + if (! set.exist(i)) { set.unlock(); continue; } set.unlock(); CHK(con.startTransaction() == 0); - CHK(set2.selrow(par, i) == 0); + CHK(set2.selrow(par, *set1.m_row[i]) == 0); CHK(con.execute(Commit) == 0); unsigned i2 = (unsigned)-1; CHK(set2.getkey(par, &i2) == 0 && i == i2); @@ -2659,6 +3137,146 @@ pkreadfast(Par par, unsigned count) return 0; } +// hash index operations + +static int +hashindexupdate(Par par, const ITab& itab) +{ + Con& con = par.con(); + Set& set = par.set(); + LL3("hashindexupdate " << itab.m_name); + CHK(con.startTransaction() == 0); + Lst lst; + bool deadlock = false; + for (unsigned j = 0; j < par.m_rows; j++) { + unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows); + unsigned i = thrrow(par, j2); + set.lock(); + if (! set.exist(i) || set.pending(i, Row::AnyOp)) { + set.unlock(); + continue; + } + set.dbsave(i); + // index key columns are not re-calculated + set.calc(par, i, itab.m_colmask); + CHK(set.updrow(par, itab, i) == 0); + set.unlock(); + LL4("hashindexupdate " << i << ": " << *set.m_row[i]); + lst.push(i); + if (lst.cnt() == par.m_batch) { + deadlock = par.m_deadlock; + CHK(con.execute(Commit, deadlock) == 0); + if (deadlock) { + LL1("hashindexupdate: stop on deadlock [at 1]"); + break; + } + con.closeTransaction(); + set.lock(); + set.notpending(lst); + set.dbdiscard(lst); + set.unlock(); + lst.reset(); + CHK(con.startTransaction() == 0); + } + } + if (! deadlock && lst.cnt() != 0) { + deadlock = par.m_deadlock; + CHK(con.execute(Commit, deadlock) == 0); + if (deadlock) { + LL1("hashindexupdate: stop on deadlock [at 1]"); + } else { + set.lock(); + set.notpending(lst); + set.dbdiscard(lst); + set.unlock(); + } + } + con.closeTransaction(); + return 0; +}; + +static int +hashindexdelete(Par par, const ITab& itab) +{ + Con& con = par.con(); + Set& set = par.set(); + LL3("hashindexdelete " << itab.m_name); + CHK(con.startTransaction() == 0); + Lst lst; + bool deadlock = false; + for (unsigned j = 0; j < par.m_rows; j++) { + unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows); + unsigned i = thrrow(par, j2); + set.lock(); + if (! set.exist(i) || set.pending(i, Row::AnyOp)) { + set.unlock(); + continue; + } + CHK(set.delrow(par, itab, i) == 0); + set.unlock(); + LL4("hashindexdelete " << i << ": " << *set.m_row[i]); + lst.push(i); + if (lst.cnt() == par.m_batch) { + deadlock = par.m_deadlock; + CHK(con.execute(Commit, deadlock) == 0); + if (deadlock) { + LL1("hashindexdelete: stop on deadlock [at 1]"); + break; + } + con.closeTransaction(); + set.lock(); + set.notpending(lst); + set.unlock(); + lst.reset(); + CHK(con.startTransaction() == 0); + } + } + if (! deadlock && lst.cnt() != 0) { + deadlock = par.m_deadlock; + CHK(con.execute(Commit, deadlock) == 0); + if (deadlock) { + LL1("hashindexdelete: stop on deadlock [at 2]"); + } else { + set.lock(); + set.notpending(lst); + set.unlock(); + } + } + con.closeTransaction(); + return 0; +}; + +static int +hashindexread(Par par, const ITab& itab) +{ + Con& con = par.con(); + const Tab& tab = par.tab(); + Set& set = par.set(); + LL3("hashindexread " << itab.m_name << " verify=" << par.m_verify); + // expected + const Set& set1 = set; + Set set2(tab, set.m_rows); + for (unsigned i = 0; i < set.m_rows; i++) { + set.lock(); + if (! set.exist(i)) { + set.unlock(); + continue; + } + set.unlock(); + CHK(con.startTransaction() == 0); + CHK(set2.selrow(par, itab, *set1.m_row[i]) == 0); + CHK(con.execute(Commit) == 0); + unsigned i2 = (unsigned)-1; + CHK(set2.getkey(par, &i2) == 0 && i == i2); + CHK(set2.putval(i, false) == 0); + LL4("row " << set2.count() << ": " << *set2.m_row[i]); + con.closeTransaction(); + } + if (par.m_verify) + CHK(set1.verify(set2) == 0); + return 0; +} + // scan read static int @@ -2691,14 +3309,14 @@ scanreadtable(Par par) } unsigned i = (unsigned)-1; CHK(set2.getkey(par, &i) == 0); - CHK(set2.putval(i, false) == 0); + CHK(set2.putval(i, false, n) == 0); LL4("row " << n << ": " << *set2.m_row[i]); n++; } con.closeTransaction(); if (par.m_verify) CHK(set1.verify(set2) == 0); - LL3("scanread " << tab.m_name << " rows=" << n); + LL3("scanread " << tab.m_name << " done rows=" << n); return 0; } @@ -2745,19 +3363,22 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc) // prefer proper subset if (0 < n && n < set.m_rows) break; - if (urandom(5) == 0) + if (urandom(3) == 0) break; set1.reset(); } } else { bset.filter(set, set1); } - LL3("scanread " << itab.m_name << " bounds=" << bset.m_bvals << " verify=" << par.m_verify); + LL3("scanread " << itab.m_name << " bounds=" << bset << " verify=" << par.m_verify << " ordered=" << par.m_ordered << " descending=" << par.m_descending); LL4("expect " << set1.count() << " rows"); Set set2(tab, set.m_rows); CHK(con.startTransaction() == 0); CHK(con.getNdbScanOperation(itab, tab) == 0); - CHK(con.openScanRead(par.m_scanbat, par.m_scanpar) == 0); + if (! par.m_ordered) + CHK(con.openScanRead(par.m_scanbat, par.m_scanpar) == 0); + else + CHK(con.openScanOrdered(par.m_scanbat, par.m_scanpar, par.m_descending) == 0); CHK(bset.setbnd(par) == 0); set2.getval(par); CHK(con.executeScan() == 0); @@ -2775,15 +3396,17 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc) } unsigned i = (unsigned)-1; CHK(set2.getkey(par, &i) == 0); - LL4("key " << i); - CHK(set2.putval(i, par.m_dups) == 0); - LL4("row " << n << ": " << *set2.m_row[i]); + CHK(set2.putval(i, par.m_dups, n) == 0); + LL4("key " << i << " row " << n << ": " << *set2.m_row[i]); n++; } con.closeTransaction(); - if (par.m_verify) + if (par.m_verify) { CHK(set1.verify(set2) == 0); - LL3("scanread " << itab.m_name << " rows=" << n); + if (par.m_ordered) + CHK(set2.verifyorder(itab, par.m_descending) == 0); + } + LL3("scanread " << itab.m_name << " done rows=" << n); return 0; } @@ -2821,8 +3444,10 @@ scanreadindex(Par par, const ITab& itab) { const Tab& tab = par.tab(); for (unsigned i = 0; i < par.m_subsubloop; i++) { - BSet bset(tab, itab, par.m_rows); - CHK(scanreadindex(par, itab, bset, true) == 0); + if (itab.m_type == ITab::OrderedIndex) { + BSet bset(tab, itab, par.m_rows); + CHK(scanreadindex(par, itab, bset, true) == 0); + } } return 0; } @@ -2835,7 +3460,11 @@ scanreadindex(Par par) if (tab.m_itab[i] == 0) continue; const ITab& itab = *tab.m_itab[i]; - CHK(scanreadindex(par, itab) == 0); + if (itab.m_type == ITab::OrderedIndex) { + CHK(scanreadindex(par, itab) == 0); + } else { + CHK(hashindexread(par, itab) == 0); + } } return 0; } @@ -2932,7 +3561,7 @@ scanupdatetable(Par par) if (ret == 1) break; if (deadlock) { - LL1("scanupdatetable: stop on deadlock"); + LL1("scanupdatetable: stop on deadlock [at 1]"); break; } if (par.m_scanstop != 0 && urandom(par.m_scanstop) == 0) { @@ -2944,13 +3573,14 @@ scanupdatetable(Par par) CHK(set2.getkey(par, &i) == 0); const Row& row = *set.m_row[i]; set.lock(); - if (! set.exist(i) || set.pending(i)) { + if (! set.exist(i) || set.pending(i, Row::AnyOp)) { LL4("scan update " << tab.m_name << ": skip: " << row); } else { CHKTRY(set2.putval(i, false) == 0, set.unlock()); CHKTRY(con.updateScanTuple(con2) == 0, set.unlock()); Par par2 = par; par2.m_con = &con2; + set.dbsave(i); set.calc(par, i); CHKTRY(set.setrow(par2, i) == 0, set.unlock()); LL4("scan update " << tab.m_name << ": " << row); @@ -2961,12 +3591,13 @@ scanupdatetable(Par par) deadlock = par.m_deadlock; CHK(con2.execute(Commit, deadlock) == 0); if (deadlock) { - LL1("scanupdateindex: stop on deadlock"); + LL1("scanupdatetable: stop on deadlock [at 2]"); goto out; } con2.closeTransaction(); set.lock(); set.notpending(lst); + set.dbdiscard(lst); set.unlock(); count += lst.cnt(); lst.reset(); @@ -2977,12 +3608,13 @@ scanupdatetable(Par par) deadlock = par.m_deadlock; CHK(con2.execute(Commit, deadlock) == 0); if (deadlock) { - LL1("scanupdateindex: stop on deadlock"); + LL1("scanupdatetable: stop on deadlock [at 3]"); goto out; } con2.closeTransaction(); set.lock(); set.notpending(lst); + set.dbdiscard(lst); set.unlock(); count += lst.cnt(); lst.reset(); @@ -3009,7 +3641,10 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset) Set set2(tab, set.m_rows); CHK(con.startTransaction() == 0); CHK(con.getNdbScanOperation(itab, tab) == 0); - CHK(con.openScanExclusive(par.m_scanbat, par.m_scanpar) == 0); + if (! par.m_ordered) + CHK(con.openScanExclusive(par.m_scanbat, par.m_scanpar) == 0); + else + CHK(con.openScanOrderedExclusive(par.m_scanbat, par.m_scanpar, par.m_descending) == 0); CHK(bset.setbnd(par) == 0); set2.getval(par); CHK(con.executeScan() == 0); @@ -3027,7 +3662,7 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset) if (ret == 1) break; if (deadlock) { - LL1("scanupdateindex: stop on deadlock"); + LL1("scanupdateindex: stop on deadlock [at 1]"); break; } if (par.m_scanstop != 0 && urandom(par.m_scanstop) == 0) { @@ -3039,13 +3674,14 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset) CHK(set2.getkey(par, &i) == 0); const Row& row = *set.m_row[i]; set.lock(); - if (! set.exist(i) || set.pending(i)) { + if (! set.exist(i) || set.pending(i, Row::AnyOp)) { LL4("scan update " << itab.m_name << ": skip: " << row); } else { CHKTRY(set2.putval(i, par.m_dups) == 0, set.unlock()); CHKTRY(con.updateScanTuple(con2) == 0, set.unlock()); Par par2 = par; par2.m_con = &con2; + set.dbsave(i); set.calc(par, i); CHKTRY(set.setrow(par2, i) == 0, set.unlock()); LL4("scan update " << itab.m_name << ": " << row); @@ -3056,12 +3692,13 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset) deadlock = par.m_deadlock; CHK(con2.execute(Commit, deadlock) == 0); if (deadlock) { - LL1("scanupdateindex: stop on deadlock"); + LL1("scanupdateindex: stop on deadlock [at 2]"); goto out; } con2.closeTransaction(); set.lock(); set.notpending(lst); + set.dbdiscard(lst); set.unlock(); count += lst.cnt(); lst.reset(); @@ -3072,12 +3709,13 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset) deadlock = par.m_deadlock; CHK(con2.execute(Commit, deadlock) == 0); if (deadlock) { - LL1("scanupdateindex: stop on deadlock"); + LL1("scanupdateindex: stop on deadlock [at 3]"); goto out; } con2.closeTransaction(); set.lock(); set.notpending(lst); + set.dbdiscard(lst); set.unlock(); count += lst.cnt(); lst.reset(); @@ -3097,9 +3735,13 @@ scanupdateindex(Par par, const ITab& itab) { const Tab& tab = par.tab(); for (unsigned i = 0; i < par.m_subsubloop; i++) { - BSet bset(tab, itab, par.m_rows); - bset.calc(par); - CHK(scanupdateindex(par, itab, bset) == 0); + if (itab.m_type == ITab::OrderedIndex) { + BSet bset(tab, itab, par.m_rows); + bset.calc(par); + CHK(scanupdateindex(par, itab, bset) == 0); + } else { + CHK(hashindexupdate(par, itab) == 0); + } } return 0; } @@ -3151,8 +3793,12 @@ readverifyfull(Par par) unsigned i = par.m_no - 1; if (i < tab.m_itabs && tab.m_itab[i] != 0) { const ITab& itab = *tab.m_itab[i]; - BSet bset(tab, itab, par.m_rows); - CHK(scanreadindex(par, itab, bset, false) == 0); + if (itab.m_type == ITab::OrderedIndex) { + BSet bset(tab, itab, par.m_rows); + CHK(scanreadindex(par, itab, bset, false) == 0); + } else { + CHK(hashindexread(par, itab) == 0); + } } } return 0; @@ -3162,6 +3808,11 @@ static int readverifyindex(Par par) { par.m_verify = true; + unsigned sel = urandom(10); + if (sel < 9) { + par.m_ordered = true; + par.m_descending = (sel < 5); + } CHK(scanreadindex(par) == 0); return 0; } @@ -3169,26 +3820,56 @@ readverifyindex(Par par) static int pkops(Par par) { + const Tab& tab = par.tab(); par.m_randomkey = true; for (unsigned i = 0; i < par.m_subsubloop; i++) { + unsigned j = 0; + while (j < tab.m_itabs) { + if (tab.m_itab[j] != 0) { + const ITab& itab = *tab.m_itab[j]; + if (itab.m_type == ITab::UniqueHashIndex && urandom(5) == 0) + break; + } + j++; + } unsigned sel = urandom(10); if (par.m_slno % 2 == 0) { // favor insert if (sel < 8) { CHK(pkinsert(par) == 0); } else if (sel < 9) { - CHK(pkupdate(par) == 0); + if (j == tab.m_itabs) + CHK(pkupdate(par) == 0); + else { + const ITab& itab = *tab.m_itab[j]; + CHK(hashindexupdate(par, itab) == 0); + } } else { - CHK(pkdelete(par) == 0); + if (j == tab.m_itabs) + CHK(pkdelete(par) == 0); + else { + const ITab& itab = *tab.m_itab[j]; + CHK(hashindexdelete(par, itab) == 0); + } } } else { // favor delete if (sel < 1) { CHK(pkinsert(par) == 0); } else if (sel < 2) { - CHK(pkupdate(par) == 0); + if (j == tab.m_itabs) + CHK(pkupdate(par) == 0); + else { + const ITab& itab = *tab.m_itab[j]; + CHK(hashindexupdate(par, itab) == 0); + } } else { - CHK(pkdelete(par) == 0); + if (j == tab.m_itabs) + CHK(pkdelete(par) == 0); + else { + const ITab& itab = *tab.m_itab[j]; + CHK(hashindexdelete(par, itab) == 0); + } } } } @@ -3208,6 +3889,10 @@ pkupdatescanread(Par par) CHK(scanreadtable(par) == 0); } else { par.m_verify = false; + if (sel < 8) { + par.m_ordered = true; + par.m_descending = (sel < 7); + } CHK(scanreadindex(par) == 0); } return 0; @@ -3227,6 +3912,10 @@ mixedoperations(Par par) } else if (sel < 6) { CHK(scanupdatetable(par) == 0); } else { + if (sel < 8) { + par.m_ordered = true; + par.m_descending = (sel < 7); + } CHK(scanupdateindex(par) == 0); } return 0; @@ -3720,7 +4409,7 @@ printtables() { Par par(g_opt); makebuiltintables(par); - ndbout << "builtin tables (index x0 is on table pk):" << endl; + ndbout << "builtin tables (x0 on pk, x=ordered z=hash):" << endl; for (unsigned j = 0; j < tabcount; j++) { if (tablist[j] == 0) continue; @@ -3744,6 +4433,7 @@ runtest(Par par) LL1("random seed: " << seed); srandom((unsigned)seed); } else if (par.m_seed != 0) + LL1("random seed: " << par.m_seed); srandom(par.m_seed); // cs assert(par.m_csname != 0); @@ -3953,7 +4643,8 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535) if (strcmp(arg, "-threads") == 0) { if (++argv, --argc > 0) { g_opt.m_threads = atoi(argv[0]); - continue; + if (1 <= g_opt.m_threads) + continue; } } if (strcmp(arg, "-v") == 0) { @@ -3970,7 +4661,7 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535) printhelp(); goto wrongargs; } - ndbout << "testOIBasic: unknown option " << arg; + ndbout << "testOIBasic: bad or unknown option " << arg; goto usage; } { |