summaryrefslogtreecommitdiff
path: root/storage/cassandra/cassandra_se.cc
diff options
context:
space:
mode:
authorSergey Petrunya <psergey@askmonty.org>2013-05-28 12:38:22 +0400
committerSergey Petrunya <psergey@askmonty.org>2013-05-28 12:38:22 +0400
commitc00a37d11384a0c86ae0f42af971bb8691f4018b (patch)
tree79dccd3dc6f781056e4d883ef530c3f3db2a7833 /storage/cassandra/cassandra_se.cc
parent08ce9bfe057b6cd31e7fbca4e4e9e48edde242fb (diff)
downloadmariadb-git-c00a37d11384a0c86ae0f42af971bb8691f4018b.tar.gz
MDEV-4443: Cassandra SE: ERROR 1928 (HY000): Internal error: 'Thrift exception: Called write on non-open socket'
- Made call re-try system also handle network disconnects (it will reconnect before retrying) - Added Cassandra_network_exceptions counter. - @@cassandra_failure_retries is now always honored.
Diffstat (limited to 'storage/cassandra/cassandra_se.cc')
-rw-r--r--storage/cassandra/cassandra_se.cc97
1 files changed, 80 insertions, 17 deletions
diff --git a/storage/cassandra/cassandra_se.cc b/storage/cassandra/cassandra_se.cc
index 6ae3b7e3609..111f30f715f 100644
--- a/storage/cassandra/cassandra_se.cc
+++ b/storage/cassandra/cassandra_se.cc
@@ -42,10 +42,14 @@ class Cassandra_se_impl: public Cassandra_se_interface
ConsistencyLevel::type write_consistency;
ConsistencyLevel::type read_consistency;
-
+
+ /* Connection data */
+ std::string host;
+ int port;
/* How many times to retry an operation before giving up */
int thrift_call_retries_to_do;
+ bool inside_try_operation;
/* DDL data */
KsDef ks_def; /* KeySpace we're using (TODO: put this in table->share) */
@@ -74,15 +78,19 @@ class Cassandra_se_impl: public Cassandra_se_interface
SliceRange slice_pred_sr;
bool get_slices_returned_less;
bool get_slice_found_rows;
+
+ bool reconnect();
public:
Cassandra_se_impl() : cass(NULL),
write_consistency(ConsistencyLevel::ONE),
read_consistency(ConsistencyLevel::ONE),
- thrift_call_retries_to_do(0) {}
+ thrift_call_retries_to_do(1),
+ inside_try_operation(false)
+ {}
virtual ~Cassandra_se_impl(){ delete cass; }
/* Connection and DDL checks */
- bool connect(const char *host, int port, const char *keyspace);
+ bool connect(const char *host_arg, int port_arg, const char *keyspace);
void set_column_family(const char *cfname) { column_family.assign(cfname); }
bool setup_ddl_checks();
@@ -94,6 +102,9 @@ public:
/* Settings */
void set_consistency_levels(ulong read_cons_level, ulong write_cons_level);
+ virtual void set_n_retries(uint retries_arg) {
+ thrift_call_retries_to_do= retries_arg;
+ }
/* Writes */
void clear_insert_buffer();
@@ -170,15 +181,25 @@ Cassandra_se_interface *create_cassandra_se()
}
-bool Cassandra_se_impl::connect(const char *host, int port, const char *keyspace_arg)
+bool Cassandra_se_impl::connect(const char *host_arg, int port_arg, const char *keyspace_arg)
{
- bool res= true;
-
keyspace.assign(keyspace_arg);
+ host.assign(host_arg);
+ port= port_arg;
+ return reconnect();
+}
+
+bool Cassandra_se_impl::reconnect()
+{
+
+ delete cass;
+ cass= NULL;
+
+ bool res= true;
try {
boost::shared_ptr<TTransport> socket =
- boost::shared_ptr<TSocket>(new TSocket(host, port));
+ boost::shared_ptr<TSocket>(new TSocket(host.c_str(), port));
boost::shared_ptr<TTransport> tr =
boost::shared_ptr<TFramedTransport>(new TFramedTransport (socket));
boost::shared_ptr<TProtocol> p =
@@ -186,7 +207,7 @@ bool Cassandra_se_impl::connect(const char *host, int port, const char *keyspace
cass= new CassandraClient(p);
tr->open();
- cass->set_keyspace(keyspace_arg);
+ cass->set_keyspace(keyspace.c_str());
res= false; // success
}catch(TTransportException te){
@@ -694,7 +715,10 @@ bool Cassandra_se_impl::retryable_remove_row()
bool Cassandra_se_impl::try_operation(retryable_func_t func_to_call)
{
bool res;
- int n_retries= thrift_call_retries_to_do;
+ int n_attempts= thrift_call_retries_to_do;
+
+ bool was_inside_try_operation= inside_try_operation;
+ inside_try_operation= true;
do
{
@@ -710,31 +734,70 @@ bool Cassandra_se_impl::try_operation(retryable_func_t func_to_call)
This is supposedly a failure (or "not found" or other negative
result). We need to return this to the caller.
*/
- n_retries= 0;
+ n_attempts= 0;
}
} catch (InvalidRequestException ire) {
- n_retries= 0; /* there is no point in retrying this operation */
+ n_attempts= 0; /* there is no point in retrying this operation */
print_error("%s [%s]", ire.what(), ire.why.c_str());
} catch (UnavailableException ue) {
cassandra_counters.unavailable_exceptions++;
- if (!--n_retries)
+ if (!--n_attempts)
print_error("UnavailableException: %s", ue.what());
} catch (TimedOutException te) {
+ /*
+ Note: this is a timeout generated *inside Cassandra cluster*.
+ Connection between us and the cluster is ok, but something went wrong
+ within the cluster.
+ */
cassandra_counters.timeout_exceptions++;
- if (!--n_retries)
+ if (!--n_attempts)
print_error("TimedOutException: %s", te.what());
+ } catch (TTransportException tte) {
+ /* Something went wrong in communication between us and Cassandra */
+ cassandra_counters.network_exceptions++;
+
+ switch (tte.getType())
+ {
+ case TTransportException::NOT_OPEN:
+ case TTransportException::TIMED_OUT:
+ case TTransportException::END_OF_FILE:
+ case TTransportException::INTERRUPTED:
+ {
+ if (!was_inside_try_operation && reconnect())
+ {
+ /* Failed to reconnect, no point to retry the operation */
+ n_attempts= 0;
+ print_error("%s", tte.what());
+ }
+ else
+ {
+ n_attempts--;
+ }
+ break;
+ }
+ default:
+ {
+ /*
+ We assume it doesn't make sense to retry for
+ unknown kinds of TTransportException-s
+ */
+ n_attempts= 0;
+ print_error("%s", tte.what());
+ }
+ }
}catch(TException e){
/* todo: we may use retry for certain kinds of Thrift errors */
- n_retries= 0;
+ n_attempts= 0;
print_error("Thrift exception: %s", e.what());
} catch (...) {
- n_retries= 0; /* Don't retry */
+ n_attempts= 0; /* Don't retry */
print_error("Unknown exception");
}
- } while (res && n_retries > 0);
-
+ } while (res && n_attempts > 0);
+
+ inside_try_operation= was_inside_try_operation;
return res;
}