summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mysql-test/ndb/ndb_range_bounds.pl6
-rw-r--r--mysql-test/r/ndb_index_ordered.result1
-rw-r--r--mysql-test/t/ndb_index_ordered.test2
-rw-r--r--ndb/test/ndbapi/testOIBasic.cpp1217
4 files changed, 961 insertions, 265 deletions
diff --git a/mysql-test/ndb/ndb_range_bounds.pl b/mysql-test/ndb/ndb_range_bounds.pl
index 3b1844495b3..abe1ea28298 100644
--- a/mysql-test/ndb/ndb_range_bounds.pl
+++ b/mysql-test/ndb/ndb_range_bounds.pl
@@ -1,6 +1,7 @@
#
# test range scan bounds
# give option --all to test all cases
+# set MYSQL_HOME to installation top
#
use strict;
@@ -14,8 +15,9 @@ my $opt_verbose = 0;
GetOptions("all" => \$opt_all, "cnt=i" => \$opt_cnt, "verbose" => \$opt_verbose)
or die "options are: --all --cnt=N --verbose";
-my $mysql_top = $ENV{MYSQL_TOP};
-my $dsn = "dbi:mysql:database=test;host=localhost;mysql_read_default_file=$mysql_top/.target/var/my.cnf";
+my $mysql_home = $ENV{MYSQL_HOME};
+defined($mysql_home) or die "no MYSQL_HOME";
+my $dsn = "dbi:mysql:database=test;host=localhost;mysql_read_default_file=$mysql_home/var/my.cnf";
my $opts = { RaiseError => 0, PrintError => 0, AutoCommit => 1, };
my $dbh;
diff --git a/mysql-test/r/ndb_index_ordered.result b/mysql-test/r/ndb_index_ordered.result
index 00e3fbc5827..1cf2a97a6b3 100644
--- a/mysql-test/r/ndb_index_ordered.result
+++ b/mysql-test/r/ndb_index_ordered.result
@@ -383,6 +383,7 @@ b c
select min(b), max(b) from t1;
min(b) max(b)
1 5000000
+drop table t1;
CREATE TABLE test1 (
SubscrID int(11) NOT NULL auto_increment,
UsrID int(11) NOT NULL default '0',
diff --git a/mysql-test/t/ndb_index_ordered.test b/mysql-test/t/ndb_index_ordered.test
index 783ce99e739..42325e25ea3 100644
--- a/mysql-test/t/ndb_index_ordered.test
+++ b/mysql-test/t/ndb_index_ordered.test
@@ -175,6 +175,8 @@ select b, c from t1 where 1000<=b and b<=100000 and c<'j' order by b, c;
select b, c from t1 where 1000<=b and b<=100000 and c<'j' order by b desc, c desc;
#
select min(b), max(b) from t1;
+#
+drop table t1;
#
# Bug #6435
diff --git a/ndb/test/ndbapi/testOIBasic.cpp b/ndb/test/ndbapi/testOIBasic.cpp
index 7a73ba872b2..e3cc3d0d90c 100644
--- a/ndb/test/ndbapi/testOIBasic.cpp
+++ b/ndb/test/ndbapi/testOIBasic.cpp
@@ -72,7 +72,7 @@ struct Opt {
m_die(0),
m_dups(false),
m_fragtype(NdbDictionary::Object::FragUndefined),
- m_subsubloop(4),
+ m_subsubloop(2),
m_index(0),
m_loop(1),
m_msglock(true),
@@ -257,6 +257,9 @@ struct Par : public Opt {
bool m_verify;
// deadlock possible
bool m_deadlock;
+ // ordered range scan
+ bool m_ordered;
+ bool m_descending;
// timer location
Par(const Opt& opt) :
Opt(opt),
@@ -274,7 +277,9 @@ struct Par : public Opt {
m_bdir(0),
m_randomkey(false),
m_verify(false),
- m_deadlock(false) {
+ m_deadlock(false),
+ m_ordered(false),
+ m_descending(false) {
}
};
@@ -444,6 +449,9 @@ struct Chs {
~Chs();
};
+static NdbOut&
+operator<<(NdbOut& out, const Chs& chs);
+
Chs::Chs(CHARSET_INFO* cs) :
m_cs(cs)
{
@@ -455,10 +463,14 @@ Chs::Chs(CHARSET_INFO* cs) :
unsigned i = 0;
unsigned miss1 = 0;
unsigned miss2 = 0;
+ unsigned miss3 = 0;
+ unsigned miss4 = 0;
while (i < maxcharcount) {
unsigned char* bytes = m_chr[i].m_bytes;
unsigned char* xbytes = m_chr[i].m_xbytes;
- unsigned size = m_cs->mbminlen + urandom(m_cs->mbmaxlen - m_cs->mbminlen + 1);
+ unsigned& size = m_chr[i].m_size;
+ bool ok;
+ size = m_cs->mbminlen + urandom(m_cs->mbmaxlen - m_cs->mbminlen + 1);
assert(m_cs->mbminlen <= size && size <= m_cs->mbmaxlen);
// prefer longer chars
if (size == m_cs->mbminlen && m_cs->mbminlen < m_cs->mbmaxlen && urandom(5) != 0)
@@ -466,33 +478,57 @@ Chs::Chs(CHARSET_INFO* cs) :
for (unsigned j = 0; j < size; j++) {
bytes[j] = urandom(256);
}
+ // check wellformed
const char* sbytes = (const char*)bytes;
if ((*cs->cset->well_formed_len)(cs, sbytes, sbytes + size, 1) != size) {
miss1++;
continue;
}
- // do not trust well_formed_len currently
+ // check no proper prefix wellformed
+ ok = true;
+ for (unsigned j = 1; j < size; j++) {
+ if ((*cs->cset->well_formed_len)(cs, sbytes, sbytes + j, 1) == j) {
+ ok = false;
+ break;
+ }
+ }
+ if (! ok) {
+ miss2++;
+ continue;
+ }
+ // normalize
memset(xbytes, 0, sizeof(xbytes));
// currently returns buffer size always
int xlen = (*cs->coll->strnxfrm)(cs, xbytes, m_xmul * size, bytes, size);
// check we got something
- bool xok = false;
+ ok = false;
for (unsigned j = 0; j < xlen; j++) {
if (xbytes[j] != 0) {
- xok = true;
+ ok = true;
break;
}
}
- if (! xok) {
- miss2++;
+ if (! ok) {
+ miss3++;
+ continue;
+ }
+ // check for duplicate (before normalize)
+ ok = true;
+ for (unsigned j = 0; j < i; j++) {
+ const Chr& chr = m_chr[j];
+ if (chr.m_size == size && memcmp(chr.m_bytes, bytes, size) == 0) {
+ ok = false;
+ break;
+ }
+ }
+ if (! ok) {
+ miss4++;
continue;
}
- // occasional duplicate char is ok
- m_chr[i].m_size = size;
i++;
}
bool disorder = true;
- unsigned bubbels = 0;
+ unsigned bubbles = 0;
while (disorder) {
disorder = false;
for (unsigned i = 1; i < maxcharcount; i++) {
@@ -502,11 +538,11 @@ Chs::Chs(CHARSET_INFO* cs) :
m_chr[i] = m_chr[i-1];
m_chr[i-1] = chr;
disorder = true;
- bubbels++;
+ bubbles++;
}
}
}
- LL3("inited charset " << cs->name << " miss1=" << miss1 << " miss2=" << miss2 << " bubbels=" << bubbels);
+ LL3("inited charset " << *this << " miss=" << miss1 << "," << miss2 << "," << miss3 << "," << miss4 << " bubbles=" << bubbles);
}
Chs::~Chs()
@@ -514,6 +550,14 @@ Chs::~Chs()
delete [] m_chr;
}
+static NdbOut&
+operator<<(NdbOut& out, const Chs& chs)
+{
+ CHARSET_INFO* cs = chs.m_cs;
+ out << cs->name << "[" << cs->mbminlen << "-" << cs->mbmaxlen << "," << chs.m_xmul << "]";
+ return out;
+}
+
static Chs* cslist[maxcsnumber];
static void
@@ -552,22 +596,26 @@ getcs(Par par)
// Col - table column
struct Col {
+ enum Type {
+ Unsigned = NdbDictionary::Column::Unsigned,
+ Char = NdbDictionary::Column::Char
+ };
const class Tab& m_tab;
unsigned m_num;
const char* m_name;
bool m_pk;
- NdbDictionary::Column::Type m_type;
+ Type m_type;
unsigned m_length;
unsigned m_bytelength;
bool m_nullable;
const Chs* m_chs;
- Col(const class Tab& tab, unsigned num, const char* name, bool pk, NdbDictionary::Column::Type type, unsigned length, bool nullable, const Chs* chs);
+ Col(const class Tab& tab, unsigned num, const char* name, bool pk, Type type, unsigned length, bool nullable, const Chs* chs);
~Col();
bool equal(const Col& col2) const;
void verify(const void* addr) const;
};
-Col::Col(const class Tab& tab, unsigned num, const char* name, bool pk, NdbDictionary::Column::Type type, unsigned length, bool nullable, const Chs* chs) :
+Col::Col(const class Tab& tab, unsigned num, const char* name, bool pk, Type type, unsigned length, bool nullable, const Chs* chs) :
m_tab(tab),
m_num(num),
m_name(strcpy(new char [strlen(name) + 1], name)),
@@ -595,9 +643,9 @@ void
Col::verify(const void* addr) const
{
switch (m_type) {
- case NdbDictionary::Column::Unsigned:
+ case Col::Unsigned:
break;
- case NdbDictionary::Column::Char:
+ case Col::Char:
{
CHARSET_INFO* cs = m_chs->m_cs;
const char* src = (const char*)addr;
@@ -616,10 +664,10 @@ operator<<(NdbOut& out, const Col& col)
{
out << "col[" << col.m_num << "] " << col.m_name;
switch (col.m_type) {
- case NdbDictionary::Column::Unsigned:
+ case Col::Unsigned:
out << " unsigned";
break;
- case NdbDictionary::Column::Char:
+ case Col::Char:
{
CHARSET_INFO* cs = col.m_chs->m_cs;
out << " char(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")";
@@ -656,25 +704,41 @@ ICol::~ICol()
{
}
+static NdbOut&
+operator<<(NdbOut& out, const ICol& icol)
+{
+ out << "icol[" << icol.m_num << "] " << icol.m_col;
+ return out;
+}
+
// ITab - index
struct ITab {
+ enum Type {
+ OrderedIndex = NdbDictionary::Index::OrderedIndex,
+ UniqueHashIndex = NdbDictionary::Index::UniqueHashIndex
+ };
const class Tab& m_tab;
const char* m_name;
+ Type m_type;
unsigned m_icols;
const ICol** m_icol;
- ITab(const class Tab& tab, const char* name, unsigned icols);
+ unsigned m_colmask;
+ ITab(const class Tab& tab, const char* name, Type type, unsigned icols);
~ITab();
+ void icoladd(unsigned k, const ICol* icolptr);
};
-ITab::ITab(const class Tab& tab, const char* name, unsigned icols) :
+ITab::ITab(const class Tab& tab, const char* name, Type type, unsigned icols) :
m_tab(tab),
m_name(strcpy(new char [strlen(name) + 1], name)),
+ m_type(type),
m_icols(icols),
- m_icol(new const ICol* [icols + 1])
+ m_icol(new const ICol* [icols + 1]),
+ m_colmask(0)
{
- for (unsigned i = 0; i <= m_icols; i++)
- m_icol[0] = 0;
+ for (unsigned k = 0; k <= m_icols; k++)
+ m_icol[k] = 0;
}
ITab::~ITab()
@@ -685,13 +749,21 @@ ITab::~ITab()
delete [] m_icol;
}
+void
+ITab::icoladd(unsigned k, const ICol* icolptr)
+{
+ assert(k == icolptr->m_num && k < m_icols && m_icol[k] == 0);
+ m_icol[k] = icolptr;
+ m_colmask |= (1 << icolptr->m_col.m_num);
+}
+
static NdbOut&
operator<<(NdbOut& out, const ITab& itab)
{
out << "itab " << itab.m_name << " icols=" << itab.m_icols;
for (unsigned k = 0; k < itab.m_icols; k++) {
- out << endl;
- out << "icol[" << k << "] " << itab.m_icol[k]->m_col;
+ const ICol& icol = *itab.m_icol[k];
+ out << endl << icol;
}
return out;
}
@@ -706,6 +778,8 @@ struct Tab {
const ITab** m_itab;
// pk must contain an Unsigned column
unsigned m_keycol;
+ void coladd(unsigned k, Col* colptr);
+ void itabadd(unsigned j, ITab* itab);
Tab(const char* name, unsigned cols, unsigned itabs, unsigned keycol);
~Tab();
};
@@ -718,10 +792,10 @@ Tab::Tab(const char* name, unsigned cols, unsigned itabs, unsigned keycol) :
m_itab(new const ITab* [itabs + 1]),
m_keycol(keycol)
{
- for (unsigned i = 0; i <= cols; i++)
- m_col[i] = 0;
- for (unsigned i = 0; i <= itabs; i++)
- m_itab[i] = 0;
+ for (unsigned k = 0; k <= cols; k++)
+ m_col[k] = 0;
+ for (unsigned j = 0; j <= itabs; j++)
+ m_itab[j] = 0;
}
Tab::~Tab()
@@ -735,19 +809,33 @@ Tab::~Tab()
delete [] m_itab;
}
+void
+Tab::coladd(unsigned k, Col* colptr)
+{
+ assert(k == colptr->m_num && k < m_cols && m_col[k] == 0);
+ m_col[k] = colptr;
+}
+
+void
+Tab::itabadd(unsigned j, ITab* itabptr)
+{
+ assert(j < m_itabs && m_itab[j] == 0);
+ m_itab[j] = itabptr;
+}
+
static NdbOut&
operator<<(NdbOut& out, const Tab& tab)
{
out << "tab " << tab.m_name << " cols=" << tab.m_cols;
for (unsigned k = 0; k < tab.m_cols; k++) {
- out << endl;
- out << *tab.m_col[k];
+ const Col& col = *tab.m_col[k];
+ out << endl << col;
}
for (unsigned i = 0; i < tab.m_itabs; i++) {
if (tab.m_itab[i] == 0)
continue;
- out << endl;
- out << *tab.m_itab[i];
+ const ITab& itab = *tab.m_itab[i];
+ out << endl << itab;
}
return out;
}
@@ -774,7 +862,7 @@ verifytables()
{
assert(t->m_keycol < t->m_cols);
const Col* c = t->m_col[t->m_keycol];
- assert(c->m_pk && c->m_type == NdbDictionary::Column::Unsigned);
+ assert(c->m_pk && c->m_type == Col::Unsigned);
}
assert(t->m_itabs != 0 && t->m_itab != 0);
for (unsigned i = 0; i < t->m_itabs; i++) {
@@ -785,6 +873,9 @@ verifytables()
for (unsigned k = 0; k < x->m_icols; k++) {
const ICol* c = x->m_icol[k];
assert(c != 0 && c->m_num == k && c->m_col.m_num < t->m_cols);
+ if (x->m_type == ITab::UniqueHashIndex) {
+ assert(! c->m_col.m_nullable);
+ }
}
}
assert(t->m_itab[t->m_itabs] == 0);
@@ -810,127 +901,186 @@ makebuiltintables(Par par)
}
// ti0 - basic
if (usetable(par, 0)) {
- const Tab* t = new Tab("ti0", 5, 5, 0);
+ Tab* t = new Tab("ti0", 5, 7, 0);
// name - pk - type - length - nullable - cs
- t->m_col[0] = new Col(*t, 0, "a", 1, NdbDictionary::Column::Unsigned, 1, 0, 0);
- t->m_col[1] = new Col(*t, 1, "b", 0, NdbDictionary::Column::Unsigned, 1, 1, 0);
- t->m_col[2] = new Col(*t, 2, "c", 0, NdbDictionary::Column::Unsigned, 1, 1, 0);
- t->m_col[3] = new Col(*t, 3, "d", 0, NdbDictionary::Column::Unsigned, 1, 1, 0);
- t->m_col[4] = new Col(*t, 4, "e", 0, NdbDictionary::Column::Unsigned, 1, 1, 0);
+ t->coladd(0, new Col(*t, 0, "a", 1, Col::Unsigned, 1, 0, 0));
+ t->coladd(1, new Col(*t, 1, "b", 0, Col::Unsigned, 1, 1, 0));
+ t->coladd(2, new Col(*t, 2, "c", 0, Col::Unsigned, 1, 0, 0));
+ t->coladd(3, new Col(*t, 3, "d", 0, Col::Unsigned, 1, 1, 0));
+ t->coladd(4, new Col(*t, 4, "e", 0, Col::Unsigned, 1, 0, 0));
if (useindex(par, 0)) {
// a
- const ITab* x = t->m_itab[0] = new ITab(*t, "ti0x0", 1);
- x->m_icol[0] = new ICol(*x, 0, *t->m_col[0]);
+ ITab* x = new ITab(*t, "ti0x0", ITab::OrderedIndex, 1);
+ x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
+ t->itabadd(0, x);
}
if (useindex(par, 1)) {
// b
- const ITab* x = t->m_itab[1] = new ITab(*t, "ti0x1", 1);
- x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]);
+ ITab* x = new ITab(*t, "ti0x1", ITab::OrderedIndex, 1);
+ x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
+ t->itabadd(1, x);
}
if (useindex(par, 2)) {
// b, c
- const ITab* x = t->m_itab[2] = new ITab(*t, "ti0x2", 2);
- x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]);
- x->m_icol[1] = new ICol(*x, 1, *t->m_col[2]);
+ ITab* x = new ITab(*t, "ti0x2", ITab::OrderedIndex, 2);
+ x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
+ x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
+ t->itabadd(2, x);
}
if (useindex(par, 3)) {
// d, c, b
- const ITab* x = t->m_itab[3] = new ITab(*t, "ti0x3", 3);
- x->m_icol[0] = new ICol(*x, 0, *t->m_col[3]);
- x->m_icol[1] = new ICol(*x, 1, *t->m_col[2]);
- x->m_icol[2] = new ICol(*x, 2, *t->m_col[1]);
+ ITab* x = new ITab(*t, "ti0x3", ITab::OrderedIndex, 3);
+ x->icoladd(0, new ICol(*x, 0, *t->m_col[3]));
+ x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
+ x->icoladd(2, new ICol(*x, 2, *t->m_col[1]));
+ t->itabadd(3, x);
}
if (useindex(par, 4)) {
// b, e, c, d
- const ITab* x = t->m_itab[4] = new ITab(*t, "ti0x4", 4);
- x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]);
- x->m_icol[1] = new ICol(*x, 1, *t->m_col[4]);
- x->m_icol[2] = new ICol(*x, 2, *t->m_col[2]);
- x->m_icol[3] = new ICol(*x, 3, *t->m_col[3]);
+ ITab* x = new ITab(*t, "ti0x4", ITab::OrderedIndex, 4);
+ x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
+ x->icoladd(1, new ICol(*x, 1, *t->m_col[4]));
+ x->icoladd(2, new ICol(*x, 2, *t->m_col[2]));
+ x->icoladd(3, new ICol(*x, 3, *t->m_col[3]));
+ t->itabadd(4, x);
+ }
+ if (useindex(par, 5)) {
+ // a, c
+ ITab* x = new ITab(*t, "ti0z5", ITab::UniqueHashIndex, 2);
+ x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
+ x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
+ t->itabadd(5, x);
+ }
+ if (useindex(par, 6)) {
+ // a, e
+ ITab* x = new ITab(*t, "ti0z6", ITab::UniqueHashIndex, 2);
+ x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
+ x->icoladd(1, new ICol(*x, 1, *t->m_col[4]));
+ t->itabadd(6, x);
}
tablist[0] = t;
}
// ti1 - simple char fields
if (usetable(par, 1)) {
- const Tab* t = new Tab("ti1", 5, 5, 1);
+ Tab* t = new Tab("ti1", 5, 7, 1);
// name - pk - type - length - nullable - cs
- t->m_col[0] = new Col(*t, 0, "a", 0, NdbDictionary::Column::Unsigned, 1, 1, 0);
- t->m_col[1] = new Col(*t, 1, "b", 1, NdbDictionary::Column::Unsigned, 1, 0, 0);
- t->m_col[2] = new Col(*t, 2, "c", 0, NdbDictionary::Column::Char, 20, 1, getcs(par));
- t->m_col[3] = new Col(*t, 3, "d", 0, NdbDictionary::Column::Char, 5, 1, getcs(par));
- t->m_col[4] = new Col(*t, 4, "e", 0, NdbDictionary::Column::Char, 5, 1, getcs(par));
+ t->coladd(0, new Col(*t, 0, "a", 0, Col::Unsigned, 1, 0, 0));
+ t->coladd(1, new Col(*t, 1, "b", 1, Col::Unsigned, 1, 0, 0));
+ t->coladd(2, new Col(*t, 2, "c", 0, Col::Char, 20, 1, getcs(par)));
+ t->coladd(3, new Col(*t, 3, "d", 0, Col::Char, 5, 0, getcs(par)));
+ t->coladd(4, new Col(*t, 4, "e", 0, Col::Char, 5, 1, getcs(par)));
if (useindex(par, 0)) {
// b
- const ITab* x = t->m_itab[0] = new ITab(*t, "ti1x0", 1);
- x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]);
+ ITab* x = new ITab(*t, "ti1x0", ITab::OrderedIndex, 1);
+ x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
}
if (useindex(par, 1)) {
// a, c
- const ITab* x = t->m_itab[1] = new ITab(*t, "ti1x1", 2);
- x->m_icol[0] = new ICol(*x, 0, *t->m_col[0]);
- x->m_icol[1] = new ICol(*x, 1, *t->m_col[2]);
+ ITab* x = new ITab(*t, "ti1x1", ITab::OrderedIndex, 2);
+ x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
+ x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
+ t->itabadd(1, x);
}
if (useindex(par, 2)) {
// c, a
- const ITab* x = t->m_itab[2] = new ITab(*t, "ti1x2", 2);
- x->m_icol[0] = new ICol(*x, 0, *t->m_col[2]);
- x->m_icol[1] = new ICol(*x, 1, *t->m_col[0]);
+ ITab* x = new ITab(*t, "ti1x2", ITab::OrderedIndex, 2);
+ x->icoladd(0, new ICol(*x, 0, *t->m_col[2]));
+ x->icoladd(1, new ICol(*x, 1, *t->m_col[0]));
+ t->itabadd(2, x);
}
if (useindex(par, 3)) {
// e
- const ITab* x = t->m_itab[3] = new ITab(*t, "ti1x3", 1);
- x->m_icol[0] = new ICol(*x, 0, *t->m_col[4]);
+ ITab* x = new ITab(*t, "ti1x3", ITab::OrderedIndex, 1);
+ x->icoladd(0, new ICol(*x, 0, *t->m_col[4]));
+ t->itabadd(3, x);
}
if (useindex(par, 4)) {
// e, d, c, b
- const ITab* x = t->m_itab[4] = new ITab(*t, "ti1x4", 4);
- x->m_icol[0] = new ICol(*x, 0, *t->m_col[4]);
- x->m_icol[1] = new ICol(*x, 1, *t->m_col[3]);
- x->m_icol[2] = new ICol(*x, 2, *t->m_col[2]);
- x->m_icol[3] = new ICol(*x, 3, *t->m_col[1]);
+ ITab* x = new ITab(*t, "ti1x4", ITab::OrderedIndex, 4);
+ x->icoladd(0, new ICol(*x, 0, *t->m_col[4]));
+ x->icoladd(1, new ICol(*x, 1, *t->m_col[3]));
+ x->icoladd(2, new ICol(*x, 2, *t->m_col[2]));
+ x->icoladd(3, new ICol(*x, 3, *t->m_col[1]));
+ t->itabadd(4, x);
+ }
+ if (useindex(par, 5)) {
+ // a, b
+ ITab* x = new ITab(*t, "ti1z5", ITab::UniqueHashIndex, 2);
+ x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
+ x->icoladd(1, new ICol(*x, 1, *t->m_col[1]));
+ t->itabadd(5, x);
+ }
+ if (useindex(par, 6)) {
+ // a, b, d
+ ITab* x = new ITab(*t, "ti1z6", ITab::UniqueHashIndex, 3);
+ x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
+ x->icoladd(1, new ICol(*x, 1, *t->m_col[1]));
+ x->icoladd(2, new ICol(*x, 2, *t->m_col[3]));
+ t->itabadd(6, x);
}
tablist[1] = t;
}
// ti2 - complex char fields
if (usetable(par, 2)) {
- const Tab* t = new Tab("ti2", 5, 5, 2);
+ Tab* t = new Tab("ti2", 5, 7, 2);
// name - pk - type - length - nullable - cs
- t->m_col[0] = new Col(*t, 0, "a", 1, NdbDictionary::Column::Char, 101, 0, getcs(par));
- t->m_col[1] = new Col(*t, 1, "b", 0, NdbDictionary::Column::Char, 4, 1, getcs(par));
- t->m_col[2] = new Col(*t, 2, "c", 1, NdbDictionary::Column::Unsigned, 1, 0, 0);
- t->m_col[3] = new Col(*t, 3, "d", 1, NdbDictionary::Column::Char, 3, 0, getcs(par));
- t->m_col[4] = new Col(*t, 4, "e", 0, NdbDictionary::Column::Char, 101, 0, getcs(par));
+ t->coladd(0, new Col(*t, 0, "a", 1, Col::Char, 31, 0, getcs(par)));
+ t->coladd(1, new Col(*t, 1, "b", 0, Col::Char, 4, 1, getcs(par)));
+ t->coladd(2, new Col(*t, 2, "c", 1, Col::Unsigned, 1, 0, 0));
+ t->coladd(3, new Col(*t, 3, "d", 1, Col::Char, 3, 0, getcs(par)));
+ t->coladd(4, new Col(*t, 4, "e", 0, Col::Char, 17, 0, getcs(par)));
if (useindex(par, 0)) {
// a, c, d
- const ITab* x = t->m_itab[0] = new ITab(*t, "ti2x0", 3);
- x->m_icol[0] = new ICol(*x, 0, *t->m_col[0]);
- x->m_icol[1] = new ICol(*x, 1, *t->m_col[2]);
- x->m_icol[2] = new ICol(*x, 2, *t->m_col[3]);
+ ITab* x = new ITab(*t, "ti2x0", ITab::OrderedIndex, 3);
+ x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
+ x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
+ x->icoladd(2, new ICol(*x, 2, *t->m_col[3]));
+ t->itabadd(0, x);
}
if (useindex(par, 1)) {
// e, d, c, b, a
- const ITab* x = t->m_itab[1] = new ITab(*t, "ti2x1", 5);
- x->m_icol[0] = new ICol(*x, 0, *t->m_col[4]);
- x->m_icol[1] = new ICol(*x, 1, *t->m_col[3]);
- x->m_icol[2] = new ICol(*x, 2, *t->m_col[2]);
- x->m_icol[3] = new ICol(*x, 3, *t->m_col[1]);
- x->m_icol[4] = new ICol(*x, 4, *t->m_col[0]);
+ ITab* x = new ITab(*t, "ti2x1", ITab::OrderedIndex, 5);
+ x->icoladd(0, new ICol(*x, 0, *t->m_col[4]));
+ x->icoladd(1, new ICol(*x, 1, *t->m_col[3]));
+ x->icoladd(2, new ICol(*x, 2, *t->m_col[2]));
+ x->icoladd(3, new ICol(*x, 3, *t->m_col[1]));
+ x->icoladd(4, new ICol(*x, 4, *t->m_col[0]));
+ t->itabadd(1, x);
}
if (useindex(par, 2)) {
// d
- const ITab* x = t->m_itab[2] = new ITab(*t, "ti2x2", 1);
- x->m_icol[0] = new ICol(*x, 0, *t->m_col[3]);
+ ITab* x = new ITab(*t, "ti2x2", ITab::OrderedIndex, 1);
+ x->icoladd(0, new ICol(*x, 0, *t->m_col[3]));
+ t->itabadd(2, x);
}
if (useindex(par, 3)) {
// b
- const ITab* x = t->m_itab[3] = new ITab(*t, "ti2x3", 1);
- x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]);
+ ITab* x = new ITab(*t, "ti2x3", ITab::OrderedIndex, 1);
+ x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
+ t->itabadd(3, x);
}
if (useindex(par, 4)) {
// a, e
- const ITab* x = t->m_itab[4] = new ITab(*t, "ti2x4", 2);
- x->m_icol[0] = new ICol(*x, 0, *t->m_col[0]);
- x->m_icol[1] = new ICol(*x, 1, *t->m_col[4]);
+ ITab* x = new ITab(*t, "ti2x4", ITab::OrderedIndex, 2);
+ x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
+ x->icoladd(1, new ICol(*x, 1, *t->m_col[4]));
+ t->itabadd(4, x);
+ }
+ if (useindex(par, 5)) {
+ // a, c
+ ITab* x = new ITab(*t, "ti2z5", ITab::UniqueHashIndex, 2);
+ x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
+ x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
+ t->itabadd(5, x);
+ }
+ if (useindex(par, 6)) {
+ // a, c, d, e
+ ITab* x = new ITab(*t, "ti2z6", ITab::UniqueHashIndex, 4);
+ x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
+ x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
+ x->icoladd(2, new ICol(*x, 2, *t->m_col[3]));
+ x->icoladd(3, new ICol(*x, 3, *t->m_col[4]));
+ t->itabadd(6, x);
}
tablist[2] = t;
}
@@ -944,6 +1094,7 @@ struct Con {
NdbDictionary::Dictionary* m_dic;
NdbConnection* m_tx;
NdbOperation* m_op;
+ NdbIndexOperation* m_indexop;
NdbScanOperation* m_scanop;
NdbIndexScanOperation* m_indexscanop;
enum ScanMode { ScanNo = 0, Committed, Latest, Exclusive };
@@ -951,7 +1102,7 @@ struct Con {
enum ErrType { ErrNone = 0, ErrDeadlock, ErrOther };
ErrType m_errtype;
Con() :
- m_ndb(0), m_dic(0), m_tx(0), m_op(0),
+ m_ndb(0), m_dic(0), m_tx(0), m_op(0), m_indexop(0),
m_scanop(0), m_indexscanop(0), m_scanmode(ScanNo), m_errtype(ErrNone) {}
~Con() {
if (m_tx != 0)
@@ -962,6 +1113,7 @@ struct Con {
void disconnect();
int startTransaction();
int getNdbOperation(const Tab& tab);
+ int getNdbIndexOperation(const ITab& itab, const Tab& tab);
int getNdbScanOperation(const Tab& tab);
int getNdbScanOperation(const ITab& itab, const Tab& tab);
int equal(int num, const char* addr);
@@ -972,6 +1124,8 @@ struct Con {
int execute(ExecType t, bool& deadlock);
int openScanRead(unsigned scanbat, unsigned scanpar);
int openScanExclusive(unsigned scanbat, unsigned scanpar);
+ int openScanOrdered(unsigned scanbat, unsigned scanpar, bool descending);
+ int openScanOrderedExclusive(unsigned scanbat, unsigned scanpar, bool descending);
int executeScan();
int nextScanResult(bool fetchAllowed);
int nextScanResult(bool fetchAllowed, bool& deadlock);
@@ -1026,6 +1180,14 @@ Con::getNdbOperation(const Tab& tab)
}
int
+Con::getNdbIndexOperation(const ITab& itab, const Tab& tab)
+{
+ assert(m_tx != 0);
+ CHKCON((m_op = m_indexop = m_tx->getNdbIndexOperation(itab.m_name, tab.m_name)) != 0, *this);
+ return 0;
+}
+
+int
Con::getNdbScanOperation(const Tab& tab)
{
assert(m_tx != 0);
@@ -1116,6 +1278,25 @@ Con::openScanExclusive(unsigned scanbat, unsigned scanpar)
}
int
+Con::openScanOrdered(unsigned scanbat, unsigned scanpar, bool descending)
+{
+ assert(m_tx != 0 && m_indexscanop != 0);
+ NdbOperation::LockMode lm = NdbOperation::LM_Read;
+ CHKCON(m_indexscanop->readTuples(lm, scanbat, scanpar, true, descending) == 0, *this);
+ return 0;
+}
+
+int
+Con::openScanOrderedExclusive(unsigned scanbat, unsigned scanpar, bool descending)
+{
+ assert(m_tx != 0 && m_indexscanop != 0);
+ NdbOperation::LockMode lm = NdbOperation::LM_Exclusive;
+ CHKCON(m_indexscanop->readTuples(lm, scanbat, scanpar, true, descending) == 0, *this);
+ return 0;
+}
+
+
+int
Con::executeScan()
{
CHKCON(m_tx->execute(NoCommit) == 0, *this);
@@ -1202,6 +1383,7 @@ Con::printerror(NdbOut& out)
if ((code = m_tx->getNdbError().code) != 0) {
LL0(++any << " con: error " << m_tx->getNdbError());
die += (code == g_opt.m_die);
+ // 631 is new, occurs only on 4 db nodes, needs to be checked out
if (code == 266 || code == 274 || code == 296 || code == 297 || code == 499 || code == 631)
m_errtype = ErrDeadlock;
}
@@ -1290,7 +1472,7 @@ createtable(Par par)
for (unsigned k = 0; k < tab.m_cols; k++) {
const Col& col = *tab.m_col[k];
NdbDictionary::Column c(col.m_name);
- c.setType(col.m_type);
+ c.setType((NdbDictionary::Column::Type)col.m_type);
c.setLength(col.m_bytelength); // NDB API uses length in bytes
c.setPrimaryKey(col.m_pk);
c.setNullable(col.m_nullable);
@@ -1343,8 +1525,10 @@ createindex(Par par, const ITab& itab)
LL4(itab);
NdbDictionary::Index x(itab.m_name);
x.setTable(tab.m_name);
- x.setType(NdbDictionary::Index::OrderedIndex);
- x.setLogging(false);
+ x.setType((NdbDictionary::Index::Type)itab.m_type);
+ if (par.m_nologging || itab.m_type == ITab::OrderedIndex) {
+ x.setLogging(false);
+ }
for (unsigned k = 0; k < itab.m_icols; k++) {
const ICol& icol = *itab.m_icol[k];
const Col& col = icol.m_col;
@@ -1385,6 +1569,8 @@ struct Val {
void copy(const void* addr);
const void* dataaddr() const;
bool m_null;
+ int equal(Par par) const;
+ int equal(Par par, const ICol& icol) const;
int setval(Par par) const;
void calc(Par par, unsigned i);
void calckey(Par par, unsigned i);
@@ -1402,9 +1588,9 @@ Val::Val(const Col& col) :
m_col(col)
{
switch (col.m_type) {
- case NdbDictionary::Column::Unsigned:
+ case Col::Unsigned:
break;
- case NdbDictionary::Column::Char:
+ case Col::Char:
m_char = new unsigned char [col.m_bytelength];
break;
default:
@@ -1417,9 +1603,9 @@ Val::~Val()
{
const Col& col = m_col;
switch (col.m_type) {
- case NdbDictionary::Column::Unsigned:
+ case Col::Unsigned:
break;
- case NdbDictionary::Column::Char:
+ case Col::Char:
delete [] m_char;
break;
default:
@@ -1446,10 +1632,10 @@ Val::copy(const void* addr)
{
const Col& col = m_col;
switch (col.m_type) {
- case NdbDictionary::Column::Unsigned:
+ case Col::Unsigned:
m_uint32 = *(const Uint32*)addr;
break;
- case NdbDictionary::Column::Char:
+ case Col::Char:
memcpy(m_char, addr, col.m_bytelength);
break;
default:
@@ -1464,9 +1650,9 @@ Val::dataaddr() const
{
const Col& col = m_col;
switch (col.m_type) {
- case NdbDictionary::Column::Unsigned:
+ case Col::Unsigned:
return &m_uint32;
- case NdbDictionary::Column::Char:
+ case Col::Char:
return m_char;
default:
break;
@@ -1476,18 +1662,37 @@ Val::dataaddr() const
}
int
-Val::setval(Par par) const
+Val::equal(Par par) const
{
Con& con = par.con();
const Col& col = m_col;
+ assert(col.m_pk && ! m_null);
const char* addr = (const char*)dataaddr();
- if (m_null)
- addr = 0;
- LL5("setval [" << m_col << "] " << *this);
- if (col.m_pk)
- CHK(con.equal(col.m_num, addr) == 0);
- else
- CHK(con.setValue(col.m_num, addr) == 0);
+ LL5("equal [" << col << "] " << *this);
+ CHK(con.equal(col.m_num, addr) == 0);
+ return 0;
+}
+
+int
+Val::equal(Par par, const ICol& icol) const
+{
+ Con& con = par.con();
+ assert(! m_null);
+ const char* addr = (const char*)dataaddr();
+ LL5("equal [" << icol << "] " << *this);
+ CHK(con.equal(icol.m_num, addr) == 0);
+ return 0;
+}
+
+int
+Val::setval(Par par) const
+{
+ Con& con = par.con();
+ const Col& col = m_col;
+ assert(! col.m_pk);
+ const char* addr = ! m_null ? (const char*)dataaddr() : 0;
+ LL5("setval [" << col << "] " << *this);
+ CHK(con.setValue(col.m_num, addr) == 0);
return 0;
}
@@ -1506,10 +1711,10 @@ Val::calckey(Par par, unsigned i)
const Col& col = m_col;
m_null = false;
switch (col.m_type) {
- case NdbDictionary::Column::Unsigned:
+ case Col::Unsigned:
m_uint32 = i;
break;
- case NdbDictionary::Column::Char:
+ case Col::Char:
{
const Chs* chs = col.m_chs;
CHARSET_INFO* cs = chs->m_cs;
@@ -1549,10 +1754,10 @@ Val::calcnokey(Par par)
}
unsigned v = par.m_range + r;
switch (col.m_type) {
- case NdbDictionary::Column::Unsigned:
+ case Col::Unsigned:
m_uint32 = v;
break;
- case NdbDictionary::Column::Char:
+ case Col::Char:
{
const Chs* chs = col.m_chs;
CHARSET_INFO* cs = chs->m_cs;
@@ -1609,7 +1814,7 @@ Val::cmp(const Val& val2) const
col.verify(val2.dataaddr());
// compare
switch (col.m_type) {
- case NdbDictionary::Column::Unsigned:
+ case Col::Unsigned:
{
if (m_uint32 < val2.m_uint32)
return -1;
@@ -1618,7 +1823,7 @@ Val::cmp(const Val& val2) const
return 0;
}
break;
- case NdbDictionary::Column::Char:
+ case Col::Char:
{
const Chs* chs = col.m_chs;
CHARSET_INFO* cs = chs->m_cs;
@@ -1657,10 +1862,10 @@ operator<<(NdbOut& out, const Val& val)
return out;
}
switch (col.m_type) {
- case NdbDictionary::Column::Unsigned:
+ case Col::Unsigned:
out << val.m_uint32;
break;
- case NdbDictionary::Column::Char:
+ case Col::Char:
{
char buf[4 * 8000];
char *p = buf;
@@ -1697,19 +1902,25 @@ struct Row {
const Tab& m_tab;
Val** m_val;
bool m_exist;
- enum Op { NoOp = 0, ReadOp, InsOp, UpdOp, DelOp };
+ enum Op { NoOp = 0, ReadOp = 1, InsOp = 2, UpdOp = 4, DelOp = 8, AnyOp = 15 };
Op m_pending;
+ Row* m_dbrow; // copy of db row before update
Row(const Tab& tab);
~Row();
void copy(const Row& row2);
- void calc(Par par, unsigned i);
+ void calc(Par par, unsigned i, unsigned mask = 0);
+ const Row& dbrow() const;
int verify(const Row& row2) const;
int insrow(Par par);
int updrow(Par par);
+ int updrow(Par par, const ITab& itab);
int delrow(Par par);
+ int delrow(Par par, const ITab& itab);
int selrow(Par par);
+ int selrow(Par par, const ITab& itab);
int setrow(Par par);
int cmp(const Row& row2) const;
+ int cmp(const Row& row2, const ITab& itab) const;
private:
Row& operator=(const Row& row2);
};
@@ -1724,6 +1935,7 @@ Row::Row(const Tab& tab) :
}
m_exist = false;
m_pending = NoOp;
+ m_dbrow = 0;
}
Row::~Row()
@@ -1733,6 +1945,7 @@ Row::~Row()
delete m_val[k];
}
delete [] m_val;
+ delete m_dbrow;
}
void
@@ -1745,27 +1958,49 @@ Row::copy(const Row& row2)
const Val& val2 = *row2.m_val[k];
val.copy(val2);
}
+ m_exist = row2.m_exist;
+ m_pending = row2.m_pending;
+ if (row2.m_dbrow == 0) {
+ m_dbrow = 0;
+ } else {
+ assert(row2.m_dbrow->m_dbrow == 0);
+ if (m_dbrow == 0)
+ m_dbrow = new Row(tab);
+ m_dbrow->copy(*row2.m_dbrow);
+ }
}
void
-Row::calc(Par par, unsigned i)
+Row::calc(Par par, unsigned i, unsigned mask)
{
const Tab& tab = m_tab;
for (unsigned k = 0; k < tab.m_cols; k++) {
- Val& val = *m_val[k];
- val.calc(par, i);
+ if (! (mask & (1 << k))) {
+ Val& val = *m_val[k];
+ val.calc(par, i);
+ }
}
}
+const Row&
+Row::dbrow() const
+{
+ if (m_dbrow == 0)
+ return *this;
+ assert(m_pending == Row::UpdOp || m_pending == Row::DelOp);
+ return *m_dbrow;
+}
+
int
Row::verify(const Row& row2) const
{
const Tab& tab = m_tab;
- assert(&tab == &row2.m_tab && m_exist && row2.m_exist);
+ const Row& row1 = *this;
+ assert(&row1.m_tab == &row2.m_tab && row1.m_exist && row2.m_exist);
for (unsigned k = 0; k < tab.m_cols; k++) {
- const Val& val = *m_val[k];
+ const Val& val1 = *row1.m_val[k];
const Val& val2 = *row2.m_val[k];
- CHK(val.verify(val2) == 0);
+ CHK(val1.verify(val2) == 0);
}
return 0;
}
@@ -1780,7 +2015,15 @@ Row::insrow(Par par)
CHKCON(con.m_op->insertTuple() == 0, con);
for (unsigned k = 0; k < tab.m_cols; k++) {
const Val& val = *m_val[k];
- CHK(val.setval(par) == 0);
+ const Col& col = val.m_col;
+ if (col.m_pk)
+ CHK(val.equal(par) == 0);
+ }
+ for (unsigned k = 0; k < tab.m_cols; k++) {
+ const Val& val = *m_val[k];
+ const Col& col = val.m_col;
+ if (! col.m_pk)
+ CHK(val.setval(par) == 0);
}
m_pending = InsOp;
return 0;
@@ -1797,16 +2040,40 @@ Row::updrow(Par par)
for (unsigned k = 0; k < tab.m_cols; k++) {
const Val& val = *m_val[k];
const Col& col = val.m_col;
+ if (col.m_pk)
+ CHK(val.equal(par) == 0);
+ }
+ for (unsigned k = 0; k < tab.m_cols; k++) {
+ const Val& val = *m_val[k];
+ const Col& col = val.m_col;
if (! col.m_pk)
- continue;
- CHK(val.setval(par) == 0);
+ CHK(val.setval(par) == 0);
+ }
+ m_pending = UpdOp;
+ return 0;
+}
+
+int
+Row::updrow(Par par, const ITab& itab)
+{
+ Con& con = par.con();
+ const Tab& tab = m_tab;
+ assert(itab.m_type == ITab::UniqueHashIndex && &itab.m_tab == &tab);
+ assert(m_exist);
+ CHK(con.getNdbIndexOperation(itab, tab) == 0);
+ CHKCON(con.m_op->updateTuple() == 0, con);
+ for (unsigned k = 0; k < itab.m_icols; k++) {
+ const ICol& icol = *itab.m_icol[k];
+ const Col& col = icol.m_col;
+ unsigned m = col.m_num;
+ const Val& val = *m_val[m];
+ CHK(val.equal(par, icol) == 0);
}
for (unsigned k = 0; k < tab.m_cols; k++) {
const Val& val = *m_val[k];
const Col& col = val.m_col;
- if (col.m_pk)
- continue;
- CHK(val.setval(par) == 0);
+ if (! col.m_pk)
+ CHK(val.setval(par) == 0);
}
m_pending = UpdOp;
return 0;
@@ -1824,7 +2091,27 @@ Row::delrow(Par par)
const Val& val = *m_val[k];
const Col& col = val.m_col;
if (col.m_pk)
- CHK(val.setval(par) == 0);
+ CHK(val.equal(par) == 0);
+ }
+ m_pending = DelOp;
+ return 0;
+}
+
+int
+Row::delrow(Par par, const ITab& itab)
+{
+ Con& con = par.con();
+ const Tab& tab = m_tab;
+ assert(itab.m_type == ITab::UniqueHashIndex && &itab.m_tab == &tab);
+ assert(m_exist);
+ CHK(con.getNdbIndexOperation(itab, tab) == 0);
+ CHKCON(con.m_op->deleteTuple() == 0, con);
+ for (unsigned k = 0; k < itab.m_icols; k++) {
+ const ICol& icol = *itab.m_icol[k];
+ const Col& col = icol.m_col;
+ unsigned m = col.m_num;
+ const Val& val = *m_val[m];
+ CHK(val.equal(par, icol) == 0);
}
m_pending = DelOp;
return 0;
@@ -1841,7 +2128,25 @@ Row::selrow(Par par)
const Val& val = *m_val[k];
const Col& col = val.m_col;
if (col.m_pk)
- CHK(val.setval(par) == 0);
+ CHK(val.equal(par) == 0);
+ }
+ return 0;
+}
+
+int
+Row::selrow(Par par, const ITab& itab)
+{
+ Con& con = par.con();
+ const Tab& tab = m_tab;
+ assert(itab.m_type == ITab::UniqueHashIndex && &itab.m_tab == &tab);
+ CHK(con.getNdbIndexOperation(itab, tab) == 0);
+ CHKCON(con.m_op->readTuple() == 0, con);
+ for (unsigned k = 0; k < itab.m_icols; k++) {
+ const ICol& icol = *itab.m_icol[k];
+ const Col& col = icol.m_col;
+ unsigned m = col.m_num;
+ const Val& val = *m_val[m];
+ CHK(val.equal(par, icol) == 0);
}
return 0;
}
@@ -1876,6 +2181,40 @@ Row::cmp(const Row& row2) const
return c;
}
+int
+Row::cmp(const Row& row2, const ITab& itab) const
+{
+ const Tab& tab = m_tab;
+ int c = 0;
+ for (unsigned i = 0; i < itab.m_icols; i++) {
+ const ICol& icol = *itab.m_icol[i];
+ const Col& col = icol.m_col;
+ unsigned k = col.m_num;
+ assert(k < tab.m_cols);
+ const Val& val = *m_val[k];
+ const Val& val2 = *row2.m_val[k];
+ if ((c = val.cmp(val2)) != 0)
+ break;
+ }
+ return c;
+}
+
+static NdbOut&
+operator<<(NdbOut& out, const Row::Op op)
+{
+ if (op == Row::NoOp)
+ out << "NoOp";
+ else if (op == Row::InsOp)
+ out << "InsOp";
+ else if (op == Row::UpdOp)
+ out << "UpdOp";
+ else if (op == Row::DelOp)
+ out << "DelOp";
+ else
+ out << op;
+ return out;
+}
+
static NdbOut&
operator<<(NdbOut& out, const Row& row)
{
@@ -1885,10 +2224,21 @@ operator<<(NdbOut& out, const Row& row)
out << " ";
out << *row.m_val[i];
}
- out << " [exist=" << row.m_exist;
+ out << " exist=" << row.m_exist;
if (row.m_pending)
out << " pending=" << row.m_pending;
- out << "]";
+ if (row.m_dbrow != 0)
+ out << " [dbrow=" << *row.m_dbrow << "]";
+ return out;
+}
+
+static NdbOut&
+operator<<(NdbOut& out, const Row* rowptr)
+{
+ if (rowptr == 0)
+ out << "null";
+ else
+ out << *rowptr;
return out;
}
@@ -1898,38 +2248,47 @@ struct Set {
const Tab& m_tab;
unsigned m_rows;
Row** m_row;
- Row** m_saverow;
+ unsigned* m_rowkey; // maps row number (from 0) in scan to tuple key
Row* m_keyrow;
NdbRecAttr** m_rec;
Set(const Tab& tab, unsigned rows);
~Set();
void reset();
unsigned count() const;
- // row methods
+ // old and new values
bool exist(unsigned i) const;
- Row::Op pending(unsigned i) const;
+ void dbsave(unsigned i);
+ void calc(Par par, unsigned i, unsigned mask = 0);
+ bool pending(unsigned i, unsigned mask) const;
void notpending(unsigned i);
void notpending(const Lst& lst);
- void calc(Par par, unsigned i);
+ void dbdiscard(unsigned i);
+ void dbdiscard(const Lst& lst);
+ const Row& dbrow(unsigned i) const;
+ // operations
int insrow(Par par, unsigned i);
int updrow(Par par, unsigned i);
+ int updrow(Par par, const ITab& itab, unsigned i);
int delrow(Par par, unsigned i);
- int selrow(Par par, unsigned i);
+ int delrow(Par par, const ITab& itab, unsigned i);
+ int selrow(Par par, const Row& keyrow);
+ int selrow(Par par, const ITab& itab, const Row& keyrow);
+ // set and get
+ void setkey(Par par, const Row& keyrow);
+ void setkey(Par par, const ITab& itab, const Row& keyrow);
int setrow(Par par, unsigned i);
int getval(Par par);
int getkey(Par par, unsigned* i);
- int putval(unsigned i, bool force);
- // set methods
+ int putval(unsigned i, bool force, unsigned n = ~0);
+ // verify
int verify(const Set& set2) const;
- void savepoint();
- void commit();
- void rollback();
+ int verifyorder(const ITab& itab, bool descending) const;
// protect structure
NdbMutex* m_mutex;
- void lock() {
+ void lock() const {
NdbMutex_Lock(m_mutex);
}
- void unlock() {
+ void unlock() const {
NdbMutex_Unlock(m_mutex);
}
private:
@@ -1945,7 +2304,11 @@ Set::Set(const Tab& tab, unsigned rows) :
// allocate on need to save space
m_row[i] = 0;
}
- m_saverow = 0;
+ m_rowkey = new unsigned [m_rows];
+ for (unsigned n = 0; n < m_rows; n++) {
+ // initialize to null
+ m_rowkey[n] = ~0;
+ }
m_keyrow = new Row(tab);
m_rec = new NdbRecAttr* [tab.m_cols];
for (unsigned k = 0; k < tab.m_cols; k++) {
@@ -1959,11 +2322,9 @@ Set::~Set()
{
for (unsigned i = 0; i < m_rows; i++) {
delete m_row[i];
- if (m_saverow != 0)
- delete m_saverow[i];
}
delete [] m_row;
- delete [] m_saverow;
+ delete [] m_rowkey;
delete m_keyrow;
delete [] m_rec;
NdbMutex_Destroy(m_mutex);
@@ -1994,6 +2355,8 @@ Set::count() const
return count;
}
+// old and new values
+
bool
Set::exist(unsigned i) const
{
@@ -2003,13 +2366,37 @@ Set::exist(unsigned i) const
return m_row[i]->m_exist;
}
-Row::Op
-Set::pending(unsigned i) const
+void
+Set::dbsave(unsigned i)
+{
+ const Tab& tab = m_tab;
+ assert(i < m_rows && m_row[i] != 0);
+ Row& row = *m_row[i];
+ LL5("dbsave " << i << ": " << row);
+ assert(row.m_exist && ! row.m_pending && row.m_dbrow == 0);
+ // could swap pointers but making copy is safer
+ Row* rowptr = new Row(tab);
+ rowptr->copy(row);
+ row.m_dbrow = rowptr;
+}
+
+void
+Set::calc(Par par, unsigned i, unsigned mask)
+{
+ const Tab& tab = m_tab;
+ if (m_row[i] == 0)
+ m_row[i] = new Row(tab);
+ Row& row = *m_row[i];
+ row.calc(par, i, mask);
+}
+
+bool
+Set::pending(unsigned i, unsigned mask) const
{
assert(i < m_rows);
if (m_row[i] == 0) // not allocated => not pending
return Row::NoOp;
- return m_row[i]->m_pending;
+ return m_row[i]->m_pending & mask;
}
void
@@ -2017,10 +2404,13 @@ Set::notpending(unsigned i)
{
assert(m_row[i] != 0);
Row& row = *m_row[i];
- if (row.m_pending == Row::InsOp)
+ if (row.m_pending == Row::InsOp) {
row.m_exist = true;
- if (row.m_pending == Row::DelOp)
+ } else if (row.m_pending == Row::UpdOp) {
+ ;
+ } else if (row.m_pending == Row::DelOp) {
row.m_exist = false;
+ }
row.m_pending = Row::NoOp;
}
@@ -2034,15 +2424,35 @@ Set::notpending(const Lst& lst)
}
void
-Set::calc(Par par, unsigned i)
+Set::dbdiscard(unsigned i)
{
- const Tab& tab = m_tab;
- if (m_row[i] == 0)
- m_row[i] = new Row(tab);
+ assert(m_row[i] != 0);
+ Row& row = *m_row[i];
+ LL5("dbdiscard " << i << ": " << row);
+ assert(row.m_dbrow != 0);
+ delete row.m_dbrow;
+ row.m_dbrow = 0;
+}
+
+const Row&
+Set::dbrow(unsigned i) const
+{
+ assert(m_row[i] != 0);
Row& row = *m_row[i];
- row.calc(par, i);
+ return row.dbrow();
+}
+
+void
+Set::dbdiscard(const Lst& lst)
+{
+ for (unsigned j = 0; j < lst.m_cnt; j++) {
+ unsigned i = lst.m_arr[j];
+ dbdiscard(i);
+ }
}
+// operations
+
int
Set::insrow(Par par, unsigned i)
{
@@ -2062,6 +2472,15 @@ Set::updrow(Par par, unsigned i)
}
int
+Set::updrow(Par par, const ITab& itab, unsigned i)
+{
+ assert(m_row[i] != 0);
+ Row& row = *m_row[i];
+ CHK(row.updrow(par, itab) == 0);
+ return 0;
+}
+
+int
Set::delrow(Par par, unsigned i)
{
assert(m_row[i] != 0);
@@ -2071,16 +2490,68 @@ Set::delrow(Par par, unsigned i)
}
int
-Set::selrow(Par par, unsigned i)
+Set::delrow(Par par, const ITab& itab, unsigned i)
+{
+ assert(m_row[i] != 0);
+ Row& row = *m_row[i];
+ CHK(row.delrow(par, itab) == 0);
+ return 0;
+}
+
+int
+Set::selrow(Par par, const Row& keyrow)
{
Con& con = par.con();
- m_keyrow->calc(par, i);
+ const Tab& tab = par.tab();
+ setkey(par, keyrow);
+ LL5("selrow " << tab.m_name << ": keyrow: " << keyrow);
CHK(m_keyrow->selrow(par) == 0);
CHK(getval(par) == 0);
return 0;
}
int
+Set::selrow(Par par, const ITab& itab, const Row& keyrow)
+{
+ Con& con = par.con();
+ setkey(par, itab, keyrow);
+ LL5("selrow " << itab.m_name << ": keyrow: " << keyrow);
+ CHK(m_keyrow->selrow(par, itab) == 0);
+ CHK(getval(par) == 0);
+ return 0;
+}
+
+// set and get
+
+void
+Set::setkey(Par par, const Row& keyrow)
+{
+ const Tab& tab = m_tab;
+ for (unsigned k = 0; k < tab.m_cols; k++) {
+ const Col& col = *tab.m_col[k];
+ if (col.m_pk) {
+ Val& val1 = *m_keyrow->m_val[k];
+ const Val& val2 = *keyrow.dbrow().m_val[k];
+ val1.copy(val2);
+ }
+ }
+}
+
+void
+Set::setkey(Par par, const ITab& itab, const Row& keyrow)
+{
+ const Tab& tab = m_tab;
+ for (unsigned k = 0; k < itab.m_icols; k++) {
+ const ICol& icol = *itab.m_icol[k];
+ const Col& col = icol.m_col;
+ unsigned m = col.m_num;
+ Val& val1 = *m_keyrow->m_val[m];
+ const Val& val2 = *keyrow.dbrow().m_val[m];
+ val1.copy(val2);
+ }
+}
+
+int
Set::setrow(Par par, unsigned i)
{
Con& con = par.con();
@@ -2114,7 +2585,7 @@ Set::getkey(Par par, unsigned* i)
}
int
-Set::putval(unsigned i, bool force)
+Set::putval(unsigned i, bool force, unsigned n)
{
const Tab& tab = m_tab;
if (m_row[i] == 0)
@@ -2135,55 +2606,55 @@ Set::putval(unsigned i, bool force)
}
if (! row.m_exist)
row.m_exist = true;
+ if (n != ~0)
+ m_rowkey[n] = i;
return 0;
}
+// verify
+
int
Set::verify(const Set& set2) const
{
- const Tab& tab = m_tab;
- assert(&tab == &set2.m_tab && m_rows == set2.m_rows);
- LL3("verify set1 count=" << count() << " vs set2 count=" << set2.count());
+ assert(&m_tab == &set2.m_tab && m_rows == set2.m_rows);
+ LL4("verify set1 count=" << count() << " vs set2 count=" << set2.count());
for (unsigned i = 0; i < m_rows; i++) {
- CHK(exist(i) == set2.exist(i));
- if (! exist(i))
- continue;
- Row& row = *m_row[i];
- Row& row2 = *set2.m_row[i];
- CHK(row.verify(row2) == 0);
+ bool ok = true;
+ if (exist(i) != set2.exist(i)) {
+ ok = false;
+ } else if (exist(i)) {
+ if (dbrow(i).verify(set2.dbrow(i)) != 0)
+ ok = false;
+ }
+ if (! ok) {
+ LL1("verify failed: key=" << i << " row1=" << m_row[i] << " row2=" << set2.m_row[i]);
+ CHK(0 == 1);
+ }
}
return 0;
}
-void
-Set::savepoint()
+int
+Set::verifyorder(const ITab& itab, bool descending) const
{
const Tab& tab = m_tab;
- assert(m_saverow == 0);
- m_saverow = new Row* [m_rows];
- for (unsigned i = 0; i < m_rows; i++) {
- if (m_row[i] == 0)
- m_saverow[i] = 0;
- else {
- m_saverow[i] = new Row(tab);
- m_saverow[i]->copy(*m_row[i]);
- }
+ for (unsigned n = 0; n < m_rows; n++) {
+ unsigned i2 = m_rowkey[n];
+ if (i2 == ~0)
+ break;
+ if (n == 0)
+ continue;
+ unsigned i1 = m_rowkey[n - 1];
+ assert(i1 < m_rows && i2 < m_rows);
+ const Row& row1 = *m_row[i1];
+ const Row& row2 = *m_row[i2];
+ assert(row1.m_exist && row2.m_exist);
+ if (! descending)
+ CHK(row1.cmp(row2, itab) <= 0);
+ else
+ CHK(row1.cmp(row2, itab) >= 0);
}
-}
-
-void
-Set::commit()
-{
- delete [] m_saverow;
- m_saverow = 0;
-}
-
-void
-Set::rollback()
-{
- assert(m_saverow != 0);
- m_row = m_saverow;
- m_saverow = 0;
+ return 0;
}
static NdbOut&
@@ -2384,7 +2855,9 @@ BSet::filter(const Set& set, Set& set2) const
for (unsigned i = 0; i < set.m_rows; i++) {
if (! set.exist(i))
continue;
- const Row& row = *set.m_row[i];
+ set.lock();
+ const Row& row = set.dbrow(i);
+ set.unlock();
if (! g_store_null_key) {
bool ok1 = false;
for (unsigned k = 0; k < itab.m_icols; k++) {
@@ -2430,7 +2903,6 @@ BSet::filter(const Set& set, Set& set2) const
Row& row2 = *set2.m_row[i];
assert(! row2.m_exist);
row2.copy(row);
- row2.m_exist = true;
}
}
@@ -2451,15 +2923,16 @@ static int
pkinsert(Par par)
{
Con& con = par.con();
+ const Tab& tab = par.tab();
Set& set = par.set();
- LL3("pkinsert");
+ LL3("pkinsert " << tab.m_name);
CHK(con.startTransaction() == 0);
Lst lst;
for (unsigned j = 0; j < par.m_rows; j++) {
unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2);
set.lock();
- if (set.exist(i) || set.pending(i)) {
+ if (set.exist(i) || set.pending(i, Row::AnyOp)) {
set.unlock();
continue;
}
@@ -2473,7 +2946,7 @@ pkinsert(Par par)
CHK(con.execute(Commit, deadlock) == 0);
con.closeTransaction();
if (deadlock) {
- LL1("pkinsert: stop on deadlock");
+ LL1("pkinsert: stop on deadlock [at 1]");
return 0;
}
set.lock();
@@ -2488,7 +2961,7 @@ pkinsert(Par par)
CHK(con.execute(Commit, deadlock) == 0);
con.closeTransaction();
if (deadlock) {
- LL1("pkinsert: stop on deadlock");
+ LL1("pkinsert: stop on deadlock [at 2]");
return 0;
}
set.lock();
@@ -2504,8 +2977,9 @@ static int
pkupdate(Par par)
{
Con& con = par.con();
+ const Tab& tab = par.tab();
Set& set = par.set();
- LL3("pkupdate");
+ LL3("pkupdate " << tab.m_name);
CHK(con.startTransaction() == 0);
Lst lst;
bool deadlock = false;
@@ -2513,10 +2987,11 @@ pkupdate(Par par)
unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2);
set.lock();
- if (! set.exist(i) || set.pending(i)) {
+ if (! set.exist(i) || set.pending(i, Row::AnyOp)) {
set.unlock();
continue;
}
+ set.dbsave(i);
set.calc(par, i);
CHK(set.updrow(par, i) == 0);
set.unlock();
@@ -2526,12 +3001,13 @@ pkupdate(Par par)
deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0);
if (deadlock) {
- LL1("pkupdate: stop on deadlock");
+ LL1("pkupdate: stop on deadlock [at 1]");
break;
}
con.closeTransaction();
set.lock();
set.notpending(lst);
+ set.dbdiscard(lst);
set.unlock();
lst.reset();
CHK(con.startTransaction() == 0);
@@ -2541,10 +3017,11 @@ pkupdate(Par par)
deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0);
if (deadlock) {
- LL1("pkupdate: stop on deadlock");
+ LL1("pkupdate: stop on deadlock [at 1]");
} else {
set.lock();
set.notpending(lst);
+ set.dbdiscard(lst);
set.unlock();
}
}
@@ -2556,8 +3033,9 @@ static int
pkdelete(Par par)
{
Con& con = par.con();
+ const Tab& tab = par.tab();
Set& set = par.set();
- LL3("pkdelete");
+ LL3("pkdelete " << tab.m_name);
CHK(con.startTransaction() == 0);
Lst lst;
bool deadlock = false;
@@ -2565,7 +3043,7 @@ pkdelete(Par par)
unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2);
set.lock();
- if (! set.exist(i) || set.pending(i)) {
+ if (! set.exist(i) || set.pending(i, Row::AnyOp)) {
set.unlock();
continue;
}
@@ -2577,7 +3055,7 @@ pkdelete(Par par)
deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0);
if (deadlock) {
- LL1("pkdelete: stop on deadlock");
+ LL1("pkdelete: stop on deadlock [at 1]");
break;
}
con.closeTransaction();
@@ -2592,7 +3070,7 @@ pkdelete(Par par)
deadlock = par.m_deadlock;
CHK(con.execute(Commit, deadlock) == 0);
if (deadlock) {
- LL1("pkdelete: stop on deadlock");
+ LL1("pkdelete: stop on deadlock [at 2]");
} else {
set.lock();
set.notpending(lst);
@@ -2609,19 +3087,19 @@ pkread(Par par)
Con& con = par.con();
const Tab& tab = par.tab();
Set& set = par.set();
- LL3((par.m_verify ? "pkverify " : "pkread ") << tab.m_name);
+ LL3("pkread " << tab.m_name << " verify=" << par.m_verify);
// expected
const Set& set1 = set;
Set set2(tab, set.m_rows);
for (unsigned i = 0; i < set.m_rows; i++) {
set.lock();
- if (! set.exist(i) || set.pending(i)) {
+ if (! set.exist(i)) {
set.unlock();
continue;
}
set.unlock();
CHK(con.startTransaction() == 0);
- CHK(set2.selrow(par, i) == 0);
+ CHK(set2.selrow(par, *set1.m_row[i]) == 0);
CHK(con.execute(Commit) == 0);
unsigned i2 = (unsigned)-1;
CHK(set2.getkey(par, &i2) == 0 && i == i2);
@@ -2659,6 +3137,146 @@ pkreadfast(Par par, unsigned count)
return 0;
}
+// hash index operations
+
+static int
+hashindexupdate(Par par, const ITab& itab)
+{
+ Con& con = par.con();
+ Set& set = par.set();
+ LL3("hashindexupdate " << itab.m_name);
+ CHK(con.startTransaction() == 0);
+ Lst lst;
+ bool deadlock = false;
+ for (unsigned j = 0; j < par.m_rows; j++) {
+ unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
+ unsigned i = thrrow(par, j2);
+ set.lock();
+ if (! set.exist(i) || set.pending(i, Row::AnyOp)) {
+ set.unlock();
+ continue;
+ }
+ set.dbsave(i);
+ // index key columns are not re-calculated
+ set.calc(par, i, itab.m_colmask);
+ CHK(set.updrow(par, itab, i) == 0);
+ set.unlock();
+ LL4("hashindexupdate " << i << ": " << *set.m_row[i]);
+ lst.push(i);
+ if (lst.cnt() == par.m_batch) {
+ deadlock = par.m_deadlock;
+ CHK(con.execute(Commit, deadlock) == 0);
+ if (deadlock) {
+ LL1("hashindexupdate: stop on deadlock [at 1]");
+ break;
+ }
+ con.closeTransaction();
+ set.lock();
+ set.notpending(lst);
+ set.dbdiscard(lst);
+ set.unlock();
+ lst.reset();
+ CHK(con.startTransaction() == 0);
+ }
+ }
+ if (! deadlock && lst.cnt() != 0) {
+ deadlock = par.m_deadlock;
+ CHK(con.execute(Commit, deadlock) == 0);
+ if (deadlock) {
+ LL1("hashindexupdate: stop on deadlock [at 1]");
+ } else {
+ set.lock();
+ set.notpending(lst);
+ set.dbdiscard(lst);
+ set.unlock();
+ }
+ }
+ con.closeTransaction();
+ return 0;
+};
+
+static int
+hashindexdelete(Par par, const ITab& itab)
+{
+ Con& con = par.con();
+ Set& set = par.set();
+ LL3("hashindexdelete " << itab.m_name);
+ CHK(con.startTransaction() == 0);
+ Lst lst;
+ bool deadlock = false;
+ for (unsigned j = 0; j < par.m_rows; j++) {
+ unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
+ unsigned i = thrrow(par, j2);
+ set.lock();
+ if (! set.exist(i) || set.pending(i, Row::AnyOp)) {
+ set.unlock();
+ continue;
+ }
+ CHK(set.delrow(par, itab, i) == 0);
+ set.unlock();
+ LL4("hashindexdelete " << i << ": " << *set.m_row[i]);
+ lst.push(i);
+ if (lst.cnt() == par.m_batch) {
+ deadlock = par.m_deadlock;
+ CHK(con.execute(Commit, deadlock) == 0);
+ if (deadlock) {
+ LL1("hashindexdelete: stop on deadlock [at 1]");
+ break;
+ }
+ con.closeTransaction();
+ set.lock();
+ set.notpending(lst);
+ set.unlock();
+ lst.reset();
+ CHK(con.startTransaction() == 0);
+ }
+ }
+ if (! deadlock && lst.cnt() != 0) {
+ deadlock = par.m_deadlock;
+ CHK(con.execute(Commit, deadlock) == 0);
+ if (deadlock) {
+ LL1("hashindexdelete: stop on deadlock [at 2]");
+ } else {
+ set.lock();
+ set.notpending(lst);
+ set.unlock();
+ }
+ }
+ con.closeTransaction();
+ return 0;
+};
+
+static int
+hashindexread(Par par, const ITab& itab)
+{
+ Con& con = par.con();
+ const Tab& tab = par.tab();
+ Set& set = par.set();
+ LL3("hashindexread " << itab.m_name << " verify=" << par.m_verify);
+ // expected
+ const Set& set1 = set;
+ Set set2(tab, set.m_rows);
+ for (unsigned i = 0; i < set.m_rows; i++) {
+ set.lock();
+ if (! set.exist(i)) {
+ set.unlock();
+ continue;
+ }
+ set.unlock();
+ CHK(con.startTransaction() == 0);
+ CHK(set2.selrow(par, itab, *set1.m_row[i]) == 0);
+ CHK(con.execute(Commit) == 0);
+ unsigned i2 = (unsigned)-1;
+ CHK(set2.getkey(par, &i2) == 0 && i == i2);
+ CHK(set2.putval(i, false) == 0);
+ LL4("row " << set2.count() << ": " << *set2.m_row[i]);
+ con.closeTransaction();
+ }
+ if (par.m_verify)
+ CHK(set1.verify(set2) == 0);
+ return 0;
+}
+
// scan read
static int
@@ -2691,14 +3309,14 @@ scanreadtable(Par par)
}
unsigned i = (unsigned)-1;
CHK(set2.getkey(par, &i) == 0);
- CHK(set2.putval(i, false) == 0);
+ CHK(set2.putval(i, false, n) == 0);
LL4("row " << n << ": " << *set2.m_row[i]);
n++;
}
con.closeTransaction();
if (par.m_verify)
CHK(set1.verify(set2) == 0);
- LL3("scanread " << tab.m_name << " rows=" << n);
+ LL3("scanread " << tab.m_name << " done rows=" << n);
return 0;
}
@@ -2745,19 +3363,22 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
// prefer proper subset
if (0 < n && n < set.m_rows)
break;
- if (urandom(5) == 0)
+ if (urandom(3) == 0)
break;
set1.reset();
}
} else {
bset.filter(set, set1);
}
- LL3("scanread " << itab.m_name << " bounds=" << bset.m_bvals << " verify=" << par.m_verify);
+ LL3("scanread " << itab.m_name << " bounds=" << bset << " verify=" << par.m_verify << " ordered=" << par.m_ordered << " descending=" << par.m_descending);
LL4("expect " << set1.count() << " rows");
Set set2(tab, set.m_rows);
CHK(con.startTransaction() == 0);
CHK(con.getNdbScanOperation(itab, tab) == 0);
- CHK(con.openScanRead(par.m_scanbat, par.m_scanpar) == 0);
+ if (! par.m_ordered)
+ CHK(con.openScanRead(par.m_scanbat, par.m_scanpar) == 0);
+ else
+ CHK(con.openScanOrdered(par.m_scanbat, par.m_scanpar, par.m_descending) == 0);
CHK(bset.setbnd(par) == 0);
set2.getval(par);
CHK(con.executeScan() == 0);
@@ -2775,15 +3396,17 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
}
unsigned i = (unsigned)-1;
CHK(set2.getkey(par, &i) == 0);
- LL4("key " << i);
- CHK(set2.putval(i, par.m_dups) == 0);
- LL4("row " << n << ": " << *set2.m_row[i]);
+ CHK(set2.putval(i, par.m_dups, n) == 0);
+ LL4("key " << i << " row " << n << ": " << *set2.m_row[i]);
n++;
}
con.closeTransaction();
- if (par.m_verify)
+ if (par.m_verify) {
CHK(set1.verify(set2) == 0);
- LL3("scanread " << itab.m_name << " rows=" << n);
+ if (par.m_ordered)
+ CHK(set2.verifyorder(itab, par.m_descending) == 0);
+ }
+ LL3("scanread " << itab.m_name << " done rows=" << n);
return 0;
}
@@ -2821,8 +3444,10 @@ scanreadindex(Par par, const ITab& itab)
{
const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_subsubloop; i++) {
- BSet bset(tab, itab, par.m_rows);
- CHK(scanreadindex(par, itab, bset, true) == 0);
+ if (itab.m_type == ITab::OrderedIndex) {
+ BSet bset(tab, itab, par.m_rows);
+ CHK(scanreadindex(par, itab, bset, true) == 0);
+ }
}
return 0;
}
@@ -2835,7 +3460,11 @@ scanreadindex(Par par)
if (tab.m_itab[i] == 0)
continue;
const ITab& itab = *tab.m_itab[i];
- CHK(scanreadindex(par, itab) == 0);
+ if (itab.m_type == ITab::OrderedIndex) {
+ CHK(scanreadindex(par, itab) == 0);
+ } else {
+ CHK(hashindexread(par, itab) == 0);
+ }
}
return 0;
}
@@ -2932,7 +3561,7 @@ scanupdatetable(Par par)
if (ret == 1)
break;
if (deadlock) {
- LL1("scanupdatetable: stop on deadlock");
+ LL1("scanupdatetable: stop on deadlock [at 1]");
break;
}
if (par.m_scanstop != 0 && urandom(par.m_scanstop) == 0) {
@@ -2944,13 +3573,14 @@ scanupdatetable(Par par)
CHK(set2.getkey(par, &i) == 0);
const Row& row = *set.m_row[i];
set.lock();
- if (! set.exist(i) || set.pending(i)) {
+ if (! set.exist(i) || set.pending(i, Row::AnyOp)) {
LL4("scan update " << tab.m_name << ": skip: " << row);
} else {
CHKTRY(set2.putval(i, false) == 0, set.unlock());
CHKTRY(con.updateScanTuple(con2) == 0, set.unlock());
Par par2 = par;
par2.m_con = &con2;
+ set.dbsave(i);
set.calc(par, i);
CHKTRY(set.setrow(par2, i) == 0, set.unlock());
LL4("scan update " << tab.m_name << ": " << row);
@@ -2961,12 +3591,13 @@ scanupdatetable(Par par)
deadlock = par.m_deadlock;
CHK(con2.execute(Commit, deadlock) == 0);
if (deadlock) {
- LL1("scanupdateindex: stop on deadlock");
+ LL1("scanupdatetable: stop on deadlock [at 2]");
goto out;
}
con2.closeTransaction();
set.lock();
set.notpending(lst);
+ set.dbdiscard(lst);
set.unlock();
count += lst.cnt();
lst.reset();
@@ -2977,12 +3608,13 @@ scanupdatetable(Par par)
deadlock = par.m_deadlock;
CHK(con2.execute(Commit, deadlock) == 0);
if (deadlock) {
- LL1("scanupdateindex: stop on deadlock");
+ LL1("scanupdatetable: stop on deadlock [at 3]");
goto out;
}
con2.closeTransaction();
set.lock();
set.notpending(lst);
+ set.dbdiscard(lst);
set.unlock();
count += lst.cnt();
lst.reset();
@@ -3009,7 +3641,10 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
Set set2(tab, set.m_rows);
CHK(con.startTransaction() == 0);
CHK(con.getNdbScanOperation(itab, tab) == 0);
- CHK(con.openScanExclusive(par.m_scanbat, par.m_scanpar) == 0);
+ if (! par.m_ordered)
+ CHK(con.openScanExclusive(par.m_scanbat, par.m_scanpar) == 0);
+ else
+ CHK(con.openScanOrderedExclusive(par.m_scanbat, par.m_scanpar, par.m_descending) == 0);
CHK(bset.setbnd(par) == 0);
set2.getval(par);
CHK(con.executeScan() == 0);
@@ -3027,7 +3662,7 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
if (ret == 1)
break;
if (deadlock) {
- LL1("scanupdateindex: stop on deadlock");
+ LL1("scanupdateindex: stop on deadlock [at 1]");
break;
}
if (par.m_scanstop != 0 && urandom(par.m_scanstop) == 0) {
@@ -3039,13 +3674,14 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
CHK(set2.getkey(par, &i) == 0);
const Row& row = *set.m_row[i];
set.lock();
- if (! set.exist(i) || set.pending(i)) {
+ if (! set.exist(i) || set.pending(i, Row::AnyOp)) {
LL4("scan update " << itab.m_name << ": skip: " << row);
} else {
CHKTRY(set2.putval(i, par.m_dups) == 0, set.unlock());
CHKTRY(con.updateScanTuple(con2) == 0, set.unlock());
Par par2 = par;
par2.m_con = &con2;
+ set.dbsave(i);
set.calc(par, i);
CHKTRY(set.setrow(par2, i) == 0, set.unlock());
LL4("scan update " << itab.m_name << ": " << row);
@@ -3056,12 +3692,13 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
deadlock = par.m_deadlock;
CHK(con2.execute(Commit, deadlock) == 0);
if (deadlock) {
- LL1("scanupdateindex: stop on deadlock");
+ LL1("scanupdateindex: stop on deadlock [at 2]");
goto out;
}
con2.closeTransaction();
set.lock();
set.notpending(lst);
+ set.dbdiscard(lst);
set.unlock();
count += lst.cnt();
lst.reset();
@@ -3072,12 +3709,13 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
deadlock = par.m_deadlock;
CHK(con2.execute(Commit, deadlock) == 0);
if (deadlock) {
- LL1("scanupdateindex: stop on deadlock");
+ LL1("scanupdateindex: stop on deadlock [at 3]");
goto out;
}
con2.closeTransaction();
set.lock();
set.notpending(lst);
+ set.dbdiscard(lst);
set.unlock();
count += lst.cnt();
lst.reset();
@@ -3097,9 +3735,13 @@ scanupdateindex(Par par, const ITab& itab)
{
const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_subsubloop; i++) {
- BSet bset(tab, itab, par.m_rows);
- bset.calc(par);
- CHK(scanupdateindex(par, itab, bset) == 0);
+ if (itab.m_type == ITab::OrderedIndex) {
+ BSet bset(tab, itab, par.m_rows);
+ bset.calc(par);
+ CHK(scanupdateindex(par, itab, bset) == 0);
+ } else {
+ CHK(hashindexupdate(par, itab) == 0);
+ }
}
return 0;
}
@@ -3151,8 +3793,12 @@ readverifyfull(Par par)
unsigned i = par.m_no - 1;
if (i < tab.m_itabs && tab.m_itab[i] != 0) {
const ITab& itab = *tab.m_itab[i];
- BSet bset(tab, itab, par.m_rows);
- CHK(scanreadindex(par, itab, bset, false) == 0);
+ if (itab.m_type == ITab::OrderedIndex) {
+ BSet bset(tab, itab, par.m_rows);
+ CHK(scanreadindex(par, itab, bset, false) == 0);
+ } else {
+ CHK(hashindexread(par, itab) == 0);
+ }
}
}
return 0;
@@ -3162,6 +3808,11 @@ static int
readverifyindex(Par par)
{
par.m_verify = true;
+ unsigned sel = urandom(10);
+ if (sel < 9) {
+ par.m_ordered = true;
+ par.m_descending = (sel < 5);
+ }
CHK(scanreadindex(par) == 0);
return 0;
}
@@ -3169,26 +3820,56 @@ readverifyindex(Par par)
static int
pkops(Par par)
{
+ const Tab& tab = par.tab();
par.m_randomkey = true;
for (unsigned i = 0; i < par.m_subsubloop; i++) {
+ unsigned j = 0;
+ while (j < tab.m_itabs) {
+ if (tab.m_itab[j] != 0) {
+ const ITab& itab = *tab.m_itab[j];
+ if (itab.m_type == ITab::UniqueHashIndex && urandom(5) == 0)
+ break;
+ }
+ j++;
+ }
unsigned sel = urandom(10);
if (par.m_slno % 2 == 0) {
// favor insert
if (sel < 8) {
CHK(pkinsert(par) == 0);
} else if (sel < 9) {
- CHK(pkupdate(par) == 0);
+ if (j == tab.m_itabs)
+ CHK(pkupdate(par) == 0);
+ else {
+ const ITab& itab = *tab.m_itab[j];
+ CHK(hashindexupdate(par, itab) == 0);
+ }
} else {
- CHK(pkdelete(par) == 0);
+ if (j == tab.m_itabs)
+ CHK(pkdelete(par) == 0);
+ else {
+ const ITab& itab = *tab.m_itab[j];
+ CHK(hashindexdelete(par, itab) == 0);
+ }
}
} else {
// favor delete
if (sel < 1) {
CHK(pkinsert(par) == 0);
} else if (sel < 2) {
- CHK(pkupdate(par) == 0);
+ if (j == tab.m_itabs)
+ CHK(pkupdate(par) == 0);
+ else {
+ const ITab& itab = *tab.m_itab[j];
+ CHK(hashindexupdate(par, itab) == 0);
+ }
} else {
- CHK(pkdelete(par) == 0);
+ if (j == tab.m_itabs)
+ CHK(pkdelete(par) == 0);
+ else {
+ const ITab& itab = *tab.m_itab[j];
+ CHK(hashindexdelete(par, itab) == 0);
+ }
}
}
}
@@ -3208,6 +3889,10 @@ pkupdatescanread(Par par)
CHK(scanreadtable(par) == 0);
} else {
par.m_verify = false;
+ if (sel < 8) {
+ par.m_ordered = true;
+ par.m_descending = (sel < 7);
+ }
CHK(scanreadindex(par) == 0);
}
return 0;
@@ -3227,6 +3912,10 @@ mixedoperations(Par par)
} else if (sel < 6) {
CHK(scanupdatetable(par) == 0);
} else {
+ if (sel < 8) {
+ par.m_ordered = true;
+ par.m_descending = (sel < 7);
+ }
CHK(scanupdateindex(par) == 0);
}
return 0;
@@ -3720,7 +4409,7 @@ printtables()
{
Par par(g_opt);
makebuiltintables(par);
- ndbout << "builtin tables (index x0 is on table pk):" << endl;
+ ndbout << "builtin tables (x0 on pk, x=ordered z=hash):" << endl;
for (unsigned j = 0; j < tabcount; j++) {
if (tablist[j] == 0)
continue;
@@ -3744,6 +4433,7 @@ runtest(Par par)
LL1("random seed: " << seed);
srandom((unsigned)seed);
} else if (par.m_seed != 0)
+ LL1("random seed: " << par.m_seed);
srandom(par.m_seed);
// cs
assert(par.m_csname != 0);
@@ -3953,7 +4643,8 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
if (strcmp(arg, "-threads") == 0) {
if (++argv, --argc > 0) {
g_opt.m_threads = atoi(argv[0]);
- continue;
+ if (1 <= g_opt.m_threads)
+ continue;
}
}
if (strcmp(arg, "-v") == 0) {
@@ -3970,7 +4661,7 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
printhelp();
goto wrongargs;
}
- ndbout << "testOIBasic: unknown option " << arg;
+ ndbout << "testOIBasic: bad or unknown option " << arg;
goto usage;
}
{