summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorAleksey Midenkov <midenok@gmail.com>2017-12-21 11:16:42 +0300
committerAleksey Midenkov <midenok@gmail.com>2017-12-21 11:16:42 +0300
commit5c0a19c873382c9fec696f827e6766f61c6682af (patch)
treec39d60a5557669b779ad72aac6c10abef3ed0eba /sql
parent5c760d952b8ae8a8722b206da3de0ebbad4978e5 (diff)
parent9ec2479778269fb33194c088216119d4f1dca58d (diff)
downloadmariadb-git-5c0a19c873382c9fec696f827e6766f61c6682af.tar.gz
System Versioning 1.0 pre7
Merge branch '10.3' into trunk
Diffstat (limited to 'sql')
-rw-r--r--sql/CMakeLists.txt6
-rw-r--r--sql/events.cc2
-rw-r--r--sql/field.cc9
-rw-r--r--sql/field.h14
-rw-r--r--sql/handler.cc152
-rw-r--r--sql/handler.h16
-rw-r--r--sql/log.cc158
-rw-r--r--sql/log.h96
-rw-r--r--sql/log_event.cc8
-rw-r--r--sql/log_event.h1
-rw-r--r--sql/mysqld.cc107
-rw-r--r--sql/mysqld.h6
-rw-r--r--sql/replication.h14
-rw-r--r--sql/rpl_handler.cc553
-rw-r--r--sql/rpl_handler.h216
-rw-r--r--sql/rpl_mi.h5
-rw-r--r--sql/rpl_parallel.cc6
-rw-r--r--sql/rpl_rli.cc8
-rw-r--r--sql/semisync.cc32
-rw-r--r--sql/semisync.h73
-rw-r--r--sql/semisync_master.cc1352
-rw-r--r--sql/semisync_master.h674
-rw-r--r--sql/semisync_master_ack_receiver.cc303
-rw-r--r--sql/semisync_master_ack_receiver.h119
-rw-r--r--sql/semisync_slave.cc251
-rw-r--r--sql/semisync_slave.h115
-rw-r--r--sql/share/errmsg-utf8.txt4
-rw-r--r--sql/slave.cc45
-rw-r--r--sql/sql_class.cc16
-rw-r--r--sql/sql_class.h22
-rw-r--r--sql/sql_lex.h2
-rw-r--r--sql/sql_parse.cc9
-rw-r--r--sql/sql_partition.cc5
-rw-r--r--sql/sql_repl.cc89
-rw-r--r--sql/sql_table.cc5
-rw-r--r--sql/sql_view.cc4
-rw-r--r--sql/sql_yacc.yy21
-rw-r--r--sql/sys_vars.cc192
-rw-r--r--sql/transaction.cc35
39 files changed, 3707 insertions, 1038 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt
index 0697b53e414..c362ba4a8ee 100644
--- a/sql/CMakeLists.txt
+++ b/sql/CMakeLists.txt
@@ -36,7 +36,7 @@ ELSE()
ENDIF()
INCLUDE_DIRECTORIES(
-${CMAKE_SOURCE_DIR}/include
+${CMAKE_SOURCE_DIR}/include
${CMAKE_SOURCE_DIR}/sql
${PCRE_INCLUDES}
${ZLIB_INCLUDE_DIR}
@@ -122,7 +122,7 @@ SET (SQL_SOURCE
rpl_rli.cc rpl_mi.cc sql_servers.cc sql_audit.cc
sql_connect.cc scheduler.cc sql_partition_admin.cc
sql_profile.cc event_parse_data.cc sql_alter.cc
- sql_signal.cc rpl_handler.cc mdl.cc sql_admin.cc
+ sql_signal.cc mdl.cc sql_admin.cc
transaction.cc sys_vars.cc sql_truncate.cc datadict.cc
sql_reload.cc item_inetfunc.cc
@@ -138,6 +138,8 @@ SET (SQL_SOURCE
my_apc.cc mf_iocache_encr.cc item_jsonfunc.cc
my_json_writer.cc
rpl_gtid.cc rpl_parallel.cc
+ semisync.cc semisync_master.cc semisync_slave.cc
+ semisync_master_ack_receiver.cc
sql_type.cc
item_windowfunc.cc sql_window.cc
sql_cte.cc
diff --git a/sql/events.cc b/sql/events.cc
index 3ad546217a7..36f03e26125 100644
--- a/sql/events.cc
+++ b/sql/events.cc
@@ -421,7 +421,7 @@ Events::create_event(THD *thd, Event_parse_data *parse_data)
DBUG_RETURN(ret);
#ifdef WITH_WSREP
error:
- DBUG_RETURN(TRUE);
+ DBUG_RETURN(true);
#endif /* WITH_WSREP */
}
diff --git a/sql/field.cc b/sql/field.cc
index 21f869d8a07..8cc9e7d1223 100644
--- a/sql/field.cc
+++ b/sql/field.cc
@@ -4741,6 +4741,15 @@ double Field_double::val_real(void)
}
+longlong Field_double::val_int_from_real(bool want_unsigned_result)
+{
+ Converter_double_to_longlong conv(val_real(), want_unsigned_result);
+ if (!want_unsigned_result && conv.error())
+ conv.push_warning(get_thd(), Field_double::val_real(), false);
+ return conv.result();
+}
+
+
my_decimal *Field_real::val_decimal(my_decimal *decimal_value)
{
ASSERT_COLUMN_MARKED_FOR_READ;
diff --git a/sql/field.h b/sql/field.h
index 42a3b9e58bd..1e72fd16e6a 100644
--- a/sql/field.h
+++ b/sql/field.h
@@ -850,6 +850,10 @@ public:
}
virtual double val_real(void)=0;
virtual longlong val_int(void)=0;
+ virtual ulonglong val_uint(void)
+ {
+ return (ulonglong) val_int();
+ }
virtual bool val_bool(void)= 0;
virtual my_decimal *val_decimal(my_decimal *);
inline String *val_str(String *str) { return val_str(str, str); }
@@ -2278,6 +2282,7 @@ private:
class Field_double :public Field_real {
+ longlong val_int_from_real(bool want_unsigned_result);
public:
Field_double(uchar *ptr_arg, uint32 len_arg, uchar *null_ptr_arg,
uchar null_bit_arg,
@@ -2315,13 +2320,8 @@ public:
int store(longlong nr, bool unsigned_val);
int reset(void) { bzero(ptr,sizeof(double)); return 0; }
double val_real(void);
- longlong val_int(void)
- {
- Converter_double_to_longlong conv(Field_double::val_real(), false);
- if (conv.error())
- conv.push_warning(get_thd(), Field_double::val_real(), false);
- return conv.result();
- }
+ longlong val_int(void) { return val_int_from_real(false); }
+ ulonglong val_uint(void) { return (ulonglong) val_int_from_real(true); }
String *val_str(String*,String *);
bool send_binary(Protocol *protocol);
int cmp(const uchar *,const uchar *);
diff --git a/sql/handler.cc b/sql/handler.cc
index 5b8aaa99c5a..00c38f0dd74 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -23,7 +23,7 @@
#include "mariadb.h"
#include "sql_priv.h"
#include "unireg.h"
-#include "rpl_handler.h"
+#include "rpl_rli.h"
#include "sql_cache.h" // query_cache, query_cache_*
#include "sql_connect.h" // global_table_stats
#include "key.h" // key_copy, key_unpack, key_cmp_if_same, key_cmp
@@ -50,6 +50,7 @@
#ifdef WITH_ARIA_STORAGE_ENGINE
#include "../storage/maria/ha_maria.h"
#endif
+#include "semisync_master.h"
#include "wsrep_mysqld.h"
#include "wsrep.h"
@@ -1518,7 +1519,10 @@ done:
mysql_mutex_assert_not_owner(mysql_bin_log.get_log_lock());
mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync);
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
- RUN_HOOK(transaction, after_commit, (thd, FALSE));
+#ifdef HAVE_REPLICATION
+ repl_semisync_master.wait_after_commit(thd, all);
+ DEBUG_SYNC(thd, "after_group_after_commit");
+#endif
goto end;
/* Come here if error and we need to rollback. */
@@ -1763,7 +1767,9 @@ int ha_rollback_trans(THD *thd, bool all)
push_warning(thd, Sql_condition::WARN_LEVEL_WARN,
ER_WARNING_NOT_COMPLETE_ROLLBACK,
ER_THD(thd, ER_WARNING_NOT_COMPLETE_ROLLBACK));
- (void) RUN_HOOK(transaction, after_rollback, (thd, FALSE));
+#ifdef HAVE_REPLICATION
+ repl_semisync_master.wait_after_rollback(thd, all);
+#endif
DBUG_RETURN(error);
}
@@ -2021,6 +2027,97 @@ int ha_recover(HASH *commit_list)
}
/**
+ return the XID as it appears in the SQL function's arguments.
+ So this string can be passed to XA START, XA PREPARE etc...
+
+ @note
+ the 'buf' has to have space for at least SQL_XIDSIZE bytes.
+*/
+
+
+/*
+ 'a'..'z' 'A'..'Z', '0'..'9'
+ and '-' '_' ' ' symbols don't have to be
+ converted.
+*/
+
+static const char xid_needs_conv[128]=
+{
+ 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
+ 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
+ 0,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1,
+ 0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,
+ 1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
+ 0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,0,
+ 1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
+ 0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1
+};
+
+uint get_sql_xid(XID *xid, char *buf)
+{
+ int tot_len= xid->gtrid_length + xid->bqual_length;
+ int i;
+ const char *orig_buf= buf;
+
+ for (i=0; i<tot_len; i++)
+ {
+ uchar c= ((uchar *) xid->data)[i];
+ if (c >= 128 || xid_needs_conv[c])
+ break;
+ }
+
+ if (i >= tot_len)
+ {
+ /* No need to convert characters to hexadecimals. */
+ *buf++= '\'';
+ memcpy(buf, xid->data, xid->gtrid_length);
+ buf+= xid->gtrid_length;
+ *buf++= '\'';
+ if (xid->bqual_length > 0 || xid->formatID != 1)
+ {
+ *buf++= ',';
+ *buf++= '\'';
+ memcpy(buf, xid->data+xid->gtrid_length, xid->bqual_length);
+ buf+= xid->bqual_length;
+ *buf++= '\'';
+ }
+ }
+ else
+ {
+ *buf++= 'X';
+ *buf++= '\'';
+ for (i= 0; i < xid->gtrid_length; i++)
+ {
+ *buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4];
+ *buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f];
+ }
+ *buf++= '\'';
+ if (xid->bqual_length > 0 || xid->formatID != 1)
+ {
+ *buf++= ',';
+ *buf++= 'X';
+ *buf++= '\'';
+ for (; i < tot_len; i++)
+ {
+ *buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4];
+ *buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f];
+ }
+ *buf++= '\'';
+ }
+ }
+
+ if (xid->formatID != 1)
+ {
+ *buf++= ',';
+ buf+= my_longlong10_to_str_8bit(&my_charset_bin, buf,
+ MY_INT64_NUM_DECIMAL_DIGITS, -10, xid->formatID);
+ }
+
+ return buf - orig_buf;
+}
+
+
+/**
return the list of XID's to a client, the same way SHOW commands do.
@note
@@ -2029,7 +2126,8 @@ int ha_recover(HASH *commit_list)
It can be easily fixed later, if necessary.
*/
-static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol)
+static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol,
+ char *data, uint data_len, CHARSET_INFO *data_cs)
{
if (xs->xa_state == XA_PREPARED)
{
@@ -2037,8 +2135,7 @@ static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol)
protocol->store_longlong((longlong) xs->xid.formatID, FALSE);
protocol->store_longlong((longlong) xs->xid.gtrid_length, FALSE);
protocol->store_longlong((longlong) xs->xid.bqual_length, FALSE);
- protocol->store(xs->xid.data, xs->xid.gtrid_length + xs->xid.bqual_length,
- &my_charset_bin);
+ protocol->store(data, data_len, data_cs);
if (protocol->write())
return TRUE;
}
@@ -2046,11 +2143,28 @@ static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol)
}
+static my_bool xa_recover_callback_short(XID_STATE *xs, Protocol *protocol)
+{
+ return xa_recover_callback(xs, protocol, xs->xid.data,
+ xs->xid.gtrid_length + xs->xid.bqual_length, &my_charset_bin);
+}
+
+
+static my_bool xa_recover_callback_verbose(XID_STATE *xs, Protocol *protocol)
+{
+ char buf[SQL_XIDSIZE];
+ uint len= get_sql_xid(&xs->xid, buf);
+ return xa_recover_callback(xs, protocol, buf, len,
+ &my_charset_utf8_general_ci);
+}
+
+
bool mysql_xa_recover(THD *thd)
{
List<Item> field_list;
Protocol *protocol= thd->protocol;
MEM_ROOT *mem_root= thd->mem_root;
+ my_hash_walk_action action;
DBUG_ENTER("mysql_xa_recover");
field_list.push_back(new (mem_root)
@@ -2062,16 +2176,32 @@ bool mysql_xa_recover(THD *thd)
field_list.push_back(new (mem_root)
Item_int(thd, "bqual_length", 0,
MY_INT32_NUM_DECIMAL_DIGITS), mem_root);
- field_list.push_back(new (mem_root)
- Item_empty_string(thd, "data",
- XIDDATASIZE), mem_root);
+ {
+ uint len;
+ CHARSET_INFO *cs;
+
+ if (thd->lex->verbose)
+ {
+ len= SQL_XIDSIZE;
+ cs= &my_charset_utf8_general_ci;
+ action= (my_hash_walk_action) xa_recover_callback_verbose;
+ }
+ else
+ {
+ len= XIDDATASIZE;
+ cs= &my_charset_bin;
+ action= (my_hash_walk_action) xa_recover_callback_short;
+ }
+
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "data", len, cs), mem_root);
+ }
if (protocol->send_result_set_metadata(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(1);
- if (xid_cache_iterate(thd, (my_hash_walk_action) xa_recover_callback,
- protocol))
+ if (xid_cache_iterate(thd, action, protocol))
DBUG_RETURN(1);
my_eof(thd);
DBUG_RETURN(0);
diff --git a/sql/handler.h b/sql/handler.h
index 13b8dc41f65..1a4e83c093d 100644
--- a/sql/handler.h
+++ b/sql/handler.h
@@ -672,6 +672,15 @@ struct xid_t {
};
typedef struct xid_t XID;
+/*
+ The size of XID string representation in the form
+ 'gtrid', 'bqual', formatID
+ see xid_t::get_sql_string() for details.
+*/
+#define SQL_XIDSIZE (XIDDATASIZE * 2 + 8 + MY_INT64_NUM_DECIMAL_DIGITS)
+/* The 'buf' has to have space for at least SQL_XIDSIZE bytes. */
+uint get_sql_xid(XID *xid, char *buf);
+
/* for recover() handlerton call */
#define MIN_XID_LIST_SIZE 128
#define MAX_XID_LIST_SIZE (1024*128)
@@ -1929,6 +1938,13 @@ struct HA_CREATE_INFO: public Table_scope_and_contents_source_st,
used_fields|= (HA_CREATE_USED_CHARSET | HA_CREATE_USED_DEFAULT_CHARSET);
return false;
}
+ ulong table_options_with_row_type()
+ {
+ if (row_type == ROW_TYPE_DYNAMIC || row_type == ROW_TYPE_PAGE)
+ return table_options | HA_OPTION_PACK_RECORD;
+ else
+ return table_options;
+ }
};
diff --git a/sql/log.cc b/sql/log.cc
index e5ff85e4544..bdf0b6fdc59 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -49,10 +49,10 @@
#endif
#include "sql_plugin.h"
-#include "rpl_handler.h"
#include "debug_sync.h"
#include "sql_show.h"
#include "my_pthread.h"
+#include "semisync_master.h"
#include "wsrep_mysqld.h"
#include "sp_rcontext.h"
#include "sp_head.h"
@@ -3211,7 +3211,7 @@ MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period)
group_commit_trigger_lock_wait(0),
sync_period_ptr(sync_period), sync_counter(0),
state_file_deleted(false), binlog_state_recover_done(false),
- is_relay_log(0), signal_cnt(0),
+ is_relay_log(0), relay_signal_cnt(0),
checksum_alg_reset(BINLOG_CHECKSUM_ALG_UNDEF),
relay_log_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
description_event_for_exec(0), description_event_for_queue(0),
@@ -3281,7 +3281,8 @@ void MYSQL_BIN_LOG::cleanup()
mysql_mutex_destroy(&LOCK_xid_list);
mysql_mutex_destroy(&LOCK_binlog_background_thread);
mysql_mutex_destroy(&LOCK_binlog_end_pos);
- mysql_cond_destroy(&update_cond);
+ mysql_cond_destroy(&COND_relay_log_updated);
+ mysql_cond_destroy(&COND_bin_log_updated);
mysql_cond_destroy(&COND_queue_busy);
mysql_cond_destroy(&COND_xid_list);
mysql_cond_destroy(&COND_binlog_background_thread);
@@ -3316,7 +3317,8 @@ void MYSQL_BIN_LOG::init_pthread_objects()
mysql_mutex_setflags(&LOCK_index, MYF_NO_DEADLOCK_DETECTION);
mysql_mutex_init(key_BINLOG_LOCK_xid_list,
&LOCK_xid_list, MY_MUTEX_INIT_FAST);
- mysql_cond_init(m_key_update_cond, &update_cond, 0);
+ mysql_cond_init(m_key_relay_log_update, &COND_relay_log_updated, 0);
+ mysql_cond_init(m_key_bin_log_update, &COND_bin_log_updated, 0);
mysql_cond_init(m_key_COND_queue_busy, &COND_queue_busy, 0);
mysql_cond_init(key_BINLOG_COND_xid_list, &COND_xid_list, 0);
@@ -3802,6 +3804,11 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
close_purge_index_file();
#endif
+ /* Notify the io thread that binlog is rotated to a new file */
+ if (is_relay_log)
+ signal_relay_log_update();
+ else
+ update_binlog_end_pos();
DBUG_RETURN(0);
err:
@@ -5112,7 +5119,12 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
new file name in the current binary log file.
*/
if ((error= generate_new_name(new_name, name, 0)))
+ {
+#ifdef ENABLE_AND_FIX_HANG
+ close_on_error= TRUE;
+#endif
goto end;
+ }
new_name_ptr=new_name;
if (log_type == LOG_BIN)
@@ -5143,13 +5155,20 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
}
bytes_written += r.data_written;
}
- /*
- Update needs to be signalled even if there is no rotate event
- log rotation should give the waiting thread a signal to
- discover EOF and move on to the next log.
- */
- signal_update();
}
+
+ /*
+ Update needs to be signalled even if there is no rotate event
+ log rotation should give the waiting thread a signal to
+ discover EOF and move on to the next log.
+ */
+ if ((error= flush_io_cache(&log_file)))
+ {
+ close_on_error= TRUE;
+ goto end;
+ }
+ update_binlog_end_pos();
+
old_name=name;
name=0; // Don't free name
close_flag= LOG_CLOSE_TO_BE_OPENED | LOG_CLOSE_INDEX;
@@ -5280,7 +5299,7 @@ bool MYSQL_BIN_LOG::append_no_lock(Log_event* ev)
if (my_b_append_tell(&log_file) > max_size)
error= new_file_without_locking();
err:
- signal_update(); // Safe as we don't call close
+ update_binlog_end_pos();
DBUG_RETURN(error);
}
@@ -5341,7 +5360,7 @@ bool MYSQL_BIN_LOG::write_event_buffer(uchar* buf, uint len)
err:
my_safe_afree(ebuf, len);
if (!error)
- signal_update();
+ update_binlog_end_pos();
DBUG_RETURN(error);
}
@@ -6341,6 +6360,7 @@ err:
{
my_off_t offset= my_b_tell(file);
bool check_purge= false;
+ DBUG_ASSERT(!is_relay_log);
if (!error)
{
@@ -6355,25 +6375,23 @@ err:
mysql_mutex_assert_owner(&LOCK_log);
mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync);
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
- bool first= true;
- bool last= true;
- if ((error= RUN_HOOK(binlog_storage, after_flush,
- (thd, log_file_name, file->pos_in_file,
- synced, first, last))))
+#ifdef HAVE_REPLICATION
+ if (repl_semisync_master.report_binlog_update(thd, log_file_name,
+ file->pos_in_file))
{
sql_print_error("Failed to run 'after_flush' hooks");
error= 1;
}
else
+#endif
{
- /* update binlog_end_pos so it can be read by dump thread
- *
- * note: must be _after_ the RUN_HOOK(after_flush) or else
- * semi-sync-plugin might not have put the transaction into
- * it's list before dump-thread tries to send it
- */
+ /*
+ update binlog_end_pos so it can be read by dump thread
+ note: must be _after_ the RUN_HOOK(after_flush) or else
+ semi-sync might not have put the transaction into
+ it's list before dump-thread tries to send it
+ */
update_binlog_end_pos(offset);
-
if ((error= rotate(false, &check_purge)))
check_purge= false;
}
@@ -6390,15 +6408,14 @@ err:
mysql_mutex_assert_not_owner(&LOCK_log);
mysql_mutex_assert_owner(&LOCK_after_binlog_sync);
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
- bool first= true;
- bool last= true;
- if (RUN_HOOK(binlog_storage, after_sync,
- (thd, log_file_name, file->pos_in_file,
- first, last)))
+#ifdef HAVE_REPLICATION
+ if (repl_semisync_master.wait_after_sync(log_file_name,
+ file->pos_in_file))
{
error=1;
/* error is already printed inside hook */
}
+#endif
/*
Take mutex to protect against a reader seeing partial writes of 64-bit
@@ -7099,7 +7116,7 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd)
if (!(error= write_incident_already_locked(thd)) &&
!(error= flush_and_sync(0)))
{
- signal_update();
+ update_binlog_end_pos();
if ((error= rotate(false, &check_purge)))
check_purge= false;
}
@@ -7140,7 +7157,7 @@ MYSQL_BIN_LOG::write_binlog_checkpoint_event_already_locked(const char *name_arg
*/
if (!write_event(&ev) && !flush_and_sync(0))
{
- signal_update();
+ update_binlog_end_pos();
}
else
{
@@ -7575,7 +7592,11 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
else if (is_leader)
trx_group_commit_leader(entry);
else if (!entry->queued_by_other)
+ {
+ DEBUG_SYNC(entry->thd, "after_semisync_queue");
+
entry->thd->wait_for_wakeup_ready();
+ }
else
{
/*
@@ -7820,31 +7841,31 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
mysql_mutex_assert_owner(&LOCK_log);
mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync);
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
- bool first= true, last;
+
for (current= queue; current != NULL; current= current->next)
{
- last= current->next == NULL;
+#ifdef HAVE_REPLICATION
if (!current->error &&
- RUN_HOOK(binlog_storage, after_flush,
- (current->thd,
- current->cache_mngr->last_commit_pos_file,
- current->cache_mngr->last_commit_pos_offset, synced,
- first, last)))
+ repl_semisync_master.
+ report_binlog_update(current->thd,
+ current->cache_mngr->last_commit_pos_file,
+ current->cache_mngr->
+ last_commit_pos_offset))
{
current->error= ER_ERROR_ON_WRITE;
current->commit_errno= -1;
current->error_cache= NULL;
any_error= true;
}
- first= false;
+#endif
}
- /* update binlog_end_pos so it can be read by dump thread
- *
- * note: must be _after_ the RUN_HOOK(after_flush) or else
- * semi-sync-plugin might not have put the transaction into
- * it's list before dump-thread tries to send it
- */
+ /*
+ update binlog_end_pos so it can be read by dump thread
+ Note: must be _after_ the RUN_HOOK(after_flush) or else
+ semi-sync might not have put the transaction into
+ it's list before dump-thread tries to send it
+ */
update_binlog_end_pos(commit_offset);
if (any_error)
@@ -7906,18 +7927,19 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
mysql_mutex_assert_owner(&LOCK_after_binlog_sync);
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
- bool first= true, last;
+ bool first __attribute__((unused))= true;
+ bool last __attribute__((unused));
for (current= queue; current != NULL; current= current->next)
{
last= current->next == NULL;
- if (!current->error &&
- RUN_HOOK(binlog_storage, after_sync,
- (current->thd, current->cache_mngr->last_commit_pos_file,
- current->cache_mngr->last_commit_pos_offset,
- first, last)))
- {
- /* error is already printed inside hook */
- }
+#ifdef HAVE_REPLICATION
+ if (!current->error)
+ current->error=
+ repl_semisync_master.wait_after_sync(current->cache_mngr->
+ last_commit_pos_file,
+ current->cache_mngr->
+ last_commit_pos_offset);
+#endif
first= false;
}
}
@@ -8228,10 +8250,10 @@ void MYSQL_BIN_LOG::wait_for_update_relay_log(THD* thd)
DBUG_ENTER("wait_for_update_relay_log");
mysql_mutex_assert_owner(&LOCK_log);
- thd->ENTER_COND(&update_cond, &LOCK_log,
+ thd->ENTER_COND(&COND_relay_log_updated, &LOCK_log,
&stage_slave_has_read_all_relay_log,
&old_stage);
- mysql_cond_wait(&update_cond, &LOCK_log);
+ mysql_cond_wait(&COND_relay_log_updated, &LOCK_log);
thd->EXIT_COND(&old_stage);
DBUG_VOID_RETURN;
}
@@ -8261,9 +8283,9 @@ int MYSQL_BIN_LOG::wait_for_update_binlog_end_pos(THD* thd,
thd_wait_begin(thd, THD_WAIT_BINLOG);
mysql_mutex_assert_owner(get_binlog_end_pos_lock());
if (!timeout)
- mysql_cond_wait(&update_cond, get_binlog_end_pos_lock());
+ mysql_cond_wait(&COND_bin_log_updated, get_binlog_end_pos_lock());
else
- ret= mysql_cond_timedwait(&update_cond, get_binlog_end_pos_lock(),
+ ret= mysql_cond_timedwait(&COND_bin_log_updated, get_binlog_end_pos_lock(),
timeout);
thd_wait_end(thd);
DBUG_RETURN(ret);
@@ -8308,7 +8330,8 @@ void MYSQL_BIN_LOG::close(uint exiting)
relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
write_event(&s);
bytes_written+= s.data_written;
- signal_update();
+ flush_io_cache(&log_file);
+ update_binlog_end_pos();
/*
When we shut down server, write out the binlog state to a separate
@@ -8527,14 +8550,6 @@ bool flush_error_log()
return result;
}
-void MYSQL_BIN_LOG::signal_update()
-{
- DBUG_ENTER("MYSQL_BIN_LOG::signal_update");
- signal_cnt++;
- mysql_cond_broadcast(&update_cond);
- DBUG_VOID_RETURN;
-}
-
#ifdef _WIN32
static void print_buffer_to_nt_eventlog(enum loglevel level, char *buff,
size_t length, size_t buffLen)
@@ -9918,7 +9933,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
for (;;)
{
while ((ev= Log_event::read_log_event(first_round ? first_log : &log,
- 0, fdle, opt_master_verify_checksum))
+ fdle, opt_master_verify_checksum))
&& ev->is_valid())
{
enum Log_event_type typ= ev->get_type_code();
@@ -10159,7 +10174,7 @@ MYSQL_BIN_LOG::do_binlog_recovery(const char *opt_name, bool do_xa_recovery)
return 1;
}
- if ((ev= Log_event::read_log_event(&log, 0, &fdle,
+ if ((ev= Log_event::read_log_event(&log, &fdle,
opt_master_verify_checksum)) &&
ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
{
@@ -10400,7 +10415,7 @@ get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list)
*out_gtid_list= NULL;
- if (!(ev= Log_event::read_log_event(cache, 0, &init_fdle,
+ if (!(ev= Log_event::read_log_event(cache, &init_fdle,
opt_master_verify_checksum)) ||
ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
{
@@ -10416,7 +10431,7 @@ get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list)
{
Log_event_type typ;
- ev= Log_event::read_log_event(cache, 0, fdle, opt_master_verify_checksum);
+ ev= Log_event::read_log_event(cache, fdle, opt_master_verify_checksum);
if (!ev)
{
errormsg= "Could not read GTID list event while looking for GTID "
@@ -10446,6 +10461,7 @@ get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list)
return errormsg;
}
+
struct st_mysql_storage_engine binlog_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
diff --git a/sql/log.h b/sql/log.h
index dffb6a80d54..02ace7c7921 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -349,6 +349,11 @@ public:
/* for documentation of mutexes held in various places in code */
};
+/* Tell the io thread if we can delay the master info sync. */
+#define SEMI_SYNC_SLAVE_DELAY_SYNC 1
+/* Tell the io thread if the current event needs a ack. */
+#define SEMI_SYNC_NEED_ACK 2
+
class MYSQL_QUERY_LOG: public MYSQL_LOG
{
public:
@@ -425,14 +430,18 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
#ifdef HAVE_PSI_INTERFACE
/** The instrumentation key to use for @ LOCK_index. */
PSI_mutex_key m_key_LOCK_index;
- /** The instrumentation key to use for @ update_cond. */
- PSI_cond_key m_key_update_cond;
+ /** The instrumentation key to use for @ COND_relay_log_updated */
+ PSI_cond_key m_key_relay_log_update;
+ /** The instrumentation key to use for @ COND_bin_log_updated */
+ PSI_cond_key m_key_bin_log_update;
/** The instrumentation key to use for opening the log file. */
PSI_file_key m_key_file_log;
/** The instrumentation key to use for opening the log index file. */
PSI_file_key m_key_file_log_index;
PSI_file_key m_key_COND_queue_busy;
+ /** The instrumentation key to use for LOCK_binlog_end_pos. */
+ PSI_mutex_key m_key_LOCK_binlog_end_pos;
#endif
struct group_commit_entry
@@ -488,7 +497,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
mysql_mutex_t LOCK_binlog_end_pos;
mysql_mutex_t LOCK_xid_list;
mysql_cond_t COND_xid_list;
- mysql_cond_t update_cond;
+ mysql_cond_t COND_relay_log_updated, COND_bin_log_updated;
ulonglong bytes_written;
IO_CACHE index_file;
char index_file_name[FN_REFLEN];
@@ -598,7 +607,7 @@ public:
/* This is relay log */
bool is_relay_log;
- ulong signal_cnt; // update of the counter is checked by heartbeat
+ ulong relay_signal_cnt; // update of the counter is checked by heartbeat
enum enum_binlog_checksum_alg checksum_alg_reset; // to contain a new value when binlog is rotated
/*
Holds the last seen in Relay-Log FD's checksum alg value.
@@ -661,16 +670,20 @@ public:
#ifdef HAVE_PSI_INTERFACE
void set_psi_keys(PSI_mutex_key key_LOCK_index,
- PSI_cond_key key_update_cond,
+ PSI_cond_key key_relay_log_update,
+ PSI_cond_key key_bin_log_update,
PSI_file_key key_file_log,
PSI_file_key key_file_log_index,
- PSI_file_key key_COND_queue_busy)
+ PSI_file_key key_COND_queue_busy,
+ PSI_mutex_key key_LOCK_binlog_end_pos)
{
m_key_LOCK_index= key_LOCK_index;
- m_key_update_cond= key_update_cond;
+ m_key_relay_log_update= key_relay_log_update;
+ m_key_bin_log_update= key_bin_log_update;
m_key_file_log= key_file_log;
m_key_file_log_index= key_file_log_index;
m_key_COND_queue_busy= key_COND_queue_busy;
+ m_key_LOCK_binlog_end_pos= key_LOCK_binlog_end_pos;
}
#endif
@@ -707,7 +720,53 @@ public:
DBUG_VOID_RETURN;
}
void set_max_size(ulong max_size_arg);
- void signal_update();
+
+ /* Handle signaling that relay has been updated */
+ void signal_relay_log_update()
+ {
+ mysql_mutex_assert_owner(&LOCK_log);
+ DBUG_ASSERT(is_relay_log);
+ DBUG_ENTER("MYSQL_BIN_LOG::signal_relay_log_update");
+ relay_signal_cnt++;
+ mysql_cond_broadcast(&COND_relay_log_updated);
+ DBUG_VOID_RETURN;
+ }
+ void signal_bin_log_update()
+ {
+ mysql_mutex_assert_owner(&LOCK_binlog_end_pos);
+ DBUG_ASSERT(!is_relay_log);
+ DBUG_ENTER("MYSQL_BIN_LOG::signal_bin_log_update");
+ mysql_cond_broadcast(&COND_bin_log_updated);
+ DBUG_VOID_RETURN;
+ }
+ void update_binlog_end_pos()
+ {
+ if (is_relay_log)
+ signal_relay_log_update();
+ else
+ {
+ lock_binlog_end_pos();
+ binlog_end_pos= my_b_safe_tell(&log_file);
+ signal_bin_log_update();
+ unlock_binlog_end_pos();
+ }
+ }
+ void update_binlog_end_pos(my_off_t pos)
+ {
+ mysql_mutex_assert_owner(&LOCK_log);
+ mysql_mutex_assert_not_owner(&LOCK_binlog_end_pos);
+ lock_binlog_end_pos();
+ /*
+ Note: it would make more sense to assert(pos > binlog_end_pos)
+ but there are two places triggered by mtr that has pos == binlog_end_pos
+ i didn't investigate but accepted as it should do no harm
+ */
+ DBUG_ASSERT(pos >= binlog_end_pos);
+ binlog_end_pos= pos;
+ signal_bin_log_update();
+ unlock_binlog_end_pos();
+ }
+
void wait_for_sufficient_commits();
void binlog_trigger_immediate_group_commit();
void wait_for_update_relay_log(THD* thd);
@@ -807,7 +866,7 @@ public:
inline char* get_log_fname() { return log_file_name; }
inline char* get_name() { return name; }
inline mysql_mutex_t* get_log_lock() { return &LOCK_log; }
- inline mysql_cond_t* get_log_cond() { return &update_cond; }
+ inline mysql_cond_t* get_bin_log_cond() { return &COND_bin_log_updated; }
inline IO_CACHE* get_log_file() { return &log_file; }
inline void lock_index() { mysql_mutex_lock(&LOCK_index);}
@@ -831,23 +890,6 @@ public:
bool check_strict_gtid_sequence(uint32 domain_id, uint32 server_id,
uint64 seq_no);
-
- void update_binlog_end_pos(my_off_t pos)
- {
- mysql_mutex_assert_owner(&LOCK_log);
- mysql_mutex_assert_not_owner(&LOCK_binlog_end_pos);
- lock_binlog_end_pos();
- /**
- * note: it would make more sense to assert(pos > binlog_end_pos)
- * but there are two places triggered by mtr that has pos == binlog_end_pos
- * i didn't investigate but accepted as it should do no harm
- */
- DBUG_ASSERT(pos >= binlog_end_pos);
- binlog_end_pos= pos;
- signal_update();
- unlock_binlog_end_pos();
- }
-
/**
* used when opening new file, and binlog_end_pos moves backwards
*/
@@ -858,7 +900,7 @@ public:
lock_binlog_end_pos();
binlog_end_pos= pos;
strcpy(binlog_end_pos_file, file_name);
- signal_update();
+ signal_bin_log_update();
unlock_binlog_end_pos();
}
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 1256da729f0..abc979f1b58 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -1855,7 +1855,7 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
DBUG_RETURN(0);
}
-Log_event* Log_event::read_log_event(IO_CACHE* file, mysql_mutex_t* log_lock,
+Log_event* Log_event::read_log_event(IO_CACHE* file,
const Format_description_log_event *fdle,
my_bool crc_check)
{
@@ -1865,9 +1865,6 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, mysql_mutex_t* log_lock,
const char *error= 0;
Log_event *res= 0;
- if (log_lock)
- mysql_mutex_lock(log_lock);
-
switch (read_log_event(file, &event, fdle, BINLOG_CHECKSUM_ALG_OFF))
{
case 0:
@@ -1904,8 +1901,6 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, mysql_mutex_t* log_lock,
res->register_temp_buf(event.release(), true);
err:
- if (log_lock)
- mysql_mutex_unlock(log_lock);
if (error)
{
DBUG_ASSERT(!res);
@@ -9736,7 +9731,6 @@ int Execute_load_log_event::do_apply_event(rpl_group_info *rgi)
}
if (!(lev= (Load_log_event*)
Log_event::read_log_event(&file,
- (mysql_mutex_t*)0,
rli->relay_log.description_event_for_exec,
opt_slave_sql_verify_checksum)) ||
lev->get_type_code() != NEW_LOAD_EVENT)
diff --git a/sql/log_event.h b/sql/log_event.h
index 4d84bc34a47..29cae604678 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -1300,7 +1300,6 @@ public:
constructor and pass description_event as an argument.
*/
static Log_event* read_log_event(IO_CACHE* file,
- mysql_mutex_t* log_lock,
const Format_description_log_event
*description_event,
my_bool crc_check);
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index effb81e06f6..afbde520f7c 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -97,8 +97,8 @@
#include "set_var.h"
#include "rpl_injector.h"
-
-#include "rpl_handler.h"
+#include "semisync_master.h"
+#include "semisync_slave.h"
#include "transaction.h"
@@ -914,7 +914,7 @@ PSI_mutex_key key_LOCK_des_key_file;
PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_BINLOG_LOCK_binlog_background_thread,
- m_key_LOCK_binlog_end_pos,
+ key_LOCK_binlog_end_pos,
key_delayed_insert_mutex, key_hash_filo_lock, key_LOCK_active_mi,
key_LOCK_connection_count, key_LOCK_crypt, key_LOCK_delayed_create,
key_LOCK_delayed_insert, key_LOCK_delayed_status, key_LOCK_error_log,
@@ -936,8 +936,10 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_LOCK_thread_count, key_LOCK_thread_cache,
key_PARTITION_LOCK_auto_inc;
PSI_mutex_key key_RELAYLOG_LOCK_index;
+PSI_mutex_key key_LOCK_relaylog_end_pos;
PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool, key_LOCK_parallel_entry;
+PSI_mutex_key key_LOCK_binlog;
PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
@@ -949,6 +951,7 @@ PSI_mutex_key key_LOCK_after_binlog_sync;
PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered,
key_LOCK_slave_background;
PSI_mutex_key key_TABLE_SHARE_LOCK_share;
+PSI_mutex_key key_LOCK_ack_receiver;
PSI_mutex_key key_TABLE_SHARE_LOCK_rotation;
PSI_cond_key key_TABLE_SHARE_COND_rotation;
@@ -970,8 +973,9 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_BINLOG_LOCK_index, "MYSQL_BIN_LOG::LOCK_index", 0},
{ &key_BINLOG_LOCK_xid_list, "MYSQL_BIN_LOG::LOCK_xid_list", 0},
{ &key_BINLOG_LOCK_binlog_background_thread, "MYSQL_BIN_LOG::LOCK_binlog_background_thread", 0},
- { &m_key_LOCK_binlog_end_pos, "MYSQL_BIN_LOG::LOCK_binlog_end_pos", 0 },
+ { &key_LOCK_binlog_end_pos, "MYSQL_BIN_LOG::LOCK_binlog_end_pos", 0 },
{ &key_RELAYLOG_LOCK_index, "MYSQL_RELAY_LOG::LOCK_index", 0},
+ { &key_LOCK_relaylog_end_pos, "MYSQL_RELAY_LOG::LOCK_binlog_end_pos", 0},
{ &key_delayed_insert_mutex, "Delayed_insert::mutex", 0},
{ &key_hash_filo_lock, "hash_filo::lock", 0},
{ &key_LOCK_active_mi, "LOCK_active_mi", PSI_FLAG_GLOBAL},
@@ -1029,7 +1033,9 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_binlog_state, "LOCK_binlog_state", 0},
{ &key_LOCK_rpl_thread, "LOCK_rpl_thread", 0},
{ &key_LOCK_rpl_thread_pool, "LOCK_rpl_thread_pool", 0},
- { &key_LOCK_parallel_entry, "LOCK_parallel_entry", 0}
+ { &key_LOCK_parallel_entry, "LOCK_parallel_entry", 0},
+ { &key_LOCK_ack_receiver, "Ack_receiver::mutex", 0},
+ { &key_LOCK_binlog, "LOCK_binlog", 0}
};
PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
@@ -1058,7 +1064,8 @@ static PSI_rwlock_info all_server_rwlocks[]=
PSI_cond_key key_PAGE_cond, key_COND_active, key_COND_pool;
#endif /* HAVE_MMAP */
-PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond,
+PSI_cond_key key_BINLOG_COND_xid_list,
+ key_BINLOG_COND_bin_log_updated, key_BINLOG_COND_relay_log_updated,
key_BINLOG_COND_binlog_background_thread,
key_BINLOG_COND_binlog_background_thread_end,
key_COND_cache_status_changed, key_COND_manager,
@@ -1072,9 +1079,10 @@ PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond,
key_rpl_group_info_sleep_cond,
key_TABLE_SHARE_cond, key_user_level_lock_cond,
key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache,
- key_COND_start_thread,
+ key_COND_start_thread, key_COND_binlog_send,
key_BINLOG_COND_queue_busy;
-PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
+PSI_cond_key key_RELAYLOG_COND_relay_log_updated,
+ key_RELAYLOG_COND_bin_log_updated, key_COND_wakeup_ready,
key_COND_wait_commit;
PSI_cond_key key_RELAYLOG_COND_queue_busy;
PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
@@ -1083,6 +1091,7 @@ PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread,
key_COND_parallel_entry, key_COND_group_commit_orderer,
key_COND_prepare_ordered, key_COND_slave_background;
PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;
+PSI_cond_key key_COND_ack_receiver;
static PSI_cond_info all_server_conds[]=
{
@@ -1095,12 +1104,13 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_pool, "TC_LOG_MMAP::COND_pool", 0},
{ &key_TC_LOG_MMAP_COND_queue_busy, "TC_LOG_MMAP::COND_queue_busy", 0},
#endif /* HAVE_MMAP */
+ { &key_BINLOG_COND_bin_log_updated, "MYSQL_BIN_LOG::COND_bin_log_updated", 0}, { &key_BINLOG_COND_relay_log_updated, "MYSQL_BIN_LOG::COND_relay_log_updated", 0},
{ &key_BINLOG_COND_xid_list, "MYSQL_BIN_LOG::COND_xid_list", 0},
- { &key_BINLOG_update_cond, "MYSQL_BIN_LOG::update_cond", 0},
{ &key_BINLOG_COND_binlog_background_thread, "MYSQL_BIN_LOG::COND_binlog_background_thread", 0},
{ &key_BINLOG_COND_binlog_background_thread_end, "MYSQL_BIN_LOG::COND_binlog_background_thread_end", 0},
{ &key_BINLOG_COND_queue_busy, "MYSQL_BIN_LOG::COND_queue_busy", 0},
- { &key_RELAYLOG_update_cond, "MYSQL_RELAY_LOG::update_cond", 0},
+ { &key_RELAYLOG_COND_relay_log_updated, "MYSQL_RELAY_LOG::COND_relay_log_updated", 0},
+ { &key_RELAYLOG_COND_bin_log_updated, "MYSQL_RELAY_LOG::COND_bin_log_updated", 0},
{ &key_RELAYLOG_COND_queue_busy, "MYSQL_RELAY_LOG::COND_queue_busy", 0},
{ &key_COND_wakeup_ready, "THD::COND_wakeup_ready", 0},
{ &key_COND_wait_commit, "wait_for_commit::COND_wait_commit", 0},
@@ -1135,6 +1145,8 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_start_thread, "COND_start_thread", PSI_FLAG_GLOBAL},
{ &key_COND_wait_gtid, "COND_wait_gtid", 0},
{ &key_COND_gtid_ignore_duplicates, "COND_gtid_ignore_duplicates", 0},
+ { &key_COND_ack_receiver, "Ack_receiver::cond", 0},
+ { &key_COND_binlog_send, "COND_binlog_send", 0},
{ &key_TABLE_SHARE_COND_rotation, "TABLE_SHARE::COND_rotation", 0}
};
@@ -1142,6 +1154,7 @@ PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_main,
key_thread_one_connection, key_thread_signal_hand,
key_thread_slave_background, key_rpl_parallel_thread;
+PSI_thread_key key_thread_ack_receiver;
static PSI_thread_info all_server_threads[]=
{
@@ -1168,6 +1181,7 @@ static PSI_thread_info all_server_threads[]=
{ &key_thread_one_connection, "one_connection", 0},
{ &key_thread_signal_hand, "signal_handler", PSI_FLAG_GLOBAL},
{ &key_thread_slave_background, "slave_background", PSI_FLAG_GLOBAL},
+ { &key_thread_ack_receiver, "Ack_receiver", PSI_FLAG_GLOBAL},
{ &key_rpl_parallel_thread, "rpl_parallel_thread", 0}
};
@@ -1747,6 +1761,7 @@ static void close_connections(void)
Events::deinit();
slave_prepare_for_shutdown();
mysql_bin_log.stop_background_thread();
+ ack_receiver.stop();
/*
Give threads time to die.
@@ -2228,7 +2243,9 @@ void clean_up(bool print_message)
ha_end();
if (tc_log)
tc_log->close();
- delegates_destroy();
+#ifdef HAVE_REPLICATION
+ semi_sync_master_deinit();
+#endif
xid_cache_free();
tdc_deinit();
mdl_destroy();
@@ -4247,10 +4264,12 @@ static int init_common_variables()
constructor (called before main()).
*/
mysql_bin_log.set_psi_keys(key_BINLOG_LOCK_index,
- key_BINLOG_update_cond,
+ key_BINLOG_COND_relay_log_updated,
+ key_BINLOG_COND_bin_log_updated,
key_file_binlog,
key_file_binlog_index,
- key_BINLOG_COND_queue_busy);
+ key_BINLOG_COND_queue_busy,
+ key_LOCK_binlog_end_pos);
#endif
/*
@@ -5143,13 +5162,6 @@ static int init_server_components()
xid_cache_init();
- /*
- initialize delegates for extension observers, errors have already
- been reported in the function
- */
- if (delegates_init())
- unireg_abort(1);
-
/* need to configure logging before initializing storage engines */
if (!opt_bin_log_used && !WSREP_ON)
{
@@ -5181,6 +5193,13 @@ static int init_server_components()
"this server. However this will be ignored as the "
"--log-bin option is not defined.");
}
+
+ if (repl_semisync_master.init_object() ||
+ repl_semisync_slave.init_object())
+ {
+ sql_print_error("Could not initialize semisync.");
+ unireg_abort(1);
+ }
#endif
if (opt_bin_log)
@@ -8271,6 +8290,27 @@ static int show_ssl_get_cipher_list(THD *thd, SHOW_VAR *var, char *buff,
return 0;
}
+#define SHOW_FNAME(name) \
+ rpl_semi_sync_master_show_##name
+
+#define DEF_SHOW_FUNC(name, show_type) \
+ static int SHOW_FNAME(name)(MYSQL_THD thd, SHOW_VAR *var, char *buff) \
+ { \
+ repl_semisync_master.set_export_stats(); \
+ var->type= show_type; \
+ var->value= (char *)&rpl_semi_sync_master_##name; \
+ return 0; \
+ }
+
+DEF_SHOW_FUNC(status, SHOW_BOOL)
+DEF_SHOW_FUNC(clients, SHOW_LONG)
+DEF_SHOW_FUNC(wait_sessions, SHOW_LONG)
+DEF_SHOW_FUNC(trx_wait_time, SHOW_LONGLONG)
+DEF_SHOW_FUNC(trx_wait_num, SHOW_LONGLONG)
+DEF_SHOW_FUNC(net_wait_time, SHOW_LONGLONG)
+DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG)
+DEF_SHOW_FUNC(avg_net_wait_time, SHOW_LONG)
+DEF_SHOW_FUNC(avg_trx_wait_time, SHOW_LONG)
#ifdef HAVE_YASSL
@@ -8589,6 +8629,26 @@ SHOW_VAR status_vars[]= {
{"Rows_sent", (char*) offsetof(STATUS_VAR, rows_sent), SHOW_LONGLONG_STATUS},
{"Rows_read", (char*) offsetof(STATUS_VAR, rows_read), SHOW_LONGLONG_STATUS},
{"Rows_tmp_read", (char*) offsetof(STATUS_VAR, rows_tmp_read), SHOW_LONGLONG_STATUS},
+#ifdef HAVE_REPLICATION
+ {"Rpl_semi_sync_master_status", (char*) &SHOW_FNAME(status), SHOW_FUNC},
+ {"Rpl_semi_sync_master_clients", (char*) &SHOW_FNAME(clients), SHOW_FUNC},
+ {"Rpl_semi_sync_master_yes_tx", (char*) &rpl_semi_sync_master_yes_transactions, SHOW_LONG},
+ {"Rpl_semi_sync_master_no_tx", (char*) &rpl_semi_sync_master_no_transactions, SHOW_LONG},
+ {"Rpl_semi_sync_master_wait_sessions", (char*) &SHOW_FNAME(wait_sessions), SHOW_FUNC},
+ {"Rpl_semi_sync_master_no_times", (char*) &rpl_semi_sync_master_off_times, SHOW_LONG},
+ {"Rpl_semi_sync_master_timefunc_failures", (char*) &rpl_semi_sync_master_timefunc_fails, SHOW_LONG},
+ {"Rpl_semi_sync_master_wait_pos_backtraverse", (char*) &rpl_semi_sync_master_wait_pos_backtraverse, SHOW_LONG},
+ {"Rpl_semi_sync_master_tx_wait_time", (char*) &SHOW_FNAME(trx_wait_time), SHOW_FUNC},
+ {"Rpl_semi_sync_master_tx_waits", (char*) &SHOW_FNAME(trx_wait_num), SHOW_FUNC},
+ {"Rpl_semi_sync_master_tx_avg_wait_time", (char*) &SHOW_FNAME(avg_trx_wait_time), SHOW_FUNC},
+ {"Rpl_semi_sync_master_net_wait_time", (char*) &SHOW_FNAME(net_wait_time), SHOW_FUNC},
+ {"Rpl_semi_sync_master_net_waits", (char*) &SHOW_FNAME(net_wait_num), SHOW_FUNC},
+ {"Rpl_semi_sync_master_net_avg_wait_time", (char*) &SHOW_FNAME(avg_net_wait_time), SHOW_FUNC},
+ {"Rpl_semi_sync_master_request_ack", (char*) &rpl_semi_sync_master_request_ack, SHOW_LONGLONG},
+ {"Rpl_semi_sync_master_get_ack", (char*)&rpl_semi_sync_master_get_ack, SHOW_LONGLONG},
+ {"Rpl_semi_sync_slave_status", (char*) &rpl_semi_sync_slave_status, SHOW_BOOL},
+ {"Rpl_semi_sync_slave_send_ack", (char*) &rpl_semi_sync_slave_send_ack, SHOW_LONGLONG},
+#endif /* HAVE_REPLICATION */
#ifdef HAVE_QUERY_CACHE
{"Qcache_free_blocks", (char*) &query_cache.free_memory_blocks, SHOW_LONG_NOFLUSH},
{"Qcache_free_memory", (char*) &query_cache.free_memory, SHOW_LONG_NOFLUSH},
@@ -10329,6 +10389,10 @@ PSI_stage_info stage_waiting_for_insert= { 0, "Waiting for INSERT", 0};
PSI_stage_info stage_waiting_for_master_to_send_event= { 0, "Waiting for master to send event", 0};
PSI_stage_info stage_waiting_for_master_update= { 0, "Waiting for master update", 0};
PSI_stage_info stage_waiting_for_relay_log_space= { 0, "Waiting for the slave SQL thread to free enough relay log space", 0};
+PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave=
+{ 0, "Waiting for semi-sync ACK from slave", 0};
+PSI_stage_info stage_waiting_for_semi_sync_slave={ 0, "Waiting for semi-sync slave connection", 0};
+PSI_stage_info stage_reading_semi_sync_ack={ 0, "Reading semi-sync ACK from slave", 0};
PSI_stage_info stage_waiting_for_slave_mutex_on_exit= { 0, "Waiting for slave mutex on exit", 0};
PSI_stage_info stage_waiting_for_slave_thread_to_start= { 0, "Waiting for slave thread to start", 0};
PSI_stage_info stage_waiting_for_table_flush= { 0, "Waiting for table flush", 0};
@@ -10489,6 +10553,9 @@ PSI_stage_info *all_server_stages[]=
& stage_gtid_wait_other_connection,
& stage_slave_background_process_request,
& stage_slave_background_wait_request,
+ & stage_waiting_for_semi_sync_ack_from_slave,
+ & stage_waiting_for_semi_sync_slave,
+ & stage_reading_semi_sync_ack,
& stage_waiting_for_deadlock_kill
};
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 2463f569c94..a5857394b7a 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -329,7 +329,7 @@ extern PSI_mutex_key key_LOCK_des_key_file;
extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_BINLOG_LOCK_binlog_background_thread,
- m_key_LOCK_binlog_end_pos,
+ key_LOCK_binlog_end_pos,
key_delayed_insert_mutex, key_hash_filo_lock, key_LOCK_active_mi,
key_LOCK_connection_count, key_LOCK_crypt, key_LOCK_delayed_create,
key_LOCK_delayed_insert, key_LOCK_delayed_status, key_LOCK_error_log,
@@ -349,6 +349,7 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_LOCK_start_thread,
key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc;
extern PSI_mutex_key key_RELAYLOG_LOCK_index;
+extern PSI_mutex_key key_LOCK_relaylog_end_pos;
extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool, key_LOCK_parallel_entry;
@@ -383,7 +384,8 @@ extern PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond,
key_TABLE_SHARE_cond, key_user_level_lock_cond,
key_COND_start_thread,
key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache;
-extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
+extern PSI_cond_key key_RELAYLOG_COND_relay_log_updated,
+ key_RELAYLOG_COND_bin_log_updated, key_COND_wakeup_ready,
key_COND_wait_commit;
extern PSI_cond_key key_RELAYLOG_COND_queue_busy;
extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
diff --git a/sql/replication.h b/sql/replication.h
index 4731c2246ef..d8672310110 100644
--- a/sql/replication.h
+++ b/sql/replication.h
@@ -18,16 +18,14 @@
/***************************************************************************
NOTE: plugin locking.
- This API was created specifically for the semisync plugin and its locking
- logic is also matches semisync plugin usage pattern. In particular, a plugin
- is locked on Binlog_transmit_observer::transmit_start and is unlocked after
- Binlog_transmit_observer::transmit_stop. All other master observable events
- happen between these two and don't lock the plugin at all. This works well
- for the semisync_master plugin.
+
+ The plugin is locked on Binlog_transmit_observer::transmit_start and is
+ unlocked after Binlog_transmit_observer::transmit_stop. All other
+ master observable events happen between these two and don't lock the
+ plugin at all.
Also a plugin is locked on Binlog_relay_IO_observer::thread_start
- and unlocked after Binlog_relay_IO_observer::thread_stop. This works well for
- the semisync_slave plugin.
+ and unlocked after Binlog_relay_IO_observer::thread_stop.
***************************************************************************/
#include <mysql.h>
diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc
deleted file mode 100644
index e3ff2a17a6a..00000000000
--- a/sql/rpl_handler.cc
+++ /dev/null
@@ -1,553 +0,0 @@
-/* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; version 2 of the License.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
-
-#include "mariadb.h"
-#include "sql_priv.h"
-#include "unireg.h"
-
-#include "rpl_mi.h"
-#include "sql_repl.h"
-#include "log_event.h"
-#include "rpl_filter.h"
-#include <my_dir.h>
-#include "rpl_handler.h"
-
-Trans_delegate *transaction_delegate;
-Binlog_storage_delegate *binlog_storage_delegate;
-#ifdef HAVE_REPLICATION
-Binlog_transmit_delegate *binlog_transmit_delegate;
-Binlog_relay_IO_delegate *binlog_relay_io_delegate;
-#endif /* HAVE_REPLICATION */
-
-/*
- structure to save transaction log filename and position
-*/
-typedef struct Trans_binlog_info {
- my_off_t log_pos;
- char log_file[FN_REFLEN];
-} Trans_binlog_info;
-
-int get_user_var_int(const char *name,
- long long int *value, int *null_value)
-{
- bool null_val;
- user_var_entry *entry=
- (user_var_entry*) my_hash_search(&current_thd->user_vars,
- (uchar*) name, strlen(name));
- if (!entry)
- return 1;
- *value= entry->val_int(&null_val);
- if (null_value)
- *null_value= null_val;
- return 0;
-}
-
-int get_user_var_real(const char *name,
- double *value, int *null_value)
-{
- bool null_val;
- user_var_entry *entry=
- (user_var_entry*) my_hash_search(&current_thd->user_vars,
- (uchar*) name, strlen(name));
- if (!entry)
- return 1;
- *value= entry->val_real(&null_val);
- if (null_value)
- *null_value= null_val;
- return 0;
-}
-
-int get_user_var_str(const char *name, char *value,
- size_t len, unsigned int precision, int *null_value)
-{
- String str;
- bool null_val;
- user_var_entry *entry=
- (user_var_entry*) my_hash_search(&current_thd->user_vars,
- (uchar*) name, strlen(name));
- if (!entry)
- return 1;
- entry->val_str(&null_val, &str, precision);
- strncpy(value, str.c_ptr(), len);
- if (null_value)
- *null_value= null_val;
- return 0;
-}
-
-int delegates_init()
-{
- static my_aligned_storage<sizeof(Trans_delegate), MY_ALIGNOF(long)> trans_mem;
- static my_aligned_storage<sizeof(Binlog_storage_delegate),
- MY_ALIGNOF(long)> storage_mem;
-#ifdef HAVE_REPLICATION
- static my_aligned_storage<sizeof(Binlog_transmit_delegate),
- MY_ALIGNOF(long)> transmit_mem;
- static my_aligned_storage<sizeof(Binlog_relay_IO_delegate),
- MY_ALIGNOF(long)> relay_io_mem;
-#endif
-
- void *place_trans_mem= trans_mem.data;
- void *place_storage_mem= storage_mem.data;
-
- transaction_delegate= new (place_trans_mem) Trans_delegate;
-
- if (!transaction_delegate->is_inited())
- {
- sql_print_error("Initialization of transaction delegates failed. "
- "Please report a bug.");
- return 1;
- }
-
- binlog_storage_delegate= new (place_storage_mem) Binlog_storage_delegate;
-
- if (!binlog_storage_delegate->is_inited())
- {
- sql_print_error("Initialization binlog storage delegates failed. "
- "Please report a bug.");
- return 1;
- }
-
-#ifdef HAVE_REPLICATION
- void *place_transmit_mem= transmit_mem.data;
- void *place_relay_io_mem= relay_io_mem.data;
-
- binlog_transmit_delegate= new (place_transmit_mem) Binlog_transmit_delegate;
-
- if (!binlog_transmit_delegate->is_inited())
- {
- sql_print_error("Initialization of binlog transmit delegates failed. "
- "Please report a bug.");
- return 1;
- }
-
- binlog_relay_io_delegate= new (place_relay_io_mem) Binlog_relay_IO_delegate;
-
- if (!binlog_relay_io_delegate->is_inited())
- {
- sql_print_error("Initialization binlog relay IO delegates failed. "
- "Please report a bug.");
- return 1;
- }
-#endif
-
- return 0;
-}
-
-void delegates_destroy()
-{
- if (transaction_delegate)
- transaction_delegate->~Trans_delegate();
- if (binlog_storage_delegate)
- binlog_storage_delegate->~Binlog_storage_delegate();
-#ifdef HAVE_REPLICATION
- if (binlog_transmit_delegate)
- binlog_transmit_delegate->~Binlog_transmit_delegate();
- if (binlog_relay_io_delegate)
- binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
-#endif /* HAVE_REPLICATION */
-}
-
-/*
- This macro is used by almost all the Delegate methods to iterate
- over all the observers running given callback function of the
- delegate.
- */
-#define FOREACH_OBSERVER(r, f, do_lock, args) \
- param.server_id= thd->variables.server_id; \
- read_lock(); \
- Observer_info_iterator iter= observer_info_iter(); \
- Observer_info *info= iter++; \
- for (; info; info= iter++) \
- { \
- if (do_lock) plugin_lock(thd, plugin_int_to_ref(info->plugin_int)); \
- if (((Observer *)info->observer)->f \
- && ((Observer *)info->observer)->f args) \
- { \
- r= 1; \
- sql_print_error("Run function '" #f "' in plugin '%s' failed", \
- info->plugin_int->name.str); \
- break; \
- } \
- } \
- unlock();
-
-
-int Trans_delegate::after_commit(THD *thd, bool all)
-{
- Trans_param param;
- Trans_binlog_info *log_info;
- bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
- int ret= 0;
-
- param.flags = is_real_trans ? TRANS_IS_REAL_TRANS : 0;
-
- log_info= thd->semisync_info;
-
- param.log_file= log_info && log_info->log_file[0] ? log_info->log_file : 0;
- param.log_pos= log_info ? log_info->log_pos : 0;
-
- FOREACH_OBSERVER(ret, after_commit, false, (&param));
-
- /*
- This is the end of a real transaction or autocommit statement, we
- can mark the memory unused.
- */
- if (is_real_trans && log_info)
- {
- log_info->log_file[0]= 0;
- log_info->log_pos= 0;
- }
- return ret;
-}
-
-int Trans_delegate::after_rollback(THD *thd, bool all)
-{
- Trans_param param;
- Trans_binlog_info *log_info;
- bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
- int ret= 0;
-
- param.flags = is_real_trans ? TRANS_IS_REAL_TRANS : 0;
-
- log_info= thd->semisync_info;
-
- param.log_file= log_info && log_info->log_file[0] ? log_info->log_file : 0;
- param.log_pos= log_info ? log_info->log_pos : 0;
-
- FOREACH_OBSERVER(ret, after_rollback, false, (&param));
-
- /*
- This is the end of a real transaction or autocommit statement, we
- can mark the memory unused.
- */
- if (is_real_trans && log_info)
- {
- log_info->log_file[0]= 0;
- log_info->log_pos= 0;
- }
- return ret;
-}
-
-int Binlog_storage_delegate::after_flush(THD *thd,
- const char *log_file,
- my_off_t log_pos,
- bool synced,
- bool first_in_group,
- bool last_in_group)
-{
- Binlog_storage_param param;
- Trans_binlog_info *log_info;
- uint32 flags=0;
- int ret= 0;
-
- if (synced)
- flags |= BINLOG_STORAGE_IS_SYNCED;
- if (first_in_group)
- flags|= BINLOG_GROUP_COMMIT_LEADER;
- if (last_in_group)
- flags|= BINLOG_GROUP_COMMIT_TRAILER;
-
- if (!(log_info= thd->semisync_info))
- {
- if(!(log_info=
- (Trans_binlog_info*) my_malloc(sizeof(Trans_binlog_info), MYF(0))))
- return 1;
- thd->semisync_info= log_info;
- }
-
- strmake_buf(log_info->log_file, log_file+dirname_length(log_file));
- log_info->log_pos = log_pos;
-
- FOREACH_OBSERVER(ret, after_flush, false,
- (&param, log_info->log_file, log_info->log_pos, flags));
- return ret;
-}
-
-int Binlog_storage_delegate::after_sync(THD *thd,
- const char *log_file,
- my_off_t log_pos,
- bool first_in_group,
- bool last_in_group)
-{
- Binlog_storage_param param;
- uint32 flags=0;
-
- if (first_in_group)
- flags|= BINLOG_GROUP_COMMIT_LEADER;
- if (last_in_group)
- flags|= BINLOG_GROUP_COMMIT_TRAILER;
-
- int ret= 0;
- FOREACH_OBSERVER(ret, after_sync, false,
- (&param, log_file+dirname_length(log_file), log_pos, flags));
-
- return ret;
-}
-
-#ifdef HAVE_REPLICATION
-int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
- const char *log_file,
- my_off_t log_pos)
-{
- Binlog_transmit_param param;
- param.flags= flags;
-
- int ret= 0;
- FOREACH_OBSERVER(ret, transmit_start, true, (&param, log_file, log_pos));
- return ret;
-}
-
-int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
-{
- Binlog_transmit_param param;
- param.flags= flags;
-
- int ret= 0;
- FOREACH_OBSERVER(ret, transmit_stop, false, (&param));
- return ret;
-}
-
-int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
- String *packet)
-{
- /* NOTE2ME: Maximum extra header size for each observer, I hope 32
- bytes should be enough for each Observer to reserve their extra
- header. If later found this is not enough, we can increase this
- /HEZX
- */
-#define RESERVE_HEADER_SIZE 32
- unsigned char header[RESERVE_HEADER_SIZE];
- ulong hlen;
- Binlog_transmit_param param;
- param.flags= flags;
- param.server_id= thd->variables.server_id;
-
- int ret= 0;
- read_lock();
- Observer_info_iterator iter= observer_info_iter();
- Observer_info *info= iter++;
- for (; info; info= iter++)
- {
- hlen= 0;
- if (((Observer *)info->observer)->reserve_header
- && ((Observer *)info->observer)->reserve_header(&param,
- header,
- RESERVE_HEADER_SIZE,
- &hlen))
- {
- ret= 1;
- break;
- }
- if (hlen == 0)
- continue;
- if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen))
- {
- ret= 1;
- break;
- }
- }
- unlock();
- return ret;
-}
-
-int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
- String *packet,
- const char *log_file,
- my_off_t log_pos)
-{
- Binlog_transmit_param param;
- param.flags= flags;
-
- int ret= 0;
- FOREACH_OBSERVER(ret, before_send_event, false,
- (&param, (uchar *)packet->c_ptr(),
- packet->length(),
- log_file+dirname_length(log_file), log_pos));
- return ret;
-}
-
-int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
- String *packet)
-{
- Binlog_transmit_param param;
- param.flags= flags;
-
- int ret= 0;
- FOREACH_OBSERVER(ret, after_send_event, false,
- (&param, packet->c_ptr(), packet->length()));
- return ret;
-}
-
-int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
-
-{
- Binlog_transmit_param param;
- param.flags= flags;
-
- int ret= 0;
- FOREACH_OBSERVER(ret, after_reset_master, false, (&param));
- return ret;
-}
-
-void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
- Master_info *mi)
-{
- param->mysql= mi->mysql;
- param->user= mi->user;
- param->host= mi->host;
- param->port= mi->port;
- param->master_log_name= mi->master_log_name;
- param->master_log_pos= mi->master_log_pos;
-}
-
-int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
-{
- Binlog_relay_IO_param param;
- init_param(&param, mi);
-
- int ret= 0;
- FOREACH_OBSERVER(ret, thread_start, true, (&param));
- return ret;
-}
-
-
-int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
-{
-
- Binlog_relay_IO_param param;
- init_param(&param, mi);
-
- int ret= 0;
- FOREACH_OBSERVER(ret, thread_stop, false, (&param));
- return ret;
-}
-
-int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
- Master_info *mi,
- ushort flags)
-{
- Binlog_relay_IO_param param;
- init_param(&param, mi);
-
- int ret= 0;
- FOREACH_OBSERVER(ret, before_request_transmit, false, (&param, (uint32)flags));
- return ret;
-}
-
-int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
- const char *packet, ulong len,
- const char **event_buf,
- ulong *event_len)
-{
- Binlog_relay_IO_param param;
- init_param(&param, mi);
-
- int ret= 0;
- FOREACH_OBSERVER(ret, after_read_event, false,
- (&param, packet, len, event_buf, event_len));
- return ret;
-}
-
-int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
- const char *event_buf,
- ulong event_len,
- bool synced)
-{
- Binlog_relay_IO_param param;
- init_param(&param, mi);
-
- uint32 flags=0;
- if (synced)
- flags |= BINLOG_STORAGE_IS_SYNCED;
-
- int ret= 0;
- FOREACH_OBSERVER(ret, after_queue_event, false,
- (&param, event_buf, event_len, flags));
- return ret;
-}
-
-int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
-
-{
- Binlog_relay_IO_param param;
- init_param(&param, mi);
-
- int ret= 0;
- FOREACH_OBSERVER(ret, after_reset_slave, false, (&param));
- return ret;
-}
-#endif /* HAVE_REPLICATION */
-
-int register_trans_observer(Trans_observer *observer, void *p)
-{
- return transaction_delegate->add_observer(observer, (st_plugin_int *)p);
-}
-
-int unregister_trans_observer(Trans_observer *observer, void *p)
-{
- return transaction_delegate->remove_observer(observer, (st_plugin_int *)p);
-}
-
-int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
-{
- return binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p);
-}
-
-int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
-{
- return binlog_storage_delegate->remove_observer(observer, (st_plugin_int *)p);
-}
-
-#ifdef HAVE_REPLICATION
-int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
-{
- return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p);
-}
-
-int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
-{
- return binlog_transmit_delegate->remove_observer(observer, (st_plugin_int *)p);
-}
-
-int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
-{
- return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p);
-}
-
-int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
-{
- return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p);
-}
-#else
-int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
-{
- return 0;
-}
-
-int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
-{
- return 0;
-}
-
-int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
-{
- return 0;
-}
-
-int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
-{
- return 0;
-}
-#endif /* HAVE_REPLICATION */
diff --git a/sql/rpl_handler.h b/sql/rpl_handler.h
deleted file mode 100644
index afcfd9d55b1..00000000000
--- a/sql/rpl_handler.h
+++ /dev/null
@@ -1,216 +0,0 @@
-/* Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved.
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; version 2 of the License.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
-
-#ifndef RPL_HANDLER_H
-#define RPL_HANDLER_H
-
-#include "sql_priv.h"
-#include "rpl_mi.h"
-#include "rpl_rli.h"
-#include "sql_plugin.h"
-#include "replication.h"
-
-class Observer_info {
-public:
- void *observer;
- st_plugin_int *plugin_int;
-
- Observer_info(void *ob, st_plugin_int *p)
- :observer(ob), plugin_int(p)
- { }
-};
-
-class Delegate {
-public:
- typedef List<Observer_info> Observer_info_list;
- typedef List_iterator<Observer_info> Observer_info_iterator;
-
- int add_observer(void *observer, st_plugin_int *plugin)
- {
- int ret= FALSE;
- if (!inited)
- return TRUE;
- write_lock();
- Observer_info_iterator iter(observer_info_list);
- Observer_info *info= iter++;
- while (info && info->observer != observer)
- info= iter++;
- if (!info)
- {
- info= new Observer_info(observer, plugin);
- if (!info || observer_info_list.push_back(info, &memroot))
- ret= TRUE;
- }
- else
- ret= TRUE;
- unlock();
- return ret;
- }
-
- int remove_observer(void *observer, st_plugin_int *plugin)
- {
- int ret= FALSE;
- if (!inited)
- return TRUE;
- write_lock();
- Observer_info_iterator iter(observer_info_list);
- Observer_info *info= iter++;
- while (info && info->observer != observer)
- info= iter++;
- if (info)
- {
- iter.remove();
- delete info;
- }
- else
- ret= TRUE;
- unlock();
- return ret;
- }
-
- inline Observer_info_iterator observer_info_iter()
- {
- return Observer_info_iterator(observer_info_list);
- }
-
- inline bool is_empty()
- {
- return observer_info_list.is_empty();
- }
-
- inline int read_lock()
- {
- if (!inited)
- return TRUE;
- return rw_rdlock(&lock);
- }
-
- inline int write_lock()
- {
- if (!inited)
- return TRUE;
- return rw_wrlock(&lock);
- }
-
- inline int unlock()
- {
- if (!inited)
- return TRUE;
- return rw_unlock(&lock);
- }
-
- inline bool is_inited()
- {
- return inited;
- }
-
- Delegate()
- {
- inited= FALSE;
- if (my_rwlock_init(&lock, NULL))
- return;
- init_sql_alloc(&memroot, 1024, 0, MYF(0));
- inited= TRUE;
- }
- ~Delegate()
- {
- inited= FALSE;
- rwlock_destroy(&lock);
- free_root(&memroot, MYF(0));
- }
-
-private:
- Observer_info_list observer_info_list;
- rw_lock_t lock;
- MEM_ROOT memroot;
- bool inited;
-};
-
-class Trans_delegate
- :public Delegate {
-public:
- typedef Trans_observer Observer;
- int before_commit(THD *thd, bool all);
- int before_rollback(THD *thd, bool all);
- int after_commit(THD *thd, bool all);
- int after_rollback(THD *thd, bool all);
-};
-
-class Binlog_storage_delegate
- :public Delegate {
-public:
- typedef Binlog_storage_observer Observer;
- int after_flush(THD *thd, const char *log_file,
- my_off_t log_pos, bool synced,
- bool first_in_group, bool last_in_group);
- int after_sync(THD *thd, const char *log_file, my_off_t log_pos,
- bool first_in_group, bool last_in_group);
-};
-
-#ifdef HAVE_REPLICATION
-class Binlog_transmit_delegate
- :public Delegate {
-public:
- typedef Binlog_transmit_observer Observer;
- int transmit_start(THD *thd, ushort flags,
- const char *log_file, my_off_t log_pos);
- int transmit_stop(THD *thd, ushort flags);
- int reserve_header(THD *thd, ushort flags, String *packet);
- int before_send_event(THD *thd, ushort flags,
- String *packet, const
- char *log_file, my_off_t log_pos );
- int after_send_event(THD *thd, ushort flags,
- String *packet);
- int after_reset_master(THD *thd, ushort flags);
-};
-
-class Binlog_relay_IO_delegate
- :public Delegate {
-public:
- typedef Binlog_relay_IO_observer Observer;
- int thread_start(THD *thd, Master_info *mi);
- int thread_stop(THD *thd, Master_info *mi);
- int before_request_transmit(THD *thd, Master_info *mi, ushort flags);
- int after_read_event(THD *thd, Master_info *mi,
- const char *packet, ulong len,
- const char **event_buf, ulong *event_len);
- int after_queue_event(THD *thd, Master_info *mi,
- const char *event_buf, ulong event_len,
- bool synced);
- int after_reset_slave(THD *thd, Master_info *mi);
-private:
- void init_param(Binlog_relay_IO_param *param, Master_info *mi);
-};
-#endif /* HAVE_REPLICATION */
-
-int delegates_init();
-void delegates_destroy();
-
-extern Trans_delegate *transaction_delegate;
-extern Binlog_storage_delegate *binlog_storage_delegate;
-#ifdef HAVE_REPLICATION
-extern Binlog_transmit_delegate *binlog_transmit_delegate;
-extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;
-#endif /* HAVE_REPLICATION */
-
-/*
- if there is no observers in the delegate, we can return 0
- immediately.
-*/
-#define RUN_HOOK(group, hook, args) \
- (group ##_delegate->is_empty() ? \
- 0 : group ##_delegate->hook args)
-
-#endif /* RPL_HANDLER_H */
diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
index 610bc77b683..14d74dc4bb7 100644
--- a/sql/rpl_mi.h
+++ b/sql/rpl_mi.h
@@ -311,6 +311,11 @@ class Master_info : public Slave_reporting_capability
/* The parallel replication mode. */
enum_slave_parallel_mode parallel_mode;
+ /*
+ semi_ack is used to identify if the current binlog event needs an
+ ACK from slave, or if delay_master is enabled.
+ */
+ int semi_ack;
};
int init_master_info(Master_info* mi, const char* master_info_fname,
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 4a6e813d73b..9ebd19a90e2 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -255,10 +255,8 @@ signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi, int err)
rgi->rli->abort_slave= true;
rgi->rli->stop_for_until= false;
mysql_mutex_lock(rgi->rli->relay_log.get_log_lock());
+ rgi->rli->relay_log.signal_relay_log_update();
mysql_mutex_unlock(rgi->rli->relay_log.get_log_lock());
- rgi->rli->relay_log.lock_binlog_end_pos();
- rgi->rli->relay_log.signal_update();
- rgi->rli->relay_log.unlock_binlog_end_pos();
}
@@ -823,7 +821,7 @@ do_retry:
for (;;)
{
old_offset= cur_offset;
- ev= Log_event::read_log_event(&rlog, 0, description_event,
+ ev= Log_event::read_log_event(&rlog, description_event,
opt_slave_sql_verify_checksum);
cur_offset= my_b_tell(&rlog);
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index efb256fbe11..321eef97700 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -70,10 +70,12 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
relay_log_state.init();
#ifdef HAVE_PSI_INTERFACE
relay_log.set_psi_keys(key_RELAYLOG_LOCK_index,
- key_RELAYLOG_update_cond,
+ key_RELAYLOG_COND_relay_log_updated,
+ key_RELAYLOG_COND_bin_log_updated,
key_file_relaylog,
key_file_relaylog_index,
- key_RELAYLOG_COND_queue_busy);
+ key_RELAYLOG_COND_queue_busy,
+ key_LOCK_relaylog_end_pos);
#endif
group_relay_log_name[0]= event_relay_log_name[0]=
@@ -538,7 +540,7 @@ read_relay_log_description_event(IO_CACHE *cur_log, ulonglong start_pos,
if (my_b_tell(cur_log) >= start_pos)
break;
- if (!(ev= Log_event::read_log_event(cur_log, 0, fdev,
+ if (!(ev= Log_event::read_log_event(cur_log, fdev,
opt_slave_sql_verify_checksum)))
{
DBUG_PRINT("info",("could not read event, cur_log->error=%d",
diff --git a/sql/semisync.cc b/sql/semisync.cc
new file mode 100644
index 00000000000..a8a11f091db
--- /dev/null
+++ b/sql/semisync.cc
@@ -0,0 +1,32 @@
+/* Copyright (C) 2007 Google Inc.
+ Copyright (C) 2008 MySQL AB
+ Use is subject to license terms
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+
+#include <my_global.h>
+#include "semisync.h"
+
+const unsigned char Repl_semi_sync_base::k_packet_magic_num= 0xef;
+const unsigned char Repl_semi_sync_base::k_packet_flag_sync= 0x01;
+
+
+const unsigned long Trace::k_trace_general= 0x0001;
+const unsigned long Trace::k_trace_detail= 0x0010;
+const unsigned long Trace::k_trace_net_wait= 0x0020;
+const unsigned long Trace::k_trace_function= 0x0040;
+
+const unsigned char Repl_semi_sync_base::k_sync_header[2]=
+ {Repl_semi_sync_base::k_packet_magic_num, 0};
diff --git a/sql/semisync.h b/sql/semisync.h
new file mode 100644
index 00000000000..9deb6c5fd01
--- /dev/null
+++ b/sql/semisync.h
@@ -0,0 +1,73 @@
+/* Copyright (C) 2007 Google Inc.
+ Copyright (C) 2008 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+
+#ifndef SEMISYNC_H
+#define SEMISYNC_H
+
+#include "mysqld.h"
+#include "log_event.h"
+#include "replication.h"
+
+/**
+ This class is used to trace function calls and other process
+ information
+*/
+class Trace {
+public:
+ static const unsigned long k_trace_function;
+ static const unsigned long k_trace_general;
+ static const unsigned long k_trace_detail;
+ static const unsigned long k_trace_net_wait;
+
+ unsigned long m_trace_level; /* the level for tracing */
+
+ Trace()
+ :m_trace_level(0L)
+ {}
+ Trace(unsigned long trace_level)
+ :m_trace_level(trace_level)
+ {}
+};
+
+/**
+ Base class for semi-sync master and slave classes
+*/
+class Repl_semi_sync_base
+ :public Trace {
+public:
+ static const unsigned char k_sync_header[2]; /* three byte packet header */
+
+ /* Constants in network packet header. */
+ static const unsigned char k_packet_magic_num;
+ static const unsigned char k_packet_flag_sync;
+};
+
+/* The layout of a semisync slave reply packet:
+ 1 byte for the magic num
+ 8 bytes for the binlog positon
+ n bytes for the binlog filename, terminated with a '\0'
+*/
+#define REPLY_MAGIC_NUM_LEN 1
+#define REPLY_BINLOG_POS_LEN 8
+#define REPLY_BINLOG_NAME_LEN (FN_REFLEN + 1)
+#define REPLY_MAGIC_NUM_OFFSET 0
+#define REPLY_BINLOG_POS_OFFSET (REPLY_MAGIC_NUM_OFFSET + REPLY_MAGIC_NUM_LEN)
+#define REPLY_BINLOG_NAME_OFFSET (REPLY_BINLOG_POS_OFFSET + REPLY_BINLOG_POS_LEN)
+#define REPLY_MESSAGE_MAX_LENGTH \
+ (REPLY_MAGIC_NUM_LEN + REPLY_BINLOG_POS_LEN + REPLY_BINLOG_NAME_LEN)
+
+#endif /* SEMISYNC_H */
diff --git a/sql/semisync_master.cc b/sql/semisync_master.cc
new file mode 100644
index 00000000000..99d8b75ece8
--- /dev/null
+++ b/sql/semisync_master.cc
@@ -0,0 +1,1352 @@
+/* Copyright (C) 2007 Google Inc.
+ Copyright (c) 2008, 2013, Oracle and/or its affiliates.
+ Copyright (c) 2011, 2016, MariaDB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+
+#include <my_global.h>
+#include "semisync_master.h"
+
+#define TIME_THOUSAND 1000
+#define TIME_MILLION 1000000
+#define TIME_BILLION 1000000000
+
+/* This indicates whether semi-synchronous replication is enabled. */
+my_bool rpl_semi_sync_master_enabled= 0;
+unsigned long long rpl_semi_sync_master_request_ack = 0;
+unsigned long long rpl_semi_sync_master_get_ack = 0;
+my_bool rpl_semi_sync_master_wait_no_slave = 1;
+my_bool rpl_semi_sync_master_status = 0;
+ulong rpl_semi_sync_master_wait_point =
+ SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT;
+ulong rpl_semi_sync_master_timeout;
+ulong rpl_semi_sync_master_trace_level;
+ulong rpl_semi_sync_master_yes_transactions = 0;
+ulong rpl_semi_sync_master_no_transactions = 0;
+ulong rpl_semi_sync_master_off_times = 0;
+ulong rpl_semi_sync_master_timefunc_fails = 0;
+ulong rpl_semi_sync_master_wait_timeouts = 0;
+ulong rpl_semi_sync_master_wait_sessions = 0;
+ulong rpl_semi_sync_master_wait_pos_backtraverse = 0;
+ulong rpl_semi_sync_master_avg_trx_wait_time = 0;
+ulonglong rpl_semi_sync_master_trx_wait_num = 0;
+ulong rpl_semi_sync_master_avg_net_wait_time = 0;
+ulonglong rpl_semi_sync_master_net_wait_num = 0;
+ulong rpl_semi_sync_master_clients = 0;
+ulonglong rpl_semi_sync_master_net_wait_time = 0;
+ulonglong rpl_semi_sync_master_trx_wait_time = 0;
+
+Repl_semi_sync_master repl_semisync_master;
+Ack_receiver ack_receiver;
+
+/*
+ structure to save transaction log filename and position
+*/
+typedef struct Trans_binlog_info {
+ my_off_t log_pos;
+ char log_file[FN_REFLEN];
+} Trans_binlog_info;
+
+static int get_wait_time(const struct timespec& start_ts);
+
+static ulonglong timespec_to_usec(const struct timespec *ts)
+{
+ return (ulonglong) ts->tv_sec * TIME_MILLION + ts->tv_nsec / TIME_THOUSAND;
+}
+
+/*******************************************************************************
+ *
+ * <Active_tranx> class : manage all active transaction nodes
+ *
+ ******************************************************************************/
+
+Active_tranx::Active_tranx(mysql_mutex_t *lock,
+ ulong trace_level)
+ : Trace(trace_level), m_allocator(max_connections),
+ m_num_entries(max_connections << 1), /* Transaction hash table size
+ * is set to double the size
+ * of max_connections */
+ m_lock(lock)
+{
+ /* No transactions are in the list initially. */
+ m_trx_front = NULL;
+ m_trx_rear = NULL;
+
+ /* Create the hash table to find a transaction's ending event. */
+ m_trx_htb = new Tranx_node *[m_num_entries];
+ for (int idx = 0; idx < m_num_entries; ++idx)
+ m_trx_htb[idx] = NULL;
+
+ sql_print_information("Semi-sync replication initialized for transactions.");
+}
+
+Active_tranx::~Active_tranx()
+{
+ delete [] m_trx_htb;
+ m_trx_htb = NULL;
+ m_num_entries = 0;
+}
+
+unsigned int Active_tranx::calc_hash(const unsigned char *key,
+ unsigned int length)
+{
+ unsigned int nr = 1, nr2 = 4;
+
+ /* The hash implementation comes from calc_hashnr() in mysys/hash.c. */
+ while (length--)
+ {
+ nr ^= (((nr & 63)+nr2)*((unsigned int) (unsigned char) *key++))+ (nr << 8);
+ nr2 += 3;
+ }
+ return((unsigned int) nr);
+}
+
+unsigned int Active_tranx::get_hash_value(const char *log_file_name,
+ my_off_t log_file_pos)
+{
+ unsigned int hash1 = calc_hash((const unsigned char *)log_file_name,
+ strlen(log_file_name));
+ unsigned int hash2 = calc_hash((const unsigned char *)(&log_file_pos),
+ sizeof(log_file_pos));
+
+ return (hash1 + hash2) % m_num_entries;
+}
+
+int Active_tranx::compare(const char *log_file_name1, my_off_t log_file_pos1,
+ const char *log_file_name2, my_off_t log_file_pos2)
+{
+ int cmp = strcmp(log_file_name1, log_file_name2);
+
+ if (cmp != 0)
+ return cmp;
+
+ if (log_file_pos1 > log_file_pos2)
+ return 1;
+ else if (log_file_pos1 < log_file_pos2)
+ return -1;
+ return 0;
+}
+
+int Active_tranx::insert_tranx_node(const char *log_file_name,
+ my_off_t log_file_pos)
+{
+ Tranx_node *ins_node;
+ int result = 0;
+ unsigned int hash_val;
+
+ DBUG_ENTER("Active_tranx:insert_tranx_node");
+
+ ins_node = m_allocator.allocate_node();
+ if (!ins_node)
+ {
+ sql_print_error("%s: transaction node allocation failed for: (%s, %lu)",
+ "Active_tranx:insert_tranx_node",
+ log_file_name, (ulong)log_file_pos);
+ result = -1;
+ goto l_end;
+ }
+
+ /* insert the binlog position in the active transaction list. */
+ strncpy(ins_node->log_name, log_file_name, FN_REFLEN-1);
+ ins_node->log_name[FN_REFLEN-1] = 0; /* make sure it ends properly */
+ ins_node->log_pos = log_file_pos;
+
+ if (!m_trx_front)
+ {
+ /* The list is empty. */
+ m_trx_front = m_trx_rear = ins_node;
+ }
+ else
+ {
+ int cmp = compare(ins_node, m_trx_rear);
+ if (cmp > 0)
+ {
+ /* Compare with the tail first. If the transaction happens later in
+ * binlog, then make it the new tail.
+ */
+ m_trx_rear->next = ins_node;
+ m_trx_rear = ins_node;
+ }
+ else
+ {
+ /* Otherwise, it is an error because the transaction should hold the
+ * mysql_bin_log.LOCK_log when appending events.
+ */
+ sql_print_error("%s: binlog write out-of-order, tail (%s, %lu), "
+ "new node (%s, %lu)", "Active_tranx:insert_tranx_node",
+ m_trx_rear->log_name, (ulong)m_trx_rear->log_pos,
+ ins_node->log_name, (ulong)ins_node->log_pos);
+ result = -1;
+ goto l_end;
+ }
+ }
+
+ hash_val = get_hash_value(ins_node->log_name, ins_node->log_pos);
+ ins_node->hash_next = m_trx_htb[hash_val];
+ m_trx_htb[hash_val] = ins_node;
+
+ DBUG_PRINT("semisync", ("%s: insert (%s, %lu) in entry(%u)",
+ "Active_tranx:insert_tranx_node",
+ ins_node->log_name, (ulong)ins_node->log_pos,
+ hash_val));
+ l_end:
+
+ DBUG_RETURN(result);
+}
+
+bool Active_tranx::is_tranx_end_pos(const char *log_file_name,
+ my_off_t log_file_pos)
+{
+ DBUG_ENTER("Active_tranx::is_tranx_end_pos");
+
+ unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
+ Tranx_node *entry = m_trx_htb[hash_val];
+
+ while (entry != NULL)
+ {
+ if (compare(entry, log_file_name, log_file_pos) == 0)
+ break;
+
+ entry = entry->hash_next;
+ }
+
+ DBUG_PRINT("semisync", ("%s: probe (%s, %lu) in entry(%u)",
+ "Active_tranx::is_tranx_end_pos",
+ log_file_name, (ulong)log_file_pos, hash_val));
+
+ DBUG_RETURN(entry != NULL);
+}
+
+int Active_tranx::clear_active_tranx_nodes(const char *log_file_name,
+ my_off_t log_file_pos)
+{
+ Tranx_node *new_front;
+
+ DBUG_ENTER("Active_tranx::::clear_active_tranx_nodes");
+
+ if (log_file_name != NULL)
+ {
+ new_front = m_trx_front;
+
+ while (new_front)
+ {
+ if (compare(new_front, log_file_name, log_file_pos) > 0)
+ break;
+ new_front = new_front->next;
+ }
+ }
+ else
+ {
+ /* If log_file_name is NULL, clear everything. */
+ new_front = NULL;
+ }
+
+ if (new_front == NULL)
+ {
+ /* No active transaction nodes after the call. */
+
+ /* Clear the hash table. */
+ memset(m_trx_htb, 0, m_num_entries * sizeof(Tranx_node *));
+ m_allocator.free_all_nodes();
+
+ /* Clear the active transaction list. */
+ if (m_trx_front != NULL)
+ {
+ m_trx_front = NULL;
+ m_trx_rear = NULL;
+ }
+
+ DBUG_PRINT("semisync", ("%s: cleared all nodes",
+ "Active_tranx::::clear_active_tranx_nodes"));
+ }
+ else if (new_front != m_trx_front)
+ {
+ Tranx_node *curr_node, *next_node;
+
+ /* Delete all transaction nodes before the confirmation point. */
+ int n_frees = 0;
+ curr_node = m_trx_front;
+ while (curr_node != new_front)
+ {
+ next_node = curr_node->next;
+ n_frees++;
+
+ /* Remove the node from the hash table. */
+ unsigned int hash_val = get_hash_value(curr_node->log_name, curr_node->log_pos);
+ Tranx_node **hash_ptr = &(m_trx_htb[hash_val]);
+ while ((*hash_ptr) != NULL)
+ {
+ if ((*hash_ptr) == curr_node)
+ {
+ (*hash_ptr) = curr_node->hash_next;
+ break;
+ }
+ hash_ptr = &((*hash_ptr)->hash_next);
+ }
+
+ curr_node = next_node;
+ }
+
+ m_trx_front = new_front;
+ m_allocator.free_nodes_before(m_trx_front);
+
+ DBUG_PRINT("semisync", ("%s: cleared %d nodes back until pos (%s, %lu)",
+ "Active_tranx::::clear_active_tranx_nodes",
+ n_frees,
+ m_trx_front->log_name, (ulong)m_trx_front->log_pos));
+ }
+
+ DBUG_RETURN(0);
+}
+
+
+/*******************************************************************************
+ *
+ * <Repl_semi_sync_master> class: the basic code layer for syncsync master.
+ * <Repl_semi_sync_slave> class: the basic code layer for syncsync slave.
+ *
+ * The most important functions during semi-syn replication listed:
+ *
+ * Master:
+ * . report_reply_binlog(): called by the binlog dump thread when it receives
+ * the slave's status information.
+ * . update_sync_header(): based on transaction waiting information, decide
+ * whether to request the slave to reply.
+ * . write_tranx_in_binlog(): called by the transaction thread when it finishes
+ * writing all transaction events in binlog.
+ * . commit_trx(): transaction thread wait for the slave reply.
+ *
+ * Slave:
+ * . slave_read_sync_header(): read the semi-sync header from the master, get
+ * the sync status and get the payload for events.
+ * . slave_reply(): reply to the master about the replication progress.
+ *
+ ******************************************************************************/
+
+Repl_semi_sync_master::Repl_semi_sync_master()
+ : m_active_tranxs(NULL),
+ m_init_done(false),
+ m_reply_file_name_inited(false),
+ m_reply_file_pos(0L),
+ m_wait_file_name_inited(false),
+ m_wait_file_pos(0),
+ m_master_enabled(false),
+ m_wait_timeout(0L),
+ m_state(0),
+ m_wait_point(0)
+{
+ strcpy(m_reply_file_name, "");
+ strcpy(m_wait_file_name, "");
+}
+
+int Repl_semi_sync_master::init_object()
+{
+ int result;
+
+ m_init_done = true;
+
+ /* References to the parameter works after set_options(). */
+ set_wait_timeout(rpl_semi_sync_master_timeout);
+ set_trace_level(rpl_semi_sync_master_trace_level);
+ set_wait_point(rpl_semi_sync_master_wait_point);
+
+ /* Mutex initialization can only be done after MY_INIT(). */
+ mysql_mutex_init(key_LOCK_binlog,
+ &LOCK_binlog, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_binlog_send,
+ &COND_binlog_send, NULL);
+
+ if (rpl_semi_sync_master_enabled)
+ {
+ result = enable_master();
+ if (!result)
+ result= ack_receiver.start(); /* Start the ACK thread. */
+ }
+ else
+ {
+ result = disable_master();
+ }
+
+ /*
+ If rpl_semi_sync_master_wait_no_slave is disabled, let's temporarily
+ switch off semisync to avoid hang if there's none active slave.
+ */
+ if (!rpl_semi_sync_master_wait_no_slave)
+ switch_off();
+
+ return result;
+}
+
+int Repl_semi_sync_master::enable_master()
+{
+ int result = 0;
+
+ /* Must have the lock when we do enable of disable. */
+ lock();
+
+ if (!get_master_enabled())
+ {
+ m_active_tranxs = new Active_tranx(&LOCK_binlog, m_trace_level);
+ if (m_active_tranxs != NULL)
+ {
+ m_commit_file_name_inited = false;
+ m_reply_file_name_inited = false;
+ m_wait_file_name_inited = false;
+
+ set_master_enabled(true);
+ m_state = true;
+ sql_print_information("Semi-sync replication enabled on the master.");
+ }
+ else
+ {
+ sql_print_error("Cannot allocate memory to enable semi-sync on the master.");
+ result = -1;
+ }
+ }
+
+ unlock();
+
+ return result;
+}
+
+int Repl_semi_sync_master::disable_master()
+{
+ /* Must have the lock when we do enable of disable. */
+ lock();
+
+ if (get_master_enabled())
+ {
+ /* Switch off the semi-sync first so that waiting transaction will be
+ * waken up.
+ */
+ switch_off();
+
+ assert(m_active_tranxs != NULL);
+ delete m_active_tranxs;
+ m_active_tranxs = NULL;
+
+ m_reply_file_name_inited = false;
+ m_wait_file_name_inited = false;
+ m_commit_file_name_inited = false;
+
+ set_master_enabled(false);
+ sql_print_information("Semi-sync replication disabled on the master.");
+ }
+
+ unlock();
+
+ return 0;
+}
+
+void Repl_semi_sync_master::cleanup()
+{
+ if (m_init_done)
+ {
+ mysql_mutex_destroy(&LOCK_binlog);
+ mysql_cond_destroy(&COND_binlog_send);
+ m_init_done= 0;
+ }
+
+ delete m_active_tranxs;
+}
+
+void Repl_semi_sync_master::lock()
+{
+ mysql_mutex_lock(&LOCK_binlog);
+}
+
+void Repl_semi_sync_master::unlock()
+{
+ mysql_mutex_unlock(&LOCK_binlog);
+}
+
+void Repl_semi_sync_master::cond_broadcast()
+{
+ mysql_cond_broadcast(&COND_binlog_send);
+}
+
+int Repl_semi_sync_master::cond_timewait(struct timespec *wait_time)
+{
+ int wait_res;
+
+ DBUG_ENTER("Repl_semi_sync_master::cond_timewait()");
+
+ wait_res= mysql_cond_timedwait(&COND_binlog_send,
+ &LOCK_binlog, wait_time);
+
+ DBUG_RETURN(wait_res);
+}
+
+void Repl_semi_sync_master::add_slave()
+{
+ lock();
+ rpl_semi_sync_master_clients++;
+ unlock();
+}
+
+void Repl_semi_sync_master::remove_slave()
+{
+ lock();
+ rpl_semi_sync_master_clients--;
+
+ /* Only switch off if semi-sync is enabled and is on */
+ if (get_master_enabled() && is_on())
+ {
+ /* If user has chosen not to wait if no semi-sync slave available
+ and the last semi-sync slave exits, turn off semi-sync on master
+ immediately.
+ */
+ if (!rpl_semi_sync_master_wait_no_slave &&
+ rpl_semi_sync_master_clients == 0)
+ switch_off();
+ }
+ unlock();
+}
+
+int Repl_semi_sync_master::report_reply_packet(uint32 server_id,
+ const uchar *packet,
+ ulong packet_len)
+{
+ int result= -1;
+ char log_file_name[FN_REFLEN+1];
+ my_off_t log_file_pos;
+ ulong log_file_len = 0;
+
+ DBUG_ENTER("Repl_semi_sync_master::report_reply_packet");
+
+ if (unlikely(packet[REPLY_MAGIC_NUM_OFFSET] !=
+ Repl_semi_sync_master::k_packet_magic_num))
+ {
+ sql_print_error("Read semi-sync reply magic number error");
+ goto l_end;
+ }
+
+ if (unlikely(packet_len < REPLY_BINLOG_NAME_OFFSET))
+ {
+ sql_print_error("Read semi-sync reply length error: packet is too small");
+ goto l_end;
+ }
+
+ log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
+ log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
+ if (unlikely(log_file_len >= FN_REFLEN))
+ {
+ sql_print_error("Read semi-sync reply binlog file length too large");
+ goto l_end;
+ }
+ strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
+ log_file_name[log_file_len] = 0;
+
+ DBUG_ASSERT(dirname_length(log_file_name) == 0);
+
+ DBUG_PRINT("semisync", ("%s: Got reply(%s, %lu) from server %u",
+ "Repl_semi_sync_master::report_reply_packet",
+ log_file_name, (ulong)log_file_pos, server_id));
+
+ rpl_semi_sync_master_get_ack++;
+ report_reply_binlog(server_id, log_file_name, log_file_pos);
+
+l_end:
+
+ DBUG_RETURN(result);
+}
+
+int Repl_semi_sync_master::report_reply_binlog(uint32 server_id,
+ const char *log_file_name,
+ my_off_t log_file_pos)
+{
+ int cmp;
+ bool can_release_threads = false;
+ bool need_copy_send_pos = true;
+
+ DBUG_ENTER("Repl_semi_sync_master::report_reply_binlog");
+
+ if (!(get_master_enabled()))
+ DBUG_RETURN(0);
+
+ lock();
+
+ /* This is the real check inside the mutex. */
+ if (!get_master_enabled())
+ goto l_end;
+
+ if (!is_on())
+ /* We check to see whether we can switch semi-sync ON. */
+ try_switch_on(server_id, log_file_name, log_file_pos);
+
+ /* The position should increase monotonically, if there is only one
+ * thread sending the binlog to the slave.
+ * In reality, to improve the transaction availability, we allow multiple
+ * sync replication slaves. So, if any one of them get the transaction,
+ * the transaction session in the primary can move forward.
+ */
+ if (m_reply_file_name_inited)
+ {
+ cmp = Active_tranx::compare(log_file_name, log_file_pos,
+ m_reply_file_name, m_reply_file_pos);
+
+ /* If the requested position is behind the sending binlog position,
+ * would not adjust sending binlog position.
+ * We based on the assumption that there are multiple semi-sync slave,
+ * and at least one of them shou/ld be up to date.
+ * If all semi-sync slaves are behind, at least initially, the primary
+ * can find the situation after the waiting timeout. After that, some
+ * slaves should catch up quickly.
+ */
+ if (cmp < 0)
+ {
+ /* If the position is behind, do not copy it. */
+ need_copy_send_pos = false;
+ }
+ }
+
+ if (need_copy_send_pos)
+ {
+ strmake_buf(m_reply_file_name, log_file_name);
+ m_reply_file_pos = log_file_pos;
+ m_reply_file_name_inited = true;
+
+ /* Remove all active transaction nodes before this point. */
+ assert(m_active_tranxs != NULL);
+ m_active_tranxs->clear_active_tranx_nodes(log_file_name, log_file_pos);
+
+ DBUG_PRINT("semisync", ("%s: Got reply at (%s, %lu)",
+ "Repl_semi_sync_master::report_reply_binlog",
+ log_file_name, (ulong)log_file_pos));
+ }
+
+ if (rpl_semi_sync_master_wait_sessions > 0)
+ {
+ /* Let us check if some of the waiting threads doing a trx
+ * commit can now proceed.
+ */
+ cmp = Active_tranx::compare(m_reply_file_name, m_reply_file_pos,
+ m_wait_file_name, m_wait_file_pos);
+ if (cmp >= 0)
+ {
+ /* Yes, at least one waiting thread can now proceed:
+ * let us release all waiting threads with a broadcast
+ */
+ can_release_threads = true;
+ m_wait_file_name_inited = false;
+ }
+ }
+
+ l_end:
+ unlock();
+
+ if (can_release_threads)
+ {
+ DBUG_PRINT("semisync", ("%s: signal all waiting threads.",
+ "Repl_semi_sync_master::report_reply_binlog"));
+
+ cond_broadcast();
+ }
+
+ DBUG_RETURN(0);
+}
+
+int Repl_semi_sync_master::wait_after_sync(const char *log_file, my_off_t log_pos)
+{
+ if (!get_master_enabled())
+ return 0;
+
+ int ret= 0;
+ if(log_pos &&
+ wait_point() == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC)
+ ret= commit_trx(log_file + dirname_length(log_file), log_pos);
+
+ return ret;
+}
+
+int Repl_semi_sync_master::wait_after_commit(THD* thd, bool all)
+{
+ if (!get_master_enabled())
+ return 0;
+
+ int ret= 0;
+ const char *log_file;
+ my_off_t log_pos;
+
+ bool is_real_trans=
+ (all || thd->transaction.all.ha_list == 0);
+ /*
+ The coordinates are propagated to this point having been computed
+ in report_binlog_update
+ */
+ Trans_binlog_info *log_info= thd->semisync_info;
+ log_file= log_info && log_info->log_file[0] ? log_info->log_file : 0;
+ log_pos= log_info ? log_info->log_pos : 0;
+
+ DBUG_ASSERT(!log_file || dirname_length(log_file) == 0);
+
+ if (is_real_trans &&
+ log_pos &&
+ wait_point() == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT)
+ ret= commit_trx(log_file, log_pos);
+
+ if (is_real_trans && log_info)
+ {
+ log_info->log_file[0]= 0;
+ log_info->log_pos= 0;
+ }
+
+ return ret;
+}
+
+int Repl_semi_sync_master::wait_after_rollback(THD *thd, bool all)
+{
+ return wait_after_commit(thd, all);
+}
+
+/**
+ The method runs after flush to binary log is done.
+*/
+int Repl_semi_sync_master::report_binlog_update(THD* thd, const char *log_file,
+ my_off_t log_pos)
+{
+ if (get_master_enabled())
+ {
+ Trans_binlog_info *log_info;
+
+ if (!(log_info= thd->semisync_info))
+ {
+ if(!(log_info=
+ (Trans_binlog_info*) my_malloc(sizeof(Trans_binlog_info), MYF(0))))
+ return 1;
+ thd->semisync_info= log_info;
+ }
+ strcpy(log_info->log_file, log_file + dirname_length(log_file));
+ log_info->log_pos = log_pos;
+
+ return write_tranx_in_binlog(log_info->log_file, log_pos);
+ }
+
+ return 0;
+}
+
+int Repl_semi_sync_master::dump_start(THD* thd,
+ const char *log_file,
+ my_off_t log_pos)
+{
+ if (!thd->semi_sync_slave)
+ return 0;
+
+ if (ack_receiver.add_slave(thd))
+ {
+ sql_print_error("Failed to register slave to semi-sync ACK receiver "
+ "thread. Turning off semisync");
+ thd->semi_sync_slave= 0;
+ return 1;
+ }
+
+ add_slave();
+ report_reply_binlog(thd->variables.server_id,
+ log_file + dirname_length(log_file), log_pos);
+ sql_print_information("Start semi-sync binlog_dump to slave (server_id: %d), "
+ "pos(%s, %lu",
+ thd->variables.server_id, log_file,
+ (unsigned long)log_pos);
+
+ return 0;
+}
+
+void Repl_semi_sync_master::dump_end(THD* thd)
+{
+ if (!thd->semi_sync_slave)
+ return;
+
+ sql_print_information("Stop semi-sync binlog_dump to slave (server_id: %d)", thd->variables.server_id);
+
+ remove_slave();
+ ack_receiver.remove_slave(thd);
+
+ return;
+}
+
+int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
+ my_off_t trx_wait_binlog_pos)
+{
+
+ DBUG_ENTER("Repl_semi_sync_master::commit_trx");
+
+ if (get_master_enabled() && trx_wait_binlog_name)
+ {
+ struct timespec start_ts;
+ struct timespec abstime;
+ int wait_result;
+ PSI_stage_info old_stage;
+
+ set_timespec(start_ts, 0);
+
+ DEBUG_SYNC(current_thd, "rpl_semisync_master_commit_trx_before_lock");
+ /* Acquire the mutex. */
+ lock();
+
+ /* This must be called after acquired the lock */
+ THD_ENTER_COND(NULL, &COND_binlog_send, &LOCK_binlog,
+ & stage_waiting_for_semi_sync_ack_from_slave,
+ & old_stage);
+
+ /* This is the real check inside the mutex. */
+ if (!get_master_enabled() || !is_on())
+ goto l_end;
+
+ DBUG_PRINT("semisync", ("%s: wait pos (%s, %lu), repl(%d)\n",
+ "Repl_semi_sync_master::commit_trx",
+ trx_wait_binlog_name, (ulong)trx_wait_binlog_pos,
+ (int)is_on()));
+
+ while (is_on() && !thd_killed(current_thd))
+ {
+ if (m_reply_file_name_inited)
+ {
+ int cmp = Active_tranx::compare(m_reply_file_name, m_reply_file_pos,
+ trx_wait_binlog_name,
+ trx_wait_binlog_pos);
+ if (cmp >= 0)
+ {
+ /* We have already sent the relevant binlog to the slave: no need to
+ * wait here.
+ */
+ DBUG_PRINT("semisync", ("%s: Binlog reply is ahead (%s, %lu),",
+ "Repl_semi_sync_master::commit_trx",
+ m_reply_file_name,
+ (ulong)m_reply_file_pos));
+ break;
+ }
+ }
+
+ /* Let us update the info about the minimum binlog position of waiting
+ * threads.
+ */
+ if (m_wait_file_name_inited)
+ {
+ int cmp = Active_tranx::compare(trx_wait_binlog_name,
+ trx_wait_binlog_pos,
+ m_wait_file_name, m_wait_file_pos);
+ if (cmp <= 0)
+ {
+ /* This thd has a lower position, let's update the minimum info. */
+ strmake_buf(m_wait_file_name, trx_wait_binlog_name);
+ m_wait_file_pos = trx_wait_binlog_pos;
+
+ rpl_semi_sync_master_wait_pos_backtraverse++;
+ DBUG_PRINT("semisync", ("%s: move back wait position (%s, %lu),",
+ "Repl_semi_sync_master::commit_trx",
+ m_wait_file_name, (ulong)m_wait_file_pos));
+ }
+ }
+ else
+ {
+ strmake_buf(m_wait_file_name, trx_wait_binlog_name);
+ m_wait_file_pos = trx_wait_binlog_pos;
+ m_wait_file_name_inited = true;
+
+ DBUG_PRINT("semisync", ("%s: init wait position (%s, %lu),",
+ "Repl_semi_sync_master::commit_trx",
+ m_wait_file_name, (ulong)m_wait_file_pos));
+ }
+
+ /* Calcuate the waiting period. */
+ long diff_secs = (long) (m_wait_timeout / TIME_THOUSAND);
+ long diff_nsecs = (long) ((m_wait_timeout % TIME_THOUSAND) * TIME_MILLION);
+ long nsecs = start_ts.tv_nsec + diff_nsecs;
+ abstime.tv_sec = start_ts.tv_sec + diff_secs + nsecs/TIME_BILLION;
+ abstime.tv_nsec = nsecs % TIME_BILLION;
+
+ /* In semi-synchronous replication, we wait until the binlog-dump
+ * thread has received the reply on the relevant binlog segment from the
+ * replication slave.
+ *
+ * Let us suspend this thread to wait on the condition;
+ * when replication has progressed far enough, we will release
+ * these waiting threads.
+ */
+ rpl_semi_sync_master_wait_sessions++;
+
+ DBUG_PRINT("semisync", ("%s: wait %lu ms for binlog sent (%s, %lu)",
+ "Repl_semi_sync_master::commit_trx",
+ m_wait_timeout,
+ m_wait_file_name, (ulong)m_wait_file_pos));
+
+ wait_result = cond_timewait(&abstime);
+ rpl_semi_sync_master_wait_sessions--;
+
+ if (wait_result != 0)
+ {
+ /* This is a real wait timeout. */
+ sql_print_warning("Timeout waiting for reply of binlog (file: %s, pos: %lu), "
+ "semi-sync up to file %s, position %lu.",
+ trx_wait_binlog_name, (ulong)trx_wait_binlog_pos,
+ m_reply_file_name, (ulong)m_reply_file_pos);
+ rpl_semi_sync_master_wait_timeouts++;
+
+ /* switch semi-sync off */
+ switch_off();
+ }
+ else
+ {
+ int wait_time;
+
+ wait_time = get_wait_time(start_ts);
+ if (wait_time < 0)
+ {
+ DBUG_PRINT("semisync", ("Replication semi-sync getWaitTime fail at "
+ "wait position (%s, %lu)",
+ trx_wait_binlog_name,
+ (ulong)trx_wait_binlog_pos));
+ rpl_semi_sync_master_timefunc_fails++;
+ }
+ else
+ {
+ rpl_semi_sync_master_trx_wait_num++;
+ rpl_semi_sync_master_trx_wait_time += wait_time;
+ }
+ }
+ }
+
+ /*
+ At this point, the binlog file and position of this transaction
+ must have been removed from Active_tranx.
+ m_active_tranxs may be NULL if someone disabled semi sync during
+ cond_timewait()
+ */
+ assert(thd_killed(current_thd) || !m_active_tranxs ||
+ !m_active_tranxs->is_tranx_end_pos(trx_wait_binlog_name,
+ trx_wait_binlog_pos));
+
+ l_end:
+ /* Update the status counter. */
+ if (is_on())
+ rpl_semi_sync_master_yes_transactions++;
+ else
+ rpl_semi_sync_master_no_transactions++;
+
+ /* The lock held will be released by thd_exit_cond, so no need to
+ call unlock() here */
+ THD_EXIT_COND(NULL, & old_stage);
+ }
+
+ DBUG_RETURN(0);
+}
+
+/* Indicate that semi-sync replication is OFF now.
+ *
+ * What should we do when it is disabled? The problem is that we want
+ * the semi-sync replication enabled again when the slave catches up
+ * later. But, it is not that easy to detect that the slave has caught
+ * up. This is caused by the fact that MySQL's replication protocol is
+ * asynchronous, meaning that if the master does not use the semi-sync
+ * protocol, the slave would not send anything to the master.
+ * Still, if the master is sending (N+1)-th event, we assume that it is
+ * an indicator that the slave has received N-th event and earlier ones.
+ *
+ * If semi-sync is disabled, all transactions still update the wait
+ * position with the last position in binlog. But no transactions will
+ * wait for confirmations and the active transaction list would not be
+ * maintained. In binlog dump thread, update_sync_header() checks whether
+ * the current sending event catches up with last wait position. If it
+ * does match, semi-sync will be switched on again.
+ */
+int Repl_semi_sync_master::switch_off()
+{
+ int result;
+
+ DBUG_ENTER("Repl_semi_sync_master::switch_off");
+
+ m_state = false;
+
+ /* Clear the active transaction list. */
+ assert(m_active_tranxs != NULL);
+ result = m_active_tranxs->clear_active_tranx_nodes(NULL, 0);
+
+ rpl_semi_sync_master_off_times++;
+ m_wait_file_name_inited = false;
+ m_reply_file_name_inited = false;
+ sql_print_information("Semi-sync replication switched OFF.");
+ cond_broadcast(); /* wake up all waiting threads */
+
+ DBUG_RETURN(result);
+}
+
+int Repl_semi_sync_master::try_switch_on(int server_id,
+ const char *log_file_name,
+ my_off_t log_file_pos)
+{
+ bool semi_sync_on = false;
+
+ DBUG_ENTER("Repl_semi_sync_master::try_switch_on");
+
+ /* If the current sending event's position is larger than or equal to the
+ * 'largest' commit transaction binlog position, the slave is already
+ * catching up now and we can switch semi-sync on here.
+ * If m_commit_file_name_inited indicates there are no recent transactions,
+ * we can enable semi-sync immediately.
+ */
+ if (m_commit_file_name_inited)
+ {
+ int cmp = Active_tranx::compare(log_file_name, log_file_pos,
+ m_commit_file_name, m_commit_file_pos);
+ semi_sync_on = (cmp >= 0);
+ }
+ else
+ {
+ semi_sync_on = true;
+ }
+
+ if (semi_sync_on)
+ {
+ /* Switch semi-sync replication on. */
+ m_state = true;
+
+ sql_print_information("Semi-sync replication switched ON with slave (server_id: %d) "
+ "at (%s, %lu)",
+ server_id, log_file_name,
+ (ulong)log_file_pos);
+ }
+
+ DBUG_RETURN(0);
+}
+
+int Repl_semi_sync_master::reserve_sync_header(String* packet)
+{
+ DBUG_ENTER("Repl_semi_sync_master::reserve_sync_header");
+
+ /* Set the magic number and the sync status. By default, no sync
+ * is required.
+ */
+ packet->append(reinterpret_cast<const char*>(k_sync_header),
+ sizeof(k_sync_header));
+ DBUG_RETURN(0);
+}
+
+int Repl_semi_sync_master::update_sync_header(THD* thd, unsigned char *packet,
+ const char *log_file_name,
+ my_off_t log_file_pos,
+ bool* need_sync)
+{
+ int cmp = 0;
+ bool sync = false;
+
+ DBUG_ENTER("Repl_semi_sync_master::update_sync_header");
+
+ /* If the semi-sync master is not enabled, or the slave is not a semi-sync
+ * target, do not request replies from the slave.
+ */
+ if (!get_master_enabled() || !thd->semi_sync_slave)
+ {
+ *need_sync = false;
+ DBUG_RETURN(0);
+ }
+
+ lock();
+
+ /* This is the real check inside the mutex. */
+ if (!get_master_enabled())
+ {
+ assert(sync == false);
+ goto l_end;
+ }
+
+ if (is_on())
+ {
+ /* semi-sync is ON */
+ sync = false; /* No sync unless a transaction is involved. */
+
+ if (m_reply_file_name_inited)
+ {
+ cmp = Active_tranx::compare(log_file_name, log_file_pos,
+ m_reply_file_name, m_reply_file_pos);
+ if (cmp <= 0)
+ {
+ /* If we have already got the reply for the event, then we do
+ * not need to sync the transaction again.
+ */
+ goto l_end;
+ }
+ }
+
+ if (m_wait_file_name_inited)
+ {
+ cmp = Active_tranx::compare(log_file_name, log_file_pos,
+ m_wait_file_name, m_wait_file_pos);
+ }
+ else
+ {
+ cmp = 1;
+ }
+
+ /* If we are already waiting for some transaction replies which
+ * are later in binlog, do not wait for this one event.
+ */
+ if (cmp >= 0)
+ {
+ /*
+ * We only wait if the event is a transaction's ending event.
+ */
+ assert(m_active_tranxs != NULL);
+ sync = m_active_tranxs->is_tranx_end_pos(log_file_name,
+ log_file_pos);
+ }
+ }
+ else
+ {
+ if (m_commit_file_name_inited)
+ {
+ int cmp = Active_tranx::compare(log_file_name, log_file_pos,
+ m_commit_file_name, m_commit_file_pos);
+ sync = (cmp >= 0);
+ }
+ else
+ {
+ sync = true;
+ }
+ }
+
+ DBUG_PRINT("semisync", ("%s: server(%lu), (%s, %lu) sync(%d), repl(%d)",
+ "Repl_semi_sync_master::update_sync_header",
+ thd->variables.server_id, log_file_name,
+ (ulong)log_file_pos, sync, (int)is_on()));
+ *need_sync= sync;
+
+ l_end:
+ unlock();
+
+ /* We do not need to clear sync flag because we set it to 0 when we
+ * reserve the packet header.
+ */
+ if (sync)
+ {
+ (packet)[2] = k_packet_flag_sync;
+ }
+
+ DBUG_RETURN(0);
+}
+
+int Repl_semi_sync_master::write_tranx_in_binlog(const char* log_file_name,
+ my_off_t log_file_pos)
+{
+ int result = 0;
+
+ DBUG_ENTER("Repl_semi_sync_master::write_tranx_in_binlog");
+
+ lock();
+
+ /* This is the real check inside the mutex. */
+ if (!get_master_enabled())
+ goto l_end;
+
+ /* Update the 'largest' transaction commit position seen so far even
+ * though semi-sync is switched off.
+ * It is much better that we update m_commit_file* here, instead of
+ * inside commit_trx(). This is mostly because update_sync_header()
+ * will watch for m_commit_file* to decide whether to switch semi-sync
+ * on. The detailed reason is explained in function update_sync_header().
+ */
+ if (m_commit_file_name_inited)
+ {
+ int cmp = Active_tranx::compare(log_file_name, log_file_pos,
+ m_commit_file_name, m_commit_file_pos);
+ if (cmp > 0)
+ {
+ /* This is a larger position, let's update the maximum info. */
+ strncpy(m_commit_file_name, log_file_name, FN_REFLEN-1);
+ m_commit_file_name[FN_REFLEN-1] = 0; /* make sure it ends properly */
+ m_commit_file_pos = log_file_pos;
+ }
+ }
+ else
+ {
+ strncpy(m_commit_file_name, log_file_name, FN_REFLEN-1);
+ m_commit_file_name[FN_REFLEN-1] = 0; /* make sure it ends properly */
+ m_commit_file_pos = log_file_pos;
+ m_commit_file_name_inited = true;
+ }
+
+ if (is_on())
+ {
+ assert(m_active_tranxs != NULL);
+ if(m_active_tranxs->insert_tranx_node(log_file_name, log_file_pos))
+ {
+ /*
+ if insert tranx_node failed, print a warning message
+ and turn off semi-sync
+ */
+ sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %lu",
+ log_file_name, (ulong)log_file_pos);
+ switch_off();
+ }
+ else
+ {
+ rpl_semi_sync_master_request_ack++;
+ }
+ }
+
+ l_end:
+ unlock();
+
+ DBUG_RETURN(result);
+}
+
+int Repl_semi_sync_master::flush_net(THD *thd,
+ const char *event_buf)
+{
+ int result = -1;
+ NET* net= &thd->net;
+
+ DBUG_ENTER("Repl_semi_sync_master::flush_net");
+
+ assert((unsigned char)event_buf[1] == k_packet_magic_num);
+ if ((unsigned char)event_buf[2] != k_packet_flag_sync)
+ {
+ /* current event does not require reply */
+ result = 0;
+ goto l_end;
+ }
+
+ /* We flush to make sure that the current event is sent to the network,
+ * instead of being buffered in the TCP/IP stack.
+ */
+ if (net_flush(net))
+ {
+ sql_print_error("Semi-sync master failed on net_flush() "
+ "before waiting for slave reply");
+ goto l_end;
+ }
+
+ net_clear(net, 0);
+ net->pkt_nr++;
+ result = 0;
+ rpl_semi_sync_master_net_wait_num++;
+
+ l_end:
+ thd->clear_error();
+
+ DBUG_RETURN(result);
+}
+
+int Repl_semi_sync_master::after_reset_master()
+{
+ int result = 0;
+
+ DBUG_ENTER("Repl_semi_sync_master::after_reset_master");
+
+ if (rpl_semi_sync_master_enabled)
+ {
+ sql_print_information("Enable Semi-sync Master after reset master");
+ enable_master();
+ }
+
+ lock();
+
+ if (rpl_semi_sync_master_clients == 0 &&
+ !rpl_semi_sync_master_wait_no_slave)
+ m_state = 0;
+ else
+ m_state = get_master_enabled()? 1 : 0;
+
+ m_wait_file_name_inited = false;
+ m_reply_file_name_inited = false;
+ m_commit_file_name_inited = false;
+
+ rpl_semi_sync_master_yes_transactions = 0;
+ rpl_semi_sync_master_no_transactions = 0;
+ rpl_semi_sync_master_off_times = 0;
+ rpl_semi_sync_master_timefunc_fails = 0;
+ rpl_semi_sync_master_wait_sessions = 0;
+ rpl_semi_sync_master_wait_pos_backtraverse = 0;
+ rpl_semi_sync_master_trx_wait_num = 0;
+ rpl_semi_sync_master_trx_wait_time = 0;
+ rpl_semi_sync_master_net_wait_num = 0;
+ rpl_semi_sync_master_net_wait_time = 0;
+
+ unlock();
+
+ DBUG_RETURN(result);
+}
+
+int Repl_semi_sync_master::before_reset_master()
+{
+ int result = 0;
+
+ DBUG_ENTER("Repl_semi_sync_master::before_reset_master");
+
+ if (rpl_semi_sync_master_enabled)
+ disable_master();
+
+ DBUG_RETURN(result);
+}
+
+void Repl_semi_sync_master::check_and_switch()
+{
+ lock();
+ if (get_master_enabled() && is_on())
+ {
+ if (!rpl_semi_sync_master_wait_no_slave
+ && rpl_semi_sync_master_clients == 0)
+ switch_off();
+ }
+ unlock();
+}
+
+void Repl_semi_sync_master::set_export_stats()
+{
+ lock();
+
+ rpl_semi_sync_master_status = m_state;
+ rpl_semi_sync_master_avg_trx_wait_time=
+ ((rpl_semi_sync_master_trx_wait_num) ?
+ (ulong)((double)rpl_semi_sync_master_trx_wait_time /
+ ((double)rpl_semi_sync_master_trx_wait_num)) : 0);
+ rpl_semi_sync_master_avg_net_wait_time=
+ ((rpl_semi_sync_master_net_wait_num) ?
+ (ulong)((double)rpl_semi_sync_master_net_wait_time /
+ ((double)rpl_semi_sync_master_net_wait_num)) : 0);
+
+ unlock();
+}
+
+/* Get the waiting time given the wait's staring time.
+ *
+ * Return:
+ * >= 0: the waiting time in microsecons(us)
+ * < 0: error in get time or time back traverse
+ */
+static int get_wait_time(const struct timespec& start_ts)
+{
+ ulonglong start_usecs, end_usecs;
+ struct timespec end_ts;
+
+ /* Starting time in microseconds(us). */
+ start_usecs = timespec_to_usec(&start_ts);
+
+ /* Get the wait time interval. */
+ set_timespec(end_ts, 0);
+
+ /* Ending time in microseconds(us). */
+ end_usecs = timespec_to_usec(&end_ts);
+
+ if (end_usecs < start_usecs)
+ return -1;
+
+ return (int)(end_usecs - start_usecs);
+}
+
+void semi_sync_master_deinit()
+{
+ repl_semisync_master.cleanup();
+ ack_receiver.cleanup();
+}
diff --git a/sql/semisync_master.h b/sql/semisync_master.h
new file mode 100644
index 00000000000..a58c1a7ae6e
--- /dev/null
+++ b/sql/semisync_master.h
@@ -0,0 +1,674 @@
+/* Copyright (C) 2007 Google Inc.
+ Copyright (c) 2008 MySQL AB, 2009 Sun Microsystems, Inc.
+ Use is subject to license terms.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+
+#ifndef SEMISYNC_MASTER_H
+#define SEMISYNC_MASTER_H
+
+#include "semisync.h"
+#include "semisync_master_ack_receiver.h"
+
+#ifdef HAVE_PSI_INTERFACE
+extern PSI_mutex_key key_LOCK_binlog;
+extern PSI_cond_key key_COND_binlog_send;
+#endif
+
+struct Tranx_node {
+ char log_name[FN_REFLEN];
+ my_off_t log_pos;
+ struct Tranx_node *next; /* the next node in the sorted list */
+ struct Tranx_node *hash_next; /* the next node during hash collision */
+};
+
+/**
+ @class Tranx_node_allocator
+
+ This class provides memory allocating and freeing methods for
+ Tranx_node. The main target is performance.
+
+ @section ALLOCATE How to allocate a node
+ The pointer of the first node after 'last_node' in current_block is
+ returned. current_block will move to the next free Block when all nodes of
+ it are in use. A new Block is allocated and is put into the rear of the
+ Block link table if no Block is free.
+
+ The list starts up empty (ie, there is no allocated Block).
+
+ After some nodes are freed, there probably are some free nodes before
+ the sequence of the allocated nodes, but we do not reuse it. It is better
+ to keep the allocated nodes are in the sequence, for it is more efficient
+ for allocating and freeing Tranx_node.
+
+ @section FREENODE How to free nodes
+ There are two methods for freeing nodes. They are free_all_nodes and
+ free_nodes_before.
+
+ 'A Block is free' means all of its nodes are free.
+ @subsection free_nodes_before
+ As all allocated nodes are in the sequence, 'Before one node' means all
+ nodes before given node in the same Block and all Blocks before the Block
+ which containing the given node. As such, all Blocks before the given one
+ ('node') are free Block and moved into the rear of the Block link table.
+ The Block containing the given 'node', however, is not. For at least the
+ given 'node' is still in use. This will waste at most one Block, but it is
+ more efficient.
+ */
+#define BLOCK_TRANX_NODES 16
+class Tranx_node_allocator
+{
+public:
+ /**
+ @param reserved_nodes
+ The number of reserved Tranx_nodes. It is used to set 'reserved_blocks'
+ which can contain at least 'reserved_nodes' number of Tranx_nodes. When
+ freeing memory, we will reserve at least reserved_blocks of Blocks not
+ freed.
+ */
+ Tranx_node_allocator(uint reserved_nodes) :
+ reserved_blocks(reserved_nodes/BLOCK_TRANX_NODES +
+ (reserved_nodes%BLOCK_TRANX_NODES > 1 ? 2 : 1)),
+ first_block(NULL), last_block(NULL),
+ current_block(NULL), last_node(-1), block_num(0) {}
+
+ ~Tranx_node_allocator()
+ {
+ Block *block= first_block;
+ while (block != NULL)
+ {
+ Block *next= block->next;
+ free_block(block);
+ block= next;
+ }
+ }
+
+ /**
+ The pointer of the first node after 'last_node' in current_block is
+ returned. current_block will move to the next free Block when all nodes of
+ it are in use. A new Block is allocated and is put into the rear of the
+ Block link table if no Block is free.
+
+ @return Return a Tranx_node *, or NULL if an error occurred.
+ */
+ Tranx_node *allocate_node()
+ {
+ Tranx_node *trx_node;
+ Block *block= current_block;
+
+ if (last_node == BLOCK_TRANX_NODES-1)
+ {
+ current_block= current_block->next;
+ last_node= -1;
+ }
+
+ if (current_block == NULL && allocate_block())
+ {
+ current_block= block;
+ if (current_block)
+ last_node= BLOCK_TRANX_NODES-1;
+ return NULL;
+ }
+
+ trx_node= &(current_block->nodes[++last_node]);
+ trx_node->log_name[0] = '\0';
+ trx_node->log_pos= 0;
+ trx_node->next= 0;
+ trx_node->hash_next= 0;
+ return trx_node;
+ }
+
+ /**
+ All nodes are freed.
+
+ @return Return 0, or 1 if an error occurred.
+ */
+ int free_all_nodes()
+ {
+ current_block= first_block;
+ last_node= -1;
+ free_blocks();
+ return 0;
+ }
+
+ /**
+ All Blocks before the given 'node' are free Block and moved into the rear
+ of the Block link table.
+
+ @param node All nodes before 'node' will be freed
+
+ @return Return 0, or 1 if an error occurred.
+ */
+ int free_nodes_before(Tranx_node* node)
+ {
+ Block *block;
+ Block *prev_block= NULL;
+
+ block= first_block;
+ while (block != current_block->next)
+ {
+ /* Find the Block containing the given node */
+ if (&(block->nodes[0]) <= node && &(block->nodes[BLOCK_TRANX_NODES]) >= node)
+ {
+ /* All Blocks before the given node are put into the rear */
+ if (first_block != block)
+ {
+ last_block->next= first_block;
+ first_block= block;
+ last_block= prev_block;
+ last_block->next= NULL;
+ free_blocks();
+ }
+ return 0;
+ }
+ prev_block= block;
+ block= block->next;
+ }
+
+ /* Node does not find should never happen */
+ DBUG_ASSERT(0);
+ return 1;
+ }
+
+private:
+ uint reserved_blocks;
+
+ /**
+ A sequence memory which contains BLOCK_TRANX_NODES Tranx_nodes.
+
+ BLOCK_TRANX_NODES The number of Tranx_nodes which are in a Block.
+
+ next Every Block has a 'next' pointer which points to the next Block.
+ These linking Blocks constitute a Block link table.
+ */
+ struct Block {
+ Block *next;
+ Tranx_node nodes[BLOCK_TRANX_NODES];
+ };
+
+ /**
+ The 'first_block' is the head of the Block link table;
+ */
+ Block *first_block;
+ /**
+ The 'last_block' is the rear of the Block link table;
+ */
+ Block *last_block;
+
+ /**
+ current_block always points the Block in the Block link table in
+ which the last allocated node is. The Blocks before it are all in use
+ and the Blocks after it are all free.
+ */
+ Block *current_block;
+
+ /**
+ It always points to the last node which has been allocated in the
+ current_block.
+ */
+ int last_node;
+
+ /**
+ How many Blocks are in the Block link table.
+ */
+ uint block_num;
+
+ /**
+ Allocate a block and then assign it to current_block.
+ */
+ int allocate_block()
+ {
+ Block *block= (Block *)my_malloc(sizeof(Block), MYF(0));
+ if (block)
+ {
+ block->next= NULL;
+
+ if (first_block == NULL)
+ first_block= block;
+ else
+ last_block->next= block;
+
+ /* New Block is always put into the rear */
+ last_block= block;
+ /* New Block is always the current_block */
+ current_block= block;
+ ++block_num;
+ return 0;
+ }
+ return 1;
+ }
+
+ /**
+ Free a given Block.
+ @param block The Block will be freed.
+ */
+ void free_block(Block *block)
+ {
+ my_free(block);
+ --block_num;
+ }
+
+
+ /**
+ If there are some free Blocks and the total number of the Blocks in the
+ Block link table is larger than the 'reserved_blocks', Some free Blocks
+ will be freed until the total number of the Blocks is equal to the
+ 'reserved_blocks' or there is only one free Block behind the
+ 'current_block'.
+ */
+ void free_blocks()
+ {
+ if (current_block == NULL || current_block->next == NULL)
+ return;
+
+ /* One free Block is always kept behind the current block */
+ Block *block= current_block->next->next;
+ while (block_num > reserved_blocks && block != NULL)
+ {
+ Block *next= block->next;
+ free_block(block);
+ block= next;
+ }
+ current_block->next->next= block;
+ if (block == NULL)
+ last_block= current_block->next;
+ }
+};
+
+/**
+ This class manages memory for active transaction list.
+
+ We record each active transaction with a Tranx_node, each session
+ can have only one open transaction. Because of EVENT, the total
+ active transaction nodes can exceed the maximum allowed
+ connections.
+*/
+class Active_tranx
+ :public Trace {
+private:
+
+ Tranx_node_allocator m_allocator;
+ /* These two record the active transaction list in sort order. */
+ Tranx_node *m_trx_front, *m_trx_rear;
+
+ Tranx_node **m_trx_htb; /* A hash table on active transactions. */
+
+ int m_num_entries; /* maximum hash table entries */
+ mysql_mutex_t *m_lock; /* mutex lock */
+
+ inline void assert_lock_owner();
+
+ inline unsigned int calc_hash(const unsigned char *key,unsigned int length);
+ unsigned int get_hash_value(const char *log_file_name, my_off_t log_file_pos);
+
+ int compare(const char *log_file_name1, my_off_t log_file_pos1,
+ const Tranx_node *node2) {
+ return compare(log_file_name1, log_file_pos1,
+ node2->log_name, node2->log_pos);
+ }
+ int compare(const Tranx_node *node1,
+ const char *log_file_name2, my_off_t log_file_pos2) {
+ return compare(node1->log_name, node1->log_pos,
+ log_file_name2, log_file_pos2);
+ }
+ int compare(const Tranx_node *node1, const Tranx_node *node2) {
+ return compare(node1->log_name, node1->log_pos,
+ node2->log_name, node2->log_pos);
+ }
+
+public:
+ Active_tranx(mysql_mutex_t *lock, unsigned long trace_level);
+ ~Active_tranx();
+
+ /* Insert an active transaction node with the specified position.
+ *
+ * Return:
+ * 0: success; non-zero: error
+ */
+ int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos);
+
+ /* Clear the active transaction nodes until(inclusive) the specified
+ * position.
+ * If log_file_name is NULL, everything will be cleared: the sorted
+ * list and the hash table will be reset to empty.
+ *
+ * Return:
+ * 0: success; non-zero: error
+ */
+ int clear_active_tranx_nodes(const char *log_file_name,
+ my_off_t log_file_pos);
+
+ /* Given a position, check to see whether the position is an active
+ * transaction's ending position by probing the hash table.
+ */
+ bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos);
+
+ /* Given two binlog positions, compare which one is bigger based on
+ * (file_name, file_position).
+ */
+ static int compare(const char *log_file_name1, my_off_t log_file_pos1,
+ const char *log_file_name2, my_off_t log_file_pos2);
+
+};
+
+/**
+ The extension class for the master of semi-synchronous replication
+*/
+class Repl_semi_sync_master
+ :public Repl_semi_sync_base {
+ private:
+ Active_tranx *m_active_tranxs; /* active transaction list: the list will
+ be cleared when semi-sync switches off. */
+
+ /* True when init_object has been called */
+ bool m_init_done;
+
+ /* This cond variable is signaled when enough binlog has been sent to slave,
+ * so that a waiting trx can return the 'ok' to the client for a commit.
+ */
+ mysql_cond_t COND_binlog_send;
+
+ /* Mutex that protects the following state variables and the active
+ * transaction list.
+ * Under no cirumstances we can acquire mysql_bin_log.LOCK_log if we are
+ * already holding m_LOCK_binlog because it can cause deadlocks.
+ */
+ mysql_mutex_t LOCK_binlog;
+
+ /* This is set to true when m_reply_file_name contains meaningful data. */
+ bool m_reply_file_name_inited;
+
+ /* The binlog name up to which we have received replies from any slaves. */
+ char m_reply_file_name[FN_REFLEN];
+
+ /* The position in that file up to which we have the reply from any slaves. */
+ my_off_t m_reply_file_pos;
+
+ /* This is set to true when we know the 'smallest' wait position. */
+ bool m_wait_file_name_inited;
+
+ /* NULL, or the 'smallest' filename that a transaction is waiting for
+ * slave replies.
+ */
+ char m_wait_file_name[FN_REFLEN];
+
+ /* The smallest position in that file that a trx is waiting for: the trx
+ * can proceed and send an 'ok' to the client when the master has got the
+ * reply from the slave indicating that it already got the binlog events.
+ */
+ my_off_t m_wait_file_pos;
+
+ /* This is set to true when we know the 'largest' transaction commit
+ * position in the binlog file.
+ * We always maintain the position no matter whether semi-sync is switched
+ * on switched off. When a transaction wait timeout occurs, semi-sync will
+ * switch off. Binlog-dump thread can use the three fields to detect when
+ * slaves catch up on replication so that semi-sync can switch on again.
+ */
+ bool m_commit_file_name_inited;
+
+ /* The 'largest' binlog filename that a commit transaction is seeing. */
+ char m_commit_file_name[FN_REFLEN];
+
+ /* The 'largest' position in that file that a commit transaction is seeing. */
+ my_off_t m_commit_file_pos;
+
+ /* All global variables which can be set by parameters. */
+ volatile bool m_master_enabled; /* semi-sync is enabled on the master */
+ unsigned long m_wait_timeout; /* timeout period(ms) during tranx wait */
+
+ bool m_state; /* whether semi-sync is switched */
+
+ /*Waiting for ACK before/after innodb commit*/
+ ulong m_wait_point;
+
+ void lock();
+ void unlock();
+ void cond_broadcast();
+ int cond_timewait(struct timespec *wait_time);
+
+ /* Is semi-sync replication on? */
+ bool is_on() {
+ return (m_state);
+ }
+
+ void set_master_enabled(bool enabled) {
+ m_master_enabled = enabled;
+ }
+
+ /* Switch semi-sync off because of timeout in transaction waiting. */
+ int switch_off();
+
+ /* Switch semi-sync on when slaves catch up. */
+ int try_switch_on(int server_id,
+ const char *log_file_name, my_off_t log_file_pos);
+
+ public:
+ Repl_semi_sync_master();
+ ~Repl_semi_sync_master() {}
+
+ void cleanup();
+
+ bool get_master_enabled() {
+ return m_master_enabled;
+ }
+ void set_trace_level(unsigned long trace_level) {
+ m_trace_level = trace_level;
+ if (m_active_tranxs)
+ m_active_tranxs->m_trace_level = trace_level;
+ }
+
+ /* Set the transaction wait timeout period, in milliseconds. */
+ void set_wait_timeout(unsigned long wait_timeout) {
+ m_wait_timeout = wait_timeout;
+ }
+
+ /*set the ACK point, after binlog sync or after transaction commit*/
+ void set_wait_point(unsigned long ack_point)
+ {
+ m_wait_point = ack_point;
+ }
+
+ ulong wait_point() //no cover line
+ {
+ return m_wait_point; //no cover line
+ }
+
+ /* Initialize this class after MySQL parameters are initialized. this
+ * function should be called once at bootstrap time.
+ */
+ int init_object();
+
+ /* Enable the object to enable semi-sync replication inside the master. */
+ int enable_master();
+
+ /* Enable the object to enable semi-sync replication inside the master. */
+ int disable_master();
+
+ /* Add a semi-sync replication slave */
+ void add_slave();
+
+ /* Remove a semi-sync replication slave */
+ void remove_slave();
+
+ /* It parses a reply packet and call report_reply_binlog to handle it. */
+ int report_reply_packet(uint32 server_id, const uchar *packet,
+ ulong packet_len);
+
+ /* In semi-sync replication, reports up to which binlog position we have
+ * received replies from the slave indicating that it already get the events.
+ *
+ * Input:
+ * server_id - (IN) master server id number
+ * log_file_name - (IN) binlog file name
+ * end_offset - (IN) the offset in the binlog file up to which we have
+ * the replies from the slave
+ *
+ * Return:
+ * 0: success; non-zero: error
+ */
+ int report_reply_binlog(uint32 server_id,
+ const char* log_file_name,
+ my_off_t end_offset);
+
+ /* Commit a transaction in the final step. This function is called from
+ * InnoDB before returning from the low commit. If semi-sync is switch on,
+ * the function will wait to see whether binlog-dump thread get the reply for
+ * the events of the transaction. Remember that this is not a direct wait,
+ * instead, it waits to see whether the binlog-dump thread has reached the
+ * point. If the wait times out, semi-sync status will be switched off and
+ * all other transaction would not wait either.
+ *
+ * Input: (the transaction events' ending binlog position)
+ * trx_wait_binlog_name - (IN) ending position's file name
+ * trx_wait_binlog_pos - (IN) ending position's file offset
+ *
+ * Return:
+ * 0: success; non-zero: error
+ */
+ int commit_trx(const char* trx_wait_binlog_name,
+ my_off_t trx_wait_binlog_pos);
+
+ /*Wait for ACK after writing/sync binlog to file*/
+ int wait_after_sync(const char* log_file, my_off_t log_pos);
+
+ /*Wait for ACK after commting the transaction*/
+ int wait_after_commit(THD* thd, bool all);
+
+ /*Wait after the transaction is rollback*/
+ int wait_after_rollback(THD *thd, bool all);
+ /*Store the current binlog position in m_active_tranxs. This position should
+ * be acked by slave*/
+ int report_binlog_update(THD *thd, const char *log_file,my_off_t log_pos);
+
+ int dump_start(THD* thd,
+ const char *log_file,
+ my_off_t log_pos);
+
+ void dump_end(THD* thd);
+
+ /* Reserve space in the replication event packet header:
+ * . slave semi-sync off: 1 byte - (0)
+ * . slave semi-sync on: 3 byte - (0, 0xef, 0/1}
+ *
+ * Input:
+ * packet - (IN) the header buffer
+ *
+ * Return:
+ * size of the bytes reserved for header
+ */
+ int reserve_sync_header(String* packet);
+
+ /* Update the sync bit in the packet header to indicate to the slave whether
+ * the master will wait for the reply of the event. If semi-sync is switched
+ * off and we detect that the slave is catching up, we switch semi-sync on.
+ *
+ * Input:
+ * THD - (IN) current dump thread
+ * packet - (IN) the packet containing the replication event
+ * log_file_name - (IN) the event ending position's file name
+ * log_file_pos - (IN) the event ending position's file offset
+ * need_sync - (IN) identify if flush_net is needed to call.
+ * server_id - (IN) master server id number
+ *
+ * Return:
+ * 0: success; non-zero: error
+ */
+ int update_sync_header(THD* thd, unsigned char *packet,
+ const char *log_file_name,
+ my_off_t log_file_pos,
+ bool* need_sync);
+
+ /* Called when a transaction finished writing binlog events.
+ * . update the 'largest' transactions' binlog event position
+ * . insert the ending position in the active transaction list if
+ * semi-sync is on
+ *
+ * Input: (the transaction events' ending binlog position)
+ * log_file_name - (IN) transaction ending position's file name
+ * log_file_pos - (IN) transaction ending position's file offset
+ *
+ * Return:
+ * 0: success; non-zero: error
+ */
+ int write_tranx_in_binlog(const char* log_file_name, my_off_t log_file_pos);
+
+ /* Read the slave's reply so that we know how much progress the slave makes
+ * on receive replication events.
+ */
+ int flush_net(THD* thd, const char *event_buf);
+
+ /* Export internal statistics for semi-sync replication. */
+ void set_export_stats();
+
+ /* 'reset master' command is issued from the user and semi-sync need to
+ * go off for that.
+ */
+ int after_reset_master();
+
+ /*called before reset master*/
+ int before_reset_master();
+
+ void check_and_switch();
+};
+
+enum rpl_semi_sync_master_wait_point_t {
+ SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC,
+ SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT,
+};
+
+extern Repl_semi_sync_master repl_semisync_master;
+extern Ack_receiver ack_receiver;
+
+/* System and status variables for the master component */
+extern my_bool rpl_semi_sync_master_enabled;
+extern my_bool rpl_semi_sync_master_status;
+extern ulong rpl_semi_sync_master_wait_point;
+extern ulong rpl_semi_sync_master_clients;
+extern ulong rpl_semi_sync_master_timeout;
+extern ulong rpl_semi_sync_master_trace_level;
+extern ulong rpl_semi_sync_master_yes_transactions;
+extern ulong rpl_semi_sync_master_no_transactions;
+extern ulong rpl_semi_sync_master_off_times;
+extern ulong rpl_semi_sync_master_wait_timeouts;
+extern ulong rpl_semi_sync_master_timefunc_fails;
+extern ulong rpl_semi_sync_master_num_timeouts;
+extern ulong rpl_semi_sync_master_wait_sessions;
+extern ulong rpl_semi_sync_master_wait_pos_backtraverse;
+extern ulong rpl_semi_sync_master_avg_trx_wait_time;
+extern ulong rpl_semi_sync_master_avg_net_wait_time;
+extern ulonglong rpl_semi_sync_master_net_wait_num;
+extern ulonglong rpl_semi_sync_master_trx_wait_num;
+extern ulonglong rpl_semi_sync_master_net_wait_time;
+extern ulonglong rpl_semi_sync_master_trx_wait_time;
+extern unsigned long long rpl_semi_sync_master_request_ack;
+extern unsigned long long rpl_semi_sync_master_get_ack;
+
+/*
+ This indicates whether we should keep waiting if no semi-sync slave
+ is available.
+ 0 : stop waiting if detected no avaialable semi-sync slave.
+ 1 (default) : keep waiting until timeout even no available semi-sync slave.
+*/
+extern char rpl_semi_sync_master_wait_no_slave;
+extern Repl_semi_sync_master repl_semisync_master;
+
+extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave;
+extern PSI_stage_info stage_reading_semi_sync_ack;
+extern PSI_stage_info stage_waiting_for_semi_sync_slave;
+
+void semi_sync_master_deinit();
+
+#endif /* SEMISYNC_MASTER_H */
diff --git a/sql/semisync_master_ack_receiver.cc b/sql/semisync_master_ack_receiver.cc
new file mode 100644
index 00000000000..f986c629f65
--- /dev/null
+++ b/sql/semisync_master_ack_receiver.cc
@@ -0,0 +1,303 @@
+/* Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+#include <my_global.h>
+#include "semisync_master.h"
+#include "semisync_master_ack_receiver.h"
+
+extern PSI_mutex_key key_LOCK_ack_receiver;
+extern PSI_cond_key key_COND_ack_receiver;
+extern PSI_thread_key key_thread_ack_receiver;
+extern Repl_semi_sync_master repl_semisync;
+
+/* Callback function of ack receive thread */
+pthread_handler_t ack_receive_handler(void *arg)
+{
+ Ack_receiver *recv= reinterpret_cast<Ack_receiver *>(arg);
+
+ my_thread_init();
+ recv->run();
+ my_thread_end();
+
+ return NULL;
+}
+
+Ack_receiver::Ack_receiver()
+{
+ DBUG_ENTER("Ack_receiver::Ack_receiver");
+
+ m_status= ST_DOWN;
+ mysql_mutex_init(key_LOCK_ack_receiver, &m_mutex,
+ MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_ack_receiver, &m_cond, NULL);
+ m_pid= 0;
+
+ DBUG_VOID_RETURN;
+}
+
+void Ack_receiver::cleanup()
+{
+ DBUG_ENTER("Ack_receiver::~Ack_receiver");
+
+ stop();
+ mysql_mutex_destroy(&m_mutex);
+ mysql_cond_destroy(&m_cond);
+
+ DBUG_VOID_RETURN;
+}
+
+bool Ack_receiver::start()
+{
+ DBUG_ENTER("Ack_receiver::start");
+
+ mysql_mutex_lock(&m_mutex);
+ if(m_status == ST_DOWN)
+ {
+ pthread_attr_t attr;
+
+ m_status= ST_UP;
+
+ if (DBUG_EVALUATE_IF("rpl_semisync_simulate_create_thread_failure", 1, 0) ||
+ pthread_attr_init(&attr) != 0 ||
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) != 0 ||
+#ifndef _WIN32
+ pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM) != 0 ||
+#endif
+ mysql_thread_create(key_thread_ack_receiver, &m_pid,
+ &attr, ack_receive_handler, this))
+ {
+ sql_print_error("Failed to start semi-sync ACK receiver thread, "
+ " could not create thread(errno:%d)", errno);
+
+ m_status= ST_DOWN;
+ mysql_mutex_unlock(&m_mutex);
+
+ DBUG_RETURN(true);
+ }
+ (void) pthread_attr_destroy(&attr);
+ }
+ mysql_mutex_unlock(&m_mutex);
+
+ DBUG_RETURN(false);
+}
+
+void Ack_receiver::stop()
+{
+ DBUG_ENTER("Ack_receiver::stop");
+
+ mysql_mutex_lock(&m_mutex);
+ if (m_status == ST_UP)
+ {
+ m_status= ST_STOPPING;
+ mysql_cond_broadcast(&m_cond);
+
+ while (m_status == ST_STOPPING)
+ mysql_cond_wait(&m_cond, &m_mutex);
+
+ DBUG_ASSERT(m_status == ST_DOWN);
+
+ m_pid= 0;
+ }
+ mysql_mutex_unlock(&m_mutex);
+
+ DBUG_VOID_RETURN;
+}
+
+bool Ack_receiver::add_slave(THD *thd)
+{
+ Slave *slave;
+ DBUG_ENTER("Ack_receiver::add_slave");
+
+ if (!(slave= new Slave))
+ DBUG_RETURN(true);
+
+ slave->thd= thd;
+ slave->vio= *thd->net.vio;
+ slave->vio.mysql_socket.m_psi= NULL;
+ slave->vio.read_timeout= 1;
+
+ mysql_mutex_lock(&m_mutex);
+ m_slaves.push_back(slave);
+ m_slaves_changed= true;
+ mysql_cond_broadcast(&m_cond);
+ mysql_mutex_unlock(&m_mutex);
+
+ DBUG_RETURN(false);
+}
+
+void Ack_receiver::remove_slave(THD *thd)
+{
+ I_List_iterator<Slave> it(m_slaves);
+ Slave *slave;
+ DBUG_ENTER("Ack_receiver::remove_slave");
+
+ mysql_mutex_lock(&m_mutex);
+
+ while ((slave= it++))
+ {
+ if (slave->thd == thd)
+ {
+ delete slave;
+ m_slaves_changed= true;
+ break;
+ }
+ }
+ mysql_mutex_unlock(&m_mutex);
+
+ DBUG_VOID_RETURN;
+}
+
+inline void Ack_receiver::set_stage_info(const PSI_stage_info &stage)
+{
+ MYSQL_SET_STAGE(stage.m_key, __FILE__, __LINE__);
+}
+
+inline void Ack_receiver::wait_for_slave_connection()
+{
+ set_stage_info(stage_waiting_for_semi_sync_slave);
+ mysql_cond_wait(&m_cond, &m_mutex);
+}
+
+my_socket Ack_receiver::get_slave_sockets(fd_set *fds, uint *count)
+{
+ my_socket max_fd= INVALID_SOCKET;
+ Slave *slave;
+ I_List_iterator<Slave> it(m_slaves);
+
+ *count= 0;
+ FD_ZERO(fds);
+ while ((slave= it++))
+ {
+ (*count)++;
+ my_socket fd= slave->sock_fd();
+ max_fd= (fd > max_fd ? fd : max_fd);
+ FD_SET(fd, fds);
+ }
+
+ return max_fd;
+}
+
+/* Auxilary function to initialize a NET object with given net buffer. */
+static void init_net(NET *net, unsigned char *buff, unsigned int buff_len)
+{
+ memset(net, 0, sizeof(NET));
+ net->max_packet= buff_len;
+ net->buff= buff;
+ net->buff_end= buff + buff_len;
+ net->read_pos= net->buff;
+}
+
+void Ack_receiver::run()
+{
+ // skip LOCK_global_system_variables due to the 3rd arg
+ THD *thd= new THD(next_thread_id(), false, true);
+ NET net;
+ unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH];
+ fd_set read_fds;
+ my_socket max_fd= INVALID_SOCKET;
+ Slave *slave;
+
+ my_thread_init();
+
+ DBUG_ENTER("Ack_receiver::run");
+
+ sql_print_information("Starting ack receiver thread");
+ thd->system_thread= SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND;
+ thd->thread_stack= (char*) &thd;
+ thd->store_globals();
+ thd->security_ctx->skip_grants();
+ thread_safe_increment32(&service_thread_count);
+ thd->set_command(COM_DAEMON);
+ init_net(&net, net_buff, REPLY_MESSAGE_MAX_LENGTH);
+
+ mysql_mutex_lock(&m_mutex);
+ m_slaves_changed= true;
+ mysql_mutex_unlock(&m_mutex);
+
+ while (1)
+ {
+ fd_set fds;
+ int ret;
+ uint slave_count;
+
+ mysql_mutex_lock(&m_mutex);
+ if (unlikely(m_status == ST_STOPPING))
+ goto end;
+
+ set_stage_info(stage_waiting_for_semi_sync_ack_from_slave);
+ if (unlikely(m_slaves_changed))
+ {
+ if (unlikely(m_slaves.is_empty()))
+ {
+ wait_for_slave_connection();
+ mysql_mutex_unlock(&m_mutex);
+ continue;
+ }
+
+ max_fd= get_slave_sockets(&read_fds, &slave_count);
+ m_slaves_changed= false;
+ DBUG_PRINT("info", ("fd count %u, max_fd %d", slave_count, max_fd));
+ }
+
+ struct timeval tv= {1, 0};
+ fds= read_fds;
+ /* select requires max fd + 1 for the first argument */
+ ret= select(max_fd+1, &fds, NULL, NULL, &tv);
+ if (ret <= 0)
+ {
+ mysql_mutex_unlock(&m_mutex);
+
+ ret= DBUG_EVALUATE_IF("rpl_semisync_simulate_select_error", -1, ret);
+
+ if (ret == -1)
+ sql_print_information("Failed to select() on semi-sync dump sockets, "
+ "error: errno=%d", socket_errno);
+ /* Sleep 1us, so other threads can catch the m_mutex easily. */
+ my_sleep(1);
+ continue;
+ }
+
+ set_stage_info(stage_reading_semi_sync_ack);
+ I_List_iterator<Slave> it(m_slaves);
+
+ while ((slave= it++))
+ {
+ if (FD_ISSET(slave->sock_fd(), &fds))
+ {
+ ulong len;
+
+ net_clear(&net, 0);
+ net.vio= &slave->vio;
+
+ len= my_net_read(&net);
+ if (likely(len != packet_error))
+ repl_semisync_master.report_reply_packet(slave->server_id(),
+ net.read_pos, len);
+ else if (net.last_errno == ER_NET_READ_ERROR)
+ FD_CLR(slave->sock_fd(), &read_fds);
+ }
+ }
+ mysql_mutex_unlock(&m_mutex);
+ }
+end:
+ sql_print_information("Stopping ack receiver thread");
+ m_status= ST_DOWN;
+ delete thd;
+ thread_safe_decrement32(&service_thread_count);
+ signal_thd_deleted();
+ mysql_cond_broadcast(&m_cond);
+ mysql_mutex_unlock(&m_mutex);
+ DBUG_VOID_RETURN;
+}
diff --git a/sql/semisync_master_ack_receiver.h b/sql/semisync_master_ack_receiver.h
new file mode 100644
index 00000000000..619748a2159
--- /dev/null
+++ b/sql/semisync_master_ack_receiver.h
@@ -0,0 +1,119 @@
+/* Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+#ifndef SEMISYNC_MASTER_ACK_RECEIVER_DEFINED
+#define SEMISYNC_MASTER_ACK_RECEIVER_DEFINED
+
+#include "my_global.h"
+#include "my_pthread.h"
+#include "sql_class.h"
+#include "semisync.h"
+/**
+ Ack_receiver is responsible to control ack receive thread and maintain
+ slave information used by ack receive thread.
+
+ There are mainly four operations on ack receive thread:
+ start: start ack receive thread
+ stop: stop ack receive thread
+ add_slave: maintain a new semisync slave's information
+ remove_slave: remove a semisync slave's information
+ */
+class Ack_receiver : public Repl_semi_sync_base
+{
+public:
+ Ack_receiver();
+ ~Ack_receiver() {}
+ void cleanup();
+ /**
+ Notify ack receiver to receive acks on the dump session.
+
+ It adds the given dump thread into the slave list and wakes
+ up ack thread if it is waiting for any slave coming.
+
+ @param[in] thd THD of a dump thread.
+
+ @return it return false if succeeds, otherwise true is returned.
+ */
+ bool add_slave(THD *thd);
+
+ /**
+ Notify ack receiver not to receive ack on the dump session.
+
+ it removes the given dump thread from slave list.
+
+ @param[in] thd THD of a dump thread.
+ */
+ void remove_slave(THD *thd);
+
+ /**
+ Start ack receive thread
+
+ @return it return false if succeeds, otherwise true is returned.
+ */
+ bool start();
+
+ /**
+ Stop ack receive thread
+ */
+ void stop();
+
+ /**
+ The core of ack receive thread.
+
+ It monitors all slaves' sockets and receives acks when they come.
+ */
+ void run();
+
+ void set_trace_level(unsigned long trace_level)
+ {
+ m_trace_level= trace_level;
+ }
+private:
+ enum status {ST_UP, ST_DOWN, ST_STOPPING};
+ uint8 m_status;
+ /*
+ Protect m_status, m_slaves_changed and m_slaves. ack thread and other
+ session may access the variables at the same time.
+ */
+ mysql_mutex_t m_mutex;
+ mysql_cond_t m_cond;
+ /* If slave list is updated(add or remove). */
+ bool m_slaves_changed;
+
+ class Slave :public ilink
+ {
+public:
+ THD *thd;
+ Vio vio;
+
+ my_socket sock_fd() { return vio.mysql_socket.fd; }
+ uint server_id() { return thd->variables.server_id; }
+ };
+
+ I_List<Slave> m_slaves;
+
+ pthread_t m_pid;
+
+/* Declare them private, so no one can copy the object. */
+ Ack_receiver(const Ack_receiver &ack_receiver);
+ Ack_receiver& operator=(const Ack_receiver &ack_receiver);
+
+ void set_stage_info(const PSI_stage_info &stage);
+ void wait_for_slave_connection();
+ my_socket get_slave_sockets(fd_set *fds, uint *count);
+};
+
+extern Ack_receiver ack_receiver;
+#endif
diff --git a/sql/semisync_slave.cc b/sql/semisync_slave.cc
new file mode 100644
index 00000000000..2d77ee7b10c
--- /dev/null
+++ b/sql/semisync_slave.cc
@@ -0,0 +1,251 @@
+/* Copyright (c) 2008 MySQL AB, 2009 Sun Microsystems, Inc.
+ Use is subject to license terms.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+
+#include <my_global.h>
+#include "semisync_slave.h"
+
+Repl_semi_sync_slave repl_semisync_slave;
+
+my_bool rpl_semi_sync_slave_enabled= 0;
+
+char rpl_semi_sync_slave_delay_master;
+my_bool rpl_semi_sync_slave_status= 0;
+ulong rpl_semi_sync_slave_trace_level;
+
+/*
+ indicate whether or not the slave should send a reply to the master.
+
+ This is set to true in repl_semi_slave_read_event if the current
+ event read is the last event of a transaction. And the value is
+ checked in repl_semi_slave_queue_event.
+*/
+bool semi_sync_need_reply= false;
+unsigned int rpl_semi_sync_slave_kill_conn_timeout;
+unsigned long long rpl_semi_sync_slave_send_ack = 0;
+
+int Repl_semi_sync_slave::init_object()
+{
+ int result= 0;
+
+ m_init_done = true;
+
+ /* References to the parameter works after set_options(). */
+ set_slave_enabled(rpl_semi_sync_slave_enabled);
+ set_trace_level(rpl_semi_sync_slave_trace_level);
+ set_delay_master(rpl_semi_sync_slave_delay_master);
+ set_kill_conn_timeout(rpl_semi_sync_slave_kill_conn_timeout);
+
+ return result;
+}
+
+int Repl_semi_sync_slave::slave_read_sync_header(const char *header,
+ unsigned long total_len,
+ int *semi_flags,
+ const char **payload,
+ unsigned long *payload_len)
+{
+ int read_res = 0;
+ DBUG_ENTER("Repl_semi_sync_slave::slave_read_sync_header");
+
+ if (rpl_semi_sync_slave_status)
+ {
+ if (DBUG_EVALUATE_IF("semislave_corrupt_log", 0, 1)
+ && (unsigned char)(header[0]) == k_packet_magic_num)
+ {
+ semi_sync_need_reply = (header[1] & k_packet_flag_sync);
+ *payload_len = total_len - 2;
+ *payload = header + 2;
+
+ DBUG_PRINT("semisync", ("%s: reply - %d",
+ "Repl_semi_sync_slave::slave_read_sync_header",
+ semi_sync_need_reply));
+
+ if (semi_sync_need_reply)
+ *semi_flags |= SEMI_SYNC_NEED_ACK;
+ if (is_delay_master())
+ *semi_flags |= SEMI_SYNC_SLAVE_DELAY_SYNC;
+ }
+ else
+ {
+ sql_print_error("Missing magic number for semi-sync packet, packet "
+ "len: %lu", total_len);
+ read_res = -1;
+ }
+ } else {
+ *payload= header;
+ *payload_len= total_len;
+ }
+
+ DBUG_RETURN(read_res);
+}
+
+int Repl_semi_sync_slave::slave_start(Master_info *mi)
+{
+ bool semi_sync= get_slave_enabled();
+
+ sql_print_information("Slave I/O thread: Start %s replication to\
+ master '%s@%s:%d' in log '%s' at position %lu",
+ semi_sync ? "semi-sync" : "asynchronous",
+ const_cast<char *>(mi->user), mi->host, mi->port,
+ const_cast<char *>(mi->master_log_name),
+ (unsigned long)(mi->master_log_pos));
+
+ if (semi_sync && !rpl_semi_sync_slave_status)
+ rpl_semi_sync_slave_status= 1;
+
+ /*clear the counter*/
+ rpl_semi_sync_slave_send_ack= 0;
+ return 0;
+}
+
+int Repl_semi_sync_slave::slave_stop(Master_info *mi)
+{
+ if (rpl_semi_sync_slave_status)
+ rpl_semi_sync_slave_status= 0;
+ if (get_slave_enabled())
+ kill_connection(mi->mysql);
+ return 0;
+}
+
+int Repl_semi_sync_slave::reset_slave(Master_info *mi)
+{
+ return 0;
+}
+
+void Repl_semi_sync_slave::kill_connection(MYSQL *mysql)
+{
+ if (!mysql)
+ return;
+
+ char kill_buffer[30];
+ MYSQL *kill_mysql = NULL;
+ kill_mysql = mysql_init(kill_mysql);
+ mysql_options(kill_mysql, MYSQL_OPT_CONNECT_TIMEOUT, &m_kill_conn_timeout);
+ mysql_options(kill_mysql, MYSQL_OPT_READ_TIMEOUT, &m_kill_conn_timeout);
+ mysql_options(kill_mysql, MYSQL_OPT_WRITE_TIMEOUT, &m_kill_conn_timeout);
+
+ bool ret= (!mysql_real_connect(kill_mysql, mysql->host,
+ mysql->user, mysql->passwd,0, mysql->port, mysql->unix_socket, 0));
+ if (DBUG_EVALUATE_IF("semisync_slave_failed_kill", 1, 0) || ret)
+ {
+ sql_print_information("cannot connect to master to kill slave io_thread's "
+ "connection");
+ if (!ret)
+ mysql_close(kill_mysql);
+ return;
+ }
+ uint kill_buffer_length = my_snprintf(kill_buffer, 30, "KILL %lu",
+ mysql->thread_id);
+ mysql_real_query(kill_mysql, kill_buffer, kill_buffer_length);
+ mysql_close(kill_mysql);
+}
+
+int Repl_semi_sync_slave::request_transmit(Master_info *mi)
+{
+ MYSQL *mysql= mi->mysql;
+ MYSQL_RES *res= 0;
+ MYSQL_ROW row;
+ const char *query;
+
+ if (!get_slave_enabled())
+ return 0;
+
+ query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'";
+ if (mysql_real_query(mysql, query, strlen(query)) ||
+ !(res= mysql_store_result(mysql)))
+ {
+ sql_print_error("Execution failed on master: %s, error :%s", query, mysql_error(mysql));
+ return 1;
+ }
+
+ row= mysql_fetch_row(res);
+ if (DBUG_EVALUATE_IF("master_not_support_semisync", 1, 0)
+ || !row)
+ {
+ /* Master does not support semi-sync */
+ sql_print_warning("Master server does not support semi-sync, "
+ "fallback to asynchronous replication");
+ rpl_semi_sync_slave_status= 0;
+ mysql_free_result(res);
+ return 0;
+ }
+ mysql_free_result(res);
+
+ /*
+ Tell master dump thread that we want to do semi-sync
+ replication
+ */
+ query= "SET @rpl_semi_sync_slave= 1";
+ if (mysql_real_query(mysql, query, strlen(query)))
+ {
+ sql_print_error("Set 'rpl_semi_sync_slave=1' on master failed");
+ return 1;
+ }
+ mysql_free_result(mysql_store_result(mysql));
+ rpl_semi_sync_slave_status= 1;
+
+ return 0;
+}
+
+int Repl_semi_sync_slave::slave_reply(Master_info *mi)
+{
+ MYSQL* mysql= mi->mysql;
+ const char *binlog_filename= const_cast<char *>(mi->master_log_name);
+ my_off_t binlog_filepos= mi->master_log_pos;
+
+ NET *net= &mysql->net;
+ uchar reply_buffer[REPLY_MAGIC_NUM_LEN
+ + REPLY_BINLOG_POS_LEN
+ + REPLY_BINLOG_NAME_LEN];
+ int reply_res = 0;
+ int name_len = strlen(binlog_filename);
+
+ DBUG_ENTER("Repl_semi_sync_slave::slave_reply");
+
+ if (rpl_semi_sync_slave_status && semi_sync_need_reply)
+ {
+ /* Prepare the buffer of the reply. */
+ reply_buffer[REPLY_MAGIC_NUM_OFFSET] = k_packet_magic_num;
+ int8store(reply_buffer + REPLY_BINLOG_POS_OFFSET, binlog_filepos);
+ memcpy(reply_buffer + REPLY_BINLOG_NAME_OFFSET,
+ binlog_filename,
+ name_len + 1 /* including trailing '\0' */);
+
+ DBUG_PRINT("semisync", ("%s: reply (%s, %lu)",
+ "Repl_semi_sync_slave::slave_reply",
+ binlog_filename, (ulong)binlog_filepos));
+
+ net_clear(net, 0);
+ /* Send the reply. */
+ reply_res = my_net_write(net, reply_buffer,
+ name_len + REPLY_BINLOG_NAME_OFFSET);
+ if (!reply_res)
+ {
+ reply_res = DBUG_EVALUATE_IF("semislave_failed_net_flush", 1, net_flush(net));
+ if (reply_res)
+ sql_print_error("Semi-sync slave net_flush() reply failed");
+ rpl_semi_sync_slave_send_ack++;
+ }
+ else
+ {
+ sql_print_error("Semi-sync slave send reply failed: %s (%d)",
+ net->last_error, net->last_errno);
+ }
+ }
+
+ DBUG_RETURN(reply_res);
+}
diff --git a/sql/semisync_slave.h b/sql/semisync_slave.h
new file mode 100644
index 00000000000..d65262f151d
--- /dev/null
+++ b/sql/semisync_slave.h
@@ -0,0 +1,115 @@
+/* Copyright (c) 2006 MySQL AB, 2009 Sun Microsystems, Inc.
+ Use is subject to license terms.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+
+#ifndef SEMISYNC_SLAVE_H
+#define SEMISYNC_SLAVE_H
+
+#include "semisync.h"
+#include "my_global.h"
+#include "sql_priv.h"
+#include "rpl_mi.h"
+#include "mysql.h"
+
+class Master_info;
+
+/**
+ The extension class for the slave of semi-synchronous replication
+*/
+class Repl_semi_sync_slave
+ :public Repl_semi_sync_base {
+public:
+ Repl_semi_sync_slave() :m_slave_enabled(false) {}
+ ~Repl_semi_sync_slave() {}
+
+ void set_trace_level(unsigned long trace_level) {
+ m_trace_level = trace_level;
+ }
+
+ /* Initialize this class after MySQL parameters are initialized. this
+ * function should be called once at bootstrap time.
+ */
+ int init_object();
+
+ bool get_slave_enabled() {
+ return m_slave_enabled;
+ }
+
+ void set_slave_enabled(bool enabled) {
+ m_slave_enabled = enabled;
+ }
+
+ bool is_delay_master(){
+ return m_delay_master;
+ }
+
+ void set_delay_master(bool enabled) {
+ m_delay_master = enabled;
+ }
+
+ void set_kill_conn_timeout(unsigned int timeout) {
+ m_kill_conn_timeout = timeout;
+ }
+
+ /* A slave reads the semi-sync packet header and separate the metadata
+ * from the payload data.
+ *
+ * Input:
+ * header - (IN) packet header pointer
+ * total_len - (IN) total packet length: metadata + payload
+ * semi_flags - (IN) store flags: SEMI_SYNC_SLAVE_DELAY_SYNC and
+ SEMI_SYNC_NEED_ACK
+ * payload - (IN) payload: the replication event
+ * payload_len - (IN) payload length
+ *
+ * Return:
+ * 0: success; non-zero: error
+ */
+ int slave_read_sync_header(const char *header, unsigned long total_len,
+ int *semi_flags,
+ const char **payload, unsigned long *payload_len);
+
+ /* A slave replies to the master indicating its replication process. It
+ * indicates that the slave has received all events before the specified
+ * binlog position.
+ */
+ int slave_reply(Master_info* mi);
+ int slave_start(Master_info *mi);
+ int slave_stop(Master_info *mi);
+ int request_transmit(Master_info*);
+ void kill_connection(MYSQL *mysql);
+ int reset_slave(Master_info *mi);
+
+private:
+ /* True when init_object has been called */
+ bool m_init_done;
+ bool m_slave_enabled; /* semi-sycn is enabled on the slave */
+ bool m_delay_master;
+ unsigned int m_kill_conn_timeout;
+};
+
+
+/* System and status variables for the slave component */
+extern my_bool rpl_semi_sync_slave_enabled;
+extern my_bool rpl_semi_sync_slave_status;
+extern ulong rpl_semi_sync_slave_trace_level;
+extern Repl_semi_sync_slave repl_semisync_slave;
+
+extern char rpl_semi_sync_slave_delay_master;
+extern unsigned int rpl_semi_sync_slave_kill_conn_timeout;
+extern unsigned long long rpl_semi_sync_slave_send_ack;
+
+#endif /* SEMISYNC_SLAVE_H */
diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt
index 730910ed111..2b3a4640008 100644
--- a/sql/share/errmsg-utf8.txt
+++ b/sql/share/errmsg-utf8.txt
@@ -6850,8 +6850,8 @@ ER_CANT_SET_GTID_NEXT_WHEN_OWNING_GTID
eng "GTID_NEXT cannot be changed by a client that owns a GTID. The client owns %s. Ownership is released on COMMIT or ROLLBACK"
ER_UNKNOWN_EXPLAIN_FORMAT
- eng "Unknown EXPLAIN format name: '%s'"
- rus "Неизвестное имя формата команды EXPLAIN: '%s'"
+ eng "Unknown %s format name: '%s'"
+ rus "Неизвестное имя формата команды %s: '%s'"
ER_CANT_EXECUTE_IN_READ_ONLY_TRANSACTION 25006
eng "Cannot execute statement in a READ ONLY transaction"
diff --git a/sql/slave.cc b/sql/slave.cc
index a57312998f1..f36af66f780 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -43,7 +43,6 @@
#include <ssl_compat.h>
#include <mysqld_error.h>
#include <mysys_err.h>
-#include "rpl_handler.h"
#include <signal.h>
#include <mysql.h>
#include <myisam.h>
@@ -61,6 +60,7 @@
#include "debug_sync.h"
#include "rpl_parallel.h"
#include "sql_show.h"
+#include "semisync_slave.h"
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
@@ -3586,11 +3586,9 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
if (opt_log_slave_updates && opt_replicate_annotate_row_events)
binlog_flags|= BINLOG_SEND_ANNOTATE_ROWS_EVENT;
- if (RUN_HOOK(binlog_relay_io,
- before_request_transmit,
- (thd, mi, binlog_flags)))
+ if (repl_semisync_slave.request_transmit(mi))
DBUG_RETURN(1);
-
+
// TODO if big log files: Change next to int8store()
int4store(buf, (ulong) mi->master_log_pos);
int2store(buf + 4, binlog_flags);
@@ -4615,7 +4613,8 @@ pthread_handler_t handle_slave_io(void *arg)
}
- if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)))
+ if (DBUG_EVALUATE_IF("failed_slave_start", 1, 0)
+ || repl_semisync_slave.slave_start(mi))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
ER_THD(thd, ER_SLAVE_FATAL_ERROR),
@@ -4805,9 +4804,10 @@ Stopping slave I/O thread due to out-of-memory error from master");
retry_count=0; // ok event, reset retry counter
THD_STAGE_INFO(thd, stage_queueing_master_event_to_the_relay_log);
event_buf= (const char*)mysql->net.read_pos + 1;
- if (RUN_HOOK(binlog_relay_io, after_read_event,
- (thd, mi,(const char*)mysql->net.read_pos + 1,
- event_len, &event_buf, &event_len)))
+ mi->semi_ack= 0;
+ if (repl_semisync_slave.
+ slave_read_sync_header((const char*)mysql->net.read_pos + 1, event_len,
+ &(mi->semi_ack), &event_buf, &event_len))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
ER_THD(thd, ER_SLAVE_FATAL_ERROR),
@@ -4856,9 +4856,6 @@ Stopping slave I/O thread due to out-of-memory error from master");
tokenamount -= network_read_len;
}
- /* XXX: 'synced' should be updated by queue_event to indicate
- whether event has been synced to disk */
- bool synced= 0;
if (queue_event(mi, event_buf, event_len))
{
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL,
@@ -4867,8 +4864,8 @@ Stopping slave I/O thread due to out-of-memory error from master");
goto err;
}
- if (RUN_HOOK(binlog_relay_io, after_queue_event,
- (thd, mi, event_buf, event_len, synced)))
+ if (rpl_semi_sync_slave_status && (mi->semi_ack & SEMI_SYNC_NEED_ACK) &&
+ repl_semisync_slave.slave_reply(mi))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
ER_THD(thd, ER_SLAVE_FATAL_ERROR),
@@ -4877,7 +4874,16 @@ Stopping slave I/O thread due to out-of-memory error from master");
}
if (mi->using_gtid == Master_info::USE_GTID_NO &&
- flush_master_info(mi, TRUE, TRUE))
+ /*
+ If rpl_semi_sync_slave_delay_master is enabled, we will flush
+ master info only when ack is needed. This may lead to at least one
+ group transaction delay but affords better performance improvement.
+ */
+ (!repl_semisync_slave.get_slave_enabled() ||
+ (!(mi->semi_ack & SEMI_SYNC_SLAVE_DELAY_SYNC) ||
+ (mi->semi_ack & (SEMI_SYNC_NEED_ACK)))) &&
+ (DBUG_EVALUATE_IF("failed_flush_master_info", 1, 0) ||
+ flush_master_info(mi, TRUE, TRUE)))
{
sql_print_error("Failed to flush master info file");
goto err;
@@ -4931,7 +4937,7 @@ err:
IO_RPL_LOG_NAME, mi->master_log_pos,
tmp.c_ptr_safe());
}
- RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi));
+ repl_semisync_slave.slave_stop(mi);
thd->reset_query();
thd->reset_db(NULL, 0);
if (mysql)
@@ -6253,7 +6259,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
mysql_mutex_unlock(log_lock);
goto err;
}
- rli->relay_log.signal_update();
+ rli->relay_log.signal_relay_log_update();
mysql_mutex_unlock(log_lock);
mi->gtid_reconnect_event_skip_count= 0;
@@ -6798,7 +6804,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
if (got_gtid_event)
rli->ign_gtids.update(&event_gtid);
}
- rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
+ // the slave SQL thread needs to re-check
+ rli->relay_log.signal_relay_log_update();
DBUG_PRINT("info", ("master_log_pos: %lu, event originating from %u server, ignored",
(ulong) mi->master_log_pos, uint4korr(buf + SERVER_ID_OFFSET)));
}
@@ -7308,7 +7315,7 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
MYSQL_BIN_LOG::open() will write the buffered description event.
*/
old_pos= rli->event_relay_log_pos;
- if ((ev= Log_event::read_log_event(cur_log,0,
+ if ((ev= Log_event::read_log_event(cur_log,
rli->relay_log.description_event_for_exec,
opt_slave_sql_verify_checksum)))
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 8a00b7832c3..00144a9a12d 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -705,7 +705,7 @@ extern "C" void thd_kill_timeout(THD* thd)
thd->awake(KILL_TIMEOUT);
}
-THD::THD(my_thread_id id, bool is_wsrep_applier)
+THD::THD(my_thread_id id, bool is_wsrep_applier, bool skip_global_sys_var_lock)
:Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION,
/* statement id */ 0),
rli_fake(0), rgi_fake(0), rgi_slave(NULL),
@@ -892,7 +892,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
/* Call to init() below requires fully initialized Open_tables_state. */
reset_open_tables_state(this);
- init();
+ init(skip_global_sys_var_lock);
#if defined(ENABLED_PROFILING)
profiling.set_thd(this);
#endif
@@ -1263,10 +1263,11 @@ const Type_handler *THD::type_handler_for_date() const
Init common variables that has to be reset on start and on change_user
*/
-void THD::init(void)
+void THD::init(bool skip_lock)
{
DBUG_ENTER("thd::init");
- mysql_mutex_lock(&LOCK_global_system_variables);
+ if (!skip_lock)
+ mysql_mutex_lock(&LOCK_global_system_variables);
plugin_thdvar_init(this);
/*
plugin_thd_var_init() sets variables= global_system_variables, which
@@ -1279,8 +1280,8 @@ void THD::init(void)
::strmake(default_master_connection_buff,
global_system_variables.default_master_connection.str,
variables.default_master_connection.length);
-
- mysql_mutex_unlock(&LOCK_global_system_variables);
+ if (!skip_lock)
+ mysql_mutex_unlock(&LOCK_global_system_variables);
user_time.val= start_time= start_time_sec_part= 0;
@@ -4193,7 +4194,8 @@ my_bool thd_net_is_killed()
void thd_increment_bytes_received(void *thd, ulong length)
{
- ((THD*) thd)->status_var.bytes_received+= length;
+ if (thd != NULL) // MDEV-13073 Ack collector having NULL
+ ((THD*) thd)->status_var.bytes_received+= length;
}
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 41b8efc4464..b56729737de 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -1599,7 +1599,8 @@ enum enum_thread_type
SYSTEM_THREAD_EVENT_WORKER= 16,
SYSTEM_THREAD_BINLOG_BACKGROUND= 32,
SYSTEM_THREAD_SLAVE_BACKGROUND= 64,
- SYSTEM_THREAD_GENERIC= 128
+ SYSTEM_THREAD_GENERIC= 128,
+ SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND= 256
};
inline char const *
@@ -1615,6 +1616,7 @@ show_system_thread(enum_thread_type thread)
RETURN_NAME_AS_STRING(SYSTEM_THREAD_EVENT_SCHEDULER);
RETURN_NAME_AS_STRING(SYSTEM_THREAD_EVENT_WORKER);
RETURN_NAME_AS_STRING(SYSTEM_THREAD_SLAVE_BACKGROUND);
+ RETURN_NAME_AS_STRING(SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND);
default:
sprintf(buf, "<UNKNOWN SYSTEM THREAD: %d>", thread);
return buf;
@@ -2291,7 +2293,8 @@ public:
/* Needed by MariaDB semi sync replication */
Trans_binlog_info *semisync_info;
-
+ /* If this is a semisync slave connection. */
+ bool semi_sync_slave;
ulonglong client_capabilities; /* What the client supports */
ulong max_client_packet_length;
@@ -3177,11 +3180,20 @@ public:
/* Debug Sync facility. See debug_sync.cc. */
struct st_debug_sync_control *debug_sync_control;
#endif /* defined(ENABLED_DEBUG_SYNC) */
- THD(my_thread_id id, bool is_wsrep_applier= false);
+ /**
+ @param id thread identifier
+ @param is_wsrep_applier thread type
+ @param skip_lock instruct whether @c LOCK_global_system_variables
+ is already locked, to not acquire it then.
+ */
+ THD(my_thread_id id, bool is_wsrep_applier= false, bool skip_lock= false);
~THD();
-
- void init(void);
+ /**
+ @param skip_lock instruct whether @c LOCK_global_system_variables
+ is already locked, to not acquire it then.
+ */
+ void init(bool skip_lock= false);
/*
Initialize memory roots necessary for query processing and (!)
pre-allocate memory for it. We can't do that in THD constructor because
diff --git a/sql/sql_lex.h b/sql/sql_lex.h
index 221d3d76f69..79c0f377a20 100644
--- a/sql/sql_lex.h
+++ b/sql/sql_lex.h
@@ -746,7 +746,7 @@ public:
/* thread handler */
THD *thd;
/*
- SELECT_LEX for hidden SELECT in onion which process global
+ SELECT_LEX for hidden SELECT in union which process global
ORDER BY and LIMIT
*/
st_select_lex *fake_select_lex;
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 01b77f49b7d..4afecb67ec9 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -82,7 +82,6 @@
#include <m_ctype.h>
#include <myisam.h>
#include <my_dir.h>
-#include "rpl_handler.h"
#include "rpl_mi.h"
#include "sql_digest.h"
@@ -2399,11 +2398,12 @@ com_multi_end:
THD_STAGE_INFO(thd, stage_cleaning_up);
thd->reset_query();
- thd->set_examined_row_count(0); // For processlist
- thd->set_command(COM_SLEEP);
/* Performance Schema Interface instrumentation, end */
MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
+ thd->set_examined_row_count(0); // For processlist
+ thd->set_command(COM_SLEEP);
+
thd->m_statement_psi= NULL;
thd->m_digest= NULL;
@@ -5114,6 +5114,9 @@ end_with_restore_list:
{
List<set_var_base> *lex_var_list= &lex->var_list;
+ if (check_dependencies_in_with_clauses(thd->lex->with_clauses_list))
+ goto error;
+
if ((check_table_access(thd, SELECT_ACL, all_tables, FALSE, UINT_MAX, FALSE)
|| open_and_lock_tables(thd, all_tables, TRUE, 0)))
goto error;
diff --git a/sql/sql_partition.cc b/sql/sql_partition.cc
index 2dc8ad498ca..879a29f4e42 100644
--- a/sql/sql_partition.cc
+++ b/sql/sql_partition.cc
@@ -6749,10 +6749,7 @@ uint fast_alter_partition_table(THD *thd, TABLE *table,
lpt->part_info= part_info;
lpt->alter_info= alter_info;
lpt->create_info= create_info;
- lpt->db_options= create_info->table_options;
- if (create_info->row_type != ROW_TYPE_FIXED &&
- create_info->row_type != ROW_TYPE_DEFAULT)
- lpt->db_options|= HA_OPTION_PACK_RECORD;
+ lpt->db_options= create_info->table_options_with_row_type();
lpt->table= table;
lpt->key_info_buffer= 0;
lpt->key_count= 0;
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 08e9dcf3fe6..1d6aa0aaab1 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -28,9 +28,9 @@
#include "log_event.h"
#include "rpl_filter.h"
#include <my_dir.h>
-#include "rpl_handler.h"
#include "debug_sync.h"
-#include "log.h" // get_gtid_list_event
+#include "semisync_master.h"
+#include "semisync_slave.h"
enum enum_gtid_until_state {
GTID_UNTIL_NOT_DONE,
@@ -160,6 +160,7 @@ struct binlog_send_info {
bool clear_initial_log_pos;
bool should_stop;
+ size_t dirlen;
binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg,
char *lfn)
@@ -313,16 +314,43 @@ static int reset_transmit_packet(binlog_send_info *info, ushort flags,
packet->length(0);
packet->set("\0", 1, &my_charset_bin);
- if (RUN_HOOK(binlog_transmit, reserve_header, (info->thd, flags, packet)))
+ if (info->thd->semi_sync_slave)
{
- info->error= ER_UNKNOWN_ERROR;
- *errmsg= "Failed to run hook 'reserve_header'";
- ret= 1;
+ if (repl_semisync_master.reserve_sync_header(packet))
+ {
+ info->error= ER_UNKNOWN_ERROR;
+ *errmsg= "Failed to run hook 'reserve_header'";
+ ret= 1;
+ }
}
+
*ev_offset= packet->length();
return ret;
}
+int get_user_var_int(const char *name,
+ long long int *value, int *null_value)
+{
+ bool null_val;
+ user_var_entry *entry=
+ (user_var_entry*) my_hash_search(&current_thd->user_vars,
+ (uchar*) name, strlen(name));
+ if (!entry)
+ return 1;
+ *value= entry->val_int(&null_val);
+ if (null_value)
+ *null_value= null_val;
+ return 0;
+}
+
+inline bool is_semi_sync_slave()
+{
+ int null_value;
+ long long val= 0;
+ get_user_var_int("rpl_semi_sync_slave", &val, &null_value);
+ return val;
+}
+
static int send_file(THD *thd)
{
NET* net = &thd->net;
@@ -1606,6 +1634,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
enum enum_binlog_checksum_alg current_checksum_alg= info->current_checksum_alg;
slave_connection_state *gtid_state= &info->gtid_state;
slave_connection_state *until_gtid_state= info->until_gtid_state;
+ bool need_sync= false;
if (event_type == GTID_LIST_EVENT &&
info->using_gtid_state && until_gtid_state)
@@ -1916,8 +1945,10 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
THD_STAGE_INFO(info->thd, stage_sending_binlog_event_to_slave);
pos= my_b_tell(log);
- if (RUN_HOOK(binlog_transmit, before_send_event,
- (info->thd, info->flags, packet, info->log_file_name, pos)))
+ if (repl_semisync_master.update_sync_header(info->thd,
+ (uchar*) packet->c_ptr(),
+ info->log_file_name + info->dirlen,
+ pos, &need_sync))
{
info->error= ER_UNKNOWN_ERROR;
return "run 'before_send_event' hook failed";
@@ -1939,8 +1970,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
}
}
- if (RUN_HOOK(binlog_transmit, after_send_event,
- (info->thd, info->flags, packet)))
+ if (need_sync && repl_semisync_master.flush_net(info->thd, packet->c_ptr()))
{
info->error= ER_UNKNOWN_ERROR;
return "Failed to run hook 'after_send_event'";
@@ -2370,7 +2400,7 @@ static int wait_new_events(binlog_send_info *info, /* in */
PSI_stage_info old_stage;
mysql_bin_log.lock_binlog_end_pos();
- info->thd->ENTER_COND(mysql_bin_log.get_log_cond(),
+ info->thd->ENTER_COND(mysql_bin_log.get_bin_log_cond(),
mysql_bin_log.get_binlog_end_pos_lock(),
&stage_master_has_sent_all_binlog_to_slave,
&old_stage);
@@ -2681,7 +2711,7 @@ static int send_one_binlog_file(binlog_send_info *info,
/** end of file or error */
return (int)end_pos;
}
-
+ info->dirlen= dirname_length(info->log_file_name);
/**
* send events from current position up to end_pos
*/
@@ -2703,6 +2733,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
binlog_send_info infoobj(thd, packet, flags, linfo.log_file_name);
binlog_send_info *info= &infoobj;
+ bool has_transmit_started= false;
int old_max_allowed_packet= thd->variables.max_allowed_packet;
thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET;
@@ -2715,11 +2746,11 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
if (init_binlog_sender(info, &linfo, log_ident, &pos))
goto err;
- /*
- run hook first when all check has been made that slave seems to
- be requesting a reasonable position. i.e when transmit actually starts
- */
- if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
+ has_transmit_started= true;
+
+ /* Check if the dump thread is created by a slave with semisync enabled. */
+ thd->semi_sync_slave = is_semi_sync_slave();
+ if (repl_semisync_master.dump_start(thd, log_ident, pos))
{
info->errmsg= "Failed to run hook 'transmit_start'";
info->error= ER_UNKNOWN_ERROR;
@@ -2841,7 +2872,10 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
err:
THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination);
- RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
+ if (has_transmit_started)
+ {
+ repl_semisync_master.dump_end(thd);
+ }
if (info->thd->killed == KILL_SLAVE_SAME_ID)
{
@@ -3307,7 +3341,8 @@ int reset_slave(THD *thd, Master_info* mi)
else if (global_system_variables.log_warnings > 1)
sql_print_information("Deleted Master_info file '%s'.", fname);
- RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
+ if (rpl_semi_sync_slave_enabled)
+ repl_semisync_slave.reset_slave(mi);
err:
mi->unlock_slave_threads();
if (error)
@@ -3809,11 +3844,13 @@ int reset_master(THD* thd, rpl_gtid *init_state, uint32 init_state_len,
return 1;
}
- if (mysql_bin_log.reset_logs(thd, 1, init_state, init_state_len,
- next_log_number))
- return 1;
- RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));
- return 0;
+ bool ret= 0;
+ /* Temporarily disable master semisync before reseting master. */
+ repl_semisync_master.before_reset_master();
+ ret= mysql_bin_log.reset_logs(thd, 1, init_state, init_state_len,
+ next_log_number);
+ repl_semisync_master.after_reset_master();
+ return ret;
}
@@ -3930,7 +3967,7 @@ bool mysql_show_binlog_events(THD* thd)
my_off_t scan_pos = BIN_LOG_HEADER_SIZE;
while (scan_pos < pos)
{
- ev= Log_event::read_log_event(&log, (mysql_mutex_t*)0, description_event,
+ ev= Log_event::read_log_event(&log, description_event,
opt_master_verify_checksum);
scan_pos = my_b_tell(&log);
if (ev == NULL || !ev->is_valid())
@@ -3964,7 +4001,7 @@ bool mysql_show_binlog_events(THD* thd)
my_b_seek(&log, pos);
for (event_count = 0;
- (ev = Log_event::read_log_event(&log, (mysql_mutex_t*) 0,
+ (ev = Log_event::read_log_event(&log,
description_event,
opt_master_verify_checksum)); )
{
diff --git a/sql/sql_table.cc b/sql/sql_table.cc
index ebde0b6f726..dafe838bc99 100644
--- a/sql/sql_table.cc
+++ b/sql/sql_table.cc
@@ -4509,10 +4509,7 @@ handler *mysql_create_frm_image(THD *thd,
set_table_default_charset(thd, create_info, (char*) db);
- db_options= create_info->table_options;
- if (create_info->row_type == ROW_TYPE_DYNAMIC ||
- create_info->row_type == ROW_TYPE_PAGE)
- db_options|= HA_OPTION_PACK_RECORD;
+ db_options= create_info->table_options_with_row_type();
if (!(file= get_new_handler((TABLE_SHARE*) 0, thd->mem_root,
create_info->db_type)))
diff --git a/sql/sql_view.cc b/sql/sql_view.cc
index 21ceb6ce19b..50ea5581cc9 100644
--- a/sql/sql_view.cc
+++ b/sql/sql_view.cc
@@ -430,14 +430,14 @@ bool mysql_create_view(THD *thd, TABLE_LIST *views,
lex->link_first_table_back(view, link_to_local);
view->open_type= OT_BASE_ONLY;
- WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL)
-
if (check_dependencies_in_with_clauses(lex->with_clauses_list))
{
res= TRUE;
goto err;
}
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL)
+
/*
ignore lock specs for CREATE statement
*/
diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy
index c8d54b967f0..63514d7257d 100644
--- a/sql/sql_yacc.yy
+++ b/sql/sql_yacc.yy
@@ -1726,7 +1726,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize);
opt_default_time_precision
case_stmt_body opt_bin_mod opt_for_system_time_clause
opt_if_exists_table_element opt_if_not_exists_table_element
- opt_recursive
+ opt_recursive opt_format_xid
%type <object_ddl_options>
create_or_replace
@@ -17497,9 +17497,26 @@ xa:
{
Lex->sql_command = SQLCOM_XA_ROLLBACK;
}
- | XA_SYM RECOVER_SYM
+ | XA_SYM RECOVER_SYM opt_format_xid
{
Lex->sql_command = SQLCOM_XA_RECOVER;
+ Lex->verbose= $3;
+ }
+ ;
+
+opt_format_xid:
+ /* empty */ { $$= false; }
+ | FORMAT_SYM '=' ident_or_text
+ {
+ if (!my_strcasecmp(system_charset_info, $3.str, "SQL"))
+ $$= true;
+ else if (!my_strcasecmp(system_charset_info, $3.str, "RAW"))
+ $$= false;
+ else
+ {
+ my_yyabort_error((ER_UNKNOWN_EXPLAIN_FORMAT, MYF(0), "XA RECOVER", $3.str));
+ $$= false;
+ }
}
;
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 9b534fabb5b..4cb1edc3d31 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -35,6 +35,7 @@
#include "sql_priv.h"
#include "sql_class.h" // set_var.h: THD
#include "sys_vars.ic"
+#include "my_sys.h"
#include "events.h"
#include <thr_alarm.h>
@@ -61,6 +62,8 @@
#include "sql_repl.h"
#include "opt_range.h"
#include "rpl_parallel.h"
+#include "semisync_master.h"
+#include "semisync_slave.h"
#include <ssl_compat.h>
/*
@@ -3093,8 +3096,174 @@ static Sys_var_replicate_events_marked_for_skip Replicate_events_marked_for_skip
"the slave).",
GLOBAL_VAR(opt_replicate_events_marked_for_skip), CMD_LINE(REQUIRED_ARG),
replicate_events_marked_for_skip_names, DEFAULT(RPL_SKIP_REPLICATE));
-#endif
+/* new options for semisync */
+
+static bool fix_rpl_semi_sync_master_enabled(sys_var *self, THD *thd,
+ enum_var_type type)
+{
+ if (rpl_semi_sync_master_enabled)
+ {
+ if (repl_semisync_master.enable_master() != 0)
+ rpl_semi_sync_master_enabled= false;
+ else if (ack_receiver.start())
+ {
+ repl_semisync_master.disable_master();
+ rpl_semi_sync_master_enabled= false;
+ }
+ }
+ else
+ {
+ if (repl_semisync_master.disable_master() != 0)
+ rpl_semi_sync_master_enabled= true;
+ if (!rpl_semi_sync_master_enabled)
+ ack_receiver.stop();
+ }
+ return false;
+}
+
+static bool fix_rpl_semi_sync_master_timeout(sys_var *self, THD *thd,
+ enum_var_type type)
+{
+ repl_semisync_master.set_wait_timeout(rpl_semi_sync_master_timeout);
+ return false;
+}
+
+static bool fix_rpl_semi_sync_master_trace_level(sys_var *self, THD *thd,
+ enum_var_type type)
+{
+ repl_semisync_master.set_trace_level(rpl_semi_sync_master_trace_level);
+ ack_receiver.set_trace_level(rpl_semi_sync_master_trace_level);
+ return false;
+}
+
+static bool fix_rpl_semi_sync_master_wait_point(sys_var *self, THD *thd,
+ enum_var_type type)
+{
+ repl_semisync_master.set_wait_point(rpl_semi_sync_master_wait_point);
+ return false;
+}
+
+static bool fix_rpl_semi_sync_master_wait_no_slave(sys_var *self, THD *thd,
+ enum_var_type type)
+{
+ repl_semisync_master.check_and_switch();
+ return false;
+}
+
+static Sys_var_mybool Sys_semisync_master_enabled(
+ "rpl_semi_sync_master_enabled",
+ "Enable semi-synchronous replication master (disabled by default).",
+ GLOBAL_VAR(rpl_semi_sync_master_enabled),
+ CMD_LINE(OPT_ARG), DEFAULT(FALSE),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
+ ON_UPDATE(fix_rpl_semi_sync_master_enabled));
+
+static Sys_var_ulong Sys_semisync_master_timeout(
+ "rpl_semi_sync_master_timeout",
+ "The timeout value (in ms) for semi-synchronous replication in the "
+ "master",
+ GLOBAL_VAR(rpl_semi_sync_master_timeout),
+ CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(0,~0L),DEFAULT(10000),BLOCK_SIZE(1),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
+ ON_UPDATE(fix_rpl_semi_sync_master_timeout));
+
+static Sys_var_mybool Sys_semisync_master_wait_no_slave(
+ "rpl_semi_sync_master_wait_no_slave",
+ "Wait until timeout when no semi-synchronous replication slave "
+ "available (enabled by default).",
+ GLOBAL_VAR(rpl_semi_sync_master_wait_no_slave),
+ CMD_LINE(OPT_ARG), DEFAULT(TRUE),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
+ ON_UPDATE(fix_rpl_semi_sync_master_wait_no_slave));
+
+static Sys_var_ulong Sys_semisync_master_trace_level(
+ "rpl_semi_sync_master_trace_level",
+ "The tracing level for semi-sync replication.",
+ GLOBAL_VAR(rpl_semi_sync_master_trace_level),
+ CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(0,~0L),DEFAULT(32),BLOCK_SIZE(1),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
+ ON_UPDATE(fix_rpl_semi_sync_master_trace_level));
+
+static const char *repl_semisync_wait_point[]=
+{"AFTER_SYNC", "AFTER_COMMIT", NullS};
+
+static Sys_var_enum Sys_semisync_master_wait_point(
+ "rpl_semi_sync_master_wait_point",
+ "Should transaction wait for semi-sync ack after having synced binlog, "
+ "or after having committed in storage engine.",
+ GLOBAL_VAR(rpl_semi_sync_master_wait_point), CMD_LINE(REQUIRED_ARG),
+ repl_semisync_wait_point, DEFAULT(1),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG,ON_CHECK(0),
+ ON_UPDATE(fix_rpl_semi_sync_master_wait_point));
+
+static bool fix_rpl_semi_sync_slave_enabled(sys_var *self, THD *thd,
+ enum_var_type type)
+{
+ repl_semisync_slave.set_slave_enabled(rpl_semi_sync_slave_enabled != 0);
+ return false;
+}
+
+static bool fix_rpl_semi_sync_slave_trace_level(sys_var *self, THD *thd,
+ enum_var_type type)
+{
+ repl_semisync_slave.set_trace_level(rpl_semi_sync_slave_trace_level);
+ return false;
+}
+
+static bool fix_rpl_semi_sync_slave_delay_master(sys_var *self, THD *thd,
+ enum_var_type type)
+{
+ repl_semisync_slave.set_delay_master(rpl_semi_sync_slave_delay_master);
+ return false;
+}
+
+static bool fix_rpl_semi_sync_slave_kill_conn_timeout(sys_var *self, THD *thd,
+ enum_var_type type)
+{
+ repl_semisync_slave.
+ set_kill_conn_timeout(rpl_semi_sync_slave_kill_conn_timeout);
+ return false;
+}
+
+static Sys_var_mybool Sys_semisync_slave_enabled(
+ "rpl_semi_sync_slave_enabled",
+ "Enable semi-synchronous replication slave (disabled by default).",
+ GLOBAL_VAR(rpl_semi_sync_slave_enabled),
+ CMD_LINE(OPT_ARG), DEFAULT(FALSE),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
+ ON_UPDATE(fix_rpl_semi_sync_slave_enabled));
+
+static Sys_var_ulong Sys_semisync_slave_trace_level(
+ "rpl_semi_sync_slave_trace_level",
+ "The tracing level for semi-sync replication.",
+ GLOBAL_VAR(rpl_semi_sync_slave_trace_level),
+ CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(0,~0L),DEFAULT(32),BLOCK_SIZE(1),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
+ ON_UPDATE(fix_rpl_semi_sync_slave_trace_level));
+
+static Sys_var_mybool Sys_semisync_slave_delay_master(
+ "rpl_semi_sync_slave_delay_master",
+ "Only write master info file when ack is needed.",
+ GLOBAL_VAR(rpl_semi_sync_slave_delay_master),
+ CMD_LINE(OPT_ARG), DEFAULT(FALSE),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
+ ON_UPDATE(fix_rpl_semi_sync_slave_delay_master));
+
+static Sys_var_uint Sys_semisync_slave_kill_conn_timeout(
+ "rpl_semi_sync_slave_kill_conn_timeout",
+ "Timeout for the mysql connection used to kill the slave io_thread's "
+ "connection on master. This timeout comes into play when stop slave "
+ "is executed.",
+ GLOBAL_VAR(rpl_semi_sync_slave_kill_conn_timeout),
+ CMD_LINE(OPT_ARG),
+ VALID_RANGE(0, UINT_MAX), DEFAULT(5), BLOCK_SIZE(1),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
+ ON_UPDATE(fix_rpl_semi_sync_slave_kill_conn_timeout));
+#endif /* HAVE_REPLICATION */
static Sys_var_ulong Sys_slow_launch_time(
"slow_launch_time",
@@ -3625,27 +3794,6 @@ static Sys_var_charptr Sys_version_source_revision(
CMD_LINE_HELP_ONLY,
IN_SYSTEM_CHARSET, DEFAULT(SOURCE_REVISION));
-static char *guess_malloc_library()
-{
- if (strcmp(MALLOC_LIBRARY, "system") == 0)
- {
-#ifdef HAVE_DLOPEN
- typedef int (*mallctl_type)(const char*, void*, size_t*, void*, size_t);
- mallctl_type mallctl_func;
- mallctl_func= (mallctl_type)dlsym(RTLD_DEFAULT, "mallctl");
- if (mallctl_func)
- {
- static char buf[128];
- char *ver;
- size_t len = sizeof(ver);
- mallctl_func("version", &ver, &len, NULL, 0);
- strxnmov(buf, sizeof(buf)-1, "jemalloc ", ver, NULL);
- return buf;
- }
-#endif
- }
- return const_cast<char*>(MALLOC_LIBRARY);
-}
static char *malloc_library;
static Sys_var_charptr Sys_malloc_library(
"version_malloc_library", "Version of the used malloc library",
diff --git a/sql/transaction.cc b/sql/transaction.cc
index cbd875e3114..ec277e9c9c4 100644
--- a/sql/transaction.cc
+++ b/sql/transaction.cc
@@ -21,10 +21,9 @@
#include "mariadb.h"
#include "sql_priv.h"
#include "transaction.h"
-#include "rpl_handler.h"
#include "debug_sync.h" // DEBUG_SYNC
#include "sql_acl.h"
-
+#include "semisync_master.h"
#ifndef EMBEDDED_LIBRARY
/**
@@ -318,9 +317,17 @@ bool trans_commit(THD *thd)
transaction, so the hooks for rollback will be called.
*/
if (res)
- (void) RUN_HOOK(transaction, after_rollback, (thd, FALSE));
+ {
+#ifdef HAVE_REPLICATION
+ repl_semisync_master.wait_after_rollback(thd, FALSE);
+#endif
+ }
else
- (void) RUN_HOOK(transaction, after_commit, (thd, FALSE));
+ {
+#ifdef HAVE_REPLICATION
+ repl_semisync_master.wait_after_commit(thd, FALSE);
+#endif
+ }
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
thd->transaction.all.reset();
thd->lex->start_transaction_opt= 0;
@@ -413,7 +420,9 @@ bool trans_rollback(THD *thd)
~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
res= ha_rollback_trans(thd, TRUE);
- (void) RUN_HOOK(transaction, after_rollback, (thd, FALSE));
+#ifdef HAVE_REPLICATION
+ repl_semisync_master.wait_after_rollback(thd, FALSE);
+#endif
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
/* Reset the binlog transaction marker */
thd->variables.option_bits&= ~OPTION_GTID_BEGIN;
@@ -526,9 +535,17 @@ bool trans_commit_stmt(THD *thd)
transaction, so the hooks for rollback will be called.
*/
if (res)
- (void) RUN_HOOK(transaction, after_rollback, (thd, FALSE));
+ {
+#ifdef HAVE_REPLICATION
+ repl_semisync_master.wait_after_rollback(thd, FALSE);
+#endif
+ }
else
- (void) RUN_HOOK(transaction, after_commit, (thd, FALSE));
+ {
+#ifdef HAVE_REPLICATION
+ repl_semisync_master.wait_after_commit(thd, FALSE);
+#endif
+ }
thd->transaction.stmt.reset();
@@ -567,7 +584,9 @@ bool trans_rollback_stmt(THD *thd)
trans_reset_one_shot_chistics(thd);
}
- (void) RUN_HOOK(transaction, after_rollback, (thd, FALSE));
+#ifdef HAVE_REPLICATION
+ repl_semisync_master.wait_after_rollback(thd, FALSE);
+#endif
thd->transaction.stmt.reset();