diff options
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r-- | sql/log_event.cc | 750 |
1 files changed, 526 insertions, 224 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index f884b2996f0..13de7523054 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -46,6 +46,7 @@ #include "rpl_record.h" #include "transaction.h" #include <my_dir.h> +#include "sql_show.h" #endif /* MYSQL_CLIENT */ @@ -471,29 +472,28 @@ inline bool unexpected_error_code(int unexpected_error) pretty_print_str() */ -static char *pretty_print_str(char *packet, const char *str, int len) +static void +pretty_print_str(String *packet, const char *str, int len) { const char *end= str + len; - char *pos= packet; - *pos++= '\''; + packet->append(STRING_WITH_LEN("'")); while (str < end) { char c; switch ((c=*str++)) { - case '\n': *pos++= '\\'; *pos++= 'n'; break; - case '\r': *pos++= '\\'; *pos++= 'r'; break; - case '\\': *pos++= '\\'; *pos++= '\\'; break; - case '\b': *pos++= '\\'; *pos++= 'b'; break; - case '\t': *pos++= '\\'; *pos++= 't'; break; - case '\'': *pos++= '\\'; *pos++= '\''; break; - case 0 : *pos++= '\\'; *pos++= '0'; break; + case '\n': packet->append(STRING_WITH_LEN("\\n")); break; + case '\r': packet->append(STRING_WITH_LEN("\\r")); break; + case '\\': packet->append(STRING_WITH_LEN("\\\\")); break; + case '\b': packet->append(STRING_WITH_LEN("\\b")); break; + case '\t': packet->append(STRING_WITH_LEN("\\t")); break; + case '\'': packet->append(STRING_WITH_LEN("\\'")); break; + case 0 : packet->append(STRING_WITH_LEN("\\0")); break; default: - *pos++= c; + packet->append(&c, 1); break; } } - *pos++= '\''; - return pos; + packet->append(STRING_WITH_LEN("'")); } #endif /* !MYSQL_CLIENT */ @@ -731,6 +731,7 @@ const char* Log_event::get_type_str(Log_event_type type) case EXECUTE_LOAD_QUERY_EVENT: return "Execute_load_query"; case INCIDENT_EVENT: return "Incident"; case ANNOTATE_ROWS_EVENT: return "Annotate_rows"; + case BINLOG_CHECKPOINT_EVENT: return "Binlog_checkpoint"; default: return "Unknown"; /* impossible */ } } @@ -905,9 +906,9 @@ int Log_event::do_update_pos(Relay_log_info *rli) Log_event::enum_skip_reason Log_event::do_shall_skip(Relay_log_info *rli) { - DBUG_PRINT("info", ("ev->server_id=%lu, ::server_id=%lu," - " rli->replicate_same_server_id=%d," - " rli->slave_skip_counter=%d", + DBUG_PRINT("info", ("ev->server_id: %lu, ::server_id: %lu," + " rli->replicate_same_server_id: %d," + " rli->slave_skip_counter: %lu", (ulong) server_id, (ulong) ::server_id, rli->replicate_same_server_id, rli->slave_skip_counter)); @@ -926,7 +927,7 @@ Log_event::do_shall_skip(Relay_log_info *rli) Log_event::pack_info() */ -void Log_event::pack_info(Protocol *protocol) +void Log_event::pack_info(THD *thd, Protocol *protocol) { protocol->store("", &my_charset_bin); } @@ -935,7 +936,8 @@ void Log_event::pack_info(Protocol *protocol) /** Only called by SHOW BINLOG EVENTS */ -int Log_event::net_send(Protocol *protocol, const char* log_name, my_off_t pos) +int Log_event::net_send(THD *thd, Protocol *protocol, const char* log_name, + my_off_t pos) { const char *p= strrchr(log_name, FN_LIBCHAR); const char *event_type; @@ -949,7 +951,7 @@ int Log_event::net_send(Protocol *protocol, const char* log_name, my_off_t pos) protocol->store(event_type, strlen(event_type), &my_charset_bin); protocol->store((uint32) server_id); protocol->store((ulonglong) log_pos); - pack_info(protocol); + pack_info(thd, protocol); return protocol->write(); } #endif /* HAVE_REPLICATION */ @@ -1085,6 +1087,9 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length) ulong now; bool ret; DBUG_ENTER("Log_event::write_header"); + DBUG_PRINT("enter", ("filepos: %lld length: %lu type: %d", + (longlong) my_b_tell(file), event_data_length, + (int) get_type_code())); /* Store number of bytes that will be written by this event */ data_written= event_data_length + sizeof(header); @@ -1348,7 +1353,7 @@ failed my_b_read")); Log_event *res= 0; #ifndef max_allowed_packet THD *thd=current_thd; - uint max_allowed_packet= thd ? thd->variables.max_allowed_packet : ~(ulong)0; + uint max_allowed_packet= thd ? slave_max_allowed_packet:~(ulong)0; #endif if (data_len > max_allowed_packet) @@ -1386,7 +1391,7 @@ err: DBUG_ASSERT(error != 0); sql_print_error("Error in Log_event::read_log_event(): " "'%s', data_len: %d, event_type: %d", - error,data_len,head[EVENT_TYPE_OFFSET]); + error,data_len,(uchar)(head[EVENT_TYPE_OFFSET])); my_free(buf); /* The SQL slave thread will check if file->error<0 to know @@ -1535,6 +1540,9 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, case ROTATE_EVENT: ev = new Rotate_log_event(buf, event_len, description_event); break; + case BINLOG_CHECKPOINT_EVENT: + ev = new Binlog_checkpoint_log_event(buf, event_len, description_event); + break; #ifdef HAVE_REPLICATION case SLAVE_EVENT: /* can never happen (unused event) */ ev = new Slave_log_event(buf, event_len, description_event); @@ -2448,27 +2456,22 @@ Log_event::continue_group(Relay_log_info *rli) show the catalog ?? */ -void Query_log_event::pack_info(Protocol *protocol) +void Query_log_event::pack_info(THD *thd, Protocol *protocol) { // TODO: show the catalog ?? - char *buf, *pos; - if (!(buf= (char*) my_malloc(9 + db_len + q_len, MYF(MY_WME)))) - return; - pos= buf; + char buf_mem[1024]; + String buf(buf_mem, sizeof(buf_mem), system_charset_info); + buf.real_alloc(9 + db_len + q_len); if (!(flags & LOG_EVENT_SUPPRESS_USE_F) && db && db_len) { - pos= strmov(buf, "use `"); - memcpy(pos, db, db_len); - pos= strmov(pos+db_len, "`; "); + buf.append(STRING_WITH_LEN("use ")); + append_identifier(thd, &buf, db, db_len); + buf.append("; "); } if (query && q_len) - { - memcpy(pos, query, q_len); - pos+= q_len; - } - protocol->store(buf, pos-buf, &my_charset_bin); - my_free(buf); + buf.append(query, q_len); + protocol->store(&buf); } #endif @@ -3218,24 +3221,41 @@ Query_log_event::Query_log_event(const char* buf, uint event_len, pos= (const uchar*) end; // Break loop } } - + + /** + Layout for the data buffer is as follows + +--------+-----------+------+------+---------+----+-------+ + | catlog | time_zone | user | host | db name | \0 | Query | + +--------+-----------+------+------+---------+----+-------+ + + To support the query cache we append the following buffer to the above + +-------+----------------------------------------+-------+ + |db len | uninitiatlized space of size of db len | FLAGS | + +-------+----------------------------------------+-------+ + + The area of buffer starting from Query field all the way to the end belongs + to the Query buffer and its structure is described in alloc_query() in + sql_parse.cc + */ + #if !defined(MYSQL_CLIENT) && defined(HAVE_QUERY_CACHE) - if (!(start= data_buf = (Log_event::Byte*) my_malloc(catalog_len + 1 + - time_zone_len + 1 + - data_len + 1 + - QUERY_CACHE_DB_LENGTH_SIZE + - QUERY_CACHE_FLAGS_SIZE + - user.length + 1 + - host.length + 1 + - db_len + 1, - MYF(MY_WME)))) + if (!(start= data_buf = (Log_event::Byte*) my_malloc(catalog_len + 1 + + time_zone_len + 1 + + user.length + 1 + + host.length + 1 + + data_len + 1 + + sizeof(size_t)//for db_len + + db_len + 1 + + QUERY_CACHE_DB_LENGTH_SIZE + + QUERY_CACHE_FLAGS_SIZE, + MYF(MY_WME)))) #else - if (!(start= data_buf = (Log_event::Byte*) my_malloc(catalog_len + 1 + - time_zone_len + 1 + - data_len + 1 + - user.length + 1 + - host.length + 1, - MYF(MY_WME)))) + if (!(start= data_buf = (Log_event::Byte*) my_malloc(catalog_len + 1 + + time_zone_len + 1 + + user.length + 1 + + host.length + 1 + + data_len + 1, + MYF(MY_WME)))) #endif DBUG_VOID_RETURN; if (catalog_len) // If catalog is given @@ -3275,10 +3295,127 @@ Query_log_event::Query_log_event(const char* buf, uint event_len, db= (char *)start; query= (char *)(start + db_len + 1); q_len= data_len - db_len -1; + /** + Append the db length at the end of the buffer. This will be used by + Query_cache::send_result_to_client() in case the query cache is On. + */ +#if !defined(MYSQL_CLIENT) && defined(HAVE_QUERY_CACHE) + size_t db_length= (size_t)db_len; + memcpy(start + data_len + 1, &db_length, sizeof(size_t)); +#endif DBUG_VOID_RETURN; } +/* + Replace a binlog event read into a packet with a dummy event. Either a + Query_log_event that has just a comment, or if that will not fit in the + space used for the event to be replaced, then a NULL user_var event. + + This is used when sending binlog data to a slave which does not understand + this particular event and which is too old to support informational events + or holes in the event stream. + + This allows to write such events into the binlog on the master and still be + able to replicate against old slaves without them breaking. + + Clears the flag LOG_EVENT_THREAD_SPECIFIC_F and set LOG_EVENT_SUPPRESS_USE_F. + Overwrites the type with QUERY_EVENT (or USER_VAR_EVENT), and replaces the + body with a minimal query / NULL user var. + + Returns zero on success, -1 if error due to too little space in original + event. A minimum of 25 bytes (19 bytes fixed header + 6 bytes in the body) + is needed in any event to be replaced with a dummy event. +*/ +int +Query_log_event::dummy_event(String *packet, ulong ev_offset, + uint8 checksum_alg) +{ + uchar *p= (uchar *)packet->ptr() + ev_offset; + size_t data_len= packet->length() - ev_offset; + uint16 flags; + static const size_t min_user_var_event_len= + LOG_EVENT_HEADER_LEN + UV_NAME_LEN_SIZE + 1 + UV_VAL_IS_NULL; // 25 + static const size_t min_query_event_len= + LOG_EVENT_HEADER_LEN + QUERY_HEADER_LEN + 1 + 1; // 34 + + if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32) + data_len-= BINLOG_CHECKSUM_LEN; + else + DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || + checksum_alg == BINLOG_CHECKSUM_ALG_OFF); + + if (data_len < min_user_var_event_len) + /* Cannot replace with dummy, event too short. */ + return -1; + + flags= uint2korr(p + FLAGS_OFFSET); + flags&= ~LOG_EVENT_THREAD_SPECIFIC_F; + flags|= LOG_EVENT_SUPPRESS_USE_F; + int2store(p + FLAGS_OFFSET, flags); + + if (data_len < min_query_event_len) + { + /* + Have to use dummy user_var event for such a short packet. + + This works, but the event will be considered part of an event group with + the following event. So for example @@global.sql_slave_skip_counter=1 + will skip not only the dummy event, but also the immediately following + event. + + We write a NULL user var with the name @`!dummyvar` (or as much + as that as will fit within the size of the original event - so + possibly just @`!`). + */ + static const char var_name[]= "!dummyvar"; + uint name_len= data_len - (min_user_var_event_len - 1); + + p[EVENT_TYPE_OFFSET]= USER_VAR_EVENT; + int4store(p + LOG_EVENT_HEADER_LEN, name_len); + memcpy(p + LOG_EVENT_HEADER_LEN + UV_NAME_LEN_SIZE, var_name, name_len); + p[LOG_EVENT_HEADER_LEN + UV_NAME_LEN_SIZE + name_len]= 1; // indicates NULL + } + else + { + /* + Use a dummy query event, just a comment. + */ + static const char message[]= + "# Dummy event replacing event type %u that slave cannot handle."; + char buf[sizeof(message)+1]; /* +1, as %u can expand to 3 digits. */ + uchar old_type= p[EVENT_TYPE_OFFSET]; + uchar *q= p + LOG_EVENT_HEADER_LEN; + size_t comment_len, len; + + p[EVENT_TYPE_OFFSET]= QUERY_EVENT; + int4store(q + Q_THREAD_ID_OFFSET, 0); + int4store(q + Q_EXEC_TIME_OFFSET, 0); + q[Q_DB_LEN_OFFSET]= 0; + int2store(q + Q_ERR_CODE_OFFSET, 0); + int2store(q + Q_STATUS_VARS_LEN_OFFSET, 0); + q[Q_DATA_OFFSET]= 0; /* Zero terminator for empty db */ + q+= Q_DATA_OFFSET + 1; + len= my_snprintf(buf, sizeof(buf), message, old_type); + comment_len= data_len - (min_query_event_len - 1); + if (comment_len <= len) + memcpy(q, buf, comment_len); + else + { + memcpy(q, buf, len); + memset(q+len, ' ', comment_len - len); + } + } + + if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32) + { + ha_checksum crc= my_checksum(0L, p, data_len); + int4store(p + data_len, crc); + } + return 0; +} + + #ifdef MYSQL_CLIENT /** Query_log_event::print(). @@ -3309,11 +3446,17 @@ void Query_log_event::print_query_header(IO_CACHE* file, } else if (db) { + /* Room for expand ` to `` + initial/final ` + \0 */ + char buf[FN_REFLEN*2+3]; + different_db= memcmp(print_event_info->db, db, db_len + 1); if (different_db) memcpy(print_event_info->db, db, db_len + 1); if (db[0] && different_db) - my_b_printf(file, "use %s%s\n", db, print_event_info->delimiter); + { + my_snprintf(buf, sizeof(buf), "%`s", db); + my_b_printf(file, "use %s%s\n", buf, print_event_info->delimiter); + } } end=int10_to_str((long) when, strmov(buff,"SET TIMESTAMP="),10); @@ -3461,6 +3604,12 @@ void Query_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { Write_on_release_cache cache(&print_event_info->head_cache, file); + /** + reduce the size of io cache so that the write function is called + for every call to my_b_write(). + */ + DBUG_EXECUTE_IF ("simulate_file_write_error", + {(&cache)->write_pos= (&cache)->write_end- 500;}); print_query_header(&cache, print_event_info); my_b_write(&cache, (uchar*) query, q_len); my_b_printf(&cache, "\n%s\n", print_event_info->delimiter); @@ -3479,6 +3628,34 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli) return do_apply_event(rli, query, q_len); } +/** + Compare if two errors should be regarded as equal. + This is to handle the case when you can get slightly different errors + on master and slave for the same thing. + @param + expected_error Error we got on master + actual_error Error we got on slave + + @return + 1 Errors are equal + 0 Errors are different +*/ + +bool test_if_equal_repl_errors(int expected_error, int actual_error) +{ + if (expected_error == actual_error) + return 1; + switch (expected_error) { + case ER_DUP_ENTRY: + case ER_AUTOINC_READ_FAILED: + return (actual_error == ER_AUTOINC_READ_FAILED || + actual_error == HA_ERR_AUTOINC_ERANGE); + default: + break; + } + return 0; +} + /** @todo @@ -3503,6 +3680,7 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli, LEX_STRING new_db; int expected_error,actual_error= 0; HA_CREATE_INFO db_options; + DBUG_ENTER("Query_log_event::do_apply_event"); /* Colleagues: please never free(thd->catalog) in MySQL. This would @@ -3790,7 +3968,8 @@ compare_errors: DBUG_PRINT("info",("expected_error: %d sql_errno: %d", expected_error, actual_error)); - if ((expected_error && expected_error != actual_error && + if ((expected_error && + !test_if_equal_repl_errors(expected_error, actual_error) && !concurrency_error_code(expected_error)) && !ignored_error_code(actual_error) && !ignored_error_code(expected_error)) @@ -3812,7 +3991,7 @@ Default database: '%s'. Query: '%s'", If we get the same error code as expected and it is not a concurrency issue, or should be ignored. */ - else if ((expected_error == actual_error && + else if ((test_if_equal_repl_errors(expected_error, actual_error) && !concurrency_error_code(expected_error)) || ignored_error_code(actual_error)) { @@ -3898,7 +4077,7 @@ end: thd->first_successful_insert_id_in_prev_stmt= 0; thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0; free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC)); - return thd->is_slave_error; + DBUG_RETURN(thd->is_slave_error); } int Query_log_event::do_update_pos(Relay_log_info *rli) @@ -3971,7 +4150,7 @@ Start_log_event_v3::Start_log_event_v3() */ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -void Start_log_event_v3::pack_info(Protocol *protocol) +void Start_log_event_v3::pack_info(THD *thd, Protocol *protocol) { char buf[12 + ST_SERVER_VER_LEN + 14 + 22], *pos; pos= strmov(buf, "Server ver: "); @@ -4262,6 +4441,8 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver) // Set header lengths of Maria events post_header_len[ANNOTATE_ROWS_EVENT-1]= ANNOTATE_ROWS_HEADER_LEN; + post_header_len[BINLOG_CHECKPOINT_EVENT-1]= + BINLOG_CHECKPOINT_HEADER_LEN; // Sanity-check that all post header lengths are initialized. int i; @@ -4747,131 +4928,113 @@ uint8 get_checksum_alg(const char* buf, ulong len) */ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -uint Load_log_event::get_query_buffer_length() +void Load_log_event::print_query(THD *thd, bool need_db, const char *cs, + String *buf, my_off_t *fn_start, + my_off_t *fn_end, const char *qualify_db) { - return - 5 + db_len + 3 + // "use DB; " - 18 + fname_len + 2 + // "LOAD DATA INFILE 'file''" - 11 + // "CONCURRENT " - 7 + // LOCAL - 9 + // " REPLACE or IGNORE " - 13 + table_name_len*2 + // "INTO TABLE `table`" - 21 + sql_ex.field_term_len*4 + 2 + // " FIELDS TERMINATED BY 'str'" - 23 + sql_ex.enclosed_len*4 + 2 + // " OPTIONALLY ENCLOSED BY 'str'" - 12 + sql_ex.escaped_len*4 + 2 + // " ESCAPED BY 'str'" - 21 + sql_ex.line_term_len*4 + 2 + // " LINES TERMINATED BY 'str'" - 19 + sql_ex.line_start_len*4 + 2 + // " LINES STARTING BY 'str'" - 15 + 22 + // " IGNORE xxx LINES" - 3 + (num_fields-1)*2 + field_block_len; // " (field1, field2, ...)" -} - - -void Load_log_event::print_query(bool need_db, const char *cs, char *buf, - char **end, char **fn_start, char **fn_end) -{ - char *pos= buf; - if (need_db && db && db_len) { - pos= strmov(pos, "use `"); - memcpy(pos, db, db_len); - pos= strmov(pos+db_len, "`; "); + buf->append(STRING_WITH_LEN("use ")); + append_identifier(thd, buf, db, db_len); + buf->append(STRING_WITH_LEN("; ")); } - pos= strmov(pos, "LOAD DATA "); + buf->append(STRING_WITH_LEN("LOAD DATA ")); if (is_concurrent) - pos= strmov(pos, "CONCURRENT "); + buf->append(STRING_WITH_LEN("CONCURRENT ")); if (fn_start) - *fn_start= pos; + *fn_start= buf->length(); if (check_fname_outside_temp_buf()) - pos= strmov(pos, "LOCAL "); - pos= strmov(pos, "INFILE '"); - memcpy(pos, fname, fname_len); - pos= strmov(pos+fname_len, "' "); + buf->append(STRING_WITH_LEN("LOCAL ")); + buf->append(STRING_WITH_LEN("INFILE '")); + buf->append_for_single_quote(fname, fname_len); + buf->append(STRING_WITH_LEN("' ")); if (sql_ex.opt_flags & REPLACE_FLAG) - pos= strmov(pos, "REPLACE "); + buf->append(STRING_WITH_LEN("REPLACE ")); else if (sql_ex.opt_flags & IGNORE_FLAG) - pos= strmov(pos, "IGNORE "); + buf->append(STRING_WITH_LEN("IGNORE ")); - pos= strmov(pos ,"INTO"); + buf->append(STRING_WITH_LEN("INTO")); if (fn_end) - *fn_end= pos; + *fn_end= buf->length(); - pos= strmov(pos ," TABLE `"); - memcpy(pos, table_name, table_name_len); - pos+= table_name_len; + buf->append(STRING_WITH_LEN(" TABLE ")); + if (qualify_db) + { + append_identifier(thd, buf, qualify_db, strlen(qualify_db)); + buf->append(STRING_WITH_LEN(".")); + } + append_identifier(thd, buf, table_name, table_name_len); if (cs != NULL) { - pos= strmov(pos ,"` CHARACTER SET "); - pos= strmov(pos , cs); + buf->append(STRING_WITH_LEN(" CHARACTER SET ")); + buf->append(cs, strlen(cs)); } - else - pos= strmov(pos, "`"); /* We have to create all optional fields as the default is not empty */ - pos= strmov(pos, " FIELDS TERMINATED BY "); - pos= pretty_print_str(pos, sql_ex.field_term, sql_ex.field_term_len); + buf->append(STRING_WITH_LEN(" FIELDS TERMINATED BY ")); + pretty_print_str(buf, sql_ex.field_term, sql_ex.field_term_len); if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG) - pos= strmov(pos, " OPTIONALLY "); - pos= strmov(pos, " ENCLOSED BY "); - pos= pretty_print_str(pos, sql_ex.enclosed, sql_ex.enclosed_len); + buf->append(STRING_WITH_LEN(" OPTIONALLY ")); + buf->append(STRING_WITH_LEN(" ENCLOSED BY ")); + pretty_print_str(buf, sql_ex.enclosed, sql_ex.enclosed_len); - pos= strmov(pos, " ESCAPED BY "); - pos= pretty_print_str(pos, sql_ex.escaped, sql_ex.escaped_len); + buf->append(STRING_WITH_LEN(" ESCAPED BY ")); + pretty_print_str(buf, sql_ex.escaped, sql_ex.escaped_len); - pos= strmov(pos, " LINES TERMINATED BY "); - pos= pretty_print_str(pos, sql_ex.line_term, sql_ex.line_term_len); + buf->append(STRING_WITH_LEN(" LINES TERMINATED BY ")); + pretty_print_str(buf, sql_ex.line_term, sql_ex.line_term_len); if (sql_ex.line_start_len) { - pos= strmov(pos, " STARTING BY "); - pos= pretty_print_str(pos, sql_ex.line_start, sql_ex.line_start_len); + buf->append(STRING_WITH_LEN(" STARTING BY ")); + pretty_print_str(buf, sql_ex.line_start, sql_ex.line_start_len); } if ((long) skip_lines > 0) { - pos= strmov(pos, " IGNORE "); - pos= longlong10_to_str((longlong) skip_lines, pos, 10); - pos= strmov(pos," LINES "); + buf->append(STRING_WITH_LEN(" IGNORE ")); + buf->append_ulonglong(skip_lines); + buf->append(STRING_WITH_LEN(" LINES ")); } if (num_fields) { uint i; const char *field= fields; - pos= strmov(pos, " ("); + buf->append(STRING_WITH_LEN(" (")); for (i = 0; i < num_fields; i++) { if (i) { - *pos++= ' '; - *pos++= ','; + /* + Yes, the space and comma is reversed here. But this is mostly dead + code, at most used when reading really old binlogs from old servers, + so better just leave it as is... + */ + buf->append(STRING_WITH_LEN(" ,")); } - memcpy(pos, field, field_lens[i]); - pos+= field_lens[i]; + append_identifier(thd, buf, field, field_lens[i]); field+= field_lens[i] + 1; } - *pos++= ')'; + buf->append(STRING_WITH_LEN(")")); } - - *end= pos; } -void Load_log_event::pack_info(Protocol *protocol) +void Load_log_event::pack_info(THD *thd, Protocol *protocol) { - char *buf, *end; + char query_buffer[1024]; + String query_str(query_buffer, sizeof(query_buffer), system_charset_info); - if (!(buf= (char*) my_malloc(get_query_buffer_length(), MYF(MY_WME)))) - return; - print_query(TRUE, NULL, buf, &end, 0, 0); - protocol->store(buf, end-buf, &my_charset_bin); - my_free(buf); + query_str.length(0); + print_query(thd, TRUE, NULL, &query_str, 0, 0, NULL); + protocol->store(query_str.ptr(), query_str.length(), &my_charset_bin); } #endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */ @@ -5253,6 +5416,8 @@ int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli, bool use_rli_only_for_errors) { LEX_STRING new_db; + DBUG_ENTER("Load_log_event::do_apply_event"); + new_db.length= db_len; new_db.str= (char *) rpl_filter->get_rewrite_db(db, &new_db.length); thd->set_db(new_db.str, new_db.length); @@ -5327,16 +5492,20 @@ int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli, else { char llbuff[22]; - char *end; enum enum_duplicates handle_dup; bool ignore= 0; + char query_buffer[1024]; + String query_str(query_buffer, sizeof(query_buffer), system_charset_info); char *load_data_query; + query_str.length(0); /* Forge LOAD DATA INFILE query which will be used in SHOW PROCESS LIST and written to slave's binlog if binlogging is on. */ - if (!(load_data_query= (char *)thd->alloc(get_query_buffer_length() + 1))) + print_query(thd, FALSE, NULL, &query_str, NULL, NULL, NULL); + if (!(load_data_query= (char *)thd->strmake(query_str.ptr(), + query_str.length()))) { /* This will set thd->fatal_error in case of OOM. So we surely will notice @@ -5345,9 +5514,7 @@ int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli, goto error; } - print_query(FALSE, NULL, load_data_query, &end, NULL, NULL); - *end= 0; - thd->set_query(load_data_query, (uint) (end - load_data_query)); + thd->set_query(load_data_query, (uint) (query_str.length())); if (sql_ex.opt_flags & REPLACE_FLAG) handle_dup= DUP_REPLACE; @@ -5493,7 +5660,7 @@ error: Error '%s' running LOAD DATA INFILE on table '%s'. Default database: '%s'", err, (char*)table_name, print_slave_db_safe(remember_db)); free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC)); - return 1; + DBUG_RETURN(1); } free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC)); @@ -5508,10 +5675,10 @@ Error '%s' running LOAD DATA INFILE on table '%s'. Default database: '%s'", rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, ER(ER_SLAVE_FATAL_ERROR), buf); - return 1; + DBUG_RETURN(1); } - return ( use_rli_only_for_errors ? 0 : Log_event::do_apply_event(rli) ); + DBUG_RETURN( use_rli_only_for_errors ? 0 : Log_event::do_apply_event(rli) ); } #endif @@ -5525,7 +5692,7 @@ Error '%s' running LOAD DATA INFILE on table '%s'. Default database: '%s'", */ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -void Rotate_log_event::pack_info(Protocol *protocol) +void Rotate_log_event::pack_info(THD *thd, Protocol *protocol) { char buf1[256], buf[22]; String tmp(buf1, sizeof(buf1), log_cs); @@ -5735,6 +5902,86 @@ Rotate_log_event::do_shall_skip(Relay_log_info *rli) /************************************************************************** + Binlog_checkpoint_log_event methods +**************************************************************************/ + +#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) +void Binlog_checkpoint_log_event::pack_info(THD *thd, Protocol *protocol) +{ + protocol->store(binlog_file_name, binlog_file_len, &my_charset_bin); +} +#endif + + +#ifdef MYSQL_CLIENT +void Binlog_checkpoint_log_event::print(FILE *file, + PRINT_EVENT_INFO *print_event_info) +{ + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); + + if (print_event_info->short_form) + return; + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tBinlog checkpoint "); + my_b_write(&cache, (uchar*)binlog_file_name, binlog_file_len); + my_b_printf(&cache, "\n"); +} +#endif /* MYSQL_CLIENT */ + + +#ifdef MYSQL_SERVER +Binlog_checkpoint_log_event::Binlog_checkpoint_log_event( + const char *binlog_file_name_arg, + uint binlog_file_len_arg) + :Log_event(), + binlog_file_name(my_strndup(binlog_file_name_arg, binlog_file_len_arg, + MYF(MY_WME))), + binlog_file_len(binlog_file_len_arg) +{ + cache_type= EVENT_NO_CACHE; +} +#endif /* MYSQL_SERVER */ + + +Binlog_checkpoint_log_event::Binlog_checkpoint_log_event( + const char *buf, uint event_len, + const Format_description_log_event *description_event) + :Log_event(buf, description_event), binlog_file_name(0) +{ + uint8 header_size= description_event->common_header_len; + uint8 post_header_len= + description_event->post_header_len[BINLOG_CHECKPOINT_EVENT-1]; + if (event_len < header_size + post_header_len || + post_header_len < BINLOG_CHECKPOINT_HEADER_LEN) + return; + buf+= header_size; + /* See uint4korr and int4store below */ + compile_time_assert(BINLOG_CHECKPOINT_HEADER_LEN == 4); + binlog_file_len= uint4korr(buf); + if (event_len - (header_size + post_header_len) < binlog_file_len) + return; + binlog_file_name= my_strndup(buf + post_header_len, binlog_file_len, + MYF(MY_WME)); + return; +} + + +#ifndef MYSQL_CLIENT +bool Binlog_checkpoint_log_event::write(IO_CACHE *file) +{ + uchar buf[BINLOG_CHECKPOINT_HEADER_LEN]; + int4store(buf, binlog_file_len); + return write_header(file, BINLOG_CHECKPOINT_HEADER_LEN + binlog_file_len) || + wrapper_my_b_safe_write(file, buf, BINLOG_CHECKPOINT_HEADER_LEN) || + wrapper_my_b_safe_write(file, (const uchar *)binlog_file_name, + binlog_file_len) || + write_footer(file); +} +#endif /* MYSQL_CLIENT */ + + +/************************************************************************** Intvar_log_event methods **************************************************************************/ @@ -5743,7 +5990,7 @@ Rotate_log_event::do_shall_skip(Relay_log_info *rli) */ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -void Intvar_log_event::pack_info(Protocol *protocol) +void Intvar_log_event::pack_info(THD *thd, Protocol *protocol) { char buf[256], *pos; pos= strmake(buf, get_var_type_name(), sizeof(buf)-23); @@ -5897,7 +6144,7 @@ Intvar_log_event::do_shall_skip(Relay_log_info *rli) **************************************************************************/ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -void Rand_log_event::pack_info(Protocol *protocol) +void Rand_log_event::pack_info(THD *thd, Protocol *protocol) { char buf1[256], *pos; pos= strmov(buf1,"rand_seed1="); @@ -6022,7 +6269,7 @@ bool slave_execute_deferred_events(THD *thd) **************************************************************************/ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -void Xid_log_event::pack_info(Protocol *protocol) +void Xid_log_event::pack_info(THD *thd, Protocol *protocol) { char buf[128], *pos; pos= strmov(buf, "COMMIT /* xid="); @@ -6119,84 +6366,117 @@ Xid_log_event::do_shall_skip(Relay_log_info *rli) **************************************************************************/ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -void User_var_log_event::pack_info(Protocol* protocol) +static bool +user_var_append_name_part(THD *thd, String *buf, + const char *name, size_t name_len) { - char *buf= 0; - uint val_offset= 4 + name_len; - uint event_len= val_offset; + return buf->append("@") || + append_identifier(thd, buf, name, name_len) || + buf->append("="); +} +void User_var_log_event::pack_info(THD *thd, Protocol* protocol) +{ if (is_null) { - if (!(buf= (char*) my_malloc(val_offset + 5, MYF(MY_WME)))) + char buf_mem[FN_REFLEN+7]; + String buf(buf_mem, sizeof(buf_mem), system_charset_info); + buf.length(0); + if (user_var_append_name_part(thd, &buf, name, name_len) || + buf.append("NULL")) return; - strmov(buf + val_offset, "NULL"); - event_len= val_offset + 4; + protocol->store(buf.ptr(), buf.length(), &my_charset_bin); } else { switch (type) { case REAL_RESULT: + { double real_val; + char buf2[MY_GCVT_MAX_FIELD_WIDTH+1]; + char buf_mem[FN_REFLEN + MY_GCVT_MAX_FIELD_WIDTH + 1]; + String buf(buf_mem, sizeof(buf_mem), system_charset_info); float8get(real_val, val); - if (!(buf= (char*) my_malloc(val_offset + MY_GCVT_MAX_FIELD_WIDTH + 1, - MYF(MY_WME)))) + buf.length(0); + if (user_var_append_name_part(thd, &buf, name, name_len) || + buf.append(buf2, my_gcvt(real_val, MY_GCVT_ARG_DOUBLE, + MY_GCVT_MAX_FIELD_WIDTH, buf2, NULL))) return; - event_len+= my_gcvt(real_val, MY_GCVT_ARG_DOUBLE, MY_GCVT_MAX_FIELD_WIDTH, - buf + val_offset, NULL); + protocol->store(buf.ptr(), buf.length(), &my_charset_bin); break; + } case INT_RESULT: - if (!(buf= (char*) my_malloc(val_offset + 22, MYF(MY_WME)))) + { + char buf2[22]; + char buf_mem[FN_REFLEN + 22]; + String buf(buf_mem, sizeof(buf_mem), system_charset_info); + buf.length(0); + if (user_var_append_name_part(thd, &buf, name, name_len) || + buf.append(buf2, + longlong10_to_str(uint8korr(val), buf2, + ((flags & User_var_log_event::UNSIGNED_F) ? 10 : -10))-buf2)) return; - event_len= longlong10_to_str(uint8korr(val), buf + val_offset, - ((flags & User_var_log_event::UNSIGNED_F) ? - 10 : -10))-buf; + protocol->store(buf.ptr(), buf.length(), &my_charset_bin); break; + } case DECIMAL_RESULT: { - if (!(buf= (char*) my_malloc(val_offset + DECIMAL_MAX_STR_LENGTH, - MYF(MY_WME)))) - return; - String str(buf+val_offset, DECIMAL_MAX_STR_LENGTH, &my_charset_bin); + char buf_mem[FN_REFLEN + DECIMAL_MAX_STR_LENGTH]; + String buf(buf_mem, sizeof(buf_mem), system_charset_info); + char buf2[DECIMAL_MAX_STR_LENGTH+1]; + String str(buf2, sizeof(buf2), &my_charset_bin); my_decimal dec; + buf.length(0); binary2my_decimal(E_DEC_FATAL_ERROR, (uchar*) (val+2), &dec, val[0], val[1]); my_decimal2string(E_DEC_FATAL_ERROR, &dec, 0, 0, 0, &str); - event_len= str.length() + val_offset; + if (user_var_append_name_part(thd, &buf, name, name_len) || + buf.append(buf2)) + return; + protocol->store(buf.ptr(), buf.length(), &my_charset_bin); break; - } + } case STRING_RESULT: + { /* 15 is for 'COLLATE' and other chars */ - buf= (char*) my_malloc(event_len+val_len*2+1+2*MY_CS_NAME_SIZE+15, - MYF(MY_WME)); + char buf_mem[FN_REFLEN + 512 + 1 + 2*MY_CS_NAME_SIZE+15]; + String buf(buf_mem, sizeof(buf_mem), system_charset_info); CHARSET_INFO *cs; - if (!buf) - return; + buf.length(0); if (!(cs= get_charset(charset_number, MYF(0)))) { - strmov(buf+val_offset, "???"); - event_len+= 3; + if (buf.append("???")) + return; } else { - char *p= strxmov(buf + val_offset, "_", cs->csname, " ", NullS); - p= str_to_hex(p, val, val_len); - p= strxmov(p, " COLLATE ", cs->name, NullS); - event_len= p-buf; + size_t old_len; + char *beg, *end; + if (user_var_append_name_part(thd, &buf, name, name_len) || + buf.append("_") || + buf.append(cs->csname) || + buf.append(" ")) + return; + old_len= buf.length(); + if (buf.reserve(old_len + val_len*2 + 2 + sizeof(" COLLATE ") + + MY_CS_NAME_SIZE)) + return; + beg= const_cast<char *>(buf.ptr()) + old_len; + end= str_to_hex(beg, val, val_len); + buf.length(old_len + (end - beg)); + if (buf.append(" COLLATE ") || + buf.append(cs->name)) + return; } + protocol->store(buf.ptr(), buf.length(), &my_charset_bin); break; + } case ROW_RESULT: default: DBUG_ASSERT(0); return; } } - buf[0]= '@'; - buf[1]= '`'; - memcpy(buf+2, name, name_len); - buf[2+name_len]= '`'; - buf[3+name_len]= '='; - protocol->store(buf, event_len, &my_charset_bin); - my_free(buf); } #endif /* !MYSQL_CLIENT */ @@ -6205,6 +6485,9 @@ User_var_log_event:: User_var_log_event(const char* buf, const Format_description_log_event* description_event) :Log_event(buf, description_event) +#ifndef MYSQL_CLIENT + , deferred(false) +#endif { /* The Post-Header is empty. The Variable Data part begins immediately. */ const char *start= buf; @@ -6351,9 +6634,8 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) my_b_printf(&cache, "\tUser_var\n"); } - my_b_printf(&cache, "SET @`"); - my_b_write(&cache, (uchar*) name, (uint) (name_len)); - my_b_printf(&cache, "`"); + my_b_printf(&cache, "SET @"); + my_b_write_backtick_quote(&cache, name, name_len); if (is_null) { @@ -6452,12 +6734,16 @@ int User_var_log_event::do_apply_event(Relay_log_info const *rli) { Item *it= 0; CHARSET_INFO *charset; + DBUG_ENTER("User_var_log_event::do_apply_event"); if (rli->deferred_events_collecting) - return rli->deferred_events->add(this); + { + set_deferred(); + DBUG_RETURN(rli->deferred_events->add(this)); + } if (!(charset= get_charset(charset_number, MYF(MY_WME)))) - return 1; + DBUG_RETURN(1); LEX_STRING user_var_name; user_var_name.str= name; user_var_name.length= name_len; @@ -6503,10 +6789,11 @@ int User_var_log_event::do_apply_event(Relay_log_info const *rli) case ROW_RESULT: default: DBUG_ASSERT(0); - return 0; + DBUG_RETURN(0); } } - Item_func_set_user_var e(user_var_name, it); + + Item_func_set_user_var *e= new Item_func_set_user_var(user_var_name, it); /* Item_func_set_user_var can't substitute something else on its place => 0 can be passed as last argument (reference on item) @@ -6515,19 +6802,20 @@ int User_var_log_event::do_apply_event(Relay_log_info const *rli) crash the server, so if fix fields fails, we just return with an error. */ - if (e.fix_fields(thd, 0)) - return 1; + if (e->fix_fields(thd, 0)) + DBUG_RETURN(1); /* A variable can just be considered as a table with a single record and with a single column. Thus, like a column value, it could always have IMPLICIT derivation. */ - e.update_hash(val, val_len, type, charset, DERIVATION_IMPLICIT, - (flags & User_var_log_event::UNSIGNED_F)); - free_root(thd->mem_root,0); + e->update_hash(val, val_len, type, charset, DERIVATION_IMPLICIT, + (flags & User_var_log_event::UNSIGNED_F)); + if (!is_deferred()) + free_root(thd->mem_root, 0); - return 0; + DBUG_RETURN(0); } int User_var_log_event::do_update_pos(Relay_log_info *rli) @@ -6570,7 +6858,7 @@ void Unknown_log_event::print(FILE* file_arg, PRINT_EVENT_INFO* print_event_info #endif #ifndef MYSQL_CLIENT -void Slave_log_event::pack_info(Protocol *protocol) +void Slave_log_event::pack_info(THD *thd, Protocol *protocol) { char buf[256+HOSTNAME_LENGTH], *pos; pos= strmov(buf, "host="); @@ -6922,11 +7210,18 @@ void Create_file_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info { Load_log_event::print(file, print_event_info, !check_fname_outside_temp_buf()); - /* - That one is for "file_id: etc" below: in mysqlbinlog we want the #, in - SHOW BINLOG EVENTS we don't. - */ - my_b_printf(&cache, "#"); + /** + reduce the size of io cache so that the write function is called + for every call to my_b_printf(). + */ + DBUG_EXECUTE_IF ("simulate_create_event_write_error", + {(&cache)->write_pos= (&cache)->write_end; + DBUG_SET("+d,simulate_file_write_error");}); + /* + That one is for "file_id: etc" below: in mysqlbinlog we want the #, in + SHOW BINLOG EVENTS we don't. + */ + my_b_printf(&cache, "#"); } my_b_printf(&cache, " file_id: %d block_len: %d\n", file_id, block_len); @@ -6945,7 +7240,7 @@ void Create_file_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info */ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -void Create_file_log_event::pack_info(Protocol *protocol) +void Create_file_log_event::pack_info(THD *thd, Protocol *protocol) { char buf[SAFE_NAME_LEN*2 + 30 + 21*2], *pos; pos= strmov(buf, "db="); @@ -7129,7 +7424,7 @@ void Append_block_log_event::print(FILE* file, */ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -void Append_block_log_event::pack_info(Protocol *protocol) +void Append_block_log_event::pack_info(THD *thd, Protocol *protocol) { char buf[256]; uint length; @@ -7284,7 +7579,7 @@ void Delete_file_log_event::print(FILE* file, */ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -void Delete_file_log_event::pack_info(Protocol *protocol) +void Delete_file_log_event::pack_info(THD *thd, Protocol *protocol) { char buf[64]; uint length; @@ -7383,7 +7678,7 @@ void Execute_load_log_event::print(FILE* file, */ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -void Execute_load_log_event::pack_info(Protocol *protocol) +void Execute_load_log_event::pack_info(THD *thd, Protocol *protocol) { char buf[64]; uint length; @@ -7612,6 +7907,13 @@ void Execute_load_query_log_event::print(FILE* file, Write_on_release_cache cache(&print_event_info->head_cache, file); print_query_header(&cache, print_event_info); + /** + reduce the size of io cache so that the write function is called + for every call to my_b_printf(). + */ + DBUG_EXECUTE_IF ("simulate_execute_event_write_error", + {(&cache)->write_pos= (&cache)->write_end; + DBUG_SET("+d,simulate_file_write_error");}); if (local_fname) { @@ -7638,27 +7940,24 @@ void Execute_load_query_log_event::print(FILE* file, #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -void Execute_load_query_log_event::pack_info(Protocol *protocol) +void Execute_load_query_log_event::pack_info(THD *thd, Protocol *protocol) { - char *buf, *pos; - if (!(buf= (char*) my_malloc(9 + db_len + q_len + 10 + 21, MYF(MY_WME)))) - return; - pos= buf; + char buf_mem[1024]; + String buf(buf_mem, sizeof(buf_mem), system_charset_info); + buf.real_alloc(9 + db_len + q_len + 10 + 21); if (db && db_len) { - pos= strmov(buf, "use `"); - memcpy(pos, db, db_len); - pos= strmov(pos+db_len, "`; "); - } - if (query && q_len) - { - memcpy(pos, query, q_len); - pos+= q_len; + if (buf.append("use ") || + append_identifier(thd, &buf, db, db_len) || + buf.append("; ")) + return; } - pos= strmov(pos, " ;file_id="); - pos= int10_to_str((long) file_id, pos, 10); - protocol->store(buf, pos-buf, &my_charset_bin); - my_free(buf); + if (query && q_len && buf.append(query, q_len)) + return; + if (buf.append(" ;file_id=") || + buf.append_ulonglong(file_id)) + return; + protocol->store(buf.ptr(), buf.length(), &my_charset_bin); } @@ -8621,7 +8920,7 @@ bool Rows_log_event::write_data_body(IO_CACHE*file) #endif #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -void Rows_log_event::pack_info(Protocol *protocol) +void Rows_log_event::pack_info(THD *thd, Protocol *protocol) { char buf[256]; char const *const flagstr= @@ -8725,7 +9024,7 @@ bool Annotate_rows_log_event::write_data_body(IO_CACHE *file) #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) -void Annotate_rows_log_event::pack_info(Protocol* protocol) +void Annotate_rows_log_event::pack_info(THD *thd, Protocol* protocol) { if (m_query_txt && m_query_len) protocol->store(m_query_txt, m_query_len, &my_charset_bin); @@ -9469,7 +9768,7 @@ bool Table_map_log_event::write_data_body(IO_CACHE *file) */ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -void Table_map_log_event::pack_info(Protocol *protocol) +void Table_map_log_event::pack_info(THD *thd, Protocol *protocol) { char buf[256]; size_t bytes= my_snprintf(buf, sizeof(buf), @@ -9490,7 +9789,7 @@ void Table_map_log_event::print(FILE *, PRINT_EVENT_INFO *print_event_info) { print_header(&print_event_info->head_cache, print_event_info, TRUE); my_b_printf(&print_event_info->head_cache, - "\tTable_map: `%s`.`%s` mapped to number %lu\n", + "\tTable_map: %`s.%`s mapped to number %lu\n", m_dbnam, m_tblnam, m_table_id); print_base64(&print_event_info->body_cache, print_event_info, TRUE); } @@ -10829,7 +11128,7 @@ Incident_log_event::description() const #ifndef MYSQL_CLIENT -void Incident_log_event::pack_info(Protocol *protocol) +void Incident_log_event::pack_info(THD *thd, Protocol *protocol) { char buf[256]; size_t bytes; @@ -10950,6 +11249,9 @@ Heartbeat_log_event::Heartbeat_log_event(const char* buf, uint event_len, There is a dummy replacement for this in the embedded library that returns FALSE; this is used by XtraDB to allow it to access replication stuff while still being able to use the same plugin in both stand-alone and embedded. + + In this function it's ok to use active_mi, as this is only called for + the main replication server. */ bool rpl_get_position_info(const char **log_file_name, ulonglong *log_pos, const char **group_relay_log_name, |