diff options
author | Sergey Petrunya <psergey@askmonty.org> | 2013-05-28 12:38:22 +0400 |
---|---|---|
committer | Sergey Petrunya <psergey@askmonty.org> | 2013-05-28 12:38:22 +0400 |
commit | c00a37d11384a0c86ae0f42af971bb8691f4018b (patch) | |
tree | 79dccd3dc6f781056e4d883ef530c3f3db2a7833 /storage/cassandra | |
parent | 08ce9bfe057b6cd31e7fbca4e4e9e48edde242fb (diff) | |
download | mariadb-git-c00a37d11384a0c86ae0f42af971bb8691f4018b.tar.gz |
MDEV-4443: Cassandra SE: ERROR 1928 (HY000): Internal error: 'Thrift exception: Called write on non-open socket'
- Made call re-try system also handle network disconnects (it will reconnect before retrying)
- Added Cassandra_network_exceptions counter.
- @@cassandra_failure_retries is now always honored.
Diffstat (limited to 'storage/cassandra')
-rw-r--r-- | storage/cassandra/cassandra_se.cc | 97 | ||||
-rw-r--r-- | storage/cassandra/cassandra_se.h | 2 | ||||
-rw-r--r-- | storage/cassandra/ha_cassandra.cc | 5 |
3 files changed, 86 insertions, 18 deletions
diff --git a/storage/cassandra/cassandra_se.cc b/storage/cassandra/cassandra_se.cc index 6ae3b7e3609..111f30f715f 100644 --- a/storage/cassandra/cassandra_se.cc +++ b/storage/cassandra/cassandra_se.cc @@ -42,10 +42,14 @@ class Cassandra_se_impl: public Cassandra_se_interface ConsistencyLevel::type write_consistency; ConsistencyLevel::type read_consistency; - + + /* Connection data */ + std::string host; + int port; /* How many times to retry an operation before giving up */ int thrift_call_retries_to_do; + bool inside_try_operation; /* DDL data */ KsDef ks_def; /* KeySpace we're using (TODO: put this in table->share) */ @@ -74,15 +78,19 @@ class Cassandra_se_impl: public Cassandra_se_interface SliceRange slice_pred_sr; bool get_slices_returned_less; bool get_slice_found_rows; + + bool reconnect(); public: Cassandra_se_impl() : cass(NULL), write_consistency(ConsistencyLevel::ONE), read_consistency(ConsistencyLevel::ONE), - thrift_call_retries_to_do(0) {} + thrift_call_retries_to_do(1), + inside_try_operation(false) + {} virtual ~Cassandra_se_impl(){ delete cass; } /* Connection and DDL checks */ - bool connect(const char *host, int port, const char *keyspace); + bool connect(const char *host_arg, int port_arg, const char *keyspace); void set_column_family(const char *cfname) { column_family.assign(cfname); } bool setup_ddl_checks(); @@ -94,6 +102,9 @@ public: /* Settings */ void set_consistency_levels(ulong read_cons_level, ulong write_cons_level); + virtual void set_n_retries(uint retries_arg) { + thrift_call_retries_to_do= retries_arg; + } /* Writes */ void clear_insert_buffer(); @@ -170,15 +181,25 @@ Cassandra_se_interface *create_cassandra_se() } -bool Cassandra_se_impl::connect(const char *host, int port, const char *keyspace_arg) +bool Cassandra_se_impl::connect(const char *host_arg, int port_arg, const char *keyspace_arg) { - bool res= true; - keyspace.assign(keyspace_arg); + host.assign(host_arg); + port= port_arg; + return reconnect(); +} + +bool Cassandra_se_impl::reconnect() +{ + + delete cass; + cass= NULL; + + bool res= true; try { boost::shared_ptr<TTransport> socket = - boost::shared_ptr<TSocket>(new TSocket(host, port)); + boost::shared_ptr<TSocket>(new TSocket(host.c_str(), port)); boost::shared_ptr<TTransport> tr = boost::shared_ptr<TFramedTransport>(new TFramedTransport (socket)); boost::shared_ptr<TProtocol> p = @@ -186,7 +207,7 @@ bool Cassandra_se_impl::connect(const char *host, int port, const char *keyspace cass= new CassandraClient(p); tr->open(); - cass->set_keyspace(keyspace_arg); + cass->set_keyspace(keyspace.c_str()); res= false; // success }catch(TTransportException te){ @@ -694,7 +715,10 @@ bool Cassandra_se_impl::retryable_remove_row() bool Cassandra_se_impl::try_operation(retryable_func_t func_to_call) { bool res; - int n_retries= thrift_call_retries_to_do; + int n_attempts= thrift_call_retries_to_do; + + bool was_inside_try_operation= inside_try_operation; + inside_try_operation= true; do { @@ -710,31 +734,70 @@ bool Cassandra_se_impl::try_operation(retryable_func_t func_to_call) This is supposedly a failure (or "not found" or other negative result). We need to return this to the caller. */ - n_retries= 0; + n_attempts= 0; } } catch (InvalidRequestException ire) { - n_retries= 0; /* there is no point in retrying this operation */ + n_attempts= 0; /* there is no point in retrying this operation */ print_error("%s [%s]", ire.what(), ire.why.c_str()); } catch (UnavailableException ue) { cassandra_counters.unavailable_exceptions++; - if (!--n_retries) + if (!--n_attempts) print_error("UnavailableException: %s", ue.what()); } catch (TimedOutException te) { + /* + Note: this is a timeout generated *inside Cassandra cluster*. + Connection between us and the cluster is ok, but something went wrong + within the cluster. + */ cassandra_counters.timeout_exceptions++; - if (!--n_retries) + if (!--n_attempts) print_error("TimedOutException: %s", te.what()); + } catch (TTransportException tte) { + /* Something went wrong in communication between us and Cassandra */ + cassandra_counters.network_exceptions++; + + switch (tte.getType()) + { + case TTransportException::NOT_OPEN: + case TTransportException::TIMED_OUT: + case TTransportException::END_OF_FILE: + case TTransportException::INTERRUPTED: + { + if (!was_inside_try_operation && reconnect()) + { + /* Failed to reconnect, no point to retry the operation */ + n_attempts= 0; + print_error("%s", tte.what()); + } + else + { + n_attempts--; + } + break; + } + default: + { + /* + We assume it doesn't make sense to retry for + unknown kinds of TTransportException-s + */ + n_attempts= 0; + print_error("%s", tte.what()); + } + } }catch(TException e){ /* todo: we may use retry for certain kinds of Thrift errors */ - n_retries= 0; + n_attempts= 0; print_error("Thrift exception: %s", e.what()); } catch (...) { - n_retries= 0; /* Don't retry */ + n_attempts= 0; /* Don't retry */ print_error("Unknown exception"); } - } while (res && n_retries > 0); - + } while (res && n_attempts > 0); + + inside_try_operation= was_inside_try_operation; return res; } diff --git a/storage/cassandra/cassandra_se.h b/storage/cassandra/cassandra_se.h index 050c65e6dde..2d3d5f27a56 100644 --- a/storage/cassandra/cassandra_se.h +++ b/storage/cassandra/cassandra_se.h @@ -45,6 +45,7 @@ public: /* Settings */ virtual void set_consistency_levels(ulong read_cons_level, ulong write_cons_level)=0; + virtual void set_n_retries(uint retries_arg)=0; /* Check underlying DDL */ virtual bool setup_ddl_checks()=0; @@ -113,6 +114,7 @@ public: ulong timeout_exceptions; ulong unavailable_exceptions; + ulong network_exceptions; }; diff --git a/storage/cassandra/ha_cassandra.cc b/storage/cassandra/ha_cassandra.cc index 481b520e79a..403d21f75f9 100644 --- a/storage/cassandra/ha_cassandra.cc +++ b/storage/cassandra/ha_cassandra.cc @@ -110,7 +110,7 @@ static MYSQL_THDVAR_ULONG(rnd_batch_size, PLUGIN_VAR_RQCMDARG, static MYSQL_THDVAR_ULONG(failure_retries, PLUGIN_VAR_RQCMDARG, "Number of times to retry Cassandra calls that failed due to timeouts or " "network communication problems. The default, 0, means not to retry.", - NULL, NULL, /*default*/ 0, /*min*/ 0, /*max*/ 1024*1024*1024, 0); + NULL, NULL, /*default*/ 3, /*min*/ 1, /*max*/ 1024*1024*1024, 0); /* These match values in enum_cassandra_consistency_level */ const char *cassandra_consistency_level[] = @@ -2210,6 +2210,7 @@ int ha_cassandra::reset() { se->set_consistency_levels(THDVAR(table->in_use, read_consistency), THDVAR(table->in_use, write_consistency)); + se->set_n_retries(THDVAR(table->in_use, failure_retries)); } return 0; } @@ -2581,6 +2582,8 @@ static SHOW_VAR cassandra_status_variables[]= { {"multiget_rows_read", (char*) &cassandra_counters.multiget_rows_read, SHOW_LONG}, + {"network_exceptions", + (char*) &cassandra_counters.network_exceptions, SHOW_LONG}, {"timeout_exceptions", (char*) &cassandra_counters.timeout_exceptions, SHOW_LONG}, {"unavailable_exceptions", |