diff options
Diffstat (limited to 'myisam/mi_check.c')
-rw-r--r-- | myisam/mi_check.c | 306 |
1 files changed, 257 insertions, 49 deletions
diff --git a/myisam/mi_check.c b/myisam/mi_check.c index 5a81d221d2a..06fdd3bff4c 100644 --- a/myisam/mi_check.c +++ b/myisam/mi_check.c @@ -16,6 +16,31 @@ /* Describe, check and repair of MyISAM tables */ +/* + About checksum calculation. + + There are two types of checksums. Table checksum and row checksum. + + Row checksum is an additional byte at the end of dynamic length + records. It must be calculated if the table is configured for them. + Otherwise they must not be used. The variable + MYISAM_SHARE::calc_checksum determines if row checksums are used. + MI_INFO::checksum is used as temporary storage during row handling. + For parallel repair we must assure that only one thread can use this + variable. There is no problem on the write side as this is done by one + thread only. But when checking a record after read this could go + wrong. But since all threads read through a common read buffer, it is + sufficient if only one thread checks it. + + Table checksum is an eight byte value in the header of the index file. + It can be calculated even if row checksums are not used. The variable + MI_CHECK::glob_crc is calculated over all records. + MI_SORT_PARAM::calc_checksum determines if this should be done. This + variable is not part of MI_CHECK because it must be set per thread for + parallel repair. The global glob_crc must be changed by one thread + only. And it is sufficient to calculate the checksum once only. +*/ + #include "ftdefs.h" #include <m_ctype.h> #include <stdarg.h> @@ -41,8 +66,7 @@ static int chk_index(MI_CHECK *param, MI_INFO *info,MI_KEYDEF *keyinfo, ha_checksum *key_checksum, uint level); static uint isam_key_length(MI_INFO *info,MI_KEYDEF *keyinfo); static ha_checksum calc_checksum(ha_rows count); -static int writekeys(MI_CHECK *param, MI_INFO *info,byte *buff, - my_off_t filepos); +static int writekeys(MI_SORT_PARAM *sort_param); static int sort_one_index(MI_CHECK *param, MI_INFO *info,MI_KEYDEF *keyinfo, my_off_t pagepos, File new_file); static int sort_key_read(MI_SORT_PARAM *sort_param,void *key); @@ -1102,7 +1126,8 @@ int chk_data_link(MI_CHECK *param, MI_INFO *info,int extend) goto err; start_recpos=pos; splits++; - VOID(_mi_pack_get_block_info(info,&block_info, -1, start_recpos)); + VOID(_mi_pack_get_block_info(info, &info->bit_buff, &block_info, + &info->rec_buff, -1, start_recpos)); pos=block_info.filepos+block_info.rec_len; if (block_info.rec_len < (uint) info->s->min_pack_length || block_info.rec_len > (uint) info->s->max_pack_length) @@ -1116,7 +1141,8 @@ int chk_data_link(MI_CHECK *param, MI_INFO *info,int extend) if (_mi_read_cache(¶m->read_cache,(byte*) info->rec_buff, block_info.filepos, block_info.rec_len, READING_NEXT)) goto err; - if (_mi_pack_rec_unpack(info,record,info->rec_buff,block_info.rec_len)) + if (_mi_pack_rec_unpack(info, &info->bit_buff, record, + info->rec_buff, block_info.rec_len)) { mi_check_print_error(param,"Found wrong record at %s", llstr(start_recpos,llbuff)); @@ -1400,7 +1426,7 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info, info->state->empty=0; param->glob_crc=0; if (param->testflag & T_CALC_CHECKSUM) - param->calc_checksum=1; + sort_param.calc_checksum= 1; info->update= (short) (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED); @@ -1429,7 +1455,7 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info, /* Re-create all keys, which are set in key_map. */ while (!(error=sort_get_next_record(&sort_param))) { - if (writekeys(param,info,(byte*)sort_param.record,sort_param.filepos)) + if (writekeys(&sort_param)) { if (my_errno != HA_ERR_FOUND_DUPP_KEY) goto err; @@ -1574,11 +1600,13 @@ err: /* Uppate keyfile when doing repair */ -static int writekeys(MI_CHECK *param, register MI_INFO *info, byte *buff, - my_off_t filepos) +static int writekeys(MI_SORT_PARAM *sort_param) { register uint i; - uchar *key; + uchar *key; + MI_INFO *info= sort_param->sort_info->info; + byte *buff= sort_param->record; + my_off_t filepos= sort_param->filepos; DBUG_ENTER("writekeys"); key=info->lastkey+info->s->base.max_key_length; @@ -1632,8 +1660,8 @@ static int writekeys(MI_CHECK *param, register MI_INFO *info, byte *buff, } } /* Remove checksum that was added to glob_crc in sort_get_next_record */ - if (param->calc_checksum) - param->glob_crc-= info->checksum; + if (sort_param->calc_checksum) + sort_param->sort_info->param->glob_crc-= info->checksum; DBUG_PRINT("error",("errno: %d",my_errno)); DBUG_RETURN(-1); } /* writekeys */ @@ -2139,7 +2167,7 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info, del=info->state->del; param->glob_crc=0; if (param->testflag & T_CALC_CHECKSUM) - param->calc_checksum=1; + sort_param.calc_checksum= 1; rec_per_key_part= param->rec_per_key_part; for (sort_param.key=0 ; sort_param.key < share->base.keys ; @@ -2201,7 +2229,8 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info, param->retry_repair=1; goto err; } - param->calc_checksum=0; /* No need to calc glob_crc */ + /* No need to calculate checksum again. */ + sort_param.calc_checksum= 0; /* Set for next loop */ sort_info.max_records= (ha_rows) info->state->records; @@ -2364,6 +2393,28 @@ err: Each key is handled by a separate thread. TODO: make a number of threads a parameter + In parallel repair we use one thread per index. There are two modes: + + Quick + + Only the indexes are rebuilt. All threads share a read buffer. + Every thread that needs fresh data in the buffer enters the shared + cache lock. The last thread joining the lock reads the buffer from + the data file and wakes all other threads. + + Non-quick + + The data file is rebuilt and all indexes are rebuilt to point to + the new record positions. One thread is the master thread. It + reads from the old data file and writes to the new data file. It + also creates one of the indexes. The other threads read from a + buffer which is filled by the master. If they need fresh data, + they enter the shared cache lock. If the masters write buffer is + full, it flushes it to the new data file and enters the shared + cache lock too. When all threads joined in the lock, the master + copies its write buffer to the read buffer for the other threads + and wakes them. + RESULT 0 ok <>0 Error @@ -2386,6 +2437,7 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info, ulong *rec_per_key_part; HA_KEYSEG *keyseg; char llbuff[22]; + IO_CACHE new_data_cache; /* For non-quick repair. */ IO_CACHE_SHARE io_share; SORT_INFO sort_info; ulonglong key_map=share->state.key_map; @@ -2407,19 +2459,55 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info, if (info->s->options & (HA_OPTION_CHECKSUM | HA_OPTION_COMPRESS_RECORD)) param->testflag|=T_CALC_CHECKSUM; + /* + Quick repair (not touching data file, rebuilding indexes): + { + Read cache is (MI_CHECK *param)->read_cache using info->dfile. + } + + Non-quick repair (rebuilding data file and indexes): + { + Master thread: + + Read cache is (MI_CHECK *param)->read_cache using info->dfile. + Write cache is (MI_INFO *info)->rec_cache using new_file. + + Slave threads: + + Read cache is new_data_cache synced to master rec_cache. + + The final assignment of the filedescriptor for rec_cache is done + after the cache creation. + + Don't check file size on new_data_cache, as the resulting file size + is not known yet. + + As rec_cache and new_data_cache are synced, write_buffer_length is + used for the read cache 'new_data_cache'. Both start at the same + position 'new_header_length'. + } + */ + DBUG_PRINT("info", ("is quick repair: %d", rep_quick)); bzero((char*)&sort_info,sizeof(sort_info)); + /* Initialize pthread structures before goto err. */ + pthread_mutex_init(&sort_info.mutex, MY_MUTEX_INIT_FAST); + pthread_cond_init(&sort_info.cond, 0); + if (!(sort_info.key_block= - alloc_key_blocks(param, - (uint) param->sort_key_blocks, - share->base.max_key_block_length)) - || init_io_cache(¶m->read_cache,info->dfile, - (uint) param->read_buffer_length, - READ_CACHE,share->pack.header_length,1,MYF(MY_WME)) || - (! rep_quick && - init_io_cache(&info->rec_cache,info->dfile, - (uint) param->write_buffer_length, - WRITE_CACHE,new_header_length,1, - MYF(MY_WME | MY_WAIT_IF_FULL) & param->myf_rw))) + alloc_key_blocks(param, (uint) param->sort_key_blocks, + share->base.max_key_block_length)) || + init_io_cache(¶m->read_cache, info->dfile, + (uint) param->read_buffer_length, + READ_CACHE, share->pack.header_length, 1, MYF(MY_WME)) || + (!rep_quick && + (init_io_cache(&info->rec_cache, info->dfile, + (uint) param->write_buffer_length, + WRITE_CACHE, new_header_length, 1, + MYF(MY_WME | MY_WAIT_IF_FULL) & param->myf_rw) || + init_io_cache(&new_data_cache, -1, + (uint) param->write_buffer_length, + READ_CACHE, new_header_length, 1, + MYF(MY_WME | MY_DONT_CHECK_FILESIZE))))) goto err; sort_info.key_block_end=sort_info.key_block+param->sort_key_blocks; info->opt_flag|=WRITE_CACHE_USED; @@ -2510,8 +2598,6 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info, del=info->state->del; param->glob_crc=0; - if (param->testflag & T_CALC_CHECKSUM) - param->calc_checksum=1; if (!(sort_param=(MI_SORT_PARAM *) my_malloc((uint) share->base.keys * @@ -2561,6 +2647,7 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info, sort_param[i].sort_info=&sort_info; sort_param[i].master=0; sort_param[i].fix_datafile=0; + sort_param[i].calc_checksum= 0; sort_param[i].filepos=new_header_length; sort_param[i].max_pos=sort_param[i].pos=share->pack.header_length; @@ -2597,19 +2684,45 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info, sort_info.total_keys=i; sort_param[0].master= 1; sort_param[0].fix_datafile= (my_bool)(! rep_quick); + sort_param[0].calc_checksum= test(param->testflag & T_CALC_CHECKSUM); sort_info.got_error=0; - pthread_mutex_init(&sort_info.mutex, MY_MUTEX_INIT_FAST); - pthread_cond_init(&sort_info.cond, 0); pthread_mutex_lock(&sort_info.mutex); - init_io_cache_share(¶m->read_cache, &io_share, i); + /* + Initialize the I/O cache share for use with the read caches and, in + case of non-quick repair, the write cache. When all threads join on + the cache lock, the writer copies the write cache contents to the + read caches. + */ + if (i > 1) + { + if (rep_quick) + init_io_cache_share(¶m->read_cache, &io_share, NULL, i); + else + init_io_cache_share(&new_data_cache, &io_share, &info->rec_cache, i); + } + else + io_share.total_threads= 0; /* share not used */ + (void) pthread_attr_init(&thr_attr); (void) pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED); for (i=0 ; i < sort_info.total_keys ; i++) { - sort_param[i].read_cache=param->read_cache; + /* + Copy the properly initialized IO_CACHE structure so that every + thread has its own copy. In quick mode param->read_cache is shared + for use by all threads. In non-quick mode all threads but the + first copy the shared new_data_cache, which is synchronized to the + write cache of the first thread. The first thread copies + param->read_cache, which is not shared. + */ + sort_param[i].read_cache= ((rep_quick || !i) ? param->read_cache : + new_data_cache); + DBUG_PRINT("io_cache_share", ("thread: %u read_cache: 0x%lx", + i, (long) &sort_param[i].read_cache)); + /* two approaches: the same amount of memory for each thread or the memory for the same number of keys for each thread... @@ -2627,7 +2740,10 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info, (void *) (sort_param+i))) { mi_check_print_error(param,"Cannot start a repair thread"); - remove_io_thread(¶m->read_cache); + /* Cleanup: Detach from the share. Avoid others to be blocked. */ + if (io_share.total_threads) + remove_io_thread(&sort_param[i].read_cache); + DBUG_PRINT("error", ("Cannot start a repair thread")); sort_info.got_error=1; } else @@ -2649,6 +2765,11 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info, if (sort_param[0].fix_datafile) { + /* + Append some nuls to the end of a memory mapped file. Destroy the + write cache. The master thread did already detach from the share + by remove_io_thread() in sort.c:thr_find_all_keys(). + */ if (write_data_suffix(&sort_info,1) || end_io_cache(&info->rec_cache)) goto err; if (param->testflag & T_SAFE_REPAIR) @@ -2664,8 +2785,14 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info, sort_param->filepos; /* Only whole records */ share->state.version=(ulong) time((time_t*) 0); + + /* + Exchange the data file descriptor of the table, so that we use the + new file from now on. + */ my_close(info->dfile,MYF(0)); info->dfile=new_file; + share->data_file_type=sort_info.new_data_file_type; share->pack.header_length=(ulong) new_header_length; } @@ -2720,7 +2847,20 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info, err: got_error|= flush_blocks(param, share->key_cache, share->kfile); + /* + Destroy the write cache. The master thread did already detach from + the share by remove_io_thread() or it was not yet started (if the + error happend before creating the thread). + */ VOID(end_io_cache(&info->rec_cache)); + /* + Destroy the new data cache in case of non-quick repair. All slave + threads did either detach from the share by remove_io_thread() + already or they were not yet started (if the error happend before + creating the threads). + */ + if (!rep_quick) + VOID(end_io_cache(&new_data_cache)); if (!got_error) { /* Replace the actual file with the temporary file */ @@ -2851,12 +2991,41 @@ static int sort_ft_key_read(MI_SORT_PARAM *sort_param, void *key) } /* sort_ft_key_read */ - /* Read next record from file using parameters in sort_info */ - /* Return -1 if end of file, 0 if ok and > 0 if error */ +/* + Read next record from file using parameters in sort_info. + + SYNOPSIS + sort_get_next_record() + sort_param Information about and for the sort process + + NOTE + + Dynamic Records With Non-Quick Parallel Repair + + For non-quick parallel repair we use a synchronized read/write + cache. This means that one thread is the master who fixes the data + file by reading each record from the old data file and writing it + to the new data file. By doing this the records in the new data + file are written contiguously. Whenever the write buffer is full, + it is copied to the read buffer. The slaves read from the read + buffer, which is not associated with a file. Thus read_cache.file + is -1. When using _mi_read_cache(), the slaves must always set + flag to READING_NEXT so that the function never tries to read from + file. This is safe because the records are contiguous. There is no + need to read outside the cache. This condition is evaluated in the + variable 'parallel_flag' for quick reference. read_cache.file must + be >= 0 in every other case. + + RETURN + -1 end of file + 0 ok + > 0 error +*/ static int sort_get_next_record(MI_SORT_PARAM *sort_param) { int searching; + int parallel_flag; uint found_record,b_type,left_length; my_off_t pos; byte *to; @@ -2894,7 +3063,7 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param) sort_param->max_pos=(sort_param->pos+=share->base.pack_reclength); if (*sort_param->record) { - if (param->calc_checksum) + if (sort_param->calc_checksum) param->glob_crc+= (info->checksum= mi_static_checksum(info,sort_param->record)); DBUG_RETURN(0); @@ -2909,6 +3078,7 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param) LINT_INIT(to); pos=sort_param->pos; searching=(sort_param->fix_datafile && (param->testflag & T_EXTEND)); + parallel_flag= (sort_param->read_cache.file < 0) ? READING_NEXT : 0; for (;;) { found_record=block_info.second_read= 0; @@ -2939,7 +3109,7 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param) (byte*) block_info.header,pos, MI_BLOCK_INFO_HEADER_LENGTH, (! found_record ? READING_NEXT : 0) | - READING_HEADER)) + parallel_flag | READING_HEADER)) { if (found_record) { @@ -3116,9 +3286,31 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param) llstr(sort_param->start_recpos,llbuff)); goto try_next; } - if (_mi_read_cache(&sort_param->read_cache,to,block_info.filepos, - block_info.data_len, - (found_record == 1 ? READING_NEXT : 0))) + /* + Copy information that is already read. Avoid accessing data + below the cache start. This could happen if the header + streched over the end of the previous buffer contents. + */ + { + uint header_len= (uint) (block_info.filepos - pos); + uint prefetch_len= (MI_BLOCK_INFO_HEADER_LENGTH - header_len); + + if (prefetch_len > block_info.data_len) + prefetch_len= block_info.data_len; + if (prefetch_len) + { + memcpy(to, block_info.header + header_len, prefetch_len); + block_info.filepos+= prefetch_len; + block_info.data_len-= prefetch_len; + left_length-= prefetch_len; + to+= prefetch_len; + } + } + if (block_info.data_len && + _mi_read_cache(&sort_param->read_cache,to,block_info.filepos, + block_info.data_len, + (found_record == 1 ? READING_NEXT : 0) | + parallel_flag)) { mi_check_print_info(param, "Read error for block at: %s (error: %d); Skipped", @@ -3148,13 +3340,14 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param) { if (sort_param->read_cache.error < 0) DBUG_RETURN(1); - if (info->s->calc_checksum) - info->checksum=mi_checksum(info,sort_param->record); + if (sort_param->calc_checksum) + info->checksum= mi_checksum(info, sort_param->record); if ((param->testflag & (T_EXTEND | T_REP)) || searching) { if (_mi_rec_check(info, sort_param->record, sort_param->rec_buff, sort_param->find_length, (param->testflag & T_QUICK) && + sort_param->calc_checksum && test(info->s->calc_checksum))) { mi_check_print_info(param,"Found wrong packed record at %s", @@ -3162,7 +3355,7 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param) goto try_next; } } - if (param->calc_checksum) + if (sort_param->calc_checksum) param->glob_crc+= info->checksum; DBUG_RETURN(0); } @@ -3189,7 +3382,8 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param) DBUG_RETURN(1); /* Something wrong with data */ } sort_param->start_recpos=sort_param->pos; - if (_mi_pack_get_block_info(info,&block_info,-1,sort_param->pos)) + if (_mi_pack_get_block_info(info, &sort_param->bit_buff, &block_info, + &sort_param->rec_buff, -1, sort_param->pos)) DBUG_RETURN(-1); if (!block_info.rec_len && sort_param->pos + MEMMAP_EXTRA_MARGIN == @@ -3213,15 +3407,14 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param) llstr(sort_param->pos,llbuff)); continue; } - if (_mi_pack_rec_unpack(info,sort_param->record,sort_param->rec_buff, - block_info.rec_len)) + if (_mi_pack_rec_unpack(info, &sort_param->bit_buff, sort_param->record, + sort_param->rec_buff, block_info.rec_len)) { if (! searching) mi_check_print_info(param,"Found wrong record at %s", llstr(sort_param->pos,llbuff)); continue; } - info->checksum=mi_checksum(info,sort_param->record); if (!sort_param->fix_datafile) { sort_param->filepos=sort_param->pos; @@ -3231,8 +3424,9 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param) sort_param->max_pos=(sort_param->pos=block_info.filepos+ block_info.rec_len); info->packed_length=block_info.rec_len; - if (param->calc_checksum) - param->glob_crc+= info->checksum; + if (sort_param->calc_checksum) + param->glob_crc+= (info->checksum= + mi_checksum(info, sort_param->record)); DBUG_RETURN(0); } } @@ -3240,7 +3434,20 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param) } - /* Write record to new file */ +/* + Write record to new file. + + SYNOPSIS + sort_write_record() + sort_param Sort parameters. + + NOTE + This is only called by a master thread if parallel repair is used. + + RETURN + 0 OK + 1 Error +*/ int sort_write_record(MI_SORT_PARAM *sort_param) { @@ -3289,6 +3496,7 @@ int sort_write_record(MI_SORT_PARAM *sort_param) } from=sort_info->buff+ALIGN_SIZE(MI_MAX_DYN_BLOCK_HEADER); } + /* We can use info->checksum here as only one thread calls this. */ info->checksum=mi_checksum(info,sort_param->record); reclength=_mi_rec_pack(info,from,sort_param->record); flag=0; @@ -3698,7 +3906,7 @@ static int sort_delete_record(MI_SORT_PARAM *sort_param) DBUG_RETURN(1); } } - if (param->calc_checksum) + if (sort_param->calc_checksum) param->glob_crc-=(*info->s->calc_checksum)(info, sort_param->record); } error=flush_io_cache(&info->rec_cache) || (*info->s->delete_record)(info); |