diff options
author | unknown <mkindahl@dl145h.mysql.com> | 2008-01-31 17:46:50 +0100 |
---|---|---|
committer | unknown <mkindahl@dl145h.mysql.com> | 2008-01-31 17:46:50 +0100 |
commit | 248c752b2a320d62d198043866e973ebcdda436c (patch) | |
tree | c327f90b44e25f92a820d5886ba6220cf6d9c2c8 /sql/log_event.cc | |
parent | 24cda91887c1a230b4bc70e859879caede1f909e (diff) | |
parent | 0a51f0ba27351180209631c52d254bbd2e607a0b (diff) | |
download | mariadb-git-248c752b2a320d62d198043866e973ebcdda436c.tar.gz |
Merge dl145h.mysql.com:/data0/mkindahl/mysql-5.1
into dl145h.mysql.com:/data0/mkindahl/mysql-5.1-rpl-merge
client/client_priv.h:
Auto merged
include/my_sys.h:
Auto merged
mysql-test/mysql-test-run.pl:
Auto merged
mysql-test/lib/mtr_report.pl:
Auto merged
mysql-test/suite/rpl/t/rpl_err_ignoredtable.test:
Auto merged
sql/item_cmpfunc.cc:
Auto merged
sql/log_event.cc:
Auto merged
sql/mysql_priv.h:
Auto merged
sql/mysqld.cc:
Auto merged
sql/set_var.cc:
Auto merged
sql/set_var.h:
Auto merged
sql/slave.cc:
Auto merged
sql/sql_acl.cc:
Auto merged
sql/sql_class.h:
Auto merged
sql/sql_parse.cc:
Auto merged
sql/sql_repl.cc:
Auto merged
sql/sql_view.cc:
Auto merged
mysql-test/suite/rpl/r/rpl_invoked_features.result:
Manual merge.
mysql-test/suite/rpl/t/rpl_invoked_features.test:
Manual merge.
sql/log.cc:
Manual merge.
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r-- | sql/log_event.cc | 571 |
1 files changed, 377 insertions, 194 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index cf03dd5bf44..df0d1e8a020 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -36,7 +36,17 @@ #define FLAGSTR(V,F) ((V)&(F)?#F" ":"") -#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) && !defined(DBUG_OFF) && !defined(_lint) + +/* + Size of buffer for printing a double in format %.<PREC>g + + optional '-' + optional zero + '.' + PREC digits + 'e' + sign + + exponent digits + '\0' +*/ +#define FMT_G_BUFSIZE(PREC) (3 + (PREC) + 5 + 1) + + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) static const char *HA_ERR(int i) { switch (i) { @@ -90,7 +100,28 @@ static const char *HA_ERR(int i) case HA_ERR_LOGGING_IMPOSSIBLE: return "HA_ERR_LOGGING_IMPOSSIBLE"; case HA_ERR_CORRUPT_EVENT: return "HA_ERR_CORRUPT_EVENT"; } - return "<unknown error>"; + return 0; +} + +/** + macro to call from different branches of Rows_log_event::do_apply_event +*/ +static void inline slave_rows_error_report(enum loglevel level, int ha_error, + Relay_log_info const *rli, THD *thd, + TABLE *table, const char * type, + const char *log_name, ulong pos) +{ + const char *handler_error= HA_ERR(ha_error); + rli->report(level, thd->net.client_last_errno, + "Could not execute %s event on table %s.%s;" + "%s%s handler error %s; " + "the event's master log %s, end_log_pos %lu", + type, table->s->db.str, + table->s->table_name.str, + thd->net.client_last_error[0] != 0 ? thd->net.client_last_error : "", + thd->net.client_last_error[0] != 0 ? ";" : "", + handler_error == NULL? "<unknown>" : handler_error, + log_name, pos); } #endif @@ -460,9 +491,9 @@ static void print_set_option(IO_CACHE* file, uint32 bits_changed, returns the human readable name of the event's type */ -const char* Log_event::get_type_str() +const char* Log_event::get_type_str(Log_event_type type) { - switch(get_type_code()) { + switch(type) { case START_EVENT_V3: return "Start_v3"; case STOP_EVENT: return "Stop"; case QUERY_EVENT: return "Query"; @@ -480,6 +511,9 @@ const char* Log_event::get_type_str() case USER_VAR_EVENT: return "User var"; case FORMAT_DESCRIPTION_EVENT: return "Format_desc"; case TABLE_MAP_EVENT: return "Table_map"; + case PRE_GA_WRITE_ROWS_EVENT: return "Write_rows_event_old"; + case PRE_GA_UPDATE_ROWS_EVENT: return "Update_rows_event_old"; + case PRE_GA_DELETE_ROWS_EVENT: return "Delete_rows_event_old"; case WRITE_ROWS_EVENT: return "Write_rows"; case UPDATE_ROWS_EVENT: return "Update_rows"; case DELETE_ROWS_EVENT: return "Delete_rows"; @@ -490,6 +524,11 @@ const char* Log_event::get_type_str() } } +const char* Log_event::get_type_str() +{ + return get_type_str(get_type_code()); +} + /* Log_event::Log_event() @@ -997,6 +1036,8 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, DBUG_ENTER("Log_event::read_log_event(char*,...)"); DBUG_ASSERT(description_event != 0); DBUG_PRINT("info", ("binlog_version: %d", description_event->binlog_version)); + DBUG_DUMP("data", (unsigned char*) buf, event_len); + /* Check the integrity */ if (event_len < EVENT_LEN_OFFSET || buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT || @@ -1006,94 +1047,134 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, DBUG_RETURN(NULL); // general sanity check - will fail on a partial read } - switch(buf[EVENT_TYPE_OFFSET]) { - case QUERY_EVENT: - ev = new Query_log_event(buf, event_len, description_event, QUERY_EVENT); - break; - case LOAD_EVENT: - ev = new Load_log_event(buf, event_len, description_event); - break; - case NEW_LOAD_EVENT: - ev = new Load_log_event(buf, event_len, description_event); - break; - case ROTATE_EVENT: - ev = new Rotate_log_event(buf, event_len, description_event); - break; + uint event_type= buf[EVENT_TYPE_OFFSET]; + if (event_type > description_event->number_of_event_types && + event_type != FORMAT_DESCRIPTION_EVENT) + { + /* + It is unsafe to use the description_event if its post_header_len + array does not include the event type. + */ + DBUG_PRINT("error", ("event type %d found, but the current " + "Format_description_log_event supports only %d event " + "types", event_type, + description_event->number_of_event_types)); + ev= NULL; + } + else + { + /* + In some previuos versions (see comment in + Format_description_log_event::Format_description_log_event(char*,...)), + event types were assigned different id numbers than in the + present version. In order to replicate from such versions to the + present version, we must map those event type id's to our event + type id's. The mapping is done with the event_type_permutation + array, which was set up when the Format_description_log_event + was read. + */ + if (description_event->event_type_permutation) + { + IF_DBUG({ + int new_event_type= + description_event->event_type_permutation[event_type]; + DBUG_PRINT("info", + ("converting event type %d to %d (%s)", + event_type, new_event_type, + get_type_str((Log_event_type)new_event_type))); + }); + event_type= description_event->event_type_permutation[event_type]; + } + + switch(event_type) { + case QUERY_EVENT: + ev = new Query_log_event(buf, event_len, description_event, QUERY_EVENT); + break; + case LOAD_EVENT: + ev = new Load_log_event(buf, event_len, description_event); + break; + case NEW_LOAD_EVENT: + ev = new Load_log_event(buf, event_len, description_event); + break; + case ROTATE_EVENT: + ev = new Rotate_log_event(buf, event_len, description_event); + break; #ifdef HAVE_REPLICATION - case SLAVE_EVENT: /* can never happen (unused event) */ - ev = new Slave_log_event(buf, event_len); - break; + case SLAVE_EVENT: /* can never happen (unused event) */ + ev = new Slave_log_event(buf, event_len); + break; #endif /* HAVE_REPLICATION */ - case CREATE_FILE_EVENT: - ev = new Create_file_log_event(buf, event_len, description_event); - break; - case APPEND_BLOCK_EVENT: - ev = new Append_block_log_event(buf, event_len, description_event); - break; - case DELETE_FILE_EVENT: - ev = new Delete_file_log_event(buf, event_len, description_event); - break; - case EXEC_LOAD_EVENT: - ev = new Execute_load_log_event(buf, event_len, description_event); - break; - case START_EVENT_V3: /* this is sent only by MySQL <=4.x */ - ev = new Start_log_event_v3(buf, description_event); - break; - case STOP_EVENT: - ev = new Stop_log_event(buf, description_event); - break; - case INTVAR_EVENT: - ev = new Intvar_log_event(buf, description_event); - break; - case XID_EVENT: - ev = new Xid_log_event(buf, description_event); - break; - case RAND_EVENT: - ev = new Rand_log_event(buf, description_event); - break; - case USER_VAR_EVENT: - ev = new User_var_log_event(buf, description_event); - break; - case FORMAT_DESCRIPTION_EVENT: - ev = new Format_description_log_event(buf, event_len, description_event); - break; + case CREATE_FILE_EVENT: + ev = new Create_file_log_event(buf, event_len, description_event); + break; + case APPEND_BLOCK_EVENT: + ev = new Append_block_log_event(buf, event_len, description_event); + break; + case DELETE_FILE_EVENT: + ev = new Delete_file_log_event(buf, event_len, description_event); + break; + case EXEC_LOAD_EVENT: + ev = new Execute_load_log_event(buf, event_len, description_event); + break; + case START_EVENT_V3: /* this is sent only by MySQL <=4.x */ + ev = new Start_log_event_v3(buf, description_event); + break; + case STOP_EVENT: + ev = new Stop_log_event(buf, description_event); + break; + case INTVAR_EVENT: + ev = new Intvar_log_event(buf, description_event); + break; + case XID_EVENT: + ev = new Xid_log_event(buf, description_event); + break; + case RAND_EVENT: + ev = new Rand_log_event(buf, description_event); + break; + case USER_VAR_EVENT: + ev = new User_var_log_event(buf, description_event); + break; + case FORMAT_DESCRIPTION_EVENT: + ev = new Format_description_log_event(buf, event_len, description_event); + break; #if defined(HAVE_REPLICATION) - case PRE_GA_WRITE_ROWS_EVENT: - ev = new Write_rows_log_event_old(buf, event_len, description_event); - break; - case PRE_GA_UPDATE_ROWS_EVENT: - ev = new Update_rows_log_event_old(buf, event_len, description_event); - break; - case PRE_GA_DELETE_ROWS_EVENT: - ev = new Delete_rows_log_event_old(buf, event_len, description_event); - break; - case WRITE_ROWS_EVENT: - ev = new Write_rows_log_event(buf, event_len, description_event); - break; - case UPDATE_ROWS_EVENT: - ev = new Update_rows_log_event(buf, event_len, description_event); - break; - case DELETE_ROWS_EVENT: - ev = new Delete_rows_log_event(buf, event_len, description_event); - break; - case TABLE_MAP_EVENT: - ev = new Table_map_log_event(buf, event_len, description_event); - break; + case PRE_GA_WRITE_ROWS_EVENT: + ev = new Write_rows_log_event_old(buf, event_len, description_event); + break; + case PRE_GA_UPDATE_ROWS_EVENT: + ev = new Update_rows_log_event_old(buf, event_len, description_event); + break; + case PRE_GA_DELETE_ROWS_EVENT: + ev = new Delete_rows_log_event_old(buf, event_len, description_event); + break; + case WRITE_ROWS_EVENT: + ev = new Write_rows_log_event(buf, event_len, description_event); + break; + case UPDATE_ROWS_EVENT: + ev = new Update_rows_log_event(buf, event_len, description_event); + break; + case DELETE_ROWS_EVENT: + ev = new Delete_rows_log_event(buf, event_len, description_event); + break; + case TABLE_MAP_EVENT: + ev = new Table_map_log_event(buf, event_len, description_event); + break; #endif - case BEGIN_LOAD_QUERY_EVENT: - ev = new Begin_load_query_log_event(buf, event_len, description_event); - break; - case EXECUTE_LOAD_QUERY_EVENT: - ev= new Execute_load_query_log_event(buf, event_len, description_event); - break; - case INCIDENT_EVENT: - ev = new Incident_log_event(buf, event_len, description_event); - break; - default: - DBUG_PRINT("error",("Unknown event code: %d", - (int) buf[EVENT_TYPE_OFFSET])); - ev= NULL; - break; + case BEGIN_LOAD_QUERY_EVENT: + ev = new Begin_load_query_log_event(buf, event_len, description_event); + break; + case EXECUTE_LOAD_QUERY_EVENT: + ev= new Execute_load_query_log_event(buf, event_len, description_event); + break; + case INCIDENT_EVENT: + ev = new Incident_log_event(buf, event_len, description_event); + break; + default: + DBUG_PRINT("error",("Unknown event code: %d", + (int) buf[EVENT_TYPE_OFFSET])); + ev= NULL; + break; + } } DBUG_PRINT("read_event", ("%s(type_code: %d; event_len: %d)", @@ -1632,6 +1713,7 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, DBUG_ASSERT(thd_arg->variables.character_set_client->number < 256*256); DBUG_ASSERT(thd_arg->variables.collation_connection->number < 256*256); DBUG_ASSERT(thd_arg->variables.collation_server->number < 256*256); + DBUG_ASSERT(thd_arg->variables.character_set_client->mbminlen == 1); int2store(charset, thd_arg->variables.character_set_client->number); int2store(charset+2, thd_arg->variables.collation_connection->number); int2store(charset+4, thd_arg->variables.collation_server->number); @@ -2135,7 +2217,7 @@ void Query_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) print_query_header(&cache, print_event_info); my_b_write(&cache, (uchar*) query, q_len); - my_b_printf(&cache, "%s\n", print_event_info->delimiter); + my_b_printf(&cache, "\n%s\n", print_event_info->delimiter); } #endif /* MYSQL_CLIENT */ @@ -2580,6 +2662,14 @@ void Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info) my_b_printf(&cache,"ROLLBACK%s\n", print_event_info->delimiter); #endif } + if (temp_buf && + print_event_info->base64_output_mode != BASE64_OUTPUT_NEVER && + !print_event_info->short_form) + { + my_b_printf(&cache, "BINLOG '\n"); + print_base64(&cache, print_event_info, FALSE); + print_event_info->printed_fd_event= TRUE; + } DBUG_VOID_RETURN; } #endif /* MYSQL_CLIENT */ @@ -2715,7 +2805,7 @@ int Start_log_event_v3::do_apply_event(Relay_log_info const *rli) Format_description_log_event:: Format_description_log_event(uint8 binlog_ver, const char* server_ver) - :Start_log_event_v3() + :Start_log_event_v3(), event_type_permutation(0) { binlog_version= binlog_ver; switch (binlog_ver) { @@ -2842,7 +2932,7 @@ Format_description_log_event(const char* buf, const Format_description_log_event* description_event) - :Start_log_event_v3(buf, description_event) + :Start_log_event_v3(buf, description_event), event_type_permutation(0) { DBUG_ENTER("Format_description_log_event::Format_description_log_event(char*,...)"); buf+= LOG_EVENT_MINIMAL_HEADER_LEN; @@ -2857,6 +2947,65 @@ Format_description_log_event(const char* buf, number_of_event_types* sizeof(*post_header_len), MYF(0)); calc_server_version_split(); + + /* + In some previous versions, the events were given other event type + id numbers than in the present version. When replicating from such + a version, we therefore set up an array that maps those id numbers + to the id numbers of the present server. + + If post_header_len is null, it means malloc failed, and is_valid + will fail, so there is no need to do anything. + + The trees which have wrong event id's are: + mysql-5.1-wl2325-5.0-drop6p13-alpha, mysql-5.1-wl2325-5.0-drop6, + mysql-5.1-wl2325-5.0, mysql-5.1-wl2325-no-dd (`grep -C2 + BEGIN_LOAD_QUERY_EVENT /home/bk/ * /sql/log_event.h`). The + corresponding version (`grep mysql, configure.in` in those trees) + strings are 5.2.2-a_drop6p13-alpha, 5.2.2-a_drop6p13c, + 5.1.5-a_drop5p20, 5.1.2-a_drop5p5. + */ + if (post_header_len && + (strncmp(server_version, "5.1.2-a_drop5", 13) == 0 || + strncmp(server_version, "5.1.5-a_drop5", 13) == 0 || + strncmp(server_version, "5.2.2-a_drop6", 13) == 0)) + { + if (number_of_event_types != 22) + { + DBUG_PRINT("info", (" number_of_event_types=%d", + number_of_event_types)); + /* this makes is_valid() return false. */ + my_free(post_header_len, MYF(MY_ALLOW_ZERO_PTR)); + post_header_len= NULL; + DBUG_VOID_RETURN; + } + static const uint8 perm[23]= + { + UNKNOWN_EVENT, START_EVENT_V3, QUERY_EVENT, STOP_EVENT, ROTATE_EVENT, + INTVAR_EVENT, LOAD_EVENT, SLAVE_EVENT, CREATE_FILE_EVENT, + APPEND_BLOCK_EVENT, EXEC_LOAD_EVENT, DELETE_FILE_EVENT, + NEW_LOAD_EVENT, + RAND_EVENT, USER_VAR_EVENT, + FORMAT_DESCRIPTION_EVENT, + TABLE_MAP_EVENT, + PRE_GA_WRITE_ROWS_EVENT, + PRE_GA_UPDATE_ROWS_EVENT, + PRE_GA_DELETE_ROWS_EVENT, + XID_EVENT, + BEGIN_LOAD_QUERY_EVENT, + EXECUTE_LOAD_QUERY_EVENT, + }; + event_type_permutation= perm; + /* + Since we use (permuted) event id's to index the post_header_len + array, we need to permute the post_header_len array too. + */ + uint8 post_header_len_temp[23]; + for (int i= 1; i < 23; i++) + post_header_len_temp[perm[i] - 1]= post_header_len[i - 1]; + for (int i= 0; i < 22; i++) + post_header_len[i] = post_header_len_temp[i]; + } DBUG_VOID_RETURN; } @@ -4267,6 +4416,7 @@ Xid_log_event(const char* buf, #ifndef MYSQL_CLIENT bool Xid_log_event::write(IO_CACHE* file) { + DBUG_EXECUTE_IF("do_not_write_xid", return 0;); return write_header(file, sizeof(xid)) || my_b_safe_write(file, (uchar*) &xid, sizeof(xid)); } @@ -4520,8 +4670,10 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) switch (type) { case REAL_RESULT: double real_val; + char real_buf[FMT_G_BUFSIZE(14)]; float8get(real_val, val); - my_b_printf(&cache, ":=%.14g%s\n", real_val, print_event_info->delimiter); + my_sprintf(real_buf, (real_buf, "%.14g", real_val)); + my_b_printf(&cache, ":=%s%s\n", real_buf, print_event_info->delimiter); break; case INT_RESULT: char int_buf[22]; @@ -4925,7 +5077,7 @@ Create_file_log_event(THD* thd_arg, sql_exchange* ex, const char* db_arg, const char* table_name_arg, List<Item>& fields_arg, enum enum_duplicates handle_dup, bool ignore, - char* block_arg, uint block_len_arg, bool using_trans) + uchar* block_arg, uint block_len_arg, bool using_trans) :Load_log_event(thd_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup, ignore, using_trans), fake_base(0), block(block_arg), event_buf(0), block_len(block_len_arg), @@ -5023,8 +5175,8 @@ Create_file_log_event::Create_file_log_event(const char* buf, uint len, Load_log_event::get_data_size() + create_file_header_len + 1); if (len < block_offset) - return; - block = (char*)buf + block_offset; + DBUG_VOID_RETURN; + block = (uchar*)buf + block_offset; block_len = len - block_offset; } else @@ -5182,7 +5334,7 @@ err: #ifndef MYSQL_CLIENT Append_block_log_event::Append_block_log_event(THD *thd_arg, const char *db_arg, - char *block_arg, + uchar *block_arg, uint block_len_arg, bool using_trans) :Log_event(thd_arg,0, using_trans), block(block_arg), @@ -5208,7 +5360,7 @@ Append_block_log_event::Append_block_log_event(const char* buf, uint len, if (len < total_header_len) DBUG_VOID_RETURN; file_id= uint4korr(buf + common_header_len + AB_FILE_ID_OFFSET); - block= (char*)buf + total_header_len; + block= (uchar*)buf + total_header_len; block_len= len - total_header_len; DBUG_VOID_RETURN; } @@ -5600,7 +5752,7 @@ err: #ifndef MYSQL_CLIENT Begin_load_query_log_event:: -Begin_load_query_log_event(THD* thd_arg, const char* db_arg, char* block_arg, +Begin_load_query_log_event(THD* thd_arg, const char* db_arg, uchar* block_arg, uint block_len_arg, bool using_trans) :Append_block_log_event(thd_arg, db_arg, block_arg, block_len_arg, using_trans) @@ -5732,12 +5884,12 @@ void Execute_load_query_log_event::print(FILE* file, my_b_printf(&cache, " REPLACE"); my_b_printf(&cache, " INTO"); my_b_write(&cache, (uchar*) query + fn_pos_end, q_len-fn_pos_end); - my_b_printf(&cache, "%s\n", print_event_info->delimiter); + my_b_printf(&cache, "\n%s\n", print_event_info->delimiter); } else { my_b_write(&cache, (uchar*) query, q_len); - my_b_printf(&cache, "%s\n", print_event_info->delimiter); + my_b_printf(&cache, "\n%s\n", print_event_info->delimiter); } if (!print_event_info->short_form) @@ -6180,7 +6332,6 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) { DBUG_ENTER("Rows_log_event::do_apply_event(Relay_log_info*)"); int error= 0; - /* If m_table_id == ~0UL, then we have a dummy event that does not contain any data. In that case, we just remove all tables in the @@ -6226,6 +6377,24 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) */ lex_start(thd); + /* + There are a few flags that are replicated with each row event. + Make sure to set/clear them before executing the main body of + the event. + */ + if (get_flags(NO_FOREIGN_KEY_CHECKS_F)) + thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS; + else + thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS; + + if (get_flags(RELAXED_UNIQUE_CHECKS_F)) + thd->options|= OPTION_RELAXED_UNIQUE_CHECKS; + else + thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS; + /* A small test to verify that objects have consistent types */ + DBUG_ASSERT(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS)); + + while ((error= lock_tables(thd, rli->tables_to_lock, rli->tables_to_lock_count, &need_reopen))) { @@ -6360,22 +6529,6 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) So we call set_time(), like in SBR. Presently it changes nothing. */ thd->set_time((time_t)when); - /* - There are a few flags that are replicated with each row event. - Make sure to set/clear them before executing the main body of - the event. - */ - if (get_flags(NO_FOREIGN_KEY_CHECKS_F)) - thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS; - else - thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS; - - if (get_flags(RELAXED_UNIQUE_CHECKS_F)) - thd->options|= OPTION_RELAXED_UNIQUE_CHECKS; - else - thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS; - /* A small test to verify that objects have consistent types */ - DBUG_ASSERT(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS)); /* Now we are in a statement and will stay in a statement until we @@ -6408,8 +6561,9 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) if (!get_flags(COMPLETE_ROWS_F)) bitmap_intersect(table->write_set,&m_cols); + this->slave_exec_mode= slave_exec_mode_options; // fix the mode + // Do event specific preparations - error= do_before_row_operations(rli); // row processing loop @@ -6428,23 +6582,41 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) { case 0: break; - - /* Some recoverable errors */ + /* + The following list of "idempotent" errors + means that an error from the list might happen + because of idempotent (more than once) + applying of a binlog file. + Notice, that binlog has a ddl operation its + second applying may cause + + case HA_ERR_TABLE_DEF_CHANGED: + case HA_ERR_CANNOT_ADD_FOREIGN: + + which are not included into to the list. + */ case HA_ERR_RECORD_CHANGED: case HA_ERR_RECORD_DELETED: case HA_ERR_KEY_NOT_FOUND: case HA_ERR_END_OF_FILE: - /* Idempotency support: OK if tuple does not exist */ + case HA_ERR_FOUND_DUPP_KEY: + case HA_ERR_FOUND_DUPP_UNIQUE: + case HA_ERR_FOREIGN_DUPLICATE_KEY: + case HA_ERR_NO_REFERENCED_ROW: + case HA_ERR_ROW_IS_REFERENCED: + DBUG_PRINT("info", ("error: %s", HA_ERR(error))); - error= 0; + if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1) + { + if (global_system_variables.log_warnings) + slave_rows_error_report(WARNING_LEVEL, error, rli, thd, table, + get_type_str(), + RPL_LOG_NAME, (ulong) log_pos); + error= 0; + } break; - + default: - rli->report(ERROR_LEVEL, - thd->is_error() ? thd->main_da.sql_errno() : 0, - "Error in %s event: row application failed. %s", - get_type_str(), - thd->is_error() ? thd->main_da.message() : ""); thd->is_slave_error= 1; break; } @@ -6488,17 +6660,14 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) */ if (rli->tables_to_lock && get_flags(STMT_END_F)) const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); - + if (error) { /* error has occured during the transaction */ - rli->report(ERROR_LEVEL, - thd->is_error() ? thd->main_da.sql_errno() : 0, - "Error in %s event: error during transaction execution " - "on table %s.%s. %s", - get_type_str(), table->s->db.str, - table->s->table_name.str, - thd->is_error() ? thd->main_da.message() : ""); - + slave_rows_error_report(ERROR_LEVEL, error, rli, thd, table, + get_type_str(), RPL_LOG_NAME, (ulong) log_pos); + } + if (error) + { /* If one day we honour --skip-slave-errors in row-based replication, and the error should be skipped, then we would clear mappings, rollback, @@ -6610,6 +6779,7 @@ Rows_log_event::do_update_pos(Relay_log_info *rli) */ thd->reset_current_stmt_binlog_row_based(); + rli->cleanup_context(thd, 0); if (error == 0) { @@ -7299,43 +7469,50 @@ Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability { int error= 0; - /* - We are using REPLACE semantics and not INSERT IGNORE semantics - when writing rows, that is: new rows replace old rows. We need to - inform the storage engine that it should use this behaviour. + /** + todo: to introduce a property for the event (handler?) which forces + applying the event in the replace (idempotent) fashion. */ + if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1 || + m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER) + { + /* + We are using REPLACE semantics and not INSERT IGNORE semantics + when writing rows, that is: new rows replace old rows. We need to + inform the storage engine that it should use this behaviour. + */ + + /* Tell the storage engine that we are using REPLACE semantics. */ + thd->lex->duplicates= DUP_REPLACE; + + /* + Pretend we're executing a REPLACE command: this is needed for + InnoDB and NDB Cluster since they are not (properly) checking the + lex->duplicates flag. + */ + thd->lex->sql_command= SQLCOM_REPLACE; + /* + Do not raise the error flag in case of hitting to an unique attribute + */ + m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); + /* + NDB specific: update from ndb master wrapped as Write_rows + so that the event should be applied to replace slave's row + */ + m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); + /* + NDB specific: if update from ndb master wrapped as Write_rows + does not find the row it's assumed idempotent binlog applying + is taking place; don't raise the error. + */ + m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY); + /* + TODO: the cluster team (Tomas?) says that it's better if the engine knows + how many rows are going to be inserted, then it can allocate needed memory + from the start. + */ + } - /* Tell the storage engine that we are using REPLACE semantics. */ - thd->lex->duplicates= DUP_REPLACE; - - /* - Pretend we're executing a REPLACE command: this is needed for - InnoDB and NDB Cluster since they are not (properly) checking the - lex->duplicates flag. - */ - thd->lex->sql_command= SQLCOM_REPLACE; - /* - Do not raise the error flag in case of hitting to an unique attribute - */ - m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); - /* - NDB specific: update from ndb master wrapped as Write_rows - */ - /* - so that the event should be applied to replace slave's row - */ - m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); - /* - NDB specific: if update from ndb master wrapped as Write_rows - does not find the row it's assumed idempotent binlog applying - is taking place; don't raise the error. - */ - m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY); - /* - TODO: the cluster team (Tomas?) says that it's better if the engine knows - how many rows are going to be inserted, then it can allocate needed memory - from the start. - */ m_table->file->ha_start_bulk_insert(0); /* We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill @@ -7357,18 +7534,23 @@ Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability } int -Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const, +Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const, int error) { int local_error= 0; - m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); - m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE); - /* - reseting the extra with - table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY); - fires bug#27077 - todo: explain or fix - */ + if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1 || + m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER) + { + m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); + m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE); + /* + resetting the extra with + table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY); + fires bug#27077 + explanation: file->reset() performs this duty + ultimately. Still todo: fix + */ + } if ((local_error= m_table->file->ha_end_bulk_insert())) { m_table->file->print_error(local_error, MYF(0)); @@ -7487,23 +7669,22 @@ Rows_log_event::write_row(const Relay_log_info *const rli, while ((error= table->file->ha_write_row(table->record[0]))) { - if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT) - { - table->file->print_error(error, MYF(0)); /* to check at exec_relay_log_event */ - DBUG_RETURN(error); - } - if ((keynum= table->file->get_dup_key(error)) < 0) + if (error == HA_ERR_LOCK_DEADLOCK || + error == HA_ERR_LOCK_WAIT_TIMEOUT || + (keynum= table->file->get_dup_key(error)) < 0 || + !overwrite) { - DBUG_PRINT("info",("Can't locate duplicate key (get_dup_key returns %d)",keynum)); - table->file->print_error(error, MYF(0)); + DBUG_PRINT("info",("get_dup_key returns %d)", keynum)); /* - We failed to retrieve the duplicate key + Deadlock, waiting for lock or just an error from the handler + such as HA_ERR_FOUND_DUPP_KEY when overwrite is false. + Retrieval of the duplicate key number may fail - either because the error was not "duplicate key" error - or because the information which key is not available */ + table->file->print_error(error, MYF(0)); DBUG_RETURN(error); } - /* We need to retrieve the old row into record[1] to be able to either update or delete the offending record. We either: @@ -7641,14 +7822,16 @@ int Write_rows_log_event::do_exec_row(const Relay_log_info *const rli) { DBUG_ASSERT(m_table != NULL); - int error= write_row(rli, TRUE /* overwrite */); - + int error= + write_row(rli, /* if 1 then overwrite */ + bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1); + if (error && !thd->is_error()) { DBUG_ASSERT(0); my_error(ER_UNKNOWN_ERROR, MYF(0)); } - + return error; } |