diff options
author | Sergey Petrunya <psergey@askmonty.org> | 2012-09-26 14:13:03 +0400 |
---|---|---|
committer | Sergey Petrunya <psergey@askmonty.org> | 2012-09-26 14:13:03 +0400 |
commit | 344c0ea4232a4954a6eb83195bb78cace03bd4c4 (patch) | |
tree | f6bdcabff1b29bf63c588c99d65adb43715246c3 /storage/cassandra | |
parent | 73dfd5782bf2ec845dc5490de22d9ef8ea9f7326 (diff) | |
download | mariadb-git-344c0ea4232a4954a6eb83195bb78cace03bd4c4.tar.gz |
Cassandra SE: Add capability to retry failed API calls
- Add capability to retry calls that have failed with UnavailableException or
[Cassandra's] TimedOutException.
- We don't retry on Thrift errors yet, although we could easily do so now.
Diffstat (limited to 'storage/cassandra')
-rw-r--r-- | storage/cassandra/cassandra_se.cc | 338 | ||||
-rw-r--r-- | storage/cassandra/cassandra_se.h | 3 | ||||
-rw-r--r-- | storage/cassandra/ha_cassandra.cc | 12 |
3 files changed, 187 insertions, 166 deletions
diff --git a/storage/cassandra/cassandra_se.cc b/storage/cassandra/cassandra_se.cc index 3be81c5e6bf..7a825a9fc00 100644 --- a/storage/cassandra/cassandra_se.cc +++ b/storage/cassandra/cassandra_se.cc @@ -45,6 +45,9 @@ class Cassandra_se_impl: public Cassandra_se_interface ConsistencyLevel::type write_consistency; ConsistencyLevel::type read_consistency; + + /* How many times to retry an operation before giving up */ + int thrift_call_retries_to_do; /* DDL data */ @@ -72,10 +75,12 @@ class Cassandra_se_impl: public Cassandra_se_interface SlicePredicate slice_pred; bool get_slices_returned_less; + bool get_slice_found_rows; public: Cassandra_se_impl() : cass(NULL), write_consistency(ConsistencyLevel::ONE), - read_consistency(ConsistencyLevel::ONE) {} + read_consistency(ConsistencyLevel::ONE), + thrift_call_retries_to_do(0) {} virtual ~Cassandra_se_impl(){ delete cass; } /* Connection and DDL checks */ @@ -94,6 +99,7 @@ public: void clear_insert_buffer(); void start_row_insert(const char *key, int key_len); void add_insert_column(const char *name, const char *value, int value_len); + bool do_insert(); /* Reads, point lookups */ @@ -105,6 +111,8 @@ public: private: bool have_rowkey_to_skip; std::string rowkey_to_skip; + + bool get_range_slices_param_last_key_as_start_key; public: bool get_range_slices(bool last_key_as_start_key); void finish_reading_range_slices(); @@ -119,19 +127,30 @@ public: int add_lookup_key(const char *key, size_t key_len); bool multiget_slice(); -private: - std::vector<std::string> mrr_keys; /* TODO: can we use allocator to put them onto MRR buffer? 
*/ - std::map<std::string, std::vector<ColumnOrSuperColumn> > mrr_result; - std::map<std::string, std::vector<ColumnOrSuperColumn> >::iterator mrr_result_it; -public: bool get_next_multiget_row(); bool truncate(); + bool remove_row(); private: + bool retryable_truncate(); + bool retryable_do_insert(); + bool retryable_remove_row(); + bool retryable_setup_ddl_checks(); + bool retryable_multiget_slice(); + bool retryable_get_range_slices(); + bool retryable_get_slice(); + + std::vector<std::string> mrr_keys; /* can we use allocator to put these into MRR buffer? */ + std::map<std::string, std::vector<ColumnOrSuperColumn> > mrr_result; + std::map<std::string, std::vector<ColumnOrSuperColumn> >::iterator mrr_result_it; + /* Non-inherited utility functions: */ int64_t get_i64_timestamp(); + + typedef bool (Cassandra_se_impl::*retryable_func_t)(); + bool try_operation(retryable_func_t func); }; @@ -189,34 +208,36 @@ void Cassandra_se_impl::set_consistency_levels(ulong read_cons_level, } -bool Cassandra_se_impl::setup_ddl_checks() +bool Cassandra_se_impl::retryable_setup_ddl_checks() { try { - cass->describe_keyspace(ks_def, keyspace); - std::vector<CfDef>::iterator it; - for (it= ks_def.cf_defs.begin(); it < ks_def.cf_defs.end(); it++) - { - cf_def= *it; - if (!cf_def.name.compare(column_family)) - return false; - } - - print_error("describe_keyspace() didn't return our column family"); - - } catch (InvalidRequestException ire) { - print_error("%s [%s]", ire.what(), ire.why.c_str()); + cass->describe_keyspace(ks_def, keyspace); + } catch (NotFoundException nfe) { - print_error("keyspace not found: %s", nfe.what()); - }catch(TException e){ - print_error("Thrift exception: %s", e.what()); - } catch (...) 
{ - print_error("Unknown exception"); + print_error("keyspace `%s` not found: %s", keyspace.c_str(), nfe.what()); + return true; + } + + std::vector<CfDef>::iterator it; + for (it= ks_def.cf_defs.begin(); it < ks_def.cf_defs.end(); it++) + { + cf_def= *it; + if (!cf_def.name.compare(column_family)) + return false; } + print_error("Column family %s not found in keyspace %s", + column_family.c_str(), + keyspace.c_str()); return true; } +bool Cassandra_se_impl::setup_ddl_checks() +{ + return try_operation(&Cassandra_se_impl::retryable_setup_ddl_checks); +} + void Cassandra_se_impl::first_ddl_column() { @@ -309,41 +330,29 @@ void Cassandra_se_impl::add_insert_column(const char *name, const char *value, } +bool Cassandra_se_impl::retryable_do_insert() +{ + cass->batch_mutate(batch_mutation, write_consistency); + + cassandra_counters.row_inserts+= batch_mutation.size(); + cassandra_counters.row_insert_batches++; + + clear_insert_buffer(); + return 0; +} + + bool Cassandra_se_impl::do_insert() { - bool res= true; - /* - zero-size mutations are allowed by Cassandra's batch_mutate but lets not + zero-size mutations are allowed by Cassandra's batch_mutate but lets not do them (we may attempt to do it if there is a bulk insert that stores exactly @@cassandra_insert_batch_size*n elements. */ if (batch_mutation.empty()) return false; - - try { - - cass->batch_mutate(batch_mutation, write_consistency); - - cassandra_counters.row_inserts+= batch_mutation.size(); - cassandra_counters.row_insert_batches++; - - clear_insert_buffer(); - res= false; - - } catch (InvalidRequestException ire) { - print_error("%s [%s]", ire.what(), ire.why.c_str()); - } catch (UnavailableException ue) { - print_error("UnavailableException: %s", ue.what()); - } catch (TimedOutException te) { - print_error("TimedOutException: %s", te.what()); - }catch(TException e){ - print_error("Thrift exception: %s", e.what()); - } catch (...) 
{ - print_error("Unknown exception"); - } - - return res; + + return try_operation(&Cassandra_se_impl::retryable_do_insert); } @@ -358,47 +367,39 @@ bool Cassandra_se_impl::do_insert() bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found) { + bool res; + rowkey.assign(key, key_len); + + if (!(res= try_operation(&Cassandra_se_impl::retryable_get_slice))) + *found= get_slice_found_rows; + return res; +} + + +bool Cassandra_se_impl::retryable_get_slice() +{ ColumnParent cparent; cparent.column_family= column_family; - rowkey.assign(key, key_len); - SlicePredicate slice_pred; SliceRange sr; sr.start = ""; sr.finish = ""; slice_pred.__set_slice_range(sr); - try { - cass->get_slice(column_data_vec, rowkey, cparent, slice_pred, - read_consistency); + cass->get_slice(column_data_vec, rowkey, cparent, slice_pred, + read_consistency); - if (column_data_vec.size() == 0) - { - /* - No columns found. Cassandra doesn't allow records without any column => - this means the seach key doesn't exist - */ - *found= false; - return false; - } - *found= true; - - } catch (InvalidRequestException ire) { - print_error("%s [%s]", ire.what(), ire.why.c_str()); - return true; - } catch (UnavailableException ue) { - print_error("UnavailableException: %s", ue.what()); - return true; - } catch (TimedOutException te) { - print_error("TimedOutException: %s", te.what()); - return true; - }catch(TException e){ - print_error("Thrift exception: %s", e.what()); - } catch (...) { - print_error("Unknown exception"); - return true; + if (column_data_vec.size() == 0) + { + /* + No columns found. 
Cassandra doesn't allow records without any column => + this means the seach key doesn't exist + */ + get_slice_found_rows= false; + return false; } + get_slice_found_rows= true; column_data_it= column_data_vec.begin(); return false; @@ -456,7 +457,15 @@ void Cassandra_se_impl::get_read_rowkey(char **value, int *value_len) bool Cassandra_se_impl::get_range_slices(bool last_key_as_start_key) { - bool res= true; + get_range_slices_param_last_key_as_start_key= last_key_as_start_key; + + return try_operation(&Cassandra_se_impl::retryable_get_range_slices); +} + + +bool Cassandra_se_impl::retryable_get_range_slices() +{ + bool last_key_as_start_key= get_range_slices_param_last_key_as_start_key; ColumnParent cparent; cparent.column_family= column_family; @@ -482,32 +491,17 @@ bool Cassandra_se_impl::get_range_slices(bool last_key_as_start_key) key_range.end_key.assign("", 0); key_range.count= read_batch_size; - try { - - cass->get_range_slices(key_slice_vec, - cparent, slice_pred, key_range, - read_consistency); - res= false; - if (key_slice_vec.size() < (uint)read_batch_size) - get_slices_returned_less= true; - else - get_slices_returned_less= false; + cass->get_range_slices(key_slice_vec, cparent, slice_pred, key_range, + read_consistency); - } catch (InvalidRequestException ire) { - print_error("%s [%s]", ire.what(), ire.why.c_str()); - } catch (UnavailableException ue) { - print_error("UnavailableException: %s", ue.what()); - } catch (TimedOutException te) { - print_error("TimedOutException: %s", te.what()); - }catch(TException e){ - print_error("Thrift exception: %s", e.what()); - } catch (...) 
{ - print_error("Unknown exception"); - } + if (key_slice_vec.size() < (uint)read_batch_size) + get_slices_returned_less= true; + else + get_slices_returned_less= false; key_slice_it= key_slice_vec.begin(); - return res; + return false; } @@ -574,50 +568,78 @@ void Cassandra_se_impl::add_read_column(const char *name_arg) bool Cassandra_se_impl::truncate() { - bool res= true; - try { - - cass->truncate(column_family); - res= false; + return try_operation(&Cassandra_se_impl::retryable_truncate); +} - } catch (InvalidRequestException ire) { - print_error("%s [%s]", ire.what(), ire.why.c_str()); - } catch (UnavailableException ue) { - print_error("UnavailableException: %s", ue.what()); - } catch (TimedOutException te) { - print_error("TimedOutException: %s", te.what()); - }catch(TException e){ - print_error("Thrift exception: %s", e.what()); - } catch (...) { - print_error("Unknown exception"); - } - return res; +bool Cassandra_se_impl::retryable_truncate() +{ + cass->truncate(column_family); + return 0; } + bool Cassandra_se_impl::remove_row() { - bool res= true; + return try_operation(&Cassandra_se_impl::retryable_remove_row); +} + +bool Cassandra_se_impl::retryable_remove_row() +{ ColumnPath column_path; column_path.column_family= column_family; + cass->remove(rowkey, column_path, get_i64_timestamp(), write_consistency); + return 0; +} - try { - - cass->remove(rowkey, column_path, get_i64_timestamp(), write_consistency); - res= false; +/* + This function will try a Cassandra operation, and handle errors. - } catch (InvalidRequestException ire) { - print_error("%s [%s]", ire.what(), ire.why.c_str()); - } catch (UnavailableException ue) { - print_error("UnavailableException: %s", ue.what()); - } catch (TimedOutException te) { - print_error("TimedOutException: %s", te.what()); - }catch(TException e){ - print_error("Thrift exception: %s", e.what()); - } catch (...) 
{ - print_error("Unknown exception"); - } +*/ +bool Cassandra_se_impl::try_operation(retryable_func_t func_to_call) +{ + bool res; + int n_retries= thrift_call_retries_to_do; + + do + { + res= true; + + try { + + if ((res= (this->*func_to_call)())) + { + /* + The function call was made successfully (without timeouts, etc), + but something inside it returned 'true'. + This is supposedly a failure (or "not found" or other negative + result). We need to return this to the caller. + */ + n_retries= 0; + } + + } catch (InvalidRequestException ire) { + n_retries= 0; /* there is no point in retrying this operation */ + print_error("%s [%s]", ire.what(), ire.why.c_str()); + } catch (UnavailableException ue) { + cassandra_counters.unavailable_exceptions++; + if (!--n_retries) + print_error("UnavailableException: %s", ue.what()); + } catch (TimedOutException te) { + cassandra_counters.timeout_exceptions++; + if (!--n_retries) + print_error("TimedOutException: %s", te.what()); + }catch(TException e){ + /* todo: we may use retry for certain kinds of Thrift errors */ + n_retries= 0; + print_error("Thrift exception: %s", e.what()); + } catch (...) 
{ + n_retries= 0; /* Don't retry */ + print_error("Unknown exception"); + } + + } while (res && n_retries > 0); return res; } @@ -638,9 +660,14 @@ int Cassandra_se_impl::add_lookup_key(const char *key, size_t key_len) return mrr_keys.size(); } - bool Cassandra_se_impl::multiget_slice() { + return try_operation(&Cassandra_se_impl::retryable_multiget_slice); +} + + +bool Cassandra_se_impl::retryable_multiget_slice() +{ ColumnParent cparent; cparent.column_family= column_family; @@ -650,34 +677,15 @@ bool Cassandra_se_impl::multiget_slice() sr.finish = ""; slice_pred.__set_slice_range(sr); - bool res= true; - - try { - - cassandra_counters.multiget_reads++; - cassandra_counters.multiget_keys_scanned += mrr_keys.size(); - - cass->multiget_slice(mrr_result, mrr_keys, cparent, slice_pred, - read_consistency); - - cassandra_counters.multiget_rows_read += mrr_result.size(); - - res= false; - mrr_result_it= mrr_result.begin(); + cassandra_counters.multiget_reads++; + cassandra_counters.multiget_keys_scanned += mrr_keys.size(); + cass->multiget_slice(mrr_result, mrr_keys, cparent, slice_pred, + read_consistency); - } catch (InvalidRequestException ire) { - print_error("%s [%s]", ire.what(), ire.why.c_str()); - } catch (UnavailableException ue) { - print_error("UnavailableException: %s", ue.what()); - } catch (TimedOutException te) { - print_error("TimedOutException: %s", te.what()); - }catch(TException e){ - print_error("Thrift exception: %s", e.what()); - } catch (...) 
{ - print_error("Unknown exception"); - } + cassandra_counters.multiget_rows_read += mrr_result.size(); + mrr_result_it= mrr_result.begin(); - return res; + return false; } diff --git a/storage/cassandra/cassandra_se.h b/storage/cassandra/cassandra_se.h index 33ef677d276..c6de779f8bc 100644 --- a/storage/cassandra/cassandra_se.h +++ b/storage/cassandra/cassandra_se.h @@ -91,6 +91,9 @@ public: ulong multiget_reads; ulong multiget_keys_scanned; ulong multiget_rows_read; + + ulong timeout_exceptions; + ulong unavailable_exceptions; }; diff --git a/storage/cassandra/ha_cassandra.cc b/storage/cassandra/ha_cassandra.cc index 18a4a8c2728..187df2a2dd8 100644 --- a/storage/cassandra/ha_cassandra.cc +++ b/storage/cassandra/ha_cassandra.cc @@ -82,6 +82,11 @@ static MYSQL_THDVAR_ULONG(rnd_batch_size, PLUGIN_VAR_RQCMDARG, "Number of rows in an rnd_read (full scan) batch", NULL, NULL, /*default*/ 10*1000, /*min*/ 1, /*max*/ 1024*1024*1024, 0); +static MYSQL_THDVAR_ULONG(failure_retries, PLUGIN_VAR_RQCMDARG, + "Number of times to retry Cassandra calls that failed due to timeouts or " + "network communication problems. 
The default, 0, means not to retry.", + NULL, NULL, /*default*/ 0, /*min*/ 0, /*max*/ 1024*1024*1024, 0); + /* These match values in enum_cassandra_consistency_level */ const char *cassandra_consistency_level[] = { @@ -161,6 +166,7 @@ static struct st_mysql_sys_var* cassandra_system_variables[]= { MYSQL_SYSVAR(default_thrift_host), MYSQL_SYSVAR(write_consistency), MYSQL_SYSVAR(read_consistency), + MYSQL_SYSVAR(failure_retries), NULL }; @@ -177,6 +183,11 @@ static SHOW_VAR cassandra_status_variables[]= { (char*) &cassandra_counters.multiget_keys_scanned, SHOW_LONG}, {"multiget_rows_read", (char*) &cassandra_counters.multiget_rows_read, SHOW_LONG}, + + {"timeout_exceptions", + (char*) &cassandra_counters.timeout_exceptions, SHOW_LONG}, + {"unavailable_exceptions", + (char*) &cassandra_counters.unavailable_exceptions, SHOW_LONG}, {NullS, NullS, SHOW_LONG} }; @@ -1678,7 +1689,6 @@ bool ha_cassandra::check_if_incompatible_data(HA_CREATE_INFO *info, static int show_cassandra_vars(THD *thd, SHOW_VAR *var, char *buff) { - //innodb_export_status(); cassandra_counters_copy= cassandra_counters; var->type= SHOW_ARRAY; |