diff options
author | unknown <serg@serg.mysql.com> | 2002-06-19 23:54:45 +0000 |
---|---|---|
committer | unknown <serg@serg.mysql.com> | 2002-06-19 23:54:45 +0000 |
commit | cf7be403f323a0d38cf23ffa46d5dcdbfeb38a8f (patch) | |
tree | 9939048d2a201bb88a20ae302cbc0cdfa14fd75d /mysys/mf_iocache.c | |
parent | 44d91e0a1f3b404beb4c1bd4706371914ca1bce8 (diff) | |
download | mariadb-git-cf7be403f323a0d38cf23ffa46d5dcdbfeb38a8f.tar.gz |
multithreaded repair-by-sort code
parallel read access to IO_CACHE
include/my_sys.h:
parallel read access to IO_CACHE
include/myisam.h:
multithreaded repair-by-sort code
myisam/mi_cache.c:
parallel read access to IO_CACHE
misc cleanups
myisam/mi_check.c:
multithreaded repair-by-sort code
myisam/myisamchk.c:
multithreaded repair-by-sort code
myisam/myisamdef.h:
multithreaded repair-by-sort code
myisam/sort.c:
multithreaded repair-by-sort code
mysys/mf_iocache.c:
parallel read access to IO_CACHE
Diffstat (limited to 'mysys/mf_iocache.c')
-rw-r--r-- | mysys/mf_iocache.c | 118 |
1 files changed, 110 insertions, 8 deletions
diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c index a0247003a81..db4ebaaea73 100644 --- a/mysys/mf_iocache.c +++ b/mysys/mf_iocache.c @@ -85,7 +85,11 @@ init_functions(IO_CACHE* info, enum cache_type type) info->write_function = 0; /* Force a core if used */ break; default: - info->read_function = _my_b_read; + info->read_function = +#ifdef THREAD + info->share ? _my_b_read_r : +#endif + _my_b_read; info->write_function = _my_b_write; } @@ -127,6 +131,9 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, info->alloced_buffer = 0; info->buffer=0; info->seek_not_done= test(file >= 0); +#ifdef THREAD + info->share=0; +#endif if (!cachesize) if (! (cachesize= my_default_record_cache_size)) @@ -214,7 +221,6 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, DBUG_RETURN(0); } /* init_io_cache */ - /* Wait until current request is ready */ #ifdef HAVE_AIOWAIT @@ -419,6 +425,90 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count) DBUG_RETURN(0); } +#ifdef THREAD +int init_io_cache_share(IO_CACHE *info, IO_CACHE_SHARE *s, uint num_threads) +{ + DBUG_ASSERT(info->type == READ_CACHE); + pthread_mutex_init(& s->mutex, MY_MUTEX_INIT_FAST); + pthread_cond_init (& s->cond, 0); + s->count=num_threads; + s->active=0; /* to catch errors */ + info->share=s; + info->read_function=_my_b_read_r; +} + +int remove_io_thread(IO_CACHE *info) +{ + if (errno=pthread_mutex_lock(& info->share->mutex)) + return -1; + if (! info->share->count--) + pthread_cond_signal(& info->share->cond); + pthread_mutex_unlock(& info->share->mutex); + return 0; +} + +int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count) +{ + my_off_t pos_in_file; + int length,diff_length,read_len; + DBUG_ENTER("_my_b_read_r"); + + if ((read_len=(uint) (info->read_end-info->read_pos))) + { + DBUG_ASSERT(Count >= read_len); /* User is not using my_b_read() */ + memcpy(Buffer,info->read_pos, (size_t) (read_len)); + Buffer+=read_len; + Count-=read_len; + } + +#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1)) +#define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1)) + + while (Count) { + int cnt, len; + + pos_in_file= info->pos_in_file + (uint)(info->read_end - info->buffer); + diff_length= pos_in_file & (IO_SIZE-1); + length=IO_ROUND_UP(Count+diff_length)-diff_length; + length=(length <= info->read_length) ? + length + IO_ROUND_DN(info->read_length - length) : + length - IO_ROUND_UP(length - info->read_length) ; + if (lock_io_cache(info)) + { + info->share->active=info; + if (info->seek_not_done) /* File touched, do seek */ + VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0))); + len=my_read(info->file,info->buffer, length, info->myflags); + info->read_end=info->buffer + (len == -1 ? 0 : len); + info->error=(len == length ? 0 : len); + info->pos_in_file=pos_in_file; + unlock_io_cache(info); + } + else + { + info->error= info->share->active->error; + info->read_end= info->share->active->read_end; + info->pos_in_file= info->share->active->pos_in_file; + len= (info->error == -1 ? -1 : info->read_end-info->buffer); + } + info->read_pos=info->buffer; + info->seek_not_done=0; + if (info->error) + { + info->error=read_len; + DBUG_RETURN(1); + } + cnt=(len > Count) ? Count : len; + memcpy(Buffer,info->read_pos, (size_t)cnt); + Count -=cnt; + Buffer+=cnt; + read_len+=cnt; + info->read_pos+=cnt; + } + DBUG_RETURN(0); +} +#endif + /* Do sequential read from the SEQ_READ_APPEND cache we do this in three stages: @@ -454,7 +544,7 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) */ VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0))); info->seek_not_done=0; - + diff_length=(uint) (pos_in_file & (IO_SIZE-1)); /* now the second stage begins - read from file descriptor */ @@ -510,7 +600,7 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) memcpy(Buffer,info->buffer,(size_t) length); Count -= length; Buffer += length; - + /* added the line below to make DBUG_ASSERT(pos_in_file==info->end_of_file) pass. @@ -544,7 +634,7 @@ read_append_buffer: /* TODO: figure out if the assert below is needed or correct. */ - DBUG_ASSERT(pos_in_file == info->end_of_file); + DBUG_ASSERT(pos_in_file == info->end_of_file); copy_len=min(Count, len_in_buff); memcpy(Buffer, info->append_read_pos, copy_len); info->append_read_pos += copy_len; @@ -910,7 +1000,7 @@ int _flush_io_cache(IO_CACHE *info, int need_append_buffer_lock) if (!(append_cache = (info->type == SEQ_READ_APPEND))) need_append_buffer_lock=0; - + if (info->type == WRITE_CACHE || append_cache) { if (info->file == -1) @@ -919,7 +1009,7 @@ int _flush_io_cache(IO_CACHE *info, int need_append_buffer_lock) DBUG_RETURN((info->error= -1)); } LOCK_APPEND_BUFFER; - + if ((length=(uint) (info->write_pos - info->write_buffer))) { pos_in_file=info->pos_in_file; @@ -956,7 +1046,7 @@ int _flush_io_cache(IO_CACHE *info, int need_append_buffer_lock) info->end_of_file+=(info->write_pos-info->append_read_pos); DBUG_ASSERT(info->end_of_file == my_tell(info->file,MYF(0))); } - + info->append_read_pos=info->write_pos=info->write_buffer; UNLOCK_APPEND_BUFFER; DBUG_RETURN(info->error); @@ -980,6 +1070,18 @@ int end_io_cache(IO_CACHE *info) IO_CACHE_CALLBACK pre_close; DBUG_ENTER("end_io_cache"); +#ifdef THREAD + /* simple protection against multi-close: destroying share first */ + if (info->share) + if (pthread_cond_destroy (& info->share->cond) | + pthread_mutex_destroy(& info->share->mutex)) + { + DBUG_RETURN(1); + } + else + info->share=0; +#endif + if ((pre_close=info->pre_close)) (*pre_close)(info); if (info->alloced_buffer) |