/* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include "sql_priv.h" #include "unireg.h" #include "rpl_mi.h" #include "sql_repl.h" #include "log_event.h" #include "rpl_filter.h" #include #include "rpl_handler.h" Trans_delegate *transaction_delegate; Binlog_storage_delegate *binlog_storage_delegate; #ifdef HAVE_REPLICATION Binlog_transmit_delegate *binlog_transmit_delegate; Binlog_relay_IO_delegate *binlog_relay_io_delegate; #endif /* HAVE_REPLICATION */ /* structure to save transaction log filename and position */ typedef struct Trans_binlog_info { my_off_t log_pos; char log_file[FN_REFLEN]; } Trans_binlog_info; static pthread_key(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); int get_user_var_int(const char *name, long long int *value, int *null_value) { bool null_val; user_var_entry *entry= (user_var_entry*) my_hash_search(¤t_thd->user_vars, (uchar*) name, strlen(name)); if (!entry) return 1; *value= entry->val_int(&null_val); if (null_value) *null_value= null_val; return 0; } int get_user_var_real(const char *name, double *value, int *null_value) { bool null_val; user_var_entry *entry= (user_var_entry*) my_hash_search(¤t_thd->user_vars, (uchar*) name, strlen(name)); if (!entry) return 1; *value= entry->val_real(&null_val); if (null_value) *null_value= null_val; return 0; } int get_user_var_str(const char *name, char *value, size_t len, unsigned int precision, int *null_value) { String str; bool null_val; user_var_entry *entry= (user_var_entry*) my_hash_search(¤t_thd->user_vars, (uchar*) name, strlen(name)); if (!entry) return 1; entry->val_str(&null_val, &str, precision); strncpy(value, str.c_ptr(), len); if (null_value) *null_value= null_val; return 0; } int delegates_init() { static my_aligned_storage trans_mem; static my_aligned_storage storage_mem; #ifdef HAVE_REPLICATION static my_aligned_storage transmit_mem; static my_aligned_storage relay_io_mem; #endif void *place_trans_mem= trans_mem.data; void *place_storage_mem= storage_mem.data; transaction_delegate= new (place_trans_mem) Trans_delegate; if (!transaction_delegate->is_inited()) { sql_print_error("Initialization of transaction delegates failed. " "Please report a bug."); return 1; } binlog_storage_delegate= new (place_storage_mem) Binlog_storage_delegate; if (!binlog_storage_delegate->is_inited()) { sql_print_error("Initialization binlog storage delegates failed. " "Please report a bug."); return 1; } #ifdef HAVE_REPLICATION void *place_transmit_mem= transmit_mem.data; void *place_relay_io_mem= relay_io_mem.data; binlog_transmit_delegate= new (place_transmit_mem) Binlog_transmit_delegate; if (!binlog_transmit_delegate->is_inited()) { sql_print_error("Initialization of binlog transmit delegates failed. " "Please report a bug."); return 1; } binlog_relay_io_delegate= new (place_relay_io_mem) Binlog_relay_IO_delegate; if (!binlog_relay_io_delegate->is_inited()) { sql_print_error("Initialization binlog relay IO delegates failed. " "Please report a bug."); return 1; } #endif if (pthread_key_create(&RPL_TRANS_BINLOG_INFO, NULL)) { sql_print_error("Error while creating pthread specific data key for replication. " "Please report a bug."); return 1; } return 0; } void delegates_destroy() { if (transaction_delegate) transaction_delegate->~Trans_delegate(); if (binlog_storage_delegate) binlog_storage_delegate->~Binlog_storage_delegate(); #ifdef HAVE_REPLICATION if (binlog_transmit_delegate) binlog_transmit_delegate->~Binlog_transmit_delegate(); if (binlog_relay_io_delegate) binlog_relay_io_delegate->~Binlog_relay_IO_delegate(); #endif /* HAVE_REPLICATION */ } /* This macro is used by almost all the Delegate methods to iterate over all the observers running given callback function of the delegate. */ #define FOREACH_OBSERVER(r, f, do_lock, args) \ param.server_id= thd->variables.server_id; \ read_lock(); \ Observer_info_iterator iter= observer_info_iter(); \ Observer_info *info= iter++; \ for (; info; info= iter++) \ { \ if (do_lock) plugin_lock(thd, plugin_int_to_ref(info->plugin_int)); \ if (((Observer *)info->observer)->f \ && ((Observer *)info->observer)->f args) \ { \ r= 1; \ sql_print_error("Run function '" #f "' in plugin '%s' failed", \ info->plugin_int->name.str); \ break; \ } \ } \ unlock(); int Trans_delegate::after_commit(THD *thd, bool all) { Trans_param param; bool is_real_trans= (all || thd->transaction.all.ha_list == 0); param.flags = is_real_trans ? TRANS_IS_REAL_TRANS : 0; Trans_binlog_info *log_info= my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); param.log_file= log_info ? log_info->log_file : 0; param.log_pos= log_info ? log_info->log_pos : 0; int ret= 0; FOREACH_OBSERVER(ret, after_commit, false, (¶m)); /* This is the end of a real transaction or autocommit statement, we can free the memory allocated for binlog file and position. */ if (is_real_trans && log_info) { my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL); my_free(log_info); } return ret; } int Trans_delegate::after_rollback(THD *thd, bool all) { Trans_param param; bool is_real_trans= (all || thd->transaction.all.ha_list == 0); param.flags = is_real_trans ? TRANS_IS_REAL_TRANS : 0; Trans_binlog_info *log_info= my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); param.log_file= log_info ? log_info->log_file : 0; param.log_pos= log_info ? log_info->log_pos : 0; int ret= 0; FOREACH_OBSERVER(ret, after_rollback, false, (¶m)); /* This is the end of a real transaction or autocommit statement, we can free the memory allocated for binlog file and position. */ if (is_real_trans && log_info) { my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL); my_free(log_info); } return ret; } int Binlog_storage_delegate::after_flush(THD *thd, const char *log_file, my_off_t log_pos, bool synced, bool first_in_group, bool last_in_group) { Binlog_storage_param param; uint32 flags=0; if (synced) flags |= BINLOG_STORAGE_IS_SYNCED; if (first_in_group) flags|= BINLOG_GROUP_COMMIT_LEADER; if (last_in_group) flags|= BINLOG_GROUP_COMMIT_TRAILER; Trans_binlog_info *log_info= my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); if (!log_info) { if(!(log_info= (Trans_binlog_info *)my_malloc(sizeof(Trans_binlog_info), MYF(0)))) return 1; my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, log_info); } strcpy(log_info->log_file, log_file+dirname_length(log_file)); log_info->log_pos = log_pos; int ret= 0; FOREACH_OBSERVER(ret, after_flush, false, (¶m, log_info->log_file, log_info->log_pos, flags)); return ret; } int Binlog_storage_delegate::after_sync(THD *thd, const char *log_file, my_off_t log_pos, bool first_in_group, bool last_in_group) { Binlog_storage_param param; uint32 flags=0; if (first_in_group) flags|= BINLOG_GROUP_COMMIT_LEADER; if (last_in_group) flags|= BINLOG_GROUP_COMMIT_TRAILER; int ret= 0; FOREACH_OBSERVER(ret, after_sync, false, (¶m, log_file+dirname_length(log_file), log_pos, flags)); return ret; } #ifdef HAVE_REPLICATION int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags, const char *log_file, my_off_t log_pos) { Binlog_transmit_param param; param.flags= flags; int ret= 0; FOREACH_OBSERVER(ret, transmit_start, true, (¶m, log_file, log_pos)); return ret; } int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags) { Binlog_transmit_param param; param.flags= flags; int ret= 0; FOREACH_OBSERVER(ret, transmit_stop, false, (¶m)); return ret; } int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags, String *packet) { /* NOTE2ME: Maximum extra header size for each observer, I hope 32 bytes should be enough for each Observer to reserve their extra header. If later found this is not enough, we can increase this /HEZX */ #define RESERVE_HEADER_SIZE 32 unsigned char header[RESERVE_HEADER_SIZE]; ulong hlen; Binlog_transmit_param param; param.flags= flags; param.server_id= thd->variables.server_id; int ret= 0; read_lock(); Observer_info_iterator iter= observer_info_iter(); Observer_info *info= iter++; for (; info; info= iter++) { hlen= 0; if (((Observer *)info->observer)->reserve_header && ((Observer *)info->observer)->reserve_header(¶m, header, RESERVE_HEADER_SIZE, &hlen)) { ret= 1; break; } if (hlen == 0) continue; if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen)) { ret= 1; break; } } unlock(); return ret; } int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags, String *packet, const char *log_file, my_off_t log_pos) { Binlog_transmit_param param; param.flags= flags; int ret= 0; FOREACH_OBSERVER(ret, before_send_event, false, (¶m, (uchar *)packet->c_ptr(), packet->length(), log_file+dirname_length(log_file), log_pos)); return ret; } int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags, String *packet) { Binlog_transmit_param param; param.flags= flags; int ret= 0; FOREACH_OBSERVER(ret, after_send_event, false, (¶m, packet->c_ptr(), packet->length())); return ret; } int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags) { Binlog_transmit_param param; param.flags= flags; int ret= 0; FOREACH_OBSERVER(ret, after_reset_master, false, (¶m)); return ret; } void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param, Master_info *mi) { param->mysql= mi->mysql; param->user= mi->user; param->host= mi->host; param->port= mi->port; param->master_log_name= mi->master_log_name; param->master_log_pos= mi->master_log_pos; } int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi) { Binlog_relay_IO_param param; init_param(¶m, mi); int ret= 0; FOREACH_OBSERVER(ret, thread_start, true, (¶m)); return ret; } int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi) { Binlog_relay_IO_param param; init_param(¶m, mi); int ret= 0; FOREACH_OBSERVER(ret, thread_stop, false, (¶m)); return ret; } int Binlog_relay_IO_delegate::before_request_transmit(THD *thd, Master_info *mi, ushort flags) { Binlog_relay_IO_param param; init_param(¶m, mi); int ret= 0; FOREACH_OBSERVER(ret, before_request_transmit, false, (¶m, (uint32)flags)); return ret; } int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi, const char *packet, ulong len, const char **event_buf, ulong *event_len) { Binlog_relay_IO_param param; init_param(¶m, mi); int ret= 0; FOREACH_OBSERVER(ret, after_read_event, false, (¶m, packet, len, event_buf, event_len)); return ret; } int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi, const char *event_buf, ulong event_len, bool synced) { Binlog_relay_IO_param param; init_param(¶m, mi); uint32 flags=0; if (synced) flags |= BINLOG_STORAGE_IS_SYNCED; int ret= 0; FOREACH_OBSERVER(ret, after_queue_event, false, (¶m, event_buf, event_len, flags)); return ret; } int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi) { Binlog_relay_IO_param param; init_param(¶m, mi); int ret= 0; FOREACH_OBSERVER(ret, after_reset_slave, false, (¶m)); return ret; } #endif /* HAVE_REPLICATION */ int register_trans_observer(Trans_observer *observer, void *p) { return transaction_delegate->add_observer(observer, (st_plugin_int *)p); } int unregister_trans_observer(Trans_observer *observer, void *p) { return transaction_delegate->remove_observer(observer, (st_plugin_int *)p); } int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p) { return binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p); } int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p) { return binlog_storage_delegate->remove_observer(observer, (st_plugin_int *)p); } #ifdef HAVE_REPLICATION int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p) { return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p); } int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p) { return binlog_transmit_delegate->remove_observer(observer, (st_plugin_int *)p); } int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p) { return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p); } int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p) { return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p); } #else int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p) { return 0; } int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p) { return 0; } int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p) { return 0; } int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p) { return 0; } #endif /* HAVE_REPLICATION */