diff options
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r-- | sql/sql_repl.cc | 274 |
1 files changed, 214 insertions, 60 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 039dd046086..db8dc694502 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -1,5 +1,5 @@ /* Copyright (c) 2000, 2012, Oracle and/or its affiliates. - Copyright (c) 2008, 2011, Monty Program Ab + Copyright (c) 2008, 2012, Monty Program Ab This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -34,14 +34,6 @@ my_bool opt_sporadic_binlog_dump_fail = 0; static int binlog_dump_count = 0; #endif -/** - a copy of active_mi->rli->slave_skip_counter, for showing in SHOW VARIABLES, - INFORMATION_SCHEMA.GLOBAL_VARIABLES and @@sql_slave_skip_counter without - taking all the mutexes needed to access active_mi->rli->slave_skip_counter - properly. -*/ -uint sql_slave_skip_counter; - extern TYPELIB binlog_checksum_typelib; /* @@ -492,6 +484,27 @@ static ulonglong get_heartbeat_period(THD * thd) } /* + Lookup the capabilities of the slave, which it announces by setting a value + MARIA_SLAVE_CAPABILITY_XXX in @mariadb_slave_capability. + + Older MariaDB slaves, and other MySQL slaves, do not set + @mariadb_slave_capability, corresponding to a capability of + MARIA_SLAVE_CAPABILITY_UNKNOWN (0). +*/ +static int +get_mariadb_slave_capability(THD *thd) +{ + bool null_value; + const LEX_STRING name= { C_STRING_WITH_LEN("mariadb_slave_capability") }; + const user_var_entry *entry= + (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str, + name.length); + return entry ? + (int)(entry->val_int(&null_value)) : MARIA_SLAVE_CAPABILITY_UNKNOWN; +} + + +/* Function prepares and sends repliation heartbeat event. @param net net object of THD @@ -563,14 +576,68 @@ static int send_heartbeat_event(NET* net, String* packet, static const char * send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, Log_event_type event_type, char *log_file_name, - IO_CACHE *log) + IO_CACHE *log, int mariadb_slave_capability, + ulong ev_offset, uint8 current_checksum_alg) { my_off_t pos; /* Do not send annotate_rows events unless slave requested it. */ - if (event_type == ANNOTATE_ROWS_EVENT && - !(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT)) - return NULL; + if (event_type == ANNOTATE_ROWS_EVENT && !(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT)) + { + if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES) + { + /* This slave can tolerate events omitted from the binlog stream. */ + return NULL; + } + else if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_ANNOTATE) + { + /* + The slave did not request ANNOTATE_ROWS_EVENT (it does not need them as + it will not log them in its own binary log). However, it understands the + event and will just ignore it, and it would break if we omitted it, + leaving a hole in the binlog stream. So just send the event as-is. + */ + } + else + { + /* + The slave does not understand ANNOTATE_ROWS_EVENT. + + Older MariaDB slaves (and MySQL slaves) will break replication if there + are holes in the binlog stream (they will miscompute the binlog offset + and request the wrong position when reconnecting). + + So replace the event with a dummy event of the same size that will be + a no-operation on the slave. + */ + if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg)) + return "Failed to replace row annotate event with dummy: too small event."; + } + } + + /* + Do not send binlog checkpoint events to a slave that does not understand it. + */ + if (unlikely(event_type == BINLOG_CHECKPOINT_EVENT) && + mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT) + { + if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES) + { + /* This slave can tolerate events omitted from the binlog stream. */ + return NULL; + } + else + { + /* + The slave does not understand BINLOG_CHECKPOINT_EVENT. Send a dummy + event instead, with same length so slave does not get confused about + binlog positions. + */ + if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg)) + return "Failed to replace binlog checkpoint event with dummy: " + "too small event."; + } + } /* Skip events with the @@skip_replication flag set, if slave requested @@ -628,6 +695,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, NET* net = &thd->net; mysql_mutex_t *log_lock; mysql_cond_t *log_cond; + int mariadb_slave_capability; uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF; int old_max_allowed_packet= thd->variables.max_allowed_packet; @@ -653,7 +721,9 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, heartbeat_ts= &heartbeat_buf; set_timespec_nsec(*heartbeat_ts, 0); } - sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)", + mariadb_slave_capability= get_mariadb_slave_capability(thd); + if (global_system_variables.log_warnings > 1) + sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)", thd->server_id, log_ident, (ulong)pos); if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos))) { @@ -765,7 +835,7 @@ impossible position"; this larger than the corresponding packet (query) sent from client to master. */ - thd->variables.max_allowed_packet+= MAX_LOG_EVENT_HEADER; + thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET; /* We can set log_lock now, it does not move (it's a member of @@ -938,7 +1008,9 @@ impossible position"; } if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type, - log_file_name, &log))) + log_file_name, &log, + mariadb_slave_capability, ev_offset, + current_checksum_alg))) { errmsg= tmp_msg; my_errno= ER_UNKNOWN_ERROR; @@ -1096,7 +1168,9 @@ impossible position"; if (read_packet && (tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type, - log_file_name, &log))) + log_file_name, &log, + mariadb_slave_capability, ev_offset, + current_checksum_alg))) { errmsg= tmp_msg; my_errno= ER_UNKNOWN_ERROR; @@ -1223,15 +1297,28 @@ err: @retval 0 success @retval 1 error + @retval -1 fatal error */ + int start_slave(THD* thd , Master_info* mi, bool net_report) { int slave_errno= 0; int thread_mask; + char master_info_file_tmp[FN_REFLEN]; + char relay_log_info_file_tmp[FN_REFLEN]; DBUG_ENTER("start_slave"); if (check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0)) - DBUG_RETURN(1); + DBUG_RETURN(-1); + + create_logfile_name_with_suffix(master_info_file_tmp, + sizeof(master_info_file_tmp), + master_info_file, 0, &mi->connection_name); + create_logfile_name_with_suffix(relay_log_info_file_tmp, + sizeof(relay_log_info_file_tmp), + relay_log_info_file, 0, + &mi->connection_name); + lock_slave_threads(mi); // this allows us to cleanly read slave_running // Get a mask of _stopped_ threads init_thread_mask(&thread_mask,mi,1 /* inverse */); @@ -1245,7 +1332,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) thread_mask&= thd->lex->slave_thd_opt; if (thread_mask) //some threads are stopped, start them { - if (init_master_info(mi,master_info_file,relay_log_info_file, 0, + if (init_master_info(mi,master_info_file_tmp,relay_log_info_file_tmp, 0, thread_mask)) slave_errno=ER_MASTER_INFO; else if (server_id_supplied && *mi->host) @@ -1319,10 +1406,11 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) if (!slave_errno) slave_errno = start_slave_threads(0 /*no mutex */, - 1 /* wait for start */, - mi, - master_info_file,relay_log_info_file, - thread_mask); + 1 /* wait for start */, + mi, + master_info_file_tmp, + relay_log_info_file_tmp, + thread_mask); } else slave_errno = ER_BAD_SLAVE; @@ -1339,11 +1427,11 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) if (slave_errno) { if (net_report) - my_message(slave_errno, ER(slave_errno), MYF(0)); - DBUG_RETURN(1); + my_error(slave_errno, MYF(0), + (int) mi->connection_name.length, + mi->connection_name.str); + DBUG_RETURN(slave_errno == ER_BAD_SLAVE ? -1 : 1); } - else if (net_report) - my_ok(thd); DBUG_RETURN(0); } @@ -1361,17 +1449,17 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) @retval 0 success @retval 1 error + @retval -1 error */ + int stop_slave(THD* thd, Master_info* mi, bool net_report ) { - DBUG_ENTER("stop_slave"); - int slave_errno; - if (!thd) - thd = current_thd; + DBUG_ENTER("stop_slave"); + DBUG_PRINT("enter",("Connection: %s", mi->connection_name.str)); if (check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0)) - DBUG_RETURN(1); + DBUG_RETURN(-1); THD_STAGE_INFO(thd, stage_killing_slave); int thread_mask; lock_slave_threads(mi); @@ -1406,8 +1494,6 @@ int stop_slave(THD* thd, Master_info* mi, bool net_report ) my_message(slave_errno, ER(slave_errno), MYF(0)); DBUG_RETURN(1); } - else if (net_report) - my_ok(thd); DBUG_RETURN(0); } @@ -1431,15 +1517,18 @@ int reset_slave(THD *thd, Master_info* mi) int thread_mask= 0, error= 0; uint sql_errno=ER_UNKNOWN_ERROR; const char* errmsg= "Unknown error occured while reseting slave"; + char master_info_file_tmp[FN_REFLEN]; + char relay_log_info_file_tmp[FN_REFLEN]; DBUG_ENTER("reset_slave"); lock_slave_threads(mi); init_thread_mask(&thread_mask,mi,0 /* not inverse */); if (thread_mask) // We refuse if any slave thread is running { - sql_errno= ER_SLAVE_MUST_STOP; - error=1; - goto err; + unlock_slave_threads(mi); + my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length, + mi->connection_name.str); + DBUG_RETURN(ER_SLAVE_MUST_STOP); } ha_reset_slave(thd); @@ -1466,22 +1555,35 @@ int reset_slave(THD *thd, Master_info* mi) // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0 end_master_info(mi); + // and delete these two files - fn_format(fname, master_info_file, mysql_data_home, "", 4+32); + create_logfile_name_with_suffix(master_info_file_tmp, + sizeof(master_info_file_tmp), + master_info_file, 0, &mi->connection_name); + create_logfile_name_with_suffix(relay_log_info_file_tmp, + sizeof(relay_log_info_file_tmp), + relay_log_info_file, 0, &mi->connection_name); + + fn_format(fname, master_info_file_tmp, mysql_data_home, "", 4+32); if (mysql_file_stat(key_file_master_info, fname, &stat_area, MYF(0)) && mysql_file_delete(key_file_master_info, fname, MYF(MY_WME))) { error=1; goto err; } + else if (global_system_variables.log_warnings > 1) + sql_print_information("Deleted Master_info file '%s'.", fname); + // delete relay_log_info_file - fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32); + fn_format(fname, relay_log_info_file_tmp, mysql_data_home, "", 4+32); if (mysql_file_stat(key_file_relay_log_info, fname, &stat_area, MYF(0)) && mysql_file_delete(key_file_relay_log_info, fname, MYF(MY_WME))) { error=1; goto err; } + else if (global_system_variables.log_warnings > 1) + sql_print_information("Deleted Master_info file '%s'.", fname); RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi)); err: @@ -1561,20 +1663,12 @@ bool change_master(THD* thd, Master_info* mi) char saved_host[HOSTNAME_LENGTH + 1]; uint saved_port; char saved_log_name[FN_REFLEN]; + char master_info_file_tmp[FN_REFLEN]; + char relay_log_info_file_tmp[FN_REFLEN]; my_off_t saved_log_pos; - DBUG_ENTER("change_master"); - - lock_slave_threads(mi); - init_thread_mask(&thread_mask,mi,0 /*not inverse*/); LEX_MASTER_INFO* lex_mi= &thd->lex->mi; - if (thread_mask) // We refuse if any slave thread is running - { - my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0)); - ret= TRUE; - goto err; - } + DBUG_ENTER("change_master"); - THD_STAGE_INFO(thd, stage_changing_master); /* We need to check if there is an empty master_host. Otherwise change master succeeds, a master.info file is created containing @@ -1582,17 +1676,61 @@ bool change_master(THD* thd, Master_info* mi) is thrown stating that the server is not configured as slave. (See BUG#28796). */ - if(lex_mi->host && !*lex_mi->host) + if (lex_mi->host && !*lex_mi->host) { my_error(ER_WRONG_ARGUMENTS, MYF(0), "MASTER_HOST"); - unlock_slave_threads(mi); DBUG_RETURN(TRUE); } - // TODO: see if needs re-write - if (init_master_info(mi, master_info_file, relay_log_info_file, 0, + if (master_info_index->check_duplicate_master_info(&lex_mi->connection_name, + lex_mi->host, + lex_mi->port)) + DBUG_RETURN(TRUE); + + lock_slave_threads(mi); + init_thread_mask(&thread_mask,mi,0 /*not inverse*/); + if (thread_mask) // We refuse if any slave thread is running + { + my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length, + mi->connection_name.str); + ret= TRUE; + goto err; + } + + THD_STAGE_INFO(thd, stage_changing_master); + + create_logfile_name_with_suffix(master_info_file_tmp, + sizeof(master_info_file_tmp), + master_info_file, 0, &mi->connection_name); + create_logfile_name_with_suffix(relay_log_info_file_tmp, + sizeof(relay_log_info_file_tmp), + relay_log_info_file, 0, &mi->connection_name); + + /* if new Master_info doesn't exists, add it */ + if (!master_info_index->get_master_info(&mi->connection_name, + MYSQL_ERROR::WARN_LEVEL_NOTE)) + { + if (master_info_index->add_master_info(mi, TRUE)) + { + my_error(ER_MASTER_INFO, MYF(0), + (int) lex_mi->connection_name.length, + lex_mi->connection_name.str); + ret= TRUE; + goto err; + } + } + if (global_system_variables.log_warnings > 1) + sql_print_information("Master: '%.*s' Master_info_file: '%s' " + "Relay_info_file: '%s'", + (int) mi->connection_name.length, + mi->connection_name.str, + master_info_file_tmp, relay_log_info_file_tmp); + + if (init_master_info(mi, master_info_file_tmp, relay_log_info_file_tmp, 0, thread_mask)) { - my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0)); + my_error(ER_MASTER_INFO, MYF(0), + (int) lex_mi->connection_name.length, + lex_mi->connection_name.str); ret= TRUE; goto err; } @@ -1860,7 +1998,7 @@ int reset_master(THD* thd) return 1; } - if (mysql_bin_log.reset_logs(thd)) + if (mysql_bin_log.reset_logs(thd, 1)) return 1; RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */)); return 0; @@ -1886,6 +2024,7 @@ bool mysql_show_binlog_events(THD* thd) File file = -1; MYSQL_BIN_LOG *binary_log= NULL; int old_max_allowed_packet= thd->variables.max_allowed_packet; + Master_info *mi= 0; LOG_INFO linfo; DBUG_ENTER("mysql_show_binlog_events"); @@ -1915,10 +2054,15 @@ bool mysql_show_binlog_events(THD* thd) } else /* showing relay log contents */ { - if (!active_mi) + mysql_mutex_lock(&LOCK_active_mi); + if (!(mi= master_info_index-> + get_master_info(&thd->variables.default_master_connection, + MYSQL_ERROR::WARN_LEVEL_ERROR))) + { + mysql_mutex_unlock(&LOCK_active_mi); DBUG_RETURN(TRUE); - - binary_log= &(active_mi->rli.relay_log); + } + binary_log= &(mi->rli.relay_log); } if (binary_log->is_open()) @@ -1932,6 +2076,13 @@ bool mysql_show_binlog_events(THD* thd) mysql_mutex_t *log_lock = binary_log->get_log_lock(); Log_event* ev; + if (mi) + { + /* We can unlock the mutex as we have a lock on the file */ + mysql_mutex_unlock(&LOCK_active_mi); + mi= 0; + } + unit->set_limit(thd->lex->current_select); limit_start= unit->offset_limit_cnt; limit_end= unit->select_limit_cnt; @@ -2002,7 +2153,7 @@ bool mysql_show_binlog_events(THD* thd) description_event->checksum_alg= ev->checksum_alg; if (event_count >= limit_start && - ev->net_send(protocol, linfo.log_file_name, pos)) + ev->net_send(thd, protocol, linfo.log_file_name, pos)) { errmsg = "Net error"; delete ev; @@ -2026,6 +2177,9 @@ bool mysql_show_binlog_events(THD* thd) mysql_mutex_unlock(log_lock); } + else if (mi) + mysql_mutex_unlock(&LOCK_active_mi); + // Check that linfo is still on the function scope. DEBUG_SYNC(thd, "after_show_binlog_events"); |