summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergei Golubchik <serg@mariadb.org>2015-05-16 08:48:52 +0200
committerSergei Golubchik <serg@mariadb.org>2015-06-02 18:53:36 +0200
commit80e61ae21e2373ee73407f91f596b11c2c46e7d9 (patch)
tree6478f01d7bc067fb7c526056b1544b3567753993
parent91dab5ddb6accbb256e7089aea8ba38e66a1b0cf (diff)
downloadmariadb-git-80e61ae21e2373ee73407f91f596b11c2c46e7d9.tar.gz
cleanup: LOAD DATA replication support in IO_CACHE
remove some 14-year old code that added support for LOAD DATA replication to IO_CACHE: * three callbacks, of which only two were actually used and that were only needed for LOAD DATA replication but were tested in every IO_CACHE instance * an additional opaque void * argument in IO_CACHE, also only used for LOAD DATA replication, but present everywhere * the code to close IO_CACHE prematurely in LOAD DATA to have these callbacks called in the correct order and a long comment explaining what will happen if IO_CACHE is not closed prematurely * a variable to track whether IO_CACHE was closed prematurely (to avoid double-closing it)
-rw-r--r--include/my_sys.h12
-rw-r--r--mysys/mf_iocache.c14
-rw-r--r--sql/sql_load.cc82
-rw-r--r--sql/sql_repl.cc16
-rw-r--r--sql/sql_repl.h11
5 files changed, 37 insertions, 98 deletions
diff --git a/include/my_sys.h b/include/my_sys.h
index d9536356cda..17ad872e4e9 100644
--- a/include/my_sys.h
+++ b/include/my_sys.h
@@ -364,7 +364,6 @@ typedef struct st_dynamic_string
} DYNAMIC_STRING;
struct st_io_cache;
-typedef int (*IO_CACHE_CALLBACK)(struct st_io_cache*);
typedef struct st_io_cache_share
{
@@ -461,22 +460,11 @@ typedef struct st_io_cache /* Used when cacheing files */
*/
enum cache_type type;
/*
- Callbacks when the actual read I/O happens. These were added and
- are currently used for binary logging of LOAD DATA INFILE - when a
- block is read from the file, we create a block create/append event, and
- when IO_CACHE is closed, we create an end event. These functions could,
- of course be used for other things
- */
- IO_CACHE_CALLBACK pre_read;
- IO_CACHE_CALLBACK post_read;
- IO_CACHE_CALLBACK pre_close;
- /*
Counts the number of times, when we were forced to use disk. We use it to
increase the binlog_cache_disk_use and binlog_stmt_cache_disk_use status
variables.
*/
ulong disk_writes;
- void* arg; /* for use by pre/post_read */
char *file_name; /* if used with 'open_cached_file' */
const char *dir;
char prefix[3];
diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c
index a3cbaff68b0..354995c644d 100644
--- a/mysys/mf_iocache.c
+++ b/mysys/mf_iocache.c
@@ -1,5 +1,6 @@
/*
Copyright (c) 2000, 2011, Oracle and/or its affiliates
+ Copyright (c) 2010, 2015, MariaDB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -157,8 +158,6 @@ int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
info->file= file;
info->type= TYPE_NOT_SET; /* Don't set it until mutex are created */
info->pos_in_file= seek_offset;
- info->pre_close = info->pre_read = info->post_read = 0;
- info->arg = 0;
info->alloced_buffer = 0;
info->buffer=0;
info->seek_not_done= 0;
@@ -1500,13 +1499,8 @@ int _my_b_async_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
int _my_b_get(IO_CACHE *info)
{
uchar buff;
- IO_CACHE_CALLBACK pre_read,post_read;
- if ((pre_read = info->pre_read))
- (*pre_read)(info);
if ((*(info)->read_function)(info,&buff,1))
return my_b_EOF;
- if ((post_read = info->post_read))
- (*post_read)(info);
return (int) (uchar) buff;
}
@@ -1821,7 +1815,6 @@ int my_b_flush_io_cache(IO_CACHE *info,
int end_io_cache(IO_CACHE *info)
{
int error=0;
- IO_CACHE_CALLBACK pre_close;
DBUG_ENTER("end_io_cache");
DBUG_PRINT("enter",("cache: 0x%lx", (ulong) info));
@@ -1831,11 +1824,6 @@ int end_io_cache(IO_CACHE *info)
*/
DBUG_ASSERT(!info->share || !info->share->total_threads);
- if ((pre_close=info->pre_close))
- {
- (*pre_close)(info);
- info->pre_close= 0;
- }
if (info->alloced_buffer)
{
info->alloced_buffer=0;
diff --git a/sql/sql_load.cc b/sql/sql_load.cc
index 18982c50bf2..62b6e1742f1 100644
--- a/sql/sql_load.cc
+++ b/sql/sql_load.cc
@@ -74,8 +74,6 @@ class READ_INFO {
int field_term_char,line_term_char,enclosed_char,escape_char;
int *stack,*stack_pos;
bool found_end_of_line,start_of_line,eof;
- bool need_end_io_cache;
- IO_CACHE cache;
NET *io_net;
int level; /* for load xml */
@@ -84,6 +82,7 @@ public:
uchar *row_start, /* Found row starts here */
*row_end; /* Found row ends here */
CHARSET_INFO *read_charset;
+ LOAD_FILE_IO_CACHE cache;
READ_INFO(File file,uint tot_length,CHARSET_INFO *cs,
String &field_term,String &line_start,String &line_term,
@@ -101,25 +100,9 @@ public:
int read_xml();
int clear_level(int level);
- /*
- We need to force cache close before destructor is invoked to log
- the last read block
- */
- void end_io_cache()
- {
- ::end_io_cache(&cache);
- need_end_io_cache = 0;
- }
my_off_t file_length() { return cache.end_of_file; }
my_off_t position() { return my_b_tell(&cache); }
- /*
- Either this method, or we need to make cache public
- Arg must be set from mysql_load() since constructor does not see
- either the table or THD value
- */
- void set_io_cache_arg(void* arg) { cache.arg = arg; }
-
/**
skip all data till the eof.
*/
@@ -187,7 +170,6 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
String *enclosed=ex->enclosed;
bool is_fifo=0;
#ifndef EMBEDDED_LIBRARY
- LOAD_FILE_INFO lf_info;
killed_state killed_status;
bool is_concurrent;
#endif
@@ -453,11 +435,10 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
#ifndef EMBEDDED_LIBRARY
if (mysql_bin_log.is_open())
{
- lf_info.thd = thd;
- lf_info.wrote_create_file = 0;
- lf_info.last_pos_in_file = HA_POS_ERROR;
- lf_info.log_delayed= transactional_table;
- read_info.set_io_cache_arg((void*) &lf_info);
+ read_info.cache.thd = thd;
+ read_info.cache.wrote_create_file = 0;
+ read_info.cache.last_pos_in_file = HA_POS_ERROR;
+ read_info.cache.log_delayed= transactional_table;
}
#endif /*!EMBEDDED_LIBRARY*/
@@ -553,30 +534,12 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
{
{
/*
- Make sure last block (the one which caused the error) gets
- logged. This is needed because otherwise after write of (to
- the binlog, not to read_info (which is a cache))
- Delete_file_log_event the bad block will remain in read_info
- (because pre_read is not called at the end of the last
- block; remember pre_read is called whenever a new block is
- read from disk). At the end of mysql_load(), the destructor
- of read_info will call end_io_cache() which will flush
- read_info, so we will finally have this in the binlog:
-
- Append_block # The last successfull block
- Delete_file
- Append_block # The failing block
- which is nonsense.
- Or could also be (for a small file)
- Create_file # The failing block
- which is nonsense (Delete_file is not written in this case, because:
- Create_file has not been written, so Delete_file is not written, then
- when read_info is destroyed end_io_cache() is called which writes
- Create_file.
+ Make sure last block (the one which caused the error) gets
+ logged.
*/
- read_info.end_io_cache();
+ log_loaded_block(&read_info.cache, 0, 0);
/* If the file was not empty, wrote_create_file is true */
- if (lf_info.wrote_create_file)
+ if (read_info.cache.wrote_create_file)
{
int errcode= query_error_code(thd, killed_status == NOT_KILLED);
@@ -626,12 +589,11 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
else
{
/*
- As already explained above, we need to call end_io_cache() or the last
- block will be logged only after Execute_load_query_log_event (which is
- wrong), when read_info is destroyed.
+ As already explained above, we need to call log_loaded_block() to have
+ the last block logged
*/
- read_info.end_io_cache();
- if (lf_info.wrote_create_file)
+ log_loaded_block(&read_info.cache, 0, 0);
+ if (read_info.cache.wrote_create_file)
{
int errcode= query_error_code(thd, killed_status == NOT_KILLED);
error= write_execute_load_query_log_event(thd, ex,
@@ -1350,7 +1312,7 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, CHARSET_INFO *cs,
String &enclosed_par, int escape, bool get_it_from_net,
bool is_fifo)
:file(file_par), buffer(NULL), buff_length(tot_length), escape_char(escape),
- found_end_of_line(false), eof(false), need_end_io_cache(false),
+ found_end_of_line(false), eof(false),
error(false), line_cuted(false), found_null(false), read_charset(cs)
{
/*
@@ -1410,20 +1372,15 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, CHARSET_INFO *cs,
}
else
{
- /*
- init_io_cache() will not initialize read_function member
- if the cache is READ_NET. So we work around the problem with a
- manual assignment
- */
- need_end_io_cache = 1;
-
#ifndef EMBEDDED_LIBRARY
if (get_it_from_net)
cache.read_function = _my_b_net_read;
if (mysql_bin_log.is_open())
- cache.pre_read = cache.pre_close =
- (IO_CACHE_CALLBACK) log_loaded_block;
+ {
+ cache.real_read_function= cache.read_function;
+ cache.read_function= log_loaded_block;
+ }
#endif
}
}
@@ -1432,8 +1389,7 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, CHARSET_INFO *cs,
READ_INFO::~READ_INFO()
{
- if (need_end_io_cache)
- ::end_io_cache(&cache);
+ ::end_io_cache(&cache);
my_free(buffer);
List_iterator<XML_TAG> xmlit(taglist);
XML_TAG *t;
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index d9ae6cad111..0ea61320a5a 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -4119,20 +4119,20 @@ err:
@retval 0 success
@retval 1 failure
*/
-int log_loaded_block(IO_CACHE* file)
+int log_loaded_block(IO_CACHE* file, uchar *Buffer, size_t Count)
{
DBUG_ENTER("log_loaded_block");
- LOAD_FILE_INFO *lf_info;
+ LOAD_FILE_IO_CACHE *lf_info= static_cast<LOAD_FILE_IO_CACHE*>(file);
uint block_len;
/* buffer contains position where we started last read */
uchar* buffer= (uchar*) my_b_get_buffer_start(file);
- uint max_event_size= current_thd->variables.max_allowed_packet;
- lf_info= (LOAD_FILE_INFO*) file->arg;
+ uint max_event_size= lf_info->thd->variables.max_allowed_packet;
+
if (lf_info->thd->is_current_stmt_binlog_format_row())
- DBUG_RETURN(0);
+ goto ret;
if (lf_info->last_pos_in_file != HA_POS_ERROR &&
lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
- DBUG_RETURN(0);
+ goto ret;
for (block_len= (uint) (my_b_get_bytes_in_buffer(file)); block_len > 0;
buffer += MY_MIN(block_len, max_event_size),
@@ -4158,7 +4158,9 @@ int log_loaded_block(IO_CACHE* file)
lf_info->wrote_create_file= 1;
}
}
- DBUG_RETURN(0);
+ret:
+ int res= Buffer ? lf_info->real_read_function(file, Buffer, Count) : 0;
+ DBUG_RETURN(res);
}
diff --git a/sql/sql_repl.h b/sql/sql_repl.h
index 7f7751b8f44..a9fdce9e5e2 100644
--- a/sql/sql_repl.h
+++ b/sql/sql_repl.h
@@ -56,14 +56,15 @@ extern int init_master_info(Master_info* mi);
void kill_zombie_dump_threads(uint32 slave_server_id);
int check_binlog_magic(IO_CACHE* log, const char** errmsg);
-typedef struct st_load_file_info
+struct LOAD_FILE_IO_CACHE : public IO_CACHE
{
THD* thd;
my_off_t last_pos_in_file;
bool wrote_create_file, log_delayed;
-} LOAD_FILE_INFO;
+ int (*real_read_function)(struct st_io_cache *,uchar *,size_t);
+};
-int log_loaded_block(IO_CACHE* file);
+int log_loaded_block(IO_CACHE* file, uchar *Buffer, size_t Count);
int init_replication_sys_vars();
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags);
@@ -80,6 +81,10 @@ int rpl_load_gtid_state(slave_connection_state *state, bool use_binlog);
bool rpl_gtid_pos_check(THD *thd, char *str, size_t len);
bool rpl_gtid_pos_update(THD *thd, char *str, size_t len);
+#else
+
+struct LOAD_FILE_IO_CACHE : public IO_CACHE { };
+
#endif /* HAVE_REPLICATION */
#endif /* SQL_REPL_INCLUDED */