diff options
author | serg@serg.mysql.com <> | 2002-06-19 23:54:45 +0000 |
---|---|---|
committer | serg@serg.mysql.com <> | 2002-06-19 23:54:45 +0000 |
commit | 5c83ae3fca5a3a56eb31a6827dae0779a94e52b8 (patch) | |
tree | 9939048d2a201bb88a20ae302cbc0cdfa14fd75d /include | |
parent | 287491719819f27942b55fa46da082a4d0cd4257 (diff) | |
download | mariadb-git-5c83ae3fca5a3a56eb31a6827dae0779a94e52b8.tar.gz |
multithreaded repair-by-sort code
parallel read access to IO_CACHE
Diffstat (limited to 'include')
-rw-r--r-- | include/my_sys.h | 59 | ||||
-rw-r--r-- | include/myisam.h | 71 |
2 files changed, 95 insertions, 35 deletions
diff --git a/include/my_sys.h b/include/my_sys.h index 3ff35763bce..3935a4d5cc0 100644 --- a/include/my_sys.h +++ b/include/my_sys.h @@ -36,7 +36,7 @@ extern int NEAR my_errno; /* Last error in mysys */ #include <m_ctype.h> /* for CHARSET_INFO */ #endif -#include <stdarg.h> +#include <stdarg.h> #define MYSYS_PROGRAM_USES_CURSES() { error_handler_hook = my_message_curses; mysys_uses_curses=1; } #define MYSYS_PROGRAM_DONT_USE_CURSES() { error_handler_hook = my_message_no_curses; mysys_uses_curses=0;} @@ -278,7 +278,7 @@ extern struct my_file_info { my_string name; enum file_type type; -#if defined(THREAD) && !defined(HAVE_PREAD) +#if defined(THREAD) && !defined(HAVE_PREAD) pthread_mutex_t mutex; #endif } my_file_info[MY_NFILE]; @@ -299,6 +299,45 @@ typedef struct st_dynamic_string { struct st_io_cache; typedef int (*IO_CACHE_CALLBACK)(struct st_io_cache*); +#ifdef THREAD +typedef struct st_io_cache_share +{ + /* to sync on reads into buffer */ + pthread_mutex_t mutex; + pthread_cond_t cond; + int count; + /* actual IO_CACHE that filled the buffer */ + struct st_io_cache *active; + /* the following will go implemented whenever the need arises */ +#ifdef NOT_IMPLEMENTED + /* whether the structure should be free'd */ + my_bool alloced; +#endif +} IO_CACHE_SHARE; + +#define lock_io_cache(info) \ + ( \ + (errno=pthread_mutex_lock(&((info)->share->mutex))) ? -1 : ( \ + (info)->share->count ? ( \ + --((info)->share->count), \ + pthread_cond_wait(&((info)->share->cond), \ + &((info)->share->mutex)), \ + (++((info)->share->count) ? \ + pthread_mutex_unlock(&((info)->share->mutex)) : 0)) \ + : 1 ) \ + ) + +#define unlock_io_cache(info) \ + ( \ + pthread_cond_broadcast(&((info)->share->cond)), \ + pthread_mutex_unlock (&((info)->share->mutex)) \ + ) +/* -- to catch errors +#else +#define lock_io_cache(info) +#define unlock_io_cache(info) +*/ +#endif typedef struct st_io_cache /* Used when cacheing files */ { @@ -331,10 +370,16 @@ typedef struct st_io_cache /* Used when cacheing files */ WRITE_CACHE, and &read_pos and &read_end respectively otherwise */ byte **current_pos, **current_end; -/* The lock is for append buffer used in SEQ_READ_APPEND cache */ #ifdef THREAD + /* The lock is for append buffer used in SEQ_READ_APPEND cache + need mutex copying from append buffer to read buffer. */ pthread_mutex_t append_buffer_lock; - /* need mutex copying from append buffer to read buffer */ + /* The following is used when several threads are reading the + same file in parallel. They are synchronized on disk + accesses reading the cached part of the file asynchronously. + It should be set to NULL to disable the feature. Only + READ_CACHE mode is supported. */ + IO_CACHE_SHARE *share; #endif /* a caller will use my_b_read() macro to read from the cache if the data is already in cache, it will be simply copied with @@ -626,6 +671,12 @@ extern my_bool reinit_io_cache(IO_CACHE *info,enum cache_type type, my_off_t seek_offset,pbool use_async_io, pbool clear_cache); extern int _my_b_read(IO_CACHE *info,byte *Buffer,uint Count); +#ifdef THREAD +extern int _my_b_read_r(IO_CACHE *info,byte *Buffer,uint Count); +extern int init_io_cache_share(IO_CACHE *info, + IO_CACHE_SHARE *s, uint num_threads); +extern int remove_io_thread(IO_CACHE *info); +#endif extern int _my_b_seq_read(IO_CACHE *info,byte *Buffer,uint Count); extern int _my_b_net_read(IO_CACHE *info,byte *Buffer,uint Count); extern int _my_b_get(IO_CACHE *info); diff --git a/include/myisam.h b/include/myisam.h index b7010dab814..bf7abddbd0e 100644 --- a/include/myisam.h +++ b/include/myisam.h @@ -314,24 +314,6 @@ typedef struct st_sort_key_blocks { /* Used when sorting */ int inited; } SORT_KEY_BLOCKS; -struct st_mi_check_param; - -typedef struct st_sort_info { - MI_INFO *info; - struct st_mi_check_param *param; - enum data_file_type new_data_file_type; - SORT_KEY_BLOCKS *key_block,*key_block_end; - uint key,find_length,real_key_length; - my_off_t pos,max_pos,filepos,start_recpos,filelength,dupp,buff_length; - ha_rows max_records; - ulonglong unique[MI_MAX_KEY_SEG+1]; - my_bool fix_datafile; - char *record,*buff; - void *wordlist, *wordptr; - MI_KEYDEF *keyinfo; - MI_KEYSEG *keyseg; -} SORT_INFO; - typedef struct st_mi_check_param { ulonglong auto_increment_value; @@ -354,7 +336,6 @@ typedef struct st_mi_check_param int tmpfile_createflag; myf myf_rw; IO_CACHE read_cache; - SORT_INFO sort_info; ulonglong unique_count[MI_MAX_KEY_SEG+1]; ha_checksum key_crc[MI_MAX_POSSIBLE_KEY]; ulong rec_per_key_part[MI_MAX_KEY_SEG*MI_MAX_POSSIBLE_KEY]; @@ -363,17 +344,42 @@ typedef struct st_mi_check_param char *op_name; } MI_CHECK; - -typedef struct st_mi_sortinfo { +typedef struct st_sort_info { + MI_INFO *info; + MI_CHECK *param; + enum data_file_type new_data_file_type; + SORT_KEY_BLOCKS *key_block,*key_block_end; + uint kei, total_keys; + my_off_t filelength,dupp,buff_length; ha_rows max_records; + char *buff; + myf myf_rw; + /* sync things*/ + uint got_error, threads_running; + pthread_mutex_t mutex; + pthread_cond_t cond; +} SORT_INFO; + +typedef struct st_mi_sort_param { + pthread_t thr; + IO_CACHE read_cache; + ulonglong unique[MI_MAX_KEY_SEG+1]; + uint key, key_length,real_key_length,sortbuff_size; + uint maxbuffers, keys, find_length, sort_keys_length; + uchar **sort_keys; + void *wordlist, *wordptr; + MI_KEYDEF *keyinfo; SORT_INFO *sort_info; + IO_CACHE tempfile, tempfile_for_exceptions; + DYNAMIC_ARRAY buffpek; + my_off_t pos,max_pos,filepos,start_recpos; + my_bool fix_datafile; + char *record; char *tmpdir; - int (*key_cmp)(SORT_INFO *info, const void *, const void *); - int (*key_read)(SORT_INFO *info,void *buff); - int (*key_write)(SORT_INFO *info, const void *buff); - void (*lock_in_memory)(MI_CHECK *info); - uint key_length; - myf myf_rw; + int (*key_cmp)(struct st_mi_sort_param *, const void *, const void *); + int (*key_read)(struct st_mi_sort_param *,void *); + int (*key_write)(struct st_mi_sort_param *, const void *); + void (*lock_in_memory)(MI_CHECK *); } MI_SORT_PARAM; /* functions in mi_check */ @@ -398,14 +404,17 @@ int flush_blocks(MI_CHECK *param, File file); void update_auto_increment_key(MI_CHECK *param, MI_INFO *info, my_bool repair); int update_state_info(MI_CHECK *param, MI_INFO *info,uint update); +void update_key_parts(MI_KEYDEF *keyinfo, ulong *rec_per_key_part, + ulonglong *unique, ulonglong records); int filecopy(MI_CHECK *param, File to,File from,my_off_t start, my_off_t length, const char *type); int movepoint(MI_INFO *info,byte *record,my_off_t oldpos, my_off_t newpos, uint prot_key); -int sort_write_record(SORT_INFO *sort_info); - int write_data_suffix(MI_CHECK *param, MI_INFO *info); -int _create_index_by_sort(MI_SORT_PARAM *info,my_bool no_messages, - ulong); +int sort_write_record(MI_SORT_PARAM *sort_param); +int write_data_suffix(SORT_INFO *sort_info, my_bool fix_datafile); +int _create_index_by_sort(MI_SORT_PARAM *info,my_bool no_messages, ulong); +void *_thr_find_all_keys(MI_SORT_PARAM *info); +int _thr_write_keys(MI_SORT_PARAM *sort_param); int test_if_almost_full(MI_INFO *info); int recreate_table(MI_CHECK *param, MI_INFO **org_info, char *filename); void mi_disable_non_unique_index(MI_INFO *info, ha_rows rows); |