diff options
author | unknown <serg@serg.mysql.com> | 2002-06-30 14:17:29 +0000 |
---|---|---|
committer | unknown <serg@serg.mysql.com> | 2002-06-30 14:17:29 +0000 |
commit | 1158fd9f2e5934bdcf871791895bec58854c843c (patch) | |
tree | d21dcff9bca4b293a433b84b9356e37d0531494b /mysys/mf_iocache.c | |
parent | ea01d6572a9d8187dcf70753d5bc420a9c2e139b (diff) | |
download | mariadb-git-1158fd9f2e5934bdcf871791895bec58854c843c.tar.gz |
"myisamchk -p" for parallel recover works (no extensive testing though)
include/my_sys.h:
make [un]lock_io_cache functions, not macro
some io_cache_share functions int->void
include/myisam.h:
mi_repair_by_sort_r -> mi_repair_parallel
MI_SORT_PARAM.master field for updating info->s.state struct
myisam/mi_check.c:
mi_repair_by_sort_r -> mi_repair_parallel
MI_SORT_PARAM.master field for updating info->s.state struct
myisam/sort.c:
my_thread_init()/my_thread_end()
misc bugfixes
mysys/mf_iocache.c:
io_cache_share functions int->void
comments added
[un]lock_io_cache functions added
Diffstat (limited to 'mysys/mf_iocache.c')
-rw-r--r-- | mysys/mf_iocache.c | 113 |
1 files changed, 88 insertions, 25 deletions
diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c index 1a761fa5ed8..add12cff892 100644 --- a/mysys/mf_iocache.c +++ b/mysys/mf_iocache.c @@ -68,6 +68,9 @@ static void my_aiowait(my_aio_result *result); #define unlock_append_buffer(info) #endif +#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1)) +#define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1)) + static void init_functions(IO_CACHE* info, enum cache_type type) { @@ -425,31 +428,71 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count) } #ifdef THREAD -int init_io_cache_share(IO_CACHE *info, IO_CACHE_SHARE *s, uint num_threads) +/* Prepare IO_CACHE for shared use */ +void init_io_cache_share(IO_CACHE *info, IO_CACHE_SHARE *s, uint num_threads) { DBUG_ASSERT(info->type == READ_CACHE); - pthread_mutex_init(& s->mutex, MY_MUTEX_INIT_FAST); - pthread_cond_init (& s->cond, 0); - s->count=num_threads; + pthread_mutex_init(&s->mutex, MY_MUTEX_INIT_FAST); + pthread_cond_init (&s->cond, 0); + s->count=num_threads-1; s->active=0; /* to catch errors */ info->share=s; info->read_function=_my_b_read_r; } -int remove_io_thread(IO_CACHE *info) +/* + Remove a thread from shared access to IO_CACHE + Every thread should do that on exit for not + to deadlock other threads +*/ +void remove_io_thread(IO_CACHE *info) { - if (errno=pthread_mutex_lock(& info->share->mutex)) - return -1; + pthread_mutex_lock(&info->share->mutex); if (! info->share->count--) - pthread_cond_signal(& info->share->cond); - pthread_mutex_unlock(& info->share->mutex); + pthread_cond_signal(&info->share->cond); + pthread_mutex_unlock(&info->share->mutex); return 0; } +static int lock_io_cache(IO_CACHE *info) +{ + pthread_mutex_lock(&info->share->mutex); + if (!info->share->count) + return 1; + + --(info->share->count); + pthread_cond_wait(&info->share->cond, &info->share->mutex); + /* + count can be -1 here, if one thread was removed (remove_io_cache) + while all others were locked (lock_io_cache). + If this is the case, this thread behaves as if count was 0 from the + very beginning, that is returns 1 and does not unlock the mutex. + */ + if (++(info->share->count)) + return pthread_mutex_unlock(&info->share->mutex); + else + return 1; +} + +static void unlock_io_cache(IO_CACHE *info) +{ + pthread_cond_broadcast(&info->share->cond); + pthread_mutex_unlock(&info->share->mutex); +} + +/* + Read from IO_CACHE when it is shared between several threads. + It works as follows: when a thread tries to read from a file + (that is, after using all the data from the (shared) buffer), + it just hangs on lock_io_cache(), wating for other threads. + When the very last thread attempts a read, lock_io_cache() + returns 1, the thread does actual IO and unlock_io_cache(), + which signals all the waiting threads that data is in the buffer. +*/ int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count) { my_off_t pos_in_file; - int length,diff_length,read_len; + uint length,diff_length,read_len; DBUG_ENTER("_my_b_read_r"); if ((read_len=(uint) (info->read_end-info->read_pos))) @@ -460,28 +503,41 @@ int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count) Count-=read_len; } -#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1)) -#define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1)) - - while (Count) { + while (Count) + { int cnt, len; pos_in_file= info->pos_in_file + (uint)(info->read_end - info->buffer); - diff_length= pos_in_file & (IO_SIZE-1); + diff_length= (uint)pos_in_file & (IO_SIZE-1); length=IO_ROUND_UP(Count+diff_length)-diff_length; length=(length <= info->read_length) ? length + IO_ROUND_DN(info->read_length - length) : length - IO_ROUND_UP(length - info->read_length) ; + if (info->type != READ_FIFO && (length > info->end_of_file - pos_in_file)) + length=info->end_of_file - pos_in_file; + if (length == 0) + { + info->error=(int) read_len; + DBUG_RETURN(1); + } if (lock_io_cache(info)) { +#if 0 && SAFE_MUTEX +#define PRINT_LOCK(M) printf("Thread %d: mutex is %s\n", my_thread_id(), \ + (((safe_mutex_t *)(M))->count ? "Locked" : "Unlocked")) +#else +#define PRINT_LOCK(M) +#endif + PRINT_LOCK(& info->share->mutex); info->share->active=info; if (info->seek_not_done) /* File touched, do seek */ VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0))); - len=my_read(info->file,info->buffer, length, info->myflags); + len=(int)my_read(info->file,info->buffer, length, info->myflags); info->read_end=info->buffer + (len == -1 ? 0 : len); - info->error=(len == length ? 0 : len); + info->error=(len == (int)length ? 0 : len); info->pos_in_file=pos_in_file; unlock_io_cache(info); + PRINT_LOCK(& info->share->mutex); } else { @@ -489,15 +545,16 @@ int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count) info->read_end= info->share->active->read_end; info->pos_in_file= info->share->active->pos_in_file; len= (info->error == -1 ? -1 : info->read_end-info->buffer); + PRINT_LOCK(& info->share->mutex); } info->read_pos=info->buffer; info->seek_not_done=0; - if (info->error) + if (len <= 0) { - info->error=read_len; + info->error=(int)read_len; DBUG_RETURN(1); } - cnt=(len > Count) ? Count : len; + cnt=(len > Count) ? (int)Count : len; memcpy(Buffer,info->read_pos, (size_t)cnt); Count -=cnt; Buffer+=cnt; @@ -1070,15 +1127,21 @@ int end_io_cache(IO_CACHE *info) DBUG_ENTER("end_io_cache"); #ifdef THREAD - /* simple protection against multi-close: destroying share first */ if (info->share) - if (pthread_cond_destroy (& info->share->cond) | - pthread_mutex_destroy(& info->share->mutex)) + { +#ifdef SAFE_MUTEX + /* simple protection against multi-close: destroying share first */ + if (pthread_cond_destroy (&info->share->cond) | + pthread_mutex_destroy(&info->share->mutex)) { DBUG_RETURN(1); } - else - info->share=0; +#else + pthread_cond_destroy (&info->share->cond); + pthread_mutex_destroy(&info->share->mutex); +#endif + info->share=0; + } #endif if ((pre_close=info->pre_close)) |