From 74b35b68744ce552f09124e27ef1785af69d7989 Mon Sep 17 00:00:00 2001 From: Andrei Elkin Date: Wed, 22 Nov 2017 19:34:42 +0200 Subject: MDEV-13073. This part patch weeds out RUN_HOOK from the server as semisync is defined statically. Consequently the observer interfaces are removed as well. --- libmysqld/CMakeLists.txt | 2 +- .../perfschema/r/dml_setup_instruments.result | 4 +- .../r/sysvars_server_notembedded,32bit.rdiff | 93 +++- sql/CMakeLists.txt | 2 +- sql/handler.cc | 8 +- sql/log.cc | 75 +-- sql/mysqld.cc | 12 +- sql/mysqld.h | 1 - sql/rpl_handler.cc | 555 --------------------- sql/rpl_handler.h | 222 --------- sql/semisync_master.cc | 8 +- sql/semisync_master.h | 2 +- sql/semisync_slave.h | 2 + sql/slave.cc | 26 +- sql/sql_parse.cc | 1 - sql/sql_repl.cc | 59 ++- sql/transaction.cc | 7 - 17 files changed, 137 insertions(+), 942 deletions(-) delete mode 100644 sql/rpl_handler.cc delete mode 100644 sql/rpl_handler.h diff --git a/libmysqld/CMakeLists.txt b/libmysqld/CMakeLists.txt index abb19fd9932..19962df0bec 100644 --- a/libmysqld/CMakeLists.txt +++ b/libmysqld/CMakeLists.txt @@ -90,7 +90,7 @@ SET(SQL_EMBEDDED_SOURCES emb_qcache.cc libmysqld.c lib_sql.cc ../sql/scheduler.cc ../sql/sql_audit.cc ../sql/sql_alter.cc ../sql/sql_partition_admin.cc ../sql/event_parse_data.cc - ../sql/sql_signal.cc ../sql/rpl_handler.cc + ../sql/sql_signal.cc ../sql/sys_vars.cc ${CMAKE_BINARY_DIR}/sql/sql_builtin.cc ../sql/mdl.cc ../sql/transaction.cc diff --git a/mysql-test/suite/perfschema/r/dml_setup_instruments.result b/mysql-test/suite/perfschema/r/dml_setup_instruments.result index 3f4d6107717..ff184806e2e 100644 --- a/mysql-test/suite/perfschema/r/dml_setup_instruments.result +++ b/mysql-test/suite/perfschema/r/dml_setup_instruments.result @@ -4,7 +4,7 @@ where name like 'Wait/Synch/Mutex/sql/%' and name not in ('wait/synch/mutex/sql/DEBUG_SYNC::mutex') order by name limit 10; NAME ENABLED TIMED -wait/synch/mutex/sql/Ack_receiver::m_mutex YES YES +wait/synch/mutex/sql/Ack_receiver::mutex YES YES wait/synch/mutex/sql/Cversion_lock YES YES wait/synch/mutex/sql/Delayed_insert::mutex YES YES wait/synch/mutex/sql/Event_scheduler::LOCK_scheduler_state YES YES @@ -36,7 +36,7 @@ where name like 'Wait/Synch/Cond/sql/%' 'wait/synch/cond/sql/DEBUG_SYNC::cond') order by name limit 10; NAME ENABLED TIMED -wait/synch/cond/sql/Ack_receiver::m_cond YES YES +wait/synch/cond/sql/Ack_receiver::cond YES YES wait/synch/cond/sql/COND_binlog_send YES YES wait/synch/cond/sql/COND_flush_thread_cache YES YES wait/synch/cond/sql/COND_group_commit_orderer YES YES diff --git a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded,32bit.rdiff b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded,32bit.rdiff index 8b0749810e9..cb9e84b81c8 100644 --- a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded,32bit.rdiff +++ b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded,32bit.rdiff @@ -1,5 +1,5 @@ ---- sysvars_server_notembedded.result 2017-11-17 17:00:22.470630255 +0100 -+++ sysvars_server_notembedded,32bit.reject 2017-11-17 19:12:42.732453556 +0100 +--- sysvars_server_notembedded.result 2017-12-15 20:57:40.174654761 +0200 ++++ sysvars_server_notembedded,32bit.reject 2017-12-15 21:02:20.476044700 +0200 @@ -58,7 +58,7 @@ GLOBAL_VALUE_ORIGIN CONFIG DEFAULT_VALUE 1 @@ -1116,7 +1116,46 @@ NUMERIC_BLOCK_SIZE 1 ENUM_VALUE_LIST NULL READ_ONLY NO -@@ -4048,7 +4048,7 @@ +@@ -4034,10 +4034,10 @@ + GLOBAL_VALUE_ORIGIN COMPILE-TIME + DEFAULT_VALUE 10000 + VARIABLE_SCOPE GLOBAL +-VARIABLE_TYPE BIGINT UNSIGNED ++VARIABLE_TYPE INT UNSIGNED + VARIABLE_COMMENT The timeout value (in ms) for semi-synchronous replication in the master + NUMERIC_MIN_VALUE 0 +-NUMERIC_MAX_VALUE 18446744073709551615 ++NUMERIC_MAX_VALUE 4294967295 + NUMERIC_BLOCK_SIZE 1 + ENUM_VALUE_LIST NULL + READ_ONLY NO +@@ -4048,10 +4048,10 @@ + GLOBAL_VALUE_ORIGIN COMPILE-TIME + DEFAULT_VALUE 32 + VARIABLE_SCOPE GLOBAL +-VARIABLE_TYPE BIGINT UNSIGNED ++VARIABLE_TYPE INT UNSIGNED + VARIABLE_COMMENT The tracing level for semi-sync replication. + NUMERIC_MIN_VALUE 0 +-NUMERIC_MAX_VALUE 18446744073709551615 ++NUMERIC_MAX_VALUE 4294967295 + NUMERIC_BLOCK_SIZE 1 + ENUM_VALUE_LIST NULL + READ_ONLY NO +@@ -4132,10 +4132,10 @@ + GLOBAL_VALUE_ORIGIN COMPILE-TIME + DEFAULT_VALUE 32 + VARIABLE_SCOPE GLOBAL +-VARIABLE_TYPE BIGINT UNSIGNED ++VARIABLE_TYPE INT UNSIGNED + VARIABLE_COMMENT The tracing level for semi-sync replication. + NUMERIC_MIN_VALUE 0 +-NUMERIC_MAX_VALUE 18446744073709551615 ++NUMERIC_MAX_VALUE 4294967295 + NUMERIC_BLOCK_SIZE 1 + ENUM_VALUE_LIST NULL + READ_ONLY NO +@@ -4174,7 +4174,7 @@ GLOBAL_VALUE_ORIGIN CONFIG DEFAULT_VALUE 1 VARIABLE_SCOPE SESSION @@ -1125,7 +1164,7 @@ VARIABLE_COMMENT Uniquely identifies the server instance in the community of replication partners NUMERIC_MIN_VALUE 1 NUMERIC_MAX_VALUE 4294967295 -@@ -4230,7 +4230,7 @@ +@@ -4356,7 +4356,7 @@ GLOBAL_VALUE_ORIGIN COMPILE-TIME DEFAULT_VALUE 0 VARIABLE_SCOPE GLOBAL @@ -1134,7 +1173,7 @@ VARIABLE_COMMENT Maximum number of parallel threads to use on slave for events in a single replication domain. When using multiple domains, this can be used to limit a single domain from grabbing all threads and thus stalling other domains. The default of 0 means to allow a domain to grab as many threads as it wants, up to the value of slave_parallel_threads. NUMERIC_MIN_VALUE 0 NUMERIC_MAX_VALUE 16383 -@@ -4272,7 +4272,7 @@ +@@ -4398,7 +4398,7 @@ GLOBAL_VALUE_ORIGIN COMPILE-TIME DEFAULT_VALUE 1073741824 VARIABLE_SCOPE GLOBAL @@ -1143,7 +1182,7 @@ VARIABLE_COMMENT The maximum packet length to sent successfully from the master to slave. NUMERIC_MIN_VALUE 1024 NUMERIC_MAX_VALUE 1073741824 -@@ -4300,7 +4300,7 @@ +@@ -4426,7 +4426,7 @@ GLOBAL_VALUE_ORIGIN COMPILE-TIME DEFAULT_VALUE 131072 VARIABLE_SCOPE GLOBAL @@ -1152,7 +1191,7 @@ VARIABLE_COMMENT Limit on how much memory SQL threads should use per parallel replication thread when reading ahead in the relay log looking for opportunities for parallel replication. Only used when --slave-parallel-threads > 0. NUMERIC_MIN_VALUE 0 NUMERIC_MAX_VALUE 2147483647 -@@ -4328,7 +4328,7 @@ +@@ -4454,7 +4454,7 @@ GLOBAL_VALUE_ORIGIN COMPILE-TIME DEFAULT_VALUE 0 VARIABLE_SCOPE GLOBAL @@ -1161,7 +1200,7 @@ VARIABLE_COMMENT If non-zero, number of threads to spawn to apply in parallel events on the slave that were group-committed on the master or were logged with GTID in different replication domains. Note that these threads are in addition to the IO and SQL threads, which are always created by a replication slave NUMERIC_MIN_VALUE 0 NUMERIC_MAX_VALUE 16383 -@@ -4342,7 +4342,7 @@ +@@ -4468,7 +4468,7 @@ GLOBAL_VALUE_ORIGIN COMPILE-TIME DEFAULT_VALUE 0 VARIABLE_SCOPE GLOBAL @@ -1170,7 +1209,7 @@ VARIABLE_COMMENT Alias for slave_parallel_threads NUMERIC_MIN_VALUE 0 NUMERIC_MAX_VALUE 16383 -@@ -4398,7 +4398,7 @@ +@@ -4524,7 +4524,7 @@ GLOBAL_VALUE_ORIGIN COMPILE-TIME DEFAULT_VALUE 10 VARIABLE_SCOPE GLOBAL @@ -1179,7 +1218,7 @@ VARIABLE_COMMENT Number of times the slave SQL thread will retry a transaction in case it failed with a deadlock, elapsed lock wait timeout or listed in slave_transaction_retry_errors, before giving up and stopping NUMERIC_MIN_VALUE 0 NUMERIC_MAX_VALUE 4294967295 -@@ -4426,7 +4426,7 @@ +@@ -4552,7 +4552,7 @@ GLOBAL_VALUE_ORIGIN COMPILE-TIME DEFAULT_VALUE 0 VARIABLE_SCOPE GLOBAL @@ -1188,7 +1227,7 @@ VARIABLE_COMMENT Interval of the slave SQL thread will retry a transaction in case it failed with a deadlock or elapsed lock wait timeout or listed in slave_transaction_retry_errors NUMERIC_MIN_VALUE 0 NUMERIC_MAX_VALUE 3600 -@@ -4257,7 +4257,7 @@ +@@ -4580,7 +4580,7 @@ GLOBAL_VALUE_ORIGIN COMPILE-TIME DEFAULT_VALUE 2 VARIABLE_SCOPE GLOBAL @@ -1197,7 +1236,7 @@ VARIABLE_COMMENT If creating the thread takes longer than this value (in seconds), the Slow_launch_threads counter will be incremented NUMERIC_MIN_VALUE 0 NUMERIC_MAX_VALUE 31536000 -@@ -4316,7 +4316,7 @@ +@@ -4639,7 +4639,7 @@ VARIABLE_TYPE BIGINT UNSIGNED VARIABLE_COMMENT Each thread that needs to do a sort allocates a buffer of this size NUMERIC_MIN_VALUE 1024 @@ -1206,7 +1245,7 @@ NUMERIC_BLOCK_SIZE 1 ENUM_VALUE_LIST NULL READ_ONLY NO -@@ -4621,7 +4621,7 @@ +@@ -4944,7 +4944,7 @@ GLOBAL_VALUE_ORIGIN COMPILE-TIME DEFAULT_VALUE 256 VARIABLE_SCOPE GLOBAL @@ -1215,7 +1254,7 @@ VARIABLE_COMMENT The soft upper limit for number of cached stored routines for one connection. NUMERIC_MIN_VALUE 0 NUMERIC_MAX_VALUE 524288 -@@ -4719,7 +4719,7 @@ +@@ -5042,7 +5042,7 @@ GLOBAL_VALUE_ORIGIN AUTO DEFAULT_VALUE 400 VARIABLE_SCOPE GLOBAL @@ -1224,7 +1263,7 @@ VARIABLE_COMMENT The number of cached table definitions NUMERIC_MIN_VALUE 400 NUMERIC_MAX_VALUE 524288 -@@ -4733,7 +4733,7 @@ +@@ -5056,7 +5056,7 @@ GLOBAL_VALUE_ORIGIN COMPILE-TIME DEFAULT_VALUE 2000 VARIABLE_SCOPE GLOBAL @@ -1233,7 +1272,7 @@ VARIABLE_COMMENT The number of cached open tables NUMERIC_MIN_VALUE 1 NUMERIC_MAX_VALUE 1048576 -@@ -4761,7 +4761,7 @@ +@@ -5126,7 +5126,7 @@ GLOBAL_VALUE_ORIGIN AUTO DEFAULT_VALUE 256 VARIABLE_SCOPE GLOBAL @@ -1242,7 +1281,7 @@ VARIABLE_COMMENT How many threads we should keep in a cache for reuse. These are freed after 5 minutes of idle time NUMERIC_MIN_VALUE 0 NUMERIC_MAX_VALUE 16384 -@@ -4775,7 +4775,7 @@ +@@ -5140,7 +5140,7 @@ GLOBAL_VALUE_ORIGIN COMPILE-TIME DEFAULT_VALUE 10 VARIABLE_SCOPE GLOBAL @@ -1251,7 +1290,7 @@ VARIABLE_COMMENT Permits the application to give the threads system a hint for the desired number of threads that should be run at the same time.This variable has no effect, and is deprecated. It will be removed in a future release. NUMERIC_MIN_VALUE 1 NUMERIC_MAX_VALUE 512 -@@ -4980,15 +4980,15 @@ +@@ -5345,15 +5345,15 @@ READ_ONLY YES COMMAND_LINE_ARGUMENT REQUIRED VARIABLE_NAME TMP_DISK_TABLE_SIZE @@ -1271,7 +1310,7 @@ NUMERIC_BLOCK_SIZE 1 ENUM_VALUE_LIST NULL READ_ONLY NO -@@ -5002,7 +5002,7 @@ +@@ -5367,7 +5367,7 @@ VARIABLE_TYPE BIGINT UNSIGNED VARIABLE_COMMENT If an internal in-memory temporary table exceeds this size, MariaDB will automatically convert it to an on-disk MyISAM or Aria table. Same as tmp_table_size. NUMERIC_MIN_VALUE 1024 @@ -1280,7 +1319,7 @@ NUMERIC_BLOCK_SIZE 1 ENUM_VALUE_LIST NULL READ_ONLY NO -@@ -5016,7 +5016,7 @@ +@@ -5381,7 +5381,7 @@ VARIABLE_TYPE BIGINT UNSIGNED VARIABLE_COMMENT Alias for tmp_memory_table_size. If an internal in-memory temporary table exceeds this size, MariaDB will automatically convert it to an on-disk MyISAM or Aria table. NUMERIC_MIN_VALUE 1024 @@ -1289,7 +1328,7 @@ NUMERIC_BLOCK_SIZE 1 ENUM_VALUE_LIST NULL READ_ONLY NO -@@ -5027,7 +5027,7 @@ +@@ -5392,7 +5392,7 @@ GLOBAL_VALUE_ORIGIN COMPILE-TIME DEFAULT_VALUE 8192 VARIABLE_SCOPE SESSION @@ -1298,7 +1337,7 @@ VARIABLE_COMMENT Allocation block size for transactions to be stored in binary log NUMERIC_MIN_VALUE 1024 NUMERIC_MAX_VALUE 134217728 -@@ -5041,7 +5041,7 @@ +@@ -5406,7 +5406,7 @@ GLOBAL_VALUE_ORIGIN COMPILE-TIME DEFAULT_VALUE 4096 VARIABLE_SCOPE SESSION @@ -1307,7 +1346,7 @@ VARIABLE_COMMENT Persistent buffer for transactions to be stored in binary log NUMERIC_MIN_VALUE 1024 NUMERIC_MAX_VALUE 134217728 -@@ -5139,7 +5139,7 @@ +@@ -5504,7 +5504,7 @@ GLOBAL_VALUE_ORIGIN COMPILE-TIME DEFAULT_VALUE 28800 VARIABLE_SCOPE SESSION @@ -1316,7 +1355,7 @@ VARIABLE_COMMENT The number of seconds the server waits for activity on a connection before closing it NUMERIC_MIN_VALUE 1 NUMERIC_MAX_VALUE 31536000 -@@ -5243,7 +5243,7 @@ +@@ -5609,7 +5609,7 @@ COMMAND_LINE_ARGUMENT OPTIONAL VARIABLE_NAME OPEN_FILES_LIMIT VARIABLE_SCOPE GLOBAL @@ -1325,7 +1364,7 @@ VARIABLE_COMMENT If this is not 0, then mysqld will use this value to reserve file descriptors to use with setrlimit(). If this value is 0 or autoset then mysqld will reserve max_connections*5 or max_connections + table_cache*2 (whichever is larger) number of file descriptors NUMERIC_MIN_VALUE 0 NUMERIC_MAX_VALUE 4294967295 -@@ -5256,7 +5256,7 @@ +@@ -5622,7 +5622,7 @@ VARIABLE_TYPE BIGINT UNSIGNED VARIABLE_COMMENT Sets the internal state of the RAND() generator for replication purposes NUMERIC_MIN_VALUE 0 @@ -1334,7 +1373,7 @@ NUMERIC_BLOCK_SIZE 1 ENUM_VALUE_LIST NULL READ_ONLY NO -@@ -5266,7 +5266,7 @@ +@@ -5632,7 +5632,7 @@ VARIABLE_TYPE BIGINT UNSIGNED VARIABLE_COMMENT Sets the internal state of the RAND() generator for replication purposes NUMERIC_MIN_VALUE 0 @@ -1343,7 +1382,7 @@ NUMERIC_BLOCK_SIZE 1 ENUM_VALUE_LIST NULL READ_ONLY NO -@@ -5351,7 +5351,7 @@ +@@ -5727,7 +5727,7 @@ VARIABLE_NAME LOG_TC_SIZE GLOBAL_VALUE_ORIGIN AUTO VARIABLE_SCOPE GLOBAL diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index fadee80491b..24e1dd27d02 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -122,7 +122,7 @@ SET (SQL_SOURCE rpl_rli.cc rpl_mi.cc sql_servers.cc sql_audit.cc sql_connect.cc scheduler.cc sql_partition_admin.cc sql_profile.cc event_parse_data.cc sql_alter.cc - sql_signal.cc rpl_handler.cc mdl.cc sql_admin.cc + sql_signal.cc mdl.cc sql_admin.cc transaction.cc sys_vars.cc sql_truncate.cc datadict.cc sql_reload.cc item_inetfunc.cc diff --git a/sql/handler.cc b/sql/handler.cc index 7ea02278abc..a0b1c0ebf07 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -23,7 +23,7 @@ #include "mariadb.h" #include "sql_priv.h" #include "unireg.h" -#include "rpl_handler.h" +#include "rpl_rli.h" #include "sql_cache.h" // query_cache, query_cache_* #include "sql_connect.h" // global_table_stats #include "key.h" // key_copy, key_unpack, key_cmp_if_same, key_cmp @@ -1485,8 +1485,7 @@ done: mysql_mutex_assert_not_owner(mysql_bin_log.get_log_lock()); mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync); mysql_mutex_assert_not_owner(&LOCK_commit_ordered); - (void) RUN_HOOK(transaction, after_commit, (thd, FALSE)); -#ifdef REPLICATION +#ifdef HAVE_REPLICATION repl_semisync_master.waitAfterCommit(thd, all); DEBUG_SYNC(thd, "after_group_after_commit"); #endif @@ -1734,8 +1733,7 @@ int ha_rollback_trans(THD *thd, bool all) push_warning(thd, Sql_condition::WARN_LEVEL_WARN, ER_WARNING_NOT_COMPLETE_ROLLBACK, ER_THD(thd, ER_WARNING_NOT_COMPLETE_ROLLBACK)); - (void) RUN_HOOK(transaction, after_rollback, (thd, FALSE)); -#ifdef REPLICATION +#ifdef HAVE_REPLICATION repl_semisync_master.waitAfterRollback(thd, all); #endif DBUG_RETURN(error); diff --git a/sql/log.cc b/sql/log.cc index a866d72d785..cd757cac30e 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -49,7 +49,6 @@ #endif #include "sql_plugin.h" -#include "rpl_handler.h" #include "debug_sync.h" #include "sql_show.h" #include "my_pthread.h" @@ -6376,19 +6375,15 @@ err: mysql_mutex_assert_owner(&LOCK_log); mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync); mysql_mutex_assert_not_owner(&LOCK_commit_ordered); - if ((error= RUN_HOOK(binlog_storage, after_flush, - (thd, log_file_name, file->pos_in_file, - synced, true, true))) -#ifdef REPLICATION - || repl_semisync_master.reportBinlogUpdate(thd, log_file_name, - file->pos_in_file) -#endif - ) +#ifdef HAVE_REPLICATION + if (repl_semisync_master.reportBinlogUpdate(thd, log_file_name, + file->pos_in_file)) { sql_print_error("Failed to run 'after_flush' hooks"); error= 1; } else +#endif { /* update binlog_end_pos so it can be read by dump thread @@ -6413,18 +6408,14 @@ err: mysql_mutex_assert_not_owner(&LOCK_log); mysql_mutex_assert_owner(&LOCK_after_binlog_sync); mysql_mutex_assert_not_owner(&LOCK_commit_ordered); - if (RUN_HOOK(binlog_storage, after_sync, - (thd, log_file_name, file->pos_in_file, - true, true)) -#ifdef REPLICATION - || repl_semisync_master.waitAfterSync(log_file_name, - file->pos_in_file) -#endif - ) +#ifdef HAVE_REPLICATION + if (repl_semisync_master.waitAfterSync(log_file_name, + file->pos_in_file)) { error=1; /* error is already printed inside hook */ } +#endif /* Take mutex to protect against a reader seeing partial writes of 64-bit @@ -7850,33 +7841,23 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) mysql_mutex_assert_owner(&LOCK_log); mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync); mysql_mutex_assert_not_owner(&LOCK_commit_ordered); - bool first __attribute__((unused))= true; - bool last __attribute__((unused)); + for (current= queue; current != NULL; current= current->next) { - last= current->next == NULL; +#ifdef HAVE_REPLICATION if (!current->error && - (RUN_HOOK(binlog_storage, after_flush, - (current->thd, - current->cache_mngr->last_commit_pos_file, - current->cache_mngr->last_commit_pos_offset, synced, - first, last)) -#ifdef REPLICATION - || (DBUG_EVALUATE_IF("failed_report_binlog_update", 1, 0) || - repl_semisync_master. - reportBinlogUpdate(current->thd, - current->cache_mngr->last_commit_pos_file, - current->cache_mngr-> - last_commit_pos_offset)) -#endif - )) + repl_semisync_master. + reportBinlogUpdate(current->thd, + current->cache_mngr->last_commit_pos_file, + current->cache_mngr-> + last_commit_pos_offset)) { current->error= ER_ERROR_ON_WRITE; current->commit_errno= -1; current->error_cache= NULL; any_error= true; } - first= false; +#endif } /* @@ -7951,24 +7932,14 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) for (current= queue; current != NULL; current= current->next) { last= current->next == NULL; - if (!current->error && - (RUN_HOOK(binlog_storage, after_sync, - (current->thd, current->cache_mngr->last_commit_pos_file, - current->cache_mngr->last_commit_pos_offset, - first, last)) -#ifdef REPLICATION - || (DBUG_EVALUATE_IF("simulate_after_sync_hook_error", 1, 0) || - repl_semisync_master.waitAfterSync(current->cache_mngr-> - last_commit_pos_file, - current->cache_mngr-> - last_commit_pos_offset)) +#ifdef HAVE_REPLICATION + if (!current->error) + current->error= + repl_semisync_master.waitAfterSync(current->cache_mngr-> + last_commit_pos_file, + current->cache_mngr-> + last_commit_pos_offset); #endif - )) - { - const char *hook_name= rpl_semi_sync_master_enabled ? - "'waitAfterSync'" : "binlog_storage 'after_sync'"; - sql_print_error("Failed to call '%s'", hook_name); - } first= false; } } diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 86759b8ed8b..f6a4ea60755 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -97,7 +97,6 @@ #include "set_var.h" #include "rpl_injector.h" -#include "rpl_handler.h" #include "semisync_master.h" #include "semisync_slave.h" @@ -390,7 +389,7 @@ static longlong start_memory_used; /* Global variables */ bool opt_bin_log, opt_bin_log_used=0, opt_ignore_builtin_innodb= 0; -bool opt_bin_log_compress, run_hooks_enabled; +bool opt_bin_log_compress; uint opt_bin_log_compress_min_len; my_bool opt_log, debug_assert_if_crashed_table= 0, opt_help= 0; my_bool debug_assert_on_not_freed_memory= 0; @@ -2237,7 +2236,6 @@ void clean_up(bool print_message) #ifdef HAVE_REPLICATION semi_sync_master_deinit(); #endif - delegates_destroy(); xid_cache_free(); tdc_deinit(); mdl_destroy(); @@ -5154,13 +5152,6 @@ static int init_server_components() xid_cache_init(); - /* - initialize delegates for extension observers, errors have already - been reported in the function - */ - if (delegates_init()) - unireg_abort(1); - /* need to configure logging before initializing storage engines */ if (!opt_bin_log_used && !WSREP_ON) { @@ -8960,7 +8951,6 @@ static int mysql_init_variables(void) transactions_multi_engine= 0; rpl_transactions_multi_engine= 0; transactions_gtid_foreign_engine= 0; - run_hooks_enabled= 0; // don't run hooks, semisync does not need 'em log_bin_basename= NULL; log_bin_index= NULL; diff --git a/sql/mysqld.h b/sql/mysqld.h index 5399ec91b19..9a2cde9b145 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -109,7 +109,6 @@ extern CHARSET_INFO *character_set_filesystem; extern MY_BITMAP temp_pool; extern bool opt_large_files; extern bool opt_update_log, opt_bin_log, opt_error_log, opt_bin_log_compress; -extern bool run_hooks_enabled; extern uint opt_bin_log_compress_min_len; extern my_bool opt_log, opt_bootstrap; extern my_bool opt_backup_history_log; diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc deleted file mode 100644 index 27e411ca6de..00000000000 --- a/sql/rpl_handler.cc +++ /dev/null @@ -1,555 +0,0 @@ -/* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ - -#include "mariadb.h" -#include "sql_priv.h" -#include "unireg.h" - -#include "rpl_mi.h" -#include "sql_repl.h" -#include "log_event.h" -#include "rpl_filter.h" -#include -#include "rpl_handler.h" - -Trans_delegate *transaction_delegate; -Binlog_storage_delegate *binlog_storage_delegate; -#ifdef HAVE_REPLICATION -Binlog_transmit_delegate *binlog_transmit_delegate; -Binlog_relay_IO_delegate *binlog_relay_io_delegate; -#endif /* HAVE_REPLICATION */ - -/* - structure to save transaction log filename and position -*/ -typedef struct Trans_binlog_info { - my_off_t log_pos; - char log_file[FN_REFLEN]; -} Trans_binlog_info; - -int get_user_var_int(const char *name, - long long int *value, int *null_value) -{ - bool null_val; - user_var_entry *entry= - (user_var_entry*) my_hash_search(¤t_thd->user_vars, - (uchar*) name, strlen(name)); - if (!entry) - return 1; - *value= entry->val_int(&null_val); - if (null_value) - *null_value= null_val; - return 0; -} - -int get_user_var_real(const char *name, - double *value, int *null_value) -{ - bool null_val; - user_var_entry *entry= - (user_var_entry*) my_hash_search(¤t_thd->user_vars, - (uchar*) name, strlen(name)); - if (!entry) - return 1; - *value= entry->val_real(&null_val); - if (null_value) - *null_value= null_val; - return 0; -} - -int get_user_var_str(const char *name, char *value, - size_t len, unsigned int precision, int *null_value) -{ - String str; - bool null_val; - user_var_entry *entry= - (user_var_entry*) my_hash_search(¤t_thd->user_vars, - (uchar*) name, strlen(name)); - if (!entry) - return 1; - entry->val_str(&null_val, &str, precision); - strncpy(value, str.c_ptr(), len); - if (null_value) - *null_value= null_val; - return 0; -} - -int delegates_init() -{ - static my_aligned_storage trans_mem; - static my_aligned_storage storage_mem; -#ifdef HAVE_REPLICATION - static my_aligned_storage transmit_mem; - static my_aligned_storage relay_io_mem; -#endif - - void *place_trans_mem= trans_mem.data; - void *place_storage_mem= storage_mem.data; - - transaction_delegate= new (place_trans_mem) Trans_delegate; - - if (!transaction_delegate->is_inited()) - { - sql_print_error("Initialization of transaction delegates failed. " - "Please report a bug."); - return 1; - } - - binlog_storage_delegate= new (place_storage_mem) Binlog_storage_delegate; - - if (!binlog_storage_delegate->is_inited()) - { - sql_print_error("Initialization binlog storage delegates failed. " - "Please report a bug."); - return 1; - } - -#ifdef HAVE_REPLICATION - void *place_transmit_mem= transmit_mem.data; - void *place_relay_io_mem= relay_io_mem.data; - - binlog_transmit_delegate= new (place_transmit_mem) Binlog_transmit_delegate; - - if (!binlog_transmit_delegate->is_inited()) - { - sql_print_error("Initialization of binlog transmit delegates failed. " - "Please report a bug."); - return 1; - } - - binlog_relay_io_delegate= new (place_relay_io_mem) Binlog_relay_IO_delegate; - - if (!binlog_relay_io_delegate->is_inited()) - { - sql_print_error("Initialization binlog relay IO delegates failed. " - "Please report a bug."); - return 1; - } -#endif - - return 0; -} - -void delegates_destroy() -{ - if (transaction_delegate) - transaction_delegate->~Trans_delegate(); - transaction_delegate= 0; - if (binlog_storage_delegate) - binlog_storage_delegate->~Binlog_storage_delegate(); - binlog_storage_delegate= 0; -#ifdef HAVE_REPLICATION - if (binlog_transmit_delegate) - binlog_transmit_delegate->~Binlog_transmit_delegate(); - binlog_transmit_delegate= 0; - if (binlog_relay_io_delegate) - binlog_relay_io_delegate->~Binlog_relay_IO_delegate(); - binlog_relay_io_delegate= 0; -#endif /* HAVE_REPLICATION */ -} - -/* - This macro is used by almost all the Delegate methods to iterate - over all the observers running given callback function of the - delegate. - */ -#define FOREACH_OBSERVER(r, f, do_lock, args) \ - param.server_id= thd->variables.server_id; \ - read_lock(); \ - Observer_info_iterator iter= observer_info_iter(); \ - Observer_info *info= iter++; \ - for (; info; info= iter++) \ - { \ - if (((Observer *)info->observer)->f \ - && ((Observer *)info->observer)->f args) \ - { \ - r= 1; \ - sql_print_error("Run function '" #f "' failed"); \ - break; \ - } \ - } \ - unlock(); - - -int Trans_delegate::after_commit(THD *thd, bool all) -{ - Trans_param param; - Trans_binlog_info *log_info; - bool is_real_trans= (all || thd->transaction.all.ha_list == 0); - int ret= 0; - - param.flags = is_real_trans ? TRANS_IS_REAL_TRANS : 0; - - log_info= thd->semisync_info; - - param.log_file= log_info && log_info->log_file[0] ? log_info->log_file : 0; - param.log_pos= log_info ? log_info->log_pos : 0; - - FOREACH_OBSERVER(ret, after_commit, false, (¶m)); - - /* - This is the end of a real transaction or autocommit statement, we - can mark the memory unused. - */ - if (is_real_trans && log_info) - { - log_info->log_file[0]= 0; - log_info->log_pos= 0; - } - return ret; -} - -int Trans_delegate::after_rollback(THD *thd, bool all) -{ - Trans_param param; - Trans_binlog_info *log_info; - bool is_real_trans= (all || thd->transaction.all.ha_list == 0); - int ret= 0; - - param.flags = is_real_trans ? TRANS_IS_REAL_TRANS : 0; - - log_info= thd->semisync_info; - - param.log_file= log_info && log_info->log_file[0] ? log_info->log_file : 0; - param.log_pos= log_info ? log_info->log_pos : 0; - - FOREACH_OBSERVER(ret, after_rollback, false, (¶m)); - - /* - This is the end of a real transaction or autocommit statement, we - can mark the memory unused. - */ - if (is_real_trans && log_info) - { - log_info->log_file[0]= 0; - log_info->log_pos= 0; - } - return ret; -} - -int Binlog_storage_delegate::after_flush(THD *thd, - const char *log_file, - my_off_t log_pos, - bool synced, - bool first_in_group, - bool last_in_group) -{ - Binlog_storage_param param; - Trans_binlog_info *log_info; - uint32 flags=0; - int ret= 0; - - if (synced) - flags |= BINLOG_STORAGE_IS_SYNCED; - if (first_in_group) - flags|= BINLOG_GROUP_COMMIT_LEADER; - if (last_in_group) - flags|= BINLOG_GROUP_COMMIT_TRAILER; - - if (!(log_info= thd->semisync_info)) - { - if(!(log_info= - (Trans_binlog_info*) my_malloc(sizeof(Trans_binlog_info), MYF(0)))) - return 1; - thd->semisync_info= log_info; - } - - strmake_buf(log_info->log_file, log_file+dirname_length(log_file)); - log_info->log_pos = log_pos; - - FOREACH_OBSERVER(ret, after_flush, false, - (¶m, log_info->log_file, log_info->log_pos, flags)); - return ret; -} - -int Binlog_storage_delegate::after_sync(THD *thd, - const char *log_file, - my_off_t log_pos, - bool first_in_group, - bool last_in_group) -{ - Binlog_storage_param param; - uint32 flags=0; - - if (first_in_group) - flags|= BINLOG_GROUP_COMMIT_LEADER; - if (last_in_group) - flags|= BINLOG_GROUP_COMMIT_TRAILER; - - int ret= 0; - FOREACH_OBSERVER(ret, after_sync, false, - (¶m, log_file+dirname_length(log_file), log_pos, flags)); - - return ret; -} - -#ifdef HAVE_REPLICATION -int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags, - const char *log_file, - my_off_t log_pos) -{ - Binlog_transmit_param param; - param.flags= flags; - - int ret= 0; - FOREACH_OBSERVER(ret, transmit_start, true, (¶m, log_file, log_pos)); - return ret; -} - -int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags) -{ - Binlog_transmit_param param; - param.flags= flags; - - int ret= 0; - FOREACH_OBSERVER(ret, transmit_stop, false, (¶m)); - return ret; -} - -int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags, - String *packet) -{ - /* NOTE2ME: Maximum extra header size for each observer, I hope 32 - bytes should be enough for each Observer to reserve their extra - header. If later found this is not enough, we can increase this - /HEZX - */ -#define RESERVE_HEADER_SIZE 32 - unsigned char header[RESERVE_HEADER_SIZE]; - ulong hlen; - Binlog_transmit_param param; - param.flags= flags; - param.server_id= thd->variables.server_id; - - int ret= 0; - read_lock(); - Observer_info_iterator iter= observer_info_iter(); - Observer_info *info= iter++; - for (; info; info= iter++) - { - hlen= 0; - if (((Observer *)info->observer)->reserve_header - && ((Observer *)info->observer)->reserve_header(¶m, - header, - RESERVE_HEADER_SIZE, - &hlen)) - { - ret= 1; - break; - } - if (hlen == 0) - continue; - if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen)) - { - ret= 1; - break; - } - } - unlock(); - return ret; -} - -int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags, - String *packet, - const char *log_file, - my_off_t log_pos) -{ - Binlog_transmit_param param; - param.flags= flags; - - int ret= 0; - FOREACH_OBSERVER(ret, before_send_event, false, - (¶m, (uchar *)packet->c_ptr(), - packet->length(), - log_file+dirname_length(log_file), log_pos)); - return ret; -} - -int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags, - String *packet) -{ - Binlog_transmit_param param; - param.flags= flags; - - int ret= 0; - FOREACH_OBSERVER(ret, after_send_event, false, - (¶m, packet->c_ptr(), packet->length())); - return ret; -} - -int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags) - -{ - Binlog_transmit_param param; - param.flags= flags; - - int ret= 0; - FOREACH_OBSERVER(ret, after_reset_master, false, (¶m)); - return ret; -} - -void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param, - Master_info *mi) -{ - param->mysql= mi->mysql; - param->user= mi->user; - param->host= mi->host; - param->port= mi->port; - param->master_log_name= mi->master_log_name; - param->master_log_pos= mi->master_log_pos; -} - -int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi) -{ - Binlog_relay_IO_param param; - init_param(¶m, mi); - - int ret= 0; - FOREACH_OBSERVER(ret, thread_start, true, (¶m)); - return ret; -} - - -int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi) -{ - - Binlog_relay_IO_param param; - init_param(¶m, mi); - - int ret= 0; - FOREACH_OBSERVER(ret, thread_stop, false, (¶m)); - return ret; -} - -int Binlog_relay_IO_delegate::before_request_transmit(THD *thd, - Master_info *mi, - ushort flags) -{ - Binlog_relay_IO_param param; - init_param(¶m, mi); - - int ret= 0; - FOREACH_OBSERVER(ret, before_request_transmit, false, (¶m, (uint32)flags)); - return ret; -} - -int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi, - const char *packet, ulong len, - const char **event_buf, - ulong *event_len) -{ - Binlog_relay_IO_param param; - init_param(¶m, mi); - - int ret= 0; - FOREACH_OBSERVER(ret, after_read_event, false, - (¶m, packet, len, event_buf, event_len)); - return ret; -} - -int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi, - const char *event_buf, - ulong event_len, - bool synced) -{ - Binlog_relay_IO_param param; - init_param(¶m, mi); - - uint32 flags=0; - if (synced) - flags |= BINLOG_STORAGE_IS_SYNCED; - - int ret= 0; - FOREACH_OBSERVER(ret, after_queue_event, false, - (¶m, event_buf, event_len, flags)); - return ret; -} - -int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi) - -{ - Binlog_relay_IO_param param; - init_param(¶m, mi); - - int ret= 0; - FOREACH_OBSERVER(ret, after_reset_slave, false, (¶m)); - return ret; -} -#endif /* HAVE_REPLICATION */ - -int register_trans_observer(Trans_observer *observer, void *p) -{ - return transaction_delegate->add_observer(observer, (st_plugin_int *)p); -} - -int unregister_trans_observer(Trans_observer *observer, void *p) -{ - return transaction_delegate->remove_observer(observer, (st_plugin_int *)p); -} - -int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p) -{ - return binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p); -} - -int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p) -{ - return binlog_storage_delegate->remove_observer(observer, (st_plugin_int *)p); -} - -#ifdef HAVE_REPLICATION -int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p) -{ - return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p); -} - -int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p) -{ - return binlog_transmit_delegate->remove_observer(observer, (st_plugin_int *)p); -} - -int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p) -{ - return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p); -} - -int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p) -{ - return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p); -} -#else -int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p) -{ - return 0; -} - -int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p) -{ - return 0; -} - -int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p) -{ - return 0; -} - -int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p) -{ - return 0; -} -#endif /* HAVE_REPLICATION */ diff --git a/sql/rpl_handler.h b/sql/rpl_handler.h deleted file mode 100644 index f8b11adbb69..00000000000 --- a/sql/rpl_handler.h +++ /dev/null @@ -1,222 +0,0 @@ -/* Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved. - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ - -#ifndef RPL_HANDLER_H -#define RPL_HANDLER_H - -#include "sql_priv.h" -#include "rpl_mi.h" -#include "rpl_rli.h" -#include "sql_plugin.h" -#include "replication.h" - -class Observer_info { -public: - void *observer; - st_plugin_int *plugin_int; - - Observer_info(void *ob, st_plugin_int *p) - :observer(ob), plugin_int(p) - { } -}; - -class Delegate { -public: - typedef List Observer_info_list; - typedef List_iterator Observer_info_iterator; - - int add_observer(void *observer, st_plugin_int *plugin) - { - int ret= FALSE; - if (!inited) - return TRUE; - write_lock(); - Observer_info_iterator iter(observer_info_list); - Observer_info *info= iter++; - while (info && info->observer != observer) - info= iter++; - if (!info) - { - info= new Observer_info(observer, plugin); - if (!info || observer_info_list.push_back(info, &memroot)) - ret= TRUE; - } - else - ret= TRUE; - unlock(); - return ret; - } - - int remove_observer(void *observer, st_plugin_int *plugin) - { - int ret= FALSE; - if (!inited) - return TRUE; - write_lock(); - Observer_info_iterator iter(observer_info_list); - Observer_info *info= iter++; - while (info && info->observer != observer) - info= iter++; - if (info) - { - iter.remove(); - delete info; - } - else - ret= TRUE; - unlock(); - return ret; - } - - inline Observer_info_iterator observer_info_iter() - { - return Observer_info_iterator(observer_info_list); - } - - inline bool is_empty() - { - return observer_info_list.is_empty(); - } - - inline int read_lock() - { - if (!inited) - return TRUE; - return rw_rdlock(&lock); - } - - inline int write_lock() - { - if (!inited) - return TRUE; - return rw_wrlock(&lock); - } - - inline int unlock() - { - if (!inited) - return TRUE; - return rw_unlock(&lock); - } - - inline bool is_inited() - { - return inited; - } - - Delegate() - { - inited= FALSE; - if (my_rwlock_init(&lock, NULL)) - return; - init_sql_alloc(&memroot, 1024, 0, MYF(0)); - inited= TRUE; - } - ~Delegate() - { - inited= FALSE; - rwlock_destroy(&lock); - free_root(&memroot, MYF(0)); - } - -private: - Observer_info_list observer_info_list; - rw_lock_t lock; - MEM_ROOT memroot; - bool inited; -}; - -class Trans_delegate - :public Delegate { -public: - typedef Trans_observer Observer; - int before_commit(THD *thd, bool all); - int before_rollback(THD *thd, bool all); - int after_commit(THD *thd, bool all); - int after_rollback(THD *thd, bool all); -}; - -class Binlog_storage_delegate - :public Delegate { -public: - typedef Binlog_storage_observer Observer; - int after_flush(THD *thd, const char *log_file, - my_off_t log_pos, bool synced, - bool first_in_group, bool last_in_group); - int after_sync(THD *thd, const char *log_file, my_off_t log_pos, - bool first_in_group, bool last_in_group); -}; - -#ifdef HAVE_REPLICATION -class Binlog_transmit_delegate - :public Delegate { -public: - typedef Binlog_transmit_observer Observer; - int transmit_start(THD *thd, ushort flags, - const char *log_file, my_off_t log_pos); - int transmit_stop(THD *thd, ushort flags); - int reserve_header(THD *thd, ushort flags, String *packet); - int before_send_event(THD *thd, ushort flags, - String *packet, const - char *log_file, my_off_t log_pos ); - int after_send_event(THD *thd, ushort flags, - String *packet); - int after_reset_master(THD *thd, ushort flags); -}; - -class Binlog_relay_IO_delegate - :public Delegate { -public: - typedef Binlog_relay_IO_observer Observer; - int thread_start(THD *thd, Master_info *mi); - int thread_stop(THD *thd, Master_info *mi); - int before_request_transmit(THD *thd, Master_info *mi, ushort flags); - int after_read_event(THD *thd, Master_info *mi, - const char *packet, ulong len, - const char **event_buf, ulong *event_len); - int after_queue_event(THD *thd, Master_info *mi, - const char *event_buf, ulong event_len, - bool synced); - int after_reset_slave(THD *thd, Master_info *mi); -private: - void init_param(Binlog_relay_IO_param *param, Master_info *mi); -}; -#endif /* HAVE_REPLICATION */ - -int delegates_init(); -void delegates_destroy(); - -extern Trans_delegate *transaction_delegate; -extern Binlog_storage_delegate *binlog_storage_delegate; -#ifdef HAVE_REPLICATION -extern Binlog_transmit_delegate *binlog_transmit_delegate; -extern Binlog_relay_IO_delegate *binlog_relay_io_delegate; -#endif /* HAVE_REPLICATION */ - -/* - if semisync replication is not enabled, we can return immediately. -*/ -#ifdef HAVE_REPLICATION -/* - As semisync is unpluggined and its hooks are turned into static - invocations all other hooks are not run for optimization sake. -*/ -#define RUN_HOOK(group, hook, args) \ - (unlikely(run_hooks_enabled) ? group ##_delegate->hook args : 0) -#else -#define RUN_HOOK(group, hook, args) 0 -#endif /* HAVE_REPLICATION */ - -#endif /* RPL_HANDLER_H */ diff --git a/sql/semisync_master.cc b/sql/semisync_master.cc index de91e30beec..8adbce179e1 100644 --- a/sql/semisync_master.cc +++ b/sql/semisync_master.cc @@ -738,19 +738,19 @@ int ReplSemiSyncMaster::reportBinlogUpdate(THD* thd, const char *log_file, return 0; } -void ReplSemiSyncMaster::dump_start(THD* thd, +int ReplSemiSyncMaster::dump_start(THD* thd, const char *log_file, my_off_t log_pos) { if (!thd->semi_sync_slave) - return; + return 0; if (ack_receiver.add_slave(thd)) { sql_print_error("Failed to register slave to semi-sync ACK receiver " "thread. Turning off semisync"); thd->semi_sync_slave= 0; - return; + return 1; } add_slave(); @@ -758,7 +758,7 @@ void ReplSemiSyncMaster::dump_start(THD* thd, sql_print_information("Start semi-sync binlog_dump to slave (server_id: %d), pos(%s, %lu", thd->variables.server_id, log_file, (unsigned long)log_pos); - return; + return 0; } void ReplSemiSyncMaster::dump_end(THD* thd) diff --git a/sql/semisync_master.h b/sql/semisync_master.h index 66fac17cd45..737ad46dd27 100644 --- a/sql/semisync_master.h +++ b/sql/semisync_master.h @@ -553,7 +553,7 @@ class ReplSemiSyncMaster * be acked by slave*/ int reportBinlogUpdate(THD *thd, const char *log_file,my_off_t log_pos); - void dump_start(THD* thd, + int dump_start(THD* thd, const char *log_file, my_off_t log_pos); diff --git a/sql/semisync_slave.h b/sql/semisync_slave.h index 0df4445ee4a..d67e1a05070 100644 --- a/sql/semisync_slave.h +++ b/sql/semisync_slave.h @@ -24,6 +24,8 @@ #include "rpl_mi.h" #include "mysql.h" +class Master_info; + /** The extension class for the slave of semi-synchronous replication */ diff --git a/sql/slave.cc b/sql/slave.cc index b4a817d6ecd..acca99481e8 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -43,7 +43,6 @@ #include #include #include -#include "rpl_handler.h" #include #include #include @@ -3587,10 +3586,6 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi, if (opt_log_slave_updates && opt_replicate_annotate_row_events) binlog_flags|= BINLOG_SEND_ANNOTATE_ROWS_EVENT; - if (RUN_HOOK(binlog_relay_io, - before_request_transmit, - (thd, mi, binlog_flags))) - DBUG_RETURN(1); if (repl_semisync_slave.requestTransmit(mi)) DBUG_RETURN(1); @@ -4618,9 +4613,8 @@ pthread_handler_t handle_slave_io(void *arg) } - if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)) || - (DBUG_EVALUATE_IF("failed_slave_start", 1, 0) - || repl_semisync_slave.slaveStart(mi))) + if (DBUG_EVALUATE_IF("failed_slave_start", 1, 0) + || repl_semisync_slave.slaveStart(mi)) { mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, ER_THD(thd, ER_SLAVE_FATAL_ERROR), @@ -4811,10 +4805,7 @@ Stopping slave I/O thread due to out-of-memory error from master"); THD_STAGE_INFO(thd, stage_queueing_master_event_to_the_relay_log); event_buf= (const char*)mysql->net.read_pos + 1; mi->semi_ack= 0; - if (RUN_HOOK(binlog_relay_io, after_read_event, - (thd, mi,(const char*)mysql->net.read_pos + 1, - event_len, &event_buf, &event_len)) || - repl_semisync_slave. + if (repl_semisync_slave. slaveReadSyncHeader((const char*)mysql->net.read_pos + 1, event_len, &(mi->semi_ack), &event_buf, &event_len)) { @@ -4865,9 +4856,6 @@ Stopping slave I/O thread due to out-of-memory error from master"); tokenamount -= network_read_len; } - /* XXX: 'synced' should be updated by queue_event to indicate - whether event has been synced to disk */ - bool synced= 0; if (queue_event(mi, event_buf, event_len)) { mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL, @@ -4876,11 +4864,8 @@ Stopping slave I/O thread due to out-of-memory error from master"); goto err; } - if (RUN_HOOK(binlog_relay_io, after_queue_event, - (thd, mi, event_buf, event_len, synced)) || - (rpl_semi_sync_slave_status && - (mi->semi_ack & SEMI_SYNC_NEED_ACK) && - repl_semisync_slave.slaveReply(mi))) + if (rpl_semi_sync_slave_status && (mi->semi_ack & SEMI_SYNC_NEED_ACK) && + repl_semisync_slave.slaveReply(mi)) { mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, ER_THD(thd, ER_SLAVE_FATAL_ERROR), @@ -4952,7 +4937,6 @@ err: IO_RPL_LOG_NAME, mi->master_log_pos, tmp.c_ptr_safe()); } - (void) RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi)); repl_semisync_slave.slaveStop(mi); thd->reset_query(); thd->reset_db(NULL, 0); diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 9ce6c6b8787..60ab84069b0 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -82,7 +82,6 @@ #include #include #include -#include "rpl_handler.h" #include "rpl_mi.h" #include "sql_digest.h" diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 36fa6120584..c9e804135a5 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -28,7 +28,6 @@ #include "log_event.h" #include "rpl_filter.h" #include -#include "rpl_handler.h" #include "debug_sync.h" #include "semisync_master.h" #include "semisync_slave.h" @@ -315,24 +314,35 @@ static int reset_transmit_packet(binlog_send_info *info, ushort flags, packet->length(0); packet->set("\0", 1, &my_charset_bin); - if (RUN_HOOK(binlog_transmit, reserve_header, (info->thd, flags, packet))) - { - /* RUN_HOOK() must return zero when thd->semi_sync_slave */ - DBUG_ASSERT(!info->thd->semi_sync_slave); - - info->error= ER_UNKNOWN_ERROR; - *errmsg= "Failed to run hook 'reserve_header'"; - ret= 1; - } if (info->thd->semi_sync_slave) { - repl_semisync_master.reserveSyncHeader(packet); + if (repl_semisync_master.reserveSyncHeader(packet)) + { + info->error= ER_UNKNOWN_ERROR; + *errmsg= "Failed to run hook 'reserve_header'"; + ret= 1; + } } *ev_offset= packet->length(); return ret; } +int get_user_var_int(const char *name, + long long int *value, int *null_value) +{ + bool null_val; + user_var_entry *entry= + (user_var_entry*) my_hash_search(¤t_thd->user_vars, + (uchar*) name, strlen(name)); + if (!entry) + return 1; + *value= entry->val_int(&null_val); + if (null_value) + *null_value= null_val; + return 0; +} + inline bool is_semi_sync_slave() { int null_value; @@ -1935,9 +1945,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, THD_STAGE_INFO(info->thd, stage_sending_binlog_event_to_slave); pos= my_b_tell(log); - if (RUN_HOOK(binlog_transmit, before_send_event, - (info->thd, info->flags, packet, info->log_file_name, pos)) || - repl_semisync_master.updateSyncHeader(info->thd, (uchar *)packet->c_ptr(), + if (repl_semisync_master.updateSyncHeader(info->thd, (uchar *)packet->c_ptr(), info->log_file_name + info->dirlen, pos, &need_sync)) { @@ -1961,14 +1969,11 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, } } - if (RUN_HOOK(binlog_transmit, after_send_event, - (info->thd, info->flags, packet))) + if (need_sync && repl_semisync_master.flushNet(info->thd, packet->c_ptr())) { info->error= ER_UNKNOWN_ERROR; return "Failed to run hook 'after_send_event'"; } - if (need_sync) - repl_semisync_master.flushNet(info->thd, packet->c_ptr()); return NULL; /* Success */ } @@ -2740,21 +2745,16 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, if (init_binlog_sender(info, &linfo, log_ident, &pos)) goto err; - /* - run hook first when all check has been made that slave seems to - be requesting a reasonable position. i.e when transmit actually starts - */ - if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos))) + has_transmit_started= true; + + /* Check if the dump thread is created by a slave with semisync enabled. */ + thd->semi_sync_slave = is_semi_sync_slave(); + if (repl_semisync_master.dump_start(thd, log_ident, pos)) { info->errmsg= "Failed to run hook 'transmit_start'"; info->error= ER_UNKNOWN_ERROR; goto err; } - has_transmit_started= true; - - /* Check if the dump thread is created by a slave with semisync enabled. */ - thd->semi_sync_slave = is_semi_sync_slave(); - repl_semisync_master.dump_start(thd, log_ident, pos); /* heartbeat_period from @master_heartbeat_period user variable @@ -2871,7 +2871,6 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, err: THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination); - (void) RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags)); if (has_transmit_started) { repl_semisync_master.dump_end(thd); @@ -3341,7 +3340,6 @@ int reset_slave(THD *thd, Master_info* mi) else if (global_system_variables.log_warnings > 1) sql_print_information("Deleted Master_info file '%s'.", fname); - (void) RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi)); if (rpl_semi_sync_slave_enabled) repl_semisync_slave.resetSlave(mi); err: @@ -3850,7 +3848,6 @@ int reset_master(THD* thd, rpl_gtid *init_state, uint32 init_state_len, repl_semisync_master.beforeResetMaster(); ret= mysql_bin_log.reset_logs(thd, 1, init_state, init_state_len, next_log_number); - (void) RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */)); repl_semisync_master.afterResetMaster(); return ret; } diff --git a/sql/transaction.cc b/sql/transaction.cc index f6ccf5a1930..349f4549d31 100644 --- a/sql/transaction.cc +++ b/sql/transaction.cc @@ -21,7 +21,6 @@ #include "mariadb.h" #include "sql_priv.h" #include "transaction.h" -#include "rpl_handler.h" #include "debug_sync.h" // DEBUG_SYNC #include "sql_acl.h" #include "semisync_master.h" @@ -319,14 +318,12 @@ bool trans_commit(THD *thd) */ if (res) { - (void) RUN_HOOK(transaction, after_rollback, (thd, FALSE)); #ifdef HAVE_REPLICATION repl_semisync_master.waitAfterRollback(thd, FALSE); #endif } else { - (void) RUN_HOOK(transaction, after_commit, (thd, FALSE)); #ifdef HAVE_REPLICATION repl_semisync_master.waitAfterCommit(thd, FALSE); #endif @@ -423,7 +420,6 @@ bool trans_rollback(THD *thd) ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY); DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS")); res= ha_rollback_trans(thd, TRUE); - (void) RUN_HOOK(transaction, after_rollback, (thd, FALSE)); #ifdef HAVE_REPLICATION repl_semisync_master.waitAfterRollback(thd, FALSE); #endif @@ -540,14 +536,12 @@ bool trans_commit_stmt(THD *thd) */ if (res) { - (void) RUN_HOOK(transaction, after_rollback, (thd, FALSE)); #ifdef HAVE_REPLICATION repl_semisync_master.waitAfterRollback(thd, FALSE); #endif } else { - (void) RUN_HOOK(transaction, after_commit, (thd, FALSE)); #ifdef HAVE_REPLICATION repl_semisync_master.waitAfterCommit(thd, FALSE); #endif @@ -590,7 +584,6 @@ bool trans_rollback_stmt(THD *thd) trans_reset_one_shot_chistics(thd); } - (void) RUN_HOOK(transaction, after_rollback, (thd, FALSE)); #ifdef HAVE_REPLICATION repl_semisync_master.waitAfterRollback(thd, FALSE); #endif -- cgit v1.2.1