diff options
author | unknown <sasha@mysql.sashanet.com> | 2001-11-03 16:54:00 -0700 |
---|---|---|
committer | unknown <sasha@mysql.sashanet.com> | 2001-11-03 16:54:00 -0700 |
commit | cd825a19936d26735db0c1b4b251bd78617d0c2b (patch) | |
tree | 8abf01e56458b77f0e836d8f72481d71cc846b68 /mysys/mf_iocache.c | |
parent | 368c6c1f30f44d1d028022a06dbcb9754815ee92 (diff) | |
download | mariadb-git-cd825a19936d26735db0c1b4b251bd78617d0c2b.tar.gz |
more work on IO_CACHE
portability fixes for systems with broken syscalls that do not interrupt on
a signal
temporary commit - will not be pushed, need to sync up
include/my_sys.h:
work on READ_APPEND cache
mysys/Makefile.am:
change to test IO_CACHE
mysys/mf_iocache.c:
work on READ_APPEND cache
BitKeeper/etc/ignore:
Added mysys/#mf_iocache.c# mysys/test_io_cache to the ignore list
sql/mysqld.cc:
make shutdown work on broken systems
sql/sql_repl.cc:
make sure slave can be stopped on broken systems in all cases, clean-up
Diffstat (limited to 'mysys/mf_iocache.c')
-rw-r--r-- | mysys/mf_iocache.c | 217 |
1 files changed, 195 insertions, 22 deletions
diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c index f0100b5122b..d18c3a51c5d 100644 --- a/mysys/mf_iocache.c +++ b/mysys/mf_iocache.c @@ -41,6 +41,10 @@ static void my_aiowait(my_aio_result *result); #include <assert.h> #include <errno.h> +#ifdef MAIN +#include <my_dir.h> +#endif + static void init_read_function(IO_CACHE* info, enum cache_type type); static void init_read_function(IO_CACHE* info, enum cache_type type) @@ -152,7 +156,7 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, info->rc_request_pos=info->rc_pos=info->buffer; if (type == SEQ_READ_APPEND) { - info->append_pos = info->append_buffer; + info->append_read_pos = info->append_write_pos = info->append_buffer; info->append_end = info->append_buffer + info->buffer_length; #ifdef THREAD pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST); @@ -277,6 +281,10 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, ~(my_off_t) 0); } } + if (info->type == SEQ_READ_APPEND) + { + info->append_read_pos = info->append_write_pos = info->append_buffer; + } info->type=type; info->error=0; init_read_function(info,type); @@ -294,7 +302,7 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, info->inited=0; #endif DBUG_RETURN(0); -} /* init_io_cache */ +} /* reinit_io_cache */ @@ -377,11 +385,19 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count) return 0; } +/* Do sequential read from the SEQ_READ_APPEND cache + we do this in three stages: + - first read from info->buffer + - then if there are still data to read, try the file descriptor + - afterwards, if there are still data to read, try append buffer +*/ + int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) { - uint length,diff_length,left_length; + uint length,diff_length,left_length,save_count; my_off_t max_length, pos_in_file; - + save_count=Count; + /* first, read the regular buffer */ if ((left_length=(uint) (info->rc_end-info->rc_pos))) { dbug_assert(Count >= left_length); /* User is not using my_b_read() */ @@ -390,30 +406,33 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) Count-=left_length; } /* pos_in_file always point on where info->buffer was read */ - pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer); + if ((pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer)) >= + info->end_of_file) + { + info->pos_in_file=pos_in_file; + goto read_append_buffer; + } /* no need to seek since the read is guaranteed to be sequential */ diff_length=(uint) (pos_in_file & (IO_SIZE-1)); -#ifdef THREAD - pthread_mutex_lock(&info->append_buffer_lock); -#endif -#ifdef THREAD - pthread_mutex_unlock(&info->append_buffer_lock); -#endif + + /* now the second stage begins - read from file descriptor */ if (Count >= (uint) (IO_SIZE+(IO_SIZE-diff_length))) { /* Fill first intern buffer */ uint read_length; if (info->end_of_file == pos_in_file) { /* End of file */ - info->error=(int) left_length; - return 1; + goto read_append_buffer; } length=(Count & (uint) ~(IO_SIZE-1))-diff_length; if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags)) != (uint) length) { - info->error= read_length == (uint) -1 ? -1 : - (int) (read_length+left_length); - return 1; + if (read_length != (uint)-1) + { + Count -= read_length; + Buffer += read_length; + } + goto read_append_buffer; } Count-=length; Buffer+=length; @@ -422,15 +441,13 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) diff_length=0; } max_length=info->read_length-diff_length; - if (info->type != READ_FIFO && - (info->end_of_file - pos_in_file) < max_length) + if ((info->end_of_file - pos_in_file) < max_length) max_length = info->end_of_file - pos_in_file; if (!max_length) { if (Count) { - info->error= left_length; /* We only got this many char */ - return 1; + goto read_append_buffer; } length=0; /* Didn't read any chars */ } @@ -439,15 +456,36 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) length == (uint) -1) { if (length != (uint) -1) + { memcpy(Buffer,info->buffer,(size_t) length); - info->error= length == (uint) -1 ? -1 : (int) (length+left_length); - return 1; + Count -= length; + Buffer += length; + } + goto read_append_buffer; } info->rc_pos=info->buffer+Count; info->rc_end=info->buffer+length; info->pos_in_file=pos_in_file; memcpy(Buffer,info->buffer,(size_t) Count); return 0; +read_append_buffer: + lock_append_buffer(info); + if (!Count) return 0; + { + uint copy_len = (uint)(info->append_read_pos - + info->append_write_pos); + dbug_assert(info->append_read_pos <= info->append_write_pos); + if (copy_len > Count) + copy_len = Count; + memcpy(Buffer, info->append_read_pos, + copy_len); + info->append_read_pos += copy_len; + Count -= copy_len; + if (Count) + info->error = save_count - Count; + } + unlock_append_buffer(info); + return Count ? 1 : 0; } #ifdef HAVE_AIOWAIT @@ -672,6 +710,31 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count) return 0; } +int _my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count) +{ + uint rest_length,length; + + rest_length=(uint) (info->append_end - + info->append_write_pos); + memcpy(info->append_write_pos,Buffer,(size_t) rest_length); + Buffer+=rest_length; + Count-=rest_length; + info->append_write_pos+=rest_length; + if (flush_io_cache(info)) + return 1; + if (Count >= IO_SIZE) + { /* Fill first intern buffer */ + length=Count & (uint) ~(IO_SIZE-1); + if (my_write(info->file,Buffer,(uint) length,info->myflags | MY_NABP)) + return info->error= -1; + Count-=length; + Buffer+=length; + } + memcpy(info->append_write_pos,Buffer,(size_t) Count); + info->append_write_pos+=Count; + return 0; +} + /* Write a block to disk where part of the data may be inside the record @@ -756,6 +819,30 @@ int flush_io_cache(IO_CACHE *info) DBUG_RETURN(0); } } + else if (info->type == SEQ_READ_APPEND) + { + if (info->file == -1) + { + if (real_open_cached_file(info)) + DBUG_RETURN((info->error= -1)); + } + lock_append_buffer(info); + if (info->append_write_pos != info->append_buffer) + { + length=(uint) (info->append_write_pos - info->append_buffer); + info->append_read_pos=info->append_write_pos=info->append_buffer; + info->append_end=(info->append_buffer+info->buffer_length- + (info->pos_in_file & (IO_SIZE-1))); + if (my_write(info->file,info->buffer,length,info->myflags | MY_NABP)) + { + unlock_append_buffer(info); + DBUG_RETURN((info->error= -1)); + } + unlock_append_buffer(info); + DBUG_RETURN(0); + } + unlock_append_buffer(info); + } #ifdef HAVE_AIOWAIT else if (info->type != READ_NET) { @@ -784,3 +871,89 @@ int end_io_cache(IO_CACHE *info) DBUG_RETURN(error); } /* end_io_cache */ +#ifdef MAIN +void die(const char* fmt, ...) +{ + va_list va_args; + va_start(va_args,fmt); + fprintf(stderr,"Error:"); + vfprintf(stderr, fmt,va_args); + fprintf(stderr,", errno=%d\n", errno); + exit(1); +} + +int open_file(const char* fname, IO_CACHE* info, int cache_size) +{ + int fd; + if ((fd=my_open(fname,O_CREAT|O_APPEND|O_RDWR,MYF(MY_WME))) < 0) + die("Could not open %s", fname); + if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME))) + die("failed in init_io_cache()"); + return fd; +} + +void close_file(IO_CACHE* info) +{ + end_io_cache(info); + my_close(info->file, MYF(MY_WME)); +} + +int main(int argc, char** argv) +{ + IO_CACHE sra_cache; /* SEQ_READ_APPEND */ + MY_STAT status; + const char* fname="/tmp/iocache.test"; + int cache_size=16384; + char llstr_buf[22]; + int max_block,total_bytes=0; + int i,num_loops=100,error=0; + char *p; + char* block, *block_end; + MY_INIT(argv[0]); + max_block = cache_size*3; + if (!(block=(char*)my_malloc(max_block,MYF(MY_WME)))) + die("Not enough memory to allocate test block"); + block_end = block + max_block; + for (p = block,i=0; p < block_end;i++) + { + *p++ = (char)i; + } + if (my_stat(fname,&status, MYF(0)) && + my_delete(fname,MYF(MY_WME))) + { + die("Delete of %s failed, aborting", fname); + } + open_file(fname,&sra_cache, cache_size); + for (i = 0; i < num_loops; i++) + { + char buf[4]; + int block_size = abs(rand() % max_block); + int4store(buf, block_size); + if (my_b_write(&sra_cache,buf,4) || + my_b_write(&sra_cache, block, block_size)) + die("write failed"); + total_bytes += 4+block_size; + } + close_file(&sra_cache); + my_free(block,MYF(MY_WME)); + if (!my_stat(fname,&status,MYF(MY_WME))) + die("%s failed to stat, but I had just closed it,\ + wonder how that happened"); + printf("Final size of %s is %s, wrote %d bytes\n",fname, + llstr(status.st_size,llstr_buf), + total_bytes); + my_delete(fname, MYF(MY_WME)); + /* check correctness of tests */ + if (total_bytes != status.st_size) + { + fprintf(stderr,"Not the same number of bytes acutally in file as bytes \ +supposedly written\n"); + error=1; + } + exit(error); + return 0; +} +#endif + + + |