diff options
author | Sergei Golubchik <serg@mariadb.org> | 2015-05-16 08:48:52 +0200 |
---|---|---|
committer | Sergei Golubchik <serg@mariadb.org> | 2015-06-02 18:53:36 +0200 |
commit | 80e61ae21e2373ee73407f91f596b11c2c46e7d9 (patch) | |
tree | 6478f01d7bc067fb7c526056b1544b3567753993 | |
parent | 91dab5ddb6accbb256e7089aea8ba38e66a1b0cf (diff) | |
download | mariadb-git-80e61ae21e2373ee73407f91f596b11c2c46e7d9.tar.gz |
cleanup: LOAD DATA replication support in IO_CACHE
remove some 14-year old code that added support for
LOAD DATA replication to IO_CACHE:
* three callbacks, of which only two were actually used and that
were only needed for LOAD DATA replication but were
tested in every IO_CACHE instance
* an additional opaque void * argument in IO_CACHE, also only
used for LOAD DATA replication, but present everywhere
* the code to close IO_CACHE prematurely in LOAD DATA to have
these callbacks called in the correct order and a long
comment explaining what will happen if IO_CACHE is not
closed prematurely
* a variable to track whether IO_CACHE was closed prematurely
(to avoid double-closing it)
-rw-r--r-- | include/my_sys.h | 12 | ||||
-rw-r--r-- | mysys/mf_iocache.c | 14 | ||||
-rw-r--r-- | sql/sql_load.cc | 82 | ||||
-rw-r--r-- | sql/sql_repl.cc | 16 | ||||
-rw-r--r-- | sql/sql_repl.h | 11 |
5 files changed, 37 insertions, 98 deletions
diff --git a/include/my_sys.h b/include/my_sys.h index d9536356cda..17ad872e4e9 100644 --- a/include/my_sys.h +++ b/include/my_sys.h @@ -364,7 +364,6 @@ typedef struct st_dynamic_string } DYNAMIC_STRING; struct st_io_cache; -typedef int (*IO_CACHE_CALLBACK)(struct st_io_cache*); typedef struct st_io_cache_share { @@ -461,22 +460,11 @@ typedef struct st_io_cache /* Used when cacheing files */ */ enum cache_type type; /* - Callbacks when the actual read I/O happens. These were added and - are currently used for binary logging of LOAD DATA INFILE - when a - block is read from the file, we create a block create/append event, and - when IO_CACHE is closed, we create an end event. These functions could, - of course be used for other things - */ - IO_CACHE_CALLBACK pre_read; - IO_CACHE_CALLBACK post_read; - IO_CACHE_CALLBACK pre_close; - /* Counts the number of times, when we were forced to use disk. We use it to increase the binlog_cache_disk_use and binlog_stmt_cache_disk_use status variables. */ ulong disk_writes; - void* arg; /* for use by pre/post_read */ char *file_name; /* if used with 'open_cached_file' */ const char *dir; char prefix[3]; diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c index a3cbaff68b0..354995c644d 100644 --- a/mysys/mf_iocache.c +++ b/mysys/mf_iocache.c @@ -1,5 +1,6 @@ /* Copyright (c) 2000, 2011, Oracle and/or its affiliates + Copyright (c) 2010, 2015, MariaDB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -157,8 +158,6 @@ int init_io_cache(IO_CACHE *info, File file, size_t cachesize, info->file= file; info->type= TYPE_NOT_SET; /* Don't set it until mutex are created */ info->pos_in_file= seek_offset; - info->pre_close = info->pre_read = info->post_read = 0; - info->arg = 0; info->alloced_buffer = 0; info->buffer=0; info->seek_not_done= 0; @@ -1500,13 +1499,8 @@ int _my_b_async_read(register IO_CACHE *info, uchar *Buffer, size_t Count) int _my_b_get(IO_CACHE *info) { uchar buff; - IO_CACHE_CALLBACK pre_read,post_read; - if ((pre_read = info->pre_read)) - (*pre_read)(info); if ((*(info)->read_function)(info,&buff,1)) return my_b_EOF; - if ((post_read = info->post_read)) - (*post_read)(info); return (int) (uchar) buff; } @@ -1821,7 +1815,6 @@ int my_b_flush_io_cache(IO_CACHE *info, int end_io_cache(IO_CACHE *info) { int error=0; - IO_CACHE_CALLBACK pre_close; DBUG_ENTER("end_io_cache"); DBUG_PRINT("enter",("cache: 0x%lx", (ulong) info)); @@ -1831,11 +1824,6 @@ int end_io_cache(IO_CACHE *info) */ DBUG_ASSERT(!info->share || !info->share->total_threads); - if ((pre_close=info->pre_close)) - { - (*pre_close)(info); - info->pre_close= 0; - } if (info->alloced_buffer) { info->alloced_buffer=0; diff --git a/sql/sql_load.cc b/sql/sql_load.cc index 18982c50bf2..62b6e1742f1 100644 --- a/sql/sql_load.cc +++ b/sql/sql_load.cc @@ -74,8 +74,6 @@ class READ_INFO { int field_term_char,line_term_char,enclosed_char,escape_char; int *stack,*stack_pos; bool found_end_of_line,start_of_line,eof; - bool need_end_io_cache; - IO_CACHE cache; NET *io_net; int level; /* for load xml */ @@ -84,6 +82,7 @@ public: uchar *row_start, /* Found row starts here */ *row_end; /* Found row ends here */ CHARSET_INFO *read_charset; + LOAD_FILE_IO_CACHE cache; READ_INFO(File file,uint tot_length,CHARSET_INFO *cs, String &field_term,String &line_start,String &line_term, @@ -101,25 +100,9 @@ public: int read_xml(); int clear_level(int level); - /* - We need to force cache close before destructor is invoked to log - the last read block - */ - void end_io_cache() - { - ::end_io_cache(&cache); - need_end_io_cache = 0; - } my_off_t file_length() { return cache.end_of_file; } my_off_t position() { return my_b_tell(&cache); } - /* - Either this method, or we need to make cache public - Arg must be set from mysql_load() since constructor does not see - either the table or THD value - */ - void set_io_cache_arg(void* arg) { cache.arg = arg; } - /** skip all data till the eof. */ @@ -187,7 +170,6 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, String *enclosed=ex->enclosed; bool is_fifo=0; #ifndef EMBEDDED_LIBRARY - LOAD_FILE_INFO lf_info; killed_state killed_status; bool is_concurrent; #endif @@ -453,11 +435,10 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, #ifndef EMBEDDED_LIBRARY if (mysql_bin_log.is_open()) { - lf_info.thd = thd; - lf_info.wrote_create_file = 0; - lf_info.last_pos_in_file = HA_POS_ERROR; - lf_info.log_delayed= transactional_table; - read_info.set_io_cache_arg((void*) &lf_info); + read_info.cache.thd = thd; + read_info.cache.wrote_create_file = 0; + read_info.cache.last_pos_in_file = HA_POS_ERROR; + read_info.cache.log_delayed= transactional_table; } #endif /*!EMBEDDED_LIBRARY*/ @@ -553,30 +534,12 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, { { /* - Make sure last block (the one which caused the error) gets - logged. This is needed because otherwise after write of (to - the binlog, not to read_info (which is a cache)) - Delete_file_log_event the bad block will remain in read_info - (because pre_read is not called at the end of the last - block; remember pre_read is called whenever a new block is - read from disk). At the end of mysql_load(), the destructor - of read_info will call end_io_cache() which will flush - read_info, so we will finally have this in the binlog: - - Append_block # The last successfull block - Delete_file - Append_block # The failing block - which is nonsense. - Or could also be (for a small file) - Create_file # The failing block - which is nonsense (Delete_file is not written in this case, because: - Create_file has not been written, so Delete_file is not written, then - when read_info is destroyed end_io_cache() is called which writes - Create_file. + Make sure last block (the one which caused the error) gets + logged. */ - read_info.end_io_cache(); + log_loaded_block(&read_info.cache, 0, 0); /* If the file was not empty, wrote_create_file is true */ - if (lf_info.wrote_create_file) + if (read_info.cache.wrote_create_file) { int errcode= query_error_code(thd, killed_status == NOT_KILLED); @@ -626,12 +589,11 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, else { /* - As already explained above, we need to call end_io_cache() or the last - block will be logged only after Execute_load_query_log_event (which is - wrong), when read_info is destroyed. + As already explained above, we need to call log_loaded_block() to have + the last block logged */ - read_info.end_io_cache(); - if (lf_info.wrote_create_file) + log_loaded_block(&read_info.cache, 0, 0); + if (read_info.cache.wrote_create_file) { int errcode= query_error_code(thd, killed_status == NOT_KILLED); error= write_execute_load_query_log_event(thd, ex, @@ -1350,7 +1312,7 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, CHARSET_INFO *cs, String &enclosed_par, int escape, bool get_it_from_net, bool is_fifo) :file(file_par), buffer(NULL), buff_length(tot_length), escape_char(escape), - found_end_of_line(false), eof(false), need_end_io_cache(false), + found_end_of_line(false), eof(false), error(false), line_cuted(false), found_null(false), read_charset(cs) { /* @@ -1410,20 +1372,15 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, CHARSET_INFO *cs, } else { - /* - init_io_cache() will not initialize read_function member - if the cache is READ_NET. So we work around the problem with a - manual assignment - */ - need_end_io_cache = 1; - #ifndef EMBEDDED_LIBRARY if (get_it_from_net) cache.read_function = _my_b_net_read; if (mysql_bin_log.is_open()) - cache.pre_read = cache.pre_close = - (IO_CACHE_CALLBACK) log_loaded_block; + { + cache.real_read_function= cache.read_function; + cache.read_function= log_loaded_block; + } #endif } } @@ -1432,8 +1389,7 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, CHARSET_INFO *cs, READ_INFO::~READ_INFO() { - if (need_end_io_cache) - ::end_io_cache(&cache); + ::end_io_cache(&cache); my_free(buffer); List_iterator<XML_TAG> xmlit(taglist); XML_TAG *t; diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index d9ae6cad111..0ea61320a5a 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -4119,20 +4119,20 @@ err: @retval 0 success @retval 1 failure */ -int log_loaded_block(IO_CACHE* file) +int log_loaded_block(IO_CACHE* file, uchar *Buffer, size_t Count) { DBUG_ENTER("log_loaded_block"); - LOAD_FILE_INFO *lf_info; + LOAD_FILE_IO_CACHE *lf_info= static_cast<LOAD_FILE_IO_CACHE*>(file); uint block_len; /* buffer contains position where we started last read */ uchar* buffer= (uchar*) my_b_get_buffer_start(file); - uint max_event_size= current_thd->variables.max_allowed_packet; - lf_info= (LOAD_FILE_INFO*) file->arg; + uint max_event_size= lf_info->thd->variables.max_allowed_packet; + if (lf_info->thd->is_current_stmt_binlog_format_row()) - DBUG_RETURN(0); + goto ret; if (lf_info->last_pos_in_file != HA_POS_ERROR && lf_info->last_pos_in_file >= my_b_get_pos_in_file(file)) - DBUG_RETURN(0); + goto ret; for (block_len= (uint) (my_b_get_bytes_in_buffer(file)); block_len > 0; buffer += MY_MIN(block_len, max_event_size), @@ -4158,7 +4158,9 @@ int log_loaded_block(IO_CACHE* file) lf_info->wrote_create_file= 1; } } - DBUG_RETURN(0); +ret: + int res= Buffer ? lf_info->real_read_function(file, Buffer, Count) : 0; + DBUG_RETURN(res); } diff --git a/sql/sql_repl.h b/sql/sql_repl.h index 7f7751b8f44..a9fdce9e5e2 100644 --- a/sql/sql_repl.h +++ b/sql/sql_repl.h @@ -56,14 +56,15 @@ extern int init_master_info(Master_info* mi); void kill_zombie_dump_threads(uint32 slave_server_id); int check_binlog_magic(IO_CACHE* log, const char** errmsg); -typedef struct st_load_file_info +struct LOAD_FILE_IO_CACHE : public IO_CACHE { THD* thd; my_off_t last_pos_in_file; bool wrote_create_file, log_delayed; -} LOAD_FILE_INFO; + int (*real_read_function)(struct st_io_cache *,uchar *,size_t); +}; -int log_loaded_block(IO_CACHE* file); +int log_loaded_block(IO_CACHE* file, uchar *Buffer, size_t Count); int init_replication_sys_vars(); void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags); @@ -80,6 +81,10 @@ int rpl_load_gtid_state(slave_connection_state *state, bool use_binlog); bool rpl_gtid_pos_check(THD *thd, char *str, size_t len); bool rpl_gtid_pos_update(THD *thd, char *str, size_t len); +#else + +struct LOAD_FILE_IO_CACHE : public IO_CACHE { }; + #endif /* HAVE_REPLICATION */ #endif /* SQL_REPL_INCLUDED */ |