diff options
Diffstat (limited to 'sql/rpl_handler.h')
-rw-r--r-- | sql/rpl_handler.h | 216 |
1 files changed, 216 insertions, 0 deletions
diff --git a/sql/rpl_handler.h b/sql/rpl_handler.h new file mode 100644 index 00000000000..4d1bce45c54 --- /dev/null +++ b/sql/rpl_handler.h @@ -0,0 +1,216 @@ +/* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#ifndef RPL_HANDLER_H +#define RPL_HANDLER_H + +#include "sql_priv.h" +#include "rpl_mi.h" +#include "rpl_rli.h" +#include "sql_plugin.h" +#include "replication.h" + +class Observer_info { +public: + void *observer; + st_plugin_int *plugin_int; + plugin_ref plugin; + + Observer_info(void *ob, st_plugin_int *p) + :observer(ob), plugin_int(p) + { + plugin= plugin_int_to_ref(plugin_int); + } +}; + +class Delegate { +public: + typedef List<Observer_info> Observer_info_list; + typedef List_iterator<Observer_info> Observer_info_iterator; + + int add_observer(void *observer, st_plugin_int *plugin) + { + int ret= FALSE; + if (!inited) + return TRUE; + write_lock(); + Observer_info_iterator iter(observer_info_list); + Observer_info *info= iter++; + while (info && info->observer != observer) + info= iter++; + if (!info) + { + info= new Observer_info(observer, plugin); + if (!info || observer_info_list.push_back(info, &memroot)) + ret= TRUE; + } + else + ret= TRUE; + unlock(); + return ret; + } + + int remove_observer(void *observer, st_plugin_int *plugin) + { + int ret= FALSE; + if (!inited) + return TRUE; + write_lock(); + Observer_info_iterator iter(observer_info_list); + Observer_info *info= iter++; + while (info && info->observer != observer) + info= iter++; + if (info) + { + iter.remove(); + delete info; + } + else + ret= TRUE; + unlock(); + return ret; + } + + inline Observer_info_iterator observer_info_iter() + { + return Observer_info_iterator(observer_info_list); + } + + inline bool is_empty() + { + return observer_info_list.is_empty(); + } + + inline int read_lock() + { + if (!inited) + return TRUE; + return rw_rdlock(&lock); + } + + inline int write_lock() + { + if (!inited) + return TRUE; + return rw_wrlock(&lock); + } + + inline int unlock() + { + if (!inited) + return TRUE; + return rw_unlock(&lock); + } + + inline bool is_inited() + { + return inited; + } + + Delegate() + { + inited= FALSE; + if (my_rwlock_init(&lock, NULL)) + return; + init_sql_alloc(&memroot, 1024, 0); + inited= TRUE; + } + ~Delegate() + { + inited= FALSE; + rwlock_destroy(&lock); + free_root(&memroot, MYF(0)); + } + +private: + Observer_info_list observer_info_list; + rw_lock_t lock; + MEM_ROOT memroot; + bool inited; +}; + +class Trans_delegate + :public Delegate { +public: + typedef Trans_observer Observer; + int before_commit(THD *thd, bool all); + int before_rollback(THD *thd, bool all); + int after_commit(THD *thd, bool all); + int after_rollback(THD *thd, bool all); +}; + +class Binlog_storage_delegate + :public Delegate { +public: + typedef Binlog_storage_observer Observer; + int after_flush(THD *thd, const char *log_file, + my_off_t log_pos, bool synced); +}; + +#ifdef HAVE_REPLICATION +class Binlog_transmit_delegate + :public Delegate { +public: + typedef Binlog_transmit_observer Observer; + int transmit_start(THD *thd, ushort flags, + const char *log_file, my_off_t log_pos); + int transmit_stop(THD *thd, ushort flags); + int reserve_header(THD *thd, ushort flags, String *packet); + int before_send_event(THD *thd, ushort flags, + String *packet, const + char *log_file, my_off_t log_pos ); + int after_send_event(THD *thd, ushort flags, + String *packet); + int after_reset_master(THD *thd, ushort flags); +}; + +class Binlog_relay_IO_delegate + :public Delegate { +public: + typedef Binlog_relay_IO_observer Observer; + int thread_start(THD *thd, Master_info *mi); + int thread_stop(THD *thd, Master_info *mi); + int before_request_transmit(THD *thd, Master_info *mi, ushort flags); + int after_read_event(THD *thd, Master_info *mi, + const char *packet, ulong len, + const char **event_buf, ulong *event_len); + int after_queue_event(THD *thd, Master_info *mi, + const char *event_buf, ulong event_len, + bool synced); + int after_reset_slave(THD *thd, Master_info *mi); +private: + void init_param(Binlog_relay_IO_param *param, Master_info *mi); +}; +#endif /* HAVE_REPLICATION */ + +int delegates_init(); +void delegates_destroy(); + +extern Trans_delegate *transaction_delegate; +extern Binlog_storage_delegate *binlog_storage_delegate; +#ifdef HAVE_REPLICATION +extern Binlog_transmit_delegate *binlog_transmit_delegate; +extern Binlog_relay_IO_delegate *binlog_relay_io_delegate; +#endif /* HAVE_REPLICATION */ + +/* + if there is no observers in the delegate, we can return 0 + immediately. +*/ +#define RUN_HOOK(group, hook, args) \ + (group ##_delegate->is_empty() ? \ + 0 : group ##_delegate->hook args) + +#endif /* RPL_HANDLER_H */ |