diff options
Diffstat (limited to 'sql')
48 files changed, 3558 insertions, 1368 deletions
diff --git a/sql/event_db_repository.cc b/sql/event_db_repository.cc index 9f3863eb2b0..a6f249d286f 100644 --- a/sql/event_db_repository.cc +++ b/sql/event_db_repository.cc @@ -1052,8 +1052,8 @@ update_timing_fields_for_event(THD *thd, Turn off row binlogging of event timing updates. These are not used for RBR of events replicated to the slave. */ - if (thd->current_stmt_binlog_row_based) - thd->clear_current_stmt_binlog_row_based(); + if (thd->is_current_stmt_binlog_format_row()) + thd->clear_current_stmt_binlog_format_row(); DBUG_ASSERT(thd->security_ctx->master_access & SUPER_ACL); diff --git a/sql/events.cc b/sql/events.cc index f5f837930c0..690c73b9699 100644 --- a/sql/events.cc +++ b/sql/events.cc @@ -430,8 +430,8 @@ Events::create_event(THD *thd, Event_parse_data *parse_data, Turn off row binlogging of this statement and use statement-based so that all supporting tables are updated for CREATE EVENT command. */ - if (thd->current_stmt_binlog_row_based) - thd->clear_current_stmt_binlog_row_based(); + if (thd->is_current_stmt_binlog_format_row()) + thd->clear_current_stmt_binlog_format_row(); pthread_mutex_lock(&LOCK_event_metadata); @@ -563,8 +563,8 @@ Events::update_event(THD *thd, Event_parse_data *parse_data, Turn off row binlogging of this statement and use statement-based so that all supporting tables are updated for UPDATE EVENT command. */ - if (thd->current_stmt_binlog_row_based) - thd->clear_current_stmt_binlog_row_based(); + if (thd->is_current_stmt_binlog_format_row()) + thd->clear_current_stmt_binlog_format_row(); pthread_mutex_lock(&LOCK_event_metadata); @@ -660,8 +660,8 @@ Events::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name, bool if_exists) Turn off row binlogging of this statement and use statement-based so that all supporting tables are updated for DROP EVENT command. 
*/ - if (thd->current_stmt_binlog_row_based) - thd->clear_current_stmt_binlog_row_based(); + if (thd->is_current_stmt_binlog_format_row()) + thd->clear_current_stmt_binlog_format_row(); pthread_mutex_lock(&LOCK_event_metadata); /* On error conditions my_error() is called so no need to handle here */ diff --git a/sql/field.cc b/sql/field.cc index 0934bb04ccd..8e5fa682e9e 100644 --- a/sql/field.cc +++ b/sql/field.cc @@ -59,6 +59,8 @@ const char field_separator=','; #define ASSERT_COLUMN_MARKED_FOR_READ DBUG_ASSERT(!table || (!table->read_set || bitmap_is_set(table->read_set, field_index))) #define ASSERT_COLUMN_MARKED_FOR_WRITE DBUG_ASSERT(!table || (!table->write_set || bitmap_is_set(table->write_set, field_index))) +#define FLAGSTR(S,F) ((S) & (F) ? #F " " : "") + /* Rules for merging different types of fields in UNION @@ -997,6 +999,22 @@ test_if_important_data(CHARSET_INFO *cs, const char *str, const char *strend) /** + Function to compare two unsigned integers for their relative order. + Used below. In an anonymous namespace to not clash with definitions + in other files. + */ +namespace { + int compare(unsigned int a, unsigned int b) + { + if (a < b) + return -1; + if (b < a) + return 1; + return 0; +} +} + +/** Detect Item_result by given field type of UNION merge result. @param field_type given field type @@ -1395,22 +1413,48 @@ bool Field::send_binary(Protocol *protocol) /** Check to see if field size is compatible with destination. - This method is used in row-based replication to verify that the slave's - field size is less than or equal to the master's field size. The - encoded field metadata (from the master or source) is decoded and compared - to the size of this field (the slave or destination). + This method is used in row-based replication to verify that the + slave's field size is less than or equal to the master's field + size. 
The encoded field metadata (from the master or source) is + decoded and compared to the size of this field (the slave or + destination). + + @note + + The comparison is made so that if the source data (from the master) + is less than the target data (on the slave), -1 is returned in @c + <code>*order_var</code>. This implies that the source type is + contained in the target type and that a conversion is necessary + but is non-lossy. + + If the source data is strictly greater than the target data, 1 is + returned in <code>*order_var</code>. This implies that a conversion + is necessary, but that it is lossy and can result in truncation of + the value. + + If no conversion is required to fit the source type in the target + type, 0 is returned in <code>*order_var</code>. @param field_metadata Encoded size in field metadata + @param mflags Flags from the table map event for the table. + @param order_var Pointer to variable where the order + between the source field and this field + will be returned. - @retval 0 if this field's size is < the source field's size - @retval 1 if this field's size is >= the source field's size + @return @c true if this field's size is compatible with the + master's field size, @c false otherwise. 
*/ -int Field::compatible_field_size(uint field_metadata, - const Relay_log_info *rli_arg __attribute__((unused))) +bool Field::compatible_field_size(uint field_metadata, + Relay_log_info *rli_arg __attribute__((unused)), + uint16 mflags __attribute__((unused)), + int *order_var) { uint const source_size= pack_length_from_metadata(field_metadata); uint const destination_size= row_pack_length(); - return (source_size <= destination_size); + DBUG_PRINT("debug", ("real_type: %d, source_size: %u, destination_size: %u", + real_type(), source_size, destination_size)); + *order_var = compare(source_size, destination_size); + return true; } @@ -2893,33 +2937,16 @@ uint Field_new_decimal::pack_length_from_metadata(uint field_metadata) } -/** - Check to see if field size is compatible with destination. - - This method is used in row-based replication to verify that the slave's - field size is less than or equal to the master's field size. The - encoded field metadata (from the master or source) is decoded and compared - to the size of this field (the slave or destination). 
- - @param field_metadata Encoded size in field metadata - - @retval 0 if this field's size is < the source field's size - @retval 1 if this field's size is >= the source field's size -*/ -int Field_new_decimal::compatible_field_size(uint field_metadata, - const Relay_log_info * __attribute__((unused))) +bool Field_new_decimal::compatible_field_size(uint field_metadata, + Relay_log_info * __attribute__((unused)), + uint16 mflags __attribute__((unused)), + int *order_var) { - int compatible= 0; uint const source_precision= (field_metadata >> 8U) & 0x00ff; uint const source_decimal= field_metadata & 0x00ff; - uint const source_size= my_decimal_get_binary_size(source_precision, - source_decimal); - uint const destination_size= row_pack_length(); - compatible= (source_size <= destination_size); - if (compatible) - compatible= (source_precision <= precision) && - (source_decimal <= decimals()); - return (compatible); + int order= compare(source_precision, precision); + *order_var= order != 0 ? 
order : compare(source_decimal, dec); + return true; } @@ -6675,8 +6702,11 @@ check_field_for_37426(const void *param_arg) } #endif -int Field_string::compatible_field_size(uint field_metadata, - const Relay_log_info *rli_arg) +bool +Field_string::compatible_field_size(uint field_metadata, + Relay_log_info *rli_arg, + uint16 mflags __attribute__((unused)), + int *order_var) { #ifdef HAVE_REPLICATION const Check_field_param check_param = { this }; @@ -6684,7 +6714,7 @@ int Field_string::compatible_field_size(uint field_metadata, check_field_for_37426, &check_param)) return FALSE; // Not compatible field sizes #endif - return Field::compatible_field_size(field_metadata, rli_arg); + return Field::compatible_field_size(field_metadata, rli_arg, mflags, order_var); } @@ -6745,6 +6775,8 @@ uchar *Field_string::pack(uchar *to, const uchar *from, { uint length= min(field_length,max_length); uint local_char_length= max_length/field_charset->mbmaxlen; + DBUG_PRINT("debug", ("Packing field '%s' - length: %u ", field_name, length)); + if (length > local_char_length) local_char_length= my_charpos(field_charset, from, from+length, local_char_length); @@ -7407,6 +7439,7 @@ Field_blob::Field_blob(uchar *ptr_arg, uchar *null_ptr_arg, uchar null_bit_arg, cs), packlength(blob_pack_length) { + DBUG_ASSERT(blob_pack_length <= 4); // Only pack lengths 1-4 supported currently flags|= BLOB_FLAG; share->blob_fields++; /* TODO: why do not fill table->s->blob_field array here? 
*/ @@ -7817,8 +7850,10 @@ int Field_blob::key_cmp(const uchar *a,const uchar *b) */ int Field_blob::do_save_field_metadata(uchar *metadata_ptr) { + DBUG_ENTER("Field_blob::do_save_field_metadata"); *metadata_ptr= pack_length_no_ptr(); - return 1; + DBUG_PRINT("debug", ("metadata: %u (pack_length_no_ptr)", *metadata_ptr)); + DBUG_RETURN(1); } @@ -8626,6 +8661,9 @@ Field_bit::Field_bit(uchar *ptr_arg, uint32 len_arg, uchar *null_ptr_arg, bit_ptr(bit_ptr_arg), bit_ofs(bit_ofs_arg), bit_len(len_arg & 7), bytes_in_rec(len_arg / 8) { + DBUG_ENTER("Field_bit::Field_bit"); + DBUG_PRINT("enter", ("ptr_arg: %p, null_ptr_arg: %p, len_arg: %u, bit_len: %u, bytes_in_rec: %u", + ptr_arg, null_ptr_arg, len_arg, bit_len, bytes_in_rec)); flags|= UNSIGNED_FLAG; /* Ensure that Field::eq() can distinguish between two different bit fields. @@ -8633,6 +8671,7 @@ Field_bit::Field_bit(uchar *ptr_arg, uint32 len_arg, uchar *null_ptr_arg, */ if (!null_ptr_arg) null_bit= bit_ofs_arg; + DBUG_VOID_RETURN; } @@ -8917,9 +8956,17 @@ uint Field_bit::get_key_image(uchar *buff, uint length, imagetype type_arg) */ int Field_bit::do_save_field_metadata(uchar *metadata_ptr) { - *metadata_ptr= bit_len; - *(metadata_ptr + 1)= bytes_in_rec; - return 2; + DBUG_ENTER("Field_bit::do_save_field_metadata"); + DBUG_PRINT("debug", ("bit_len: %d, bytes_in_rec: %d", + bit_len, bytes_in_rec)); + /* + Since this class and Field_bit_as_char have different ideas of + what should be stored here, we compute the values of the metadata + explicitly using the field_length. + */ + metadata_ptr[0]= field_length % 8; + metadata_ptr[1]= field_length / 8; + DBUG_RETURN(2); } @@ -8944,34 +8991,34 @@ uint Field_bit::pack_length_from_metadata(uint field_metadata) } -/** - Check to see if field size is compatible with destination. - - This method is used in row-based replication to verify that the slave's - field size is less than or equal to the master's field size. 
The - encoded field metadata (from the master or source) is decoded and compared - to the size of this field (the slave or destination). +bool +Field_bit::compatible_field_size(uint field_metadata, + Relay_log_info * __attribute__((unused)), + uint16 mflags, + int *order_var) +{ + DBUG_ENTER("Field_bit::compatible_field_size"); + DBUG_ASSERT((field_metadata >> 16) == 0); + uint from_bit_len= + 8 * (field_metadata >> 8) + (field_metadata & 0xff); + uint to_bit_len= max_display_length(); + DBUG_PRINT("debug", ("from_bit_len: %u, to_bit_len: %u", + from_bit_len, to_bit_len)); + /* + If the bit length exact flag is clear, we are dealing with an old + master, so we allow some less strict behaviour if replicating by + moving both bit lengths to an even multiple of 8. - @param field_metadata Encoded size in field metadata + We do this by computing the number of bytes to store the field + instead, and then compare the result. + */ + if (!(mflags & Table_map_log_event::TM_BIT_LEN_EXACT_F)) { + from_bit_len= (from_bit_len + 7) / 8; + to_bit_len= (to_bit_len + 7) / 8; + } - @retval 0 if this field's size is < the source field's size - @retval 1 if this field's size is >= the source field's size -*/ -int Field_bit::compatible_field_size(uint field_metadata, - const Relay_log_info * __attribute__((unused))) -{ - int compatible= 0; - uint const source_size= pack_length_from_metadata(field_metadata); - uint const destination_size= row_pack_length(); - uint const from_bit_len= field_metadata & 0x00ff; - uint const from_len= (field_metadata >> 8U) & 0x00ff; - if ((bit_len == 0) || (from_bit_len == 0)) - compatible= (source_size <= destination_size); - else if (from_bit_len > bit_len) - compatible= (from_len < bytes_in_rec); - else - compatible= ((from_bit_len <= bit_len) && (from_len <= bytes_in_rec)); - return (compatible); + *order_var= compare(from_bit_len, to_bit_len); + DBUG_RETURN(TRUE); } @@ -9037,8 +9084,15 @@ const uchar * Field_bit::unpack(uchar *to, const uchar *from, 
uint param_data, bool low_byte_first __attribute__((unused))) { + DBUG_ENTER("Field_bit::unpack"); + DBUG_PRINT("enter", ("to: %p, from: %p, param_data: 0x%x", + to, from, param_data)); + DBUG_PRINT("debug", ("bit_ptr: %p, bit_len: %u, bit_ofs: %u", + bit_ptr, bit_len, bit_ofs)); uint const from_len= (param_data >> 8U) & 0x00ff; uint const from_bit_len= param_data & 0x00ff; + DBUG_PRINT("debug", ("from_len: %u, from_bit_len: %u", + from_len, from_bit_len)); /* If the parameter data is zero (i.e., undefined), or if the master and slave have the same sizes, then use the old unpack() method. @@ -9059,7 +9113,7 @@ Field_bit::unpack(uchar *to, const uchar *from, uint param_data, from++; } memcpy(to, from, bytes_in_rec); - return from + bytes_in_rec; + DBUG_RETURN(from + bytes_in_rec); } /* @@ -9085,7 +9139,7 @@ Field_bit::unpack(uchar *to, const uchar *from, uint param_data, bitmap_set_bit(table->write_set,field_index); store(value, new_len, system_charset_info); my_afree(value); - return from + len; + DBUG_RETURN(from + len); } @@ -9213,8 +9267,11 @@ void Create_field::create_length_to_internal_length(void) */ void Create_field::init_for_tmp_table(enum_field_types sql_type_arg, uint32 length_arg, uint32 decimals_arg, - bool maybe_null, bool is_unsigned) + bool maybe_null, bool is_unsigned, + uint pack_length) { + DBUG_ENTER("Create_field::init_for_tmp_table"); + field_name= ""; sql_type= sql_type_arg; char_length= length= length_arg;; @@ -9222,10 +9279,92 @@ void Create_field::init_for_tmp_table(enum_field_types sql_type_arg, interval= 0; charset= &my_charset_bin; geom_type= Field::GEOM_GEOMETRY; - pack_flag= (FIELDFLAG_NUMBER | - ((decimals_arg & FIELDFLAG_MAX_DEC) << FIELDFLAG_DEC_SHIFT) | - (maybe_null ? FIELDFLAG_MAYBE_NULL : 0) | - (is_unsigned ? 
0 : FIELDFLAG_DECIMAL)); + + DBUG_PRINT("enter", ("sql_type: %d, length: %u, pack_length: %u", + sql_type_arg, length_arg, pack_length)); + + /* + These pack flags are crafted to get it correctly through the + branches of make_field(). + */ + switch (sql_type_arg) + { + case MYSQL_TYPE_VARCHAR: + case MYSQL_TYPE_VAR_STRING: + case MYSQL_TYPE_STRING: + case MYSQL_TYPE_SET: + pack_flag= 0; + break; + + case MYSQL_TYPE_GEOMETRY: + pack_flag= FIELDFLAG_GEOM; + break; + + case MYSQL_TYPE_ENUM: + pack_flag= FIELDFLAG_INTERVAL; + break; + + case MYSQL_TYPE_DECIMAL: + case MYSQL_TYPE_NEWDECIMAL: + case MYSQL_TYPE_FLOAT: + case MYSQL_TYPE_DOUBLE: + pack_flag= FIELDFLAG_DECIMAL | FIELDFLAG_NUMBER | + (decimals_arg & FIELDFLAG_MAX_DEC) << FIELDFLAG_DEC_SHIFT; + break; + + case MYSQL_TYPE_TINY_BLOB: + case MYSQL_TYPE_MEDIUM_BLOB: + case MYSQL_TYPE_LONG_BLOB: + case MYSQL_TYPE_BLOB: + pack_flag= FIELDFLAG_BLOB; + break; + + case MYSQL_TYPE_BIT: + pack_flag= FIELDFLAG_NUMBER | FIELDFLAG_TREAT_BIT_AS_CHAR; + break; + + default: + pack_flag= FIELDFLAG_NUMBER; + break; + } + + /* + Set the pack flag correctly for the blob-like types. This sets the + packtype to something that make_field can use. If the pack type is + not set correctly, the packlength will be reeeeally wierd (like + 129 or so). + */ + switch (sql_type_arg) + { + case MYSQL_TYPE_ENUM: + case MYSQL_TYPE_SET: + case MYSQL_TYPE_TINY_BLOB: + case MYSQL_TYPE_MEDIUM_BLOB: + case MYSQL_TYPE_LONG_BLOB: + case MYSQL_TYPE_BLOB: + case MYSQL_TYPE_GEOMETRY: + // If you are going to use the above types, you have to pass a + // pack_length as parameter. Assert that is really done. + DBUG_ASSERT(pack_length != ~0U); + pack_flag|= pack_length_to_packflag(pack_length); + break; + default: + /* Nothing */ + break; + } + + pack_flag|= + (maybe_null ? FIELDFLAG_MAYBE_NULL : 0) | + (is_unsigned ? 
0 : FIELDFLAG_DECIMAL); + + DBUG_PRINT("debug", ("pack_flag: %s%s%s%s%s, pack_type: %d", + FLAGSTR(pack_flag, FIELDFLAG_BINARY), + FLAGSTR(pack_flag, FIELDFLAG_NUMBER), + FLAGSTR(pack_flag, FIELDFLAG_INTERVAL), + FLAGSTR(pack_flag, FIELDFLAG_GEOM), + FLAGSTR(pack_flag, FIELDFLAG_BLOB), + f_packtype(pack_flag))); + DBUG_VOID_RETURN; } @@ -9731,6 +9870,14 @@ Field *make_field(TABLE_SHARE *share, uchar *ptr, uint32 field_length, default: break; } + DBUG_PRINT("debug", ("field_type: %d, field_length: %u, interval: %p, pack_flag: %s%s%s%s%s", + field_type, field_length, interval, + FLAGSTR(pack_flag, FIELDFLAG_BINARY), + FLAGSTR(pack_flag, FIELDFLAG_INTERVAL), + FLAGSTR(pack_flag, FIELDFLAG_NUMBER), + FLAGSTR(pack_flag, FIELDFLAG_PACK), + FLAGSTR(pack_flag, FIELDFLAG_BLOB))); + if (f_is_alpha(pack_flag)) { if (!f_is_packed(pack_flag)) diff --git a/sql/field.h b/sql/field.h index bb3636c654e..e1f7b8c6a29 100644 --- a/sql/field.h +++ b/sql/field.h @@ -166,22 +166,13 @@ public: table, which is located on disk). */ virtual uint32 pack_length_in_rec() const { return pack_length(); } - virtual int compatible_field_size(uint field_metadata, - const Relay_log_info *); + virtual bool compatible_field_size(uint metadata, Relay_log_info *rli, + uint16 mflags, int *order); virtual uint pack_length_from_metadata(uint field_metadata) - { return field_metadata; } - /* - This method is used to return the size of the data in a row-based - replication row record. The default implementation of returning 0 is - designed to allow fields that do not use metadata to return TRUE (1) - from compatible_field_size() which uses this function in the comparison. - The default value for field metadata for fields that do not have - metadata is 0. Thus, 0 == 0 means the fields are compatible in size. 
- - Note: While most classes that override this method return pack_length(), - the classes Field_string, Field_varstring, and Field_blob return - field_length + 1, field_length, and pack_length_no_ptr() respectfully. - */ + { + DBUG_ENTER("Field::pack_length_from_metadata"); + DBUG_RETURN(field_metadata); + } virtual uint row_pack_length() { return 0; } virtual int save_field_metadata(uchar *first_byte) { return do_save_field_metadata(first_byte); } @@ -619,6 +610,13 @@ public: int store_decimal(const my_decimal *); my_decimal *val_decimal(my_decimal *); uint is_equal(Create_field *new_field); + uint row_pack_length() { return pack_length(); } + uint32 pack_length_from_metadata(uint field_metadata) { + uint32 length= pack_length(); + DBUG_PRINT("result", ("pack_length_from_metadata(%d): %u", + field_metadata, length)); + return length; + } int check_int(CHARSET_INFO *cs, const char *str, int length, const char *int_end, int error); bool get_int(CHARSET_INFO *cs, const char *from, uint len, @@ -783,8 +781,8 @@ public: uint32 pack_length() const { return (uint32) bin_size; } uint pack_length_from_metadata(uint field_metadata); uint row_pack_length() { return pack_length(); } - int compatible_field_size(uint field_metadata, - const Relay_log_info *rli); + bool compatible_field_size(uint field_metadata, Relay_log_info *rli, + uint16 mflags, int *order_var); uint is_equal(Create_field *new_field); virtual const uchar *unpack(uchar* to, const uchar *from, uint param_data, bool low_byte_first); @@ -1479,9 +1477,12 @@ public: return row_pack_length(); return (((field_metadata >> 4) & 0x300) ^ 0x300) + (field_metadata & 0x00ff); } - int compatible_field_size(uint field_metadata, - const Relay_log_info *rli); - uint row_pack_length() { return (field_length + 1); } + bool compatible_field_size(uint field_metadata, Relay_log_info *rli, + uint16 mflags, int *order_var); + uint row_pack_length() { return field_length; } + int pack_cmp(const uchar *a,const uchar *b,uint 
key_length, + my_bool insert_or_update); + int pack_cmp(const uchar *b,uint key_length,my_bool insert_or_update); uint packed_col_length(const uchar *to, uint length); uint max_packed_col_length(uint max_length); uint size_of() const { return sizeof(*this); } @@ -1926,8 +1927,8 @@ public: uint pack_length_from_metadata(uint field_metadata); uint row_pack_length() { return (bytes_in_rec + ((bit_len > 0) ? 1 : 0)); } - int compatible_field_size(uint field_metadata, - const Relay_log_info *rli); + bool compatible_field_size(uint metadata, Relay_log_info *rli, + uint16 mflags, int *order_var); void sql_type(String &str) const; virtual uchar *pack(uchar *to, const uchar *from, uint max_length, bool low_byte_first); @@ -2030,7 +2031,8 @@ public: /* Init for a tmp table field. To be extended if need be. */ void init_for_tmp_table(enum_field_types sql_type_arg, uint32 max_length, uint32 decimals, - bool maybe_null, bool is_unsigned); + bool maybe_null, bool is_unsigned, + uint pack_length = ~0U); bool init(THD *thd, char *field_name, enum_field_types type, char *length, char *decimals, uint type_modifier, Item *default_value, diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc index bdbb57224b0..db6c628c195 100644 --- a/sql/ha_ndbcluster_binlog.cc +++ b/sql/ha_ndbcluster_binlog.cc @@ -302,6 +302,7 @@ static void run_query(THD *thd, char *buf, char *end, thd->transaction.all= save_thd_transaction_all; thd->transaction.stmt= save_thd_transaction_stmt; thd->net= save_thd_net; + thd->set_current_stmt_binlog_format_row(); if (thd == injector_thd) { @@ -1870,7 +1871,7 @@ static void ndb_binlog_query(THD *thd, Cluster_schema *schema) thd->db= schema->db; int errcode = query_error_code(thd, thd->killed == THD::NOT_KILLED); thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query, - schema->query_length, FALSE, + schema->query_length, FALSE, TRUE, schema->name[0] == 0 || thd->db[0] == 0, errcode); thd->server_id= thd_server_id_save; @@ -3647,6 +3648,7 @@ 
pthread_handler_t ndb_binlog_thread_func(void *arg) thd= new THD; /* note that contructor of THD uses DBUG_ */ THD_CHECK_SENTRY(thd); + thd->set_current_stmt_binlog_format_row(); /* We need to set thd->thread_id before thd->store_globals, or it will set an invalid value for thd->variables.pseudo_thread_id. diff --git a/sql/ha_partition.cc b/sql/ha_partition.cc index 2ec92173d14..eb38be96113 100644 --- a/sql/ha_partition.cc +++ b/sql/ha_partition.cc @@ -6482,7 +6482,7 @@ void ha_partition::get_auto_increment(ulonglong offset, ulonglong increment, if (!auto_increment_safe_stmt_log_lock && thd->lex->sql_command != SQLCOM_INSERT && mysql_bin_log.is_open() && - !thd->current_stmt_binlog_row_based && + !thd->is_current_stmt_binlog_format_row() && (thd->options & OPTION_BIN_LOG)) { DBUG_PRINT("info", ("locking auto_increment_safe_stmt_log_lock")); diff --git a/sql/handler.cc b/sql/handler.cc index dd8eb93099d..48cf591abc1 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -2487,7 +2487,7 @@ int handler::update_auto_increment() variables->auto_increment_increment); auto_inc_intervals_count++; /* Row-based replication does not need to store intervals in binlog */ - if (mysql_bin_log.is_open() && !thd->current_stmt_binlog_row_based) + if (mysql_bin_log.is_open() && !thd->is_current_stmt_binlog_format_row()) thd->auto_inc_intervals_in_cur_stmt_for_binlog.append(auto_inc_interval_for_cur_row.minimum(), auto_inc_interval_for_cur_row.values(), variables->auto_increment_increment); @@ -4487,7 +4487,7 @@ static bool check_table_binlog_row_based(THD *thd, TABLE *table) DBUG_ASSERT(table->s->cached_row_logging_check == 0 || table->s->cached_row_logging_check == 1); - return (thd->current_stmt_binlog_row_based && + return (thd->is_current_stmt_binlog_format_row() && table->s->cached_row_logging_check && (thd->options & OPTION_BIN_LOG) && mysql_bin_log.is_open()); @@ -4549,7 +4549,21 @@ static int write_locked_table_maps(THD *thd) if (table->current_lock == F_WRLCK && 
check_table_binlog_row_based(thd, table)) { - int const has_trans= table->file->has_transactions(); + /* + We need to have a transactional behavior for SQLCOM_CREATE_TABLE + (e.g. CREATE TABLE... SELECT * FROM TABLE) in order to keep a + compatible behavior with the STMT based replication even when + the table is not transactional. In other words, if the operation + fails while executing the insert phase nothing is written to the + binlog. + + Note that at this point, we check the type of a set of tables to + create the table map events. In the function binlog_log_row(), + which calls the current function, we check the type of the table + of the current row. + */ + bool const has_trans= thd->lex->sql_command == SQLCOM_CREATE_TABLE || + table->file->has_transactions(); int const error= thd->binlog_write_table_map(table, has_trans); /* If an error occurs, it is the responsibility of the caller to @@ -4598,10 +4612,20 @@ static int binlog_log_row(TABLE* table, { bitmap_set_all(&cols); if (likely(!(error= write_locked_table_maps(thd)))) - error= (*log_func)(thd, table, table->file->has_transactions(), - &cols, table->s->fields, + { + /* + We need to have a transactional behavior for SQLCOM_CREATE_TABLE + (i.e. CREATE TABLE... SELECT * FROM TABLE) in order to keep a + compatible behavior with the STMT based replication even when + the table is not transactional. In other words, if the operation + fails while executing the insert phase nothing is written to the + binlog. 
+ */ + bool const has_trans= thd->lex->sql_command == SQLCOM_CREATE_TABLE || + table->file->has_transactions(); + error= (*log_func)(thd, table, has_trans, &cols, table->s->fields, before_record, after_record); - + } if (!use_bitbuf) bitmap_free(&cols); } diff --git a/sql/item_create.cc b/sql/item_create.cc index c00b5ec1701..7d36481a23b 100644 --- a/sql/item_create.cc +++ b/sql/item_create.cc @@ -2387,10 +2387,11 @@ Create_udf_func::create(THD *thd, udf_func *udf, List<Item> *item_list) Item *func= NULL; int arg_count= 0; + DBUG_ENTER("Create_udf_func::create"); if (item_list != NULL) arg_count= item_list->elements; - thd->lex->set_stmt_unsafe(); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_UDF); DBUG_ASSERT( (udf->type == UDFTYPE_FUNCTION) || (udf->type == UDFTYPE_AGGREGATE)); @@ -2474,7 +2475,7 @@ Create_udf_func::create(THD *thd, udf_func *udf, List<Item> *item_list) } } thd->lex->safe_to_cache_query= 0; - return func; + DBUG_RETURN(func); } #endif @@ -3400,9 +3401,10 @@ Create_func_found_rows Create_func_found_rows::s_singleton; Item* Create_func_found_rows::create(THD *thd) { - thd->lex->set_stmt_unsafe(); + DBUG_ENTER("Create_func_found_rows::create"); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); thd->lex->safe_to_cache_query= 0; - return new (thd->mem_root) Item_func_found_rows(); + DBUG_RETURN(new (thd->mem_root) Item_func_found_rows()); } @@ -3561,7 +3563,7 @@ Create_func_get_lock Create_func_get_lock::s_singleton; Item* Create_func_get_lock::create(THD *thd, Item *arg1, Item *arg2) { - thd->lex->set_stmt_unsafe(); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); thd->lex->uncacheable(UNCACHEABLE_SIDEEFFECT); return new (thd->mem_root) Item_func_get_lock(arg1, arg2); } @@ -3673,7 +3675,7 @@ Create_func_is_free_lock Create_func_is_free_lock::s_singleton; Item* Create_func_is_free_lock::create(THD *thd, Item *arg1) { - thd->lex->set_stmt_unsafe(); + 
thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); thd->lex->uncacheable(UNCACHEABLE_SIDEEFFECT); return new (thd->mem_root) Item_func_is_free_lock(arg1); } @@ -3684,7 +3686,7 @@ Create_func_is_used_lock Create_func_is_used_lock::s_singleton; Item* Create_func_is_used_lock::create(THD *thd, Item *arg1) { - thd->lex->set_stmt_unsafe(); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); thd->lex->uncacheable(UNCACHEABLE_SIDEEFFECT); return new (thd->mem_root) Item_func_is_used_lock(arg1); } @@ -3831,9 +3833,10 @@ Create_func_load_file Create_func_load_file::s_singleton; Item* Create_func_load_file::create(THD *thd, Item *arg1) { - thd->lex->set_stmt_unsafe(); + DBUG_ENTER("Create_func_load_file::create"); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); thd->lex->uncacheable(UNCACHEABLE_SIDEEFFECT); - return new (thd->mem_root) Item_load_file(arg1); + DBUG_RETURN(new (thd->mem_root) Item_load_file(arg1)); } @@ -4001,7 +4004,7 @@ Create_func_master_pos_wait::create_native(THD *thd, LEX_STRING name, Item *func= NULL; int arg_count= 0; - thd->lex->set_stmt_unsafe(); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); if (item_list != NULL) arg_count= item_list->elements; @@ -4245,7 +4248,7 @@ Create_func_release_lock Create_func_release_lock::s_singleton; Item* Create_func_release_lock::create(THD *thd, Item *arg1) { - thd->lex->set_stmt_unsafe(); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); thd->lex->uncacheable(UNCACHEABLE_SIDEEFFECT); return new (thd->mem_root) Item_func_release_lock(arg1); } @@ -4303,9 +4306,10 @@ Create_func_row_count Create_func_row_count::s_singleton; Item* Create_func_row_count::create(THD *thd) { - thd->lex->set_stmt_unsafe(); + DBUG_ENTER("Create_func_row_count::create"); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); thd->lex->safe_to_cache_query= 0; - return new (thd->mem_root) Item_func_row_count(); + DBUG_RETURN(new 
(thd->mem_root) Item_func_row_count()); } @@ -4368,7 +4372,7 @@ Create_func_sleep Create_func_sleep::s_singleton; Item* Create_func_sleep::create(THD *thd, Item *arg1) { - thd->lex->set_stmt_unsafe(); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); thd->lex->uncacheable(UNCACHEABLE_SIDEEFFECT); return new (thd->mem_root) Item_func_sleep(arg1); } @@ -4622,9 +4626,10 @@ Create_func_uuid Create_func_uuid::s_singleton; Item* Create_func_uuid::create(THD *thd) { - thd->lex->set_stmt_unsafe(); + DBUG_ENTER("Create_func_uuid::create"); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); thd->lex->safe_to_cache_query= 0; - return new (thd->mem_root) Item_func_uuid(); + DBUG_RETURN(new (thd->mem_root) Item_func_uuid()); } @@ -4633,9 +4638,10 @@ Create_func_uuid_short Create_func_uuid_short::s_singleton; Item* Create_func_uuid_short::create(THD *thd) { - thd->lex->set_stmt_unsafe(); + DBUG_ENTER("Create_func_uuid_short::create"); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); thd->lex->safe_to_cache_query= 0; - return new (thd->mem_root) Item_func_uuid_short(); + DBUG_RETURN(new (thd->mem_root) Item_func_uuid_short()); } @@ -4644,7 +4650,7 @@ Create_func_version Create_func_version::s_singleton; Item* Create_func_version::create(THD *thd) { - thd->lex->set_stmt_unsafe(); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); return new (thd->mem_root) Item_static_string_func("version()", server_version, (uint) strlen(server_version), diff --git a/sql/lex.h b/sql/lex.h index a12cf0c4b3e..7961339c4f3 100644 --- a/sql/lex.h +++ b/sql/lex.h @@ -196,6 +196,7 @@ static SYMBOL symbols[] = { { "ENGINE", SYM(ENGINE_SYM)}, { "ENGINES", SYM(ENGINES_SYM)}, { "ENUM", SYM(ENUM)}, + { "ERROR", SYM(ERROR_SYM)}, { "ERRORS", SYM(ERRORS)}, { "ESCAPE", SYM(ESCAPE_SYM)}, { "ESCAPED", SYM(ESCAPED)}, @@ -230,6 +231,7 @@ static SYMBOL symbols[] = { { "FULL", SYM(FULL)}, { "FULLTEXT", SYM(FULLTEXT_SYM)}, { "FUNCTION", 
SYM(FUNCTION_SYM)}, + { "GENERAL", SYM(GENERAL)}, { "GEOMETRY", SYM(GEOMETRY_SYM)}, { "GEOMETRYCOLLECTION",SYM(GEOMETRYCOLLECTION)}, { "GET_FORMAT", SYM(GET_FORMAT)}, @@ -438,6 +440,7 @@ static SYMBOL symbols[] = { { "REDUNDANT", SYM(REDUNDANT_SYM)}, { "REFERENCES", SYM(REFERENCES)}, { "REGEXP", SYM(REGEXP)}, + { "RELAY", SYM(RELAY)}, { "RELAYLOG", SYM(RELAYLOG_SYM)}, { "RELAY_LOG_FILE", SYM(RELAY_LOG_FILE_SYM)}, { "RELAY_LOG_POS", SYM(RELAY_LOG_POS_SYM)}, @@ -493,6 +496,7 @@ static SYMBOL symbols[] = { { "SIGNED", SYM(SIGNED_SYM)}, { "SIMPLE", SYM(SIMPLE_SYM)}, { "SLAVE", SYM(SLAVE)}, + { "SLOW", SYM(SLOW)}, { "SNAPSHOT", SYM(SNAPSHOT_SYM)}, { "SMALLINT", SYM(SMALLINT)}, { "SOCKET", SYM(SOCKET_SYM)}, diff --git a/sql/log.cc b/sql/log.cc index fb990d60186..15941cee376 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -189,115 +189,155 @@ private: }; /* - Helper class to store binary log transaction data. + Helper classes to store non-transactional and transactional data + before copying it to the binary log. */ -class binlog_trx_data { +class binlog_cache_data +{ public: - binlog_trx_data() - : at_least_one_stmt_committed(0), incident(FALSE), m_pending(0), - before_stmt_pos(MY_OFF_T_UNDEF) + binlog_cache_data(): m_pending(0), before_stmt_pos (MY_OFF_T_UNDEF), + incident(FALSE) { - trans_log.end_of_file= max_binlog_cache_size; + cache_log.end_of_file= max_binlog_cache_size; } - ~binlog_trx_data() + ~binlog_cache_data() { - DBUG_ASSERT(pending() == NULL); - close_cached_file(&trans_log); + DBUG_ASSERT(empty()); + close_cached_file(&cache_log); } - my_off_t position() const { - return my_b_tell(&trans_log); + bool empty() const + { + return pending() == NULL && my_b_tell(&cache_log) == 0; } - bool empty() const + Rows_log_event *pending() const { - return pending() == NULL && my_b_tell(&trans_log) == 0; + return m_pending; } - /* - Truncate the transaction cache to a certain position. This - includes deleting the pending event. 
- */ - void truncate(my_off_t pos) + void set_pending(Rows_log_event *const pending) { - DBUG_PRINT("info", ("truncating to position %lu", (ulong) pos)); - DBUG_PRINT("info", ("before_stmt_pos=%lu", (ulong) pos)); - if (pending()) - { - delete pending(); - } - set_pending(0); - reinit_io_cache(&trans_log, WRITE_CACHE, pos, 0, 0); - trans_log.end_of_file= max_binlog_cache_size; - if (pos < before_stmt_pos) - before_stmt_pos= MY_OFF_T_UNDEF; + m_pending= pending; + } - /* - The only valid positions that can be truncated to are at the - beginning of a statement. We are relying on this fact to be able - to set the at_least_one_stmt_committed flag correctly. In other word, if - we are truncating to the beginning of the transaction cache, - there will be no statements in the cache, otherwhise, we will - have at least one statement in the transaction cache. - */ - at_least_one_stmt_committed= (pos > 0); + void set_incident(void) + { + incident= TRUE; + } + + bool has_incident(void) + { + return(incident); } - /* - Reset the entire contents of the transaction cache, emptying it - completely. 
- */ - void reset() { - if (!empty()) - truncate(0); - before_stmt_pos= MY_OFF_T_UNDEF; + void reset() + { + truncate(0); incident= FALSE; - trans_log.end_of_file= max_binlog_cache_size; + before_stmt_pos= MY_OFF_T_UNDEF; + cache_log.end_of_file= max_binlog_cache_size; DBUG_ASSERT(empty()); } - Rows_log_event *pending() const + my_off_t get_byte_position() const { - return m_pending; + return my_b_tell(&cache_log); } - void set_pending(Rows_log_event *const pending) + my_off_t get_prev_position() { - m_pending= pending; + return(before_stmt_pos); } - IO_CACHE trans_log; // The transaction cache - - void set_incident(void) + void set_prev_position(my_off_t pos) { - incident= TRUE; + before_stmt_pos= pos; } - bool has_incident(void) + void restore_prev_position() { - return(incident); + truncate(before_stmt_pos); + } + + void restore_savepoint(my_off_t pos) + { + truncate(pos); + if (pos < before_stmt_pos) + before_stmt_pos= MY_OFF_T_UNDEF; } - /** - Boolean that is true if there is at least one statement in the - transaction cache. + /* + Cache to store data before copying it to the binary log. */ - bool at_least_one_stmt_committed; - bool incident; + IO_CACHE cache_log; private: /* - Pending binrows event. This event is the event where the rows are - currently written. + Pending binrows event. This event is the event where the rows are currently + written. */ Rows_log_event *m_pending; -public: /* Binlog position before the start of the current statement. */ my_off_t before_stmt_pos; + + /* + This indicates that some events did not get into the cache and most likely + it is corrupted. + */ + bool incident; + + /* + It truncates the cache to a certain position. This includes deleting the + pending event. 
+ */ + void truncate(my_off_t pos) + { + DBUG_PRINT("info", ("truncating to position %lu", (ulong) pos)); + if (pending()) + { + delete pending(); + set_pending(0); + } + reinit_io_cache(&cache_log, WRITE_CACHE, pos, 0, 0); + cache_log.end_of_file= max_binlog_cache_size; + } + + binlog_cache_data& operator=(const binlog_cache_data& info); + binlog_cache_data(const binlog_cache_data& info); +}; + +class binlog_cache_mngr { +public: + binlog_cache_mngr() {} + + void reset_cache(binlog_cache_data* cache_data) + { + cache_data->reset(); + } + + binlog_cache_data* get_binlog_cache_data(bool is_transactional) + { + return (is_transactional ? &trx_cache : &stmt_cache); + } + + IO_CACHE* get_binlog_cache_log(bool is_transactional) + { + return (is_transactional ? &trx_cache.cache_log : &stmt_cache.cache_log); + } + + binlog_cache_data stmt_cache; + + binlog_cache_data trx_cache; + +private: + + binlog_cache_mngr& operator=(const binlog_cache_mngr& info); + binlog_cache_mngr(const binlog_cache_mngr& info); }; handlerton *binlog_hton; @@ -977,6 +1017,54 @@ bool LOGGER::flush_logs(THD *thd) } +/** + Close and reopen the slow log (with locks). + + @returns FALSE. +*/ +bool LOGGER::flush_slow_log() +{ + /* + Now we lock logger, as nobody should be able to use logging routines while + log tables are closed + */ + logger.lock_exclusive(); + + /* Reopen slow log file */ + if (opt_slow_log) + file_log_handler->get_mysql_slow_log()->reopen_file(); + + /* End of log flush */ + logger.unlock(); + + return 0; +} + + +/** + Close and reopen the general log (with locks). + + @returns FALSE. 
+*/ +bool LOGGER::flush_general_log() +{ + /* + Now we lock logger, as nobody should be able to use logging routines while + log tables are closed + */ + logger.lock_exclusive(); + + /* Reopen general log file */ + if (opt_log) + file_log_handler->get_mysql_log()->reopen_file(); + + /* End of log flush */ + logger.unlock(); + + return 0; +} + + /* Log slow query with all enabled log event handlers @@ -1305,26 +1393,6 @@ int LOGGER::set_handlers(uint error_log_printer, return 0; } -/** - This function checks if a transactional talbe was updated by the - current statement. - - @param thd The client thread that executed the current statement. - @return - @c true if a transactional table was updated, @false otherwise. -*/ -static bool stmt_has_updated_trans_table(THD *thd) -{ - Ha_trx_info *ha_info; - - for (ha_info= thd->transaction.stmt.ha_list; ha_info && ha_info->is_started(); ha_info= ha_info->next()) - { - if (ha_info->is_trx_read_write() && ha_info->ht() != binlog_hton) - return (TRUE); - } - return (FALSE); -} - /* Save position of binary log transaction cache. 
@@ -1347,10 +1415,10 @@ binlog_trans_log_savepos(THD *thd, my_off_t *pos) DBUG_ASSERT(pos != NULL); if (thd_get_ha_data(thd, binlog_hton) == NULL) thd->binlog_setup_trx_data(); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); DBUG_ASSERT(mysql_bin_log.is_open()); - *pos= trx_data->position(); + *pos= cache_mngr->trx_cache.get_byte_position(); DBUG_PRINT("return", ("*pos: %lu", (ulong) *pos)); DBUG_VOID_RETURN; } @@ -1381,9 +1449,9 @@ binlog_trans_log_truncate(THD *thd, my_off_t pos) /* Only true if binlog_trans_log_savepos() wasn't called before */ DBUG_ASSERT(pos != ~(my_off_t) 0); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - trx_data->truncate(pos); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + cache_mngr->trx_cache.restore_savepoint(pos); DBUG_VOID_RETURN; } @@ -1412,115 +1480,127 @@ int binlog_init(void *p) static int binlog_close_connection(handlerton *hton, THD *thd) { - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - DBUG_ASSERT(trx_data->empty()); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + DBUG_ASSERT(cache_mngr->trx_cache.empty() && cache_mngr->stmt_cache.empty()); thd_set_ha_data(thd, binlog_hton, NULL); - trx_data->~binlog_trx_data(); - my_free((uchar*)trx_data, MYF(0)); + cache_mngr->~binlog_cache_mngr(); + my_free((uchar*)cache_mngr, MYF(0)); return 0; } -/* - End a transaction. +/** + This function flushes a transactional cache upon commit/rollback. - SYNOPSIS - binlog_end_trans() + @param thd The thread whose transaction should be flushed + @param cache_mngr Pointer to the cache data to be flushed + @param end_ev The end event either commit/rollback. 
- thd The thread whose transaction should be ended - trx_data Pointer to the transaction data to use - end_ev The end event to use, or NULL - all True if the entire transaction should be ended, false if - only the statement transaction should be ended. + @return + nonzero if an error pops up when flushing the transactional cache. +*/ +static int +binlog_flush_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, + Log_event *end_ev) +{ + DBUG_ENTER("binlog_flush_trx_cache"); + int error=0; + IO_CACHE *cache_log= &cache_mngr->trx_cache.cache_log; - DESCRIPTION + /* + This function handles transactional changes and as such + this flag equals to true. + */ + bool const is_transactional= TRUE; - End the currently open transaction. The transaction can be either - a real transaction (if 'all' is true) or a statement transaction - (if 'all' is false). + if (thd->binlog_flush_pending_rows_event(TRUE, is_transactional)) + DBUG_RETURN(1); + /* + Doing a commit or a rollback including non-transactional tables, + i.e., ending a transaction where we might write the transaction + cache to the binary log. + + We can always end the statement when ending a transaction since + transactions are not allowed inside stored functions. If they + were, we would have to ensure that we're not ending a statement + inside a stored function. + */ + error= mysql_bin_log.write(thd, &cache_mngr->trx_cache.cache_log, end_ev, + cache_mngr->trx_cache.has_incident()); + cache_mngr->reset_cache(&cache_mngr->trx_cache); - If 'end_ev' is NULL, the transaction is a rollback of only - transactional tables, so the transaction cache will be truncated - to either just before the last opened statement transaction (if - 'all' is false), or reset completely (if 'all' is true). - */ + /* + We need to step the table map version after writing the + transaction cache to disk. 
+ */ + mysql_bin_log.update_table_map_version(); + statistic_increment(binlog_cache_use, &LOCK_status); + if (cache_log->disk_writes != 0) + { + statistic_increment(binlog_cache_disk_use, &LOCK_status); + cache_log->disk_writes= 0; + } + + DBUG_ASSERT(cache_mngr->trx_cache.empty()); + DBUG_RETURN(error); +} + +/** + This function truncates the transactional cache upon committing or rolling + back either a transaction or a statement. + + @param thd The thread whose transaction should be flushed + @param cache_mngr Pointer to the cache data to be flushed + @param all @c true means truncate the transaction, otherwise the + statement must be truncated. + + @return + nonzero if an error pops up when truncating the transactional cache. +*/ static int -binlog_end_trans(THD *thd, binlog_trx_data *trx_data, - Log_event *end_ev, bool all) +binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all) { - DBUG_ENTER("binlog_end_trans"); + DBUG_ENTER("binlog_truncate_trx_cache"); int error=0; - IO_CACHE *trans_log= &trx_data->trans_log; - DBUG_PRINT("enter", ("transaction: %s end_ev: 0x%lx", - all ? "all" : "stmt", (long) end_ev)); - DBUG_PRINT("info", ("thd->options={ %s%s}", - FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT), - FLAGSTR(thd->options, OPTION_BEGIN))); + /* + This function handles transactional changes and as such this flag + equals to true. + */ + bool const is_transactional= TRUE; + DBUG_PRINT("info", ("thd->options={ %s%s}, transaction: %s", + FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT), + FLAGSTR(thd->options, OPTION_BEGIN), + all ? "all" : "stmt")); /* - NULL denotes ROLLBACK with nothing to replicate: i.e., rollback of - only transactional tables. If the transaction contain changes to - any non-transactiona tables, we need write the transaction and log - a ROLLBACK last. + If rolling back an entire transaction or a single statement not + inside a transaction, we reset the transaction cache. 
*/ - if (end_ev != NULL) + thd->binlog_remove_pending_rows_event(TRUE, is_transactional); + if (all || !thd->in_multi_stmt_transaction()) { - if (thd->binlog_flush_pending_rows_event(TRUE)) - DBUG_RETURN(1); - /* - Doing a commit or a rollback including non-transactional tables, - i.e., ending a transaction where we might write the transaction - cache to the binary log. - - We can always end the statement when ending a transaction since - transactions are not allowed inside stored functions. If they - were, we would have to ensure that we're not ending a statement - inside a stored function. - */ - error= mysql_bin_log.write(thd, &trx_data->trans_log, end_ev, - trx_data->has_incident()); - trx_data->reset(); + if (cache_mngr->trx_cache.has_incident()) + error= mysql_bin_log.write_incident(thd, TRUE); - /* - We need to step the table map version after writing the - transaction cache to disk. - */ - mysql_bin_log.update_table_map_version(); - statistic_increment(binlog_cache_use, &LOCK_status); - if (trans_log->disk_writes != 0) - { - statistic_increment(binlog_cache_disk_use, &LOCK_status); - trans_log->disk_writes= 0; - } + cache_mngr->reset_cache(&cache_mngr->trx_cache); + + thd->clear_binlog_table_maps(); } + /* + If rolling back a statement in a transaction, we truncate the + transaction cache to remove the statement. + */ else - { - /* - If rolling back an entire transaction or a single statement not - inside a transaction, we reset the transaction cache. - - If rolling back a statement in a transaction, we truncate the - transaction cache to remove the statement. 
- */ - thd->binlog_remove_pending_rows_event(TRUE); - if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) - { - if (trx_data->has_incident()) - error= mysql_bin_log.write_incident(thd, TRUE); - trx_data->reset(); - } - else // ...statement - trx_data->truncate(trx_data->before_stmt_pos); + cache_mngr->trx_cache.restore_prev_position(); - /* - We need to step the table map version on a rollback to ensure - that a new table map event is generated instead of the one that - was written to the thrown-away transaction cache. - */ - mysql_bin_log.update_table_map_version(); - } + /* + We need to step the table map version on a rollback to ensure that a new + table map event is generated instead of the one that was written to the + thrown-away transaction cache. + */ + mysql_bin_log.update_table_map_version(); - DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL); + DBUG_ASSERT(thd->binlog_get_pending_rows_event(is_transactional) == NULL); DBUG_RETURN(error); } @@ -1536,10 +1616,56 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all) } /** + This function flushes the non-transactional to the binary log upon + committing or rolling back a statement. + + @param thd The thread whose transaction should be flushed + @param cache_mngr Pointer to the cache data to be flushed + + @return + nonzero if an error pops up when flushing the non-transactional cache. +*/ +static int +binlog_flush_stmt_cache(THD *thd, binlog_cache_mngr *cache_mngr) +{ + int error= 0; + DBUG_ENTER("binlog_flush_stmt_cache"); + /* + If we are flushing the statement cache, it means that the changes get + through otherwise the cache is empty and this routine should not be called. + */ + DBUG_ASSERT(cache_mngr->stmt_cache.has_incident() == FALSE); + /* + This function handles non-transactional changes and as such this flag equals + to false. 
+ */ + bool const is_transactional= FALSE; + IO_CACHE *cache_log= &cache_mngr->stmt_cache.cache_log; + thd->binlog_flush_pending_rows_event(TRUE, is_transactional); + Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, FALSE, TRUE, 0); + if ((error= mysql_bin_log.write(thd, cache_log, &qev, + cache_mngr->stmt_cache.has_incident()))) + DBUG_RETURN(error); + cache_mngr->reset_cache(&cache_mngr->stmt_cache); + + /* + We need to step the table map version after writing the + transaction cache to disk. + */ + mysql_bin_log.update_table_map_version(); + statistic_increment(binlog_cache_use, &LOCK_status); + if (cache_log->disk_writes != 0) + { + statistic_increment(binlog_cache_disk_use, &LOCK_status); + cache_log->disk_writes= 0; + } + DBUG_RETURN(error); +} + +/** This function is called once after each statement. - It has the responsibility to flush the transaction cache to the - binlog file on commits. + It has the responsibility to flush the caches to the binary log on commits. @param hton The binlog handlerton. @param thd The client thread that executes the transaction. 
@@ -1552,57 +1678,53 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) { int error= 0; DBUG_ENTER("binlog_commit"); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + bool const in_transaction= thd->in_multi_stmt_transaction(); + + DBUG_PRINT("debug", + ("all: %d, in_transaction: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s", + all, + YESNO(in_transaction), + YESNO(thd->transaction.all.modified_non_trans_table), + YESNO(thd->transaction.stmt.modified_non_trans_table))); + + if (!cache_mngr->stmt_cache.empty()) + { + binlog_flush_stmt_cache(thd, cache_mngr); + } - if (trx_data->empty()) + if (cache_mngr->trx_cache.empty()) { - // we're here because trans_log was flushed in MYSQL_BIN_LOG::log_xid() - trx_data->reset(); + /* + we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid() + */ + cache_mngr->reset_cache(&cache_mngr->trx_cache); DBUG_RETURN(0); } /* We commit the transaction if: - - We are not in a transaction and committing a statement, or - - - We are in a transaction and a full transaction is committed - - Otherwise, we accumulate the statement + - We are in a transaction and a full transaction is committed. + Otherwise, we accumulate the changes. 
*/ - ulonglong const in_transaction= - thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN); - DBUG_PRINT("debug", - ("all: %d, empty: %s, in_transaction: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s", - all, - YESNO(trx_data->empty()), - YESNO(in_transaction), - YESNO(thd->transaction.all.modified_non_trans_table), - YESNO(thd->transaction.stmt.modified_non_trans_table))); - if (!in_transaction || all || - (!all && !trx_data->at_least_one_stmt_committed && - !stmt_has_updated_trans_table(thd) && - thd->transaction.stmt.modified_non_trans_table)) + if (!in_transaction || all) { - Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0); - error= binlog_end_trans(thd, trx_data, &qev, all); + Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, FALSE, TRUE, 0); + error= binlog_flush_trx_cache(thd, cache_mngr, &qev); } - trx_data->at_least_one_stmt_committed = my_b_tell(&trx_data->trans_log) > 0; - + /* + This is part of the stmt rollback. + */ if (!all) - trx_data->before_stmt_pos = MY_OFF_T_UNDEF; // part of the stmt commit + cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF); DBUG_RETURN(error); } /** - This function is called when a transaction involving a transactional - table is rolled back. - - It has the responsibility to flush the transaction cache to the - binlog file. However, if the transaction does not involve - non-transactional tables, nothing needs to be logged. + This function is called when a transaction or a statement is rolled back. @param hton The binlog handlerton. @param thd The client thread that executes the transaction. 
@@ -1615,18 +1737,38 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) { DBUG_ENTER("binlog_rollback"); int error=0; - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - - if (trx_data->empty()) { - trx_data->reset(); - DBUG_RETURN(0); - } + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); DBUG_PRINT("debug", ("all: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s", YESNO(all), YESNO(thd->transaction.all.modified_non_trans_table), YESNO(thd->transaction.stmt.modified_non_trans_table))); + + /* + If an incident event is set we do not flush the content of the statement + cache because it may be corrupted. + */ + if (cache_mngr->stmt_cache.has_incident()) + { + mysql_bin_log.write_incident(thd, TRUE); + cache_mngr->reset_cache(&cache_mngr->stmt_cache); + } + else if (!cache_mngr->stmt_cache.empty()) + { + binlog_flush_stmt_cache(thd, cache_mngr); + } + + if (cache_mngr->trx_cache.empty()) + { + /* + we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid() + */ + cache_mngr->reset_cache(&cache_mngr->trx_cache); + DBUG_RETURN(0); + } + + if (mysql_bin_log.check_write_error(thd)) { /* @@ -1637,52 +1779,46 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) */ DBUG_ASSERT(!all); /* - We reach this point if either only transactional tables were modified or - the effect of a statement that did not get into the binlog needs to be - rolled back. In the latter case, if a statement changed non-transactional - tables or had the OPTION_KEEP_LOG associated, we write an incident event - to the binlog in order to stop slaves and notify users that some changes - on the master did not get into the binlog and slaves will be inconsistent. - On the other hand, if a statement is transactional, we just safely roll it - back. + We reach this point if the effect of a statement did not properly get into + a cache and need to be rolled back. 
*/ - if ((thd->transaction.stmt.modified_non_trans_table || - (thd->options & OPTION_KEEP_LOG)) && - mysql_bin_log.check_write_error(thd)) - trx_data->set_incident(); - error= binlog_end_trans(thd, trx_data, 0, all); + error= binlog_truncate_trx_cache(thd, cache_mngr, all); } else - { - /* - We flush the cache with a rollback, wrapped in a beging/rollback if: - . aborting a transaction that modified a non-transactional table; + { + /* + We flush the cache wrapped in a beging/rollback if: + . aborting a transcation that modified a non-transactional table or; . aborting a statement that modified both transactional and - non-transactional tables but which is not in the boundaries of any - transaction or there was no early change; + non-transctional tables but which is not in the boundaries of any + transaction; . the OPTION_KEEP_LOG is activate. */ - if ((all && thd->transaction.all.modified_non_trans_table) || + if (thd->variables.binlog_format == BINLOG_FORMAT_STMT && + ((all && thd->transaction.all.modified_non_trans_table) || (!all && thd->transaction.stmt.modified_non_trans_table && - !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) || - (!all && thd->transaction.stmt.modified_non_trans_table && - !trx_data->at_least_one_stmt_committed && - thd->current_stmt_binlog_row_based) || - ((thd->options & OPTION_KEEP_LOG))) + !thd->in_multi_stmt_transaction()) || + (thd->options & OPTION_KEEP_LOG))) { - Query_log_event qev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0); - error= binlog_end_trans(thd, trx_data, &qev, all); + Query_log_event qev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, FALSE, TRUE, 0); + error= binlog_flush_trx_cache(thd, cache_mngr, &qev); } /* Otherwise, we simply truncate the cache as there is no change on non-transactional tables as follows. 
*/ - else if ((all && !thd->transaction.all.modified_non_trans_table) || - (!all && !thd->transaction.stmt.modified_non_trans_table)) - error= binlog_end_trans(thd, trx_data, 0, all); + else if (all || (!all && + (!thd->transaction.stmt.modified_non_trans_table || + !thd->in_multi_stmt_transaction() || + thd->variables.binlog_format != BINLOG_FORMAT_STMT))) + error= binlog_truncate_trx_cache(thd, cache_mngr, all); } + + /* + This is part of the stmt rollback. + */ if (!all) - trx_data->before_stmt_pos = MY_OFF_T_UNDEF; // part of the stmt rollback + cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF); DBUG_RETURN(error); } @@ -1758,7 +1894,8 @@ static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv) int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED); int const error= thd->binlog_query(THD::STMT_QUERY_TYPE, - thd->query(), thd->query_length(), TRUE, FALSE, errcode); + thd->query(), thd->query_length(), TRUE, FALSE, FALSE, + errcode); DBUG_RETURN(error); } @@ -1777,7 +1914,8 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED); int error= thd->binlog_query(THD::STMT_QUERY_TYPE, - thd->query(), thd->query_length(), TRUE, FALSE, errcode); + thd->query(), thd->query_length(), TRUE, FALSE, FALSE, + errcode); DBUG_RETURN(error); } binlog_trans_log_truncate(thd, *(my_off_t*)sv); @@ -4055,27 +4193,67 @@ bool MYSQL_BIN_LOG::is_query_in_union(THD *thd, query_id_t query_id_param) int THD::binlog_setup_trx_data() { DBUG_ENTER("THD::binlog_setup_trx_data"); - binlog_trx_data *trx_data= - (binlog_trx_data*) thd_get_ha_data(this, binlog_hton); + binlog_cache_mngr *cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); - if (trx_data) + if (cache_mngr) DBUG_RETURN(0); // Already set up - trx_data= (binlog_trx_data*) my_malloc(sizeof(binlog_trx_data), MYF(MY_ZEROFILL)); - if (!trx_data || - open_cached_file(&trx_data->trans_log, mysql_tmpdir, 
+ cache_mngr= (binlog_cache_mngr*) my_malloc(sizeof(binlog_cache_mngr), MYF(MY_ZEROFILL)); + if (!cache_mngr || + open_cached_file(&cache_mngr->stmt_cache.cache_log, mysql_tmpdir, + LOG_PREFIX, binlog_cache_size, MYF(MY_WME)) || + open_cached_file(&cache_mngr->trx_cache.cache_log, mysql_tmpdir, LOG_PREFIX, binlog_cache_size, MYF(MY_WME))) { - my_free((uchar*)trx_data, MYF(MY_ALLOW_ZERO_PTR)); + my_free((uchar*)cache_mngr, MYF(MY_ALLOW_ZERO_PTR)); DBUG_RETURN(1); // Didn't manage to set it up } - thd_set_ha_data(this, binlog_hton, trx_data); + thd_set_ha_data(this, binlog_hton, cache_mngr); - trx_data= new (thd_get_ha_data(this, binlog_hton)) binlog_trx_data; + cache_mngr= new (thd_get_ha_data(this, binlog_hton)) binlog_cache_mngr; DBUG_RETURN(0); } +/** + This function checks if a transactional talbe was updated by the + current transaction. + + @param thd The client thread that executed the current statement. + @return + @c true if a transactional table was updated, @false otherwise. +*/ +bool +trans_has_updated_trans_table(THD* thd) +{ + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + + return (cache_mngr ? my_b_tell (&cache_mngr->trx_cache.cache_log) : 0); +} + +/** + This function checks if a transactional talbe was updated by the + current statement. + + @param thd The client thread that executed the current statement. + @return + @c true if a transactional table was updated, @false otherwise. +*/ +bool +stmt_has_updated_trans_table(THD *thd) +{ + Ha_trx_info *ha_info; + + for (ha_info= thd->transaction.stmt.ha_list; ha_info; ha_info= ha_info->next()) + { + if (ha_info->is_trx_read_write() && ha_info->ht() != binlog_hton) + return (TRUE); + } + return (FALSE); +} + /* Function to start a statement and optionally a transaction for the binary log. @@ -4089,11 +4267,10 @@ int THD::binlog_setup_trx_data() - Start a transaction if not in autocommit mode or if a BEGIN statement has been seen. 
- - Start a statement transaction to allow us to truncate the binary - log. + - Start a statement transaction to allow us to truncate the cache. - Save the currrent binlog position so that we can roll back the - statement by truncating the transaction log. + statement by truncating the cache. We only update the saved position if the old one was undefined, the reason is that there are some cases (e.g., for CREATE-SELECT) @@ -4107,15 +4284,15 @@ int THD::binlog_setup_trx_data() void THD::binlog_start_trans_and_stmt() { - binlog_trx_data *trx_data= (binlog_trx_data*) thd_get_ha_data(this, binlog_hton); + binlog_cache_mngr *cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); DBUG_ENTER("binlog_start_trans_and_stmt"); - DBUG_PRINT("enter", ("trx_data: 0x%lx trx_data->before_stmt_pos: %lu", - (long) trx_data, - (trx_data ? (ulong) trx_data->before_stmt_pos : + DBUG_PRINT("enter", ("cache_mngr: %p cache_mngr->trx_cache.get_prev_position(): %lu", + cache_mngr, + (cache_mngr ? (ulong) cache_mngr->trx_cache.get_prev_position() : (ulong) 0))); - if (trx_data == NULL || - trx_data->before_stmt_pos == MY_OFF_T_UNDEF) + if (cache_mngr == NULL || + cache_mngr->trx_cache.get_prev_position() == MY_OFF_T_UNDEF) { this->binlog_set_stmt_begin(); if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) @@ -4136,27 +4313,35 @@ THD::binlog_start_trans_and_stmt() } void THD::binlog_set_stmt_begin() { - binlog_trx_data *trx_data= - (binlog_trx_data*) thd_get_ha_data(this, binlog_hton); + binlog_cache_mngr *cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); /* - The call to binlog_trans_log_savepos() might create the trx_data + The call to binlog_trans_log_savepos() might create the cache_mngr structure, if it didn't exist before, so we save the position into an auto variable and then write it into the transaction - data for the binary log (i.e., trx_data). + data for the binary log (i.e., cache_mngr). 
*/ my_off_t pos= 0; binlog_trans_log_savepos(this, &pos); - trx_data= (binlog_trx_data*) thd_get_ha_data(this, binlog_hton); - trx_data->before_stmt_pos= pos; + cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); + cache_mngr->trx_cache.set_prev_position(pos); } -/* - Write a table map to the binary log. - */ - -int THD::binlog_write_table_map(TABLE *table, bool is_trans) +/** + This function writes a table map to the binary log. + Note that in order to keep the signature uniform with related methods, + we use a redundant parameter to indicate whether a transactional table + was changed or not. + + @param table a pointer to the table. + @param is_transactional @c true indicates a transactional table, + otherwise @c false a non-transactional. + @return + nonzero if an error pops up when writing the table map event. +*/ +int THD::binlog_write_table_map(TABLE *table, bool is_transactional) { int error; DBUG_ENTER("THD::binlog_write_table_map"); @@ -4165,19 +4350,21 @@ int THD::binlog_write_table_map(TABLE *table, bool is_trans) table->s->table_map_id)); /* Pre-conditions */ - DBUG_ASSERT(current_stmt_binlog_row_based && mysql_bin_log.is_open()); + DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); DBUG_ASSERT(table->s->table_map_id != ULONG_MAX); - Table_map_log_event::flag_set const - flags= Table_map_log_event::TM_NO_FLAGS; - Table_map_log_event - the_event(this, table, table->s->table_map_id, is_trans, flags); + the_event(this, table, table->s->table_map_id, is_transactional); - if (is_trans && binlog_table_maps == 0) + if (binlog_table_maps == 0) binlog_start_trans_and_stmt(); - if ((error= mysql_bin_log.write(&the_event))) + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); + + IO_CACHE *file= cache_mngr->get_binlog_cache_log(is_transactional); + + if ((error= the_event.write(file))) DBUG_RETURN(error); binlog_table_maps++; @@ -4185,144 +4372,163 @@ int 
THD::binlog_write_table_map(TABLE *table, bool is_trans) DBUG_RETURN(0); } +/** + This function retrieves a pending row event from a cache which is + specified through the parameter @c is_transactional. Respectively, when it + is @c true, the pending event is returned from the transactional cache. + Otherwise from the non-transactional cache. + + @param is_transactional @c true indicates a transactional cache, + otherwise @c false a non-transactional. + @return + The row event if any. +*/ Rows_log_event* -THD::binlog_get_pending_rows_event() const +THD::binlog_get_pending_rows_event(bool is_transactional) const { - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(this, binlog_hton); + Rows_log_event* rows= NULL; + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); + /* - This is less than ideal, but here's the story: If there is no - trx_data, prepare_pending_rows_event() has never been called - (since the trx_data is set up there). In that case, we just return - NULL. + This is less than ideal, but here's the story: If there is no cache_mngr, + prepare_pending_rows_event() has never been called (since the cache_mngr + is set up there). In that case, we just return NULL. */ - return trx_data ? trx_data->pending() : NULL; + if (cache_mngr) + { + binlog_cache_data *cache_data= + cache_mngr->get_binlog_cache_data(is_transactional); + + rows= cache_data->pending(); + } + return (rows); } +/** + This function stores a pending row event into a cache which is specified + through the parameter @c is_transactional. Respectively, when it is @c + true, the pending event is stored into the transactional cache. Otherwise + into the non-transactional cache. + + @param evt a pointer to the row event. + @param is_transactional @c true indicates a transactional cache, + otherwise @c false a non-transactional. 
+*/ void -THD::binlog_set_pending_rows_event(Rows_log_event* ev) +THD::binlog_set_pending_rows_event(Rows_log_event* ev, bool is_transactional) { if (thd_get_ha_data(this, binlog_hton) == NULL) binlog_setup_trx_data(); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(this, binlog_hton); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); + + DBUG_ASSERT(cache_mngr); - DBUG_ASSERT(trx_data); - trx_data->set_pending(ev); + binlog_cache_data *cache_data= + cache_mngr->get_binlog_cache_data(is_transactional); + + cache_data->set_pending(ev); } /** - Remove the pending rows event, discarding any outstanding rows. - - If there is no pending rows event available, this is effectively a + This function removes the pending rows event, discarding any outstanding + rows. If there is no pending rows event available, this is effectively a no-op. - */ + + @param thd a pointer to the user thread. + @param is_transactional @c true indicates a transactional cache, + otherwise @c false a non-transactional. +*/ int -MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd) +MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, bool is_transactional) { DBUG_ENTER("MYSQL_BIN_LOG::remove_pending_rows_event"); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + + DBUG_ASSERT(cache_mngr); - DBUG_ASSERT(trx_data); + binlog_cache_data *cache_data= + cache_mngr->get_binlog_cache_data(is_transactional); - if (Rows_log_event* pending= trx_data->pending()) + if (Rows_log_event* pending= cache_data->pending()) { delete pending; - trx_data->set_pending(NULL); + cache_data->set_pending(NULL); } DBUG_RETURN(0); } /* - Moves the last bunch of rows from the pending Rows event to the binlog - (either cached binlog if transaction, or disk binlog). Sets a new pending - event. 
+ Moves the last bunch of rows from the pending Rows event to a cache (either + transactional cache if is_transaction is @c true, or the non-transactional + cache otherwise. Sets a new pending event. + + @param thd a pointer to the user thread. + @param evt a pointer to the row event. + @param is_transactional @c true indicates a transactional cache, + otherwise @c false a non-transactional. */ int MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, - Rows_log_event* event) + Rows_log_event* event, + bool is_transactional) { DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)"); DBUG_ASSERT(mysql_bin_log.is_open()); DBUG_PRINT("enter", ("event: 0x%lx", (long) event)); int error= 0; + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + DBUG_ASSERT(cache_mngr); - DBUG_ASSERT(trx_data); + binlog_cache_data *cache_data= + cache_mngr->get_binlog_cache_data(is_transactional); - DBUG_PRINT("info", ("trx_data->pending(): 0x%lx", (long) trx_data->pending())); + DBUG_PRINT("info", ("cache_mngr->pending(): 0x%lx", (long) cache_data->pending())); - if (Rows_log_event* pending= trx_data->pending()) + if (Rows_log_event* pending= cache_data->pending()) { - IO_CACHE *file= &log_file; + IO_CACHE *file= &cache_data->cache_log; /* - Decide if we should write to the log file directly or to the - transaction log. - */ - if (pending->get_cache_stmt() || my_b_tell(&trx_data->trans_log)) - file= &trx_data->trans_log; - - /* - If we are writing to the log file directly, we could avoid - locking the log. This does not work since we need to step the - m_table_map_version below, and that change has to be protected - by the LOCK_log mutex. - */ - pthread_mutex_lock(&LOCK_log); - - /* - Write pending event to log file or transaction cache + Write pending event to the cache. 
*/ if (pending->write(file)) { - pthread_mutex_unlock(&LOCK_log); set_write_error(thd); + if (check_write_error(thd) && cache_data && + thd->transaction.stmt.modified_non_trans_table) + cache_data->set_incident(); DBUG_RETURN(1); } /* We step the table map version if we are writing an event - representing the end of a statement. We do this regardless of - wheather we write to the transaction cache or to directly to the - file. - - In an ideal world, we could avoid stepping the table map version - if we were writing to a transaction cache, since we could then - reuse the table map that was written earlier in the transaction - cache. This does not work since STMT_END_F implies closing all - table mappings on the slave side. + representing the end of a statement. + In an ideal world, we could avoid stepping the table map version, + since we could then reuse the table map that was written earlier + in the cache. This does not work since STMT_END_F implies closing + all table mappings on the slave side. + TODO: Find a solution so that table maps does not have to be written several times within a transaction. 
- */ + */ if (pending->get_flags(Rows_log_event::STMT_END_F)) ++m_table_map_version; delete pending; - - if (file == &log_file) - { - error= flush_and_sync(0); - if (!error) - { - signal_update(); - rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); - } - } - - pthread_mutex_unlock(&LOCK_log); } - thd->binlog_set_pending_rows_event(event); + thd->binlog_set_pending_rows_event(event, is_transactional); DBUG_RETURN(error); } @@ -4336,6 +4542,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) THD *thd= event_info->thd; bool error= 1; DBUG_ENTER("MYSQL_BIN_LOG::write(Log_event *)"); + binlog_cache_data *cache_data= 0; if (thd->binlog_evt_union.do_union) { @@ -4344,27 +4551,22 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) We will log the function call to the binary log on function exit */ thd->binlog_evt_union.unioned_events= TRUE; - thd->binlog_evt_union.unioned_events_trans |= event_info->cache_stmt; + thd->binlog_evt_union.unioned_events_trans |= + event_info->use_trans_cache(); DBUG_RETURN(0); } /* - Flush the pending rows event to the transaction cache or to the - log file. Since this function potentially aquire the LOCK_log - mutex, we do this before aquiring the LOCK_log mutex in this - function. - We only end the statement if we are in a top-level statement. If we are inside a stored function, we do not end the statement since this will close all tables on the slave. 
*/ bool const end_stmt= thd->prelocked_mode && thd->lex->requires_prelocking(); - if (thd->binlog_flush_pending_rows_event(end_stmt)) + if (thd->binlog_flush_pending_rows_event(end_stmt, + event_info->use_trans_cache())) DBUG_RETURN(error); - pthread_mutex_lock(&LOCK_log); - /* In most cases this is only called if 'is_open()' is true; in fact this is mostly called if is_open() *was* true a few instructions before, but it @@ -4372,7 +4574,6 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) */ if (likely(is_open())) { - IO_CACHE *file= &log_file; #ifdef HAVE_REPLICATION /* In the future we need to add to the following if tests like @@ -4381,67 +4582,70 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) */ const char *local_db= event_info->get_db(); if ((thd && !(thd->options & OPTION_BIN_LOG)) || - (!binlog_filter->db_ok(local_db))) - { - pthread_mutex_unlock(&LOCK_log); + !binlog_filter->db_ok(local_db)) DBUG_RETURN(0); - } #endif /* HAVE_REPLICATION */ - /* - Should we write to the binlog cache or to the binlog on disk? - Write to the binlog cache if: - - it is already not empty (meaning we're in a transaction; note that the - present event could be about a non-transactional table, but still we need - to write to the binlog cache in that case to handle updates to mixed - trans/non-trans table types the best possible in binlogging) - - or if the event asks for it (cache_stmt == TRUE). 
- */ - if (opt_using_transactions && thd) + IO_CACHE *file= NULL; + + if (event_info->use_direct_logging()) + { + file= &log_file; + pthread_mutex_lock(&LOCK_log); + } + else { if (thd->binlog_setup_trx_data()) goto err; - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - IO_CACHE *trans_log= &trx_data->trans_log; - my_off_t trans_log_pos= my_b_tell(trans_log); - if (event_info->get_cache_stmt() || trans_log_pos != 0 || - stmt_has_updated_trans_table(thd)) + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + + /* + If we are about to use write rows, we just need to check the type of + the event (either transactional or non-transactional) in order to + choose the cache. + */ + if (thd->is_current_stmt_binlog_format_row()) { - DBUG_PRINT("info", ("Using trans_log: cache: %d, trans_log_pos: %lu", - event_info->get_cache_stmt(), - (ulong) trans_log_pos)); - thd->binlog_start_trans_and_stmt(); - file= trans_log; + file= cache_mngr->get_binlog_cache_log(event_info->use_trans_cache()); + cache_data= cache_mngr->get_binlog_cache_data(event_info->use_trans_cache()); } /* - TODO as Mats suggested, for all the cases above where we write to - trans_log, it sounds unnecessary to lock LOCK_log. We should rather - test first if we want to write to trans_log, and if not, lock - LOCK_log. + However, if we are about to write statements we need to consider other + things. We use the non-transactional cache when: + + . the transactional cache is empty which means that there were no + early statement on behalf of the transaction. + . the respective event is tagged as non-transactional. 
*/ - } + else if (cache_mngr->trx_cache.empty() && + !event_info->use_trans_cache()) + { + file= &cache_mngr->stmt_cache.cache_log; + cache_data= &cache_mngr->stmt_cache; + } + else + { + file= &cache_mngr->trx_cache.cache_log; + cache_data= &cache_mngr->trx_cache; + } + thd->binlog_start_trans_and_stmt(); + } DBUG_PRINT("info",("event type: %d",event_info->get_type_code())); /* - No check for auto events flag here - this write method should - never be called if auto-events are enabled - */ - - /* - 1. Write first log events which describe the 'run environment' - of the SQL command - */ + No check for auto events flag here - this write method should + never be called if auto-events are enabled. - /* - If row-based binlogging, Insert_id, Rand and other kind of "setting - context" events are not needed. + Write first log events which describe the 'run environment' + of the SQL command. If row-based binlogging, Insert_id, Rand + and other kind of "setting context" events are not needed. */ if (thd) { - if (!thd->current_stmt_binlog_row_based) + if (!thd->is_current_stmt_binlog_format_row()) { if (thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt) { @@ -4487,39 +4691,48 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) } /* - Write the SQL command - */ - - if (event_info->write(file) || + Write the event. 
+ */ + if (event_info->write(file) || DBUG_EVALUATE_IF("injecting_fault_writing", 1, 0)) goto err; - if (file == &log_file) // we are writing to the real log (disk) + error= 0; + +err: + if (event_info->use_direct_logging()) { - bool synced= 0; - if (flush_and_sync(&synced)) - goto err; + if (!error) + { + bool synced; + if ((error= flush_and_sync(&synced))) + goto unlock; - if (RUN_HOOK(binlog_storage, after_flush, - (thd, log_file_name, file->pos_in_file, synced))) { - sql_print_error("Failed to run 'after_flush' hooks"); - goto err; + if ((error= RUN_HOOK(binlog_storage, after_flush, + (thd, log_file_name, file->pos_in_file, synced)))) + { + sql_print_error("Failed to run 'after_flush' hooks"); + goto unlock; + } + signal_update(); + rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); } - - signal_update(); - rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); +unlock: + pthread_mutex_unlock(&LOCK_log); } - error=0; -err: if (error) + { set_write_error(thd); + if (check_write_error(thd) && cache_data && + thd->transaction.stmt.modified_non_trans_table) + cache_data->set_incident(); + } } if (event_info->flags & LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F) ++m_table_map_version; - pthread_mutex_unlock(&LOCK_log); DBUG_RETURN(error); } @@ -4642,7 +4855,7 @@ uint MYSQL_BIN_LOG::next_file_id() write_cache() cache Cache to write to the binary log lock_log True if the LOCK_log mutex should be aquired, false otherwise - sync_log True if the log should be flushed and sync:ed + sync_log True if the log should be flushed and synced DESCRIPTION Write the contents of the cache to the binary log. 
The cache will @@ -4858,9 +5071,6 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event, DBUG_ENTER("MYSQL_BIN_LOG::write(THD *, IO_CACHE *, Log_event *)"); pthread_mutex_lock(&LOCK_log); - /* NULL would represent nothing to replicate after ROLLBACK */ - DBUG_ASSERT(commit_event != NULL); - DBUG_ASSERT(is_open()); if (likely(is_open())) // Should always be true { @@ -4875,19 +5085,9 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event, transaction is either a BEGIN..COMMIT block or a single statement in autocommit mode. */ - Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0); - - /* - Now this Query_log_event has artificial log_pos 0. It must be - adjusted to reflect the real position in the log. Not doing it - would confuse the slave: it would prevent this one from - knowing where he is in the master's binlog, which would result - in wrong positions being shown to the user, MASTER_POS_WAIT - undue waiting etc. - */ + Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, FALSE, TRUE, 0); if (qinfo.write(&log_file)) goto err; - DBUG_EXECUTE_IF("crash_before_writing_xid", { if ((write_error= write_cache(cache, false, true))) @@ -5985,13 +6185,13 @@ int TC_LOG_BINLOG::log_xid(THD *thd, my_xid xid) { DBUG_ENTER("TC_LOG_BINLOG::log"); Xid_log_event xle(thd, xid); - binlog_trx_data *trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + binlog_cache_mngr *cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); /* We always commit the entire transaction when writing an XID. Also note that the return value is inverted. 
*/ - DBUG_RETURN(!binlog_end_trans(thd, trx_data, &xle, TRUE)); + DBUG_RETURN(!binlog_flush_trx_cache(thd, cache_mngr, &xle)); } void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) diff --git a/sql/log.h b/sql/log.h index fa24a2c5803..8df5111d3c2 100644 --- a/sql/log.h +++ b/sql/log.h @@ -20,6 +20,9 @@ class Relay_log_info; class Format_description_log_event; +bool trans_has_updated_trans_table(THD* thd); +bool stmt_has_updated_trans_table(THD *thd); + /* Transaction Coordinator log - a base abstract class for two different implementations @@ -347,8 +350,9 @@ public: ulonglong table_map_version() const { return m_table_map_version; } void update_table_map_version() { ++m_table_map_version; } - int flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event); - int remove_pending_rows_event(THD *thd); + int flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event, + bool is_transactional); + int remove_pending_rows_event(THD *thd, bool is_transactional); #endif /* !defined(MYSQL_CLIENT) */ void reset_bytes_written() @@ -387,7 +391,7 @@ public: /* Use this to start writing a new log file */ void new_file(); - bool write(Log_event* event_info); // binary log write + bool write(Log_event* event_info); bool write(THD *thd, IO_CACHE *cache, Log_event *commit_event, bool incident); bool write_incident(THD *thd, bool lock); @@ -586,6 +590,8 @@ public: void init_base(); void init_log_tables(); bool flush_logs(THD *thd); + bool flush_slow_log(); + bool flush_general_log(); /* Perform basic logger cleanup. this will leave e.g. error log open. */ void cleanup_base(); /* Free memory. 
Nothing could be logged after this function is called */ diff --git a/sql/log_event.cc b/sql/log_event.cc index 9cfa6cf1540..d5090c8534a 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -29,7 +29,6 @@ #include "rpl_rli.h" #include "rpl_mi.h" #include "rpl_filter.h" -#include "rpl_utility.h" #include "rpl_record.h" #include <my_dir.h> @@ -37,6 +36,7 @@ #include <base64.h> #include <my_bitmap.h> +#include "rpl_utility.h" #define log_cs &my_charset_latin1 @@ -665,10 +665,11 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) { server_id= thd->server_id; when= thd->start_time; - cache_stmt= using_trans; + cache_type= (using_trans || stmt_has_updated_trans_table(thd) + ? Log_event::EVENT_TRANSACTIONAL_CACHE : + Log_event::EVENT_STMT_CACHE); } - /** This minimal constructor is for when you are not even sure that there is a valid THD. For example in the server when we are shutting down or @@ -677,8 +678,8 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) */ Log_event::Log_event() - :temp_buf(0), exec_time(0), flags(0), cache_stmt(0), - thd(0) + :temp_buf(0), exec_time(0), flags(0), + cache_type(Log_event::EVENT_INVALID_CACHE), thd(0) { server_id= ::server_id; /* @@ -697,7 +698,7 @@ Log_event::Log_event() Log_event::Log_event(const char* buf, const Format_description_log_event* description_event) - :temp_buf(0), cache_stmt(0) + :temp_buf(0), cache_type(Log_event::EVENT_INVALID_CACHE) { #ifndef MYSQL_CLIENT thd = 0; @@ -1571,37 +1572,14 @@ log_event_print_value(IO_CACHE *file, const uchar *ptr, /* a long CHAR() field: see #37426 */ length= byte1 | (((byte0 & 0x30) ^ 0x30) << 4); type= byte0 | 0x30; - goto beg; - } - - switch (byte0) - { - case MYSQL_TYPE_SET: - case MYSQL_TYPE_ENUM: - case MYSQL_TYPE_STRING: - type= byte0; - length= byte1; - break; - - default: - - { - char tmp[5]; - my_snprintf(tmp, sizeof(tmp), "%04X", meta); - my_b_printf(file, - "!! 
Don't know how to handle column type=%d meta=%d (%s)", - type, meta, tmp); - return 0; - } } + else + length = meta & 0xFF; } else length= meta; } - -beg: - switch (type) { case MYSQL_TYPE_LONG: { @@ -1738,6 +1716,33 @@ beg: return 3; } + case MYSQL_TYPE_NEWDATE: + { + uint32 tmp= uint3korr(ptr); + int part; + char buf[10]; + char *pos= &buf[10]; + + /* Copied from field.cc */ + *pos--=0; // End NULL + part=(int) (tmp & 31); + *pos--= (char) ('0'+part%10); + *pos--= (char) ('0'+part/10); + *pos--= ':'; + part=(int) (tmp >> 5 & 15); + *pos--= (char) ('0'+part%10); + *pos--= (char) ('0'+part/10); + *pos--= ':'; + part=(int) (tmp >> 9); + *pos--= (char) ('0'+part%10); part/=10; + *pos--= (char) ('0'+part%10); part/=10; + *pos--= (char) ('0'+part%10); part/=10; + *pos= (char) ('0'+part); + my_b_printf(file , "'%s'", buf); + my_snprintf(typestr, typestr_length, "DATE"); + return 3; + } + case MYSQL_TYPE_DATE: { uint i32= uint3korr(ptr); @@ -1756,7 +1761,7 @@ beg: } case MYSQL_TYPE_ENUM: - switch (length) { + switch (meta & 0xFF) { case 1: my_b_printf(file, "%d", (int) *ptr); my_snprintf(typestr, typestr_length, "ENUM(1 byte)"); @@ -1769,15 +1774,15 @@ beg: return 2; } default: - my_b_printf(file, "!! Unknown ENUM packlen=%d", length); + my_b_printf(file, "!! Unknown ENUM packlen=%d", meta & 0xFF); return 0; } break; case MYSQL_TYPE_SET: - my_b_write_bit(file, ptr , length * 8); - my_snprintf(typestr, typestr_length, "SET(%d bytes)", length); - return length; + my_b_write_bit(file, ptr , (meta & 0xFF) * 8); + my_snprintf(typestr, typestr_length, "SET(%d bytes)", meta & 0xFF); + return meta & 0xFF; case MYSQL_TYPE_BLOB: switch (meta) { @@ -2357,7 +2362,7 @@ Query_log_event::Query_log_event() */ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, ulong query_length, bool using_trans, - bool suppress_use, int errcode) + bool direct, bool suppress_use, int errcode) :Log_event(thd_arg, (thd_arg->thread_specific_used ? 
LOG_EVENT_THREAD_SPECIFIC_F : @@ -2436,6 +2441,95 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, } else time_zone_len= 0; + + /* + In what follows, we decide whether to write to the binary log or to use a + cache. + */ + LEX *lex= thd->lex; + bool implicit_commit= FALSE; + cache_type= Log_event::EVENT_INVALID_CACHE; + switch (lex->sql_command) + { + case SQLCOM_ALTER_DB: + case SQLCOM_CREATE_FUNCTION: + case SQLCOM_DROP_FUNCTION: + case SQLCOM_DROP_PROCEDURE: + case SQLCOM_INSTALL_PLUGIN: + case SQLCOM_UNINSTALL_PLUGIN: + case SQLCOM_ALTER_TABLESPACE: + implicit_commit= TRUE; + break; + case SQLCOM_DROP_TABLE: + implicit_commit= !(lex->drop_temporary && thd->in_multi_stmt_transaction()); + break; + case SQLCOM_ALTER_TABLE: + case SQLCOM_CREATE_TABLE: + implicit_commit= !((lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) && + thd->in_multi_stmt_transaction()) && + !(lex->select_lex.item_list.elements && + thd->is_current_stmt_binlog_format_row()); + break; + case SQLCOM_SET_OPTION: + implicit_commit= (lex->autocommit ? 
TRUE : FALSE); + break; + /* + Replace what follows after CF_AUTO_COMMIT_TRANS is backported by: + + default: + implicit_commit= ((sql_command_flags[lex->sql_command] & + CF_AUTO_COMMIT_TRANS)); + break; + */ + case SQLCOM_CREATE_INDEX: + case SQLCOM_TRUNCATE: + case SQLCOM_CREATE_DB: + case SQLCOM_DROP_DB: + case SQLCOM_ALTER_DB_UPGRADE: + case SQLCOM_RENAME_TABLE: + case SQLCOM_DROP_INDEX: + case SQLCOM_CREATE_VIEW: + case SQLCOM_DROP_VIEW: + case SQLCOM_CREATE_TRIGGER: + case SQLCOM_DROP_TRIGGER: + case SQLCOM_CREATE_EVENT: + case SQLCOM_ALTER_EVENT: + case SQLCOM_DROP_EVENT: + case SQLCOM_REPAIR: + case SQLCOM_OPTIMIZE: + case SQLCOM_ANALYZE: + case SQLCOM_CREATE_USER: + case SQLCOM_DROP_USER: + case SQLCOM_RENAME_USER: + case SQLCOM_REVOKE_ALL: + case SQLCOM_REVOKE: + case SQLCOM_GRANT: + case SQLCOM_CREATE_PROCEDURE: + case SQLCOM_CREATE_SPFUNCTION: + case SQLCOM_ALTER_PROCEDURE: + case SQLCOM_ALTER_FUNCTION: + case SQLCOM_ASSIGN_TO_KEYCACHE: + case SQLCOM_PRELOAD_KEYS: + case SQLCOM_FLUSH: + case SQLCOM_CHECK: + implicit_commit= TRUE; + break; + default: + implicit_commit= FALSE; + break; + } + + if (implicit_commit || direct) + { + cache_type= Log_event::EVENT_NO_CACHE; + } + else + { + cache_type= (using_trans || stmt_has_updated_trans_table(thd) + ? 
Log_event::EVENT_TRANSACTIONAL_CACHE : + Log_event::EVENT_STMT_CACHE); + } + DBUG_ASSERT(cache_type != Log_event::EVENT_INVALID_CACHE); DBUG_PRINT("info",("Query_log_event has flags2: %lu sql_mode: %lu", (ulong) flags2, sql_mode)); } @@ -6694,9 +6788,9 @@ Execute_load_query_log_event(THD *thd_arg, const char* query_arg, ulong query_length_arg, uint fn_pos_start_arg, uint fn_pos_end_arg, enum_load_dup_handling dup_handling_arg, - bool using_trans, bool suppress_use, + bool using_trans, bool direct, bool suppress_use, int errcode): - Query_log_event(thd_arg, query_arg, query_length_arg, using_trans, + Query_log_event(thd_arg, query_arg, query_length_arg, using_trans, direct, suppress_use, errcode), file_id(thd_arg->file_id), fn_pos_start(fn_pos_start_arg), fn_pos_end(fn_pos_end_arg), dup_handling(dup_handling_arg) @@ -7265,7 +7359,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) We also call the mysql_reset_thd_for_next_command(), since this is the logical start of the next "statement". Note that this - call might reset the value of current_stmt_binlog_row_based, so + call might reset the value of current_stmt_binlog_format, so we need to do any changes to that value after this function. */ lex_start(thd); @@ -7277,16 +7371,12 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) */ thd->transaction.stmt.modified_non_trans_table= FALSE; /* - Check if the slave is set to use SBR. If so, it should switch - to using RBR until the end of the "statement", i.e., next - STMT_END_F or next error. + This is a row injection, so we flag the "statement" as + such. Note that this code is called both when the slave does row + injections and when the BINLOG statement is used to do row + injections. 
*/ - if (!thd->current_stmt_binlog_row_based && - mysql_bin_log.is_open() && (thd->options & OPTION_BIN_LOG)) - { - thd->set_current_stmt_binlog_row_based(); - } - + thd->lex->set_stmt_row_injection(); /* There are a few flags that are replicated with each row event. @@ -7335,11 +7425,18 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) */ { + DBUG_PRINT("debug", ("Checking compability of tables to lock - tables_to_lock: %p", + rli->tables_to_lock)); RPL_TABLE_LIST *ptr= rli->tables_to_lock; for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global)) { - if (ptr->m_tabledef.compatible_with(rli, ptr->table)) + TABLE *conv_table; + if (!ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli), + ptr->table, &conv_table)) { + DBUG_PRINT("debug", ("Table: %s.%s is not compatible with master", + ptr->table->s->db.str, + ptr->table->s->table_name.str)); /* We should not honour --slave-skip-errors at this point as we are having severe errors which should not be skiped. @@ -7350,12 +7447,17 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); DBUG_RETURN(ERR_BAD_TABLE_DEF); } + DBUG_PRINT("debug", ("Table: %s.%s is compatible with master" + " - conv_table: %p", + ptr->table->s->db.str, + ptr->table->s->table_name.str, conv_table)); + ptr->m_conv_table= conv_table; } } /* - ... and then we add all the tables to the table map and remove - them from tables to lock. + ... and then we add all the tables to the table map and but keep + them in the tables to lock list. We also invalidate the query cache for all the tables, since they will now be changed. 
@@ -7523,7 +7625,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) error= 0; } - if (!cache_stmt) + if (!use_trans_cache()) { DBUG_PRINT("info", ("Marked that we need to keep log")); thd->options|= OPTION_KEEP_LOG; @@ -7542,7 +7644,14 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) slave_rows_error_report(ERROR_LEVEL, error, rli, thd, table, get_type_str(), RPL_LOG_NAME, (ulong) log_pos); - thd->reset_current_stmt_binlog_row_based(); + /* + @todo We should probably not call + reset_current_stmt_binlog_format_row() from here. + + Note: this applies to log_event_old.cc too. + /Sven + */ + thd->reset_current_stmt_binlog_format_row(); const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error); thd->is_slave_error= 1; DBUG_RETURN(error); @@ -7603,7 +7712,7 @@ static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd) (assume the last master's transaction is ignored by the slave because of replicate-ignore rules). */ - error= thd->binlog_flush_pending_rows_event(true); + error= thd->binlog_flush_pending_rows_event(TRUE); /* If this event is not in a transaction, the call below will, if some @@ -7626,7 +7735,17 @@ static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd) event flushed. */ - thd->reset_current_stmt_binlog_row_based(); + /* + @todo We should probably not call + reset_current_stmt_binlog_format_row() from here. + + Note: this applies to log_event_old.cc too + + Btw, the previous comment about transactional engines does not + seem related to anything that happens here. 
+ /Sven + */ + thd->reset_current_stmt_binlog_format_row(); const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0); } @@ -7833,7 +7952,10 @@ int Table_map_log_event::save_field_metadata() DBUG_ENTER("Table_map_log_event::save_field_metadata"); int index= 0; for (unsigned int i= 0 ; i < m_table->s->fields ; i++) + { + DBUG_PRINT("debug", ("field_type: %d", m_coltype[i])); index+= m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]); + } DBUG_RETURN(index); } #endif /* !defined(MYSQL_CLIENT) */ @@ -7845,8 +7967,8 @@ int Table_map_log_event::save_field_metadata() */ #if !defined(MYSQL_CLIENT) Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, - bool is_transactional, uint16 flags) - : Log_event(thd, 0, true), + bool is_transactional) + : Log_event(thd, 0, is_transactional), m_table(tbl), m_dbnam(tbl->s->db.str), m_dblen(m_dbnam ? tbl->s->db.length : 0), @@ -7855,7 +7977,7 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, m_colcnt(tbl->s->fields), m_memory(NULL), m_table_id(tid), - m_flags(flags), + m_flags(TM_BIT_LEN_EXACT_F), m_data_size(0), m_field_metadata(0), m_field_metadata_size(0), @@ -8111,8 +8233,10 @@ int Table_map_log_event::do_apply_event(Relay_log_info const *rli) inside Relay_log_info::clear_tables_to_lock() by calling the table_def destructor explicitly. */ - new (&table_list->m_tabledef) table_def(m_coltype, m_colcnt, - m_field_metadata, m_field_metadata_size, m_null_bits); + new (&table_list->m_tabledef) + table_def(m_coltype, m_colcnt, + m_field_metadata, m_field_metadata_size, + m_null_bits, m_flags); table_list->m_tabledef_valid= TRUE; /* diff --git a/sql/log_event.h b/sql/log_event.h index 1fdd7a05968..c3689a2317f 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -870,6 +870,31 @@ public: EVENT_SKIP_COUNT }; + enum enum_event_cache_type + { + EVENT_INVALID_CACHE, + /* + If possible the event should use a non-transactional cache before + being flushed to the binary log. 
This means that it must be flushed + right after its correspondent statement is completed. + */ + EVENT_STMT_CACHE, + /* + The event should use a transactional cache before being flushed to + the binary log. This means that it must be flushed upon commit or + rollback. + */ + EVENT_TRANSACTIONAL_CACHE, + /* + The event must be written directly to the binary log without going + through a cache. + */ + EVENT_NO_CACHE, + /** + If there is a need for different types, introduce them before this. + */ + EVENT_CACHE_COUNT + }; /* The following type definition is to be used whenever data is placed @@ -920,8 +945,12 @@ public: LOG_EVENT_SUPPRESS_USE_F for notes. */ uint16 flags; - - bool cache_stmt; + + /* + Defines the type of the cache, if any, where the event will be + stored before being flushed to disk. + */ + uint16 cache_type; /** A storage to cache the global system variable's value. @@ -933,7 +962,7 @@ public: THD* thd; Log_event(); - Log_event(THD* thd_arg, uint16 flags_arg, bool cache_stmt); + Log_event(THD* thd_arg, uint16 flags_arg, bool is_transactional); /* read_log_event() functions read an event from a binlog or relay log; used by SHOW BINLOG EVENTS, the binlog_dump thread on the @@ -1031,7 +1060,18 @@ public: void set_relay_log_event() { flags |= LOG_EVENT_RELAY_LOG_F; } bool is_artificial_event() const { return flags & LOG_EVENT_ARTIFICIAL_F; } bool is_relay_log_event() const { return flags & LOG_EVENT_RELAY_LOG_F; } - inline bool get_cache_stmt() const { return cache_stmt; } + inline bool use_trans_cache() const + { + return (cache_type == Log_event::EVENT_TRANSACTIONAL_CACHE); + } + inline void set_direct_logging() + { + cache_type = Log_event::EVENT_NO_CACHE; + } + inline bool use_direct_logging() + { + return (cache_type == Log_event::EVENT_NO_CACHE); + } Log_event(const char* buf, const Format_description_log_event *description_event); virtual ~Log_event() { free_temp_buf();} @@ -1645,7 +1685,7 @@ public: #ifndef MYSQL_CLIENT Query_log_event(THD* 
thd_arg, const char* query_arg, ulong query_length, - bool using_trans, bool suppress_use, int error); + bool using_trans, bool direct, bool suppress_use, int error); const char* get_db() { return db; } #ifdef HAVE_REPLICATION void pack_info(Protocol* protocol); @@ -2895,8 +2935,8 @@ public: ulong query_length, uint fn_pos_start_arg, uint fn_pos_end_arg, enum_load_dup_handling dup_handling_arg, - bool using_trans, bool suppress_use, - int errcode); + bool using_trans, bool direct, + bool suppress_use, int errcode); #ifdef HAVE_REPLICATION void pack_info(Protocol* protocol); #endif /* HAVE_REPLICATION */ @@ -3304,16 +3344,14 @@ public: /* Special constants representing sets of flags */ enum { - TM_NO_FLAGS = 0U + TM_NO_FLAGS = 0U, + TM_BIT_LEN_EXACT_F = (1U << 0) }; - void set_flags(flag_set flag) { m_flags |= flag; } - void clear_flags(flag_set flag) { m_flags &= ~flag; } flag_set get_flags(flag_set flag) const { return m_flags & flag; } #ifndef MYSQL_CLIENT - Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, - bool is_transactional, uint16 flags); + Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, bool is_transactional); #endif #ifdef HAVE_REPLICATION Table_map_log_event(const char *buf, uint event_len, @@ -3326,12 +3364,12 @@ public: table_def *create_table_def() { return new table_def(m_coltype, m_colcnt, m_field_metadata, - m_field_metadata_size, m_null_bits); + m_field_metadata_size, m_null_bits, m_flags); } +#endif ulong get_table_id() const { return m_table_id; } const char *get_table_name() const { return m_tblnam; } const char *get_db_name() const { return m_dbnam; } -#endif virtual Log_event_type get_type_code() { return TABLE_MAP_EVENT; } virtual bool is_valid() const { return m_memory != NULL; /* we check malloc */ } @@ -3883,6 +3921,7 @@ public: DBUG_PRINT("enter", ("m_incident: %d", m_incident)); m_message.str= NULL; /* Just as a precaution */ m_message.length= 0; + set_direct_logging(); DBUG_VOID_RETURN; } @@ -3892,6 +3931,7 @@ public: 
DBUG_ENTER("Incident_log_event::Incident_log_event"); DBUG_PRINT("enter", ("m_incident: %d", m_incident)); m_message= msg; + set_direct_logging(); DBUG_VOID_RETURN; } #endif diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc index 72affd2bee9..ecdaa5ce25d 100644 --- a/sql/log_event_old.cc +++ b/sql/log_event_old.cc @@ -59,22 +59,19 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info We also call the mysql_reset_thd_for_next_command(), since this is the logical start of the next "statement". Note that this - call might reset the value of current_stmt_binlog_row_based, so + call might reset the value of current_stmt_binlog_format, so we need to do any changes to that value after this function. */ lex_start(thd); mysql_reset_thd_for_next_command(thd); /* - Check if the slave is set to use SBR. If so, it should switch - to using RBR until the end of the "statement", i.e., next - STMT_END_F or next error. + This is a row injection, so we flag the "statement" as + such. Note that this code is called both when the slave does row + injections and when the BINLOG statement is used to do row + injections. 
*/ - if (!thd->current_stmt_binlog_row_based && - mysql_bin_log.is_open() && (thd->options & OPTION_BIN_LOG)) - { - thd->set_current_stmt_binlog_row_based(); - } + thd->lex->set_stmt_row_injection(); if (simple_open_n_lock_tables(thd, rli->tables_to_lock)) { @@ -107,14 +104,24 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info RPL_TABLE_LIST *ptr= rli->tables_to_lock; for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global)) { - if (ptr->m_tabledef.compatible_with(rli, ptr->table)) + TABLE *conv_table; + if (!ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli), + ptr->table, &conv_table)) { + DBUG_PRINT("debug", ("Table: %s.%s is not compatible with master", + ptr->table->s->db.str, + ptr->table->s->table_name.str)); mysql_unlock_tables(thd, thd->lock); thd->lock= 0; thd->is_slave_error= 1; const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); DBUG_RETURN(Old_rows_log_event::ERR_BAD_TABLE_DEF); } + DBUG_PRINT("debug", ("Table: %s.%s is compatible with master" + " - conv_table: %p", + ptr->table->s->db.str, + ptr->table->s->table_name.str, conv_table)); + ptr->m_conv_table= conv_table; } } @@ -229,7 +236,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info DBUG_EXECUTE_IF("stop_slave_middle_group", const_cast<Relay_log_info*>(rli)->abort_slave= 1;); error= do_after_row_operations(table, error); - if (!ev->cache_stmt) + if (!ev->use_trans_cache()) { DBUG_PRINT("info", ("Marked that we need to keep log")); thd->options|= OPTION_KEEP_LOG; @@ -263,7 +270,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info thread is certainly going to stop. rollback at the caller along with sbr. 
*/ - thd->reset_current_stmt_binlog_row_based(); + thd->reset_current_stmt_binlog_format_row(); const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error); thd->is_slave_error= 1; DBUG_RETURN(error); @@ -1513,7 +1520,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) NOTE: For this new scheme there should be no pending event: need to add code to assert that is the case. */ - error= thd->binlog_flush_pending_rows_event(false); + error= thd->binlog_flush_pending_rows_event(FALSE); if (error) { rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, @@ -1558,7 +1565,9 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) RPL_TABLE_LIST *ptr= rli->tables_to_lock; for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global)) { - if (ptr->m_tabledef.compatible_with(rli, ptr->table)) + TABLE *conv_table; + if (ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli), + ptr->table, &conv_table)) { mysql_unlock_tables(thd, thd->lock); thd->lock= 0; @@ -1566,12 +1575,14 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); DBUG_RETURN(ERR_BAD_TABLE_DEF); } + ptr->m_conv_table= conv_table; } } /* - ... and then we add all the tables to the table map and remove - them from tables to lock. + ... and then we add all the tables to the table map but keep + them in the tables to lock list. + We also invalidate the query cache for all the tables, since they will now be changed. @@ -1727,7 +1738,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) DBUG_EXECUTE_IF("stop_slave_middle_group", const_cast<Relay_log_info*>(rli)->abort_slave= 1;); error= do_after_row_operations(rli, error); - if (!cache_stmt) + if (!use_trans_cache()) { DBUG_PRINT("info", ("Marked that we need to keep log")); thd->options|= OPTION_KEEP_LOG; @@ -1761,7 +1772,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) thread is certainly going to stop. 
rollback at the caller along with sbr. */ - thd->reset_current_stmt_binlog_row_based(); + thd->reset_current_stmt_binlog_format_row(); const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error); thd->is_slave_error= 1; DBUG_RETURN(error); @@ -1773,7 +1784,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) last_event_start_time here instead. */ if (table && (table->s->primary_key == MAX_KEY) && - !cache_stmt && get_flags(STMT_END_F) == RLE_NO_FLAGS) + !use_trans_cache() && get_flags(STMT_END_F) == RLE_NO_FLAGS) { /* ------------ Temporary fix until WL#2975 is implemented --------- @@ -1811,7 +1822,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) (assume the last master's transaction is ignored by the slave because of replicate-ignore rules). */ - int binlog_error= thd->binlog_flush_pending_rows_event(true); + int binlog_error= thd->binlog_flush_pending_rows_event(TRUE); /* If this event is not in a transaction, the call below will, if some @@ -1840,7 +1851,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) event flushed. 
*/ - thd->reset_current_stmt_binlog_row_based(); + thd->reset_current_stmt_binlog_format_row(); const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0); } diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h index 3ddaf114673..0e1f5b8c532 100644 --- a/sql/mysql_priv.h +++ b/sql/mysql_priv.h @@ -1041,7 +1041,8 @@ struct Query_cache_query_flags #endif /*HAVE_QUERY_CACHE*/ int write_bin_log(THD *thd, bool clear_error, - char const *query, ulong query_length); + char const *query, ulong query_length, + bool is_trans= FALSE); /* sql_connect.cc */ int check_user(THD *thd, enum enum_server_command command, @@ -1568,7 +1569,6 @@ TABLE *open_n_lock_single_table(THD *thd, TABLE_LIST *table_l, thr_lock_type lock_type); bool open_normal_and_derived_tables(THD *thd, TABLE_LIST *tables, uint flags); int lock_tables(THD *thd, TABLE_LIST *tables, uint counter, bool *need_reopen); -int decide_logging_format(THD *thd, TABLE_LIST *tables); TABLE *open_temporary_table(THD *thd, const char *path, const char *db, const char *table_name, bool link_in_list); bool rm_temporary_table(handlerton *base, char *path); @@ -2011,6 +2011,7 @@ extern my_bool opt_sql_bin_update, opt_safe_user_create, opt_no_mix_types; extern my_bool opt_safe_show_db, opt_local_infile, opt_myisam_use_mmap; extern my_bool opt_slave_compressed_protocol, use_temp_pool; extern ulong slave_exec_mode_options; +extern ulong slave_type_conversions_options; extern my_bool opt_readonly, lower_case_file_system; extern my_bool opt_enable_named_pipe, opt_sync_frm, opt_allow_suspicious_udfs; extern my_bool opt_secure_auth; diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 826e8a6c980..97b48dc5855 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -535,6 +535,8 @@ ulong open_files_limit, max_binlog_size, max_relay_log_size; ulong slave_net_timeout, slave_trans_retries; ulong slave_exec_mode_options; const char *slave_exec_mode_str= "STRICT"; +ulong slave_type_conversions_options; +const char *slave_type_conversions_default= ""; ulong 
thread_cache_size=0, thread_pool_size= 0; ulong binlog_cache_size=0; ulonglong max_binlog_cache_size=0; @@ -5774,6 +5776,7 @@ enum options_mysqld #endif /* defined(ENABLED_DEBUG_SYNC) */ OPT_OLD_MODE, OPT_SLAVE_EXEC_MODE, + OPT_SLAVE_TYPE_CONVERSIONS, OPT_GENERAL_LOG_FILE, OPT_SLOW_QUERY_LOG_FILE, OPT_IGNORE_BUILTIN_INNODB, @@ -6466,6 +6469,15 @@ replicating a LOAD DATA INFILE command.", {"slave-exec-mode", OPT_SLAVE_EXEC_MODE, "Modes for how replication events should be executed. Legal values are STRICT (default) and IDEMPOTENT. In IDEMPOTENT mode, replication will not stop for operations that are idempotent. In STRICT mode, replication will stop on any unexpected difference between the master and the slave.", (uchar**) &slave_exec_mode_str, (uchar**) &slave_exec_mode_str, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + {"slave-type-conversions", OPT_SLAVE_TYPE_CONVERSIONS, + "Set of slave type conversions that are enabled. Legal values are:" + " ALL_LOSSY to enable lossy conversions and" + " ALL_NON_LOSSY to enable non-lossy conversions." 
+ " If the variable is assigned the empty set, no conversions are" + " allowed and it is expected that the types match exactly.", + (uchar**) &slave_type_conversions_default, + (uchar**) &slave_type_conversions_default, + 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, #endif {"slow-query-log", OPT_SLOW_LOG, "Enable|disable slow query log", (uchar**) &opt_slow_log, @@ -7784,6 +7796,11 @@ static int mysql_init_variables(void) slave_exec_mode_options= (uint) find_bit_type_or_exit(slave_exec_mode_str, &slave_exec_mode_typelib, NULL, &error); + /* Slave type conversions */ + slave_type_conversions_options= 0; + slave_type_conversions_options= + find_bit_type_or_exit(slave_type_conversions_default, &slave_type_conversions_typelib, + NULL, &error); if (error) return 1; opt_specialflag= SPECIAL_ENGLISH; @@ -8007,6 +8024,12 @@ mysqld_get_one_option(int optid, if (error) return 1; break; + case OPT_SLAVE_TYPE_CONVERSIONS: + slave_type_conversions_options= (uint) + find_bit_type_or_exit(argument, &slave_type_conversions_typelib, "", &error); + if (error) + return 1; + break; #endif case OPT_SAFEMALLOC_MEM_LIMIT: #if !defined(DBUG_OFF) && defined(SAFEMALLOC) diff --git a/sql/repl_failsafe.cc b/sql/repl_failsafe.cc index 8392b2c1c3d..95841b57064 100644 --- a/sql/repl_failsafe.cc +++ b/sql/repl_failsafe.cc @@ -183,11 +183,18 @@ int register_slave(THD* thd, uchar* packet, uint packet_length) get_object(p,si->host, "Failed to register slave: too long 'report-host'"); get_object(p,si->user, "Failed to register slave: too long 'report-user'"); get_object(p,si->password, "Failed to register slave; too long 'report-password'"); - /*6 is the total length of port and master_id*/ - if (p+6 != p_end) + if (p+10 > p_end) goto err; si->port= uint2korr(p); p += 2; + /* + We need to by pass the bytes used in the fake rpl_recovery_rank + variable. It was removed in patch for BUG#13963. But this would + make a server with that patch unable to connect to an old master. 
+ See: BUG#49259 + */ + // si->rpl_recovery_rank= uint4korr(p); + p += 4; if (!(si->master_id= uint4korr(p))) si->master_id= server_id; si->thd= thd; diff --git a/sql/rpl_injector.cc b/sql/rpl_injector.cc index 666622dbac4..ac879b5033f 100644 --- a/sql/rpl_injector.cc +++ b/sql/rpl_injector.cc @@ -36,8 +36,6 @@ injector::transaction::transaction(MYSQL_BIN_LOG *log, THD *thd) m_start_pos.m_file_pos= log_info.pos; begin_trans(m_thd); - - thd->set_current_stmt_binlog_row_based(); } injector::transaction::~transaction() diff --git a/sql/rpl_record.cc b/sql/rpl_record.cc index 8e80620dd2c..c2d3ddbc353 100644 --- a/sql/rpl_record.cc +++ b/sql/rpl_record.cc @@ -104,10 +104,10 @@ pack_row(TABLE *table, MY_BITMAP const* cols, #endif pack_ptr= field->pack(pack_ptr, field->ptr + offset, field->max_data_length(), TRUE); - DBUG_PRINT("debug", ("field: %s; pack_ptr: 0x%lx;" + DBUG_PRINT("debug", ("field: %s; real_type: %d, pack_ptr: 0x%lx;" " pack_ptr':0x%lx; bytes: %d", - field->field_name, (ulong) old_pack_ptr, - (ulong) pack_ptr, + field->field_name, field->real_type(), + (ulong) old_pack_ptr, (ulong) pack_ptr, (int) (pack_ptr - old_pack_ptr))); } @@ -202,10 +202,30 @@ unpack_row(Relay_log_info const *rli, // The "current" null bits unsigned int null_bits= *null_ptr++; uint i= 0; - table_def *tabledef= ((Relay_log_info*)rli)->get_tabledef(table); + table_def *tabledef; + TABLE *conv_table; + bool table_found= rli->get_table_data(table, &tabledef, &conv_table); + DBUG_PRINT("debug", ("Table data: table_found: %d, tabldef: %p, conv_table: %p", + table_found, tabledef, conv_table)); + DBUG_ASSERT(table_found); + if (!table_found) + return HA_ERR_GENERIC; + for (field_ptr= begin_ptr ; field_ptr < end_ptr && *field_ptr ; ++field_ptr) { - Field *const f= *field_ptr; + /* + If there is a conversion table, we pick up the field pointer to + the conversion table. If the conversion table or the field + pointer is NULL, no conversions are necessary. 
+ */ + Field *conv_field= + conv_table ? conv_table->field[field_ptr - begin_ptr] : NULL; + Field *const f= + conv_field ? conv_field : *field_ptr; + DBUG_PRINT("debug", ("Conversion %srequired for field '%s' (#%ld)", + conv_field ? "" : "not ", + (*field_ptr)->field_name, field_ptr - begin_ptr)); + DBUG_ASSERT(f != NULL); /* No need to bother about columns that does not exist: they have @@ -275,6 +295,39 @@ unpack_row(Relay_log_info const *rli, (int) (pack_ptr - old_pack_ptr))); } + /* + If conv_field is set, then we are doing a conversion. In this + case, we have unpacked the master data to the conversion + table, so we need to copy the value stored in the conversion + table into the final table and do the conversion at the same time. + */ + if (conv_field) + { + Copy_field copy; +#ifndef DBUG_OFF + char source_buf[MAX_FIELD_WIDTH]; + char value_buf[MAX_FIELD_WIDTH]; + String source_type(source_buf, sizeof(source_buf), system_charset_info); + String value_string(value_buf, sizeof(value_buf), system_charset_info); + conv_field->sql_type(source_type); + conv_field->val_str(&value_string); + DBUG_PRINT("debug", ("Copying field '%s' of type '%s' with value '%s'", + (*field_ptr)->field_name, + source_type.c_ptr_safe(), value_string.c_ptr_safe())); +#endif + copy.set(*field_ptr, f, TRUE); + (*copy.do_copy)(&copy); +#ifndef DBUG_OFF + char target_buf[MAX_FIELD_WIDTH]; + String target_type(target_buf, sizeof(target_buf), system_charset_info); + (*field_ptr)->sql_type(target_type); + (*field_ptr)->val_str(&value_string); + DBUG_PRINT("debug", ("Value of field '%s' of type '%s' is now '%s'", + (*field_ptr)->field_name, + target_type.c_ptr_safe(), value_string.c_ptr_safe())); +#endif + } + null_mask <<= 1; } i++; @@ -366,7 +419,6 @@ int prepare_record(TABLE *const table, */ for (Field **field_ptr= table->field+skip; *field_ptr; ++field_ptr) { - uint32 const mask= NOT_NULL_FLAG | NO_DEFAULT_VALUE_FLAG; Field *const f= *field_ptr; if ((f->flags & NO_DEFAULT_VALUE_FLAG) && 
(f->real_type() != MYSQL_TYPE_ENUM)) diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index ec314e50cc7..5be27cfd6f4 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -333,13 +333,21 @@ public: uint tables_to_lock_count; /* RBR: Count of tables to lock */ table_mapping m_table_map; /* RBR: Mapping table-id to table */ - inline table_def *get_tabledef(TABLE *tbl) + bool get_table_data(TABLE *table_arg, table_def **tabledef_var, TABLE **conv_table_var) const { - table_def *td= 0; - for (TABLE_LIST *ptr= tables_to_lock; ptr && !td; ptr= ptr->next_global) - if (ptr->table == tbl) - td= &((RPL_TABLE_LIST *)ptr)->m_tabledef; - return (td); + DBUG_ASSERT(tabledef_var && conv_table_var); + for (TABLE_LIST *ptr= tables_to_lock ; ptr != NULL ; ptr= ptr->next_global) + if (ptr->table == table_arg) + { + *tabledef_var= &static_cast<RPL_TABLE_LIST*>(ptr)->m_tabledef; + *conv_table_var= static_cast<RPL_TABLE_LIST*>(ptr)->m_conv_table; + DBUG_PRINT("debug", ("Fetching table data for table %s.%s:" + " tabledef: %p, conv_table: %p", + table_arg->s->db.str, table_arg->s->table_name.str, + *tabledef_var, *conv_table_var)); + return true; + } + return false; } /* diff --git a/sql/rpl_utility.cc b/sql/rpl_utility.cc index e34f8561051..e8e22216b38 100644 --- a/sql/rpl_utility.cc +++ b/sql/rpl_utility.cc @@ -14,8 +14,176 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include "rpl_utility.h" + +#ifndef MYSQL_CLIENT #include "rpl_rli.h" +/** + Function to compare two size_t integers for their relative + order. Used below. + */ +int compare(size_t a, size_t b) +{ + if (a < b) + return -1; + if (b < a) + return 1; + return 0; +} + + +/** + Max value for an unsigned integer of 'bits' bits. + + The somewhat contorted expression is to avoid overflow. + */ +uint32 uint_max(int bits) { + return (((1UL << (bits - 1)) - 1) << 1) | 1; +} + + +/** + Compute the maximum display length of a field. 
+ + @param sql_type Type of the field + @param metadata The metadata from the master for the field. + @return Maximum length of the field in bytes. + */ +static uint32 +max_display_length_for_field(enum_field_types sql_type, unsigned int metadata) +{ + DBUG_PRINT("debug", ("sql_type: %d, metadata: 0x%x", sql_type, metadata)); + DBUG_ASSERT(metadata >> 16 == 0); + + switch (sql_type) { + case MYSQL_TYPE_NEWDECIMAL: + return metadata >> 8; + + case MYSQL_TYPE_FLOAT: + return 12; + + case MYSQL_TYPE_DOUBLE: + return 22; + + case MYSQL_TYPE_SET: + case MYSQL_TYPE_ENUM: + return metadata & 0x00ff; + + case MYSQL_TYPE_STRING: + { + uchar type= metadata >> 8; + if (type == MYSQL_TYPE_SET || type == MYSQL_TYPE_ENUM) + return metadata & 0xff; + else + /* This is taken from Field_string::unpack. */ + return (((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0x00ff); + } + + case MYSQL_TYPE_YEAR: + case MYSQL_TYPE_TINY: + return 4; + + case MYSQL_TYPE_SHORT: + return 6; + + case MYSQL_TYPE_INT24: + return 9; + + case MYSQL_TYPE_LONG: + return 11; + +#ifdef HAVE_LONG_LONG + case MYSQL_TYPE_LONGLONG: + return 20; + +#endif + case MYSQL_TYPE_NULL: + return 0; + + case MYSQL_TYPE_NEWDATE: + return 3; + + case MYSQL_TYPE_DATE: + case MYSQL_TYPE_TIME: + return 3; + + case MYSQL_TYPE_TIMESTAMP: + return 4; + + case MYSQL_TYPE_DATETIME: + return 8; + + case MYSQL_TYPE_BIT: + /* + Decode the size of the bit field from the master. + */ + DBUG_ASSERT((metadata & 0xff) <= 7); + return 8 * (metadata >> 8U) + (metadata & 0x00ff); + + case MYSQL_TYPE_VAR_STRING: + case MYSQL_TYPE_VARCHAR: + return metadata; + + /* + The actual length for these types does not really matter since + they are used to calc_pack_length, which ignores the given + length for these types. + + Since we want this to be accurate for other uses, we return the + maximum size in bytes of these BLOBs. 
+ */ + + case MYSQL_TYPE_TINY_BLOB: + return uint_max(1 * 8); + + case MYSQL_TYPE_MEDIUM_BLOB: + return uint_max(3 * 8); + + case MYSQL_TYPE_BLOB: + /* + For the blob type, Field::real_type() lies and say that all + blobs are of type MYSQL_TYPE_BLOB. In that case, we have to look + at the length instead to decide what the max display size is. + */ + return uint_max(metadata * 8); + + case MYSQL_TYPE_LONG_BLOB: + case MYSQL_TYPE_GEOMETRY: + return uint_max(4 * 8); + + default: + return ~(uint32) 0; + } +} + + +/* + Compare the pack lengths of a source field (on the master) and a + target field (on the slave). + + @param field Target field. + @param type Source field type. + @param metadata Source field metadata. + + @retval -1 The length of the source field is smaller than the target field. + @retval 0 The length of the source and target fields are the same. + @retval 1 The length of the source field is greater than the target field. + */ +int compare_lengths(Field *field, enum_field_types source_type, uint16 metadata) +{ + DBUG_ENTER("compare_lengths"); + size_t const source_length= + max_display_length_for_field(source_type, metadata); + size_t const target_length= field->max_display_length(); + DBUG_PRINT("debug", ("source_length: %lu, source_type: %u," + " target_length: %lu, target_type: %u", + (unsigned long) source_length, source_type, + (unsigned long) target_length, field->real_type())); + int result= compare(source_length, target_length); + DBUG_PRINT("result", ("%d", result)); + DBUG_RETURN(result); +} + /********************************************************************* * table_def member definitions * *********************************************************************/ @@ -169,58 +337,704 @@ uint32 table_def::calc_field_size(uint col, uchar *master_data) const return length; } -/* + +/** + */ +void show_sql_type(enum_field_types type, uint16 metadata, String *str) +{ + DBUG_ENTER("show_sql_type"); + DBUG_PRINT("enter", ("type: %d, metadata: 0x%x", 
type, metadata)); + + switch (type) + { + case MYSQL_TYPE_TINY: + str->set_ascii(STRING_WITH_LEN("tinyint")); + break; + + case MYSQL_TYPE_SHORT: + str->set_ascii(STRING_WITH_LEN("smallint")); + break; + + case MYSQL_TYPE_LONG: + str->set_ascii(STRING_WITH_LEN("int")); + break; + + case MYSQL_TYPE_FLOAT: + str->set_ascii(STRING_WITH_LEN("float")); + break; + + case MYSQL_TYPE_DOUBLE: + str->set_ascii(STRING_WITH_LEN("double")); + break; + + case MYSQL_TYPE_NULL: + str->set_ascii(STRING_WITH_LEN("null")); + break; + + case MYSQL_TYPE_TIMESTAMP: + str->set_ascii(STRING_WITH_LEN("timestamp")); + break; + + case MYSQL_TYPE_LONGLONG: + str->set_ascii(STRING_WITH_LEN("bigint")); + break; + + case MYSQL_TYPE_INT24: + str->set_ascii(STRING_WITH_LEN("mediumint")); + break; + + case MYSQL_TYPE_NEWDATE: + case MYSQL_TYPE_DATE: + str->set_ascii(STRING_WITH_LEN("date")); + break; + + case MYSQL_TYPE_TIME: + str->set_ascii(STRING_WITH_LEN("time")); + break; + + case MYSQL_TYPE_DATETIME: + str->set_ascii(STRING_WITH_LEN("datetime")); + break; + + case MYSQL_TYPE_YEAR: + str->set_ascii(STRING_WITH_LEN("year")); + break; + + case MYSQL_TYPE_VAR_STRING: + case MYSQL_TYPE_VARCHAR: + { + CHARSET_INFO *cs= str->charset(); + uint32 length= + cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(), + "varchar(%u)", metadata); + str->length(length); + } + break; + + case MYSQL_TYPE_BIT: + { + CHARSET_INFO *cs= str->charset(); + int bit_length= 8 * (metadata >> 8) + (metadata & 0xFF); + uint32 length= + cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(), + "bit(%d)", bit_length); + str->length(length); + } + break; + + case MYSQL_TYPE_DECIMAL: + { + CHARSET_INFO *cs= str->charset(); + uint32 length= + cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(), + "decimal(%d,?)", metadata); + str->length(length); + } + break; + + case MYSQL_TYPE_NEWDECIMAL: + { + CHARSET_INFO *cs= str->charset(); + uint32 length= + cs->cset->snprintf(cs, (char*) str->ptr(), 
str->alloced_length(), + "decimal(%d,%d)", metadata >> 8, metadata & 0xff); + str->length(length); + } + break; + + case MYSQL_TYPE_ENUM: + str->set_ascii(STRING_WITH_LEN("enum")); + break; + + case MYSQL_TYPE_SET: + str->set_ascii(STRING_WITH_LEN("set")); + break; + + case MYSQL_TYPE_BLOB: + /* + Field::real_type() lies regarding the actual type of a BLOB, so + it is necessary to check the pack length to figure out what kind + of blob it really is. + */ + switch (get_blob_type_from_length(metadata)) + { + case MYSQL_TYPE_TINY_BLOB: + str->set_ascii(STRING_WITH_LEN("tinyblob")); + break; + + case MYSQL_TYPE_MEDIUM_BLOB: + str->set_ascii(STRING_WITH_LEN("mediumblob")); + break; + + case MYSQL_TYPE_LONG_BLOB: + str->set_ascii(STRING_WITH_LEN("longblob")); + break; + + case MYSQL_TYPE_BLOB: + str->set_ascii(STRING_WITH_LEN("blob")); + break; + + default: + DBUG_ASSERT(0); + break; + } + break; + + case MYSQL_TYPE_STRING: + { + /* + This is taken from Field_string::unpack. + */ + CHARSET_INFO *cs= str->charset(); + uint bytes= (((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0x00ff); + uint32 length= + cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(), + "char(%d)", bytes / cs->mbmaxlen); + str->length(length); + } + break; + + case MYSQL_TYPE_GEOMETRY: + str->set_ascii(STRING_WITH_LEN("geometry")); + break; + + default: + str->set_ascii(STRING_WITH_LEN("<unknown type>")); + } + DBUG_VOID_RETURN; +} + + +/** + Check the order variable and print errors if the order is not + acceptable according to the current settings. + + @param order The computed order of the conversion needed. + @param rli The relay log info data structure: for error reporting. 
+ */ +bool is_conversion_ok(int order, Relay_log_info *rli) +{ + DBUG_ENTER("is_conversion_ok"); + bool allow_non_lossy= + bit_is_set(slave_type_conversions_options, SLAVE_TYPE_CONVERSIONS_ALL_NON_LOSSY); + bool allow_lossy= + bit_is_set(slave_type_conversions_options, SLAVE_TYPE_CONVERSIONS_ALL_LOSSY); + + DBUG_PRINT("enter", ("order: %d, flags:%s%s", order, + allow_non_lossy ? " ALL_NON_LOSSY" : "", + allow_lossy ? " ALL_LOSSY" : "")); + if (order < 0 && !allow_non_lossy) + { + /* !!! Add error message saying that non-lossy conversions need to be allowed. */ + DBUG_RETURN(false); + } + + if (order > 0 && !allow_lossy) + { + /* !!! Add error message saying that lossy conversions need to be allowed. */ + DBUG_RETURN(false); + } + + DBUG_RETURN(true); +} + + +/** + Can a type potentially be converted to another type? + + This function checks if the types are convertible and what + conversion is required. + + If conversion is not possible, an error is printed. + + If conversion is possible: + + - *order_var will be set to -1 if source type is smaller than target + type and a non-lossy conversion can be required. This includes + the case where the field types are different but types could + actually be converted in either direction. + + - *order_var will be set to 0 if no conversion is required. + + - *order_var will be set to 1 if the source type is strictly larger + than the target type and that conversion is potentially lossy. + + @param[in] field Target field + @param[in] source_type Source field type + @param[in] metadata Source field metadata + @param[in] rli Relay log info (for error reporting) + @param[in] mflags Flags from the table map event + @param[out] order_var Order between source field and target field + + @return @c true if conversion is possible according to the current + settings, @c false if conversion is not possible according to the + current setting. 
+ */ +static bool +can_convert_field_to(Field *field, + enum_field_types source_type, uint16 metadata, + Relay_log_info *rli, uint16 mflags, + int *order_var) +{ + DBUG_ENTER("can_convert_field_to"); +#ifndef DBUG_OFF + char field_type_buf[MAX_FIELD_WIDTH]; + String field_type(field_type_buf, sizeof(field_type_buf), field->charset()); + field->sql_type(field_type); + DBUG_PRINT("enter", ("field_type: %s, target_type: %d, source_type: %d, source_metadata: 0x%x", + field_type.c_ptr_safe(), field->real_type(), source_type, metadata)); +#endif + /* + If the real type is the same, we need to check the metadata to + decide if conversions are allowed. + */ + if (field->real_type() == source_type) + { + DBUG_PRINT("debug", ("Base types are identical, doing field size comparison")); + if (field->compatible_field_size(metadata, rli, mflags, order_var)) + DBUG_RETURN(is_conversion_ok(*order_var, rli)); + else + DBUG_RETURN(false); + } + else if (!slave_type_conversions_options) + DBUG_RETURN(false); + + /* + Here, from and to will always be different. Since the types are + different, we cannot use the compatible_field_size() function, but + have to rely on hard-coded max-sizes for fields. + */ + + DBUG_PRINT("debug", ("Base types are different, checking conversion")); + switch (source_type) // Source type (on master) + { + case MYSQL_TYPE_DECIMAL: + case MYSQL_TYPE_NEWDECIMAL: + case MYSQL_TYPE_FLOAT: + case MYSQL_TYPE_DOUBLE: + switch (field->real_type()) + { + case MYSQL_TYPE_NEWDECIMAL: + /* + Then the other type is either FLOAT, DOUBLE, or old style + DECIMAL, so we require lossy conversion. 
+ */ + *order_var= 1; + DBUG_RETURN(is_conversion_ok(*order_var, rli)); + + case MYSQL_TYPE_DECIMAL: + case MYSQL_TYPE_FLOAT: + case MYSQL_TYPE_DOUBLE: + { + if (source_type == MYSQL_TYPE_NEWDECIMAL || + source_type == MYSQL_TYPE_DECIMAL) + *order_var = 1; // Always require lossy conversions + else + *order_var= compare_lengths(field, source_type, metadata); + DBUG_ASSERT(*order_var != 0); + DBUG_RETURN(is_conversion_ok(*order_var, rli)); + } + + default: + DBUG_RETURN(false); + } + break; + + /* + The length comparison check will do the correct job of comparing + the field lengths (in bytes) of two integer types. + */ + case MYSQL_TYPE_TINY: + case MYSQL_TYPE_SHORT: + case MYSQL_TYPE_INT24: + case MYSQL_TYPE_LONG: + case MYSQL_TYPE_LONGLONG: + switch (field->real_type()) + { + case MYSQL_TYPE_TINY: + case MYSQL_TYPE_SHORT: + case MYSQL_TYPE_INT24: + case MYSQL_TYPE_LONG: + case MYSQL_TYPE_LONGLONG: + *order_var= compare_lengths(field, source_type, metadata); + DBUG_ASSERT(*order_var != 0); + DBUG_RETURN(is_conversion_ok(*order_var, rli)); + + default: + DBUG_RETURN(false); + } + break; + + /* + Since source and target type is different, and it is not possible + to convert bit types to anything else, this will return false. + */ + case MYSQL_TYPE_BIT: + DBUG_RETURN(false); + + /* + If all conversions are disabled, it is not allowed to convert + between these types. Since the TEXT vs. BINARY is distinguished by + the charset, and the charset is not replicated, we cannot + currently distinguish between , e.g., TEXT and BLOB. 
+ */ + case MYSQL_TYPE_TINY_BLOB: + case MYSQL_TYPE_MEDIUM_BLOB: + case MYSQL_TYPE_LONG_BLOB: + case MYSQL_TYPE_BLOB: + case MYSQL_TYPE_STRING: + case MYSQL_TYPE_VAR_STRING: + case MYSQL_TYPE_VARCHAR: + switch (field->real_type()) + { + case MYSQL_TYPE_TINY_BLOB: + case MYSQL_TYPE_MEDIUM_BLOB: + case MYSQL_TYPE_LONG_BLOB: + case MYSQL_TYPE_BLOB: + case MYSQL_TYPE_STRING: + case MYSQL_TYPE_VAR_STRING: + case MYSQL_TYPE_VARCHAR: + *order_var= compare_lengths(field, source_type, metadata); + /* + Here we know that the types are different, so if the order + gives that they do not require any conversion, we still need + to have non-lossy conversion enabled to allow conversion + between different (string) types of the same length. + */ + if (*order_var == 0) + *order_var= -1; + DBUG_RETURN(is_conversion_ok(*order_var, rli)); + + default: + DBUG_RETURN(false); + } + break; + + case MYSQL_TYPE_GEOMETRY: + case MYSQL_TYPE_TIMESTAMP: + case MYSQL_TYPE_DATE: + case MYSQL_TYPE_TIME: + case MYSQL_TYPE_DATETIME: + case MYSQL_TYPE_YEAR: + case MYSQL_TYPE_NEWDATE: + case MYSQL_TYPE_NULL: + case MYSQL_TYPE_ENUM: + case MYSQL_TYPE_SET: + DBUG_RETURN(false); + } + DBUG_RETURN(false); // To keep GCC happy +} + + +/** Is the definition compatible with a table? + This function will compare the master table with an existing table + on the slave and see if they are compatible with respect to the + current settings of @c SLAVE_TYPE_CONVERSIONS. + + If the tables are compatible and conversions are required, @c + *tmp_table_var will be set to a virtual temporary table with field + pointers for the fields that require conversions. This allow simple + checking of whether a conversion are to be applied or not. + + If tables are compatible, but no conversions are necessary, @c + *tmp_table_var will be set to NULL. + + @param rli_arg[in] + Relay log info, for error reporting. 
+ + @param table[in] + Table to compare with + + @param conv_table_var[out] + Virtual temporary table for performing conversions, if necessary. + + @retval true Master table is compatible with slave table. + @retval false Master table is not compatible with slave table. */ -int -table_def::compatible_with(Relay_log_info const *rli_arg, TABLE *table) +bool +table_def::compatible_with(THD *thd, Relay_log_info *rli, + TABLE *table, TABLE **conv_table_var) const { /* We only check the initial columns for the tables. */ uint const cols_to_check= min(table->s->fields, size()); - int error= 0; - Relay_log_info const *rli= const_cast<Relay_log_info*>(rli_arg); - - TABLE_SHARE const *const tsh= table->s; + TABLE *tmp_table= NULL; for (uint col= 0 ; col < cols_to_check ; ++col) { Field *const field= table->field[col]; - if (field->type() != type(col)) + int order; + if (can_convert_field_to(field, type(col), field_metadata(col), rli, m_flags, &order)) { - DBUG_ASSERT(col < size() && col < tsh->fields); - DBUG_ASSERT(tsh->db.str && tsh->table_name.str); - error= 1; - char buf[256]; - my_snprintf(buf, sizeof(buf), "Column %d type mismatch - " - "received type %d, %s.%s has type %d", - col, type(col), tsh->db.str, tsh->table_name.str, - field->type()); - rli->report(ERROR_LEVEL, ER_BINLOG_ROW_WRONG_TABLE_DEF, - ER(ER_BINLOG_ROW_WRONG_TABLE_DEF), buf); + DBUG_PRINT("debug", ("Checking column %d -" + " field '%s' can be converted - order: %d", + col, field->field_name, order)); + DBUG_ASSERT(order >= -1 && order <= 1); + + /* + If order is not 0, a conversion is required, so we need to set + up the conversion table. + */ + if (order != 0 && tmp_table == NULL) + { + /* + This will create the full table with all fields. This is + necessary to get the correct field lengths for the record. + */ + tmp_table= create_conversion_table(thd, rli, table); + if (tmp_table == NULL) + return false; + /* + Clear all fields up to, but not including, this column. 
+ */ + for (unsigned int i= 0; i < col; ++i) + tmp_table->field[i]= NULL; + } + + if (order == 0 && tmp_table != NULL) + tmp_table->field[col]= NULL; } - /* - Check the slave's field size against that of the master. - */ - if (!error && - !field->compatible_field_size(field_metadata(col), rli_arg)) + else { - error= 1; - char buf[256]; - my_snprintf(buf, sizeof(buf), "Column %d size mismatch - " - "master has size %d, %s.%s on slave has size %d." - " Master's column size should be <= the slave's " - "column size.", col, - field->pack_length_from_metadata(m_field_metadata[col]), - tsh->db.str, tsh->table_name.str, - field->row_pack_length()); - rli->report(ERROR_LEVEL, ER_BINLOG_ROW_WRONG_TABLE_DEF, - ER(ER_BINLOG_ROW_WRONG_TABLE_DEF), buf); + DBUG_PRINT("debug", ("Checking column %d -" + " field '%s' can not be converted", + col, field->field_name)); + DBUG_ASSERT(col < size() && col < table->s->fields); + DBUG_ASSERT(table->s->db.str && table->s->table_name.str); + const char *db_name= table->s->db.str; + const char *tbl_name= table->s->table_name.str; + char source_buf[MAX_FIELD_WIDTH]; + char target_buf[MAX_FIELD_WIDTH]; + String source_type(source_buf, sizeof(source_buf), field->charset()); + String target_type(target_buf, sizeof(target_buf), field->charset()); + show_sql_type(type(col), field_metadata(col), &source_type); + field->sql_type(target_type); + rli->report(ERROR_LEVEL, ER_SLAVE_CONVERSION_FAILED, + ER(ER_SLAVE_CONVERSION_FAILED), + col, db_name, tbl_name, + source_type.c_ptr(), target_type.c_ptr()); + return false; } } - return error; +#ifndef DBUG_OFF + if (tmp_table) + { + for (unsigned int col= 0; col < tmp_table->s->fields; ++col) + if (tmp_table->field[col]) + { + char source_buf[MAX_FIELD_WIDTH]; + char target_buf[MAX_FIELD_WIDTH]; + String source_type(source_buf, sizeof(source_buf), table->field[col]->charset()); + String target_type(target_buf, sizeof(target_buf), table->field[col]->charset()); + tmp_table->field[col]->sql_type(source_type); 
+ table->field[col]->sql_type(target_type); + DBUG_PRINT("debug", ("Field %s - conversion required." + " Source type: '%s', Target type: '%s'", + tmp_table->field[col]->field_name, + source_type.c_ptr_safe(), target_type.c_ptr_safe())); + } + } +#endif + + *conv_table_var= tmp_table; + return true; } + +/** + Create a conversion table. + + If the function is unable to create the conversion table, an error + will be printed and NULL will be returned. + + @return Pointer to conversion table, or NULL if unable to create + conversion table. + */ + +TABLE *table_def::create_conversion_table(THD *thd, Relay_log_info *rli, TABLE *target_table) const +{ + DBUG_ENTER("table_def::create_conversion_table"); + + List<Create_field> field_list; + + for (uint col= 0 ; col < size() ; ++col) + { + Create_field *field_def= + (Create_field*) alloc_root(thd->mem_root, sizeof(Create_field)); + if (field_list.push_back(field_def)) + DBUG_RETURN(NULL); + + uint decimals= 0; + TYPELIB* interval= NULL; + uint pack_length= 0; + uint32 max_length= + max_display_length_for_field(type(col), field_metadata(col)); + + switch(type(col)) + { + int precision; + case MYSQL_TYPE_ENUM: + case MYSQL_TYPE_SET: + interval= static_cast<Field_enum*>(target_table->field[col])->typelib; + pack_length= field_metadata(col) & 0x00ff; + break; + + case MYSQL_TYPE_NEWDECIMAL: + /* + The display length of a DECIMAL type is not the same as the + length that should be supplied to make_field, so we correct + the length here. 
+ */ + precision= field_metadata(col) >> 8; + decimals= field_metadata(col) & 0x00ff; + max_length= + my_decimal_precision_to_length(precision, decimals, FALSE); + break; + + case MYSQL_TYPE_DECIMAL: + precision= field_metadata(col); + decimals= static_cast<Field_num*>(target_table->field[col])->dec; + max_length= field_metadata(col); + break; + + case MYSQL_TYPE_TINY_BLOB: + case MYSQL_TYPE_MEDIUM_BLOB: + case MYSQL_TYPE_LONG_BLOB: + case MYSQL_TYPE_BLOB: + case MYSQL_TYPE_GEOMETRY: + pack_length= field_metadata(col) & 0x00ff; + break; + + default: + break; + } + + DBUG_PRINT("debug", ("sql_type: %d, target_field: '%s', max_length: %d, decimals: %d," + " maybe_null: %d, unsigned_flag: %d, pack_length: %u", + type(col), target_table->field[col]->field_name, + max_length, decimals, TRUE, FALSE, pack_length)); + field_def->init_for_tmp_table(type(col), + max_length, + decimals, + maybe_null(col), // maybe_null + FALSE, // unsigned_flag + pack_length); + field_def->charset= target_table->field[col]->charset(); + field_def->interval= interval; + } + + TABLE *conv_table= create_virtual_tmp_table(thd, field_list); + if (conv_table == NULL) + rli->report(ERROR_LEVEL, ER_SLAVE_CANT_CREATE_CONVERSION, + ER(ER_SLAVE_CANT_CREATE_CONVERSION), + target_table->s->db.str, + target_table->s->table_name.str); + DBUG_RETURN(conv_table); +} + +#endif /* MYSQL_CLIENT */ + +table_def::table_def(unsigned char *types, ulong size, + uchar *field_metadata, int metadata_size, + uchar *null_bitmap, uint16 flags) + : m_size(size), m_type(0), m_field_metadata_size(metadata_size), + m_field_metadata(0), m_null_bits(0), m_flags(flags), + m_memory(NULL) +{ + m_memory= (uchar *)my_multi_malloc(MYF(MY_WME), + &m_type, size, + &m_field_metadata, + size * sizeof(uint16), + &m_null_bits, (size + 7) / 8, + NULL); + + bzero(m_field_metadata, size * sizeof(uint16)); + + if (m_type) + memcpy(m_type, types, size); + else + m_size= 0; + /* + Extract the data from the table map into the field metadata array 
+ iff there is field metadata. The variable metadata_size will be + 0 if we are replicating from an older version server since no field + metadata was written to the table map. This can also happen if + there were no fields in the master that needed extra metadata. + */ + if (m_size && metadata_size) + { + int index= 0; + for (unsigned int i= 0; i < m_size; i++) + { + switch (m_type[i]) { + case MYSQL_TYPE_TINY_BLOB: + case MYSQL_TYPE_BLOB: + case MYSQL_TYPE_MEDIUM_BLOB: + case MYSQL_TYPE_LONG_BLOB: + case MYSQL_TYPE_DOUBLE: + case MYSQL_TYPE_FLOAT: + { + /* + These types store a single byte. + */ + m_field_metadata[i]= field_metadata[index]; + index++; + break; + } + case MYSQL_TYPE_SET: + case MYSQL_TYPE_ENUM: + case MYSQL_TYPE_STRING: + { + uint16 x= field_metadata[index++] << 8U; // real_type + x+= field_metadata[index++]; // pack or field length + m_field_metadata[i]= x; + break; + } + case MYSQL_TYPE_BIT: + { + uint16 x= field_metadata[index++]; + x = x + (field_metadata[index++] << 8U); + m_field_metadata[i]= x; + break; + } + case MYSQL_TYPE_VARCHAR: + { + /* + These types store two bytes. 
+ */ + char *ptr= (char *)&field_metadata[index]; + m_field_metadata[i]= uint2korr(ptr); + index= index + 2; + break; + } + case MYSQL_TYPE_NEWDECIMAL: + { + uint16 x= field_metadata[index++] << 8U; // precision + x+= field_metadata[index++]; // decimals + m_field_metadata[i]= x; + break; + } + default: + m_field_metadata[i]= 0; + break; + } + } + } + if (m_size && null_bitmap) + memcpy(m_null_bits, null_bitmap, (m_size + 7) / 8); +} + + +table_def::~table_def() +{ + my_free(m_memory, MYF(0)); +#ifndef DBUG_OFF + m_type= 0; + m_size= 0; +#endif +} + diff --git a/sql/rpl_utility.h b/sql/rpl_utility.h index 1f4ca246ff1..4b9bf3be93f 100644 --- a/sql/rpl_utility.h +++ b/sql/rpl_utility.h @@ -21,6 +21,7 @@ #endif #include "mysql_priv.h" +#include "mysql_com.h" class Relay_log_info; @@ -32,127 +33,24 @@ class Relay_log_info; - Extract and decode table definition data from the table map event - Check if table definition in table map is compatible with table definition on slave - - Currently, the only field type data available is an array of the - type operators that are present in the table map event. - - @todo Add type operands to this structure to allow detection of - difference between, e.g., BIT(5) and BIT(10). */ class table_def { public: /** - Convenience declaration of the type of the field type data in a - table map event. - */ - typedef unsigned char field_type; - - /** Constructor. 
- @param types Array of types + @param types Array of types, each stored as a byte @param size Number of elements in array 'types' @param field_metadata Array of extra information about fields @param metadata_size Size of the field_metadata array @param null_bitmap The bitmap of fields that can be null */ - table_def(field_type *types, ulong size, uchar *field_metadata, - int metadata_size, uchar *null_bitmap) - : m_size(size), m_type(0), m_field_metadata_size(metadata_size), - m_field_metadata(0), m_null_bits(0), m_memory(NULL) - { - m_memory= (uchar *)my_multi_malloc(MYF(MY_WME), - &m_type, size, - &m_field_metadata, - size * sizeof(uint16), - &m_null_bits, (size + 7) / 8, - NULL); - - bzero(m_field_metadata, size * sizeof(uint16)); - - if (m_type) - memcpy(m_type, types, size); - else - m_size= 0; - /* - Extract the data from the table map into the field metadata array - iff there is field metadata. The variable metadata_size will be - 0 if we are replicating from an older version server since no field - metadata was written to the table map. This can also happen if - there were no fields in the master that needed extra metadata. - */ - if (m_size && metadata_size) - { - int index= 0; - for (unsigned int i= 0; i < m_size; i++) - { - switch (m_type[i]) { - case MYSQL_TYPE_TINY_BLOB: - case MYSQL_TYPE_BLOB: - case MYSQL_TYPE_MEDIUM_BLOB: - case MYSQL_TYPE_LONG_BLOB: - case MYSQL_TYPE_DOUBLE: - case MYSQL_TYPE_FLOAT: - { - /* - These types store a single byte. - */ - m_field_metadata[i]= field_metadata[index]; - index++; - break; - } - case MYSQL_TYPE_SET: - case MYSQL_TYPE_ENUM: - case MYSQL_TYPE_STRING: - { - uint16 x= field_metadata[index++] << 8U; // real_type - x+= field_metadata[index++]; // pack or field length - m_field_metadata[i]= x; - break; - } - case MYSQL_TYPE_BIT: - { - uint16 x= field_metadata[index++]; - x = x + (field_metadata[index++] << 8U); - m_field_metadata[i]= x; - break; - } - case MYSQL_TYPE_VARCHAR: - { - /* - These types store two bytes. 
- */ - char *ptr= (char *)&field_metadata[index]; - m_field_metadata[i]= uint2korr(ptr); - index= index + 2; - break; - } - case MYSQL_TYPE_NEWDECIMAL: - { - uint16 x= field_metadata[index++] << 8U; // precision - x+= field_metadata[index++]; // decimals - m_field_metadata[i]= x; - break; - } - default: - m_field_metadata[i]= 0; - break; - } - } - } - if (m_size && null_bitmap) - memcpy(m_null_bits, null_bitmap, (m_size + 7) / 8); - } + table_def(unsigned char *types, ulong size, uchar *field_metadata, + int metadata_size, uchar *null_bitmap, uint16 flags); - ~table_def() { - my_free(m_memory, MYF(0)); -#ifndef DBUG_OFF - m_type= 0; - m_size= 0; -#endif - } + ~table_def(); /** Return the number of fields there is type data for. @@ -171,10 +69,40 @@ public: <code>index</code>. Currently, only the type identifier is returned. */ - field_type type(ulong index) const + enum_field_types type(ulong index) const { DBUG_ASSERT(index < m_size); - return m_type[index]; + /* + If the source type is MYSQL_TYPE_STRING, it can in reality be + either MYSQL_TYPE_STRING, MYSQL_TYPE_ENUM, or MYSQL_TYPE_SET, so + we might need to modify the type to get the real type. + */ + enum_field_types source_type= static_cast<enum_field_types>(m_type[index]); + uint16 source_metadata= m_field_metadata[index]; + switch (source_type) + { + case MYSQL_TYPE_STRING: + { + int real_type= source_metadata >> 8; + if (real_type == MYSQL_TYPE_ENUM || real_type == MYSQL_TYPE_SET) + source_type= static_cast<enum_field_types>(real_type); + break; + } + + /* + This type has not been used since before row-based replication, + so we can safely assume that it really is MYSQL_TYPE_NEWDATE. + */ + case MYSQL_TYPE_DATE: + source_type= MYSQL_TYPE_NEWDATE; + break; + + default: + /* Do nothing */ + break; + } + + return source_type; } @@ -226,26 +154,62 @@ public: with it. 
A table definition is compatible with a table if: - - the columns types of the table definition is a (not - necessarily proper) prefix of the column type of the table, or - - the other way around + - The column types of the table definition are a (not + necessarily proper) prefix of the column types of the table. + + - The other way around. + - Each column on the master that also exists on the slave can be + converted according to the current settings of @c + SLAVE_TYPE_CONVERSIONS. + + @param thd @param rli Pointer to relay log info @param table Pointer to table to compare with. + @param[out] conv_table_var Pointer to temporary table for holding + conversion table. + @retval 1 if the table definition is not compatible with @c table @retval 0 if the table definition is compatible with @c table */ #ifndef MYSQL_CLIENT - int compatible_with(Relay_log_info const *rli, TABLE *table) const; + bool compatible_with(THD *thd, Relay_log_info *rli, TABLE *table, + TABLE **conv_table_var) const; + + /** + Create a virtual in-memory temporary table structure. + + The table structure has records and field array so that a row can + be unpacked into the record for further processing. + + In the virtual table, each field that requires conversion will + have a non-NULL value, while fields that do not require + conversion will have a NULL value. + + Some information that is missing in the events, such as the + character set for string types, are taken from the table that the + field is going to be pushed into, so the target table that the data + eventually need to be pushed into need to be supplied. + + @param thd Thread to allocate memory from. + @param rli Relay log info structure, for error reporting. + @param target_table Target table for fields.
+ + @return A pointer to a temporary table with memory allocated in the + thread's memroot, NULL if the table could not be created + */ + TABLE *create_conversion_table(THD *thd, Relay_log_info *rli, TABLE *target_table) const; #endif + private: ulong m_size; // Number of elements in the types array - field_type *m_type; // Array of type descriptors + unsigned char *m_type; // Array of type descriptors uint m_field_metadata_size; uint16 *m_field_metadata; uchar *m_null_bits; + uint16 m_flags; // Table flags uchar *m_memory; }; @@ -260,6 +224,7 @@ struct RPL_TABLE_LIST { bool m_tabledef_valid; table_def m_tabledef; + TABLE *m_conv_table; }; diff --git a/sql/set_var.cc b/sql/set_var.cc index 42829590942..5f3ceb725ea 100644 --- a/sql/set_var.cc +++ b/sql/set_var.cc @@ -92,6 +92,33 @@ TYPELIB delay_key_write_typelib= delay_key_write_type_names, NULL }; +/** + SLAVE_TYPE_CONVERSIONS variable. + + Definition is equivalent to + @code + SET('ALL_NON_LOSSY', 'ALL_LOSSY') + @endcode + */ +const char *slave_type_conversions_type_name[]= { + "ALL_LOSSY", + "ALL_NON_LOSSY", + NullS +}; + +unsigned int slave_type_conversions_type_length[]= { + sizeof("ALL_LOSSY")-1, + sizeof("ALL_NON_LOSSY")-1, + 0 +}; + +TYPELIB slave_type_conversions_typelib= +{ + array_elements(slave_type_conversions_type_name)-1, "", + slave_type_conversions_type_name, + slave_type_conversions_type_length +}; + const char *slave_exec_mode_names[]= { "STRICT", "IDEMPOTENT", NullS }; static const unsigned int slave_exec_mode_names_len[]= @@ -583,6 +610,12 @@ static sys_var_set_slave_mode slave_exec_mode(&vars, &slave_exec_mode_options, &slave_exec_mode_typelib, 0); +static sys_var_set slave_type_conversions(&vars, + "slave_type_conversions", + &slave_type_conversions_options, + &slave_type_conversions_typelib, + 0); + static sys_var_long_ptr sys_slow_launch_time(&vars, "slow_launch_time", &slow_launch_time); static sys_var_thd_ulong sys_sort_buffer(&vars, "sort_buffer_size", @@ -1294,7 +1327,6 @@ bool 
sys_var_thd_binlog_format::check(THD *thd, set_var *var) { return result; } - bool sys_var_thd_binlog_format::is_readonly() const { /* @@ -1332,7 +1364,12 @@ bool sys_var_thd_binlog_format::is_readonly() const void fix_binlog_format_after_update(THD *thd, enum_var_type type) { - thd->reset_current_stmt_binlog_row_based(); + /* + @todo This function should be eliminated. We should not set the + current binlog format anywhere else than in decide_logging_format. + /Sven + */ + thd->reset_current_stmt_binlog_format_row(); } diff --git a/sql/set_var.h b/sql/set_var.h index 120388415f5..9b74dbce4ba 100644 --- a/sql/set_var.h +++ b/sql/set_var.h @@ -35,6 +35,7 @@ typedef struct my_locale_st MY_LOCALE; extern TYPELIB bool_typelib, delay_key_write_typelib, sql_mode_typelib, optimizer_switch_typelib, slave_exec_mode_typelib; +extern TYPELIB slave_type_conversions_typelib; typedef int (*sys_check_func)(THD *, set_var *); typedef bool (*sys_update_func)(THD *, set_var *); diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt index 03d7ff30f7e..7d12143d143 100644 --- a/sql/share/errmsg-utf8.txt +++ b/sql/share/errmsg-utf8.txt @@ -6070,8 +6070,7 @@ ER_SLAVE_INCIDENT ER_NO_PARTITION_FOR_GIVEN_VALUE_SILENT eng "Table has no partition for some existing values" ER_BINLOG_UNSAFE_STATEMENT - eng "Statement may not be safe to log in statement format." - swe "Detta är inte säkert att logga i statement-format." + eng "Unsafe statement binlogged in statement format since BINLOG_FORMAT = STATEMENT. 
Reason for unsafeness: %s" ER_SLAVE_FATAL_ERROR eng "Fatal error: %s" ER_SLAVE_RELAY_LOG_READ_FAILURE @@ -6260,3 +6259,43 @@ ER_FIELD_TYPE_NOT_ALLOWED_AS_PARTITION_FIELD eng "Field '%-.192s' is of a not allowed type for this type of partitioning" ER_PARTITION_FIELDS_TOO_LONG eng "The total length of the partitioning fields is too large" + +ER_BINLOG_ROW_ENGINE_AND_STMT_ENGINE + eng "Cannot execute statement: binlogging impossible since both row-incapable engines and statement-incapable engines are involved." +ER_BINLOG_ROW_MODE_AND_STMT_ENGINE + eng "Cannot execute statement: binlogging impossible since BINLOG_FORMAT = ROW and at least one table uses a storage engine limited to statement-logging." +ER_BINLOG_UNSAFE_AND_STMT_ENGINE + eng "Cannot execute statement: binlogging of unsafe statement is impossible when storage engine is limited to statement-logging and BINLOG_FORMAT = MIXED. Reason for unsafeness: %s" +ER_BINLOG_ROW_INJECTION_AND_STMT_ENGINE + eng "Cannot execute row injection: binlogging impossible since at least one table uses a storage engine limited to statement-logging." +ER_BINLOG_STMT_MODE_AND_ROW_ENGINE + eng "Cannot execute statement: binlogging impossible since BINLOG_FORMAT = STATEMENT and at least one table uses a storage engine limited to row-logging.%s" +ER_BINLOG_ROW_INJECTION_AND_STMT_MODE + eng "Cannot execute row injection: binlogging impossible since BINLOG_FORMAT = STATEMENT." +ER_BINLOG_MULTIPLE_ENGINES_AND_SELF_LOGGING_ENGINE + eng "Cannot execute statement: binlogging impossible since more than one engine is involved and at least one engine is self-logging." + +ER_BINLOG_UNSAFE_LIMIT + eng "Statement uses a LIMIT clause. This is unsafe because the set of rows included cannot be predicted." +ER_BINLOG_UNSAFE_INSERT_DELAYED + eng "Statement uses INSERT DELAYED. This is unsafe because the time when rows are inserted cannot be predicted." +ER_BINLOG_UNSAFE_SYSTEM_TABLE + eng "Statement uses the general_log or slow_log table. 
This is unsafe because system tables may differ on slave." +ER_BINLOG_UNSAFE_TWO_AUTOINC_COLUMNS + eng "Statement updates two AUTO_INCREMENT columns. This is unsafe because the generated value cannot be predicted by slave." +ER_BINLOG_UNSAFE_UDF + eng "Statement uses a UDF. It cannot be determined if the UDF will return the same value on slave." +ER_BINLOG_UNSAFE_SYSTEM_VARIABLE + eng "Statement uses a system variable whose value may differ on slave." +ER_BINLOG_UNSAFE_SYSTEM_FUNCTION + eng "Statement uses a system function whose value may differ on slave." +ER_BINLOG_UNSAFE_NONTRANS_AFTER_TRANS + eng "Non-transactional reads or writes are unsafe if they occur after transactional reads or writes inside a transaction." + +ER_MESSAGE_AND_STATEMENT + eng "%s Statement: %s" + +ER_SLAVE_CONVERSION_FAILED + eng "Column %d of table '%-.192s.%-.192s' cannot be converted from type '%-.32s' to type '%-.32s'" +ER_SLAVE_CANT_CREATE_CONVERSION + eng "Can't create conversion table for table '%-.192s.%-.192s'" diff --git a/sql/share/errmsg.txt b/sql/share/errmsg.txt index ab81c122497..b12141cd30f 100644 --- a/sql/share/errmsg.txt +++ b/sql/share/errmsg.txt @@ -6071,8 +6071,7 @@ ER_SLAVE_INCIDENT ER_NO_PARTITION_FOR_GIVEN_VALUE_SILENT eng "Table has no partition for some existing values" ER_BINLOG_UNSAFE_STATEMENT - eng "Statement may not be safe to log in statement format." - swe "Detta är inte säkert att logga i statement-format." + eng "Unsafe statement binlogged in statement format since BINLOG_FORMAT = STATEMENT. Reason for unsafeness: %s" ER_SLAVE_FATAL_ERROR eng "Fatal error: %s" ER_SLAVE_RELAY_LOG_READ_FAILURE @@ -6202,6 +6201,42 @@ ER_TOO_MANY_CONCURRENT_TRXS WARN_NON_ASCII_SEPARATOR_NOT_IMPLEMENTED eng "Non-ASCII separator arguments are not fully supported" +ER_BINLOG_ROW_ENGINE_AND_STMT_ENGINE + eng "Cannot execute statement: binlogging impossible since both row-incapable engines and statement-incapable engines are involved." 
+ER_BINLOG_ROW_MODE_AND_STMT_ENGINE + eng "Cannot execute statement: binlogging impossible since BINLOG_FORMAT = ROW and at least one table uses a storage engine limited to statement-logging." +ER_BINLOG_UNSAFE_AND_STMT_ENGINE + eng "Cannot execute statement: binlogging of unsafe statement is impossible when storage engine is limited to statement-logging and BINLOG_FORMAT = MIXED. Reason for unsafeness: %s" +ER_BINLOG_ROW_INJECTION_AND_STMT_ENGINE + eng "Cannot execute row injection: binlogging impossible since at least one table uses a storage engine limited to statement-logging." +ER_BINLOG_STMT_MODE_AND_ROW_ENGINE + eng "Cannot execute statement: binlogging impossible since BINLOG_FORMAT = STATEMENT and at least one table uses a storage engine limited to row-logging.%s" +ER_BINLOG_ROW_INJECTION_AND_STMT_MODE + eng "Cannot execute row injection: binlogging impossible since BINLOG_FORMAT = STATEMENT." +ER_BINLOG_MULTIPLE_ENGINES_AND_SELF_LOGGING_ENGINE + eng "Cannot execute statement: binlogging impossible since more than one engine is involved and at least one engine is self-logging." +ER_BINLOG_UNSAFE_LIMIT + eng "Statement uses a LIMIT clause. This is unsafe because the set of rows included cannot be predicted." +ER_BINLOG_UNSAFE_INSERT_DELAYED + eng "Statement uses INSERT DELAYED. This is unsafe because the time when rows are inserted cannot be predicted." +ER_BINLOG_UNSAFE_SYSTEM_TABLE + eng "Statement uses the general_log or slow_log table. This is unsafe because system tables may differ on slave." +ER_BINLOG_UNSAFE_TWO_AUTOINC_COLUMNS + eng "Statement updates two AUTO_INCREMENT columns. This is unsafe because the generated value cannot be predicted by slave." +ER_BINLOG_UNSAFE_UDF + eng "Statement uses a UDF. It cannot be determined if the UDF will return the same value on slave." +ER_BINLOG_UNSAFE_SYSTEM_VARIABLE + eng "Statement uses a system variable whose value may differ on slave." 
+ER_BINLOG_UNSAFE_SYSTEM_FUNCTION + eng "Statement uses a system function whose value may differ on slave." +ER_BINLOG_UNSAFE_NONTRANS_AFTER_TRANS + eng "Non-transactional reads or writes are unsafe if they occur after transactional reads or writes inside a transaction." + +ER_MESSAGE_AND_STATEMENT + eng "%s Statement: %s" + +ER_SLAVE_IGNORE_SERVER_IDS + eng "The requested server id %d clashes with the slave startup option --replicate-same-server-id" ER_DEBUG_SYNC_TIMEOUT eng "debug sync point wait timed out" ger "Debug Sync Point Wartezeit überschritten" @@ -6260,3 +6295,7 @@ ER_FIELD_TYPE_NOT_ALLOWED_AS_PARTITION_FIELD eng "Field '%-.192s' is of a not allowed type for this type of partitioning" ER_PARTITION_FIELDS_TOO_LONG eng "The total length of the partitioning fields is too large" +ER_SLAVE_CONVERSION_FAILED + eng "Column %d of table '%-.192s.%-.192s' cannot be converted from type '%-.32s' to type '%-.32s'" +ER_SLAVE_CANT_CREATE_CONVERSION + eng "Can't create conversion table for table '%-.192s.%-.192s'" diff --git a/sql/slave.cc b/sql/slave.cc index 1cfd54a7dc0..377ea3ecea9 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -464,6 +464,7 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) DBUG_RETURN(0); /* successfully do nothing */ int error,force_all = (thread_mask & SLAVE_FORCE_ALL); pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock; + pthread_mutex_t *log_lock= mi->rli.relay_log.get_log_lock(); if (thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)) { @@ -475,6 +476,22 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) skip_lock)) && !force_all) DBUG_RETURN(error); + + pthread_mutex_lock(log_lock); + + DBUG_PRINT("info",("Flushing relay log and master info file.")); + if (current_thd) + thd_proc_info(current_thd, "Flushing relay log and master info files."); + if (flush_master_info(mi, TRUE /* flush relay log */)) + DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS); + + if 
(my_sync(mi->rli.relay_log.get_log_file()->file, MYF(MY_WME))) + DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS); + + if (my_sync(mi->fd, MYF(MY_WME))) + DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS); + + pthread_mutex_unlock(log_lock); } if (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) { @@ -486,8 +503,21 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) skip_lock)) && !force_all) DBUG_RETURN(error); + + pthread_mutex_lock(log_lock); + + DBUG_PRINT("info",("Flushing relay-log info file.")); + if (current_thd) + thd_proc_info(current_thd, "Flushing relay-log info file."); + if (flush_relay_log_info(&mi->rli)) + DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS); + + if (my_sync(mi->rli.info_fd, MYF(MY_WME))) + DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS); + + pthread_mutex_unlock(log_lock); } - DBUG_RETURN(0); + DBUG_RETURN(0); } @@ -1592,6 +1622,12 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi, pos= net_store_data(pos, (uchar*) report_user, report_user_len); pos= net_store_data(pos, (uchar*) report_password, report_password_len); int2store(pos, (uint16) report_port); pos+= 2; + /* + Fake rpl_recovery_rank, which was removed in BUG#13963, + so that this server can register itself on old servers, + see BUG#49259. + */ + int4store(pos, /* rpl_recovery_rank */ 0); pos+= 4; /* The master will fill in master_id */ int4store(pos, 0); pos+= 4; @@ -4205,8 +4241,9 @@ MYSQL *rpl_connect_master(MYSQL *mysql) rli Relay log information NOTES - - As this is only called by the slave thread, we don't need to - have a lock on this. + - As this is only called by the slave thread or on STOP SLAVE, with the + log_lock grabbed and the slave thread stopped, we don't need to have + a lock here. - If there is an active transaction, then we don't update the position in the relay log. This is to ensure that we re-execute statements if we die in the middle of an transaction that was rolled back. 
@@ -4257,7 +4294,10 @@ bool flush_relay_log_info(Relay_log_info* rli) error=1; rli->sync_counter= 0; } - /* Flushing the relay log is done by the slave I/O thread */ + /* + Flushing the relay log is done by the slave I/O thread + or by the user on STOP SLAVE. + */ DBUG_RETURN(error); } diff --git a/sql/sp.cc b/sql/sp.cc index 0ed4855e5d3..11ae2646f14 100644 --- a/sql/sp.cc +++ b/sql/sp.cc @@ -926,7 +926,7 @@ sp_create_routine(THD *thd, int type, sp_head *sp) row-based replication. The flag will be reset at the end of the statement. */ - thd->clear_current_stmt_binlog_row_based(); + thd->clear_current_stmt_binlog_format_row(); saved_count_cuted_fields= thd->count_cuted_fields; thd->count_cuted_fields= CHECK_FIELD_WARN; @@ -1118,9 +1118,9 @@ sp_create_routine(THD *thd, int type, sp_head *sp) /* restore sql_mode when binloging */ thd->variables.sql_mode= saved_mode; /* Such a statement can always go directly to binlog, no trans cache */ - if (thd->binlog_query(THD::MYSQL_QUERY_TYPE, + if (thd->binlog_query(THD::STMT_QUERY_TYPE, log_query.c_ptr(), log_query.length(), - FALSE, FALSE, 0)) + FALSE, FALSE, FALSE, 0)) ret= SP_INTERNAL_ERROR; thd->variables.sql_mode= 0; } @@ -1168,7 +1168,7 @@ sp_drop_routine(THD *thd, int type, sp_name *name) row-based replication. The flag will be reset at the end of the statement. */ - thd->clear_current_stmt_binlog_row_based(); + thd->clear_current_stmt_binlog_format_row(); if (!(table= open_proc_table_for_update(thd))) DBUG_RETURN(SP_OPEN_TABLE_FAILED); @@ -1223,7 +1223,7 @@ sp_update_routine(THD *thd, int type, sp_name *name, st_sp_chistics *chistics) row-based replication. The flag will be reset at the end of the statement. 
*/ - thd->clear_current_stmt_binlog_row_based(); + thd->clear_current_stmt_binlog_format_row(); if (!(table= open_proc_table_for_update(thd))) DBUG_RETURN(SP_OPEN_TABLE_FAILED); @@ -2025,6 +2025,8 @@ sp_cache_routines_and_add_tables_for_triggers(THD *thd, LEX *lex, { int ret= 0; + DBUG_ENTER("sp_cache_routines_and_add_tables_for_triggers"); + Sroutine_hash_entry **last_cached_routine_ptr= (Sroutine_hash_entry **)lex->sroutines_list.next; @@ -2058,7 +2060,7 @@ sp_cache_routines_and_add_tables_for_triggers(THD *thd, LEX *lex, ret= sp_cache_routines_and_add_tables_aux(thd, lex, *last_cached_routine_ptr, FALSE); - return ret; + DBUG_RETURN(ret); } diff --git a/sql/sp_head.cc b/sql/sp_head.cc index 3f7d812384c..8faa65a1cf3 100644 --- a/sql/sp_head.cc +++ b/sql/sp_head.cc @@ -510,7 +510,7 @@ sp_head::operator delete(void *ptr, size_t size) throw() sp_head::sp_head() :Query_arena(&main_mem_root, INITIALIZED_FOR_SP), - m_flags(0), m_recursion_level(0), m_next_cached_sp(0), + m_flags(0), unsafe_flags(0), m_recursion_level(0), m_next_cached_sp(0), m_cont_level(0) { const LEX_STRING str_reset= { NULL, 0 }; @@ -1707,7 +1707,7 @@ sp_head::execute_function(THD *thd, Item **argp, uint argcount, each substatement be binlogged its way. 
*/ need_binlog_call= mysql_bin_log.is_open() && - (thd->options & OPTION_BIN_LOG) && !thd->current_stmt_binlog_row_based; + (thd->options & OPTION_BIN_LOG) && !thd->is_current_stmt_binlog_format_row(); /* Remember the original arguments for unrolled replication of functions @@ -1796,7 +1796,7 @@ sp_head::execute_function(THD *thd, Item **argp, uint argcount, { int errcode = query_error_code(thd, thd->killed == THD::NOT_KILLED); Query_log_event qinfo(thd, binlog_buf.ptr(), binlog_buf.length(), - thd->binlog_evt_union.unioned_events_trans, FALSE, errcode); + thd->binlog_evt_union.unioned_events_trans, FALSE, FALSE, errcode); if (mysql_bin_log.write(&qinfo) && thd->binlog_evt_union.unioned_events_trans) { @@ -2144,13 +2144,10 @@ sp_head::restore_lex(THD *thd) oldlex->trg_table_fields.push_back(&sublex->trg_table_fields); - /* - If this substatement needs row-based, the entire routine does too (we - cannot switch from statement-based to row-based only for this - substatement). - */ - if (sublex->is_stmt_unsafe()) - m_flags|= BINLOG_ROW_BASED_IF_MIXED; + /* If this substatement is unsafe, the entire routine is too. */ + DBUG_PRINT("info", ("lex->get_stmt_unsafe_flags: 0x%x", + thd->lex->get_stmt_unsafe_flags())); + unsafe_flags|= sublex->get_stmt_unsafe_flags(); /* Add routines which are used by statement to respective set for diff --git a/sql/sp_head.h b/sql/sp_head.h index 00c96d44f70..138d0a2f506 100644 --- a/sql/sp_head.h +++ b/sql/sp_head.h @@ -165,9 +165,8 @@ public: HAS_COMMIT_OR_ROLLBACK= 128, LOG_SLOW_STATEMENTS= 256, // Used by events LOG_GENERAL_LOG= 512, // Used by events - BINLOG_ROW_BASED_IF_MIXED= 1024, - HAS_SQLCOM_RESET= 2048, - HAS_SQLCOM_FLUSH= 4096 + HAS_SQLCOM_RESET= 1024, + HAS_SQLCOM_FLUSH= 2048 }; /** TYPE_ENUM_FUNCTION, TYPE_ENUM_PROCEDURE or TYPE_ENUM_TRIGGER */ @@ -198,6 +197,11 @@ public: private: Stored_program_creation_ctx *m_creation_ctx; + /** + Boolean combination of (1<<flag), where flag is a member of + LEX::enum_binlog_stmt_unsafe. 
+ */ + uint32 unsafe_flags; public: inline Stored_program_creation_ctx *get_creation_ctx() @@ -453,20 +457,24 @@ public: #endif /* - This method is intended for attributes of a routine which need - to propagate upwards to the LEX of the caller (when a property of a - sp_head needs to "taint" the caller). + This method is intended for attributes of a routine which need to + propagate upwards to the LEX of the caller. */ void propagate_attributes(LEX *lex) { + DBUG_ENTER("sp_head::propagate_attributes"); /* If this routine needs row-based binary logging, the entire top statement too (we cannot switch from statement-based to row-based only for this routine, as in statement-based the top-statement may be binlogged and the substatements not). */ - if (m_flags & BINLOG_ROW_BASED_IF_MIXED) - lex->set_stmt_unsafe(); + DBUG_PRINT("info", ("lex->get_stmt_unsafe_flags(): 0x%x", + lex->get_stmt_unsafe_flags())); + DBUG_PRINT("info", ("sp_head(0x%p=%s)->unsafe_flags: 0x%x", + this, name(), unsafe_flags)); + lex->set_stmt_unsafe_flags(unsafe_flags); + DBUG_VOID_RETURN; } diff --git a/sql/sql_acl.cc b/sql/sql_acl.cc index 05e8cee9151..885072ab32c 100644 --- a/sql/sql_acl.cc +++ b/sql/sql_acl.cc @@ -1655,8 +1655,8 @@ bool change_password(THD *thd, const char *host, const char *user, acl_user->host.hostname ? acl_user->host.hostname : "", new_password)); thd->clear_error(); - result= thd->binlog_query(THD::MYSQL_QUERY_TYPE, buff, query_length, - FALSE, FALSE, 0); + result= thd->binlog_query(THD::STMT_QUERY_TYPE, buff, query_length, + FALSE, FALSE, FALSE, 0); } end: close_thread_tables(thd); @@ -3130,7 +3130,7 @@ int mysql_table_grant(THD *thd, TABLE_LIST *table_list, row-based replication. The flag will be reset at the end of the statement. */ - thd->clear_current_stmt_binlog_row_based(); + thd->clear_current_stmt_binlog_format_row(); #ifdef HAVE_REPLICATION /* @@ -3347,7 +3347,7 @@ bool mysql_routine_grant(THD *thd, TABLE_LIST *table_list, bool is_proc, row-based replication. 
The flag will be reset at the end of the statement. */ - thd->clear_current_stmt_binlog_row_based(); + thd->clear_current_stmt_binlog_format_row(); #ifdef HAVE_REPLICATION /* @@ -3487,7 +3487,7 @@ bool mysql_grant(THD *thd, const char *db, List <LEX_USER> &list, row-based replication. The flag will be reset at the end of the statement. */ - thd->clear_current_stmt_binlog_row_based(); + thd->clear_current_stmt_binlog_format_row(); #ifdef HAVE_REPLICATION /* @@ -5840,7 +5840,7 @@ bool mysql_create_user(THD *thd, List <LEX_USER> &list) row-based replication. The flag will be reset at the end of the statement. */ - thd->clear_current_stmt_binlog_row_based(); + thd->clear_current_stmt_binlog_format_row(); /* CREATE USER may be skipped on replication client. */ if ((result= open_grant_tables(thd, tables))) @@ -5920,7 +5920,7 @@ bool mysql_drop_user(THD *thd, List <LEX_USER> &list) row-based replication. The flag will be reset at the end of the statement. */ - thd->clear_current_stmt_binlog_row_based(); + thd->clear_current_stmt_binlog_format_row(); /* DROP USER may be skipped on replication client. */ if ((result= open_grant_tables(thd, tables))) @@ -5994,7 +5994,7 @@ bool mysql_rename_user(THD *thd, List <LEX_USER> &list) row-based replication. The flag will be reset at the end of the statement. */ - thd->clear_current_stmt_binlog_row_based(); + thd->clear_current_stmt_binlog_format_row(); /* RENAME USER may be skipped on replication client. */ if ((result= open_grant_tables(thd, tables))) @@ -6076,7 +6076,7 @@ bool mysql_revoke_all(THD *thd, List <LEX_USER> &list) row-based replication. The flag will be reset at the end of the statement. */ - thd->clear_current_stmt_binlog_row_based(); + thd->clear_current_stmt_binlog_format_row(); if ((result= open_grant_tables(thd, tables))) DBUG_RETURN(result != 1); @@ -6339,7 +6339,7 @@ bool sp_revoke_privileges(THD *thd, const char *sp_db, const char *sp_name, row-based replication. 
The flag will be reset at the end of the statement. */ - thd->clear_current_stmt_binlog_row_based(); + thd->clear_current_stmt_binlog_format_row(); /* Remove procedure access */ do diff --git a/sql/sql_base.cc b/sql/sql_base.cc index 9a778e75a11..a3845ee53e9 100644 --- a/sql/sql_base.cc +++ b/sql/sql_base.cc @@ -31,7 +31,6 @@ #include <io.h> #endif -#define FLAGSTR(S,F) ((S) & (F) ? #F " " : "") /** This internal handler is used to trap internally @@ -1452,7 +1451,7 @@ void close_temporary_tables(THD *thd) return; if (!mysql_bin_log.is_open() || - (thd->current_stmt_binlog_row_based && thd->variables.binlog_format == BINLOG_FORMAT_ROW)) + (thd->is_current_stmt_binlog_format_row() && thd->variables.binlog_format == BINLOG_FORMAT_ROW)) { TABLE *tmp_next; for (table= thd->temporary_tables; table; table= tmp_next) @@ -1554,7 +1553,7 @@ void close_temporary_tables(THD *thd) thd->variables.character_set_client= system_charset_info; Query_log_event qinfo(thd, s_query.ptr(), s_query.length() - 1 /* to remove trailing ',' */, - 0, FALSE, 0); + FALSE, TRUE, FALSE, 0); qinfo.db= db.ptr(); qinfo.db_len= db.length(); thd->variables.character_set_client= cs_save; @@ -4069,7 +4068,7 @@ retry: int errcode= query_error_code(thd, TRUE); if (thd->binlog_query(THD::STMT_QUERY_TYPE, query, (ulong)(end-query), - FALSE, FALSE, errcode)) + FALSE, FALSE, FALSE, errcode)) { my_free(query, MYF(0)); goto err; @@ -4885,7 +4884,7 @@ static bool check_lock_and_start_stmt(THD *thd, TABLE *table, There may be more differences between open_n_lock_single_table() and open_ltable(). One known difference is that open_ltable() does - neither call decide_logging_format() nor handle some other logging + neither call thd->decide_logging_format() nor handle some other logging and locking issues because it does not call lock_tables(). */ @@ -5105,165 +5104,6 @@ static void mark_real_tables_as_free_for_reuse(TABLE_LIST *table) } -/** - Decide on logging format to use for the statement. 
- - Compute the capabilities vector for the involved storage engines - and mask out the flags for the binary log. Right now, the binlog - flags only include the capabilities of the storage engines, so this - is safe. - - We now have three alternatives that prevent the statement from - being loggable: - - 1. If there are no capabilities left (all flags are clear) it is - not possible to log the statement at all, so we roll back the - statement and report an error. - - 2. Statement mode is set, but the capabilities indicate that - statement format is not possible. - - 3. Row mode is set, but the capabilities indicate that row - format is not possible. - - 4. Statement is unsafe, but the capabilities indicate that row - format is not possible. - - If we are in MIXED mode, we then decide what logging format to use: - - 1. If the statement is unsafe, row-based logging is used. - - 2. If statement-based logging is not possible, row-based logging is - used. - - 3. Otherwise, statement-based logging is used. - - @param thd Client thread - @param tables Tables involved in the query - */ - -int decide_logging_format(THD *thd, TABLE_LIST *tables) -{ - /* - In SBR mode, we are only proceeding if we are binlogging this - statement, ie, the filtering rules won't later filter this out. - - This check here is needed to prevent some spurious error to be - raised in some cases (See BUG#42829). - */ - if (mysql_bin_log.is_open() && (thd->options & OPTION_BIN_LOG) && - (thd->variables.binlog_format != BINLOG_FORMAT_STMT || - binlog_filter->db_ok(thd->db))) - { - /* - Compute the starting vectors for the computations by creating a - set with all the capabilities bits set and one with no - capabilities bits set. 
- */ - handler::Table_flags flags_some_set= 0; - handler::Table_flags flags_all_set= - HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE; - - my_bool multi_engine= FALSE; - void* prev_ht= NULL; - for (TABLE_LIST *table= tables; table; table= table->next_global) - { - if (table->placeholder()) - continue; - if (table->table->s->table_category == TABLE_CATEGORY_PERFORMANCE) - thd->lex->set_stmt_unsafe(); - if (table->lock_type >= TL_WRITE_ALLOW_WRITE) - { - ulonglong const flags= table->table->file->ha_table_flags(); - DBUG_PRINT("info", ("table: %s; ha_table_flags: %s%s", - table->table_name, - FLAGSTR(flags, HA_BINLOG_STMT_CAPABLE), - FLAGSTR(flags, HA_BINLOG_ROW_CAPABLE))); - if (prev_ht && prev_ht != table->table->file->ht) - multi_engine= TRUE; - prev_ht= table->table->file->ht; - flags_all_set &= flags; - flags_some_set |= flags; - } - } - - DBUG_PRINT("info", ("flags_all_set: %s%s", - FLAGSTR(flags_all_set, HA_BINLOG_STMT_CAPABLE), - FLAGSTR(flags_all_set, HA_BINLOG_ROW_CAPABLE))); - DBUG_PRINT("info", ("flags_some_set: %s%s", - FLAGSTR(flags_some_set, HA_BINLOG_STMT_CAPABLE), - FLAGSTR(flags_some_set, HA_BINLOG_ROW_CAPABLE))); - DBUG_PRINT("info", ("thd->variables.binlog_format: %ld", - thd->variables.binlog_format)); - DBUG_PRINT("info", ("multi_engine: %s", - multi_engine ? 
"TRUE" : "FALSE")); - - int error= 0; - if (flags_all_set == 0) - { - my_error((error= ER_BINLOG_LOGGING_IMPOSSIBLE), MYF(0), - "Statement cannot be logged to the binary log in" - " row-based nor statement-based format"); - } - else if (thd->variables.binlog_format == BINLOG_FORMAT_STMT && - (flags_all_set & HA_BINLOG_STMT_CAPABLE) == 0) - { - my_error((error= ER_BINLOG_LOGGING_IMPOSSIBLE), MYF(0), - "Statement-based format required for this statement," - " but not allowed by this combination of engines"); - } - else if ((thd->variables.binlog_format == BINLOG_FORMAT_ROW || - thd->lex->is_stmt_unsafe()) && - (flags_all_set & HA_BINLOG_ROW_CAPABLE) == 0) - { - my_error((error= ER_BINLOG_LOGGING_IMPOSSIBLE), MYF(0), - "Row-based format required for this statement," - " but not allowed by this combination of engines"); - } - - /* - If more than one engine is involved in the statement and at - least one is doing it's own logging (is *self-logging*), the - statement cannot be logged atomically, so we generate an error - rather than allowing the binlog to become corrupt. - */ - if (multi_engine && - (flags_some_set & HA_HAS_OWN_BINLOGGING)) - { - error= ER_BINLOG_LOGGING_IMPOSSIBLE; - my_error(error, MYF(0), - "Statement cannot be written atomically since more" - " than one engine involved and at least one engine" - " is self-logging"); - } - - DBUG_PRINT("info", ("error: %d", error)); - - if (error) - return -1; - - /* - We switch to row-based format if we are in mixed mode and one of - the following are true: - - 1. If the statement is unsafe - 2. If statement format cannot be used - - Observe that point to cannot be decided before the tables - involved in a statement has been checked, i.e., we cannot put - this code in reset_current_stmt_binlog_row_based(), it has to be - here. 
- */ - if (thd->lex->is_stmt_unsafe() || - (flags_all_set & HA_BINLOG_STMT_CAPABLE) == 0) - { - thd->set_current_stmt_binlog_row_based_if_mixed(); - } - } - - return 0; -} - /* Lock all tables in list @@ -5305,7 +5145,7 @@ int lock_tables(THD *thd, TABLE_LIST *tables, uint count, bool *need_reopen) *need_reopen= FALSE; if (!tables && !thd->lex->requires_prelocking()) - DBUG_RETURN(decide_logging_format(thd, tables)); + DBUG_RETURN(thd->decide_logging_format(tables)); /* We need this extra check for thd->prelocked_mode because we want to avoid @@ -5344,7 +5184,8 @@ int lock_tables(THD *thd, TABLE_LIST *tables, uint count, bool *need_reopen) if (thd->variables.binlog_format != BINLOG_FORMAT_ROW && tables && has_write_table_with_auto_increment(thd->lex->first_not_own_table())) { - thd->lex->set_stmt_unsafe(); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_TWO_AUTOINC_COLUMNS); + thd->set_current_stmt_binlog_format_row_if_mixed(); } } @@ -5467,7 +5308,7 @@ int lock_tables(THD *thd, TABLE_LIST *tables, uint count, bool *need_reopen) } } - DBUG_RETURN(decide_logging_format(thd, tables)); + DBUG_RETURN(thd->decide_logging_format(tables)); } diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 019c22d9dd2..d0cfa1d7614 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -39,6 +39,7 @@ #include <io.h> #endif #include <mysys_err.h> +#include <limits.h> #include "sp_rcontext.h" #include "sp_cache.h" @@ -449,7 +450,7 @@ THD::THD() lock_id(&main_lock_id), user_time(0), in_sub_stmt(0), sql_log_bin_toplevel(false), - binlog_table_maps(0), binlog_flags(0UL), + binlog_unsafe_warning_flags(0), binlog_table_maps(0), table_map_for_update(0), arg_of_last_insert_id_function(FALSE), first_successful_insert_id_in_prev_stmt(0), @@ -910,14 +911,15 @@ void THD::init(void) else options &= ~OPTION_BIG_SELECTS; - transaction.all.modified_non_trans_table= transaction.stmt.modified_non_trans_table= FALSE; + transaction.all.modified_non_trans_table= + 
transaction.stmt.modified_non_trans_table= FALSE; open_options=ha_open_options; update_lock_default= (variables.low_priority_updates ? TL_WRITE_LOW_PRIORITY : TL_WRITE); session_tx_isolation= (enum_tx_isolation) variables.tx_isolation; update_charset(); - reset_current_stmt_binlog_row_based(); + reset_current_stmt_binlog_format_row(); bzero((char *) &status_var, sizeof(status_var)); sql_log_bin_toplevel= options & OPTION_BIN_LOG; @@ -3176,13 +3178,13 @@ void THD::reset_sub_statement_state(Sub_statement_state *backup, first_successful_insert_id_in_cur_stmt; if ((!lex->requires_prelocking() || is_update_query(lex->sql_command)) && - !current_stmt_binlog_row_based) + !is_current_stmt_binlog_format_row()) { options&= ~OPTION_BIN_LOG; } if ((backup->options & OPTION_BIN_LOG) && is_update_query(lex->sql_command)&& - !current_stmt_binlog_row_based) + !is_current_stmt_binlog_format_row()) mysql_bin_log.start_union_events(this, this->query_id); /* Disable result sets */ @@ -3244,7 +3246,7 @@ void THD::restore_sub_statement_state(Sub_statement_state *backup) is_fatal_sub_stmt_error= FALSE; if ((options & OPTION_BIN_LOG) && is_update_query(lex->sql_command) && - !current_stmt_binlog_row_based) + !is_current_stmt_binlog_format_row()) mysql_bin_log.stop_union_events(this); /* @@ -3407,6 +3409,392 @@ void xid_cache_delete(XID_STATE *xid_state) pthread_mutex_unlock(&LOCK_xid_cache); } + +/** + Decide on logging format to use for the statement and issue errors + or warnings as needed. The decision depends on the following + parameters: + + - The logging mode, i.e., the value of binlog_format. Can be + statement, mixed, or row. + + - The type of statement. There are three types of statements: + "normal" safe statements; unsafe statements; and row injections. + An unsafe statement is one that, if logged in statement format, + might produce different results when replayed on the slave (e.g., + INSERT DELAYED). 
A row injection is either a BINLOG statement, or + a row event executed by the slave's SQL thread. + + - The capabilities of tables modified by the statement. The + *capabilities vector* for a table is a set of flags associated + with the table. Currently, it only includes two flags: *row + capability flag* and *statement capability flag*. + + The row capability flag is set if and only if the engine can + handle row-based logging. The statement capability flag is set if + and only if the table can handle statement-based logging. + + Decision table for logging format + --------------------------------- + + The following table summarizes how the format and generated + warning/error depends on the tables' capabilities, the statement + type, and the current binlog_format. + + Row capable N NNNNNNNNN YYYYYYYYY YYYYYYYYY + Statement capable N YYYYYYYYY NNNNNNNNN YYYYYYYYY + + Statement type * SSSUUUIII SSSUUUIII SSSUUUIII + + binlog_format * SMRSMRSMR SMRSMRSMR SMRSMRSMR + + Logged format - SS-S----- -RR-RR-RR SRRSRR-RR + Warning/Error 1 --2732444 5--5--6-- ---7--6-- + + Legend + ------ + + Row capable: N - Some table not row-capable, Y - All tables row-capable + Stmt capable: N - Some table not stmt-capable, Y - All tables stmt-capable + Statement type: (S)afe, (U)nsafe, or Row (I)njection + binlog_format: (S)TATEMENT, (M)IXED, or (R)OW + Logged format: (S)tatement or (R)ow + Warning/Error: Warnings and error messages are as follows: + + 1. Error: Cannot execute statement: binlogging impossible since both + row-incapable engines and statement-incapable engines are + involved. + + 2. Error: Cannot execute statement: binlogging impossible since + BINLOG_FORMAT = ROW and at least one table uses a storage engine + limited to statement-logging. + + 3. Error: Cannot execute statement: binlogging of unsafe statement + is impossible when storage engine is limited to statement-logging + and BINLOG_FORMAT = MIXED. + + 4. 
Error: Cannot execute row injection: binlogging impossible since + at least one table uses a storage engine limited to + statement-logging. + + 5. Error: Cannot execute statement: binlogging impossible since + BINLOG_FORMAT = STATEMENT and at least one table uses a storage + engine limited to row-logging. + + 6. Error: Cannot execute row injection: binlogging impossible since + BINLOG_FORMAT = STATEMENT. + + 7. Warning: Unsafe statement binlogged in statement format since + BINLOG_FORMAT = STATEMENT. + + In addition, we can produce the following error (not depending on + the variables of the decision diagram): + + 8. Error: Cannot execute statement: binlogging impossible since more + than one engine is involved and at least one engine is + self-logging. + + For each error case above, the statement is prevented from being + logged, we report an error, and roll back the statement. For + warnings, we set the thd->binlog_unsafe_warning_flags variable: the warning will be + printed only if the statement is successfully logged. + + @see THD::binlog_query + + @param[in] thd Client thread + @param[in] tables Tables involved in the query + + @retval 0 No error; statement can be logged. + @retval -1 One of the error conditions above applies (1, 2, 3, 4, 5, 6, or 8). +*/ + +int THD::decide_logging_format(TABLE_LIST *tables) +{ + DBUG_ENTER("THD::decide_logging_format"); + DBUG_PRINT("info", ("query: %s", query())); + DBUG_PRINT("info", ("variables.binlog_format: %ld", + variables.binlog_format)); + DBUG_PRINT("info", ("lex->get_stmt_unsafe_flags(): 0x%x", + lex->get_stmt_unsafe_flags())); + + /* + We should not decide logging format if the binlog is closed or + binlogging is off, or if the statement is filtered out from the + binlog by filtering rules.
+ */ + if (mysql_bin_log.is_open() && (options & OPTION_BIN_LOG) && + !(variables.binlog_format == BINLOG_FORMAT_STMT && + !binlog_filter->db_ok(db))) + { + /* + Compute one bit field with the union of all the engine + capabilities, and one with the intersection of all the engine + capabilities. + */ + handler::Table_flags flags_some_set= 0; + handler::Table_flags flags_all_set= + HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE; + + my_bool multi_engine= FALSE; + my_bool mixed_engine= FALSE; + my_bool all_trans_engines= TRUE; + TABLE* prev_write_table= NULL; + TABLE* prev_access_table= NULL; + +#ifndef DBUG_OFF + { + static const char *prelocked_mode_name[] = { + "NON_PRELOCKED", + "PRELOCKED", + "PRELOCKED_UNDER_LOCK_TABLES", + }; + DBUG_PRINT("debug", ("prelocked_mode: %s", + prelocked_mode_name[prelocked_mode])); + } +#endif + + /* + Get the capabilities vector for all involved storage engines and + mask out the flags for the binary log. + */ + for (TABLE_LIST *table= tables; table; table= table->next_global) + { + if (table->placeholder()) + continue; + if (table->table->s->table_category == TABLE_CATEGORY_PERFORMANCE) + lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_TABLE); + if (table->lock_type >= TL_WRITE_ALLOW_WRITE) + { + handler::Table_flags const flags= table->table->file->ha_table_flags(); + DBUG_PRINT("info", ("table: %s; ha_table_flags: 0x%llx", + table->table_name, flags)); + if (prev_write_table && prev_write_table->file->ht != + table->table->file->ht) + multi_engine= TRUE; + all_trans_engines= all_trans_engines && + table->table->file->has_transactions(); + prev_write_table= table->table; + flags_all_set &= flags; + flags_some_set |= flags; + } + if (prev_access_table && prev_access_table->file->ht != table->table->file->ht) + mixed_engine= mixed_engine || (prev_access_table->file->has_transactions() != + table->table->file->has_transactions()); + prev_access_table= table->table; + } + + /* + Set the statement as unsafe if: + + . 
it is a mixed statement, i.e. it accesses both transactional and non-transactional + tables, and updates at least one; + or + . an earlier statement updated a transactional table; + . and, the current statement updates a non-transactional table. + + Any mixed statement is classified as unsafe to ensure that mixed mode is + completely safe. Consider the following example to understand why we + decided to do this: + + Note that mixed statements such as + + 1: INSERT INTO myisam_t SELECT * FROM innodb_t; + + 2: INSERT INTO innodb_t SELECT * FROM myisam_t; + + are classified as unsafe to ensure that in mixed mode the execution is + completely safe and equivalent to the row mode. Consider the following + statements and sessions (connections) to understand the reason: + + con1: INSERT INTO innodb_t VALUES (1); + con1: INSERT INTO innodb_t VALUES (100); + + con1: BEGIN + con2: INSERT INTO myisam_t SELECT * FROM innodb_t; + con1: INSERT INTO innodb_t VALUES (200); + con1: COMMIT; + + The point is that the concurrent statements may be written into the binary log + in an order different from the order of execution. For example, + + BINARY LOG: + + con2: BEGIN; + con2: INSERT INTO myisam_t SELECT * FROM innodb_t; + con2: COMMIT; + con1: BEGIN + con1: INSERT INTO innodb_t VALUES (200); + con1: COMMIT; + + .... + + or + + BINARY LOG: + + con1: BEGIN + con1: INSERT INTO innodb_t VALUES (200); + con1: COMMIT; + con2: BEGIN; + con2: INSERT INTO myisam_t SELECT * FROM innodb_t; + con2: COMMIT; + + Clearly, this may become a problem in STMT mode, and setting the statement + as unsafe will cause rows to be written into the binary log in MIXED mode + and as such the problem will not arise. + + In STMT mode, although such a statement is classified as unsafe, i.e. + + INSERT INTO myisam_t SELECT * FROM innodb_t; + + there is not enough information to avoid writing it outside the boundaries + of a transaction.
This is not a problem if we are considering snapshot + isolation level but if we have pure repeatable read or serializable the + lock history on the slave will be different from the master. + */ + if (mixed_engine || + trans_has_updated_trans_table(this) && !all_trans_engines) + lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_NONTRANS_AFTER_TRANS); + + DBUG_PRINT("info", ("flags_all_set: 0x%llx", flags_all_set)); + DBUG_PRINT("info", ("flags_some_set: 0x%llx", flags_some_set)); + DBUG_PRINT("info", ("multi_engine: %d", multi_engine)); + + int error= 0; + int unsafe_flags; + + /* + If more than one engine is involved in the statement and at + least one is doing it's own logging (is *self-logging*), the + statement cannot be logged atomically, so we generate an error + rather than allowing the binlog to become corrupt. + */ + if (multi_engine && + (flags_some_set & HA_HAS_OWN_BINLOGGING)) + { + my_error((error= ER_BINLOG_MULTIPLE_ENGINES_AND_SELF_LOGGING_ENGINE), + MYF(0)); + } + + /* both statement-only and row-only engines involved */ + if ((flags_all_set & (HA_BINLOG_STMT_CAPABLE | HA_BINLOG_ROW_CAPABLE)) == 0) + { + /* + 1. Error: Binary logging impossible since both row-incapable + engines and statement-incapable engines are involved + */ + my_error((error= ER_BINLOG_ROW_ENGINE_AND_STMT_ENGINE), MYF(0)); + } + /* statement-only engines involved */ + else if ((flags_all_set & HA_BINLOG_ROW_CAPABLE) == 0) + { + if (lex->is_stmt_row_injection()) + { + /* + 4. Error: Cannot execute row injection since table uses + storage engine limited to statement-logging + */ + my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_ENGINE), MYF(0)); + } + else if (variables.binlog_format == BINLOG_FORMAT_ROW) + { + /* + 2. Error: Cannot modify table that uses a storage engine + limited to statement-logging when BINLOG_FORMAT = ROW + */ + my_error((error= ER_BINLOG_ROW_MODE_AND_STMT_ENGINE), MYF(0)); + } + else if ((unsafe_flags= lex->get_stmt_unsafe_flags()) != 0) + { + /* + 3. 
Error: Cannot execute statement: binlogging of unsafe + statement is impossible when storage engine is limited to + statement-logging and BINLOG_FORMAT = MIXED. + */ + for (int unsafe_type= 0; + unsafe_type < LEX::BINLOG_STMT_UNSAFE_COUNT; + unsafe_type++) + if (unsafe_flags & (1 << unsafe_type)) + my_error((error= ER_BINLOG_UNSAFE_AND_STMT_ENGINE), MYF(0), + ER(LEX::binlog_stmt_unsafe_errcode[unsafe_type])); + } + /* log in statement format! */ + } + /* no statement-only engines */ + else + { + /* binlog_format = STATEMENT */ + if (variables.binlog_format == BINLOG_FORMAT_STMT) + { + if (lex->is_stmt_row_injection()) + { + /* + 6. Error: Cannot execute row injection since + BINLOG_FORMAT = STATEMENT + */ + my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_MODE), MYF(0)); + } + else if ((flags_all_set & HA_BINLOG_STMT_CAPABLE) == 0) + { + /* + 5. Error: Cannot modify table that uses a storage engine + limited to row-logging when binlog_format = STATEMENT + */ + my_error((error= ER_BINLOG_STMT_MODE_AND_ROW_ENGINE), MYF(0), ""); + } + else if ((unsafe_flags= lex->get_stmt_unsafe_flags()) != 0) + { + /* + 7. Warning: Unsafe statement logged as statement due to + binlog_format = STATEMENT + */ + binlog_unsafe_warning_flags|= unsafe_flags; + DBUG_PRINT("info", ("Scheduling warning to be issued by " + "binlog_query: '%s'", + ER(ER_BINLOG_UNSAFE_STATEMENT))); + DBUG_PRINT("info", ("binlog_unsafe_warning_flags: 0x%x", + binlog_unsafe_warning_flags)); + } + /* log in statement format! */ + } + /* No statement-only engines and binlog_format != STATEMENT. + I.e., nothing prevents us from row logging if needed. */ + else + { + if (lex->is_stmt_unsafe() || lex->is_stmt_row_injection() + || (flags_all_set & HA_BINLOG_STMT_CAPABLE) == 0) + { + /* log in row format! 
*/ + set_current_stmt_binlog_format_row_if_mixed(); + } + } + } + + if (error) { + DBUG_PRINT("info", ("decision: no logging since an error was generated")); + DBUG_RETURN(-1); + } + DBUG_PRINT("info", ("decision: logging in %s format", + is_current_stmt_binlog_format_row() ? + "ROW" : "STATEMENT")); + } +#ifndef DBUG_OFF + else + DBUG_PRINT("info", ("decision: no logging since " + "mysql_bin_log.is_open() = %d " + "and (options & OPTION_BIN_LOG) = 0x%llx " + "and binlog_format = %ld " + "and binlog_filter->db_ok(db) = %d", + mysql_bin_log.is_open(), + (options & OPTION_BIN_LOG), + variables.binlog_format, + binlog_filter->db_ok(db))); +#endif + + DBUG_RETURN(0); +} + + /* Implementation of interface to write rows to the binary log through the thread. The thread is responsible for writing the rows it has @@ -3458,7 +3846,7 @@ THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id, if (binlog_setup_trx_data()) DBUG_RETURN(NULL); - Rows_log_event* pending= binlog_get_pending_rows_event(); + Rows_log_event* pending= binlog_get_pending_rows_event(is_transactional); if (unlikely(pending && !pending->is_valid())) DBUG_RETURN(NULL); @@ -3492,7 +3880,9 @@ THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id, flush the pending event and replace it with the newly created event... */ - if (unlikely(mysql_bin_log.flush_and_set_pending_rows_event(this, ev))) + if (unlikely( + mysql_bin_log.flush_and_set_pending_rows_event(this, ev, + is_transactional))) { delete ev; DBUG_RETURN(NULL); @@ -3715,7 +4105,7 @@ int THD::binlog_write_row(TABLE* table, bool is_trans, MY_BITMAP const* cols, size_t colcnt, uchar const *record) { - DBUG_ASSERT(current_stmt_binlog_row_based && mysql_bin_log.is_open()); + DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); /* Pack records into format for transfer. 
We are allocating more @@ -3745,7 +4135,7 @@ int THD::binlog_update_row(TABLE* table, bool is_trans, const uchar *before_record, const uchar *after_record) { - DBUG_ASSERT(current_stmt_binlog_row_based && mysql_bin_log.is_open()); + DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); size_t const before_maxlen = max_row_length(table, before_record); size_t const after_maxlen = max_row_length(table, after_record); @@ -3790,7 +4180,7 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans, MY_BITMAP const* cols, size_t colcnt, uchar const *record) { - DBUG_ASSERT(current_stmt_binlog_row_based && mysql_bin_log.is_open()); + DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); /* Pack records into format for transfer. We are allocating more @@ -3816,14 +4206,15 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans, } -int THD::binlog_remove_pending_rows_event(bool clear_maps) +int THD::binlog_remove_pending_rows_event(bool clear_maps, + bool is_transactional) { DBUG_ENTER("THD::binlog_remove_pending_rows_event"); if (!mysql_bin_log.is_open()) DBUG_RETURN(0); - mysql_bin_log.remove_pending_rows_event(this); + mysql_bin_log.remove_pending_rows_event(this, is_transactional); if (clear_maps) binlog_table_maps= 0; @@ -3831,7 +4222,7 @@ int THD::binlog_remove_pending_rows_event(bool clear_maps) DBUG_RETURN(0); } -int THD::binlog_flush_pending_rows_event(bool stmt_end) +int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional) { DBUG_ENTER("THD::binlog_flush_pending_rows_event"); /* @@ -3847,7 +4238,7 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end) flag is set. 
*/ int error= 0; - if (Rows_log_event *pending= binlog_get_pending_rows_event()) + if (Rows_log_event *pending= binlog_get_pending_rows_event(is_transactional)) { if (stmt_end) { @@ -3856,7 +4247,8 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end) binlog_table_maps= 0; } - error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0); + error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0, + is_transactional); } DBUG_RETURN(error); @@ -3872,8 +4264,6 @@ show_query_type(THD::enum_binlog_query_type qtype) return "ROW"; case THD::STMT_QUERY_TYPE: return "STMT"; - case THD::MYSQL_QUERY_TYPE: - return "MYSQL"; case THD::QUERY_TYPE_COUNT: default: DBUG_ASSERT(0 <= qtype && qtype < THD::QUERY_TYPE_COUNT); @@ -3885,32 +4275,97 @@ show_query_type(THD::enum_binlog_query_type qtype) #endif -/* - Member function that will log query, either row-based or - statement-based depending on the value of the 'current_stmt_binlog_row_based' - the value of the 'qtype' flag. +/** + Auxiliary method used by @c binlog_query() to raise warnings. - This function should be called after the all calls to ha_*_row() - functions have been issued, but before tables are unlocked and - closed. + The type of warning and the type of unsafeness is stored in + THD::binlog_unsafe_warning_flags. +*/ +void THD::issue_unsafe_warnings() +{ + DBUG_ENTER("issue_unsafe_warnings"); + /* + Ensure that binlog_unsafe_warning_flags is big enough to hold all + bits. This is actually a constant expression. + */ + DBUG_ASSERT(2 * LEX::BINLOG_STMT_UNSAFE_COUNT <= + sizeof(binlog_unsafe_warning_flags) * CHAR_BIT); - OBSERVE - There shall be no writes to any system table after calling - binlog_query(), so these writes has to be moved to before the call - of binlog_query() for correct functioning. + uint32 unsafe_type_flags= binlog_unsafe_warning_flags; - This is necessesary not only for RBR, but the master might crash - after binlogging the query but before changing the system tables. 
- This means that the slave and the master are not in the same state - (after the master has restarted), so therefore we have to - eliminate this problem. + /* + Clear: (1) bits above BINLOG_STMT_UNSAFE_COUNT; (2) bits for + warnings that have been printed already. + */ + unsafe_type_flags &= (LEX::BINLOG_STMT_UNSAFE_ALL_FLAGS ^ + (unsafe_type_flags >> LEX::BINLOG_STMT_UNSAFE_COUNT)); + /* If all warnings have been printed already, return. */ + if (unsafe_type_flags == 0) + DBUG_VOID_RETURN; - RETURN VALUE - Error code, or 0 if no error. + DBUG_PRINT("info", ("unsafe_type_flags: 0x%x", unsafe_type_flags)); + + /* + For each unsafe_type, check if the statement is unsafe in this way + and issue a warning. + */ + for (int unsafe_type=0; + unsafe_type < LEX::BINLOG_STMT_UNSAFE_COUNT; + unsafe_type++) + { + if ((unsafe_type_flags & (1 << unsafe_type)) != 0) + { + push_warning_printf(this, MYSQL_ERROR::WARN_LEVEL_NOTE, + ER_BINLOG_UNSAFE_STATEMENT, + ER(ER_BINLOG_UNSAFE_STATEMENT), + ER(LEX::binlog_stmt_unsafe_errcode[unsafe_type])); + if (global_system_variables.log_warnings) + { + char buf[MYSQL_ERRMSG_SIZE * 2]; + sprintf(buf, ER(ER_BINLOG_UNSAFE_STATEMENT), + ER(LEX::binlog_stmt_unsafe_errcode[unsafe_type])); + sql_print_warning(ER(ER_MESSAGE_AND_STATEMENT), buf, query()); + } + } + } + /* + Mark these unsafe types as already printed, to avoid printing + warnings for them again. + */ + binlog_unsafe_warning_flags|= + unsafe_type_flags << LEX::BINLOG_STMT_UNSAFE_COUNT; + DBUG_VOID_RETURN; +} + + +/** + Log the current query. + + The query will be logged in either row format or statement format + depending on the value of @c current_stmt_binlog_format_row field and + the value of the @c qtype parameter. + + This function must be called: + + - After the all calls to ha_*_row() functions have been issued. + + - After any writes to system tables. 
Rationale: if system tables + were written after a call to this function, and the master crashes + after the call to this function and before writing the system + tables, then the master and slave get out of sync. + + - Before tables are unlocked and closed. + + @see decide_logging_format + + @retval 0 Success + + @retval nonzero If there is a failure when writing the query (e.g., + write failure), then the error code is returned. */ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, - ulong query_len, bool is_trans, bool suppress_use, - int errcode) + ulong query_len, bool is_trans, bool direct, + bool suppress_use, int errcode) { DBUG_ENTER("THD::binlog_query"); DBUG_PRINT("enter", ("qtype: %s query: '%s'", @@ -3927,59 +4382,53 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, top-most close_thread_tables(). */ if (this->prelocked_mode == NON_PRELOCKED) - if (int error= binlog_flush_pending_rows_event(TRUE)) + if (int error= binlog_flush_pending_rows_event(TRUE, is_trans)) DBUG_RETURN(error); /* - If we are in statement mode and trying to log an unsafe statement, - we should print a warning. + Warnings for unsafe statements logged in statement format are + printed here instead of in decide_logging_format(). This is + because the warnings should be printed only if the statement is + actually logged. When executing decide_logging_format(), we cannot + know for sure if the statement will be logged. */ - if (sql_log_bin_toplevel && lex->is_stmt_unsafe() && - variables.binlog_format == BINLOG_FORMAT_STMT && - binlog_filter->db_ok(this->db)) - { - /* - A warning can be elevated a error when STRICT sql mode. - But we don't want to elevate binlog warning to error here. 
- */ - push_warning(this, MYSQL_ERROR::WARN_LEVEL_NOTE, - ER_BINLOG_UNSAFE_STATEMENT, - ER(ER_BINLOG_UNSAFE_STATEMENT)); - if (global_system_variables.log_warnings && - !(binlog_flags & BINLOG_FLAG_UNSAFE_STMT_PRINTED)) - { - sql_print_warning("%s Statement: %.*s", - ER(ER_BINLOG_UNSAFE_STATEMENT), - MYSQL_ERRMSG_SIZE, query_arg); - binlog_flags|= BINLOG_FLAG_UNSAFE_STMT_PRINTED; - } - } + if (sql_log_bin_toplevel) + issue_unsafe_warnings(); switch (qtype) { + /* + ROW_QUERY_TYPE means that the statement may be logged either in + row format or in statement format. If + current_stmt_binlog_format is row, it means that the + statement has already been logged in row format and hence shall + not be logged again. + */ case THD::ROW_QUERY_TYPE: DBUG_PRINT("debug", - ("current_stmt_binlog_row_based: %d", - current_stmt_binlog_row_based)); - if (current_stmt_binlog_row_based) + ("is_current_stmt_binlog_format_row: %d", + is_current_stmt_binlog_format_row())); + if (is_current_stmt_binlog_format_row()) DBUG_RETURN(0); - /* Otherwise, we fall through */ - case THD::MYSQL_QUERY_TYPE: - /* - Using this query type is a conveniece hack, since we have been - moving back and forth between using RBR for replication of - system tables and not using it. + /* Fall through */ - Make sure to change in check_table_binlog_row_based() according - to how you treat this. + /* + STMT_QUERY_TYPE means that the query must be logged in statement + format; it cannot be logged in row format. This is typically + used by DDL statements. It is an error to use this query type + if current_stmt_binlog_format_row is row. + + @todo Currently there are places that call this method with + STMT_QUERY_TYPE and current_stmt_binlog_format is row. Fix those + places and add assert to ensure correct behavior. /Sven */ case THD::STMT_QUERY_TYPE: /* The MYSQL_LOG::write() function will set the STMT_END_F flag and flush the pending rows event if necessary. 
- */ + */ { - Query_log_event qinfo(this, query_arg, query_len, is_trans, suppress_use, - errcode); + Query_log_event qinfo(this, query_arg, query_len, is_trans, direct, + suppress_use, errcode); qinfo.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F; /* Binlog table maps will be irrelevant after a Query_log_event diff --git a/sql/sql_class.h b/sql/sql_class.h index d3e3f2a48b6..7e5ae019fca 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -46,6 +46,12 @@ enum enum_delay_key_write { DELAY_KEY_WRITE_NONE, DELAY_KEY_WRITE_ON, enum enum_slave_exec_mode { SLAVE_EXEC_MODE_STRICT, SLAVE_EXEC_MODE_IDEMPOTENT, SLAVE_EXEC_MODE_LAST_BIT}; +enum enum_slave_type_conversions { + SLAVE_TYPE_CONVERSIONS_ALL_LOSSY, + SLAVE_TYPE_CONVERSIONS_ALL_NON_LOSSY, + SLAVE_TYPE_CONVERSIONS_COUNT +}; + enum enum_mark_columns { MARK_COLUMNS_NONE, MARK_COLUMNS_READ, MARK_COLUMNS_WRITE}; enum enum_filetype { FILETYPE_CSV, FILETYPE_XML }; @@ -1203,6 +1209,7 @@ public: /* Used to execute base64 coded binlog events in MySQL server */ Relay_log_info* rli_fake; + void reset_for_next_command(); /* Constant for THD::where initialization in the beginning of every query. 
@@ -1378,32 +1385,81 @@ public: size_t needed, bool is_transactional, RowsEventT* hint); - Rows_log_event* binlog_get_pending_rows_event() const; - void binlog_set_pending_rows_event(Rows_log_event* ev); - int binlog_flush_pending_rows_event(bool stmt_end); - int binlog_remove_pending_rows_event(bool clear_maps); + Rows_log_event* binlog_get_pending_rows_event(bool is_transactional) const; + void binlog_set_pending_rows_event(Rows_log_event* ev, bool is_transactional); + inline int binlog_flush_pending_rows_event(bool stmt_end) + { + return (binlog_flush_pending_rows_event(stmt_end, FALSE) || + binlog_flush_pending_rows_event(stmt_end, TRUE)); + } + int binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional); + int binlog_remove_pending_rows_event(bool clear_maps, bool is_transactional); + + /** + Determine the binlog format of the current statement. + + @retval 0 if the current statement will be logged in statement + format. + @retval nonzero if the current statement will be logged in row + format. + */ + int is_current_stmt_binlog_format_row() const { + DBUG_ASSERT(current_stmt_binlog_format == BINLOG_FORMAT_STMT || + current_stmt_binlog_format == BINLOG_FORMAT_ROW); + return current_stmt_binlog_format == BINLOG_FORMAT_ROW; + } private: + /** + Indicates the format in which the current statement will be + logged. This can only be set from @c decide_logging_format(). + */ + enum_binlog_format current_stmt_binlog_format; + + /** + Bit field for the state of binlog warnings. + + There are two groups of bits: + + - The first Lex::BINLOG_STMT_UNSAFE_COUNT bits list all types of + unsafeness that the current statement has. + + - The following Lex::BINLOG_STMT_UNSAFE_COUNT bits list all types + of unsafeness that the current statement has issued warnings + for. + + Hence, this variable must be big enough to hold + 2*Lex::BINLOG_STMT_UNSAFE_COUNT bits. This is asserted in @c + issue_unsafe_warnings(). 
+ + The first group of bits is set by @c + decide_logging_format() when it detects that a warning should be + issued. The second group of bits is set from @c binlog_query() + when a warning is issued. All bits are cleared at the end of the + top-level statement. + + This must be a member of THD and not of LEX, because warnings are + detected and issued in different places (@c + decide_logging_format() and @c binlog_query(), respectively). + Between these calls, the THD->lex object may change; e.g., if a + stored routine is invoked. Only THD persists between the calls. + */ + uint32 binlog_unsafe_warning_flags; + + void issue_unsafe_warnings(); + /* Number of outstanding table maps, i.e., table maps in the transaction cache. */ uint binlog_table_maps; - - enum enum_binlog_flag { - BINLOG_FLAG_UNSAFE_STMT_PRINTED, - BINLOG_FLAG_COUNT - }; - - /** - Flags with per-thread information regarding the status of the - binary log. - */ - uint32 binlog_flags; public: uint get_binlog_table_maps() const { return binlog_table_maps; } + void clear_binlog_table_maps() { + binlog_table_maps= 0; + } #endif /* MYSQL_CLIENT */ public: @@ -1691,7 +1747,7 @@ public: bool slave_thread, one_shot_set; /* tells if current statement should binlog row-based(1) or stmt-based(0) */ bool current_stmt_binlog_row_based; - bool some_tables_deleted; + bool locked, some_tables_deleted; bool last_cuted_field; bool no_errors, password; /** @@ -1847,27 +1903,18 @@ public: #ifndef MYSQL_CLIENT enum enum_binlog_query_type { - /* - The query can be logged row-based or statement-based - */ + /* The query can be logged in row format or in statement format. */ ROW_QUERY_TYPE, - /* - The query has to be logged statement-based - */ + /* The query has to be logged in statement format. */ STMT_QUERY_TYPE, - /* - The query represents a change to a table in the "mysql" - database and is currently mapped to ROW_QUERY_TYPE. 
- */ - MYSQL_QUERY_TYPE, QUERY_TYPE_COUNT }; int binlog_query(enum_binlog_query_type qtype, - char const *query, ulong query_len, - bool is_trans, bool suppress_use, + char const *query, ulong query_len, bool is_trans, + bool direct, bool suppress_use, int errcode); #endif @@ -1935,6 +1982,21 @@ public: { return server_status & SERVER_STATUS_IN_TRANS; } + /** + Returns TRUE if session is in a multi-statement transaction mode. + + OPTION_NOT_AUTOCOMMIT: When autocommit is off, a multi-statement + transaction is implicitly started on the first statement after a + previous transaction has been ended. + + OPTION_BEGIN: Regardless of the autocommit status, a multi-statement + transaction can be explicitly started with the statements "START + TRANSACTION", "BEGIN [WORK]", "[COMMIT | ROLLBACK] AND CHAIN", etc. + */ + inline bool in_multi_stmt_transaction() + { + return options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN); + } inline bool fill_derived_tables() { return !stmt_arena->is_stmt_prepare() && !lex->only_view_structure(); @@ -2079,31 +2141,51 @@ public: void set_n_backup_active_arena(Query_arena *set, Query_arena *backup); void restore_active_arena(Query_arena *set, Query_arena *backup); - inline void set_current_stmt_binlog_row_based_if_mixed() + /* + @todo Make these methods private or remove them completely. Only + decide_logging_format should call them. /Sven + */ + inline void set_current_stmt_binlog_format_row_if_mixed() { + DBUG_ENTER("set_current_stmt_binlog_format_row_if_mixed"); + /* + This should only be called from decide_logging_format. + + @todo Once we have ensured this, uncomment the following + statement, remove the big comment below that, and remove the + in_sub_stmt==0 condition from the following 'if'. + */ + /* DBUG_ASSERT(in_sub_stmt == 0); */ /* If in a stored/function trigger, the caller should already have done the change. 
We test in_sub_stmt to prevent introducing bugs where people wouldn't ensure that, and would switch to row-based mode in the middle of executing a stored function/trigger (which is too late, see also - reset_current_stmt_binlog_row_based()); this condition will make their + reset_current_stmt_binlog_format_row()); this condition will make their tests fail and so force them to propagate the lex->binlog_row_based_if_mixed upwards to the caller. */ if ((variables.binlog_format == BINLOG_FORMAT_MIXED) && (in_sub_stmt == 0)) - current_stmt_binlog_row_based= TRUE; + set_current_stmt_binlog_format_row(); + + DBUG_VOID_RETURN; } - inline void set_current_stmt_binlog_row_based() + inline void set_current_stmt_binlog_format_row() { - current_stmt_binlog_row_based= TRUE; + DBUG_ENTER("set_current_stmt_binlog_format_row"); + current_stmt_binlog_format= BINLOG_FORMAT_ROW; + DBUG_VOID_RETURN; } - inline void clear_current_stmt_binlog_row_based() + inline void clear_current_stmt_binlog_format_row() { - current_stmt_binlog_row_based= FALSE; + DBUG_ENTER("clear_current_stmt_binlog_format_row"); + current_stmt_binlog_format= BINLOG_FORMAT_STMT; + DBUG_VOID_RETURN; } - inline void reset_current_stmt_binlog_row_based() + inline void reset_current_stmt_binlog_format_row() { + DBUG_ENTER("reset_current_stmt_binlog_format_row"); /* If there are temporary tables, don't reset back to statement-based. Indeed it could be that: @@ -2118,19 +2200,19 @@ public: or trigger is decided when it starts executing, depending for example on the caller (for a stored function: if caller is SELECT or INSERT/UPDATE/DELETE...). - - Don't reset binlog format for NDB binlog injector thread. 
*/ DBUG_PRINT("debug", ("temporary_tables: %s, in_sub_stmt: %s, system_thread: %s", YESNO(temporary_tables), YESNO(in_sub_stmt), show_system_thread(system_thread))); - if ((temporary_tables == NULL) && (in_sub_stmt == 0) && - (system_thread != SYSTEM_THREAD_NDBCLUSTER_BINLOG)) + if ((temporary_tables == NULL) && (in_sub_stmt == 0)) { - current_stmt_binlog_row_based= - test(variables.binlog_format == BINLOG_FORMAT_ROW); + if (variables.binlog_format == BINLOG_FORMAT_ROW) + set_current_stmt_binlog_format_row(); + else + clear_current_stmt_binlog_format_row(); } + DBUG_VOID_RETURN; } /** @@ -2328,7 +2410,9 @@ public: void set_query_and_id(char *query_arg, uint32 query_length_arg, query_id_t new_query_id); void set_query_id(query_id_t new_query_id); + int decide_logging_format(TABLE_LIST *tables); private: + /** The current internal error handler for this thread, or NULL. */ Internal_error_handler *m_internal_handler; /** diff --git a/sql/sql_db.cc b/sql/sql_db.cc index aa124a0a004..9fef7114898 100644 --- a/sql/sql_db.cc +++ b/sql/sql_db.cc @@ -181,7 +181,7 @@ uchar* dboptions_get_key(my_dbopt_t *opt, size_t *length, static inline int write_to_binlog(THD *thd, char *query, uint q_len, char *db, uint db_len) { - Query_log_event qinfo(thd, query, q_len, 0, 0, 0); + Query_log_event qinfo(thd, query, q_len, FALSE, TRUE, FALSE, 0); qinfo.db= db; qinfo.db_len= db_len; return mysql_bin_log.write(&qinfo); @@ -750,7 +750,7 @@ not_silent: if (mysql_bin_log.is_open()) { int errcode= query_error_code(thd, TRUE); - Query_log_event qinfo(thd, query, query_length, 0, + Query_log_event qinfo(thd, query, query_length, FALSE, TRUE, /* suppress_use */ TRUE, errcode); /* @@ -842,10 +842,9 @@ bool mysql_alter_db(THD *thd, const char *db, HA_CREATE_INFO *create_info) if (mysql_bin_log.is_open()) { - thd->clear_error(); - Query_log_event qinfo(thd, thd->query(), thd->query_length(), 0, - /* suppress_use */ TRUE, 0); - + int errcode= query_error_code(thd, TRUE); + Query_log_event qinfo(thd, 
thd->query(), thd->query_length(), FALSE, TRUE, + /* suppress_use */ TRUE, errcode); /* Write should use the database being created as the "current database" and not the threads current database, which is the @@ -994,9 +993,9 @@ bool mysql_rm_db(THD *thd,char *db,bool if_exists, bool silent) } if (mysql_bin_log.is_open()) { - thd->clear_error(); - Query_log_event qinfo(thd, query, query_length, 0, - /* suppress_use */ TRUE, 0); + int errcode= query_error_code(thd, TRUE); + Query_log_event qinfo(thd, query, query_length, FALSE, TRUE, + /* suppress_use */ TRUE, errcode); /* Write should use the database being created as the "current database" and not the threads current database, which is the @@ -2008,7 +2007,7 @@ bool mysql_upgrade_db(THD *thd, LEX_STRING *old_db) { int errcode= query_error_code(thd, TRUE); Query_log_event qinfo(thd, thd->query(), thd->query_length(), - 0, TRUE, errcode); + FALSE, TRUE, TRUE, errcode); thd->clear_error(); error|= mysql_bin_log.write(&qinfo); } diff --git a/sql/sql_delete.cc b/sql/sql_delete.cc index ab898950a1d..e446c54beef 100644 --- a/sql/sql_delete.cc +++ b/sql/sql_delete.cc @@ -131,7 +131,7 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds, if (!using_limit && const_cond_result && !(specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE)) && (thd->lex->sql_command == SQLCOM_TRUNCATE || - (!thd->current_stmt_binlog_row_based && + (!thd->is_current_stmt_binlog_format_row() && !(table->triggers && table->triggers->has_delete_triggers())))) { /* Update the table->file->stats.records number */ @@ -385,7 +385,8 @@ cleanup: transactional_table= table->file->has_transactions(); if (!transactional_table && deleted > 0) - thd->transaction.stmt.modified_non_trans_table= TRUE; + thd->transaction.stmt.modified_non_trans_table= + thd->transaction.all.modified_non_trans_table= TRUE; /* See similar binlogging code in sql_update.cc, for comments */ if ((error < 0) || thd->transaction.stmt.modified_non_trans_table) @@ -414,15 
+415,13 @@ cleanup: */ int log_result= thd->binlog_query(query_type, thd->query(), thd->query_length(), - is_trans, FALSE, errcode); + is_trans, FALSE, FALSE, errcode); if (log_result) { error=1; } } - if (thd->transaction.stmt.modified_non_trans_table) - thd->transaction.all.modified_non_trans_table= TRUE; } DBUG_ASSERT(transactional_table || !deleted || thd->transaction.stmt.modified_non_trans_table); free_underlaid_joins(thd, select_lex); @@ -461,19 +460,6 @@ int mysql_prepare_delete(THD *thd, TABLE_LIST *table_list, Item **conds) DBUG_ENTER("mysql_prepare_delete"); List<Item> all_fields; - /* - Statement-based replication of DELETE ... LIMIT is not safe as order of - rows is not defined, so in mixed mode we go to row-based. - - Note that we may consider a statement as safe if ORDER BY primary_key - is present. However it may confuse users to see very similiar statements - replicated differently. - */ - if (thd->lex->current_select->select_limit) - { - thd->lex->set_stmt_unsafe(); - thd->set_current_stmt_binlog_row_based_if_mixed(); - } thd->lex->allow_sum_func= 0; if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context, &thd->lex->select_lex.top_join_list, @@ -822,6 +808,9 @@ void multi_delete::abort() if (deleted) query_cache_invalidate3(thd, delete_tables, 1); + if (thd->transaction.stmt.modified_non_trans_table) + thd->transaction.all.modified_non_trans_table= TRUE; + /* If rows from the first table only has been deleted and it is transactional, just do rollback. 
@@ -852,10 +841,9 @@ void multi_delete::abort() int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED); /* possible error of writing binary log is ignored deliberately */ (void) thd->binlog_query(THD::ROW_QUERY_TYPE, - thd->query(), thd->query_length(), - transactional_tables, FALSE, errcode); + thd->query(), thd->query_length(), + transactional_tables, FALSE, FALSE, errcode); } - thd->transaction.all.modified_non_trans_table= true; } DBUG_VOID_RETURN; } @@ -1008,6 +996,9 @@ bool multi_delete::send_eof() /* reset used flags */ thd_proc_info(thd, "end"); + if (thd->transaction.stmt.modified_non_trans_table) + thd->transaction.all.modified_non_trans_table= TRUE; + /* We must invalidate the query cache before binlog writing and ha_autocommit_... @@ -1027,14 +1018,12 @@ bool multi_delete::send_eof() errcode= query_error_code(thd, killed_status == THD::NOT_KILLED); if (thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query(), thd->query_length(), - transactional_tables, FALSE, errcode) && + transactional_tables, FALSE, FALSE, errcode) && !normal_tables) { local_error=1; // Log write failed: roll back the SQL statement } } - if (thd->transaction.stmt.modified_non_trans_table) - thd->transaction.all.modified_non_trans_table= TRUE; } if (local_error != 0) error_handled= TRUE; // to force early leave from ::send_error() @@ -1059,15 +1048,16 @@ bool multi_delete::send_eof() static bool mysql_truncate_by_delete(THD *thd, TABLE_LIST *table_list) { - bool error, save_binlog_row_based= thd->current_stmt_binlog_row_based; + bool error, save_binlog_row_based= thd->is_current_stmt_binlog_format_row(); DBUG_ENTER("mysql_truncate_by_delete"); table_list->lock_type= TL_WRITE; mysql_init_select(thd->lex); - thd->clear_current_stmt_binlog_row_based(); + thd->clear_current_stmt_binlog_format_row(); error= mysql_delete(thd, table_list, NULL, NULL, HA_POS_ERROR, LL(0), TRUE); ha_autocommit_or_rollback(thd, error); end_trans(thd, error ? 
ROLLBACK : COMMIT); - thd->current_stmt_binlog_row_based= save_binlog_row_based; + if (save_binlog_row_based) + thd->set_current_stmt_binlog_format_row(); DBUG_RETURN(error); } @@ -1182,10 +1172,10 @@ end: if (!error) { /* In RBR, the statement is not binlogged if the table is temporary. */ - if (!is_temporary_table || !thd->current_stmt_binlog_row_based) + if (!is_temporary_table || !thd->is_current_stmt_binlog_format_row()) error= write_bin_log(thd, TRUE, thd->query(), thd->query_length()); if (!error) - my_ok(thd); // This should return record count + my_ok(thd); // This should return record count } mysql_mutex_lock(&LOCK_open); unlock_table_name(thd, table_list); diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index 147e03ddf55..f933ac46b62 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -899,6 +899,10 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, */ query_cache_invalidate3(thd, table_list, 1); } + + if (thd->transaction.stmt.modified_non_trans_table) + thd->transaction.all.modified_non_trans_table= TRUE; + if ((changed && error <= 0) || thd->transaction.stmt.modified_non_trans_table || was_insert_delayed) @@ -937,15 +941,13 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, */ DBUG_ASSERT(thd->killed != THD::KILL_BAD_DATA || error > 0); if (thd->binlog_query(THD::ROW_QUERY_TYPE, - thd->query(), thd->query_length(), - transactional_table, FALSE, - errcode)) + thd->query(), thd->query_length(), + transactional_table, FALSE, FALSE, + errcode)) { - error=1; + error= 1; } } - if (thd->transaction.stmt.modified_non_trans_table) - thd->transaction.all.modified_non_trans_table= TRUE; } DBUG_ASSERT(transactional_table || !changed || thd->transaction.stmt.modified_non_trans_table); @@ -1786,6 +1788,7 @@ public: table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0), group_count(0) { + DBUG_ENTER("Delayed_insert constructor"); thd.security_ctx->user=thd.security_ctx->priv_user=(char*) delayed_user; thd.security_ctx->host=(char*) 
my_localhost; thd.current_tablenr=0; @@ -1794,11 +1797,21 @@ public: thd.lex->current_select= 0; // for my_message_sql thd.lex->sql_command= SQLCOM_INSERT; // For innodb::store_lock() /* - Statement-based replication of INSERT DELAYED has problems with RAND() - and user vars, so in mixed mode we go to row-based. + Statement-based replication of INSERT DELAYED has problems with + RAND() and user variables, so in mixed mode we go to row-based. + For normal commands, the unsafe flag is set at parse time. + However, since the flag is a member of the THD object, of which + the delayed_insert thread has its own copy, we must set the + statement to unsafe here and explicitly set row logging mode. + + @todo set_current_stmt_binlog_format_row_if_mixed should not be + called by anything else than thd->decide_logging_format(). When + we call set_current_blah here, none of the checks in + decide_logging_format is made. We should probably call + thd->decide_logging_format() directly instead. /Sven */ - thd.lex->set_stmt_unsafe(); - thd.set_current_stmt_binlog_row_based_if_mixed(); + thd.lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_DELAYED); + thd.set_current_stmt_binlog_format_row_if_mixed(); bzero((char*) &thd.net, sizeof(thd.net)); // Safety bzero((char*) &table_list, sizeof(table_list)); // Safety @@ -1813,6 +1826,7 @@ public: delayed_lock= global_system_variables.low_priority_updates ? TL_WRITE_LOW_PRIORITY : TL_WRITE; pthread_mutex_unlock(&LOCK_thread_count); + DBUG_VOID_RETURN; } ~Delayed_insert() { @@ -2393,8 +2407,8 @@ pthread_handler_t handle_delayed_insert(void *arg) Statement-based replication of INSERT DELAYED has problems with RAND() and user vars, so in mixed mode we go to row-based. 
*/ - thd->lex->set_stmt_unsafe(); - thd->set_current_stmt_binlog_row_based_if_mixed(); + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_DELAYED); + thd->set_current_stmt_binlog_format_row_if_mixed(); /* Open table */ if (!(di->table= open_n_lock_single_table(thd, &di->table_list, @@ -2620,6 +2634,7 @@ bool Delayed_insert::handle_inserts(void) { int error; ulong max_rows; + bool has_trans = TRUE; bool using_ignore= 0, using_opt_replace= 0, using_bin_log= mysql_bin_log.is_open(); delayed_row *row; @@ -2772,9 +2787,9 @@ bool Delayed_insert::handle_inserts(void) */ if (thd.binlog_query(THD::ROW_QUERY_TYPE, row->query.str, row->query.length, - FALSE, FALSE, errcode)) + FALSE, FALSE, FALSE, errcode)) goto err; - + thd.time_zone_used = backup_time_zone_used; thd.variables.time_zone = backup_time_zone; } @@ -2847,9 +2862,11 @@ bool Delayed_insert::handle_inserts(void) or trigger. TODO: Move the logging to last in the sequence of rows. - */ - if (thd.current_stmt_binlog_row_based && - thd.binlog_flush_pending_rows_event(TRUE)) + */ + has_trans= thd.lex->sql_command == SQLCOM_CREATE_TABLE || + table->file->has_transactions(); + if (thd.is_current_stmt_binlog_format_row() && + thd.binlog_flush_pending_rows_event(TRUE, has_trans)) goto err; if ((error=table->file->extra(HA_EXTRA_NO_CACHE))) @@ -2914,19 +2931,6 @@ bool mysql_insert_select_prepare(THD *thd) DBUG_ENTER("mysql_insert_select_prepare"); /* - Statement-based replication of INSERT ... SELECT ... LIMIT is not safe - as order of rows is not defined, so in mixed mode we go to row-based. - - Note that we may consider a statement as safe if ORDER BY primary_key - is present or we SELECT a constant. However it may confuse users to - see very similiar statements replicated differently. 
- */ - if (lex->current_select->select_limit) - { - lex->set_stmt_unsafe(); - thd->set_current_stmt_binlog_row_based_if_mixed(); - } - /* SELECT_LEX do not belong to INSERT statement, so we can't add WHERE clause if table is VIEW */ @@ -3291,9 +3295,11 @@ bool select_insert::send_eof() and ha_autocommit_or_rollback. */ query_cache_invalidate3(thd, table, 1); - if (thd->transaction.stmt.modified_non_trans_table) - thd->transaction.all.modified_non_trans_table= TRUE; } + + if (thd->transaction.stmt.modified_non_trans_table) + thd->transaction.all.modified_non_trans_table= TRUE; + DBUG_ASSERT(trans_table || !changed || thd->transaction.stmt.modified_non_trans_table); @@ -3313,7 +3319,7 @@ bool select_insert::send_eof() errcode= query_error_code(thd, killed_status == THD::NOT_KILLED); if (thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query(), thd->query_length(), - trans_table, FALSE, errcode)) + trans_table, FALSE, FALSE, errcode)) { table->file->ha_release_auto_increment(); DBUG_RETURN(1); @@ -3385,16 +3391,17 @@ void select_insert::abort() { transactional_table= table->file->has_transactions(); if (thd->transaction.stmt.modified_non_trans_table) { + if (!can_rollback_data()) + thd->transaction.all.modified_non_trans_table= TRUE; + if (mysql_bin_log.is_open()) { int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED); /* error of writing binary log is ignored */ (void) thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query(), thd->query_length(), - transactional_table, FALSE, errcode); + transactional_table, FALSE, FALSE, errcode); } - if (!thd->current_stmt_binlog_row_based && !can_rollback_data()) - thd->transaction.all.modified_non_trans_table= TRUE; if (changed) query_cache_invalidate3(thd, table, 1); } @@ -3638,11 +3645,11 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) virtual int do_postlock(TABLE **tables, uint count) { THD *thd= const_cast<THD*>(ptr->get_thd()); - if (int error= decide_logging_format(thd, &all_tables)) + if (int error= 
thd->decide_logging_format(&all_tables)) return error; TABLE const *const table = *tables; - if (thd->current_stmt_binlog_row_based && + if (thd->is_current_stmt_binlog_format_row() && !table->s->tmp_table && !ptr->get_create_info()->table_existed) { @@ -3667,7 +3674,7 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) temporary table, we need to start a statement transaction. */ if ((thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0 && - thd->current_stmt_binlog_row_based && + thd->is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()) { thd->binlog_start_trans_and_stmt(); @@ -3686,7 +3693,7 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR), create_table->table_name); - if (thd->current_stmt_binlog_row_based) + if (thd->is_current_stmt_binlog_format_row()) binlog_show_create_table(&(create_table->table), 1); table= create_table->table; } @@ -3774,7 +3781,7 @@ select_create::binlog_show_create_table(TABLE **tables, uint count) schema that will do a close_thread_tables(), destroying the statement transaction cache. */ - DBUG_ASSERT(thd->current_stmt_binlog_row_based); + DBUG_ASSERT(thd->is_current_stmt_binlog_format_row()); DBUG_ASSERT(tables && *tables && count > 0); char buf[2048]; @@ -3796,6 +3803,7 @@ select_create::binlog_show_create_table(TABLE **tables, uint count) result= thd->binlog_query(THD::STMT_QUERY_TYPE, query.ptr(), query.length(), /* is_trans */ TRUE, + /* direct */ FALSE, /* suppress_use */ FALSE, errcode); } @@ -3815,7 +3823,7 @@ void select_create::send_error(uint errcode,const char *err) DBUG_PRINT("info", ("Current statement %s row-based", - thd->current_stmt_binlog_row_based ? "is" : "is NOT")); + thd->is_current_stmt_binlog_format_row() ? 
"is" : "is NOT")); DBUG_PRINT("info", ("Current table (at 0x%lu) %s a temporary (or non-existant) table", (ulong) table, @@ -3898,7 +3906,7 @@ void select_create::abort() thd->transaction.stmt.modified_non_trans_table= FALSE; reenable_binlog(thd); /* possible error of writing binary log is ignored deliberately */ - (void)thd->binlog_flush_pending_rows_event(TRUE); + (void) thd->binlog_flush_pending_rows_event(TRUE, TRUE); if (m_plock) { diff --git a/sql/sql_lex.cc b/sql/sql_lex.cc index 64eb1d2b1a7..0d423ba85eb 100644 --- a/sql/sql_lex.cc +++ b/sql/sql_lex.cc @@ -38,6 +38,23 @@ sys_var *trg_new_row_fake_var= (sys_var*) 0x01; */ const LEX_STRING null_lex_str= {NULL, 0}; const LEX_STRING empty_lex_str= { (char*) "", 0 }; +/** + @note The order of the elements of this array must correspond to + the order of elements in enum_binlog_stmt_unsafe. +*/ +const int +Query_tables_list::binlog_stmt_unsafe_errcode[BINLOG_STMT_UNSAFE_COUNT] = +{ + ER_BINLOG_UNSAFE_LIMIT, + ER_BINLOG_UNSAFE_INSERT_DELAYED, + ER_BINLOG_UNSAFE_SYSTEM_TABLE, + ER_BINLOG_UNSAFE_TWO_AUTOINC_COLUMNS, + ER_BINLOG_UNSAFE_UDF, + ER_BINLOG_UNSAFE_SYSTEM_VARIABLE, + ER_BINLOG_UNSAFE_SYSTEM_FUNCTION, + ER_BINLOG_UNSAFE_NONTRANS_AFTER_TRANS +}; + /* Longest standard keyword name */ diff --git a/sql/sql_lex.h b/sql/sql_lex.h index e80d9dfcb3f..d313cb0484f 100644 --- a/sql/sql_lex.h +++ b/sql/sql_lex.h @@ -1068,25 +1068,156 @@ public: } } + /** - Has the parser/scanner detected that this statement is unsafe? - */ + Enumeration listing of all types of unsafe statement. + + @note The order of elements of this enumeration type must + correspond to the order of the elements of the @c explanations + array defined in the body of @c THD::issue_unsafe_warnings. + */ + enum enum_binlog_stmt_unsafe { + /** + SELECT..LIMIT is unsafe because the set of rows returned cannot + be predicted. + */ + BINLOG_STMT_UNSAFE_LIMIT= 0, + /** + INSERT DELAYED is unsafe because the time when rows are inserted + cannot be predicted. 
+ */ + BINLOG_STMT_UNSAFE_INSERT_DELAYED, + /** + Access to log tables is unsafe because slave and master probably + log different things. + */ + BINLOG_STMT_UNSAFE_SYSTEM_TABLE, + /** + Update of two autoincrement columns is unsafe. With one + autoincrement column, we store the counter in the binlog so that + slave can restore the correct value. But we can only store one + such counter per statement, so updating more than one + autoincrement column is not safe. + */ + BINLOG_STMT_UNSAFE_TWO_AUTOINC_COLUMNS, + /** + Using a UDF (user-defined function) is unsafe. + */ + BINLOG_STMT_UNSAFE_UDF, + /** + Using most system variables is unsafe, because slave may run + with different options than master. + */ + BINLOG_STMT_UNSAFE_SYSTEM_VARIABLE, + /** + Using some functions is unsafe (e.g., UUID). + */ + BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION, + + /** + Mixing transactional and non-transactional statements is unsafe if + non-transactional reads or writes occur after transactional + reads or writes inside a transaction. + */ + BINLOG_STMT_UNSAFE_NONTRANS_AFTER_TRANS, + + /* The last element of this enumeration type. */ + BINLOG_STMT_UNSAFE_COUNT + }; + /** + This has all flags from 0 (inclusive) to BINLOG_STMT_UNSAFE_COUNT + (exclusive) set. + */ + static const int BINLOG_STMT_UNSAFE_ALL_FLAGS= + ((1 << BINLOG_STMT_UNSAFE_COUNT) - 1); + + /** + Maps elements of enum_binlog_stmt_unsafe to error codes. + */ + static const int binlog_stmt_unsafe_errcode[BINLOG_STMT_UNSAFE_COUNT]; + + /** + Determine if this statement is marked as unsafe. + + @retval 0 if the statement is not marked as unsafe. + @retval nonzero if the statement is marked as unsafe. + */ inline bool is_stmt_unsafe() const { - return binlog_stmt_flags & (1U << BINLOG_STMT_FLAG_UNSAFE); + return get_stmt_unsafe_flags() != 0; } /** - Flag the current (top-level) statement as unsafe. + Flag the current (top-level) statement as unsafe. + The flag will be reset after the statement has finished. 
- The flag will be reset after the statement has finished. + @param unsafe_type The type of unsafety: one of the @c + BINLOG_STMT_UNSAFE_* elements of @c enum_binlog_stmt_unsafe. + */ + inline void set_stmt_unsafe(enum_binlog_stmt_unsafe unsafe_type) { + DBUG_ENTER("set_stmt_unsafe"); + DBUG_ASSERT(unsafe_type >= 0 && unsafe_type < BINLOG_STMT_UNSAFE_COUNT); + binlog_stmt_flags|= (1U << unsafe_type); + DBUG_VOID_RETURN; + } - */ - inline void set_stmt_unsafe() { - binlog_stmt_flags|= (1U << BINLOG_STMT_FLAG_UNSAFE); + /** + Set the bits of binlog_stmt_flags determining the type of + unsafeness of the current statement. No existing bits will be + cleared, but new bits may be set. + + @param flags A binary combination of zero or more bits, (1<<flag) + where flag is a member of enum_binlog_stmt_unsafe. + */ + inline void set_stmt_unsafe_flags(uint32 flags) { + DBUG_ENTER("set_stmt_unsafe_flags"); + DBUG_ASSERT((flags & ~BINLOG_STMT_UNSAFE_ALL_FLAGS) == 0); + binlog_stmt_flags|= flags; + DBUG_VOID_RETURN; + } + + /** + Return a binary combination of all unsafe warnings for the + statement. If the statement has been marked as unsafe by the + 'flag' member of enum_binlog_stmt_unsafe, then the return value + from this function has bit (1<<flag) set to 1. + */ + inline uint32 get_stmt_unsafe_flags() const { + DBUG_ENTER("get_stmt_unsafe_flags"); + DBUG_RETURN(binlog_stmt_flags & BINLOG_STMT_UNSAFE_ALL_FLAGS); } + /** + Mark the current statement as safe; i.e., clear all bits in + binlog_stmt_flags that correspond to elements of + enum_binlog_stmt_unsafe. + */ inline void clear_stmt_unsafe() { - binlog_stmt_flags&= ~(1U << BINLOG_STMT_FLAG_UNSAFE); + DBUG_ENTER("clear_stmt_unsafe"); + binlog_stmt_flags&= ~BINLOG_STMT_UNSAFE_ALL_FLAGS; + DBUG_VOID_RETURN; + } + + /** + Determine if this statement is a row injection. 
+ + @retval 0 if the statement is not a row injection + @retval nonzero if the statement is a row injection + */ + inline bool is_stmt_row_injection() const { + return binlog_stmt_flags & + (1U << (BINLOG_STMT_UNSAFE_COUNT + BINLOG_STMT_TYPE_ROW_INJECTION)); + } + + /** + Flag the statement as a row injection. A row injection is either + a BINLOG statement, or a row event in the relay log executed by + the slave SQL thread. + */ + inline void set_stmt_row_injection() { + DBUG_ENTER("set_stmt_row_injection"); + binlog_stmt_flags|= + (1U << (BINLOG_STMT_UNSAFE_COUNT + BINLOG_STMT_TYPE_ROW_INJECTION)); + DBUG_VOID_RETURN; } /** @@ -1097,16 +1228,37 @@ public: { return sroutines_list.elements != 0; } private: - enum enum_binlog_stmt_flag { - BINLOG_STMT_FLAG_UNSAFE, - BINLOG_STMT_FLAG_COUNT + + /** + Enumeration listing special types of statements. + + Currently, the only possible type is ROW_INJECTION. + */ + enum enum_binlog_stmt_type { + /** + The statement is a row injection (i.e., either a BINLOG + statement or a row event executed by the slave SQL thread). + */ + BINLOG_STMT_TYPE_ROW_INJECTION = 0, + + /** The last element of this enumeration type. */ + BINLOG_STMT_TYPE_COUNT }; - /* - Tells if the parsing stage detected properties of the statement, - for example: that some items require row-based binlogging to give - a reliable binlog/replication, or if we will use stored functions - or triggers which themselves need require row-based binlogging. + /** + Bit field indicating the type of statement. + + There are two groups of bits: + + - The low BINLOG_STMT_UNSAFE_COUNT bits indicate the types of + unsafeness that the current statement has. + + - The next BINLOG_STMT_TYPE_COUNT bits indicate if the statement + is of some special type. + + This must be a member of LEX, not of THD: each stored procedure + needs to remember its unsafeness state between calls and each + stored procedure has its own LEX object (but no own THD object). 
*/ uint32 binlog_stmt_flags; }; diff --git a/sql/sql_load.cc b/sql/sql_load.cc index 9368053c658..8404690293f 100644 --- a/sql/sql_load.cc +++ b/sql/sql_load.cc @@ -580,8 +580,8 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, version for the binary log to mark that table maps are invalid after this point. */ - if (thd->current_stmt_binlog_row_based) - error= thd->binlog_flush_pending_rows_event(true); + if (thd->is_current_stmt_binlog_format_row()) + error= thd->binlog_flush_pending_rows_event(TRUE, transactional_table); else { /* @@ -729,7 +729,7 @@ static bool write_execute_load_query_log_event(THD *thd, sql_exchange* ex, (uint) ((char*) fname_end - (char*) thd->query()), (duplicates == DUP_REPLACE) ? LOAD_DUP_REPLACE : (ignore ? LOAD_DUP_IGNORE : LOAD_DUP_ERROR), - transactional_table, FALSE, errcode); + transactional_table, FALSE, FALSE, errcode); e.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F; return mysql_bin_log.write(&e); } diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 0774e373908..c2af8de9015 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -5655,21 +5655,26 @@ bool my_yyoverflow(short **yyss, YYSTYPE **yyvs, ulong *yystacksize) /** - Reset THD part responsible for command processing state. + Reset the part of THD responsible for the state of command + processing. - This needs to be called before execution of every statement - (prepared or conventional). - It is not called by substatements of routines. + This needs to be called before execution of every statement + (prepared or conventional). It is not called by substatements of + routines. - @todo - Make it a method of THD and align its name with the rest of - reset/end/start/init methods. - @todo - Call it after we use THD for queries, not before. -*/ + @todo Remove mysql_reset_thd_for_next_command and only use the + member function. + @todo Call it after we use THD for queries, not before. 
+*/ void mysql_reset_thd_for_next_command(THD *thd) { + thd->reset_for_next_command(); +} + +void THD::reset_for_next_command() +{ + THD *thd= this; DBUG_ENTER("mysql_reset_thd_for_next_command"); DBUG_ASSERT(!thd->spcont); /* not for substatements of routines */ DBUG_ASSERT(! thd->in_sub_stmt); @@ -5713,15 +5718,12 @@ void mysql_reset_thd_for_next_command(THD *thd) thd->rand_used= 0; thd->sent_row_count= thd->examined_row_count= 0; - /* - Because we come here only for start of top-statements, binlog format is - constant inside a complex statement (using stored functions) etc. - */ - thd->reset_current_stmt_binlog_row_based(); + thd->reset_current_stmt_binlog_format_row(); + thd->binlog_unsafe_warning_flags= 0; DBUG_PRINT("debug", - ("current_stmt_binlog_row_based: %d", - thd->current_stmt_binlog_row_based)); + ("is_current_stmt_binlog_format_row(): %d", + thd->is_current_stmt_binlog_format_row())); DBUG_VOID_RETURN; } @@ -6856,6 +6858,30 @@ bool reload_acl_and_cache(THD *thd, ulong options, TABLE_LIST *tables, tables. 
*/ + options|= REFRESH_BINARY_LOG; + options|= REFRESH_RELAY_LOG; + options|= REFRESH_SLOW_LOG; + options|= REFRESH_GENERAL_LOG; + options|= REFRESH_ENGINE_LOG; + options|= REFRESH_ERROR_LOG; + } + + if (options & REFRESH_ERROR_LOG) + if (flush_error_log()) + result= 1; + + if ((options & REFRESH_SLOW_LOG) && opt_slow_log) + logger.flush_slow_log(); + + if ((options & REFRESH_GENERAL_LOG) && opt_log) + logger.flush_general_log(); + + if (options & REFRESH_ENGINE_LOG) + if (ha_flush_logs(NULL)) + result= 1; + + if (options & REFRESH_BINARY_LOG) + { /* Writing this command to the binlog may result in infinite loops when doing mysqlbinlog|mysql, and anyway it does not really make @@ -6863,23 +6889,16 @@ bool reload_acl_and_cache(THD *thd, ulong options, TABLE_LIST *tables, than it would help them) */ tmp_write_to_binlog= 0; - if( mysql_bin_log.is_open() ) - { + if (mysql_bin_log.is_open()) mysql_bin_log.rotate_and_purge(RP_FORCE_ROTATE); - } + } + if (options & REFRESH_RELAY_LOG) + { #ifdef HAVE_REPLICATION pthread_mutex_lock(&LOCK_active_mi); rotate_relay_log(active_mi); pthread_mutex_unlock(&LOCK_active_mi); #endif - - /* flush slow and general logs */ - logger.flush_logs(thd); - - if (ha_flush_logs(NULL)) - result=1; - if (flush_error_log()) - result=1; } #ifdef HAVE_QUERY_CACHE if (options & REFRESH_QUERY_CACHE_FREE) diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index aa06b1cfb0e..ec03009c812 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -1951,7 +1951,7 @@ int log_loaded_block(IO_CACHE* file) uchar* buffer= (uchar*) my_b_get_buffer_start(file); uint max_event_size= current_thd->variables.max_allowed_packet; lf_info= (LOAD_FILE_INFO*) file->arg; - if (lf_info->thd->current_stmt_binlog_row_based) + if (lf_info->thd->is_current_stmt_binlog_format_row()) DBUG_RETURN(0); if (lf_info->last_pos_in_file != HA_POS_ERROR && lf_info->last_pos_in_file >= my_b_get_pos_in_file(file)) diff --git a/sql/sql_select.cc b/sql/sql_select.cc index 5f52ef9f856..783d266aa37 
100644 --- a/sql/sql_select.cc +++ b/sql/sql_select.cc @@ -10638,6 +10638,7 @@ TABLE *create_virtual_tmp_table(THD *thd, List<Create_field> &field_list) share->blob_field= blob_field; share->fields= field_count; share->blob_ptr_size= portable_sizeof_char_ptr; + share->db_low_byte_first=1; // True for HEAP and MyISAM setup_tmp_table_column_bitmaps(table, bitmaps); /* Create all fields and calculate the total length of record */ @@ -10702,6 +10703,18 @@ TABLE *create_virtual_tmp_table(THD *thd, List<Create_field> &field_list) null_bit= 1; } } + if (cur_field->type() == MYSQL_TYPE_BIT && + cur_field->key_type() == HA_KEYTYPE_BIT) + { + /* This is a Field_bit since key_type is HA_KEYTYPE_BIT */ + static_cast<Field_bit*>(cur_field)->set_bit_ptr(null_pos, null_bit); + null_bit+= cur_field->field_length & 7; + if (null_bit > 7) + { + null_pos++; + null_bit-= 8; + } + } cur_field->reset(); field_pos+= cur_field->pack_length(); diff --git a/sql/sql_table.cc b/sql/sql_table.cc index c4904feffa6..cf8a28b7216 100644 --- a/sql/sql_table.cc +++ b/sql/sql_table.cc @@ -1736,7 +1736,7 @@ end: */ int write_bin_log(THD *thd, bool clear_error, - char const *query, ulong query_length) + char const *query, ulong query_length, bool is_trans) { int error= 0; if (mysql_bin_log.is_open()) @@ -1747,7 +1747,8 @@ int write_bin_log(THD *thd, bool clear_error, else errcode= query_error_code(thd, TRUE); error= thd->binlog_query(THD::STMT_QUERY_TYPE, - query, query_length, FALSE, FALSE, errcode); + query, query_length, is_trans, FALSE, FALSE, + errcode); } return error; } @@ -1860,7 +1861,7 @@ int mysql_rm_table_part2(THD *thd, TABLE_LIST *tables, bool if_exists, LINT_INIT(alias); LINT_INIT(path_length); - if (thd->current_stmt_binlog_row_based && !dont_log_query) + if (thd->is_current_stmt_binlog_format_row() && !dont_log_query) { built_query.set_charset(system_charset_info); if (if_exists) @@ -1920,7 +1921,7 @@ int mysql_rm_table_part2(THD *thd, TABLE_LIST *tables, bool if_exists, // removed 
temporary table tmp_table_deleted= 1; if (thd->variables.binlog_format == BINLOG_FORMAT_MIXED && - thd->current_stmt_binlog_row_based) + thd->is_current_stmt_binlog_format_row()) { if (built_tmp_query.is_empty()) { @@ -1954,7 +1955,7 @@ int mysql_rm_table_part2(THD *thd, TABLE_LIST *tables, bool if_exists, being built. The string always end in a comma and the comma will be chopped off before being written to the binary log. */ - if (!drop_temporary && thd->current_stmt_binlog_row_based && !dont_log_query) + if (!drop_temporary && thd->is_current_stmt_binlog_format_row() && !dont_log_query) { non_temp_tables_count++; /* @@ -2088,7 +2089,7 @@ int mysql_rm_table_part2(THD *thd, TABLE_LIST *tables, bool if_exists, query_cache_invalidate3(thd, tables, 0); if (!dont_log_query) { - if (!thd->current_stmt_binlog_row_based || + if (!thd->is_current_stmt_binlog_format_row() || (non_temp_tables_count > 0 && !tmp_table_deleted)) { /* @@ -2100,7 +2101,7 @@ int mysql_rm_table_part2(THD *thd, TABLE_LIST *tables, bool if_exists, */ error |= write_bin_log(thd, !error, thd->query(), thd->query_length()); } - else if (thd->current_stmt_binlog_row_based && + else if (thd->is_current_stmt_binlog_format_row() && tmp_table_deleted) { if (non_temp_tables_count > 0) @@ -2139,7 +2140,8 @@ int mysql_rm_table_part2(THD *thd, TABLE_LIST *tables, bool if_exists, */ built_tmp_query.chop(); // Chop of the last comma built_tmp_query.append(" /* generated by server */"); - error|= write_bin_log(thd, !error, built_tmp_query.ptr(), built_tmp_query.length()); + error|= write_bin_log(thd, !error, built_tmp_query.ptr(), built_tmp_query.length(), + thd->in_multi_stmt_transaction()); } } @@ -3592,8 +3594,8 @@ static inline int write_create_table_bin_log(THD *thd, Otherwise, the statement shall be binlogged. 
*/ if (!internal_tmp_table && - (!thd->current_stmt_binlog_row_based || - (thd->current_stmt_binlog_row_based && + (!thd->is_current_stmt_binlog_format_row() || + (thd->is_current_stmt_binlog_format_row() && !(create_info->options & HA_LEX_CREATE_TMP_TABLE)))) return write_bin_log(thd, TRUE, thd->query(), thd->query_length()); return 0; @@ -5361,7 +5363,7 @@ binlog: /* We have to write the query before we unlock the tables. */ - if (thd->current_stmt_binlog_row_based) + if (thd->is_current_stmt_binlog_format_row()) { /* Since temporary tables are not replicated under row-based @@ -6565,7 +6567,7 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name, { thd->clear_error(); Query_log_event qinfo(thd, thd->query(), thd->query_length(), - 0, FALSE, 0); + FALSE, TRUE, FALSE, 0); if (error= mysql_bin_log.write(&qinfo)) goto view_err_unlock; } @@ -7309,8 +7311,8 @@ view_err: /* Should pass the 'new_name' as we store table name in the cache */ if (rename_temporary_table(thd, new_table, new_db, new_name)) goto err1; - /* We don't replicate alter table statement on temporary tables */ - if (!thd->current_stmt_binlog_row_based && + + if (!thd->is_current_stmt_binlog_format_row() && write_bin_log(thd, TRUE, thd->query(), thd->query_length())) DBUG_RETURN(TRUE); goto end_temporary; @@ -7473,7 +7475,7 @@ view_err: db, table_name); DBUG_ASSERT(!(mysql_bin_log.is_open() && - thd->current_stmt_binlog_row_based && + thd->is_current_stmt_binlog_format_row() && (create_info->options & HA_LEX_CREATE_TMP_TABLE))); if (write_bin_log(thd, TRUE, thd->query(), thd->query_length())) DBUG_RETURN(TRUE); diff --git a/sql/sql_udf.cc b/sql/sql_udf.cc index 3a7309a0ea4..b7595b4fdcb 100644 --- a/sql/sql_udf.cc +++ b/sql/sql_udf.cc @@ -436,8 +436,8 @@ int mysql_create_function(THD *thd,udf_func *udf) Turn off row binlogging of this statement and use statement-based so that all supporting tables are updated for CREATE FUNCTION command. 
*/ - if (thd->current_stmt_binlog_row_based) - thd->clear_current_stmt_binlog_row_based(); + if (thd->is_current_stmt_binlog_format_row()) + thd->clear_current_stmt_binlog_format_row(); rw_wrlock(&THR_LOCK_udf); if ((my_hash_search(&udf_hash,(uchar*) udf->name.str, udf->name.length))) @@ -539,8 +539,8 @@ int mysql_drop_function(THD *thd,const LEX_STRING *udf_name) Turn off row binlogging of this statement and use statement-based so that all supporting tables are updated for DROP FUNCTION command. */ - if (thd->current_stmt_binlog_row_based) - thd->clear_current_stmt_binlog_row_based(); + if (thd->is_current_stmt_binlog_format_row()) + thd->clear_current_stmt_binlog_format_row(); rw_wrlock(&THR_LOCK_udf); if (!(udf=(udf_func*) my_hash_search(&udf_hash,(uchar*) udf_name->str, diff --git a/sql/sql_update.cc b/sql/sql_update.cc index cc92d224cd9..0698116f3c0 100644 --- a/sql/sql_update.cc +++ b/sql/sql_update.cc @@ -792,6 +792,9 @@ int mysql_update(THD *thd, { query_cache_invalidate3(thd, table_list, 1); } + + if (thd->transaction.stmt.modified_non_trans_table) + thd->transaction.all.modified_non_trans_table= TRUE; /* error < 0 means really no error at all: we processed all rows until the @@ -814,13 +817,11 @@ int mysql_update(THD *thd, if (thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query(), thd->query_length(), - transactional_table, FALSE, errcode)) + transactional_table, FALSE, FALSE, errcode)) { error=1; // Rollback update } } - if (thd->transaction.stmt.modified_non_trans_table) - thd->transaction.all.modified_non_trans_table= TRUE; } DBUG_ASSERT(transactional_table || !updated || thd->transaction.stmt.modified_non_trans_table); free_underlaid_joins(thd, select_lex); @@ -884,19 +885,6 @@ bool mysql_prepare_update(THD *thd, TABLE_LIST *table_list, SELECT_LEX *select_lex= &thd->lex->select_lex; DBUG_ENTER("mysql_prepare_update"); - /* - Statement-based replication of UPDATE ... 
LIMIT is not safe as order of - rows is not defined, so in mixed mode we go to row-based. - - Note that we may consider a statement as safe if ORDER BY primary_key - is present. However it may confuse users to see very similiar statements - replicated differently. - */ - if (thd->lex->current_select->select_limit) - { - thd->lex->set_stmt_unsafe(); - thd->set_current_stmt_binlog_row_based_if_mixed(); - } #ifndef NO_EMBEDDED_ACCESS_CHECKS table_list->grant.want_privilege= table->grant.want_privilege= (SELECT_ACL & ~table->grant.privilege); @@ -1763,10 +1751,10 @@ bool multi_update::send_data(List<Item> ¬_used_values) /* non-transactional or transactional table got modified */ /* either multi_update class' flag is raised in its branch */ if (table->file->has_transactions()) - transactional_tables= 1; + transactional_tables= TRUE; else { - trans_safe= 0; + trans_safe= FALSE; thd->transaction.stmt.modified_non_trans_table= TRUE; } } @@ -1874,10 +1862,9 @@ void multi_update::abort() into repl event. 
*/ int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED); - /* the error of binary logging is ignored */ (void)thd->binlog_query(THD::ROW_QUERY_TYPE, - thd->query(), thd->query_length(), - transactional_tables, FALSE, errcode); + thd->query(), thd->query_length(), + transactional_tables, FALSE, FALSE, errcode); } thd->transaction.all.modified_non_trans_table= TRUE; } @@ -2015,10 +2002,10 @@ int multi_update::do_updates() if (updated != org_updated) { if (table->file->has_transactions()) - transactional_tables= 1; + transactional_tables= TRUE; else { - trans_safe= 0; // Can't do safe rollback + trans_safe= FALSE; // Can't do safe rollback thd->transaction.stmt.modified_non_trans_table= TRUE; } } @@ -2047,10 +2034,10 @@ err2: if (updated != org_updated) { if (table->file->has_transactions()) - transactional_tables= 1; + transactional_tables= TRUE; else { - trans_safe= 0; + trans_safe= FALSE; thd->transaction.stmt.modified_non_trans_table= TRUE; } } @@ -2096,8 +2083,9 @@ bool multi_update::send_eof() either from the query's list or via a stored routine: bug#13270,23333 */ - DBUG_ASSERT(trans_safe || !updated || - thd->transaction.stmt.modified_non_trans_table); + if (thd->transaction.stmt.modified_non_trans_table) + thd->transaction.all.modified_non_trans_table= TRUE; + if (local_error == 0 || thd->transaction.stmt.modified_non_trans_table) { if (mysql_bin_log.is_open()) @@ -2109,14 +2097,15 @@ bool multi_update::send_eof() errcode= query_error_code(thd, killed_status == THD::NOT_KILLED); if (thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query(), thd->query_length(), - transactional_tables, FALSE, errcode)) + transactional_tables, FALSE, FALSE, errcode)) { local_error= 1; // Rollback update } } - if (thd->transaction.stmt.modified_non_trans_table) - thd->transaction.all.modified_non_trans_table= TRUE; } + DBUG_ASSERT(trans_safe || !updated || + thd->transaction.stmt.modified_non_trans_table); + if (local_error != 0) error_handled= TRUE; // to force early 
leave from ::send_error() diff --git a/sql/sql_view.cc b/sql/sql_view.cc index 8305303f351..e3515dbdf4c 100644 --- a/sql/sql_view.cc +++ b/sql/sql_view.cc @@ -663,7 +663,7 @@ bool mysql_create_view(THD *thd, TABLE_LIST *views, int errcode= query_error_code(thd, TRUE); if (thd->binlog_query(THD::STMT_QUERY_TYPE, - buff.ptr(), buff.length(), FALSE, FALSE, errcode)) + buff.ptr(), buff.length(), FALSE, FALSE, FALSE, errcode)) res= TRUE; } @@ -1309,8 +1309,8 @@ bool mysql_make_view(THD *thd, File_parser *parser, TABLE_LIST *table, If the view's body needs row-based binlogging (e.g. the VIEW is created from SELECT UUID()), the top statement also needs it. */ - if (lex->is_stmt_unsafe()) - old_lex->set_stmt_unsafe(); + old_lex->set_stmt_unsafe_flags(lex->get_stmt_unsafe_flags()); + view_is_mergeable= (table->algorithm != VIEW_ALGORITHM_TMPTABLE && lex->can_be_merged()); diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy index 0d3610ccee1..ed134f80312 100644 --- a/sql/sql_yacc.yy +++ b/sql/sql_yacc.yy @@ -936,6 +936,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize); %token ENUM %token EQ /* OPERATOR */ %token EQUAL_SYM /* OPERATOR */ +%token ERROR_SYM %token ERRORS %token ESCAPED %token ESCAPE_SYM /* SQL-2003-R */ @@ -969,6 +970,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize); %token FULLTEXT_SYM %token FUNCTION_SYM /* SQL-2003-R */ %token GE +%token GENERAL %token GEOMETRYCOLLECTION %token GEOMETRY_SYM %token GET_FORMAT /* MYSQL-FUNC */ @@ -1188,6 +1190,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize); %token REDUNDANT_SYM %token REFERENCES /* SQL-2003-R */ %token REGEXP +%token RELAY %token RELAYLOG_SYM %token RELAY_LOG_FILE_SYM %token RELAY_LOG_POS_SYM @@ -1245,6 +1248,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize); %token SIGNED_SYM %token SIMPLE_SYM /* SQL-2003-N */ %token SLAVE +%token SLOW %token SMALLINT /* SQL-2003-R */ %token SNAPSHOT_SYM %token SOCKET_SYM @@ -7892,7 +7896,7 @@ 
function_call_keyword: $$= new (YYTHD->mem_root) Item_func_current_user(Lex->current_context()); if ($$ == NULL) MYSQL_YYABORT; - Lex->set_stmt_unsafe(); + Lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); Lex->safe_to_cache_query= 0; } | DATE_SYM '(' expr ')' @@ -8047,7 +8051,7 @@ function_call_keyword: $$= new (YYTHD->mem_root) Item_func_user(); if ($$ == NULL) MYSQL_YYABORT; - Lex->set_stmt_unsafe(); + Lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); Lex->safe_to_cache_query=0; } | YEAR_SYM '(' expr ')' @@ -8197,7 +8201,7 @@ function_call_nonkeyword: sysdate_is_now=1, because the slave may have sysdate_is_now=0. */ - Lex->set_stmt_unsafe(); + Lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); if (global_system_variables.sysdate_is_now == 0) $$= new (YYTHD->mem_root) Item_func_sysdate_local(); else @@ -8791,7 +8795,7 @@ variable_aux: if (!($$= get_system_var(YYTHD, $2, $3, $4))) MYSQL_YYABORT; if (!((Item_func_get_system_var*) $$)->is_written_to_binlog()) - Lex->set_stmt_unsafe(); + Lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_VARIABLE); } ; @@ -9735,7 +9739,10 @@ opt_limit_clause: ; limit_clause: - LIMIT limit_options {} + LIMIT limit_options + { + Lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_LIMIT); + } ; limit_options: @@ -9797,6 +9804,7 @@ delete_limit_clause: { SELECT_LEX *sel= Select; sel->select_limit= $2; + Lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_LIMIT); sel->explicit_limit= 1; } ; @@ -10246,13 +10254,21 @@ insert_lock_option: #endif } | LOW_PRIORITY { $$= TL_WRITE_LOW_PRIORITY; } - | DELAYED_SYM { $$= TL_WRITE_DELAYED; } + | DELAYED_SYM + { + $$= TL_WRITE_DELAYED; + Lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_DELAYED); + } | HIGH_PRIORITY { $$= TL_WRITE; } ; replace_lock_option: opt_low_priority { $$= $1; } - | DELAYED_SYM { $$= TL_WRITE_DELAYED; } + | DELAYED_SYM + { + $$= TL_WRITE_DELAYED; + Lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_DELAYED); + } ; insert2: @@ -11036,6 +11052,18 @@ 
flush_option: opt_table_list {} | TABLES WITH READ_SYM LOCK_SYM { Lex->type|= REFRESH_TABLES | REFRESH_READ_LOCK; } + | ERROR_SYM LOGS_SYM + { Lex->type|= REFRESH_ERROR_LOG; } + | ENGINE_SYM LOGS_SYM + { Lex->type|= REFRESH_ENGINE_LOG; } + | GENERAL LOGS_SYM + { Lex->type|= REFRESH_GENERAL_LOG; } + | SLOW LOGS_SYM + { Lex->type|= REFRESH_SLOW_LOG; } + | BINARY LOGS_SYM + { Lex->type|= REFRESH_BINARY_LOG; } + | RELAY LOGS_SYM + { Lex->type|= REFRESH_RELAY_LOG; } | QUERY_SYM CACHE_SYM { Lex->type|= REFRESH_QUERY_CACHE_FREE; } | HOSTS_SYM @@ -12185,6 +12213,7 @@ keyword_sp: | ENUM {} | ENGINE_SYM {} | ENGINES_SYM {} + | ERROR_SYM {} | ERRORS {} | ESCAPE_SYM {} | EVENT_SYM {} @@ -12310,6 +12339,7 @@ keyword_sp: | REDO_BUFFER_SIZE_SYM {} | REDOFILE_SYM {} | REDUNDANT_SYM {} + | RELAY {} | RELAYLOG_SYM {} | RELAY_LOG_FILE_SYM {} | RELAY_LOG_POS_SYM {} |