diff options
author | He Zhenxing <zhenxing.he@sun.com> | 2009-09-26 12:49:49 +0800 |
---|---|---|
committer | He Zhenxing <zhenxing.he@sun.com> | 2009-09-26 12:49:49 +0800 |
commit | 623ed58cfda0aef6b6bf545a4200357a58a8a4cc (patch) | |
tree | 28e6a4c77de3c3073b4dbe0b0e09e019adeaa556 | |
parent | e465d113832aeac61a36902c7976d455e1525234 (diff) | |
download | mariadb-git-623ed58cfda0aef6b6bf545a4200357a58a8a4cc.tar.gz |
Backporting WL#4398 WL#1720
Backporting BUG#44058 BUG#42244 BUG#45672 BUG#45673
Backporting BUG#45819 BUG#45973 BUG#39012
34 files changed, 4240 insertions, 85 deletions
diff --git a/.bzr-mysql/default.conf b/.bzr-mysql/default.conf index f044f8e62da..44969de4744 100644 --- a/.bzr-mysql/default.conf +++ b/.bzr-mysql/default.conf @@ -1,4 +1,4 @@ [MYSQL] post_commit_to = "commits@lists.mysql.com" post_push_to = "commits@lists.mysql.com" -tree_name = "mysql-5.1" +tree_name = "mysql-5.1-rep-semisync" diff --git a/include/mysql/plugin.h b/include/mysql/plugin.h index 2e59262d061..45d0234cb67 100644 --- a/include/mysql/plugin.h +++ b/include/mysql/plugin.h @@ -16,6 +16,11 @@ #ifndef _my_plugin_h #define _my_plugin_h +/* size_t */ +#include <stdlib.h> + +typedef struct st_mysql MYSQL; + /* On Windows, exports from DLL need to be declared @@ -75,7 +80,8 @@ typedef struct st_mysql_xid MYSQL_XID; #define MYSQL_FTPARSER_PLUGIN 2 /* Full-text parser plugin */ #define MYSQL_DAEMON_PLUGIN 3 /* The daemon/raw plugin type */ #define MYSQL_INFORMATION_SCHEMA_PLUGIN 4 /* The I_S plugin type */ -#define MYSQL_MAX_PLUGIN_TYPE_NUM 5 /* The number of plugin types */ +#define MYSQL_REPLICATION_PLUGIN 5 /* The replication plugin type */ +#define MYSQL_MAX_PLUGIN_TYPE_NUM 6 /* The number of plugin types */ /* We use the following strings to define licenses for plugins */ #define PLUGIN_LICENSE_PROPRIETARY 0 @@ -650,6 +656,17 @@ struct st_mysql_information_schema int interface_version; }; +/* + API for Replication plugin. (MYSQL_REPLICATION_PLUGIN) +*/ + #define MYSQL_REPLICATION_INTERFACE_VERSION 0x0100 + + /** + Replication plugin descriptor + */ + struct Mysql_replication { + int interface_version; + }; /* st_mysql_value struct for reading values from mysqld. @@ -801,6 +818,64 @@ void mysql_query_cache_invalidate4(MYSQL_THD thd, const char *key, unsigned int key_length, int using_trx); +/** + Get the value of user variable as an integer. + + This function will return the value of variable @a name as an + integer. If the original value of the variable is not an integer, + the value will be converted into an integer. + + @param name user variable name + @param value pointer to return the value + @param null_value if not NULL, the function will set it to true if + the value of variable is null, set to false if not + + @retval 0 Success + @retval 1 Variable not found +*/ +int get_user_var_int(const char *name, + long long int *value, int *null_value); + +/** + Get the value of user variable as a double precision float number. + + This function will return the value of variable @a name as real + number. If the original value of the variable is not a real number, + the value will be converted into a real number. + + @param name user variable name + @param value pointer to return the value + @param null_value if not NULL, the function will set it to true if + the value of variable is null, set to false if not + + @retval 0 Success + @retval 1 Variable not found +*/ +int get_user_var_real(const char *name, + double *value, int *null_value); + +/** + Get the value of user variable as a string. + + This function will return the value of variable @a name as + string. If the original value of the variable is not a string, + the value will be converted into a string. + + @param name user variable name + @param value pointer to the value buffer + @param len length of the value buffer + @param precision precision of the value if it is a float number + @param null_value if not NULL, the function will set it to true if + the value of variable is null, set to false if not + + @retval 0 Success + @retval 1 Variable not found +*/ +int get_user_var_str(const char *name, + char *value, unsigned long len, + unsigned int precision, int *null_value); + + #ifdef __cplusplus } #endif diff --git a/include/mysql/plugin.h.pp b/include/mysql/plugin.h.pp index 50511f515ab..d864140333b 100644 --- a/include/mysql/plugin.h.pp +++ b/include/mysql/plugin.h.pp @@ -1,3 +1,5 @@ +#include <stdlib.h> +typedef struct st_mysql MYSQL; struct st_mysql_lex_string { char *str; @@ -105,6 +107,9 @@ struct st_mysql_information_schema { int interface_version; }; +struct Mysql_replication { + int interface_version; +}; struct st_mysql_value { int (*value_type)(struct st_mysql_value *); @@ -137,3 +142,10 @@ void thd_get_xid(const void* thd, MYSQL_XID *xid); void mysql_query_cache_invalidate4(void* thd, const char *key, unsigned int key_length, int using_trx); +int get_user_var_int(const char *name, + long long int *value, int *null_value); +int get_user_var_real(const char *name, + double *value, int *null_value); +int get_user_var_str(const char *name, + char *value, unsigned long len, + unsigned int precision, int *null_value); diff --git a/libmysqld/CMakeLists.txt b/libmysqld/CMakeLists.txt index 8500d73863a..f7be98889ef 100644 --- a/libmysqld/CMakeLists.txt +++ b/libmysqld/CMakeLists.txt @@ -139,6 +139,7 @@ SET(LIBMYSQLD_SOURCES emb_qcache.cc libmysqld.c lib_sql.cc ../sql/time.cc ../sql/tztime.cc ../sql/uniques.cc ../sql/unireg.cc ../sql/partition_info.cc ../sql/sql_connect.cc ../sql/scheduler.cc ../sql/event_parse_data.cc + ../sql/rpl_handler.cc ${GEN_SOURCES} ${LIB_SOURCES}) diff --git a/libmysqld/Makefile.am b/libmysqld/Makefile.am index a9bd8d9e17c..244400e1b8d 100644 --- a/libmysqld/Makefile.am +++ b/libmysqld/Makefile.am @@ -76,7 +76,8 @@ sqlsources = derror.cc field.cc field_conv.cc strfunc.cc filesort.cc \ rpl_filter.cc sql_partition.cc sql_builtin.cc sql_plugin.cc \ sql_tablespace.cc \ rpl_injector.cc my_user.c partition_info.cc \ - sql_servers.cc event_parse_data.cc + sql_servers.cc event_parse_data.cc \ + rpl_handler.cc libmysqld_int_a_SOURCES= $(libmysqld_sources) nodist_libmysqld_int_a_SOURCES= $(libmysqlsources) $(sqlsources) diff --git a/mysql-test/mysql-test-run.pl b/mysql-test/mysql-test-run.pl index 114b6c84aa3..434896df6a5 100755 --- a/mysql-test/mysql-test-run.pl +++ b/mysql-test/mysql-test-run.pl @@ -1815,6 +1815,26 @@ sub environment_setup { $ENV{'EXAMPLE_PLUGIN_LOAD'}="--plugin_load=;EXAMPLE=".$plugin_filename.";"; } + # -------------------------------------------------------------------------- + # Add the path where mysqld will find semisync plugins + # -------------------------------------------------------------------------- + my $lib_semisync_master_plugin= + mtr_file_exists("$basedir/plugin/semisync/.libs/libsemisync_master.so"); + my $lib_semisync_slave_plugin= + mtr_file_exists("$basedir/plugin/semisync/.libs/libsemisync_slave.so"); + if ($lib_semisync_master_plugin && $lib_semisync_slave_plugin) + { + $ENV{'SEMISYNC_MASTER_PLUGIN'}= basename($lib_semisync_master_plugin); + $ENV{'SEMISYNC_SLAVE_PLUGIN'}= basename($lib_semisync_slave_plugin); + $ENV{'SEMISYNC_PLUGIN_OPT'}= "--plugin-dir=".dirname($lib_semisync_master_plugin); + } + else + { + $ENV{'SEMISYNC_MASTER_PLUGIN'}= ""; + $ENV{'SEMISYNC_SLAVE_PLUGIN'}= ""; + $ENV{'SEMISYNC_PLUGIN_OPT'}=""; + } + # ---------------------------------------------------- # Add the path where mysqld will find mypluglib.so # ---------------------------------------------------- diff --git a/mysql-test/suite/rpl/t/rpl000017.test b/mysql-test/suite/rpl/t/rpl000017.test index 2ba321cd8c3..d6b3e46fa31 100644 --- a/mysql-test/suite/rpl/t/rpl000017.test +++ b/mysql-test/suite/rpl/t/rpl000017.test @@ -6,6 +6,7 @@ grant replication slave on *.* to replicate@localhost identified by 'aaaaaaaaaaa grant replication slave on *.* to replicate@127.0.0.1 identified by 'aaaaaaaaaaaaaaab'; connection slave; start slave; +source include/wait_for_slave_to_start.inc; connection master; --disable_warnings drop table if exists t1; diff --git a/plugin/semisync/Makefile.am b/plugin/semisync/Makefile.am new file mode 100644 index 00000000000..dd9a630670c --- /dev/null +++ b/plugin/semisync/Makefile.am @@ -0,0 +1,35 @@ +# Copyright (C) 2006 MySQL AB +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +## Makefile.am for semi-synchronous replication + +pkgplugindir = $(pkglibdir)/plugin +INCLUDES = -I$(top_srcdir)/include \ + -I$(top_srcdir)/sql \ + -I$(srcdir) + +noinst_HEADERS = semisync.h semisync_master.h semisync_slave.h + +pkgplugin_LTLIBRARIES = libsemisync_master.la libsemisync_slave.la + +libsemisync_master_la_LDFLAGS = -module +libsemisync_master_la_CXXFLAGS= $(AM_CFLAGS) -DMYSQL_DYNAMIC_PLUGIN +libsemisync_master_la_CFLAGS = $(AM_CFLAGS) -DMYSQL_DYNAMIC_PLUGIN +libsemisync_master_la_SOURCES = semisync.cc semisync_master.cc semisync_master_plugin.cc + +libsemisync_slave_la_LDFLAGS = -module +libsemisync_slave_la_CXXFLAGS= $(AM_CFLAGS) -DMYSQL_DYNAMIC_PLUGIN +libsemisync_slave_la_CFLAGS = $(AM_CFLAGS) -DMYSQL_DYNAMIC_PLUGIN +libsemisync_slave_la_SOURCES = semisync.cc semisync_slave.cc semisync_slave_plugin.cc diff --git a/plugin/semisync/configure.in b/plugin/semisync/configure.in new file mode 100644 index 00000000000..894251258db --- /dev/null +++ b/plugin/semisync/configure.in @@ -0,0 +1,9 @@ +# configure.in for semi-synchronous replication + +AC_INIT(mysql-semi-sync-plugin, 0.2) +AM_INIT_AUTOMAKE +AC_DISABLE_STATIC +AC_PROG_LIBTOOL +AC_CONFIG_FILES([Makefile]) +AC_OUTPUT + diff --git a/plugin/semisync/plug.in b/plugin/semisync/plug.in new file mode 100644 index 00000000000..917c8950f02 --- /dev/null +++ b/plugin/semisync/plug.in @@ -0,0 +1,3 @@ +MYSQL_PLUGIN(semisync,[Semi-synchronous Replication Plugin], + [Semi-synchronous replication plugin.]) +MYSQL_PLUGIN_DYNAMIC(semisync, [libsemisync_master.la libsemisync_slave.la]) diff --git a/plugin/semisync/semisync.cc b/plugin/semisync/semisync.cc new file mode 100644 index 00000000000..83c7791c14b --- /dev/null +++ b/plugin/semisync/semisync.cc @@ -0,0 +1,30 @@ +/* Copyright (C) 2007 Google Inc. + Copyright (C) 2008 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + + +#include "semisync.h" + +const unsigned char ReplSemiSyncBase::kPacketMagicNum = 0xef; +const unsigned char ReplSemiSyncBase::kPacketFlagSync = 0x01; + + +const unsigned long Trace::kTraceGeneral = 0x0001; +const unsigned long Trace::kTraceDetail = 0x0010; +const unsigned long Trace::kTraceNetWait = 0x0020; +const unsigned long Trace::kTraceFunction = 0x0040; + +const char ReplSemiSyncBase::kSyncHeader[2] = + {ReplSemiSyncBase::kPacketMagicNum, 0}; diff --git a/plugin/semisync/semisync.h b/plugin/semisync/semisync.h new file mode 100644 index 00000000000..c9d35a093f6 --- /dev/null +++ b/plugin/semisync/semisync.h @@ -0,0 +1,95 @@ +/* Copyright (C) 2007 Google Inc. + Copyright (C) 2008 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + + +#ifndef SEMISYNC_H +#define SEMISYNC_H + +#include <stdint.h> +#include <string.h> +#include <assert.h> +#include <sys/time.h> +#include <time.h> +#include <stdio.h> +#include <pthread.h> +#include <mysql.h> + +typedef uint32_t uint32; +typedef unsigned long long my_off_t; +#define FN_REFLEN 512 /* Max length of full path-name */ +void sql_print_error(const char *format, ...); +void sql_print_warning(const char *format, ...); +void sql_print_information(const char *format, ...); +extern unsigned long max_connections; + +#define MYSQL_SERVER +#define HAVE_REPLICATION +#include <my_global.h> +#include <my_pthread.h> +#include <mysql/plugin.h> +#include <replication.h> + +typedef struct st_mysql_show_var SHOW_VAR; +typedef struct st_mysql_sys_var SYS_VAR; + + +/** + This class is used to trace function calls and other process + information +*/ +class Trace { +public: + static const unsigned long kTraceFunction; + static const unsigned long kTraceGeneral; + static const unsigned long kTraceDetail; + static const unsigned long kTraceNetWait; + + unsigned long trace_level_; /* the level for tracing */ + + inline void function_enter(const char *func_name) + { + if (trace_level_ & kTraceFunction) + sql_print_information("---> %s enter", func_name); + } + inline int function_exit(const char *func_name, int exit_code) + { + if (trace_level_ & kTraceFunction) + sql_print_information("<--- %s exit (%d)", func_name, exit_code); + return exit_code; + } + + Trace() + :trace_level_(0L) + {} + Trace(unsigned long trace_level) + :trace_level_(trace_level) + {} +}; + +/** + Base class for semi-sync master and slave classes +*/ +class ReplSemiSyncBase + :public Trace { +public: + static const char kSyncHeader[2]; /* three byte packet header */ + + /* Constants in network packet header. */ + static const unsigned char kPacketMagicNum; + static const unsigned char kPacketFlagSync; +}; + +#endif /* SEMISYNC_H */ diff --git a/plugin/semisync/semisync_master.cc b/plugin/semisync/semisync_master.cc new file mode 100644 index 00000000000..b3454c49829 --- /dev/null +++ b/plugin/semisync/semisync_master.cc @@ -0,0 +1,1199 @@ +/* Copyright (C) 2007 Google Inc. + Copyright (C) 2008 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + + +#include "semisync_master.h" + +#define TIME_THOUSAND 1000 +#define TIME_MILLION 1000000 +#define TIME_BILLION 1000000000 + +/* This indicates whether semi-synchronous replication is enabled. */ +char rpl_semi_sync_master_enabled; +unsigned long rpl_semi_sync_master_timeout; +unsigned long rpl_semi_sync_master_trace_level; +unsigned long rpl_semi_sync_master_status = 0; +unsigned long rpl_semi_sync_master_yes_transactions = 0; +unsigned long rpl_semi_sync_master_no_transactions = 0; +unsigned long rpl_semi_sync_master_off_times = 0; +unsigned long rpl_semi_sync_master_timefunc_fails = 0; +unsigned long rpl_semi_sync_master_num_timeouts = 0; +unsigned long rpl_semi_sync_master_wait_sessions = 0; +unsigned long rpl_semi_sync_master_back_wait_pos = 0; +unsigned long rpl_semi_sync_master_trx_wait_time = 0; +unsigned long long rpl_semi_sync_master_trx_wait_num = 0; +unsigned long rpl_semi_sync_master_net_wait_time = 0; +unsigned long long rpl_semi_sync_master_net_wait_num = 0; +unsigned long rpl_semi_sync_master_clients = 0; +unsigned long long rpl_semi_sync_master_net_wait_total_time = 0; +unsigned long long rpl_semi_sync_master_trx_wait_total_time = 0; + + +static int getWaitTime(const struct timeval& start_tv); + +/******************************************************************************* + * + * <ActiveTranx> class : manage all active transaction nodes + * + ******************************************************************************/ + +ActiveTranx::ActiveTranx(int max_connections, + pthread_mutex_t *lock, + unsigned long trace_level) + : Trace(trace_level), num_transactions_(max_connections), + num_entries_(max_connections << 1), + lock_(lock) +{ + /* Allocate the memory for the array */ + node_array_ = new TranxNode[num_transactions_]; + for (int idx = 0; idx < num_transactions_; ++idx) + { + node_array_[idx].log_pos_ = 0; + node_array_[idx].hash_next_ = NULL; + node_array_[idx].next_ = node_array_ + idx + 1; + + node_array_[idx].log_name_ = new char[FN_REFLEN]; + node_array_[idx].log_name_[0] = '\x0'; + } + node_array_[num_transactions_-1].next_ = NULL; + + /* All nodes in the array go to the pool initially. */ + free_pool_ = node_array_; + + /* No transactions are in the list initially. */ + trx_front_ = NULL; + trx_rear_ = NULL; + + /* Create the hash table to find a transaction's ending event. */ + trx_htb_ = new TranxNode *[num_entries_]; + for (int idx = 0; idx < num_entries_; ++idx) + trx_htb_[idx] = NULL; + + sql_print_information("Semi-sync replication initialized for %d " + "transactions.", num_transactions_); +} + +ActiveTranx::~ActiveTranx() +{ + for (int idx = 0; idx < num_transactions_; ++idx) + { + delete [] node_array_[idx].log_name_; + node_array_[idx].log_name_ = NULL; + } + + delete [] node_array_; + delete [] trx_htb_; + + node_array_ = NULL; + trx_htb_ = NULL; + num_transactions_ = 0; + num_entries_ = 0; +} + +unsigned int ActiveTranx::calc_hash(const unsigned char *key, + unsigned int length) +{ + unsigned int nr = 1, nr2 = 4; + + /* The hash implementation comes from calc_hashnr() in mysys/hash.c. */ + while (length--) + { + nr ^= (((nr & 63)+nr2)*((unsigned int) (unsigned char) *key++))+ (nr << 8); + nr2 += 3; + } + return((unsigned int) nr); +} + +unsigned int ActiveTranx::get_hash_value(const char *log_file_name, + my_off_t log_file_pos) +{ + unsigned int hash1 = calc_hash((const unsigned char *)log_file_name, + strlen(log_file_name)); + unsigned int hash2 = calc_hash((const unsigned char *)(&log_file_pos), + sizeof(log_file_pos)); + + return (hash1 + hash2) % num_entries_; +} + +ActiveTranx::TranxNode* ActiveTranx::alloc_tranx_node() +{ + TranxNode *ptr = free_pool_; + + if (free_pool_) + { + free_pool_ = free_pool_->next_; + ptr->next_ = NULL; + ptr->hash_next_ = NULL; + } + else + { + /* + free_pool should never be NULL here, because we have + max_connections number of pre-allocated nodes. + */ + sql_print_error("You have encountered a semi-sync bug (free_pool == NULL), " + "please report to http://bugs.mysql.com"); + assert(free_pool_); + } + + return ptr; +} + +int ActiveTranx::compare(const char *log_file_name1, my_off_t log_file_pos1, + const char *log_file_name2, my_off_t log_file_pos2) +{ + int cmp = strcmp(log_file_name1, log_file_name2); + + if (cmp != 0) + return cmp; + + if (log_file_pos1 > log_file_pos2) + return 1; + else if (log_file_pos1 < log_file_pos2) + return -1; + return 0; +} + +int ActiveTranx::insert_tranx_node(const char *log_file_name, + my_off_t log_file_pos) +{ + const char *kWho = "ActiveTranx:insert_tranx_node"; + TranxNode *ins_node; + int result = 0; + unsigned int hash_val; + + function_enter(kWho); + + ins_node = alloc_tranx_node(); + if (!ins_node) + { + sql_print_error("%s: transaction node allocation failed for: (%s, %lu)", + kWho, log_file_name, (unsigned long)log_file_pos); + result = -1; + goto l_end; + } + + /* insert the binlog position in the active transaction list. */ + strcpy(ins_node->log_name_, log_file_name); + ins_node->log_pos_ = log_file_pos; + + if (!trx_front_) + { + /* The list is empty. */ + trx_front_ = trx_rear_ = ins_node; + } + else + { + int cmp = compare(ins_node, trx_rear_); + if (cmp > 0) + { + /* Compare with the tail first. If the transaction happens later in + * binlog, then make it the new tail. + */ + trx_rear_->next_ = ins_node; + trx_rear_ = ins_node; + } + else + { + /* Otherwise, it is an error because the transaction should hold the + * mysql_bin_log.LOCK_log when appending events. + */ + sql_print_error("%s: binlog write out-of-order, tail (%s, %lu), " + "new node (%s, %lu)", kWho, + trx_rear_->log_name_, (unsigned long)trx_rear_->log_pos_, + ins_node->log_name_, (unsigned long)ins_node->log_pos_); + result = -1; + goto l_end; + } + } + + hash_val = get_hash_value(ins_node->log_name_, ins_node->log_pos_); + ins_node->hash_next_ = trx_htb_[hash_val]; + trx_htb_[hash_val] = ins_node; + + if (trace_level_ & kTraceDetail) + sql_print_information("%s: insert (%s, %lu) in entry(%u)", kWho, + ins_node->log_name_, (unsigned long)ins_node->log_pos_, + hash_val); + + l_end: + return function_exit(kWho, result); +} + +bool ActiveTranx::is_tranx_end_pos(const char *log_file_name, + my_off_t log_file_pos) +{ + const char *kWho = "ActiveTranx::is_tranx_end_pos"; + function_enter(kWho); + + unsigned int hash_val = get_hash_value(log_file_name, log_file_pos); + TranxNode *entry = trx_htb_[hash_val]; + + while (entry != NULL) + { + if (compare(entry, log_file_name, log_file_pos) == 0) + break; + + entry = entry->hash_next_; + } + + if (trace_level_ & kTraceDetail) + sql_print_information("%s: probe (%s, %lu) in entry(%u)", kWho, + log_file_name, (unsigned long)log_file_pos, hash_val); + + function_exit(kWho, (entry != NULL)); + return (entry != NULL); +} + +int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name, + my_off_t log_file_pos) +{ + const char *kWho = "ActiveTranx::::clear_active_tranx_nodes"; + TranxNode *new_front; + + function_enter(kWho); + + if (log_file_name != NULL) + { + new_front = trx_front_; + + while (new_front) + { + if (compare(new_front, log_file_name, log_file_pos) > 0) + break; + new_front = new_front->next_; + } + } + else + { + /* If log_file_name is NULL, clear everything. */ + new_front = NULL; + } + + if (new_front == NULL) + { + /* No active transaction nodes after the call. */ + + /* Clear the hash table. */ + memset(trx_htb_, 0, num_entries_ * sizeof(TranxNode *)); + + /* Clear the active transaction list. */ + if (trx_front_ != NULL) + { + trx_rear_->next_ = free_pool_; + free_pool_ = trx_front_; + trx_front_ = NULL; + trx_rear_ = NULL; + } + + if (trace_level_ & kTraceDetail) + sql_print_information("%s: free all nodes back to free list", kWho); + } + else if (new_front != trx_front_) + { + TranxNode *curr_node, *next_node; + + /* Delete all transaction nodes before the confirmation point. */ + int n_frees = 0; + curr_node = trx_front_; + while (curr_node != new_front) + { + next_node = curr_node->next_; + + /* Put the node in the memory pool. */ + curr_node->next_ = free_pool_; + free_pool_ = curr_node; + n_frees++; + + /* Remove the node from the hash table. */ + unsigned int hash_val = get_hash_value(curr_node->log_name_, curr_node->log_pos_); + TranxNode **hash_ptr = &(trx_htb_[hash_val]); + while ((*hash_ptr) != NULL) + { + if ((*hash_ptr) == curr_node) + { + (*hash_ptr) = curr_node->hash_next_; + break; + } + hash_ptr = &((*hash_ptr)->hash_next_); + } + + curr_node = next_node; + } + + trx_front_ = new_front; + + if (trace_level_ & kTraceDetail) + sql_print_information("%s: free %d nodes back until pos (%s, %lu)", + kWho, n_frees, + trx_front_->log_name_, (unsigned long)trx_front_->log_pos_); + } + + return function_exit(kWho, 0); +} + + +/******************************************************************************* + * + * <ReplSemiSyncMaster> class: the basic code layer for sync-replication master. + * <ReplSemiSyncSlave> class: the basic code layer for sync-replication slave. + * + * The most important functions during semi-syn replication listed: + * + * Master: + * . reportReplyBinlog(): called by the binlog dump thread when it receives + * the slave's status information. + * . updateSyncHeader(): based on transaction waiting information, decide + * whether to request the slave to reply. + * . writeTraxInBinlog(): called by the transaction thread when it finishes + * writing all transaction events in binlog. + * . commitTrx(): transaction thread wait for the slave reply. + * + * Slave: + * . slaveReadSyncHeader(): read the semi-sync header from the master, get the + * sync status and get the payload for events. + * . slaveReply(): reply to the master about the replication progress. + * + ******************************************************************************/ + +ReplSemiSyncMaster::ReplSemiSyncMaster() + : active_tranxs_(NULL), + init_done_(false), + reply_file_name_inited_(false), + reply_file_pos_(0L), + wait_file_name_inited_(false), + wait_file_pos_(0), + master_enabled_(false), + wait_timeout_(0L), + state_(0), + enabled_transactions_(0), + disabled_transactions_(0), + switched_off_times_(0), + timefunc_fails_(0), + wait_sessions_(0), + wait_backtraverse_(0), + total_trx_wait_num_(0), + total_trx_wait_time_(0), + total_net_wait_num_(0), + total_net_wait_time_(0), + max_transactions_(0L) +{ + strcpy(reply_file_name_, ""); + strcpy(wait_file_name_, ""); +} + +int ReplSemiSyncMaster::initObject() +{ + int result; + const char *kWho = "ReplSemiSyncMaster::initObject"; + + if (init_done_) + { + fprintf(stderr, "%s called twice\n", kWho); + return 1; + } + init_done_ = true; + + /* References to the parameter works after set_options(). */ + setWaitTimeout(rpl_semi_sync_master_timeout); + setTraceLevel(rpl_semi_sync_master_trace_level); + max_transactions_ = (int)max_connections; + + /* Mutex initialization can only be done after MY_INIT(). */ + pthread_mutex_init(&LOCK_binlog_, MY_MUTEX_INIT_FAST); + pthread_cond_init(&COND_binlog_send_, NULL); + + if (rpl_semi_sync_master_enabled) + result = enableMaster(); + else + result = disableMaster(); + + return result; +} + +int ReplSemiSyncMaster::enableMaster() +{ + int result = 0; + + /* Must have the lock when we do enable of disable. */ + lock(); + + if (!getMasterEnabled()) + { + active_tranxs_ = new ActiveTranx(max_connections, + &LOCK_binlog_, + trace_level_); + if (active_tranxs_ != NULL) + { + commit_file_name_inited_ = false; + reply_file_name_inited_ = false; + wait_file_name_inited_ = false; + + set_master_enabled(true); + state_ = true; + sql_print_information("Semi-sync replication enabled on the master."); + } + else + { + sql_print_error("Cannot allocate memory to enable semi-sync on the master."); + result = -1; + } + } + + unlock(); + + return result; +} + +int ReplSemiSyncMaster::disableMaster() +{ + /* Must have the lock when we do enable of disable. */ + lock(); + + if (getMasterEnabled()) + { + /* Switch off the semi-sync first so that waiting transaction will be + * waken up. + */ + switch_off(); + + assert(active_tranxs_ != NULL); + delete active_tranxs_; + active_tranxs_ = NULL; + + reply_file_name_inited_ = false; + wait_file_name_inited_ = false; + commit_file_name_inited_ = false; + + set_master_enabled(false); + sql_print_information("Semi-sync replication disabled on the master."); + } + + unlock(); + + return 0; +} + +ReplSemiSyncMaster::~ReplSemiSyncMaster() +{ + if (init_done_) + { + pthread_mutex_destroy(&LOCK_binlog_); + pthread_cond_destroy(&COND_binlog_send_); + } + + delete active_tranxs_; +} + +void ReplSemiSyncMaster::lock() +{ + pthread_mutex_lock(&LOCK_binlog_); +} + +void ReplSemiSyncMaster::unlock() +{ + pthread_mutex_unlock(&LOCK_binlog_); +} + +void ReplSemiSyncMaster::cond_broadcast() +{ + pthread_cond_broadcast(&COND_binlog_send_); +} + +int ReplSemiSyncMaster::cond_timewait(struct timespec *wait_time) +{ + const char *kWho = "ReplSemiSyncMaster::cond_timewait()"; + int wait_res; + + function_enter(kWho); + wait_res = pthread_cond_timedwait(&COND_binlog_send_, + &LOCK_binlog_, wait_time); + return function_exit(kWho, wait_res); +} + +void ReplSemiSyncMaster::add_slave() +{ + lock(); + rpl_semi_sync_master_clients++; + unlock(); +} + +void ReplSemiSyncMaster::remove_slave() +{ + lock(); + rpl_semi_sync_master_clients--; + unlock(); +} + +bool ReplSemiSyncMaster::is_semi_sync_slave() +{ + int null_value; + long long val= 0; + get_user_var_int("rpl_semi_sync_slave", &val, &null_value); + return val; +} + +int ReplSemiSyncMaster::reportReplyBinlog(const char *log_file_pos) +{ + char log_name[FN_REFLEN]; + char *endptr; + my_off_t log_pos= strtoull(log_file_pos, &endptr, 10); + if (!log_pos || !endptr || *endptr != ':' ) + return 1; + endptr++; // skip the ':' seperator + strncpy(log_name, endptr, FN_REFLEN); + uint32 server_id= 0; + return reportReplyBinlog(server_id, log_name, log_pos); +} + +int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id, + const char *log_file_name, + my_off_t log_file_pos) +{ + const char *kWho = "ReplSemiSyncMaster::reportReplyBinlog"; + int cmp; + bool can_release_threads = false; + bool need_copy_send_pos = true; + + if (!(getMasterEnabled())) + return 0; + + function_enter(kWho); + + lock(); + + /* This is the real check inside the mutex. */ + if (!getMasterEnabled()) + goto l_end; + + if (!is_on()) + /* We check to see whether we can switch semi-sync ON. */ + try_switch_on(server_id, log_file_name, log_file_pos); + + /* The position should increase monotonically, if there is only one + * thread sending the binlog to the slave. + * In reality, to improve the transaction availability, we allow multiple + * sync replication slaves. So, if any one of them get the transaction, + * the transaction session in the primary can move forward. + */ + if (reply_file_name_inited_) + { + cmp = ActiveTranx::compare(log_file_name, log_file_pos, + reply_file_name_, reply_file_pos_); + + /* If the requested position is behind the sending binlog position, + * would not adjust sending binlog position. + * We based on the assumption that there are multiple semi-sync slave, + * and at least one of them shou/ld be up to date. + * If all semi-sync slaves are behind, at least initially, the primary + * can find the situation after the waiting timeout. After that, some + * slaves should catch up quickly. + */ + if (cmp < 0) + { + /* If the position is behind, do not copy it. */ + need_copy_send_pos = false; + } + } + + if (need_copy_send_pos) + { + strcpy(reply_file_name_, log_file_name); + reply_file_pos_ = log_file_pos; + reply_file_name_inited_ = true; + + /* Remove all active transaction nodes before this point. */ + assert(active_tranxs_ != NULL); + active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos); + + if (trace_level_ & kTraceDetail) + sql_print_information("%s: Got reply at (%s, %lu)", kWho, + log_file_name, (unsigned long)log_file_pos); + } + + if (wait_sessions_ > 0) + { + /* Let us check if some of the waiting threads doing a trx + * commit can now proceed. + */ + cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_, + wait_file_name_, wait_file_pos_); + if (cmp >= 0) + { + /* Yes, at least one waiting thread can now proceed: + * let us release all waiting threads with a broadcast + */ + can_release_threads = true; + wait_file_name_inited_ = false; + } + } + + l_end: + unlock(); + + if (can_release_threads) + { + if (trace_level_ & kTraceDetail) + sql_print_information("%s: signal all waiting threads.", kWho); + + cond_broadcast(); + } + + return function_exit(kWho, 0); +} + +int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name, + my_off_t trx_wait_binlog_pos) +{ + const char *kWho = "ReplSemiSyncMaster::commitTrx"; + + function_enter(kWho); + + if (getMasterEnabled() && trx_wait_binlog_name) + { + struct timeval start_tv; + struct timespec abstime; + int wait_result, start_time_err; + const char *old_msg= 0; + + start_time_err = gettimeofday(&start_tv, 0); + + /* Acquire the mutex. */ + lock(); + + /* This must be called after acquired the lock */ + old_msg= thd_enter_cond(NULL, &COND_binlog_send_, &LOCK_binlog_, + "Waiting for semi-sync ACK from slave"); + + /* This is the real check inside the mutex. */ + if (!getMasterEnabled() || !is_on() || !rpl_semi_sync_master_clients) + goto l_end; + + if (trace_level_ & kTraceDetail) + { + sql_print_information("%s: wait pos (%s, %lu), repl(%d)\n", kWho, + trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos, + (int)is_on()); + } + + while (is_on()) + { + int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_, + trx_wait_binlog_name, trx_wait_binlog_pos); + if (cmp >= 0) + { + /* We have already sent the relevant binlog to the slave: no need to + * wait here. + */ + if (trace_level_ & kTraceDetail) + sql_print_information("%s: Binlog reply is ahead (%s, %lu),", + kWho, reply_file_name_, (unsigned long)reply_file_pos_); + break; + } + + /* Let us update the info about the minimum binlog position of waiting + * threads. + */ + if (wait_file_name_inited_) + { + cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos, + wait_file_name_, wait_file_pos_); + if (cmp <= 0) + { + /* This thd has a lower position, let's update the minimum info. */ + strcpy(wait_file_name_, trx_wait_binlog_name); + wait_file_pos_ = trx_wait_binlog_pos; + + wait_backtraverse_++; + if (trace_level_ & kTraceDetail) + sql_print_information("%s: move back wait position (%s, %lu),", + kWho, wait_file_name_, (unsigned long)wait_file_pos_); + } + } + else + { + strcpy(wait_file_name_, trx_wait_binlog_name); + wait_file_pos_ = trx_wait_binlog_pos; + wait_file_name_inited_ = true; + + if (trace_level_ & kTraceDetail) + sql_print_information("%s: init wait position (%s, %lu),", + kWho, wait_file_name_, (unsigned long)wait_file_pos_); + } + + if (start_time_err == 0) + { + int diff_usecs = start_tv.tv_usec + wait_timeout_ * TIME_THOUSAND; + + /* Calcuate the waiting period. */ + abstime.tv_sec = start_tv.tv_sec; + if (diff_usecs < TIME_MILLION) + { + abstime.tv_nsec = diff_usecs * TIME_THOUSAND; + } + else + { + while (diff_usecs >= TIME_MILLION) + { + abstime.tv_sec++; + diff_usecs -= TIME_MILLION; + } + abstime.tv_nsec = diff_usecs * TIME_THOUSAND; + } + + /* In semi-synchronous replication, we wait until the binlog-dump + * thread has received the reply on the relevant binlog segment from the + * replication slave. + * + * Let us suspend this thread to wait on the condition; + * when replication has progressed far enough, we will release + * these waiting threads. + */ + wait_sessions_++; + + if (trace_level_ & kTraceDetail) + sql_print_information("%s: wait %lu ms for binlog sent (%s, %lu)", + kWho, wait_timeout_, + wait_file_name_, (unsigned long)wait_file_pos_); + + wait_result = cond_timewait(&abstime); + wait_sessions_--; + + if (wait_result != 0) + { + /* This is a real wait timeout. */ + sql_print_warning("Timeout waiting for reply of binlog (file: %s, pos: %lu), " + "semi-sync up to file %s, position %lu.", + trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos, + reply_file_name_, (unsigned long)reply_file_pos_); + total_wait_timeouts_++; + + /* switch semi-sync off */ + switch_off(); + } + else + { + int wait_time; + + wait_time = getWaitTime(start_tv); + if (wait_time < 0) + { + if (trace_level_ & kTraceGeneral) + { + /* This is a time/gettimeofday function call error. */ + sql_print_error("Replication semi-sync gettimeofday fail1 at " + "wait position (%s, %lu)", + trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos); + } + timefunc_fails_++; + } + else + { + total_trx_wait_num_++; + total_trx_wait_time_ += wait_time; + } + } + } + else + { + if (trace_level_ & kTraceGeneral) + { + /* This is a gettimeofday function call error. */ + sql_print_error("Replication semi-sync gettimeofday fail2 at " + "wait position (%s, %lu)", + trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos); + } + timefunc_fails_++; + + /* switch semi-sync off */ + switch_off(); + } + } + + l_end: + /* Update the status counter. */ + if (is_on() && rpl_semi_sync_master_clients) + enabled_transactions_++; + else + disabled_transactions_++; + + /* The lock held will be released by thd_exit_cond, so no need to + call unlock() here */ + thd_exit_cond(NULL, old_msg); + } + + return function_exit(kWho, 0); +} + +/* Indicate that semi-sync replication is OFF now. + * + * What should we do when it is disabled? The problem is that we want + * the semi-sync replication enabled again when the slave catches up + * later. But, it is not that easy to detect that the slave has caught + * up. This is caused by the fact that MySQL's replication protocol is + * asynchronous, meaning that if the master does not use the semi-sync + * protocol, the slave would not send anything to the master. + * Still, if the master is sending (N+1)-th event, we assume that it is + * an indicator that the slave has received N-th event and earlier ones. + * + * If semi-sync is disabled, all transactions still update the wait + * position with the last position in binlog. But no transactions will + * wait for confirmations and the active transaction list would not be + * maintained. In binlog dump thread, updateSyncHeader() checks whether + * the current sending event catches up with last wait position. If it + * does match, semi-sync will be switched on again. + */ +int ReplSemiSyncMaster::switch_off() +{ + const char *kWho = "ReplSemiSyncMaster::switch_off"; + int result; + + function_enter(kWho); + state_ = false; + + /* Clear the active transaction list. */ + assert(active_tranxs_ != NULL); + result = active_tranxs_->clear_active_tranx_nodes(NULL, 0); + + switched_off_times_++; + wait_file_name_inited_ = false; + reply_file_name_inited_ = false; + sql_print_information("Semi-sync replication switched OFF."); + cond_broadcast(); /* wake up all waiting threads */ + + return function_exit(kWho, result); +} + +int ReplSemiSyncMaster::try_switch_on(int server_id, + const char *log_file_name, + my_off_t log_file_pos) +{ + const char *kWho = "ReplSemiSyncMaster::try_switch_on"; + bool semi_sync_on = false; + + function_enter(kWho); + + /* If the current sending event's position is larger than or equal to the + * 'largest' commit transaction binlog position, the slave is already + * catching up now and we can switch semi-sync on here. + * If commit_file_name_inited_ indicates there are no recent transactions, + * we can enable semi-sync immediately. + */ + if (commit_file_name_inited_) + { + int cmp = ActiveTranx::compare(log_file_name, log_file_pos, + commit_file_name_, commit_file_pos_); + semi_sync_on = (cmp >= 0); + } + else + { + semi_sync_on = true; + } + + if (semi_sync_on) + { + /* Switch semi-sync replication on. */ + state_ = true; + + sql_print_information("Semi-sync replication switched ON with slave (server_id: %d) " + "at (%s, %lu)", + server_id, log_file_name, + (unsigned long)log_file_pos); + } + + return function_exit(kWho, 0); +} + +int ReplSemiSyncMaster::reserveSyncHeader(unsigned char *header, + unsigned long size) +{ + const char *kWho = "ReplSemiSyncMaster::reserveSyncHeader"; + function_enter(kWho); + + int hlen=0; + if (!is_semi_sync_slave()) + { + hlen= 0; + } + else + { + /* No enough space for the extra header, disable semi-sync master */ + if (sizeof(kSyncHeader) > size) + { + sql_print_warning("No enough space in the packet " + "for semi-sync extra header, " + "semi-sync replication disabled"); + disableMaster(); + return 0; + } + + /* Set the magic number and the sync status. By default, no sync + * is required. + */ + memcpy(header, kSyncHeader, sizeof(kSyncHeader)); + hlen= sizeof(kSyncHeader); + } + return function_exit(kWho, hlen); +} + +int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet, + const char *log_file_name, + my_off_t log_file_pos, + uint32 server_id) +{ + const char *kWho = "ReplSemiSyncMaster::updateSyncHeader"; + int cmp = 0; + bool sync = false; + + /* If the semi-sync master is not enabled, or the slave is not a semi-sync + * target, do not request replies from the slave. + */ + if (!getMasterEnabled() || !is_semi_sync_slave()) + { + sync = false; + return 0; + } + + function_enter(kWho); + + lock(); + + /* This is the real check inside the mutex. */ + if (!getMasterEnabled()) + { + sync = false; + goto l_end; + } + + if (is_on()) + { + /* semi-sync is ON */ + sync = false; /* No sync unless a transaction is involved. */ + + if (reply_file_name_inited_) + { + cmp = ActiveTranx::compare(log_file_name, log_file_pos, + reply_file_name_, reply_file_pos_); + if (cmp <= 0) + { + /* If we have already got the reply for the event, then we do + * not need to sync the transaction again. + */ + goto l_end; + } + } + + if (wait_file_name_inited_) + { + cmp = ActiveTranx::compare(log_file_name, log_file_pos, + wait_file_name_, wait_file_pos_); + } + else + { + cmp = 1; + } + + /* If we are already waiting for some transaction replies which + * are later in binlog, do not wait for this one event. + */ + if (cmp >= 0) + { + /* + * We only wait if the event is a transaction's ending event. + */ + assert(active_tranxs_ != NULL); + sync = active_tranxs_->is_tranx_end_pos(log_file_name, + log_file_pos); + } + } + else + { + if (commit_file_name_inited_) + { + int cmp = ActiveTranx::compare(log_file_name, log_file_pos, + commit_file_name_, commit_file_pos_); + sync = (cmp >= 0); + } + else + { + sync = true; + } + } + + if (trace_level_ & kTraceDetail) + sql_print_information("%s: server(%d), (%s, %lu) sync(%d), repl(%d)", + kWho, server_id, log_file_name, + (unsigned long)log_file_pos, sync, (int)is_on()); + + l_end: + unlock(); + + /* We do not need to clear sync flag because we set it to 0 when we + * reserve the packet header. + */ + if (sync) + (packet)[2] = kPacketFlagSync; + + return function_exit(kWho, 0); +} + +int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name, + my_off_t log_file_pos) +{ + const char *kWho = "ReplSemiSyncMaster::writeTranxInBinlog"; + int result = 0; + + function_enter(kWho); + + lock(); + + /* This is the real check inside the mutex. */ + if (!getMasterEnabled()) + goto l_end; + + /* Update the 'largest' transaction commit position seen so far even + * though semi-sync is switched off. + * It is much better that we update commit_file_* here, instead of + * inside commitTrx(). This is mostly because updateSyncHeader() + * will watch for commit_file_* to decide whether to switch semi-sync + * on. The detailed reason is explained in function updateSyncHeader(). + */ + if (commit_file_name_inited_) + { + int cmp = ActiveTranx::compare(log_file_name, log_file_pos, + commit_file_name_, commit_file_pos_); + if (cmp > 0) + { + /* This is a larger position, let's update the maximum info. */ + strcpy(commit_file_name_, log_file_name); + commit_file_pos_ = log_file_pos; + } + } + else + { + strcpy(commit_file_name_, log_file_name); + commit_file_pos_ = log_file_pos; + commit_file_name_inited_ = true; + } + + if (is_on() && rpl_semi_sync_master_clients) + { + assert(active_tranxs_ != NULL); + if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos)) + { + /* + if insert tranx_node failed, print a warning message + and turn off semi-sync + */ + sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %ul", + log_file_name, log_file_pos); + switch_off(); + } + } + + l_end: + unlock(); + + return function_exit(kWho, result); +} + +int ReplSemiSyncMaster::resetMaster() +{ + const char *kWho = "ReplSemiSyncMaster::resetMaster"; + int result = 0; + + function_enter(kWho); + + + lock(); + + state_ = getMasterEnabled()? 1 : 0; + + wait_file_name_inited_ = false; + reply_file_name_inited_ = false; + commit_file_name_inited_ = false; + + enabled_transactions_ = 0; + disabled_transactions_ = 0; + switched_off_times_ = 0; + timefunc_fails_ = 0; + wait_sessions_ = 0; + wait_backtraverse_ = 0; + total_trx_wait_num_ = 0; + total_trx_wait_time_ = 0; + total_net_wait_num_ = 0; + total_net_wait_time_ = 0; + + unlock(); + + return function_exit(kWho, result); +} + +void ReplSemiSyncMaster::setExportStats() +{ + lock(); + + rpl_semi_sync_master_status = state_ && rpl_semi_sync_master_clients; + rpl_semi_sync_master_yes_transactions = enabled_transactions_; + rpl_semi_sync_master_no_transactions = disabled_transactions_; + rpl_semi_sync_master_off_times = switched_off_times_; + rpl_semi_sync_master_timefunc_fails = timefunc_fails_; + rpl_semi_sync_master_num_timeouts = total_wait_timeouts_; + rpl_semi_sync_master_wait_sessions = wait_sessions_; + rpl_semi_sync_master_back_wait_pos = wait_backtraverse_; + rpl_semi_sync_master_trx_wait_num = total_trx_wait_num_; + rpl_semi_sync_master_trx_wait_time = + ((total_trx_wait_num_) ? + (unsigned long)((double)total_trx_wait_time_ / + ((double)total_trx_wait_num_)) : 0); + rpl_semi_sync_master_net_wait_num = total_net_wait_num_; + rpl_semi_sync_master_net_wait_time = + ((total_net_wait_num_) ? + (unsigned long)((double)total_net_wait_time_ / + ((double)total_net_wait_num_)) : 0); + + rpl_semi_sync_master_net_wait_total_time = total_net_wait_time_; + rpl_semi_sync_master_trx_wait_total_time = total_trx_wait_time_; + + unlock(); +} + +/* Get the waiting time given the wait's staring time. + * + * Return: + * >= 0: the waiting time in microsecons(us) + * < 0: error in gettimeofday or time back traverse + */ +static int getWaitTime(const struct timeval& start_tv) +{ + unsigned long long start_usecs, end_usecs; + struct timeval end_tv; + int end_time_err; + + /* Starting time in microseconds(us). */ + start_usecs = start_tv.tv_sec * TIME_MILLION + start_tv.tv_usec; + + /* Get the wait time interval. */ + end_time_err = gettimeofday(&end_tv, 0); + + /* Ending time in microseconds(us). */ + end_usecs = end_tv.tv_sec * TIME_MILLION + end_tv.tv_usec; + + if (end_time_err != 0 || end_usecs < start_usecs) + return -1; + + return (int)(end_usecs - start_usecs); +} diff --git a/plugin/semisync/semisync_master.h b/plugin/semisync/semisync_master.h new file mode 100644 index 00000000000..a1697b2ae67 --- /dev/null +++ b/plugin/semisync/semisync_master.h @@ -0,0 +1,366 @@ +/* Copyright (C) 2007 Google Inc. + Copyright (C) 2008 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + + +#ifndef SEMISYNC_MASTER_H +#define SEMISYNC_MASTER_H + +#include "semisync.h" + +/** + This class manages memory for active transaction list. + + We record each active transaction with a TranxNode. Because each + session can only have only one open transaction, the total active + transaction nodes can not exceed the maximum sessions. Currently + in MySQL, sessions are the same as connections. +*/ +class ActiveTranx + :public Trace { +private: + struct TranxNode { + char *log_name_; + my_off_t log_pos_; + struct TranxNode *next_; /* the next node in the sorted list */ + struct TranxNode *hash_next_; /* the next node during hash collision */ + }; + + /* The following data structure maintains an active transaction list. */ + TranxNode *node_array_; + TranxNode *free_pool_; + + /* These two record the active transaction list in sort order. */ + TranxNode *trx_front_, *trx_rear_; + + TranxNode **trx_htb_; /* A hash table on active transactions. */ + + int num_transactions_; /* maximum transactions */ + int num_entries_; /* maximum hash table entries */ + pthread_mutex_t *lock_; /* mutex lock */ + + inline void assert_lock_owner(); + + inline TranxNode* alloc_tranx_node(); + + inline unsigned int calc_hash(const unsigned char *key,unsigned int length); + unsigned int get_hash_value(const char *log_file_name, my_off_t log_file_pos); + + int compare(const char *log_file_name1, my_off_t log_file_pos1, + const TranxNode *node2) { + return compare(log_file_name1, log_file_pos1, + node2->log_name_, node2->log_pos_); + } + int compare(const TranxNode *node1, + const char *log_file_name2, my_off_t log_file_pos2) { + return compare(node1->log_name_, node1->log_pos_, + log_file_name2, log_file_pos2); + } + int compare(const TranxNode *node1, const TranxNode *node2) { + return compare(node1->log_name_, node1->log_pos_, + node2->log_name_, node2->log_pos_); + } + +public: + ActiveTranx(int max_connections, pthread_mutex_t *lock, + unsigned long trace_level); + ~ActiveTranx(); + + /* Insert an active transaction node with the specified position. + * + * Return: + * 0: success; -1 or otherwise: error + */ + int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos); + + /* Clear the active transaction nodes until(inclusive) the specified + * position. + * If log_file_name is NULL, everything will be cleared: the sorted + * list and the hash table will be reset to empty. + * + * Return: + * 0: success; -1 or otherwise: error + */ + int clear_active_tranx_nodes(const char *log_file_name, + my_off_t log_file_pos); + + /* Given a position, check to see whether the position is an active + * transaction's ending position by probing the hash table. + */ + bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos); + + /* Given two binlog positions, compare which one is bigger based on + * (file_name, file_position). + */ + static int compare(const char *log_file_name1, my_off_t log_file_pos1, + const char *log_file_name2, my_off_t log_file_pos2); + +}; + +/** + The extension class for the master of semi-synchronous replication +*/ +class ReplSemiSyncMaster + :public ReplSemiSyncBase { + private: + ActiveTranx *active_tranxs_; /* active transaction list: the list will + be cleared when semi-sync switches off. */ + + /* True when initObject has been called */ + bool init_done_; + + /* This cond variable is signaled when enough binlog has been sent to slave, + * so that a waiting trx can return the 'ok' to the client for a commit. + */ + pthread_cond_t COND_binlog_send_; + + /* Mutex that protects the following state variables and the active + * transaction list. + * Under no cirumstances we can acquire mysql_bin_log.LOCK_log if we are + * already holding LOCK_binlog_ because it can cause deadlocks. + */ + pthread_mutex_t LOCK_binlog_; + + /* This is set to true when reply_file_name_ contains meaningful data. */ + bool reply_file_name_inited_; + + /* The binlog name up to which we have received replies from any slaves. */ + char reply_file_name_[FN_REFLEN]; + + /* The position in that file up to which we have the reply from any slaves. */ + my_off_t reply_file_pos_; + + /* This is set to true when we know the 'smallest' wait position. */ + bool wait_file_name_inited_; + + /* NULL, or the 'smallest' filename that a transaction is waiting for + * slave replies. + */ + char wait_file_name_[FN_REFLEN]; + + /* The smallest position in that file that a trx is waiting for: the trx + * can proceed and send an 'ok' to the client when the master has got the + * reply from the slave indicating that it already got the binlog events. + */ + my_off_t wait_file_pos_; + + /* This is set to true when we know the 'largest' transaction commit + * position in the binlog file. + * We always maintain the position no matter whether semi-sync is switched + * on switched off. When a transaction wait timeout occurs, semi-sync will + * switch off. Binlog-dump thread can use the three fields to detect when + * slaves catch up on replication so that semi-sync can switch on again. + */ + bool commit_file_name_inited_; + + /* The 'largest' binlog filename that a commit transaction is seeing. */ + char commit_file_name_[FN_REFLEN]; + + /* The 'largest' position in that file that a commit transaction is seeing. */ + my_off_t commit_file_pos_; + + /* All global variables which can be set by parameters. */ + volatile bool master_enabled_; /* semi-sync is enabled on the master */ + unsigned long wait_timeout_; /* timeout period(ms) during tranx wait */ + + /* All status variables. */ + bool state_; /* whether semi-sync is switched */ + unsigned long enabled_transactions_; /* semi-sync'ed tansactions */ + unsigned long disabled_transactions_; /* non-semi-sync'ed tansactions */ + unsigned long switched_off_times_; /* how many times are switched off? */ + unsigned long timefunc_fails_; /* how many time function fails? */ + unsigned long total_wait_timeouts_; /* total number of wait timeouts */ + unsigned long wait_sessions_; /* how many sessions wait for replies? */ + unsigned long wait_backtraverse_; /* wait position back traverses */ + unsigned long long total_trx_wait_num_; /* total trx waits: non-timeout ones */ + unsigned long long total_trx_wait_time_; /* total trx wait time: in us */ + unsigned long long total_net_wait_num_; /* total network waits */ + unsigned long long total_net_wait_time_; /* total network wait time */ + + /* The number of maximum active transactions. This should be the same as + * maximum connections because MySQL does not do connection sharing now. + */ + int max_transactions_; + + void lock(); + void unlock(); + void cond_broadcast(); + int cond_timewait(struct timespec *wait_time); + + /* Is semi-sync replication on? */ + bool is_on() { + return (state_); + } + + void set_master_enabled(bool enabled) { + master_enabled_ = enabled; + } + + /* Switch semi-sync off because of timeout in transaction waiting. */ + int switch_off(); + + /* Switch semi-sync on when slaves catch up. */ + int try_switch_on(int server_id, + const char *log_file_name, my_off_t log_file_pos); + + public: + ReplSemiSyncMaster(); + ~ReplSemiSyncMaster(); + + bool getMasterEnabled() { + return master_enabled_; + } + void setTraceLevel(unsigned long trace_level) { + trace_level_ = trace_level; + if (active_tranxs_) + active_tranxs_->trace_level_ = trace_level; + } + + /* Set the transaction wait timeout period, in milliseconds. */ + void setWaitTimeout(unsigned long wait_timeout) { + wait_timeout_ = wait_timeout; + } + + /* Initialize this class after MySQL parameters are initialized. this + * function should be called once at bootstrap time. + */ + int initObject(); + + /* Enable the object to enable semi-sync replication inside the master. */ + int enableMaster(); + + /* Enable the object to enable semi-sync replication inside the master. */ + int disableMaster(); + + /* Add a semi-sync replication slave */ + void add_slave(); + + /* Remove a semi-sync replication slave */ + void remove_slave(); + + /* Is the slave servered by the thread requested semi-sync */ + bool is_semi_sync_slave(); + + int reportReplyBinlog(const char *log_file_pos); + + /* In semi-sync replication, reports up to which binlog position we have + * received replies from the slave indicating that it already get the events. + * + * Input: + * server_id - (IN) master server id number + * log_file_name - (IN) binlog file name + * end_offset - (IN) the offset in the binlog file up to which we have + * the replies from the slave + * + * Return: + * 0: success; -1 or otherwise: error + */ + int reportReplyBinlog(uint32 server_id, + const char* log_file_name, + my_off_t end_offset); + + /* Commit a transaction in the final step. This function is called from + * InnoDB before returning from the low commit. If semi-sync is switch on, + * the function will wait to see whether binlog-dump thread get the reply for + * the events of the transaction. Remember that this is not a direct wait, + * instead, it waits to see whether the binlog-dump thread has reached the + * point. If the wait times out, semi-sync status will be switched off and + * all other transaction would not wait either. + * + * Input: (the transaction events' ending binlog position) + * trx_wait_binlog_name - (IN) ending position's file name + * trx_wait_binlog_pos - (IN) ending position's file offset + * + * Return: + * 0: success; -1 or otherwise: error + */ + int commitTrx(const char* trx_wait_binlog_name, + my_off_t trx_wait_binlog_pos); + + /* Reserve space in the replication event packet header: + * . slave semi-sync off: 1 byte - (0) + * . slave semi-sync on: 3 byte - (0, 0xef, 0/1} + * + * Input: + * header - (IN) the header buffer + * size - (IN) size of the header buffer + * + * Return: + * size of the bytes reserved for header + */ + int reserveSyncHeader(unsigned char *header, unsigned long size); + + /* Update the sync bit in the packet header to indicate to the slave whether + * the master will wait for the reply of the event. If semi-sync is switched + * off and we detect that the slave is catching up, we switch semi-sync on. + * + * Input: + * packet - (IN) the packet containing the replication event + * log_file_name - (IN) the event ending position's file name + * log_file_pos - (IN) the event ending position's file offset + * server_id - (IN) master server id number + * + * Return: + * 0: success; -1 or otherwise: error + */ + int updateSyncHeader(unsigned char *packet, + const char *log_file_name, + my_off_t log_file_pos, + uint32 server_id); + + /* Called when a transaction finished writing binlog events. + * . update the 'largest' transactions' binlog event position + * . insert the ending position in the active transaction list if + * semi-sync is on + * + * Input: (the transaction events' ending binlog position) + * log_file_name - (IN) transaction ending position's file name + * log_file_pos - (IN) transaction ending position's file offset + * + * Return: + * 0: success; -1 or otherwise: error + */ + int writeTranxInBinlog(const char* log_file_name, my_off_t log_file_pos); + + /* Export internal statistics for semi-sync replication. */ + void setExportStats(); + + /* 'reset master' command is issued from the user and semi-sync need to + * go off for that. + */ + int resetMaster(); +}; + +/* System and status variables for the master component */ +extern char rpl_semi_sync_master_enabled; +extern unsigned long rpl_semi_sync_master_timeout; +extern unsigned long rpl_semi_sync_master_trace_level; +extern unsigned long rpl_semi_sync_master_status; +extern unsigned long rpl_semi_sync_master_yes_transactions; +extern unsigned long rpl_semi_sync_master_no_transactions; +extern unsigned long rpl_semi_sync_master_off_times; +extern unsigned long rpl_semi_sync_master_timefunc_fails; +extern unsigned long rpl_semi_sync_master_num_timeouts; +extern unsigned long rpl_semi_sync_master_wait_sessions; +extern unsigned long rpl_semi_sync_master_back_wait_pos; +extern unsigned long rpl_semi_sync_master_trx_wait_time; +extern unsigned long rpl_semi_sync_master_net_wait_time; +extern unsigned long long rpl_semi_sync_master_net_wait_num; +extern unsigned long long rpl_semi_sync_master_trx_wait_num; +extern unsigned long long rpl_semi_sync_master_net_wait_total_time; +extern unsigned long long rpl_semi_sync_master_trx_wait_total_time; +extern unsigned long rpl_semi_sync_master_clients; + +#endif /* SEMISYNC_MASTER_H */ diff --git a/plugin/semisync/semisync_master_plugin.cc b/plugin/semisync/semisync_master_plugin.cc new file mode 100644 index 00000000000..dc19d09e622 --- /dev/null +++ b/plugin/semisync/semisync_master_plugin.cc @@ -0,0 +1,380 @@ +/* Copyright (C) 2007 Google Inc. + Copyright (C) 2008 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + + +#include "semisync_master.h" + +ReplSemiSyncMaster repl_semisync; + +int repl_semi_report_binlog_update(Binlog_storage_param *param, + const char *log_file, + my_off_t log_pos, uint32 flags) +{ + int error= 0; + + if (repl_semisync.getMasterEnabled()) + { + /* + Let us store the binlog file name and the position, so that + we know how long to wait for the binlog to the replicated to + the slave in synchronous replication. + */ + error= repl_semisync.writeTranxInBinlog(log_file, + log_pos); + } + + return error; +} + +int repl_semi_request_commit(Trans_param *param) +{ + return 0; +} + +int repl_semi_report_commit(Trans_param *param) +{ + + bool is_real_trans= param->flags & TRANS_IS_REAL_TRANS; + + if (is_real_trans && param->log_pos) + { + const char *binlog_name= param->log_file; + return repl_semisync.commitTrx(binlog_name, param->log_pos); + } + return 0; +} + +int repl_semi_report_rollback(Trans_param *param) +{ + return repl_semi_report_commit(param); +} + +int repl_semi_binlog_dump_start(Binlog_transmit_param *param, + const char *log_file, + my_off_t log_pos) +{ + bool semi_sync_slave= repl_semisync.is_semi_sync_slave(); + + if (semi_sync_slave) + /* One more semi-sync slave */ + repl_semisync.add_slave(); + sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)", + semi_sync_slave ? "semi-sync" : "asynchronous", + param->server_id, log_file, (unsigned long)log_pos); + + return 0; +} + +int repl_semi_binlog_dump_end(Binlog_transmit_param *param) +{ + bool semi_sync_slave= repl_semisync.is_semi_sync_slave(); + + sql_print_information("Stop %s binlog_dump to slave (server_id: %d)", + semi_sync_slave ? "semi-sync" : "asynchronous", + param->server_id); + if (semi_sync_slave) + { + /* One less semi-sync slave */ + repl_semisync.remove_slave(); + } + return 0; +} + +int repl_semi_reserve_header(Binlog_transmit_param *param, + unsigned char *header, + unsigned long size, unsigned long *len) +{ + *len += repl_semisync.reserveSyncHeader(header, size); + return 0; +} + +int repl_semi_before_send_event(Binlog_transmit_param *param, + unsigned char *packet, unsigned long len, + const char *log_file, my_off_t log_pos) +{ + return repl_semisync.updateSyncHeader(packet, + log_file, + log_pos, + param->server_id); +} + +int repl_semi_after_send_event(Binlog_transmit_param *param, + const char *event_buf, unsigned long len) +{ + return 0; +} + +int repl_semi_reset_master(Binlog_transmit_param *param) +{ + if (repl_semisync.resetMaster()) + return 1; + return 0; +} + +/* + semisync system variables + */ +static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd, + SYS_VAR *var, + void *ptr, + const void *val); + +static void fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd, + SYS_VAR *var, + void *ptr, + const void *val); + +static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd, + SYS_VAR *var, + void *ptr, + const void *val); + +static void fix_rpl_semi_sync_master_reply_log_file_pos(MYSQL_THD thd, + SYS_VAR *var, + void *ptr, + const void *val); + +static MYSQL_SYSVAR_BOOL(enabled, rpl_semi_sync_master_enabled, + PLUGIN_VAR_OPCMDARG, + "Enable semi-synchronous replication master (disabled by default). ", + NULL, // check + &fix_rpl_semi_sync_master_enabled, // update + 0); + +static MYSQL_SYSVAR_ULONG(timeout, rpl_semi_sync_master_timeout, + PLUGIN_VAR_OPCMDARG, + "The timeout value (in ms) for semi-synchronous replication in the master", + NULL, // check + fix_rpl_semi_sync_master_timeout, // update + 10000, 0, ~0L, 1); + +static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level, + PLUGIN_VAR_OPCMDARG, + "The tracing level for semi-sync replication.", + NULL, // check + &fix_rpl_semi_sync_master_trace_level, // update + 32, 0, ~0L, 1); + +/* + Use a SESSION instead of GLOBAL variable for slave to send reply to + avoid requiring SUPER privilege. +*/ +static MYSQL_THDVAR_STR(reply_log_file_pos, + PLUGIN_VAR_NOCMDOPT, + "The log filename and position slave has queued to relay log.", + NULL, // check + &fix_rpl_semi_sync_master_reply_log_file_pos, + ""); + +static SYS_VAR* semi_sync_master_system_vars[]= { + MYSQL_SYSVAR(enabled), + MYSQL_SYSVAR(timeout), + MYSQL_SYSVAR(trace_level), + MYSQL_SYSVAR(reply_log_file_pos), + NULL, +}; + + +static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd, + SYS_VAR *var, + void *ptr, + const void *val) +{ + *(unsigned long *)ptr= *(unsigned long *)val; + repl_semisync.setWaitTimeout(rpl_semi_sync_master_timeout); + return; +} + +static void fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd, + SYS_VAR *var, + void *ptr, + const void *val) +{ + *(unsigned long *)ptr= *(unsigned long *)val; + repl_semisync.setTraceLevel(rpl_semi_sync_master_trace_level); + return; +} + +static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd, + SYS_VAR *var, + void *ptr, + const void *val) +{ + *(char *)ptr= *(char *)val; + if (rpl_semi_sync_master_enabled) + { + if (repl_semisync.enableMaster() != 0) + rpl_semi_sync_master_enabled = false; + } + else + { + if (repl_semisync.disableMaster() != 0) + rpl_semi_sync_master_enabled = true; + } + + return; +} + +static void fix_rpl_semi_sync_master_reply_log_file_pos(MYSQL_THD thd, + SYS_VAR *var, + void *ptr, + const void *val) +{ + const char *log_file_pos= *(char **)val; + + if (repl_semisync.reportReplyBinlog(log_file_pos)) + sql_print_error("report slave binlog reply failed."); + + return; +} + +Trans_observer trans_observer = { + sizeof(Trans_observer), // len + + repl_semi_report_commit, // after_commit + repl_semi_report_rollback, // after_rollback +}; + +Binlog_storage_observer storage_observer = { + sizeof(Binlog_storage_observer), // len + + repl_semi_report_binlog_update, // report_update +}; + +Binlog_transmit_observer transmit_observer = { + sizeof(Binlog_transmit_observer), // len + + repl_semi_binlog_dump_start, // start + repl_semi_binlog_dump_end, // stop + repl_semi_reserve_header, // reserve_header + repl_semi_before_send_event, // before_send_event + repl_semi_after_send_event, // after_send_event + repl_semi_reset_master, // reset +}; + + +#define SHOW_FNAME(name) \ + rpl_semi_sync_master_show_##name + +#define DEF_SHOW_FUNC(name, show_type) \ + static int SHOW_FNAME(name)(MYSQL_THD thd, SHOW_VAR *var, char *buff) \ + { \ + repl_semisync.setExportStats(); \ + var->type= show_type; \ + var->value= (char *)&rpl_semi_sync_master_##name; \ + return 0; \ + } + +DEF_SHOW_FUNC(clients, SHOW_LONG) +DEF_SHOW_FUNC(net_wait_time, SHOW_LONG) +DEF_SHOW_FUNC(net_wait_total_time, SHOW_LONGLONG) +DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG) +DEF_SHOW_FUNC(off_times, SHOW_LONG) +DEF_SHOW_FUNC(no_transactions, SHOW_LONG) +DEF_SHOW_FUNC(status, SHOW_BOOL) +DEF_SHOW_FUNC(timefunc_fails, SHOW_LONG) +DEF_SHOW_FUNC(trx_wait_time, SHOW_LONG) +DEF_SHOW_FUNC(trx_wait_total_time, SHOW_LONGLONG) +DEF_SHOW_FUNC(trx_wait_num, SHOW_LONGLONG) +DEF_SHOW_FUNC(back_wait_pos, SHOW_LONG) +DEF_SHOW_FUNC(wait_sessions, SHOW_LONG) +DEF_SHOW_FUNC(yes_transactions, SHOW_LONG) + + +/* plugin status variables */ +static SHOW_VAR semi_sync_master_status_vars[]= { + {"Rpl_semi_sync_master_clients", (char*) &SHOW_FNAME(clients), SHOW_FUNC}, + {"Rpl_semi_sync_master_net_avg_wait_time", + (char*) &SHOW_FNAME(net_wait_time), SHOW_FUNC}, + {"Rpl_semi_sync_master_net_wait_time", + (char*) &SHOW_FNAME(net_wait_total_time), SHOW_FUNC}, + {"Rpl_semi_sync_master_net_waits", (char*) &SHOW_FNAME(net_wait_num), SHOW_FUNC}, + {"Rpl_semi_sync_master_no_times", (char*) &SHOW_FNAME(off_times), SHOW_FUNC}, + {"Rpl_semi_sync_master_no_tx", (char*) &SHOW_FNAME(no_transactions), SHOW_FUNC}, + {"Rpl_semi_sync_master_status", (char*) &SHOW_FNAME(status), SHOW_FUNC}, + {"Rpl_semi_sync_master_timefunc_failures", + (char*) &SHOW_FNAME(timefunc_fails), SHOW_FUNC}, + {"Rpl_semi_sync_master_tx_avg_wait_time", + (char*) &SHOW_FNAME(trx_wait_time), SHOW_FUNC}, + {"Rpl_semi_sync_master_tx_wait_time", + (char*) &SHOW_FNAME(trx_wait_total_time), SHOW_FUNC}, + {"Rpl_semi_sync_master_tx_waits", (char*) &SHOW_FNAME(trx_wait_num), SHOW_FUNC}, + {"Rpl_semi_sync_master_wait_pos_backtraverse", + (char*) &SHOW_FNAME(back_wait_pos), SHOW_FUNC}, + {"Rpl_semi_sync_master_wait_sessions", + (char*) &SHOW_FNAME(wait_sessions), SHOW_FUNC}, + {"Rpl_semi_sync_master_yes_tx", (char*) &SHOW_FNAME(yes_transactions), SHOW_FUNC}, + {NULL, NULL, SHOW_LONG}, +}; + + +static int semi_sync_master_plugin_init(void *p) +{ + if (repl_semisync.initObject()) + return 1; + if (register_trans_observer(&trans_observer, p)) + return 1; + if (register_binlog_storage_observer(&storage_observer, p)) + return 1; + if (register_binlog_transmit_observer(&transmit_observer, p)) + return 1; + return 0; +} + +static int semi_sync_master_plugin_deinit(void *p) +{ + if (unregister_trans_observer(&trans_observer, p)) + { + sql_print_error("unregister_trans_observer failed"); + return 1; + } + if (unregister_binlog_storage_observer(&storage_observer, p)) + { + sql_print_error("unregister_binlog_storage_observer failed"); + return 1; + } + if (unregister_binlog_transmit_observer(&transmit_observer, p)) + { + sql_print_error("unregister_binlog_transmit_observer failed"); + return 1; + } + sql_print_information("unregister_replicator OK"); + return 0; +} + +struct Mysql_replication semi_sync_master_plugin= { + MYSQL_REPLICATION_INTERFACE_VERSION +}; + +/* + Plugin library descriptor +*/ +mysql_declare_plugin(semi_sync_master) +{ + MYSQL_REPLICATION_PLUGIN, + &semi_sync_master_plugin, + "rpl_semi_sync_master", + "He Zhenxing", + "Semi-synchronous replication master", + PLUGIN_LICENSE_GPL, + semi_sync_master_plugin_init, /* Plugin Init */ + semi_sync_master_plugin_deinit, /* Plugin Deinit */ + 0x0100 /* 1.0 */, + semi_sync_master_status_vars, /* status variables */ + semi_sync_master_system_vars, /* system variables */ + NULL /* config options */ +} +mysql_declare_plugin_end; diff --git a/plugin/semisync/semisync_slave.cc b/plugin/semisync/semisync_slave.cc new file mode 100644 index 00000000000..f6bbb17ce9d --- /dev/null +++ b/plugin/semisync/semisync_slave.cc @@ -0,0 +1,122 @@ +/* Copyright (C) 2008 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + + +#include "semisync_slave.h" + +char rpl_semi_sync_slave_enabled; +unsigned long rpl_semi_sync_slave_status= 0; +unsigned long rpl_semi_sync_slave_trace_level; + +int ReplSemiSyncSlave::initObject() +{ + int result= 0; + const char *kWho = "ReplSemiSyncSlave::initObject"; + + if (init_done_) + { + fprintf(stderr, "%s called twice\n", kWho); + return 1; + } + init_done_ = true; + + /* References to the parameter works after set_options(). */ + setSlaveEnabled(rpl_semi_sync_slave_enabled); + setTraceLevel(rpl_semi_sync_slave_trace_level); + + return result; +} + +int ReplSemiSyncSlave::slaveReplyConnect() +{ + if (!mysql_reply && !(mysql_reply= rpl_connect_master(NULL))) + { + sql_print_error("Semisync slave connect to master for reply failed"); + return 1; + } + return 0; +} + +int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header, + unsigned long total_len, + bool *need_reply, + const char **payload, + unsigned long *payload_len) +{ + const char *kWho = "ReplSemiSyncSlave::slaveReadSyncHeader"; + int read_res = 0; + function_enter(kWho); + + if ((unsigned char)(header[0]) == kPacketMagicNum) + { + *need_reply = (header[1] & kPacketFlagSync); + *payload_len = total_len - 2; + *payload = header + 2; + + if (trace_level_ & kTraceDetail) + sql_print_information("%s: reply - %d", kWho, *need_reply); + } + else + { + sql_print_error("Missing magic number for semi-sync packet, packet " + "len: %lu", total_len); + read_res = -1; + } + + return function_exit(kWho, read_res); +} + +int ReplSemiSyncSlave::slaveStart(Binlog_relay_IO_param *param) +{ + bool semi_sync= getSlaveEnabled(); + + sql_print_information("Slave I/O thread: Start %s replication to\ + master '%s@%s:%d' in log '%s' at position %lu", + semi_sync ? "semi-sync" : "asynchronous", + param->user, param->host, param->port, + param->master_log_name[0] ? param->master_log_name : "FIRST", + (unsigned long)param->master_log_pos); + + if (semi_sync && !rpl_semi_sync_slave_status) + rpl_semi_sync_slave_status= 1; + return 0; +} + +int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param) +{ + if (rpl_semi_sync_slave_status) + rpl_semi_sync_slave_status= 0; + if (mysql_reply) + mysql_close(mysql_reply); + mysql_reply= 0; + return 0; +} + +int ReplSemiSyncSlave::slaveReply(const char *log_name, my_off_t log_pos) +{ + char query[FN_REFLEN + 100]; + sprintf(query, "SET SESSION rpl_semi_sync_master_reply_log_file_pos='%llu:%s'", + (unsigned long long)log_pos, log_name); + if (mysql_real_query(mysql_reply, query, strlen(query))) + { + sql_print_error("Set 'rpl_semi_sync_master_reply_log_file_pos' on master failed"); + mysql_free_result(mysql_store_result(mysql_reply)); + mysql_close(mysql_reply); + mysql_reply= 0; + return 1; + } + mysql_free_result(mysql_store_result(mysql_reply)); + return 0; +} diff --git a/plugin/semisync/semisync_slave.h b/plugin/semisync/semisync_slave.h new file mode 100644 index 00000000000..73bc8aeeade --- /dev/null +++ b/plugin/semisync/semisync_slave.h @@ -0,0 +1,99 @@ +/* Copyright (C) 2006 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + + +#ifndef SEMISYNC_SLAVE_H +#define SEMISYNC_SLAVE_H + +#include "semisync.h" + +/** + The extension class for the slave of semi-synchronous replication +*/ +class ReplSemiSyncSlave + :public ReplSemiSyncBase { +public: + ReplSemiSyncSlave() + :slave_enabled_(false) + {} + ~ReplSemiSyncSlave() {} + + void setTraceLevel(unsigned long trace_level) { + trace_level_ = trace_level; + } + + /* Initialize this class after MySQL parameters are initialized. this + * function should be called once at bootstrap time. + */ + int initObject(); + + bool getSlaveEnabled() { + return slave_enabled_; + } + void setSlaveEnabled(bool enabled) { + slave_enabled_ = enabled; + } + + /* A slave reads the semi-sync packet header and separate the metadata + * from the payload data. + * + * Input: + * header - (IN) packet header pointer + * total_len - (IN) total packet length: metadata + payload + * need_reply - (IN) whether the master is waiting for the reply + * payload - (IN) payload: the replication event + * payload_len - (IN) payload length + * + * Return: + * 0: success; -1 or otherwise: error + */ + int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply, + const char **payload, unsigned long *payload_len); + + /* A slave replies to the master indicating its replication process. It + * indicates that the slave has received all events before the specified + * binlog position. + * + * Input: + * log_name - (IN) the reply point's binlog file name + * log_pos - (IN) the reply point's binlog file offset + * + * Return: + * 0: success; -1 or otherwise: error + */ + int slaveReply(const char *log_name, my_off_t log_pos); + + /* + Connect to master for sending reply + */ + int slaveReplyConnect(); + + int slaveStart(Binlog_relay_IO_param *param); + int slaveStop(Binlog_relay_IO_param *param); + +private: + /* True when initObject has been called */ + bool init_done_; + bool slave_enabled_; /* semi-sycn is enabled on the slave */ + MYSQL *mysql_reply; /* connection to send reply */ +}; + + +/* System and status variables for the slave component */ +extern char rpl_semi_sync_slave_enabled; +extern unsigned long rpl_semi_sync_slave_trace_level; +extern unsigned long rpl_semi_sync_slave_status; + +#endif /* SEMISYNC_SLAVE_H */ diff --git a/plugin/semisync/semisync_slave_plugin.cc b/plugin/semisync/semisync_slave_plugin.cc new file mode 100644 index 00000000000..ffc663c9bdb --- /dev/null +++ b/plugin/semisync/semisync_slave_plugin.cc @@ -0,0 +1,224 @@ +/* Copyright (C) 2007 Google Inc. + Copyright (C) 2008 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + + +#include "semisync_slave.h" + +ReplSemiSyncSlave repl_semisync; + +/* + indicate whether or not the slave should send a reply to the master. + + This is set to true in repl_semi_slave_read_event if the current + event read is the last event of a transaction. And the value is + checked in repl_semi_slave_queue_event. +*/ +bool semi_sync_need_reply= false; + +int repl_semi_reset_slave(Binlog_relay_IO_param *param) +{ + // TODO: reset semi-sync slave status here + return 0; +} + +int repl_semi_slave_request_dump(Binlog_relay_IO_param *param, + uint32 flags) +{ + MYSQL *mysql= param->mysql; + MYSQL_RES *res= 0; + MYSQL_ROW row; + const char *query; + + if (!repl_semisync.getSlaveEnabled()) + return 0; + + /* + Create the connection that is used to send slave ACK replies to + master + */ + if (repl_semisync.slaveReplyConnect()) + return 1; + + /* Check if master server has semi-sync plugin installed */ + query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'"; + if (mysql_real_query(mysql, query, strlen(query)) || + !(res= mysql_store_result(mysql))) + { + mysql_free_result(mysql_store_result(mysql)); + sql_print_error("Execution failed on master: %s", query); + return 1; + } + + row= mysql_fetch_row(res); + if (!row || strcmp(row[1], "ON")) + { + /* Master does not support or not configured semi-sync */ + sql_print_warning("Master server does not support or not configured semi-sync replication, fallback to asynchronous"); + rpl_semi_sync_slave_status= 0; + return 0; + } + + /* + Tell master dump thread that we want to do semi-sync + replication + */ + query= "SET @rpl_semi_sync_slave= 1"; + if (mysql_real_query(mysql, query, strlen(query))) + { + sql_print_error("Set 'rpl_semi_sync_slave=1' on master failed"); + mysql_free_result(mysql_store_result(mysql)); + return 1; + } + mysql_free_result(mysql_store_result(mysql)); + rpl_semi_sync_slave_status= 1; + return 0; +} + +int repl_semi_slave_read_event(Binlog_relay_IO_param *param, + const char *packet, unsigned long len, + const char **event_buf, unsigned long *event_len) +{ + if (rpl_semi_sync_slave_status) + return repl_semisync.slaveReadSyncHeader(packet, len, + &semi_sync_need_reply, + event_buf, event_len); + *event_buf= packet; + *event_len= len; + return 0; +} + +int repl_semi_slave_queue_event(Binlog_relay_IO_param *param, + const char *event_buf, + unsigned long event_len, + uint32 flags) +{ + if (rpl_semi_sync_slave_status && semi_sync_need_reply) + return repl_semisync.slaveReply(param->master_log_name, + param->master_log_pos); + return 0; +} + +int repl_semi_slave_io_start(Binlog_relay_IO_param *param) +{ + return repl_semisync.slaveStart(param); +} + +int repl_semi_slave_io_end(Binlog_relay_IO_param *param) +{ + return repl_semisync.slaveStop(param); +} + + +static void fix_rpl_semi_sync_slave_enabled(MYSQL_THD thd, + SYS_VAR *var, + void *ptr, + const void *val) +{ + *(char *)ptr= *(char *)val; + repl_semisync.setSlaveEnabled(rpl_semi_sync_slave_enabled != 0); + return; +} + +static void fix_rpl_semi_sync_trace_level(MYSQL_THD thd, + SYS_VAR *var, + void *ptr, + const void *val) +{ + *(unsigned long *)ptr= *(unsigned long *)val; + repl_semisync.setTraceLevel(rpl_semi_sync_slave_trace_level); + return; +} + +/* plugin system variables */ +static MYSQL_SYSVAR_BOOL(enabled, rpl_semi_sync_slave_enabled, + PLUGIN_VAR_OPCMDARG, + "Enable semi-synchronous replication slave (disabled by default). ", + NULL, // check + &fix_rpl_semi_sync_slave_enabled, // update + 0); + +static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_slave_trace_level, + PLUGIN_VAR_OPCMDARG, + "The tracing level for semi-sync replication.", + NULL, // check + &fix_rpl_semi_sync_trace_level, // update + 32, 0, ~0L, 1); + +static SYS_VAR* semi_sync_slave_system_vars[]= { + MYSQL_SYSVAR(enabled), + MYSQL_SYSVAR(trace_level), + NULL, +}; + + +/* plugin status variables */ +static SHOW_VAR semi_sync_slave_status_vars[]= { + {"Rpl_semi_sync_slave_status", + (char*) &rpl_semi_sync_slave_status, SHOW_BOOL}, + {NULL, NULL, SHOW_BOOL}, +}; + +Binlog_relay_IO_observer relay_io_observer = { + sizeof(Binlog_relay_IO_observer), // len + + repl_semi_slave_io_start, // start + repl_semi_slave_io_end, // stop + repl_semi_slave_request_dump, // request_transmit + repl_semi_slave_read_event, // after_read_event + repl_semi_slave_queue_event, // after_queue_event + repl_semi_reset_slave, // reset +}; + +static int semi_sync_slave_plugin_init(void *p) +{ + if (repl_semisync.initObject()) + return 1; + if (register_binlog_relay_io_observer(&relay_io_observer, p)) + return 1; + return 0; +} + +static int semi_sync_slave_plugin_deinit(void *p) +{ + if (unregister_binlog_relay_io_observer(&relay_io_observer, p)) + return 1; + return 0; +} + + +struct Mysql_replication semi_sync_slave_plugin= { + MYSQL_REPLICATION_INTERFACE_VERSION +}; + +/* + Plugin library descriptor +*/ +mysql_declare_plugin(semi_sync_slave) +{ + MYSQL_REPLICATION_PLUGIN, + &semi_sync_slave_plugin, + "rpl_semi_sync_slave", + "He Zhenxing", + "Semi-synchronous replication slave", + PLUGIN_LICENSE_GPL, + semi_sync_slave_plugin_init, /* Plugin Init */ + semi_sync_slave_plugin_deinit, /* Plugin Deinit */ + 0x0100 /* 1.0 */, + semi_sync_slave_status_vars, /* status variables */ + semi_sync_slave_system_vars, /* system variables */ + NULL /* config options */ +} +mysql_declare_plugin_end; diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index 6f162f4d84d..18508468f60 100755 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -75,6 +75,7 @@ SET (SQL_SOURCE rpl_rli.cc rpl_mi.cc sql_servers.cc sql_connect.cc scheduler.cc sql_profile.cc event_parse_data.cc + rpl_handler.cc ${PROJECT_SOURCE_DIR}/sql/sql_yacc.cc ${PROJECT_SOURCE_DIR}/sql/sql_yacc.h ${PROJECT_SOURCE_DIR}/include/mysqld_error.h diff --git a/sql/Makefile.am b/sql/Makefile.am index 2a12de2eaf6..afce7db59f2 100644 --- a/sql/Makefile.am +++ b/sql/Makefile.am @@ -76,7 +76,8 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \ sql_plugin.h authors.h event_parse_data.h \ event_data_objects.h event_scheduler.h \ sql_partition.h partition_info.h partition_element.h \ - contributors.h sql_servers.h + contributors.h sql_servers.h \ + rpl_handler.h replication.h mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \ item.cc item_sum.cc item_buff.cc item_func.cc \ @@ -120,7 +121,8 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \ event_queue.cc event_db_repository.cc events.cc \ sql_plugin.cc sql_binlog.cc \ sql_builtin.cc sql_tablespace.cc partition_info.cc \ - sql_servers.cc event_parse_data.cc + sql_servers.cc event_parse_data.cc \ + rpl_handler.cc nodist_mysqld_SOURCES = mini_client_errors.c pack.c client.c my_time.c my_user.c diff --git a/sql/handler.cc b/sql/handler.cc index e5c64452aaf..f17bb9f8036 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -24,6 +24,7 @@ #endif #include "mysql_priv.h" +#include "rpl_handler.h" #include "rpl_filter.h" #include <myisampack.h> #include <errno.h> @@ -221,6 +222,8 @@ handlerton *ha_checktype(THD *thd, enum legacy_db_type database_type, return NULL; } + RUN_HOOK(transaction, after_rollback, (thd, FALSE)); + switch (database_type) { #ifndef NO_HASH case DB_TYPE_HASH: @@ -1190,6 +1193,7 @@ int ha_commit_trans(THD *thd, bool all) if (cookie) tc_log->unlog(cookie, xid); DBUG_EXECUTE_IF("crash_commit_after", abort();); + RUN_HOOK(transaction, after_commit, (thd, FALSE)); end: if (rw_trans) start_waiting_global_read_lock(thd); @@ -1337,6 +1341,7 @@ int ha_rollback_trans(THD *thd, bool all) push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN, ER_WARNING_NOT_COMPLETE_ROLLBACK, ER(ER_WARNING_NOT_COMPLETE_ROLLBACK)); + RUN_HOOK(transaction, after_rollback, (thd, FALSE)); DBUG_RETURN(error); } @@ -1371,7 +1376,14 @@ int ha_autocommit_or_rollback(THD *thd, int error) thd->variables.tx_isolation=thd->session_tx_isolation; } + else #endif + { + if (!error) + RUN_HOOK(transaction, after_commit, (thd, FALSE)); + else + RUN_HOOK(transaction, after_rollback, (thd, FALSE)); + } DBUG_RETURN(error); } diff --git a/sql/log.cc b/sql/log.cc index 1af2f3a4ddc..042431fc008 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -38,6 +38,7 @@ #endif #include <mysql/plugin.h> +#include "rpl_handler.h" /* max size of the log message */ #define MAX_LOG_BUFFER_SIZE 1024 @@ -3683,9 +3684,11 @@ err: } -bool MYSQL_BIN_LOG::flush_and_sync() +bool MYSQL_BIN_LOG::flush_and_sync(bool *synced) { int err=0, fd=log_file.file; + if (synced) + *synced= 0; safe_mutex_assert_owner(&LOCK_log); if (flush_io_cache(&log_file)) return 1; @@ -3693,6 +3696,8 @@ bool MYSQL_BIN_LOG::flush_and_sync() { sync_binlog_counter= 0; err=my_sync(fd, MYF(MY_WME)); + if (synced) + *synced= 1; } return err; } @@ -3983,7 +3988,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, if (file == &log_file) { - error= flush_and_sync(); + error= flush_and_sync(0); if (!error) { signal_update(); @@ -4169,8 +4174,16 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) if (file == &log_file) // we are writing to the real log (disk) { - if (flush_and_sync()) + bool synced= 0; + if (flush_and_sync(&synced)) goto err; + + if (RUN_HOOK(binlog_storage, after_flush, + (thd, log_file_name, file->pos_in_file, synced))) { + sql_print_error("Failed to run 'after_flush' hooks"); + goto err; + } + signal_update(); rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); } @@ -4425,7 +4438,7 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache, bool lock_log, bool sync_log) DBUG_ASSERT(carry == 0); if (sync_log) - flush_and_sync(); + flush_and_sync(0); return 0; // All OK } @@ -4472,7 +4485,7 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd, bool lock) ev.write(&log_file); if (lock) { - if (!error && !(error= flush_and_sync())) + if (!error && !(error= flush_and_sync(0))) { signal_update(); rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); @@ -4560,7 +4573,8 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event, if (incident && write_incident(thd, FALSE)) goto err; - if (flush_and_sync()) + bool synced= 0; + if (flush_and_sync(&synced)) goto err; DBUG_EXECUTE_IF("half_binlogged_transaction", abort();); if (cache->error) // Error on read @@ -4569,6 +4583,15 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event, write_error=1; // Don't give more errors goto err; } + + if (RUN_HOOK(binlog_storage, after_flush, + (thd, log_file_name, log_file.pos_in_file, synced))) + { + sql_print_error("Failed to run 'after_flush' hooks"); + write_error=1; + goto err; + } + signal_update(); } diff --git a/sql/log.h b/sql/log.h index d306d6f7182..0550c921658 100644 --- a/sql/log.h +++ b/sql/log.h @@ -378,7 +378,21 @@ public: bool is_active(const char* log_file_name); int update_log_index(LOG_INFO* linfo, bool need_update_threads); void rotate_and_purge(uint flags); - bool flush_and_sync(); + + /** + Flush binlog cache and synchronize to disk. + + This function flushes events in binlog cache to binary log file, + it will do synchronizing according to the setting of system + variable 'sync_binlog'. If file is synchronized, @c synced will + be set to 1, otherwise 0. + + @param[out] synced if not NULL, set to 1 if file is synchronized, otherwise 0 + + @retval 0 Success + @retval other Failure + */ + bool flush_and_sync(bool *synced); int purge_logs(const char *to_log, bool included, bool need_mutex, bool need_update_threads, ulonglong *decrease_log_space); diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 7e9eb6e7291..2ae4ec9e9b6 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -31,6 +31,8 @@ #include "rpl_injector.h" +#include "rpl_handler.h" + #ifdef HAVE_SYS_PRCTL_H #include <sys/prctl.h> #endif @@ -1284,6 +1286,7 @@ void clean_up(bool print_message) ha_end(); if (tc_log) tc_log->close(); + delegates_destroy(); xid_cache_free(); delete_elements(&key_caches, (void (*)(const char*, uchar*)) free_key_cache); multi_keycache_free(); @@ -3760,6 +3763,13 @@ static int init_server_components() unireg_abort(1); } + /* initialize delegates for extension observers */ + if (delegates_init()) + { + sql_print_error("Initialize extension delegates failed"); + unireg_abort(1); + } + /* need to configure logging before initializing storage engines */ if (opt_update_log) { diff --git a/sql/replication.h b/sql/replication.h new file mode 100644 index 00000000000..6b7be58a5b1 --- /dev/null +++ b/sql/replication.h @@ -0,0 +1,490 @@ +/* Copyright (C) 2008 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef REPLICATION_H +#define REPLICATION_H + +#ifdef __cplusplus +extern "C" { +#endif + +/** + Transaction observer flags. +*/ +enum Trans_flags { + /** Transaction is a real transaction */ + TRANS_IS_REAL_TRANS = 1 +}; + +/** + Transaction observer parameter +*/ +typedef struct Trans_param { + uint32 server_id; + uint32 flags; + + /* + The latest binary log file name and position written by current + transaction, if binary log is disabled or no log event has been + written into binary log file by current transaction (events + written into transaction log cache are not counted), these two + member will be zero. + */ + const char *log_file; + my_off_t log_pos; +} Trans_param; + +/** + Observes and extends transaction execution +*/ +typedef struct Trans_observer { + uint32 len; + + /** + This callback is called after transaction commit + + This callback is called right after commit to storage engines for + transactional tables. + + For non-transactional tables, this is called at the end of the + statement, before sending statement status, if the statement + succeeded. + + @note The return value is currently ignored by the server. + + @param param The parameter for transaction observers + + @retval 0 Sucess + @retval 1 Failure + */ + int (*after_commit)(Trans_param *param); + + /** + This callback is called after transaction rollback + + This callback is called right after rollback to storage engines + for transactional tables. + + For non-transactional tables, this is called at the end of the + statement, before sending statement status, if the statement + failed. + + @note The return value is currently ignored by the server. + + @param param The parameter for transaction observers + + @retval 0 Sucess + @retval 1 Failure + */ + int (*after_rollback)(Trans_param *param); +} Trans_observer; + +/** + Binlog storage flags +*/ +enum Binlog_storage_flags { + /** Binary log was sync:ed */ + BINLOG_STORAGE_IS_SYNCED = 1 +}; + +/** + Binlog storage observer parameters + */ +typedef struct Binlog_storage_param { + uint32 server_id; +} Binlog_storage_param; + +/** + Observe binlog logging storage +*/ +typedef struct Binlog_storage_observer { + uint32 len; + + /** + This callback is called after binlog has been flushed + + This callback is called after cached events have been flushed to + binary log file. Whether the binary log file is synchronized to + disk is indicated by the bit BINLOG_STORAGE_IS_SYNCED in @a flags. + + @param param Observer common parameter + @param log_file Binlog file name been updated + @param log_pos Binlog position after update + @param flags flags for binlog storage + + @retval 0 Sucess + @retval 1 Failure + */ + int (*after_flush)(Binlog_storage_param *param, + const char *log_file, my_off_t log_pos, + uint32 flags); +} Binlog_storage_observer; + +/** + Replication binlog transmitter (binlog dump) observer parameter. +*/ +typedef struct Binlog_transmit_param { + uint32 server_id; + uint32 flags; +} Binlog_transmit_param; + +/** + Observe and extends the binlog dumping thread. +*/ +typedef struct Binlog_transmit_observer { + uint32 len; + + /** + This callback is called when binlog dumping starts + + + @param param Observer common parameter + @param log_file Binlog file name to transmit from + @param log_pos Binlog position to transmit from + + @retval 0 Sucess + @retval 1 Failure + */ + int (*transmit_start)(Binlog_transmit_param *param, + const char *log_file, my_off_t log_pos); + + /** + This callback is called when binlog dumping stops + + @param param Observer common parameter + + @retval 0 Sucess + @retval 1 Failure + */ + int (*transmit_stop)(Binlog_transmit_param *param); + + /** + This callback is called to reserve bytes in packet header for event transmission + + This callback is called when resetting transmit packet header to + reserve bytes for this observer in packet header. + + The @a header buffer is allocated by the server code, and @a size + is the size of the header buffer. Each observer can only reserve + a maximum size of @a size in the header. + + @param param Observer common parameter + @param header Pointer of the header buffer + @param size Size of the header buffer + @param len Header length reserved by this observer + + @retval 0 Sucess + @retval 1 Failure + */ + int (*reserve_header)(Binlog_transmit_param *param, + unsigned char *header, + unsigned long size, + unsigned long *len); + + /** + This callback is called before sending an event packet to slave + + @param param Observer common parameter + @param packet Binlog event packet to send + @param len Length of the event packet + @param log_file Binlog file name of the event packet to send + @param log_pos Binlog position of the event packet to send + + @retval 0 Sucess + @retval 1 Failure + */ + int (*before_send_event)(Binlog_transmit_param *param, + unsigned char *packet, unsigned long len, + const char *log_file, my_off_t log_pos ); + + /** + This callback is called after sending an event packet to slave + + @param param Observer common parameter + @param event_buf Binlog event packet buffer sent + @param len length of the event packet buffer + + @retval 0 Sucess + @retval 1 Failure + */ + int (*after_send_event)(Binlog_transmit_param *param, + const char *event_buf, unsigned long len); + + /** + This callback is called after resetting master status + + This is called when executing the command RESET MASTER, and is + used to reset status variables added by observers. + + @param param Observer common parameter + + @retval 0 Sucess + @retval 1 Failure + */ + int (*after_reset_master)(Binlog_transmit_param *param); +} Binlog_transmit_observer; + +/** + Binlog relay IO flags +*/ +enum Binlog_relay_IO_flags { + /** Binary relay log was sync:ed */ + BINLOG_RELAY_IS_SYNCED = 1 +}; + + +/** + Replication binlog relay IO observer parameter +*/ +typedef struct Binlog_relay_IO_param { + uint32 server_id; + + /* Master host, user and port */ + char *host; + char *user; + unsigned int port; + + char *master_log_name; + my_off_t master_log_pos; + + MYSQL *mysql; /* the connection to master */ +} Binlog_relay_IO_param; + +/** + Observes and extends the service of slave IO thread. +*/ +typedef struct Binlog_relay_IO_observer { + uint32 len; + + /** + This callback is called when slave IO thread starts + + @param param Observer common parameter + + @retval 0 Sucess + @retval 1 Failure + */ + int (*thread_start)(Binlog_relay_IO_param *param); + + /** + This callback is called when slave IO thread stops + + @param param Observer common parameter + + @retval 0 Sucess + @retval 1 Failure + */ + int (*thread_stop)(Binlog_relay_IO_param *param); + + /** + This callback is called before slave requesting binlog transmission from master + + This is called before slave issuing BINLOG_DUMP command to master + to request binlog. + + @param param Observer common parameter + @param flags binlog dump flags + + @retval 0 Sucess + @retval 1 Failure + */ + int (*before_request_transmit)(Binlog_relay_IO_param *param, uint32 flags); + + /** + This callback is called after read an event packet from master + + @param param Observer common parameter + @param packet The event packet read from master + @param len Length of the event packet read from master + @param event_buf The event packet return after process + @param event_len The length of event packet return after process + + @retval 0 Sucess + @retval 1 Failure + */ + int (*after_read_event)(Binlog_relay_IO_param *param, + const char *packet, unsigned long len, + const char **event_buf, unsigned long *event_len); + + /** + This callback is called after written an event packet to relay log + + @param param Observer common parameter + @param event_buf Event packet written to relay log + @param event_len Length of the event packet written to relay log + @param flags flags for relay log + + @retval 0 Sucess + @retval 1 Failure + */ + int (*after_queue_event)(Binlog_relay_IO_param *param, + const char *event_buf, unsigned long event_len, + uint32 flags); + + /** + This callback is called after reset slave relay log IO status + + @param param Observer common parameter + + @retval 0 Sucess + @retval 1 Failure + */ + int (*after_reset_slave)(Binlog_relay_IO_param *param); +} Binlog_relay_IO_observer; + + +/** + Register a transaction observer + + @param observer The transaction observer to register + @param p pointer to the internal plugin structure + + @retval 0 Sucess + @retval 1 Observer already exists +*/ +int register_trans_observer(Trans_observer *observer, void *p); + +/** + Unregister a transaction observer + + @param observer The transaction observer to unregister + @param p pointer to the internal plugin structure + + @retval 0 Sucess + @retval 1 Observer not exists +*/ +int unregister_trans_observer(Trans_observer *observer, void *p); + +/** + Register a binlog storage observer + + @param observer The binlog storage observer to register + @param p pointer to the internal plugin structure + + @retval 0 Sucess + @retval 1 Observer already exists +*/ +int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p); + +/** + Unregister a binlog storage observer + + @param observer The binlog storage observer to unregister + @param p pointer to the internal plugin structure + + @retval 0 Sucess + @retval 1 Observer not exists +*/ +int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p); + +/** + Register a binlog transmit observer + + @param observer The binlog transmit observer to register + @param p pointer to the internal plugin structure + + @retval 0 Sucess + @retval 1 Observer already exists +*/ +int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p); + +/** + Unregister a binlog transmit observer + + @param observer The binlog transmit observer to unregister + @param p pointer to the internal plugin structure + + @retval 0 Sucess + @retval 1 Observer not exists +*/ +int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p); + +/** + Register a binlog relay IO (slave IO thread) observer + + @param observer The binlog relay IO observer to register + @param p pointer to the internal plugin structure + + @retval 0 Sucess + @retval 1 Observer already exists +*/ +int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p); + +/** + Unregister a binlog relay IO (slave IO thread) observer + + @param observer The binlog relay IO observer to unregister + @param p pointer to the internal plugin structure + + @retval 0 Sucess + @retval 1 Observer not exists +*/ +int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p); + +/** + Connect to master + + This function can only used in the slave I/O thread context, and + will use the same master information to do the connection. + + @code + MYSQL *mysql = mysql_init(NULL); + if (rpl_connect_master(mysql)) + { + // do stuff with the connection + } + mysql_close(mysql); // close the connection + @endcode + + @param mysql address of MYSQL structure to use, pass NULL will + create a new one + + @return address of MYSQL structure on success, NULL on failure +*/ +MYSQL *rpl_connect_master(MYSQL *mysql); + +/** + Set thread entering a condition + + This function should be called before putting a thread to wait for + a condition. @a mutex should be held before calling this + function. After being waken up, @f thd_exit_cond should be called. + + @param thd The thread entering the condition, NULL means current thread + @param cond The condition the thread is going to wait for + @param mutex The mutex associated with the condition, this must be + held before call this function + @param msg The new process message for the thread +*/ +const char* thd_enter_cond(MYSQL_THD thd, pthread_cond_t *cond, + pthread_mutex_t *mutex, const char *msg); + +/** + Set thread leaving a condition + + This function should be called after a thread being waken up for a + condition. + + @param thd The thread entering the condition, NULL means current thread + @param old_msg The process message, ususally this should be the old process + message before calling @f thd_enter_cond +*/ +void thd_exit_cond(MYSQL_THD thd, const char *old_msg); + + +#ifdef __cplusplus +} +#endif +#endif /* REPLICATION_H */ diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc new file mode 100644 index 00000000000..aea838928b9 --- /dev/null +++ b/sql/rpl_handler.cc @@ -0,0 +1,493 @@ +/* Copyright (C) 2008 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "mysql_priv.h" + +#include "rpl_mi.h" +#include "sql_repl.h" +#include "log_event.h" +#include "rpl_filter.h" +#include <my_dir.h> +#include "rpl_handler.h" + +Trans_delegate *transaction_delegate; +Binlog_storage_delegate *binlog_storage_delegate; +#ifdef HAVE_REPLICATION +Binlog_transmit_delegate *binlog_transmit_delegate; +Binlog_relay_IO_delegate *binlog_relay_io_delegate; +#endif /* HAVE_REPLICATION */ + +/* + structure to save transaction log filename and position +*/ +typedef struct Trans_binlog_info { + my_off_t log_pos; + char log_file[FN_REFLEN]; +} Trans_binlog_info; + +static pthread_key(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); + +int get_user_var_int(const char *name, + long long int *value, int *null_value) +{ + my_bool null_val; + user_var_entry *entry= + (user_var_entry*) hash_search(¤t_thd->user_vars, + (uchar*) name, strlen(name)); + if (!entry) + return 1; + *value= entry->val_int(&null_val); + if (null_value) + *null_value= null_val; + return 0; +} + +int get_user_var_real(const char *name, + double *value, int *null_value) +{ + my_bool null_val; + user_var_entry *entry= + (user_var_entry*) hash_search(¤t_thd->user_vars, + (uchar*) name, strlen(name)); + if (!entry) + return 1; + *value= entry->val_real(&null_val); + if (null_value) + *null_value= null_val; + return 0; +} + +int get_user_var_str(const char *name, char *value, + size_t len, unsigned int precision, int *null_value) +{ + String str; + my_bool null_val; + user_var_entry *entry= + (user_var_entry*) hash_search(¤t_thd->user_vars, + (uchar*) name, strlen(name)); + if (!entry) + return 1; + entry->val_str(&null_val, &str, precision); + strncpy(value, str.c_ptr(), len); + if (null_value) + *null_value= null_val; + return 0; +} + +int delegates_init() +{ + static unsigned char trans_mem[sizeof(Trans_delegate)]; + static unsigned char storage_mem[sizeof(Binlog_storage_delegate)]; +#ifdef HAVE_REPLICATION + static unsigned char transmit_mem[sizeof(Binlog_transmit_delegate)]; + static unsigned char relay_io_mem[sizeof(Binlog_relay_IO_delegate)]; +#endif + + if (!(transaction_delegate= new (trans_mem) Trans_delegate) + || (!transaction_delegate->is_inited()) + || !(binlog_storage_delegate= new (storage_mem) Binlog_storage_delegate) + || (!binlog_storage_delegate->is_inited()) +#ifdef HAVE_REPLICATION + || !(binlog_transmit_delegate= new (transmit_mem) Binlog_transmit_delegate) + || (!binlog_transmit_delegate->is_inited()) + || !(binlog_relay_io_delegate= new (relay_io_mem) Binlog_relay_IO_delegate) + || (!binlog_relay_io_delegate->is_inited()) +#endif /* HAVE_REPLICATION */ + ) + return 1; + + if (pthread_key_create(&RPL_TRANS_BINLOG_INFO, NULL)) + return 1; + return 0; +} + +void delegates_destroy() +{ + if (transaction_delegate) + transaction_delegate->~Trans_delegate(); + if (binlog_storage_delegate) + binlog_storage_delegate->~Binlog_storage_delegate(); +#ifdef HAVE_REPLICATION + if (binlog_transmit_delegate) + binlog_transmit_delegate->~Binlog_transmit_delegate(); + if (binlog_relay_io_delegate) + binlog_relay_io_delegate->~Binlog_relay_IO_delegate(); +#endif /* HAVE_REPLICATION */ +} + +/* + This macro is used by almost all the Delegate methods to iterate + over all the observers running given callback function of the + delegate . + + Add observer plugins to the thd->lex list, after each statement, all + plugins add to thd->lex will be automatically unlocked. + */ +#define FOREACH_OBSERVER(r, f, thd, args) \ + param.server_id= thd->server_id; \ + read_lock(); \ + Observer_info_iterator iter= observer_info_iter(); \ + Observer_info *info= iter++; \ + for (; info; info= iter++) \ + { \ + plugin_ref plugin= \ + my_plugin_lock(thd, &info->plugin); \ + if (!plugin) \ + { \ + r= 1; \ + break; \ + } \ + if (((Observer *)info->observer)->f \ + && ((Observer *)info->observer)->f args) \ + { \ + r= 1; \ + plugin_unlock(thd, plugin); \ + break; \ + } \ + plugin_unlock(thd, plugin); \ + } \ + unlock() + + +int Trans_delegate::after_commit(THD *thd, bool all) +{ + Trans_param param; + bool is_real_trans= (all || thd->transaction.all.ha_list == 0); + if (is_real_trans) + param.flags |= TRANS_IS_REAL_TRANS; + + Trans_binlog_info *log_info= + my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); + + param.log_file= log_info ? log_info->log_file : 0; + param.log_pos= log_info ? log_info->log_pos : 0; + + int ret= 0; + FOREACH_OBSERVER(ret, after_commit, thd, (¶m)); + + /* + This is the end of a real transaction or autocommit statement, we + can free the memory allocated for binlog file and position. + */ + if (is_real_trans && log_info) + { + my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL); + my_free(log_info, MYF(0)); + } + return ret; +} + +int Trans_delegate::after_rollback(THD *thd, bool all) +{ + Trans_param param; + bool is_real_trans= (all || thd->transaction.all.ha_list == 0); + if (is_real_trans) + param.flags |= TRANS_IS_REAL_TRANS; + + Trans_binlog_info *log_info= + my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); + + param.log_file= log_info ? log_info->log_file : 0; + param.log_pos= log_info ? log_info->log_pos : 0; + + int ret= 0; + FOREACH_OBSERVER(ret, after_commit, thd, (¶m)); + + /* + This is the end of a real transaction or autocommit statement, we + can free the memory allocated for binlog file and position. + */ + if (is_real_trans && log_info) + { + my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL); + my_free(log_info, MYF(0)); + } + return ret; +} + +int Binlog_storage_delegate::after_flush(THD *thd, + const char *log_file, + my_off_t log_pos, + bool synced) +{ + Binlog_storage_param param; + uint32 flags=0; + if (synced) + flags |= BINLOG_STORAGE_IS_SYNCED; + + Trans_binlog_info *log_info= + my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); + + if (!log_info) + { + if(!(log_info= + (Trans_binlog_info *)my_malloc(sizeof(Trans_binlog_info), MYF(0)))) + return 1; + my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, log_info); + } + + strcpy(log_info->log_file, log_file+dirname_length(log_file)); + log_info->log_pos = log_pos; + + int ret= 0; + FOREACH_OBSERVER(ret, after_flush, thd, + (¶m, log_info->log_file, log_info->log_pos, flags)); + return ret; +} + +#ifdef HAVE_REPLICATION +int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags, + const char *log_file, + my_off_t log_pos) +{ + Binlog_transmit_param param; + param.flags= flags; + + int ret= 0; + FOREACH_OBSERVER(ret, transmit_start, thd, (¶m, log_file, log_pos)); + return ret; +} + +int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags) +{ + Binlog_transmit_param param; + param.flags= flags; + + int ret= 0; + FOREACH_OBSERVER(ret, transmit_stop, thd, (¶m)); + return ret; +} + +int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags, + String *packet) +{ + /* NOTE2ME: Maximum extra header size for each observer, I hope 32 + bytes should be enough for each Observer to reserve their extra + header. If later found this is not enough, we can increase this + /HEZX + */ +#define RESERVE_HEADER_SIZE 32 + unsigned char header[RESERVE_HEADER_SIZE]; + ulong hlen; + Binlog_transmit_param param; + param.flags= flags; + param.server_id= thd->server_id; + + int ret= 0; + read_lock(); + Observer_info_iterator iter= observer_info_iter(); + Observer_info *info= iter++; + for (; info; info= iter++) + { + plugin_ref plugin= + my_plugin_lock(thd, &info->plugin); + if (!plugin) + { + ret= 1; + break; + } + hlen= 0; + if (((Observer *)info->observer)->reserve_header + && ((Observer *)info->observer)->reserve_header(¶m, + header, + RESERVE_HEADER_SIZE, + &hlen)) + { + ret= 1; + plugin_unlock(thd, plugin); + break; + } + plugin_unlock(thd, plugin); + if (hlen == 0) + continue; + if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen)) + { + ret= 1; + break; + } + } + unlock(); + return ret; +} + +int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags, + String *packet, + const char *log_file, + my_off_t log_pos) +{ + Binlog_transmit_param param; + param.flags= flags; + + int ret= 0; + FOREACH_OBSERVER(ret, before_send_event, thd, + (¶m, (uchar *)packet->c_ptr(), + packet->length(), + log_file+dirname_length(log_file), log_pos)); + return ret; +} + +int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags, + String *packet) +{ + Binlog_transmit_param param; + param.flags= flags; + + int ret= 0; + FOREACH_OBSERVER(ret, after_send_event, thd, + (¶m, packet->c_ptr(), packet->length())); + return ret; +} + +int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags) + +{ + Binlog_transmit_param param; + param.flags= flags; + + int ret= 0; + FOREACH_OBSERVER(ret, after_reset_master, thd, (¶m)); + return ret; +} + +void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param, + Master_info *mi) +{ + param->mysql= mi->mysql; + param->user= mi->user; + param->host= mi->host; + param->port= mi->port; + param->master_log_name= mi->master_log_name; + param->master_log_pos= mi->master_log_pos; +} + +int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi) +{ + Binlog_relay_IO_param param; + init_param(¶m, mi); + + int ret= 0; + FOREACH_OBSERVER(ret, thread_start, thd, (¶m)); + return ret; +} + + +int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi) +{ + + Binlog_relay_IO_param param; + init_param(¶m, mi); + + int ret= 0; + FOREACH_OBSERVER(ret, thread_stop, thd, (¶m)); + return ret; +} + +int Binlog_relay_IO_delegate::before_request_transmit(THD *thd, + Master_info *mi, + ushort flags) +{ + Binlog_relay_IO_param param; + init_param(¶m, mi); + + int ret= 0; + FOREACH_OBSERVER(ret, before_request_transmit, thd, (¶m, (uint32)flags)); + return ret; +} + +int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi, + const char *packet, ulong len, + const char **event_buf, + ulong *event_len) +{ + Binlog_relay_IO_param param; + init_param(¶m, mi); + + int ret= 0; + FOREACH_OBSERVER(ret, after_read_event, thd, + (¶m, packet, len, event_buf, event_len)); + return ret; +} + +int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi, + const char *event_buf, + ulong event_len, + bool synced) +{ + Binlog_relay_IO_param param; + init_param(¶m, mi); + + uint32 flags=0; + if (synced) + flags |= BINLOG_STORAGE_IS_SYNCED; + + int ret= 0; + FOREACH_OBSERVER(ret, after_queue_event, thd, + (¶m, event_buf, event_len, flags)); + return ret; +} + +int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi) + +{ + Binlog_relay_IO_param param; + init_param(¶m, mi); + + int ret= 0; + FOREACH_OBSERVER(ret, after_reset_slave, thd, (¶m)); + return ret; +} +#endif /* HAVE_REPLICATION */ + +int register_trans_observer(Trans_observer *observer, void *p) +{ + return transaction_delegate->add_observer(observer, (st_plugin_int *)p); +} + +int unregister_trans_observer(Trans_observer *observer, void *p) +{ + return transaction_delegate->remove_observer(observer, (st_plugin_int *)p); +} + +int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p) +{ + return binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p); +} + +int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p) +{ + return binlog_storage_delegate->remove_observer(observer, (st_plugin_int *)p); +} + +#ifdef HAVE_REPLICATION +int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p) +{ + return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p); +} + +int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p) +{ + return binlog_transmit_delegate->remove_observer(observer, (st_plugin_int *)p); +} + +int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p) +{ + return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p); +} + +int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p) +{ + return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p); +} +#endif /* HAVE_REPLICATION */ diff --git a/sql/rpl_handler.h b/sql/rpl_handler.h new file mode 100644 index 00000000000..4fb7b4e035b --- /dev/null +++ b/sql/rpl_handler.h @@ -0,0 +1,213 @@ +/* Copyright (C) 2008 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef RPL_HANDLER_H +#define RPL_HANDLER_H + +#include "mysql_priv.h" +#include "rpl_mi.h" +#include "rpl_rli.h" +#include "sql_plugin.h" +#include "replication.h" + +class Observer_info { +public: + void *observer; + st_plugin_int *plugin_int; + plugin_ref plugin; + + Observer_info(void *ob, st_plugin_int *p) + :observer(ob), plugin_int(p) + { + plugin= plugin_int_to_ref(plugin_int); + } +}; + +class Delegate { +public: + typedef List<Observer_info> Observer_info_list; + typedef List_iterator<Observer_info> Observer_info_iterator; + + int add_observer(void *observer, st_plugin_int *plugin) + { + int ret= FALSE; + if (!inited) + return TRUE; + write_lock(); + Observer_info_iterator iter(observer_info_list); + Observer_info *info= iter++; + while (info && info->observer != observer) + info= iter++; + if (!info) + { + info= new Observer_info(observer, plugin); + if (!info || observer_info_list.push_back(info, &memroot)) + ret= TRUE; + } + else + ret= TRUE; + unlock(); + return ret; + } + + int remove_observer(void *observer, st_plugin_int *plugin) + { + int ret= FALSE; + if (!inited) + return TRUE; + write_lock(); + Observer_info_iterator iter(observer_info_list); + Observer_info *info= iter++; + while (info && info->observer != observer) + info= iter++; + if (info) + iter.remove(); + else + ret= TRUE; + unlock(); + return ret; + } + + inline Observer_info_iterator observer_info_iter() + { + return Observer_info_iterator(observer_info_list); + } + + inline bool is_empty() + { + return observer_info_list.is_empty(); + } + + inline int read_lock() + { + if (!inited) + return TRUE; + return rw_rdlock(&lock); + } + + inline int write_lock() + { + if (!inited) + return TRUE; + return rw_wrlock(&lock); + } + + inline int unlock() + { + if (!inited) + return TRUE; + return rw_unlock(&lock); + } + + inline bool is_inited() + { + return inited; + } + + Delegate() + { + inited= FALSE; + if (my_rwlock_init(&lock, NULL)) + return; + init_sql_alloc(&memroot, 1024, 0); + inited= TRUE; + } + ~Delegate() + { + inited= FALSE; + rwlock_destroy(&lock); + free_root(&memroot, MYF(0)); + } + +private: + Observer_info_list observer_info_list; + rw_lock_t lock; + MEM_ROOT memroot; + bool inited; +}; + +class Trans_delegate + :public Delegate { +public: + typedef Trans_observer Observer; + int before_commit(THD *thd, bool all); + int before_rollback(THD *thd, bool all); + int after_commit(THD *thd, bool all); + int after_rollback(THD *thd, bool all); +}; + +class Binlog_storage_delegate + :public Delegate { +public: + typedef Binlog_storage_observer Observer; + int after_flush(THD *thd, const char *log_file, + my_off_t log_pos, bool synced); +}; + +#ifdef HAVE_REPLICATION +class Binlog_transmit_delegate + :public Delegate { +public: + typedef Binlog_transmit_observer Observer; + int transmit_start(THD *thd, ushort flags, + const char *log_file, my_off_t log_pos); + int transmit_stop(THD *thd, ushort flags); + int reserve_header(THD *thd, ushort flags, String *packet); + int before_send_event(THD *thd, ushort flags, + String *packet, const + char *log_file, my_off_t log_pos ); + int after_send_event(THD *thd, ushort flags, + String *packet); + int after_reset_master(THD *thd, ushort flags); +}; + +class Binlog_relay_IO_delegate + :public Delegate { +public: + typedef Binlog_relay_IO_observer Observer; + int thread_start(THD *thd, Master_info *mi); + int thread_stop(THD *thd, Master_info *mi); + int before_request_transmit(THD *thd, Master_info *mi, ushort flags); + int after_read_event(THD *thd, Master_info *mi, + const char *packet, ulong len, + const char **event_buf, ulong *event_len); + int after_queue_event(THD *thd, Master_info *mi, + const char *event_buf, ulong event_len, + bool synced); + int after_reset_slave(THD *thd, Master_info *mi); +private: + void init_param(Binlog_relay_IO_param *param, Master_info *mi); +}; +#endif /* HAVE_REPLICATION */ + +int delegates_init(); +void delegates_destroy(); + +extern Trans_delegate *transaction_delegate; +extern Binlog_storage_delegate *binlog_storage_delegate; +#ifdef HAVE_REPLICATION +extern Binlog_transmit_delegate *binlog_transmit_delegate; +extern Binlog_relay_IO_delegate *binlog_relay_io_delegate; +#endif /* HAVE_REPLICATION */ + +/* + if there is no observers in the delegate, we can return 0 + immediately. +*/ +#define RUN_HOOK(group, hook, args) \ + (group ##_delegate->is_empty() ? \ + 0 : group ##_delegate->hook args) + +#endif /* RPL_HANDLER_H */ diff --git a/sql/slave.cc b/sql/slave.cc index fac9ee214c5..4988886dce4 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -40,6 +40,7 @@ #include <errmsg.h> #include <mysqld_error.h> #include <mysys_err.h> +#include "rpl_handler.h" #ifdef HAVE_REPLICATION @@ -69,6 +70,8 @@ ulonglong relay_log_space_limit = 0; int disconnect_slave_event_count = 0, abort_slave_event_count = 0; int events_till_abort = -1; +static pthread_key(Master_info*, RPL_MASTER_INFO); + enum enum_slave_reconnect_actions { SLAVE_RECON_ACT_REG= 0, @@ -231,6 +234,10 @@ int init_slave() TODO: re-write this to interate through the list of files for multi-master */ + + if (pthread_key_create(&RPL_MASTER_INFO, NULL)) + goto err; + active_mi= new Master_info; /* @@ -1868,17 +1875,22 @@ static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed, } -static int request_dump(MYSQL* mysql, Master_info* mi, - bool *suppress_warnings) +static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi, + bool *suppress_warnings) { uchar buf[FN_REFLEN + 10]; int len; - int binlog_flags = 0; // for now + ushort binlog_flags = 0; // for now char* logname = mi->master_log_name; DBUG_ENTER("request_dump"); *suppress_warnings= FALSE; + if (RUN_HOOK(binlog_relay_io, + before_request_transmit, + (thd, mi, binlog_flags))) + DBUG_RETURN(1); + // TODO if big log files: Change next to int8store() int4store(buf, (ulong) mi->master_log_pos); int2store(buf + 4, binlog_flags); @@ -2532,6 +2544,16 @@ pthread_handler_t handle_slave_io(void *arg) mi->master_log_name, llstr(mi->master_log_pos,llbuff))); + /* This must be called before run any binlog_relay_io hooks */ + my_pthread_setspecific_ptr(RPL_MASTER_INFO, mi); + + if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi))) + { + mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, + ER(ER_SLAVE_FATAL_ERROR), "Failed to run 'thread_start' hook"); + goto err; + } + if (!(mi->mysql = mysql = mysql_init(NULL))) { mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, @@ -2621,7 +2643,7 @@ connected: while (!io_slave_killed(thd,mi)) { thd_proc_info(thd, "Requesting binlog dump"); - if (request_dump(mysql, mi, &suppress_warnings)) + if (request_dump(thd, mysql, mi, &suppress_warnings)) { sql_print_error("Failed on request_dump()"); if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \ @@ -2641,6 +2663,7 @@ requesting master dump") || goto err; goto connected; }); + const char *event_buf; DBUG_ASSERT(mi->last_error().number == 0); while (!io_slave_killed(thd,mi)) @@ -2697,14 +2720,37 @@ Stopping slave I/O thread due to out-of-memory error from master"); retry_count=0; // ok event, reset retry counter thd_proc_info(thd, "Queueing master event to the relay log"); - if (queue_event(mi,(const char*)mysql->net.read_pos + 1, - event_len)) + event_buf= (const char*)mysql->net.read_pos + 1; + if (RUN_HOOK(binlog_relay_io, after_read_event, + (thd, mi,(const char*)mysql->net.read_pos + 1, + event_len, &event_buf, &event_len))) + { + mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, + ER(ER_SLAVE_FATAL_ERROR), + "Failed to run 'after_read_event' hook"); + goto err; + } + + /* XXX: 'synced' should be updated by queue_event to indicate + whether event has been synced to disk */ + bool synced= 0; + if (queue_event(mi, event_buf, event_len)) { mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE), "could not queue event from master"); goto err; } + + if (RUN_HOOK(binlog_relay_io, after_queue_event, + (thd, mi, event_buf, event_len, synced))) + { + mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, + ER(ER_SLAVE_FATAL_ERROR), + "Failed to run 'after_queue_event' hook"); + goto err; + } + if (flush_master_info(mi, 1)) { sql_print_error("Failed to flush master info file"); @@ -2750,6 +2796,7 @@ err: // print the current replication position sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s", IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff)); + RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi)); thd->set_query(NULL, 0); thd->reset_db(NULL, 0); if (mysql) @@ -3906,6 +3953,71 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi, } +MYSQL *rpl_connect_master(MYSQL *mysql) +{ + THD *thd= current_thd; + Master_info *mi= my_pthread_getspecific_ptr(Master_info*, RPL_MASTER_INFO); + if (!mi) + { + sql_print_error("'rpl_connect_master' must be called in slave I/O thread context."); + return NULL; + } + + bool allocated= false; + + if (!mysql) + { + if(!(mysql= mysql_init(NULL))) + { + sql_print_error("rpl_connect_master: failed in mysql_init()"); + return NULL; + } + allocated= true; + } + + /* + XXX: copied from connect_to_master, this function should not + change the slave status, so we cannot use connect_to_master + directly + + TODO: make this part a seperate function to eliminate duplication + */ + mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout); + mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout); + +#ifdef HAVE_OPENSSL + if (mi->ssl) + { + mysql_ssl_set(mysql, + mi->ssl_key[0]?mi->ssl_key:0, + mi->ssl_cert[0]?mi->ssl_cert:0, + mi->ssl_ca[0]?mi->ssl_ca:0, + mi->ssl_capath[0]?mi->ssl_capath:0, + mi->ssl_cipher[0]?mi->ssl_cipher:0); + mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT, + &mi->ssl_verify_server_cert); + } +#endif + + mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname); + /* This one is not strictly needed but we have it here for completeness */ + mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir); + + if (io_slave_killed(thd, mi) + || !mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0, + mi->port, 0, 0)) + { + if (!io_slave_killed(thd, mi)) + sql_print_error("rpl_connect_master: error connecting to master: %s (server_error: %d)", + mysql_error(mysql), mysql_errno(mysql)); + + if (allocated) + mysql_close(mysql); // this will free the object + return NULL; + } + return mysql; +} + /* Store the file and position where the execute-slave thread are in the relay log. diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 3f568566c89..755e60b4fc2 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -278,6 +278,42 @@ const char *set_thd_proc_info(THD *thd, const char *info, } extern "C" +const char* thd_enter_cond(MYSQL_THD thd, pthread_cond_t *cond, + pthread_mutex_t *mutex, const char *msg) +{ + if (!thd) + thd= current_thd; + + const char* old_msg = thd->proc_info; + safe_mutex_assert_owner(mutex); + thd->mysys_var->current_mutex = mutex; + thd->mysys_var->current_cond = cond; + thd->proc_info = msg; + return old_msg; +} + +extern "C" +void thd_exit_cond(MYSQL_THD thd, const char *old_msg) +{ + if (!thd) + thd= current_thd; + + /* + Putting the mutex unlock in thd_exit_cond() ensures that + mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is + locked (if that would not be the case, you'll get a deadlock if someone + does a THD::awake() on you). + */ + pthread_mutex_unlock(thd->mysys_var->current_mutex); + pthread_mutex_lock(&thd->mysys_var->mutex); + thd->mysys_var->current_mutex = 0; + thd->mysys_var->current_cond = 0; + thd->proc_info = old_msg; + pthread_mutex_unlock(&thd->mysys_var->mutex); + return; +} + +extern "C" void **thd_ha_data(const THD *thd, const struct handlerton *hton) { return (void **) &thd->ha_data[hton->slot].ha_ptr; diff --git a/sql/sql_class.h b/sql/sql_class.h index f52d5fae76f..3d7ff0ca0a1 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -22,6 +22,7 @@ #include "log.h" #include "rpl_tblmap.h" +#include "replication.h" /** An interface that is used to take an action when @@ -1940,27 +1941,11 @@ public: inline const char* enter_cond(pthread_cond_t *cond, pthread_mutex_t* mutex, const char* msg) { - const char* old_msg = proc_info; - safe_mutex_assert_owner(mutex); - mysys_var->current_mutex = mutex; - mysys_var->current_cond = cond; - proc_info = msg; - return old_msg; + return thd_enter_cond(this, cond, mutex, msg); } inline void exit_cond(const char* old_msg) { - /* - Putting the mutex unlock in exit_cond() ensures that - mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is - locked (if that would not be the case, you'll get a deadlock if someone - does a THD::awake() on you). - */ - pthread_mutex_unlock(mysys_var->current_mutex); - pthread_mutex_lock(&mysys_var->mutex); - mysys_var->current_mutex = 0; - mysys_var->current_cond = 0; - proc_info = old_msg; - pthread_mutex_unlock(&mysys_var->mutex); + thd_exit_cond(this, old_msg); } inline time_t query_start() { query_start_used=1; return start_time; } inline void set_time() diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index ca27d476213..6019c385fb8 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -21,6 +21,7 @@ #include <m_ctype.h> #include <myisam.h> #include <my_dir.h> +#include "rpl_handler.h" #include "sp_head.h" #include "sp.h" diff --git a/sql/sql_plugin.cc b/sql/sql_plugin.cc index da168d36429..8cf8c4cb81f 100644 --- a/sql/sql_plugin.cc +++ b/sql/sql_plugin.cc @@ -19,14 +19,6 @@ #define REPORT_TO_LOG 1 #define REPORT_TO_USER 2 -#ifdef DBUG_OFF -#define plugin_ref_to_int(A) A -#define plugin_int_to_ref(A) A -#else -#define plugin_ref_to_int(A) (A ? A[0] : NULL) -#define plugin_int_to_ref(A) &(A) -#endif - extern struct st_mysql_plugin *mysqld_builtins[]; /** @@ -54,7 +46,8 @@ const LEX_STRING plugin_type_names[MYSQL_MAX_PLUGIN_TYPE_NUM]= { C_STRING_WITH_LEN("STORAGE ENGINE") }, { C_STRING_WITH_LEN("FTPARSER") }, { C_STRING_WITH_LEN("DAEMON") }, - { C_STRING_WITH_LEN("INFORMATION SCHEMA") } + { C_STRING_WITH_LEN("INFORMATION SCHEMA") }, + { C_STRING_WITH_LEN("REPLICATION") }, }; extern int initialize_schema_table(st_plugin_int *plugin); @@ -93,7 +86,8 @@ static int min_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]= MYSQL_HANDLERTON_INTERFACE_VERSION, MYSQL_FTPARSER_INTERFACE_VERSION, MYSQL_DAEMON_INTERFACE_VERSION, - MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION + MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION, + MYSQL_REPLICATION_INTERFACE_VERSION, }; static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]= { @@ -101,7 +95,8 @@ static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]= MYSQL_HANDLERTON_INTERFACE_VERSION, MYSQL_FTPARSER_INTERFACE_VERSION, MYSQL_DAEMON_INTERFACE_VERSION, - MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION + MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION, + MYSQL_REPLICATION_INTERFACE_VERSION, }; static bool initialized= 0; diff --git a/sql/sql_plugin.h b/sql/sql_plugin.h index 004d0d5abb7..c6ad846943c 100644 --- a/sql/sql_plugin.h +++ b/sql/sql_plugin.h @@ -18,6 +18,14 @@ class sys_var; +#ifdef DBUG_OFF +#define plugin_ref_to_int(A) A +#define plugin_int_to_ref(A) A +#else +#define plugin_ref_to_int(A) (A ? A[0] : NULL) +#define plugin_int_to_ref(A) &(A) +#endif + /* the following flags are valid for plugin_init() */ diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 0ec8d91214c..671f6785640 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -21,6 +21,7 @@ #include "log_event.h" #include "rpl_filter.h" #include <my_dir.h> +#include "rpl_handler.h" int max_binlog_dump_events = 0; // unlimited my_bool opt_sporadic_binlog_dump_fail = 0; @@ -80,6 +81,32 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name, DBUG_RETURN(0); } +/* + Reset thread transmit packet buffer for event sending + + This function allocates header bytes for event transmission, and + should be called before store the event data to the packet buffer. +*/ +static int reset_transmit_packet(THD *thd, ushort flags, + ulong *ev_offset, const char **errmsg) +{ + int ret= 0; + String *packet= &thd->packet; + + /* reserve and set default header */ + packet->length(0); + packet->set("\0", 1, &my_charset_bin); + + if (RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, packet))) + { + *errmsg= "Failed to run hook 'reserve_header'"; + my_errno= ER_UNKNOWN_ERROR; + ret= 1; + } + *ev_offset= packet->length(); + return ret; +} + static int send_file(THD *thd) { NET* net = &thd->net; @@ -346,6 +373,9 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, LOG_INFO linfo; char *log_file_name = linfo.log_file_name; char search_file_name[FN_REFLEN], *name; + + ulong ev_offset; + IO_CACHE log; File file = -1; String* packet = &thd->packet; @@ -361,6 +391,14 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos)); bzero((char*) &log,sizeof(log)); + sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)", + thd->server_id, log_ident, (ulong)pos); + if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos))) + { + errmsg= "Failed to run hook 'transmit_start'"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } #ifndef DBUG_OFF if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2)) @@ -416,11 +454,9 @@ impossible position"; goto err; } - /* - We need to start a packet with something other than 255 - to distinguish it from error - */ - packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */ + /* reset transmit packet for the fake rotate event below */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; /* Tell the client about the log name with a fake Rotate event; @@ -460,7 +496,7 @@ impossible position"; my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; goto err; } - packet->set("\0", 1, &my_charset_bin); + /* Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become this larger than the corresponding packet (query) sent @@ -476,6 +512,11 @@ impossible position"; log_lock = mysql_bin_log.get_log_lock(); if (pos > BIN_LOG_HEADER_SIZE) { + /* reset transmit packet for the event read from binary log + file */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; + /* Try to find a Format_description_log_event at the beginning of the binlog @@ -483,29 +524,30 @@ impossible position"; if (!(error = Log_event::read_log_event(&log, packet, log_lock))) { /* - The packet has offsets equal to the normal offsets in a binlog - event +1 (the first character is \0). + The packet has offsets equal to the normal offsets in a + binlog event + ev_offset (the first ev_offset characters are + the header (default \0)). */ DBUG_PRINT("info", ("Looked for a Format_description_log_event, found event type %d", - (*packet)[EVENT_TYPE_OFFSET+1])); - if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT) + (*packet)[EVENT_TYPE_OFFSET+ev_offset])); + if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT) { - binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] & + binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] & LOG_EVENT_BINLOG_IN_USE_F); - (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F; + (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; /* mark that this event with "log_pos=0", so the slave should not increment master's binlog position (rli->group_master_log_pos) */ - int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0); + int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0); /* if reconnect master sends FD event with `created' as 0 to avoid destroying temp tables. */ int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+ - ST_CREATED_OFFSET+1, (ulong) 0); + ST_CREATED_OFFSET+ev_offset, (ulong) 0); /* send it */ if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) { @@ -531,8 +573,6 @@ impossible position"; Format_description_log_event will be found naturally if it is written. */ } - /* reset the packet as we wrote to it in any case */ - packet->set("\0", 1, &my_charset_bin); } /* end of if (pos > BIN_LOG_HEADER_SIZE); */ else { @@ -544,6 +584,12 @@ impossible position"; while (!net->error && net->vio != 0 && !thd->killed) { + Log_event_type event_type= UNKNOWN_EVENT; + + /* reset the transmit packet for the event read from binary log + file */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; while (!(error = Log_event::read_log_event(&log, packet, log_lock))) { #ifndef DBUG_OFF @@ -556,15 +602,25 @@ impossible position"; } #endif - if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT) + event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]); + if (event_type == FORMAT_DESCRIPTION_EVENT) { - binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] & + binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] & LOG_EVENT_BINLOG_IN_USE_F); - (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F; + (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; } - else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT) + else if (event_type == STOP_EVENT) binlog_can_be_corrupted= FALSE; + pos = my_b_tell(&log); + if (RUN_HOOK(binlog_transmit, before_send_event, + (thd, flags, packet, log_file_name, pos))) + { + my_errno= ER_UNKNOWN_ERROR; + errmsg= "run 'before_send_event' hook failed"; + goto err; + } + if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) { errmsg = "Failed on my_net_write()"; @@ -572,9 +628,8 @@ impossible position"; goto err; } - DBUG_PRINT("info", ("log event code %d", - (*packet)[LOG_EVENT_OFFSET+1] )); - if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) + DBUG_PRINT("info", ("log event code %d", event_type)); + if (event_type == LOAD_EVENT) { if (send_file(thd)) { @@ -583,7 +638,17 @@ impossible position"; goto err; } } - packet->set("\0", 1, &my_charset_bin); + + if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet))) + { + errmsg= "Failed to run hook 'after_send_event'"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + + /* reset transmit packet for next loop */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; } /* @@ -634,6 +699,11 @@ impossible position"; } #endif + /* reset the transmit packet for the event read from binary log + file */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; + /* No one will update the log while we are reading now, but we'll be quick and just read one record @@ -650,6 +720,7 @@ impossible position"; /* we read successfully, so we'll need to send it to the slave */ pthread_mutex_unlock(log_lock); read_packet = 1; + event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]); break; case LOG_READ_EOF: @@ -676,8 +747,17 @@ impossible position"; } if (read_packet) - { - thd_proc_info(thd, "Sending binlog event to slave"); + { + thd_proc_info(thd, "Sending binlog event to slave"); + pos = my_b_tell(&log); + if (RUN_HOOK(binlog_transmit, before_send_event, + (thd, flags, packet, log_file_name, pos))) + { + my_errno= ER_UNKNOWN_ERROR; + errmsg= "run 'before_send_event' hook failed"; + goto err; + } + if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ) { errmsg = "Failed on my_net_write()"; @@ -685,7 +765,7 @@ impossible position"; goto err; } - if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) + if (event_type == LOAD_EVENT) { if (send_file(thd)) { @@ -694,11 +774,13 @@ impossible position"; goto err; } } - packet->set("\0", 1, &my_charset_bin); - /* - No need to net_flush because we will get to flush later when - we hit EOF pretty quick - */ + + if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet))) + { + my_errno= ER_UNKNOWN_ERROR; + errmsg= "Failed to run hook 'after_send_event'"; + goto err; + } } if (fatal_error) @@ -734,6 +816,10 @@ impossible position"; end_io_cache(&log); (void) my_close(file, MYF(MY_WME)); + /* reset transmit packet for the possible fake rotate event */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; + /* Call fake_rotate_event() in case the previous log (the one which we have just finished reading) did not contain a Rotate event @@ -750,9 +836,6 @@ impossible position"; my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; goto err; } - - packet->length(0); - packet->append('\0'); } } @@ -760,6 +843,7 @@ end: end_io_cache(&log); (void)my_close(file, MYF(MY_WME)); + RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags)); my_eof(thd); thd_proc_info(thd, "Waiting to finalize termination"); pthread_mutex_lock(&LOCK_thread_count); @@ -770,6 +854,7 @@ end: err: thd_proc_info(thd, "Waiting to finalize termination"); end_io_cache(&log); + RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags)); /* Exclude iteration through thread list this is needed for purge_logs() - it will iterate through @@ -1064,6 +1149,7 @@ int reset_slave(THD *thd, Master_info* mi) goto err; } + RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi)); err: unlock_slave_threads(mi); if (error) @@ -1363,7 +1449,11 @@ int reset_master(THD* thd) ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG)); return 1; } - return mysql_bin_log.reset_logs(thd); + + if (mysql_bin_log.reset_logs(thd)) + return 1; + RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */)); + return 0; } int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1, @@ -1836,5 +1926,3 @@ int init_replication_sys_vars() } #endif /* HAVE_REPLICATION */ - - |