summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/field.cc267
-rw-r--r--sql/field.h45
-rw-r--r--sql/log_event.cc23
-rw-r--r--sql/log_event_old.cc22
-rw-r--r--sql/mysql_priv.h1
-rw-r--r--sql/mysqld.cc23
-rw-r--r--sql/rpl_record.cc63
-rw-r--r--sql/rpl_rli.h20
-rw-r--r--sql/rpl_utility.cc886
-rw-r--r--sql/rpl_utility.h186
-rw-r--r--sql/set_var.cc34
-rw-r--r--sql/set_var.h1
-rw-r--r--sql/share/errmsg.txt5
-rw-r--r--sql/sql_class.h6
-rw-r--r--sql/sql_select.cc12
15 files changed, 1336 insertions, 258 deletions
diff --git a/sql/field.cc b/sql/field.cc
index 0df9b0fc2e4..b6a795d34aa 100644
--- a/sql/field.cc
+++ b/sql/field.cc
@@ -59,6 +59,8 @@ const char field_separator=',';
#define ASSERT_COLUMN_MARKED_FOR_READ DBUG_ASSERT(!table || (!table->read_set || bitmap_is_set(table->read_set, field_index)))
#define ASSERT_COLUMN_MARKED_FOR_WRITE DBUG_ASSERT(!table || (!table->write_set || bitmap_is_set(table->write_set, field_index)))
+#define FLAGSTR(S,F) ((S) & (F) ? #F " " : "")
+
/*
Rules for merging different types of fields in UNION
@@ -997,6 +999,21 @@ test_if_important_data(CHARSET_INFO *cs, const char *str, const char *strend)
/**
+ Template function to compare two objects.
+ */
+namespace {
+ template <class A_type, class B_type>
+ int compare(A_type a, B_type b)
+ {
+ if (a < b)
+ return -1;
+ if (b < a)
+ return 1;
+ return 0;
+ }
+}
+
+/**
Detect Item_result by given field type of UNION merge result.
@param field_type given field type
@@ -1367,22 +1384,46 @@ bool Field::send_binary(Protocol *protocol)
/**
Check to see if field size is compatible with destination.
- This method is used in row-based replication to verify that the slave's
- field size is less than or equal to the master's field size. The
- encoded field metadata (from the master or source) is decoded and compared
- to the size of this field (the slave or destination).
+ This method is used in row-based replication to verify that the
+ slave's field size is less than or equal to the master's field
+ size. The encoded field metadata (from the master or source) is
+ decoded and compared to the size of this field (the slave or
+ destination).
+
+ @note
+
+ The comparison is made so that if the source data (from the master)
+ is less than the target data (on the slave), -1 is returned in @c
+ <code>*order_var</code>. This implies that a conversion is
+ necessary, but that it is lossy and can result in truncation of the
+ value.
+
+ If the source data is strictly greater than the target data, 1 is
+ returned in <code>*order_var</code>. This implies that the source
+ type can is contained in the target type and that a conversion is
+ necessary but is non-lossy.
+
+ If no conversion is required to fit the source type in the target
+ type, 0 is returned in <code>*order_var</code>.
@param field_metadata Encoded size in field metadata
+ @param order_var Pointer to variable where the order
+ between the source field and this field
+ will be returned.
- @retval 0 if this field's size is < the source field's size
- @retval 1 if this field's size is >= the source field's size
+ @return @c true if this field's size is compatible with the
+ master's field size, @c false otherwise.
*/
-int Field::compatible_field_size(uint field_metadata,
- const Relay_log_info *rli_arg __attribute__((unused)))
+bool Field::compatible_field_size(uint field_metadata,
+ Relay_log_info *rli_arg __attribute__((unused)),
+ int *order_var)
{
uint const source_size= pack_length_from_metadata(field_metadata);
uint const destination_size= row_pack_length();
- return (source_size <= destination_size);
+ DBUG_PRINT("debug", ("real_type: %d, source_size: %u, destination_size: %u",
+ real_type(), source_size, destination_size));
+ *order_var = compare(source_size, destination_size);
+ return true;
}
@@ -2907,33 +2948,15 @@ uint Field_new_decimal::pack_length_from_metadata(uint field_metadata)
}
-/**
- Check to see if field size is compatible with destination.
-
- This method is used in row-based replication to verify that the slave's
- field size is less than or equal to the master's field size. The
- encoded field metadata (from the master or source) is decoded and compared
- to the size of this field (the slave or destination).
-
- @param field_metadata Encoded size in field metadata
-
- @retval 0 if this field's size is < the source field's size
- @retval 1 if this field's size is >= the source field's size
-*/
-int Field_new_decimal::compatible_field_size(uint field_metadata,
- const Relay_log_info * __attribute__((unused)))
+bool Field_new_decimal::compatible_field_size(uint field_metadata,
+ Relay_log_info * __attribute__((unused)),
+ int *order_var)
{
- int compatible= 0;
uint const source_precision= (field_metadata >> 8U) & 0x00ff;
uint const source_decimal= field_metadata & 0x00ff;
- uint const source_size= my_decimal_get_binary_size(source_precision,
- source_decimal);
- uint const destination_size= row_pack_length();
- compatible= (source_size <= destination_size);
- if (compatible)
- compatible= (source_precision <= precision) &&
- (source_decimal <= decimals());
- return (compatible);
+ int order= compare(source_precision, precision);
+ *order_var= order != 0 ? order : compare(source_decimal, dec);
+ return true;
}
@@ -6707,8 +6730,10 @@ check_field_for_37426(const void *param_arg)
}
#endif
-int Field_string::compatible_field_size(uint field_metadata,
- const Relay_log_info *rli_arg)
+bool
+Field_string::compatible_field_size(uint field_metadata,
+ Relay_log_info *rli_arg,
+ int *order_var)
{
#ifdef HAVE_REPLICATION
const Check_field_param check_param = { this };
@@ -6716,7 +6741,7 @@ int Field_string::compatible_field_size(uint field_metadata,
check_field_for_37426, &check_param))
return FALSE; // Not compatible field sizes
#endif
- return Field::compatible_field_size(field_metadata, rli_arg);
+ return Field::compatible_field_size(field_metadata, rli_arg, order_var);
}
@@ -6778,6 +6803,8 @@ uchar *Field_string::pack(uchar *to, const uchar *from,
{
uint length= min(field_length,max_length);
uint local_char_length= max_length/field_charset->mbmaxlen;
+ DBUG_PRINT("debug", ("Packing field '%s' - length: %u ", field_name, length));
+
if (length > local_char_length)
local_char_length= my_charpos(field_charset, from, from+length,
local_char_length);
@@ -7625,6 +7652,7 @@ Field_blob::Field_blob(uchar *ptr_arg, uchar *null_ptr_arg, uchar null_bit_arg,
cs),
packlength(blob_pack_length)
{
+ DBUG_ASSERT(blob_pack_length <= 4); // Only pack lengths 1-4 supported currently
flags|= BLOB_FLAG;
share->blob_fields++;
/* TODO: why do not fill table->s->blob_field array here? */
@@ -8035,8 +8063,10 @@ int Field_blob::key_cmp(const uchar *a,const uchar *b)
*/
int Field_blob::do_save_field_metadata(uchar *metadata_ptr)
{
+ DBUG_ENTER("Field_blob::do_save_field_metadata");
*metadata_ptr= pack_length_no_ptr();
- return 1;
+ DBUG_PRINT("debug", ("metadata: %u (pack_length_no_ptr)", *metadata_ptr));
+ DBUG_RETURN(1);
}
@@ -8977,6 +9007,9 @@ Field_bit::Field_bit(uchar *ptr_arg, uint32 len_arg, uchar *null_ptr_arg,
bit_ptr(bit_ptr_arg), bit_ofs(bit_ofs_arg), bit_len(len_arg & 7),
bytes_in_rec(len_arg / 8)
{
+ DBUG_ENTER("Field_bit::Field_bit");
+ DBUG_PRINT("enter", ("ptr_arg: %p, null_ptr_arg: %p, len_arg: %u, bit_len: %u, bytes_in_rec: %u",
+ ptr_arg, null_ptr_arg, len_arg, bit_len, bytes_in_rec));
flags|= UNSIGNED_FLAG;
/*
Ensure that Field::eq() can distinguish between two different bit fields.
@@ -8984,6 +9017,7 @@ Field_bit::Field_bit(uchar *ptr_arg, uint32 len_arg, uchar *null_ptr_arg,
*/
if (!null_ptr_arg)
null_bit= bit_ofs_arg;
+ DBUG_VOID_RETURN;
}
@@ -9268,9 +9302,12 @@ uint Field_bit::get_key_image(uchar *buff, uint length, imagetype type_arg)
*/
int Field_bit::do_save_field_metadata(uchar *metadata_ptr)
{
+ DBUG_ENTER("Field_bit::do_save_field_metadata");
+ DBUG_PRINT("debug", ("bit_len: %d, bytes_in_rec: %d",
+ bit_len, bytes_in_rec));
*metadata_ptr= bit_len;
*(metadata_ptr + 1)= bytes_in_rec;
- return 2;
+ DBUG_RETURN(2);
}
@@ -9295,34 +9332,20 @@ uint Field_bit::pack_length_from_metadata(uint field_metadata)
}
-/**
- Check to see if field size is compatible with destination.
-
- This method is used in row-based replication to verify that the slave's
- field size is less than or equal to the master's field size. The
- encoded field metadata (from the master or source) is decoded and compared
- to the size of this field (the slave or destination).
-
- @param field_metadata Encoded size in field metadata
-
- @retval 0 if this field's size is < the source field's size
- @retval 1 if this field's size is >= the source field's size
-*/
-int Field_bit::compatible_field_size(uint field_metadata,
- const Relay_log_info * __attribute__((unused)))
+bool
+Field_bit::compatible_field_size(uint field_metadata,
+ Relay_log_info * __attribute__((unused)),
+ int *order_var)
{
- int compatible= 0;
- uint const source_size= pack_length_from_metadata(field_metadata);
- uint const destination_size= row_pack_length();
- uint const from_bit_len= field_metadata & 0x00ff;
- uint const from_len= (field_metadata >> 8U) & 0x00ff;
- if ((bit_len == 0) || (from_bit_len == 0))
- compatible= (source_size <= destination_size);
- else if (from_bit_len > bit_len)
- compatible= (from_len < bytes_in_rec);
- else
- compatible= ((from_bit_len <= bit_len) && (from_len <= bytes_in_rec));
- return (compatible);
+ DBUG_ENTER("Field_bit::compatible_field_size");
+ DBUG_ASSERT((field_metadata >> 16) == 0);
+ uint const from_bit_len=
+ 8 * (field_metadata >> 8) + (field_metadata & 0xff);
+ uint const to_bit_len= max_display_length();
+ DBUG_PRINT("debug", ("from_bit_len: %u, to_bit_len: %u",
+ from_bit_len, to_bit_len));
+ *order_var= compare(from_bit_len, to_bit_len);
+ DBUG_RETURN(TRUE);
}
@@ -9388,8 +9411,15 @@ const uchar *
Field_bit::unpack(uchar *to, const uchar *from, uint param_data,
bool low_byte_first __attribute__((unused)))
{
+ DBUG_ENTER("Field_bit::unpack");
+ DBUG_PRINT("enter", ("to: %p, from: %p, param_data: 0x%x",
+ to, from, param_data));
+ DBUG_PRINT("debug", ("bit_ptr: %p, bit_len: %u, bit_ofs: %u",
+ bit_ptr, bit_len, bit_ofs));
uint const from_len= (param_data >> 8U) & 0x00ff;
uint const from_bit_len= param_data & 0x00ff;
+ DBUG_PRINT("debug", ("from_len: %u, from_bit_len: %u",
+ from_len, from_bit_len));
/*
If the parameter data is zero (i.e., undefined), or if the master
and slave have the same sizes, then use the old unpack() method.
@@ -9410,7 +9440,7 @@ Field_bit::unpack(uchar *to, const uchar *from, uint param_data,
from++;
}
memcpy(to, from, bytes_in_rec);
- return from + bytes_in_rec;
+ DBUG_RETURN(from + bytes_in_rec);
}
/*
@@ -9436,7 +9466,7 @@ Field_bit::unpack(uchar *to, const uchar *from, uint param_data,
bitmap_set_bit(table->write_set,field_index);
store(value, new_len, system_charset_info);
my_afree(value);
- return from + len;
+ DBUG_RETURN(from + len);
}
@@ -9564,8 +9594,11 @@ void Create_field::create_length_to_internal_length(void)
*/
void Create_field::init_for_tmp_table(enum_field_types sql_type_arg,
uint32 length_arg, uint32 decimals_arg,
- bool maybe_null, bool is_unsigned)
+ bool maybe_null, bool is_unsigned,
+ uint pack_length)
{
+ DBUG_ENTER("Create_field::init_for_tmp_table");
+
field_name= "";
sql_type= sql_type_arg;
char_length= length= length_arg;;
@@ -9573,10 +9606,92 @@ void Create_field::init_for_tmp_table(enum_field_types sql_type_arg,
interval= 0;
charset= &my_charset_bin;
geom_type= Field::GEOM_GEOMETRY;
- pack_flag= (FIELDFLAG_NUMBER |
- ((decimals_arg & FIELDFLAG_MAX_DEC) << FIELDFLAG_DEC_SHIFT) |
- (maybe_null ? FIELDFLAG_MAYBE_NULL : 0) |
- (is_unsigned ? 0 : FIELDFLAG_DECIMAL));
+
+ DBUG_PRINT("enter", ("sql_type: %d, length: %u, pack_length: %u",
+ sql_type_arg, length_arg, pack_length));
+
+ /*
+ These pack flags are crafted to get it correctly through the
+ branches of make_field().
+ */
+ switch (sql_type_arg)
+ {
+ case MYSQL_TYPE_VARCHAR:
+ case MYSQL_TYPE_VAR_STRING:
+ case MYSQL_TYPE_STRING:
+ case MYSQL_TYPE_SET:
+ pack_flag= 0;
+ break;
+
+ case MYSQL_TYPE_GEOMETRY:
+ pack_flag= FIELDFLAG_GEOM;
+ break;
+
+ case MYSQL_TYPE_ENUM:
+ pack_flag= FIELDFLAG_INTERVAL;
+ break;
+
+ case MYSQL_TYPE_DECIMAL:
+ case MYSQL_TYPE_NEWDECIMAL:
+ case MYSQL_TYPE_FLOAT:
+ case MYSQL_TYPE_DOUBLE:
+ pack_flag= FIELDFLAG_DECIMAL | FIELDFLAG_NUMBER |
+ (decimals_arg & FIELDFLAG_MAX_DEC) << FIELDFLAG_DEC_SHIFT;
+ break;
+
+ case MYSQL_TYPE_TINY_BLOB:
+ case MYSQL_TYPE_MEDIUM_BLOB:
+ case MYSQL_TYPE_LONG_BLOB:
+ case MYSQL_TYPE_BLOB:
+ pack_flag= FIELDFLAG_BLOB;
+ break;
+
+ case MYSQL_TYPE_BIT:
+ pack_flag= FIELDFLAG_NUMBER | FIELDFLAG_TREAT_BIT_AS_CHAR;
+ break;
+
+ default:
+ pack_flag= FIELDFLAG_NUMBER;
+ break;
+ }
+
+ /*
+ Set the pack flag correctly for the blob-like types. This sets the
+ packtype to something that make_field can use. If the pack type is
+ not set correctly, the packlength will be reeeeally wierd (like
+ 129 or so).
+ */
+ switch (sql_type_arg)
+ {
+ case MYSQL_TYPE_ENUM:
+ case MYSQL_TYPE_SET:
+ case MYSQL_TYPE_TINY_BLOB:
+ case MYSQL_TYPE_MEDIUM_BLOB:
+ case MYSQL_TYPE_LONG_BLOB:
+ case MYSQL_TYPE_BLOB:
+ case MYSQL_TYPE_GEOMETRY:
+ // If you are going to use the above types, you have to pass a
+ // pack_length as parameter. Assert that is really done.
+ DBUG_ASSERT(pack_length != ~0U);
+ pack_flag|= pack_length_to_packflag(pack_length);
+ break;
+ default:
+ /* Nothing */
+ break;
+ }
+
+ pack_flag|=
+ (maybe_null ? FIELDFLAG_MAYBE_NULL : 0) |
+ (is_unsigned ? 0 : FIELDFLAG_DECIMAL);
+
+ DBUG_PRINT("debug", ("pack_flag: %s%s%s%s%s, pack_type: %d",
+ FLAGSTR(pack_flag, FIELDFLAG_BINARY),
+ FLAGSTR(pack_flag, FIELDFLAG_NUMBER),
+ FLAGSTR(pack_flag, FIELDFLAG_INTERVAL),
+ FLAGSTR(pack_flag, FIELDFLAG_GEOM),
+ FLAGSTR(pack_flag, FIELDFLAG_BLOB),
+ f_packtype(pack_flag)));
+ DBUG_VOID_RETURN;
}
@@ -10073,6 +10188,14 @@ Field *make_field(TABLE_SHARE *share, uchar *ptr, uint32 field_length,
default: break;
}
+ DBUG_PRINT("debug", ("field_type: %d, field_length: %u, interval: %p, pack_flag: %s%s%s%s%s",
+ field_type, field_length, interval,
+ FLAGSTR(pack_flag, FIELDFLAG_BINARY),
+ FLAGSTR(pack_flag, FIELDFLAG_INTERVAL),
+ FLAGSTR(pack_flag, FIELDFLAG_NUMBER),
+ FLAGSTR(pack_flag, FIELDFLAG_PACK),
+ FLAGSTR(pack_flag, FIELDFLAG_BLOB)));
+
if (f_is_alpha(pack_flag))
{
if (!f_is_packed(pack_flag))
diff --git a/sql/field.h b/sql/field.h
index 7a9b69eff40..cfa3e57d2a0 100644
--- a/sql/field.h
+++ b/sql/field.h
@@ -164,22 +164,13 @@ public:
table, which is located on disk).
*/
virtual uint32 pack_length_in_rec() const { return pack_length(); }
- virtual int compatible_field_size(uint field_metadata,
- const Relay_log_info *);
+ virtual bool compatible_field_size(uint metadata, Relay_log_info *rli,
+ int *order);
virtual uint pack_length_from_metadata(uint field_metadata)
- { return field_metadata; }
- /*
- This method is used to return the size of the data in a row-based
- replication row record. The default implementation of returning 0 is
- designed to allow fields that do not use metadata to return TRUE (1)
- from compatible_field_size() which uses this function in the comparison.
- The default value for field metadata for fields that do not have
- metadata is 0. Thus, 0 == 0 means the fields are compatible in size.
-
- Note: While most classes that override this method return pack_length(),
- the classes Field_string, Field_varstring, and Field_blob return
- field_length + 1, field_length, and pack_length_no_ptr() respectfully.
- */
+ {
+ DBUG_ENTER("Field::pack_length_from_metadata");
+ DBUG_RETURN(field_metadata);
+ }
virtual uint row_pack_length() { return 0; }
virtual int save_field_metadata(uchar *first_byte)
{ return do_save_field_metadata(first_byte); }
@@ -636,6 +627,13 @@ public:
int store_decimal(const my_decimal *);
my_decimal *val_decimal(my_decimal *);
uint is_equal(Create_field *new_field);
+ uint row_pack_length() { return pack_length(); }
+ uint32 pack_length_from_metadata(uint field_metadata) {
+ uint32 length= pack_length();
+ DBUG_PRINT("result", ("pack_length_from_metadata(%d): %u",
+ field_metadata, length));
+ return length;
+ }
int check_int(CHARSET_INFO *cs, const char *str, int length,
const char *int_end, int error);
bool get_int(CHARSET_INFO *cs, const char *from, uint len,
@@ -806,8 +804,8 @@ public:
uint32 pack_length() const { return (uint32) bin_size; }
uint pack_length_from_metadata(uint field_metadata);
uint row_pack_length() { return pack_length(); }
- int compatible_field_size(uint field_metadata,
- const Relay_log_info *rli);
+ bool compatible_field_size(uint field_metadata, Relay_log_info *rli,
+ int *order_var);
uint is_equal(Create_field *new_field);
virtual const uchar *unpack(uchar* to, const uchar *from,
uint param_data, bool low_byte_first);
@@ -1501,9 +1499,9 @@ public:
return row_pack_length();
return (((field_metadata >> 4) & 0x300) ^ 0x300) + (field_metadata & 0x00ff);
}
- int compatible_field_size(uint field_metadata,
- const Relay_log_info *rli);
- uint row_pack_length() { return (field_length + 1); }
+ bool compatible_field_size(uint field_metadata, Relay_log_info *rli,
+ int *order_var);
+ uint row_pack_length() { return field_length; }
int pack_cmp(const uchar *a,const uchar *b,uint key_length,
my_bool insert_or_update);
int pack_cmp(const uchar *b,uint key_length,my_bool insert_or_update);
@@ -1965,8 +1963,8 @@ public:
uint pack_length_from_metadata(uint field_metadata);
uint row_pack_length()
{ return (bytes_in_rec + ((bit_len > 0) ? 1 : 0)); }
- int compatible_field_size(uint field_metadata,
- const Relay_log_info *rli);
+ bool compatible_field_size(uint metadata, Relay_log_info *rli,
+ int *order_var);
void sql_type(String &str) const;
virtual uchar *pack(uchar *to, const uchar *from,
uint max_length, bool low_byte_first);
@@ -2069,7 +2067,8 @@ public:
/* Init for a tmp table field. To be extended if need be. */
void init_for_tmp_table(enum_field_types sql_type_arg,
uint32 max_length, uint32 decimals,
- bool maybe_null, bool is_unsigned);
+ bool maybe_null, bool is_unsigned,
+ uint pack_length = ~0U);
bool init(THD *thd, char *field_name, enum_field_types type, char *length,
char *decimals, uint type_modifier, Item *default_value,
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 0e1500dac39..00919b65fa0 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -29,7 +29,6 @@
#include "rpl_rli.h"
#include "rpl_mi.h"
#include "rpl_filter.h"
-#include "rpl_utility.h"
#include "rpl_record.h"
#include <my_dir.h>
@@ -37,6 +36,7 @@
#include <base64.h>
#include <my_bitmap.h>
+#include "rpl_utility.h"
#define log_cs &my_charset_latin1
@@ -7309,11 +7309,18 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
*/
{
+ DBUG_PRINT("debug", ("Checking compability of tables to lock - tables_to_lock: %p",
+ rli->tables_to_lock));
RPL_TABLE_LIST *ptr= rli->tables_to_lock;
for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global))
{
- if (ptr->m_tabledef.compatible_with(rli, ptr->table))
+ TABLE *conv_table;
+ if (!ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli),
+ ptr->table, &conv_table))
{
+ DBUG_PRINT("debug", ("Table: %s.%s is not compatible with master",
+ ptr->table->s->db.str,
+ ptr->table->s->table_name.str));
/*
We should not honour --slave-skip-errors at this point as we are
having severe errors which should not be skiped.
@@ -7324,12 +7331,17 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
DBUG_RETURN(ERR_BAD_TABLE_DEF);
}
+ DBUG_PRINT("debug", ("Table: %s.%s is compatible with master"
+ " - conv_table: %p",
+ ptr->table->s->db.str,
+ ptr->table->s->table_name.str, conv_table));
+ ptr->m_conv_table= conv_table;
}
}
/*
- ... and then we add all the tables to the table map and remove
- them from tables to lock.
+ ... and then we add all the tables to the table map and but keep
+ them in the tables to lock list.
We also invalidate the query cache for all the tables, since
they will now be changed.
@@ -7825,7 +7837,10 @@ int Table_map_log_event::save_field_metadata()
DBUG_ENTER("Table_map_log_event::save_field_metadata");
int index= 0;
for (unsigned int i= 0 ; i < m_table->s->fields ; i++)
+ {
+ DBUG_PRINT("debug", ("field_type: %d", m_coltype[i]));
index+= m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]);
+ }
DBUG_RETURN(index);
}
#endif /* !defined(MYSQL_CLIENT) */
diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc
index 3389821a718..10d1dcaa27a 100644
--- a/sql/log_event_old.cc
+++ b/sql/log_event_old.cc
@@ -107,14 +107,24 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info
RPL_TABLE_LIST *ptr= rli->tables_to_lock;
for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global))
{
- if (ptr->m_tabledef.compatible_with(rli, ptr->table))
+ TABLE *conv_table;
+ if (!ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli),
+ ptr->table, &conv_table))
{
+ DBUG_PRINT("debug", ("Table: %s.%s is not compatible with master",
+ ptr->table->s->db.str,
+ ptr->table->s->table_name.str));
mysql_unlock_tables(thd, thd->lock);
thd->lock= 0;
thd->is_slave_error= 1;
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
DBUG_RETURN(Old_rows_log_event::ERR_BAD_TABLE_DEF);
}
+ DBUG_PRINT("debug", ("Table: %s.%s is compatible with master"
+ " - conv_table: %p",
+ ptr->table->s->db.str,
+ ptr->table->s->table_name.str, conv_table));
+ ptr->m_conv_table= conv_table;
}
}
@@ -1578,7 +1588,9 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
RPL_TABLE_LIST *ptr= rli->tables_to_lock;
for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global))
{
- if (ptr->m_tabledef.compatible_with(rli, ptr->table))
+ TABLE *conv_table;
+ if (ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli),
+ ptr->table, &conv_table))
{
mysql_unlock_tables(thd, thd->lock);
thd->lock= 0;
@@ -1586,12 +1598,14 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
DBUG_RETURN(ERR_BAD_TABLE_DEF);
}
+ ptr->m_conv_table= conv_table;
}
}
/*
- ... and then we add all the tables to the table map and remove
- them from tables to lock.
+ ... and then we add all the tables to the table map but keep
+ them in the tables to lock list.
+
We also invalidate the query cache for all the tables, since
they will now be changed.
diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h
index c04b1f5ae38..522fa679e6f 100644
--- a/sql/mysql_priv.h
+++ b/sql/mysql_priv.h
@@ -1998,6 +1998,7 @@ extern my_bool opt_sql_bin_update, opt_safe_user_create, opt_no_mix_types;
extern my_bool opt_safe_show_db, opt_local_infile, opt_myisam_use_mmap;
extern my_bool opt_slave_compressed_protocol, use_temp_pool;
extern ulong slave_exec_mode_options;
+extern ulong slave_type_conversions_options;
extern my_bool opt_readonly, lower_case_file_system;
extern my_bool opt_enable_named_pipe, opt_sync_frm, opt_allow_suspicious_udfs;
extern my_bool opt_secure_auth;
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 896be4a7f19..c9fd9249af3 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -525,6 +525,8 @@ ulong open_files_limit, max_binlog_size, max_relay_log_size;
ulong slave_net_timeout, slave_trans_retries;
ulong slave_exec_mode_options;
const char *slave_exec_mode_str= "STRICT";
+ulong slave_type_conversions_options;
+const char *slave_type_conversions_default= "";
ulong thread_cache_size=0, thread_pool_size= 0;
ulong binlog_cache_size=0;
ulonglong max_binlog_cache_size=0;
@@ -5690,6 +5692,7 @@ enum options_mysqld
#endif /* defined(ENABLED_DEBUG_SYNC) */
OPT_OLD_MODE,
OPT_SLAVE_EXEC_MODE,
+ OPT_SLAVE_TYPE_CONVERSIONS,
OPT_GENERAL_LOG_FILE,
OPT_SLOW_QUERY_LOG_FILE,
OPT_IGNORE_BUILTIN_INNODB
@@ -6413,6 +6416,15 @@ replicating a LOAD DATA INFILE command.",
{"slave-exec-mode", OPT_SLAVE_EXEC_MODE,
"Modes for how replication events should be executed. Legal values are STRICT (default) and IDEMPOTENT. In IDEMPOTENT mode, replication will not stop for operations that are idempotent. In STRICT mode, replication will stop on any unexpected difference between the master and the slave.",
(uchar**) &slave_exec_mode_str, (uchar**) &slave_exec_mode_str, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ {"slave-type-conversions", OPT_SLAVE_TYPE_CONVERSIONS,
+ "Set of slave type conversions that are enabled. Legal values are:"
+ " ALL_LOSSY to enable lossy conversions and"
+ " ALL_NON_LOSSY to enable non-lossy conversions."
+ " If the variable is assigned the empty set, no conversions are"
+ " allowed and it is expected that the types match exactly.",
+ (uchar**) &slave_type_conversions_default,
+ (uchar**) &slave_type_conversions_default,
+ 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
#endif
{"slow-query-log", OPT_SLOW_LOG,
"Enable|disable slow query log", (uchar**) &opt_slow_log,
@@ -7671,6 +7683,11 @@ static int mysql_init_variables(void)
slave_exec_mode_options= (uint)
find_bit_type_or_exit(slave_exec_mode_str, &slave_exec_mode_typelib, NULL,
&error);
+ /* Slave type conversions */
+ slave_type_conversions_options= 0;
+ slave_type_conversions_options=
+ find_bit_type_or_exit(slave_type_conversions_default, &slave_type_conversions_typelib,
+ NULL, &error);
if (error)
return 1;
opt_specialflag= SPECIAL_ENGLISH;
@@ -7900,6 +7917,12 @@ mysqld_get_one_option(int optid,
if (error)
return 1;
break;
+ case OPT_SLAVE_TYPE_CONVERSIONS:
+ slave_type_conversions_options= (uint)
+ find_bit_type_or_exit(argument, &slave_type_conversions_typelib, "", &error);
+ if (error)
+ return 1;
+ break;
#endif
case OPT_SAFEMALLOC_MEM_LIMIT:
#if !defined(DBUG_OFF) && defined(SAFEMALLOC)
diff --git a/sql/rpl_record.cc b/sql/rpl_record.cc
index 14a80cbb4b6..ab9b21b11a8 100644
--- a/sql/rpl_record.cc
+++ b/sql/rpl_record.cc
@@ -104,10 +104,10 @@ pack_row(TABLE *table, MY_BITMAP const* cols,
#endif
pack_ptr= field->pack(pack_ptr, field->ptr + offset,
field->max_data_length(), TRUE);
- DBUG_PRINT("debug", ("field: %s; pack_ptr: 0x%lx;"
+ DBUG_PRINT("debug", ("field: %s; real_type: %d, pack_ptr: 0x%lx;"
" pack_ptr':0x%lx; bytes: %d",
- field->field_name, (ulong) old_pack_ptr,
- (ulong) pack_ptr,
+ field->field_name, field->real_type(),
+ (ulong) old_pack_ptr, (ulong) pack_ptr,
(int) (pack_ptr - old_pack_ptr)));
}
@@ -201,10 +201,30 @@ unpack_row(Relay_log_info const *rli,
// The "current" null bits
unsigned int null_bits= *null_ptr++;
uint i= 0;
- table_def *tabledef= ((Relay_log_info*)rli)->get_tabledef(table);
+ table_def *tabledef;
+ TABLE *conv_table;
+ bool table_found= rli->get_table_data(table, &tabledef, &conv_table);
+ DBUG_PRINT("debug", ("Table data: table_found: %d, tabldef: %p, conv_table: %p",
+ table_found, tabledef, conv_table));
+ DBUG_ASSERT(table_found);
+ if (!table_found)
+ return HA_ERR_GENERIC;
+
for (field_ptr= begin_ptr ; field_ptr < end_ptr && *field_ptr ; ++field_ptr)
{
- Field *const f= *field_ptr;
+ /*
+ If there is a conversion table, we pick up the field pointer to
+ the conversion table. If the conversion table or the field
+ pointer is NULL, no conversions are necessary.
+ */
+ Field *conv_field=
+ conv_table ? conv_table->field[field_ptr - begin_ptr] : NULL;
+ Field *const f=
+ conv_field ? conv_field : *field_ptr;
+ DBUG_PRINT("debug", ("Conversion %srequired for field '%s' (#%d)",
+ conv_field ? "" : "not ",
+ (*field_ptr)->field_name, field_ptr - begin_ptr));
+ DBUG_ASSERT(f != NULL);
/*
No need to bother about columns that does not exist: they have
@@ -247,6 +267,39 @@ unpack_row(Relay_log_info const *rli,
(int) (pack_ptr - old_pack_ptr)));
}
+ /*
+ If conv_field is set, then we are doing a conversion. In this
+ case, we have unpacked the master data to the conversion
+ table, so we need to copy the value stored in the conversion
+ table into the final table and do the conversion at the same time.
+ */
+ if (conv_field)
+ {
+ Copy_field copy;
+#ifndef DBUG_OFF
+ char source_buf[MAX_FIELD_WIDTH];
+ char value_buf[MAX_FIELD_WIDTH];
+ String source_type(source_buf, sizeof(source_buf), system_charset_info);
+ String value_string(value_buf, sizeof(value_buf), system_charset_info);
+ conv_field->sql_type(source_type);
+ conv_field->val_str(&value_string);
+ DBUG_PRINT("debug", ("Copying field '%s' of type '%s' with value '%s'",
+ (*field_ptr)->field_name,
+ source_type.c_ptr(), value_string.c_ptr()));
+#endif
+ copy.set(*field_ptr, f, TRUE);
+ (*copy.do_copy)(&copy);
+#ifndef DBUG_OFF
+ char target_buf[MAX_FIELD_WIDTH];
+ String target_type(target_buf, sizeof(target_buf), system_charset_info);
+ (*field_ptr)->sql_type(target_type);
+ (*field_ptr)->val_str(&value_string);
+ DBUG_PRINT("debug", ("Value of field '%s' of type '%s' is now '%s'",
+ (*field_ptr)->field_name,
+ target_type.c_ptr(), value_string.c_ptr()));
+#endif
+ }
+
null_mask <<= 1;
}
i++;
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 171778d9675..7f04a3e5cdd 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -314,13 +314,21 @@ public:
uint tables_to_lock_count; /* RBR: Count of tables to lock */
table_mapping m_table_map; /* RBR: Mapping table-id to table */
- inline table_def *get_tabledef(TABLE *tbl)
+ bool get_table_data(TABLE *table_arg, table_def **tabledef_var, TABLE **conv_table_var) const
{
- table_def *td= 0;
- for (TABLE_LIST *ptr= tables_to_lock; ptr && !td; ptr= ptr->next_global)
- if (ptr->table == tbl)
- td= &((RPL_TABLE_LIST *)ptr)->m_tabledef;
- return (td);
+ DBUG_ASSERT(tabledef_var && conv_table_var);
+ for (TABLE_LIST *ptr= tables_to_lock ; ptr != NULL ; ptr= ptr->next_global)
+ if (ptr->table == table_arg)
+ {
+ *tabledef_var= &static_cast<RPL_TABLE_LIST*>(ptr)->m_tabledef;
+ *conv_table_var= static_cast<RPL_TABLE_LIST*>(ptr)->m_conv_table;
+ DBUG_PRINT("debug", ("Fetching table data for table %s.%s:"
+ " tabledef: %p, conv_table: %p",
+ table_arg->s->db.str, table_arg->s->table_name.str,
+ *tabledef_var, *conv_table_var));
+ return true;
+ }
+ return false;
}
/*
diff --git a/sql/rpl_utility.cc b/sql/rpl_utility.cc
index e34f8561051..f24de23bcf3 100644
--- a/sql/rpl_utility.cc
+++ b/sql/rpl_utility.cc
@@ -14,8 +14,184 @@
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#include "rpl_utility.h"
+
+#ifndef MYSQL_CLIENT
#include "rpl_rli.h"
+/**
+ Template function to compare two objects.
+ */
+namespace {
+ template <class Type>
+ int compare(Type a, Type b)
+ {
+ if (a < b)
+ return -1;
+ if (b < a)
+ return 1;
+ return 0;
+ }
+}
+
+
+/**
+ Max value for an unsigned integer of 'bits' bits.
+
+ The somewhat contorted expression is to avoid overflow.
+ */
+uint32 uint_max(int bits) {
+ return (((1UL << (bits - 1)) - 1) << 1) | 1;
+}
+
+
+/**
+ Compute the maximum display length of a field.
+
+ @param sql_type Type of the field
+ @param metadata The metadata from the master for the field.
+ @return Maximum length of the field in bytes.
+ */
+static uint32
+max_display_length_for_field(enum_field_types sql_type, unsigned int metadata)
+{
+ DBUG_PRINT("debug", ("sql_type: %d, metadata: 0x%x", sql_type, metadata));
+ DBUG_ASSERT(metadata >> 16 == 0);
+
+ switch (sql_type) {
+ case MYSQL_TYPE_NEWDECIMAL:
+ return metadata >> 8;
+
+ case MYSQL_TYPE_FLOAT:
+ return 12;
+
+ case MYSQL_TYPE_DOUBLE:
+ return 22;
+
+ case MYSQL_TYPE_SET:
+ case MYSQL_TYPE_ENUM:
+ return metadata & 0x00ff;
+
+ case MYSQL_TYPE_STRING:
+ {
+ uchar type= metadata >> 8;
+ if (type == MYSQL_TYPE_SET || type == MYSQL_TYPE_ENUM)
+ return metadata & 0xff;
+ else
+ /* This is taken from Field_string::unpack. */
+ return (((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0x00ff);
+ }
+
+ case MYSQL_TYPE_YEAR:
+ case MYSQL_TYPE_TINY:
+ return 4;
+
+ case MYSQL_TYPE_SHORT:
+ return 6;
+
+ case MYSQL_TYPE_INT24:
+ return 9;
+
+ case MYSQL_TYPE_LONG:
+ return 11;
+
+#ifdef HAVE_LONG_LONG
+ case MYSQL_TYPE_LONGLONG:
+ return 20;
+
+#endif
+ case MYSQL_TYPE_NULL:
+ return 0;
+
+ case MYSQL_TYPE_NEWDATE:
+ return 3;
+
+ case MYSQL_TYPE_DATE:
+ case MYSQL_TYPE_TIME:
+ return 3;
+
+ case MYSQL_TYPE_TIMESTAMP:
+ return 4;
+
+ case MYSQL_TYPE_DATETIME:
+ return 8;
+
+ case MYSQL_TYPE_BIT:
+ {
+ /*
+ Decode the size of the bit field from the master.
+ from_len is the length in bytes from the master
+ from_bit_len is the number of extra bits stored in the master record
+ */
+ uint from_len= (metadata >> 8U) & 0x00ff;
+ uint from_bit_len= metadata & 0x00ff;
+ DBUG_ASSERT(from_bit_len <= 7);
+ return 8 * from_len + from_bit_len;
+ }
+
+ case MYSQL_TYPE_VAR_STRING:
+ case MYSQL_TYPE_VARCHAR:
+ return metadata;
+
+ /*
+ The actual length for these types does not really matter since
+ they are used to calc_pack_length, which ignores the given
+ length for these types.
+
+ Since we want this to be accurate for other uses, we return the
+ maximum size in bytes of these BLOBs.
+ */
+
+ case MYSQL_TYPE_TINY_BLOB:
+ return uint_max(1 * 8);
+
+ case MYSQL_TYPE_MEDIUM_BLOB:
+ return uint_max(3 * 8);
+
+ case MYSQL_TYPE_BLOB:
+ /*
+ For the blob type, Field::real_type() lies and say that all
+ blobs are of type MYSQL_TYPE_BLOB. In that case, we have to look
+ at the length instead to decide what the max display size is.
+ */
+ return uint_max(metadata * 8);
+
+ case MYSQL_TYPE_LONG_BLOB:
+ case MYSQL_TYPE_GEOMETRY:
+ return uint_max(4 * 8);
+
+ default:
+ return ~(uint32) 0;
+ }
+}
+
+
+/*
+ Compare the pack lengths of a source field (on the master) and a
+ target field (on the slave).
+
+ @param field Target field.
+ @param type Source field type.
+ @param metadata Source field metadata.
+
+ @retval -1 The length of the source field is smaller than the target field.
+ @retval 0 The length of the source and target fields are the same.
+ @retval 1 The length of the source field is greater than the target field.
+ */
+int compare_lengths(Field *field, enum_field_types source_type, uint16 metadata)
+{
+ DBUG_ENTER("compare_lengths");
+ size_t const source_length=
+ max_display_length_for_field(source_type, metadata);
+ size_t const target_length= field->max_display_length();
+ DBUG_PRINT("debug", ("source_length: %lu, source_type: %u,"
+ " target_length: %lu, target_type: %u",
+ (unsigned long) source_length, source_type,
+ (unsigned long) target_length, field->real_type()));
+ int result= compare(source_length, target_length);
+ DBUG_PRINT("result", ("%d", result));
+ DBUG_RETURN(result);
+}
+
/*********************************************************************
* table_def member definitions *
*********************************************************************/
@@ -169,58 +345,698 @@ uint32 table_def::calc_field_size(uint col, uchar *master_data) const
return length;
}
-/*
+
+/**
+ */
+void show_sql_type(enum_field_types type, uint16 metadata, String *str)
+{
+ DBUG_ENTER("show_sql_type");
+ DBUG_PRINT("enter", ("type: %d, metadata: 0x%x", type, metadata));
+
+ switch (type)
+ {
+ case MYSQL_TYPE_TINY:
+ str->set_ascii(STRING_WITH_LEN("tinyint"));
+ break;
+
+ case MYSQL_TYPE_SHORT:
+ str->set_ascii(STRING_WITH_LEN("smallint"));
+ break;
+
+ case MYSQL_TYPE_LONG:
+ str->set_ascii(STRING_WITH_LEN("int"));
+ break;
+
+ case MYSQL_TYPE_FLOAT:
+ str->set_ascii(STRING_WITH_LEN("float"));
+ break;
+
+ case MYSQL_TYPE_DOUBLE:
+ str->set_ascii(STRING_WITH_LEN("double"));
+ break;
+
+ case MYSQL_TYPE_NULL:
+ str->set_ascii(STRING_WITH_LEN("null"));
+ break;
+
+ case MYSQL_TYPE_TIMESTAMP:
+ str->set_ascii(STRING_WITH_LEN("timestamp"));
+ break;
+
+ case MYSQL_TYPE_LONGLONG:
+ str->set_ascii(STRING_WITH_LEN("bigint"));
+ break;
+
+ case MYSQL_TYPE_INT24:
+ str->set_ascii(STRING_WITH_LEN("mediumint"));
+ break;
+
+ case MYSQL_TYPE_NEWDATE:
+ case MYSQL_TYPE_DATE:
+ str->set_ascii(STRING_WITH_LEN("date"));
+ break;
+
+ case MYSQL_TYPE_TIME:
+ str->set_ascii(STRING_WITH_LEN("time"));
+ break;
+
+ case MYSQL_TYPE_DATETIME:
+ str->set_ascii(STRING_WITH_LEN("datetime"));
+ break;
+
+ case MYSQL_TYPE_YEAR:
+ str->set_ascii(STRING_WITH_LEN("year"));
+ break;
+
+ case MYSQL_TYPE_VAR_STRING:
+ case MYSQL_TYPE_VARCHAR:
+ {
+ CHARSET_INFO *cs= str->charset();
+ uint32 length=
+ cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
+ "varchar(%u)", metadata);
+ str->length(length);
+ }
+ break;
+
+ case MYSQL_TYPE_BIT:
+ {
+ CHARSET_INFO *cs= str->charset();
+ int bit_length= (metadata >> 8) + (metadata & 0xFF);
+ uint32 length=
+ cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
+ "bit(%d)", bit_length);
+ str->length(length);
+ }
+ break;
+
+ case MYSQL_TYPE_DECIMAL:
+ {
+ CHARSET_INFO *cs= str->charset();
+ uint32 length=
+ cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
+ "decimal(%d,?)", metadata);
+ str->length(length);
+ }
+ break;
+
+ case MYSQL_TYPE_NEWDECIMAL:
+ {
+ CHARSET_INFO *cs= str->charset();
+ uint32 length=
+ cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
+ "decimal(%d,%d)", metadata >> 8, metadata & 0xff);
+ str->length(length);
+ }
+ break;
+
+ case MYSQL_TYPE_ENUM:
+ str->set_ascii(STRING_WITH_LEN("enum"));
+ break;
+
+ case MYSQL_TYPE_SET:
+ str->set_ascii(STRING_WITH_LEN("set"));
+ break;
+
+ case MYSQL_TYPE_BLOB:
+ /*
+ Field::real_type() lies regarding the actual type of a BLOB, so
+ it is necessary to check the pack length to figure out what kind
+ of blob it really is.
+ */
+ switch (get_blob_type_from_length(metadata))
+ {
+ case MYSQL_TYPE_TINY_BLOB:
+ str->set_ascii(STRING_WITH_LEN("tinyblob"));
+ break;
+
+ case MYSQL_TYPE_MEDIUM_BLOB:
+ str->set_ascii(STRING_WITH_LEN("mediumblob"));
+ break;
+
+ case MYSQL_TYPE_LONG_BLOB:
+ str->set_ascii(STRING_WITH_LEN("longblob"));
+ break;
+
+ case MYSQL_TYPE_BLOB:
+ str->set_ascii(STRING_WITH_LEN("blob"));
+ break;
+
+ default:
+ DBUG_ASSERT(0);
+ break;
+ }
+ break;
+
+ case MYSQL_TYPE_STRING:
+ {
+ /*
+ This is taken from Field_string::unpack.
+ */
+ CHARSET_INFO *cs= str->charset();
+ uint bytes= (((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0x00ff);
+ uint32 length=
+ cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
+ "char(%d)", bytes / cs->mbmaxlen);
+ str->length(length);
+ }
+ break;
+
+ case MYSQL_TYPE_GEOMETRY:
+ str->set_ascii(STRING_WITH_LEN("geometry"));
+ break;
+
+ default:
+ str->set_ascii(STRING_WITH_LEN("<unknown type>"));
+ }
+ DBUG_VOID_RETURN;
+}
+
+
+/**
+ Check the order variable and print errors if the order is not
+ acceptable according to the current settings.
+ */
+bool is_conversion_ok(int order, Relay_log_info *rli)
+{
+ DBUG_ENTER("is_conversion_ok");
+ bool allow_non_lossy=
+ bit_is_set(slave_type_conversions_options, SLAVE_TYPE_CONVERSIONS_ALL_NON_LOSSY);
+ bool allow_lossy=
+ bit_is_set(slave_type_conversions_options, SLAVE_TYPE_CONVERSIONS_ALL_LOSSY);
+
+ DBUG_PRINT("enter", ("order: %d, flags:%s%s", order,
+ allow_non_lossy ? " ALL_NON_LOSSY" : "",
+ allow_lossy ? " ALL_LOSSY" : ""));
+ if (order < 0 && !allow_non_lossy)
+ {
+ /* !!! Add error message saying that non-lossy conversions need to be allowed. */
+ DBUG_RETURN(false);
+ }
+
+ if (order > 0 && !allow_lossy)
+ {
+ /* !!! Add error message saying that lossy conversions need to be allowed. */
+ DBUG_RETURN(false);
+ }
+
+ DBUG_RETURN(true);
+}
+
+
+/**
+ Can a type potentially be converted to another type?
+
+ This function check if the types are convertible and what
+ conversion is required.
+
+ If conversion is not possible, and error is printed.
+
+ If conversion is possible:
+
+ - *order will be set to -1 if source type is smaller than target
+ type and a non-lossy conversion can be required. This includes
+ the case where the field types are different but types could
+ actually be converted in either direction.
+
+ - *order will be set to 0 if no conversion is required.
+
+ - *order will be set to 1 if the source type is strictly larger
+ than the target type and that conversion is potentially lossy.
+
+ @param[in] field Target field
+ @param[in] type Source field type
+ @param[in] metadata Source field metadata
+ @param[in] rli Relay log info (for error reporting)
+ @param[out] order Order between source field and target field
+
+ @return @c true if conversion is possible according to the current
+ settings, @c false if conversion is not possible according to the
+ current setting.
+ */
+static bool
+can_convert_field_to(Field *field,
+ enum_field_types source_type, uint16 metadata,
+ Relay_log_info *rli, int *order_var)
+{
+ DBUG_ENTER("can_convert_field_to");
+#ifndef DBUG_OFF
+ char field_type_buf[MAX_FIELD_WIDTH];
+ String field_type(field_type_buf, sizeof(field_type_buf), field->charset());
+ field->sql_type(field_type);
+ DBUG_PRINT("enter", ("field_type: %s, target_type: %d, source_type: %d, source_metadata: 0x%x",
+ field_type.c_ptr(), field->real_type(), source_type, metadata));
+#endif
+ /*
+ If the real type is the same, we need to check the metadata to
+ decide if conversions are allowed.
+ */
+ if (field->real_type() == source_type)
+ {
+ DBUG_PRINT("debug", ("Base types are identical, doing field size comparison"));
+ if (field->compatible_field_size(metadata, rli, order_var))
+ DBUG_RETURN(is_conversion_ok(*order_var, rli));
+ else
+ DBUG_RETURN(false);
+ }
+ else if (!slave_type_conversions_options)
+ DBUG_RETURN(false);
+
+ /*
+ Here, from and to will always be different. Since the types are
+ different, we cannot use the compatible_field_size() function, but
+ have to rely on hard-coded max-sizes for fields.
+ */
+
+ DBUG_PRINT("debug", ("Base types are different, checking conversion"));
+ switch (source_type) // Source type (on master)
+ {
+ case MYSQL_TYPE_DECIMAL:
+ case MYSQL_TYPE_NEWDECIMAL:
+ case MYSQL_TYPE_FLOAT:
+ case MYSQL_TYPE_DOUBLE:
+ switch (field->real_type())
+ {
+ case MYSQL_TYPE_NEWDECIMAL:
+ /*
+ Then the other type is either FLOAT, DOUBLE, or old style
+ DECIMAL, so we require lossy conversion.
+ */
+ *order_var= 1;
+ DBUG_RETURN(is_conversion_ok(*order_var, rli));
+
+ case MYSQL_TYPE_DECIMAL:
+ case MYSQL_TYPE_FLOAT:
+ case MYSQL_TYPE_DOUBLE:
+ {
+ if (source_type == MYSQL_TYPE_NEWDECIMAL ||
+ source_type == MYSQL_TYPE_DECIMAL)
+ *order_var = 1; // Always require lossy conversions
+ else
+ *order_var= compare_lengths(field, source_type, metadata);
+ DBUG_ASSERT(*order_var != 0);
+ DBUG_RETURN(is_conversion_ok(*order_var, rli));
+ }
+
+ default:
+ DBUG_RETURN(false);
+ }
+ break;
+
+ /*
+ The length comparison check will do the correct job of comparing
+ the field lengths (in bytes) of two integer types.
+ */
+ case MYSQL_TYPE_TINY:
+ case MYSQL_TYPE_SHORT:
+ case MYSQL_TYPE_INT24:
+ case MYSQL_TYPE_LONG:
+ case MYSQL_TYPE_LONGLONG:
+ switch (field->real_type())
+ {
+ case MYSQL_TYPE_TINY:
+ case MYSQL_TYPE_SHORT:
+ case MYSQL_TYPE_INT24:
+ case MYSQL_TYPE_LONG:
+ case MYSQL_TYPE_LONGLONG:
+ *order_var= compare_lengths(field, source_type, metadata);
+ DBUG_ASSERT(*order_var != 0);
+ DBUG_RETURN(is_conversion_ok(*order_var, rli));
+
+ default:
+ DBUG_RETURN(false);
+ }
+ break;
+
+ /*
+ Since source and target type is different, and it is not possible
+ to convert bit types to anything else, this will return false.
+ */
+ case MYSQL_TYPE_BIT:
+ DBUG_RETURN(false);
+
+ /*
+ If all conversions are disabled, it is not allowed to convert
+ between these types. Since the TEXT vs. BINARY is distinguished by
+ the charset, and the charset is not replicated, we cannot
+ currently distinguish between , e.g., TEXT and BLOB.
+ */
+ case MYSQL_TYPE_TINY_BLOB:
+ case MYSQL_TYPE_MEDIUM_BLOB:
+ case MYSQL_TYPE_LONG_BLOB:
+ case MYSQL_TYPE_BLOB:
+ case MYSQL_TYPE_STRING:
+ case MYSQL_TYPE_VAR_STRING:
+ case MYSQL_TYPE_VARCHAR:
+ switch (field->real_type())
+ {
+ case MYSQL_TYPE_TINY_BLOB:
+ case MYSQL_TYPE_MEDIUM_BLOB:
+ case MYSQL_TYPE_LONG_BLOB:
+ case MYSQL_TYPE_BLOB:
+ case MYSQL_TYPE_STRING:
+ case MYSQL_TYPE_VAR_STRING:
+ case MYSQL_TYPE_VARCHAR:
+ *order_var= compare_lengths(field, source_type, metadata);
+ /*
+ Here we know that the types are different, so if the order
+ gives that they do not require any conversion, we still need
+ to have non-lossy conversion enabled to allow conversion
+ between different (string) types of the same length.
+ */
+ if (*order_var == 0)
+ *order_var= -1;
+ DBUG_RETURN(is_conversion_ok(*order_var, rli));
+
+ default:
+ DBUG_RETURN(false);
+ }
+ break;
+
+ case MYSQL_TYPE_GEOMETRY:
+ case MYSQL_TYPE_TIMESTAMP:
+ case MYSQL_TYPE_DATE:
+ case MYSQL_TYPE_TIME:
+ case MYSQL_TYPE_DATETIME:
+ case MYSQL_TYPE_YEAR:
+ case MYSQL_TYPE_NEWDATE:
+ case MYSQL_TYPE_NULL:
+ case MYSQL_TYPE_ENUM:
+ case MYSQL_TYPE_SET:
+ DBUG_RETURN(false);
+ }
+ DBUG_RETURN(false); // To keep GCC happy
+}
+
+
+/**
Is the definition compatible with a table?
+ This function will compare the master table with an existing table
+ on the slave and see if they are compatible with respect to the
+ current settings of @c SLAVE_TYPE_CONVERSIONS.
+
+ If the tables are compatible and conversions are required, @c
+ *tmp_table_var will be set to a virtual temporary table with field
+ pointers for the fields that require conversions. This allow simple
+ checking of whether a conversion are to be applied or not.
+
+ If tables are compatible, but no conversions are necessary, @c
+ *tmp_table_var will be set to NULL.
+
+ @param rli_arg[in]
+ Relay log info, for error reporting.
+
+ @param table[in]
+ Table to compare with
+
+ @param tmp_table_var[out]
+ Virtual temporary table for performing conversions, if necessary.
+
+ @retval true Master table is compatible with slave table.
+ @retval false Master table is not compatible with slave table.
*/
-int
-table_def::compatible_with(Relay_log_info const *rli_arg, TABLE *table)
+bool
+table_def::compatible_with(THD *thd, Relay_log_info *rli,
+ TABLE *table, TABLE **conv_table_var)
const
{
/*
We only check the initial columns for the tables.
*/
uint const cols_to_check= min(table->s->fields, size());
- int error= 0;
- Relay_log_info const *rli= const_cast<Relay_log_info*>(rli_arg);
-
- TABLE_SHARE const *const tsh= table->s;
+ TABLE *tmp_table= NULL;
for (uint col= 0 ; col < cols_to_check ; ++col)
{
Field *const field= table->field[col];
- if (field->type() != type(col))
+ int order;
+ if (can_convert_field_to(field, type(col), field_metadata(col), rli, &order))
{
- DBUG_ASSERT(col < size() && col < tsh->fields);
- DBUG_ASSERT(tsh->db.str && tsh->table_name.str);
- error= 1;
- char buf[256];
- my_snprintf(buf, sizeof(buf), "Column %d type mismatch - "
- "received type %d, %s.%s has type %d",
- col, type(col), tsh->db.str, tsh->table_name.str,
- field->type());
- rli->report(ERROR_LEVEL, ER_BINLOG_ROW_WRONG_TABLE_DEF,
- ER(ER_BINLOG_ROW_WRONG_TABLE_DEF), buf);
+ DBUG_PRINT("debug", ("Checking column %d -"
+ " field '%s' can be converted - order: %d",
+ col, field->field_name, order));
+ DBUG_ASSERT(order >= -1 && order <= 1);
+
+ /*
+ If order is not 0, a conversion is required, so we need to set
+ up the conversion table.
+ */
+ if (order != 0 && tmp_table == NULL)
+ {
+ /*
+ This will create the full table with all fields. This is
+ necessary to ge the correct field lengths for the record.
+ */
+ tmp_table= create_conversion_table(thd, rli, table);
+ if (tmp_table == NULL)
+ return false;
+ /*
+ Clear all fields up to, but not including, this column.
+ */
+ for (unsigned int i= 0; i < col; ++i)
+ tmp_table->field[i]= NULL;
+ }
+
+ if (order == 0 && tmp_table != NULL)
+ tmp_table->field[col]= NULL;
}
- /*
- Check the slave's field size against that of the master.
- */
- if (!error &&
- !field->compatible_field_size(field_metadata(col), rli_arg))
+ else
+ {
+ DBUG_PRINT("debug", ("Checking column %d -"
+ " field '%s' can not be converted",
+ col, field->field_name));
+ DBUG_ASSERT(col < size() && col < table->s->fields);
+ DBUG_ASSERT(table->s->db.str && table->s->table_name.str);
+ const char *db_name= table->s->db.str;
+ const char *tbl_name= table->s->table_name.str;
+ char source_buf[MAX_FIELD_WIDTH];
+ char target_buf[MAX_FIELD_WIDTH];
+ String source_type(source_buf, sizeof(source_buf), field->charset());
+ String target_type(target_buf, sizeof(target_buf), field->charset());
+ show_sql_type(type(col), field_metadata(col), &source_type);
+ field->sql_type(target_type);
+ rli->report(ERROR_LEVEL, ER_SLAVE_CONVERSION_FAILED,
+ ER(ER_SLAVE_CONVERSION_FAILED),
+ col, db_name, tbl_name,
+ source_type.c_ptr(), target_type.c_ptr());
+ return false;
+ }
+ }
+
+#ifndef DBUG_OFF
+ if (tmp_table)
+ {
+ for (unsigned int col= 0; col < tmp_table->s->fields; ++col)
+ if (tmp_table->field[col])
+ {
+ char source_buf[MAX_FIELD_WIDTH];
+ char target_buf[MAX_FIELD_WIDTH];
+ String source_type(source_buf, sizeof(source_buf), table->field[col]->charset());
+ String target_type(target_buf, sizeof(target_buf), table->field[col]->charset());
+ tmp_table->field[col]->sql_type(source_type);
+ table->field[col]->sql_type(target_type);
+ DBUG_PRINT("debug", ("Field %s - conversion required."
+ " Source type: '%s', Target type: '%s'",
+ tmp_table->field[col]->field_name,
+ source_type.c_ptr(), target_type.c_ptr()));
+ }
+ }
+#endif
+
+ *conv_table_var= tmp_table;
+ return true;
+}
+
+/**
+ Create a conversion table.
+
+ If the function is unable to create the conversion table, an error
+ will be printed and NULL will be returned.
+
+ @return Pointer to conversion table, or NULL if unable to create
+ conversion table.
+ */
+
+TABLE *table_def::create_conversion_table(THD *thd, Relay_log_info *rli, TABLE *target_table) const
+{
+ DBUG_ENTER("table_def::create_conversion_table");
+
+ List<Create_field> field_list;
+
+ for (uint col= 0 ; col < size() ; ++col)
+ {
+ Create_field *field_def=
+ (Create_field*) alloc_root(thd->mem_root, sizeof(Create_field));
+ if (field_list.push_back(field_def))
+ DBUG_RETURN(NULL);
+
+ uint decimals= 0;
+ TYPELIB* interval= NULL;
+ uint pack_length= 0;
+ uint32 max_length=
+ max_display_length_for_field(type(col), field_metadata(col));
+
+ switch(type(col))
+ {
+ int precision;
+ case MYSQL_TYPE_ENUM:
+ case MYSQL_TYPE_SET:
+ interval= static_cast<Field_enum*>(target_table->field[col])->typelib;
+ pack_length= field_metadata(col) & 0x00ff;
+ break;
+
+ case MYSQL_TYPE_NEWDECIMAL:
+ /*
+ The display length of a DECIMAL type is not the same as the
+ length that should be supplied to make_field, so we correct
+ the length here.
+ */
+ precision= field_metadata(col) >> 8;
+ decimals= field_metadata(col) & 0x00ff;
+ max_length=
+ my_decimal_precision_to_length(precision, decimals, FALSE);
+ break;
+
+ case MYSQL_TYPE_DECIMAL:
+ precision= field_metadata(col);
+ decimals= static_cast<Field_num*>(target_table->field[col])->dec;
+ max_length= field_metadata(col);
+ break;
+
+ case MYSQL_TYPE_TINY_BLOB:
+ case MYSQL_TYPE_MEDIUM_BLOB:
+ case MYSQL_TYPE_LONG_BLOB:
+ case MYSQL_TYPE_BLOB:
+ case MYSQL_TYPE_GEOMETRY:
+ pack_length= field_metadata(col) & 0x00ff;
+ break;
+
+ default:
+ break;
+ }
+
+ DBUG_PRINT("debug", ("sql_type: %d, target_field: '%s', max_length: %d, decimals: %d,"
+ " maybe_null: %d, unsigned_flag: %d, pack_length: %u",
+ type(col), target_table->field[col]->field_name,
+ max_length, decimals, TRUE, FALSE, pack_length));
+ field_def->init_for_tmp_table(type(col),
+ max_length,
+ decimals,
+ maybe_null(col), // maybe_null
+ FALSE, // unsigned_flag
+ pack_length);
+ field_def->charset= target_table->field[col]->charset();
+ field_def->interval= interval;
+ }
+
+ TABLE *conv_table= create_virtual_tmp_table(thd, field_list);
+ if (conv_table == NULL)
+ rli->report(ERROR_LEVEL, ER_SLAVE_CANT_CREATE_CONVERSION,
+ ER(ER_SLAVE_CANT_CREATE_CONVERSION),
+ target_table->s->db.str,
+ target_table->s->table_name.str);
+ DBUG_RETURN(conv_table);
+}
+
+#endif /* MYSQL_CLIENT */
+
+table_def::table_def(unsigned char *types, ulong size,
+ uchar *field_metadata, int metadata_size,
+ uchar *null_bitmap)
+ : m_size(size), m_type(0), m_field_metadata_size(metadata_size),
+ m_field_metadata(0), m_null_bits(0), m_memory(NULL)
+{
+ m_memory= (uchar *)my_multi_malloc(MYF(MY_WME),
+ &m_type, size,
+ &m_field_metadata,
+ size * sizeof(uint16),
+ &m_null_bits, (size + 7) / 8,
+ NULL);
+
+ bzero(m_field_metadata, size * sizeof(uint16));
+
+ if (m_type)
+ memcpy(m_type, types, size);
+ else
+ m_size= 0;
+ /*
+ Extract the data from the table map into the field metadata array
+ iff there is field metadata. The variable metadata_size will be
+ 0 if we are replicating from an older version server since no field
+ metadata was written to the table map. This can also happen if
+ there were no fields in the master that needed extra metadata.
+ */
+ if (m_size && metadata_size)
+ {
+ int index= 0;
+ for (unsigned int i= 0; i < m_size; i++)
{
- error= 1;
- char buf[256];
- my_snprintf(buf, sizeof(buf), "Column %d size mismatch - "
- "master has size %d, %s.%s on slave has size %d."
- " Master's column size should be <= the slave's "
- "column size.", col,
- field->pack_length_from_metadata(m_field_metadata[col]),
- tsh->db.str, tsh->table_name.str,
- field->row_pack_length());
- rli->report(ERROR_LEVEL, ER_BINLOG_ROW_WRONG_TABLE_DEF,
- ER(ER_BINLOG_ROW_WRONG_TABLE_DEF), buf);
+ switch (m_type[i]) {
+ case MYSQL_TYPE_TINY_BLOB:
+ case MYSQL_TYPE_BLOB:
+ case MYSQL_TYPE_MEDIUM_BLOB:
+ case MYSQL_TYPE_LONG_BLOB:
+ case MYSQL_TYPE_DOUBLE:
+ case MYSQL_TYPE_FLOAT:
+ {
+ /*
+ These types store a single byte.
+ */
+ m_field_metadata[i]= field_metadata[index];
+ index++;
+ break;
+ }
+ case MYSQL_TYPE_SET:
+ case MYSQL_TYPE_ENUM:
+ case MYSQL_TYPE_STRING:
+ {
+ uint16 x= field_metadata[index++] << 8U; // real_type
+ x+= field_metadata[index++]; // pack or field length
+ m_field_metadata[i]= x;
+ break;
+ }
+ case MYSQL_TYPE_BIT:
+ {
+ uint16 x= field_metadata[index++];
+ x = x + (field_metadata[index++] << 8U);
+ m_field_metadata[i]= x;
+ break;
+ }
+ case MYSQL_TYPE_VARCHAR:
+ {
+ /*
+ These types store two bytes.
+ */
+ char *ptr= (char *)&field_metadata[index];
+ m_field_metadata[i]= uint2korr(ptr);
+ index= index + 2;
+ break;
+ }
+ case MYSQL_TYPE_NEWDECIMAL:
+ {
+ uint16 x= field_metadata[index++] << 8U; // precision
+ x+= field_metadata[index++]; // decimals
+ m_field_metadata[i]= x;
+ break;
+ }
+ default:
+ m_field_metadata[i]= 0;
+ break;
+ }
}
}
+ if (m_size && null_bitmap)
+ memcpy(m_null_bits, null_bitmap, (m_size + 7) / 8);
+}
+
- return error;
+table_def::~table_def()
+{
+ my_free(m_memory, MYF(0));
+#ifndef DBUG_OFF
+ m_type= 0;
+ m_size= 0;
+#endif
}
+
diff --git a/sql/rpl_utility.h b/sql/rpl_utility.h
index 1f4ca246ff1..97ee668c82d 100644
--- a/sql/rpl_utility.h
+++ b/sql/rpl_utility.h
@@ -21,6 +21,7 @@
#endif
#include "mysql_priv.h"
+#include "mysql_com.h"
class Relay_log_info;
@@ -44,115 +45,18 @@ class table_def
{
public:
/**
- Convenience declaration of the type of the field type data in a
- table map event.
- */
- typedef unsigned char field_type;
-
- /**
Constructor.
- @param types Array of types
+ @param types Array of types, each stored as a byte
@param size Number of elements in array 'types'
@param field_metadata Array of extra information about fields
@param metadata_size Size of the field_metadata array
@param null_bitmap The bitmap of fields that can be null
*/
- table_def(field_type *types, ulong size, uchar *field_metadata,
- int metadata_size, uchar *null_bitmap)
- : m_size(size), m_type(0), m_field_metadata_size(metadata_size),
- m_field_metadata(0), m_null_bits(0), m_memory(NULL)
- {
- m_memory= (uchar *)my_multi_malloc(MYF(MY_WME),
- &m_type, size,
- &m_field_metadata,
- size * sizeof(uint16),
- &m_null_bits, (size + 7) / 8,
- NULL);
+ table_def(unsigned char *types, ulong size, uchar *field_metadata,
+ int metadata_size, uchar *null_bitmap);
- bzero(m_field_metadata, size * sizeof(uint16));
-
- if (m_type)
- memcpy(m_type, types, size);
- else
- m_size= 0;
- /*
- Extract the data from the table map into the field metadata array
- iff there is field metadata. The variable metadata_size will be
- 0 if we are replicating from an older version server since no field
- metadata was written to the table map. This can also happen if
- there were no fields in the master that needed extra metadata.
- */
- if (m_size && metadata_size)
- {
- int index= 0;
- for (unsigned int i= 0; i < m_size; i++)
- {
- switch (m_type[i]) {
- case MYSQL_TYPE_TINY_BLOB:
- case MYSQL_TYPE_BLOB:
- case MYSQL_TYPE_MEDIUM_BLOB:
- case MYSQL_TYPE_LONG_BLOB:
- case MYSQL_TYPE_DOUBLE:
- case MYSQL_TYPE_FLOAT:
- {
- /*
- These types store a single byte.
- */
- m_field_metadata[i]= field_metadata[index];
- index++;
- break;
- }
- case MYSQL_TYPE_SET:
- case MYSQL_TYPE_ENUM:
- case MYSQL_TYPE_STRING:
- {
- uint16 x= field_metadata[index++] << 8U; // real_type
- x+= field_metadata[index++]; // pack or field length
- m_field_metadata[i]= x;
- break;
- }
- case MYSQL_TYPE_BIT:
- {
- uint16 x= field_metadata[index++];
- x = x + (field_metadata[index++] << 8U);
- m_field_metadata[i]= x;
- break;
- }
- case MYSQL_TYPE_VARCHAR:
- {
- /*
- These types store two bytes.
- */
- char *ptr= (char *)&field_metadata[index];
- m_field_metadata[i]= uint2korr(ptr);
- index= index + 2;
- break;
- }
- case MYSQL_TYPE_NEWDECIMAL:
- {
- uint16 x= field_metadata[index++] << 8U; // precision
- x+= field_metadata[index++]; // decimals
- m_field_metadata[i]= x;
- break;
- }
- default:
- m_field_metadata[i]= 0;
- break;
- }
- }
- }
- if (m_size && null_bitmap)
- memcpy(m_null_bits, null_bitmap, (m_size + 7) / 8);
- }
-
- ~table_def() {
- my_free(m_memory, MYF(0));
-#ifndef DBUG_OFF
- m_type= 0;
- m_size= 0;
-#endif
- }
+ ~table_def();
/**
Return the number of fields there is type data for.
@@ -171,10 +75,40 @@ public:
<code>index</code>. Currently, only the type identifier is
returned.
*/
- field_type type(ulong index) const
+ enum_field_types type(ulong index) const
{
DBUG_ASSERT(index < m_size);
- return m_type[index];
+ /*
+ If the source type is MYSQL_TYPE_STRING, it can in reality be
+ either MYSQL_TYPE_STRING, MYSQL_TYPE_ENUM, or MYSQL_TYPE_SET, so
+ we might need to modify the type to get the real type.
+ */
+ enum_field_types source_type= static_cast<enum_field_types>(m_type[index]);
+ uint16 source_metadata= m_field_metadata[index];
+ switch (source_type)
+ {
+ case MYSQL_TYPE_STRING:
+ {
+ int real_type= source_metadata >> 8;
+ if (real_type == MYSQL_TYPE_ENUM || real_type == MYSQL_TYPE_SET)
+ source_type= static_cast<enum_field_types>(real_type);
+ break;
+ }
+
+ /*
+ This type has not been used since before row-based replication,
+ so we can safely assume that it really is MYSQL_TYPE_NEWDATE.
+ */
+ case MYSQL_TYPE_DATE:
+ source_type= MYSQL_TYPE_NEWDATE;
+ break;
+
+ default:
+ /* Do nothing */
+ break;
+ }
+
+ return source_type;
}
@@ -226,23 +160,58 @@ public:
with it.
A table definition is compatible with a table if:
- - the columns types of the table definition is a (not
- necessarily proper) prefix of the column type of the table, or
- - the other way around
+ - The columns types of the table definition is a (not
+ necessarily proper) prefix of the column type of the table.
+ - The other way around.
+
+ - Each column on the master that also exists on the slave can be
+ converted according to the current settings of @c
+ SLAVE_TYPE_CONVERSIONS.
+
+ @param thd
@param rli Pointer to relay log info
@param table Pointer to table to compare with.
+ @param[out] tmp_table_var Pointer to temporary table for holding
+ conversion table.
+
@retval 1 if the table definition is not compatible with @c table
@retval 0 if the table definition is compatible with @c table
*/
#ifndef MYSQL_CLIENT
- int compatible_with(Relay_log_info const *rli, TABLE *table) const;
+ bool compatible_with(THD *thd, Relay_log_info *rli, TABLE *table,
+ TABLE **conv_table_var) const;
+
+ /**
+ Create a virtual in-memory temporary table structure.
+
+ The table structure has records and field array so that a row can
+ be unpacked into the record for further processing.
+
+ In the virtual table, each field that requires conversion will
+ have a non-NULL value, while fields that do not require
+ conversion will have a NULL value.
+
+ Some information that is missing in the events, such as the
+ character set for string types, are taken from the table that the
+ field is going to be pushed into, so the target table that the data
+ eventually need to be pushed into need to be supplied.
+
+ @param thd Thread to allocate memory from.
+ @param rli Relay log info structure, for error reporting.
+ @param target_table Target table for fields.
+
+ @return A pointer to a temporary table with memory allocated in the
+ thread's memroot, NULL if the table could not be created
+ */
+ TABLE *create_conversion_table(THD *thd, Relay_log_info *rli, TABLE *target_table) const;
#endif
+
private:
ulong m_size; // Number of elements in the types array
- field_type *m_type; // Array of type descriptors
+ unsigned char *m_type; // Array of type descriptors
uint m_field_metadata_size;
uint16 *m_field_metadata;
uchar *m_null_bits;
@@ -260,6 +229,7 @@ struct RPL_TABLE_LIST
{
bool m_tabledef_valid;
table_def m_tabledef;
+ TABLE *m_conv_table;
};
diff --git a/sql/set_var.cc b/sql/set_var.cc
index b80bdde9670..624205f3a2e 100644
--- a/sql/set_var.cc
+++ b/sql/set_var.cc
@@ -92,6 +92,33 @@ TYPELIB delay_key_write_typelib=
delay_key_write_type_names, NULL
};
+/**
+ SLAVE_TYPE_CONVERSIONS variable.
+
+ Definition is equivalent to
+ @code
+ SET('ALL_NON_LOSSY', 'ALL_LOSSY')
+ @endcode
+ */
+const char *slave_type_conversions_type_name[]= {
+ "ALL_LOSSY",
+ "ALL_NON_LOSSY",
+ NullS
+};
+
+unsigned int slave_type_conversions_type_length[]= {
+ sizeof("ALL_LOSSY")-1,
+ sizeof("ALL_NON_LOSSY")-1,
+ 0
+};
+
+TYPELIB slave_type_conversions_typelib=
+{
+ array_elements(slave_type_conversions_type_name)-1, "",
+ slave_type_conversions_type_name,
+ slave_type_conversions_type_length
+};
+
const char *slave_exec_mode_names[]=
{ "STRICT", "IDEMPOTENT", NullS };
static const unsigned int slave_exec_mode_names_len[]=
@@ -580,6 +607,12 @@ static sys_var_set_slave_mode slave_exec_mode(&vars,
&slave_exec_mode_options,
&slave_exec_mode_typelib,
0);
+static sys_var_set slave_type_conversions(&vars,
+ "slave_type_conversions",
+ &slave_type_conversions_options,
+ &slave_type_conversions_typelib,
+ 0);
+
static sys_var_long_ptr sys_slow_launch_time(&vars, "slow_launch_time",
&slow_launch_time);
static sys_var_thd_ulong sys_sort_buffer(&vars, "sort_buffer_size",
@@ -1261,7 +1294,6 @@ bool sys_var_thd_binlog_format::check(THD *thd, set_var *var) {
return result;
}
-
bool sys_var_thd_binlog_format::is_readonly() const
{
/*
diff --git a/sql/set_var.h b/sql/set_var.h
index fa747107870..31060fa870a 100644
--- a/sql/set_var.h
+++ b/sql/set_var.h
@@ -32,6 +32,7 @@ typedef struct my_locale_st MY_LOCALE;
extern TYPELIB bool_typelib, delay_key_write_typelib, sql_mode_typelib,
optimizer_switch_typelib, slave_exec_mode_typelib;
+extern TYPELIB slave_type_conversions_typelib;
typedef int (*sys_check_func)(THD *, set_var *);
typedef bool (*sys_update_func)(THD *, set_var *);
diff --git a/sql/share/errmsg.txt b/sql/share/errmsg.txt
index a17ad94ba82..f981b03a538 100644
--- a/sql/share/errmsg.txt
+++ b/sql/share/errmsg.txt
@@ -6213,3 +6213,8 @@ ER_DEBUG_SYNC_TIMEOUT
ER_DEBUG_SYNC_HIT_LIMIT
eng "debug sync point hit limit reached"
ger "Debug Sync Point Hit Limit erreicht"
+
+ER_SLAVE_CONVERSION_FAILED
+ eng "Column %d of table '%-.192s.%-.192s' cannot be converted from type '%-.32s' to type '%-.32s'"
+ER_SLAVE_CANT_CREATE_CONVERSION
+ eng "Can't create conversion table for table '%-.192s.%-.192s'"
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 922e960c805..c3b5d05309f 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -86,6 +86,12 @@ enum enum_delay_key_write { DELAY_KEY_WRITE_NONE, DELAY_KEY_WRITE_ON,
enum enum_slave_exec_mode { SLAVE_EXEC_MODE_STRICT,
SLAVE_EXEC_MODE_IDEMPOTENT,
SLAVE_EXEC_MODE_LAST_BIT};
+enum enum_slave_type_conversions {
+ SLAVE_TYPE_CONVERSIONS_ALL_LOSSY,
+ SLAVE_TYPE_CONVERSIONS_ALL_NON_LOSSY,
+ SLAVE_TYPE_CONVERSIONS_COUNT
+};
+
enum enum_mark_columns
{ MARK_COLUMNS_NONE, MARK_COLUMNS_READ, MARK_COLUMNS_WRITE};
diff --git a/sql/sql_select.cc b/sql/sql_select.cc
index c5e0bce498d..3e57d384192 100644
--- a/sql/sql_select.cc
+++ b/sql/sql_select.cc
@@ -10462,6 +10462,18 @@ TABLE *create_virtual_tmp_table(THD *thd, List<Create_field> &field_list)
null_bit= 1;
}
}
+ if (cur_field->type() == MYSQL_TYPE_BIT &&
+ cur_field->key_type() == HA_KEYTYPE_BIT)
+ {
+ /* This is a Field_bit since key_type is HA_KEYTYPE_BIT */
+ static_cast<Field_bit*>(cur_field)->set_bit_ptr(null_pos, null_bit);
+ null_bit+= cur_field->field_length & 7;
+ if (null_bit > 7)
+ {
+ null_pos++;
+ null_bit-= 8;
+ }
+ }
cur_field->reset();
field_pos+= cur_field->pack_length();