diff options
author | Sergei Golubchik <sergii@pisem.net> | 2011-10-19 21:45:18 +0200 |
---|---|---|
committer | Sergei Golubchik <sergii@pisem.net> | 2011-10-19 21:45:18 +0200 |
commit | 76f0b94bb0b2994d639353530c5b251d0f1a204b (patch) | |
tree | 9ed50628aac34f89a37637bab2fc4915b86b5eb4 /sql/log_event.cc | |
parent | 4e46d8e5bff140f2549841167dc4b65a3c0a645d (diff) | |
parent | 5dc1a2231f55bacc9aaf0e24816f3d9c2ee1f21d (diff) | |
download | mariadb-git-76f0b94bb0b2994d639353530c5b251d0f1a204b.tar.gz |
merge with 5.3
sql/sql_insert.cc:
CREATE ... IF NOT EXISTS may do nothing, but
it is still not a failure. don't forget to my_ok it.
******
CREATE ... IF NOT EXISTS may do nothing, but
it is still not a failure. don't forget to my_ok it.
sql/sql_table.cc:
small cleanup
******
small cleanup
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r-- | sql/log_event.cc | 994 |
1 files changed, 849 insertions, 145 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index 70087ed4da3..49383778b58 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -51,6 +51,31 @@ #include <my_bitmap.h> #include "rpl_utility.h" + +/** + BINLOG_CHECKSUM variable. +*/ +const char *binlog_checksum_type_names[]= { + "NONE", + "CRC32", + NullS +}; + +unsigned int binlog_checksum_type_length[]= { + sizeof("NONE") - 1, + sizeof("CRC32") - 1, + 0 +}; + +TYPELIB binlog_checksum_typelib= +{ + array_elements(binlog_checksum_type_names) - 1, "", + binlog_checksum_type_names, + binlog_checksum_type_length +}; + + + #define log_cs &my_charset_latin1 #define FLAGSTR(V,F) ((V)&(F)?#F" ":"") @@ -64,6 +89,24 @@ */ #define FMT_G_BUFSIZE(PREC) (3 + (PREC) + 5 + 1) +/* + replication event checksum is introduced in the following "checksum-home" version. + The checksum-aware servers extract FD's version to decide whether the FD event + carries checksum info. + + TODO: correct the constant when it has been determined + (which main tree to push and when) +*/ +const uchar checksum_version_split_mysql[3]= {5, 6, 1}; +const ulong checksum_version_product_mysql= + (checksum_version_split_mysql[0] * 256 + + checksum_version_split_mysql[1]) * 256 + + checksum_version_split_mysql[2]; +const uchar checksum_version_split_mariadb[3]= {5, 3, 0}; +const ulong checksum_version_product_mariadb= + (checksum_version_split_mariadb[0] * 256 + + checksum_version_split_mariadb[1]) * 256 + + checksum_version_split_mariadb[2]; #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD* thd); @@ -587,7 +630,7 @@ append_query_string(CHARSET_INFO *csinfo, if (to->reserve(orig_len + from->length()*2+3)) return 1; - beg= to->c_ptr_quick() + to->length(); + beg= (char*) to->ptr() + to->length(); ptr= beg; if (csinfo->escape_with_backslash_is_dangerous) ptr= str_to_hex(ptr, from->ptr(), from->length()); @@ -624,7 +667,6 @@ static void print_set_option(IO_CACHE* file, uint32 
bits_changed, } } #endif - /************************************************************************** Log_event methods (= the parent class of all events) **************************************************************************/ @@ -663,6 +705,7 @@ const char* Log_event::get_type_str(Log_event_type type) case BEGIN_LOAD_QUERY_EVENT: return "Begin_load_query"; case EXECUTE_LOAD_QUERY_EVENT: return "Execute_load_query"; case INCIDENT_EVENT: return "Incident"; + case ANNOTATE_ROWS_EVENT: return "Annotate_rows"; default: return "Unknown"; /* impossible */ } } @@ -680,10 +723,12 @@ const char* Log_event::get_type_str() #ifndef MYSQL_CLIENT Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) :log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg), - cache_type(Log_event::EVENT_INVALID_CACHE), thd(thd_arg) + crc(0), thd(thd_arg), + checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF) { server_id= thd->server_id; - when= thd->start_time; + when= thd->start_time; + when_sec_part=thd->start_time_sec_part; if (using_trans) cache_type= Log_event::EVENT_TRANSACTIONAL_CACHE; @@ -700,14 +745,16 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) Log_event::Log_event() :temp_buf(0), exec_time(0), flags(0), - cache_type(Log_event::EVENT_INVALID_CACHE), thd(0) + cache_type(Log_event::EVENT_INVALID_CACHE), crc(0), + thd(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF) { server_id= ::server_id; /* We can't call my_time() here as this would cause a call before my_init() is called */ - when= 0; + when= 0; + when_sec_part=0; log_pos= 0; } #endif /* !MYSQL_CLIENT */ @@ -719,12 +766,14 @@ Log_event::Log_event() Log_event::Log_event(const char* buf, const Format_description_log_event* description_event) - :temp_buf(0), cache_type(Log_event::EVENT_INVALID_CACHE) + :temp_buf(0), cache_type(Log_event::EVENT_INVALID_CACHE), + crc(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF) { #ifndef MYSQL_CLIENT thd = 0; #endif when = uint4korr(buf); + when_sec_part= 0; server_id = 
uint4korr(buf + SERVER_ID_OFFSET); data_written= uint4korr(buf + EVENT_LEN_OFFSET); if (description_event->binlog_version==1) @@ -746,7 +795,7 @@ Log_event::Log_event(const char* buf, logs are in 4.0 format, until it finds a Format_desc). */ if (description_event->binlog_version==3 && - buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos) + (uchar)buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos) { /* If log_pos=0, don't change it. log_pos==0 is a marker to mean @@ -764,8 +813,8 @@ Log_event::Log_event(const char* buf, DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos)); flags= uint2korr(buf + FLAGS_OFFSET); - if ((buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) || - (buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT)) + if (((uchar)buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) || + ((uchar)buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT)) { /* These events always have a header which stops here (i.e. their @@ -813,21 +862,13 @@ int Log_event::do_update_pos(Relay_log_info *rli) DBUG_EXECUTE_IF("let_first_flush_log_change_timestamp", if (debug_not_change_ts_if_art_event == 1 && is_artificial_event()) - { - debug_not_change_ts_if_art_event= 0; - }); -#ifndef DBUG_OFF - rli->stmt_done(log_pos, - is_artificial_event() && - debug_not_change_ts_if_art_event > 0 ? 0 : when); -#else - rli->stmt_done(log_pos, is_artificial_event()? 0 : when); -#endif + debug_not_change_ts_if_art_event= 0; ); + rli->stmt_done(log_pos, is_artificial_event() && + IF_DBUG(debug_not_change_ts_if_art_event > 0, 1) ? + 0 : when); DBUG_EXECUTE_IF("let_first_flush_log_change_timestamp", if (debug_not_change_ts_if_art_event == 0) - { - debug_not_change_ts_if_art_event= 2; - }); + debug_not_change_ts_if_art_event= 2; ); } return 0; // Cannot fail currently } @@ -904,6 +945,105 @@ void Log_event::init_show_field_list(List<Item>* field_list) field_list->push_back(new Item_empty_string("Info", 20)); } +/** + A decider of whether to trigger checksum computation or not. 
+ To be invoked in Log_event::write() stack. + The decision is positive + + S,M) if it's been marked for checksumming with @c checksum_alg + + M) otherwise, if @@global.binlog_checksum is not NONE and the event is + directly written to the binlog file. + The to-be-cached event decides at @c write_cache() time. + + Otherwise the decision is negative. + + @note A side effect of the method is altering Log_event::checksum_alg + if the latter was undefined at calling. + + @return true (positive) or false (negative) +*/ +my_bool Log_event::need_checksum() +{ + DBUG_ENTER("Log_event::need_checksum"); + my_bool ret; + /* + few callers of Log_event::write + (incl FD::write, FD constructing code on the slave side, Rotate relay log + and Stop event) + provide their checksum alg preference through Log_event::checksum_alg. + */ + ret= ((checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ? + (checksum_alg != BINLOG_CHECKSUM_ALG_OFF) : + ((binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF) && + (cache_type == Log_event::EVENT_NO_CACHE)) ? + test(binlog_checksum_options) : FALSE); + + /* + FD calls the methods before data_written has been calculated. + The following invariant claims if the current is not the first + call (and therefore data_written is not zero) then `ret' must be + TRUE. It may not be null because FD is always checksummed. + */ + + DBUG_ASSERT(get_type_code() != FORMAT_DESCRIPTION_EVENT || ret || + data_written == 0); + + if (checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF) + checksum_alg= ret ? // calculated value stored + (uint8) binlog_checksum_options : (uint8) BINLOG_CHECKSUM_ALG_OFF; + + DBUG_ASSERT(!ret || + ((checksum_alg == binlog_checksum_options || + /* + Stop event closes the relay-log and its checksum alg + preference is set by the caller can be different + from the server's binlog_checksum_options. + */ + get_type_code() == STOP_EVENT || + /* + Rotate:s can be checksummed regardless of the server's + binlog_checksum_options. 
That applies to both + the local RL's Rotate and the master's Rotate + which IO thread instantiates via queue_binlog_ver_3_event. + */ + get_type_code() == ROTATE_EVENT + || /* FD is always checksummed */ + get_type_code() == FORMAT_DESCRIPTION_EVENT) && + checksum_alg != BINLOG_CHECKSUM_ALG_OFF)); + + DBUG_ASSERT(checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); + + DBUG_ASSERT(((get_type_code() != ROTATE_EVENT && + get_type_code() != STOP_EVENT) || + get_type_code() != FORMAT_DESCRIPTION_EVENT) || + cache_type == Log_event::EVENT_NO_CACHE); + + DBUG_RETURN(ret); +} + +bool Log_event::wrapper_my_b_safe_write(IO_CACHE* file, const uchar* buf, ulong size) +{ + if (need_checksum() && size != 0) + crc= my_checksum(crc, buf, size); + + return my_b_safe_write(file, buf, size); +} + +bool Log_event::write_footer(IO_CACHE* file) +{ + /* + footer contains the checksum-algorithm descriptor + followed by the checksum value + */ + if (need_checksum()) + { + uchar buf[BINLOG_CHECKSUM_LEN]; + int4store(buf, crc); + return (my_b_safe_write(file, (uchar*) buf, sizeof(buf))); + } + return 0; +} /* Log_event::write() @@ -913,11 +1053,18 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length) { uchar header[LOG_EVENT_HEADER_LEN]; ulong now; + bool ret; DBUG_ENTER("Log_event::write_header"); /* Store number of bytes that will be written by this event */ data_written= event_data_length + sizeof(header); + if (need_checksum()) + { + crc= my_checksum(0L, NULL, 0); + data_written += BINLOG_CHECKSUM_LEN; + } + /* log_pos != 0 if this is relay-log event. 
In this case we should not change the position @@ -962,7 +1109,7 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length) log_pos= my_b_safe_tell(file)+data_written; } - now= (ulong) get_time(); // Query start time + now= get_time(); // Query start time /* Header will be of size LOG_EVENT_HEADER_LEN for all events, except for @@ -976,9 +1123,36 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length) int4store(header+ SERVER_ID_OFFSET, server_id); int4store(header+ EVENT_LEN_OFFSET, data_written); int4store(header+ LOG_POS_OFFSET, log_pos); - int2store(header+ FLAGS_OFFSET, flags); - - DBUG_RETURN(my_b_safe_write(file, header, sizeof(header)) != 0); + /* + recording checksum of FD event computed with dropped + possibly active LOG_EVENT_BINLOG_IN_USE_F flag. + Similar step at verification: the active flag is dropped before + checksum computing. + */ + if (header[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT || + !need_checksum() || !(flags & LOG_EVENT_BINLOG_IN_USE_F)) + { + int2store(header+ FLAGS_OFFSET, flags); + ret= wrapper_my_b_safe_write(file, header, sizeof(header)) != 0; + } + else + { + ret= (wrapper_my_b_safe_write(file, header, FLAGS_OFFSET) != 0); + if (!ret) + { + flags &= ~LOG_EVENT_BINLOG_IN_USE_F; + int2store(header + FLAGS_OFFSET, flags); + crc= my_checksum(crc, header + FLAGS_OFFSET, sizeof(flags)); + flags |= LOG_EVENT_BINLOG_IN_USE_F; + int2store(header + FLAGS_OFFSET, flags); + ret= (my_b_safe_write(file, header + FLAGS_OFFSET, sizeof(flags)) != 0); + } + if (!ret) + ret= (wrapper_my_b_safe_write(file, header + FLAGS_OFFSET + sizeof(flags), + sizeof(header) + - (FLAGS_OFFSET + sizeof(flags))) != 0); + } + DBUG_RETURN( ret); } @@ -988,11 +1162,13 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length) */ int Log_event::read_log_event(IO_CACHE* file, String* packet, - mysql_mutex_t* log_lock) + mysql_mutex_t* log_lock, + uint8 checksum_alg_arg) { ulong data_len; int result=0; char 
buf[LOG_EVENT_MINIMAL_HEADER_LEN]; + uchar ev_offset= packet->length(); DBUG_ENTER("Log_event::read_log_event"); if (log_lock) @@ -1050,6 +1226,31 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet, (file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO)); /* Implicit goto end; */ } + else + { + /* Corrupt the event for Dump thread*/ + DBUG_EXECUTE_IF("corrupt_read_log_event2", + uchar *debug_event_buf_c = (uchar*) packet->ptr() + ev_offset; + if (debug_event_buf_c[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT) + { + int debug_cor_pos = rand() % (data_len + sizeof(buf) - BINLOG_CHECKSUM_LEN); + debug_event_buf_c[debug_cor_pos] =~ debug_event_buf_c[debug_cor_pos]; + DBUG_PRINT("info", ("Corrupt the event at Log_event::read_log_event: byte on position %d", debug_cor_pos)); + DBUG_SET("-d,corrupt_read_log_event2"); + } + ); + /* + CRC verification of the Dump thread + */ + if (opt_master_verify_checksum && + event_checksum_test((uchar*) packet->ptr() + ev_offset, + data_len + sizeof(buf), + checksum_alg_arg)) + { + result= LOG_READ_CHECKSUM_FAILURE; + goto end; + } + } } end: @@ -1075,11 +1276,13 @@ end: Log_event* Log_event::read_log_event(IO_CACHE* file, mysql_mutex_t* log_lock, const Format_description_log_event - *description_event) + *description_event, + my_bool crc_check) #else Log_event* Log_event::read_log_event(IO_CACHE* file, const Format_description_log_event - *description_event) + *description_event, + my_bool crc_check) #endif { DBUG_ENTER("Log_event::read_log_event"); @@ -1143,7 +1346,7 @@ failed my_b_read")); error = "read error"; goto err; } - if ((res= read_log_event(buf, data_len, &error, description_event))) + if ((res= read_log_event(buf, data_len, &error, description_event, crc_check))) res->register_temp_buf(buf, TRUE); err: @@ -1176,9 +1379,11 @@ err: Log_event* Log_event::read_log_event(const char* buf, uint event_len, const char **error, - const Format_description_log_event *description_event) + const Format_description_log_event 
*description_event, + my_bool crc_check) { Log_event* ev; + uint8 alg; DBUG_ENTER("Log_event::read_log_event(char*,...)"); DBUG_ASSERT(description_event != 0); DBUG_PRINT("info", ("binlog_version: %d", description_event->binlog_version)); @@ -1186,14 +1391,68 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, /* Check the integrity */ if (event_len < EVENT_LEN_OFFSET || - buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT || + (uchar)buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT || (uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET)) { *error="Sanity check failed"; // Needed to free buffer DBUG_RETURN(NULL); // general sanity check - will fail on a partial read } - uint event_type= buf[EVENT_TYPE_OFFSET]; + uint event_type= (uchar)buf[EVENT_TYPE_OFFSET]; + // all following START events in the current file are without checksum + if (event_type == START_EVENT_V3) + (const_cast< Format_description_log_event *>(description_event))->checksum_alg= BINLOG_CHECKSUM_ALG_OFF; + /* + CRC verification by SQL and Show-Binlog-Events master side. + The caller has to provide @description_event->checksum_alg to + be the last seen FD's (A) descriptor. + If event is FD the descriptor is in it. + Notice, FD of the binlog can be only in one instance and therefore + Show-Binlog-Events executing master side thread needs just to know + the only FD's (A) value - whereas RL can contain more. + In the RL case, the alg is kept in FD_e (@description_event) which is reset + to the newer read-out event after its execution with possibly new alg descriptor. + Therefore in a typical sequence of RL: + {FD_s^0, FD_m, E_m^1} E_m^1 + will be verified with (A) of FD_m. + + See legends definition on MYSQL_BIN_LOG::relay_log_checksum_alg docs + lines (log.h). + + Notice, a pre-checksum FD version forces alg := BINLOG_CHECKSUM_ALG_UNDEF. + */ + alg= (event_type != FORMAT_DESCRIPTION_EVENT) ? 
+ description_event->checksum_alg : get_checksum_alg(buf, event_len); + // Emulate the corruption during reading an event + DBUG_EXECUTE_IF("corrupt_read_log_event_char", + if (event_type != FORMAT_DESCRIPTION_EVENT) + { + char *debug_event_buf_c = (char *)buf; + int debug_cor_pos = rand() % (event_len - BINLOG_CHECKSUM_LEN); + debug_event_buf_c[debug_cor_pos] =~ debug_event_buf_c[debug_cor_pos]; + DBUG_PRINT("info", ("Corrupt the event at Log_event::read_log_event(char*,...): byte on position %d", debug_cor_pos)); + DBUG_SET("-d,corrupt_read_log_event_char"); + } + ); + if (crc_check && + event_checksum_test((uchar *) buf, event_len, alg)) + { +#ifdef MYSQL_CLIENT + *error= "Event crc check failed! Most likely there is event corruption."; + if (force_opt) + { + ev= new Unknown_log_event(buf, description_event); + DBUG_RETURN(ev); + } + else + DBUG_RETURN(NULL); +#else + *error= ER(ER_BINLOG_READ_EVENT_CHECKSUM_FAILURE); + sql_print_error("%s", ER(ER_BINLOG_READ_EVENT_CHECKSUM_FAILURE)); + DBUG_RETURN(NULL); +#endif + } + if (event_type > description_event->number_of_event_types && event_type != FORMAT_DESCRIPTION_EVENT) { @@ -1228,6 +1487,11 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, event_type= new_event_type; } + if (alg != BINLOG_CHECKSUM_ALG_UNDEF && + (event_type == FORMAT_DESCRIPTION_EVENT || + alg != BINLOG_CHECKSUM_ALG_OFF)) + event_len= event_len - BINLOG_CHECKSUM_LEN; + switch(event_type) { case QUERY_EVENT: ev = new Query_log_event(buf, event_len, description_event, QUERY_EVENT); @@ -1311,6 +1575,9 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, case INCIDENT_EVENT: ev = new Incident_log_event(buf, event_len, description_event); break; + case ANNOTATE_ROWS_EVENT: + ev = new Annotate_rows_log_event(buf, event_len, description_event); + break; default: DBUG_PRINT("error",("Unknown event code: %d", (int) buf[EVENT_TYPE_OFFSET])); @@ -1319,6 +1586,14 @@ Log_event* Log_event::read_log_event(const char* 
buf, uint event_len, } } + if (ev) + { + ev->checksum_alg= alg; + if (ev->checksum_alg != BINLOG_CHECKSUM_ALG_OFF && + ev->checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) + ev->crc= uint4korr(buf + (event_len)); + } + DBUG_PRINT("read_event", ("%s(type_code: %d; event_len: %d)", ev ? ev->get_type_str() : "<unknown>", buf[EVENT_TYPE_OFFSET], @@ -1373,6 +1648,18 @@ void Log_event::print_header(IO_CACHE* file, my_b_printf(file, " server id %lu end_log_pos %s ", (ulong) server_id, llstr(log_pos,llbuff)); + /* print the checksum */ + + if (checksum_alg != BINLOG_CHECKSUM_ALG_OFF && + checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) + { + char checksum_buf[BINLOG_CHECKSUM_LEN * 2 + 4]; // to fit to "0x%lx " + size_t const bytes_written= + my_snprintf(checksum_buf, sizeof(checksum_buf), "0x%08lx ", (ulong) crc); + my_b_printf(file, "%s ", get_type(&binlog_checksum_typelib, checksum_alg)); + my_b_printf(file, checksum_buf, bytes_written); + } + /* mysqlbinlog --hexdump */ if (print_event_info->hexdump_from) { @@ -1718,6 +2005,7 @@ log_event_print_value(IO_CACHE *file, const uchar *ptr, uint64 i64= uint8korr(ptr); /* YYYYMMDDhhmmss */ d= (ulong) (i64 / 1000000); t= (ulong) (i64 % 1000000); + my_b_printf(file, "%04d-%02d-%02d %02d:%02d:%02d", (int) (d / 10000), (int) (d % 10000) / 100, (int) (d % 100), (int) (t / 10000), (int) (t % 10000) / 100, (int) t % 100); @@ -2000,12 +2288,10 @@ end: delete td; } -#ifdef MYSQL_CLIENT void free_table_map_log_event(Table_map_log_event *event) { delete event; } -#endif void Log_event::print_base64(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info, @@ -2042,6 +2328,9 @@ void Log_event::print_base64(IO_CACHE* file, if (print_event_info->verbose) { Rows_log_event *ev= NULL; + if (checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF && + checksum_alg != BINLOG_CHECKSUM_ALG_OFF) + size-= BINLOG_CHECKSUM_LEN; // checksum is displayed through the header if (ptr[4] == TABLE_MAP_EVENT) { @@ -2085,15 +2374,11 @@ void Log_event::print_base64(IO_CACHE* file, void 
Log_event::print_timestamp(IO_CACHE* file, time_t* ts) { struct tm *res; + time_t my_when= when; DBUG_ENTER("Log_event::print_timestamp"); if (!ts) - ts = &when; -#ifdef MYSQL_SERVER // This is always false - struct tm tm_tmp; - localtime_r(ts,(res= &tm_tmp)); -#else + ts = &my_when; res=localtime(ts); -#endif my_b_printf(file,"%02d%02d%02d %2d:%02d:%02d", res->tm_year % 100, @@ -2378,6 +2663,14 @@ bool Query_log_event::write(IO_CACHE* file) start+= host.length; } } + + if (thd && thd->query_start_sec_part_used) + { + *start++= Q_HRNOW; + get_time(); + int3store(start, when_sec_part); + start+= 3; + } /* NOTE: When adding new status vars, please don't forget to update the MAX_SIZE_LOG_EVENT_STATUS in log_event.h and update the function @@ -2404,12 +2697,13 @@ bool Query_log_event::write(IO_CACHE* file) event_length= (uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len; return (write_header(file, event_length) || - my_b_safe_write(file, (uchar*) buf, QUERY_HEADER_LEN) || + wrapper_my_b_safe_write(file, (uchar*) buf, QUERY_HEADER_LEN) || write_post_header_for_derived(file) || - my_b_safe_write(file, (uchar*) start_of_status, + wrapper_my_b_safe_write(file, (uchar*) start_of_status, (uint) (start-start_of_status)) || - my_b_safe_write(file, (db) ? (uchar*) db : (uchar*)"", db_len + 1) || - my_b_safe_write(file, (uchar*) query, q_len)) ? 1 : 0; + wrapper_my_b_safe_write(file, (db) ? (uchar*) db : (uchar*)"", db_len + 1) || + wrapper_my_b_safe_write(file, (uchar*) query, q_len) || + write_footer(file)) ? 
1 : 0; } /** @@ -2469,7 +2763,7 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, error_code= errcode; - time(&end_time); + end_time= my_time(0); exec_time = (ulong) (end_time - thd_arg->start_time); /** @todo this means that if we have no catalog, then it is replicated @@ -2661,6 +2955,7 @@ code_name(int code) case Q_CHARSET_DATABASE_CODE: return "Q_CHARSET_DATABASE_CODE"; case Q_TABLE_MAP_FOR_UPDATE_CODE: return "Q_TABLE_MAP_FOR_UPDATE_CODE"; case Q_MASTER_DATA_WRITTEN_CODE: return "Q_MASTER_DATA_WRITTEN_CODE"; + case Q_HRNOW: return "Q_HRNOW"; } sprintf(buf, "CODE#%d", code); return buf; @@ -2877,6 +3172,14 @@ Query_log_event::Query_log_event(const char* buf, uint event_len, CHECK_SPACE(pos, end, host.length); host.str= (char *)pos; pos+= host.length; + break; + } + case Q_HRNOW: + { + CHECK_SPACE(pos, end, 3); + when_sec_part= uint3korr(pos); + pos+= 3; + break; } default: /* That's why you must write status vars in growing order of code */ @@ -2956,7 +3259,7 @@ void Query_log_event::print_query_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info) { // TODO: print the catalog ?? 
- char buff[40],*end; // Enough for SET TIMESTAMP + char buff[64], *end; // Enough for SET TIMESTAMP bool different_db= 1; uint32 tmp; @@ -2983,6 +3286,11 @@ void Query_log_event::print_query_header(IO_CACHE* file, } end=int10_to_str((long) when, strmov(buff,"SET TIMESTAMP="),10); + if (when_sec_part) + { + *end++= '.'; + end=int10_to_str(when_sec_part, end, 10); + } end= strmov(end, print_event_info->delimiter); *end++='\n'; my_b_write(file, (uchar*) buff, (uint) (end-buff)); @@ -3244,7 +3552,7 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli, */ if (is_trans_keyword() || rpl_filter->db_ok(thd->db)) { - thd->set_time((time_t)when); + thd->set_time(when, when_sec_part); thd->set_query_and_id((char*)query_arg, q_len_arg, thd->charset(), next_query_id()); thd->variables.pseudo_thread_id= thread_id; // for temp tables @@ -3417,6 +3725,19 @@ START SLAVE; . Query: '%s'", expected_error, thd->query()); /* If the query was not ignored, it is printed to the general log */ if (!thd->is_error() || thd->stmt_da->sql_errno() != ER_SLAVE_IGNORED_TABLE) general_log_write(thd, COM_QUERY, thd->query(), thd->query_length()); + else + { + /* + Bug#54201: If we skip an INSERT query that uses auto_increment, then we + should reset any @@INSERT_ID set by an Intvar_log_event associated with + the query; otherwise the @@INSERT_ID will linger until the next INSERT + that uses auto_increment and may affect extra triggers on the slave etc. + + We reset INSERT_ID unconditionally; it is probably cheaper than + checking if it is necessary. 
+ */ + thd->auto_inc_intervals_forced.empty(); + } compare_errors: /* @@ -3706,10 +4027,11 @@ bool Start_log_event_v3::write(IO_CACHE* file) int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version); memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN); if (!dont_set_created) - created= when= get_time(); + created= get_time(); // this sets when and when_sec_part as a side effect int4store(buff + ST_CREATED_OFFSET,created); return (write_header(file, sizeof(buff)) || - my_b_safe_write(file, (uchar*) buff, sizeof(buff))); + wrapper_my_b_safe_write(file, (uchar*) buff, sizeof(buff)) || + write_footer(file)); } #endif @@ -3812,6 +4134,7 @@ int Start_log_event_v3::do_apply_event(Relay_log_info const *rli) old 4.0 (binlog version 2) is not supported; it should not be used for replication with 5.0. + @param server_ver a string containing the server version. */ Format_description_log_event:: @@ -3827,9 +4150,9 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver) common_header_len= LOG_EVENT_HEADER_LEN; number_of_event_types= LOG_EVENT_TYPES; /* we'll catch my_malloc() error in is_valid() */ - post_header_len=(uint8*) my_malloc(number_of_event_types*sizeof(uint8), + post_header_len=(uint8*) my_malloc(number_of_event_types*sizeof(uint8) + + BINLOG_CHECKSUM_ALG_DESC_LEN, MYF(0)); - /* This long list of assignments is not beautiful, but I see no way to make it nicer, as the right members are #defines, not array members, so @@ -3894,6 +4217,13 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver) post_header_len[INCIDENT_EVENT-1]= INCIDENT_HEADER_LEN; post_header_len[HEARTBEAT_LOG_EVENT-1]= 0; + // Set header length of the reserved events to 0 + memset(post_header_len + MYSQL_EVENTS_END - 1, 0, + (MARIA_EVENTS_BEGIN - MYSQL_EVENTS_END)*sizeof(uint8)); + + // Set header lengths of Maria events + post_header_len[ANNOTATE_ROWS_EVENT-1]= ANNOTATE_ROWS_HEADER_LEN; + // Sanity-check that all post header lengths are initialized. 
int i; for (i=0; i<number_of_event_types; i++) @@ -3946,6 +4276,7 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver) break; } calc_server_version_split(); + checksum_alg= (uint8) BINLOG_CHECKSUM_ALG_UNDEF; } @@ -3980,14 +4311,26 @@ Format_description_log_event(const char* buf, if ((common_header_len=buf[ST_COMMON_HEADER_LEN_OFFSET]) < OLD_HEADER_LEN) DBUG_VOID_RETURN; /* sanity check */ number_of_event_types= - event_len-(LOG_EVENT_MINIMAL_HEADER_LEN+ST_COMMON_HEADER_LEN_OFFSET+1); + event_len - (LOG_EVENT_MINIMAL_HEADER_LEN + ST_COMMON_HEADER_LEN_OFFSET + 1); DBUG_PRINT("info", ("common_header_len=%d number_of_event_types=%d", common_header_len, number_of_event_types)); /* If alloc fails, we'll detect it in is_valid() */ + post_header_len= (uint8*) my_memdup((uchar*)buf+ST_COMMON_HEADER_LEN_OFFSET+1, number_of_event_types* - sizeof(*post_header_len), MYF(0)); + sizeof(*post_header_len), + MYF(0)); calc_server_version_split(); + if (!is_version_before_checksum(&server_version_split)) + { + /* the last bytes are the checksum alg desc and value (or value's room) */ + number_of_event_types -= BINLOG_CHECKSUM_ALG_DESC_LEN; + checksum_alg= post_header_len[number_of_event_types]; + } + else + { + checksum_alg= (uint8) BINLOG_CHECKSUM_ALG_UNDEF; + } /* In some previous versions, the events were given other event type @@ -4098,21 +4441,59 @@ Format_description_log_event(const char* buf, #ifndef MYSQL_CLIENT bool Format_description_log_event::write(IO_CACHE* file) { + bool ret; + bool no_checksum; /* We don't call Start_log_event_v3::write() because this would make 2 my_b_safe_write(). 
*/ - uchar buff[FORMAT_DESCRIPTION_HEADER_LEN]; + uchar buff[FORMAT_DESCRIPTION_HEADER_LEN + BINLOG_CHECKSUM_ALG_DESC_LEN]; + size_t rec_size= sizeof(buff); int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version); memcpy((char*) buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN); if (!dont_set_created) - created= when= get_time(); + created= get_time(); int4store(buff + ST_CREATED_OFFSET,created); buff[ST_COMMON_HEADER_LEN_OFFSET]= LOG_EVENT_HEADER_LEN; - memcpy((char*) buff+ST_COMMON_HEADER_LEN_OFFSET+1, (uchar*) post_header_len, + memcpy((char*) buff+ST_COMMON_HEADER_LEN_OFFSET + 1, (uchar*) post_header_len, LOG_EVENT_TYPES); - return (write_header(file, sizeof(buff)) || - my_b_safe_write(file, buff, sizeof(buff))); + /* + if checksum is requested + record the checksum-algorithm descriptor next to + post_header_len vector which will be followed by the checksum value. + Master is supposed to trigger checksum computing by binlog_checksum_options, + slave does it via marking the event according to + FD_queue checksum_alg value. + */ + compile_time_assert(sizeof(BINLOG_CHECKSUM_ALG_DESC_LEN == 1)); +#ifndef DBUG_OFF + data_written= 0; // to prepare for need_checksum assert +#endif + buff[FORMAT_DESCRIPTION_HEADER_LEN]= need_checksum() ? + checksum_alg : (uint8) BINLOG_CHECKSUM_ALG_OFF; + /* + FD of checksum-aware server is always checksum-equipped, (V) is in, + regardless of @@global.binlog_checksum policy. + Thereby a combination of (A) == 0, (V) != 0 means + it's the checksum-aware server's FD event that heads checksum-free binlog + file. + Here 0 stands for checksumming OFF to evaluate (V) as 0 is that case. + A combination of (A) != 0, (V) != 0 denotes FD of the checksum-aware server + heading the checksummed binlog. + (A), (V) presence in FD of the checksum-aware server makes the event + 1 + 4 bytes bigger comparing to the former FD. 
+ */ + + if ((no_checksum= (checksum_alg == BINLOG_CHECKSUM_ALG_OFF))) + { + checksum_alg= BINLOG_CHECKSUM_ALG_CRC32; // Forcing (V) room to fill anyway + } + ret= (write_header(file, rec_size) || + wrapper_my_b_safe_write(file, buff, rec_size) || + write_footer(file)); + if (no_checksum) + checksum_alg= BINLOG_CHECKSUM_ALG_OFF; + return ret; } #endif @@ -4207,6 +4588,30 @@ Format_description_log_event::do_shall_skip(Relay_log_info *rli) #endif +static inline void +do_server_version_split(char* version, + Format_description_log_event::master_version_split *split_versions) +{ + char *p= version, *r; + ulong number; + for (uint i= 0; i<=2; i++) + { + number= strtoul(p, &r, 10); + split_versions->ver[i]= (uchar) number; + DBUG_ASSERT(number < 256); // fit in uchar + p= r; + DBUG_ASSERT(!((i == 0) && (*r != '.'))); // should be true in practice + if (*r == '.') + p++; // skip the dot + } + if (strstr(p, "MariaDB") != 0 || strstr(p, "-maria-") != 0) + split_versions->kind= + Format_description_log_event::master_version_split::KIND_MARIADB; + else + split_versions->kind= + Format_description_log_event::master_version_split::KIND_MYSQL; +} + /** Splits the event's 'server_version' string into three numeric pieces stored @@ -4219,24 +4624,67 @@ Format_description_log_event::do_shall_skip(Relay_log_info *rli) */ void Format_description_log_event::calc_server_version_split() { - char *p= server_version, *r; - ulong number; - for (uint i= 0; i<=2; i++) - { - number= strtoul(p, &r, 10); - server_version_split[i]= (uchar)number; - DBUG_ASSERT(number < 256); // fit in uchar - p= r; - DBUG_ASSERT(!((i == 0) && (*r != '.'))); // should be true in practice - if (*r == '.') - p++; // skip the dot - } + do_server_version_split(server_version, &server_version_split); + DBUG_PRINT("info",("Format_description_log_event::server_version_split:" " '%s' %d %d %d", server_version, - server_version_split[0], - server_version_split[1], server_version_split[2])); + server_version_split.ver[0], 
+ server_version_split.ver[1], server_version_split.ver[2])); +} + +static inline ulong +version_product(const Format_description_log_event::master_version_split* version_split) +{ + return ((version_split->ver[0] * 256 + version_split->ver[1]) * 256 + + version_split->ver[2]); +} + +/** + @return TRUE is the event's version is earlier than one that introduced + the replication event checksum. FALSE otherwise. +*/ +bool +Format_description_log_event::is_version_before_checksum(const master_version_split + *version_split) +{ + return version_product(version_split) < + (version_split->kind == master_version_split::KIND_MARIADB ? + checksum_version_product_mariadb : checksum_version_product_mysql); } +/** + @param buf buffer holding serialized FD event + @param len netto (possible checksum is stripped off) length of the event buf + + @return the version-safe checksum alg descriptor where zero + designates no checksum, 255 - the orginator is + checksum-unaware (effectively no checksum) and the actuall + [1-254] range alg descriptor. +*/ +uint8 get_checksum_alg(const char* buf, ulong len) +{ + uint8 ret; + char version[ST_SERVER_VER_LEN]; + Format_description_log_event::master_version_split version_split; + + DBUG_ENTER("get_checksum_alg"); + DBUG_ASSERT(buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT); + + memcpy(version, buf + + buf[LOG_EVENT_MINIMAL_HEADER_LEN + ST_COMMON_HEADER_LEN_OFFSET] + + ST_SERVER_VER_OFFSET, ST_SERVER_VER_LEN); + version[ST_SERVER_VER_LEN - 1]= 0; + + do_server_version_split(version, &version_split); + ret= Format_description_log_event::is_version_before_checksum(&version_split) ? 
+ (uint8) BINLOG_CHECKSUM_ALG_UNDEF : + * (uint8*) (buf + len - BINLOG_CHECKSUM_LEN - BINLOG_CHECKSUM_ALG_DESC_LEN); + DBUG_ASSERT(ret == BINLOG_CHECKSUM_ALG_OFF || + ret == BINLOG_CHECKSUM_ALG_UNDEF || + ret == BINLOG_CHECKSUM_ALG_CRC32); + DBUG_RETURN(ret); +} + /************************************************************************** Load_log_event methods @@ -4543,8 +4991,8 @@ Load_log_event::Load_log_event(const char *buf, uint event_len, */ if (event_len) copy_log_event(buf, event_len, - ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ? - LOAD_HEADER_LEN + + (((uchar)buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ? + LOAD_HEADER_LEN + description_event->common_header_len : LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN), description_event); @@ -4581,7 +5029,7 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len, */ if (!(field_lens= (uchar*)sql_ex.init((char*)buf + body_offset, buf_end, - buf[EVENT_TYPE_OFFSET] != LOAD_EVENT))) + (uchar)buf[EVENT_TYPE_OFFSET] != LOAD_EVENT))) DBUG_RETURN(1); data_len = event_len - body_offset; @@ -4819,7 +5267,7 @@ int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli, */ if (rpl_filter->db_ok(thd->db)) { - thd->set_time((time_t)when); + thd->set_time(when, when_sec_part); thd->set_query_id(next_query_id()); thd->warning_info->opt_clear_warning_info(thd->query_id); @@ -5093,6 +5541,7 @@ Rotate_log_event::Rotate_log_event(const char* new_log_ident_arg, DBUG_PRINT("enter",("new_log_ident: %s pos: %s flags: %lu", new_log_ident_arg, llstr(pos_arg, buff), (ulong) flags)); #endif + cache_type= EVENT_NO_CACHE; if (flags & DUP_NAME) new_log_ident= my_strndup(new_log_ident_arg, ident_len, MYF(MY_WME)); if (flags & RELAY_LOG) @@ -5134,9 +5583,11 @@ bool Rotate_log_event::write(IO_CACHE* file) { char buf[ROTATE_HEADER_LEN]; int8store(buf + R_POS_OFFSET, pos); - return (write_header(file, ROTATE_HEADER_LEN + ident_len) || - my_b_safe_write(file, (uchar*)buf, ROTATE_HEADER_LEN) || - my_b_safe_write(file, (uchar*)new_log_ident, 
(uint) ident_len)); + return (write_header(file, ROTATE_HEADER_LEN + ident_len) || + wrapper_my_b_safe_write(file, (uchar*) buf, ROTATE_HEADER_LEN) || + wrapper_my_b_safe_write(file, (uchar*) new_log_ident, + (uint) ident_len) || + write_footer(file)); } #endif @@ -5305,7 +5756,8 @@ bool Intvar_log_event::write(IO_CACHE* file) buf[I_TYPE_OFFSET]= (uchar) type; int8store(buf + I_VAL_OFFSET, val); return (write_header(file, sizeof(buf)) || - my_b_safe_write(file, buf, sizeof(buf))); + wrapper_my_b_safe_write(file, buf, sizeof(buf)) || + write_footer(file)); } #endif @@ -5433,7 +5885,8 @@ bool Rand_log_event::write(IO_CACHE* file) int8store(buf + RAND_SEED1_OFFSET, seed1); int8store(buf + RAND_SEED2_OFFSET, seed2); return (write_header(file, sizeof(buf)) || - my_b_safe_write(file, buf, sizeof(buf))); + wrapper_my_b_safe_write(file, buf, sizeof(buf)) || + write_footer(file)); } #endif @@ -5535,8 +5988,9 @@ Xid_log_event(const char* buf, bool Xid_log_event::write(IO_CACHE* file) { DBUG_EXECUTE_IF("do_not_write_xid", return 0;); - return write_header(file, sizeof(xid)) || - my_b_safe_write(file, (uchar*) &xid, sizeof(xid)); + return (write_header(file, sizeof(xid)) || + wrapper_my_b_safe_write(file, (uchar*) &xid, sizeof(xid)) || + write_footer(file)); } #endif @@ -5715,8 +6169,21 @@ User_var_log_event(const char* buf, we keep the flags set to UNDEF_F. */ uint bytes_read= ((val + val_len) - start); - DBUG_ASSERT(bytes_read==data_written || - bytes_read==(data_written-1)); +#ifndef DBUG_OFF + bool old_pre_checksum_fd= description_event->is_version_before_checksum( + &description_event->server_version_split); +#endif + DBUG_ASSERT((bytes_read == data_written - + (old_pre_checksum_fd || + (description_event->checksum_alg == + BINLOG_CHECKSUM_ALG_OFF)) ? + 0 : BINLOG_CHECKSUM_LEN) + || + (bytes_read == data_written -1 - + (old_pre_checksum_fd || + (description_event->checksum_alg == + BINLOG_CHECKSUM_ALG_OFF)) ? 
+ 0 : BINLOG_CHECKSUM_LEN)); if ((data_written - bytes_read) > 0) { flags= (uint) *(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE + @@ -5784,11 +6251,12 @@ bool User_var_log_event::write(IO_CACHE* file) event_length= sizeof(buf)+ name_len + buf1_length + val_len + unsigned_len; return (write_header(file, event_length) || - my_b_safe_write(file, (uchar*) buf, sizeof(buf)) || - my_b_safe_write(file, (uchar*) name, name_len) || - my_b_safe_write(file, (uchar*) buf1, buf1_length) || - my_b_safe_write(file, pos, val_len) || - my_b_safe_write(file, &flags, unsigned_len)); + wrapper_my_b_safe_write(file, (uchar*) buf, sizeof(buf)) || + wrapper_my_b_safe_write(file, (uchar*) name, name_len) || + wrapper_my_b_safe_write(file, (uchar*) buf1, buf1_length) || + wrapper_my_b_safe_write(file, pos, val_len) || + wrapper_my_b_safe_write(file, &flags, unsigned_len) || + write_footer(file)); } #endif @@ -6315,7 +6783,7 @@ Create_file_log_event::Create_file_log_event(const char* buf, uint len, uint8 create_file_header_len= description_event->post_header_len[CREATE_FILE_EVENT-1]; if (!(event_buf= (char*) my_memdup(buf, len, MYF(MY_WME))) || copy_log_event(event_buf,len, - ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ? + (((uchar)buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ? load_header_len + header_len : (fake_base ? 
(header_len+load_header_len) : (header_len+load_header_len) + @@ -6554,8 +7022,9 @@ bool Append_block_log_event::write(IO_CACHE* file) uchar buf[APPEND_BLOCK_HEADER_LEN]; int4store(buf + AB_FILE_ID_OFFSET, file_id); return (write_header(file, APPEND_BLOCK_HEADER_LEN + block_len) || - my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) || - my_b_safe_write(file, (uchar*) block, block_len)); + wrapper_my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) || + wrapper_my_b_safe_write(file, (uchar*) block, block_len) || + write_footer(file)); } #endif @@ -6713,7 +7182,8 @@ bool Delete_file_log_event::write(IO_CACHE* file) uchar buf[DELETE_FILE_HEADER_LEN]; int4store(buf + DF_FILE_ID_OFFSET, file_id); return (write_header(file, sizeof(buf)) || - my_b_safe_write(file, buf, sizeof(buf))); + wrapper_my_b_safe_write(file, buf, sizeof(buf)) || + write_footer(file)); } #endif @@ -6810,7 +7280,8 @@ bool Execute_load_log_event::write(IO_CACHE* file) uchar buf[EXEC_LOAD_HEADER_LEN]; int4store(buf + EL_FILE_ID_OFFSET, file_id); return (write_header(file, sizeof(buf)) || - my_b_safe_write(file, buf, sizeof(buf))); + wrapper_my_b_safe_write(file, buf, sizeof(buf)) || + write_footer(file)); } #endif @@ -6872,16 +7343,17 @@ int Execute_load_log_event::do_apply_event(Relay_log_info const *rli) fname); goto err; } - if (!(lev = (Load_log_event*)Log_event::read_log_event(&file, - (mysql_mutex_t*)0, - rli->relay_log.description_event_for_exec)) || + if (!(lev= (Load_log_event*) + Log_event::read_log_event(&file, + (mysql_mutex_t*)0, + rli->relay_log.description_event_for_exec, + opt_slave_sql_verify_checksum)) || lev->get_type_code() != NEW_LOAD_EVENT) { rli->report(ERROR_LEVEL, 0, "Error in Exec_load event: " "file '%s' appears corrupted", fname); goto err; } - lev->thd = thd; /* lev->do_apply_event should use rli only for errors i.e. 
should @@ -7044,7 +7516,7 @@ Execute_load_query_log_event::write_post_header_for_derived(IO_CACHE* file) int4store(buf + 4, fn_pos_start); int4store(buf + 4 + 4, fn_pos_end); *(buf + 4 + 4 + 4)= (uchar) dup_handling; - return my_b_safe_write(file, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN); + return wrapper_my_b_safe_write(file, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN); } #endif @@ -7280,7 +7752,8 @@ Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid, m_width(tbl_arg ? tbl_arg->s->fields : 1), m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0) #ifdef HAVE_REPLICATION - , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL) + , m_curr_row(NULL), m_curr_row_end(NULL), + m_key(NULL), m_key_info(NULL), m_key_nr(0) #endif { /* @@ -7328,7 +7801,8 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len, #endif m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0) #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) - , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL) + , m_curr_row(NULL), m_curr_row_end(NULL), + m_key(NULL), m_key_info(NULL), m_key_nr(0) #endif { DBUG_ENTER("Rows_log_event::Rows_log_event(const char*,...)"); @@ -7678,7 +8152,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table); } #ifdef HAVE_QUERY_CACHE - query_cache.invalidate_locked_for_write(rli->tables_to_lock); + query_cache.invalidate_locked_for_write(thd, rli->tables_to_lock); #endif } @@ -7705,7 +8179,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) TIMESTAMP column to a table with one. So we call set_time(), like in SBR. Presently it changes nothing. 
*/ - thd->set_time((time_t)when); + thd->set_time(when, when_sec_part); /* Now we are in a statement and will stay in a statement until we @@ -8007,11 +8481,11 @@ bool Rows_log_event::write_data_header(IO_CACHE *file) { int4store(buf + 0, m_table_id); int2store(buf + 4, m_flags); - return (my_b_safe_write(file, buf, 6)); + return (wrapper_my_b_safe_write(file, buf, 6)); }); int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id); int2store(buf + RW_FLAGS_OFFSET, m_flags); - return (my_b_safe_write(file, buf, ROWS_HEADER_LEN)); + return (wrapper_my_b_safe_write(file, buf, ROWS_HEADER_LEN)); } bool Rows_log_event::write_data_body(IO_CACHE*file) @@ -8027,10 +8501,10 @@ bool Rows_log_event::write_data_body(IO_CACHE*file) DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf)); DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf)); - res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf)); + res= res || wrapper_my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf)); DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols)); - res= res || my_b_safe_write(file, (uchar*) m_cols.bitmap, + res= res || wrapper_my_b_safe_write(file, (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols)); /* TODO[refactor write]: Remove the "down cast" here (and elsewhere). 
@@ -8039,11 +8513,11 @@ bool Rows_log_event::write_data_body(IO_CACHE*file) { DBUG_DUMP("m_cols_ai", (uchar*) m_cols_ai.bitmap, no_bytes_in_map(&m_cols_ai)); - res= res || my_b_safe_write(file, (uchar*) m_cols_ai.bitmap, + res= res || wrapper_my_b_safe_write(file, (uchar*) m_cols_ai.bitmap, no_bytes_in_map(&m_cols_ai)); } DBUG_DUMP("rows", m_rows_buf, data_size); - res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size); + res= res || wrapper_my_b_safe_write(file, m_rows_buf, (size_t) data_size); return res; @@ -8088,6 +8562,144 @@ void Rows_log_event::print_helper(FILE *file, #endif /************************************************************************** + Annotate_rows_log_event member functions +**************************************************************************/ + +#ifndef MYSQL_CLIENT +Annotate_rows_log_event::Annotate_rows_log_event(THD *thd, + uint16 cache_type_arg) + : Log_event(thd, 0, true), + m_save_thd_query_txt(0), + m_save_thd_query_len(0) +{ + m_query_txt= thd->query(); + m_query_len= thd->query_length(); + cache_type= cache_type_arg; +} +#endif + +Annotate_rows_log_event::Annotate_rows_log_event(const char *buf, + uint event_len, + const Format_description_log_event *desc) + : Log_event(buf, desc), + m_save_thd_query_txt(0), + m_save_thd_query_len(0) +{ + m_query_len= event_len - desc->common_header_len; + m_query_txt= (char*) buf + desc->common_header_len; +} + +Annotate_rows_log_event::~Annotate_rows_log_event() +{ +#ifndef MYSQL_CLIENT + if (m_save_thd_query_txt) + thd->set_query(m_save_thd_query_txt, m_save_thd_query_len); +#endif +} + +int Annotate_rows_log_event::get_data_size() +{ + return m_query_len; +} + +Log_event_type Annotate_rows_log_event::get_type_code() +{ + return ANNOTATE_ROWS_EVENT; +} + +bool Annotate_rows_log_event::is_valid() const +{ + return (m_query_txt != NULL && m_query_len != 0); +} + +#ifndef MYSQL_CLIENT +bool Annotate_rows_log_event::write_data_header(IO_CACHE *file) +{ + return 0; +} +#endif + 
+#ifndef MYSQL_CLIENT +bool Annotate_rows_log_event::write_data_body(IO_CACHE *file) +{ + return wrapper_my_b_safe_write(file, (uchar*) m_query_txt, m_query_len); +} +#endif + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +void Annotate_rows_log_event::pack_info(Protocol* protocol) +{ + if (m_query_txt && m_query_len) + protocol->store(m_query_txt, m_query_len, &my_charset_bin); +} +#endif + +#ifdef MYSQL_CLIENT +void Annotate_rows_log_event::print(FILE *file, PRINT_EVENT_INFO *pinfo) +{ + if (pinfo->short_form) + return; + + print_header(&pinfo->head_cache, pinfo, TRUE); + my_b_printf(&pinfo->head_cache, "\tAnnotate_rows:\n"); + + char *pbeg; // beginning of the next line + char *pend; // end of the next line + uint cnt= 0; // characters counter + + for (pbeg= m_query_txt; ; pbeg= pend) + { + // skip all \r's and \n's at the beginning of the next line + for (;; pbeg++) + { + if (++cnt > m_query_len) + return; + + if (*pbeg != '\r' && *pbeg != '\n') + break; + } + + // find end of the next line + for (pend= pbeg + 1; + ++cnt <= m_query_len && *pend != '\r' && *pend != '\n'; + pend++) + ; + + // print next line + my_b_write(&pinfo->head_cache, (const uchar*) "#Q> ", 4); + my_b_write(&pinfo->head_cache, (const uchar*) pbeg, pend - pbeg); + my_b_write(&pinfo->head_cache, (const uchar*) "\n", 1); + } +} +#endif + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +int Annotate_rows_log_event::do_apply_event(Relay_log_info const *rli) +{ + m_save_thd_query_txt= thd->query(); + m_save_thd_query_len= thd->query_length(); + thd->set_query(m_query_txt, m_query_len); + return 0; +} +#endif + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +int Annotate_rows_log_event::do_update_pos(Relay_log_info *rli) +{ + rli->inc_event_relay_log_pos(); + return 0; +} +#endif + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +Log_event::enum_skip_reason +Annotate_rows_log_event::do_shall_skip(Relay_log_info *rli) +{ + return continue_group(rli); +} 
+#endif + +/************************************************************************** Table_map_log_event member functions and support functions **************************************************************************/ @@ -8587,11 +9199,11 @@ bool Table_map_log_event::write_data_header(IO_CACHE *file) { int4store(buf + 0, m_table_id); int2store(buf + 4, m_flags); - return (my_b_safe_write(file, buf, 6)); + return (wrapper_my_b_safe_write(file, buf, 6)); }); int6store(buf + TM_MAPID_OFFSET, (ulonglong)m_table_id); int2store(buf + TM_FLAGS_OFFSET, m_flags); - return (my_b_safe_write(file, buf, TABLE_MAP_HEADER_LEN)); + return (wrapper_my_b_safe_write(file, buf, TABLE_MAP_HEADER_LEN)); } bool Table_map_log_event::write_data_body(IO_CACHE *file) @@ -8615,15 +9227,15 @@ bool Table_map_log_event::write_data_body(IO_CACHE *file) uchar mbuf[sizeof(m_field_metadata_size)]; uchar *const mbuf_end= net_store_length(mbuf, m_field_metadata_size); - return (my_b_safe_write(file, dbuf, sizeof(dbuf)) || - my_b_safe_write(file, (const uchar*)m_dbnam, m_dblen+1) || - my_b_safe_write(file, tbuf, sizeof(tbuf)) || - my_b_safe_write(file, (const uchar*)m_tblnam, m_tbllen+1) || - my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) || - my_b_safe_write(file, m_coltype, m_colcnt) || - my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) || - my_b_safe_write(file, m_field_metadata, m_field_metadata_size), - my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8)); + return (wrapper_my_b_safe_write(file, dbuf, sizeof(dbuf)) || + wrapper_my_b_safe_write(file, (const uchar*)m_dbnam, m_dblen+1) || + wrapper_my_b_safe_write(file, tbuf, sizeof(tbuf)) || + wrapper_my_b_safe_write(file, (const uchar*)m_tblnam, m_tbllen+1) || + wrapper_my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) || + wrapper_my_b_safe_write(file, m_coltype, m_colcnt) || + wrapper_my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) || + wrapper_my_b_safe_write(file, m_field_metadata, m_field_metadata_size), + 
wrapper_my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8)); } #endif @@ -9224,6 +9836,86 @@ record_compare_exit: return result; } + +/** + Find the best key to use when locating the row in @c find_row(). + + A primary key is preferred if it exists; otherwise a unique index is + preferred. Else we pick the index with the smalles rec_per_key value. + + If a suitable key is found, set @c m_key, @c m_key_nr and @c m_key_info + member fields appropriately. + + @returns Error code on failure, 0 on success. +*/ +int Rows_log_event::find_key() +{ + uint i, best_key_nr, last_part; + KEY *key, *best_key; + ulong best_rec_per_key, tmp; + DBUG_ENTER("Rows_log_event::find_key"); + DBUG_ASSERT(m_table); + + best_key_nr= MAX_KEY; + LINT_INIT(best_key); + LINT_INIT(best_rec_per_key); + + /* + Keys are sorted so that any primary key is first, followed by unique keys, + followed by any other. So we will automatically pick the primary key if + it exists. + */ + for (i= 0, key= m_table->key_info; i < m_table->s->keys; i++, key++) + { + if (!m_table->s->keys_in_use.is_set(i)) + continue; + /* + We cannot use a unique key with NULL-able columns to uniquely identify + a row (but we can still select it for range scan below if nothing better + is available). + */ + if ((key->flags & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME) + { + best_key_nr= i; + best_key= key; + break; + } + /* + We can only use a non-unique key if it allows range scans (ie. skip + FULLTEXT indexes and such). 
+ */ + last_part= key->key_parts - 1; + DBUG_PRINT("info", ("Index %s rec_per_key[%u]= %lu", + key->name, last_part, key->rec_per_key[last_part])); + if (!(m_table->file->index_flags(i, last_part, 1) & HA_READ_NEXT)) + continue; + + tmp= key->rec_per_key[last_part]; + if (best_key_nr == MAX_KEY || (tmp > 0 && tmp < best_rec_per_key)) + { + best_key_nr= i; + best_key= key; + best_rec_per_key= tmp; + } + } + + if (best_key_nr == MAX_KEY) + { + m_key_info= NULL; + DBUG_RETURN(0); + } + + // Allocate buffer for key searches + m_key= (uchar *) my_malloc(best_key->key_length, MYF(MY_WME)); + if (m_key == NULL) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + m_key_info= best_key; + m_key_nr= best_key_nr; + + DBUG_RETURN(0);; +} + + /** Locate the current row in event's table. @@ -9323,12 +10015,17 @@ int Rows_log_event::find_row(const Relay_log_info *rli) */ store_record(table,record[1]); - if (table->s->keys > 0 && table->s->keys_in_use.is_set(0)) + if (m_key_info) { - DBUG_PRINT("info",("locating record using primary key (index_read)")); + DBUG_PRINT("info",("locating record using key #%u [%s] (index_read)", + m_key_nr, m_key_info->name)); + /* We use this to test that the correct key is used in test cases. 
*/ + DBUG_EXECUTE_IF("slave_crash_if_wrong_index", + if(0 != strcmp(m_key_info->name,"expected_key")) abort();); - /* The 0th key is active: search the table using the index */ - if (!table->file->inited && (error= table->file->ha_index_init(0, FALSE))) + /* The key is active: search the table using the index */ + if (!table->file->inited && + (error= table->file->ha_index_init(m_key_nr, FALSE))) { DBUG_PRINT("info",("ha_index_init returns error %d",error)); table->file->print_error(error, MYF(0)); @@ -9338,14 +10035,14 @@ int Rows_log_event::find_row(const Relay_log_info *rli) /* Fill key data for the row */ DBUG_ASSERT(m_key); - key_copy(m_key, table->record[0], table->key_info, 0); + key_copy(m_key, table->record[0], m_key_info, 0); /* Don't print debug messages when running valgrind since they can trigger false warnings. */ #ifndef HAVE_valgrind - DBUG_DUMP("key data", m_key, table->key_info->key_length); + DBUG_DUMP("key data", m_key, m_key_info->key_length); #endif /* @@ -9431,6 +10128,8 @@ int Rows_log_event::find_row(const Relay_log_info *rli) record we are looking for is stored in record[1]. */ DBUG_PRINT("info",("non-unique index, scanning it to find matching record")); + /* We use this to test that the correct key is used in test cases. */ + DBUG_EXECUTE_IF("slave_crash_if_index_scan", abort();); while (record_compare(table)) { @@ -9469,6 +10168,8 @@ int Rows_log_event::find_row(const Relay_log_info *rli) else { DBUG_PRINT("info",("locating record using table scan (rnd_next)")); + /* We use this to test that the correct key is used in test cases. 
*/ + DBUG_EXECUTE_IF("slave_crash_if_table_scan", abort();); int restart_count= 0; // Number of times scanning has restarted from top @@ -9588,14 +10289,7 @@ Delete_rows_log_event::do_before_row_operations(const Slave_reporting_capability return 0; } - if (m_table->s->keys > 0) - { - // Allocate buffer for key searches - m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME)); - if (!m_key) - return HA_ERR_OUT_OF_MEM; - } - return 0; + return find_key(); } int @@ -9606,6 +10300,7 @@ Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability m_table->file->ha_index_or_rnd_end(); my_free(m_key); m_key= NULL; + m_key_info= NULL; return error; } @@ -9708,13 +10403,9 @@ Update_rows_log_event::Update_rows_log_event(const char *buf, uint event_len, int Update_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const) { - if (m_table->s->keys > 0) - { - // Allocate buffer for key searches - m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME)); - if (!m_key) - return HA_ERR_OUT_OF_MEM; - } + int err; + if ((err= find_key())) + return err; m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET; @@ -9729,6 +10420,7 @@ Update_rows_log_event::do_after_row_operations(const Slave_reporting_capability m_table->file->ha_index_or_rnd_end(); my_free(m_key); // Free for multi_malloc m_key= NULL; + m_key_info= NULL; return error; } @@ -9904,13 +10596,25 @@ Incident_log_event::write_data_header(IO_CACHE *file) DBUG_PRINT("enter", ("m_incident: %d", m_incident)); uchar buf[sizeof(int16)]; int2store(buf, (int16) m_incident); - DBUG_RETURN(my_b_safe_write(file, buf, sizeof(buf))); +#ifndef MYSQL_CLIENT + DBUG_RETURN(wrapper_my_b_safe_write(file, buf, sizeof(buf))); +#else + DBUG_RETURN(my_b_safe_write(file, buf, sizeof(buf))); +#endif } bool Incident_log_event::write_data_body(IO_CACHE *file) { + uchar tmp[1]; DBUG_ENTER("Incident_log_event::write_data_body"); + tmp[0]= (uchar) m_message.length; + crc= my_checksum(crc, 
(uchar*) tmp, 1); + if (m_message.length > 0) + { + crc= my_checksum(crc, (uchar*) m_message.str, m_message.length); + // todo: report a bug on write_str accepts uint but treats it as uchar + } DBUG_RETURN(write_str(file, m_message.str, (uint) m_message.length)); } |