summaryrefslogtreecommitdiff
path: root/mysys/mf_iocache.c
diff options
context:
space:
mode:
authorunknown <sasha@mysql.sashanet.com>2001-10-23 13:28:03 -0600
committerunknown <sasha@mysql.sashanet.com>2001-10-23 13:28:03 -0600
commit8fc78e08b0e68f82121b0bba8d930bc5ec57a29f (patch)
treec5c654aca3970830687c68cb22289835e1719233 /mysys/mf_iocache.c
parent74f49f9f34b7a4cc1b87aa1cb657b72f8c6c856e (diff)
downloadmariadb-git-8fc78e08b0e68f82121b0bba8d930bc5ec57a29f.tar.gz
cleanup
removal of duplicate code in mf_iocache.cc work on failsafe replication work on SEQ_READ_APPEND io cache include/my_sys.h: updates for SEQ_READ_APPEND libmysql/Makefile.am: fix for mysys/mf_iocache.c libmysql/libmysql.c: updates for new format of SHOW SLAVE HOSTS mysql-test/r/rpl000001.result: test replication of LOAD DATA LOCAL INFILE mysql-test/r/rpl000002.result: updated test result mysql-test/t/rpl000001.test: test LOAD DATA LOCAL INFILE mysys/mf_iocache.c: cleanup to remove duplicate functionality some work on SEQ_READ_APPEND sql/mf_iocache.cc: cleanup to remove duplicate functionality sql/repl_failsafe.cc: more work on failsafe replication sql/repl_failsafe.h: more work on failsafe replication sql/slave.cc: cleanup more work on failsafe replication sql/sql_load.cc: fixed bug on replicating empty file loads got LOAD DATA LOCAL INFILE to work again, and to be replicated sql/sql_repl.cc: cleanup more work on failsafe replication sql/sql_repl.h: more work on failsafe replication
Diffstat (limited to 'mysys/mf_iocache.c')
-rw-r--r--mysys/mf_iocache.c259
1 files changed, 210 insertions, 49 deletions
diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c
index 0ef496227b6..f0100b5122b 100644
--- a/mysys/mf_iocache.c
+++ b/mysys/mf_iocache.c
@@ -36,10 +36,34 @@
#include <m_string.h>
#ifdef HAVE_AIOWAIT
#include "mysys_err.h"
-#include <errno.h>
static void my_aiowait(my_aio_result *result);
#endif
+#include <assert.h>
+#include <errno.h>
+
+static void init_read_function(IO_CACHE* info, enum cache_type type);
+static void init_read_function(IO_CACHE* info, enum cache_type type)
+{
+ switch (type)
+ {
+#ifndef MYSQL_CLIENT
+ case READ_NET:
+ /* must be initialized by the caller. The problem is that
+ _my_b_net_read has to be defined in sql directory because of
+ the dependency on THD, and therefore cannot be visible to
+ programs that link against mysys but know nothing about THD, such
+ as myisamchk
+ */
+ break;
+#endif
+ case SEQ_READ_APPEND:
+ info->read_function = _my_b_seq_read;
+ break;
+ default:
+ info->read_function = _my_b_read;
+ }
+}
/*
** if cachesize == 0 then use default cachesize (from s-file)
@@ -55,14 +79,15 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
DBUG_ENTER("init_io_cache");
DBUG_PRINT("enter",("type: %d pos: %ld",(int) type, (ulong) seek_offset));
- info->file=file;
+ /* There is no file in net_reading */
+ info->file= file;
info->pre_close = info->pre_read = info->post_read = 0;
info->arg = 0;
if (!cachesize)
if (! (cachesize= my_default_record_cache_size))
DBUG_RETURN(1); /* No cache requested */
min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
- if (type == READ_CACHE)
+ if (type == READ_CACHE || type == SEQ_READ_APPEND)
{ /* Assume file isn't growing */
if (cache_myflags & MY_DONT_CHECK_FILESIZE)
{
@@ -77,36 +102,65 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
if (end_of_file < seek_offset)
end_of_file=seek_offset;
VOID(my_seek(file,file_pos,MY_SEEK_SET,MYF(0)));
- if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
+ /* Trim cache size if the file is very small.
+ However, we should not do this with SEQ_READ_APPEND cache
+ */
+ if (type != SEQ_READ_APPEND &&
+ (my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
{
cachesize=(uint) (end_of_file-seek_offset)+IO_SIZE*2-1;
- use_async_io=0; /* No nead to use async */
+ use_async_io=0; /* No need to use async */
}
}
}
-
- for (;;)
+ info->alloced_buffer = 0;
+ if ((int) type < (int) READ_NET)
{
- cachesize=(uint) ((ulong) (cachesize + min_cache-1) &
- (ulong) ~(min_cache-1));
- if (cachesize < min_cache)
- cachesize = min_cache;
- if ((info->buffer=
- (byte*) my_malloc(cachesize,
- MYF((cache_myflags & ~ MY_WME) |
- (cachesize == min_cache ? MY_WME : 0)))) != 0)
- break; /* Enough memory found */
- if (cachesize == min_cache)
- DBUG_RETURN(2); /* Can't alloc cache */
- cachesize= (uint) ((long) cachesize*3/4); /* Try with less memory */
+ uint buffer_block;
+ for (;;)
+ {
+ buffer_block = cachesize=(uint) ((ulong) (cachesize + min_cache-1) &
+ (ulong) ~(min_cache-1));
+ if (type == SEQ_READ_APPEND)
+ buffer_block *= 2;
+ if (cachesize < min_cache)
+ cachesize = min_cache;
+ if ((info->buffer=
+ (byte*) my_malloc(buffer_block,
+ MYF((cache_myflags & ~ MY_WME) |
+ (cachesize == min_cache ? MY_WME : 0)))) != 0)
+ {
+ if (type == SEQ_READ_APPEND)
+ info->append_buffer = info->buffer + cachesize;
+ info->alloced_buffer=1;
+ break; /* Enough memory found */
+ }
+ if (cachesize == min_cache)
+ DBUG_RETURN(2); /* Can't alloc cache */
+ cachesize= (uint) ((long) cachesize*3/4); /* Try with less memory */
+ }
}
- info->pos_in_file=seek_offset;
+ else
+ info->buffer=0;
+
+ DBUG_PRINT("info",("init_io_cache: cachesize = %u",cachesize));
+ info->pos_in_file= seek_offset;
info->read_length=info->buffer_length=cachesize;
- info->seek_not_done= test(file >= 0); /* Seek not done */
+ info->seek_not_done= test(file >= 0 && type != READ_FIFO &&
+ type != READ_NET);
info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
info->rc_request_pos=info->rc_pos=info->buffer;
+ if (type == SEQ_READ_APPEND)
+ {
+ info->append_pos = info->append_buffer;
+ info->append_end = info->append_buffer + info->buffer_length;
+#ifdef THREAD
+ pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
+#endif
+ }
- if (type == READ_CACHE)
+ if (type == READ_CACHE || type == SEQ_READ_APPEND ||
+ type == READ_NET || type == READ_FIFO)
{
info->rc_end=info->buffer; /* Nothing in cache */
}
@@ -114,10 +168,12 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
{
info->rc_end=info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
}
- info->end_of_file=MY_FILEPOS_ERROR; /* May be changed by user */
+ /* end_of_file may be changed by user later */
+ info->end_of_file= ((type == READ_NET || type == READ_FIFO ) ? 0
+ : ~(my_off_t) 0);
info->type=type;
info->error=0;
- info->read_function=_my_b_read;
+ init_read_function(info,type);
#ifdef HAVE_AIOWAIT
if (use_async_io && ! my_disable_async_io)
{
@@ -169,6 +225,8 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
DBUG_ENTER("reinit_io_cache");
info->seek_not_done= test(info->file >= 0); /* Seek not done */
+
+ /* If the whole file is in memory, avoid flushing to disk */
if (! clear_cache &&
seek_offset >= info->pos_in_file &&
seek_offset <= info->pos_in_file +
@@ -179,8 +237,12 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
info->rc_end=info->rc_pos;
info->end_of_file=my_b_tell(info);
}
- else if (info->type == READ_CACHE && type == WRITE_CACHE)
- info->rc_end=info->buffer+info->buffer_length;
+ else if (type == WRITE_CACHE)
+ {
+ if (info->type == READ_CACHE)
+ info->rc_end=info->buffer+info->buffer_length;
+ info->end_of_file = ~(my_off_t) 0;
+ }
info->rc_pos=info->rc_request_pos+(seek_offset-info->pos_in_file);
#ifdef HAVE_AIOWAIT
my_aiowait(&info->aio_result); /* Wait for outstanding req */
@@ -188,13 +250,22 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
}
else
{
+ /*
+ If we change from WRITE_CACHE to READ_CACHE, assume that everything
+ after the current positions should be ignored
+ */
if (info->type == WRITE_CACHE && type == READ_CACHE)
info->end_of_file=my_b_tell(info);
- if (flush_io_cache(info))
+ /* No need to flush cache if we want to reuse it */
+ if ((type != WRITE_CACHE || !clear_cache) && flush_io_cache(info))
DBUG_RETURN(1);
- info->pos_in_file=seek_offset;
+ if (info->pos_in_file != seek_offset)
+ {
+ info->pos_in_file=seek_offset;
+ info->seek_not_done=1;
+ }
info->rc_request_pos=info->rc_pos=info->buffer;
- if (type == READ_CACHE)
+ if (type == READ_CACHE || type == READ_NET || type == READ_FIFO)
{
info->rc_end=info->buffer; /* Nothing in cache */
}
@@ -202,19 +273,23 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
{
info->rc_end=info->buffer+info->buffer_length-
(seek_offset & (IO_SIZE-1));
- info->end_of_file=MY_FILEPOS_ERROR; /* May be changed by user */
+ info->end_of_file= ((type == READ_NET || type == READ_FIFO) ? 0 :
+ ~(my_off_t) 0);
}
}
info->type=type;
info->error=0;
- info->read_function=_my_b_read;
+ init_read_function(info,type);
#ifdef HAVE_AIOWAIT
- if (use_async_io && ! my_disable_async_io &&
- ((ulong) info->buffer_length <
- (ulong) (info->end_of_file - seek_offset)))
+ if (type != READ_NET)
{
- info->read_length=info->buffer_length/2;
- info->read_function=_my_b_async_read;
+ if (use_async_io && ! my_disable_async_io &&
+ ((ulong) info->buffer_length <
+ (ulong) (info->end_of_file - seek_offset)))
+ {
+ info->read_length=info->buffer_length/2;
+ info->read_function=_my_b_async_read;
+ }
}
info->inited=0;
#endif
@@ -223,18 +298,27 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
- /* Read buffered. Returns 1 if can't read requested characters */
- /* Returns 0 if record read */
+ /*
+ Read buffered. Returns 1 if can't read requested characters
+ This function is only called from the my_b_read() macro
+ when there isn't enough characters in the buffer to
+ satisfy the request.
+ Returns 0 we succeeded in reading all data
+ */
int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
{
uint length,diff_length,left_length;
my_off_t max_length, pos_in_file;
-
- memcpy(Buffer,info->rc_pos,
- (size_t) (left_length=(uint) (info->rc_end-info->rc_pos)));
- Buffer+=left_length;
- Count-=left_length;
+
+ if ((left_length=(uint) (info->rc_end-info->rc_pos)))
+ {
+ dbug_assert(Count >= left_length); /* User is not using my_b_read() */
+ memcpy(Buffer,info->rc_pos, (size_t) (left_length));
+ Buffer+=left_length;
+ Count-=left_length;
+ }
+ /* pos_in_file always point on where info->buffer was read */
pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer);
if (info->seek_not_done)
{ /* File touched, do seek */
@@ -264,10 +348,10 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
left_length+=length;
diff_length=0;
}
- max_length=info->end_of_file - pos_in_file;
- if (max_length > info->read_length-diff_length)
- max_length=info->read_length-diff_length;
-
+ max_length=info->read_length-diff_length;
+ if (info->type != READ_FIFO &&
+ (info->end_of_file - pos_in_file) < max_length)
+ max_length = info->end_of_file - pos_in_file;
if (!max_length)
{
if (Count)
@@ -293,6 +377,78 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
return 0;
}
+int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count)
+{
+ uint length,diff_length,left_length;
+ my_off_t max_length, pos_in_file;
+
+ if ((left_length=(uint) (info->rc_end-info->rc_pos)))
+ {
+ dbug_assert(Count >= left_length); /* User is not using my_b_read() */
+ memcpy(Buffer,info->rc_pos, (size_t) (left_length));
+ Buffer+=left_length;
+ Count-=left_length;
+ }
+ /* pos_in_file always point on where info->buffer was read */
+ pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer);
+ /* no need to seek since the read is guaranteed to be sequential */
+ diff_length=(uint) (pos_in_file & (IO_SIZE-1));
+#ifdef THREAD
+ pthread_mutex_lock(&info->append_buffer_lock);
+#endif
+#ifdef THREAD
+ pthread_mutex_unlock(&info->append_buffer_lock);
+#endif
+ if (Count >= (uint) (IO_SIZE+(IO_SIZE-diff_length)))
+ { /* Fill first intern buffer */
+ uint read_length;
+ if (info->end_of_file == pos_in_file)
+ { /* End of file */
+ info->error=(int) left_length;
+ return 1;
+ }
+ length=(Count & (uint) ~(IO_SIZE-1))-diff_length;
+ if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags))
+ != (uint) length)
+ {
+ info->error= read_length == (uint) -1 ? -1 :
+ (int) (read_length+left_length);
+ return 1;
+ }
+ Count-=length;
+ Buffer+=length;
+ pos_in_file+=length;
+ left_length+=length;
+ diff_length=0;
+ }
+ max_length=info->read_length-diff_length;
+ if (info->type != READ_FIFO &&
+ (info->end_of_file - pos_in_file) < max_length)
+ max_length = info->end_of_file - pos_in_file;
+ if (!max_length)
+ {
+ if (Count)
+ {
+ info->error= left_length; /* We only got this many char */
+ return 1;
+ }
+ length=0; /* Didn't read any chars */
+ }
+ else if ((length=my_read(info->file,info->buffer,(uint) max_length,
+ info->myflags)) < Count ||
+ length == (uint) -1)
+ {
+ if (length != (uint) -1)
+ memcpy(Buffer,info->buffer,(size_t) length);
+ info->error= length == (uint) -1 ? -1 : (int) (length+left_length);
+ return 1;
+ }
+ info->rc_pos=info->buffer+Count;
+ info->rc_end=info->buffer+length;
+ info->pos_in_file=pos_in_file;
+ memcpy(Buffer,info->buffer,(size_t) Count);
+ return 0;
+}
#ifdef HAVE_AIOWAIT
@@ -490,6 +646,11 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count)
Buffer+=rest_length;
Count-=rest_length;
info->rc_pos+=rest_length;
+ if (info->pos_in_file+info->buffer_length > info->end_of_file)
+ {
+ my_errno=errno=EFBIG;
+ return info->error = -1;
+ }
if (flush_io_cache(info))
return 1;
if (Count >= IO_SIZE)
@@ -596,7 +757,7 @@ int flush_io_cache(IO_CACHE *info)
}
}
#ifdef HAVE_AIOWAIT
- else
+ else if (info->type != READ_NET)
{
my_aiowait(&info->aio_result); /* Wait for outstanding req */
info->inited=0;
@@ -613,7 +774,7 @@ int end_io_cache(IO_CACHE *info)
DBUG_ENTER("end_io_cache");
if((pre_close=info->pre_close))
(*pre_close)(info);
- if (info->buffer)
+ if (info->alloced_buffer)
{
if (info->file != -1) /* File doesn't exist */
error=flush_io_cache(info);