diff options
Diffstat (limited to 'sql/mf_iocache.cc')
-rw-r--r-- | sql/mf_iocache.cc | 641 |
1 files changed, 641 insertions, 0 deletions
diff --git a/sql/mf_iocache.cc b/sql/mf_iocache.cc new file mode 100644 index 00000000000..e5ecc4bb428 --- /dev/null +++ b/sql/mf_iocache.cc @@ -0,0 +1,641 @@ +/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +/* + Cashing of files with only does (sequential) read or writes of fixed- + length records. A read isn't allowed to go over file-length. A read is ok + if it ends at file-length and next read can try to read after file-length + (and get a EOF-error). + Possibly use of asyncronic io. + macros for read and writes for faster io. + Used instead of FILE when reading or writing whole files. + This will make mf_rec_cache obsolete. + One can change info->pos_in_file to a higher value to skip bytes in file if + also info->rc_pos is set to info->rc_end. + If called through open_cached_file(), then the temporary file will + only be created if a write exeeds the file buffer or if one calls + flush_io_cache(). +*/ + +#define MAP_TO_USE_RAID +#include "mysql_priv.h" +#include <mysys_err.h> +#ifdef HAVE_AIOWAIT +#include <errno.h> +static void my_aiowait(my_aio_result *result); +#endif + + /* if cachesize == 0 then use default cachesize (from s-file) */ + /* returns 0 if we have enough memory */ + +extern "C" { + + +int init_io_cache(IO_CACHE *info, File file, uint cachesize, + enum cache_type type, my_off_t seek_offset, + pbool use_async_io, myf cache_myflags) +{ + uint min_cache; + DBUG_ENTER("init_io_cache"); + DBUG_PRINT("enter",("type: %d pos: %ld",(int) type, (ulong) seek_offset)); + + /* There is no file in net_reading */ + info->file= file; + if (!cachesize) + if (! (cachesize= my_default_record_cache_size)) + DBUG_RETURN(1); /* No cache requested */ + min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2; + if (type == READ_CACHE) + { /* Assume file isn't growing */ + my_off_t file_pos,end_of_file; + if ((file_pos=my_tell(file,MYF(0)) == MY_FILEPOS_ERROR)) + DBUG_RETURN(1); + end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0)); + if (end_of_file < seek_offset) + end_of_file=seek_offset; + VOID(my_seek(file,file_pos,MY_SEEK_SET,MYF(0))); + if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1) + { + cachesize=(uint) (end_of_file-seek_offset)+IO_SIZE*2-1; + use_async_io=0; /* No nead to use async */ + } + } + + if ((int) type < (int) READ_NET) + { + for (;;) + { + cachesize=(uint) ((ulong) (cachesize + min_cache-1) & + (ulong) ~(min_cache-1)); + if (cachesize < min_cache) + cachesize = min_cache; + if ((info->buffer= + (byte*) my_malloc(cachesize, + MYF((cache_myflags & ~ MY_WME) | + (cachesize == min_cache ? MY_WME : 0)))) != 0) + break; /* Enough memory found */ + if (cachesize == min_cache) + DBUG_RETURN(2); /* Can't alloc cache */ + cachesize= (uint) ((long) cachesize*3/4); /* Try with less memory */ + } + } + else + info->buffer=0; + info->pos_in_file= seek_offset; + info->read_length=info->buffer_length=cachesize; + info->seek_not_done=test(file >= 0); /* Seek not done */ + info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP); + info->rc_request_pos=info->rc_pos=info->buffer; + + if (type == READ_CACHE || type == READ_NET) /* the same logic */ + { + info->rc_end=info->buffer; /* Nothing in cache */ + } + else /* type == WRITE_CACHE */ + { + info->rc_end=info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1)); + } + info->end_of_file=(type == READ_NET) ? 0 : MY_FILEPOS_ERROR; /* May be changed by user */ + info->type=type; + info->error=0; + info->read_function=(type == READ_NET) ? _my_b_net_read : _my_b_read; /* net | file */ +#ifdef HAVE_AIOWAIT + if (use_async_io && ! my_disable_async_io) + { + DBUG_PRINT("info",("Using async io")); + info->read_length/=2; + info->read_function=_my_b_async_read; + } + info->inited=info->aio_result.pending=0; +#endif + DBUG_RETURN(0); +} /* init_io_cache */ + + + /* Wait until current request is ready */ + +#ifdef HAVE_AIOWAIT +static void my_aiowait(my_aio_result *result) +{ + if (result->pending) + { + struct aio_result_t *tmp; + for (;;) + { + if ((int) (tmp=aiowait((struct timeval *) 0)) == -1) + { + if (errno == EINTR) + continue; + DBUG_PRINT("error",("No aio request, error: %d",errno)); + result->pending=0; /* Assume everythings is ok */ + break; + } + ((my_aio_result*) tmp)->pending=0; + if ((my_aio_result*) tmp == result) + break; + } + } + return; +} +#endif + + /* Use this to reset cache to start or other type */ + /* Some simple optimizing is done when reinit in current buffer */ + +my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, + my_off_t seek_offset, + pbool use_async_io __attribute__((unused)), + pbool clear_cache) +{ + DBUG_ENTER("reinit_io_cache"); + + info->seek_not_done= test(info->file >= 0); /* Seek not done */ + if (!clear_cache && seek_offset >= info->pos_in_file && + seek_offset <= info->pos_in_file + + (uint) (info->rc_end - info->rc_request_pos)) + { /* use current buffer */ + if (info->type == WRITE_CACHE && type == READ_CACHE) + { + info->rc_end=info->rc_pos; + info->end_of_file=my_b_tell(info); + } + else if (info->type == READ_CACHE && type == WRITE_CACHE) + info->rc_end=info->buffer+info->buffer_length; + info->rc_pos=info->rc_request_pos+(seek_offset-info->pos_in_file); +#ifdef HAVE_AIOWAIT + my_aiowait(&info->aio_result); /* Wait for outstanding req */ +#endif + } + else + { + if (info->type == WRITE_CACHE && type == READ_CACHE) + info->end_of_file=my_b_tell(info); + if (flush_io_cache(info)) + DBUG_RETURN(1); + info->pos_in_file=seek_offset; + info->rc_request_pos=info->rc_pos=info->buffer; + if (type == READ_CACHE || type == READ_NET) + { + info->rc_end=info->buffer; /* Nothing in cache */ + } + else + { + info->rc_end=info->buffer+info->buffer_length- + (seek_offset & (IO_SIZE-1)); + info->end_of_file=(type == READ_NET) ? 0 : MY_FILEPOS_ERROR; + } + } + info->type=type; + info->error=0; + info->read_function=(type == READ_NET) ? _my_b_net_read : _my_b_read; +#ifdef HAVE_AIOWAIT + if (type != READ_NET) + { + if (use_async_io && ! my_disable_async_io && + ((ulong) info->buffer_length < + (ulong) (info->end_of_file - seek_offset))) + { + info->read_length=info->buffer_length/2; + info->read_function=_my_b_async_read; + } + } + info->inited=0; +#endif + DBUG_RETURN(0); +} /* init_io_cache */ + + + + /* Read buffered. Returns 1 if can't read requested characters */ + /* Returns 0 if record read */ + +int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count) +{ + uint length,diff_length,left_length; + my_off_t max_length, pos_in_file; + memcpy(Buffer,info->rc_pos, + (size_t) (left_length=(uint) (info->rc_end-info->rc_pos))); + Buffer+=left_length; + Count-=left_length; + pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer); + if (info->seek_not_done) + { /* File touched, do seek */ + VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0))); + info->seek_not_done=0; + } + diff_length=(uint) (pos_in_file & (IO_SIZE-1)); + if (Count >= (uint) (IO_SIZE+(IO_SIZE-diff_length))) + { /* Fill first intern buffer */ + uint read_length; + if (info->end_of_file == pos_in_file) + { /* End of file */ + info->error=(int) left_length; + return 1; + } + length=(Count & (uint) ~(IO_SIZE-1))-diff_length; + if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags)) + != (uint) length) + { + info->error= read_length == (uint) -1 ? -1 : + (int) (read_length+left_length); + return 1; + } + Count-=length; + Buffer+=length; + pos_in_file+=length; + left_length+=length; + diff_length=0; + } + max_length=info->end_of_file - pos_in_file; + if (max_length > info->read_length-diff_length) + max_length=info->read_length-diff_length; + if (!max_length) + { + if (Count) + { + info->error= left_length; /* We only got this many char */ + return 1; + } + length=0; /* Didn't read any chars */ + } + else if ((length=my_read(info->file,info->buffer,(uint) max_length, + info->myflags)) < Count || + length == (uint) -1) + { + if (length != (uint) -1) + memcpy(Buffer,info->buffer,(size_t) length); + info->error= length == (uint) -1 ? -1 : (int) (length+left_length); + return 1; + } + info->rc_pos=info->buffer+Count; + info->rc_end=info->buffer+length; + info->pos_in_file=pos_in_file; + memcpy(Buffer,info->buffer,(size_t) Count); + return 0; +} + + /* + ** Read buffered from the net. + ** Returns 1 if can't read requested characters + ** Returns 0 if record read + */ + +int _my_b_net_read(register IO_CACHE *info, byte *Buffer, + uint Count __attribute__((unused))) +{ + int read_length; + NET *net= &(current_thd)->net; + + if (info->end_of_file) + return 1; /* because my_b_get (no _) takes 1 byte at a time */ + read_length=my_net_read(net); + if (read_length == (int) packet_error) + { + info->error=(uint) -1; + return 1; + } + if (read_length == 0) + { + /* End of file from client */ + info->end_of_file = 1; return 1; + } + /* to set up stuff for my_b_get (no _) */ + info->rc_end = (info->rc_pos = (byte*) net->read_pos) + read_length; + Buffer[0] = info->rc_pos[0]; /* length is always 1 */ + info->rc_pos++; + return 0; +} + +#ifdef HAVE_AIOWAIT + +int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count) +{ + uint length,read_length,diff_length,left_length,use_length,org_Count; + my_off_t max_length; + my_off_t next_pos_in_file; + byte *read_buffer; + + memcpy(Buffer,info->rc_pos, + (size_t) (left_length=(uint) (info->rc_end-info->rc_pos))); + Buffer+=left_length; + org_Count=Count; + Count-=left_length; + + if (info->inited) + { /* wait for read block */ + info->inited=0; /* No more block to read */ + my_aiowait(&info->aio_result); /* Wait for outstanding req */ + if (info->aio_result.result.aio_errno) + { + if (info->myflags & MY_WME) + my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG), + my_filename(info->file), + info->aio_result.result.aio_errno); + my_errno=info->aio_result.result.aio_errno; + info->error= -1; + return(1); + } + if (! (read_length = (uint) info->aio_result.result.aio_return) || + read_length == (uint) -1) + { + my_errno=0; /* For testing */ + info->error= (read_length == (uint) -1 ? -1 : + (int) (read_length+left_length)); + return(1); + } + info->pos_in_file+=(uint) (info->rc_end - info->rc_request_pos); + + if (info->rc_request_pos != info->buffer) + info->rc_request_pos=info->buffer; + else + info->rc_request_pos=info->buffer+info->read_length; + info->rc_pos=info->rc_request_pos; + next_pos_in_file=info->aio_read_pos+read_length; + + /* Check if pos_in_file is changed + (_ni_read_cache may have skipped some bytes) */ + + if (info->aio_read_pos < info->pos_in_file) + { /* Fix if skipped bytes */ + if (info->aio_read_pos + read_length < info->pos_in_file) + { + read_length=0; /* Skipp block */ + next_pos_in_file=info->pos_in_file; + } + else + { + my_off_t offset= (info->pos_in_file - info->aio_read_pos); + info->pos_in_file=info->aio_read_pos; /* Whe are here */ + info->rc_pos=info->rc_request_pos+offset; + read_length-=offset; /* Bytes left from rc_pos */ + } + } +#ifndef DBUG_OFF + if (info->aio_read_pos > info->pos_in_file) + { + my_errno=EINVAL; + return(info->read_length= -1); + } +#endif + /* Copy found bytes to buffer */ + length=min(Count,read_length); + memcpy(Buffer,info->rc_pos,(size_t) length); + Buffer+=length; + Count-=length; + left_length+=length; + info->rc_end=info->rc_pos+read_length; + info->rc_pos+=length; + } + else + next_pos_in_file=(info->pos_in_file+ (uint) + (info->rc_end - info->rc_request_pos)); + + /* If reading large blocks, or first read or read with skipp */ + if (Count) + { + if (next_pos_in_file == info->end_of_file) + { + info->error=(int) (read_length+left_length); + return 1; + } + VOID(my_seek(info->file,next_pos_in_file,MY_SEEK_SET,MYF(0))); + read_length=IO_SIZE*2- (uint) (next_pos_in_file & (IO_SIZE-1)); + if (Count < read_length) + { /* Small block, read to cache */ + if ((read_length=my_read(info->file,info->rc_request_pos, + read_length, info->myflags)) == (uint) -1) + return info->error= -1; + use_length=min(Count,read_length); + memcpy(Buffer,info->rc_request_pos,(size_t) use_length); + info->rc_pos=info->rc_request_pos+Count; + info->rc_end=info->rc_request_pos+read_length; + info->pos_in_file=next_pos_in_file; /* Start of block in cache */ + next_pos_in_file+=read_length; + + if (Count != use_length) + { /* Didn't find hole block */ + if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count) + my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG), + my_filename(info->file),my_errno); + info->error=(int) (read_length+left_length); + return 1; + } + } + else + { /* Big block, don't cache it */ + if ((read_length=my_read(info->file,Buffer,(uint) Count,info->myflags)) + != Count) + { + info->error= read_length == (uint) -1 ? -1 : read_length+left_length; + return 1; + } + info->rc_pos=info->rc_end=info->rc_request_pos; + info->pos_in_file=(next_pos_in_file+=Count); + } + } + + /* Read next block with asyncronic io */ + max_length=info->end_of_file - next_pos_in_file; + diff_length=(next_pos_in_file & (IO_SIZE-1)); + + if (max_length > (my_off_t) info->read_length - diff_length) + max_length= (my_off_t) info->read_length - diff_length; + if (info->rc_request_pos != info->buffer) + read_buffer=info->buffer; + else + read_buffer=info->buffer+info->read_length; + info->aio_read_pos=next_pos_in_file; + if (max_length) + { + info->aio_result.result.aio_errno=AIO_INPROGRESS; /* Marker for test */ + DBUG_PRINT("aioread",("filepos: %ld length: %ld", + (ulong) next_pos_in_file,(ulong) max_length)); + if (aioread(info->file,read_buffer,(int) max_length, + (my_off_t) next_pos_in_file,MY_SEEK_SET, + &info->aio_result.result)) + { /* Skipp async io */ + my_errno=errno; + DBUG_PRINT("error",("got error: %d, aio_result: %d from aioread, async skipped", + errno, info->aio_result.result.aio_errno)); + if (info->rc_request_pos != info->buffer) + { + bmove(info->buffer,info->rc_request_pos, + (uint) (info->rc_end - info->rc_pos)); + info->rc_request_pos=info->buffer; + info->rc_pos-=info->read_length; + info->rc_end-=info->read_length; + } + info->read_length=info->buffer_length; /* Use hole buffer */ + info->read_function=_my_b_read; /* Use normal IO_READ next */ + } + else + info->inited=info->aio_result.pending=1; + } + return 0; /* Block read, async in use */ +} /* _my_b_async_read */ +#endif + + +/* Read one byte when buffer is empty */ + +int _my_b_get(IO_CACHE *info) +{ + byte buff; + if ((*(info)->read_function)(info,&buff,1)) + return my_b_EOF; + return (int) (uchar) buff; +} + + /* Returns != 0 if error on write */ + +int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count) +{ + uint rest_length,length; + + rest_length=(uint) (info->rc_end - info->rc_pos); + memcpy(info->rc_pos,Buffer,(size_t) rest_length); + Buffer+=rest_length; + Count-=rest_length; + info->rc_pos+=rest_length; + if (flush_io_cache(info)) + return 1; + if (Count >= IO_SIZE) + { /* Fill first intern buffer */ + length=Count & (uint) ~(IO_SIZE-1); + if (info->seek_not_done) + { /* File touched, do seek */ + VOID(my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0))); + info->seek_not_done=0; + } + if (my_write(info->file,Buffer,(uint) length,info->myflags | MY_NABP)) + return info->error= -1; + Count-=length; + Buffer+=length; + info->pos_in_file+=length; + } + memcpy(info->rc_pos,Buffer,(size_t) Count); + info->rc_pos+=Count; + return 0; +} + + +/* + Write a block to disk where part of the data may be inside the record + buffer. As all write calls to the data goes through the cache, + we will never get a seek over the end of the buffer +*/ + +int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count, + my_off_t pos) +{ + uint length; + int error=0; + + if (pos < info->pos_in_file) + { + /* Of no overlap, write everything without buffering */ + if (pos + Count <= info->pos_in_file) + return my_pwrite(info->file, Buffer, Count, pos, + info->myflags | MY_NABP); + /* Write the part of the block that is before buffer */ + length= (uint) (info->pos_in_file - pos); + if (my_pwrite(info->file, Buffer, length, pos, info->myflags | MY_NABP)) + info->error=error=-1; + Buffer+=length; + pos+= length; + Count-= length; + } + + /* Check if we want to write inside the used part of the buffer.*/ + length= (uint) (info->rc_end - info->buffer); + if (pos < info->pos_in_file + length) + { + uint offset= (uint) (pos - info->pos_in_file); + length-=offset; + if (length > Count) + length=Count; + memcpy(info->buffer+offset, Buffer, length); + Buffer+=length; + Count-= length; + /* Fix length of buffer if the new data was larger */ + if (info->buffer+length > info->rc_pos) + info->rc_pos=info->buffer+length; + if (!Count) + return (error); + } + /* Write at the end of the current buffer; This is the normal case */ + if (_my_b_write(info, Buffer, Count)) + error= -1; + return error; +} + + /* Flush write cache */ + +int flush_io_cache(IO_CACHE *info) +{ + uint length; + DBUG_ENTER("flush_io_cache"); + + if (info->type == WRITE_CACHE) + { + if (info->file == -1) + { + if (real_open_cached_file(info)) + DBUG_RETURN((info->error= -1)); + } + if (info->rc_pos != info->buffer) + { + length=(uint) (info->rc_pos - info->buffer); + if (info->seek_not_done) + { /* File touched, do seek */ + VOID(my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0))); + info->seek_not_done=0; + } + info->rc_pos=info->buffer; + info->pos_in_file+=length; + info->rc_end=(info->buffer+info->buffer_length- + (info->pos_in_file & (IO_SIZE-1))); + if (my_write(info->file,info->buffer,length,info->myflags | MY_NABP)) + DBUG_RETURN((info->error= -1)); + DBUG_RETURN(0); + } + } +#ifdef HAVE_AIOWAIT + else if (info->type != READ_NET) + { + my_aiowait(&info->aio_result); /* Wait for outstanding req */ + info->inited=0; + } +#endif + DBUG_RETURN(0); +} + + +int end_io_cache(IO_CACHE *info) +{ + int error=0; + DBUG_ENTER("end_io_cache"); + if (info->buffer) + { + if (info->file != -1) /* File doesn't exist */ + error=flush_io_cache(info); + my_free((gptr) info->buffer,MYF(MY_WME)); + info->buffer=info->rc_pos=(byte*) 0; + } + DBUG_RETURN(error); +} /* end_io_cache */ + +} |