-rw-r--r--  mysql-test/r/cassandra.result      |  64
-rw-r--r--  mysql-test/t/cassandra.test        |  35
-rw-r--r--  sql/sql_join_cache.cc              |   5
-rw-r--r--  storage/cassandra/cassandra_se.cc  |  82
-rw-r--r--  storage/cassandra/cassandra_se.h   |  14
-rw-r--r--  storage/cassandra/ha_cassandra.cc  | 161
-rw-r--r--  storage/cassandra/ha_cassandra.h   |  18
7 files changed, 365 insertions(+), 14 deletions(-)
diff --git a/mysql-test/r/cassandra.result b/mysql-test/r/cassandra.result
index 3b78bd5466a..096e501ceae 100644
--- a/mysql-test/r/cassandra.result
+++ b/mysql-test/r/cassandra.result
@@ -79,6 +79,7 @@ Cassandra_row_inserts 8
Cassandra_row_insert_batches 7
CREATE TABLE t1 (rowkey BIGINT PRIMARY KEY, a BIGINT) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2';
+delete from t1;
INSERT INTO t1 VALUES (1,1),(2,2);
DELETE FROM t1 ORDER BY a LIMIT 1;
DROP TABLE t1;
@@ -92,3 +93,66 @@ show status like 'cassandra_row_insert%';
Variable_name Value
Cassandra_row_inserts 10
Cassandra_row_insert_batches 8
+#
+# Batched Key Access
+#
+# Control variable (we are not yet able to make use of MRR's buffer)
+show variables like 'cassandra_multi%';
+Variable_name Value
+cassandra_multiget_batch_size 100
+# MRR-related status variables:
+show status like 'cassandra_multi%';
+Variable_name Value
+Cassandra_multiget_reads 0
+Cassandra_multiget_keys_scanned 0
+Cassandra_multiget_rows_read 0
+CREATE TABLE t1 (rowkey BIGINT PRIMARY KEY, a BIGINT) ENGINE=CASSANDRA
+thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2';
+delete from t1;
+INSERT INTO t1 VALUES (0,0),(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9);
+set @tmp_jcl=@@join_cache_level;
+set join_cache_level=8;
+explain select * from t1 A, t1 B where B.rowkey=A.a;
+id select_type table type possible_keys key key_len ref rows Extra
+1 SIMPLE A ALL NULL NULL NULL NULL 1000 Using where
+1 SIMPLE B eq_ref PRIMARY PRIMARY 8 test.A.a 1 Using join buffer (flat, BKAH join); multiget_slice
+select * from t1 A, t1 B where B.rowkey=A.a;
+rowkey a rowkey a
+0 0 0 0
+1 1 1 1
+2 2 2 2
+3 3 3 3
+4 4 4 4
+5 5 5 5
+6 6 6 6
+7 7 7 7
+8 8 8 8
+9 9 9 9
+show status like 'cassandra_multi%';
+Variable_name Value
+Cassandra_multiget_reads 1
+Cassandra_multiget_keys_scanned 10
+Cassandra_multiget_rows_read 10
+insert into t1 values(1, 8);
+insert into t1 values(3, 8);
+insert into t1 values(5, 8);
+insert into t1 values(7, 8);
+select * from t1 A, t1 B where B.rowkey=A.a;
+rowkey a rowkey a
+0 0 0 0
+2 2 2 2
+4 4 4 4
+6 6 6 6
+1 8 8 8
+7 8 8 8
+8 8 8 8
+5 8 8 8
+3 8 8 8
+9 9 9 9
+show status like 'cassandra_multi%';
+Variable_name Value
+Cassandra_multiget_reads 2
+Cassandra_multiget_keys_scanned 16
+Cassandra_multiget_rows_read 16
+delete from t1;
+drop table t1;
diff --git a/mysql-test/t/cassandra.test b/mysql-test/t/cassandra.test
index 195f365a372..2d76a4aeb71 100644
--- a/mysql-test/t/cassandra.test
+++ b/mysql-test/t/cassandra.test
@@ -117,6 +117,7 @@ show status like 'cassandra_row_insert%';
CREATE TABLE t1 (rowkey BIGINT PRIMARY KEY, a BIGINT) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2';
+delete from t1;
INSERT INTO t1 VALUES (1,1),(2,2);
DELETE FROM t1 ORDER BY a LIMIT 1;
@@ -127,6 +128,40 @@ show status like 'cassandra_row_insert%';
flush status;
show status like 'cassandra_row_insert%';
+--echo #
+--echo # Batched Key Access
+--echo #
+
+--echo # Control variable (we are not yet able to make use of MRR's buffer)
+show variables like 'cassandra_multi%';
+
+--echo # MRR-related status variables:
+show status like 'cassandra_multi%';
+
+CREATE TABLE t1 (rowkey BIGINT PRIMARY KEY, a BIGINT) ENGINE=CASSANDRA
+ thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2';
+delete from t1;
+INSERT INTO t1 VALUES (0,0),(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9);
+
+set @tmp_jcl=@@join_cache_level;
+set join_cache_level=8;
+explain select * from t1 A, t1 B where B.rowkey=A.a;
+
+select * from t1 A, t1 B where B.rowkey=A.a;
+show status like 'cassandra_multi%';
+
+# The following INSERTs are really UPDATEs
+insert into t1 values(1, 8);
+insert into t1 values(3, 8);
+insert into t1 values(5, 8);
+insert into t1 values(7, 8);
+
+select * from t1 A, t1 B where B.rowkey=A.a;
+show status like 'cassandra_multi%';
+
+
+delete from t1;
+drop table t1;
############################################################################
## Cassandra cleanup
############################################################################
diff --git a/sql/sql_join_cache.cc b/sql/sql_join_cache.cc
index f953cf4df57..d785366ae69 100644
--- a/sql/sql_join_cache.cc
+++ b/sql/sql_join_cache.cc
@@ -3876,8 +3876,11 @@ int JOIN_TAB_SCAN_MRR::next()
If a record in an incremental cache contains no fields then the
association for the last record in cache will be equal to cache->end_pos
*/
+ /*
+      psergey: this makes no sense when HA_MRR_NO_ASSOCIATION is used.
DBUG_ASSERT(cache->buff <= (uchar *) (*ptr) &&
(uchar *) (*ptr) <= cache->end_pos);
+ */
if (join_tab->table->vfield)
update_virtual_fields(join->thd, join_tab->table);
}
@@ -4543,7 +4546,7 @@ bool JOIN_CACHE_BKAH::prepare_look_for_matches(bool skip_last)
{
last_matching_rec_ref_ptr= next_matching_rec_ref_ptr= 0;
if (no_association &&
- (curr_matching_chain= get_matching_chain_by_join_key()))
+ !(curr_matching_chain= get_matching_chain_by_join_key())) //psergey: added '!'
return 1;
last_matching_rec_ref_ptr= get_next_rec_ref(curr_matching_chain);
return 0;
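
To make the one-character fix above easier to follow, here is an illustrative sketch (not part of the patch) of the predicate before and after the change. It assumes get_matching_chain_by_join_key() returns NULL when the probed join key has no hash chain in the join buffer, which is how the surrounding code treats it:

    /* Before the fix: "no matches" (return 1) was reported exactly when a
       chain WAS found, so BKAH joins running in no_association mode
       (HA_MRR_NO_ASSOCIATION) never matched anything, and when no chain was
       found the code fell through and dereferenced the missing chain. */
    static bool skip_matching_before(bool no_association,
                                     const unsigned char *chain)
    {
      return no_association && chain != NULL;
    }

    /* After the fix: "no matches" is reported only when the hash lookup
       finds no chain for the current join key, which is the intended
       behaviour. */
    static bool skip_matching_after(bool no_association,
                                    const unsigned char *chain)
    {
      return no_association && chain == NULL;
    }

The commented-out assertion earlier in the hunk is disabled for the same reason: with HA_MRR_NO_ASSOCIATION the association pointer is not guaranteed to point into the join buffer.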
diff --git a/storage/cassandra/cassandra_se.cc b/storage/cassandra/cassandra_se.cc
index 75ce33cb981..27ff83f7c0d 100644
--- a/storage/cassandra/cassandra_se.cc
+++ b/storage/cassandra/cassandra_se.cc
@@ -100,7 +100,18 @@ public:
/* Setup that's necessary before a multi-row read. (todo: use it before point lookups, too) */
void clear_read_columns();
void add_read_column(const char *name);
-
+
+ /* Reads, MRR scans */
+ void new_lookup_keys();
+ int add_lookup_key(const char *key, size_t key_len);
+ bool multiget_slice();
+
+ std::vector<std::string> mrr_keys; /* TODO: can we use allocator to put them onto MRR buffer? */
+ std::map<std::string, std::vector<ColumnOrSuperColumn> > mrr_result;
+ std::map<std::string, std::vector<ColumnOrSuperColumn> >::iterator mrr_result_it;
+
+ bool get_next_multiget_row();
+
bool truncate();
bool remove_row();
@@ -522,3 +533,72 @@ bool Cassandra_se_impl::remove_row()
return res;
}
+
+/////////////////////////////////////////////////////////////////////////////
+// MRR reads
+/////////////////////////////////////////////////////////////////////////////
+
+void Cassandra_se_impl::new_lookup_keys()
+{
+ mrr_keys.clear();
+}
+
+
+int Cassandra_se_impl::add_lookup_key(const char *key, size_t key_len)
+{
+ mrr_keys.push_back(std::string(key, key_len));
+ return mrr_keys.size();
+}
+
+
+bool Cassandra_se_impl::multiget_slice()
+{
+ ColumnParent cparent;
+ cparent.column_family= column_family;
+
+ SlicePredicate slice_pred;
+ SliceRange sr;
+ sr.start = "";
+ sr.finish = "";
+ slice_pred.__set_slice_range(sr);
+
+ bool res= true;
+
+ try {
+
+ cassandra_counters.multiget_reads++;
+ cassandra_counters.multiget_keys_scanned += mrr_keys.size();
+
+ cass->multiget_slice(mrr_result, mrr_keys, cparent, slice_pred,
+ cur_consistency_level);
+
+ cassandra_counters.multiget_rows_read += mrr_result.size();
+
+ res= false;
+ mrr_result_it= mrr_result.begin();
+
+ } catch (InvalidRequestException ire) {
+ print_error("%s [%s]", ire.what(), ire.why.c_str());
+ } catch (UnavailableException ue) {
+ print_error("UnavailableException: %s", ue.what());
+ } catch (TimedOutException te) {
+ print_error("TimedOutException: %s", te.what());
+ }
+
+ return res;
+}
+
+
+bool Cassandra_se_impl::get_next_multiget_row()
+{
+ if (mrr_result_it == mrr_result.end())
+ return true; /* EOF */
+
+ column_data_vec= mrr_result_it->second;
+ rowkey= mrr_result_it->first;
+
+ column_data_it= column_data_vec.begin();
+ mrr_result_it++;
+ return false;
+}
+
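For reference, a minimal sketch of how a caller is expected to drive the batched-lookup API added above. It is illustrative only: the interface name Cassandra_se_interface and the helper fetch_batch() are assumptions made for this sketch, and error handling is reduced to the return conventions visible in the code (multiget_slice() returns true on failure, get_next_multiget_row() returns true once the batch is exhausted):

    #include <string>
    #include <vector>
    #include "cassandra_se.h"

    static bool fetch_batch(Cassandra_se_interface *se,
                            const std::vector<std::string> &keys)
    {
      se->new_lookup_keys();                      /* start an empty key batch */
      for (size_t i= 0; i < keys.size(); i++)
        se->add_lookup_key(keys[i].data(), keys[i].size()); /* returns batch size */

      if (se->multiget_slice())                   /* one multiget_slice() RPC */
        return true;                              /* Thrift-level error */

      while (!se->get_next_multiget_row())
      {
        /* Each successful call leaves the row key and column iterator
           positioned on the next returned row; the handler would unpack it
           into table->record[0] at this point. */
      }
      return false;                               /* success */
    }

In the handler itself this sequence is split across multi_range_read_init()/mrr_start_read() and multi_range_read_next(), as the ha_cassandra.cc hunks below show.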
diff --git a/storage/cassandra/cassandra_se.h b/storage/cassandra/cassandra_se.h
index 78bf1016fea..d2ece4d9441 100644
--- a/storage/cassandra/cassandra_se.h
+++ b/storage/cassandra/cassandra_se.h
@@ -41,11 +41,16 @@ public:
/* Reads, multi-row scans */
int read_batch_size;
-
virtual bool get_range_slices(bool last_key_as_start_key)=0;
virtual void finish_reading_range_slices()=0;
virtual bool get_next_range_slice_row(bool *eof)=0;
+ /* Reads, MRR scans */
+ virtual void new_lookup_keys()=0;
+ virtual int add_lookup_key(const char *key, size_t key_len)=0;
+ virtual bool multiget_slice()=0;
+ virtual bool get_next_multiget_row()=0;
+
/* read_set setup */
virtual void clear_read_columns()=0;
virtual void add_read_column(const char *name)=0;
@@ -59,13 +64,20 @@ public:
void print_error(const char *format, ...);
};
+
/* A structure with global counters */
class Cassandra_status_vars
{
public:
ulong row_inserts;
ulong row_insert_batches;
+
+ ulong multiget_reads;
+ ulong multiget_keys_scanned;
+ ulong multiget_rows_read;
};
+
+
extern Cassandra_status_vars cassandra_counters;
diff --git a/storage/cassandra/ha_cassandra.cc b/storage/cassandra/ha_cassandra.cc
index 70d389d4992..9e94c848988 100644
--- a/storage/cassandra/ha_cassandra.cc
+++ b/storage/cassandra/ha_cassandra.cc
@@ -60,15 +60,35 @@ static MYSQL_THDVAR_ULONG(insert_batch_size, PLUGIN_VAR_RQCMDARG,
"Number of rows in an INSERT batch",
NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
+static MYSQL_THDVAR_ULONG(multiget_batch_size, PLUGIN_VAR_RQCMDARG,
+  "Number of rows in a multiget (MRR) batch",
+ NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
static struct st_mysql_sys_var* cassandra_system_variables[]= {
MYSQL_SYSVAR(insert_batch_size),
+ MYSQL_SYSVAR(multiget_batch_size),
// MYSQL_SYSVAR(enum_var),
// MYSQL_SYSVAR(ulong_var),
NULL
};
+static SHOW_VAR cassandra_status_variables[]= {
+ {"row_inserts",
+ (char*) &cassandra_counters.row_inserts, SHOW_LONG},
+ {"row_insert_batches",
+ (char*) &cassandra_counters.row_insert_batches, SHOW_LONG},
+
+ {"multiget_reads",
+ (char*) &cassandra_counters.multiget_reads, SHOW_LONG},
+ {"multiget_keys_scanned",
+ (char*) &cassandra_counters.multiget_keys_scanned, SHOW_LONG},
+ {"multiget_rows_read",
+ (char*) &cassandra_counters.multiget_rows_read, SHOW_LONG},
+ {NullS, NullS, SHOW_LONG}
+};
+
+
Cassandra_status_vars cassandra_counters;
Cassandra_status_vars cassandra_counters_copy;
@@ -772,8 +792,7 @@ int ha_cassandra::write_row(uchar *buf)
if (doing_insert_batch)
{
res= 0;
- if (++insert_rows_batched >= /*insert_batch_size*/
- THDVAR(table->in_use, insert_batch_size))
+ if (++insert_rows_batched >= THDVAR(table->in_use, insert_batch_size))
{
res= se->do_insert();
insert_rows_batched= 0;
@@ -955,6 +974,135 @@ int ha_cassandra::reset()
return 0;
}
+/////////////////////////////////////////////////////////////////////////////
+// MRR implementation
+/////////////////////////////////////////////////////////////////////////////
+
+
+/*
+ - The key can be only primary key
+ - allow equality-ranges only.
+ - anything else?
+*/
+ha_rows ha_cassandra::multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq,
+ void *seq_init_param,
+ uint n_ranges, uint *bufsz,
+ uint *flags, COST_VECT *cost)
+{
+ /* No support for const ranges so far */
+ return HA_POS_ERROR;
+}
+
+
+ha_rows ha_cassandra::multi_range_read_info(uint keyno, uint n_ranges, uint keys,
+ uint key_parts, uint *bufsz,
+ uint *flags, COST_VECT *cost)
+{
+ /* Can only be equality lookups on the primary key... */
+ // TODO anything else?
+ *flags &= ~HA_MRR_USE_DEFAULT_IMPL;
+ *flags |= HA_MRR_NO_ASSOCIATION;
+
+ return 10;
+}
+
+
+int ha_cassandra::multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param,
+ uint n_ranges, uint mode, HANDLER_BUFFER *buf)
+{
+ int res;
+ mrr_iter= seq->init(seq_init_param, n_ranges, mode);
+ mrr_funcs= *seq;
+ res= mrr_start_read();
+ return (res? HA_ERR_INTERNAL_ERROR: 0);
+}
+
+
+bool ha_cassandra::mrr_start_read()
+{
+ uint key_len;
+
+ my_bitmap_map *old_map;
+ old_map= dbug_tmp_use_all_columns(table, table->read_set);
+
+ se->new_lookup_keys();
+
+ while (!(source_exhausted= mrr_funcs.next(mrr_iter, &mrr_cur_range)))
+ {
+ char *cass_key;
+ int cass_key_len;
+
+ DBUG_ASSERT(mrr_cur_range.range_flag & EQ_RANGE);
+
+ uchar *key= (uchar*)mrr_cur_range.start_key.key;
+ key_len= mrr_cur_range.start_key.length;
+ //key_len= calculate_key_len(table, active_index, key, keypart_map); // NEED THIS??
+ store_key_image_to_rec(table->field[0], (uchar*)key, key_len);
+
+ rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len);
+
+ // Primitive buffer control
+ if (se->add_lookup_key(cass_key, cass_key_len) >
+ THDVAR(table->in_use, multiget_batch_size))
+ break;
+ }
+
+ dbug_tmp_restore_column_map(table->read_set, old_map);
+
+ return se->multiget_slice();
+}
+
+
+int ha_cassandra::multi_range_read_next(range_id_t *range_info)
+{
+ int res;
+ while(1)
+ {
+ if (!se->get_next_multiget_row())
+ {
+ read_cassandra_columns(true);
+ res= 0;
+ break;
+ }
+ else
+ {
+ if (source_exhausted)
+ {
+ res= HA_ERR_END_OF_FILE;
+ break;
+ }
+ else
+ {
+ if (mrr_start_read())
+ {
+ res= HA_ERR_INTERNAL_ERROR;
+ break;
+ }
+ }
+ }
+ /*
+ We get here if we've refilled the buffer and done another read. Try
+ reading from results again
+ */
+ }
+ return res;
+}
+
+
+int ha_cassandra::multi_range_read_explain_info(uint mrr_mode, char *str, size_t size)
+{
+ const char *mrr_str= "multiget_slice";
+
+ if (!(mrr_mode & HA_MRR_USE_DEFAULT_IMPL))
+ {
+ uint mrr_str_len= strlen(mrr_str);
+ uint copy_len= min(mrr_str_len, size);
+    memcpy(str, mrr_str, copy_len);
+ return copy_len;
+ }
+ return 0;
+}
+
/////////////////////////////////////////////////////////////////////////////
// Dummy implementations start
@@ -1073,15 +1221,6 @@ bool ha_cassandra::check_if_incompatible_data(HA_CREATE_INFO *info,
// Dummy implementations end
/////////////////////////////////////////////////////////////////////////////
-static SHOW_VAR cassandra_status_variables[]= {
- {"row_inserts",
- (char*) &cassandra_counters.row_inserts, SHOW_LONG},
- {"row_insert_batches",
- (char*) &cassandra_counters.row_insert_batches, SHOW_LONG},
- {NullS, NullS, SHOW_LONG}
-};
-
-
static int show_cassandra_vars(THD *thd, SHOW_VAR *var, char *buff)
{
//innodb_export_status();
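
The SHOW_VAR array with the new multiget counters now lives near the system-variable definitions at the top of the file (the deleted block above is its old location). The hunk shows only the first line of show_cassandra_vars(); the following is a hedged sketch of the conventional SHOW_FUNC glue such an array is exported through, with the array name func_status assumed for illustration:

    /* Sketch only, not the hunk's actual body.  A top-level SHOW_FUNC entry
       named "Cassandra" is what gives the variables their "Cassandra_..."
       prefix, matching the names seen in the test results above. */
    static int show_cassandra_vars(THD *thd, SHOW_VAR *var, char *buff)
    {
      var->type= SHOW_ARRAY;
      var->value= (char *) &cassandra_status_variables;
      return 0;
    }

    static SHOW_VAR func_status[]=
    {
      {"Cassandra", (char *) show_cassandra_vars, SHOW_FUNC},
      {NullS, NullS, SHOW_LONG}
    };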
diff --git a/storage/cassandra/ha_cassandra.h b/storage/cassandra/ha_cassandra.h
index c8042cfb14a..66e79f4ee70 100644
--- a/storage/cassandra/ha_cassandra.h
+++ b/storage/cassandra/ha_cassandra.h
@@ -154,6 +154,24 @@ public:
virtual int reset();
+
+ int multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param,
+ uint n_ranges, uint mode, HANDLER_BUFFER *buf);
+ int multi_range_read_next(range_id_t *range_info);
+ ha_rows multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq,
+ void *seq_init_param,
+ uint n_ranges, uint *bufsz,
+ uint *flags, COST_VECT *cost);
+ ha_rows multi_range_read_info(uint keyno, uint n_ranges, uint keys,
+ uint key_parts, uint *bufsz,
+ uint *flags, COST_VECT *cost);
+ int multi_range_read_explain_info(uint mrr_mode, char *str, size_t size);
+
+private:
+ bool source_exhausted;
+ bool mrr_start_read();
+public:
+
/*
Everything below are methods that we implement in ha_example.cc.