summaryrefslogtreecommitdiff
path: root/storage/cassandra/cassandra_se.cc
diff options
context:
space:
mode:
authorSergey Petrunya <psergey@askmonty.org>2012-09-22 23:30:29 +0400
committerSergey Petrunya <psergey@askmonty.org>2012-09-22 23:30:29 +0400
commitc59faf95ae31b9ba61ba14ed53ddd92695eb05c8 (patch)
tree6b705c1ec58ef32c328bcc4aa8c06c9d0b3a8617 /storage/cassandra/cassandra_se.cc
parent004e024775ed5c68dcc721f36aedda7f59a2197e (diff)
downloadmariadb-git-c59faf95ae31b9ba61ba14ed53ddd92695eb05c8.tar.gz
Cassandra SE: make consistency settings user-settable.
Diffstat (limited to 'storage/cassandra/cassandra_se.cc')
-rw-r--r--storage/cassandra/cassandra_se.cc33
1 files changed, 24 insertions, 9 deletions
diff --git a/storage/cassandra/cassandra_se.cc b/storage/cassandra/cassandra_se.cc
index c6415e6a4aa..3be81c5e6bf 100644
--- a/storage/cassandra/cassandra_se.cc
+++ b/storage/cassandra/cassandra_se.cc
@@ -23,6 +23,7 @@ using namespace apache::thrift::transport;
using namespace apache::thrift::protocol;
using namespace org::apache::cassandra;
+
void Cassandra_se_interface::print_error(const char *format, ...)
{
va_list ap;
@@ -38,10 +39,13 @@ void Cassandra_se_interface::print_error(const char *format, ...)
class Cassandra_se_impl: public Cassandra_se_interface
{
CassandraClient *cass; /* Connection to cassandra */
- ConsistencyLevel::type cur_consistency_level;
std::string column_family;
std::string keyspace;
+
+ ConsistencyLevel::type write_consistency;
+ ConsistencyLevel::type read_consistency;
+
/* DDL data */
KsDef ks_def; /* KeySpace we're using (TODO: put this in table->share) */
@@ -69,7 +73,9 @@ class Cassandra_se_impl: public Cassandra_se_interface
SlicePredicate slice_pred;
bool get_slices_returned_less;
public:
- Cassandra_se_impl() : cass(NULL) {}
+ Cassandra_se_impl() : cass(NULL),
+ write_consistency(ConsistencyLevel::ONE),
+ read_consistency(ConsistencyLevel::ONE) {}
virtual ~Cassandra_se_impl(){ delete cass; }
/* Connection and DDL checks */
@@ -81,6 +87,9 @@ public:
bool next_ddl_column(char **name, int *name_len, char **value, int *value_len);
void get_rowkey_type(char **name, char **type);
+ /* Settings */
+ void set_consistency_levels(ulong read_cons_level, ulong write_cons_level);
+
/* Writes */
void clear_insert_buffer();
void start_row_insert(const char *key, int key_len);
@@ -166,14 +175,20 @@ bool Cassandra_se_impl::connect(const char *host, int port, const char *keyspace
print_error("Unknown exception");
}
- cur_consistency_level= ConsistencyLevel::ONE;
-
if (!res && setup_ddl_checks())
res= true;
return res;
}
+void Cassandra_se_impl::set_consistency_levels(ulong read_cons_level,
+ ulong write_cons_level)
+{
+ write_cons_level= (ConsistencyLevel::type)(write_cons_level + 1);
+ read_cons_level= (ConsistencyLevel::type)(read_cons_level + 1);
+}
+
+
bool Cassandra_se_impl::setup_ddl_checks()
{
try {
@@ -308,7 +323,7 @@ bool Cassandra_se_impl::do_insert()
try {
- cass->batch_mutate(batch_mutation, cur_consistency_level);
+ cass->batch_mutate(batch_mutation, write_consistency);
cassandra_counters.row_inserts+= batch_mutation.size();
cassandra_counters.row_insert_batches++;
@@ -356,7 +371,7 @@ bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found)
try {
cass->get_slice(column_data_vec, rowkey, cparent, slice_pred,
- cur_consistency_level);
+ read_consistency);
if (column_data_vec.size() == 0)
{
@@ -471,7 +486,7 @@ bool Cassandra_se_impl::get_range_slices(bool last_key_as_start_key)
cass->get_range_slices(key_slice_vec,
cparent, slice_pred, key_range,
- cur_consistency_level);
+ read_consistency);
res= false;
if (key_slice_vec.size() < (uint)read_batch_size)
@@ -589,7 +604,7 @@ bool Cassandra_se_impl::remove_row()
try {
- cass->remove(rowkey, column_path, get_i64_timestamp(), cur_consistency_level);
+ cass->remove(rowkey, column_path, get_i64_timestamp(), write_consistency);
res= false;
} catch (InvalidRequestException ire) {
@@ -643,7 +658,7 @@ bool Cassandra_se_impl::multiget_slice()
cassandra_counters.multiget_keys_scanned += mrr_keys.size();
cass->multiget_slice(mrr_result, mrr_keys, cparent, slice_pred,
- cur_consistency_level);
+ read_consistency);
cassandra_counters.multiget_rows_read += mrr_result.size();