diff options
-rw-r--r-- | include/my_sys.h | 81 | ||||
-rw-r--r-- | include/myisam.h | 73 | ||||
-rw-r--r-- | myisam/mi_cache.c | 64 | ||||
-rw-r--r-- | myisam/mi_check.c | 914 | ||||
-rw-r--r-- | myisam/mi_write.c | 3 | ||||
-rw-r--r-- | myisam/myisamchk.c | 56 | ||||
-rw-r--r-- | myisam/myisamdef.h | 8 | ||||
-rw-r--r-- | myisam/sort.c | 470 | ||||
-rw-r--r-- | mysys/mf_iocache.c | 118 |
9 files changed, 1276 insertions, 511 deletions
diff --git a/include/my_sys.h b/include/my_sys.h index 894a9c653a7..4ddd204c406 100644 --- a/include/my_sys.h +++ b/include/my_sys.h @@ -36,7 +36,7 @@ extern int NEAR my_errno; /* Last error in mysys */ #include <m_ctype.h> /* for CHARSET_INFO */ #endif -#include <stdarg.h> +#include <stdarg.h> #define MYSYS_PROGRAM_USES_CURSES() { error_handler_hook = my_message_curses; mysys_uses_curses=1; } #define MYSYS_PROGRAM_DONT_USE_CURSES() { error_handler_hook = my_message_no_curses; mysys_uses_curses=0;} @@ -284,7 +284,7 @@ extern struct my_file_info { my_string name; enum file_type type; -#if defined(THREAD) && !defined(HAVE_PREAD) +#if defined(THREAD) && !defined(HAVE_PREAD) pthread_mutex_t mutex; #endif } my_file_info[MY_NFILE]; @@ -307,6 +307,45 @@ typedef struct st_dynamic_string struct st_io_cache; typedef int (*IO_CACHE_CALLBACK)(struct st_io_cache*); +#ifdef THREAD +typedef struct st_io_cache_share +{ + /* to sync on reads into buffer */ + pthread_mutex_t mutex; + pthread_cond_t cond; + int count; + /* actual IO_CACHE that filled the buffer */ + struct st_io_cache *active; + /* the following will be implemented whenever the need arises */ +#ifdef NOT_IMPLEMENTED + /* whether the structure should be free'd */ + my_bool alloced; +#endif +} IO_CACHE_SHARE; + +#define lock_io_cache(info) \ + ( \ + (errno=pthread_mutex_lock(&((info)->share->mutex))) ? -1 : ( \ + (info)->share->count ? ( \ + --((info)->share->count), \ + pthread_cond_wait(&((info)->share->cond), \ + &((info)->share->mutex)), \ + (++((info)->share->count) ? \ + pthread_mutex_unlock(&((info)->share->mutex)) : 0)) \ + : 1 ) \ + ) + +#define unlock_io_cache(info) \ + ( \ + pthread_cond_broadcast(&((info)->share->cond)), \ + pthread_mutex_unlock (&((info)->share->mutex)) \ + ) +/* -- to catch errors +#else +#define lock_io_cache(info) +#define unlock_io_cache(info) +*/ +#endif typedef struct st_io_cache /* Used when cacheing files */ { @@ -347,10 +386,20 @@ typedef struct st_io_cache /* Used when cacheing files */ WRITE_CACHE, and &read_pos and &read_end respectively otherwise */ byte **current_pos, **current_end; -/* The lock is for append buffer used in SEQ_READ_APPEND cache */ #ifdef THREAD + /* + The lock is for append buffer used in SEQ_READ_APPEND cache + need mutex copying from append buffer to read buffer. + */ pthread_mutex_t append_buffer_lock; - /* need mutex copying from append buffer to read buffer */ + /* + The following is used when several threads are reading the + same file in parallel. They are synchronized on disk + accesses reading the cached part of the file asynchronously. + It should be set to NULL to disable the feature. Only + READ_CACHE mode is supported. + */ + IO_CACHE_SHARE *share; #endif /* A caller will use my_b_read() macro to read from the cache @@ -366,10 +415,10 @@ typedef struct st_io_cache /* Used when cacheing files */ be replaced with my_b_append() for a SEQ_READ_APPEND cache */ int (*write_function)(struct st_io_cache *,const byte *,uint); - /* - Specifies the type of the cache. Depending on the type of the cache - certain operations might not be available and yield unpredicatable - results. Details to be documented later + /* + Specifies the type of the cache. Depending on the type of the cache + certain operations might not be available and yield unpredicatable + results. Details to be documented later */ enum cache_type type; /* @@ -400,11 +449,11 @@ typedef struct st_io_cache /* Used when cacheing files */ uint read_length; myf myflags; /* Flags used to my_read/my_write */ /* - alloced_buffer is 1 if the buffer was allocated by init_io_cache() and - 0 if it was supplied by the user. - Currently READ_NET is the only one that will use a buffer allocated - somewhere else - */ + alloced_buffer is 1 if the buffer was allocated by init_io_cache() and + 0 if it was supplied by the user. + Currently READ_NET is the only one that will use a buffer allocated + somewhere else + */ my_bool alloced_buffer; #ifdef HAVE_AIOWAIT /* @@ -635,6 +684,12 @@ extern my_bool reinit_io_cache(IO_CACHE *info,enum cache_type type, my_off_t seek_offset,pbool use_async_io, pbool clear_cache); extern int _my_b_read(IO_CACHE *info,byte *Buffer,uint Count); +#ifdef THREAD +extern int _my_b_read_r(IO_CACHE *info,byte *Buffer,uint Count); +extern int init_io_cache_share(IO_CACHE *info, + IO_CACHE_SHARE *s, uint num_threads); +extern int remove_io_thread(IO_CACHE *info); +#endif extern int _my_b_seq_read(IO_CACHE *info,byte *Buffer,uint Count); extern int _my_b_net_read(IO_CACHE *info,byte *Buffer,uint Count); extern int _my_b_get(IO_CACHE *info); diff --git a/include/myisam.h b/include/myisam.h index 0a7e23e7988..11f814f2c32 100644 --- a/include/myisam.h +++ b/include/myisam.h @@ -319,25 +319,6 @@ typedef struct st_sort_key_blocks /* Used when sorting */ int inited; } SORT_KEY_BLOCKS; -struct st_mi_check_param; - -typedef struct st_sort_info -{ - MI_INFO *info; - struct st_mi_check_param *param; - enum data_file_type new_data_file_type; - SORT_KEY_BLOCKS *key_block,*key_block_end; - uint key,find_length,real_key_length; - my_off_t pos,max_pos,filepos,start_recpos,filelength,dupp,buff_length; - ha_rows max_records; - ulonglong unique[MI_MAX_KEY_SEG+1]; - my_bool fix_datafile; - char *record,*buff; - void *wordlist, *wordptr; - MI_KEYDEF *keyinfo; - MI_KEYSEG *keyseg; -} SORT_INFO; - typedef struct st_mi_check_param { ulonglong auto_increment_value; @@ -360,7 +341,6 @@ typedef struct st_mi_check_param int tmpfile_createflag; myf myf_rw; IO_CACHE read_cache; - SORT_INFO sort_info; ulonglong unique_count[MI_MAX_KEY_SEG+1]; ha_checksum key_crc[MI_MAX_POSSIBLE_KEY]; ulong rec_per_key_part[MI_MAX_KEY_SEG*MI_MAX_POSSIBLE_KEY]; @@ -369,18 +349,44 @@ typedef struct st_mi_check_param char *op_name; } MI_CHECK; - -typedef struct st_mi_sortinfo +typedef struct st_sort_info { + MI_INFO *info; + MI_CHECK *param; + enum data_file_type new_data_file_type; + SORT_KEY_BLOCKS *key_block,*key_block_end; + uint kei, total_keys; + my_off_t filelength,dupp,buff_length; ha_rows max_records; + char *buff; + myf myf_rw; + /* sync things*/ + uint got_error, threads_running; + pthread_mutex_t mutex; + pthread_cond_t cond; +} SORT_INFO; + +typedef struct st_mi_sort_param +{ + pthread_t thr; + IO_CACHE read_cache; + ulonglong unique[MI_MAX_KEY_SEG+1]; + uint key, key_length,real_key_length,sortbuff_size; + uint maxbuffers, keys, find_length, sort_keys_length; + uchar **sort_keys; + void *wordlist, *wordptr; + MI_KEYDEF *keyinfo; SORT_INFO *sort_info; + IO_CACHE tempfile, tempfile_for_exceptions; + DYNAMIC_ARRAY buffpek; + my_off_t pos,max_pos,filepos,start_recpos; + my_bool fix_datafile; + char *record; char *tmpdir; - int (*key_cmp)(SORT_INFO *info, const void *, const void *); - int (*key_read)(SORT_INFO *info,void *buff); - int (*key_write)(SORT_INFO *info, const void *buff); - void (*lock_in_memory)(MI_CHECK *info); - uint key_length; - myf myf_rw; + int (*key_cmp)(struct st_mi_sort_param *, const void *, const void *); + int (*key_read)(struct st_mi_sort_param *,void *); + int (*key_write)(struct st_mi_sort_param *, const void *); + void (*lock_in_memory)(MI_CHECK *); } MI_SORT_PARAM; /* functions in mi_check */ @@ -405,14 +411,17 @@ int flush_blocks(MI_CHECK *param, File file); void update_auto_increment_key(MI_CHECK *param, MI_INFO *info, my_bool repair); int update_state_info(MI_CHECK *param, MI_INFO *info,uint update); +void update_key_parts(MI_KEYDEF *keyinfo, ulong *rec_per_key_part, + ulonglong *unique, ulonglong records); int filecopy(MI_CHECK *param, File to,File from,my_off_t start, my_off_t length, const char *type); int movepoint(MI_INFO *info,byte *record,my_off_t oldpos, my_off_t newpos, uint prot_key); -int sort_write_record(SORT_INFO *sort_info); -int write_data_suffix(MI_CHECK *param, MI_INFO *info); -int _create_index_by_sort(MI_SORT_PARAM *info,my_bool no_messages, - ulong); +int sort_write_record(MI_SORT_PARAM *sort_param); +int write_data_suffix(SORT_INFO *sort_info, my_bool fix_datafile); +int _create_index_by_sort(MI_SORT_PARAM *info,my_bool no_messages, ulong); +void *_thr_find_all_keys(MI_SORT_PARAM *info); +int _thr_write_keys(MI_SORT_PARAM *sort_param); int test_if_almost_full(MI_INFO *info); int recreate_table(MI_CHECK *param, MI_INFO **org_info, char *filename); void mi_disable_non_unique_index(MI_INFO *info, ha_rows rows); diff --git a/myisam/mi_cache.c b/myisam/mi_cache.c index 55f85fb99d1..bd3c4cd213c 100644 --- a/myisam/mi_cache.c +++ b/myisam/mi_cache.c @@ -14,14 +14,26 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -/* Functions for read record cacheing with myisam */ -/* Used instead of my_b_read() to allow for no-cacheed seeks */ +/* + Functions for read record cacheing with myisam + Used for reading dynamic/compressed records from datafile. -#include "myisamdef.h" + Can fetch data directly from file (outside cache), + if reading a small chunk straight before the cached part (with possible + overlap). + + Can be explicitly asked not to use cache (by not setting READING_NEXT in + flag) - useful for occasional out-of-cache reads, when the next read is + expected to hit the cache again. + + Allows "partial read" errors in the record header (when READING_HEADER flag + is set) - unread part is bzero'ed + + Note: out-of-cache reads are disabled for shared IO_CACHE's +*/ - /* Copy block from cache if it`s in it. If re_read_if_possibly is */ - /* set read to cache (if after current file-position) else read to */ - /* buff */ + +#include "myisamdef.h" int _mi_read_cache(IO_CACHE *info, byte *buff, my_off_t pos, uint length, int flag) @@ -31,7 +43,7 @@ int _mi_read_cache(IO_CACHE *info, byte *buff, my_off_t pos, uint length, char *in_buff_pos; DBUG_ENTER("_mi_read_cache"); - if (pos < info->pos_in_file) + if (pos < info->pos_in_file && ! info->share) { read_length=length; if ((my_off_t) read_length > (my_off_t) (info->pos_in_file-pos)) @@ -44,7 +56,8 @@ int _mi_read_cache(IO_CACHE *info, byte *buff, my_off_t pos, uint length, pos+=read_length; buff+=read_length; } - if ((offset= (my_off_t) (pos - info->pos_in_file)) < + if (pos >= info->pos_in_file && + (offset= (my_off_t) (pos - info->pos_in_file)) < (my_off_t) (info->read_end - info->request_pos)) { in_buff_pos=info->request_pos+(uint) offset; @@ -57,10 +70,10 @@ int _mi_read_cache(IO_CACHE *info, byte *buff, my_off_t pos, uint length, } else in_buff_length=0; - if (flag & READING_NEXT) + if (flag & READING_NEXT || info->share) { - if (pos != ((info)->pos_in_file + - (uint) ((info)->read_end - (info)->request_pos))) + if (pos != + (info->pos_in_file + (uint) (info->read_end - info->request_pos))) { info->pos_in_file=pos; /* Force start here */ info->read_pos=info->read_end=info->request_pos; /* Everything used */ @@ -70,34 +83,25 @@ int _mi_read_cache(IO_CACHE *info, byte *buff, my_off_t pos, uint length, info->read_pos=info->read_end; /* All block used */ if (!(*info->read_function)(info,buff,length)) DBUG_RETURN(0); - if (!(flag & READING_HEADER) || info->error == -1 || - (uint) info->error+in_buff_length < 3) - { - DBUG_PRINT("error", - ("Error %d reading next-multi-part block (Got %d bytes)", - my_errno, info->error)); - if (!my_errno || my_errno == -1) - my_errno=HA_ERR_WRONG_IN_RECORD; - DBUG_RETURN(1); - } - bzero(buff+info->error,MI_BLOCK_INFO_HEADER_LENGTH - in_buff_length - - (uint) info->error); - DBUG_RETURN(0); + read_length=info->error; + } + else + { + info->seek_not_done=1; + if ((read_length=my_pread(info->file,buff,length,pos,MYF(0))) == length) + DBUG_RETURN(0); } - info->seek_not_done=1; - if ((read_length=my_pread(info->file,buff,length,pos,MYF(0))) == length) - DBUG_RETURN(0); if (!(flag & READING_HEADER) || (int) read_length == -1 || read_length+in_buff_length < 3) { DBUG_PRINT("error", - ("Error %d reading new block (Got %d bytes)", - my_errno, (int) read_length)); + ("Error %d reading next-multi-part block (Got %d bytes)", + my_errno, (int) read_length)); if (!my_errno || my_errno == -1) my_errno=HA_ERR_WRONG_IN_RECORD; DBUG_RETURN(1); } bzero(buff+read_length,MI_BLOCK_INFO_HEADER_LENGTH - in_buff_length - - read_length); + read_length); DBUG_RETURN(0); } /* _mi_read_cache */ diff --git a/myisam/mi_check.c b/myisam/mi_check.c index bae47061a04..8441666641c 100644 --- a/myisam/mi_check.c +++ b/myisam/mi_check.c @@ -45,26 +45,22 @@ static int writekeys(MI_CHECK *param, MI_INFO *info,byte *buff, my_off_t filepos); static int sort_one_index(MI_CHECK *param, MI_INFO *info,MI_KEYDEF *keyinfo, my_off_t pagepos, File new_file); -static int sort_key_read(SORT_INFO *sort_info,void *key); -static int sort_ft_key_read(SORT_INFO *sort_info,void *key); -static int sort_get_next_record(SORT_INFO *sort_info); -static int sort_key_cmp(SORT_INFO *sort_info, const void *a,const void *b); -static int sort_key_write(SORT_INFO *sort_info, const void *a); +static int sort_key_read(MI_SORT_PARAM *sort_param,void *key); +static int sort_ft_key_read(MI_SORT_PARAM *sort_param,void *key); +static int sort_get_next_record(MI_SORT_PARAM *sort_param); +static int sort_key_cmp(MI_SORT_PARAM *sort_param, const void *a,const void *b); +static int sort_key_write(MI_SORT_PARAM *sort_param, const void *a); static my_off_t get_record_for_key(MI_INFO *info,MI_KEYDEF *keyinfo, uchar *key); -static int sort_insert_key(MI_CHECK *param, reg1 SORT_KEY_BLOCKS *key_block, +static int sort_insert_key(MI_SORT_PARAM *sort_param, + reg1 SORT_KEY_BLOCKS *key_block, uchar *key, my_off_t prev_block); -static int sort_delete_record(MI_CHECK *param); +static int sort_delete_record(MI_SORT_PARAM *sort_param); /*static int flush_pending_blocks(MI_CHECK *param);*/ static SORT_KEY_BLOCKS *alloc_key_blocks(MI_CHECK *param, uint blocks, uint buffer_length); -static void update_key_parts(MI_KEYDEF *keyinfo, - ulong *rec_per_key_part, - ulonglong *unique, - ulonglong records); static ha_checksum mi_byte_checksum(const byte *buf, uint length); -static void set_data_file_type(MI_CHECK *param, SORT_INFO *info, - MYISAM_SHARE *share); +static void set_data_file_type(SORT_INFO *sort_info, MYISAM_SHARE *share); #ifdef __WIN__ static double ulonglong2double(ulonglong value) @@ -80,7 +76,7 @@ static double ulonglong2double(ulonglong value) #else #define my_off_t2double(A) ((double) (A)) #endif /* SIZEOF_OFF_T > 4 */ -#endif +#endif /* __WIN__ */ void myisamchk_init(MI_CHECK *param) { @@ -96,7 +92,6 @@ void myisamchk_init(MI_CHECK *param) param->sort_key_blocks=BUFFERS_WHEN_SORTING; param->tmpfile_createflag=O_RDWR | O_TRUNC | O_EXCL; param->myf_rw=MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL); - param->sort_info.param=param; param->start_check_pos=0; } @@ -1070,7 +1065,7 @@ int chk_data_link(MI_CHECK *param, MI_INFO *info,int extend) if (used != 0 && ! param->error_printed) { printf("Records:%18s M.recordlength:%9lu Packed:%14.0f%%\n", - llstr(records,llbuff), (long)((used-link_used)/records), + llstr(records,llbuff), (long)((used-link_used)/records), (info->s->base.blobs ? 0.0 : (ulonglong2double((ulonglong) info->s->base.reclength*records)- my_off_t2double(used))/ @@ -1112,18 +1107,18 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info, File new_file; MYISAM_SHARE *share=info->s; char llbuff[22],llbuff2[22]; - SORT_INFO *sort_info= ¶m->sort_info; + SORT_INFO sort_info; + MI_SORT_PARAM sort_param; DBUG_ENTER("mi_repair"); - sort_info->buff=sort_info->record=0; + bzero((char *)&sort_info, sizeof(sort_info)); + bzero((char *)&sort_param, sizeof(sort_param)); start_records=info->state->records; new_header_length=(param->testflag & T_UNPACK) ? 0L : share->pack.header_length; got_error=1; new_file= -1; - sort_info->buff=0; - sort_info->buff_length=0; - sort_info->record=0; + sort_param.sort_info=&sort_info; if (!(param->testflag & T_SILENT)) { @@ -1147,8 +1142,7 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info, MYF(MY_WME | MY_WAIT_IF_FULL))) goto err; info->opt_flag|=WRITE_CACHE_USED; - sort_info->start_recpos=0; - if (!(sort_info->record=(byte*) my_malloc((uint) share->base.pack_reclength, + if (!(sort_param.record=(byte*) my_malloc((uint) share->base.pack_reclength, MYF(0)))) { mi_check_print_error(param,"Not enough memory for extra record"); @@ -1161,8 +1155,7 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info, fn_format(param->temp_filename,name,"", MI_NAME_DEXT,2+4+32); if ((new_file=my_raid_create(fn_format(param->temp_filename, param->temp_filename,"", - DATA_TMP_EXT, - 2+4), + DATA_TMP_EXT, 2+4), 0,param->tmpfile_createflag, share->base.raid_type, share->base.raid_chunks, @@ -1184,16 +1177,18 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info, mi_int2store(share->state.header.options,share->options); } } - sort_info->info=info; - sort_info->pos=sort_info->max_pos=share->pack.header_length; - sort_info->filepos=new_header_length; - param->read_cache.end_of_file=sort_info->filelength= + sort_info.info=info; + sort_info.param = param; + sort_param.read_cache=param->read_cache; + sort_param.pos=sort_param.max_pos=share->pack.header_length; + sort_param.filepos=new_header_length; + param->read_cache.end_of_file=sort_info.filelength= my_seek(info->dfile,0L,MY_SEEK_END,MYF(0)); - sort_info->dupp=0; - sort_info->fix_datafile= (my_bool) (! rep_quick); - sort_info->max_records= ~(ha_rows) 0; + sort_info.dupp=0; + sort_param.fix_datafile= (my_bool) (! rep_quick); + sort_info.max_records= ~(ha_rows) 0; - set_data_file_type(param, sort_info, share); + set_data_file_type(&sort_info, share); del=info->state->del; info->state->records=info->state->del=share->state.split=0; info->state->empty=0; @@ -1210,7 +1205,7 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info, /* I think mi_repair and mi_repair_by_sort should do the same (according, e.g. to ha_myisam::repair), but as mi_repair doesn't touch key_map it cannot be used to T_CREATE_MISSING_KEYS. - That is the next line for... (serg) + That is what the next line is for... (serg) */ share->state.key_map= ((((ulonglong) 1L << share->base.keys)-1) & @@ -1219,25 +1214,25 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info, info->state->key_file_length=share->base.keystart; lock_memory(param); /* Everything is alloced */ - while (!(error=sort_get_next_record(sort_info))) + while (!(error=sort_get_next_record(&sort_param))) { - if (writekeys(param, info,(byte*) sort_info->record,sort_info->filepos)) + if (writekeys(param,info,(byte*)sort_param.record,sort_param.filepos)) { if (my_errno != HA_ERR_FOUND_DUPP_KEY) goto err; - DBUG_DUMP("record",(byte*) sort_info->record,share->base.pack_reclength); + DBUG_DUMP("record",(byte*) sort_param.record,share->base.pack_reclength); mi_check_print_info(param,"Duplicate key %2d for record at %10s against new record at %10s", info->errkey+1, - llstr(sort_info->start_recpos,llbuff), + llstr(sort_param.start_recpos,llbuff), llstr(info->dupp_key_pos,llbuff2)); if (param->testflag & T_VERBOSE) { VOID(_mi_make_key(info,(uint) info->errkey,info->lastkey, - sort_info->record,0L)); + sort_param.record,0L)); _mi_print_key(stdout,share->keyinfo[info->errkey].seg,info->lastkey, USE_WHOLE_KEY); } - sort_info->dupp++; + sort_info.dupp++; if (!(rep_quick & T_FORCE_UNIQUENESS)) { param->testflag|=T_RETRY_WITHOUT_QUICK; @@ -1246,10 +1241,10 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info, } continue; } - if (sort_write_record(sort_info)) + if (sort_write_record(&sort_param)) goto err; } - if (error > 0 || write_data_suffix(param,info) || + if (error > 0 || write_data_suffix(&sort_info, (my_bool)!rep_quick) || flush_io_cache(&info->rec_cache) || param->read_cache.error < 0) goto err; @@ -1265,7 +1260,7 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info, goto err; } - if (rep_quick && del+sort_info->dupp != info->state->del) + if (rep_quick && del+sort_info.dupp != info->state->del) { mi_check_print_error(param,"Couldn't fix table with quick recovery: Found wrong number of deleted records"); mi_check_print_error(param,"Run recovery again without -q"); @@ -1289,12 +1284,12 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info, { my_close(info->dfile,MYF(0)); info->dfile=new_file; - info->state->data_file_length=sort_info->filepos; + info->state->data_file_length=sort_param.filepos; share->state.version=(ulong) time((time_t*) 0); /* Force reopen */ } else { - info->state->data_file_length=sort_info->max_pos; + info->state->data_file_length=sort_param.max_pos; } if (param->testflag & T_CALC_CHECKSUM) share->state.checksum=param->glob_crc; @@ -1303,10 +1298,10 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info, { if (start_records != info->state->records) printf("Data records: %s\n", llstr(info->state->records,llbuff)); - if (sort_info->dupp) + if (sort_info.dupp) mi_check_print_warning(param, "%s records have been removed", - llstr(sort_info->dupp,llbuff)); + llstr(sort_info.dupp,llbuff)); } got_error=0; @@ -1334,7 +1329,7 @@ err: { if (! param->error_printed) mi_check_print_error(param,"%d for record at pos %s",my_errno, - llstr(sort_info->start_recpos,llbuff)); + llstr(sort_param.start_recpos,llbuff)); if (new_file >= 0) { VOID(my_close(new_file,MYF(0))); @@ -1343,8 +1338,8 @@ err: } mi_mark_crashed_on_repair(info); } - my_free(sort_info->record,MYF(MY_ALLOW_ZERO_PTR)); - my_free(sort_info->buff,MYF(MY_ALLOW_ZERO_PTR)); + my_free(sort_param.record,MYF(MY_ALLOW_ZERO_PTR)); + my_free(sort_info.buff,MYF(MY_ALLOW_ZERO_PTR)); VOID(end_io_cache(¶m->read_cache)); info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED); VOID(end_io_cache(&info->rec_cache)); @@ -1353,7 +1348,7 @@ err: { share->state.header.options[0]&= (uchar) ~HA_OPTION_COMPRESS_RECORD; share->pack.header_length=0; - share->data_file_type=sort_info->new_data_file_type; + share->data_file_type=sort_info.new_data_file_type; } share->state.changed|= (STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES | STATE_NOT_ANALYZED); @@ -1741,7 +1736,6 @@ err: type,my_errno); DBUG_RETURN(1); } - /* Fix table or given index using sorting */ /* saves new table in temp_filename */ @@ -1756,9 +1750,10 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info, File new_file; MI_SORT_PARAM sort_param; MYISAM_SHARE *share=info->s; + MI_KEYSEG *keyseg; ulong *rec_per_key_part; char llbuff[22]; - SORT_INFO *sort_info= ¶m->sort_info; + SORT_INFO sort_info; ulonglong key_map=share->state.key_map; DBUG_ENTER("mi_repair_by_sort"); @@ -1773,8 +1768,8 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info, printf("Data records: %s\n", llstr(start_records,llbuff)); } - bzero((char*) sort_info,sizeof(*sort_info)); - if (!(sort_info->key_block= + bzero((char*)&sort_info,sizeof(sort_info)); + if (!(sort_info.key_block= alloc_key_blocks(param, (uint) param->sort_key_blocks, share->base.max_key_block_length)) @@ -1787,11 +1782,11 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info, WRITE_CACHE,new_header_length,1, MYF(MY_WME | MY_WAIT_IF_FULL) & param->myf_rw))) goto err; - sort_info->key_block_end=sort_info->key_block+param->sort_key_blocks; + sort_info.key_block_end=sort_info.key_block+param->sort_key_blocks; info->opt_flag|=WRITE_CACHE_USED; info->rec_cache.file=info->dfile; /* for sort_delete_record */ - if (!(sort_info->record=(byte*) my_malloc((uint) share->base.pack_reclength, + if (!(sort_param.record=(byte*) my_malloc((uint) share->base.pack_reclength, MYF(0)))) { mi_check_print_error(param,"Not enough memory for extra record"); @@ -1849,17 +1844,17 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info, key_map= ~key_map; /* Create the missing keys */ } - sort_info->info=info; - sort_info->param = param; + sort_info.info=info; + sort_info.param = param; - set_data_file_type(param, sort_info, share); - sort_info->filepos=new_header_length; - sort_info->dupp=0; - sort_info->buff=0; - param->read_cache.end_of_file=sort_info->filelength= + set_data_file_type(&sort_info, share); + sort_param.filepos=new_header_length; + sort_info.dupp=0; + sort_info.buff=0; + param->read_cache.end_of_file=sort_info.filelength= my_seek(param->read_cache.file,0L,MY_SEEK_END,MYF(0)); - sort_info->wordlist=NULL; + sort_param.wordlist=NULL; if (share->data_file_type == DYNAMIC_RECORD) length=max(share->base.min_pack_length+1,share->base.min_block_length); @@ -1867,15 +1862,15 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info, length=share->base.min_block_length; else length=share->base.pack_reclength; - sort_param.max_records=sort_info->max_records= + sort_info.max_records= ((param->testflag & T_TRUST_HEADER) ? info->state->records : - (ha_rows) (sort_info->filelength/length+1)); + (ha_rows) (sort_info.filelength/length+1)); sort_param.key_cmp=sort_key_cmp; sort_param.key_write=sort_key_write; sort_param.lock_in_memory=lock_memory; sort_param.tmpdir=param->tmpdir; - sort_param.myf_rw=param->myf_rw; - sort_param.sort_info=sort_info; + sort_param.sort_info=&sort_info; + sort_param.fix_datafile= (my_bool) (! rep_quick); del=info->state->del; param->glob_crc=0; @@ -1883,44 +1878,44 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info, param->calc_checksum=1; rec_per_key_part= param->rec_per_key_part; - for (sort_info->key=0 ; sort_info->key < share->base.keys ; - rec_per_key_part+=sort_info->keyinfo->keysegs, sort_info->key++) + for (sort_param.key=0 ; sort_param.key < share->base.keys ; + rec_per_key_part+=sort_param.keyinfo->keysegs, sort_param.key++) { - sort_info->keyinfo=share->keyinfo+sort_info->key; - if (!(((ulonglong) 1 << sort_info->key) & key_map)) + sort_param.read_cache=param->read_cache; + sort_param.keyinfo=share->keyinfo+sort_param.key; + if (!(((ulonglong) 1 << sort_param.key) & key_map)) { /* Remember old statistics for key */ memcpy((char*) rec_per_key_part, (char*) share->state.rec_per_key_part+ (uint) (rec_per_key_part - param->rec_per_key_part), - sort_info->keyinfo->keysegs*sizeof(*rec_per_key_part)); + sort_param.keyinfo->keysegs*sizeof(*rec_per_key_part)); continue; } if ((!(param->testflag & T_SILENT))) - printf ("- Fixing index %d\n",sort_info->key+1); - sort_info->max_pos=sort_info->pos=share->pack.header_length; - sort_info->keyseg=sort_info->keyinfo->seg; - sort_info->fix_datafile= (my_bool) (sort_info->key == 0 && ! rep_quick); - bzero((char*) sort_info->unique,sizeof(sort_info->unique)); + printf ("- Fixing index %d\n",sort_param.key+1); + sort_param.max_pos=sort_param.pos=share->pack.header_length; + keyseg=sort_param.keyinfo->seg; + bzero((char*) sort_param.unique,sizeof(sort_param.unique)); sort_param.key_length=share->rec_reflength; - for (i=0 ; sort_info->keyseg[i].type != HA_KEYTYPE_END; i++) + for (i=0 ; keyseg[i].type != HA_KEYTYPE_END; i++) { - sort_param.key_length+=sort_info->keyseg[i].length; - if (sort_info->keyseg[i].flag & HA_SPACE_PACK) - sort_param.key_length+=get_pack_length(sort_info->keyseg[i].length); - if (sort_info->keyseg[i].flag & (HA_BLOB_PART | HA_VAR_LENGTH)) - sort_param.key_length+=2 + test(sort_info->keyseg[i].length >= 127); - if (sort_info->keyseg[i].flag & HA_NULL_PART) + sort_param.key_length+=keyseg[i].length; + if (keyseg[i].flag & HA_SPACE_PACK) + sort_param.key_length+=get_pack_length(keyseg[i].length); + if (keyseg[i].flag & (HA_BLOB_PART | HA_VAR_LENGTH)) + sort_param.key_length+=2 + test(keyseg[i].length >= 127); + if (keyseg[i].flag & HA_NULL_PART) sort_param.key_length++; } info->state->records=info->state->del=share->state.split=0; info->state->empty=0; - if (sort_info->keyinfo->flag & HA_FULLTEXT) + if (sort_param.keyinfo->flag & HA_FULLTEXT) { - sort_param.max_records=sort_info->max_records= - (ha_rows) (sort_info->filelength/ft_max_word_len_for_sort+1); + sort_info.max_records= + (ha_rows) (sort_info.filelength/ft_max_word_len_for_sort+1); sort_param.key_read=sort_ft_key_read; sort_param.key_length+=ft_max_word_len_for_sort-ft_max_word_len; @@ -1938,18 +1933,17 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info, param->calc_checksum=0; /* No need to calc glob_crc */ /* Set for next loop */ - sort_param.max_records=sort_info->max_records= - (ha_rows) info->state->records; + sort_info.max_records= (ha_rows) info->state->records; if (param->testflag & T_STATISTICS) - update_key_parts(sort_info->keyinfo, rec_per_key_part, sort_info->unique, + update_key_parts(sort_param.keyinfo, rec_per_key_part, sort_param.unique, (ulonglong) info->state->records); - share->state.key_map|=(ulonglong) 1 << sort_info->key; + share->state.key_map|=(ulonglong) 1 << sort_param.key; - if (sort_info->fix_datafile) + if (sort_param.fix_datafile) { - param->read_cache.end_of_file=sort_info->filepos; - if (write_data_suffix(param,info) || end_io_cache(&info->rec_cache)) + param->read_cache.end_of_file=sort_param.filepos; + if (write_data_suffix(&sort_info,1) || end_io_cache(&info->rec_cache)) goto err; if (param->testflag & T_SAFE_REPAIR) { @@ -1961,23 +1955,24 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info, } } share->state.state.data_file_length = info->state->data_file_length - = sort_info->filepos; + = sort_param.filepos; /* Only whole records */ share->state.version=(ulong) time((time_t*) 0); my_close(info->dfile,MYF(0)); info->dfile=new_file; - share->data_file_type=sort_info->new_data_file_type; + share->data_file_type=sort_info.new_data_file_type; share->pack.header_length=(ulong) new_header_length; + sort_param.fix_datafile=0; } else - info->state->data_file_length=sort_info->max_pos; + info->state->data_file_length=sort_param.max_pos; /*if (flush_pending_blocks(param)) goto err;*/ param->read_cache.file=info->dfile; /* re-init read cache */ - reinit_io_cache(¶m->read_cache,READ_CACHE,share->pack.header_length,1, - 1); + reinit_io_cache(¶m->read_cache,READ_CACHE,share->pack.header_length, + 1,1); } if (param->testflag & T_WRITE_LOOP) @@ -1985,7 +1980,7 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info, VOID(fputs(" \r",stdout)); VOID(fflush(stdout)); } - if (rep_quick && del+sort_info->dupp != info->state->del) + if (rep_quick && del+sort_info.dupp != info->state->del) { mi_check_print_error(param,"Couldn't fix table with quick recovery: Found wrong number of deleted records"); mi_check_print_error(param,"Run recovery again without -q"); @@ -2005,7 +2000,7 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info, skr < share->base.reloc*share->base.min_pack_length) skr=share->base.reloc*share->base.min_pack_length; #endif - if (skr != sort_info->filelength && !info->s->base.raid_type) + if (skr != sort_info.filelength && !info->s->base.raid_type) if (my_chsize(info->dfile,skr,MYF(0))) mi_check_print_warning(param, "Can't change size of datafile, error: %d", @@ -2023,10 +2018,10 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info, { if (start_records != info->state->records) printf("Data records: %s\n", llstr(info->state->records,llbuff)); - if (sort_info->dupp) + if (sort_info.dupp) mi_check_print_warning(param, "%s records have been removed", - llstr(sort_info->dupp,llbuff)); + llstr(sort_info.dupp,llbuff)); } got_error=0; @@ -2069,9 +2064,9 @@ err: share->state.changed&= ~STATE_NOT_OPTIMIZED_KEYS; share->state.changed|=STATE_NOT_SORTED_PAGES; - my_free((gptr) sort_info->key_block,MYF(MY_ALLOW_ZERO_PTR)); - my_free(sort_info->record,MYF(MY_ALLOW_ZERO_PTR)); - my_free(sort_info->buff,MYF(MY_ALLOW_ZERO_PTR)); + my_free((gptr) sort_info.key_block,MYF(MY_ALLOW_ZERO_PTR)); + my_free(sort_param.record,MYF(MY_ALLOW_ZERO_PTR)); + my_free(sort_info.buff,MYF(MY_ALLOW_ZERO_PTR)); VOID(end_io_cache(¶m->read_cache)); info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED); if (!got_error && (param->testflag & T_UNPACK)) @@ -2082,18 +2077,376 @@ err: DBUG_RETURN(got_error); } + /* same as mi_repair_by_sort */ + /* but do it multithreaded */ + +int mi_repair_by_sort_r(MI_CHECK *param, register MI_INFO *info, + const char * name, int rep_quick) +{ + int got_error; + uint i,key; + ulong length; + ha_rows start_records; + my_off_t new_header_length,del; + File new_file; + MI_SORT_PARAM *sort_param=0, *sinfo; + MYISAM_SHARE *share=info->s; + ulong *rec_per_key_part; + MI_KEYSEG *keyseg; + char llbuff[22]; + IO_CACHE_SHARE io_share; + SORT_INFO sort_info; + ulonglong key_map=share->state.key_map; + DBUG_ENTER("mi_repair_by_sort_r"); + + start_records=info->state->records; + got_error=1; + new_file= -1; + new_header_length=(param->testflag & T_UNPACK) ? 0 : + share->pack.header_length; + if (!(param->testflag & T_SILENT)) + { + printf("- parallel recovering (with sort) MyISAM-table '%s'\n",name); + printf("Data records: %s\n", llstr(start_records,llbuff)); + } + + bzero((char*)&sort_info,sizeof(sort_info)); + if (!(sort_info.key_block= + alloc_key_blocks(param, + (uint) param->sort_key_blocks, + share->base.max_key_block_length)) + || init_io_cache(¶m->read_cache,info->dfile, + (uint) param->read_buffer_length, + READ_CACHE,share->pack.header_length,1,MYF(MY_WME)) || + (! rep_quick && + init_io_cache(&info->rec_cache,info->dfile, + (uint) param->write_buffer_length, + WRITE_CACHE,new_header_length,1, + MYF(MY_WME | MY_WAIT_IF_FULL) & param->myf_rw))) + goto err; + sort_info.key_block_end=sort_info.key_block+param->sort_key_blocks; + info->opt_flag|=WRITE_CACHE_USED; + info->rec_cache.file=info->dfile; /* for sort_delete_record */ + + if (!rep_quick) + { + /* Get real path for data file */ + fn_format(param->temp_filename,name,"", MI_NAME_DEXT,2+4+32); + if ((new_file=my_raid_create(fn_format(param->temp_filename, + param->temp_filename, "", + DATA_TMP_EXT, + 2+4), + 0,param->tmpfile_createflag, + share->base.raid_type, + share->base.raid_chunks, + share->base.raid_chunksize, + MYF(0))) < 0) + { + mi_check_print_error(param,"Can't create new tempfile: '%s'", + param->temp_filename); + goto err; + } + if (filecopy(param, new_file,info->dfile,0L,new_header_length, + "datafile-header")) + goto err; + if (param->testflag & T_UNPACK) + { + share->options&= ~HA_OPTION_COMPRESS_RECORD; + mi_int2store(share->state.header.options,share->options); + } + share->state.dellink= HA_OFFSET_ERROR; + info->rec_cache.file=new_file; + } + + info->update= (short) (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED); + if (!(param->testflag & T_CREATE_MISSING_KEYS)) + { + /* + Flush key cache for this file if we are calling this outside + myisamchk + */ + flush_key_blocks(share->kfile, FLUSH_IGNORE_CHANGED); + /* Clear the pointers to the given rows */ + for (i=0 ; i < share->base.keys ; i++) + share->state.key_root[i]= HA_OFFSET_ERROR; + for (i=0 ; i < share->state.header.max_block_size ; i++) + share->state.key_del[i]= HA_OFFSET_ERROR; + info->state->key_file_length=share->base.keystart; + } + else + { + if (flush_key_blocks(share->kfile, FLUSH_FORCE_WRITE)) + goto err; + key_map= ~key_map; /* Create the missing keys */ + } + + sort_info.info=info; + sort_info.param = param; + + set_data_file_type(&sort_info, share); + sort_info.dupp=0; + sort_info.buff=0; + param->read_cache.end_of_file=sort_info.filelength= + my_seek(param->read_cache.file,0L,MY_SEEK_END,MYF(0)); + + if (share->data_file_type == DYNAMIC_RECORD) + length=max(share->base.min_pack_length+1,share->base.min_block_length); + else if (share->data_file_type == COMPRESSED_RECORD) + length=share->base.min_block_length; + else + length=share->base.pack_reclength; + sort_info.max_records= + ((param->testflag & T_TRUST_HEADER) ? info->state->records : + (ha_rows) (sort_info.filelength/length+1)); + + del=info->state->del; + param->glob_crc=0; + if (param->testflag & T_CALC_CHECKSUM) + param->calc_checksum=1; + + if (!(sort_param=(MI_SORT_PARAM *) + my_malloc((uint) share->base.keys * + (sizeof(MI_SORT_PARAM) + share->base.pack_reclength), + MYF(MY_ZEROFILL)))) + { + mi_check_print_error(param,"Not enough memory!"); + goto err; + } + length=0; + rec_per_key_part= param->rec_per_key_part; + info->state->records=info->state->del=share->state.split=0; + info->state->empty=0; + + for (i=key=0 ; key < share->base.keys ; + rec_per_key_part+=sort_param[i].keyinfo->keysegs, i++, key++) + { + sort_param[i].key=key; + sort_param[i].keyinfo=share->keyinfo+key; + if (!(((ulonglong) 1 << key) & key_map)) + { + /* Remember old statistics for key */ + memcpy((char*) rec_per_key_part, + (char*) share->state.rec_per_key_part+ + (uint) (rec_per_key_part - param->rec_per_key_part), + sort_param[i].keyinfo->keysegs*sizeof(*rec_per_key_part)); + i--; + continue; + } + if ((!(param->testflag & T_SILENT))) + printf ("- Fixing index %d\n",key+1); + sort_param[i].key_read=(sort_param[i].keyinfo->flag & HA_FULLTEXT) ? + sort_ft_key_read : sort_key_read; + sort_param[i].key_cmp=sort_key_cmp; + sort_param[i].key_write=sort_key_write; + sort_param[i].lock_in_memory=lock_memory; + sort_param[i].tmpdir=param->tmpdir; + sort_param[i].sort_info=&sort_info; + sort_param[i].fix_datafile=0; + + sort_param[i].filepos=new_header_length; + sort_param[i].max_pos=sort_param[i].pos=share->pack.header_length; + + sort_param[i].record=((char *)(sort_param+share->base.keys))+ + (share->base.pack_reclength * i); + + sort_param[i].key_length=share->rec_reflength; + for (keyseg=sort_param[i].keyinfo->seg; keyseg->type != HA_KEYTYPE_END; + keyseg++) + { + sort_param[i].key_length+=keyseg->length; + if (keyseg->flag & HA_SPACE_PACK) + sort_param[i].key_length+=get_pack_length(keyseg->length); + if (keyseg->flag & (HA_BLOB_PART | HA_VAR_LENGTH)) + sort_param[i].key_length+=2 + test(keyseg->length >= 127); + if (keyseg->flag & HA_NULL_PART) + sort_param[i].key_length++; + } + + length+=sort_param[i].key_length; + + if (sort_param[i].keyinfo->flag & HA_FULLTEXT) + sort_param[i].key_length+=ft_max_word_len_for_sort-ft_max_word_len; + } + sort_info.total_keys=i; + sort_param[0].fix_datafile= ! rep_quick; + + sort_info.got_error=0; + pthread_mutex_init(& sort_info.mutex, MY_MUTEX_INIT_FAST); + pthread_cond_init(& sort_info.cond, 0); + pthread_mutex_lock(& sort_info.mutex); + + init_io_cache_share(& param->read_cache, &io_share, i); + for (i=0 ; i<sort_info.total_keys ; i++) + { + sort_param[i].read_cache=param->read_cache; + sort_param[i].sortbuff_size= + /* + two approaches: the same amount of memory for each thread + or the memory for the same number of keys for each thread... + In the second one all the threads will fill their sort_buffers + (and call write_keys) at the same time, putting more stress on i/o. + */ +#if 1 + param->sort_buffer_length/sort_info.total_keys; +#else + param->sort_buffer_length*sort_param[i].key_length/length; +#endif + if (pthread_create(& sort_param[i].thr, 0, + (void *(*)(void*))_thr_find_all_keys, sort_param+i)) + { + mi_check_print_error(param,"Cannot start a repair thread"); + remove_io_thread(& param->read_cache); + sort_info.got_error=1; + } + else + sort_info.threads_running++; + } + + /* waiting for all threads to finish */ + while (sort_info.threads_running) + pthread_cond_wait(& sort_info.cond, &sort_info.mutex); + pthread_mutex_unlock(& sort_info.mutex); + + if (got_error=_thr_write_keys(sort_param)) + { + param->retry_repair=1; + goto err; + } + got_error=1; + + if (sort_param[0].fix_datafile) + { + if (write_data_suffix(&sort_info,1) || end_io_cache(&info->rec_cache)) + goto err; + if (param->testflag & T_SAFE_REPAIR) + { + /* Don't repair if we loosed more than one row */ + if (info->state->records+1 < start_records) + { + info->state->records=start_records; + goto err; + } + } + share->state.state.data_file_length = info->state->data_file_length + = sort_param->filepos; + /* Only whole records */ + share->state.version=(ulong) time((time_t*) 0); + my_close(info->dfile,MYF(0)); + info->dfile=new_file; + share->data_file_type=sort_info.new_data_file_type; + share->pack.header_length=(ulong) new_header_length; + } + else + info->state->data_file_length=sort_param->max_pos; + + if (rep_quick && del+sort_info.dupp != info->state->del) + { + mi_check_print_error(param,"Couldn't fix table with quick recovery: Found wrong number of deleted records"); + mi_check_print_error(param,"Run recovery again without -q"); + param->retry_repair=1; + param->testflag|=T_RETRY_WITHOUT_QUICK; + goto err; + } + + if (rep_quick & T_FORCE_UNIQUENESS) + { + my_off_t skr=info->state->data_file_length+ + (share->options & HA_OPTION_COMPRESS_RECORD ? + MEMMAP_EXTRA_MARGIN : 0); +#ifdef USE_RELOC + if (share->data_file_type == STATIC_RECORD && + skr < share->base.reloc*share->base.min_pack_length) + skr=share->base.reloc*share->base.min_pack_length; +#endif + if (skr != sort_info.filelength && !info->s->base.raid_type) + if (my_chsize(info->dfile,skr,MYF(0))) + mi_check_print_warning(param, + "Can't change size of datafile, error: %d", + my_errno); + } + if (param->testflag & T_CALC_CHECKSUM) + share->state.checksum=param->glob_crc; + + if (my_chsize(share->kfile,info->state->key_file_length,MYF(0))) + mi_check_print_warning(param, + "Can't change size of indexfile, error: %d", my_errno); + + if (!(param->testflag & T_SILENT)) + { + if (start_records != info->state->records) + printf("Data records: %s\n", llstr(info->state->records,llbuff)); + if (sort_info.dupp) + mi_check_print_warning(param, + "%s records have been removed", + llstr(sort_info.dupp,llbuff)); + } + got_error=0; + + if (&share->state.state != info->state) + memcpy( &share->state.state, info->state, sizeof(*info->state)); + +err: + got_error|= flush_blocks(param,share->kfile); + VOID(end_io_cache(&info->rec_cache)); + if (!got_error) + { + /* Replace the actual file with the temporary file */ + if (new_file >= 0) + { + my_close(new_file,MYF(0)); + info->dfile=new_file= -1; + if (change_to_newfile(share->data_file_name,MI_NAME_DEXT, + DATA_TMP_EXT, share->base.raid_chunks, + (param->testflag & T_BACKUP_DATA ? + MYF(MY_REDEL_MAKE_BACKUP): MYF(0))) || + mi_open_datafile(info,share,-1)) + got_error=1; + } + } + if (got_error) + { + if (! param->error_printed) + mi_check_print_error(param,"%d when fixing table",my_errno); + if (new_file >= 0) + { + VOID(my_close(new_file,MYF(0))); + VOID(my_raid_delete(param->temp_filename,share->base.raid_chunks, + MYF(MY_WME))); + if (info->dfile == new_file) + info->dfile= -1; + } + mi_mark_crashed_on_repair(info); + } + else if (key_map == share->state.key_map) + share->state.changed&= ~STATE_NOT_OPTIMIZED_KEYS; + share->state.changed|=STATE_NOT_SORTED_PAGES; + + pthread_cond_destroy (& sort_info.cond); + pthread_mutex_destroy(& sort_info.mutex); + + my_free((gptr) sort_info.key_block,MYF(MY_ALLOW_ZERO_PTR)); + my_free((gptr) sort_param,MYF(MY_ALLOW_ZERO_PTR)); + my_free(sort_info.buff,MYF(MY_ALLOW_ZERO_PTR)); + VOID(end_io_cache(¶m->read_cache)); + info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED); + if (!got_error && (param->testflag & T_UNPACK)) + { + share->state.header.options[0]&= (uchar) ~HA_OPTION_COMPRESS_RECORD; + share->pack.header_length=0; + } + DBUG_RETURN(got_error); +} /* Read next record and return next key */ -static int sort_key_read(SORT_INFO *sort_info, void *key) +static int sort_key_read(MI_SORT_PARAM *sort_param, void *key) { int error; - MI_INFO *info; + SORT_INFO *sort_info=sort_param->sort_info; + MI_INFO *info=sort_info->info; DBUG_ENTER("sort_key_read"); - info=sort_info->info; - - if ((error=sort_get_next_record(sort_info))) + if ((error=sort_get_next_record(sort_param))) DBUG_RETURN(error); if (info->state->records == sort_info->max_records) { @@ -2101,52 +2454,51 @@ static int sort_key_read(SORT_INFO *sort_info, void *key) "Found too many records; Can`t continue"); DBUG_RETURN(1); } - sort_info->real_key_length=(info->s->rec_reflength+ - _mi_make_key(info, sort_info->key, - (uchar*) key, sort_info->record, - sort_info->filepos)); - DBUG_RETURN(sort_write_record(sort_info)); + sort_param->real_key_length=(info->s->rec_reflength+ + _mi_make_key(info, sort_param->key, (uchar*) key, + sort_param->record, sort_param->filepos)); + DBUG_RETURN(sort_write_record(sort_param)); } /* sort_key_read */ -static int sort_ft_key_read(SORT_INFO *sort_info, void *key) +static int sort_ft_key_read(MI_SORT_PARAM *sort_param, void *key) { int error; - MI_INFO *info; + SORT_INFO *sort_info=sort_param->sort_info; + MI_INFO *info=sort_info->info; FT_WORD *wptr=0; DBUG_ENTER("sort_ft_key_read"); - info=sort_info->info; - - if (!sort_info->wordlist) + if (!sort_param->wordlist) { do { my_free((char*) wptr, MYF(MY_ALLOW_ZERO_PTR)); - if ((error=sort_get_next_record(sort_info))) + if ((error=sort_get_next_record(sort_param))) DBUG_RETURN(error); - if (!(wptr=_mi_ft_parserecord(info,sort_info->key,key,sort_info->record))) + if (!(wptr=_mi_ft_parserecord(info,sort_param->key, + key,sort_param->record))) DBUG_RETURN(1); - error=sort_write_record(sort_info); + error=sort_write_record(sort_param); } while (!wptr->pos); - sort_info->wordptr=sort_info->wordlist=wptr; + sort_param->wordptr=sort_param->wordlist=wptr; } else { error=0; - wptr=(FT_WORD*)(sort_info->wordptr); + wptr=(FT_WORD*)(sort_param->wordptr); } - sort_info->real_key_length=info->s->rec_reflength+_ft_make_key(info, - sort_info->key,key,wptr++,sort_info->filepos); + sort_param->real_key_length=info->s->rec_reflength+_ft_make_key(info, + sort_param->key,key,wptr++,sort_param->filepos); if (!wptr->pos) { - my_free((char*) sort_info->wordlist, MYF(0)); - sort_info->wordlist=0; + my_free((char*) sort_param->wordlist, MYF(0)); + sort_param->wordlist=0; } else - sort_info->wordptr=(void*)wptr; + sort_param->wordptr=(void*)wptr; DBUG_RETURN(error); @@ -2155,7 +2507,7 @@ static int sort_ft_key_read(SORT_INFO *sort_info, void *key) /* Read next record from file using parameters in sort_info */ /* Return -1 if end of file, 0 if ok and > 0 if error */ -static int sort_get_next_record(SORT_INFO *sort_info) +static int sort_get_next_record(MI_SORT_PARAM *sort_param) { int searching; uint found_record,b_type,left_length; @@ -2164,6 +2516,7 @@ static int sort_get_next_record(SORT_INFO *sort_info) MI_BLOCK_INFO block_info; MI_INFO *info; MYISAM_SHARE *share; + SORT_INFO *sort_info=sort_param->sort_info; MI_CHECK *param=sort_info->param; char llbuff[22],llbuff2[22]; DBUG_ENTER("sort_get_next_record"); @@ -2174,30 +2527,30 @@ static int sort_get_next_record(SORT_INFO *sort_info) case STATIC_RECORD: for (;;) { - if (my_b_read(¶m->read_cache,sort_info->record, + if (my_b_read(&sort_param->read_cache,sort_param->record, share->base.pack_reclength)) { - if (param->read_cache.error) + if (sort_param->read_cache.error) param->out_flag |= O_DATA_LOST; param->retry_repair=1; param->testflag|=T_RETRY_WITHOUT_QUICK; DBUG_RETURN(-1); } - sort_info->start_recpos=sort_info->pos; - if (!sort_info->fix_datafile) + sort_param->start_recpos=sort_param->pos; + if (!sort_param->fix_datafile) { - sort_info->filepos=sort_info->pos; + sort_param->filepos=sort_param->pos; share->state.split++; } - sort_info->max_pos=(sort_info->pos+=share->base.pack_reclength); - if (*sort_info->record) + sort_param->max_pos=(sort_param->pos+=share->base.pack_reclength); + if (*sort_param->record) { if (param->calc_checksum) param->glob_crc+= (info->checksum= - mi_static_checksum(info,sort_info->record)); + mi_static_checksum(info,sort_param->record)); DBUG_RETURN(0); } - if (!sort_info->fix_datafile) + if (!sort_param->fix_datafile) { info->state->del++; info->state->empty+=share->base.pack_reclength; @@ -2205,8 +2558,8 @@ static int sort_get_next_record(SORT_INFO *sort_info) } case DYNAMIC_RECORD: LINT_INIT(to); - pos=sort_info->pos; - searching=(sort_info->fix_datafile && (param->testflag & T_EXTEND)); + pos=sort_param->pos; + searching=(sort_param->fix_datafile && (param->testflag & T_EXTEND)); for (;;) { found_record=block_info.second_read= 0; @@ -2215,12 +2568,12 @@ static int sort_get_next_record(SORT_INFO *sort_info) { pos=MY_ALIGN(pos,MI_DYN_ALIGN_SIZE); param->testflag|=T_RETRY_WITHOUT_QUICK; - sort_info->start_recpos=pos; + sort_param->start_recpos=pos; } do { - if (pos > sort_info->max_pos) - sort_info->max_pos=pos; + if (pos > sort_param->max_pos) + sort_param->max_pos=pos; if (pos & (MI_DYN_ALIGN_SIZE-1)) { if ((param->testflag & T_VERBOSE) || searching == 0) @@ -2232,8 +2585,9 @@ static int sort_get_next_record(SORT_INFO *sort_info) if (found_record && pos == param->search_after_block) mi_check_print_info(param,"Block: %s used by record at %s", llstr(param->search_after_block,llbuff), - llstr(sort_info->start_recpos,llbuff2)); - if (_mi_read_cache(¶m->read_cache,(byte*) block_info.header,pos, + llstr(sort_param->start_recpos,llbuff2)); + if (_mi_read_cache(&sort_param->read_cache, + (byte*) block_info.header,pos, MI_BLOCK_INFO_HEADER_LENGTH, (! found_record ? READING_NEXT : 0) | READING_HEADER)) @@ -2242,12 +2596,12 @@ static int sort_get_next_record(SORT_INFO *sort_info) { mi_check_print_info(param, "Can't read whole record at %s (errno: %d)", - llstr(sort_info->start_recpos,llbuff),errno); + llstr(sort_param->start_recpos,llbuff),errno); goto try_next; } DBUG_RETURN(-1); } - if (searching && ! sort_info->fix_datafile) + if (searching && ! sort_param->fix_datafile) { param->error_printed=1; param->retry_repair=1; @@ -2278,7 +2632,7 @@ static int sort_get_next_record(SORT_INFO *sort_info) block_info.header[i] <= MI_MAX_DYN_HEADER_BYTE) break; pos+=(ulong) i; - sort_info->start_recpos=pos; + sort_param->start_recpos=pos; continue; } if (b_type & BLOCK_DELETED) @@ -2314,7 +2668,7 @@ static int sort_get_next_record(SORT_INFO *sort_info) goto try_next; searching=1; pos+= MI_DYN_ALIGN_SIZE; - sort_info->start_recpos=pos; + sort_param->start_recpos=pos; block_info.second_read=0; continue; } @@ -2335,14 +2689,14 @@ static int sort_get_next_record(SORT_INFO *sort_info) goto try_next; searching=1; pos+= MI_DYN_ALIGN_SIZE; - sort_info->start_recpos=pos; + sort_param->start_recpos=pos; block_info.second_read=0; continue; } } if (b_type & (BLOCK_DELETED | BLOCK_SYNC_ERROR)) { - if (!sort_info->fix_datafile && (b_type & BLOCK_DELETED)) + if (!sort_param->fix_datafile && (b_type & BLOCK_DELETED)) { info->state->empty+=block_info.block_len; info->state->del++; @@ -2353,7 +2707,7 @@ static int sort_get_next_record(SORT_INFO *sort_info) if (searching) { pos+=MI_DYN_ALIGN_SIZE; - sort_info->start_recpos=pos; + sort_param->start_recpos=pos; } else pos=block_info.filepos+block_info.block_len; @@ -2361,24 +2715,24 @@ static int sort_get_next_record(SORT_INFO *sort_info) continue; } - if (!sort_info->fix_datafile) + if (!sort_param->fix_datafile) share->state.split++; if (! found_record++) { - sort_info->find_length=left_length=block_info.rec_len; - sort_info->start_recpos=pos; - if (!sort_info->fix_datafile) - sort_info->filepos=sort_info->start_recpos; - if (sort_info->fix_datafile && (param->testflag & T_EXTEND)) - sort_info->pos=block_info.filepos+1; + sort_param->find_length=left_length=block_info.rec_len; + sort_param->start_recpos=pos; + if (!sort_param->fix_datafile) + sort_param->filepos=sort_param->start_recpos; + if (sort_param->fix_datafile && (param->testflag & T_EXTEND)) + sort_param->pos=block_info.filepos+1; else - sort_info->pos=block_info.filepos+block_info.block_len; + sort_param->pos=block_info.filepos+block_info.block_len; if (share->base.blobs) { if (!(to=mi_fix_rec_buff_for_blob(info,block_info.rec_len))) { mi_check_print_error(param,"Not enough memory for blob at %s", - llstr(sort_info->start_recpos,llbuff)); + llstr(sort_param->start_recpos,llbuff)); DBUG_RETURN(1); } } @@ -2388,17 +2742,17 @@ static int sort_get_next_record(SORT_INFO *sort_info) if (left_length < block_info.data_len || ! block_info.data_len) { mi_check_print_info(param,"Found block with too small length at %s; Skipped", - llstr(sort_info->start_recpos,llbuff)); + llstr(sort_param->start_recpos,llbuff)); goto try_next; } if (block_info.filepos + block_info.data_len > - param->read_cache.end_of_file) + sort_param->read_cache.end_of_file) { mi_check_print_info(param,"Found block that points outside data file at %s", - llstr(sort_info->start_recpos,llbuff)); + llstr(sort_param->start_recpos,llbuff)); goto try_next; } - if (_mi_read_cache(¶m->read_cache,to,block_info.filepos, + if (_mi_read_cache(&sort_param->read_cache,to,block_info.filepos, block_info.data_len, (found_record == 1 ? READING_NEXT : 0))) { @@ -2413,31 +2767,31 @@ static int sort_get_next_record(SORT_INFO *sort_info) if (pos == HA_OFFSET_ERROR && left_length) { mi_check_print_info(param,"Wrong block with wrong total length starting at %s", - llstr(sort_info->start_recpos,llbuff)); + llstr(sort_param->start_recpos,llbuff)); goto try_next; } - if (pos + MI_BLOCK_INFO_HEADER_LENGTH > param->read_cache.end_of_file) + if (pos + MI_BLOCK_INFO_HEADER_LENGTH > sort_param->read_cache.end_of_file) { mi_check_print_info(param,"Found link that points at %s (outside data file) at %s", llstr(pos,llbuff2), - llstr(sort_info->start_recpos,llbuff)); + llstr(sort_param->start_recpos,llbuff)); goto try_next; } } while (left_length); - if (_mi_rec_unpack(info,sort_info->record,info->rec_buff, - sort_info->find_length) != MY_FILE_ERROR) + if (_mi_rec_unpack(info,sort_param->record,info->rec_buff, + sort_param->find_length) != MY_FILE_ERROR) { - if (param->read_cache.error < 0) + if (sort_param->read_cache.error < 0) DBUG_RETURN(1); if (info->s->calc_checksum) - info->checksum=mi_checksum(info,sort_info->record); + info->checksum=mi_checksum(info,sort_param->record); if ((param->testflag & (T_EXTEND | T_REP)) || searching) { - if (_mi_rec_check(info, sort_info->record)) + if (_mi_rec_check(info, sort_param->record)) { mi_check_print_info(param,"Found wrong packed record at %s", - llstr(sort_info->start_recpos,llbuff)); + llstr(sort_param->start_recpos,llbuff)); goto try_next; } } @@ -2447,31 +2801,31 @@ static int sort_get_next_record(SORT_INFO *sort_info) } if (!searching) mi_check_print_info(param,"Found wrong stored record at %s", - llstr(sort_info->start_recpos,llbuff)); + llstr(sort_param->start_recpos,llbuff)); try_next: - pos=(sort_info->start_recpos+=MI_DYN_ALIGN_SIZE); + pos=(sort_param->start_recpos+=MI_DYN_ALIGN_SIZE); searching=1; } case COMPRESSED_RECORD: - for (searching=0 ;; searching=1, sort_info->pos++) + for (searching=0 ;; searching=1, sort_param->pos++) { - if (_mi_read_cache(¶m->read_cache,(byte*) block_info.header, - sort_info->pos, + if (_mi_read_cache(&sort_param->read_cache,(byte*) block_info.header, + sort_param->pos, share->pack.ref_length,READING_NEXT)) DBUG_RETURN(-1); - if (searching && ! sort_info->fix_datafile) + if (searching && ! sort_param->fix_datafile) { param->error_printed=1; param->retry_repair=1; param->testflag|=T_RETRY_WITHOUT_QUICK; DBUG_RETURN(1); /* Something wrong with data */ } - sort_info->start_recpos=sort_info->pos; - if (_mi_pack_get_block_info(info,&block_info,-1,sort_info->pos, NullS)) + sort_param->start_recpos=sort_param->pos; + if (_mi_pack_get_block_info(info,&block_info,-1,sort_param->pos, NullS)) DBUG_RETURN(-1); if (!block_info.rec_len && - sort_info->pos + MEMMAP_EXTRA_MARGIN == - param->read_cache.end_of_file) + sort_param->pos + MEMMAP_EXTRA_MARGIN == + sort_param->read_cache.end_of_file) DBUG_RETURN(-1); if (block_info.rec_len < (uint) share->min_pack_length || block_info.rec_len > (uint) share->max_pack_length) @@ -2479,33 +2833,33 @@ static int sort_get_next_record(SORT_INFO *sort_info) if (! searching) mi_check_print_info(param,"Found block with wrong recordlength: %d at %s\n", block_info.rec_len, - llstr(sort_info->pos,llbuff)); + llstr(sort_param->pos,llbuff)); continue; } - if (_mi_read_cache(¶m->read_cache,(byte*) info->rec_buff, + if (_mi_read_cache(&sort_param->read_cache,(byte*) info->rec_buff, block_info.filepos, block_info.rec_len, READING_NEXT)) { if (! searching) mi_check_print_info(param,"Couldn't read whole record from %s", - llstr(sort_info->pos,llbuff)); + llstr(sort_param->pos,llbuff)); continue; } - if (_mi_pack_rec_unpack(info,sort_info->record,info->rec_buff, + if (_mi_pack_rec_unpack(info,sort_param->record,info->rec_buff, block_info.rec_len)) { if (! searching) mi_check_print_info(param,"Found wrong record at %s", - llstr(sort_info->pos,llbuff)); + llstr(sort_param->pos,llbuff)); continue; } - info->checksum=mi_checksum(info,sort_info->record); - if (!sort_info->fix_datafile) + info->checksum=mi_checksum(info,sort_param->record); + if (!sort_param->fix_datafile) { - sort_info->filepos=sort_info->pos; + sort_param->filepos=sort_param->pos; share->state.split++; } - sort_info->max_pos=(sort_info->pos=block_info.filepos+ + sort_param->max_pos=(sort_param->pos=block_info.filepos+ block_info.rec_len); info->packed_length=block_info.rec_len; if (param->calc_checksum) @@ -2519,7 +2873,7 @@ static int sort_get_next_record(SORT_INFO *sort_info) /* Write record to new file */ -int sort_write_record(SORT_INFO *sort_info) +int sort_write_record(MI_SORT_PARAM *sort_param) { int flag; uint length; @@ -2527,25 +2881,26 @@ int sort_write_record(SORT_INFO *sort_info) byte *from; byte block_buff[8]; MI_INFO *info; + SORT_INFO *sort_info=sort_param->sort_info; MYISAM_SHARE *share; MI_CHECK *param=sort_info->param; DBUG_ENTER("sort_write_record"); info=sort_info->info; share=info->s; - if (sort_info->fix_datafile) + if (sort_param->fix_datafile) { switch (sort_info->new_data_file_type) { case STATIC_RECORD: - if (my_b_write(&info->rec_cache,sort_info->record, + if (my_b_write(&info->rec_cache,sort_param->record, share->base.pack_reclength)) { mi_check_print_error(param,"%d when writing to datafile",my_errno); DBUG_RETURN(1); } - sort_info->filepos+=share->base.pack_reclength; + sort_param->filepos+=share->base.pack_reclength; info->s->state.split++; - /* sort_info->param->glob_crc+=mi_static_checksum(info, sort_info->record); */ + /* sort_info->param->glob_crc+=mi_static_checksum(info, sort_param->record); */ break; case DYNAMIC_RECORD: if (! info->blobs) @@ -2554,7 +2909,7 @@ int sort_write_record(SORT_INFO *sort_info) { /* must be sure that local buffer is big enough */ reclength=info->s->base.pack_reclength+ - _my_calc_total_blob_length(info,sort_info->record)+ + _my_calc_total_blob_length(info,sort_param->record)+ ALIGN_SIZE(MI_MAX_DYN_BLOCK_HEADER)+MI_SPLIT_LENGTH+ MI_DYN_DELETE_BLOCK_HEADER; if (sort_info->buff_length < reclength) @@ -2567,8 +2922,8 @@ int sort_write_record(SORT_INFO *sort_info) } from=sort_info->buff+ALIGN_SIZE(MI_MAX_DYN_BLOCK_HEADER); } - info->checksum=mi_checksum(info,sort_info->record); - reclength=_mi_rec_pack(info,from,sort_info->record); + info->checksum=mi_checksum(info,sort_param->record); + reclength=_mi_rec_pack(info,from,sort_param->record); flag=0; /* sort_info->param->glob_crc+=info->checksum; */ @@ -2582,13 +2937,13 @@ int sort_write_record(SORT_INFO *sort_info) if (block_length > MI_MAX_BLOCK_LENGTH) block_length=MI_MAX_BLOCK_LENGTH; if (_mi_write_part_record(info,0L,block_length, - sort_info->filepos+block_length, + sort_param->filepos+block_length, &from,&reclength,&flag)) { mi_check_print_error(param,"%d when writing to datafile",my_errno); DBUG_RETURN(1); } - sort_info->filepos+=block_length; + sort_param->filepos+=block_length; info->s->state.split++; } while (reclength); /* sort_info->param->glob_crc+=info->checksum; */ @@ -2605,7 +2960,7 @@ int sort_write_record(SORT_INFO *sort_info) DBUG_RETURN(1); } /* sort_info->param->glob_crc+=info->checksum; */ - sort_info->filepos+=reclength+length; + sort_param->filepos+=reclength+length; info->s->state.split++; break; } @@ -2623,49 +2978,47 @@ int sort_write_record(SORT_INFO *sort_info) /* Compare two keys from _create_index_by_sort */ -static int sort_key_cmp(SORT_INFO *sort_info, const void *a, const void *b) +static int sort_key_cmp(MI_SORT_PARAM *sort_param, const void *a, const void *b) { uint not_used; - return (_mi_key_cmp(sort_info->keyseg,*((uchar**) a),*((uchar**) b), + return (_mi_key_cmp(sort_param->keyinfo->seg,*((uchar**) a),*((uchar**) b), USE_WHOLE_KEY, SEARCH_SAME,¬_used)); } /* sort_key_cmp */ -static int sort_key_write(SORT_INFO *sort_info, const void *a) +static int sort_key_write(MI_SORT_PARAM *sort_param, const void *a) { uint diff_pos; char llbuff[22],llbuff2[22]; + SORT_INFO *sort_info=sort_param->sort_info; MI_CHECK *param= sort_info->param; int cmp; if (sort_info->key_block->inited) { - cmp=_mi_key_cmp(sort_info->keyseg,sort_info->key_block->lastkey,(uchar*) a, - USE_WHOLE_KEY,SEARCH_FIND | SEARCH_UPDATE ,&diff_pos); - sort_info->unique[diff_pos-1]++; + cmp=_mi_key_cmp(sort_param->keyinfo->seg,sort_info->key_block->lastkey, + (uchar*) a, USE_WHOLE_KEY,SEARCH_FIND | SEARCH_UPDATE ,&diff_pos); + sort_param->unique[diff_pos-1]++; } else { cmp= -1; } - if ((sort_info->keyinfo->flag & HA_NOSAME) && cmp == 0) + if ((sort_param->keyinfo->flag & HA_NOSAME) && cmp == 0) { sort_info->dupp++; sort_info->info->lastpos=get_record_for_key(sort_info->info, - sort_info->keyinfo, + sort_param->keyinfo, (uchar*) a); mi_check_print_warning(param, - "Duplicate key for record at %10s against record at %10s", - llstr(sort_info->info->lastpos,llbuff), - llstr(get_record_for_key(sort_info->info, - sort_info->keyinfo, - sort_info->key_block-> - lastkey), - llbuff2)); + "Duplicate key for record at %10s against record at %10s", + llstr(sort_info->info->lastpos,llbuff), + llstr(get_record_for_key(sort_info->info, sort_param->keyinfo, + sort_info->key_block->lastkey), llbuff2)); param->testflag|=T_RETRY_WITHOUT_QUICK; if (sort_info->param->testflag & T_VERBOSE) - _mi_print_key(stdout,sort_info->keyseg,(uchar*) a, USE_WHOLE_KEY); - return (sort_delete_record(param)); + _mi_print_key(stdout,sort_param->keyinfo->seg,(uchar*) a, USE_WHOLE_KEY); + return (sort_delete_record(sort_param)); } #ifndef DBUG_OFF if (cmp > 0) @@ -2675,8 +3028,8 @@ static int sort_key_write(SORT_INFO *sort_info, const void *a) return(1); } #endif - return (sort_insert_key(param,sort_info->key_block,(uchar*) a, - HA_OFFSET_ERROR)); + return (sort_insert_key(sort_param,sort_info->key_block, + (uchar*) a, HA_OFFSET_ERROR)); } /* sort_key_write */ @@ -2691,7 +3044,7 @@ static my_off_t get_record_for_key(MI_INFO *info, MI_KEYDEF *keyinfo, /* Insert a key in sort-key-blocks */ -static int sort_insert_key(MI_CHECK *param, +static int sort_insert_key(MI_SORT_PARAM *sort_param, register SORT_KEY_BLOCKS *key_block, uchar *key, my_off_t prev_block) { @@ -2700,14 +3053,16 @@ static int sort_insert_key(MI_CHECK *param, uchar *anc_buff,*lastkey; MI_KEY_PARAM s_temp; MI_INFO *info; - SORT_INFO *sort_info= ¶m->sort_info; + MI_KEYDEF *keyinfo=sort_param->keyinfo; + SORT_INFO *sort_info= sort_param->sort_info; + MI_CHECK *param=sort_info->param; DBUG_ENTER("sort_insert_key"); anc_buff=key_block->buff; info=sort_info->info; lastkey=key_block->lastkey; nod_flag= (key_block == sort_info->key_block ? 0 : - sort_info->info->s->base.key_reflength); + info->s->base.key_reflength); if (!key_block->inited) { @@ -2728,17 +3083,16 @@ static int sort_insert_key(MI_CHECK *param, if (nod_flag) _mi_kpointer(info,key_block->end_pos,prev_block); - t_length=(*sort_info->keyinfo->pack_key)(sort_info->keyinfo,nod_flag, - (uchar*) 0,lastkey,lastkey,key, - &s_temp); - (*sort_info->keyinfo->store_key)(sort_info->keyinfo, - key_block->end_pos+nod_flag,&s_temp); + t_length=(*keyinfo->pack_key)(keyinfo,nod_flag, + (uchar*) 0,lastkey,lastkey,key, + &s_temp); + (*keyinfo->store_key)(keyinfo, key_block->end_pos+nod_flag,&s_temp); a_length+=t_length; mi_putint(anc_buff,a_length,nod_flag); key_block->end_pos+=t_length; - if (a_length <= sort_info->keyinfo->block_length) + if (a_length <= keyinfo->block_length) { - VOID(_mi_move_key(sort_info->keyinfo,key_block->lastkey,key)); + VOID(_mi_move_key(keyinfo,key_block->lastkey,key)); key_block->last_length=a_length-t_length; DBUG_RETURN(0); } @@ -2746,43 +3100,42 @@ static int sort_insert_key(MI_CHECK *param, /* Fill block with end-zero and write filled block */ mi_putint(anc_buff,key_block->last_length,nod_flag); bzero((byte*) anc_buff+key_block->last_length, - sort_info->keyinfo->block_length- key_block->last_length); + keyinfo->block_length- key_block->last_length); key_file_length=info->state->key_file_length; - if ((filepos=_mi_new(info,sort_info->keyinfo)) == HA_OFFSET_ERROR) + if ((filepos=_mi_new(info,keyinfo)) == HA_OFFSET_ERROR) DBUG_RETURN(1); /* If we read the page from the key cache, we have to write it back to it */ if (key_file_length == info->state->key_file_length) { - if (_mi_write_keypage(info, sort_info->keyinfo, filepos, - anc_buff)) + if (_mi_write_keypage(info, keyinfo, filepos, anc_buff)) DBUG_RETURN(1); } else if (my_pwrite(info->s->kfile,(byte*) anc_buff, - (uint) sort_info->keyinfo->block_length,filepos, - param->myf_rw)) + (uint) keyinfo->block_length,filepos, param->myf_rw)) DBUG_RETURN(1); DBUG_DUMP("buff",(byte*) anc_buff,mi_getint(anc_buff)); /* Write separator-key to block in next level */ - if (sort_insert_key(param,key_block+1,key_block->lastkey,filepos)) + if (sort_insert_key(sort_param,key_block+1,key_block->lastkey,filepos)) DBUG_RETURN(1); /* clear old block and write new key in it */ key_block->inited=0; - DBUG_RETURN(sort_insert_key(param, key_block,key,prev_block)); + DBUG_RETURN(sort_insert_key(sort_param, key_block,key,prev_block)); } /* sort_insert_key */ /* Delete record when we found a duplicated key */ -static int sort_delete_record(MI_CHECK *param) +static int sort_delete_record(MI_SORT_PARAM *sort_param) { uint i; int old_file,error; uchar *key; - MI_INFO *info; - SORT_INFO *sort_info= ¶m->sort_info; + SORT_INFO *sort_info=sort_param->sort_info; + MI_CHECK *param=sort_info->param; + MI_INFO *info=sort_info->info; DBUG_ENTER("sort_delete_record"); if (!(param->testflag & T_FORCE_UNIQUENESS)) @@ -2791,7 +3144,6 @@ static int sort_delete_record(MI_CHECK *param) "Quick-recover aborted; Run recovery without switch -q or with switch -qq"); DBUG_RETURN(1); } - info=sort_info->info; if (info->s->options & HA_OPTION_COMPRESS_RECORD) { mi_check_print_error(param, @@ -2801,10 +3153,10 @@ static int sort_delete_record(MI_CHECK *param) old_file=info->dfile; info->dfile=info->rec_cache.file; - if (sort_info->key) + if (sort_info->kei) { key=info->lastkey+info->s->base.max_key_length; - if ((error=(*info->s->read_rnd)(info,sort_info->record,info->lastpos,0)) && + if ((error=(*info->s->read_rnd)(info,sort_param->record,info->lastpos,0)) && error != HA_ERR_RECORD_DELETED) { mi_check_print_error(param,"Can't read record to be removed"); @@ -2812,9 +3164,9 @@ static int sort_delete_record(MI_CHECK *param) DBUG_RETURN(1); } - for (i=0 ; i < sort_info->key ; i++) + for (i=0 ; i < sort_info->kei ; i++) { - uint key_length=_mi_make_key(info,i,key,sort_info->record,info->lastpos); + uint key_length=_mi_make_key(info,i,key,sort_param->record,info->lastpos); if (_mi_ck_delete(info,i,key,key_length)) { mi_check_print_error(param,"Can't delete key %d from record to be removed",i+1); @@ -2823,8 +3175,7 @@ static int sort_delete_record(MI_CHECK *param) } } if (param->calc_checksum) - param->glob_crc-=(*info->s->calc_checksum)(info, - sort_info->record); + param->glob_crc-=(*info->s->calc_checksum)(info, sort_param->record); } error=flush_io_cache(&info->rec_cache) || (*info->s->delete_record)(info); info->dfile=old_file; /* restore actual value */ @@ -2832,20 +3183,20 @@ static int sort_delete_record(MI_CHECK *param) DBUG_RETURN(error); } /* sort_delete_record */ - /* Fix all pending blocks and flush everything to disk */ -int flush_pending_blocks(MI_CHECK *param) +int flush_pending_blocks(MI_SORT_PARAM *sort_param) { uint nod_flag,length; my_off_t filepos,key_file_length; - MI_INFO *info; SORT_KEY_BLOCKS *key_block; - SORT_INFO *sort_info= ¶m->sort_info; + SORT_INFO *sort_info= sort_param->sort_info; + MI_CHECK *param=sort_info->param; + MI_INFO *info=sort_info->info; + MI_KEYDEF *keyinfo=sort_param->keyinfo; DBUG_ENTER("flush_pending_blocks"); filepos= HA_OFFSET_ERROR; /* if empty file */ - info=sort_info->info; nod_flag=0; for (key_block=sort_info->key_block ; key_block->inited ; key_block++) { @@ -2854,30 +3205,26 @@ int flush_pending_blocks(MI_CHECK *param) if (nod_flag) _mi_kpointer(info,key_block->end_pos,filepos); key_file_length=info->state->key_file_length; - bzero((byte*) key_block->buff+length, - sort_info->keyinfo->block_length-length); - if ((filepos=_mi_new(info,sort_info->keyinfo)) == HA_OFFSET_ERROR) + bzero((byte*) key_block->buff+length, keyinfo->block_length-length); + if ((filepos=_mi_new(info,keyinfo)) == HA_OFFSET_ERROR) DBUG_RETURN(1); /* If we read the page from the key cache, we have to write it back */ if (key_file_length == info->state->key_file_length) { - if (_mi_write_keypage(info, sort_info->keyinfo, filepos, - key_block->buff)) + if (_mi_write_keypage(info, keyinfo, filepos, key_block->buff)) DBUG_RETURN(1); } else if (my_pwrite(info->s->kfile,(byte*) key_block->buff, - (uint) sort_info->keyinfo->block_length,filepos, - param->myf_rw)) + (uint) keyinfo->block_length,filepos, param->myf_rw)) DBUG_RETURN(1); DBUG_DUMP("buff",(byte*) key_block->buff,length); nod_flag=1; } - info->s->state.key_root[sort_info->key]=filepos; /* Last is root for tree */ + info->s->state.key_root[sort_param->key]=filepos; /* Last is root for tree */ DBUG_RETURN(0); } /* flush_pending_blocks */ - /* alloc space and pointers for key_blocks */ static SORT_KEY_BLOCKS *alloc_key_blocks(MI_CHECK *param, uint blocks, @@ -3087,24 +3434,25 @@ end: /* write suffix to data file if neaded */ -int write_data_suffix(MI_CHECK *param, MI_INFO *info) +int write_data_suffix(SORT_INFO *sort_info, my_bool fix_datafile) { - if (info->s->options & HA_OPTION_COMPRESS_RECORD && - param->sort_info.fix_datafile) + MI_INFO *info=sort_info->info; + + if (info->s->options & HA_OPTION_COMPRESS_RECORD && fix_datafile) { char buff[MEMMAP_EXTRA_MARGIN]; bzero(buff,sizeof(buff)); if (my_b_write(&info->rec_cache,buff,sizeof(buff))) { - mi_check_print_error(param,"%d when writing to datafile",my_errno); + mi_check_print_error(sort_info->param, + "%d when writing to datafile",my_errno); return 1; } - param->read_cache.end_of_file+=sizeof(buff); + sort_info->param->read_cache.end_of_file+=sizeof(buff); } return 0; } - /* Update state and myisamchk_time of indexfile */ int update_state_info(MI_CHECK *param, MI_INFO *info,uint update) @@ -3210,10 +3558,8 @@ void update_auto_increment_key(MI_CHECK *param, MI_INFO *info, /* calculate unique keys for each part key */ -static void update_key_parts(MI_KEYDEF *keyinfo, - ulong *rec_per_key_part, - ulonglong *unique, - ulonglong records) +void update_key_parts(MI_KEYDEF *keyinfo, ulong *rec_per_key_part, + ulonglong *unique, ulonglong records) { ulonglong count=0,tmp; uint parts; @@ -3288,7 +3634,7 @@ void mi_disable_non_unique_index(MI_INFO *info, ha_rows rows) even if the temporary file would be quite big! */ -my_bool mi_test_if_sort_rep(MI_INFO *info, ha_rows rows, +my_bool mi_test_if_sort_rep(MI_INFO *info, ha_rows rows, ulonglong key_map, my_bool force) { @@ -3297,7 +3643,7 @@ my_bool mi_test_if_sort_rep(MI_INFO *info, ha_rows rows, uint i; /* - repair_by_sort only works if we have at least one key. If we don't + mi_repair_by_sort only works if we have at least one key. If we don't have any keys, we should use the normal repair. */ if (!key_map) @@ -3312,10 +3658,10 @@ my_bool mi_test_if_sort_rep(MI_INFO *info, ha_rows rows, static void -set_data_file_type(MI_CHECK *param, SORT_INFO *sort_info, MYISAM_SHARE *share) +set_data_file_type(SORT_INFO *sort_info, MYISAM_SHARE *share) { if ((sort_info->new_data_file_type=share->data_file_type) == - COMPRESSED_RECORD && param->testflag & T_UNPACK) + COMPRESSED_RECORD && sort_info->param->testflag & T_UNPACK) { MYISAM_SHARE tmp; diff --git a/myisam/mi_write.c b/myisam/mi_write.c index 7398ec07ae2..9998a25a96a 100644 --- a/myisam/mi_write.c +++ b/myisam/mi_write.c @@ -823,7 +823,8 @@ int _mi_init_bulk_insert(MI_INFO *info) } } - if (num_keys==0 || num_keys > myisam_bulk_insert_tree_size) + if (num_keys==0 || + num_keys * MI_MIN_SIZE_BULK_INSERT_TREE > myisam_bulk_insert_tree_size) DBUG_RETURN(0); info->bulk_insert=(TREE *) diff --git a/myisam/myisamchk.c b/myisam/myisamchk.c index c776c80e3ec..f04a30a1584 100644 --- a/myisam/myisamchk.c +++ b/myisam/myisamchk.c @@ -74,7 +74,8 @@ static int mi_sort_records(MI_CHECK *param, uint sort_key, my_bool write_info, my_bool update_index); -static int sort_record_index(MI_CHECK *param,MI_INFO *info,MI_KEYDEF *keyinfo, +static int sort_record_index(MI_SORT_PARAM *sort_param,MI_INFO *info, + MI_KEYDEF *keyinfo, my_off_t page,uchar *buff,uint sortkey, File new_file, my_bool update_index); @@ -1331,7 +1332,7 @@ static int mi_sort_records(MI_CHECK *param, register MI_INFO *info, my_string name, uint sort_key, my_bool write_info, - my_bool update_index) + my_bool update_index) { int got_error; uint key; @@ -1341,11 +1342,14 @@ static int mi_sort_records(MI_CHECK *param, ha_rows old_record_count; MYISAM_SHARE *share=info->s; char llbuff[22],llbuff2[22]; - SORT_INFO *sort_info= ¶m->sort_info; + SORT_INFO sort_info; + MI_SORT_PARAM sort_param; DBUG_ENTER("sort_records"); - bzero((char*) sort_info,sizeof(*sort_info)); - sort_info->param=param; + bzero((char*)&sort_info,sizeof(sort_info)); + bzero((char*)&sort_param,sizeof(sort_param)); + sort_param.sort_info=&sort_info; + sort_info.param=param; keyinfo= &share->keyinfo[sort_key]; got_error=1; temp_buff=0; @@ -1381,7 +1385,7 @@ static int mi_sort_records(MI_CHECK *param, mi_check_print_error(param,"Not enough memory for key block"); goto err; } - if (!(sort_info->record=(byte*) my_malloc((uint) share->base.pack_reclength, + if (!(sort_param.record=(byte*) my_malloc((uint) share->base.pack_reclength, MYF(0)))) { mi_check_print_error(param,"Not enough memory for record"); @@ -1423,18 +1427,18 @@ static int mi_sort_records(MI_CHECK *param, } /* Setup param for sort_write_record */ - sort_info->info=info; - sort_info->new_data_file_type=share->data_file_type; - sort_info->fix_datafile=1; - sort_info->filepos=share->pack.header_length; + sort_info.info=info; + sort_info.new_data_file_type=share->data_file_type; + sort_param.fix_datafile=1; + sort_param.filepos=share->pack.header_length; old_record_count=info->state->records; info->state->records=0; - if (sort_info->new_data_file_type != COMPRESSED_RECORD) + if (sort_info.new_data_file_type != COMPRESSED_RECORD) share->state.checksum=0; - if (sort_record_index(param, info,keyinfo,share->state.key_root[sort_key], + if (sort_record_index(&sort_param,info,keyinfo,share->state.key_root[sort_key], temp_buff, sort_key,new_file,update_index) || - write_data_suffix(param, info) || + write_data_suffix(&sort_info,1) || flush_io_cache(&info->rec_cache)) goto err; @@ -1452,7 +1456,7 @@ static int mi_sort_records(MI_CHECK *param, info->state->del=0; info->state->empty=0; share->state.dellink= HA_OFFSET_ERROR; - info->state->data_file_length=sort_info->filepos; + info->state->data_file_length=sort_param.filepos; share->state.split=info->state->records; /* Only hole records */ share->state.version=(ulong) time((time_t*) 0); @@ -1476,11 +1480,11 @@ err: { my_afree((gptr) temp_buff); } - my_free(sort_info->record,MYF(MY_ALLOW_ZERO_PTR)); + my_free(sort_param.record,MYF(MY_ALLOW_ZERO_PTR)); info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED); VOID(end_io_cache(&info->rec_cache)); - my_free(sort_info->buff,MYF(MY_ALLOW_ZERO_PTR)); - sort_info->buff=0; + my_free(sort_info.buff,MYF(MY_ALLOW_ZERO_PTR)); + sort_info.buff=0; share->state.sortkey=sort_key; DBUG_RETURN(flush_blocks(param, share->kfile) | got_error); } /* sort_records */ @@ -1488,7 +1492,8 @@ err: /* Sort records recursive using one index */ -static int sort_record_index(MI_CHECK *param,MI_INFO *info, MI_KEYDEF *keyinfo, +static int sort_record_index(MI_SORT_PARAM *sort_param,MI_INFO *info, + MI_KEYDEF *keyinfo, my_off_t page, uchar *buff, uint sort_key, File new_file,my_bool update_index) { @@ -1497,7 +1502,8 @@ static int sort_record_index(MI_CHECK *param,MI_INFO *info, MI_KEYDEF *keyinfo, my_off_t next_page,rec_pos; uchar lastkey[MI_MAX_KEY_BUFF]; char llbuff[22]; - SORT_INFO *sort_info= ¶m->sort_info; + SORT_INFO *sort_info= sort_param->sort_info; + MI_CHECK *param=sort_info->param; DBUG_ENTER("sort_record_index"); nod_flag=mi_test_if_nod(buff); @@ -1528,7 +1534,7 @@ static int sort_record_index(MI_CHECK *param,MI_INFO *info, MI_KEYDEF *keyinfo, llstr(next_page,llbuff)); goto err; } - if (sort_record_index(param, info,keyinfo,next_page,temp_buff,sort_key, + if (sort_record_index(sort_param, info,keyinfo,next_page,temp_buff,sort_key, new_file, update_index)) goto err; } @@ -1539,23 +1545,23 @@ static int sort_record_index(MI_CHECK *param,MI_INFO *info, MI_KEYDEF *keyinfo, break; rec_pos= _mi_dpos(info,0,lastkey+key_length); - if ((*info->s->read_rnd)(info,sort_info->record,rec_pos,0)) + if ((*info->s->read_rnd)(info,sort_param->record,rec_pos,0)) { mi_check_print_error(param,"%d when reading datafile",my_errno); goto err; } - if (rec_pos != sort_info->filepos && update_index) + if (rec_pos != sort_param->filepos && update_index) { _mi_dpointer(info,keypos-nod_flag-info->s->rec_reflength, - sort_info->filepos); - if (movepoint(info,sort_info->record,rec_pos,sort_info->filepos, + sort_param->filepos); + if (movepoint(info,sort_param->record,rec_pos,sort_param->filepos, sort_key)) { mi_check_print_error(param,"%d when updating key-pointers",my_errno); goto err; } } - if (sort_write_record(sort_info)) + if (sort_write_record(sort_param)) goto err; } /* Clear end of block to get better compression if the table is backuped */ diff --git a/myisam/myisamdef.h b/myisam/myisamdef.h index e9d3461fe9a..b5da3d4b4c3 100644 --- a/myisam/myisamdef.h +++ b/myisam/myisamdef.h @@ -370,8 +370,10 @@ struct st_myisam_info { #define MI_MAX_KEY_BLOCK_SIZE (MI_MAX_KEY_BLOCK_LENGTH/MI_MIN_KEY_BLOCK_LENGTH) #define MI_BLOCK_SIZE(key_length,data_pointer,key_pointer) ((((key_length+data_pointer+key_pointer)*4+key_pointer+2)/myisam_block_size+1)*myisam_block_size) -#define MI_MAX_KEYPTR_SIZE 5 /* For calculating block lengths */ -#define MI_MIN_KEYBLOCK_LENGTH 50 /* When to split delete blocks */ +#define MI_MAX_KEYPTR_SIZE 5 /* For calculating block lengths */ +#define MI_MIN_KEYBLOCK_LENGTH 50 /* When to split delete blocks */ + +#define MI_MIN_SIZE_BULK_INSERT_TREE 16384 /* this is per key */ /* The UNIQUE check is done with a hashed long key */ @@ -656,7 +658,7 @@ int _mi_init_bulk_insert(MI_INFO *info); void mi_check_print_error _VARARGS((MI_CHECK *param, const char *fmt,...)); void mi_check_print_warning _VARARGS((MI_CHECK *param, const char *fmt,...)); void mi_check_print_info _VARARGS((MI_CHECK *param, const char *fmt,...)); -int flush_pending_blocks(MI_CHECK *param); +int flush_pending_blocks(MI_SORT_PARAM *param); #ifdef __cplusplus } diff --git a/myisam/sort.c b/myisam/sort.c index bec77b231b8..75fb8bf7247 100644 --- a/myisam/sort.c +++ b/myisam/sort.c @@ -27,7 +27,7 @@ #endif #include <queues.h> - /* static variabels */ + /* static variables */ #undef MIN_SORT_MEMORY #undef MYF_RW #undef DISK_BUFFER_SIZE @@ -35,44 +35,43 @@ #define MERGEBUFF 15 #define MERGEBUFF2 31 #define MIN_SORT_MEMORY (4096-MALLOC_OVERHEAD) -#define MYF_RW MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL) +#define MYF_RW MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL) #define DISK_BUFFER_SIZE (IO_SIZE*16) typedef struct st_buffpek { - my_off_t file_pos; /* Where we are in the sort file */ - uchar *base,*key; /* Key pointers */ - ha_rows count; /* Number of rows in table */ - ulong mem_count; /* numbers of keys in memory */ - ulong max_keys; /* Max keys in buffert */ + my_off_t file_pos; /* Where we are in the sort file */ + uchar *base,*key; /* Key pointers */ + ha_rows count; /* Number of rows in table */ + ulong mem_count; /* numbers of keys in memory */ + ulong max_keys; /* Max keys in buffert */ } BUFFPEK; extern void print_error _VARARGS((const char *fmt,...)); - /* functions defined in this file */ + /* functions defined in this file */ static ha_rows NEAR_F find_all_keys(MI_SORT_PARAM *info,uint keys, - uchar **sort_keys, - DYNAMIC_ARRAY *buffpek,int *maxbuffer, - IO_CACHE *tempfile, - IO_CACHE *tempfile_for_exceptions); + uchar **sort_keys, + DYNAMIC_ARRAY *buffpek,int *maxbuffer, + IO_CACHE *tempfile, + IO_CACHE *tempfile_for_exceptions); static int NEAR_F write_keys(MI_SORT_PARAM *info,uchar * *sort_keys, - uint count, BUFFPEK *buffpek,IO_CACHE *tempfile); + uint count, BUFFPEK *buffpek,IO_CACHE *tempfile); static int NEAR_F write_key(MI_SORT_PARAM *info, uchar *key, IO_CACHE *tempfile); static int NEAR_F write_index(MI_SORT_PARAM *info,uchar * *sort_keys, - uint count); + uint count); static int NEAR_F merge_many_buff(MI_SORT_PARAM *info,uint keys, - uchar * *sort_keys, - BUFFPEK *buffpek,int *maxbuffer, - IO_CACHE *t_file); + uchar * *sort_keys, + BUFFPEK *buffpek,int *maxbuffer, + IO_CACHE *t_file); static uint NEAR_F read_to_buffer(IO_CACHE *fromfile,BUFFPEK *buffpek, - uint sort_length); + uint sort_length); static int NEAR_F merge_buffers(MI_SORT_PARAM *info,uint keys, - IO_CACHE *from_file, IO_CACHE *to_file, - uchar * *sort_keys, BUFFPEK *lastbuff, - BUFFPEK *Fb, BUFFPEK *Tb); + IO_CACHE *from_file, IO_CACHE *to_file, + uchar * *sort_keys, BUFFPEK *lastbuff, + BUFFPEK *Fb, BUFFPEK *Tb); static int NEAR_F merge_index(MI_SORT_PARAM *,uint,uchar **,BUFFPEK *, int, - IO_CACHE *); - + IO_CACHE *); /* Creates a index of sorted keys */ /* Returns 0 if everything went ok */ @@ -95,7 +94,7 @@ int _create_index_by_sort(MI_SORT_PARAM *info,my_bool no_messages, maxbuffer=1; memavl=max(sortbuff_size,MIN_SORT_MEMORY); - records= info->max_records; + records= info->sort_info->max_records; sort_length= info->key_length; LINT_INIT(keys); @@ -174,13 +173,13 @@ int _create_index_by_sort(MI_SORT_PARAM *info,my_bool no_messages, goto err; /* purecov: inspected */ } - if (flush_pending_blocks(info->sort_info->param)) + if (flush_pending_blocks(info)) goto err; if (my_b_inited(&tempfile_for_exceptions)) { MI_INFO *index=info->sort_info->info; - uint keyno=info->sort_info->key; + uint keyno=info->key; uint key_length, ref_length=index->s->rec_reflength; if (flush_io_cache(&tempfile_for_exceptions) || @@ -224,9 +223,9 @@ static ha_rows NEAR_F find_all_keys(MI_SORT_PARAM *info, uint keys, idx=error=0; sort_keys[0]=(uchar*) (sort_keys+keys); - while(!(error=(*info->key_read)(info->sort_info,sort_keys[idx]))) + while(!(error=(*info->key_read)(info,sort_keys[idx]))) { - if (info->sort_info->real_key_length > info->key_length) + if (info->real_key_length > info->key_length) { if (write_key(info,sort_keys[idx],tempfile_for_exceptions)) DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */ @@ -259,21 +258,262 @@ static ha_rows NEAR_F find_all_keys(MI_SORT_PARAM *info, uint keys, DBUG_RETURN((*maxbuffer)*(keys-1)+idx); } /* find_all_keys */ - /* Write all keys in memory to file for later merge */ + /* Search after all keys and place them in a temp. file */ + +void *_thr_find_all_keys(MI_SORT_PARAM *info) +{ + int error,skr; + uint memavl,old_memavl,keys,sort_length; + uint idx, maxbuffer; + uchar **sort_keys; + MI_KEYSEG *keyseg; + + my_b_clear(&info->tempfile); + my_b_clear(&info->tempfile_for_exceptions); + bzero((char*) &info->buffpek,sizeof(info->buffpek)); + bzero((char*) &info->unique, sizeof(info->unique)); + sort_keys= (uchar **) NULL; error= 1; + if (info->sort_info->got_error) + goto err; + + memavl=max(info->sortbuff_size, MIN_SORT_MEMORY); + idx= info->sort_info->max_records; + sort_length= info->key_length; + + while (memavl >= MIN_SORT_MEMORY) + { + if ((my_off_t) (idx+1)*(sort_length+sizeof(char*)) <= + (my_off_t) memavl) + keys= idx+1; + else + do + { + skr=maxbuffer; + if (memavl < sizeof(BUFFPEK)*maxbuffer || + (keys=(memavl-sizeof(BUFFPEK)*maxbuffer)/ + (sort_length+sizeof(char*))) <= 1) + { + mi_check_print_error(info->sort_info->param, + "sort_buffer_size is to small"); + goto err; + } + } + while ((maxbuffer= (int) (idx/(keys-1)+1)) != skr); + + if ((sort_keys=(uchar **)my_malloc(keys*(sort_length+sizeof(char*))+ + ((info->keyinfo->flag & HA_FULLTEXT) ? HA_FT_MAXLEN : 0), MYF(0)))) + { + if (my_init_dynamic_array(&info->buffpek, sizeof(BUFFPEK), + maxbuffer, maxbuffer/2)) + my_free((gptr) sort_keys,MYF(0)); + else + break; + } + old_memavl=memavl; + if ((memavl=memavl/4*3) < MIN_SORT_MEMORY && old_memavl > MIN_SORT_MEMORY) + memavl=MIN_SORT_MEMORY; + } + if (memavl < MIN_SORT_MEMORY) + { + mi_check_print_error(info->sort_info->param,"Sort buffer to small"); /* purecov: tested */ + goto err; /* purecov: tested */ + } +// (*info->lock_in_memory)(info->sort_info->param);/* Everything is allocated */ + + if (info->sort_info->param->testflag & T_VERBOSE) + printf("Key %d - Allocating buffer for %d keys\n",info->key+1,keys); + info->sort_keys=sort_keys; + + idx=error=0; + sort_keys[0]=(uchar*) (sort_keys+keys); + + while(!(error=info->sort_info->got_error) || + !(error=(*info->key_read)(info,sort_keys[idx]))) + { + if (info->real_key_length > info->key_length) + { + if (write_key(info,sort_keys[idx],& info->tempfile_for_exceptions)) + goto err; + continue; + } + + if (++idx == keys) + { + if (write_keys(info,sort_keys,idx-1, + (BUFFPEK *)alloc_dynamic(&info->buffpek), &info->tempfile)) + goto err; + + sort_keys[0]=(uchar*) (sort_keys+keys); + memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length); + idx=1; + } + sort_keys[idx]=sort_keys[idx-1]+info->key_length; + } + if (error > 0) + goto err; + if (info->buffpek.elements) + { + if (write_keys(info,sort_keys,idx,(BUFFPEK *) + alloc_dynamic(&info->buffpek),&info->tempfile)) + goto err; + info->keys=(info->buffpek.elements-1)*(keys-1)+idx; + } + else + info->keys=idx; + + info->sort_keys_length=keys; + goto ok; + +err: + info->sort_info->got_error=1; /* no need to protect this with a mutex */ + if (sort_keys) + my_free((gptr) sort_keys,MYF(0)); + info->sort_keys=0; + delete_dynamic(& info->buffpek); + close_cached_file(& info->tempfile); + close_cached_file(& info->tempfile_for_exceptions); +ok: + remove_io_thread(& info->read_cache); + pthread_mutex_lock(& info->sort_info->mutex); + info->sort_info->threads_running--; + pthread_cond_signal(& info->sort_info->cond); + pthread_mutex_unlock(& info->sort_info->mutex); + return NULL; +} /* _thr_find_all_keys */ + +int _thr_write_keys(MI_SORT_PARAM *sort_param) +{ + SORT_INFO *sort_info=sort_param->sort_info; + MI_CHECK *param=sort_info->param; + ulong length, keys; + ulong *rec_per_key_part=param->rec_per_key_part; + int i, got_error=sort_info->got_error; + MI_INFO *info=sort_info->info; + MYISAM_SHARE *share=info->s; + MI_SORT_PARAM *sinfo; + byte *mergebuf=0; + + for (i=0, sinfo=sort_param ; i<sort_info->total_keys ; i++, sinfo++, + rec_per_key_part+=sinfo->keyinfo->keysegs) + { + if (!sinfo->sort_keys) + { + got_error=1; + continue; + } + share->state.key_map|=(ulonglong) 1 << sinfo->key; + if (param->testflag & T_STATISTICS) + update_key_parts(sinfo->keyinfo, rec_per_key_part, + sinfo->unique, (ulonglong) info->state->records); + if (!sinfo->buffpek.elements) + { + if (param->testflag & T_VERBOSE) + printf("Key %d - Dumping %lu keys\n",sinfo->key+1, sinfo->keys); + if (write_index(sinfo,sinfo->sort_keys,(uint) sinfo->keys) || + flush_pending_blocks(sinfo)) + got_error=1; + } + my_free((gptr) sinfo->sort_keys,MYF(0)); + sinfo->sort_keys=0; + } + + for (i=0, sinfo=sort_param ; i<sort_info->total_keys ; i++, sinfo++, + delete_dynamic(& sinfo->buffpek), + close_cached_file(& sinfo->tempfile), + close_cached_file(& sinfo->tempfile_for_exceptions)) + { + if (got_error) continue; + if (sinfo->buffpek.elements) + { + uint maxbuffer=sinfo->buffpek.elements-1; + if (!mergebuf) + { + length=param->sort_buffer_length; + while (length >= MIN_SORT_MEMORY && !mergebuf) + { + mergebuf=my_malloc(length, MYF(0)); + length=length*3/4; + } + if (!mergebuf) + { + got_error=1; + continue; + } + } + keys=length/sinfo->key_length; + if (maxbuffer >= MERGEBUFF2) + { + if (param->testflag & T_VERBOSE) + printf("Key %d - Merging %lu keys\n",sinfo->key+1, sinfo->keys); + if (merge_many_buff(sinfo, keys, (uchar **)mergebuf, + dynamic_element(&sinfo->buffpek, 0, BUFFPEK *), + &maxbuffer, &sinfo->tempfile)) + { + got_error=1; + continue; + } + } + if (flush_io_cache(&sinfo->tempfile) || + reinit_io_cache(&sinfo->tempfile,READ_CACHE,0L,0,0)) + { + got_error=1; + continue; + } + if (param->testflag & T_VERBOSE) + printf("Key %d - Last merge and dumping keys", sinfo->key+1); + if (merge_index(sinfo, keys, (uchar **)mergebuf, + dynamic_element(&sinfo->buffpek,0,BUFFPEK *), + maxbuffer,&sinfo->tempfile) + || flush_pending_blocks(sinfo)) + { + got_error=1; + continue; + } + } + if (my_b_inited(&sinfo->tempfile_for_exceptions)) + { + uint key_length; + + if (param->testflag & T_VERBOSE) + printf("Key %d - Dumping 'long' keys", sinfo->key+1); + + if (flush_io_cache(&sinfo->tempfile_for_exceptions) || + reinit_io_cache(&sinfo->tempfile_for_exceptions,READ_CACHE,0L,0,0)) + { + got_error=1; + continue; + } + + while (!got_error + && !my_b_read(&sinfo->tempfile_for_exceptions,(byte*)&key_length, + sizeof(key_length)) + && !my_b_read(&sinfo->tempfile_for_exceptions,(byte*)mergebuf, + (uint) key_length)) + { + if (_mi_ck_write(info,sinfo->key,(uchar*) mergebuf, + key_length - info->s->rec_reflength)) + got_error=1; + } + } + } + my_free((gptr) mergebuf,MYF(MY_ALLOW_ZERO_PTR)); + return got_error; +} + + /* Write all keys in memory to file for later merge */ static int NEAR_F write_keys(MI_SORT_PARAM *info, register uchar **sort_keys, - uint count, BUFFPEK *buffpek, - IO_CACHE *tempfile) + uint count, BUFFPEK *buffpek, IO_CACHE *tempfile) { uchar **end; uint sort_length=info->key_length; DBUG_ENTER("write_keys"); qsort2((byte*) sort_keys,count,sizeof(byte*),(qsort2_cmp) info->key_cmp, - info->sort_info); + info); if (!my_b_inited(tempfile) && open_cached_file(tempfile, info->tmpdir, "ST", DISK_BUFFER_SIZE, - info->myf_rw)) + info->sort_info->param->myf_rw)) DBUG_RETURN(1); /* purecov: inspected */ buffpek->file_pos=my_b_tell(tempfile); @@ -288,12 +528,12 @@ static int NEAR_F write_keys(MI_SORT_PARAM *info, register uchar **sort_keys, static int NEAR_F write_key(MI_SORT_PARAM *info, uchar *key, IO_CACHE *tempfile) { - uint key_length=info->sort_info->real_key_length; + uint key_length=info->real_key_length; DBUG_ENTER("write_key"); if (!my_b_inited(tempfile) && open_cached_file(tempfile, info->tmpdir, "ST", DISK_BUFFER_SIZE, - info->myf_rw)) + info->sort_info->param->myf_rw)) DBUG_RETURN(1); if (my_b_write(tempfile,(byte*)&key_length,sizeof(key_length)) || @@ -302,27 +542,27 @@ static int NEAR_F write_key(MI_SORT_PARAM *info, uchar *key, IO_CACHE *tempfile) DBUG_RETURN(0); } /* write_key */ - /* Write index */ + /* Write index */ static int NEAR_F write_index(MI_SORT_PARAM *info, register uchar **sort_keys, - register uint count) + register uint count) { DBUG_ENTER("write_index"); qsort2((gptr) sort_keys,(size_t) count,sizeof(byte*), - (qsort2_cmp) info->key_cmp,info->sort_info); + (qsort2_cmp) info->key_cmp,info); while (count--) - if ((*info->key_write)(info->sort_info,*sort_keys++)) + if ((*info->key_write)(info,*sort_keys++)) DBUG_RETURN(-1); /* purecov: inspected */ DBUG_RETURN(0); } /* write_index */ - /* Merge buffers to make < MERGEBUFF2 buffers */ + /* Merge buffers to make < MERGEBUFF2 buffers */ static int NEAR_F merge_many_buff(MI_SORT_PARAM *info, uint keys, - uchar **sort_keys, BUFFPEK *buffpek, - int *maxbuffer, IO_CACHE *t_file) + uchar **sort_keys, BUFFPEK *buffpek, + int *maxbuffer, IO_CACHE *t_file) { register int i; IO_CACHE t_file2, *from_file, *to_file, *temp; @@ -330,11 +570,11 @@ static int NEAR_F merge_many_buff(MI_SORT_PARAM *info, uint keys, DBUG_ENTER("merge_many_buff"); if (*maxbuffer < MERGEBUFF2) - DBUG_RETURN(0); /* purecov: inspected */ + DBUG_RETURN(0); /* purecov: inspected */ if (flush_io_cache(t_file) || open_cached_file(&t_file2,info->tmpdir,"ST",DISK_BUFFER_SIZE, - info->myf_rw)) - DBUG_RETURN(1); /* purecov: inspected */ + info->sort_info->param->myf_rw)) + DBUG_RETURN(1); /* purecov: inspected */ from_file= t_file ; to_file= &t_file2; while (*maxbuffer >= MERGEBUFF2) @@ -345,30 +585,30 @@ static int NEAR_F merge_many_buff(MI_SORT_PARAM *info, uint keys, for (i=0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF) { if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++, - buffpek+i,buffpek+i+MERGEBUFF-1)) - break; /* purecov: inspected */ + buffpek+i,buffpek+i+MERGEBUFF-1)) + break; /* purecov: inspected */ } if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++, - buffpek+i,buffpek+ *maxbuffer)) + buffpek+i,buffpek+ *maxbuffer)) break; /* purecov: inspected */ if (flush_io_cache(to_file)) - break; /* purecov: inspected */ + break; /* purecov: inspected */ temp=from_file; from_file=to_file; to_file=temp; *maxbuffer= (int) (lastbuff-buffpek)-1; } - close_cached_file(to_file); /* This holds old result */ + close_cached_file(to_file); /* This holds old result */ if (to_file == t_file) - *t_file=t_file2; /* Copy result file */ + *t_file=t_file2; /* Copy result file */ - DBUG_RETURN(*maxbuffer >= MERGEBUFF2); /* Return 1 if interrupted */ + DBUG_RETURN(*maxbuffer >= MERGEBUFF2); /* Return 1 if interrupted */ } /* merge_many_buff */ - /* Read data to buffer */ - /* This returns (uint) -1 if something goes wrong */ + /* Read data to buffer */ + /* This returns (uint) -1 if something goes wrong */ static uint NEAR_F read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek, - uint sort_length) + uint sort_length) { register uint count; uint length; @@ -376,24 +616,24 @@ static uint NEAR_F read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek, if ((count=(uint) min((ha_rows) buffpek->max_keys,buffpek->count))) { if (my_pread(fromfile->file,(byte*) buffpek->base, - (length= sort_length*count),buffpek->file_pos,MYF_RW)) - return((uint) -1); /* purecov: inspected */ + (length= sort_length*count),buffpek->file_pos,MYF_RW)) + return((uint) -1); /* purecov: inspected */ buffpek->key=buffpek->base; - buffpek->file_pos+= length; /* New filepos */ - buffpek->count-= count; + buffpek->file_pos+= length; /* New filepos */ + buffpek->count-= count; buffpek->mem_count= count; } return (count*sort_length); } /* read_to_buffer */ - /* Merge buffers to one buffer */ - /* If to_file == 0 then use info->key_write */ + /* Merge buffers to one buffer */ + /* If to_file == 0 then use info->key_write */ static int NEAR_F merge_buffers(MI_SORT_PARAM *info, uint keys, IO_CACHE *from_file, - IO_CACHE *to_file, uchar **sort_keys, BUFFPEK *lastbuff, - BUFFPEK *Fb, BUFFPEK *Tb) + IO_CACHE *to_file, uchar **sort_keys, BUFFPEK *lastbuff, + BUFFPEK *Fb, BUFFPEK *Tb) { int error; uint sort_length,maxcount; @@ -413,8 +653,8 @@ merge_buffers(MI_SORT_PARAM *info, uint keys, IO_CACHE *from_file, sort_length=info->key_length; if (init_queue(&queue,(uint) (Tb-Fb)+1,offsetof(BUFFPEK,key),0, - (int (*)(void*, byte *,byte*)) info->key_cmp, - (void*) info->sort_info)) + (int (*)(void*, byte *,byte*)) info->key_cmp, + (void*) info->sort_info)) DBUG_RETURN(1); /* purecov: inspected */ for (buffpek= Fb ; buffpek <= Tb ; buffpek++) @@ -423,7 +663,7 @@ merge_buffers(MI_SORT_PARAM *info, uint keys, IO_CACHE *from_file, buffpek->base= strpos; buffpek->max_keys=maxcount; strpos+= (uint) (error=(int) read_to_buffer(from_file,buffpek, - sort_length)); + sort_length)); if (error == -1) goto err; /* purecov: inspected */ queue_insert(&queue,(char*) buffpek); @@ -436,52 +676,52 @@ merge_buffers(MI_SORT_PARAM *info, uint keys, IO_CACHE *from_file, buffpek=(BUFFPEK*) queue_top(&queue); if (to_file) { - if (my_b_write(to_file,(byte*) buffpek->key,(uint) sort_length)) - { - error=1; goto err; /* purecov: inspected */ - } + if (my_b_write(to_file,(byte*) buffpek->key,(uint) sort_length)) + { + error=1; goto err; /* purecov: inspected */ + } } else { - if ((*info->key_write)(info->sort_info,(void*) buffpek->key)) - { - error=1; goto err; /* purecov: inspected */ - } + if ((*info->key_write)(info,(void*) buffpek->key)) + { + error=1; goto err; /* purecov: inspected */ + } } buffpek->key+=sort_length; if (! --buffpek->mem_count) { - if (!(error=(int) read_to_buffer(from_file,buffpek,sort_length))) - { - uchar *base=buffpek->base; - uint max_keys=buffpek->max_keys; - - VOID(queue_remove(&queue,0)); - - /* Put room used by buffer to use in other buffer */ - for (refpek= (BUFFPEK**) &queue_top(&queue); - refpek <= (BUFFPEK**) &queue_end(&queue); - refpek++) - { - buffpek= *refpek; - if (buffpek->base+buffpek->max_keys*sort_length == base) - { - buffpek->max_keys+=max_keys; - break; - } - else if (base+max_keys*sort_length == buffpek->base) - { - buffpek->base=base; - buffpek->max_keys+=max_keys; - break; - } - } - break; /* One buffer have been removed */ - } + if (!(error=(int) read_to_buffer(from_file,buffpek,sort_length))) + { + uchar *base=buffpek->base; + uint max_keys=buffpek->max_keys; + + VOID(queue_remove(&queue,0)); + + /* Put room used by buffer to use in other buffer */ + for (refpek= (BUFFPEK**) &queue_top(&queue); + refpek <= (BUFFPEK**) &queue_end(&queue); + refpek++) + { + buffpek= *refpek; + if (buffpek->base+buffpek->max_keys*sort_length == base) + { + buffpek->max_keys+=max_keys; + break; + } + else if (base+max_keys*sort_length == buffpek->base) + { + buffpek->base=base; + buffpek->max_keys+=max_keys; + break; + } + } + break; /* One buffer have been removed */ + } } else if (error == -1) - goto err; /* purecov: inspected */ - queue_replaced(&queue); /* Top element has been replaced */ + goto err; /* purecov: inspected */ + queue_replaced(&queue); /* Top element has been replaced */ } } buffpek=(BUFFPEK*) queue_top(&queue); @@ -492,9 +732,9 @@ merge_buffers(MI_SORT_PARAM *info, uint keys, IO_CACHE *from_file, if (to_file) { if (my_b_write(to_file,(byte*) buffpek->key, - (sort_length*buffpek->mem_count))) + (sort_length*buffpek->mem_count))) { - error=1; goto err; /* purecov: inspected */ + error=1; goto err; /* purecov: inspected */ } } else @@ -502,18 +742,18 @@ merge_buffers(MI_SORT_PARAM *info, uint keys, IO_CACHE *from_file, register uchar *end; strpos= buffpek->key; for (end=strpos+buffpek->mem_count*sort_length; - strpos != end ; - strpos+=sort_length) + strpos != end ; + strpos+=sort_length) { - if ((*info->key_write)(info->sort_info,(void*) strpos)) - { - error=1; goto err; /* purecov: inspected */ - } + if ((*info->key_write)(info,(void*) strpos)) + { + error=1; goto err; /* purecov: inspected */ + } } } } while ((error=(int) read_to_buffer(from_file,buffpek,sort_length)) != -1 && - error != 0); + error != 0); lastbuff->count=count; if (to_file) @@ -524,15 +764,15 @@ err: } /* merge_buffers */ - /* Do a merge to output-file (save only positions) */ + /* Do a merge to output-file (save only positions) */ static int NEAR_F merge_index(MI_SORT_PARAM *info, uint keys, uchar **sort_keys, - BUFFPEK *buffpek, int maxbuffer, IO_CACHE *tempfile) + BUFFPEK *buffpek, int maxbuffer, IO_CACHE *tempfile) { DBUG_ENTER("merge_index"); if (merge_buffers(info,keys,tempfile,(IO_CACHE*) 0,sort_keys,buffpek,buffpek, - buffpek+maxbuffer)) + buffpek+maxbuffer)) DBUG_RETURN(1); /* purecov: inspected */ DBUG_RETURN(0); } /* merge_index */ diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c index 54dfaf6d993..1a761fa5ed8 100644 --- a/mysys/mf_iocache.c +++ b/mysys/mf_iocache.c @@ -86,7 +86,11 @@ init_functions(IO_CACHE* info, enum cache_type type) info->write_function = 0; /* Force a core if used */ break; default: - info->read_function = _my_b_read; + info->read_function = +#ifdef THREAD + info->share ? _my_b_read_r : +#endif + _my_b_read; info->write_function = _my_b_write; } @@ -126,6 +130,9 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, info->alloced_buffer = 0; info->buffer=0; info->seek_not_done= test(file >= 0); +#ifdef THREAD + info->share=0; +#endif if (!cachesize) if (! (cachesize= my_default_record_cache_size)) @@ -213,7 +220,6 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, DBUG_RETURN(0); } /* init_io_cache */ - /* Wait until current request is ready */ #ifdef HAVE_AIOWAIT @@ -418,6 +424,90 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count) DBUG_RETURN(0); } +#ifdef THREAD +int init_io_cache_share(IO_CACHE *info, IO_CACHE_SHARE *s, uint num_threads) +{ + DBUG_ASSERT(info->type == READ_CACHE); + pthread_mutex_init(& s->mutex, MY_MUTEX_INIT_FAST); + pthread_cond_init (& s->cond, 0); + s->count=num_threads; + s->active=0; /* to catch errors */ + info->share=s; + info->read_function=_my_b_read_r; +} + +int remove_io_thread(IO_CACHE *info) +{ + if (errno=pthread_mutex_lock(& info->share->mutex)) + return -1; + if (! info->share->count--) + pthread_cond_signal(& info->share->cond); + pthread_mutex_unlock(& info->share->mutex); + return 0; +} + +int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count) +{ + my_off_t pos_in_file; + int length,diff_length,read_len; + DBUG_ENTER("_my_b_read_r"); + + if ((read_len=(uint) (info->read_end-info->read_pos))) + { + DBUG_ASSERT(Count >= read_len); /* User is not using my_b_read() */ + memcpy(Buffer,info->read_pos, (size_t) (read_len)); + Buffer+=read_len; + Count-=read_len; + } + +#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1)) +#define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1)) + + while (Count) { + int cnt, len; + + pos_in_file= info->pos_in_file + (uint)(info->read_end - info->buffer); + diff_length= pos_in_file & (IO_SIZE-1); + length=IO_ROUND_UP(Count+diff_length)-diff_length; + length=(length <= info->read_length) ? + length + IO_ROUND_DN(info->read_length - length) : + length - IO_ROUND_UP(length - info->read_length) ; + if (lock_io_cache(info)) + { + info->share->active=info; + if (info->seek_not_done) /* File touched, do seek */ + VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0))); + len=my_read(info->file,info->buffer, length, info->myflags); + info->read_end=info->buffer + (len == -1 ? 0 : len); + info->error=(len == length ? 0 : len); + info->pos_in_file=pos_in_file; + unlock_io_cache(info); + } + else + { + info->error= info->share->active->error; + info->read_end= info->share->active->read_end; + info->pos_in_file= info->share->active->pos_in_file; + len= (info->error == -1 ? -1 : info->read_end-info->buffer); + } + info->read_pos=info->buffer; + info->seek_not_done=0; + if (info->error) + { + info->error=read_len; + DBUG_RETURN(1); + } + cnt=(len > Count) ? Count : len; + memcpy(Buffer,info->read_pos, (size_t)cnt); + Count -=cnt; + Buffer+=cnt; + read_len+=cnt; + info->read_pos+=cnt; + } + DBUG_RETURN(0); +} +#endif + /* Do sequential read from the SEQ_READ_APPEND cache we do this in three stages: @@ -453,7 +543,7 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) */ VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0))); info->seek_not_done=0; - + diff_length=(uint) (pos_in_file & (IO_SIZE-1)); /* now the second stage begins - read from file descriptor */ @@ -509,7 +599,7 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) memcpy(Buffer,info->buffer,(size_t) length); Count -= length; Buffer += length; - + /* added the line below to make DBUG_ASSERT(pos_in_file==info->end_of_file) pass. @@ -543,7 +633,7 @@ read_append_buffer: /* TODO: figure out if the assert below is needed or correct. */ - DBUG_ASSERT(pos_in_file == info->end_of_file); + DBUG_ASSERT(pos_in_file == info->end_of_file); copy_len=min(Count, len_in_buff); memcpy(Buffer, info->append_read_pos, copy_len); info->append_read_pos += copy_len; @@ -909,7 +999,7 @@ int _flush_io_cache(IO_CACHE *info, int need_append_buffer_lock) if (!(append_cache = (info->type == SEQ_READ_APPEND))) need_append_buffer_lock=0; - + if (info->type == WRITE_CACHE || append_cache) { if (info->file == -1) @@ -918,7 +1008,7 @@ int _flush_io_cache(IO_CACHE *info, int need_append_buffer_lock) DBUG_RETURN((info->error= -1)); } LOCK_APPEND_BUFFER; - + if ((length=(uint) (info->write_pos - info->write_buffer))) { pos_in_file=info->pos_in_file; @@ -955,7 +1045,7 @@ int _flush_io_cache(IO_CACHE *info, int need_append_buffer_lock) info->end_of_file+=(info->write_pos-info->append_read_pos); DBUG_ASSERT(info->end_of_file == my_tell(info->file,MYF(0))); } - + info->append_read_pos=info->write_pos=info->write_buffer; UNLOCK_APPEND_BUFFER; DBUG_RETURN(info->error); @@ -979,6 +1069,18 @@ int end_io_cache(IO_CACHE *info) IO_CACHE_CALLBACK pre_close; DBUG_ENTER("end_io_cache"); +#ifdef THREAD + /* simple protection against multi-close: destroying share first */ + if (info->share) + if (pthread_cond_destroy (& info->share->cond) | + pthread_mutex_destroy(& info->share->mutex)) + { + DBUG_RETURN(1); + } + else + info->share=0; +#endif + if ((pre_close=info->pre_close)) (*pre_close)(info); if (info->alloced_buffer) |