summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <pekka@mysql.com>2004-12-13 11:17:55 +0100
committerunknown <pekka@mysql.com>2004-12-13 11:17:55 +0100
commit756572e12d1efe1df81fb85df38dd1aad97d45dc (patch)
tree419a37a372189f52399734df878b5c741078a86a
parenta59a6361e8521458f2a9b5867681ce1a86af468b (diff)
parenta7fe1c393391f77f90a84ed2240542af3ca33ad4 (diff)
downloadmariadb-git-756572e12d1efe1df81fb85df38dd1aad97d45dc.tar.gz
Merge pnousiainen@bk-internal.mysql.com:/home/bk/mysql-5.0-ndb
into mysql.com:/space/pekka/ndb/version/my50-cc
-rw-r--r--ndb/include/kernel/AttributeDescriptor.hpp12
-rw-r--r--ndb/include/kernel/ndb_limits.h5
-rw-r--r--ndb/include/kernel/signaldata/TuxBound.hpp4
-rw-r--r--ndb/include/util/NdbSqlUtil.hpp24
-rw-r--r--ndb/src/common/util/NdbSqlUtil.cpp561
-rw-r--r--ndb/src/kernel/blocks/dbacc/Dbacc.hpp21
-rw-r--r--ndb/src/kernel/blocks/dbacc/DbaccInit.cpp1
-rw-r--r--ndb/src/kernel/blocks/dbacc/DbaccMain.cpp143
-rw-r--r--ndb/src/kernel/blocks/dbdict/Dbdict.cpp29
-rw-r--r--ndb/src/kernel/blocks/dblqh/DblqhMain.cpp2
-rw-r--r--ndb/src/kernel/blocks/dbtc/Dbtc.hpp16
-rw-r--r--ndb/src/kernel/blocks/dbtc/DbtcMain.cpp101
-rw-r--r--ndb/src/kernel/blocks/dbtup/Dbtup.hpp20
-rw-r--r--ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp8
-rw-r--r--ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp9
-rw-r--r--ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp162
-rw-r--r--ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp6
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp36
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp2
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp23
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp2
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp7
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp110
-rw-r--r--ndb/src/ndbapi/NdbDictionaryImpl.cpp5
-rw-r--r--ndb/src/ndbapi/NdbOperationDefine.cpp10
-rw-r--r--ndb/src/ndbapi/NdbOperationSearch.cpp29
-rw-r--r--ndb/src/ndbapi/NdbScanOperation.cpp29
-rw-r--r--ndb/src/ndbapi/ndberror.c2
-rw-r--r--ndb/test/ndbapi/Makefile.am4
-rw-r--r--ndb/test/ndbapi/testOIBasic.cpp1191
-rw-r--r--ndb/tools/desc.cpp2
31 files changed, 1592 insertions, 984 deletions
diff --git a/ndb/include/kernel/AttributeDescriptor.hpp b/ndb/include/kernel/AttributeDescriptor.hpp
index 071d45e2607..789325c7939 100644
--- a/ndb/include/kernel/AttributeDescriptor.hpp
+++ b/ndb/include/kernel/AttributeDescriptor.hpp
@@ -19,6 +19,8 @@
class AttributeDescriptor {
friend class Dbdict;
+ friend class Dbtc;
+ friend class Dbacc;
friend class Dbtup;
friend class Dbtux;
@@ -36,6 +38,7 @@ private:
static Uint32 getType(const Uint32 &);
static Uint32 getSize(const Uint32 &);
+ static Uint32 getSizeInBytes(const Uint32 &);
static Uint32 getSizeInWords(const Uint32 &);
static Uint32 getArrayType(const Uint32 &);
static Uint32 getArraySize(const Uint32 &);
@@ -79,6 +82,8 @@ private:
#define AD_SIZE_SHIFT (4)
#define AD_SIZE_MASK (7)
+#define AD_SIZE_IN_BYTES_SHIFT (3)
+
#define AD_SIZE_IN_WORDS_OFFSET (31)
#define AD_SIZE_IN_WORDS_SHIFT (5)
@@ -187,6 +192,13 @@ AttributeDescriptor::getSize(const Uint32 & desc){
inline
Uint32
+AttributeDescriptor::getSizeInBytes(const Uint32 & desc){
+ return (getArraySize(desc) << getSize(desc))
+ >> AD_SIZE_IN_BYTES_SHIFT;
+}
+
+inline
+Uint32
AttributeDescriptor::getSizeInWords(const Uint32 & desc){
return ((getArraySize(desc) << getSize(desc))
+ AD_SIZE_IN_WORDS_OFFSET)
diff --git a/ndb/include/kernel/ndb_limits.h b/ndb/include/kernel/ndb_limits.h
index 4ed38a194e4..384882d6e5f 100644
--- a/ndb/include/kernel/ndb_limits.h
+++ b/ndb/include/kernel/ndb_limits.h
@@ -119,6 +119,11 @@
#define NDB_BLOB_HEAD_SIZE 2 /* sizeof(NdbBlob::Head) >> 2 */
/*
+ * Character sets.
+ */
+#define MAX_XFRM_MULTIPLY 8 /* max expansion when normalizing */
+
+/*
* Long signals
*/
#define NDB_SECTION_SEGMENT_SZ 60
diff --git a/ndb/include/kernel/signaldata/TuxBound.hpp b/ndb/include/kernel/signaldata/TuxBound.hpp
index d829d6e82dd..7e12897407b 100644
--- a/ndb/include/kernel/signaldata/TuxBound.hpp
+++ b/ndb/include/kernel/signaldata/TuxBound.hpp
@@ -34,7 +34,9 @@ public:
enum ErrorCode {
InvalidAttrInfo = 4110,
InvalidBounds = 4259,
- OutOfBuffers = 873
+ OutOfBuffers = 873,
+ InvalidCharFormat = 744,
+ TooMuchAttrInfo = 823
};
STATIC_CONST( SignalLength = 3 );
private:
diff --git a/ndb/include/util/NdbSqlUtil.hpp b/ndb/include/util/NdbSqlUtil.hpp
index 55fb76df354..3deaf81cfc7 100644
--- a/ndb/include/util/NdbSqlUtil.hpp
+++ b/ndb/include/util/NdbSqlUtil.hpp
@@ -37,17 +37,23 @@ public:
const char* s2, unsigned n2, bool padded);
/**
- * Compare kernel attribute values. Returns -1, 0, +1 for less,
- * equal, greater, respectively. Parameters are pointers to values,
- * full attribute size in words, and size of available data in words.
- * There is also pointer to type specific extra info. Char types
- * receive CHARSET_INFO in it.
+ * Compare attribute values. Returns -1, 0, +1 for less, equal,
+ * greater, respectively. Parameters are pointers to values and their
+ * lengths in bytes. The lengths can differ.
*
- * If available size is less than full size, CmpUnknown may be
- * returned. If a value cannot be parsed, it compares like NULL i.e.
- * less than any valid value.
+ * First value is a full value but second value can be partial. If
+ * the partial value is not enough to determine the result, CmpUnknown
+ * will be returned. A shorter second value is not necessarily
+ * partial. Partial values are allowed only for types where prefix
+ * comparison is possible (basically, binary types).
+ *
+ * First parameter is a pointer to type specific extra info. Char
+ * types receive CHARSET_INFO in it.
+ *
+ * If a value cannot be parsed, it compares like NULL i.e. less than
+ * any valid value.
*/
- typedef int Cmp(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size);
+ typedef int Cmp(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full);
enum CmpResult {
CmpLess = -1,
diff --git a/ndb/src/common/util/NdbSqlUtil.cpp b/ndb/src/common/util/NdbSqlUtil.cpp
index 6798655a71a..36b5075ef1b 100644
--- a/ndb/src/common/util/NdbSqlUtil.cpp
+++ b/ndb/src/common/util/NdbSqlUtil.cpp
@@ -70,7 +70,7 @@ NdbSqlUtil::char_like(const char* s1, unsigned n1,
return i1 == n2 && i2 == n2;
}
-/**
+/*
* Data types.
*/
@@ -138,7 +138,7 @@ NdbSqlUtil::m_typeList[] = {
},
{
Type::Varchar,
- cmpVarchar
+ NULL // cmpVarchar
},
{
Type::Binary,
@@ -146,23 +146,23 @@ NdbSqlUtil::m_typeList[] = {
},
{
Type::Varbinary,
- cmpVarbinary
+ NULL // cmpVarbinary
},
{
Type::Datetime,
- cmpDatetime
+ NULL // cmpDatetime
},
{
Type::Timespec,
- cmpTimespec
+ NULL // cmpTimespec
},
{
Type::Blob,
- cmpBlob
+ NULL // cmpBlob
},
{
Type::Text,
- cmpText
+ NULL // cmpText
}
};
@@ -195,374 +195,299 @@ NdbSqlUtil::getTypeBinary(Uint32 typeId)
return getType(typeId);
}
-// compare
+/*
+ * Comparison functions.
+ */
int
-NdbSqlUtil::cmpTinyint(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpTinyint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
- assert(full >= size && size > 0);
- union { Uint32 p[1]; Int8 v; } u1, u2;
- u1.p[0] = p1[0];
- u2.p[0] = p2[0];
- if (u1.v < u2.v)
- return -1;
- if (u1.v > u2.v)
- return +1;
- return 0;
+ if (n2 >= sizeof(Int8)) {
+ Int8 v1, v2;
+ memcpy(&v1, p1, sizeof(Int8));
+ memcpy(&v2, p2, sizeof(Int8));
+ if (v1 < v2)
+ return -1;
+ if (v1 > v2)
+ return +1;
+ return 0;
+ }
+ assert(! full);
+ return CmpUnknown;
}
int
-NdbSqlUtil::cmpTinyunsigned(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpTinyunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
- assert(full >= size && size > 0);
- union { Uint32 p[1]; Uint8 v; } u1, u2;
- u1.p[0] = p1[0];
- u2.p[0] = p2[0];
- if (u1.v < u2.v)
- return -1;
- if (u1.v > u2.v)
- return +1;
- return 0;
+ if (n2 >= sizeof(Uint8)) {
+ Uint8 v1, v2;
+ memcpy(&v1, p1, sizeof(Uint8));
+ memcpy(&v2, p2, sizeof(Uint8));
+ if (v1 < v2)
+ return -1;
+ if (v1 > v2)
+ return +1;
+ return 0;
+ }
+ assert(! full);
+ return CmpUnknown;
}
int
-NdbSqlUtil::cmpSmallint(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpSmallint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
- assert(full >= size && size > 0);
- union { Uint32 p[1]; Int16 v; } u1, u2;
- u1.p[0] = p1[0];
- u2.p[0] = p2[0];
- if (u1.v < u2.v)
- return -1;
- if (u1.v > u2.v)
- return +1;
- return 0;
+ if (n2 >= sizeof(Int16)) {
+ Int16 v1, v2;
+ memcpy(&v1, p1, sizeof(Int16));
+ memcpy(&v2, p2, sizeof(Int16));
+ if (v1 < v2)
+ return -1;
+ if (v1 > v2)
+ return +1;
+ return 0;
+ }
+ assert(! full);
+ return CmpUnknown;
}
int
-NdbSqlUtil::cmpSmallunsigned(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpSmallunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
- assert(full >= size && size > 0);
- union { Uint32 p[1]; Uint16 v; } u1, u2;
- u1.p[0] = p1[0];
- u2.p[0] = p2[0];
- if (u1.v < u2.v)
- return -1;
- if (u1.v > u2.v)
- return +1;
- return 0;
+ if (n2 >= sizeof(Uint16)) {
+ Uint16 v1, v2;
+ memcpy(&v1, p1, sizeof(Uint16));
+ memcpy(&v2, p2, sizeof(Uint16));
+ if (v1 < v2)
+ return -1;
+ if (v1 > v2)
+ return +1;
+ return 0;
+ }
+ assert(! full);
+ return CmpUnknown;
}
int
-NdbSqlUtil::cmpMediumint(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpMediumint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
- assert(full >= size && size > 0);
- union { const Uint32* p; const unsigned char* v; } u1, u2;
- u1.p = p1;
- u2.p = p2;
- Int32 v1 = sint3korr(u1.v);
- Int32 v2 = sint3korr(u2.v);
- if (v1 < v2)
- return -1;
- if (v1 > v2)
- return +1;
- return 0;
+ if (n2 >= 3) {
+ Int32 v1, v2;
+ v1 = sint3korr((const uchar*)p1);
+ v2 = sint3korr((const uchar*)p2);
+ if (v1 < v2)
+ return -1;
+ if (v1 > v2)
+ return +1;
+ return 0;
+ }
+ assert(! full);
+ return CmpUnknown;
}
int
-NdbSqlUtil::cmpMediumunsigned(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpMediumunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
- assert(full >= size && size > 0);
- union { const Uint32* p; const unsigned char* v; } u1, u2;
- u1.p = p1;
- u2.p = p2;
- Uint32 v1 = uint3korr(u1.v);
- Uint32 v2 = uint3korr(u2.v);
- if (v1 < v2)
- return -1;
- if (v1 > v2)
- return +1;
- return 0;
+ if (n2 >= 3) {
+ Uint32 v1, v2;
+ v1 = uint3korr((const uchar*)p1);
+ v2 = uint3korr((const uchar*)p2);
+ if (v1 < v2)
+ return -1;
+ if (v1 > v2)
+ return +1;
+ return 0;
+ }
+ assert(! full);
+ return CmpUnknown;
}
int
-NdbSqlUtil::cmpInt(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpInt(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
- assert(full >= size && size > 0);
- union { Uint32 p[1]; Int32 v; } u1, u2;
- u1.p[0] = p1[0];
- u2.p[0] = p2[0];
- if (u1.v < u2.v)
- return -1;
- if (u1.v > u2.v)
- return +1;
- return 0;
+ if (n2 >= sizeof(Int32)) {
+ Int32 v1, v2;
+ memcpy(&v1, p1, sizeof(Int32));
+ memcpy(&v2, p2, sizeof(Int32));
+ if (v1 < v2)
+ return -1;
+ if (v1 > v2)
+ return +1;
+ return 0;
+ }
+ assert(! full);
+ return CmpUnknown;
}
int
-NdbSqlUtil::cmpUnsigned(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpUnsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
- assert(full >= size && size > 0);
- union { Uint32 p[1]; Uint32 v; } u1, u2;
- u1.v = p1[0];
- u2.v = p2[0];
- if (u1.v < u2.v)
- return -1;
- if (u1.v > u2.v)
- return +1;
- return 0;
+ if (n2 >= sizeof(Uint32)) {
+ Uint32 v1, v2;
+ memcpy(&v1, p1, sizeof(Uint32));
+ memcpy(&v2, p2, sizeof(Uint32));
+ if (v1 < v2)
+ return -1;
+ if (v1 > v2)
+ return +1;
+ return 0;
+ }
+ assert(! full);
+ return CmpUnknown;
}
int
-NdbSqlUtil::cmpBigint(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpBigint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
- assert(full >= size && size > 0);
- if (size >= 2) {
- union { Uint32 p[2]; Int64 v; } u1, u2;
- u1.p[0] = p1[0];
- u1.p[1] = p1[1];
- u2.p[0] = p2[0];
- u2.p[1] = p2[1];
- if (u1.v < u2.v)
+ if (n2 >= sizeof(Int64)) {
+ Int64 v1, v2;
+ memcpy(&v1, p1, sizeof(Int64));
+ memcpy(&v2, p2, sizeof(Int64));
+ if (v1 < v2)
return -1;
- if (u1.v > u2.v)
+ if (v1 > v2)
return +1;
return 0;
}
+ assert(! full);
return CmpUnknown;
}
int
-NdbSqlUtil::cmpBigunsigned(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpBigunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
- assert(full >= size && size > 0);
- if (size >= 2) {
- union { Uint32 p[2]; Uint64 v; } u1, u2;
- u1.p[0] = p1[0];
- u1.p[1] = p1[1];
- u2.p[0] = p2[0];
- u2.p[1] = p2[1];
- if (u1.v < u2.v)
+ if (n2 >= sizeof(Uint64)) {
+ Uint64 v1, v2;
+ memcpy(&v1, p1, sizeof(Uint64));
+ memcpy(&v2, p2, sizeof(Uint64));
+ if (v1 < v2)
return -1;
- if (u1.v > u2.v)
+ if (v1 > v2)
return +1;
return 0;
}
+ assert(! full);
return CmpUnknown;
}
int
-NdbSqlUtil::cmpFloat(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpFloat(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
- assert(full >= size && size > 0);
- union { Uint32 p[1]; float v; } u1, u2;
- u1.p[0] = p1[0];
- u2.p[0] = p2[0];
- // no format check
- if (u1.v < u2.v)
- return -1;
- if (u1.v > u2.v)
- return +1;
- return 0;
+ if (n2 >= sizeof(float)) {
+ float v1, v2;
+ memcpy(&v1, p1, sizeof(float));
+ memcpy(&v2, p2, sizeof(float));
+ if (v1 < v2)
+ return -1;
+ if (v1 > v2)
+ return +1;
+ return 0;
+ }
+ assert(! full);
+ return CmpUnknown;
}
int
-NdbSqlUtil::cmpDouble(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpDouble(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
- assert(full >= size && size > 0);
- if (size >= 2) {
- union { Uint32 p[2]; double v; } u1, u2;
- u1.p[0] = p1[0];
- u1.p[1] = p1[1];
- u2.p[0] = p2[0];
- u2.p[1] = p2[1];
- // no format check
- if (u1.v < u2.v)
+ if (n2 >= sizeof(double)) {
+ double v1, v2;
+ memcpy(&v1, p1, sizeof(double));
+ memcpy(&v2, p2, sizeof(double));
+ if (v1 < v2)
return -1;
- if (u1.v > u2.v)
+ if (v1 > v2)
return +1;
return 0;
}
+ assert(! full);
return CmpUnknown;
}
+// not used by MySQL or NDB
int
-NdbSqlUtil::cmpDecimal(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpDecimal(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
- assert(full >= size && size > 0);
- // not used by MySQL or NDB
assert(false);
return 0;
}
int
-NdbSqlUtil::cmpChar(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpChar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
// collation does not work on prefix for some charsets
- assert(full == size && size > 0);
- /*
- * Char is blank-padded to length and null-padded to word size.
- */
- union { const Uint32* p; const uchar* v; } u1, u2;
- u1.p = p1;
- u2.p = p2;
+ assert(full);
+ const uchar* v1 = (const uchar*)p1;
+ const uchar* v2 = (const uchar*)p2;
// not const in MySQL
CHARSET_INFO* cs = (CHARSET_INFO*)(info);
- // length in bytes including null padding to Uint32
- uint l1 = (full << 2);
- int k = (*cs->coll->strnncollsp)(cs, u1.v, l1, u2.v, l1, 0);
+ // compare with space padding
+ int k = (*cs->coll->strnncollsp)(cs, v1, n1, v2, n2, false);
return k < 0 ? -1 : k > 0 ? +1 : 0;
}
+// waiting for MySQL and new NDB implementation
int
-NdbSqlUtil::cmpVarchar(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpVarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
- assert(full >= size && size > 0);
- /*
- * Varchar is not allowed to contain a null byte and the value is
- * null-padded. Therefore comparison does not need to use the length.
- *
- * Not used before MySQL 5.0. Format is likely to change. Handle
- * only binary collation for now.
- */
- union { const Uint32* p; const char* v; } u1, u2;
- u1.p = p1;
- u2.p = p2;
- // skip length in first 2 bytes
- int k = strncmp(u1.v + 2, u2.v + 2, (size << 2) - 2);
- return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown;
+ assert(false);
+ return 0;
}
int
-NdbSqlUtil::cmpBinary(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpBinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
- assert(full >= size && size > 0);
- /*
- * Binary data of full length. Compare bytewise.
- */
- union { const Uint32* p; const unsigned char* v; } u1, u2;
- u1.p = p1;
- u2.p = p2;
- int k = memcmp(u1.v, u2.v, size << 2);
- return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown;
+ const uchar* v1 = (const uchar*)p1;
+ const uchar* v2 = (const uchar*)p2;
+ // compare as binary strings
+ unsigned n = (n1 <= n2 ? n1 : n2);
+ int k = memcmp(v1, v2, n);
+ if (k == 0) {
+ if (full)
+ k = (int)n1 - (int)n2;
+ else
+ k = (int)n - (int)n2;
+ }
+ return k < 0 ? -1 : k > 0 ? +1 : full ? 0 : CmpUnknown;
}
+// waiting for MySQL and new NDB implementation
int
-NdbSqlUtil::cmpVarbinary(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpVarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
- assert(full >= size && size > 0);
- /*
- * Binary data of variable length padded with nulls. The comparison
- * does not need to use the length.
- *
- * Not used before MySQL 5.0. Format is likely to change.
- */
- union { const Uint32* p; const unsigned char* v; } u1, u2;
- u1.p = p1;
- u2.p = p2;
- // skip length in first 2 bytes
- int k = memcmp(u1.v + 2, u2.v + 2, (size << 2) - 2);
- return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown;
+ assert(false);
+ return 0;
}
+// not used by MySQL or NDB
int
-NdbSqlUtil::cmpDatetime(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpDatetime(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
- assert(full >= size && size > 0);
- /*
- * Datetime is CC YY MM DD hh mm ss \0
- *
- * Not used via MySQL.
- */
- union { const Uint32* p; const unsigned char* v; } u1, u2;
- u1.p = p1;
- u2.p = p2;
- // no format check
- int k = memcmp(u1.v, u2.v, 4);
- if (k != 0)
- return k < 0 ? -1 : +1;
- if (size >= 2) {
- k = memcmp(u1.v + 4, u2.v + 4, 4);
- return k < 0 ? -1 : k > 0 ? +1 : 0;
- }
- return CmpUnknown;
+ assert(false);
+ return 0;
}
+// not used by MySQL or NDB
int
-NdbSqlUtil::cmpTimespec(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpTimespec(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
- assert(full >= size && size > 0);
- /*
- * Timespec is CC YY MM DD hh mm ss \0 NN NN NN NN
- *
- * Not used via MySQL.
- */
- union { const Uint32* p; const unsigned char* v; } u1, u2;
- u1.p = p1;
- u2.p = p2;
- // no format check
- int k = memcmp(u1.v, u2.v, 4);
- if (k != 0)
- return k < 0 ? -1 : +1;
- if (size >= 2) {
- k = memcmp(u1.v + 4, u2.v + 4, 4);
- if (k != 0)
- return k < 0 ? -1 : +1;
- if (size >= 3) {
- Uint32 n1 = *(const Uint32*)(u1.v + 8);
- Uint32 n2 = *(const Uint32*)(u2.v + 8);
- if (n1 < n2)
- return -1;
- if (n2 > n1)
- return +1;
- return 0;
- }
- }
- return CmpUnknown;
+ assert(false);
+ return 0;
}
+// not supported
int
-NdbSqlUtil::cmpBlob(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpBlob(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
- assert(full >= size && size > 0);
- /*
- * Blob comparison is on the inline bytes (null padded).
- */
- const unsigned head = NDB_BLOB_HEAD_SIZE;
- // skip blob head
- if (size >= head + 1) {
- union { const Uint32* p; const unsigned char* v; } u1, u2;
- u1.p = p1 + head;
- u2.p = p2 + head;
- int k = memcmp(u1.v, u2.v, (size - head) << 2);
- return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown;
- }
- return CmpUnknown;
+ assert(false);
+ return 0;
}
+// not supported
int
-NdbSqlUtil::cmpText(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpText(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
- // collation does not work on prefix for some charsets
- assert(full == size && size > 0);
- /*
- * Text comparison is on the inline bytes (blank padded). Currently
- * not supported for multi-byte charsets.
- */
- const unsigned head = NDB_BLOB_HEAD_SIZE;
- // skip blob head
- if (size >= head + 1) {
- union { const Uint32* p; const uchar* v; } u1, u2;
- u1.p = p1 + head;
- u2.p = p2 + head;
- // not const in MySQL
- CHARSET_INFO* cs = (CHARSET_INFO*)(info);
- // length in bytes including null padding to Uint32
- uint l1 = (full << 2);
- int k = (*cs->coll->strnncollsp)(cs, u1.v, l1, u2.v, l1,0);
- return k < 0 ? -1 : k > 0 ? +1 : 0;
- }
- return CmpUnknown;
+ assert(false);
+ return 0;
}
// check charset
@@ -572,8 +497,6 @@ NdbSqlUtil::usable_in_pk(Uint32 typeId, const void* info)
{
const Type& type = getType(typeId);
switch (type.m_typeId) {
- case Type::Undefined:
- break;
case Type::Char:
{
const CHARSET_INFO *cs = (const CHARSET_INFO*)info;
@@ -582,11 +505,12 @@ NdbSqlUtil::usable_in_pk(Uint32 typeId, const void* info)
cs->cset != 0 &&
cs->coll != 0 &&
cs->coll->strnxfrm != 0 &&
- cs->strxfrm_multiply <= 1; // current limitation
+ cs->strxfrm_multiply <= MAX_XFRM_MULTIPLY;
}
break;
+ case Type::Undefined:
case Type::Varchar:
- return true; // Varchar not used via MySQL
+ case Type::Varbinary:
case Type::Blob:
case Type::Text:
break;
@@ -606,9 +530,9 @@ bool
NdbSqlUtil::usable_in_ordered_index(Uint32 typeId, const void* info)
{
const Type& type = getType(typeId);
+ if (type.m_cmp == NULL)
+ return false;
switch (type.m_typeId) {
- case Type::Undefined:
- break;
case Type::Char:
{
const CHARSET_INFO *cs = (const CHARSET_INFO*)info;
@@ -618,92 +542,17 @@ NdbSqlUtil::usable_in_ordered_index(Uint32 typeId, const void* info)
cs->coll != 0 &&
cs->coll->strnxfrm != 0 &&
cs->coll->strnncollsp != 0 &&
- cs->strxfrm_multiply <= 1; // current limitation
+ cs->strxfrm_multiply <= MAX_XFRM_MULTIPLY;
}
break;
+ case Type::Undefined:
case Type::Varchar:
- return true; // Varchar not used via MySQL
+ case Type::Varbinary:
+ case Type::Blob:
case Type::Text:
- {
- const CHARSET_INFO *cs = (const CHARSET_INFO*)info;
- return
- cs != 0 &&
- cs->mbmaxlen == 1 && // extra limitation
- cs->cset != 0 &&
- cs->coll != 0 &&
- cs->coll->strnxfrm != 0 &&
- cs->coll->strnncollsp != 0 &&
- cs->strxfrm_multiply <= 1; // current limitation
- }
break;
default:
return true;
}
return false;
}
-
-#ifdef NDB_SQL_UTIL_TEST
-
-#include <NdbTick.h>
-#include <NdbOut.hpp>
-
-struct Testcase {
- int op; // 1=compare 2=like
- int res;
- const char* s1;
- const char* s2;
- int pad;
-};
-const Testcase testcase[] = {
- { 2, 1, "abc", "abc", 0 },
- { 2, 1, "abc", "abc%", 0 },
- { 2, 1, "abcdef", "abc%", 0 },
- { 2, 1, "abcdefabcdefabcdef", "abc%", 0 },
- { 2, 1, "abcdefabcdefabcdef", "abc%f", 0 },
- { 2, 0, "abcdefabcdefabcdef", "abc%z", 0 },
- { 2, 1, "abcdefabcdefabcdef", "%f", 0 },
- { 2, 1, "abcdef", "a%b%c%d%e%f", 0 },
- { 0, 0, 0, 0 }
-};
-
-int
-main(int argc, char** argv)
-{
- ndb_init(); // for charsets
- unsigned count = argc > 1 ? atoi(argv[1]) : 1000000;
- ndbout_c("count = %u", count);
- assert(count != 0);
- for (const Testcase* t = testcase; t->s1 != 0; t++) {
- ndbout_c("%d = '%s' %s '%s' pad=%d",
- t->res, t->s1, t->op == 1 ? "comp" : "like", t->s2);
- NDB_TICKS x1 = NdbTick_CurrentMillisecond();
- unsigned n1 = strlen(t->s1);
- unsigned n2 = strlen(t->s2);
- for (unsigned i = 0; i < count; i++) {
- if (t->op == 1) {
- int res = NdbSqlUtil::char_compare(t->s1, n1, t->s2, n2, t->pad);
- assert(res == t->res);
- continue;
- }
- if (t->op == 2) {
- int res = NdbSqlUtil::char_like(t->s1, n1, t->s2, n2, t->pad);
- assert(res == t->res);
- continue;
- }
- assert(false);
- }
- NDB_TICKS x2 = NdbTick_CurrentMillisecond();
- if (x2 < x1)
- x2 = x1;
- double usec = 1000000.0 * double(x2 - x1) / double(count);
- ndbout_c("time %.0f usec per call", usec);
- }
- // quick check
- for (unsigned i = 0; i < sizeof(m_typeList) / sizeof(m_typeList[0]); i++) {
- const NdbSqlUtil::Type& t = m_typeList[i];
- assert(t.m_typeId == i);
- }
- return 0;
-}
-
-#endif
diff --git a/ndb/src/kernel/blocks/dbacc/Dbacc.hpp b/ndb/src/kernel/blocks/dbacc/Dbacc.hpp
index af8ee136934..6a65da5bb6a 100644
--- a/ndb/src/kernel/blocks/dbacc/Dbacc.hpp
+++ b/ndb/src/kernel/blocks/dbacc/Dbacc.hpp
@@ -607,8 +607,7 @@ struct Fragmentrec {
//-----------------------------------------------------------------------------
// elementLength: Length of element in bucket and overflow pages
-// keyLength: Length of key (== 0 if long key or variable key length)
-// wl-2066 always Length of key
+// keyLength: Length of key
//-----------------------------------------------------------------------------
Uint8 elementLength;
Uint16 keyLength;
@@ -637,6 +636,11 @@ struct Fragmentrec {
//-----------------------------------------------------------------------------
Uint8 nodetype;
Uint8 stopQueOp;
+
+//-----------------------------------------------------------------------------
+// flag to avoid accessing table record if no char attributes
+//-----------------------------------------------------------------------------
+ Uint8 hasCharAttr;
};
typedef Ptr<Fragmentrec> FragmentrecPtr;
@@ -719,6 +723,7 @@ struct Operationrec {
State transactionstate;
Uint16 elementContainer;
Uint16 tupkeylen;
+ Uint32 xfrmtupkeylen;
Uint32 userblockref;
Uint32 scanBits;
Uint8 elementIsDisappeared;
@@ -846,6 +851,13 @@ struct Tabrec {
Uint32 fragptrholder[MAX_FRAG_PER_NODE];
Uint32 tabUserPtr;
BlockReference tabUserRef;
+
+ Uint8 noOfKeyAttr;
+ Uint8 hasCharAttr;
+ struct KeyAttr {
+ Uint32 attributeDescriptor;
+ CHARSET_INFO* charsetInfo;
+ } keyAttr[MAX_ATTRIBUTES_IN_INDEX];
};
typedef Ptr<Tabrec> TabrecPtr;
@@ -891,6 +903,7 @@ private:
void execACCKEYREQ(Signal* signal);
void execACCSEIZEREQ(Signal* signal);
void execACCFRAGREQ(Signal* signal);
+ void execTC_SCHVERREQ(Signal* signal);
void execACC_SRREQ(Signal* signal);
void execNEXT_SCANREQ(Signal* signal);
void execACC_ABORTREQ(Signal* signal);
@@ -1016,7 +1029,7 @@ private:
void increaselistcont(Signal* signal);
void seizeLeftlist(Signal* signal);
void seizeRightlist(Signal* signal);
- void readTablePk(Uint32 localkey1);
+ Uint32 readTablePk(Uint32 localkey1);
void getElement(Signal* signal);
void getdirindex(Signal* signal);
void commitdelete(Signal* signal, bool systemRestart);
@@ -1123,6 +1136,8 @@ private:
void lcp_write_op_to_undolog(Signal* signal);
void reenable_expand_after_redo_log_exection_complete(Signal*);
+ // charsets
+ void xfrmKeyData(Signal* signal);
// Initialisation
void initData();
diff --git a/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp b/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp
index c9b6c0ea17d..0b2e8aeeb9a 100644
--- a/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp
+++ b/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp
@@ -179,6 +179,7 @@ Dbacc::Dbacc(const class Configuration & conf):
addRecSignal(GSN_ACCKEYREQ, &Dbacc::execACCKEYREQ);
addRecSignal(GSN_ACCSEIZEREQ, &Dbacc::execACCSEIZEREQ);
addRecSignal(GSN_ACCFRAGREQ, &Dbacc::execACCFRAGREQ);
+ addRecSignal(GSN_TC_SCHVERREQ, &Dbacc::execTC_SCHVERREQ);
addRecSignal(GSN_ACC_SRREQ, &Dbacc::execACC_SRREQ);
addRecSignal(GSN_NEXT_SCANREQ, &Dbacc::execNEXT_SCANREQ);
addRecSignal(GSN_ACC_ABORTREQ, &Dbacc::execACC_ABORTREQ);
diff --git a/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
index 1a5e22ac70a..eaf2176b390 100644
--- a/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
+++ b/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
@@ -16,6 +16,7 @@
#define DBACC_C
#include "Dbacc.hpp"
+#include <my_sys.h>
#include <AttributeHeader.hpp>
#include <signaldata/AccFrag.hpp>
@@ -27,6 +28,7 @@
#include <signaldata/FsRemoveReq.hpp>
#include <signaldata/DropTab.hpp>
#include <signaldata/DumpStateOrd.hpp>
+#include <SectionReader.hpp>
// TO_DO_RONM is a label for comments on what needs to be improved in future versions
// when more time is given.
@@ -1033,6 +1035,12 @@ void Dbacc::initialiseTableRec(Signal* signal)
tabptr.p->fragholder[i] = RNIL;
tabptr.p->fragptrholder[i] = RNIL;
}//for
+ tabptr.p->noOfKeyAttr = 0;
+ tabptr.p->hasCharAttr = 0;
+ for (Uint32 k = 0; k < MAX_ATTRIBUTES_IN_INDEX; k++) {
+ tabptr.p->keyAttr[k].attributeDescriptor = 0;
+ tabptr.p->keyAttr[k].charsetInfo = 0;
+ }
}//for
}//Dbacc::initialiseTableRec()
@@ -1188,6 +1196,66 @@ void Dbacc::addFragRefuse(Signal* signal, Uint32 errorCode)
}//Dbacc::addFragRefuseEarly()
void
+Dbacc::execTC_SCHVERREQ(Signal* signal)
+{
+ jamEntry();
+ if (! assembleFragments(signal)) {
+ jam();
+ return;
+ }
+ tabptr.i = signal->theData[0];
+ ptrCheckGuard(tabptr, ctablesize, tabrec);
+ Uint32 noOfKeyAttr = signal->theData[6];
+ ndbrequire(noOfKeyAttr <= MAX_ATTRIBUTES_IN_INDEX);
+ Uint32 hasCharAttr = 0;
+
+ SegmentedSectionPtr s0Ptr;
+ signal->getSection(s0Ptr, 0);
+ SectionReader r0(s0Ptr, getSectionSegmentPool());
+ Uint32 i = 0;
+ while (i < noOfKeyAttr) {
+ jam();
+ Uint32 attributeDescriptor = ~0;
+ Uint32 csNumber = ~0;
+ if (! r0.getWord(&attributeDescriptor) ||
+ ! r0.getWord(&csNumber)) {
+ jam();
+ break;
+ }
+ CHARSET_INFO* cs = 0;
+ if (csNumber != 0) {
+ cs = all_charsets[csNumber];
+ ndbrequire(cs != 0);
+ hasCharAttr = 1;
+ }
+ tabptr.p->keyAttr[i].attributeDescriptor = attributeDescriptor;
+ tabptr.p->keyAttr[i].charsetInfo = cs;
+ i++;
+ }
+ ndbrequire(i == noOfKeyAttr);
+ releaseSections(signal);
+
+ tabptr.p->noOfKeyAttr = noOfKeyAttr;
+ tabptr.p->hasCharAttr = hasCharAttr;
+
+ // copy char attr flag to each fragment
+ for (Uint32 i1 = 0; i1 < MAX_FRAG_PER_NODE; i1++) {
+ jam();
+ if (tabptr.p->fragptrholder[i1] != RNIL) {
+ rootfragrecptr.i = tabptr.p->fragptrholder[i1];
+ ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
+ for (Uint32 i2 = 0; i2 < 2; i2++) {
+ fragrecptr.i = rootfragrecptr.p->fragmentptr[i2];
+ ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
+ fragrecptr.p->hasCharAttr = hasCharAttr;
+ }
+ }
+ }
+
+ // no reply to DICT
+}
+
+void
Dbacc::execDROP_TAB_REQ(Signal* signal){
jamEntry();
DropTabReq* req = (DropTabReq*)signal->getDataPtr();
@@ -1550,6 +1618,7 @@ void Dbacc::initOpRec(Signal* signal)
operationRecPtr.p->hashValue = signal->theData[3];
operationRecPtr.p->tupkeylen = signal->theData[4];
+ operationRecPtr.p->xfrmtupkeylen = signal->theData[4];
operationRecPtr.p->transId1 = signal->theData[5];
operationRecPtr.p->transId2 = signal->theData[6];
operationRecPtr.p->transactionstate = ACTIVE;
@@ -1664,6 +1733,10 @@ void Dbacc::execACCKEYREQ(Signal* signal)
ndbrequire(operationRecPtr.p->transactionstate == IDLE);
initOpRec(signal);
+ // normalize key if any char attr
+ if (! operationRecPtr.p->isAccLockReq && fragrecptr.p->hasCharAttr)
+ xfrmKeyData(signal);
+
/*---------------------------------------------------------------*/
/* */
/* WE WILL USE THE HASH VALUE TO LOOK UP THE PROPER MEMORY */
@@ -1758,6 +1831,54 @@ void Dbacc::execACCKEYREQ(Signal* signal)
return;
}//Dbacc::execACCKEYREQ()
+void
+Dbacc::xfrmKeyData(Signal* signal)
+{
+ tabptr.i = fragrecptr.p->myTableId;
+ ptrCheckGuard(tabptr, ctablesize, tabrec);
+
+ Uint32 dst[1024];
+ Uint32 dstSize = (sizeof(dst) >> 2);
+ Uint32* src = &signal->theData[7];
+ const Uint32 noOfKeyAttr = tabptr.p->noOfKeyAttr;
+ Uint32 dstPos = 0;
+ Uint32 srcPos = 0;
+ Uint32 i = 0;
+
+ while (i < noOfKeyAttr) {
+ const Tabrec::KeyAttr& keyAttr = tabptr.p->keyAttr[i];
+
+ Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(keyAttr.attributeDescriptor);
+ Uint32 srcWords = (srcBytes + 3) / 4;
+ Uint32 dstWords = ~0;
+ uchar* dstPtr = (uchar*)&dst[dstPos];
+ const uchar* srcPtr = (const uchar*)&src[srcPos];
+ CHARSET_INFO* cs = keyAttr.charsetInfo;
+
+ if (cs == 0) {
+ jam();
+ memcpy(dstPtr, srcPtr, srcWords << 2);
+ dstWords = srcWords;
+ } else {
+ jam();
+ Uint32 xmul = cs->strxfrm_multiply;
+ if (xmul == 0)
+ xmul = 1;
+ Uint32 dstLen = xmul * srcBytes;
+ ndbrequire(dstLen <= ((dstSize - dstPos) << 2));
+ uint n = (*cs->coll->strnxfrm)(cs, dstPtr, dstLen, srcPtr, srcBytes);
+ while ((n & 3) != 0)
+ dstPtr[n++] = 0;
+ dstWords = (n >> 2);
+ }
+ dstPos += dstWords;
+ srcPos += srcWords;
+ i++;
+ }
+ memcpy(src, dst, dstPos << 2);
+ operationRecPtr.p->xfrmtupkeylen = dstPos;
+}
+
void Dbacc::accIsLockedLab(Signal* signal)
{
ndbrequire(csystemRestart == ZFALSE);
@@ -1848,6 +1969,7 @@ void Dbacc::insertelementLab(Signal* signal)
}//if
}//if
if (fragrecptr.p->keyLength != operationRecPtr.p->tupkeylen) {
+ // historical
ndbrequire(fragrecptr.p->keyLength == 0);
}//if
@@ -3251,7 +3373,7 @@ void Dbacc::getdirindex(Signal* signal)
ptrCheckGuard(gdiPageptr, cpagesize, page8);
}//Dbacc::getdirindex()
-void
+Uint32
Dbacc::readTablePk(Uint32 localkey1)
{
Uint32 tableId = fragrecptr.p->myTableId;
@@ -3259,10 +3381,11 @@ Dbacc::readTablePk(Uint32 localkey1)
Uint32 fragPageId = localkey1 >> MAX_TUPLES_BITS;
Uint32 pageIndex = localkey1 & ((1 << MAX_TUPLES_BITS ) - 1);
#ifdef VM_TRACE
- memset(ckeys, 0x1f, fragrecptr.p->keyLength << 2);
+ memset(ckeys, 0x1f, (fragrecptr.p->keyLength * MAX_XFRM_MULTIPLY) << 2);
#endif
- int ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, ckeys);
- ndbrequire(ret == fragrecptr.p->keyLength);
+ int ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, ckeys, true);
+ ndbrequire(ret > 0);
+ return ret;
}
/* --------------------------------------------------------------------------------- */
@@ -3306,7 +3429,6 @@ void Dbacc::getElement(Signal* signal)
Uint32 tgeNextptrtype;
register Uint32 tgeKeyptr;
register Uint32 tgeRemLen;
- register Uint32 tgeCompareLen;
register Uint32 TelemLen = fragrecptr.p->elementLength;
register Uint32* Tkeydata = (Uint32*)&signal->theData[7];
@@ -3314,7 +3436,6 @@ void Dbacc::getElement(Signal* signal)
tgePageindex = tgdiPageindex;
gePageptr = gdiPageptr;
tgeResult = ZFALSE;
- tgeCompareLen = fragrecptr.p->keyLength;
/*
* The value seached is
* - table key for ACCKEYREQ, stored in TUP
@@ -3381,12 +3502,15 @@ void Dbacc::getElement(Signal* signal)
Uint32 localkey2 = 0;
bool found;
if (! searchLocalKey) {
- readTablePk(localkey1);
- found = (memcmp(Tkeydata, ckeys, fragrecptr.p->keyLength << 2) == 0);
+ Uint32 len = readTablePk(localkey1);
+ found = (len == operationRecPtr.p->xfrmtupkeylen) &&
+ (memcmp(Tkeydata, ckeys, len << 2) == 0);
} else {
+ jam();
found = (localkey1 == Tkeydata[0]);
}
if (found) {
+ jam();
tgeLocked = ElementHeader::getLocked(tgeElementHeader);
tgeResult = ZTRUE;
operationRecPtr.p->localdata[0] = localkey1;
@@ -7731,6 +7855,7 @@ void Dbacc::initFragGeneral(FragmentrecPtr regFragPtr)
regFragPtr.p->activeDataPage = 0;
regFragPtr.p->createLcp = ZFALSE;
regFragPtr.p->stopQueOp = ZFALSE;
+ regFragPtr.p->hasCharAttr = ZFALSE;
regFragPtr.p->nextAllocPage = 0;
regFragPtr.p->nrWaitWriteUndoExit = 0;
regFragPtr.p->lastUndoIsStored = ZFALSE;
@@ -8680,6 +8805,7 @@ void Dbacc::srDoUndoLab(Signal* signal)
const Uint32 tkeylen = undopageptr.p->undoword[tmpindex];
tmpindex++;
operationRecPtr.p->tupkeylen = tkeylen;
+ operationRecPtr.p->xfrmtupkeylen = 0; // not used
operationRecPtr.p->fragptr = fragrecptr.i;
ndbrequire(fragrecptr.p->keyLength != 0 &&
@@ -9750,6 +9876,7 @@ void Dbacc::initScanOpRec(Signal* signal)
arrGuard(tisoLocalPtr, 2048);
operationRecPtr.p->keydata[0] = isoPageptr.p->word32[tisoLocalPtr];
operationRecPtr.p->tupkeylen = fragrecptr.p->keyLength;
+ operationRecPtr.p->xfrmtupkeylen = 0; // not used
}//Dbacc::initScanOpRec()
/* --------------------------------------------------------------------------------- */
diff --git a/ndb/src/kernel/blocks/dbdict/Dbdict.cpp b/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
index cedd2bed415..17972abc2dd 100644
--- a/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
+++ b/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
@@ -4318,7 +4318,28 @@ Dbdict::execTAB_COMMITCONF(Signal* signal){
signal->theData[3] = reference();
signal->theData[4] = (Uint32)tabPtr.p->tableType;
signal->theData[5] = createTabPtr.p->key;
- sendSignal(DBTC_REF, GSN_TC_SCHVERREQ, signal, 6, JBB);
+ signal->theData[6] = (Uint32)tabPtr.p->noOfPrimkey;
+
+ Uint32 buf[2 * MAX_ATTRIBUTES_IN_INDEX];
+ Uint32 sz = 0;
+ Uint32 tAttr = tabPtr.p->firstAttribute;
+ while (tAttr != RNIL) {
+ jam();
+ AttributeRecord* aRec = c_attributeRecordPool.getPtr(tAttr);
+ if (aRec->tupleKey) {
+ buf[sz++] = aRec->attributeDescriptor;
+ buf[sz++] = (aRec->extPrecision >> 16); // charset number
+ }
+ tAttr = aRec->nextAttrInTable;
+ }
+ ndbrequire(sz == 2 * tabPtr.p->noOfPrimkey);
+
+ LinearSectionPtr lsPtr[3];
+ lsPtr[0].p = buf;
+ lsPtr[0].sz = sz;
+ // note: ACC does not reply
+ sendSignal(DBACC_REF, GSN_TC_SCHVERREQ, signal, 7, JBB, lsPtr, 1);
+ sendSignal(DBTC_REF, GSN_TC_SCHVERREQ, signal, 7, JBB, lsPtr, 1);
return;
}
@@ -4785,12 +4806,18 @@ void Dbdict::handleTabInfo(SimpleProperties::Reader & it,
// charset in upper half of precision
unsigned csNumber = (attrPtr.p->extPrecision >> 16);
if (csNumber != 0) {
+ /*
+ * A new charset is first accessed here on this node.
+ * TODO use separate thread (e.g. via NDBFS) if need to load from file
+ */
CHARSET_INFO* cs = get_charset(csNumber, MYF(0));
if (cs == NULL) {
parseP->errorCode = CreateTableRef::InvalidCharset;
parseP->errorLine = __LINE__;
return;
}
+ // XXX should be done somewhere in mysql
+ all_charsets[cs->number] = cs;
unsigned i = 0;
while (i < noOfCharsets) {
if (charsets[i] == csNumber)
diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
index 2b531ede3d1..e141ab09617 100644
--- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
+++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
@@ -8280,7 +8280,7 @@ Dblqh::readPrimaryKeys(ScanRecord *scanP, TcConnectionrec *tcConP, Uint32 *dst)
tableId = tFragPtr.p->tabRef;
}
- int ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, dst);
+ int ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, dst, false);
if(0)
ndbout_c("readPrimaryKeys(table: %d fragment: %d [ %d %d ] -> %d",
tableId, fragId, fragPageId, pageIndex, ret);
diff --git a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
index ccb7d55396d..9afe4a5da0e 100644
--- a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
+++ b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
@@ -972,6 +972,7 @@ public:
typedef Ptr<HostRecord> HostRecordPtr;
/* *********** TABLE RECORD ********************************************* */
+
/********************************************************/
/* THIS RECORD CONTAINS THE CURRENT SCHEMA VERSION OF */
/* ALL TABLES IN THE SYSTEM. */
@@ -982,13 +983,21 @@ public:
Uint8 dropping;
Uint8 tableType;
Uint8 storedTable;
-
+
+ Uint8 noOfKeyAttr;
+ Uint8 hasCharAttr;
+
+ struct KeyAttr {
+ Uint32 attributeDescriptor;
+ CHARSET_INFO* charsetInfo;
+ } keyAttr[MAX_ATTRIBUTES_IN_INDEX];
+
bool checkTable(Uint32 schemaVersion) const {
return enabled && !dropping && (schemaVersion == currentSchemaVersion);
}
-
+
Uint32 getErrorCode(Uint32 schemaVersion) const;
-
+
struct DropTable {
Uint32 senderRef;
Uint32 senderData;
@@ -1436,6 +1445,7 @@ private:
void gcpTcfinished(Signal* signal);
void handleGcp(Signal* signal);
void hash(Signal* signal);
+ Uint32 xfrmKeyData(Signal* signal, Uint32* dst, Uint32 dstSize, const Uint32* src);
void initApiConnect(Signal* signal);
void initApiConnectRec(Signal* signal,
ApiConnectRecord * const regApiPtr,
diff --git a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
index 42821e4fe7f..1ce6ffa4817 100644
--- a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
+++ b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
@@ -20,6 +20,7 @@
#include "md5_hash.hpp"
#include <RefConvert.hpp>
#include <ndb_limits.h>
+#include <my_sys.h>
#include <signaldata/EventReport.hpp>
#include <signaldata/TcKeyReq.hpp>
@@ -63,6 +64,8 @@
#include <signaldata/PackedSignal.hpp>
#include <AttributeHeader.hpp>
#include <signaldata/DictTabInfo.hpp>
+#include <AttributeDescriptor.hpp>
+#include <SectionReader.hpp>
#include <NdbOut.hpp>
#include <DebuggerNames.hpp>
@@ -313,6 +316,10 @@ void Dbtc::execREAD_NODESREF(Signal* signal)
void Dbtc::execTC_SCHVERREQ(Signal* signal)
{
jamEntry();
+ if (! assembleFragments(signal)) {
+ jam();
+ return;
+ }
tabptr.i = signal->theData[0];
ptrCheckGuard(tabptr, ctabrecFilesize, tableRecord);
tabptr.p->currentSchemaVersion = signal->theData[1];
@@ -320,10 +327,41 @@ void Dbtc::execTC_SCHVERREQ(Signal* signal)
BlockReference retRef = signal->theData[3];
tabptr.p->tableType = (Uint8)signal->theData[4];
BlockReference retPtr = signal->theData[5];
+ Uint32 noOfKeyAttr = signal->theData[6];
+ ndbrequire(noOfKeyAttr <= MAX_ATTRIBUTES_IN_INDEX);
+ Uint32 hasCharAttr = 0;
+
+ SegmentedSectionPtr s0Ptr;
+ signal->getSection(s0Ptr, 0);
+ SectionReader r0(s0Ptr, getSectionSegmentPool());
+ Uint32 i = 0;
+ while (i < noOfKeyAttr) {
+ jam();
+ Uint32 attributeDescriptor = ~0;
+ Uint32 csNumber = ~0;
+ if (! r0.getWord(&attributeDescriptor) ||
+ ! r0.getWord(&csNumber)) {
+ jam();
+ break;
+ }
+ CHARSET_INFO* cs = 0;
+ if (csNumber != 0) {
+ cs = all_charsets[csNumber];
+ ndbrequire(cs != 0);
+ hasCharAttr = 1;
+ }
+ tabptr.p->keyAttr[i].attributeDescriptor = attributeDescriptor;
+ tabptr.p->keyAttr[i].charsetInfo = cs;
+ i++;
+ }
+ ndbrequire(i == noOfKeyAttr);
+ releaseSections(signal);
ndbrequire(tabptr.p->enabled == false);
tabptr.p->enabled = true;
tabptr.p->dropping = false;
+ tabptr.p->noOfKeyAttr = noOfKeyAttr;
+ tabptr.p->hasCharAttr = hasCharAttr;
signal->theData[0] = tabptr.i;
signal->theData[1] = retPtr;
@@ -2221,6 +2259,7 @@ void Dbtc::hash(Signal* signal)
UintR Tdata3;
UintR* Tdata32;
Uint64 Tdata[512];
+ Uint64 Txfrmdata[512 * MAX_XFRM_MULTIPLY];
CacheRecord * const regCachePtr = cachePtr.p;
Tdata32 = (UintR*)&Tdata[0];
@@ -2250,8 +2289,21 @@ void Dbtc::hash(Signal* signal)
ti += 4;
}//while
}//if
+
+ UintR keylen = (UintR)regCachePtr->keylen;
+
+ TableRecordPtr tabptrSave = tabptr;
+ tabptr.i = regCachePtr->tableref; // table or hash index id
+ ptrCheckGuard(tabptr, ctabrecFilesize, tableRecord);
+ if (tabptr.p->hasCharAttr) {
+ jam();
+ keylen = xfrmKeyData(signal, (Uint32*)Txfrmdata, sizeof(Txfrmdata) >> 2, Tdata32);
+ Tdata32 = (UintR*)&Txfrmdata[0];
+ }
+ tabptr = tabptrSave;
+
Uint32 tmp[4];
- md5_hash(tmp, (Uint64*)&Tdata32[0], (UintR)regCachePtr->keylen);
+ md5_hash(tmp, (Uint64*)&Tdata32[0], keylen);
thashValue = tmp[0];
if (regCachePtr->distributionKeyIndicator == 1) {
@@ -2263,6 +2315,47 @@ void Dbtc::hash(Signal* signal)
}//if
}//Dbtc::hash()
+Uint32
+Dbtc::xfrmKeyData(Signal* signal, Uint32* dst, Uint32 dstSize, const Uint32* src)
+{
+ const Uint32 noOfKeyAttr = tabptr.p->noOfKeyAttr;
+ Uint32 dstPos = 0;
+ Uint32 srcPos = 0;
+ Uint32 i = 0;
+ while (i < noOfKeyAttr) {
+ const TableRecord::KeyAttr& keyAttr = tabptr.p->keyAttr[i];
+
+ Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(keyAttr.attributeDescriptor);
+ Uint32 srcWords = (srcBytes + 3) / 4;
+ Uint32 dstWords = ~0;
+ uchar* dstPtr = (uchar*)&dst[dstPos];
+ const uchar* srcPtr = (const uchar*)&src[srcPos];
+ CHARSET_INFO* cs = keyAttr.charsetInfo;
+
+ if (cs == NULL) {
+ jam();
+ memcpy(dstPtr, srcPtr, srcWords << 2);
+ dstWords = srcWords;
+ } else {
+ jam();
+ Uint32 xmul = cs->strxfrm_multiply;
+ if (xmul == 0)
+ xmul = 1;
+ Uint32 dstLen = xmul * srcBytes;
+ ndbrequire(dstLen <= ((dstSize - dstPos) << 2));
+ uint n = (*cs->coll->strnxfrm)(cs, dstPtr, dstLen, srcPtr, srcBytes);
+ while ((n & 3) != 0) {
+ dstPtr[n++] = 0;
+ }
+ dstWords = (n >> 2);
+ }
+ dstPos += dstWords;
+ srcPos += srcWords;
+ i++;
+ }
+ return dstPos;
+}
+
/*
INIT_API_CONNECT_REC
---------------------------
@@ -9973,6 +10066,12 @@ void Dbtc::initTable(Signal* signal)
tabptr.p->tableType = 0;
tabptr.p->enabled = false;
tabptr.p->dropping = false;
+ tabptr.p->noOfKeyAttr = 0;
+ tabptr.p->hasCharAttr = 0;
+ for (unsigned k = 0; k < MAX_ATTRIBUTES_IN_INDEX; k++) {
+ tabptr.p->keyAttr[k].attributeDescriptor = 0;
+ tabptr.p->keyAttr[k].charsetInfo = 0;
+ }
}//for
}//Dbtc::initTable()
diff --git a/ndb/src/kernel/blocks/dbtup/Dbtup.hpp b/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
index 1c7662c5d55..6d169d20d16 100644
--- a/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
+++ b/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
@@ -207,6 +207,8 @@
#define ZTUPLE_DELETED_ERROR 626
#define ZINSERT_ERROR 630
+#define ZINVALID_CHAR_FORMAT 744
+
/* SOME WORD POSITIONS OF FIELDS IN SOME HEADERS */
#define ZPAGE_STATE_POS 0 /* POSITION OF PAGE STATE */
@@ -1020,14 +1022,14 @@ public:
* for md5 summing and when returning keyinfo. Returns number of
* words or negative (-terrorCode) on error.
*/
- int tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* dataOut);
+ int tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* dataOut, bool xfrmFlag);
/*
* ACC reads primary key without headers into an array of words. At
* this point in ACC deconstruction, ACC still uses logical references
* to fragment and tuple.
*/
- int accReadPk(Uint32 tableId, Uint32 fragId, Uint32 fragPageId, Uint32 pageIndex, Uint32* dataOut);
+ int accReadPk(Uint32 tableId, Uint32 fragId, Uint32 fragPageId, Uint32 pageIndex, Uint32* dataOut, bool xfrmFlag);
/*
* TUX checks if tuple is visible to scan.
@@ -1637,20 +1639,6 @@ private:
bool readBitsNotNULL(Uint32* outBuffer, AttributeHeader*, Uint32, Uint32);
bool updateBitsNotNULL(Uint32* inBuffer, Uint32, Uint32);
-// *****************************************************************
-// Read char routines optionally (tXfrmFlag) apply strxfrm
-// *****************************************************************
-
- bool readCharNotNULL(Uint32* outBuffer,
- AttributeHeader* ahOut,
- Uint32 attrDescriptor,
- Uint32 attrDes2);
-
- bool readCharNULLable(Uint32* outBuffer,
- AttributeHeader* ahOut,
- Uint32 attrDescriptor,
- Uint32 attrDes2);
-
//------------------------------------------------------------------
//------------------------------------------------------------------
bool nullFlagCheck(Uint32 attrDes2);
diff --git a/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp b/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp
index 3b0ba1c196f..ab6e0642e11 100644
--- a/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp
+++ b/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp
@@ -173,7 +173,7 @@ Dbtup::tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tu
}
int
-Dbtup::tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* dataOut)
+Dbtup::tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* dataOut, bool xfrmFlag)
{
ljamEntry();
// use own variables instead of globals
@@ -200,7 +200,7 @@ Dbtup::tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* data
operPtr.i = RNIL;
operPtr.p = NULL;
// do it
- int ret = readAttributes(pagePtr.p, pageOffset, attrIds, numAttrs, dataOut, ZNIL, true);
+ int ret = readAttributes(pagePtr.p, pageOffset, attrIds, numAttrs, dataOut, ZNIL, xfrmFlag);
// restore globals
tabptr = tabptr_old;
fragptr = fragptr_old;
@@ -229,7 +229,7 @@ Dbtup::tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* data
}
int
-Dbtup::accReadPk(Uint32 tableId, Uint32 fragId, Uint32 fragPageId, Uint32 pageIndex, Uint32* dataOut)
+Dbtup::accReadPk(Uint32 tableId, Uint32 fragId, Uint32 fragPageId, Uint32 pageIndex, Uint32* dataOut, bool xfrmFlag)
{
ljamEntry();
// get table
@@ -245,7 +245,7 @@ Dbtup::accReadPk(Uint32 tableId, Uint32 fragId, Uint32 fragPageId, Uint32 pageIn
ndbrequire((pageIndex & 0x1) == 0);
Uint32 pageOffset = ZPAGE_HEADER_SIZE + (pageIndex >> 1) * tablePtr.p->tupheadsize;
// use TUX routine - optimize later
- int ret = tuxReadPk(fragPtr.i, pageId, pageOffset, dataOut);
+ int ret = tuxReadPk(fragPtr.i, pageId, pageOffset, dataOut, xfrmFlag);
return ret;
}
diff --git a/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp b/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp
index 56ae861270c..13593602abc 100644
--- a/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp
+++ b/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp
@@ -364,13 +364,8 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
ndbrequire(false);
}//if
if (csNumber != 0) {
- CHARSET_INFO* cs = get_charset(csNumber, MYF(0));
- if (cs == NULL) {
- ljam();
- terrorCode = TupAddAttrRef::InvalidCharset;
- addattrrefuseLab(signal, regFragPtr, fragOperPtr, regTabPtr.p, fragId);
- return;
- }
+ CHARSET_INFO* cs = all_charsets[csNumber];
+ ndbrequire(cs != NULL);
Uint32 i = 0;
while (i < fragOperPtr.p->charsetIndex) {
ljam();
diff --git a/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp b/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
index 5bcbb482a8c..73cddfd0382 100644
--- a/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
+++ b/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
@@ -59,10 +59,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr)
} else {
ndbrequire(false);
}//if
- // replace read function of char attribute
+ // replace functions for char attribute
if (AttributeOffset::getCharsetFlag(attrOffset)) {
ljam();
- regTabPtr->readFunctionArray[i] = &Dbtup::readCharNotNULL;
+ regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHManyWordNotNULL;
+ regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNotNULL;
}
} else {
if (AttributeDescriptor::getSize(attrDescriptor) == 0){
@@ -86,10 +87,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr)
regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHZeroWordNULLable;
regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNULLable;
}//if
- // replace read function of char attribute
+ // replace functions for char attribute
if (AttributeOffset::getCharsetFlag(attrOffset)) {
ljam();
- regTabPtr->readFunctionArray[i] = &Dbtup::readCharNULLable;
+ regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHManyWordNULLable;
+ regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNULLable;
}
}//if
} else if (AttributeDescriptor::getArrayType(attrDescriptor) == ZVAR_ARRAY) {
@@ -337,25 +339,68 @@ Dbtup::readFixedSizeTHManyWordNotNULL(Uint32* outBuffer,
Uint32 attrDes2)
{
Uint32 indexBuf = tOutBufIndex;
+ Uint32 charsetFlag = AttributeOffset::getCharsetFlag(attrDes2);
Uint32 readOffset = AttributeOffset::getOffset(attrDes2);
Uint32 attrNoOfWords = AttributeDescriptor::getSizeInWords(attrDescriptor);
- Uint32 newIndexBuf = indexBuf + attrNoOfWords;
Uint32 maxRead = tMaxRead;
ndbrequire((readOffset + attrNoOfWords - 1) < tCheckOffset);
- if (newIndexBuf <= maxRead) {
- ljam();
- ahOut->setDataSize(attrNoOfWords);
- MEMCOPY_NO_WORDS(&outBuffer[indexBuf],
- &tTupleHeader[readOffset],
- attrNoOfWords);
- tOutBufIndex = newIndexBuf;
- return true;
+ if (! charsetFlag || ! tXfrmFlag) {
+ Uint32 newIndexBuf = indexBuf + attrNoOfWords;
+ if (newIndexBuf <= maxRead) {
+ ljam();
+ ahOut->setDataSize(attrNoOfWords);
+ MEMCOPY_NO_WORDS(&outBuffer[indexBuf],
+ &tTupleHeader[readOffset],
+ attrNoOfWords);
+ tOutBufIndex = newIndexBuf;
+ return true;
+ } else {
+ ljam();
+ terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
+ }//if
} else {
ljam();
- terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
- return false;
- }//if
+ Tablerec* regTabPtr = tabptr.p;
+ Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(attrDescriptor);
+ Uint32 i = AttributeOffset::getCharsetPos(attrDes2);
+ ndbrequire(i < regTabPtr->noOfCharsets);
+ CHARSET_INFO* cs = regTabPtr->charsetArray[i];
+ Uint32 xmul = cs->strxfrm_multiply;
+ if (xmul == 0)
+ xmul = 1;
+ Uint32 dstLen = xmul * srcBytes;
+ Uint32 maxIndexBuf = indexBuf + (dstLen >> 2);
+ if (maxIndexBuf <= maxRead) {
+ ljam();
+ uchar* dstPtr = (uchar*)&outBuffer[indexBuf];
+ const uchar* srcPtr = (uchar*)&tTupleHeader[readOffset];
+ const char* ssrcPtr = (const char*)srcPtr;
+ // could verify data format optionally
+ if (true ||
+ (*cs->cset->well_formed_len)(cs, ssrcPtr, ssrcPtr + srcBytes, ZNIL) == srcBytes) {
+ ljam();
+ // normalize
+ Uint32 n = (*cs->coll->strnxfrm)(cs, dstPtr, dstLen, srcPtr, srcBytes);
+ while ((n & 3) != 0) {
+ dstPtr[n++] = 0;
+ }
+ Uint32 dstWords = (n >> 2);
+ ahOut->setDataSize(dstWords);
+ Uint32 newIndexBuf = indexBuf + dstWords;
+ ndbrequire(newIndexBuf <= maxRead);
+ tOutBufIndex = newIndexBuf;
+ return true;
+ } else {
+ ljam();
+ terrorCode = ZTUPLE_CORRUPTED_ERROR;
+ }
+ } else {
+ ljam();
+ terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
+ }
+ }
+ return false;
}//Dbtup::readFixedSizeTHManyWordNotNULL()
bool
@@ -402,7 +447,6 @@ Dbtup::readFixedSizeTHManyWordNULLable(Uint32* outBuffer,
Uint32 attrDescriptor,
Uint32 attrDes2)
{
-ljam();
if (!nullFlagCheck(attrDes2)) {
ljam();
return readFixedSizeTHManyWordNotNULL(outBuffer,
@@ -563,74 +607,6 @@ Dbtup::readDynSmallVarSize(Uint32* outBuffer,
return false;
}//Dbtup::readDynSmallVarSize()
-
-bool
-Dbtup::readCharNotNULL(Uint32* outBuffer,
- AttributeHeader* ahOut,
- Uint32 attrDescriptor,
- Uint32 attrDes2)
-{
- Uint32 indexBuf = tOutBufIndex;
- Uint32 readOffset = AttributeOffset::getOffset(attrDes2);
- Uint32 attrNoOfWords = AttributeDescriptor::getSizeInWords(attrDescriptor);
- Uint32 newIndexBuf = indexBuf + attrNoOfWords;
- Uint32 maxRead = tMaxRead;
-
- ndbrequire((readOffset + attrNoOfWords - 1) < tCheckOffset);
- if (newIndexBuf <= maxRead) {
- ljam();
- ahOut->setDataSize(attrNoOfWords);
- if (! tXfrmFlag) {
- MEMCOPY_NO_WORDS(&outBuffer[indexBuf],
- &tTupleHeader[readOffset],
- attrNoOfWords);
- } else {
- ljam();
- Tablerec* regTabPtr = tabptr.p;
- Uint32 i = AttributeOffset::getCharsetPos(attrDes2);
- ndbrequire(i < tabptr.p->noOfCharsets);
- // not const in MySQL
- CHARSET_INFO* cs = tabptr.p->charsetArray[i];
- // XXX should strip Uint32 null padding
- const unsigned nBytes = attrNoOfWords << 2;
- unsigned n =
- (*cs->coll->strnxfrm)(cs,
- (uchar*)&outBuffer[indexBuf],
- nBytes,
- (const uchar*)&tTupleHeader[readOffset],
- nBytes);
- // pad with ascii spaces
- while (n < nBytes)
- ((uchar*)&outBuffer[indexBuf])[n++] = 0x20;
- }
- tOutBufIndex = newIndexBuf;
- return true;
- } else {
- ljam();
- terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
- return false;
- }
-}
-
-bool
-Dbtup::readCharNULLable(Uint32* outBuffer,
- AttributeHeader* ahOut,
- Uint32 attrDescriptor,
- Uint32 attrDes2)
-{
- if (!nullFlagCheck(attrDes2)) {
- ljam();
- return readCharNotNULL(outBuffer,
- ahOut,
- attrDescriptor,
- attrDes2);
- } else {
- ljam();
- ahOut->setNULL();
- return true;
- }
-}
-
/* ---------------------------------------------------------------------- */
/* THIS ROUTINE IS USED TO UPDATE A NUMBER OF ATTRIBUTES. IT IS */
/* USED BY THE INSERT ROUTINE, THE UPDATE ROUTINE AND IT CAN BE */
@@ -818,6 +794,7 @@ Dbtup::updateFixedSizeTHManyWordNotNULL(Uint32* inBuffer,
Uint32 indexBuf = tInBufIndex;
Uint32 inBufLen = tInBufLen;
Uint32 updateOffset = AttributeOffset::getOffset(attrDes2);
+ Uint32 charsetFlag = AttributeOffset::getCharsetFlag(attrDes2);
AttributeHeader ahIn(inBuffer[indexBuf]);
Uint32 nullIndicator = ahIn.isNULL();
Uint32 noOfWords = AttributeDescriptor::getSizeInWords(attrDescriptor);
@@ -827,6 +804,21 @@ Dbtup::updateFixedSizeTHManyWordNotNULL(Uint32* inBuffer,
if (newIndex <= inBufLen) {
if (!nullIndicator) {
ljam();
+ if (charsetFlag) {
+ ljam();
+ Tablerec* regTabPtr = tabptr.p;
+ Uint32 bytes = AttributeDescriptor::getSizeInBytes(attrDescriptor);
+ Uint32 i = AttributeOffset::getCharsetPos(attrDes2);
+ ndbrequire(i < regTabPtr->noOfCharsets);
+ // not const in MySQL
+ CHARSET_INFO* cs = regTabPtr->charsetArray[i];
+ const char* ssrc = (const char*)&inBuffer[tInBufIndex + 1];
+ if ((*cs->cset->well_formed_len)(cs, ssrc, ssrc + bytes, ZNIL) != bytes) {
+ ljam();
+ terrorCode = ZINVALID_CHAR_FORMAT;
+ return false;
+ }
+ }
tInBufIndex = newIndex;
MEMCOPY_NO_WORDS(&tTupleHeader[updateOffset],
&inBuffer[indexBuf + 1],
diff --git a/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp b/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp
index aac5c326cad..476a4b5724b 100644
--- a/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp
+++ b/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp
@@ -753,7 +753,7 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr,
regTabPtr->noOfKeyAttr,
keyBuffer,
ZATTR_BUFFER_SIZE,
- true);
+ false);
ndbrequire(ret != -1);
noPrimKey= ret;
@@ -796,7 +796,7 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr,
numAttrsToRead,
mainBuffer,
ZATTR_BUFFER_SIZE,
- true);
+ false);
ndbrequire(ret != -1);
noMainWords= ret;
} else {
@@ -822,7 +822,7 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr,
numAttrsToRead,
copyBuffer,
ZATTR_BUFFER_SIZE,
- true);
+ false);
ndbrequire(ret != -1);
noCopyWords = ret;
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp
index ddab77b97b5..ffce9969074 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp
@@ -18,24 +18,26 @@
#include "Dbtux.hpp"
/*
- * Search key vs node prefix or entry
+ * Search key vs node prefix or entry.
*
* The comparison starts at given attribute position. The position is
* updated by number of equal initial attributes found. The entry data
* may be partial in which case CmpUnknown may be returned.
+ *
+ * The attributes are normalized and have variable size given in words.
*/
int
Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, ConstData entryData, unsigned maxlen)
{
const unsigned numAttrs = frag.m_numAttrs;
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
- // number of words of attribute data left
- unsigned len2 = maxlen;
// skip to right position in search key only
for (unsigned i = 0; i < start; i++) {
jam();
searchKey += AttributeHeaderSize + searchKey.ah().getDataSize();
}
+ // number of words of entry data left
+ unsigned len2 = maxlen;
int ret = 0;
while (start < numAttrs) {
if (len2 <= AttributeHeaderSize) {
@@ -47,18 +49,20 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons
if (! searchKey.ah().isNULL()) {
if (! entryData.ah().isNULL()) {
jam();
- // current attribute
+ // verify attribute id
const DescAttr& descAttr = descEnt.m_descAttr[start];
- // full data size
- const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
- ndbrequire(size1 != 0 && size1 == entryData.ah().getDataSize());
- const unsigned size2 = min(size1, len2);
+ ndbrequire(searchKey.ah().getAttributeId() == descAttr.m_primaryAttrId);
+ ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId);
+ // sizes
+ const unsigned size1 = searchKey.ah().getDataSize();
+ const unsigned size2 = min(entryData.ah().getDataSize(), len2);
len2 -= size2;
// compare
NdbSqlUtil::Cmp* const cmp = c_sqlCmp[start];
const Uint32* const p1 = &searchKey[AttributeHeaderSize];
const Uint32* const p2 = &entryData[AttributeHeaderSize];
- ret = (*cmp)(0, p1, p2, size1, size2);
+ const bool full = (maxlen == MaxAttrDataSize);
+ ret = (*cmp)(0, p1, size1 << 2, p2, size2 << 2, full);
if (ret != 0) {
jam();
break;
@@ -104,6 +108,8 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons
* 0 a >= 2 and b > 3 yes +1
* 1 a <= 2 and b <= 3 no +1
* 1 a <= 2 and b < 3 yes -1
+ *
+ * The attributes are normalized and have variable size given in words.
*/
int
Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen)
@@ -127,21 +133,21 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
if (! boundInfo.ah().isNULL()) {
if (! entryData.ah().isNULL()) {
jam();
- // current attribute
- const unsigned index = boundInfo.ah().getAttributeId();
+ // verify attribute id
+ const Uint32 index = boundInfo.ah().getAttributeId();
ndbrequire(index < frag.m_numAttrs);
const DescAttr& descAttr = descEnt.m_descAttr[index];
ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId);
- // full data size
+ // sizes
const unsigned size1 = boundInfo.ah().getDataSize();
- ndbrequire(size1 != 0 && size1 == entryData.ah().getDataSize());
- const unsigned size2 = min(size1, len2);
+ const unsigned size2 = min(entryData.ah().getDataSize(), len2);
len2 -= size2;
// compare
NdbSqlUtil::Cmp* const cmp = c_sqlCmp[index];
const Uint32* const p1 = &boundInfo[AttributeHeaderSize];
const Uint32* const p2 = &entryData[AttributeHeaderSize];
- int ret = (*cmp)(0, p1, p2, size1, size2);
+ const bool full = (maxlen == MaxAttrDataSize);
+ int ret = (*cmp)(0, p1, size1 << 2, p2, size2 << 2, full);
if (ret != 0) {
jam();
return ret;
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
index 3c2613c6cd1..0efcc950e02 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
@@ -340,7 +340,7 @@ operator<<(NdbOut& out, const Dbtux::ScanOp& scan)
out << " [savePointId " << dec << scan.m_savePointId << "]";
out << " [accLockOp " << hex << scan.m_accLockOp << "]";
out << " [accLockOps";
- for (unsigned i = 0; i < Dbtux::MaxAccLockOps; i++) {
+ for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) {
if (scan.m_accLockOps[i] != RNIL)
out << " " << hex << scan.m_accLockOps[i];
}
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp
index 18aa914de05..b6d48b44aef 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp
@@ -217,6 +217,7 @@ Dbtux::setKeyAttrs(const Frag& frag)
const unsigned numAttrs = frag.m_numAttrs;
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
for (unsigned i = 0; i < numAttrs; i++) {
+ jam();
const DescAttr& descAttr = descEnt.m_descAttr[i];
Uint32 size = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
// set attr id and fixed size
@@ -244,6 +245,26 @@ Dbtux::readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData)
jamEntry();
// TODO handle error
ndbrequire(ret > 0);
+#ifdef VM_TRACE
+ if (debugFlags & (DebugMaint | DebugScan)) {
+ debugOut << "readKeyAttrs:" << endl;
+ ConstData data = keyData;
+ Uint32 totalSize = 0;
+ for (Uint32 i = start; i < numAttrs; i++) {
+ Uint32 attrId = data.ah().getAttributeId();
+ Uint32 dataSize = data.ah().getDataSize();
+ debugOut << i << " attrId=" << attrId << " size=" << dataSize;
+ data += 1;
+ for (Uint32 j = 0; j < dataSize; j++) {
+ debugOut << " " << hex << data[0];
+ data += 1;
+ }
+ debugOut << endl;
+ totalSize += 1 + dataSize;
+ }
+ ndbassert(totalSize == ret);
+ }
+#endif
}
void
@@ -251,7 +272,7 @@ Dbtux::readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize)
{
const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit];
const TupLoc tupLoc = ent.m_tupLoc;
- int ret = c_tup->tuxReadPk(tableFragPtrI, tupLoc.getPageId(), tupLoc.getPageOffset(), pkData);
+ int ret = c_tup->tuxReadPk(tableFragPtrI, tupLoc.getPageId(), tupLoc.getPageOffset(), pkData, true);
jamEntry();
// TODO handle error
ndbrequire(ret > 0);
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp
index 2d256487dcc..4b568badc67 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp
@@ -59,7 +59,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
// get base fragment id and extra bits
const Uint32 fragId = req->fragId & ~1;
const Uint32 fragBit = req->fragId & 1;
-
// get the fragment
FragPtr fragPtr;
fragPtr.i = RNIL;
@@ -71,7 +70,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
break;
}
}
-
ndbrequire(fragPtr.i != RNIL);
Frag& frag = *fragPtr.p;
// set up index keys for this operation
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp
index cee4aaa8625..021d3d94d8e 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp
@@ -16,6 +16,7 @@
#define DBTUX_META_CPP
#include "Dbtux.hpp"
+#include <my_sys.h>
/*
* Create index.
@@ -215,17 +216,15 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal)
errorCode = TuxAddAttrRef::InvalidAttributeType;
break;
}
-#ifdef dbtux_uses_charset
if (descAttr.m_charset != 0) {
- CHARSET_INFO *cs = get_charset(descAttr.m_charset, MYF(0));
- // here use the non-binary type
+ CHARSET_INFO *cs = all_charsets[descAttr.m_charset];
+ ndbrequire(cs != 0);
if (! NdbSqlUtil::usable_in_ordered_index(descAttr.m_typeId, cs)) {
jam();
errorCode = TuxAddAttrRef::InvalidCharset;
break;
}
}
-#endif
const bool lastAttr = (indexPtr.p->m_numAttrs == fragOpPtr.p->m_numAttrsRecvd);
if (ERROR_INSERTED(12003) && fragOpPtr.p->m_fragNo == 0 && attrId == 0 ||
ERROR_INSERTED(12004) && fragOpPtr.p->m_fragNo == 0 && lastAttr ||
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
index 31a11a5c16b..34f2ab3c525 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
@@ -16,6 +16,7 @@
#define DBTUX_SCAN_CPP
#include "Dbtux.hpp"
+#include <my_sys.h>
void
Dbtux::execACC_SCANREQ(Signal* signal)
@@ -112,50 +113,89 @@ Dbtux::execACC_SCANREQ(Signal* signal)
* keys and that all but possibly last bound is non-strict.
*
* Finally save the sets of lower and upper bounds (i.e. start key and
- * end key). Full bound type (< 4) is included but only the strict bit
- * is used since lower and upper have now been separated.
+ * end key). Full bound type is included but only the strict bit is
+ * used since lower and upper have now been separated.
*/
void
Dbtux::execTUX_BOUND_INFO(Signal* signal)
{
jamEntry();
- struct BoundInfo {
- int type;
- unsigned offset;
- unsigned size;
- };
- TuxBoundInfo* const sig = (TuxBoundInfo*)signal->getDataPtrSend();
- const TuxBoundInfo reqCopy = *(const TuxBoundInfo*)sig;
- const TuxBoundInfo* const req = &reqCopy;
// get records
+ TuxBoundInfo* const sig = (TuxBoundInfo*)signal->getDataPtrSend();
+ const TuxBoundInfo* const req = (const TuxBoundInfo*)sig;
ScanOp& scan = *c_scanOpPool.getPtr(req->tuxScanPtrI);
- Index& index = *c_indexPool.getPtr(scan.m_indexId);
- // collect lower and upper bounds
+ const Index& index = *c_indexPool.getPtr(scan.m_indexId);
+ const DescEnt& descEnt = getDescEnt(index.m_descPage, index.m_descOff);
+ // collect normalized lower and upper bounds
+ struct BoundInfo {
+ int type2; // with EQ -> LE/GE
+ Uint32 offset; // offset in xfrmData
+ Uint32 size;
+ };
BoundInfo boundInfo[2][MaxIndexAttributes];
+ const unsigned dstSize = 1024 * MAX_XFRM_MULTIPLY;
+ Uint32 xfrmData[dstSize];
+ Uint32 dstPos = 0;
// largest attrId seen plus one
Uint32 maxAttrId[2] = { 0, 0 };
- unsigned offset = 0;
- const Uint32* const data = (Uint32*)sig + TuxBoundInfo::SignalLength;
// walk through entries
+ const Uint32* const data = (Uint32*)sig + TuxBoundInfo::SignalLength;
+ Uint32 offset = 0;
while (offset + 2 <= req->boundAiLength) {
jam();
const unsigned type = data[offset];
- if (type > 4) {
- jam();
- scan.m_state = ScanOp::Invalid;
- sig->errorCode = TuxBoundInfo::InvalidAttrInfo;
- return;
- }
const AttributeHeader* ah = (const AttributeHeader*)&data[offset + 1];
const Uint32 attrId = ah->getAttributeId();
const Uint32 dataSize = ah->getDataSize();
- if (attrId >= index.m_numAttrs) {
+ if (type > 4 || attrId >= index.m_numAttrs || dstPos + 2 + dataSize > dstSize) {
jam();
scan.m_state = ScanOp::Invalid;
sig->errorCode = TuxBoundInfo::InvalidAttrInfo;
return;
}
+ // copy header
+ xfrmData[dstPos + 0] = data[offset + 0];
+ xfrmData[dstPos + 1] = data[offset + 1];
+ // copy bound value
+ Uint32 dstWords = 0;
+ if (! ah->isNULL()) {
+ jam();
+ const DescAttr& descAttr = descEnt.m_descAttr[attrId];
+ Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(descAttr.m_attrDesc);
+ Uint32 srcWords = (srcBytes + 3) / 4;
+ if (srcWords != dataSize) {
+ jam();
+ scan.m_state = ScanOp::Invalid;
+ sig->errorCode = TuxBoundInfo::InvalidAttrInfo;
+ return;
+ }
+ uchar* dstPtr = (uchar*)&xfrmData[dstPos + 2];
+ const uchar* srcPtr = (const uchar*)&data[offset + 2];
+ if (descAttr.m_charset == 0) {
+ memcpy(dstPtr, srcPtr, srcWords << 2);
+ dstWords = srcWords;
+ } else {
+ jam();
+ CHARSET_INFO* cs = all_charsets[descAttr.m_charset];
+ Uint32 xmul = cs->strxfrm_multiply;
+ if (xmul == 0)
+ xmul = 1;
+ Uint32 dstLen = xmul * srcBytes;
+ if (dstLen > ((dstSize - dstPos) << 2)) {
+ jam();
+ scan.m_state = ScanOp::Invalid;
+ sig->errorCode = TuxBoundInfo::TooMuchAttrInfo;
+ return;
+ }
+ Uint32 n = (*cs->coll->strnxfrm)(cs, dstPtr, dstLen, srcPtr, srcBytes);
+ while ((n & 3) != 0) {
+ dstPtr[n++] = 0;
+ }
+ dstWords = n / 4;
+ }
+ }
for (unsigned j = 0; j <= 1; j++) {
+ jam();
// check if lower/upper bit matches
const unsigned luBit = (j << 1);
if ((type & 0x2) != luBit && type != 4)
@@ -164,29 +204,35 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal)
const unsigned type2 = (type & 0x1) | luBit;
// fill in any gap
while (maxAttrId[j] <= attrId) {
+ jam();
BoundInfo& b = boundInfo[j][maxAttrId[j]++];
- b.type = -1;
+ b.type2 = -1;
}
BoundInfo& b = boundInfo[j][attrId];
- if (b.type != -1) {
- // compare with previous bound
- if (b.type != (int)type2 ||
- b.size != 2 + dataSize ||
- memcmp(&data[b.offset + 2], &data[offset + 2], dataSize << 2) != 0) {
+ if (b.type2 != -1) {
+ // compare with previously defined bound
+ if (b.type2 != (int)type2 ||
+ b.size != 2 + dstWords ||
+ memcmp(&xfrmData[b.offset + 2], &xfrmData[dstPos + 2], dstWords << 2) != 0) {
jam();
scan.m_state = ScanOp::Invalid;
sig->errorCode = TuxBoundInfo::InvalidBounds;
return;
}
} else {
+ // fix length
+ AttributeHeader* ah = (AttributeHeader*)&xfrmData[dstPos + 1];
+ ah->setDataSize(dstWords);
// enter new bound
- b.type = type2;
- b.offset = offset;
- b.size = 2 + dataSize;
+ jam();
+ b.type2 = type2;
+ b.offset = dstPos;
+ b.size = 2 + dstWords;
}
}
// jump to next
offset += 2 + dataSize;
+ dstPos += 2 + dstWords;
}
if (offset != req->boundAiLength) {
jam();
@@ -200,13 +246,13 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal)
jam();
const BoundInfo& b = boundInfo[j][i];
// check for gap or strict bound before last
- if (b.type == -1 || (i + 1 < maxAttrId[j] && (b.type & 0x1))) {
+ if (b.type2 == -1 || (i + 1 < maxAttrId[j] && (b.type2 & 0x1))) {
jam();
scan.m_state = ScanOp::Invalid;
sig->errorCode = TuxBoundInfo::InvalidBounds;
return;
}
- bool ok = scan.m_bound[j]->append(&data[b.offset], b.size);
+ bool ok = scan.m_bound[j]->append(&xfrmData[b.offset], b.size);
if (! ok) {
jam();
scan.m_state = ScanOp::Invalid;
diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/ndb/src/ndbapi/NdbDictionaryImpl.cpp
index f56c3ce94c2..3f6ef69ca96 100644
--- a/ndb/src/ndbapi/NdbDictionaryImpl.cpp
+++ b/ndb/src/ndbapi/NdbDictionaryImpl.cpp
@@ -1546,6 +1546,11 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb,
m_error.code = 743;
return -1;
}
+ // distribution key not supported for Char attribute
+ if (col->m_distributionKey && col->m_cs != NULL) {
+ m_error.code = 745;
+ return -1;
+ }
// charset in upper half of precision
if (col->getCharType()) {
tmpAttr.AttributeExtPrecision |= (col->m_cs->number << 16);
diff --git a/ndb/src/ndbapi/NdbOperationDefine.cpp b/ndb/src/ndbapi/NdbOperationDefine.cpp
index d9aa860f71f..aabb58fc974 100644
--- a/ndb/src/ndbapi/NdbOperationDefine.cpp
+++ b/ndb/src/ndbapi/NdbOperationDefine.cpp
@@ -520,16 +520,6 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo,
// Insert Attribute Id into ATTRINFO part.
const Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
- CHARSET_INFO* cs = tAttrInfo->m_cs;
- // invalid data can crash kernel
- if (cs != NULL &&
- (*cs->cset->well_formed_len)(cs,
- aValue,
- aValue + sizeInBytes,
- sizeInBytes) != sizeInBytes) {
- setErrorCodeAbort(744);
- return -1;
- }
#if 0
tAttrSize = tAttrInfo->theAttrSize;
tArraySize = tAttrInfo->theArraySize;
diff --git a/ndb/src/ndbapi/NdbOperationSearch.cpp b/ndb/src/ndbapi/NdbOperationSearch.cpp
index 7ae5bd70a11..972dc3d94eb 100644
--- a/ndb/src/ndbapi/NdbOperationSearch.cpp
+++ b/ndb/src/ndbapi/NdbOperationSearch.cpp
@@ -62,7 +62,6 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
Uint32 tData;
Uint32 tKeyInfoPosition;
const char* aValue = aValuePassed;
- Uint64 xfrmData[512];
Uint64 tempData[512];
if ((theStatus == OperationDefined) &&
@@ -140,21 +139,6 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
aValue = (char*)&tempData[0];
}//if
}
- const char* aValueToWrite = aValue;
-
- CHARSET_INFO* cs = tAttrInfo->m_cs;
- if (cs != 0) {
- // current limitation: strxfrm does not increase length
- assert(cs->strxfrm_multiply <= 1);
- ((Uint32*)xfrmData)[sizeInBytes >> 2] = 0;
- unsigned n =
- (*cs->coll->strnxfrm)(cs,
- (uchar*)xfrmData, sizeof(xfrmData),
- (const uchar*)aValue, sizeInBytes);
- while (n < sizeInBytes)
- ((uchar*)xfrmData)[n++] = 0x20;
- aValue = (char*)xfrmData;
- }
Uint32 totalSizeInWords = (sizeInBytes + 3)/4; // Inc. bits in last word
@@ -200,13 +184,6 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
*************************************************************************/
if ((tOpType == InsertRequest) ||
(tOpType == WriteRequest)) {
- // invalid data can crash kernel
- if (cs != NULL &&
- (*cs->cset->well_formed_len)(cs,
- aValueToWrite,
- aValueToWrite + sizeInBytes,
- sizeInBytes) != sizeInBytes)
- goto equal_error4;
Uint32 ahValue;
const Uint32 sz = totalSizeInWords;
@@ -224,7 +201,7 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
}
insertATTRINFO( ahValue );
- insertATTRINFOloop((Uint32*)aValueToWrite, sz);
+ insertATTRINFOloop((Uint32*)aValue, sz);
}//if
/**************************************************************************
@@ -321,10 +298,6 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
equal_error3:
setErrorCodeAbort(4209);
return -1;
-
- equal_error4:
- setErrorCodeAbort(744);
- return -1;
}
/******************************************************************************
diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp
index f4973b2e66f..671996bc534 100644
--- a/ndb/src/ndbapi/NdbScanOperation.cpp
+++ b/ndb/src/ndbapi/NdbScanOperation.cpp
@@ -1072,29 +1072,6 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
setErrorCodeAbort(4209);
return -1;
}
-
- // normalize char bound
- CHARSET_INFO* cs = tAttrInfo->m_cs;
- Uint64 xfrmData[1001];
- if (cs != NULL && aValue != NULL) {
- // current limitation: strxfrm does not increase length
- assert(cs->strxfrm_multiply <= 1);
- ((Uint32*)xfrmData)[len >> 2] = 0;
- unsigned n =
- (*cs->coll->strnxfrm)(cs,
- (uchar*)xfrmData, sizeof(xfrmData),
- (const uchar*)aValue, len);
-
- while (n < len)
- ((uchar*)xfrmData)[n++] = 0x20;
-
- if(len & 3)
- {
- len += (4 - (len & 3));
- }
-
- aValue = (char*)xfrmData;
- }
// insert attribute header
Uint32 tIndexAttrId = tAttrInfo->m_attrId;
@@ -1117,7 +1094,7 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
theTotalNrOfKeyWordInSignal = currLen + totalLen;
} else {
if(!aligned || !nobytes){
- Uint32 *tempData = (Uint32*)xfrmData;
+ Uint32 tempData[2000];
tempData[0] = type;
tempData[1] = ahValue;
tempData[2 + (len >> 2)] = 0;
@@ -1273,10 +1250,10 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
return (r1_null ? -1 : 1);
}
const NdbColumnImpl & col = NdbColumnImpl::getImpl(* r1->m_column);
- Uint32 size = (r1->theAttrSize * r1->theArraySize + 3) / 4;
+ Uint32 len = r1->theAttrSize * r1->theArraySize;
if(!r1_null){
const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(col.m_extType);
- int r = (*sqlType.m_cmp)(col.m_cs, d1, d2, size, size);
+ int r = (*sqlType.m_cmp)(col.m_cs, d1, len, d2, len, true);
if(r){
assert(r != NdbSqlUtil::CmpUnknown);
return r;
diff --git a/ndb/src/ndbapi/ndberror.c b/ndb/src/ndbapi/ndberror.c
index 638996530e2..9762eb85de1 100644
--- a/ndb/src/ndbapi/ndberror.c
+++ b/ndb/src/ndbapi/ndberror.c
@@ -205,6 +205,7 @@ ErrorBundle ErrorCodes[] = {
*/
{ 892, IE, "Inconsistent hash index. The index needs to be dropped and recreated" },
{ 895, IE, "Inconsistent ordered index. The index needs to be dropped and recreated" },
+ { 896, IE, "Tuple corrupted - wrong checksum or column data in invalid format" },
{ 202, IE, "202" },
{ 203, IE, "203" },
{ 207, IE, "207" },
@@ -311,6 +312,7 @@ ErrorBundle ErrorCodes[] = {
{ 742, SE, "Unsupported attribute type in index" },
{ 743, SE, "Unsupported character set in table or index" },
{ 744, SE, "Character string is invalid for given character set" },
+ { 745, SE, "Distribution key not supported for char attribute (use binary attribute)" },
{ 241, SE, "Invalid schema object version" },
{ 283, SE, "Table is being dropped" },
{ 284, SE, "Table not defined in transaction coordinator" },
diff --git a/ndb/test/ndbapi/Makefile.am b/ndb/test/ndbapi/Makefile.am
index 90f0a08d633..c050c42a701 100644
--- a/ndb/test/ndbapi/Makefile.am
+++ b/ndb/test/ndbapi/Makefile.am
@@ -32,7 +32,7 @@ testTransactions \
testDeadlock \
test_event ndbapi_slow_select testReadPerf testLcp \
testPartitioning \
-testBitfield
+testBitfield foo
#flexTimedAsynch
#testBlobs
@@ -73,6 +73,7 @@ testReadPerf_SOURCES = testReadPerf.cpp
testLcp_SOURCES = testLcp.cpp
testPartitioning_SOURCES = testPartitioning.cpp
testBitfield_SOURCES = testBitfield.cpp
+foo_SOURCES = foo.cpp
INCLUDES_LOC = -I$(top_srcdir)/ndb/include/kernel
@@ -89,3 +90,4 @@ testBackup_LDADD = $(LDADD) bank/libbank.a
%::SCCS/s.%
+
diff --git a/ndb/test/ndbapi/testOIBasic.cpp b/ndb/test/ndbapi/testOIBasic.cpp
index 41f0686e63b..2dd63178d01 100644
--- a/ndb/test/ndbapi/testOIBasic.cpp
+++ b/ndb/test/ndbapi/testOIBasic.cpp
@@ -37,6 +37,7 @@ struct Opt {
unsigned m_batch;
const char* m_bound;
const char* m_case;
+ bool m_collsp;
bool m_core;
const char* m_csname;
CHARSET_INFO* m_cs;
@@ -55,17 +56,18 @@ struct Opt {
unsigned m_scanbat;
unsigned m_scanpar;
unsigned m_scanstop;
- unsigned m_seed;
+ int m_seed;
unsigned m_subloop;
const char* m_table;
unsigned m_threads;
- unsigned m_v;
+ int m_v; // int for lint
Opt() :
m_batch(32),
m_bound("01234"),
m_case(0),
+ m_collsp(false),
m_core(false),
- m_csname("latin1_bin"),
+ m_csname("random"),
m_cs(0),
m_die(0),
m_dups(false),
@@ -82,7 +84,7 @@ struct Opt {
m_scanbat(0),
m_scanpar(0),
m_scanstop(0),
- m_seed(0),
+ m_seed(-1),
m_subloop(4),
m_table(0),
m_threads(10),
@@ -104,12 +106,13 @@ printhelp()
<< " -batch N pk operations in batch [" << d.m_batch << "]" << endl
<< " -bound xyz use only these bound types 0-4 [" << d.m_bound << "]" << endl
<< " -case abc only given test cases (letters a-z)" << endl
+ << " -collsp use strnncollsp instead of strnxfrm" << endl
<< " -core core dump on error [" << d.m_core << "]" << endl
- << " -csname S charset (collation) of non-pk char column [" << d.m_csname << "]" << endl
+ << " -csname S charset or collation [" << d.m_csname << "]" << endl
<< " -die nnn exit immediately on NDB error code nnn" << endl
<< " -dups allow duplicate tuples from index scan [" << d.m_dups << "]" << endl
<< " -fragtype T fragment type single/small/medium/large" << endl
- << " -index xyz only given index numbers (digits 1-9)" << endl
+ << " -index xyz only given index numbers (digits 0-9)" << endl
<< " -loop N loop count full suite 0=forever [" << d.m_loop << "]" << endl
<< " -nologging create tables in no-logging mode" << endl
<< " -noverify skip index verifications" << endl
@@ -118,9 +121,9 @@ printhelp()
<< " -samples N samples for some timings (0=all) [" << d.m_samples << "]" << endl
<< " -scanbat N scan batch per fragment (ignored by ndb api) [" << d.m_scanbat << "]" << endl
<< " -scanpar N scan parallelism [" << d.m_scanpar << "]" << endl
- << " -seed N srandom seed 0=loop number[" << d.m_seed << "]" << endl
+ << " -seed N srandom seed 0=loop number -1=random [" << d.m_seed << "]" << endl
<< " -subloop N subtest loop count [" << d.m_subloop << "]" << endl
- << " -table xyz only given table numbers (digits 1-9)" << endl
+ << " -table xyz only given table numbers (digits 0-9)" << endl
<< " -threads N number of threads [" << d.m_threads << "]" << endl
<< " -vN verbosity [" << d.m_v << "]" << endl
<< " -h or -help print this help text" << endl
@@ -135,9 +138,33 @@ static const bool g_store_null_key = true;
// compare NULL like normal value (NULL < not NULL, NULL == NULL)
static const bool g_compare_null = true;
+static const char* hexstr = "0123456789abcdef";
+
+// random ints
+
+static unsigned
+urandom(unsigned n)
+{
+ if (n == 0)
+ return 0;
+ unsigned i = random() % n;
+ return i;
+}
+
+static int
+irandom(unsigned n)
+{
+ if (n == 0)
+ return 0;
+ int i = random() % n;
+ if (random() & 0x1)
+ i = -i;
+ return i;
+}
+
// log and error macros
-static NdbMutex *ndbout_mutex= NULL;
+static NdbMutex *ndbout_mutex = NULL;
static unsigned getthrno();
@@ -198,7 +225,7 @@ getthrstr()
return -1; \
} while (0)
-// method parameters base class
+// method parameters
class Thr;
class Con;
@@ -222,6 +249,8 @@ struct Par : public Opt {
// value calculation
unsigned m_range;
unsigned m_pctrange;
+ unsigned m_pctbrange;
+ int m_bdir;
// choice of key
bool m_randomkey;
// do verify after read
@@ -240,7 +269,9 @@ struct Par : public Opt {
m_slno(0),
m_totrows(m_threads * m_rows),
m_range(m_rows),
- m_pctrange(0),
+ m_pctrange(40),
+ m_pctbrange(80),
+ m_bdir(0),
m_randomkey(false),
m_verify(false),
m_deadlock(false) {
@@ -248,15 +279,15 @@ struct Par : public Opt {
};
static bool
-usetable(unsigned i)
+usetable(Par par, unsigned i)
{
- return g_opt.m_table == 0 || strchr(g_opt.m_table, '1' + i) != 0;
+ return par.m_table == 0 || strchr(par.m_table, '0' + i) != 0;
}
static bool
-useindex(unsigned i)
+useindex(Par par, unsigned i)
{
- return g_opt.m_index == 0 || strchr(g_opt.m_index, '1' + i) != 0;
+ return par.m_index == 0 || strchr(par.m_index, '0' + i) != 0;
}
static unsigned
@@ -382,38 +413,196 @@ Lst::reset()
m_cnt = 0;
}
+// character sets
+
+static const unsigned maxcsnumber = 512;
+static const unsigned maxcharcount = 32;
+static const unsigned maxcharsize = 4;
+static const unsigned maxxmulsize = 8;
+
+// single mb char
+struct Chr {
+ unsigned char m_bytes[maxcharsize];
+ unsigned char m_xbytes[maxxmulsize * maxcharsize];
+ unsigned m_size;
+ Chr();
+};
+
+Chr::Chr()
+{
+ memset(m_bytes, 0, sizeof(m_bytes));
+ memset(m_xbytes, 0, sizeof(m_xbytes));
+ m_size = 0;
+}
+
+// charset and random valid chars to use
+struct Chs {
+ CHARSET_INFO* m_cs;
+ unsigned m_xmul;
+ Chr* m_chr;
+ Chs(CHARSET_INFO* cs);
+ ~Chs();
+};
+
+Chs::Chs(CHARSET_INFO* cs) :
+ m_cs(cs)
+{
+ m_xmul = m_cs->strxfrm_multiply;
+ if (m_xmul == 0)
+ m_xmul = 1;
+ assert(m_xmul <= maxxmulsize);
+ m_chr = new Chr [maxcharcount];
+ unsigned i = 0;
+ unsigned miss1 = 0;
+ unsigned miss2 = 0;
+ while (i < maxcharcount) {
+ unsigned char* bytes = m_chr[i].m_bytes;
+ unsigned char* xbytes = m_chr[i].m_xbytes;
+ unsigned size = m_cs->mbminlen + urandom(m_cs->mbmaxlen - m_cs->mbminlen + 1);
+ assert(m_cs->mbminlen <= size && size <= m_cs->mbmaxlen);
+ // prefer longer chars
+ if (size == m_cs->mbminlen && m_cs->mbminlen < m_cs->mbmaxlen && urandom(5) != 0)
+ continue;
+ for (unsigned j = 0; j < size; j++) {
+ bytes[j] = urandom(256);
+ }
+ const char* sbytes = (const char*)bytes;
+ if ((*cs->cset->well_formed_len)(cs, sbytes, sbytes + size, 1) != size) {
+ miss1++;
+ continue;
+ }
+ // do not trust well_formed_len currently
+ memset(xbytes, 0, sizeof(xbytes));
+ // currently returns buffer size always
+ int xlen = (*cs->coll->strnxfrm)(cs, xbytes, m_xmul * size, bytes, size);
+ // check we got something
+ bool xok = false;
+ for (unsigned j = 0; j < xlen; j++) {
+ if (xbytes[j] != 0) {
+ xok = true;
+ break;
+ }
+ }
+ if (! xok) {
+ miss2++;
+ continue;
+ }
+ // occasional duplicate char is ok
+ m_chr[i].m_size = size;
+ i++;
+ }
+ bool disorder = true;
+ unsigned bubbels = 0;
+ while (disorder) {
+ disorder = false;
+ for (unsigned i = 1; i < maxcharcount; i++) {
+ unsigned len = sizeof(m_chr[i].m_xbytes);
+ if (memcmp(m_chr[i-1].m_xbytes, m_chr[i].m_xbytes, len) > 0) {
+ Chr chr = m_chr[i];
+ m_chr[i] = m_chr[i-1];
+ m_chr[i-1] = chr;
+ disorder = true;
+ bubbels++;
+ }
+ }
+ }
+ LL3("inited charset " << cs->name << " miss1=" << miss1 << " miss2=" << miss2 << " bubbels=" << bubbels);
+}
+
+Chs::~Chs()
+{
+ delete [] m_chr;
+}
+
+static Chs* cslist[maxcsnumber];
+
+static void
+resetcslist()
+{
+ for (unsigned i = 0; i < maxcsnumber; i++) {
+ delete cslist[i];
+ cslist[i] = 0;
+ }
+}
+
+static Chs*
+getcs(Par par)
+{
+ CHARSET_INFO* cs;
+ if (par.m_cs != 0) {
+ cs = par.m_cs;
+ } else {
+ while (1) {
+ unsigned n = urandom(maxcsnumber);
+ cs = get_charset(n, MYF(0));
+ if (cs != 0) {
+ // prefer complex charsets
+ if (cs->mbmaxlen != 1 || urandom(5) == 0)
+ break;
+ }
+ }
+ }
+ if (cslist[cs->number] == 0)
+ cslist[cs->number] = new Chs(cs);
+ return cslist[cs->number];
+}
+
// tables and indexes
// Col - table column
struct Col {
+ const class Tab& m_tab;
unsigned m_num;
const char* m_name;
bool m_pk;
NdbDictionary::Column::Type m_type;
unsigned m_length;
+ unsigned m_bytelength;
bool m_nullable;
+ const Chs* m_chs;
+ Col(const class Tab& tab, unsigned num, const char* name, bool pk, NdbDictionary::Column::Type type, unsigned length, bool nullable, const Chs* chs);
+ ~Col();
+ bool equal(const Col& col2) const;
void verify(const void* addr) const;
};
+Col::Col(const class Tab& tab, unsigned num, const char* name, bool pk, NdbDictionary::Column::Type type, unsigned length, bool nullable, const Chs* chs) :
+ m_tab(tab),
+ m_num(num),
+ m_name(strcpy(new char [strlen(name) + 1], name)),
+ m_pk(pk),
+ m_type(type),
+ m_length(length),
+ m_bytelength(length * (chs == 0 ? 1 : chs->m_cs->mbmaxlen)),
+ m_nullable(nullable),
+ m_chs(chs)
+{
+}
+
+Col::~Col()
+{
+ delete [] m_name;
+}
+
+bool
+Col::equal(const Col& col2) const
+{
+ return m_type == col2.m_type && m_length == col2.m_length && m_chs == col2.m_chs;
+}
+
void
Col::verify(const void* addr) const
{
switch (m_type) {
case NdbDictionary::Column::Unsigned:
break;
- case NdbDictionary::Column::Varchar:
+ case NdbDictionary::Column::Char:
{
- const unsigned char* p = (const unsigned char*)addr;
- unsigned n = (p[0] << 8) | p[1];
- assert(n <= m_length);
- unsigned i;
- for (i = 0; i < n; i++) {
- assert(p[2 + i] != 0);
- }
- for (i = n; i < m_length; i++) {
- assert(p[2 + i] == 0);
- }
+ CHARSET_INFO* cs = m_chs->m_cs;
+ const char* src = (const char*)addr;
+ unsigned len = m_bytelength;
+ assert((*cs->cset->well_formed_len)(cs, src, src + len, 0xffff) == len);
}
break;
default:
@@ -425,14 +614,16 @@ Col::verify(const void* addr) const
static NdbOut&
operator<<(NdbOut& out, const Col& col)
{
- out << "col " << col.m_num;
- out << " " << col.m_name;
+ out << "col[" << col.m_num << "] " << col.m_name;
switch (col.m_type) {
case NdbDictionary::Column::Unsigned:
out << " unsigned";
break;
- case NdbDictionary::Column::Varchar:
- out << " varchar(" << col.m_length << ")";
+ case NdbDictionary::Column::Char:
+ {
+ CHARSET_INFO* cs = col.m_chs->m_cs;
+ out << " char(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")";
+ }
break;
default:
out << "type" << (int)col.m_type;
@@ -447,25 +638,60 @@ operator<<(NdbOut& out, const Col& col)
// ICol - index column
struct ICol {
+ const class ITab& m_itab;
unsigned m_num;
- struct Col m_col;
+ const Col& m_col;
+ ICol(const class ITab& itab, unsigned num, const Col& col);
+ ~ICol();
};
+ICol::ICol(const class ITab& itab, unsigned num, const Col& col) :
+ m_itab(itab),
+ m_num(num),
+ m_col(col)
+{
+}
+
+ICol::~ICol()
+{
+}
+
// ITab - index
struct ITab {
+ const class Tab& m_tab;
const char* m_name;
unsigned m_icols;
- const ICol* m_icol;
+ const ICol** m_icol;
+ ITab(const class Tab& tab, const char* name, unsigned icols);
+ ~ITab();
};
+ITab::ITab(const class Tab& tab, const char* name, unsigned icols) :
+ m_tab(tab),
+ m_name(strcpy(new char [strlen(name) + 1], name)),
+ m_icols(icols),
+ m_icol(new const ICol* [icols + 1])
+{
+ for (unsigned i = 0; i <= m_icols; i++)
+ m_icol[0] = 0;
+}
+
+ITab::~ITab()
+{
+ delete [] m_name;
+ for (unsigned i = 0; i < m_icols; i++)
+ delete m_icol[i];
+ delete [] m_icol;
+}
+
static NdbOut&
operator<<(NdbOut& out, const ITab& itab)
{
- out << "itab " << itab.m_name << " " << itab.m_icols;
+ out << "itab " << itab.m_name << " icols=" << itab.m_icols;
for (unsigned k = 0; k < itab.m_icols; k++) {
out << endl;
- out << "icol " << k << " " << itab.m_icol[k].m_col;
+ out << "icol[" << k << "] " << itab.m_icol[k]->m_col;
}
return out;
}
@@ -475,200 +701,241 @@ operator<<(NdbOut& out, const ITab& itab)
struct Tab {
const char* m_name;
unsigned m_cols;
- const Col* m_col;
+ const Col** m_col;
unsigned m_itabs;
- const ITab* m_itab;
+ const ITab** m_itab;
+ // pk must contain an Unsigned column
+ unsigned m_keycol;
+ Tab(const char* name, unsigned cols, unsigned itabs, unsigned keycol);
+ ~Tab();
};
+Tab::Tab(const char* name, unsigned cols, unsigned itabs, unsigned keycol) :
+ m_name(strcpy(new char [strlen(name) + 1], name)),
+ m_cols(cols),
+ m_col(new const Col* [cols + 1]),
+ m_itabs(itabs),
+ m_itab(new const ITab* [itabs + 1]),
+ m_keycol(keycol)
+{
+ for (unsigned i = 0; i <= cols; i++)
+ m_col[i] = 0;
+ for (unsigned i = 0; i <= itabs; i++)
+ m_itab[i] = 0;
+}
+
+Tab::~Tab()
+{
+ delete [] m_name;
+ for (unsigned i = 0; i < m_cols; i++)
+ delete m_col[i];
+ delete [] m_col;
+ for (unsigned i = 0; i < m_itabs; i++)
+ delete m_itab[i];
+ delete [] m_itab;
+}
+
static NdbOut&
operator<<(NdbOut& out, const Tab& tab)
{
- out << "tab " << tab.m_name << " " << tab.m_cols;
+ out << "tab " << tab.m_name << " cols=" << tab.m_cols;
for (unsigned k = 0; k < tab.m_cols; k++) {
out << endl;
- out << tab.m_col[k];
+ out << *tab.m_col[k];
}
for (unsigned i = 0; i < tab.m_itabs; i++) {
- if (! useindex(i))
+ if (tab.m_itab[i] == 0)
continue;
out << endl;
- out << tab.m_itab[i];
+ out << *tab.m_itab[i];
}
return out;
}
-// tt1 + tt1x1 tt1x2 tt1x3 tt1x4 tt1x5
-
-static const Col
-tt1col[] = {
- { 0, "A", 1, NdbDictionary::Column::Unsigned, 1, 0 },
- { 1, "B", 0, NdbDictionary::Column::Unsigned, 1, 1 },
- { 2, "C", 0, NdbDictionary::Column::Unsigned, 1, 1 },
- { 3, "D", 0, NdbDictionary::Column::Unsigned, 1, 1 },
- { 4, "E", 0, NdbDictionary::Column::Unsigned, 1, 1 }
-};
-
-static const ICol
-tt1x1col[] = {
- { 0, tt1col[0] }
-};
-
-static const ICol
-tt1x2col[] = {
- { 0, tt1col[1] }
-};
-
-static const ICol
-tt1x3col[] = {
- { 0, tt1col[1] },
- { 1, tt1col[2] }
-};
-
-static const ICol
-tt1x4col[] = {
- { 0, tt1col[3] },
- { 1, tt1col[2] },
- { 2, tt1col[1] }
-};
-
-static const ICol
-tt1x5col[] = {
- { 0, tt1col[1] },
- { 1, tt1col[4] },
- { 2, tt1col[2] },
- { 3, tt1col[3] }
-};
-
-static const ITab
-tt1x1 = {
- "TT1X1", 1, tt1x1col
-};
-
-static const ITab
-tt1x2 = {
- "TT1X2", 1, tt1x2col
-};
-
-static const ITab
-tt1x3 = {
- "TT1X3", 2, tt1x3col
-};
-
-static const ITab
-tt1x4 = {
- "TT1X4", 3, tt1x4col
-};
-
-static const ITab
-tt1x5 = {
- "TT1X5", 4, tt1x5col
-};
-
-static const ITab
-tt1itab[] = {
- tt1x1,
- tt1x2,
- tt1x3,
- tt1x4,
- tt1x5
-};
-
-static const Tab
-tt1 = {
- "TT1", 5, tt1col, 5, tt1itab
-};
-
-// tt2 + tt2x1 tt2x2 tt2x3 tt2x4 tt2x5
-
-static const Col
-tt2col[] = {
- { 0, "A", 1, NdbDictionary::Column::Unsigned, 1, 0 },
- { 1, "B", 0, NdbDictionary::Column::Unsigned, 1, 1 },
- { 2, "C", 0, NdbDictionary::Column::Varchar, 20, 1 },
- { 3, "D", 0, NdbDictionary::Column::Varchar, 5, 1 },
- { 4, "E", 0, NdbDictionary::Column::Varchar, 5, 1 }
-};
-
-static const ICol
-tt2x1col[] = {
- { 0, tt2col[0] }
-};
-
-static const ICol
-tt2x2col[] = {
- { 0, tt2col[1] },
- { 1, tt2col[2] }
-};
-
-static const ICol
-tt2x3col[] = {
- { 0, tt2col[2] },
- { 1, tt2col[1] }
-};
-
-static const ICol
-tt2x4col[] = {
- { 0, tt2col[3] },
- { 1, tt2col[4] }
-};
-
-static const ICol
-tt2x5col[] = {
- { 0, tt2col[4] },
- { 1, tt2col[3] },
- { 2, tt2col[2] },
- { 3, tt2col[1] }
-};
-
-static const ITab
-tt2x1 = {
- "TT2X1", 1, tt2x1col
-};
-
-static const ITab
-tt2x2 = {
- "TT2X2", 2, tt2x2col
-};
+// make table structs
-static const ITab
-tt2x3 = {
- "TT2X3", 2, tt2x3col
-};
-
-static const ITab
-tt2x4 = {
- "TT2X4", 2, tt2x4col
-};
+static const Tab** tablist = 0;
+static unsigned tabcount = 0;
-static const ITab
-tt2x5 = {
- "TT2X5", 4, tt2x5col
-};
-
-static const ITab
-tt2itab[] = {
- tt2x1,
- tt2x2,
- tt2x3,
- tt2x4,
- tt2x5
-};
-
-static const Tab
-tt2 = {
- "TT2", 5, tt2col, 5, tt2itab
-};
-
-// all tables
-
-static const Tab
-tablist[] = {
- tt1,
- tt2
-};
+static void
+verifytables()
+{
+ for (unsigned j = 0; j < tabcount; j++) {
+ const Tab* t = tablist[j];
+ if (t == 0)
+ continue;
+ assert(t->m_cols != 0 && t->m_col != 0);
+ for (unsigned k = 0; k < t->m_cols; k++) {
+ const Col* c = t->m_col[k];
+ assert(c != 0 && c->m_num == k);
+ assert(! (c->m_pk && c->m_nullable));
+ }
+ assert(t->m_col[t->m_cols] == 0);
+ {
+ assert(t->m_keycol < t->m_cols);
+ const Col* c = t->m_col[t->m_keycol];
+ assert(c->m_pk && c->m_type == NdbDictionary::Column::Unsigned);
+ }
+ assert(t->m_itabs != 0 && t->m_itab != 0);
+ for (unsigned i = 0; i < t->m_itabs; i++) {
+ const ITab* x = t->m_itab[i];
+ if (x == 0)
+ continue;
+ assert(x != 0 && x->m_icols != 0 && x->m_icol != 0);
+ for (unsigned k = 0; k < x->m_icols; k++) {
+ const ICol* c = x->m_icol[k];
+ assert(c != 0 && c->m_num == k && c->m_col.m_num < t->m_cols);
+ }
+ }
+ assert(t->m_itab[t->m_itabs] == 0);
+ }
+}
-static const unsigned
-tabcount = sizeof(tablist) / sizeof(tablist[0]);
+static void
+makebuiltintables(Par par)
+{
+ LL2("makebuiltintables");
+ resetcslist();
+ tabcount = 3;
+ if (tablist == 0) {
+ tablist = new const Tab* [tabcount];
+ for (unsigned j = 0; j < tabcount; j++) {
+ tablist[j] = 0;
+ }
+ } else {
+ for (unsigned j = 0; j < tabcount; j++) {
+ delete tablist[j];
+ tablist[j] = 0;
+ }
+ }
+ // ti0 - basic
+ if (usetable(par, 0)) {
+ const Tab* t = new Tab("ti0", 5, 5, 0);
+ // name - pk - type - length - nullable - cs
+ t->m_col[0] = new Col(*t, 0, "a", 1, NdbDictionary::Column::Unsigned, 1, 0, 0);
+ t->m_col[1] = new Col(*t, 1, "b", 0, NdbDictionary::Column::Unsigned, 1, 1, 0);
+ t->m_col[2] = new Col(*t, 2, "c", 0, NdbDictionary::Column::Unsigned, 1, 1, 0);
+ t->m_col[3] = new Col(*t, 3, "d", 0, NdbDictionary::Column::Unsigned, 1, 1, 0);
+ t->m_col[4] = new Col(*t, 4, "e", 0, NdbDictionary::Column::Unsigned, 1, 1, 0);
+ if (useindex(par, 0)) {
+ // a
+ const ITab* x = t->m_itab[0] = new ITab(*t, "ti0x0", 1);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[0]);
+ }
+ if (useindex(par, 1)) {
+ // b
+ const ITab* x = t->m_itab[1] = new ITab(*t, "ti0x1", 1);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]);
+ }
+ if (useindex(par, 2)) {
+ // b, c
+ const ITab* x = t->m_itab[2] = new ITab(*t, "ti0x2", 2);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]);
+ x->m_icol[1] = new ICol(*x, 1, *t->m_col[2]);
+ }
+ if (useindex(par, 3)) {
+ // d, c, b
+ const ITab* x = t->m_itab[3] = new ITab(*t, "ti0x3", 3);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[3]);
+ x->m_icol[1] = new ICol(*x, 1, *t->m_col[2]);
+ x->m_icol[2] = new ICol(*x, 2, *t->m_col[1]);
+ }
+ if (useindex(par, 4)) {
+ // b, e, c, d
+ const ITab* x = t->m_itab[4] = new ITab(*t, "ti0x4", 4);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]);
+ x->m_icol[1] = new ICol(*x, 1, *t->m_col[4]);
+ x->m_icol[2] = new ICol(*x, 2, *t->m_col[2]);
+ x->m_icol[3] = new ICol(*x, 3, *t->m_col[3]);
+ }
+ tablist[0] = t;
+ }
+ // ti1 - simple char fields
+ if (usetable(par, 1)) {
+ const Tab* t = new Tab("ti1", 5, 5, 1);
+ // name - pk - type - length - nullable - cs
+ t->m_col[0] = new Col(*t, 0, "a", 0, NdbDictionary::Column::Unsigned, 1, 1, 0);
+ t->m_col[1] = new Col(*t, 1, "b", 1, NdbDictionary::Column::Unsigned, 1, 0, 0);
+ t->m_col[2] = new Col(*t, 2, "c", 0, NdbDictionary::Column::Char, 20, 1, getcs(par));
+ t->m_col[3] = new Col(*t, 3, "d", 0, NdbDictionary::Column::Char, 5, 1, getcs(par));
+ t->m_col[4] = new Col(*t, 4, "e", 0, NdbDictionary::Column::Char, 5, 1, getcs(par));
+ if (useindex(par, 0)) {
+ // b
+ const ITab* x = t->m_itab[0] = new ITab(*t, "ti1x0", 1);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]);
+ }
+ if (useindex(par, 1)) {
+ // a, c
+ const ITab* x = t->m_itab[1] = new ITab(*t, "ti1x1", 2);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[0]);
+ x->m_icol[1] = new ICol(*x, 1, *t->m_col[2]);
+ }
+ if (useindex(par, 2)) {
+ // c, a
+ const ITab* x = t->m_itab[2] = new ITab(*t, "ti1x2", 2);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[2]);
+ x->m_icol[1] = new ICol(*x, 1, *t->m_col[0]);
+ }
+ if (useindex(par, 3)) {
+ // e
+ const ITab* x = t->m_itab[3] = new ITab(*t, "ti1x3", 1);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[4]);
+ }
+ if (useindex(par, 4)) {
+ // e, d, c, b
+ const ITab* x = t->m_itab[4] = new ITab(*t, "ti1x4", 4);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[4]);
+ x->m_icol[1] = new ICol(*x, 1, *t->m_col[3]);
+ x->m_icol[2] = new ICol(*x, 2, *t->m_col[2]);
+ x->m_icol[3] = new ICol(*x, 3, *t->m_col[1]);
+ }
+ tablist[1] = t;
+ }
+ // ti2 - complex char fields
+ if (usetable(par, 2)) {
+ const Tab* t = new Tab("ti2", 5, 5, 2);
+ // name - pk - type - length - nullable - cs
+ t->m_col[0] = new Col(*t, 0, "a", 1, NdbDictionary::Column::Char, 101, 0, getcs(par));
+ t->m_col[1] = new Col(*t, 1, "b", 0, NdbDictionary::Column::Char, 4, 1, getcs(par));
+ t->m_col[2] = new Col(*t, 2, "c", 1, NdbDictionary::Column::Unsigned, 1, 0, 0);
+ t->m_col[3] = new Col(*t, 3, "d", 1, NdbDictionary::Column::Char, 3, 0, getcs(par));
+ t->m_col[4] = new Col(*t, 4, "e", 0, NdbDictionary::Column::Char, 101, 0, getcs(par));
+ if (useindex(par, 0)) {
+ // a, c, d
+ const ITab* x = t->m_itab[0] = new ITab(*t, "ti2x0", 3);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[0]);
+ x->m_icol[1] = new ICol(*x, 1, *t->m_col[2]);
+ x->m_icol[2] = new ICol(*x, 2, *t->m_col[3]);
+ }
+ if (useindex(par, 1)) {
+ // e, d, c, b, a
+ const ITab* x = t->m_itab[1] = new ITab(*t, "ti2x1", 5);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[4]);
+ x->m_icol[1] = new ICol(*x, 1, *t->m_col[3]);
+ x->m_icol[2] = new ICol(*x, 2, *t->m_col[2]);
+ x->m_icol[3] = new ICol(*x, 3, *t->m_col[1]);
+ x->m_icol[4] = new ICol(*x, 4, *t->m_col[0]);
+ }
+ if (useindex(par, 2)) {
+ // d
+ const ITab* x = t->m_itab[2] = new ITab(*t, "ti2x2", 1);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[3]);
+ }
+ if (useindex(par, 3)) {
+ // b
+ const ITab* x = t->m_itab[3] = new ITab(*t, "ti2x3", 1);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]);
+ }
+ if (useindex(par, 4)) {
+ // a, e
+ const ITab* x = t->m_itab[4] = new ITab(*t, "ti2x4", 2);
+ x->m_icol[0] = new ICol(*x, 0, *t->m_col[0]);
+ x->m_icol[1] = new ICol(*x, 1, *t->m_col[4]);
+ }
+ tablist[2] = t;
+ }
+ verifytables();
+}
// connections
@@ -834,7 +1101,7 @@ Con::execute(ExecType t, bool& deadlock)
int
Con::openScanRead(unsigned scanbat, unsigned scanpar)
{
- assert(m_tx != 0 && m_op != 0);
+ assert(m_tx != 0 && m_scanop != 0);
NdbOperation::LockMode lm = NdbOperation::LM_Read;
CHKCON((m_resultset = m_scanop->readTuples(lm, scanbat, scanpar)) != 0, *this);
return 0;
@@ -843,7 +1110,7 @@ Con::openScanRead(unsigned scanbat, unsigned scanpar)
int
Con::openScanExclusive(unsigned scanbat, unsigned scanpar)
{
- assert(m_tx != 0 && m_op != 0);
+ assert(m_tx != 0 && m_scanop != 0);
NdbOperation::LockMode lm = NdbOperation::LM_Exclusive;
CHKCON((m_resultset = m_scanop->readTuples(lm, scanbat, scanpar)) != 0, *this);
return 0;
@@ -936,7 +1203,7 @@ Con::printerror(NdbOut& out)
if ((code = m_tx->getNdbError().code) != 0) {
LL0(++any << " con: error " << m_tx->getNdbError());
die += (code == g_opt.m_die);
- if (code == 266 || code == 274 || code == 296 || code == 297 || code == 499)
+ if (code == 266 || code == 274 || code == 296 || code == 297 || code == 499 || code == 631)
m_errtype = ErrDeadlock;
}
if (m_op && m_op->getNdbError().code != 0) {
@@ -972,9 +1239,9 @@ invalidateindex(Par par)
Con& con = par.con();
const Tab& tab = par.tab();
for (unsigned i = 0; i < tab.m_itabs; i++) {
- if (! useindex(i))
+ if (tab.m_itab[i] == 0)
continue;
- const ITab& itab = tab.m_itab[i];
+ const ITab& itab = *tab.m_itab[i];
invalidateindex(par, itab);
}
return 0;
@@ -1022,16 +1289,14 @@ createtable(Par par)
t.setLogging(false);
}
for (unsigned k = 0; k < tab.m_cols; k++) {
- const Col& col = tab.m_col[k];
+ const Col& col = *tab.m_col[k];
NdbDictionary::Column c(col.m_name);
c.setType(col.m_type);
- c.setLength(col.m_length);
+ c.setLength(col.m_bytelength); // NDB API uses length in bytes
c.setPrimaryKey(col.m_pk);
c.setNullable(col.m_nullable);
- if (c.getCharset()) { // test if char type
- if (! col.m_pk)
- c.setCharset(par.m_cs);
- }
+ if (col.m_chs != 0)
+ c.setCharset(col.m_chs->m_cs);
t.addColumn(c);
}
con.m_dic = con.m_ndb->getDictionary();
@@ -1062,9 +1327,9 @@ dropindex(Par par)
{
const Tab& tab = par.tab();
for (unsigned i = 0; i < tab.m_itabs; i++) {
- if (! useindex(i))
+ if (tab.m_itab[i] == 0)
continue;
- const ITab& itab = tab.m_itab[i];
+ const ITab& itab = *tab.m_itab[i];
CHK(dropindex(par, itab) == 0);
}
return 0;
@@ -1082,7 +1347,8 @@ createindex(Par par, const ITab& itab)
x.setType(NdbDictionary::Index::OrderedIndex);
x.setLogging(false);
for (unsigned k = 0; k < itab.m_icols; k++) {
- const Col& col = itab.m_icol[k].m_col;
+ const ICol& icol = *itab.m_icol[k];
+ const Col& col = icol.m_col;
x.addColumnName(col.m_name);
}
con.m_dic = con.m_ndb->getDictionary();
@@ -1096,9 +1362,9 @@ createindex(Par par)
{
const Tab& tab = par.tab();
for (unsigned i = 0; i < tab.m_itabs; i++) {
- if (! useindex(i))
+ if (tab.m_itab[i] == 0)
continue;
- const ITab& itab = tab.m_itab[i];
+ const ITab& itab = *tab.m_itab[i];
CHK(createindex(par, itab) == 0);
}
return 0;
@@ -1106,33 +1372,13 @@ createindex(Par par)
// data sets
-static unsigned
-urandom(unsigned n)
-{
- if (n == 0)
- return 0;
- unsigned i = random() % n;
- return i;
-}
-
-static int
-irandom(unsigned n)
-{
- if (n == 0)
- return 0;
- int i = random() % n;
- if (random() & 0x1)
- i = -i;
- return i;
-}
-
// Val - typed column value
struct Val {
const Col& m_col;
union {
Uint32 m_uint32;
- char* m_varchar;
+ unsigned char* m_char;
};
Val(const Col& col);
~Val();
@@ -1142,6 +1388,8 @@ struct Val {
bool m_null;
int setval(Par par) const;
void calc(Par par, unsigned i);
+ void calckey(Par par, unsigned i);
+ void calcnokey(Par par);
int verify(const Val& val2) const;
int cmp(const Val& val2) const;
private:
@@ -1157,8 +1405,8 @@ Val::Val(const Col& col) :
switch (col.m_type) {
case NdbDictionary::Column::Unsigned:
break;
- case NdbDictionary::Column::Varchar:
- m_varchar = new char [2 + col.m_length];
+ case NdbDictionary::Column::Char:
+ m_char = new unsigned char [col.m_bytelength];
break;
default:
assert(false);
@@ -1172,8 +1420,8 @@ Val::~Val()
switch (col.m_type) {
case NdbDictionary::Column::Unsigned:
break;
- case NdbDictionary::Column::Varchar:
- delete [] m_varchar;
+ case NdbDictionary::Column::Char:
+ delete [] m_char;
break;
default:
assert(false);
@@ -1202,8 +1450,8 @@ Val::copy(const void* addr)
case NdbDictionary::Column::Unsigned:
m_uint32 = *(const Uint32*)addr;
break;
- case NdbDictionary::Column::Varchar:
- memcpy(m_varchar, addr, 2 + col.m_length);
+ case NdbDictionary::Column::Char:
+ memcpy(m_char, addr, col.m_bytelength);
break;
default:
assert(false);
@@ -1219,8 +1467,8 @@ Val::dataaddr() const
switch (col.m_type) {
case NdbDictionary::Column::Unsigned:
return &m_uint32;
- case NdbDictionary::Column::Varchar:
- return m_varchar;
+ case NdbDictionary::Column::Char:
+ return m_char;
default:
break;
}
@@ -1236,11 +1484,11 @@ Val::setval(Par par) const
const char* addr = (const char*)dataaddr();
if (m_null)
addr = 0;
+ LL5("setval [" << m_col << "] " << *this);
if (col.m_pk)
CHK(con.equal(col.m_num, addr) == 0);
else
CHK(con.setValue(col.m_num, addr) == 0);
- LL5("setval [" << m_col << "] " << *this);
return 0;
}
@@ -1248,43 +1496,93 @@ void
Val::calc(Par par, unsigned i)
{
const Col& col = m_col;
+ col.m_pk ? calckey(par, i) : calcnokey(par);
+ if (! m_null)
+ col.verify(dataaddr());
+}
+
+void
+Val::calckey(Par par, unsigned i)
+{
+ const Col& col = m_col;
m_null = false;
- if (col.m_pk) {
+ switch (col.m_type) {
+ case NdbDictionary::Column::Unsigned:
m_uint32 = i;
- return;
+ break;
+ case NdbDictionary::Column::Char:
+ {
+ const Chs* chs = col.m_chs;
+ CHARSET_INFO* cs = chs->m_cs;
+ unsigned n = 0;
+ // our random chars may not fill value exactly
+ while (n + cs->mbmaxlen <= col.m_bytelength) {
+ if (i % (1 + n) == 0) {
+ break;
+ }
+ const Chr& chr = chs->m_chr[i % maxcharcount];
+ memcpy(&m_char[n], chr.m_bytes, chr.m_size);
+ n += chr.m_size;
+ }
+ // this will extend by appropriate space
+ (*cs->cset->fill)(cs, (char*)&m_char[n], col.m_bytelength - n, 0x20);
+ }
+ break;
+ default:
+ assert(false);
+ break;
}
+}
+
+void
+Val::calcnokey(Par par)
+{
+ const Col& col = m_col;
+ m_null = false;
if (col.m_nullable && urandom(100) < par.m_pctnull) {
m_null = true;
return;
}
- unsigned v = par.m_range + irandom((par.m_pctrange * par.m_range) / 100);
+ int r = irandom((par.m_pctrange * par.m_range) / 100);
+ if (par.m_bdir != 0 && urandom(10) != 0) {
+ if (r < 0 && par.m_bdir > 0 || r > 0 && par.m_bdir < 0)
+ r = -r;
+ }
+ unsigned v = par.m_range + r;
switch (col.m_type) {
case NdbDictionary::Column::Unsigned:
m_uint32 = v;
break;
- case NdbDictionary::Column::Varchar:
+ case NdbDictionary::Column::Char:
{
+ const Chs* chs = col.m_chs;
+ CHARSET_INFO* cs = chs->m_cs;
unsigned n = 0;
- while (n < col.m_length) {
- if (urandom(1 + col.m_length) == 0) {
- // nice distribution on lengths
+ // our random chars may not fill value exactly
+ while (n + cs->mbmaxlen <= col.m_bytelength) {
+ if (urandom(1 + col.m_bytelength) == 0) {
break;
}
- m_varchar[2 + n++] = 'a' + urandom((par.m_pctrange * 10) / 100);
- }
- m_varchar[0] = (n >> 8);
- m_varchar[1] = (n & 0xff);
- while (n < col.m_length) {
- m_varchar[2 + n++] = 0;
+ unsigned half = maxcharcount / 2;
+ int r = irandom((par.m_pctrange * half) / 100);
+ if (par.m_bdir != 0 && urandom(10) != 0) {
+ if (r < 0 && par.m_bdir > 0 || r > 0 && par.m_bdir < 0)
+ r = -r;
+ }
+ unsigned i = half + r;
+ assert(i < maxcharcount);
+ const Chr& chr = chs->m_chr[i];
+ memcpy(&m_char[n], chr.m_bytes, chr.m_size);
+ n += chr.m_size;
}
+ // this will extend by appropriate space
+ (*cs->cset->fill)(cs, (char*)&m_char[n], col.m_bytelength - n, 0x20);
}
break;
default:
assert(false);
break;
}
- // verify format
- col.verify(dataaddr());
}
int
@@ -1299,7 +1597,7 @@ Val::cmp(const Val& val2) const
{
const Col& col = m_col;
const Col& col2 = val2.m_col;
- assert(col.m_type == col2.m_type && col.m_length == col2.m_length);
+ assert(col.equal(col2));
if (m_null || val2.m_null) {
if (! m_null)
return +1;
@@ -1313,13 +1611,37 @@ Val::cmp(const Val& val2) const
// compare
switch (col.m_type) {
case NdbDictionary::Column::Unsigned:
- if (m_uint32 < val2.m_uint32)
- return -1;
- if (m_uint32 > val2.m_uint32)
- return +1;
- return 0;
- case NdbDictionary::Column::Varchar:
- return memcmp(&m_varchar[2], &val2.m_varchar[2], col.m_length);
+ {
+ if (m_uint32 < val2.m_uint32)
+ return -1;
+ if (m_uint32 > val2.m_uint32)
+ return +1;
+ return 0;
+ }
+ break;
+ case NdbDictionary::Column::Char:
+ {
+ const Chs* chs = col.m_chs;
+ CHARSET_INFO* cs = chs->m_cs;
+ unsigned len = col.m_bytelength;
+ int k;
+ if (! g_opt.m_collsp) {
+ unsigned char x1[maxxmulsize * 8000];
+ unsigned char x2[maxxmulsize * 8000];
+ int n1 = (*cs->coll->strnxfrm)(cs, x1, chs->m_xmul * len, m_char, len);
+ int n2 = (*cs->coll->strnxfrm)(cs, x2, chs->m_xmul * len, val2.m_char, len);
+ // currently same but do not assume it
+ unsigned n = (n1 > n2 ? n1 : n2);
+ // assume null padding
+ memset(x1 + n1, 0x0, n - n1);
+ memset(x2 + n2, 0x0, n - n2);
+ k = memcmp(x1, x2, n);
+ } else {
+ k = (*cs->coll->strnncollsp)(cs, m_char, len, val2.m_char, len, false);
+ }
+ return k < 0 ? -1 : k > 0 ? +1 : 0;
+ }
+ break;
default:
break;
}
@@ -1339,12 +1661,26 @@ operator<<(NdbOut& out, const Val& val)
case NdbDictionary::Column::Unsigned:
out << val.m_uint32;
break;
- case NdbDictionary::Column::Varchar:
+ case NdbDictionary::Column::Char:
{
- char buf[8000];
- unsigned n = (val.m_varchar[0] << 8) | val.m_varchar[1];
- assert(n <= col.m_length);
- sprintf(buf, "'%.*s'[%d]", n, &val.m_varchar[2], n);
+ char buf[4 * 8000];
+ char *p = buf;
+ *p++ = '[';
+ for (unsigned i = 0; i < col.m_bytelength; i++) {
+ unsigned char c = val.m_char[i];
+ if (c == '\\') {
+ *p++ = '\\';
+ *p++ = '\\';
+ } else if (0x20 <= c && c < 0x7e) {
+ *p++ = c;
+ } else {
+ *p++ = '\\';
+ *p++ = hexstr[c >> 4];
+ *p++ = hexstr[c & 15];
+ }
+ }
+ *p++ = ']';
+ *p = 0;
out << buf;
}
break;
@@ -1384,7 +1720,7 @@ Row::Row(const Tab& tab) :
{
m_val = new Val* [tab.m_cols];
for (unsigned k = 0; k < tab.m_cols; k++) {
- const Col& col = tab.m_col[k];
+ const Col& col = *tab.m_col[k];
m_val[k] = new Val(col);
}
m_exist = false;
@@ -1461,6 +1797,16 @@ Row::updrow(Par par)
CHKCON(con.m_op->updateTuple() == 0, con);
for (unsigned k = 0; k < tab.m_cols; k++) {
const Val& val = *m_val[k];
+ const Col& col = val.m_col;
+ if (! col.m_pk)
+ continue;
+ CHK(val.setval(par) == 0);
+ }
+ for (unsigned k = 0; k < tab.m_cols; k++) {
+ const Val& val = *m_val[k];
+ const Col& col = val.m_col;
+ if (col.m_pk)
+ continue;
CHK(val.setval(par) == 0);
}
m_pending = UpdOp;
@@ -1668,14 +2014,33 @@ Set::pending(unsigned i) const
}
void
+Set::notpending(unsigned i)
+{
+ assert(m_row[i] != 0);
+ Row& row = *m_row[i];
+ if (row.m_pending == Row::InsOp)
+ row.m_exist = true;
+ if (row.m_pending == Row::DelOp)
+ row.m_exist = false;
+ row.m_pending = Row::NoOp;
+}
+
+void
+Set::notpending(const Lst& lst)
+{
+ for (unsigned j = 0; j < lst.m_cnt; j++) {
+ unsigned i = lst.m_arr[j];
+ notpending(i);
+ }
+}
+
+void
Set::calc(Par par, unsigned i)
{
const Tab& tab = m_tab;
if (m_row[i] == 0)
m_row[i] = new Row(tab);
Row& row = *m_row[i];
- // value generation parameters
- par.m_pctrange = 40;
row.calc(par, i);
}
@@ -1739,9 +2104,11 @@ Set::getval(Par par)
int
Set::getkey(Par par, unsigned* i)
{
- assert(m_rec[0] != 0);
- const char* aRef0 = m_rec[0]->aRef();
- Uint32 key = *(const Uint32*)aRef0;
+ const Tab& tab = m_tab;
+ unsigned k = tab.m_keycol;
+ assert(m_rec[k] != 0);
+ const char* aRef = m_rec[k]->aRef();
+ Uint32 key = *(const Uint32*)aRef;
CHK(key < m_rows);
*i = key;
return 0;
@@ -1772,32 +2139,12 @@ Set::putval(unsigned i, bool force)
return 0;
}
-void
-Set::notpending(unsigned i)
-{
- assert(m_row[i] != 0);
- Row& row = *m_row[i];
- if (row.m_pending == Row::InsOp)
- row.m_exist = true;
- if (row.m_pending == Row::DelOp)
- row.m_exist = false;
- row.m_pending = Row::NoOp;
-}
-
-void
-Set::notpending(const Lst& lst)
-{
- for (unsigned j = 0; j < lst.m_cnt; j++) {
- unsigned i = lst.m_arr[j];
- notpending(i);
- }
-}
-
int
Set::verify(const Set& set2) const
{
const Tab& tab = m_tab;
assert(&tab == &set2.m_tab && m_rows == set2.m_rows);
+ LL3("verify set1 count=" << count() << " vs set2 count=" << set2.count());
for (unsigned i = 0; i < m_rows; i++) {
CHK(exist(i) == set2.exist(i));
if (! exist(i))
@@ -1884,10 +2231,10 @@ operator<<(NdbOut& out, const BVal& bval)
const ICol& icol = bval.m_icol;
const Col& col = icol.m_col;
const Val& val = bval;
- out << "type " << bval.m_type;
- out << " icol " << icol.m_num;
- out << " col " << col.m_name << "(" << col.m_num << ")";
- out << " value " << val;
+ out << "type=" << bval.m_type;
+ out << " icol=" << icol.m_num;
+ out << " col=" << col.m_num << "," << col.m_name;
+ out << " value=" << val;
return out;
}
@@ -1939,12 +2286,15 @@ void
BSet::calc(Par par)
{
const ITab& itab = m_itab;
+ par.m_pctrange = par.m_pctbrange;
reset();
for (unsigned k = 0; k < itab.m_icols; k++) {
- const ICol& icol = itab.m_icol[k];
+ const ICol& icol = *itab.m_icol[k];
const Col& col = icol.m_col;
for (unsigned i = 0; i <= 1; i++) {
- if (urandom(10) == 0)
+ if (m_bvals == 0 && urandom(100) == 0)
+ return;
+ if (m_bvals != 0 && urandom(3) == 0)
return;
assert(m_bvals < m_alloc);
BVal& bval = *new BVal(icol);
@@ -1963,12 +2313,14 @@ BSet::calc(Par par)
bval.m_type = 4;
if (k + 1 < itab.m_icols)
bval.m_type = 4;
- // value generation parammeters
if (! g_compare_null)
par.m_pctnull = 0;
- par.m_pctrange = 50; // bit higher
+ if (bval.m_type == 0 || bval.m_type == 1)
+ par.m_bdir = -1;
+ if (bval.m_type == 2 || bval.m_type == 3)
+ par.m_bdir = +1;
do {
- bval.calc(par, 0);
+ bval.calcnokey(par);
if (i == 1) {
assert(m_bvals >= 2);
const BVal& bv1 = *m_bval[m_bvals - 2];
@@ -1990,7 +2342,7 @@ BSet::calcpk(Par par, unsigned i)
const ITab& itab = m_itab;
reset();
for (unsigned k = 0; k < itab.m_icols; k++) {
- const ICol& icol = itab.m_icol[k];
+ const ICol& icol = *itab.m_icol[k];
const Col& col = icol.m_col;
assert(col.m_pk);
assert(m_bvals < m_alloc);
@@ -2037,7 +2389,7 @@ BSet::filter(const Set& set, Set& set2) const
if (! g_store_null_key) {
bool ok1 = false;
for (unsigned k = 0; k < itab.m_icols; k++) {
- const ICol& icol = itab.m_icol[k];
+ const ICol& icol = *itab.m_icol[k];
const Col& col = icol.m_col;
const Val& val = *row.m_val[col.m_num];
if (! val.m_null) {
@@ -2055,6 +2407,7 @@ BSet::filter(const Set& set, Set& set2) const
const Col& col = icol.m_col;
const Val& val = *row.m_val[col.m_num];
int ret = bval.cmp(val);
+ LL5("cmp: ret=" << ret << " " << bval << " vs " << val);
if (bval.m_type == 0)
ok2 = (ret <= 0);
else if (bval.m_type == 1)
@@ -2087,9 +2440,8 @@ operator<<(NdbOut& out, const BSet& bset)
{
out << "bounds=" << bset.m_bvals;
for (unsigned j = 0; j < bset.m_bvals; j++) {
- out << endl;
const BVal& bval = *bset.m_bval[j];
- out << "bound " << j << ": " << bval;
+ out << " [bound " << j << ": " << bval << "]";
}
return out;
}
@@ -2318,26 +2670,36 @@ scanreadtable(Par par)
const Set& set = par.set();
// expected
const Set& set1 = set;
- LL3((par.m_verify ? "scanverify " : "scanread ") << tab.m_name);
+ LL3("scanread " << tab.m_name << " verify=" << par.m_verify);
+ LL4("expect " << set.count() << " rows");
Set set2(tab, set.m_rows);
CHK(con.startTransaction() == 0);
CHK(con.getNdbScanOperation(tab) == 0);
CHK(con.openScanRead(par.m_scanbat, par.m_scanpar) == 0);
set2.getval(par);
CHK(con.executeScan() == 0);
+ unsigned n = 0;
+ bool deadlock = false;
while (1) {
int ret;
- CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
+ deadlock = par.m_deadlock;
+ CHK((ret = con.nextScanResult(true, deadlock)) == 0 || ret == 1);
if (ret == 1)
break;
+ if (deadlock) {
+ LL1("scanreadtable: stop on deadlock");
+ break;
+ }
unsigned i = (unsigned)-1;
CHK(set2.getkey(par, &i) == 0);
CHK(set2.putval(i, false) == 0);
- LL4("row " << set2.count() << ": " << *set2.m_row[i]);
+ LL4("row " << n << ": " << *set2.m_row[i]);
+ n++;
}
con.closeTransaction();
if (par.m_verify)
CHK(set1.verify(set2) == 0);
+ LL3("scanread " << tab.m_name << " rows=" << n);
return 0;
}
@@ -2369,16 +2731,30 @@ scanreadtablefast(Par par, unsigned countcheck)
}
static int
-scanreadindex(Par par, const ITab& itab, const BSet& bset)
+scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
{
Con& con = par.con();
const Tab& tab = par.tab();
const Set& set = par.set();
- // expected
- Set set1(tab, set.m_rows);
- bset.filter(set, set1);
- LL3((par.m_verify ? "scanverify " : "scanread ") << itab.m_name << " bounds=" << bset.m_bvals);
LL4(bset);
+ Set set1(tab, set.m_rows);
+ if (calc) {
+ while (true) {
+ bset.calc(par);
+ bset.filter(set, set1);
+ unsigned n = set1.count();
+ // prefer proper subset
+ if (0 < n && n < set.m_rows)
+ break;
+ if (urandom(5) == 0)
+ break;
+ set1.reset();
+ }
+ } else {
+ bset.filter(set, set1);
+ }
+ LL3("scanread " << itab.m_name << " bounds=" << bset.m_bvals << " verify=" << par.m_verify);
+ LL4("expect " << set1.count() << " rows");
Set set2(tab, set.m_rows);
CHK(con.startTransaction() == 0);
CHK(con.getNdbScanOperation(itab, tab) == 0);
@@ -2386,20 +2762,29 @@ scanreadindex(Par par, const ITab& itab, const BSet& bset)
CHK(bset.setbnd(par) == 0);
set2.getval(par);
CHK(con.executeScan() == 0);
+ unsigned n = 0;
+ bool deadlock = false;
while (1) {
int ret;
- CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
+ deadlock = par.m_deadlock;
+ CHK((ret = con.nextScanResult(true, deadlock)) == 0 || ret == 1);
if (ret == 1)
break;
+ if (deadlock) {
+ LL1("scanreadindex: stop on deadlock");
+ break;
+ }
unsigned i = (unsigned)-1;
CHK(set2.getkey(par, &i) == 0);
LL4("key " << i);
CHK(set2.putval(i, par.m_dups) == 0);
- LL4("row " << set2.count() << ": " << *set2.m_row[i]);
+ LL4("row " << n << ": " << *set2.m_row[i]);
+ n++;
}
con.closeTransaction();
if (par.m_verify)
CHK(set1.verify(set2) == 0);
+ LL3("scanread " << itab.m_name << " rows=" << n);
return 0;
}
@@ -2438,8 +2823,7 @@ scanreadindex(Par par, const ITab& itab)
const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_subsubloop; i++) {
BSet bset(tab, itab, par.m_rows);
- bset.calc(par);
- CHK(scanreadindex(par, itab, bset) == 0);
+ CHK(scanreadindex(par, itab, bset, true) == 0);
}
return 0;
}
@@ -2449,9 +2833,9 @@ scanreadindex(Par par)
{
const Tab& tab = par.tab();
for (unsigned i = 0; i < tab.m_itabs; i++) {
- if (! useindex(i))
+ if (tab.m_itab[i] == 0)
continue;
- const ITab& itab = tab.m_itab[i];
+ const ITab& itab = *tab.m_itab[i];
CHK(scanreadindex(par, itab) == 0);
}
return 0;
@@ -2481,7 +2865,7 @@ static int
timescanpkindex(Par par)
{
const Tab& tab = par.tab();
- const ITab& itab = tab.m_itab[0]; // 1st index is on PK
+ const ITab& itab = *tab.m_itab[0]; // 1st index is on PK
BSet bset(tab, itab, par.m_rows);
par.tmr().on();
CHK(scanreadindexfast(par, itab, bset, par.m_totrows) == 0);
@@ -2505,7 +2889,7 @@ static int
timepkreadindex(Par par)
{
const Tab& tab = par.tab();
- const ITab& itab = tab.m_itab[0]; // 1st index is on PK
+ const ITab& itab = *tab.m_itab[0]; // 1st index is on PK
BSet bset(tab, itab, par.m_rows);
unsigned count = par.m_samples;
if (count == 0)
@@ -2575,7 +2959,12 @@ scanupdatetable(Par par)
}
set.unlock();
if (lst.cnt() == par.m_batch) {
- CHK(con2.execute(Commit) == 0);
+ deadlock = par.m_deadlock;
+ CHK(con2.execute(Commit, deadlock) == 0);
+ if (deadlock) {
+ LL1("scanupdateindex: stop on deadlock");
+ goto out;
+ }
con2.closeTransaction();
set.lock();
set.notpending(lst);
@@ -2586,7 +2975,12 @@ scanupdatetable(Par par)
}
CHK((ret = con.nextScanResult(false)) == 0 || ret == 1 || ret == 2);
if (ret == 2 && lst.cnt() != 0) {
- CHK(con2.execute(Commit) == 0);
+ deadlock = par.m_deadlock;
+ CHK(con2.execute(Commit, deadlock) == 0);
+ if (deadlock) {
+ LL1("scanupdateindex: stop on deadlock");
+ goto out;
+ }
con2.closeTransaction();
set.lock();
set.notpending(lst);
@@ -2599,6 +2993,7 @@ scanupdatetable(Par par)
if (ret == 1)
break;
}
+out:
con2.closeTransaction();
LL3("scan update " << tab.m_name << " rows updated=" << count);
con.closeTransaction();
@@ -2659,7 +3054,12 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
}
set.unlock();
if (lst.cnt() == par.m_batch) {
- CHK(con2.execute(Commit) == 0);
+ deadlock = par.m_deadlock;
+ CHK(con2.execute(Commit, deadlock) == 0);
+ if (deadlock) {
+ LL1("scanupdateindex: stop on deadlock");
+ goto out;
+ }
con2.closeTransaction();
set.lock();
set.notpending(lst);
@@ -2670,7 +3070,12 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
}
CHK((ret = con.nextScanResult(false)) == 0 || ret == 1 || ret == 2);
if (ret == 2 && lst.cnt() != 0) {
- CHK(con2.execute(Commit) == 0);
+ deadlock = par.m_deadlock;
+ CHK(con2.execute(Commit, deadlock) == 0);
+ if (deadlock) {
+ LL1("scanupdateindex: stop on deadlock");
+ goto out;
+ }
con2.closeTransaction();
set.lock();
set.notpending(lst);
@@ -2681,6 +3086,7 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
}
} while (ret == 0);
}
+out:
con2.closeTransaction();
LL3("scan update " << itab.m_name << " rows updated=" << count);
con.closeTransaction();
@@ -2704,9 +3110,9 @@ scanupdateindex(Par par)
{
const Tab& tab = par.tab();
for (unsigned i = 0; i < tab.m_itabs; i++) {
- if (! useindex(i))
+ if (tab.m_itab[i] == 0)
continue;
- const ITab& itab = tab.m_itab[i];
+ const ITab& itab = *tab.m_itab[i];
CHK(scanupdateindex(par, itab) == 0);
}
return 0;
@@ -2743,17 +3149,25 @@ readverifyfull(Par par)
CHK(scanreadtable(par) == 0);
else {
const Tab& tab = par.tab();
- unsigned i = par.m_no;
- if (i <= tab.m_itabs && useindex(i)) {
- const ITab& itab = tab.m_itab[i - 1];
+ unsigned i = par.m_no - 1;
+ if (i < tab.m_itabs && tab.m_itab[i] != 0) {
+ const ITab& itab = *tab.m_itab[i];
BSet bset(tab, itab, par.m_rows);
- CHK(scanreadindex(par, itab, bset) == 0);
+ CHK(scanreadindex(par, itab, bset, false) == 0);
}
}
return 0;
}
static int
+readverifyindex(Par par)
+{
+ par.m_verify = true;
+ CHK(scanreadindex(par) == 0);
+ return 0;
+}
+
+static int
pkops(Par par)
{
par.m_randomkey = true;
@@ -2786,6 +3200,7 @@ static int
pkupdatescanread(Par par)
{
par.m_dups = true;
+ par.m_deadlock = true;
unsigned sel = urandom(10);
if (sel < 5) {
CHK(pkupdate(par) == 0);
@@ -3074,6 +3489,24 @@ tbuild(Par par)
}
static int
+tindexscan(Par par)
+{
+ RUNSTEP(par, droptable, ST);
+ RUNSTEP(par, createtable, ST);
+ RUNSTEP(par, invalidatetable, MT);
+ RUNSTEP(par, createindex, ST);
+ RUNSTEP(par, invalidateindex, MT);
+ RUNSTEP(par, pkinsert, MT);
+ RUNSTEP(par, readverifyfull, MT);
+ for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ LL4("subloop " << par.m_slno);
+ RUNSTEP(par, readverifyindex, MT);
+ }
+ return 0;
+}
+
+
+static int
tpkops(Par par)
{
RUNSTEP(par, droptable, ST);
@@ -3188,6 +3621,10 @@ ttimemaint(Par par)
static int
ttimescan(Par par)
{
+ if (par.tab().m_itab[0] == 0) {
+ LL1("ttimescan - no index 0, skipped");
+ return 0;
+ }
Tmr t1, t2;
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
@@ -3210,6 +3647,10 @@ ttimescan(Par par)
static int
ttimepkread(Par par)
{
+ if (par.tab().m_itab[0] == 0) {
+ LL1("ttimescan - no index 0, skipped");
+ return 0;
+ }
Tmr t1, t2;
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
@@ -3250,10 +3691,11 @@ struct TCase {
static const TCase
tcaselist[] = {
TCase("a", tbuild, "index build"),
- TCase("b", tpkops, "pk operations"),
- TCase("c", tpkopsread, "pk operations and scan reads"),
- TCase("d", tmixedops, "pk operations and scan operations"),
- TCase("e", tbusybuild, "pk operations and index build"),
+ TCase("b", tindexscan, "index scans"),
+ TCase("c", tpkops, "pk operations"),
+ TCase("d", tpkopsread, "pk operations and scan reads"),
+ TCase("e", tmixedops, "pk operations and scan operations"),
+ TCase("f", tbusybuild, "pk operations and index build"),
TCase("t", ttimebuild, "time index build"),
TCase("u", ttimemaint, "time index maintenance"),
TCase("v", ttimescan, "time full scan table vs index on pk"),
@@ -3277,12 +3719,16 @@ printcases()
static void
printtables()
{
- ndbout << "tables and indexes (X1 is on table PK):" << endl;
+ Par par(g_opt);
+ makebuiltintables(par);
+ ndbout << "builtin tables (index x0 is on table pk):" << endl;
for (unsigned j = 0; j < tabcount; j++) {
- const Tab& tab = tablist[j];
+ if (tablist[j] == 0)
+ continue;
+ const Tab& tab = *tablist[j];
ndbout << " " << tab.m_name;
for (unsigned i = 0; i < tab.m_itabs; i++) {
- const ITab& itab = tab.m_itab[i];
+ const ITab& itab = *tab.m_itab[i];
ndbout << " " << itab.m_name;
}
ndbout << endl;
@@ -3293,15 +3739,25 @@ static int
runtest(Par par)
{
LL1("start");
- if (par.m_seed != 0)
+ if (par.m_seed == -1) {
+ // good enough for daily run
+ unsigned short seed = (getpid() ^ time(0));
+ LL1("random seed: " << seed);
+ srandom((unsigned)seed);
+ } else if (par.m_seed != 0)
srandom(par.m_seed);
+ // cs
assert(par.m_csname != 0);
- CHARSET_INFO* cs;
- CHK((cs = get_charset_by_name(par.m_csname, MYF(0))) != 0 || (cs = get_charset_by_csname(par.m_csname, MY_CS_PRIMARY, MYF(0))) != 0);
- par.m_cs = cs;
+ if (strcmp(par.m_csname, "random") != 0) {
+ CHARSET_INFO* cs;
+ CHK((cs = get_charset_by_name(par.m_csname, MYF(0))) != 0 || (cs = get_charset_by_csname(par.m_csname, MY_CS_PRIMARY, MYF(0))) != 0);
+ par.m_cs = cs;
+ }
+ // con
Con con;
CHK(con.connect() == 0);
par.m_con = &con;
+ // threads
g_thrlist = new Thr* [par.m_threads];
unsigned n;
for (n = 0; n < par.m_threads; n++) {
@@ -3320,16 +3776,18 @@ runtest(Par par)
const TCase& tcase = tcaselist[i];
if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0)
continue;
+ makebuiltintables(par);
LL1("case " << tcase.m_name << " - " << tcase.m_desc);
for (unsigned j = 0; j < tabcount; j++) {
- if (! usetable(j))
+ if (tablist[j] == 0)
continue;
- const Tab& tab = tablist[j];
+ const Tab& tab = *tablist[j];
par.m_tab = &tab;
- delete par.m_set;
par.m_set = new Set(tab, par.m_totrows);
LL1("table " << tab.m_name);
CHK(tcase.m_func(par) == 0);
+ delete par.m_set;
+ par.m_set = 0;
}
}
}
@@ -3353,7 +3811,7 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
{
ndb_init();
if (ndbout_mutex == NULL)
- ndbout_mutex= NdbMutex_Create();
+ ndbout_mutex = NdbMutex_Create();
while (++argv, --argc > 0) {
const char* arg = argv[0];
if (*arg != '-') {
@@ -3381,6 +3839,10 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
continue;
}
}
+ if (strcmp(arg, "-collsp") == 0) {
+ g_opt.m_collsp = true;
+ continue;
+ }
if (strcmp(arg, "-core") == 0) {
g_opt.m_core = true;
continue;
@@ -3517,7 +3979,6 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
if (runtest(par) < 0)
goto failed;
}
- // always exit with NDBT code
ok:
return NDBT_ProgramExit(NDBT_OK);
failed:
diff --git a/ndb/tools/desc.cpp b/ndb/tools/desc.cpp
index c5e9efdfa8a..2f59d5fcd0b 100644
--- a/ndb/tools/desc.cpp
+++ b/ndb/tools/desc.cpp
@@ -102,7 +102,7 @@ int main(int argc, char** argv){
unsigned j;
for (j= 0; (int)j < pTab->getNoOfPrimaryKeys(); j++)
{
- const NdbDictionary::Column * col = pTab->getColumn(j);
+ const NdbDictionary::Column * col = pTab->getColumn(pTab->getPrimaryKey(j));
ndbout << col->getName();
if ((int)j < pTab->getNoOfPrimaryKeys()-1)
ndbout << ", ";