diff options
| field | value | date |
|---|---|---|
| author | Sergey Petrunya <psergey@askmonty.org> | 2012-08-19 12:50:53 +0400 |
| committer | Sergey Petrunya <psergey@askmonty.org> | 2012-08-19 12:50:53 +0400 |
| commit | d36259703b8ffa37ed47ed9dec7f393c8283c4c5 (patch) | |
| tree | ed320baeb01dc1736efd2a6354cde120706bef4e | |
| parent | 9cdf5eeec91b60fbdffdc7d7a675f47bdb39ff50 (diff) | |
| download | mariadb-git-d36259703b8ffa37ed47ed9dec7f393c8283c4c5.tar.gz | |
MDEV-431: Cassandra storage engine
- Descriptive error messages
- Unpack PK column on range scans
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | mysql-test/r/cassandra.result | 19 |
| -rw-r--r-- | mysql-test/t/cassandra.test | 10 |
| -rw-r--r-- | storage/cassandra/cassandra_se.cc | 34 |
| -rw-r--r-- | storage/cassandra/cassandra_se.h | 2 |
| -rw-r--r-- | storage/cassandra/ha_cassandra.cc | 108 |
| -rw-r--r-- | storage/cassandra/ha_cassandra.h | 5 |
6 files changed, 147 insertions, 31 deletions
diff --git a/mysql-test/r/cassandra.result b/mysql-test/r/cassandra.result index 4e45685a8c2..b77dcc40c14 100644 --- a/mysql-test/r/cassandra.result +++ b/mysql-test/r/cassandra.result @@ -13,15 +13,26 @@ thrift_host='localhost' keyspace='no_such_keyspace' column_family='colfam'; ERROR HY000: Unable to connect to foreign data source: Default TException. [Keyspace no_such_keyspace does not exist] create table t1 (rowkey char(10) primary key, column1 char(10)) engine=cassandra thrift_host='localhost' keyspace='no_such_keyspace'; -ERROR HY000: Can't create table 'test.t1' (errno: 140) -create table t1 (rowkey char(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra +ERROR HY000: Unable to connect to foreign data source: thrift_host, keyspace, and column_family table options must be s +create table t1 (rowkey varchar(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra thrift_host='localhost' keyspace='mariadbtest2' column_family='cf1'; +select * from t1; +rowkey data1 data2 insert into t1 values ('rowkey10', 'data1-value', 123456); insert into t1 values ('rowkey11', 'data1-value2', 34543); +insert into t1 values ('rowkey12', 'data1-value3', 454); select * from t1; rowkey data1 data2 - data1-value 123456 - data1-value2 34543 +rowkey12 data1-value3 454 +rowkey10 data1-value 123456 +rowkey11 data1-value2 34543 +explain +select * from t1 where rowkey='rowkey11'; +id select_type table type possible_keys key key_len ref rows Extra +1 SIMPLE t1 const PRIMARY PRIMARY 38 const 1 +select * from t1 where rowkey='rowkey11'; +rowkey data1 data2 +rowkey11 data1-value2 34543 delete from t1; select * from t1; rowkey data1 data2 diff --git a/mysql-test/t/cassandra.test b/mysql-test/t/cassandra.test index 80271e48d3e..0f0a1544af5 100644 --- a/mysql-test/t/cassandra.test +++ b/mysql-test/t/cassandra.test @@ -25,7 +25,7 @@ create table t1 (rowkey char(10) primary key, column1 char(10)) engine=cassandra thrift_host='localhost' keyspace='no_such_keyspace' 
column_family='colfam'; # No column family specified ---error ER_CANT_CREATE_TABLE +--error ER_CONNECT_TO_FOREIGN_DATA_SOURCE create table t1 (rowkey char(10) primary key, column1 char(10)) engine=cassandra thrift_host='localhost' keyspace='no_such_keyspace'; @@ -49,13 +49,19 @@ create columnfamily cf1 ( pk varchar primary key, data1 varchar, data2 bigint); ############################################################################ # Now, create a table for real and insert data -create table t1 (rowkey char(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra +create table t1 (rowkey varchar(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra thrift_host='localhost' keyspace='mariadbtest2' column_family='cf1'; +select * from t1; insert into t1 values ('rowkey10', 'data1-value', 123456); insert into t1 values ('rowkey11', 'data1-value2', 34543); +insert into t1 values ('rowkey12', 'data1-value3', 454); select * from t1; +explain +select * from t1 where rowkey='rowkey11'; +select * from t1 where rowkey='rowkey11'; + # Check if deletion works delete from t1; select * from t1; diff --git a/storage/cassandra/cassandra_se.cc b/storage/cassandra/cassandra_se.cc index c47787d97f4..82b6dcbb93b 100644 --- a/storage/cassandra/cassandra_se.cc +++ b/storage/cassandra/cassandra_se.cc @@ -64,6 +64,8 @@ class Cassandra_se_impl: public Cassandra_se_interface /* Resultset we're reading */ std::vector<KeySlice> key_slice_vec; std::vector<KeySlice>::iterator key_slice_it; + + std::string rowkey; /* key of the record we're returning now */ SlicePredicate slice_pred; public: @@ -77,6 +79,7 @@ public: bool setup_ddl_checks(); void first_ddl_column(); bool next_ddl_column(char **name, int *name_len, char **value, int *value_len); + void get_rowkey_type(char **name, char **type); /* Writes */ void start_prepare_insert(const char *key, int key_len); @@ -86,6 +89,7 @@ public: /* Reads, point lookups */ bool get_slice(char *key, size_t key_len, bool *found); bool 
get_next_read_column(char **name, char **value, int *value_len); + void get_read_rowkey(char **value, int *value_len); /* Reads, multi-row scans */ bool get_range_slices(); @@ -193,6 +197,21 @@ bool Cassandra_se_impl::next_ddl_column(char **name, int *name_len, return false; } + +void Cassandra_se_impl::get_rowkey_type(char **name, char **type) +{ + if (cf_def.__isset.key_validation_class) + *type= (char*)cf_def.key_validation_class.c_str(); + else + *type= NULL; + + if (cf_def.__isset.key_alias) + *name= (char*)cf_def.key_alias.c_str(); + else + *name= NULL; +} + + ///////////////////////////////////////////////////////////////////////////// // Data writes ///////////////////////////////////////////////////////////////////////////// @@ -269,8 +288,7 @@ bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found) ColumnParent cparent; cparent.column_family= column_family; - std::string rowkey_str; - rowkey_str.assign(key, key_len); + rowkey.assign(key, key_len); SlicePredicate slice_pred; SliceRange sr; @@ -279,7 +297,7 @@ bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found) slice_pred.__set_slice_range(sr); try { - cass->get_slice(column_data_vec, rowkey_str, cparent, slice_pred, + cass->get_slice(column_data_vec, rowkey, cparent, slice_pred, cur_consistency_level); if (column_data_vec.size() == 0) @@ -333,6 +351,15 @@ bool Cassandra_se_impl::get_next_read_column(char **name, char **value, } +/* Return the rowkey for the record that was read */ + +void Cassandra_se_impl::get_read_rowkey(char **value, int *value_len) +{ + *value= (char*)rowkey.c_str(); + *value_len= rowkey.length(); +} + + bool Cassandra_se_impl::get_range_slices() //todo: start_range/end_range as parameters { bool res= true; @@ -375,6 +402,7 @@ bool Cassandra_se_impl::get_next_range_slice_row() return true; column_data_vec= key_slice_it->columns; + rowkey= key_slice_it->key; column_data_it= column_data_vec.begin(); key_slice_it++; return false; diff --git 
a/storage/cassandra/cassandra_se.h b/storage/cassandra/cassandra_se.h index 44b6ac05ca8..6e3380e7f50 100644 --- a/storage/cassandra/cassandra_se.h +++ b/storage/cassandra/cassandra_se.h @@ -25,6 +25,7 @@ public: virtual void first_ddl_column()=0; virtual bool next_ddl_column(char **name, int *name_len, char **value, int *value_len)=0; + virtual void get_rowkey_type(char **name, char **type)=0; /* Writes */ virtual void start_prepare_insert(const char *key, int key_len)=0; @@ -35,6 +36,7 @@ public: /* Reads */ virtual bool get_slice(char *key, size_t key_len, bool *found)=0 ; virtual bool get_next_read_column(char **name, char **value, int *value_len)=0; + virtual void get_read_rowkey(char **value, int *value_len)=0; /* Reads, multi-row scans */ virtual bool get_range_slices()=0; diff --git a/storage/cassandra/ha_cassandra.cc b/storage/cassandra/ha_cassandra.cc index 21131944bb3..8d7c4051c01 100644 --- a/storage/cassandra/ha_cassandra.cc +++ b/storage/cassandra/ha_cassandra.cc @@ -212,7 +212,7 @@ static handler* cassandra_create_handler(handlerton *hton, ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg) :handler(hton, table_arg), - se(NULL), field_converters(NULL) + se(NULL), field_converters(NULL),rowkey_converter(NULL) {} @@ -251,7 +251,6 @@ int ha_cassandra::open(const char *name, int mode, uint test_if_locked) if (setup_field_converters(table->field, table->s->fields)) { - my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "setup_field_converters"); DBUG_RETURN(HA_ERR_NO_CONNECTION); } @@ -341,8 +340,12 @@ int ha_cassandra::create(const char *name, TABLE *table_arg, */ #endif DBUG_ASSERT(!se); - if (!options->host || !options->keyspace || !options->column_family) + if (!options->host || !options->keyspace || !options->column_family) + { + my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), + "thrift_host, keyspace, and column_family table options must be specified"); DBUG_RETURN(HA_WRONG_CREATE_OPTION); + } se= get_cassandra_se(); 
se->set_column_family(options->column_family); if (se->connect(options->host, options->keyspace)) @@ -515,6 +518,7 @@ ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_ case MYSQL_TYPE_VAR_STRING: case MYSQL_TYPE_VARCHAR: + //case MYSQL_TYPE_STRING: <-- todo: should we allow end-padded 'CHAR(N)'? if (!strcmp(validator_name, validator_blob) || !strcmp(validator_name, validator_ascii) || !strcmp(validator_name, validator_text)) @@ -560,7 +564,12 @@ bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields) n_mapped++; ColumnDataConverter **conv= field_converters + (*field)->field_index; if (!(*conv= map_field_to_validator(*field, col_type))) + { + se->print_error("Failed to map column %s to datatype %s", + (*field)->field_name, col_type); + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); return true; + } (*conv)->field= *field; } } @@ -568,6 +577,28 @@ bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields) if (n_mapped != n_fields - 1) return true; + + /* + Setup type conversion for row_key. 
It may also have a name, but we ignore + it currently + */ + se->get_rowkey_type(&col_name, &col_type); + if (col_type != NULL) + { + if (!(rowkey_converter= map_field_to_validator(*field_arg, col_type))) + { + se->print_error("Failed to map PRIMARY KEY to datatype %s", col_type); + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + return true; + } + rowkey_converter->field= *field_arg; + } + else + { + se->print_error("Cassandra's rowkey has no defined datatype (todo: support this)"); + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + return true; + } return false; } @@ -575,6 +606,9 @@ bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields) void ha_cassandra::free_field_converters() { + delete rowkey_converter; + rowkey_converter= NULL; + if (field_converters) { for (uint i=0; i < n_field_converters; i++) @@ -588,8 +622,8 @@ void ha_cassandra::free_field_converters() void store_key_image_to_rec(Field *field, uchar *ptr, uint len); int ha_cassandra::index_read_map(uchar *buf, const uchar *key, - key_part_map keypart_map, - enum ha_rkey_function find_flag) + key_part_map keypart_map, + enum ha_rkey_function find_flag) { int rc; DBUG_ENTER("ha_cassandra::index_read_map"); @@ -597,19 +631,26 @@ int ha_cassandra::index_read_map(uchar *buf, const uchar *key, if (find_flag != HA_READ_KEY_EXACT) DBUG_RETURN(HA_ERR_WRONG_COMMAND); - // todo: decode the search key. 
uint key_len= calculate_key_len(table, active_index, key, keypart_map); store_key_image_to_rec(table->field[0], (uchar*)key, key_len); - +#if 0 char buff[256]; String tmp(buff,sizeof(buff), &my_charset_bin); tmp.length(0); String *str; str= table->field[0]->val_str(&tmp); - +#endif + + char *cass_key; + int cass_key_len; + rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len); + bool found; - if (se->get_slice((char*)str->ptr(), str->length(), &found)) + if (se->get_slice(cass_key, cass_key_len, &found)) + { + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); rc= HA_ERR_INTERNAL_ERROR; + } /* TODO: what if we're not reading all columns?? */ if (!found) @@ -618,14 +659,14 @@ int ha_cassandra::index_read_map(uchar *buf, const uchar *key, } else { - read_cassandra_columns(); + read_cassandra_columns(false); } DBUG_RETURN(rc); } -void ha_cassandra::read_cassandra_columns() +void ha_cassandra::read_cassandra_columns(bool unpack_pk) { char *cass_name; char *cass_value; @@ -659,6 +700,15 @@ void ha_cassandra::read_cassandra_columns() } } } + + if (unpack_pk) + { + /* Unpack rowkey to primary key */ + field= table->field; + (*field)->set_notnull(); + se->get_read_rowkey(&cass_value, &cass_value_len); + rowkey_converter->cassandra_to_mariadb(cass_value, cass_value_len); + } dbug_tmp_restore_column_map(table->write_set, old_map); } @@ -667,12 +717,13 @@ void ha_cassandra::read_cassandra_columns() int ha_cassandra::write_row(uchar *buf) { my_bitmap_map *old_map; - char buff[512]; +// char buff[512]; DBUG_ENTER("ha_cassandra::write_row"); old_map= dbug_tmp_use_all_columns(table, table->read_set); /* Convert the key (todo: unify with the rest of the processing) */ +#if 0 { Field *pk_col= table->field[0]; String tmp(buff,sizeof(buff), &my_charset_bin); @@ -682,6 +733,11 @@ int ha_cassandra::write_row(uchar *buf) se->start_prepare_insert(str->ptr(), str->length()); } +#endif + char *cass_key; + int cass_key_len; + rowkey_converter->mariadb_to_cassandra(&cass_key, 
&cass_key_len); + se->start_prepare_insert(cass_key, cass_key_len); /* Convert other fields */ for (uint i= 1; i < table->s->fields; i++) @@ -697,6 +753,9 @@ int ha_cassandra::write_row(uchar *buf) bool res= se->do_insert(); + if (res) + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); + DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0); } @@ -713,6 +772,8 @@ int ha_cassandra::rnd_init(bool scan) se->add_read_column(table->field[i]->field_name); bres= se->get_range_slices(); + if (bres) + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0); } @@ -739,7 +800,7 @@ int ha_cassandra::rnd_next(uchar *buf) } else { - read_cassandra_columns(); + read_cassandra_columns(true); rc= 0; } @@ -753,11 +814,22 @@ int ha_cassandra::delete_all_rows() DBUG_ENTER("ha_cassandra::delete_all_rows"); bres= se->truncate(); + + if (bres) + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0); } +int ha_cassandra::delete_row(const uchar *buf) +{ + DBUG_ENTER("ha_cassandra::delete_row"); + // todo: delete the row we've just read. 
+ DBUG_RETURN(HA_ERR_WRONG_COMMAND); +} + + ///////////////////////////////////////////////////////////////////////////// // Dummy implementations start ///////////////////////////////////////////////////////////////////////////// @@ -815,7 +887,8 @@ ha_rows ha_cassandra::records_in_range(uint inx, key_range *min_key, key_range *max_key) { DBUG_ENTER("ha_cassandra::records_in_range"); - DBUG_RETURN(10); // low number to force index usage + //DBUG_RETURN(10); // low number to force index usage + DBUG_RETURN(HA_POS_ERROR); } @@ -866,13 +939,6 @@ int ha_cassandra::delete_table(const char *name) } -int ha_cassandra::delete_row(const uchar *buf) -{ - DBUG_ENTER("ha_cassandra::delete_row"); - DBUG_RETURN(HA_ERR_WRONG_COMMAND); -} - - /** check_if_incompatible_data() called if ALTER TABLE can't detect otherwise if new and old definition are compatible diff --git a/storage/cassandra/ha_cassandra.h b/storage/cassandra/ha_cassandra.h index 30958e4e17c..7a163363ab6 100644 --- a/storage/cassandra/ha_cassandra.h +++ b/storage/cassandra/ha_cassandra.h @@ -38,10 +38,13 @@ class ha_cassandra: public handler ColumnDataConverter **field_converters; uint n_field_converters; + + ColumnDataConverter *rowkey_converter; + bool setup_field_converters(Field **field, uint n_fields); void free_field_converters(); - void read_cassandra_columns(); + void read_cassandra_columns(bool unpack_pk); public: ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg); ~ha_cassandra() |