-rw-r--r-- | include/ma_dyncol.h | 32
-rw-r--r-- | mysql-test/r/cassandra.result | 143
-rw-r--r-- | mysql-test/r/mysqld--help.result | 26
-rw-r--r-- | mysql-test/t/cassandra.test | 126
-rw-r--r-- | mysys/ma_dyncol.c | 289
-rw-r--r-- | sql/sql_base.cc | 1
-rw-r--r-- | sql/sql_base.h | 1
-rw-r--r-- | storage/cassandra/CMakeLists.txt | 2
-rw-r--r-- | storage/cassandra/cassandra_se.cc | 79
-rw-r--r-- | storage/cassandra/cassandra_se.h | 19
-rw-r--r-- | storage/cassandra/gen-cpp/cassandra_constants.cpp | 2
-rw-r--r-- | storage/cassandra/ha_cassandra.cc | 1043
-rw-r--r-- | storage/cassandra/ha_cassandra.h | 53
13 files changed, 1673 insertions, 143 deletions
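
This commit adds dynamic-column storage to the Cassandra storage engine and, to support it, new value-conversion helpers in the dynamic-columns library: dynamic_column_vals(), dynamic_column_val_str(), dynamic_column_val_long(), dynamic_column_val_double(), and the ER_DYNCOL_TRUNCATED return code. The following is a minimal sketch of how the unpacking API declared in include/ma_dyncol.h below might be driven from C; only the function, type and macro names come from the patch and the MariaDB headers, while dump_packed() and the packed input blob are hypothetical:

#include <stdio.h>
#include <my_global.h>
#include <my_sys.h>
#include <m_string.h>
#include <ma_dyncol.h>

/* Hypothetical helper: list a packed dynamic-column blob as name = integer pairs. */
static enum enum_dyncol_func_result dump_packed(DYNAMIC_COLUMN *packed)
{
  DYNAMIC_ARRAY names, vals;
  char *free_names= NULL;
  enum enum_dyncol_func_result rc;
  uint i;

  /* Unpack into parallel arrays of LEX_STRING names and DYNAMIC_COLUMN_VALUEs */
  if ((rc= dynamic_column_vals(packed, &names, &vals, &free_names)) < 0)
    return rc;                       /* ER_DYNCOL_FORMAT, ER_DYNCOL_RESOURCE, ... */

  for (i= 0; i < names.elements; i++)
  {
    LEX_STRING *name= dynamic_element(&names, i, LEX_STRING*);
    DYNAMIC_COLUMN_VALUE *val= dynamic_element(&vals, i, DYNAMIC_COLUMN_VALUE*);
    longlong as_int;

    /* A positive ER_DYNCOL_TRUNCATED still leaves a usable (if lossy) value */
    if ((rc= dynamic_column_val_long(&as_int, val)) < 0)
      break;
    printf("%.*s = %lld\n", (int) name->length, name->str, (long long) as_int);
  }

  delete_dynamic(&names);
  delete_dynamic(&vals);
  if (free_names)
    my_free(free_names);
  return rc;
}

The Cassandra handler follows the same pattern: as the ha_cassandra.cc hunks below show, its dyncol_to_cassandra* callbacks call dynamic_column_val_long()/_double()/_str() when writing a DYNAMIC_COLUMN_STORAGE blob out to Cassandra, and the patch exports dynamic_column_create_many_internal_fmt(), presumably so the handler can pack Cassandra columns back into the blob on the read path.
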
diff --git a/include/ma_dyncol.h b/include/ma_dyncol.h index b4b9df7da19..2264ec6f53f 100644 --- a/include/ma_dyncol.h +++ b/include/ma_dyncol.h @@ -39,6 +39,12 @@ */ #define MAX_DYNAMIC_COLUMN_LENGTH 0X1FFFFFFFL +/* + Limits of implementation +*/ +#define MAX_NAME_LENGTH 255 +#define MAX_TOTAL_NAME_LENGTH 65535 + /* NO and OK is the same used just to show semantics */ #define ER_DYNCOL_NO ER_DYNCOL_OK @@ -50,7 +56,8 @@ enum enum_dyncol_func_result ER_DYNCOL_LIMIT= -2, /* Some limit reached */ ER_DYNCOL_RESOURCE= -3, /* Out of resourses */ ER_DYNCOL_DATA= -4, /* Incorrect input data */ - ER_DYNCOL_UNKNOWN_CHARSET= -5 /* Unknown character set */ + ER_DYNCOL_UNKNOWN_CHARSET= -5, /* Unknown character set */ + ER_DYNCOL_TRUNCATED= 2 /* OK, but data was truncated */ }; typedef DYNAMIC_STRING DYNAMIC_COLUMN; @@ -81,6 +88,7 @@ struct st_dynamic_column_value struct { LEX_STRING value; CHARSET_INFO *charset; + my_bool nonfreeable; } string; struct { decimal_digit_t buffer[DECIMAL_BUFF_LENGTH]; @@ -108,6 +116,13 @@ dynamic_column_create_many_fmt(DYNAMIC_COLUMN *str, uchar *column_keys, DYNAMIC_COLUMN_VALUE *values, my_bool names); +enum enum_dyncol_func_result +dynamic_column_create_many_internal_fmt(DYNAMIC_COLUMN *str, + uint column_count, + void *column_keys, + DYNAMIC_COLUMN_VALUE *values, + my_bool new_str, + my_bool string_keys); enum enum_dyncol_func_result dynamic_column_update(DYNAMIC_COLUMN *org, uint column_nr, @@ -163,6 +178,21 @@ dynamic_column_json(DYNAMIC_COLUMN *str, DYNAMIC_STRING *json); #define dynamic_column_initialize(A) memset((A), 0, sizeof(*(A))) #define dynamic_column_column_free(V) dynstr_free(V) +/* conversion of values to 3 base types */ +enum enum_dyncol_func_result +dynamic_column_val_str(DYNAMIC_STRING *str, DYNAMIC_COLUMN_VALUE *val, + CHARSET_INFO *cs, my_bool quote); +enum enum_dyncol_func_result +dynamic_column_val_long(longlong *ll, DYNAMIC_COLUMN_VALUE *val); +enum enum_dyncol_func_result +dynamic_column_val_double(double *dbl, DYNAMIC_COLUMN_VALUE *val); + + +enum enum_dyncol_func_result +dynamic_column_vals(DYNAMIC_COLUMN *str, + DYNAMIC_ARRAY *names, DYNAMIC_ARRAY *vals, + char **free_names); + /*************************************************************************** Internal functions, don't use if you don't know what you are doing... ***************************************************************************/ diff --git a/mysql-test/r/cassandra.result b/mysql-test/r/cassandra.result index 07720bb5b23..3cf286013b8 100644 --- a/mysql-test/r/cassandra.result +++ b/mysql-test/r/cassandra.result @@ -365,8 +365,9 @@ CREATE TABLE t2 (rowkey bigint PRIMARY KEY, datecol bigint) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf4'; select * from t2; rowkey datecol -1 1346189025000 -10 1346189026000 +1 1346192625000 +10 1346192626000 +delete from t2; drop table t2; # # Check whether changing parameters with ALTER TABLE works. 
@@ -407,3 +408,141 @@ new-rowkey12 data1-value3 454 rowkey11 updated-1 34543 delete from t1; drop table t1; +# +# Dynamic columns support +# +CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol blob DYNAMIC_COLUMN_STORAGE=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5'; +drop table t2; +#error: dynamic column is not a blob +CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36) DYNAMIC_COLUMN_STORAGE=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5'; +ERROR 42000: Incorrect column specifier for column 'uuidcol' +#error: double dynamic column +CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol blob DYNAMIC_COLUMN_STORAGE=1, textcol blob DYNAMIC_COLUMN_STORAGE=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5'; +ERROR 42000: Incorrect column specifier for column 'textcol' +# +# Dynamic column read +# +CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36)) ENGINE=CASSANDRA +thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5'; +delete from t2; +insert into t2 values(1,'9b5658dc-f32f-11e1-94cd-f46d046e9f09'); +insert into t2 values(2,'9b5658dc-f32f-11e1-94cd-f46d046e9f0a'); +drop table t2; +CREATE TABLE t2 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5'; +select rowkey, column_list(dyn), column_get(dyn, 'uuidcol' as char) from t2; +rowkey column_list(dyn) column_get(dyn, 'uuidcol' as char) +1 `uuidcol` 9b5658dc-f32f-11e1-94cd-f46d046e9f09 +2 `uuidcol` 9b5658dc-f32f-11e1-94cd-f46d046e9f0a +drop table t2; +CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36)) ENGINE=CASSANDRA +thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5'; +delete from t2; +drop table t2; +# +# Dynamic column insert +# +CREATE TABLE t2 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5'; +insert into t2 values (1, column_create("dyn1", 1, "dyn2", "two")); +select rowkey, column_json(dyn) from t2; +rowkey column_json(dyn) +1 [{"dyn1":"1"},{"dyn2":"two"}] +delete from t2; +drop table t2; +# bigint +CREATE TABLE t1 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2'; +insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'a', 254324)); +insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'a', 2543)); +select rowkey, column_json(dyn) from t1; +rowkey column_json(dyn) +1 [{"a":254324},{"dyn1":"1"},{"dyn2":"two"}] +2 [{"a":2543},{"dyn1":"1"},{"dyn2":"two"}] +delete from t1; +drop table t1; +# int +CREATE TABLE t1 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf3'; +insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'intcol', 254324)); +insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'intcol', 2543)); +select rowkey, column_json(dyn) from t1; +rowkey column_json(dyn) +1 [{"dyn1":"1"},{"dyn2":"two"},{"intcol":254324}] +2 [{"dyn1":"1"},{"dyn2":"two"},{"intcol":2543}] +delete from t1; +drop table t1; +# timestamp +CREATE TABLE t1 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf4'; +insert into t1 values 
(1, column_create("dyn1", 1, "dyn2", "two", 'datecol', 254324)); +insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'datecol', 2543)); +select rowkey, column_json(dyn) from t1; +rowkey column_json(dyn) +1 [{"dyn1":"1"},{"dyn2":"two"},{"datecol":254324}] +2 [{"dyn1":"1"},{"dyn2":"two"},{"datecol":2543}] +delete from t1; +drop table t1; +# boolean +CREATE TABLE t1 (rowkey int PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf7'; +insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'boolcol', 254324)); +insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'boolcol', 0)); +select rowkey, column_json(dyn) from t1; +rowkey column_json(dyn) +1 [{"dyn1":"1"},{"dyn2":"two"},{"boolcol":1}] +2 [{"dyn1":"1"},{"dyn2":"two"},{"boolcol":0}] +select rowkey, column_json(dyn) from t1; +rowkey column_json(dyn) +1 [{"dyn1":"1"},{"dyn2":"two"},{"boolcol":1}] +2 [{"dyn1":"1"},{"dyn2":"two"},{"boolcol":0}] +update t1 set dyn=column_add(dyn, "dyn2", null, "dyn3", "3"); +select rowkey, column_json(dyn) from t1; +rowkey column_json(dyn) +1 [{"dyn1":"1"},{"dyn3":"3"},{"boolcol":1}] +2 [{"dyn1":"1"},{"dyn3":"3"},{"boolcol":0}] +update t1 set dyn=column_add(dyn, "dyn1", null) where rowkey= 1; +select rowkey, column_json(dyn) from t1; +rowkey column_json(dyn) +1 [{"dyn3":"3"},{"boolcol":1}] +2 [{"dyn1":"1"},{"dyn3":"3"},{"boolcol":0}] +update t1 set dyn=column_add(dyn, "dyn3", null, "a", "ddd"); +select rowkey, column_json(dyn) from t1; +rowkey column_json(dyn) +1 [{"a":"ddd"},{"boolcol":1}] +2 [{"a":"ddd"},{"dyn1":"1"},{"boolcol":0}] +update t1 set dyn=column_add(dyn, "12345678901234", "ddd"); +select rowkey, column_json(dyn) from t1; +rowkey column_json(dyn) +1 [{"a":"ddd"},{"boolcol":1},{"12345678901234":"ddd"}] +2 [{"a":"ddd"},{"dyn1":"1"},{"boolcol":0},{"12345678901234":"ddd"}] +update t1 set dyn=column_add(dyn, "12345678901234", null); +select rowkey, column_json(dyn) from t1; +rowkey column_json(dyn) +1 [{"a":"ddd"},{"boolcol":1}] +2 [{"a":"ddd"},{"dyn1":"1"},{"boolcol":0}] +update t1 set dyn=column_add(dyn, 'boolcol', null) where rowkey= 2; +select rowkey, column_json(dyn) from t1; +rowkey column_json(dyn) +1 [{"a":"ddd"},{"boolcol":1}] +2 [{"a":"ddd"},{"dyn1":"1"}] +update t1 set rowkey= 3, dyn=column_add(dyn, "dyn1", null, 'boolcol', 0) where rowkey= 2; +select rowkey, column_json(dyn) from t1; +rowkey column_json(dyn) +1 [{"a":"ddd"},{"boolcol":1}] +3 [{"a":"ddd"},{"boolcol":0}] +delete from t1; +drop table t1; +CREATE TABLE t1 (rowkey varchar(10) PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cfd1'; +select * from t1; +ERROR HY000: Internal error: 'Unable to convert value for field `dyn` from Cassandra's data format. 
Name length exceed limit of 255: 'very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_ver' +drop table t1; +CREATE TABLE t1 (rowkey int PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) +ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cfd2'; +DELETE FROM t1; +insert into t1 values (1, column_create("dyn", 1)); +select rowkey, column_list(dyn) from t1; +rowkey column_list(dyn) +1 `dyn` +delete from t1; +DROP TABLE t1; +CREATE TABLE t1 (rowkey int PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) +ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cfd2'; +insert into t1 values (1,'9b5658dc-f32f-11e1-94cd-f46d046e9f0a'); +ERROR HY000: Encountered illegal format of dynamic column string +delete from t1; +DROP TABLE t1; diff --git a/mysql-test/r/mysqld--help.result b/mysql-test/r/mysqld--help.result index ad55bfa3003..e3c08d80876 100644 --- a/mysql-test/r/mysqld--help.result +++ b/mysql-test/r/mysqld--help.result @@ -89,6 +89,24 @@ The following options may be given as the first argument: --bulk-insert-buffer-size=# Size of tree cache used in bulk insert optimisation. Note that this is a limit per thread! + --cassandra[=name] Enable or disable CASSANDRA plugin. Possible values are + ON, OFF, FORCE (don't start if the plugin fails to load). + --cassandra-default-thrift-host=name + Default host for Cassandra thrift connections + --cassandra-failure-retries=# + Number of times to retry Cassandra calls that failed due + to timeouts or network communication problems. The + default, 0, means not to retry. + --cassandra-insert-batch-size=# + Number of rows in an INSERT batch + --cassandra-multiget-batch-size=# + Number of rows in a multiget(MRR) batch + --cassandra-read-consistency=name + Cassandra consistency level to use for read operations + --cassandra-rnd-batch-size=# + Number of rows in an rnd_read (full scan) batch + --cassandra-write-consistency=name + Cassandra consistency level to use for write operations --character-set-client-handshake Don't ignore client side character set value sent during handshake. 
@@ -863,6 +881,14 @@ binlog-optimize-thread-scheduling TRUE binlog-row-event-max-size 1024 binlog-stmt-cache-size 32768 bulk-insert-buffer-size 8388608 +cassandra ON +cassandra-default-thrift-host (No default value) +cassandra-failure-retries 0 +cassandra-insert-batch-size 100 +cassandra-multiget-batch-size 100 +cassandra-read-consistency ONE +cassandra-rnd-batch-size 10000 +cassandra-write-consistency ONE character-set-client-handshake TRUE character-set-filesystem binary character-set-server latin1 diff --git a/mysql-test/t/cassandra.test b/mysql-test/t/cassandra.test index 7e5b327580c..b0b29c52f21 100644 --- a/mysql-test/t/cassandra.test +++ b/mysql-test/t/cassandra.test @@ -94,6 +94,18 @@ CREATE COLUMN FAMILY cf10 WITH comparator = UTF8Type AND key_validation_class=UTF8Type AND default_validation_class = UTF8Type; + +CREATE COLUMN FAMILY cfd1 + WITH comparator = UTF8Type + AND key_validation_class=UTF8Type + AND default_validation_class = UTF8Type; +SET cfd1['1']['very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_long_name']='1'; + +CREATE COLUMN FAMILY cfd2 + WITH comparator = UTF8Type + AND key_validation_class=Int32Type + AND default_validation_class = UTF8Type; + EOF --error 0,1,2 @@ -463,7 +475,7 @@ drop table t2; CREATE TABLE t2 (rowkey bigint PRIMARY KEY, datecol bigint) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf4'; select * from t2; - +delete from t2; drop table t2; --echo # @@ -511,6 +523,118 @@ select * from t1; delete from t1; drop table t1; +--echo # +--echo # Dynamic columns support +--echo # +CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol blob DYNAMIC_COLUMN_STORAGE=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5'; +drop table t2; + +--echo #error: dynamic column is not a blob +--error ER_WRONG_FIELD_SPEC +CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36) DYNAMIC_COLUMN_STORAGE=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5'; + +--echo #error: double dynamic column +--error ER_WRONG_FIELD_SPEC +CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol blob DYNAMIC_COLUMN_STORAGE=1, textcol blob DYNAMIC_COLUMN_STORAGE=1) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5'; + +--echo # +--echo # Dynamic column read +--echo # +#prepare data +CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36)) ENGINE=CASSANDRA + thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5'; +delete from t2; +insert into t2 values(1,'9b5658dc-f32f-11e1-94cd-f46d046e9f09'); +insert into t2 values(2,'9b5658dc-f32f-11e1-94cd-f46d046e9f0a'); +drop table t2; + +#test dynamic column read +CREATE TABLE t2 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5'; +select rowkey, column_list(dyn), column_get(dyn, 'uuidcol' as char) from t2; +drop table t2; + +#cleanup data +CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36)) ENGINE=CASSANDRA + thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5'; +delete from t2; +drop table t2; + +--echo # +--echo # Dynamic column insert +--echo # +CREATE 
TABLE t2 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5'; +insert into t2 values (1, column_create("dyn1", 1, "dyn2", "two")); +select rowkey, column_json(dyn) from t2; +delete from t2; +drop table t2; +--echo # bigint +CREATE TABLE t1 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2'; +insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'a', 254324)); +insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'a', 2543)); +select rowkey, column_json(dyn) from t1; +delete from t1; +drop table t1; +--echo # int +CREATE TABLE t1 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf3'; +insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'intcol', 254324)); +insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'intcol', 2543)); +select rowkey, column_json(dyn) from t1; +delete from t1; +drop table t1; +--echo # timestamp +CREATE TABLE t1 (rowkey bigint PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf4'; +insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'datecol', 254324)); +insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'datecol', 2543)); +select rowkey, column_json(dyn) from t1; +delete from t1; +drop table t1; +--echo # boolean +CREATE TABLE t1 (rowkey int PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf7'; +insert into t1 values (1, column_create("dyn1", 1, "dyn2", "two", 'boolcol', 254324)); +insert into t1 values (2, column_create("dyn1", 1, "dyn2", "two", 'boolcol', 0)); +select rowkey, column_json(dyn) from t1; +select rowkey, column_json(dyn) from t1; +update t1 set dyn=column_add(dyn, "dyn2", null, "dyn3", "3"); +select rowkey, column_json(dyn) from t1; +update t1 set dyn=column_add(dyn, "dyn1", null) where rowkey= 1; +select rowkey, column_json(dyn) from t1; +update t1 set dyn=column_add(dyn, "dyn3", null, "a", "ddd"); +select rowkey, column_json(dyn) from t1; +update t1 set dyn=column_add(dyn, "12345678901234", "ddd"); +select rowkey, column_json(dyn) from t1; +update t1 set dyn=column_add(dyn, "12345678901234", null); +select rowkey, column_json(dyn) from t1; +update t1 set dyn=column_add(dyn, 'boolcol', null) where rowkey= 2; +select rowkey, column_json(dyn) from t1; +update t1 set rowkey= 3, dyn=column_add(dyn, "dyn1", null, 'boolcol', 0) where rowkey= 2; +select rowkey, column_json(dyn) from t1; +delete from t1; +drop table t1; + +CREATE TABLE t1 (rowkey varchar(10) PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cfd1'; +--error ER_INTERNAL_ERROR +select * from t1; +drop table t1; + +# MDEV-560 +CREATE TABLE t1 (rowkey int PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) +ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cfd2'; +DELETE FROM t1; +insert into t1 values (1, column_create("dyn", 1)); +select rowkey, column_list(dyn) from t1; +# Cleanup +delete from t1; +DROP TABLE t1; + +# MDEV-561 (incorrect format data to dynamic column) +CREATE TABLE t1 (rowkey int PRIMARY KEY, dyn blob DYNAMIC_COLUMN_STORAGE=yes) 
+ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cfd2'; +--error ER_DYN_COL_WRONG_FORMAT +insert into t1 values (1,'9b5658dc-f32f-11e1-94cd-f46d046e9f0a'); +delete from t1; +DROP TABLE t1; + + ############################################################################ ## Cassandra cleanup ############################################################################ diff --git a/mysys/ma_dyncol.c b/mysys/ma_dyncol.c index 70ae4935528..e7c9b835454 100644 --- a/mysys/ma_dyncol.c +++ b/mysys/ma_dyncol.c @@ -68,6 +68,8 @@ uint32 copy_and_convert(char *to, uint32 to_length, CHARSET_INFO *to_cs, #define MAX_OFFSET_LENGTH 5 +#define DYNCOL_NUM_CHAR 6 + my_bool dynamic_column_has_names(DYNAMIC_COLUMN *str) { if (str->length < 1) @@ -211,7 +213,7 @@ static my_bool check_limit_num(const void *val) static my_bool check_limit_str(const void *val) { - return (*((LEX_STRING **)val))->length > 255; + return (*((LEX_STRING **)val))->length > MAX_NAME_LENGTH; } @@ -288,7 +290,7 @@ my_bool put_header_entry_str(DYN_HEADER *hdr, size_t offset) { LEX_STRING *column_name= (LEX_STRING *)column_key; - DBUG_ASSERT(column_name->length <= 255); + DBUG_ASSERT(column_name->length <= MAX_NAME_LENGTH); hdr->entry[0]= column_name->length; DBUG_ASSERT(hdr->name - hdr->nmpool < (long) 0x10000L); int2store(hdr->entry + 1, hdr->name - hdr->nmpool); @@ -1381,6 +1383,9 @@ dynamic_new_column_store(DYNAMIC_COLUMN *str, DYNCOL_SYZERESERVE)) goto err; } + if (!column_count) + return ER_DYNCOL_OK; + bzero(str->str, fmt->fixed_hdr); str->length= fmt->fixed_hdr; @@ -1501,7 +1506,7 @@ calc_var_sizes(DYN_HEADER *hdr, @return ER_DYNCOL_* return code */ -static enum enum_dyncol_func_result +enum enum_dyncol_func_result dynamic_column_create_many_internal_fmt(DYNAMIC_COLUMN *str, uint column_count, void *column_keys, @@ -1761,7 +1766,7 @@ static my_bool find_column(DYN_HEADER *hdr, uint numkey, LEX_STRING *strkey) { LEX_STRING nmkey; - char nmkeybuff[6]; /* to fit max 2 bytes number */ + char nmkeybuff[DYNCOL_NUM_CHAR]; /* to fit max 2 bytes number */ DBUG_ASSERT(hdr->header != NULL); if (hdr->header + hdr->header_size > hdr->data_end) @@ -2169,10 +2174,10 @@ dynamic_column_list_str(DYNAMIC_COLUMN *str, DYNAMIC_ARRAY *array_of_lexstr) if (header.format == DYNCOL_FMT_NUM) { uint nm= uint2korr(read); - tmp.str= my_malloc(6, MYF(0)); + tmp.str= my_malloc(DYNCOL_NUM_CHAR, MYF(0)); if (!tmp.str) return ER_DYNCOL_RESOURCE; - tmp.length= snprintf(tmp.str, 6, "%u", nm); + tmp.length= snprintf(tmp.str, DYNCOL_NUM_CHAR, "%u", nm); } else { @@ -2208,7 +2213,7 @@ find_place(DYN_HEADER *hdr, void *key, my_bool string_keys) uint mid, start, end, val; int flag; LEX_STRING str; - char buff[6]; + char buff[DYNCOL_NUM_CHAR]; my_bool need_conversion= ((string_keys ? 
DYNCOL_FMT_STR : DYNCOL_FMT_NUM) != hdr->format); LINT_INIT(flag); /* 100 % safe */ @@ -2425,7 +2430,7 @@ dynamic_column_update_copy(DYNAMIC_COLUMN *str, PLAN *plan, size_t offs; uint nm; DYNAMIC_COLUMN_TYPE tp; - char buff[6]; + char buff[DYNCOL_NUM_CHAR]; if (hdr->format == DYNCOL_FMT_NUM) { @@ -3438,7 +3443,7 @@ end: enum enum_dyncol_func_result dynamic_column_val_str(DYNAMIC_STRING *str, DYNAMIC_COLUMN_VALUE *val, - my_bool quote) + CHARSET_INFO *cs, my_bool quote) { char buff[40]; int len; @@ -3468,24 +3473,22 @@ dynamic_column_val_str(DYNAMIC_STRING *str, DYNAMIC_COLUMN_VALUE *val, char *alloc= NULL; char *from= val->x.string.value.str; uint bufflen; - my_bool conv= !my_charset_same(val->x.string.charset, - &my_charset_utf8_general_ci); + my_bool conv= !my_charset_same(val->x.string.charset, cs); my_bool rc; len= val->x.string.value.length; - bufflen= (len * (conv ? my_charset_utf8_general_ci.mbmaxlen : 1)); + bufflen= (len * (conv ? cs->mbmaxlen : 1)); if (dynstr_realloc(str, bufflen)) return ER_DYNCOL_RESOURCE; // guaranty UTF-8 string for value - if (!my_charset_same(val->x.string.charset, - &my_charset_utf8_general_ci)) + if (!my_charset_same(val->x.string.charset, cs)) { uint dummy_errors; if (!quote) { /* convert to the destination */ str->length+= copy_and_convert_extended(str->str, bufflen, - &my_charset_utf8_general_ci, + cs, from, len, val->x.string.charset, &dummy_errors); @@ -3494,8 +3497,7 @@ dynamic_column_val_str(DYNAMIC_STRING *str, DYNAMIC_COLUMN_VALUE *val, if ((alloc= (char *)my_malloc(bufflen, MYF(0)))) { len= - copy_and_convert_extended(alloc, bufflen, - &my_charset_utf8_general_ci, + copy_and_convert_extended(alloc, bufflen, cs, from, len, val->x.string.charset, &dummy_errors); from= alloc; @@ -3543,6 +3545,155 @@ dynamic_column_val_str(DYNAMIC_STRING *str, DYNAMIC_COLUMN_VALUE *val, return(ER_DYNCOL_OK); } + +enum enum_dyncol_func_result +dynamic_column_val_long(longlong *ll, DYNAMIC_COLUMN_VALUE *val) +{ + enum enum_dyncol_func_result rc= ER_DYNCOL_OK; + *ll= 0; + switch (val->type) { + case DYN_COL_INT: + *ll= val->x.long_value; + break; + case DYN_COL_UINT: + *ll= (longlong)val->x.ulong_value; + if (val->x.ulong_value > ULONGLONG_MAX) + rc= ER_DYNCOL_TRUNCATED; + break; + case DYN_COL_DOUBLE: + *ll= (longlong)val->x.double_value; + if (((double) *ll) != val->x.double_value) + rc= ER_DYNCOL_TRUNCATED; + break; + case DYN_COL_STRING: + { + longlong i= 0, sign= 1; + char *src= val->x.string.value.str; + uint len= val->x.string.value.length; + + while (len && my_isspace(&my_charset_latin1, *src)) src++,len--; + + if (len) + { + if (*src == '-') + { + sign= -1; + src++; + } else if (*src == '-') + src++; + while(len && my_isdigit(&my_charset_latin1, *src)) + { + i= i * 10 + (*src - '0'); + src++; + } + } + else + rc= ER_DYNCOL_TRUNCATED; + if (len) + rc= ER_DYNCOL_TRUNCATED; + *ll= i * sign; + break; + } + case DYN_COL_DECIMAL: + if (decimal2longlong(&val->x.decimal.value, ll) != E_DEC_OK) + rc= ER_DYNCOL_TRUNCATED; + break; + case DYN_COL_DATETIME: + *ll= (val->x.time_value.year * 10000000000L + + val->x.time_value.month * 100000000L + + val->x.time_value.day * 1000000 + + val->x.time_value.hour * 10000 + + val->x.time_value.minute * 100 + + val->x.time_value.second) * + (val->x.time_value.neg ? -1 : 1); + break; + case DYN_COL_DATE: + *ll= (val->x.time_value.year * 10000 + + val->x.time_value.month * 100 + + val->x.time_value.day) * + (val->x.time_value.neg ? 
-1 : 1); + break; + case DYN_COL_TIME: + *ll= (val->x.time_value.hour * 10000 + + val->x.time_value.minute * 100 + + val->x.time_value.second) * + (val->x.time_value.neg ? -1 : 1); + break; + case DYN_COL_NULL: + rc= ER_DYNCOL_TRUNCATED; + break; + default: + return(ER_DYNCOL_FORMAT); + } + return(rc); +} + + +enum enum_dyncol_func_result +dynamic_column_val_double(double *dbl, DYNAMIC_COLUMN_VALUE *val) +{ + enum enum_dyncol_func_result rc= ER_DYNCOL_OK; + *dbl= 0; + switch (val->type) { + case DYN_COL_INT: + *dbl= (double)val->x.long_value; + if (((longlong) *dbl) != val->x.long_value) + rc= ER_DYNCOL_TRUNCATED; + break; + case DYN_COL_UINT: + *dbl= (double)val->x.ulong_value; + if (((ulonglong) *dbl) != val->x.ulong_value) + rc= ER_DYNCOL_TRUNCATED; + break; + case DYN_COL_DOUBLE: + *dbl= val->x.double_value; + break; + case DYN_COL_STRING: + { + char *str, *end; + if ((str= malloc(val->x.string.value.length + 1))) + return ER_DYNCOL_RESOURCE; + memcpy(str, val->x.string.value.str, val->x.string.value.length); + str[val->x.string.value.length]= '\0'; + *dbl= strtod(str, &end); + if (*end != '\0') + rc= ER_DYNCOL_TRUNCATED; + } + case DYN_COL_DECIMAL: + if (decimal2double(&val->x.decimal.value, dbl) != E_DEC_OK) + rc= ER_DYNCOL_TRUNCATED; + break; + case DYN_COL_DATETIME: + *dbl= (double)(val->x.time_value.year * 10000000000L + + val->x.time_value.month * 100000000L + + val->x.time_value.day * 1000000 + + val->x.time_value.hour * 10000 + + val->x.time_value.minute * 100 + + val->x.time_value.second) * + (val->x.time_value.neg ? -1 : 1); + break; + case DYN_COL_DATE: + *dbl= (double)(val->x.time_value.year * 10000 + + val->x.time_value.month * 100 + + val->x.time_value.day) * + (val->x.time_value.neg ? -1 : 1); + break; + case DYN_COL_TIME: + *dbl= (double)(val->x.time_value.hour * 10000 + + val->x.time_value.minute * 100 + + val->x.time_value.second) * + (val->x.time_value.neg ? -1 : 1); + break; + case DYN_COL_NULL: + rc= ER_DYNCOL_TRUNCATED; + break; + default: + return(ER_DYNCOL_FORMAT); + } + return(rc); +} + + /** Convert to JSON @@ -3602,10 +3753,11 @@ dynamic_column_json(DYNAMIC_COLUMN *str, DYNAMIC_STRING *json) if (header.format == DYNCOL_FMT_NUM) { uint nm= uint2korr(header.entry); - if (dynstr_realloc(json, 6 + 3)) + if (dynstr_realloc(json, DYNCOL_NUM_CHAR + 3)) goto err; json->str[json->length++]= '"'; - json->length+= (snprintf(json->str + json->length, 6, "%u", nm)); + json->length+= (snprintf(json->str + json->length, + DYNCOL_NUM_CHAR, "%u", nm)); } else { @@ -3619,7 +3771,8 @@ dynamic_column_json(DYNAMIC_COLUMN *str, DYNAMIC_STRING *json) } json->str[json->length++]= '"'; json->str[json->length++]= ':'; - if ((rc= dynamic_column_val_str(json, &val, TRUE)) < 0 || + if ((rc= dynamic_column_val_str(json, &val, + &my_charset_utf8_general_ci, TRUE)) < 0 || dynstr_append_mem(json, "}", 1)) goto err; } @@ -3631,3 +3784,99 @@ err: json->length= 0; return rc; } + + +/** + Convert to DYNAMIC_COLUMN_VALUE values and names (LEX_STING) dynamic array + + @param str The packed string + @param names Where to put names + @param vals Where to put values + @param free_names pointer to free names buffer if there is it. 
+ + @return ER_DYNCOL_* return code +*/ + +enum enum_dyncol_func_result +dynamic_column_vals(DYNAMIC_COLUMN *str, + DYNAMIC_ARRAY *names, DYNAMIC_ARRAY *vals, + char **free_names) +{ + DYN_HEADER header; + char *nm; + uint i; + enum enum_dyncol_func_result rc; + + *free_names= 0; + bzero(names, sizeof(DYNAMIC_ARRAY)); /* In case of errors */ + bzero(vals, sizeof(DYNAMIC_ARRAY)); /* In case of errors */ + if (str->length == 0) + return ER_DYNCOL_OK; /* no columns */ + + if ((rc= init_read_hdr(&header, str)) < 0) + return rc; + + if (header.entry_size * header.column_count + FIXED_HEADER_SIZE > + str->length) + return ER_DYNCOL_FORMAT; + + if (init_dynamic_array(names, sizeof(LEX_STRING), + header.column_count, 0) || + init_dynamic_array(vals, sizeof(DYNAMIC_COLUMN_VALUE), + header.column_count, 0) || + (header.format == DYNCOL_FMT_NUM && + !(*free_names= (char *)malloc(DYNCOL_NUM_CHAR * header.column_count)))) + { + rc= ER_DYNCOL_RESOURCE; + goto err; + } + nm= *free_names; + + for (i= 0, header.entry= header.header; + i < header.column_count; + i++, header.entry+= header.entry_size) + { + DYNAMIC_COLUMN_VALUE val; + LEX_STRING name; + header.length= + hdr_interval_length(&header, header.entry + header.entry_size); + header.data= header.dtpool + header.offset; + /* + Check that the found data is withing the ranges. This can happen if + we get data with wrong offsets. + */ + if (header.length == DYNCOL_OFFSET_ERROR || + header.length > INT_MAX || header.offset > header.data_size) + { + rc= ER_DYNCOL_FORMAT; + goto err; + } + if ((rc= dynamic_column_get_value(&header, &val)) < 0) + goto err; + + if (header.format == DYNCOL_FMT_NUM) + { + uint num= uint2korr(header.entry); + name.str= nm; + name.length= snprintf(nm, DYNCOL_NUM_CHAR, "%u", num); + nm+= name.length + 1; + } + else + { + name.length= header.entry[0]; + name.str= (char *)header.nmpool + uint2korr(header.entry + 1); + } + /* following is preallocated and so do not fail */ + (void) insert_dynamic(names, (uchar *)&name); + (void) insert_dynamic(vals, (uchar *)&val); + } + return ER_DYNCOL_OK; + +err: + delete_dynamic(names); + delete_dynamic(vals); + if (*free_names) + my_free(*free_names); + *free_names= 0; + return rc; +} diff --git a/sql/sql_base.cc b/sql/sql_base.cc index acd330dd4d2..8f793f641ce 100644 --- a/sql/sql_base.cc +++ b/sql/sql_base.cc @@ -9773,6 +9773,7 @@ int dynamic_column_error_message(enum_dyncol_func_result rc) switch (rc) { case ER_DYNCOL_YES: case ER_DYNCOL_OK: + case ER_DYNCOL_TRUNCATED: break; // it is not an error case ER_DYNCOL_FORMAT: my_error(ER_DYN_COL_WRONG_FORMAT, MYF(0)); diff --git a/sql/sql_base.h b/sql/sql_base.h index 3deb97c9730..cb7abc972ea 100644 --- a/sql/sql_base.h +++ b/sql/sql_base.h @@ -272,6 +272,7 @@ bool rename_temporary_table(THD* thd, TABLE *table, const char *new_db, const char *table_name); bool is_equal(const LEX_STRING *a, const LEX_STRING *b); +class Open_tables_backup; /* Functions to work with system tables. 
*/ bool open_system_tables_for_read(THD *thd, TABLE_LIST *table_list, Open_tables_backup *backup); diff --git a/storage/cassandra/CMakeLists.txt b/storage/cassandra/CMakeLists.txt index 7986b0244bb..f11dfaf4a29 100644 --- a/storage/cassandra/CMakeLists.txt +++ b/storage/cassandra/CMakeLists.txt @@ -12,7 +12,7 @@ SET(cassandra_sources gen-cpp/Cassandra.h) #INCLUDE_DIRECTORIES(BEFORE ${Boost_INCLUDE_DIRS}) -INCLUDE_DIRECTORIES(AFTER /home/psergey/cassandra/thrift/include/thrift/) +INCLUDE_DIRECTORIES(AFTER /usr/local/include/thrift) # STRING(REPLACE "-fno-exceptions" "" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS}) STRING(REPLACE "-fno-implicit-templates" "" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS}) diff --git a/storage/cassandra/cassandra_se.cc b/storage/cassandra/cassandra_se.cc index 7d48bbc30d6..99a9a08b69d 100644 --- a/storage/cassandra/cassandra_se.cc +++ b/storage/cassandra/cassandra_se.cc @@ -17,6 +17,12 @@ #include "cassandra_se.h" +struct st_mysql_lex_string +{ + char *str; + size_t length; +}; + using namespace std; using namespace apache::thrift; using namespace apache::thrift::transport; @@ -74,6 +80,7 @@ class Cassandra_se_impl: public Cassandra_se_interface std::string rowkey; /* key of the record we're returning now */ SlicePredicate slice_pred; + SliceRange slice_pred_sr; bool get_slices_returned_less; bool get_slice_found_rows; public: @@ -91,6 +98,8 @@ public: void first_ddl_column(); bool next_ddl_column(char **name, int *name_len, char **value, int *value_len); void get_rowkey_type(char **name, char **type); + size_t get_ddl_size(); + const char* get_default_validator(); /* Settings */ void set_consistency_levels(ulong read_cons_level, ulong write_cons_level); @@ -98,15 +107,19 @@ public: /* Writes */ void clear_insert_buffer(); void start_row_insert(const char *key, int key_len); - void add_insert_column(const char *name, const char *value, int value_len); + void add_insert_column(const char *name, int name_len, + const char *value, int value_len); + void add_insert_delete_column(const char *name, int name_len); void add_row_deletion(const char *key, int key_len, - Column_name_enumerator *col_names); - + Column_name_enumerator *col_names, + LEX_STRING *names, uint nnames); + bool do_insert(); /* Reads, point lookups */ bool get_slice(char *key, size_t key_len, bool *found); - bool get_next_read_column(char **name, char **value, int *value_len); + bool get_next_read_column(char **name, int *name_len, + char **value, int *value_len ); void get_read_rowkey(char **value, int *value_len); /* Reads, multi-row scans */ @@ -122,6 +135,7 @@ public: /* Setup that's necessary before a multi-row read. 
(todo: use it before point lookups, too) */ void clear_read_columns(); + void clear_read_all_columns(); void add_read_column(const char *name); /* Reads, MRR scans */ @@ -277,6 +291,16 @@ void Cassandra_se_impl::get_rowkey_type(char **name, char **type) *name= NULL; } +size_t Cassandra_se_impl::get_ddl_size() +{ + return cf_def.column_metadata.size(); +} + +const char* Cassandra_se_impl::get_default_validator() +{ + return cf_def.default_validation_class.c_str(); +} + ///////////////////////////////////////////////////////////////////////////// // Data writes @@ -315,8 +339,9 @@ void Cassandra_se_impl::start_row_insert(const char *key, int key_len) } -void Cassandra_se_impl::add_row_deletion(const char *key, int key_len, - Column_name_enumerator *col_names) +void Cassandra_se_impl::add_row_deletion(const char *key, int key_len, + Column_name_enumerator *col_names, + LEX_STRING *names, uint nnames) { std::string key_to_delete; key_to_delete.assign(key, key_len); @@ -344,6 +369,9 @@ void Cassandra_se_impl::add_row_deletion(const char *key, int key_len, const char *col_name; while ((col_name= col_names->get_next_name())) slice_pred.column_names.push_back(std::string(col_name)); + for (uint i= 0; i < nnames; i++) + slice_pred.column_names.push_back(std::string(names[i].str, + names[i].length)); mut.deletion.predicate= slice_pred; @@ -351,7 +379,9 @@ void Cassandra_se_impl::add_row_deletion(const char *key, int key_len, } -void Cassandra_se_impl::add_insert_column(const char *name, const char *value, +void Cassandra_se_impl::add_insert_column(const char *name, + int name_len, + const char *value, int value_len) { Mutation mut; @@ -359,7 +389,10 @@ void Cassandra_se_impl::add_insert_column(const char *name, const char *value, mut.column_or_supercolumn.__isset.column= true; Column& col=mut.column_or_supercolumn.column; - col.name.assign(name); + if (name_len) + col.name.assign(name, name_len); + else + col.name.assign(name); col.value.assign(value, value_len); col.timestamp= insert_timestamp; col.__isset.value= true; @@ -367,6 +400,23 @@ void Cassandra_se_impl::add_insert_column(const char *name, const char *value, insert_list->push_back(mut); } +void Cassandra_se_impl::add_insert_delete_column(const char *name, + int name_len) +{ + Mutation mut; + mut.__isset.deletion= true; + mut.deletion.__isset.timestamp= true; + mut.deletion.timestamp= insert_timestamp; + mut.deletion.__isset.predicate= true; + + SlicePredicate slice_pred; + slice_pred.__isset.column_names= true; + slice_pred.column_names.push_back(std::string(name, name_len)); + mut.deletion.predicate= slice_pred; + + insert_list->push_back(mut); +} + bool Cassandra_se_impl::retryable_do_insert() { @@ -444,8 +494,8 @@ bool Cassandra_se_impl::retryable_get_slice() } -bool Cassandra_se_impl::get_next_read_column(char **name, char **value, - int *value_len) +bool Cassandra_se_impl::get_next_read_column(char **name, int *name_len, + char **value, int *value_len) { bool use_counter=false; while (1) @@ -468,12 +518,14 @@ bool Cassandra_se_impl::get_next_read_column(char **name, char **value, ColumnOrSuperColumn& cs= *column_data_it; if (use_counter) { + *name_len= cs.counter_column.name.size(); *name= (char*)cs.counter_column.name.c_str(); *value= (char*)&cs.counter_column.value; *value_len= sizeof(cs.counter_column.value); } else { + *name_len= cs.column.name.size(); *name= (char*)cs.column.name.c_str(); *value= (char*)cs.column.value.c_str(); *value_len= cs.column.value.length(); @@ -601,6 +653,13 @@ void Cassandra_se_impl::clear_read_columns() 
slice_pred.column_names.clear(); } +void Cassandra_se_impl::clear_read_all_columns() +{ + slice_pred_sr.start = ""; + slice_pred_sr.finish = ""; + slice_pred.__set_slice_range(slice_pred_sr); +} + void Cassandra_se_impl::add_read_column(const char *name_arg) { diff --git a/storage/cassandra/cassandra_se.h b/storage/cassandra/cassandra_se.h index 33c602d93a6..f74d8cbd909 100644 --- a/storage/cassandra/cassandra_se.h +++ b/storage/cassandra/cassandra_se.h @@ -6,6 +6,8 @@ both together causes compile errors due to conflicts). */ +struct st_mysql_lex_string; +typedef struct st_mysql_lex_string LEX_STRING; /* We need to define this here so that ha_cassandra.cc also has access to it */ typedef enum @@ -50,19 +52,25 @@ public: virtual bool next_ddl_column(char **name, int *name_len, char **value, int *value_len)=0; virtual void get_rowkey_type(char **name, char **type)=0; + virtual size_t get_ddl_size()=0; + virtual const char* get_default_validator()=0; /* Writes */ virtual void clear_insert_buffer()=0; virtual void add_row_deletion(const char *key, int key_len, - Column_name_enumerator *col_names)=0; + Column_name_enumerator *col_names, + LEX_STRING *names, uint nnames)=0; virtual void start_row_insert(const char *key, int key_len)=0; - virtual void add_insert_column(const char *name, const char *value, + virtual void add_insert_delete_column(const char *name, int name_len)= 0; + virtual void add_insert_column(const char *name, int name_len, + const char *value, int value_len)=0; virtual bool do_insert()=0; /* Reads */ virtual bool get_slice(char *key, size_t key_len, bool *found)=0 ; - virtual bool get_next_read_column(char **name, char **value, int *value_len)=0; + virtual bool get_next_read_column(char **name, int *name_len, + char **value, int *value_len)=0; virtual void get_read_rowkey(char **value, int *value_len)=0; /* Reads, multi-row scans */ @@ -70,7 +78,7 @@ public: virtual bool get_range_slices(bool last_key_as_start_key)=0; virtual void finish_reading_range_slices()=0; virtual bool get_next_range_slice_row(bool *eof)=0; - + /* Reads, MRR scans */ virtual void new_lookup_keys()=0; virtual int add_lookup_key(const char *key, size_t key_len)=0; @@ -79,8 +87,9 @@ public: /* read_set setup */ virtual void clear_read_columns()=0; + virtual void clear_read_all_columns()=0; virtual void add_read_column(const char *name)=0; - + virtual bool truncate()=0; virtual bool remove_row()=0; diff --git a/storage/cassandra/gen-cpp/cassandra_constants.cpp b/storage/cassandra/gen-cpp/cassandra_constants.cpp index 621d39027ad..49a01d2773e 100644 --- a/storage/cassandra/gen-cpp/cassandra_constants.cpp +++ b/storage/cassandra/gen-cpp/cassandra_constants.cpp @@ -11,7 +11,7 @@ namespace org { namespace apache { namespace cassandra { const cassandraConstants g_cassandra_constants; cassandraConstants::cassandraConstants() { - cassandra_const_VERSION = "19.32.0"; + cassandra_const_VERSION = (char *)"19.32.0"; } }}} // namespace diff --git a/storage/cassandra/ha_cassandra.cc b/storage/cassandra/ha_cassandra.cc index c4069458c41..8459e1abf7b 100644 --- a/storage/cassandra/ha_cassandra.cc +++ b/storage/cassandra/ha_cassandra.cc @@ -1,4 +1,4 @@ -/* +/* Copyright (c) 2012, Monty Program Ab This program is free software; you can redistribute it and/or modify @@ -22,15 +22,21 @@ #include "ha_cassandra.h" #include "sql_class.h" +#define DYNCOL_USUAL 20 +#define DYNCOL_DELTA 100 +#define DYNCOL_USUAL_REC 1024 +#define DYNCOL_DELTA_REC 1024 + static handler *cassandra_create_handler(handlerton *hton, - TABLE_SHARE 
*table, + TABLE_SHARE *table, MEM_ROOT *mem_root); +extern int dynamic_column_error_message(enum_dyncol_func_result rc); handlerton *cassandra_hton; -/* +/* Hash used to track the number of open tables; variable for example share methods */ @@ -69,6 +75,25 @@ ha_create_table_option cassandra_table_option_list[]= HA_TOPTION_END }; +/** + Structure for CREATE TABLE options (field options). +*/ + +struct ha_field_option_struct +{ + bool dyncol_field; +}; + +ha_create_table_option cassandra_field_option_list[]= +{ + /* + Collect all other columns as dynamic here, + the valid values are YES/NO, ON/OFF, 1/0. + The default is 0, that is true, yes, on. + */ + HA_FOPTION_BOOL("DYNAMIC_COLUMN_STORAGE", dyncol_field, 0), + HA_FOPTION_END +}; static MYSQL_THDVAR_ULONG(insert_batch_size, PLUGIN_VAR_RQCMDARG, "Number of rows in an INSERT batch", @@ -245,17 +270,16 @@ static int cassandra_init_func(void *p) cassandra_hton->state= SHOW_OPTION_YES; cassandra_hton->create= cassandra_create_handler; - /* + /* Don't specify HTON_CAN_RECREATE in flags. re-create is used by TRUNCATE TABLE to create an *empty* table from scratch. Cassandra table won't be emptied if re-created. */ - cassandra_hton->flags= 0; + cassandra_hton->flags= 0; cassandra_hton->table_options= cassandra_table_option_list; - //cassandra_hton->field_options= example_field_option_list; - cassandra_hton->field_options= NULL; - - mysql_mutex_init(0 /* no instrumentation */, + cassandra_hton->field_options= cassandra_field_option_list; + + mysql_mutex_init(0 /* no instrumentation */, &cassandra_default_host_lock, MY_MUTEX_INIT_FAST); DBUG_RETURN(0); @@ -352,7 +376,7 @@ static int free_share(CASSANDRA_SHARE *share) static handler* cassandra_create_handler(handlerton *hton, - TABLE_SHARE *table, + TABLE_SHARE *table, MEM_ROOT *mem_root) { return new (mem_root) ha_cassandra(hton, table); @@ -361,7 +385,11 @@ static handler* cassandra_create_handler(handlerton *hton, ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg) :handler(hton, table_arg), - se(NULL), field_converters(NULL), rowkey_converter(NULL) + se(NULL), field_converters(NULL), + special_type_field_converters(NULL), + special_type_field_names(NULL), n_special_type_fields(0), + rowkey_converter(NULL), + dyncol_field(0), dyncol_set(0) {} @@ -381,7 +409,8 @@ int ha_cassandra::connect_and_check_options(TABLE *table_arg) int res; DBUG_ENTER("ha_cassandra::connect_and_check_options"); - if ((res= check_table_options(options))) + if ((res= check_field_options(table_arg->s->field)) || + (res= check_table_options(options))) DBUG_RETURN(res); se= create_cassandra_se(); @@ -403,6 +432,32 @@ int ha_cassandra::connect_and_check_options(TABLE *table_arg) } +int ha_cassandra::check_field_options(Field **fields) +{ + Field **field; + uint i; + DBUG_ENTER("ha_cassandra::check_field_options"); + for (field= fields, i= 0; *field; field++, i++) + { + ha_field_option_struct *field_options= (*field)->option_struct; + if (field_options && field_options->dyncol_field) + { + if (dyncol_set || (*field)->type() != MYSQL_TYPE_BLOB) + { + my_error(ER_WRONG_FIELD_SPEC, MYF(0), (*field)->field_name); + DBUG_RETURN(HA_WRONG_CREATE_OPTION); + } + dyncol_set= 1; + dyncol_field= i; + bzero(&dynamic_values, sizeof(dynamic_values)); + bzero(&dynamic_names, sizeof(dynamic_names)); + bzero(&dynamic_rec, sizeof(dynamic_rec)); + } + } + DBUG_RETURN(0); +} + + int ha_cassandra::open(const char *name, int mode, uint test_if_locked) { DBUG_ENTER("ha_cassandra::open"); @@ -578,7 +633,7 @@ public: field->store(*pdata); 
return 0; } - + bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) { buf= field->val_real(); @@ -770,6 +825,43 @@ static int convert_hex_digit(const char c) const char map2number[]="0123456789abcdef"; +static void convert_uuid2string(char *str, const char *cass_data) +{ + char *ptr= str; + /* UUID arrives as 16-byte number in network byte order */ + for (uint i=0; i < 16; i++) + { + *(ptr++)= map2number[(cass_data[i] >> 4) & 0xF]; + *(ptr++)= map2number[cass_data[i] & 0xF]; + if (i == 3 || i == 5 || i == 7 || i == 9) + *(ptr++)= '-'; + } + *ptr= 0; +} + +static bool convert_string2uuid(char *buf, const char *str) +{ + int lower, upper; + for (uint i= 0; i < 16; i++) + { + if ((upper= convert_hex_digit(str[0])) == -1 || + (lower= convert_hex_digit(str[1])) == -1) + { + return true; + } + buf[i]= lower | (upper << 4); + str += 2; + if (i == 3 || i == 5 || i == 7 || i == 9) + { + if (str[0] != '-') + return true; + str++; + } + } + return false; +} + + class UuidDataConverter : public ColumnDataConverter { char buf[16]; /* Binary UUID representation */ @@ -779,16 +871,7 @@ public: { DBUG_ASSERT(cass_data_len==16); char str[37]; - char *ptr= str; - /* UUID arrives as 16-byte number in network byte order */ - for (uint i=0; i < 16; i++) - { - *(ptr++)= map2number[(cass_data[i] >> 4) & 0xF]; - *(ptr++)= map2number[cass_data[i] & 0xF]; - if (i == 3 || i == 5 || i == 7 || i == 9) - *(ptr++)= '-'; - } - *ptr= 0; + convert_uuid2string(str, cass_data); field->store(str, 36,field->charset()); return 0; } @@ -796,29 +879,12 @@ public: bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) { String *uuid_str= field->val_str(&str_buf); - char *pstr= (char*)uuid_str->c_ptr(); - if (uuid_str->length() != 36) + if (uuid_str->length() != 36) + return true; + + if (convert_string2uuid(buf, (char*)uuid_str->c_ptr())) return true; - - int lower, upper; - for (uint i=0; i < 16; i++) - { - if ((upper= convert_hex_digit(pstr[0])) == -1 || - (lower= convert_hex_digit(pstr[1])) == -1) - { - return true; - } - buf[i]= lower | (upper << 4); - pstr += 2; - if (i == 3 || i == 5 || i == 7 || i == 9) - { - if (pstr[0] != '-') - return true; - pstr++; - } - } - *cass_data= buf; *cass_data_len= 16; return false; @@ -826,6 +892,302 @@ public: ~UuidDataConverter(){} }; +/** + Converting dynamic columns types to/from casandra types +*/ +bool cassandra_to_dyncol_intLong(const char *cass_data, + int cass_data_len __attribute__((unused)), + DYNAMIC_COLUMN_VALUE *value) +{ + value->type= DYN_COL_INT; +#ifdef WORDS_BIGENDIAN + value->x.long_value= (longlong *)*cass_data; +#else + flip64(cass_data, (char *)&value->x.long_value); +#endif + return 0; +} + +bool dyncol_to_cassandraLong(DYNAMIC_COLUMN_VALUE *value, + char **cass_data, int *cass_data_len, + void* buff, void **freemem) +{ + longlong *tmp= (longlong *) buff; + enum enum_dyncol_func_result rc= + dynamic_column_val_long(tmp, value); + if (rc < 0) + return true; + *cass_data_len= sizeof(longlong); +#ifdef WORDS_BIGENDIAN + *cass_data= (char *)buff; +#else + flip64((char *)buff, (char *)buff + sizeof(longlong)); + *cass_data= (char *)buff + sizeof(longlong); +#endif + *freemem= NULL; + return false; +} + +bool cassandra_to_dyncol_intInt32(const char *cass_data, + int cass_data_len __attribute__((unused)), + DYNAMIC_COLUMN_VALUE *value) +{ + int32 tmp; + value->type= DYN_COL_INT; +#ifdef WORDS_BIGENDIAN + tmp= *((int32 *)cass_data); +#else + flip32(cass_data, (char *)&tmp); +#endif + value->x.long_value= tmp; + return 0; +} + + +bool 
dyncol_to_cassandraInt32(DYNAMIC_COLUMN_VALUE *value, + char **cass_data, int *cass_data_len, + void* buff, void **freemem) +{ + longlong *tmp= (longlong *) ((char *)buff + sizeof(longlong)); + enum enum_dyncol_func_result rc= + dynamic_column_val_long(tmp, value); + if (rc < 0) + return true; + *cass_data_len= sizeof(int32); + *cass_data= (char *)buff; +#ifdef WORDS_BIGENDIAN + *((int32 *) buff) = (int32) *tmp; +#else + { + int32 tmp2= (int32) *tmp; + flip32((char *)&tmp2, (char *)buff); + } +#endif + *freemem= NULL; + return false; +} + + +bool cassandra_to_dyncol_intCounter(const char *cass_data, + int cass_data_len __attribute__((unused)), + DYNAMIC_COLUMN_VALUE *value) +{ + value->type= DYN_COL_INT; + value->x.long_value= *((longlong *)cass_data); + return 0; +} + + +bool dyncol_to_cassandraCounter(DYNAMIC_COLUMN_VALUE *value, + char **cass_data, int *cass_data_len, + void* buff, void **freemem) +{ + longlong *tmp= (longlong *)buff; + enum enum_dyncol_func_result rc= + dynamic_column_val_long(tmp, value); + if (rc < 0) + return true; + *cass_data_len= sizeof(longlong); + *cass_data= (char *)buff; + *freemem= NULL; + return false; +} + +bool cassandra_to_dyncol_doubleFloat(const char *cass_data, + int cass_data_len __attribute__((unused)), + DYNAMIC_COLUMN_VALUE *value) +{ + value->type= DYN_COL_DOUBLE; + value->x.double_value= *((float *)cass_data); + return 0; +} + +bool dyncol_to_cassandraFloat(DYNAMIC_COLUMN_VALUE *value, + char **cass_data, int *cass_data_len, + void* buff, void **freemem) +{ + double tmp; + enum enum_dyncol_func_result rc= + dynamic_column_val_double(&tmp, value); + if (rc < 0) + return true; + *((float *)buff)= (float) tmp; + *cass_data_len= sizeof(float); + *cass_data= (char *)buff; + *freemem= NULL; + return false; +} + +bool cassandra_to_dyncol_doubleDouble(const char *cass_data, + int cass_data_len __attribute__((unused)), + DYNAMIC_COLUMN_VALUE *value) +{ + value->type= DYN_COL_DOUBLE; + value->x.double_value= *((double *)cass_data); + return 0; +} + +bool dyncol_to_cassandraDouble(DYNAMIC_COLUMN_VALUE *value, + char **cass_data, int *cass_data_len, + void* buff, void **freemem) +{ + double *tmp= (double *)buff; + enum enum_dyncol_func_result rc= + dynamic_column_val_double(tmp, value); + if (rc < 0) + return true; + *cass_data_len= sizeof(double); + *cass_data= (char *)buff; + *freemem= NULL; + return false; +} + +bool cassandra_to_dyncol_strStr(const char *cass_data, + int cass_data_len, + DYNAMIC_COLUMN_VALUE *value, + CHARSET_INFO *cs) +{ + value->type= DYN_COL_STRING; + value->x.string.charset= cs; + value->x.string.value.str= (char *)cass_data; + value->x.string.value.length= cass_data_len; + value->x.string.nonfreeable= TRUE; // do not try to free + return 0; +} + +bool dyncol_to_cassandraStr(DYNAMIC_COLUMN_VALUE *value, + char **cass_data, int *cass_data_len, + void* buff, void **freemem, CHARSET_INFO *cs) +{ + DYNAMIC_STRING tmp; + if (init_dynamic_string(&tmp, NULL, 1024, 1024)) + return 1; + enum enum_dyncol_func_result rc= + dynamic_column_val_str(&tmp, value, cs, FALSE); + if (rc < 0) + { + dynstr_free(&tmp); + return 1; + } + *cass_data_len= tmp.length; + *(cass_data)= tmp.str; + *freemem= tmp.str; + return 0; +} + +bool cassandra_to_dyncol_strBytes(const char *cass_data, + int cass_data_len, + DYNAMIC_COLUMN_VALUE *value) +{ + return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value, + &my_charset_bin); +} + +bool dyncol_to_cassandraBytes(DYNAMIC_COLUMN_VALUE *value, + char **cass_data, int *cass_data_len, + void* buff, void 
**freemem) +{ + return dyncol_to_cassandraStr(value, cass_data, cass_data_len, + buff, freemem, &my_charset_bin); +} + +bool cassandra_to_dyncol_strAscii(const char *cass_data, + int cass_data_len, + DYNAMIC_COLUMN_VALUE *value) +{ + return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value, + &my_charset_latin1_bin); +} + +bool dyncol_to_cassandraAscii(DYNAMIC_COLUMN_VALUE *value, + char **cass_data, int *cass_data_len, + void* buff, void **freemem) +{ + return dyncol_to_cassandraStr(value, cass_data, cass_data_len, + buff, freemem, &my_charset_latin1_bin); +} + +bool cassandra_to_dyncol_strUTF8(const char *cass_data, + int cass_data_len, + DYNAMIC_COLUMN_VALUE *value) +{ + return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value, + &my_charset_utf8_unicode_ci); +} + +bool dyncol_to_cassandraUTF8(DYNAMIC_COLUMN_VALUE *value, + char **cass_data, int *cass_data_len, + void* buff, void **freemem) +{ + return dyncol_to_cassandraStr(value, cass_data, cass_data_len, + buff, freemem, &my_charset_utf8_unicode_ci); +} + +bool cassandra_to_dyncol_strUUID(const char *cass_data, + int cass_data_len, + DYNAMIC_COLUMN_VALUE *value) +{ + value->type= DYN_COL_STRING; + value->x.string.charset= &my_charset_bin; + value->x.string.value.str= (char *)my_malloc(37, MYF(0)); + if (!value->x.string.value.str) + { + value->x.string.value.length= 0; + value->x.string.nonfreeable= TRUE; + return 1; + } + convert_uuid2string(value->x.string.value.str, cass_data); + value->x.string.value.length= 36; + value->x.string.nonfreeable= FALSE; + return 0; +} + +bool dyncol_to_cassandraUUID(DYNAMIC_COLUMN_VALUE *value, + char **cass_data, int *cass_data_len, + void* buff, void **freemem) +{ + DYNAMIC_STRING tmp; + if (init_dynamic_string(&tmp, NULL, 1024, 1024)) + return true; + enum enum_dyncol_func_result rc= + dynamic_column_val_str(&tmp, value, &my_charset_latin1_bin, FALSE); + if (rc < 0 || tmp.length != 36 || convert_string2uuid((char *)buff, tmp.str)) + { + dynstr_free(&tmp); + return true; + } + + *cass_data_len= tmp.length; + *(cass_data)= tmp.str; + *freemem= tmp.str; + return 0; +} + +bool cassandra_to_dyncol_intBool(const char *cass_data, + int cass_data_len, + DYNAMIC_COLUMN_VALUE *value) +{ + value->type= DYN_COL_INT; + value->x.long_value= (cass_data[0] ? 1 : 0); + return 0; +} + +bool dyncol_to_cassandraBool(DYNAMIC_COLUMN_VALUE *value, + char **cass_data, int *cass_data_len, + void* buff, void **freemem) +{ + longlong tmp; + enum enum_dyncol_func_result rc= + dynamic_column_val_long(&tmp, value); + if (rc < 0) + return true; + ((char *)buff)[0]= (tmp ? 
1 : 0); + *cass_data_len= 1; + *(cass_data)= (char *)buff; + *freemem= 0; + return 0; +} + const char * const validator_bigint= "org.apache.cassandra.db.marshal.LongType"; const char * const validator_int= "org.apache.cassandra.db.marshal.Int32Type"; @@ -849,6 +1211,126 @@ const char * const validator_varint= "org.apache.cassandra.db.marshal.IntegerTyp const char * const validator_decimal= "org.apache.cassandra.db.marshal.DecimalType"; +static CASSANDRA_TYPE_DEF cassandra_types[]= +{ + { + validator_bigint, + &cassandra_to_dyncol_intLong, + &dyncol_to_cassandraLong + }, + { + validator_int, + &cassandra_to_dyncol_intInt32, + &dyncol_to_cassandraInt32 + }, + { + validator_counter, + cassandra_to_dyncol_intCounter, + &dyncol_to_cassandraCounter + }, + { + validator_float, + &cassandra_to_dyncol_doubleFloat, + &dyncol_to_cassandraFloat + }, + { + validator_double, + &cassandra_to_dyncol_doubleDouble, + &dyncol_to_cassandraDouble + }, + { + validator_blob, + &cassandra_to_dyncol_strBytes, + &dyncol_to_cassandraBytes + }, + { + validator_ascii, + &cassandra_to_dyncol_strAscii, + &dyncol_to_cassandraAscii + }, + { + validator_text, + &cassandra_to_dyncol_strUTF8, + &dyncol_to_cassandraUTF8 + }, + { + validator_timestamp, + &cassandra_to_dyncol_intLong, + &dyncol_to_cassandraLong + }, + { + validator_uuid, + &cassandra_to_dyncol_strUUID, + &dyncol_to_cassandraUUID + }, + { + validator_boolean, + &cassandra_to_dyncol_intBool, + &dyncol_to_cassandraBool + }, + { + validator_varint, + &cassandra_to_dyncol_strBytes, + &dyncol_to_cassandraBytes + }, + { + validator_decimal, + &cassandra_to_dyncol_strBytes, + &dyncol_to_cassandraBytes + } +}; + +CASSANDRA_TYPE get_cassandra_type(const char *validator) +{ + CASSANDRA_TYPE rc; + switch(validator[32]) + { + case 'L': + rc= CT_BIGINT; + break; + case 'I': + rc= (validator[35] == '3' ? CT_INT : CT_VARINT); + rc= CT_INT; + break; + case 'C': + rc= CT_COUNTER; + break; + case 'F': + rc= CT_FLOAT; + break; + case 'D': + switch (validator[33]) + { + case 'o': + rc= CT_DOUBLE; + break; + case 'a': + rc= CT_TIMESTAMP; + break; + case 'e': + rc= CT_DECIMAL; + break; + default: + rc= CT_BLOB; + break; + } + break; + case 'B': + rc= (validator[33] == 'o' ? CT_BOOLEAN : CT_BLOB); + break; + case 'A': + rc= CT_ASCII; + break; + case 'U': + rc= (validator[33] == 'T' ? CT_TEXT : CT_UUID); + break; + default: + rc= CT_BLOB; + } + DBUG_ASSERT(strcmp(cassandra_types[rc].name, validator) == 0); + return rc; +} + ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name) { ColumnDataConverter *res= NULL; @@ -880,16 +1362,16 @@ ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_ if (!strcmp(validator_name, validator_double)) res= new DoubleDataConverter; break; - + case MYSQL_TYPE_TIMESTAMP: if (!strcmp(validator_name, validator_timestamp)) res= new TimestampDataConverter; break; case MYSQL_TYPE_STRING: // these are space padded CHAR(n) strings. 
 ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name)
 {
   ColumnDataConverter *res= NULL;
@@ -880,16 +1362,16 @@ ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_
     if (!strcmp(validator_name, validator_double))
       res= new DoubleDataConverter;
     break;
-
+
   case MYSQL_TYPE_TIMESTAMP:
     if (!strcmp(validator_name, validator_timestamp))
      res= new TimestampDataConverter;
    break;
   case MYSQL_TYPE_STRING: // these are space padded CHAR(n) strings.
-    if (!strcmp(validator_name, validator_uuid) &&
+    if (!strcmp(validator_name, validator_uuid) &&
        field->real_type() == MYSQL_TYPE_STRING &&
-        field->field_length == 36)
+        field->field_length == 36)
    {
      // UUID maps to CHAR(36), its text representation
      res= new UuidDataConverter;
@@ -943,39 +1425,117 @@ bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
   int col_name_len;
   char *col_type;
   int col_type_len;
+  size_t ddl_fields= se->get_ddl_size();
+  const char *default_type= se->get_default_validator();
+  uint max_non_default_fields;
+  DBUG_ENTER("ha_cassandra::setup_field_converters");
+  DBUG_ASSERT(default_type);
   DBUG_ASSERT(!field_converters);
-  size_t memsize= sizeof(ColumnDataConverter*) * n_fields;
+  DBUG_ASSERT(dyncol_set == 0 || dyncol_set == 1);
+
+  /*
+    Note that when dynamic columns are used, the SQL table description
+    contains one extra field that is not present in the Cassandra DDL,
+    and the key field is described separately. That is why we use
+    "n_fields - dyncol_set - 1" and "ddl_fields + 2" below.
+  */
+  max_non_default_fields= ddl_fields + 2 - n_fields;
+  if (ddl_fields < (n_fields - dyncol_set - 1))
+  {
+    se->print_error("Some SQL fields cannot be mapped to Cassandra's fields");
+    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
+    DBUG_RETURN(true);
+  }
+
+  /* allocate memory in one chunk */
+  size_t memsize= sizeof(ColumnDataConverter*) * n_fields +
+    (sizeof(LEX_STRING) + sizeof(CASSANDRA_TYPE_DEF))*
+    (dyncol_set ? max_non_default_fields : 0);
   if (!(field_converters= (ColumnDataConverter**)my_malloc(memsize, MYF(0))))
-    return true;
+    DBUG_RETURN(true);
   bzero(field_converters, memsize);
   n_field_converters= n_fields;
+  if (dyncol_set)
+  {
+    special_type_field_converters=
+      (CASSANDRA_TYPE_DEF *)(field_converters + n_fields);
+    special_type_field_names=
+      ((LEX_STRING*)(special_type_field_converters + max_non_default_fields));
+  }
+
+  if (dyncol_set)
+  {
+    if (init_dynamic_array(&dynamic_values,
+                           sizeof(DYNAMIC_COLUMN_VALUE),
+                           DYNCOL_USUAL, DYNCOL_DELTA))
+      DBUG_RETURN(true);
+    else
+      if (init_dynamic_array(&dynamic_names,
+                             sizeof(LEX_STRING),
+                             DYNCOL_USUAL, DYNCOL_DELTA))
+      {
+        delete_dynamic(&dynamic_values);
+        DBUG_RETURN(true);
+      }
+      else
+        if (init_dynamic_string(&dynamic_rec, NULL,
+                                DYNCOL_USUAL_REC, DYNCOL_DELTA_REC))
+        {
+          delete_dynamic(&dynamic_values);
+          delete_dynamic(&dynamic_names);
+          DBUG_RETURN(true);
+        }
+
+    /* Dynamic column field has special processing */
+    field_converters[dyncol_field]= NULL;
+
+    default_type_def= cassandra_types + get_cassandra_type(default_type);
+  }
+
   se->first_ddl_column();
   uint n_mapped= 0;
   while (!se->next_ddl_column(&col_name, &col_name_len, &col_type, &col_type_len))
   {
+    Field **field;
+    uint i;
     /* Mapping for the 1st field is already known */
-    for (Field **field= field_arg + 1; *field; field++)
+    for (field= field_arg + 1, i= 1; *field; field++, i++)
     {
-      if (!strcmp((*field)->field_name, col_name))
+      if ((!dyncol_set || dyncol_field != i) &&
+          !strcmp((*field)->field_name, col_name))
      {
        n_mapped++;
        ColumnDataConverter **conv= field_converters + (*field)->field_index;
        if (!(*conv= map_field_to_validator(*field, col_type)))
        {
-          se->print_error("Failed to map column %s to datatype %s",
+          se->print_error("Failed to map column %s to datatype %s",
                          (*field)->field_name, col_type);
          my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
-          return true;
+          DBUG_RETURN(true);
        }
        (*conv)->field= *field;
      }
    }
+    if (dyncol_set && !(*field)) // needed but not found among the SQL fields
+    {
+      DBUG_PRINT("info",("Field not found: %s", col_name));
not found: %s", col_name)); + if (strcmp(col_type, default_type)) + { + DBUG_PRINT("info",("Field '%s' non-default type: '%s'", + col_name, col_type)); + special_type_field_names[n_special_type_fields].length= col_name_len; + special_type_field_names[n_special_type_fields].str= col_name; + special_type_field_converters[n_special_type_fields]= + cassandra_types[get_cassandra_type(col_type)]; + n_special_type_fields++; + } + } } - if (n_mapped != n_fields - 1) + if (n_mapped != n_fields - 1 - dyncol_set) { Field *first_unmapped= NULL; /* Find the first field */ @@ -990,27 +1550,28 @@ bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields) DBUG_ASSERT(first_unmapped); se->print_error("Field `%s` could not be mapped to any field in Cassandra", - first_unmapped->field_name); + first_unmapped->field_name); my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); - return true; + DBUG_RETURN(true); } - - /* + + /* Setup type conversion for row_key. */ se->get_rowkey_type(&col_name, &col_type); if (col_name && strcmp(col_name, (*field_arg)->field_name)) { - se->print_error("PRIMARY KEY column must match Cassandra's name '%s'", col_name); + se->print_error("PRIMARY KEY column must match Cassandra's name '%s'", + col_name); my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); - return true; + DBUG_RETURN(true); } if (!col_name && strcmp("rowkey", (*field_arg)->field_name)) { se->print_error("target column family has no key_alias defined, " "PRIMARY KEY column must be named 'rowkey'"); my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); - return true; + DBUG_RETURN(true); } if (col_type != NULL) @@ -1019,7 +1580,7 @@ bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields) { se->print_error("Failed to map PRIMARY KEY to datatype %s", col_type); my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); - return true; + DBUG_RETURN(true); } rowkey_converter->field= *field_arg; } @@ -1027,10 +1588,10 @@ bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields) { se->print_error("Cassandra's rowkey has no defined datatype (todo: support this)"); my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); - return true; + DBUG_RETURN(true); } - return false; + DBUG_RETURN(false); } @@ -1039,10 +1600,20 @@ void ha_cassandra::free_field_converters() delete rowkey_converter; rowkey_converter= NULL; + if (dyncol_set) + { + delete_dynamic(&dynamic_values); + delete_dynamic(&dynamic_names); + dynstr_free(&dynamic_rec); + } if (field_converters) { for (uint i=0; i < n_field_converters; i++) - delete field_converters[i]; + if (field_converters[i]) + { + DBUG_ASSERT(!dyncol_set || i == dyncol_field); + delete field_converters[i]; + } my_free(field_converters); field_converters= NULL; } @@ -1065,7 +1636,7 @@ int ha_cassandra::index_read_map(uchar *buf, const uchar *key, { int rc= 0; DBUG_ENTER("ha_cassandra::index_read_map"); - + if (find_flag != HA_READ_KEY_EXACT) DBUG_RETURN(HA_ERR_WRONG_COMMAND); @@ -1081,6 +1652,7 @@ int ha_cassandra::index_read_map(uchar *buf, const uchar *key, if (rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len)) { /* We get here when making lookups like uuid_column='not-an-uuid' */ + dbug_tmp_restore_column_map(table->read_set, old_map); DBUG_RETURN(HA_ERR_KEY_NOT_FOUND); } @@ -1092,7 +1664,7 @@ int ha_cassandra::index_read_map(uchar *buf, const uchar *key, my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); rc= HA_ERR_INTERNAL_ERROR; } - + /* TODO: what if we're not reading all columns?? 
   */
   if (!found)
     rc= HA_ERR_KEY_NOT_FOUND;
@@ -1125,15 +1697,42 @@ void ha_cassandra::print_conversion_error(const char *field_name,
 }
+void free_strings(DYNAMIC_COLUMN_VALUE *vals, uint num)
+{
+  for (uint i= 0; i < num; i++)
+    if (vals[i].type == DYN_COL_STRING &&
+        !vals[i].x.string.nonfreeable)
+      my_free(vals[i].x.string.value.str);
+}
+
+
+CASSANDRA_TYPE_DEF * ha_cassandra::get_cassandra_field_def(char *cass_name,
                                                            int cass_name_len)
+{
+  CASSANDRA_TYPE_DEF *type= default_type_def;
+  for (uint i= 0; i < n_special_type_fields; i++)
+  {
+    if (cass_name_len == (int)special_type_field_names[i].length &&
+        memcmp(cass_name, special_type_field_names[i].str,
+               cass_name_len) == 0)
+    {
+      type= special_type_field_converters + i;
+      break;
+    }
+  }
+  return type;
+}
+
 int ha_cassandra::read_cassandra_columns(bool unpack_pk)
 {
   char *cass_name;
   char *cass_value;
-  int cass_value_len;
+  int cass_value_len, cass_name_len;
   Field **field;
   int res= 0;
-
-  /*
+  ulong total_name_len= 0;
+
+  /*
     cassandra_to_mariadb() calls will use field->store(...) methods, which
     require that the column is in the table->write_set
   */
@@ -1144,16 +1743,18 @@ int ha_cassandra::read_cassandra_columns(bool unpack_pk)
   for (field= table->field + 1; *field; field++)
     (*field)->set_null();
-  while (!se->get_next_read_column(&cass_name, &cass_value, &cass_value_len))
+  while (!se->get_next_read_column(&cass_name, &cass_name_len,
+                                   &cass_value, &cass_value_len))
   {
     // map to our column. todo: use hash or something..
-    int idx=1;
+    bool found= 0;
     for (field= table->field + 1; *field; field++)
     {
-      idx++;
-      if (!strcmp((*field)->field_name, cass_name))
+      uint fieldnr= (*field)->field_index;
+      if ((!dyncol_set || dyncol_field != fieldnr) &&
+          !strcmp((*field)->field_name, cass_name))
       {
-        int fieldnr= (*field)->field_index;
+        found= 1;
        (*field)->set_notnull();
        if (field_converters[fieldnr]->cassandra_to_mariadb(cass_value,
                                                            cass_value_len))
@@ -1166,8 +1767,86 @@ int ha_cassandra::read_cassandra_columns(bool unpack_pk)
        break;
      }
    }
+    if (dyncol_set && !found)
+    {
+      DYNAMIC_COLUMN_VALUE val;
+      LEX_STRING nm;
+      CASSANDRA_TYPE_DEF *type= get_cassandra_field_def(cass_name,
+                                                        cass_name_len);
+      nm.str= cass_name;
+      nm.length= cass_name_len;
+      if (nm.length > MAX_NAME_LENGTH)
+      {
+        se->print_error("Unable to convert value for field `%s`"
+                        " from Cassandra's data format. Name"
+                        " length exceeds the limit of %u: '%s'",
+                        table->field[dyncol_field]->field_name,
+                        (uint)MAX_NAME_LENGTH, cass_name);
+        my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
+        res=1;
+        goto err;
+      }
+      total_name_len+= cass_name_len;
+      if (total_name_len > MAX_TOTAL_NAME_LENGTH)
+      {
+        se->print_error("Unable to convert value for field `%s`"
Sum of all names" + " length exceed limit of %lu", + table->field[dyncol_field]->field_name, + cass_name, (uint)MAX_TOTAL_NAME_LENGTH); + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + res=1; + goto err; + } + + if ((res= (*(type->cassandra_to_dynamic))(cass_value, + cass_value_len, &val)) || + insert_dynamic(&dynamic_names, (uchar *) &nm) || + insert_dynamic(&dynamic_values, (uchar *) &val)) + { + if (res) + { + print_conversion_error(cass_name, cass_value, cass_value_len); + } + free_strings((DYNAMIC_COLUMN_VALUE *)dynamic_values.buffer, + dynamic_values.elements); + // EOM shouldm be already reported if happened + res=1; + goto err; + } + } } - + + dynamic_rec.length= 0; + if (dyncol_set) + { + if (dynamic_column_create_many_internal_fmt(&dynamic_rec, + dynamic_names.elements, + dynamic_names.buffer, + (DYNAMIC_COLUMN_VALUE *) + dynamic_values.buffer, + FALSE, + TRUE) < 0) + dynamic_rec.length= 0; + + free_strings((DYNAMIC_COLUMN_VALUE *)dynamic_values.buffer, + dynamic_values.elements); + dynamic_values.elements= dynamic_names.elements= 0; + } + if (dyncol_set) + { + if (dynamic_rec.length == 0) + table->field[dyncol_field]->set_null(); + else + { + Field_blob *blob= (Field_blob *)table->field[dyncol_field]; + blob->set_notnull(); + blob->store_length(dynamic_rec.length); + *((char **)(((char *)blob->ptr) + blob->pack_length_no_ptr()))= + dynamic_rec.str; + } + } + if (unpack_pk) { /* Unpack rowkey to primary key */ @@ -1187,6 +1866,82 @@ err: return res; } +int ha_cassandra::read_dyncol(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names, + String *valcol, char **freenames) +{ + String *strcol; + DYNAMIC_COLUMN col; + enum enum_dyncol_func_result rc; + DBUG_ENTER("ha_cassandra::read_dyncol"); + + Field *field= table->field[dyncol_field]; + DBUG_ASSERT(field->type() == MYSQL_TYPE_BLOB); + /* It is blob and it does not use buffer */ + strcol= field->val_str(NULL, valcol); + if (field->is_null()) + { + bzero(vals, sizeof(DYNAMIC_ARRAY)); + bzero(names, sizeof(DYNAMIC_ARRAY)); + DBUG_RETURN(0); // nothing to write + } + /* + dynamic_column_vals only read the string so we can + cheat here with assignment + */ + bzero(&col, sizeof(col)); + col.str= (char *)strcol->ptr(); + col.length= strcol->length(); + if ((rc= dynamic_column_vals(&col, names, vals, freenames)) < 0) + { + dynamic_column_error_message(rc); + DBUG_RETURN(HA_ERR_INTERNAL_ERROR); + } + DBUG_RETURN(0); +} + +int ha_cassandra::write_dynamic_row(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names) +{ + uint i; + DBUG_ENTER("ha_cassandra::write_dynamic_row"); + DBUG_ASSERT(dyncol_set); + + + DBUG_ASSERT(names->elements == vals->elements); + for (i= 0; i < names->elements; i++) + { + char buff[16]; + CASSANDRA_TYPE_DEF *type; + void *freemem= NULL; + char *cass_data; + int cass_data_len; + LEX_STRING *name= dynamic_element(names, i, LEX_STRING*); + DYNAMIC_COLUMN_VALUE *val= dynamic_element(vals, i, DYNAMIC_COLUMN_VALUE*); + + DBUG_PRINT("info", ("field %*s", (int)name->length, name->str)); + type= get_cassandra_field_def(name->str, (int) name->length); + if ((*type->dynamic_to_cassandra)(val, &cass_data, &cass_data_len, + buff, &freemem)) + { + my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0), + name->str, insert_lineno); + DBUG_RETURN(HA_ERR_AUTOINC_ERANGE); + } + se->add_insert_column(name->str, name->length, + cass_data, cass_data_len); + if (freemem) + my_free(freemem); + } + DBUG_RETURN(0); +} + +void ha_cassandra::free_dynamic_row(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names, + char *free_names) +{ + delete_dynamic(names); + 
+void ha_cassandra::free_dynamic_row(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names,
+                                    char *free_names)
+{
+  delete_dynamic(names);
+  delete_dynamic(vals);
+  if (free_names)
+    my_free(free_names);
+}
 int ha_cassandra::write_row(uchar *buf)
 {
@@ -1221,15 +1976,35 @@ int ha_cassandra::write_row(uchar *buf)
   {
     char *cass_data;
     int cass_data_len;
-    if (field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len))
+    if (dyncol_set && dyncol_field == i)
     {
-      my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
-               field_converters[i]->field->field_name, insert_lineno);
-      dbug_tmp_restore_column_map(table->read_set, old_map);
-      DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
+      String valcol;
+      DYNAMIC_ARRAY vals, names;
+      char *free_names= NULL;
+      int rc;
+      DBUG_ASSERT(field_converters[i] == NULL);
+      if (!(rc= read_dyncol(&vals, &names, &valcol, &free_names)))
+        rc= write_dynamic_row(&vals, &names);
+      free_dynamic_row(&vals, &names, free_names);
+      if (rc)
+      {
+        dbug_tmp_restore_column_map(table->read_set, old_map);
+        DBUG_RETURN(rc);
+      }
+    }
+    else
+    {
+      if (field_converters[i]->mariadb_to_cassandra(&cass_data,
+                                                    &cass_data_len))
+      {
+        my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
+                 field_converters[i]->field->field_name, insert_lineno);
+        dbug_tmp_restore_column_map(table->read_set, old_map);
+        DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
+      }
+      se->add_insert_column(field_converters[i]->field->field_name, 0,
+                            cass_data, cass_data_len);
     }
-    se->add_insert_column(field_converters[i]->field->field_name,
-                          cass_data, cass_data_len);
   }
   dbug_tmp_restore_column_map(table->read_set, old_map);
@@ -1296,9 +2071,16 @@ int ha_cassandra::rnd_init(bool scan)
     DBUG_RETURN(0);
   }
-  se->clear_read_columns();
-  for (uint i= 1; i < table->s->fields; i++)
-    se->add_read_column(table->field[i]->field_name);
+  if (dyncol_set)
+  {
+    se->clear_read_all_columns();
+  }
+  else
+  {
+    se->clear_read_columns();
+    for (uint i= 1; i < table->s->fields; i++)
+      se->add_read_column(table->field[i]->field_name);
+  }
   se->read_batch_size= THDVAR(table->in_use, rnd_batch_size);
   bres= se->get_range_slices(false);
@@ -1633,13 +2415,16 @@ public:
 int ha_cassandra::update_row(const uchar *old_data, uchar *new_data)
 {
+  DYNAMIC_ARRAY oldvals, oldnames, vals, names;
+  String oldvalcol, valcol;
+  char *oldfree_names= NULL, *free_names= NULL;
   my_bitmap_map *old_map;
+  int res;
   DBUG_ENTER("ha_cassandra::update_row");
   /* Currently, it is guaranteed that new_data == table->record[0] */
-
+  DBUG_ASSERT(new_data == table->record[0]);
   /* For now, just rewrite the full record */
   se->clear_insert_buffer();
-
   old_map= dbug_tmp_use_all_columns(table, table->read_set);
@@ -1668,6 +2453,22 @@ int ha_cassandra::update_row(const uchar *old_data, uchar *new_data)
   else
     new_primary_key= false;
+  if (dyncol_set)
+  {
+    Field *field= table->field[dyncol_field];
+    /* move the field to point at old_data */
+    my_ptrdiff_t diff;
+    diff= (my_ptrdiff_t) (old_data - new_data);
+    field->move_field_offset(diff);      // Points now at old_data
+    if ((res= read_dyncol(&oldvals, &oldnames, &oldvalcol, &oldfree_names)))
+      DBUG_RETURN(res);
+    field->move_field_offset(-diff);     // back to new_data
+    if ((res= read_dyncol(&vals, &names, &valcol, &free_names)))
+    {
+      free_dynamic_row(&oldnames, &oldvals, oldfree_names);
+      DBUG_RETURN(res);
+    }
+  }
   if (new_primary_key)
   {
@@ -1676,7 +2477,10 @@ int ha_cassandra::update_row(const uchar *old_data, uchar *new_data)
       Add a DELETE operation into the batch
     */
     Column_name_enumerator_impl name_enumerator(this);
-    se->add_row_deletion(old_key, old_key_len, &name_enumerator);
+    se->add_row_deletion(old_key, old_key_len, &name_enumerator,
+                         (LEX_STRING *)oldnames.buffer,
+                         (dyncol_set ? oldnames.elements : 0));
+    oldnames.elements= oldvals.elements= 0; // the row deletion above already removed them
   }
   se->start_row_insert(new_key, new_key_len);
@@ -1686,23 +2490,64 @@ int ha_cassandra::update_row(const uchar *old_data, uchar *new_data)
   {
     char *cass_data;
     int cass_data_len;
-    if (field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len))
+    if (dyncol_set && dyncol_field == i)
     {
-      my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
-               field_converters[i]->field->field_name, insert_lineno);
-      dbug_tmp_restore_column_map(table->read_set, old_map);
-      DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
+      DBUG_ASSERT(field_converters[i] == NULL);
+      if ((res= write_dynamic_row(&vals, &names)))
+        goto err;
+    }
+    else
+    {
+      if (field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len))
+      {
+        my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
+                 field_converters[i]->field->field_name, insert_lineno);
+        dbug_tmp_restore_column_map(table->read_set, old_map);
+        DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
+      }
+      se->add_insert_column(field_converters[i]->field->field_name, 0,
+                            cass_data, cass_data_len);
+    }
+  }
+  if (dyncol_set)
+  {
+    /* find removed fields */
+    uint i= 0, j= 0;
+    LEX_STRING *onames= (LEX_STRING *)oldnames.buffer;
+    LEX_STRING *nnames= (LEX_STRING *)names.buffer;
+    /* both arrays are sorted */
+    for (; i < oldnames.elements; i++)
+    {
+      int scmp= 0;
+      while (j < names.elements &&
+             (nnames[j].length < onames[i].length ||
+              (nnames[j].length == onames[i].length &&
+               (scmp= memcmp(nnames[j].str, onames[i].str,
+                             onames[i].length)) < 0)))
+        j++;
+      if (j < names.elements &&
+          nnames[j].length == onames[i].length &&
+          scmp == 0)
+        j++;
+      else
+        se->add_insert_delete_column(onames[i].str, onames[i].length);
     }
-    se->add_insert_column(field_converters[i]->field->field_name,
-                          cass_data, cass_data_len);
   }
+
   dbug_tmp_restore_column_map(table->read_set, old_map);
-
-  bool res= se->do_insert();
+
+  res= se->do_insert();
   if (res)
     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
-
+
+err:
+  if (dyncol_set)
+  {
+    free_dynamic_row(&oldnames, &oldvals, oldfree_names);
+    free_dynamic_row(&names, &vals, free_names);
+  }
+  DBUG_RETURN(res ? HA_ERR_INTERNAL_ERROR : 0);
 }
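
The "find removed fields" loop in update_row() is a linear merge over the old and new dynamic column name lists; the comparison implies that both lists are ordered by name length first and then by byte contents, and every old name without an equal counterpart among the new names gets an add_insert_delete_column() call. A standalone sketch of the same walk (illustrative only, using std::string in place of LEX_STRING and assuming the same ordering):

    // Illustrative sketch: old names absent from the new set, both lists sorted
    // by (length, byte order) as update_row() assumes.
    #include <string>
    #include <vector>

    static bool less_len_bytes(const std::string &a, const std::string &b)
    {
      return a.size() != b.size() ? a.size() < b.size() : a < b;
    }

    std::vector<std::string> removed_columns(const std::vector<std::string> &olds,
                                             const std::vector<std::string> &news)
    {
      std::vector<std::string> gone;
      size_t j= 0;
      for (size_t i= 0; i < olds.size(); i++)
      {
        while (j < news.size() && less_len_bytes(news[j], olds[i]))
          j++;                                  // skip new names ordered before olds[i]
        if (j < news.size() && news[j] == olds[i])
          j++;                                  // still present: nothing to delete
        else
          gone.push_back(olds[i]);              // missing: schedule a column deletion
      }
      return gone;
    }
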
diff --git a/storage/cassandra/ha_cassandra.h b/storage/cassandra/ha_cassandra.h
index fb5236bf118..f32151849e5 100644
--- a/storage/cassandra/ha_cassandra.h
+++ b/storage/cassandra/ha_cassandra.h
@@ -40,6 +40,33 @@
 class ColumnDataConverter;
 struct ha_table_option_struct;
+
+struct st_dynamic_column_value;
+
+typedef bool (* CAS2DYN_CONVERTER)(const char *cass_data,
+                                   int cass_data_len,
+                                   struct st_dynamic_column_value *value);
+typedef bool (* DYN2CAS_CONVERTER)(struct st_dynamic_column_value *value,
+                                   char **cass_data,
+                                   int *cass_data_len,
+                                   void *buf, void **freemem);
+struct cassandra_type_def
+{
+  const char *name;
+  CAS2DYN_CONVERTER cassandra_to_dynamic;
+  DYN2CAS_CONVERTER dynamic_to_cassandra;
+};
+
+typedef struct cassandra_type_def CASSANDRA_TYPE_DEF;
+
+enum cassandra_type_enum {CT_BIGINT, CT_INT, CT_COUNTER, CT_FLOAT, CT_DOUBLE,
+  CT_BLOB, CT_ASCII, CT_TEXT, CT_TIMESTAMP, CT_UUID, CT_BOOLEAN, CT_VARINT,
+  CT_DECIMAL};
+
+typedef enum cassandra_type_enum CASSANDRA_TYPE;
+
+
 /**
   @brief
   Class definition for the storage engine
 */
@@ -48,23 +75,35 @@ class ha_cassandra: public handler
   friend class Column_name_enumerator_impl;
   THR_LOCK_DATA lock;      ///< MySQL lock
   CASSANDRA_SHARE *share;    ///< Shared lock info
-
+
   Cassandra_se_interface *se;
+  /* description of the static part of the table definition */
   ColumnDataConverter **field_converters;
   uint n_field_converters;
+  CASSANDRA_TYPE_DEF *default_type_def;
+  /* description of the dynamic columns part */
+  CASSANDRA_TYPE_DEF *special_type_field_converters;
+  LEX_STRING *special_type_field_names;
+  uint n_special_type_fields;
+  DYNAMIC_ARRAY dynamic_values, dynamic_names;
+  DYNAMIC_STRING dynamic_rec;
+
   ColumnDataConverter *rowkey_converter;
   bool setup_field_converters(Field **field, uint n_fields);
   void free_field_converters();
-
+
   int read_cassandra_columns(bool unpack_pk);
   int check_table_options(struct ha_table_option_struct* options);
   bool doing_insert_batch;
   ha_rows insert_rows_batched;
-
+
+  uint dyncol_field;
+  bool dyncol_set;
+
   /* Used to produce 'wrong column %s at row %lu' warnings */
   ha_rows insert_lineno;
   void print_conversion_error(const char *field_name,
@@ -191,6 +230,14 @@ public:
 private:
   bool source_exhausted;
   bool mrr_start_read();
+  int check_field_options(Field **fields);
+  int read_dyncol(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names,
+                  String *valcol, char **freenames);
+  int write_dynamic_row(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names);
+  static void free_dynamic_row(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names,
+                               char *free_names);
+  CASSANDRA_TYPE_DEF * get_cassandra_field_def(char *cass_name,
+                                               int cass_name_length);
 public:

 /*