diff options
author | He Zhenxing <zhenxing.he@sun.com> | 2009-09-26 12:49:49 +0800 |
---|---|---|
committer | He Zhenxing <zhenxing.he@sun.com> | 2009-09-26 12:49:49 +0800 |
commit | 623ed58cfda0aef6b6bf545a4200357a58a8a4cc (patch) | |
tree | 28e6a4c77de3c3073b4dbe0b0e09e019adeaa556 /sql/sql_repl.cc | |
parent | e465d113832aeac61a36902c7976d455e1525234 (diff) | |
download | mariadb-git-623ed58cfda0aef6b6bf545a4200357a58a8a4cc.tar.gz |
Backporting WL#4398 WL#1720
Backporting BUG#44058 BUG#42244 BUG#45672 BUG#45673
Backporting BUG#45819 BUG#45973 BUG#39012
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r-- | sql/sql_repl.cc | 164 |
1 files changed, 126 insertions, 38 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 0ec8d91214c..671f6785640 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -21,6 +21,7 @@ #include "log_event.h" #include "rpl_filter.h" #include <my_dir.h> +#include "rpl_handler.h" int max_binlog_dump_events = 0; // unlimited my_bool opt_sporadic_binlog_dump_fail = 0; @@ -80,6 +81,32 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name, DBUG_RETURN(0); } +/* + Reset thread transmit packet buffer for event sending + + This function allocates header bytes for event transmission, and + should be called before store the event data to the packet buffer. +*/ +static int reset_transmit_packet(THD *thd, ushort flags, + ulong *ev_offset, const char **errmsg) +{ + int ret= 0; + String *packet= &thd->packet; + + /* reserve and set default header */ + packet->length(0); + packet->set("\0", 1, &my_charset_bin); + + if (RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, packet))) + { + *errmsg= "Failed to run hook 'reserve_header'"; + my_errno= ER_UNKNOWN_ERROR; + ret= 1; + } + *ev_offset= packet->length(); + return ret; +} + static int send_file(THD *thd) { NET* net = &thd->net; @@ -346,6 +373,9 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, LOG_INFO linfo; char *log_file_name = linfo.log_file_name; char search_file_name[FN_REFLEN], *name; + + ulong ev_offset; + IO_CACHE log; File file = -1; String* packet = &thd->packet; @@ -361,6 +391,14 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos)); bzero((char*) &log,sizeof(log)); + sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)", + thd->server_id, log_ident, (ulong)pos); + if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos))) + { + errmsg= "Failed to run hook 'transmit_start'"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } #ifndef DBUG_OFF if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2)) @@ -416,11 +454,9 @@ impossible position"; goto err; } - /* - We need to start a packet with something other than 255 - to distinguish it from error - */ - packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */ + /* reset transmit packet for the fake rotate event below */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; /* Tell the client about the log name with a fake Rotate event; @@ -460,7 +496,7 @@ impossible position"; my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; goto err; } - packet->set("\0", 1, &my_charset_bin); + /* Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become this larger than the corresponding packet (query) sent @@ -476,6 +512,11 @@ impossible position"; log_lock = mysql_bin_log.get_log_lock(); if (pos > BIN_LOG_HEADER_SIZE) { + /* reset transmit packet for the event read from binary log + file */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; + /* Try to find a Format_description_log_event at the beginning of the binlog @@ -483,29 +524,30 @@ impossible position"; if (!(error = Log_event::read_log_event(&log, packet, log_lock))) { /* - The packet has offsets equal to the normal offsets in a binlog - event +1 (the first character is \0). + The packet has offsets equal to the normal offsets in a + binlog event + ev_offset (the first ev_offset characters are + the header (default \0)). */ DBUG_PRINT("info", ("Looked for a Format_description_log_event, found event type %d", - (*packet)[EVENT_TYPE_OFFSET+1])); - if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT) + (*packet)[EVENT_TYPE_OFFSET+ev_offset])); + if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT) { - binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] & + binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] & LOG_EVENT_BINLOG_IN_USE_F); - (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F; + (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; /* mark that this event with "log_pos=0", so the slave should not increment master's binlog position (rli->group_master_log_pos) */ - int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0); + int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0); /* if reconnect master sends FD event with `created' as 0 to avoid destroying temp tables. */ int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+ - ST_CREATED_OFFSET+1, (ulong) 0); + ST_CREATED_OFFSET+ev_offset, (ulong) 0); /* send it */ if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) { @@ -531,8 +573,6 @@ impossible position"; Format_description_log_event will be found naturally if it is written. */ } - /* reset the packet as we wrote to it in any case */ - packet->set("\0", 1, &my_charset_bin); } /* end of if (pos > BIN_LOG_HEADER_SIZE); */ else { @@ -544,6 +584,12 @@ impossible position"; while (!net->error && net->vio != 0 && !thd->killed) { + Log_event_type event_type= UNKNOWN_EVENT; + + /* reset the transmit packet for the event read from binary log + file */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; while (!(error = Log_event::read_log_event(&log, packet, log_lock))) { #ifndef DBUG_OFF @@ -556,15 +602,25 @@ impossible position"; } #endif - if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT) + event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]); + if (event_type == FORMAT_DESCRIPTION_EVENT) { - binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] & + binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] & LOG_EVENT_BINLOG_IN_USE_F); - (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F; + (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; } - else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT) + else if (event_type == STOP_EVENT) binlog_can_be_corrupted= FALSE; + pos = my_b_tell(&log); + if (RUN_HOOK(binlog_transmit, before_send_event, + (thd, flags, packet, log_file_name, pos))) + { + my_errno= ER_UNKNOWN_ERROR; + errmsg= "run 'before_send_event' hook failed"; + goto err; + } + if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) { errmsg = "Failed on my_net_write()"; @@ -572,9 +628,8 @@ impossible position"; goto err; } - DBUG_PRINT("info", ("log event code %d", - (*packet)[LOG_EVENT_OFFSET+1] )); - if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) + DBUG_PRINT("info", ("log event code %d", event_type)); + if (event_type == LOAD_EVENT) { if (send_file(thd)) { @@ -583,7 +638,17 @@ impossible position"; goto err; } } - packet->set("\0", 1, &my_charset_bin); + + if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet))) + { + errmsg= "Failed to run hook 'after_send_event'"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + + /* reset transmit packet for next loop */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; } /* @@ -634,6 +699,11 @@ impossible position"; } #endif + /* reset the transmit packet for the event read from binary log + file */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; + /* No one will update the log while we are reading now, but we'll be quick and just read one record @@ -650,6 +720,7 @@ impossible position"; /* we read successfully, so we'll need to send it to the slave */ pthread_mutex_unlock(log_lock); read_packet = 1; + event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]); break; case LOG_READ_EOF: @@ -676,8 +747,17 @@ impossible position"; } if (read_packet) - { - thd_proc_info(thd, "Sending binlog event to slave"); + { + thd_proc_info(thd, "Sending binlog event to slave"); + pos = my_b_tell(&log); + if (RUN_HOOK(binlog_transmit, before_send_event, + (thd, flags, packet, log_file_name, pos))) + { + my_errno= ER_UNKNOWN_ERROR; + errmsg= "run 'before_send_event' hook failed"; + goto err; + } + if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ) { errmsg = "Failed on my_net_write()"; @@ -685,7 +765,7 @@ impossible position"; goto err; } - if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) + if (event_type == LOAD_EVENT) { if (send_file(thd)) { @@ -694,11 +774,13 @@ impossible position"; goto err; } } - packet->set("\0", 1, &my_charset_bin); - /* - No need to net_flush because we will get to flush later when - we hit EOF pretty quick - */ + + if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet))) + { + my_errno= ER_UNKNOWN_ERROR; + errmsg= "Failed to run hook 'after_send_event'"; + goto err; + } } if (fatal_error) @@ -734,6 +816,10 @@ impossible position"; end_io_cache(&log); (void) my_close(file, MYF(MY_WME)); + /* reset transmit packet for the possible fake rotate event */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; + /* Call fake_rotate_event() in case the previous log (the one which we have just finished reading) did not contain a Rotate event @@ -750,9 +836,6 @@ impossible position"; my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; goto err; } - - packet->length(0); - packet->append('\0'); } } @@ -760,6 +843,7 @@ end: end_io_cache(&log); (void)my_close(file, MYF(MY_WME)); + RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags)); my_eof(thd); thd_proc_info(thd, "Waiting to finalize termination"); pthread_mutex_lock(&LOCK_thread_count); @@ -770,6 +854,7 @@ end: err: thd_proc_info(thd, "Waiting to finalize termination"); end_io_cache(&log); + RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags)); /* Exclude iteration through thread list this is needed for purge_logs() - it will iterate through @@ -1064,6 +1149,7 @@ int reset_slave(THD *thd, Master_info* mi) goto err; } + RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi)); err: unlock_slave_threads(mi); if (error) @@ -1363,7 +1449,11 @@ int reset_master(THD* thd) ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG)); return 1; } - return mysql_bin_log.reset_logs(thd); + + if (mysql_bin_log.reset_logs(thd)) + return 1; + RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */)); + return 0; } int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1, @@ -1836,5 +1926,3 @@ int init_replication_sys_vars() } #endif /* HAVE_REPLICATION */ - - |