summaryrefslogtreecommitdiff
path: root/mysys/mf_iocache.c
diff options
context:
space:
mode:
authorunknown <sasha@mysql.sashanet.com>2001-11-03 16:54:00 -0700
committerunknown <sasha@mysql.sashanet.com>2001-11-03 16:54:00 -0700
commitcd825a19936d26735db0c1b4b251bd78617d0c2b (patch)
tree8abf01e56458b77f0e836d8f72481d71cc846b68 /mysys/mf_iocache.c
parent368c6c1f30f44d1d028022a06dbcb9754815ee92 (diff)
downloadmariadb-git-cd825a19936d26735db0c1b4b251bd78617d0c2b.tar.gz
more work on IO_CACHE
portability fixes for systems with broken syscalls that do not interrupt on a signal temporary commit - will not be pushed, need to sync up include/my_sys.h: work on READ_APPEND cache mysys/Makefile.am: change to test IO_CACHE mysys/mf_iocache.c: work on READ_APPEND cache BitKeeper/etc/ignore: Added mysys/#mf_iocache.c# mysys/test_io_cache to the ignore list sql/mysqld.cc: make shutdown work on broken systems sql/sql_repl.cc: make sure slave can be stopped on broken systems in all cases, clean-up
Diffstat (limited to 'mysys/mf_iocache.c')
-rw-r--r--mysys/mf_iocache.c217
1 files changed, 195 insertions, 22 deletions
diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c
index f0100b5122b..d18c3a51c5d 100644
--- a/mysys/mf_iocache.c
+++ b/mysys/mf_iocache.c
@@ -41,6 +41,10 @@ static void my_aiowait(my_aio_result *result);
#include <assert.h>
#include <errno.h>
+#ifdef MAIN
+#include <my_dir.h>
+#endif
+
static void init_read_function(IO_CACHE* info, enum cache_type type);
static void init_read_function(IO_CACHE* info, enum cache_type type)
@@ -152,7 +156,7 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
info->rc_request_pos=info->rc_pos=info->buffer;
if (type == SEQ_READ_APPEND)
{
- info->append_pos = info->append_buffer;
+ info->append_read_pos = info->append_write_pos = info->append_buffer;
info->append_end = info->append_buffer + info->buffer_length;
#ifdef THREAD
pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
@@ -277,6 +281,10 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
~(my_off_t) 0);
}
}
+ if (info->type == SEQ_READ_APPEND)
+ {
+ info->append_read_pos = info->append_write_pos = info->append_buffer;
+ }
info->type=type;
info->error=0;
init_read_function(info,type);
@@ -294,7 +302,7 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
info->inited=0;
#endif
DBUG_RETURN(0);
-} /* init_io_cache */
+} /* reinit_io_cache */
@@ -377,11 +385,19 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
return 0;
}
+/* Do sequential read from the SEQ_READ_APPEND cache
+ we do this in three stages:
+ - first read from info->buffer
+ - then if there are still data to read, try the file descriptor
+ - afterwards, if there are still data to read, try append buffer
+*/
+
int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count)
{
- uint length,diff_length,left_length;
+ uint length,diff_length,left_length,save_count;
my_off_t max_length, pos_in_file;
-
+ save_count=Count;
+ /* first, read the regular buffer */
if ((left_length=(uint) (info->rc_end-info->rc_pos)))
{
dbug_assert(Count >= left_length); /* User is not using my_b_read() */
@@ -390,30 +406,33 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count)
Count-=left_length;
}
/* pos_in_file always point on where info->buffer was read */
- pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer);
+ if ((pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer)) >=
+ info->end_of_file)
+ {
+ info->pos_in_file=pos_in_file;
+ goto read_append_buffer;
+ }
/* no need to seek since the read is guaranteed to be sequential */
diff_length=(uint) (pos_in_file & (IO_SIZE-1));
-#ifdef THREAD
- pthread_mutex_lock(&info->append_buffer_lock);
-#endif
-#ifdef THREAD
- pthread_mutex_unlock(&info->append_buffer_lock);
-#endif
+
+ /* now the second stage begins - read from file descriptor */
if (Count >= (uint) (IO_SIZE+(IO_SIZE-diff_length)))
{ /* Fill first intern buffer */
uint read_length;
if (info->end_of_file == pos_in_file)
{ /* End of file */
- info->error=(int) left_length;
- return 1;
+ goto read_append_buffer;
}
length=(Count & (uint) ~(IO_SIZE-1))-diff_length;
if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags))
!= (uint) length)
{
- info->error= read_length == (uint) -1 ? -1 :
- (int) (read_length+left_length);
- return 1;
+ if (read_length != (uint)-1)
+ {
+ Count -= read_length;
+ Buffer += read_length;
+ }
+ goto read_append_buffer;
}
Count-=length;
Buffer+=length;
@@ -422,15 +441,13 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count)
diff_length=0;
}
max_length=info->read_length-diff_length;
- if (info->type != READ_FIFO &&
- (info->end_of_file - pos_in_file) < max_length)
+ if ((info->end_of_file - pos_in_file) < max_length)
max_length = info->end_of_file - pos_in_file;
if (!max_length)
{
if (Count)
{
- info->error= left_length; /* We only got this many char */
- return 1;
+ goto read_append_buffer;
}
length=0; /* Didn't read any chars */
}
@@ -439,15 +456,36 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count)
length == (uint) -1)
{
if (length != (uint) -1)
+ {
memcpy(Buffer,info->buffer,(size_t) length);
- info->error= length == (uint) -1 ? -1 : (int) (length+left_length);
- return 1;
+ Count -= length;
+ Buffer += length;
+ }
+ goto read_append_buffer;
}
info->rc_pos=info->buffer+Count;
info->rc_end=info->buffer+length;
info->pos_in_file=pos_in_file;
memcpy(Buffer,info->buffer,(size_t) Count);
return 0;
+read_append_buffer:
+ lock_append_buffer(info);
+ if (!Count) return 0;
+ {
+ uint copy_len = (uint)(info->append_read_pos -
+ info->append_write_pos);
+ dbug_assert(info->append_read_pos <= info->append_write_pos);
+ if (copy_len > Count)
+ copy_len = Count;
+ memcpy(Buffer, info->append_read_pos,
+ copy_len);
+ info->append_read_pos += copy_len;
+ Count -= copy_len;
+ if (Count)
+ info->error = save_count - Count;
+ }
+ unlock_append_buffer(info);
+ return Count ? 1 : 0;
}
#ifdef HAVE_AIOWAIT
@@ -672,6 +710,31 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count)
return 0;
}
+int _my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count)
+{
+ uint rest_length,length;
+
+ rest_length=(uint) (info->append_end -
+ info->append_write_pos);
+ memcpy(info->append_write_pos,Buffer,(size_t) rest_length);
+ Buffer+=rest_length;
+ Count-=rest_length;
+ info->append_write_pos+=rest_length;
+ if (flush_io_cache(info))
+ return 1;
+ if (Count >= IO_SIZE)
+ { /* Fill first intern buffer */
+ length=Count & (uint) ~(IO_SIZE-1);
+ if (my_write(info->file,Buffer,(uint) length,info->myflags | MY_NABP))
+ return info->error= -1;
+ Count-=length;
+ Buffer+=length;
+ }
+ memcpy(info->append_write_pos,Buffer,(size_t) Count);
+ info->append_write_pos+=Count;
+ return 0;
+}
+
/*
Write a block to disk where part of the data may be inside the record
@@ -756,6 +819,30 @@ int flush_io_cache(IO_CACHE *info)
DBUG_RETURN(0);
}
}
+ else if (info->type == SEQ_READ_APPEND)
+ {
+ if (info->file == -1)
+ {
+ if (real_open_cached_file(info))
+ DBUG_RETURN((info->error= -1));
+ }
+ lock_append_buffer(info);
+ if (info->append_write_pos != info->append_buffer)
+ {
+ length=(uint) (info->append_write_pos - info->append_buffer);
+ info->append_read_pos=info->append_write_pos=info->append_buffer;
+ info->append_end=(info->append_buffer+info->buffer_length-
+ (info->pos_in_file & (IO_SIZE-1)));
+ if (my_write(info->file,info->buffer,length,info->myflags | MY_NABP))
+ {
+ unlock_append_buffer(info);
+ DBUG_RETURN((info->error= -1));
+ }
+ unlock_append_buffer(info);
+ DBUG_RETURN(0);
+ }
+ unlock_append_buffer(info);
+ }
#ifdef HAVE_AIOWAIT
else if (info->type != READ_NET)
{
@@ -784,3 +871,89 @@ int end_io_cache(IO_CACHE *info)
DBUG_RETURN(error);
} /* end_io_cache */
+#ifdef MAIN
+void die(const char* fmt, ...)
+{
+ va_list va_args;
+ va_start(va_args,fmt);
+ fprintf(stderr,"Error:");
+ vfprintf(stderr, fmt,va_args);
+ fprintf(stderr,", errno=%d\n", errno);
+ exit(1);
+}
+
+int open_file(const char* fname, IO_CACHE* info, int cache_size)
+{
+ int fd;
+ if ((fd=my_open(fname,O_CREAT|O_APPEND|O_RDWR,MYF(MY_WME))) < 0)
+ die("Could not open %s", fname);
+ if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
+ die("failed in init_io_cache()");
+ return fd;
+}
+
+void close_file(IO_CACHE* info)
+{
+ end_io_cache(info);
+ my_close(info->file, MYF(MY_WME));
+}
+
+int main(int argc, char** argv)
+{
+ IO_CACHE sra_cache; /* SEQ_READ_APPEND */
+ MY_STAT status;
+ const char* fname="/tmp/iocache.test";
+ int cache_size=16384;
+ char llstr_buf[22];
+ int max_block,total_bytes=0;
+ int i,num_loops=100,error=0;
+ char *p;
+ char* block, *block_end;
+ MY_INIT(argv[0]);
+ max_block = cache_size*3;
+ if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
+ die("Not enough memory to allocate test block");
+ block_end = block + max_block;
+ for (p = block,i=0; p < block_end;i++)
+ {
+ *p++ = (char)i;
+ }
+ if (my_stat(fname,&status, MYF(0)) &&
+ my_delete(fname,MYF(MY_WME)))
+ {
+ die("Delete of %s failed, aborting", fname);
+ }
+ open_file(fname,&sra_cache, cache_size);
+ for (i = 0; i < num_loops; i++)
+ {
+ char buf[4];
+ int block_size = abs(rand() % max_block);
+ int4store(buf, block_size);
+ if (my_b_write(&sra_cache,buf,4) ||
+ my_b_write(&sra_cache, block, block_size))
+ die("write failed");
+ total_bytes += 4+block_size;
+ }
+ close_file(&sra_cache);
+ my_free(block,MYF(MY_WME));
+ if (!my_stat(fname,&status,MYF(MY_WME)))
+ die("%s failed to stat, but I had just closed it,\
+ wonder how that happened");
+ printf("Final size of %s is %s, wrote %d bytes\n",fname,
+ llstr(status.st_size,llstr_buf),
+ total_bytes);
+ my_delete(fname, MYF(MY_WME));
+ /* check correctness of tests */
+ if (total_bytes != status.st_size)
+ {
+ fprintf(stderr,"Not the same number of bytes acutally in file as bytes \
+supposedly written\n");
+ error=1;
+ }
+ exit(error);
+ return 0;
+}
+#endif
+
+
+