summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/item_func.cc49
-rw-r--r--sql/log.cc17
-rw-r--r--sql/log_event.cc240
-rw-r--r--sql/log_event.h50
-rw-r--r--sql/sql_class.cc12
-rw-r--r--sql/sql_class.h13
-rw-r--r--sql/sql_parse.cc6
-rw-r--r--sql/unireg.h2
8 files changed, 385 insertions, 4 deletions
diff --git a/sql/item_func.cc b/sql/item_func.cc
index dcf4638c48a..58f2f6c90a7 100644
--- a/sql/item_func.cc
+++ b/sql/item_func.cc
@@ -1940,6 +1940,7 @@ static user_var_entry *get_variable(HASH *hash, LEX_STRING &name,
entry->value=0;
entry->length=0;
entry->update_query_id=0;
+ entry->used_query_id=current_thd->query_id;
entry->type=STRING_RESULT;
memcpy(entry->name.str, name.str, name.length+1);
if (hash_insert(hash,(byte*) entry))
@@ -2073,7 +2074,7 @@ Item_func_set_user_var::val_str(String *str)
{
String *res=args[0]->val_str(str);
if (!res) // Null value
- update_hash((void*) 0,0,STRING_RESULT, default_charset_info);
+ update_hash((void*) 0, 0, STRING_RESULT, default_charset_info);
else
update_hash(res->c_ptr(),res->length()+1,STRING_RESULT,res->charset());
return res;
@@ -2172,14 +2173,58 @@ longlong Item_func_get_user_var::val_int()
return LL(0); // Impossible
}
+/* From sql_parse.cc */
+extern bool is_update_query(enum enum_sql_command command);
void Item_func_get_user_var::fix_length_and_dec()
{
+ BINLOG_USER_VAR_EVENT *user_var_event;
THD *thd=current_thd;
maybe_null=1;
decimals=NOT_FIXED_DEC;
max_length=MAX_BLOB_WIDTH;
- var_entry= get_variable(&thd->user_vars, name, 0);
+
+ if ((var_entry= get_variable(&thd->user_vars, name, 0)))
+ {
+ if (opt_bin_log && is_update_query(thd->lex.sql_command) &&
+ var_entry->used_query_id != thd->query_id)
+ {
+ /*
+ First we need to store value of var_entry, when the next situation appers:
+ > set @a:=1;
+ > insert into t1 values (@a), (@a:=@a+1), (@a:=@a+1);
+ We have to write to binlog value @a= 1;
+ */
+ uint size= ALIGN_SIZE(sizeof(BINLOG_USER_VAR_EVENT)) + var_entry->length;
+ if (!(user_var_event= (BINLOG_USER_VAR_EVENT *) thd->alloc(size)))
+ goto err;
+
+ user_var_event->value= (char*) user_var_event +
+ ALIGN_SIZE(sizeof(BINLOG_USER_VAR_EVENT));
+ user_var_event->user_var_event= var_entry;
+ user_var_event->type= var_entry->type;
+ user_var_event->charset_number= var_entry->var_charset->number;
+ if (!var_entry->value)
+ {
+ /* NULL value*/
+ user_var_event->length= 0;
+ user_var_event->value= 0;
+ }
+ else
+ {
+ user_var_event->length= var_entry->length;
+ memcpy(user_var_event->value, var_entry->value,
+ var_entry->length);
+ }
+ var_entry->used_query_id= thd->query_id;
+ if (insert_dynamic(&thd->user_var_events, (gptr) &user_var_event))
+ goto err;
+ }
+ }
+ return;
+err:
+ thd->fatal_error= 1;
+ return;
}
diff --git a/sql/log.cc b/sql/log.cc
index 5dcb5857026..b83dfce6fab 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -1097,6 +1097,23 @@ bool MYSQL_LOG::write(Log_event* event_info)
if (e.write(file))
goto err;
}
+ if (thd->user_var_events.elements)
+ {
+ for (uint i= 0; i < thd->user_var_events.elements; i++)
+ {
+ BINLOG_USER_VAR_EVENT *user_var_event;
+ get_dynamic(&thd->user_var_events,(gptr) &user_var_event, i);
+ User_var_log_event e(thd, user_var_event->user_var_event->name.str,
+ user_var_event->user_var_event->name.length,
+ user_var_event->value,
+ user_var_event->length,
+ user_var_event->type,
+ user_var_event->charset_number);
+ e.set_log_pos(this);
+ if (e.write(file))
+ goto err;
+ }
+ }
if (thd->variables.convert_set)
{
char buf[256], *p;
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 8f98fa511a0..b46d3b93d12 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -227,6 +227,7 @@ const char* Log_event::get_type_str()
case DELETE_FILE_EVENT: return "Delete_file";
case EXEC_LOAD_EVENT: return "Exec_load";
case RAND_EVENT: return "RAND";
+ case USER_VAR_EVENT: return "User var";
default: /* impossible */ return "Unknown";
}
}
@@ -593,6 +594,9 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
case RAND_EVENT:
ev = new Rand_log_event(buf, old_format);
break;
+ case USER_VAR_EVENT:
+ ev = new User_var_log_event(buf, old_format);
+ break;
default:
break;
}
@@ -1894,6 +1898,242 @@ int Rand_log_event::exec_event(struct st_relay_log_info* rli)
}
#endif // !MYSQL_CLIENT
+/*****************************************************************************
+ *****************************************************************************
+
+ User_var_log_event methods
+
+ *****************************************************************************
+ ****************************************************************************/
+
+/*****************************************************************************
+
+ User_var_log_event::pack_info()
+
+ ****************************************************************************/
+#ifndef MYSQL_CLIENT
+void User_var_log_event::pack_info(Protocol* protocol)
+{
+ char *buf= 0;
+ uint val_offset= 2 + name_len;
+ uint event_len= val_offset;
+
+ if (is_null)
+ {
+ buf= my_malloc(val_offset + 5, MYF(MY_WME));
+ strmov(buf + val_offset, "NULL");
+ event_len= val_offset + 4;
+ }
+ else
+ {
+ switch (type) {
+ case REAL_RESULT:
+ double real_val;
+ float8get(real_val, val);
+ buf= my_malloc(val_offset + FLOATING_POINT_BUFFER, MYF(MY_WME));
+ event_len += my_sprintf(buf + val_offset,
+ (buf + val_offset, "%.14g", real_val));
+ break;
+ case INT_RESULT:
+ buf= my_malloc(val_offset + 22, MYF(MY_WME));
+ event_len= longlong10_to_str(uint8korr(val), buf + val_offset,-10)-buf;
+ break;
+ case STRING_RESULT:
+ /*
+ This is correct as pack_info is used for SHOW BINLOG command
+ only. But be carefull this is may be incorrect in other cases as
+ string may contain \ and '.
+ */
+ buf= my_malloc(val_offset + 2 + val_len, MYF(MY_WME));
+ buf[val_offset]= '\'';
+ memcpy(buf + val_offset + 1, val, val_len);
+ buf[val_offset + val_len]= '\'';
+ event_len= val_offset + 1 + val_len;
+ break;
+ case ROW_RESULT:
+ DBUG_ASSERT(1);
+ return;
+ }
+ }
+ buf[0]= '@';
+ buf[1+name_len]= '=';
+ memcpy(buf+1, name, name_len);
+ protocol->store(buf, event_len);
+ my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
+}
+#endif // !MYSQL_CLIENT
+/*****************************************************************************
+
+ User_var_log_event::User_var_log_event()
+
+ ****************************************************************************/
+User_var_log_event::User_var_log_event(const char* buf, bool old_format)
+ :Log_event(buf, old_format)
+{
+ buf+= (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;
+ name_len= uint4korr(buf);
+ name= (char *) buf + UV_NAME_LEN_SIZE;
+ is_null= buf[UV_NAME_LEN_SIZE + name_len];
+ if (is_null)
+ {
+ type= STRING_RESULT;
+ val_len= 0;
+ val= 0;
+ }
+ else
+ {
+ type= (Item_result) buf[UV_VAL_IS_NULL + UV_NAME_LEN_SIZE + name_len];
+ charset_number= uint4korr(buf + UV_NAME_LEN_SIZE + name_len +
+ UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE);
+ val_len= uint4korr(buf + UV_NAME_LEN_SIZE + name_len +
+ UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
+ UV_CHARSET_NUMBER_SIZE);
+ val= (char *) buf + UV_NAME_LEN_SIZE + name_len + UV_VAL_IS_NULL +
+ UV_VAL_TYPE_SIZE + UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE;
+ }
+}
+
+/*****************************************************************************
+
+ User_var_log_event::write_data()
+
+ ****************************************************************************/
+int User_var_log_event::write_data(IO_CACHE* file)
+{
+ char buf[UV_NAME_LEN_SIZE];
+ char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
+ UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE];
+ char buf2[8];
+ char *pos= buf2;
+ int4store(buf, name_len);
+ buf1[0]= is_null;
+ if (!is_null)
+ {
+ buf1[1]= type;
+ int4store(buf1 + 2, charset_number);
+ int4store(buf1 + 2 + UV_CHARSET_NUMBER_SIZE, val_len);
+
+ switch (type) {
+ case REAL_RESULT:
+ float8store(buf2, *(double*) val);
+ break;
+ case INT_RESULT:
+ int8store(buf2, *(longlong*) val);
+ break;
+ case STRING_RESULT:
+ pos= val;
+ break;
+ case ROW_RESULT:
+ DBUG_ASSERT(1);
+ return 0;
+ }
+ return (my_b_safe_write(file, (byte*) buf, sizeof(buf)) ||
+ my_b_safe_write(file, (byte*) name, name_len) ||
+ my_b_safe_write(file, (byte*) buf1, sizeof(buf1)) ||
+ my_b_safe_write(file, (byte*) pos, val_len));
+ }
+
+ return (my_b_safe_write(file, (byte*) buf, sizeof(buf)) ||
+ my_b_safe_write(file, (byte*) name, name_len) ||
+ my_b_safe_write(file, (byte*) buf1, 1));
+}
+
+/*****************************************************************************
+
+ User_var_log_event::print()
+
+ ****************************************************************************/
+#ifdef MYSQL_CLIENT
+void User_var_log_event::print(FILE* file, bool short_form, char* last_db)
+{
+ if (!short_form)
+ {
+ print_header(file);
+ fprintf(file, "\tUser_var\n");
+ }
+
+ fprintf(file, "SET @");
+ my_fwrite(file, (byte*) name, (uint) (name_len), MYF(MY_NABP | MY_WME));
+
+ if (is_null)
+ {
+ fprintf(file, ":=NULL;\n");
+ }
+ else
+ {
+ switch (type) {
+ case REAL_RESULT:
+ double real_val;
+ float8get(real_val, val);
+ fprintf(file, ":=%.14g;\n", real_val);
+ break;
+ case INT_RESULT:
+ char int_buf[22];
+ longlong10_to_str(uint8korr(val), int_buf, -10);
+ fprintf(file, ":=%s;\n", int_buf);
+ break;
+ case STRING_RESULT:
+ fprintf(file, ":='%s';\n", val);
+ break;
+ case ROW_RESULT:
+ DBUG_ASSERT(1);
+ return;
+ }
+ }
+ fflush(file);
+}
+#endif
+
+/*****************************************************************************
+
+ User_var_log_event::exec_event()
+
+ ****************************************************************************/
+#ifndef MYSQL_CLIENT
+int User_var_log_event::exec_event(struct st_relay_log_info* rli)
+{
+ Item *it= 0;
+ CHARSET_INFO *charset= log_cs;
+ LEX_STRING user_var_name;
+ user_var_name.str= name;
+ user_var_name.length= name_len;
+
+ if (type != ROW_RESULT)
+ init_sql_alloc(&thd->mem_root, 8192,0);
+
+ if (is_null)
+ {
+ it= new Item_null();
+ }
+ else
+ {
+ switch (type) {
+ case REAL_RESULT:
+ double real_val;
+ float8get(real_val, val);
+ it= new Item_real(real_val);
+ break;
+ case INT_RESULT:
+ it= new Item_int((longlong) uint8korr(val));
+ break;
+ case STRING_RESULT:
+ it= new Item_string(val, val_len, charset);
+ break;
+ case ROW_RESULT:
+ DBUG_ASSERT(1);
+ return 0;
+ }
+ charset= get_charset(charset_number, MYF(0));
+ }
+ Item_func_set_user_var e(user_var_name, it);
+ e.fix_fields(thd, 0, 0);
+ e.update_hash(val, val_len, type, charset);
+ free_root(&thd->mem_root,0);
+
+ rli->inc_pending(get_event_len());
+ return 0;
+}
+#endif // !MYSQL_CLIENT
/*****************************************************************************
*****************************************************************************
diff --git a/sql/log_event.h b/sql/log_event.h
index ec3b4819e74..f7a5dbdc406 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -173,6 +173,14 @@ struct sql_ex_info
#define RAND_SEED1_OFFSET 0
#define RAND_SEED2_OFFSET 8
+/* User_var event post-header */
+
+#define UV_VAL_LEN_SIZE 4
+#define UV_VAL_IS_NULL 1
+#define UV_VAL_TYPE_SIZE 1
+#define UV_NAME_LEN_SIZE 4
+#define UV_CHARSET_NUMBER_SIZE 4
+
/* Load event post-header */
#define L_THREAD_ID_OFFSET 0
@@ -222,7 +230,7 @@ enum Log_event_type
START_EVENT = 1, QUERY_EVENT =2, STOP_EVENT=3, ROTATE_EVENT = 4,
INTVAR_EVENT=5, LOAD_EVENT=6, SLAVE_EVENT=7, CREATE_FILE_EVENT=8,
APPEND_BLOCK_EVENT=9, EXEC_LOAD_EVENT=10, DELETE_FILE_EVENT=11,
- NEW_LOAD_EVENT=12, RAND_EVENT=13
+ NEW_LOAD_EVENT=12, RAND_EVENT=13, USER_VAR_EVENT=14
};
enum Int_event_type
@@ -590,6 +598,46 @@ class Rand_log_event: public Log_event
bool is_valid() { return 1; }
};
+/*****************************************************************************
+
+ User var Log Event class
+
+ ****************************************************************************/
+class User_var_log_event: public Log_event
+{
+public:
+ char *name;
+ uint name_len;
+ char *val;
+ ulong val_len;
+ Item_result type;
+ uint charset_number;
+ byte is_null;
+#ifndef MYSQL_CLIENT
+ User_var_log_event(THD* thd_arg, char *name_arg, uint name_len_arg,
+ char *val_arg, ulong val_len_arg, Item_result type_arg,
+ uint charset_number_arg)
+ :Log_event(), name(name_arg), name_len(name_len_arg), val(val_arg),
+ val_len(val_len_arg), type(type_arg), charset_number(charset_number_arg)
+ { is_null= !val; }
+ void pack_info(Protocol* protocol);
+ int exec_event(struct st_relay_log_info* rli);
+#else
+ void print(FILE* file, bool short_form = 0, char* last_db = 0);
+#endif
+
+ User_var_log_event(const char* buf, bool old_format);
+ ~User_var_log_event() {}
+ Log_event_type get_type_code() { return USER_VAR_EVENT;}
+ int get_data_size()
+ {
+ return (is_null ? UV_NAME_LEN_SIZE + name_len + UV_VAL_IS_NULL :
+ UV_NAME_LEN_SIZE + name_len + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
+ UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE + val_len);
+ }
+ int write_data(IO_CACHE* file);
+ bool is_valid() { return 1; }
+};
/*****************************************************************************
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 3ca1f4827ff..8a02d91f2db 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -145,6 +145,17 @@ THD::THD():user_time(0), fatal_error(0),
(hash_get_key) get_var_key,
(hash_free_key) free_user_var,0);
+ /* For user vars replication*/
+ if (opt_bin_log)
+ my_init_dynamic_array(&user_var_events,
+ sizeof(BINLOG_USER_VAR_EVENT *),
+ 16,
+ 16);
+ else
+ bzero((char*) &user_var_events, sizeof(user_var_events));
+
+
+
/* Prepared statements */
last_prepared_stmt= 0;
init_tree(&prepared_statements, 0, 0, sizeof(PREP_STMT),
@@ -244,6 +255,7 @@ void THD::cleanup(void)
close_thread_tables(this);
}
close_temporary_tables(this);
+ delete_dynamic(&user_var_events);
hash_free(&user_vars);
if (global_read_lock)
unlock_global_read_lock(this);
diff --git a/sql/sql_class.h b/sql/sql_class.h
index bc72e6324e9..5b50da1ebac 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -57,6 +57,15 @@ typedef struct st_log_info
~st_log_info() { pthread_mutex_destroy(&lock);}
} LOG_INFO;
+typedef struct st_user_var_events
+{
+ user_var_entry *user_var_event;
+ char *value;
+ ulong length;
+ Item_result type;
+ uint charset_number;
+} BINLOG_USER_VAR_EVENT;
+
class Log_event;
class MYSQL_LOG {
@@ -511,6 +520,8 @@ public:
uint check_loops_counter; //last id used to check loops
/* variables.transaction_isolation is reset to this after each commit */
enum_tx_isolation session_tx_isolation;
+ /* for user variables replication*/
+ DYNAMIC_ARRAY user_var_events;
// extend scramble to handle new auth
char scramble[SCRAMBLE41_LENGTH+1];
// old scramble is needed to handle old clients
@@ -896,7 +907,7 @@ class user_var_entry
public:
LEX_STRING name;
char *value;
- ulong length, update_query_id;
+ ulong length, update_query_id, used_query_id;
Item_result type;
CHARSET_INFO *var_charset;
};
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index b2f08b4e0d4..e748a3acae9 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -390,6 +390,10 @@ void init_update_queries(void)
uc_update_queries[SQLCOM_UPDATE_MULTI]=1;
}
+bool is_update_query(enum enum_sql_command command)
+{
+ return uc_update_queries[command];
+}
/*
Check if maximum queries per hour limit has been reached
@@ -3077,6 +3081,8 @@ mysql_init_query(THD *thd)
thd->sent_row_count= thd->examined_row_count= 0;
thd->fatal_error= thd->rand_used= 0;
thd->possible_loops= 0;
+ if (opt_bin_log)
+ reset_dynamic(&thd->user_var_events);
DBUG_VOID_RETURN;
}
diff --git a/sql/unireg.h b/sql/unireg.h
index 724ff3f6197..2dc84720b2d 100644
--- a/sql/unireg.h
+++ b/sql/unireg.h
@@ -136,6 +136,8 @@ bfill((A)->null_flags,(A)->null_bytes,255);\
#define BIN_LOG_HEADER_SIZE 4
+#define FLOATING_POINT_BUFFER 331
+
/* Include prototypes for unireg */
#include "mysqld_error.h"