summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <pekka@mysql.com>2004-07-22 12:38:09 +0200
committerunknown <pekka@mysql.com>2004-07-22 12:38:09 +0200
commitd1e7ef7927ede0c1754676324c9be407beb6f4f4 (patch)
tree9205f79127318a2c7f17c845f906a213a92eb452
parent9864327a61c5756c6ce00bd8e8b7d347cf2b3938 (diff)
downloadmariadb-git-d1e7ef7927ede0c1754676324c9be407beb6f4f4.tar.gz
ha_ndb blobs 2
-rw-r--r--mysql-test/r/ndb_blob.result272
-rw-r--r--mysql-test/t/ndb_blob.test249
-rw-r--r--sql/ha_ndbcluster.cc677
-rw-r--r--sql/ha_ndbcluster.h23
4 files changed, 1016 insertions, 205 deletions
diff --git a/mysql-test/r/ndb_blob.result b/mysql-test/r/ndb_blob.result
index e69de29bb2d..89b53aea7d1 100644
--- a/mysql-test/r/ndb_blob.result
+++ b/mysql-test/r/ndb_blob.result
@@ -0,0 +1,272 @@
+drop table if exists t1;
+set autocommit=0;
+create table t1 (
+a int not null primary key,
+b text not null,
+c int not null,
+d longblob,
+key (c)
+) engine=ndbcluster;
+set @x0 = '01234567012345670123456701234567';
+set @x0 = concat(@x0,@x0,@x0,@x0,@x0,@x0,@x0,@x0);
+set @b1 = 'b1';
+set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1);
+set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1);
+set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1);
+set @b1 = concat(@b1,@x0);
+set @d1 = 'dd1';
+set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1);
+set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1);
+set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1);
+set @b2 = 'b2';
+set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
+set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
+set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
+set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
+set @d2 = 'dd2';
+set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
+set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
+set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
+set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
+select length(@x0),length(@b1),length(@d1) from dual;
+length(@x0) length(@b1) length(@d1)
+256 2256 3000
+select length(@x0),length(@b2),length(@d2) from dual;
+length(@x0) length(@b2) length(@d2)
+256 20000 30000
+insert into t1 values(1,@b1,111,@d1);
+insert into t1 values(2,@b2,222,@d2);
+commit;
+explain select * from t1 where a = 1;
+id select_type table type possible_keys key key_len ref rows Extra
+1 SIMPLE t1 const PRIMARY PRIMARY 4 const 1
+select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
+from t1 where a=1;
+a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
+1 2256 b1 3000 dd1
+select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
+from t1 where a=2;
+a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3)
+2 20000 b2 30000 dd2
+update t1 set b=@b2,d=@d2 where a=1;
+update t1 set b=@b1,d=@d1 where a=2;
+commit;
+select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
+from t1 where a=1;
+a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3)
+1 20000 b2 30000 dd2
+select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
+from t1 where a=2;
+a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
+2 2256 b1 3000 dd1
+update t1 set b=concat(b,b),d=concat(d,d) where a=1;
+update t1 set b=concat(b,b),d=concat(d,d) where a=2;
+commit;
+select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
+from t1 where a=1;
+a length(b) substr(b,1+4*9000,2) length(d) substr(d,1+6*9000,3)
+1 40000 b2 60000 dd2
+select a,length(b),substr(b,1+4*900,2),length(d),substr(d,1+6*900,3)
+from t1 where a=2;
+a length(b) substr(b,1+4*900,2) length(d) substr(d,1+6*900,3)
+2 4512 b1 6000 dd1
+update t1 set d=null where a=1;
+commit;
+select a from t1 where d is null;
+a
+1
+delete from t1 where a=1;
+delete from t1 where a=2;
+commit;
+select count(*) from t1;
+count(*)
+0
+insert into t1 values(1,@b1,111,@d1);
+insert into t1 values(2,@b2,222,@d2);
+commit;
+explain select * from t1 where c = 111;
+id select_type table type possible_keys key key_len ref rows Extra
+1 SIMPLE t1 ref c c 4 const 10 Using where
+select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
+from t1 where c=111;
+a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
+1 2256 b1 3000 dd1
+select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
+from t1 where c=222;
+a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3)
+2 20000 b2 30000 dd2
+update t1 set b=@b2,d=@d2 where c=111;
+update t1 set b=@b1,d=@d1 where c=222;
+commit;
+select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
+from t1 where c=111;
+a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3)
+1 20000 b2 30000 dd2
+select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
+from t1 where c=222;
+a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
+2 2256 b1 3000 dd1
+update t1 set d=null where c=111;
+commit;
+select a from t1 where d is null;
+a
+1
+delete from t1 where c=111;
+delete from t1 where c=222;
+commit;
+select count(*) from t1;
+count(*)
+0
+insert into t1 values(1,'b1',111,'dd1');
+insert into t1 values(2,'b2',222,'dd2');
+insert into t1 values(3,'b3',333,'dd3');
+insert into t1 values(4,'b4',444,'dd4');
+insert into t1 values(5,'b5',555,'dd5');
+insert into t1 values(6,'b6',666,'dd6');
+insert into t1 values(7,'b7',777,'dd7');
+insert into t1 values(8,'b8',888,'dd8');
+insert into t1 values(9,'b9',999,'dd9');
+commit;
+explain select * from t1;
+id select_type table type possible_keys key key_len ref rows Extra
+1 SIMPLE t1 ALL NULL NULL NULL NULL 100
+select * from t1 order by a;
+a b c d
+1 b1 111 dd1
+2 b2 222 dd2
+3 b3 333 dd3
+4 b4 444 dd4
+5 b5 555 dd5
+6 b6 666 dd6
+7 b7 777 dd7
+8 b8 888 dd8
+9 b9 999 dd9
+update t1 set b=concat(a,'x',b),d=concat(a,'x',d);
+commit;
+select * from t1 order by a;
+a b c d
+1 1xb1 111 1xdd1
+2 2xb2 222 2xdd2
+3 3xb3 333 3xdd3
+4 4xb4 444 4xdd4
+5 5xb5 555 5xdd5
+6 6xb6 666 6xdd6
+7 7xb7 777 7xdd7
+8 8xb8 888 8xdd8
+9 9xb9 999 9xdd9
+delete from t1;
+commit;
+select count(*) from t1;
+count(*)
+0
+insert into t1 values(1,@b1,111,@d1);
+insert into t1 values(2,@b2,222,@d2);
+commit;
+explain select * from t1;
+id select_type table type possible_keys key key_len ref rows Extra
+1 SIMPLE t1 ALL NULL NULL NULL NULL 100
+select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
+from t1 order by a;
+a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
+1 2256 b1 3000 dd1
+2 20000 b2 30000 dd2
+update t1 set b=concat(b,b),d=concat(d,d);
+commit;
+select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
+from t1 order by a;
+a length(b) substr(b,1+4*9000,2) length(d) substr(d,1+6*9000,3)
+1 4512 6000
+2 40000 b2 60000 dd2
+delete from t1;
+commit;
+select count(*) from t1;
+count(*)
+0
+insert into t1 values(1,'b1',111,'dd1');
+insert into t1 values(2,'b2',222,'dd2');
+insert into t1 values(3,'b3',333,'dd3');
+insert into t1 values(4,'b4',444,'dd4');
+insert into t1 values(5,'b5',555,'dd5');
+insert into t1 values(6,'b6',666,'dd6');
+insert into t1 values(7,'b7',777,'dd7');
+insert into t1 values(8,'b8',888,'dd8');
+insert into t1 values(9,'b9',999,'dd9');
+commit;
+explain select * from t1 where c >= 100 order by a;
+id select_type table type possible_keys key key_len ref rows Extra
+1 SIMPLE t1 range c c 4 NULL 10 Using where; Using filesort
+select * from t1 where c >= 100 order by a;
+a b c d
+1 b1 111 dd1
+2 b2 222 dd2
+3 b3 333 dd3
+4 b4 444 dd4
+5 b5 555 dd5
+6 b6 666 dd6
+7 b7 777 dd7
+8 b8 888 dd8
+9 b9 999 dd9
+update t1 set b=concat(a,'x',b),d=concat(a,'x',d)
+where c >= 100;
+commit;
+select * from t1 where c >= 100 order by a;
+a b c d
+1 1xb1 111 1xdd1
+2 2xb2 222 2xdd2
+3 3xb3 333 3xdd3
+4 4xb4 444 4xdd4
+5 5xb5 555 5xdd5
+6 6xb6 666 6xdd6
+7 7xb7 777 7xdd7
+8 8xb8 888 8xdd8
+9 9xb9 999 9xdd9
+delete from t1 where c >= 100;
+commit;
+select count(*) from t1;
+count(*)
+0
+insert into t1 values(1,@b1,111,@d1);
+insert into t1 values(2,@b2,222,@d2);
+commit;
+explain select * from t1 where c >= 100 order by a;
+id select_type table type possible_keys key key_len ref rows Extra
+1 SIMPLE t1 range c c 4 NULL 10 Using where; Using filesort
+select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
+from t1 where c >= 100 order by a;
+a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
+1 2256 b1 3000 dd1
+2 20000 b2 30000 dd2
+update t1 set b=concat(b,b),d=concat(d,d);
+commit;
+select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
+from t1 where c >= 100 order by a;
+a length(b) substr(b,1+4*9000,2) length(d) substr(d,1+6*9000,3)
+1 4512 6000
+2 40000 b2 60000 dd2
+delete from t1 where c >= 100;
+commit;
+select count(*) from t1;
+count(*)
+0
+insert into t1 values(1,@b1,111,@d1);
+insert into t1 values(2,@b2,222,@d2);
+select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
+from t1 where a = 0;
+a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
+select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
+from t1 where a = 1;
+a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
+1 2256 b1 3000 dd1
+select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
+from t1 where a = 2;
+a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
+2 20000 b2 30000 dd2
+select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
+from t1 order by a;
+a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
+1 2256 b1 3000 dd1
+2 20000 b2 30000 dd2
+rollback;
+select count(*) from t1;
+count(*)
+0
diff --git a/mysql-test/t/ndb_blob.test b/mysql-test/t/ndb_blob.test
index e69de29bb2d..c1166a7a90c 100644
--- a/mysql-test/t/ndb_blob.test
+++ b/mysql-test/t/ndb_blob.test
@@ -0,0 +1,249 @@
+--source include/have_ndb.inc
+
+--disable_warnings
+drop table if exists t1;
+--enable_warnings
+
+#
+# Minimal NDB blobs test.
+#
+# On NDB API level there is an extensive test program "testBlobs".
+# A prerequisite for this handler test is that "testBlobs" succeeds.
+#
+
+# make test harder with autocommit off
+set autocommit=0;
+
+create table t1 (
+ a int not null primary key,
+ b text not null,
+ c int not null,
+ d longblob,
+ key (c)
+) engine=ndbcluster;
+
+# -- values --
+
+# x0 size 256 (current inline size)
+set @x0 = '01234567012345670123456701234567';
+set @x0 = concat(@x0,@x0,@x0,@x0,@x0,@x0,@x0,@x0);
+
+# b1 length 2000+256 (blob part aligned)
+set @b1 = 'b1';
+set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1);
+set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1);
+set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1);
+set @b1 = concat(@b1,@x0);
+# d1 length 3000
+set @d1 = 'dd1';
+set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1);
+set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1);
+set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1);
+
+# b2 length 20000
+set @b2 = 'b2';
+set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
+set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
+set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
+set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
+# d2 length 30000
+set @d2 = 'dd2';
+set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
+set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
+set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
+set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
+
+select length(@x0),length(@b1),length(@d1) from dual;
+select length(@x0),length(@b2),length(@d2) from dual;
+
+# -- pk ops --
+
+insert into t1 values(1,@b1,111,@d1);
+insert into t1 values(2,@b2,222,@d2);
+commit;
+explain select * from t1 where a = 1;
+
+# pk read
+select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
+from t1 where a=1;
+select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
+from t1 where a=2;
+
+# pk update
+update t1 set b=@b2,d=@d2 where a=1;
+update t1 set b=@b1,d=@d1 where a=2;
+commit;
+select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
+from t1 where a=1;
+select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
+from t1 where a=2;
+
+# pk update
+update t1 set b=concat(b,b),d=concat(d,d) where a=1;
+update t1 set b=concat(b,b),d=concat(d,d) where a=2;
+commit;
+select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
+from t1 where a=1;
+select a,length(b),substr(b,1+4*900,2),length(d),substr(d,1+6*900,3)
+from t1 where a=2;
+
+# pk update to null
+update t1 set d=null where a=1;
+commit;
+select a from t1 where d is null;
+
+# pk delete
+delete from t1 where a=1;
+delete from t1 where a=2;
+commit;
+select count(*) from t1;
+
+# -- hash index ops --
+
+insert into t1 values(1,@b1,111,@d1);
+insert into t1 values(2,@b2,222,@d2);
+commit;
+explain select * from t1 where c = 111;
+
+# hash key read
+select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
+from t1 where c=111;
+select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
+from t1 where c=222;
+
+# hash key update
+update t1 set b=@b2,d=@d2 where c=111;
+update t1 set b=@b1,d=@d1 where c=222;
+commit;
+select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
+from t1 where c=111;
+select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
+from t1 where c=222;
+
+# hash key update to null
+update t1 set d=null where c=111;
+commit;
+select a from t1 where d is null;
+
+# hash key delete
+delete from t1 where c=111;
+delete from t1 where c=222;
+commit;
+select count(*) from t1;
+
+# -- table scan ops, short values --
+
+insert into t1 values(1,'b1',111,'dd1');
+insert into t1 values(2,'b2',222,'dd2');
+insert into t1 values(3,'b3',333,'dd3');
+insert into t1 values(4,'b4',444,'dd4');
+insert into t1 values(5,'b5',555,'dd5');
+insert into t1 values(6,'b6',666,'dd6');
+insert into t1 values(7,'b7',777,'dd7');
+insert into t1 values(8,'b8',888,'dd8');
+insert into t1 values(9,'b9',999,'dd9');
+commit;
+explain select * from t1;
+
+# table scan read
+select * from t1 order by a;
+
+# table scan update
+update t1 set b=concat(a,'x',b),d=concat(a,'x',d);
+commit;
+select * from t1 order by a;
+
+# table scan delete
+delete from t1;
+commit;
+select count(*) from t1;
+
+# -- table scan ops, long values --
+
+insert into t1 values(1,@b1,111,@d1);
+insert into t1 values(2,@b2,222,@d2);
+commit;
+explain select * from t1;
+
+# table scan read
+select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
+from t1 order by a;
+
+# table scan update
+update t1 set b=concat(b,b),d=concat(d,d);
+commit;
+select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
+from t1 order by a;
+
+# table scan delete
+delete from t1;
+commit;
+select count(*) from t1;
+
+# -- range scan ops, short values --
+
+insert into t1 values(1,'b1',111,'dd1');
+insert into t1 values(2,'b2',222,'dd2');
+insert into t1 values(3,'b3',333,'dd3');
+insert into t1 values(4,'b4',444,'dd4');
+insert into t1 values(5,'b5',555,'dd5');
+insert into t1 values(6,'b6',666,'dd6');
+insert into t1 values(7,'b7',777,'dd7');
+insert into t1 values(8,'b8',888,'dd8');
+insert into t1 values(9,'b9',999,'dd9');
+commit;
+explain select * from t1 where c >= 100 order by a;
+
+# range scan read
+select * from t1 where c >= 100 order by a;
+
+# range scan update
+update t1 set b=concat(a,'x',b),d=concat(a,'x',d)
+where c >= 100;
+commit;
+select * from t1 where c >= 100 order by a;
+
+# range scan delete
+delete from t1 where c >= 100;
+commit;
+select count(*) from t1;
+
+# -- range scan ops, long values --
+
+insert into t1 values(1,@b1,111,@d1);
+insert into t1 values(2,@b2,222,@d2);
+commit;
+explain select * from t1 where c >= 100 order by a;
+
+# range scan read
+select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
+from t1 where c >= 100 order by a;
+
+# range scan update
+update t1 set b=concat(b,b),d=concat(d,d);
+commit;
+select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
+from t1 where c >= 100 order by a;
+
+# range scan delete
+delete from t1 where c >= 100;
+commit;
+select count(*) from t1;
+
+# -- rollback --
+
+insert into t1 values(1,@b1,111,@d1);
+insert into t1 values(2,@b2,222,@d2);
+# 626
+select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
+from t1 where a = 0;
+select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
+from t1 where a = 1;
+select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
+from t1 where a = 2;
+select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
+from t1 order by a;
+rollback;
+select count(*) from t1;
+
+--drop table t1;
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index b57735b9de6..e46857bc6f6 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -182,6 +182,21 @@ bool ha_ndbcluster::get_error_message(int error,
/*
+ Check if type is supported by NDB.
+*/
+
+static inline bool ndb_supported_type(enum_field_types type)
+{
+ switch (type) {
+ case MYSQL_TYPE_NULL:
+ case MYSQL_TYPE_GEOMETRY:
+ return false;
+ }
+ return true;
+}
+
+
+/*
Instruct NDB to set the value of the hidden primary key
*/
@@ -208,40 +223,15 @@ int ha_ndbcluster::set_ndb_key(NdbOperation *ndb_op, Field *field,
pack_len));
DBUG_DUMP("key", (char*)field_ptr, pack_len);
- switch (field->type()) {
- case MYSQL_TYPE_DECIMAL:
- case MYSQL_TYPE_TINY:
- case MYSQL_TYPE_SHORT:
- case MYSQL_TYPE_LONG:
- case MYSQL_TYPE_FLOAT:
- case MYSQL_TYPE_DOUBLE:
- case MYSQL_TYPE_TIMESTAMP:
- case MYSQL_TYPE_LONGLONG:
- case MYSQL_TYPE_INT24:
- case MYSQL_TYPE_DATE:
- case MYSQL_TYPE_TIME:
- case MYSQL_TYPE_DATETIME:
- case MYSQL_TYPE_YEAR:
- case MYSQL_TYPE_NEWDATE:
- case MYSQL_TYPE_ENUM:
- case MYSQL_TYPE_SET:
- case MYSQL_TYPE_VAR_STRING:
- case MYSQL_TYPE_STRING:
- // Common implementation for most field types
- DBUG_RETURN(ndb_op->equal(fieldnr, (char*) field_ptr, pack_len) != 0);
-
- case MYSQL_TYPE_TINY_BLOB:
- case MYSQL_TYPE_MEDIUM_BLOB:
- case MYSQL_TYPE_LONG_BLOB:
- case MYSQL_TYPE_BLOB:
- case MYSQL_TYPE_NULL:
- case MYSQL_TYPE_GEOMETRY:
- default:
- // Unhandled field types
- DBUG_PRINT("error", ("Field type %d not supported", field->type()));
- DBUG_RETURN(2);
+ if (ndb_supported_type(field->type()))
+ {
+ if (! (field->flags & BLOB_FLAG))
+ // Common implementation for most field types
+ DBUG_RETURN(ndb_op->equal(fieldnr, (char*) field_ptr, pack_len) != 0);
}
- DBUG_RETURN(3);
+ // Unhandled field types
+ DBUG_PRINT("error", ("Field type %d not supported", field->type()));
+ DBUG_RETURN(2);
}
@@ -259,63 +249,197 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
fieldnr, field->field_name, field->type(),
pack_len, field->is_null()?"Y":"N"));
DBUG_DUMP("value", (char*) field_ptr, pack_len);
-
- if (field->is_null())
+
+ if (ndb_supported_type(field->type()))
{
- // Set value to NULL
- DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0));
- }
-
- switch (field->type()) {
- case MYSQL_TYPE_DECIMAL:
- case MYSQL_TYPE_TINY:
- case MYSQL_TYPE_SHORT:
- case MYSQL_TYPE_LONG:
- case MYSQL_TYPE_FLOAT:
- case MYSQL_TYPE_DOUBLE:
- case MYSQL_TYPE_TIMESTAMP:
- case MYSQL_TYPE_LONGLONG:
- case MYSQL_TYPE_INT24:
- case MYSQL_TYPE_DATE:
- case MYSQL_TYPE_TIME:
- case MYSQL_TYPE_DATETIME:
- case MYSQL_TYPE_YEAR:
- case MYSQL_TYPE_NEWDATE:
- case MYSQL_TYPE_ENUM:
- case MYSQL_TYPE_SET:
- case MYSQL_TYPE_VAR_STRING:
- case MYSQL_TYPE_STRING:
- // Common implementation for most field types
- DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)field_ptr, pack_len) != 0);
-
- case MYSQL_TYPE_TINY_BLOB:
- case MYSQL_TYPE_MEDIUM_BLOB:
- case MYSQL_TYPE_LONG_BLOB:
- case MYSQL_TYPE_BLOB:
- case MYSQL_TYPE_NULL:
- case MYSQL_TYPE_GEOMETRY:
- default:
- // Unhandled field types
- DBUG_PRINT("error", ("Field type %d not supported", field->type()));
- DBUG_RETURN(2);
+ if (! (field->flags & BLOB_FLAG))
+ {
+ if (field->is_null())
+ // Set value to NULL
+ DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0));
+ // Common implementation for most field types
+ DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)field_ptr, pack_len) != 0);
+ }
+
+ // Blob type
+ NdbBlob *ndb_blob = ndb_op->getBlobHandle(fieldnr);
+ if (ndb_blob != NULL)
+ {
+ if (field->is_null())
+ DBUG_RETURN(ndb_blob->setNull() != 0);
+
+ Field_blob *field_blob= (Field_blob*)field;
+
+ // Get length and pointer to data
+ uint32 blob_len= field_blob->get_length(field_ptr);
+ char* blob_ptr= NULL;
+ field_blob->get_ptr(&blob_ptr);
+
+ // Looks like NULL blob can also be signaled in this way
+ if (blob_ptr == NULL)
+ DBUG_RETURN(ndb_blob->setNull() != 0);
+
+ DBUG_PRINT("value", ("set blob ptr=%x len=%u",
+ (unsigned)blob_ptr, blob_len));
+ DBUG_DUMP("value", (char*)blob_ptr, min(blob_len, 26));
+
+ // No callback needed to write value
+ DBUG_RETURN(ndb_blob->setValue(blob_ptr, blob_len) != 0);
+ }
+ DBUG_RETURN(1);
+ }
+ // Unhandled field types
+ DBUG_PRINT("error", ("Field type %d not supported", field->type()));
+ DBUG_RETURN(2);
+}
+
+
+/*
+ Callback to read all blob values.
+ - not done in unpack_record because unpack_record is valid
+ after execute(Commit) but reading blobs is not
+ - may only generate read operations; they have to be executed
+ somewhere before the data is available
+ - due to single buffer for all blobs, we let the last blob
+ process all blobs (last so that all are active)
+ - null bit is still set in unpack_record
+ - TODO allocate blob part aligned buffers
+*/
+
+NdbBlob::ActiveHook get_ndb_blobs_value;
+
+int get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg)
+{
+ DBUG_ENTER("get_ndb_blobs_value [callback]");
+ if (ndb_blob->blobsNextBlob() != NULL)
+ DBUG_RETURN(0);
+ ha_ndbcluster *ha= (ha_ndbcluster *)arg;
+ DBUG_RETURN(ha->get_ndb_blobs_value(ndb_blob));
+}
+
+int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob)
+{
+ DBUG_ENTER("get_ndb_blobs_value");
+
+ // Field has no field number so cannot use TABLE blob_field
+ // Loop twice, first only counting total buffer size
+ for (int loop= 0; loop <= 1; loop++)
+ {
+ uint32 offset= 0;
+ for (uint i= 0; i < table->fields; i++)
+ {
+ Field *field= table->field[i];
+ NdbValue value= m_value[i];
+ if (value.ptr != NULL && (field->flags & BLOB_FLAG))
+ {
+ Field_blob *field_blob= (Field_blob *)field;
+ NdbBlob *ndb_blob= value.blob;
+ Uint64 blob_len= 0;
+ if (ndb_blob->getLength(blob_len) != 0)
+ DBUG_RETURN(-1);
+ // Align to Uint64
+ uint32 blob_size= blob_len;
+ if (blob_size % 8 != 0)
+ blob_size+= 8 - blob_size % 8;
+ if (loop == 1)
+ {
+ char *buf= blobs_buffer + offset;
+ uint32 len= 0xffffffff; // Max uint32
+ DBUG_PRINT("value", ("read blob ptr=%x len=%u",
+ (uint)buf, (uint)blob_len));
+ if (ndb_blob->readData(buf, len) != 0)
+ DBUG_RETURN(-1);
+ DBUG_ASSERT(len == blob_len);
+ field_blob->set_ptr(len, buf);
+ }
+ offset+= blob_size;
+ }
+ }
+ if (loop == 0 && offset > blobs_buffer_size)
+ {
+ my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
+ blobs_buffer_size= 0;
+ DBUG_PRINT("value", ("allocate blobs buffer size %u", offset));
+ blobs_buffer= my_malloc(offset, MYF(MY_WME));
+ if (blobs_buffer == NULL)
+ DBUG_RETURN(-1);
+ blobs_buffer_size= offset;
+ }
}
- DBUG_RETURN(3);
+ DBUG_RETURN(0);
}
/*
Instruct NDB to fetch one field
- - data is read directly into buffer provided by field_ptr
- if it's NULL, data is read into memory provided by NDBAPI
+ - data is read directly into buffer provided by field
+ if field is NULL, data is read into memory provided by NDBAPI
*/
-int ha_ndbcluster::get_ndb_value(NdbOperation *op,
- uint field_no, byte *field_ptr)
+int ha_ndbcluster::get_ndb_value(NdbOperation *ndb_op, Field *field,
+ uint fieldnr)
{
DBUG_ENTER("get_ndb_value");
- DBUG_PRINT("enter", ("field_no: %d", field_no));
- m_value[field_no]= op->getValue(field_no, field_ptr);
- DBUG_RETURN(m_value == NULL);
+ DBUG_PRINT("enter", ("fieldnr: %d flags: %o", fieldnr,
+ (int)(field != NULL ? field->flags : 0)));
+
+ if (field != NULL)
+ {
+ if (ndb_supported_type(field->type()))
+ {
+ DBUG_ASSERT(field->ptr != NULL);
+ if (! (field->flags & BLOB_FLAG))
+ {
+ m_value[fieldnr].rec= ndb_op->getValue(fieldnr, field->ptr);
+ DBUG_RETURN(m_value[fieldnr].rec == NULL);
+ }
+
+ // Blob type
+ NdbBlob *ndb_blob= ndb_op->getBlobHandle(fieldnr);
+ m_value[fieldnr].blob= ndb_blob;
+ if (ndb_blob != NULL)
+ {
+ // Set callback
+ void *arg= (void *)this;
+ DBUG_RETURN(ndb_blob->setActiveHook(::get_ndb_blobs_value, arg) != 0);
+ }
+ DBUG_RETURN(1);
+ }
+ // Unhandled field types
+ DBUG_PRINT("error", ("Field type %d not supported", field->type()));
+ DBUG_RETURN(2);
+ }
+
+ // Used for hidden key only
+ m_value[fieldnr].rec= ndb_op->getValue(fieldnr, NULL);
+ DBUG_RETURN(m_value[fieldnr].rec == NULL);
+}
+
+
+/*
+ Check if any set or get of blob value in current query.
+*/
+bool ha_ndbcluster::uses_blob_value(bool all_fields)
+{
+ if (table->blob_fields == 0)
+ return false;
+ if (all_fields)
+ return true;
+ {
+ uint no_fields= table->fields;
+ int i;
+ THD *thd= current_thd;
+ // They always put blobs at the end..
+ for (i= no_fields - 1; i >= 0; i--)
+ {
+ Field *field= table->field[i];
+ if (thd->query_id == field->query_id)
+ {
+ return true;
+ }
+ }
+ }
+ return false;
}
@@ -462,10 +586,19 @@ void ha_ndbcluster::release_metadata()
DBUG_VOID_RETURN;
}
-NdbScanOperation::LockMode get_ndb_lock_type(enum thr_lock_type type)
+int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type)
{
- return (type == TL_WRITE_ALLOW_WRITE) ?
- NdbScanOperation::LM_Exclusive : NdbScanOperation::LM_Read;
+ int lm;
+ if (type == TL_WRITE_ALLOW_WRITE)
+ lm = NdbScanOperation::LM_Exclusive;
+ else if (uses_blob_value(retrieve_all_fields))
+ /*
+ TODO use a new scan mode to read + lock + keyinfo
+ */
+ lm = NdbScanOperation::LM_Exclusive;
+ else
+ lm = NdbScanOperation::LM_Read;
+ return lm;
}
static const ulong index_type_flags[]=
@@ -614,7 +747,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
if (set_hidden_key(op, no_fields, key))
goto err;
// Read key at the same time, for future reference
- if (get_ndb_value(op, no_fields, NULL))
+ if (get_ndb_value(op, NULL, no_fields))
goto err;
}
else
@@ -630,13 +763,13 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
Field *field= table->field[i];
if (thd->query_id == field->query_id)
{
- if (get_ndb_value(op, i, field->ptr))
+ if (get_ndb_value(op, field, i))
goto err;
}
else
{
// Attribute was not to be read
- m_value[i]= NULL;
+ m_value[i].ptr= NULL;
}
}
@@ -700,13 +833,13 @@ int ha_ndbcluster::unique_index_read(const byte *key,
if ((thd->query_id == field->query_id) ||
(field->flags & PRI_KEY_FLAG))
{
- if (get_ndb_value(op, i, field->ptr))
+ if (get_ndb_value(op, field, i))
ERR_RETURN(op->getNdbError());
}
else
{
// Attribute was not to be read
- m_value[i]= NULL;
+ m_value[i].ptr= NULL;
}
}
@@ -749,11 +882,22 @@ inline int ha_ndbcluster::next_result(byte *buf)
bool contact_ndb = m_lock.type != TL_WRITE_ALLOW_WRITE;
do {
DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb));
+ /*
+ We can only handle one tuple with blobs at a time.
+ */
+ if (ops_pending && blobs_pending)
+ {
+ if (trans->execute(NoCommit) != 0)
+ DBUG_RETURN(ndb_err(trans));
+ ops_pending= 0;
+ blobs_pending= false;
+ }
check= cursor->nextResult(contact_ndb);
if (check == 0)
{
// One more record found
DBUG_PRINT("info", ("One more record found"));
+
unpack_record(buf);
table->status= 0;
DBUG_RETURN(0);
@@ -867,8 +1011,10 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
index_name= get_index_name(active_index);
if (!(op= trans->getNdbIndexScanOperation(index_name, m_tabname)))
ERR_RETURN(trans->getNdbError());
- if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0,
- parallelism, sorted)))
+
+ NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
+ get_ndb_lock_type(m_lock.type);
+ if (!(cursor= op->readTuples(lm, 0, parallelism, sorted)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
@@ -928,7 +1074,9 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
if (!(op= trans->getNdbScanOperation(m_tabname)))
ERR_RETURN(trans->getNdbError());
- if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0,parallelism)))
+ NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
+ get_ndb_lock_type(m_lock.type);
+ if (!(cursor= op->readTuples(lm, 0, parallelism)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
@@ -997,7 +1145,9 @@ int ha_ndbcluster::full_table_scan(byte *buf)
if (!(op=trans->getNdbScanOperation(m_tabname)))
ERR_RETURN(trans->getNdbError());
- if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0,parallelism)))
+ NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
+ get_ndb_lock_type(m_lock.type);
+ if (!(cursor= op->readTuples(lm, 0, parallelism)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
DBUG_RETURN(define_read_attrs(buf, op));
@@ -1021,12 +1171,12 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
(field->flags & PRI_KEY_FLAG) ||
retrieve_all_fields)
{
- if (get_ndb_value(op, i, field->ptr))
+ if (get_ndb_value(op, field, i))
ERR_RETURN(op->getNdbError());
}
else
{
- m_value[i]= NULL;
+ m_value[i].ptr= NULL;
}
}
@@ -1040,7 +1190,7 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
if (!tab->getColumn(hidden_no))
DBUG_RETURN(1);
#endif
- if (get_ndb_value(op, hidden_no, NULL))
+ if (get_ndb_value(op, NULL, hidden_no))
ERR_RETURN(op->getNdbError());
}
@@ -1108,12 +1258,13 @@ int ha_ndbcluster::write_row(byte *record)
*/
rows_inserted++;
if ((rows_inserted == rows_to_insert) ||
- ((rows_inserted % bulk_insert_rows) == 0))
+ ((rows_inserted % bulk_insert_rows) == 0) ||
+ uses_blob_value(false) != 0)
{
// Send rows to NDB
DBUG_PRINT("info", ("Sending inserts to NDB, "\
"rows_inserted:%d, bulk_insert_rows: %d",
- rows_inserted, bulk_insert_rows));
+ (int)rows_inserted, (int)bulk_insert_rows));
if (trans->execute(NoCommit) != 0)
DBUG_RETURN(ndb_err(trans));
}
@@ -1190,6 +1341,8 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
if (!(op= cursor->updateTuple()))
ERR_RETURN(trans->getNdbError());
ops_pending++;
+ if (uses_blob_value(false))
+ blobs_pending= true;
}
else
{
@@ -1205,7 +1358,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
// Require that the PK for this record has previously been
// read into m_value
uint no_fields= table->fields;
- NdbRecAttr* rec= m_value[no_fields];
+ NdbRecAttr* rec= m_value[no_fields].rec;
DBUG_ASSERT(rec);
DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH);
@@ -1280,7 +1433,7 @@ int ha_ndbcluster::delete_row(const byte *record)
// This table has no primary key, use "hidden" primary key
DBUG_PRINT("info", ("Using hidden key"));
uint no_fields= table->fields;
- NdbRecAttr* rec= m_value[no_fields];
+ NdbRecAttr* rec= m_value[no_fields].rec;
DBUG_ASSERT(rec != NULL);
if (set_hidden_key(op, no_fields, rec->aRef()))
@@ -1318,7 +1471,7 @@ void ha_ndbcluster::unpack_record(byte* buf)
{
uint row_offset= (uint) (buf - table->record[0]);
Field **field, **end;
- NdbRecAttr **value= m_value;
+ NdbValue *value= m_value;
DBUG_ENTER("unpack_record");
// Set null flag(s)
@@ -1327,8 +1480,23 @@ void ha_ndbcluster::unpack_record(byte* buf)
field < end;
field++, value++)
{
- if (*value && (*value)->isNULL())
- (*field)->set_null(row_offset);
+ if ((*value).ptr)
+ {
+ if (! ((*field)->flags & BLOB_FLAG))
+ {
+ if ((*value).rec->isNULL())
+ (*field)->set_null(row_offset);
+ }
+ else
+ {
+ NdbBlob* ndb_blob= (*value).blob;
+ bool isNull= true;
+ int ret= ndb_blob->getNull(isNull);
+ DBUG_ASSERT(ret == 0);
+ if (isNull)
+ (*field)->set_null(row_offset);
+ }
+ }
}
#ifndef DBUG_OFF
@@ -1339,7 +1507,7 @@ void ha_ndbcluster::unpack_record(byte* buf)
int hidden_no= table->fields;
const NDBTAB *tab= (NDBTAB *) m_table;
const NDBCOL *hidden_col= tab->getColumn(hidden_no);
- NdbRecAttr* rec= m_value[hidden_no];
+ NdbRecAttr* rec= m_value[hidden_no].rec;
DBUG_ASSERT(rec);
DBUG_PRINT("hidden", ("%d: %s \"%llu\"", hidden_no,
hidden_col->getName(), rec->u_64_value()));
@@ -1367,9 +1535,9 @@ void ha_ndbcluster::print_results()
{
Field *field;
const NDBCOL *col;
- NdbRecAttr *value;
+ NdbValue value;
- if (!(value= m_value[f]))
+ if (!(value= m_value[f]).ptr)
{
fprintf(DBUG_FILE, "Field %d was not read\n", f);
continue;
@@ -1378,19 +1546,28 @@ void ha_ndbcluster::print_results()
DBUG_DUMP("field->ptr", (char*)field->ptr, field->pack_length());
col= tab->getColumn(f);
fprintf(DBUG_FILE, "%d: %s\t", f, col->getName());
-
- if (value->isNULL())
+
+ NdbBlob *ndb_blob= NULL;
+ if (! (field->flags & BLOB_FLAG))
{
- fprintf(DBUG_FILE, "NULL\n");
- continue;
+ if (value.rec->isNULL())
+ {
+ fprintf(DBUG_FILE, "NULL\n");
+ continue;
+ }
+ }
+ else
+ {
+ ndb_blob= value.blob;
+ bool isNull= true;
+ ndb_blob->getNull(isNull);
+ if (isNull) {
+ fprintf(DBUG_FILE, "NULL\n");
+ continue;
+ }
}
switch (col->getType()) {
- case NdbDictionary::Column::Blob:
- case NdbDictionary::Column::Clob:
- case NdbDictionary::Column::Undefined:
- fprintf(DBUG_FILE, "Unknown type: %d", col->getType());
- break;
case NdbDictionary::Column::Tinyint: {
char value= *field->ptr;
fprintf(DBUG_FILE, "Tinyint\t%d", value);
@@ -1482,6 +1659,21 @@ void ha_ndbcluster::print_results()
fprintf(DBUG_FILE, "Timespec\t%llu", value);
break;
}
+ case NdbDictionary::Column::Blob: {
+ Uint64 len= 0;
+ ndb_blob->getLength(len);
+ fprintf(DBUG_FILE, "Blob\t[len=%u]", (unsigned)len);
+ break;
+ }
+ case NdbDictionary::Column::Text: {
+ Uint64 len= 0;
+ ndb_blob->getLength(len);
+ fprintf(DBUG_FILE, "Text\t[len=%u]", (unsigned)len);
+ break;
+ }
+ case NdbDictionary::Column::Undefined:
+ fprintf(DBUG_FILE, "Unknown type: %d", col->getType());
+ break;
}
fprintf(DBUG_FILE, "\n");
@@ -1727,7 +1919,7 @@ void ha_ndbcluster::position(const byte *record)
// No primary key, get hidden key
DBUG_PRINT("info", ("Getting hidden key"));
int hidden_no= table->fields;
- NdbRecAttr* rec= m_value[hidden_no];
+ NdbRecAttr* rec= m_value[hidden_no].rec;
const NDBTAB *tab= (NDBTAB *) m_table;
const NDBCOL *hidden_col= tab->getColumn(hidden_no);
DBUG_ASSERT(hidden_col->getPrimaryKey() &&
@@ -1901,7 +2093,7 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows)
const NDBTAB *tab= (NDBTAB *) m_table;
DBUG_ENTER("start_bulk_insert");
- DBUG_PRINT("enter", ("rows: %d", rows));
+ DBUG_PRINT("enter", ("rows: %d", (int)rows));
rows_inserted= 0;
rows_to_insert= rows;
@@ -1936,7 +2128,7 @@ int ha_ndbcluster::end_bulk_insert()
int ha_ndbcluster::extra_opt(enum ha_extra_function operation, ulong cache_size)
{
DBUG_ENTER("extra_opt");
- DBUG_PRINT("enter", ("cache_size: %d", cache_size));
+ DBUG_PRINT("enter", ("cache_size: %lu", cache_size));
DBUG_RETURN(extra(operation));
}
@@ -2157,7 +2349,7 @@ int ha_ndbcluster::start_stmt(THD *thd)
NdbConnection *tablock_trans=
(NdbConnection*)thd->transaction.all.ndb_tid;
- DBUG_PRINT("info", ("tablock_trans: %x", tablock_trans));
+ DBUG_PRINT("info", ("tablock_trans: %x", (uint)tablock_trans));
DBUG_ASSERT(tablock_trans); trans= m_ndb->hupp(tablock_trans);
if (trans == NULL)
ERR_RETURN(m_ndb->getNdbError());
@@ -2234,71 +2426,184 @@ int ndbcluster_rollback(THD *thd, void *ndb_transaction)
/*
- Map MySQL type to the corresponding NDB type
+ Define NDB column based on Field.
+ Returns 0 or mysql error code.
+ Not member of ha_ndbcluster because NDBCOL cannot be declared.
*/
-inline NdbDictionary::Column::Type
-mysql_to_ndb_type(enum enum_field_types mysql_type, bool unsigned_flg)
+static int create_ndb_column(NDBCOL &col,
+ Field *field,
+ HA_CREATE_INFO *info)
{
- switch(mysql_type) {
+ // Set name
+ col.setName(field->field_name);
+ // Set type and sizes
+ const enum enum_field_types mysql_type= field->real_type();
+ switch (mysql_type) {
+ // Numeric types
case MYSQL_TYPE_DECIMAL:
- return NdbDictionary::Column::Char;
+ col.setType(NDBCOL::Char);
+ col.setLength(field->pack_length());
+ break;
case MYSQL_TYPE_TINY:
- return (unsigned_flg) ?
- NdbDictionary::Column::Tinyunsigned :
- NdbDictionary::Column::Tinyint;
+ if (field->flags & UNSIGNED_FLAG)
+ col.setType(NDBCOL::Tinyunsigned);
+ else
+ col.setType(NDBCOL::Tinyint);
+ col.setLength(1);
+ break;
case MYSQL_TYPE_SHORT:
- return (unsigned_flg) ?
- NdbDictionary::Column::Smallunsigned :
- NdbDictionary::Column::Smallint;
+ if (field->flags & UNSIGNED_FLAG)
+ col.setType(NDBCOL::Smallunsigned);
+ else
+ col.setType(NDBCOL::Smallint);
+ col.setLength(1);
+ break;
case MYSQL_TYPE_LONG:
- return (unsigned_flg) ?
- NdbDictionary::Column::Unsigned :
- NdbDictionary::Column::Int;
- case MYSQL_TYPE_TIMESTAMP:
- return NdbDictionary::Column::Unsigned;
- case MYSQL_TYPE_LONGLONG:
- return (unsigned_flg) ?
- NdbDictionary::Column::Bigunsigned :
- NdbDictionary::Column::Bigint;
+ if (field->flags & UNSIGNED_FLAG)
+ col.setType(NDBCOL::Unsigned);
+ else
+ col.setType(NDBCOL::Int);
+ col.setLength(1);
+ break;
case MYSQL_TYPE_INT24:
- return (unsigned_flg) ?
- NdbDictionary::Column::Mediumunsigned :
- NdbDictionary::Column::Mediumint;
+ if (field->flags & UNSIGNED_FLAG)
+ col.setType(NDBCOL::Mediumunsigned);
+ else
+ col.setType(NDBCOL::Mediumint);
+ col.setLength(1);
+ break;
+ case MYSQL_TYPE_LONGLONG:
+ if (field->flags & UNSIGNED_FLAG)
+ col.setType(NDBCOL::Bigunsigned);
+ else
+ col.setType(NDBCOL::Bigint);
+ col.setLength(1);
break;
case MYSQL_TYPE_FLOAT:
- return NdbDictionary::Column::Float;
+ col.setType(NDBCOL::Float);
+ col.setLength(1);
+ break;
case MYSQL_TYPE_DOUBLE:
- return NdbDictionary::Column::Double;
- case MYSQL_TYPE_DATETIME :
- return NdbDictionary::Column::Datetime;
- case MYSQL_TYPE_DATE :
- case MYSQL_TYPE_NEWDATE :
- case MYSQL_TYPE_TIME :
- case MYSQL_TYPE_YEAR :
- // Missing NDB data types, mapped to char
- return NdbDictionary::Column::Char;
- case MYSQL_TYPE_ENUM :
- return NdbDictionary::Column::Char;
- case MYSQL_TYPE_SET :
- return NdbDictionary::Column::Char;
- case MYSQL_TYPE_TINY_BLOB :
- case MYSQL_TYPE_MEDIUM_BLOB :
- case MYSQL_TYPE_LONG_BLOB :
- case MYSQL_TYPE_BLOB :
- return NdbDictionary::Column::Blob;
- case MYSQL_TYPE_VAR_STRING :
- return NdbDictionary::Column::Varchar;
- case MYSQL_TYPE_STRING :
- return NdbDictionary::Column::Char;
- case MYSQL_TYPE_NULL :
- case MYSQL_TYPE_GEOMETRY :
- return NdbDictionary::Column::Undefined;
- }
- return NdbDictionary::Column::Undefined;
+ col.setType(NDBCOL::Double);
+ col.setLength(1);
+ break;
+ // Date types
+ case MYSQL_TYPE_TIMESTAMP:
+ col.setType(NDBCOL::Unsigned);
+ col.setLength(1);
+ break;
+ case MYSQL_TYPE_DATETIME:
+ col.setType(NDBCOL::Datetime);
+ col.setLength(1);
+ break;
+ case MYSQL_TYPE_DATE:
+ case MYSQL_TYPE_NEWDATE:
+ case MYSQL_TYPE_TIME:
+ case MYSQL_TYPE_YEAR:
+ col.setType(NDBCOL::Char);
+ col.setLength(field->pack_length());
+ break;
+ // Char types
+ case MYSQL_TYPE_STRING:
+ if (field->flags & BINARY_FLAG)
+ col.setType(NDBCOL::Binary);
+ else
+ col.setType(NDBCOL::Char);
+ col.setLength(field->pack_length());
+ break;
+ case MYSQL_TYPE_VAR_STRING:
+ if (field->flags & BINARY_FLAG)
+ col.setType(NDBCOL::Varbinary);
+ else
+ col.setType(NDBCOL::Varchar);
+ col.setLength(field->pack_length());
+ break;
+ // Blob types (all come in as MYSQL_TYPE_BLOB)
+ mysql_type_tiny_blob:
+ case MYSQL_TYPE_TINY_BLOB:
+ if (field->flags & BINARY_FLAG)
+ col.setType(NDBCOL::Blob);
+ else
+ col.setType(NDBCOL::Text);
+ col.setInlineSize(256);
+ // No parts
+ col.setPartSize(0);
+ col.setStripeSize(0);
+ break;
+ mysql_type_blob:
+ case MYSQL_TYPE_BLOB:
+ if (field->flags & BINARY_FLAG)
+ col.setType(NDBCOL::Blob);
+ else
+ col.setType(NDBCOL::Text);
+ // Use "<=" even if "<" is the exact condition
+ if (field->max_length() <= (1 << 8))
+ goto mysql_type_tiny_blob;
+ else if (field->max_length() <= (1 << 16))
+ {
+ col.setInlineSize(256);
+ col.setPartSize(2000);
+ col.setStripeSize(16);
+ }
+ else if (field->max_length() <= (1 << 24))
+ goto mysql_type_medium_blob;
+ else
+ goto mysql_type_long_blob;
+ break;
+ mysql_type_medium_blob:
+ case MYSQL_TYPE_MEDIUM_BLOB:
+ if (field->flags & BINARY_FLAG)
+ col.setType(NDBCOL::Blob);
+ else
+ col.setType(NDBCOL::Text);
+ col.setInlineSize(256);
+ col.setPartSize(4000);
+ col.setStripeSize(8);
+ break;
+ mysql_type_long_blob:
+ case MYSQL_TYPE_LONG_BLOB:
+ if (field->flags & BINARY_FLAG)
+ col.setType(NDBCOL::Blob);
+ else
+ col.setType(NDBCOL::Text);
+ col.setInlineSize(256);
+ col.setPartSize(8000);
+ col.setStripeSize(4);
+ break;
+ // Other types
+ case MYSQL_TYPE_ENUM:
+ col.setType(NDBCOL::Char);
+ col.setLength(field->pack_length());
+ break;
+ case MYSQL_TYPE_SET:
+ col.setType(NDBCOL::Char);
+ col.setLength(field->pack_length());
+ break;
+ case MYSQL_TYPE_NULL:
+ case MYSQL_TYPE_GEOMETRY:
+ goto mysql_type_unsupported;
+ mysql_type_unsupported:
+ default:
+ return HA_ERR_UNSUPPORTED;
+ }
+ // Set nullable and pk
+ col.setNullable(field->maybe_null());
+ col.setPrimaryKey(field->flags & PRI_KEY_FLAG);
+ // Set autoincrement
+ if (field->flags & AUTO_INCREMENT_FLAG)
+ {
+ col.setAutoIncrement(TRUE);
+ ulonglong value= info->auto_increment_value ?
+ info->auto_increment_value -1 : (ulonglong) 0;
+ DBUG_PRINT("info", ("Autoincrement key, initial: %llu", value));
+ col.setAutoIncrementInitialValue(value);
+ }
+ else
+ col.setAutoIncrement(false);
+ return 0;
}
-
/*
Create a table in NDB Cluster
*/
@@ -2308,7 +2613,6 @@ int ha_ndbcluster::create(const char *name,
HA_CREATE_INFO *info)
{
NDBTAB tab;
- NdbDictionary::Column::Type ndb_type;
NDBCOL col;
uint pack_length, length, i;
const void *data, *pack_data;
@@ -2339,31 +2643,11 @@ int ha_ndbcluster::create(const char *name,
for (i= 0; i < form->fields; i++)
{
Field *field= form->field[i];
- ndb_type= mysql_to_ndb_type(field->real_type(),
- field->flags & UNSIGNED_FLAG);
DBUG_PRINT("info", ("name: %s, type: %u, pack_length: %d",
field->field_name, field->real_type(),
field->pack_length()));
- col.setName(field->field_name);
- col.setType(ndb_type);
- if ((ndb_type == NdbDictionary::Column::Char) ||
- (ndb_type == NdbDictionary::Column::Varchar))
- col.setLength(field->pack_length());
- else
- col.setLength(1);
- col.setNullable(field->maybe_null());
- col.setPrimaryKey(field->flags & PRI_KEY_FLAG);
- if (field->flags & AUTO_INCREMENT_FLAG)
- {
- col.setAutoIncrement(TRUE);
- ulonglong value= info->auto_increment_value ?
- info->auto_increment_value -1 : (ulonglong) 0;
- DBUG_PRINT("info", ("Autoincrement key, initial: %d", value));
- col.setAutoIncrementInitialValue(value);
- }
- else
- col.setAutoIncrement(false);
-
+ if (my_errno= create_ndb_column(col, field, info))
+ DBUG_RETURN(my_errno);
tab.addColumn(col);
}
@@ -2631,14 +2915,15 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
m_table(NULL),
m_table_flags(HA_REC_NOT_IN_SEQ |
HA_NOT_EXACT_COUNT |
- HA_NO_PREFIX_CHAR_KEYS |
- HA_NO_BLOBS),
+ HA_NO_PREFIX_CHAR_KEYS),
m_use_write(false),
retrieve_all_fields(FALSE),
rows_to_insert(0),
rows_inserted(0),
bulk_insert_rows(1024),
- ops_pending(0)
+ ops_pending(0),
+ blobs_buffer(0),
+ blobs_buffer_size(0)
{
int i;
@@ -2671,6 +2956,8 @@ ha_ndbcluster::~ha_ndbcluster()
DBUG_ENTER("~ha_ndbcluster");
release_metadata();
+ my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
+ blobs_buffer= 0;
// Check for open cursor/transaction
DBUG_ASSERT(m_active_cursor == NULL);
diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h
index fc0d607abaa..e08b409059c 100644
--- a/sql/ha_ndbcluster.h
+++ b/sql/ha_ndbcluster.h
@@ -35,6 +35,7 @@ class NdbRecAttr; // Forward declaration
class NdbResultSet; // Forward declaration
class NdbScanOperation;
class NdbIndexScanOperation;
+class NdbBlob;
typedef enum ndb_index_type {
UNDEFINED_INDEX = 0,
@@ -171,6 +172,7 @@ class ha_ndbcluster: public handler
enum ha_rkey_function find_flag);
int close_scan();
void unpack_record(byte *buf);
+ int get_ndb_lock_type(enum thr_lock_type type);
void set_dbname(const char *pathname);
void set_tabname(const char *pathname);
@@ -181,7 +183,9 @@ class ha_ndbcluster: public handler
int set_ndb_key(NdbOperation*, Field *field,
uint fieldnr, const byte* field_ptr);
int set_ndb_value(NdbOperation*, Field *field, uint fieldnr);
- int get_ndb_value(NdbOperation*, uint fieldnr, byte *field_ptr);
+ int get_ndb_value(NdbOperation*, Field *field, uint fieldnr);
+ friend int ::get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg);
+ int get_ndb_blobs_value(NdbBlob *last_ndb_blob);
int set_primary_key(NdbOperation *op, const byte *key);
int set_primary_key(NdbOperation *op);
int set_primary_key_from_old_data(NdbOperation *op, const byte *old_data);
@@ -191,8 +195,8 @@ class ha_ndbcluster: public handler
void print_results();
longlong get_auto_increment();
-
int ndb_err(NdbConnection*);
+ bool uses_blob_value(bool all_fields);
private:
int check_ndb_connection();
@@ -209,13 +213,19 @@ class ha_ndbcluster: public handler
NDB_SHARE *m_share;
NDB_INDEX_TYPE m_indextype[MAX_KEY];
const char* m_unique_index_name[MAX_KEY];
- NdbRecAttr *m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
+ // NdbRecAttr has no reference to blob
+ typedef union { NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
+ NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
bool m_use_write;
bool retrieve_all_fields;
ha_rows rows_to_insert;
ha_rows rows_inserted;
ha_rows bulk_insert_rows;
ha_rows ops_pending;
+ bool blobs_pending;
+ // memory for blobs in one tuple
+ char *blobs_buffer;
+ uint32 blobs_buffer_size;
};
bool ndbcluster_init(void);
@@ -231,10 +241,3 @@ int ndbcluster_discover(const char* dbname, const char* name,
int ndbcluster_drop_database(const char* path);
void ndbcluster_print_error(int error, const NdbOperation *error_op);
-
-
-
-
-
-
-