summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mysql-test/r/ndb_alter_table.result8
-rw-r--r--mysql-test/r/ndb_bitfield.result2
-rw-r--r--mysql-test/r/ndb_charset.result142
-rw-r--r--mysql-test/t/ndb_charset.test80
-rw-r--r--ndb/include/kernel/signaldata/DictTabInfo.hpp71
-rw-r--r--ndb/include/ndb_constants.h6
-rw-r--r--ndb/include/ndbapi/NdbDictionary.hpp22
-rw-r--r--ndb/include/util/NdbSqlUtil.hpp33
-rw-r--r--ndb/src/common/util/NdbSqlUtil.cpp210
-rw-r--r--ndb/src/kernel/blocks/dbacc/DbaccMain.cpp10
-rw-r--r--ndb/src/kernel/blocks/dbdict/Dbdict.cpp4
-rw-r--r--ndb/src/kernel/blocks/dbtc/Dbtc.hpp2
-rw-r--r--ndb/src/kernel/blocks/dbtc/DbtcMain.cpp18
-rw-r--r--ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp43
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp15
-rw-r--r--ndb/src/ndbapi/NdbDictionary.cpp6
-rw-r--r--ndb/src/ndbapi/NdbDictionaryImpl.cpp63
-rw-r--r--ndb/src/ndbapi/NdbDictionaryImpl.hpp3
-rw-r--r--ndb/src/ndbapi/NdbOperationDefine.cpp44
-rw-r--r--ndb/src/ndbapi/NdbOperationSearch.cpp43
-rw-r--r--ndb/src/ndbapi/NdbRecAttr.cpp15
-rw-r--r--ndb/src/ndbapi/NdbTransaction.cpp4
-rw-r--r--ndb/test/ndbapi/testOIBasic.cpp628
-rw-r--r--sql/ha_ndbcluster.cc114
24 files changed, 1163 insertions, 423 deletions
diff --git a/mysql-test/r/ndb_alter_table.result b/mysql-test/r/ndb_alter_table.result
index 1661fa35d13..f3b9e962873 100644
--- a/mysql-test/r/ndb_alter_table.result
+++ b/mysql-test/r/ndb_alter_table.result
@@ -34,13 +34,13 @@ col5 enum('PENDING', 'ACTIVE', 'DISABLED') not null,
col6 int not null, to_be_deleted int) ENGINE=ndbcluster;
show table status;
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
-t1 ndbcluster 9 Dynamic 0 0 0 0 0 0 1 NULL NULL NULL latin1_swedish_ci NULL
+t1 ndbcluster 10 Dynamic 0 0 0 0 0 0 1 NULL NULL NULL latin1_swedish_ci NULL
SET SQL_MODE=NO_AUTO_VALUE_ON_ZERO;
insert into t1 values
(0,4,3,5,"PENDING",1,7),(NULL,4,3,5,"PENDING",1,7),(31,4,3,5,"PENDING",1,7), (7,4,3,5,"PENDING",1,7), (NULL,4,3,5,"PENDING",1,7), (100,4,3,5,"PENDING",1,7), (99,4,3,5,"PENDING",1,7), (8,4,3,5,"PENDING",1,7), (NULL,4,3,5,"PENDING",1,7);
show table status;
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
-t1 ndbcluster 9 Dynamic 9 0 0 0 0 0 101 NULL NULL NULL latin1_swedish_ci NULL
+t1 ndbcluster 10 Dynamic 9 0 0 0 0 0 101 NULL NULL NULL latin1_swedish_ci NULL
select * from t1 order by col1;
col1 col2 col3 col4 col5 col6 to_be_deleted
0 4 3 5 PENDING 1 7
@@ -60,7 +60,7 @@ change column col2 fourth varchar(30) not null after col3,
modify column col6 int not null first;
show table status;
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
-t1 ndbcluster 9 Dynamic 9 0 0 0 0 0 102 NULL NULL NULL latin1_swedish_ci NULL
+t1 ndbcluster 10 Dynamic 9 0 0 0 0 0 102 NULL NULL NULL latin1_swedish_ci NULL
select * from t1 order by col1;
col6 col1 col3 fourth col4 col4_5 col5 col7 col8
1 0 3 4 5 PENDING 0000-00-00 00:00:00
@@ -75,7 +75,7 @@ col6 col1 col3 fourth col4 col4_5 col5 col7 col8
insert into t1 values (2, NULL,4,3,5,99,"PENDING","EXTRA",'2004-01-01 00:00:00');
show table status;
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
-t1 ndbcluster 9 Dynamic 10 0 0 0 0 0 103 NULL NULL NULL latin1_swedish_ci NULL
+t1 ndbcluster 10 Dynamic 10 0 0 0 0 0 103 NULL NULL NULL latin1_swedish_ci NULL
select * from t1 order by col1;
col6 col1 col3 fourth col4 col4_5 col5 col7 col8
1 0 3 4 5 PENDING 0000-00-00 00:00:00
diff --git a/mysql-test/r/ndb_bitfield.result b/mysql-test/r/ndb_bitfield.result
index 1532697c428..66ec593e195 100644
--- a/mysql-test/r/ndb_bitfield.result
+++ b/mysql-test/r/ndb_bitfield.result
@@ -143,7 +143,7 @@ create table t1 (
pk1 bit(9) not null primary key,
b int
) engine=ndbcluster;
-ERROR HY000: Can't create table './test/t1.frm' (errno: 743)
+ERROR HY000: Can't create table './test/t1.frm' (errno: 739)
create table t1 (
pk1 int not null primary key,
b bit(9),
diff --git a/mysql-test/r/ndb_charset.result b/mysql-test/r/ndb_charset.result
index 00bc36a7c0d..752a4fba630 100644
--- a/mysql-test/r/ndb_charset.result
+++ b/mysql-test/r/ndb_charset.result
@@ -47,6 +47,40 @@ a
aAa
drop table t1;
create table t1 (
+a varchar(20) character set latin1 collate latin1_swedish_ci primary key
+) engine=ndb;
+insert into t1 values ('A'),('b '),('C '),('d '),('E'),('f');
+insert into t1 values('b');
+ERROR 23000: Duplicate entry 'b' for key 1
+insert into t1 values('a ');
+ERROR 23000: Duplicate entry 'a ' for key 1
+select a,length(a) from t1 order by a;
+a length(a)
+A 1
+b 2
+C 3
+d 7
+E 1
+f 1
+select a,length(a) from t1 order by a desc;
+a length(a)
+f 1
+E 1
+d 7
+C 3
+b 2
+A 1
+select * from t1 where a = 'a';
+a
+A
+select * from t1 where a = 'a ';
+a
+A
+select * from t1 where a = 'd';
+a
+d
+drop table t1;
+create table t1 (
p int primary key,
a char(3) character set latin1 collate latin1_bin not null,
unique key(a)
@@ -99,6 +133,42 @@ p a
drop table t1;
create table t1 (
p int primary key,
+a varchar(20) character set latin1 collate latin1_swedish_ci not null,
+unique key(a)
+) engine=ndb;
+insert into t1 values (1,'A'),(2,'b '),(3,'C '),(4,'d '),(5,'E'),(6,'f');
+insert into t1 values(99,'b');
+ERROR 23000: Duplicate entry '99' for key 1
+insert into t1 values(99,'a ');
+ERROR 23000: Duplicate entry '99' for key 1
+select a,length(a) from t1 order by a;
+a length(a)
+A 1
+b 2
+C 3
+d 7
+E 1
+f 1
+select a,length(a) from t1 order by a desc;
+a length(a)
+f 1
+E 1
+d 7
+C 3
+b 2
+A 1
+select * from t1 where a = 'a';
+p a
+1 A
+select * from t1 where a = 'a ';
+p a
+1 A
+select * from t1 where a = 'd';
+p a
+4 d
+drop table t1;
+create table t1 (
+p int primary key,
a char(3) character set latin1 collate latin1_bin not null,
index(a)
) engine=ndb;
@@ -190,7 +260,77 @@ p a
6 AAA
drop table t1;
create table t1 (
-a varchar(10) primary key
+p int primary key,
+a varchar(20) character set latin1 collate latin1_swedish_ci not null,
+index(a, p)
+) engine=ndb;
+insert into t1 values (1,'A'),(2,'b '),(3,'C '),(4,'d '),(5,'E'),(6,'f');
+insert into t1 values (7,'a'),(8,'B '),(9,'c '),(10,'D'),(11,'e'),(12,'F ');
+select p,a,length(a) from t1 order by a, p;
+p a length(a)
+1 A 1
+7 a 1
+2 b 2
+8 B 2
+3 C 3
+9 c 3
+4 d 7
+10 D 1
+5 E 1
+11 e 1
+6 f 1
+12 F 3
+select * from t1 where a = 'a ' order by a desc, p desc;
+p a
+7 a
+1 A
+select * from t1 where a >= 'D' order by a, p;
+p a
+4 d
+10 D
+5 E
+11 e
+6 f
+12 F
+select * from t1 where a < 'D' order by a, p;
+p a
+1 A
+7 a
+2 b
+8 B
+3 C
+9 c
+select count(*) from t1 x, t1 y, t1 z where x.a = y.a and y.a = z.a;
+count(*)
+48
+drop table t1;
+create table t1 (
+a char(5) character set ucs2,
+b varchar(7) character set utf8,
+primary key(a, b)
+) engine=ndb;
+insert into t1 values
+('a','A '),('B ','b'),('c','C '),('D','d'),('e ','E'),('F','f '),
+('A','b '),('b ','C'),('C','d '),('d','E'),('E ','f'),
+('a','C '),('B ','d'),('c','E '),('D','f');
+insert into t1 values('d','f');
+ERROR 23000: Duplicate entry '' for key 1
+select a,b,length(a),length(b) from t1 order by a,b limit 3;
+a b length(a) length(b)
+a A 2 2
+A b 2 2
+a C 2 2
+select a,b,length(a),length(b) from t1 order by a desc, b desc limit 3;
+a b length(a) length(b)
+F f 2 3
+E f 2 1
+e E 2 1
+select a,b,length(a),length(b) from t1 where a='c' and b='c';
+a b length(a) length(b)
+c C 2 5
+drop table t1;
+create table t1 (
+a char(10) primary key
) engine=ndb;
insert into t1 values ('jonas % ');
replace into t1 values ('jonas % ');
diff --git a/mysql-test/t/ndb_charset.test b/mysql-test/t/ndb_charset.test
index 1b9e7e8bfcc..ab2bbcc3ad8 100644
--- a/mysql-test/t/ndb_charset.test
+++ b/mysql-test/t/ndb_charset.test
@@ -53,6 +53,25 @@ select * from t1 where a = 'AaA';
select * from t1 where a = 'AAA';
drop table t1;
+# pk - varchar
+
+create table t1 (
+ a varchar(20) character set latin1 collate latin1_swedish_ci primary key
+) engine=ndb;
+#
+insert into t1 values ('A'),('b '),('C '),('d '),('E'),('f');
+-- error 1062
+insert into t1 values('b');
+-- error 1062
+insert into t1 values('a ');
+#
+select a,length(a) from t1 order by a;
+select a,length(a) from t1 order by a desc;
+select * from t1 where a = 'a';
+select * from t1 where a = 'a ';
+select * from t1 where a = 'd';
+drop table t1;
+
# unique hash index - binary
create table t1 (
@@ -102,6 +121,27 @@ select * from t1 where a = 'AaA';
select * from t1 where a = 'AAA';
drop table t1;
+# unique hash index - varchar
+
+create table t1 (
+ p int primary key,
+ a varchar(20) character set latin1 collate latin1_swedish_ci not null,
+ unique key(a)
+) engine=ndb;
+#
+insert into t1 values (1,'A'),(2,'b '),(3,'C '),(4,'d '),(5,'E'),(6,'f');
+-- error 1062
+insert into t1 values(99,'b');
+-- error 1062
+insert into t1 values(99,'a ');
+#
+select a,length(a) from t1 order by a;
+select a,length(a) from t1 order by a desc;
+select * from t1 where a = 'a';
+select * from t1 where a = 'a ';
+select * from t1 where a = 'd';
+drop table t1;
+
# ordered index - binary
create table t1 (
@@ -158,9 +198,47 @@ select * from t1 where a = 'AaA' order by p;
select * from t1 where a = 'AAA' order by p;
drop table t1;
+# ordered index - varchar
+
+create table t1 (
+ p int primary key,
+ a varchar(20) character set latin1 collate latin1_swedish_ci not null,
+ index(a, p)
+) engine=ndb;
+#
+insert into t1 values (1,'A'),(2,'b '),(3,'C '),(4,'d '),(5,'E'),(6,'f');
+insert into t1 values (7,'a'),(8,'B '),(9,'c '),(10,'D'),(11,'e'),(12,'F ');
+select p,a,length(a) from t1 order by a, p;
+select * from t1 where a = 'a ' order by a desc, p desc;
+select * from t1 where a >= 'D' order by a, p;
+select * from t1 where a < 'D' order by a, p;
+#
+select count(*) from t1 x, t1 y, t1 z where x.a = y.a and y.a = z.a;
+drop table t1;
+
+# minimal multi-byte test
+
+create table t1 (
+ a char(5) character set ucs2,
+ b varchar(7) character set utf8,
+ primary key(a, b)
+) engine=ndb;
+#
+insert into t1 values
+ ('a','A '),('B ','b'),('c','C '),('D','d'),('e ','E'),('F','f '),
+ ('A','b '),('b ','C'),('C','d '),('d','E'),('E ','f'),
+ ('a','C '),('B ','d'),('c','E '),('D','f');
+-- error 1062
+insert into t1 values('d','f');
+#
+select a,b,length(a),length(b) from t1 order by a,b limit 3;
+select a,b,length(a),length(b) from t1 order by a desc, b desc limit 3;
+select a,b,length(a),length(b) from t1 where a='c' and b='c';
+drop table t1;
+
# bug
create table t1 (
- a varchar(10) primary key
+ a char(10) primary key
) engine=ndb;
insert into t1 values ('jonas % ');
replace into t1 values ('jonas % ');
diff --git a/ndb/include/kernel/signaldata/DictTabInfo.hpp b/ndb/include/kernel/signaldata/DictTabInfo.hpp
index 616da05b3ae..cc8a647615c 100644
--- a/ndb/include/kernel/signaldata/DictTabInfo.hpp
+++ b/ndb/include/kernel/signaldata/DictTabInfo.hpp
@@ -269,7 +269,9 @@ public:
ExtTimespec = NdbSqlUtil::Type::Timespec,
ExtBlob = NdbSqlUtil::Type::Blob,
ExtText = NdbSqlUtil::Type::Text,
- ExtBit = NdbSqlUtil::Type::Bit
+ ExtBit = NdbSqlUtil::Type::Bit,
+ ExtLongvarchar = NdbSqlUtil::Type::Longvarchar,
+ ExtLongvarbinary = NdbSqlUtil::Type::Longvarbinary
};
// Attribute data interpretation
@@ -297,98 +299,91 @@ public:
return ((1 << AttributeSize) * AttributeArraySize + 31) >> 5;
}
- // translate to old kernel types and sizes
+ // compute old-sty|e attribute size and array size
inline bool
translateExtType() {
- AttributeType = ~0; // deprecated
switch (AttributeExtType) {
case DictTabInfo::ExtUndefined:
- break;
+ return false;
case DictTabInfo::ExtTinyint:
- AttributeSize = DictTabInfo::an8Bit;
- AttributeArraySize = AttributeExtLength;
- return true;
case DictTabInfo::ExtTinyunsigned:
AttributeSize = DictTabInfo::an8Bit;
AttributeArraySize = AttributeExtLength;
- return true;
+ break;
case DictTabInfo::ExtSmallint:
- AttributeSize = DictTabInfo::a16Bit;
- AttributeArraySize = AttributeExtLength;
- return true;
case DictTabInfo::ExtSmallunsigned:
AttributeSize = DictTabInfo::a16Bit;
AttributeArraySize = AttributeExtLength;
- return true;
+ break;
case DictTabInfo::ExtMediumint:
- AttributeSize = DictTabInfo::an8Bit;
- AttributeArraySize = 3 * AttributeExtLength;
- return true;
case DictTabInfo::ExtMediumunsigned:
AttributeSize = DictTabInfo::an8Bit;
AttributeArraySize = 3 * AttributeExtLength;
- return true;
+ break;
case DictTabInfo::ExtInt:
- AttributeSize = DictTabInfo::a32Bit;
- AttributeArraySize = AttributeExtLength;
- return true;
case DictTabInfo::ExtUnsigned:
AttributeSize = DictTabInfo::a32Bit;
AttributeArraySize = AttributeExtLength;
- return true;
+ break;
case DictTabInfo::ExtBigint:
- AttributeSize = DictTabInfo::a64Bit;
- AttributeArraySize = AttributeExtLength;
- return true;
case DictTabInfo::ExtBigunsigned:
AttributeSize = DictTabInfo::a64Bit;
AttributeArraySize = AttributeExtLength;
- return true;
+ break;
case DictTabInfo::ExtFloat:
AttributeSize = DictTabInfo::a32Bit;
AttributeArraySize = AttributeExtLength;
- return true;
+ break;
case DictTabInfo::ExtDouble:
AttributeSize = DictTabInfo::a64Bit;
AttributeArraySize = AttributeExtLength;
- return true;
+ break;
case DictTabInfo::ExtDecimal:
// not yet implemented anywhere
- break;
+ return false;
case DictTabInfo::ExtChar:
case DictTabInfo::ExtBinary:
AttributeSize = DictTabInfo::an8Bit;
AttributeArraySize = AttributeExtLength;
- return true;
+ break;
case DictTabInfo::ExtVarchar:
case DictTabInfo::ExtVarbinary:
- // to fix
+ if (AttributeExtLength > 0xff)
+ return false;
AttributeSize = DictTabInfo::an8Bit;
- AttributeArraySize = AttributeExtLength + 2;
- return true;
+ AttributeArraySize = AttributeExtLength + 1;
+ break;
case DictTabInfo::ExtDatetime:
// to fix
AttributeSize = DictTabInfo::an8Bit;
AttributeArraySize = 8 * AttributeExtLength;
- return true;
+ break;
case DictTabInfo::ExtTimespec:
// to fix
AttributeSize = DictTabInfo::an8Bit;
AttributeArraySize = 12 * AttributeExtLength;
- return true;
+ break;
case DictTabInfo::ExtBlob:
case DictTabInfo::ExtText:
AttributeSize = DictTabInfo::an8Bit;
- // head + inline part [ attr precision lower half ]
+ // head + inline part (length in precision lower half)
AttributeArraySize = (NDB_BLOB_HEAD_SIZE << 2) + (AttributeExtPrecision & 0xFFFF);
- return true;
+ break;
case DictTabInfo::ExtBit:
AttributeSize = DictTabInfo::aBit;
AttributeArraySize = AttributeExtLength;
- return true;
+ break;
+ case DictTabInfo::ExtLongvarchar:
+ case DictTabInfo::ExtLongvarbinary:
+ if (AttributeExtLength > 0xffff)
+ return false;
+ AttributeSize = DictTabInfo::an8Bit;
+ AttributeArraySize = AttributeExtLength + 2;
+ break;
+ default:
+ return false;
};
-
- return false;
+ return true;
}
inline void print(FILE *out) {
diff --git a/ndb/include/ndb_constants.h b/ndb/include/ndb_constants.h
index 04d86e267f7..40a3d963955 100644
--- a/ndb/include/ndb_constants.h
+++ b/ndb/include/ndb_constants.h
@@ -21,7 +21,7 @@
* Changing the values makes database upgrade impossible.
*
* New or removed definitions must be replicated to
- * NdbDictionary.hpp and NdbSqlUtil.cpp.
+ * NdbDictionary.hpp and NdbSqlUtil.hpp.
*
* Not for use by application programs.
* Use the enums provided by NdbDictionary instead.
@@ -58,7 +58,9 @@
#define NDB_TYPE_BLOB 20
#define NDB_TYPE_TEXT 21
#define NDB_TYPE_BIT 22
+#define NDB_TYPE_LONG_VARCHAR 23
+#define NDB_TYPE_LONG_VARBINARY 24
-#define NDB_TYPE_MAX 23
+#define NDB_TYPE_MAX 25
#endif
diff --git a/ndb/include/ndbapi/NdbDictionary.hpp b/ndb/include/ndbapi/NdbDictionary.hpp
index 700591d6cdf..3f6368a5d60 100644
--- a/ndb/include/ndbapi/NdbDictionary.hpp
+++ b/ndb/include/ndbapi/NdbDictionary.hpp
@@ -149,14 +149,20 @@ public:
/**
* @class Column
- * @brief Represents an column in an NDB Cluster table
+ * @brief Represents a column in an NDB Cluster table
*
- * Each column has a type. The type of a column is determind by a number
+ * Each column has a type. The type of a column is determined by a number
* of type specifiers.
* The type specifiers are:
* - Builtin type
* - Array length or max length
- * - Precision and scale
+ * - Precision and scale (not used yet)
+ * - Character set for string types
+ * - Inline and part sizes for blobs
+ *
+ * Types in general correspond to MySQL types and their variants.
+ * Data formats are same as in MySQL. NDB API provides no support for
+ * constructing such formats. NDB kernel checks them however.
*/
class Column {
public:
@@ -179,14 +185,16 @@ public:
Double = NDB_TYPE_DOUBLE, ///< 64-bit float. 8 byte float, can be used in array
Decimal = NDB_TYPE_DECIMAL, ///< Precision, Scale are applicable
Char = NDB_TYPE_CHAR, ///< Len. A fixed array of 1-byte chars
- Varchar = NDB_TYPE_VARCHAR, ///< Max len
+ Varchar = NDB_TYPE_VARCHAR, ///< Length bytes: 1, Max: 255
Binary = NDB_TYPE_BINARY, ///< Len
- Varbinary = NDB_TYPE_VARBINARY, ///< Max len
+ Varbinary = NDB_TYPE_VARBINARY, ///< Length bytes: 1, Max: 255
Datetime = NDB_TYPE_DATETIME, ///< Precision down to 1 sec (sizeof(Datetime) == 8 bytes )
Timespec = NDB_TYPE_TIMESPEC, ///< Precision down to 1 nsec(sizeof(Datetime) == 12 bytes )
Blob = NDB_TYPE_BLOB, ///< Binary large object (see NdbBlob)
- Text = NDB_TYPE_TEXT, ///< Text blob,
- Bit = NDB_TYPE_BIT ///< Bit, length specifies no of bits
+ Text = NDB_TYPE_TEXT, ///< Text blob
+ Bit = NDB_TYPE_BIT, ///< Bit, length specifies no of bits
+ Longvarchar = NDB_TYPE_LONG_VARCHAR, ///< Length bytes: 2, little-endian
+ Longvarbinary = NDB_TYPE_LONG_VARBINARY ///< Length bytes: 2, little-endian
};
/**
diff --git a/ndb/include/util/NdbSqlUtil.hpp b/ndb/include/util/NdbSqlUtil.hpp
index 8ea3e9c8124..feb2b97c54b 100644
--- a/ndb/include/util/NdbSqlUtil.hpp
+++ b/ndb/include/util/NdbSqlUtil.hpp
@@ -20,6 +20,9 @@
#include <ndb_global.h>
#include <kernel/ndb_limits.h>
+struct charset_info_st;
+typedef struct charset_info_st CHARSET_INFO;
+
class NdbSqlUtil {
public:
/**
@@ -86,9 +89,11 @@ public:
Timespec = NDB_TYPE_TIMESPEC,
Blob = NDB_TYPE_BLOB,
Text = NDB_TYPE_TEXT,
- Bit = NDB_TYPE_BIT
+ Bit = NDB_TYPE_BIT,
+ Longvarchar = NDB_TYPE_LONG_VARCHAR,
+ Longvarbinary = NDB_TYPE_LONG_VARBINARY
};
- Enum m_typeId;
+ Enum m_typeId; // redundant
Cmp* m_cmp; // comparison method
};
@@ -98,16 +103,29 @@ public:
static const Type& getType(Uint32 typeId);
/**
- * Get type by id but replace char type by corresponding binary type.
+ * Get the normalized type used in hashing and key comparisons.
+ * Maps all string types to Binary.
*/
static const Type& getTypeBinary(Uint32 typeId);
/**
* Check character set.
*/
- static bool usable_in_pk(Uint32 typeId, const void* cs);
- static bool usable_in_hash_index(Uint32 typeId, const void* cs);
- static bool usable_in_ordered_index(Uint32 typeId, const void* cs);
+ static bool usable_in_pk(Uint32 typeId, const void* info);
+ static bool usable_in_hash_index(Uint32 typeId, const void* info);
+ static bool usable_in_ordered_index(Uint32 typeId, const void* info);
+
+ /**
+ * Get number of length bytes and length from variable length string.
+ * Returns false on error (invalid data). For other types returns
+ * zero length bytes and the fixed attribute length.
+ */
+ static bool get_var_length(Uint32 typeId, const void* p, unsigned attrlen, Uint32& lb, Uint32& len);
+
+ /**
+ * Temporary workaround for bug#7284.
+ */
+ static int strnxfrm_bug7284(CHARSET_INFO* cs, unsigned char* dst, unsigned dstLen, const unsigned char*src, unsigned srcLen);
private:
/**
@@ -138,6 +156,9 @@ private:
static Cmp cmpTimespec;
static Cmp cmpBlob;
static Cmp cmpText;
+ static Cmp cmpBit;
+ static Cmp cmpLongvarchar;
+ static Cmp cmpLongvarbinary;
};
#endif
diff --git a/ndb/src/common/util/NdbSqlUtil.cpp b/ndb/src/common/util/NdbSqlUtil.cpp
index 9ed791d2803..fd23781605c 100644
--- a/ndb/src/common/util/NdbSqlUtil.cpp
+++ b/ndb/src/common/util/NdbSqlUtil.cpp
@@ -71,7 +71,7 @@ NdbSqlUtil::char_like(const char* s1, unsigned n1,
}
/*
- * Data types.
+ * Data types. The entries must be in the numerical order.
*/
const NdbSqlUtil::Type
@@ -138,7 +138,7 @@ NdbSqlUtil::m_typeList[] = {
},
{
Type::Varchar,
- NULL // cmpVarchar
+ cmpVarchar
},
{
Type::Binary,
@@ -146,7 +146,7 @@ NdbSqlUtil::m_typeList[] = {
},
{
Type::Varbinary,
- NULL // cmpVarbinary
+ cmpVarbinary
},
{
Type::Datetime,
@@ -163,6 +163,18 @@ NdbSqlUtil::m_typeList[] = {
{
Type::Text,
NULL // cmpText
+ },
+ {
+ Type::Bit,
+ NULL // cmpBit
+ },
+ {
+ Type::Longvarchar,
+ cmpLongvarchar
+ },
+ {
+ Type::Longvarbinary,
+ cmpLongvarbinary
}
};
@@ -181,10 +193,12 @@ NdbSqlUtil::getTypeBinary(Uint32 typeId)
{
switch (typeId) {
case Type::Char:
- typeId = Type::Binary;
- break;
case Type::Varchar:
- typeId = Type::Varbinary;
+ case Type::Binary:
+ case Type::Varbinary:
+ case Type::Longvarchar:
+ case Type::Longvarbinary:
+ typeId = Type::Binary;
break;
case Type::Text:
typeId = Type::Blob;
@@ -425,11 +439,27 @@ NdbSqlUtil::cmpChar(const void* info, const void* p1, unsigned n1, const void* p
return k < 0 ? -1 : k > 0 ? +1 : 0;
}
-// waiting for MySQL and new NDB implementation
int
NdbSqlUtil::cmpVarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
- assert(false);
+ const unsigned lb = 1;
+ // collation does not work on prefix for some charsets
+ assert(full && n1 >= lb && n2 >= lb);
+ const uchar* v1 = (const uchar*)p1;
+ const uchar* v2 = (const uchar*)p2;
+ unsigned m1 = *v1;
+ unsigned m2 = *v2;
+ if (m1 <= n1 - lb && m2 <= n2 - lb) {
+ CHARSET_INFO* cs = (CHARSET_INFO*)(info);
+ // compare with space padding
+ int k = (*cs->coll->strnncollsp)(cs, v1 + lb, m1, v2 + lb, m2, false);
+ return k < 0 ? -1 : k > 0 ? +1 : 0;
+ }
+ // treat bad data as NULL
+ if (m1 > n1 - lb && m2 <= n2 - lb)
+ return -1;
+ if (m1 <= n1 - lb && m2 > n2 - lb)
+ return +1;
return 0;
}
@@ -442,20 +472,39 @@ NdbSqlUtil::cmpBinary(const void* info, const void* p1, unsigned n1, const void*
unsigned n = (n1 <= n2 ? n1 : n2);
int k = memcmp(v1, v2, n);
if (k == 0) {
- if (full)
- k = (int)n1 - (int)n2;
- else
- k = (int)n - (int)n2;
+ k = (full ? n1 : n) - n2;
}
return k < 0 ? -1 : k > 0 ? +1 : full ? 0 : CmpUnknown;
}
-// waiting for MySQL and new NDB implementation
int
NdbSqlUtil::cmpVarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
- assert(false);
- return 0;
+ const unsigned lb = 1;
+ if (n2 >= lb) {
+ assert(n1 >= lb);
+ const uchar* v1 = (const uchar*)p1;
+ const uchar* v2 = (const uchar*)p2;
+ unsigned m1 = *v1;
+ unsigned m2 = *v2;
+ if (m1 <= n1 - lb && m2 <= n2 - lb) {
+ // compare as binary strings
+ unsigned m = (m1 <= m2 ? m1 : m2);
+ int k = memcmp(v1 + lb, v2 + lb, m);
+ if (k == 0) {
+ k = (full ? m1 : m) - m2;
+ }
+ return k < 0 ? -1 : k > 0 ? +1 : full ? 0 : CmpUnknown;
+ }
+ // treat bad data as NULL
+ if (m1 > n1 - lb && m2 <= n2 - lb)
+ return -1;
+ if (m1 <= n1 - lb && m2 > n2 - lb)
+ return +1;
+ return 0;
+ }
+ assert(! full);
+ return CmpUnknown;
}
// allowed but ordering is wrong before wl-1442 done
@@ -489,6 +538,68 @@ NdbSqlUtil::cmpText(const void* info, const void* p1, unsigned n1, const void* p
return 0;
}
+// not yet
+int
+NdbSqlUtil::cmpBit(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+{
+ assert(false);
+ return 0;
+}
+
+int
+NdbSqlUtil::cmpLongvarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+{
+ const unsigned lb = 2;
+ // collation does not work on prefix for some charsets
+ assert(full && n1 >= lb && n2 >= lb);
+ const uchar* v1 = (const uchar*)p1;
+ const uchar* v2 = (const uchar*)p2;
+ unsigned m1 = uint2korr(v1);
+ unsigned m2 = uint2korr(v2);
+ if (m1 <= n1 - lb && m2 <= n2 - lb) {
+ CHARSET_INFO* cs = (CHARSET_INFO*)(info);
+ // compare with space padding
+ int k = (*cs->coll->strnncollsp)(cs, v1 + lb, m1, v2 + lb, m2, false);
+ return k < 0 ? -1 : k > 0 ? +1 : 0;
+ }
+ // treat bad data as NULL
+ if (m1 > n1 - lb && m2 <= n2 - lb)
+ return -1;
+ if (m1 <= n1 - lb && m2 > n2 - lb)
+ return +1;
+ return 0;
+}
+
+int
+NdbSqlUtil::cmpLongvarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+{
+ const unsigned lb = 2;
+ if (n2 >= lb) {
+ assert(n1 >= lb);
+ const uchar* v1 = (const uchar*)p1;
+ const uchar* v2 = (const uchar*)p2;
+ unsigned m1 = uint2korr(v1);
+ unsigned m2 = uint2korr(v2);
+ if (m1 <= n1 - lb && m2 <= n2 - lb) {
+ // compare as binary strings
+ unsigned m = (m1 <= m2 ? m1 : m2);
+ int k = memcmp(v1 + lb, v2 + lb, m);
+ if (k == 0) {
+ k = (full ? m1 : m) - m2;
+ }
+ return k < 0 ? -1 : k > 0 ? +1 : full ? 0 : CmpUnknown;
+ }
+ // treat bad data as NULL
+ if (m1 > n1 - lb && m2 <= n2 - lb)
+ return -1;
+ if (m1 <= n1 - lb && m2 > n2 - lb)
+ return +1;
+ return 0;
+ }
+ assert(! full);
+ return CmpUnknown;
+}
+
// check charset
bool
@@ -508,8 +619,6 @@ NdbSqlUtil::usable_in_pk(Uint32 typeId, const void* info)
}
break;
case Type::Undefined:
- case Type::Varchar:
- case Type::Varbinary:
case Type::Blob:
case Type::Text:
break;
@@ -545,8 +654,6 @@ NdbSqlUtil::usable_in_ordered_index(Uint32 typeId, const void* info)
}
break;
case Type::Undefined:
- case Type::Varchar:
- case Type::Varbinary:
case Type::Blob:
case Type::Text:
break;
@@ -555,3 +662,68 @@ NdbSqlUtil::usable_in_ordered_index(Uint32 typeId, const void* info)
}
return false;
}
+
+// utilities
+
+bool
+NdbSqlUtil::get_var_length(Uint32 typeId, const void* p, unsigned attrlen, Uint32& lb, Uint32& len)
+{
+ const unsigned char* const src = (const unsigned char*)p;
+ switch (typeId) {
+ case NdbSqlUtil::Type::Varchar:
+ case NdbSqlUtil::Type::Varbinary:
+ lb = 1;
+ if (attrlen >= lb) {
+ len = src[0];
+ if (attrlen >= lb + len)
+ return true;
+ }
+ break;
+ case NdbSqlUtil::Type::Longvarchar:
+ case NdbSqlUtil::Type::Longvarbinary:
+ lb = 2;
+ if (attrlen >= lb) {
+ len = src[0] + (src[1] << 8);
+ if (attrlen >= lb + len)
+ return true;
+ }
+ break;
+ default:
+ lb = 0;
+ len = attrlen;
+ return true;
+ break;
+ }
+ return false;
+}
+
+// workaround
+
+int
+NdbSqlUtil::strnxfrm_bug7284(CHARSET_INFO* cs, unsigned char* dst, unsigned dstLen, const unsigned char*src, unsigned srcLen)
+{
+ unsigned char nsp[20]; // native space char
+ unsigned char xsp[20]; // strxfrm-ed space char
+#ifdef VM_TRACE
+ memset(nsp, 0x1f, sizeof(nsp));
+ memset(xsp, 0x1f, sizeof(xsp));
+#endif
+ // convert from unicode codepoint for space
+ int n1 = (*cs->cset->wc_mb)(cs, (my_wc_t)0x20, nsp, nsp + sizeof(nsp));
+ if (n1 <= 0)
+ return -1;
+ // strxfrm to binary
+ int n2 = (*cs->coll->strnxfrm)(cs, xsp, sizeof(xsp), nsp, n1);
+ if (n2 <= 0)
+ return -1;
+ // strxfrm argument string - returns no error indication
+ int n3 = (*cs->coll->strnxfrm)(cs, dst, dstLen, src, srcLen);
+ // pad with strxfrm-ed space chars
+ int n4 = n3;
+ while (n4 < (int)dstLen) {
+ dst[n4] = xsp[(n4 - n3) % n2];
+ n4++;
+ }
+ // no check for partial last
+ return dstLen;
+}
diff --git a/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
index 0db74a0b709..7033aecccf8 100644
--- a/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
+++ b/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
@@ -1861,12 +1861,18 @@ Dbacc::xfrmKeyData(Signal* signal)
dstWords = srcWords;
} else {
jam();
+ Uint32 typeId = AttributeDescriptor::getType(keyAttr.attributeDescriptor);
+ Uint32 lb, len;
+ bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len);
+ ndbrequire(ok);
Uint32 xmul = cs->strxfrm_multiply;
if (xmul == 0)
xmul = 1;
- Uint32 dstLen = xmul * srcBytes;
+ // see comment in DbtcMain.cpp
+ Uint32 dstLen = xmul * (srcBytes - lb);
ndbrequire(dstLen <= ((dstSize - dstPos) << 2));
- uint n = (*cs->coll->strnxfrm)(cs, dstPtr, dstLen, srcPtr, srcBytes);
+ int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len);
+ ndbrequire(n != -1);
while ((n & 3) != 0)
dstPtr[n++] = 0;
dstWords = (n >> 2);
diff --git a/ndb/src/kernel/blocks/dbdict/Dbdict.cpp b/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
index f62b1eda587..709cd8a4c0f 100644
--- a/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
+++ b/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
@@ -4827,9 +4827,7 @@ void Dbdict::handleTabInfo(SimpleProperties::Reader & it,
}
}
- /**
- * Ignore incoming old-style type and recompute it.
- */
+ // compute attribute size and array size
bool translateOk = attrDesc.translateExtType();
tabRequire(translateOk, CreateTableRef::Inconsistency);
diff --git a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
index 60f57c60f1f..0472d25318b 100644
--- a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
+++ b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
@@ -1446,7 +1446,7 @@ private:
void gcpTcfinished(Signal* signal);
void handleGcp(Signal* signal);
void hash(Signal* signal);
- Uint32 handle_special_hash(Uint32 dstHash[4],
+ bool handle_special_hash(Uint32 dstHash[4],
Uint32* src, Uint32 srcLen,
Uint32 tabPtrI, bool distr);
diff --git a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
index 2f371458b73..6d4ca2d9078 100644
--- a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
+++ b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
@@ -2314,7 +2314,7 @@ void Dbtc::hash(Signal* signal)
}//if
}//Dbtc::hash()
-Uint32
+bool
Dbtc::handle_special_hash(Uint32 dstHash[4], Uint32* src, Uint32 srcLen,
Uint32 tabPtrI,
bool distr)
@@ -2349,17 +2349,26 @@ Dbtc::handle_special_hash(Uint32 dstHash[4], Uint32* src, Uint32 srcLen,
dstWords = srcWords;
} else {
jam();
+ Uint32 typeId =
+ AttributeDescriptor::getType(keyAttr.attributeDescriptor);
+ Uint32 lb, len;
+ bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len);
+ ndbrequire(ok);
Uint32 xmul = cs->strxfrm_multiply;
if (xmul == 0)
xmul = 1;
- Uint32 dstLen = xmul * srcBytes;
+ /*
+ * Varchar is really Char. End spaces do not matter. To get
+ * same hash we blank-pad to maximum length via strnxfrm.
+ * TODO use MySQL charset-aware hash function instead
+ */
+ Uint32 dstLen = xmul * (srcBytes - lb);
ndbrequire(dstLen <= ((dstSize - dstPos) << 2));
- uint n = (*cs->coll->strnxfrm)(cs, dstPtr, dstLen, srcPtr, srcBytes);
+ uint n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len);
while ((n & 3) != 0) {
dstPtr[n++] = 0;
}
dstWords = (n >> 2);
-
}
dstPos += dstWords;
srcPos += srcWords;
@@ -2418,6 +2427,7 @@ Dbtc::handle_special_hash(Uint32 dstHash[4], Uint32* src, Uint32 srcLen,
md5_hash(tmp, (Uint64*)dst, dstPos);
dstHash[1] = tmp[1];
}
+ return true; // success
}
/*
diff --git a/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp b/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
index 015d555e67d..06b2b3f4cb4 100644
--- a/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
+++ b/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
@@ -363,25 +363,25 @@ Dbtup::readFixedSizeTHManyWordNotNULL(Uint32* outBuffer,
ljam();
Tablerec* regTabPtr = tabptr.p;
Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(attrDescriptor);
+ uchar* dstPtr = (uchar*)&outBuffer[indexBuf];
+ const uchar* srcPtr = (uchar*)&tTupleHeader[readOffset];
Uint32 i = AttributeOffset::getCharsetPos(attrDes2);
ndbrequire(i < regTabPtr->noOfCharsets);
CHARSET_INFO* cs = regTabPtr->charsetArray[i];
- Uint32 xmul = cs->strxfrm_multiply;
- if (xmul == 0)
- xmul = 1;
- Uint32 dstLen = xmul * srcBytes;
- Uint32 maxIndexBuf = indexBuf + (dstLen >> 2);
- if (maxIndexBuf <= maxRead) {
- ljam();
- uchar* dstPtr = (uchar*)&outBuffer[indexBuf];
- const uchar* srcPtr = (uchar*)&tTupleHeader[readOffset];
- const char* ssrcPtr = (const char*)srcPtr;
- // could verify data format optionally
- if (true ||
- (*cs->cset->well_formed_len)(cs, ssrcPtr, ssrcPtr + srcBytes, ZNIL) == srcBytes) {
+ Uint32 typeId = AttributeDescriptor::getType(attrDescriptor);
+ Uint32 lb, len;
+ bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len);
+ if (ok) {
+ Uint32 xmul = cs->strxfrm_multiply;
+ if (xmul == 0)
+ xmul = 1;
+ // see comment in DbtcMain.cpp
+ Uint32 dstLen = xmul * (srcBytes - lb);
+ Uint32 maxIndexBuf = indexBuf + (dstLen >> 2);
+ if (maxIndexBuf <= maxRead) {
ljam();
- // normalize
- Uint32 n = (*cs->coll->strnxfrm)(cs, dstPtr, dstLen, srcPtr, srcBytes);
+ int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len);
+ ndbrequire(n != -1);
while ((n & 3) != 0) {
dstPtr[n++] = 0;
}
@@ -393,11 +393,11 @@ Dbtup::readFixedSizeTHManyWordNotNULL(Uint32* outBuffer,
return true;
} else {
ljam();
- terrorCode = ZTUPLE_CORRUPTED_ERROR;
+ terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
}
} else {
ljam();
- terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
+ terrorCode = ZTUPLE_CORRUPTED_ERROR;
}
}
return false;
@@ -814,10 +814,15 @@ Dbtup::updateFixedSizeTHManyWordNotNULL(Uint32* inBuffer,
// not const in MySQL
CHARSET_INFO* cs = regTabPtr->charsetArray[i];
const char* ssrc = (const char*)&inBuffer[tInBufIndex + 1];
+ Uint32 lb, len;
+ if (! NdbSqlUtil::get_var_length(typeId, ssrc, bytes, lb, len)) {
+ ljam();
+ terrorCode = ZINVALID_CHAR_FORMAT;
+ return false;
+ }
// fast fix bug#7340
if (typeId != NDB_TYPE_TEXT &&
- (*cs->cset->well_formed_len)(cs, ssrc, ssrc+bytes, ZNIL) != bytes)
- {
+ (*cs->cset->well_formed_len)(cs, ssrc + lb, ssrc + lb + len, ZNIL) != len) {
ljam();
terrorCode = ZINVALID_CHAR_FORMAT;
return false;
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
index 84081e76a8c..a61b7c1f5ca 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
@@ -177,18 +177,29 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal)
dstWords = srcWords;
} else {
jam();
+ Uint32 typeId = descAttr.m_typeId;
+ Uint32 lb, len;
+ bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len);
+ if (! ok) {
+ jam();
+ scan.m_state = ScanOp::Invalid;
+ sig->errorCode = TuxBoundInfo::InvalidCharFormat;
+ return;
+ }
CHARSET_INFO* cs = all_charsets[descAttr.m_charset];
Uint32 xmul = cs->strxfrm_multiply;
if (xmul == 0)
xmul = 1;
- Uint32 dstLen = xmul * srcBytes;
+ // see comment in DbtcMain.cpp
+ Uint32 dstLen = xmul * (srcBytes - lb);
if (dstLen > ((dstSize - dstPos) << 2)) {
jam();
scan.m_state = ScanOp::Invalid;
sig->errorCode = TuxBoundInfo::TooMuchAttrInfo;
return;
}
- Uint32 n = (*cs->coll->strnxfrm)(cs, dstPtr, dstLen, srcPtr, srcBytes);
+ int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len);
+ ndbrequire(n != -1);
while ((n & 3) != 0) {
dstPtr[n++] = 0;
}
diff --git a/ndb/src/ndbapi/NdbDictionary.cpp b/ndb/src/ndbapi/NdbDictionary.cpp
index f641b8cd5a0..d4e4821ad39 100644
--- a/ndb/src/ndbapi/NdbDictionary.cpp
+++ b/ndb/src/ndbapi/NdbDictionary.cpp
@@ -937,6 +937,12 @@ operator<<(NdbOut& out, const NdbDictionary::Column& col)
case NdbDictionary::Column::Bit:
out << "Bit(" << col.getLength() << ")";
break;
+ case NdbDictionary::Column::Longvarchar:
+ out << "Longvarchar(" << col.getLength() << ";" << csname << ")";
+ break;
+ case NdbDictionary::Column::Longvarbinary:
+ out << "Longvarbinary(" << col.getLength() << ")";
+ break;
default:
out << "Type" << (Uint32)col.getType();
break;
diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/ndb/src/ndbapi/NdbDictionaryImpl.cpp
index 6bef5bcadfc..71e95ebafa6 100644
--- a/ndb/src/ndbapi/NdbDictionaryImpl.cpp
+++ b/ndb/src/ndbapi/NdbDictionaryImpl.cpp
@@ -143,6 +143,24 @@ NdbColumnImpl::init(Type t)
m_length = 4;
m_cs = default_cs;
break;
+ case Bit:
+ m_precision = 0;
+ m_scale = 0;
+ m_length = 1;
+ m_cs = NULL;
+ break;
+ case Longvarchar:
+ m_precision = 0;
+ m_scale = 0;
+ m_length = 1; // legal
+ m_cs = default_cs;
+ break;
+ case Longvarbinary:
+ m_precision = 0;
+ m_scale = 0;
+ m_length = 1; // legal
+ m_cs = NULL;
+ break;
case Undefined:
assert(false);
break;
@@ -1151,6 +1169,8 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret,
const Uint32 * data, Uint32 len,
bool fullyQualifiedNames)
{
+ DBUG_ENTER("NdbDictInterface::parseTableInfo");
+
SimplePropertiesLinearReader it(data, len);
DictTabInfo::Table tableDesc; tableDesc.init();
SimpleProperties::UnpackStatus s;
@@ -1160,7 +1180,7 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret,
true, true);
if(s != SimpleProperties::Break){
- return 703;
+ DBUG_RETURN(703);
}
const char * internalName = tableDesc.TableName;
const char * externalName = Ndb::externalizeTableName(internalName, fullyQualifiedNames);
@@ -1211,15 +1231,17 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret,
true, true);
if(s != SimpleProperties::Break){
delete impl;
- return 703;
+ DBUG_RETURN(703);
}
NdbColumnImpl * col = new NdbColumnImpl();
col->m_attrId = attrDesc.AttributeId;
col->setName(attrDesc.AttributeName);
- if (attrDesc.AttributeExtType >= NDB_TYPE_MAX) {
+
+ // check type and compute attribute size and array size
+ if (! attrDesc.translateExtType()) {
delete impl;
- return 703;
+ DBUG_RETURN(703);
}
col->m_type = (NdbDictionary::Column::Type)attrDesc.AttributeExtType;
col->m_precision = (attrDesc.AttributeExtPrecision & 0xFFFF);
@@ -1230,21 +1252,15 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret,
// charset is defined exactly for char types
if (col->getCharType() != (cs_number != 0)) {
delete impl;
- return 703;
+ DBUG_RETURN(703);
}
if (col->getCharType()) {
col->m_cs = get_charset(cs_number, MYF(0));
if (col->m_cs == NULL) {
delete impl;
- return 743;
+ DBUG_RETURN(743);
}
}
-
- // translate to old kernel types and sizes
- if (! attrDesc.translateExtType()) {
- delete impl;
- return 703;
- }
col->m_attrSize = (1 << attrDesc.AttributeSize) / 8;
col->m_arraySize = attrDesc.AttributeArraySize;
if(attrDesc.AttributeSize == 0)
@@ -1277,7 +1293,7 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret,
if(impl->m_columns[attrDesc.AttributeId] != 0){
delete col;
delete impl;
- return 703;
+ DBUG_RETURN(703);
}
impl->m_columns[attrDesc.AttributeId] = col;
it.next();
@@ -1288,7 +1304,7 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret,
impl->m_noOfBlobs = blobCount;
impl->m_noOfDistributionKeys = distKeys;
* ret = impl;
- return 0;
+ DBUG_RETURN(0);
}
/*****************************************************************
@@ -1448,7 +1464,7 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb,
if (col->m_autoIncrement) {
if (haveAutoIncrement) {
m_error.code = 4335;
- return -1;
+ DBUG_RETURN(-1);
}
haveAutoIncrement = true;
autoIncrementValue = col->m_autoIncrementInitialValue;
@@ -1498,14 +1514,16 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb,
tmpAttr.AttributeNullableFlag = col->m_nullable;
tmpAttr.AttributeDKey = col->m_distributionKey;
- if (col->m_type >= NDB_TYPE_MAX) {
- m_error.code = 703;
- return -1;
- }
tmpAttr.AttributeExtType = (Uint32)col->m_type;
tmpAttr.AttributeExtPrecision = ((unsigned)col->m_precision & 0xFFFF);
tmpAttr.AttributeExtScale = col->m_scale;
tmpAttr.AttributeExtLength = col->m_length;
+
+ // check type and compute attribute size and array size
+ if (! tmpAttr.translateExtType()) {
+ m_error.code = 703;
+ DBUG_RETURN(-1);
+ }
// charset is defined exactly for char types
if (col->getCharType() != (col->m_cs != NULL)) {
m_error.code = 703;
@@ -1519,16 +1537,13 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb,
// distribution key not supported for Char attribute
if (col->m_distributionKey && col->m_cs != NULL) {
m_error.code = 745;
- return -1;
+ DBUG_RETURN(-1);
}
// charset in upper half of precision
if (col->getCharType()) {
tmpAttr.AttributeExtPrecision |= (col->m_cs->number << 16);
}
- // DICT will ignore and recompute this
- (void)tmpAttr.translateExtType();
-
tmpAttr.AttributeAutoIncrement = col->m_autoIncrement;
BaseString::snprintf(tmpAttr.AttributeDefaultValue,
sizeof(tmpAttr.AttributeDefaultValue),
@@ -1573,7 +1588,7 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb,
ret= createTable(&tSignal, ptr);
if (ret)
- return ret;
+ DBUG_RETURN(ret);
if (haveAutoIncrement) {
if (!ndb.setAutoIncrementValue(impl.m_externalName.c_str(),
diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.hpp b/ndb/src/ndbapi/NdbDictionaryImpl.hpp
index df20915a0e0..0b2de8f2fdc 100644
--- a/ndb/src/ndbapi/NdbDictionaryImpl.hpp
+++ b/ndb/src/ndbapi/NdbDictionaryImpl.hpp
@@ -448,7 +448,8 @@ bool
NdbColumnImpl::getCharType() const {
return (m_type == NdbDictionary::Column::Char ||
m_type == NdbDictionary::Column::Varchar ||
- m_type == NdbDictionary::Column::Text);
+ m_type == NdbDictionary::Column::Text ||
+ m_type == NdbDictionary::Column::Longvarchar);
}
inline
diff --git a/ndb/src/ndbapi/NdbOperationDefine.cpp b/ndb/src/ndbapi/NdbOperationDefine.cpp
index ce4a28c1273..835e33dfb40 100644
--- a/ndb/src/ndbapi/NdbOperationDefine.cpp
+++ b/ndb/src/ndbapi/NdbOperationDefine.cpp
@@ -406,6 +406,14 @@ int
NdbOperation::setValue( const NdbColumnImpl* tAttrInfo,
const char* aValuePassed, Uint32 len)
{
+ DBUG_ENTER("NdbOperation::setValue");
+ DBUG_PRINT("enter", ("col=%s op=%d val=0x%x len=%u",
+ tAttrInfo->m_name.c_str(),
+ theOperationType,
+ aValuePassed, len));
+ if (aValuePassed != NULL)
+ DBUG_DUMP("value", (char*)aValuePassed, len);
+
int tReturnCode;
Uint32 tAttrId;
Uint32 tData;
@@ -421,7 +429,7 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo,
;
} else {
setErrorCodeAbort(4234);
- return -1;
+ DBUG_RETURN(-1);
}//if
} else {
if (tStatus == GetValue) {
@@ -432,7 +440,7 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo,
// to set values in the tuple by setValue.
//--------------------------------------------------------------------
if (insertATTRINFO(Interpreter::EXIT_OK) == -1){
- return -1;
+ DBUG_RETURN(-1);
}
theInterpretedSize = theTotalCurrAI_Len -
(theInitialReadSize + 5);
@@ -443,47 +451,47 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo,
// setValue used in the wrong context. Application coding error.
//-------------------------------------------------------------------
setErrorCodeAbort(4234); //Wrong error code
- return -1;
+ DBUG_RETURN(-1);
}//if
theStatus = SetValueInterpreted;
}//if
} else if (tOpType == InsertRequest) {
if ((theStatus != SetValue) && (theStatus != OperationDefined)) {
setErrorCodeAbort(4234);
- return -1;
+ DBUG_RETURN(-1);
}//if
} else if (tOpType == ReadRequest || tOpType == ReadExclusive) {
setErrorCodeAbort(4504);
- return -1;
+ DBUG_RETURN(-1);
} else if (tOpType == DeleteRequest) {
setErrorCodeAbort(4504);
- return -1;
+ DBUG_RETURN(-1);
} else if (tOpType == OpenScanRequest || tOpType == OpenRangeScanRequest) {
setErrorCodeAbort(4228);
- return -1;
+ DBUG_RETURN(-1);
} else {
//---------------------------------------------------------------------
// setValue with undefined operation type.
// Probably application coding error.
//---------------------------------------------------------------------
setErrorCodeAbort(4108);
- return -1;
+ DBUG_RETURN(-1);
}//if
if (tAttrInfo == NULL) {
setErrorCodeAbort(4004);
- return -1;
+ DBUG_RETURN(-1);
}//if
if (tAttrInfo->m_pk) {
if (theOperationType == InsertRequest) {
- return equal_impl(tAttrInfo, aValuePassed, len);
+ DBUG_RETURN(equal_impl(tAttrInfo, aValuePassed, len));
} else {
setErrorCodeAbort(4202);
- return -1;
+ DBUG_RETURN(-1);
}//if
}//if
if (len > 8000) {
setErrorCodeAbort(4216);
- return -1;
+ DBUG_RETURN(-1);
}//if
tAttrId = tAttrInfo->m_attrId;
@@ -496,13 +504,13 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo,
insertATTRINFO(ahValue);
// Insert Attribute Id with the value
// NULL into ATTRINFO part.
- return 0;
+ DBUG_RETURN(0);
} else {
/***********************************************************************
* Setting a NULL value on a NOT NULL attribute is not allowed.
**********************************************************************/
setErrorCodeAbort(4203);
- return -1;
+ DBUG_RETURN(-1);
}//if
}//if
@@ -522,7 +530,7 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo,
const Uint32 bitsInLastWord = 8 * (sizeInBytes & 3) ;
if (len != sizeInBytes && (len != 0)) {
setErrorCodeAbort(4209);
- return -1;
+ DBUG_RETURN(-1);
}//if
const Uint32 totalSizeInWords = (sizeInBytes + 3)/4; // Including bits in last word
const Uint32 sizeInWords = sizeInBytes / 4; // Excluding bits in last word
@@ -550,7 +558,7 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo,
tReturnCode = insertATTRINFOloop((Uint32*)aValue, sizeInWords);
if (tReturnCode == -1) {
- return tReturnCode;
+ DBUG_RETURN(tReturnCode);
}//if
if (bitsInLastWord != 0) {
tData = *(Uint32*)(aValue + sizeInWords*4);
@@ -559,11 +567,11 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo,
tData = convertEndian(tData);
tReturnCode = insertATTRINFO(tData);
if (tReturnCode == -1) {
- return tReturnCode;
+ DBUG_RETURN(tReturnCode);
}//if
}//if
theErrorLine++;
- return 0;
+ DBUG_RETURN(0);
}//NdbOperation::setValue()
NdbBlob*
diff --git a/ndb/src/ndbapi/NdbOperationSearch.cpp b/ndb/src/ndbapi/NdbOperationSearch.cpp
index 70850bcc66b..6e76287eef0 100644
--- a/ndb/src/ndbapi/NdbOperationSearch.cpp
+++ b/ndb/src/ndbapi/NdbOperationSearch.cpp
@@ -57,8 +57,16 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
const char* aValuePassed,
Uint32 aVariableKeyLen)
{
- register Uint32 tAttrId;
+ DBUG_ENTER("NdbOperation::equal_impl");
+ DBUG_PRINT("enter", ("col=%s op=%d val=0x%x len=%u",
+ tAttrInfo->m_name.c_str(),
+ theOperationType,
+ aValuePassed, aVariableKeyLen));
+ if (aValuePassed != NULL)
+ DBUG_DUMP("value", (char*)aValuePassed, aVariableKeyLen);
+ register Uint32 tAttrId;
+
Uint32 tData;
Uint32 tKeyInfoPosition;
const char* aValue = aValuePassed;
@@ -120,7 +128,9 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
theTupleKeyDefined[i][1] = tKeyInfoPosition;
theTupleKeyDefined[i][2] = true;
+ OperationType tOpType = theOperationType;
Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
+
{
/************************************************************************
* Check if the pointer of the value passed is aligned on a 4 byte
@@ -176,7 +186,6 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
}//if
#endif
- OperationType tOpType = theOperationType;
/**************************************************************************
* If the operation is an insert request and the attribute is stored then
* we also set the value in the stored part through putting the
@@ -231,7 +240,7 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
} else {
theStatus = SetValue;
}//if
- return 0;
+ DBUG_RETURN(0);
} else if ((tOpType == ReadRequest) || (tOpType == DeleteRequest) ||
(tOpType == ReadExclusive)) {
theStatus = GetValue;
@@ -242,42 +251,42 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
assert(c != 0);
if (c->getBlobType()) {
if (getBlobHandle(theNdbCon, c) == NULL)
- return -1;
+ DBUG_RETURN(-1);
}
}
}
- return 0;
+ DBUG_RETURN(0);
} else if ((tOpType == InsertRequest) || (tOpType == WriteRequest)) {
theStatus = SetValue;
- return 0;
+ DBUG_RETURN(0);
} else {
setErrorCodeAbort(4005);
- return -1;
+ DBUG_RETURN(-1);
}//if
- return 0;
+ DBUG_RETURN(0);
}//if
} else {
- return -1;
+ DBUG_RETURN(-1);
}//if
- return 0;
+ DBUG_RETURN(0);
}
if (aValue == NULL) {
// NULL value in primary key
setErrorCodeAbort(4505);
- return -1;
+ DBUG_RETURN(-1);
}//if
if ( tAttrInfo == NULL ) {
// Attribute name not found in table
setErrorCodeAbort(4004);
- return -1;
+ DBUG_RETURN(-1);
}//if
if (theStatus == GetValue || theStatus == SetValue){
// All pk's defined
setErrorCodeAbort(4225);
- return -1;
+ DBUG_RETURN(-1);
}//if
ndbout_c("theStatus: %d", theStatus);
@@ -285,19 +294,19 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
// If we come here, set a general errorcode
// and exit
setErrorCodeAbort(4200);
- return -1;
+ DBUG_RETURN(-1);
equal_error1:
setErrorCodeAbort(4205);
- return -1;
+ DBUG_RETURN(-1);
equal_error2:
setErrorCodeAbort(4206);
- return -1;
+ DBUG_RETURN(-1);
equal_error3:
setErrorCodeAbort(4209);
- return -1;
+ DBUG_RETURN(-1);
}
/******************************************************************************
diff --git a/ndb/src/ndbapi/NdbRecAttr.cpp b/ndb/src/ndbapi/NdbRecAttr.cpp
index 37142c94f8f..57f896e7e42 100644
--- a/ndb/src/ndbapi/NdbRecAttr.cpp
+++ b/ndb/src/ndbapi/NdbRecAttr.cpp
@@ -224,9 +224,11 @@ NdbOut& operator<<(NdbOut& out, const NdbRecAttr &r)
j = r.arraySize();
break;
case NdbDictionary::Column::Varchar:
- ndbrecattr_print_string(out,"Varchar", r.aRef()+ 2,
- ntohs(r.u_short_value()));
- j = r.arraySize();
+ {
+ unsigned len = *(const unsigned char*)r.aRef();
+ ndbrecattr_print_string(out,"Varchar", r.aRef()+1,len);
+ j = r.arraySize();
+ }
break;
case NdbDictionary::Column::Float:
out << r.float_value();
@@ -256,6 +258,13 @@ NdbOut& operator<<(NdbOut& out, const NdbRecAttr &r)
j = r.arraySize();
}
break;
+ case NdbDictionary::Column::Longvarchar:
+ {
+ unsigned len = uint2korr(r.aRef());
+ ndbrecattr_print_string(out,"Longvarchar", r.aRef()+2,len);
+ j = r.arraySize();
+ }
+ break;
default: /* no print functions for the rest, just print type */
out << (int) r.getType();
j = r.arraySize();
diff --git a/ndb/src/ndbapi/NdbTransaction.cpp b/ndb/src/ndbapi/NdbTransaction.cpp
index d010c0ae0d8..f6664657f73 100644
--- a/ndb/src/ndbapi/NdbTransaction.cpp
+++ b/ndb/src/ndbapi/NdbTransaction.cpp
@@ -1092,7 +1092,11 @@ NdbTransaction::getNdbIndexScanOperation(const char* anIndexName,
{
NdbIndexImpl* index =
theNdb->theDictionary->getIndex(anIndexName, aTableName);
+ if (index == 0)
+ return 0;
NdbTableImpl* table = theNdb->theDictionary->getTable(aTableName);
+ if (table == 0)
+ return 0;
return getNdbIndexScanOperation(index, table);
}
diff --git a/ndb/test/ndbapi/testOIBasic.cpp b/ndb/test/ndbapi/testOIBasic.cpp
index adaccebed6e..8a09a2ec9d1 100644
--- a/ndb/test/ndbapi/testOIBasic.cpp
+++ b/ndb/test/ndbapi/testOIBasic.cpp
@@ -28,7 +28,9 @@
#include <NdbCondition.h>
#include <NdbThread.h>
#include <NdbTick.h>
+#include <NdbSleep.h>
#include <my_sys.h>
+#include <NdbSqlUtil.hpp>
// options
@@ -72,7 +74,7 @@ struct Opt {
m_die(0),
m_dups(false),
m_fragtype(NdbDictionary::Object::FragUndefined),
- m_subsubloop(2),
+ m_subsubloop(4),
m_index(0),
m_loop(1),
m_msglock(true),
@@ -87,7 +89,7 @@ struct Opt {
m_seed(-1),
m_subloop(4),
m_table(0),
- m_threads(10),
+ m_threads(4),
m_v(1) {
}
};
@@ -257,6 +259,7 @@ struct Par : public Opt {
bool m_verify;
// deadlock possible
bool m_deadlock;
+ NdbOperation::LockMode m_lockmode;
// ordered range scan
bool m_ordered;
bool m_descending;
@@ -278,6 +281,7 @@ struct Par : public Opt {
m_randomkey(false),
m_verify(false),
m_deadlock(false),
+ m_lockmode(NdbOperation::LM_Read),
m_ordered(false),
m_descending(false) {
}
@@ -598,7 +602,9 @@ getcs(Par par)
struct Col {
enum Type {
Unsigned = NdbDictionary::Column::Unsigned,
- Char = NdbDictionary::Column::Char
+ Char = NdbDictionary::Column::Char,
+ Varchar = NdbDictionary::Column::Varchar,
+ Longvarchar = NdbDictionary::Column::Longvarchar
};
const class Tab& m_tab;
unsigned m_num;
@@ -612,7 +618,7 @@ struct Col {
Col(const class Tab& tab, unsigned num, const char* name, bool pk, Type type, unsigned length, bool nullable, const Chs* chs);
~Col();
bool equal(const Col& col2) const;
- void verify(const void* addr) const;
+ void wellformed(const void* addr) const;
};
Col::Col(const class Tab& tab, unsigned num, const char* name, bool pk, Type type, unsigned length, bool nullable, const Chs* chs) :
@@ -626,6 +632,9 @@ Col::Col(const class Tab& tab, unsigned num, const char* name, bool pk, Type typ
m_nullable(nullable),
m_chs(chs)
{
+ // fix long varchar
+ if (type == Varchar && m_bytelength > 255)
+ m_type = Longvarchar;
}
Col::~Col()
@@ -640,7 +649,7 @@ Col::equal(const Col& col2) const
}
void
-Col::verify(const void* addr) const
+Col::wellformed(const void* addr) const
{
switch (m_type) {
case Col::Unsigned:
@@ -653,6 +662,26 @@ Col::verify(const void* addr) const
assert((*cs->cset->well_formed_len)(cs, src, src + len, 0xffff) == len);
}
break;
+ case Col::Varchar:
+ {
+ CHARSET_INFO* cs = m_chs->m_cs;
+ const unsigned char* src = (const unsigned char*)addr;
+ const char* ssrc = (const char*)src;
+ unsigned len = src[0];
+ assert(len <= m_bytelength);
+ assert((*cs->cset->well_formed_len)(cs, ssrc + 1, ssrc + 1 + len, 0xffff) == len);
+ }
+ break;
+ case Col::Longvarchar:
+ {
+ CHARSET_INFO* cs = m_chs->m_cs;
+ const unsigned char* src = (const unsigned char*)addr;
+ const char* ssrc = (const char*)src;
+ unsigned len = src[0] + (src[1] << 8);
+ assert(len <= m_bytelength);
+ assert((*cs->cset->well_formed_len)(cs, ssrc + 2, ssrc + 2 + len, 0xffff) == len);
+ }
+ break;
default:
assert(false);
break;
@@ -673,6 +702,18 @@ operator<<(NdbOut& out, const Col& col)
out << " char(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")";
}
break;
+ case Col::Varchar:
+ {
+ CHARSET_INFO* cs = col.m_chs->m_cs;
+ out << " varchar(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")";
+ }
+ break;
+ case Col::Longvarchar:
+ {
+ CHARSET_INFO* cs = col.m_chs->m_cs;
+ out << " longvarchar(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")";
+ }
+ break;
default:
out << "type" << (int)col.m_type;
assert(false);
@@ -928,35 +969,27 @@ makebuiltintables(Par par)
t->itabadd(2, x);
}
if (useindex(par, 3)) {
- // d, c, b
- ITab* x = new ITab(*t, "ti0x3", ITab::OrderedIndex, 3);
- x->icoladd(0, new ICol(*x, 0, *t->m_col[3]));
- x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
- x->icoladd(2, new ICol(*x, 2, *t->m_col[1]));
- t->itabadd(3, x);
- }
- if (useindex(par, 4)) {
// b, e, c, d
- ITab* x = new ITab(*t, "ti0x4", ITab::OrderedIndex, 4);
+ ITab* x = new ITab(*t, "ti0x3", ITab::OrderedIndex, 4);
x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
x->icoladd(1, new ICol(*x, 1, *t->m_col[4]));
x->icoladd(2, new ICol(*x, 2, *t->m_col[2]));
x->icoladd(3, new ICol(*x, 3, *t->m_col[3]));
- t->itabadd(4, x);
+ t->itabadd(3, x);
}
- if (useindex(par, 5)) {
+ if (useindex(par, 4)) {
// a, c
- ITab* x = new ITab(*t, "ti0z5", ITab::UniqueHashIndex, 2);
+ ITab* x = new ITab(*t, "ti0z4", ITab::UniqueHashIndex, 2);
x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
- t->itabadd(5, x);
+ t->itabadd(4, x);
}
- if (useindex(par, 6)) {
+ if (useindex(par, 5)) {
// a, e
- ITab* x = new ITab(*t, "ti0z6", ITab::UniqueHashIndex, 2);
+ ITab* x = new ITab(*t, "ti0z5", ITab::UniqueHashIndex, 2);
x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
x->icoladd(1, new ICol(*x, 1, *t->m_col[4]));
- t->itabadd(6, x);
+ t->itabadd(5, x);
}
tablist[0] = t;
}
@@ -967,56 +1000,50 @@ makebuiltintables(Par par)
t->coladd(0, new Col(*t, 0, "a", 0, Col::Unsigned, 1, 0, 0));
t->coladd(1, new Col(*t, 1, "b", 1, Col::Unsigned, 1, 0, 0));
t->coladd(2, new Col(*t, 2, "c", 0, Col::Char, 20, 1, getcs(par)));
- t->coladd(3, new Col(*t, 3, "d", 0, Col::Char, 5, 0, getcs(par)));
- t->coladd(4, new Col(*t, 4, "e", 0, Col::Char, 5, 1, getcs(par)));
+ t->coladd(3, new Col(*t, 3, "d", 0, Col::Varchar, 5, 0, getcs(par)));
+ t->coladd(4, new Col(*t, 4, "e", 0, Col::Longvarchar, 5, 1, getcs(par)));
if (useindex(par, 0)) {
// b
ITab* x = new ITab(*t, "ti1x0", ITab::OrderedIndex, 1);
x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
+ t->itabadd(0, x);
}
if (useindex(par, 1)) {
- // a, c
+ // c, a
ITab* x = new ITab(*t, "ti1x1", ITab::OrderedIndex, 2);
- x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
- x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
+ x->icoladd(0, new ICol(*x, 0, *t->m_col[2]));
+ x->icoladd(1, new ICol(*x, 1, *t->m_col[0]));
t->itabadd(1, x);
}
if (useindex(par, 2)) {
- // c, a
- ITab* x = new ITab(*t, "ti1x2", ITab::OrderedIndex, 2);
- x->icoladd(0, new ICol(*x, 0, *t->m_col[2]));
- x->icoladd(1, new ICol(*x, 1, *t->m_col[0]));
+ // d
+ ITab* x = new ITab(*t, "ti1x2", ITab::OrderedIndex, 1);
+ x->icoladd(0, new ICol(*x, 0, *t->m_col[3]));
t->itabadd(2, x);
}
if (useindex(par, 3)) {
- // e
- ITab* x = new ITab(*t, "ti1x3", ITab::OrderedIndex, 1);
- x->icoladd(0, new ICol(*x, 0, *t->m_col[4]));
- t->itabadd(3, x);
- }
- if (useindex(par, 4)) {
// e, d, c, b
- ITab* x = new ITab(*t, "ti1x4", ITab::OrderedIndex, 4);
+ ITab* x = new ITab(*t, "ti1x3", ITab::OrderedIndex, 4);
x->icoladd(0, new ICol(*x, 0, *t->m_col[4]));
x->icoladd(1, new ICol(*x, 1, *t->m_col[3]));
x->icoladd(2, new ICol(*x, 2, *t->m_col[2]));
x->icoladd(3, new ICol(*x, 3, *t->m_col[1]));
- t->itabadd(4, x);
+ t->itabadd(3, x);
}
- if (useindex(par, 5)) {
+ if (useindex(par, 4)) {
// a, b
- ITab* x = new ITab(*t, "ti1z5", ITab::UniqueHashIndex, 2);
+ ITab* x = new ITab(*t, "ti1z4", ITab::UniqueHashIndex, 2);
x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
x->icoladd(1, new ICol(*x, 1, *t->m_col[1]));
- t->itabadd(5, x);
+ t->itabadd(4, x);
}
- if (useindex(par, 6)) {
+ if (useindex(par, 5)) {
// a, b, d
- ITab* x = new ITab(*t, "ti1z6", ITab::UniqueHashIndex, 3);
+ ITab* x = new ITab(*t, "ti1z5", ITab::UniqueHashIndex, 3);
x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
x->icoladd(1, new ICol(*x, 1, *t->m_col[1]));
x->icoladd(2, new ICol(*x, 2, *t->m_col[3]));
- t->itabadd(6, x);
+ t->itabadd(5, x);
}
tablist[1] = t;
}
@@ -1027,8 +1054,8 @@ makebuiltintables(Par par)
t->coladd(0, new Col(*t, 0, "a", 1, Col::Char, 31, 0, getcs(par)));
t->coladd(1, new Col(*t, 1, "b", 0, Col::Char, 4, 1, getcs(par)));
t->coladd(2, new Col(*t, 2, "c", 1, Col::Unsigned, 1, 0, 0));
- t->coladd(3, new Col(*t, 3, "d", 1, Col::Char, 3, 0, getcs(par)));
- t->coladd(4, new Col(*t, 4, "e", 0, Col::Char, 17, 0, getcs(par)));
+ t->coladd(3, new Col(*t, 3, "d", 1, Col::Varchar, 128, 0, getcs(par)));
+ t->coladd(4, new Col(*t, 4, "e", 0, Col::Varchar, 7, 0, getcs(par)));
if (useindex(par, 0)) {
// a, c, d
ITab* x = new ITab(*t, "ti2x0", ITab::OrderedIndex, 3);
@@ -1060,27 +1087,20 @@ makebuiltintables(Par par)
t->itabadd(3, x);
}
if (useindex(par, 4)) {
- // a, e
- ITab* x = new ITab(*t, "ti2x4", ITab::OrderedIndex, 2);
- x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
- x->icoladd(1, new ICol(*x, 1, *t->m_col[4]));
- t->itabadd(4, x);
- }
- if (useindex(par, 5)) {
// a, c
- ITab* x = new ITab(*t, "ti2z5", ITab::UniqueHashIndex, 2);
+ ITab* x = new ITab(*t, "ti2z4", ITab::UniqueHashIndex, 2);
x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
- t->itabadd(5, x);
+ t->itabadd(4, x);
}
- if (useindex(par, 6)) {
+ if (useindex(par, 5)) {
// a, c, d, e
- ITab* x = new ITab(*t, "ti2z6", ITab::UniqueHashIndex, 4);
+ ITab* x = new ITab(*t, "ti2z5", ITab::UniqueHashIndex, 4);
x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
x->icoladd(2, new ICol(*x, 2, *t->m_col[3]));
x->icoladd(3, new ICol(*x, 3, *t->m_col[4]));
- t->itabadd(6, x);
+ t->itabadd(5, x);
}
tablist[2] = t;
}
@@ -1115,19 +1135,19 @@ struct Con {
void disconnect();
int startTransaction();
int getNdbOperation(const Tab& tab);
+ int getNdbIndexOperation1(const ITab& itab, const Tab& tab);
int getNdbIndexOperation(const ITab& itab, const Tab& tab);
int getNdbScanOperation(const Tab& tab);
- int getNdbScanOperation(const ITab& itab, const Tab& tab);
+ int getNdbIndexScanOperation1(const ITab& itab, const Tab& tab);
+ int getNdbIndexScanOperation(const ITab& itab, const Tab& tab);
int equal(int num, const char* addr);
int getValue(int num, NdbRecAttr*& rec);
int setValue(int num, const char* addr);
int setBound(int num, int type, const void* value);
int execute(ExecType t);
int execute(ExecType t, bool& deadlock);
- int openScanRead(unsigned scanbat, unsigned scanpar);
- int openScanExclusive(unsigned scanbat, unsigned scanpar);
- int openScanOrdered(unsigned scanbat, unsigned scanpar, bool descending);
- int openScanOrderedExclusive(unsigned scanbat, unsigned scanpar, bool descending);
+ int readTuples(Par par);
+ int readIndexTuples(Par par);
int executeScan();
int nextScanResult(bool fetchAllowed);
int nextScanResult(bool fetchAllowed, bool& deadlock);
@@ -1182,7 +1202,7 @@ Con::getNdbOperation(const Tab& tab)
}
int
-Con::getNdbIndexOperation(const ITab& itab, const Tab& tab)
+Con::getNdbIndexOperation1(const ITab& itab, const Tab& tab)
{
assert(m_tx != 0);
CHKCON((m_op = m_indexop = m_tx->getNdbIndexOperation(itab.m_name, tab.m_name)) != 0, *this);
@@ -1190,6 +1210,20 @@ Con::getNdbIndexOperation(const ITab& itab, const Tab& tab)
}
int
+Con::getNdbIndexOperation(const ITab& itab, const Tab& tab)
+{
+ assert(m_tx != 0);
+ unsigned tries = 0;
+ while (1) {
+ if (getNdbIndexOperation1(itab, tab) == 0)
+ break;
+ CHK(++tries < 10);
+ NdbSleep_MilliSleep(100);
+ }
+ return 0;
+}
+
+int
Con::getNdbScanOperation(const Tab& tab)
{
assert(m_tx != 0);
@@ -1198,7 +1232,7 @@ Con::getNdbScanOperation(const Tab& tab)
}
int
-Con::getNdbScanOperation(const ITab& itab, const Tab& tab)
+Con::getNdbIndexScanOperation1(const ITab& itab, const Tab& tab)
{
assert(m_tx != 0);
CHKCON((m_op = m_scanop = m_indexscanop = m_tx->getNdbIndexScanOperation(itab.m_name, tab.m_name)) != 0, *this);
@@ -1206,6 +1240,20 @@ Con::getNdbScanOperation(const ITab& itab, const Tab& tab)
}
int
+Con::getNdbIndexScanOperation(const ITab& itab, const Tab& tab)
+{
+ assert(m_tx != 0);
+ unsigned tries = 0;
+ while (1) {
+ if (getNdbIndexScanOperation1(itab, tab) == 0)
+ break;
+ CHK(++tries < 10);
+ NdbSleep_MilliSleep(100);
+ }
+ return 0;
+}
+
+int
Con::equal(int num, const char* addr)
{
assert(m_tx != 0 && m_op != 0);
@@ -1262,42 +1310,21 @@ Con::execute(ExecType t, bool& deadlock)
}
int
-Con::openScanRead(unsigned scanbat, unsigned scanpar)
+Con::readTuples(Par par)
{
assert(m_tx != 0 && m_scanop != 0);
- NdbOperation::LockMode lm = NdbOperation::LM_Read;
- CHKCON(m_scanop->readTuples(lm, scanbat, scanpar) == 0, *this);
+ CHKCON(m_scanop->readTuples(par.m_lockmode, par.m_scanbat, par.m_scanpar) == 0, *this);
return 0;
}
int
-Con::openScanExclusive(unsigned scanbat, unsigned scanpar)
-{
- assert(m_tx != 0 && m_scanop != 0);
- NdbOperation::LockMode lm = NdbOperation::LM_Exclusive;
- CHKCON(m_scanop->readTuples(lm, scanbat, scanpar) == 0, *this);
- return 0;
-}
-
-int
-Con::openScanOrdered(unsigned scanbat, unsigned scanpar, bool descending)
-{
- assert(m_tx != 0 && m_indexscanop != 0);
- NdbOperation::LockMode lm = NdbOperation::LM_Read;
- CHKCON(m_indexscanop->readTuples(lm, scanbat, scanpar, true, descending) == 0, *this);
- return 0;
-}
-
-int
-Con::openScanOrderedExclusive(unsigned scanbat, unsigned scanpar, bool descending)
+Con::readIndexTuples(Par par)
{
assert(m_tx != 0 && m_indexscanop != 0);
- NdbOperation::LockMode lm = NdbOperation::LM_Exclusive;
- CHKCON(m_indexscanop->readTuples(lm, scanbat, scanpar, true, descending) == 0, *this);
+ CHKCON(m_indexscanop->readTuples(par.m_lockmode, par.m_scanbat, par.m_scanpar, par.m_ordered, par.m_descending) == 0, *this);
return 0;
}
-
int
Con::executeScan()
{
@@ -1564,6 +1591,8 @@ struct Val {
union {
Uint32 m_uint32;
unsigned char* m_char;
+ unsigned char* m_varchar;
+ unsigned char* m_longvarchar;
};
Val(const Col& col);
~Val();
@@ -1576,9 +1605,12 @@ struct Val {
int setval(Par par) const;
void calc(Par par, unsigned i);
void calckey(Par par, unsigned i);
+ void calckeychars(Par par, unsigned i, unsigned& n, unsigned char* buf);
void calcnokey(Par par);
- int verify(const Val& val2) const;
- int cmp(const Val& val2) const;
+ void calcnokeychars(Par par, unsigned& n, unsigned char* buf);
+ int verify(Par par, const Val& val2) const;
+ int cmp(Par par, const Val& val2) const;
+ int cmpchars(Par par, const unsigned char* buf1, unsigned len1, const unsigned char* buf2, unsigned len2) const;
private:
Val& operator=(const Val& val2);
};
@@ -1595,6 +1627,12 @@ Val::Val(const Col& col) :
case Col::Char:
m_char = new unsigned char [col.m_bytelength];
break;
+ case Col::Varchar:
+ m_varchar = new unsigned char [1 + col.m_bytelength];
+ break;
+ case Col::Longvarchar:
+ m_longvarchar = new unsigned char [2 + col.m_bytelength];
+ break;
default:
assert(false);
break;
@@ -1610,6 +1648,12 @@ Val::~Val()
case Col::Char:
delete [] m_char;
break;
+ case Col::Varchar:
+ delete [] m_varchar;
+ break;
+ case Col::Longvarchar:
+ delete [] m_longvarchar;
+ break;
default:
assert(false);
break;
@@ -1640,6 +1684,12 @@ Val::copy(const void* addr)
case Col::Char:
memcpy(m_char, addr, col.m_bytelength);
break;
+ case Col::Varchar:
+ memcpy(m_varchar, addr, 1 + col.m_bytelength);
+ break;
+ case Col::Longvarchar:
+ memcpy(m_longvarchar, addr, 2 + col.m_bytelength);
+ break;
default:
assert(false);
break;
@@ -1656,6 +1706,10 @@ Val::dataaddr() const
return &m_uint32;
case Col::Char:
return m_char;
+ case Col::Varchar:
+ return m_varchar;
+ case Col::Longvarchar:
+ return m_longvarchar;
default:
break;
}
@@ -1704,7 +1758,7 @@ Val::calc(Par par, unsigned i)
const Col& col = m_col;
col.m_pk ? calckey(par, i) : calcnokey(par);
if (! m_null)
- col.verify(dataaddr());
+ col.wellformed(dataaddr());
}
void
@@ -1721,19 +1775,30 @@ Val::calckey(Par par, unsigned i)
const Chs* chs = col.m_chs;
CHARSET_INFO* cs = chs->m_cs;
unsigned n = 0;
- // our random chars may not fill value exactly
- while (n + cs->mbmaxlen <= col.m_bytelength) {
- if (i % (1 + n) == 0) {
- break;
- }
- const Chr& chr = chs->m_chr[i % maxcharcount];
- memcpy(&m_char[n], chr.m_bytes, chr.m_size);
- n += chr.m_size;
- }
- // this will extend by appropriate space
+ calckeychars(par, i, n, m_char);
+ // extend by appropriate space
(*cs->cset->fill)(cs, (char*)&m_char[n], col.m_bytelength - n, 0x20);
}
break;
+ case Col::Varchar:
+ {
+ unsigned n = 0;
+ calckeychars(par, i, n, m_varchar + 1);
+ // set length and pad with nulls
+ m_varchar[0] = n;
+ memset(&m_varchar[1 + n], 0, col.m_bytelength - n);
+ }
+ break;
+ case Col::Longvarchar:
+ {
+ unsigned n = 0;
+ calckeychars(par, i, n, m_longvarchar + 2);
+ // set length and pad with nulls
+ m_longvarchar[0] = (n & 0xff);
+ m_longvarchar[1] = (n >> 8);
+ memset(&m_longvarchar[2 + n], 0, col.m_bytelength - n);
+ }
+ break;
default:
assert(false);
break;
@@ -1741,6 +1806,24 @@ Val::calckey(Par par, unsigned i)
}
void
+Val::calckeychars(Par par, unsigned i, unsigned& n, unsigned char* buf)
+{
+ const Col& col = m_col;
+ const Chs* chs = col.m_chs;
+ CHARSET_INFO* cs = chs->m_cs;
+ n = 0;
+ // our random chars may not fill value exactly
+ while (n + cs->mbmaxlen <= col.m_bytelength) {
+ if (i % (1 + n) == 0) {
+ break;
+ }
+ const Chr& chr = chs->m_chr[i % maxcharcount];
+ memcpy(buf + n, chr.m_bytes, chr.m_size);
+ n += chr.m_size;
+ }
+}
+
+void
Val::calcnokey(Par par)
{
const Col& col = m_col;
@@ -1764,42 +1847,71 @@ Val::calcnokey(Par par)
const Chs* chs = col.m_chs;
CHARSET_INFO* cs = chs->m_cs;
unsigned n = 0;
- // our random chars may not fill value exactly
- while (n + cs->mbmaxlen <= col.m_bytelength) {
- if (urandom(1 + col.m_bytelength) == 0) {
- break;
- }
- unsigned half = maxcharcount / 2;
- int r = irandom((par.m_pctrange * half) / 100);
- if (par.m_bdir != 0 && urandom(10) != 0) {
- if (r < 0 && par.m_bdir > 0 || r > 0 && par.m_bdir < 0)
- r = -r;
- }
- unsigned i = half + r;
- assert(i < maxcharcount);
- const Chr& chr = chs->m_chr[i];
- memcpy(&m_char[n], chr.m_bytes, chr.m_size);
- n += chr.m_size;
- }
- // this will extend by appropriate space
+ calcnokeychars(par, n, m_char);
+ // extend by appropriate space
(*cs->cset->fill)(cs, (char*)&m_char[n], col.m_bytelength - n, 0x20);
}
break;
+ case Col::Varchar:
+ {
+ unsigned n = 0;
+ calcnokeychars(par, n, m_varchar + 1);
+ // set length and pad with nulls
+ m_varchar[0] = n;
+ memset(&m_varchar[1 + n], 0, col.m_bytelength - n);
+ }
+ break;
+ case Col::Longvarchar:
+ {
+ unsigned n = 0;
+ calcnokeychars(par, n, m_longvarchar + 2);
+ // set length and pad with nulls
+ m_longvarchar[0] = (n & 0xff);
+ m_longvarchar[1] = (n >> 8);
+ memset(&m_longvarchar[2 + n], 0, col.m_bytelength - n);
+ }
+ break;
default:
assert(false);
break;
}
}
+void
+Val::calcnokeychars(Par par, unsigned& n, unsigned char* buf)
+{
+ const Col& col = m_col;
+ const Chs* chs = col.m_chs;
+ CHARSET_INFO* cs = chs->m_cs;
+ n = 0;
+ // our random chars may not fill value exactly
+ while (n + cs->mbmaxlen <= col.m_bytelength) {
+ if (urandom(1 + col.m_bytelength) == 0) {
+ break;
+ }
+ unsigned half = maxcharcount / 2;
+ int r = irandom((par.m_pctrange * half) / 100);
+ if (par.m_bdir != 0 && urandom(10) != 0) {
+ if (r < 0 && par.m_bdir > 0 || r > 0 && par.m_bdir < 0)
+ r = -r;
+ }
+ unsigned i = half + r;
+ assert(i < maxcharcount);
+ const Chr& chr = chs->m_chr[i];
+ memcpy(buf + n, chr.m_bytes, chr.m_size);
+ n += chr.m_size;
+ }
+}
+
int
-Val::verify(const Val& val2) const
+Val::verify(Par par, const Val& val2) const
{
- CHK(cmp(val2) == 0);
+ CHK(cmp(par, val2) == 0);
return 0;
}
int
-Val::cmp(const Val& val2) const
+Val::cmp(Par par, const Val& val2) const
{
const Col& col = m_col;
const Col& col2 = val2.m_col;
@@ -1812,8 +1924,8 @@ Val::cmp(const Val& val2) const
return 0;
}
// verify data formats
- col.verify(dataaddr());
- col.verify(val2.dataaddr());
+ col.wellformed(dataaddr());
+ col.wellformed(val2.dataaddr());
// compare
switch (col.m_type) {
case Col::Unsigned:
@@ -1827,25 +1939,22 @@ Val::cmp(const Val& val2) const
break;
case Col::Char:
{
- const Chs* chs = col.m_chs;
- CHARSET_INFO* cs = chs->m_cs;
unsigned len = col.m_bytelength;
- int k;
- if (! g_opt.m_collsp) {
- unsigned char x1[maxxmulsize * 8000];
- unsigned char x2[maxxmulsize * 8000];
- int n1 = (*cs->coll->strnxfrm)(cs, x1, chs->m_xmul * len, m_char, len);
- int n2 = (*cs->coll->strnxfrm)(cs, x2, chs->m_xmul * len, val2.m_char, len);
- // currently same but do not assume it
- unsigned n = (n1 > n2 ? n1 : n2);
- // assume null padding
- memset(x1 + n1, 0x0, n - n1);
- memset(x2 + n2, 0x0, n - n2);
- k = memcmp(x1, x2, n);
- } else {
- k = (*cs->coll->strnncollsp)(cs, m_char, len, val2.m_char, len, false);
- }
- return k < 0 ? -1 : k > 0 ? +1 : 0;
+ return cmpchars(par, m_char, len, val2.m_char, len);
+ }
+ break;
+ case Col::Varchar:
+ {
+ unsigned len1 = m_varchar[0];
+ unsigned len2 = val2.m_varchar[0];
+ return cmpchars(par, m_varchar + 1, len1, val2.m_varchar + 1, len2);
+ }
+ break;
+ case Col::Longvarchar:
+ {
+ unsigned len1 = m_longvarchar[0] + (m_longvarchar[1] << 8);
+ unsigned len2 = val2.m_longvarchar[0] + (val2.m_longvarchar[1] << 8);
+ return cmpchars(par, m_longvarchar + 2, len1, val2.m_longvarchar + 2, len2);
}
break;
default:
@@ -1855,6 +1964,56 @@ Val::cmp(const Val& val2) const
return 0;
}
+int
+Val::cmpchars(Par par, const unsigned char* buf1, unsigned len1, const unsigned char* buf2, unsigned len2) const
+{
+ const Col& col = m_col;
+ const Chs* chs = col.m_chs;
+ CHARSET_INFO* cs = chs->m_cs;
+ int k;
+ if (! par.m_collsp) {
+ unsigned char x1[maxxmulsize * 8000];
+ unsigned char x2[maxxmulsize * 8000];
+ // make strxfrm pad both to same length
+ unsigned len = maxxmulsize * col.m_bytelength;
+ int n1 = NdbSqlUtil::strnxfrm_bug7284(cs, x1, chs->m_xmul * len, buf1, len1);
+ int n2 = NdbSqlUtil::strnxfrm_bug7284(cs, x2, chs->m_xmul * len, buf2, len2);
+ assert(n1 == n2);
+ k = memcmp(x1, x2, n1);
+ } else {
+ k = (*cs->coll->strnncollsp)(cs, buf1, len1, buf2, len2, false);
+ }
+ return k < 0 ? -1 : k > 0 ? +1 : 0;
+}
+
+static void
+printstring(NdbOut& out, const unsigned char* str, unsigned len, bool showlen)
+{
+ char buf[4 * 8000];
+ char *p = buf;
+ *p++ = '[';
+ if (showlen) {
+ sprintf(p, "%u:", len);
+ p += strlen(p);
+ }
+ for (unsigned i = 0; i < len; i++) {
+ unsigned char c = str[i];
+ if (c == '\\') {
+ *p++ = '\\';
+ *p++ = c;
+ } else if (0x20 <= c && c < 0x7e) {
+ *p++ = c;
+ } else {
+ *p++ = '\\';
+ *p++ = hexstr[c >> 4];
+ *p++ = hexstr[c & 15];
+ }
+ }
+ *p++ = ']';
+ *p = 0;
+ out << buf;
+}
+
static NdbOut&
operator<<(NdbOut& out, const Val& val)
{
@@ -1869,25 +2028,20 @@ operator<<(NdbOut& out, const Val& val)
break;
case Col::Char:
{
- char buf[4 * 8000];
- char *p = buf;
- *p++ = '[';
- for (unsigned i = 0; i < col.m_bytelength; i++) {
- unsigned char c = val.m_char[i];
- if (c == '\\') {
- *p++ = '\\';
- *p++ = '\\';
- } else if (0x20 <= c && c < 0x7e) {
- *p++ = c;
- } else {
- *p++ = '\\';
- *p++ = hexstr[c >> 4];
- *p++ = hexstr[c & 15];
- }
- }
- *p++ = ']';
- *p = 0;
- out << buf;
+ unsigned len = col.m_bytelength;
+ printstring(out, val.m_char, len, false);
+ }
+ break;
+ case Col::Varchar:
+ {
+ unsigned len = val.m_varchar[0];
+ printstring(out, val.m_varchar + 1, len, true);
+ }
+ break;
+ case Col::Longvarchar:
+ {
+ unsigned len = val.m_longvarchar[0] + (val.m_longvarchar[1] << 8);
+ printstring(out, val.m_longvarchar + 2, len, true);
}
break;
default:
@@ -1912,7 +2066,7 @@ struct Row {
void copy(const Row& row2);
void calc(Par par, unsigned i, unsigned mask = 0);
const Row& dbrow() const;
- int verify(const Row& row2) const;
+ int verify(Par par, const Row& row2) const;
int insrow(Par par);
int updrow(Par par);
int updrow(Par par, const ITab& itab);
@@ -1921,8 +2075,8 @@ struct Row {
int selrow(Par par);
int selrow(Par par, const ITab& itab);
int setrow(Par par);
- int cmp(const Row& row2) const;
- int cmp(const Row& row2, const ITab& itab) const;
+ int cmp(Par par, const Row& row2) const;
+ int cmp(Par par, const Row& row2, const ITab& itab) const;
private:
Row& operator=(const Row& row2);
};
@@ -1994,7 +2148,7 @@ Row::dbrow() const
}
int
-Row::verify(const Row& row2) const
+Row::verify(Par par, const Row& row2) const
{
const Tab& tab = m_tab;
const Row& row1 = *this;
@@ -2002,7 +2156,7 @@ Row::verify(const Row& row2) const
for (unsigned k = 0; k < tab.m_cols; k++) {
const Val& val1 = *row1.m_val[k];
const Val& val2 = *row2.m_val[k];
- CHK(val1.verify(val2) == 0);
+ CHK(val1.verify(par, val2) == 0);
}
return 0;
}
@@ -2169,7 +2323,7 @@ Row::setrow(Par par)
}
int
-Row::cmp(const Row& row2) const
+Row::cmp(Par par, const Row& row2) const
{
const Tab& tab = m_tab;
assert(&tab == &row2.m_tab);
@@ -2177,14 +2331,14 @@ Row::cmp(const Row& row2) const
for (unsigned k = 0; k < tab.m_cols; k++) {
const Val& val = *m_val[k];
const Val& val2 = *row2.m_val[k];
- if ((c = val.cmp(val2)) != 0)
+ if ((c = val.cmp(par, val2)) != 0)
break;
}
return c;
}
int
-Row::cmp(const Row& row2, const ITab& itab) const
+Row::cmp(Par par, const Row& row2, const ITab& itab) const
{
const Tab& tab = m_tab;
int c = 0;
@@ -2195,7 +2349,7 @@ Row::cmp(const Row& row2, const ITab& itab) const
assert(k < tab.m_cols);
const Val& val = *m_val[k];
const Val& val2 = *row2.m_val[k];
- if ((c = val.cmp(val2)) != 0)
+ if ((c = val.cmp(par, val2)) != 0)
break;
}
return c;
@@ -2283,8 +2437,8 @@ struct Set {
int getkey(Par par, unsigned* i);
int putval(unsigned i, bool force, unsigned n = ~0);
// verify
- int verify(const Set& set2) const;
- int verifyorder(const ITab& itab, bool descending) const;
+ int verify(Par par, const Set& set2) const;
+ int verifyorder(Par par, const ITab& itab, bool descending) const;
// protect structure
NdbMutex* m_mutex;
void lock() const {
@@ -2616,7 +2770,7 @@ Set::putval(unsigned i, bool force, unsigned n)
// verify
int
-Set::verify(const Set& set2) const
+Set::verify(Par par, const Set& set2) const
{
assert(&m_tab == &set2.m_tab && m_rows == set2.m_rows);
LL4("verify set1 count=" << count() << " vs set2 count=" << set2.count());
@@ -2625,7 +2779,7 @@ Set::verify(const Set& set2) const
if (exist(i) != set2.exist(i)) {
ok = false;
} else if (exist(i)) {
- if (dbrow(i).verify(set2.dbrow(i)) != 0)
+ if (dbrow(i).verify(par, set2.dbrow(i)) != 0)
ok = false;
}
if (! ok) {
@@ -2637,7 +2791,7 @@ Set::verify(const Set& set2) const
}
int
-Set::verifyorder(const ITab& itab, bool descending) const
+Set::verifyorder(Par par, const ITab& itab, bool descending) const
{
const Tab& tab = m_tab;
for (unsigned n = 0; n < m_rows; n++) {
@@ -2652,9 +2806,9 @@ Set::verifyorder(const ITab& itab, bool descending) const
const Row& row2 = *m_row[i2];
assert(row1.m_exist && row2.m_exist);
if (! descending)
- CHK(row1.cmp(row2, itab) <= 0);
+ CHK(row1.cmp(par, row2, itab) <= 0);
else
- CHK(row1.cmp(row2, itab) >= 0);
+ CHK(row1.cmp(par, row2, itab) >= 0);
}
return 0;
}
@@ -2724,7 +2878,7 @@ struct BSet {
void calc(Par par);
void calcpk(Par par, unsigned i);
int setbnd(Par par) const;
- void filter(const Set& set, Set& set2) const;
+ void filter(Par par, const Set& set, Set& set2) const;
};
BSet::BSet(const Tab& tab, const ITab& itab, unsigned rows) :
@@ -2797,7 +2951,7 @@ BSet::calc(Par par)
assert(m_bvals >= 2);
const BVal& bv1 = *m_bval[m_bvals - 2];
const BVal& bv2 = *m_bval[m_bvals - 1];
- if (bv1.cmp(bv2) > 0 && urandom(100) != 0)
+ if (bv1.cmp(par, bv2) > 0 && urandom(100) != 0)
continue;
}
} while (0);
@@ -2848,7 +3002,7 @@ BSet::setbnd(Par par) const
}
void
-BSet::filter(const Set& set, Set& set2) const
+BSet::filter(Par par, const Set& set, Set& set2) const
{
const Tab& tab = m_tab;
const ITab& itab = m_itab;
@@ -2880,7 +3034,7 @@ BSet::filter(const Set& set, Set& set2) const
const ICol& icol = bval.m_icol;
const Col& col = icol.m_col;
const Val& val = *row.m_val[col.m_num];
- int ret = bval.cmp(val);
+ int ret = bval.cmp(par, val);
LL5("cmp: ret=" << ret << " " << bval << " vs " << val);
if (bval.m_type == 0)
ok2 = (ret <= 0);
@@ -3110,7 +3264,7 @@ pkread(Par par)
con.closeTransaction();
}
if (par.m_verify)
- CHK(set1.verify(set2) == 0);
+ CHK(set1.verify(par, set2) == 0);
return 0;
}
@@ -3275,7 +3429,7 @@ hashindexread(Par par, const ITab& itab)
con.closeTransaction();
}
if (par.m_verify)
- CHK(set1.verify(set2) == 0);
+ CHK(set1.verify(par, set2) == 0);
return 0;
}
@@ -3289,12 +3443,11 @@ scanreadtable(Par par)
const Set& set = par.set();
// expected
const Set& set1 = set;
- LL3("scanread " << tab.m_name << " verify=" << par.m_verify);
- LL4("expect " << set.count() << " rows");
+ LL3("scanread " << tab.m_name << " lockmode=" << par.m_lockmode << " expect=" << set1.count() << " verify=" << par.m_verify);
Set set2(tab, set.m_rows);
CHK(con.startTransaction() == 0);
CHK(con.getNdbScanOperation(tab) == 0);
- CHK(con.openScanRead(par.m_scanbat, par.m_scanpar) == 0);
+ CHK(con.readTuples(par) == 0);
set2.getval(par);
CHK(con.executeScan() == 0);
unsigned n = 0;
@@ -3317,7 +3470,7 @@ scanreadtable(Par par)
}
con.closeTransaction();
if (par.m_verify)
- CHK(set1.verify(set2) == 0);
+ CHK(set1.verify(par, set2) == 0);
LL3("scanread " << tab.m_name << " done rows=" << n);
return 0;
}
@@ -3331,7 +3484,7 @@ scanreadtablefast(Par par, unsigned countcheck)
LL3("scanfast " << tab.m_name);
CHK(con.startTransaction() == 0);
CHK(con.getNdbScanOperation(tab) == 0);
- CHK(con.openScanRead(par.m_scanbat, par.m_scanpar) == 0);
+ CHK(con.readTuples(par) == 0);
// get 1st column
NdbRecAttr* rec;
CHK(con.getValue((Uint32)0, rec) == 0);
@@ -3355,12 +3508,11 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
Con& con = par.con();
const Tab& tab = par.tab();
const Set& set = par.set();
- LL4(bset);
Set set1(tab, set.m_rows);
if (calc) {
while (true) {
bset.calc(par);
- bset.filter(set, set1);
+ bset.filter(par, set, set1);
unsigned n = set1.count();
// prefer proper subset
if (0 < n && n < set.m_rows)
@@ -3370,17 +3522,13 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
set1.reset();
}
} else {
- bset.filter(set, set1);
+ bset.filter(par, set, set1);
}
- LL3("scanread " << itab.m_name << " bounds=" << bset << " verify=" << par.m_verify << " ordered=" << par.m_ordered << " descending=" << par.m_descending);
- LL4("expect " << set1.count() << " rows");
+ LL3("scanread " << itab.m_name << " " << bset << " lockmode=" << par.m_lockmode << " expect=" << set1.count() << " verify=" << par.m_verify << " ordered=" << par.m_ordered << " descending=" << par.m_descending);
Set set2(tab, set.m_rows);
CHK(con.startTransaction() == 0);
- CHK(con.getNdbScanOperation(itab, tab) == 0);
- if (! par.m_ordered)
- CHK(con.openScanRead(par.m_scanbat, par.m_scanpar) == 0);
- else
- CHK(con.openScanOrdered(par.m_scanbat, par.m_scanpar, par.m_descending) == 0);
+ CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
+ CHK(con.readIndexTuples(par) == 0);
CHK(bset.setbnd(par) == 0);
set2.getval(par);
CHK(con.executeScan() == 0);
@@ -3404,9 +3552,9 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
}
con.closeTransaction();
if (par.m_verify) {
- CHK(set1.verify(set2) == 0);
+ CHK(set1.verify(par, set2) == 0);
if (par.m_ordered)
- CHK(set2.verifyorder(itab, par.m_descending) == 0);
+ CHK(set2.verifyorder(par, itab, par.m_descending) == 0);
}
LL3("scanread " << itab.m_name << " done rows=" << n);
return 0;
@@ -3418,11 +3566,11 @@ scanreadindexfast(Par par, const ITab& itab, const BSet& bset, unsigned countche
Con& con = par.con();
const Tab& tab = par.tab();
const Set& set = par.set();
- LL3("scanfast " << itab.m_name << " bounds=" << bset.m_bvals);
+ LL3("scanfast " << itab.m_name << " " << bset);
LL4(bset);
CHK(con.startTransaction() == 0);
- CHK(con.getNdbScanOperation(itab, tab) == 0);
- CHK(con.openScanRead(par.m_scanbat, par.m_scanpar) == 0);
+ CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
+ CHK(con.readIndexTuples(par) == 0);
CHK(bset.setbnd(par) == 0);
// get 1st column
NdbRecAttr* rec;
@@ -3544,9 +3692,10 @@ scanupdatetable(Par par)
Set& set = par.set();
LL3("scan update " << tab.m_name);
Set set2(tab, set.m_rows);
+ par.m_lockmode = NdbOperation::LM_Exclusive;
CHK(con.startTransaction() == 0);
CHK(con.getNdbScanOperation(tab) == 0);
- CHK(con.openScanExclusive(par.m_scanbat, par.m_scanpar) == 0);
+ CHK(con.readTuples(par) == 0);
set2.getval(par);
CHK(con.executeScan() == 0);
unsigned count = 0;
@@ -3641,12 +3790,10 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
Set& set = par.set();
LL3("scan update " << itab.m_name);
Set set2(tab, set.m_rows);
+ par.m_lockmode = NdbOperation::LM_Exclusive;
CHK(con.startTransaction() == 0);
- CHK(con.getNdbScanOperation(itab, tab) == 0);
- if (! par.m_ordered)
- CHK(con.openScanExclusive(par.m_scanbat, par.m_scanpar) == 0);
- else
- CHK(con.openScanOrderedExclusive(par.m_scanbat, par.m_scanpar, par.m_descending) == 0);
+ CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
+ CHK(con.readTuples(par) == 0);
CHK(bset.setbnd(par) == 0);
set2.getval(par);
CHK(con.executeScan() == 0);
@@ -3777,6 +3924,7 @@ readverify(Par par)
if (par.m_noverify)
return 0;
par.m_verify = true;
+ par.m_lockmode = NdbOperation::LM_CommittedRead;
CHK(pkread(par) == 0);
CHK(scanreadall(par) == 0);
return 0;
@@ -3788,19 +3936,24 @@ readverifyfull(Par par)
if (par.m_noverify)
return 0;
par.m_verify = true;
- if (par.m_no == 0)
+ par.m_lockmode = NdbOperation::LM_CommittedRead;
+ const Tab& tab = par.tab();
+ if (par.m_no == 0) {
+ // thread 0 scans table
CHK(scanreadtable(par) == 0);
- else {
- const Tab& tab = par.tab();
- unsigned i = par.m_no - 1;
- if (i < tab.m_itabs && tab.m_itab[i] != 0) {
- const ITab& itab = *tab.m_itab[i];
- if (itab.m_type == ITab::OrderedIndex) {
- BSet bset(tab, itab, par.m_rows);
- CHK(scanreadindex(par, itab, bset, false) == 0);
- } else {
- CHK(hashindexread(par, itab) == 0);
- }
+ }
+ // each thread scans different indexes
+ for (unsigned i = 0; i < tab.m_itabs; i++) {
+ if (i % par.m_threads != par.m_no)
+ continue;
+ if (tab.m_itab[i] == 0)
+ continue;
+ const ITab& itab = *tab.m_itab[i];
+ if (itab.m_type == ITab::OrderedIndex) {
+ BSet bset(tab, itab, par.m_rows);
+ CHK(scanreadindex(par, itab, bset, false) == 0);
+ } else {
+ CHK(hashindexread(par, itab) == 0);
}
}
return 0;
@@ -3809,7 +3962,10 @@ readverifyfull(Par par)
static int
readverifyindex(Par par)
{
+ if (par.m_noverify)
+ return 0;
par.m_verify = true;
+ par.m_lockmode = NdbOperation::LM_CommittedRead;
unsigned sel = urandom(10);
if (sel < 9) {
par.m_ordered = true;
@@ -4411,15 +4567,30 @@ printtables()
{
Par par(g_opt);
makebuiltintables(par);
- ndbout << "builtin tables (x0 on pk, x=ordered z=hash):" << endl;
+ ndbout << "tables and indexes (x=ordered z=hash x0=on pk):" << endl;
for (unsigned j = 0; j < tabcount; j++) {
if (tablist[j] == 0)
continue;
const Tab& tab = *tablist[j];
- ndbout << " " << tab.m_name;
+ const char* tname = tab.m_name;
+ ndbout << " " << tname;
for (unsigned i = 0; i < tab.m_itabs; i++) {
+ if (tab.m_itab[i] == 0)
+ continue;
const ITab& itab = *tab.m_itab[i];
- ndbout << " " << itab.m_name;
+ const char* iname = itab.m_name;
+ if (strncmp(tname, iname, strlen(tname)) == 0)
+ iname += strlen(tname);
+ ndbout << " " << iname;
+ ndbout << "(";
+ for (unsigned k = 0; k < itab.m_icols; k++) {
+ if (k != 0)
+ ndbout << ",";
+ const ICol& icol = *itab.m_icol[k];
+ const Col& col = icol.m_col;
+ ndbout << col.m_name;
+ }
+ ndbout << ")";
}
ndbout << endl;
}
@@ -4434,9 +4605,12 @@ runtest(Par par)
unsigned short seed = (getpid() ^ time(0));
LL1("random seed: " << seed);
srandom((unsigned)seed);
- } else if (par.m_seed != 0)
+ } else if (par.m_seed != 0) {
LL1("random seed: " << par.m_seed);
srandom(par.m_seed);
+ } else {
+ LL1("random seed: loop number");
+ }
// cs
assert(par.m_csname != 0);
if (strcmp(par.m_csname, "random") != 0) {
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index 290d73554ca..767ddc40400 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -416,6 +416,7 @@ static inline bool ndb_supported_type(enum_field_types type)
case MYSQL_TYPE_YEAR:
case MYSQL_TYPE_STRING:
case MYSQL_TYPE_VAR_STRING:
+ case MYSQL_TYPE_VARCHAR:
case MYSQL_TYPE_TINY_BLOB:
case MYSQL_TYPE_BLOB:
case MYSQL_TYPE_MEDIUM_BLOB:
@@ -426,7 +427,6 @@ static inline bool ndb_supported_type(enum_field_types type)
return TRUE;
case MYSQL_TYPE_NULL:
case MYSQL_TYPE_GEOMETRY:
- case MYSQL_TYPE_VARCHAR:
break;
}
return FALSE;
@@ -1013,6 +1013,24 @@ inline ulong ha_ndbcluster::index_flags(uint idx_no, uint part,
DBUG_RETURN(index_type_flags[get_index_type_from_table(idx_no)]);
}
+static void shrink_varchar(Field* field, const byte* & ptr, char* buf)
+{
+ if (field->type() == MYSQL_TYPE_VARCHAR) {
+ Field_varstring* f= (Field_varstring*)field;
+ if (f->length_bytes < 256) {
+ uint pack_len= field->pack_length();
+ DBUG_ASSERT(1 <= pack_len && pack_len <= 256);
+ if (ptr[1] == 0) {
+ buf[0]= ptr[0];
+ } else {
+ DBUG_ASSERT(false);
+ buf[0]= 255;
+ }
+ memmove(buf + 1, ptr + 2, pack_len - 1);
+ ptr= buf;
+ }
+ }
+}
int ha_ndbcluster::set_primary_key(NdbOperation *op, const byte *key)
{
@@ -1024,10 +1042,13 @@ int ha_ndbcluster::set_primary_key(NdbOperation *op, const byte *key)
for (; key_part != end; key_part++)
{
Field* field= key_part->field;
+ const byte* ptr= key;
+ char buf[256];
+ shrink_varchar(field, ptr, buf);
if (set_ndb_key(op, field,
- key_part->fieldnr-1, key))
+ key_part->fieldnr-1, ptr))
ERR_RETURN(op->getNdbError());
- key += key_part->length;
+ key += key_part->store_length;
}
DBUG_RETURN(0);
}
@@ -1080,8 +1101,11 @@ ha_ndbcluster::set_index_key(NdbOperation *op,
for (i= 0; key_part != end; key_part++, i++)
{
- if (set_ndb_key(op, key_part->field, i,
- key_part->null_bit ? key_ptr + 1 : key_ptr))
+ Field* field= key_part->field;
+ const byte* ptr= key_part->null_bit ? key_ptr + 1 : key_ptr;
+ char buf[256];
+ shrink_varchar(field, ptr, buf);
+ if (set_ndb_key(op, field, i, ptr))
ERR_RETURN(m_active_trans->getNdbError());
key_ptr+= key_part->store_length;
}
@@ -1542,7 +1566,10 @@ int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op,
char truncated_field_name[NDB_MAX_ATTR_NAME_SIZE];
strnmov(truncated_field_name,field->field_name,sizeof(truncated_field_name));
truncated_field_name[sizeof(truncated_field_name)-1]= '\0';
- if (op->setBound(truncated_field_name, p.bound_type, p.bound_ptr))
+ const char* ptr= p.bound_ptr;
+ char buf[256];
+ shrink_varchar(field, ptr, buf);
+ if (op->setBound(truncated_field_name, p.bound_type, ptr))
ERR_RETURN(op->getNdbError());
}
}
@@ -2343,21 +2370,26 @@ void ha_ndbcluster::print_results()
my_snprintf(buf, sizeof(buf), "Decimal '%-*s'", field->pack_length(), value);
break;
}
- case NdbDictionary::Column::Char:{
+ case NdbDictionary::Column::Char: {
const char *value= (char*)ptr;
my_snprintf(buf, sizeof(buf), "Char '%.*s'", field->pack_length(), value);
break;
}
- case NdbDictionary::Column::Varchar:
- case NdbDictionary::Column::Binary:
- case NdbDictionary::Column::Varbinary: {
- const char *value= (char*)ptr;
- my_snprintf(buf, sizeof(buf), "Var '%.*s'", field->pack_length(), value);
+ case NdbDictionary::Column::Varchar: {
+ uint len= *(uchar*)ptr;
+ const char *value= (char*)ptr + 1;
+ my_snprintf(buf, sizeof(buf), "Varchar (%u)'%.*s'", len, len, value);
break;
}
- case NdbDictionary::Column::Bit: {
+ case NdbDictionary::Column::Binary: {
const char *value= (char*)ptr;
- my_snprintf(buf, sizeof(buf), "Bit '%.*s'", field->pack_length(), value);
+ my_snprintf(buf, sizeof(buf), "Binary '%.*s'", field->pack_length(), value);
+ break;
+ }
+ case NdbDictionary::Column::Varbinary: {
+ uint len= *(uchar*)ptr;
+ const char *value= (char*)ptr + 1;
+ my_snprintf(buf, sizeof(buf), "Varbinary (%u)'%.*s'", len, len, value);
break;
}
case NdbDictionary::Column::Datetime: {
@@ -2382,6 +2414,23 @@ void ha_ndbcluster::print_results()
my_snprintf(buf, sizeof(buf), "Text [len=%u]", (unsigned)len);
break;
}
+ case NdbDictionary::Column::Bit: {
+ const char *value= (char*)ptr;
+ my_snprintf(buf, sizeof(buf), "Bit '%.*s'", field->pack_length(), value);
+ break;
+ }
+ case NdbDictionary::Column::Longvarchar: {
+ uint len= uint2korr(ptr);
+ const char *value= (char*)ptr + 2;
+ my_snprintf(buf, sizeof(buf), "Longvarchar (%u)'%.*s'", len, len, value);
+ break;
+ }
+ case NdbDictionary::Column::Longvarbinary: {
+ uint len= uint2korr(ptr);
+ const char *value= (char*)ptr + 2;
+ my_snprintf(buf, sizeof(buf), "Longvarbinary (%u)'%.*s'", len, len, value);
+ break;
+ }
case NdbDictionary::Column::Undefined:
my_snprintf(buf, sizeof(buf), "Unknown type: %d", col->getType());
break;
@@ -3472,14 +3521,34 @@ static int create_ndb_column(NDBCOL &col,
col.setLength(field->pack_length());
}
break;
- case MYSQL_TYPE_VAR_STRING:
- if (field->flags & BINARY_FLAG)
- col.setType(NDBCOL::Varbinary);
- else {
- col.setType(NDBCOL::Varchar);
- col.setCharset(cs);
+ case MYSQL_TYPE_VAR_STRING: // ?
+ case MYSQL_TYPE_VARCHAR:
+ {
+ Field_varstring* f= (Field_varstring*)field;
+ if (f->length_bytes == 1)
+ {
+ if (field->flags & BINARY_FLAG)
+ col.setType(NDBCOL::Varbinary);
+ else {
+ col.setType(NDBCOL::Varchar);
+ col.setCharset(cs);
+ }
+ }
+ else if (f->length_bytes == 2)
+ {
+ if (field->flags & BINARY_FLAG)
+ col.setType(NDBCOL::Longvarbinary);
+ else {
+ col.setType(NDBCOL::Longvarchar);
+ col.setCharset(cs);
+ }
+ }
+ else
+ {
+ return HA_ERR_UNSUPPORTED;
+ }
+ col.setLength(field->field_length);
}
- col.setLength(field->pack_length());
break;
// Blob types (all come in as MYSQL_TYPE_BLOB)
mysql_type_tiny_blob:
@@ -3599,7 +3668,7 @@ int ha_ndbcluster::create(const char *name,
char name2[FN_HEADLEN];
bool create_from_engine= (info->table_options & HA_CREATE_FROM_ENGINE);
- DBUG_ENTER("create");
+ DBUG_ENTER("ha_ndbcluster::create");
DBUG_PRINT("enter", ("name: %s", name));
fn_format(name2, name, "", "",2); // Remove the .frm extension
set_dbname(name2);
@@ -3934,7 +4003,6 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
m_table_flags(HA_REC_NOT_IN_SEQ |
HA_NULL_IN_KEY |
HA_AUTO_PART_KEY |
- HA_NO_VARCHAR |
HA_NO_PREFIX_CHAR_KEYS |
HA_NEED_READ_RANGE_BUFFER |
HA_CAN_BIT_FIELD),