summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorHe Zhenxing <zhenxing.he@sun.com>2009-09-26 12:49:49 +0800
committerHe Zhenxing <zhenxing.he@sun.com>2009-09-26 12:49:49 +0800
commit623ed58cfda0aef6b6bf545a4200357a58a8a4cc (patch)
tree28e6a4c77de3c3073b4dbe0b0e09e019adeaa556 /sql
parente465d113832aeac61a36902c7976d455e1525234 (diff)
downloadmariadb-git-623ed58cfda0aef6b6bf545a4200357a58a8a4cc.tar.gz
Backporting WL#4398 WL#1720
Backporting BUG#44058 BUG#42244 BUG#45672 BUG#45673 Backporting BUG#45819 BUG#45973 BUG#39012
Diffstat (limited to 'sql')
-rwxr-xr-xsql/CMakeLists.txt1
-rw-r--r--sql/Makefile.am6
-rw-r--r--sql/handler.cc12
-rw-r--r--sql/log.cc35
-rw-r--r--sql/log.h16
-rw-r--r--sql/mysqld.cc10
-rw-r--r--sql/replication.h490
-rw-r--r--sql/rpl_handler.cc493
-rw-r--r--sql/rpl_handler.h213
-rw-r--r--sql/slave.cc124
-rw-r--r--sql/sql_class.cc36
-rw-r--r--sql/sql_class.h21
-rw-r--r--sql/sql_parse.cc1
-rw-r--r--sql/sql_plugin.cc17
-rw-r--r--sql/sql_plugin.h8
-rw-r--r--sql/sql_repl.cc164
16 files changed, 1565 insertions, 82 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt
index 6f162f4d84d..18508468f60 100755
--- a/sql/CMakeLists.txt
+++ b/sql/CMakeLists.txt
@@ -75,6 +75,7 @@ SET (SQL_SOURCE
rpl_rli.cc rpl_mi.cc sql_servers.cc
sql_connect.cc scheduler.cc
sql_profile.cc event_parse_data.cc
+ rpl_handler.cc
${PROJECT_SOURCE_DIR}/sql/sql_yacc.cc
${PROJECT_SOURCE_DIR}/sql/sql_yacc.h
${PROJECT_SOURCE_DIR}/include/mysqld_error.h
diff --git a/sql/Makefile.am b/sql/Makefile.am
index 2a12de2eaf6..afce7db59f2 100644
--- a/sql/Makefile.am
+++ b/sql/Makefile.am
@@ -76,7 +76,8 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \
sql_plugin.h authors.h event_parse_data.h \
event_data_objects.h event_scheduler.h \
sql_partition.h partition_info.h partition_element.h \
- contributors.h sql_servers.h
+ contributors.h sql_servers.h \
+ rpl_handler.h replication.h
mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
item.cc item_sum.cc item_buff.cc item_func.cc \
@@ -120,7 +121,8 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
event_queue.cc event_db_repository.cc events.cc \
sql_plugin.cc sql_binlog.cc \
sql_builtin.cc sql_tablespace.cc partition_info.cc \
- sql_servers.cc event_parse_data.cc
+ sql_servers.cc event_parse_data.cc \
+ rpl_handler.cc
nodist_mysqld_SOURCES = mini_client_errors.c pack.c client.c my_time.c my_user.c
diff --git a/sql/handler.cc b/sql/handler.cc
index e5c64452aaf..f17bb9f8036 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -24,6 +24,7 @@
#endif
#include "mysql_priv.h"
+#include "rpl_handler.h"
#include "rpl_filter.h"
#include <myisampack.h>
#include <errno.h>
@@ -221,6 +222,8 @@ handlerton *ha_checktype(THD *thd, enum legacy_db_type database_type,
return NULL;
}
+ RUN_HOOK(transaction, after_rollback, (thd, FALSE));
+
switch (database_type) {
#ifndef NO_HASH
case DB_TYPE_HASH:
@@ -1190,6 +1193,7 @@ int ha_commit_trans(THD *thd, bool all)
if (cookie)
tc_log->unlog(cookie, xid);
DBUG_EXECUTE_IF("crash_commit_after", abort(););
+ RUN_HOOK(transaction, after_commit, (thd, FALSE));
end:
if (rw_trans)
start_waiting_global_read_lock(thd);
@@ -1337,6 +1341,7 @@ int ha_rollback_trans(THD *thd, bool all)
push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
ER_WARNING_NOT_COMPLETE_ROLLBACK,
ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
+ RUN_HOOK(transaction, after_rollback, (thd, FALSE));
DBUG_RETURN(error);
}
@@ -1371,7 +1376,14 @@ int ha_autocommit_or_rollback(THD *thd, int error)
thd->variables.tx_isolation=thd->session_tx_isolation;
}
+ else
#endif
+ {
+ if (!error)
+ RUN_HOOK(transaction, after_commit, (thd, FALSE));
+ else
+ RUN_HOOK(transaction, after_rollback, (thd, FALSE));
+ }
DBUG_RETURN(error);
}
diff --git a/sql/log.cc b/sql/log.cc
index 1af2f3a4ddc..042431fc008 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -38,6 +38,7 @@
#endif
#include <mysql/plugin.h>
+#include "rpl_handler.h"
/* max size of the log message */
#define MAX_LOG_BUFFER_SIZE 1024
@@ -3683,9 +3684,11 @@ err:
}
-bool MYSQL_BIN_LOG::flush_and_sync()
+bool MYSQL_BIN_LOG::flush_and_sync(bool *synced)
{
int err=0, fd=log_file.file;
+ if (synced)
+ *synced= 0;
safe_mutex_assert_owner(&LOCK_log);
if (flush_io_cache(&log_file))
return 1;
@@ -3693,6 +3696,8 @@ bool MYSQL_BIN_LOG::flush_and_sync()
{
sync_binlog_counter= 0;
err=my_sync(fd, MYF(MY_WME));
+ if (synced)
+ *synced= 1;
}
return err;
}
@@ -3983,7 +3988,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
if (file == &log_file)
{
- error= flush_and_sync();
+ error= flush_and_sync(0);
if (!error)
{
signal_update();
@@ -4169,8 +4174,16 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
if (file == &log_file) // we are writing to the real log (disk)
{
- if (flush_and_sync())
+ bool synced= 0;
+ if (flush_and_sync(&synced))
goto err;
+
+ if (RUN_HOOK(binlog_storage, after_flush,
+ (thd, log_file_name, file->pos_in_file, synced))) {
+ sql_print_error("Failed to run 'after_flush' hooks");
+ goto err;
+ }
+
signal_update();
rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
}
@@ -4425,7 +4438,7 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache, bool lock_log, bool sync_log)
DBUG_ASSERT(carry == 0);
if (sync_log)
- flush_and_sync();
+ flush_and_sync(0);
return 0; // All OK
}
@@ -4472,7 +4485,7 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd, bool lock)
ev.write(&log_file);
if (lock)
{
- if (!error && !(error= flush_and_sync()))
+ if (!error && !(error= flush_and_sync(0)))
{
signal_update();
rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
@@ -4560,7 +4573,8 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event,
if (incident && write_incident(thd, FALSE))
goto err;
- if (flush_and_sync())
+ bool synced= 0;
+ if (flush_and_sync(&synced))
goto err;
DBUG_EXECUTE_IF("half_binlogged_transaction", abort(););
if (cache->error) // Error on read
@@ -4569,6 +4583,15 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event,
write_error=1; // Don't give more errors
goto err;
}
+
+ if (RUN_HOOK(binlog_storage, after_flush,
+ (thd, log_file_name, log_file.pos_in_file, synced)))
+ {
+ sql_print_error("Failed to run 'after_flush' hooks");
+ write_error=1;
+ goto err;
+ }
+
signal_update();
}
diff --git a/sql/log.h b/sql/log.h
index d306d6f7182..0550c921658 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -378,7 +378,21 @@ public:
bool is_active(const char* log_file_name);
int update_log_index(LOG_INFO* linfo, bool need_update_threads);
void rotate_and_purge(uint flags);
- bool flush_and_sync();
+
+ /**
+ Flush binlog cache and synchronize to disk.
+
+ This function flushes events in binlog cache to binary log file,
+ it will do synchronizing according to the setting of system
+ variable 'sync_binlog'. If file is synchronized, @c synced will
+ be set to 1, otherwise 0.
+
+ @param[out] synced if not NULL, set to 1 if file is synchronized, otherwise 0
+
+ @retval 0 Success
+ @retval other Failure
+ */
+ bool flush_and_sync(bool *synced);
int purge_logs(const char *to_log, bool included,
bool need_mutex, bool need_update_threads,
ulonglong *decrease_log_space);
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 7e9eb6e7291..2ae4ec9e9b6 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -31,6 +31,8 @@
#include "rpl_injector.h"
+#include "rpl_handler.h"
+
#ifdef HAVE_SYS_PRCTL_H
#include <sys/prctl.h>
#endif
@@ -1284,6 +1286,7 @@ void clean_up(bool print_message)
ha_end();
if (tc_log)
tc_log->close();
+ delegates_destroy();
xid_cache_free();
delete_elements(&key_caches, (void (*)(const char*, uchar*)) free_key_cache);
multi_keycache_free();
@@ -3760,6 +3763,13 @@ static int init_server_components()
unireg_abort(1);
}
+ /* initialize delegates for extension observers */
+ if (delegates_init())
+ {
+ sql_print_error("Initialize extension delegates failed");
+ unireg_abort(1);
+ }
+
/* need to configure logging before initializing storage engines */
if (opt_update_log)
{
diff --git a/sql/replication.h b/sql/replication.h
new file mode 100644
index 00000000000..6b7be58a5b1
--- /dev/null
+++ b/sql/replication.h
@@ -0,0 +1,490 @@
+/* Copyright (C) 2008 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef REPLICATION_H
+#define REPLICATION_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ Transaction observer flags.
+*/
+enum Trans_flags {
+ /** Transaction is a real transaction */
+ TRANS_IS_REAL_TRANS = 1
+};
+
+/**
+ Transaction observer parameter
+*/
+typedef struct Trans_param {
+ uint32 server_id;
+ uint32 flags;
+
+ /*
+ The latest binary log file name and position written by current
+ transaction, if binary log is disabled or no log event has been
+ written into binary log file by current transaction (events
+ written into transaction log cache are not counted), these two
+ member will be zero.
+ */
+ const char *log_file;
+ my_off_t log_pos;
+} Trans_param;
+
+/**
+ Observes and extends transaction execution
+*/
+typedef struct Trans_observer {
+ uint32 len;
+
+ /**
+ This callback is called after transaction commit
+
+ This callback is called right after commit to storage engines for
+ transactional tables.
+
+ For non-transactional tables, this is called at the end of the
+ statement, before sending statement status, if the statement
+ succeeded.
+
+ @note The return value is currently ignored by the server.
+
+ @param param The parameter for transaction observers
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*after_commit)(Trans_param *param);
+
+ /**
+ This callback is called after transaction rollback
+
+ This callback is called right after rollback to storage engines
+ for transactional tables.
+
+ For non-transactional tables, this is called at the end of the
+ statement, before sending statement status, if the statement
+ failed.
+
+ @note The return value is currently ignored by the server.
+
+ @param param The parameter for transaction observers
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*after_rollback)(Trans_param *param);
+} Trans_observer;
+
+/**
+ Binlog storage flags
+*/
+enum Binlog_storage_flags {
+ /** Binary log was sync:ed */
+ BINLOG_STORAGE_IS_SYNCED = 1
+};
+
+/**
+ Binlog storage observer parameters
+ */
+typedef struct Binlog_storage_param {
+ uint32 server_id;
+} Binlog_storage_param;
+
+/**
+ Observe binlog logging storage
+*/
+typedef struct Binlog_storage_observer {
+ uint32 len;
+
+ /**
+ This callback is called after binlog has been flushed
+
+ This callback is called after cached events have been flushed to
+ binary log file. Whether the binary log file is synchronized to
+ disk is indicated by the bit BINLOG_STORAGE_IS_SYNCED in @a flags.
+
+ @param param Observer common parameter
+ @param log_file Binlog file name been updated
+ @param log_pos Binlog position after update
+ @param flags flags for binlog storage
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*after_flush)(Binlog_storage_param *param,
+ const char *log_file, my_off_t log_pos,
+ uint32 flags);
+} Binlog_storage_observer;
+
+/**
+ Replication binlog transmitter (binlog dump) observer parameter.
+*/
+typedef struct Binlog_transmit_param {
+ uint32 server_id;
+ uint32 flags;
+} Binlog_transmit_param;
+
+/**
+ Observe and extends the binlog dumping thread.
+*/
+typedef struct Binlog_transmit_observer {
+ uint32 len;
+
+ /**
+ This callback is called when binlog dumping starts
+
+
+ @param param Observer common parameter
+ @param log_file Binlog file name to transmit from
+ @param log_pos Binlog position to transmit from
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*transmit_start)(Binlog_transmit_param *param,
+ const char *log_file, my_off_t log_pos);
+
+ /**
+ This callback is called when binlog dumping stops
+
+ @param param Observer common parameter
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*transmit_stop)(Binlog_transmit_param *param);
+
+ /**
+ This callback is called to reserve bytes in packet header for event transmission
+
+ This callback is called when resetting transmit packet header to
+ reserve bytes for this observer in packet header.
+
+ The @a header buffer is allocated by the server code, and @a size
+ is the size of the header buffer. Each observer can only reserve
+ a maximum size of @a size in the header.
+
+ @param param Observer common parameter
+ @param header Pointer of the header buffer
+ @param size Size of the header buffer
+ @param len Header length reserved by this observer
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*reserve_header)(Binlog_transmit_param *param,
+ unsigned char *header,
+ unsigned long size,
+ unsigned long *len);
+
+ /**
+ This callback is called before sending an event packet to slave
+
+ @param param Observer common parameter
+ @param packet Binlog event packet to send
+ @param len Length of the event packet
+ @param log_file Binlog file name of the event packet to send
+ @param log_pos Binlog position of the event packet to send
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*before_send_event)(Binlog_transmit_param *param,
+ unsigned char *packet, unsigned long len,
+ const char *log_file, my_off_t log_pos );
+
+ /**
+ This callback is called after sending an event packet to slave
+
+ @param param Observer common parameter
+ @param event_buf Binlog event packet buffer sent
+ @param len length of the event packet buffer
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*after_send_event)(Binlog_transmit_param *param,
+ const char *event_buf, unsigned long len);
+
+ /**
+ This callback is called after resetting master status
+
+ This is called when executing the command RESET MASTER, and is
+ used to reset status variables added by observers.
+
+ @param param Observer common parameter
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*after_reset_master)(Binlog_transmit_param *param);
+} Binlog_transmit_observer;
+
+/**
+ Binlog relay IO flags
+*/
+enum Binlog_relay_IO_flags {
+ /** Binary relay log was sync:ed */
+ BINLOG_RELAY_IS_SYNCED = 1
+};
+
+
+/**
+ Replication binlog relay IO observer parameter
+*/
+typedef struct Binlog_relay_IO_param {
+ uint32 server_id;
+
+ /* Master host, user and port */
+ char *host;
+ char *user;
+ unsigned int port;
+
+ char *master_log_name;
+ my_off_t master_log_pos;
+
+ MYSQL *mysql; /* the connection to master */
+} Binlog_relay_IO_param;
+
+/**
+ Observes and extends the service of slave IO thread.
+*/
+typedef struct Binlog_relay_IO_observer {
+ uint32 len;
+
+ /**
+ This callback is called when slave IO thread starts
+
+ @param param Observer common parameter
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*thread_start)(Binlog_relay_IO_param *param);
+
+ /**
+ This callback is called when slave IO thread stops
+
+ @param param Observer common parameter
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*thread_stop)(Binlog_relay_IO_param *param);
+
+ /**
+ This callback is called before slave requesting binlog transmission from master
+
+ This is called before slave issuing BINLOG_DUMP command to master
+ to request binlog.
+
+ @param param Observer common parameter
+ @param flags binlog dump flags
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*before_request_transmit)(Binlog_relay_IO_param *param, uint32 flags);
+
+ /**
+ This callback is called after read an event packet from master
+
+ @param param Observer common parameter
+ @param packet The event packet read from master
+ @param len Length of the event packet read from master
+ @param event_buf The event packet return after process
+ @param event_len The length of event packet return after process
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*after_read_event)(Binlog_relay_IO_param *param,
+ const char *packet, unsigned long len,
+ const char **event_buf, unsigned long *event_len);
+
+ /**
+ This callback is called after written an event packet to relay log
+
+ @param param Observer common parameter
+ @param event_buf Event packet written to relay log
+ @param event_len Length of the event packet written to relay log
+ @param flags flags for relay log
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*after_queue_event)(Binlog_relay_IO_param *param,
+ const char *event_buf, unsigned long event_len,
+ uint32 flags);
+
+ /**
+ This callback is called after reset slave relay log IO status
+
+ @param param Observer common parameter
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*after_reset_slave)(Binlog_relay_IO_param *param);
+} Binlog_relay_IO_observer;
+
+
+/**
+ Register a transaction observer
+
+ @param observer The transaction observer to register
+ @param p pointer to the internal plugin structure
+
+ @retval 0 Sucess
+ @retval 1 Observer already exists
+*/
+int register_trans_observer(Trans_observer *observer, void *p);
+
+/**
+ Unregister a transaction observer
+
+ @param observer The transaction observer to unregister
+ @param p pointer to the internal plugin structure
+
+ @retval 0 Sucess
+ @retval 1 Observer not exists
+*/
+int unregister_trans_observer(Trans_observer *observer, void *p);
+
+/**
+ Register a binlog storage observer
+
+ @param observer The binlog storage observer to register
+ @param p pointer to the internal plugin structure
+
+ @retval 0 Sucess
+ @retval 1 Observer already exists
+*/
+int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p);
+
+/**
+ Unregister a binlog storage observer
+
+ @param observer The binlog storage observer to unregister
+ @param p pointer to the internal plugin structure
+
+ @retval 0 Sucess
+ @retval 1 Observer not exists
+*/
+int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p);
+
+/**
+ Register a binlog transmit observer
+
+ @param observer The binlog transmit observer to register
+ @param p pointer to the internal plugin structure
+
+ @retval 0 Sucess
+ @retval 1 Observer already exists
+*/
+int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p);
+
+/**
+ Unregister a binlog transmit observer
+
+ @param observer The binlog transmit observer to unregister
+ @param p pointer to the internal plugin structure
+
+ @retval 0 Sucess
+ @retval 1 Observer not exists
+*/
+int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p);
+
+/**
+ Register a binlog relay IO (slave IO thread) observer
+
+ @param observer The binlog relay IO observer to register
+ @param p pointer to the internal plugin structure
+
+ @retval 0 Sucess
+ @retval 1 Observer already exists
+*/
+int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p);
+
+/**
+ Unregister a binlog relay IO (slave IO thread) observer
+
+ @param observer The binlog relay IO observer to unregister
+ @param p pointer to the internal plugin structure
+
+ @retval 0 Sucess
+ @retval 1 Observer not exists
+*/
+int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p);
+
+/**
+ Connect to master
+
+ This function can only used in the slave I/O thread context, and
+ will use the same master information to do the connection.
+
+ @code
+ MYSQL *mysql = mysql_init(NULL);
+ if (rpl_connect_master(mysql))
+ {
+ // do stuff with the connection
+ }
+ mysql_close(mysql); // close the connection
+ @endcode
+
+ @param mysql address of MYSQL structure to use, pass NULL will
+ create a new one
+
+ @return address of MYSQL structure on success, NULL on failure
+*/
+MYSQL *rpl_connect_master(MYSQL *mysql);
+
+/**
+ Set thread entering a condition
+
+ This function should be called before putting a thread to wait for
+ a condition. @a mutex should be held before calling this
+ function. After being waken up, @f thd_exit_cond should be called.
+
+ @param thd The thread entering the condition, NULL means current thread
+ @param cond The condition the thread is going to wait for
+ @param mutex The mutex associated with the condition, this must be
+ held before call this function
+ @param msg The new process message for the thread
+*/
+const char* thd_enter_cond(MYSQL_THD thd, pthread_cond_t *cond,
+ pthread_mutex_t *mutex, const char *msg);
+
+/**
+ Set thread leaving a condition
+
+ This function should be called after a thread being waken up for a
+ condition.
+
+ @param thd The thread entering the condition, NULL means current thread
+ @param old_msg The process message, ususally this should be the old process
+ message before calling @f thd_enter_cond
+*/
+void thd_exit_cond(MYSQL_THD thd, const char *old_msg);
+
+
+#ifdef __cplusplus
+}
+#endif
+#endif /* REPLICATION_H */
diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc
new file mode 100644
index 00000000000..aea838928b9
--- /dev/null
+++ b/sql/rpl_handler.cc
@@ -0,0 +1,493 @@
+/* Copyright (C) 2008 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include "mysql_priv.h"
+
+#include "rpl_mi.h"
+#include "sql_repl.h"
+#include "log_event.h"
+#include "rpl_filter.h"
+#include <my_dir.h>
+#include "rpl_handler.h"
+
+Trans_delegate *transaction_delegate;
+Binlog_storage_delegate *binlog_storage_delegate;
+#ifdef HAVE_REPLICATION
+Binlog_transmit_delegate *binlog_transmit_delegate;
+Binlog_relay_IO_delegate *binlog_relay_io_delegate;
+#endif /* HAVE_REPLICATION */
+
+/*
+ structure to save transaction log filename and position
+*/
+typedef struct Trans_binlog_info {
+ my_off_t log_pos;
+ char log_file[FN_REFLEN];
+} Trans_binlog_info;
+
+static pthread_key(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+
+int get_user_var_int(const char *name,
+ long long int *value, int *null_value)
+{
+ my_bool null_val;
+ user_var_entry *entry=
+ (user_var_entry*) hash_search(&current_thd->user_vars,
+ (uchar*) name, strlen(name));
+ if (!entry)
+ return 1;
+ *value= entry->val_int(&null_val);
+ if (null_value)
+ *null_value= null_val;
+ return 0;
+}
+
+int get_user_var_real(const char *name,
+ double *value, int *null_value)
+{
+ my_bool null_val;
+ user_var_entry *entry=
+ (user_var_entry*) hash_search(&current_thd->user_vars,
+ (uchar*) name, strlen(name));
+ if (!entry)
+ return 1;
+ *value= entry->val_real(&null_val);
+ if (null_value)
+ *null_value= null_val;
+ return 0;
+}
+
+int get_user_var_str(const char *name, char *value,
+ size_t len, unsigned int precision, int *null_value)
+{
+ String str;
+ my_bool null_val;
+ user_var_entry *entry=
+ (user_var_entry*) hash_search(&current_thd->user_vars,
+ (uchar*) name, strlen(name));
+ if (!entry)
+ return 1;
+ entry->val_str(&null_val, &str, precision);
+ strncpy(value, str.c_ptr(), len);
+ if (null_value)
+ *null_value= null_val;
+ return 0;
+}
+
+int delegates_init()
+{
+ static unsigned char trans_mem[sizeof(Trans_delegate)];
+ static unsigned char storage_mem[sizeof(Binlog_storage_delegate)];
+#ifdef HAVE_REPLICATION
+ static unsigned char transmit_mem[sizeof(Binlog_transmit_delegate)];
+ static unsigned char relay_io_mem[sizeof(Binlog_relay_IO_delegate)];
+#endif
+
+ if (!(transaction_delegate= new (trans_mem) Trans_delegate)
+ || (!transaction_delegate->is_inited())
+ || !(binlog_storage_delegate= new (storage_mem) Binlog_storage_delegate)
+ || (!binlog_storage_delegate->is_inited())
+#ifdef HAVE_REPLICATION
+ || !(binlog_transmit_delegate= new (transmit_mem) Binlog_transmit_delegate)
+ || (!binlog_transmit_delegate->is_inited())
+ || !(binlog_relay_io_delegate= new (relay_io_mem) Binlog_relay_IO_delegate)
+ || (!binlog_relay_io_delegate->is_inited())
+#endif /* HAVE_REPLICATION */
+ )
+ return 1;
+
+ if (pthread_key_create(&RPL_TRANS_BINLOG_INFO, NULL))
+ return 1;
+ return 0;
+}
+
+void delegates_destroy()
+{
+ if (transaction_delegate)
+ transaction_delegate->~Trans_delegate();
+ if (binlog_storage_delegate)
+ binlog_storage_delegate->~Binlog_storage_delegate();
+#ifdef HAVE_REPLICATION
+ if (binlog_transmit_delegate)
+ binlog_transmit_delegate->~Binlog_transmit_delegate();
+ if (binlog_relay_io_delegate)
+ binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
+#endif /* HAVE_REPLICATION */
+}
+
+/*
+ This macro is used by almost all the Delegate methods to iterate
+ over all the observers running given callback function of the
+ delegate .
+
+ Add observer plugins to the thd->lex list, after each statement, all
+ plugins add to thd->lex will be automatically unlocked.
+ */
+#define FOREACH_OBSERVER(r, f, thd, args) \
+ param.server_id= thd->server_id; \
+ read_lock(); \
+ Observer_info_iterator iter= observer_info_iter(); \
+ Observer_info *info= iter++; \
+ for (; info; info= iter++) \
+ { \
+ plugin_ref plugin= \
+ my_plugin_lock(thd, &info->plugin); \
+ if (!plugin) \
+ { \
+ r= 1; \
+ break; \
+ } \
+ if (((Observer *)info->observer)->f \
+ && ((Observer *)info->observer)->f args) \
+ { \
+ r= 1; \
+ plugin_unlock(thd, plugin); \
+ break; \
+ } \
+ plugin_unlock(thd, plugin); \
+ } \
+ unlock()
+
+
+int Trans_delegate::after_commit(THD *thd, bool all)
+{
+ Trans_param param;
+ bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
+ if (is_real_trans)
+ param.flags |= TRANS_IS_REAL_TRANS;
+
+ Trans_binlog_info *log_info=
+ my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+
+ param.log_file= log_info ? log_info->log_file : 0;
+ param.log_pos= log_info ? log_info->log_pos : 0;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, after_commit, thd, (&param));
+
+ /*
+ This is the end of a real transaction or autocommit statement, we
+ can free the memory allocated for binlog file and position.
+ */
+ if (is_real_trans && log_info)
+ {
+ my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
+ my_free(log_info, MYF(0));
+ }
+ return ret;
+}
+
+int Trans_delegate::after_rollback(THD *thd, bool all)
+{
+ Trans_param param;
+ bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
+ if (is_real_trans)
+ param.flags |= TRANS_IS_REAL_TRANS;
+
+ Trans_binlog_info *log_info=
+ my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+
+ param.log_file= log_info ? log_info->log_file : 0;
+ param.log_pos= log_info ? log_info->log_pos : 0;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, after_commit, thd, (&param));
+
+ /*
+ This is the end of a real transaction or autocommit statement, we
+ can free the memory allocated for binlog file and position.
+ */
+ if (is_real_trans && log_info)
+ {
+ my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
+ my_free(log_info, MYF(0));
+ }
+ return ret;
+}
+
+int Binlog_storage_delegate::after_flush(THD *thd,
+ const char *log_file,
+ my_off_t log_pos,
+ bool synced)
+{
+ Binlog_storage_param param;
+ uint32 flags=0;
+ if (synced)
+ flags |= BINLOG_STORAGE_IS_SYNCED;
+
+ Trans_binlog_info *log_info=
+ my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+
+ if (!log_info)
+ {
+ if(!(log_info=
+ (Trans_binlog_info *)my_malloc(sizeof(Trans_binlog_info), MYF(0))))
+ return 1;
+ my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, log_info);
+ }
+
+ strcpy(log_info->log_file, log_file+dirname_length(log_file));
+ log_info->log_pos = log_pos;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, after_flush, thd,
+ (&param, log_info->log_file, log_info->log_pos, flags));
+ return ret;
+}
+
+#ifdef HAVE_REPLICATION
+int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
+ const char *log_file,
+ my_off_t log_pos)
+{
+ Binlog_transmit_param param;
+ param.flags= flags;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, transmit_start, thd, (&param, log_file, log_pos));
+ return ret;
+}
+
+int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
+{
+ Binlog_transmit_param param;
+ param.flags= flags;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, transmit_stop, thd, (&param));
+ return ret;
+}
+
+int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
+ String *packet)
+{
+ /* NOTE2ME: Maximum extra header size for each observer, I hope 32
+ bytes should be enough for each Observer to reserve their extra
+ header. If later found this is not enough, we can increase this
+ /HEZX
+ */
+#define RESERVE_HEADER_SIZE 32
+ unsigned char header[RESERVE_HEADER_SIZE];
+ ulong hlen;
+ Binlog_transmit_param param;
+ param.flags= flags;
+ param.server_id= thd->server_id;
+
+ int ret= 0;
+ read_lock();
+ Observer_info_iterator iter= observer_info_iter();
+ Observer_info *info= iter++;
+ for (; info; info= iter++)
+ {
+ plugin_ref plugin=
+ my_plugin_lock(thd, &info->plugin);
+ if (!plugin)
+ {
+ ret= 1;
+ break;
+ }
+ hlen= 0;
+ if (((Observer *)info->observer)->reserve_header
+ && ((Observer *)info->observer)->reserve_header(&param,
+ header,
+ RESERVE_HEADER_SIZE,
+ &hlen))
+ {
+ ret= 1;
+ plugin_unlock(thd, plugin);
+ break;
+ }
+ plugin_unlock(thd, plugin);
+ if (hlen == 0)
+ continue;
+ if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen))
+ {
+ ret= 1;
+ break;
+ }
+ }
+ unlock();
+ return ret;
+}
+
+int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
+ String *packet,
+ const char *log_file,
+ my_off_t log_pos)
+{
+ Binlog_transmit_param param;
+ param.flags= flags;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, before_send_event, thd,
+ (&param, (uchar *)packet->c_ptr(),
+ packet->length(),
+ log_file+dirname_length(log_file), log_pos));
+ return ret;
+}
+
+int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
+ String *packet)
+{
+ Binlog_transmit_param param;
+ param.flags= flags;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, after_send_event, thd,
+ (&param, packet->c_ptr(), packet->length()));
+ return ret;
+}
+
+int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
+
+{
+ Binlog_transmit_param param;
+ param.flags= flags;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, after_reset_master, thd, (&param));
+ return ret;
+}
+
+void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
+ Master_info *mi)
+{
+ param->mysql= mi->mysql;
+ param->user= mi->user;
+ param->host= mi->host;
+ param->port= mi->port;
+ param->master_log_name= mi->master_log_name;
+ param->master_log_pos= mi->master_log_pos;
+}
+
+int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
+{
+ Binlog_relay_IO_param param;
+ init_param(&param, mi);
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, thread_start, thd, (&param));
+ return ret;
+}
+
+
+int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
+{
+
+ Binlog_relay_IO_param param;
+ init_param(&param, mi);
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, thread_stop, thd, (&param));
+ return ret;
+}
+
+int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
+ Master_info *mi,
+ ushort flags)
+{
+ Binlog_relay_IO_param param;
+ init_param(&param, mi);
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, before_request_transmit, thd, (&param, (uint32)flags));
+ return ret;
+}
+
+int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
+ const char *packet, ulong len,
+ const char **event_buf,
+ ulong *event_len)
+{
+ Binlog_relay_IO_param param;
+ init_param(&param, mi);
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, after_read_event, thd,
+ (&param, packet, len, event_buf, event_len));
+ return ret;
+}
+
+int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
+ const char *event_buf,
+ ulong event_len,
+ bool synced)
+{
+ Binlog_relay_IO_param param;
+ init_param(&param, mi);
+
+ uint32 flags=0;
+ if (synced)
+ flags |= BINLOG_STORAGE_IS_SYNCED;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, after_queue_event, thd,
+ (&param, event_buf, event_len, flags));
+ return ret;
+}
+
+int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
+
+{
+ Binlog_relay_IO_param param;
+ init_param(&param, mi);
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, after_reset_slave, thd, (&param));
+ return ret;
+}
+#endif /* HAVE_REPLICATION */
+
+int register_trans_observer(Trans_observer *observer, void *p)
+{
+ return transaction_delegate->add_observer(observer, (st_plugin_int *)p);
+}
+
+int unregister_trans_observer(Trans_observer *observer, void *p)
+{
+ return transaction_delegate->remove_observer(observer, (st_plugin_int *)p);
+}
+
+int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
+{
+ return binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p);
+}
+
+int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
+{
+ return binlog_storage_delegate->remove_observer(observer, (st_plugin_int *)p);
+}
+
+#ifdef HAVE_REPLICATION
+int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
+{
+ return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p);
+}
+
+int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
+{
+ return binlog_transmit_delegate->remove_observer(observer, (st_plugin_int *)p);
+}
+
+int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
+{
+ return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p);
+}
+
+int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
+{
+ return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p);
+}
+#endif /* HAVE_REPLICATION */
diff --git a/sql/rpl_handler.h b/sql/rpl_handler.h
new file mode 100644
index 00000000000..4fb7b4e035b
--- /dev/null
+++ b/sql/rpl_handler.h
@@ -0,0 +1,213 @@
+/* Copyright (C) 2008 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef RPL_HANDLER_H
+#define RPL_HANDLER_H
+
+#include "mysql_priv.h"
+#include "rpl_mi.h"
+#include "rpl_rli.h"
+#include "sql_plugin.h"
+#include "replication.h"
+
+class Observer_info {
+public:
+ void *observer;
+ st_plugin_int *plugin_int;
+ plugin_ref plugin;
+
+ Observer_info(void *ob, st_plugin_int *p)
+ :observer(ob), plugin_int(p)
+ {
+ plugin= plugin_int_to_ref(plugin_int);
+ }
+};
+
+class Delegate {
+public:
+ typedef List<Observer_info> Observer_info_list;
+ typedef List_iterator<Observer_info> Observer_info_iterator;
+
+ int add_observer(void *observer, st_plugin_int *plugin)
+ {
+ int ret= FALSE;
+ if (!inited)
+ return TRUE;
+ write_lock();
+ Observer_info_iterator iter(observer_info_list);
+ Observer_info *info= iter++;
+ while (info && info->observer != observer)
+ info= iter++;
+ if (!info)
+ {
+ info= new Observer_info(observer, plugin);
+ if (!info || observer_info_list.push_back(info, &memroot))
+ ret= TRUE;
+ }
+ else
+ ret= TRUE;
+ unlock();
+ return ret;
+ }
+
+ int remove_observer(void *observer, st_plugin_int *plugin)
+ {
+ int ret= FALSE;
+ if (!inited)
+ return TRUE;
+ write_lock();
+ Observer_info_iterator iter(observer_info_list);
+ Observer_info *info= iter++;
+ while (info && info->observer != observer)
+ info= iter++;
+ if (info)
+ iter.remove();
+ else
+ ret= TRUE;
+ unlock();
+ return ret;
+ }
+
+ inline Observer_info_iterator observer_info_iter()
+ {
+ return Observer_info_iterator(observer_info_list);
+ }
+
+ inline bool is_empty()
+ {
+ return observer_info_list.is_empty();
+ }
+
+ inline int read_lock()
+ {
+ if (!inited)
+ return TRUE;
+ return rw_rdlock(&lock);
+ }
+
+ inline int write_lock()
+ {
+ if (!inited)
+ return TRUE;
+ return rw_wrlock(&lock);
+ }
+
+ inline int unlock()
+ {
+ if (!inited)
+ return TRUE;
+ return rw_unlock(&lock);
+ }
+
+ inline bool is_inited()
+ {
+ return inited;
+ }
+
+ Delegate()
+ {
+ inited= FALSE;
+ if (my_rwlock_init(&lock, NULL))
+ return;
+ init_sql_alloc(&memroot, 1024, 0);
+ inited= TRUE;
+ }
+ ~Delegate()
+ {
+ inited= FALSE;
+ rwlock_destroy(&lock);
+ free_root(&memroot, MYF(0));
+ }
+
+private:
+ Observer_info_list observer_info_list;
+ rw_lock_t lock;
+ MEM_ROOT memroot;
+ bool inited;
+};
+
+class Trans_delegate
+ :public Delegate {
+public:
+ typedef Trans_observer Observer;
+ int before_commit(THD *thd, bool all);
+ int before_rollback(THD *thd, bool all);
+ int after_commit(THD *thd, bool all);
+ int after_rollback(THD *thd, bool all);
+};
+
+class Binlog_storage_delegate
+ :public Delegate {
+public:
+ typedef Binlog_storage_observer Observer;
+ int after_flush(THD *thd, const char *log_file,
+ my_off_t log_pos, bool synced);
+};
+
+#ifdef HAVE_REPLICATION
+class Binlog_transmit_delegate
+ :public Delegate {
+public:
+ typedef Binlog_transmit_observer Observer;
+ int transmit_start(THD *thd, ushort flags,
+ const char *log_file, my_off_t log_pos);
+ int transmit_stop(THD *thd, ushort flags);
+ int reserve_header(THD *thd, ushort flags, String *packet);
+ int before_send_event(THD *thd, ushort flags,
+ String *packet, const
+ char *log_file, my_off_t log_pos );
+ int after_send_event(THD *thd, ushort flags,
+ String *packet);
+ int after_reset_master(THD *thd, ushort flags);
+};
+
+class Binlog_relay_IO_delegate
+ :public Delegate {
+public:
+ typedef Binlog_relay_IO_observer Observer;
+ int thread_start(THD *thd, Master_info *mi);
+ int thread_stop(THD *thd, Master_info *mi);
+ int before_request_transmit(THD *thd, Master_info *mi, ushort flags);
+ int after_read_event(THD *thd, Master_info *mi,
+ const char *packet, ulong len,
+ const char **event_buf, ulong *event_len);
+ int after_queue_event(THD *thd, Master_info *mi,
+ const char *event_buf, ulong event_len,
+ bool synced);
+ int after_reset_slave(THD *thd, Master_info *mi);
+private:
+ void init_param(Binlog_relay_IO_param *param, Master_info *mi);
+};
+#endif /* HAVE_REPLICATION */
+
+int delegates_init();
+void delegates_destroy();
+
+extern Trans_delegate *transaction_delegate;
+extern Binlog_storage_delegate *binlog_storage_delegate;
+#ifdef HAVE_REPLICATION
+extern Binlog_transmit_delegate *binlog_transmit_delegate;
+extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;
+#endif /* HAVE_REPLICATION */
+
+/*
+ if there is no observers in the delegate, we can return 0
+ immediately.
+*/
+#define RUN_HOOK(group, hook, args) \
+ (group ##_delegate->is_empty() ? \
+ 0 : group ##_delegate->hook args)
+
+#endif /* RPL_HANDLER_H */
diff --git a/sql/slave.cc b/sql/slave.cc
index fac9ee214c5..4988886dce4 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -40,6 +40,7 @@
#include <errmsg.h>
#include <mysqld_error.h>
#include <mysys_err.h>
+#include "rpl_handler.h"
#ifdef HAVE_REPLICATION
@@ -69,6 +70,8 @@ ulonglong relay_log_space_limit = 0;
int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
int events_till_abort = -1;
+static pthread_key(Master_info*, RPL_MASTER_INFO);
+
enum enum_slave_reconnect_actions
{
SLAVE_RECON_ACT_REG= 0,
@@ -231,6 +234,10 @@ int init_slave()
TODO: re-write this to interate through the list of files
for multi-master
*/
+
+ if (pthread_key_create(&RPL_MASTER_INFO, NULL))
+ goto err;
+
active_mi= new Master_info;
/*
@@ -1868,17 +1875,22 @@ static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
}
-static int request_dump(MYSQL* mysql, Master_info* mi,
- bool *suppress_warnings)
+static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
+ bool *suppress_warnings)
{
uchar buf[FN_REFLEN + 10];
int len;
- int binlog_flags = 0; // for now
+ ushort binlog_flags = 0; // for now
char* logname = mi->master_log_name;
DBUG_ENTER("request_dump");
*suppress_warnings= FALSE;
+ if (RUN_HOOK(binlog_relay_io,
+ before_request_transmit,
+ (thd, mi, binlog_flags)))
+ DBUG_RETURN(1);
+
// TODO if big log files: Change next to int8store()
int4store(buf, (ulong) mi->master_log_pos);
int2store(buf + 4, binlog_flags);
@@ -2532,6 +2544,16 @@ pthread_handler_t handle_slave_io(void *arg)
mi->master_log_name,
llstr(mi->master_log_pos,llbuff)));
+ /* This must be called before run any binlog_relay_io hooks */
+ my_pthread_setspecific_ptr(RPL_MASTER_INFO, mi);
+
+ if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)))
+ {
+ mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ ER(ER_SLAVE_FATAL_ERROR), "Failed to run 'thread_start' hook");
+ goto err;
+ }
+
if (!(mi->mysql = mysql = mysql_init(NULL)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
@@ -2621,7 +2643,7 @@ connected:
while (!io_slave_killed(thd,mi))
{
thd_proc_info(thd, "Requesting binlog dump");
- if (request_dump(mysql, mi, &suppress_warnings))
+ if (request_dump(thd, mysql, mi, &suppress_warnings))
{
sql_print_error("Failed on request_dump()");
if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
@@ -2641,6 +2663,7 @@ requesting master dump") ||
goto err;
goto connected;
});
+ const char *event_buf;
DBUG_ASSERT(mi->last_error().number == 0);
while (!io_slave_killed(thd,mi))
@@ -2697,14 +2720,37 @@ Stopping slave I/O thread due to out-of-memory error from master");
retry_count=0; // ok event, reset retry counter
thd_proc_info(thd, "Queueing master event to the relay log");
- if (queue_event(mi,(const char*)mysql->net.read_pos + 1,
- event_len))
+ event_buf= (const char*)mysql->net.read_pos + 1;
+ if (RUN_HOOK(binlog_relay_io, after_read_event,
+ (thd, mi,(const char*)mysql->net.read_pos + 1,
+ event_len, &event_buf, &event_len)))
+ {
+ mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ ER(ER_SLAVE_FATAL_ERROR),
+ "Failed to run 'after_read_event' hook");
+ goto err;
+ }
+
+ /* XXX: 'synced' should be updated by queue_event to indicate
+ whether event has been synced to disk */
+ bool synced= 0;
+ if (queue_event(mi, event_buf, event_len))
{
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"could not queue event from master");
goto err;
}
+
+ if (RUN_HOOK(binlog_relay_io, after_queue_event,
+ (thd, mi, event_buf, event_len, synced)))
+ {
+ mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ ER(ER_SLAVE_FATAL_ERROR),
+ "Failed to run 'after_queue_event' hook");
+ goto err;
+ }
+
if (flush_master_info(mi, 1))
{
sql_print_error("Failed to flush master info file");
@@ -2750,6 +2796,7 @@ err:
// print the current replication position
sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s",
IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
+ RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi));
thd->set_query(NULL, 0);
thd->reset_db(NULL, 0);
if (mysql)
@@ -3906,6 +3953,71 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
}
+MYSQL *rpl_connect_master(MYSQL *mysql)
+{
+ THD *thd= current_thd;
+ Master_info *mi= my_pthread_getspecific_ptr(Master_info*, RPL_MASTER_INFO);
+ if (!mi)
+ {
+ sql_print_error("'rpl_connect_master' must be called in slave I/O thread context.");
+ return NULL;
+ }
+
+ bool allocated= false;
+
+ if (!mysql)
+ {
+ if(!(mysql= mysql_init(NULL)))
+ {
+ sql_print_error("rpl_connect_master: failed in mysql_init()");
+ return NULL;
+ }
+ allocated= true;
+ }
+
+ /*
+ XXX: copied from connect_to_master, this function should not
+ change the slave status, so we cannot use connect_to_master
+ directly
+
+ TODO: make this part a seperate function to eliminate duplication
+ */
+ mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
+ mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
+
+#ifdef HAVE_OPENSSL
+ if (mi->ssl)
+ {
+ mysql_ssl_set(mysql,
+ mi->ssl_key[0]?mi->ssl_key:0,
+ mi->ssl_cert[0]?mi->ssl_cert:0,
+ mi->ssl_ca[0]?mi->ssl_ca:0,
+ mi->ssl_capath[0]?mi->ssl_capath:0,
+ mi->ssl_cipher[0]?mi->ssl_cipher:0);
+ mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT,
+ &mi->ssl_verify_server_cert);
+ }
+#endif
+
+ mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
+ /* This one is not strictly needed but we have it here for completeness */
+ mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);
+
+ if (io_slave_killed(thd, mi)
+ || !mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
+ mi->port, 0, 0))
+ {
+ if (!io_slave_killed(thd, mi))
+ sql_print_error("rpl_connect_master: error connecting to master: %s (server_error: %d)",
+ mysql_error(mysql), mysql_errno(mysql));
+
+ if (allocated)
+ mysql_close(mysql); // this will free the object
+ return NULL;
+ }
+ return mysql;
+}
+
/*
Store the file and position where the execute-slave thread are in the
relay log.
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 3f568566c89..755e60b4fc2 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -278,6 +278,42 @@ const char *set_thd_proc_info(THD *thd, const char *info,
}
extern "C"
+const char* thd_enter_cond(MYSQL_THD thd, pthread_cond_t *cond,
+ pthread_mutex_t *mutex, const char *msg)
+{
+ if (!thd)
+ thd= current_thd;
+
+ const char* old_msg = thd->proc_info;
+ safe_mutex_assert_owner(mutex);
+ thd->mysys_var->current_mutex = mutex;
+ thd->mysys_var->current_cond = cond;
+ thd->proc_info = msg;
+ return old_msg;
+}
+
+extern "C"
+void thd_exit_cond(MYSQL_THD thd, const char *old_msg)
+{
+ if (!thd)
+ thd= current_thd;
+
+ /*
+ Putting the mutex unlock in thd_exit_cond() ensures that
+ mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
+ locked (if that would not be the case, you'll get a deadlock if someone
+ does a THD::awake() on you).
+ */
+ pthread_mutex_unlock(thd->mysys_var->current_mutex);
+ pthread_mutex_lock(&thd->mysys_var->mutex);
+ thd->mysys_var->current_mutex = 0;
+ thd->mysys_var->current_cond = 0;
+ thd->proc_info = old_msg;
+ pthread_mutex_unlock(&thd->mysys_var->mutex);
+ return;
+}
+
+extern "C"
void **thd_ha_data(const THD *thd, const struct handlerton *hton)
{
return (void **) &thd->ha_data[hton->slot].ha_ptr;
diff --git a/sql/sql_class.h b/sql/sql_class.h
index f52d5fae76f..3d7ff0ca0a1 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -22,6 +22,7 @@
#include "log.h"
#include "rpl_tblmap.h"
+#include "replication.h"
/**
An interface that is used to take an action when
@@ -1940,27 +1941,11 @@ public:
inline const char* enter_cond(pthread_cond_t *cond, pthread_mutex_t* mutex,
const char* msg)
{
- const char* old_msg = proc_info;
- safe_mutex_assert_owner(mutex);
- mysys_var->current_mutex = mutex;
- mysys_var->current_cond = cond;
- proc_info = msg;
- return old_msg;
+ return thd_enter_cond(this, cond, mutex, msg);
}
inline void exit_cond(const char* old_msg)
{
- /*
- Putting the mutex unlock in exit_cond() ensures that
- mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
- locked (if that would not be the case, you'll get a deadlock if someone
- does a THD::awake() on you).
- */
- pthread_mutex_unlock(mysys_var->current_mutex);
- pthread_mutex_lock(&mysys_var->mutex);
- mysys_var->current_mutex = 0;
- mysys_var->current_cond = 0;
- proc_info = old_msg;
- pthread_mutex_unlock(&mysys_var->mutex);
+ thd_exit_cond(this, old_msg);
}
inline time_t query_start() { query_start_used=1; return start_time; }
inline void set_time()
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index ca27d476213..6019c385fb8 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -21,6 +21,7 @@
#include <m_ctype.h>
#include <myisam.h>
#include <my_dir.h>
+#include "rpl_handler.h"
#include "sp_head.h"
#include "sp.h"
diff --git a/sql/sql_plugin.cc b/sql/sql_plugin.cc
index da168d36429..8cf8c4cb81f 100644
--- a/sql/sql_plugin.cc
+++ b/sql/sql_plugin.cc
@@ -19,14 +19,6 @@
#define REPORT_TO_LOG 1
#define REPORT_TO_USER 2
-#ifdef DBUG_OFF
-#define plugin_ref_to_int(A) A
-#define plugin_int_to_ref(A) A
-#else
-#define plugin_ref_to_int(A) (A ? A[0] : NULL)
-#define plugin_int_to_ref(A) &(A)
-#endif
-
extern struct st_mysql_plugin *mysqld_builtins[];
/**
@@ -54,7 +46,8 @@ const LEX_STRING plugin_type_names[MYSQL_MAX_PLUGIN_TYPE_NUM]=
{ C_STRING_WITH_LEN("STORAGE ENGINE") },
{ C_STRING_WITH_LEN("FTPARSER") },
{ C_STRING_WITH_LEN("DAEMON") },
- { C_STRING_WITH_LEN("INFORMATION SCHEMA") }
+ { C_STRING_WITH_LEN("INFORMATION SCHEMA") },
+ { C_STRING_WITH_LEN("REPLICATION") },
};
extern int initialize_schema_table(st_plugin_int *plugin);
@@ -93,7 +86,8 @@ static int min_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]=
MYSQL_HANDLERTON_INTERFACE_VERSION,
MYSQL_FTPARSER_INTERFACE_VERSION,
MYSQL_DAEMON_INTERFACE_VERSION,
- MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION
+ MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
+ MYSQL_REPLICATION_INTERFACE_VERSION,
};
static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]=
{
@@ -101,7 +95,8 @@ static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]=
MYSQL_HANDLERTON_INTERFACE_VERSION,
MYSQL_FTPARSER_INTERFACE_VERSION,
MYSQL_DAEMON_INTERFACE_VERSION,
- MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION
+ MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
+ MYSQL_REPLICATION_INTERFACE_VERSION,
};
static bool initialized= 0;
diff --git a/sql/sql_plugin.h b/sql/sql_plugin.h
index 004d0d5abb7..c6ad846943c 100644
--- a/sql/sql_plugin.h
+++ b/sql/sql_plugin.h
@@ -18,6 +18,14 @@
class sys_var;
+#ifdef DBUG_OFF
+#define plugin_ref_to_int(A) A
+#define plugin_int_to_ref(A) A
+#else
+#define plugin_ref_to_int(A) (A ? A[0] : NULL)
+#define plugin_int_to_ref(A) &(A)
+#endif
+
/*
the following flags are valid for plugin_init()
*/
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 0ec8d91214c..671f6785640 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -21,6 +21,7 @@
#include "log_event.h"
#include "rpl_filter.h"
#include <my_dir.h>
+#include "rpl_handler.h"
int max_binlog_dump_events = 0; // unlimited
my_bool opt_sporadic_binlog_dump_fail = 0;
@@ -80,6 +81,32 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
DBUG_RETURN(0);
}
+/*
+ Reset thread transmit packet buffer for event sending
+
+ This function allocates header bytes for event transmission, and
+ should be called before store the event data to the packet buffer.
+*/
+static int reset_transmit_packet(THD *thd, ushort flags,
+ ulong *ev_offset, const char **errmsg)
+{
+ int ret= 0;
+ String *packet= &thd->packet;
+
+ /* reserve and set default header */
+ packet->length(0);
+ packet->set("\0", 1, &my_charset_bin);
+
+ if (RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, packet)))
+ {
+ *errmsg= "Failed to run hook 'reserve_header'";
+ my_errno= ER_UNKNOWN_ERROR;
+ ret= 1;
+ }
+ *ev_offset= packet->length();
+ return ret;
+}
+
static int send_file(THD *thd)
{
NET* net = &thd->net;
@@ -346,6 +373,9 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
LOG_INFO linfo;
char *log_file_name = linfo.log_file_name;
char search_file_name[FN_REFLEN], *name;
+
+ ulong ev_offset;
+
IO_CACHE log;
File file = -1;
String* packet = &thd->packet;
@@ -361,6 +391,14 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
bzero((char*) &log,sizeof(log));
+ sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
+ thd->server_id, log_ident, (ulong)pos);
+ if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
+ {
+ errmsg= "Failed to run hook 'transmit_start'";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
#ifndef DBUG_OFF
if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
@@ -416,11 +454,9 @@ impossible position";
goto err;
}
- /*
- We need to start a packet with something other than 255
- to distinguish it from error
- */
- packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */
+ /* reset transmit packet for the fake rotate event below */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
/*
Tell the client about the log name with a fake Rotate event;
@@ -460,7 +496,7 @@ impossible position";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
}
- packet->set("\0", 1, &my_charset_bin);
+
/*
Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
this larger than the corresponding packet (query) sent
@@ -476,6 +512,11 @@ impossible position";
log_lock = mysql_bin_log.get_log_lock();
if (pos > BIN_LOG_HEADER_SIZE)
{
+ /* reset transmit packet for the event read from binary log
+ file */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
+
/*
Try to find a Format_description_log_event at the beginning of
the binlog
@@ -483,29 +524,30 @@ impossible position";
if (!(error = Log_event::read_log_event(&log, packet, log_lock)))
{
/*
- The packet has offsets equal to the normal offsets in a binlog
- event +1 (the first character is \0).
+ The packet has offsets equal to the normal offsets in a
+ binlog event + ev_offset (the first ev_offset characters are
+ the header (default \0)).
*/
DBUG_PRINT("info",
("Looked for a Format_description_log_event, found event type %d",
- (*packet)[EVENT_TYPE_OFFSET+1]));
- if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+ (*packet)[EVENT_TYPE_OFFSET+ev_offset]));
+ if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT)
{
- binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
+ binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
LOG_EVENT_BINLOG_IN_USE_F);
- (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+ (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
/*
mark that this event with "log_pos=0", so the slave
should not increment master's binlog position
(rli->group_master_log_pos)
*/
- int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
+ int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0);
/*
if reconnect master sends FD event with `created' as 0
to avoid destroying temp tables.
*/
int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
- ST_CREATED_OFFSET+1, (ulong) 0);
+ ST_CREATED_OFFSET+ev_offset, (ulong) 0);
/* send it */
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
@@ -531,8 +573,6 @@ impossible position";
Format_description_log_event will be found naturally if it is written.
*/
}
- /* reset the packet as we wrote to it in any case */
- packet->set("\0", 1, &my_charset_bin);
} /* end of if (pos > BIN_LOG_HEADER_SIZE); */
else
{
@@ -544,6 +584,12 @@ impossible position";
while (!net->error && net->vio != 0 && !thd->killed)
{
+ Log_event_type event_type= UNKNOWN_EVENT;
+
+ /* reset the transmit packet for the event read from binary log
+ file */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
{
#ifndef DBUG_OFF
@@ -556,15 +602,25 @@ impossible position";
}
#endif
- if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+ event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
+ if (event_type == FORMAT_DESCRIPTION_EVENT)
{
- binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
+ binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
LOG_EVENT_BINLOG_IN_USE_F);
- (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+ (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
}
- else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
+ else if (event_type == STOP_EVENT)
binlog_can_be_corrupted= FALSE;
+ pos = my_b_tell(&log);
+ if (RUN_HOOK(binlog_transmit, before_send_event,
+ (thd, flags, packet, log_file_name, pos)))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
+ errmsg= "run 'before_send_event' hook failed";
+ goto err;
+ }
+
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
errmsg = "Failed on my_net_write()";
@@ -572,9 +628,8 @@ impossible position";
goto err;
}
- DBUG_PRINT("info", ("log event code %d",
- (*packet)[LOG_EVENT_OFFSET+1] ));
- if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+ DBUG_PRINT("info", ("log event code %d", event_type));
+ if (event_type == LOAD_EVENT)
{
if (send_file(thd))
{
@@ -583,7 +638,17 @@ impossible position";
goto err;
}
}
- packet->set("\0", 1, &my_charset_bin);
+
+ if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
+ {
+ errmsg= "Failed to run hook 'after_send_event'";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+
+ /* reset transmit packet for next loop */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
}
/*
@@ -634,6 +699,11 @@ impossible position";
}
#endif
+ /* reset the transmit packet for the event read from binary log
+ file */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
+
/*
No one will update the log while we are reading
now, but we'll be quick and just read one record
@@ -650,6 +720,7 @@ impossible position";
/* we read successfully, so we'll need to send it to the slave */
pthread_mutex_unlock(log_lock);
read_packet = 1;
+ event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
break;
case LOG_READ_EOF:
@@ -676,8 +747,17 @@ impossible position";
}
if (read_packet)
- {
- thd_proc_info(thd, "Sending binlog event to slave");
+ {
+ thd_proc_info(thd, "Sending binlog event to slave");
+ pos = my_b_tell(&log);
+ if (RUN_HOOK(binlog_transmit, before_send_event,
+ (thd, flags, packet, log_file_name, pos)))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
+ errmsg= "run 'before_send_event' hook failed";
+ goto err;
+ }
+
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
{
errmsg = "Failed on my_net_write()";
@@ -685,7 +765,7 @@ impossible position";
goto err;
}
- if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+ if (event_type == LOAD_EVENT)
{
if (send_file(thd))
{
@@ -694,11 +774,13 @@ impossible position";
goto err;
}
}
- packet->set("\0", 1, &my_charset_bin);
- /*
- No need to net_flush because we will get to flush later when
- we hit EOF pretty quick
- */
+
+ if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
+ errmsg= "Failed to run hook 'after_send_event'";
+ goto err;
+ }
}
if (fatal_error)
@@ -734,6 +816,10 @@ impossible position";
end_io_cache(&log);
(void) my_close(file, MYF(MY_WME));
+ /* reset transmit packet for the possible fake rotate event */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
+
/*
Call fake_rotate_event() in case the previous log (the one which
we have just finished reading) did not contain a Rotate event
@@ -750,9 +836,6 @@ impossible position";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
}
-
- packet->length(0);
- packet->append('\0');
}
}
@@ -760,6 +843,7 @@ end:
end_io_cache(&log);
(void)my_close(file, MYF(MY_WME));
+ RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
my_eof(thd);
thd_proc_info(thd, "Waiting to finalize termination");
pthread_mutex_lock(&LOCK_thread_count);
@@ -770,6 +854,7 @@ end:
err:
thd_proc_info(thd, "Waiting to finalize termination");
end_io_cache(&log);
+ RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
/*
Exclude iteration through thread list
this is needed for purge_logs() - it will iterate through
@@ -1064,6 +1149,7 @@ int reset_slave(THD *thd, Master_info* mi)
goto err;
}
+ RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
err:
unlock_slave_threads(mi);
if (error)
@@ -1363,7 +1449,11 @@ int reset_master(THD* thd)
ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
return 1;
}
- return mysql_bin_log.reset_logs(thd);
+
+ if (mysql_bin_log.reset_logs(thd))
+ return 1;
+ RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));
+ return 0;
}
int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
@@ -1836,5 +1926,3 @@ int init_replication_sys_vars()
}
#endif /* HAVE_REPLICATION */
-
-