diff options
author | unknown <pekka@mysql.com> | 2005-01-07 11:55:20 +0100 |
---|---|---|
committer | unknown <pekka@mysql.com> | 2005-01-07 11:55:20 +0100 |
commit | 2331344909a96d2acb4fb33ea649c22be6dbfe52 (patch) | |
tree | 56e42d07f6ec8428d0ac03e0621935d6a69a10ad /ndb/test | |
parent | bec9de67d4b780da8a5b5b7ecb5a31534434b823 (diff) | |
download | mariadb-git-2331344909a96d2acb4fb33ea649c22be6dbfe52.tar.gz |
ndb - wl-1442 new varchar
mysql-test/r/ndb_alter_table.result:
wl-1442 new varchar
mysql-test/r/ndb_bitfield.result:
wl-1442 new varchar
mysql-test/r/ndb_charset.result:
wl-1442 new varchar
mysql-test/t/ndb_charset.test:
wl-1442 new varchar
ndb/include/kernel/signaldata/DictTabInfo.hpp:
wl-1442 new varchar
ndb/include/ndb_constants.h:
wl-1442 new varchar
ndb/include/ndbapi/NdbDictionary.hpp:
wl-1442 new varchar
ndb/include/util/NdbSqlUtil.hpp:
wl-1442 new varchar
ndb/src/common/util/NdbSqlUtil.cpp:
wl-1442 new varchar
ndb/src/kernel/blocks/dbacc/DbaccMain.cpp:
wl-1442 new varchar
ndb/src/kernel/blocks/dbdict/Dbdict.cpp:
wl-1442 new varchar
ndb/src/kernel/blocks/dbtc/Dbtc.hpp:
wl-1442 new varchar
ndb/src/kernel/blocks/dbtc/DbtcMain.cpp:
wl-1442 new varchar
ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp:
wl-1442 new varchar
ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp:
wl-1442 new varchar
ndb/src/ndbapi/NdbDictionary.cpp:
wl-1442 new varchar
ndb/src/ndbapi/NdbDictionaryImpl.cpp:
wl-1442 new varchar
ndb/src/ndbapi/NdbDictionaryImpl.hpp:
wl-1442 new varchar
ndb/src/ndbapi/NdbOperationDefine.cpp:
wl-1442 new varchar
ndb/src/ndbapi/NdbOperationSearch.cpp:
wl-1442 new varchar
ndb/src/ndbapi/NdbRecAttr.cpp:
wl-1442 new varchar
ndb/src/ndbapi/NdbTransaction.cpp:
wl-1442 new varchar
ndb/test/ndbapi/testOIBasic.cpp:
wl-1442 new varchar
sql/ha_ndbcluster.cc:
wl-1442 new varchar
Diffstat (limited to 'ndb/test')
-rw-r--r-- | ndb/test/ndbapi/testOIBasic.cpp | 628 |
1 files changed, 401 insertions, 227 deletions
diff --git a/ndb/test/ndbapi/testOIBasic.cpp b/ndb/test/ndbapi/testOIBasic.cpp index adaccebed6e..8a09a2ec9d1 100644 --- a/ndb/test/ndbapi/testOIBasic.cpp +++ b/ndb/test/ndbapi/testOIBasic.cpp @@ -28,7 +28,9 @@ #include <NdbCondition.h> #include <NdbThread.h> #include <NdbTick.h> +#include <NdbSleep.h> #include <my_sys.h> +#include <NdbSqlUtil.hpp> // options @@ -72,7 +74,7 @@ struct Opt { m_die(0), m_dups(false), m_fragtype(NdbDictionary::Object::FragUndefined), - m_subsubloop(2), + m_subsubloop(4), m_index(0), m_loop(1), m_msglock(true), @@ -87,7 +89,7 @@ struct Opt { m_seed(-1), m_subloop(4), m_table(0), - m_threads(10), + m_threads(4), m_v(1) { } }; @@ -257,6 +259,7 @@ struct Par : public Opt { bool m_verify; // deadlock possible bool m_deadlock; + NdbOperation::LockMode m_lockmode; // ordered range scan bool m_ordered; bool m_descending; @@ -278,6 +281,7 @@ struct Par : public Opt { m_randomkey(false), m_verify(false), m_deadlock(false), + m_lockmode(NdbOperation::LM_Read), m_ordered(false), m_descending(false) { } @@ -598,7 +602,9 @@ getcs(Par par) struct Col { enum Type { Unsigned = NdbDictionary::Column::Unsigned, - Char = NdbDictionary::Column::Char + Char = NdbDictionary::Column::Char, + Varchar = NdbDictionary::Column::Varchar, + Longvarchar = NdbDictionary::Column::Longvarchar }; const class Tab& m_tab; unsigned m_num; @@ -612,7 +618,7 @@ struct Col { Col(const class Tab& tab, unsigned num, const char* name, bool pk, Type type, unsigned length, bool nullable, const Chs* chs); ~Col(); bool equal(const Col& col2) const; - void verify(const void* addr) const; + void wellformed(const void* addr) const; }; Col::Col(const class Tab& tab, unsigned num, const char* name, bool pk, Type type, unsigned length, bool nullable, const Chs* chs) : @@ -626,6 +632,9 @@ Col::Col(const class Tab& tab, unsigned num, const char* name, bool pk, Type typ m_nullable(nullable), m_chs(chs) { + // fix long varchar + if (type == Varchar && m_bytelength > 255) + m_type = Longvarchar; } Col::~Col() @@ -640,7 +649,7 @@ Col::equal(const Col& col2) const } void -Col::verify(const void* addr) const +Col::wellformed(const void* addr) const { switch (m_type) { case Col::Unsigned: @@ -653,6 +662,26 @@ Col::verify(const void* addr) const assert((*cs->cset->well_formed_len)(cs, src, src + len, 0xffff) == len); } break; + case Col::Varchar: + { + CHARSET_INFO* cs = m_chs->m_cs; + const unsigned char* src = (const unsigned char*)addr; + const char* ssrc = (const char*)src; + unsigned len = src[0]; + assert(len <= m_bytelength); + assert((*cs->cset->well_formed_len)(cs, ssrc + 1, ssrc + 1 + len, 0xffff) == len); + } + break; + case Col::Longvarchar: + { + CHARSET_INFO* cs = m_chs->m_cs; + const unsigned char* src = (const unsigned char*)addr; + const char* ssrc = (const char*)src; + unsigned len = src[0] + (src[1] << 8); + assert(len <= m_bytelength); + assert((*cs->cset->well_formed_len)(cs, ssrc + 2, ssrc + 2 + len, 0xffff) == len); + } + break; default: assert(false); break; @@ -673,6 +702,18 @@ operator<<(NdbOut& out, const Col& col) out << " char(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")"; } break; + case Col::Varchar: + { + CHARSET_INFO* cs = col.m_chs->m_cs; + out << " varchar(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")"; + } + break; + case Col::Longvarchar: + { + CHARSET_INFO* cs = col.m_chs->m_cs; + out << " longvarchar(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")"; + } + break; default: out << "type" << (int)col.m_type; assert(false); @@ -928,35 +969,27 @@ makebuiltintables(Par par) t->itabadd(2, x); } if (useindex(par, 3)) { - // d, c, b - ITab* x = new ITab(*t, "ti0x3", ITab::OrderedIndex, 3); - x->icoladd(0, new ICol(*x, 0, *t->m_col[3])); - x->icoladd(1, new ICol(*x, 1, *t->m_col[2])); - x->icoladd(2, new ICol(*x, 2, *t->m_col[1])); - t->itabadd(3, x); - } - if (useindex(par, 4)) { // b, e, c, d - ITab* x = new ITab(*t, "ti0x4", ITab::OrderedIndex, 4); + ITab* x = new ITab(*t, "ti0x3", ITab::OrderedIndex, 4); x->icoladd(0, new ICol(*x, 0, *t->m_col[1])); x->icoladd(1, new ICol(*x, 1, *t->m_col[4])); x->icoladd(2, new ICol(*x, 2, *t->m_col[2])); x->icoladd(3, new ICol(*x, 3, *t->m_col[3])); - t->itabadd(4, x); + t->itabadd(3, x); } - if (useindex(par, 5)) { + if (useindex(par, 4)) { // a, c - ITab* x = new ITab(*t, "ti0z5", ITab::UniqueHashIndex, 2); + ITab* x = new ITab(*t, "ti0z4", ITab::UniqueHashIndex, 2); x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); x->icoladd(1, new ICol(*x, 1, *t->m_col[2])); - t->itabadd(5, x); + t->itabadd(4, x); } - if (useindex(par, 6)) { + if (useindex(par, 5)) { // a, e - ITab* x = new ITab(*t, "ti0z6", ITab::UniqueHashIndex, 2); + ITab* x = new ITab(*t, "ti0z5", ITab::UniqueHashIndex, 2); x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); x->icoladd(1, new ICol(*x, 1, *t->m_col[4])); - t->itabadd(6, x); + t->itabadd(5, x); } tablist[0] = t; } @@ -967,56 +1000,50 @@ makebuiltintables(Par par) t->coladd(0, new Col(*t, 0, "a", 0, Col::Unsigned, 1, 0, 0)); t->coladd(1, new Col(*t, 1, "b", 1, Col::Unsigned, 1, 0, 0)); t->coladd(2, new Col(*t, 2, "c", 0, Col::Char, 20, 1, getcs(par))); - t->coladd(3, new Col(*t, 3, "d", 0, Col::Char, 5, 0, getcs(par))); - t->coladd(4, new Col(*t, 4, "e", 0, Col::Char, 5, 1, getcs(par))); + t->coladd(3, new Col(*t, 3, "d", 0, Col::Varchar, 5, 0, getcs(par))); + t->coladd(4, new Col(*t, 4, "e", 0, Col::Longvarchar, 5, 1, getcs(par))); if (useindex(par, 0)) { // b ITab* x = new ITab(*t, "ti1x0", ITab::OrderedIndex, 1); x->icoladd(0, new ICol(*x, 0, *t->m_col[1])); + t->itabadd(0, x); } if (useindex(par, 1)) { - // a, c + // c, a ITab* x = new ITab(*t, "ti1x1", ITab::OrderedIndex, 2); - x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); - x->icoladd(1, new ICol(*x, 1, *t->m_col[2])); + x->icoladd(0, new ICol(*x, 0, *t->m_col[2])); + x->icoladd(1, new ICol(*x, 1, *t->m_col[0])); t->itabadd(1, x); } if (useindex(par, 2)) { - // c, a - ITab* x = new ITab(*t, "ti1x2", ITab::OrderedIndex, 2); - x->icoladd(0, new ICol(*x, 0, *t->m_col[2])); - x->icoladd(1, new ICol(*x, 1, *t->m_col[0])); + // d + ITab* x = new ITab(*t, "ti1x2", ITab::OrderedIndex, 1); + x->icoladd(0, new ICol(*x, 0, *t->m_col[3])); t->itabadd(2, x); } if (useindex(par, 3)) { - // e - ITab* x = new ITab(*t, "ti1x3", ITab::OrderedIndex, 1); - x->icoladd(0, new ICol(*x, 0, *t->m_col[4])); - t->itabadd(3, x); - } - if (useindex(par, 4)) { // e, d, c, b - ITab* x = new ITab(*t, "ti1x4", ITab::OrderedIndex, 4); + ITab* x = new ITab(*t, "ti1x3", ITab::OrderedIndex, 4); x->icoladd(0, new ICol(*x, 0, *t->m_col[4])); x->icoladd(1, new ICol(*x, 1, *t->m_col[3])); x->icoladd(2, new ICol(*x, 2, *t->m_col[2])); x->icoladd(3, new ICol(*x, 3, *t->m_col[1])); - t->itabadd(4, x); + t->itabadd(3, x); } - if (useindex(par, 5)) { + if (useindex(par, 4)) { // a, b - ITab* x = new ITab(*t, "ti1z5", ITab::UniqueHashIndex, 2); + ITab* x = new ITab(*t, "ti1z4", ITab::UniqueHashIndex, 2); x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); x->icoladd(1, new ICol(*x, 1, *t->m_col[1])); - t->itabadd(5, x); + t->itabadd(4, x); } - if (useindex(par, 6)) { + if (useindex(par, 5)) { // a, b, d - ITab* x = new ITab(*t, "ti1z6", ITab::UniqueHashIndex, 3); + ITab* x = new ITab(*t, "ti1z5", ITab::UniqueHashIndex, 3); x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); x->icoladd(1, new ICol(*x, 1, *t->m_col[1])); x->icoladd(2, new ICol(*x, 2, *t->m_col[3])); - t->itabadd(6, x); + t->itabadd(5, x); } tablist[1] = t; } @@ -1027,8 +1054,8 @@ makebuiltintables(Par par) t->coladd(0, new Col(*t, 0, "a", 1, Col::Char, 31, 0, getcs(par))); t->coladd(1, new Col(*t, 1, "b", 0, Col::Char, 4, 1, getcs(par))); t->coladd(2, new Col(*t, 2, "c", 1, Col::Unsigned, 1, 0, 0)); - t->coladd(3, new Col(*t, 3, "d", 1, Col::Char, 3, 0, getcs(par))); - t->coladd(4, new Col(*t, 4, "e", 0, Col::Char, 17, 0, getcs(par))); + t->coladd(3, new Col(*t, 3, "d", 1, Col::Varchar, 128, 0, getcs(par))); + t->coladd(4, new Col(*t, 4, "e", 0, Col::Varchar, 7, 0, getcs(par))); if (useindex(par, 0)) { // a, c, d ITab* x = new ITab(*t, "ti2x0", ITab::OrderedIndex, 3); @@ -1060,27 +1087,20 @@ makebuiltintables(Par par) t->itabadd(3, x); } if (useindex(par, 4)) { - // a, e - ITab* x = new ITab(*t, "ti2x4", ITab::OrderedIndex, 2); - x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); - x->icoladd(1, new ICol(*x, 1, *t->m_col[4])); - t->itabadd(4, x); - } - if (useindex(par, 5)) { // a, c - ITab* x = new ITab(*t, "ti2z5", ITab::UniqueHashIndex, 2); + ITab* x = new ITab(*t, "ti2z4", ITab::UniqueHashIndex, 2); x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); x->icoladd(1, new ICol(*x, 1, *t->m_col[2])); - t->itabadd(5, x); + t->itabadd(4, x); } - if (useindex(par, 6)) { + if (useindex(par, 5)) { // a, c, d, e - ITab* x = new ITab(*t, "ti2z6", ITab::UniqueHashIndex, 4); + ITab* x = new ITab(*t, "ti2z5", ITab::UniqueHashIndex, 4); x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); x->icoladd(1, new ICol(*x, 1, *t->m_col[2])); x->icoladd(2, new ICol(*x, 2, *t->m_col[3])); x->icoladd(3, new ICol(*x, 3, *t->m_col[4])); - t->itabadd(6, x); + t->itabadd(5, x); } tablist[2] = t; } @@ -1115,19 +1135,19 @@ struct Con { void disconnect(); int startTransaction(); int getNdbOperation(const Tab& tab); + int getNdbIndexOperation1(const ITab& itab, const Tab& tab); int getNdbIndexOperation(const ITab& itab, const Tab& tab); int getNdbScanOperation(const Tab& tab); - int getNdbScanOperation(const ITab& itab, const Tab& tab); + int getNdbIndexScanOperation1(const ITab& itab, const Tab& tab); + int getNdbIndexScanOperation(const ITab& itab, const Tab& tab); int equal(int num, const char* addr); int getValue(int num, NdbRecAttr*& rec); int setValue(int num, const char* addr); int setBound(int num, int type, const void* value); int execute(ExecType t); int execute(ExecType t, bool& deadlock); - int openScanRead(unsigned scanbat, unsigned scanpar); - int openScanExclusive(unsigned scanbat, unsigned scanpar); - int openScanOrdered(unsigned scanbat, unsigned scanpar, bool descending); - int openScanOrderedExclusive(unsigned scanbat, unsigned scanpar, bool descending); + int readTuples(Par par); + int readIndexTuples(Par par); int executeScan(); int nextScanResult(bool fetchAllowed); int nextScanResult(bool fetchAllowed, bool& deadlock); @@ -1182,7 +1202,7 @@ Con::getNdbOperation(const Tab& tab) } int -Con::getNdbIndexOperation(const ITab& itab, const Tab& tab) +Con::getNdbIndexOperation1(const ITab& itab, const Tab& tab) { assert(m_tx != 0); CHKCON((m_op = m_indexop = m_tx->getNdbIndexOperation(itab.m_name, tab.m_name)) != 0, *this); @@ -1190,6 +1210,20 @@ Con::getNdbIndexOperation(const ITab& itab, const Tab& tab) } int +Con::getNdbIndexOperation(const ITab& itab, const Tab& tab) +{ + assert(m_tx != 0); + unsigned tries = 0; + while (1) { + if (getNdbIndexOperation1(itab, tab) == 0) + break; + CHK(++tries < 10); + NdbSleep_MilliSleep(100); + } + return 0; +} + +int Con::getNdbScanOperation(const Tab& tab) { assert(m_tx != 0); @@ -1198,7 +1232,7 @@ Con::getNdbScanOperation(const Tab& tab) } int -Con::getNdbScanOperation(const ITab& itab, const Tab& tab) +Con::getNdbIndexScanOperation1(const ITab& itab, const Tab& tab) { assert(m_tx != 0); CHKCON((m_op = m_scanop = m_indexscanop = m_tx->getNdbIndexScanOperation(itab.m_name, tab.m_name)) != 0, *this); @@ -1206,6 +1240,20 @@ Con::getNdbScanOperation(const ITab& itab, const Tab& tab) } int +Con::getNdbIndexScanOperation(const ITab& itab, const Tab& tab) +{ + assert(m_tx != 0); + unsigned tries = 0; + while (1) { + if (getNdbIndexScanOperation1(itab, tab) == 0) + break; + CHK(++tries < 10); + NdbSleep_MilliSleep(100); + } + return 0; +} + +int Con::equal(int num, const char* addr) { assert(m_tx != 0 && m_op != 0); @@ -1262,42 +1310,21 @@ Con::execute(ExecType t, bool& deadlock) } int -Con::openScanRead(unsigned scanbat, unsigned scanpar) +Con::readTuples(Par par) { assert(m_tx != 0 && m_scanop != 0); - NdbOperation::LockMode lm = NdbOperation::LM_Read; - CHKCON(m_scanop->readTuples(lm, scanbat, scanpar) == 0, *this); + CHKCON(m_scanop->readTuples(par.m_lockmode, par.m_scanbat, par.m_scanpar) == 0, *this); return 0; } int -Con::openScanExclusive(unsigned scanbat, unsigned scanpar) -{ - assert(m_tx != 0 && m_scanop != 0); - NdbOperation::LockMode lm = NdbOperation::LM_Exclusive; - CHKCON(m_scanop->readTuples(lm, scanbat, scanpar) == 0, *this); - return 0; -} - -int -Con::openScanOrdered(unsigned scanbat, unsigned scanpar, bool descending) -{ - assert(m_tx != 0 && m_indexscanop != 0); - NdbOperation::LockMode lm = NdbOperation::LM_Read; - CHKCON(m_indexscanop->readTuples(lm, scanbat, scanpar, true, descending) == 0, *this); - return 0; -} - -int -Con::openScanOrderedExclusive(unsigned scanbat, unsigned scanpar, bool descending) +Con::readIndexTuples(Par par) { assert(m_tx != 0 && m_indexscanop != 0); - NdbOperation::LockMode lm = NdbOperation::LM_Exclusive; - CHKCON(m_indexscanop->readTuples(lm, scanbat, scanpar, true, descending) == 0, *this); + CHKCON(m_indexscanop->readTuples(par.m_lockmode, par.m_scanbat, par.m_scanpar, par.m_ordered, par.m_descending) == 0, *this); return 0; } - int Con::executeScan() { @@ -1564,6 +1591,8 @@ struct Val { union { Uint32 m_uint32; unsigned char* m_char; + unsigned char* m_varchar; + unsigned char* m_longvarchar; }; Val(const Col& col); ~Val(); @@ -1576,9 +1605,12 @@ struct Val { int setval(Par par) const; void calc(Par par, unsigned i); void calckey(Par par, unsigned i); + void calckeychars(Par par, unsigned i, unsigned& n, unsigned char* buf); void calcnokey(Par par); - int verify(const Val& val2) const; - int cmp(const Val& val2) const; + void calcnokeychars(Par par, unsigned& n, unsigned char* buf); + int verify(Par par, const Val& val2) const; + int cmp(Par par, const Val& val2) const; + int cmpchars(Par par, const unsigned char* buf1, unsigned len1, const unsigned char* buf2, unsigned len2) const; private: Val& operator=(const Val& val2); }; @@ -1595,6 +1627,12 @@ Val::Val(const Col& col) : case Col::Char: m_char = new unsigned char [col.m_bytelength]; break; + case Col::Varchar: + m_varchar = new unsigned char [1 + col.m_bytelength]; + break; + case Col::Longvarchar: + m_longvarchar = new unsigned char [2 + col.m_bytelength]; + break; default: assert(false); break; @@ -1610,6 +1648,12 @@ Val::~Val() case Col::Char: delete [] m_char; break; + case Col::Varchar: + delete [] m_varchar; + break; + case Col::Longvarchar: + delete [] m_longvarchar; + break; default: assert(false); break; @@ -1640,6 +1684,12 @@ Val::copy(const void* addr) case Col::Char: memcpy(m_char, addr, col.m_bytelength); break; + case Col::Varchar: + memcpy(m_varchar, addr, 1 + col.m_bytelength); + break; + case Col::Longvarchar: + memcpy(m_longvarchar, addr, 2 + col.m_bytelength); + break; default: assert(false); break; @@ -1656,6 +1706,10 @@ Val::dataaddr() const return &m_uint32; case Col::Char: return m_char; + case Col::Varchar: + return m_varchar; + case Col::Longvarchar: + return m_longvarchar; default: break; } @@ -1704,7 +1758,7 @@ Val::calc(Par par, unsigned i) const Col& col = m_col; col.m_pk ? calckey(par, i) : calcnokey(par); if (! m_null) - col.verify(dataaddr()); + col.wellformed(dataaddr()); } void @@ -1721,19 +1775,30 @@ Val::calckey(Par par, unsigned i) const Chs* chs = col.m_chs; CHARSET_INFO* cs = chs->m_cs; unsigned n = 0; - // our random chars may not fill value exactly - while (n + cs->mbmaxlen <= col.m_bytelength) { - if (i % (1 + n) == 0) { - break; - } - const Chr& chr = chs->m_chr[i % maxcharcount]; - memcpy(&m_char[n], chr.m_bytes, chr.m_size); - n += chr.m_size; - } - // this will extend by appropriate space + calckeychars(par, i, n, m_char); + // extend by appropriate space (*cs->cset->fill)(cs, (char*)&m_char[n], col.m_bytelength - n, 0x20); } break; + case Col::Varchar: + { + unsigned n = 0; + calckeychars(par, i, n, m_varchar + 1); + // set length and pad with nulls + m_varchar[0] = n; + memset(&m_varchar[1 + n], 0, col.m_bytelength - n); + } + break; + case Col::Longvarchar: + { + unsigned n = 0; + calckeychars(par, i, n, m_longvarchar + 2); + // set length and pad with nulls + m_longvarchar[0] = (n & 0xff); + m_longvarchar[1] = (n >> 8); + memset(&m_longvarchar[2 + n], 0, col.m_bytelength - n); + } + break; default: assert(false); break; @@ -1741,6 +1806,24 @@ Val::calckey(Par par, unsigned i) } void +Val::calckeychars(Par par, unsigned i, unsigned& n, unsigned char* buf) +{ + const Col& col = m_col; + const Chs* chs = col.m_chs; + CHARSET_INFO* cs = chs->m_cs; + n = 0; + // our random chars may not fill value exactly + while (n + cs->mbmaxlen <= col.m_bytelength) { + if (i % (1 + n) == 0) { + break; + } + const Chr& chr = chs->m_chr[i % maxcharcount]; + memcpy(buf + n, chr.m_bytes, chr.m_size); + n += chr.m_size; + } +} + +void Val::calcnokey(Par par) { const Col& col = m_col; @@ -1764,42 +1847,71 @@ Val::calcnokey(Par par) const Chs* chs = col.m_chs; CHARSET_INFO* cs = chs->m_cs; unsigned n = 0; - // our random chars may not fill value exactly - while (n + cs->mbmaxlen <= col.m_bytelength) { - if (urandom(1 + col.m_bytelength) == 0) { - break; - } - unsigned half = maxcharcount / 2; - int r = irandom((par.m_pctrange * half) / 100); - if (par.m_bdir != 0 && urandom(10) != 0) { - if (r < 0 && par.m_bdir > 0 || r > 0 && par.m_bdir < 0) - r = -r; - } - unsigned i = half + r; - assert(i < maxcharcount); - const Chr& chr = chs->m_chr[i]; - memcpy(&m_char[n], chr.m_bytes, chr.m_size); - n += chr.m_size; - } - // this will extend by appropriate space + calcnokeychars(par, n, m_char); + // extend by appropriate space (*cs->cset->fill)(cs, (char*)&m_char[n], col.m_bytelength - n, 0x20); } break; + case Col::Varchar: + { + unsigned n = 0; + calcnokeychars(par, n, m_varchar + 1); + // set length and pad with nulls + m_varchar[0] = n; + memset(&m_varchar[1 + n], 0, col.m_bytelength - n); + } + break; + case Col::Longvarchar: + { + unsigned n = 0; + calcnokeychars(par, n, m_longvarchar + 2); + // set length and pad with nulls + m_longvarchar[0] = (n & 0xff); + m_longvarchar[1] = (n >> 8); + memset(&m_longvarchar[2 + n], 0, col.m_bytelength - n); + } + break; default: assert(false); break; } } +void +Val::calcnokeychars(Par par, unsigned& n, unsigned char* buf) +{ + const Col& col = m_col; + const Chs* chs = col.m_chs; + CHARSET_INFO* cs = chs->m_cs; + n = 0; + // our random chars may not fill value exactly + while (n + cs->mbmaxlen <= col.m_bytelength) { + if (urandom(1 + col.m_bytelength) == 0) { + break; + } + unsigned half = maxcharcount / 2; + int r = irandom((par.m_pctrange * half) / 100); + if (par.m_bdir != 0 && urandom(10) != 0) { + if (r < 0 && par.m_bdir > 0 || r > 0 && par.m_bdir < 0) + r = -r; + } + unsigned i = half + r; + assert(i < maxcharcount); + const Chr& chr = chs->m_chr[i]; + memcpy(buf + n, chr.m_bytes, chr.m_size); + n += chr.m_size; + } +} + int -Val::verify(const Val& val2) const +Val::verify(Par par, const Val& val2) const { - CHK(cmp(val2) == 0); + CHK(cmp(par, val2) == 0); return 0; } int -Val::cmp(const Val& val2) const +Val::cmp(Par par, const Val& val2) const { const Col& col = m_col; const Col& col2 = val2.m_col; @@ -1812,8 +1924,8 @@ Val::cmp(const Val& val2) const return 0; } // verify data formats - col.verify(dataaddr()); - col.verify(val2.dataaddr()); + col.wellformed(dataaddr()); + col.wellformed(val2.dataaddr()); // compare switch (col.m_type) { case Col::Unsigned: @@ -1827,25 +1939,22 @@ Val::cmp(const Val& val2) const break; case Col::Char: { - const Chs* chs = col.m_chs; - CHARSET_INFO* cs = chs->m_cs; unsigned len = col.m_bytelength; - int k; - if (! g_opt.m_collsp) { - unsigned char x1[maxxmulsize * 8000]; - unsigned char x2[maxxmulsize * 8000]; - int n1 = (*cs->coll->strnxfrm)(cs, x1, chs->m_xmul * len, m_char, len); - int n2 = (*cs->coll->strnxfrm)(cs, x2, chs->m_xmul * len, val2.m_char, len); - // currently same but do not assume it - unsigned n = (n1 > n2 ? n1 : n2); - // assume null padding - memset(x1 + n1, 0x0, n - n1); - memset(x2 + n2, 0x0, n - n2); - k = memcmp(x1, x2, n); - } else { - k = (*cs->coll->strnncollsp)(cs, m_char, len, val2.m_char, len, false); - } - return k < 0 ? -1 : k > 0 ? +1 : 0; + return cmpchars(par, m_char, len, val2.m_char, len); + } + break; + case Col::Varchar: + { + unsigned len1 = m_varchar[0]; + unsigned len2 = val2.m_varchar[0]; + return cmpchars(par, m_varchar + 1, len1, val2.m_varchar + 1, len2); + } + break; + case Col::Longvarchar: + { + unsigned len1 = m_longvarchar[0] + (m_longvarchar[1] << 8); + unsigned len2 = val2.m_longvarchar[0] + (val2.m_longvarchar[1] << 8); + return cmpchars(par, m_longvarchar + 2, len1, val2.m_longvarchar + 2, len2); } break; default: @@ -1855,6 +1964,56 @@ Val::cmp(const Val& val2) const return 0; } +int +Val::cmpchars(Par par, const unsigned char* buf1, unsigned len1, const unsigned char* buf2, unsigned len2) const +{ + const Col& col = m_col; + const Chs* chs = col.m_chs; + CHARSET_INFO* cs = chs->m_cs; + int k; + if (! par.m_collsp) { + unsigned char x1[maxxmulsize * 8000]; + unsigned char x2[maxxmulsize * 8000]; + // make strxfrm pad both to same length + unsigned len = maxxmulsize * col.m_bytelength; + int n1 = NdbSqlUtil::strnxfrm_bug7284(cs, x1, chs->m_xmul * len, buf1, len1); + int n2 = NdbSqlUtil::strnxfrm_bug7284(cs, x2, chs->m_xmul * len, buf2, len2); + assert(n1 == n2); + k = memcmp(x1, x2, n1); + } else { + k = (*cs->coll->strnncollsp)(cs, buf1, len1, buf2, len2, false); + } + return k < 0 ? -1 : k > 0 ? +1 : 0; +} + +static void +printstring(NdbOut& out, const unsigned char* str, unsigned len, bool showlen) +{ + char buf[4 * 8000]; + char *p = buf; + *p++ = '['; + if (showlen) { + sprintf(p, "%u:", len); + p += strlen(p); + } + for (unsigned i = 0; i < len; i++) { + unsigned char c = str[i]; + if (c == '\\') { + *p++ = '\\'; + *p++ = c; + } else if (0x20 <= c && c < 0x7e) { + *p++ = c; + } else { + *p++ = '\\'; + *p++ = hexstr[c >> 4]; + *p++ = hexstr[c & 15]; + } + } + *p++ = ']'; + *p = 0; + out << buf; +} + static NdbOut& operator<<(NdbOut& out, const Val& val) { @@ -1869,25 +2028,20 @@ operator<<(NdbOut& out, const Val& val) break; case Col::Char: { - char buf[4 * 8000]; - char *p = buf; - *p++ = '['; - for (unsigned i = 0; i < col.m_bytelength; i++) { - unsigned char c = val.m_char[i]; - if (c == '\\') { - *p++ = '\\'; - *p++ = '\\'; - } else if (0x20 <= c && c < 0x7e) { - *p++ = c; - } else { - *p++ = '\\'; - *p++ = hexstr[c >> 4]; - *p++ = hexstr[c & 15]; - } - } - *p++ = ']'; - *p = 0; - out << buf; + unsigned len = col.m_bytelength; + printstring(out, val.m_char, len, false); + } + break; + case Col::Varchar: + { + unsigned len = val.m_varchar[0]; + printstring(out, val.m_varchar + 1, len, true); + } + break; + case Col::Longvarchar: + { + unsigned len = val.m_longvarchar[0] + (val.m_longvarchar[1] << 8); + printstring(out, val.m_longvarchar + 2, len, true); } break; default: @@ -1912,7 +2066,7 @@ struct Row { void copy(const Row& row2); void calc(Par par, unsigned i, unsigned mask = 0); const Row& dbrow() const; - int verify(const Row& row2) const; + int verify(Par par, const Row& row2) const; int insrow(Par par); int updrow(Par par); int updrow(Par par, const ITab& itab); @@ -1921,8 +2075,8 @@ struct Row { int selrow(Par par); int selrow(Par par, const ITab& itab); int setrow(Par par); - int cmp(const Row& row2) const; - int cmp(const Row& row2, const ITab& itab) const; + int cmp(Par par, const Row& row2) const; + int cmp(Par par, const Row& row2, const ITab& itab) const; private: Row& operator=(const Row& row2); }; @@ -1994,7 +2148,7 @@ Row::dbrow() const } int -Row::verify(const Row& row2) const +Row::verify(Par par, const Row& row2) const { const Tab& tab = m_tab; const Row& row1 = *this; @@ -2002,7 +2156,7 @@ Row::verify(const Row& row2) const for (unsigned k = 0; k < tab.m_cols; k++) { const Val& val1 = *row1.m_val[k]; const Val& val2 = *row2.m_val[k]; - CHK(val1.verify(val2) == 0); + CHK(val1.verify(par, val2) == 0); } return 0; } @@ -2169,7 +2323,7 @@ Row::setrow(Par par) } int -Row::cmp(const Row& row2) const +Row::cmp(Par par, const Row& row2) const { const Tab& tab = m_tab; assert(&tab == &row2.m_tab); @@ -2177,14 +2331,14 @@ Row::cmp(const Row& row2) const for (unsigned k = 0; k < tab.m_cols; k++) { const Val& val = *m_val[k]; const Val& val2 = *row2.m_val[k]; - if ((c = val.cmp(val2)) != 0) + if ((c = val.cmp(par, val2)) != 0) break; } return c; } int -Row::cmp(const Row& row2, const ITab& itab) const +Row::cmp(Par par, const Row& row2, const ITab& itab) const { const Tab& tab = m_tab; int c = 0; @@ -2195,7 +2349,7 @@ Row::cmp(const Row& row2, const ITab& itab) const assert(k < tab.m_cols); const Val& val = *m_val[k]; const Val& val2 = *row2.m_val[k]; - if ((c = val.cmp(val2)) != 0) + if ((c = val.cmp(par, val2)) != 0) break; } return c; @@ -2283,8 +2437,8 @@ struct Set { int getkey(Par par, unsigned* i); int putval(unsigned i, bool force, unsigned n = ~0); // verify - int verify(const Set& set2) const; - int verifyorder(const ITab& itab, bool descending) const; + int verify(Par par, const Set& set2) const; + int verifyorder(Par par, const ITab& itab, bool descending) const; // protect structure NdbMutex* m_mutex; void lock() const { @@ -2616,7 +2770,7 @@ Set::putval(unsigned i, bool force, unsigned n) // verify int -Set::verify(const Set& set2) const +Set::verify(Par par, const Set& set2) const { assert(&m_tab == &set2.m_tab && m_rows == set2.m_rows); LL4("verify set1 count=" << count() << " vs set2 count=" << set2.count()); @@ -2625,7 +2779,7 @@ Set::verify(const Set& set2) const if (exist(i) != set2.exist(i)) { ok = false; } else if (exist(i)) { - if (dbrow(i).verify(set2.dbrow(i)) != 0) + if (dbrow(i).verify(par, set2.dbrow(i)) != 0) ok = false; } if (! ok) { @@ -2637,7 +2791,7 @@ Set::verify(const Set& set2) const } int -Set::verifyorder(const ITab& itab, bool descending) const +Set::verifyorder(Par par, const ITab& itab, bool descending) const { const Tab& tab = m_tab; for (unsigned n = 0; n < m_rows; n++) { @@ -2652,9 +2806,9 @@ Set::verifyorder(const ITab& itab, bool descending) const const Row& row2 = *m_row[i2]; assert(row1.m_exist && row2.m_exist); if (! descending) - CHK(row1.cmp(row2, itab) <= 0); + CHK(row1.cmp(par, row2, itab) <= 0); else - CHK(row1.cmp(row2, itab) >= 0); + CHK(row1.cmp(par, row2, itab) >= 0); } return 0; } @@ -2724,7 +2878,7 @@ struct BSet { void calc(Par par); void calcpk(Par par, unsigned i); int setbnd(Par par) const; - void filter(const Set& set, Set& set2) const; + void filter(Par par, const Set& set, Set& set2) const; }; BSet::BSet(const Tab& tab, const ITab& itab, unsigned rows) : @@ -2797,7 +2951,7 @@ BSet::calc(Par par) assert(m_bvals >= 2); const BVal& bv1 = *m_bval[m_bvals - 2]; const BVal& bv2 = *m_bval[m_bvals - 1]; - if (bv1.cmp(bv2) > 0 && urandom(100) != 0) + if (bv1.cmp(par, bv2) > 0 && urandom(100) != 0) continue; } } while (0); @@ -2848,7 +3002,7 @@ BSet::setbnd(Par par) const } void -BSet::filter(const Set& set, Set& set2) const +BSet::filter(Par par, const Set& set, Set& set2) const { const Tab& tab = m_tab; const ITab& itab = m_itab; @@ -2880,7 +3034,7 @@ BSet::filter(const Set& set, Set& set2) const const ICol& icol = bval.m_icol; const Col& col = icol.m_col; const Val& val = *row.m_val[col.m_num]; - int ret = bval.cmp(val); + int ret = bval.cmp(par, val); LL5("cmp: ret=" << ret << " " << bval << " vs " << val); if (bval.m_type == 0) ok2 = (ret <= 0); @@ -3110,7 +3264,7 @@ pkread(Par par) con.closeTransaction(); } if (par.m_verify) - CHK(set1.verify(set2) == 0); + CHK(set1.verify(par, set2) == 0); return 0; } @@ -3275,7 +3429,7 @@ hashindexread(Par par, const ITab& itab) con.closeTransaction(); } if (par.m_verify) - CHK(set1.verify(set2) == 0); + CHK(set1.verify(par, set2) == 0); return 0; } @@ -3289,12 +3443,11 @@ scanreadtable(Par par) const Set& set = par.set(); // expected const Set& set1 = set; - LL3("scanread " << tab.m_name << " verify=" << par.m_verify); - LL4("expect " << set.count() << " rows"); + LL3("scanread " << tab.m_name << " lockmode=" << par.m_lockmode << " expect=" << set1.count() << " verify=" << par.m_verify); Set set2(tab, set.m_rows); CHK(con.startTransaction() == 0); CHK(con.getNdbScanOperation(tab) == 0); - CHK(con.openScanRead(par.m_scanbat, par.m_scanpar) == 0); + CHK(con.readTuples(par) == 0); set2.getval(par); CHK(con.executeScan() == 0); unsigned n = 0; @@ -3317,7 +3470,7 @@ scanreadtable(Par par) } con.closeTransaction(); if (par.m_verify) - CHK(set1.verify(set2) == 0); + CHK(set1.verify(par, set2) == 0); LL3("scanread " << tab.m_name << " done rows=" << n); return 0; } @@ -3331,7 +3484,7 @@ scanreadtablefast(Par par, unsigned countcheck) LL3("scanfast " << tab.m_name); CHK(con.startTransaction() == 0); CHK(con.getNdbScanOperation(tab) == 0); - CHK(con.openScanRead(par.m_scanbat, par.m_scanpar) == 0); + CHK(con.readTuples(par) == 0); // get 1st column NdbRecAttr* rec; CHK(con.getValue((Uint32)0, rec) == 0); @@ -3355,12 +3508,11 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc) Con& con = par.con(); const Tab& tab = par.tab(); const Set& set = par.set(); - LL4(bset); Set set1(tab, set.m_rows); if (calc) { while (true) { bset.calc(par); - bset.filter(set, set1); + bset.filter(par, set, set1); unsigned n = set1.count(); // prefer proper subset if (0 < n && n < set.m_rows) @@ -3370,17 +3522,13 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc) set1.reset(); } } else { - bset.filter(set, set1); + bset.filter(par, set, set1); } - LL3("scanread " << itab.m_name << " bounds=" << bset << " verify=" << par.m_verify << " ordered=" << par.m_ordered << " descending=" << par.m_descending); - LL4("expect " << set1.count() << " rows"); + LL3("scanread " << itab.m_name << " " << bset << " lockmode=" << par.m_lockmode << " expect=" << set1.count() << " verify=" << par.m_verify << " ordered=" << par.m_ordered << " descending=" << par.m_descending); Set set2(tab, set.m_rows); CHK(con.startTransaction() == 0); - CHK(con.getNdbScanOperation(itab, tab) == 0); - if (! par.m_ordered) - CHK(con.openScanRead(par.m_scanbat, par.m_scanpar) == 0); - else - CHK(con.openScanOrdered(par.m_scanbat, par.m_scanpar, par.m_descending) == 0); + CHK(con.getNdbIndexScanOperation(itab, tab) == 0); + CHK(con.readIndexTuples(par) == 0); CHK(bset.setbnd(par) == 0); set2.getval(par); CHK(con.executeScan() == 0); @@ -3404,9 +3552,9 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc) } con.closeTransaction(); if (par.m_verify) { - CHK(set1.verify(set2) == 0); + CHK(set1.verify(par, set2) == 0); if (par.m_ordered) - CHK(set2.verifyorder(itab, par.m_descending) == 0); + CHK(set2.verifyorder(par, itab, par.m_descending) == 0); } LL3("scanread " << itab.m_name << " done rows=" << n); return 0; @@ -3418,11 +3566,11 @@ scanreadindexfast(Par par, const ITab& itab, const BSet& bset, unsigned countche Con& con = par.con(); const Tab& tab = par.tab(); const Set& set = par.set(); - LL3("scanfast " << itab.m_name << " bounds=" << bset.m_bvals); + LL3("scanfast " << itab.m_name << " " << bset); LL4(bset); CHK(con.startTransaction() == 0); - CHK(con.getNdbScanOperation(itab, tab) == 0); - CHK(con.openScanRead(par.m_scanbat, par.m_scanpar) == 0); + CHK(con.getNdbIndexScanOperation(itab, tab) == 0); + CHK(con.readIndexTuples(par) == 0); CHK(bset.setbnd(par) == 0); // get 1st column NdbRecAttr* rec; @@ -3544,9 +3692,10 @@ scanupdatetable(Par par) Set& set = par.set(); LL3("scan update " << tab.m_name); Set set2(tab, set.m_rows); + par.m_lockmode = NdbOperation::LM_Exclusive; CHK(con.startTransaction() == 0); CHK(con.getNdbScanOperation(tab) == 0); - CHK(con.openScanExclusive(par.m_scanbat, par.m_scanpar) == 0); + CHK(con.readTuples(par) == 0); set2.getval(par); CHK(con.executeScan() == 0); unsigned count = 0; @@ -3641,12 +3790,10 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset) Set& set = par.set(); LL3("scan update " << itab.m_name); Set set2(tab, set.m_rows); + par.m_lockmode = NdbOperation::LM_Exclusive; CHK(con.startTransaction() == 0); - CHK(con.getNdbScanOperation(itab, tab) == 0); - if (! par.m_ordered) - CHK(con.openScanExclusive(par.m_scanbat, par.m_scanpar) == 0); - else - CHK(con.openScanOrderedExclusive(par.m_scanbat, par.m_scanpar, par.m_descending) == 0); + CHK(con.getNdbIndexScanOperation(itab, tab) == 0); + CHK(con.readTuples(par) == 0); CHK(bset.setbnd(par) == 0); set2.getval(par); CHK(con.executeScan() == 0); @@ -3777,6 +3924,7 @@ readverify(Par par) if (par.m_noverify) return 0; par.m_verify = true; + par.m_lockmode = NdbOperation::LM_CommittedRead; CHK(pkread(par) == 0); CHK(scanreadall(par) == 0); return 0; @@ -3788,19 +3936,24 @@ readverifyfull(Par par) if (par.m_noverify) return 0; par.m_verify = true; - if (par.m_no == 0) + par.m_lockmode = NdbOperation::LM_CommittedRead; + const Tab& tab = par.tab(); + if (par.m_no == 0) { + // thread 0 scans table CHK(scanreadtable(par) == 0); - else { - const Tab& tab = par.tab(); - unsigned i = par.m_no - 1; - if (i < tab.m_itabs && tab.m_itab[i] != 0) { - const ITab& itab = *tab.m_itab[i]; - if (itab.m_type == ITab::OrderedIndex) { - BSet bset(tab, itab, par.m_rows); - CHK(scanreadindex(par, itab, bset, false) == 0); - } else { - CHK(hashindexread(par, itab) == 0); - } + } + // each thread scans different indexes + for (unsigned i = 0; i < tab.m_itabs; i++) { + if (i % par.m_threads != par.m_no) + continue; + if (tab.m_itab[i] == 0) + continue; + const ITab& itab = *tab.m_itab[i]; + if (itab.m_type == ITab::OrderedIndex) { + BSet bset(tab, itab, par.m_rows); + CHK(scanreadindex(par, itab, bset, false) == 0); + } else { + CHK(hashindexread(par, itab) == 0); } } return 0; @@ -3809,7 +3962,10 @@ readverifyfull(Par par) static int readverifyindex(Par par) { + if (par.m_noverify) + return 0; par.m_verify = true; + par.m_lockmode = NdbOperation::LM_CommittedRead; unsigned sel = urandom(10); if (sel < 9) { par.m_ordered = true; @@ -4411,15 +4567,30 @@ printtables() { Par par(g_opt); makebuiltintables(par); - ndbout << "builtin tables (x0 on pk, x=ordered z=hash):" << endl; + ndbout << "tables and indexes (x=ordered z=hash x0=on pk):" << endl; for (unsigned j = 0; j < tabcount; j++) { if (tablist[j] == 0) continue; const Tab& tab = *tablist[j]; - ndbout << " " << tab.m_name; + const char* tname = tab.m_name; + ndbout << " " << tname; for (unsigned i = 0; i < tab.m_itabs; i++) { + if (tab.m_itab[i] == 0) + continue; const ITab& itab = *tab.m_itab[i]; - ndbout << " " << itab.m_name; + const char* iname = itab.m_name; + if (strncmp(tname, iname, strlen(tname)) == 0) + iname += strlen(tname); + ndbout << " " << iname; + ndbout << "("; + for (unsigned k = 0; k < itab.m_icols; k++) { + if (k != 0) + ndbout << ","; + const ICol& icol = *itab.m_icol[k]; + const Col& col = icol.m_col; + ndbout << col.m_name; + } + ndbout << ")"; } ndbout << endl; } @@ -4434,9 +4605,12 @@ runtest(Par par) unsigned short seed = (getpid() ^ time(0)); LL1("random seed: " << seed); srandom((unsigned)seed); - } else if (par.m_seed != 0) + } else if (par.m_seed != 0) { LL1("random seed: " << par.m_seed); srandom(par.m_seed); + } else { + LL1("random seed: loop number"); + } // cs assert(par.m_csname != 0); if (strcmp(par.m_csname, "random") != 0) { |