diff options
author | unknown <sasha@mysql.sashanet.com> | 2001-10-23 13:28:03 -0600 |
---|---|---|
committer | unknown <sasha@mysql.sashanet.com> | 2001-10-23 13:28:03 -0600 |
commit | 8fc78e08b0e68f82121b0bba8d930bc5ec57a29f (patch) | |
tree | c5c654aca3970830687c68cb22289835e1719233 /mysys/mf_iocache.c | |
parent | 74f49f9f34b7a4cc1b87aa1cb657b72f8c6c856e (diff) | |
download | mariadb-git-8fc78e08b0e68f82121b0bba8d930bc5ec57a29f.tar.gz |
cleanup
removal of duplicate code in mf_iocache.cc
work on failsafe replication
work on SEQ_READ_APPEND io cache
include/my_sys.h:
updates for SEQ_READ_APPEND
libmysql/Makefile.am:
fix for mysys/mf_iocache.c
libmysql/libmysql.c:
updates for new format of SHOW SLAVE HOSTS
mysql-test/r/rpl000001.result:
test replication of LOAD DATA LOCAL INFILE
mysql-test/r/rpl000002.result:
updated test result
mysql-test/t/rpl000001.test:
test LOAD DATA LOCAL INFILE
mysys/mf_iocache.c:
cleanup to remove duplicate functionality
some work on SEQ_READ_APPEND
sql/mf_iocache.cc:
cleanup to remove duplicate functionality
sql/repl_failsafe.cc:
more work on failsafe replication
sql/repl_failsafe.h:
more work on failsafe replication
sql/slave.cc:
cleanup
more work on failsafe replication
sql/sql_load.cc:
fixed bug on replicating empty file loads
got LOAD DATA LOCAL INFILE to work again, and to be replicated
sql/sql_repl.cc:
cleanup
more work on failsafe replication
sql/sql_repl.h:
more work on failsafe replication
Diffstat (limited to 'mysys/mf_iocache.c')
-rw-r--r-- | mysys/mf_iocache.c | 259 |
1 files changed, 210 insertions, 49 deletions
diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c index 0ef496227b6..f0100b5122b 100644 --- a/mysys/mf_iocache.c +++ b/mysys/mf_iocache.c @@ -36,10 +36,34 @@ #include <m_string.h> #ifdef HAVE_AIOWAIT #include "mysys_err.h" -#include <errno.h> static void my_aiowait(my_aio_result *result); #endif +#include <assert.h> +#include <errno.h> + +static void init_read_function(IO_CACHE* info, enum cache_type type); +static void init_read_function(IO_CACHE* info, enum cache_type type) +{ + switch (type) + { +#ifndef MYSQL_CLIENT + case READ_NET: + /* must be initialized by the caller. The problem is that + _my_b_net_read has to be defined in sql directory because of + the dependency on THD, and therefore cannot be visible to + programs that link against mysys but know nothing about THD, such + as myisamchk + */ + break; +#endif + case SEQ_READ_APPEND: + info->read_function = _my_b_seq_read; + break; + default: + info->read_function = _my_b_read; + } +} /* ** if cachesize == 0 then use default cachesize (from s-file) @@ -55,14 +79,15 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, DBUG_ENTER("init_io_cache"); DBUG_PRINT("enter",("type: %d pos: %ld",(int) type, (ulong) seek_offset)); - info->file=file; + /* There is no file in net_reading */ + info->file= file; info->pre_close = info->pre_read = info->post_read = 0; info->arg = 0; if (!cachesize) if (! (cachesize= my_default_record_cache_size)) DBUG_RETURN(1); /* No cache requested */ min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2; - if (type == READ_CACHE) + if (type == READ_CACHE || type == SEQ_READ_APPEND) { /* Assume file isn't growing */ if (cache_myflags & MY_DONT_CHECK_FILESIZE) { @@ -77,36 +102,65 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, if (end_of_file < seek_offset) end_of_file=seek_offset; VOID(my_seek(file,file_pos,MY_SEEK_SET,MYF(0))); - if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1) + /* Trim cache size if the file is very small. + However, we should not do this with SEQ_READ_APPEND cache + */ + if (type != SEQ_READ_APPEND && + (my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1) { cachesize=(uint) (end_of_file-seek_offset)+IO_SIZE*2-1; - use_async_io=0; /* No nead to use async */ + use_async_io=0; /* No need to use async */ } } } - - for (;;) + info->alloced_buffer = 0; + if ((int) type < (int) READ_NET) { - cachesize=(uint) ((ulong) (cachesize + min_cache-1) & - (ulong) ~(min_cache-1)); - if (cachesize < min_cache) - cachesize = min_cache; - if ((info->buffer= - (byte*) my_malloc(cachesize, - MYF((cache_myflags & ~ MY_WME) | - (cachesize == min_cache ? MY_WME : 0)))) != 0) - break; /* Enough memory found */ - if (cachesize == min_cache) - DBUG_RETURN(2); /* Can't alloc cache */ - cachesize= (uint) ((long) cachesize*3/4); /* Try with less memory */ + uint buffer_block; + for (;;) + { + buffer_block = cachesize=(uint) ((ulong) (cachesize + min_cache-1) & + (ulong) ~(min_cache-1)); + if (type == SEQ_READ_APPEND) + buffer_block *= 2; + if (cachesize < min_cache) + cachesize = min_cache; + if ((info->buffer= + (byte*) my_malloc(buffer_block, + MYF((cache_myflags & ~ MY_WME) | + (cachesize == min_cache ? MY_WME : 0)))) != 0) + { + if (type == SEQ_READ_APPEND) + info->append_buffer = info->buffer + cachesize; + info->alloced_buffer=1; + break; /* Enough memory found */ + } + if (cachesize == min_cache) + DBUG_RETURN(2); /* Can't alloc cache */ + cachesize= (uint) ((long) cachesize*3/4); /* Try with less memory */ + } } - info->pos_in_file=seek_offset; + else + info->buffer=0; + + DBUG_PRINT("info",("init_io_cache: cachesize = %u",cachesize)); + info->pos_in_file= seek_offset; info->read_length=info->buffer_length=cachesize; - info->seek_not_done= test(file >= 0); /* Seek not done */ + info->seek_not_done= test(file >= 0 && type != READ_FIFO && + type != READ_NET); info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP); info->rc_request_pos=info->rc_pos=info->buffer; + if (type == SEQ_READ_APPEND) + { + info->append_pos = info->append_buffer; + info->append_end = info->append_buffer + info->buffer_length; +#ifdef THREAD + pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST); +#endif + } - if (type == READ_CACHE) + if (type == READ_CACHE || type == SEQ_READ_APPEND || + type == READ_NET || type == READ_FIFO) { info->rc_end=info->buffer; /* Nothing in cache */ } @@ -114,10 +168,12 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, { info->rc_end=info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1)); } - info->end_of_file=MY_FILEPOS_ERROR; /* May be changed by user */ + /* end_of_file may be changed by user later */ + info->end_of_file= ((type == READ_NET || type == READ_FIFO ) ? 0 + : ~(my_off_t) 0); info->type=type; info->error=0; - info->read_function=_my_b_read; + init_read_function(info,type); #ifdef HAVE_AIOWAIT if (use_async_io && ! my_disable_async_io) { @@ -169,6 +225,8 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, DBUG_ENTER("reinit_io_cache"); info->seek_not_done= test(info->file >= 0); /* Seek not done */ + + /* If the whole file is in memory, avoid flushing to disk */ if (! clear_cache && seek_offset >= info->pos_in_file && seek_offset <= info->pos_in_file + @@ -179,8 +237,12 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, info->rc_end=info->rc_pos; info->end_of_file=my_b_tell(info); } - else if (info->type == READ_CACHE && type == WRITE_CACHE) - info->rc_end=info->buffer+info->buffer_length; + else if (type == WRITE_CACHE) + { + if (info->type == READ_CACHE) + info->rc_end=info->buffer+info->buffer_length; + info->end_of_file = ~(my_off_t) 0; + } info->rc_pos=info->rc_request_pos+(seek_offset-info->pos_in_file); #ifdef HAVE_AIOWAIT my_aiowait(&info->aio_result); /* Wait for outstanding req */ @@ -188,13 +250,22 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, } else { + /* + If we change from WRITE_CACHE to READ_CACHE, assume that everything + after the current positions should be ignored + */ if (info->type == WRITE_CACHE && type == READ_CACHE) info->end_of_file=my_b_tell(info); - if (flush_io_cache(info)) + /* No need to flush cache if we want to reuse it */ + if ((type != WRITE_CACHE || !clear_cache) && flush_io_cache(info)) DBUG_RETURN(1); - info->pos_in_file=seek_offset; + if (info->pos_in_file != seek_offset) + { + info->pos_in_file=seek_offset; + info->seek_not_done=1; + } info->rc_request_pos=info->rc_pos=info->buffer; - if (type == READ_CACHE) + if (type == READ_CACHE || type == READ_NET || type == READ_FIFO) { info->rc_end=info->buffer; /* Nothing in cache */ } @@ -202,19 +273,23 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, { info->rc_end=info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1)); - info->end_of_file=MY_FILEPOS_ERROR; /* May be changed by user */ + info->end_of_file= ((type == READ_NET || type == READ_FIFO) ? 0 : + ~(my_off_t) 0); } } info->type=type; info->error=0; - info->read_function=_my_b_read; + init_read_function(info,type); #ifdef HAVE_AIOWAIT - if (use_async_io && ! my_disable_async_io && - ((ulong) info->buffer_length < - (ulong) (info->end_of_file - seek_offset))) + if (type != READ_NET) { - info->read_length=info->buffer_length/2; - info->read_function=_my_b_async_read; + if (use_async_io && ! my_disable_async_io && + ((ulong) info->buffer_length < + (ulong) (info->end_of_file - seek_offset))) + { + info->read_length=info->buffer_length/2; + info->read_function=_my_b_async_read; + } } info->inited=0; #endif @@ -223,18 +298,27 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, - /* Read buffered. Returns 1 if can't read requested characters */ - /* Returns 0 if record read */ + /* + Read buffered. Returns 1 if can't read requested characters + This function is only called from the my_b_read() macro + when there isn't enough characters in the buffer to + satisfy the request. + Returns 0 we succeeded in reading all data + */ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count) { uint length,diff_length,left_length; my_off_t max_length, pos_in_file; - - memcpy(Buffer,info->rc_pos, - (size_t) (left_length=(uint) (info->rc_end-info->rc_pos))); - Buffer+=left_length; - Count-=left_length; + + if ((left_length=(uint) (info->rc_end-info->rc_pos))) + { + dbug_assert(Count >= left_length); /* User is not using my_b_read() */ + memcpy(Buffer,info->rc_pos, (size_t) (left_length)); + Buffer+=left_length; + Count-=left_length; + } + /* pos_in_file always point on where info->buffer was read */ pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer); if (info->seek_not_done) { /* File touched, do seek */ @@ -264,10 +348,10 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count) left_length+=length; diff_length=0; } - max_length=info->end_of_file - pos_in_file; - if (max_length > info->read_length-diff_length) - max_length=info->read_length-diff_length; - + max_length=info->read_length-diff_length; + if (info->type != READ_FIFO && + (info->end_of_file - pos_in_file) < max_length) + max_length = info->end_of_file - pos_in_file; if (!max_length) { if (Count) @@ -293,6 +377,78 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count) return 0; } +int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) +{ + uint length,diff_length,left_length; + my_off_t max_length, pos_in_file; + + if ((left_length=(uint) (info->rc_end-info->rc_pos))) + { + dbug_assert(Count >= left_length); /* User is not using my_b_read() */ + memcpy(Buffer,info->rc_pos, (size_t) (left_length)); + Buffer+=left_length; + Count-=left_length; + } + /* pos_in_file always point on where info->buffer was read */ + pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer); + /* no need to seek since the read is guaranteed to be sequential */ + diff_length=(uint) (pos_in_file & (IO_SIZE-1)); +#ifdef THREAD + pthread_mutex_lock(&info->append_buffer_lock); +#endif +#ifdef THREAD + pthread_mutex_unlock(&info->append_buffer_lock); +#endif + if (Count >= (uint) (IO_SIZE+(IO_SIZE-diff_length))) + { /* Fill first intern buffer */ + uint read_length; + if (info->end_of_file == pos_in_file) + { /* End of file */ + info->error=(int) left_length; + return 1; + } + length=(Count & (uint) ~(IO_SIZE-1))-diff_length; + if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags)) + != (uint) length) + { + info->error= read_length == (uint) -1 ? -1 : + (int) (read_length+left_length); + return 1; + } + Count-=length; + Buffer+=length; + pos_in_file+=length; + left_length+=length; + diff_length=0; + } + max_length=info->read_length-diff_length; + if (info->type != READ_FIFO && + (info->end_of_file - pos_in_file) < max_length) + max_length = info->end_of_file - pos_in_file; + if (!max_length) + { + if (Count) + { + info->error= left_length; /* We only got this many char */ + return 1; + } + length=0; /* Didn't read any chars */ + } + else if ((length=my_read(info->file,info->buffer,(uint) max_length, + info->myflags)) < Count || + length == (uint) -1) + { + if (length != (uint) -1) + memcpy(Buffer,info->buffer,(size_t) length); + info->error= length == (uint) -1 ? -1 : (int) (length+left_length); + return 1; + } + info->rc_pos=info->buffer+Count; + info->rc_end=info->buffer+length; + info->pos_in_file=pos_in_file; + memcpy(Buffer,info->buffer,(size_t) Count); + return 0; +} #ifdef HAVE_AIOWAIT @@ -490,6 +646,11 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count) Buffer+=rest_length; Count-=rest_length; info->rc_pos+=rest_length; + if (info->pos_in_file+info->buffer_length > info->end_of_file) + { + my_errno=errno=EFBIG; + return info->error = -1; + } if (flush_io_cache(info)) return 1; if (Count >= IO_SIZE) @@ -596,7 +757,7 @@ int flush_io_cache(IO_CACHE *info) } } #ifdef HAVE_AIOWAIT - else + else if (info->type != READ_NET) { my_aiowait(&info->aio_result); /* Wait for outstanding req */ info->inited=0; @@ -613,7 +774,7 @@ int end_io_cache(IO_CACHE *info) DBUG_ENTER("end_io_cache"); if((pre_close=info->pre_close)) (*pre_close)(info); - if (info->buffer) + if (info->alloced_buffer) { if (info->file != -1) /* File doesn't exist */ error=flush_io_cache(info); |