summaryrefslogtreecommitdiff
path: root/sql/sql_repl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r--sql/sql_repl.cc89
1 files changed, 63 insertions, 26 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 08e9dcf3fe6..1d6aa0aaab1 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -28,9 +28,9 @@
#include "log_event.h"
#include "rpl_filter.h"
#include <my_dir.h>
-#include "rpl_handler.h"
#include "debug_sync.h"
-#include "log.h" // get_gtid_list_event
+#include "semisync_master.h"
+#include "semisync_slave.h"
enum enum_gtid_until_state {
GTID_UNTIL_NOT_DONE,
@@ -160,6 +160,7 @@ struct binlog_send_info {
bool clear_initial_log_pos;
bool should_stop;
+ size_t dirlen;
binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg,
char *lfn)
@@ -313,16 +314,43 @@ static int reset_transmit_packet(binlog_send_info *info, ushort flags,
packet->length(0);
packet->set("\0", 1, &my_charset_bin);
- if (RUN_HOOK(binlog_transmit, reserve_header, (info->thd, flags, packet)))
+ if (info->thd->semi_sync_slave)
{
- info->error= ER_UNKNOWN_ERROR;
- *errmsg= "Failed to run hook 'reserve_header'";
- ret= 1;
+ if (repl_semisync_master.reserve_sync_header(packet))
+ {
+ info->error= ER_UNKNOWN_ERROR;
+ *errmsg= "Failed to run hook 'reserve_header'";
+ ret= 1;
+ }
}
+
*ev_offset= packet->length();
return ret;
}
+int get_user_var_int(const char *name,
+ long long int *value, int *null_value)
+{
+ bool null_val;
+ user_var_entry *entry=
+ (user_var_entry*) my_hash_search(&current_thd->user_vars,
+ (uchar*) name, strlen(name));
+ if (!entry)
+ return 1;
+ *value= entry->val_int(&null_val);
+ if (null_value)
+ *null_value= null_val;
+ return 0;
+}
+
+inline bool is_semi_sync_slave()
+{
+ int null_value;
+ long long val= 0;
+ get_user_var_int("rpl_semi_sync_slave", &val, &null_value);
+ return val;
+}
+
static int send_file(THD *thd)
{
NET* net = &thd->net;
@@ -1606,6 +1634,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
enum enum_binlog_checksum_alg current_checksum_alg= info->current_checksum_alg;
slave_connection_state *gtid_state= &info->gtid_state;
slave_connection_state *until_gtid_state= info->until_gtid_state;
+ bool need_sync= false;
if (event_type == GTID_LIST_EVENT &&
info->using_gtid_state && until_gtid_state)
@@ -1916,8 +1945,10 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
THD_STAGE_INFO(info->thd, stage_sending_binlog_event_to_slave);
pos= my_b_tell(log);
- if (RUN_HOOK(binlog_transmit, before_send_event,
- (info->thd, info->flags, packet, info->log_file_name, pos)))
+ if (repl_semisync_master.update_sync_header(info->thd,
+ (uchar*) packet->c_ptr(),
+ info->log_file_name + info->dirlen,
+ pos, &need_sync))
{
info->error= ER_UNKNOWN_ERROR;
return "run 'before_send_event' hook failed";
@@ -1939,8 +1970,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
}
}
- if (RUN_HOOK(binlog_transmit, after_send_event,
- (info->thd, info->flags, packet)))
+ if (need_sync && repl_semisync_master.flush_net(info->thd, packet->c_ptr()))
{
info->error= ER_UNKNOWN_ERROR;
return "Failed to run hook 'after_send_event'";
@@ -2370,7 +2400,7 @@ static int wait_new_events(binlog_send_info *info, /* in */
PSI_stage_info old_stage;
mysql_bin_log.lock_binlog_end_pos();
- info->thd->ENTER_COND(mysql_bin_log.get_log_cond(),
+ info->thd->ENTER_COND(mysql_bin_log.get_bin_log_cond(),
mysql_bin_log.get_binlog_end_pos_lock(),
&stage_master_has_sent_all_binlog_to_slave,
&old_stage);
@@ -2681,7 +2711,7 @@ static int send_one_binlog_file(binlog_send_info *info,
/** end of file or error */
return (int)end_pos;
}
-
+ info->dirlen= dirname_length(info->log_file_name);
/**
* send events from current position up to end_pos
*/
@@ -2703,6 +2733,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
binlog_send_info infoobj(thd, packet, flags, linfo.log_file_name);
binlog_send_info *info= &infoobj;
+ bool has_transmit_started= false;
int old_max_allowed_packet= thd->variables.max_allowed_packet;
thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET;
@@ -2715,11 +2746,11 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
if (init_binlog_sender(info, &linfo, log_ident, &pos))
goto err;
- /*
- run hook first when all check has been made that slave seems to
- be requesting a reasonable position. i.e when transmit actually starts
- */
- if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
+ has_transmit_started= true;
+
+ /* Check if the dump thread is created by a slave with semisync enabled. */
+ thd->semi_sync_slave = is_semi_sync_slave();
+ if (repl_semisync_master.dump_start(thd, log_ident, pos))
{
info->errmsg= "Failed to run hook 'transmit_start'";
info->error= ER_UNKNOWN_ERROR;
@@ -2841,7 +2872,10 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
err:
THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination);
- RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
+ if (has_transmit_started)
+ {
+ repl_semisync_master.dump_end(thd);
+ }
if (info->thd->killed == KILL_SLAVE_SAME_ID)
{
@@ -3307,7 +3341,8 @@ int reset_slave(THD *thd, Master_info* mi)
else if (global_system_variables.log_warnings > 1)
sql_print_information("Deleted Master_info file '%s'.", fname);
- RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
+ if (rpl_semi_sync_slave_enabled)
+ repl_semisync_slave.reset_slave(mi);
err:
mi->unlock_slave_threads();
if (error)
@@ -3809,11 +3844,13 @@ int reset_master(THD* thd, rpl_gtid *init_state, uint32 init_state_len,
return 1;
}
- if (mysql_bin_log.reset_logs(thd, 1, init_state, init_state_len,
- next_log_number))
- return 1;
- RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));
- return 0;
+ bool ret= 0;
+ /* Temporarily disable master semisync before reseting master. */
+ repl_semisync_master.before_reset_master();
+ ret= mysql_bin_log.reset_logs(thd, 1, init_state, init_state_len,
+ next_log_number);
+ repl_semisync_master.after_reset_master();
+ return ret;
}
@@ -3930,7 +3967,7 @@ bool mysql_show_binlog_events(THD* thd)
my_off_t scan_pos = BIN_LOG_HEADER_SIZE;
while (scan_pos < pos)
{
- ev= Log_event::read_log_event(&log, (mysql_mutex_t*)0, description_event,
+ ev= Log_event::read_log_event(&log, description_event,
opt_master_verify_checksum);
scan_pos = my_b_tell(&log);
if (ev == NULL || !ev->is_valid())
@@ -3964,7 +4001,7 @@ bool mysql_show_binlog_events(THD* thd)
my_b_seek(&log, pos);
for (event_count = 0;
- (ev = Log_event::read_log_event(&log, (mysql_mutex_t*) 0,
+ (ev = Log_event::read_log_event(&log,
description_event,
opt_master_verify_checksum)); )
{