summaryrefslogtreecommitdiff
path: root/ndb
diff options
context:
space:
mode:
authorunknown <pekka@mysql.com>2005-02-20 15:55:39 +0100
committerunknown <pekka@mysql.com>2005-02-20 15:55:39 +0100
commit6fc95e50a9d974d9fa402162952c9f15d2730a9c (patch)
treee5c0f50f8a9d951beddbed5e47f2345bae1b25d1 /ndb
parent2200e35c8c17fee129cb63c2f6b4fa2266de7a86 (diff)
downloadmariadb-git-6fc95e50a9d974d9fa402162952c9f15d2730a9c.tar.gz
ndb - simple scan filter test (in testOIBasic)
ndb/src/ndbapi/NdbDictionary.cpp: print unusual array size ndb/src/ndbapi/NdbRecAttr.cpp: print hex chars as unsigned ndb/src/ndbapi/NdbOperationInt.cpp: ignore of NULL bound ndb/test/ndbapi/testOIBasic.cpp: add simple scan filter test (table scan on index bounds)
Diffstat (limited to 'ndb')
-rw-r--r--ndb/src/ndbapi/NdbDictionary.cpp18
-rw-r--r--ndb/src/ndbapi/NdbOperationInt.cpp20
-rw-r--r--ndb/src/ndbapi/NdbRecAttr.cpp5
-rw-r--r--ndb/test/ndbapi/testOIBasic.cpp176
4 files changed, 202 insertions, 17 deletions
diff --git a/ndb/src/ndbapi/NdbDictionary.cpp b/ndb/src/ndbapi/NdbDictionary.cpp
index 664d568aee0..4cc47543cec 100644
--- a/ndb/src/ndbapi/NdbDictionary.cpp
+++ b/ndb/src/ndbapi/NdbDictionary.cpp
@@ -1011,6 +1011,24 @@ operator<<(NdbOut& out, const NdbDictionary::Column& col)
out << "Type" << (Uint32)col.getType();
break;
}
+ // show unusual (non-MySQL) array size
+ if (col.getLength() != 1) {
+ switch (col.getType()) {
+ case NdbDictionary::Column::Char:
+ case NdbDictionary::Column::Varchar:
+ case NdbDictionary::Column::Binary:
+ case NdbDictionary::Column::Varbinary:
+ case NdbDictionary::Column::Blob:
+ case NdbDictionary::Column::Text:
+ case NdbDictionary::Column::Bit:
+ case NdbDictionary::Column::Longvarchar:
+ case NdbDictionary::Column::Longvarbinary:
+ break;
+ default:
+ out << " [" << col.getLength() << "]";
+ break;
+ }
+ }
if (col.getPrimaryKey())
out << " PRIMARY KEY";
else if (! col.getNullable())
diff --git a/ndb/src/ndbapi/NdbOperationInt.cpp b/ndb/src/ndbapi/NdbOperationInt.cpp
index 0fa6ef3de62..5b98f090395 100644
--- a/ndb/src/ndbapi/NdbOperationInt.cpp
+++ b/ndb/src/ndbapi/NdbOperationInt.cpp
@@ -1026,15 +1026,19 @@ NdbOperation::branch_col(Uint32 type,
abort();
}
- Uint32 sizeInBytes = col->m_attrSize * col->m_arraySize;
- if (! col->getCharType()) {
- // prevent assert in NdbSqlUtil on length error
- if(len != 0 && len != sizeInBytes)
- {
- setErrorCodeAbort(4209);
- return -1;
+ if (val == NULL)
+ len = 0;
+ else {
+ if (! col->getCharType()) {
+ // prevent assert in NdbSqlUtil on length error
+ Uint32 sizeInBytes = col->m_attrSize * col->m_arraySize;
+ if (len != 0 && len != sizeInBytes)
+ {
+ setErrorCodeAbort(4209);
+ return -1;
+ }
+ len = sizeInBytes;
}
- len = sizeInBytes;
}
if (insertATTRINFO(Interpreter::BranchCol(c, 0, 0, false)) == -1)
diff --git a/ndb/src/ndbapi/NdbRecAttr.cpp b/ndb/src/ndbapi/NdbRecAttr.cpp
index 86a777e79d2..5e5306fc33a 100644
--- a/ndb/src/ndbapi/NdbRecAttr.cpp
+++ b/ndb/src/ndbapi/NdbRecAttr.cpp
@@ -139,8 +139,9 @@ NdbRecAttr::receive_data(const Uint32 * data, Uint32 sz){
static void
ndbrecattr_print_string(NdbOut& out, const char *type,
- const char *ref, unsigned sz)
+ const char *aref, unsigned sz)
{
+ const unsigned char* ref = (const unsigned char*)aref;
int i, len, printable= 1;
// trailing zeroes are not printed
for (i=sz-1; i >= 0; i--)
@@ -166,7 +167,7 @@ ndbrecattr_print_string(NdbOut& out, const char *type,
for (i= len+1; ref[i] != 0; i++)
out.print("%u]",len-i);
assert((int)sz > i);
- ndbrecattr_print_string(out,type,ref+i,sz-i);
+ ndbrecattr_print_string(out,type,aref+i,sz-i);
}
}
diff --git a/ndb/test/ndbapi/testOIBasic.cpp b/ndb/test/ndbapi/testOIBasic.cpp
index ce1851083b1..96926a421fb 100644
--- a/ndb/test/ndbapi/testOIBasic.cpp
+++ b/ndb/test/ndbapi/testOIBasic.cpp
@@ -612,7 +612,10 @@ struct Col {
bool m_pk;
Type m_type;
unsigned m_length;
- unsigned m_bytelength;
+ unsigned m_bytelength; // multiplied by char width
+ unsigned m_attrsize; // base type size
+ unsigned m_headsize; // length bytes
+ unsigned m_bytesize; // full value size
bool m_nullable;
const Chs* m_chs;
Col(const class Tab& tab, unsigned num, const char* name, bool pk, Type type, unsigned length, bool nullable, const Chs* chs);
@@ -629,12 +632,26 @@ Col::Col(const class Tab& tab, unsigned num, const char* name, bool pk, Type typ
m_type(type),
m_length(length),
m_bytelength(length * (chs == 0 ? 1 : chs->m_cs->mbmaxlen)),
+ m_attrsize(
+ type == Unsigned ? sizeof(Uint32) :
+ type == Char ? sizeof(char) :
+ type == Varchar ? sizeof(char) :
+ type == Longvarchar ? sizeof(char) : ~0),
+ m_headsize(
+ type == Unsigned ? 0 :
+ type == Char ? 0 :
+ type == Varchar ? 1 :
+ type == Longvarchar ? 2 : ~0),
+ m_bytesize(m_headsize + m_attrsize * m_bytelength),
m_nullable(nullable),
m_chs(chs)
{
// fix long varchar
- if (type == Varchar && m_bytelength > 255)
+ if (type == Varchar && m_bytelength > 255) {
m_type = Longvarchar;
+ m_headsize += 1;
+ m_bytesize += 1;
+ }
}
Col::~Col()
@@ -1119,13 +1136,15 @@ struct Con {
NdbIndexOperation* m_indexop;
NdbScanOperation* m_scanop;
NdbIndexScanOperation* m_indexscanop;
+ NdbScanFilter* m_scanfilter;
enum ScanMode { ScanNo = 0, Committed, Latest, Exclusive };
ScanMode m_scanmode;
enum ErrType { ErrNone = 0, ErrDeadlock, ErrOther };
ErrType m_errtype;
Con() :
m_ndb(0), m_dic(0), m_tx(0), m_op(0), m_indexop(0),
- m_scanop(0), m_indexscanop(0), m_scanmode(ScanNo), m_errtype(ErrNone) {}
+ m_scanop(0), m_indexscanop(0), m_scanfilter(0),
+ m_scanmode(ScanNo), m_errtype(ErrNone) {}
~Con() {
if (m_tx != 0)
closeTransaction();
@@ -1140,10 +1159,14 @@ struct Con {
int getNdbScanOperation(const Tab& tab);
int getNdbIndexScanOperation1(const ITab& itab, const Tab& tab);
int getNdbIndexScanOperation(const ITab& itab, const Tab& tab);
+ int getNdbScanFilter();
int equal(int num, const char* addr);
int getValue(int num, NdbRecAttr*& rec);
int setValue(int num, const char* addr);
int setBound(int num, int type, const void* value);
+ int beginFilter(int group);
+ int endFilter();
+ int setFilter(int num, int cond, const void* value, unsigned len);
int execute(ExecType t);
int execute(ExecType t, bool& deadlock);
int readTuples(Par par);
@@ -1254,6 +1277,15 @@ Con::getNdbIndexScanOperation(const ITab& itab, const Tab& tab)
}
int
+Con::getNdbScanFilter()
+{
+ assert(m_tx != 0 && m_scanop != 0);
+ delete m_scanfilter;
+ m_scanfilter = new NdbScanFilter(m_scanop);
+ return 0;
+}
+
+int
Con::equal(int num, const char* addr)
{
assert(m_tx != 0 && m_op != 0);
@@ -1280,12 +1312,36 @@ Con::setValue(int num, const char* addr)
int
Con::setBound(int num, int type, const void* value)
{
- assert(m_tx != 0 && m_op != 0);
+ assert(m_tx != 0 && m_indexscanop != 0);
CHKCON(m_indexscanop->setBound(num, type, value) == 0, *this);
return 0;
}
int
+Con::beginFilter(int group)
+{
+ assert(m_tx != 0 && m_scanfilter != 0);
+ CHKCON(m_scanfilter->begin((NdbScanFilter::Group)group) == 0, *this);
+ return 0;
+}
+
+int
+Con::endFilter()
+{
+ assert(m_tx != 0 && m_scanfilter != 0);
+ CHKCON(m_scanfilter->end() == 0, *this);
+ return 0;
+}
+
+int
+Con::setFilter(int num, int cond, const void* value, unsigned len)
+{
+ assert(m_tx != 0 && m_scanfilter != 0);
+ CHKCON(m_scanfilter->cmp((NdbScanFilter::BinaryCondition)cond, num, value, len) == 0, *this);
+ return 0;
+}
+
+int
Con::execute(ExecType t)
{
assert(m_tx != 0);
@@ -1502,7 +1558,7 @@ createtable(Par par)
const Col& col = *tab.m_col[k];
NdbDictionary::Column c(col.m_name);
c.setType((NdbDictionary::Column::Type)col.m_type);
- c.setLength(col.m_bytelength); // NDB API uses length in bytes
+ c.setLength(col.m_bytelength); // for char NDB API uses length in bytes
c.setPrimaryKey(col.m_pk);
c.setNullable(col.m_nullable);
if (col.m_chs != 0)
@@ -2836,6 +2892,7 @@ struct BVal : public Val {
int m_type;
BVal(const ICol& icol);
int setbnd(Par par) const;
+ int setflt(Par par) const;
};
BVal::BVal(const ICol& icol) :
@@ -2855,6 +2912,27 @@ BVal::setbnd(Par par) const
return 0;
}
+int
+BVal::setflt(Par par) const
+{
+ static unsigned index_bound_to_filter_bound[5] = {
+ NdbScanFilter::COND_GE,
+ NdbScanFilter::COND_GT,
+ NdbScanFilter::COND_LE,
+ NdbScanFilter::COND_LT,
+ NdbScanFilter::COND_EQ
+ };
+ Con& con = par.con();
+ assert(g_compare_null || ! m_null);
+ const char* addr = ! m_null ? (const char*)dataaddr() : 0;
+ const ICol& icol = m_icol;
+ const Col& col = icol.m_col;
+ unsigned length = col.m_bytesize;
+ unsigned cond = index_bound_to_filter_bound[m_type];
+ CHK(con.setFilter(col.m_num, cond, addr, length) == 0);
+ return 0;
+}
+
static NdbOut&
operator<<(NdbOut& out, const BVal& bval)
{
@@ -2882,6 +2960,7 @@ struct BSet {
void calc(Par par);
void calcpk(Par par, unsigned i);
int setbnd(Par par) const;
+ int setflt(Par par) const;
void filter(Par par, const Set& set, Set& set2) const;
};
@@ -3005,6 +3084,33 @@ BSet::setbnd(Par par) const
return 0;
}
+int
+BSet::setflt(Par par) const
+{
+ Con& con = par.con();
+ CHK(con.getNdbScanFilter() == 0);
+ CHK(con.beginFilter(NdbScanFilter::AND) == 0);
+ if (m_bvals != 0) {
+ unsigned p1 = urandom(m_bvals);
+ unsigned p2 = 10009; // prime
+ const unsigned extras = 5;
+ // random order
+ for (unsigned j = 0; j < m_bvals + extras; j++) {
+ unsigned k = p1 + p2 * j;
+ const BVal& bval = *m_bval[k % m_bvals];
+ CHK(bval.setflt(par) == 0);
+ }
+ // duplicate
+ if (urandom(5) == 0) {
+ unsigned k = urandom(m_bvals);
+ const BVal& bval = *m_bval[k];
+ CHK(bval.setflt(par) == 0);
+ }
+ }
+ CHK(con.endFilter() == 0);
+ return 0;
+}
+
void
BSet::filter(Par par, const Set& set, Set& set2) const
{
@@ -3594,12 +3700,69 @@ scanreadindexfast(Par par, const ITab& itab, const BSet& bset, unsigned countche
}
static int
+scanreadfilter(Par par, const ITab& itab, BSet& bset, bool calc)
+{
+ Con& con = par.con();
+ const Tab& tab = par.tab();
+ const Set& set = par.set();
+ Set set1(tab, set.m_rows);
+ if (calc) {
+ while (true) {
+ bset.calc(par);
+ bset.filter(par, set, set1);
+ unsigned n = set1.count();
+ // prefer proper subset
+ if (0 < n && n < set.m_rows)
+ break;
+ if (urandom(3) == 0)
+ break;
+ set1.reset();
+ }
+ } else {
+ bset.filter(par, set, set1);
+ }
+ LL3("scanfilter " << itab.m_name << " " << bset << " lockmode=" << par.m_lockmode << " expect=" << set1.count() << " verify=" << par.m_verify);
+ Set set2(tab, set.m_rows);
+ CHK(con.startTransaction() == 0);
+ CHK(con.getNdbScanOperation(tab) == 0);
+ CHK(con.readTuples(par) == 0);
+ CHK(bset.setflt(par) == 0);
+ set2.getval(par);
+ CHK(con.executeScan() == 0);
+ unsigned n = 0;
+ bool deadlock = false;
+ while (1) {
+ int ret;
+ deadlock = par.m_deadlock;
+ CHK((ret = con.nextScanResult(true, deadlock)) == 0 || ret == 1);
+ if (ret == 1)
+ break;
+ if (deadlock) {
+ LL1("scanfilter: stop on deadlock");
+ break;
+ }
+ unsigned i = (unsigned)-1;
+ CHK(set2.getkey(par, &i) == 0);
+ CHK(set2.putval(i, par.m_dups, n) == 0);
+ LL4("key " << i << " row " << n << ": " << *set2.m_row[i]);
+ n++;
+ }
+ con.closeTransaction();
+ if (par.m_verify) {
+ CHK(set1.verify(par, set2) == 0);
+ }
+ LL3("scanfilter " << itab.m_name << " done rows=" << n);
+ return 0;
+}
+
+static int
scanreadindex(Par par, const ITab& itab)
{
const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_subsubloop; i++) {
if (itab.m_type == ITab::OrderedIndex) {
BSet bset(tab, itab, par.m_rows);
+ CHK(scanreadfilter(par, itab, bset, true) == 0);
CHK(scanreadindex(par, itab, bset, true) == 0);
}
}
@@ -3626,8 +3789,7 @@ scanreadindex(Par par)
static int
scanreadall(Par par)
{
- if (par.m_no < 11)
- CHK(scanreadtable(par) == 0);
+ CHK(scanreadtable(par) == 0);
CHK(scanreadindex(par) == 0);
return 0;
}