summaryrefslogtreecommitdiff
path: root/storage
diff options
context:
space:
mode:
authorSergey Petrunya <psergey@askmonty.org>2012-09-24 19:15:12 +0400
committerSergey Petrunya <psergey@askmonty.org>2012-09-24 19:15:12 +0400
commitbce2e6683a19f7d32c4540b5850f370db6bb4e36 (patch)
treea90e415c3407dbba146899a059f7a3462267c350 /storage
parentc59faf95ae31b9ba61ba14ed53ddd92695eb05c8 (diff)
downloadmariadb-git-bce2e6683a19f7d32c4540b5850f370db6bb4e36.tar.gz
Cassandra SE
- Add support for Cassandra's 'varint' datatype, mappable to VARBINARY.
Diffstat (limited to 'storage')
-rw-r--r--storage/cassandra/ha_cassandra.cc102
-rw-r--r--storage/cassandra/ha_cassandra.h4
2 files changed, 80 insertions, 26 deletions
diff --git a/storage/cassandra/ha_cassandra.cc b/storage/cassandra/ha_cassandra.cc
index 3643da97287..6a82f811eb4 100644
--- a/storage/cassandra/ha_cassandra.cc
+++ b/storage/cassandra/ha_cassandra.cc
@@ -531,7 +531,7 @@ public:
Field *field;
/* This will save Cassandra's data in the Field */
- virtual void cassandra_to_mariadb(const char *cass_data,
+ virtual int cassandra_to_mariadb(const char *cass_data,
int cass_data_len)=0;
/*
@@ -552,11 +552,12 @@ class DoubleDataConverter : public ColumnDataConverter
{
double buf;
public:
- void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
+ int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{
DBUG_ASSERT(cass_data_len == sizeof(double));
double *pdata= (double*) cass_data;
field->store(*pdata);
+ return 0;
}
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
@@ -574,11 +575,12 @@ class FloatDataConverter : public ColumnDataConverter
{
float buf;
public:
- void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
+ int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{
DBUG_ASSERT(cass_data_len == sizeof(float));
float *pdata= (float*) cass_data;
field->store(*pdata);
+ return 0;
}
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
@@ -608,7 +610,7 @@ class BigintDataConverter : public ColumnDataConverter
longlong buf;
bool flip; /* is false when reading counter columns */
public:
- void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
+ int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{
longlong tmp;
DBUG_ASSERT(cass_data_len == sizeof(longlong));
@@ -617,6 +619,7 @@ public:
else
memcpy(&tmp, cass_data, sizeof(longlong));
field->store(tmp);
+ return 0;
}
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
@@ -647,10 +650,11 @@ class TinyintDataConverter : public ColumnDataConverter
{
char buf;
public:
- void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
+ int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{
DBUG_ASSERT(cass_data_len == 1);
field->store(cass_data[0]);
+ return 0;
}
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
@@ -668,12 +672,13 @@ class Int32DataConverter : public ColumnDataConverter
{
int32_t buf;
public:
- void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
+ int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{
int32_t tmp;
DBUG_ASSERT(cass_data_len == sizeof(int32_t));
flip32(cass_data, (char*)&tmp);
field->store(tmp);
+ return 0;
}
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
@@ -691,10 +696,14 @@ public:
class StringCopyConverter : public ColumnDataConverter
{
String buf;
+ size_t max_length;
public:
- void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
+ int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{
+ if ((size_t)cass_data_len > max_length)
+ return 1;
field->store(cass_data, cass_data_len,field->charset());
+ return 0;
}
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
@@ -704,6 +713,7 @@ public:
*cass_data_len= pstr->length();
return false;
}
+ StringCopyConverter(size_t max_length_arg) : max_length(max_length_arg) {}
~StringCopyConverter(){}
};
@@ -712,7 +722,7 @@ class TimestampDataConverter : public ColumnDataConverter
{
int64_t buf;
public:
- void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
+ int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{
/* Cassandra data is milliseconds-since-epoch in network byte order */
int64_t tmp;
@@ -724,6 +734,7 @@ public:
- microsecond fraction of a second.
*/
((Field_timestamp*)field)->store_TIME(tmp / 1000, (tmp % 1000)*1000);
+ return 0;
}
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
@@ -768,7 +779,7 @@ class UuidDataConverter : public ColumnDataConverter
char buf[16]; /* Binary UUID representation */
String str_buf;
public:
- void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
+ int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{
DBUG_ASSERT(cass_data_len==16);
char str[37];
@@ -783,6 +794,7 @@ public:
}
*ptr= 0;
field->store(str, 36,field->charset());
+ return 0;
}
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
@@ -836,6 +848,11 @@ const char * const validator_uuid= "org.apache.cassandra.db.marshal.UUIDType";
const char * const validator_boolean= "org.apache.cassandra.db.marshal.BooleanType";
+/*
+ VARINTs are stored as little-endian big numbers.
+*/
+const char * const validator_varint= "org.apache.cassandra.db.marshal.IntegerType";
+
ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name)
{
@@ -885,14 +902,20 @@ ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_
/* fall through: */
case MYSQL_TYPE_VARCHAR:
case MYSQL_TYPE_VAR_STRING:
+ {
+ bool is_varint;
if (!strcmp(validator_name, validator_blob) ||
!strcmp(validator_name, validator_ascii) ||
- !strcmp(validator_name, validator_text))
+ !strcmp(validator_name, validator_text) ||
+ (is_varint= !strcmp(validator_name, validator_varint)))
{
- res= new StringCopyConverter;
+ size_t max_size= (size_t)-1;
+ if (is_varint)
+ max_size= field->field_length;
+ res= new StringCopyConverter(max_size);
}
break;
-
+ }
case MYSQL_TYPE_LONG:
if (!strcmp(validator_name, validator_int))
res= new Int32DataConverter;
@@ -1041,24 +1064,43 @@ int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
/* TODO: what if we're not reading all columns?? */
if (!found)
- {
rc= HA_ERR_KEY_NOT_FOUND;
- }
else
+ rc= read_cassandra_columns(false);
+
+ DBUG_RETURN(rc);
+}
+
+
+void ha_cassandra::print_conversion_error(const char *field_name,
+ char *cass_value,
+ int cass_value_len)
+{
+ char buf[32];
+ char *p= cass_value;
+ size_t i= 0;
+ for (; (i < (int)sizeof(buf)-1) && (p < cass_value + cass_value_len); p++)
{
- read_cassandra_columns(false);
+ buf[i++]= map2number[(*p >> 4) & 0xF];
+ buf[i++]= map2number[*p & 0xF];
}
+ buf[i]=0;
- DBUG_RETURN(rc);
+ se->print_error("Unable to convert value for field `%s` from Cassandra's data"
+ " format. Source data is %d bytes, 0x%s%s",
+ field_name, cass_value_len, buf,
+ (i == sizeof(buf) - 1)? "..." : "");
+ my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
}
-void ha_cassandra::read_cassandra_columns(bool unpack_pk)
+int ha_cassandra::read_cassandra_columns(bool unpack_pk)
{
char *cass_name;
char *cass_value;
int cass_value_len;
Field **field;
+ int res= 0;
/*
cassandra_to_mariadb() calls will use field->store(...) methods, which
@@ -1082,7 +1124,14 @@ void ha_cassandra::read_cassandra_columns(bool unpack_pk)
{
int fieldnr= (*field)->field_index;
(*field)->set_notnull();
- field_converters[fieldnr]->cassandra_to_mariadb(cass_value, cass_value_len);
+ if (field_converters[fieldnr]->cassandra_to_mariadb(cass_value,
+ cass_value_len))
+ {
+ print_conversion_error((*field)->field_name, cass_value,
+ cass_value_len);
+ res=1;
+ goto err;
+ }
break;
}
}
@@ -1094,10 +1143,17 @@ void ha_cassandra::read_cassandra_columns(bool unpack_pk)
field= table->field;
(*field)->set_notnull();
se->get_read_rowkey(&cass_value, &cass_value_len);
- rowkey_converter->cassandra_to_mariadb(cass_value, cass_value_len);
+ if (rowkey_converter->cassandra_to_mariadb(cass_value, cass_value_len))
+ {
+ print_conversion_error((*field)->field_name, cass_value, cass_value_len);
+ res=1;
+ goto err;
+ }
}
+err:
dbug_tmp_restore_column_map(table->write_set, old_map);
+ return res;
}
@@ -1234,10 +1290,7 @@ int ha_cassandra::rnd_next(uchar *buf)
if (reached_eof)
rc= HA_ERR_END_OF_FILE;
else
- {
- read_cassandra_columns(true);
- rc= 0;
- }
+ rc= read_cassandra_columns(true);
}
DBUG_RETURN(rc);
@@ -1422,8 +1475,7 @@ int ha_cassandra::multi_range_read_next(range_id_t *range_info)
{
if (!se->get_next_multiget_row())
{
- read_cassandra_columns(true);
- res= 0;
+ res= read_cassandra_columns(true);
break;
}
else
diff --git a/storage/cassandra/ha_cassandra.h b/storage/cassandra/ha_cassandra.h
index 9b11b055af3..52fd46fa5ef 100644
--- a/storage/cassandra/ha_cassandra.h
+++ b/storage/cassandra/ha_cassandra.h
@@ -58,7 +58,7 @@ class ha_cassandra: public handler
bool setup_field_converters(Field **field, uint n_fields);
void free_field_converters();
- void read_cassandra_columns(bool unpack_pk);
+ int read_cassandra_columns(bool unpack_pk);
int check_table_options(struct ha_table_option_struct* options);
bool doing_insert_batch;
@@ -66,6 +66,8 @@ class ha_cassandra: public handler
/* Used to produce 'wrong column %s at row %lu' warnings */
ha_rows insert_lineno;
+ void print_conversion_error(const char *field_name,
+ char *cass_value, int cass_value_len);
public:
ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg);
~ha_cassandra()