diff options
author | unknown <pekka@mysql.com> | 2004-12-13 11:17:55 +0100 |
---|---|---|
committer | unknown <pekka@mysql.com> | 2004-12-13 11:17:55 +0100 |
commit | 756572e12d1efe1df81fb85df38dd1aad97d45dc (patch) | |
tree | 419a37a372189f52399734df878b5c741078a86a | |
parent | a59a6361e8521458f2a9b5867681ce1a86af468b (diff) | |
parent | a7fe1c393391f77f90a84ed2240542af3ca33ad4 (diff) | |
download | mariadb-git-756572e12d1efe1df81fb85df38dd1aad97d45dc.tar.gz |
Merge pnousiainen@bk-internal.mysql.com:/home/bk/mysql-5.0-ndb
into mysql.com:/space/pekka/ndb/version/my50-cc
31 files changed, 1592 insertions, 984 deletions
diff --git a/ndb/include/kernel/AttributeDescriptor.hpp b/ndb/include/kernel/AttributeDescriptor.hpp index 071d45e2607..789325c7939 100644 --- a/ndb/include/kernel/AttributeDescriptor.hpp +++ b/ndb/include/kernel/AttributeDescriptor.hpp @@ -19,6 +19,8 @@ class AttributeDescriptor { friend class Dbdict; + friend class Dbtc; + friend class Dbacc; friend class Dbtup; friend class Dbtux; @@ -36,6 +38,7 @@ private: static Uint32 getType(const Uint32 &); static Uint32 getSize(const Uint32 &); + static Uint32 getSizeInBytes(const Uint32 &); static Uint32 getSizeInWords(const Uint32 &); static Uint32 getArrayType(const Uint32 &); static Uint32 getArraySize(const Uint32 &); @@ -79,6 +82,8 @@ private: #define AD_SIZE_SHIFT (4) #define AD_SIZE_MASK (7) +#define AD_SIZE_IN_BYTES_SHIFT (3) + #define AD_SIZE_IN_WORDS_OFFSET (31) #define AD_SIZE_IN_WORDS_SHIFT (5) @@ -187,6 +192,13 @@ AttributeDescriptor::getSize(const Uint32 & desc){ inline Uint32 +AttributeDescriptor::getSizeInBytes(const Uint32 & desc){ + return (getArraySize(desc) << getSize(desc)) + >> AD_SIZE_IN_BYTES_SHIFT; +} + +inline +Uint32 AttributeDescriptor::getSizeInWords(const Uint32 & desc){ return ((getArraySize(desc) << getSize(desc)) + AD_SIZE_IN_WORDS_OFFSET) diff --git a/ndb/include/kernel/ndb_limits.h b/ndb/include/kernel/ndb_limits.h index 4ed38a194e4..384882d6e5f 100644 --- a/ndb/include/kernel/ndb_limits.h +++ b/ndb/include/kernel/ndb_limits.h @@ -119,6 +119,11 @@ #define NDB_BLOB_HEAD_SIZE 2 /* sizeof(NdbBlob::Head) >> 2 */ /* + * Character sets. + */ +#define MAX_XFRM_MULTIPLY 8 /* max expansion when normalizing */ + +/* * Long signals */ #define NDB_SECTION_SEGMENT_SZ 60 diff --git a/ndb/include/kernel/signaldata/TuxBound.hpp b/ndb/include/kernel/signaldata/TuxBound.hpp index d829d6e82dd..7e12897407b 100644 --- a/ndb/include/kernel/signaldata/TuxBound.hpp +++ b/ndb/include/kernel/signaldata/TuxBound.hpp @@ -34,7 +34,9 @@ public: enum ErrorCode { InvalidAttrInfo = 4110, InvalidBounds = 4259, - OutOfBuffers = 873 + OutOfBuffers = 873, + InvalidCharFormat = 744, + TooMuchAttrInfo = 823 }; STATIC_CONST( SignalLength = 3 ); private: diff --git a/ndb/include/util/NdbSqlUtil.hpp b/ndb/include/util/NdbSqlUtil.hpp index 55fb76df354..3deaf81cfc7 100644 --- a/ndb/include/util/NdbSqlUtil.hpp +++ b/ndb/include/util/NdbSqlUtil.hpp @@ -37,17 +37,23 @@ public: const char* s2, unsigned n2, bool padded); /** - * Compare kernel attribute values. Returns -1, 0, +1 for less, - * equal, greater, respectively. Parameters are pointers to values, - * full attribute size in words, and size of available data in words. - * There is also pointer to type specific extra info. Char types - * receive CHARSET_INFO in it. + * Compare attribute values. Returns -1, 0, +1 for less, equal, + * greater, respectively. Parameters are pointers to values and their + * lengths in bytes. The lengths can differ. * - * If available size is less than full size, CmpUnknown may be - * returned. If a value cannot be parsed, it compares like NULL i.e. - * less than any valid value. + * First value is a full value but second value can be partial. If + * the partial value is not enough to determine the result, CmpUnknown + * will be returned. A shorter second value is not necessarily + * partial. Partial values are allowed only for types where prefix + * comparison is possible (basically, binary types). + * + * First parameter is a pointer to type specific extra info. Char + * types receive CHARSET_INFO in it. + * + * If a value cannot be parsed, it compares like NULL i.e. less than + * any valid value. */ - typedef int Cmp(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size); + typedef int Cmp(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full); enum CmpResult { CmpLess = -1, diff --git a/ndb/src/common/util/NdbSqlUtil.cpp b/ndb/src/common/util/NdbSqlUtil.cpp index 6798655a71a..36b5075ef1b 100644 --- a/ndb/src/common/util/NdbSqlUtil.cpp +++ b/ndb/src/common/util/NdbSqlUtil.cpp @@ -70,7 +70,7 @@ NdbSqlUtil::char_like(const char* s1, unsigned n1, return i1 == n2 && i2 == n2; } -/** +/* * Data types. */ @@ -138,7 +138,7 @@ NdbSqlUtil::m_typeList[] = { }, { Type::Varchar, - cmpVarchar + NULL // cmpVarchar }, { Type::Binary, @@ -146,23 +146,23 @@ NdbSqlUtil::m_typeList[] = { }, { Type::Varbinary, - cmpVarbinary + NULL // cmpVarbinary }, { Type::Datetime, - cmpDatetime + NULL // cmpDatetime }, { Type::Timespec, - cmpTimespec + NULL // cmpTimespec }, { Type::Blob, - cmpBlob + NULL // cmpBlob }, { Type::Text, - cmpText + NULL // cmpText } }; @@ -195,374 +195,299 @@ NdbSqlUtil::getTypeBinary(Uint32 typeId) return getType(typeId); } -// compare +/* + * Comparison functions. + */ int -NdbSqlUtil::cmpTinyint(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpTinyint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) { - assert(full >= size && size > 0); - union { Uint32 p[1]; Int8 v; } u1, u2; - u1.p[0] = p1[0]; - u2.p[0] = p2[0]; - if (u1.v < u2.v) - return -1; - if (u1.v > u2.v) - return +1; - return 0; + if (n2 >= sizeof(Int8)) { + Int8 v1, v2; + memcpy(&v1, p1, sizeof(Int8)); + memcpy(&v2, p2, sizeof(Int8)); + if (v1 < v2) + return -1; + if (v1 > v2) + return +1; + return 0; + } + assert(! full); + return CmpUnknown; } int -NdbSqlUtil::cmpTinyunsigned(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpTinyunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) { - assert(full >= size && size > 0); - union { Uint32 p[1]; Uint8 v; } u1, u2; - u1.p[0] = p1[0]; - u2.p[0] = p2[0]; - if (u1.v < u2.v) - return -1; - if (u1.v > u2.v) - return +1; - return 0; + if (n2 >= sizeof(Uint8)) { + Uint8 v1, v2; + memcpy(&v1, p1, sizeof(Uint8)); + memcpy(&v2, p2, sizeof(Uint8)); + if (v1 < v2) + return -1; + if (v1 > v2) + return +1; + return 0; + } + assert(! full); + return CmpUnknown; } int -NdbSqlUtil::cmpSmallint(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpSmallint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) { - assert(full >= size && size > 0); - union { Uint32 p[1]; Int16 v; } u1, u2; - u1.p[0] = p1[0]; - u2.p[0] = p2[0]; - if (u1.v < u2.v) - return -1; - if (u1.v > u2.v) - return +1; - return 0; + if (n2 >= sizeof(Int16)) { + Int16 v1, v2; + memcpy(&v1, p1, sizeof(Int16)); + memcpy(&v2, p2, sizeof(Int16)); + if (v1 < v2) + return -1; + if (v1 > v2) + return +1; + return 0; + } + assert(! full); + return CmpUnknown; } int -NdbSqlUtil::cmpSmallunsigned(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpSmallunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) { - assert(full >= size && size > 0); - union { Uint32 p[1]; Uint16 v; } u1, u2; - u1.p[0] = p1[0]; - u2.p[0] = p2[0]; - if (u1.v < u2.v) - return -1; - if (u1.v > u2.v) - return +1; - return 0; + if (n2 >= sizeof(Uint16)) { + Uint16 v1, v2; + memcpy(&v1, p1, sizeof(Uint16)); + memcpy(&v2, p2, sizeof(Uint16)); + if (v1 < v2) + return -1; + if (v1 > v2) + return +1; + return 0; + } + assert(! full); + return CmpUnknown; } int -NdbSqlUtil::cmpMediumint(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpMediumint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) { - assert(full >= size && size > 0); - union { const Uint32* p; const unsigned char* v; } u1, u2; - u1.p = p1; - u2.p = p2; - Int32 v1 = sint3korr(u1.v); - Int32 v2 = sint3korr(u2.v); - if (v1 < v2) - return -1; - if (v1 > v2) - return +1; - return 0; + if (n2 >= 3) { + Int32 v1, v2; + v1 = sint3korr((const uchar*)p1); + v2 = sint3korr((const uchar*)p2); + if (v1 < v2) + return -1; + if (v1 > v2) + return +1; + return 0; + } + assert(! full); + return CmpUnknown; } int -NdbSqlUtil::cmpMediumunsigned(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpMediumunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) { - assert(full >= size && size > 0); - union { const Uint32* p; const unsigned char* v; } u1, u2; - u1.p = p1; - u2.p = p2; - Uint32 v1 = uint3korr(u1.v); - Uint32 v2 = uint3korr(u2.v); - if (v1 < v2) - return -1; - if (v1 > v2) - return +1; - return 0; + if (n2 >= 3) { + Uint32 v1, v2; + v1 = uint3korr((const uchar*)p1); + v2 = uint3korr((const uchar*)p2); + if (v1 < v2) + return -1; + if (v1 > v2) + return +1; + return 0; + } + assert(! full); + return CmpUnknown; } int -NdbSqlUtil::cmpInt(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpInt(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) { - assert(full >= size && size > 0); - union { Uint32 p[1]; Int32 v; } u1, u2; - u1.p[0] = p1[0]; - u2.p[0] = p2[0]; - if (u1.v < u2.v) - return -1; - if (u1.v > u2.v) - return +1; - return 0; + if (n2 >= sizeof(Int32)) { + Int32 v1, v2; + memcpy(&v1, p1, sizeof(Int32)); + memcpy(&v2, p2, sizeof(Int32)); + if (v1 < v2) + return -1; + if (v1 > v2) + return +1; + return 0; + } + assert(! full); + return CmpUnknown; } int -NdbSqlUtil::cmpUnsigned(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpUnsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) { - assert(full >= size && size > 0); - union { Uint32 p[1]; Uint32 v; } u1, u2; - u1.v = p1[0]; - u2.v = p2[0]; - if (u1.v < u2.v) - return -1; - if (u1.v > u2.v) - return +1; - return 0; + if (n2 >= sizeof(Uint32)) { + Uint32 v1, v2; + memcpy(&v1, p1, sizeof(Uint32)); + memcpy(&v2, p2, sizeof(Uint32)); + if (v1 < v2) + return -1; + if (v1 > v2) + return +1; + return 0; + } + assert(! full); + return CmpUnknown; } int -NdbSqlUtil::cmpBigint(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpBigint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) { - assert(full >= size && size > 0); - if (size >= 2) { - union { Uint32 p[2]; Int64 v; } u1, u2; - u1.p[0] = p1[0]; - u1.p[1] = p1[1]; - u2.p[0] = p2[0]; - u2.p[1] = p2[1]; - if (u1.v < u2.v) + if (n2 >= sizeof(Int64)) { + Int64 v1, v2; + memcpy(&v1, p1, sizeof(Int64)); + memcpy(&v2, p2, sizeof(Int64)); + if (v1 < v2) return -1; - if (u1.v > u2.v) + if (v1 > v2) return +1; return 0; } + assert(! full); return CmpUnknown; } int -NdbSqlUtil::cmpBigunsigned(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpBigunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) { - assert(full >= size && size > 0); - if (size >= 2) { - union { Uint32 p[2]; Uint64 v; } u1, u2; - u1.p[0] = p1[0]; - u1.p[1] = p1[1]; - u2.p[0] = p2[0]; - u2.p[1] = p2[1]; - if (u1.v < u2.v) + if (n2 >= sizeof(Uint64)) { + Uint64 v1, v2; + memcpy(&v1, p1, sizeof(Uint64)); + memcpy(&v2, p2, sizeof(Uint64)); + if (v1 < v2) return -1; - if (u1.v > u2.v) + if (v1 > v2) return +1; return 0; } + assert(! full); return CmpUnknown; } int -NdbSqlUtil::cmpFloat(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpFloat(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) { - assert(full >= size && size > 0); - union { Uint32 p[1]; float v; } u1, u2; - u1.p[0] = p1[0]; - u2.p[0] = p2[0]; - // no format check - if (u1.v < u2.v) - return -1; - if (u1.v > u2.v) - return +1; - return 0; + if (n2 >= sizeof(float)) { + float v1, v2; + memcpy(&v1, p1, sizeof(float)); + memcpy(&v2, p2, sizeof(float)); + if (v1 < v2) + return -1; + if (v1 > v2) + return +1; + return 0; + } + assert(! full); + return CmpUnknown; } int -NdbSqlUtil::cmpDouble(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpDouble(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) { - assert(full >= size && size > 0); - if (size >= 2) { - union { Uint32 p[2]; double v; } u1, u2; - u1.p[0] = p1[0]; - u1.p[1] = p1[1]; - u2.p[0] = p2[0]; - u2.p[1] = p2[1]; - // no format check - if (u1.v < u2.v) + if (n2 >= sizeof(double)) { + double v1, v2; + memcpy(&v1, p1, sizeof(double)); + memcpy(&v2, p2, sizeof(double)); + if (v1 < v2) return -1; - if (u1.v > u2.v) + if (v1 > v2) return +1; return 0; } + assert(! full); return CmpUnknown; } +// not used by MySQL or NDB int -NdbSqlUtil::cmpDecimal(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpDecimal(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) { - assert(full >= size && size > 0); - // not used by MySQL or NDB assert(false); return 0; } int -NdbSqlUtil::cmpChar(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpChar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) { // collation does not work on prefix for some charsets - assert(full == size && size > 0); - /* - * Char is blank-padded to length and null-padded to word size. - */ - union { const Uint32* p; const uchar* v; } u1, u2; - u1.p = p1; - u2.p = p2; + assert(full); + const uchar* v1 = (const uchar*)p1; + const uchar* v2 = (const uchar*)p2; // not const in MySQL CHARSET_INFO* cs = (CHARSET_INFO*)(info); - // length in bytes including null padding to Uint32 - uint l1 = (full << 2); - int k = (*cs->coll->strnncollsp)(cs, u1.v, l1, u2.v, l1, 0); + // compare with space padding + int k = (*cs->coll->strnncollsp)(cs, v1, n1, v2, n2, false); return k < 0 ? -1 : k > 0 ? +1 : 0; } +// waiting for MySQL and new NDB implementation int -NdbSqlUtil::cmpVarchar(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpVarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) { - assert(full >= size && size > 0); - /* - * Varchar is not allowed to contain a null byte and the value is - * null-padded. Therefore comparison does not need to use the length. - * - * Not used before MySQL 5.0. Format is likely to change. Handle - * only binary collation for now. - */ - union { const Uint32* p; const char* v; } u1, u2; - u1.p = p1; - u2.p = p2; - // skip length in first 2 bytes - int k = strncmp(u1.v + 2, u2.v + 2, (size << 2) - 2); - return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown; + assert(false); + return 0; } int -NdbSqlUtil::cmpBinary(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpBinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) { - assert(full >= size && size > 0); - /* - * Binary data of full length. Compare bytewise. - */ - union { const Uint32* p; const unsigned char* v; } u1, u2; - u1.p = p1; - u2.p = p2; - int k = memcmp(u1.v, u2.v, size << 2); - return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown; + const uchar* v1 = (const uchar*)p1; + const uchar* v2 = (const uchar*)p2; + // compare as binary strings + unsigned n = (n1 <= n2 ? n1 : n2); + int k = memcmp(v1, v2, n); + if (k == 0) { + if (full) + k = (int)n1 - (int)n2; + else + k = (int)n - (int)n2; + } + return k < 0 ? -1 : k > 0 ? +1 : full ? 0 : CmpUnknown; } +// waiting for MySQL and new NDB implementation int -NdbSqlUtil::cmpVarbinary(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpVarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) { - assert(full >= size && size > 0); - /* - * Binary data of variable length padded with nulls. The comparison - * does not need to use the length. - * - * Not used before MySQL 5.0. Format is likely to change. - */ - union { const Uint32* p; const unsigned char* v; } u1, u2; - u1.p = p1; - u2.p = p2; - // skip length in first 2 bytes - int k = memcmp(u1.v + 2, u2.v + 2, (size << 2) - 2); - return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown; + assert(false); + return 0; } +// not used by MySQL or NDB int -NdbSqlUtil::cmpDatetime(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpDatetime(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) { - assert(full >= size && size > 0); - /* - * Datetime is CC YY MM DD hh mm ss \0 - * - * Not used via MySQL. - */ - union { const Uint32* p; const unsigned char* v; } u1, u2; - u1.p = p1; - u2.p = p2; - // no format check - int k = memcmp(u1.v, u2.v, 4); - if (k != 0) - return k < 0 ? -1 : +1; - if (size >= 2) { - k = memcmp(u1.v + 4, u2.v + 4, 4); - return k < 0 ? -1 : k > 0 ? +1 : 0; - } - return CmpUnknown; + assert(false); + return 0; } +// not used by MySQL or NDB int -NdbSqlUtil::cmpTimespec(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpTimespec(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) { - assert(full >= size && size > 0); - /* - * Timespec is CC YY MM DD hh mm ss \0 NN NN NN NN - * - * Not used via MySQL. - */ - union { const Uint32* p; const unsigned char* v; } u1, u2; - u1.p = p1; - u2.p = p2; - // no format check - int k = memcmp(u1.v, u2.v, 4); - if (k != 0) - return k < 0 ? -1 : +1; - if (size >= 2) { - k = memcmp(u1.v + 4, u2.v + 4, 4); - if (k != 0) - return k < 0 ? -1 : +1; - if (size >= 3) { - Uint32 n1 = *(const Uint32*)(u1.v + 8); - Uint32 n2 = *(const Uint32*)(u2.v + 8); - if (n1 < n2) - return -1; - if (n2 > n1) - return +1; - return 0; - } - } - return CmpUnknown; + assert(false); + return 0; } +// not supported int -NdbSqlUtil::cmpBlob(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpBlob(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) { - assert(full >= size && size > 0); - /* - * Blob comparison is on the inline bytes (null padded). - */ - const unsigned head = NDB_BLOB_HEAD_SIZE; - // skip blob head - if (size >= head + 1) { - union { const Uint32* p; const unsigned char* v; } u1, u2; - u1.p = p1 + head; - u2.p = p2 + head; - int k = memcmp(u1.v, u2.v, (size - head) << 2); - return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown; - } - return CmpUnknown; + assert(false); + return 0; } +// not supported int -NdbSqlUtil::cmpText(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpText(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) { - // collation does not work on prefix for some charsets - assert(full == size && size > 0); - /* - * Text comparison is on the inline bytes (blank padded). Currently - * not supported for multi-byte charsets. - */ - const unsigned head = NDB_BLOB_HEAD_SIZE; - // skip blob head - if (size >= head + 1) { - union { const Uint32* p; const uchar* v; } u1, u2; - u1.p = p1 + head; - u2.p = p2 + head; - // not const in MySQL - CHARSET_INFO* cs = (CHARSET_INFO*)(info); - // length in bytes including null padding to Uint32 - uint l1 = (full << 2); - int k = (*cs->coll->strnncollsp)(cs, u1.v, l1, u2.v, l1,0); - return k < 0 ? -1 : k > 0 ? +1 : 0; - } - return CmpUnknown; + assert(false); + return 0; } // check charset @@ -572,8 +497,6 @@ NdbSqlUtil::usable_in_pk(Uint32 typeId, const void* info) { const Type& type = getType(typeId); switch (type.m_typeId) { - case Type::Undefined: - break; case Type::Char: { const CHARSET_INFO *cs = (const CHARSET_INFO*)info; @@ -582,11 +505,12 @@ NdbSqlUtil::usable_in_pk(Uint32 typeId, const void* info) cs->cset != 0 && cs->coll != 0 && cs->coll->strnxfrm != 0 && - cs->strxfrm_multiply <= 1; // current limitation + cs->strxfrm_multiply <= MAX_XFRM_MULTIPLY; } break; + case Type::Undefined: case Type::Varchar: - return true; // Varchar not used via MySQL + case Type::Varbinary: case Type::Blob: case Type::Text: break; @@ -606,9 +530,9 @@ bool NdbSqlUtil::usable_in_ordered_index(Uint32 typeId, const void* info) { const Type& type = getType(typeId); + if (type.m_cmp == NULL) + return false; switch (type.m_typeId) { - case Type::Undefined: - break; case Type::Char: { const CHARSET_INFO *cs = (const CHARSET_INFO*)info; @@ -618,92 +542,17 @@ NdbSqlUtil::usable_in_ordered_index(Uint32 typeId, const void* info) cs->coll != 0 && cs->coll->strnxfrm != 0 && cs->coll->strnncollsp != 0 && - cs->strxfrm_multiply <= 1; // current limitation + cs->strxfrm_multiply <= MAX_XFRM_MULTIPLY; } break; + case Type::Undefined: case Type::Varchar: - return true; // Varchar not used via MySQL + case Type::Varbinary: + case Type::Blob: case Type::Text: - { - const CHARSET_INFO *cs = (const CHARSET_INFO*)info; - return - cs != 0 && - cs->mbmaxlen == 1 && // extra limitation - cs->cset != 0 && - cs->coll != 0 && - cs->coll->strnxfrm != 0 && - cs->coll->strnncollsp != 0 && - cs->strxfrm_multiply <= 1; // current limitation - } break; default: return true; } return false; } - -#ifdef NDB_SQL_UTIL_TEST - -#include <NdbTick.h> -#include <NdbOut.hpp> - -struct Testcase { - int op; // 1=compare 2=like - int res; - const char* s1; - const char* s2; - int pad; -}; -const Testcase testcase[] = { - { 2, 1, "abc", "abc", 0 }, - { 2, 1, "abc", "abc%", 0 }, - { 2, 1, "abcdef", "abc%", 0 }, - { 2, 1, "abcdefabcdefabcdef", "abc%", 0 }, - { 2, 1, "abcdefabcdefabcdef", "abc%f", 0 }, - { 2, 0, "abcdefabcdefabcdef", "abc%z", 0 }, - { 2, 1, "abcdefabcdefabcdef", "%f", 0 }, - { 2, 1, "abcdef", "a%b%c%d%e%f", 0 }, - { 0, 0, 0, 0 } -}; - -int -main(int argc, char** argv) -{ - ndb_init(); // for charsets - unsigned count = argc > 1 ? atoi(argv[1]) : 1000000; - ndbout_c("count = %u", count); - assert(count != 0); - for (const Testcase* t = testcase; t->s1 != 0; t++) { - ndbout_c("%d = '%s' %s '%s' pad=%d", - t->res, t->s1, t->op == 1 ? "comp" : "like", t->s2); - NDB_TICKS x1 = NdbTick_CurrentMillisecond(); - unsigned n1 = strlen(t->s1); - unsigned n2 = strlen(t->s2); - for (unsigned i = 0; i < count; i++) { - if (t->op == 1) { - int res = NdbSqlUtil::char_compare(t->s1, n1, t->s2, n2, t->pad); - assert(res == t->res); - continue; - } - if (t->op == 2) { - int res = NdbSqlUtil::char_like(t->s1, n1, t->s2, n2, t->pad); - assert(res == t->res); - continue; - } - assert(false); - } - NDB_TICKS x2 = NdbTick_CurrentMillisecond(); - if (x2 < x1) - x2 = x1; - double usec = 1000000.0 * double(x2 - x1) / double(count); - ndbout_c("time %.0f usec per call", usec); - } - // quick check - for (unsigned i = 0; i < sizeof(m_typeList) / sizeof(m_typeList[0]); i++) { - const NdbSqlUtil::Type& t = m_typeList[i]; - assert(t.m_typeId == i); - } - return 0; -} - -#endif diff --git a/ndb/src/kernel/blocks/dbacc/Dbacc.hpp b/ndb/src/kernel/blocks/dbacc/Dbacc.hpp index af8ee136934..6a65da5bb6a 100644 --- a/ndb/src/kernel/blocks/dbacc/Dbacc.hpp +++ b/ndb/src/kernel/blocks/dbacc/Dbacc.hpp @@ -607,8 +607,7 @@ struct Fragmentrec { //----------------------------------------------------------------------------- // elementLength: Length of element in bucket and overflow pages -// keyLength: Length of key (== 0 if long key or variable key length) -// wl-2066 always Length of key +// keyLength: Length of key //----------------------------------------------------------------------------- Uint8 elementLength; Uint16 keyLength; @@ -637,6 +636,11 @@ struct Fragmentrec { //----------------------------------------------------------------------------- Uint8 nodetype; Uint8 stopQueOp; + +//----------------------------------------------------------------------------- +// flag to avoid accessing table record if no char attributes +//----------------------------------------------------------------------------- + Uint8 hasCharAttr; }; typedef Ptr<Fragmentrec> FragmentrecPtr; @@ -719,6 +723,7 @@ struct Operationrec { State transactionstate; Uint16 elementContainer; Uint16 tupkeylen; + Uint32 xfrmtupkeylen; Uint32 userblockref; Uint32 scanBits; Uint8 elementIsDisappeared; @@ -846,6 +851,13 @@ struct Tabrec { Uint32 fragptrholder[MAX_FRAG_PER_NODE]; Uint32 tabUserPtr; BlockReference tabUserRef; + + Uint8 noOfKeyAttr; + Uint8 hasCharAttr; + struct KeyAttr { + Uint32 attributeDescriptor; + CHARSET_INFO* charsetInfo; + } keyAttr[MAX_ATTRIBUTES_IN_INDEX]; }; typedef Ptr<Tabrec> TabrecPtr; @@ -891,6 +903,7 @@ private: void execACCKEYREQ(Signal* signal); void execACCSEIZEREQ(Signal* signal); void execACCFRAGREQ(Signal* signal); + void execTC_SCHVERREQ(Signal* signal); void execACC_SRREQ(Signal* signal); void execNEXT_SCANREQ(Signal* signal); void execACC_ABORTREQ(Signal* signal); @@ -1016,7 +1029,7 @@ private: void increaselistcont(Signal* signal); void seizeLeftlist(Signal* signal); void seizeRightlist(Signal* signal); - void readTablePk(Uint32 localkey1); + Uint32 readTablePk(Uint32 localkey1); void getElement(Signal* signal); void getdirindex(Signal* signal); void commitdelete(Signal* signal, bool systemRestart); @@ -1123,6 +1136,8 @@ private: void lcp_write_op_to_undolog(Signal* signal); void reenable_expand_after_redo_log_exection_complete(Signal*); + // charsets + void xfrmKeyData(Signal* signal); // Initialisation void initData(); diff --git a/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp b/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp index c9b6c0ea17d..0b2e8aeeb9a 100644 --- a/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp +++ b/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp @@ -179,6 +179,7 @@ Dbacc::Dbacc(const class Configuration & conf): addRecSignal(GSN_ACCKEYREQ, &Dbacc::execACCKEYREQ); addRecSignal(GSN_ACCSEIZEREQ, &Dbacc::execACCSEIZEREQ); addRecSignal(GSN_ACCFRAGREQ, &Dbacc::execACCFRAGREQ); + addRecSignal(GSN_TC_SCHVERREQ, &Dbacc::execTC_SCHVERREQ); addRecSignal(GSN_ACC_SRREQ, &Dbacc::execACC_SRREQ); addRecSignal(GSN_NEXT_SCANREQ, &Dbacc::execNEXT_SCANREQ); addRecSignal(GSN_ACC_ABORTREQ, &Dbacc::execACC_ABORTREQ); diff --git a/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp index 1a5e22ac70a..eaf2176b390 100644 --- a/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp +++ b/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp @@ -16,6 +16,7 @@ #define DBACC_C #include "Dbacc.hpp" +#include <my_sys.h> #include <AttributeHeader.hpp> #include <signaldata/AccFrag.hpp> @@ -27,6 +28,7 @@ #include <signaldata/FsRemoveReq.hpp> #include <signaldata/DropTab.hpp> #include <signaldata/DumpStateOrd.hpp> +#include <SectionReader.hpp> // TO_DO_RONM is a label for comments on what needs to be improved in future versions // when more time is given. @@ -1033,6 +1035,12 @@ void Dbacc::initialiseTableRec(Signal* signal) tabptr.p->fragholder[i] = RNIL; tabptr.p->fragptrholder[i] = RNIL; }//for + tabptr.p->noOfKeyAttr = 0; + tabptr.p->hasCharAttr = 0; + for (Uint32 k = 0; k < MAX_ATTRIBUTES_IN_INDEX; k++) { + tabptr.p->keyAttr[k].attributeDescriptor = 0; + tabptr.p->keyAttr[k].charsetInfo = 0; + } }//for }//Dbacc::initialiseTableRec() @@ -1188,6 +1196,66 @@ void Dbacc::addFragRefuse(Signal* signal, Uint32 errorCode) }//Dbacc::addFragRefuseEarly() void +Dbacc::execTC_SCHVERREQ(Signal* signal) +{ + jamEntry(); + if (! assembleFragments(signal)) { + jam(); + return; + } + tabptr.i = signal->theData[0]; + ptrCheckGuard(tabptr, ctablesize, tabrec); + Uint32 noOfKeyAttr = signal->theData[6]; + ndbrequire(noOfKeyAttr <= MAX_ATTRIBUTES_IN_INDEX); + Uint32 hasCharAttr = 0; + + SegmentedSectionPtr s0Ptr; + signal->getSection(s0Ptr, 0); + SectionReader r0(s0Ptr, getSectionSegmentPool()); + Uint32 i = 0; + while (i < noOfKeyAttr) { + jam(); + Uint32 attributeDescriptor = ~0; + Uint32 csNumber = ~0; + if (! r0.getWord(&attributeDescriptor) || + ! r0.getWord(&csNumber)) { + jam(); + break; + } + CHARSET_INFO* cs = 0; + if (csNumber != 0) { + cs = all_charsets[csNumber]; + ndbrequire(cs != 0); + hasCharAttr = 1; + } + tabptr.p->keyAttr[i].attributeDescriptor = attributeDescriptor; + tabptr.p->keyAttr[i].charsetInfo = cs; + i++; + } + ndbrequire(i == noOfKeyAttr); + releaseSections(signal); + + tabptr.p->noOfKeyAttr = noOfKeyAttr; + tabptr.p->hasCharAttr = hasCharAttr; + + // copy char attr flag to each fragment + for (Uint32 i1 = 0; i1 < MAX_FRAG_PER_NODE; i1++) { + jam(); + if (tabptr.p->fragptrholder[i1] != RNIL) { + rootfragrecptr.i = tabptr.p->fragptrholder[i1]; + ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec); + for (Uint32 i2 = 0; i2 < 2; i2++) { + fragrecptr.i = rootfragrecptr.p->fragmentptr[i2]; + ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); + fragrecptr.p->hasCharAttr = hasCharAttr; + } + } + } + + // no reply to DICT +} + +void Dbacc::execDROP_TAB_REQ(Signal* signal){ jamEntry(); DropTabReq* req = (DropTabReq*)signal->getDataPtr(); @@ -1550,6 +1618,7 @@ void Dbacc::initOpRec(Signal* signal) operationRecPtr.p->hashValue = signal->theData[3]; operationRecPtr.p->tupkeylen = signal->theData[4]; + operationRecPtr.p->xfrmtupkeylen = signal->theData[4]; operationRecPtr.p->transId1 = signal->theData[5]; operationRecPtr.p->transId2 = signal->theData[6]; operationRecPtr.p->transactionstate = ACTIVE; @@ -1664,6 +1733,10 @@ void Dbacc::execACCKEYREQ(Signal* signal) ndbrequire(operationRecPtr.p->transactionstate == IDLE); initOpRec(signal); + // normalize key if any char attr + if (! operationRecPtr.p->isAccLockReq && fragrecptr.p->hasCharAttr) + xfrmKeyData(signal); + /*---------------------------------------------------------------*/ /* */ /* WE WILL USE THE HASH VALUE TO LOOK UP THE PROPER MEMORY */ @@ -1758,6 +1831,54 @@ void Dbacc::execACCKEYREQ(Signal* signal) return; }//Dbacc::execACCKEYREQ() +void +Dbacc::xfrmKeyData(Signal* signal) +{ + tabptr.i = fragrecptr.p->myTableId; + ptrCheckGuard(tabptr, ctablesize, tabrec); + + Uint32 dst[1024]; + Uint32 dstSize = (sizeof(dst) >> 2); + Uint32* src = &signal->theData[7]; + const Uint32 noOfKeyAttr = tabptr.p->noOfKeyAttr; + Uint32 dstPos = 0; + Uint32 srcPos = 0; + Uint32 i = 0; + + while (i < noOfKeyAttr) { + const Tabrec::KeyAttr& keyAttr = tabptr.p->keyAttr[i]; + + Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(keyAttr.attributeDescriptor); + Uint32 srcWords = (srcBytes + 3) / 4; + Uint32 dstWords = ~0; + uchar* dstPtr = (uchar*)&dst[dstPos]; + const uchar* srcPtr = (const uchar*)&src[srcPos]; + CHARSET_INFO* cs = keyAttr.charsetInfo; + + if (cs == 0) { + jam(); + memcpy(dstPtr, srcPtr, srcWords << 2); + dstWords = srcWords; + } else { + jam(); + Uint32 xmul = cs->strxfrm_multiply; + if (xmul == 0) + xmul = 1; + Uint32 dstLen = xmul * srcBytes; + ndbrequire(dstLen <= ((dstSize - dstPos) << 2)); + uint n = (*cs->coll->strnxfrm)(cs, dstPtr, dstLen, srcPtr, srcBytes); + while ((n & 3) != 0) + dstPtr[n++] = 0; + dstWords = (n >> 2); + } + dstPos += dstWords; + srcPos += srcWords; + i++; + } + memcpy(src, dst, dstPos << 2); + operationRecPtr.p->xfrmtupkeylen = dstPos; +} + void Dbacc::accIsLockedLab(Signal* signal) { ndbrequire(csystemRestart == ZFALSE); @@ -1848,6 +1969,7 @@ void Dbacc::insertelementLab(Signal* signal) }//if }//if if (fragrecptr.p->keyLength != operationRecPtr.p->tupkeylen) { + // historical ndbrequire(fragrecptr.p->keyLength == 0); }//if @@ -3251,7 +3373,7 @@ void Dbacc::getdirindex(Signal* signal) ptrCheckGuard(gdiPageptr, cpagesize, page8); }//Dbacc::getdirindex() -void +Uint32 Dbacc::readTablePk(Uint32 localkey1) { Uint32 tableId = fragrecptr.p->myTableId; @@ -3259,10 +3381,11 @@ Dbacc::readTablePk(Uint32 localkey1) Uint32 fragPageId = localkey1 >> MAX_TUPLES_BITS; Uint32 pageIndex = localkey1 & ((1 << MAX_TUPLES_BITS ) - 1); #ifdef VM_TRACE - memset(ckeys, 0x1f, fragrecptr.p->keyLength << 2); + memset(ckeys, 0x1f, (fragrecptr.p->keyLength * MAX_XFRM_MULTIPLY) << 2); #endif - int ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, ckeys); - ndbrequire(ret == fragrecptr.p->keyLength); + int ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, ckeys, true); + ndbrequire(ret > 0); + return ret; } /* --------------------------------------------------------------------------------- */ @@ -3306,7 +3429,6 @@ void Dbacc::getElement(Signal* signal) Uint32 tgeNextptrtype; register Uint32 tgeKeyptr; register Uint32 tgeRemLen; - register Uint32 tgeCompareLen; register Uint32 TelemLen = fragrecptr.p->elementLength; register Uint32* Tkeydata = (Uint32*)&signal->theData[7]; @@ -3314,7 +3436,6 @@ void Dbacc::getElement(Signal* signal) tgePageindex = tgdiPageindex; gePageptr = gdiPageptr; tgeResult = ZFALSE; - tgeCompareLen = fragrecptr.p->keyLength; /* * The value seached is * - table key for ACCKEYREQ, stored in TUP @@ -3381,12 +3502,15 @@ void Dbacc::getElement(Signal* signal) Uint32 localkey2 = 0; bool found; if (! searchLocalKey) { - readTablePk(localkey1); - found = (memcmp(Tkeydata, ckeys, fragrecptr.p->keyLength << 2) == 0); + Uint32 len = readTablePk(localkey1); + found = (len == operationRecPtr.p->xfrmtupkeylen) && + (memcmp(Tkeydata, ckeys, len << 2) == 0); } else { + jam(); found = (localkey1 == Tkeydata[0]); } if (found) { + jam(); tgeLocked = ElementHeader::getLocked(tgeElementHeader); tgeResult = ZTRUE; operationRecPtr.p->localdata[0] = localkey1; @@ -7731,6 +7855,7 @@ void Dbacc::initFragGeneral(FragmentrecPtr regFragPtr) regFragPtr.p->activeDataPage = 0; regFragPtr.p->createLcp = ZFALSE; regFragPtr.p->stopQueOp = ZFALSE; + regFragPtr.p->hasCharAttr = ZFALSE; regFragPtr.p->nextAllocPage = 0; regFragPtr.p->nrWaitWriteUndoExit = 0; regFragPtr.p->lastUndoIsStored = ZFALSE; @@ -8680,6 +8805,7 @@ void Dbacc::srDoUndoLab(Signal* signal) const Uint32 tkeylen = undopageptr.p->undoword[tmpindex]; tmpindex++; operationRecPtr.p->tupkeylen = tkeylen; + operationRecPtr.p->xfrmtupkeylen = 0; // not used operationRecPtr.p->fragptr = fragrecptr.i; ndbrequire(fragrecptr.p->keyLength != 0 && @@ -9750,6 +9876,7 @@ void Dbacc::initScanOpRec(Signal* signal) arrGuard(tisoLocalPtr, 2048); operationRecPtr.p->keydata[0] = isoPageptr.p->word32[tisoLocalPtr]; operationRecPtr.p->tupkeylen = fragrecptr.p->keyLength; + operationRecPtr.p->xfrmtupkeylen = 0; // not used }//Dbacc::initScanOpRec() /* --------------------------------------------------------------------------------- */ diff --git a/ndb/src/kernel/blocks/dbdict/Dbdict.cpp b/ndb/src/kernel/blocks/dbdict/Dbdict.cpp index cedd2bed415..17972abc2dd 100644 --- a/ndb/src/kernel/blocks/dbdict/Dbdict.cpp +++ b/ndb/src/kernel/blocks/dbdict/Dbdict.cpp @@ -4318,7 +4318,28 @@ Dbdict::execTAB_COMMITCONF(Signal* signal){ signal->theData[3] = reference(); signal->theData[4] = (Uint32)tabPtr.p->tableType; signal->theData[5] = createTabPtr.p->key; - sendSignal(DBTC_REF, GSN_TC_SCHVERREQ, signal, 6, JBB); + signal->theData[6] = (Uint32)tabPtr.p->noOfPrimkey; + + Uint32 buf[2 * MAX_ATTRIBUTES_IN_INDEX]; + Uint32 sz = 0; + Uint32 tAttr = tabPtr.p->firstAttribute; + while (tAttr != RNIL) { + jam(); + AttributeRecord* aRec = c_attributeRecordPool.getPtr(tAttr); + if (aRec->tupleKey) { + buf[sz++] = aRec->attributeDescriptor; + buf[sz++] = (aRec->extPrecision >> 16); // charset number + } + tAttr = aRec->nextAttrInTable; + } + ndbrequire(sz == 2 * tabPtr.p->noOfPrimkey); + + LinearSectionPtr lsPtr[3]; + lsPtr[0].p = buf; + lsPtr[0].sz = sz; + // note: ACC does not reply + sendSignal(DBACC_REF, GSN_TC_SCHVERREQ, signal, 7, JBB, lsPtr, 1); + sendSignal(DBTC_REF, GSN_TC_SCHVERREQ, signal, 7, JBB, lsPtr, 1); return; } @@ -4785,12 +4806,18 @@ void Dbdict::handleTabInfo(SimpleProperties::Reader & it, // charset in upper half of precision unsigned csNumber = (attrPtr.p->extPrecision >> 16); if (csNumber != 0) { + /* + * A new charset is first accessed here on this node. + * TODO use separate thread (e.g. via NDBFS) if need to load from file + */ CHARSET_INFO* cs = get_charset(csNumber, MYF(0)); if (cs == NULL) { parseP->errorCode = CreateTableRef::InvalidCharset; parseP->errorLine = __LINE__; return; } + // XXX should be done somewhere in mysql + all_charsets[cs->number] = cs; unsigned i = 0; while (i < noOfCharsets) { if (charsets[i] == csNumber) diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index 2b531ede3d1..e141ab09617 100644 --- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -8280,7 +8280,7 @@ Dblqh::readPrimaryKeys(ScanRecord *scanP, TcConnectionrec *tcConP, Uint32 *dst) tableId = tFragPtr.p->tabRef; } - int ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, dst); + int ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, dst, false); if(0) ndbout_c("readPrimaryKeys(table: %d fragment: %d [ %d %d ] -> %d", tableId, fragId, fragPageId, pageIndex, ret); diff --git a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp index ccb7d55396d..9afe4a5da0e 100644 --- a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp +++ b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp @@ -972,6 +972,7 @@ public: typedef Ptr<HostRecord> HostRecordPtr; /* *********** TABLE RECORD ********************************************* */ + /********************************************************/ /* THIS RECORD CONTAINS THE CURRENT SCHEMA VERSION OF */ /* ALL TABLES IN THE SYSTEM. */ @@ -982,13 +983,21 @@ public: Uint8 dropping; Uint8 tableType; Uint8 storedTable; - + + Uint8 noOfKeyAttr; + Uint8 hasCharAttr; + + struct KeyAttr { + Uint32 attributeDescriptor; + CHARSET_INFO* charsetInfo; + } keyAttr[MAX_ATTRIBUTES_IN_INDEX]; + bool checkTable(Uint32 schemaVersion) const { return enabled && !dropping && (schemaVersion == currentSchemaVersion); } - + Uint32 getErrorCode(Uint32 schemaVersion) const; - + struct DropTable { Uint32 senderRef; Uint32 senderData; @@ -1436,6 +1445,7 @@ private: void gcpTcfinished(Signal* signal); void handleGcp(Signal* signal); void hash(Signal* signal); + Uint32 xfrmKeyData(Signal* signal, Uint32* dst, Uint32 dstSize, const Uint32* src); void initApiConnect(Signal* signal); void initApiConnectRec(Signal* signal, ApiConnectRecord * const regApiPtr, diff --git a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp index 42821e4fe7f..1ce6ffa4817 100644 --- a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp +++ b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp @@ -20,6 +20,7 @@ #include "md5_hash.hpp" #include <RefConvert.hpp> #include <ndb_limits.h> +#include <my_sys.h> #include <signaldata/EventReport.hpp> #include <signaldata/TcKeyReq.hpp> @@ -63,6 +64,8 @@ #include <signaldata/PackedSignal.hpp> #include <AttributeHeader.hpp> #include <signaldata/DictTabInfo.hpp> +#include <AttributeDescriptor.hpp> +#include <SectionReader.hpp> #include <NdbOut.hpp> #include <DebuggerNames.hpp> @@ -313,6 +316,10 @@ void Dbtc::execREAD_NODESREF(Signal* signal) void Dbtc::execTC_SCHVERREQ(Signal* signal) { jamEntry(); + if (! assembleFragments(signal)) { + jam(); + return; + } tabptr.i = signal->theData[0]; ptrCheckGuard(tabptr, ctabrecFilesize, tableRecord); tabptr.p->currentSchemaVersion = signal->theData[1]; @@ -320,10 +327,41 @@ void Dbtc::execTC_SCHVERREQ(Signal* signal) BlockReference retRef = signal->theData[3]; tabptr.p->tableType = (Uint8)signal->theData[4]; BlockReference retPtr = signal->theData[5]; + Uint32 noOfKeyAttr = signal->theData[6]; + ndbrequire(noOfKeyAttr <= MAX_ATTRIBUTES_IN_INDEX); + Uint32 hasCharAttr = 0; + + SegmentedSectionPtr s0Ptr; + signal->getSection(s0Ptr, 0); + SectionReader r0(s0Ptr, getSectionSegmentPool()); + Uint32 i = 0; + while (i < noOfKeyAttr) { + jam(); + Uint32 attributeDescriptor = ~0; + Uint32 csNumber = ~0; + if (! r0.getWord(&attributeDescriptor) || + ! r0.getWord(&csNumber)) { + jam(); + break; + } + CHARSET_INFO* cs = 0; + if (csNumber != 0) { + cs = all_charsets[csNumber]; + ndbrequire(cs != 0); + hasCharAttr = 1; + } + tabptr.p->keyAttr[i].attributeDescriptor = attributeDescriptor; + tabptr.p->keyAttr[i].charsetInfo = cs; + i++; + } + ndbrequire(i == noOfKeyAttr); + releaseSections(signal); ndbrequire(tabptr.p->enabled == false); tabptr.p->enabled = true; tabptr.p->dropping = false; + tabptr.p->noOfKeyAttr = noOfKeyAttr; + tabptr.p->hasCharAttr = hasCharAttr; signal->theData[0] = tabptr.i; signal->theData[1] = retPtr; @@ -2221,6 +2259,7 @@ void Dbtc::hash(Signal* signal) UintR Tdata3; UintR* Tdata32; Uint64 Tdata[512]; + Uint64 Txfrmdata[512 * MAX_XFRM_MULTIPLY]; CacheRecord * const regCachePtr = cachePtr.p; Tdata32 = (UintR*)&Tdata[0]; @@ -2250,8 +2289,21 @@ void Dbtc::hash(Signal* signal) ti += 4; }//while }//if + + UintR keylen = (UintR)regCachePtr->keylen; + + TableRecordPtr tabptrSave = tabptr; + tabptr.i = regCachePtr->tableref; // table or hash index id + ptrCheckGuard(tabptr, ctabrecFilesize, tableRecord); + if (tabptr.p->hasCharAttr) { + jam(); + keylen = xfrmKeyData(signal, (Uint32*)Txfrmdata, sizeof(Txfrmdata) >> 2, Tdata32); + Tdata32 = (UintR*)&Txfrmdata[0]; + } + tabptr = tabptrSave; + Uint32 tmp[4]; - md5_hash(tmp, (Uint64*)&Tdata32[0], (UintR)regCachePtr->keylen); + md5_hash(tmp, (Uint64*)&Tdata32[0], keylen); thashValue = tmp[0]; if (regCachePtr->distributionKeyIndicator == 1) { @@ -2263,6 +2315,47 @@ void Dbtc::hash(Signal* signal) }//if }//Dbtc::hash() +Uint32 +Dbtc::xfrmKeyData(Signal* signal, Uint32* dst, Uint32 dstSize, const Uint32* src) +{ + const Uint32 noOfKeyAttr = tabptr.p->noOfKeyAttr; + Uint32 dstPos = 0; + Uint32 srcPos = 0; + Uint32 i = 0; + while (i < noOfKeyAttr) { + const TableRecord::KeyAttr& keyAttr = tabptr.p->keyAttr[i]; + + Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(keyAttr.attributeDescriptor); + Uint32 srcWords = (srcBytes + 3) / 4; + Uint32 dstWords = ~0; + uchar* dstPtr = (uchar*)&dst[dstPos]; + const uchar* srcPtr = (const uchar*)&src[srcPos]; + CHARSET_INFO* cs = keyAttr.charsetInfo; + + if (cs == NULL) { + jam(); + memcpy(dstPtr, srcPtr, srcWords << 2); + dstWords = srcWords; + } else { + jam(); + Uint32 xmul = cs->strxfrm_multiply; + if (xmul == 0) + xmul = 1; + Uint32 dstLen = xmul * srcBytes; + ndbrequire(dstLen <= ((dstSize - dstPos) << 2)); + uint n = (*cs->coll->strnxfrm)(cs, dstPtr, dstLen, srcPtr, srcBytes); + while ((n & 3) != 0) { + dstPtr[n++] = 0; + } + dstWords = (n >> 2); + } + dstPos += dstWords; + srcPos += srcWords; + i++; + } + return dstPos; +} + /* INIT_API_CONNECT_REC --------------------------- @@ -9973,6 +10066,12 @@ void Dbtc::initTable(Signal* signal) tabptr.p->tableType = 0; tabptr.p->enabled = false; tabptr.p->dropping = false; + tabptr.p->noOfKeyAttr = 0; + tabptr.p->hasCharAttr = 0; + for (unsigned k = 0; k < MAX_ATTRIBUTES_IN_INDEX; k++) { + tabptr.p->keyAttr[k].attributeDescriptor = 0; + tabptr.p->keyAttr[k].charsetInfo = 0; + } }//for }//Dbtc::initTable() diff --git a/ndb/src/kernel/blocks/dbtup/Dbtup.hpp b/ndb/src/kernel/blocks/dbtup/Dbtup.hpp index 1c7662c5d55..6d169d20d16 100644 --- a/ndb/src/kernel/blocks/dbtup/Dbtup.hpp +++ b/ndb/src/kernel/blocks/dbtup/Dbtup.hpp @@ -207,6 +207,8 @@ #define ZTUPLE_DELETED_ERROR 626 #define ZINSERT_ERROR 630 +#define ZINVALID_CHAR_FORMAT 744 + /* SOME WORD POSITIONS OF FIELDS IN SOME HEADERS */ #define ZPAGE_STATE_POS 0 /* POSITION OF PAGE STATE */ @@ -1020,14 +1022,14 @@ public: * for md5 summing and when returning keyinfo. Returns number of * words or negative (-terrorCode) on error. */ - int tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* dataOut); + int tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* dataOut, bool xfrmFlag); /* * ACC reads primary key without headers into an array of words. At * this point in ACC deconstruction, ACC still uses logical references * to fragment and tuple. */ - int accReadPk(Uint32 tableId, Uint32 fragId, Uint32 fragPageId, Uint32 pageIndex, Uint32* dataOut); + int accReadPk(Uint32 tableId, Uint32 fragId, Uint32 fragPageId, Uint32 pageIndex, Uint32* dataOut, bool xfrmFlag); /* * TUX checks if tuple is visible to scan. @@ -1637,20 +1639,6 @@ private: bool readBitsNotNULL(Uint32* outBuffer, AttributeHeader*, Uint32, Uint32); bool updateBitsNotNULL(Uint32* inBuffer, Uint32, Uint32); -// ***************************************************************** -// Read char routines optionally (tXfrmFlag) apply strxfrm -// ***************************************************************** - - bool readCharNotNULL(Uint32* outBuffer, - AttributeHeader* ahOut, - Uint32 attrDescriptor, - Uint32 attrDes2); - - bool readCharNULLable(Uint32* outBuffer, - AttributeHeader* ahOut, - Uint32 attrDescriptor, - Uint32 attrDes2); - //------------------------------------------------------------------ //------------------------------------------------------------------ bool nullFlagCheck(Uint32 attrDes2); diff --git a/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp b/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp index 3b0ba1c196f..ab6e0642e11 100644 --- a/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp +++ b/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp @@ -173,7 +173,7 @@ Dbtup::tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tu } int -Dbtup::tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* dataOut) +Dbtup::tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* dataOut, bool xfrmFlag) { ljamEntry(); // use own variables instead of globals @@ -200,7 +200,7 @@ Dbtup::tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* data operPtr.i = RNIL; operPtr.p = NULL; // do it - int ret = readAttributes(pagePtr.p, pageOffset, attrIds, numAttrs, dataOut, ZNIL, true); + int ret = readAttributes(pagePtr.p, pageOffset, attrIds, numAttrs, dataOut, ZNIL, xfrmFlag); // restore globals tabptr = tabptr_old; fragptr = fragptr_old; @@ -229,7 +229,7 @@ Dbtup::tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* data } int -Dbtup::accReadPk(Uint32 tableId, Uint32 fragId, Uint32 fragPageId, Uint32 pageIndex, Uint32* dataOut) +Dbtup::accReadPk(Uint32 tableId, Uint32 fragId, Uint32 fragPageId, Uint32 pageIndex, Uint32* dataOut, bool xfrmFlag) { ljamEntry(); // get table @@ -245,7 +245,7 @@ Dbtup::accReadPk(Uint32 tableId, Uint32 fragId, Uint32 fragPageId, Uint32 pageIn ndbrequire((pageIndex & 0x1) == 0); Uint32 pageOffset = ZPAGE_HEADER_SIZE + (pageIndex >> 1) * tablePtr.p->tupheadsize; // use TUX routine - optimize later - int ret = tuxReadPk(fragPtr.i, pageId, pageOffset, dataOut); + int ret = tuxReadPk(fragPtr.i, pageId, pageOffset, dataOut, xfrmFlag); return ret; } diff --git a/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp b/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp index 56ae861270c..13593602abc 100644 --- a/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp +++ b/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp @@ -364,13 +364,8 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal) ndbrequire(false); }//if if (csNumber != 0) { - CHARSET_INFO* cs = get_charset(csNumber, MYF(0)); - if (cs == NULL) { - ljam(); - terrorCode = TupAddAttrRef::InvalidCharset; - addattrrefuseLab(signal, regFragPtr, fragOperPtr, regTabPtr.p, fragId); - return; - } + CHARSET_INFO* cs = all_charsets[csNumber]; + ndbrequire(cs != NULL); Uint32 i = 0; while (i < fragOperPtr.p->charsetIndex) { ljam(); diff --git a/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp b/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp index 5bcbb482a8c..73cddfd0382 100644 --- a/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp +++ b/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp @@ -59,10 +59,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr) } else { ndbrequire(false); }//if - // replace read function of char attribute + // replace functions for char attribute if (AttributeOffset::getCharsetFlag(attrOffset)) { ljam(); - regTabPtr->readFunctionArray[i] = &Dbtup::readCharNotNULL; + regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHManyWordNotNULL; + regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNotNULL; } } else { if (AttributeDescriptor::getSize(attrDescriptor) == 0){ @@ -86,10 +87,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr) regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHZeroWordNULLable; regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNULLable; }//if - // replace read function of char attribute + // replace functions for char attribute if (AttributeOffset::getCharsetFlag(attrOffset)) { ljam(); - regTabPtr->readFunctionArray[i] = &Dbtup::readCharNULLable; + regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHManyWordNULLable; + regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNULLable; } }//if } else if (AttributeDescriptor::getArrayType(attrDescriptor) == ZVAR_ARRAY) { @@ -337,25 +339,68 @@ Dbtup::readFixedSizeTHManyWordNotNULL(Uint32* outBuffer, Uint32 attrDes2) { Uint32 indexBuf = tOutBufIndex; + Uint32 charsetFlag = AttributeOffset::getCharsetFlag(attrDes2); Uint32 readOffset = AttributeOffset::getOffset(attrDes2); Uint32 attrNoOfWords = AttributeDescriptor::getSizeInWords(attrDescriptor); - Uint32 newIndexBuf = indexBuf + attrNoOfWords; Uint32 maxRead = tMaxRead; ndbrequire((readOffset + attrNoOfWords - 1) < tCheckOffset); - if (newIndexBuf <= maxRead) { - ljam(); - ahOut->setDataSize(attrNoOfWords); - MEMCOPY_NO_WORDS(&outBuffer[indexBuf], - &tTupleHeader[readOffset], - attrNoOfWords); - tOutBufIndex = newIndexBuf; - return true; + if (! charsetFlag || ! tXfrmFlag) { + Uint32 newIndexBuf = indexBuf + attrNoOfWords; + if (newIndexBuf <= maxRead) { + ljam(); + ahOut->setDataSize(attrNoOfWords); + MEMCOPY_NO_WORDS(&outBuffer[indexBuf], + &tTupleHeader[readOffset], + attrNoOfWords); + tOutBufIndex = newIndexBuf; + return true; + } else { + ljam(); + terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR; + }//if } else { ljam(); - terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR; - return false; - }//if + Tablerec* regTabPtr = tabptr.p; + Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(attrDescriptor); + Uint32 i = AttributeOffset::getCharsetPos(attrDes2); + ndbrequire(i < regTabPtr->noOfCharsets); + CHARSET_INFO* cs = regTabPtr->charsetArray[i]; + Uint32 xmul = cs->strxfrm_multiply; + if (xmul == 0) + xmul = 1; + Uint32 dstLen = xmul * srcBytes; + Uint32 maxIndexBuf = indexBuf + (dstLen >> 2); + if (maxIndexBuf <= maxRead) { + ljam(); + uchar* dstPtr = (uchar*)&outBuffer[indexBuf]; + const uchar* srcPtr = (uchar*)&tTupleHeader[readOffset]; + const char* ssrcPtr = (const char*)srcPtr; + // could verify data format optionally + if (true || + (*cs->cset->well_formed_len)(cs, ssrcPtr, ssrcPtr + srcBytes, ZNIL) == srcBytes) { + ljam(); + // normalize + Uint32 n = (*cs->coll->strnxfrm)(cs, dstPtr, dstLen, srcPtr, srcBytes); + while ((n & 3) != 0) { + dstPtr[n++] = 0; + } + Uint32 dstWords = (n >> 2); + ahOut->setDataSize(dstWords); + Uint32 newIndexBuf = indexBuf + dstWords; + ndbrequire(newIndexBuf <= maxRead); + tOutBufIndex = newIndexBuf; + return true; + } else { + ljam(); + terrorCode = ZTUPLE_CORRUPTED_ERROR; + } + } else { + ljam(); + terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR; + } + } + return false; }//Dbtup::readFixedSizeTHManyWordNotNULL() bool @@ -402,7 +447,6 @@ Dbtup::readFixedSizeTHManyWordNULLable(Uint32* outBuffer, Uint32 attrDescriptor, Uint32 attrDes2) { -ljam(); if (!nullFlagCheck(attrDes2)) { ljam(); return readFixedSizeTHManyWordNotNULL(outBuffer, @@ -563,74 +607,6 @@ Dbtup::readDynSmallVarSize(Uint32* outBuffer, return false; }//Dbtup::readDynSmallVarSize() - -bool -Dbtup::readCharNotNULL(Uint32* outBuffer, - AttributeHeader* ahOut, - Uint32 attrDescriptor, - Uint32 attrDes2) -{ - Uint32 indexBuf = tOutBufIndex; - Uint32 readOffset = AttributeOffset::getOffset(attrDes2); - Uint32 attrNoOfWords = AttributeDescriptor::getSizeInWords(attrDescriptor); - Uint32 newIndexBuf = indexBuf + attrNoOfWords; - Uint32 maxRead = tMaxRead; - - ndbrequire((readOffset + attrNoOfWords - 1) < tCheckOffset); - if (newIndexBuf <= maxRead) { - ljam(); - ahOut->setDataSize(attrNoOfWords); - if (! tXfrmFlag) { - MEMCOPY_NO_WORDS(&outBuffer[indexBuf], - &tTupleHeader[readOffset], - attrNoOfWords); - } else { - ljam(); - Tablerec* regTabPtr = tabptr.p; - Uint32 i = AttributeOffset::getCharsetPos(attrDes2); - ndbrequire(i < tabptr.p->noOfCharsets); - // not const in MySQL - CHARSET_INFO* cs = tabptr.p->charsetArray[i]; - // XXX should strip Uint32 null padding - const unsigned nBytes = attrNoOfWords << 2; - unsigned n = - (*cs->coll->strnxfrm)(cs, - (uchar*)&outBuffer[indexBuf], - nBytes, - (const uchar*)&tTupleHeader[readOffset], - nBytes); - // pad with ascii spaces - while (n < nBytes) - ((uchar*)&outBuffer[indexBuf])[n++] = 0x20; - } - tOutBufIndex = newIndexBuf; - return true; - } else { - ljam(); - terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR; - return false; - } -} - -bool -Dbtup::readCharNULLable(Uint32* outBuffer, - AttributeHeader* ahOut, - Uint32 attrDescriptor, - Uint32 attrDes2) -{ - if (!nullFlagCheck(attrDes2)) { - ljam(); - return readCharNotNULL(outBuffer, - ahOut, - attrDescriptor, - attrDes2); - } else { - ljam(); - ahOut->setNULL(); - return true; - } -} - /* ---------------------------------------------------------------------- */ /* THIS ROUTINE IS USED TO UPDATE A NUMBER OF ATTRIBUTES. IT IS */ /* USED BY THE INSERT ROUTINE, THE UPDATE ROUTINE AND IT CAN BE */ @@ -818,6 +794,7 @@ Dbtup::updateFixedSizeTHManyWordNotNULL(Uint32* inBuffer, Uint32 indexBuf = tInBufIndex; Uint32 inBufLen = tInBufLen; Uint32 updateOffset = AttributeOffset::getOffset(attrDes2); + Uint32 charsetFlag = AttributeOffset::getCharsetFlag(attrDes2); AttributeHeader ahIn(inBuffer[indexBuf]); Uint32 nullIndicator = ahIn.isNULL(); Uint32 noOfWords = AttributeDescriptor::getSizeInWords(attrDescriptor); @@ -827,6 +804,21 @@ Dbtup::updateFixedSizeTHManyWordNotNULL(Uint32* inBuffer, if (newIndex <= inBufLen) { if (!nullIndicator) { ljam(); + if (charsetFlag) { + ljam(); + Tablerec* regTabPtr = tabptr.p; + Uint32 bytes = AttributeDescriptor::getSizeInBytes(attrDescriptor); + Uint32 i = AttributeOffset::getCharsetPos(attrDes2); + ndbrequire(i < regTabPtr->noOfCharsets); + // not const in MySQL + CHARSET_INFO* cs = regTabPtr->charsetArray[i]; + const char* ssrc = (const char*)&inBuffer[tInBufIndex + 1]; + if ((*cs->cset->well_formed_len)(cs, ssrc, ssrc + bytes, ZNIL) != bytes) { + ljam(); + terrorCode = ZINVALID_CHAR_FORMAT; + return false; + } + } tInBufIndex = newIndex; MEMCOPY_NO_WORDS(&tTupleHeader[updateOffset], &inBuffer[indexBuf + 1], diff --git a/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp b/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp index aac5c326cad..476a4b5724b 100644 --- a/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp +++ b/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp @@ -753,7 +753,7 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr, regTabPtr->noOfKeyAttr, keyBuffer, ZATTR_BUFFER_SIZE, - true); + false); ndbrequire(ret != -1); noPrimKey= ret; @@ -796,7 +796,7 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr, numAttrsToRead, mainBuffer, ZATTR_BUFFER_SIZE, - true); + false); ndbrequire(ret != -1); noMainWords= ret; } else { @@ -822,7 +822,7 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr, numAttrsToRead, copyBuffer, ZATTR_BUFFER_SIZE, - true); + false); ndbrequire(ret != -1); noCopyWords = ret; diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp index ddab77b97b5..ffce9969074 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp @@ -18,24 +18,26 @@ #include "Dbtux.hpp" /* - * Search key vs node prefix or entry + * Search key vs node prefix or entry. * * The comparison starts at given attribute position. The position is * updated by number of equal initial attributes found. The entry data * may be partial in which case CmpUnknown may be returned. + * + * The attributes are normalized and have variable size given in words. */ int Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, ConstData entryData, unsigned maxlen) { const unsigned numAttrs = frag.m_numAttrs; const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff); - // number of words of attribute data left - unsigned len2 = maxlen; // skip to right position in search key only for (unsigned i = 0; i < start; i++) { jam(); searchKey += AttributeHeaderSize + searchKey.ah().getDataSize(); } + // number of words of entry data left + unsigned len2 = maxlen; int ret = 0; while (start < numAttrs) { if (len2 <= AttributeHeaderSize) { @@ -47,18 +49,20 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons if (! searchKey.ah().isNULL()) { if (! entryData.ah().isNULL()) { jam(); - // current attribute + // verify attribute id const DescAttr& descAttr = descEnt.m_descAttr[start]; - // full data size - const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc); - ndbrequire(size1 != 0 && size1 == entryData.ah().getDataSize()); - const unsigned size2 = min(size1, len2); + ndbrequire(searchKey.ah().getAttributeId() == descAttr.m_primaryAttrId); + ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId); + // sizes + const unsigned size1 = searchKey.ah().getDataSize(); + const unsigned size2 = min(entryData.ah().getDataSize(), len2); len2 -= size2; // compare NdbSqlUtil::Cmp* const cmp = c_sqlCmp[start]; const Uint32* const p1 = &searchKey[AttributeHeaderSize]; const Uint32* const p2 = &entryData[AttributeHeaderSize]; - ret = (*cmp)(0, p1, p2, size1, size2); + const bool full = (maxlen == MaxAttrDataSize); + ret = (*cmp)(0, p1, size1 << 2, p2, size2 << 2, full); if (ret != 0) { jam(); break; @@ -104,6 +108,8 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons * 0 a >= 2 and b > 3 yes +1 * 1 a <= 2 and b <= 3 no +1 * 1 a <= 2 and b < 3 yes -1 + * + * The attributes are normalized and have variable size given in words. */ int Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen) @@ -127,21 +133,21 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne if (! boundInfo.ah().isNULL()) { if (! entryData.ah().isNULL()) { jam(); - // current attribute - const unsigned index = boundInfo.ah().getAttributeId(); + // verify attribute id + const Uint32 index = boundInfo.ah().getAttributeId(); ndbrequire(index < frag.m_numAttrs); const DescAttr& descAttr = descEnt.m_descAttr[index]; ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId); - // full data size + // sizes const unsigned size1 = boundInfo.ah().getDataSize(); - ndbrequire(size1 != 0 && size1 == entryData.ah().getDataSize()); - const unsigned size2 = min(size1, len2); + const unsigned size2 = min(entryData.ah().getDataSize(), len2); len2 -= size2; // compare NdbSqlUtil::Cmp* const cmp = c_sqlCmp[index]; const Uint32* const p1 = &boundInfo[AttributeHeaderSize]; const Uint32* const p2 = &entryData[AttributeHeaderSize]; - int ret = (*cmp)(0, p1, p2, size1, size2); + const bool full = (maxlen == MaxAttrDataSize); + int ret = (*cmp)(0, p1, size1 << 2, p2, size2 << 2, full); if (ret != 0) { jam(); return ret; diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp index 3c2613c6cd1..0efcc950e02 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp @@ -340,7 +340,7 @@ operator<<(NdbOut& out, const Dbtux::ScanOp& scan) out << " [savePointId " << dec << scan.m_savePointId << "]"; out << " [accLockOp " << hex << scan.m_accLockOp << "]"; out << " [accLockOps"; - for (unsigned i = 0; i < Dbtux::MaxAccLockOps; i++) { + for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) { if (scan.m_accLockOps[i] != RNIL) out << " " << hex << scan.m_accLockOps[i]; } diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp index 18aa914de05..b6d48b44aef 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp @@ -217,6 +217,7 @@ Dbtux::setKeyAttrs(const Frag& frag) const unsigned numAttrs = frag.m_numAttrs; const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff); for (unsigned i = 0; i < numAttrs; i++) { + jam(); const DescAttr& descAttr = descEnt.m_descAttr[i]; Uint32 size = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc); // set attr id and fixed size @@ -244,6 +245,26 @@ Dbtux::readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData) jamEntry(); // TODO handle error ndbrequire(ret > 0); +#ifdef VM_TRACE + if (debugFlags & (DebugMaint | DebugScan)) { + debugOut << "readKeyAttrs:" << endl; + ConstData data = keyData; + Uint32 totalSize = 0; + for (Uint32 i = start; i < numAttrs; i++) { + Uint32 attrId = data.ah().getAttributeId(); + Uint32 dataSize = data.ah().getDataSize(); + debugOut << i << " attrId=" << attrId << " size=" << dataSize; + data += 1; + for (Uint32 j = 0; j < dataSize; j++) { + debugOut << " " << hex << data[0]; + data += 1; + } + debugOut << endl; + totalSize += 1 + dataSize; + } + ndbassert(totalSize == ret); + } +#endif } void @@ -251,7 +272,7 @@ Dbtux::readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize) { const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit]; const TupLoc tupLoc = ent.m_tupLoc; - int ret = c_tup->tuxReadPk(tableFragPtrI, tupLoc.getPageId(), tupLoc.getPageOffset(), pkData); + int ret = c_tup->tuxReadPk(tableFragPtrI, tupLoc.getPageId(), tupLoc.getPageOffset(), pkData, true); jamEntry(); // TODO handle error ndbrequire(ret > 0); diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp index 2d256487dcc..4b568badc67 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp @@ -59,7 +59,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) // get base fragment id and extra bits const Uint32 fragId = req->fragId & ~1; const Uint32 fragBit = req->fragId & 1; - // get the fragment FragPtr fragPtr; fragPtr.i = RNIL; @@ -71,7 +70,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) break; } } - ndbrequire(fragPtr.i != RNIL); Frag& frag = *fragPtr.p; // set up index keys for this operation diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp index cee4aaa8625..021d3d94d8e 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp @@ -16,6 +16,7 @@ #define DBTUX_META_CPP #include "Dbtux.hpp" +#include <my_sys.h> /* * Create index. @@ -215,17 +216,15 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal) errorCode = TuxAddAttrRef::InvalidAttributeType; break; } -#ifdef dbtux_uses_charset if (descAttr.m_charset != 0) { - CHARSET_INFO *cs = get_charset(descAttr.m_charset, MYF(0)); - // here use the non-binary type + CHARSET_INFO *cs = all_charsets[descAttr.m_charset]; + ndbrequire(cs != 0); if (! NdbSqlUtil::usable_in_ordered_index(descAttr.m_typeId, cs)) { jam(); errorCode = TuxAddAttrRef::InvalidCharset; break; } } -#endif const bool lastAttr = (indexPtr.p->m_numAttrs == fragOpPtr.p->m_numAttrsRecvd); if (ERROR_INSERTED(12003) && fragOpPtr.p->m_fragNo == 0 && attrId == 0 || ERROR_INSERTED(12004) && fragOpPtr.p->m_fragNo == 0 && lastAttr || diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp index 31a11a5c16b..34f2ab3c525 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp @@ -16,6 +16,7 @@ #define DBTUX_SCAN_CPP #include "Dbtux.hpp" +#include <my_sys.h> void Dbtux::execACC_SCANREQ(Signal* signal) @@ -112,50 +113,89 @@ Dbtux::execACC_SCANREQ(Signal* signal) * keys and that all but possibly last bound is non-strict. * * Finally save the sets of lower and upper bounds (i.e. start key and - * end key). Full bound type (< 4) is included but only the strict bit - * is used since lower and upper have now been separated. + * end key). Full bound type is included but only the strict bit is + * used since lower and upper have now been separated. */ void Dbtux::execTUX_BOUND_INFO(Signal* signal) { jamEntry(); - struct BoundInfo { - int type; - unsigned offset; - unsigned size; - }; - TuxBoundInfo* const sig = (TuxBoundInfo*)signal->getDataPtrSend(); - const TuxBoundInfo reqCopy = *(const TuxBoundInfo*)sig; - const TuxBoundInfo* const req = &reqCopy; // get records + TuxBoundInfo* const sig = (TuxBoundInfo*)signal->getDataPtrSend(); + const TuxBoundInfo* const req = (const TuxBoundInfo*)sig; ScanOp& scan = *c_scanOpPool.getPtr(req->tuxScanPtrI); - Index& index = *c_indexPool.getPtr(scan.m_indexId); - // collect lower and upper bounds + const Index& index = *c_indexPool.getPtr(scan.m_indexId); + const DescEnt& descEnt = getDescEnt(index.m_descPage, index.m_descOff); + // collect normalized lower and upper bounds + struct BoundInfo { + int type2; // with EQ -> LE/GE + Uint32 offset; // offset in xfrmData + Uint32 size; + }; BoundInfo boundInfo[2][MaxIndexAttributes]; + const unsigned dstSize = 1024 * MAX_XFRM_MULTIPLY; + Uint32 xfrmData[dstSize]; + Uint32 dstPos = 0; // largest attrId seen plus one Uint32 maxAttrId[2] = { 0, 0 }; - unsigned offset = 0; - const Uint32* const data = (Uint32*)sig + TuxBoundInfo::SignalLength; // walk through entries + const Uint32* const data = (Uint32*)sig + TuxBoundInfo::SignalLength; + Uint32 offset = 0; while (offset + 2 <= req->boundAiLength) { jam(); const unsigned type = data[offset]; - if (type > 4) { - jam(); - scan.m_state = ScanOp::Invalid; - sig->errorCode = TuxBoundInfo::InvalidAttrInfo; - return; - } const AttributeHeader* ah = (const AttributeHeader*)&data[offset + 1]; const Uint32 attrId = ah->getAttributeId(); const Uint32 dataSize = ah->getDataSize(); - if (attrId >= index.m_numAttrs) { + if (type > 4 || attrId >= index.m_numAttrs || dstPos + 2 + dataSize > dstSize) { jam(); scan.m_state = ScanOp::Invalid; sig->errorCode = TuxBoundInfo::InvalidAttrInfo; return; } + // copy header + xfrmData[dstPos + 0] = data[offset + 0]; + xfrmData[dstPos + 1] = data[offset + 1]; + // copy bound value + Uint32 dstWords = 0; + if (! ah->isNULL()) { + jam(); + const DescAttr& descAttr = descEnt.m_descAttr[attrId]; + Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(descAttr.m_attrDesc); + Uint32 srcWords = (srcBytes + 3) / 4; + if (srcWords != dataSize) { + jam(); + scan.m_state = ScanOp::Invalid; + sig->errorCode = TuxBoundInfo::InvalidAttrInfo; + return; + } + uchar* dstPtr = (uchar*)&xfrmData[dstPos + 2]; + const uchar* srcPtr = (const uchar*)&data[offset + 2]; + if (descAttr.m_charset == 0) { + memcpy(dstPtr, srcPtr, srcWords << 2); + dstWords = srcWords; + } else { + jam(); + CHARSET_INFO* cs = all_charsets[descAttr.m_charset]; + Uint32 xmul = cs->strxfrm_multiply; + if (xmul == 0) + xmul = 1; + Uint32 dstLen = xmul * srcBytes; + if (dstLen > ((dstSize - dstPos) << 2)) { + jam(); + scan.m_state = ScanOp::Invalid; + sig->errorCode = TuxBoundInfo::TooMuchAttrInfo; + return; + } + Uint32 n = (*cs->coll->strnxfrm)(cs, dstPtr, dstLen, srcPtr, srcBytes); + while ((n & 3) != 0) { + dstPtr[n++] = 0; + } + dstWords = n / 4; + } + } for (unsigned j = 0; j <= 1; j++) { + jam(); // check if lower/upper bit matches const unsigned luBit = (j << 1); if ((type & 0x2) != luBit && type != 4) @@ -164,29 +204,35 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal) const unsigned type2 = (type & 0x1) | luBit; // fill in any gap while (maxAttrId[j] <= attrId) { + jam(); BoundInfo& b = boundInfo[j][maxAttrId[j]++]; - b.type = -1; + b.type2 = -1; } BoundInfo& b = boundInfo[j][attrId]; - if (b.type != -1) { - // compare with previous bound - if (b.type != (int)type2 || - b.size != 2 + dataSize || - memcmp(&data[b.offset + 2], &data[offset + 2], dataSize << 2) != 0) { + if (b.type2 != -1) { + // compare with previously defined bound + if (b.type2 != (int)type2 || + b.size != 2 + dstWords || + memcmp(&xfrmData[b.offset + 2], &xfrmData[dstPos + 2], dstWords << 2) != 0) { jam(); scan.m_state = ScanOp::Invalid; sig->errorCode = TuxBoundInfo::InvalidBounds; return; } } else { + // fix length + AttributeHeader* ah = (AttributeHeader*)&xfrmData[dstPos + 1]; + ah->setDataSize(dstWords); // enter new bound - b.type = type2; - b.offset = offset; - b.size = 2 + dataSize; + jam(); + b.type2 = type2; + b.offset = dstPos; + b.size = 2 + dstWords; } } // jump to next offset += 2 + dataSize; + dstPos += 2 + dstWords; } if (offset != req->boundAiLength) { jam(); @@ -200,13 +246,13 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal) jam(); const BoundInfo& b = boundInfo[j][i]; // check for gap or strict bound before last - if (b.type == -1 || (i + 1 < maxAttrId[j] && (b.type & 0x1))) { + if (b.type2 == -1 || (i + 1 < maxAttrId[j] && (b.type2 & 0x1))) { jam(); scan.m_state = ScanOp::Invalid; sig->errorCode = TuxBoundInfo::InvalidBounds; return; } - bool ok = scan.m_bound[j]->append(&data[b.offset], b.size); + bool ok = scan.m_bound[j]->append(&xfrmData[b.offset], b.size); if (! ok) { jam(); scan.m_state = ScanOp::Invalid; diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/ndb/src/ndbapi/NdbDictionaryImpl.cpp index f56c3ce94c2..3f6ef69ca96 100644 --- a/ndb/src/ndbapi/NdbDictionaryImpl.cpp +++ b/ndb/src/ndbapi/NdbDictionaryImpl.cpp @@ -1546,6 +1546,11 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb, m_error.code = 743; return -1; } + // distribution key not supported for Char attribute + if (col->m_distributionKey && col->m_cs != NULL) { + m_error.code = 745; + return -1; + } // charset in upper half of precision if (col->getCharType()) { tmpAttr.AttributeExtPrecision |= (col->m_cs->number << 16); diff --git a/ndb/src/ndbapi/NdbOperationDefine.cpp b/ndb/src/ndbapi/NdbOperationDefine.cpp index d9aa860f71f..aabb58fc974 100644 --- a/ndb/src/ndbapi/NdbOperationDefine.cpp +++ b/ndb/src/ndbapi/NdbOperationDefine.cpp @@ -520,16 +520,6 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo, // Insert Attribute Id into ATTRINFO part. const Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize; - CHARSET_INFO* cs = tAttrInfo->m_cs; - // invalid data can crash kernel - if (cs != NULL && - (*cs->cset->well_formed_len)(cs, - aValue, - aValue + sizeInBytes, - sizeInBytes) != sizeInBytes) { - setErrorCodeAbort(744); - return -1; - } #if 0 tAttrSize = tAttrInfo->theAttrSize; tArraySize = tAttrInfo->theArraySize; diff --git a/ndb/src/ndbapi/NdbOperationSearch.cpp b/ndb/src/ndbapi/NdbOperationSearch.cpp index 7ae5bd70a11..972dc3d94eb 100644 --- a/ndb/src/ndbapi/NdbOperationSearch.cpp +++ b/ndb/src/ndbapi/NdbOperationSearch.cpp @@ -62,7 +62,6 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, Uint32 tData; Uint32 tKeyInfoPosition; const char* aValue = aValuePassed; - Uint64 xfrmData[512]; Uint64 tempData[512]; if ((theStatus == OperationDefined) && @@ -140,21 +139,6 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, aValue = (char*)&tempData[0]; }//if } - const char* aValueToWrite = aValue; - - CHARSET_INFO* cs = tAttrInfo->m_cs; - if (cs != 0) { - // current limitation: strxfrm does not increase length - assert(cs->strxfrm_multiply <= 1); - ((Uint32*)xfrmData)[sizeInBytes >> 2] = 0; - unsigned n = - (*cs->coll->strnxfrm)(cs, - (uchar*)xfrmData, sizeof(xfrmData), - (const uchar*)aValue, sizeInBytes); - while (n < sizeInBytes) - ((uchar*)xfrmData)[n++] = 0x20; - aValue = (char*)xfrmData; - } Uint32 totalSizeInWords = (sizeInBytes + 3)/4; // Inc. bits in last word @@ -200,13 +184,6 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, *************************************************************************/ if ((tOpType == InsertRequest) || (tOpType == WriteRequest)) { - // invalid data can crash kernel - if (cs != NULL && - (*cs->cset->well_formed_len)(cs, - aValueToWrite, - aValueToWrite + sizeInBytes, - sizeInBytes) != sizeInBytes) - goto equal_error4; Uint32 ahValue; const Uint32 sz = totalSizeInWords; @@ -224,7 +201,7 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, } insertATTRINFO( ahValue ); - insertATTRINFOloop((Uint32*)aValueToWrite, sz); + insertATTRINFOloop((Uint32*)aValue, sz); }//if /************************************************************************** @@ -321,10 +298,6 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, equal_error3: setErrorCodeAbort(4209); return -1; - - equal_error4: - setErrorCodeAbort(744); - return -1; } /****************************************************************************** diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp index f4973b2e66f..671996bc534 100644 --- a/ndb/src/ndbapi/NdbScanOperation.cpp +++ b/ndb/src/ndbapi/NdbScanOperation.cpp @@ -1072,29 +1072,6 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, setErrorCodeAbort(4209); return -1; } - - // normalize char bound - CHARSET_INFO* cs = tAttrInfo->m_cs; - Uint64 xfrmData[1001]; - if (cs != NULL && aValue != NULL) { - // current limitation: strxfrm does not increase length - assert(cs->strxfrm_multiply <= 1); - ((Uint32*)xfrmData)[len >> 2] = 0; - unsigned n = - (*cs->coll->strnxfrm)(cs, - (uchar*)xfrmData, sizeof(xfrmData), - (const uchar*)aValue, len); - - while (n < len) - ((uchar*)xfrmData)[n++] = 0x20; - - if(len & 3) - { - len += (4 - (len & 3)); - } - - aValue = (char*)xfrmData; - } // insert attribute header Uint32 tIndexAttrId = tAttrInfo->m_attrId; @@ -1117,7 +1094,7 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, theTotalNrOfKeyWordInSignal = currLen + totalLen; } else { if(!aligned || !nobytes){ - Uint32 *tempData = (Uint32*)xfrmData; + Uint32 tempData[2000]; tempData[0] = type; tempData[1] = ahValue; tempData[2 + (len >> 2)] = 0; @@ -1273,10 +1250,10 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols, return (r1_null ? -1 : 1); } const NdbColumnImpl & col = NdbColumnImpl::getImpl(* r1->m_column); - Uint32 size = (r1->theAttrSize * r1->theArraySize + 3) / 4; + Uint32 len = r1->theAttrSize * r1->theArraySize; if(!r1_null){ const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(col.m_extType); - int r = (*sqlType.m_cmp)(col.m_cs, d1, d2, size, size); + int r = (*sqlType.m_cmp)(col.m_cs, d1, len, d2, len, true); if(r){ assert(r != NdbSqlUtil::CmpUnknown); return r; diff --git a/ndb/src/ndbapi/ndberror.c b/ndb/src/ndbapi/ndberror.c index 638996530e2..9762eb85de1 100644 --- a/ndb/src/ndbapi/ndberror.c +++ b/ndb/src/ndbapi/ndberror.c @@ -205,6 +205,7 @@ ErrorBundle ErrorCodes[] = { */ { 892, IE, "Inconsistent hash index. The index needs to be dropped and recreated" }, { 895, IE, "Inconsistent ordered index. The index needs to be dropped and recreated" }, + { 896, IE, "Tuple corrupted - wrong checksum or column data in invalid format" }, { 202, IE, "202" }, { 203, IE, "203" }, { 207, IE, "207" }, @@ -311,6 +312,7 @@ ErrorBundle ErrorCodes[] = { { 742, SE, "Unsupported attribute type in index" }, { 743, SE, "Unsupported character set in table or index" }, { 744, SE, "Character string is invalid for given character set" }, + { 745, SE, "Distribution key not supported for char attribute (use binary attribute)" }, { 241, SE, "Invalid schema object version" }, { 283, SE, "Table is being dropped" }, { 284, SE, "Table not defined in transaction coordinator" }, diff --git a/ndb/test/ndbapi/Makefile.am b/ndb/test/ndbapi/Makefile.am index 90f0a08d633..c050c42a701 100644 --- a/ndb/test/ndbapi/Makefile.am +++ b/ndb/test/ndbapi/Makefile.am @@ -32,7 +32,7 @@ testTransactions \ testDeadlock \ test_event ndbapi_slow_select testReadPerf testLcp \ testPartitioning \ -testBitfield +testBitfield foo #flexTimedAsynch #testBlobs @@ -73,6 +73,7 @@ testReadPerf_SOURCES = testReadPerf.cpp testLcp_SOURCES = testLcp.cpp testPartitioning_SOURCES = testPartitioning.cpp testBitfield_SOURCES = testBitfield.cpp +foo_SOURCES = foo.cpp INCLUDES_LOC = -I$(top_srcdir)/ndb/include/kernel @@ -89,3 +90,4 @@ testBackup_LDADD = $(LDADD) bank/libbank.a %::SCCS/s.% + diff --git a/ndb/test/ndbapi/testOIBasic.cpp b/ndb/test/ndbapi/testOIBasic.cpp index 41f0686e63b..2dd63178d01 100644 --- a/ndb/test/ndbapi/testOIBasic.cpp +++ b/ndb/test/ndbapi/testOIBasic.cpp @@ -37,6 +37,7 @@ struct Opt { unsigned m_batch; const char* m_bound; const char* m_case; + bool m_collsp; bool m_core; const char* m_csname; CHARSET_INFO* m_cs; @@ -55,17 +56,18 @@ struct Opt { unsigned m_scanbat; unsigned m_scanpar; unsigned m_scanstop; - unsigned m_seed; + int m_seed; unsigned m_subloop; const char* m_table; unsigned m_threads; - unsigned m_v; + int m_v; // int for lint Opt() : m_batch(32), m_bound("01234"), m_case(0), + m_collsp(false), m_core(false), - m_csname("latin1_bin"), + m_csname("random"), m_cs(0), m_die(0), m_dups(false), @@ -82,7 +84,7 @@ struct Opt { m_scanbat(0), m_scanpar(0), m_scanstop(0), - m_seed(0), + m_seed(-1), m_subloop(4), m_table(0), m_threads(10), @@ -104,12 +106,13 @@ printhelp() << " -batch N pk operations in batch [" << d.m_batch << "]" << endl << " -bound xyz use only these bound types 0-4 [" << d.m_bound << "]" << endl << " -case abc only given test cases (letters a-z)" << endl + << " -collsp use strnncollsp instead of strnxfrm" << endl << " -core core dump on error [" << d.m_core << "]" << endl - << " -csname S charset (collation) of non-pk char column [" << d.m_csname << "]" << endl + << " -csname S charset or collation [" << d.m_csname << "]" << endl << " -die nnn exit immediately on NDB error code nnn" << endl << " -dups allow duplicate tuples from index scan [" << d.m_dups << "]" << endl << " -fragtype T fragment type single/small/medium/large" << endl - << " -index xyz only given index numbers (digits 1-9)" << endl + << " -index xyz only given index numbers (digits 0-9)" << endl << " -loop N loop count full suite 0=forever [" << d.m_loop << "]" << endl << " -nologging create tables in no-logging mode" << endl << " -noverify skip index verifications" << endl @@ -118,9 +121,9 @@ printhelp() << " -samples N samples for some timings (0=all) [" << d.m_samples << "]" << endl << " -scanbat N scan batch per fragment (ignored by ndb api) [" << d.m_scanbat << "]" << endl << " -scanpar N scan parallelism [" << d.m_scanpar << "]" << endl - << " -seed N srandom seed 0=loop number[" << d.m_seed << "]" << endl + << " -seed N srandom seed 0=loop number -1=random [" << d.m_seed << "]" << endl << " -subloop N subtest loop count [" << d.m_subloop << "]" << endl - << " -table xyz only given table numbers (digits 1-9)" << endl + << " -table xyz only given table numbers (digits 0-9)" << endl << " -threads N number of threads [" << d.m_threads << "]" << endl << " -vN verbosity [" << d.m_v << "]" << endl << " -h or -help print this help text" << endl @@ -135,9 +138,33 @@ static const bool g_store_null_key = true; // compare NULL like normal value (NULL < not NULL, NULL == NULL) static const bool g_compare_null = true; +static const char* hexstr = "0123456789abcdef"; + +// random ints + +static unsigned +urandom(unsigned n) +{ + if (n == 0) + return 0; + unsigned i = random() % n; + return i; +} + +static int +irandom(unsigned n) +{ + if (n == 0) + return 0; + int i = random() % n; + if (random() & 0x1) + i = -i; + return i; +} + // log and error macros -static NdbMutex *ndbout_mutex= NULL; +static NdbMutex *ndbout_mutex = NULL; static unsigned getthrno(); @@ -198,7 +225,7 @@ getthrstr() return -1; \ } while (0) -// method parameters base class +// method parameters class Thr; class Con; @@ -222,6 +249,8 @@ struct Par : public Opt { // value calculation unsigned m_range; unsigned m_pctrange; + unsigned m_pctbrange; + int m_bdir; // choice of key bool m_randomkey; // do verify after read @@ -240,7 +269,9 @@ struct Par : public Opt { m_slno(0), m_totrows(m_threads * m_rows), m_range(m_rows), - m_pctrange(0), + m_pctrange(40), + m_pctbrange(80), + m_bdir(0), m_randomkey(false), m_verify(false), m_deadlock(false) { @@ -248,15 +279,15 @@ struct Par : public Opt { }; static bool -usetable(unsigned i) +usetable(Par par, unsigned i) { - return g_opt.m_table == 0 || strchr(g_opt.m_table, '1' + i) != 0; + return par.m_table == 0 || strchr(par.m_table, '0' + i) != 0; } static bool -useindex(unsigned i) +useindex(Par par, unsigned i) { - return g_opt.m_index == 0 || strchr(g_opt.m_index, '1' + i) != 0; + return par.m_index == 0 || strchr(par.m_index, '0' + i) != 0; } static unsigned @@ -382,38 +413,196 @@ Lst::reset() m_cnt = 0; } +// character sets + +static const unsigned maxcsnumber = 512; +static const unsigned maxcharcount = 32; +static const unsigned maxcharsize = 4; +static const unsigned maxxmulsize = 8; + +// single mb char +struct Chr { + unsigned char m_bytes[maxcharsize]; + unsigned char m_xbytes[maxxmulsize * maxcharsize]; + unsigned m_size; + Chr(); +}; + +Chr::Chr() +{ + memset(m_bytes, 0, sizeof(m_bytes)); + memset(m_xbytes, 0, sizeof(m_xbytes)); + m_size = 0; +} + +// charset and random valid chars to use +struct Chs { + CHARSET_INFO* m_cs; + unsigned m_xmul; + Chr* m_chr; + Chs(CHARSET_INFO* cs); + ~Chs(); +}; + +Chs::Chs(CHARSET_INFO* cs) : + m_cs(cs) +{ + m_xmul = m_cs->strxfrm_multiply; + if (m_xmul == 0) + m_xmul = 1; + assert(m_xmul <= maxxmulsize); + m_chr = new Chr [maxcharcount]; + unsigned i = 0; + unsigned miss1 = 0; + unsigned miss2 = 0; + while (i < maxcharcount) { + unsigned char* bytes = m_chr[i].m_bytes; + unsigned char* xbytes = m_chr[i].m_xbytes; + unsigned size = m_cs->mbminlen + urandom(m_cs->mbmaxlen - m_cs->mbminlen + 1); + assert(m_cs->mbminlen <= size && size <= m_cs->mbmaxlen); + // prefer longer chars + if (size == m_cs->mbminlen && m_cs->mbminlen < m_cs->mbmaxlen && urandom(5) != 0) + continue; + for (unsigned j = 0; j < size; j++) { + bytes[j] = urandom(256); + } + const char* sbytes = (const char*)bytes; + if ((*cs->cset->well_formed_len)(cs, sbytes, sbytes + size, 1) != size) { + miss1++; + continue; + } + // do not trust well_formed_len currently + memset(xbytes, 0, sizeof(xbytes)); + // currently returns buffer size always + int xlen = (*cs->coll->strnxfrm)(cs, xbytes, m_xmul * size, bytes, size); + // check we got something + bool xok = false; + for (unsigned j = 0; j < xlen; j++) { + if (xbytes[j] != 0) { + xok = true; + break; + } + } + if (! xok) { + miss2++; + continue; + } + // occasional duplicate char is ok + m_chr[i].m_size = size; + i++; + } + bool disorder = true; + unsigned bubbels = 0; + while (disorder) { + disorder = false; + for (unsigned i = 1; i < maxcharcount; i++) { + unsigned len = sizeof(m_chr[i].m_xbytes); + if (memcmp(m_chr[i-1].m_xbytes, m_chr[i].m_xbytes, len) > 0) { + Chr chr = m_chr[i]; + m_chr[i] = m_chr[i-1]; + m_chr[i-1] = chr; + disorder = true; + bubbels++; + } + } + } + LL3("inited charset " << cs->name << " miss1=" << miss1 << " miss2=" << miss2 << " bubbels=" << bubbels); +} + +Chs::~Chs() +{ + delete [] m_chr; +} + +static Chs* cslist[maxcsnumber]; + +static void +resetcslist() +{ + for (unsigned i = 0; i < maxcsnumber; i++) { + delete cslist[i]; + cslist[i] = 0; + } +} + +static Chs* +getcs(Par par) +{ + CHARSET_INFO* cs; + if (par.m_cs != 0) { + cs = par.m_cs; + } else { + while (1) { + unsigned n = urandom(maxcsnumber); + cs = get_charset(n, MYF(0)); + if (cs != 0) { + // prefer complex charsets + if (cs->mbmaxlen != 1 || urandom(5) == 0) + break; + } + } + } + if (cslist[cs->number] == 0) + cslist[cs->number] = new Chs(cs); + return cslist[cs->number]; +} + // tables and indexes // Col - table column struct Col { + const class Tab& m_tab; unsigned m_num; const char* m_name; bool m_pk; NdbDictionary::Column::Type m_type; unsigned m_length; + unsigned m_bytelength; bool m_nullable; + const Chs* m_chs; + Col(const class Tab& tab, unsigned num, const char* name, bool pk, NdbDictionary::Column::Type type, unsigned length, bool nullable, const Chs* chs); + ~Col(); + bool equal(const Col& col2) const; void verify(const void* addr) const; }; +Col::Col(const class Tab& tab, unsigned num, const char* name, bool pk, NdbDictionary::Column::Type type, unsigned length, bool nullable, const Chs* chs) : + m_tab(tab), + m_num(num), + m_name(strcpy(new char [strlen(name) + 1], name)), + m_pk(pk), + m_type(type), + m_length(length), + m_bytelength(length * (chs == 0 ? 1 : chs->m_cs->mbmaxlen)), + m_nullable(nullable), + m_chs(chs) +{ +} + +Col::~Col() +{ + delete [] m_name; +} + +bool +Col::equal(const Col& col2) const +{ + return m_type == col2.m_type && m_length == col2.m_length && m_chs == col2.m_chs; +} + void Col::verify(const void* addr) const { switch (m_type) { case NdbDictionary::Column::Unsigned: break; - case NdbDictionary::Column::Varchar: + case NdbDictionary::Column::Char: { - const unsigned char* p = (const unsigned char*)addr; - unsigned n = (p[0] << 8) | p[1]; - assert(n <= m_length); - unsigned i; - for (i = 0; i < n; i++) { - assert(p[2 + i] != 0); - } - for (i = n; i < m_length; i++) { - assert(p[2 + i] == 0); - } + CHARSET_INFO* cs = m_chs->m_cs; + const char* src = (const char*)addr; + unsigned len = m_bytelength; + assert((*cs->cset->well_formed_len)(cs, src, src + len, 0xffff) == len); } break; default: @@ -425,14 +614,16 @@ Col::verify(const void* addr) const static NdbOut& operator<<(NdbOut& out, const Col& col) { - out << "col " << col.m_num; - out << " " << col.m_name; + out << "col[" << col.m_num << "] " << col.m_name; switch (col.m_type) { case NdbDictionary::Column::Unsigned: out << " unsigned"; break; - case NdbDictionary::Column::Varchar: - out << " varchar(" << col.m_length << ")"; + case NdbDictionary::Column::Char: + { + CHARSET_INFO* cs = col.m_chs->m_cs; + out << " char(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")"; + } break; default: out << "type" << (int)col.m_type; @@ -447,25 +638,60 @@ operator<<(NdbOut& out, const Col& col) // ICol - index column struct ICol { + const class ITab& m_itab; unsigned m_num; - struct Col m_col; + const Col& m_col; + ICol(const class ITab& itab, unsigned num, const Col& col); + ~ICol(); }; +ICol::ICol(const class ITab& itab, unsigned num, const Col& col) : + m_itab(itab), + m_num(num), + m_col(col) +{ +} + +ICol::~ICol() +{ +} + // ITab - index struct ITab { + const class Tab& m_tab; const char* m_name; unsigned m_icols; - const ICol* m_icol; + const ICol** m_icol; + ITab(const class Tab& tab, const char* name, unsigned icols); + ~ITab(); }; +ITab::ITab(const class Tab& tab, const char* name, unsigned icols) : + m_tab(tab), + m_name(strcpy(new char [strlen(name) + 1], name)), + m_icols(icols), + m_icol(new const ICol* [icols + 1]) +{ + for (unsigned i = 0; i <= m_icols; i++) + m_icol[0] = 0; +} + +ITab::~ITab() +{ + delete [] m_name; + for (unsigned i = 0; i < m_icols; i++) + delete m_icol[i]; + delete [] m_icol; +} + static NdbOut& operator<<(NdbOut& out, const ITab& itab) { - out << "itab " << itab.m_name << " " << itab.m_icols; + out << "itab " << itab.m_name << " icols=" << itab.m_icols; for (unsigned k = 0; k < itab.m_icols; k++) { out << endl; - out << "icol " << k << " " << itab.m_icol[k].m_col; + out << "icol[" << k << "] " << itab.m_icol[k]->m_col; } return out; } @@ -475,200 +701,241 @@ operator<<(NdbOut& out, const ITab& itab) struct Tab { const char* m_name; unsigned m_cols; - const Col* m_col; + const Col** m_col; unsigned m_itabs; - const ITab* m_itab; + const ITab** m_itab; + // pk must contain an Unsigned column + unsigned m_keycol; + Tab(const char* name, unsigned cols, unsigned itabs, unsigned keycol); + ~Tab(); }; +Tab::Tab(const char* name, unsigned cols, unsigned itabs, unsigned keycol) : + m_name(strcpy(new char [strlen(name) + 1], name)), + m_cols(cols), + m_col(new const Col* [cols + 1]), + m_itabs(itabs), + m_itab(new const ITab* [itabs + 1]), + m_keycol(keycol) +{ + for (unsigned i = 0; i <= cols; i++) + m_col[i] = 0; + for (unsigned i = 0; i <= itabs; i++) + m_itab[i] = 0; +} + +Tab::~Tab() +{ + delete [] m_name; + for (unsigned i = 0; i < m_cols; i++) + delete m_col[i]; + delete [] m_col; + for (unsigned i = 0; i < m_itabs; i++) + delete m_itab[i]; + delete [] m_itab; +} + static NdbOut& operator<<(NdbOut& out, const Tab& tab) { - out << "tab " << tab.m_name << " " << tab.m_cols; + out << "tab " << tab.m_name << " cols=" << tab.m_cols; for (unsigned k = 0; k < tab.m_cols; k++) { out << endl; - out << tab.m_col[k]; + out << *tab.m_col[k]; } for (unsigned i = 0; i < tab.m_itabs; i++) { - if (! useindex(i)) + if (tab.m_itab[i] == 0) continue; out << endl; - out << tab.m_itab[i]; + out << *tab.m_itab[i]; } return out; } -// tt1 + tt1x1 tt1x2 tt1x3 tt1x4 tt1x5 - -static const Col -tt1col[] = { - { 0, "A", 1, NdbDictionary::Column::Unsigned, 1, 0 }, - { 1, "B", 0, NdbDictionary::Column::Unsigned, 1, 1 }, - { 2, "C", 0, NdbDictionary::Column::Unsigned, 1, 1 }, - { 3, "D", 0, NdbDictionary::Column::Unsigned, 1, 1 }, - { 4, "E", 0, NdbDictionary::Column::Unsigned, 1, 1 } -}; - -static const ICol -tt1x1col[] = { - { 0, tt1col[0] } -}; - -static const ICol -tt1x2col[] = { - { 0, tt1col[1] } -}; - -static const ICol -tt1x3col[] = { - { 0, tt1col[1] }, - { 1, tt1col[2] } -}; - -static const ICol -tt1x4col[] = { - { 0, tt1col[3] }, - { 1, tt1col[2] }, - { 2, tt1col[1] } -}; - -static const ICol -tt1x5col[] = { - { 0, tt1col[1] }, - { 1, tt1col[4] }, - { 2, tt1col[2] }, - { 3, tt1col[3] } -}; - -static const ITab -tt1x1 = { - "TT1X1", 1, tt1x1col -}; - -static const ITab -tt1x2 = { - "TT1X2", 1, tt1x2col -}; - -static const ITab -tt1x3 = { - "TT1X3", 2, tt1x3col -}; - -static const ITab -tt1x4 = { - "TT1X4", 3, tt1x4col -}; - -static const ITab -tt1x5 = { - "TT1X5", 4, tt1x5col -}; - -static const ITab -tt1itab[] = { - tt1x1, - tt1x2, - tt1x3, - tt1x4, - tt1x5 -}; - -static const Tab -tt1 = { - "TT1", 5, tt1col, 5, tt1itab -}; - -// tt2 + tt2x1 tt2x2 tt2x3 tt2x4 tt2x5 - -static const Col -tt2col[] = { - { 0, "A", 1, NdbDictionary::Column::Unsigned, 1, 0 }, - { 1, "B", 0, NdbDictionary::Column::Unsigned, 1, 1 }, - { 2, "C", 0, NdbDictionary::Column::Varchar, 20, 1 }, - { 3, "D", 0, NdbDictionary::Column::Varchar, 5, 1 }, - { 4, "E", 0, NdbDictionary::Column::Varchar, 5, 1 } -}; - -static const ICol -tt2x1col[] = { - { 0, tt2col[0] } -}; - -static const ICol -tt2x2col[] = { - { 0, tt2col[1] }, - { 1, tt2col[2] } -}; - -static const ICol -tt2x3col[] = { - { 0, tt2col[2] }, - { 1, tt2col[1] } -}; - -static const ICol -tt2x4col[] = { - { 0, tt2col[3] }, - { 1, tt2col[4] } -}; - -static const ICol -tt2x5col[] = { - { 0, tt2col[4] }, - { 1, tt2col[3] }, - { 2, tt2col[2] }, - { 3, tt2col[1] } -}; - -static const ITab -tt2x1 = { - "TT2X1", 1, tt2x1col -}; - -static const ITab -tt2x2 = { - "TT2X2", 2, tt2x2col -}; +// make table structs -static const ITab -tt2x3 = { - "TT2X3", 2, tt2x3col -}; - -static const ITab -tt2x4 = { - "TT2X4", 2, tt2x4col -}; +static const Tab** tablist = 0; +static unsigned tabcount = 0; -static const ITab -tt2x5 = { - "TT2X5", 4, tt2x5col -}; - -static const ITab -tt2itab[] = { - tt2x1, - tt2x2, - tt2x3, - tt2x4, - tt2x5 -}; - -static const Tab -tt2 = { - "TT2", 5, tt2col, 5, tt2itab -}; - -// all tables - -static const Tab -tablist[] = { - tt1, - tt2 -}; +static void +verifytables() +{ + for (unsigned j = 0; j < tabcount; j++) { + const Tab* t = tablist[j]; + if (t == 0) + continue; + assert(t->m_cols != 0 && t->m_col != 0); + for (unsigned k = 0; k < t->m_cols; k++) { + const Col* c = t->m_col[k]; + assert(c != 0 && c->m_num == k); + assert(! (c->m_pk && c->m_nullable)); + } + assert(t->m_col[t->m_cols] == 0); + { + assert(t->m_keycol < t->m_cols); + const Col* c = t->m_col[t->m_keycol]; + assert(c->m_pk && c->m_type == NdbDictionary::Column::Unsigned); + } + assert(t->m_itabs != 0 && t->m_itab != 0); + for (unsigned i = 0; i < t->m_itabs; i++) { + const ITab* x = t->m_itab[i]; + if (x == 0) + continue; + assert(x != 0 && x->m_icols != 0 && x->m_icol != 0); + for (unsigned k = 0; k < x->m_icols; k++) { + const ICol* c = x->m_icol[k]; + assert(c != 0 && c->m_num == k && c->m_col.m_num < t->m_cols); + } + } + assert(t->m_itab[t->m_itabs] == 0); + } +} -static const unsigned -tabcount = sizeof(tablist) / sizeof(tablist[0]); +static void +makebuiltintables(Par par) +{ + LL2("makebuiltintables"); + resetcslist(); + tabcount = 3; + if (tablist == 0) { + tablist = new const Tab* [tabcount]; + for (unsigned j = 0; j < tabcount; j++) { + tablist[j] = 0; + } + } else { + for (unsigned j = 0; j < tabcount; j++) { + delete tablist[j]; + tablist[j] = 0; + } + } + // ti0 - basic + if (usetable(par, 0)) { + const Tab* t = new Tab("ti0", 5, 5, 0); + // name - pk - type - length - nullable - cs + t->m_col[0] = new Col(*t, 0, "a", 1, NdbDictionary::Column::Unsigned, 1, 0, 0); + t->m_col[1] = new Col(*t, 1, "b", 0, NdbDictionary::Column::Unsigned, 1, 1, 0); + t->m_col[2] = new Col(*t, 2, "c", 0, NdbDictionary::Column::Unsigned, 1, 1, 0); + t->m_col[3] = new Col(*t, 3, "d", 0, NdbDictionary::Column::Unsigned, 1, 1, 0); + t->m_col[4] = new Col(*t, 4, "e", 0, NdbDictionary::Column::Unsigned, 1, 1, 0); + if (useindex(par, 0)) { + // a + const ITab* x = t->m_itab[0] = new ITab(*t, "ti0x0", 1); + x->m_icol[0] = new ICol(*x, 0, *t->m_col[0]); + } + if (useindex(par, 1)) { + // b + const ITab* x = t->m_itab[1] = new ITab(*t, "ti0x1", 1); + x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]); + } + if (useindex(par, 2)) { + // b, c + const ITab* x = t->m_itab[2] = new ITab(*t, "ti0x2", 2); + x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]); + x->m_icol[1] = new ICol(*x, 1, *t->m_col[2]); + } + if (useindex(par, 3)) { + // d, c, b + const ITab* x = t->m_itab[3] = new ITab(*t, "ti0x3", 3); + x->m_icol[0] = new ICol(*x, 0, *t->m_col[3]); + x->m_icol[1] = new ICol(*x, 1, *t->m_col[2]); + x->m_icol[2] = new ICol(*x, 2, *t->m_col[1]); + } + if (useindex(par, 4)) { + // b, e, c, d + const ITab* x = t->m_itab[4] = new ITab(*t, "ti0x4", 4); + x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]); + x->m_icol[1] = new ICol(*x, 1, *t->m_col[4]); + x->m_icol[2] = new ICol(*x, 2, *t->m_col[2]); + x->m_icol[3] = new ICol(*x, 3, *t->m_col[3]); + } + tablist[0] = t; + } + // ti1 - simple char fields + if (usetable(par, 1)) { + const Tab* t = new Tab("ti1", 5, 5, 1); + // name - pk - type - length - nullable - cs + t->m_col[0] = new Col(*t, 0, "a", 0, NdbDictionary::Column::Unsigned, 1, 1, 0); + t->m_col[1] = new Col(*t, 1, "b", 1, NdbDictionary::Column::Unsigned, 1, 0, 0); + t->m_col[2] = new Col(*t, 2, "c", 0, NdbDictionary::Column::Char, 20, 1, getcs(par)); + t->m_col[3] = new Col(*t, 3, "d", 0, NdbDictionary::Column::Char, 5, 1, getcs(par)); + t->m_col[4] = new Col(*t, 4, "e", 0, NdbDictionary::Column::Char, 5, 1, getcs(par)); + if (useindex(par, 0)) { + // b + const ITab* x = t->m_itab[0] = new ITab(*t, "ti1x0", 1); + x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]); + } + if (useindex(par, 1)) { + // a, c + const ITab* x = t->m_itab[1] = new ITab(*t, "ti1x1", 2); + x->m_icol[0] = new ICol(*x, 0, *t->m_col[0]); + x->m_icol[1] = new ICol(*x, 1, *t->m_col[2]); + } + if (useindex(par, 2)) { + // c, a + const ITab* x = t->m_itab[2] = new ITab(*t, "ti1x2", 2); + x->m_icol[0] = new ICol(*x, 0, *t->m_col[2]); + x->m_icol[1] = new ICol(*x, 1, *t->m_col[0]); + } + if (useindex(par, 3)) { + // e + const ITab* x = t->m_itab[3] = new ITab(*t, "ti1x3", 1); + x->m_icol[0] = new ICol(*x, 0, *t->m_col[4]); + } + if (useindex(par, 4)) { + // e, d, c, b + const ITab* x = t->m_itab[4] = new ITab(*t, "ti1x4", 4); + x->m_icol[0] = new ICol(*x, 0, *t->m_col[4]); + x->m_icol[1] = new ICol(*x, 1, *t->m_col[3]); + x->m_icol[2] = new ICol(*x, 2, *t->m_col[2]); + x->m_icol[3] = new ICol(*x, 3, *t->m_col[1]); + } + tablist[1] = t; + } + // ti2 - complex char fields + if (usetable(par, 2)) { + const Tab* t = new Tab("ti2", 5, 5, 2); + // name - pk - type - length - nullable - cs + t->m_col[0] = new Col(*t, 0, "a", 1, NdbDictionary::Column::Char, 101, 0, getcs(par)); + t->m_col[1] = new Col(*t, 1, "b", 0, NdbDictionary::Column::Char, 4, 1, getcs(par)); + t->m_col[2] = new Col(*t, 2, "c", 1, NdbDictionary::Column::Unsigned, 1, 0, 0); + t->m_col[3] = new Col(*t, 3, "d", 1, NdbDictionary::Column::Char, 3, 0, getcs(par)); + t->m_col[4] = new Col(*t, 4, "e", 0, NdbDictionary::Column::Char, 101, 0, getcs(par)); + if (useindex(par, 0)) { + // a, c, d + const ITab* x = t->m_itab[0] = new ITab(*t, "ti2x0", 3); + x->m_icol[0] = new ICol(*x, 0, *t->m_col[0]); + x->m_icol[1] = new ICol(*x, 1, *t->m_col[2]); + x->m_icol[2] = new ICol(*x, 2, *t->m_col[3]); + } + if (useindex(par, 1)) { + // e, d, c, b, a + const ITab* x = t->m_itab[1] = new ITab(*t, "ti2x1", 5); + x->m_icol[0] = new ICol(*x, 0, *t->m_col[4]); + x->m_icol[1] = new ICol(*x, 1, *t->m_col[3]); + x->m_icol[2] = new ICol(*x, 2, *t->m_col[2]); + x->m_icol[3] = new ICol(*x, 3, *t->m_col[1]); + x->m_icol[4] = new ICol(*x, 4, *t->m_col[0]); + } + if (useindex(par, 2)) { + // d + const ITab* x = t->m_itab[2] = new ITab(*t, "ti2x2", 1); + x->m_icol[0] = new ICol(*x, 0, *t->m_col[3]); + } + if (useindex(par, 3)) { + // b + const ITab* x = t->m_itab[3] = new ITab(*t, "ti2x3", 1); + x->m_icol[0] = new ICol(*x, 0, *t->m_col[1]); + } + if (useindex(par, 4)) { + // a, e + const ITab* x = t->m_itab[4] = new ITab(*t, "ti2x4", 2); + x->m_icol[0] = new ICol(*x, 0, *t->m_col[0]); + x->m_icol[1] = new ICol(*x, 1, *t->m_col[4]); + } + tablist[2] = t; + } + verifytables(); +} // connections @@ -834,7 +1101,7 @@ Con::execute(ExecType t, bool& deadlock) int Con::openScanRead(unsigned scanbat, unsigned scanpar) { - assert(m_tx != 0 && m_op != 0); + assert(m_tx != 0 && m_scanop != 0); NdbOperation::LockMode lm = NdbOperation::LM_Read; CHKCON((m_resultset = m_scanop->readTuples(lm, scanbat, scanpar)) != 0, *this); return 0; @@ -843,7 +1110,7 @@ Con::openScanRead(unsigned scanbat, unsigned scanpar) int Con::openScanExclusive(unsigned scanbat, unsigned scanpar) { - assert(m_tx != 0 && m_op != 0); + assert(m_tx != 0 && m_scanop != 0); NdbOperation::LockMode lm = NdbOperation::LM_Exclusive; CHKCON((m_resultset = m_scanop->readTuples(lm, scanbat, scanpar)) != 0, *this); return 0; @@ -936,7 +1203,7 @@ Con::printerror(NdbOut& out) if ((code = m_tx->getNdbError().code) != 0) { LL0(++any << " con: error " << m_tx->getNdbError()); die += (code == g_opt.m_die); - if (code == 266 || code == 274 || code == 296 || code == 297 || code == 499) + if (code == 266 || code == 274 || code == 296 || code == 297 || code == 499 || code == 631) m_errtype = ErrDeadlock; } if (m_op && m_op->getNdbError().code != 0) { @@ -972,9 +1239,9 @@ invalidateindex(Par par) Con& con = par.con(); const Tab& tab = par.tab(); for (unsigned i = 0; i < tab.m_itabs; i++) { - if (! useindex(i)) + if (tab.m_itab[i] == 0) continue; - const ITab& itab = tab.m_itab[i]; + const ITab& itab = *tab.m_itab[i]; invalidateindex(par, itab); } return 0; @@ -1022,16 +1289,14 @@ createtable(Par par) t.setLogging(false); } for (unsigned k = 0; k < tab.m_cols; k++) { - const Col& col = tab.m_col[k]; + const Col& col = *tab.m_col[k]; NdbDictionary::Column c(col.m_name); c.setType(col.m_type); - c.setLength(col.m_length); + c.setLength(col.m_bytelength); // NDB API uses length in bytes c.setPrimaryKey(col.m_pk); c.setNullable(col.m_nullable); - if (c.getCharset()) { // test if char type - if (! col.m_pk) - c.setCharset(par.m_cs); - } + if (col.m_chs != 0) + c.setCharset(col.m_chs->m_cs); t.addColumn(c); } con.m_dic = con.m_ndb->getDictionary(); @@ -1062,9 +1327,9 @@ dropindex(Par par) { const Tab& tab = par.tab(); for (unsigned i = 0; i < tab.m_itabs; i++) { - if (! useindex(i)) + if (tab.m_itab[i] == 0) continue; - const ITab& itab = tab.m_itab[i]; + const ITab& itab = *tab.m_itab[i]; CHK(dropindex(par, itab) == 0); } return 0; @@ -1082,7 +1347,8 @@ createindex(Par par, const ITab& itab) x.setType(NdbDictionary::Index::OrderedIndex); x.setLogging(false); for (unsigned k = 0; k < itab.m_icols; k++) { - const Col& col = itab.m_icol[k].m_col; + const ICol& icol = *itab.m_icol[k]; + const Col& col = icol.m_col; x.addColumnName(col.m_name); } con.m_dic = con.m_ndb->getDictionary(); @@ -1096,9 +1362,9 @@ createindex(Par par) { const Tab& tab = par.tab(); for (unsigned i = 0; i < tab.m_itabs; i++) { - if (! useindex(i)) + if (tab.m_itab[i] == 0) continue; - const ITab& itab = tab.m_itab[i]; + const ITab& itab = *tab.m_itab[i]; CHK(createindex(par, itab) == 0); } return 0; @@ -1106,33 +1372,13 @@ createindex(Par par) // data sets -static unsigned -urandom(unsigned n) -{ - if (n == 0) - return 0; - unsigned i = random() % n; - return i; -} - -static int -irandom(unsigned n) -{ - if (n == 0) - return 0; - int i = random() % n; - if (random() & 0x1) - i = -i; - return i; -} - // Val - typed column value struct Val { const Col& m_col; union { Uint32 m_uint32; - char* m_varchar; + unsigned char* m_char; }; Val(const Col& col); ~Val(); @@ -1142,6 +1388,8 @@ struct Val { bool m_null; int setval(Par par) const; void calc(Par par, unsigned i); + void calckey(Par par, unsigned i); + void calcnokey(Par par); int verify(const Val& val2) const; int cmp(const Val& val2) const; private: @@ -1157,8 +1405,8 @@ Val::Val(const Col& col) : switch (col.m_type) { case NdbDictionary::Column::Unsigned: break; - case NdbDictionary::Column::Varchar: - m_varchar = new char [2 + col.m_length]; + case NdbDictionary::Column::Char: + m_char = new unsigned char [col.m_bytelength]; break; default: assert(false); @@ -1172,8 +1420,8 @@ Val::~Val() switch (col.m_type) { case NdbDictionary::Column::Unsigned: break; - case NdbDictionary::Column::Varchar: - delete [] m_varchar; + case NdbDictionary::Column::Char: + delete [] m_char; break; default: assert(false); @@ -1202,8 +1450,8 @@ Val::copy(const void* addr) case NdbDictionary::Column::Unsigned: m_uint32 = *(const Uint32*)addr; break; - case NdbDictionary::Column::Varchar: - memcpy(m_varchar, addr, 2 + col.m_length); + case NdbDictionary::Column::Char: + memcpy(m_char, addr, col.m_bytelength); break; default: assert(false); @@ -1219,8 +1467,8 @@ Val::dataaddr() const switch (col.m_type) { case NdbDictionary::Column::Unsigned: return &m_uint32; - case NdbDictionary::Column::Varchar: - return m_varchar; + case NdbDictionary::Column::Char: + return m_char; default: break; } @@ -1236,11 +1484,11 @@ Val::setval(Par par) const const char* addr = (const char*)dataaddr(); if (m_null) addr = 0; + LL5("setval [" << m_col << "] " << *this); if (col.m_pk) CHK(con.equal(col.m_num, addr) == 0); else CHK(con.setValue(col.m_num, addr) == 0); - LL5("setval [" << m_col << "] " << *this); return 0; } @@ -1248,43 +1496,93 @@ void Val::calc(Par par, unsigned i) { const Col& col = m_col; + col.m_pk ? calckey(par, i) : calcnokey(par); + if (! m_null) + col.verify(dataaddr()); +} + +void +Val::calckey(Par par, unsigned i) +{ + const Col& col = m_col; m_null = false; - if (col.m_pk) { + switch (col.m_type) { + case NdbDictionary::Column::Unsigned: m_uint32 = i; - return; + break; + case NdbDictionary::Column::Char: + { + const Chs* chs = col.m_chs; + CHARSET_INFO* cs = chs->m_cs; + unsigned n = 0; + // our random chars may not fill value exactly + while (n + cs->mbmaxlen <= col.m_bytelength) { + if (i % (1 + n) == 0) { + break; + } + const Chr& chr = chs->m_chr[i % maxcharcount]; + memcpy(&m_char[n], chr.m_bytes, chr.m_size); + n += chr.m_size; + } + // this will extend by appropriate space + (*cs->cset->fill)(cs, (char*)&m_char[n], col.m_bytelength - n, 0x20); + } + break; + default: + assert(false); + break; } +} + +void +Val::calcnokey(Par par) +{ + const Col& col = m_col; + m_null = false; if (col.m_nullable && urandom(100) < par.m_pctnull) { m_null = true; return; } - unsigned v = par.m_range + irandom((par.m_pctrange * par.m_range) / 100); + int r = irandom((par.m_pctrange * par.m_range) / 100); + if (par.m_bdir != 0 && urandom(10) != 0) { + if (r < 0 && par.m_bdir > 0 || r > 0 && par.m_bdir < 0) + r = -r; + } + unsigned v = par.m_range + r; switch (col.m_type) { case NdbDictionary::Column::Unsigned: m_uint32 = v; break; - case NdbDictionary::Column::Varchar: + case NdbDictionary::Column::Char: { + const Chs* chs = col.m_chs; + CHARSET_INFO* cs = chs->m_cs; unsigned n = 0; - while (n < col.m_length) { - if (urandom(1 + col.m_length) == 0) { - // nice distribution on lengths + // our random chars may not fill value exactly + while (n + cs->mbmaxlen <= col.m_bytelength) { + if (urandom(1 + col.m_bytelength) == 0) { break; } - m_varchar[2 + n++] = 'a' + urandom((par.m_pctrange * 10) / 100); - } - m_varchar[0] = (n >> 8); - m_varchar[1] = (n & 0xff); - while (n < col.m_length) { - m_varchar[2 + n++] = 0; + unsigned half = maxcharcount / 2; + int r = irandom((par.m_pctrange * half) / 100); + if (par.m_bdir != 0 && urandom(10) != 0) { + if (r < 0 && par.m_bdir > 0 || r > 0 && par.m_bdir < 0) + r = -r; + } + unsigned i = half + r; + assert(i < maxcharcount); + const Chr& chr = chs->m_chr[i]; + memcpy(&m_char[n], chr.m_bytes, chr.m_size); + n += chr.m_size; } + // this will extend by appropriate space + (*cs->cset->fill)(cs, (char*)&m_char[n], col.m_bytelength - n, 0x20); } break; default: assert(false); break; } - // verify format - col.verify(dataaddr()); } int @@ -1299,7 +1597,7 @@ Val::cmp(const Val& val2) const { const Col& col = m_col; const Col& col2 = val2.m_col; - assert(col.m_type == col2.m_type && col.m_length == col2.m_length); + assert(col.equal(col2)); if (m_null || val2.m_null) { if (! m_null) return +1; @@ -1313,13 +1611,37 @@ Val::cmp(const Val& val2) const // compare switch (col.m_type) { case NdbDictionary::Column::Unsigned: - if (m_uint32 < val2.m_uint32) - return -1; - if (m_uint32 > val2.m_uint32) - return +1; - return 0; - case NdbDictionary::Column::Varchar: - return memcmp(&m_varchar[2], &val2.m_varchar[2], col.m_length); + { + if (m_uint32 < val2.m_uint32) + return -1; + if (m_uint32 > val2.m_uint32) + return +1; + return 0; + } + break; + case NdbDictionary::Column::Char: + { + const Chs* chs = col.m_chs; + CHARSET_INFO* cs = chs->m_cs; + unsigned len = col.m_bytelength; + int k; + if (! g_opt.m_collsp) { + unsigned char x1[maxxmulsize * 8000]; + unsigned char x2[maxxmulsize * 8000]; + int n1 = (*cs->coll->strnxfrm)(cs, x1, chs->m_xmul * len, m_char, len); + int n2 = (*cs->coll->strnxfrm)(cs, x2, chs->m_xmul * len, val2.m_char, len); + // currently same but do not assume it + unsigned n = (n1 > n2 ? n1 : n2); + // assume null padding + memset(x1 + n1, 0x0, n - n1); + memset(x2 + n2, 0x0, n - n2); + k = memcmp(x1, x2, n); + } else { + k = (*cs->coll->strnncollsp)(cs, m_char, len, val2.m_char, len, false); + } + return k < 0 ? -1 : k > 0 ? +1 : 0; + } + break; default: break; } @@ -1339,12 +1661,26 @@ operator<<(NdbOut& out, const Val& val) case NdbDictionary::Column::Unsigned: out << val.m_uint32; break; - case NdbDictionary::Column::Varchar: + case NdbDictionary::Column::Char: { - char buf[8000]; - unsigned n = (val.m_varchar[0] << 8) | val.m_varchar[1]; - assert(n <= col.m_length); - sprintf(buf, "'%.*s'[%d]", n, &val.m_varchar[2], n); + char buf[4 * 8000]; + char *p = buf; + *p++ = '['; + for (unsigned i = 0; i < col.m_bytelength; i++) { + unsigned char c = val.m_char[i]; + if (c == '\\') { + *p++ = '\\'; + *p++ = '\\'; + } else if (0x20 <= c && c < 0x7e) { + *p++ = c; + } else { + *p++ = '\\'; + *p++ = hexstr[c >> 4]; + *p++ = hexstr[c & 15]; + } + } + *p++ = ']'; + *p = 0; out << buf; } break; @@ -1384,7 +1720,7 @@ Row::Row(const Tab& tab) : { m_val = new Val* [tab.m_cols]; for (unsigned k = 0; k < tab.m_cols; k++) { - const Col& col = tab.m_col[k]; + const Col& col = *tab.m_col[k]; m_val[k] = new Val(col); } m_exist = false; @@ -1461,6 +1797,16 @@ Row::updrow(Par par) CHKCON(con.m_op->updateTuple() == 0, con); for (unsigned k = 0; k < tab.m_cols; k++) { const Val& val = *m_val[k]; + const Col& col = val.m_col; + if (! col.m_pk) + continue; + CHK(val.setval(par) == 0); + } + for (unsigned k = 0; k < tab.m_cols; k++) { + const Val& val = *m_val[k]; + const Col& col = val.m_col; + if (col.m_pk) + continue; CHK(val.setval(par) == 0); } m_pending = UpdOp; @@ -1668,14 +2014,33 @@ Set::pending(unsigned i) const } void +Set::notpending(unsigned i) +{ + assert(m_row[i] != 0); + Row& row = *m_row[i]; + if (row.m_pending == Row::InsOp) + row.m_exist = true; + if (row.m_pending == Row::DelOp) + row.m_exist = false; + row.m_pending = Row::NoOp; +} + +void +Set::notpending(const Lst& lst) +{ + for (unsigned j = 0; j < lst.m_cnt; j++) { + unsigned i = lst.m_arr[j]; + notpending(i); + } +} + +void Set::calc(Par par, unsigned i) { const Tab& tab = m_tab; if (m_row[i] == 0) m_row[i] = new Row(tab); Row& row = *m_row[i]; - // value generation parameters - par.m_pctrange = 40; row.calc(par, i); } @@ -1739,9 +2104,11 @@ Set::getval(Par par) int Set::getkey(Par par, unsigned* i) { - assert(m_rec[0] != 0); - const char* aRef0 = m_rec[0]->aRef(); - Uint32 key = *(const Uint32*)aRef0; + const Tab& tab = m_tab; + unsigned k = tab.m_keycol; + assert(m_rec[k] != 0); + const char* aRef = m_rec[k]->aRef(); + Uint32 key = *(const Uint32*)aRef; CHK(key < m_rows); *i = key; return 0; @@ -1772,32 +2139,12 @@ Set::putval(unsigned i, bool force) return 0; } -void -Set::notpending(unsigned i) -{ - assert(m_row[i] != 0); - Row& row = *m_row[i]; - if (row.m_pending == Row::InsOp) - row.m_exist = true; - if (row.m_pending == Row::DelOp) - row.m_exist = false; - row.m_pending = Row::NoOp; -} - -void -Set::notpending(const Lst& lst) -{ - for (unsigned j = 0; j < lst.m_cnt; j++) { - unsigned i = lst.m_arr[j]; - notpending(i); - } -} - int Set::verify(const Set& set2) const { const Tab& tab = m_tab; assert(&tab == &set2.m_tab && m_rows == set2.m_rows); + LL3("verify set1 count=" << count() << " vs set2 count=" << set2.count()); for (unsigned i = 0; i < m_rows; i++) { CHK(exist(i) == set2.exist(i)); if (! exist(i)) @@ -1884,10 +2231,10 @@ operator<<(NdbOut& out, const BVal& bval) const ICol& icol = bval.m_icol; const Col& col = icol.m_col; const Val& val = bval; - out << "type " << bval.m_type; - out << " icol " << icol.m_num; - out << " col " << col.m_name << "(" << col.m_num << ")"; - out << " value " << val; + out << "type=" << bval.m_type; + out << " icol=" << icol.m_num; + out << " col=" << col.m_num << "," << col.m_name; + out << " value=" << val; return out; } @@ -1939,12 +2286,15 @@ void BSet::calc(Par par) { const ITab& itab = m_itab; + par.m_pctrange = par.m_pctbrange; reset(); for (unsigned k = 0; k < itab.m_icols; k++) { - const ICol& icol = itab.m_icol[k]; + const ICol& icol = *itab.m_icol[k]; const Col& col = icol.m_col; for (unsigned i = 0; i <= 1; i++) { - if (urandom(10) == 0) + if (m_bvals == 0 && urandom(100) == 0) + return; + if (m_bvals != 0 && urandom(3) == 0) return; assert(m_bvals < m_alloc); BVal& bval = *new BVal(icol); @@ -1963,12 +2313,14 @@ BSet::calc(Par par) bval.m_type = 4; if (k + 1 < itab.m_icols) bval.m_type = 4; - // value generation parammeters if (! g_compare_null) par.m_pctnull = 0; - par.m_pctrange = 50; // bit higher + if (bval.m_type == 0 || bval.m_type == 1) + par.m_bdir = -1; + if (bval.m_type == 2 || bval.m_type == 3) + par.m_bdir = +1; do { - bval.calc(par, 0); + bval.calcnokey(par); if (i == 1) { assert(m_bvals >= 2); const BVal& bv1 = *m_bval[m_bvals - 2]; @@ -1990,7 +2342,7 @@ BSet::calcpk(Par par, unsigned i) const ITab& itab = m_itab; reset(); for (unsigned k = 0; k < itab.m_icols; k++) { - const ICol& icol = itab.m_icol[k]; + const ICol& icol = *itab.m_icol[k]; const Col& col = icol.m_col; assert(col.m_pk); assert(m_bvals < m_alloc); @@ -2037,7 +2389,7 @@ BSet::filter(const Set& set, Set& set2) const if (! g_store_null_key) { bool ok1 = false; for (unsigned k = 0; k < itab.m_icols; k++) { - const ICol& icol = itab.m_icol[k]; + const ICol& icol = *itab.m_icol[k]; const Col& col = icol.m_col; const Val& val = *row.m_val[col.m_num]; if (! val.m_null) { @@ -2055,6 +2407,7 @@ BSet::filter(const Set& set, Set& set2) const const Col& col = icol.m_col; const Val& val = *row.m_val[col.m_num]; int ret = bval.cmp(val); + LL5("cmp: ret=" << ret << " " << bval << " vs " << val); if (bval.m_type == 0) ok2 = (ret <= 0); else if (bval.m_type == 1) @@ -2087,9 +2440,8 @@ operator<<(NdbOut& out, const BSet& bset) { out << "bounds=" << bset.m_bvals; for (unsigned j = 0; j < bset.m_bvals; j++) { - out << endl; const BVal& bval = *bset.m_bval[j]; - out << "bound " << j << ": " << bval; + out << " [bound " << j << ": " << bval << "]"; } return out; } @@ -2318,26 +2670,36 @@ scanreadtable(Par par) const Set& set = par.set(); // expected const Set& set1 = set; - LL3((par.m_verify ? "scanverify " : "scanread ") << tab.m_name); + LL3("scanread " << tab.m_name << " verify=" << par.m_verify); + LL4("expect " << set.count() << " rows"); Set set2(tab, set.m_rows); CHK(con.startTransaction() == 0); CHK(con.getNdbScanOperation(tab) == 0); CHK(con.openScanRead(par.m_scanbat, par.m_scanpar) == 0); set2.getval(par); CHK(con.executeScan() == 0); + unsigned n = 0; + bool deadlock = false; while (1) { int ret; - CHK((ret = con.nextScanResult(true)) == 0 || ret == 1); + deadlock = par.m_deadlock; + CHK((ret = con.nextScanResult(true, deadlock)) == 0 || ret == 1); if (ret == 1) break; + if (deadlock) { + LL1("scanreadtable: stop on deadlock"); + break; + } unsigned i = (unsigned)-1; CHK(set2.getkey(par, &i) == 0); CHK(set2.putval(i, false) == 0); - LL4("row " << set2.count() << ": " << *set2.m_row[i]); + LL4("row " << n << ": " << *set2.m_row[i]); + n++; } con.closeTransaction(); if (par.m_verify) CHK(set1.verify(set2) == 0); + LL3("scanread " << tab.m_name << " rows=" << n); return 0; } @@ -2369,16 +2731,30 @@ scanreadtablefast(Par par, unsigned countcheck) } static int -scanreadindex(Par par, const ITab& itab, const BSet& bset) +scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc) { Con& con = par.con(); const Tab& tab = par.tab(); const Set& set = par.set(); - // expected - Set set1(tab, set.m_rows); - bset.filter(set, set1); - LL3((par.m_verify ? "scanverify " : "scanread ") << itab.m_name << " bounds=" << bset.m_bvals); LL4(bset); + Set set1(tab, set.m_rows); + if (calc) { + while (true) { + bset.calc(par); + bset.filter(set, set1); + unsigned n = set1.count(); + // prefer proper subset + if (0 < n && n < set.m_rows) + break; + if (urandom(5) == 0) + break; + set1.reset(); + } + } else { + bset.filter(set, set1); + } + LL3("scanread " << itab.m_name << " bounds=" << bset.m_bvals << " verify=" << par.m_verify); + LL4("expect " << set1.count() << " rows"); Set set2(tab, set.m_rows); CHK(con.startTransaction() == 0); CHK(con.getNdbScanOperation(itab, tab) == 0); @@ -2386,20 +2762,29 @@ scanreadindex(Par par, const ITab& itab, const BSet& bset) CHK(bset.setbnd(par) == 0); set2.getval(par); CHK(con.executeScan() == 0); + unsigned n = 0; + bool deadlock = false; while (1) { int ret; - CHK((ret = con.nextScanResult(true)) == 0 || ret == 1); + deadlock = par.m_deadlock; + CHK((ret = con.nextScanResult(true, deadlock)) == 0 || ret == 1); if (ret == 1) break; + if (deadlock) { + LL1("scanreadindex: stop on deadlock"); + break; + } unsigned i = (unsigned)-1; CHK(set2.getkey(par, &i) == 0); LL4("key " << i); CHK(set2.putval(i, par.m_dups) == 0); - LL4("row " << set2.count() << ": " << *set2.m_row[i]); + LL4("row " << n << ": " << *set2.m_row[i]); + n++; } con.closeTransaction(); if (par.m_verify) CHK(set1.verify(set2) == 0); + LL3("scanread " << itab.m_name << " rows=" << n); return 0; } @@ -2438,8 +2823,7 @@ scanreadindex(Par par, const ITab& itab) const Tab& tab = par.tab(); for (unsigned i = 0; i < par.m_subsubloop; i++) { BSet bset(tab, itab, par.m_rows); - bset.calc(par); - CHK(scanreadindex(par, itab, bset) == 0); + CHK(scanreadindex(par, itab, bset, true) == 0); } return 0; } @@ -2449,9 +2833,9 @@ scanreadindex(Par par) { const Tab& tab = par.tab(); for (unsigned i = 0; i < tab.m_itabs; i++) { - if (! useindex(i)) + if (tab.m_itab[i] == 0) continue; - const ITab& itab = tab.m_itab[i]; + const ITab& itab = *tab.m_itab[i]; CHK(scanreadindex(par, itab) == 0); } return 0; @@ -2481,7 +2865,7 @@ static int timescanpkindex(Par par) { const Tab& tab = par.tab(); - const ITab& itab = tab.m_itab[0]; // 1st index is on PK + const ITab& itab = *tab.m_itab[0]; // 1st index is on PK BSet bset(tab, itab, par.m_rows); par.tmr().on(); CHK(scanreadindexfast(par, itab, bset, par.m_totrows) == 0); @@ -2505,7 +2889,7 @@ static int timepkreadindex(Par par) { const Tab& tab = par.tab(); - const ITab& itab = tab.m_itab[0]; // 1st index is on PK + const ITab& itab = *tab.m_itab[0]; // 1st index is on PK BSet bset(tab, itab, par.m_rows); unsigned count = par.m_samples; if (count == 0) @@ -2575,7 +2959,12 @@ scanupdatetable(Par par) } set.unlock(); if (lst.cnt() == par.m_batch) { - CHK(con2.execute(Commit) == 0); + deadlock = par.m_deadlock; + CHK(con2.execute(Commit, deadlock) == 0); + if (deadlock) { + LL1("scanupdateindex: stop on deadlock"); + goto out; + } con2.closeTransaction(); set.lock(); set.notpending(lst); @@ -2586,7 +2975,12 @@ scanupdatetable(Par par) } CHK((ret = con.nextScanResult(false)) == 0 || ret == 1 || ret == 2); if (ret == 2 && lst.cnt() != 0) { - CHK(con2.execute(Commit) == 0); + deadlock = par.m_deadlock; + CHK(con2.execute(Commit, deadlock) == 0); + if (deadlock) { + LL1("scanupdateindex: stop on deadlock"); + goto out; + } con2.closeTransaction(); set.lock(); set.notpending(lst); @@ -2599,6 +2993,7 @@ scanupdatetable(Par par) if (ret == 1) break; } +out: con2.closeTransaction(); LL3("scan update " << tab.m_name << " rows updated=" << count); con.closeTransaction(); @@ -2659,7 +3054,12 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset) } set.unlock(); if (lst.cnt() == par.m_batch) { - CHK(con2.execute(Commit) == 0); + deadlock = par.m_deadlock; + CHK(con2.execute(Commit, deadlock) == 0); + if (deadlock) { + LL1("scanupdateindex: stop on deadlock"); + goto out; + } con2.closeTransaction(); set.lock(); set.notpending(lst); @@ -2670,7 +3070,12 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset) } CHK((ret = con.nextScanResult(false)) == 0 || ret == 1 || ret == 2); if (ret == 2 && lst.cnt() != 0) { - CHK(con2.execute(Commit) == 0); + deadlock = par.m_deadlock; + CHK(con2.execute(Commit, deadlock) == 0); + if (deadlock) { + LL1("scanupdateindex: stop on deadlock"); + goto out; + } con2.closeTransaction(); set.lock(); set.notpending(lst); @@ -2681,6 +3086,7 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset) } } while (ret == 0); } +out: con2.closeTransaction(); LL3("scan update " << itab.m_name << " rows updated=" << count); con.closeTransaction(); @@ -2704,9 +3110,9 @@ scanupdateindex(Par par) { const Tab& tab = par.tab(); for (unsigned i = 0; i < tab.m_itabs; i++) { - if (! useindex(i)) + if (tab.m_itab[i] == 0) continue; - const ITab& itab = tab.m_itab[i]; + const ITab& itab = *tab.m_itab[i]; CHK(scanupdateindex(par, itab) == 0); } return 0; @@ -2743,17 +3149,25 @@ readverifyfull(Par par) CHK(scanreadtable(par) == 0); else { const Tab& tab = par.tab(); - unsigned i = par.m_no; - if (i <= tab.m_itabs && useindex(i)) { - const ITab& itab = tab.m_itab[i - 1]; + unsigned i = par.m_no - 1; + if (i < tab.m_itabs && tab.m_itab[i] != 0) { + const ITab& itab = *tab.m_itab[i]; BSet bset(tab, itab, par.m_rows); - CHK(scanreadindex(par, itab, bset) == 0); + CHK(scanreadindex(par, itab, bset, false) == 0); } } return 0; } static int +readverifyindex(Par par) +{ + par.m_verify = true; + CHK(scanreadindex(par) == 0); + return 0; +} + +static int pkops(Par par) { par.m_randomkey = true; @@ -2786,6 +3200,7 @@ static int pkupdatescanread(Par par) { par.m_dups = true; + par.m_deadlock = true; unsigned sel = urandom(10); if (sel < 5) { CHK(pkupdate(par) == 0); @@ -3074,6 +3489,24 @@ tbuild(Par par) } static int +tindexscan(Par par) +{ + RUNSTEP(par, droptable, ST); + RUNSTEP(par, createtable, ST); + RUNSTEP(par, invalidatetable, MT); + RUNSTEP(par, createindex, ST); + RUNSTEP(par, invalidateindex, MT); + RUNSTEP(par, pkinsert, MT); + RUNSTEP(par, readverifyfull, MT); + for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) { + LL4("subloop " << par.m_slno); + RUNSTEP(par, readverifyindex, MT); + } + return 0; +} + + +static int tpkops(Par par) { RUNSTEP(par, droptable, ST); @@ -3188,6 +3621,10 @@ ttimemaint(Par par) static int ttimescan(Par par) { + if (par.tab().m_itab[0] == 0) { + LL1("ttimescan - no index 0, skipped"); + return 0; + } Tmr t1, t2; RUNSTEP(par, droptable, ST); RUNSTEP(par, createtable, ST); @@ -3210,6 +3647,10 @@ ttimescan(Par par) static int ttimepkread(Par par) { + if (par.tab().m_itab[0] == 0) { + LL1("ttimescan - no index 0, skipped"); + return 0; + } Tmr t1, t2; RUNSTEP(par, droptable, ST); RUNSTEP(par, createtable, ST); @@ -3250,10 +3691,11 @@ struct TCase { static const TCase tcaselist[] = { TCase("a", tbuild, "index build"), - TCase("b", tpkops, "pk operations"), - TCase("c", tpkopsread, "pk operations and scan reads"), - TCase("d", tmixedops, "pk operations and scan operations"), - TCase("e", tbusybuild, "pk operations and index build"), + TCase("b", tindexscan, "index scans"), + TCase("c", tpkops, "pk operations"), + TCase("d", tpkopsread, "pk operations and scan reads"), + TCase("e", tmixedops, "pk operations and scan operations"), + TCase("f", tbusybuild, "pk operations and index build"), TCase("t", ttimebuild, "time index build"), TCase("u", ttimemaint, "time index maintenance"), TCase("v", ttimescan, "time full scan table vs index on pk"), @@ -3277,12 +3719,16 @@ printcases() static void printtables() { - ndbout << "tables and indexes (X1 is on table PK):" << endl; + Par par(g_opt); + makebuiltintables(par); + ndbout << "builtin tables (index x0 is on table pk):" << endl; for (unsigned j = 0; j < tabcount; j++) { - const Tab& tab = tablist[j]; + if (tablist[j] == 0) + continue; + const Tab& tab = *tablist[j]; ndbout << " " << tab.m_name; for (unsigned i = 0; i < tab.m_itabs; i++) { - const ITab& itab = tab.m_itab[i]; + const ITab& itab = *tab.m_itab[i]; ndbout << " " << itab.m_name; } ndbout << endl; @@ -3293,15 +3739,25 @@ static int runtest(Par par) { LL1("start"); - if (par.m_seed != 0) + if (par.m_seed == -1) { + // good enough for daily run + unsigned short seed = (getpid() ^ time(0)); + LL1("random seed: " << seed); + srandom((unsigned)seed); + } else if (par.m_seed != 0) srandom(par.m_seed); + // cs assert(par.m_csname != 0); - CHARSET_INFO* cs; - CHK((cs = get_charset_by_name(par.m_csname, MYF(0))) != 0 || (cs = get_charset_by_csname(par.m_csname, MY_CS_PRIMARY, MYF(0))) != 0); - par.m_cs = cs; + if (strcmp(par.m_csname, "random") != 0) { + CHARSET_INFO* cs; + CHK((cs = get_charset_by_name(par.m_csname, MYF(0))) != 0 || (cs = get_charset_by_csname(par.m_csname, MY_CS_PRIMARY, MYF(0))) != 0); + par.m_cs = cs; + } + // con Con con; CHK(con.connect() == 0); par.m_con = &con; + // threads g_thrlist = new Thr* [par.m_threads]; unsigned n; for (n = 0; n < par.m_threads; n++) { @@ -3320,16 +3776,18 @@ runtest(Par par) const TCase& tcase = tcaselist[i]; if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0) continue; + makebuiltintables(par); LL1("case " << tcase.m_name << " - " << tcase.m_desc); for (unsigned j = 0; j < tabcount; j++) { - if (! usetable(j)) + if (tablist[j] == 0) continue; - const Tab& tab = tablist[j]; + const Tab& tab = *tablist[j]; par.m_tab = &tab; - delete par.m_set; par.m_set = new Set(tab, par.m_totrows); LL1("table " << tab.m_name); CHK(tcase.m_func(par) == 0); + delete par.m_set; + par.m_set = 0; } } } @@ -3353,7 +3811,7 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535) { ndb_init(); if (ndbout_mutex == NULL) - ndbout_mutex= NdbMutex_Create(); + ndbout_mutex = NdbMutex_Create(); while (++argv, --argc > 0) { const char* arg = argv[0]; if (*arg != '-') { @@ -3381,6 +3839,10 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535) continue; } } + if (strcmp(arg, "-collsp") == 0) { + g_opt.m_collsp = true; + continue; + } if (strcmp(arg, "-core") == 0) { g_opt.m_core = true; continue; @@ -3517,7 +3979,6 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535) if (runtest(par) < 0) goto failed; } - // always exit with NDBT code ok: return NDBT_ProgramExit(NDBT_OK); failed: diff --git a/ndb/tools/desc.cpp b/ndb/tools/desc.cpp index c5e9efdfa8a..2f59d5fcd0b 100644 --- a/ndb/tools/desc.cpp +++ b/ndb/tools/desc.cpp @@ -102,7 +102,7 @@ int main(int argc, char** argv){ unsigned j; for (j= 0; (int)j < pTab->getNoOfPrimaryKeys(); j++) { - const NdbDictionary::Column * col = pTab->getColumn(j); + const NdbDictionary::Column * col = pTab->getColumn(pTab->getPrimaryKey(j)); ndbout << col->getName(); if ((int)j < pTab->getNoOfPrimaryKeys()-1) ndbout << ", "; |