diff options
author | Konstantin Osipov <kostja@sun.com> | 2010-02-05 01:08:08 +0300 |
---|---|---|
committer | Konstantin Osipov <kostja@sun.com> | 2010-02-05 01:08:08 +0300 |
commit | e7b332ba83f7ecd5dbc518df5d5f4c84fc542552 (patch) | |
tree | a1d34e4650281143b62067088c8965f2cceacb2d /sql | |
parent | 00dc9a6e70512905ef441274b0574fd2503f15b1 (diff) | |
parent | 08bcd2d8f67dcacd36abd2592fa2a59321abe7c2 (diff) | |
download | mariadb-git-e7b332ba83f7ecd5dbc518df5d5f4c84fc542552.tar.gz |
Merge next-mr -> next-4284.
Diffstat (limited to 'sql')
47 files changed, 3553 insertions, 1385 deletions
diff --git a/sql/event_db_repository.cc b/sql/event_db_repository.cc index 432a4a4a555..2037856f7b4 100644 --- a/sql/event_db_repository.cc +++ b/sql/event_db_repository.cc @@ -1052,8 +1052,8 @@ update_timing_fields_for_event(THD *thd, Turn off row binlogging of event timing updates. These are not used for RBR of events replicated to the slave. */ - if (thd->current_stmt_binlog_row_based) - thd->clear_current_stmt_binlog_row_based(); + if (thd->is_current_stmt_binlog_format_row()) + thd->clear_current_stmt_binlog_format_row(); DBUG_ASSERT(thd->security_ctx->master_access & SUPER_ACL); diff --git a/sql/events.cc b/sql/events.cc index 93d98b8624d..2d76f00c897 100644 --- a/sql/events.cc +++ b/sql/events.cc @@ -325,8 +325,8 @@ Events::create_event(THD *thd, Event_parse_data *parse_data, Turn off row binlogging of this statement and use statement-based so that all supporting tables are updated for CREATE EVENT command. */ - if (thd->current_stmt_binlog_row_based) - thd->clear_current_stmt_binlog_row_based(); + if (thd->is_current_stmt_binlog_format_row()) + thd->clear_current_stmt_binlog_format_row(); mysql_mutex_lock(&LOCK_event_metadata); @@ -449,8 +449,8 @@ Events::update_event(THD *thd, Event_parse_data *parse_data, Turn off row binlogging of this statement and use statement-based so that all supporting tables are updated for UPDATE EVENT command. */ - if (thd->current_stmt_binlog_row_based) - thd->clear_current_stmt_binlog_row_based(); + if (thd->is_current_stmt_binlog_format_row()) + thd->clear_current_stmt_binlog_format_row(); mysql_mutex_lock(&LOCK_event_metadata); @@ -531,8 +531,8 @@ Events::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name, bool if_exists) Turn off row binlogging of this statement and use statement-based so that all supporting tables are updated for DROP EVENT command. */ - if (thd->current_stmt_binlog_row_based) - thd->clear_current_stmt_binlog_row_based(); + if (thd->is_current_stmt_binlog_format_row()) + thd->clear_current_stmt_binlog_format_row(); mysql_mutex_lock(&LOCK_event_metadata); /* On error conditions my_error() is called so no need to handle here */ diff --git a/sql/field.cc b/sql/field.cc index 56da32959f9..a07493a6964 100644 --- a/sql/field.cc +++ b/sql/field.cc @@ -56,6 +56,8 @@ const char field_separator=','; #define ASSERT_COLUMN_MARKED_FOR_READ DBUG_ASSERT(!table || (!table->read_set || bitmap_is_set(table->read_set, field_index))) #define ASSERT_COLUMN_MARKED_FOR_WRITE DBUG_ASSERT(!table || (!table->write_set || bitmap_is_set(table->write_set, field_index))) +#define FLAGSTR(S,F) ((S) & (F) ? #F " " : "") + /* Rules for merging different types of fields in UNION @@ -994,6 +996,22 @@ test_if_important_data(CHARSET_INFO *cs, const char *str, const char *strend) /** + Function to compare two unsigned integers for their relative order. + Used below. In an anonymous namespace to not clash with definitions + in other files. + */ +namespace { + int compare(unsigned int a, unsigned int b) + { + if (a < b) + return -1; + if (b < a) + return 1; + return 0; +} +} + +/** Detect Item_result by given field type of UNION merge result. @param field_type given field type @@ -1392,22 +1410,48 @@ bool Field::send_binary(Protocol *protocol) /** Check to see if field size is compatible with destination. - This method is used in row-based replication to verify that the slave's - field size is less than or equal to the master's field size. The - encoded field metadata (from the master or source) is decoded and compared - to the size of this field (the slave or destination). + This method is used in row-based replication to verify that the + slave's field size is less than or equal to the master's field + size. The encoded field metadata (from the master or source) is + decoded and compared to the size of this field (the slave or + destination). + + @note + + The comparison is made so that if the source data (from the master) + is less than the target data (on the slave), -1 is returned in @c + <code>*order_var</code>. This implies that a conversion is + necessary, but that it is lossy and can result in truncation of the + value. + + If the source data is strictly greater than the target data, 1 is + returned in <code>*order_var</code>. This implies that the source + type can is contained in the target type and that a conversion is + necessary but is non-lossy. + + If no conversion is required to fit the source type in the target + type, 0 is returned in <code>*order_var</code>. @param field_metadata Encoded size in field metadata + @param mflags Flags from the table map event for the table. + @param order_var Pointer to variable where the order + between the source field and this field + will be returned. - @retval 0 if this field's size is < the source field's size - @retval 1 if this field's size is >= the source field's size + @return @c true if this field's size is compatible with the + master's field size, @c false otherwise. */ -int Field::compatible_field_size(uint field_metadata, - const Relay_log_info *rli_arg __attribute__((unused))) +bool Field::compatible_field_size(uint field_metadata, + Relay_log_info *rli_arg __attribute__((unused)), + uint16 mflags __attribute__((unused)), + int *order_var) { uint const source_size= pack_length_from_metadata(field_metadata); uint const destination_size= row_pack_length(); - return (source_size <= destination_size); + DBUG_PRINT("debug", ("real_type: %d, source_size: %u, destination_size: %u", + real_type(), source_size, destination_size)); + *order_var = compare(source_size, destination_size); + return true; } @@ -2873,33 +2917,16 @@ uint Field_new_decimal::pack_length_from_metadata(uint field_metadata) } -/** - Check to see if field size is compatible with destination. - - This method is used in row-based replication to verify that the slave's - field size is less than or equal to the master's field size. The - encoded field metadata (from the master or source) is decoded and compared - to the size of this field (the slave or destination). - - @param field_metadata Encoded size in field metadata - - @retval 0 if this field's size is < the source field's size - @retval 1 if this field's size is >= the source field's size -*/ -int Field_new_decimal::compatible_field_size(uint field_metadata, - const Relay_log_info * __attribute__((unused))) +bool Field_new_decimal::compatible_field_size(uint field_metadata, + Relay_log_info * __attribute__((unused)), + uint16 mflags __attribute__((unused)), + int *order_var) { - int compatible= 0; uint const source_precision= (field_metadata >> 8U) & 0x00ff; uint const source_decimal= field_metadata & 0x00ff; - uint const source_size= my_decimal_get_binary_size(source_precision, - source_decimal); - uint const destination_size= row_pack_length(); - compatible= (source_size <= destination_size); - if (compatible) - compatible= (source_precision <= precision) && - (source_decimal <= decimals()); - return (compatible); + int order= compare(source_precision, precision); + *order_var= order != 0 ? order : compare(source_decimal, dec); + return true; } @@ -6492,8 +6519,11 @@ check_field_for_37426(const void *param_arg) } #endif -int Field_string::compatible_field_size(uint field_metadata, - const Relay_log_info *rli_arg) +bool +Field_string::compatible_field_size(uint field_metadata, + Relay_log_info *rli_arg, + uint16 mflags __attribute__((unused)), + int *order_var) { #ifdef HAVE_REPLICATION const Check_field_param check_param = { this }; @@ -6501,7 +6531,7 @@ int Field_string::compatible_field_size(uint field_metadata, check_field_for_37426, &check_param)) return FALSE; // Not compatible field sizes #endif - return Field::compatible_field_size(field_metadata, rli_arg); + return Field::compatible_field_size(field_metadata, rli_arg, mflags, order_var); } @@ -6562,6 +6592,8 @@ uchar *Field_string::pack(uchar *to, const uchar *from, { uint length= min(field_length,max_length); uint local_char_length= max_length/field_charset->mbmaxlen; + DBUG_PRINT("debug", ("Packing field '%s' - length: %u ", field_name, length)); + if (length > local_char_length) local_char_length= my_charpos(field_charset, from, from+length, local_char_length); @@ -7224,6 +7256,7 @@ Field_blob::Field_blob(uchar *ptr_arg, uchar *null_ptr_arg, uchar null_bit_arg, cs), packlength(blob_pack_length) { + DBUG_ASSERT(blob_pack_length <= 4); // Only pack lengths 1-4 supported currently flags|= BLOB_FLAG; share->blob_fields++; /* TODO: why do not fill table->s->blob_field array here? */ @@ -7634,8 +7667,10 @@ int Field_blob::key_cmp(const uchar *a,const uchar *b) */ int Field_blob::do_save_field_metadata(uchar *metadata_ptr) { + DBUG_ENTER("Field_blob::do_save_field_metadata"); *metadata_ptr= pack_length_no_ptr(); - return 1; + DBUG_PRINT("debug", ("metadata: %u (pack_length_no_ptr)", *metadata_ptr)); + DBUG_RETURN(1); } @@ -8443,6 +8478,9 @@ Field_bit::Field_bit(uchar *ptr_arg, uint32 len_arg, uchar *null_ptr_arg, bit_ptr(bit_ptr_arg), bit_ofs(bit_ofs_arg), bit_len(len_arg & 7), bytes_in_rec(len_arg / 8) { + DBUG_ENTER("Field_bit::Field_bit"); + DBUG_PRINT("enter", ("ptr_arg: %p, null_ptr_arg: %p, len_arg: %u, bit_len: %u, bytes_in_rec: %u", + ptr_arg, null_ptr_arg, len_arg, bit_len, bytes_in_rec)); flags|= UNSIGNED_FLAG; /* Ensure that Field::eq() can distinguish between two different bit fields. @@ -8450,6 +8488,7 @@ Field_bit::Field_bit(uchar *ptr_arg, uint32 len_arg, uchar *null_ptr_arg, */ if (!null_ptr_arg) null_bit= bit_ofs_arg; + DBUG_VOID_RETURN; } @@ -8734,9 +8773,17 @@ uint Field_bit::get_key_image(uchar *buff, uint length, imagetype type_arg) */ int Field_bit::do_save_field_metadata(uchar *metadata_ptr) { - *metadata_ptr= bit_len; - *(metadata_ptr + 1)= bytes_in_rec; - return 2; + DBUG_ENTER("Field_bit::do_save_field_metadata"); + DBUG_PRINT("debug", ("bit_len: %d, bytes_in_rec: %d", + bit_len, bytes_in_rec)); + /* + Since this class and Field_bit_as_char have different ideas of + what should be stored here, we compute the values of the metadata + explicitly using the field_length. + */ + metadata_ptr[0]= field_length % 8; + metadata_ptr[1]= field_length / 8; + DBUG_RETURN(2); } @@ -8761,34 +8808,34 @@ uint Field_bit::pack_length_from_metadata(uint field_metadata) } -/** - Check to see if field size is compatible with destination. - - This method is used in row-based replication to verify that the slave's - field size is less than or equal to the master's field size. The - encoded field metadata (from the master or source) is decoded and compared - to the size of this field (the slave or destination). +bool +Field_bit::compatible_field_size(uint field_metadata, + Relay_log_info * __attribute__((unused)), + uint16 mflags, + int *order_var) +{ + DBUG_ENTER("Field_bit::compatible_field_size"); + DBUG_ASSERT((field_metadata >> 16) == 0); + uint from_bit_len= + 8 * (field_metadata >> 8) + (field_metadata & 0xff); + uint to_bit_len= max_display_length(); + DBUG_PRINT("debug", ("from_bit_len: %u, to_bit_len: %u", + from_bit_len, to_bit_len)); + /* + If the bit length exact flag is clear, we are dealing with an old + master, so we allow some less strict behaviour if replicating by + moving both bit lengths to an even multiple of 8. - @param field_metadata Encoded size in field metadata + We do this by computing the number of bytes to store the field + instead, and then compare the result. + */ + if (!(mflags & Table_map_log_event::TM_BIT_LEN_EXACT_F)) { + from_bit_len= (from_bit_len + 7) / 8; + to_bit_len= (to_bit_len + 7) / 8; + } - @retval 0 if this field's size is < the source field's size - @retval 1 if this field's size is >= the source field's size -*/ -int Field_bit::compatible_field_size(uint field_metadata, - const Relay_log_info * __attribute__((unused))) -{ - int compatible= 0; - uint const source_size= pack_length_from_metadata(field_metadata); - uint const destination_size= row_pack_length(); - uint const from_bit_len= field_metadata & 0x00ff; - uint const from_len= (field_metadata >> 8U) & 0x00ff; - if ((bit_len == 0) || (from_bit_len == 0)) - compatible= (source_size <= destination_size); - else if (from_bit_len > bit_len) - compatible= (from_len < bytes_in_rec); - else - compatible= ((from_bit_len <= bit_len) && (from_len <= bytes_in_rec)); - return (compatible); + *order_var= compare(from_bit_len, to_bit_len); + DBUG_RETURN(TRUE); } @@ -8854,8 +8901,15 @@ const uchar * Field_bit::unpack(uchar *to, const uchar *from, uint param_data, bool low_byte_first __attribute__((unused))) { + DBUG_ENTER("Field_bit::unpack"); + DBUG_PRINT("enter", ("to: %p, from: %p, param_data: 0x%x", + to, from, param_data)); + DBUG_PRINT("debug", ("bit_ptr: %p, bit_len: %u, bit_ofs: %u", + bit_ptr, bit_len, bit_ofs)); uint const from_len= (param_data >> 8U) & 0x00ff; uint const from_bit_len= param_data & 0x00ff; + DBUG_PRINT("debug", ("from_len: %u, from_bit_len: %u", + from_len, from_bit_len)); /* If the parameter data is zero (i.e., undefined), or if the master and slave have the same sizes, then use the old unpack() method. @@ -8876,7 +8930,7 @@ Field_bit::unpack(uchar *to, const uchar *from, uint param_data, from++; } memcpy(to, from, bytes_in_rec); - return from + bytes_in_rec; + DBUG_RETURN(from + bytes_in_rec); } /* @@ -8902,7 +8956,7 @@ Field_bit::unpack(uchar *to, const uchar *from, uint param_data, bitmap_set_bit(table->write_set,field_index); store(value, new_len, system_charset_info); my_afree(value); - return from + len; + DBUG_RETURN(from + len); } @@ -9030,8 +9084,11 @@ void Create_field::create_length_to_internal_length(void) */ void Create_field::init_for_tmp_table(enum_field_types sql_type_arg, uint32 length_arg, uint32 decimals_arg, - bool maybe_null, bool is_unsigned) + bool maybe_null, bool is_unsigned, + uint pack_length) { + DBUG_ENTER("Create_field::init_for_tmp_table"); + field_name= ""; sql_type= sql_type_arg; char_length= length= length_arg;; @@ -9039,10 +9096,92 @@ void Create_field::init_for_tmp_table(enum_field_types sql_type_arg, interval= 0; charset= &my_charset_bin; geom_type= Field::GEOM_GEOMETRY; - pack_flag= (FIELDFLAG_NUMBER | - ((decimals_arg & FIELDFLAG_MAX_DEC) << FIELDFLAG_DEC_SHIFT) | - (maybe_null ? FIELDFLAG_MAYBE_NULL : 0) | - (is_unsigned ? 0 : FIELDFLAG_DECIMAL)); + + DBUG_PRINT("enter", ("sql_type: %d, length: %u, pack_length: %u", + sql_type_arg, length_arg, pack_length)); + + /* + These pack flags are crafted to get it correctly through the + branches of make_field(). + */ + switch (sql_type_arg) + { + case MYSQL_TYPE_VARCHAR: + case MYSQL_TYPE_VAR_STRING: + case MYSQL_TYPE_STRING: + case MYSQL_TYPE_SET: + pack_flag= 0; + break; + + case MYSQL_TYPE_GEOMETRY: + pack_flag= FIELDFLAG_GEOM; + break; + + case MYSQL_TYPE_ENUM: + pack_flag= FIELDFLAG_INTERVAL; + break; + + case MYSQL_TYPE_DECIMAL: + case MYSQL_TYPE_NEWDECIMAL: + case MYSQL_TYPE_FLOAT: + case MYSQL_TYPE_DOUBLE: + pack_flag= FIELDFLAG_DECIMAL | FIELDFLAG_NUMBER | + (decimals_arg & FIELDFLAG_MAX_DEC) << FIELDFLAG_DEC_SHIFT; + break; + + case MYSQL_TYPE_TINY_BLOB: + case MYSQL_TYPE_MEDIUM_BLOB: + case MYSQL_TYPE_LONG_BLOB: + case MYSQL_TYPE_BLOB: + pack_flag= FIELDFLAG_BLOB; + break; + + case MYSQL_TYPE_BIT: + pack_flag= FIELDFLAG_NUMBER | FIELDFLAG_TREAT_BIT_AS_CHAR; + break; + + default: + pack_flag= FIELDFLAG_NUMBER; + break; + } + + /* + Set the pack flag correctly for the blob-like types. This sets the + packtype to something that make_field can use. If the pack type is + not set correctly, the packlength will be reeeeally wierd (like + 129 or so). + */ + switch (sql_type_arg) + { + case MYSQL_TYPE_ENUM: + case MYSQL_TYPE_SET: + case MYSQL_TYPE_TINY_BLOB: + case MYSQL_TYPE_MEDIUM_BLOB: + case MYSQL_TYPE_LONG_BLOB: + case MYSQL_TYPE_BLOB: + case MYSQL_TYPE_GEOMETRY: + // If you are going to use the above types, you have to pass a + // pack_length as parameter. Assert that is really done. + DBUG_ASSERT(pack_length != ~0U); + pack_flag|= pack_length_to_packflag(pack_length); + break; + default: + /* Nothing */ + break; + } + + pack_flag|= + (maybe_null ? FIELDFLAG_MAYBE_NULL : 0) | + (is_unsigned ? 0 : FIELDFLAG_DECIMAL); + + DBUG_PRINT("debug", ("pack_flag: %s%s%s%s%s, pack_type: %d", + FLAGSTR(pack_flag, FIELDFLAG_BINARY), + FLAGSTR(pack_flag, FIELDFLAG_NUMBER), + FLAGSTR(pack_flag, FIELDFLAG_INTERVAL), + FLAGSTR(pack_flag, FIELDFLAG_GEOM), + FLAGSTR(pack_flag, FIELDFLAG_BLOB), + f_packtype(pack_flag))); + DBUG_VOID_RETURN; } @@ -9548,6 +9687,14 @@ Field *make_field(TABLE_SHARE *share, uchar *ptr, uint32 field_length, default: break; } + DBUG_PRINT("debug", ("field_type: %d, field_length: %u, interval: %p, pack_flag: %s%s%s%s%s", + field_type, field_length, interval, + FLAGSTR(pack_flag, FIELDFLAG_BINARY), + FLAGSTR(pack_flag, FIELDFLAG_INTERVAL), + FLAGSTR(pack_flag, FIELDFLAG_NUMBER), + FLAGSTR(pack_flag, FIELDFLAG_PACK), + FLAGSTR(pack_flag, FIELDFLAG_BLOB))); + if (f_is_alpha(pack_flag)) { if (!f_is_packed(pack_flag)) diff --git a/sql/field.h b/sql/field.h index b091431de29..ce874706bc5 100644 --- a/sql/field.h +++ b/sql/field.h @@ -166,22 +166,13 @@ public: table, which is located on disk). */ virtual uint32 pack_length_in_rec() const { return pack_length(); } - virtual int compatible_field_size(uint field_metadata, - const Relay_log_info *); + virtual bool compatible_field_size(uint metadata, Relay_log_info *rli, + uint16 mflags, int *order); virtual uint pack_length_from_metadata(uint field_metadata) - { return field_metadata; } - /* - This method is used to return the size of the data in a row-based - replication row record. The default implementation of returning 0 is - designed to allow fields that do not use metadata to return TRUE (1) - from compatible_field_size() which uses this function in the comparison. - The default value for field metadata for fields that do not have - metadata is 0. Thus, 0 == 0 means the fields are compatible in size. - - Note: While most classes that override this method return pack_length(), - the classes Field_string, Field_varstring, and Field_blob return - field_length + 1, field_length, and pack_length_no_ptr() respectfully. - */ + { + DBUG_ENTER("Field::pack_length_from_metadata"); + DBUG_RETURN(field_metadata); + } virtual uint row_pack_length() { return 0; } virtual int save_field_metadata(uchar *first_byte) { return do_save_field_metadata(first_byte); } @@ -618,6 +609,13 @@ public: int store_decimal(const my_decimal *); my_decimal *val_decimal(my_decimal *); uint is_equal(Create_field *new_field); + uint row_pack_length() { return pack_length(); } + uint32 pack_length_from_metadata(uint field_metadata) { + uint32 length= pack_length(); + DBUG_PRINT("result", ("pack_length_from_metadata(%d): %u", + field_metadata, length)); + return length; + } int check_int(CHARSET_INFO *cs, const char *str, int length, const char *int_end, int error); bool get_int(CHARSET_INFO *cs, const char *from, uint len, @@ -782,8 +780,8 @@ public: uint32 pack_length() const { return (uint32) bin_size; } uint pack_length_from_metadata(uint field_metadata); uint row_pack_length() { return pack_length(); } - int compatible_field_size(uint field_metadata, - const Relay_log_info *rli); + bool compatible_field_size(uint field_metadata, Relay_log_info *rli, + uint16 mflags, int *order_var); uint is_equal(Create_field *new_field); virtual const uchar *unpack(uchar* to, const uchar *from, uint param_data, bool low_byte_first); @@ -1478,9 +1476,12 @@ public: return row_pack_length(); return (((field_metadata >> 4) & 0x300) ^ 0x300) + (field_metadata & 0x00ff); } - int compatible_field_size(uint field_metadata, - const Relay_log_info *rli); - uint row_pack_length() { return (field_length + 1); } + bool compatible_field_size(uint field_metadata, Relay_log_info *rli, + uint16 mflags, int *order_var); + uint row_pack_length() { return field_length; } + int pack_cmp(const uchar *a,const uchar *b,uint key_length, + my_bool insert_or_update); + int pack_cmp(const uchar *b,uint key_length,my_bool insert_or_update); uint packed_col_length(const uchar *to, uint length); uint max_packed_col_length(uint max_length); uint size_of() const { return sizeof(*this); } @@ -1925,8 +1926,8 @@ public: uint pack_length_from_metadata(uint field_metadata); uint row_pack_length() { return (bytes_in_rec + ((bit_len > 0) ? 1 : 0)); } - int compatible_field_size(uint field_metadata, - const Relay_log_info *rli); + bool compatible_field_size(uint metadata, Relay_log_info *rli, + uint16 mflags, int *order_var); void sql_type(String &str) const; virtual uchar *pack(uchar *to, const uchar *from, uint max_length, bool low_byte_first); @@ -2029,7 +2030,8 @@ public: /* Init for a tmp table field. To be extended if need be. */ void init_for_tmp_table(enum_field_types sql_type_arg, uint32 max_length, uint32 decimals, - bool maybe_null, bool is_unsigned); + bool maybe_null, bool is_unsigned, + uint pack_length = ~0U); bool init(THD *thd, char *field_name, enum_field_types type, char *length, char *decimals, uint type_modifier, Item *default_value, diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc index 1e6d6e26697..ed2bdfbbe71 100644 --- a/sql/ha_ndbcluster_binlog.cc +++ b/sql/ha_ndbcluster_binlog.cc @@ -312,6 +312,7 @@ static void run_query(THD *thd, char *buf, char *end, thd->transaction.all= save_thd_transaction_all; thd->transaction.stmt= save_thd_transaction_stmt; thd->net= save_thd_net; + thd->set_current_stmt_binlog_format_row(); if (thd == injector_thd) { @@ -1880,7 +1881,7 @@ static void ndb_binlog_query(THD *thd, Cluster_schema *schema) thd->db= schema->db; int errcode = query_error_code(thd, thd->killed == THD::NOT_KILLED); thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query, - schema->query_length, FALSE, + schema->query_length, FALSE, TRUE, schema->name[0] == 0 || thd->db[0] == 0, errcode); thd->server_id= thd_server_id_save; @@ -3639,6 +3640,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) thd= new THD; /* note that contructor of THD uses DBUG_ */ THD_CHECK_SENTRY(thd); + thd->set_current_stmt_binlog_format_row(); /* We need to set thd->thread_id before thd->store_globals, or it will set an invalid value for thd->variables.pseudo_thread_id. diff --git a/sql/ha_partition.cc b/sql/ha_partition.cc index 20783a97d4b..d994c0bf4a5 100644 --- a/sql/ha_partition.cc +++ b/sql/ha_partition.cc @@ -6502,7 +6502,7 @@ void ha_partition::get_auto_increment(ulonglong offset, ulonglong increment, if (!auto_increment_safe_stmt_log_lock && thd->lex->sql_command != SQLCOM_INSERT && mysql_bin_log.is_open() && - !thd->current_stmt_binlog_row_based && + !thd->is_current_stmt_binlog_format_row() && (thd->variables.option_bits & OPTION_BIN_LOG)) { DBUG_PRINT("info", ("locking auto_increment_safe_stmt_log_lock")); diff --git a/sql/handler.cc b/sql/handler.cc index 41e35a007cf..c29a6568c3f 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -2450,7 +2450,7 @@ int handler::update_auto_increment() variables->auto_increment_increment); auto_inc_intervals_count++; /* Row-based replication does not need to store intervals in binlog */ - if (mysql_bin_log.is_open() && !thd->current_stmt_binlog_row_based) + if (mysql_bin_log.is_open() && !thd->is_current_stmt_binlog_format_row()) thd->auto_inc_intervals_in_cur_stmt_for_binlog.append(auto_inc_interval_for_cur_row.minimum(), auto_inc_interval_for_cur_row.values(), variables->auto_increment_increment); @@ -4434,7 +4434,7 @@ static bool check_table_binlog_row_based(THD *thd, TABLE *table) DBUG_ASSERT(table->s->cached_row_logging_check == 0 || table->s->cached_row_logging_check == 1); - return (thd->current_stmt_binlog_row_based && + return (thd->is_current_stmt_binlog_format_row() && table->s->cached_row_logging_check && (thd->variables.option_bits & OPTION_BIN_LOG) && mysql_bin_log.is_open()); @@ -4491,7 +4491,21 @@ static int write_locked_table_maps(THD *thd) if (table->current_lock == F_WRLCK && check_table_binlog_row_based(thd, table)) { - int const has_trans= table->file->has_transactions(); + /* + We need to have a transactional behavior for SQLCOM_CREATE_TABLE + (e.g. CREATE TABLE... SELECT * FROM TABLE) in order to keep a + compatible behavior with the STMT based replication even when + the table is not transactional. In other words, if the operation + fails while executing the insert phase nothing is written to the + binlog. + + Note that at this point, we check the type of a set of tables to + create the table map events. In the function binlog_log_row(), + which calls the current function, we check the type of the table + of the current row. + */ + bool const has_trans= thd->lex->sql_command == SQLCOM_CREATE_TABLE || + table->file->has_transactions(); int const error= thd->binlog_write_table_map(table, has_trans); /* If an error occurs, it is the responsibility of the caller to @@ -4540,10 +4554,20 @@ static int binlog_log_row(TABLE* table, { bitmap_set_all(&cols); if (likely(!(error= write_locked_table_maps(thd)))) - error= (*log_func)(thd, table, table->file->has_transactions(), - &cols, table->s->fields, + { + /* + We need to have a transactional behavior for SQLCOM_CREATE_TABLE + (i.e. CREATE TABLE... SELECT * FROM TABLE) in order to keep a + compatible behavior with the STMT based replication even when + the table is not transactional. In other words, if the operation + fails while executing the insert phase nothing is written to the + binlog. + */ + bool const has_trans= thd->lex->sql_command == SQLCOM_CREATE_TABLE || + table->file->has_transactions(); + error= (*log_func)(thd, table, has_trans, &cols, table->s->fields, before_record, after_record); - + } if (!use_bitbuf) bitmap_free(&cols); } diff --git a/sql/item_create.cc b/sql/item_create.cc index c00b5ec1701..7d36481a23b 100644 --- a/sql/item_create.cc +++ b/sql/item_create.cc @@ -2387,10 +2387,11 @@ Create_udf_func::create(THD *thd, udf_func *udf, List<Item> *item_list) Item *func= NULL; int arg_count= 0; + DBUG_ENTER("Create_udf_func::create"); if (item_list != NULL) arg_count= item_list->elements; - thd->lex->set_stmt_unsafe(); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_UDF); DBUG_ASSERT( (udf->type == UDFTYPE_FUNCTION) || (udf->type == UDFTYPE_AGGREGATE)); @@ -2474,7 +2475,7 @@ Create_udf_func::create(THD *thd, udf_func *udf, List<Item> *item_list) } } thd->lex->safe_to_cache_query= 0; - return func; + DBUG_RETURN(func); } #endif @@ -3400,9 +3401,10 @@ Create_func_found_rows Create_func_found_rows::s_singleton; Item* Create_func_found_rows::create(THD *thd) { - thd->lex->set_stmt_unsafe(); + DBUG_ENTER("Create_func_found_rows::create"); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); thd->lex->safe_to_cache_query= 0; - return new (thd->mem_root) Item_func_found_rows(); + DBUG_RETURN(new (thd->mem_root) Item_func_found_rows()); } @@ -3561,7 +3563,7 @@ Create_func_get_lock Create_func_get_lock::s_singleton; Item* Create_func_get_lock::create(THD *thd, Item *arg1, Item *arg2) { - thd->lex->set_stmt_unsafe(); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); thd->lex->uncacheable(UNCACHEABLE_SIDEEFFECT); return new (thd->mem_root) Item_func_get_lock(arg1, arg2); } @@ -3673,7 +3675,7 @@ Create_func_is_free_lock Create_func_is_free_lock::s_singleton; Item* Create_func_is_free_lock::create(THD *thd, Item *arg1) { - thd->lex->set_stmt_unsafe(); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); thd->lex->uncacheable(UNCACHEABLE_SIDEEFFECT); return new (thd->mem_root) Item_func_is_free_lock(arg1); } @@ -3684,7 +3686,7 @@ Create_func_is_used_lock Create_func_is_used_lock::s_singleton; Item* Create_func_is_used_lock::create(THD *thd, Item *arg1) { - thd->lex->set_stmt_unsafe(); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); thd->lex->uncacheable(UNCACHEABLE_SIDEEFFECT); return new (thd->mem_root) Item_func_is_used_lock(arg1); } @@ -3831,9 +3833,10 @@ Create_func_load_file Create_func_load_file::s_singleton; Item* Create_func_load_file::create(THD *thd, Item *arg1) { - thd->lex->set_stmt_unsafe(); + DBUG_ENTER("Create_func_load_file::create"); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); thd->lex->uncacheable(UNCACHEABLE_SIDEEFFECT); - return new (thd->mem_root) Item_load_file(arg1); + DBUG_RETURN(new (thd->mem_root) Item_load_file(arg1)); } @@ -4001,7 +4004,7 @@ Create_func_master_pos_wait::create_native(THD *thd, LEX_STRING name, Item *func= NULL; int arg_count= 0; - thd->lex->set_stmt_unsafe(); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); if (item_list != NULL) arg_count= item_list->elements; @@ -4245,7 +4248,7 @@ Create_func_release_lock Create_func_release_lock::s_singleton; Item* Create_func_release_lock::create(THD *thd, Item *arg1) { - thd->lex->set_stmt_unsafe(); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); thd->lex->uncacheable(UNCACHEABLE_SIDEEFFECT); return new (thd->mem_root) Item_func_release_lock(arg1); } @@ -4303,9 +4306,10 @@ Create_func_row_count Create_func_row_count::s_singleton; Item* Create_func_row_count::create(THD *thd) { - thd->lex->set_stmt_unsafe(); + DBUG_ENTER("Create_func_row_count::create"); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); thd->lex->safe_to_cache_query= 0; - return new (thd->mem_root) Item_func_row_count(); + DBUG_RETURN(new (thd->mem_root) Item_func_row_count()); } @@ -4368,7 +4372,7 @@ Create_func_sleep Create_func_sleep::s_singleton; Item* Create_func_sleep::create(THD *thd, Item *arg1) { - thd->lex->set_stmt_unsafe(); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); thd->lex->uncacheable(UNCACHEABLE_SIDEEFFECT); return new (thd->mem_root) Item_func_sleep(arg1); } @@ -4622,9 +4626,10 @@ Create_func_uuid Create_func_uuid::s_singleton; Item* Create_func_uuid::create(THD *thd) { - thd->lex->set_stmt_unsafe(); + DBUG_ENTER("Create_func_uuid::create"); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); thd->lex->safe_to_cache_query= 0; - return new (thd->mem_root) Item_func_uuid(); + DBUG_RETURN(new (thd->mem_root) Item_func_uuid()); } @@ -4633,9 +4638,10 @@ Create_func_uuid_short Create_func_uuid_short::s_singleton; Item* Create_func_uuid_short::create(THD *thd) { - thd->lex->set_stmt_unsafe(); + DBUG_ENTER("Create_func_uuid_short::create"); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); thd->lex->safe_to_cache_query= 0; - return new (thd->mem_root) Item_func_uuid_short(); + DBUG_RETURN(new (thd->mem_root) Item_func_uuid_short()); } @@ -4644,7 +4650,7 @@ Create_func_version Create_func_version::s_singleton; Item* Create_func_version::create(THD *thd) { - thd->lex->set_stmt_unsafe(); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); return new (thd->mem_root) Item_static_string_func("version()", server_version, (uint) strlen(server_version), diff --git a/sql/lex.h b/sql/lex.h index 91ef5287a1e..5493206c214 100644 --- a/sql/lex.h +++ b/sql/lex.h @@ -196,6 +196,7 @@ static SYMBOL symbols[] = { { "ENGINE", SYM(ENGINE_SYM)}, { "ENGINES", SYM(ENGINES_SYM)}, { "ENUM", SYM(ENUM)}, + { "ERROR", SYM(ERROR_SYM)}, { "ERRORS", SYM(ERRORS)}, { "ESCAPE", SYM(ESCAPE_SYM)}, { "ESCAPED", SYM(ESCAPED)}, @@ -230,6 +231,7 @@ static SYMBOL symbols[] = { { "FULL", SYM(FULL)}, { "FULLTEXT", SYM(FULLTEXT_SYM)}, { "FUNCTION", SYM(FUNCTION_SYM)}, + { "GENERAL", SYM(GENERAL)}, { "GEOMETRY", SYM(GEOMETRY_SYM)}, { "GEOMETRYCOLLECTION",SYM(GEOMETRYCOLLECTION)}, { "GET_FORMAT", SYM(GET_FORMAT)}, @@ -438,6 +440,7 @@ static SYMBOL symbols[] = { { "REDUNDANT", SYM(REDUNDANT_SYM)}, { "REFERENCES", SYM(REFERENCES)}, { "REGEXP", SYM(REGEXP)}, + { "RELAY", SYM(RELAY)}, { "RELAYLOG", SYM(RELAYLOG_SYM)}, { "RELAY_LOG_FILE", SYM(RELAY_LOG_FILE_SYM)}, { "RELAY_LOG_POS", SYM(RELAY_LOG_POS_SYM)}, @@ -493,6 +496,7 @@ static SYMBOL symbols[] = { { "SIGNED", SYM(SIGNED_SYM)}, { "SIMPLE", SYM(SIMPLE_SYM)}, { "SLAVE", SYM(SLAVE)}, + { "SLOW", SYM(SLOW)}, { "SNAPSHOT", SYM(SNAPSHOT_SYM)}, { "SMALLINT", SYM(SMALLINT)}, { "SOCKET", SYM(SOCKET_SYM)}, diff --git a/sql/log.cc b/sql/log.cc index 9ccb1d3aef5..78ae332c353 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -193,115 +193,155 @@ private: }; /* - Helper class to store binary log transaction data. + Helper classes to store non-transactional and transactional data + before copying it to the binary log. */ -class binlog_trx_data { +class binlog_cache_data +{ public: - binlog_trx_data() - : at_least_one_stmt_committed(0), incident(FALSE), m_pending(0), - before_stmt_pos(MY_OFF_T_UNDEF) + binlog_cache_data(): m_pending(0), before_stmt_pos (MY_OFF_T_UNDEF), + incident(FALSE) { - trans_log.end_of_file= max_binlog_cache_size; + cache_log.end_of_file= max_binlog_cache_size; } - ~binlog_trx_data() + ~binlog_cache_data() { - DBUG_ASSERT(pending() == NULL); - close_cached_file(&trans_log); + DBUG_ASSERT(empty()); + close_cached_file(&cache_log); } - my_off_t position() const { - return my_b_tell(&trans_log); + bool empty() const + { + return pending() == NULL && my_b_tell(&cache_log) == 0; } - bool empty() const + Rows_log_event *pending() const { - return pending() == NULL && my_b_tell(&trans_log) == 0; + return m_pending; } - /* - Truncate the transaction cache to a certain position. This - includes deleting the pending event. - */ - void truncate(my_off_t pos) + void set_pending(Rows_log_event *const pending) { - DBUG_PRINT("info", ("truncating to position %lu", (ulong) pos)); - DBUG_PRINT("info", ("before_stmt_pos=%lu", (ulong) pos)); - if (pending()) - { - delete pending(); - } - set_pending(0); - reinit_io_cache(&trans_log, WRITE_CACHE, pos, 0, 0); - trans_log.end_of_file= max_binlog_cache_size; - if (pos < before_stmt_pos) - before_stmt_pos= MY_OFF_T_UNDEF; + m_pending= pending; + } - /* - The only valid positions that can be truncated to are at the - beginning of a statement. We are relying on this fact to be able - to set the at_least_one_stmt_committed flag correctly. In other word, if - we are truncating to the beginning of the transaction cache, - there will be no statements in the cache, otherwhise, we will - have at least one statement in the transaction cache. - */ - at_least_one_stmt_committed= (pos > 0); + void set_incident(void) + { + incident= TRUE; + } + + bool has_incident(void) + { + return(incident); } - /* - Reset the entire contents of the transaction cache, emptying it - completely. - */ - void reset() { - if (!empty()) - truncate(0); - before_stmt_pos= MY_OFF_T_UNDEF; + void reset() + { + truncate(0); incident= FALSE; - trans_log.end_of_file= max_binlog_cache_size; + before_stmt_pos= MY_OFF_T_UNDEF; + cache_log.end_of_file= max_binlog_cache_size; DBUG_ASSERT(empty()); } - Rows_log_event *pending() const + my_off_t get_byte_position() const { - return m_pending; + return my_b_tell(&cache_log); } - void set_pending(Rows_log_event *const pending) + my_off_t get_prev_position() { - m_pending= pending; + return(before_stmt_pos); } - IO_CACHE trans_log; // The transaction cache - - void set_incident(void) + void set_prev_position(my_off_t pos) { - incident= TRUE; + before_stmt_pos= pos; } - bool has_incident(void) + void restore_prev_position() { - return(incident); + truncate(before_stmt_pos); } - /** - Boolean that is true if there is at least one statement in the - transaction cache. + void restore_savepoint(my_off_t pos) + { + truncate(pos); + if (pos < before_stmt_pos) + before_stmt_pos= MY_OFF_T_UNDEF; + } + + /* + Cache to store data before copying it to the binary log. */ - bool at_least_one_stmt_committed; - bool incident; + IO_CACHE cache_log; private: /* - Pending binrows event. This event is the event where the rows are - currently written. + Pending binrows event. This event is the event where the rows are currently + written. */ Rows_log_event *m_pending; -public: /* Binlog position before the start of the current statement. */ my_off_t before_stmt_pos; + + /* + This indicates that some events did not get into the cache and most likely + it is corrupted. + */ + bool incident; + + /* + It truncates the cache to a certain position. This includes deleting the + pending event. + */ + void truncate(my_off_t pos) + { + DBUG_PRINT("info", ("truncating to position %lu", (ulong) pos)); + if (pending()) + { + delete pending(); + set_pending(0); + } + reinit_io_cache(&cache_log, WRITE_CACHE, pos, 0, 0); + cache_log.end_of_file= max_binlog_cache_size; + } + + binlog_cache_data& operator=(const binlog_cache_data& info); + binlog_cache_data(const binlog_cache_data& info); +}; + +class binlog_cache_mngr { +public: + binlog_cache_mngr() {} + + void reset_cache(binlog_cache_data* cache_data) + { + cache_data->reset(); + } + + binlog_cache_data* get_binlog_cache_data(bool is_transactional) + { + return (is_transactional ? &trx_cache : &stmt_cache); + } + + IO_CACHE* get_binlog_cache_log(bool is_transactional) + { + return (is_transactional ? &trx_cache.cache_log : &stmt_cache.cache_log); + } + + binlog_cache_data stmt_cache; + + binlog_cache_data trx_cache; + +private: + + binlog_cache_mngr& operator=(const binlog_cache_mngr& info); + binlog_cache_mngr(const binlog_cache_mngr& info); }; handlerton *binlog_hton; @@ -966,6 +1006,54 @@ bool LOGGER::flush_logs(THD *thd) } +/** + Close and reopen the slow log (with locks). + + @returns FALSE. +*/ +bool LOGGER::flush_slow_log() +{ + /* + Now we lock logger, as nobody should be able to use logging routines while + log tables are closed + */ + logger.lock_exclusive(); + + /* Reopen slow log file */ + if (opt_slow_log) + file_log_handler->get_mysql_slow_log()->reopen_file(); + + /* End of log flush */ + logger.unlock(); + + return 0; +} + + +/** + Close and reopen the general log (with locks). + + @returns FALSE. +*/ +bool LOGGER::flush_general_log() +{ + /* + Now we lock logger, as nobody should be able to use logging routines while + log tables are closed + */ + logger.lock_exclusive(); + + /* Reopen general log file */ + if (opt_log) + file_log_handler->get_mysql_log()->reopen_file(); + + /* End of log flush */ + logger.unlock(); + + return 0; +} + + /* Log slow query with all enabled log event handlers @@ -1294,26 +1382,6 @@ int LOGGER::set_handlers(uint error_log_printer, return 0; } -/** - This function checks if a transactional talbe was updated by the - current statement. - - @param thd The client thread that executed the current statement. - @return - @c true if a transactional table was updated, @false otherwise. -*/ -static bool stmt_has_updated_trans_table(THD *thd) -{ - Ha_trx_info *ha_info; - - for (ha_info= thd->transaction.stmt.ha_list; ha_info && ha_info->is_started(); ha_info= ha_info->next()) - { - if (ha_info->is_trx_read_write() && ha_info->ht() != binlog_hton) - return (TRUE); - } - return (FALSE); -} - /* Save position of binary log transaction cache. @@ -1336,10 +1404,10 @@ binlog_trans_log_savepos(THD *thd, my_off_t *pos) DBUG_ASSERT(pos != NULL); if (thd_get_ha_data(thd, binlog_hton) == NULL) thd->binlog_setup_trx_data(); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); DBUG_ASSERT(mysql_bin_log.is_open()); - *pos= trx_data->position(); + *pos= cache_mngr->trx_cache.get_byte_position(); DBUG_PRINT("return", ("*pos: %lu", (ulong) *pos)); DBUG_VOID_RETURN; } @@ -1370,9 +1438,9 @@ binlog_trans_log_truncate(THD *thd, my_off_t pos) /* Only true if binlog_trans_log_savepos() wasn't called before */ DBUG_ASSERT(pos != ~(my_off_t) 0); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - trx_data->truncate(pos); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + cache_mngr->trx_cache.restore_savepoint(pos); DBUG_VOID_RETURN; } @@ -1401,115 +1469,127 @@ int binlog_init(void *p) static int binlog_close_connection(handlerton *hton, THD *thd) { - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - DBUG_ASSERT(trx_data->empty()); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + DBUG_ASSERT(cache_mngr->trx_cache.empty() && cache_mngr->stmt_cache.empty()); thd_set_ha_data(thd, binlog_hton, NULL); - trx_data->~binlog_trx_data(); - my_free((uchar*)trx_data, MYF(0)); + cache_mngr->~binlog_cache_mngr(); + my_free((uchar*)cache_mngr, MYF(0)); return 0; } -/* - End a transaction. +/** + This function flushes a transactional cache upon commit/rollback. - SYNOPSIS - binlog_end_trans() + @param thd The thread whose transaction should be flushed + @param cache_mngr Pointer to the cache data to be flushed + @param end_ev The end event either commit/rollback. + + @return + nonzero if an error pops up when flushing the transactional cache. +*/ +static int +binlog_flush_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, + Log_event *end_ev) +{ + DBUG_ENTER("binlog_flush_trx_cache"); + int error=0; + IO_CACHE *cache_log= &cache_mngr->trx_cache.cache_log; - thd The thread whose transaction should be ended - trx_data Pointer to the transaction data to use - end_ev The end event to use, or NULL - all True if the entire transaction should be ended, false if - only the statement transaction should be ended. + /* + This function handles transactional changes and as such + this flag equals to true. + */ + bool const is_transactional= TRUE; - DESCRIPTION + if (thd->binlog_flush_pending_rows_event(TRUE, is_transactional)) + DBUG_RETURN(1); + /* + Doing a commit or a rollback including non-transactional tables, + i.e., ending a transaction where we might write the transaction + cache to the binary log. + + We can always end the statement when ending a transaction since + transactions are not allowed inside stored functions. If they + were, we would have to ensure that we're not ending a statement + inside a stored function. + */ + error= mysql_bin_log.write(thd, &cache_mngr->trx_cache.cache_log, end_ev, + cache_mngr->trx_cache.has_incident()); + cache_mngr->reset_cache(&cache_mngr->trx_cache); - End the currently open transaction. The transaction can be either - a real transaction (if 'all' is true) or a statement transaction - (if 'all' is false). + /* + We need to step the table map version after writing the + transaction cache to disk. + */ + mysql_bin_log.update_table_map_version(); + statistic_increment(binlog_cache_use, &LOCK_status); + if (cache_log->disk_writes != 0) + { + statistic_increment(binlog_cache_disk_use, &LOCK_status); + cache_log->disk_writes= 0; + } - If 'end_ev' is NULL, the transaction is a rollback of only - transactional tables, so the transaction cache will be truncated - to either just before the last opened statement transaction (if - 'all' is false), or reset completely (if 'all' is true). - */ + DBUG_ASSERT(cache_mngr->trx_cache.empty()); + DBUG_RETURN(error); +} + +/** + This function truncates the transactional cache upon committing or rolling + back either a transaction or a statement. + + @param thd The thread whose transaction should be flushed + @param cache_mngr Pointer to the cache data to be flushed + @param all @c true means truncate the transaction, otherwise the + statement must be truncated. + + @return + nonzero if an error pops up when truncating the transactional cache. +*/ static int -binlog_end_trans(THD *thd, binlog_trx_data *trx_data, - Log_event *end_ev, bool all) +binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all) { - DBUG_ENTER("binlog_end_trans"); + DBUG_ENTER("binlog_truncate_trx_cache"); int error=0; - IO_CACHE *trans_log= &trx_data->trans_log; - DBUG_PRINT("enter", ("transaction: %s end_ev: 0x%lx", - all ? "all" : "stmt", (long) end_ev)); - DBUG_PRINT("info", ("thd->options={ %s%s}", - FLAGSTR(thd->variables.option_bits, OPTION_NOT_AUTOCOMMIT), - FLAGSTR(thd->variables.option_bits, OPTION_BEGIN))); + /* + This function handles transactional changes and as such this flag + equals to true. + */ + bool const is_transactional= TRUE; + DBUG_PRINT("info", ("thd->options={ %s%s}, transaction: %s", + FLAGSTR(thd->variables.option_bits, OPTION_NOT_AUTOCOMMIT), + FLAGSTR(thd->variables.option_bits, OPTION_BEGIN), + all ? "all" : "stmt")); /* - NULL denotes ROLLBACK with nothing to replicate: i.e., rollback of - only transactional tables. If the transaction contain changes to - any non-transactiona tables, we need write the transaction and log - a ROLLBACK last. + If rolling back an entire transaction or a single statement not + inside a transaction, we reset the transaction cache. */ - if (end_ev != NULL) + thd->binlog_remove_pending_rows_event(TRUE, is_transactional); + if (all || !thd->in_multi_stmt_transaction()) { - if (thd->binlog_flush_pending_rows_event(TRUE)) - DBUG_RETURN(1); - /* - Doing a commit or a rollback including non-transactional tables, - i.e., ending a transaction where we might write the transaction - cache to the binary log. - - We can always end the statement when ending a transaction since - transactions are not allowed inside stored functions. If they - were, we would have to ensure that we're not ending a statement - inside a stored function. - */ - error= mysql_bin_log.write(thd, &trx_data->trans_log, end_ev, - trx_data->has_incident()); - trx_data->reset(); + if (cache_mngr->trx_cache.has_incident()) + error= mysql_bin_log.write_incident(thd, TRUE); - /* - We need to step the table map version after writing the - transaction cache to disk. - */ - mysql_bin_log.update_table_map_version(); - statistic_increment(binlog_cache_use, &LOCK_status); - if (trans_log->disk_writes != 0) - { - statistic_increment(binlog_cache_disk_use, &LOCK_status); - trans_log->disk_writes= 0; - } + cache_mngr->reset_cache(&cache_mngr->trx_cache); + + thd->clear_binlog_table_maps(); } + /* + If rolling back a statement in a transaction, we truncate the + transaction cache to remove the statement. + */ else - { - /* - If rolling back an entire transaction or a single statement not - inside a transaction, we reset the transaction cache. - - If rolling back a statement in a transaction, we truncate the - transaction cache to remove the statement. - */ - thd->binlog_remove_pending_rows_event(TRUE); - if (all || !thd->in_multi_stmt_transaction()) - { - if (trx_data->has_incident()) - error= mysql_bin_log.write_incident(thd, TRUE); - trx_data->reset(); - } - else // ...statement - trx_data->truncate(trx_data->before_stmt_pos); + cache_mngr->trx_cache.restore_prev_position(); - /* - We need to step the table map version on a rollback to ensure - that a new table map event is generated instead of the one that - was written to the thrown-away transaction cache. - */ - mysql_bin_log.update_table_map_version(); - } + /* + We need to step the table map version on a rollback to ensure that a new + table map event is generated instead of the one that was written to the + thrown-away transaction cache. + */ + mysql_bin_log.update_table_map_version(); - DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL); + DBUG_ASSERT(thd->binlog_get_pending_rows_event(is_transactional) == NULL); DBUG_RETURN(error); } @@ -1525,10 +1605,56 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all) } /** + This function flushes the non-transactional to the binary log upon + committing or rolling back a statement. + + @param thd The thread whose transaction should be flushed + @param cache_mngr Pointer to the cache data to be flushed + + @return + nonzero if an error pops up when flushing the non-transactional cache. +*/ +static int +binlog_flush_stmt_cache(THD *thd, binlog_cache_mngr *cache_mngr) +{ + int error= 0; + DBUG_ENTER("binlog_flush_stmt_cache"); + /* + If we are flushing the statement cache, it means that the changes get + through otherwise the cache is empty and this routine should not be called. + */ + DBUG_ASSERT(cache_mngr->stmt_cache.has_incident() == FALSE); + /* + This function handles non-transactional changes and as such this flag equals + to false. + */ + bool const is_transactional= FALSE; + IO_CACHE *cache_log= &cache_mngr->stmt_cache.cache_log; + thd->binlog_flush_pending_rows_event(TRUE, is_transactional); + Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, FALSE, TRUE, 0); + if ((error= mysql_bin_log.write(thd, cache_log, &qev, + cache_mngr->stmt_cache.has_incident()))) + DBUG_RETURN(error); + cache_mngr->reset_cache(&cache_mngr->stmt_cache); + + /* + We need to step the table map version after writing the + transaction cache to disk. + */ + mysql_bin_log.update_table_map_version(); + statistic_increment(binlog_cache_use, &LOCK_status); + if (cache_log->disk_writes != 0) + { + statistic_increment(binlog_cache_disk_use, &LOCK_status); + cache_log->disk_writes= 0; + } + DBUG_RETURN(error); +} + +/** This function is called once after each statement. - It has the responsibility to flush the transaction cache to the - binlog file on commits. + It has the responsibility to flush the caches to the binary log on commits. @param hton The binlog handlerton. @param thd The client thread that executes the transaction. @@ -1541,56 +1667,53 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) { int error= 0; DBUG_ENTER("binlog_commit"); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + bool const in_transaction= thd->in_multi_stmt_transaction(); + + DBUG_PRINT("debug", + ("all: %d, in_transaction: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s", + all, + YESNO(in_transaction), + YESNO(thd->transaction.all.modified_non_trans_table), + YESNO(thd->transaction.stmt.modified_non_trans_table))); + + if (!cache_mngr->stmt_cache.empty()) + { + binlog_flush_stmt_cache(thd, cache_mngr); + } - if (trx_data->empty()) + if (cache_mngr->trx_cache.empty()) { - // we're here because trans_log was flushed in MYSQL_BIN_LOG::log_xid() - trx_data->reset(); + /* + we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid() + */ + cache_mngr->reset_cache(&cache_mngr->trx_cache); DBUG_RETURN(0); } /* We commit the transaction if: - - We are not in a transaction and committing a statement, or - - - We are in a transaction and a full transaction is committed - - Otherwise, we accumulate the statement + - We are in a transaction and a full transaction is committed. + Otherwise, we accumulate the changes. */ - bool const in_transaction= thd->in_multi_stmt_transaction(); - DBUG_PRINT("debug", - ("all: %d, empty: %s, in_transaction: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s", - all, - YESNO(trx_data->empty()), - YESNO(in_transaction), - YESNO(thd->transaction.all.modified_non_trans_table), - YESNO(thd->transaction.stmt.modified_non_trans_table))); - if (!in_transaction || all || - (!all && !trx_data->at_least_one_stmt_committed && - !stmt_has_updated_trans_table(thd) && - thd->transaction.stmt.modified_non_trans_table)) + if (!in_transaction || all) { - Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0); - error= binlog_end_trans(thd, trx_data, &qev, all); + Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, FALSE, TRUE, 0); + error= binlog_flush_trx_cache(thd, cache_mngr, &qev); } - trx_data->at_least_one_stmt_committed = my_b_tell(&trx_data->trans_log) > 0; - + /* + This is part of the stmt rollback. + */ if (!all) - trx_data->before_stmt_pos = MY_OFF_T_UNDEF; // part of the stmt commit + cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF); DBUG_RETURN(error); } /** - This function is called when a transaction involving a transactional - table is rolled back. - - It has the responsibility to flush the transaction cache to the - binlog file. However, if the transaction does not involve - non-transactional tables, nothing needs to be logged. + This function is called when a transaction or a statement is rolled back. @param hton The binlog handlerton. @param thd The client thread that executes the transaction. @@ -1603,18 +1726,38 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) { DBUG_ENTER("binlog_rollback"); int error=0; - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - - if (trx_data->empty()) { - trx_data->reset(); - DBUG_RETURN(0); - } + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); DBUG_PRINT("debug", ("all: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s", YESNO(all), YESNO(thd->transaction.all.modified_non_trans_table), YESNO(thd->transaction.stmt.modified_non_trans_table))); + + /* + If an incident event is set we do not flush the content of the statement + cache because it may be corrupted. + */ + if (cache_mngr->stmt_cache.has_incident()) + { + mysql_bin_log.write_incident(thd, TRUE); + cache_mngr->reset_cache(&cache_mngr->stmt_cache); + } + else if (!cache_mngr->stmt_cache.empty()) + { + binlog_flush_stmt_cache(thd, cache_mngr); + } + + if (cache_mngr->trx_cache.empty()) + { + /* + we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid() + */ + cache_mngr->reset_cache(&cache_mngr->trx_cache); + DBUG_RETURN(0); + } + + if (mysql_bin_log.check_write_error(thd)) { /* @@ -1625,52 +1768,46 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) */ DBUG_ASSERT(!all); /* - We reach this point if either only transactional tables were modified or - the effect of a statement that did not get into the binlog needs to be - rolled back. In the latter case, if a statement changed non-transactional - tables or had the OPTION_KEEP_LOG associated, we write an incident event - to the binlog in order to stop slaves and notify users that some changes - on the master did not get into the binlog and slaves will be inconsistent. - On the other hand, if a statement is transactional, we just safely roll it - back. + We reach this point if the effect of a statement did not properly get into + a cache and need to be rolled back. */ - if ((thd->transaction.stmt.modified_non_trans_table || - (thd->variables.option_bits & OPTION_KEEP_LOG)) && - mysql_bin_log.check_write_error(thd)) - trx_data->set_incident(); - error= binlog_end_trans(thd, trx_data, 0, all); + error= binlog_truncate_trx_cache(thd, cache_mngr, all); } else - { - /* - We flush the cache with a rollback, wrapped in a beging/rollback if: - . aborting a transaction that modified a non-transactional table; + { + /* + We flush the cache wrapped in a beging/rollback if: + . aborting a transcation that modified a non-transactional table or; . aborting a statement that modified both transactional and - non-transactional tables but which is not in the boundaries of any - transaction or there was no early change; + non-transctional tables but which is not in the boundaries of any + transaction; . the OPTION_KEEP_LOG is activate. */ - if ((all && thd->transaction.all.modified_non_trans_table) || - (!all && thd->transaction.stmt.modified_non_trans_table && - !(thd->variables.option_bits & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) || + if (thd->variables.binlog_format == BINLOG_FORMAT_STMT && + ((all && thd->transaction.all.modified_non_trans_table) || (!all && thd->transaction.stmt.modified_non_trans_table && - !trx_data->at_least_one_stmt_committed && - thd->current_stmt_binlog_row_based) || - ((thd->variables.option_bits & OPTION_KEEP_LOG))) + !thd->in_multi_stmt_transaction()) || + (thd->variables.option_bits & OPTION_KEEP_LOG))) { - Query_log_event qev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0); - error= binlog_end_trans(thd, trx_data, &qev, all); + Query_log_event qev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, FALSE, TRUE, 0); + error= binlog_flush_trx_cache(thd, cache_mngr, &qev); } /* Otherwise, we simply truncate the cache as there is no change on non-transactional tables as follows. */ - else if ((all && !thd->transaction.all.modified_non_trans_table) || - (!all && !thd->transaction.stmt.modified_non_trans_table)) - error= binlog_end_trans(thd, trx_data, 0, all); + else if (all || (!all && + (!thd->transaction.stmt.modified_non_trans_table || + !thd->in_multi_stmt_transaction() || + thd->variables.binlog_format != BINLOG_FORMAT_STMT))) + error= binlog_truncate_trx_cache(thd, cache_mngr, all); } + + /* + This is part of the stmt rollback. + */ if (!all) - trx_data->before_stmt_pos = MY_OFF_T_UNDEF; // part of the stmt rollback + cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF); DBUG_RETURN(error); } @@ -1746,7 +1883,8 @@ static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv) int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED); int const error= thd->binlog_query(THD::STMT_QUERY_TYPE, - thd->query(), thd->query_length(), TRUE, FALSE, errcode); + thd->query(), thd->query_length(), TRUE, FALSE, FALSE, + errcode); DBUG_RETURN(error); } @@ -1765,7 +1903,8 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED); int error= thd->binlog_query(THD::STMT_QUERY_TYPE, - thd->query(), thd->query_length(), TRUE, FALSE, errcode); + thd->query(), thd->query_length(), TRUE, FALSE, FALSE, + errcode); DBUG_RETURN(error); } binlog_trans_log_truncate(thd, *(my_off_t*)sv); @@ -4049,27 +4188,67 @@ bool MYSQL_BIN_LOG::is_query_in_union(THD *thd, query_id_t query_id_param) int THD::binlog_setup_trx_data() { DBUG_ENTER("THD::binlog_setup_trx_data"); - binlog_trx_data *trx_data= - (binlog_trx_data*) thd_get_ha_data(this, binlog_hton); + binlog_cache_mngr *cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); - if (trx_data) + if (cache_mngr) DBUG_RETURN(0); // Already set up - trx_data= (binlog_trx_data*) my_malloc(sizeof(binlog_trx_data), MYF(MY_ZEROFILL)); - if (!trx_data || - open_cached_file(&trx_data->trans_log, mysql_tmpdir, + cache_mngr= (binlog_cache_mngr*) my_malloc(sizeof(binlog_cache_mngr), MYF(MY_ZEROFILL)); + if (!cache_mngr || + open_cached_file(&cache_mngr->stmt_cache.cache_log, mysql_tmpdir, + LOG_PREFIX, binlog_cache_size, MYF(MY_WME)) || + open_cached_file(&cache_mngr->trx_cache.cache_log, mysql_tmpdir, LOG_PREFIX, binlog_cache_size, MYF(MY_WME))) { - my_free((uchar*)trx_data, MYF(MY_ALLOW_ZERO_PTR)); + my_free((uchar*)cache_mngr, MYF(MY_ALLOW_ZERO_PTR)); DBUG_RETURN(1); // Didn't manage to set it up } - thd_set_ha_data(this, binlog_hton, trx_data); + thd_set_ha_data(this, binlog_hton, cache_mngr); - trx_data= new (thd_get_ha_data(this, binlog_hton)) binlog_trx_data; + cache_mngr= new (thd_get_ha_data(this, binlog_hton)) binlog_cache_mngr; DBUG_RETURN(0); } +/** + This function checks if a transactional talbe was updated by the + current transaction. + + @param thd The client thread that executed the current statement. + @return + @c true if a transactional table was updated, @false otherwise. +*/ +bool +trans_has_updated_trans_table(THD* thd) +{ + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + + return (cache_mngr ? my_b_tell (&cache_mngr->trx_cache.cache_log) : 0); +} + +/** + This function checks if a transactional talbe was updated by the + current statement. + + @param thd The client thread that executed the current statement. + @return + @c true if a transactional table was updated, @false otherwise. +*/ +bool +stmt_has_updated_trans_table(THD *thd) +{ + Ha_trx_info *ha_info; + + for (ha_info= thd->transaction.stmt.ha_list; ha_info; ha_info= ha_info->next()) + { + if (ha_info->is_trx_read_write() && ha_info->ht() != binlog_hton) + return (TRUE); + } + return (FALSE); +} + /* Function to start a statement and optionally a transaction for the binary log. @@ -4083,11 +4262,10 @@ int THD::binlog_setup_trx_data() - Start a transaction if not in autocommit mode or if a BEGIN statement has been seen. - - Start a statement transaction to allow us to truncate the binary - log. + - Start a statement transaction to allow us to truncate the cache. - Save the currrent binlog position so that we can roll back the - statement by truncating the transaction log. + statement by truncating the cache. We only update the saved position if the old one was undefined, the reason is that there are some cases (e.g., for CREATE-SELECT) @@ -4101,15 +4279,15 @@ int THD::binlog_setup_trx_data() void THD::binlog_start_trans_and_stmt() { - binlog_trx_data *trx_data= (binlog_trx_data*) thd_get_ha_data(this, binlog_hton); + binlog_cache_mngr *cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); DBUG_ENTER("binlog_start_trans_and_stmt"); - DBUG_PRINT("enter", ("trx_data: 0x%lx trx_data->before_stmt_pos: %lu", - (long) trx_data, - (trx_data ? (ulong) trx_data->before_stmt_pos : + DBUG_PRINT("enter", ("cache_mngr: %p cache_mngr->trx_cache.get_prev_position(): %lu", + cache_mngr, + (cache_mngr ? (ulong) cache_mngr->trx_cache.get_prev_position() : (ulong) 0))); - if (trx_data == NULL || - trx_data->before_stmt_pos == MY_OFF_T_UNDEF) + if (cache_mngr == NULL || + cache_mngr->trx_cache.get_prev_position() == MY_OFF_T_UNDEF) { this->binlog_set_stmt_begin(); if (in_multi_stmt_transaction()) @@ -4130,27 +4308,35 @@ THD::binlog_start_trans_and_stmt() } void THD::binlog_set_stmt_begin() { - binlog_trx_data *trx_data= - (binlog_trx_data*) thd_get_ha_data(this, binlog_hton); + binlog_cache_mngr *cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); /* - The call to binlog_trans_log_savepos() might create the trx_data + The call to binlog_trans_log_savepos() might create the cache_mngr structure, if it didn't exist before, so we save the position into an auto variable and then write it into the transaction - data for the binary log (i.e., trx_data). + data for the binary log (i.e., cache_mngr). */ my_off_t pos= 0; binlog_trans_log_savepos(this, &pos); - trx_data= (binlog_trx_data*) thd_get_ha_data(this, binlog_hton); - trx_data->before_stmt_pos= pos; + cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); + cache_mngr->trx_cache.set_prev_position(pos); } -/* - Write a table map to the binary log. - */ - -int THD::binlog_write_table_map(TABLE *table, bool is_trans) +/** + This function writes a table map to the binary log. + Note that in order to keep the signature uniform with related methods, + we use a redundant parameter to indicate whether a transactional table + was changed or not. + + @param table a pointer to the table. + @param is_transactional @c true indicates a transactional table, + otherwise @c false a non-transactional. + @return + nonzero if an error pops up when writing the table map event. +*/ +int THD::binlog_write_table_map(TABLE *table, bool is_transactional) { int error; DBUG_ENTER("THD::binlog_write_table_map"); @@ -4159,19 +4345,21 @@ int THD::binlog_write_table_map(TABLE *table, bool is_trans) table->s->table_map_id)); /* Pre-conditions */ - DBUG_ASSERT(current_stmt_binlog_row_based && mysql_bin_log.is_open()); + DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); DBUG_ASSERT(table->s->table_map_id != ULONG_MAX); - Table_map_log_event::flag_set const - flags= Table_map_log_event::TM_NO_FLAGS; - Table_map_log_event - the_event(this, table, table->s->table_map_id, is_trans, flags); + the_event(this, table, table->s->table_map_id, is_transactional); - if (is_trans && binlog_table_maps == 0) + if (binlog_table_maps == 0) binlog_start_trans_and_stmt(); - if ((error= mysql_bin_log.write(&the_event))) + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); + + IO_CACHE *file= cache_mngr->get_binlog_cache_log(is_transactional); + + if ((error= the_event.write(file))) DBUG_RETURN(error); binlog_table_maps++; @@ -4179,144 +4367,163 @@ int THD::binlog_write_table_map(TABLE *table, bool is_trans) DBUG_RETURN(0); } +/** + This function retrieves a pending row event from a cache which is + specified through the parameter @c is_transactional. Respectively, when it + is @c true, the pending event is returned from the transactional cache. + Otherwise from the non-transactional cache. + + @param is_transactional @c true indicates a transactional cache, + otherwise @c false a non-transactional. + @return + The row event if any. +*/ Rows_log_event* -THD::binlog_get_pending_rows_event() const +THD::binlog_get_pending_rows_event(bool is_transactional) const { - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(this, binlog_hton); + Rows_log_event* rows= NULL; + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); + /* - This is less than ideal, but here's the story: If there is no - trx_data, prepare_pending_rows_event() has never been called - (since the trx_data is set up there). In that case, we just return - NULL. + This is less than ideal, but here's the story: If there is no cache_mngr, + prepare_pending_rows_event() has never been called (since the cache_mngr + is set up there). In that case, we just return NULL. */ - return trx_data ? trx_data->pending() : NULL; + if (cache_mngr) + { + binlog_cache_data *cache_data= + cache_mngr->get_binlog_cache_data(is_transactional); + + rows= cache_data->pending(); + } + return (rows); } +/** + This function stores a pending row event into a cache which is specified + through the parameter @c is_transactional. Respectively, when it is @c + true, the pending event is stored into the transactional cache. Otherwise + into the non-transactional cache. + + @param evt a pointer to the row event. + @param is_transactional @c true indicates a transactional cache, + otherwise @c false a non-transactional. +*/ void -THD::binlog_set_pending_rows_event(Rows_log_event* ev) +THD::binlog_set_pending_rows_event(Rows_log_event* ev, bool is_transactional) { if (thd_get_ha_data(this, binlog_hton) == NULL) binlog_setup_trx_data(); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(this, binlog_hton); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); - DBUG_ASSERT(trx_data); - trx_data->set_pending(ev); + DBUG_ASSERT(cache_mngr); + + binlog_cache_data *cache_data= + cache_mngr->get_binlog_cache_data(is_transactional); + + cache_data->set_pending(ev); } /** - Remove the pending rows event, discarding any outstanding rows. - - If there is no pending rows event available, this is effectively a + This function removes the pending rows event, discarding any outstanding + rows. If there is no pending rows event available, this is effectively a no-op. - */ + + @param thd a pointer to the user thread. + @param is_transactional @c true indicates a transactional cache, + otherwise @c false a non-transactional. +*/ int -MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd) +MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, bool is_transactional) { DBUG_ENTER("MYSQL_BIN_LOG::remove_pending_rows_event"); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); - DBUG_ASSERT(trx_data); + DBUG_ASSERT(cache_mngr); - if (Rows_log_event* pending= trx_data->pending()) + binlog_cache_data *cache_data= + cache_mngr->get_binlog_cache_data(is_transactional); + + if (Rows_log_event* pending= cache_data->pending()) { delete pending; - trx_data->set_pending(NULL); + cache_data->set_pending(NULL); } DBUG_RETURN(0); } /* - Moves the last bunch of rows from the pending Rows event to the binlog - (either cached binlog if transaction, or disk binlog). Sets a new pending - event. + Moves the last bunch of rows from the pending Rows event to a cache (either + transactional cache if is_transaction is @c true, or the non-transactional + cache otherwise. Sets a new pending event. + + @param thd a pointer to the user thread. + @param evt a pointer to the row event. + @param is_transactional @c true indicates a transactional cache, + otherwise @c false a non-transactional. */ int MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, - Rows_log_event* event) + Rows_log_event* event, + bool is_transactional) { DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)"); DBUG_ASSERT(mysql_bin_log.is_open()); DBUG_PRINT("enter", ("event: 0x%lx", (long) event)); int error= 0; + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + DBUG_ASSERT(cache_mngr); - DBUG_ASSERT(trx_data); + binlog_cache_data *cache_data= + cache_mngr->get_binlog_cache_data(is_transactional); - DBUG_PRINT("info", ("trx_data->pending(): 0x%lx", (long) trx_data->pending())); + DBUG_PRINT("info", ("cache_mngr->pending(): 0x%lx", (long) cache_data->pending())); - if (Rows_log_event* pending= trx_data->pending()) + if (Rows_log_event* pending= cache_data->pending()) { - IO_CACHE *file= &log_file; - - /* - Decide if we should write to the log file directly or to the - transaction log. - */ - if (pending->get_cache_stmt() || my_b_tell(&trx_data->trans_log)) - file= &trx_data->trans_log; + IO_CACHE *file= &cache_data->cache_log; /* - If we are writing to the log file directly, we could avoid - locking the log. This does not work since we need to step the - m_table_map_version below, and that change has to be protected - by the LOCK_log mutex. - */ - mysql_mutex_lock(&LOCK_log); - - /* - Write pending event to log file or transaction cache + Write pending event to the cache. */ if (pending->write(file)) { - mysql_mutex_unlock(&LOCK_log); set_write_error(thd); + if (check_write_error(thd) && cache_data && + thd->transaction.stmt.modified_non_trans_table) + cache_data->set_incident(); DBUG_RETURN(1); } /* We step the table map version if we are writing an event - representing the end of a statement. We do this regardless of - wheather we write to the transaction cache or to directly to the - file. - - In an ideal world, we could avoid stepping the table map version - if we were writing to a transaction cache, since we could then - reuse the table map that was written earlier in the transaction - cache. This does not work since STMT_END_F implies closing all - table mappings on the slave side. + representing the end of a statement. + In an ideal world, we could avoid stepping the table map version, + since we could then reuse the table map that was written earlier + in the cache. This does not work since STMT_END_F implies closing + all table mappings on the slave side. + TODO: Find a solution so that table maps does not have to be written several times within a transaction. - */ + */ if (pending->get_flags(Rows_log_event::STMT_END_F)) ++m_table_map_version; delete pending; - - if (file == &log_file) - { - error= flush_and_sync(0); - if (!error) - { - signal_update(); - rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); - } - } - - mysql_mutex_unlock(&LOCK_log); } - thd->binlog_set_pending_rows_event(event); + thd->binlog_set_pending_rows_event(event, is_transactional); DBUG_RETURN(error); } @@ -4330,6 +4537,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) THD *thd= event_info->thd; bool error= 1; DBUG_ENTER("MYSQL_BIN_LOG::write(Log_event *)"); + binlog_cache_data *cache_data= 0; if (thd->binlog_evt_union.do_union) { @@ -4338,27 +4546,22 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) We will log the function call to the binary log on function exit */ thd->binlog_evt_union.unioned_events= TRUE; - thd->binlog_evt_union.unioned_events_trans |= event_info->cache_stmt; + thd->binlog_evt_union.unioned_events_trans |= + event_info->use_trans_cache(); DBUG_RETURN(0); } /* - Flush the pending rows event to the transaction cache or to the - log file. Since this function potentially aquire the LOCK_log - mutex, we do this before aquiring the LOCK_log mutex in this - function. - We only end the statement if we are in a top-level statement. If we are inside a stored function, we do not end the statement since this will close all tables on the slave. */ bool const end_stmt= thd->locked_tables_mode && thd->lex->requires_prelocking(); - if (thd->binlog_flush_pending_rows_event(end_stmt)) + if (thd->binlog_flush_pending_rows_event(end_stmt, + event_info->use_trans_cache())) DBUG_RETURN(error); - mysql_mutex_lock(&LOCK_log); - /* In most cases this is only called if 'is_open()' is true; in fact this is mostly called if is_open() *was* true a few instructions before, but it @@ -4366,7 +4569,6 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) */ if (likely(is_open())) { - IO_CACHE *file= &log_file; #ifdef HAVE_REPLICATION /* In the future we need to add to the following if tests like @@ -4375,67 +4577,70 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) */ const char *local_db= event_info->get_db(); if ((thd && !(thd->variables.option_bits & OPTION_BIN_LOG)) || - (!binlog_filter->db_ok(local_db))) - { - mysql_mutex_unlock(&LOCK_log); + !binlog_filter->db_ok(local_db)) DBUG_RETURN(0); - } #endif /* HAVE_REPLICATION */ - /* - Should we write to the binlog cache or to the binlog on disk? - Write to the binlog cache if: - - it is already not empty (meaning we're in a transaction; note that the - present event could be about a non-transactional table, but still we need - to write to the binlog cache in that case to handle updates to mixed - trans/non-trans table types the best possible in binlogging) - - or if the event asks for it (cache_stmt == TRUE). - */ - if (opt_using_transactions && thd) + IO_CACHE *file= NULL; + + if (event_info->use_direct_logging()) + { + file= &log_file; + mysql_mutex_lock(&LOCK_log); + } + else { if (thd->binlog_setup_trx_data()) goto err; - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - IO_CACHE *trans_log= &trx_data->trans_log; - my_off_t trans_log_pos= my_b_tell(trans_log); - if (event_info->get_cache_stmt() || trans_log_pos != 0 || - stmt_has_updated_trans_table(thd)) + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + + /* + If we are about to use write rows, we just need to check the type of + the event (either transactional or non-transactional) in order to + choose the cache. + */ + if (thd->is_current_stmt_binlog_format_row()) { - DBUG_PRINT("info", ("Using trans_log: cache: %d, trans_log_pos: %lu", - event_info->get_cache_stmt(), - (ulong) trans_log_pos)); - thd->binlog_start_trans_and_stmt(); - file= trans_log; + file= cache_mngr->get_binlog_cache_log(event_info->use_trans_cache()); + cache_data= cache_mngr->get_binlog_cache_data(event_info->use_trans_cache()); } /* - TODO as Mats suggested, for all the cases above where we write to - trans_log, it sounds unnecessary to lock LOCK_log. We should rather - test first if we want to write to trans_log, and if not, lock - LOCK_log. + However, if we are about to write statements we need to consider other + things. We use the non-transactional cache when: + + . the transactional cache is empty which means that there were no + early statement on behalf of the transaction. + . the respective event is tagged as non-transactional. */ - } + else if (cache_mngr->trx_cache.empty() && + !event_info->use_trans_cache()) + { + file= &cache_mngr->stmt_cache.cache_log; + cache_data= &cache_mngr->stmt_cache; + } + else + { + file= &cache_mngr->trx_cache.cache_log; + cache_data= &cache_mngr->trx_cache; + } + thd->binlog_start_trans_and_stmt(); + } DBUG_PRINT("info",("event type: %d",event_info->get_type_code())); /* - No check for auto events flag here - this write method should - never be called if auto-events are enabled - */ - - /* - 1. Write first log events which describe the 'run environment' - of the SQL command - */ + No check for auto events flag here - this write method should + never be called if auto-events are enabled. - /* - If row-based binlogging, Insert_id, Rand and other kind of "setting - context" events are not needed. + Write first log events which describe the 'run environment' + of the SQL command. If row-based binlogging, Insert_id, Rand + and other kind of "setting context" events are not needed. */ if (thd) { - if (!thd->current_stmt_binlog_row_based) + if (!thd->is_current_stmt_binlog_format_row()) { if (thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt) { @@ -4481,39 +4686,48 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) } /* - Write the SQL command - */ - - if (event_info->write(file) || + Write the event. + */ + if (event_info->write(file) || DBUG_EVALUATE_IF("injecting_fault_writing", 1, 0)) goto err; - if (file == &log_file) // we are writing to the real log (disk) + error= 0; + +err: + if (event_info->use_direct_logging()) { - bool synced= 0; - if (flush_and_sync(&synced)) - goto err; + if (!error) + { + bool synced; + if ((error= flush_and_sync(&synced))) + goto unlock; - if (RUN_HOOK(binlog_storage, after_flush, - (thd, log_file_name, file->pos_in_file, synced))) { - sql_print_error("Failed to run 'after_flush' hooks"); - goto err; + if ((error= RUN_HOOK(binlog_storage, after_flush, + (thd, log_file_name, file->pos_in_file, synced)))) + { + sql_print_error("Failed to run 'after_flush' hooks"); + goto unlock; + } + signal_update(); + rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); } - - signal_update(); - rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); +unlock: + mysql_mutex_unlock(&LOCK_log); } - error=0; -err: if (error) + { set_write_error(thd); + if (check_write_error(thd) && cache_data && + thd->transaction.stmt.modified_non_trans_table) + cache_data->set_incident(); + } } if (event_info->flags & LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F) ++m_table_map_version; - mysql_mutex_unlock(&LOCK_log); DBUG_RETURN(error); } @@ -4636,7 +4850,7 @@ uint MYSQL_BIN_LOG::next_file_id() write_cache() cache Cache to write to the binary log lock_log True if the LOCK_log mutex should be aquired, false otherwise - sync_log True if the log should be flushed and sync:ed + sync_log True if the log should be flushed and synced DESCRIPTION Write the contents of the cache to the binary log. The cache will @@ -4852,9 +5066,6 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event, DBUG_ENTER("MYSQL_BIN_LOG::write(THD *, IO_CACHE *, Log_event *)"); mysql_mutex_lock(&LOCK_log); - /* NULL would represent nothing to replicate after ROLLBACK */ - DBUG_ASSERT(commit_event != NULL); - DBUG_ASSERT(is_open()); if (likely(is_open())) // Should always be true { @@ -4869,19 +5080,9 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event, transaction is either a BEGIN..COMMIT block or a single statement in autocommit mode. */ - Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0); - - /* - Now this Query_log_event has artificial log_pos 0. It must be - adjusted to reflect the real position in the log. Not doing it - would confuse the slave: it would prevent this one from - knowing where he is in the master's binlog, which would result - in wrong positions being shown to the user, MASTER_POS_WAIT - undue waiting etc. - */ + Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, FALSE, TRUE, 0); if (qinfo.write(&log_file)) goto err; - DBUG_EXECUTE_IF("crash_before_writing_xid", { if ((write_error= write_cache(cache, false, true))) @@ -5981,13 +6182,14 @@ int TC_LOG_BINLOG::log_xid(THD *thd, my_xid xid) { DBUG_ENTER("TC_LOG_BINLOG::log"); Xid_log_event xle(thd, xid); - binlog_trx_data *trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + binlog_cache_mngr *cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); /* We always commit the entire transaction when writing an XID. Also note that the return value is inverted. */ - DBUG_RETURN(!binlog_end_trans(thd, trx_data, &xle, TRUE)); + DBUG_RETURN(!binlog_flush_stmt_cache(thd, cache_mngr) && + !binlog_flush_trx_cache(thd, cache_mngr, &xle)); } void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) diff --git a/sql/log.h b/sql/log.h index db774bc62fb..5e13d153db8 100644 --- a/sql/log.h +++ b/sql/log.h @@ -20,6 +20,9 @@ class Relay_log_info; class Format_description_log_event; +bool trans_has_updated_trans_table(THD* thd); +bool stmt_has_updated_trans_table(THD *thd); + /* Transaction Coordinator log - a base abstract class for two different implementations @@ -351,8 +354,9 @@ public: ulonglong table_map_version() const { return m_table_map_version; } void update_table_map_version() { ++m_table_map_version; } - int flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event); - int remove_pending_rows_event(THD *thd); + int flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event, + bool is_transactional); + int remove_pending_rows_event(THD *thd, bool is_transactional); #endif /* !defined(MYSQL_CLIENT) */ void reset_bytes_written() @@ -391,7 +395,7 @@ public: /* Use this to start writing a new log file */ void new_file(); - bool write(Log_event* event_info); // binary log write + bool write(Log_event* event_info); bool write(THD *thd, IO_CACHE *cache, Log_event *commit_event, bool incident); bool write_incident(THD *thd, bool lock); @@ -590,6 +594,8 @@ public: void init_base(); void init_log_tables(); bool flush_logs(THD *thd); + bool flush_slow_log(); + bool flush_general_log(); /* Perform basic logger cleanup. this will leave e.g. error log open. */ void cleanup_base(); /* Free memory. Nothing could be logged after this function is called */ diff --git a/sql/log_event.cc b/sql/log_event.cc index 2d5b7102c7a..996267b171c 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -29,7 +29,6 @@ #include "rpl_rli.h" #include "rpl_mi.h" #include "rpl_filter.h" -#include "rpl_utility.h" #include "rpl_record.h" #include "transaction.h" #include <my_dir.h> @@ -38,6 +37,7 @@ #include <base64.h> #include <my_bitmap.h> +#include "rpl_utility.h" #define log_cs &my_charset_latin1 @@ -666,10 +666,11 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) { server_id= thd->server_id; when= thd->start_time; - cache_stmt= using_trans; + cache_type= (using_trans || stmt_has_updated_trans_table(thd) + ? Log_event::EVENT_TRANSACTIONAL_CACHE : + Log_event::EVENT_STMT_CACHE); } - /** This minimal constructor is for when you are not even sure that there is a valid THD. For example in the server when we are shutting down or @@ -678,8 +679,8 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) */ Log_event::Log_event() - :temp_buf(0), exec_time(0), flags(0), cache_stmt(0), - thd(0) + :temp_buf(0), exec_time(0), flags(0), + cache_type(Log_event::EVENT_INVALID_CACHE), thd(0) { server_id= ::server_id; /* @@ -698,7 +699,7 @@ Log_event::Log_event() Log_event::Log_event(const char* buf, const Format_description_log_event* description_event) - :temp_buf(0), cache_stmt(0) + :temp_buf(0), cache_type(Log_event::EVENT_INVALID_CACHE) { #ifndef MYSQL_CLIENT thd = 0; @@ -1568,37 +1569,14 @@ log_event_print_value(IO_CACHE *file, const uchar *ptr, /* a long CHAR() field: see #37426 */ length= byte1 | (((byte0 & 0x30) ^ 0x30) << 4); type= byte0 | 0x30; - goto beg; - } - - switch (byte0) - { - case MYSQL_TYPE_SET: - case MYSQL_TYPE_ENUM: - case MYSQL_TYPE_STRING: - type= byte0; - length= byte1; - break; - - default: - - { - char tmp[5]; - my_snprintf(tmp, sizeof(tmp), "%04X", meta); - my_b_printf(file, - "!! Don't know how to handle column type=%d meta=%d (%s)", - type, meta, tmp); - return 0; - } } + else + length = meta & 0xFF; } else length= meta; } - -beg: - switch (type) { case MYSQL_TYPE_LONG: { @@ -1735,6 +1713,33 @@ beg: return 3; } + case MYSQL_TYPE_NEWDATE: + { + uint32 tmp= uint3korr(ptr); + int part; + char buf[10]; + char *pos= &buf[10]; + + /* Copied from field.cc */ + *pos--=0; // End NULL + part=(int) (tmp & 31); + *pos--= (char) ('0'+part%10); + *pos--= (char) ('0'+part/10); + *pos--= ':'; + part=(int) (tmp >> 5 & 15); + *pos--= (char) ('0'+part%10); + *pos--= (char) ('0'+part/10); + *pos--= ':'; + part=(int) (tmp >> 9); + *pos--= (char) ('0'+part%10); part/=10; + *pos--= (char) ('0'+part%10); part/=10; + *pos--= (char) ('0'+part%10); part/=10; + *pos= (char) ('0'+part); + my_b_printf(file , "'%s'", buf); + my_snprintf(typestr, typestr_length, "DATE"); + return 3; + } + case MYSQL_TYPE_DATE: { uint i32= uint3korr(ptr); @@ -1753,7 +1758,7 @@ beg: } case MYSQL_TYPE_ENUM: - switch (length) { + switch (meta & 0xFF) { case 1: my_b_printf(file, "%d", (int) *ptr); my_snprintf(typestr, typestr_length, "ENUM(1 byte)"); @@ -1766,15 +1771,15 @@ beg: return 2; } default: - my_b_printf(file, "!! Unknown ENUM packlen=%d", length); + my_b_printf(file, "!! Unknown ENUM packlen=%d", meta & 0xFF); return 0; } break; case MYSQL_TYPE_SET: - my_b_write_bit(file, ptr , length * 8); - my_snprintf(typestr, typestr_length, "SET(%d bytes)", length); - return length; + my_b_write_bit(file, ptr , (meta & 0xFF) * 8); + my_snprintf(typestr, typestr_length, "SET(%d bytes)", meta & 0xFF); + return meta & 0xFF; case MYSQL_TYPE_BLOB: switch (meta) { @@ -2354,7 +2359,7 @@ Query_log_event::Query_log_event() */ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, ulong query_length, bool using_trans, - bool suppress_use, int errcode) + bool direct, bool suppress_use, int errcode) :Log_event(thd_arg, (thd_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : @@ -2433,6 +2438,95 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, } else time_zone_len= 0; + + /* + In what follows, we decide whether to write to the binary log or to use a + cache. + */ + LEX *lex= thd->lex; + bool implicit_commit= FALSE; + cache_type= Log_event::EVENT_INVALID_CACHE; + switch (lex->sql_command) + { + case SQLCOM_ALTER_DB: + case SQLCOM_CREATE_FUNCTION: + case SQLCOM_DROP_FUNCTION: + case SQLCOM_DROP_PROCEDURE: + case SQLCOM_INSTALL_PLUGIN: + case SQLCOM_UNINSTALL_PLUGIN: + case SQLCOM_ALTER_TABLESPACE: + implicit_commit= TRUE; + break; + case SQLCOM_DROP_TABLE: + implicit_commit= !(lex->drop_temporary && thd->in_multi_stmt_transaction()); + break; + case SQLCOM_ALTER_TABLE: + case SQLCOM_CREATE_TABLE: + implicit_commit= !((lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) && + thd->in_multi_stmt_transaction()) && + !(lex->select_lex.item_list.elements && + thd->is_current_stmt_binlog_format_row()); + break; + case SQLCOM_SET_OPTION: + implicit_commit= (lex->autocommit ? TRUE : FALSE); + break; + /* + Replace what follows after CF_AUTO_COMMIT_TRANS is backported by: + + default: + implicit_commit= ((sql_command_flags[lex->sql_command] & + CF_AUTO_COMMIT_TRANS)); + break; + */ + case SQLCOM_CREATE_INDEX: + case SQLCOM_TRUNCATE: + case SQLCOM_CREATE_DB: + case SQLCOM_DROP_DB: + case SQLCOM_ALTER_DB_UPGRADE: + case SQLCOM_RENAME_TABLE: + case SQLCOM_DROP_INDEX: + case SQLCOM_CREATE_VIEW: + case SQLCOM_DROP_VIEW: + case SQLCOM_CREATE_TRIGGER: + case SQLCOM_DROP_TRIGGER: + case SQLCOM_CREATE_EVENT: + case SQLCOM_ALTER_EVENT: + case SQLCOM_DROP_EVENT: + case SQLCOM_REPAIR: + case SQLCOM_OPTIMIZE: + case SQLCOM_ANALYZE: + case SQLCOM_CREATE_USER: + case SQLCOM_DROP_USER: + case SQLCOM_RENAME_USER: + case SQLCOM_REVOKE_ALL: + case SQLCOM_REVOKE: + case SQLCOM_GRANT: + case SQLCOM_CREATE_PROCEDURE: + case SQLCOM_CREATE_SPFUNCTION: + case SQLCOM_ALTER_PROCEDURE: + case SQLCOM_ALTER_FUNCTION: + case SQLCOM_ASSIGN_TO_KEYCACHE: + case SQLCOM_PRELOAD_KEYS: + case SQLCOM_FLUSH: + case SQLCOM_CHECK: + implicit_commit= TRUE; + break; + default: + implicit_commit= FALSE; + break; + } + + if (implicit_commit || direct) + { + cache_type= Log_event::EVENT_NO_CACHE; + } + else + { + cache_type= (using_trans || stmt_has_updated_trans_table(thd) + ? Log_event::EVENT_TRANSACTIONAL_CACHE : + Log_event::EVENT_STMT_CACHE); + } + DBUG_ASSERT(cache_type != Log_event::EVENT_INVALID_CACHE); DBUG_PRINT("info",("Query_log_event has flags2: %lu sql_mode: %lu", (ulong) flags2, sql_mode)); } @@ -6710,9 +6804,9 @@ Execute_load_query_log_event(THD *thd_arg, const char* query_arg, ulong query_length_arg, uint fn_pos_start_arg, uint fn_pos_end_arg, enum_load_dup_handling dup_handling_arg, - bool using_trans, bool suppress_use, + bool using_trans, bool direct, bool suppress_use, int errcode): - Query_log_event(thd_arg, query_arg, query_length_arg, using_trans, + Query_log_event(thd_arg, query_arg, query_length_arg, using_trans, direct, suppress_use, errcode), file_id(thd_arg->file_id), fn_pos_start(fn_pos_start_arg), fn_pos_end(fn_pos_end_arg), dup_handling(dup_handling_arg) @@ -7280,7 +7374,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) We also call the mysql_reset_thd_for_next_command(), since this is the logical start of the next "statement". Note that this - call might reset the value of current_stmt_binlog_row_based, so + call might reset the value of current_stmt_binlog_format, so we need to do any changes to that value after this function. */ lex_start(thd); @@ -7292,16 +7386,12 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) */ thd->transaction.stmt.modified_non_trans_table= FALSE; /* - Check if the slave is set to use SBR. If so, it should switch - to using RBR until the end of the "statement", i.e., next - STMT_END_F or next error. + This is a row injection, so we flag the "statement" as + such. Note that this code is called both when the slave does row + injections and when the BINLOG statement is used to do row + injections. */ - if (!thd->current_stmt_binlog_row_based && - mysql_bin_log.is_open() && (thd->variables.option_bits & OPTION_BIN_LOG)) - { - thd->set_current_stmt_binlog_row_based(); - } - + thd->lex->set_stmt_row_injection(); /* There are a few flags that are replicated with each row event. @@ -7350,11 +7440,18 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) */ { + DBUG_PRINT("debug", ("Checking compability of tables to lock - tables_to_lock: %p", + rli->tables_to_lock)); RPL_TABLE_LIST *ptr= rli->tables_to_lock; for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global)) { - if (ptr->m_tabledef.compatible_with(rli, ptr->table)) + TABLE *conv_table; + if (!ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli), + ptr->table, &conv_table)) { + DBUG_PRINT("debug", ("Table: %s.%s is not compatible with master", + ptr->table->s->db.str, + ptr->table->s->table_name.str)); /* We should not honour --slave-skip-errors at this point as we are having severe errors which should not be skiped. @@ -7365,12 +7462,17 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); DBUG_RETURN(ERR_BAD_TABLE_DEF); } + DBUG_PRINT("debug", ("Table: %s.%s is compatible with master" + " - conv_table: %p", + ptr->table->s->db.str, + ptr->table->s->table_name.str, conv_table)); + ptr->m_conv_table= conv_table; } } /* - ... and then we add all the tables to the table map and remove - them from tables to lock. + ... and then we add all the tables to the table map and but keep + them in the tables to lock list. We also invalidate the query cache for all the tables, since they will now be changed. @@ -7537,7 +7639,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) error= 0; } - if (!cache_stmt) + if (!use_trans_cache()) { DBUG_PRINT("info", ("Marked that we need to keep log")); thd->variables.option_bits|= OPTION_KEEP_LOG; @@ -7550,7 +7652,14 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) slave_rows_error_report(ERROR_LEVEL, error, rli, thd, table, get_type_str(), RPL_LOG_NAME, (ulong) log_pos); - thd->reset_current_stmt_binlog_row_based(); + /* + @todo We should probably not call + reset_current_stmt_binlog_format_row() from here. + + Note: this applies to log_event_old.cc too. + /Sven + */ + thd->reset_current_stmt_binlog_format_row(); const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error); thd->is_slave_error= 1; DBUG_RETURN(error); @@ -7611,7 +7720,7 @@ static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd) (assume the last master's transaction is ignored by the slave because of replicate-ignore rules). */ - error= thd->binlog_flush_pending_rows_event(true); + error= thd->binlog_flush_pending_rows_event(TRUE); /* If this event is not in a transaction, the call below will, if some @@ -7634,7 +7743,17 @@ static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd) event flushed. */ - thd->reset_current_stmt_binlog_row_based(); + /* + @todo We should probably not call + reset_current_stmt_binlog_format_row() from here. + + Note: this applies to log_event_old.cc too + + Btw, the previous comment about transactional engines does not + seem related to anything that happens here. + /Sven + */ + thd->reset_current_stmt_binlog_format_row(); const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0); } @@ -7841,7 +7960,10 @@ int Table_map_log_event::save_field_metadata() DBUG_ENTER("Table_map_log_event::save_field_metadata"); int index= 0; for (unsigned int i= 0 ; i < m_table->s->fields ; i++) + { + DBUG_PRINT("debug", ("field_type: %d", m_coltype[i])); index+= m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]); + } DBUG_RETURN(index); } #endif /* !defined(MYSQL_CLIENT) */ @@ -7853,8 +7975,8 @@ int Table_map_log_event::save_field_metadata() */ #if !defined(MYSQL_CLIENT) Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, - bool is_transactional, uint16 flags) - : Log_event(thd, 0, true), + bool is_transactional) + : Log_event(thd, 0, is_transactional), m_table(tbl), m_dbnam(tbl->s->db.str), m_dblen(m_dbnam ? tbl->s->db.length : 0), @@ -7863,7 +7985,7 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, m_colcnt(tbl->s->fields), m_memory(NULL), m_table_id(tid), - m_flags(flags), + m_flags(TM_BIT_LEN_EXACT_F), m_data_size(0), m_field_metadata(0), m_field_metadata_size(0), @@ -8119,8 +8241,10 @@ int Table_map_log_event::do_apply_event(Relay_log_info const *rli) inside Relay_log_info::clear_tables_to_lock() by calling the table_def destructor explicitly. */ - new (&table_list->m_tabledef) table_def(m_coltype, m_colcnt, - m_field_metadata, m_field_metadata_size, m_null_bits); + new (&table_list->m_tabledef) + table_def(m_coltype, m_colcnt, + m_field_metadata, m_field_metadata_size, + m_null_bits, m_flags); table_list->m_tabledef_valid= TRUE; /* diff --git a/sql/log_event.h b/sql/log_event.h index 5dd5de26fcf..9f14b30d2e2 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -870,6 +870,31 @@ public: EVENT_SKIP_COUNT }; + enum enum_event_cache_type + { + EVENT_INVALID_CACHE, + /* + If possible the event should use a non-transactional cache before + being flushed to the binary log. This means that it must be flushed + right after its correspondent statement is completed. + */ + EVENT_STMT_CACHE, + /* + The event should use a transactional cache before being flushed to + the binary log. This means that it must be flushed upon commit or + rollback. + */ + EVENT_TRANSACTIONAL_CACHE, + /* + The event must be written directly to the binary log without going + through a cache. + */ + EVENT_NO_CACHE, + /** + If there is a need for different types, introduce them before this. + */ + EVENT_CACHE_COUNT + }; /* The following type definition is to be used whenever data is placed @@ -920,8 +945,12 @@ public: LOG_EVENT_SUPPRESS_USE_F for notes. */ uint16 flags; - - bool cache_stmt; + + /* + Defines the type of the cache, if any, where the event will be + stored before being flushed to disk. + */ + uint16 cache_type; /** A storage to cache the global system variable's value. @@ -933,7 +962,7 @@ public: THD* thd; Log_event(); - Log_event(THD* thd_arg, uint16 flags_arg, bool cache_stmt); + Log_event(THD* thd_arg, uint16 flags_arg, bool is_transactional); /* read_log_event() functions read an event from a binlog or relay log; used by SHOW BINLOG EVENTS, the binlog_dump thread on the @@ -1031,7 +1060,18 @@ public: void set_relay_log_event() { flags |= LOG_EVENT_RELAY_LOG_F; } bool is_artificial_event() const { return flags & LOG_EVENT_ARTIFICIAL_F; } bool is_relay_log_event() const { return flags & LOG_EVENT_RELAY_LOG_F; } - inline bool get_cache_stmt() const { return cache_stmt; } + inline bool use_trans_cache() const + { + return (cache_type == Log_event::EVENT_TRANSACTIONAL_CACHE); + } + inline void set_direct_logging() + { + cache_type = Log_event::EVENT_NO_CACHE; + } + inline bool use_direct_logging() + { + return (cache_type == Log_event::EVENT_NO_CACHE); + } Log_event(const char* buf, const Format_description_log_event *description_event); virtual ~Log_event() { free_temp_buf();} @@ -1645,7 +1685,7 @@ public: #ifndef MYSQL_CLIENT Query_log_event(THD* thd_arg, const char* query_arg, ulong query_length, - bool using_trans, bool suppress_use, int error); + bool using_trans, bool direct, bool suppress_use, int error); const char* get_db() { return db; } #ifdef HAVE_REPLICATION void pack_info(Protocol* protocol); @@ -2895,8 +2935,8 @@ public: ulong query_length, uint fn_pos_start_arg, uint fn_pos_end_arg, enum_load_dup_handling dup_handling_arg, - bool using_trans, bool suppress_use, - int errcode); + bool using_trans, bool direct, + bool suppress_use, int errcode); #ifdef HAVE_REPLICATION void pack_info(Protocol* protocol); #endif /* HAVE_REPLICATION */ @@ -3304,16 +3344,14 @@ public: /* Special constants representing sets of flags */ enum { - TM_NO_FLAGS = 0U + TM_NO_FLAGS = 0U, + TM_BIT_LEN_EXACT_F = (1U << 0) }; - void set_flags(flag_set flag) { m_flags |= flag; } - void clear_flags(flag_set flag) { m_flags &= ~flag; } flag_set get_flags(flag_set flag) const { return m_flags & flag; } #ifndef MYSQL_CLIENT - Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, - bool is_transactional, uint16 flags); + Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, bool is_transactional); #endif #ifdef HAVE_REPLICATION Table_map_log_event(const char *buf, uint event_len, @@ -3326,12 +3364,12 @@ public: table_def *create_table_def() { return new table_def(m_coltype, m_colcnt, m_field_metadata, - m_field_metadata_size, m_null_bits); + m_field_metadata_size, m_null_bits, m_flags); } +#endif ulong get_table_id() const { return m_table_id; } const char *get_table_name() const { return m_tblnam; } const char *get_db_name() const { return m_dbnam; } -#endif virtual Log_event_type get_type_code() { return TABLE_MAP_EVENT; } virtual bool is_valid() const { return m_memory != NULL; /* we check malloc */ } @@ -3883,6 +3921,7 @@ public: DBUG_PRINT("enter", ("m_incident: %d", m_incident)); m_message.str= NULL; /* Just as a precaution */ m_message.length= 0; + set_direct_logging(); DBUG_VOID_RETURN; } @@ -3892,6 +3931,7 @@ public: DBUG_ENTER("Incident_log_event::Incident_log_event"); DBUG_PRINT("enter", ("m_incident: %d", m_incident)); m_message= msg; + set_direct_logging(); DBUG_VOID_RETURN; } #endif diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc index 48028335d21..caa69269e28 100644 --- a/sql/log_event_old.cc +++ b/sql/log_event_old.cc @@ -59,22 +59,19 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info We also call the mysql_reset_thd_for_next_command(), since this is the logical start of the next "statement". Note that this - call might reset the value of current_stmt_binlog_row_based, so + call might reset the value of current_stmt_binlog_format, so we need to do any changes to that value after this function. */ lex_start(thd); mysql_reset_thd_for_next_command(thd); /* - Check if the slave is set to use SBR. If so, it should switch - to using RBR until the end of the "statement", i.e., next - STMT_END_F or next error. + This is a row injection, so we flag the "statement" as + such. Note that this code is called both when the slave does row + injections and when the BINLOG statement is used to do row + injections. */ - if (!thd->current_stmt_binlog_row_based && - mysql_bin_log.is_open() && (thd->variables.option_bits & OPTION_BIN_LOG)) - { - thd->set_current_stmt_binlog_row_based(); - } + thd->lex->set_stmt_row_injection(); if (simple_open_n_lock_tables(thd, rli->tables_to_lock)) { @@ -107,12 +104,19 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info RPL_TABLE_LIST *ptr= rli->tables_to_lock; for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global)) { - if (ptr->m_tabledef.compatible_with(rli, ptr->table)) + TABLE *conv_table; + if (!ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli), + ptr->table, &conv_table)) { thd->is_slave_error= 1; const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); DBUG_RETURN(Old_rows_log_event::ERR_BAD_TABLE_DEF); } + DBUG_PRINT("debug", ("Table: %s.%s is compatible with master" + " - conv_table: %p", + ptr->table->s->db.str, + ptr->table->s->table_name.str, conv_table)); + ptr->m_conv_table= conv_table; } } @@ -227,7 +231,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info DBUG_EXECUTE_IF("stop_slave_middle_group", const_cast<Relay_log_info*>(rli)->abort_slave= 1;); error= do_after_row_operations(table, error); - if (!ev->cache_stmt) + if (!ev->use_trans_cache()) { DBUG_PRINT("info", ("Marked that we need to keep log")); thd->variables.option_bits|= OPTION_KEEP_LOG; @@ -254,7 +258,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info thread is certainly going to stop. rollback at the caller along with sbr. */ - thd->reset_current_stmt_binlog_row_based(); + thd->reset_current_stmt_binlog_format_row(); const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error); thd->is_slave_error= 1; DBUG_RETURN(error); @@ -1504,7 +1508,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) NOTE: For this new scheme there should be no pending event: need to add code to assert that is the case. */ - error= thd->binlog_flush_pending_rows_event(false); + error= thd->binlog_flush_pending_rows_event(FALSE); if (error) { rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, @@ -1549,18 +1553,22 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) RPL_TABLE_LIST *ptr= rli->tables_to_lock; for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global)) { - if (ptr->m_tabledef.compatible_with(rli, ptr->table)) + TABLE *conv_table; + if (ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli), + ptr->table, &conv_table)) { thd->is_slave_error= 1; const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); DBUG_RETURN(ERR_BAD_TABLE_DEF); } + ptr->m_conv_table= conv_table; } } /* - ... and then we add all the tables to the table map and remove - them from tables to lock. + ... and then we add all the tables to the table map but keep + them in the tables to lock list. + We also invalidate the query cache for all the tables, since they will now be changed. @@ -1716,7 +1724,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) DBUG_EXECUTE_IF("stop_slave_middle_group", const_cast<Relay_log_info*>(rli)->abort_slave= 1;); error= do_after_row_operations(rli, error); - if (!cache_stmt) + if (!use_trans_cache()) { DBUG_PRINT("info", ("Marked that we need to keep log")); thd->variables.option_bits|= OPTION_KEEP_LOG; @@ -1743,7 +1751,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) thread is certainly going to stop. rollback at the caller along with sbr. */ - thd->reset_current_stmt_binlog_row_based(); + thd->reset_current_stmt_binlog_format_row(); const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error); thd->is_slave_error= 1; DBUG_RETURN(error); @@ -1755,7 +1763,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) last_event_start_time here instead. */ if (table && (table->s->primary_key == MAX_KEY) && - !cache_stmt && get_flags(STMT_END_F) == RLE_NO_FLAGS) + !use_trans_cache() && get_flags(STMT_END_F) == RLE_NO_FLAGS) { /* ------------ Temporary fix until WL#2975 is implemented --------- @@ -1793,7 +1801,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) (assume the last master's transaction is ignored by the slave because of replicate-ignore rules). */ - int binlog_error= thd->binlog_flush_pending_rows_event(true); + int binlog_error= thd->binlog_flush_pending_rows_event(TRUE); /* If this event is not in a transaction, the call below will, if some @@ -1822,7 +1830,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) event flushed. */ - thd->reset_current_stmt_binlog_row_based(); + thd->reset_current_stmt_binlog_format_row(); const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0); } diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h index 3707a7cf62b..3aa0e6622f5 100644 --- a/sql/mysql_priv.h +++ b/sql/mysql_priv.h @@ -1030,7 +1030,8 @@ struct Query_cache_query_flags #endif /*HAVE_QUERY_CACHE*/ int write_bin_log(THD *thd, bool clear_error, - char const *query, ulong query_length); + char const *query, ulong query_length, + bool is_trans= FALSE); /* sql_connect.cc */ int check_user(THD *thd, enum enum_server_command command, @@ -1576,7 +1577,6 @@ TABLE *open_n_lock_single_table(THD *thd, TABLE_LIST *table_l, bool open_normal_and_derived_tables(THD *thd, TABLE_LIST *tables, uint flags); bool lock_tables(THD *thd, TABLE_LIST *tables, uint counter, uint flags, bool *need_reopen); -bool decide_logging_format(THD *thd, TABLE_LIST *tables); TABLE *open_temporary_table(THD *thd, const char *path, const char *db, const char *table_name, bool link_in_list); bool rm_temporary_table(handlerton *base, char *path); @@ -2018,6 +2018,7 @@ extern my_bool opt_sql_bin_update, opt_safe_user_create, opt_no_mix_types; extern my_bool opt_safe_show_db, opt_local_infile, opt_myisam_use_mmap; extern my_bool opt_slave_compressed_protocol, use_temp_pool; extern uint slave_exec_mode_options; +extern ulonglong slave_type_conversions_options; extern my_bool opt_readonly, lower_case_file_system; extern my_bool opt_enable_named_pipe, opt_sync_frm, opt_allow_suspicious_udfs; extern my_bool opt_secure_auth; diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 64975875da7..5bb526795df 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -454,6 +454,7 @@ ulong query_buff_size, slow_launch_time, slave_open_temp_tables; ulong open_files_limit, max_binlog_size, max_relay_log_size; ulong slave_net_timeout, slave_trans_retries; uint slave_exec_mode_options; +ulonglong slave_type_conversions_options; ulong thread_cache_size=0, thread_pool_size= 0; ulong binlog_cache_size=0; ulonglong max_binlog_cache_size=0; @@ -3338,6 +3339,16 @@ static int init_common_variables() strmov(fn_ext(pidfile_name),".pid"); // Add proper extension /* + The default-storage-engine entry in my_long_options should have a + non-null default value. It was earlier intialized as + (longlong)"MyISAM" in my_long_options but this triggered a + compiler error in the Sun Studio 12 compiler. As a work-around we + set the def_value member to 0 in my_long_options and initialize it + to the correct value here. + */ + default_storage_engine="MyISAM"; + + /* Add server status variables to the dynamic list of status variables that is shown by SHOW STATUS. Later, in plugin_init, and mysql_install_plugin @@ -5958,9 +5969,12 @@ struct my_option my_long_options[]= {"default-collation", 0, "Set the default collation (deprecated option, use --collation-server instead).", (uchar**) &default_collation_name, (uchar**) &default_collation_name, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, + /* default-storage-engine should have "MyISAM" as def_value. Instead + of initializing it here it is done in init_common_variables() due + to a compiler bug in Sun Studio compiler. */ {"default-storage-engine", 0, "The default storage engine for new tables", (uchar**) &default_storage_engine, 0, 0, GET_STR, REQUIRED_ARG, - (longlong)"MyISAM", 0, 0, 0, 0, 0 }, + 0, 0, 0, 0, 0, 0 }, {"default-time-zone", 0, "Set the default time zone.", (uchar**) &default_tz_name, (uchar**) &default_tz_name, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, diff --git a/sql/repl_failsafe.cc b/sql/repl_failsafe.cc index e10288552a5..5bf87dea90e 100644 --- a/sql/repl_failsafe.cc +++ b/sql/repl_failsafe.cc @@ -180,11 +180,18 @@ int register_slave(THD* thd, uchar* packet, uint packet_length) get_object(p,si->host, "Failed to register slave: too long 'report-host'"); get_object(p,si->user, "Failed to register slave: too long 'report-user'"); get_object(p,si->password, "Failed to register slave; too long 'report-password'"); - /*6 is the total length of port and master_id*/ - if (p+6 != p_end) + if (p+10 > p_end) goto err; si->port= uint2korr(p); p += 2; + /* + We need to by pass the bytes used in the fake rpl_recovery_rank + variable. It was removed in patch for BUG#13963. But this would + make a server with that patch unable to connect to an old master. + See: BUG#49259 + */ + // si->rpl_recovery_rank= uint4korr(p); + p += 4; if (!(si->master_id= uint4korr(p))) si->master_id= server_id; si->thd= thd; diff --git a/sql/rpl_injector.cc b/sql/rpl_injector.cc index ab244024810..c8d561af1c2 100644 --- a/sql/rpl_injector.cc +++ b/sql/rpl_injector.cc @@ -37,8 +37,6 @@ injector::transaction::transaction(MYSQL_BIN_LOG *log, THD *thd) m_start_pos.m_file_pos= log_info.pos; trans_begin(m_thd); - - thd->set_current_stmt_binlog_row_based(); } injector::transaction::~transaction() diff --git a/sql/rpl_record.cc b/sql/rpl_record.cc index 8f3a41fbeef..7bf7b81aa3a 100644 --- a/sql/rpl_record.cc +++ b/sql/rpl_record.cc @@ -104,10 +104,10 @@ pack_row(TABLE *table, MY_BITMAP const* cols, #endif pack_ptr= field->pack(pack_ptr, field->ptr + offset, field->max_data_length(), TRUE); - DBUG_PRINT("debug", ("field: %s; pack_ptr: 0x%lx;" + DBUG_PRINT("debug", ("field: %s; real_type: %d, pack_ptr: 0x%lx;" " pack_ptr':0x%lx; bytes: %d", - field->field_name, (ulong) old_pack_ptr, - (ulong) pack_ptr, + field->field_name, field->real_type(), + (ulong) old_pack_ptr, (ulong) pack_ptr, (int) (pack_ptr - old_pack_ptr))); } @@ -150,13 +150,20 @@ pack_row(TABLE *table, MY_BITMAP const* cols, the various member functions of Field and subclasses expect to write. - The row is assumed to only consist of the fields for which the corresponding - bit in bitset @c cols is set; the other parts of the record are left alone. + The row is assumed to only consist of the fields for which the + corresponding bit in bitset @c cols is set; the other parts of the + record are left alone. At most @c colcnt columns are read: if the table is larger than that, the remaining fields are not filled in. - @param rli Relay log info + @note The relay log information can be NULL, which means that no + checking or comparison with the source table is done, simply + because it is not used. This feature is used by MySQL Backup to + unpack a row from from the backup image, but can be used for other + purposes as well. + + @param rli Relay log info, which can be NULL @param table Table to unpack into @param colcnt Number of columns to read from record @param row_data @@ -170,10 +177,8 @@ pack_row(TABLE *table, MY_BITMAP const* cols, @retval 0 No error - @retval ER_NO_DEFAULT_FOR_FIELD - Returned if one of the fields existing on the slave but not on the - master does not have a default value (and isn't nullable) - + @retval HA_ERR_GENERIC + A generic, internal, error caused the unpacking to fail. */ #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) int @@ -185,6 +190,7 @@ unpack_row(Relay_log_info const *rli, { DBUG_ENTER("unpack_row"); DBUG_ASSERT(row_data); + DBUG_ASSERT(table); size_t const master_null_byte_count= (bitmap_bits_set(cols) + 7) / 8; int error= 0; @@ -202,10 +208,37 @@ unpack_row(Relay_log_info const *rli, // The "current" null bits unsigned int null_bits= *null_ptr++; uint i= 0; - table_def *tabledef= ((Relay_log_info*)rli)->get_tabledef(table); + table_def *tabledef= NULL; + TABLE *conv_table= NULL; + bool table_found= rli && rli->get_table_data(table, &tabledef, &conv_table); + DBUG_PRINT("debug", ("Table data: table_found: %d, tabldef: %p, conv_table: %p", + table_found, tabledef, conv_table)); + DBUG_ASSERT(table_found); + + /* + If rli is NULL it means that there is no source table and that the + row shall just be unpacked without doing any checks. This feature + is used by MySQL Backup, but can be used for other purposes as + well. + */ + if (rli && !table_found) + DBUG_RETURN(HA_ERR_GENERIC); + for (field_ptr= begin_ptr ; field_ptr < end_ptr && *field_ptr ; ++field_ptr) { - Field *const f= *field_ptr; + /* + If there is a conversion table, we pick up the field pointer to + the conversion table. If the conversion table or the field + pointer is NULL, no conversions are necessary. + */ + Field *conv_field= + conv_table ? conv_table->field[field_ptr - begin_ptr] : NULL; + Field *const f= + conv_field ? conv_field : *field_ptr; + DBUG_PRINT("debug", ("Conversion %srequired for field '%s' (#%ld)", + conv_field ? "" : "not ", + (*field_ptr)->field_name, field_ptr - begin_ptr)); + DBUG_ASSERT(f != NULL); /* No need to bother about columns that does not exist: they have @@ -275,6 +308,39 @@ unpack_row(Relay_log_info const *rli, (int) (pack_ptr - old_pack_ptr))); } + /* + If conv_field is set, then we are doing a conversion. In this + case, we have unpacked the master data to the conversion + table, so we need to copy the value stored in the conversion + table into the final table and do the conversion at the same time. + */ + if (conv_field) + { + Copy_field copy; +#ifndef DBUG_OFF + char source_buf[MAX_FIELD_WIDTH]; + char value_buf[MAX_FIELD_WIDTH]; + String source_type(source_buf, sizeof(source_buf), system_charset_info); + String value_string(value_buf, sizeof(value_buf), system_charset_info); + conv_field->sql_type(source_type); + conv_field->val_str(&value_string); + DBUG_PRINT("debug", ("Copying field '%s' of type '%s' with value '%s'", + (*field_ptr)->field_name, + source_type.c_ptr_safe(), value_string.c_ptr_safe())); +#endif + copy.set(*field_ptr, f, TRUE); + (*copy.do_copy)(©); +#ifndef DBUG_OFF + char target_buf[MAX_FIELD_WIDTH]; + String target_type(target_buf, sizeof(target_buf), system_charset_info); + (*field_ptr)->sql_type(target_type); + (*field_ptr)->val_str(&value_string); + DBUG_PRINT("debug", ("Value of field '%s' of type '%s' is now '%s'", + (*field_ptr)->field_name, + target_type.c_ptr_safe(), value_string.c_ptr_safe())); +#endif + } + null_mask <<= 1; } i++; diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 838782e1de8..b1ed75146a0 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -334,13 +334,21 @@ public: uint tables_to_lock_count; /* RBR: Count of tables to lock */ table_mapping m_table_map; /* RBR: Mapping table-id to table */ - inline table_def *get_tabledef(TABLE *tbl) + bool get_table_data(TABLE *table_arg, table_def **tabledef_var, TABLE **conv_table_var) const { - table_def *td= 0; - for (TABLE_LIST *ptr= tables_to_lock; ptr && !td; ptr= ptr->next_global) - if (ptr->table == tbl) - td= &((RPL_TABLE_LIST *)ptr)->m_tabledef; - return (td); + DBUG_ASSERT(tabledef_var && conv_table_var); + for (TABLE_LIST *ptr= tables_to_lock ; ptr != NULL ; ptr= ptr->next_global) + if (ptr->table == table_arg) + { + *tabledef_var= &static_cast<RPL_TABLE_LIST*>(ptr)->m_tabledef; + *conv_table_var= static_cast<RPL_TABLE_LIST*>(ptr)->m_conv_table; + DBUG_PRINT("debug", ("Fetching table data for table %s.%s:" + " tabledef: %p, conv_table: %p", + table_arg->s->db.str, table_arg->s->table_name.str, + *tabledef_var, *conv_table_var)); + return true; + } + return false; } /* diff --git a/sql/rpl_utility.cc b/sql/rpl_utility.cc index e34f8561051..feb35527b62 100644 --- a/sql/rpl_utility.cc +++ b/sql/rpl_utility.cc @@ -14,8 +14,176 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include "rpl_utility.h" + +#ifndef MYSQL_CLIENT #include "rpl_rli.h" +/** + Function to compare two size_t integers for their relative + order. Used below. + */ +int compare(size_t a, size_t b) +{ + if (a < b) + return -1; + if (b < a) + return 1; + return 0; +} + + +/** + Max value for an unsigned integer of 'bits' bits. + + The somewhat contorted expression is to avoid overflow. + */ +uint32 uint_max(int bits) { + return (((1UL << (bits - 1)) - 1) << 1) | 1; +} + + +/** + Compute the maximum display length of a field. + + @param sql_type Type of the field + @param metadata The metadata from the master for the field. + @return Maximum length of the field in bytes. + */ +static uint32 +max_display_length_for_field(enum_field_types sql_type, unsigned int metadata) +{ + DBUG_PRINT("debug", ("sql_type: %d, metadata: 0x%x", sql_type, metadata)); + DBUG_ASSERT(metadata >> 16 == 0); + + switch (sql_type) { + case MYSQL_TYPE_NEWDECIMAL: + return metadata >> 8; + + case MYSQL_TYPE_FLOAT: + return 12; + + case MYSQL_TYPE_DOUBLE: + return 22; + + case MYSQL_TYPE_SET: + case MYSQL_TYPE_ENUM: + return metadata & 0x00ff; + + case MYSQL_TYPE_STRING: + { + uchar type= metadata >> 8; + if (type == MYSQL_TYPE_SET || type == MYSQL_TYPE_ENUM) + return metadata & 0xff; + else + /* This is taken from Field_string::unpack. */ + return (((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0x00ff); + } + + case MYSQL_TYPE_YEAR: + case MYSQL_TYPE_TINY: + return 4; + + case MYSQL_TYPE_SHORT: + return 6; + + case MYSQL_TYPE_INT24: + return 9; + + case MYSQL_TYPE_LONG: + return 11; + +#ifdef HAVE_LONG_LONG + case MYSQL_TYPE_LONGLONG: + return 20; + +#endif + case MYSQL_TYPE_NULL: + return 0; + + case MYSQL_TYPE_NEWDATE: + return 3; + + case MYSQL_TYPE_DATE: + case MYSQL_TYPE_TIME: + return 3; + + case MYSQL_TYPE_TIMESTAMP: + return 4; + + case MYSQL_TYPE_DATETIME: + return 8; + + case MYSQL_TYPE_BIT: + /* + Decode the size of the bit field from the master. + */ + DBUG_ASSERT((metadata & 0xff) <= 7); + return 8 * (metadata >> 8U) + (metadata & 0x00ff); + + case MYSQL_TYPE_VAR_STRING: + case MYSQL_TYPE_VARCHAR: + return metadata; + + /* + The actual length for these types does not really matter since + they are used to calc_pack_length, which ignores the given + length for these types. + + Since we want this to be accurate for other uses, we return the + maximum size in bytes of these BLOBs. + */ + + case MYSQL_TYPE_TINY_BLOB: + return uint_max(1 * 8); + + case MYSQL_TYPE_MEDIUM_BLOB: + return uint_max(3 * 8); + + case MYSQL_TYPE_BLOB: + /* + For the blob type, Field::real_type() lies and say that all + blobs are of type MYSQL_TYPE_BLOB. In that case, we have to look + at the length instead to decide what the max display size is. + */ + return uint_max(metadata * 8); + + case MYSQL_TYPE_LONG_BLOB: + case MYSQL_TYPE_GEOMETRY: + return uint_max(4 * 8); + + default: + return ~(uint32) 0; + } +} + + +/* + Compare the pack lengths of a source field (on the master) and a + target field (on the slave). + + @param field Target field. + @param type Source field type. + @param metadata Source field metadata. + + @retval -1 The length of the source field is smaller than the target field. + @retval 0 The length of the source and target fields are the same. + @retval 1 The length of the source field is greater than the target field. + */ +int compare_lengths(Field *field, enum_field_types source_type, uint16 metadata) +{ + DBUG_ENTER("compare_lengths"); + size_t const source_length= + max_display_length_for_field(source_type, metadata); + size_t const target_length= field->max_display_length(); + DBUG_PRINT("debug", ("source_length: %lu, source_type: %u," + " target_length: %lu, target_type: %u", + (unsigned long) source_length, source_type, + (unsigned long) target_length, field->real_type())); + int result= compare(source_length, target_length); + DBUG_PRINT("result", ("%d", result)); + DBUG_RETURN(result); +} + /********************************************************************* * table_def member definitions * *********************************************************************/ @@ -169,58 +337,718 @@ uint32 table_def::calc_field_size(uint col, uchar *master_data) const return length; } -/* + +/** + */ +void show_sql_type(enum_field_types type, uint16 metadata, String *str) +{ + DBUG_ENTER("show_sql_type"); + DBUG_PRINT("enter", ("type: %d, metadata: 0x%x", type, metadata)); + + switch (type) + { + case MYSQL_TYPE_TINY: + str->set_ascii(STRING_WITH_LEN("tinyint")); + break; + + case MYSQL_TYPE_SHORT: + str->set_ascii(STRING_WITH_LEN("smallint")); + break; + + case MYSQL_TYPE_LONG: + str->set_ascii(STRING_WITH_LEN("int")); + break; + + case MYSQL_TYPE_FLOAT: + str->set_ascii(STRING_WITH_LEN("float")); + break; + + case MYSQL_TYPE_DOUBLE: + str->set_ascii(STRING_WITH_LEN("double")); + break; + + case MYSQL_TYPE_NULL: + str->set_ascii(STRING_WITH_LEN("null")); + break; + + case MYSQL_TYPE_TIMESTAMP: + str->set_ascii(STRING_WITH_LEN("timestamp")); + break; + + case MYSQL_TYPE_LONGLONG: + str->set_ascii(STRING_WITH_LEN("bigint")); + break; + + case MYSQL_TYPE_INT24: + str->set_ascii(STRING_WITH_LEN("mediumint")); + break; + + case MYSQL_TYPE_NEWDATE: + case MYSQL_TYPE_DATE: + str->set_ascii(STRING_WITH_LEN("date")); + break; + + case MYSQL_TYPE_TIME: + str->set_ascii(STRING_WITH_LEN("time")); + break; + + case MYSQL_TYPE_DATETIME: + str->set_ascii(STRING_WITH_LEN("datetime")); + break; + + case MYSQL_TYPE_YEAR: + str->set_ascii(STRING_WITH_LEN("year")); + break; + + case MYSQL_TYPE_VAR_STRING: + case MYSQL_TYPE_VARCHAR: + { + CHARSET_INFO *cs= str->charset(); + uint32 length= + cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(), + "varchar(%u)", metadata); + str->length(length); + } + break; + + case MYSQL_TYPE_BIT: + { + CHARSET_INFO *cs= str->charset(); + int bit_length= 8 * (metadata >> 8) + (metadata & 0xFF); + uint32 length= + cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(), + "bit(%d)", bit_length); + str->length(length); + } + break; + + case MYSQL_TYPE_DECIMAL: + { + CHARSET_INFO *cs= str->charset(); + uint32 length= + cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(), + "decimal(%d,?)", metadata); + str->length(length); + } + break; + + case MYSQL_TYPE_NEWDECIMAL: + { + CHARSET_INFO *cs= str->charset(); + uint32 length= + cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(), + "decimal(%d,%d)", metadata >> 8, metadata & 0xff); + str->length(length); + } + break; + + case MYSQL_TYPE_ENUM: + str->set_ascii(STRING_WITH_LEN("enum")); + break; + + case MYSQL_TYPE_SET: + str->set_ascii(STRING_WITH_LEN("set")); + break; + + case MYSQL_TYPE_BLOB: + /* + Field::real_type() lies regarding the actual type of a BLOB, so + it is necessary to check the pack length to figure out what kind + of blob it really is. + */ + switch (get_blob_type_from_length(metadata)) + { + case MYSQL_TYPE_TINY_BLOB: + str->set_ascii(STRING_WITH_LEN("tinyblob")); + break; + + case MYSQL_TYPE_MEDIUM_BLOB: + str->set_ascii(STRING_WITH_LEN("mediumblob")); + break; + + case MYSQL_TYPE_LONG_BLOB: + str->set_ascii(STRING_WITH_LEN("longblob")); + break; + + case MYSQL_TYPE_BLOB: + str->set_ascii(STRING_WITH_LEN("blob")); + break; + + default: + DBUG_ASSERT(0); + break; + } + break; + + case MYSQL_TYPE_STRING: + { + /* + This is taken from Field_string::unpack. + */ + CHARSET_INFO *cs= str->charset(); + uint bytes= (((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0x00ff); + uint32 length= + cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(), + "char(%d)", bytes / cs->mbmaxlen); + str->length(length); + } + break; + + case MYSQL_TYPE_GEOMETRY: + str->set_ascii(STRING_WITH_LEN("geometry")); + break; + + default: + str->set_ascii(STRING_WITH_LEN("<unknown type>")); + } + DBUG_VOID_RETURN; +} + + +/** + Check the order variable and print errors if the order is not + acceptable according to the current settings. + + @param order The computed order of the conversion needed. + @param rli The relay log info data structure: for error reporting. + */ +bool is_conversion_ok(int order, Relay_log_info *rli) +{ + DBUG_ENTER("is_conversion_ok"); + bool allow_non_lossy= + bit_is_set(slave_type_conversions_options, SLAVE_TYPE_CONVERSIONS_ALL_NON_LOSSY); + bool allow_lossy= + bit_is_set(slave_type_conversions_options, SLAVE_TYPE_CONVERSIONS_ALL_LOSSY); + + DBUG_PRINT("enter", ("order: %d, flags:%s%s", order, + allow_non_lossy ? " ALL_NON_LOSSY" : "", + allow_lossy ? " ALL_LOSSY" : "")); + if (order < 0 && !allow_non_lossy) + { + /* !!! Add error message saying that non-lossy conversions need to be allowed. */ + DBUG_RETURN(false); + } + + if (order > 0 && !allow_lossy) + { + /* !!! Add error message saying that lossy conversions need to be allowed. */ + DBUG_RETURN(false); + } + + DBUG_RETURN(true); +} + + +/** + Can a type potentially be converted to another type? + + This function check if the types are convertible and what + conversion is required. + + If conversion is not possible, and error is printed. + + If conversion is possible: + + - *order will be set to -1 if source type is smaller than target + type and a non-lossy conversion can be required. This includes + the case where the field types are different but types could + actually be converted in either direction. + + - *order will be set to 0 if no conversion is required. + + - *order will be set to 1 if the source type is strictly larger + than the target type and that conversion is potentially lossy. + + @param[in] field Target field + @param[in] type Source field type + @param[in] metadata Source field metadata + @param[in] rli Relay log info (for error reporting) + @param[in] mflags Flags from the table map event + @param[out] order Order between source field and target field + + @return @c true if conversion is possible according to the current + settings, @c false if conversion is not possible according to the + current setting. + */ +static bool +can_convert_field_to(Field *field, + enum_field_types source_type, uint16 metadata, + Relay_log_info *rli, uint16 mflags, + int *order_var) +{ + DBUG_ENTER("can_convert_field_to"); +#ifndef DBUG_OFF + char field_type_buf[MAX_FIELD_WIDTH]; + String field_type(field_type_buf, sizeof(field_type_buf), field->charset()); + field->sql_type(field_type); + DBUG_PRINT("enter", ("field_type: %s, target_type: %d, source_type: %d, source_metadata: 0x%x", + field_type.c_ptr_safe(), field->real_type(), source_type, metadata)); +#endif + /* + If the real type is the same, we need to check the metadata to + decide if conversions are allowed. + */ + if (field->real_type() == source_type) + { + if (metadata == 0) // Metadata can only be zero if no metadata was provided + { + /* + If there is no metadata, we either have an old event where no + metadata were supplied, or a type that does not require any + metadata. In either case, conversion can be done but no + conversion table is necessary. + */ + DBUG_PRINT("debug", ("Base types are identical, but there is no metadata")); + *order_var= 0; + DBUG_RETURN(true); + } + + DBUG_PRINT("debug", ("Base types are identical, doing field size comparison")); + if (field->compatible_field_size(metadata, rli, mflags, order_var)) + DBUG_RETURN(is_conversion_ok(*order_var, rli)); + else + DBUG_RETURN(false); + } + else if (!slave_type_conversions_options) + DBUG_RETURN(false); + + /* + Here, from and to will always be different. Since the types are + different, we cannot use the compatible_field_size() function, but + have to rely on hard-coded max-sizes for fields. + */ + + DBUG_PRINT("debug", ("Base types are different, checking conversion")); + switch (source_type) // Source type (on master) + { + case MYSQL_TYPE_DECIMAL: + case MYSQL_TYPE_NEWDECIMAL: + case MYSQL_TYPE_FLOAT: + case MYSQL_TYPE_DOUBLE: + switch (field->real_type()) + { + case MYSQL_TYPE_NEWDECIMAL: + /* + Then the other type is either FLOAT, DOUBLE, or old style + DECIMAL, so we require lossy conversion. + */ + *order_var= 1; + DBUG_RETURN(is_conversion_ok(*order_var, rli)); + + case MYSQL_TYPE_DECIMAL: + case MYSQL_TYPE_FLOAT: + case MYSQL_TYPE_DOUBLE: + { + if (source_type == MYSQL_TYPE_NEWDECIMAL || + source_type == MYSQL_TYPE_DECIMAL) + *order_var = 1; // Always require lossy conversions + else + *order_var= compare_lengths(field, source_type, metadata); + DBUG_ASSERT(*order_var != 0); + DBUG_RETURN(is_conversion_ok(*order_var, rli)); + } + + default: + DBUG_RETURN(false); + } + break; + + /* + The length comparison check will do the correct job of comparing + the field lengths (in bytes) of two integer types. + */ + case MYSQL_TYPE_TINY: + case MYSQL_TYPE_SHORT: + case MYSQL_TYPE_INT24: + case MYSQL_TYPE_LONG: + case MYSQL_TYPE_LONGLONG: + switch (field->real_type()) + { + case MYSQL_TYPE_TINY: + case MYSQL_TYPE_SHORT: + case MYSQL_TYPE_INT24: + case MYSQL_TYPE_LONG: + case MYSQL_TYPE_LONGLONG: + *order_var= compare_lengths(field, source_type, metadata); + DBUG_ASSERT(*order_var != 0); + DBUG_RETURN(is_conversion_ok(*order_var, rli)); + + default: + DBUG_RETURN(false); + } + break; + + /* + Since source and target type is different, and it is not possible + to convert bit types to anything else, this will return false. + */ + case MYSQL_TYPE_BIT: + DBUG_RETURN(false); + + /* + If all conversions are disabled, it is not allowed to convert + between these types. Since the TEXT vs. BINARY is distinguished by + the charset, and the charset is not replicated, we cannot + currently distinguish between , e.g., TEXT and BLOB. + */ + case MYSQL_TYPE_TINY_BLOB: + case MYSQL_TYPE_MEDIUM_BLOB: + case MYSQL_TYPE_LONG_BLOB: + case MYSQL_TYPE_BLOB: + case MYSQL_TYPE_STRING: + case MYSQL_TYPE_VAR_STRING: + case MYSQL_TYPE_VARCHAR: + switch (field->real_type()) + { + case MYSQL_TYPE_TINY_BLOB: + case MYSQL_TYPE_MEDIUM_BLOB: + case MYSQL_TYPE_LONG_BLOB: + case MYSQL_TYPE_BLOB: + case MYSQL_TYPE_STRING: + case MYSQL_TYPE_VAR_STRING: + case MYSQL_TYPE_VARCHAR: + *order_var= compare_lengths(field, source_type, metadata); + /* + Here we know that the types are different, so if the order + gives that they do not require any conversion, we still need + to have non-lossy conversion enabled to allow conversion + between different (string) types of the same length. + */ + if (*order_var == 0) + *order_var= -1; + DBUG_RETURN(is_conversion_ok(*order_var, rli)); + + default: + DBUG_RETURN(false); + } + break; + + case MYSQL_TYPE_GEOMETRY: + case MYSQL_TYPE_TIMESTAMP: + case MYSQL_TYPE_DATE: + case MYSQL_TYPE_TIME: + case MYSQL_TYPE_DATETIME: + case MYSQL_TYPE_YEAR: + case MYSQL_TYPE_NEWDATE: + case MYSQL_TYPE_NULL: + case MYSQL_TYPE_ENUM: + case MYSQL_TYPE_SET: + DBUG_RETURN(false); + } + DBUG_RETURN(false); // To keep GCC happy +} + + +/** Is the definition compatible with a table? + This function will compare the master table with an existing table + on the slave and see if they are compatible with respect to the + current settings of @c SLAVE_TYPE_CONVERSIONS. + + If the tables are compatible and conversions are required, @c + *tmp_table_var will be set to a virtual temporary table with field + pointers for the fields that require conversions. This allow simple + checking of whether a conversion are to be applied or not. + + If tables are compatible, but no conversions are necessary, @c + *tmp_table_var will be set to NULL. + + @param rli_arg[in] + Relay log info, for error reporting. + + @param table[in] + Table to compare with + + @param tmp_table_var[out] + Virtual temporary table for performing conversions, if necessary. + + @retval true Master table is compatible with slave table. + @retval false Master table is not compatible with slave table. */ -int -table_def::compatible_with(Relay_log_info const *rli_arg, TABLE *table) +bool +table_def::compatible_with(THD *thd, Relay_log_info *rli, + TABLE *table, TABLE **conv_table_var) const { /* We only check the initial columns for the tables. */ uint const cols_to_check= min(table->s->fields, size()); - int error= 0; - Relay_log_info const *rli= const_cast<Relay_log_info*>(rli_arg); - - TABLE_SHARE const *const tsh= table->s; + TABLE *tmp_table= NULL; for (uint col= 0 ; col < cols_to_check ; ++col) { Field *const field= table->field[col]; - if (field->type() != type(col)) - { - DBUG_ASSERT(col < size() && col < tsh->fields); - DBUG_ASSERT(tsh->db.str && tsh->table_name.str); - error= 1; - char buf[256]; - my_snprintf(buf, sizeof(buf), "Column %d type mismatch - " - "received type %d, %s.%s has type %d", - col, type(col), tsh->db.str, tsh->table_name.str, - field->type()); - rli->report(ERROR_LEVEL, ER_BINLOG_ROW_WRONG_TABLE_DEF, - ER(ER_BINLOG_ROW_WRONG_TABLE_DEF), buf); + int order; + if (can_convert_field_to(field, type(col), field_metadata(col), rli, m_flags, &order)) + { + DBUG_PRINT("debug", ("Checking column %d -" + " field '%s' can be converted - order: %d", + col, field->field_name, order)); + DBUG_ASSERT(order >= -1 && order <= 1); + + /* + If order is not 0, a conversion is required, so we need to set + up the conversion table. + */ + if (order != 0 && tmp_table == NULL) + { + /* + This will create the full table with all fields. This is + necessary to ge the correct field lengths for the record. + */ + tmp_table= create_conversion_table(thd, rli, table); + if (tmp_table == NULL) + return false; + /* + Clear all fields up to, but not including, this column. + */ + for (unsigned int i= 0; i < col; ++i) + tmp_table->field[i]= NULL; + } + + if (order == 0 && tmp_table != NULL) + tmp_table->field[col]= NULL; } - /* - Check the slave's field size against that of the master. - */ - if (!error && - !field->compatible_field_size(field_metadata(col), rli_arg)) + else + { + DBUG_PRINT("debug", ("Checking column %d -" + " field '%s' can not be converted", + col, field->field_name)); + DBUG_ASSERT(col < size() && col < table->s->fields); + DBUG_ASSERT(table->s->db.str && table->s->table_name.str); + const char *db_name= table->s->db.str; + const char *tbl_name= table->s->table_name.str; + char source_buf[MAX_FIELD_WIDTH]; + char target_buf[MAX_FIELD_WIDTH]; + String source_type(source_buf, sizeof(source_buf), field->charset()); + String target_type(target_buf, sizeof(target_buf), field->charset()); + show_sql_type(type(col), field_metadata(col), &source_type); + field->sql_type(target_type); + rli->report(ERROR_LEVEL, ER_SLAVE_CONVERSION_FAILED, + ER(ER_SLAVE_CONVERSION_FAILED), + col, db_name, tbl_name, + source_type.c_ptr_safe(), target_type.c_ptr_safe()); + return false; + } + } + +#ifndef DBUG_OFF + if (tmp_table) + { + for (unsigned int col= 0; col < tmp_table->s->fields; ++col) + if (tmp_table->field[col]) + { + char source_buf[MAX_FIELD_WIDTH]; + char target_buf[MAX_FIELD_WIDTH]; + String source_type(source_buf, sizeof(source_buf), table->field[col]->charset()); + String target_type(target_buf, sizeof(target_buf), table->field[col]->charset()); + tmp_table->field[col]->sql_type(source_type); + table->field[col]->sql_type(target_type); + DBUG_PRINT("debug", ("Field %s - conversion required." + " Source type: '%s', Target type: '%s'", + tmp_table->field[col]->field_name, + source_type.c_ptr_safe(), target_type.c_ptr_safe())); + } + } +#endif + + *conv_table_var= tmp_table; + return true; +} + +/** + Create a conversion table. + + If the function is unable to create the conversion table, an error + will be printed and NULL will be returned. + + @return Pointer to conversion table, or NULL if unable to create + conversion table. + */ + +TABLE *table_def::create_conversion_table(THD *thd, Relay_log_info *rli, TABLE *target_table) const +{ + DBUG_ENTER("table_def::create_conversion_table"); + + List<Create_field> field_list; + + for (uint col= 0 ; col < size() ; ++col) + { + Create_field *field_def= + (Create_field*) alloc_root(thd->mem_root, sizeof(Create_field)); + if (field_list.push_back(field_def)) + DBUG_RETURN(NULL); + + uint decimals= 0; + TYPELIB* interval= NULL; + uint pack_length= 0; + uint32 max_length= + max_display_length_for_field(type(col), field_metadata(col)); + + switch(type(col)) + { + int precision; + case MYSQL_TYPE_ENUM: + case MYSQL_TYPE_SET: + interval= static_cast<Field_enum*>(target_table->field[col])->typelib; + pack_length= field_metadata(col) & 0x00ff; + break; + + case MYSQL_TYPE_NEWDECIMAL: + /* + The display length of a DECIMAL type is not the same as the + length that should be supplied to make_field, so we correct + the length here. + */ + precision= field_metadata(col) >> 8; + decimals= field_metadata(col) & 0x00ff; + max_length= + my_decimal_precision_to_length(precision, decimals, FALSE); + break; + + case MYSQL_TYPE_DECIMAL: + precision= field_metadata(col); + decimals= static_cast<Field_num*>(target_table->field[col])->dec; + max_length= field_metadata(col); + break; + + case MYSQL_TYPE_TINY_BLOB: + case MYSQL_TYPE_MEDIUM_BLOB: + case MYSQL_TYPE_LONG_BLOB: + case MYSQL_TYPE_BLOB: + case MYSQL_TYPE_GEOMETRY: + pack_length= field_metadata(col) & 0x00ff; + break; + + default: + break; + } + + DBUG_PRINT("debug", ("sql_type: %d, target_field: '%s', max_length: %d, decimals: %d," + " maybe_null: %d, unsigned_flag: %d, pack_length: %u", + type(col), target_table->field[col]->field_name, + max_length, decimals, TRUE, FALSE, pack_length)); + field_def->init_for_tmp_table(type(col), + max_length, + decimals, + TRUE, // maybe_null + FALSE, // unsigned_flag + pack_length); + field_def->charset= target_table->field[col]->charset(); + field_def->interval= interval; + } + + TABLE *conv_table= create_virtual_tmp_table(thd, field_list); + if (conv_table == NULL) + rli->report(ERROR_LEVEL, ER_SLAVE_CANT_CREATE_CONVERSION, + ER(ER_SLAVE_CANT_CREATE_CONVERSION), + target_table->s->db.str, + target_table->s->table_name.str); + DBUG_RETURN(conv_table); +} + +#endif /* MYSQL_CLIENT */ + +table_def::table_def(unsigned char *types, ulong size, + uchar *field_metadata, int metadata_size, + uchar *null_bitmap, uint16 flags) + : m_size(size), m_type(0), m_field_metadata_size(metadata_size), + m_field_metadata(0), m_null_bits(0), m_flags(flags), + m_memory(NULL) +{ + m_memory= (uchar *)my_multi_malloc(MYF(MY_WME), + &m_type, size, + &m_field_metadata, + size * sizeof(uint16), + &m_null_bits, (size + 7) / 8, + NULL); + + bzero(m_field_metadata, size * sizeof(uint16)); + + if (m_type) + memcpy(m_type, types, size); + else + m_size= 0; + /* + Extract the data from the table map into the field metadata array + iff there is field metadata. The variable metadata_size will be + 0 if we are replicating from an older version server since no field + metadata was written to the table map. This can also happen if + there were no fields in the master that needed extra metadata. + */ + if (m_size && metadata_size) + { + int index= 0; + for (unsigned int i= 0; i < m_size; i++) { - error= 1; - char buf[256]; - my_snprintf(buf, sizeof(buf), "Column %d size mismatch - " - "master has size %d, %s.%s on slave has size %d." - " Master's column size should be <= the slave's " - "column size.", col, - field->pack_length_from_metadata(m_field_metadata[col]), - tsh->db.str, tsh->table_name.str, - field->row_pack_length()); - rli->report(ERROR_LEVEL, ER_BINLOG_ROW_WRONG_TABLE_DEF, - ER(ER_BINLOG_ROW_WRONG_TABLE_DEF), buf); + switch (m_type[i]) { + case MYSQL_TYPE_TINY_BLOB: + case MYSQL_TYPE_BLOB: + case MYSQL_TYPE_MEDIUM_BLOB: + case MYSQL_TYPE_LONG_BLOB: + case MYSQL_TYPE_DOUBLE: + case MYSQL_TYPE_FLOAT: + case MYSQL_TYPE_GEOMETRY: + { + /* + These types store a single byte. + */ + m_field_metadata[i]= field_metadata[index]; + index++; + break; + } + case MYSQL_TYPE_SET: + case MYSQL_TYPE_ENUM: + case MYSQL_TYPE_STRING: + { + uint16 x= field_metadata[index++] << 8U; // real_type + x+= field_metadata[index++]; // pack or field length + m_field_metadata[i]= x; + break; + } + case MYSQL_TYPE_BIT: + { + uint16 x= field_metadata[index++]; + x = x + (field_metadata[index++] << 8U); + m_field_metadata[i]= x; + break; + } + case MYSQL_TYPE_VARCHAR: + { + /* + These types store two bytes. + */ + char *ptr= (char *)&field_metadata[index]; + m_field_metadata[i]= uint2korr(ptr); + index= index + 2; + break; + } + case MYSQL_TYPE_NEWDECIMAL: + { + uint16 x= field_metadata[index++] << 8U; // precision + x+= field_metadata[index++]; // decimals + m_field_metadata[i]= x; + break; + } + default: + m_field_metadata[i]= 0; + break; + } } } + if (m_size && null_bitmap) + memcpy(m_null_bits, null_bitmap, (m_size + 7) / 8); +} + - return error; +table_def::~table_def() +{ + my_free(m_memory, MYF(0)); +#ifndef DBUG_OFF + m_type= 0; + m_size= 0; +#endif } + diff --git a/sql/rpl_utility.h b/sql/rpl_utility.h index 1f4ca246ff1..4b9bf3be93f 100644 --- a/sql/rpl_utility.h +++ b/sql/rpl_utility.h @@ -21,6 +21,7 @@ #endif #include "mysql_priv.h" +#include "mysql_com.h" class Relay_log_info; @@ -32,127 +33,24 @@ class Relay_log_info; - Extract and decode table definition data from the table map event - Check if table definition in table map is compatible with table definition on slave - - Currently, the only field type data available is an array of the - type operators that are present in the table map event. - - @todo Add type operands to this structure to allow detection of - difference between, e.g., BIT(5) and BIT(10). */ class table_def { public: /** - Convenience declaration of the type of the field type data in a - table map event. - */ - typedef unsigned char field_type; - - /** Constructor. - @param types Array of types + @param types Array of types, each stored as a byte @param size Number of elements in array 'types' @param field_metadata Array of extra information about fields @param metadata_size Size of the field_metadata array @param null_bitmap The bitmap of fields that can be null */ - table_def(field_type *types, ulong size, uchar *field_metadata, - int metadata_size, uchar *null_bitmap) - : m_size(size), m_type(0), m_field_metadata_size(metadata_size), - m_field_metadata(0), m_null_bits(0), m_memory(NULL) - { - m_memory= (uchar *)my_multi_malloc(MYF(MY_WME), - &m_type, size, - &m_field_metadata, - size * sizeof(uint16), - &m_null_bits, (size + 7) / 8, - NULL); - - bzero(m_field_metadata, size * sizeof(uint16)); - - if (m_type) - memcpy(m_type, types, size); - else - m_size= 0; - /* - Extract the data from the table map into the field metadata array - iff there is field metadata. The variable metadata_size will be - 0 if we are replicating from an older version server since no field - metadata was written to the table map. This can also happen if - there were no fields in the master that needed extra metadata. - */ - if (m_size && metadata_size) - { - int index= 0; - for (unsigned int i= 0; i < m_size; i++) - { - switch (m_type[i]) { - case MYSQL_TYPE_TINY_BLOB: - case MYSQL_TYPE_BLOB: - case MYSQL_TYPE_MEDIUM_BLOB: - case MYSQL_TYPE_LONG_BLOB: - case MYSQL_TYPE_DOUBLE: - case MYSQL_TYPE_FLOAT: - { - /* - These types store a single byte. - */ - m_field_metadata[i]= field_metadata[index]; - index++; - break; - } - case MYSQL_TYPE_SET: - case MYSQL_TYPE_ENUM: - case MYSQL_TYPE_STRING: - { - uint16 x= field_metadata[index++] << 8U; // real_type - x+= field_metadata[index++]; // pack or field length - m_field_metadata[i]= x; - break; - } - case MYSQL_TYPE_BIT: - { - uint16 x= field_metadata[index++]; - x = x + (field_metadata[index++] << 8U); - m_field_metadata[i]= x; - break; - } - case MYSQL_TYPE_VARCHAR: - { - /* - These types store two bytes. - */ - char *ptr= (char *)&field_metadata[index]; - m_field_metadata[i]= uint2korr(ptr); - index= index + 2; - break; - } - case MYSQL_TYPE_NEWDECIMAL: - { - uint16 x= field_metadata[index++] << 8U; // precision - x+= field_metadata[index++]; // decimals - m_field_metadata[i]= x; - break; - } - default: - m_field_metadata[i]= 0; - break; - } - } - } - if (m_size && null_bitmap) - memcpy(m_null_bits, null_bitmap, (m_size + 7) / 8); - } + table_def(unsigned char *types, ulong size, uchar *field_metadata, + int metadata_size, uchar *null_bitmap, uint16 flags); - ~table_def() { - my_free(m_memory, MYF(0)); -#ifndef DBUG_OFF - m_type= 0; - m_size= 0; -#endif - } + ~table_def(); /** Return the number of fields there is type data for. @@ -171,10 +69,40 @@ public: <code>index</code>. Currently, only the type identifier is returned. */ - field_type type(ulong index) const + enum_field_types type(ulong index) const { DBUG_ASSERT(index < m_size); - return m_type[index]; + /* + If the source type is MYSQL_TYPE_STRING, it can in reality be + either MYSQL_TYPE_STRING, MYSQL_TYPE_ENUM, or MYSQL_TYPE_SET, so + we might need to modify the type to get the real type. + */ + enum_field_types source_type= static_cast<enum_field_types>(m_type[index]); + uint16 source_metadata= m_field_metadata[index]; + switch (source_type) + { + case MYSQL_TYPE_STRING: + { + int real_type= source_metadata >> 8; + if (real_type == MYSQL_TYPE_ENUM || real_type == MYSQL_TYPE_SET) + source_type= static_cast<enum_field_types>(real_type); + break; + } + + /* + This type has not been used since before row-based replication, + so we can safely assume that it really is MYSQL_TYPE_NEWDATE. + */ + case MYSQL_TYPE_DATE: + source_type= MYSQL_TYPE_NEWDATE; + break; + + default: + /* Do nothing */ + break; + } + + return source_type; } @@ -226,26 +154,62 @@ public: with it. A table definition is compatible with a table if: - - the columns types of the table definition is a (not - necessarily proper) prefix of the column type of the table, or - - the other way around + - The columns types of the table definition is a (not + necessarily proper) prefix of the column type of the table. + + - The other way around. + - Each column on the master that also exists on the slave can be + converted according to the current settings of @c + SLAVE_TYPE_CONVERSIONS. + + @param thd @param rli Pointer to relay log info @param table Pointer to table to compare with. + @param[out] tmp_table_var Pointer to temporary table for holding + conversion table. + @retval 1 if the table definition is not compatible with @c table @retval 0 if the table definition is compatible with @c table */ #ifndef MYSQL_CLIENT - int compatible_with(Relay_log_info const *rli, TABLE *table) const; + bool compatible_with(THD *thd, Relay_log_info *rli, TABLE *table, + TABLE **conv_table_var) const; + + /** + Create a virtual in-memory temporary table structure. + + The table structure has records and field array so that a row can + be unpacked into the record for further processing. + + In the virtual table, each field that requires conversion will + have a non-NULL value, while fields that do not require + conversion will have a NULL value. + + Some information that is missing in the events, such as the + character set for string types, are taken from the table that the + field is going to be pushed into, so the target table that the data + eventually need to be pushed into need to be supplied. + + @param thd Thread to allocate memory from. + @param rli Relay log info structure, for error reporting. + @param target_table Target table for fields. + + @return A pointer to a temporary table with memory allocated in the + thread's memroot, NULL if the table could not be created + */ + TABLE *create_conversion_table(THD *thd, Relay_log_info *rli, TABLE *target_table) const; #endif + private: ulong m_size; // Number of elements in the types array - field_type *m_type; // Array of type descriptors + unsigned char *m_type; // Array of type descriptors uint m_field_metadata_size; uint16 *m_field_metadata; uchar *m_null_bits; + uint16 m_flags; // Table flags uchar *m_memory; }; @@ -260,6 +224,7 @@ struct RPL_TABLE_LIST { bool m_tabledef_valid; table_def m_tabledef; + TABLE *m_conv_table; }; diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt index f40c0e9bfe1..21f803b2caa 100644 --- a/sql/share/errmsg-utf8.txt +++ b/sql/share/errmsg-utf8.txt @@ -6070,8 +6070,7 @@ ER_SLAVE_INCIDENT ER_NO_PARTITION_FOR_GIVEN_VALUE_SILENT eng "Table has no partition for some existing values" ER_BINLOG_UNSAFE_STATEMENT - eng "Statement may not be safe to log in statement format." - swe "Detta är inte säkert att logga i statement-format." + eng "Unsafe statement binlogged in statement format since BINLOG_FORMAT = STATEMENT. Reason for unsafeness: %s" ER_SLAVE_FATAL_ERROR eng "Fatal error: %s" ER_SLAVE_RELAY_LOG_READ_FAILURE @@ -6262,6 +6261,47 @@ ER_FIELD_TYPE_NOT_ALLOWED_AS_PARTITION_FIELD eng "Field '%-.192s' is of a not allowed type for this type of partitioning" ER_PARTITION_FIELDS_TOO_LONG eng "The total length of the partitioning fields is too large" +ER_BINLOG_ROW_ENGINE_AND_STMT_ENGINE + eng "Cannot execute statement: binlogging impossible since both row-incapable engines and statement-incapable engines are involved." +ER_BINLOG_ROW_MODE_AND_STMT_ENGINE + eng "Cannot execute statement: binlogging impossible since BINLOG_FORMAT = ROW and at least one table uses a storage engine limited to statement-logging." +ER_BINLOG_UNSAFE_AND_STMT_ENGINE + eng "Cannot execute statement: binlogging of unsafe statement is impossible when storage engine is limited to statement-logging and BINLOG_FORMAT = MIXED. Reason for unsafeness: %s" +ER_BINLOG_ROW_INJECTION_AND_STMT_ENGINE + eng "Cannot execute row injection: binlogging impossible since at least one table uses a storage engine limited to statement-logging." +ER_BINLOG_STMT_MODE_AND_ROW_ENGINE + eng "Cannot execute statement: binlogging impossible since BINLOG_FORMAT = STATEMENT and at least one table uses a storage engine limited to row-logging.%s" +ER_BINLOG_ROW_INJECTION_AND_STMT_MODE + eng "Cannot execute row injection: binlogging impossible since BINLOG_FORMAT = STATEMENT." +ER_BINLOG_MULTIPLE_ENGINES_AND_SELF_LOGGING_ENGINE + eng "Cannot execute statement: binlogging impossible since more than one engine is involved and at least one engine is self-logging." + +ER_BINLOG_UNSAFE_LIMIT + eng "Statement uses a LIMIT clause. This is unsafe because the set of rows included cannot be predicted." +ER_BINLOG_UNSAFE_INSERT_DELAYED + eng "Statement uses INSERT DELAYED. This is unsafe because the time when rows are inserted cannot be predicted." +ER_BINLOG_UNSAFE_SYSTEM_TABLE + eng "Statement uses the general_log, slow_log or performance_schema table(s). This is unsafe because system tables may differ on slave." +ER_BINLOG_UNSAFE_TWO_AUTOINC_COLUMNS + eng "Statement updates two AUTO_INCREMENT columns. This is unsafe because the generated value cannot be predicted by slave." +ER_BINLOG_UNSAFE_UDF + eng "Statement uses a UDF. It cannot be determined if the UDF will return the same value on slave." +ER_BINLOG_UNSAFE_SYSTEM_VARIABLE + eng "Statement uses a system variable whose value may differ on slave." +ER_BINLOG_UNSAFE_SYSTEM_FUNCTION + eng "Statement uses a system function whose value may differ on slave." +ER_BINLOG_UNSAFE_NONTRANS_AFTER_TRANS + eng "Non-transactional reads or writes are unsafe if they occur after transactional reads or writes inside a transaction." + +ER_MESSAGE_AND_STATEMENT + eng "%s Statement: %s" + +ER_SLAVE_CONVERSION_FAILED + eng "Column %d of table '%-.192s.%-.192s' cannot be converted from type '%-.32s' to type '%-.32s'" +ER_SLAVE_CANT_CREATE_CONVERSION + eng "Can't create conversion table for table '%-.192s.%-.192s'" +ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_BINLOG_FORMAT + eng "Cannot modify @@session.binlog_format inside a transaction" ER_PATH_LENGTH eng "The path specified for %.64s is too long." ER_WARN_DEPRECATED_SYNTAX_NO_REPLACEMENT diff --git a/sql/share/errmsg.txt b/sql/share/errmsg.txt index 97f4d71cd50..a6c6485f076 100644 --- a/sql/share/errmsg.txt +++ b/sql/share/errmsg.txt @@ -6071,8 +6071,7 @@ ER_SLAVE_INCIDENT ER_NO_PARTITION_FOR_GIVEN_VALUE_SILENT eng "Table has no partition for some existing values" ER_BINLOG_UNSAFE_STATEMENT - eng "Statement may not be safe to log in statement format." - swe "Detta är inte säkert att logga i statement-format." + eng "Unsafe statement binlogged in statement format since BINLOG_FORMAT = STATEMENT. Reason for unsafeness: %s" ER_SLAVE_FATAL_ERROR eng "Fatal error: %s" ER_SLAVE_RELAY_LOG_READ_FAILURE @@ -6202,6 +6201,40 @@ ER_TOO_MANY_CONCURRENT_TRXS WARN_NON_ASCII_SEPARATOR_NOT_IMPLEMENTED eng "Non-ASCII separator arguments are not fully supported" +ER_BINLOG_ROW_ENGINE_AND_STMT_ENGINE + eng "Cannot execute statement: binlogging impossible since both row-incapable engines and statement-incapable engines are involved." +ER_BINLOG_ROW_MODE_AND_STMT_ENGINE + eng "Cannot execute statement: binlogging impossible since BINLOG_FORMAT = ROW and at least one table uses a storage engine limited to statement-logging." +ER_BINLOG_UNSAFE_AND_STMT_ENGINE + eng "Cannot execute statement: binlogging of unsafe statement is impossible when storage engine is limited to statement-logging and BINLOG_FORMAT = MIXED. Reason for unsafeness: %s" +ER_BINLOG_ROW_INJECTION_AND_STMT_ENGINE + eng "Cannot execute row injection: binlogging impossible since at least one table uses a storage engine limited to statement-logging." +ER_BINLOG_STMT_MODE_AND_ROW_ENGINE + eng "Cannot execute statement: binlogging impossible since BINLOG_FORMAT = STATEMENT and at least one table uses a storage engine limited to row-logging.%s" +ER_BINLOG_ROW_INJECTION_AND_STMT_MODE + eng "Cannot execute row injection: binlogging impossible since BINLOG_FORMAT = STATEMENT." +ER_BINLOG_MULTIPLE_ENGINES_AND_SELF_LOGGING_ENGINE + eng "Cannot execute statement: binlogging impossible since more than one engine is involved and at least one engine is self-logging." +ER_BINLOG_UNSAFE_LIMIT + eng "Statement uses a LIMIT clause. This is unsafe because the set of rows included cannot be predicted." +ER_BINLOG_UNSAFE_INSERT_DELAYED + eng "Statement uses INSERT DELAYED. This is unsafe because the time when rows are inserted cannot be predicted." +ER_BINLOG_UNSAFE_SYSTEM_TABLE + eng "Statement uses the general_log, slow_log or performance_schema table(s). This is unsafe because system tables may differ on slave." +ER_BINLOG_UNSAFE_TWO_AUTOINC_COLUMNS + eng "Statement updates two AUTO_INCREMENT columns. This is unsafe because the generated value cannot be predicted by slave." +ER_BINLOG_UNSAFE_UDF + eng "Statement uses a UDF. It cannot be determined if the UDF will return the same value on slave." +ER_BINLOG_UNSAFE_SYSTEM_VARIABLE + eng "Statement uses a system variable whose value may differ on slave." +ER_BINLOG_UNSAFE_SYSTEM_FUNCTION + eng "Statement uses a system function whose value may differ on slave." +ER_BINLOG_UNSAFE_NONTRANS_AFTER_TRANS + eng "Non-transactional reads or writes are unsafe if they occur after transactional reads or writes inside a transaction." + +ER_MESSAGE_AND_STATEMENT + eng "%s Statement: %s" + ER_DEBUG_SYNC_TIMEOUT eng "debug sync point wait timed out" ger "Debug Sync Point Wartezeit überschritten" @@ -6262,6 +6295,12 @@ ER_FIELD_TYPE_NOT_ALLOWED_AS_PARTITION_FIELD eng "Field '%-.192s' is of a not allowed type for this type of partitioning" ER_PARTITION_FIELDS_TOO_LONG eng "The total length of the partitioning fields is too large" +ER_SLAVE_CONVERSION_FAILED + eng "Column %d of table '%-.192s.%-.192s' cannot be converted from type '%-.32s' to type '%-.32s'" +ER_SLAVE_CANT_CREATE_CONVERSION + eng "Can't create conversion table for table '%-.192s.%-.192s'" +ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_BINLOG_FORMAT + eng "Cannot modify @@session.binlog_format inside a transaction" ER_PATH_LENGTH eng "The path specified for %.64s is too long." ER_WARN_DEPRECATED_SYNTAX_NO_REPLACEMENT diff --git a/sql/slave.cc b/sql/slave.cc index 7dca55f9181..69b7d8c4553 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -493,6 +493,7 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) DBUG_RETURN(0); /* successfully do nothing */ int error,force_all = (thread_mask & SLAVE_FORCE_ALL); mysql_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock; + mysql_mutex_t *log_lock= mi->rli.relay_log.get_log_lock(); if (thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)) { @@ -504,6 +505,22 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) skip_lock)) && !force_all) DBUG_RETURN(error); + + mysql_mutex_lock(log_lock); + + DBUG_PRINT("info",("Flushing relay log and master info file.")); + if (current_thd) + thd_proc_info(current_thd, "Flushing relay log and master info files."); + if (flush_master_info(mi, TRUE /* flush relay log */)) + DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS); + + if (my_sync(mi->rli.relay_log.get_log_file()->file, MYF(MY_WME))) + DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS); + + if (my_sync(mi->fd, MYF(MY_WME))) + DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS); + + mysql_mutex_unlock(log_lock); } if (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) { @@ -515,8 +532,21 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) skip_lock)) && !force_all) DBUG_RETURN(error); + + mysql_mutex_lock(log_lock); + + DBUG_PRINT("info",("Flushing relay-log info file.")); + if (current_thd) + thd_proc_info(current_thd, "Flushing relay-log info file."); + if (flush_relay_log_info(&mi->rli)) + DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS); + + if (my_sync(mi->rli.info_fd, MYF(MY_WME))) + DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS); + + mysql_mutex_unlock(log_lock); } - DBUG_RETURN(0); + DBUG_RETURN(0); } @@ -1634,6 +1664,12 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi, pos= net_store_data(pos, (uchar*) report_user, report_user_len); pos= net_store_data(pos, (uchar*) report_password, report_password_len); int2store(pos, (uint16) report_port); pos+= 2; + /* + Fake rpl_recovery_rank, which was removed in BUG#13963, + so that this server can register itself on old servers, + see BUG#49259. + */ + int4store(pos, /* rpl_recovery_rank */ 0); pos+= 4; /* The master will fill in master_id */ int4store(pos, 0); pos+= 4; @@ -4250,8 +4286,9 @@ MYSQL *rpl_connect_master(MYSQL *mysql) rli Relay log information NOTES - - As this is only called by the slave thread, we don't need to - have a lock on this. + - As this is only called by the slave thread or on STOP SLAVE, with the + log_lock grabbed and the slave thread stopped, we don't need to have + a lock here. - If there is an active transaction, then we don't update the position in the relay log. This is to ensure that we re-execute statements if we die in the middle of an transaction that was rolled back. @@ -4302,7 +4339,10 @@ bool flush_relay_log_info(Relay_log_info* rli) error=1; rli->sync_counter= 0; } - /* Flushing the relay log is done by the slave I/O thread */ + /* + Flushing the relay log is done by the slave I/O thread + or by the user on STOP SLAVE. + */ DBUG_RETURN(error); } diff --git a/sql/sp.cc b/sql/sp.cc index be05b9dd48b..62071218a93 100644 --- a/sql/sp.cc +++ b/sql/sp.cc @@ -926,7 +926,7 @@ sp_create_routine(THD *thd, int type, sp_head *sp) row-based replication. The flag will be reset at the end of the statement. */ - thd->clear_current_stmt_binlog_row_based(); + thd->clear_current_stmt_binlog_format_row(); /* Grab an exclusive MDL lock. */ if (lock_routine_name(thd, type == TYPE_ENUM_FUNCTION, @@ -1127,9 +1127,9 @@ sp_create_routine(THD *thd, int type, sp_head *sp) /* restore sql_mode when binloging */ thd->variables.sql_mode= saved_mode; /* Such a statement can always go directly to binlog, no trans cache */ - if (thd->binlog_query(THD::MYSQL_QUERY_TYPE, + if (thd->binlog_query(THD::STMT_QUERY_TYPE, log_query.c_ptr(), log_query.length(), - FALSE, FALSE, 0)) + FALSE, FALSE, FALSE, 0)) ret= SP_INTERNAL_ERROR; thd->variables.sql_mode= 0; } @@ -1176,7 +1176,7 @@ sp_drop_routine(THD *thd, int type, sp_name *name) row-based replication. The flag will be reset at the end of the statement. */ - thd->clear_current_stmt_binlog_row_based(); + thd->clear_current_stmt_binlog_format_row(); /* Grab an exclusive MDL lock. */ if (lock_routine_name(thd, type == TYPE_ENUM_FUNCTION, @@ -1256,7 +1256,7 @@ sp_update_routine(THD *thd, int type, sp_name *name, st_sp_chistics *chistics) row-based replication. The flag will be reset at the end of the statement. */ - thd->clear_current_stmt_binlog_row_based(); + thd->clear_current_stmt_binlog_format_row(); if (!(table= open_proc_table_for_update(thd))) DBUG_RETURN(SP_OPEN_TABLE_FAILED); diff --git a/sql/sp_head.cc b/sql/sp_head.cc index 5cbe08f53f0..8bea84dc391 100644 --- a/sql/sp_head.cc +++ b/sql/sp_head.cc @@ -514,6 +514,7 @@ sp_head::sp_head() :Query_arena(&main_mem_root, INITIALIZED_FOR_SP), m_flags(0), m_sp_cache_version(0), + unsafe_flags(0), m_recursion_level(0), m_next_cached_sp(0), m_cont_level(0) @@ -1692,7 +1693,8 @@ sp_head::execute_function(THD *thd, Item **argp, uint argcount, each substatement be binlogged its way. */ need_binlog_call= mysql_bin_log.is_open() && - (thd->variables.option_bits & OPTION_BIN_LOG) && !thd->current_stmt_binlog_row_based; + (thd->variables.option_bits & OPTION_BIN_LOG) && + !thd->is_current_stmt_binlog_format_row(); /* Remember the original arguments for unrolled replication of functions @@ -1781,7 +1783,7 @@ sp_head::execute_function(THD *thd, Item **argp, uint argcount, { int errcode = query_error_code(thd, thd->killed == THD::NOT_KILLED); Query_log_event qinfo(thd, binlog_buf.ptr(), binlog_buf.length(), - thd->binlog_evt_union.unioned_events_trans, FALSE, errcode); + thd->binlog_evt_union.unioned_events_trans, FALSE, FALSE, errcode); if (mysql_bin_log.write(&qinfo) && thd->binlog_evt_union.unioned_events_trans) { @@ -2129,13 +2131,10 @@ sp_head::restore_lex(THD *thd) oldlex->trg_table_fields.push_back(&sublex->trg_table_fields); - /* - If this substatement needs row-based, the entire routine does too (we - cannot switch from statement-based to row-based only for this - substatement). - */ - if (sublex->is_stmt_unsafe()) - m_flags|= BINLOG_ROW_BASED_IF_MIXED; + /* If this substatement is unsafe, the entire routine is too. */ + DBUG_PRINT("info", ("lex->get_stmt_unsafe_flags: 0x%x", + thd->lex->get_stmt_unsafe_flags())); + unsafe_flags|= sublex->get_stmt_unsafe_flags(); /* Add routines which are used by statement to respective set for diff --git a/sql/sp_head.h b/sql/sp_head.h index ea891f84024..d1e152765f2 100644 --- a/sql/sp_head.h +++ b/sql/sp_head.h @@ -150,9 +150,8 @@ public: HAS_COMMIT_OR_ROLLBACK= 128, LOG_SLOW_STATEMENTS= 256, // Used by events LOG_GENERAL_LOG= 512, // Used by events - BINLOG_ROW_BASED_IF_MIXED= 1024, - HAS_SQLCOM_RESET= 2048, - HAS_SQLCOM_FLUSH= 4096 + HAS_SQLCOM_RESET= 1024, + HAS_SQLCOM_FLUSH= 2048 }; /** TYPE_ENUM_FUNCTION, TYPE_ENUM_PROCEDURE or TYPE_ENUM_TRIGGER */ @@ -204,6 +203,11 @@ private: */ ulong m_sp_cache_version; Stored_program_creation_ctx *m_creation_ctx; + /** + Boolean combination of (1<<flag), where flag is a member of + LEX::enum_binlog_stmt_unsafe. + */ + uint32 unsafe_flags; public: inline Stored_program_creation_ctx *get_creation_ctx() @@ -462,14 +466,19 @@ public: */ void propagate_attributes(Query_tables_list *prelocking_ctx) { + DBUG_ENTER("sp_head::propagate_attributes"); /* If this routine needs row-based binary logging, the entire top statement too (we cannot switch from statement-based to row-based only for this routine, as in statement-based the top-statement may be binlogged and the substatements not). */ - if (m_flags & BINLOG_ROW_BASED_IF_MIXED) - prelocking_ctx->set_stmt_unsafe(); + DBUG_PRINT("info", ("lex->get_stmt_unsafe_flags(): 0x%x", + prelocking_ctx->get_stmt_unsafe_flags())); + DBUG_PRINT("info", ("sp_head(0x%p=%s)->unsafe_flags: 0x%x", + this, name(), unsafe_flags)); + prelocking_ctx->set_stmt_unsafe_flags(unsafe_flags); + DBUG_VOID_RETURN; } sp_pcontext *get_parse_context() { return m_pcont; } diff --git a/sql/sql_acl.cc b/sql/sql_acl.cc index 1f8b68a70bf..0ec8671abaf 100644 --- a/sql/sql_acl.cc +++ b/sql/sql_acl.cc @@ -1632,8 +1632,8 @@ bool change_password(THD *thd, const char *host, const char *user, acl_user->host.hostname ? acl_user->host.hostname : "", new_password)); thd->clear_error(); - result= thd->binlog_query(THD::MYSQL_QUERY_TYPE, buff, query_length, - FALSE, FALSE, 0); + result= thd->binlog_query(THD::STMT_QUERY_TYPE, buff, query_length, + FALSE, FALSE, FALSE, 0); } end: close_thread_tables(thd); @@ -3108,7 +3108,7 @@ int mysql_table_grant(THD *thd, TABLE_LIST *table_list, row-based replication. The flag will be reset at the end of the statement. */ - thd->clear_current_stmt_binlog_row_based(); + thd->clear_current_stmt_binlog_format_row(); #ifdef HAVE_REPLICATION /* @@ -3326,7 +3326,7 @@ bool mysql_routine_grant(THD *thd, TABLE_LIST *table_list, bool is_proc, row-based replication. The flag will be reset at the end of the statement. */ - thd->clear_current_stmt_binlog_row_based(); + thd->clear_current_stmt_binlog_format_row(); #ifdef HAVE_REPLICATION /* @@ -3467,7 +3467,7 @@ bool mysql_grant(THD *thd, const char *db, List <LEX_USER> &list, row-based replication. The flag will be reset at the end of the statement. */ - thd->clear_current_stmt_binlog_row_based(); + thd->clear_current_stmt_binlog_format_row(); #ifdef HAVE_REPLICATION /* @@ -5762,7 +5762,7 @@ bool mysql_create_user(THD *thd, List <LEX_USER> &list) row-based replication. The flag will be reset at the end of the statement. */ - thd->clear_current_stmt_binlog_row_based(); + thd->clear_current_stmt_binlog_format_row(); /* CREATE USER may be skipped on replication client. */ if ((result= open_grant_tables(thd, tables))) @@ -5842,7 +5842,7 @@ bool mysql_drop_user(THD *thd, List <LEX_USER> &list) row-based replication. The flag will be reset at the end of the statement. */ - thd->clear_current_stmt_binlog_row_based(); + thd->clear_current_stmt_binlog_format_row(); /* DROP USER may be skipped on replication client. */ if ((result= open_grant_tables(thd, tables))) @@ -5916,7 +5916,7 @@ bool mysql_rename_user(THD *thd, List <LEX_USER> &list) row-based replication. The flag will be reset at the end of the statement. */ - thd->clear_current_stmt_binlog_row_based(); + thd->clear_current_stmt_binlog_format_row(); /* RENAME USER may be skipped on replication client. */ if ((result= open_grant_tables(thd, tables))) @@ -5998,7 +5998,7 @@ bool mysql_revoke_all(THD *thd, List <LEX_USER> &list) row-based replication. The flag will be reset at the end of the statement. */ - thd->clear_current_stmt_binlog_row_based(); + thd->clear_current_stmt_binlog_format_row(); if ((result= open_grant_tables(thd, tables))) DBUG_RETURN(result != 1); @@ -6261,7 +6261,7 @@ bool sp_revoke_privileges(THD *thd, const char *sp_db, const char *sp_name, row-based replication. The flag will be reset at the end of the statement. */ - thd->clear_current_stmt_binlog_row_based(); + thd->clear_current_stmt_binlog_format_row(); /* Remove procedure access */ do diff --git a/sql/sql_base.cc b/sql/sql_base.cc index c5b442396c1..f68d9a29f05 100644 --- a/sql/sql_base.cc +++ b/sql/sql_base.cc @@ -33,7 +33,6 @@ #include <io.h> #endif -#define FLAGSTR(S,F) ((S) & (F) ? #F " " : "") /** This internal handler is used to trap internally @@ -1604,7 +1603,7 @@ void close_temporary_tables(THD *thd) return; if (!mysql_bin_log.is_open() || - (thd->current_stmt_binlog_row_based && thd->variables.binlog_format == BINLOG_FORMAT_ROW)) + (thd->is_current_stmt_binlog_format_row() && thd->variables.binlog_format == BINLOG_FORMAT_ROW)) { TABLE *tmp_next; for (table= thd->temporary_tables; table; table= tmp_next) @@ -1706,7 +1705,7 @@ void close_temporary_tables(THD *thd) thd->variables.character_set_client= system_charset_info; Query_log_event qinfo(thd, s_query.ptr(), s_query.length() - 1 /* to remove trailing ',' */, - 0, FALSE, 0); + FALSE, TRUE, FALSE, 0); qinfo.db= db.ptr(); qinfo.db_len= db.length(); thd->variables.character_set_client= cs_save; @@ -3639,7 +3638,7 @@ static bool open_table_entry_fini(THD *thd, TABLE_SHARE *share, TABLE *entry) int errcode= query_error_code(thd, TRUE); if (thd->binlog_query(THD::STMT_QUERY_TYPE, query, (ulong)(end-query), - FALSE, FALSE, errcode)) + FALSE, FALSE, FALSE, errcode)) { my_free(query, MYF(0)); return TRUE; @@ -4924,7 +4923,7 @@ static bool check_lock_and_start_stmt(THD *thd, TABLE *table, There may be more differences between open_n_lock_single_table() and open_ltable(). One known difference is that open_ltable() does - neither call decide_logging_format() nor handle some other logging + neither call thd->decide_logging_format() nor handle some other logging and locking issues because it does not call lock_tables(). */ @@ -5191,168 +5190,6 @@ static void mark_real_tables_as_free_for_reuse(TABLE_LIST *table) } -/** - Decide on logging format to use for the statement. - - Compute the capabilities vector for the involved storage engines - and mask out the flags for the binary log. Right now, the binlog - flags only include the capabilities of the storage engines, so this - is safe. - - We now have three alternatives that prevent the statement from - being loggable: - - 1. If there are no capabilities left (all flags are clear) it is - not possible to log the statement at all, so we roll back the - statement and report an error. - - 2. Statement mode is set, but the capabilities indicate that - statement format is not possible. - - 3. Row mode is set, but the capabilities indicate that row - format is not possible. - - 4. Statement is unsafe, but the capabilities indicate that row - format is not possible. - - If we are in MIXED mode, we then decide what logging format to use: - - 1. If the statement is unsafe, row-based logging is used. - - 2. If statement-based logging is not possible, row-based logging is - used. - - 3. Otherwise, statement-based logging is used. - - @param thd Client thread - @param tables Tables involved in the query - */ - -bool decide_logging_format(THD *thd, TABLE_LIST *tables) -{ - /* - In SBR mode, we are only proceeding if we are binlogging this - statement, ie, the filtering rules won't later filter this out. - - This check here is needed to prevent some spurious error to be - raised in some cases (See BUG#42829). - */ - if (mysql_bin_log.is_open() && (thd->variables.option_bits & OPTION_BIN_LOG) && - (thd->variables.binlog_format != BINLOG_FORMAT_STMT || - binlog_filter->db_ok(thd->db))) - { - /* - Compute the starting vectors for the computations by creating a - set with all the capabilities bits set and one with no - capabilities bits set. - */ - handler::Table_flags flags_some_set= 0; - handler::Table_flags flags_all_set= - HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE; - - my_bool multi_engine= FALSE; - void* prev_ht= NULL; - for (TABLE_LIST *table= tables; table; table= table->next_global) - { - TABLE_CATEGORY category; - if (table->placeholder()) - continue; - category= table->table->s->table_category; - if ((category == TABLE_CATEGORY_LOG) || - (category == TABLE_CATEGORY_PERFORMANCE)) - thd->lex->set_stmt_unsafe(); - if (table->lock_type >= TL_WRITE_ALLOW_WRITE) - { - ulonglong const flags= table->table->file->ha_table_flags(); - DBUG_PRINT("info", ("table: %s; ha_table_flags: %s%s", - table->table_name, - FLAGSTR(flags, HA_BINLOG_STMT_CAPABLE), - FLAGSTR(flags, HA_BINLOG_ROW_CAPABLE))); - if (prev_ht && prev_ht != table->table->file->ht) - multi_engine= TRUE; - prev_ht= table->table->file->ht; - flags_all_set &= flags; - flags_some_set |= flags; - } - } - - DBUG_PRINT("info", ("flags_all_set: %s%s", - FLAGSTR(flags_all_set, HA_BINLOG_STMT_CAPABLE), - FLAGSTR(flags_all_set, HA_BINLOG_ROW_CAPABLE))); - DBUG_PRINT("info", ("flags_some_set: %s%s", - FLAGSTR(flags_some_set, HA_BINLOG_STMT_CAPABLE), - FLAGSTR(flags_some_set, HA_BINLOG_ROW_CAPABLE))); - DBUG_PRINT("info", ("thd->variables.binlog_format: %u", - thd->variables.binlog_format)); - DBUG_PRINT("info", ("multi_engine: %s", - multi_engine ? "TRUE" : "FALSE")); - - int error= 0; - if (flags_all_set == 0) - { - my_error((error= ER_BINLOG_LOGGING_IMPOSSIBLE), MYF(0), - "Statement cannot be logged to the binary log in" - " row-based nor statement-based format"); - } - else if (thd->variables.binlog_format == BINLOG_FORMAT_STMT && - (flags_all_set & HA_BINLOG_STMT_CAPABLE) == 0) - { - my_error((error= ER_BINLOG_LOGGING_IMPOSSIBLE), MYF(0), - "Statement-based format required for this statement," - " but not allowed by this combination of engines"); - } - else if ((thd->variables.binlog_format == BINLOG_FORMAT_ROW || - thd->lex->is_stmt_unsafe()) && - (flags_all_set & HA_BINLOG_ROW_CAPABLE) == 0) - { - my_error((error= ER_BINLOG_LOGGING_IMPOSSIBLE), MYF(0), - "Row-based format required for this statement," - " but not allowed by this combination of engines"); - } - - /* - If more than one engine is involved in the statement and at - least one is doing it's own logging (is *self-logging*), the - statement cannot be logged atomically, so we generate an error - rather than allowing the binlog to become corrupt. - */ - if (multi_engine && - (flags_some_set & HA_HAS_OWN_BINLOGGING)) - { - error= ER_BINLOG_LOGGING_IMPOSSIBLE; - my_error(error, MYF(0), - "Statement cannot be written atomically since more" - " than one engine involved and at least one engine" - " is self-logging"); - } - - DBUG_PRINT("info", ("error: %d", error)); - - if (error) - return TRUE; - - /* - We switch to row-based format if we are in mixed mode and one of - the following are true: - - 1. If the statement is unsafe - 2. If statement format cannot be used - - Observe that point to cannot be decided before the tables - involved in a statement has been checked, i.e., we cannot put - this code in reset_current_stmt_binlog_row_based(), it has to be - here. - */ - if (thd->lex->is_stmt_unsafe() || - (flags_all_set & HA_BINLOG_STMT_CAPABLE) == 0) - { - thd->set_current_stmt_binlog_row_based_if_mixed(); - } - } - - return FALSE; -} - /* Lock all tables in list @@ -5397,7 +5234,7 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, *need_reopen= FALSE; if (!tables && !thd->lex->requires_prelocking()) - DBUG_RETURN(decide_logging_format(thd, tables)); + DBUG_RETURN(thd->decide_logging_format(tables)); /* Check for thd->locked_tables_mode to avoid a redundant @@ -5434,7 +5271,8 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, if (thd->variables.binlog_format != BINLOG_FORMAT_ROW && tables && has_write_table_with_auto_increment(thd->lex->first_not_own_table())) { - thd->lex->set_stmt_unsafe(); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_TWO_AUTOINC_COLUMNS); + thd->set_current_stmt_binlog_format_row_if_mixed(); } } @@ -5541,7 +5379,7 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, } } - DBUG_RETURN(decide_logging_format(thd, tables)); + DBUG_RETURN(thd->decide_logging_format(tables)); } diff --git a/sql/sql_class.cc b/sql/sql_class.cc index aaacce19b53..95156a67022 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -39,6 +39,7 @@ #include <io.h> #endif #include <mysys_err.h> +#include <limits.h> #include "sp_rcontext.h" #include "sp_cache.h" @@ -447,7 +448,7 @@ THD::THD() lock_id(&main_lock_id), user_time(0), in_sub_stmt(0), sql_log_bin_toplevel(false), - binlog_table_maps(0), binlog_flags(0UL), + binlog_unsafe_warning_flags(0), binlog_table_maps(0), table_map_for_update(0), arg_of_last_insert_id_function(FALSE), first_successful_insert_id_in_prev_stmt(0), @@ -899,14 +900,15 @@ void THD::init(void) if (variables.sql_mode & MODE_NO_BACKSLASH_ESCAPES) server_status|= SERVER_STATUS_NO_BACKSLASH_ESCAPES; - transaction.all.modified_non_trans_table= transaction.stmt.modified_non_trans_table= FALSE; + transaction.all.modified_non_trans_table= + transaction.stmt.modified_non_trans_table= FALSE; open_options=ha_open_options; update_lock_default= (variables.low_priority_updates ? TL_WRITE_LOW_PRIORITY : TL_WRITE); session_tx_isolation= (enum_tx_isolation) variables.tx_isolation; update_charset(); - reset_current_stmt_binlog_row_based(); + reset_current_stmt_binlog_format_row(); bzero((char *) &status_var, sizeof(status_var)); sql_log_bin_toplevel= variables.option_bits & OPTION_BIN_LOG; @@ -3185,13 +3187,14 @@ void THD::reset_sub_statement_state(Sub_statement_state *backup, first_successful_insert_id_in_cur_stmt; if ((!lex->requires_prelocking() || is_update_query(lex->sql_command)) && - !current_stmt_binlog_row_based) + !is_current_stmt_binlog_format_row()) { variables.option_bits&= ~OPTION_BIN_LOG; } - if ((backup->option_bits & OPTION_BIN_LOG) && is_update_query(lex->sql_command)&& - !current_stmt_binlog_row_based) + if ((backup->option_bits & OPTION_BIN_LOG) && + is_update_query(lex->sql_command) && + !is_current_stmt_binlog_format_row()) mysql_bin_log.start_union_events(this, this->query_id); /* Disable result sets */ @@ -3253,7 +3256,7 @@ void THD::restore_sub_statement_state(Sub_statement_state *backup) is_fatal_sub_stmt_error= FALSE; if ((variables.option_bits & OPTION_BIN_LOG) && is_update_query(lex->sql_command) && - !current_stmt_binlog_row_based) + !is_current_stmt_binlog_format_row()) mysql_bin_log.stop_union_events(this); /* @@ -3441,6 +3444,392 @@ void xid_cache_delete(XID_STATE *xid_state) mysql_mutex_unlock(&LOCK_xid_cache); } + +/** + Decide on logging format to use for the statement and issue errors + or warnings as needed. The decision depends on the following + parameters: + + - The logging mode, i.e., the value of binlog_format. Can be + statement, mixed, or row. + + - The type of statement. There are three types of statements: + "normal" safe statements; unsafe statements; and row injections. + An unsafe statement is one that, if logged in statement format, + might produce different results when replayed on the slave (e.g., + INSERT DELAYED). A row injection is either a BINLOG statement, or + a row event executed by the slave's SQL thread. + + - The capabilities of tables modified by the statement. The + *capabilities vector* for a table is a set of flags associated + with the table. Currently, it only includes two flags: *row + capability flag* and *statement capability flag*. + + The row capability flag is set if and only if the engine can + handle row-based logging. The statement capability flag is set if + and only if the table can handle statement-based logging. + + Decision table for logging format + --------------------------------- + + The following table summarizes how the format and generated + warning/error depends on the tables' capabilities, the statement + type, and the current binlog_format. + + Row capable N NNNNNNNNN YYYYYYYYY YYYYYYYYY + Statement capable N YYYYYYYYY NNNNNNNNN YYYYYYYYY + + Statement type * SSSUUUIII SSSUUUIII SSSUUUIII + + binlog_format * SMRSMRSMR SMRSMRSMR SMRSMRSMR + + Logged format - SS-S----- -RR-RR-RR SRRSRR-RR + Warning/Error 1 --2732444 5--5--6-- ---7--6-- + + Legend + ------ + + Row capable: N - Some table not row-capable, Y - All tables row-capable + Stmt capable: N - Some table not stmt-capable, Y - All tables stmt-capable + Statement type: (S)afe, (U)nsafe, or Row (I)njection + binlog_format: (S)TATEMENT, (M)IXED, or (R)OW + Logged format: (S)tatement or (R)ow + Warning/Error: Warnings and error messages are as follows: + + 1. Error: Cannot execute statement: binlogging impossible since both + row-incapable engines and statement-incapable engines are + involved. + + 2. Error: Cannot execute statement: binlogging impossible since + BINLOG_FORMAT = ROW and at least one table uses a storage engine + limited to statement-logging. + + 3. Error: Cannot execute statement: binlogging of unsafe statement + is impossible when storage engine is limited to statement-logging + and BINLOG_FORMAT = MIXED. + + 4. Error: Cannot execute row injection: binlogging impossible since + at least one table uses a storage engine limited to + statement-logging. + + 5. Error: Cannot execute statement: binlogging impossible since + BINLOG_FORMAT = STATEMENT and at least one table uses a storage + engine limited to row-logging. + + 6. Error: Cannot execute row injection: binlogging impossible since + BINLOG_FORMAT = STATEMENT. + + 7. Warning: Unsafe statement binlogged in statement format since + BINLOG_FORMAT = STATEMENT. + + In addition, we can produce the following error (not depending on + the variables of the decision diagram): + + 8. Error: Cannot execute statement: binlogging impossible since more + than one engine is involved and at least one engine is + self-logging. + + For each error case above, the statement is prevented from being + logged, we report an error, and roll back the statement. For + warnings, we set the thd->binlog_flags variable: the warning will be + printed only if the statement is successfully logged. + + @see THD::binlog_query + + @param[in] thd Client thread + @param[in] tables Tables involved in the query + + @retval 0 No error; statement can be logged. + @retval -1 One of the error conditions above applies (1, 2, 4, 5, or 6). +*/ + +int THD::decide_logging_format(TABLE_LIST *tables) +{ + DBUG_ENTER("THD::decide_logging_format"); + DBUG_PRINT("info", ("query: %s", query())); + DBUG_PRINT("info", ("variables.binlog_format: %ld", + variables.binlog_format)); + DBUG_PRINT("info", ("lex->get_stmt_unsafe_flags(): 0x%x", + lex->get_stmt_unsafe_flags())); + + /* + We should not decide logging format if the binlog is closed or + binlogging is off, or if the statement is filtered out from the + binlog by filtering rules. + */ + if (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG) && + !(variables.binlog_format == BINLOG_FORMAT_STMT && + !binlog_filter->db_ok(db))) + { + /* + Compute one bit field with the union of all the engine + capabilities, and one with the intersection of all the engine + capabilities. + */ + handler::Table_flags flags_some_set= 0; + handler::Table_flags flags_all_set= + HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE; + + my_bool multi_engine= FALSE; + my_bool mixed_engine= FALSE; + my_bool all_trans_engines= TRUE; + TABLE* prev_write_table= NULL; + TABLE* prev_access_table= NULL; + +#ifndef DBUG_OFF + { + static const char *prelocked_mode_name[] = { + "NON_PRELOCKED", + "PRELOCKED", + "PRELOCKED_UNDER_LOCK_TABLES", + }; + DBUG_PRINT("debug", ("prelocked_mode: %s", + prelocked_mode_name[locked_tables_mode])); + } +#endif + + /* + Get the capabilities vector for all involved storage engines and + mask out the flags for the binary log. + */ + for (TABLE_LIST *table= tables; table; table= table->next_global) + { + if (table->placeholder()) + continue; + if (table->table->s->table_category == TABLE_CATEGORY_PERFORMANCE) + lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_TABLE); + if (table->lock_type >= TL_WRITE_ALLOW_WRITE) + { + handler::Table_flags const flags= table->table->file->ha_table_flags(); + DBUG_PRINT("info", ("table: %s; ha_table_flags: 0x%llx", + table->table_name, flags)); + if (prev_write_table && prev_write_table->file->ht != + table->table->file->ht) + multi_engine= TRUE; + all_trans_engines= all_trans_engines && + table->table->file->has_transactions(); + prev_write_table= table->table; + flags_all_set &= flags; + flags_some_set |= flags; + } + if (prev_access_table && prev_access_table->file->ht != table->table->file->ht) + mixed_engine= mixed_engine || (prev_access_table->file->has_transactions() != + table->table->file->has_transactions()); + prev_access_table= table->table; + } + + /* + Set the statement as unsafe if: + + . it is a mixed statement, i.e. access transactional and non-transactional + tables, and updates at least one; + or + . an early statement updated a transactional table; + . and, the current statement updates a non-transactional table. + + Any mixed statement is classified as unsafe to ensure that mixed mode is + completely safe. Consider the following example to understand why we + decided to do this: + + Note that mixed statements such as + + 1: INSERT INTO myisam_t SELECT * FROM innodb_t; + + 2: INSERT INTO innodb_t SELECT * FROM myisam_t; + + are classified as unsafe to ensure that in mixed mode the execution is + completely safe and equivalent to the row mode. Consider the following + statements and sessions (connections) to understand the reason: + + con1: INSERT INTO innodb_t VALUES (1); + con1: INSERT INTO innodb_t VALUES (100); + + con1: BEGIN + con2: INSERT INTO myisam_t SELECT * FROM innodb_t; + con1: INSERT INTO innodb_t VALUES (200); + con1: COMMIT; + + The point is that the concurrent statements may be written into the binary log + in a way different from the execution. For example, + + BINARY LOG: + + con2: BEGIN; + con2: INSERT INTO myisam_t SELECT * FROM innodb_t; + con2: COMMIT; + con1: BEGIN + con1: INSERT INTO innodb_t VALUES (200); + con1: COMMIT; + + .... + + or + + BINARY LOG: + + con1: BEGIN + con1: INSERT INTO innodb_t VALUES (200); + con1: COMMIT; + con2: BEGIN; + con2: INSERT INTO myisam_t SELECT * FROM innodb_t; + con2: COMMIT; + + Clearly, this may become a problem in STMT mode and setting the statement + as unsafe will make rows to be written into the binary log in MIXED mode + and as such the problem will not stand. + + In STMT mode, although such statement is classified as unsafe, i.e. + + INSERT INTO myisam_t SELECT * FROM innodb_t; + + there is no enough information to avoid writing it outside the boundaries + of a transaction. This is not a problem if we are considering snapshot + isolation level but if we have pure repeatable read or serializable the + lock history on the slave will be different from the master. + */ + if (mixed_engine || + trans_has_updated_trans_table(this) && !all_trans_engines) + lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_NONTRANS_AFTER_TRANS); + + DBUG_PRINT("info", ("flags_all_set: 0x%llx", flags_all_set)); + DBUG_PRINT("info", ("flags_some_set: 0x%llx", flags_some_set)); + DBUG_PRINT("info", ("multi_engine: %d", multi_engine)); + + int error= 0; + int unsafe_flags; + + /* + If more than one engine is involved in the statement and at + least one is doing it's own logging (is *self-logging*), the + statement cannot be logged atomically, so we generate an error + rather than allowing the binlog to become corrupt. + */ + if (multi_engine && + (flags_some_set & HA_HAS_OWN_BINLOGGING)) + { + my_error((error= ER_BINLOG_MULTIPLE_ENGINES_AND_SELF_LOGGING_ENGINE), + MYF(0)); + } + + /* both statement-only and row-only engines involved */ + if ((flags_all_set & (HA_BINLOG_STMT_CAPABLE | HA_BINLOG_ROW_CAPABLE)) == 0) + { + /* + 1. Error: Binary logging impossible since both row-incapable + engines and statement-incapable engines are involved + */ + my_error((error= ER_BINLOG_ROW_ENGINE_AND_STMT_ENGINE), MYF(0)); + } + /* statement-only engines involved */ + else if ((flags_all_set & HA_BINLOG_ROW_CAPABLE) == 0) + { + if (lex->is_stmt_row_injection()) + { + /* + 4. Error: Cannot execute row injection since table uses + storage engine limited to statement-logging + */ + my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_ENGINE), MYF(0)); + } + else if (variables.binlog_format == BINLOG_FORMAT_ROW) + { + /* + 2. Error: Cannot modify table that uses a storage engine + limited to statement-logging when BINLOG_FORMAT = ROW + */ + my_error((error= ER_BINLOG_ROW_MODE_AND_STMT_ENGINE), MYF(0)); + } + else if ((unsafe_flags= lex->get_stmt_unsafe_flags()) != 0) + { + /* + 3. Error: Cannot execute statement: binlogging of unsafe + statement is impossible when storage engine is limited to + statement-logging and BINLOG_FORMAT = MIXED. + */ + for (int unsafe_type= 0; + unsafe_type < LEX::BINLOG_STMT_UNSAFE_COUNT; + unsafe_type++) + if (unsafe_flags & (1 << unsafe_type)) + my_error((error= ER_BINLOG_UNSAFE_AND_STMT_ENGINE), MYF(0), + ER(LEX::binlog_stmt_unsafe_errcode[unsafe_type])); + } + /* log in statement format! */ + } + /* no statement-only engines */ + else + { + /* binlog_format = STATEMENT */ + if (variables.binlog_format == BINLOG_FORMAT_STMT) + { + if (lex->is_stmt_row_injection()) + { + /* + 6. Error: Cannot execute row injection since + BINLOG_FORMAT = STATEMENT + */ + my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_MODE), MYF(0)); + } + else if ((flags_all_set & HA_BINLOG_STMT_CAPABLE) == 0) + { + /* + 5. Error: Cannot modify table that uses a storage engine + limited to row-logging when binlog_format = STATEMENT + */ + my_error((error= ER_BINLOG_STMT_MODE_AND_ROW_ENGINE), MYF(0), ""); + } + else if ((unsafe_flags= lex->get_stmt_unsafe_flags()) != 0) + { + /* + 7. Warning: Unsafe statement logged as statement due to + binlog_format = STATEMENT + */ + binlog_unsafe_warning_flags|= unsafe_flags; + DBUG_PRINT("info", ("Scheduling warning to be issued by " + "binlog_query: '%s'", + ER(ER_BINLOG_UNSAFE_STATEMENT))); + DBUG_PRINT("info", ("binlog_unsafe_warning_flags: 0x%x", + binlog_unsafe_warning_flags)); + } + /* log in statement format! */ + } + /* No statement-only engines and binlog_format != STATEMENT. + I.e., nothing prevents us from row logging if needed. */ + else + { + if (lex->is_stmt_unsafe() || lex->is_stmt_row_injection() + || (flags_all_set & HA_BINLOG_STMT_CAPABLE) == 0) + { + /* log in row format! */ + set_current_stmt_binlog_format_row_if_mixed(); + } + } + } + + if (error) { + DBUG_PRINT("info", ("decision: no logging since an error was generated")); + DBUG_RETURN(-1); + } + DBUG_PRINT("info", ("decision: logging in %s format", + is_current_stmt_binlog_format_row() ? + "ROW" : "STATEMENT")); + } +#ifndef DBUG_OFF + else + DBUG_PRINT("info", ("decision: no logging since " + "mysql_bin_log.is_open() = %d " + "and (options & OPTION_BIN_LOG) = 0x%llx " + "and binlog_format = %ld " + "and binlog_filter->db_ok(db) = %d", + mysql_bin_log.is_open(), + (variables.option_bits & OPTION_BIN_LOG), + variables.binlog_format, + binlog_filter->db_ok(db))); +#endif + + DBUG_RETURN(0); +} + + /* Implementation of interface to write rows to the binary log through the thread. The thread is responsible for writing the rows it has @@ -3492,7 +3881,7 @@ THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id, if (binlog_setup_trx_data()) DBUG_RETURN(NULL); - Rows_log_event* pending= binlog_get_pending_rows_event(); + Rows_log_event* pending= binlog_get_pending_rows_event(is_transactional); if (unlikely(pending && !pending->is_valid())) DBUG_RETURN(NULL); @@ -3526,7 +3915,9 @@ THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id, flush the pending event and replace it with the newly created event... */ - if (unlikely(mysql_bin_log.flush_and_set_pending_rows_event(this, ev))) + if (unlikely( + mysql_bin_log.flush_and_set_pending_rows_event(this, ev, + is_transactional))) { delete ev; DBUG_RETURN(NULL); @@ -3749,7 +4140,7 @@ int THD::binlog_write_row(TABLE* table, bool is_trans, MY_BITMAP const* cols, size_t colcnt, uchar const *record) { - DBUG_ASSERT(current_stmt_binlog_row_based && mysql_bin_log.is_open()); + DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); /* Pack records into format for transfer. We are allocating more @@ -3779,7 +4170,7 @@ int THD::binlog_update_row(TABLE* table, bool is_trans, const uchar *before_record, const uchar *after_record) { - DBUG_ASSERT(current_stmt_binlog_row_based && mysql_bin_log.is_open()); + DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); size_t const before_maxlen = max_row_length(table, before_record); size_t const after_maxlen = max_row_length(table, after_record); @@ -3824,7 +4215,7 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans, MY_BITMAP const* cols, size_t colcnt, uchar const *record) { - DBUG_ASSERT(current_stmt_binlog_row_based && mysql_bin_log.is_open()); + DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); /* Pack records into format for transfer. We are allocating more @@ -3850,14 +4241,15 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans, } -int THD::binlog_remove_pending_rows_event(bool clear_maps) +int THD::binlog_remove_pending_rows_event(bool clear_maps, + bool is_transactional) { DBUG_ENTER("THD::binlog_remove_pending_rows_event"); if (!mysql_bin_log.is_open()) DBUG_RETURN(0); - mysql_bin_log.remove_pending_rows_event(this); + mysql_bin_log.remove_pending_rows_event(this, is_transactional); if (clear_maps) binlog_table_maps= 0; @@ -3865,7 +4257,7 @@ int THD::binlog_remove_pending_rows_event(bool clear_maps) DBUG_RETURN(0); } -int THD::binlog_flush_pending_rows_event(bool stmt_end) +int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional) { DBUG_ENTER("THD::binlog_flush_pending_rows_event"); /* @@ -3881,7 +4273,7 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end) flag is set. */ int error= 0; - if (Rows_log_event *pending= binlog_get_pending_rows_event()) + if (Rows_log_event *pending= binlog_get_pending_rows_event(is_transactional)) { if (stmt_end) { @@ -3890,7 +4282,8 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end) binlog_table_maps= 0; } - error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0); + error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0, + is_transactional); } DBUG_RETURN(error); @@ -3906,8 +4299,6 @@ show_query_type(THD::enum_binlog_query_type qtype) return "ROW"; case THD::STMT_QUERY_TYPE: return "STMT"; - case THD::MYSQL_QUERY_TYPE: - return "MYSQL"; case THD::QUERY_TYPE_COUNT: default: DBUG_ASSERT(0 <= qtype && qtype < THD::QUERY_TYPE_COUNT); @@ -3919,32 +4310,97 @@ show_query_type(THD::enum_binlog_query_type qtype) #endif -/* - Member function that will log query, either row-based or - statement-based depending on the value of the 'current_stmt_binlog_row_based' - the value of the 'qtype' flag. +/** + Auxiliary method used by @c binlog_query() to raise warnings. - This function should be called after the all calls to ha_*_row() - functions have been issued, but before tables are unlocked and - closed. + The type of warning and the type of unsafeness is stored in + THD::binlog_unsafe_warning_flags. +*/ +void THD::issue_unsafe_warnings() +{ + DBUG_ENTER("issue_unsafe_warnings"); + /* + Ensure that binlog_unsafe_warning_flags is big enough to hold all + bits. This is actually a constant expression. + */ + DBUG_ASSERT(2 * LEX::BINLOG_STMT_UNSAFE_COUNT <= + sizeof(binlog_unsafe_warning_flags) * CHAR_BIT); - OBSERVE - There shall be no writes to any system table after calling - binlog_query(), so these writes has to be moved to before the call - of binlog_query() for correct functioning. + uint32 unsafe_type_flags= binlog_unsafe_warning_flags; - This is necessesary not only for RBR, but the master might crash - after binlogging the query but before changing the system tables. - This means that the slave and the master are not in the same state - (after the master has restarted), so therefore we have to - eliminate this problem. + /* + Clear: (1) bits above BINLOG_STMT_UNSAFE_COUNT; (2) bits for + warnings that have been printed already. + */ + unsafe_type_flags &= (LEX::BINLOG_STMT_UNSAFE_ALL_FLAGS ^ + (unsafe_type_flags >> LEX::BINLOG_STMT_UNSAFE_COUNT)); + /* If all warnings have been printed already, return. */ + if (unsafe_type_flags == 0) + DBUG_VOID_RETURN; - RETURN VALUE - Error code, or 0 if no error. + DBUG_PRINT("info", ("unsafe_type_flags: 0x%x", unsafe_type_flags)); + + /* + For each unsafe_type, check if the statement is unsafe in this way + and issue a warning. + */ + for (int unsafe_type=0; + unsafe_type < LEX::BINLOG_STMT_UNSAFE_COUNT; + unsafe_type++) + { + if ((unsafe_type_flags & (1 << unsafe_type)) != 0) + { + push_warning_printf(this, MYSQL_ERROR::WARN_LEVEL_NOTE, + ER_BINLOG_UNSAFE_STATEMENT, + ER(ER_BINLOG_UNSAFE_STATEMENT), + ER(LEX::binlog_stmt_unsafe_errcode[unsafe_type])); + if (global_system_variables.log_warnings) + { + char buf[MYSQL_ERRMSG_SIZE * 2]; + sprintf(buf, ER(ER_BINLOG_UNSAFE_STATEMENT), + ER(LEX::binlog_stmt_unsafe_errcode[unsafe_type])); + sql_print_warning(ER(ER_MESSAGE_AND_STATEMENT), buf, query()); + } + } + } + /* + Mark these unsafe types as already printed, to avoid printing + warnings for them again. + */ + binlog_unsafe_warning_flags|= + unsafe_type_flags << LEX::BINLOG_STMT_UNSAFE_COUNT; + DBUG_VOID_RETURN; +} + + +/** + Log the current query. + + The query will be logged in either row format or statement format + depending on the value of @c current_stmt_binlog_format_row field and + the value of the @c qtype parameter. + + This function must be called: + + - After the all calls to ha_*_row() functions have been issued. + + - After any writes to system tables. Rationale: if system tables + were written after a call to this function, and the master crashes + after the call to this function and before writing the system + tables, then the master and slave get out of sync. + + - Before tables are unlocked and closed. + + @see decide_logging_format + + @retval 0 Success + + @retval nonzero If there is a failure when writing the query (e.g., + write failure), then the error code is returned. */ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, - ulong query_len, bool is_trans, bool suppress_use, - int errcode) + ulong query_len, bool is_trans, bool direct, + bool suppress_use, int errcode) { DBUG_ENTER("THD::binlog_query"); DBUG_PRINT("enter", ("qtype: %s query: '%s'", @@ -3961,59 +4417,53 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, top-most close_thread_tables(). */ if (this->locked_tables_mode <= LTM_LOCK_TABLES) - if (int error= binlog_flush_pending_rows_event(TRUE)) + if (int error= binlog_flush_pending_rows_event(TRUE, is_trans)) DBUG_RETURN(error); /* - If we are in statement mode and trying to log an unsafe statement, - we should print a warning. + Warnings for unsafe statements logged in statement format are + printed here instead of in decide_logging_format(). This is + because the warnings should be printed only if the statement is + actually logged. When executing decide_logging_format(), we cannot + know for sure if the statement will be logged. */ - if (sql_log_bin_toplevel && lex->is_stmt_unsafe() && - variables.binlog_format == BINLOG_FORMAT_STMT && - binlog_filter->db_ok(this->db)) - { - /* - A warning can be elevated a error when STRICT sql mode. - But we don't want to elevate binlog warning to error here. - */ - push_warning(this, MYSQL_ERROR::WARN_LEVEL_NOTE, - ER_BINLOG_UNSAFE_STATEMENT, - ER(ER_BINLOG_UNSAFE_STATEMENT)); - if (global_system_variables.log_warnings && - !(binlog_flags & BINLOG_FLAG_UNSAFE_STMT_PRINTED)) - { - sql_print_warning("%s Statement: %.*s", - ER(ER_BINLOG_UNSAFE_STATEMENT), - MYSQL_ERRMSG_SIZE, query_arg); - binlog_flags|= BINLOG_FLAG_UNSAFE_STMT_PRINTED; - } - } + if (sql_log_bin_toplevel) + issue_unsafe_warnings(); switch (qtype) { + /* + ROW_QUERY_TYPE means that the statement may be logged either in + row format or in statement format. If + current_stmt_binlog_format is row, it means that the + statement has already been logged in row format and hence shall + not be logged again. + */ case THD::ROW_QUERY_TYPE: DBUG_PRINT("debug", - ("current_stmt_binlog_row_based: %d", - current_stmt_binlog_row_based)); - if (current_stmt_binlog_row_based) + ("is_current_stmt_binlog_format_row: %d", + is_current_stmt_binlog_format_row())); + if (is_current_stmt_binlog_format_row()) DBUG_RETURN(0); - /* Otherwise, we fall through */ - case THD::MYSQL_QUERY_TYPE: - /* - Using this query type is a conveniece hack, since we have been - moving back and forth between using RBR for replication of - system tables and not using it. + /* Fall through */ - Make sure to change in check_table_binlog_row_based() according - to how you treat this. + /* + STMT_QUERY_TYPE means that the query must be logged in statement + format; it cannot be logged in row format. This is typically + used by DDL statements. It is an error to use this query type + if current_stmt_binlog_format_row is row. + + @todo Currently there are places that call this method with + STMT_QUERY_TYPE and current_stmt_binlog_format is row. Fix those + places and add assert to ensure correct behavior. /Sven */ case THD::STMT_QUERY_TYPE: /* The MYSQL_LOG::write() function will set the STMT_END_F flag and flush the pending rows event if necessary. - */ + */ { - Query_log_event qinfo(this, query_arg, query_len, is_trans, suppress_use, - errcode); + Query_log_event qinfo(this, query_arg, query_len, is_trans, direct, + suppress_use, errcode); qinfo.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F; /* Binlog table maps will be irrelevant after a Query_log_event diff --git a/sql/sql_class.h b/sql/sql_class.h index 51aad24b3a7..b93ef4d5492 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -48,6 +48,8 @@ enum enum_delay_key_write { DELAY_KEY_WRITE_NONE, DELAY_KEY_WRITE_ON, enum enum_slave_exec_mode { SLAVE_EXEC_MODE_STRICT, SLAVE_EXEC_MODE_IDEMPOTENT, SLAVE_EXEC_MODE_LAST_BIT}; +enum enum_slave_type_conversions { SLAVE_TYPE_CONVERSIONS_ALL_LOSSY, + SLAVE_TYPE_CONVERSIONS_ALL_NON_LOSSY}; enum enum_mark_columns { MARK_COLUMNS_NONE, MARK_COLUMNS_READ, MARK_COLUMNS_WRITE}; enum enum_filetype { FILETYPE_CSV, FILETYPE_XML }; @@ -1467,6 +1469,7 @@ public: /* Used to execute base64 coded binlog events in MySQL server */ Relay_log_info* rli_fake; + void reset_for_next_command(); /* Constant for THD::where initialization in the beginning of every query. @@ -1641,32 +1644,81 @@ public: size_t needed, bool is_transactional, RowsEventT* hint); - Rows_log_event* binlog_get_pending_rows_event() const; - void binlog_set_pending_rows_event(Rows_log_event* ev); - int binlog_flush_pending_rows_event(bool stmt_end); - int binlog_remove_pending_rows_event(bool clear_maps); + Rows_log_event* binlog_get_pending_rows_event(bool is_transactional) const; + void binlog_set_pending_rows_event(Rows_log_event* ev, bool is_transactional); + inline int binlog_flush_pending_rows_event(bool stmt_end) + { + return (binlog_flush_pending_rows_event(stmt_end, FALSE) || + binlog_flush_pending_rows_event(stmt_end, TRUE)); + } + int binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional); + int binlog_remove_pending_rows_event(bool clear_maps, bool is_transactional); + + /** + Determine the binlog format of the current statement. + + @retval 0 if the current statement will be logged in statement + format. + @retval nonzero if the current statement will be logged in row + format. + */ + int is_current_stmt_binlog_format_row() const { + DBUG_ASSERT(current_stmt_binlog_format == BINLOG_FORMAT_STMT || + current_stmt_binlog_format == BINLOG_FORMAT_ROW); + return current_stmt_binlog_format == BINLOG_FORMAT_ROW; + } private: + /** + Indicates the format in which the current statement will be + logged. This can only be set from @c decide_logging_format(). + */ + enum_binlog_format current_stmt_binlog_format; + + /** + Bit field for the state of binlog warnings. + + There are two groups of bits: + + - The first Lex::BINLOG_STMT_UNSAFE_COUNT bits list all types of + unsafeness that the current statement has. + + - The following Lex::BINLOG_STMT_UNSAFE_COUNT bits list all types + of unsafeness that the current statement has issued warnings + for. + + Hence, this variable must be big enough to hold + 2*Lex::BINLOG_STMT_UNSAFE_COUNT bits. This is asserted in @c + issue_unsafe_warnings(). + + The first and second groups of bits are set by @c + decide_logging_format() when it detects that a warning should be + issued. The third group of bits is set from @c binlog_query() + when a warning is issued. All bits are cleared at the end of the + top-level statement. + + This must be a member of THD and not of LEX, because warnings are + detected and issued in different places (@c + decide_logging_format() and @c binlog_query(), respectively). + Between these calls, the THD->lex object may change; e.g., if a + stored routine is invoked. Only THD persists between the calls. + */ + uint32 binlog_unsafe_warning_flags; + + void issue_unsafe_warnings(); + /* Number of outstanding table maps, i.e., table maps in the transaction cache. */ uint binlog_table_maps; - - enum enum_binlog_flag { - BINLOG_FLAG_UNSAFE_STMT_PRINTED, - BINLOG_FLAG_COUNT - }; - - /** - Flags with per-thread information regarding the status of the - binary log. - */ - uint32 binlog_flags; public: uint get_binlog_table_maps() const { return binlog_table_maps; } + void clear_binlog_table_maps() { + binlog_table_maps= 0; + } #endif /* MYSQL_CLIENT */ public: @@ -2110,27 +2162,18 @@ public: #ifndef MYSQL_CLIENT enum enum_binlog_query_type { - /* - The query can be logged row-based or statement-based - */ + /* The query can be logged in row format or in statement format. */ ROW_QUERY_TYPE, - /* - The query has to be logged statement-based - */ + /* The query has to be logged in statement format. */ STMT_QUERY_TYPE, - /* - The query represents a change to a table in the "mysql" - database and is currently mapped to ROW_QUERY_TYPE. - */ - MYSQL_QUERY_TYPE, QUERY_TYPE_COUNT }; int binlog_query(enum_binlog_query_type qtype, - char const *query, ulong query_len, - bool is_trans, bool suppress_use, + char const *query, ulong query_len, bool is_trans, + bool direct, bool suppress_use, int errcode); #endif @@ -2351,31 +2394,51 @@ public: void set_n_backup_active_arena(Query_arena *set, Query_arena *backup); void restore_active_arena(Query_arena *set, Query_arena *backup); - inline void set_current_stmt_binlog_row_based_if_mixed() + /* + @todo Make these methods private or remove them completely. Only + decide_logging_format should call them. /Sven + */ + inline void set_current_stmt_binlog_format_row_if_mixed() { + DBUG_ENTER("set_current_stmt_binlog_format_row_if_mixed"); + /* + This should only be called from decide_logging_format. + + @todo Once we have ensured this, uncomment the following + statement, remove the big comment below that, and remove the + in_sub_stmt==0 condition from the following 'if'. + */ + /* DBUG_ASSERT(in_sub_stmt == 0); */ /* If in a stored/function trigger, the caller should already have done the change. We test in_sub_stmt to prevent introducing bugs where people wouldn't ensure that, and would switch to row-based mode in the middle of executing a stored function/trigger (which is too late, see also - reset_current_stmt_binlog_row_based()); this condition will make their + reset_current_stmt_binlog_format_row()); this condition will make their tests fail and so force them to propagate the lex->binlog_row_based_if_mixed upwards to the caller. */ if ((variables.binlog_format == BINLOG_FORMAT_MIXED) && (in_sub_stmt == 0)) - current_stmt_binlog_row_based= TRUE; + set_current_stmt_binlog_format_row(); + + DBUG_VOID_RETURN; } - inline void set_current_stmt_binlog_row_based() + inline void set_current_stmt_binlog_format_row() { - current_stmt_binlog_row_based= TRUE; + DBUG_ENTER("set_current_stmt_binlog_format_row"); + current_stmt_binlog_format= BINLOG_FORMAT_ROW; + DBUG_VOID_RETURN; } - inline void clear_current_stmt_binlog_row_based() + inline void clear_current_stmt_binlog_format_row() { - current_stmt_binlog_row_based= FALSE; + DBUG_ENTER("clear_current_stmt_binlog_format_row"); + current_stmt_binlog_format= BINLOG_FORMAT_STMT; + DBUG_VOID_RETURN; } - inline void reset_current_stmt_binlog_row_based() + inline void reset_current_stmt_binlog_format_row() { + DBUG_ENTER("reset_current_stmt_binlog_format_row"); /* If there are temporary tables, don't reset back to statement-based. Indeed it could be that: @@ -2390,19 +2453,19 @@ public: or trigger is decided when it starts executing, depending for example on the caller (for a stored function: if caller is SELECT or INSERT/UPDATE/DELETE...). - - Don't reset binlog format for NDB binlog injector thread. */ DBUG_PRINT("debug", ("temporary_tables: %s, in_sub_stmt: %s, system_thread: %s", YESNO(temporary_tables), YESNO(in_sub_stmt), show_system_thread(system_thread))); - if ((temporary_tables == NULL) && (in_sub_stmt == 0) && - (system_thread != SYSTEM_THREAD_NDBCLUSTER_BINLOG)) + if ((temporary_tables == NULL) && (in_sub_stmt == 0)) { - current_stmt_binlog_row_based= - test(variables.binlog_format == BINLOG_FORMAT_ROW); + if (variables.binlog_format == BINLOG_FORMAT_ROW) + set_current_stmt_binlog_format_row(); + else + clear_current_stmt_binlog_format_row(); } + DBUG_VOID_RETURN; } /** @@ -2614,7 +2677,9 @@ public: /* Make sure we don't release the global read lock when leaving LTM. */ mdl_context.reset_trans_sentinel(global_read_lock.global_shared_lock()); } + int decide_logging_format(TABLE_LIST *tables); private: + /** The current internal error handler for this thread, or NULL. */ Internal_error_handler *m_internal_handler; /** diff --git a/sql/sql_db.cc b/sql/sql_db.cc index f10ac39d054..95db9cb3681 100644 --- a/sql/sql_db.cc +++ b/sql/sql_db.cc @@ -181,7 +181,7 @@ uchar* dboptions_get_key(my_dbopt_t *opt, size_t *length, static inline int write_to_binlog(THD *thd, char *query, uint q_len, char *db, uint db_len) { - Query_log_event qinfo(thd, query, q_len, 0, 0, 0); + Query_log_event qinfo(thd, query, q_len, FALSE, TRUE, FALSE, 0); qinfo.db= db; qinfo.db_len= db_len; return mysql_bin_log.write(&qinfo); @@ -750,7 +750,7 @@ not_silent: if (mysql_bin_log.is_open()) { int errcode= query_error_code(thd, TRUE); - Query_log_event qinfo(thd, query, query_length, 0, + Query_log_event qinfo(thd, query, query_length, FALSE, TRUE, /* suppress_use */ TRUE, errcode); /* @@ -842,10 +842,9 @@ bool mysql_alter_db(THD *thd, const char *db, HA_CREATE_INFO *create_info) if (mysql_bin_log.is_open()) { - thd->clear_error(); - Query_log_event qinfo(thd, thd->query(), thd->query_length(), 0, - /* suppress_use */ TRUE, 0); - + int errcode= query_error_code(thd, TRUE); + Query_log_event qinfo(thd, thd->query(), thd->query_length(), FALSE, TRUE, + /* suppress_use */ TRUE, errcode); /* Write should use the database being created as the "current database" and not the threads current database, which is the @@ -990,9 +989,9 @@ bool mysql_rm_db(THD *thd,char *db,bool if_exists, bool silent) } if (mysql_bin_log.is_open()) { - thd->clear_error(); - Query_log_event qinfo(thd, query, query_length, 0, - /* suppress_use */ TRUE, 0); + int errcode= query_error_code(thd, TRUE); + Query_log_event qinfo(thd, query, query_length, FALSE, TRUE, + /* suppress_use */ TRUE, errcode); /* Write should use the database being created as the "current database" and not the threads current database, which is the @@ -2009,7 +2008,7 @@ bool mysql_upgrade_db(THD *thd, LEX_STRING *old_db) { int errcode= query_error_code(thd, TRUE); Query_log_event qinfo(thd, thd->query(), thd->query_length(), - 0, TRUE, errcode); + FALSE, TRUE, TRUE, errcode); thd->clear_error(); error|= mysql_bin_log.write(&qinfo); } diff --git a/sql/sql_delete.cc b/sql/sql_delete.cc index dcb23add037..0f84f5e9d73 100644 --- a/sql/sql_delete.cc +++ b/sql/sql_delete.cc @@ -132,7 +132,7 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds, if (!using_limit && const_cond_result && !(specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE)) && (thd->lex->sql_command == SQLCOM_TRUNCATE || - (!thd->current_stmt_binlog_row_based && + (!thd->is_current_stmt_binlog_format_row() && !(table->triggers && table->triggers->has_delete_triggers())))) { /* Update the table->file->stats.records number */ @@ -386,7 +386,8 @@ cleanup: transactional_table= table->file->has_transactions(); if (!transactional_table && deleted > 0) - thd->transaction.stmt.modified_non_trans_table= TRUE; + thd->transaction.stmt.modified_non_trans_table= + thd->transaction.all.modified_non_trans_table= TRUE; /* See similar binlogging code in sql_update.cc, for comments */ if ((error < 0) || thd->transaction.stmt.modified_non_trans_table) @@ -415,15 +416,13 @@ cleanup: */ int log_result= thd->binlog_query(query_type, thd->query(), thd->query_length(), - is_trans, FALSE, errcode); + is_trans, FALSE, FALSE, errcode); if (log_result) { error=1; } } - if (thd->transaction.stmt.modified_non_trans_table) - thd->transaction.all.modified_non_trans_table= TRUE; } DBUG_ASSERT(transactional_table || !deleted || thd->transaction.stmt.modified_non_trans_table); free_underlaid_joins(thd, select_lex); @@ -462,19 +461,6 @@ int mysql_prepare_delete(THD *thd, TABLE_LIST *table_list, Item **conds) DBUG_ENTER("mysql_prepare_delete"); List<Item> all_fields; - /* - Statement-based replication of DELETE ... LIMIT is not safe as order of - rows is not defined, so in mixed mode we go to row-based. - - Note that we may consider a statement as safe if ORDER BY primary_key - is present. However it may confuse users to see very similiar statements - replicated differently. - */ - if (thd->lex->current_select->select_limit) - { - thd->lex->set_stmt_unsafe(); - thd->set_current_stmt_binlog_row_based_if_mixed(); - } thd->lex->allow_sum_func= 0; if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context, &thd->lex->select_lex.top_join_list, @@ -823,6 +809,9 @@ void multi_delete::abort() if (deleted) query_cache_invalidate3(thd, delete_tables, 1); + if (thd->transaction.stmt.modified_non_trans_table) + thd->transaction.all.modified_non_trans_table= TRUE; + /* If rows from the first table only has been deleted and it is transactional, just do rollback. @@ -853,10 +842,9 @@ void multi_delete::abort() int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED); /* possible error of writing binary log is ignored deliberately */ (void) thd->binlog_query(THD::ROW_QUERY_TYPE, - thd->query(), thd->query_length(), - transactional_tables, FALSE, errcode); + thd->query(), thd->query_length(), + transactional_tables, FALSE, FALSE, errcode); } - thd->transaction.all.modified_non_trans_table= true; } DBUG_VOID_RETURN; } @@ -1009,6 +997,9 @@ bool multi_delete::send_eof() /* reset used flags */ thd_proc_info(thd, "end"); + if (thd->transaction.stmt.modified_non_trans_table) + thd->transaction.all.modified_non_trans_table= TRUE; + /* We must invalidate the query cache before binlog writing and ha_autocommit_... @@ -1028,14 +1019,12 @@ bool multi_delete::send_eof() errcode= query_error_code(thd, killed_status == THD::NOT_KILLED); if (thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query(), thd->query_length(), - transactional_tables, FALSE, errcode) && + transactional_tables, FALSE, FALSE, errcode) && !normal_tables) { local_error=1; // Log write failed: roll back the SQL statement } } - if (thd->transaction.stmt.modified_non_trans_table) - thd->transaction.all.modified_non_trans_table= TRUE; } if (local_error != 0) error_handled= TRUE; // to force early leave from ::send_error() @@ -1060,11 +1049,11 @@ bool multi_delete::send_eof() static bool mysql_truncate_by_delete(THD *thd, TABLE_LIST *table_list) { - bool error, save_binlog_row_based= thd->current_stmt_binlog_row_based; + bool error, save_binlog_row_based= thd->is_current_stmt_binlog_format_row(); DBUG_ENTER("mysql_truncate_by_delete"); table_list->lock_type= TL_WRITE; mysql_init_select(thd->lex); - thd->clear_current_stmt_binlog_row_based(); + thd->clear_current_stmt_binlog_format_row(); /* Delete all rows from table */ error= mysql_delete(thd, table_list, NULL, NULL, HA_POS_ERROR, LL(0), TRUE); /* @@ -1077,7 +1066,8 @@ static bool mysql_truncate_by_delete(THD *thd, TABLE_LIST *table_list) trans_rollback_stmt(thd); trans_rollback(thd); } - thd->current_stmt_binlog_row_based= save_binlog_row_based; + if (save_binlog_row_based) + thd->set_current_stmt_binlog_format_row(); DBUG_RETURN(error); } @@ -1254,10 +1244,10 @@ end: if (!error) { /* In RBR, the statement is not binlogged if the table is temporary. */ - if (!is_temporary_table || !thd->current_stmt_binlog_row_based) + if (!is_temporary_table || !thd->is_current_stmt_binlog_format_row()) error= write_bin_log(thd, TRUE, thd->query(), thd->query_length()); if (!error) - my_ok(thd); // This should return record count + my_ok(thd); // This should return record count } if (has_mdl_lock) thd->mdl_context.release_transactional_locks(); diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index bfd5dcc0fae..a207ce09de7 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -910,6 +910,10 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, */ query_cache_invalidate3(thd, table_list, 1); } + + if (thd->transaction.stmt.modified_non_trans_table) + thd->transaction.all.modified_non_trans_table= TRUE; + if ((changed && error <= 0) || thd->transaction.stmt.modified_non_trans_table || was_insert_delayed) @@ -948,15 +952,13 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, */ DBUG_ASSERT(thd->killed != THD::KILL_BAD_DATA || error > 0); if (thd->binlog_query(THD::ROW_QUERY_TYPE, - thd->query(), thd->query_length(), - transactional_table, FALSE, - errcode)) + thd->query(), thd->query_length(), + transactional_table, FALSE, FALSE, + errcode)) { - error=1; + error= 1; } } - if (thd->transaction.stmt.modified_non_trans_table) - thd->transaction.all.modified_non_trans_table= TRUE; } DBUG_ASSERT(transactional_table || !changed || thd->transaction.stmt.modified_non_trans_table); @@ -1796,6 +1798,7 @@ public: :locks_in_memory(0), table(0),tables_in_use(0),stacked_inserts(0), status(0), group_count(0) { + DBUG_ENTER("Delayed_insert constructor"); thd.security_ctx->user=thd.security_ctx->priv_user=(char*) delayed_user; thd.security_ctx->host=(char*) my_localhost; thd.current_tablenr=0; @@ -1804,11 +1807,21 @@ public: thd.lex->current_select= 0; // for my_message_sql thd.lex->sql_command= SQLCOM_INSERT; // For innodb::store_lock() /* - Statement-based replication of INSERT DELAYED has problems with RAND() - and user vars, so in mixed mode we go to row-based. + Statement-based replication of INSERT DELAYED has problems with + RAND() and user variables, so in mixed mode we go to row-based. + For normal commands, the unsafe flag is set at parse time. + However, since the flag is a member of the THD object, of which + the delayed_insert thread has its own copy, we must set the + statement to unsafe here and explicitly set row logging mode. + + @todo set_current_stmt_binlog_format_row_if_mixed should not be + called by anything else than thd->decide_logging_format(). When + we call set_current_blah here, none of the checks in + decide_logging_format is made. We should probably call + thd->decide_logging_format() directly instead. /Sven */ - thd.lex->set_stmt_unsafe(); - thd.set_current_stmt_binlog_row_based_if_mixed(); + thd.lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_DELAYED); + thd.set_current_stmt_binlog_format_row_if_mixed(); bzero((char*) &thd.net, sizeof(thd.net)); // Safety bzero((char*) &table_list, sizeof(table_list)); // Safety @@ -1823,6 +1836,7 @@ public: delayed_lock= global_system_variables.low_priority_updates ? TL_WRITE_LOW_PRIORITY : TL_WRITE; mysql_mutex_unlock(&LOCK_thread_count); + DBUG_VOID_RETURN; } ~Delayed_insert() { @@ -2459,8 +2473,8 @@ pthread_handler_t handle_delayed_insert(void *arg) Statement-based replication of INSERT DELAYED has problems with RAND() and user vars, so in mixed mode we go to row-based. */ - thd->lex->set_stmt_unsafe(); - thd->set_current_stmt_binlog_row_based_if_mixed(); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_DELAYED); + thd->set_current_stmt_binlog_format_row_if_mixed(); init_mdl_requests(&di->table_list); @@ -2678,6 +2692,7 @@ bool Delayed_insert::handle_inserts(void) { int error; ulong max_rows; + bool has_trans = TRUE; bool using_ignore= 0, using_opt_replace= 0, using_bin_log= mysql_bin_log.is_open(); delayed_row *row; @@ -2830,9 +2845,9 @@ bool Delayed_insert::handle_inserts(void) */ if (thd.binlog_query(THD::ROW_QUERY_TYPE, row->query.str, row->query.length, - FALSE, FALSE, errcode)) + FALSE, FALSE, FALSE, errcode)) goto err; - + thd.time_zone_used = backup_time_zone_used; thd.variables.time_zone = backup_time_zone; } @@ -2905,9 +2920,11 @@ bool Delayed_insert::handle_inserts(void) or trigger. TODO: Move the logging to last in the sequence of rows. - */ - if (thd.current_stmt_binlog_row_based && - thd.binlog_flush_pending_rows_event(TRUE)) + */ + has_trans= thd.lex->sql_command == SQLCOM_CREATE_TABLE || + table->file->has_transactions(); + if (thd.is_current_stmt_binlog_format_row() && + thd.binlog_flush_pending_rows_event(TRUE, has_trans)) goto err; if ((error=table->file->extra(HA_EXTRA_NO_CACHE))) @@ -2972,19 +2989,6 @@ bool mysql_insert_select_prepare(THD *thd) DBUG_ENTER("mysql_insert_select_prepare"); /* - Statement-based replication of INSERT ... SELECT ... LIMIT is not safe - as order of rows is not defined, so in mixed mode we go to row-based. - - Note that we may consider a statement as safe if ORDER BY primary_key - is present or we SELECT a constant. However it may confuse users to - see very similiar statements replicated differently. - */ - if (lex->current_select->select_limit) - { - lex->set_stmt_unsafe(); - thd->set_current_stmt_binlog_row_based_if_mixed(); - } - /* SELECT_LEX do not belong to INSERT statement, so we can't add WHERE clause if table is VIEW */ @@ -3350,9 +3354,11 @@ bool select_insert::send_eof() and ha_autocommit_or_rollback. */ query_cache_invalidate3(thd, table, 1); - if (thd->transaction.stmt.modified_non_trans_table) - thd->transaction.all.modified_non_trans_table= TRUE; } + + if (thd->transaction.stmt.modified_non_trans_table) + thd->transaction.all.modified_non_trans_table= TRUE; + DBUG_ASSERT(trans_table || !changed || thd->transaction.stmt.modified_non_trans_table); @@ -3372,7 +3378,7 @@ bool select_insert::send_eof() errcode= query_error_code(thd, killed_status == THD::NOT_KILLED); if (thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query(), thd->query_length(), - trans_table, FALSE, errcode)) + trans_table, FALSE, FALSE, errcode)) { table->file->ha_release_auto_increment(); DBUG_RETURN(1); @@ -3444,16 +3450,17 @@ void select_insert::abort() { transactional_table= table->file->has_transactions(); if (thd->transaction.stmt.modified_non_trans_table) { + if (!can_rollback_data()) + thd->transaction.all.modified_non_trans_table= TRUE; + if (mysql_bin_log.is_open()) { int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED); /* error of writing binary log is ignored */ (void) thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query(), thd->query_length(), - transactional_table, FALSE, errcode); + transactional_table, FALSE, FALSE, errcode); } - if (!thd->current_stmt_binlog_row_based && !can_rollback_data()) - thd->transaction.all.modified_non_trans_table= TRUE; if (changed) query_cache_invalidate3(thd, table, 1); } @@ -3708,7 +3715,7 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) create_table->next_global= select_tables; - error= decide_logging_format(thd, create_table); + error= thd->decide_logging_format(create_table); create_table->next_global= save_next_global; @@ -3716,7 +3723,7 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) return error; TABLE const *const table = *tables; - if (thd->current_stmt_binlog_row_based && + if (thd->is_current_stmt_binlog_format_row() && !table->s->tmp_table && !ptr->get_create_info()->table_existed) { @@ -3742,7 +3749,7 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) temporary table, we need to start a statement transaction. */ if ((thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0 && - thd->current_stmt_binlog_row_based && + thd->is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()) { thd->binlog_start_trans_and_stmt(); @@ -3760,7 +3767,7 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR), create_table->table_name); - if (thd->current_stmt_binlog_row_based) + if (thd->is_current_stmt_binlog_format_row()) binlog_show_create_table(&(create_table->table), 1); table= create_table->table; } @@ -3848,7 +3855,7 @@ select_create::binlog_show_create_table(TABLE **tables, uint count) schema that will do a close_thread_tables(), destroying the statement transaction cache. */ - DBUG_ASSERT(thd->current_stmt_binlog_row_based); + DBUG_ASSERT(thd->is_current_stmt_binlog_format_row()); DBUG_ASSERT(tables && *tables && count > 0); char buf[2048]; @@ -3870,6 +3877,7 @@ select_create::binlog_show_create_table(TABLE **tables, uint count) result= thd->binlog_query(THD::STMT_QUERY_TYPE, query.ptr(), query.length(), /* is_trans */ TRUE, + /* direct */ FALSE, /* suppress_use */ FALSE, errcode); } @@ -3889,7 +3897,7 @@ void select_create::send_error(uint errcode,const char *err) DBUG_PRINT("info", ("Current statement %s row-based", - thd->current_stmt_binlog_row_based ? "is" : "is NOT")); + thd->is_current_stmt_binlog_format_row() ? "is" : "is NOT")); DBUG_PRINT("info", ("Current table (at 0x%lu) %s a temporary (or non-existant) table", (ulong) table, @@ -3972,7 +3980,7 @@ void select_create::abort() thd->transaction.stmt.modified_non_trans_table= FALSE; reenable_binlog(thd); /* possible error of writing binary log is ignored deliberately */ - (void)thd->binlog_flush_pending_rows_event(TRUE); + (void) thd->binlog_flush_pending_rows_event(TRUE, TRUE); if (m_plock) { diff --git a/sql/sql_lex.cc b/sql/sql_lex.cc index 64eb1d2b1a7..0d423ba85eb 100644 --- a/sql/sql_lex.cc +++ b/sql/sql_lex.cc @@ -38,6 +38,23 @@ sys_var *trg_new_row_fake_var= (sys_var*) 0x01; */ const LEX_STRING null_lex_str= {NULL, 0}; const LEX_STRING empty_lex_str= { (char*) "", 0 }; +/** + @note The order of the elements of this array must correspond to + the order of elements in enum_binlog_stmt_unsafe. +*/ +const int +Query_tables_list::binlog_stmt_unsafe_errcode[BINLOG_STMT_UNSAFE_COUNT] = +{ + ER_BINLOG_UNSAFE_LIMIT, + ER_BINLOG_UNSAFE_INSERT_DELAYED, + ER_BINLOG_UNSAFE_SYSTEM_TABLE, + ER_BINLOG_UNSAFE_TWO_AUTOINC_COLUMNS, + ER_BINLOG_UNSAFE_UDF, + ER_BINLOG_UNSAFE_SYSTEM_VARIABLE, + ER_BINLOG_UNSAFE_SYSTEM_FUNCTION, + ER_BINLOG_UNSAFE_NONTRANS_AFTER_TRANS +}; + /* Longest standard keyword name */ diff --git a/sql/sql_lex.h b/sql/sql_lex.h index 1e6c3ce7db3..0cf188c31da 100644 --- a/sql/sql_lex.h +++ b/sql/sql_lex.h @@ -1088,24 +1088,154 @@ public: } /** - Has the parser/scanner detected that this statement is unsafe? - */ + Enumeration listing of all types of unsafe statement. + + @note The order of elements of this enumeration type must + correspond to the order of the elements of the @c explanations + array defined in the body of @c THD::issue_unsafe_warnings. + */ + enum enum_binlog_stmt_unsafe { + /** + SELECT..LIMIT is unsafe because the set of rows returned cannot + be predicted. + */ + BINLOG_STMT_UNSAFE_LIMIT= 0, + /** + INSERT DELAYED is unsafe because the time when rows are inserted + cannot be predicted. + */ + BINLOG_STMT_UNSAFE_INSERT_DELAYED, + /** + Access to log tables is unsafe because slave and master probably + log different things. + */ + BINLOG_STMT_UNSAFE_SYSTEM_TABLE, + /** + Update of two autoincrement columns is unsafe. With one + autoincrement column, we store the counter in the binlog so that + slave can restore the correct value. But we can only store one + such counter per statement, so updating more than one + autoincrement column is not safe. + */ + BINLOG_STMT_UNSAFE_TWO_AUTOINC_COLUMNS, + /** + Using a UDF (user-defined function) is unsafe. + */ + BINLOG_STMT_UNSAFE_UDF, + /** + Using most system variables is unsafe, because slave may run + with different options than master. + */ + BINLOG_STMT_UNSAFE_SYSTEM_VARIABLE, + /** + Using some functions is unsafe (e.g., UUID). + */ + BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION, + + /** + Mixing transactional and non-transactional statements are unsafe if + non-transactional reads or writes are occur after transactional + reads or writes inside a transaction. + */ + BINLOG_STMT_UNSAFE_NONTRANS_AFTER_TRANS, + + /* The last element of this enumeration type. */ + BINLOG_STMT_UNSAFE_COUNT + }; + /** + This has all flags from 0 (inclusive) to BINLOG_STMT_FLAG_COUNT + (exclusive) set. + */ + static const int BINLOG_STMT_UNSAFE_ALL_FLAGS= + ((1 << BINLOG_STMT_UNSAFE_COUNT) - 1); + + /** + Maps elements of enum_binlog_stmt_unsafe to error codes. + */ + static const int binlog_stmt_unsafe_errcode[BINLOG_STMT_UNSAFE_COUNT]; + + /** + Determine if this statement is marked as unsafe. + + @retval 0 if the statement is not marked as unsafe. + @retval nonzero if the statement is marked as unsafe. + */ inline bool is_stmt_unsafe() const { - return binlog_stmt_flags & (1U << BINLOG_STMT_FLAG_UNSAFE); + return get_stmt_unsafe_flags() != 0; } /** - Flag the current (top-level) statement as unsafe. + Flag the current (top-level) statement as unsafe. + The flag will be reset after the statement has finished. - The flag will be reset after the statement has finished. + @param unsafe_type The type of unsafety: one of the @c + BINLOG_STMT_FLAG_UNSAFE_* flags in @c enum_binlog_stmt_flag. + */ + inline void set_stmt_unsafe(enum_binlog_stmt_unsafe unsafe_type) { + DBUG_ENTER("set_stmt_unsafe"); + DBUG_ASSERT(unsafe_type >= 0 && unsafe_type < BINLOG_STMT_UNSAFE_COUNT); + binlog_stmt_flags|= (1U << unsafe_type); + DBUG_VOID_RETURN; + } - */ - inline void set_stmt_unsafe() { - binlog_stmt_flags|= (1U << BINLOG_STMT_FLAG_UNSAFE); + /** + Set the bits of binlog_stmt_flags determining the type of + unsafeness of the current statement. No existing bits will be + cleared, but new bits may be set. + + @param flags A binary combination of zero or more bits, (1<<flag) + where flag is a member of enum_binlog_stmt_unsafe. + */ + inline void set_stmt_unsafe_flags(uint32 flags) { + DBUG_ENTER("set_stmt_unsafe_flags"); + DBUG_ASSERT((flags & ~BINLOG_STMT_UNSAFE_ALL_FLAGS) == 0); + binlog_stmt_flags|= flags; + DBUG_VOID_RETURN; + } + + /** + Return a binary combination of all unsafe warnings for the + statement. If the statement has been marked as unsafe by the + 'flag' member of enum_binlog_stmt_unsafe, then the return value + from this function has bit (1<<flag) set to 1. + */ + inline uint32 get_stmt_unsafe_flags() const { + DBUG_ENTER("get_stmt_unsafe_flags"); + DBUG_RETURN(binlog_stmt_flags & BINLOG_STMT_UNSAFE_ALL_FLAGS); } + /** + Mark the current statement as safe; i.e., clear all bits in + binlog_stmt_flags that correspond to elements of + enum_binlog_stmt_unsafe. + */ inline void clear_stmt_unsafe() { - binlog_stmt_flags&= ~(1U << BINLOG_STMT_FLAG_UNSAFE); + DBUG_ENTER("clear_stmt_unsafe"); + binlog_stmt_flags&= ~BINLOG_STMT_UNSAFE_ALL_FLAGS; + DBUG_VOID_RETURN; + } + + /** + Determine if this statement is a row injection. + + @retval 0 if the statement is not a row injection + @retval nonzero if the statement is a row injection + */ + inline bool is_stmt_row_injection() const { + return binlog_stmt_flags & + (1U << (BINLOG_STMT_UNSAFE_COUNT + BINLOG_STMT_TYPE_ROW_INJECTION)); + } + + /** + Flag the statement as a row injection. A row injection is either + a BINLOG statement, or a row event in the relay log executed by + the slave SQL thread. + */ + inline void set_stmt_row_injection() { + DBUG_ENTER("set_stmt_row_injection"); + binlog_stmt_flags|= + (1U << (BINLOG_STMT_UNSAFE_COUNT + BINLOG_STMT_TYPE_ROW_INJECTION)); + DBUG_VOID_RETURN; } /** @@ -1116,16 +1246,37 @@ public: { return sroutines_list.elements != 0; } private: - enum enum_binlog_stmt_flag { - BINLOG_STMT_FLAG_UNSAFE, - BINLOG_STMT_FLAG_COUNT + + /** + Enumeration listing special types of statements. + + Currently, the only possible type is ROW_INJECTION. + */ + enum enum_binlog_stmt_type { + /** + The statement is a row injection (i.e., either a BINLOG + statement or a row event executed by the slave SQL thread). + */ + BINLOG_STMT_TYPE_ROW_INJECTION = 0, + + /** The last element of this enumeration type. */ + BINLOG_STMT_TYPE_COUNT }; - /* - Tells if the parsing stage detected properties of the statement, - for example: that some items require row-based binlogging to give - a reliable binlog/replication, or if we will use stored functions - or triggers which themselves need require row-based binlogging. + /** + Bit field indicating the type of statement. + + There are two groups of bits: + + - The low BINLOG_STMT_UNSAFE_COUNT bits indicate the types of + unsafeness that the current statement has. + + - The next BINLOG_STMT_TYPE_COUNT bits indicate if the statement + is of some special type. + + This must be a member of LEX, not of THD: each stored procedure + needs to remember its unsafeness state between calls and each + stored procedure has its own LEX object (but no own THD object). */ uint32 binlog_stmt_flags; }; diff --git a/sql/sql_load.cc b/sql/sql_load.cc index 39f4c1d279a..e86aa42f67c 100644 --- a/sql/sql_load.cc +++ b/sql/sql_load.cc @@ -582,8 +582,8 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, version for the binary log to mark that table maps are invalid after this point. */ - if (thd->current_stmt_binlog_row_based) - error= thd->binlog_flush_pending_rows_event(true); + if (thd->is_current_stmt_binlog_format_row()) + error= thd->binlog_flush_pending_rows_event(TRUE, transactional_table); else { /* @@ -731,7 +731,7 @@ static bool write_execute_load_query_log_event(THD *thd, sql_exchange* ex, (uint) ((char*) fname_end - (char*) thd->query()), (duplicates == DUP_REPLACE) ? LOAD_DUP_REPLACE : (ignore ? LOAD_DUP_IGNORE : LOAD_DUP_ERROR), - transactional_table, FALSE, errcode); + transactional_table, FALSE, FALSE, errcode); e.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F; return mysql_bin_log.write(&e); } diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 977d0f5c276..be6081f9f12 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -5258,21 +5258,26 @@ bool my_yyoverflow(short **yyss, YYSTYPE **yyvs, ulong *yystacksize) /** - Reset THD part responsible for command processing state. + Reset the part of THD responsible for the state of command + processing. - This needs to be called before execution of every statement - (prepared or conventional). - It is not called by substatements of routines. + This needs to be called before execution of every statement + (prepared or conventional). It is not called by substatements of + routines. - @todo - Make it a method of THD and align its name with the rest of - reset/end/start/init methods. - @todo - Call it after we use THD for queries, not before. -*/ + @todo Remove mysql_reset_thd_for_next_command and only use the + member function. + @todo Call it after we use THD for queries, not before. +*/ void mysql_reset_thd_for_next_command(THD *thd) { + thd->reset_for_next_command(); +} + +void THD::reset_for_next_command() +{ + THD *thd= this; DBUG_ENTER("mysql_reset_thd_for_next_command"); DBUG_ASSERT(!thd->spcont); /* not for substatements of routines */ DBUG_ASSERT(! thd->in_sub_stmt); @@ -5316,15 +5321,12 @@ void mysql_reset_thd_for_next_command(THD *thd) thd->rand_used= 0; thd->sent_row_count= thd->examined_row_count= 0; - /* - Because we come here only for start of top-statements, binlog format is - constant inside a complex statement (using stored functions) etc. - */ - thd->reset_current_stmt_binlog_row_based(); + thd->reset_current_stmt_binlog_format_row(); + thd->binlog_unsafe_warning_flags= 0; DBUG_PRINT("debug", - ("current_stmt_binlog_row_based: %d", - thd->current_stmt_binlog_row_based)); + ("is_current_stmt_binlog_format_row(): %d", + thd->is_current_stmt_binlog_format_row())); DBUG_VOID_RETURN; } @@ -6461,6 +6463,30 @@ bool reload_acl_and_cache(THD *thd, ulong options, TABLE_LIST *tables, tables. */ + options|= REFRESH_BINARY_LOG; + options|= REFRESH_RELAY_LOG; + options|= REFRESH_SLOW_LOG; + options|= REFRESH_GENERAL_LOG; + options|= REFRESH_ENGINE_LOG; + options|= REFRESH_ERROR_LOG; + } + + if (options & REFRESH_ERROR_LOG) + if (flush_error_log()) + result= 1; + + if ((options & REFRESH_SLOW_LOG) && opt_slow_log) + logger.flush_slow_log(); + + if ((options & REFRESH_GENERAL_LOG) && opt_log) + logger.flush_general_log(); + + if (options & REFRESH_ENGINE_LOG) + if (ha_flush_logs(NULL)) + result= 1; + + if (options & REFRESH_BINARY_LOG) + { /* Writing this command to the binlog may result in infinite loops when doing mysqlbinlog|mysql, and anyway it does not really make @@ -6468,23 +6494,16 @@ bool reload_acl_and_cache(THD *thd, ulong options, TABLE_LIST *tables, than it would help them) */ tmp_write_to_binlog= 0; - if( mysql_bin_log.is_open() ) - { + if (mysql_bin_log.is_open()) mysql_bin_log.rotate_and_purge(RP_FORCE_ROTATE); - } + } + if (options & REFRESH_RELAY_LOG) + { #ifdef HAVE_REPLICATION mysql_mutex_lock(&LOCK_active_mi); rotate_relay_log(active_mi); mysql_mutex_unlock(&LOCK_active_mi); #endif - - /* flush slow and general logs */ - logger.flush_logs(thd); - - if (ha_flush_logs(NULL)) - result=1; - if (flush_error_log()) - result=1; } #ifdef HAVE_QUERY_CACHE if (options & REFRESH_QUERY_CACHE_FREE) diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 51a8ba12358..00cc28e6213 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -1963,7 +1963,7 @@ int log_loaded_block(IO_CACHE* file) uchar* buffer= (uchar*) my_b_get_buffer_start(file); uint max_event_size= current_thd->variables.max_allowed_packet; lf_info= (LOAD_FILE_INFO*) file->arg; - if (lf_info->thd->current_stmt_binlog_row_based) + if (lf_info->thd->is_current_stmt_binlog_format_row()) DBUG_RETURN(0); if (lf_info->last_pos_in_file != HA_POS_ERROR && lf_info->last_pos_in_file >= my_b_get_pos_in_file(file)) diff --git a/sql/sql_select.cc b/sql/sql_select.cc index fd2e7ca911d..c412cf5124f 100644 --- a/sql/sql_select.cc +++ b/sql/sql_select.cc @@ -10631,6 +10631,7 @@ TABLE *create_virtual_tmp_table(THD *thd, List<Create_field> &field_list) share->blob_field= blob_field; share->fields= field_count; share->blob_ptr_size= portable_sizeof_char_ptr; + share->db_low_byte_first=1; // True for HEAP and MyISAM setup_tmp_table_column_bitmaps(table, bitmaps); /* Create all fields and calculate the total length of record */ @@ -10695,6 +10696,18 @@ TABLE *create_virtual_tmp_table(THD *thd, List<Create_field> &field_list) null_bit= 1; } } + if (cur_field->type() == MYSQL_TYPE_BIT && + cur_field->key_type() == HA_KEYTYPE_BIT) + { + /* This is a Field_bit since key_type is HA_KEYTYPE_BIT */ + static_cast<Field_bit*>(cur_field)->set_bit_ptr(null_pos, null_bit); + null_bit+= cur_field->field_length & 7; + if (null_bit > 7) + { + null_pos++; + null_bit-= 8; + } + } cur_field->reset(); field_pos+= cur_field->pack_length(); diff --git a/sql/sql_table.cc b/sql/sql_table.cc index e17bfe41a25..0111d43b5ff 100644 --- a/sql/sql_table.cc +++ b/sql/sql_table.cc @@ -1743,7 +1743,7 @@ end: */ int write_bin_log(THD *thd, bool clear_error, - char const *query, ulong query_length) + char const *query, ulong query_length, bool is_trans) { int error= 0; if (mysql_bin_log.is_open()) @@ -1754,7 +1754,8 @@ int write_bin_log(THD *thd, bool clear_error, else errcode= query_error_code(thd, TRUE); error= thd->binlog_query(THD::STMT_QUERY_TYPE, - query, query_length, FALSE, FALSE, errcode); + query, query_length, is_trans, FALSE, FALSE, + errcode); } return error; } @@ -1862,7 +1863,7 @@ int mysql_rm_table_part2(THD *thd, TABLE_LIST *tables, bool if_exists, LINT_INIT(alias); LINT_INIT(path_length); - if (thd->current_stmt_binlog_row_based && !dont_log_query) + if (thd->is_current_stmt_binlog_format_row() && !dont_log_query) { built_query.set_charset(system_charset_info); if (if_exists) @@ -1964,7 +1965,7 @@ int mysql_rm_table_part2(THD *thd, TABLE_LIST *tables, bool if_exists, // removed temporary table tmp_table_deleted= 1; if (thd->variables.binlog_format == BINLOG_FORMAT_MIXED && - thd->current_stmt_binlog_row_based) + thd->is_current_stmt_binlog_format_row()) { if (built_tmp_query.is_empty()) { @@ -2002,7 +2003,7 @@ int mysql_rm_table_part2(THD *thd, TABLE_LIST *tables, bool if_exists, being built. The string always end in a comma and the comma will be chopped off before being written to the binary log. */ - if (!drop_temporary && thd->current_stmt_binlog_row_based && !dont_log_query) + if (!drop_temporary && thd->is_current_stmt_binlog_format_row() && !dont_log_query) { /* Don't write the database name if it is the current one (or if @@ -2135,7 +2136,7 @@ int mysql_rm_table_part2(THD *thd, TABLE_LIST *tables, bool if_exists, query_cache_invalidate3(thd, tables, 0); if (!dont_log_query) { - if (!thd->current_stmt_binlog_row_based || + if (!thd->is_current_stmt_binlog_format_row() || (non_temp_tables_count > 0 && !tmp_table_deleted)) { /* @@ -2147,7 +2148,7 @@ int mysql_rm_table_part2(THD *thd, TABLE_LIST *tables, bool if_exists, */ error |= write_bin_log(thd, !error, thd->query(), thd->query_length()); } - else if (thd->current_stmt_binlog_row_based && + else if (thd->is_current_stmt_binlog_format_row() && tmp_table_deleted) { if (non_temp_tables_count > 0) @@ -2186,7 +2187,8 @@ int mysql_rm_table_part2(THD *thd, TABLE_LIST *tables, bool if_exists, */ built_tmp_query.chop(); // Chop of the last comma built_tmp_query.append(" /* generated by server */"); - error|= write_bin_log(thd, !error, built_tmp_query.ptr(), built_tmp_query.length()); + error|= write_bin_log(thd, !error, built_tmp_query.ptr(), built_tmp_query.length(), + thd->in_multi_stmt_transaction()); } } @@ -3674,8 +3676,8 @@ static inline int write_create_table_bin_log(THD *thd, Otherwise, the statement shall be binlogged. */ if (!internal_tmp_table && - (!thd->current_stmt_binlog_row_based || - (thd->current_stmt_binlog_row_based && + (!thd->is_current_stmt_binlog_format_row() || + (thd->is_current_stmt_binlog_format_row() && !(create_info->options & HA_LEX_CREATE_TMP_TABLE)))) return write_bin_log(thd, TRUE, thd->query(), thd->query_length()); return 0; @@ -3947,7 +3949,7 @@ bool mysql_create_table_no_lock(THD *thd, push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR), alias); - error= write_create_table_bin_log(thd, create_info, internal_tmp_table); + error= 0; goto err; } my_error(ER_TABLE_EXISTS_ERROR, MYF(0), alias); @@ -4096,7 +4098,7 @@ bool mysql_create_table_no_lock(THD *thd, thd->thread_specific_used= TRUE; } - error= write_create_table_bin_log(thd, create_info, internal_tmp_table); + error= FALSE; unlock_and_end: mysql_mutex_unlock(&LOCK_open); @@ -4111,7 +4113,6 @@ warn: ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR), alias); create_info->table_existed= 1; // Mark that table existed - error= write_create_table_bin_log(thd, create_info, internal_tmp_table); goto unlock_and_end; } @@ -4165,14 +4166,12 @@ bool mysql_create_table(THD *thd, TABLE_LIST *create_table, /* Don't write statement if: - Table creation has failed - - Table has already existed - Row-based logging is used and we are creating a temporary table Otherwise, the statement shall be binlogged. */ if (!result && - !create_info->table_existed && - (!thd->current_stmt_binlog_row_based || - (thd->current_stmt_binlog_row_based && + (!thd->is_current_stmt_binlog_format_row() || + (thd->is_current_stmt_binlog_format_row() && !(create_info->options & HA_LEX_CREATE_TMP_TABLE)))) result= write_bin_log(thd, TRUE, thd->query(), thd->query_length()); @@ -5247,7 +5246,7 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table, TABLE_LIST* src_table, /* We have to write the query before we unlock the tables. */ - if (thd->current_stmt_binlog_row_based) + if (thd->is_current_stmt_binlog_format_row()) { /* Since temporary tables are not replicated under row-based @@ -6443,15 +6442,15 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name, { thd->clear_error(); Query_log_event qinfo(thd, thd->query(), thd->query_length(), - 0, FALSE, 0); + FALSE, TRUE, FALSE, 0); if (error= mysql_bin_log.write(&qinfo)) goto view_err_unlock; } my_ok(thd); } - mysql_mutex_unlock(&LOCK_open); view_err_unlock: + mysql_mutex_unlock(&LOCK_open); unlock_table_names(thd); view_err: @@ -7232,7 +7231,7 @@ view_err: if (rename_temporary_table(thd, new_table, new_db, new_name)) goto err_new_table_cleanup; /* We don't replicate alter table statement on temporary tables */ - if (!thd->current_stmt_binlog_row_based && + if (!thd->is_current_stmt_binlog_format_row() && write_bin_log(thd, TRUE, thd->query(), thd->query_length())) DBUG_RETURN(TRUE); goto end_temporary; @@ -7396,7 +7395,7 @@ view_err: db, table_name); DBUG_ASSERT(!(mysql_bin_log.is_open() && - thd->current_stmt_binlog_row_based && + thd->is_current_stmt_binlog_format_row() && (create_info->options & HA_LEX_CREATE_TMP_TABLE))); if (write_bin_log(thd, TRUE, thd->query(), thd->query_length())) DBUG_RETURN(TRUE); diff --git a/sql/sql_udf.cc b/sql/sql_udf.cc index 95dd653cd5b..462555606c8 100644 --- a/sql/sql_udf.cc +++ b/sql/sql_udf.cc @@ -457,8 +457,8 @@ int mysql_create_function(THD *thd,udf_func *udf) Turn off row binlogging of this statement and use statement-based so that all supporting tables are updated for CREATE FUNCTION command. */ - if (thd->current_stmt_binlog_row_based) - thd->clear_current_stmt_binlog_row_based(); + if (thd->is_current_stmt_binlog_format_row()) + thd->clear_current_stmt_binlog_format_row(); mysql_rwlock_wrlock(&THR_LOCK_udf); if ((my_hash_search(&udf_hash,(uchar*) udf->name.str, udf->name.length))) @@ -558,8 +558,8 @@ int mysql_drop_function(THD *thd,const LEX_STRING *udf_name) Turn off row binlogging of this statement and use statement-based so that all supporting tables are updated for DROP FUNCTION command. */ - if (thd->current_stmt_binlog_row_based) - thd->clear_current_stmt_binlog_row_based(); + if (thd->is_current_stmt_binlog_format_row()) + thd->clear_current_stmt_binlog_format_row(); mysql_rwlock_wrlock(&THR_LOCK_udf); if (!(udf=(udf_func*) my_hash_search(&udf_hash,(uchar*) udf_name->str, diff --git a/sql/sql_update.cc b/sql/sql_update.cc index eeb0dbdbb51..9d05400e160 100644 --- a/sql/sql_update.cc +++ b/sql/sql_update.cc @@ -793,6 +793,9 @@ int mysql_update(THD *thd, { query_cache_invalidate3(thd, table_list, 1); } + + if (thd->transaction.stmt.modified_non_trans_table) + thd->transaction.all.modified_non_trans_table= TRUE; /* error < 0 means really no error at all: we processed all rows until the @@ -815,13 +818,11 @@ int mysql_update(THD *thd, if (thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query(), thd->query_length(), - transactional_table, FALSE, errcode)) + transactional_table, FALSE, FALSE, errcode)) { error=1; // Rollback update } } - if (thd->transaction.stmt.modified_non_trans_table) - thd->transaction.all.modified_non_trans_table= TRUE; } DBUG_ASSERT(transactional_table || !updated || thd->transaction.stmt.modified_non_trans_table); free_underlaid_joins(thd, select_lex); @@ -885,19 +886,6 @@ bool mysql_prepare_update(THD *thd, TABLE_LIST *table_list, SELECT_LEX *select_lex= &thd->lex->select_lex; DBUG_ENTER("mysql_prepare_update"); - /* - Statement-based replication of UPDATE ... LIMIT is not safe as order of - rows is not defined, so in mixed mode we go to row-based. - - Note that we may consider a statement as safe if ORDER BY primary_key - is present. However it may confuse users to see very similiar statements - replicated differently. - */ - if (thd->lex->current_select->select_limit) - { - thd->lex->set_stmt_unsafe(); - thd->set_current_stmt_binlog_row_based_if_mixed(); - } #ifndef NO_EMBEDDED_ACCESS_CHECKS table_list->grant.want_privilege= table->grant.want_privilege= (SELECT_ACL & ~table->grant.privilege); @@ -1763,10 +1751,10 @@ bool multi_update::send_data(List<Item> ¬_used_values) /* non-transactional or transactional table got modified */ /* either multi_update class' flag is raised in its branch */ if (table->file->has_transactions()) - transactional_tables= 1; + transactional_tables= TRUE; else { - trans_safe= 0; + trans_safe= FALSE; thd->transaction.stmt.modified_non_trans_table= TRUE; } } @@ -1874,10 +1862,9 @@ void multi_update::abort() into repl event. */ int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED); - /* the error of binary logging is ignored */ (void)thd->binlog_query(THD::ROW_QUERY_TYPE, - thd->query(), thd->query_length(), - transactional_tables, FALSE, errcode); + thd->query(), thd->query_length(), + transactional_tables, FALSE, FALSE, errcode); } thd->transaction.all.modified_non_trans_table= TRUE; } @@ -2015,10 +2002,10 @@ int multi_update::do_updates() if (updated != org_updated) { if (table->file->has_transactions()) - transactional_tables= 1; + transactional_tables= TRUE; else { - trans_safe= 0; // Can't do safe rollback + trans_safe= FALSE; // Can't do safe rollback thd->transaction.stmt.modified_non_trans_table= TRUE; } } @@ -2047,10 +2034,10 @@ err2: if (updated != org_updated) { if (table->file->has_transactions()) - transactional_tables= 1; + transactional_tables= TRUE; else { - trans_safe= 0; + trans_safe= FALSE; thd->transaction.stmt.modified_non_trans_table= TRUE; } } @@ -2096,8 +2083,9 @@ bool multi_update::send_eof() either from the query's list or via a stored routine: bug#13270,23333 */ - DBUG_ASSERT(trans_safe || !updated || - thd->transaction.stmt.modified_non_trans_table); + if (thd->transaction.stmt.modified_non_trans_table) + thd->transaction.all.modified_non_trans_table= TRUE; + if (local_error == 0 || thd->transaction.stmt.modified_non_trans_table) { if (mysql_bin_log.is_open()) @@ -2109,14 +2097,15 @@ bool multi_update::send_eof() errcode= query_error_code(thd, killed_status == THD::NOT_KILLED); if (thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query(), thd->query_length(), - transactional_tables, FALSE, errcode)) + transactional_tables, FALSE, FALSE, errcode)) { local_error= 1; // Rollback update } } - if (thd->transaction.stmt.modified_non_trans_table) - thd->transaction.all.modified_non_trans_table= TRUE; } + DBUG_ASSERT(trans_safe || !updated || + thd->transaction.stmt.modified_non_trans_table); + if (local_error != 0) error_handled= TRUE; // to force early leave from ::send_error() diff --git a/sql/sql_view.cc b/sql/sql_view.cc index 56dcb6f8ced..931a7adb57f 100644 --- a/sql/sql_view.cc +++ b/sql/sql_view.cc @@ -660,7 +660,7 @@ bool mysql_create_view(THD *thd, TABLE_LIST *views, int errcode= query_error_code(thd, TRUE); if (thd->binlog_query(THD::STMT_QUERY_TYPE, - buff.ptr(), buff.length(), FALSE, FALSE, errcode)) + buff.ptr(), buff.length(), FALSE, FALSE, FALSE, errcode)) res= TRUE; } @@ -1323,8 +1323,8 @@ bool mysql_make_view(THD *thd, File_parser *parser, TABLE_LIST *table, If the view's body needs row-based binlogging (e.g. the VIEW is created from SELECT UUID()), the top statement also needs it. */ - if (lex->is_stmt_unsafe()) - old_lex->set_stmt_unsafe(); + old_lex->set_stmt_unsafe_flags(lex->get_stmt_unsafe_flags()); + view_is_mergeable= (table->algorithm != VIEW_ALGORITHM_TMPTABLE && lex->can_be_merged()); diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy index 2773a4ff0ed..285d5596b24 100644 --- a/sql/sql_yacc.yy +++ b/sql/sql_yacc.yy @@ -939,6 +939,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize); %token ENUM %token EQ /* OPERATOR */ %token EQUAL_SYM /* OPERATOR */ +%token ERROR_SYM %token ERRORS %token ESCAPED %token ESCAPE_SYM /* SQL-2003-R */ @@ -972,6 +973,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize); %token FULLTEXT_SYM %token FUNCTION_SYM /* SQL-2003-R */ %token GE +%token GENERAL %token GEOMETRYCOLLECTION %token GEOMETRY_SYM %token GET_FORMAT /* MYSQL-FUNC */ @@ -1191,6 +1193,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize); %token REDUNDANT_SYM %token REFERENCES /* SQL-2003-R */ %token REGEXP +%token RELAY %token RELAYLOG_SYM %token RELAY_LOG_FILE_SYM %token RELAY_LOG_POS_SYM @@ -1248,6 +1251,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize); %token SIGNED_SYM %token SIMPLE_SYM /* SQL-2003-N */ %token SLAVE +%token SLOW %token SMALLINT /* SQL-2003-R */ %token SNAPSHOT_SYM %token SOCKET_SYM @@ -7939,7 +7943,7 @@ function_call_keyword: $$= new (YYTHD->mem_root) Item_func_current_user(Lex->current_context()); if ($$ == NULL) MYSQL_YYABORT; - Lex->set_stmt_unsafe(); + Lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); Lex->safe_to_cache_query= 0; } | DATE_SYM '(' expr ')' @@ -8094,7 +8098,7 @@ function_call_keyword: $$= new (YYTHD->mem_root) Item_func_user(); if ($$ == NULL) MYSQL_YYABORT; - Lex->set_stmt_unsafe(); + Lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); Lex->safe_to_cache_query=0; } | YEAR_SYM '(' expr ')' @@ -8244,7 +8248,7 @@ function_call_nonkeyword: sysdate_is_now=1, because the slave may have sysdate_is_now=0. */ - Lex->set_stmt_unsafe(); + Lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); if (global_system_variables.sysdate_is_now == 0) $$= new (YYTHD->mem_root) Item_func_sysdate_local(); else @@ -8838,7 +8842,7 @@ variable_aux: if (!($$= get_system_var(YYTHD, $2, $3, $4))) MYSQL_YYABORT; if (!((Item_func_get_system_var*) $$)->is_written_to_binlog()) - Lex->set_stmt_unsafe(); + Lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_VARIABLE); } ; @@ -9781,7 +9785,10 @@ opt_limit_clause: ; limit_clause: - LIMIT limit_options {} + LIMIT limit_options + { + Lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_LIMIT); + } ; limit_options: @@ -9843,6 +9850,7 @@ delete_limit_clause: { SELECT_LEX *sel= Select; sel->select_limit= $2; + Lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_LIMIT); sel->explicit_limit= 1; } ; @@ -10293,13 +10301,21 @@ insert_lock_option: #endif } | LOW_PRIORITY { $$= TL_WRITE_LOW_PRIORITY; } - | DELAYED_SYM { $$= TL_WRITE_DELAYED; } + | DELAYED_SYM + { + $$= TL_WRITE_DELAYED; + Lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_DELAYED); + } | HIGH_PRIORITY { $$= TL_WRITE; } ; replace_lock_option: opt_low_priority { $$= $1; } - | DELAYED_SYM { $$= TL_WRITE_DELAYED; } + | DELAYED_SYM + { + $$= TL_WRITE_DELAYED; + Lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_DELAYED); + } ; insert2: @@ -11083,6 +11099,18 @@ flush_option: opt_table_list {} | TABLES WITH READ_SYM LOCK_SYM { Lex->type|= REFRESH_TABLES | REFRESH_READ_LOCK; } + | ERROR_SYM LOGS_SYM + { Lex->type|= REFRESH_ERROR_LOG; } + | ENGINE_SYM LOGS_SYM + { Lex->type|= REFRESH_ENGINE_LOG; } + | GENERAL LOGS_SYM + { Lex->type|= REFRESH_GENERAL_LOG; } + | SLOW LOGS_SYM + { Lex->type|= REFRESH_SLOW_LOG; } + | BINARY LOGS_SYM + { Lex->type|= REFRESH_BINARY_LOG; } + | RELAY LOGS_SYM + { Lex->type|= REFRESH_RELAY_LOG; } | QUERY_SYM CACHE_SYM { Lex->type|= REFRESH_QUERY_CACHE_FREE; } | HOSTS_SYM @@ -12232,6 +12260,7 @@ keyword_sp: | ENUM {} | ENGINE_SYM {} | ENGINES_SYM {} + | ERROR_SYM {} | ERRORS {} | ESCAPE_SYM {} | EVENT_SYM {} @@ -12357,6 +12386,7 @@ keyword_sp: | REDO_BUFFER_SIZE_SYM {} | REDOFILE_SYM {} | REDUNDANT_SYM {} + | RELAY {} | RELAYLOG_SYM {} | RELAY_LOG_FILE_SYM {} | RELAY_LOG_POS_SYM {} diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 80de0658143..e04b7be616f 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -241,12 +241,6 @@ static bool check_has_super(sys_var *self, THD *thd, set_var *var) } static bool binlog_format_check(sys_var *self, THD *thd, set_var *var) { - if (check_has_super(self, thd, var)) - return true; - if (var->type == OPT_GLOBAL || - (thd->variables.binlog_format == var->save_result.ulonglong_value)) - return false; - /* If RBR and open temporary tables, their CREATE TABLE may not be in the binlog, so we can't toggle to SBR in this connection. @@ -265,6 +259,21 @@ static bool binlog_format_check(sys_var *self, THD *thd, set_var *var) my_error(ER_STORED_FUNCTION_PREVENTS_SWITCH_BINLOG_FORMAT, MYF(0)); return true; } + /* + Make the session variable 'binlog_format' read-only inside a transaction. + */ + if (thd->active_transaction() && (var->type == OPT_SESSION)) + { + my_error(ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_BINLOG_FORMAT, MYF(0)); + return true; + } + + if (check_has_super(self, thd, var)) + return true; + if (var->type == OPT_GLOBAL || + (thd->variables.binlog_format == var->save_result.ulonglong_value)) + return false; + return false; } @@ -272,7 +281,7 @@ static bool fix_binlog_format_after_update(sys_var *self, THD *thd, enum_var_type type) { if (type == OPT_SESSION) - thd->reset_current_stmt_binlog_row_based(); + thd->reset_current_stmt_binlog_format_row(); return false; } @@ -1685,8 +1694,20 @@ static Sys_var_enum Slave_exec_mode( "between the master and the slave", GLOBAL_VAR(slave_exec_mode_options), CMD_LINE(REQUIRED_ARG), slave_exec_mode_names, DEFAULT(SLAVE_EXEC_MODE_STRICT)); +const char *slave_type_conversions_name[]= {"ALL_LOSSY", "ALL_NON_LOSSY", 0}; +static Sys_var_set Slave_type_conversions( + "slave_type_conversions", + "Set of slave type conversions that are enabled. Legal values are:" + " ALL_LOSSY to enable lossy conversions and" + " ALL_NON_LOSSY to enable non-lossy conversions." + " If the variable is assigned the empty set, no conversions are" + " allowed and it is expected that the types match exactly.", + GLOBAL_VAR(slave_type_conversions_options), CMD_LINE(REQUIRED_ARG), + slave_type_conversions_name, + DEFAULT(0)); #endif + static Sys_var_ulong Sys_slow_launch_time( "slow_launch_time", "If creating the thread takes longer than this value (in seconds), " |