diff options
author | unknown <monty@bitch.mysql.fi> | 2001-11-28 02:55:52 +0200 |
---|---|---|
committer | unknown <monty@bitch.mysql.fi> | 2001-11-28 02:55:52 +0200 |
commit | 06e1e27557cc6788fafd423eea6ca5073dce892a (patch) | |
tree | 40e5d73c964ca6aeb5e0e7d4294e70216edaa397 /mysys/mf_iocache.c | |
parent | f33fb18677d673f1bcb2d617355ea3e2dbb04f48 (diff) | |
download | mariadb-git-06e1e27557cc6788fafd423eea6ca5073dce892a.tar.gz |
New improved IO_CACHE
include/my_global.h:
Remove dbug_assert() (One should use DBUG_ASSERT() instead)
mysql-test/mysql-test-run.sh:
Fixed that xterm works on Solaris 2.8.
Fixed printing of errors
mysql-test/r/isam.result:
Removed MyISAM test from ISAM test
mysql-test/t/isam.test:
Removed MyISAM test from ISAM test
mysys/my_alloc.c:
Removed warnings
mysys/my_bitmap.c:
Use DBUG_ASSERT
mysys/my_pthread.c:
Use DBUG_ASSERT
mysys/my_seek.c:
More DBUG
sql/ha_berkeley.cc:
Use DBUG_ASSERT
sql/ha_innobase.cc:
Use DBUG_ASSERT
sql/log.cc:
Use DBUG_ASSERT
sql/opt_range.cc:
Use DBUG_ASSERT
sql/sql_base.cc:
Use DBUG_ASSERT
sql/sql_handler.cc:
Use DBUG_ASSERT
sql/sql_load.cc:
Cleanup
sql/sql_parse.cc:
Use DBUG_ASSERT
sql/sql_repl.cc:
Cleanup
sql/sql_select.cc:
Use DBUG_ASSERT
tools/mysqlmanager.c:
Cleanup
Diffstat (limited to 'mysys/mf_iocache.c')
-rw-r--r-- | mysys/mf_iocache.c | 584 |
1 files changed, 322 insertions, 262 deletions
diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c index f29df74b651..b0868851177 100644 --- a/mysys/mf_iocache.c +++ b/mysys/mf_iocache.c @@ -1,15 +1,15 @@ /* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB - + This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. - + This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. - + You should have received a copy of the GNU Library General Public License along with this library; if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, @@ -23,12 +23,30 @@ Possibly use of asyncronic io. macros for read and writes for faster io. Used instead of FILE when reading or writing whole files. - This will make mf_rec_cache obsolete. + This code makes mf_rec_cache obsolete (currently only used by ISAM) One can change info->pos_in_file to a higher value to skip bytes in file if - also info->rc_pos is set to info->rc_end. + also info->read_pos is set to info->read_end. If called through open_cached_file(), then the temporary file will only be created if a write exeeds the file buffer or if one calls - flush_io_cache(). + flush_io_cache(). + + If one uses SEQ_READ_APPEND, then two buffers are allocated, one for + reading and another for writing. Reads are first done from disk and + then done from the write buffer. This is an efficient way to read + from a log file when one is writing to it at the same time. + For this to work, the file has to be opened in append mode! + Note that when one uses SEQ_READ_APPEND, one MUST write using + my_b_append ! This is needed because we need to lock the mutex + every time we access the write buffer. + +TODO: + When one SEQ_READ_APPEND and we are reading and writing at the same time, + each time the write buffer gets full and it's written to disk, we will + always do a disk read to read a part of the buffer from disk to the + read buffer. + This should be fixed so that when we do a flush_io_cache() and + we have been reading the write buffer, we should transfer the rest of the + write buffer to the read buffer before we start to reuse it. */ #define MAP_TO_USE_RAID @@ -41,18 +59,20 @@ static void my_aiowait(my_aio_result *result); #include <assert.h> #include <errno.h> -#ifdef MAIN -#include <my_dir.h> +#ifdef THREAD +#define lock_append_buffer(info) \ + pthread_mutex_lock(&(info)->append_buffer_lock) +#define unlock_append_buffer(info) \ + pthread_mutex_unlock(&(info)->append_buffer_lock) +#else +#define lock_append_buffer(info) +#define unlock_append_buffer(info) #endif -static void init_read_function(IO_CACHE* info, enum cache_type type); -static void init_write_function(IO_CACHE* info, enum cache_type type); - -static void init_read_function(IO_CACHE* info, enum cache_type type) +static void +init_functions(IO_CACHE* info, enum cache_type type) { - switch (type) - { -#ifndef MYSQL_CLIENT + switch (type) { case READ_NET: /* must be initialized by the caller. The problem is that _my_b_net_read has to be defined in sql directory because of @@ -61,24 +81,25 @@ static void init_read_function(IO_CACHE* info, enum cache_type type) as myisamchk */ break; -#endif case SEQ_READ_APPEND: info->read_function = _my_b_seq_read; + info->write_function = 0; /* Force a core if used */ break; default: info->read_function = _my_b_read; + info->write_function = _my_b_write; } -} -static void init_write_function(IO_CACHE* info, enum cache_type type) -{ - switch (type) + /* Ensure that my_b_tell() and my_b_bytes_in_cache works */ + if (type == WRITE_CACHE) { - case SEQ_READ_APPEND: - info->write_function = _my_b_append; - break; - default: - info->write_function = _my_b_write; + info->current_pos= &info->write_pos; + info->current_end= &info->write_end; + } + else + { + info->current_pos= &info->read_pos; + info->current_end= &info->read_end; } } @@ -93,62 +114,61 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, pbool use_async_io, myf cache_myflags) { uint min_cache; + my_off_t end_of_file= ~(my_off_t) 0; DBUG_ENTER("init_io_cache"); DBUG_PRINT("enter",("type: %d pos: %ld",(int) type, (ulong) seek_offset)); - /* There is no file in net_reading */ info->file= file; + info->type=type; + info->pos_in_file= seek_offset; info->pre_close = info->pre_read = info->post_read = 0; info->arg = 0; + info->alloced_buffer = 0; + info->buffer=0; + info->seek_not_done= test(file >= 0); + if (!cachesize) if (! (cachesize= my_default_record_cache_size)) DBUG_RETURN(1); /* No cache requested */ min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2; - info->alloced_buffer = 0; if (type == READ_CACHE || type == SEQ_READ_APPEND) { /* Assume file isn't growing */ - if (cache_myflags & MY_DONT_CHECK_FILESIZE) + if (!(cache_myflags & MY_DONT_CHECK_FILESIZE)) { - cache_myflags &= ~MY_DONT_CHECK_FILESIZE; - } - else - { - my_off_t file_pos,end_of_file; - if ((file_pos=my_tell(file,MYF(0)) == MY_FILEPOS_ERROR)) - DBUG_RETURN(1); + /* Calculate end of file to not allocate to big buffers */ end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0)); if (end_of_file < seek_offset) end_of_file=seek_offset; - VOID(my_seek(file,file_pos,MY_SEEK_SET,MYF(0))); - /* Trim cache size if the file is very small. - However, we should not do this with SEQ_READ_APPEND cache - */ - if (type != SEQ_READ_APPEND && - (my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1) + /* Trim cache size if the file is very small */ + if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1) { cachesize=(uint) (end_of_file-seek_offset)+IO_SIZE*2-1; use_async_io=0; /* No need to use async */ } } } - if ((int) type < (int) READ_NET) + cache_myflags &= ~MY_DONT_CHECK_FILESIZE; + if (type != READ_NET && type != WRITE_NET) { - uint buffer_block; + /* Retry allocating memory in smaller blocks until we get one */ for (;;) { - buffer_block = cachesize=(uint) ((ulong) (cachesize + min_cache-1) & + uint buffer_block; + cachesize=(uint) ((ulong) (cachesize + min_cache-1) & (ulong) ~(min_cache-1)); - if (type == SEQ_READ_APPEND) - buffer_block *= 2; if (cachesize < min_cache) cachesize = min_cache; + buffer_block = cachesize; + if (type == SEQ_READ_APPEND) + buffer_block *= 2; if ((info->buffer= (byte*) my_malloc(buffer_block, MYF((cache_myflags & ~ MY_WME) | (cachesize == min_cache ? MY_WME : 0)))) != 0) { + info->write_buffer=info->buffer; if (type == SEQ_READ_APPEND) - info->append_buffer = info->buffer + cachesize; + info->write_buffer = info->buffer + cachesize; info->alloced_buffer=1; break; /* Enough memory found */ } @@ -157,45 +177,30 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, cachesize= (uint) ((long) cachesize*3/4); /* Try with less memory */ } } - else - info->buffer=0; - + DBUG_PRINT("info",("init_io_cache: cachesize = %u",cachesize)); - info->pos_in_file= seek_offset; info->read_length=info->buffer_length=cachesize; - info->seek_not_done= test(file >= 0 && type != READ_FIFO && - type != READ_NET); info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP); - info->rc_request_pos=info->rc_pos= info->write_pos = info->buffer; - info->write_pos = info->write_end = 0; + info->request_pos= info->read_pos= info->write_pos = info->buffer; if (type == SEQ_READ_APPEND) { - info->append_read_pos = info->write_pos = info->append_buffer; - info->write_end = info->append_end = - info->append_buffer + info->buffer_length; -#ifdef THREAD + info->append_read_pos = info->write_pos = info->write_buffer; + info->write_end = info->write_buffer + info->buffer_length; +#ifdef THREAD pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST); -#endif +#endif } - if (type == READ_CACHE || type == SEQ_READ_APPEND || - type == READ_NET || type == READ_FIFO) - { - info->rc_end=info->buffer; /* Nothing in cache */ - } - else /* type == WRITE_CACHE */ - { + if (type == WRITE_CACHE) info->write_end= info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1)); - info->write_pos = info->buffer; - } - /* end_of_file may be changed by user later */ - info->end_of_file= ((type == READ_NET || type == READ_FIFO ) ? 0 - : ~(my_off_t) 0); - info->type=type; + else + info->read_end=info->buffer; /* Nothing in cache */ + + /* End_of_file may be changed by user later */ + info->end_of_file= end_of_file; info->error=0; - init_read_function(info,type); - init_write_function(info,type); + init_functions(info,type); #ifdef HAVE_AIOWAIT if (use_async_io && ! my_disable_async_io) { @@ -236,8 +241,13 @@ static void my_aiowait(my_aio_result *result) } #endif - /* Use this to reset cache to start or other type */ - /* Some simple optimizing is done when reinit in current buffer */ + +/* + Use this to reset cache to re-start reading or to change the type + between READ_CACHE <-> WRITE_CACHE + If we are doing a reinit of a cache where we have the start of the file + in the cache, we are reusing this memory without flushing it to disk. +*/ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, my_off_t seek_offset, @@ -245,33 +255,37 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, pbool clear_cache) { DBUG_ENTER("reinit_io_cache"); + DBUG_PRINT("enter",("type: %d seek_offset: %lu clear_cache: %d", + type, (ulong) seek_offset, (int) clear_cache)); - info->seek_not_done= test(info->file >= 0); /* Seek not done */ + /* One can't do reinit with the following types */ + DBUG_ASSERT(type != READ_NET && info->type != READ_NET && + type != WRITE_NET && info->type != WRITE_NET && + type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND); /* If the whole file is in memory, avoid flushing to disk */ if (! clear_cache && seek_offset >= info->pos_in_file && - seek_offset <= info->pos_in_file + - (uint) (info->rc_end - info->rc_request_pos)) - { /* use current buffer */ + seek_offset <= my_b_tell(info)) + { + /* Reuse current buffer without flushing it to disk */ + byte *pos; if (info->type == WRITE_CACHE && type == READ_CACHE) { - info->rc_end=info->write_pos; + info->read_end=info->write_pos; info->end_of_file=my_b_tell(info); } else if (type == WRITE_CACHE) { if (info->type == READ_CACHE) - { - info->write_end=info->buffer+info->buffer_length; - info->write_pos=info->rc_pos; - } + info->write_end=info->write_buffer+info->buffer_length; info->end_of_file = ~(my_off_t) 0; } + pos=info->request_pos+(seek_offset-info->pos_in_file); if (type == WRITE_CACHE) - info->write_pos=info->rc_request_pos+(seek_offset-info->pos_in_file); - else - info->rc_pos=info->rc_request_pos+(seek_offset-info->pos_in_file); + info->write_pos=pos; + else + info->read_pos= pos; #ifdef HAVE_AIOWAIT my_aiowait(&info->aio_result); /* Wait for outstanding req */ #endif @@ -284,50 +298,35 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, */ if (info->type == WRITE_CACHE && type == READ_CACHE) info->end_of_file=my_b_tell(info); - /* No need to flush cache if we want to reuse it */ - if ((type != WRITE_CACHE || !clear_cache) && flush_io_cache(info)) + /* flush cache if we want to reuse it */ + if (!clear_cache && flush_io_cache(info)) DBUG_RETURN(1); - if (info->pos_in_file != seek_offset) - { - info->pos_in_file=seek_offset; - info->seek_not_done=1; - } - info->rc_request_pos=info->rc_pos=info->buffer; - if (type == READ_CACHE || type == READ_NET || type == READ_FIFO) + info->pos_in_file=seek_offset; + /* Better to do always do a seek */ + info->seek_not_done=1; + info->request_pos=info->read_pos=info->write_pos=info->buffer; + if (type == READ_CACHE) { - info->rc_end=info->buffer; /* Nothing in cache */ + info->read_end=info->buffer; /* Nothing in cache */ } else { - info->rc_end=info->buffer+info->buffer_length- - (seek_offset & (IO_SIZE-1)); - info->end_of_file= ((type == READ_NET || type == READ_FIFO) ? 0 : - ~(my_off_t) 0); + info->write_end=(info->buffer + info->buffer_length - + (seek_offset & (IO_SIZE-1))); + info->end_of_file= ~(my_off_t) 0; } } - if (info->type == SEQ_READ_APPEND) - { - info->append_read_pos = info->write_pos = info->append_buffer; - } - if (!info->write_pos) - info->write_pos = info->buffer; - if (!info->write_end) - info->write_end = info->buffer+info->buffer_length- - (seek_offset & (IO_SIZE-1)); info->type=type; info->error=0; - init_read_function(info,type); - init_write_function(info,type); + init_functions(info,type); + #ifdef HAVE_AIOWAIT - if (type != READ_NET) + if (use_async_io && ! my_disable_async_io && + ((ulong) info->buffer_length < + (ulong) (info->end_of_file - seek_offset))) { - if (use_async_io && ! my_disable_async_io && - ((ulong) info->buffer_length < - (ulong) (info->end_of_file - seek_offset))) - { - info->read_length=info->buffer_length/2; - info->read_function=_my_b_async_read; - } + info->read_length=info->buffer_length/2; + info->read_function=_my_b_async_read; } info->inited=0; #endif @@ -336,28 +335,30 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, - /* - Read buffered. Returns 1 if can't read requested characters - This function is only called from the my_b_read() macro - when there isn't enough characters in the buffer to - satisfy the request. - Returns 0 we succeeded in reading all data - */ +/* + Read buffered. Returns 1 if can't read requested characters + This function is only called from the my_b_read() macro + when there isn't enough characters in the buffer to + satisfy the request. + Returns 0 we succeeded in reading all data +*/ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count) { uint length,diff_length,left_length; my_off_t max_length, pos_in_file; - - if ((left_length=(uint) (info->rc_end-info->rc_pos))) + DBUG_ENTER("_my_b_read"); + + if ((left_length=(uint) (info->read_end-info->read_pos))) { - dbug_assert(Count >= left_length); /* User is not using my_b_read() */ - memcpy(Buffer,info->rc_pos, (size_t) (left_length)); + DBUG_ASSERT(Count >= left_length); /* User is not using my_b_read() */ + memcpy(Buffer,info->read_pos, (size_t) (left_length)); Buffer+=left_length; Count-=left_length; } + /* pos_in_file always point on where info->buffer was read */ - pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer); + pos_in_file=info->pos_in_file+(uint) (info->read_end - info->buffer); if (info->seek_not_done) { /* File touched, do seek */ VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0))); @@ -370,7 +371,7 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count) if (info->end_of_file == pos_in_file) { /* End of file */ info->error=(int) left_length; - return 1; + DBUG_RETURN(1); } length=(Count & (uint) ~(IO_SIZE-1))-diff_length; if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags)) @@ -378,7 +379,7 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count) { info->error= read_length == (uint) -1 ? -1 : (int) (read_length+left_length); - return 1; + DBUG_RETURN(1); } Count-=length; Buffer+=length; @@ -386,16 +387,17 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count) left_length+=length; diff_length=0; } + max_length=info->read_length-diff_length; if (info->type != READ_FIFO && - (info->end_of_file - pos_in_file) < max_length) + max_length > (info->end_of_file - pos_in_file)) max_length = info->end_of_file - pos_in_file; if (!max_length) { if (Count) { info->error= left_length; /* We only got this many char */ - return 1; + DBUG_RETURN(1); } length=0; /* Didn't read any chars */ } @@ -406,20 +408,22 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count) if (length != (uint) -1) memcpy(Buffer,info->buffer,(size_t) length); info->error= length == (uint) -1 ? -1 : (int) (length+left_length); - return 1; + info->read_pos=info->read_end=info->buffer; + DBUG_RETURN(1); } - info->rc_pos=info->buffer+Count; - info->rc_end=info->buffer+length; + info->read_pos=info->buffer+Count; + info->read_end=info->buffer+length; info->pos_in_file=pos_in_file; memcpy(Buffer,info->buffer,(size_t) Count); - return 0; + DBUG_RETURN(0); } -/* Do sequential read from the SEQ_READ_APPEND cache - we do this in three stages: - - first read from info->buffer - - then if there are still data to read, try the file descriptor - - afterwards, if there are still data to read, try append buffer +/* + Do sequential read from the SEQ_READ_APPEND cache + we do this in three stages: + - first read from info->buffer + - then if there are still data to read, try the file descriptor + - afterwards, if there are still data to read, try append buffer */ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) @@ -427,97 +431,127 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) uint length,diff_length,left_length,save_count; my_off_t max_length, pos_in_file; save_count=Count; + /* first, read the regular buffer */ - if ((left_length=(uint) (info->rc_end-info->rc_pos))) + if ((left_length=(uint) (info->read_end-info->read_pos))) { - dbug_assert(Count >= left_length); /* User is not using my_b_read() */ - memcpy(Buffer,info->rc_pos, (size_t) (left_length)); + DBUG_ASSERT(Count > left_length); /* User is not using my_b_read() */ + memcpy(Buffer,info->read_pos, (size_t) (left_length)); Buffer+=left_length; Count-=left_length; } + lock_append_buffer(info); + /* pos_in_file always point on where info->buffer was read */ - if ((pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer)) >= + if ((pos_in_file=info->pos_in_file+(uint) (info->read_end - info->buffer)) >= info->end_of_file) - { - info->pos_in_file=pos_in_file; goto read_append_buffer; + + if (info->seek_not_done) + { /* File touched, do seek */ + VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0))); + info->seek_not_done=0; } - /* no need to seek since the read is guaranteed to be sequential */ diff_length=(uint) (pos_in_file & (IO_SIZE-1)); - + /* now the second stage begins - read from file descriptor */ if (Count >= (uint) (IO_SIZE+(IO_SIZE-diff_length))) { /* Fill first intern buffer */ uint read_length; - if (info->end_of_file == pos_in_file) - { /* End of file */ - goto read_append_buffer; - } + length=(Count & (uint) ~(IO_SIZE-1))-diff_length; - if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags)) - != (uint) length) + if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags)) == + (uint)-1) { - if (read_length != (uint)-1) - { - Count -= read_length; - Buffer += read_length; - } + info->error= -1; + unlock_append_buffer(info); + return 1; + } + Count-=read_length; + Buffer+=read_length; + pos_in_file+=read_length; + + if (read_length != (uint) length) + { + /* + We only got part of data; Read the rest of the data from the + write buffer + */ goto read_append_buffer; } - Count-=length; - Buffer+=length; - pos_in_file+=length; left_length+=length; diff_length=0; } + max_length=info->read_length-diff_length; - if ((info->end_of_file - pos_in_file) < max_length) + if (max_length > (info->end_of_file - pos_in_file)) max_length = info->end_of_file - pos_in_file; if (!max_length) { if (Count) - { goto read_append_buffer; - } - length=0; /* Didn't read any chars */ + length=0; /* Didn't read any more chars */ } - else if ((length=my_read(info->file,info->buffer,(uint) max_length, - info->myflags)) < Count || - length == (uint) -1) + else { - if (length != (uint) -1) + length=my_read(info->file,info->buffer,(uint) max_length, + info->myflags); + if (length == (uint) -1) + { + info->error= -1; + unlock_append_buffer(info); + return 1; + } + if (length < Count) { memcpy(Buffer,info->buffer,(size_t) length); Count -= length; Buffer += length; + goto read_append_buffer; } - goto read_append_buffer; } - info->rc_pos=info->buffer+Count; - info->rc_end=info->buffer+length; + unlock_append_buffer(info); + info->read_pos=info->buffer+Count; + info->read_end=info->buffer+length; info->pos_in_file=pos_in_file; memcpy(Buffer,info->buffer,(size_t) Count); return 0; + read_append_buffer: - lock_append_buffer(info); - if (!Count) return 0; + + /* + Read data from the current write buffer. + Count should never be == 0 here (The code will work even if count is 0) + */ + { - uint copy_len = (uint)(info->append_read_pos - - info->write_pos); - dbug_assert(info->append_read_pos <= info->write_pos); - if (copy_len > Count) - copy_len = Count; - memcpy(Buffer, info->append_read_pos, - copy_len); + /* First copy the data to Count */ + uint len_in_buff = (uint) (info->write_pos - info->append_read_pos); + uint copy_len; + + DBUG_ASSERT(info->append_read_pos <= info->write_pos); + DBUG_ASSERT(pos_in_file == info->end_of_file); + + copy_len=min(Count, len_in_buff); + memcpy(Buffer, info->append_read_pos, copy_len); info->append_read_pos += copy_len; Count -= copy_len; if (Count) - info->error = save_count - Count; + info->error = save_count - Count; + + /* Fill read buffer with data from write buffer */ + memcpy(info->buffer, info->append_read_pos, + (size_t) (len_in_buff - copy_len)); + info->read_pos= info->buffer; + info->read_end= info->buffer+(len_in_buff - copy_len); + info->append_read_pos=info->write_pos; + info->pos_in_file+=len_in_buff; } unlock_append_buffer(info); return Count ? 1 : 0; } + #ifdef HAVE_AIOWAIT int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count) @@ -527,8 +561,8 @@ int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count) my_off_t next_pos_in_file; byte *read_buffer; - memcpy(Buffer,info->rc_pos, - (size_t) (left_length=(uint) (info->rc_end-info->rc_pos))); + memcpy(Buffer,info->read_pos, + (size_t) (left_length=(uint) (info->read_end-info->read_pos))); Buffer+=left_length; org_Count=Count; Count-=left_length; @@ -555,13 +589,13 @@ int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count) (int) (read_length+left_length)); return(1); } - info->pos_in_file+=(uint) (info->rc_end - info->rc_request_pos); + info->pos_in_file+=(uint) (info->read_end - info->request_pos); - if (info->rc_request_pos != info->buffer) - info->rc_request_pos=info->buffer; + if (info->request_pos != info->buffer) + info->request_pos=info->buffer; else - info->rc_request_pos=info->buffer+info->read_length; - info->rc_pos=info->rc_request_pos; + info->request_pos=info->buffer+info->read_length; + info->read_pos=info->request_pos; next_pos_in_file=info->aio_read_pos+read_length; /* Check if pos_in_file is changed @@ -578,8 +612,8 @@ int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count) { my_off_t offset= (info->pos_in_file - info->aio_read_pos); info->pos_in_file=info->aio_read_pos; /* Whe are here */ - info->rc_pos=info->rc_request_pos+offset; - read_length-=offset; /* Bytes left from rc_pos */ + info->read_pos=info->request_pos+offset; + read_length-=offset; /* Bytes left from read_pos */ } } #ifndef DBUG_OFF @@ -591,16 +625,16 @@ int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count) #endif /* Copy found bytes to buffer */ length=min(Count,read_length); - memcpy(Buffer,info->rc_pos,(size_t) length); + memcpy(Buffer,info->read_pos,(size_t) length); Buffer+=length; Count-=length; left_length+=length; - info->rc_end=info->rc_pos+read_length; - info->rc_pos+=length; + info->read_end=info->rc_pos+read_length; + info->read_pos+=length; } else next_pos_in_file=(info->pos_in_file+ (uint) - (info->rc_end - info->rc_request_pos)); + (info->read_end - info->request_pos)); /* If reading large blocks, or first read or read with skipp */ if (Count) @@ -614,13 +648,13 @@ int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count) read_length=IO_SIZE*2- (uint) (next_pos_in_file & (IO_SIZE-1)); if (Count < read_length) { /* Small block, read to cache */ - if ((read_length=my_read(info->file,info->rc_request_pos, + if ((read_length=my_read(info->file,info->request_pos, read_length, info->myflags)) == (uint) -1) return info->error= -1; use_length=min(Count,read_length); - memcpy(Buffer,info->rc_request_pos,(size_t) use_length); - info->rc_pos=info->rc_request_pos+Count; - info->rc_end=info->rc_request_pos+read_length; + memcpy(Buffer,info->request_pos,(size_t) use_length); + info->read_pos=info->request_pos+Count; + info->read_end=info->request_pos+read_length; info->pos_in_file=next_pos_in_file; /* Start of block in cache */ next_pos_in_file+=read_length; @@ -641,7 +675,7 @@ int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count) info->error= read_length == (uint) -1 ? -1 : read_length+left_length; return 1; } - info->rc_pos=info->rc_end=info->rc_request_pos; + info->read_pos=info->read_end=info->request_pos; info->pos_in_file=(next_pos_in_file+=Count); } } @@ -652,7 +686,7 @@ int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count) if (max_length > (my_off_t) info->read_length - diff_length) max_length= (my_off_t) info->read_length - diff_length; - if (info->rc_request_pos != info->buffer) + if (info->request_pos != info->buffer) read_buffer=info->buffer; else read_buffer=info->buffer+info->read_length; @@ -669,13 +703,13 @@ int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count) my_errno=errno; DBUG_PRINT("error",("got error: %d, aio_result: %d from aioread, async skipped", errno, info->aio_result.result.aio_errno)); - if (info->rc_request_pos != info->buffer) + if (info->request_pos != info->buffer) { - bmove(info->buffer,info->rc_request_pos, - (uint) (info->rc_end - info->rc_pos)); - info->rc_request_pos=info->buffer; - info->rc_pos-=info->read_length; - info->rc_end-=info->read_length; + bmove(info->buffer,info->request_pos, + (uint) (info->read_end - info->read_pos)); + info->request_pos=info->buffer; + info->read_pos-=info->read_length; + info->read_end-=info->read_length; } info->read_length=info->buffer_length; /* Use hole buffer */ info->read_function=_my_b_read; /* Use normal IO_READ next */ @@ -709,16 +743,17 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count) { uint rest_length,length; + if (info->pos_in_file+info->buffer_length > info->end_of_file) + { + my_errno=errno=EFBIG; + return info->error = -1; + } + rest_length=(uint) (info->write_end - info->write_pos); memcpy(info->write_pos,Buffer,(size_t) rest_length); Buffer+=rest_length; Count-=rest_length; info->write_pos+=rest_length; - if (info->pos_in_file+info->buffer_length > info->end_of_file) - { - my_errno=errno=EFBIG; - return info->error = -1; - } if (flush_io_cache(info)) return 1; if (Count >= IO_SIZE) @@ -740,12 +775,21 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count) return 0; } -int _my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count) + +/* + Append a block to the write buffer. + This is done with the buffer locked to ensure that we don't read from + the write buffer before we are ready with it. +*/ + +int my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count) { uint rest_length,length; - rest_length=(uint) (info->append_end - - info->write_pos); + lock_append_buffer(info); + rest_length=(uint) (info->write_end - info->write_pos); + if (Count <= rest_length) + goto end; memcpy(info->write_pos,Buffer,(size_t) rest_length); Buffer+=rest_length; Count-=rest_length; @@ -760,8 +804,11 @@ int _my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count) Count-=length; Buffer+=length; } + +end: memcpy(info->write_pos,Buffer,(size_t) Count); info->write_pos+=Count; + unlock_append_buffer(info); return 0; } @@ -791,10 +838,13 @@ int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count, Buffer+=length; pos+= length; Count-= length; +#ifndef HAVE_PREAD + info->seek_not_done=1; +#endif } /* Check if we want to write inside the used part of the buffer.*/ - length= (uint) (info->rc_end - info->buffer); + length= (uint) (info->write_end - info->buffer); if (pos < info->pos_in_file + length) { uint offset= (uint) (pos - info->pos_in_file); @@ -816,20 +866,16 @@ int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count, return error; } -/* avoid warning about empty if body */ -#ifdef THREAD -#define IF_APPEND_CACHE if (append_cache) -#else -#define IF_APPEND_CACHE -#endif /* Flush write cache */ int flush_io_cache(IO_CACHE *info) { uint length; - int append_cache; + my_bool append_cache; + my_off_t pos_in_file; DBUG_ENTER("flush_io_cache"); + append_cache = (info->type == SEQ_READ_APPEND); if (info->type == WRITE_CACHE || append_cache) { @@ -838,44 +884,45 @@ int flush_io_cache(IO_CACHE *info) if (real_open_cached_file(info)) DBUG_RETURN((info->error= -1)); } - IF_APPEND_CACHE - lock_append_buffer(info); - if (info->write_pos != info->buffer) + if ((length=(uint) (info->write_pos - info->write_buffer))) { - length=(uint) (info->write_pos - info->buffer); + pos_in_file=info->pos_in_file; + if (append_cache) + { + pos_in_file=info->end_of_file; + info->seek_not_done=1; + } if (info->seek_not_done) { /* File touched, do seek */ - if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)) == + if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR) { - IF_APPEND_CACHE - unlock_append_buffer(info); DBUG_RETURN((info->error= -1)); } - info->seek_not_done=0; - } - info->write_pos=info->buffer; - info->pos_in_file+=length; - info->write_end=(info->buffer+info->buffer_length- - (info->pos_in_file & (IO_SIZE-1))); - if (append_cache) - { - info->append_read_pos = info->buffer; - info->append_end = info->write_end; + if (!append_cache) + info->seek_not_done=0; } - if (my_write(info->file,info->buffer,length,info->myflags | MY_NABP)) - info->error= -1; + info->write_pos= info->write_buffer; + if (!append_cache) + info->pos_in_file+=length; + info->write_end= (info->write_buffer+info->buffer_length- + ((pos_in_file+length) & (IO_SIZE-1))); + + /* Set this to be used if we are using SEQ_READ_APPEND */ + info->append_read_pos = info->write_buffer; + if (my_write(info->file,info->write_buffer,length, + info->myflags | MY_NABP)) + info->error= -1; else info->error= 0; - IF_APPEND_CACHE - unlock_append_buffer(info); + set_if_bigger(info->end_of_file,(pos_in_file+length)); DBUG_RETURN(info->error); } } #ifdef HAVE_AIOWAIT else if (info->type != READ_NET) { - my_aiowait(&info->aio_result); /* Wait for outstanding req */ + my_aiowait(&info->aio_result); /* Wait for outstanding req */ info->inited=0; } #endif @@ -888,7 +935,8 @@ int end_io_cache(IO_CACHE *info) int error=0; IO_CACHE_CALLBACK pre_close; DBUG_ENTER("end_io_cache"); - if((pre_close=info->pre_close)) + + if ((pre_close=info->pre_close)) (*pre_close)(info); if (info->alloced_buffer) { @@ -896,13 +944,28 @@ int end_io_cache(IO_CACHE *info) if (info->file != -1) /* File doesn't exist */ error=flush_io_cache(info); my_free((gptr) info->buffer,MYF(MY_WME)); - info->buffer=info->rc_pos=(byte*) 0; - info->alloced_buffer = 0; + info->buffer=info->read_pos=(byte*) 0; + } + if (info->type == SEQ_READ_APPEND) + { + /* Destroy allocated mutex */ + info->type=0; +#ifdef THREAD + pthread_mutex_destroy(&info->append_buffer_lock); +#endif } DBUG_RETURN(error); } /* end_io_cache */ + +/********************************************************************** + Testing of MF_IOCACHE +**********************************************************************/ + #ifdef MAIN + +#include <my_dir.h> + void die(const char* fmt, ...) { va_list va_args; @@ -916,7 +979,7 @@ void die(const char* fmt, ...) int open_file(const char* fname, IO_CACHE* info, int cache_size) { int fd; - if ((fd=my_open(fname,O_CREAT|O_APPEND|O_RDWR,MYF(MY_WME))) < 0) + if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0) die("Could not open %s", fname); if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME))) die("failed in init_io_cache()"); @@ -960,8 +1023,8 @@ int main(int argc, char** argv) char buf[4]; int block_size = abs(rand() % max_block); int4store(buf, block_size); - if (my_b_write(&sra_cache,buf,4) || - my_b_write(&sra_cache, block, block_size)) + if (my_b_append(&sra_cache,buf,4) || + my_b_append(&sra_cache, block, block_size)) die("write failed"); total_bytes += 4+block_size; } @@ -985,6 +1048,3 @@ supposedly written\n"); return 0; } #endif - - - |