summary refs log tree commit diff
path: root/storage/cassandra
diff options
context:
space:
mode:
Diffstat (limited to 'storage/cassandra')
-rw-r--r-- storage/cassandra/cassandra_se.cc 338
-rw-r--r-- storage/cassandra/cassandra_se.h 3
-rw-r--r-- storage/cassandra/ha_cassandra.cc 12
3 files changed, 187 insertions, 166 deletions
diff --git a/storage/cassandra/cassandra_se.cc b/storage/cassandra/cassandra_se.cc
index 3be81c5e6bf..7a825a9fc00 100644
--- a/storage/cassandra/cassandra_se.cc
+++ b/storage/cassandra/cassandra_se.cc
@@ -45,6 +45,9 @@ class Cassandra_se_impl: public Cassandra_se_interface
ConsistencyLevel::type write_consistency;
ConsistencyLevel::type read_consistency;
+
+ /* How many times to retry an operation before giving up */
+ int thrift_call_retries_to_do;
/* DDL data */
@@ -72,10 +75,12 @@ class Cassandra_se_impl: public Cassandra_se_interface
SlicePredicate slice_pred;
bool get_slices_returned_less;
+ bool get_slice_found_rows;
public:
Cassandra_se_impl() : cass(NULL),
write_consistency(ConsistencyLevel::ONE),
- read_consistency(ConsistencyLevel::ONE) {}
+ read_consistency(ConsistencyLevel::ONE),
+ thrift_call_retries_to_do(0) {}
virtual ~Cassandra_se_impl(){ delete cass; }
/* Connection and DDL checks */
@@ -94,6 +99,7 @@ public:
void clear_insert_buffer();
void start_row_insert(const char *key, int key_len);
void add_insert_column(const char *name, const char *value, int value_len);
+
bool do_insert();
/* Reads, point lookups */
@@ -105,6 +111,8 @@ public:
private:
bool have_rowkey_to_skip;
std::string rowkey_to_skip;
+
+ bool get_range_slices_param_last_key_as_start_key;
public:
bool get_range_slices(bool last_key_as_start_key);
void finish_reading_range_slices();
@@ -119,19 +127,30 @@ public:
int add_lookup_key(const char *key, size_t key_len);
bool multiget_slice();
-private:
- std::vector<std::string> mrr_keys; /* TODO: can we use allocator to put them onto MRR buffer? */
- std::map<std::string, std::vector<ColumnOrSuperColumn> > mrr_result;
- std::map<std::string, std::vector<ColumnOrSuperColumn> >::iterator mrr_result_it;
-public:
bool get_next_multiget_row();
bool truncate();
+
bool remove_row();
private:
+ bool retryable_truncate();
+ bool retryable_do_insert();
+ bool retryable_remove_row();
+ bool retryable_setup_ddl_checks();
+ bool retryable_multiget_slice();
+ bool retryable_get_range_slices();
+ bool retryable_get_slice();
+
+ std::vector<std::string> mrr_keys; /* can we use allocator to put these into MRR buffer? */
+ std::map<std::string, std::vector<ColumnOrSuperColumn> > mrr_result;
+ std::map<std::string, std::vector<ColumnOrSuperColumn> >::iterator mrr_result_it;
+
/* Non-inherited utility functions: */
int64_t get_i64_timestamp();
+
+ typedef bool (Cassandra_se_impl::*retryable_func_t)();
+ bool try_operation(retryable_func_t func);
};
@@ -189,34 +208,36 @@ void Cassandra_se_impl::set_consistency_levels(ulong read_cons_level,
}
-bool Cassandra_se_impl::setup_ddl_checks()
+bool Cassandra_se_impl::retryable_setup_ddl_checks()
{
try {
- cass->describe_keyspace(ks_def, keyspace);
- std::vector<CfDef>::iterator it;
- for (it= ks_def.cf_defs.begin(); it < ks_def.cf_defs.end(); it++)
- {
- cf_def= *it;
- if (!cf_def.name.compare(column_family))
- return false;
- }
-
- print_error("describe_keyspace() didn't return our column family");
-
- } catch (InvalidRequestException ire) {
- print_error("%s [%s]", ire.what(), ire.why.c_str());
+ cass->describe_keyspace(ks_def, keyspace);
+
} catch (NotFoundException nfe) {
- print_error("keyspace not found: %s", nfe.what());
- }catch(TException e){
- print_error("Thrift exception: %s", e.what());
- } catch (...) {
- print_error("Unknown exception");
+ print_error("keyspace `%s` not found: %s", keyspace.c_str(), nfe.what());
+ return true;
+ }
+
+ std::vector<CfDef>::iterator it;
+ for (it= ks_def.cf_defs.begin(); it < ks_def.cf_defs.end(); it++)
+ {
+ cf_def= *it;
+ if (!cf_def.name.compare(column_family))
+ return false;
}
+ print_error("Column family %s not found in keyspace %s",
+ column_family.c_str(),
+ keyspace.c_str());
return true;
}
+bool Cassandra_se_impl::setup_ddl_checks()
+{
+ return try_operation(&Cassandra_se_impl::retryable_setup_ddl_checks);
+}
+
void Cassandra_se_impl::first_ddl_column()
{
@@ -309,41 +330,29 @@ void Cassandra_se_impl::add_insert_column(const char *name, const char *value,
}
+bool Cassandra_se_impl::retryable_do_insert()
+{
+ cass->batch_mutate(batch_mutation, write_consistency);
+
+ cassandra_counters.row_inserts+= batch_mutation.size();
+ cassandra_counters.row_insert_batches++;
+
+ clear_insert_buffer();
+ return 0;
+}
+
+
bool Cassandra_se_impl::do_insert()
{
- bool res= true;
-
/*
- zero-size mutations are allowed by Cassandra's batch_mutate but lets not
+ zero-size mutations are allowed by Cassandra's batch_mutate but lets not
do them (we may attempt to do it if there is a bulk insert that stores
exactly @@cassandra_insert_batch_size*n elements.
*/
if (batch_mutation.empty())
return false;
-
- try {
-
- cass->batch_mutate(batch_mutation, write_consistency);
-
- cassandra_counters.row_inserts+= batch_mutation.size();
- cassandra_counters.row_insert_batches++;
-
- clear_insert_buffer();
- res= false;
-
- } catch (InvalidRequestException ire) {
- print_error("%s [%s]", ire.what(), ire.why.c_str());
- } catch (UnavailableException ue) {
- print_error("UnavailableException: %s", ue.what());
- } catch (TimedOutException te) {
- print_error("TimedOutException: %s", te.what());
- }catch(TException e){
- print_error("Thrift exception: %s", e.what());
- } catch (...) {
- print_error("Unknown exception");
- }
-
- return res;
+
+ return try_operation(&Cassandra_se_impl::retryable_do_insert);
}
@@ -358,47 +367,39 @@ bool Cassandra_se_impl::do_insert()
bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found)
{
+ bool res;
+ rowkey.assign(key, key_len);
+
+ if (!(res= try_operation(&Cassandra_se_impl::retryable_get_slice)))
+ *found= get_slice_found_rows;
+ return res;
+}
+
+
+bool Cassandra_se_impl::retryable_get_slice()
+{
ColumnParent cparent;
cparent.column_family= column_family;
- rowkey.assign(key, key_len);
-
SlicePredicate slice_pred;
SliceRange sr;
sr.start = "";
sr.finish = "";
slice_pred.__set_slice_range(sr);
- try {
- cass->get_slice(column_data_vec, rowkey, cparent, slice_pred,
- read_consistency);
+ cass->get_slice(column_data_vec, rowkey, cparent, slice_pred,
+ read_consistency);
- if (column_data_vec.size() == 0)
- {
- /*
- No columns found. Cassandra doesn't allow records without any column =>
- this means the seach key doesn't exist
- */
- *found= false;
- return false;
- }
- *found= true;
-
- } catch (InvalidRequestException ire) {
- print_error("%s [%s]", ire.what(), ire.why.c_str());
- return true;
- } catch (UnavailableException ue) {
- print_error("UnavailableException: %s", ue.what());
- return true;
- } catch (TimedOutException te) {
- print_error("TimedOutException: %s", te.what());
- return true;
- }catch(TException e){
- print_error("Thrift exception: %s", e.what());
- } catch (...) {
- print_error("Unknown exception");
- return true;
+ if (column_data_vec.size() == 0)
+ {
+ /*
+ No columns found. Cassandra doesn't allow records without any column =>
+ this means the seach key doesn't exist
+ */
+ get_slice_found_rows= false;
+ return false;
}
+ get_slice_found_rows= true;
column_data_it= column_data_vec.begin();
return false;
@@ -456,7 +457,15 @@ void Cassandra_se_impl::get_read_rowkey(char **value, int *value_len)
bool Cassandra_se_impl::get_range_slices(bool last_key_as_start_key)
{
- bool res= true;
+ get_range_slices_param_last_key_as_start_key= last_key_as_start_key;
+
+ return try_operation(&Cassandra_se_impl::retryable_get_range_slices);
+}
+
+
+bool Cassandra_se_impl::retryable_get_range_slices()
+{
+ bool last_key_as_start_key= get_range_slices_param_last_key_as_start_key;
ColumnParent cparent;
cparent.column_family= column_family;
@@ -482,32 +491,17 @@ bool Cassandra_se_impl::get_range_slices(bool last_key_as_start_key)
key_range.end_key.assign("", 0);
key_range.count= read_batch_size;
- try {
-
- cass->get_range_slices(key_slice_vec,
- cparent, slice_pred, key_range,
- read_consistency);
- res= false;
- if (key_slice_vec.size() < (uint)read_batch_size)
- get_slices_returned_less= true;
- else
- get_slices_returned_less= false;
+ cass->get_range_slices(key_slice_vec, cparent, slice_pred, key_range,
+ read_consistency);
- } catch (InvalidRequestException ire) {
- print_error("%s [%s]", ire.what(), ire.why.c_str());
- } catch (UnavailableException ue) {
- print_error("UnavailableException: %s", ue.what());
- } catch (TimedOutException te) {
- print_error("TimedOutException: %s", te.what());
- }catch(TException e){
- print_error("Thrift exception: %s", e.what());
- } catch (...) {
- print_error("Unknown exception");
- }
+ if (key_slice_vec.size() < (uint)read_batch_size)
+ get_slices_returned_less= true;
+ else
+ get_slices_returned_less= false;
key_slice_it= key_slice_vec.begin();
- return res;
+ return false;
}
@@ -574,50 +568,78 @@ void Cassandra_se_impl::add_read_column(const char *name_arg)
bool Cassandra_se_impl::truncate()
{
- bool res= true;
- try {
-
- cass->truncate(column_family);
- res= false;
+ return try_operation(&Cassandra_se_impl::retryable_truncate);
+}
- } catch (InvalidRequestException ire) {
- print_error("%s [%s]", ire.what(), ire.why.c_str());
- } catch (UnavailableException ue) {
- print_error("UnavailableException: %s", ue.what());
- } catch (TimedOutException te) {
- print_error("TimedOutException: %s", te.what());
- }catch(TException e){
- print_error("Thrift exception: %s", e.what());
- } catch (...) {
- print_error("Unknown exception");
- }
- return res;
+bool Cassandra_se_impl::retryable_truncate()
+{
+ cass->truncate(column_family);
+ return 0;
}
+
bool Cassandra_se_impl::remove_row()
{
- bool res= true;
+ return try_operation(&Cassandra_se_impl::retryable_remove_row);
+}
+
+bool Cassandra_se_impl::retryable_remove_row()
+{
ColumnPath column_path;
column_path.column_family= column_family;
+ cass->remove(rowkey, column_path, get_i64_timestamp(), write_consistency);
+ return 0;
+}
- try {
-
- cass->remove(rowkey, column_path, get_i64_timestamp(), write_consistency);
- res= false;
+/*
+ This function will try a Cassandra operation, and handle errors.
- } catch (InvalidRequestException ire) {
- print_error("%s [%s]", ire.what(), ire.why.c_str());
- } catch (UnavailableException ue) {
- print_error("UnavailableException: %s", ue.what());
- } catch (TimedOutException te) {
- print_error("TimedOutException: %s", te.what());
- }catch(TException e){
- print_error("Thrift exception: %s", e.what());
- } catch (...) {
- print_error("Unknown exception");
- }
+*/
+bool Cassandra_se_impl::try_operation(retryable_func_t func_to_call)
+{
+ bool res;
+ int n_retries= thrift_call_retries_to_do;
+
+ do
+ {
+ res= true;
+
+ try {
+
+ if ((res= (this->*func_to_call)()))
+ {
+ /*
+ The function call was made successfully (without timeouts, etc),
+ but something inside it returned 'true'.
+ This is supposedly a failure (or "not found" or other negative
+ result). We need to return this to the caller.
+ */
+ n_retries= 0;
+ }
+
+ } catch (InvalidRequestException ire) {
+ n_retries= 0; /* there is no point in retrying this operation */
+ print_error("%s [%s]", ire.what(), ire.why.c_str());
+ } catch (UnavailableException ue) {
+ cassandra_counters.unavailable_exceptions++;
+ if (!--n_retries)
+ print_error("UnavailableException: %s", ue.what());
+ } catch (TimedOutException te) {
+ cassandra_counters.timeout_exceptions++;
+ if (!--n_retries)
+ print_error("TimedOutException: %s", te.what());
+ }catch(TException e){
+ /* todo: we may use retry for certain kinds of Thrift errors */
+ n_retries= 0;
+ print_error("Thrift exception: %s", e.what());
+ } catch (...) {
+ n_retries= 0; /* Don't retry */
+ print_error("Unknown exception");
+ }
+
+ } while (res && n_retries > 0);
return res;
}
@@ -638,9 +660,14 @@ int Cassandra_se_impl::add_lookup_key(const char *key, size_t key_len)
return mrr_keys.size();
}
-
bool Cassandra_se_impl::multiget_slice()
{
+ return try_operation(&Cassandra_se_impl::retryable_multiget_slice);
+}
+
+
+bool Cassandra_se_impl::retryable_multiget_slice()
+{
ColumnParent cparent;
cparent.column_family= column_family;
@@ -650,34 +677,15 @@ bool Cassandra_se_impl::multiget_slice()
sr.finish = "";
slice_pred.__set_slice_range(sr);
- bool res= true;
-
- try {
-
- cassandra_counters.multiget_reads++;
- cassandra_counters.multiget_keys_scanned += mrr_keys.size();
-
- cass->multiget_slice(mrr_result, mrr_keys, cparent, slice_pred,
- read_consistency);
-
- cassandra_counters.multiget_rows_read += mrr_result.size();
-
- res= false;
- mrr_result_it= mrr_result.begin();
+ cassandra_counters.multiget_reads++;
+ cassandra_counters.multiget_keys_scanned += mrr_keys.size();
+ cass->multiget_slice(mrr_result, mrr_keys, cparent, slice_pred,
+ read_consistency);
- } catch (InvalidRequestException ire) {
- print_error("%s [%s]", ire.what(), ire.why.c_str());
- } catch (UnavailableException ue) {
- print_error("UnavailableException: %s", ue.what());
- } catch (TimedOutException te) {
- print_error("TimedOutException: %s", te.what());
- }catch(TException e){
- print_error("Thrift exception: %s", e.what());
- } catch (...) {
- print_error("Unknown exception");
- }
+ cassandra_counters.multiget_rows_read += mrr_result.size();
+ mrr_result_it= mrr_result.begin();
- return res;
+ return false;
}
diff --git a/storage/cassandra/cassandra_se.h b/storage/cassandra/cassandra_se.h
index 33ef677d276..c6de779f8bc 100644
--- a/storage/cassandra/cassandra_se.h
+++ b/storage/cassandra/cassandra_se.h
@@ -91,6 +91,9 @@ public:
ulong multiget_reads;
ulong multiget_keys_scanned;
ulong multiget_rows_read;
+
+ ulong timeout_exceptions;
+ ulong unavailable_exceptions;
};
diff --git a/storage/cassandra/ha_cassandra.cc b/storage/cassandra/ha_cassandra.cc
index 18a4a8c2728..187df2a2dd8 100644
--- a/storage/cassandra/ha_cassandra.cc
+++ b/storage/cassandra/ha_cassandra.cc
@@ -82,6 +82,11 @@ static MYSQL_THDVAR_ULONG(rnd_batch_size, PLUGIN_VAR_RQCMDARG,
"Number of rows in an rnd_read (full scan) batch",
NULL, NULL, /*default*/ 10*1000, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
+static MYSQL_THDVAR_ULONG(failure_retries, PLUGIN_VAR_RQCMDARG,
+ "Number of times to retry Cassandra calls that failed due to timeouts or "
+ "network communication problems. The default, 0, means not to retry.",
+ NULL, NULL, /*default*/ 0, /*min*/ 0, /*max*/ 1024*1024*1024, 0);
+
/* These match values in enum_cassandra_consistency_level */
const char *cassandra_consistency_level[] =
{
@@ -161,6 +166,7 @@ static struct st_mysql_sys_var* cassandra_system_variables[]= {
MYSQL_SYSVAR(default_thrift_host),
MYSQL_SYSVAR(write_consistency),
MYSQL_SYSVAR(read_consistency),
+ MYSQL_SYSVAR(failure_retries),
NULL
};
@@ -177,6 +183,11 @@ static SHOW_VAR cassandra_status_variables[]= {
(char*) &cassandra_counters.multiget_keys_scanned, SHOW_LONG},
{"multiget_rows_read",
(char*) &cassandra_counters.multiget_rows_read, SHOW_LONG},
+
+ {"timeout_exceptions",
+ (char*) &cassandra_counters.timeout_exceptions, SHOW_LONG},
+ {"unavailable_exceptions",
+ (char*) &cassandra_counters.unavailable_exceptions, SHOW_LONG},
{NullS, NullS, SHOW_LONG}
};
@@ -1678,7 +1689,6 @@ bool ha_cassandra::check_if_incompatible_data(HA_CREATE_INFO *info,
static int show_cassandra_vars(THD *thd, SHOW_VAR *var, char *buff)
{
- //innodb_export_status();
cassandra_counters_copy= cassandra_counters;
var->type= SHOW_ARRAY;