diff options
-rw-r--r-- | mysql-test/r/cassandra.result | 17 | ||||
-rw-r--r-- | mysql-test/t/cassandra.test | 24 | ||||
-rw-r--r-- | storage/cassandra/ha_cassandra.cc | 102 | ||||
-rw-r--r-- | storage/cassandra/ha_cassandra.h | 4 |
4 files changed, 121 insertions, 26 deletions
diff --git a/mysql-test/r/cassandra.result b/mysql-test/r/cassandra.result index 32e1789983c..3ca4091ed29 100644 --- a/mysql-test/r/cassandra.result +++ b/mysql-test/r/cassandra.result @@ -326,3 +326,20 @@ set cassandra_write_consistency='ANY'; set cassandra_write_consistency='TWO'; set cassandra_write_consistency='THREE'; set cassandra_write_consistency=@tmp; +# +# varint datatype support +# +CREATE TABLE t2 (rowkey varchar(32) PRIMARY KEY, varint_col varbinary(32)) ENGINE=CASSANDRA +thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf9'; +select rowkey, hex(varint_col) from t2; +rowkey hex(varint_col) +val-01 01 +val-0x123456 123456 +val-0x12345678 12345678 +drop table t2; +# now, let's check what happens when MariaDB's column is not wide enough: +CREATE TABLE t2 (rowkey varchar(32) PRIMARY KEY, varint_col varbinary(2)) ENGINE=CASSANDRA +thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf9'; +select rowkey, hex(varint_col) from t2; +ERROR HY000: Internal error: 'Unable to convert value of field `varint_col` from cassandra's data format. Source has 4 bytes, data: 12345678' +drop table t2; diff --git a/mysql-test/t/cassandra.test b/mysql-test/t/cassandra.test index 8a2da608c22..5ff6018214e 100644 --- a/mysql-test/t/cassandra.test +++ b/mysql-test/t/cassandra.test @@ -68,6 +68,11 @@ create columnfamily cf8 (rowkey varchar primary key, countercol counter); update cf8 set countercol=countercol+1 where rowkey='cnt1'; update cf8 set countercol=countercol+100 where rowkey='cnt2'; +create columnfamily cf9 (rowkey varchar primary key, varint_col varint); +insert into cf9 (rowkey, varint_col) values ('val-01', 1); +insert into cf9 (rowkey, varint_col) values ('val-0x123456', 1193046); +insert into cf9 (rowkey, varint_col) values ('val-0x12345678', 305419896); + EOF --error 0,1,2 --system cqlsh -3 -f $MYSQLTEST_VARDIR/cassandra_test_init.cql @@ -413,6 +418,25 @@ set cassandra_write_consistency='THREE'; set cassandra_write_consistency=@tmp; +--echo # +--echo # varint datatype support +--echo # +# create columnfamily cf9 (rowkey varchar primary key, varint_col varint); +CREATE TABLE t2 (rowkey varchar(32) PRIMARY KEY, varint_col varbinary(32)) ENGINE=CASSANDRA + thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf9'; +--sorted_result +select rowkey, hex(varint_col) from t2; +drop table t2; + +--echo # now, let's check what happens when MariaDB's column is not wide enough: +CREATE TABLE t2 (rowkey varchar(32) PRIMARY KEY, varint_col varbinary(2)) ENGINE=CASSANDRA + thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf9'; +--sorted_result +--error ER_INTERNAL_ERROR +select rowkey, hex(varint_col) from t2; +drop table t2; + + ############################################################################ ## Cassandra cleanup ############################################################################ diff --git a/storage/cassandra/ha_cassandra.cc b/storage/cassandra/ha_cassandra.cc index 3643da97287..6a82f811eb4 100644 --- a/storage/cassandra/ha_cassandra.cc +++ b/storage/cassandra/ha_cassandra.cc @@ -531,7 +531,7 @@ public: Field *field; /* This will save Cassandra's data in the Field */ - virtual void cassandra_to_mariadb(const char *cass_data, + virtual int cassandra_to_mariadb(const char *cass_data, int cass_data_len)=0; /* @@ -552,11 +552,12 @@ class DoubleDataConverter : public ColumnDataConverter { double buf; public: - void cassandra_to_mariadb(const char *cass_data, int cass_data_len) + int cassandra_to_mariadb(const char *cass_data, int cass_data_len) { DBUG_ASSERT(cass_data_len == sizeof(double)); double *pdata= (double*) cass_data; field->store(*pdata); + return 0; } bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) @@ -574,11 +575,12 @@ class FloatDataConverter : public ColumnDataConverter { float buf; public: - void cassandra_to_mariadb(const char *cass_data, int cass_data_len) + int cassandra_to_mariadb(const char *cass_data, int cass_data_len) { DBUG_ASSERT(cass_data_len == sizeof(float)); float *pdata= (float*) cass_data; field->store(*pdata); + return 0; } bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) @@ -608,7 +610,7 @@ class BigintDataConverter : public ColumnDataConverter longlong buf; bool flip; /* is false when reading counter columns */ public: - void cassandra_to_mariadb(const char *cass_data, int cass_data_len) + int cassandra_to_mariadb(const char *cass_data, int cass_data_len) { longlong tmp; DBUG_ASSERT(cass_data_len == sizeof(longlong)); @@ -617,6 +619,7 @@ public: else memcpy(&tmp, cass_data, sizeof(longlong)); field->store(tmp); + return 0; } bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) @@ -647,10 +650,11 @@ class TinyintDataConverter : public ColumnDataConverter { char buf; public: - void cassandra_to_mariadb(const char *cass_data, int cass_data_len) + int cassandra_to_mariadb(const char *cass_data, int cass_data_len) { DBUG_ASSERT(cass_data_len == 1); field->store(cass_data[0]); + return 0; } bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) @@ -668,12 +672,13 @@ class Int32DataConverter : public ColumnDataConverter { int32_t buf; public: - void cassandra_to_mariadb(const char *cass_data, int cass_data_len) + int cassandra_to_mariadb(const char *cass_data, int cass_data_len) { int32_t tmp; DBUG_ASSERT(cass_data_len == sizeof(int32_t)); flip32(cass_data, (char*)&tmp); field->store(tmp); + return 0; } bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) @@ -691,10 +696,14 @@ public: class StringCopyConverter : public ColumnDataConverter { String buf; + size_t max_length; public: - void cassandra_to_mariadb(const char *cass_data, int cass_data_len) + int cassandra_to_mariadb(const char *cass_data, int cass_data_len) { + if ((size_t)cass_data_len > max_length) + return 1; field->store(cass_data, cass_data_len,field->charset()); + return 0; } bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) @@ -704,6 +713,7 @@ public: *cass_data_len= pstr->length(); return false; } + StringCopyConverter(size_t max_length_arg) : max_length(max_length_arg) {} ~StringCopyConverter(){} }; @@ -712,7 +722,7 @@ class TimestampDataConverter : public ColumnDataConverter { int64_t buf; public: - void cassandra_to_mariadb(const char *cass_data, int cass_data_len) + int cassandra_to_mariadb(const char *cass_data, int cass_data_len) { /* Cassandra data is milliseconds-since-epoch in network byte order */ int64_t tmp; @@ -724,6 +734,7 @@ public: - microsecond fraction of a second. */ ((Field_timestamp*)field)->store_TIME(tmp / 1000, (tmp % 1000)*1000); + return 0; } bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) @@ -768,7 +779,7 @@ class UuidDataConverter : public ColumnDataConverter char buf[16]; /* Binary UUID representation */ String str_buf; public: - void cassandra_to_mariadb(const char *cass_data, int cass_data_len) + int cassandra_to_mariadb(const char *cass_data, int cass_data_len) { DBUG_ASSERT(cass_data_len==16); char str[37]; @@ -783,6 +794,7 @@ public: } *ptr= 0; field->store(str, 36,field->charset()); + return 0; } bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) @@ -836,6 +848,11 @@ const char * const validator_uuid= "org.apache.cassandra.db.marshal.UUIDType"; const char * const validator_boolean= "org.apache.cassandra.db.marshal.BooleanType"; +/* + VARINTs are stored as little-endian big numbers. +*/ +const char * const validator_varint= "org.apache.cassandra.db.marshal.IntegerType"; + ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name) { @@ -885,14 +902,20 @@ ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_ /* fall through: */ case MYSQL_TYPE_VARCHAR: case MYSQL_TYPE_VAR_STRING: + { + bool is_varint; if (!strcmp(validator_name, validator_blob) || !strcmp(validator_name, validator_ascii) || - !strcmp(validator_name, validator_text)) + !strcmp(validator_name, validator_text) || + (is_varint= !strcmp(validator_name, validator_varint))) { - res= new StringCopyConverter; + size_t max_size= (size_t)-1; + if (is_varint) + max_size= field->field_length; + res= new StringCopyConverter(max_size); } break; - + } case MYSQL_TYPE_LONG: if (!strcmp(validator_name, validator_int)) res= new Int32DataConverter; @@ -1041,24 +1064,43 @@ int ha_cassandra::index_read_map(uchar *buf, const uchar *key, /* TODO: what if we're not reading all columns?? */ if (!found) - { rc= HA_ERR_KEY_NOT_FOUND; - } else + rc= read_cassandra_columns(false); + + DBUG_RETURN(rc); +} + + +void ha_cassandra::print_conversion_error(const char *field_name, + char *cass_value, + int cass_value_len) +{ + char buf[32]; + char *p= cass_value; + size_t i= 0; + for (; (i < (int)sizeof(buf)-1) && (p < cass_value + cass_value_len); p++) { - read_cassandra_columns(false); + buf[i++]= map2number[(*p >> 4) & 0xF]; + buf[i++]= map2number[*p & 0xF]; } + buf[i]=0; - DBUG_RETURN(rc); + se->print_error("Unable to convert value for field `%s` from Cassandra's data" + " format. Source data is %d bytes, 0x%s%s", + field_name, cass_value_len, buf, + (i == sizeof(buf) - 1)? "..." : ""); + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); } -void ha_cassandra::read_cassandra_columns(bool unpack_pk) +int ha_cassandra::read_cassandra_columns(bool unpack_pk) { char *cass_name; char *cass_value; int cass_value_len; Field **field; + int res= 0; /* cassandra_to_mariadb() calls will use field->store(...) methods, which @@ -1082,7 +1124,14 @@ void ha_cassandra::read_cassandra_columns(bool unpack_pk) { int fieldnr= (*field)->field_index; (*field)->set_notnull(); - field_converters[fieldnr]->cassandra_to_mariadb(cass_value, cass_value_len); + if (field_converters[fieldnr]->cassandra_to_mariadb(cass_value, + cass_value_len)) + { + print_conversion_error((*field)->field_name, cass_value, + cass_value_len); + res=1; + goto err; + } break; } } @@ -1094,10 +1143,17 @@ void ha_cassandra::read_cassandra_columns(bool unpack_pk) field= table->field; (*field)->set_notnull(); se->get_read_rowkey(&cass_value, &cass_value_len); - rowkey_converter->cassandra_to_mariadb(cass_value, cass_value_len); + if (rowkey_converter->cassandra_to_mariadb(cass_value, cass_value_len)) + { + print_conversion_error((*field)->field_name, cass_value, cass_value_len); + res=1; + goto err; + } } +err: dbug_tmp_restore_column_map(table->write_set, old_map); + return res; } @@ -1234,10 +1290,7 @@ int ha_cassandra::rnd_next(uchar *buf) if (reached_eof) rc= HA_ERR_END_OF_FILE; else - { - read_cassandra_columns(true); - rc= 0; - } + rc= read_cassandra_columns(true); } DBUG_RETURN(rc); @@ -1422,8 +1475,7 @@ int ha_cassandra::multi_range_read_next(range_id_t *range_info) { if (!se->get_next_multiget_row()) { - read_cassandra_columns(true); - res= 0; + res= read_cassandra_columns(true); break; } else diff --git a/storage/cassandra/ha_cassandra.h b/storage/cassandra/ha_cassandra.h index 9b11b055af3..52fd46fa5ef 100644 --- a/storage/cassandra/ha_cassandra.h +++ b/storage/cassandra/ha_cassandra.h @@ -58,7 +58,7 @@ class ha_cassandra: public handler bool setup_field_converters(Field **field, uint n_fields); void free_field_converters(); - void read_cassandra_columns(bool unpack_pk); + int read_cassandra_columns(bool unpack_pk); int check_table_options(struct ha_table_option_struct* options); bool doing_insert_batch; @@ -66,6 +66,8 @@ class ha_cassandra: public handler /* Used to produce 'wrong column %s at row %lu' warnings */ ha_rows insert_lineno; + void print_conversion_error(const char *field_name, + char *cass_value, int cass_value_len); public: ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg); ~ha_cassandra() |