diff options
24 files changed, 1163 insertions, 423 deletions
diff --git a/mysql-test/r/ndb_alter_table.result b/mysql-test/r/ndb_alter_table.result index 1661fa35d13..f3b9e962873 100644 --- a/mysql-test/r/ndb_alter_table.result +++ b/mysql-test/r/ndb_alter_table.result @@ -34,13 +34,13 @@ col5 enum('PENDING', 'ACTIVE', 'DISABLED') not null, col6 int not null, to_be_deleted int) ENGINE=ndbcluster; show table status; Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment -t1 ndbcluster 9 Dynamic 0 0 0 0 0 0 1 NULL NULL NULL latin1_swedish_ci NULL +t1 ndbcluster 10 Dynamic 0 0 0 0 0 0 1 NULL NULL NULL latin1_swedish_ci NULL SET SQL_MODE=NO_AUTO_VALUE_ON_ZERO; insert into t1 values (0,4,3,5,"PENDING",1,7),(NULL,4,3,5,"PENDING",1,7),(31,4,3,5,"PENDING",1,7), (7,4,3,5,"PENDING",1,7), (NULL,4,3,5,"PENDING",1,7), (100,4,3,5,"PENDING",1,7), (99,4,3,5,"PENDING",1,7), (8,4,3,5,"PENDING",1,7), (NULL,4,3,5,"PENDING",1,7); show table status; Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment -t1 ndbcluster 9 Dynamic 9 0 0 0 0 0 101 NULL NULL NULL latin1_swedish_ci NULL +t1 ndbcluster 10 Dynamic 9 0 0 0 0 0 101 NULL NULL NULL latin1_swedish_ci NULL select * from t1 order by col1; col1 col2 col3 col4 col5 col6 to_be_deleted 0 4 3 5 PENDING 1 7 @@ -60,7 +60,7 @@ change column col2 fourth varchar(30) not null after col3, modify column col6 int not null first; show table status; Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment -t1 ndbcluster 9 Dynamic 9 0 0 0 0 0 102 NULL NULL NULL latin1_swedish_ci NULL +t1 ndbcluster 10 Dynamic 9 0 0 0 0 0 102 NULL NULL NULL latin1_swedish_ci NULL select * from t1 order by col1; col6 col1 col3 fourth col4 col4_5 col5 col7 col8 1 0 3 4 5 PENDING 0000-00-00 00:00:00 @@ -75,7 +75,7 @@ col6 col1 col3 fourth col4 col4_5 col5 col7 col8 insert into t1 values (2, NULL,4,3,5,99,"PENDING","EXTRA",'2004-01-01 00:00:00'); show table status; Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment -t1 ndbcluster 9 Dynamic 10 0 0 0 0 0 103 NULL NULL NULL latin1_swedish_ci NULL +t1 ndbcluster 10 Dynamic 10 0 0 0 0 0 103 NULL NULL NULL latin1_swedish_ci NULL select * from t1 order by col1; col6 col1 col3 fourth col4 col4_5 col5 col7 col8 1 0 3 4 5 PENDING 0000-00-00 00:00:00 diff --git a/mysql-test/r/ndb_bitfield.result b/mysql-test/r/ndb_bitfield.result index 1532697c428..66ec593e195 100644 --- a/mysql-test/r/ndb_bitfield.result +++ b/mysql-test/r/ndb_bitfield.result @@ -143,7 +143,7 @@ create table t1 ( pk1 bit(9) not null primary key, b int ) engine=ndbcluster; -ERROR HY000: Can't create table './test/t1.frm' (errno: 743) +ERROR HY000: Can't create table './test/t1.frm' (errno: 739) create table t1 ( pk1 int not null primary key, b bit(9), diff --git a/mysql-test/r/ndb_charset.result b/mysql-test/r/ndb_charset.result index 00bc36a7c0d..752a4fba630 100644 --- a/mysql-test/r/ndb_charset.result +++ b/mysql-test/r/ndb_charset.result @@ -47,6 +47,40 @@ a aAa drop table t1; create table t1 ( +a varchar(20) character set latin1 collate latin1_swedish_ci primary key +) engine=ndb; +insert into t1 values ('A'),('b '),('C '),('d '),('E'),('f'); +insert into t1 values('b'); +ERROR 23000: Duplicate entry 'b' for key 1 +insert into t1 values('a '); +ERROR 23000: Duplicate entry 'a ' for key 1 +select a,length(a) from t1 order by a; +a length(a) +A 1 +b 2 +C 3 +d 7 +E 1 +f 1 +select a,length(a) from t1 order by a desc; +a length(a) +f 1 +E 1 +d 7 +C 3 +b 2 +A 1 +select * from t1 where a = 'a'; +a +A +select * from t1 where a = 'a '; +a +A +select * from t1 where a = 'd'; +a +d +drop table t1; +create table t1 ( p int primary key, a char(3) character set latin1 collate latin1_bin not null, unique key(a) @@ -99,6 +133,42 @@ p a drop table t1; create table t1 ( p int primary key, +a varchar(20) character set latin1 collate latin1_swedish_ci not null, +unique key(a) +) engine=ndb; +insert into t1 values (1,'A'),(2,'b '),(3,'C '),(4,'d '),(5,'E'),(6,'f'); +insert into t1 values(99,'b'); +ERROR 23000: Duplicate entry '99' for key 1 +insert into t1 values(99,'a '); +ERROR 23000: Duplicate entry '99' for key 1 +select a,length(a) from t1 order by a; +a length(a) +A 1 +b 2 +C 3 +d 7 +E 1 +f 1 +select a,length(a) from t1 order by a desc; +a length(a) +f 1 +E 1 +d 7 +C 3 +b 2 +A 1 +select * from t1 where a = 'a'; +p a +1 A +select * from t1 where a = 'a '; +p a +1 A +select * from t1 where a = 'd'; +p a +4 d +drop table t1; +create table t1 ( +p int primary key, a char(3) character set latin1 collate latin1_bin not null, index(a) ) engine=ndb; @@ -190,7 +260,77 @@ p a 6 AAA drop table t1; create table t1 ( -a varchar(10) primary key +p int primary key, +a varchar(20) character set latin1 collate latin1_swedish_ci not null, +index(a, p) +) engine=ndb; +insert into t1 values (1,'A'),(2,'b '),(3,'C '),(4,'d '),(5,'E'),(6,'f'); +insert into t1 values (7,'a'),(8,'B '),(9,'c '),(10,'D'),(11,'e'),(12,'F '); +select p,a,length(a) from t1 order by a, p; +p a length(a) +1 A 1 +7 a 1 +2 b 2 +8 B 2 +3 C 3 +9 c 3 +4 d 7 +10 D 1 +5 E 1 +11 e 1 +6 f 1 +12 F 3 +select * from t1 where a = 'a ' order by a desc, p desc; +p a +7 a +1 A +select * from t1 where a >= 'D' order by a, p; +p a +4 d +10 D +5 E +11 e +6 f +12 F +select * from t1 where a < 'D' order by a, p; +p a +1 A +7 a +2 b +8 B +3 C +9 c +select count(*) from t1 x, t1 y, t1 z where x.a = y.a and y.a = z.a; +count(*) +48 +drop table t1; +create table t1 ( +a char(5) character set ucs2, +b varchar(7) character set utf8, +primary key(a, b) +) engine=ndb; +insert into t1 values +('a','A '),('B ','b'),('c','C '),('D','d'),('e ','E'),('F','f '), +('A','b '),('b ','C'),('C','d '),('d','E'),('E ','f'), +('a','C '),('B ','d'),('c','E '),('D','f'); +insert into t1 values('d','f'); +ERROR 23000: Duplicate entry '' for key 1 +select a,b,length(a),length(b) from t1 order by a,b limit 3; +a b length(a) length(b) +a A 2 2 +A b 2 2 +a C 2 2 +select a,b,length(a),length(b) from t1 order by a desc, b desc limit 3; +a b length(a) length(b) +F f 2 3 +E f 2 1 +e E 2 1 +select a,b,length(a),length(b) from t1 where a='c' and b='c'; +a b length(a) length(b) +c C 2 5 +drop table t1; +create table t1 ( +a char(10) primary key ) engine=ndb; insert into t1 values ('jonas % '); replace into t1 values ('jonas % '); diff --git a/mysql-test/t/ndb_charset.test b/mysql-test/t/ndb_charset.test index 1b9e7e8bfcc..ab2bbcc3ad8 100644 --- a/mysql-test/t/ndb_charset.test +++ b/mysql-test/t/ndb_charset.test @@ -53,6 +53,25 @@ select * from t1 where a = 'AaA'; select * from t1 where a = 'AAA'; drop table t1; +# pk - varchar + +create table t1 ( + a varchar(20) character set latin1 collate latin1_swedish_ci primary key +) engine=ndb; +# +insert into t1 values ('A'),('b '),('C '),('d '),('E'),('f'); +-- error 1062 +insert into t1 values('b'); +-- error 1062 +insert into t1 values('a '); +# +select a,length(a) from t1 order by a; +select a,length(a) from t1 order by a desc; +select * from t1 where a = 'a'; +select * from t1 where a = 'a '; +select * from t1 where a = 'd'; +drop table t1; + # unique hash index - binary create table t1 ( @@ -102,6 +121,27 @@ select * from t1 where a = 'AaA'; select * from t1 where a = 'AAA'; drop table t1; +# unique hash index - varchar + +create table t1 ( + p int primary key, + a varchar(20) character set latin1 collate latin1_swedish_ci not null, + unique key(a) +) engine=ndb; +# +insert into t1 values (1,'A'),(2,'b '),(3,'C '),(4,'d '),(5,'E'),(6,'f'); +-- error 1062 +insert into t1 values(99,'b'); +-- error 1062 +insert into t1 values(99,'a '); +# +select a,length(a) from t1 order by a; +select a,length(a) from t1 order by a desc; +select * from t1 where a = 'a'; +select * from t1 where a = 'a '; +select * from t1 where a = 'd'; +drop table t1; + # ordered index - binary create table t1 ( @@ -158,9 +198,47 @@ select * from t1 where a = 'AaA' order by p; select * from t1 where a = 'AAA' order by p; drop table t1; +# ordered index - varchar + +create table t1 ( + p int primary key, + a varchar(20) character set latin1 collate latin1_swedish_ci not null, + index(a, p) +) engine=ndb; +# +insert into t1 values (1,'A'),(2,'b '),(3,'C '),(4,'d '),(5,'E'),(6,'f'); +insert into t1 values (7,'a'),(8,'B '),(9,'c '),(10,'D'),(11,'e'),(12,'F '); +select p,a,length(a) from t1 order by a, p; +select * from t1 where a = 'a ' order by a desc, p desc; +select * from t1 where a >= 'D' order by a, p; +select * from t1 where a < 'D' order by a, p; +# +select count(*) from t1 x, t1 y, t1 z where x.a = y.a and y.a = z.a; +drop table t1; + +# minimal multi-byte test + +create table t1 ( + a char(5) character set ucs2, + b varchar(7) character set utf8, + primary key(a, b) +) engine=ndb; +# +insert into t1 values + ('a','A '),('B ','b'),('c','C '),('D','d'),('e ','E'),('F','f '), + ('A','b '),('b ','C'),('C','d '),('d','E'),('E ','f'), + ('a','C '),('B ','d'),('c','E '),('D','f'); +-- error 1062 +insert into t1 values('d','f'); +# +select a,b,length(a),length(b) from t1 order by a,b limit 3; +select a,b,length(a),length(b) from t1 order by a desc, b desc limit 3; +select a,b,length(a),length(b) from t1 where a='c' and b='c'; +drop table t1; + # bug create table t1 ( - a varchar(10) primary key + a char(10) primary key ) engine=ndb; insert into t1 values ('jonas % '); replace into t1 values ('jonas % '); diff --git a/ndb/include/kernel/signaldata/DictTabInfo.hpp b/ndb/include/kernel/signaldata/DictTabInfo.hpp index 616da05b3ae..cc8a647615c 100644 --- a/ndb/include/kernel/signaldata/DictTabInfo.hpp +++ b/ndb/include/kernel/signaldata/DictTabInfo.hpp @@ -269,7 +269,9 @@ public: ExtTimespec = NdbSqlUtil::Type::Timespec, ExtBlob = NdbSqlUtil::Type::Blob, ExtText = NdbSqlUtil::Type::Text, - ExtBit = NdbSqlUtil::Type::Bit + ExtBit = NdbSqlUtil::Type::Bit, + ExtLongvarchar = NdbSqlUtil::Type::Longvarchar, + ExtLongvarbinary = NdbSqlUtil::Type::Longvarbinary }; // Attribute data interpretation @@ -297,98 +299,91 @@ public: return ((1 << AttributeSize) * AttributeArraySize + 31) >> 5; } - // translate to old kernel types and sizes + // compute old-sty|e attribute size and array size inline bool translateExtType() { - AttributeType = ~0; // deprecated switch (AttributeExtType) { case DictTabInfo::ExtUndefined: - break; + return false; case DictTabInfo::ExtTinyint: - AttributeSize = DictTabInfo::an8Bit; - AttributeArraySize = AttributeExtLength; - return true; case DictTabInfo::ExtTinyunsigned: AttributeSize = DictTabInfo::an8Bit; AttributeArraySize = AttributeExtLength; - return true; + break; case DictTabInfo::ExtSmallint: - AttributeSize = DictTabInfo::a16Bit; - AttributeArraySize = AttributeExtLength; - return true; case DictTabInfo::ExtSmallunsigned: AttributeSize = DictTabInfo::a16Bit; AttributeArraySize = AttributeExtLength; - return true; + break; case DictTabInfo::ExtMediumint: - AttributeSize = DictTabInfo::an8Bit; - AttributeArraySize = 3 * AttributeExtLength; - return true; case DictTabInfo::ExtMediumunsigned: AttributeSize = DictTabInfo::an8Bit; AttributeArraySize = 3 * AttributeExtLength; - return true; + break; case DictTabInfo::ExtInt: - AttributeSize = DictTabInfo::a32Bit; - AttributeArraySize = AttributeExtLength; - return true; case DictTabInfo::ExtUnsigned: AttributeSize = DictTabInfo::a32Bit; AttributeArraySize = AttributeExtLength; - return true; + break; case DictTabInfo::ExtBigint: - AttributeSize = DictTabInfo::a64Bit; - AttributeArraySize = AttributeExtLength; - return true; case DictTabInfo::ExtBigunsigned: AttributeSize = DictTabInfo::a64Bit; AttributeArraySize = AttributeExtLength; - return true; + break; case DictTabInfo::ExtFloat: AttributeSize = DictTabInfo::a32Bit; AttributeArraySize = AttributeExtLength; - return true; + break; case DictTabInfo::ExtDouble: AttributeSize = DictTabInfo::a64Bit; AttributeArraySize = AttributeExtLength; - return true; + break; case DictTabInfo::ExtDecimal: // not yet implemented anywhere - break; + return false; case DictTabInfo::ExtChar: case DictTabInfo::ExtBinary: AttributeSize = DictTabInfo::an8Bit; AttributeArraySize = AttributeExtLength; - return true; + break; case DictTabInfo::ExtVarchar: case DictTabInfo::ExtVarbinary: - // to fix + if (AttributeExtLength > 0xff) + return false; AttributeSize = DictTabInfo::an8Bit; - AttributeArraySize = AttributeExtLength + 2; - return true; + AttributeArraySize = AttributeExtLength + 1; + break; case DictTabInfo::ExtDatetime: // to fix AttributeSize = DictTabInfo::an8Bit; AttributeArraySize = 8 * AttributeExtLength; - return true; + break; case DictTabInfo::ExtTimespec: // to fix AttributeSize = DictTabInfo::an8Bit; AttributeArraySize = 12 * AttributeExtLength; - return true; + break; case DictTabInfo::ExtBlob: case DictTabInfo::ExtText: AttributeSize = DictTabInfo::an8Bit; - // head + inline part [ attr precision lower half ] + // head + inline part (length in precision lower half) AttributeArraySize = (NDB_BLOB_HEAD_SIZE << 2) + (AttributeExtPrecision & 0xFFFF); - return true; + break; case DictTabInfo::ExtBit: AttributeSize = DictTabInfo::aBit; AttributeArraySize = AttributeExtLength; - return true; + break; + case DictTabInfo::ExtLongvarchar: + case DictTabInfo::ExtLongvarbinary: + if (AttributeExtLength > 0xffff) + return false; + AttributeSize = DictTabInfo::an8Bit; + AttributeArraySize = AttributeExtLength + 2; + break; + default: + return false; }; - - return false; + return true; } inline void print(FILE *out) { diff --git a/ndb/include/ndb_constants.h b/ndb/include/ndb_constants.h index 04d86e267f7..40a3d963955 100644 --- a/ndb/include/ndb_constants.h +++ b/ndb/include/ndb_constants.h @@ -21,7 +21,7 @@ * Changing the values makes database upgrade impossible. * * New or removed definitions must be replicated to - * NdbDictionary.hpp and NdbSqlUtil.cpp. + * NdbDictionary.hpp and NdbSqlUtil.hpp. * * Not for use by application programs. * Use the enums provided by NdbDictionary instead. @@ -58,7 +58,9 @@ #define NDB_TYPE_BLOB 20 #define NDB_TYPE_TEXT 21 #define NDB_TYPE_BIT 22 +#define NDB_TYPE_LONG_VARCHAR 23 +#define NDB_TYPE_LONG_VARBINARY 24 -#define NDB_TYPE_MAX 23 +#define NDB_TYPE_MAX 25 #endif diff --git a/ndb/include/ndbapi/NdbDictionary.hpp b/ndb/include/ndbapi/NdbDictionary.hpp index 700591d6cdf..3f6368a5d60 100644 --- a/ndb/include/ndbapi/NdbDictionary.hpp +++ b/ndb/include/ndbapi/NdbDictionary.hpp @@ -149,14 +149,20 @@ public: /** * @class Column - * @brief Represents an column in an NDB Cluster table + * @brief Represents a column in an NDB Cluster table * - * Each column has a type. The type of a column is determind by a number + * Each column has a type. The type of a column is determined by a number * of type specifiers. * The type specifiers are: * - Builtin type * - Array length or max length - * - Precision and scale + * - Precision and scale (not used yet) + * - Character set for string types + * - Inline and part sizes for blobs + * + * Types in general correspond to MySQL types and their variants. + * Data formats are same as in MySQL. NDB API provides no support for + * constructing such formats. NDB kernel checks them however. */ class Column { public: @@ -179,14 +185,16 @@ public: Double = NDB_TYPE_DOUBLE, ///< 64-bit float. 8 byte float, can be used in array Decimal = NDB_TYPE_DECIMAL, ///< Precision, Scale are applicable Char = NDB_TYPE_CHAR, ///< Len. A fixed array of 1-byte chars - Varchar = NDB_TYPE_VARCHAR, ///< Max len + Varchar = NDB_TYPE_VARCHAR, ///< Length bytes: 1, Max: 255 Binary = NDB_TYPE_BINARY, ///< Len - Varbinary = NDB_TYPE_VARBINARY, ///< Max len + Varbinary = NDB_TYPE_VARBINARY, ///< Length bytes: 1, Max: 255 Datetime = NDB_TYPE_DATETIME, ///< Precision down to 1 sec (sizeof(Datetime) == 8 bytes ) Timespec = NDB_TYPE_TIMESPEC, ///< Precision down to 1 nsec(sizeof(Datetime) == 12 bytes ) Blob = NDB_TYPE_BLOB, ///< Binary large object (see NdbBlob) - Text = NDB_TYPE_TEXT, ///< Text blob, - Bit = NDB_TYPE_BIT ///< Bit, length specifies no of bits + Text = NDB_TYPE_TEXT, ///< Text blob + Bit = NDB_TYPE_BIT, ///< Bit, length specifies no of bits + Longvarchar = NDB_TYPE_LONG_VARCHAR, ///< Length bytes: 2, little-endian + Longvarbinary = NDB_TYPE_LONG_VARBINARY ///< Length bytes: 2, little-endian }; /** diff --git a/ndb/include/util/NdbSqlUtil.hpp b/ndb/include/util/NdbSqlUtil.hpp index 8ea3e9c8124..feb2b97c54b 100644 --- a/ndb/include/util/NdbSqlUtil.hpp +++ b/ndb/include/util/NdbSqlUtil.hpp @@ -20,6 +20,9 @@ #include <ndb_global.h> #include <kernel/ndb_limits.h> +struct charset_info_st; +typedef struct charset_info_st CHARSET_INFO; + class NdbSqlUtil { public: /** @@ -86,9 +89,11 @@ public: Timespec = NDB_TYPE_TIMESPEC, Blob = NDB_TYPE_BLOB, Text = NDB_TYPE_TEXT, - Bit = NDB_TYPE_BIT + Bit = NDB_TYPE_BIT, + Longvarchar = NDB_TYPE_LONG_VARCHAR, + Longvarbinary = NDB_TYPE_LONG_VARBINARY }; - Enum m_typeId; + Enum m_typeId; // redundant Cmp* m_cmp; // comparison method }; @@ -98,16 +103,29 @@ public: static const Type& getType(Uint32 typeId); /** - * Get type by id but replace char type by corresponding binary type. + * Get the normalized type used in hashing and key comparisons. + * Maps all string types to Binary. */ static const Type& getTypeBinary(Uint32 typeId); /** * Check character set. */ - static bool usable_in_pk(Uint32 typeId, const void* cs); - static bool usable_in_hash_index(Uint32 typeId, const void* cs); - static bool usable_in_ordered_index(Uint32 typeId, const void* cs); + static bool usable_in_pk(Uint32 typeId, const void* info); + static bool usable_in_hash_index(Uint32 typeId, const void* info); + static bool usable_in_ordered_index(Uint32 typeId, const void* info); + + /** + * Get number of length bytes and length from variable length string. + * Returns false on error (invalid data). For other types returns + * zero length bytes and the fixed attribute length. + */ + static bool get_var_length(Uint32 typeId, const void* p, unsigned attrlen, Uint32& lb, Uint32& len); + + /** + * Temporary workaround for bug#7284. + */ + static int strnxfrm_bug7284(CHARSET_INFO* cs, unsigned char* dst, unsigned dstLen, const unsigned char*src, unsigned srcLen); private: /** @@ -138,6 +156,9 @@ private: static Cmp cmpTimespec; static Cmp cmpBlob; static Cmp cmpText; + static Cmp cmpBit; + static Cmp cmpLongvarchar; + static Cmp cmpLongvarbinary; }; #endif diff --git a/ndb/src/common/util/NdbSqlUtil.cpp b/ndb/src/common/util/NdbSqlUtil.cpp index 9ed791d2803..fd23781605c 100644 --- a/ndb/src/common/util/NdbSqlUtil.cpp +++ b/ndb/src/common/util/NdbSqlUtil.cpp @@ -71,7 +71,7 @@ NdbSqlUtil::char_like(const char* s1, unsigned n1, } /* - * Data types. + * Data types. The entries must be in the numerical order. */ const NdbSqlUtil::Type @@ -138,7 +138,7 @@ NdbSqlUtil::m_typeList[] = { }, { Type::Varchar, - NULL // cmpVarchar + cmpVarchar }, { Type::Binary, @@ -146,7 +146,7 @@ NdbSqlUtil::m_typeList[] = { }, { Type::Varbinary, - NULL // cmpVarbinary + cmpVarbinary }, { Type::Datetime, @@ -163,6 +163,18 @@ NdbSqlUtil::m_typeList[] = { { Type::Text, NULL // cmpText + }, + { + Type::Bit, + NULL // cmpBit + }, + { + Type::Longvarchar, + cmpLongvarchar + }, + { + Type::Longvarbinary, + cmpLongvarbinary } }; @@ -181,10 +193,12 @@ NdbSqlUtil::getTypeBinary(Uint32 typeId) { switch (typeId) { case Type::Char: - typeId = Type::Binary; - break; case Type::Varchar: - typeId = Type::Varbinary; + case Type::Binary: + case Type::Varbinary: + case Type::Longvarchar: + case Type::Longvarbinary: + typeId = Type::Binary; break; case Type::Text: typeId = Type::Blob; @@ -425,11 +439,27 @@ NdbSqlUtil::cmpChar(const void* info, const void* p1, unsigned n1, const void* p return k < 0 ? -1 : k > 0 ? +1 : 0; } -// waiting for MySQL and new NDB implementation int NdbSqlUtil::cmpVarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) { - assert(false); + const unsigned lb = 1; + // collation does not work on prefix for some charsets + assert(full && n1 >= lb && n2 >= lb); + const uchar* v1 = (const uchar*)p1; + const uchar* v2 = (const uchar*)p2; + unsigned m1 = *v1; + unsigned m2 = *v2; + if (m1 <= n1 - lb && m2 <= n2 - lb) { + CHARSET_INFO* cs = (CHARSET_INFO*)(info); + // compare with space padding + int k = (*cs->coll->strnncollsp)(cs, v1 + lb, m1, v2 + lb, m2, false); + return k < 0 ? -1 : k > 0 ? +1 : 0; + } + // treat bad data as NULL + if (m1 > n1 - lb && m2 <= n2 - lb) + return -1; + if (m1 <= n1 - lb && m2 > n2 - lb) + return +1; return 0; } @@ -442,20 +472,39 @@ NdbSqlUtil::cmpBinary(const void* info, const void* p1, unsigned n1, const void* unsigned n = (n1 <= n2 ? n1 : n2); int k = memcmp(v1, v2, n); if (k == 0) { - if (full) - k = (int)n1 - (int)n2; - else - k = (int)n - (int)n2; + k = (full ? n1 : n) - n2; } return k < 0 ? -1 : k > 0 ? +1 : full ? 0 : CmpUnknown; } -// waiting for MySQL and new NDB implementation int NdbSqlUtil::cmpVarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) { - assert(false); - return 0; + const unsigned lb = 1; + if (n2 >= lb) { + assert(n1 >= lb); + const uchar* v1 = (const uchar*)p1; + const uchar* v2 = (const uchar*)p2; + unsigned m1 = *v1; + unsigned m2 = *v2; + if (m1 <= n1 - lb && m2 <= n2 - lb) { + // compare as binary strings + unsigned m = (m1 <= m2 ? m1 : m2); + int k = memcmp(v1 + lb, v2 + lb, m); + if (k == 0) { + k = (full ? m1 : m) - m2; + } + return k < 0 ? -1 : k > 0 ? +1 : full ? 0 : CmpUnknown; + } + // treat bad data as NULL + if (m1 > n1 - lb && m2 <= n2 - lb) + return -1; + if (m1 <= n1 - lb && m2 > n2 - lb) + return +1; + return 0; + } + assert(! full); + return CmpUnknown; } // allowed but ordering is wrong before wl-1442 done @@ -489,6 +538,68 @@ NdbSqlUtil::cmpText(const void* info, const void* p1, unsigned n1, const void* p return 0; } +// not yet +int +NdbSqlUtil::cmpBit(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) +{ + assert(false); + return 0; +} + +int +NdbSqlUtil::cmpLongvarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) +{ + const unsigned lb = 2; + // collation does not work on prefix for some charsets + assert(full && n1 >= lb && n2 >= lb); + const uchar* v1 = (const uchar*)p1; + const uchar* v2 = (const uchar*)p2; + unsigned m1 = uint2korr(v1); + unsigned m2 = uint2korr(v2); + if (m1 <= n1 - lb && m2 <= n2 - lb) { + CHARSET_INFO* cs = (CHARSET_INFO*)(info); + // compare with space padding + int k = (*cs->coll->strnncollsp)(cs, v1 + lb, m1, v2 + lb, m2, false); + return k < 0 ? -1 : k > 0 ? +1 : 0; + } + // treat bad data as NULL + if (m1 > n1 - lb && m2 <= n2 - lb) + return -1; + if (m1 <= n1 - lb && m2 > n2 - lb) + return +1; + return 0; +} + +int +NdbSqlUtil::cmpLongvarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full) +{ + const unsigned lb = 2; + if (n2 >= lb) { + assert(n1 >= lb); + const uchar* v1 = (const uchar*)p1; + const uchar* v2 = (const uchar*)p2; + unsigned m1 = uint2korr(v1); + unsigned m2 = uint2korr(v2); + if (m1 <= n1 - lb && m2 <= n2 - lb) { + // compare as binary strings + unsigned m = (m1 <= m2 ? m1 : m2); + int k = memcmp(v1 + lb, v2 + lb, m); + if (k == 0) { + k = (full ? m1 : m) - m2; + } + return k < 0 ? -1 : k > 0 ? +1 : full ? 0 : CmpUnknown; + } + // treat bad data as NULL + if (m1 > n1 - lb && m2 <= n2 - lb) + return -1; + if (m1 <= n1 - lb && m2 > n2 - lb) + return +1; + return 0; + } + assert(! full); + return CmpUnknown; +} + // check charset bool @@ -508,8 +619,6 @@ NdbSqlUtil::usable_in_pk(Uint32 typeId, const void* info) } break; case Type::Undefined: - case Type::Varchar: - case Type::Varbinary: case Type::Blob: case Type::Text: break; @@ -545,8 +654,6 @@ NdbSqlUtil::usable_in_ordered_index(Uint32 typeId, const void* info) } break; case Type::Undefined: - case Type::Varchar: - case Type::Varbinary: case Type::Blob: case Type::Text: break; @@ -555,3 +662,68 @@ NdbSqlUtil::usable_in_ordered_index(Uint32 typeId, const void* info) } return false; } + +// utilities + +bool +NdbSqlUtil::get_var_length(Uint32 typeId, const void* p, unsigned attrlen, Uint32& lb, Uint32& len) +{ + const unsigned char* const src = (const unsigned char*)p; + switch (typeId) { + case NdbSqlUtil::Type::Varchar: + case NdbSqlUtil::Type::Varbinary: + lb = 1; + if (attrlen >= lb) { + len = src[0]; + if (attrlen >= lb + len) + return true; + } + break; + case NdbSqlUtil::Type::Longvarchar: + case NdbSqlUtil::Type::Longvarbinary: + lb = 2; + if (attrlen >= lb) { + len = src[0] + (src[1] << 8); + if (attrlen >= lb + len) + return true; + } + break; + default: + lb = 0; + len = attrlen; + return true; + break; + } + return false; +} + +// workaround + +int +NdbSqlUtil::strnxfrm_bug7284(CHARSET_INFO* cs, unsigned char* dst, unsigned dstLen, const unsigned char*src, unsigned srcLen) +{ + unsigned char nsp[20]; // native space char + unsigned char xsp[20]; // strxfrm-ed space char +#ifdef VM_TRACE + memset(nsp, 0x1f, sizeof(nsp)); + memset(xsp, 0x1f, sizeof(xsp)); +#endif + // convert from unicode codepoint for space + int n1 = (*cs->cset->wc_mb)(cs, (my_wc_t)0x20, nsp, nsp + sizeof(nsp)); + if (n1 <= 0) + return -1; + // strxfrm to binary + int n2 = (*cs->coll->strnxfrm)(cs, xsp, sizeof(xsp), nsp, n1); + if (n2 <= 0) + return -1; + // strxfrm argument string - returns no error indication + int n3 = (*cs->coll->strnxfrm)(cs, dst, dstLen, src, srcLen); + // pad with strxfrm-ed space chars + int n4 = n3; + while (n4 < (int)dstLen) { + dst[n4] = xsp[(n4 - n3) % n2]; + n4++; + } + // no check for partial last + return dstLen; +} diff --git a/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp index 0db74a0b709..7033aecccf8 100644 --- a/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp +++ b/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp @@ -1861,12 +1861,18 @@ Dbacc::xfrmKeyData(Signal* signal) dstWords = srcWords; } else { jam(); + Uint32 typeId = AttributeDescriptor::getType(keyAttr.attributeDescriptor); + Uint32 lb, len; + bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len); + ndbrequire(ok); Uint32 xmul = cs->strxfrm_multiply; if (xmul == 0) xmul = 1; - Uint32 dstLen = xmul * srcBytes; + // see comment in DbtcMain.cpp + Uint32 dstLen = xmul * (srcBytes - lb); ndbrequire(dstLen <= ((dstSize - dstPos) << 2)); - uint n = (*cs->coll->strnxfrm)(cs, dstPtr, dstLen, srcPtr, srcBytes); + int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len); + ndbrequire(n != -1); while ((n & 3) != 0) dstPtr[n++] = 0; dstWords = (n >> 2); diff --git a/ndb/src/kernel/blocks/dbdict/Dbdict.cpp b/ndb/src/kernel/blocks/dbdict/Dbdict.cpp index f62b1eda587..709cd8a4c0f 100644 --- a/ndb/src/kernel/blocks/dbdict/Dbdict.cpp +++ b/ndb/src/kernel/blocks/dbdict/Dbdict.cpp @@ -4827,9 +4827,7 @@ void Dbdict::handleTabInfo(SimpleProperties::Reader & it, } } - /** - * Ignore incoming old-style type and recompute it. - */ + // compute attribute size and array size bool translateOk = attrDesc.translateExtType(); tabRequire(translateOk, CreateTableRef::Inconsistency); diff --git a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp index 60f57c60f1f..0472d25318b 100644 --- a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp +++ b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp @@ -1446,7 +1446,7 @@ private: void gcpTcfinished(Signal* signal); void handleGcp(Signal* signal); void hash(Signal* signal); - Uint32 handle_special_hash(Uint32 dstHash[4], + bool handle_special_hash(Uint32 dstHash[4], Uint32* src, Uint32 srcLen, Uint32 tabPtrI, bool distr); diff --git a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp index 2f371458b73..6d4ca2d9078 100644 --- a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp +++ b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp @@ -2314,7 +2314,7 @@ void Dbtc::hash(Signal* signal) }//if }//Dbtc::hash() -Uint32 +bool Dbtc::handle_special_hash(Uint32 dstHash[4], Uint32* src, Uint32 srcLen, Uint32 tabPtrI, bool distr) @@ -2349,17 +2349,26 @@ Dbtc::handle_special_hash(Uint32 dstHash[4], Uint32* src, Uint32 srcLen, dstWords = srcWords; } else { jam(); + Uint32 typeId = + AttributeDescriptor::getType(keyAttr.attributeDescriptor); + Uint32 lb, len; + bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len); + ndbrequire(ok); Uint32 xmul = cs->strxfrm_multiply; if (xmul == 0) xmul = 1; - Uint32 dstLen = xmul * srcBytes; + /* + * Varchar is really Char. End spaces do not matter. To get + * same hash we blank-pad to maximum length via strnxfrm. + * TODO use MySQL charset-aware hash function instead + */ + Uint32 dstLen = xmul * (srcBytes - lb); ndbrequire(dstLen <= ((dstSize - dstPos) << 2)); - uint n = (*cs->coll->strnxfrm)(cs, dstPtr, dstLen, srcPtr, srcBytes); + uint n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len); while ((n & 3) != 0) { dstPtr[n++] = 0; } dstWords = (n >> 2); - } dstPos += dstWords; srcPos += srcWords; @@ -2418,6 +2427,7 @@ Dbtc::handle_special_hash(Uint32 dstHash[4], Uint32* src, Uint32 srcLen, md5_hash(tmp, (Uint64*)dst, dstPos); dstHash[1] = tmp[1]; } + return true; // success } /* diff --git a/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp b/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp index 015d555e67d..06b2b3f4cb4 100644 --- a/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp +++ b/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp @@ -363,25 +363,25 @@ Dbtup::readFixedSizeTHManyWordNotNULL(Uint32* outBuffer, ljam(); Tablerec* regTabPtr = tabptr.p; Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(attrDescriptor); + uchar* dstPtr = (uchar*)&outBuffer[indexBuf]; + const uchar* srcPtr = (uchar*)&tTupleHeader[readOffset]; Uint32 i = AttributeOffset::getCharsetPos(attrDes2); ndbrequire(i < regTabPtr->noOfCharsets); CHARSET_INFO* cs = regTabPtr->charsetArray[i]; - Uint32 xmul = cs->strxfrm_multiply; - if (xmul == 0) - xmul = 1; - Uint32 dstLen = xmul * srcBytes; - Uint32 maxIndexBuf = indexBuf + (dstLen >> 2); - if (maxIndexBuf <= maxRead) { - ljam(); - uchar* dstPtr = (uchar*)&outBuffer[indexBuf]; - const uchar* srcPtr = (uchar*)&tTupleHeader[readOffset]; - const char* ssrcPtr = (const char*)srcPtr; - // could verify data format optionally - if (true || - (*cs->cset->well_formed_len)(cs, ssrcPtr, ssrcPtr + srcBytes, ZNIL) == srcBytes) { + Uint32 typeId = AttributeDescriptor::getType(attrDescriptor); + Uint32 lb, len; + bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len); + if (ok) { + Uint32 xmul = cs->strxfrm_multiply; + if (xmul == 0) + xmul = 1; + // see comment in DbtcMain.cpp + Uint32 dstLen = xmul * (srcBytes - lb); + Uint32 maxIndexBuf = indexBuf + (dstLen >> 2); + if (maxIndexBuf <= maxRead) { ljam(); - // normalize - Uint32 n = (*cs->coll->strnxfrm)(cs, dstPtr, dstLen, srcPtr, srcBytes); + int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len); + ndbrequire(n != -1); while ((n & 3) != 0) { dstPtr[n++] = 0; } @@ -393,11 +393,11 @@ Dbtup::readFixedSizeTHManyWordNotNULL(Uint32* outBuffer, return true; } else { ljam(); - terrorCode = ZTUPLE_CORRUPTED_ERROR; + terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR; } } else { ljam(); - terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR; + terrorCode = ZTUPLE_CORRUPTED_ERROR; } } return false; @@ -814,10 +814,15 @@ Dbtup::updateFixedSizeTHManyWordNotNULL(Uint32* inBuffer, // not const in MySQL CHARSET_INFO* cs = regTabPtr->charsetArray[i]; const char* ssrc = (const char*)&inBuffer[tInBufIndex + 1]; + Uint32 lb, len; + if (! NdbSqlUtil::get_var_length(typeId, ssrc, bytes, lb, len)) { + ljam(); + terrorCode = ZINVALID_CHAR_FORMAT; + return false; + } // fast fix bug#7340 if (typeId != NDB_TYPE_TEXT && - (*cs->cset->well_formed_len)(cs, ssrc, ssrc+bytes, ZNIL) != bytes) - { + (*cs->cset->well_formed_len)(cs, ssrc + lb, ssrc + lb + len, ZNIL) != len) { ljam(); terrorCode = ZINVALID_CHAR_FORMAT; return false; diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp index 84081e76a8c..a61b7c1f5ca 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp @@ -177,18 +177,29 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal) dstWords = srcWords; } else { jam(); + Uint32 typeId = descAttr.m_typeId; + Uint32 lb, len; + bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len); + if (! ok) { + jam(); + scan.m_state = ScanOp::Invalid; + sig->errorCode = TuxBoundInfo::InvalidCharFormat; + return; + } CHARSET_INFO* cs = all_charsets[descAttr.m_charset]; Uint32 xmul = cs->strxfrm_multiply; if (xmul == 0) xmul = 1; - Uint32 dstLen = xmul * srcBytes; + // see comment in DbtcMain.cpp + Uint32 dstLen = xmul * (srcBytes - lb); if (dstLen > ((dstSize - dstPos) << 2)) { jam(); scan.m_state = ScanOp::Invalid; sig->errorCode = TuxBoundInfo::TooMuchAttrInfo; return; } - Uint32 n = (*cs->coll->strnxfrm)(cs, dstPtr, dstLen, srcPtr, srcBytes); + int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len); + ndbrequire(n != -1); while ((n & 3) != 0) { dstPtr[n++] = 0; } diff --git a/ndb/src/ndbapi/NdbDictionary.cpp b/ndb/src/ndbapi/NdbDictionary.cpp index f641b8cd5a0..d4e4821ad39 100644 --- a/ndb/src/ndbapi/NdbDictionary.cpp +++ b/ndb/src/ndbapi/NdbDictionary.cpp @@ -937,6 +937,12 @@ operator<<(NdbOut& out, const NdbDictionary::Column& col) case NdbDictionary::Column::Bit: out << "Bit(" << col.getLength() << ")"; break; + case NdbDictionary::Column::Longvarchar: + out << "Longvarchar(" << col.getLength() << ";" << csname << ")"; + break; + case NdbDictionary::Column::Longvarbinary: + out << "Longvarbinary(" << col.getLength() << ")"; + break; default: out << "Type" << (Uint32)col.getType(); break; diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/ndb/src/ndbapi/NdbDictionaryImpl.cpp index 6bef5bcadfc..71e95ebafa6 100644 --- a/ndb/src/ndbapi/NdbDictionaryImpl.cpp +++ b/ndb/src/ndbapi/NdbDictionaryImpl.cpp @@ -143,6 +143,24 @@ NdbColumnImpl::init(Type t) m_length = 4; m_cs = default_cs; break; + case Bit: + m_precision = 0; + m_scale = 0; + m_length = 1; + m_cs = NULL; + break; + case Longvarchar: + m_precision = 0; + m_scale = 0; + m_length = 1; // legal + m_cs = default_cs; + break; + case Longvarbinary: + m_precision = 0; + m_scale = 0; + m_length = 1; // legal + m_cs = NULL; + break; case Undefined: assert(false); break; @@ -1151,6 +1169,8 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret, const Uint32 * data, Uint32 len, bool fullyQualifiedNames) { + DBUG_ENTER("NdbDictInterface::parseTableInfo"); + SimplePropertiesLinearReader it(data, len); DictTabInfo::Table tableDesc; tableDesc.init(); SimpleProperties::UnpackStatus s; @@ -1160,7 +1180,7 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret, true, true); if(s != SimpleProperties::Break){ - return 703; + DBUG_RETURN(703); } const char * internalName = tableDesc.TableName; const char * externalName = Ndb::externalizeTableName(internalName, fullyQualifiedNames); @@ -1211,15 +1231,17 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret, true, true); if(s != SimpleProperties::Break){ delete impl; - return 703; + DBUG_RETURN(703); } NdbColumnImpl * col = new NdbColumnImpl(); col->m_attrId = attrDesc.AttributeId; col->setName(attrDesc.AttributeName); - if (attrDesc.AttributeExtType >= NDB_TYPE_MAX) { + + // check type and compute attribute size and array size + if (! attrDesc.translateExtType()) { delete impl; - return 703; + DBUG_RETURN(703); } col->m_type = (NdbDictionary::Column::Type)attrDesc.AttributeExtType; col->m_precision = (attrDesc.AttributeExtPrecision & 0xFFFF); @@ -1230,21 +1252,15 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret, // charset is defined exactly for char types if (col->getCharType() != (cs_number != 0)) { delete impl; - return 703; + DBUG_RETURN(703); } if (col->getCharType()) { col->m_cs = get_charset(cs_number, MYF(0)); if (col->m_cs == NULL) { delete impl; - return 743; + DBUG_RETURN(743); } } - - // translate to old kernel types and sizes - if (! attrDesc.translateExtType()) { - delete impl; - return 703; - } col->m_attrSize = (1 << attrDesc.AttributeSize) / 8; col->m_arraySize = attrDesc.AttributeArraySize; if(attrDesc.AttributeSize == 0) @@ -1277,7 +1293,7 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret, if(impl->m_columns[attrDesc.AttributeId] != 0){ delete col; delete impl; - return 703; + DBUG_RETURN(703); } impl->m_columns[attrDesc.AttributeId] = col; it.next(); @@ -1288,7 +1304,7 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret, impl->m_noOfBlobs = blobCount; impl->m_noOfDistributionKeys = distKeys; * ret = impl; - return 0; + DBUG_RETURN(0); } /***************************************************************** @@ -1448,7 +1464,7 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb, if (col->m_autoIncrement) { if (haveAutoIncrement) { m_error.code = 4335; - return -1; + DBUG_RETURN(-1); } haveAutoIncrement = true; autoIncrementValue = col->m_autoIncrementInitialValue; @@ -1498,14 +1514,16 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb, tmpAttr.AttributeNullableFlag = col->m_nullable; tmpAttr.AttributeDKey = col->m_distributionKey; - if (col->m_type >= NDB_TYPE_MAX) { - m_error.code = 703; - return -1; - } tmpAttr.AttributeExtType = (Uint32)col->m_type; tmpAttr.AttributeExtPrecision = ((unsigned)col->m_precision & 0xFFFF); tmpAttr.AttributeExtScale = col->m_scale; tmpAttr.AttributeExtLength = col->m_length; + + // check type and compute attribute size and array size + if (! tmpAttr.translateExtType()) { + m_error.code = 703; + DBUG_RETURN(-1); + } // charset is defined exactly for char types if (col->getCharType() != (col->m_cs != NULL)) { m_error.code = 703; @@ -1519,16 +1537,13 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb, // distribution key not supported for Char attribute if (col->m_distributionKey && col->m_cs != NULL) { m_error.code = 745; - return -1; + DBUG_RETURN(-1); } // charset in upper half of precision if (col->getCharType()) { tmpAttr.AttributeExtPrecision |= (col->m_cs->number << 16); } - // DICT will ignore and recompute this - (void)tmpAttr.translateExtType(); - tmpAttr.AttributeAutoIncrement = col->m_autoIncrement; BaseString::snprintf(tmpAttr.AttributeDefaultValue, sizeof(tmpAttr.AttributeDefaultValue), @@ -1573,7 +1588,7 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb, ret= createTable(&tSignal, ptr); if (ret) - return ret; + DBUG_RETURN(ret); if (haveAutoIncrement) { if (!ndb.setAutoIncrementValue(impl.m_externalName.c_str(), diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.hpp b/ndb/src/ndbapi/NdbDictionaryImpl.hpp index df20915a0e0..0b2de8f2fdc 100644 --- a/ndb/src/ndbapi/NdbDictionaryImpl.hpp +++ b/ndb/src/ndbapi/NdbDictionaryImpl.hpp @@ -448,7 +448,8 @@ bool NdbColumnImpl::getCharType() const { return (m_type == NdbDictionary::Column::Char || m_type == NdbDictionary::Column::Varchar || - m_type == NdbDictionary::Column::Text); + m_type == NdbDictionary::Column::Text || + m_type == NdbDictionary::Column::Longvarchar); } inline diff --git a/ndb/src/ndbapi/NdbOperationDefine.cpp b/ndb/src/ndbapi/NdbOperationDefine.cpp index ce4a28c1273..835e33dfb40 100644 --- a/ndb/src/ndbapi/NdbOperationDefine.cpp +++ b/ndb/src/ndbapi/NdbOperationDefine.cpp @@ -406,6 +406,14 @@ int NdbOperation::setValue( const NdbColumnImpl* tAttrInfo, const char* aValuePassed, Uint32 len) { + DBUG_ENTER("NdbOperation::setValue"); + DBUG_PRINT("enter", ("col=%s op=%d val=0x%x len=%u", + tAttrInfo->m_name.c_str(), + theOperationType, + aValuePassed, len)); + if (aValuePassed != NULL) + DBUG_DUMP("value", (char*)aValuePassed, len); + int tReturnCode; Uint32 tAttrId; Uint32 tData; @@ -421,7 +429,7 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo, ; } else { setErrorCodeAbort(4234); - return -1; + DBUG_RETURN(-1); }//if } else { if (tStatus == GetValue) { @@ -432,7 +440,7 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo, // to set values in the tuple by setValue. //-------------------------------------------------------------------- if (insertATTRINFO(Interpreter::EXIT_OK) == -1){ - return -1; + DBUG_RETURN(-1); } theInterpretedSize = theTotalCurrAI_Len - (theInitialReadSize + 5); @@ -443,47 +451,47 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo, // setValue used in the wrong context. Application coding error. //------------------------------------------------------------------- setErrorCodeAbort(4234); //Wrong error code - return -1; + DBUG_RETURN(-1); }//if theStatus = SetValueInterpreted; }//if } else if (tOpType == InsertRequest) { if ((theStatus != SetValue) && (theStatus != OperationDefined)) { setErrorCodeAbort(4234); - return -1; + DBUG_RETURN(-1); }//if } else if (tOpType == ReadRequest || tOpType == ReadExclusive) { setErrorCodeAbort(4504); - return -1; + DBUG_RETURN(-1); } else if (tOpType == DeleteRequest) { setErrorCodeAbort(4504); - return -1; + DBUG_RETURN(-1); } else if (tOpType == OpenScanRequest || tOpType == OpenRangeScanRequest) { setErrorCodeAbort(4228); - return -1; + DBUG_RETURN(-1); } else { //--------------------------------------------------------------------- // setValue with undefined operation type. // Probably application coding error. //--------------------------------------------------------------------- setErrorCodeAbort(4108); - return -1; + DBUG_RETURN(-1); }//if if (tAttrInfo == NULL) { setErrorCodeAbort(4004); - return -1; + DBUG_RETURN(-1); }//if if (tAttrInfo->m_pk) { if (theOperationType == InsertRequest) { - return equal_impl(tAttrInfo, aValuePassed, len); + DBUG_RETURN(equal_impl(tAttrInfo, aValuePassed, len)); } else { setErrorCodeAbort(4202); - return -1; + DBUG_RETURN(-1); }//if }//if if (len > 8000) { setErrorCodeAbort(4216); - return -1; + DBUG_RETURN(-1); }//if tAttrId = tAttrInfo->m_attrId; @@ -496,13 +504,13 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo, insertATTRINFO(ahValue); // Insert Attribute Id with the value // NULL into ATTRINFO part. - return 0; + DBUG_RETURN(0); } else { /*********************************************************************** * Setting a NULL value on a NOT NULL attribute is not allowed. **********************************************************************/ setErrorCodeAbort(4203); - return -1; + DBUG_RETURN(-1); }//if }//if @@ -522,7 +530,7 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo, const Uint32 bitsInLastWord = 8 * (sizeInBytes & 3) ; if (len != sizeInBytes && (len != 0)) { setErrorCodeAbort(4209); - return -1; + DBUG_RETURN(-1); }//if const Uint32 totalSizeInWords = (sizeInBytes + 3)/4; // Including bits in last word const Uint32 sizeInWords = sizeInBytes / 4; // Excluding bits in last word @@ -550,7 +558,7 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo, tReturnCode = insertATTRINFOloop((Uint32*)aValue, sizeInWords); if (tReturnCode == -1) { - return tReturnCode; + DBUG_RETURN(tReturnCode); }//if if (bitsInLastWord != 0) { tData = *(Uint32*)(aValue + sizeInWords*4); @@ -559,11 +567,11 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo, tData = convertEndian(tData); tReturnCode = insertATTRINFO(tData); if (tReturnCode == -1) { - return tReturnCode; + DBUG_RETURN(tReturnCode); }//if }//if theErrorLine++; - return 0; + DBUG_RETURN(0); }//NdbOperation::setValue() NdbBlob* diff --git a/ndb/src/ndbapi/NdbOperationSearch.cpp b/ndb/src/ndbapi/NdbOperationSearch.cpp index 70850bcc66b..6e76287eef0 100644 --- a/ndb/src/ndbapi/NdbOperationSearch.cpp +++ b/ndb/src/ndbapi/NdbOperationSearch.cpp @@ -57,8 +57,16 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, const char* aValuePassed, Uint32 aVariableKeyLen) { - register Uint32 tAttrId; + DBUG_ENTER("NdbOperation::equal_impl"); + DBUG_PRINT("enter", ("col=%s op=%d val=0x%x len=%u", + tAttrInfo->m_name.c_str(), + theOperationType, + aValuePassed, aVariableKeyLen)); + if (aValuePassed != NULL) + DBUG_DUMP("value", (char*)aValuePassed, aVariableKeyLen); + register Uint32 tAttrId; + Uint32 tData; Uint32 tKeyInfoPosition; const char* aValue = aValuePassed; @@ -120,7 +128,9 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, theTupleKeyDefined[i][1] = tKeyInfoPosition; theTupleKeyDefined[i][2] = true; + OperationType tOpType = theOperationType; Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize; + { /************************************************************************ * Check if the pointer of the value passed is aligned on a 4 byte @@ -176,7 +186,6 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, }//if #endif - OperationType tOpType = theOperationType; /************************************************************************** * If the operation is an insert request and the attribute is stored then * we also set the value in the stored part through putting the @@ -231,7 +240,7 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, } else { theStatus = SetValue; }//if - return 0; + DBUG_RETURN(0); } else if ((tOpType == ReadRequest) || (tOpType == DeleteRequest) || (tOpType == ReadExclusive)) { theStatus = GetValue; @@ -242,42 +251,42 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, assert(c != 0); if (c->getBlobType()) { if (getBlobHandle(theNdbCon, c) == NULL) - return -1; + DBUG_RETURN(-1); } } } - return 0; + DBUG_RETURN(0); } else if ((tOpType == InsertRequest) || (tOpType == WriteRequest)) { theStatus = SetValue; - return 0; + DBUG_RETURN(0); } else { setErrorCodeAbort(4005); - return -1; + DBUG_RETURN(-1); }//if - return 0; + DBUG_RETURN(0); }//if } else { - return -1; + DBUG_RETURN(-1); }//if - return 0; + DBUG_RETURN(0); } if (aValue == NULL) { // NULL value in primary key setErrorCodeAbort(4505); - return -1; + DBUG_RETURN(-1); }//if if ( tAttrInfo == NULL ) { // Attribute name not found in table setErrorCodeAbort(4004); - return -1; + DBUG_RETURN(-1); }//if if (theStatus == GetValue || theStatus == SetValue){ // All pk's defined setErrorCodeAbort(4225); - return -1; + DBUG_RETURN(-1); }//if ndbout_c("theStatus: %d", theStatus); @@ -285,19 +294,19 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, // If we come here, set a general errorcode // and exit setErrorCodeAbort(4200); - return -1; + DBUG_RETURN(-1); equal_error1: setErrorCodeAbort(4205); - return -1; + DBUG_RETURN(-1); equal_error2: setErrorCodeAbort(4206); - return -1; + DBUG_RETURN(-1); equal_error3: setErrorCodeAbort(4209); - return -1; + DBUG_RETURN(-1); } /****************************************************************************** diff --git a/ndb/src/ndbapi/NdbRecAttr.cpp b/ndb/src/ndbapi/NdbRecAttr.cpp index 37142c94f8f..57f896e7e42 100644 --- a/ndb/src/ndbapi/NdbRecAttr.cpp +++ b/ndb/src/ndbapi/NdbRecAttr.cpp @@ -224,9 +224,11 @@ NdbOut& operator<<(NdbOut& out, const NdbRecAttr &r) j = r.arraySize(); break; case NdbDictionary::Column::Varchar: - ndbrecattr_print_string(out,"Varchar", r.aRef()+ 2, - ntohs(r.u_short_value())); - j = r.arraySize(); + { + unsigned len = *(const unsigned char*)r.aRef(); + ndbrecattr_print_string(out,"Varchar", r.aRef()+1,len); + j = r.arraySize(); + } break; case NdbDictionary::Column::Float: out << r.float_value(); @@ -256,6 +258,13 @@ NdbOut& operator<<(NdbOut& out, const NdbRecAttr &r) j = r.arraySize(); } break; + case NdbDictionary::Column::Longvarchar: + { + unsigned len = uint2korr(r.aRef()); + ndbrecattr_print_string(out,"Longvarchar", r.aRef()+2,len); + j = r.arraySize(); + } + break; default: /* no print functions for the rest, just print type */ out << (int) r.getType(); j = r.arraySize(); diff --git a/ndb/src/ndbapi/NdbTransaction.cpp b/ndb/src/ndbapi/NdbTransaction.cpp index d010c0ae0d8..f6664657f73 100644 --- a/ndb/src/ndbapi/NdbTransaction.cpp +++ b/ndb/src/ndbapi/NdbTransaction.cpp @@ -1092,7 +1092,11 @@ NdbTransaction::getNdbIndexScanOperation(const char* anIndexName, { NdbIndexImpl* index = theNdb->theDictionary->getIndex(anIndexName, aTableName); + if (index == 0) + return 0; NdbTableImpl* table = theNdb->theDictionary->getTable(aTableName); + if (table == 0) + return 0; return getNdbIndexScanOperation(index, table); } diff --git a/ndb/test/ndbapi/testOIBasic.cpp b/ndb/test/ndbapi/testOIBasic.cpp index adaccebed6e..8a09a2ec9d1 100644 --- a/ndb/test/ndbapi/testOIBasic.cpp +++ b/ndb/test/ndbapi/testOIBasic.cpp @@ -28,7 +28,9 @@ #include <NdbCondition.h> #include <NdbThread.h> #include <NdbTick.h> +#include <NdbSleep.h> #include <my_sys.h> +#include <NdbSqlUtil.hpp> // options @@ -72,7 +74,7 @@ struct Opt { m_die(0), m_dups(false), m_fragtype(NdbDictionary::Object::FragUndefined), - m_subsubloop(2), + m_subsubloop(4), m_index(0), m_loop(1), m_msglock(true), @@ -87,7 +89,7 @@ struct Opt { m_seed(-1), m_subloop(4), m_table(0), - m_threads(10), + m_threads(4), m_v(1) { } }; @@ -257,6 +259,7 @@ struct Par : public Opt { bool m_verify; // deadlock possible bool m_deadlock; + NdbOperation::LockMode m_lockmode; // ordered range scan bool m_ordered; bool m_descending; @@ -278,6 +281,7 @@ struct Par : public Opt { m_randomkey(false), m_verify(false), m_deadlock(false), + m_lockmode(NdbOperation::LM_Read), m_ordered(false), m_descending(false) { } @@ -598,7 +602,9 @@ getcs(Par par) struct Col { enum Type { Unsigned = NdbDictionary::Column::Unsigned, - Char = NdbDictionary::Column::Char + Char = NdbDictionary::Column::Char, + Varchar = NdbDictionary::Column::Varchar, + Longvarchar = NdbDictionary::Column::Longvarchar }; const class Tab& m_tab; unsigned m_num; @@ -612,7 +618,7 @@ struct Col { Col(const class Tab& tab, unsigned num, const char* name, bool pk, Type type, unsigned length, bool nullable, const Chs* chs); ~Col(); bool equal(const Col& col2) const; - void verify(const void* addr) const; + void wellformed(const void* addr) const; }; Col::Col(const class Tab& tab, unsigned num, const char* name, bool pk, Type type, unsigned length, bool nullable, const Chs* chs) : @@ -626,6 +632,9 @@ Col::Col(const class Tab& tab, unsigned num, const char* name, bool pk, Type typ m_nullable(nullable), m_chs(chs) { + // fix long varchar + if (type == Varchar && m_bytelength > 255) + m_type = Longvarchar; } Col::~Col() @@ -640,7 +649,7 @@ Col::equal(const Col& col2) const } void -Col::verify(const void* addr) const +Col::wellformed(const void* addr) const { switch (m_type) { case Col::Unsigned: @@ -653,6 +662,26 @@ Col::verify(const void* addr) const assert((*cs->cset->well_formed_len)(cs, src, src + len, 0xffff) == len); } break; + case Col::Varchar: + { + CHARSET_INFO* cs = m_chs->m_cs; + const unsigned char* src = (const unsigned char*)addr; + const char* ssrc = (const char*)src; + unsigned len = src[0]; + assert(len <= m_bytelength); + assert((*cs->cset->well_formed_len)(cs, ssrc + 1, ssrc + 1 + len, 0xffff) == len); + } + break; + case Col::Longvarchar: + { + CHARSET_INFO* cs = m_chs->m_cs; + const unsigned char* src = (const unsigned char*)addr; + const char* ssrc = (const char*)src; + unsigned len = src[0] + (src[1] << 8); + assert(len <= m_bytelength); + assert((*cs->cset->well_formed_len)(cs, ssrc + 2, ssrc + 2 + len, 0xffff) == len); + } + break; default: assert(false); break; @@ -673,6 +702,18 @@ operator<<(NdbOut& out, const Col& col) out << " char(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")"; } break; + case Col::Varchar: + { + CHARSET_INFO* cs = col.m_chs->m_cs; + out << " varchar(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")"; + } + break; + case Col::Longvarchar: + { + CHARSET_INFO* cs = col.m_chs->m_cs; + out << " longvarchar(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")"; + } + break; default: out << "type" << (int)col.m_type; assert(false); @@ -928,35 +969,27 @@ makebuiltintables(Par par) t->itabadd(2, x); } if (useindex(par, 3)) { - // d, c, b - ITab* x = new ITab(*t, "ti0x3", ITab::OrderedIndex, 3); - x->icoladd(0, new ICol(*x, 0, *t->m_col[3])); - x->icoladd(1, new ICol(*x, 1, *t->m_col[2])); - x->icoladd(2, new ICol(*x, 2, *t->m_col[1])); - t->itabadd(3, x); - } - if (useindex(par, 4)) { // b, e, c, d - ITab* x = new ITab(*t, "ti0x4", ITab::OrderedIndex, 4); + ITab* x = new ITab(*t, "ti0x3", ITab::OrderedIndex, 4); x->icoladd(0, new ICol(*x, 0, *t->m_col[1])); x->icoladd(1, new ICol(*x, 1, *t->m_col[4])); x->icoladd(2, new ICol(*x, 2, *t->m_col[2])); x->icoladd(3, new ICol(*x, 3, *t->m_col[3])); - t->itabadd(4, x); + t->itabadd(3, x); } - if (useindex(par, 5)) { + if (useindex(par, 4)) { // a, c - ITab* x = new ITab(*t, "ti0z5", ITab::UniqueHashIndex, 2); + ITab* x = new ITab(*t, "ti0z4", ITab::UniqueHashIndex, 2); x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); x->icoladd(1, new ICol(*x, 1, *t->m_col[2])); - t->itabadd(5, x); + t->itabadd(4, x); } - if (useindex(par, 6)) { + if (useindex(par, 5)) { // a, e - ITab* x = new ITab(*t, "ti0z6", ITab::UniqueHashIndex, 2); + ITab* x = new ITab(*t, "ti0z5", ITab::UniqueHashIndex, 2); x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); x->icoladd(1, new ICol(*x, 1, *t->m_col[4])); - t->itabadd(6, x); + t->itabadd(5, x); } tablist[0] = t; } @@ -967,56 +1000,50 @@ makebuiltintables(Par par) t->coladd(0, new Col(*t, 0, "a", 0, Col::Unsigned, 1, 0, 0)); t->coladd(1, new Col(*t, 1, "b", 1, Col::Unsigned, 1, 0, 0)); t->coladd(2, new Col(*t, 2, "c", 0, Col::Char, 20, 1, getcs(par))); - t->coladd(3, new Col(*t, 3, "d", 0, Col::Char, 5, 0, getcs(par))); - t->coladd(4, new Col(*t, 4, "e", 0, Col::Char, 5, 1, getcs(par))); + t->coladd(3, new Col(*t, 3, "d", 0, Col::Varchar, 5, 0, getcs(par))); + t->coladd(4, new Col(*t, 4, "e", 0, Col::Longvarchar, 5, 1, getcs(par))); if (useindex(par, 0)) { // b ITab* x = new ITab(*t, "ti1x0", ITab::OrderedIndex, 1); x->icoladd(0, new ICol(*x, 0, *t->m_col[1])); + t->itabadd(0, x); } if (useindex(par, 1)) { - // a, c + // c, a ITab* x = new ITab(*t, "ti1x1", ITab::OrderedIndex, 2); - x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); - x->icoladd(1, new ICol(*x, 1, *t->m_col[2])); + x->icoladd(0, new ICol(*x, 0, *t->m_col[2])); + x->icoladd(1, new ICol(*x, 1, *t->m_col[0])); t->itabadd(1, x); } if (useindex(par, 2)) { - // c, a - ITab* x = new ITab(*t, "ti1x2", ITab::OrderedIndex, 2); - x->icoladd(0, new ICol(*x, 0, *t->m_col[2])); - x->icoladd(1, new ICol(*x, 1, *t->m_col[0])); + // d + ITab* x = new ITab(*t, "ti1x2", ITab::OrderedIndex, 1); + x->icoladd(0, new ICol(*x, 0, *t->m_col[3])); t->itabadd(2, x); } if (useindex(par, 3)) { - // e - ITab* x = new ITab(*t, "ti1x3", ITab::OrderedIndex, 1); - x->icoladd(0, new ICol(*x, 0, *t->m_col[4])); - t->itabadd(3, x); - } - if (useindex(par, 4)) { // e, d, c, b - ITab* x = new ITab(*t, "ti1x4", ITab::OrderedIndex, 4); + ITab* x = new ITab(*t, "ti1x3", ITab::OrderedIndex, 4); x->icoladd(0, new ICol(*x, 0, *t->m_col[4])); x->icoladd(1, new ICol(*x, 1, *t->m_col[3])); x->icoladd(2, new ICol(*x, 2, *t->m_col[2])); x->icoladd(3, new ICol(*x, 3, *t->m_col[1])); - t->itabadd(4, x); + t->itabadd(3, x); } - if (useindex(par, 5)) { + if (useindex(par, 4)) { // a, b - ITab* x = new ITab(*t, "ti1z5", ITab::UniqueHashIndex, 2); + ITab* x = new ITab(*t, "ti1z4", ITab::UniqueHashIndex, 2); x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); x->icoladd(1, new ICol(*x, 1, *t->m_col[1])); - t->itabadd(5, x); + t->itabadd(4, x); } - if (useindex(par, 6)) { + if (useindex(par, 5)) { // a, b, d - ITab* x = new ITab(*t, "ti1z6", ITab::UniqueHashIndex, 3); + ITab* x = new ITab(*t, "ti1z5", ITab::UniqueHashIndex, 3); x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); x->icoladd(1, new ICol(*x, 1, *t->m_col[1])); x->icoladd(2, new ICol(*x, 2, *t->m_col[3])); - t->itabadd(6, x); + t->itabadd(5, x); } tablist[1] = t; } @@ -1027,8 +1054,8 @@ makebuiltintables(Par par) t->coladd(0, new Col(*t, 0, "a", 1, Col::Char, 31, 0, getcs(par))); t->coladd(1, new Col(*t, 1, "b", 0, Col::Char, 4, 1, getcs(par))); t->coladd(2, new Col(*t, 2, "c", 1, Col::Unsigned, 1, 0, 0)); - t->coladd(3, new Col(*t, 3, "d", 1, Col::Char, 3, 0, getcs(par))); - t->coladd(4, new Col(*t, 4, "e", 0, Col::Char, 17, 0, getcs(par))); + t->coladd(3, new Col(*t, 3, "d", 1, Col::Varchar, 128, 0, getcs(par))); + t->coladd(4, new Col(*t, 4, "e", 0, Col::Varchar, 7, 0, getcs(par))); if (useindex(par, 0)) { // a, c, d ITab* x = new ITab(*t, "ti2x0", ITab::OrderedIndex, 3); @@ -1060,27 +1087,20 @@ makebuiltintables(Par par) t->itabadd(3, x); } if (useindex(par, 4)) { - // a, e - ITab* x = new ITab(*t, "ti2x4", ITab::OrderedIndex, 2); - x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); - x->icoladd(1, new ICol(*x, 1, *t->m_col[4])); - t->itabadd(4, x); - } - if (useindex(par, 5)) { // a, c - ITab* x = new ITab(*t, "ti2z5", ITab::UniqueHashIndex, 2); + ITab* x = new ITab(*t, "ti2z4", ITab::UniqueHashIndex, 2); x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); x->icoladd(1, new ICol(*x, 1, *t->m_col[2])); - t->itabadd(5, x); + t->itabadd(4, x); } - if (useindex(par, 6)) { + if (useindex(par, 5)) { // a, c, d, e - ITab* x = new ITab(*t, "ti2z6", ITab::UniqueHashIndex, 4); + ITab* x = new ITab(*t, "ti2z5", ITab::UniqueHashIndex, 4); x->icoladd(0, new ICol(*x, 0, *t->m_col[0])); x->icoladd(1, new ICol(*x, 1, *t->m_col[2])); x->icoladd(2, new ICol(*x, 2, *t->m_col[3])); x->icoladd(3, new ICol(*x, 3, *t->m_col[4])); - t->itabadd(6, x); + t->itabadd(5, x); } tablist[2] = t; } @@ -1115,19 +1135,19 @@ struct Con { void disconnect(); int startTransaction(); int getNdbOperation(const Tab& tab); + int getNdbIndexOperation1(const ITab& itab, const Tab& tab); int getNdbIndexOperation(const ITab& itab, const Tab& tab); int getNdbScanOperation(const Tab& tab); - int getNdbScanOperation(const ITab& itab, const Tab& tab); + int getNdbIndexScanOperation1(const ITab& itab, const Tab& tab); + int getNdbIndexScanOperation(const ITab& itab, const Tab& tab); int equal(int num, const char* addr); int getValue(int num, NdbRecAttr*& rec); int setValue(int num, const char* addr); int setBound(int num, int type, const void* value); int execute(ExecType t); int execute(ExecType t, bool& deadlock); - int openScanRead(unsigned scanbat, unsigned scanpar); - int openScanExclusive(unsigned scanbat, unsigned scanpar); - int openScanOrdered(unsigned scanbat, unsigned scanpar, bool descending); - int openScanOrderedExclusive(unsigned scanbat, unsigned scanpar, bool descending); + int readTuples(Par par); + int readIndexTuples(Par par); int executeScan(); int nextScanResult(bool fetchAllowed); int nextScanResult(bool fetchAllowed, bool& deadlock); @@ -1182,7 +1202,7 @@ Con::getNdbOperation(const Tab& tab) } int -Con::getNdbIndexOperation(const ITab& itab, const Tab& tab) +Con::getNdbIndexOperation1(const ITab& itab, const Tab& tab) { assert(m_tx != 0); CHKCON((m_op = m_indexop = m_tx->getNdbIndexOperation(itab.m_name, tab.m_name)) != 0, *this); @@ -1190,6 +1210,20 @@ Con::getNdbIndexOperation(const ITab& itab, const Tab& tab) } int +Con::getNdbIndexOperation(const ITab& itab, const Tab& tab) +{ + assert(m_tx != 0); + unsigned tries = 0; + while (1) { + if (getNdbIndexOperation1(itab, tab) == 0) + break; + CHK(++tries < 10); + NdbSleep_MilliSleep(100); + } + return 0; +} + +int Con::getNdbScanOperation(const Tab& tab) { assert(m_tx != 0); @@ -1198,7 +1232,7 @@ Con::getNdbScanOperation(const Tab& tab) } int -Con::getNdbScanOperation(const ITab& itab, const Tab& tab) +Con::getNdbIndexScanOperation1(const ITab& itab, const Tab& tab) { assert(m_tx != 0); CHKCON((m_op = m_scanop = m_indexscanop = m_tx->getNdbIndexScanOperation(itab.m_name, tab.m_name)) != 0, *this); @@ -1206,6 +1240,20 @@ Con::getNdbScanOperation(const ITab& itab, const Tab& tab) } int +Con::getNdbIndexScanOperation(const ITab& itab, const Tab& tab) +{ + assert(m_tx != 0); + unsigned tries = 0; + while (1) { + if (getNdbIndexScanOperation1(itab, tab) == 0) + break; + CHK(++tries < 10); + NdbSleep_MilliSleep(100); + } + return 0; +} + +int Con::equal(int num, const char* addr) { assert(m_tx != 0 && m_op != 0); @@ -1262,42 +1310,21 @@ Con::execute(ExecType t, bool& deadlock) } int -Con::openScanRead(unsigned scanbat, unsigned scanpar) +Con::readTuples(Par par) { assert(m_tx != 0 && m_scanop != 0); - NdbOperation::LockMode lm = NdbOperation::LM_Read; - CHKCON(m_scanop->readTuples(lm, scanbat, scanpar) == 0, *this); + CHKCON(m_scanop->readTuples(par.m_lockmode, par.m_scanbat, par.m_scanpar) == 0, *this); return 0; } int -Con::openScanExclusive(unsigned scanbat, unsigned scanpar) -{ - assert(m_tx != 0 && m_scanop != 0); - NdbOperation::LockMode lm = NdbOperation::LM_Exclusive; - CHKCON(m_scanop->readTuples(lm, scanbat, scanpar) == 0, *this); - return 0; -} - -int -Con::openScanOrdered(unsigned scanbat, unsigned scanpar, bool descending) -{ - assert(m_tx != 0 && m_indexscanop != 0); - NdbOperation::LockMode lm = NdbOperation::LM_Read; - CHKCON(m_indexscanop->readTuples(lm, scanbat, scanpar, true, descending) == 0, *this); - return 0; -} - -int -Con::openScanOrderedExclusive(unsigned scanbat, unsigned scanpar, bool descending) +Con::readIndexTuples(Par par) { assert(m_tx != 0 && m_indexscanop != 0); - NdbOperation::LockMode lm = NdbOperation::LM_Exclusive; - CHKCON(m_indexscanop->readTuples(lm, scanbat, scanpar, true, descending) == 0, *this); + CHKCON(m_indexscanop->readTuples(par.m_lockmode, par.m_scanbat, par.m_scanpar, par.m_ordered, par.m_descending) == 0, *this); return 0; } - int Con::executeScan() { @@ -1564,6 +1591,8 @@ struct Val { union { Uint32 m_uint32; unsigned char* m_char; + unsigned char* m_varchar; + unsigned char* m_longvarchar; }; Val(const Col& col); ~Val(); @@ -1576,9 +1605,12 @@ struct Val { int setval(Par par) const; void calc(Par par, unsigned i); void calckey(Par par, unsigned i); + void calckeychars(Par par, unsigned i, unsigned& n, unsigned char* buf); void calcnokey(Par par); - int verify(const Val& val2) const; - int cmp(const Val& val2) const; + void calcnokeychars(Par par, unsigned& n, unsigned char* buf); + int verify(Par par, const Val& val2) const; + int cmp(Par par, const Val& val2) const; + int cmpchars(Par par, const unsigned char* buf1, unsigned len1, const unsigned char* buf2, unsigned len2) const; private: Val& operator=(const Val& val2); }; @@ -1595,6 +1627,12 @@ Val::Val(const Col& col) : case Col::Char: m_char = new unsigned char [col.m_bytelength]; break; + case Col::Varchar: + m_varchar = new unsigned char [1 + col.m_bytelength]; + break; + case Col::Longvarchar: + m_longvarchar = new unsigned char [2 + col.m_bytelength]; + break; default: assert(false); break; @@ -1610,6 +1648,12 @@ Val::~Val() case Col::Char: delete [] m_char; break; + case Col::Varchar: + delete [] m_varchar; + break; + case Col::Longvarchar: + delete [] m_longvarchar; + break; default: assert(false); break; @@ -1640,6 +1684,12 @@ Val::copy(const void* addr) case Col::Char: memcpy(m_char, addr, col.m_bytelength); break; + case Col::Varchar: + memcpy(m_varchar, addr, 1 + col.m_bytelength); + break; + case Col::Longvarchar: + memcpy(m_longvarchar, addr, 2 + col.m_bytelength); + break; default: assert(false); break; @@ -1656,6 +1706,10 @@ Val::dataaddr() const return &m_uint32; case Col::Char: return m_char; + case Col::Varchar: + return m_varchar; + case Col::Longvarchar: + return m_longvarchar; default: break; } @@ -1704,7 +1758,7 @@ Val::calc(Par par, unsigned i) const Col& col = m_col; col.m_pk ? calckey(par, i) : calcnokey(par); if (! m_null) - col.verify(dataaddr()); + col.wellformed(dataaddr()); } void @@ -1721,19 +1775,30 @@ Val::calckey(Par par, unsigned i) const Chs* chs = col.m_chs; CHARSET_INFO* cs = chs->m_cs; unsigned n = 0; - // our random chars may not fill value exactly - while (n + cs->mbmaxlen <= col.m_bytelength) { - if (i % (1 + n) == 0) { - break; - } - const Chr& chr = chs->m_chr[i % maxcharcount]; - memcpy(&m_char[n], chr.m_bytes, chr.m_size); - n += chr.m_size; - } - // this will extend by appropriate space + calckeychars(par, i, n, m_char); + // extend by appropriate space (*cs->cset->fill)(cs, (char*)&m_char[n], col.m_bytelength - n, 0x20); } break; + case Col::Varchar: + { + unsigned n = 0; + calckeychars(par, i, n, m_varchar + 1); + // set length and pad with nulls + m_varchar[0] = n; + memset(&m_varchar[1 + n], 0, col.m_bytelength - n); + } + break; + case Col::Longvarchar: + { + unsigned n = 0; + calckeychars(par, i, n, m_longvarchar + 2); + // set length and pad with nulls + m_longvarchar[0] = (n & 0xff); + m_longvarchar[1] = (n >> 8); + memset(&m_longvarchar[2 + n], 0, col.m_bytelength - n); + } + break; default: assert(false); break; @@ -1741,6 +1806,24 @@ Val::calckey(Par par, unsigned i) } void +Val::calckeychars(Par par, unsigned i, unsigned& n, unsigned char* buf) +{ + const Col& col = m_col; + const Chs* chs = col.m_chs; + CHARSET_INFO* cs = chs->m_cs; + n = 0; + // our random chars may not fill value exactly + while (n + cs->mbmaxlen <= col.m_bytelength) { + if (i % (1 + n) == 0) { + break; + } + const Chr& chr = chs->m_chr[i % maxcharcount]; + memcpy(buf + n, chr.m_bytes, chr.m_size); + n += chr.m_size; + } +} + +void Val::calcnokey(Par par) { const Col& col = m_col; @@ -1764,42 +1847,71 @@ Val::calcnokey(Par par) const Chs* chs = col.m_chs; CHARSET_INFO* cs = chs->m_cs; unsigned n = 0; - // our random chars may not fill value exactly - while (n + cs->mbmaxlen <= col.m_bytelength) { - if (urandom(1 + col.m_bytelength) == 0) { - break; - } - unsigned half = maxcharcount / 2; - int r = irandom((par.m_pctrange * half) / 100); - if (par.m_bdir != 0 && urandom(10) != 0) { - if (r < 0 && par.m_bdir > 0 || r > 0 && par.m_bdir < 0) - r = -r; - } - unsigned i = half + r; - assert(i < maxcharcount); - const Chr& chr = chs->m_chr[i]; - memcpy(&m_char[n], chr.m_bytes, chr.m_size); - n += chr.m_size; - } - // this will extend by appropriate space + calcnokeychars(par, n, m_char); + // extend by appropriate space (*cs->cset->fill)(cs, (char*)&m_char[n], col.m_bytelength - n, 0x20); } break; + case Col::Varchar: + { + unsigned n = 0; + calcnokeychars(par, n, m_varchar + 1); + // set length and pad with nulls + m_varchar[0] = n; + memset(&m_varchar[1 + n], 0, col.m_bytelength - n); + } + break; + case Col::Longvarchar: + { + unsigned n = 0; + calcnokeychars(par, n, m_longvarchar + 2); + // set length and pad with nulls + m_longvarchar[0] = (n & 0xff); + m_longvarchar[1] = (n >> 8); + memset(&m_longvarchar[2 + n], 0, col.m_bytelength - n); + } + break; default: assert(false); break; } } +void +Val::calcnokeychars(Par par, unsigned& n, unsigned char* buf) +{ + const Col& col = m_col; + const Chs* chs = col.m_chs; + CHARSET_INFO* cs = chs->m_cs; + n = 0; + // our random chars may not fill value exactly + while (n + cs->mbmaxlen <= col.m_bytelength) { + if (urandom(1 + col.m_bytelength) == 0) { + break; + } + unsigned half = maxcharcount / 2; + int r = irandom((par.m_pctrange * half) / 100); + if (par.m_bdir != 0 && urandom(10) != 0) { + if (r < 0 && par.m_bdir > 0 || r > 0 && par.m_bdir < 0) + r = -r; + } + unsigned i = half + r; + assert(i < maxcharcount); + const Chr& chr = chs->m_chr[i]; + memcpy(buf + n, chr.m_bytes, chr.m_size); + n += chr.m_size; + } +} + int -Val::verify(const Val& val2) const +Val::verify(Par par, const Val& val2) const { - CHK(cmp(val2) == 0); + CHK(cmp(par, val2) == 0); return 0; } int -Val::cmp(const Val& val2) const +Val::cmp(Par par, const Val& val2) const { const Col& col = m_col; const Col& col2 = val2.m_col; @@ -1812,8 +1924,8 @@ Val::cmp(const Val& val2) const return 0; } // verify data formats - col.verify(dataaddr()); - col.verify(val2.dataaddr()); + col.wellformed(dataaddr()); + col.wellformed(val2.dataaddr()); // compare switch (col.m_type) { case Col::Unsigned: @@ -1827,25 +1939,22 @@ Val::cmp(const Val& val2) const break; case Col::Char: { - const Chs* chs = col.m_chs; - CHARSET_INFO* cs = chs->m_cs; unsigned len = col.m_bytelength; - int k; - if (! g_opt.m_collsp) { - unsigned char x1[maxxmulsize * 8000]; - unsigned char x2[maxxmulsize * 8000]; - int n1 = (*cs->coll->strnxfrm)(cs, x1, chs->m_xmul * len, m_char, len); - int n2 = (*cs->coll->strnxfrm)(cs, x2, chs->m_xmul * len, val2.m_char, len); - // currently same but do not assume it - unsigned n = (n1 > n2 ? n1 : n2); - // assume null padding - memset(x1 + n1, 0x0, n - n1); - memset(x2 + n2, 0x0, n - n2); - k = memcmp(x1, x2, n); - } else { - k = (*cs->coll->strnncollsp)(cs, m_char, len, val2.m_char, len, false); - } - return k < 0 ? -1 : k > 0 ? +1 : 0; + return cmpchars(par, m_char, len, val2.m_char, len); + } + break; + case Col::Varchar: + { + unsigned len1 = m_varchar[0]; + unsigned len2 = val2.m_varchar[0]; + return cmpchars(par, m_varchar + 1, len1, val2.m_varchar + 1, len2); + } + break; + case Col::Longvarchar: + { + unsigned len1 = m_longvarchar[0] + (m_longvarchar[1] << 8); + unsigned len2 = val2.m_longvarchar[0] + (val2.m_longvarchar[1] << 8); + return cmpchars(par, m_longvarchar + 2, len1, val2.m_longvarchar + 2, len2); } break; default: @@ -1855,6 +1964,56 @@ Val::cmp(const Val& val2) const return 0; } +int +Val::cmpchars(Par par, const unsigned char* buf1, unsigned len1, const unsigned char* buf2, unsigned len2) const +{ + const Col& col = m_col; + const Chs* chs = col.m_chs; + CHARSET_INFO* cs = chs->m_cs; + int k; + if (! par.m_collsp) { + unsigned char x1[maxxmulsize * 8000]; + unsigned char x2[maxxmulsize * 8000]; + // make strxfrm pad both to same length + unsigned len = maxxmulsize * col.m_bytelength; + int n1 = NdbSqlUtil::strnxfrm_bug7284(cs, x1, chs->m_xmul * len, buf1, len1); + int n2 = NdbSqlUtil::strnxfrm_bug7284(cs, x2, chs->m_xmul * len, buf2, len2); + assert(n1 == n2); + k = memcmp(x1, x2, n1); + } else { + k = (*cs->coll->strnncollsp)(cs, buf1, len1, buf2, len2, false); + } + return k < 0 ? -1 : k > 0 ? +1 : 0; +} + +static void +printstring(NdbOut& out, const unsigned char* str, unsigned len, bool showlen) +{ + char buf[4 * 8000]; + char *p = buf; + *p++ = '['; + if (showlen) { + sprintf(p, "%u:", len); + p += strlen(p); + } + for (unsigned i = 0; i < len; i++) { + unsigned char c = str[i]; + if (c == '\\') { + *p++ = '\\'; + *p++ = c; + } else if (0x20 <= c && c < 0x7e) { + *p++ = c; + } else { + *p++ = '\\'; + *p++ = hexstr[c >> 4]; + *p++ = hexstr[c & 15]; + } + } + *p++ = ']'; + *p = 0; + out << buf; +} + static NdbOut& operator<<(NdbOut& out, const Val& val) { @@ -1869,25 +2028,20 @@ operator<<(NdbOut& out, const Val& val) break; case Col::Char: { - char buf[4 * 8000]; - char *p = buf; - *p++ = '['; - for (unsigned i = 0; i < col.m_bytelength; i++) { - unsigned char c = val.m_char[i]; - if (c == '\\') { - *p++ = '\\'; - *p++ = '\\'; - } else if (0x20 <= c && c < 0x7e) { - *p++ = c; - } else { - *p++ = '\\'; - *p++ = hexstr[c >> 4]; - *p++ = hexstr[c & 15]; - } - } - *p++ = ']'; - *p = 0; - out << buf; + unsigned len = col.m_bytelength; + printstring(out, val.m_char, len, false); + } + break; + case Col::Varchar: + { + unsigned len = val.m_varchar[0]; + printstring(out, val.m_varchar + 1, len, true); + } + break; + case Col::Longvarchar: + { + unsigned len = val.m_longvarchar[0] + (val.m_longvarchar[1] << 8); + printstring(out, val.m_longvarchar + 2, len, true); } break; default: @@ -1912,7 +2066,7 @@ struct Row { void copy(const Row& row2); void calc(Par par, unsigned i, unsigned mask = 0); const Row& dbrow() const; - int verify(const Row& row2) const; + int verify(Par par, const Row& row2) const; int insrow(Par par); int updrow(Par par); int updrow(Par par, const ITab& itab); @@ -1921,8 +2075,8 @@ struct Row { int selrow(Par par); int selrow(Par par, const ITab& itab); int setrow(Par par); - int cmp(const Row& row2) const; - int cmp(const Row& row2, const ITab& itab) const; + int cmp(Par par, const Row& row2) const; + int cmp(Par par, const Row& row2, const ITab& itab) const; private: Row& operator=(const Row& row2); }; @@ -1994,7 +2148,7 @@ Row::dbrow() const } int -Row::verify(const Row& row2) const +Row::verify(Par par, const Row& row2) const { const Tab& tab = m_tab; const Row& row1 = *this; @@ -2002,7 +2156,7 @@ Row::verify(const Row& row2) const for (unsigned k = 0; k < tab.m_cols; k++) { const Val& val1 = *row1.m_val[k]; const Val& val2 = *row2.m_val[k]; - CHK(val1.verify(val2) == 0); + CHK(val1.verify(par, val2) == 0); } return 0; } @@ -2169,7 +2323,7 @@ Row::setrow(Par par) } int -Row::cmp(const Row& row2) const +Row::cmp(Par par, const Row& row2) const { const Tab& tab = m_tab; assert(&tab == &row2.m_tab); @@ -2177,14 +2331,14 @@ Row::cmp(const Row& row2) const for (unsigned k = 0; k < tab.m_cols; k++) { const Val& val = *m_val[k]; const Val& val2 = *row2.m_val[k]; - if ((c = val.cmp(val2)) != 0) + if ((c = val.cmp(par, val2)) != 0) break; } return c; } int -Row::cmp(const Row& row2, const ITab& itab) const +Row::cmp(Par par, const Row& row2, const ITab& itab) const { const Tab& tab = m_tab; int c = 0; @@ -2195,7 +2349,7 @@ Row::cmp(const Row& row2, const ITab& itab) const assert(k < tab.m_cols); const Val& val = *m_val[k]; const Val& val2 = *row2.m_val[k]; - if ((c = val.cmp(val2)) != 0) + if ((c = val.cmp(par, val2)) != 0) break; } return c; @@ -2283,8 +2437,8 @@ struct Set { int getkey(Par par, unsigned* i); int putval(unsigned i, bool force, unsigned n = ~0); // verify - int verify(const Set& set2) const; - int verifyorder(const ITab& itab, bool descending) const; + int verify(Par par, const Set& set2) const; + int verifyorder(Par par, const ITab& itab, bool descending) const; // protect structure NdbMutex* m_mutex; void lock() const { @@ -2616,7 +2770,7 @@ Set::putval(unsigned i, bool force, unsigned n) // verify int -Set::verify(const Set& set2) const +Set::verify(Par par, const Set& set2) const { assert(&m_tab == &set2.m_tab && m_rows == set2.m_rows); LL4("verify set1 count=" << count() << " vs set2 count=" << set2.count()); @@ -2625,7 +2779,7 @@ Set::verify(const Set& set2) const if (exist(i) != set2.exist(i)) { ok = false; } else if (exist(i)) { - if (dbrow(i).verify(set2.dbrow(i)) != 0) + if (dbrow(i).verify(par, set2.dbrow(i)) != 0) ok = false; } if (! ok) { @@ -2637,7 +2791,7 @@ Set::verify(const Set& set2) const } int -Set::verifyorder(const ITab& itab, bool descending) const +Set::verifyorder(Par par, const ITab& itab, bool descending) const { const Tab& tab = m_tab; for (unsigned n = 0; n < m_rows; n++) { @@ -2652,9 +2806,9 @@ Set::verifyorder(const ITab& itab, bool descending) const const Row& row2 = *m_row[i2]; assert(row1.m_exist && row2.m_exist); if (! descending) - CHK(row1.cmp(row2, itab) <= 0); + CHK(row1.cmp(par, row2, itab) <= 0); else - CHK(row1.cmp(row2, itab) >= 0); + CHK(row1.cmp(par, row2, itab) >= 0); } return 0; } @@ -2724,7 +2878,7 @@ struct BSet { void calc(Par par); void calcpk(Par par, unsigned i); int setbnd(Par par) const; - void filter(const Set& set, Set& set2) const; + void filter(Par par, const Set& set, Set& set2) const; }; BSet::BSet(const Tab& tab, const ITab& itab, unsigned rows) : @@ -2797,7 +2951,7 @@ BSet::calc(Par par) assert(m_bvals >= 2); const BVal& bv1 = *m_bval[m_bvals - 2]; const BVal& bv2 = *m_bval[m_bvals - 1]; - if (bv1.cmp(bv2) > 0 && urandom(100) != 0) + if (bv1.cmp(par, bv2) > 0 && urandom(100) != 0) continue; } } while (0); @@ -2848,7 +3002,7 @@ BSet::setbnd(Par par) const } void -BSet::filter(const Set& set, Set& set2) const +BSet::filter(Par par, const Set& set, Set& set2) const { const Tab& tab = m_tab; const ITab& itab = m_itab; @@ -2880,7 +3034,7 @@ BSet::filter(const Set& set, Set& set2) const const ICol& icol = bval.m_icol; const Col& col = icol.m_col; const Val& val = *row.m_val[col.m_num]; - int ret = bval.cmp(val); + int ret = bval.cmp(par, val); LL5("cmp: ret=" << ret << " " << bval << " vs " << val); if (bval.m_type == 0) ok2 = (ret <= 0); @@ -3110,7 +3264,7 @@ pkread(Par par) con.closeTransaction(); } if (par.m_verify) - CHK(set1.verify(set2) == 0); + CHK(set1.verify(par, set2) == 0); return 0; } @@ -3275,7 +3429,7 @@ hashindexread(Par par, const ITab& itab) con.closeTransaction(); } if (par.m_verify) - CHK(set1.verify(set2) == 0); + CHK(set1.verify(par, set2) == 0); return 0; } @@ -3289,12 +3443,11 @@ scanreadtable(Par par) const Set& set = par.set(); // expected const Set& set1 = set; - LL3("scanread " << tab.m_name << " verify=" << par.m_verify); - LL4("expect " << set.count() << " rows"); + LL3("scanread " << tab.m_name << " lockmode=" << par.m_lockmode << " expect=" << set1.count() << " verify=" << par.m_verify); Set set2(tab, set.m_rows); CHK(con.startTransaction() == 0); CHK(con.getNdbScanOperation(tab) == 0); - CHK(con.openScanRead(par.m_scanbat, par.m_scanpar) == 0); + CHK(con.readTuples(par) == 0); set2.getval(par); CHK(con.executeScan() == 0); unsigned n = 0; @@ -3317,7 +3470,7 @@ scanreadtable(Par par) } con.closeTransaction(); if (par.m_verify) - CHK(set1.verify(set2) == 0); + CHK(set1.verify(par, set2) == 0); LL3("scanread " << tab.m_name << " done rows=" << n); return 0; } @@ -3331,7 +3484,7 @@ scanreadtablefast(Par par, unsigned countcheck) LL3("scanfast " << tab.m_name); CHK(con.startTransaction() == 0); CHK(con.getNdbScanOperation(tab) == 0); - CHK(con.openScanRead(par.m_scanbat, par.m_scanpar) == 0); + CHK(con.readTuples(par) == 0); // get 1st column NdbRecAttr* rec; CHK(con.getValue((Uint32)0, rec) == 0); @@ -3355,12 +3508,11 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc) Con& con = par.con(); const Tab& tab = par.tab(); const Set& set = par.set(); - LL4(bset); Set set1(tab, set.m_rows); if (calc) { while (true) { bset.calc(par); - bset.filter(set, set1); + bset.filter(par, set, set1); unsigned n = set1.count(); // prefer proper subset if (0 < n && n < set.m_rows) @@ -3370,17 +3522,13 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc) set1.reset(); } } else { - bset.filter(set, set1); + bset.filter(par, set, set1); } - LL3("scanread " << itab.m_name << " bounds=" << bset << " verify=" << par.m_verify << " ordered=" << par.m_ordered << " descending=" << par.m_descending); - LL4("expect " << set1.count() << " rows"); + LL3("scanread " << itab.m_name << " " << bset << " lockmode=" << par.m_lockmode << " expect=" << set1.count() << " verify=" << par.m_verify << " ordered=" << par.m_ordered << " descending=" << par.m_descending); Set set2(tab, set.m_rows); CHK(con.startTransaction() == 0); - CHK(con.getNdbScanOperation(itab, tab) == 0); - if (! par.m_ordered) - CHK(con.openScanRead(par.m_scanbat, par.m_scanpar) == 0); - else - CHK(con.openScanOrdered(par.m_scanbat, par.m_scanpar, par.m_descending) == 0); + CHK(con.getNdbIndexScanOperation(itab, tab) == 0); + CHK(con.readIndexTuples(par) == 0); CHK(bset.setbnd(par) == 0); set2.getval(par); CHK(con.executeScan() == 0); @@ -3404,9 +3552,9 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc) } con.closeTransaction(); if (par.m_verify) { - CHK(set1.verify(set2) == 0); + CHK(set1.verify(par, set2) == 0); if (par.m_ordered) - CHK(set2.verifyorder(itab, par.m_descending) == 0); + CHK(set2.verifyorder(par, itab, par.m_descending) == 0); } LL3("scanread " << itab.m_name << " done rows=" << n); return 0; @@ -3418,11 +3566,11 @@ scanreadindexfast(Par par, const ITab& itab, const BSet& bset, unsigned countche Con& con = par.con(); const Tab& tab = par.tab(); const Set& set = par.set(); - LL3("scanfast " << itab.m_name << " bounds=" << bset.m_bvals); + LL3("scanfast " << itab.m_name << " " << bset); LL4(bset); CHK(con.startTransaction() == 0); - CHK(con.getNdbScanOperation(itab, tab) == 0); - CHK(con.openScanRead(par.m_scanbat, par.m_scanpar) == 0); + CHK(con.getNdbIndexScanOperation(itab, tab) == 0); + CHK(con.readIndexTuples(par) == 0); CHK(bset.setbnd(par) == 0); // get 1st column NdbRecAttr* rec; @@ -3544,9 +3692,10 @@ scanupdatetable(Par par) Set& set = par.set(); LL3("scan update " << tab.m_name); Set set2(tab, set.m_rows); + par.m_lockmode = NdbOperation::LM_Exclusive; CHK(con.startTransaction() == 0); CHK(con.getNdbScanOperation(tab) == 0); - CHK(con.openScanExclusive(par.m_scanbat, par.m_scanpar) == 0); + CHK(con.readTuples(par) == 0); set2.getval(par); CHK(con.executeScan() == 0); unsigned count = 0; @@ -3641,12 +3790,10 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset) Set& set = par.set(); LL3("scan update " << itab.m_name); Set set2(tab, set.m_rows); + par.m_lockmode = NdbOperation::LM_Exclusive; CHK(con.startTransaction() == 0); - CHK(con.getNdbScanOperation(itab, tab) == 0); - if (! par.m_ordered) - CHK(con.openScanExclusive(par.m_scanbat, par.m_scanpar) == 0); - else - CHK(con.openScanOrderedExclusive(par.m_scanbat, par.m_scanpar, par.m_descending) == 0); + CHK(con.getNdbIndexScanOperation(itab, tab) == 0); + CHK(con.readTuples(par) == 0); CHK(bset.setbnd(par) == 0); set2.getval(par); CHK(con.executeScan() == 0); @@ -3777,6 +3924,7 @@ readverify(Par par) if (par.m_noverify) return 0; par.m_verify = true; + par.m_lockmode = NdbOperation::LM_CommittedRead; CHK(pkread(par) == 0); CHK(scanreadall(par) == 0); return 0; @@ -3788,19 +3936,24 @@ readverifyfull(Par par) if (par.m_noverify) return 0; par.m_verify = true; - if (par.m_no == 0) + par.m_lockmode = NdbOperation::LM_CommittedRead; + const Tab& tab = par.tab(); + if (par.m_no == 0) { + // thread 0 scans table CHK(scanreadtable(par) == 0); - else { - const Tab& tab = par.tab(); - unsigned i = par.m_no - 1; - if (i < tab.m_itabs && tab.m_itab[i] != 0) { - const ITab& itab = *tab.m_itab[i]; - if (itab.m_type == ITab::OrderedIndex) { - BSet bset(tab, itab, par.m_rows); - CHK(scanreadindex(par, itab, bset, false) == 0); - } else { - CHK(hashindexread(par, itab) == 0); - } + } + // each thread scans different indexes + for (unsigned i = 0; i < tab.m_itabs; i++) { + if (i % par.m_threads != par.m_no) + continue; + if (tab.m_itab[i] == 0) + continue; + const ITab& itab = *tab.m_itab[i]; + if (itab.m_type == ITab::OrderedIndex) { + BSet bset(tab, itab, par.m_rows); + CHK(scanreadindex(par, itab, bset, false) == 0); + } else { + CHK(hashindexread(par, itab) == 0); } } return 0; @@ -3809,7 +3962,10 @@ readverifyfull(Par par) static int readverifyindex(Par par) { + if (par.m_noverify) + return 0; par.m_verify = true; + par.m_lockmode = NdbOperation::LM_CommittedRead; unsigned sel = urandom(10); if (sel < 9) { par.m_ordered = true; @@ -4411,15 +4567,30 @@ printtables() { Par par(g_opt); makebuiltintables(par); - ndbout << "builtin tables (x0 on pk, x=ordered z=hash):" << endl; + ndbout << "tables and indexes (x=ordered z=hash x0=on pk):" << endl; for (unsigned j = 0; j < tabcount; j++) { if (tablist[j] == 0) continue; const Tab& tab = *tablist[j]; - ndbout << " " << tab.m_name; + const char* tname = tab.m_name; + ndbout << " " << tname; for (unsigned i = 0; i < tab.m_itabs; i++) { + if (tab.m_itab[i] == 0) + continue; const ITab& itab = *tab.m_itab[i]; - ndbout << " " << itab.m_name; + const char* iname = itab.m_name; + if (strncmp(tname, iname, strlen(tname)) == 0) + iname += strlen(tname); + ndbout << " " << iname; + ndbout << "("; + for (unsigned k = 0; k < itab.m_icols; k++) { + if (k != 0) + ndbout << ","; + const ICol& icol = *itab.m_icol[k]; + const Col& col = icol.m_col; + ndbout << col.m_name; + } + ndbout << ")"; } ndbout << endl; } @@ -4434,9 +4605,12 @@ runtest(Par par) unsigned short seed = (getpid() ^ time(0)); LL1("random seed: " << seed); srandom((unsigned)seed); - } else if (par.m_seed != 0) + } else if (par.m_seed != 0) { LL1("random seed: " << par.m_seed); srandom(par.m_seed); + } else { + LL1("random seed: loop number"); + } // cs assert(par.m_csname != 0); if (strcmp(par.m_csname, "random") != 0) { diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index 290d73554ca..767ddc40400 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -416,6 +416,7 @@ static inline bool ndb_supported_type(enum_field_types type) case MYSQL_TYPE_YEAR: case MYSQL_TYPE_STRING: case MYSQL_TYPE_VAR_STRING: + case MYSQL_TYPE_VARCHAR: case MYSQL_TYPE_TINY_BLOB: case MYSQL_TYPE_BLOB: case MYSQL_TYPE_MEDIUM_BLOB: @@ -426,7 +427,6 @@ static inline bool ndb_supported_type(enum_field_types type) return TRUE; case MYSQL_TYPE_NULL: case MYSQL_TYPE_GEOMETRY: - case MYSQL_TYPE_VARCHAR: break; } return FALSE; @@ -1013,6 +1013,24 @@ inline ulong ha_ndbcluster::index_flags(uint idx_no, uint part, DBUG_RETURN(index_type_flags[get_index_type_from_table(idx_no)]); } +static void shrink_varchar(Field* field, const byte* & ptr, char* buf) +{ + if (field->type() == MYSQL_TYPE_VARCHAR) { + Field_varstring* f= (Field_varstring*)field; + if (f->length_bytes < 256) { + uint pack_len= field->pack_length(); + DBUG_ASSERT(1 <= pack_len && pack_len <= 256); + if (ptr[1] == 0) { + buf[0]= ptr[0]; + } else { + DBUG_ASSERT(false); + buf[0]= 255; + } + memmove(buf + 1, ptr + 2, pack_len - 1); + ptr= buf; + } + } +} int ha_ndbcluster::set_primary_key(NdbOperation *op, const byte *key) { @@ -1024,10 +1042,13 @@ int ha_ndbcluster::set_primary_key(NdbOperation *op, const byte *key) for (; key_part != end; key_part++) { Field* field= key_part->field; + const byte* ptr= key; + char buf[256]; + shrink_varchar(field, ptr, buf); if (set_ndb_key(op, field, - key_part->fieldnr-1, key)) + key_part->fieldnr-1, ptr)) ERR_RETURN(op->getNdbError()); - key += key_part->length; + key += key_part->store_length; } DBUG_RETURN(0); } @@ -1080,8 +1101,11 @@ ha_ndbcluster::set_index_key(NdbOperation *op, for (i= 0; key_part != end; key_part++, i++) { - if (set_ndb_key(op, key_part->field, i, - key_part->null_bit ? key_ptr + 1 : key_ptr)) + Field* field= key_part->field; + const byte* ptr= key_part->null_bit ? key_ptr + 1 : key_ptr; + char buf[256]; + shrink_varchar(field, ptr, buf); + if (set_ndb_key(op, field, i, ptr)) ERR_RETURN(m_active_trans->getNdbError()); key_ptr+= key_part->store_length; } @@ -1542,7 +1566,10 @@ int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op, char truncated_field_name[NDB_MAX_ATTR_NAME_SIZE]; strnmov(truncated_field_name,field->field_name,sizeof(truncated_field_name)); truncated_field_name[sizeof(truncated_field_name)-1]= '\0'; - if (op->setBound(truncated_field_name, p.bound_type, p.bound_ptr)) + const char* ptr= p.bound_ptr; + char buf[256]; + shrink_varchar(field, ptr, buf); + if (op->setBound(truncated_field_name, p.bound_type, ptr)) ERR_RETURN(op->getNdbError()); } } @@ -2343,21 +2370,26 @@ void ha_ndbcluster::print_results() my_snprintf(buf, sizeof(buf), "Decimal '%-*s'", field->pack_length(), value); break; } - case NdbDictionary::Column::Char:{ + case NdbDictionary::Column::Char: { const char *value= (char*)ptr; my_snprintf(buf, sizeof(buf), "Char '%.*s'", field->pack_length(), value); break; } - case NdbDictionary::Column::Varchar: - case NdbDictionary::Column::Binary: - case NdbDictionary::Column::Varbinary: { - const char *value= (char*)ptr; - my_snprintf(buf, sizeof(buf), "Var '%.*s'", field->pack_length(), value); + case NdbDictionary::Column::Varchar: { + uint len= *(uchar*)ptr; + const char *value= (char*)ptr + 1; + my_snprintf(buf, sizeof(buf), "Varchar (%u)'%.*s'", len, len, value); break; } - case NdbDictionary::Column::Bit: { + case NdbDictionary::Column::Binary: { const char *value= (char*)ptr; - my_snprintf(buf, sizeof(buf), "Bit '%.*s'", field->pack_length(), value); + my_snprintf(buf, sizeof(buf), "Binary '%.*s'", field->pack_length(), value); + break; + } + case NdbDictionary::Column::Varbinary: { + uint len= *(uchar*)ptr; + const char *value= (char*)ptr + 1; + my_snprintf(buf, sizeof(buf), "Varbinary (%u)'%.*s'", len, len, value); break; } case NdbDictionary::Column::Datetime: { @@ -2382,6 +2414,23 @@ void ha_ndbcluster::print_results() my_snprintf(buf, sizeof(buf), "Text [len=%u]", (unsigned)len); break; } + case NdbDictionary::Column::Bit: { + const char *value= (char*)ptr; + my_snprintf(buf, sizeof(buf), "Bit '%.*s'", field->pack_length(), value); + break; + } + case NdbDictionary::Column::Longvarchar: { + uint len= uint2korr(ptr); + const char *value= (char*)ptr + 2; + my_snprintf(buf, sizeof(buf), "Longvarchar (%u)'%.*s'", len, len, value); + break; + } + case NdbDictionary::Column::Longvarbinary: { + uint len= uint2korr(ptr); + const char *value= (char*)ptr + 2; + my_snprintf(buf, sizeof(buf), "Longvarbinary (%u)'%.*s'", len, len, value); + break; + } case NdbDictionary::Column::Undefined: my_snprintf(buf, sizeof(buf), "Unknown type: %d", col->getType()); break; @@ -3472,14 +3521,34 @@ static int create_ndb_column(NDBCOL &col, col.setLength(field->pack_length()); } break; - case MYSQL_TYPE_VAR_STRING: - if (field->flags & BINARY_FLAG) - col.setType(NDBCOL::Varbinary); - else { - col.setType(NDBCOL::Varchar); - col.setCharset(cs); + case MYSQL_TYPE_VAR_STRING: // ? + case MYSQL_TYPE_VARCHAR: + { + Field_varstring* f= (Field_varstring*)field; + if (f->length_bytes == 1) + { + if (field->flags & BINARY_FLAG) + col.setType(NDBCOL::Varbinary); + else { + col.setType(NDBCOL::Varchar); + col.setCharset(cs); + } + } + else if (f->length_bytes == 2) + { + if (field->flags & BINARY_FLAG) + col.setType(NDBCOL::Longvarbinary); + else { + col.setType(NDBCOL::Longvarchar); + col.setCharset(cs); + } + } + else + { + return HA_ERR_UNSUPPORTED; + } + col.setLength(field->field_length); } - col.setLength(field->pack_length()); break; // Blob types (all come in as MYSQL_TYPE_BLOB) mysql_type_tiny_blob: @@ -3599,7 +3668,7 @@ int ha_ndbcluster::create(const char *name, char name2[FN_HEADLEN]; bool create_from_engine= (info->table_options & HA_CREATE_FROM_ENGINE); - DBUG_ENTER("create"); + DBUG_ENTER("ha_ndbcluster::create"); DBUG_PRINT("enter", ("name: %s", name)); fn_format(name2, name, "", "",2); // Remove the .frm extension set_dbname(name2); @@ -3934,7 +4003,6 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): m_table_flags(HA_REC_NOT_IN_SEQ | HA_NULL_IN_KEY | HA_AUTO_PART_KEY | - HA_NO_VARCHAR | HA_NO_PREFIX_CHAR_KEYS | HA_NEED_READ_RANGE_BUFFER | HA_CAN_BIT_FIELD), |