diff options
author | Aleksey Midenkov <midenok@gmail.com> | 2017-12-21 11:16:42 +0300 |
---|---|---|
committer | Aleksey Midenkov <midenok@gmail.com> | 2017-12-21 11:16:42 +0300 |
commit | 5c0a19c873382c9fec696f827e6766f61c6682af (patch) | |
tree | c39d60a5557669b779ad72aac6c10abef3ed0eba /sql | |
parent | 5c760d952b8ae8a8722b206da3de0ebbad4978e5 (diff) | |
parent | 9ec2479778269fb33194c088216119d4f1dca58d (diff) | |
download | mariadb-git-5c0a19c873382c9fec696f827e6766f61c6682af.tar.gz |
System Versioning 1.0 pre7
Merge branch '10.3' into trunk
Diffstat (limited to 'sql')
39 files changed, 3707 insertions, 1038 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index 0697b53e414..c362ba4a8ee 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -36,7 +36,7 @@ ELSE() ENDIF() INCLUDE_DIRECTORIES( -${CMAKE_SOURCE_DIR}/include +${CMAKE_SOURCE_DIR}/include ${CMAKE_SOURCE_DIR}/sql ${PCRE_INCLUDES} ${ZLIB_INCLUDE_DIR} @@ -122,7 +122,7 @@ SET (SQL_SOURCE rpl_rli.cc rpl_mi.cc sql_servers.cc sql_audit.cc sql_connect.cc scheduler.cc sql_partition_admin.cc sql_profile.cc event_parse_data.cc sql_alter.cc - sql_signal.cc rpl_handler.cc mdl.cc sql_admin.cc + sql_signal.cc mdl.cc sql_admin.cc transaction.cc sys_vars.cc sql_truncate.cc datadict.cc sql_reload.cc item_inetfunc.cc @@ -138,6 +138,8 @@ SET (SQL_SOURCE my_apc.cc mf_iocache_encr.cc item_jsonfunc.cc my_json_writer.cc rpl_gtid.cc rpl_parallel.cc + semisync.cc semisync_master.cc semisync_slave.cc + semisync_master_ack_receiver.cc sql_type.cc item_windowfunc.cc sql_window.cc sql_cte.cc diff --git a/sql/events.cc b/sql/events.cc index 3ad546217a7..36f03e26125 100644 --- a/sql/events.cc +++ b/sql/events.cc @@ -421,7 +421,7 @@ Events::create_event(THD *thd, Event_parse_data *parse_data) DBUG_RETURN(ret); #ifdef WITH_WSREP error: - DBUG_RETURN(TRUE); + DBUG_RETURN(true); #endif /* WITH_WSREP */ } diff --git a/sql/field.cc b/sql/field.cc index 21f869d8a07..8cc9e7d1223 100644 --- a/sql/field.cc +++ b/sql/field.cc @@ -4741,6 +4741,15 @@ double Field_double::val_real(void) } +longlong Field_double::val_int_from_real(bool want_unsigned_result) +{ + Converter_double_to_longlong conv(val_real(), want_unsigned_result); + if (!want_unsigned_result && conv.error()) + conv.push_warning(get_thd(), Field_double::val_real(), false); + return conv.result(); +} + + my_decimal *Field_real::val_decimal(my_decimal *decimal_value) { ASSERT_COLUMN_MARKED_FOR_READ; diff --git a/sql/field.h b/sql/field.h index 42a3b9e58bd..1e72fd16e6a 100644 --- a/sql/field.h +++ b/sql/field.h @@ -850,6 +850,10 @@ public: } virtual double val_real(void)=0; virtual longlong val_int(void)=0; + virtual ulonglong val_uint(void) + { + return (ulonglong) val_int(); + } virtual bool val_bool(void)= 0; virtual my_decimal *val_decimal(my_decimal *); inline String *val_str(String *str) { return val_str(str, str); } @@ -2278,6 +2282,7 @@ private: class Field_double :public Field_real { + longlong val_int_from_real(bool want_unsigned_result); public: Field_double(uchar *ptr_arg, uint32 len_arg, uchar *null_ptr_arg, uchar null_bit_arg, @@ -2315,13 +2320,8 @@ public: int store(longlong nr, bool unsigned_val); int reset(void) { bzero(ptr,sizeof(double)); return 0; } double val_real(void); - longlong val_int(void) - { - Converter_double_to_longlong conv(Field_double::val_real(), false); - if (conv.error()) - conv.push_warning(get_thd(), Field_double::val_real(), false); - return conv.result(); - } + longlong val_int(void) { return val_int_from_real(false); } + ulonglong val_uint(void) { return (ulonglong) val_int_from_real(true); } String *val_str(String*,String *); bool send_binary(Protocol *protocol); int cmp(const uchar *,const uchar *); diff --git a/sql/handler.cc b/sql/handler.cc index 5b8aaa99c5a..00c38f0dd74 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -23,7 +23,7 @@ #include "mariadb.h" #include "sql_priv.h" #include "unireg.h" -#include "rpl_handler.h" +#include "rpl_rli.h" #include "sql_cache.h" // query_cache, query_cache_* #include "sql_connect.h" // global_table_stats #include "key.h" // key_copy, key_unpack, key_cmp_if_same, key_cmp @@ -50,6 +50,7 @@ #ifdef WITH_ARIA_STORAGE_ENGINE #include "../storage/maria/ha_maria.h" #endif +#include "semisync_master.h" #include "wsrep_mysqld.h" #include "wsrep.h" @@ -1518,7 +1519,10 @@ done: mysql_mutex_assert_not_owner(mysql_bin_log.get_log_lock()); mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync); mysql_mutex_assert_not_owner(&LOCK_commit_ordered); - RUN_HOOK(transaction, after_commit, (thd, FALSE)); +#ifdef HAVE_REPLICATION + repl_semisync_master.wait_after_commit(thd, all); + DEBUG_SYNC(thd, "after_group_after_commit"); +#endif goto end; /* Come here if error and we need to rollback. */ @@ -1763,7 +1767,9 @@ int ha_rollback_trans(THD *thd, bool all) push_warning(thd, Sql_condition::WARN_LEVEL_WARN, ER_WARNING_NOT_COMPLETE_ROLLBACK, ER_THD(thd, ER_WARNING_NOT_COMPLETE_ROLLBACK)); - (void) RUN_HOOK(transaction, after_rollback, (thd, FALSE)); +#ifdef HAVE_REPLICATION + repl_semisync_master.wait_after_rollback(thd, all); +#endif DBUG_RETURN(error); } @@ -2021,6 +2027,97 @@ int ha_recover(HASH *commit_list) } /** + return the XID as it appears in the SQL function's arguments. + So this string can be passed to XA START, XA PREPARE etc... + + @note + the 'buf' has to have space for at least SQL_XIDSIZE bytes. +*/ + + +/* + 'a'..'z' 'A'..'Z', '0'..'9' + and '-' '_' ' ' symbols don't have to be + converted. +*/ + +static const char xid_needs_conv[128]= +{ + 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, + 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, + 0,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1, + 0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1, + 1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, + 0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,0, + 1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, + 0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1 +}; + +uint get_sql_xid(XID *xid, char *buf) +{ + int tot_len= xid->gtrid_length + xid->bqual_length; + int i; + const char *orig_buf= buf; + + for (i=0; i<tot_len; i++) + { + uchar c= ((uchar *) xid->data)[i]; + if (c >= 128 || xid_needs_conv[c]) + break; + } + + if (i >= tot_len) + { + /* No need to convert characters to hexadecimals. */ + *buf++= '\''; + memcpy(buf, xid->data, xid->gtrid_length); + buf+= xid->gtrid_length; + *buf++= '\''; + if (xid->bqual_length > 0 || xid->formatID != 1) + { + *buf++= ','; + *buf++= '\''; + memcpy(buf, xid->data+xid->gtrid_length, xid->bqual_length); + buf+= xid->bqual_length; + *buf++= '\''; + } + } + else + { + *buf++= 'X'; + *buf++= '\''; + for (i= 0; i < xid->gtrid_length; i++) + { + *buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4]; + *buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f]; + } + *buf++= '\''; + if (xid->bqual_length > 0 || xid->formatID != 1) + { + *buf++= ','; + *buf++= 'X'; + *buf++= '\''; + for (; i < tot_len; i++) + { + *buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4]; + *buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f]; + } + *buf++= '\''; + } + } + + if (xid->formatID != 1) + { + *buf++= ','; + buf+= my_longlong10_to_str_8bit(&my_charset_bin, buf, + MY_INT64_NUM_DECIMAL_DIGITS, -10, xid->formatID); + } + + return buf - orig_buf; +} + + +/** return the list of XID's to a client, the same way SHOW commands do. @note @@ -2029,7 +2126,8 @@ int ha_recover(HASH *commit_list) It can be easily fixed later, if necessary. */ -static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol) +static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol, + char *data, uint data_len, CHARSET_INFO *data_cs) { if (xs->xa_state == XA_PREPARED) { @@ -2037,8 +2135,7 @@ static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol) protocol->store_longlong((longlong) xs->xid.formatID, FALSE); protocol->store_longlong((longlong) xs->xid.gtrid_length, FALSE); protocol->store_longlong((longlong) xs->xid.bqual_length, FALSE); - protocol->store(xs->xid.data, xs->xid.gtrid_length + xs->xid.bqual_length, - &my_charset_bin); + protocol->store(data, data_len, data_cs); if (protocol->write()) return TRUE; } @@ -2046,11 +2143,28 @@ static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol) } +static my_bool xa_recover_callback_short(XID_STATE *xs, Protocol *protocol) +{ + return xa_recover_callback(xs, protocol, xs->xid.data, + xs->xid.gtrid_length + xs->xid.bqual_length, &my_charset_bin); +} + + +static my_bool xa_recover_callback_verbose(XID_STATE *xs, Protocol *protocol) +{ + char buf[SQL_XIDSIZE]; + uint len= get_sql_xid(&xs->xid, buf); + return xa_recover_callback(xs, protocol, buf, len, + &my_charset_utf8_general_ci); +} + + bool mysql_xa_recover(THD *thd) { List<Item> field_list; Protocol *protocol= thd->protocol; MEM_ROOT *mem_root= thd->mem_root; + my_hash_walk_action action; DBUG_ENTER("mysql_xa_recover"); field_list.push_back(new (mem_root) @@ -2062,16 +2176,32 @@ bool mysql_xa_recover(THD *thd) field_list.push_back(new (mem_root) Item_int(thd, "bqual_length", 0, MY_INT32_NUM_DECIMAL_DIGITS), mem_root); - field_list.push_back(new (mem_root) - Item_empty_string(thd, "data", - XIDDATASIZE), mem_root); + { + uint len; + CHARSET_INFO *cs; + + if (thd->lex->verbose) + { + len= SQL_XIDSIZE; + cs= &my_charset_utf8_general_ci; + action= (my_hash_walk_action) xa_recover_callback_verbose; + } + else + { + len= XIDDATASIZE; + cs= &my_charset_bin; + action= (my_hash_walk_action) xa_recover_callback_short; + } + + field_list.push_back(new (mem_root) + Item_empty_string(thd, "data", len, cs), mem_root); + } if (protocol->send_result_set_metadata(&field_list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); - if (xid_cache_iterate(thd, (my_hash_walk_action) xa_recover_callback, - protocol)) + if (xid_cache_iterate(thd, action, protocol)) DBUG_RETURN(1); my_eof(thd); DBUG_RETURN(0); diff --git a/sql/handler.h b/sql/handler.h index 13b8dc41f65..1a4e83c093d 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -672,6 +672,15 @@ struct xid_t { }; typedef struct xid_t XID; +/* + The size of XID string representation in the form + 'gtrid', 'bqual', formatID + see xid_t::get_sql_string() for details. +*/ +#define SQL_XIDSIZE (XIDDATASIZE * 2 + 8 + MY_INT64_NUM_DECIMAL_DIGITS) +/* The 'buf' has to have space for at least SQL_XIDSIZE bytes. */ +uint get_sql_xid(XID *xid, char *buf); + /* for recover() handlerton call */ #define MIN_XID_LIST_SIZE 128 #define MAX_XID_LIST_SIZE (1024*128) @@ -1929,6 +1938,13 @@ struct HA_CREATE_INFO: public Table_scope_and_contents_source_st, used_fields|= (HA_CREATE_USED_CHARSET | HA_CREATE_USED_DEFAULT_CHARSET); return false; } + ulong table_options_with_row_type() + { + if (row_type == ROW_TYPE_DYNAMIC || row_type == ROW_TYPE_PAGE) + return table_options | HA_OPTION_PACK_RECORD; + else + return table_options; + } }; diff --git a/sql/log.cc b/sql/log.cc index e5ff85e4544..bdf0b6fdc59 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -49,10 +49,10 @@ #endif #include "sql_plugin.h" -#include "rpl_handler.h" #include "debug_sync.h" #include "sql_show.h" #include "my_pthread.h" +#include "semisync_master.h" #include "wsrep_mysqld.h" #include "sp_rcontext.h" #include "sp_head.h" @@ -3211,7 +3211,7 @@ MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period) group_commit_trigger_lock_wait(0), sync_period_ptr(sync_period), sync_counter(0), state_file_deleted(false), binlog_state_recover_done(false), - is_relay_log(0), signal_cnt(0), + is_relay_log(0), relay_signal_cnt(0), checksum_alg_reset(BINLOG_CHECKSUM_ALG_UNDEF), relay_log_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF), description_event_for_exec(0), description_event_for_queue(0), @@ -3281,7 +3281,8 @@ void MYSQL_BIN_LOG::cleanup() mysql_mutex_destroy(&LOCK_xid_list); mysql_mutex_destroy(&LOCK_binlog_background_thread); mysql_mutex_destroy(&LOCK_binlog_end_pos); - mysql_cond_destroy(&update_cond); + mysql_cond_destroy(&COND_relay_log_updated); + mysql_cond_destroy(&COND_bin_log_updated); mysql_cond_destroy(&COND_queue_busy); mysql_cond_destroy(&COND_xid_list); mysql_cond_destroy(&COND_binlog_background_thread); @@ -3316,7 +3317,8 @@ void MYSQL_BIN_LOG::init_pthread_objects() mysql_mutex_setflags(&LOCK_index, MYF_NO_DEADLOCK_DETECTION); mysql_mutex_init(key_BINLOG_LOCK_xid_list, &LOCK_xid_list, MY_MUTEX_INIT_FAST); - mysql_cond_init(m_key_update_cond, &update_cond, 0); + mysql_cond_init(m_key_relay_log_update, &COND_relay_log_updated, 0); + mysql_cond_init(m_key_bin_log_update, &COND_bin_log_updated, 0); mysql_cond_init(m_key_COND_queue_busy, &COND_queue_busy, 0); mysql_cond_init(key_BINLOG_COND_xid_list, &COND_xid_list, 0); @@ -3802,6 +3804,11 @@ bool MYSQL_BIN_LOG::open(const char *log_name, close_purge_index_file(); #endif + /* Notify the io thread that binlog is rotated to a new file */ + if (is_relay_log) + signal_relay_log_update(); + else + update_binlog_end_pos(); DBUG_RETURN(0); err: @@ -5112,7 +5119,12 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock) new file name in the current binary log file. */ if ((error= generate_new_name(new_name, name, 0))) + { +#ifdef ENABLE_AND_FIX_HANG + close_on_error= TRUE; +#endif goto end; + } new_name_ptr=new_name; if (log_type == LOG_BIN) @@ -5143,13 +5155,20 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock) } bytes_written += r.data_written; } - /* - Update needs to be signalled even if there is no rotate event - log rotation should give the waiting thread a signal to - discover EOF and move on to the next log. - */ - signal_update(); } + + /* + Update needs to be signalled even if there is no rotate event + log rotation should give the waiting thread a signal to + discover EOF and move on to the next log. + */ + if ((error= flush_io_cache(&log_file))) + { + close_on_error= TRUE; + goto end; + } + update_binlog_end_pos(); + old_name=name; name=0; // Don't free name close_flag= LOG_CLOSE_TO_BE_OPENED | LOG_CLOSE_INDEX; @@ -5280,7 +5299,7 @@ bool MYSQL_BIN_LOG::append_no_lock(Log_event* ev) if (my_b_append_tell(&log_file) > max_size) error= new_file_without_locking(); err: - signal_update(); // Safe as we don't call close + update_binlog_end_pos(); DBUG_RETURN(error); } @@ -5341,7 +5360,7 @@ bool MYSQL_BIN_LOG::write_event_buffer(uchar* buf, uint len) err: my_safe_afree(ebuf, len); if (!error) - signal_update(); + update_binlog_end_pos(); DBUG_RETURN(error); } @@ -6341,6 +6360,7 @@ err: { my_off_t offset= my_b_tell(file); bool check_purge= false; + DBUG_ASSERT(!is_relay_log); if (!error) { @@ -6355,25 +6375,23 @@ err: mysql_mutex_assert_owner(&LOCK_log); mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync); mysql_mutex_assert_not_owner(&LOCK_commit_ordered); - bool first= true; - bool last= true; - if ((error= RUN_HOOK(binlog_storage, after_flush, - (thd, log_file_name, file->pos_in_file, - synced, first, last)))) +#ifdef HAVE_REPLICATION + if (repl_semisync_master.report_binlog_update(thd, log_file_name, + file->pos_in_file)) { sql_print_error("Failed to run 'after_flush' hooks"); error= 1; } else +#endif { - /* update binlog_end_pos so it can be read by dump thread - * - * note: must be _after_ the RUN_HOOK(after_flush) or else - * semi-sync-plugin might not have put the transaction into - * it's list before dump-thread tries to send it - */ + /* + update binlog_end_pos so it can be read by dump thread + note: must be _after_ the RUN_HOOK(after_flush) or else + semi-sync might not have put the transaction into + it's list before dump-thread tries to send it + */ update_binlog_end_pos(offset); - if ((error= rotate(false, &check_purge))) check_purge= false; } @@ -6390,15 +6408,14 @@ err: mysql_mutex_assert_not_owner(&LOCK_log); mysql_mutex_assert_owner(&LOCK_after_binlog_sync); mysql_mutex_assert_not_owner(&LOCK_commit_ordered); - bool first= true; - bool last= true; - if (RUN_HOOK(binlog_storage, after_sync, - (thd, log_file_name, file->pos_in_file, - first, last))) +#ifdef HAVE_REPLICATION + if (repl_semisync_master.wait_after_sync(log_file_name, + file->pos_in_file)) { error=1; /* error is already printed inside hook */ } +#endif /* Take mutex to protect against a reader seeing partial writes of 64-bit @@ -7099,7 +7116,7 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd) if (!(error= write_incident_already_locked(thd)) && !(error= flush_and_sync(0))) { - signal_update(); + update_binlog_end_pos(); if ((error= rotate(false, &check_purge))) check_purge= false; } @@ -7140,7 +7157,7 @@ MYSQL_BIN_LOG::write_binlog_checkpoint_event_already_locked(const char *name_arg */ if (!write_event(&ev) && !flush_and_sync(0)) { - signal_update(); + update_binlog_end_pos(); } else { @@ -7575,7 +7592,11 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) else if (is_leader) trx_group_commit_leader(entry); else if (!entry->queued_by_other) + { + DEBUG_SYNC(entry->thd, "after_semisync_queue"); + entry->thd->wait_for_wakeup_ready(); + } else { /* @@ -7820,31 +7841,31 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) mysql_mutex_assert_owner(&LOCK_log); mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync); mysql_mutex_assert_not_owner(&LOCK_commit_ordered); - bool first= true, last; + for (current= queue; current != NULL; current= current->next) { - last= current->next == NULL; +#ifdef HAVE_REPLICATION if (!current->error && - RUN_HOOK(binlog_storage, after_flush, - (current->thd, - current->cache_mngr->last_commit_pos_file, - current->cache_mngr->last_commit_pos_offset, synced, - first, last))) + repl_semisync_master. + report_binlog_update(current->thd, + current->cache_mngr->last_commit_pos_file, + current->cache_mngr-> + last_commit_pos_offset)) { current->error= ER_ERROR_ON_WRITE; current->commit_errno= -1; current->error_cache= NULL; any_error= true; } - first= false; +#endif } - /* update binlog_end_pos so it can be read by dump thread - * - * note: must be _after_ the RUN_HOOK(after_flush) or else - * semi-sync-plugin might not have put the transaction into - * it's list before dump-thread tries to send it - */ + /* + update binlog_end_pos so it can be read by dump thread + Note: must be _after_ the RUN_HOOK(after_flush) or else + semi-sync might not have put the transaction into + it's list before dump-thread tries to send it + */ update_binlog_end_pos(commit_offset); if (any_error) @@ -7906,18 +7927,19 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) mysql_mutex_assert_owner(&LOCK_after_binlog_sync); mysql_mutex_assert_not_owner(&LOCK_commit_ordered); - bool first= true, last; + bool first __attribute__((unused))= true; + bool last __attribute__((unused)); for (current= queue; current != NULL; current= current->next) { last= current->next == NULL; - if (!current->error && - RUN_HOOK(binlog_storage, after_sync, - (current->thd, current->cache_mngr->last_commit_pos_file, - current->cache_mngr->last_commit_pos_offset, - first, last))) - { - /* error is already printed inside hook */ - } +#ifdef HAVE_REPLICATION + if (!current->error) + current->error= + repl_semisync_master.wait_after_sync(current->cache_mngr-> + last_commit_pos_file, + current->cache_mngr-> + last_commit_pos_offset); +#endif first= false; } } @@ -8228,10 +8250,10 @@ void MYSQL_BIN_LOG::wait_for_update_relay_log(THD* thd) DBUG_ENTER("wait_for_update_relay_log"); mysql_mutex_assert_owner(&LOCK_log); - thd->ENTER_COND(&update_cond, &LOCK_log, + thd->ENTER_COND(&COND_relay_log_updated, &LOCK_log, &stage_slave_has_read_all_relay_log, &old_stage); - mysql_cond_wait(&update_cond, &LOCK_log); + mysql_cond_wait(&COND_relay_log_updated, &LOCK_log); thd->EXIT_COND(&old_stage); DBUG_VOID_RETURN; } @@ -8261,9 +8283,9 @@ int MYSQL_BIN_LOG::wait_for_update_binlog_end_pos(THD* thd, thd_wait_begin(thd, THD_WAIT_BINLOG); mysql_mutex_assert_owner(get_binlog_end_pos_lock()); if (!timeout) - mysql_cond_wait(&update_cond, get_binlog_end_pos_lock()); + mysql_cond_wait(&COND_bin_log_updated, get_binlog_end_pos_lock()); else - ret= mysql_cond_timedwait(&update_cond, get_binlog_end_pos_lock(), + ret= mysql_cond_timedwait(&COND_bin_log_updated, get_binlog_end_pos_lock(), timeout); thd_wait_end(thd); DBUG_RETURN(ret); @@ -8308,7 +8330,8 @@ void MYSQL_BIN_LOG::close(uint exiting) relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); write_event(&s); bytes_written+= s.data_written; - signal_update(); + flush_io_cache(&log_file); + update_binlog_end_pos(); /* When we shut down server, write out the binlog state to a separate @@ -8527,14 +8550,6 @@ bool flush_error_log() return result; } -void MYSQL_BIN_LOG::signal_update() -{ - DBUG_ENTER("MYSQL_BIN_LOG::signal_update"); - signal_cnt++; - mysql_cond_broadcast(&update_cond); - DBUG_VOID_RETURN; -} - #ifdef _WIN32 static void print_buffer_to_nt_eventlog(enum loglevel level, char *buff, size_t length, size_t buffLen) @@ -9918,7 +9933,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, for (;;) { while ((ev= Log_event::read_log_event(first_round ? first_log : &log, - 0, fdle, opt_master_verify_checksum)) + fdle, opt_master_verify_checksum)) && ev->is_valid()) { enum Log_event_type typ= ev->get_type_code(); @@ -10159,7 +10174,7 @@ MYSQL_BIN_LOG::do_binlog_recovery(const char *opt_name, bool do_xa_recovery) return 1; } - if ((ev= Log_event::read_log_event(&log, 0, &fdle, + if ((ev= Log_event::read_log_event(&log, &fdle, opt_master_verify_checksum)) && ev->get_type_code() == FORMAT_DESCRIPTION_EVENT) { @@ -10400,7 +10415,7 @@ get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list) *out_gtid_list= NULL; - if (!(ev= Log_event::read_log_event(cache, 0, &init_fdle, + if (!(ev= Log_event::read_log_event(cache, &init_fdle, opt_master_verify_checksum)) || ev->get_type_code() != FORMAT_DESCRIPTION_EVENT) { @@ -10416,7 +10431,7 @@ get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list) { Log_event_type typ; - ev= Log_event::read_log_event(cache, 0, fdle, opt_master_verify_checksum); + ev= Log_event::read_log_event(cache, fdle, opt_master_verify_checksum); if (!ev) { errormsg= "Could not read GTID list event while looking for GTID " @@ -10446,6 +10461,7 @@ get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list) return errormsg; } + struct st_mysql_storage_engine binlog_storage_engine= { MYSQL_HANDLERTON_INTERFACE_VERSION }; diff --git a/sql/log.h b/sql/log.h index dffb6a80d54..02ace7c7921 100644 --- a/sql/log.h +++ b/sql/log.h @@ -349,6 +349,11 @@ public: /* for documentation of mutexes held in various places in code */ }; +/* Tell the io thread if we can delay the master info sync. */ +#define SEMI_SYNC_SLAVE_DELAY_SYNC 1 +/* Tell the io thread if the current event needs a ack. */ +#define SEMI_SYNC_NEED_ACK 2 + class MYSQL_QUERY_LOG: public MYSQL_LOG { public: @@ -425,14 +430,18 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG #ifdef HAVE_PSI_INTERFACE /** The instrumentation key to use for @ LOCK_index. */ PSI_mutex_key m_key_LOCK_index; - /** The instrumentation key to use for @ update_cond. */ - PSI_cond_key m_key_update_cond; + /** The instrumentation key to use for @ COND_relay_log_updated */ + PSI_cond_key m_key_relay_log_update; + /** The instrumentation key to use for @ COND_bin_log_updated */ + PSI_cond_key m_key_bin_log_update; /** The instrumentation key to use for opening the log file. */ PSI_file_key m_key_file_log; /** The instrumentation key to use for opening the log index file. */ PSI_file_key m_key_file_log_index; PSI_file_key m_key_COND_queue_busy; + /** The instrumentation key to use for LOCK_binlog_end_pos. */ + PSI_mutex_key m_key_LOCK_binlog_end_pos; #endif struct group_commit_entry @@ -488,7 +497,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG mysql_mutex_t LOCK_binlog_end_pos; mysql_mutex_t LOCK_xid_list; mysql_cond_t COND_xid_list; - mysql_cond_t update_cond; + mysql_cond_t COND_relay_log_updated, COND_bin_log_updated; ulonglong bytes_written; IO_CACHE index_file; char index_file_name[FN_REFLEN]; @@ -598,7 +607,7 @@ public: /* This is relay log */ bool is_relay_log; - ulong signal_cnt; // update of the counter is checked by heartbeat + ulong relay_signal_cnt; // update of the counter is checked by heartbeat enum enum_binlog_checksum_alg checksum_alg_reset; // to contain a new value when binlog is rotated /* Holds the last seen in Relay-Log FD's checksum alg value. @@ -661,16 +670,20 @@ public: #ifdef HAVE_PSI_INTERFACE void set_psi_keys(PSI_mutex_key key_LOCK_index, - PSI_cond_key key_update_cond, + PSI_cond_key key_relay_log_update, + PSI_cond_key key_bin_log_update, PSI_file_key key_file_log, PSI_file_key key_file_log_index, - PSI_file_key key_COND_queue_busy) + PSI_file_key key_COND_queue_busy, + PSI_mutex_key key_LOCK_binlog_end_pos) { m_key_LOCK_index= key_LOCK_index; - m_key_update_cond= key_update_cond; + m_key_relay_log_update= key_relay_log_update; + m_key_bin_log_update= key_bin_log_update; m_key_file_log= key_file_log; m_key_file_log_index= key_file_log_index; m_key_COND_queue_busy= key_COND_queue_busy; + m_key_LOCK_binlog_end_pos= key_LOCK_binlog_end_pos; } #endif @@ -707,7 +720,53 @@ public: DBUG_VOID_RETURN; } void set_max_size(ulong max_size_arg); - void signal_update(); + + /* Handle signaling that relay has been updated */ + void signal_relay_log_update() + { + mysql_mutex_assert_owner(&LOCK_log); + DBUG_ASSERT(is_relay_log); + DBUG_ENTER("MYSQL_BIN_LOG::signal_relay_log_update"); + relay_signal_cnt++; + mysql_cond_broadcast(&COND_relay_log_updated); + DBUG_VOID_RETURN; + } + void signal_bin_log_update() + { + mysql_mutex_assert_owner(&LOCK_binlog_end_pos); + DBUG_ASSERT(!is_relay_log); + DBUG_ENTER("MYSQL_BIN_LOG::signal_bin_log_update"); + mysql_cond_broadcast(&COND_bin_log_updated); + DBUG_VOID_RETURN; + } + void update_binlog_end_pos() + { + if (is_relay_log) + signal_relay_log_update(); + else + { + lock_binlog_end_pos(); + binlog_end_pos= my_b_safe_tell(&log_file); + signal_bin_log_update(); + unlock_binlog_end_pos(); + } + } + void update_binlog_end_pos(my_off_t pos) + { + mysql_mutex_assert_owner(&LOCK_log); + mysql_mutex_assert_not_owner(&LOCK_binlog_end_pos); + lock_binlog_end_pos(); + /* + Note: it would make more sense to assert(pos > binlog_end_pos) + but there are two places triggered by mtr that has pos == binlog_end_pos + i didn't investigate but accepted as it should do no harm + */ + DBUG_ASSERT(pos >= binlog_end_pos); + binlog_end_pos= pos; + signal_bin_log_update(); + unlock_binlog_end_pos(); + } + void wait_for_sufficient_commits(); void binlog_trigger_immediate_group_commit(); void wait_for_update_relay_log(THD* thd); @@ -807,7 +866,7 @@ public: inline char* get_log_fname() { return log_file_name; } inline char* get_name() { return name; } inline mysql_mutex_t* get_log_lock() { return &LOCK_log; } - inline mysql_cond_t* get_log_cond() { return &update_cond; } + inline mysql_cond_t* get_bin_log_cond() { return &COND_bin_log_updated; } inline IO_CACHE* get_log_file() { return &log_file; } inline void lock_index() { mysql_mutex_lock(&LOCK_index);} @@ -831,23 +890,6 @@ public: bool check_strict_gtid_sequence(uint32 domain_id, uint32 server_id, uint64 seq_no); - - void update_binlog_end_pos(my_off_t pos) - { - mysql_mutex_assert_owner(&LOCK_log); - mysql_mutex_assert_not_owner(&LOCK_binlog_end_pos); - lock_binlog_end_pos(); - /** - * note: it would make more sense to assert(pos > binlog_end_pos) - * but there are two places triggered by mtr that has pos == binlog_end_pos - * i didn't investigate but accepted as it should do no harm - */ - DBUG_ASSERT(pos >= binlog_end_pos); - binlog_end_pos= pos; - signal_update(); - unlock_binlog_end_pos(); - } - /** * used when opening new file, and binlog_end_pos moves backwards */ @@ -858,7 +900,7 @@ public: lock_binlog_end_pos(); binlog_end_pos= pos; strcpy(binlog_end_pos_file, file_name); - signal_update(); + signal_bin_log_update(); unlock_binlog_end_pos(); } diff --git a/sql/log_event.cc b/sql/log_event.cc index 1256da729f0..abc979f1b58 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -1855,7 +1855,7 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet, DBUG_RETURN(0); } -Log_event* Log_event::read_log_event(IO_CACHE* file, mysql_mutex_t* log_lock, +Log_event* Log_event::read_log_event(IO_CACHE* file, const Format_description_log_event *fdle, my_bool crc_check) { @@ -1865,9 +1865,6 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, mysql_mutex_t* log_lock, const char *error= 0; Log_event *res= 0; - if (log_lock) - mysql_mutex_lock(log_lock); - switch (read_log_event(file, &event, fdle, BINLOG_CHECKSUM_ALG_OFF)) { case 0: @@ -1904,8 +1901,6 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, mysql_mutex_t* log_lock, res->register_temp_buf(event.release(), true); err: - if (log_lock) - mysql_mutex_unlock(log_lock); if (error) { DBUG_ASSERT(!res); @@ -9736,7 +9731,6 @@ int Execute_load_log_event::do_apply_event(rpl_group_info *rgi) } if (!(lev= (Load_log_event*) Log_event::read_log_event(&file, - (mysql_mutex_t*)0, rli->relay_log.description_event_for_exec, opt_slave_sql_verify_checksum)) || lev->get_type_code() != NEW_LOAD_EVENT) diff --git a/sql/log_event.h b/sql/log_event.h index 4d84bc34a47..29cae604678 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -1300,7 +1300,6 @@ public: constructor and pass description_event as an argument. */ static Log_event* read_log_event(IO_CACHE* file, - mysql_mutex_t* log_lock, const Format_description_log_event *description_event, my_bool crc_check); diff --git a/sql/mysqld.cc b/sql/mysqld.cc index effb81e06f6..afbde520f7c 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -97,8 +97,8 @@ #include "set_var.h" #include "rpl_injector.h" - -#include "rpl_handler.h" +#include "semisync_master.h" +#include "semisync_slave.h" #include "transaction.h" @@ -914,7 +914,7 @@ PSI_mutex_key key_LOCK_des_key_file; PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list, key_BINLOG_LOCK_binlog_background_thread, - m_key_LOCK_binlog_end_pos, + key_LOCK_binlog_end_pos, key_delayed_insert_mutex, key_hash_filo_lock, key_LOCK_active_mi, key_LOCK_connection_count, key_LOCK_crypt, key_LOCK_delayed_create, key_LOCK_delayed_insert, key_LOCK_delayed_status, key_LOCK_error_log, @@ -936,8 +936,10 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list, key_LOCK_thread_count, key_LOCK_thread_cache, key_PARTITION_LOCK_auto_inc; PSI_mutex_key key_RELAYLOG_LOCK_index; +PSI_mutex_key key_LOCK_relaylog_end_pos; PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state, key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool, key_LOCK_parallel_entry; +PSI_mutex_key key_LOCK_binlog; PSI_mutex_key key_LOCK_stats, key_LOCK_global_user_client_stats, key_LOCK_global_table_stats, @@ -949,6 +951,7 @@ PSI_mutex_key key_LOCK_after_binlog_sync; PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered, key_LOCK_slave_background; PSI_mutex_key key_TABLE_SHARE_LOCK_share; +PSI_mutex_key key_LOCK_ack_receiver; PSI_mutex_key key_TABLE_SHARE_LOCK_rotation; PSI_cond_key key_TABLE_SHARE_COND_rotation; @@ -970,8 +973,9 @@ static PSI_mutex_info all_server_mutexes[]= { &key_BINLOG_LOCK_index, "MYSQL_BIN_LOG::LOCK_index", 0}, { &key_BINLOG_LOCK_xid_list, "MYSQL_BIN_LOG::LOCK_xid_list", 0}, { &key_BINLOG_LOCK_binlog_background_thread, "MYSQL_BIN_LOG::LOCK_binlog_background_thread", 0}, - { &m_key_LOCK_binlog_end_pos, "MYSQL_BIN_LOG::LOCK_binlog_end_pos", 0 }, + { &key_LOCK_binlog_end_pos, "MYSQL_BIN_LOG::LOCK_binlog_end_pos", 0 }, { &key_RELAYLOG_LOCK_index, "MYSQL_RELAY_LOG::LOCK_index", 0}, + { &key_LOCK_relaylog_end_pos, "MYSQL_RELAY_LOG::LOCK_binlog_end_pos", 0}, { &key_delayed_insert_mutex, "Delayed_insert::mutex", 0}, { &key_hash_filo_lock, "hash_filo::lock", 0}, { &key_LOCK_active_mi, "LOCK_active_mi", PSI_FLAG_GLOBAL}, @@ -1029,7 +1033,9 @@ static PSI_mutex_info all_server_mutexes[]= { &key_LOCK_binlog_state, "LOCK_binlog_state", 0}, { &key_LOCK_rpl_thread, "LOCK_rpl_thread", 0}, { &key_LOCK_rpl_thread_pool, "LOCK_rpl_thread_pool", 0}, - { &key_LOCK_parallel_entry, "LOCK_parallel_entry", 0} + { &key_LOCK_parallel_entry, "LOCK_parallel_entry", 0}, + { &key_LOCK_ack_receiver, "Ack_receiver::mutex", 0}, + { &key_LOCK_binlog, "LOCK_binlog", 0} }; PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger, @@ -1058,7 +1064,8 @@ static PSI_rwlock_info all_server_rwlocks[]= PSI_cond_key key_PAGE_cond, key_COND_active, key_COND_pool; #endif /* HAVE_MMAP */ -PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond, +PSI_cond_key key_BINLOG_COND_xid_list, + key_BINLOG_COND_bin_log_updated, key_BINLOG_COND_relay_log_updated, key_BINLOG_COND_binlog_background_thread, key_BINLOG_COND_binlog_background_thread_end, key_COND_cache_status_changed, key_COND_manager, @@ -1072,9 +1079,10 @@ PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond, key_rpl_group_info_sleep_cond, key_TABLE_SHARE_cond, key_user_level_lock_cond, key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache, - key_COND_start_thread, + key_COND_start_thread, key_COND_binlog_send, key_BINLOG_COND_queue_busy; -PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready, +PSI_cond_key key_RELAYLOG_COND_relay_log_updated, + key_RELAYLOG_COND_bin_log_updated, key_COND_wakeup_ready, key_COND_wait_commit; PSI_cond_key key_RELAYLOG_COND_queue_busy; PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy; @@ -1083,6 +1091,7 @@ PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread, key_COND_parallel_entry, key_COND_group_commit_orderer, key_COND_prepare_ordered, key_COND_slave_background; PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates; +PSI_cond_key key_COND_ack_receiver; static PSI_cond_info all_server_conds[]= { @@ -1095,12 +1104,13 @@ static PSI_cond_info all_server_conds[]= { &key_COND_pool, "TC_LOG_MMAP::COND_pool", 0}, { &key_TC_LOG_MMAP_COND_queue_busy, "TC_LOG_MMAP::COND_queue_busy", 0}, #endif /* HAVE_MMAP */ + { &key_BINLOG_COND_bin_log_updated, "MYSQL_BIN_LOG::COND_bin_log_updated", 0}, { &key_BINLOG_COND_relay_log_updated, "MYSQL_BIN_LOG::COND_relay_log_updated", 0}, { &key_BINLOG_COND_xid_list, "MYSQL_BIN_LOG::COND_xid_list", 0}, - { &key_BINLOG_update_cond, "MYSQL_BIN_LOG::update_cond", 0}, { &key_BINLOG_COND_binlog_background_thread, "MYSQL_BIN_LOG::COND_binlog_background_thread", 0}, { &key_BINLOG_COND_binlog_background_thread_end, "MYSQL_BIN_LOG::COND_binlog_background_thread_end", 0}, { &key_BINLOG_COND_queue_busy, "MYSQL_BIN_LOG::COND_queue_busy", 0}, - { &key_RELAYLOG_update_cond, "MYSQL_RELAY_LOG::update_cond", 0}, + { &key_RELAYLOG_COND_relay_log_updated, "MYSQL_RELAY_LOG::COND_relay_log_updated", 0}, + { &key_RELAYLOG_COND_bin_log_updated, "MYSQL_RELAY_LOG::COND_bin_log_updated", 0}, { &key_RELAYLOG_COND_queue_busy, "MYSQL_RELAY_LOG::COND_queue_busy", 0}, { &key_COND_wakeup_ready, "THD::COND_wakeup_ready", 0}, { &key_COND_wait_commit, "wait_for_commit::COND_wait_commit", 0}, @@ -1135,6 +1145,8 @@ static PSI_cond_info all_server_conds[]= { &key_COND_start_thread, "COND_start_thread", PSI_FLAG_GLOBAL}, { &key_COND_wait_gtid, "COND_wait_gtid", 0}, { &key_COND_gtid_ignore_duplicates, "COND_gtid_ignore_duplicates", 0}, + { &key_COND_ack_receiver, "Ack_receiver::cond", 0}, + { &key_COND_binlog_send, "COND_binlog_send", 0}, { &key_TABLE_SHARE_COND_rotation, "TABLE_SHARE::COND_rotation", 0} }; @@ -1142,6 +1154,7 @@ PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert, key_thread_handle_manager, key_thread_main, key_thread_one_connection, key_thread_signal_hand, key_thread_slave_background, key_rpl_parallel_thread; +PSI_thread_key key_thread_ack_receiver; static PSI_thread_info all_server_threads[]= { @@ -1168,6 +1181,7 @@ static PSI_thread_info all_server_threads[]= { &key_thread_one_connection, "one_connection", 0}, { &key_thread_signal_hand, "signal_handler", PSI_FLAG_GLOBAL}, { &key_thread_slave_background, "slave_background", PSI_FLAG_GLOBAL}, + { &key_thread_ack_receiver, "Ack_receiver", PSI_FLAG_GLOBAL}, { &key_rpl_parallel_thread, "rpl_parallel_thread", 0} }; @@ -1747,6 +1761,7 @@ static void close_connections(void) Events::deinit(); slave_prepare_for_shutdown(); mysql_bin_log.stop_background_thread(); + ack_receiver.stop(); /* Give threads time to die. @@ -2228,7 +2243,9 @@ void clean_up(bool print_message) ha_end(); if (tc_log) tc_log->close(); - delegates_destroy(); +#ifdef HAVE_REPLICATION + semi_sync_master_deinit(); +#endif xid_cache_free(); tdc_deinit(); mdl_destroy(); @@ -4247,10 +4264,12 @@ static int init_common_variables() constructor (called before main()). */ mysql_bin_log.set_psi_keys(key_BINLOG_LOCK_index, - key_BINLOG_update_cond, + key_BINLOG_COND_relay_log_updated, + key_BINLOG_COND_bin_log_updated, key_file_binlog, key_file_binlog_index, - key_BINLOG_COND_queue_busy); + key_BINLOG_COND_queue_busy, + key_LOCK_binlog_end_pos); #endif /* @@ -5143,13 +5162,6 @@ static int init_server_components() xid_cache_init(); - /* - initialize delegates for extension observers, errors have already - been reported in the function - */ - if (delegates_init()) - unireg_abort(1); - /* need to configure logging before initializing storage engines */ if (!opt_bin_log_used && !WSREP_ON) { @@ -5181,6 +5193,13 @@ static int init_server_components() "this server. However this will be ignored as the " "--log-bin option is not defined."); } + + if (repl_semisync_master.init_object() || + repl_semisync_slave.init_object()) + { + sql_print_error("Could not initialize semisync."); + unireg_abort(1); + } #endif if (opt_bin_log) @@ -8271,6 +8290,27 @@ static int show_ssl_get_cipher_list(THD *thd, SHOW_VAR *var, char *buff, return 0; } +#define SHOW_FNAME(name) \ + rpl_semi_sync_master_show_##name + +#define DEF_SHOW_FUNC(name, show_type) \ + static int SHOW_FNAME(name)(MYSQL_THD thd, SHOW_VAR *var, char *buff) \ + { \ + repl_semisync_master.set_export_stats(); \ + var->type= show_type; \ + var->value= (char *)&rpl_semi_sync_master_##name; \ + return 0; \ + } + +DEF_SHOW_FUNC(status, SHOW_BOOL) +DEF_SHOW_FUNC(clients, SHOW_LONG) +DEF_SHOW_FUNC(wait_sessions, SHOW_LONG) +DEF_SHOW_FUNC(trx_wait_time, SHOW_LONGLONG) +DEF_SHOW_FUNC(trx_wait_num, SHOW_LONGLONG) +DEF_SHOW_FUNC(net_wait_time, SHOW_LONGLONG) +DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG) +DEF_SHOW_FUNC(avg_net_wait_time, SHOW_LONG) +DEF_SHOW_FUNC(avg_trx_wait_time, SHOW_LONG) #ifdef HAVE_YASSL @@ -8589,6 +8629,26 @@ SHOW_VAR status_vars[]= { {"Rows_sent", (char*) offsetof(STATUS_VAR, rows_sent), SHOW_LONGLONG_STATUS}, {"Rows_read", (char*) offsetof(STATUS_VAR, rows_read), SHOW_LONGLONG_STATUS}, {"Rows_tmp_read", (char*) offsetof(STATUS_VAR, rows_tmp_read), SHOW_LONGLONG_STATUS}, +#ifdef HAVE_REPLICATION + {"Rpl_semi_sync_master_status", (char*) &SHOW_FNAME(status), SHOW_FUNC}, + {"Rpl_semi_sync_master_clients", (char*) &SHOW_FNAME(clients), SHOW_FUNC}, + {"Rpl_semi_sync_master_yes_tx", (char*) &rpl_semi_sync_master_yes_transactions, SHOW_LONG}, + {"Rpl_semi_sync_master_no_tx", (char*) &rpl_semi_sync_master_no_transactions, SHOW_LONG}, + {"Rpl_semi_sync_master_wait_sessions", (char*) &SHOW_FNAME(wait_sessions), SHOW_FUNC}, + {"Rpl_semi_sync_master_no_times", (char*) &rpl_semi_sync_master_off_times, SHOW_LONG}, + {"Rpl_semi_sync_master_timefunc_failures", (char*) &rpl_semi_sync_master_timefunc_fails, SHOW_LONG}, + {"Rpl_semi_sync_master_wait_pos_backtraverse", (char*) &rpl_semi_sync_master_wait_pos_backtraverse, SHOW_LONG}, + {"Rpl_semi_sync_master_tx_wait_time", (char*) &SHOW_FNAME(trx_wait_time), SHOW_FUNC}, + {"Rpl_semi_sync_master_tx_waits", (char*) &SHOW_FNAME(trx_wait_num), SHOW_FUNC}, + {"Rpl_semi_sync_master_tx_avg_wait_time", (char*) &SHOW_FNAME(avg_trx_wait_time), SHOW_FUNC}, + {"Rpl_semi_sync_master_net_wait_time", (char*) &SHOW_FNAME(net_wait_time), SHOW_FUNC}, + {"Rpl_semi_sync_master_net_waits", (char*) &SHOW_FNAME(net_wait_num), SHOW_FUNC}, + {"Rpl_semi_sync_master_net_avg_wait_time", (char*) &SHOW_FNAME(avg_net_wait_time), SHOW_FUNC}, + {"Rpl_semi_sync_master_request_ack", (char*) &rpl_semi_sync_master_request_ack, SHOW_LONGLONG}, + {"Rpl_semi_sync_master_get_ack", (char*)&rpl_semi_sync_master_get_ack, SHOW_LONGLONG}, + {"Rpl_semi_sync_slave_status", (char*) &rpl_semi_sync_slave_status, SHOW_BOOL}, + {"Rpl_semi_sync_slave_send_ack", (char*) &rpl_semi_sync_slave_send_ack, SHOW_LONGLONG}, +#endif /* HAVE_REPLICATION */ #ifdef HAVE_QUERY_CACHE {"Qcache_free_blocks", (char*) &query_cache.free_memory_blocks, SHOW_LONG_NOFLUSH}, {"Qcache_free_memory", (char*) &query_cache.free_memory, SHOW_LONG_NOFLUSH}, @@ -10329,6 +10389,10 @@ PSI_stage_info stage_waiting_for_insert= { 0, "Waiting for INSERT", 0}; PSI_stage_info stage_waiting_for_master_to_send_event= { 0, "Waiting for master to send event", 0}; PSI_stage_info stage_waiting_for_master_update= { 0, "Waiting for master update", 0}; PSI_stage_info stage_waiting_for_relay_log_space= { 0, "Waiting for the slave SQL thread to free enough relay log space", 0}; +PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave= +{ 0, "Waiting for semi-sync ACK from slave", 0}; +PSI_stage_info stage_waiting_for_semi_sync_slave={ 0, "Waiting for semi-sync slave connection", 0}; +PSI_stage_info stage_reading_semi_sync_ack={ 0, "Reading semi-sync ACK from slave", 0}; PSI_stage_info stage_waiting_for_slave_mutex_on_exit= { 0, "Waiting for slave mutex on exit", 0}; PSI_stage_info stage_waiting_for_slave_thread_to_start= { 0, "Waiting for slave thread to start", 0}; PSI_stage_info stage_waiting_for_table_flush= { 0, "Waiting for table flush", 0}; @@ -10489,6 +10553,9 @@ PSI_stage_info *all_server_stages[]= & stage_gtid_wait_other_connection, & stage_slave_background_process_request, & stage_slave_background_wait_request, + & stage_waiting_for_semi_sync_ack_from_slave, + & stage_waiting_for_semi_sync_slave, + & stage_reading_semi_sync_ack, & stage_waiting_for_deadlock_kill }; diff --git a/sql/mysqld.h b/sql/mysqld.h index 2463f569c94..a5857394b7a 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -329,7 +329,7 @@ extern PSI_mutex_key key_LOCK_des_key_file; extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list, key_BINLOG_LOCK_binlog_background_thread, - m_key_LOCK_binlog_end_pos, + key_LOCK_binlog_end_pos, key_delayed_insert_mutex, key_hash_filo_lock, key_LOCK_active_mi, key_LOCK_connection_count, key_LOCK_crypt, key_LOCK_delayed_create, key_LOCK_delayed_insert, key_LOCK_delayed_status, key_LOCK_error_log, @@ -349,6 +349,7 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list, key_LOCK_start_thread, key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc; extern PSI_mutex_key key_RELAYLOG_LOCK_index; +extern PSI_mutex_key key_LOCK_relaylog_end_pos; extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state, key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool, key_LOCK_parallel_entry; @@ -383,7 +384,8 @@ extern PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond, key_TABLE_SHARE_cond, key_user_level_lock_cond, key_COND_start_thread, key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache; -extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready, +extern PSI_cond_key key_RELAYLOG_COND_relay_log_updated, + key_RELAYLOG_COND_bin_log_updated, key_COND_wakeup_ready, key_COND_wait_commit; extern PSI_cond_key key_RELAYLOG_COND_queue_busy; extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy; diff --git a/sql/replication.h b/sql/replication.h index 4731c2246ef..d8672310110 100644 --- a/sql/replication.h +++ b/sql/replication.h @@ -18,16 +18,14 @@ /*************************************************************************** NOTE: plugin locking. - This API was created specifically for the semisync plugin and its locking - logic is also matches semisync plugin usage pattern. In particular, a plugin - is locked on Binlog_transmit_observer::transmit_start and is unlocked after - Binlog_transmit_observer::transmit_stop. All other master observable events - happen between these two and don't lock the plugin at all. This works well - for the semisync_master plugin. + + The plugin is locked on Binlog_transmit_observer::transmit_start and is + unlocked after Binlog_transmit_observer::transmit_stop. All other + master observable events happen between these two and don't lock the + plugin at all. Also a plugin is locked on Binlog_relay_IO_observer::thread_start - and unlocked after Binlog_relay_IO_observer::thread_stop. This works well for - the semisync_slave plugin. + and unlocked after Binlog_relay_IO_observer::thread_stop. ***************************************************************************/ #include <mysql.h> diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc deleted file mode 100644 index e3ff2a17a6a..00000000000 --- a/sql/rpl_handler.cc +++ /dev/null @@ -1,553 +0,0 @@ -/* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ - -#include "mariadb.h" -#include "sql_priv.h" -#include "unireg.h" - -#include "rpl_mi.h" -#include "sql_repl.h" -#include "log_event.h" -#include "rpl_filter.h" -#include <my_dir.h> -#include "rpl_handler.h" - -Trans_delegate *transaction_delegate; -Binlog_storage_delegate *binlog_storage_delegate; -#ifdef HAVE_REPLICATION -Binlog_transmit_delegate *binlog_transmit_delegate; -Binlog_relay_IO_delegate *binlog_relay_io_delegate; -#endif /* HAVE_REPLICATION */ - -/* - structure to save transaction log filename and position -*/ -typedef struct Trans_binlog_info { - my_off_t log_pos; - char log_file[FN_REFLEN]; -} Trans_binlog_info; - -int get_user_var_int(const char *name, - long long int *value, int *null_value) -{ - bool null_val; - user_var_entry *entry= - (user_var_entry*) my_hash_search(¤t_thd->user_vars, - (uchar*) name, strlen(name)); - if (!entry) - return 1; - *value= entry->val_int(&null_val); - if (null_value) - *null_value= null_val; - return 0; -} - -int get_user_var_real(const char *name, - double *value, int *null_value) -{ - bool null_val; - user_var_entry *entry= - (user_var_entry*) my_hash_search(¤t_thd->user_vars, - (uchar*) name, strlen(name)); - if (!entry) - return 1; - *value= entry->val_real(&null_val); - if (null_value) - *null_value= null_val; - return 0; -} - -int get_user_var_str(const char *name, char *value, - size_t len, unsigned int precision, int *null_value) -{ - String str; - bool null_val; - user_var_entry *entry= - (user_var_entry*) my_hash_search(¤t_thd->user_vars, - (uchar*) name, strlen(name)); - if (!entry) - return 1; - entry->val_str(&null_val, &str, precision); - strncpy(value, str.c_ptr(), len); - if (null_value) - *null_value= null_val; - return 0; -} - -int delegates_init() -{ - static my_aligned_storage<sizeof(Trans_delegate), MY_ALIGNOF(long)> trans_mem; - static my_aligned_storage<sizeof(Binlog_storage_delegate), - MY_ALIGNOF(long)> storage_mem; -#ifdef HAVE_REPLICATION - static my_aligned_storage<sizeof(Binlog_transmit_delegate), - MY_ALIGNOF(long)> transmit_mem; - static my_aligned_storage<sizeof(Binlog_relay_IO_delegate), - MY_ALIGNOF(long)> relay_io_mem; -#endif - - void *place_trans_mem= trans_mem.data; - void *place_storage_mem= storage_mem.data; - - transaction_delegate= new (place_trans_mem) Trans_delegate; - - if (!transaction_delegate->is_inited()) - { - sql_print_error("Initialization of transaction delegates failed. " - "Please report a bug."); - return 1; - } - - binlog_storage_delegate= new (place_storage_mem) Binlog_storage_delegate; - - if (!binlog_storage_delegate->is_inited()) - { - sql_print_error("Initialization binlog storage delegates failed. " - "Please report a bug."); - return 1; - } - -#ifdef HAVE_REPLICATION - void *place_transmit_mem= transmit_mem.data; - void *place_relay_io_mem= relay_io_mem.data; - - binlog_transmit_delegate= new (place_transmit_mem) Binlog_transmit_delegate; - - if (!binlog_transmit_delegate->is_inited()) - { - sql_print_error("Initialization of binlog transmit delegates failed. " - "Please report a bug."); - return 1; - } - - binlog_relay_io_delegate= new (place_relay_io_mem) Binlog_relay_IO_delegate; - - if (!binlog_relay_io_delegate->is_inited()) - { - sql_print_error("Initialization binlog relay IO delegates failed. " - "Please report a bug."); - return 1; - } -#endif - - return 0; -} - -void delegates_destroy() -{ - if (transaction_delegate) - transaction_delegate->~Trans_delegate(); - if (binlog_storage_delegate) - binlog_storage_delegate->~Binlog_storage_delegate(); -#ifdef HAVE_REPLICATION - if (binlog_transmit_delegate) - binlog_transmit_delegate->~Binlog_transmit_delegate(); - if (binlog_relay_io_delegate) - binlog_relay_io_delegate->~Binlog_relay_IO_delegate(); -#endif /* HAVE_REPLICATION */ -} - -/* - This macro is used by almost all the Delegate methods to iterate - over all the observers running given callback function of the - delegate. - */ -#define FOREACH_OBSERVER(r, f, do_lock, args) \ - param.server_id= thd->variables.server_id; \ - read_lock(); \ - Observer_info_iterator iter= observer_info_iter(); \ - Observer_info *info= iter++; \ - for (; info; info= iter++) \ - { \ - if (do_lock) plugin_lock(thd, plugin_int_to_ref(info->plugin_int)); \ - if (((Observer *)info->observer)->f \ - && ((Observer *)info->observer)->f args) \ - { \ - r= 1; \ - sql_print_error("Run function '" #f "' in plugin '%s' failed", \ - info->plugin_int->name.str); \ - break; \ - } \ - } \ - unlock(); - - -int Trans_delegate::after_commit(THD *thd, bool all) -{ - Trans_param param; - Trans_binlog_info *log_info; - bool is_real_trans= (all || thd->transaction.all.ha_list == 0); - int ret= 0; - - param.flags = is_real_trans ? TRANS_IS_REAL_TRANS : 0; - - log_info= thd->semisync_info; - - param.log_file= log_info && log_info->log_file[0] ? log_info->log_file : 0; - param.log_pos= log_info ? log_info->log_pos : 0; - - FOREACH_OBSERVER(ret, after_commit, false, (¶m)); - - /* - This is the end of a real transaction or autocommit statement, we - can mark the memory unused. - */ - if (is_real_trans && log_info) - { - log_info->log_file[0]= 0; - log_info->log_pos= 0; - } - return ret; -} - -int Trans_delegate::after_rollback(THD *thd, bool all) -{ - Trans_param param; - Trans_binlog_info *log_info; - bool is_real_trans= (all || thd->transaction.all.ha_list == 0); - int ret= 0; - - param.flags = is_real_trans ? TRANS_IS_REAL_TRANS : 0; - - log_info= thd->semisync_info; - - param.log_file= log_info && log_info->log_file[0] ? log_info->log_file : 0; - param.log_pos= log_info ? log_info->log_pos : 0; - - FOREACH_OBSERVER(ret, after_rollback, false, (¶m)); - - /* - This is the end of a real transaction or autocommit statement, we - can mark the memory unused. - */ - if (is_real_trans && log_info) - { - log_info->log_file[0]= 0; - log_info->log_pos= 0; - } - return ret; -} - -int Binlog_storage_delegate::after_flush(THD *thd, - const char *log_file, - my_off_t log_pos, - bool synced, - bool first_in_group, - bool last_in_group) -{ - Binlog_storage_param param; - Trans_binlog_info *log_info; - uint32 flags=0; - int ret= 0; - - if (synced) - flags |= BINLOG_STORAGE_IS_SYNCED; - if (first_in_group) - flags|= BINLOG_GROUP_COMMIT_LEADER; - if (last_in_group) - flags|= BINLOG_GROUP_COMMIT_TRAILER; - - if (!(log_info= thd->semisync_info)) - { - if(!(log_info= - (Trans_binlog_info*) my_malloc(sizeof(Trans_binlog_info), MYF(0)))) - return 1; - thd->semisync_info= log_info; - } - - strmake_buf(log_info->log_file, log_file+dirname_length(log_file)); - log_info->log_pos = log_pos; - - FOREACH_OBSERVER(ret, after_flush, false, - (¶m, log_info->log_file, log_info->log_pos, flags)); - return ret; -} - -int Binlog_storage_delegate::after_sync(THD *thd, - const char *log_file, - my_off_t log_pos, - bool first_in_group, - bool last_in_group) -{ - Binlog_storage_param param; - uint32 flags=0; - - if (first_in_group) - flags|= BINLOG_GROUP_COMMIT_LEADER; - if (last_in_group) - flags|= BINLOG_GROUP_COMMIT_TRAILER; - - int ret= 0; - FOREACH_OBSERVER(ret, after_sync, false, - (¶m, log_file+dirname_length(log_file), log_pos, flags)); - - return ret; -} - -#ifdef HAVE_REPLICATION -int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags, - const char *log_file, - my_off_t log_pos) -{ - Binlog_transmit_param param; - param.flags= flags; - - int ret= 0; - FOREACH_OBSERVER(ret, transmit_start, true, (¶m, log_file, log_pos)); - return ret; -} - -int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags) -{ - Binlog_transmit_param param; - param.flags= flags; - - int ret= 0; - FOREACH_OBSERVER(ret, transmit_stop, false, (¶m)); - return ret; -} - -int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags, - String *packet) -{ - /* NOTE2ME: Maximum extra header size for each observer, I hope 32 - bytes should be enough for each Observer to reserve their extra - header. If later found this is not enough, we can increase this - /HEZX - */ -#define RESERVE_HEADER_SIZE 32 - unsigned char header[RESERVE_HEADER_SIZE]; - ulong hlen; - Binlog_transmit_param param; - param.flags= flags; - param.server_id= thd->variables.server_id; - - int ret= 0; - read_lock(); - Observer_info_iterator iter= observer_info_iter(); - Observer_info *info= iter++; - for (; info; info= iter++) - { - hlen= 0; - if (((Observer *)info->observer)->reserve_header - && ((Observer *)info->observer)->reserve_header(¶m, - header, - RESERVE_HEADER_SIZE, - &hlen)) - { - ret= 1; - break; - } - if (hlen == 0) - continue; - if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen)) - { - ret= 1; - break; - } - } - unlock(); - return ret; -} - -int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags, - String *packet, - const char *log_file, - my_off_t log_pos) -{ - Binlog_transmit_param param; - param.flags= flags; - - int ret= 0; - FOREACH_OBSERVER(ret, before_send_event, false, - (¶m, (uchar *)packet->c_ptr(), - packet->length(), - log_file+dirname_length(log_file), log_pos)); - return ret; -} - -int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags, - String *packet) -{ - Binlog_transmit_param param; - param.flags= flags; - - int ret= 0; - FOREACH_OBSERVER(ret, after_send_event, false, - (¶m, packet->c_ptr(), packet->length())); - return ret; -} - -int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags) - -{ - Binlog_transmit_param param; - param.flags= flags; - - int ret= 0; - FOREACH_OBSERVER(ret, after_reset_master, false, (¶m)); - return ret; -} - -void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param, - Master_info *mi) -{ - param->mysql= mi->mysql; - param->user= mi->user; - param->host= mi->host; - param->port= mi->port; - param->master_log_name= mi->master_log_name; - param->master_log_pos= mi->master_log_pos; -} - -int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi) -{ - Binlog_relay_IO_param param; - init_param(¶m, mi); - - int ret= 0; - FOREACH_OBSERVER(ret, thread_start, true, (¶m)); - return ret; -} - - -int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi) -{ - - Binlog_relay_IO_param param; - init_param(¶m, mi); - - int ret= 0; - FOREACH_OBSERVER(ret, thread_stop, false, (¶m)); - return ret; -} - -int Binlog_relay_IO_delegate::before_request_transmit(THD *thd, - Master_info *mi, - ushort flags) -{ - Binlog_relay_IO_param param; - init_param(¶m, mi); - - int ret= 0; - FOREACH_OBSERVER(ret, before_request_transmit, false, (¶m, (uint32)flags)); - return ret; -} - -int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi, - const char *packet, ulong len, - const char **event_buf, - ulong *event_len) -{ - Binlog_relay_IO_param param; - init_param(¶m, mi); - - int ret= 0; - FOREACH_OBSERVER(ret, after_read_event, false, - (¶m, packet, len, event_buf, event_len)); - return ret; -} - -int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi, - const char *event_buf, - ulong event_len, - bool synced) -{ - Binlog_relay_IO_param param; - init_param(¶m, mi); - - uint32 flags=0; - if (synced) - flags |= BINLOG_STORAGE_IS_SYNCED; - - int ret= 0; - FOREACH_OBSERVER(ret, after_queue_event, false, - (¶m, event_buf, event_len, flags)); - return ret; -} - -int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi) - -{ - Binlog_relay_IO_param param; - init_param(¶m, mi); - - int ret= 0; - FOREACH_OBSERVER(ret, after_reset_slave, false, (¶m)); - return ret; -} -#endif /* HAVE_REPLICATION */ - -int register_trans_observer(Trans_observer *observer, void *p) -{ - return transaction_delegate->add_observer(observer, (st_plugin_int *)p); -} - -int unregister_trans_observer(Trans_observer *observer, void *p) -{ - return transaction_delegate->remove_observer(observer, (st_plugin_int *)p); -} - -int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p) -{ - return binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p); -} - -int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p) -{ - return binlog_storage_delegate->remove_observer(observer, (st_plugin_int *)p); -} - -#ifdef HAVE_REPLICATION -int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p) -{ - return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p); -} - -int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p) -{ - return binlog_transmit_delegate->remove_observer(observer, (st_plugin_int *)p); -} - -int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p) -{ - return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p); -} - -int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p) -{ - return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p); -} -#else -int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p) -{ - return 0; -} - -int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p) -{ - return 0; -} - -int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p) -{ - return 0; -} - -int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p) -{ - return 0; -} -#endif /* HAVE_REPLICATION */ diff --git a/sql/rpl_handler.h b/sql/rpl_handler.h deleted file mode 100644 index afcfd9d55b1..00000000000 --- a/sql/rpl_handler.h +++ /dev/null @@ -1,216 +0,0 @@ -/* Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved. - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ - -#ifndef RPL_HANDLER_H -#define RPL_HANDLER_H - -#include "sql_priv.h" -#include "rpl_mi.h" -#include "rpl_rli.h" -#include "sql_plugin.h" -#include "replication.h" - -class Observer_info { -public: - void *observer; - st_plugin_int *plugin_int; - - Observer_info(void *ob, st_plugin_int *p) - :observer(ob), plugin_int(p) - { } -}; - -class Delegate { -public: - typedef List<Observer_info> Observer_info_list; - typedef List_iterator<Observer_info> Observer_info_iterator; - - int add_observer(void *observer, st_plugin_int *plugin) - { - int ret= FALSE; - if (!inited) - return TRUE; - write_lock(); - Observer_info_iterator iter(observer_info_list); - Observer_info *info= iter++; - while (info && info->observer != observer) - info= iter++; - if (!info) - { - info= new Observer_info(observer, plugin); - if (!info || observer_info_list.push_back(info, &memroot)) - ret= TRUE; - } - else - ret= TRUE; - unlock(); - return ret; - } - - int remove_observer(void *observer, st_plugin_int *plugin) - { - int ret= FALSE; - if (!inited) - return TRUE; - write_lock(); - Observer_info_iterator iter(observer_info_list); - Observer_info *info= iter++; - while (info && info->observer != observer) - info= iter++; - if (info) - { - iter.remove(); - delete info; - } - else - ret= TRUE; - unlock(); - return ret; - } - - inline Observer_info_iterator observer_info_iter() - { - return Observer_info_iterator(observer_info_list); - } - - inline bool is_empty() - { - return observer_info_list.is_empty(); - } - - inline int read_lock() - { - if (!inited) - return TRUE; - return rw_rdlock(&lock); - } - - inline int write_lock() - { - if (!inited) - return TRUE; - return rw_wrlock(&lock); - } - - inline int unlock() - { - if (!inited) - return TRUE; - return rw_unlock(&lock); - } - - inline bool is_inited() - { - return inited; - } - - Delegate() - { - inited= FALSE; - if (my_rwlock_init(&lock, NULL)) - return; - init_sql_alloc(&memroot, 1024, 0, MYF(0)); - inited= TRUE; - } - ~Delegate() - { - inited= FALSE; - rwlock_destroy(&lock); - free_root(&memroot, MYF(0)); - } - -private: - Observer_info_list observer_info_list; - rw_lock_t lock; - MEM_ROOT memroot; - bool inited; -}; - -class Trans_delegate - :public Delegate { -public: - typedef Trans_observer Observer; - int before_commit(THD *thd, bool all); - int before_rollback(THD *thd, bool all); - int after_commit(THD *thd, bool all); - int after_rollback(THD *thd, bool all); -}; - -class Binlog_storage_delegate - :public Delegate { -public: - typedef Binlog_storage_observer Observer; - int after_flush(THD *thd, const char *log_file, - my_off_t log_pos, bool synced, - bool first_in_group, bool last_in_group); - int after_sync(THD *thd, const char *log_file, my_off_t log_pos, - bool first_in_group, bool last_in_group); -}; - -#ifdef HAVE_REPLICATION -class Binlog_transmit_delegate - :public Delegate { -public: - typedef Binlog_transmit_observer Observer; - int transmit_start(THD *thd, ushort flags, - const char *log_file, my_off_t log_pos); - int transmit_stop(THD *thd, ushort flags); - int reserve_header(THD *thd, ushort flags, String *packet); - int before_send_event(THD *thd, ushort flags, - String *packet, const - char *log_file, my_off_t log_pos ); - int after_send_event(THD *thd, ushort flags, - String *packet); - int after_reset_master(THD *thd, ushort flags); -}; - -class Binlog_relay_IO_delegate - :public Delegate { -public: - typedef Binlog_relay_IO_observer Observer; - int thread_start(THD *thd, Master_info *mi); - int thread_stop(THD *thd, Master_info *mi); - int before_request_transmit(THD *thd, Master_info *mi, ushort flags); - int after_read_event(THD *thd, Master_info *mi, - const char *packet, ulong len, - const char **event_buf, ulong *event_len); - int after_queue_event(THD *thd, Master_info *mi, - const char *event_buf, ulong event_len, - bool synced); - int after_reset_slave(THD *thd, Master_info *mi); -private: - void init_param(Binlog_relay_IO_param *param, Master_info *mi); -}; -#endif /* HAVE_REPLICATION */ - -int delegates_init(); -void delegates_destroy(); - -extern Trans_delegate *transaction_delegate; -extern Binlog_storage_delegate *binlog_storage_delegate; -#ifdef HAVE_REPLICATION -extern Binlog_transmit_delegate *binlog_transmit_delegate; -extern Binlog_relay_IO_delegate *binlog_relay_io_delegate; -#endif /* HAVE_REPLICATION */ - -/* - if there is no observers in the delegate, we can return 0 - immediately. -*/ -#define RUN_HOOK(group, hook, args) \ - (group ##_delegate->is_empty() ? \ - 0 : group ##_delegate->hook args) - -#endif /* RPL_HANDLER_H */ diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h index 610bc77b683..14d74dc4bb7 100644 --- a/sql/rpl_mi.h +++ b/sql/rpl_mi.h @@ -311,6 +311,11 @@ class Master_info : public Slave_reporting_capability /* The parallel replication mode. */ enum_slave_parallel_mode parallel_mode; + /* + semi_ack is used to identify if the current binlog event needs an + ACK from slave, or if delay_master is enabled. + */ + int semi_ack; }; int init_master_info(Master_info* mi, const char* master_info_fname, diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 4a6e813d73b..9ebd19a90e2 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -255,10 +255,8 @@ signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi, int err) rgi->rli->abort_slave= true; rgi->rli->stop_for_until= false; mysql_mutex_lock(rgi->rli->relay_log.get_log_lock()); + rgi->rli->relay_log.signal_relay_log_update(); mysql_mutex_unlock(rgi->rli->relay_log.get_log_lock()); - rgi->rli->relay_log.lock_binlog_end_pos(); - rgi->rli->relay_log.signal_update(); - rgi->rli->relay_log.unlock_binlog_end_pos(); } @@ -823,7 +821,7 @@ do_retry: for (;;) { old_offset= cur_offset; - ev= Log_event::read_log_event(&rlog, 0, description_event, + ev= Log_event::read_log_event(&rlog, description_event, opt_slave_sql_verify_checksum); cur_offset= my_b_tell(&rlog); diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index efb256fbe11..321eef97700 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -70,10 +70,12 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) relay_log_state.init(); #ifdef HAVE_PSI_INTERFACE relay_log.set_psi_keys(key_RELAYLOG_LOCK_index, - key_RELAYLOG_update_cond, + key_RELAYLOG_COND_relay_log_updated, + key_RELAYLOG_COND_bin_log_updated, key_file_relaylog, key_file_relaylog_index, - key_RELAYLOG_COND_queue_busy); + key_RELAYLOG_COND_queue_busy, + key_LOCK_relaylog_end_pos); #endif group_relay_log_name[0]= event_relay_log_name[0]= @@ -538,7 +540,7 @@ read_relay_log_description_event(IO_CACHE *cur_log, ulonglong start_pos, if (my_b_tell(cur_log) >= start_pos) break; - if (!(ev= Log_event::read_log_event(cur_log, 0, fdev, + if (!(ev= Log_event::read_log_event(cur_log, fdev, opt_slave_sql_verify_checksum))) { DBUG_PRINT("info",("could not read event, cur_log->error=%d", diff --git a/sql/semisync.cc b/sql/semisync.cc new file mode 100644 index 00000000000..a8a11f091db --- /dev/null +++ b/sql/semisync.cc @@ -0,0 +1,32 @@ +/* Copyright (C) 2007 Google Inc. + Copyright (C) 2008 MySQL AB + Use is subject to license terms + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + + +#include <my_global.h> +#include "semisync.h" + +const unsigned char Repl_semi_sync_base::k_packet_magic_num= 0xef; +const unsigned char Repl_semi_sync_base::k_packet_flag_sync= 0x01; + + +const unsigned long Trace::k_trace_general= 0x0001; +const unsigned long Trace::k_trace_detail= 0x0010; +const unsigned long Trace::k_trace_net_wait= 0x0020; +const unsigned long Trace::k_trace_function= 0x0040; + +const unsigned char Repl_semi_sync_base::k_sync_header[2]= + {Repl_semi_sync_base::k_packet_magic_num, 0}; diff --git a/sql/semisync.h b/sql/semisync.h new file mode 100644 index 00000000000..9deb6c5fd01 --- /dev/null +++ b/sql/semisync.h @@ -0,0 +1,73 @@ +/* Copyright (C) 2007 Google Inc. + Copyright (C) 2008 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + + +#ifndef SEMISYNC_H +#define SEMISYNC_H + +#include "mysqld.h" +#include "log_event.h" +#include "replication.h" + +/** + This class is used to trace function calls and other process + information +*/ +class Trace { +public: + static const unsigned long k_trace_function; + static const unsigned long k_trace_general; + static const unsigned long k_trace_detail; + static const unsigned long k_trace_net_wait; + + unsigned long m_trace_level; /* the level for tracing */ + + Trace() + :m_trace_level(0L) + {} + Trace(unsigned long trace_level) + :m_trace_level(trace_level) + {} +}; + +/** + Base class for semi-sync master and slave classes +*/ +class Repl_semi_sync_base + :public Trace { +public: + static const unsigned char k_sync_header[2]; /* three byte packet header */ + + /* Constants in network packet header. */ + static const unsigned char k_packet_magic_num; + static const unsigned char k_packet_flag_sync; +}; + +/* The layout of a semisync slave reply packet: + 1 byte for the magic num + 8 bytes for the binlog positon + n bytes for the binlog filename, terminated with a '\0' +*/ +#define REPLY_MAGIC_NUM_LEN 1 +#define REPLY_BINLOG_POS_LEN 8 +#define REPLY_BINLOG_NAME_LEN (FN_REFLEN + 1) +#define REPLY_MAGIC_NUM_OFFSET 0 +#define REPLY_BINLOG_POS_OFFSET (REPLY_MAGIC_NUM_OFFSET + REPLY_MAGIC_NUM_LEN) +#define REPLY_BINLOG_NAME_OFFSET (REPLY_BINLOG_POS_OFFSET + REPLY_BINLOG_POS_LEN) +#define REPLY_MESSAGE_MAX_LENGTH \ + (REPLY_MAGIC_NUM_LEN + REPLY_BINLOG_POS_LEN + REPLY_BINLOG_NAME_LEN) + +#endif /* SEMISYNC_H */ diff --git a/sql/semisync_master.cc b/sql/semisync_master.cc new file mode 100644 index 00000000000..99d8b75ece8 --- /dev/null +++ b/sql/semisync_master.cc @@ -0,0 +1,1352 @@ +/* Copyright (C) 2007 Google Inc. + Copyright (c) 2008, 2013, Oracle and/or its affiliates. + Copyright (c) 2011, 2016, MariaDB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + + +#include <my_global.h> +#include "semisync_master.h" + +#define TIME_THOUSAND 1000 +#define TIME_MILLION 1000000 +#define TIME_BILLION 1000000000 + +/* This indicates whether semi-synchronous replication is enabled. */ +my_bool rpl_semi_sync_master_enabled= 0; +unsigned long long rpl_semi_sync_master_request_ack = 0; +unsigned long long rpl_semi_sync_master_get_ack = 0; +my_bool rpl_semi_sync_master_wait_no_slave = 1; +my_bool rpl_semi_sync_master_status = 0; +ulong rpl_semi_sync_master_wait_point = + SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT; +ulong rpl_semi_sync_master_timeout; +ulong rpl_semi_sync_master_trace_level; +ulong rpl_semi_sync_master_yes_transactions = 0; +ulong rpl_semi_sync_master_no_transactions = 0; +ulong rpl_semi_sync_master_off_times = 0; +ulong rpl_semi_sync_master_timefunc_fails = 0; +ulong rpl_semi_sync_master_wait_timeouts = 0; +ulong rpl_semi_sync_master_wait_sessions = 0; +ulong rpl_semi_sync_master_wait_pos_backtraverse = 0; +ulong rpl_semi_sync_master_avg_trx_wait_time = 0; +ulonglong rpl_semi_sync_master_trx_wait_num = 0; +ulong rpl_semi_sync_master_avg_net_wait_time = 0; +ulonglong rpl_semi_sync_master_net_wait_num = 0; +ulong rpl_semi_sync_master_clients = 0; +ulonglong rpl_semi_sync_master_net_wait_time = 0; +ulonglong rpl_semi_sync_master_trx_wait_time = 0; + +Repl_semi_sync_master repl_semisync_master; +Ack_receiver ack_receiver; + +/* + structure to save transaction log filename and position +*/ +typedef struct Trans_binlog_info { + my_off_t log_pos; + char log_file[FN_REFLEN]; +} Trans_binlog_info; + +static int get_wait_time(const struct timespec& start_ts); + +static ulonglong timespec_to_usec(const struct timespec *ts) +{ + return (ulonglong) ts->tv_sec * TIME_MILLION + ts->tv_nsec / TIME_THOUSAND; +} + +/******************************************************************************* + * + * <Active_tranx> class : manage all active transaction nodes + * + ******************************************************************************/ + +Active_tranx::Active_tranx(mysql_mutex_t *lock, + ulong trace_level) + : Trace(trace_level), m_allocator(max_connections), + m_num_entries(max_connections << 1), /* Transaction hash table size + * is set to double the size + * of max_connections */ + m_lock(lock) +{ + /* No transactions are in the list initially. */ + m_trx_front = NULL; + m_trx_rear = NULL; + + /* Create the hash table to find a transaction's ending event. */ + m_trx_htb = new Tranx_node *[m_num_entries]; + for (int idx = 0; idx < m_num_entries; ++idx) + m_trx_htb[idx] = NULL; + + sql_print_information("Semi-sync replication initialized for transactions."); +} + +Active_tranx::~Active_tranx() +{ + delete [] m_trx_htb; + m_trx_htb = NULL; + m_num_entries = 0; +} + +unsigned int Active_tranx::calc_hash(const unsigned char *key, + unsigned int length) +{ + unsigned int nr = 1, nr2 = 4; + + /* The hash implementation comes from calc_hashnr() in mysys/hash.c. */ + while (length--) + { + nr ^= (((nr & 63)+nr2)*((unsigned int) (unsigned char) *key++))+ (nr << 8); + nr2 += 3; + } + return((unsigned int) nr); +} + +unsigned int Active_tranx::get_hash_value(const char *log_file_name, + my_off_t log_file_pos) +{ + unsigned int hash1 = calc_hash((const unsigned char *)log_file_name, + strlen(log_file_name)); + unsigned int hash2 = calc_hash((const unsigned char *)(&log_file_pos), + sizeof(log_file_pos)); + + return (hash1 + hash2) % m_num_entries; +} + +int Active_tranx::compare(const char *log_file_name1, my_off_t log_file_pos1, + const char *log_file_name2, my_off_t log_file_pos2) +{ + int cmp = strcmp(log_file_name1, log_file_name2); + + if (cmp != 0) + return cmp; + + if (log_file_pos1 > log_file_pos2) + return 1; + else if (log_file_pos1 < log_file_pos2) + return -1; + return 0; +} + +int Active_tranx::insert_tranx_node(const char *log_file_name, + my_off_t log_file_pos) +{ + Tranx_node *ins_node; + int result = 0; + unsigned int hash_val; + + DBUG_ENTER("Active_tranx:insert_tranx_node"); + + ins_node = m_allocator.allocate_node(); + if (!ins_node) + { + sql_print_error("%s: transaction node allocation failed for: (%s, %lu)", + "Active_tranx:insert_tranx_node", + log_file_name, (ulong)log_file_pos); + result = -1; + goto l_end; + } + + /* insert the binlog position in the active transaction list. */ + strncpy(ins_node->log_name, log_file_name, FN_REFLEN-1); + ins_node->log_name[FN_REFLEN-1] = 0; /* make sure it ends properly */ + ins_node->log_pos = log_file_pos; + + if (!m_trx_front) + { + /* The list is empty. */ + m_trx_front = m_trx_rear = ins_node; + } + else + { + int cmp = compare(ins_node, m_trx_rear); + if (cmp > 0) + { + /* Compare with the tail first. If the transaction happens later in + * binlog, then make it the new tail. + */ + m_trx_rear->next = ins_node; + m_trx_rear = ins_node; + } + else + { + /* Otherwise, it is an error because the transaction should hold the + * mysql_bin_log.LOCK_log when appending events. + */ + sql_print_error("%s: binlog write out-of-order, tail (%s, %lu), " + "new node (%s, %lu)", "Active_tranx:insert_tranx_node", + m_trx_rear->log_name, (ulong)m_trx_rear->log_pos, + ins_node->log_name, (ulong)ins_node->log_pos); + result = -1; + goto l_end; + } + } + + hash_val = get_hash_value(ins_node->log_name, ins_node->log_pos); + ins_node->hash_next = m_trx_htb[hash_val]; + m_trx_htb[hash_val] = ins_node; + + DBUG_PRINT("semisync", ("%s: insert (%s, %lu) in entry(%u)", + "Active_tranx:insert_tranx_node", + ins_node->log_name, (ulong)ins_node->log_pos, + hash_val)); + l_end: + + DBUG_RETURN(result); +} + +bool Active_tranx::is_tranx_end_pos(const char *log_file_name, + my_off_t log_file_pos) +{ + DBUG_ENTER("Active_tranx::is_tranx_end_pos"); + + unsigned int hash_val = get_hash_value(log_file_name, log_file_pos); + Tranx_node *entry = m_trx_htb[hash_val]; + + while (entry != NULL) + { + if (compare(entry, log_file_name, log_file_pos) == 0) + break; + + entry = entry->hash_next; + } + + DBUG_PRINT("semisync", ("%s: probe (%s, %lu) in entry(%u)", + "Active_tranx::is_tranx_end_pos", + log_file_name, (ulong)log_file_pos, hash_val)); + + DBUG_RETURN(entry != NULL); +} + +int Active_tranx::clear_active_tranx_nodes(const char *log_file_name, + my_off_t log_file_pos) +{ + Tranx_node *new_front; + + DBUG_ENTER("Active_tranx::::clear_active_tranx_nodes"); + + if (log_file_name != NULL) + { + new_front = m_trx_front; + + while (new_front) + { + if (compare(new_front, log_file_name, log_file_pos) > 0) + break; + new_front = new_front->next; + } + } + else + { + /* If log_file_name is NULL, clear everything. */ + new_front = NULL; + } + + if (new_front == NULL) + { + /* No active transaction nodes after the call. */ + + /* Clear the hash table. */ + memset(m_trx_htb, 0, m_num_entries * sizeof(Tranx_node *)); + m_allocator.free_all_nodes(); + + /* Clear the active transaction list. */ + if (m_trx_front != NULL) + { + m_trx_front = NULL; + m_trx_rear = NULL; + } + + DBUG_PRINT("semisync", ("%s: cleared all nodes", + "Active_tranx::::clear_active_tranx_nodes")); + } + else if (new_front != m_trx_front) + { + Tranx_node *curr_node, *next_node; + + /* Delete all transaction nodes before the confirmation point. */ + int n_frees = 0; + curr_node = m_trx_front; + while (curr_node != new_front) + { + next_node = curr_node->next; + n_frees++; + + /* Remove the node from the hash table. */ + unsigned int hash_val = get_hash_value(curr_node->log_name, curr_node->log_pos); + Tranx_node **hash_ptr = &(m_trx_htb[hash_val]); + while ((*hash_ptr) != NULL) + { + if ((*hash_ptr) == curr_node) + { + (*hash_ptr) = curr_node->hash_next; + break; + } + hash_ptr = &((*hash_ptr)->hash_next); + } + + curr_node = next_node; + } + + m_trx_front = new_front; + m_allocator.free_nodes_before(m_trx_front); + + DBUG_PRINT("semisync", ("%s: cleared %d nodes back until pos (%s, %lu)", + "Active_tranx::::clear_active_tranx_nodes", + n_frees, + m_trx_front->log_name, (ulong)m_trx_front->log_pos)); + } + + DBUG_RETURN(0); +} + + +/******************************************************************************* + * + * <Repl_semi_sync_master> class: the basic code layer for syncsync master. + * <Repl_semi_sync_slave> class: the basic code layer for syncsync slave. + * + * The most important functions during semi-syn replication listed: + * + * Master: + * . report_reply_binlog(): called by the binlog dump thread when it receives + * the slave's status information. + * . update_sync_header(): based on transaction waiting information, decide + * whether to request the slave to reply. + * . write_tranx_in_binlog(): called by the transaction thread when it finishes + * writing all transaction events in binlog. + * . commit_trx(): transaction thread wait for the slave reply. + * + * Slave: + * . slave_read_sync_header(): read the semi-sync header from the master, get + * the sync status and get the payload for events. + * . slave_reply(): reply to the master about the replication progress. + * + ******************************************************************************/ + +Repl_semi_sync_master::Repl_semi_sync_master() + : m_active_tranxs(NULL), + m_init_done(false), + m_reply_file_name_inited(false), + m_reply_file_pos(0L), + m_wait_file_name_inited(false), + m_wait_file_pos(0), + m_master_enabled(false), + m_wait_timeout(0L), + m_state(0), + m_wait_point(0) +{ + strcpy(m_reply_file_name, ""); + strcpy(m_wait_file_name, ""); +} + +int Repl_semi_sync_master::init_object() +{ + int result; + + m_init_done = true; + + /* References to the parameter works after set_options(). */ + set_wait_timeout(rpl_semi_sync_master_timeout); + set_trace_level(rpl_semi_sync_master_trace_level); + set_wait_point(rpl_semi_sync_master_wait_point); + + /* Mutex initialization can only be done after MY_INIT(). */ + mysql_mutex_init(key_LOCK_binlog, + &LOCK_binlog, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_binlog_send, + &COND_binlog_send, NULL); + + if (rpl_semi_sync_master_enabled) + { + result = enable_master(); + if (!result) + result= ack_receiver.start(); /* Start the ACK thread. */ + } + else + { + result = disable_master(); + } + + /* + If rpl_semi_sync_master_wait_no_slave is disabled, let's temporarily + switch off semisync to avoid hang if there's none active slave. + */ + if (!rpl_semi_sync_master_wait_no_slave) + switch_off(); + + return result; +} + +int Repl_semi_sync_master::enable_master() +{ + int result = 0; + + /* Must have the lock when we do enable of disable. */ + lock(); + + if (!get_master_enabled()) + { + m_active_tranxs = new Active_tranx(&LOCK_binlog, m_trace_level); + if (m_active_tranxs != NULL) + { + m_commit_file_name_inited = false; + m_reply_file_name_inited = false; + m_wait_file_name_inited = false; + + set_master_enabled(true); + m_state = true; + sql_print_information("Semi-sync replication enabled on the master."); + } + else + { + sql_print_error("Cannot allocate memory to enable semi-sync on the master."); + result = -1; + } + } + + unlock(); + + return result; +} + +int Repl_semi_sync_master::disable_master() +{ + /* Must have the lock when we do enable of disable. */ + lock(); + + if (get_master_enabled()) + { + /* Switch off the semi-sync first so that waiting transaction will be + * waken up. + */ + switch_off(); + + assert(m_active_tranxs != NULL); + delete m_active_tranxs; + m_active_tranxs = NULL; + + m_reply_file_name_inited = false; + m_wait_file_name_inited = false; + m_commit_file_name_inited = false; + + set_master_enabled(false); + sql_print_information("Semi-sync replication disabled on the master."); + } + + unlock(); + + return 0; +} + +void Repl_semi_sync_master::cleanup() +{ + if (m_init_done) + { + mysql_mutex_destroy(&LOCK_binlog); + mysql_cond_destroy(&COND_binlog_send); + m_init_done= 0; + } + + delete m_active_tranxs; +} + +void Repl_semi_sync_master::lock() +{ + mysql_mutex_lock(&LOCK_binlog); +} + +void Repl_semi_sync_master::unlock() +{ + mysql_mutex_unlock(&LOCK_binlog); +} + +void Repl_semi_sync_master::cond_broadcast() +{ + mysql_cond_broadcast(&COND_binlog_send); +} + +int Repl_semi_sync_master::cond_timewait(struct timespec *wait_time) +{ + int wait_res; + + DBUG_ENTER("Repl_semi_sync_master::cond_timewait()"); + + wait_res= mysql_cond_timedwait(&COND_binlog_send, + &LOCK_binlog, wait_time); + + DBUG_RETURN(wait_res); +} + +void Repl_semi_sync_master::add_slave() +{ + lock(); + rpl_semi_sync_master_clients++; + unlock(); +} + +void Repl_semi_sync_master::remove_slave() +{ + lock(); + rpl_semi_sync_master_clients--; + + /* Only switch off if semi-sync is enabled and is on */ + if (get_master_enabled() && is_on()) + { + /* If user has chosen not to wait if no semi-sync slave available + and the last semi-sync slave exits, turn off semi-sync on master + immediately. + */ + if (!rpl_semi_sync_master_wait_no_slave && + rpl_semi_sync_master_clients == 0) + switch_off(); + } + unlock(); +} + +int Repl_semi_sync_master::report_reply_packet(uint32 server_id, + const uchar *packet, + ulong packet_len) +{ + int result= -1; + char log_file_name[FN_REFLEN+1]; + my_off_t log_file_pos; + ulong log_file_len = 0; + + DBUG_ENTER("Repl_semi_sync_master::report_reply_packet"); + + if (unlikely(packet[REPLY_MAGIC_NUM_OFFSET] != + Repl_semi_sync_master::k_packet_magic_num)) + { + sql_print_error("Read semi-sync reply magic number error"); + goto l_end; + } + + if (unlikely(packet_len < REPLY_BINLOG_NAME_OFFSET)) + { + sql_print_error("Read semi-sync reply length error: packet is too small"); + goto l_end; + } + + log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET); + log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET; + if (unlikely(log_file_len >= FN_REFLEN)) + { + sql_print_error("Read semi-sync reply binlog file length too large"); + goto l_end; + } + strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len); + log_file_name[log_file_len] = 0; + + DBUG_ASSERT(dirname_length(log_file_name) == 0); + + DBUG_PRINT("semisync", ("%s: Got reply(%s, %lu) from server %u", + "Repl_semi_sync_master::report_reply_packet", + log_file_name, (ulong)log_file_pos, server_id)); + + rpl_semi_sync_master_get_ack++; + report_reply_binlog(server_id, log_file_name, log_file_pos); + +l_end: + + DBUG_RETURN(result); +} + +int Repl_semi_sync_master::report_reply_binlog(uint32 server_id, + const char *log_file_name, + my_off_t log_file_pos) +{ + int cmp; + bool can_release_threads = false; + bool need_copy_send_pos = true; + + DBUG_ENTER("Repl_semi_sync_master::report_reply_binlog"); + + if (!(get_master_enabled())) + DBUG_RETURN(0); + + lock(); + + /* This is the real check inside the mutex. */ + if (!get_master_enabled()) + goto l_end; + + if (!is_on()) + /* We check to see whether we can switch semi-sync ON. */ + try_switch_on(server_id, log_file_name, log_file_pos); + + /* The position should increase monotonically, if there is only one + * thread sending the binlog to the slave. + * In reality, to improve the transaction availability, we allow multiple + * sync replication slaves. So, if any one of them get the transaction, + * the transaction session in the primary can move forward. + */ + if (m_reply_file_name_inited) + { + cmp = Active_tranx::compare(log_file_name, log_file_pos, + m_reply_file_name, m_reply_file_pos); + + /* If the requested position is behind the sending binlog position, + * would not adjust sending binlog position. + * We based on the assumption that there are multiple semi-sync slave, + * and at least one of them shou/ld be up to date. + * If all semi-sync slaves are behind, at least initially, the primary + * can find the situation after the waiting timeout. After that, some + * slaves should catch up quickly. + */ + if (cmp < 0) + { + /* If the position is behind, do not copy it. */ + need_copy_send_pos = false; + } + } + + if (need_copy_send_pos) + { + strmake_buf(m_reply_file_name, log_file_name); + m_reply_file_pos = log_file_pos; + m_reply_file_name_inited = true; + + /* Remove all active transaction nodes before this point. */ + assert(m_active_tranxs != NULL); + m_active_tranxs->clear_active_tranx_nodes(log_file_name, log_file_pos); + + DBUG_PRINT("semisync", ("%s: Got reply at (%s, %lu)", + "Repl_semi_sync_master::report_reply_binlog", + log_file_name, (ulong)log_file_pos)); + } + + if (rpl_semi_sync_master_wait_sessions > 0) + { + /* Let us check if some of the waiting threads doing a trx + * commit can now proceed. + */ + cmp = Active_tranx::compare(m_reply_file_name, m_reply_file_pos, + m_wait_file_name, m_wait_file_pos); + if (cmp >= 0) + { + /* Yes, at least one waiting thread can now proceed: + * let us release all waiting threads with a broadcast + */ + can_release_threads = true; + m_wait_file_name_inited = false; + } + } + + l_end: + unlock(); + + if (can_release_threads) + { + DBUG_PRINT("semisync", ("%s: signal all waiting threads.", + "Repl_semi_sync_master::report_reply_binlog")); + + cond_broadcast(); + } + + DBUG_RETURN(0); +} + +int Repl_semi_sync_master::wait_after_sync(const char *log_file, my_off_t log_pos) +{ + if (!get_master_enabled()) + return 0; + + int ret= 0; + if(log_pos && + wait_point() == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC) + ret= commit_trx(log_file + dirname_length(log_file), log_pos); + + return ret; +} + +int Repl_semi_sync_master::wait_after_commit(THD* thd, bool all) +{ + if (!get_master_enabled()) + return 0; + + int ret= 0; + const char *log_file; + my_off_t log_pos; + + bool is_real_trans= + (all || thd->transaction.all.ha_list == 0); + /* + The coordinates are propagated to this point having been computed + in report_binlog_update + */ + Trans_binlog_info *log_info= thd->semisync_info; + log_file= log_info && log_info->log_file[0] ? log_info->log_file : 0; + log_pos= log_info ? log_info->log_pos : 0; + + DBUG_ASSERT(!log_file || dirname_length(log_file) == 0); + + if (is_real_trans && + log_pos && + wait_point() == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT) + ret= commit_trx(log_file, log_pos); + + if (is_real_trans && log_info) + { + log_info->log_file[0]= 0; + log_info->log_pos= 0; + } + + return ret; +} + +int Repl_semi_sync_master::wait_after_rollback(THD *thd, bool all) +{ + return wait_after_commit(thd, all); +} + +/** + The method runs after flush to binary log is done. +*/ +int Repl_semi_sync_master::report_binlog_update(THD* thd, const char *log_file, + my_off_t log_pos) +{ + if (get_master_enabled()) + { + Trans_binlog_info *log_info; + + if (!(log_info= thd->semisync_info)) + { + if(!(log_info= + (Trans_binlog_info*) my_malloc(sizeof(Trans_binlog_info), MYF(0)))) + return 1; + thd->semisync_info= log_info; + } + strcpy(log_info->log_file, log_file + dirname_length(log_file)); + log_info->log_pos = log_pos; + + return write_tranx_in_binlog(log_info->log_file, log_pos); + } + + return 0; +} + +int Repl_semi_sync_master::dump_start(THD* thd, + const char *log_file, + my_off_t log_pos) +{ + if (!thd->semi_sync_slave) + return 0; + + if (ack_receiver.add_slave(thd)) + { + sql_print_error("Failed to register slave to semi-sync ACK receiver " + "thread. Turning off semisync"); + thd->semi_sync_slave= 0; + return 1; + } + + add_slave(); + report_reply_binlog(thd->variables.server_id, + log_file + dirname_length(log_file), log_pos); + sql_print_information("Start semi-sync binlog_dump to slave (server_id: %d), " + "pos(%s, %lu", + thd->variables.server_id, log_file, + (unsigned long)log_pos); + + return 0; +} + +void Repl_semi_sync_master::dump_end(THD* thd) +{ + if (!thd->semi_sync_slave) + return; + + sql_print_information("Stop semi-sync binlog_dump to slave (server_id: %d)", thd->variables.server_id); + + remove_slave(); + ack_receiver.remove_slave(thd); + + return; +} + +int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name, + my_off_t trx_wait_binlog_pos) +{ + + DBUG_ENTER("Repl_semi_sync_master::commit_trx"); + + if (get_master_enabled() && trx_wait_binlog_name) + { + struct timespec start_ts; + struct timespec abstime; + int wait_result; + PSI_stage_info old_stage; + + set_timespec(start_ts, 0); + + DEBUG_SYNC(current_thd, "rpl_semisync_master_commit_trx_before_lock"); + /* Acquire the mutex. */ + lock(); + + /* This must be called after acquired the lock */ + THD_ENTER_COND(NULL, &COND_binlog_send, &LOCK_binlog, + & stage_waiting_for_semi_sync_ack_from_slave, + & old_stage); + + /* This is the real check inside the mutex. */ + if (!get_master_enabled() || !is_on()) + goto l_end; + + DBUG_PRINT("semisync", ("%s: wait pos (%s, %lu), repl(%d)\n", + "Repl_semi_sync_master::commit_trx", + trx_wait_binlog_name, (ulong)trx_wait_binlog_pos, + (int)is_on())); + + while (is_on() && !thd_killed(current_thd)) + { + if (m_reply_file_name_inited) + { + int cmp = Active_tranx::compare(m_reply_file_name, m_reply_file_pos, + trx_wait_binlog_name, + trx_wait_binlog_pos); + if (cmp >= 0) + { + /* We have already sent the relevant binlog to the slave: no need to + * wait here. + */ + DBUG_PRINT("semisync", ("%s: Binlog reply is ahead (%s, %lu),", + "Repl_semi_sync_master::commit_trx", + m_reply_file_name, + (ulong)m_reply_file_pos)); + break; + } + } + + /* Let us update the info about the minimum binlog position of waiting + * threads. + */ + if (m_wait_file_name_inited) + { + int cmp = Active_tranx::compare(trx_wait_binlog_name, + trx_wait_binlog_pos, + m_wait_file_name, m_wait_file_pos); + if (cmp <= 0) + { + /* This thd has a lower position, let's update the minimum info. */ + strmake_buf(m_wait_file_name, trx_wait_binlog_name); + m_wait_file_pos = trx_wait_binlog_pos; + + rpl_semi_sync_master_wait_pos_backtraverse++; + DBUG_PRINT("semisync", ("%s: move back wait position (%s, %lu),", + "Repl_semi_sync_master::commit_trx", + m_wait_file_name, (ulong)m_wait_file_pos)); + } + } + else + { + strmake_buf(m_wait_file_name, trx_wait_binlog_name); + m_wait_file_pos = trx_wait_binlog_pos; + m_wait_file_name_inited = true; + + DBUG_PRINT("semisync", ("%s: init wait position (%s, %lu),", + "Repl_semi_sync_master::commit_trx", + m_wait_file_name, (ulong)m_wait_file_pos)); + } + + /* Calcuate the waiting period. */ + long diff_secs = (long) (m_wait_timeout / TIME_THOUSAND); + long diff_nsecs = (long) ((m_wait_timeout % TIME_THOUSAND) * TIME_MILLION); + long nsecs = start_ts.tv_nsec + diff_nsecs; + abstime.tv_sec = start_ts.tv_sec + diff_secs + nsecs/TIME_BILLION; + abstime.tv_nsec = nsecs % TIME_BILLION; + + /* In semi-synchronous replication, we wait until the binlog-dump + * thread has received the reply on the relevant binlog segment from the + * replication slave. + * + * Let us suspend this thread to wait on the condition; + * when replication has progressed far enough, we will release + * these waiting threads. + */ + rpl_semi_sync_master_wait_sessions++; + + DBUG_PRINT("semisync", ("%s: wait %lu ms for binlog sent (%s, %lu)", + "Repl_semi_sync_master::commit_trx", + m_wait_timeout, + m_wait_file_name, (ulong)m_wait_file_pos)); + + wait_result = cond_timewait(&abstime); + rpl_semi_sync_master_wait_sessions--; + + if (wait_result != 0) + { + /* This is a real wait timeout. */ + sql_print_warning("Timeout waiting for reply of binlog (file: %s, pos: %lu), " + "semi-sync up to file %s, position %lu.", + trx_wait_binlog_name, (ulong)trx_wait_binlog_pos, + m_reply_file_name, (ulong)m_reply_file_pos); + rpl_semi_sync_master_wait_timeouts++; + + /* switch semi-sync off */ + switch_off(); + } + else + { + int wait_time; + + wait_time = get_wait_time(start_ts); + if (wait_time < 0) + { + DBUG_PRINT("semisync", ("Replication semi-sync getWaitTime fail at " + "wait position (%s, %lu)", + trx_wait_binlog_name, + (ulong)trx_wait_binlog_pos)); + rpl_semi_sync_master_timefunc_fails++; + } + else + { + rpl_semi_sync_master_trx_wait_num++; + rpl_semi_sync_master_trx_wait_time += wait_time; + } + } + } + + /* + At this point, the binlog file and position of this transaction + must have been removed from Active_tranx. + m_active_tranxs may be NULL if someone disabled semi sync during + cond_timewait() + */ + assert(thd_killed(current_thd) || !m_active_tranxs || + !m_active_tranxs->is_tranx_end_pos(trx_wait_binlog_name, + trx_wait_binlog_pos)); + + l_end: + /* Update the status counter. */ + if (is_on()) + rpl_semi_sync_master_yes_transactions++; + else + rpl_semi_sync_master_no_transactions++; + + /* The lock held will be released by thd_exit_cond, so no need to + call unlock() here */ + THD_EXIT_COND(NULL, & old_stage); + } + + DBUG_RETURN(0); +} + +/* Indicate that semi-sync replication is OFF now. + * + * What should we do when it is disabled? The problem is that we want + * the semi-sync replication enabled again when the slave catches up + * later. But, it is not that easy to detect that the slave has caught + * up. This is caused by the fact that MySQL's replication protocol is + * asynchronous, meaning that if the master does not use the semi-sync + * protocol, the slave would not send anything to the master. + * Still, if the master is sending (N+1)-th event, we assume that it is + * an indicator that the slave has received N-th event and earlier ones. + * + * If semi-sync is disabled, all transactions still update the wait + * position with the last position in binlog. But no transactions will + * wait for confirmations and the active transaction list would not be + * maintained. In binlog dump thread, update_sync_header() checks whether + * the current sending event catches up with last wait position. If it + * does match, semi-sync will be switched on again. + */ +int Repl_semi_sync_master::switch_off() +{ + int result; + + DBUG_ENTER("Repl_semi_sync_master::switch_off"); + + m_state = false; + + /* Clear the active transaction list. */ + assert(m_active_tranxs != NULL); + result = m_active_tranxs->clear_active_tranx_nodes(NULL, 0); + + rpl_semi_sync_master_off_times++; + m_wait_file_name_inited = false; + m_reply_file_name_inited = false; + sql_print_information("Semi-sync replication switched OFF."); + cond_broadcast(); /* wake up all waiting threads */ + + DBUG_RETURN(result); +} + +int Repl_semi_sync_master::try_switch_on(int server_id, + const char *log_file_name, + my_off_t log_file_pos) +{ + bool semi_sync_on = false; + + DBUG_ENTER("Repl_semi_sync_master::try_switch_on"); + + /* If the current sending event's position is larger than or equal to the + * 'largest' commit transaction binlog position, the slave is already + * catching up now and we can switch semi-sync on here. + * If m_commit_file_name_inited indicates there are no recent transactions, + * we can enable semi-sync immediately. + */ + if (m_commit_file_name_inited) + { + int cmp = Active_tranx::compare(log_file_name, log_file_pos, + m_commit_file_name, m_commit_file_pos); + semi_sync_on = (cmp >= 0); + } + else + { + semi_sync_on = true; + } + + if (semi_sync_on) + { + /* Switch semi-sync replication on. */ + m_state = true; + + sql_print_information("Semi-sync replication switched ON with slave (server_id: %d) " + "at (%s, %lu)", + server_id, log_file_name, + (ulong)log_file_pos); + } + + DBUG_RETURN(0); +} + +int Repl_semi_sync_master::reserve_sync_header(String* packet) +{ + DBUG_ENTER("Repl_semi_sync_master::reserve_sync_header"); + + /* Set the magic number and the sync status. By default, no sync + * is required. + */ + packet->append(reinterpret_cast<const char*>(k_sync_header), + sizeof(k_sync_header)); + DBUG_RETURN(0); +} + +int Repl_semi_sync_master::update_sync_header(THD* thd, unsigned char *packet, + const char *log_file_name, + my_off_t log_file_pos, + bool* need_sync) +{ + int cmp = 0; + bool sync = false; + + DBUG_ENTER("Repl_semi_sync_master::update_sync_header"); + + /* If the semi-sync master is not enabled, or the slave is not a semi-sync + * target, do not request replies from the slave. + */ + if (!get_master_enabled() || !thd->semi_sync_slave) + { + *need_sync = false; + DBUG_RETURN(0); + } + + lock(); + + /* This is the real check inside the mutex. */ + if (!get_master_enabled()) + { + assert(sync == false); + goto l_end; + } + + if (is_on()) + { + /* semi-sync is ON */ + sync = false; /* No sync unless a transaction is involved. */ + + if (m_reply_file_name_inited) + { + cmp = Active_tranx::compare(log_file_name, log_file_pos, + m_reply_file_name, m_reply_file_pos); + if (cmp <= 0) + { + /* If we have already got the reply for the event, then we do + * not need to sync the transaction again. + */ + goto l_end; + } + } + + if (m_wait_file_name_inited) + { + cmp = Active_tranx::compare(log_file_name, log_file_pos, + m_wait_file_name, m_wait_file_pos); + } + else + { + cmp = 1; + } + + /* If we are already waiting for some transaction replies which + * are later in binlog, do not wait for this one event. + */ + if (cmp >= 0) + { + /* + * We only wait if the event is a transaction's ending event. + */ + assert(m_active_tranxs != NULL); + sync = m_active_tranxs->is_tranx_end_pos(log_file_name, + log_file_pos); + } + } + else + { + if (m_commit_file_name_inited) + { + int cmp = Active_tranx::compare(log_file_name, log_file_pos, + m_commit_file_name, m_commit_file_pos); + sync = (cmp >= 0); + } + else + { + sync = true; + } + } + + DBUG_PRINT("semisync", ("%s: server(%lu), (%s, %lu) sync(%d), repl(%d)", + "Repl_semi_sync_master::update_sync_header", + thd->variables.server_id, log_file_name, + (ulong)log_file_pos, sync, (int)is_on())); + *need_sync= sync; + + l_end: + unlock(); + + /* We do not need to clear sync flag because we set it to 0 when we + * reserve the packet header. + */ + if (sync) + { + (packet)[2] = k_packet_flag_sync; + } + + DBUG_RETURN(0); +} + +int Repl_semi_sync_master::write_tranx_in_binlog(const char* log_file_name, + my_off_t log_file_pos) +{ + int result = 0; + + DBUG_ENTER("Repl_semi_sync_master::write_tranx_in_binlog"); + + lock(); + + /* This is the real check inside the mutex. */ + if (!get_master_enabled()) + goto l_end; + + /* Update the 'largest' transaction commit position seen so far even + * though semi-sync is switched off. + * It is much better that we update m_commit_file* here, instead of + * inside commit_trx(). This is mostly because update_sync_header() + * will watch for m_commit_file* to decide whether to switch semi-sync + * on. The detailed reason is explained in function update_sync_header(). + */ + if (m_commit_file_name_inited) + { + int cmp = Active_tranx::compare(log_file_name, log_file_pos, + m_commit_file_name, m_commit_file_pos); + if (cmp > 0) + { + /* This is a larger position, let's update the maximum info. */ + strncpy(m_commit_file_name, log_file_name, FN_REFLEN-1); + m_commit_file_name[FN_REFLEN-1] = 0; /* make sure it ends properly */ + m_commit_file_pos = log_file_pos; + } + } + else + { + strncpy(m_commit_file_name, log_file_name, FN_REFLEN-1); + m_commit_file_name[FN_REFLEN-1] = 0; /* make sure it ends properly */ + m_commit_file_pos = log_file_pos; + m_commit_file_name_inited = true; + } + + if (is_on()) + { + assert(m_active_tranxs != NULL); + if(m_active_tranxs->insert_tranx_node(log_file_name, log_file_pos)) + { + /* + if insert tranx_node failed, print a warning message + and turn off semi-sync + */ + sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %lu", + log_file_name, (ulong)log_file_pos); + switch_off(); + } + else + { + rpl_semi_sync_master_request_ack++; + } + } + + l_end: + unlock(); + + DBUG_RETURN(result); +} + +int Repl_semi_sync_master::flush_net(THD *thd, + const char *event_buf) +{ + int result = -1; + NET* net= &thd->net; + + DBUG_ENTER("Repl_semi_sync_master::flush_net"); + + assert((unsigned char)event_buf[1] == k_packet_magic_num); + if ((unsigned char)event_buf[2] != k_packet_flag_sync) + { + /* current event does not require reply */ + result = 0; + goto l_end; + } + + /* We flush to make sure that the current event is sent to the network, + * instead of being buffered in the TCP/IP stack. + */ + if (net_flush(net)) + { + sql_print_error("Semi-sync master failed on net_flush() " + "before waiting for slave reply"); + goto l_end; + } + + net_clear(net, 0); + net->pkt_nr++; + result = 0; + rpl_semi_sync_master_net_wait_num++; + + l_end: + thd->clear_error(); + + DBUG_RETURN(result); +} + +int Repl_semi_sync_master::after_reset_master() +{ + int result = 0; + + DBUG_ENTER("Repl_semi_sync_master::after_reset_master"); + + if (rpl_semi_sync_master_enabled) + { + sql_print_information("Enable Semi-sync Master after reset master"); + enable_master(); + } + + lock(); + + if (rpl_semi_sync_master_clients == 0 && + !rpl_semi_sync_master_wait_no_slave) + m_state = 0; + else + m_state = get_master_enabled()? 1 : 0; + + m_wait_file_name_inited = false; + m_reply_file_name_inited = false; + m_commit_file_name_inited = false; + + rpl_semi_sync_master_yes_transactions = 0; + rpl_semi_sync_master_no_transactions = 0; + rpl_semi_sync_master_off_times = 0; + rpl_semi_sync_master_timefunc_fails = 0; + rpl_semi_sync_master_wait_sessions = 0; + rpl_semi_sync_master_wait_pos_backtraverse = 0; + rpl_semi_sync_master_trx_wait_num = 0; + rpl_semi_sync_master_trx_wait_time = 0; + rpl_semi_sync_master_net_wait_num = 0; + rpl_semi_sync_master_net_wait_time = 0; + + unlock(); + + DBUG_RETURN(result); +} + +int Repl_semi_sync_master::before_reset_master() +{ + int result = 0; + + DBUG_ENTER("Repl_semi_sync_master::before_reset_master"); + + if (rpl_semi_sync_master_enabled) + disable_master(); + + DBUG_RETURN(result); +} + +void Repl_semi_sync_master::check_and_switch() +{ + lock(); + if (get_master_enabled() && is_on()) + { + if (!rpl_semi_sync_master_wait_no_slave + && rpl_semi_sync_master_clients == 0) + switch_off(); + } + unlock(); +} + +void Repl_semi_sync_master::set_export_stats() +{ + lock(); + + rpl_semi_sync_master_status = m_state; + rpl_semi_sync_master_avg_trx_wait_time= + ((rpl_semi_sync_master_trx_wait_num) ? + (ulong)((double)rpl_semi_sync_master_trx_wait_time / + ((double)rpl_semi_sync_master_trx_wait_num)) : 0); + rpl_semi_sync_master_avg_net_wait_time= + ((rpl_semi_sync_master_net_wait_num) ? + (ulong)((double)rpl_semi_sync_master_net_wait_time / + ((double)rpl_semi_sync_master_net_wait_num)) : 0); + + unlock(); +} + +/* Get the waiting time given the wait's staring time. + * + * Return: + * >= 0: the waiting time in microsecons(us) + * < 0: error in get time or time back traverse + */ +static int get_wait_time(const struct timespec& start_ts) +{ + ulonglong start_usecs, end_usecs; + struct timespec end_ts; + + /* Starting time in microseconds(us). */ + start_usecs = timespec_to_usec(&start_ts); + + /* Get the wait time interval. */ + set_timespec(end_ts, 0); + + /* Ending time in microseconds(us). */ + end_usecs = timespec_to_usec(&end_ts); + + if (end_usecs < start_usecs) + return -1; + + return (int)(end_usecs - start_usecs); +} + +void semi_sync_master_deinit() +{ + repl_semisync_master.cleanup(); + ack_receiver.cleanup(); +} diff --git a/sql/semisync_master.h b/sql/semisync_master.h new file mode 100644 index 00000000000..a58c1a7ae6e --- /dev/null +++ b/sql/semisync_master.h @@ -0,0 +1,674 @@ +/* Copyright (C) 2007 Google Inc. + Copyright (c) 2008 MySQL AB, 2009 Sun Microsystems, Inc. + Use is subject to license terms. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + + +#ifndef SEMISYNC_MASTER_H +#define SEMISYNC_MASTER_H + +#include "semisync.h" +#include "semisync_master_ack_receiver.h" + +#ifdef HAVE_PSI_INTERFACE +extern PSI_mutex_key key_LOCK_binlog; +extern PSI_cond_key key_COND_binlog_send; +#endif + +struct Tranx_node { + char log_name[FN_REFLEN]; + my_off_t log_pos; + struct Tranx_node *next; /* the next node in the sorted list */ + struct Tranx_node *hash_next; /* the next node during hash collision */ +}; + +/** + @class Tranx_node_allocator + + This class provides memory allocating and freeing methods for + Tranx_node. The main target is performance. + + @section ALLOCATE How to allocate a node + The pointer of the first node after 'last_node' in current_block is + returned. current_block will move to the next free Block when all nodes of + it are in use. A new Block is allocated and is put into the rear of the + Block link table if no Block is free. + + The list starts up empty (ie, there is no allocated Block). + + After some nodes are freed, there probably are some free nodes before + the sequence of the allocated nodes, but we do not reuse it. It is better + to keep the allocated nodes are in the sequence, for it is more efficient + for allocating and freeing Tranx_node. + + @section FREENODE How to free nodes + There are two methods for freeing nodes. They are free_all_nodes and + free_nodes_before. + + 'A Block is free' means all of its nodes are free. + @subsection free_nodes_before + As all allocated nodes are in the sequence, 'Before one node' means all + nodes before given node in the same Block and all Blocks before the Block + which containing the given node. As such, all Blocks before the given one + ('node') are free Block and moved into the rear of the Block link table. + The Block containing the given 'node', however, is not. For at least the + given 'node' is still in use. This will waste at most one Block, but it is + more efficient. + */ +#define BLOCK_TRANX_NODES 16 +class Tranx_node_allocator +{ +public: + /** + @param reserved_nodes + The number of reserved Tranx_nodes. It is used to set 'reserved_blocks' + which can contain at least 'reserved_nodes' number of Tranx_nodes. When + freeing memory, we will reserve at least reserved_blocks of Blocks not + freed. + */ + Tranx_node_allocator(uint reserved_nodes) : + reserved_blocks(reserved_nodes/BLOCK_TRANX_NODES + + (reserved_nodes%BLOCK_TRANX_NODES > 1 ? 2 : 1)), + first_block(NULL), last_block(NULL), + current_block(NULL), last_node(-1), block_num(0) {} + + ~Tranx_node_allocator() + { + Block *block= first_block; + while (block != NULL) + { + Block *next= block->next; + free_block(block); + block= next; + } + } + + /** + The pointer of the first node after 'last_node' in current_block is + returned. current_block will move to the next free Block when all nodes of + it are in use. A new Block is allocated and is put into the rear of the + Block link table if no Block is free. + + @return Return a Tranx_node *, or NULL if an error occurred. + */ + Tranx_node *allocate_node() + { + Tranx_node *trx_node; + Block *block= current_block; + + if (last_node == BLOCK_TRANX_NODES-1) + { + current_block= current_block->next; + last_node= -1; + } + + if (current_block == NULL && allocate_block()) + { + current_block= block; + if (current_block) + last_node= BLOCK_TRANX_NODES-1; + return NULL; + } + + trx_node= &(current_block->nodes[++last_node]); + trx_node->log_name[0] = '\0'; + trx_node->log_pos= 0; + trx_node->next= 0; + trx_node->hash_next= 0; + return trx_node; + } + + /** + All nodes are freed. + + @return Return 0, or 1 if an error occurred. + */ + int free_all_nodes() + { + current_block= first_block; + last_node= -1; + free_blocks(); + return 0; + } + + /** + All Blocks before the given 'node' are free Block and moved into the rear + of the Block link table. + + @param node All nodes before 'node' will be freed + + @return Return 0, or 1 if an error occurred. + */ + int free_nodes_before(Tranx_node* node) + { + Block *block; + Block *prev_block= NULL; + + block= first_block; + while (block != current_block->next) + { + /* Find the Block containing the given node */ + if (&(block->nodes[0]) <= node && &(block->nodes[BLOCK_TRANX_NODES]) >= node) + { + /* All Blocks before the given node are put into the rear */ + if (first_block != block) + { + last_block->next= first_block; + first_block= block; + last_block= prev_block; + last_block->next= NULL; + free_blocks(); + } + return 0; + } + prev_block= block; + block= block->next; + } + + /* Node does not find should never happen */ + DBUG_ASSERT(0); + return 1; + } + +private: + uint reserved_blocks; + + /** + A sequence memory which contains BLOCK_TRANX_NODES Tranx_nodes. + + BLOCK_TRANX_NODES The number of Tranx_nodes which are in a Block. + + next Every Block has a 'next' pointer which points to the next Block. + These linking Blocks constitute a Block link table. + */ + struct Block { + Block *next; + Tranx_node nodes[BLOCK_TRANX_NODES]; + }; + + /** + The 'first_block' is the head of the Block link table; + */ + Block *first_block; + /** + The 'last_block' is the rear of the Block link table; + */ + Block *last_block; + + /** + current_block always points the Block in the Block link table in + which the last allocated node is. The Blocks before it are all in use + and the Blocks after it are all free. + */ + Block *current_block; + + /** + It always points to the last node which has been allocated in the + current_block. + */ + int last_node; + + /** + How many Blocks are in the Block link table. + */ + uint block_num; + + /** + Allocate a block and then assign it to current_block. + */ + int allocate_block() + { + Block *block= (Block *)my_malloc(sizeof(Block), MYF(0)); + if (block) + { + block->next= NULL; + + if (first_block == NULL) + first_block= block; + else + last_block->next= block; + + /* New Block is always put into the rear */ + last_block= block; + /* New Block is always the current_block */ + current_block= block; + ++block_num; + return 0; + } + return 1; + } + + /** + Free a given Block. + @param block The Block will be freed. + */ + void free_block(Block *block) + { + my_free(block); + --block_num; + } + + + /** + If there are some free Blocks and the total number of the Blocks in the + Block link table is larger than the 'reserved_blocks', Some free Blocks + will be freed until the total number of the Blocks is equal to the + 'reserved_blocks' or there is only one free Block behind the + 'current_block'. + */ + void free_blocks() + { + if (current_block == NULL || current_block->next == NULL) + return; + + /* One free Block is always kept behind the current block */ + Block *block= current_block->next->next; + while (block_num > reserved_blocks && block != NULL) + { + Block *next= block->next; + free_block(block); + block= next; + } + current_block->next->next= block; + if (block == NULL) + last_block= current_block->next; + } +}; + +/** + This class manages memory for active transaction list. + + We record each active transaction with a Tranx_node, each session + can have only one open transaction. Because of EVENT, the total + active transaction nodes can exceed the maximum allowed + connections. +*/ +class Active_tranx + :public Trace { +private: + + Tranx_node_allocator m_allocator; + /* These two record the active transaction list in sort order. */ + Tranx_node *m_trx_front, *m_trx_rear; + + Tranx_node **m_trx_htb; /* A hash table on active transactions. */ + + int m_num_entries; /* maximum hash table entries */ + mysql_mutex_t *m_lock; /* mutex lock */ + + inline void assert_lock_owner(); + + inline unsigned int calc_hash(const unsigned char *key,unsigned int length); + unsigned int get_hash_value(const char *log_file_name, my_off_t log_file_pos); + + int compare(const char *log_file_name1, my_off_t log_file_pos1, + const Tranx_node *node2) { + return compare(log_file_name1, log_file_pos1, + node2->log_name, node2->log_pos); + } + int compare(const Tranx_node *node1, + const char *log_file_name2, my_off_t log_file_pos2) { + return compare(node1->log_name, node1->log_pos, + log_file_name2, log_file_pos2); + } + int compare(const Tranx_node *node1, const Tranx_node *node2) { + return compare(node1->log_name, node1->log_pos, + node2->log_name, node2->log_pos); + } + +public: + Active_tranx(mysql_mutex_t *lock, unsigned long trace_level); + ~Active_tranx(); + + /* Insert an active transaction node with the specified position. + * + * Return: + * 0: success; non-zero: error + */ + int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos); + + /* Clear the active transaction nodes until(inclusive) the specified + * position. + * If log_file_name is NULL, everything will be cleared: the sorted + * list and the hash table will be reset to empty. + * + * Return: + * 0: success; non-zero: error + */ + int clear_active_tranx_nodes(const char *log_file_name, + my_off_t log_file_pos); + + /* Given a position, check to see whether the position is an active + * transaction's ending position by probing the hash table. + */ + bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos); + + /* Given two binlog positions, compare which one is bigger based on + * (file_name, file_position). + */ + static int compare(const char *log_file_name1, my_off_t log_file_pos1, + const char *log_file_name2, my_off_t log_file_pos2); + +}; + +/** + The extension class for the master of semi-synchronous replication +*/ +class Repl_semi_sync_master + :public Repl_semi_sync_base { + private: + Active_tranx *m_active_tranxs; /* active transaction list: the list will + be cleared when semi-sync switches off. */ + + /* True when init_object has been called */ + bool m_init_done; + + /* This cond variable is signaled when enough binlog has been sent to slave, + * so that a waiting trx can return the 'ok' to the client for a commit. + */ + mysql_cond_t COND_binlog_send; + + /* Mutex that protects the following state variables and the active + * transaction list. + * Under no cirumstances we can acquire mysql_bin_log.LOCK_log if we are + * already holding m_LOCK_binlog because it can cause deadlocks. + */ + mysql_mutex_t LOCK_binlog; + + /* This is set to true when m_reply_file_name contains meaningful data. */ + bool m_reply_file_name_inited; + + /* The binlog name up to which we have received replies from any slaves. */ + char m_reply_file_name[FN_REFLEN]; + + /* The position in that file up to which we have the reply from any slaves. */ + my_off_t m_reply_file_pos; + + /* This is set to true when we know the 'smallest' wait position. */ + bool m_wait_file_name_inited; + + /* NULL, or the 'smallest' filename that a transaction is waiting for + * slave replies. + */ + char m_wait_file_name[FN_REFLEN]; + + /* The smallest position in that file that a trx is waiting for: the trx + * can proceed and send an 'ok' to the client when the master has got the + * reply from the slave indicating that it already got the binlog events. + */ + my_off_t m_wait_file_pos; + + /* This is set to true when we know the 'largest' transaction commit + * position in the binlog file. + * We always maintain the position no matter whether semi-sync is switched + * on switched off. When a transaction wait timeout occurs, semi-sync will + * switch off. Binlog-dump thread can use the three fields to detect when + * slaves catch up on replication so that semi-sync can switch on again. + */ + bool m_commit_file_name_inited; + + /* The 'largest' binlog filename that a commit transaction is seeing. */ + char m_commit_file_name[FN_REFLEN]; + + /* The 'largest' position in that file that a commit transaction is seeing. */ + my_off_t m_commit_file_pos; + + /* All global variables which can be set by parameters. */ + volatile bool m_master_enabled; /* semi-sync is enabled on the master */ + unsigned long m_wait_timeout; /* timeout period(ms) during tranx wait */ + + bool m_state; /* whether semi-sync is switched */ + + /*Waiting for ACK before/after innodb commit*/ + ulong m_wait_point; + + void lock(); + void unlock(); + void cond_broadcast(); + int cond_timewait(struct timespec *wait_time); + + /* Is semi-sync replication on? */ + bool is_on() { + return (m_state); + } + + void set_master_enabled(bool enabled) { + m_master_enabled = enabled; + } + + /* Switch semi-sync off because of timeout in transaction waiting. */ + int switch_off(); + + /* Switch semi-sync on when slaves catch up. */ + int try_switch_on(int server_id, + const char *log_file_name, my_off_t log_file_pos); + + public: + Repl_semi_sync_master(); + ~Repl_semi_sync_master() {} + + void cleanup(); + + bool get_master_enabled() { + return m_master_enabled; + } + void set_trace_level(unsigned long trace_level) { + m_trace_level = trace_level; + if (m_active_tranxs) + m_active_tranxs->m_trace_level = trace_level; + } + + /* Set the transaction wait timeout period, in milliseconds. */ + void set_wait_timeout(unsigned long wait_timeout) { + m_wait_timeout = wait_timeout; + } + + /*set the ACK point, after binlog sync or after transaction commit*/ + void set_wait_point(unsigned long ack_point) + { + m_wait_point = ack_point; + } + + ulong wait_point() //no cover line + { + return m_wait_point; //no cover line + } + + /* Initialize this class after MySQL parameters are initialized. this + * function should be called once at bootstrap time. + */ + int init_object(); + + /* Enable the object to enable semi-sync replication inside the master. */ + int enable_master(); + + /* Enable the object to enable semi-sync replication inside the master. */ + int disable_master(); + + /* Add a semi-sync replication slave */ + void add_slave(); + + /* Remove a semi-sync replication slave */ + void remove_slave(); + + /* It parses a reply packet and call report_reply_binlog to handle it. */ + int report_reply_packet(uint32 server_id, const uchar *packet, + ulong packet_len); + + /* In semi-sync replication, reports up to which binlog position we have + * received replies from the slave indicating that it already get the events. + * + * Input: + * server_id - (IN) master server id number + * log_file_name - (IN) binlog file name + * end_offset - (IN) the offset in the binlog file up to which we have + * the replies from the slave + * + * Return: + * 0: success; non-zero: error + */ + int report_reply_binlog(uint32 server_id, + const char* log_file_name, + my_off_t end_offset); + + /* Commit a transaction in the final step. This function is called from + * InnoDB before returning from the low commit. If semi-sync is switch on, + * the function will wait to see whether binlog-dump thread get the reply for + * the events of the transaction. Remember that this is not a direct wait, + * instead, it waits to see whether the binlog-dump thread has reached the + * point. If the wait times out, semi-sync status will be switched off and + * all other transaction would not wait either. + * + * Input: (the transaction events' ending binlog position) + * trx_wait_binlog_name - (IN) ending position's file name + * trx_wait_binlog_pos - (IN) ending position's file offset + * + * Return: + * 0: success; non-zero: error + */ + int commit_trx(const char* trx_wait_binlog_name, + my_off_t trx_wait_binlog_pos); + + /*Wait for ACK after writing/sync binlog to file*/ + int wait_after_sync(const char* log_file, my_off_t log_pos); + + /*Wait for ACK after commting the transaction*/ + int wait_after_commit(THD* thd, bool all); + + /*Wait after the transaction is rollback*/ + int wait_after_rollback(THD *thd, bool all); + /*Store the current binlog position in m_active_tranxs. This position should + * be acked by slave*/ + int report_binlog_update(THD *thd, const char *log_file,my_off_t log_pos); + + int dump_start(THD* thd, + const char *log_file, + my_off_t log_pos); + + void dump_end(THD* thd); + + /* Reserve space in the replication event packet header: + * . slave semi-sync off: 1 byte - (0) + * . slave semi-sync on: 3 byte - (0, 0xef, 0/1} + * + * Input: + * packet - (IN) the header buffer + * + * Return: + * size of the bytes reserved for header + */ + int reserve_sync_header(String* packet); + + /* Update the sync bit in the packet header to indicate to the slave whether + * the master will wait for the reply of the event. If semi-sync is switched + * off and we detect that the slave is catching up, we switch semi-sync on. + * + * Input: + * THD - (IN) current dump thread + * packet - (IN) the packet containing the replication event + * log_file_name - (IN) the event ending position's file name + * log_file_pos - (IN) the event ending position's file offset + * need_sync - (IN) identify if flush_net is needed to call. + * server_id - (IN) master server id number + * + * Return: + * 0: success; non-zero: error + */ + int update_sync_header(THD* thd, unsigned char *packet, + const char *log_file_name, + my_off_t log_file_pos, + bool* need_sync); + + /* Called when a transaction finished writing binlog events. + * . update the 'largest' transactions' binlog event position + * . insert the ending position in the active transaction list if + * semi-sync is on + * + * Input: (the transaction events' ending binlog position) + * log_file_name - (IN) transaction ending position's file name + * log_file_pos - (IN) transaction ending position's file offset + * + * Return: + * 0: success; non-zero: error + */ + int write_tranx_in_binlog(const char* log_file_name, my_off_t log_file_pos); + + /* Read the slave's reply so that we know how much progress the slave makes + * on receive replication events. + */ + int flush_net(THD* thd, const char *event_buf); + + /* Export internal statistics for semi-sync replication. */ + void set_export_stats(); + + /* 'reset master' command is issued from the user and semi-sync need to + * go off for that. + */ + int after_reset_master(); + + /*called before reset master*/ + int before_reset_master(); + + void check_and_switch(); +}; + +enum rpl_semi_sync_master_wait_point_t { + SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC, + SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT, +}; + +extern Repl_semi_sync_master repl_semisync_master; +extern Ack_receiver ack_receiver; + +/* System and status variables for the master component */ +extern my_bool rpl_semi_sync_master_enabled; +extern my_bool rpl_semi_sync_master_status; +extern ulong rpl_semi_sync_master_wait_point; +extern ulong rpl_semi_sync_master_clients; +extern ulong rpl_semi_sync_master_timeout; +extern ulong rpl_semi_sync_master_trace_level; +extern ulong rpl_semi_sync_master_yes_transactions; +extern ulong rpl_semi_sync_master_no_transactions; +extern ulong rpl_semi_sync_master_off_times; +extern ulong rpl_semi_sync_master_wait_timeouts; +extern ulong rpl_semi_sync_master_timefunc_fails; +extern ulong rpl_semi_sync_master_num_timeouts; +extern ulong rpl_semi_sync_master_wait_sessions; +extern ulong rpl_semi_sync_master_wait_pos_backtraverse; +extern ulong rpl_semi_sync_master_avg_trx_wait_time; +extern ulong rpl_semi_sync_master_avg_net_wait_time; +extern ulonglong rpl_semi_sync_master_net_wait_num; +extern ulonglong rpl_semi_sync_master_trx_wait_num; +extern ulonglong rpl_semi_sync_master_net_wait_time; +extern ulonglong rpl_semi_sync_master_trx_wait_time; +extern unsigned long long rpl_semi_sync_master_request_ack; +extern unsigned long long rpl_semi_sync_master_get_ack; + +/* + This indicates whether we should keep waiting if no semi-sync slave + is available. + 0 : stop waiting if detected no avaialable semi-sync slave. + 1 (default) : keep waiting until timeout even no available semi-sync slave. +*/ +extern char rpl_semi_sync_master_wait_no_slave; +extern Repl_semi_sync_master repl_semisync_master; + +extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave; +extern PSI_stage_info stage_reading_semi_sync_ack; +extern PSI_stage_info stage_waiting_for_semi_sync_slave; + +void semi_sync_master_deinit(); + +#endif /* SEMISYNC_MASTER_H */ diff --git a/sql/semisync_master_ack_receiver.cc b/sql/semisync_master_ack_receiver.cc new file mode 100644 index 00000000000..f986c629f65 --- /dev/null +++ b/sql/semisync_master_ack_receiver.cc @@ -0,0 +1,303 @@ +/* Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#include <my_global.h> +#include "semisync_master.h" +#include "semisync_master_ack_receiver.h" + +extern PSI_mutex_key key_LOCK_ack_receiver; +extern PSI_cond_key key_COND_ack_receiver; +extern PSI_thread_key key_thread_ack_receiver; +extern Repl_semi_sync_master repl_semisync; + +/* Callback function of ack receive thread */ +pthread_handler_t ack_receive_handler(void *arg) +{ + Ack_receiver *recv= reinterpret_cast<Ack_receiver *>(arg); + + my_thread_init(); + recv->run(); + my_thread_end(); + + return NULL; +} + +Ack_receiver::Ack_receiver() +{ + DBUG_ENTER("Ack_receiver::Ack_receiver"); + + m_status= ST_DOWN; + mysql_mutex_init(key_LOCK_ack_receiver, &m_mutex, + MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_ack_receiver, &m_cond, NULL); + m_pid= 0; + + DBUG_VOID_RETURN; +} + +void Ack_receiver::cleanup() +{ + DBUG_ENTER("Ack_receiver::~Ack_receiver"); + + stop(); + mysql_mutex_destroy(&m_mutex); + mysql_cond_destroy(&m_cond); + + DBUG_VOID_RETURN; +} + +bool Ack_receiver::start() +{ + DBUG_ENTER("Ack_receiver::start"); + + mysql_mutex_lock(&m_mutex); + if(m_status == ST_DOWN) + { + pthread_attr_t attr; + + m_status= ST_UP; + + if (DBUG_EVALUATE_IF("rpl_semisync_simulate_create_thread_failure", 1, 0) || + pthread_attr_init(&attr) != 0 || + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) != 0 || +#ifndef _WIN32 + pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM) != 0 || +#endif + mysql_thread_create(key_thread_ack_receiver, &m_pid, + &attr, ack_receive_handler, this)) + { + sql_print_error("Failed to start semi-sync ACK receiver thread, " + " could not create thread(errno:%d)", errno); + + m_status= ST_DOWN; + mysql_mutex_unlock(&m_mutex); + + DBUG_RETURN(true); + } + (void) pthread_attr_destroy(&attr); + } + mysql_mutex_unlock(&m_mutex); + + DBUG_RETURN(false); +} + +void Ack_receiver::stop() +{ + DBUG_ENTER("Ack_receiver::stop"); + + mysql_mutex_lock(&m_mutex); + if (m_status == ST_UP) + { + m_status= ST_STOPPING; + mysql_cond_broadcast(&m_cond); + + while (m_status == ST_STOPPING) + mysql_cond_wait(&m_cond, &m_mutex); + + DBUG_ASSERT(m_status == ST_DOWN); + + m_pid= 0; + } + mysql_mutex_unlock(&m_mutex); + + DBUG_VOID_RETURN; +} + +bool Ack_receiver::add_slave(THD *thd) +{ + Slave *slave; + DBUG_ENTER("Ack_receiver::add_slave"); + + if (!(slave= new Slave)) + DBUG_RETURN(true); + + slave->thd= thd; + slave->vio= *thd->net.vio; + slave->vio.mysql_socket.m_psi= NULL; + slave->vio.read_timeout= 1; + + mysql_mutex_lock(&m_mutex); + m_slaves.push_back(slave); + m_slaves_changed= true; + mysql_cond_broadcast(&m_cond); + mysql_mutex_unlock(&m_mutex); + + DBUG_RETURN(false); +} + +void Ack_receiver::remove_slave(THD *thd) +{ + I_List_iterator<Slave> it(m_slaves); + Slave *slave; + DBUG_ENTER("Ack_receiver::remove_slave"); + + mysql_mutex_lock(&m_mutex); + + while ((slave= it++)) + { + if (slave->thd == thd) + { + delete slave; + m_slaves_changed= true; + break; + } + } + mysql_mutex_unlock(&m_mutex); + + DBUG_VOID_RETURN; +} + +inline void Ack_receiver::set_stage_info(const PSI_stage_info &stage) +{ + MYSQL_SET_STAGE(stage.m_key, __FILE__, __LINE__); +} + +inline void Ack_receiver::wait_for_slave_connection() +{ + set_stage_info(stage_waiting_for_semi_sync_slave); + mysql_cond_wait(&m_cond, &m_mutex); +} + +my_socket Ack_receiver::get_slave_sockets(fd_set *fds, uint *count) +{ + my_socket max_fd= INVALID_SOCKET; + Slave *slave; + I_List_iterator<Slave> it(m_slaves); + + *count= 0; + FD_ZERO(fds); + while ((slave= it++)) + { + (*count)++; + my_socket fd= slave->sock_fd(); + max_fd= (fd > max_fd ? fd : max_fd); + FD_SET(fd, fds); + } + + return max_fd; +} + +/* Auxilary function to initialize a NET object with given net buffer. */ +static void init_net(NET *net, unsigned char *buff, unsigned int buff_len) +{ + memset(net, 0, sizeof(NET)); + net->max_packet= buff_len; + net->buff= buff; + net->buff_end= buff + buff_len; + net->read_pos= net->buff; +} + +void Ack_receiver::run() +{ + // skip LOCK_global_system_variables due to the 3rd arg + THD *thd= new THD(next_thread_id(), false, true); + NET net; + unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH]; + fd_set read_fds; + my_socket max_fd= INVALID_SOCKET; + Slave *slave; + + my_thread_init(); + + DBUG_ENTER("Ack_receiver::run"); + + sql_print_information("Starting ack receiver thread"); + thd->system_thread= SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND; + thd->thread_stack= (char*) &thd; + thd->store_globals(); + thd->security_ctx->skip_grants(); + thread_safe_increment32(&service_thread_count); + thd->set_command(COM_DAEMON); + init_net(&net, net_buff, REPLY_MESSAGE_MAX_LENGTH); + + mysql_mutex_lock(&m_mutex); + m_slaves_changed= true; + mysql_mutex_unlock(&m_mutex); + + while (1) + { + fd_set fds; + int ret; + uint slave_count; + + mysql_mutex_lock(&m_mutex); + if (unlikely(m_status == ST_STOPPING)) + goto end; + + set_stage_info(stage_waiting_for_semi_sync_ack_from_slave); + if (unlikely(m_slaves_changed)) + { + if (unlikely(m_slaves.is_empty())) + { + wait_for_slave_connection(); + mysql_mutex_unlock(&m_mutex); + continue; + } + + max_fd= get_slave_sockets(&read_fds, &slave_count); + m_slaves_changed= false; + DBUG_PRINT("info", ("fd count %u, max_fd %d", slave_count, max_fd)); + } + + struct timeval tv= {1, 0}; + fds= read_fds; + /* select requires max fd + 1 for the first argument */ + ret= select(max_fd+1, &fds, NULL, NULL, &tv); + if (ret <= 0) + { + mysql_mutex_unlock(&m_mutex); + + ret= DBUG_EVALUATE_IF("rpl_semisync_simulate_select_error", -1, ret); + + if (ret == -1) + sql_print_information("Failed to select() on semi-sync dump sockets, " + "error: errno=%d", socket_errno); + /* Sleep 1us, so other threads can catch the m_mutex easily. */ + my_sleep(1); + continue; + } + + set_stage_info(stage_reading_semi_sync_ack); + I_List_iterator<Slave> it(m_slaves); + + while ((slave= it++)) + { + if (FD_ISSET(slave->sock_fd(), &fds)) + { + ulong len; + + net_clear(&net, 0); + net.vio= &slave->vio; + + len= my_net_read(&net); + if (likely(len != packet_error)) + repl_semisync_master.report_reply_packet(slave->server_id(), + net.read_pos, len); + else if (net.last_errno == ER_NET_READ_ERROR) + FD_CLR(slave->sock_fd(), &read_fds); + } + } + mysql_mutex_unlock(&m_mutex); + } +end: + sql_print_information("Stopping ack receiver thread"); + m_status= ST_DOWN; + delete thd; + thread_safe_decrement32(&service_thread_count); + signal_thd_deleted(); + mysql_cond_broadcast(&m_cond); + mysql_mutex_unlock(&m_mutex); + DBUG_VOID_RETURN; +} diff --git a/sql/semisync_master_ack_receiver.h b/sql/semisync_master_ack_receiver.h new file mode 100644 index 00000000000..619748a2159 --- /dev/null +++ b/sql/semisync_master_ack_receiver.h @@ -0,0 +1,119 @@ +/* Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#ifndef SEMISYNC_MASTER_ACK_RECEIVER_DEFINED +#define SEMISYNC_MASTER_ACK_RECEIVER_DEFINED + +#include "my_global.h" +#include "my_pthread.h" +#include "sql_class.h" +#include "semisync.h" +/** + Ack_receiver is responsible to control ack receive thread and maintain + slave information used by ack receive thread. + + There are mainly four operations on ack receive thread: + start: start ack receive thread + stop: stop ack receive thread + add_slave: maintain a new semisync slave's information + remove_slave: remove a semisync slave's information + */ +class Ack_receiver : public Repl_semi_sync_base +{ +public: + Ack_receiver(); + ~Ack_receiver() {} + void cleanup(); + /** + Notify ack receiver to receive acks on the dump session. + + It adds the given dump thread into the slave list and wakes + up ack thread if it is waiting for any slave coming. + + @param[in] thd THD of a dump thread. + + @return it return false if succeeds, otherwise true is returned. + */ + bool add_slave(THD *thd); + + /** + Notify ack receiver not to receive ack on the dump session. + + it removes the given dump thread from slave list. + + @param[in] thd THD of a dump thread. + */ + void remove_slave(THD *thd); + + /** + Start ack receive thread + + @return it return false if succeeds, otherwise true is returned. + */ + bool start(); + + /** + Stop ack receive thread + */ + void stop(); + + /** + The core of ack receive thread. + + It monitors all slaves' sockets and receives acks when they come. + */ + void run(); + + void set_trace_level(unsigned long trace_level) + { + m_trace_level= trace_level; + } +private: + enum status {ST_UP, ST_DOWN, ST_STOPPING}; + uint8 m_status; + /* + Protect m_status, m_slaves_changed and m_slaves. ack thread and other + session may access the variables at the same time. + */ + mysql_mutex_t m_mutex; + mysql_cond_t m_cond; + /* If slave list is updated(add or remove). */ + bool m_slaves_changed; + + class Slave :public ilink + { +public: + THD *thd; + Vio vio; + + my_socket sock_fd() { return vio.mysql_socket.fd; } + uint server_id() { return thd->variables.server_id; } + }; + + I_List<Slave> m_slaves; + + pthread_t m_pid; + +/* Declare them private, so no one can copy the object. */ + Ack_receiver(const Ack_receiver &ack_receiver); + Ack_receiver& operator=(const Ack_receiver &ack_receiver); + + void set_stage_info(const PSI_stage_info &stage); + void wait_for_slave_connection(); + my_socket get_slave_sockets(fd_set *fds, uint *count); +}; + +extern Ack_receiver ack_receiver; +#endif diff --git a/sql/semisync_slave.cc b/sql/semisync_slave.cc new file mode 100644 index 00000000000..2d77ee7b10c --- /dev/null +++ b/sql/semisync_slave.cc @@ -0,0 +1,251 @@ +/* Copyright (c) 2008 MySQL AB, 2009 Sun Microsystems, Inc. + Use is subject to license terms. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + + +#include <my_global.h> +#include "semisync_slave.h" + +Repl_semi_sync_slave repl_semisync_slave; + +my_bool rpl_semi_sync_slave_enabled= 0; + +char rpl_semi_sync_slave_delay_master; +my_bool rpl_semi_sync_slave_status= 0; +ulong rpl_semi_sync_slave_trace_level; + +/* + indicate whether or not the slave should send a reply to the master. + + This is set to true in repl_semi_slave_read_event if the current + event read is the last event of a transaction. And the value is + checked in repl_semi_slave_queue_event. +*/ +bool semi_sync_need_reply= false; +unsigned int rpl_semi_sync_slave_kill_conn_timeout; +unsigned long long rpl_semi_sync_slave_send_ack = 0; + +int Repl_semi_sync_slave::init_object() +{ + int result= 0; + + m_init_done = true; + + /* References to the parameter works after set_options(). */ + set_slave_enabled(rpl_semi_sync_slave_enabled); + set_trace_level(rpl_semi_sync_slave_trace_level); + set_delay_master(rpl_semi_sync_slave_delay_master); + set_kill_conn_timeout(rpl_semi_sync_slave_kill_conn_timeout); + + return result; +} + +int Repl_semi_sync_slave::slave_read_sync_header(const char *header, + unsigned long total_len, + int *semi_flags, + const char **payload, + unsigned long *payload_len) +{ + int read_res = 0; + DBUG_ENTER("Repl_semi_sync_slave::slave_read_sync_header"); + + if (rpl_semi_sync_slave_status) + { + if (DBUG_EVALUATE_IF("semislave_corrupt_log", 0, 1) + && (unsigned char)(header[0]) == k_packet_magic_num) + { + semi_sync_need_reply = (header[1] & k_packet_flag_sync); + *payload_len = total_len - 2; + *payload = header + 2; + + DBUG_PRINT("semisync", ("%s: reply - %d", + "Repl_semi_sync_slave::slave_read_sync_header", + semi_sync_need_reply)); + + if (semi_sync_need_reply) + *semi_flags |= SEMI_SYNC_NEED_ACK; + if (is_delay_master()) + *semi_flags |= SEMI_SYNC_SLAVE_DELAY_SYNC; + } + else + { + sql_print_error("Missing magic number for semi-sync packet, packet " + "len: %lu", total_len); + read_res = -1; + } + } else { + *payload= header; + *payload_len= total_len; + } + + DBUG_RETURN(read_res); +} + +int Repl_semi_sync_slave::slave_start(Master_info *mi) +{ + bool semi_sync= get_slave_enabled(); + + sql_print_information("Slave I/O thread: Start %s replication to\ + master '%s@%s:%d' in log '%s' at position %lu", + semi_sync ? "semi-sync" : "asynchronous", + const_cast<char *>(mi->user), mi->host, mi->port, + const_cast<char *>(mi->master_log_name), + (unsigned long)(mi->master_log_pos)); + + if (semi_sync && !rpl_semi_sync_slave_status) + rpl_semi_sync_slave_status= 1; + + /*clear the counter*/ + rpl_semi_sync_slave_send_ack= 0; + return 0; +} + +int Repl_semi_sync_slave::slave_stop(Master_info *mi) +{ + if (rpl_semi_sync_slave_status) + rpl_semi_sync_slave_status= 0; + if (get_slave_enabled()) + kill_connection(mi->mysql); + return 0; +} + +int Repl_semi_sync_slave::reset_slave(Master_info *mi) +{ + return 0; +} + +void Repl_semi_sync_slave::kill_connection(MYSQL *mysql) +{ + if (!mysql) + return; + + char kill_buffer[30]; + MYSQL *kill_mysql = NULL; + kill_mysql = mysql_init(kill_mysql); + mysql_options(kill_mysql, MYSQL_OPT_CONNECT_TIMEOUT, &m_kill_conn_timeout); + mysql_options(kill_mysql, MYSQL_OPT_READ_TIMEOUT, &m_kill_conn_timeout); + mysql_options(kill_mysql, MYSQL_OPT_WRITE_TIMEOUT, &m_kill_conn_timeout); + + bool ret= (!mysql_real_connect(kill_mysql, mysql->host, + mysql->user, mysql->passwd,0, mysql->port, mysql->unix_socket, 0)); + if (DBUG_EVALUATE_IF("semisync_slave_failed_kill", 1, 0) || ret) + { + sql_print_information("cannot connect to master to kill slave io_thread's " + "connection"); + if (!ret) + mysql_close(kill_mysql); + return; + } + uint kill_buffer_length = my_snprintf(kill_buffer, 30, "KILL %lu", + mysql->thread_id); + mysql_real_query(kill_mysql, kill_buffer, kill_buffer_length); + mysql_close(kill_mysql); +} + +int Repl_semi_sync_slave::request_transmit(Master_info *mi) +{ + MYSQL *mysql= mi->mysql; + MYSQL_RES *res= 0; + MYSQL_ROW row; + const char *query; + + if (!get_slave_enabled()) + return 0; + + query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'"; + if (mysql_real_query(mysql, query, strlen(query)) || + !(res= mysql_store_result(mysql))) + { + sql_print_error("Execution failed on master: %s, error :%s", query, mysql_error(mysql)); + return 1; + } + + row= mysql_fetch_row(res); + if (DBUG_EVALUATE_IF("master_not_support_semisync", 1, 0) + || !row) + { + /* Master does not support semi-sync */ + sql_print_warning("Master server does not support semi-sync, " + "fallback to asynchronous replication"); + rpl_semi_sync_slave_status= 0; + mysql_free_result(res); + return 0; + } + mysql_free_result(res); + + /* + Tell master dump thread that we want to do semi-sync + replication + */ + query= "SET @rpl_semi_sync_slave= 1"; + if (mysql_real_query(mysql, query, strlen(query))) + { + sql_print_error("Set 'rpl_semi_sync_slave=1' on master failed"); + return 1; + } + mysql_free_result(mysql_store_result(mysql)); + rpl_semi_sync_slave_status= 1; + + return 0; +} + +int Repl_semi_sync_slave::slave_reply(Master_info *mi) +{ + MYSQL* mysql= mi->mysql; + const char *binlog_filename= const_cast<char *>(mi->master_log_name); + my_off_t binlog_filepos= mi->master_log_pos; + + NET *net= &mysql->net; + uchar reply_buffer[REPLY_MAGIC_NUM_LEN + + REPLY_BINLOG_POS_LEN + + REPLY_BINLOG_NAME_LEN]; + int reply_res = 0; + int name_len = strlen(binlog_filename); + + DBUG_ENTER("Repl_semi_sync_slave::slave_reply"); + + if (rpl_semi_sync_slave_status && semi_sync_need_reply) + { + /* Prepare the buffer of the reply. */ + reply_buffer[REPLY_MAGIC_NUM_OFFSET] = k_packet_magic_num; + int8store(reply_buffer + REPLY_BINLOG_POS_OFFSET, binlog_filepos); + memcpy(reply_buffer + REPLY_BINLOG_NAME_OFFSET, + binlog_filename, + name_len + 1 /* including trailing '\0' */); + + DBUG_PRINT("semisync", ("%s: reply (%s, %lu)", + "Repl_semi_sync_slave::slave_reply", + binlog_filename, (ulong)binlog_filepos)); + + net_clear(net, 0); + /* Send the reply. */ + reply_res = my_net_write(net, reply_buffer, + name_len + REPLY_BINLOG_NAME_OFFSET); + if (!reply_res) + { + reply_res = DBUG_EVALUATE_IF("semislave_failed_net_flush", 1, net_flush(net)); + if (reply_res) + sql_print_error("Semi-sync slave net_flush() reply failed"); + rpl_semi_sync_slave_send_ack++; + } + else + { + sql_print_error("Semi-sync slave send reply failed: %s (%d)", + net->last_error, net->last_errno); + } + } + + DBUG_RETURN(reply_res); +} diff --git a/sql/semisync_slave.h b/sql/semisync_slave.h new file mode 100644 index 00000000000..d65262f151d --- /dev/null +++ b/sql/semisync_slave.h @@ -0,0 +1,115 @@ +/* Copyright (c) 2006 MySQL AB, 2009 Sun Microsystems, Inc. + Use is subject to license terms. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + + +#ifndef SEMISYNC_SLAVE_H +#define SEMISYNC_SLAVE_H + +#include "semisync.h" +#include "my_global.h" +#include "sql_priv.h" +#include "rpl_mi.h" +#include "mysql.h" + +class Master_info; + +/** + The extension class for the slave of semi-synchronous replication +*/ +class Repl_semi_sync_slave + :public Repl_semi_sync_base { +public: + Repl_semi_sync_slave() :m_slave_enabled(false) {} + ~Repl_semi_sync_slave() {} + + void set_trace_level(unsigned long trace_level) { + m_trace_level = trace_level; + } + + /* Initialize this class after MySQL parameters are initialized. this + * function should be called once at bootstrap time. + */ + int init_object(); + + bool get_slave_enabled() { + return m_slave_enabled; + } + + void set_slave_enabled(bool enabled) { + m_slave_enabled = enabled; + } + + bool is_delay_master(){ + return m_delay_master; + } + + void set_delay_master(bool enabled) { + m_delay_master = enabled; + } + + void set_kill_conn_timeout(unsigned int timeout) { + m_kill_conn_timeout = timeout; + } + + /* A slave reads the semi-sync packet header and separate the metadata + * from the payload data. + * + * Input: + * header - (IN) packet header pointer + * total_len - (IN) total packet length: metadata + payload + * semi_flags - (IN) store flags: SEMI_SYNC_SLAVE_DELAY_SYNC and + SEMI_SYNC_NEED_ACK + * payload - (IN) payload: the replication event + * payload_len - (IN) payload length + * + * Return: + * 0: success; non-zero: error + */ + int slave_read_sync_header(const char *header, unsigned long total_len, + int *semi_flags, + const char **payload, unsigned long *payload_len); + + /* A slave replies to the master indicating its replication process. It + * indicates that the slave has received all events before the specified + * binlog position. + */ + int slave_reply(Master_info* mi); + int slave_start(Master_info *mi); + int slave_stop(Master_info *mi); + int request_transmit(Master_info*); + void kill_connection(MYSQL *mysql); + int reset_slave(Master_info *mi); + +private: + /* True when init_object has been called */ + bool m_init_done; + bool m_slave_enabled; /* semi-sycn is enabled on the slave */ + bool m_delay_master; + unsigned int m_kill_conn_timeout; +}; + + +/* System and status variables for the slave component */ +extern my_bool rpl_semi_sync_slave_enabled; +extern my_bool rpl_semi_sync_slave_status; +extern ulong rpl_semi_sync_slave_trace_level; +extern Repl_semi_sync_slave repl_semisync_slave; + +extern char rpl_semi_sync_slave_delay_master; +extern unsigned int rpl_semi_sync_slave_kill_conn_timeout; +extern unsigned long long rpl_semi_sync_slave_send_ack; + +#endif /* SEMISYNC_SLAVE_H */ diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt index 730910ed111..2b3a4640008 100644 --- a/sql/share/errmsg-utf8.txt +++ b/sql/share/errmsg-utf8.txt @@ -6850,8 +6850,8 @@ ER_CANT_SET_GTID_NEXT_WHEN_OWNING_GTID eng "GTID_NEXT cannot be changed by a client that owns a GTID. The client owns %s. Ownership is released on COMMIT or ROLLBACK" ER_UNKNOWN_EXPLAIN_FORMAT - eng "Unknown EXPLAIN format name: '%s'" - rus "Неизвестное имя формата команды EXPLAIN: '%s'" + eng "Unknown %s format name: '%s'" + rus "Неизвестное имя формата команды %s: '%s'" ER_CANT_EXECUTE_IN_READ_ONLY_TRANSACTION 25006 eng "Cannot execute statement in a READ ONLY transaction" diff --git a/sql/slave.cc b/sql/slave.cc index a57312998f1..f36af66f780 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -43,7 +43,6 @@ #include <ssl_compat.h> #include <mysqld_error.h> #include <mysys_err.h> -#include "rpl_handler.h" #include <signal.h> #include <mysql.h> #include <myisam.h> @@ -61,6 +60,7 @@ #include "debug_sync.h" #include "rpl_parallel.h" #include "sql_show.h" +#include "semisync_slave.h" #define FLAGSTR(V,F) ((V)&(F)?#F" ":"") @@ -3586,11 +3586,9 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi, if (opt_log_slave_updates && opt_replicate_annotate_row_events) binlog_flags|= BINLOG_SEND_ANNOTATE_ROWS_EVENT; - if (RUN_HOOK(binlog_relay_io, - before_request_transmit, - (thd, mi, binlog_flags))) + if (repl_semisync_slave.request_transmit(mi)) DBUG_RETURN(1); - + // TODO if big log files: Change next to int8store() int4store(buf, (ulong) mi->master_log_pos); int2store(buf + 4, binlog_flags); @@ -4615,7 +4613,8 @@ pthread_handler_t handle_slave_io(void *arg) } - if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi))) + if (DBUG_EVALUATE_IF("failed_slave_start", 1, 0) + || repl_semisync_slave.slave_start(mi)) { mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, ER_THD(thd, ER_SLAVE_FATAL_ERROR), @@ -4805,9 +4804,10 @@ Stopping slave I/O thread due to out-of-memory error from master"); retry_count=0; // ok event, reset retry counter THD_STAGE_INFO(thd, stage_queueing_master_event_to_the_relay_log); event_buf= (const char*)mysql->net.read_pos + 1; - if (RUN_HOOK(binlog_relay_io, after_read_event, - (thd, mi,(const char*)mysql->net.read_pos + 1, - event_len, &event_buf, &event_len))) + mi->semi_ack= 0; + if (repl_semisync_slave. + slave_read_sync_header((const char*)mysql->net.read_pos + 1, event_len, + &(mi->semi_ack), &event_buf, &event_len)) { mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, ER_THD(thd, ER_SLAVE_FATAL_ERROR), @@ -4856,9 +4856,6 @@ Stopping slave I/O thread due to out-of-memory error from master"); tokenamount -= network_read_len; } - /* XXX: 'synced' should be updated by queue_event to indicate - whether event has been synced to disk */ - bool synced= 0; if (queue_event(mi, event_buf, event_len)) { mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL, @@ -4867,8 +4864,8 @@ Stopping slave I/O thread due to out-of-memory error from master"); goto err; } - if (RUN_HOOK(binlog_relay_io, after_queue_event, - (thd, mi, event_buf, event_len, synced))) + if (rpl_semi_sync_slave_status && (mi->semi_ack & SEMI_SYNC_NEED_ACK) && + repl_semisync_slave.slave_reply(mi)) { mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, ER_THD(thd, ER_SLAVE_FATAL_ERROR), @@ -4877,7 +4874,16 @@ Stopping slave I/O thread due to out-of-memory error from master"); } if (mi->using_gtid == Master_info::USE_GTID_NO && - flush_master_info(mi, TRUE, TRUE)) + /* + If rpl_semi_sync_slave_delay_master is enabled, we will flush + master info only when ack is needed. This may lead to at least one + group transaction delay but affords better performance improvement. + */ + (!repl_semisync_slave.get_slave_enabled() || + (!(mi->semi_ack & SEMI_SYNC_SLAVE_DELAY_SYNC) || + (mi->semi_ack & (SEMI_SYNC_NEED_ACK)))) && + (DBUG_EVALUATE_IF("failed_flush_master_info", 1, 0) || + flush_master_info(mi, TRUE, TRUE))) { sql_print_error("Failed to flush master info file"); goto err; @@ -4931,7 +4937,7 @@ err: IO_RPL_LOG_NAME, mi->master_log_pos, tmp.c_ptr_safe()); } - RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi)); + repl_semisync_slave.slave_stop(mi); thd->reset_query(); thd->reset_db(NULL, 0); if (mysql) @@ -6253,7 +6259,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mysql_mutex_unlock(log_lock); goto err; } - rli->relay_log.signal_update(); + rli->relay_log.signal_relay_log_update(); mysql_mutex_unlock(log_lock); mi->gtid_reconnect_event_skip_count= 0; @@ -6798,7 +6804,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) if (got_gtid_event) rli->ign_gtids.update(&event_gtid); } - rli->relay_log.signal_update(); // the slave SQL thread needs to re-check + // the slave SQL thread needs to re-check + rli->relay_log.signal_relay_log_update(); DBUG_PRINT("info", ("master_log_pos: %lu, event originating from %u server, ignored", (ulong) mi->master_log_pos, uint4korr(buf + SERVER_ID_OFFSET))); } @@ -7308,7 +7315,7 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size) MYSQL_BIN_LOG::open() will write the buffered description event. */ old_pos= rli->event_relay_log_pos; - if ((ev= Log_event::read_log_event(cur_log,0, + if ((ev= Log_event::read_log_event(cur_log, rli->relay_log.description_event_for_exec, opt_slave_sql_verify_checksum))) diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 8a00b7832c3..00144a9a12d 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -705,7 +705,7 @@ extern "C" void thd_kill_timeout(THD* thd) thd->awake(KILL_TIMEOUT); } -THD::THD(my_thread_id id, bool is_wsrep_applier) +THD::THD(my_thread_id id, bool is_wsrep_applier, bool skip_global_sys_var_lock) :Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION, /* statement id */ 0), rli_fake(0), rgi_fake(0), rgi_slave(NULL), @@ -892,7 +892,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) /* Call to init() below requires fully initialized Open_tables_state. */ reset_open_tables_state(this); - init(); + init(skip_global_sys_var_lock); #if defined(ENABLED_PROFILING) profiling.set_thd(this); #endif @@ -1263,10 +1263,11 @@ const Type_handler *THD::type_handler_for_date() const Init common variables that has to be reset on start and on change_user */ -void THD::init(void) +void THD::init(bool skip_lock) { DBUG_ENTER("thd::init"); - mysql_mutex_lock(&LOCK_global_system_variables); + if (!skip_lock) + mysql_mutex_lock(&LOCK_global_system_variables); plugin_thdvar_init(this); /* plugin_thd_var_init() sets variables= global_system_variables, which @@ -1279,8 +1280,8 @@ void THD::init(void) ::strmake(default_master_connection_buff, global_system_variables.default_master_connection.str, variables.default_master_connection.length); - - mysql_mutex_unlock(&LOCK_global_system_variables); + if (!skip_lock) + mysql_mutex_unlock(&LOCK_global_system_variables); user_time.val= start_time= start_time_sec_part= 0; @@ -4193,7 +4194,8 @@ my_bool thd_net_is_killed() void thd_increment_bytes_received(void *thd, ulong length) { - ((THD*) thd)->status_var.bytes_received+= length; + if (thd != NULL) // MDEV-13073 Ack collector having NULL + ((THD*) thd)->status_var.bytes_received+= length; } diff --git a/sql/sql_class.h b/sql/sql_class.h index 41b8efc4464..b56729737de 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -1599,7 +1599,8 @@ enum enum_thread_type SYSTEM_THREAD_EVENT_WORKER= 16, SYSTEM_THREAD_BINLOG_BACKGROUND= 32, SYSTEM_THREAD_SLAVE_BACKGROUND= 64, - SYSTEM_THREAD_GENERIC= 128 + SYSTEM_THREAD_GENERIC= 128, + SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND= 256 }; inline char const * @@ -1615,6 +1616,7 @@ show_system_thread(enum_thread_type thread) RETURN_NAME_AS_STRING(SYSTEM_THREAD_EVENT_SCHEDULER); RETURN_NAME_AS_STRING(SYSTEM_THREAD_EVENT_WORKER); RETURN_NAME_AS_STRING(SYSTEM_THREAD_SLAVE_BACKGROUND); + RETURN_NAME_AS_STRING(SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND); default: sprintf(buf, "<UNKNOWN SYSTEM THREAD: %d>", thread); return buf; @@ -2291,7 +2293,8 @@ public: /* Needed by MariaDB semi sync replication */ Trans_binlog_info *semisync_info; - + /* If this is a semisync slave connection. */ + bool semi_sync_slave; ulonglong client_capabilities; /* What the client supports */ ulong max_client_packet_length; @@ -3177,11 +3180,20 @@ public: /* Debug Sync facility. See debug_sync.cc. */ struct st_debug_sync_control *debug_sync_control; #endif /* defined(ENABLED_DEBUG_SYNC) */ - THD(my_thread_id id, bool is_wsrep_applier= false); + /** + @param id thread identifier + @param is_wsrep_applier thread type + @param skip_lock instruct whether @c LOCK_global_system_variables + is already locked, to not acquire it then. + */ + THD(my_thread_id id, bool is_wsrep_applier= false, bool skip_lock= false); ~THD(); - - void init(void); + /** + @param skip_lock instruct whether @c LOCK_global_system_variables + is already locked, to not acquire it then. + */ + void init(bool skip_lock= false); /* Initialize memory roots necessary for query processing and (!) pre-allocate memory for it. We can't do that in THD constructor because diff --git a/sql/sql_lex.h b/sql/sql_lex.h index 221d3d76f69..79c0f377a20 100644 --- a/sql/sql_lex.h +++ b/sql/sql_lex.h @@ -746,7 +746,7 @@ public: /* thread handler */ THD *thd; /* - SELECT_LEX for hidden SELECT in onion which process global + SELECT_LEX for hidden SELECT in union which process global ORDER BY and LIMIT */ st_select_lex *fake_select_lex; diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 01b77f49b7d..4afecb67ec9 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -82,7 +82,6 @@ #include <m_ctype.h> #include <myisam.h> #include <my_dir.h> -#include "rpl_handler.h" #include "rpl_mi.h" #include "sql_digest.h" @@ -2399,11 +2398,12 @@ com_multi_end: THD_STAGE_INFO(thd, stage_cleaning_up); thd->reset_query(); - thd->set_examined_row_count(0); // For processlist - thd->set_command(COM_SLEEP); /* Performance Schema Interface instrumentation, end */ MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da()); + thd->set_examined_row_count(0); // For processlist + thd->set_command(COM_SLEEP); + thd->m_statement_psi= NULL; thd->m_digest= NULL; @@ -5114,6 +5114,9 @@ end_with_restore_list: { List<set_var_base> *lex_var_list= &lex->var_list; + if (check_dependencies_in_with_clauses(thd->lex->with_clauses_list)) + goto error; + if ((check_table_access(thd, SELECT_ACL, all_tables, FALSE, UINT_MAX, FALSE) || open_and_lock_tables(thd, all_tables, TRUE, 0))) goto error; diff --git a/sql/sql_partition.cc b/sql/sql_partition.cc index 2dc8ad498ca..879a29f4e42 100644 --- a/sql/sql_partition.cc +++ b/sql/sql_partition.cc @@ -6749,10 +6749,7 @@ uint fast_alter_partition_table(THD *thd, TABLE *table, lpt->part_info= part_info; lpt->alter_info= alter_info; lpt->create_info= create_info; - lpt->db_options= create_info->table_options; - if (create_info->row_type != ROW_TYPE_FIXED && - create_info->row_type != ROW_TYPE_DEFAULT) - lpt->db_options|= HA_OPTION_PACK_RECORD; + lpt->db_options= create_info->table_options_with_row_type(); lpt->table= table; lpt->key_info_buffer= 0; lpt->key_count= 0; diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 08e9dcf3fe6..1d6aa0aaab1 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -28,9 +28,9 @@ #include "log_event.h" #include "rpl_filter.h" #include <my_dir.h> -#include "rpl_handler.h" #include "debug_sync.h" -#include "log.h" // get_gtid_list_event +#include "semisync_master.h" +#include "semisync_slave.h" enum enum_gtid_until_state { GTID_UNTIL_NOT_DONE, @@ -160,6 +160,7 @@ struct binlog_send_info { bool clear_initial_log_pos; bool should_stop; + size_t dirlen; binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg, char *lfn) @@ -313,16 +314,43 @@ static int reset_transmit_packet(binlog_send_info *info, ushort flags, packet->length(0); packet->set("\0", 1, &my_charset_bin); - if (RUN_HOOK(binlog_transmit, reserve_header, (info->thd, flags, packet))) + if (info->thd->semi_sync_slave) { - info->error= ER_UNKNOWN_ERROR; - *errmsg= "Failed to run hook 'reserve_header'"; - ret= 1; + if (repl_semisync_master.reserve_sync_header(packet)) + { + info->error= ER_UNKNOWN_ERROR; + *errmsg= "Failed to run hook 'reserve_header'"; + ret= 1; + } } + *ev_offset= packet->length(); return ret; } +int get_user_var_int(const char *name, + long long int *value, int *null_value) +{ + bool null_val; + user_var_entry *entry= + (user_var_entry*) my_hash_search(¤t_thd->user_vars, + (uchar*) name, strlen(name)); + if (!entry) + return 1; + *value= entry->val_int(&null_val); + if (null_value) + *null_value= null_val; + return 0; +} + +inline bool is_semi_sync_slave() +{ + int null_value; + long long val= 0; + get_user_var_int("rpl_semi_sync_slave", &val, &null_value); + return val; +} + static int send_file(THD *thd) { NET* net = &thd->net; @@ -1606,6 +1634,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, enum enum_binlog_checksum_alg current_checksum_alg= info->current_checksum_alg; slave_connection_state *gtid_state= &info->gtid_state; slave_connection_state *until_gtid_state= info->until_gtid_state; + bool need_sync= false; if (event_type == GTID_LIST_EVENT && info->using_gtid_state && until_gtid_state) @@ -1916,8 +1945,10 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, THD_STAGE_INFO(info->thd, stage_sending_binlog_event_to_slave); pos= my_b_tell(log); - if (RUN_HOOK(binlog_transmit, before_send_event, - (info->thd, info->flags, packet, info->log_file_name, pos))) + if (repl_semisync_master.update_sync_header(info->thd, + (uchar*) packet->c_ptr(), + info->log_file_name + info->dirlen, + pos, &need_sync)) { info->error= ER_UNKNOWN_ERROR; return "run 'before_send_event' hook failed"; @@ -1939,8 +1970,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, } } - if (RUN_HOOK(binlog_transmit, after_send_event, - (info->thd, info->flags, packet))) + if (need_sync && repl_semisync_master.flush_net(info->thd, packet->c_ptr())) { info->error= ER_UNKNOWN_ERROR; return "Failed to run hook 'after_send_event'"; @@ -2370,7 +2400,7 @@ static int wait_new_events(binlog_send_info *info, /* in */ PSI_stage_info old_stage; mysql_bin_log.lock_binlog_end_pos(); - info->thd->ENTER_COND(mysql_bin_log.get_log_cond(), + info->thd->ENTER_COND(mysql_bin_log.get_bin_log_cond(), mysql_bin_log.get_binlog_end_pos_lock(), &stage_master_has_sent_all_binlog_to_slave, &old_stage); @@ -2681,7 +2711,7 @@ static int send_one_binlog_file(binlog_send_info *info, /** end of file or error */ return (int)end_pos; } - + info->dirlen= dirname_length(info->log_file_name); /** * send events from current position up to end_pos */ @@ -2703,6 +2733,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, binlog_send_info infoobj(thd, packet, flags, linfo.log_file_name); binlog_send_info *info= &infoobj; + bool has_transmit_started= false; int old_max_allowed_packet= thd->variables.max_allowed_packet; thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET; @@ -2715,11 +2746,11 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, if (init_binlog_sender(info, &linfo, log_ident, &pos)) goto err; - /* - run hook first when all check has been made that slave seems to - be requesting a reasonable position. i.e when transmit actually starts - */ - if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos))) + has_transmit_started= true; + + /* Check if the dump thread is created by a slave with semisync enabled. */ + thd->semi_sync_slave = is_semi_sync_slave(); + if (repl_semisync_master.dump_start(thd, log_ident, pos)) { info->errmsg= "Failed to run hook 'transmit_start'"; info->error= ER_UNKNOWN_ERROR; @@ -2841,7 +2872,10 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, err: THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination); - RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags)); + if (has_transmit_started) + { + repl_semisync_master.dump_end(thd); + } if (info->thd->killed == KILL_SLAVE_SAME_ID) { @@ -3307,7 +3341,8 @@ int reset_slave(THD *thd, Master_info* mi) else if (global_system_variables.log_warnings > 1) sql_print_information("Deleted Master_info file '%s'.", fname); - RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi)); + if (rpl_semi_sync_slave_enabled) + repl_semisync_slave.reset_slave(mi); err: mi->unlock_slave_threads(); if (error) @@ -3809,11 +3844,13 @@ int reset_master(THD* thd, rpl_gtid *init_state, uint32 init_state_len, return 1; } - if (mysql_bin_log.reset_logs(thd, 1, init_state, init_state_len, - next_log_number)) - return 1; - RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */)); - return 0; + bool ret= 0; + /* Temporarily disable master semisync before reseting master. */ + repl_semisync_master.before_reset_master(); + ret= mysql_bin_log.reset_logs(thd, 1, init_state, init_state_len, + next_log_number); + repl_semisync_master.after_reset_master(); + return ret; } @@ -3930,7 +3967,7 @@ bool mysql_show_binlog_events(THD* thd) my_off_t scan_pos = BIN_LOG_HEADER_SIZE; while (scan_pos < pos) { - ev= Log_event::read_log_event(&log, (mysql_mutex_t*)0, description_event, + ev= Log_event::read_log_event(&log, description_event, opt_master_verify_checksum); scan_pos = my_b_tell(&log); if (ev == NULL || !ev->is_valid()) @@ -3964,7 +4001,7 @@ bool mysql_show_binlog_events(THD* thd) my_b_seek(&log, pos); for (event_count = 0; - (ev = Log_event::read_log_event(&log, (mysql_mutex_t*) 0, + (ev = Log_event::read_log_event(&log, description_event, opt_master_verify_checksum)); ) { diff --git a/sql/sql_table.cc b/sql/sql_table.cc index ebde0b6f726..dafe838bc99 100644 --- a/sql/sql_table.cc +++ b/sql/sql_table.cc @@ -4509,10 +4509,7 @@ handler *mysql_create_frm_image(THD *thd, set_table_default_charset(thd, create_info, (char*) db); - db_options= create_info->table_options; - if (create_info->row_type == ROW_TYPE_DYNAMIC || - create_info->row_type == ROW_TYPE_PAGE) - db_options|= HA_OPTION_PACK_RECORD; + db_options= create_info->table_options_with_row_type(); if (!(file= get_new_handler((TABLE_SHARE*) 0, thd->mem_root, create_info->db_type))) diff --git a/sql/sql_view.cc b/sql/sql_view.cc index 21ceb6ce19b..50ea5581cc9 100644 --- a/sql/sql_view.cc +++ b/sql/sql_view.cc @@ -430,14 +430,14 @@ bool mysql_create_view(THD *thd, TABLE_LIST *views, lex->link_first_table_back(view, link_to_local); view->open_type= OT_BASE_ONLY; - WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) - if (check_dependencies_in_with_clauses(lex->with_clauses_list)) { res= TRUE; goto err; } + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) + /* ignore lock specs for CREATE statement */ diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy index c8d54b967f0..63514d7257d 100644 --- a/sql/sql_yacc.yy +++ b/sql/sql_yacc.yy @@ -1726,7 +1726,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize); opt_default_time_precision case_stmt_body opt_bin_mod opt_for_system_time_clause opt_if_exists_table_element opt_if_not_exists_table_element - opt_recursive + opt_recursive opt_format_xid %type <object_ddl_options> create_or_replace @@ -17497,9 +17497,26 @@ xa: { Lex->sql_command = SQLCOM_XA_ROLLBACK; } - | XA_SYM RECOVER_SYM + | XA_SYM RECOVER_SYM opt_format_xid { Lex->sql_command = SQLCOM_XA_RECOVER; + Lex->verbose= $3; + } + ; + +opt_format_xid: + /* empty */ { $$= false; } + | FORMAT_SYM '=' ident_or_text + { + if (!my_strcasecmp(system_charset_info, $3.str, "SQL")) + $$= true; + else if (!my_strcasecmp(system_charset_info, $3.str, "RAW")) + $$= false; + else + { + my_yyabort_error((ER_UNKNOWN_EXPLAIN_FORMAT, MYF(0), "XA RECOVER", $3.str)); + $$= false; + } } ; diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 9b534fabb5b..4cb1edc3d31 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -35,6 +35,7 @@ #include "sql_priv.h" #include "sql_class.h" // set_var.h: THD #include "sys_vars.ic" +#include "my_sys.h" #include "events.h" #include <thr_alarm.h> @@ -61,6 +62,8 @@ #include "sql_repl.h" #include "opt_range.h" #include "rpl_parallel.h" +#include "semisync_master.h" +#include "semisync_slave.h" #include <ssl_compat.h> /* @@ -3093,8 +3096,174 @@ static Sys_var_replicate_events_marked_for_skip Replicate_events_marked_for_skip "the slave).", GLOBAL_VAR(opt_replicate_events_marked_for_skip), CMD_LINE(REQUIRED_ARG), replicate_events_marked_for_skip_names, DEFAULT(RPL_SKIP_REPLICATE)); -#endif +/* new options for semisync */ + +static bool fix_rpl_semi_sync_master_enabled(sys_var *self, THD *thd, + enum_var_type type) +{ + if (rpl_semi_sync_master_enabled) + { + if (repl_semisync_master.enable_master() != 0) + rpl_semi_sync_master_enabled= false; + else if (ack_receiver.start()) + { + repl_semisync_master.disable_master(); + rpl_semi_sync_master_enabled= false; + } + } + else + { + if (repl_semisync_master.disable_master() != 0) + rpl_semi_sync_master_enabled= true; + if (!rpl_semi_sync_master_enabled) + ack_receiver.stop(); + } + return false; +} + +static bool fix_rpl_semi_sync_master_timeout(sys_var *self, THD *thd, + enum_var_type type) +{ + repl_semisync_master.set_wait_timeout(rpl_semi_sync_master_timeout); + return false; +} + +static bool fix_rpl_semi_sync_master_trace_level(sys_var *self, THD *thd, + enum_var_type type) +{ + repl_semisync_master.set_trace_level(rpl_semi_sync_master_trace_level); + ack_receiver.set_trace_level(rpl_semi_sync_master_trace_level); + return false; +} + +static bool fix_rpl_semi_sync_master_wait_point(sys_var *self, THD *thd, + enum_var_type type) +{ + repl_semisync_master.set_wait_point(rpl_semi_sync_master_wait_point); + return false; +} + +static bool fix_rpl_semi_sync_master_wait_no_slave(sys_var *self, THD *thd, + enum_var_type type) +{ + repl_semisync_master.check_and_switch(); + return false; +} + +static Sys_var_mybool Sys_semisync_master_enabled( + "rpl_semi_sync_master_enabled", + "Enable semi-synchronous replication master (disabled by default).", + GLOBAL_VAR(rpl_semi_sync_master_enabled), + CMD_LINE(OPT_ARG), DEFAULT(FALSE), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), + ON_UPDATE(fix_rpl_semi_sync_master_enabled)); + +static Sys_var_ulong Sys_semisync_master_timeout( + "rpl_semi_sync_master_timeout", + "The timeout value (in ms) for semi-synchronous replication in the " + "master", + GLOBAL_VAR(rpl_semi_sync_master_timeout), + CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0,~0L),DEFAULT(10000),BLOCK_SIZE(1), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), + ON_UPDATE(fix_rpl_semi_sync_master_timeout)); + +static Sys_var_mybool Sys_semisync_master_wait_no_slave( + "rpl_semi_sync_master_wait_no_slave", + "Wait until timeout when no semi-synchronous replication slave " + "available (enabled by default).", + GLOBAL_VAR(rpl_semi_sync_master_wait_no_slave), + CMD_LINE(OPT_ARG), DEFAULT(TRUE), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), + ON_UPDATE(fix_rpl_semi_sync_master_wait_no_slave)); + +static Sys_var_ulong Sys_semisync_master_trace_level( + "rpl_semi_sync_master_trace_level", + "The tracing level for semi-sync replication.", + GLOBAL_VAR(rpl_semi_sync_master_trace_level), + CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0,~0L),DEFAULT(32),BLOCK_SIZE(1), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), + ON_UPDATE(fix_rpl_semi_sync_master_trace_level)); + +static const char *repl_semisync_wait_point[]= +{"AFTER_SYNC", "AFTER_COMMIT", NullS}; + +static Sys_var_enum Sys_semisync_master_wait_point( + "rpl_semi_sync_master_wait_point", + "Should transaction wait for semi-sync ack after having synced binlog, " + "or after having committed in storage engine.", + GLOBAL_VAR(rpl_semi_sync_master_wait_point), CMD_LINE(REQUIRED_ARG), + repl_semisync_wait_point, DEFAULT(1), + NO_MUTEX_GUARD, NOT_IN_BINLOG,ON_CHECK(0), + ON_UPDATE(fix_rpl_semi_sync_master_wait_point)); + +static bool fix_rpl_semi_sync_slave_enabled(sys_var *self, THD *thd, + enum_var_type type) +{ + repl_semisync_slave.set_slave_enabled(rpl_semi_sync_slave_enabled != 0); + return false; +} + +static bool fix_rpl_semi_sync_slave_trace_level(sys_var *self, THD *thd, + enum_var_type type) +{ + repl_semisync_slave.set_trace_level(rpl_semi_sync_slave_trace_level); + return false; +} + +static bool fix_rpl_semi_sync_slave_delay_master(sys_var *self, THD *thd, + enum_var_type type) +{ + repl_semisync_slave.set_delay_master(rpl_semi_sync_slave_delay_master); + return false; +} + +static bool fix_rpl_semi_sync_slave_kill_conn_timeout(sys_var *self, THD *thd, + enum_var_type type) +{ + repl_semisync_slave. + set_kill_conn_timeout(rpl_semi_sync_slave_kill_conn_timeout); + return false; +} + +static Sys_var_mybool Sys_semisync_slave_enabled( + "rpl_semi_sync_slave_enabled", + "Enable semi-synchronous replication slave (disabled by default).", + GLOBAL_VAR(rpl_semi_sync_slave_enabled), + CMD_LINE(OPT_ARG), DEFAULT(FALSE), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), + ON_UPDATE(fix_rpl_semi_sync_slave_enabled)); + +static Sys_var_ulong Sys_semisync_slave_trace_level( + "rpl_semi_sync_slave_trace_level", + "The tracing level for semi-sync replication.", + GLOBAL_VAR(rpl_semi_sync_slave_trace_level), + CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0,~0L),DEFAULT(32),BLOCK_SIZE(1), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), + ON_UPDATE(fix_rpl_semi_sync_slave_trace_level)); + +static Sys_var_mybool Sys_semisync_slave_delay_master( + "rpl_semi_sync_slave_delay_master", + "Only write master info file when ack is needed.", + GLOBAL_VAR(rpl_semi_sync_slave_delay_master), + CMD_LINE(OPT_ARG), DEFAULT(FALSE), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), + ON_UPDATE(fix_rpl_semi_sync_slave_delay_master)); + +static Sys_var_uint Sys_semisync_slave_kill_conn_timeout( + "rpl_semi_sync_slave_kill_conn_timeout", + "Timeout for the mysql connection used to kill the slave io_thread's " + "connection on master. This timeout comes into play when stop slave " + "is executed.", + GLOBAL_VAR(rpl_semi_sync_slave_kill_conn_timeout), + CMD_LINE(OPT_ARG), + VALID_RANGE(0, UINT_MAX), DEFAULT(5), BLOCK_SIZE(1), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), + ON_UPDATE(fix_rpl_semi_sync_slave_kill_conn_timeout)); +#endif /* HAVE_REPLICATION */ static Sys_var_ulong Sys_slow_launch_time( "slow_launch_time", @@ -3625,27 +3794,6 @@ static Sys_var_charptr Sys_version_source_revision( CMD_LINE_HELP_ONLY, IN_SYSTEM_CHARSET, DEFAULT(SOURCE_REVISION)); -static char *guess_malloc_library() -{ - if (strcmp(MALLOC_LIBRARY, "system") == 0) - { -#ifdef HAVE_DLOPEN - typedef int (*mallctl_type)(const char*, void*, size_t*, void*, size_t); - mallctl_type mallctl_func; - mallctl_func= (mallctl_type)dlsym(RTLD_DEFAULT, "mallctl"); - if (mallctl_func) - { - static char buf[128]; - char *ver; - size_t len = sizeof(ver); - mallctl_func("version", &ver, &len, NULL, 0); - strxnmov(buf, sizeof(buf)-1, "jemalloc ", ver, NULL); - return buf; - } -#endif - } - return const_cast<char*>(MALLOC_LIBRARY); -} static char *malloc_library; static Sys_var_charptr Sys_malloc_library( "version_malloc_library", "Version of the used malloc library", diff --git a/sql/transaction.cc b/sql/transaction.cc index cbd875e3114..ec277e9c9c4 100644 --- a/sql/transaction.cc +++ b/sql/transaction.cc @@ -21,10 +21,9 @@ #include "mariadb.h" #include "sql_priv.h" #include "transaction.h" -#include "rpl_handler.h" #include "debug_sync.h" // DEBUG_SYNC #include "sql_acl.h" - +#include "semisync_master.h" #ifndef EMBEDDED_LIBRARY /** @@ -318,9 +317,17 @@ bool trans_commit(THD *thd) transaction, so the hooks for rollback will be called. */ if (res) - (void) RUN_HOOK(transaction, after_rollback, (thd, FALSE)); + { +#ifdef HAVE_REPLICATION + repl_semisync_master.wait_after_rollback(thd, FALSE); +#endif + } else - (void) RUN_HOOK(transaction, after_commit, (thd, FALSE)); + { +#ifdef HAVE_REPLICATION + repl_semisync_master.wait_after_commit(thd, FALSE); +#endif + } thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); thd->transaction.all.reset(); thd->lex->start_transaction_opt= 0; @@ -413,7 +420,9 @@ bool trans_rollback(THD *thd) ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY); DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS")); res= ha_rollback_trans(thd, TRUE); - (void) RUN_HOOK(transaction, after_rollback, (thd, FALSE)); +#ifdef HAVE_REPLICATION + repl_semisync_master.wait_after_rollback(thd, FALSE); +#endif thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); /* Reset the binlog transaction marker */ thd->variables.option_bits&= ~OPTION_GTID_BEGIN; @@ -526,9 +535,17 @@ bool trans_commit_stmt(THD *thd) transaction, so the hooks for rollback will be called. */ if (res) - (void) RUN_HOOK(transaction, after_rollback, (thd, FALSE)); + { +#ifdef HAVE_REPLICATION + repl_semisync_master.wait_after_rollback(thd, FALSE); +#endif + } else - (void) RUN_HOOK(transaction, after_commit, (thd, FALSE)); + { +#ifdef HAVE_REPLICATION + repl_semisync_master.wait_after_commit(thd, FALSE); +#endif + } thd->transaction.stmt.reset(); @@ -567,7 +584,9 @@ bool trans_rollback_stmt(THD *thd) trans_reset_one_shot_chistics(thd); } - (void) RUN_HOOK(transaction, after_rollback, (thd, FALSE)); +#ifdef HAVE_REPLICATION + repl_semisync_master.wait_after_rollback(thd, FALSE); +#endif thd->transaction.stmt.reset(); |