diff options
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r-- | sql/log_event.cc | 246 |
1 files changed, 185 insertions, 61 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index 2d5b7102c7a..996267b171c 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -29,7 +29,6 @@ #include "rpl_rli.h" #include "rpl_mi.h" #include "rpl_filter.h" -#include "rpl_utility.h" #include "rpl_record.h" #include "transaction.h" #include <my_dir.h> @@ -38,6 +37,7 @@ #include <base64.h> #include <my_bitmap.h> +#include "rpl_utility.h" #define log_cs &my_charset_latin1 @@ -666,10 +666,11 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) { server_id= thd->server_id; when= thd->start_time; - cache_stmt= using_trans; + cache_type= (using_trans || stmt_has_updated_trans_table(thd) + ? Log_event::EVENT_TRANSACTIONAL_CACHE : + Log_event::EVENT_STMT_CACHE); } - /** This minimal constructor is for when you are not even sure that there is a valid THD. For example in the server when we are shutting down or @@ -678,8 +679,8 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) */ Log_event::Log_event() - :temp_buf(0), exec_time(0), flags(0), cache_stmt(0), - thd(0) + :temp_buf(0), exec_time(0), flags(0), + cache_type(Log_event::EVENT_INVALID_CACHE), thd(0) { server_id= ::server_id; /* @@ -698,7 +699,7 @@ Log_event::Log_event() Log_event::Log_event(const char* buf, const Format_description_log_event* description_event) - :temp_buf(0), cache_stmt(0) + :temp_buf(0), cache_type(Log_event::EVENT_INVALID_CACHE) { #ifndef MYSQL_CLIENT thd = 0; @@ -1568,37 +1569,14 @@ log_event_print_value(IO_CACHE *file, const uchar *ptr, /* a long CHAR() field: see #37426 */ length= byte1 | (((byte0 & 0x30) ^ 0x30) << 4); type= byte0 | 0x30; - goto beg; - } - - switch (byte0) - { - case MYSQL_TYPE_SET: - case MYSQL_TYPE_ENUM: - case MYSQL_TYPE_STRING: - type= byte0; - length= byte1; - break; - - default: - - { - char tmp[5]; - my_snprintf(tmp, sizeof(tmp), "%04X", meta); - my_b_printf(file, - "!! Don't know how to handle column type=%d meta=%d (%s)", - type, meta, tmp); - return 0; - } } + else + length = meta & 0xFF; } else length= meta; } - -beg: - switch (type) { case MYSQL_TYPE_LONG: { @@ -1735,6 +1713,33 @@ beg: return 3; } + case MYSQL_TYPE_NEWDATE: + { + uint32 tmp= uint3korr(ptr); + int part; + char buf[10]; + char *pos= &buf[10]; + + /* Copied from field.cc */ + *pos--=0; // End NULL + part=(int) (tmp & 31); + *pos--= (char) ('0'+part%10); + *pos--= (char) ('0'+part/10); + *pos--= ':'; + part=(int) (tmp >> 5 & 15); + *pos--= (char) ('0'+part%10); + *pos--= (char) ('0'+part/10); + *pos--= ':'; + part=(int) (tmp >> 9); + *pos--= (char) ('0'+part%10); part/=10; + *pos--= (char) ('0'+part%10); part/=10; + *pos--= (char) ('0'+part%10); part/=10; + *pos= (char) ('0'+part); + my_b_printf(file , "'%s'", buf); + my_snprintf(typestr, typestr_length, "DATE"); + return 3; + } + case MYSQL_TYPE_DATE: { uint i32= uint3korr(ptr); @@ -1753,7 +1758,7 @@ beg: } case MYSQL_TYPE_ENUM: - switch (length) { + switch (meta & 0xFF) { case 1: my_b_printf(file, "%d", (int) *ptr); my_snprintf(typestr, typestr_length, "ENUM(1 byte)"); @@ -1766,15 +1771,15 @@ beg: return 2; } default: - my_b_printf(file, "!! Unknown ENUM packlen=%d", length); + my_b_printf(file, "!! Unknown ENUM packlen=%d", meta & 0xFF); return 0; } break; case MYSQL_TYPE_SET: - my_b_write_bit(file, ptr , length * 8); - my_snprintf(typestr, typestr_length, "SET(%d bytes)", length); - return length; + my_b_write_bit(file, ptr , (meta & 0xFF) * 8); + my_snprintf(typestr, typestr_length, "SET(%d bytes)", meta & 0xFF); + return meta & 0xFF; case MYSQL_TYPE_BLOB: switch (meta) { @@ -2354,7 +2359,7 @@ Query_log_event::Query_log_event() */ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, ulong query_length, bool using_trans, - bool suppress_use, int errcode) + bool direct, bool suppress_use, int errcode) :Log_event(thd_arg, (thd_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : @@ -2433,6 +2438,95 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, } else time_zone_len= 0; + + /* + In what follows, we decide whether to write to the binary log or to use a + cache. + */ + LEX *lex= thd->lex; + bool implicit_commit= FALSE; + cache_type= Log_event::EVENT_INVALID_CACHE; + switch (lex->sql_command) + { + case SQLCOM_ALTER_DB: + case SQLCOM_CREATE_FUNCTION: + case SQLCOM_DROP_FUNCTION: + case SQLCOM_DROP_PROCEDURE: + case SQLCOM_INSTALL_PLUGIN: + case SQLCOM_UNINSTALL_PLUGIN: + case SQLCOM_ALTER_TABLESPACE: + implicit_commit= TRUE; + break; + case SQLCOM_DROP_TABLE: + implicit_commit= !(lex->drop_temporary && thd->in_multi_stmt_transaction()); + break; + case SQLCOM_ALTER_TABLE: + case SQLCOM_CREATE_TABLE: + implicit_commit= !((lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) && + thd->in_multi_stmt_transaction()) && + !(lex->select_lex.item_list.elements && + thd->is_current_stmt_binlog_format_row()); + break; + case SQLCOM_SET_OPTION: + implicit_commit= (lex->autocommit ? TRUE : FALSE); + break; + /* + Replace what follows after CF_AUTO_COMMIT_TRANS is backported by: + + default: + implicit_commit= ((sql_command_flags[lex->sql_command] & + CF_AUTO_COMMIT_TRANS)); + break; + */ + case SQLCOM_CREATE_INDEX: + case SQLCOM_TRUNCATE: + case SQLCOM_CREATE_DB: + case SQLCOM_DROP_DB: + case SQLCOM_ALTER_DB_UPGRADE: + case SQLCOM_RENAME_TABLE: + case SQLCOM_DROP_INDEX: + case SQLCOM_CREATE_VIEW: + case SQLCOM_DROP_VIEW: + case SQLCOM_CREATE_TRIGGER: + case SQLCOM_DROP_TRIGGER: + case SQLCOM_CREATE_EVENT: + case SQLCOM_ALTER_EVENT: + case SQLCOM_DROP_EVENT: + case SQLCOM_REPAIR: + case SQLCOM_OPTIMIZE: + case SQLCOM_ANALYZE: + case SQLCOM_CREATE_USER: + case SQLCOM_DROP_USER: + case SQLCOM_RENAME_USER: + case SQLCOM_REVOKE_ALL: + case SQLCOM_REVOKE: + case SQLCOM_GRANT: + case SQLCOM_CREATE_PROCEDURE: + case SQLCOM_CREATE_SPFUNCTION: + case SQLCOM_ALTER_PROCEDURE: + case SQLCOM_ALTER_FUNCTION: + case SQLCOM_ASSIGN_TO_KEYCACHE: + case SQLCOM_PRELOAD_KEYS: + case SQLCOM_FLUSH: + case SQLCOM_CHECK: + implicit_commit= TRUE; + break; + default: + implicit_commit= FALSE; + break; + } + + if (implicit_commit || direct) + { + cache_type= Log_event::EVENT_NO_CACHE; + } + else + { + cache_type= (using_trans || stmt_has_updated_trans_table(thd) + ? Log_event::EVENT_TRANSACTIONAL_CACHE : + Log_event::EVENT_STMT_CACHE); + } + DBUG_ASSERT(cache_type != Log_event::EVENT_INVALID_CACHE); DBUG_PRINT("info",("Query_log_event has flags2: %lu sql_mode: %lu", (ulong) flags2, sql_mode)); } @@ -6710,9 +6804,9 @@ Execute_load_query_log_event(THD *thd_arg, const char* query_arg, ulong query_length_arg, uint fn_pos_start_arg, uint fn_pos_end_arg, enum_load_dup_handling dup_handling_arg, - bool using_trans, bool suppress_use, + bool using_trans, bool direct, bool suppress_use, int errcode): - Query_log_event(thd_arg, query_arg, query_length_arg, using_trans, + Query_log_event(thd_arg, query_arg, query_length_arg, using_trans, direct, suppress_use, errcode), file_id(thd_arg->file_id), fn_pos_start(fn_pos_start_arg), fn_pos_end(fn_pos_end_arg), dup_handling(dup_handling_arg) @@ -7280,7 +7374,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) We also call the mysql_reset_thd_for_next_command(), since this is the logical start of the next "statement". Note that this - call might reset the value of current_stmt_binlog_row_based, so + call might reset the value of current_stmt_binlog_format, so we need to do any changes to that value after this function. */ lex_start(thd); @@ -7292,16 +7386,12 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) */ thd->transaction.stmt.modified_non_trans_table= FALSE; /* - Check if the slave is set to use SBR. If so, it should switch - to using RBR until the end of the "statement", i.e., next - STMT_END_F or next error. + This is a row injection, so we flag the "statement" as + such. Note that this code is called both when the slave does row + injections and when the BINLOG statement is used to do row + injections. */ - if (!thd->current_stmt_binlog_row_based && - mysql_bin_log.is_open() && (thd->variables.option_bits & OPTION_BIN_LOG)) - { - thd->set_current_stmt_binlog_row_based(); - } - + thd->lex->set_stmt_row_injection(); /* There are a few flags that are replicated with each row event. @@ -7350,11 +7440,18 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) */ { + DBUG_PRINT("debug", ("Checking compability of tables to lock - tables_to_lock: %p", + rli->tables_to_lock)); RPL_TABLE_LIST *ptr= rli->tables_to_lock; for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global)) { - if (ptr->m_tabledef.compatible_with(rli, ptr->table)) + TABLE *conv_table; + if (!ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli), + ptr->table, &conv_table)) { + DBUG_PRINT("debug", ("Table: %s.%s is not compatible with master", + ptr->table->s->db.str, + ptr->table->s->table_name.str)); /* We should not honour --slave-skip-errors at this point as we are having severe errors which should not be skiped. @@ -7365,12 +7462,17 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); DBUG_RETURN(ERR_BAD_TABLE_DEF); } + DBUG_PRINT("debug", ("Table: %s.%s is compatible with master" + " - conv_table: %p", + ptr->table->s->db.str, + ptr->table->s->table_name.str, conv_table)); + ptr->m_conv_table= conv_table; } } /* - ... and then we add all the tables to the table map and remove - them from tables to lock. + ... and then we add all the tables to the table map and but keep + them in the tables to lock list. We also invalidate the query cache for all the tables, since they will now be changed. @@ -7537,7 +7639,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) error= 0; } - if (!cache_stmt) + if (!use_trans_cache()) { DBUG_PRINT("info", ("Marked that we need to keep log")); thd->variables.option_bits|= OPTION_KEEP_LOG; @@ -7550,7 +7652,14 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) slave_rows_error_report(ERROR_LEVEL, error, rli, thd, table, get_type_str(), RPL_LOG_NAME, (ulong) log_pos); - thd->reset_current_stmt_binlog_row_based(); + /* + @todo We should probably not call + reset_current_stmt_binlog_format_row() from here. + + Note: this applies to log_event_old.cc too. + /Sven + */ + thd->reset_current_stmt_binlog_format_row(); const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error); thd->is_slave_error= 1; DBUG_RETURN(error); @@ -7611,7 +7720,7 @@ static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd) (assume the last master's transaction is ignored by the slave because of replicate-ignore rules). */ - error= thd->binlog_flush_pending_rows_event(true); + error= thd->binlog_flush_pending_rows_event(TRUE); /* If this event is not in a transaction, the call below will, if some @@ -7634,7 +7743,17 @@ static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd) event flushed. */ - thd->reset_current_stmt_binlog_row_based(); + /* + @todo We should probably not call + reset_current_stmt_binlog_format_row() from here. + + Note: this applies to log_event_old.cc too + + Btw, the previous comment about transactional engines does not + seem related to anything that happens here. + /Sven + */ + thd->reset_current_stmt_binlog_format_row(); const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0); } @@ -7841,7 +7960,10 @@ int Table_map_log_event::save_field_metadata() DBUG_ENTER("Table_map_log_event::save_field_metadata"); int index= 0; for (unsigned int i= 0 ; i < m_table->s->fields ; i++) + { + DBUG_PRINT("debug", ("field_type: %d", m_coltype[i])); index+= m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]); + } DBUG_RETURN(index); } #endif /* !defined(MYSQL_CLIENT) */ @@ -7853,8 +7975,8 @@ int Table_map_log_event::save_field_metadata() */ #if !defined(MYSQL_CLIENT) Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, - bool is_transactional, uint16 flags) - : Log_event(thd, 0, true), + bool is_transactional) + : Log_event(thd, 0, is_transactional), m_table(tbl), m_dbnam(tbl->s->db.str), m_dblen(m_dbnam ? tbl->s->db.length : 0), @@ -7863,7 +7985,7 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, m_colcnt(tbl->s->fields), m_memory(NULL), m_table_id(tid), - m_flags(flags), + m_flags(TM_BIT_LEN_EXACT_F), m_data_size(0), m_field_metadata(0), m_field_metadata_size(0), @@ -8119,8 +8241,10 @@ int Table_map_log_event::do_apply_event(Relay_log_info const *rli) inside Relay_log_info::clear_tables_to_lock() by calling the table_def destructor explicitly. */ - new (&table_list->m_tabledef) table_def(m_coltype, m_colcnt, - m_field_metadata, m_field_metadata_size, m_null_bits); + new (&table_list->m_tabledef) + table_def(m_coltype, m_colcnt, + m_field_metadata, m_field_metadata_size, + m_null_bits, m_flags); table_list->m_tabledef_valid= TRUE; /* |