diff options
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r-- | sql/log_event.cc | 666 |
1 files changed, 416 insertions, 250 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index c07bb573188..29d1fd8a084 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -1,4 +1,4 @@ -/* Copyright 2000-2008 MySQL AB, 2008 Sun Microsystems, Inc. +/* Copyright 2000-2008 MySQL AB, 2008-2009 Sun Microsystems, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -16,7 +16,7 @@ #ifdef MYSQL_CLIENT -#include "mysql_priv.h" +#include "sql_priv.h" #else @@ -24,19 +24,32 @@ #pragma implementation // gcc: Class implementation #endif -#include "mysql_priv.h" +#include "sql_priv.h" +#include "unireg.h" +#include "my_global.h" // REQUIRED by log_event.h > m_string.h > my_bitmap.h +#include "log_event.h" +#include "sql_base.h" // close_thread_tables +#include "sql_cache.h" // QUERY_CACHE_FLAGS_SIZE +#include "sql_locale.h" // MY_LOCALE, my_locale_by_number, my_locale_en_US +#include "key.h" // key_copy +#include "lock.h" // mysql_unlock_tables +#include "sql_parse.h" // mysql_test_parse_for_slave +#include "tztime.h" // struct Time_zone +#include "sql_load.h" // mysql_load +#include "sql_db.h" // load_db_opt_by_name #include "slave.h" #include "rpl_rli.h" #include "rpl_mi.h" #include "rpl_filter.h" -#include "rpl_utility.h" #include "rpl_record.h" +#include "transaction.h" #include <my_dir.h> #endif /* MYSQL_CLIENT */ #include <base64.h> #include <my_bitmap.h> +#include "rpl_utility.h" #define log_cs &my_charset_latin1 @@ -134,7 +147,7 @@ static void inline slave_rows_error_report(enum loglevel level, int ha_error, char buff[MAX_SLAVE_ERRMSG], *slider; const char *buff_end= buff + sizeof(buff); uint len; - List_iterator_fast<MYSQL_ERROR> it(thd->warn_list); + List_iterator_fast<MYSQL_ERROR> it(thd->warning_info->warn_list()); MYSQL_ERROR *err; buff[0]= 0; @@ -142,10 +155,11 @@ static void inline slave_rows_error_report(enum loglevel level, int ha_error, slider += len, err= it++) { len= my_snprintf(slider, buff_end - slider, - " %s, Error_code: %d;", err->msg, err->code); + " %s, Error_code: %d;", err->get_message_text(), + err->get_sql_errno()); } - rli->report(level, thd->is_error()? thd->main_da.sql_errno() : 0, + rli->report(level, thd->is_error()? thd->stmt_da->sql_errno() : 0, "Could not execute %s event on table %s.%s;" "%s handler error %s; " "the event's master log %s, end_log_pos %lu", @@ -353,13 +367,13 @@ inline int ignored_error_code(int err_code) */ int convert_handler_error(int error, THD* thd, TABLE *table) { - uint actual_error= (thd->is_error() ? thd->main_da.sql_errno() : + uint actual_error= (thd->is_error() ? thd->stmt_da->sql_errno() : 0); if (actual_error == 0) { table->file->print_error(error, MYF(0)); - actual_error= (thd->is_error() ? thd->main_da.sql_errno() : + actual_error= (thd->is_error() ? thd->stmt_da->sql_errno() : ER_UNKNOWN_ERROR); if (actual_error == ER_UNKNOWN_ERROR) if (global_system_variables.log_warnings) @@ -496,7 +510,7 @@ static void cleanup_load_tmpdir() if (is_prefix(file->name, prefbuf)) { fn_format(fname,file->name,slave_load_tmpdir,"",MY_UNPACK_FILENAME); - my_delete(fname, MYF(0)); + mysql_file_delete(key_file_misc, fname, MYF(0)); } } @@ -664,10 +678,12 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) { server_id= thd->server_id; when= thd->start_time; - cache_stmt= using_trans; + cache_type= (using_trans || stmt_has_updated_trans_table(thd) + || thd->thread_temporary_used + ? Log_event::EVENT_TRANSACTIONAL_CACHE : + Log_event::EVENT_STMT_CACHE); } - /** This minimal constructor is for when you are not even sure that there is a valid THD. For example in the server when we are shutting down or @@ -676,8 +692,8 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) */ Log_event::Log_event() - :temp_buf(0), exec_time(0), flags(0), cache_stmt(0), - thd(0) + :temp_buf(0), exec_time(0), flags(0), + cache_type(Log_event::EVENT_INVALID_CACHE), thd(0) { server_id= ::server_id; /* @@ -696,7 +712,7 @@ Log_event::Log_event() Log_event::Log_event(const char* buf, const Format_description_log_event* description_event) - :temp_buf(0), cache_stmt(0) + :temp_buf(0), cache_type(Log_event::EVENT_INVALID_CACHE) { #ifndef MYSQL_CLIENT thd = 0; @@ -966,7 +982,7 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length) */ int Log_event::read_log_event(IO_CACHE* file, String* packet, - pthread_mutex_t* log_lock) + mysql_mutex_t* log_lock) { ulong data_len; int result=0; @@ -974,7 +990,7 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet, DBUG_ENTER("Log_event::read_log_event"); if (log_lock) - pthread_mutex_lock(log_lock); + mysql_mutex_lock(log_lock); if (my_b_read(file, (uchar*) buf, sizeof(buf))) { /* @@ -1032,14 +1048,14 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet, end: if (log_lock) - pthread_mutex_unlock(log_lock); + mysql_mutex_unlock(log_lock); DBUG_RETURN(result); } #endif /* !MYSQL_CLIENT */ #ifndef MYSQL_CLIENT -#define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock); -#define LOCK_MUTEX if (log_lock) pthread_mutex_lock(log_lock); +#define UNLOCK_MUTEX if (log_lock) mysql_mutex_unlock(log_lock); +#define LOCK_MUTEX if (log_lock) mysql_mutex_lock(log_lock); #else #define UNLOCK_MUTEX #define LOCK_MUTEX @@ -1051,7 +1067,7 @@ end: Allocates memory; The caller is responsible for clean-up. */ Log_event* Log_event::read_log_event(IO_CACHE* file, - pthread_mutex_t* log_lock, + mysql_mutex_t* log_lock, const Format_description_log_event *description_event) #else @@ -1199,15 +1215,11 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, */ if (description_event->event_type_permutation) { - IF_DBUG({ - int new_event_type= - description_event->event_type_permutation[event_type]; - DBUG_PRINT("info", - ("converting event type %d to %d (%s)", - event_type, new_event_type, - get_type_str((Log_event_type)new_event_type))); - }); - event_type= description_event->event_type_permutation[event_type]; + int new_event_type= description_event->event_type_permutation[event_type]; + DBUG_PRINT("info", ("converting event type %d to %d (%s)", + event_type, new_event_type, + get_type_str((Log_event_type)new_event_type))); + event_type= new_event_type; } switch(event_type) { @@ -1570,37 +1582,14 @@ log_event_print_value(IO_CACHE *file, const uchar *ptr, /* a long CHAR() field: see #37426 */ length= byte1 | (((byte0 & 0x30) ^ 0x30) << 4); type= byte0 | 0x30; - goto beg; - } - - switch (byte0) - { - case MYSQL_TYPE_SET: - case MYSQL_TYPE_ENUM: - case MYSQL_TYPE_STRING: - type= byte0; - length= byte1; - break; - - default: - - { - char tmp[5]; - my_snprintf(tmp, sizeof(tmp), "%04X", meta); - my_b_printf(file, - "!! Don't know how to handle column type=%d meta=%d (%s)", - type, meta, tmp); - return 0; - } } + else + length = meta & 0xFF; } else length= meta; } - -beg: - switch (type) { case MYSQL_TYPE_LONG: { @@ -1737,6 +1726,33 @@ beg: return 3; } + case MYSQL_TYPE_NEWDATE: + { + uint32 tmp= uint3korr(ptr); + int part; + char buf[11]; + char *pos= &buf[10]; // start from '\0' to the beginning + + /* Copied from field.cc */ + *pos--=0; // End NULL + part=(int) (tmp & 31); + *pos--= (char) ('0'+part%10); + *pos--= (char) ('0'+part/10); + *pos--= ':'; + part=(int) (tmp >> 5 & 15); + *pos--= (char) ('0'+part%10); + *pos--= (char) ('0'+part/10); + *pos--= ':'; + part=(int) (tmp >> 9); + *pos--= (char) ('0'+part%10); part/=10; + *pos--= (char) ('0'+part%10); part/=10; + *pos--= (char) ('0'+part%10); part/=10; + *pos= (char) ('0'+part); + my_b_printf(file , "'%s'", buf); + my_snprintf(typestr, typestr_length, "DATE"); + return 3; + } + case MYSQL_TYPE_DATE: { uint i32= uint3korr(ptr); @@ -1755,7 +1771,7 @@ beg: } case MYSQL_TYPE_ENUM: - switch (length) { + switch (meta & 0xFF) { case 1: my_b_printf(file, "%d", (int) *ptr); my_snprintf(typestr, typestr_length, "ENUM(1 byte)"); @@ -1768,15 +1784,15 @@ beg: return 2; } default: - my_b_printf(file, "!! Unknown ENUM packlen=%d", length); + my_b_printf(file, "!! Unknown ENUM packlen=%d", meta & 0xFF); return 0; } break; case MYSQL_TYPE_SET: - my_b_write_bit(file, ptr , length * 8); - my_snprintf(typestr, typestr_length, "SET(%d bytes)", length); - return length; + my_b_write_bit(file, ptr , (meta & 0xFF) * 8); + my_snprintf(typestr, typestr_length, "SET(%d bytes)", meta & 0xFF); + return meta & 0xFF; case MYSQL_TYPE_BLOB: switch (meta) { @@ -2368,7 +2384,7 @@ Query_log_event::Query_log_event() */ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, ulong query_length, bool using_trans, - bool suppress_use, int errcode) + bool direct, bool suppress_use, int errcode) :Log_event(thd_arg, (thd_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : @@ -2427,7 +2443,7 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, the autocommit flag as written by the master to the binlog. This behavior may change after WL#4162 has been implemented. */ - flags2= (uint32) (thd_arg->options & + flags2= (uint32) (thd_arg->variables.option_bits & (OPTIONS_WRITTEN_TO_BIN_LOG & ~OPTION_NOT_AUTOCOMMIT)); DBUG_ASSERT(thd_arg->variables.character_set_client->number < 256*256); DBUG_ASSERT(thd_arg->variables.collation_connection->number < 256*256); @@ -2448,6 +2464,100 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, } else time_zone_len= 0; + + /* + In what follows, we decide whether to write to the binary log or to use a + cache. + */ + LEX *lex= thd->lex; + bool implicit_commit= FALSE; + bool force_trans= FALSE; + cache_type= Log_event::EVENT_INVALID_CACHE; + switch (lex->sql_command) + { + case SQLCOM_ALTER_DB: + case SQLCOM_CREATE_FUNCTION: + case SQLCOM_DROP_FUNCTION: + case SQLCOM_DROP_PROCEDURE: + case SQLCOM_INSTALL_PLUGIN: + case SQLCOM_UNINSTALL_PLUGIN: + case SQLCOM_ALTER_TABLESPACE: + implicit_commit= TRUE; + break; + case SQLCOM_DROP_TABLE: + force_trans= lex->drop_temporary && thd->in_multi_stmt_transaction(); + implicit_commit= !force_trans; + break; + case SQLCOM_ALTER_TABLE: + case SQLCOM_CREATE_TABLE: + force_trans= (lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) && + thd->in_multi_stmt_transaction(); + implicit_commit= !force_trans && + !(lex->select_lex.item_list.elements && + thd->is_current_stmt_binlog_format_row()); + break; + case SQLCOM_SET_OPTION: + implicit_commit= (lex->autocommit ? TRUE : FALSE); + break; + /* + Replace what follows after CF_AUTO_COMMIT_TRANS is backported by: + + default: + implicit_commit= ((sql_command_flags[lex->sql_command] & + CF_AUTO_COMMIT_TRANS)); + break; + */ + case SQLCOM_CREATE_INDEX: + case SQLCOM_TRUNCATE: + case SQLCOM_CREATE_DB: + case SQLCOM_DROP_DB: + case SQLCOM_ALTER_DB_UPGRADE: + case SQLCOM_RENAME_TABLE: + case SQLCOM_DROP_INDEX: + case SQLCOM_CREATE_VIEW: + case SQLCOM_DROP_VIEW: + case SQLCOM_CREATE_TRIGGER: + case SQLCOM_DROP_TRIGGER: + case SQLCOM_CREATE_EVENT: + case SQLCOM_ALTER_EVENT: + case SQLCOM_DROP_EVENT: + case SQLCOM_REPAIR: + case SQLCOM_OPTIMIZE: + case SQLCOM_ANALYZE: + case SQLCOM_CREATE_USER: + case SQLCOM_DROP_USER: + case SQLCOM_RENAME_USER: + case SQLCOM_REVOKE_ALL: + case SQLCOM_REVOKE: + case SQLCOM_GRANT: + case SQLCOM_CREATE_PROCEDURE: + case SQLCOM_CREATE_SPFUNCTION: + case SQLCOM_ALTER_PROCEDURE: + case SQLCOM_ALTER_FUNCTION: + case SQLCOM_ASSIGN_TO_KEYCACHE: + case SQLCOM_PRELOAD_KEYS: + case SQLCOM_FLUSH: + case SQLCOM_RESET: + case SQLCOM_CHECK: + implicit_commit= TRUE; + break; + default: + implicit_commit= FALSE; + break; + } + + if (implicit_commit || direct) + { + cache_type= Log_event::EVENT_NO_CACHE; + } + else + { + cache_type= ((using_trans || stmt_has_updated_trans_table(thd) || + force_trans || thd->thread_temporary_used) + ? Log_event::EVENT_TRANSACTIONAL_CACHE : + Log_event::EVENT_STMT_CACHE); + } + DBUG_ASSERT(cache_type != Log_event::EVENT_INVALID_CACHE); DBUG_PRINT("info",("Query_log_event has flags2: %lu sql_mode: %lu", (ulong) flags2, sql_mode)); } @@ -3066,7 +3176,7 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli, } else { - const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); + const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); } /* @@ -3082,10 +3192,7 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli, if (is_trans_keyword() || rpl_filter->db_ok(thd->db)) { thd->set_time((time_t)when); - thd->set_query((char*)query_arg, q_len_arg); - VOID(pthread_mutex_lock(&LOCK_thread_count)); - thd->query_id = next_query_id(); - VOID(pthread_mutex_unlock(&LOCK_thread_count)); + thd->set_query_and_id((char*)query_arg, q_len_arg, next_query_id()); thd->variables.pseudo_thread_id= thread_id; // for temp tables DBUG_PRINT("query",("%s", thd->query())); @@ -3094,13 +3201,13 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli, { if (flags2_inited) /* - all bits of thd->options which are 1 in OPTIONS_WRITTEN_TO_BIN_LOG + all bits of thd->variables.option_bits which are 1 in OPTIONS_WRITTEN_TO_BIN_LOG must take their value from flags2. */ - thd->options= flags2|(thd->options & ~OPTIONS_WRITTEN_TO_BIN_LOG); + thd->variables.option_bits= flags2|(thd->variables.option_bits & ~OPTIONS_WRITTEN_TO_BIN_LOG); /* else, we are in a 3.23/4.0 binlog; we previously received a - Rotate_log_event which reset thd->options and sql_mode etc, so + Rotate_log_event which reset thd->variables.option_bits and sql_mode etc, so nothing to do. */ /* @@ -3180,8 +3287,8 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli, thd->table_map_for_update= (table_map)table_map_for_update; /* Execute the query (note that we bypass dispatch_command()) */ - const char* found_semicolon= NULL; - mysql_parse(thd, thd->query(), thd->query_length(), &found_semicolon); + Parser_state parser_state(thd, thd->query(), thd->query_length()); + mysql_parse(thd, thd->query(), thd->query_length(), &parser_state); log_slow_statement(thd); /* @@ -3222,7 +3329,7 @@ START SLAVE; . Query: '%s'", expected_error, thd->query()); } /* If the query was not ignored, it is printed to the general log */ - if (!thd->is_error() || thd->main_da.sql_errno() != ER_SLAVE_IGNORED_TABLE) + if (!thd->is_error() || thd->stmt_da->sql_errno() != ER_SLAVE_IGNORED_TABLE) general_log_write(thd, COM_QUERY, thd->query(), thd->query_length()); compare_errors: @@ -3235,14 +3342,14 @@ compare_errors: not exist errors", we silently clear the error if TEMPORARY was used. */ if (thd->lex->sql_command == SQLCOM_DROP_TABLE && thd->lex->drop_temporary && - thd->is_error() && thd->main_da.sql_errno() == ER_BAD_TABLE_ERROR && + thd->is_error() && thd->stmt_da->sql_errno() == ER_BAD_TABLE_ERROR && !expected_error) - thd->main_da.reset_diagnostics_area(); + thd->stmt_da->reset_diagnostics_area(); /* If we expected a non-zero error code, and we don't get the same error code, and it should be ignored or is related to a concurrency issue. */ - actual_error= thd->is_error() ? thd->main_da.sql_errno() : 0; + actual_error= thd->is_error() ? thd->stmt_da->sql_errno() : 0; DBUG_PRINT("info",("expected_error: %d sql_errno: %d", expected_error, actual_error)); if ((expected_error && expected_error != actual_error && @@ -3257,7 +3364,7 @@ Error on master: '%s' (%d), Error on slave: '%s' (%d). \ Default database: '%s'. Query: '%s'", ER_SAFE(expected_error), expected_error, - actual_error ? thd->main_da.message() : "no error", + actual_error ? thd->stmt_da->message() : "no error", actual_error, print_slave_db_safe(db), query_arg); thd->is_slave_error= 1; @@ -3281,7 +3388,7 @@ Default database: '%s'. Query: '%s'", them back here. */ if (expected_error && expected_error == actual_error) - ha_autocommit_or_rollback(thd, TRUE); + trans_rollback_stmt(thd); } /* If we expected a non-zero error code and get nothing and, it is a concurrency @@ -3290,7 +3397,8 @@ Default database: '%s'. Query: '%s'", else if (expected_error && !actual_error && (concurrency_error_code(expected_error) || ignored_error_code(expected_error))) - ha_autocommit_or_rollback(thd, TRUE); + trans_rollback_stmt(thd); + /* Other cases: mostly we expected no error and get one. */ @@ -3298,7 +3406,7 @@ Default database: '%s'. Query: '%s'", { rli->report(ERROR_LEVEL, actual_error, "Error '%s' on query. Default database: '%s'. Query: '%s'", - (actual_error ? thd->main_da.message() : + (actual_error ? thd->stmt_da->message() : "unexpected success or fatal error"), print_slave_db_safe(thd->db), query_arg); thd->is_slave_error= 1; @@ -3327,6 +3435,21 @@ Default database: '%s'. Query: '%s'", */ } /* End of if (db_ok(... */ + {/** + The following failure injecion works in cooperation with tests + setting @@global.debug= 'd,stop_slave_middle_group'. + The sql thread receives the killed status and will proceed + to shutdown trying to finish incomplete events group. + */ + DBUG_EXECUTE_IF("stop_slave_middle_group", + if (strcmp("COMMIT", query) != 0 && + strcmp("BEGIN", query) != 0) + { + if (thd->transaction.all.modified_non_trans_table) + const_cast<Relay_log_info*>(rli)->abort_slave= 1; + };); + } + end: /* Probably we have set thd->query, thd->db, thd->catalog to point to places @@ -3385,13 +3508,13 @@ Query_log_event::do_shall_skip(Relay_log_info *rli) { if (strcmp("BEGIN", query) == 0) { - thd->options|= OPTION_BEGIN; + thd->variables.option_bits|= OPTION_BEGIN; DBUG_RETURN(Log_event::continue_group(rli)); } if (strcmp("COMMIT", query) == 0 || strcmp("ROLLBACK", query) == 0) { - thd->options&= ~OPTION_BEGIN; + thd->variables.option_bits&= ~OPTION_BEGIN; DBUG_RETURN(Log_event::EVENT_SKIP_COUNT); } } @@ -3644,10 +3767,11 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver) */ if (post_header_len) { +#ifndef DBUG_OFF // Allows us to sanity-check that all events initialized their // events (see the end of this 'if' block). - IF_DBUG(memset(post_header_len, 255, - number_of_event_types*sizeof(uint8));); + memset(post_header_len, 255, number_of_event_types*sizeof(uint8)); +#endif /* Note: all event types must explicitly fill in their lengths here. */ post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN; @@ -3698,13 +3822,12 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver) post_header_len[UPDATE_ROWS_EVENT-1]= post_header_len[DELETE_ROWS_EVENT-1]= 6;); post_header_len[INCIDENT_EVENT-1]= INCIDENT_HEADER_LEN; + post_header_len[HEARTBEAT_LOG_EVENT-1]= 0; // Sanity-check that all post header lengths are initialized. - IF_DBUG({ - int i; - for (i=0; i<number_of_event_types; i++) - assert(post_header_len[i] != 255); - }); + int i; + for (i=0; i<number_of_event_types; i++) + DBUG_ASSERT(post_header_len[i] != 255); } break; @@ -3929,7 +4052,6 @@ int Format_description_log_event::do_apply_event(Relay_log_info const *rli) int ret= 0; DBUG_ENTER("Format_description_log_event::do_apply_event"); -#ifdef USING_TRANSACTIONS /* As a transaction NEVER spans on 2 or more binlogs: if we have an active transaction at this point, the master died @@ -3951,7 +4073,7 @@ int Format_description_log_event::do_apply_event(Relay_log_info const *rli) "its binary log, thus rolled back too."); const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 1); } -#endif + /* If this event comes from ourselves, there is no cleaning task to perform, we don't call Start_log_event_v3::do_apply_event() @@ -4620,22 +4742,14 @@ int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli, if (rpl_filter->db_ok(thd->db)) { thd->set_time((time_t)when); - VOID(pthread_mutex_lock(&LOCK_thread_count)); - thd->query_id = next_query_id(); - VOID(pthread_mutex_unlock(&LOCK_thread_count)); - /* - Initing thd->row_count is not necessary in theory as this variable has no - influence in the case of the slave SQL thread (it is used to generate a - "data truncated" warning but which is absorbed and never gets to the - error log); still we init it to avoid a Valgrind message. - */ - mysql_reset_errors(thd, 0); + thd->set_query_id(next_query_id()); + thd->warning_info->opt_clear_warning_info(thd->query_id); TABLE_LIST tables; - bzero((char*) &tables,sizeof(tables)); - tables.db= thd->strmake(thd->db, thd->db_length); - tables.alias = tables.table_name = (char*) table_name; - tables.lock_type = TL_WRITE; + tables.init_one_table(thd->strmake(thd->db, thd->db_length), + thd->db_length, + table_name, strlen(table_name), + table_name, TL_WRITE); tables.updating= 1; // the table will be opened in mysql_load @@ -4789,8 +4903,8 @@ error: int sql_errno; if (thd->is_error()) { - err= thd->main_da.message(); - sql_errno= thd->main_da.sql_errno(); + err= thd->stmt_da->message(); + sql_errno= thd->stmt_da->sql_errno(); } else { @@ -4983,7 +5097,7 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli) !is_relay_log_event() && !rli->is_in_group()) { - pthread_mutex_lock(&rli->data_lock); + mysql_mutex_lock(&rli->data_lock); DBUG_PRINT("info", ("old group_master_log_name: '%s' " "old group_master_log_pos: %lu", rli->group_master_log_name, @@ -4995,11 +5109,11 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli) "new group_master_log_pos: %lu", rli->group_master_log_name, (ulong) rli->group_master_log_pos)); - pthread_mutex_unlock(&rli->data_lock); + mysql_mutex_unlock(&rli->data_lock); flush_relay_log_info(rli); /* - Reset thd->options and sql_mode etc, because this could be the signal of + Reset thd->variables.option_bits and sql_mode etc, because this could be the signal of a master's downgrade from 5.0 to 4.0. However, no need to reset description_event_for_exec: indeed, if the next master is 5.0 (even 5.0.1) we will soon get a Format_desc; if the next @@ -5358,10 +5472,16 @@ void Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) int Xid_log_event::do_apply_event(Relay_log_info const *rli) { + bool res; /* For a slave Xid_log_event is COMMIT */ general_log_print(thd, COM_QUERY, "COMMIT /* implicit, from Xid_log_event */"); - return end_trans(thd, COMMIT); + if (!(res= trans_commit(thd))) + { + close_thread_tables(thd); + thd->mdl_context.release_transactional_locks(); + } + return res; } Log_event::enum_skip_reason @@ -5369,7 +5489,7 @@ Xid_log_event::do_shall_skip(Relay_log_info *rli) { DBUG_ENTER("Xid_log_event::do_shall_skip"); if (rli->slave_skip_counter > 0) { - thd->options&= ~OPTION_BEGIN; + thd->variables.option_bits&= ~OPTION_BEGIN; DBUG_RETURN(Log_event::EVENT_SKIP_COUNT); } DBUG_RETURN(Log_event::do_shall_skip(rli)); @@ -5401,16 +5521,18 @@ void User_var_log_event::pack_info(Protocol* protocol) case REAL_RESULT: double real_val; float8get(real_val, val); - if (!(buf= (char*) my_malloc(val_offset + FLOATING_POINT_BUFFER, + if (!(buf= (char*) my_malloc(val_offset + MY_GCVT_MAX_FIELD_WIDTH + 1, MYF(MY_WME)))) return; - event_len+= my_sprintf(buf + val_offset, - (buf + val_offset, "%.14g", real_val)); + event_len+= my_gcvt(real_val, MY_GCVT_ARG_DOUBLE, MY_GCVT_MAX_FIELD_WIDTH, + buf + val_offset, NULL); break; case INT_RESULT: if (!(buf= (char*) my_malloc(val_offset + 22, MYF(MY_WME)))) return; - event_len= longlong10_to_str(uint8korr(val), buf + val_offset,-10)-buf; + event_len= longlong10_to_str(uint8korr(val), buf + val_offset, + ((flags & User_var_log_event::UNSIGNED_F) ? + 10 : -10))-buf; break; case DECIMAL_RESULT: { @@ -5468,12 +5590,14 @@ User_var_log_event(const char* buf, :Log_event(buf, description_event) { /* The Post-Header is empty. The Variable Data part begins immediately. */ + const char *start= buf; buf+= description_event->common_header_len + description_event->post_header_len[USER_VAR_EVENT-1]; name_len= uint4korr(buf); name= (char *) buf + UV_NAME_LEN_SIZE; buf+= UV_NAME_LEN_SIZE + name_len; is_null= (bool) *buf; + flags= User_var_log_event::UNDEF_F; // defaults to UNDEF_F if (is_null) { type= STRING_RESULT; @@ -5489,6 +5613,27 @@ User_var_log_event(const char* buf, UV_CHARSET_NUMBER_SIZE); val= (char *) (buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE + UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE); + + /** + We need to check if this is from an old server + that did not pack information for flags. + We do this by checking if there are extra bytes + after the packed value. If there are we take the + extra byte and it's value is assumed to contain + the flags value. + + Old events will not have this extra byte, thence, + we keep the flags set to UNDEF_F. + */ + uint bytes_read= ((val + val_len) - start); + DBUG_ASSERT(bytes_read==data_written || + bytes_read==(data_written-1)); + if ((data_written - bytes_read) > 0) + { + flags= (uint) *(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE + + UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE + + val_len); + } } } @@ -5500,6 +5645,7 @@ bool User_var_log_event::write(IO_CACHE* file) char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE + UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE]; uchar buf2[max(8, DECIMAL_MAX_FIELD_SIZE + 2)], *pos= buf2; + uint unsigned_len= 0; uint buf1_length; ulong event_length; @@ -5521,6 +5667,7 @@ bool User_var_log_event::write(IO_CACHE* file) break; case INT_RESULT: int8store(buf2, *(longlong*) val); + unsigned_len= 1; break; case DECIMAL_RESULT: { @@ -5545,13 +5692,14 @@ bool User_var_log_event::write(IO_CACHE* file) } /* Length of the whole event */ - event_length= sizeof(buf)+ name_len + buf1_length + val_len; + event_length= sizeof(buf)+ name_len + buf1_length + val_len + unsigned_len; return (write_header(file, event_length) || my_b_safe_write(file, (uchar*) buf, sizeof(buf)) || my_b_safe_write(file, (uchar*) name, name_len) || my_b_safe_write(file, (uchar*) buf1, buf1_length) || - my_b_safe_write(file, pos, val_len)); + my_b_safe_write(file, pos, val_len) || + my_b_safe_write(file, &flags, unsigned_len)); } #endif @@ -5592,7 +5740,8 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) break; case INT_RESULT: char int_buf[22]; - longlong10_to_str(uint8korr(val), int_buf, -10); + longlong10_to_str(uint8korr(val), int_buf, + ((flags & User_var_log_event::UNSIGNED_F) ? 10 : -10)); my_b_printf(&cache, ":=%s%s\n", int_buf, print_event_info->delimiter); break; case DECIMAL_RESULT: @@ -5739,7 +5888,8 @@ int User_var_log_event::do_apply_event(Relay_log_info const *rli) a single record and with a single column. Thus, like a column value, it could always have IMPLICIT derivation. */ - e.update_hash(val, val_len, type, charset, DERIVATION_IMPLICIT, 0); + e.update_hash(val, val_len, type, charset, DERIVATION_IMPLICIT, + (flags & User_var_log_event::UNSIGNED_F)); free_root(thd->mem_root,0); return 0; @@ -5816,8 +5966,8 @@ Slave_log_event::Slave_log_event(THD* thd_arg, Master_info* mi = rli->mi; // TODO: re-write this better without holding both locks at the same time - pthread_mutex_lock(&mi->data_lock); - pthread_mutex_lock(&rli->data_lock); + mysql_mutex_lock(&mi->data_lock); + mysql_mutex_lock(&rli->data_lock); master_host_len = strlen(mi->host); master_log_len = strlen(rli->group_master_log_name); // on OOM, just do not initialize the structure and print the error @@ -5835,8 +5985,8 @@ Slave_log_event::Slave_log_event(THD* thd_arg, } else sql_print_error("Out of memory while recording slave event"); - pthread_mutex_unlock(&rli->data_lock); - pthread_mutex_unlock(&mi->data_lock); + mysql_mutex_unlock(&rli->data_lock); + mysql_mutex_unlock(&mi->data_lock); DBUG_VOID_RETURN; } #endif /* !MYSQL_CLIENT */ @@ -5970,7 +6120,7 @@ int Stop_log_event::do_update_pos(Relay_log_info *rli) could give false triggers in MASTER_POS_WAIT() that we have reached the target position when in fact we have not. */ - if (thd->options & OPTION_BEGIN) + if (thd->variables.option_bits & OPTION_BEGIN) rli->inc_event_relay_log_pos(); else { @@ -6193,10 +6343,12 @@ int Create_file_log_event::do_apply_event(Relay_log_info const *rli) fname_buf= strmov(proc_info, "Making temp file "); ext= slave_load_file_stem(fname_buf, file_id, server_id, ".info"); thd_proc_info(thd, proc_info); - my_delete(fname_buf, MYF(0)); // old copy may exist already - if ((fd= my_create(fname_buf, CREATE_MODE, - O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW, - MYF(MY_WME))) < 0 || + /* old copy may exist already */ + mysql_file_delete(key_file_log_event_info, fname_buf, MYF(0)); + if ((fd= mysql_file_create(key_file_log_event_info, + fname_buf, CREATE_MODE, + O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW, + MYF(MY_WME))) < 0 || init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0, MYF(MY_WME|MY_NABP))) { @@ -6218,20 +6370,22 @@ int Create_file_log_event::do_apply_event(Relay_log_info const *rli) goto err; } end_io_cache(&file); - my_close(fd, MYF(0)); + mysql_file_close(fd, MYF(0)); // fname_buf now already has .data, not .info, because we did our trick - my_delete(fname_buf, MYF(0)); // old copy may exist already - if ((fd= my_create(fname_buf, CREATE_MODE, - O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW, - MYF(MY_WME))) < 0) + /* old copy may exist already */ + mysql_file_delete(key_file_log_event_data, fname_buf, MYF(0)); + if ((fd= mysql_file_create(key_file_log_event_data, + fname_buf, CREATE_MODE, + O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW, + MYF(MY_WME))) < 0) { rli->report(ERROR_LEVEL, my_errno, "Error in Create_file event: could not open file '%s'", fname_buf); goto err; } - if (my_write(fd, (uchar*) block, block_len, MYF(MY_WME+MY_NABP))) + if (mysql_file_write(fd, (uchar*) block, block_len, MYF(MY_WME+MY_NABP))) { rli->report(ERROR_LEVEL, my_errno, "Error in Create_file event: write to '%s' failed", @@ -6244,7 +6398,7 @@ err: if (error) end_io_cache(&file); if (fd >= 0) - my_close(fd, MYF(0)); + mysql_file_close(fd, MYF(0)); thd_proc_info(thd, 0); return error != 0; } @@ -6376,10 +6530,12 @@ int Append_block_log_event::do_apply_event(Relay_log_info const *rli) */ lex_start(thd); mysql_reset_thd_for_next_command(thd); - my_delete(fname, MYF(0)); // old copy may exist already - if ((fd= my_create(fname, CREATE_MODE, - O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW, - MYF(MY_WME))) < 0) + /* old copy may exist already */ + mysql_file_delete(key_file_log_event_data, fname, MYF(0)); + if ((fd= mysql_file_create(key_file_log_event_data, + fname, CREATE_MODE, + O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW, + MYF(MY_WME))) < 0) { rli->report(ERROR_LEVEL, my_errno, "Error in %s event: could not create file '%s'", @@ -6387,8 +6543,10 @@ int Append_block_log_event::do_apply_event(Relay_log_info const *rli) goto err; } } - else if ((fd = my_open(fname, O_WRONLY | O_APPEND | O_BINARY | O_NOFOLLOW, - MYF(MY_WME))) < 0) + else if ((fd= mysql_file_open(key_file_log_event_data, + fname, + O_WRONLY | O_APPEND | O_BINARY | O_NOFOLLOW, + MYF(MY_WME))) < 0) { rli->report(ERROR_LEVEL, my_errno, "Error in %s event: could not open file '%s'", @@ -6396,10 +6554,12 @@ int Append_block_log_event::do_apply_event(Relay_log_info const *rli) goto err; } - DBUG_EXECUTE_IF("remove_slave_load_file_before_write", - my_close(fd,MYF(0)); fd= -1; my_delete(fname, MYF(0));); + DBUG_EXECUTE_IF("remove_slave_load_file_before_write", + { + my_delete_allow_opened(fname, MYF(0)); + }); - if (my_write(fd, (uchar*) block, block_len, MYF(MY_WME+MY_NABP))) + if (mysql_file_write(fd, (uchar*) block, block_len, MYF(MY_WME+MY_NABP))) { rli->report(ERROR_LEVEL, my_errno, "Error in %s event: write to '%s' failed", @@ -6410,7 +6570,7 @@ int Append_block_log_event::do_apply_event(Relay_log_info const *rli) err: if (fd >= 0) - my_close(fd, MYF(0)); + mysql_file_close(fd, MYF(0)); thd_proc_info(thd, 0); DBUG_RETURN(error); } @@ -6504,9 +6664,9 @@ int Delete_file_log_event::do_apply_event(Relay_log_info const *rli) { char fname[FN_REFLEN+10]; char *ext= slave_load_file_stem(fname, file_id, server_id, ".data"); - (void) my_delete(fname, MYF(MY_WME)); + mysql_file_delete(key_file_log_event_data, fname, MYF(MY_WME)); strmov(ext, ".info"); - (void) my_delete(fname, MYF(MY_WME)); + mysql_file_delete(key_file_log_event_info, fname, MYF(MY_WME)); return 0; } #endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */ @@ -6607,8 +6767,9 @@ int Execute_load_log_event::do_apply_event(Relay_log_info const *rli) Load_log_event *lev= 0; ext= slave_load_file_stem(fname, file_id, server_id, ".info"); - if ((fd = my_open(fname, O_RDONLY | O_BINARY | O_NOFOLLOW, - MYF(MY_WME))) < 0 || + if ((fd= mysql_file_open(key_file_log_event_info, + fname, O_RDONLY | O_BINARY | O_NOFOLLOW, + MYF(MY_WME))) < 0 || init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0, MYF(MY_WME|MY_NABP))) { @@ -6618,7 +6779,7 @@ int Execute_load_log_event::do_apply_event(Relay_log_info const *rli) goto err; } if (!(lev = (Load_log_event*)Log_event::read_log_event(&file, - (pthread_mutex_t*)0, + (mysql_mutex_t*)0, rli->relay_log.description_event_for_exec)) || lev->get_type_code() != NEW_LOAD_EVENT) { @@ -6658,24 +6819,24 @@ int Execute_load_log_event::do_apply_event(Relay_log_info const *rli) } /* We have an open file descriptor to the .info file; we need to close it - or Windows will refuse to delete the file in my_delete(). + or Windows will refuse to delete the file in mysql_file_delete(). */ if (fd >= 0) { - my_close(fd, MYF(0)); + mysql_file_close(fd, MYF(0)); end_io_cache(&file); fd= -1; } - (void) my_delete(fname, MYF(MY_WME)); + mysql_file_delete(key_file_log_event_info, fname, MYF(MY_WME)); memcpy(ext, ".data", 6); - (void) my_delete(fname, MYF(MY_WME)); + mysql_file_delete(key_file_log_event_data, fname, MYF(MY_WME)); error = 0; err: delete lev; if (fd >= 0) { - my_close(fd, MYF(0)); + mysql_file_close(fd, MYF(0)); end_io_cache(&file); } return error; @@ -6740,9 +6901,9 @@ Execute_load_query_log_event(THD *thd_arg, const char* query_arg, ulong query_length_arg, uint fn_pos_start_arg, uint fn_pos_end_arg, enum_load_dup_handling dup_handling_arg, - bool using_trans, bool suppress_use, + bool using_trans, bool direct, bool suppress_use, int errcode): - Query_log_event(thd_arg, query_arg, query_length_arg, using_trans, + Query_log_event(thd_arg, query_arg, query_length_arg, using_trans, direct, suppress_use, errcode), file_id(thd_arg->file_id), fn_pos_start(fn_pos_start_arg), fn_pos_end(fn_pos_end_arg), dup_handling(dup_handling_arg) @@ -6914,7 +7075,7 @@ Execute_load_query_log_event::do_apply_event(Relay_log_info const *rli) file so that we can re-execute this event at START SLAVE. */ if (!error) - (void) my_delete(fname, MYF(MY_WME)); + mysql_file_delete(key_file_log_event_data, fname, MYF(MY_WME)); my_free(buf, MYF(MY_ALLOW_ZERO_PTR)); return error; @@ -7037,9 +7198,9 @@ Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid, DBUG_ASSERT((tbl_arg && tbl_arg->s && tid != ~0UL) || (!tbl_arg && !cols && tid == ~0UL)); - if (thd_arg->options & OPTION_NO_FOREIGN_KEY_CHECKS) + if (thd_arg->variables.option_bits & OPTION_NO_FOREIGN_KEY_CHECKS) set_flags(NO_FOREIGN_KEY_CHECKS_F); - if (thd_arg->options & OPTION_RELAXED_UNIQUE_CHECKS) + if (thd_arg->variables.option_bits & OPTION_RELAXED_UNIQUE_CHECKS) set_flags(RELAXED_UNIQUE_CHECKS_F); /* if bitmap_init fails, caught in is_valid() */ if (likely(!bitmap_init(&m_cols, @@ -7284,8 +7445,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) */ DBUG_ASSERT(get_flags(STMT_END_F)); - const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); - close_thread_tables(thd); + const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); thd->clear_error(); DBUG_RETURN(0); } @@ -7311,7 +7471,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) We also call the mysql_reset_thd_for_next_command(), since this is the logical start of the next "statement". Note that this - call might reset the value of current_stmt_binlog_row_based, so + call might reset the value of current_stmt_binlog_format, so we need to do any changes to that value after this function. */ lex_start(thd); @@ -7323,16 +7483,12 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) */ thd->transaction.stmt.modified_non_trans_table= FALSE; /* - Check if the slave is set to use SBR. If so, it should switch - to using RBR until the end of the "statement", i.e., next - STMT_END_F or next error. + This is a row injection, so we flag the "statement" as + such. Note that this code is called both when the slave does row + injections and when the BINLOG statement is used to do row + injections. */ - if (!thd->current_stmt_binlog_row_based && - mysql_bin_log.is_open() && (thd->options & OPTION_BIN_LOG)) - { - thd->set_current_stmt_binlog_row_based(); - } - + thd->lex->set_stmt_row_injection(); /* There are a few flags that are replicated with each row event. @@ -7340,20 +7496,20 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) the event. */ if (get_flags(NO_FOREIGN_KEY_CHECKS_F)) - thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS; + thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS; else - thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS; + thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS; if (get_flags(RELAXED_UNIQUE_CHECKS_F)) - thd->options|= OPTION_RELAXED_UNIQUE_CHECKS; + thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS; else - thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS; + thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS; /* A small test to verify that objects have consistent types */ - DBUG_ASSERT(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS)); + DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS)); - if (simple_open_n_lock_tables(thd, rli->tables_to_lock)) + if (open_and_lock_tables(thd, rli->tables_to_lock, FALSE, 0)) { - uint actual_error= thd->main_da.sql_errno(); + uint actual_error= thd->stmt_da->sql_errno(); if (thd->is_slave_error || thd->is_fatal_error) { /* @@ -7363,12 +7519,12 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) having severe errors which should not be skiped. */ rli->report(ERROR_LEVEL, actual_error, - "Error '%s' on opening tables", - (actual_error ? thd->main_da.message() : + "Error executing row event: '%s'", + (actual_error ? thd->stmt_da->message() : "unexpected success or fatal error")); thd->is_slave_error= 1; } - const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); + const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); DBUG_RETURN(actual_error); } @@ -7381,11 +7537,18 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) */ { + DBUG_PRINT("debug", ("Checking compability of tables to lock - tables_to_lock: %p", + rli->tables_to_lock)); RPL_TABLE_LIST *ptr= rli->tables_to_lock; for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global)) { - if (ptr->m_tabledef.compatible_with(rli, ptr->table)) + TABLE *conv_table; + if (!ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli), + ptr->table, &conv_table)) { + DBUG_PRINT("debug", ("Table: %s.%s is not compatible with master", + ptr->table->s->db.str, + ptr->table->s->table_name.str)); /* We should not honour --slave-skip-errors at this point as we are having severe errors which should not be skiped. @@ -7393,15 +7556,20 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) mysql_unlock_tables(thd, thd->lock); thd->lock= 0; thd->is_slave_error= 1; - const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); + const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); DBUG_RETURN(ERR_BAD_TABLE_DEF); } + DBUG_PRINT("debug", ("Table: %s.%s is compatible with master" + " - conv_table: %p", + ptr->table->s->db.str, + ptr->table->s->table_name.str, conv_table)); + ptr->m_conv_table= conv_table; } } /* - ... and then we add all the tables to the table map and remove - them from tables to lock. + ... and then we add all the tables to the table map and but keep + them in the tables to lock list. We also invalidate the query cache for all the tables, since they will now be changed. @@ -7503,8 +7671,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) { int actual_error= convert_handler_error(error, thd, table); bool idempotent_error= (idempotent_error_code(error) && - ((bit_is_set(slave_exec_mode, - SLAVE_EXEC_MODE_IDEMPOTENT)) == 1)); + (slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT)); bool ignored_error= (idempotent_error == 0 ? ignored_error_code(actual_error) : 0); @@ -7546,8 +7713,16 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) thd->transaction.stmt.modified_non_trans_table= TRUE; } // row processing loop - DBUG_EXECUTE_IF("STOP_SLAVE_after_first_Rows_event", - const_cast<Relay_log_info*>(rli)->abort_slave= 1;); + {/** + The following failure injecion works in cooperation with tests + setting @@global.debug= 'd,stop_slave_middle_group'. + The sql thread receives the killed status and will proceed + to shutdown trying to finish incomplete events group. + */ + DBUG_EXECUTE_IF("stop_slave_middle_group", + if (thd->transaction.all.modified_non_trans_table) + const_cast<Relay_log_info*>(rli)->abort_slave= 1;); + } if ((error= do_after_row_operations(rli, error)) && ignored_error_code(convert_handler_error(error, thd, table))) @@ -7560,57 +7735,26 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) clear_all_errors(thd, const_cast<Relay_log_info*>(rli)); error= 0; } - - if (!cache_stmt) - { - DBUG_PRINT("info", ("Marked that we need to keep log")); - thd->options|= OPTION_KEEP_LOG; - } } // if (table) - /* - We need to delay this clear until here bacause unpack_current_row() uses - master-side table definitions stored in rli. - */ - if (rli->tables_to_lock && get_flags(STMT_END_F)) - const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); if (error) { slave_rows_error_report(ERROR_LEVEL, error, rli, thd, table, get_type_str(), RPL_LOG_NAME, (ulong) log_pos); - thd->reset_current_stmt_binlog_row_based(); + /* + @todo We should probably not call + reset_current_stmt_binlog_format_row() from here. + + Note: this applies to log_event_old.cc too. + /Sven + */ + thd->reset_current_stmt_binlog_format_row(); const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error); thd->is_slave_error= 1; DBUG_RETURN(error); } - /* - This code would ideally be placed in do_update_pos() instead, but - since we have no access to table there, we do the setting of - last_event_start_time here instead. - */ - else if (table && (table->s->primary_key == MAX_KEY) && - !cache_stmt && get_flags(STMT_END_F) == RLE_NO_FLAGS) - { - /* - ------------ Temporary fix until WL#2975 is implemented --------- - - This event is not the last one (no STMT_END_F). If we stop now - (in case of terminate_slave_thread()), how will we restart? We - have to restart from Table_map_log_event, but as this table is - not transactional, the rows already inserted will still be - present, and idempotency is not guaranteed (no PK) so we risk - that repeating leads to double insert. So we desperately try to - continue, hope we'll eventually leave this buggy situation (by - executing the final Rows_log_event). If we are in a hopeless - wait (reached end of last relay log and nothing gets appended - there), we timeout after one minute, and notify DBA about the - problem. When WL#2975 is implemented, just remove the member - Relay_log_info::last_event_start_time and all its occurrences. - */ - const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0); - } if (get_flags(STMT_END_F)) if ((error= rows_event_stmt_cleanup(rli, thd))) @@ -7667,7 +7811,7 @@ static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd) (assume the last master's transaction is ignored by the slave because of replicate-ignore rules). */ - error= thd->binlog_flush_pending_rows_event(true); + error= thd->binlog_flush_pending_rows_event(TRUE); /* If this event is not in a transaction, the call below will, if some @@ -7678,7 +7822,7 @@ static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd) are involved, commit the transaction and flush the pending event to the binlog. */ - error|= ha_autocommit_or_rollback(thd, error); + error|= (error ? trans_rollback_stmt(thd) : trans_commit_stmt(thd)); /* Now what if this is not a transactional engine? we still need to @@ -7690,7 +7834,17 @@ static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd) event flushed. */ - thd->reset_current_stmt_binlog_row_based(); + /* + @todo We should probably not call + reset_current_stmt_binlog_format_row() from here. + + Note: this applies to log_event_old.cc too + + Btw, the previous comment about transactional engines does not + seem related to anything that happens here. + /Sven + */ + thd->reset_current_stmt_binlog_format_row(); const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0); } @@ -7897,7 +8051,10 @@ int Table_map_log_event::save_field_metadata() DBUG_ENTER("Table_map_log_event::save_field_metadata"); int index= 0; for (unsigned int i= 0 ; i < m_table->s->fields ; i++) + { + DBUG_PRINT("debug", ("field_type: %d", m_coltype[i])); index+= m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]); + } DBUG_RETURN(index); } #endif /* !defined(MYSQL_CLIENT) */ @@ -7910,7 +8067,7 @@ int Table_map_log_event::save_field_metadata() #if !defined(MYSQL_CLIENT) Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, bool is_transactional) - : Log_event(thd, 0, true), + : Log_event(thd, 0, is_transactional), m_table(tbl), m_dbnam(tbl->s->db.str), m_dblen(m_dbnam ? tbl->s->db.length : 0), @@ -8135,9 +8292,7 @@ int Table_map_log_event::do_apply_event(Relay_log_info const *rli) DBUG_ASSERT(rli->sql_thd == thd); /* Step the query id to mark what columns that are actually used. */ - pthread_mutex_lock(&LOCK_thread_count); - thd->query_id= next_query_id(); - pthread_mutex_unlock(&LOCK_thread_count); + thd->set_query_id(next_query_id()); if (!(memory= my_multi_malloc(MYF(MY_WME), &table_list, (uint) sizeof(RPL_TABLE_LIST), @@ -8146,15 +8301,15 @@ int Table_map_log_event::do_apply_event(Relay_log_info const *rli) NullS))) DBUG_RETURN(HA_ERR_OUT_OF_MEM); - bzero(table_list, sizeof(*table_list)); - table_list->db = db_mem; - table_list->alias= table_list->table_name = tname_mem; - table_list->lock_type= TL_WRITE; - table_list->next_global= table_list->next_local= 0; + strmov(db_mem, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len)); + strmov(tname_mem, m_tblnam); + + table_list->init_one_table(db_mem, strlen(db_mem), + tname_mem, strlen(tname_mem), + tname_mem, TL_WRITE); + table_list->table_id= m_table_id; table_list->updating= 1; - strmov(table_list->db, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len)); - strmov(table_list->table_name, m_tblnam); int error= 0; @@ -8338,8 +8493,8 @@ Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability todo: to introduce a property for the event (handler?) which forces applying the event in the replace (idempotent) fashion. */ - if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1 || - m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER) + if ((slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT) || + (m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER)) { /* We are using REPLACE semantics and not INSERT IGNORE semantics @@ -8417,7 +8572,7 @@ Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability * int local_error= 0; m_table->next_number_field=0; m_table->auto_increment_field_not_null= FALSE; - if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1 || + if ((slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT) || m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER) { m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); @@ -8718,9 +8873,7 @@ int Write_rows_log_event::do_exec_row(const Relay_log_info *const rli) { DBUG_ASSERT(m_table != NULL); - int error= - write_row(rli, /* if 1 then overwrite */ - bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1); + int error= write_row(rli, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT); if (error && !thd->is_error()) { @@ -9565,3 +9718,16 @@ st_print_event_info::st_print_event_info() open_cached_file(&body_cache, NULL, NULL, 0, flags); } #endif + + +#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) +Heartbeat_log_event::Heartbeat_log_event(const char* buf, uint event_len, + const Format_description_log_event* description_event) + :Log_event(buf, description_event) +{ + uint8 header_size= description_event->common_header_len; + ident_len = event_len - header_size; + set_if_smaller(ident_len,FN_REFLEN-1); + log_ident= buf + header_size; +} +#endif |