diff options
author | istruewing@chilla.local <> | 2006-10-11 17:57:47 +0200 |
---|---|---|
committer | istruewing@chilla.local <> | 2006-10-11 17:57:47 +0200 |
commit | 35c94b6e2dd23458db7f3aa3da0d613493c6bb37 (patch) | |
tree | 1c500053f455af38dba9c498ee43b3b219adde3a /myisam | |
parent | 7683b297cfa3aa0227bb7d851328f85958fa8a67 (diff) | |
parent | 014c1c885e5f99238c321a39d8db75e7ded0b963 (diff) | |
download | mariadb-git-35c94b6e2dd23458db7f3aa3da0d613493c6bb37.tar.gz |
Merge bk-internal.mysql.com:/home/bk/mysql-5.0-engines
into chilla.local:/home/mydev/mysql-5.0-bug8283
Diffstat (limited to 'myisam')
-rw-r--r-- | myisam/mi_check.c | 306 | ||||
-rw-r--r-- | myisam/mi_open.c | 5 | ||||
-rw-r--r-- | myisam/mi_packrec.c | 76 | ||||
-rw-r--r-- | myisam/myisamdef.h | 29 | ||||
-rw-r--r-- | myisam/sort.c | 126 |
5 files changed, 400 insertions, 142 deletions
diff --git a/myisam/mi_check.c b/myisam/mi_check.c index 5a81d221d2a..06fdd3bff4c 100644 --- a/myisam/mi_check.c +++ b/myisam/mi_check.c @@ -16,6 +16,31 @@ /* Describe, check and repair of MyISAM tables */ +/* + About checksum calculation. + + There are two types of checksums. Table checksum and row checksum. + + Row checksum is an additional byte at the end of dynamic length + records. It must be calculated if the table is configured for them. + Otherwise they must not be used. The variable + MYISAM_SHARE::calc_checksum determines if row checksums are used. + MI_INFO::checksum is used as temporary storage during row handling. + For parallel repair we must assure that only one thread can use this + variable. There is no problem on the write side as this is done by one + thread only. But when checking a record after read this could go + wrong. But since all threads read through a common read buffer, it is + sufficient if only one thread checks it. + + Table checksum is an eight byte value in the header of the index file. + It can be calculated even if row checksums are not used. The variable + MI_CHECK::glob_crc is calculated over all records. + MI_SORT_PARAM::calc_checksum determines if this should be done. This + variable is not part of MI_CHECK because it must be set per thread for + parallel repair. The global glob_crc must be changed by one thread + only. And it is sufficient to calculate the checksum once only. +*/ + #include "ftdefs.h" #include <m_ctype.h> #include <stdarg.h> @@ -41,8 +66,7 @@ static int chk_index(MI_CHECK *param, MI_INFO *info,MI_KEYDEF *keyinfo, ha_checksum *key_checksum, uint level); static uint isam_key_length(MI_INFO *info,MI_KEYDEF *keyinfo); static ha_checksum calc_checksum(ha_rows count); -static int writekeys(MI_CHECK *param, MI_INFO *info,byte *buff, - my_off_t filepos); +static int writekeys(MI_SORT_PARAM *sort_param); static int sort_one_index(MI_CHECK *param, MI_INFO *info,MI_KEYDEF *keyinfo, my_off_t pagepos, File new_file); static int sort_key_read(MI_SORT_PARAM *sort_param,void *key); @@ -1102,7 +1126,8 @@ int chk_data_link(MI_CHECK *param, MI_INFO *info,int extend) goto err; start_recpos=pos; splits++; - VOID(_mi_pack_get_block_info(info,&block_info, -1, start_recpos)); + VOID(_mi_pack_get_block_info(info, &info->bit_buff, &block_info, + &info->rec_buff, -1, start_recpos)); pos=block_info.filepos+block_info.rec_len; if (block_info.rec_len < (uint) info->s->min_pack_length || block_info.rec_len > (uint) info->s->max_pack_length) @@ -1116,7 +1141,8 @@ int chk_data_link(MI_CHECK *param, MI_INFO *info,int extend) if (_mi_read_cache(¶m->read_cache,(byte*) info->rec_buff, block_info.filepos, block_info.rec_len, READING_NEXT)) goto err; - if (_mi_pack_rec_unpack(info,record,info->rec_buff,block_info.rec_len)) + if (_mi_pack_rec_unpack(info, &info->bit_buff, record, + info->rec_buff, block_info.rec_len)) { mi_check_print_error(param,"Found wrong record at %s", llstr(start_recpos,llbuff)); @@ -1400,7 +1426,7 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info, info->state->empty=0; param->glob_crc=0; if (param->testflag & T_CALC_CHECKSUM) - param->calc_checksum=1; + sort_param.calc_checksum= 1; info->update= (short) (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED); @@ -1429,7 +1455,7 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info, /* Re-create all keys, which are set in key_map. */ while (!(error=sort_get_next_record(&sort_param))) { - if (writekeys(param,info,(byte*)sort_param.record,sort_param.filepos)) + if (writekeys(&sort_param)) { if (my_errno != HA_ERR_FOUND_DUPP_KEY) goto err; @@ -1574,11 +1600,13 @@ err: /* Uppate keyfile when doing repair */ -static int writekeys(MI_CHECK *param, register MI_INFO *info, byte *buff, - my_off_t filepos) +static int writekeys(MI_SORT_PARAM *sort_param) { register uint i; - uchar *key; + uchar *key; + MI_INFO *info= sort_param->sort_info->info; + byte *buff= sort_param->record; + my_off_t filepos= sort_param->filepos; DBUG_ENTER("writekeys"); key=info->lastkey+info->s->base.max_key_length; @@ -1632,8 +1660,8 @@ static int writekeys(MI_CHECK *param, register MI_INFO *info, byte *buff, } } /* Remove checksum that was added to glob_crc in sort_get_next_record */ - if (param->calc_checksum) - param->glob_crc-= info->checksum; + if (sort_param->calc_checksum) + sort_param->sort_info->param->glob_crc-= info->checksum; DBUG_PRINT("error",("errno: %d",my_errno)); DBUG_RETURN(-1); } /* writekeys */ @@ -2139,7 +2167,7 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info, del=info->state->del; param->glob_crc=0; if (param->testflag & T_CALC_CHECKSUM) - param->calc_checksum=1; + sort_param.calc_checksum= 1; rec_per_key_part= param->rec_per_key_part; for (sort_param.key=0 ; sort_param.key < share->base.keys ; @@ -2201,7 +2229,8 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info, param->retry_repair=1; goto err; } - param->calc_checksum=0; /* No need to calc glob_crc */ + /* No need to calculate checksum again. */ + sort_param.calc_checksum= 0; /* Set for next loop */ sort_info.max_records= (ha_rows) info->state->records; @@ -2364,6 +2393,28 @@ err: Each key is handled by a separate thread. TODO: make a number of threads a parameter + In parallel repair we use one thread per index. There are two modes: + + Quick + + Only the indexes are rebuilt. All threads share a read buffer. + Every thread that needs fresh data in the buffer enters the shared + cache lock. The last thread joining the lock reads the buffer from + the data file and wakes all other threads. + + Non-quick + + The data file is rebuilt and all indexes are rebuilt to point to + the new record positions. One thread is the master thread. It + reads from the old data file and writes to the new data file. It + also creates one of the indexes. The other threads read from a + buffer which is filled by the master. If they need fresh data, + they enter the shared cache lock. If the masters write buffer is + full, it flushes it to the new data file and enters the shared + cache lock too. When all threads joined in the lock, the master + copies its write buffer to the read buffer for the other threads + and wakes them. + RESULT 0 ok <>0 Error @@ -2386,6 +2437,7 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info, ulong *rec_per_key_part; HA_KEYSEG *keyseg; char llbuff[22]; + IO_CACHE new_data_cache; /* For non-quick repair. */ IO_CACHE_SHARE io_share; SORT_INFO sort_info; ulonglong key_map=share->state.key_map; @@ -2407,19 +2459,55 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info, if (info->s->options & (HA_OPTION_CHECKSUM | HA_OPTION_COMPRESS_RECORD)) param->testflag|=T_CALC_CHECKSUM; + /* + Quick repair (not touching data file, rebuilding indexes): + { + Read cache is (MI_CHECK *param)->read_cache using info->dfile. + } + + Non-quick repair (rebuilding data file and indexes): + { + Master thread: + + Read cache is (MI_CHECK *param)->read_cache using info->dfile. + Write cache is (MI_INFO *info)->rec_cache using new_file. + + Slave threads: + + Read cache is new_data_cache synced to master rec_cache. + + The final assignment of the filedescriptor for rec_cache is done + after the cache creation. + + Don't check file size on new_data_cache, as the resulting file size + is not known yet. + + As rec_cache and new_data_cache are synced, write_buffer_length is + used for the read cache 'new_data_cache'. Both start at the same + position 'new_header_length'. + } + */ + DBUG_PRINT("info", ("is quick repair: %d", rep_quick)); bzero((char*)&sort_info,sizeof(sort_info)); + /* Initialize pthread structures before goto err. */ + pthread_mutex_init(&sort_info.mutex, MY_MUTEX_INIT_FAST); + pthread_cond_init(&sort_info.cond, 0); + if (!(sort_info.key_block= - alloc_key_blocks(param, - (uint) param->sort_key_blocks, - share->base.max_key_block_length)) - || init_io_cache(¶m->read_cache,info->dfile, - (uint) param->read_buffer_length, - READ_CACHE,share->pack.header_length,1,MYF(MY_WME)) || - (! rep_quick && - init_io_cache(&info->rec_cache,info->dfile, - (uint) param->write_buffer_length, - WRITE_CACHE,new_header_length,1, - MYF(MY_WME | MY_WAIT_IF_FULL) & param->myf_rw))) + alloc_key_blocks(param, (uint) param->sort_key_blocks, + share->base.max_key_block_length)) || + init_io_cache(¶m->read_cache, info->dfile, + (uint) param->read_buffer_length, + READ_CACHE, share->pack.header_length, 1, MYF(MY_WME)) || + (!rep_quick && + (init_io_cache(&info->rec_cache, info->dfile, + (uint) param->write_buffer_length, + WRITE_CACHE, new_header_length, 1, + MYF(MY_WME | MY_WAIT_IF_FULL) & param->myf_rw) || + init_io_cache(&new_data_cache, -1, + (uint) param->write_buffer_length, + READ_CACHE, new_header_length, 1, + MYF(MY_WME | MY_DONT_CHECK_FILESIZE))))) goto err; sort_info.key_block_end=sort_info.key_block+param->sort_key_blocks; info->opt_flag|=WRITE_CACHE_USED; @@ -2510,8 +2598,6 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info, del=info->state->del; param->glob_crc=0; - if (param->testflag & T_CALC_CHECKSUM) - param->calc_checksum=1; if (!(sort_param=(MI_SORT_PARAM *) my_malloc((uint) share->base.keys * @@ -2561,6 +2647,7 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info, sort_param[i].sort_info=&sort_info; sort_param[i].master=0; sort_param[i].fix_datafile=0; + sort_param[i].calc_checksum= 0; sort_param[i].filepos=new_header_length; sort_param[i].max_pos=sort_param[i].pos=share->pack.header_length; @@ -2597,19 +2684,45 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info, sort_info.total_keys=i; sort_param[0].master= 1; sort_param[0].fix_datafile= (my_bool)(! rep_quick); + sort_param[0].calc_checksum= test(param->testflag & T_CALC_CHECKSUM); sort_info.got_error=0; - pthread_mutex_init(&sort_info.mutex, MY_MUTEX_INIT_FAST); - pthread_cond_init(&sort_info.cond, 0); pthread_mutex_lock(&sort_info.mutex); - init_io_cache_share(¶m->read_cache, &io_share, i); + /* + Initialize the I/O cache share for use with the read caches and, in + case of non-quick repair, the write cache. When all threads join on + the cache lock, the writer copies the write cache contents to the + read caches. + */ + if (i > 1) + { + if (rep_quick) + init_io_cache_share(¶m->read_cache, &io_share, NULL, i); + else + init_io_cache_share(&new_data_cache, &io_share, &info->rec_cache, i); + } + else + io_share.total_threads= 0; /* share not used */ + (void) pthread_attr_init(&thr_attr); (void) pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED); for (i=0 ; i < sort_info.total_keys ; i++) { - sort_param[i].read_cache=param->read_cache; + /* + Copy the properly initialized IO_CACHE structure so that every + thread has its own copy. In quick mode param->read_cache is shared + for use by all threads. In non-quick mode all threads but the + first copy the shared new_data_cache, which is synchronized to the + write cache of the first thread. The first thread copies + param->read_cache, which is not shared. + */ + sort_param[i].read_cache= ((rep_quick || !i) ? param->read_cache : + new_data_cache); + DBUG_PRINT("io_cache_share", ("thread: %u read_cache: 0x%lx", + i, (long) &sort_param[i].read_cache)); + /* two approaches: the same amount of memory for each thread or the memory for the same number of keys for each thread... @@ -2627,7 +2740,10 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info, (void *) (sort_param+i))) { mi_check_print_error(param,"Cannot start a repair thread"); - remove_io_thread(¶m->read_cache); + /* Cleanup: Detach from the share. Avoid others to be blocked. */ + if (io_share.total_threads) + remove_io_thread(&sort_param[i].read_cache); + DBUG_PRINT("error", ("Cannot start a repair thread")); sort_info.got_error=1; } else @@ -2649,6 +2765,11 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info, if (sort_param[0].fix_datafile) { + /* + Append some nuls to the end of a memory mapped file. Destroy the + write cache. The master thread did already detach from the share + by remove_io_thread() in sort.c:thr_find_all_keys(). + */ if (write_data_suffix(&sort_info,1) || end_io_cache(&info->rec_cache)) goto err; if (param->testflag & T_SAFE_REPAIR) @@ -2664,8 +2785,14 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info, sort_param->filepos; /* Only whole records */ share->state.version=(ulong) time((time_t*) 0); + + /* + Exchange the data file descriptor of the table, so that we use the + new file from now on. + */ my_close(info->dfile,MYF(0)); info->dfile=new_file; + share->data_file_type=sort_info.new_data_file_type; share->pack.header_length=(ulong) new_header_length; } @@ -2720,7 +2847,20 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info, err: got_error|= flush_blocks(param, share->key_cache, share->kfile); + /* + Destroy the write cache. The master thread did already detach from + the share by remove_io_thread() or it was not yet started (if the + error happend before creating the thread). + */ VOID(end_io_cache(&info->rec_cache)); + /* + Destroy the new data cache in case of non-quick repair. All slave + threads did either detach from the share by remove_io_thread() + already or they were not yet started (if the error happend before + creating the threads). + */ + if (!rep_quick) + VOID(end_io_cache(&new_data_cache)); if (!got_error) { /* Replace the actual file with the temporary file */ @@ -2851,12 +2991,41 @@ static int sort_ft_key_read(MI_SORT_PARAM *sort_param, void *key) } /* sort_ft_key_read */ - /* Read next record from file using parameters in sort_info */ - /* Return -1 if end of file, 0 if ok and > 0 if error */ +/* + Read next record from file using parameters in sort_info. + + SYNOPSIS + sort_get_next_record() + sort_param Information about and for the sort process + + NOTE + + Dynamic Records With Non-Quick Parallel Repair + + For non-quick parallel repair we use a synchronized read/write + cache. This means that one thread is the master who fixes the data + file by reading each record from the old data file and writing it + to the new data file. By doing this the records in the new data + file are written contiguously. Whenever the write buffer is full, + it is copied to the read buffer. The slaves read from the read + buffer, which is not associated with a file. Thus read_cache.file + is -1. When using _mi_read_cache(), the slaves must always set + flag to READING_NEXT so that the function never tries to read from + file. This is safe because the records are contiguous. There is no + need to read outside the cache. This condition is evaluated in the + variable 'parallel_flag' for quick reference. read_cache.file must + be >= 0 in every other case. + + RETURN + -1 end of file + 0 ok + > 0 error +*/ static int sort_get_next_record(MI_SORT_PARAM *sort_param) { int searching; + int parallel_flag; uint found_record,b_type,left_length; my_off_t pos; byte *to; @@ -2894,7 +3063,7 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param) sort_param->max_pos=(sort_param->pos+=share->base.pack_reclength); if (*sort_param->record) { - if (param->calc_checksum) + if (sort_param->calc_checksum) param->glob_crc+= (info->checksum= mi_static_checksum(info,sort_param->record)); DBUG_RETURN(0); @@ -2909,6 +3078,7 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param) LINT_INIT(to); pos=sort_param->pos; searching=(sort_param->fix_datafile && (param->testflag & T_EXTEND)); + parallel_flag= (sort_param->read_cache.file < 0) ? READING_NEXT : 0; for (;;) { found_record=block_info.second_read= 0; @@ -2939,7 +3109,7 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param) (byte*) block_info.header,pos, MI_BLOCK_INFO_HEADER_LENGTH, (! found_record ? READING_NEXT : 0) | - READING_HEADER)) + parallel_flag | READING_HEADER)) { if (found_record) { @@ -3116,9 +3286,31 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param) llstr(sort_param->start_recpos,llbuff)); goto try_next; } - if (_mi_read_cache(&sort_param->read_cache,to,block_info.filepos, - block_info.data_len, - (found_record == 1 ? READING_NEXT : 0))) + /* + Copy information that is already read. Avoid accessing data + below the cache start. This could happen if the header + streched over the end of the previous buffer contents. + */ + { + uint header_len= (uint) (block_info.filepos - pos); + uint prefetch_len= (MI_BLOCK_INFO_HEADER_LENGTH - header_len); + + if (prefetch_len > block_info.data_len) + prefetch_len= block_info.data_len; + if (prefetch_len) + { + memcpy(to, block_info.header + header_len, prefetch_len); + block_info.filepos+= prefetch_len; + block_info.data_len-= prefetch_len; + left_length-= prefetch_len; + to+= prefetch_len; + } + } + if (block_info.data_len && + _mi_read_cache(&sort_param->read_cache,to,block_info.filepos, + block_info.data_len, + (found_record == 1 ? READING_NEXT : 0) | + parallel_flag)) { mi_check_print_info(param, "Read error for block at: %s (error: %d); Skipped", @@ -3148,13 +3340,14 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param) { if (sort_param->read_cache.error < 0) DBUG_RETURN(1); - if (info->s->calc_checksum) - info->checksum=mi_checksum(info,sort_param->record); + if (sort_param->calc_checksum) + info->checksum= mi_checksum(info, sort_param->record); if ((param->testflag & (T_EXTEND | T_REP)) || searching) { if (_mi_rec_check(info, sort_param->record, sort_param->rec_buff, sort_param->find_length, (param->testflag & T_QUICK) && + sort_param->calc_checksum && test(info->s->calc_checksum))) { mi_check_print_info(param,"Found wrong packed record at %s", @@ -3162,7 +3355,7 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param) goto try_next; } } - if (param->calc_checksum) + if (sort_param->calc_checksum) param->glob_crc+= info->checksum; DBUG_RETURN(0); } @@ -3189,7 +3382,8 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param) DBUG_RETURN(1); /* Something wrong with data */ } sort_param->start_recpos=sort_param->pos; - if (_mi_pack_get_block_info(info,&block_info,-1,sort_param->pos)) + if (_mi_pack_get_block_info(info, &sort_param->bit_buff, &block_info, + &sort_param->rec_buff, -1, sort_param->pos)) DBUG_RETURN(-1); if (!block_info.rec_len && sort_param->pos + MEMMAP_EXTRA_MARGIN == @@ -3213,15 +3407,14 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param) llstr(sort_param->pos,llbuff)); continue; } - if (_mi_pack_rec_unpack(info,sort_param->record,sort_param->rec_buff, - block_info.rec_len)) + if (_mi_pack_rec_unpack(info, &sort_param->bit_buff, sort_param->record, + sort_param->rec_buff, block_info.rec_len)) { if (! searching) mi_check_print_info(param,"Found wrong record at %s", llstr(sort_param->pos,llbuff)); continue; } - info->checksum=mi_checksum(info,sort_param->record); if (!sort_param->fix_datafile) { sort_param->filepos=sort_param->pos; @@ -3231,8 +3424,9 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param) sort_param->max_pos=(sort_param->pos=block_info.filepos+ block_info.rec_len); info->packed_length=block_info.rec_len; - if (param->calc_checksum) - param->glob_crc+= info->checksum; + if (sort_param->calc_checksum) + param->glob_crc+= (info->checksum= + mi_checksum(info, sort_param->record)); DBUG_RETURN(0); } } @@ -3240,7 +3434,20 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param) } - /* Write record to new file */ +/* + Write record to new file. + + SYNOPSIS + sort_write_record() + sort_param Sort parameters. + + NOTE + This is only called by a master thread if parallel repair is used. + + RETURN + 0 OK + 1 Error +*/ int sort_write_record(MI_SORT_PARAM *sort_param) { @@ -3289,6 +3496,7 @@ int sort_write_record(MI_SORT_PARAM *sort_param) } from=sort_info->buff+ALIGN_SIZE(MI_MAX_DYN_BLOCK_HEADER); } + /* We can use info->checksum here as only one thread calls this. */ info->checksum=mi_checksum(info,sort_param->record); reclength=_mi_rec_pack(info,from,sort_param->record); flag=0; @@ -3698,7 +3906,7 @@ static int sort_delete_record(MI_SORT_PARAM *sort_param) DBUG_RETURN(1); } } - if (param->calc_checksum) + if (sort_param->calc_checksum) param->glob_crc-=(*info->s->calc_checksum)(info, sort_param->record); } error=flush_io_cache(&info->rec_cache) || (*info->s->delete_record)(info); diff --git a/myisam/mi_open.c b/myisam/mi_open.c index 955d55cf765..a52ab5aad1f 100644 --- a/myisam/mi_open.c +++ b/myisam/mi_open.c @@ -208,7 +208,10 @@ MI_INFO *mi_open(const char *name, int mode, uint open_flags) ((open_flags & HA_OPEN_ABORT_IF_CRASHED) && (my_disable_locking && share->state.open_count)))) { - DBUG_PRINT("error",("Table is marked as crashed")); + DBUG_PRINT("error",("Table is marked as crashed. open_flags: %u " + "changed: %u open_count: %u !locking: %d", + open_flags, share->state.changed, + share->state.open_count, my_disable_locking)); my_errno=((share->state.changed & STATE_CRASHED_ON_REPAIR) ? HA_ERR_CRASHED_ON_REPAIR : HA_ERR_CRASHED_ON_USAGE); goto err; diff --git a/myisam/mi_packrec.c b/myisam/mi_packrec.c index c9653eadc28..13898df0466 100644 --- a/myisam/mi_packrec.c +++ b/myisam/mi_packrec.c @@ -103,7 +103,8 @@ static uint fill_and_get_bits(MI_BIT_BUFF *bit_buff,uint count); static void fill_buffer(MI_BIT_BUFF *bit_buff); static uint max_bit(uint value); #ifdef HAVE_MMAP -static uchar *_mi_mempack_get_block_info(MI_INFO *myisam,MI_BLOCK_INFO *info, +static uchar *_mi_mempack_get_block_info(MI_INFO *myisam, MI_BIT_BUFF *bit_buff, + MI_BLOCK_INFO *info, byte **rec_buff_p, uchar *header); #endif @@ -449,13 +450,15 @@ int _mi_read_pack_record(MI_INFO *info, my_off_t filepos, byte *buf) DBUG_RETURN(-1); /* _search() didn't find record */ file=info->dfile; - if (_mi_pack_get_block_info(info, &block_info, file, filepos)) + if (_mi_pack_get_block_info(info, &info->bit_buff, &block_info, + &info->rec_buff, file, filepos)) goto err; if (my_read(file,(byte*) info->rec_buff + block_info.offset , block_info.rec_len - block_info.offset, MYF(MY_NABP))) goto panic; info->update|= HA_STATE_AKTIV; - DBUG_RETURN(_mi_pack_rec_unpack(info,buf,info->rec_buff,block_info.rec_len)); + DBUG_RETURN(_mi_pack_rec_unpack(info, &info->bit_buff, buf, + info->rec_buff, block_info.rec_len)); panic: my_errno=HA_ERR_WRONG_IN_RECORD; err: @@ -464,8 +467,8 @@ err: -int _mi_pack_rec_unpack(register MI_INFO *info, register byte *to, byte *from, - ulong reclength) +int _mi_pack_rec_unpack(register MI_INFO *info, MI_BIT_BUFF *bit_buff, + register byte *to, byte *from, ulong reclength) { byte *end_field; reg3 MI_COLUMNDEF *end; @@ -473,18 +476,18 @@ int _mi_pack_rec_unpack(register MI_INFO *info, register byte *to, byte *from, MYISAM_SHARE *share=info->s; DBUG_ENTER("_mi_pack_rec_unpack"); - init_bit_buffer(&info->bit_buff, (uchar*) from,reclength); + init_bit_buffer(bit_buff, (uchar*) from, reclength); for (current_field=share->rec, end=current_field+share->base.fields ; current_field < end ; current_field++,to=end_field) { end_field=to+current_field->length; - (*current_field->unpack)(current_field,&info->bit_buff,(uchar*) to, + (*current_field->unpack)(current_field, bit_buff, (uchar*) to, (uchar*) end_field); } - if (! info->bit_buff.error && - info->bit_buff.pos - info->bit_buff.bits/8 == info->bit_buff.end) + if (!bit_buff->error && + bit_buff->pos - bit_buff->bits / 8 == bit_buff->end) DBUG_RETURN(0); info->update&= ~HA_STATE_AKTIV; DBUG_RETURN(my_errno=HA_ERR_WRONG_IN_RECORD); @@ -1015,13 +1018,16 @@ int _mi_read_rnd_pack_record(MI_INFO *info, byte *buf, if (info->opt_flag & READ_CACHE_USED) { - if (_mi_read_cache(&info->rec_cache,(byte*) block_info.header,filepos, - share->pack.ref_length, skip_deleted_blocks)) + if (_mi_read_cache(&info->rec_cache, (byte*) block_info.header, + filepos, share->pack.ref_length, + skip_deleted_blocks ? READING_NEXT : 0)) goto err; - b_type=_mi_pack_get_block_info(info,&block_info,-1, filepos); + b_type=_mi_pack_get_block_info(info, &info->bit_buff, &block_info, + &info->rec_buff, -1, filepos); } else - b_type=_mi_pack_get_block_info(info,&block_info,info->dfile,filepos); + b_type=_mi_pack_get_block_info(info, &info->bit_buff, &block_info, + &info->rec_buff, info->dfile, filepos); if (b_type) goto err; /* Error code is already set */ #ifndef DBUG_OFF @@ -1034,9 +1040,9 @@ int _mi_read_rnd_pack_record(MI_INFO *info, byte *buf, if (info->opt_flag & READ_CACHE_USED) { - if (_mi_read_cache(&info->rec_cache,(byte*) info->rec_buff, - block_info.filepos, block_info.rec_len, - skip_deleted_blocks)) + if (_mi_read_cache(&info->rec_cache, (byte*) info->rec_buff, + block_info.filepos, block_info.rec_len, + skip_deleted_blocks ? READING_NEXT : 0)) goto err; } else @@ -1051,8 +1057,8 @@ int _mi_read_rnd_pack_record(MI_INFO *info, byte *buf, info->nextpos=block_info.filepos+block_info.rec_len; info->update|= HA_STATE_AKTIV | HA_STATE_KEY_CHANGED; - DBUG_RETURN (_mi_pack_rec_unpack(info,buf,info->rec_buff, - block_info.rec_len)); + DBUG_RETURN (_mi_pack_rec_unpack(info, &info->bit_buff, buf, + info->rec_buff, block_info.rec_len)); err: DBUG_RETURN(my_errno); } @@ -1060,8 +1066,9 @@ int _mi_read_rnd_pack_record(MI_INFO *info, byte *buf, /* Read and process header from a huff-record-file */ -uint _mi_pack_get_block_info(MI_INFO *myisam, MI_BLOCK_INFO *info, File file, - my_off_t filepos) +uint _mi_pack_get_block_info(MI_INFO *myisam, MI_BIT_BUFF *bit_buff, + MI_BLOCK_INFO *info, byte **rec_buff_p, + File file, my_off_t filepos) { uchar *header=info->header; uint head_length,ref_length; @@ -1086,17 +1093,17 @@ uint _mi_pack_get_block_info(MI_INFO *myisam, MI_BLOCK_INFO *info, File file, head_length+= read_pack_length((uint) myisam->s->pack.version, header + head_length, &info->blob_len); if (!(mi_alloc_rec_buff(myisam,info->rec_len + info->blob_len, - &myisam->rec_buff))) + rec_buff_p))) return BLOCK_FATAL_ERROR; /* not enough memory */ - myisam->bit_buff.blob_pos=(uchar*) myisam->rec_buff+info->rec_len; - myisam->bit_buff.blob_end= myisam->bit_buff.blob_pos+info->blob_len; + bit_buff->blob_pos= (uchar*) *rec_buff_p + info->rec_len; + bit_buff->blob_end= bit_buff->blob_pos + info->blob_len; myisam->blob_length=info->blob_len; } info->filepos=filepos+head_length; if (file > 0) { info->offset=min(info->rec_len, ref_length - head_length); - memcpy(myisam->rec_buff, header+head_length, info->offset); + memcpy(*rec_buff_p, header + head_length, info->offset); } return 0; } @@ -1237,7 +1244,8 @@ void _mi_unmap_file(MI_INFO *info) } -static uchar *_mi_mempack_get_block_info(MI_INFO *myisam,MI_BLOCK_INFO *info, +static uchar *_mi_mempack_get_block_info(MI_INFO *myisam, MI_BIT_BUFF *bit_buff, + MI_BLOCK_INFO *info, byte **rec_buff_p, uchar *header) { header+= read_pack_length((uint) myisam->s->pack.version, header, @@ -1248,10 +1256,10 @@ static uchar *_mi_mempack_get_block_info(MI_INFO *myisam,MI_BLOCK_INFO *info, &info->blob_len); /* mi_alloc_rec_buff sets my_errno on error */ if (!(mi_alloc_rec_buff(myisam, info->blob_len, - &myisam->rec_buff))) + rec_buff_p))) return 0; /* not enough memory */ - myisam->bit_buff.blob_pos=(uchar*) myisam->rec_buff; - myisam->bit_buff.blob_end= (uchar*) myisam->rec_buff + info->blob_len; + bit_buff->blob_pos= (uchar*) *rec_buff_p; + bit_buff->blob_end= (uchar*) *rec_buff_p + info->blob_len; } return header; } @@ -1267,11 +1275,13 @@ static int _mi_read_mempack_record(MI_INFO *info, my_off_t filepos, byte *buf) if (filepos == HA_OFFSET_ERROR) DBUG_RETURN(-1); /* _search() didn't find record */ - if (!(pos= (byte*) _mi_mempack_get_block_info(info,&block_info, + if (!(pos= (byte*) _mi_mempack_get_block_info(info, &info->bit_buff, + &block_info, &info->rec_buff, (uchar*) share->file_map+ filepos))) DBUG_RETURN(-1); - DBUG_RETURN(_mi_pack_rec_unpack(info, buf, pos, block_info.rec_len)); + DBUG_RETURN(_mi_pack_rec_unpack(info, &info->bit_buff, buf, + pos, block_info.rec_len)); } @@ -1291,7 +1301,8 @@ static int _mi_read_rnd_mempack_record(MI_INFO *info, byte *buf, my_errno=HA_ERR_END_OF_FILE; goto err; } - if (!(pos= (byte*) _mi_mempack_get_block_info(info,&block_info, + if (!(pos= (byte*) _mi_mempack_get_block_info(info, &info->bit_buff, + &block_info, &info->rec_buff, (uchar*) (start=share->file_map+ filepos)))) @@ -1308,7 +1319,8 @@ static int _mi_read_rnd_mempack_record(MI_INFO *info, byte *buf, info->nextpos=filepos+(uint) (pos-start)+block_info.rec_len; info->update|= HA_STATE_AKTIV | HA_STATE_KEY_CHANGED; - DBUG_RETURN (_mi_pack_rec_unpack(info,buf,pos, block_info.rec_len)); + DBUG_RETURN (_mi_pack_rec_unpack(info, &info->bit_buff, buf, + pos, block_info.rec_len)); err: DBUG_RETURN(my_errno); } diff --git a/myisam/myisamdef.h b/myisam/myisamdef.h index ea45915a088..12e2c8e4bec 100644 --- a/myisam/myisamdef.h +++ b/myisam/myisamdef.h @@ -76,6 +76,7 @@ typedef struct st_mi_state_info ulong sec_index_changed; /* Updated when new sec_index */ ulong sec_index_used; /* which extra index are in use */ ulonglong key_map; /* Which keys are in use */ + ha_checksum checksum; /* Table checksum */ ulong version; /* timestamp of create */ time_t create_time; /* Time when created database */ time_t recover_time; /* Time for last recover */ @@ -176,6 +177,7 @@ typedef struct st_mi_isam_share { /* Shared between opens */ int (*delete_record)(struct st_myisam_info*); int (*read_rnd)(struct st_myisam_info*, byte*, my_off_t, my_bool); int (*compare_record)(struct st_myisam_info*, const byte *); + /* Function to use for a row checksum. */ ha_checksum (*calc_checksum)(struct st_myisam_info*, const byte *); int (*compare_unique)(struct st_myisam_info*, MI_UNIQUEDEF *, const byte *record, my_off_t pos); @@ -249,7 +251,7 @@ struct st_myisam_info { my_off_t last_keypage; /* Last key page read */ my_off_t last_search_keypage; /* Last keypage when searching */ my_off_t dupp_key_pos; - ha_checksum checksum; + ha_checksum checksum; /* Temp storage for row checksum */ /* QQ: the folloing two xxx_length fields should be removed, as they are not compatible with parallel repair */ ulong packed_length,blob_length; /* Length of found, packed record */ @@ -301,8 +303,9 @@ typedef struct st_mi_sort_param pthread_t thr; IO_CACHE read_cache, tempfile, tempfile_for_exceptions; DYNAMIC_ARRAY buffpek; - - /* + MI_BIT_BUFF bit_buff; /* For parallel repair of packrec. */ + + /* The next two are used to collect statistics, see update_key_parts for description. */ @@ -313,6 +316,7 @@ typedef struct st_mi_sort_param uint key, key_length,real_key_length,sortbuff_size; uint maxbuffers, keys, find_length, sort_keys_length; my_bool fix_datafile, master; + my_bool calc_checksum; /* calculate table checksum */ MI_KEYDEF *keyinfo; HA_KEYSEG *seg; SORT_INFO *sort_info; @@ -365,8 +369,15 @@ typedef struct st_mi_sort_param #define mi_putint(x,y,nod) { uint16 boh=(nod ? (uint16) 32768 : 0) + (uint16) (y);\ mi_int2store(x,boh); } #define mi_test_if_nod(x) (x[0] & 128 ? info->s->base.key_reflength : 0) -#define mi_mark_crashed(x) (x)->s->state.changed|=STATE_CRASHED -#define mi_mark_crashed_on_repair(x) { (x)->s->state.changed|=STATE_CRASHED|STATE_CRASHED_ON_REPAIR ; (x)->update|= HA_STATE_CHANGED; } +#define mi_mark_crashed(x) do{(x)->s->state.changed|= STATE_CRASHED; \ + DBUG_PRINT("error", ("Marked table crashed")); \ + }while(0) +#define mi_mark_crashed_on_repair(x) do{(x)->s->state.changed|= \ + STATE_CRASHED|STATE_CRASHED_ON_REPAIR; \ + (x)->update|= HA_STATE_CHANGED; \ + DBUG_PRINT("error", \ + ("Marked table crashed")); \ + }while(0) #define mi_is_crashed(x) ((x)->s->state.changed & STATE_CRASHED) #define mi_is_crashed_on_repair(x) ((x)->s->state.changed & STATE_CRASHED_ON_REPAIR) #define mi_print_error(SHARE, ERRNO) \ @@ -606,8 +617,8 @@ extern void _mi_print_key(FILE *stream,HA_KEYSEG *keyseg,const uchar *key, extern my_bool _mi_read_pack_info(MI_INFO *info,pbool fix_keys); extern int _mi_read_pack_record(MI_INFO *info,my_off_t filepos,byte *buf); extern int _mi_read_rnd_pack_record(MI_INFO*, byte *,my_off_t, my_bool); -extern int _mi_pack_rec_unpack(MI_INFO *info,byte *to,byte *from, - ulong reclength); +extern int _mi_pack_rec_unpack(MI_INFO *info, MI_BIT_BUFF *bit_buff, + byte *to, byte *from, ulong reclength); extern ulonglong mi_safe_mul(ulonglong a,ulonglong b); extern int _mi_ft_update(MI_INFO *info, uint keynr, byte *keybuf, const byte *oldrec, const byte *newrec, my_off_t pos); @@ -672,7 +683,9 @@ extern "C" { extern uint _mi_get_block_info(MI_BLOCK_INFO *,File, my_off_t); extern uint _mi_rec_pack(MI_INFO *info,byte *to,const byte *from); -extern uint _mi_pack_get_block_info(MI_INFO *, MI_BLOCK_INFO *, File, my_off_t); +extern uint _mi_pack_get_block_info(MI_INFO *myisam, MI_BIT_BUFF *bit_buff, + MI_BLOCK_INFO *info, byte **rec_buff_p, + File file, my_off_t filepos); extern void _my_store_blob_length(byte *pos,uint pack_length,uint length); extern void _myisam_log(enum myisam_log_commands command,MI_INFO *info, const byte *buffert,uint length); diff --git a/myisam/sort.c b/myisam/sort.c index 00fbfe768dc..8ee9cd9c31b 100644 --- a/myisam/sort.c +++ b/myisam/sort.c @@ -309,7 +309,7 @@ static ha_rows NEAR_F find_all_keys(MI_SORT_PARAM *info, uint keys, pthread_handler_t thr_find_all_keys(void *arg) { - MI_SORT_PARAM *info= (MI_SORT_PARAM*) arg; + MI_SORT_PARAM *sort_param= (MI_SORT_PARAM*) arg; int error; uint memavl,old_memavl,keys,sort_length; uint idx, maxbuffer; @@ -321,32 +321,34 @@ pthread_handler_t thr_find_all_keys(void *arg) if (my_thread_init()) goto err; - if (info->sort_info->got_error) + DBUG_ENTER("thr_find_all_keys"); + DBUG_PRINT("enter", ("master: %d", sort_param->master)); + if (sort_param->sort_info->got_error) goto err; - if (info->keyinfo->flag && HA_VAR_LENGTH_KEY) + if (sort_param->keyinfo->flag && HA_VAR_LENGTH_KEY) { - info->write_keys=write_keys_varlen; - info->read_to_buffer=read_to_buffer_varlen; - info->write_key=write_merge_key_varlen; + sort_param->write_keys= write_keys_varlen; + sort_param->read_to_buffer= read_to_buffer_varlen; + sort_param->write_key= write_merge_key_varlen; } else { - info->write_keys=write_keys; - info->read_to_buffer=read_to_buffer; - info->write_key=write_merge_key; + sort_param->write_keys= write_keys; + sort_param->read_to_buffer= read_to_buffer; + sort_param->write_key= write_merge_key; } - my_b_clear(&info->tempfile); - my_b_clear(&info->tempfile_for_exceptions); - bzero((char*) &info->buffpek,sizeof(info->buffpek)); - bzero((char*) &info->unique, sizeof(info->unique)); + my_b_clear(&sort_param->tempfile); + my_b_clear(&sort_param->tempfile_for_exceptions); + bzero((char*) &sort_param->buffpek, sizeof(sort_param->buffpek)); + bzero((char*) &sort_param->unique, sizeof(sort_param->unique)); sort_keys= (uchar **) NULL; - memavl=max(info->sortbuff_size, MIN_SORT_MEMORY); - idx= info->sort_info->max_records; - sort_length= info->key_length; - maxbuffer= 1; + memavl= max(sort_param->sortbuff_size, MIN_SORT_MEMORY); + idx= sort_param->sort_info->max_records; + sort_length= sort_param->key_length; + maxbuffer= 1; while (memavl >= MIN_SORT_MEMORY) { @@ -363,18 +365,19 @@ pthread_handler_t thr_find_all_keys(void *arg) (keys=(memavl-sizeof(BUFFPEK)*maxbuffer)/ (sort_length+sizeof(char*))) <= 1) { - mi_check_print_error(info->sort_info->param, + mi_check_print_error(sort_param->sort_info->param, "sort_buffer_size is to small"); goto err; } } while ((maxbuffer= (int) (idx/(keys-1)+1)) != skr); } - if ((sort_keys=(uchar **)my_malloc(keys*(sort_length+sizeof(char*))+ - ((info->keyinfo->flag & HA_FULLTEXT) ? - HA_FT_MAXBYTELEN : 0), MYF(0)))) + if ((sort_keys= (uchar**) + my_malloc(keys*(sort_length+sizeof(char*))+ + ((sort_param->keyinfo->flag & HA_FULLTEXT) ? + HA_FT_MAXBYTELEN : 0), MYF(0)))) { - if (my_init_dynamic_array(&info->buffpek, sizeof(BUFFPEK), + if (my_init_dynamic_array(&sort_param->buffpek, sizeof(BUFFPEK), maxbuffer, maxbuffer/2)) { my_free((gptr) sort_keys,MYF(0)); @@ -389,69 +392,87 @@ pthread_handler_t thr_find_all_keys(void *arg) } if (memavl < MIN_SORT_MEMORY) { - mi_check_print_error(info->sort_info->param,"Sort buffer to small"); /* purecov: tested */ + mi_check_print_error(sort_param->sort_info->param, "Sort buffer too small"); goto err; /* purecov: tested */ } - if (info->sort_info->param->testflag & T_VERBOSE) - printf("Key %d - Allocating buffer for %d keys\n",info->key+1,keys); - info->sort_keys=sort_keys; + if (sort_param->sort_info->param->testflag & T_VERBOSE) + printf("Key %d - Allocating buffer for %d keys\n", + sort_param->key + 1, keys); + sort_param->sort_keys= sort_keys; idx=error=0; sort_keys[0]=(uchar*) (sort_keys+keys); - while (!(error=info->sort_info->got_error) && - !(error=(*info->key_read)(info,sort_keys[idx]))) + DBUG_PRINT("info", ("reading keys")); + while (!(error= sort_param->sort_info->got_error) && + !(error= (*sort_param->key_read)(sort_param, sort_keys[idx]))) { - if (info->real_key_length > info->key_length) + if (sort_param->real_key_length > sort_param->key_length) { - if (write_key(info,sort_keys[idx], &info->tempfile_for_exceptions)) + if (write_key(sort_param, sort_keys[idx], + &sort_param->tempfile_for_exceptions)) goto err; continue; } if (++idx == keys) { - if (info->write_keys(info,sort_keys,idx-1, - (BUFFPEK *)alloc_dynamic(&info->buffpek), - &info->tempfile)) + if (sort_param->write_keys(sort_param, sort_keys, idx - 1, + (BUFFPEK*) alloc_dynamic(&sort_param->buffpek), + &sort_param->tempfile)) goto err; sort_keys[0]=(uchar*) (sort_keys+keys); - memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length); + memcpy(sort_keys[0], sort_keys[idx - 1], (size_t) sort_param->key_length); idx=1; } - sort_keys[idx]=sort_keys[idx-1]+info->key_length; + sort_keys[idx]= sort_keys[idx - 1] + sort_param->key_length; } if (error > 0) goto err; - if (info->buffpek.elements) + if (sort_param->buffpek.elements) { - if (info->write_keys(info,sort_keys, idx, - (BUFFPEK *) alloc_dynamic(&info->buffpek), &info->tempfile)) + if (sort_param->write_keys(sort_param, sort_keys, idx, + (BUFFPEK*) alloc_dynamic(&sort_param->buffpek), + &sort_param->tempfile)) goto err; - info->keys=(info->buffpek.elements-1)*(keys-1)+idx; + sort_param->keys= (sort_param->buffpek.elements - 1) * (keys - 1) + idx; } else - info->keys=idx; + sort_param->keys= idx; - info->sort_keys_length=keys; + sort_param->sort_keys_length= keys; goto ok; err: - info->sort_info->got_error=1; /* no need to protect this with a mutex */ + DBUG_PRINT("error", ("got some error")); + sort_param->sort_info->got_error= 1; /* no need to protect with a mutex */ if (sort_keys) my_free((gptr) sort_keys,MYF(0)); - info->sort_keys=0; - delete_dynamic(& info->buffpek); - close_cached_file(&info->tempfile); - close_cached_file(&info->tempfile_for_exceptions); + sort_param->sort_keys= 0; + delete_dynamic(& sort_param->buffpek); + close_cached_file(&sort_param->tempfile); + close_cached_file(&sort_param->tempfile_for_exceptions); ok: - remove_io_thread(&info->read_cache); - pthread_mutex_lock(&info->sort_info->mutex); - info->sort_info->threads_running--; - pthread_cond_signal(&info->sort_info->cond); - pthread_mutex_unlock(&info->sort_info->mutex); + /* + Detach from the share if the writer is involved. Avoid others to + be blocked. This includes a flush of the write buffer. This will + also indicate EOF to the readers. + */ + if (sort_param->sort_info->info->rec_cache.share) + remove_io_thread(&sort_param->sort_info->info->rec_cache); + + /* Readers detach from the share if any. Avoid others to be blocked. */ + if (sort_param->read_cache.share) + remove_io_thread(&sort_param->read_cache); + + pthread_mutex_lock(&sort_param->sort_info->mutex); + if (!--sort_param->sort_info->threads_running) + pthread_cond_signal(&sort_param->sort_info->cond); + pthread_mutex_unlock(&sort_param->sort_info->mutex); + + DBUG_PRINT("exit", ("======== ending thread ========")); my_thread_end(); return NULL; } @@ -469,6 +490,7 @@ int thr_write_keys(MI_SORT_PARAM *sort_param) MYISAM_SHARE *share=info->s; MI_SORT_PARAM *sinfo; byte *mergebuf=0; + DBUG_ENTER("thr_write_keys"); LINT_INIT(length); for (i= 0, sinfo= sort_param ; @@ -605,7 +627,7 @@ int thr_write_keys(MI_SORT_PARAM *sort_param) } } my_free((gptr) mergebuf,MYF(MY_ALLOW_ZERO_PTR)); - return got_error; + DBUG_RETURN(got_error); } #endif /* THREAD */ |