diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/item_func.cc | 49 | ||||
-rw-r--r-- | sql/log.cc | 17 | ||||
-rw-r--r-- | sql/log_event.cc | 240 | ||||
-rw-r--r-- | sql/log_event.h | 50 | ||||
-rw-r--r-- | sql/sql_class.cc | 12 | ||||
-rw-r--r-- | sql/sql_class.h | 13 | ||||
-rw-r--r-- | sql/sql_parse.cc | 6 | ||||
-rw-r--r-- | sql/unireg.h | 2 |
8 files changed, 385 insertions, 4 deletions
diff --git a/sql/item_func.cc b/sql/item_func.cc index dcf4638c48a..58f2f6c90a7 100644 --- a/sql/item_func.cc +++ b/sql/item_func.cc @@ -1940,6 +1940,7 @@ static user_var_entry *get_variable(HASH *hash, LEX_STRING &name, entry->value=0; entry->length=0; entry->update_query_id=0; + entry->used_query_id=current_thd->query_id; entry->type=STRING_RESULT; memcpy(entry->name.str, name.str, name.length+1); if (hash_insert(hash,(byte*) entry)) @@ -2073,7 +2074,7 @@ Item_func_set_user_var::val_str(String *str) { String *res=args[0]->val_str(str); if (!res) // Null value - update_hash((void*) 0,0,STRING_RESULT, default_charset_info); + update_hash((void*) 0, 0, STRING_RESULT, default_charset_info); else update_hash(res->c_ptr(),res->length()+1,STRING_RESULT,res->charset()); return res; @@ -2172,14 +2173,58 @@ longlong Item_func_get_user_var::val_int() return LL(0); // Impossible } +/* From sql_parse.cc */ +extern bool is_update_query(enum enum_sql_command command); void Item_func_get_user_var::fix_length_and_dec() { + BINLOG_USER_VAR_EVENT *user_var_event; THD *thd=current_thd; maybe_null=1; decimals=NOT_FIXED_DEC; max_length=MAX_BLOB_WIDTH; - var_entry= get_variable(&thd->user_vars, name, 0); + + if ((var_entry= get_variable(&thd->user_vars, name, 0))) + { + if (opt_bin_log && is_update_query(thd->lex.sql_command) && + var_entry->used_query_id != thd->query_id) + { + /* + First we need to store value of var_entry, when the next situation appers: + > set @a:=1; + > insert into t1 values (@a), (@a:=@a+1), (@a:=@a+1); + We have to write to binlog value @a= 1; + */ + uint size= ALIGN_SIZE(sizeof(BINLOG_USER_VAR_EVENT)) + var_entry->length; + if (!(user_var_event= (BINLOG_USER_VAR_EVENT *) thd->alloc(size))) + goto err; + + user_var_event->value= (char*) user_var_event + + ALIGN_SIZE(sizeof(BINLOG_USER_VAR_EVENT)); + user_var_event->user_var_event= var_entry; + user_var_event->type= var_entry->type; + user_var_event->charset_number= var_entry->var_charset->number; + if (!var_entry->value) + { + /* NULL value*/ + user_var_event->length= 0; + user_var_event->value= 0; + } + else + { + user_var_event->length= var_entry->length; + memcpy(user_var_event->value, var_entry->value, + var_entry->length); + } + var_entry->used_query_id= thd->query_id; + if (insert_dynamic(&thd->user_var_events, (gptr) &user_var_event)) + goto err; + } + } + return; +err: + thd->fatal_error= 1; + return; } diff --git a/sql/log.cc b/sql/log.cc index 5dcb5857026..b83dfce6fab 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -1097,6 +1097,23 @@ bool MYSQL_LOG::write(Log_event* event_info) if (e.write(file)) goto err; } + if (thd->user_var_events.elements) + { + for (uint i= 0; i < thd->user_var_events.elements; i++) + { + BINLOG_USER_VAR_EVENT *user_var_event; + get_dynamic(&thd->user_var_events,(gptr) &user_var_event, i); + User_var_log_event e(thd, user_var_event->user_var_event->name.str, + user_var_event->user_var_event->name.length, + user_var_event->value, + user_var_event->length, + user_var_event->type, + user_var_event->charset_number); + e.set_log_pos(this); + if (e.write(file)) + goto err; + } + } if (thd->variables.convert_set) { char buf[256], *p; diff --git a/sql/log_event.cc b/sql/log_event.cc index 8f98fa511a0..b46d3b93d12 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -227,6 +227,7 @@ const char* Log_event::get_type_str() case DELETE_FILE_EVENT: return "Delete_file"; case EXEC_LOAD_EVENT: return "Exec_load"; case RAND_EVENT: return "RAND"; + case USER_VAR_EVENT: return "User var"; default: /* impossible */ return "Unknown"; } } @@ -593,6 +594,9 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len, case RAND_EVENT: ev = new Rand_log_event(buf, old_format); break; + case USER_VAR_EVENT: + ev = new User_var_log_event(buf, old_format); + break; default: break; } @@ -1894,6 +1898,242 @@ int Rand_log_event::exec_event(struct st_relay_log_info* rli) } #endif // !MYSQL_CLIENT +/***************************************************************************** + ***************************************************************************** + + User_var_log_event methods + + ***************************************************************************** + ****************************************************************************/ + +/***************************************************************************** + + User_var_log_event::pack_info() + + ****************************************************************************/ +#ifndef MYSQL_CLIENT +void User_var_log_event::pack_info(Protocol* protocol) +{ + char *buf= 0; + uint val_offset= 2 + name_len; + uint event_len= val_offset; + + if (is_null) + { + buf= my_malloc(val_offset + 5, MYF(MY_WME)); + strmov(buf + val_offset, "NULL"); + event_len= val_offset + 4; + } + else + { + switch (type) { + case REAL_RESULT: + double real_val; + float8get(real_val, val); + buf= my_malloc(val_offset + FLOATING_POINT_BUFFER, MYF(MY_WME)); + event_len += my_sprintf(buf + val_offset, + (buf + val_offset, "%.14g", real_val)); + break; + case INT_RESULT: + buf= my_malloc(val_offset + 22, MYF(MY_WME)); + event_len= longlong10_to_str(uint8korr(val), buf + val_offset,-10)-buf; + break; + case STRING_RESULT: + /* + This is correct as pack_info is used for SHOW BINLOG command + only. But be carefull this is may be incorrect in other cases as + string may contain \ and '. + */ + buf= my_malloc(val_offset + 2 + val_len, MYF(MY_WME)); + buf[val_offset]= '\''; + memcpy(buf + val_offset + 1, val, val_len); + buf[val_offset + val_len]= '\''; + event_len= val_offset + 1 + val_len; + break; + case ROW_RESULT: + DBUG_ASSERT(1); + return; + } + } + buf[0]= '@'; + buf[1+name_len]= '='; + memcpy(buf+1, name, name_len); + protocol->store(buf, event_len); + my_free(buf, MYF(MY_ALLOW_ZERO_PTR)); +} +#endif // !MYSQL_CLIENT +/***************************************************************************** + + User_var_log_event::User_var_log_event() + + ****************************************************************************/ +User_var_log_event::User_var_log_event(const char* buf, bool old_format) + :Log_event(buf, old_format) +{ + buf+= (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN; + name_len= uint4korr(buf); + name= (char *) buf + UV_NAME_LEN_SIZE; + is_null= buf[UV_NAME_LEN_SIZE + name_len]; + if (is_null) + { + type= STRING_RESULT; + val_len= 0; + val= 0; + } + else + { + type= (Item_result) buf[UV_VAL_IS_NULL + UV_NAME_LEN_SIZE + name_len]; + charset_number= uint4korr(buf + UV_NAME_LEN_SIZE + name_len + + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE); + val_len= uint4korr(buf + UV_NAME_LEN_SIZE + name_len + + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE + + UV_CHARSET_NUMBER_SIZE); + val= (char *) buf + UV_NAME_LEN_SIZE + name_len + UV_VAL_IS_NULL + + UV_VAL_TYPE_SIZE + UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE; + } +} + +/***************************************************************************** + + User_var_log_event::write_data() + + ****************************************************************************/ +int User_var_log_event::write_data(IO_CACHE* file) +{ + char buf[UV_NAME_LEN_SIZE]; + char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE + + UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE]; + char buf2[8]; + char *pos= buf2; + int4store(buf, name_len); + buf1[0]= is_null; + if (!is_null) + { + buf1[1]= type; + int4store(buf1 + 2, charset_number); + int4store(buf1 + 2 + UV_CHARSET_NUMBER_SIZE, val_len); + + switch (type) { + case REAL_RESULT: + float8store(buf2, *(double*) val); + break; + case INT_RESULT: + int8store(buf2, *(longlong*) val); + break; + case STRING_RESULT: + pos= val; + break; + case ROW_RESULT: + DBUG_ASSERT(1); + return 0; + } + return (my_b_safe_write(file, (byte*) buf, sizeof(buf)) || + my_b_safe_write(file, (byte*) name, name_len) || + my_b_safe_write(file, (byte*) buf1, sizeof(buf1)) || + my_b_safe_write(file, (byte*) pos, val_len)); + } + + return (my_b_safe_write(file, (byte*) buf, sizeof(buf)) || + my_b_safe_write(file, (byte*) name, name_len) || + my_b_safe_write(file, (byte*) buf1, 1)); +} + +/***************************************************************************** + + User_var_log_event::print() + + ****************************************************************************/ +#ifdef MYSQL_CLIENT +void User_var_log_event::print(FILE* file, bool short_form, char* last_db) +{ + if (!short_form) + { + print_header(file); + fprintf(file, "\tUser_var\n"); + } + + fprintf(file, "SET @"); + my_fwrite(file, (byte*) name, (uint) (name_len), MYF(MY_NABP | MY_WME)); + + if (is_null) + { + fprintf(file, ":=NULL;\n"); + } + else + { + switch (type) { + case REAL_RESULT: + double real_val; + float8get(real_val, val); + fprintf(file, ":=%.14g;\n", real_val); + break; + case INT_RESULT: + char int_buf[22]; + longlong10_to_str(uint8korr(val), int_buf, -10); + fprintf(file, ":=%s;\n", int_buf); + break; + case STRING_RESULT: + fprintf(file, ":='%s';\n", val); + break; + case ROW_RESULT: + DBUG_ASSERT(1); + return; + } + } + fflush(file); +} +#endif + +/***************************************************************************** + + User_var_log_event::exec_event() + + ****************************************************************************/ +#ifndef MYSQL_CLIENT +int User_var_log_event::exec_event(struct st_relay_log_info* rli) +{ + Item *it= 0; + CHARSET_INFO *charset= log_cs; + LEX_STRING user_var_name; + user_var_name.str= name; + user_var_name.length= name_len; + + if (type != ROW_RESULT) + init_sql_alloc(&thd->mem_root, 8192,0); + + if (is_null) + { + it= new Item_null(); + } + else + { + switch (type) { + case REAL_RESULT: + double real_val; + float8get(real_val, val); + it= new Item_real(real_val); + break; + case INT_RESULT: + it= new Item_int((longlong) uint8korr(val)); + break; + case STRING_RESULT: + it= new Item_string(val, val_len, charset); + break; + case ROW_RESULT: + DBUG_ASSERT(1); + return 0; + } + charset= get_charset(charset_number, MYF(0)); + } + Item_func_set_user_var e(user_var_name, it); + e.fix_fields(thd, 0, 0); + e.update_hash(val, val_len, type, charset); + free_root(&thd->mem_root,0); + + rli->inc_pending(get_event_len()); + return 0; +} +#endif // !MYSQL_CLIENT /***************************************************************************** ***************************************************************************** diff --git a/sql/log_event.h b/sql/log_event.h index ec3b4819e74..f7a5dbdc406 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -173,6 +173,14 @@ struct sql_ex_info #define RAND_SEED1_OFFSET 0 #define RAND_SEED2_OFFSET 8 +/* User_var event post-header */ + +#define UV_VAL_LEN_SIZE 4 +#define UV_VAL_IS_NULL 1 +#define UV_VAL_TYPE_SIZE 1 +#define UV_NAME_LEN_SIZE 4 +#define UV_CHARSET_NUMBER_SIZE 4 + /* Load event post-header */ #define L_THREAD_ID_OFFSET 0 @@ -222,7 +230,7 @@ enum Log_event_type START_EVENT = 1, QUERY_EVENT =2, STOP_EVENT=3, ROTATE_EVENT = 4, INTVAR_EVENT=5, LOAD_EVENT=6, SLAVE_EVENT=7, CREATE_FILE_EVENT=8, APPEND_BLOCK_EVENT=9, EXEC_LOAD_EVENT=10, DELETE_FILE_EVENT=11, - NEW_LOAD_EVENT=12, RAND_EVENT=13 + NEW_LOAD_EVENT=12, RAND_EVENT=13, USER_VAR_EVENT=14 }; enum Int_event_type @@ -590,6 +598,46 @@ class Rand_log_event: public Log_event bool is_valid() { return 1; } }; +/***************************************************************************** + + User var Log Event class + + ****************************************************************************/ +class User_var_log_event: public Log_event +{ +public: + char *name; + uint name_len; + char *val; + ulong val_len; + Item_result type; + uint charset_number; + byte is_null; +#ifndef MYSQL_CLIENT + User_var_log_event(THD* thd_arg, char *name_arg, uint name_len_arg, + char *val_arg, ulong val_len_arg, Item_result type_arg, + uint charset_number_arg) + :Log_event(), name(name_arg), name_len(name_len_arg), val(val_arg), + val_len(val_len_arg), type(type_arg), charset_number(charset_number_arg) + { is_null= !val; } + void pack_info(Protocol* protocol); + int exec_event(struct st_relay_log_info* rli); +#else + void print(FILE* file, bool short_form = 0, char* last_db = 0); +#endif + + User_var_log_event(const char* buf, bool old_format); + ~User_var_log_event() {} + Log_event_type get_type_code() { return USER_VAR_EVENT;} + int get_data_size() + { + return (is_null ? UV_NAME_LEN_SIZE + name_len + UV_VAL_IS_NULL : + UV_NAME_LEN_SIZE + name_len + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE + + UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE + val_len); + } + int write_data(IO_CACHE* file); + bool is_valid() { return 1; } +}; /***************************************************************************** diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 3ca1f4827ff..8a02d91f2db 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -145,6 +145,17 @@ THD::THD():user_time(0), fatal_error(0), (hash_get_key) get_var_key, (hash_free_key) free_user_var,0); + /* For user vars replication*/ + if (opt_bin_log) + my_init_dynamic_array(&user_var_events, + sizeof(BINLOG_USER_VAR_EVENT *), + 16, + 16); + else + bzero((char*) &user_var_events, sizeof(user_var_events)); + + + /* Prepared statements */ last_prepared_stmt= 0; init_tree(&prepared_statements, 0, 0, sizeof(PREP_STMT), @@ -244,6 +255,7 @@ void THD::cleanup(void) close_thread_tables(this); } close_temporary_tables(this); + delete_dynamic(&user_var_events); hash_free(&user_vars); if (global_read_lock) unlock_global_read_lock(this); diff --git a/sql/sql_class.h b/sql/sql_class.h index bc72e6324e9..5b50da1ebac 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -57,6 +57,15 @@ typedef struct st_log_info ~st_log_info() { pthread_mutex_destroy(&lock);} } LOG_INFO; +typedef struct st_user_var_events +{ + user_var_entry *user_var_event; + char *value; + ulong length; + Item_result type; + uint charset_number; +} BINLOG_USER_VAR_EVENT; + class Log_event; class MYSQL_LOG { @@ -511,6 +520,8 @@ public: uint check_loops_counter; //last id used to check loops /* variables.transaction_isolation is reset to this after each commit */ enum_tx_isolation session_tx_isolation; + /* for user variables replication*/ + DYNAMIC_ARRAY user_var_events; // extend scramble to handle new auth char scramble[SCRAMBLE41_LENGTH+1]; // old scramble is needed to handle old clients @@ -896,7 +907,7 @@ class user_var_entry public: LEX_STRING name; char *value; - ulong length, update_query_id; + ulong length, update_query_id, used_query_id; Item_result type; CHARSET_INFO *var_charset; }; diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index b2f08b4e0d4..e748a3acae9 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -390,6 +390,10 @@ void init_update_queries(void) uc_update_queries[SQLCOM_UPDATE_MULTI]=1; } +bool is_update_query(enum enum_sql_command command) +{ + return uc_update_queries[command]; +} /* Check if maximum queries per hour limit has been reached @@ -3077,6 +3081,8 @@ mysql_init_query(THD *thd) thd->sent_row_count= thd->examined_row_count= 0; thd->fatal_error= thd->rand_used= 0; thd->possible_loops= 0; + if (opt_bin_log) + reset_dynamic(&thd->user_var_events); DBUG_VOID_RETURN; } diff --git a/sql/unireg.h b/sql/unireg.h index 724ff3f6197..2dc84720b2d 100644 --- a/sql/unireg.h +++ b/sql/unireg.h @@ -136,6 +136,8 @@ bfill((A)->null_flags,(A)->null_bytes,255);\ #define BIN_LOG_HEADER_SIZE 4 +#define FLOATING_POINT_BUFFER 331 + /* Include prototypes for unireg */ #include "mysqld_error.h" |