diff options
-rw-r--r-- | mysql-test/r/ndb_blob.result | 272 | ||||
-rw-r--r-- | mysql-test/t/ndb_blob.test | 249 | ||||
-rw-r--r-- | sql/ha_ndbcluster.cc | 677 | ||||
-rw-r--r-- | sql/ha_ndbcluster.h | 23 |
4 files changed, 1016 insertions, 205 deletions
diff --git a/mysql-test/r/ndb_blob.result b/mysql-test/r/ndb_blob.result index e69de29bb2d..89b53aea7d1 100644 --- a/mysql-test/r/ndb_blob.result +++ b/mysql-test/r/ndb_blob.result @@ -0,0 +1,272 @@ +drop table if exists t1; +set autocommit=0; +create table t1 ( +a int not null primary key, +b text not null, +c int not null, +d longblob, +key (c) +) engine=ndbcluster; +set @x0 = '01234567012345670123456701234567'; +set @x0 = concat(@x0,@x0,@x0,@x0,@x0,@x0,@x0,@x0); +set @b1 = 'b1'; +set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1); +set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1); +set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1); +set @b1 = concat(@b1,@x0); +set @d1 = 'dd1'; +set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1); +set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1); +set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1); +set @b2 = 'b2'; +set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2); +set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2); +set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2); +set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2); +set @d2 = 'dd2'; +set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2); +set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2); +set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2); +set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2); +select length(@x0),length(@b1),length(@d1) from dual; +length(@x0) length(@b1) length(@d1) +256 2256 3000 +select length(@x0),length(@b2),length(@d2) from dual; +length(@x0) length(@b2) length(@d2) +256 20000 30000 +insert into t1 values(1,@b1,111,@d1); +insert into t1 values(2,@b2,222,@d2); +commit; +explain select * from t1 where a = 1; +id select_type table type possible_keys key key_len ref rows Extra +1 SIMPLE t1 const PRIMARY PRIMARY 4 const 1 +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where a=1; +a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3) +1 2256 b1 3000 dd1 +select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3) +from t1 where a=2; +a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3) +2 20000 b2 30000 dd2 +update t1 set b=@b2,d=@d2 where a=1; +update t1 set b=@b1,d=@d1 where a=2; +commit; +select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3) +from t1 where a=1; +a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3) +1 20000 b2 30000 dd2 +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where a=2; +a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3) +2 2256 b1 3000 dd1 +update t1 set b=concat(b,b),d=concat(d,d) where a=1; +update t1 set b=concat(b,b),d=concat(d,d) where a=2; +commit; +select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3) +from t1 where a=1; +a length(b) substr(b,1+4*9000,2) length(d) substr(d,1+6*9000,3) +1 40000 b2 60000 dd2 +select a,length(b),substr(b,1+4*900,2),length(d),substr(d,1+6*900,3) +from t1 where a=2; +a length(b) substr(b,1+4*900,2) length(d) substr(d,1+6*900,3) +2 4512 b1 6000 dd1 +update t1 set d=null where a=1; +commit; +select a from t1 where d is null; +a +1 +delete from t1 where a=1; +delete from t1 where a=2; +commit; +select count(*) from t1; +count(*) +0 +insert into t1 values(1,@b1,111,@d1); +insert into t1 values(2,@b2,222,@d2); +commit; +explain select * from t1 where c = 111; +id select_type table type possible_keys key key_len ref rows Extra +1 SIMPLE t1 ref c c 4 const 10 Using where +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where c=111; +a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3) +1 2256 b1 3000 dd1 +select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3) +from t1 where c=222; +a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3) +2 20000 b2 30000 dd2 +update t1 set b=@b2,d=@d2 where c=111; +update t1 set b=@b1,d=@d1 where c=222; +commit; +select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3) +from t1 where c=111; +a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3) +1 20000 b2 30000 dd2 +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where c=222; +a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3) +2 2256 b1 3000 dd1 +update t1 set d=null where c=111; +commit; +select a from t1 where d is null; +a +1 +delete from t1 where c=111; +delete from t1 where c=222; +commit; +select count(*) from t1; +count(*) +0 +insert into t1 values(1,'b1',111,'dd1'); +insert into t1 values(2,'b2',222,'dd2'); +insert into t1 values(3,'b3',333,'dd3'); +insert into t1 values(4,'b4',444,'dd4'); +insert into t1 values(5,'b5',555,'dd5'); +insert into t1 values(6,'b6',666,'dd6'); +insert into t1 values(7,'b7',777,'dd7'); +insert into t1 values(8,'b8',888,'dd8'); +insert into t1 values(9,'b9',999,'dd9'); +commit; +explain select * from t1; +id select_type table type possible_keys key key_len ref rows Extra +1 SIMPLE t1 ALL NULL NULL NULL NULL 100 +select * from t1 order by a; +a b c d +1 b1 111 dd1 +2 b2 222 dd2 +3 b3 333 dd3 +4 b4 444 dd4 +5 b5 555 dd5 +6 b6 666 dd6 +7 b7 777 dd7 +8 b8 888 dd8 +9 b9 999 dd9 +update t1 set b=concat(a,'x',b),d=concat(a,'x',d); +commit; +select * from t1 order by a; +a b c d +1 1xb1 111 1xdd1 +2 2xb2 222 2xdd2 +3 3xb3 333 3xdd3 +4 4xb4 444 4xdd4 +5 5xb5 555 5xdd5 +6 6xb6 666 6xdd6 +7 7xb7 777 7xdd7 +8 8xb8 888 8xdd8 +9 9xb9 999 9xdd9 +delete from t1; +commit; +select count(*) from t1; +count(*) +0 +insert into t1 values(1,@b1,111,@d1); +insert into t1 values(2,@b2,222,@d2); +commit; +explain select * from t1; +id select_type table type possible_keys key key_len ref rows Extra +1 SIMPLE t1 ALL NULL NULL NULL NULL 100 +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 order by a; +a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3) +1 2256 b1 3000 dd1 +2 20000 b2 30000 dd2 +update t1 set b=concat(b,b),d=concat(d,d); +commit; +select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3) +from t1 order by a; +a length(b) substr(b,1+4*9000,2) length(d) substr(d,1+6*9000,3) +1 4512 6000 +2 40000 b2 60000 dd2 +delete from t1; +commit; +select count(*) from t1; +count(*) +0 +insert into t1 values(1,'b1',111,'dd1'); +insert into t1 values(2,'b2',222,'dd2'); +insert into t1 values(3,'b3',333,'dd3'); +insert into t1 values(4,'b4',444,'dd4'); +insert into t1 values(5,'b5',555,'dd5'); +insert into t1 values(6,'b6',666,'dd6'); +insert into t1 values(7,'b7',777,'dd7'); +insert into t1 values(8,'b8',888,'dd8'); +insert into t1 values(9,'b9',999,'dd9'); +commit; +explain select * from t1 where c >= 100 order by a; +id select_type table type possible_keys key key_len ref rows Extra +1 SIMPLE t1 range c c 4 NULL 10 Using where; Using filesort +select * from t1 where c >= 100 order by a; +a b c d +1 b1 111 dd1 +2 b2 222 dd2 +3 b3 333 dd3 +4 b4 444 dd4 +5 b5 555 dd5 +6 b6 666 dd6 +7 b7 777 dd7 +8 b8 888 dd8 +9 b9 999 dd9 +update t1 set b=concat(a,'x',b),d=concat(a,'x',d) +where c >= 100; +commit; +select * from t1 where c >= 100 order by a; +a b c d +1 1xb1 111 1xdd1 +2 2xb2 222 2xdd2 +3 3xb3 333 3xdd3 +4 4xb4 444 4xdd4 +5 5xb5 555 5xdd5 +6 6xb6 666 6xdd6 +7 7xb7 777 7xdd7 +8 8xb8 888 8xdd8 +9 9xb9 999 9xdd9 +delete from t1 where c >= 100; +commit; +select count(*) from t1; +count(*) +0 +insert into t1 values(1,@b1,111,@d1); +insert into t1 values(2,@b2,222,@d2); +commit; +explain select * from t1 where c >= 100 order by a; +id select_type table type possible_keys key key_len ref rows Extra +1 SIMPLE t1 range c c 4 NULL 10 Using where; Using filesort +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where c >= 100 order by a; +a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3) +1 2256 b1 3000 dd1 +2 20000 b2 30000 dd2 +update t1 set b=concat(b,b),d=concat(d,d); +commit; +select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3) +from t1 where c >= 100 order by a; +a length(b) substr(b,1+4*9000,2) length(d) substr(d,1+6*9000,3) +1 4512 6000 +2 40000 b2 60000 dd2 +delete from t1 where c >= 100; +commit; +select count(*) from t1; +count(*) +0 +insert into t1 values(1,@b1,111,@d1); +insert into t1 values(2,@b2,222,@d2); +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where a = 0; +a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3) +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where a = 1; +a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3) +1 2256 b1 3000 dd1 +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where a = 2; +a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3) +2 20000 b2 30000 dd2 +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 order by a; +a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3) +1 2256 b1 3000 dd1 +2 20000 b2 30000 dd2 +rollback; +select count(*) from t1; +count(*) +0 diff --git a/mysql-test/t/ndb_blob.test b/mysql-test/t/ndb_blob.test index e69de29bb2d..c1166a7a90c 100644 --- a/mysql-test/t/ndb_blob.test +++ b/mysql-test/t/ndb_blob.test @@ -0,0 +1,249 @@ +--source include/have_ndb.inc + +--disable_warnings +drop table if exists t1; +--enable_warnings + +# +# Minimal NDB blobs test. +# +# On NDB API level there is an extensive test program "testBlobs". +# A prerequisite for this handler test is that "testBlobs" succeeds. +# + +# make test harder with autocommit off +set autocommit=0; + +create table t1 ( + a int not null primary key, + b text not null, + c int not null, + d longblob, + key (c) +) engine=ndbcluster; + +# -- values -- + +# x0 size 256 (current inline size) +set @x0 = '01234567012345670123456701234567'; +set @x0 = concat(@x0,@x0,@x0,@x0,@x0,@x0,@x0,@x0); + +# b1 length 2000+256 (blob part aligned) +set @b1 = 'b1'; +set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1); +set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1); +set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1); +set @b1 = concat(@b1,@x0); +# d1 length 3000 +set @d1 = 'dd1'; +set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1); +set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1); +set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1); + +# b2 length 20000 +set @b2 = 'b2'; +set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2); +set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2); +set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2); +set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2); +# d2 length 30000 +set @d2 = 'dd2'; +set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2); +set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2); +set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2); +set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2); + +select length(@x0),length(@b1),length(@d1) from dual; +select length(@x0),length(@b2),length(@d2) from dual; + +# -- pk ops -- + +insert into t1 values(1,@b1,111,@d1); +insert into t1 values(2,@b2,222,@d2); +commit; +explain select * from t1 where a = 1; + +# pk read +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where a=1; +select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3) +from t1 where a=2; + +# pk update +update t1 set b=@b2,d=@d2 where a=1; +update t1 set b=@b1,d=@d1 where a=2; +commit; +select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3) +from t1 where a=1; +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where a=2; + +# pk update +update t1 set b=concat(b,b),d=concat(d,d) where a=1; +update t1 set b=concat(b,b),d=concat(d,d) where a=2; +commit; +select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3) +from t1 where a=1; +select a,length(b),substr(b,1+4*900,2),length(d),substr(d,1+6*900,3) +from t1 where a=2; + +# pk update to null +update t1 set d=null where a=1; +commit; +select a from t1 where d is null; + +# pk delete +delete from t1 where a=1; +delete from t1 where a=2; +commit; +select count(*) from t1; + +# -- hash index ops -- + +insert into t1 values(1,@b1,111,@d1); +insert into t1 values(2,@b2,222,@d2); +commit; +explain select * from t1 where c = 111; + +# hash key read +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where c=111; +select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3) +from t1 where c=222; + +# hash key update +update t1 set b=@b2,d=@d2 where c=111; +update t1 set b=@b1,d=@d1 where c=222; +commit; +select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3) +from t1 where c=111; +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where c=222; + +# hash key update to null +update t1 set d=null where c=111; +commit; +select a from t1 where d is null; + +# hash key delete +delete from t1 where c=111; +delete from t1 where c=222; +commit; +select count(*) from t1; + +# -- table scan ops, short values -- + +insert into t1 values(1,'b1',111,'dd1'); +insert into t1 values(2,'b2',222,'dd2'); +insert into t1 values(3,'b3',333,'dd3'); +insert into t1 values(4,'b4',444,'dd4'); +insert into t1 values(5,'b5',555,'dd5'); +insert into t1 values(6,'b6',666,'dd6'); +insert into t1 values(7,'b7',777,'dd7'); +insert into t1 values(8,'b8',888,'dd8'); +insert into t1 values(9,'b9',999,'dd9'); +commit; +explain select * from t1; + +# table scan read +select * from t1 order by a; + +# table scan update +update t1 set b=concat(a,'x',b),d=concat(a,'x',d); +commit; +select * from t1 order by a; + +# table scan delete +delete from t1; +commit; +select count(*) from t1; + +# -- table scan ops, long values -- + +insert into t1 values(1,@b1,111,@d1); +insert into t1 values(2,@b2,222,@d2); +commit; +explain select * from t1; + +# table scan read +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 order by a; + +# table scan update +update t1 set b=concat(b,b),d=concat(d,d); +commit; +select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3) +from t1 order by a; + +# table scan delete +delete from t1; +commit; +select count(*) from t1; + +# -- range scan ops, short values -- + +insert into t1 values(1,'b1',111,'dd1'); +insert into t1 values(2,'b2',222,'dd2'); +insert into t1 values(3,'b3',333,'dd3'); +insert into t1 values(4,'b4',444,'dd4'); +insert into t1 values(5,'b5',555,'dd5'); +insert into t1 values(6,'b6',666,'dd6'); +insert into t1 values(7,'b7',777,'dd7'); +insert into t1 values(8,'b8',888,'dd8'); +insert into t1 values(9,'b9',999,'dd9'); +commit; +explain select * from t1 where c >= 100 order by a; + +# range scan read +select * from t1 where c >= 100 order by a; + +# range scan update +update t1 set b=concat(a,'x',b),d=concat(a,'x',d) +where c >= 100; +commit; +select * from t1 where c >= 100 order by a; + +# range scan delete +delete from t1 where c >= 100; +commit; +select count(*) from t1; + +# -- range scan ops, long values -- + +insert into t1 values(1,@b1,111,@d1); +insert into t1 values(2,@b2,222,@d2); +commit; +explain select * from t1 where c >= 100 order by a; + +# range scan read +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where c >= 100 order by a; + +# range scan update +update t1 set b=concat(b,b),d=concat(d,d); +commit; +select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3) +from t1 where c >= 100 order by a; + +# range scan delete +delete from t1 where c >= 100; +commit; +select count(*) from t1; + +# -- rollback -- + +insert into t1 values(1,@b1,111,@d1); +insert into t1 values(2,@b2,222,@d2); +# 626 +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where a = 0; +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where a = 1; +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 where a = 2; +select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) +from t1 order by a; +rollback; +select count(*) from t1; + +--drop table t1; diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index b57735b9de6..e46857bc6f6 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -182,6 +182,21 @@ bool ha_ndbcluster::get_error_message(int error, /* + Check if type is supported by NDB. +*/ + +static inline bool ndb_supported_type(enum_field_types type) +{ + switch (type) { + case MYSQL_TYPE_NULL: + case MYSQL_TYPE_GEOMETRY: + return false; + } + return true; +} + + +/* Instruct NDB to set the value of the hidden primary key */ @@ -208,40 +223,15 @@ int ha_ndbcluster::set_ndb_key(NdbOperation *ndb_op, Field *field, pack_len)); DBUG_DUMP("key", (char*)field_ptr, pack_len); - switch (field->type()) { - case MYSQL_TYPE_DECIMAL: - case MYSQL_TYPE_TINY: - case MYSQL_TYPE_SHORT: - case MYSQL_TYPE_LONG: - case MYSQL_TYPE_FLOAT: - case MYSQL_TYPE_DOUBLE: - case MYSQL_TYPE_TIMESTAMP: - case MYSQL_TYPE_LONGLONG: - case MYSQL_TYPE_INT24: - case MYSQL_TYPE_DATE: - case MYSQL_TYPE_TIME: - case MYSQL_TYPE_DATETIME: - case MYSQL_TYPE_YEAR: - case MYSQL_TYPE_NEWDATE: - case MYSQL_TYPE_ENUM: - case MYSQL_TYPE_SET: - case MYSQL_TYPE_VAR_STRING: - case MYSQL_TYPE_STRING: - // Common implementation for most field types - DBUG_RETURN(ndb_op->equal(fieldnr, (char*) field_ptr, pack_len) != 0); - - case MYSQL_TYPE_TINY_BLOB: - case MYSQL_TYPE_MEDIUM_BLOB: - case MYSQL_TYPE_LONG_BLOB: - case MYSQL_TYPE_BLOB: - case MYSQL_TYPE_NULL: - case MYSQL_TYPE_GEOMETRY: - default: - // Unhandled field types - DBUG_PRINT("error", ("Field type %d not supported", field->type())); - DBUG_RETURN(2); + if (ndb_supported_type(field->type())) + { + if (! (field->flags & BLOB_FLAG)) + // Common implementation for most field types + DBUG_RETURN(ndb_op->equal(fieldnr, (char*) field_ptr, pack_len) != 0); } - DBUG_RETURN(3); + // Unhandled field types + DBUG_PRINT("error", ("Field type %d not supported", field->type())); + DBUG_RETURN(2); } @@ -259,63 +249,197 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field, fieldnr, field->field_name, field->type(), pack_len, field->is_null()?"Y":"N")); DBUG_DUMP("value", (char*) field_ptr, pack_len); - - if (field->is_null()) + + if (ndb_supported_type(field->type())) { - // Set value to NULL - DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0)); - } - - switch (field->type()) { - case MYSQL_TYPE_DECIMAL: - case MYSQL_TYPE_TINY: - case MYSQL_TYPE_SHORT: - case MYSQL_TYPE_LONG: - case MYSQL_TYPE_FLOAT: - case MYSQL_TYPE_DOUBLE: - case MYSQL_TYPE_TIMESTAMP: - case MYSQL_TYPE_LONGLONG: - case MYSQL_TYPE_INT24: - case MYSQL_TYPE_DATE: - case MYSQL_TYPE_TIME: - case MYSQL_TYPE_DATETIME: - case MYSQL_TYPE_YEAR: - case MYSQL_TYPE_NEWDATE: - case MYSQL_TYPE_ENUM: - case MYSQL_TYPE_SET: - case MYSQL_TYPE_VAR_STRING: - case MYSQL_TYPE_STRING: - // Common implementation for most field types - DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)field_ptr, pack_len) != 0); - - case MYSQL_TYPE_TINY_BLOB: - case MYSQL_TYPE_MEDIUM_BLOB: - case MYSQL_TYPE_LONG_BLOB: - case MYSQL_TYPE_BLOB: - case MYSQL_TYPE_NULL: - case MYSQL_TYPE_GEOMETRY: - default: - // Unhandled field types - DBUG_PRINT("error", ("Field type %d not supported", field->type())); - DBUG_RETURN(2); + if (! (field->flags & BLOB_FLAG)) + { + if (field->is_null()) + // Set value to NULL + DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0)); + // Common implementation for most field types + DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)field_ptr, pack_len) != 0); + } + + // Blob type + NdbBlob *ndb_blob = ndb_op->getBlobHandle(fieldnr); + if (ndb_blob != NULL) + { + if (field->is_null()) + DBUG_RETURN(ndb_blob->setNull() != 0); + + Field_blob *field_blob= (Field_blob*)field; + + // Get length and pointer to data + uint32 blob_len= field_blob->get_length(field_ptr); + char* blob_ptr= NULL; + field_blob->get_ptr(&blob_ptr); + + // Looks like NULL blob can also be signaled in this way + if (blob_ptr == NULL) + DBUG_RETURN(ndb_blob->setNull() != 0); + + DBUG_PRINT("value", ("set blob ptr=%x len=%u", + (unsigned)blob_ptr, blob_len)); + DBUG_DUMP("value", (char*)blob_ptr, min(blob_len, 26)); + + // No callback needed to write value + DBUG_RETURN(ndb_blob->setValue(blob_ptr, blob_len) != 0); + } + DBUG_RETURN(1); + } + // Unhandled field types + DBUG_PRINT("error", ("Field type %d not supported", field->type())); + DBUG_RETURN(2); +} + + +/* + Callback to read all blob values. + - not done in unpack_record because unpack_record is valid + after execute(Commit) but reading blobs is not + - may only generate read operations; they have to be executed + somewhere before the data is available + - due to single buffer for all blobs, we let the last blob + process all blobs (last so that all are active) + - null bit is still set in unpack_record + - TODO allocate blob part aligned buffers +*/ + +NdbBlob::ActiveHook get_ndb_blobs_value; + +int get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg) +{ + DBUG_ENTER("get_ndb_blobs_value [callback]"); + if (ndb_blob->blobsNextBlob() != NULL) + DBUG_RETURN(0); + ha_ndbcluster *ha= (ha_ndbcluster *)arg; + DBUG_RETURN(ha->get_ndb_blobs_value(ndb_blob)); +} + +int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob) +{ + DBUG_ENTER("get_ndb_blobs_value"); + + // Field has no field number so cannot use TABLE blob_field + // Loop twice, first only counting total buffer size + for (int loop= 0; loop <= 1; loop++) + { + uint32 offset= 0; + for (uint i= 0; i < table->fields; i++) + { + Field *field= table->field[i]; + NdbValue value= m_value[i]; + if (value.ptr != NULL && (field->flags & BLOB_FLAG)) + { + Field_blob *field_blob= (Field_blob *)field; + NdbBlob *ndb_blob= value.blob; + Uint64 blob_len= 0; + if (ndb_blob->getLength(blob_len) != 0) + DBUG_RETURN(-1); + // Align to Uint64 + uint32 blob_size= blob_len; + if (blob_size % 8 != 0) + blob_size+= 8 - blob_size % 8; + if (loop == 1) + { + char *buf= blobs_buffer + offset; + uint32 len= 0xffffffff; // Max uint32 + DBUG_PRINT("value", ("read blob ptr=%x len=%u", + (uint)buf, (uint)blob_len)); + if (ndb_blob->readData(buf, len) != 0) + DBUG_RETURN(-1); + DBUG_ASSERT(len == blob_len); + field_blob->set_ptr(len, buf); + } + offset+= blob_size; + } + } + if (loop == 0 && offset > blobs_buffer_size) + { + my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); + blobs_buffer_size= 0; + DBUG_PRINT("value", ("allocate blobs buffer size %u", offset)); + blobs_buffer= my_malloc(offset, MYF(MY_WME)); + if (blobs_buffer == NULL) + DBUG_RETURN(-1); + blobs_buffer_size= offset; + } } - DBUG_RETURN(3); + DBUG_RETURN(0); } /* Instruct NDB to fetch one field - - data is read directly into buffer provided by field_ptr - if it's NULL, data is read into memory provided by NDBAPI + - data is read directly into buffer provided by field + if field is NULL, data is read into memory provided by NDBAPI */ -int ha_ndbcluster::get_ndb_value(NdbOperation *op, - uint field_no, byte *field_ptr) +int ha_ndbcluster::get_ndb_value(NdbOperation *ndb_op, Field *field, + uint fieldnr) { DBUG_ENTER("get_ndb_value"); - DBUG_PRINT("enter", ("field_no: %d", field_no)); - m_value[field_no]= op->getValue(field_no, field_ptr); - DBUG_RETURN(m_value == NULL); + DBUG_PRINT("enter", ("fieldnr: %d flags: %o", fieldnr, + (int)(field != NULL ? field->flags : 0))); + + if (field != NULL) + { + if (ndb_supported_type(field->type())) + { + DBUG_ASSERT(field->ptr != NULL); + if (! (field->flags & BLOB_FLAG)) + { + m_value[fieldnr].rec= ndb_op->getValue(fieldnr, field->ptr); + DBUG_RETURN(m_value[fieldnr].rec == NULL); + } + + // Blob type + NdbBlob *ndb_blob= ndb_op->getBlobHandle(fieldnr); + m_value[fieldnr].blob= ndb_blob; + if (ndb_blob != NULL) + { + // Set callback + void *arg= (void *)this; + DBUG_RETURN(ndb_blob->setActiveHook(::get_ndb_blobs_value, arg) != 0); + } + DBUG_RETURN(1); + } + // Unhandled field types + DBUG_PRINT("error", ("Field type %d not supported", field->type())); + DBUG_RETURN(2); + } + + // Used for hidden key only + m_value[fieldnr].rec= ndb_op->getValue(fieldnr, NULL); + DBUG_RETURN(m_value[fieldnr].rec == NULL); +} + + +/* + Check if any set or get of blob value in current query. +*/ +bool ha_ndbcluster::uses_blob_value(bool all_fields) +{ + if (table->blob_fields == 0) + return false; + if (all_fields) + return true; + { + uint no_fields= table->fields; + int i; + THD *thd= current_thd; + // They always put blobs at the end.. + for (i= no_fields - 1; i >= 0; i--) + { + Field *field= table->field[i]; + if (thd->query_id == field->query_id) + { + return true; + } + } + } + return false; } @@ -462,10 +586,19 @@ void ha_ndbcluster::release_metadata() DBUG_VOID_RETURN; } -NdbScanOperation::LockMode get_ndb_lock_type(enum thr_lock_type type) +int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type) { - return (type == TL_WRITE_ALLOW_WRITE) ? - NdbScanOperation::LM_Exclusive : NdbScanOperation::LM_Read; + int lm; + if (type == TL_WRITE_ALLOW_WRITE) + lm = NdbScanOperation::LM_Exclusive; + else if (uses_blob_value(retrieve_all_fields)) + /* + TODO use a new scan mode to read + lock + keyinfo + */ + lm = NdbScanOperation::LM_Exclusive; + else + lm = NdbScanOperation::LM_Read; + return lm; } static const ulong index_type_flags[]= @@ -614,7 +747,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) if (set_hidden_key(op, no_fields, key)) goto err; // Read key at the same time, for future reference - if (get_ndb_value(op, no_fields, NULL)) + if (get_ndb_value(op, NULL, no_fields)) goto err; } else @@ -630,13 +763,13 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) Field *field= table->field[i]; if (thd->query_id == field->query_id) { - if (get_ndb_value(op, i, field->ptr)) + if (get_ndb_value(op, field, i)) goto err; } else { // Attribute was not to be read - m_value[i]= NULL; + m_value[i].ptr= NULL; } } @@ -700,13 +833,13 @@ int ha_ndbcluster::unique_index_read(const byte *key, if ((thd->query_id == field->query_id) || (field->flags & PRI_KEY_FLAG)) { - if (get_ndb_value(op, i, field->ptr)) + if (get_ndb_value(op, field, i)) ERR_RETURN(op->getNdbError()); } else { // Attribute was not to be read - m_value[i]= NULL; + m_value[i].ptr= NULL; } } @@ -749,11 +882,22 @@ inline int ha_ndbcluster::next_result(byte *buf) bool contact_ndb = m_lock.type != TL_WRITE_ALLOW_WRITE; do { DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb)); + /* + We can only handle one tuple with blobs at a time. + */ + if (ops_pending && blobs_pending) + { + if (trans->execute(NoCommit) != 0) + DBUG_RETURN(ndb_err(trans)); + ops_pending= 0; + blobs_pending= false; + } check= cursor->nextResult(contact_ndb); if (check == 0) { // One more record found DBUG_PRINT("info", ("One more record found")); + unpack_record(buf); table->status= 0; DBUG_RETURN(0); @@ -867,8 +1011,10 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, index_name= get_index_name(active_index); if (!(op= trans->getNdbIndexScanOperation(index_name, m_tabname))) ERR_RETURN(trans->getNdbError()); - if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0, - parallelism, sorted))) + + NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode) + get_ndb_lock_type(m_lock.type); + if (!(cursor= op->readTuples(lm, 0, parallelism, sorted))) ERR_RETURN(trans->getNdbError()); m_active_cursor= cursor; @@ -928,7 +1074,9 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len, if (!(op= trans->getNdbScanOperation(m_tabname))) ERR_RETURN(trans->getNdbError()); - if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0,parallelism))) + NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode) + get_ndb_lock_type(m_lock.type); + if (!(cursor= op->readTuples(lm, 0, parallelism))) ERR_RETURN(trans->getNdbError()); m_active_cursor= cursor; @@ -997,7 +1145,9 @@ int ha_ndbcluster::full_table_scan(byte *buf) if (!(op=trans->getNdbScanOperation(m_tabname))) ERR_RETURN(trans->getNdbError()); - if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0,parallelism))) + NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode) + get_ndb_lock_type(m_lock.type); + if (!(cursor= op->readTuples(lm, 0, parallelism))) ERR_RETURN(trans->getNdbError()); m_active_cursor= cursor; DBUG_RETURN(define_read_attrs(buf, op)); @@ -1021,12 +1171,12 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op) (field->flags & PRI_KEY_FLAG) || retrieve_all_fields) { - if (get_ndb_value(op, i, field->ptr)) + if (get_ndb_value(op, field, i)) ERR_RETURN(op->getNdbError()); } else { - m_value[i]= NULL; + m_value[i].ptr= NULL; } } @@ -1040,7 +1190,7 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op) if (!tab->getColumn(hidden_no)) DBUG_RETURN(1); #endif - if (get_ndb_value(op, hidden_no, NULL)) + if (get_ndb_value(op, NULL, hidden_no)) ERR_RETURN(op->getNdbError()); } @@ -1108,12 +1258,13 @@ int ha_ndbcluster::write_row(byte *record) */ rows_inserted++; if ((rows_inserted == rows_to_insert) || - ((rows_inserted % bulk_insert_rows) == 0)) + ((rows_inserted % bulk_insert_rows) == 0) || + uses_blob_value(false) != 0) { // Send rows to NDB DBUG_PRINT("info", ("Sending inserts to NDB, "\ "rows_inserted:%d, bulk_insert_rows: %d", - rows_inserted, bulk_insert_rows)); + (int)rows_inserted, (int)bulk_insert_rows)); if (trans->execute(NoCommit) != 0) DBUG_RETURN(ndb_err(trans)); } @@ -1190,6 +1341,8 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) if (!(op= cursor->updateTuple())) ERR_RETURN(trans->getNdbError()); ops_pending++; + if (uses_blob_value(false)) + blobs_pending= true; } else { @@ -1205,7 +1358,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) // Require that the PK for this record has previously been // read into m_value uint no_fields= table->fields; - NdbRecAttr* rec= m_value[no_fields]; + NdbRecAttr* rec= m_value[no_fields].rec; DBUG_ASSERT(rec); DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH); @@ -1280,7 +1433,7 @@ int ha_ndbcluster::delete_row(const byte *record) // This table has no primary key, use "hidden" primary key DBUG_PRINT("info", ("Using hidden key")); uint no_fields= table->fields; - NdbRecAttr* rec= m_value[no_fields]; + NdbRecAttr* rec= m_value[no_fields].rec; DBUG_ASSERT(rec != NULL); if (set_hidden_key(op, no_fields, rec->aRef())) @@ -1318,7 +1471,7 @@ void ha_ndbcluster::unpack_record(byte* buf) { uint row_offset= (uint) (buf - table->record[0]); Field **field, **end; - NdbRecAttr **value= m_value; + NdbValue *value= m_value; DBUG_ENTER("unpack_record"); // Set null flag(s) @@ -1327,8 +1480,23 @@ void ha_ndbcluster::unpack_record(byte* buf) field < end; field++, value++) { - if (*value && (*value)->isNULL()) - (*field)->set_null(row_offset); + if ((*value).ptr) + { + if (! ((*field)->flags & BLOB_FLAG)) + { + if ((*value).rec->isNULL()) + (*field)->set_null(row_offset); + } + else + { + NdbBlob* ndb_blob= (*value).blob; + bool isNull= true; + int ret= ndb_blob->getNull(isNull); + DBUG_ASSERT(ret == 0); + if (isNull) + (*field)->set_null(row_offset); + } + } } #ifndef DBUG_OFF @@ -1339,7 +1507,7 @@ void ha_ndbcluster::unpack_record(byte* buf) int hidden_no= table->fields; const NDBTAB *tab= (NDBTAB *) m_table; const NDBCOL *hidden_col= tab->getColumn(hidden_no); - NdbRecAttr* rec= m_value[hidden_no]; + NdbRecAttr* rec= m_value[hidden_no].rec; DBUG_ASSERT(rec); DBUG_PRINT("hidden", ("%d: %s \"%llu\"", hidden_no, hidden_col->getName(), rec->u_64_value())); @@ -1367,9 +1535,9 @@ void ha_ndbcluster::print_results() { Field *field; const NDBCOL *col; - NdbRecAttr *value; + NdbValue value; - if (!(value= m_value[f])) + if (!(value= m_value[f]).ptr) { fprintf(DBUG_FILE, "Field %d was not read\n", f); continue; @@ -1378,19 +1546,28 @@ void ha_ndbcluster::print_results() DBUG_DUMP("field->ptr", (char*)field->ptr, field->pack_length()); col= tab->getColumn(f); fprintf(DBUG_FILE, "%d: %s\t", f, col->getName()); - - if (value->isNULL()) + + NdbBlob *ndb_blob= NULL; + if (! (field->flags & BLOB_FLAG)) { - fprintf(DBUG_FILE, "NULL\n"); - continue; + if (value.rec->isNULL()) + { + fprintf(DBUG_FILE, "NULL\n"); + continue; + } + } + else + { + ndb_blob= value.blob; + bool isNull= true; + ndb_blob->getNull(isNull); + if (isNull) { + fprintf(DBUG_FILE, "NULL\n"); + continue; + } } switch (col->getType()) { - case NdbDictionary::Column::Blob: - case NdbDictionary::Column::Clob: - case NdbDictionary::Column::Undefined: - fprintf(DBUG_FILE, "Unknown type: %d", col->getType()); - break; case NdbDictionary::Column::Tinyint: { char value= *field->ptr; fprintf(DBUG_FILE, "Tinyint\t%d", value); @@ -1482,6 +1659,21 @@ void ha_ndbcluster::print_results() fprintf(DBUG_FILE, "Timespec\t%llu", value); break; } + case NdbDictionary::Column::Blob: { + Uint64 len= 0; + ndb_blob->getLength(len); + fprintf(DBUG_FILE, "Blob\t[len=%u]", (unsigned)len); + break; + } + case NdbDictionary::Column::Text: { + Uint64 len= 0; + ndb_blob->getLength(len); + fprintf(DBUG_FILE, "Text\t[len=%u]", (unsigned)len); + break; + } + case NdbDictionary::Column::Undefined: + fprintf(DBUG_FILE, "Unknown type: %d", col->getType()); + break; } fprintf(DBUG_FILE, "\n"); @@ -1727,7 +1919,7 @@ void ha_ndbcluster::position(const byte *record) // No primary key, get hidden key DBUG_PRINT("info", ("Getting hidden key")); int hidden_no= table->fields; - NdbRecAttr* rec= m_value[hidden_no]; + NdbRecAttr* rec= m_value[hidden_no].rec; const NDBTAB *tab= (NDBTAB *) m_table; const NDBCOL *hidden_col= tab->getColumn(hidden_no); DBUG_ASSERT(hidden_col->getPrimaryKey() && @@ -1901,7 +2093,7 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows) const NDBTAB *tab= (NDBTAB *) m_table; DBUG_ENTER("start_bulk_insert"); - DBUG_PRINT("enter", ("rows: %d", rows)); + DBUG_PRINT("enter", ("rows: %d", (int)rows)); rows_inserted= 0; rows_to_insert= rows; @@ -1936,7 +2128,7 @@ int ha_ndbcluster::end_bulk_insert() int ha_ndbcluster::extra_opt(enum ha_extra_function operation, ulong cache_size) { DBUG_ENTER("extra_opt"); - DBUG_PRINT("enter", ("cache_size: %d", cache_size)); + DBUG_PRINT("enter", ("cache_size: %lu", cache_size)); DBUG_RETURN(extra(operation)); } @@ -2157,7 +2349,7 @@ int ha_ndbcluster::start_stmt(THD *thd) NdbConnection *tablock_trans= (NdbConnection*)thd->transaction.all.ndb_tid; - DBUG_PRINT("info", ("tablock_trans: %x", tablock_trans)); + DBUG_PRINT("info", ("tablock_trans: %x", (uint)tablock_trans)); DBUG_ASSERT(tablock_trans); trans= m_ndb->hupp(tablock_trans); if (trans == NULL) ERR_RETURN(m_ndb->getNdbError()); @@ -2234,71 +2426,184 @@ int ndbcluster_rollback(THD *thd, void *ndb_transaction) /* - Map MySQL type to the corresponding NDB type + Define NDB column based on Field. + Returns 0 or mysql error code. + Not member of ha_ndbcluster because NDBCOL cannot be declared. */ -inline NdbDictionary::Column::Type -mysql_to_ndb_type(enum enum_field_types mysql_type, bool unsigned_flg) +static int create_ndb_column(NDBCOL &col, + Field *field, + HA_CREATE_INFO *info) { - switch(mysql_type) { + // Set name + col.setName(field->field_name); + // Set type and sizes + const enum enum_field_types mysql_type= field->real_type(); + switch (mysql_type) { + // Numeric types case MYSQL_TYPE_DECIMAL: - return NdbDictionary::Column::Char; + col.setType(NDBCOL::Char); + col.setLength(field->pack_length()); + break; case MYSQL_TYPE_TINY: - return (unsigned_flg) ? - NdbDictionary::Column::Tinyunsigned : - NdbDictionary::Column::Tinyint; + if (field->flags & UNSIGNED_FLAG) + col.setType(NDBCOL::Tinyunsigned); + else + col.setType(NDBCOL::Tinyint); + col.setLength(1); + break; case MYSQL_TYPE_SHORT: - return (unsigned_flg) ? - NdbDictionary::Column::Smallunsigned : - NdbDictionary::Column::Smallint; + if (field->flags & UNSIGNED_FLAG) + col.setType(NDBCOL::Smallunsigned); + else + col.setType(NDBCOL::Smallint); + col.setLength(1); + break; case MYSQL_TYPE_LONG: - return (unsigned_flg) ? - NdbDictionary::Column::Unsigned : - NdbDictionary::Column::Int; - case MYSQL_TYPE_TIMESTAMP: - return NdbDictionary::Column::Unsigned; - case MYSQL_TYPE_LONGLONG: - return (unsigned_flg) ? - NdbDictionary::Column::Bigunsigned : - NdbDictionary::Column::Bigint; + if (field->flags & UNSIGNED_FLAG) + col.setType(NDBCOL::Unsigned); + else + col.setType(NDBCOL::Int); + col.setLength(1); + break; case MYSQL_TYPE_INT24: - return (unsigned_flg) ? - NdbDictionary::Column::Mediumunsigned : - NdbDictionary::Column::Mediumint; + if (field->flags & UNSIGNED_FLAG) + col.setType(NDBCOL::Mediumunsigned); + else + col.setType(NDBCOL::Mediumint); + col.setLength(1); + break; + case MYSQL_TYPE_LONGLONG: + if (field->flags & UNSIGNED_FLAG) + col.setType(NDBCOL::Bigunsigned); + else + col.setType(NDBCOL::Bigint); + col.setLength(1); break; case MYSQL_TYPE_FLOAT: - return NdbDictionary::Column::Float; + col.setType(NDBCOL::Float); + col.setLength(1); + break; case MYSQL_TYPE_DOUBLE: - return NdbDictionary::Column::Double; - case MYSQL_TYPE_DATETIME : - return NdbDictionary::Column::Datetime; - case MYSQL_TYPE_DATE : - case MYSQL_TYPE_NEWDATE : - case MYSQL_TYPE_TIME : - case MYSQL_TYPE_YEAR : - // Missing NDB data types, mapped to char - return NdbDictionary::Column::Char; - case MYSQL_TYPE_ENUM : - return NdbDictionary::Column::Char; - case MYSQL_TYPE_SET : - return NdbDictionary::Column::Char; - case MYSQL_TYPE_TINY_BLOB : - case MYSQL_TYPE_MEDIUM_BLOB : - case MYSQL_TYPE_LONG_BLOB : - case MYSQL_TYPE_BLOB : - return NdbDictionary::Column::Blob; - case MYSQL_TYPE_VAR_STRING : - return NdbDictionary::Column::Varchar; - case MYSQL_TYPE_STRING : - return NdbDictionary::Column::Char; - case MYSQL_TYPE_NULL : - case MYSQL_TYPE_GEOMETRY : - return NdbDictionary::Column::Undefined; - } - return NdbDictionary::Column::Undefined; + col.setType(NDBCOL::Double); + col.setLength(1); + break; + // Date types + case MYSQL_TYPE_TIMESTAMP: + col.setType(NDBCOL::Unsigned); + col.setLength(1); + break; + case MYSQL_TYPE_DATETIME: + col.setType(NDBCOL::Datetime); + col.setLength(1); + break; + case MYSQL_TYPE_DATE: + case MYSQL_TYPE_NEWDATE: + case MYSQL_TYPE_TIME: + case MYSQL_TYPE_YEAR: + col.setType(NDBCOL::Char); + col.setLength(field->pack_length()); + break; + // Char types + case MYSQL_TYPE_STRING: + if (field->flags & BINARY_FLAG) + col.setType(NDBCOL::Binary); + else + col.setType(NDBCOL::Char); + col.setLength(field->pack_length()); + break; + case MYSQL_TYPE_VAR_STRING: + if (field->flags & BINARY_FLAG) + col.setType(NDBCOL::Varbinary); + else + col.setType(NDBCOL::Varchar); + col.setLength(field->pack_length()); + break; + // Blob types (all come in as MYSQL_TYPE_BLOB) + mysql_type_tiny_blob: + case MYSQL_TYPE_TINY_BLOB: + if (field->flags & BINARY_FLAG) + col.setType(NDBCOL::Blob); + else + col.setType(NDBCOL::Text); + col.setInlineSize(256); + // No parts + col.setPartSize(0); + col.setStripeSize(0); + break; + mysql_type_blob: + case MYSQL_TYPE_BLOB: + if (field->flags & BINARY_FLAG) + col.setType(NDBCOL::Blob); + else + col.setType(NDBCOL::Text); + // Use "<=" even if "<" is the exact condition + if (field->max_length() <= (1 << 8)) + goto mysql_type_tiny_blob; + else if (field->max_length() <= (1 << 16)) + { + col.setInlineSize(256); + col.setPartSize(2000); + col.setStripeSize(16); + } + else if (field->max_length() <= (1 << 24)) + goto mysql_type_medium_blob; + else + goto mysql_type_long_blob; + break; + mysql_type_medium_blob: + case MYSQL_TYPE_MEDIUM_BLOB: + if (field->flags & BINARY_FLAG) + col.setType(NDBCOL::Blob); + else + col.setType(NDBCOL::Text); + col.setInlineSize(256); + col.setPartSize(4000); + col.setStripeSize(8); + break; + mysql_type_long_blob: + case MYSQL_TYPE_LONG_BLOB: + if (field->flags & BINARY_FLAG) + col.setType(NDBCOL::Blob); + else + col.setType(NDBCOL::Text); + col.setInlineSize(256); + col.setPartSize(8000); + col.setStripeSize(4); + break; + // Other types + case MYSQL_TYPE_ENUM: + col.setType(NDBCOL::Char); + col.setLength(field->pack_length()); + break; + case MYSQL_TYPE_SET: + col.setType(NDBCOL::Char); + col.setLength(field->pack_length()); + break; + case MYSQL_TYPE_NULL: + case MYSQL_TYPE_GEOMETRY: + goto mysql_type_unsupported; + mysql_type_unsupported: + default: + return HA_ERR_UNSUPPORTED; + } + // Set nullable and pk + col.setNullable(field->maybe_null()); + col.setPrimaryKey(field->flags & PRI_KEY_FLAG); + // Set autoincrement + if (field->flags & AUTO_INCREMENT_FLAG) + { + col.setAutoIncrement(TRUE); + ulonglong value= info->auto_increment_value ? + info->auto_increment_value -1 : (ulonglong) 0; + DBUG_PRINT("info", ("Autoincrement key, initial: %llu", value)); + col.setAutoIncrementInitialValue(value); + } + else + col.setAutoIncrement(false); + return 0; } - /* Create a table in NDB Cluster */ @@ -2308,7 +2613,6 @@ int ha_ndbcluster::create(const char *name, HA_CREATE_INFO *info) { NDBTAB tab; - NdbDictionary::Column::Type ndb_type; NDBCOL col; uint pack_length, length, i; const void *data, *pack_data; @@ -2339,31 +2643,11 @@ int ha_ndbcluster::create(const char *name, for (i= 0; i < form->fields; i++) { Field *field= form->field[i]; - ndb_type= mysql_to_ndb_type(field->real_type(), - field->flags & UNSIGNED_FLAG); DBUG_PRINT("info", ("name: %s, type: %u, pack_length: %d", field->field_name, field->real_type(), field->pack_length())); - col.setName(field->field_name); - col.setType(ndb_type); - if ((ndb_type == NdbDictionary::Column::Char) || - (ndb_type == NdbDictionary::Column::Varchar)) - col.setLength(field->pack_length()); - else - col.setLength(1); - col.setNullable(field->maybe_null()); - col.setPrimaryKey(field->flags & PRI_KEY_FLAG); - if (field->flags & AUTO_INCREMENT_FLAG) - { - col.setAutoIncrement(TRUE); - ulonglong value= info->auto_increment_value ? - info->auto_increment_value -1 : (ulonglong) 0; - DBUG_PRINT("info", ("Autoincrement key, initial: %d", value)); - col.setAutoIncrementInitialValue(value); - } - else - col.setAutoIncrement(false); - + if (my_errno= create_ndb_column(col, field, info)) + DBUG_RETURN(my_errno); tab.addColumn(col); } @@ -2631,14 +2915,15 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): m_table(NULL), m_table_flags(HA_REC_NOT_IN_SEQ | HA_NOT_EXACT_COUNT | - HA_NO_PREFIX_CHAR_KEYS | - HA_NO_BLOBS), + HA_NO_PREFIX_CHAR_KEYS), m_use_write(false), retrieve_all_fields(FALSE), rows_to_insert(0), rows_inserted(0), bulk_insert_rows(1024), - ops_pending(0) + ops_pending(0), + blobs_buffer(0), + blobs_buffer_size(0) { int i; @@ -2671,6 +2956,8 @@ ha_ndbcluster::~ha_ndbcluster() DBUG_ENTER("~ha_ndbcluster"); release_metadata(); + my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); + blobs_buffer= 0; // Check for open cursor/transaction DBUG_ASSERT(m_active_cursor == NULL); diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h index fc0d607abaa..e08b409059c 100644 --- a/sql/ha_ndbcluster.h +++ b/sql/ha_ndbcluster.h @@ -35,6 +35,7 @@ class NdbRecAttr; // Forward declaration class NdbResultSet; // Forward declaration class NdbScanOperation; class NdbIndexScanOperation; +class NdbBlob; typedef enum ndb_index_type { UNDEFINED_INDEX = 0, @@ -171,6 +172,7 @@ class ha_ndbcluster: public handler enum ha_rkey_function find_flag); int close_scan(); void unpack_record(byte *buf); + int get_ndb_lock_type(enum thr_lock_type type); void set_dbname(const char *pathname); void set_tabname(const char *pathname); @@ -181,7 +183,9 @@ class ha_ndbcluster: public handler int set_ndb_key(NdbOperation*, Field *field, uint fieldnr, const byte* field_ptr); int set_ndb_value(NdbOperation*, Field *field, uint fieldnr); - int get_ndb_value(NdbOperation*, uint fieldnr, byte *field_ptr); + int get_ndb_value(NdbOperation*, Field *field, uint fieldnr); + friend int ::get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg); + int get_ndb_blobs_value(NdbBlob *last_ndb_blob); int set_primary_key(NdbOperation *op, const byte *key); int set_primary_key(NdbOperation *op); int set_primary_key_from_old_data(NdbOperation *op, const byte *old_data); @@ -191,8 +195,8 @@ class ha_ndbcluster: public handler void print_results(); longlong get_auto_increment(); - int ndb_err(NdbConnection*); + bool uses_blob_value(bool all_fields); private: int check_ndb_connection(); @@ -209,13 +213,19 @@ class ha_ndbcluster: public handler NDB_SHARE *m_share; NDB_INDEX_TYPE m_indextype[MAX_KEY]; const char* m_unique_index_name[MAX_KEY]; - NdbRecAttr *m_value[NDB_MAX_ATTRIBUTES_IN_TABLE]; + // NdbRecAttr has no reference to blob + typedef union { NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue; + NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE]; bool m_use_write; bool retrieve_all_fields; ha_rows rows_to_insert; ha_rows rows_inserted; ha_rows bulk_insert_rows; ha_rows ops_pending; + bool blobs_pending; + // memory for blobs in one tuple + char *blobs_buffer; + uint32 blobs_buffer_size; }; bool ndbcluster_init(void); @@ -231,10 +241,3 @@ int ndbcluster_discover(const char* dbname, const char* name, int ndbcluster_drop_database(const char* path); void ndbcluster_print_error(int error, const NdbOperation *error_op); - - - - - - - |