summaryrefslogtreecommitdiff
path: root/sql/rpl_handler.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/rpl_handler.cc')
-rw-r--r--sql/rpl_handler.cc78
1 files changed, 76 insertions, 2 deletions
diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc
index 733af6c61c8..574fc4938a3 100644
--- a/sql/rpl_handler.cc
+++ b/sql/rpl_handler.cc
@@ -23,6 +23,7 @@
#include "rpl_filter.h"
#include <my_dir.h>
#include "rpl_handler.h"
+#include "sql_prepare.h"
Trans_delegate *transaction_delegate;
Binlog_storage_delegate *binlog_storage_delegate;
@@ -86,6 +87,42 @@ int get_user_var_str(const char *name, char *value,
return 0;
}
+int set_user_var_int(const char *name,
+ long long int value,
+ long long int *old_value)
+{
+ THD* thd= current_thd;
+ bool null_val;
+ user_var_entry *entry=
+ (user_var_entry*) my_hash_search(&thd->user_vars,
+ (uchar*) name, strlen(name));
+ if (entry != NULL)
+ {
+ if (old_value != NULL)
+ *old_value= entry->val_int(&null_val);
+ }
+
+ Ed_connection con(thd);
+
+ char buf[256];
+ int res= snprintf(buf, sizeof(buf), "SET @%s=%lld", name, value);
+ if (/* error */ res < 0 ||
+ /* truncated */ res >= sizeof(buf))
+ {
+ return -1;
+ }
+
+ LEX_STRING str;
+ lex_string_set(&str, buf);
+
+ if (con.execute_direct(str))
+ {
+ return -1;
+ }
+
+ return entry == NULL ? 0 : 1;
+}
+
int delegates_init()
{
static my_aligned_storage<sizeof(Trans_delegate), MY_ALIGNOF(long)> trans_mem;
@@ -240,6 +277,17 @@ int Trans_delegate::after_rollback(THD *thd, bool all)
return ret;
}
+int Trans_delegate::before_commit(THD *thd)
+{
+ int ret= 0, error= 0;
+ Trans_param param;
+ param.flags= 0;
+ param.log_file= 0;
+ param.log_pos= 0;
+ FOREACH_OBSERVER(ret, before_commit, thd, (&param, &error));
+ return error;
+}
+
int Binlog_storage_delegate::after_flush(THD *thd,
const char *log_file,
my_off_t log_pos,
@@ -364,17 +412,19 @@ int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
String *packet,
- const char *log_file,
+ const char *log_file_path,
my_off_t log_pos)
{
Binlog_transmit_param param;
param.flags= flags;
int ret= 0;
+ const char* log_file_name= log_file_path != NULL ?
+ log_file_path + dirname_length(log_file_path) : NULL;
FOREACH_OBSERVER(ret, before_send_event, false,
(&param, (uchar *)packet->c_ptr(),
packet->length(),
- log_file+dirname_length(log_file), log_pos));
+ log_file_name, log_pos));
return ret;
}
@@ -404,6 +454,7 @@ int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
Master_info *mi)
{
+ param->mi = mi;
param->mysql= mi->mysql;
param->user= mi->user;
param->host= mi->host;
@@ -530,6 +581,20 @@ int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void
{
return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p);
}
+
+/* get master log pos for a Master_info struct */
+size_t get_master_log_pos(const Master_info* mi,
+ char *filename_buf, my_off_t *filepos)
+{
+ mysql_mutex_t *mutex= &mi->rli.data_lock;
+
+ mysql_mutex_lock(mutex);
+ *filepos= mi->rli.group_master_log_pos;
+ strncpy(filename_buf, mi->rli.group_master_log_name, FN_REFLEN);
+ mysql_mutex_unlock(mutex);
+ return strnlen(filename_buf, FN_REFLEN);
+}
+
#else
int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
{
@@ -550,4 +615,13 @@ int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void
{
return 0;
}
+
+size_t get_master_log_pos(const Master_info* mi,
+ char *filename_buf, my_off_t *filepos)
+{
+ *filepos= 0;
+ filename_buf[0]= 0;
+ return 0;
+}
+
#endif /* HAVE_REPLICATION */