summaryrefslogtreecommitdiff
path: root/mysys/mf_iocache.c
diff options
context:
space:
mode:
authorunknown <monty@bitch.mysql.fi>2001-11-28 02:55:52 +0200
committerunknown <monty@bitch.mysql.fi>2001-11-28 02:55:52 +0200
commit06e1e27557cc6788fafd423eea6ca5073dce892a (patch)
tree40e5d73c964ca6aeb5e0e7d4294e70216edaa397 /mysys/mf_iocache.c
parentf33fb18677d673f1bcb2d617355ea3e2dbb04f48 (diff)
downloadmariadb-git-06e1e27557cc6788fafd423eea6ca5073dce892a.tar.gz
New improved IO_CACHE
include/my_global.h: Remove dbug_assert() (One should use DBUG_ASSERT() instead) mysql-test/mysql-test-run.sh: Fixed that xterm works on Solaris 2.8. Fixed printing of errors mysql-test/r/isam.result: Removed MyISAM test from ISAM test mysql-test/t/isam.test: Removed MyISAM test from ISAM test mysys/my_alloc.c: Removed warnings mysys/my_bitmap.c: Use DBUG_ASSERT mysys/my_pthread.c: Use DBUG_ASSERT mysys/my_seek.c: More DBUG sql/ha_berkeley.cc: Use DBUG_ASSERT sql/ha_innobase.cc: Use DBUG_ASSERT sql/log.cc: Use DBUG_ASSERT sql/opt_range.cc: Use DBUG_ASSERT sql/sql_base.cc: Use DBUG_ASSERT sql/sql_handler.cc: Use DBUG_ASSERT sql/sql_load.cc: Cleanup sql/sql_parse.cc: Use DBUG_ASSERT sql/sql_repl.cc: Cleanup sql/sql_select.cc: Use DBUG_ASSERT tools/mysqlmanager.c: Cleanup
Diffstat (limited to 'mysys/mf_iocache.c')
-rw-r--r--mysys/mf_iocache.c584
1 files changed, 322 insertions, 262 deletions
diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c
index f29df74b651..b0868851177 100644
--- a/mysys/mf_iocache.c
+++ b/mysys/mf_iocache.c
@@ -1,15 +1,15 @@
/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
-
+
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
-
+
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Library General Public License for more details.
-
+
You should have received a copy of the GNU Library General Public
License along with this library; if not, write to the Free
Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
@@ -23,12 +23,30 @@
Possibly use of asyncronic io.
macros for read and writes for faster io.
Used instead of FILE when reading or writing whole files.
- This will make mf_rec_cache obsolete.
+ This code makes mf_rec_cache obsolete (currently only used by ISAM)
One can change info->pos_in_file to a higher value to skip bytes in file if
- also info->rc_pos is set to info->rc_end.
+ also info->read_pos is set to info->read_end.
If called through open_cached_file(), then the temporary file will
only be created if a write exeeds the file buffer or if one calls
- flush_io_cache().
+ flush_io_cache().
+
+ If one uses SEQ_READ_APPEND, then two buffers are allocated, one for
+ reading and another for writing. Reads are first done from disk and
+ then done from the write buffer. This is an efficient way to read
+ from a log file when one is writing to it at the same time.
+ For this to work, the file has to be opened in append mode!
+ Note that when one uses SEQ_READ_APPEND, one MUST write using
+ my_b_append ! This is needed because we need to lock the mutex
+ every time we access the write buffer.
+
+TODO:
+ When one SEQ_READ_APPEND and we are reading and writing at the same time,
+ each time the write buffer gets full and it's written to disk, we will
+ always do a disk read to read a part of the buffer from disk to the
+ read buffer.
+ This should be fixed so that when we do a flush_io_cache() and
+ we have been reading the write buffer, we should transfer the rest of the
+ write buffer to the read buffer before we start to reuse it.
*/
#define MAP_TO_USE_RAID
@@ -41,18 +59,20 @@ static void my_aiowait(my_aio_result *result);
#include <assert.h>
#include <errno.h>
-#ifdef MAIN
-#include <my_dir.h>
+#ifdef THREAD
+#define lock_append_buffer(info) \
+ pthread_mutex_lock(&(info)->append_buffer_lock)
+#define unlock_append_buffer(info) \
+ pthread_mutex_unlock(&(info)->append_buffer_lock)
+#else
+#define lock_append_buffer(info)
+#define unlock_append_buffer(info)
#endif
-static void init_read_function(IO_CACHE* info, enum cache_type type);
-static void init_write_function(IO_CACHE* info, enum cache_type type);
-
-static void init_read_function(IO_CACHE* info, enum cache_type type)
+static void
+init_functions(IO_CACHE* info, enum cache_type type)
{
- switch (type)
- {
-#ifndef MYSQL_CLIENT
+ switch (type) {
case READ_NET:
/* must be initialized by the caller. The problem is that
_my_b_net_read has to be defined in sql directory because of
@@ -61,24 +81,25 @@ static void init_read_function(IO_CACHE* info, enum cache_type type)
as myisamchk
*/
break;
-#endif
case SEQ_READ_APPEND:
info->read_function = _my_b_seq_read;
+ info->write_function = 0; /* Force a core if used */
break;
default:
info->read_function = _my_b_read;
+ info->write_function = _my_b_write;
}
-}
-static void init_write_function(IO_CACHE* info, enum cache_type type)
-{
- switch (type)
+ /* Ensure that my_b_tell() and my_b_bytes_in_cache works */
+ if (type == WRITE_CACHE)
{
- case SEQ_READ_APPEND:
- info->write_function = _my_b_append;
- break;
- default:
- info->write_function = _my_b_write;
+ info->current_pos= &info->write_pos;
+ info->current_end= &info->write_end;
+ }
+ else
+ {
+ info->current_pos= &info->read_pos;
+ info->current_end= &info->read_end;
}
}
@@ -93,62 +114,61 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
pbool use_async_io, myf cache_myflags)
{
uint min_cache;
+ my_off_t end_of_file= ~(my_off_t) 0;
DBUG_ENTER("init_io_cache");
DBUG_PRINT("enter",("type: %d pos: %ld",(int) type, (ulong) seek_offset));
- /* There is no file in net_reading */
info->file= file;
+ info->type=type;
+ info->pos_in_file= seek_offset;
info->pre_close = info->pre_read = info->post_read = 0;
info->arg = 0;
+ info->alloced_buffer = 0;
+ info->buffer=0;
+ info->seek_not_done= test(file >= 0);
+
if (!cachesize)
if (! (cachesize= my_default_record_cache_size))
DBUG_RETURN(1); /* No cache requested */
min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
- info->alloced_buffer = 0;
if (type == READ_CACHE || type == SEQ_READ_APPEND)
{ /* Assume file isn't growing */
- if (cache_myflags & MY_DONT_CHECK_FILESIZE)
+ if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
{
- cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
- }
- else
- {
- my_off_t file_pos,end_of_file;
- if ((file_pos=my_tell(file,MYF(0)) == MY_FILEPOS_ERROR))
- DBUG_RETURN(1);
+ /* Calculate end of file to not allocate to big buffers */
end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
if (end_of_file < seek_offset)
end_of_file=seek_offset;
- VOID(my_seek(file,file_pos,MY_SEEK_SET,MYF(0)));
- /* Trim cache size if the file is very small.
- However, we should not do this with SEQ_READ_APPEND cache
- */
- if (type != SEQ_READ_APPEND &&
- (my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
+ /* Trim cache size if the file is very small */
+ if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
{
cachesize=(uint) (end_of_file-seek_offset)+IO_SIZE*2-1;
use_async_io=0; /* No need to use async */
}
}
}
- if ((int) type < (int) READ_NET)
+ cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
+ if (type != READ_NET && type != WRITE_NET)
{
- uint buffer_block;
+ /* Retry allocating memory in smaller blocks until we get one */
for (;;)
{
- buffer_block = cachesize=(uint) ((ulong) (cachesize + min_cache-1) &
+ uint buffer_block;
+ cachesize=(uint) ((ulong) (cachesize + min_cache-1) &
(ulong) ~(min_cache-1));
- if (type == SEQ_READ_APPEND)
- buffer_block *= 2;
if (cachesize < min_cache)
cachesize = min_cache;
+ buffer_block = cachesize;
+ if (type == SEQ_READ_APPEND)
+ buffer_block *= 2;
if ((info->buffer=
(byte*) my_malloc(buffer_block,
MYF((cache_myflags & ~ MY_WME) |
(cachesize == min_cache ? MY_WME : 0)))) != 0)
{
+ info->write_buffer=info->buffer;
if (type == SEQ_READ_APPEND)
- info->append_buffer = info->buffer + cachesize;
+ info->write_buffer = info->buffer + cachesize;
info->alloced_buffer=1;
break; /* Enough memory found */
}
@@ -157,45 +177,30 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
cachesize= (uint) ((long) cachesize*3/4); /* Try with less memory */
}
}
- else
- info->buffer=0;
-
+
DBUG_PRINT("info",("init_io_cache: cachesize = %u",cachesize));
- info->pos_in_file= seek_offset;
info->read_length=info->buffer_length=cachesize;
- info->seek_not_done= test(file >= 0 && type != READ_FIFO &&
- type != READ_NET);
info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
- info->rc_request_pos=info->rc_pos= info->write_pos = info->buffer;
- info->write_pos = info->write_end = 0;
+ info->request_pos= info->read_pos= info->write_pos = info->buffer;
if (type == SEQ_READ_APPEND)
{
- info->append_read_pos = info->write_pos = info->append_buffer;
- info->write_end = info->append_end =
- info->append_buffer + info->buffer_length;
-#ifdef THREAD
+ info->append_read_pos = info->write_pos = info->write_buffer;
+ info->write_end = info->write_buffer + info->buffer_length;
+#ifdef THREAD
pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
-#endif
+#endif
}
- if (type == READ_CACHE || type == SEQ_READ_APPEND ||
- type == READ_NET || type == READ_FIFO)
- {
- info->rc_end=info->buffer; /* Nothing in cache */
- }
- else /* type == WRITE_CACHE */
- {
+ if (type == WRITE_CACHE)
info->write_end=
info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
- info->write_pos = info->buffer;
- }
- /* end_of_file may be changed by user later */
- info->end_of_file= ((type == READ_NET || type == READ_FIFO ) ? 0
- : ~(my_off_t) 0);
- info->type=type;
+ else
+ info->read_end=info->buffer; /* Nothing in cache */
+
+ /* End_of_file may be changed by user later */
+ info->end_of_file= end_of_file;
info->error=0;
- init_read_function(info,type);
- init_write_function(info,type);
+ init_functions(info,type);
#ifdef HAVE_AIOWAIT
if (use_async_io && ! my_disable_async_io)
{
@@ -236,8 +241,13 @@ static void my_aiowait(my_aio_result *result)
}
#endif
- /* Use this to reset cache to start or other type */
- /* Some simple optimizing is done when reinit in current buffer */
+
+/*
+ Use this to reset cache to re-start reading or to change the type
+ between READ_CACHE <-> WRITE_CACHE
+ If we are doing a reinit of a cache where we have the start of the file
+ in the cache, we are reusing this memory without flushing it to disk.
+*/
my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
my_off_t seek_offset,
@@ -245,33 +255,37 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
pbool clear_cache)
{
DBUG_ENTER("reinit_io_cache");
+ DBUG_PRINT("enter",("type: %d seek_offset: %lu clear_cache: %d",
+ type, (ulong) seek_offset, (int) clear_cache));
- info->seek_not_done= test(info->file >= 0); /* Seek not done */
+ /* One can't do reinit with the following types */
+ DBUG_ASSERT(type != READ_NET && info->type != READ_NET &&
+ type != WRITE_NET && info->type != WRITE_NET &&
+ type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
/* If the whole file is in memory, avoid flushing to disk */
if (! clear_cache &&
seek_offset >= info->pos_in_file &&
- seek_offset <= info->pos_in_file +
- (uint) (info->rc_end - info->rc_request_pos))
- { /* use current buffer */
+ seek_offset <= my_b_tell(info))
+ {
+ /* Reuse current buffer without flushing it to disk */
+ byte *pos;
if (info->type == WRITE_CACHE && type == READ_CACHE)
{
- info->rc_end=info->write_pos;
+ info->read_end=info->write_pos;
info->end_of_file=my_b_tell(info);
}
else if (type == WRITE_CACHE)
{
if (info->type == READ_CACHE)
- {
- info->write_end=info->buffer+info->buffer_length;
- info->write_pos=info->rc_pos;
- }
+ info->write_end=info->write_buffer+info->buffer_length;
info->end_of_file = ~(my_off_t) 0;
}
+ pos=info->request_pos+(seek_offset-info->pos_in_file);
if (type == WRITE_CACHE)
- info->write_pos=info->rc_request_pos+(seek_offset-info->pos_in_file);
- else
- info->rc_pos=info->rc_request_pos+(seek_offset-info->pos_in_file);
+ info->write_pos=pos;
+ else
+ info->read_pos= pos;
#ifdef HAVE_AIOWAIT
my_aiowait(&info->aio_result); /* Wait for outstanding req */
#endif
@@ -284,50 +298,35 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
*/
if (info->type == WRITE_CACHE && type == READ_CACHE)
info->end_of_file=my_b_tell(info);
- /* No need to flush cache if we want to reuse it */
- if ((type != WRITE_CACHE || !clear_cache) && flush_io_cache(info))
+ /* flush cache if we want to reuse it */
+ if (!clear_cache && flush_io_cache(info))
DBUG_RETURN(1);
- if (info->pos_in_file != seek_offset)
- {
- info->pos_in_file=seek_offset;
- info->seek_not_done=1;
- }
- info->rc_request_pos=info->rc_pos=info->buffer;
- if (type == READ_CACHE || type == READ_NET || type == READ_FIFO)
+ info->pos_in_file=seek_offset;
+ /* Better to do always do a seek */
+ info->seek_not_done=1;
+ info->request_pos=info->read_pos=info->write_pos=info->buffer;
+ if (type == READ_CACHE)
{
- info->rc_end=info->buffer; /* Nothing in cache */
+ info->read_end=info->buffer; /* Nothing in cache */
}
else
{
- info->rc_end=info->buffer+info->buffer_length-
- (seek_offset & (IO_SIZE-1));
- info->end_of_file= ((type == READ_NET || type == READ_FIFO) ? 0 :
- ~(my_off_t) 0);
+ info->write_end=(info->buffer + info->buffer_length -
+ (seek_offset & (IO_SIZE-1)));
+ info->end_of_file= ~(my_off_t) 0;
}
}
- if (info->type == SEQ_READ_APPEND)
- {
- info->append_read_pos = info->write_pos = info->append_buffer;
- }
- if (!info->write_pos)
- info->write_pos = info->buffer;
- if (!info->write_end)
- info->write_end = info->buffer+info->buffer_length-
- (seek_offset & (IO_SIZE-1));
info->type=type;
info->error=0;
- init_read_function(info,type);
- init_write_function(info,type);
+ init_functions(info,type);
+
#ifdef HAVE_AIOWAIT
- if (type != READ_NET)
+ if (use_async_io && ! my_disable_async_io &&
+ ((ulong) info->buffer_length <
+ (ulong) (info->end_of_file - seek_offset)))
{
- if (use_async_io && ! my_disable_async_io &&
- ((ulong) info->buffer_length <
- (ulong) (info->end_of_file - seek_offset)))
- {
- info->read_length=info->buffer_length/2;
- info->read_function=_my_b_async_read;
- }
+ info->read_length=info->buffer_length/2;
+ info->read_function=_my_b_async_read;
}
info->inited=0;
#endif
@@ -336,28 +335,30 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
- /*
- Read buffered. Returns 1 if can't read requested characters
- This function is only called from the my_b_read() macro
- when there isn't enough characters in the buffer to
- satisfy the request.
- Returns 0 we succeeded in reading all data
- */
+/*
+ Read buffered. Returns 1 if can't read requested characters
+ This function is only called from the my_b_read() macro
+ when there isn't enough characters in the buffer to
+ satisfy the request.
+ Returns 0 we succeeded in reading all data
+*/
int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
{
uint length,diff_length,left_length;
my_off_t max_length, pos_in_file;
-
- if ((left_length=(uint) (info->rc_end-info->rc_pos)))
+ DBUG_ENTER("_my_b_read");
+
+ if ((left_length=(uint) (info->read_end-info->read_pos)))
{
- dbug_assert(Count >= left_length); /* User is not using my_b_read() */
- memcpy(Buffer,info->rc_pos, (size_t) (left_length));
+ DBUG_ASSERT(Count >= left_length); /* User is not using my_b_read() */
+ memcpy(Buffer,info->read_pos, (size_t) (left_length));
Buffer+=left_length;
Count-=left_length;
}
+
/* pos_in_file always point on where info->buffer was read */
- pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer);
+ pos_in_file=info->pos_in_file+(uint) (info->read_end - info->buffer);
if (info->seek_not_done)
{ /* File touched, do seek */
VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)));
@@ -370,7 +371,7 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
if (info->end_of_file == pos_in_file)
{ /* End of file */
info->error=(int) left_length;
- return 1;
+ DBUG_RETURN(1);
}
length=(Count & (uint) ~(IO_SIZE-1))-diff_length;
if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags))
@@ -378,7 +379,7 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
{
info->error= read_length == (uint) -1 ? -1 :
(int) (read_length+left_length);
- return 1;
+ DBUG_RETURN(1);
}
Count-=length;
Buffer+=length;
@@ -386,16 +387,17 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
left_length+=length;
diff_length=0;
}
+
max_length=info->read_length-diff_length;
if (info->type != READ_FIFO &&
- (info->end_of_file - pos_in_file) < max_length)
+ max_length > (info->end_of_file - pos_in_file))
max_length = info->end_of_file - pos_in_file;
if (!max_length)
{
if (Count)
{
info->error= left_length; /* We only got this many char */
- return 1;
+ DBUG_RETURN(1);
}
length=0; /* Didn't read any chars */
}
@@ -406,20 +408,22 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
if (length != (uint) -1)
memcpy(Buffer,info->buffer,(size_t) length);
info->error= length == (uint) -1 ? -1 : (int) (length+left_length);
- return 1;
+ info->read_pos=info->read_end=info->buffer;
+ DBUG_RETURN(1);
}
- info->rc_pos=info->buffer+Count;
- info->rc_end=info->buffer+length;
+ info->read_pos=info->buffer+Count;
+ info->read_end=info->buffer+length;
info->pos_in_file=pos_in_file;
memcpy(Buffer,info->buffer,(size_t) Count);
- return 0;
+ DBUG_RETURN(0);
}
-/* Do sequential read from the SEQ_READ_APPEND cache
- we do this in three stages:
- - first read from info->buffer
- - then if there are still data to read, try the file descriptor
- - afterwards, if there are still data to read, try append buffer
+/*
+ Do sequential read from the SEQ_READ_APPEND cache
+ we do this in three stages:
+ - first read from info->buffer
+ - then if there are still data to read, try the file descriptor
+ - afterwards, if there are still data to read, try append buffer
*/
int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count)
@@ -427,97 +431,127 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count)
uint length,diff_length,left_length,save_count;
my_off_t max_length, pos_in_file;
save_count=Count;
+
/* first, read the regular buffer */
- if ((left_length=(uint) (info->rc_end-info->rc_pos)))
+ if ((left_length=(uint) (info->read_end-info->read_pos)))
{
- dbug_assert(Count >= left_length); /* User is not using my_b_read() */
- memcpy(Buffer,info->rc_pos, (size_t) (left_length));
+ DBUG_ASSERT(Count > left_length); /* User is not using my_b_read() */
+ memcpy(Buffer,info->read_pos, (size_t) (left_length));
Buffer+=left_length;
Count-=left_length;
}
+ lock_append_buffer(info);
+
/* pos_in_file always point on where info->buffer was read */
- if ((pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer)) >=
+ if ((pos_in_file=info->pos_in_file+(uint) (info->read_end - info->buffer)) >=
info->end_of_file)
- {
- info->pos_in_file=pos_in_file;
goto read_append_buffer;
+
+ if (info->seek_not_done)
+ { /* File touched, do seek */
+ VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)));
+ info->seek_not_done=0;
}
- /* no need to seek since the read is guaranteed to be sequential */
diff_length=(uint) (pos_in_file & (IO_SIZE-1));
-
+
/* now the second stage begins - read from file descriptor */
if (Count >= (uint) (IO_SIZE+(IO_SIZE-diff_length)))
{ /* Fill first intern buffer */
uint read_length;
- if (info->end_of_file == pos_in_file)
- { /* End of file */
- goto read_append_buffer;
- }
+
length=(Count & (uint) ~(IO_SIZE-1))-diff_length;
- if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags))
- != (uint) length)
+ if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags)) ==
+ (uint)-1)
{
- if (read_length != (uint)-1)
- {
- Count -= read_length;
- Buffer += read_length;
- }
+ info->error= -1;
+ unlock_append_buffer(info);
+ return 1;
+ }
+ Count-=read_length;
+ Buffer+=read_length;
+ pos_in_file+=read_length;
+
+ if (read_length != (uint) length)
+ {
+ /*
+ We only got part of data; Read the rest of the data from the
+ write buffer
+ */
goto read_append_buffer;
}
- Count-=length;
- Buffer+=length;
- pos_in_file+=length;
left_length+=length;
diff_length=0;
}
+
max_length=info->read_length-diff_length;
- if ((info->end_of_file - pos_in_file) < max_length)
+ if (max_length > (info->end_of_file - pos_in_file))
max_length = info->end_of_file - pos_in_file;
if (!max_length)
{
if (Count)
- {
goto read_append_buffer;
- }
- length=0; /* Didn't read any chars */
+ length=0; /* Didn't read any more chars */
}
- else if ((length=my_read(info->file,info->buffer,(uint) max_length,
- info->myflags)) < Count ||
- length == (uint) -1)
+ else
{
- if (length != (uint) -1)
+ length=my_read(info->file,info->buffer,(uint) max_length,
+ info->myflags);
+ if (length == (uint) -1)
+ {
+ info->error= -1;
+ unlock_append_buffer(info);
+ return 1;
+ }
+ if (length < Count)
{
memcpy(Buffer,info->buffer,(size_t) length);
Count -= length;
Buffer += length;
+ goto read_append_buffer;
}
- goto read_append_buffer;
}
- info->rc_pos=info->buffer+Count;
- info->rc_end=info->buffer+length;
+ unlock_append_buffer(info);
+ info->read_pos=info->buffer+Count;
+ info->read_end=info->buffer+length;
info->pos_in_file=pos_in_file;
memcpy(Buffer,info->buffer,(size_t) Count);
return 0;
+
read_append_buffer:
- lock_append_buffer(info);
- if (!Count) return 0;
+
+ /*
+ Read data from the current write buffer.
+ Count should never be == 0 here (The code will work even if count is 0)
+ */
+
{
- uint copy_len = (uint)(info->append_read_pos -
- info->write_pos);
- dbug_assert(info->append_read_pos <= info->write_pos);
- if (copy_len > Count)
- copy_len = Count;
- memcpy(Buffer, info->append_read_pos,
- copy_len);
+ /* First copy the data to Count */
+ uint len_in_buff = (uint) (info->write_pos - info->append_read_pos);
+ uint copy_len;
+
+ DBUG_ASSERT(info->append_read_pos <= info->write_pos);
+ DBUG_ASSERT(pos_in_file == info->end_of_file);
+
+ copy_len=min(Count, len_in_buff);
+ memcpy(Buffer, info->append_read_pos, copy_len);
info->append_read_pos += copy_len;
Count -= copy_len;
if (Count)
- info->error = save_count - Count;
+ info->error = save_count - Count;
+
+ /* Fill read buffer with data from write buffer */
+ memcpy(info->buffer, info->append_read_pos,
+ (size_t) (len_in_buff - copy_len));
+ info->read_pos= info->buffer;
+ info->read_end= info->buffer+(len_in_buff - copy_len);
+ info->append_read_pos=info->write_pos;
+ info->pos_in_file+=len_in_buff;
}
unlock_append_buffer(info);
return Count ? 1 : 0;
}
+
#ifdef HAVE_AIOWAIT
int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count)
@@ -527,8 +561,8 @@ int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count)
my_off_t next_pos_in_file;
byte *read_buffer;
- memcpy(Buffer,info->rc_pos,
- (size_t) (left_length=(uint) (info->rc_end-info->rc_pos)));
+ memcpy(Buffer,info->read_pos,
+ (size_t) (left_length=(uint) (info->read_end-info->read_pos)));
Buffer+=left_length;
org_Count=Count;
Count-=left_length;
@@ -555,13 +589,13 @@ int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count)
(int) (read_length+left_length));
return(1);
}
- info->pos_in_file+=(uint) (info->rc_end - info->rc_request_pos);
+ info->pos_in_file+=(uint) (info->read_end - info->request_pos);
- if (info->rc_request_pos != info->buffer)
- info->rc_request_pos=info->buffer;
+ if (info->request_pos != info->buffer)
+ info->request_pos=info->buffer;
else
- info->rc_request_pos=info->buffer+info->read_length;
- info->rc_pos=info->rc_request_pos;
+ info->request_pos=info->buffer+info->read_length;
+ info->read_pos=info->request_pos;
next_pos_in_file=info->aio_read_pos+read_length;
/* Check if pos_in_file is changed
@@ -578,8 +612,8 @@ int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count)
{
my_off_t offset= (info->pos_in_file - info->aio_read_pos);
info->pos_in_file=info->aio_read_pos; /* Whe are here */
- info->rc_pos=info->rc_request_pos+offset;
- read_length-=offset; /* Bytes left from rc_pos */
+ info->read_pos=info->request_pos+offset;
+ read_length-=offset; /* Bytes left from read_pos */
}
}
#ifndef DBUG_OFF
@@ -591,16 +625,16 @@ int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count)
#endif
/* Copy found bytes to buffer */
length=min(Count,read_length);
- memcpy(Buffer,info->rc_pos,(size_t) length);
+ memcpy(Buffer,info->read_pos,(size_t) length);
Buffer+=length;
Count-=length;
left_length+=length;
- info->rc_end=info->rc_pos+read_length;
- info->rc_pos+=length;
+ info->read_end=info->rc_pos+read_length;
+ info->read_pos+=length;
}
else
next_pos_in_file=(info->pos_in_file+ (uint)
- (info->rc_end - info->rc_request_pos));
+ (info->read_end - info->request_pos));
/* If reading large blocks, or first read or read with skipp */
if (Count)
@@ -614,13 +648,13 @@ int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count)
read_length=IO_SIZE*2- (uint) (next_pos_in_file & (IO_SIZE-1));
if (Count < read_length)
{ /* Small block, read to cache */
- if ((read_length=my_read(info->file,info->rc_request_pos,
+ if ((read_length=my_read(info->file,info->request_pos,
read_length, info->myflags)) == (uint) -1)
return info->error= -1;
use_length=min(Count,read_length);
- memcpy(Buffer,info->rc_request_pos,(size_t) use_length);
- info->rc_pos=info->rc_request_pos+Count;
- info->rc_end=info->rc_request_pos+read_length;
+ memcpy(Buffer,info->request_pos,(size_t) use_length);
+ info->read_pos=info->request_pos+Count;
+ info->read_end=info->request_pos+read_length;
info->pos_in_file=next_pos_in_file; /* Start of block in cache */
next_pos_in_file+=read_length;
@@ -641,7 +675,7 @@ int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count)
info->error= read_length == (uint) -1 ? -1 : read_length+left_length;
return 1;
}
- info->rc_pos=info->rc_end=info->rc_request_pos;
+ info->read_pos=info->read_end=info->request_pos;
info->pos_in_file=(next_pos_in_file+=Count);
}
}
@@ -652,7 +686,7 @@ int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count)
if (max_length > (my_off_t) info->read_length - diff_length)
max_length= (my_off_t) info->read_length - diff_length;
- if (info->rc_request_pos != info->buffer)
+ if (info->request_pos != info->buffer)
read_buffer=info->buffer;
else
read_buffer=info->buffer+info->read_length;
@@ -669,13 +703,13 @@ int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count)
my_errno=errno;
DBUG_PRINT("error",("got error: %d, aio_result: %d from aioread, async skipped",
errno, info->aio_result.result.aio_errno));
- if (info->rc_request_pos != info->buffer)
+ if (info->request_pos != info->buffer)
{
- bmove(info->buffer,info->rc_request_pos,
- (uint) (info->rc_end - info->rc_pos));
- info->rc_request_pos=info->buffer;
- info->rc_pos-=info->read_length;
- info->rc_end-=info->read_length;
+ bmove(info->buffer,info->request_pos,
+ (uint) (info->read_end - info->read_pos));
+ info->request_pos=info->buffer;
+ info->read_pos-=info->read_length;
+ info->read_end-=info->read_length;
}
info->read_length=info->buffer_length; /* Use hole buffer */
info->read_function=_my_b_read; /* Use normal IO_READ next */
@@ -709,16 +743,17 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count)
{
uint rest_length,length;
+ if (info->pos_in_file+info->buffer_length > info->end_of_file)
+ {
+ my_errno=errno=EFBIG;
+ return info->error = -1;
+ }
+
rest_length=(uint) (info->write_end - info->write_pos);
memcpy(info->write_pos,Buffer,(size_t) rest_length);
Buffer+=rest_length;
Count-=rest_length;
info->write_pos+=rest_length;
- if (info->pos_in_file+info->buffer_length > info->end_of_file)
- {
- my_errno=errno=EFBIG;
- return info->error = -1;
- }
if (flush_io_cache(info))
return 1;
if (Count >= IO_SIZE)
@@ -740,12 +775,21 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count)
return 0;
}
-int _my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count)
+
+/*
+ Append a block to the write buffer.
+ This is done with the buffer locked to ensure that we don't read from
+ the write buffer before we are ready with it.
+*/
+
+int my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count)
{
uint rest_length,length;
- rest_length=(uint) (info->append_end -
- info->write_pos);
+ lock_append_buffer(info);
+ rest_length=(uint) (info->write_end - info->write_pos);
+ if (Count <= rest_length)
+ goto end;
memcpy(info->write_pos,Buffer,(size_t) rest_length);
Buffer+=rest_length;
Count-=rest_length;
@@ -760,8 +804,11 @@ int _my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count)
Count-=length;
Buffer+=length;
}
+
+end:
memcpy(info->write_pos,Buffer,(size_t) Count);
info->write_pos+=Count;
+ unlock_append_buffer(info);
return 0;
}
@@ -791,10 +838,13 @@ int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count,
Buffer+=length;
pos+= length;
Count-= length;
+#ifndef HAVE_PREAD
+ info->seek_not_done=1;
+#endif
}
/* Check if we want to write inside the used part of the buffer.*/
- length= (uint) (info->rc_end - info->buffer);
+ length= (uint) (info->write_end - info->buffer);
if (pos < info->pos_in_file + length)
{
uint offset= (uint) (pos - info->pos_in_file);
@@ -816,20 +866,16 @@ int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count,
return error;
}
-/* avoid warning about empty if body */
-#ifdef THREAD
-#define IF_APPEND_CACHE if (append_cache)
-#else
-#define IF_APPEND_CACHE
-#endif
/* Flush write cache */
int flush_io_cache(IO_CACHE *info)
{
uint length;
- int append_cache;
+ my_bool append_cache;
+ my_off_t pos_in_file;
DBUG_ENTER("flush_io_cache");
+
append_cache = (info->type == SEQ_READ_APPEND);
if (info->type == WRITE_CACHE || append_cache)
{
@@ -838,44 +884,45 @@ int flush_io_cache(IO_CACHE *info)
if (real_open_cached_file(info))
DBUG_RETURN((info->error= -1));
}
- IF_APPEND_CACHE
- lock_append_buffer(info);
- if (info->write_pos != info->buffer)
+ if ((length=(uint) (info->write_pos - info->write_buffer)))
{
- length=(uint) (info->write_pos - info->buffer);
+ pos_in_file=info->pos_in_file;
+ if (append_cache)
+ {
+ pos_in_file=info->end_of_file;
+ info->seek_not_done=1;
+ }
if (info->seek_not_done)
{ /* File touched, do seek */
- if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)) ==
+ if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) ==
MY_FILEPOS_ERROR)
{
- IF_APPEND_CACHE
- unlock_append_buffer(info);
DBUG_RETURN((info->error= -1));
}
- info->seek_not_done=0;
- }
- info->write_pos=info->buffer;
- info->pos_in_file+=length;
- info->write_end=(info->buffer+info->buffer_length-
- (info->pos_in_file & (IO_SIZE-1)));
- if (append_cache)
- {
- info->append_read_pos = info->buffer;
- info->append_end = info->write_end;
+ if (!append_cache)
+ info->seek_not_done=0;
}
- if (my_write(info->file,info->buffer,length,info->myflags | MY_NABP))
- info->error= -1;
+ info->write_pos= info->write_buffer;
+ if (!append_cache)
+ info->pos_in_file+=length;
+ info->write_end= (info->write_buffer+info->buffer_length-
+ ((pos_in_file+length) & (IO_SIZE-1)));
+
+ /* Set this to be used if we are using SEQ_READ_APPEND */
+ info->append_read_pos = info->write_buffer;
+ if (my_write(info->file,info->write_buffer,length,
+ info->myflags | MY_NABP))
+ info->error= -1;
else
info->error= 0;
- IF_APPEND_CACHE
- unlock_append_buffer(info);
+ set_if_bigger(info->end_of_file,(pos_in_file+length));
DBUG_RETURN(info->error);
}
}
#ifdef HAVE_AIOWAIT
else if (info->type != READ_NET)
{
- my_aiowait(&info->aio_result); /* Wait for outstanding req */
+ my_aiowait(&info->aio_result); /* Wait for outstanding req */
info->inited=0;
}
#endif
@@ -888,7 +935,8 @@ int end_io_cache(IO_CACHE *info)
int error=0;
IO_CACHE_CALLBACK pre_close;
DBUG_ENTER("end_io_cache");
- if((pre_close=info->pre_close))
+
+ if ((pre_close=info->pre_close))
(*pre_close)(info);
if (info->alloced_buffer)
{
@@ -896,13 +944,28 @@ int end_io_cache(IO_CACHE *info)
if (info->file != -1) /* File doesn't exist */
error=flush_io_cache(info);
my_free((gptr) info->buffer,MYF(MY_WME));
- info->buffer=info->rc_pos=(byte*) 0;
- info->alloced_buffer = 0;
+ info->buffer=info->read_pos=(byte*) 0;
+ }
+ if (info->type == SEQ_READ_APPEND)
+ {
+ /* Destroy allocated mutex */
+ info->type=0;
+#ifdef THREAD
+ pthread_mutex_destroy(&info->append_buffer_lock);
+#endif
}
DBUG_RETURN(error);
} /* end_io_cache */
+
+/**********************************************************************
+ Testing of MF_IOCACHE
+**********************************************************************/
+
#ifdef MAIN
+
+#include <my_dir.h>
+
void die(const char* fmt, ...)
{
va_list va_args;
@@ -916,7 +979,7 @@ void die(const char* fmt, ...)
int open_file(const char* fname, IO_CACHE* info, int cache_size)
{
int fd;
- if ((fd=my_open(fname,O_CREAT|O_APPEND|O_RDWR,MYF(MY_WME))) < 0)
+ if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
die("Could not open %s", fname);
if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
die("failed in init_io_cache()");
@@ -960,8 +1023,8 @@ int main(int argc, char** argv)
char buf[4];
int block_size = abs(rand() % max_block);
int4store(buf, block_size);
- if (my_b_write(&sra_cache,buf,4) ||
- my_b_write(&sra_cache, block, block_size))
+ if (my_b_append(&sra_cache,buf,4) ||
+ my_b_append(&sra_cache, block, block_size))
die("write failed");
total_bytes += 4+block_size;
}
@@ -985,6 +1048,3 @@ supposedly written\n");
return 0;
}
#endif
-
-
-