diff options
Diffstat (limited to 'sql/rpl_handler.cc')
-rw-r--r-- | sql/rpl_handler.cc | 78 |
1 files changed, 76 insertions, 2 deletions
diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc index 733af6c61c8..574fc4938a3 100644 --- a/sql/rpl_handler.cc +++ b/sql/rpl_handler.cc @@ -23,6 +23,7 @@ #include "rpl_filter.h" #include <my_dir.h> #include "rpl_handler.h" +#include "sql_prepare.h" Trans_delegate *transaction_delegate; Binlog_storage_delegate *binlog_storage_delegate; @@ -86,6 +87,42 @@ int get_user_var_str(const char *name, char *value, return 0; } +int set_user_var_int(const char *name, + long long int value, + long long int *old_value) +{ + THD* thd= current_thd; + bool null_val; + user_var_entry *entry= + (user_var_entry*) my_hash_search(&thd->user_vars, + (uchar*) name, strlen(name)); + if (entry != NULL) + { + if (old_value != NULL) + *old_value= entry->val_int(&null_val); + } + + Ed_connection con(thd); + + char buf[256]; + int res= snprintf(buf, sizeof(buf), "SET @%s=%lld", name, value); + if (/* error */ res < 0 || + /* truncated */ res >= sizeof(buf)) + { + return -1; + } + + LEX_STRING str; + lex_string_set(&str, buf); + + if (con.execute_direct(str)) + { + return -1; + } + + return entry == NULL ? 0 : 1; +} + int delegates_init() { static my_aligned_storage<sizeof(Trans_delegate), MY_ALIGNOF(long)> trans_mem; @@ -240,6 +277,17 @@ int Trans_delegate::after_rollback(THD *thd, bool all) return ret; } +int Trans_delegate::before_commit(THD *thd) +{ + int ret= 0, error= 0; + Trans_param param; + param.flags= 0; + param.log_file= 0; + param.log_pos= 0; + FOREACH_OBSERVER(ret, before_commit, thd, (¶m, &error)); + return error; +} + int Binlog_storage_delegate::after_flush(THD *thd, const char *log_file, my_off_t log_pos, @@ -364,17 +412,19 @@ int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags, int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags, String *packet, - const char *log_file, + const char *log_file_path, my_off_t log_pos) { Binlog_transmit_param param; param.flags= flags; int ret= 0; + const char* log_file_name= log_file_path != NULL ? + log_file_path + dirname_length(log_file_path) : NULL; FOREACH_OBSERVER(ret, before_send_event, false, (¶m, (uchar *)packet->c_ptr(), packet->length(), - log_file+dirname_length(log_file), log_pos)); + log_file_name, log_pos)); return ret; } @@ -404,6 +454,7 @@ int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags) void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param, Master_info *mi) { + param->mi = mi; param->mysql= mi->mysql; param->user= mi->user; param->host= mi->host; @@ -530,6 +581,20 @@ int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void { return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p); } + +/* get master log pos for a Master_info struct */ +size_t get_master_log_pos(const Master_info* mi, + char *filename_buf, my_off_t *filepos) +{ + mysql_mutex_t *mutex= &mi->rli.data_lock; + + mysql_mutex_lock(mutex); + *filepos= mi->rli.group_master_log_pos; + strncpy(filename_buf, mi->rli.group_master_log_name, FN_REFLEN); + mysql_mutex_unlock(mutex); + return strnlen(filename_buf, FN_REFLEN); +} + #else int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p) { @@ -550,4 +615,13 @@ int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void { return 0; } + +size_t get_master_log_pos(const Master_info* mi, + char *filename_buf, my_off_t *filepos) +{ + *filepos= 0; + filename_buf[0]= 0; + return 0; +} + #endif /* HAVE_REPLICATION */ |