diff options
author | He Zhenxing <zhenxing.he@sun.com> | 2009-09-26 12:49:49 +0800 |
---|---|---|
committer | He Zhenxing <zhenxing.he@sun.com> | 2009-09-26 12:49:49 +0800 |
commit | 623ed58cfda0aef6b6bf545a4200357a58a8a4cc (patch) | |
tree | 28e6a4c77de3c3073b4dbe0b0e09e019adeaa556 /sql/rpl_handler.h | |
parent | e465d113832aeac61a36902c7976d455e1525234 (diff) | |
download | mariadb-git-623ed58cfda0aef6b6bf545a4200357a58a8a4cc.tar.gz |
Backporting WL#4398 WL#1720
Backporting BUG#44058 BUG#42244 BUG#45672 BUG#45673
Backporting BUG#45819 BUG#45973 BUG#39012
Diffstat (limited to 'sql/rpl_handler.h')
-rw-r--r-- | sql/rpl_handler.h | 213 |
1 files changed, 213 insertions, 0 deletions
diff --git a/sql/rpl_handler.h b/sql/rpl_handler.h new file mode 100644 index 00000000000..4fb7b4e035b --- /dev/null +++ b/sql/rpl_handler.h @@ -0,0 +1,213 @@ +/* Copyright (C) 2008 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef RPL_HANDLER_H +#define RPL_HANDLER_H + +#include "mysql_priv.h" +#include "rpl_mi.h" +#include "rpl_rli.h" +#include "sql_plugin.h" +#include "replication.h" + +class Observer_info { +public: + void *observer; + st_plugin_int *plugin_int; + plugin_ref plugin; + + Observer_info(void *ob, st_plugin_int *p) + :observer(ob), plugin_int(p) + { + plugin= plugin_int_to_ref(plugin_int); + } +}; + +class Delegate { +public: + typedef List<Observer_info> Observer_info_list; + typedef List_iterator<Observer_info> Observer_info_iterator; + + int add_observer(void *observer, st_plugin_int *plugin) + { + int ret= FALSE; + if (!inited) + return TRUE; + write_lock(); + Observer_info_iterator iter(observer_info_list); + Observer_info *info= iter++; + while (info && info->observer != observer) + info= iter++; + if (!info) + { + info= new Observer_info(observer, plugin); + if (!info || observer_info_list.push_back(info, &memroot)) + ret= TRUE; + } + else + ret= TRUE; + unlock(); + return ret; + } + + int remove_observer(void *observer, st_plugin_int *plugin) + { + int ret= FALSE; + if (!inited) + return TRUE; + write_lock(); + Observer_info_iterator iter(observer_info_list); + Observer_info *info= iter++; + while (info && info->observer != observer) + info= iter++; + if (info) + iter.remove(); + else + ret= TRUE; + unlock(); + return ret; + } + + inline Observer_info_iterator observer_info_iter() + { + return Observer_info_iterator(observer_info_list); + } + + inline bool is_empty() + { + return observer_info_list.is_empty(); + } + + inline int read_lock() + { + if (!inited) + return TRUE; + return rw_rdlock(&lock); + } + + inline int write_lock() + { + if (!inited) + return TRUE; + return rw_wrlock(&lock); + } + + inline int unlock() + { + if (!inited) + return TRUE; + return rw_unlock(&lock); + } + + inline bool is_inited() + { + return inited; + } + + Delegate() + { + inited= FALSE; + if (my_rwlock_init(&lock, NULL)) + return; + init_sql_alloc(&memroot, 1024, 0); + inited= TRUE; + } + ~Delegate() + { + inited= FALSE; + rwlock_destroy(&lock); + free_root(&memroot, MYF(0)); + } + +private: + Observer_info_list observer_info_list; + rw_lock_t lock; + MEM_ROOT memroot; + bool inited; +}; + +class Trans_delegate + :public Delegate { +public: + typedef Trans_observer Observer; + int before_commit(THD *thd, bool all); + int before_rollback(THD *thd, bool all); + int after_commit(THD *thd, bool all); + int after_rollback(THD *thd, bool all); +}; + +class Binlog_storage_delegate + :public Delegate { +public: + typedef Binlog_storage_observer Observer; + int after_flush(THD *thd, const char *log_file, + my_off_t log_pos, bool synced); +}; + +#ifdef HAVE_REPLICATION +class Binlog_transmit_delegate + :public Delegate { +public: + typedef Binlog_transmit_observer Observer; + int transmit_start(THD *thd, ushort flags, + const char *log_file, my_off_t log_pos); + int transmit_stop(THD *thd, ushort flags); + int reserve_header(THD *thd, ushort flags, String *packet); + int before_send_event(THD *thd, ushort flags, + String *packet, const + char *log_file, my_off_t log_pos ); + int after_send_event(THD *thd, ushort flags, + String *packet); + int after_reset_master(THD *thd, ushort flags); +}; + +class Binlog_relay_IO_delegate + :public Delegate { +public: + typedef Binlog_relay_IO_observer Observer; + int thread_start(THD *thd, Master_info *mi); + int thread_stop(THD *thd, Master_info *mi); + int before_request_transmit(THD *thd, Master_info *mi, ushort flags); + int after_read_event(THD *thd, Master_info *mi, + const char *packet, ulong len, + const char **event_buf, ulong *event_len); + int after_queue_event(THD *thd, Master_info *mi, + const char *event_buf, ulong event_len, + bool synced); + int after_reset_slave(THD *thd, Master_info *mi); +private: + void init_param(Binlog_relay_IO_param *param, Master_info *mi); +}; +#endif /* HAVE_REPLICATION */ + +int delegates_init(); +void delegates_destroy(); + +extern Trans_delegate *transaction_delegate; +extern Binlog_storage_delegate *binlog_storage_delegate; +#ifdef HAVE_REPLICATION +extern Binlog_transmit_delegate *binlog_transmit_delegate; +extern Binlog_relay_IO_delegate *binlog_relay_io_delegate; +#endif /* HAVE_REPLICATION */ + +/* + if there is no observers in the delegate, we can return 0 + immediately. +*/ +#define RUN_HOOK(group, hook, args) \ + (group ##_delegate->is_empty() ? \ + 0 : group ##_delegate->hook args) + +#endif /* RPL_HANDLER_H */ |