diff options
-rw-r--r-- | mysql-test/r/cassandra.result | 64 | ||||
-rw-r--r-- | mysql-test/t/cassandra.test | 35 | ||||
-rw-r--r-- | sql/sql_join_cache.cc | 5 | ||||
-rw-r--r-- | storage/cassandra/cassandra_se.cc | 82 | ||||
-rw-r--r-- | storage/cassandra/cassandra_se.h | 14 | ||||
-rw-r--r-- | storage/cassandra/ha_cassandra.cc | 161 | ||||
-rw-r--r-- | storage/cassandra/ha_cassandra.h | 18 |
7 files changed, 365 insertions, 14 deletions
diff --git a/mysql-test/r/cassandra.result b/mysql-test/r/cassandra.result index 3b78bd5466a..096e501ceae 100644 --- a/mysql-test/r/cassandra.result +++ b/mysql-test/r/cassandra.result @@ -79,6 +79,7 @@ Cassandra_row_inserts 8 Cassandra_row_insert_batches 7 CREATE TABLE t1 (rowkey BIGINT PRIMARY KEY, a BIGINT) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2'; +delete from t1; INSERT INTO t1 VALUES (1,1),(2,2); DELETE FROM t1 ORDER BY a LIMIT 1; DROP TABLE t1; @@ -92,3 +93,66 @@ show status like 'cassandra_row_insert%'; Variable_name Value Cassandra_row_inserts 10 Cassandra_row_insert_batches 8 +# +# Batched Key Access +# +# Control variable (we are not yet able to make use of MRR's buffer) +show variables like 'cassandra_multi%'; +Variable_name Value +cassandra_multiget_batch_size 100 +# MRR-related status variables: +show status like 'cassandra_multi%'; +Variable_name Value +Cassandra_multiget_reads 0 +Cassandra_multiget_keys_scanned 0 +Cassandra_multiget_rows_read 0 +CREATE TABLE t1 (rowkey BIGINT PRIMARY KEY, a BIGINT) ENGINE=CASSANDRA +thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2'; +delete from t1; +INSERT INTO t1 VALUES (0,0),(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9); +set @tmp_jcl=@@join_cache_level; +set join_cache_level=8; +explain select * from t1 A, t1 B where B.rowkey=A.a; +id select_type table type possible_keys key key_len ref rows Extra +1 SIMPLE A ALL NULL NULL NULL NULL 1000 Using where +1 SIMPLE B eq_ref PRIMARY PRIMARY 8 test.A.a 1 Using join buffer (flat, BKAH join); multiget_slice +select * from t1 A, t1 B where B.rowkey=A.a; +rowkey a rowkey a +0 0 0 0 +1 1 1 1 +2 2 2 2 +3 3 3 3 +4 4 4 4 +5 5 5 5 +6 6 6 6 +7 7 7 7 +8 8 8 8 +9 9 9 9 +show status like 'cassandra_multi%'; +Variable_name Value +Cassandra_multiget_reads 1 +Cassandra_multiget_keys_scanned 10 +Cassandra_multiget_rows_read 10 +insert into t1 values(1, 8); +insert into t1 values(3, 8); +insert into t1 values(5, 
8); +insert into t1 values(7, 8); +select * from t1 A, t1 B where B.rowkey=A.a; +rowkey a rowkey a +0 0 0 0 +2 2 2 2 +4 4 4 4 +6 6 6 6 +1 8 8 8 +7 8 8 8 +8 8 8 8 +5 8 8 8 +3 8 8 8 +9 9 9 9 +show status like 'cassandra_multi%'; +Variable_name Value +Cassandra_multiget_reads 2 +Cassandra_multiget_keys_scanned 16 +Cassandra_multiget_rows_read 16 +delete from t1; +drop table t1; diff --git a/mysql-test/t/cassandra.test b/mysql-test/t/cassandra.test index 195f365a372..2d76a4aeb71 100644 --- a/mysql-test/t/cassandra.test +++ b/mysql-test/t/cassandra.test @@ -117,6 +117,7 @@ show status like 'cassandra_row_insert%'; CREATE TABLE t1 (rowkey BIGINT PRIMARY KEY, a BIGINT) ENGINE=CASSANDRA thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2'; +delete from t1; INSERT INTO t1 VALUES (1,1),(2,2); DELETE FROM t1 ORDER BY a LIMIT 1; @@ -127,6 +128,40 @@ show status like 'cassandra_row_insert%'; flush status; show status like 'cassandra_row_insert%'; +--echo # +--echo # Batched Key Access +--echo # + +--echo # Control variable (we are not yet able to make use of MRR's buffer) +show variables like 'cassandra_multi%'; + +--echo # MRR-related status variables: +show status like 'cassandra_multi%'; + +CREATE TABLE t1 (rowkey BIGINT PRIMARY KEY, a BIGINT) ENGINE=CASSANDRA + thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2'; +delete from t1; +INSERT INTO t1 VALUES (0,0),(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9); + +set @tmp_jcl=@@join_cache_level; +set join_cache_level=8; +explain select * from t1 A, t1 B where B.rowkey=A.a; + +select * from t1 A, t1 B where B.rowkey=A.a; +show status like 'cassandra_multi%'; + +# The following INSERTs are really UPDATEs +insert into t1 values(1, 8); +insert into t1 values(3, 8); +insert into t1 values(5, 8); +insert into t1 values(7, 8); + +select * from t1 A, t1 B where B.rowkey=A.a; +show status like 'cassandra_multi%'; + + +delete from t1; +drop table t1; 
############################################################################ ## Cassandra cleanup ############################################################################ diff --git a/sql/sql_join_cache.cc b/sql/sql_join_cache.cc index f953cf4df57..d785366ae69 100644 --- a/sql/sql_join_cache.cc +++ b/sql/sql_join_cache.cc @@ -3876,8 +3876,11 @@ int JOIN_TAB_SCAN_MRR::next() If a record in in an incremental cache contains no fields then the association for the last record in cache will be equal to cache->end_pos */ + /* + psergey: this makes no sense where HA_MRR_NO_ASSOC is used. DBUG_ASSERT(cache->buff <= (uchar *) (*ptr) && (uchar *) (*ptr) <= cache->end_pos); + */ if (join_tab->table->vfield) update_virtual_fields(join->thd, join_tab->table); } @@ -4543,7 +4546,7 @@ bool JOIN_CACHE_BKAH::prepare_look_for_matches(bool skip_last) { last_matching_rec_ref_ptr= next_matching_rec_ref_ptr= 0; if (no_association && - (curr_matching_chain= get_matching_chain_by_join_key())) + !(curr_matching_chain= get_matching_chain_by_join_key())) //psergey: added '!' return 1; last_matching_rec_ref_ptr= get_next_rec_ref(curr_matching_chain); return 0; diff --git a/storage/cassandra/cassandra_se.cc b/storage/cassandra/cassandra_se.cc index 75ce33cb981..27ff83f7c0d 100644 --- a/storage/cassandra/cassandra_se.cc +++ b/storage/cassandra/cassandra_se.cc @@ -100,7 +100,18 @@ public: /* Setup that's necessary before a multi-row read. (todo: use it before point lookups, too) */ void clear_read_columns(); void add_read_column(const char *name); - + + /* Reads, MRR scans */ + void new_lookup_keys(); + int add_lookup_key(const char *key, size_t key_len); + bool multiget_slice(); + + std::vector<std::string> mrr_keys; /* TODO: can we use allocator to put them onto MRR buffer? 
*/
+  std::map<std::string, std::vector<ColumnOrSuperColumn> > mrr_result;
+  std::map<std::string, std::vector<ColumnOrSuperColumn> >::iterator mrr_result_it;
+
+  bool get_next_multiget_row();
+
   bool truncate();
   bool remove_row();

@@ -522,3 +533,72 @@ bool Cassandra_se_impl::remove_row()

   return res;
 }
+
+/////////////////////////////////////////////////////////////////////////////
+// MRR reads
+/////////////////////////////////////////////////////////////////////////////
+/* Start a new multiget batch: forget the keys collected for the previous one. */
+void Cassandra_se_impl::new_lookup_keys()
+{
+  mrr_keys.clear();
+}
+
+/* Append one row key to the pending batch. Returns the number of keys batched so far. */
+int Cassandra_se_impl::add_lookup_key(const char *key, size_t key_len)
+{
+  mrr_keys.push_back(std::string(key, key_len));
+  return static_cast<int>(mrr_keys.size());
+}
+/* Issue one multiget_slice request (empty start/finish slice range) for every key in
+   mrr_keys; rows are stored in mrr_result. Returns false on success, true on error. */
+bool Cassandra_se_impl::multiget_slice()
+{
+  ColumnParent cparent;
+  cparent.column_family= column_family;
+
+  SlicePredicate slice_pred;
+  SliceRange sr;
+  sr.start = "";
+  sr.finish = "";
+  slice_pred.__set_slice_range(sr);
+
+  bool res= true;
+
+  try {
+    /* Counted before the request is sent, so failed requests are included too */
+    cassandra_counters.multiget_reads++;
+    cassandra_counters.multiget_keys_scanned += mrr_keys.size();
+
+    cass->multiget_slice(mrr_result, mrr_keys, cparent, slice_pred,
+                         cur_consistency_level);
+
+    cassandra_counters.multiget_rows_read += mrr_result.size();
+
+    res= false;
+    mrr_result_it= mrr_result.begin();
+
+  } catch (const InvalidRequestException &ire) {
+    print_error("%s [%s]", ire.what(), ire.why.c_str());
+  } catch (const UnavailableException &ue) {
+    print_error("UnavailableException: %s", ue.what());
+  } catch (const TimedOutException &te) {
+    print_error("TimedOutException: %s", te.what());
+  }
+
+  return res;
+}
+/* Make the next row of mrr_result current: copy its columns and key into column_data_vec
+   and rowkey. Returns true when there are no more rows (EOF), false otherwise. */
+bool Cassandra_se_impl::get_next_multiget_row()
+{
+  if (mrr_result_it == mrr_result.end())
+    return true; /* EOF */
+
+  column_data_vec= mrr_result_it->second;
+  rowkey= mrr_result_it->first;
+
+  column_data_it= column_data_vec.begin();
+  ++mrr_result_it;
+  return false;
+}
+
diff --git a/storage/cassandra/cassandra_se.h b/storage/cassandra/cassandra_se.h
index 78bf1016fea..d2ece4d9441 100644
---
a/storage/cassandra/cassandra_se.h
+++ b/storage/cassandra/cassandra_se.h
@@ -41,11 +41,16 @@ public:
   /* Reads, multi-row scans */
   int read_batch_size;
-
   virtual bool get_range_slices(bool last_key_as_start_key)=0;
   virtual void finish_reading_range_slices()=0;
   virtual bool get_next_range_slice_row(bool *eof)=0;

+  /* Reads, MRR scans */
+  virtual void new_lookup_keys()=0; /* start a new batch of lookup keys */
+  virtual int add_lookup_key(const char *key, size_t key_len)=0; /* returns #keys batched so far */
+  virtual bool multiget_slice()=0; /* fetch rows for the batched keys; true on error */
+  virtual bool get_next_multiget_row()=0; /* step to the next fetched row; true on EOF */
+
   /* read_set setup */
   virtual void clear_read_columns()=0;
   virtual void add_read_column(const char *name)=0;
@@ -59,13 +64,20 @@ public:
   void print_error(const char *format, ...);
 };

+
 /* A structure with global counters */
 class Cassandra_status_vars
 {
 public:
   ulong row_inserts;
   ulong row_insert_batches;
+  /* MRR (multiget) counters, updated in Cassandra_se_impl::multiget_slice() */
+  ulong multiget_reads; /* multiget_slice requests attempted */
+  ulong multiget_keys_scanned; /* total keys passed to those requests */
+  ulong multiget_rows_read; /* total entries in the returned result maps */
 };

+
+
 extern Cassandra_status_vars cassandra_counters;

diff --git a/storage/cassandra/ha_cassandra.cc b/storage/cassandra/ha_cassandra.cc
index 70d389d4992..9e94c848988 100644
--- a/storage/cassandra/ha_cassandra.cc
+++ b/storage/cassandra/ha_cassandra.cc
@@ -60,15 +60,35 @@ static MYSQL_THDVAR_ULONG(insert_batch_size, PLUGIN_VAR_RQCMDARG,
   "Number of rows in an INSERT batch",
   NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);

+static MYSQL_THDVAR_ULONG(multiget_batch_size, PLUGIN_VAR_RQCMDARG,
+  "Number of rows in a multiget(MRR) batch",
+  NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);

 static struct st_mysql_sys_var* cassandra_system_variables[]= {
   MYSQL_SYSVAR(insert_batch_size),
+  MYSQL_SYSVAR(multiget_batch_size),
 //  MYSQL_SYSVAR(enum_var),
 //  MYSQL_SYSVAR(ulong_var),
   NULL
 };

+static SHOW_VAR cassandra_status_variables[]= {
+  {"row_inserts",
+    (char*) &cassandra_counters.row_inserts, SHOW_LONG},
+  {"row_insert_batches",
+    (char*) &cassandra_counters.row_insert_batches, SHOW_LONG},
+
+  {"multiget_reads",
+    (char*)
&cassandra_counters.multiget_reads, SHOW_LONG},
+  {"multiget_keys_scanned",
+    (char*) &cassandra_counters.multiget_keys_scanned, SHOW_LONG},
+  {"multiget_rows_read",
+    (char*) &cassandra_counters.multiget_rows_read, SHOW_LONG},
+  {NullS, NullS, SHOW_LONG}
+};
+
+
 Cassandra_status_vars cassandra_counters;
 Cassandra_status_vars cassandra_counters_copy;

@@ -772,8 +792,7 @@ int ha_cassandra::write_row(uchar *buf)
   if (doing_insert_batch)
   {
     res= 0;
-    if (++insert_rows_batched >= /*insert_batch_size*/
-        THDVAR(table->in_use, insert_batch_size))
+    if (++insert_rows_batched >= THDVAR(table->in_use, insert_batch_size))
     {
       res= se->do_insert();
       insert_rows_batched= 0;
@@ -955,6 +974,135 @@ int ha_cassandra::reset()
   return 0;
 }

+/////////////////////////////////////////////////////////////////////////////
+// MRR implementation
+/////////////////////////////////////////////////////////////////////////////
+
+/* multi_range_read_info_const(): ignores all of its arguments and always returns HA_POS_ERROR. */
+/*
+  - The key can be only primary key
+  - allow equality-ranges only.
+  - anything else?
+*/
+ha_rows ha_cassandra::multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq,
+                                                  void *seq_init_param,
+                                                  uint n_ranges, uint *bufsz,
+                                                  uint *flags, COST_VECT *cost)
+{
+  /* No support for const ranges so far */
+  return HA_POS_ERROR;
+}
+/* Clears HA_MRR_USE_DEFAULT_IMPL and sets HA_MRR_NO_ASSOCIATION in *flags, then returns a
+   hard-coded 10. bufsz and cost are left untouched; the other arguments are not examined. */
+ha_rows ha_cassandra::multi_range_read_info(uint keyno, uint n_ranges, uint keys,
+                                            uint key_parts, uint *bufsz,
+                                            uint *flags, COST_VECT *cost)
+{
+  /* Can only be equality lookups on the primary key... */
+  // TODO anything else?
+  *flags &= ~HA_MRR_USE_DEFAULT_IMPL;
+  *flags |= HA_MRR_NO_ASSOCIATION;
+
+  return 10;
+}
+/* Initialize the range sequence (seq->init), keep a copy of its functions in mrr_funcs and
+   start the first batch read. buf is not used. Returns 0 or HA_ERR_INTERNAL_ERROR. */
+int ha_cassandra::multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param,
+                                        uint n_ranges, uint mode, HANDLER_BUFFER *buf)
+{
+  int res;
+  mrr_iter= seq->init(seq_init_param, n_ranges, mode);
+  mrr_funcs= *seq;
+  res= mrr_start_read();
+  return (res?
HA_ERR_INTERNAL_ERROR: 0);
+}
+/* Collect the next batch of lookup keys from the range sequence and fetch their rows with
+   one multiget_slice request. Returns true on error (the result of se->multiget_slice()). */
+bool ha_cassandra::mrr_start_read()
+{
+  uint key_len;
+
+  my_bitmap_map *old_map;
+  old_map= dbug_tmp_use_all_columns(table, table->read_set);
+
+  se->new_lookup_keys();
+
+  while (!(source_exhausted= mrr_funcs.next(mrr_iter, &mrr_cur_range)))
+  {
+    char *cass_key;
+    int cass_key_len;
+
+    DBUG_ASSERT(mrr_cur_range.range_flag & EQ_RANGE);
+
+    uchar *key= (uchar*)mrr_cur_range.start_key.key;
+    key_len= mrr_cur_range.start_key.length;
+    //key_len= calculate_key_len(table, active_index, key, keypart_map); // NEED THIS??
+    store_key_image_to_rec(table->field[0], (uchar*)key, key_len);
+
+    rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len);
+
+    // Primitive buffer control: stop as soon as the batch holds multiget_batch_size keys
+    if ((ulong) se->add_lookup_key(cass_key, cass_key_len) >=
+        THDVAR(table->in_use, multiget_batch_size))
+      break;
+  }
+
+  dbug_tmp_restore_column_map(table->read_set, old_map);
+
+  return se->multiget_slice();
+}
+/* Fetch the next row of the MRR scan, starting another batch read when the current one is
+   used up. Returns 0, HA_ERR_END_OF_FILE or HA_ERR_INTERNAL_ERROR; range_info is not set. */
+int ha_cassandra::multi_range_read_next(range_id_t *range_info)
+{
+  int res;
+  while(1)
+  {
+    if (!se->get_next_multiget_row())
+    {
+      read_cassandra_columns(true);
+      res= 0;
+      break;
+    }
+    else
+    {
+      if (source_exhausted)
+      {
+        res= HA_ERR_END_OF_FILE;
+        break;
+      }
+      else
+      {
+        if (mrr_start_read())
+        {
+          res= HA_ERR_INTERNAL_ERROR;
+          break;
+        }
+      }
+    }
+    /*
+      We get here if we've refilled the buffer and done another read.
Try
+      reading from results again
+    */
+  }
+  return res;
+}
+/* Write the EXPLAIN tag "multiget_slice" into str (a buffer of 'size' bytes) unless the default
+   MRR implementation is in use. Returns the number of bytes written; no NUL is appended. */
+int ha_cassandra::multi_range_read_explain_info(uint mrr_mode, char *str, size_t size)
+{
+  const char *mrr_str= "multiget_slice";
+
+  if (!(mrr_mode & HA_MRR_USE_DEFAULT_IMPL))
+  {
+    uint mrr_str_len= strlen(mrr_str);
+    uint copy_len= min(mrr_str_len, size);
+    memcpy(str, mrr_str, copy_len); /* only what fits; never read past the end of mrr_str */
+    return copy_len;
+  }
+  return 0;
+}
+
 /////////////////////////////////////////////////////////////////////////////
 // Dummy implementations start
@@ -1073,15 +1221,6 @@ bool ha_cassandra::check_if_incompatible_data(HA_CREATE_INFO *info,
 // Dummy implementations end
 /////////////////////////////////////////////////////////////////////////////

-static SHOW_VAR cassandra_status_variables[]= {
-  {"row_inserts",
-    (char*) &cassandra_counters.row_inserts, SHOW_LONG},
-  {"row_insert_batches",
-    (char*) &cassandra_counters.row_insert_batches, SHOW_LONG},
-  {NullS, NullS, SHOW_LONG}
-};
-
-
 static int show_cassandra_vars(THD *thd, SHOW_VAR *var, char *buff)
 {
   //innodb_export_status();
diff --git a/storage/cassandra/ha_cassandra.h b/storage/cassandra/ha_cassandra.h
index c8042cfb14a..66e79f4ee70 100644
--- a/storage/cassandra/ha_cassandra.h
+++ b/storage/cassandra/ha_cassandra.h
@@ -154,6 +154,24 @@ public:

   virtual int reset();
+
+  int multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param,
+                            uint n_ranges, uint mode, HANDLER_BUFFER *buf);
+  int multi_range_read_next(range_id_t *range_info);
+  ha_rows multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq,
+                                      void *seq_init_param,
+                                      uint n_ranges, uint *bufsz,
+                                      uint *flags, COST_VECT *cost);
+  ha_rows multi_range_read_info(uint keyno, uint n_ranges, uint keys,
+                                uint key_parts, uint *bufsz,
+                                uint *flags, COST_VECT *cost);
+  int multi_range_read_explain_info(uint mrr_mode, char *str, size_t size);
+
+private:
+  bool source_exhausted;
+  bool mrr_start_read();
+public:
+
   /*
     Everything below are methods that we implement in ha_example.cc. |