diff options
Diffstat (limited to 'storage/maria')
-rw-r--r-- | storage/maria/ma_checkpoint.c | 20 | ||||
-rw-r--r-- | storage/maria/ma_loghandler.c | 11 | ||||
-rw-r--r-- | storage/maria/ma_pagecache.c | 32 | ||||
-rw-r--r-- | storage/maria/ma_pagecache.h | 40 | ||||
-rw-r--r-- | storage/maria/ma_servicethread.c | 51 | ||||
-rw-r--r-- | storage/maria/ma_servicethread.h | 7 | ||||
-rw-r--r-- | storage/maria/ma_sort.c | 318 | ||||
-rw-r--r-- | storage/maria/maria_def.h | 3 |
8 files changed, 225 insertions, 257 deletions
diff --git a/storage/maria/ma_checkpoint.c b/storage/maria/ma_checkpoint.c index 36b729e6053..21b4758a720 100644 --- a/storage/maria/ma_checkpoint.c +++ b/storage/maria/ma_checkpoint.c @@ -46,7 +46,7 @@ static mysql_mutex_t LOCK_checkpoint; static mysql_cond_t COND_checkpoint; /** @brief control structure for checkpoint background thread */ static MA_SERVICE_THREAD_CONTROL checkpoint_control= - {THREAD_DEAD, FALSE, &LOCK_checkpoint, &COND_checkpoint}; + {0, FALSE, FALSE, &LOCK_checkpoint, &COND_checkpoint}; /* is ulong like pagecache->blocks_changed */ static ulong pages_to_flush_before_next_checkpoint; static PAGECACHE_FILE *dfiles, /**< data files to flush in background */ @@ -326,7 +326,6 @@ end: int ma_checkpoint_init(ulong interval) { - pthread_t th; int res= 0; DBUG_ENTER("ma_checkpoint_init"); if (ma_service_thread_control_init(&checkpoint_control)) @@ -334,14 +333,14 @@ int ma_checkpoint_init(ulong interval) else if (interval > 0) { compile_time_assert(sizeof(void *) >= sizeof(ulong)); - if (!(res= mysql_thread_create(key_thread_checkpoint, - &th, NULL, ma_checkpoint_background, - (void *)interval))) - { - /* thread lives, will have to be killed */ - checkpoint_control.status= THREAD_RUNNING; - } + if ((res= mysql_thread_create(key_thread_checkpoint, + &checkpoint_control.thread, NULL, + ma_checkpoint_background, + (void*) interval))) + checkpoint_control.killed= TRUE; } + else + checkpoint_control.killed= TRUE; DBUG_RETURN(res); } @@ -573,8 +572,6 @@ pthread_handler_t ma_checkpoint_background(void *arg) sleeps= 1; pages_to_flush_before_next_checkpoint= 0; - pthread_detach_this_thread(); - for(;;) /* iterations of checkpoints and dirty page flushing */ { #if 0 /* good for testing, to do a lot of checkpoints, finds a lot of bugs */ @@ -723,7 +720,6 @@ pthread_handler_t ma_checkpoint_background(void *arg) DBUG_EXECUTE_IF("maria_checkpoint_indirect", level= CHECKPOINT_INDIRECT;); ma_checkpoint_execute(level, FALSE); } - my_service_thread_signal_end(&checkpoint_control); my_thread_end(); return 0; } diff --git a/storage/maria/ma_loghandler.c b/storage/maria/ma_loghandler.c index 1e7cc9483a2..8bcb84c2a20 100644 --- a/storage/maria/ma_loghandler.c +++ b/storage/maria/ma_loghandler.c @@ -54,7 +54,7 @@ static mysql_mutex_t LOCK_soft_sync; static mysql_cond_t COND_soft_sync; /** @brief control structure for checkpoint background thread */ static MA_SERVICE_THREAD_CONTROL soft_sync_control= - {THREAD_DEAD, FALSE, &LOCK_soft_sync, &COND_soft_sync}; + {0, FALSE, FALSE, &LOCK_soft_sync, &COND_soft_sync}; /* transaction log file descriptor */ @@ -8790,7 +8790,6 @@ ma_soft_sync_background( void *arg __attribute__((unused))) if (my_service_thread_sleep(&soft_sync_control, sleep)) break; } - my_service_thread_signal_end(&soft_sync_control); my_thread_end(); DBUG_RETURN(0); } @@ -8803,7 +8802,6 @@ ma_soft_sync_background( void *arg __attribute__((unused))) int translog_soft_sync_start(void) { - pthread_t th; int res= 0; uint32 min, max; DBUG_ENTER("translog_soft_sync_start"); @@ -8818,9 +8816,10 @@ int translog_soft_sync_start(void) soft_need_sync= 1; if (!(res= ma_service_thread_control_init(&soft_sync_control))) - if (!(res= mysql_thread_create(key_thread_soft_sync, - &th, NULL, ma_soft_sync_background, NULL))) - soft_sync_control.status= THREAD_RUNNING; + if ((res= mysql_thread_create(key_thread_soft_sync, + &soft_sync_control.thread, NULL, + ma_soft_sync_background, NULL))) + soft_sync_control.killed= TRUE; DBUG_RETURN(res); } diff --git a/storage/maria/ma_pagecache.c b/storage/maria/ma_pagecache.c index bba99703119..4630a84334b 100644 --- a/storage/maria/ma_pagecache.c +++ b/storage/maria/ma_pagecache.c @@ -500,8 +500,8 @@ static void test_key_cache(PAGECACHE *pagecache, const char *where, my_bool lock); #endif -#define PAGECACHE_HASH(p, f, pos) (((ulong) (pos) + \ - (ulong) (f).file) & (p->hash_entries-1)) +#define PAGECACHE_HASH(p, f, pos) (((size_t) (pos) + \ + (size_t) (f).file) & (p->hash_entries-1)) #define FILE_HASH(f,cache) ((uint) (f).file & (cache->changed_blocks_hash_size-1)) #define DEFAULT_PAGECACHE_DEBUG_LOG "pagecache_debug.log" @@ -641,10 +641,10 @@ static my_bool pagecache_fwrite(PAGECACHE *pagecache, { char buff[80]; uint len= my_sprintf(buff, - (buff, "fwrite: fd: %d id: %u page: %lu", + (buff, "fwrite: fd: %d id: %u page: %llu", filedesc->file, _ma_file_callback_to_id(filedesc->callback_data), - (ulong) pageno)); + pageno)); (void) translog_log_debug_info(0, LOGREC_DEBUG_INFO_QUERY, (uchar*) buff, len); } @@ -745,12 +745,12 @@ static inline uint next_power(uint value) */ -ulong init_pagecache(PAGECACHE *pagecache, size_t use_mem, +size_t init_pagecache(PAGECACHE *pagecache, size_t use_mem, uint division_limit, uint age_threshold, uint block_size, uint changed_blocks_hash_size, myf my_readwrite_flags) { - ulong blocks, hash_links, length; + size_t blocks, hash_links, length; int error; DBUG_ENTER("init_pagecache"); DBUG_ASSERT(block_size >= 512); @@ -787,10 +787,10 @@ ulong init_pagecache(PAGECACHE *pagecache, size_t use_mem, DBUG_PRINT("info", ("block_size: %u", block_size)); DBUG_ASSERT(((uint)(1 << pagecache->shift)) == block_size); - blocks= (ulong) (use_mem / (sizeof(PAGECACHE_BLOCK_LINK) + + blocks= use_mem / (sizeof(PAGECACHE_BLOCK_LINK) + 2 * sizeof(PAGECACHE_HASH_LINK) + sizeof(PAGECACHE_HASH_LINK*) * - 5/4 + block_size)); + 5/4 + block_size); /* Changed blocks hash needs to be a power of 2 */ changed_blocks_hash_size= my_round_up_to_next_power(MY_MAX(changed_blocks_hash_size, MIN_PAGECACHE_CHANGED_BLOCKS_HASH_SIZE)); @@ -826,7 +826,7 @@ ulong init_pagecache(PAGECACHE *pagecache, size_t use_mem, blocks--; /* Allocate memory for cache page buffers */ if ((pagecache->block_mem= - my_large_malloc((ulong) blocks * pagecache->block_size, + my_large_malloc(blocks * pagecache->block_size, MYF(MY_WME)))) { /* @@ -857,7 +857,7 @@ ulong init_pagecache(PAGECACHE *pagecache, size_t use_mem, blocks= blocks / 4*3; } pagecache->blocks_unused= blocks; - pagecache->disk_blocks= (long) blocks; + pagecache->disk_blocks= blocks; pagecache->hash_links= hash_links; pagecache->hash_links_used= 0; pagecache->free_hash_list= NULL; @@ -894,7 +894,7 @@ ulong init_pagecache(PAGECACHE *pagecache, size_t use_mem, pagecache->hash_links, (long) pagecache->hash_link_root)); pagecache->blocks= pagecache->disk_blocks > 0 ? pagecache->disk_blocks : 0; - DBUG_RETURN((ulong) pagecache->disk_blocks); + DBUG_RETURN((size_t)pagecache->disk_blocks); err: error= my_errno; @@ -985,11 +985,11 @@ static int flush_all_key_blocks(PAGECACHE *pagecache) So we disable it for now. */ #if NOT_USED /* keep disabled until code is fixed see above !! */ -ulong resize_pagecache(PAGECACHE *pagecache, +size_t resize_pagecache(PAGECACHE *pagecache, size_t use_mem, uint division_limit, uint age_threshold, uint changed_blocks_hash_size) { - ulong blocks; + size_t blocks; struct st_my_thread_var *thread; WQUEUE *wqueue; DBUG_ENTER("resize_pagecache"); @@ -1385,7 +1385,7 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block, ("linked block: %u:%1u status: %x #requests: %u #available: %u", PCBLOCK_NUMBER(pagecache, block), at_end, block->status, block->requests, pagecache->blocks_available)); - KEYCACHE_DBUG_ASSERT((ulong) pagecache->blocks_available <= + KEYCACHE_DBUG_ASSERT(pagecache->blocks_available <= pagecache->blocks_used); #endif DBUG_VOID_RETURN; @@ -2024,7 +2024,7 @@ restart: /* There are some never used blocks, take first of them */ block= &pagecache->block_root[pagecache->blocks_used]; block->buffer= ADD_TO_PTR(pagecache->block_mem, - ((ulong) pagecache->blocks_used* + (pagecache->blocks_used* pagecache->block_size), uchar*); pagecache->blocks_used++; @@ -4875,7 +4875,7 @@ my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache, LSN *min_rec_lsn) { my_bool error= 0; - ulong stored_list_size= 0; + size_t stored_list_size= 0; uint file_hash; char *ptr; LSN minimum_rec_lsn= LSN_MAX; diff --git a/storage/maria/ma_pagecache.h b/storage/maria/ma_pagecache.h index e212a7b7029..207ad69711f 100644 --- a/storage/maria/ma_pagecache.h +++ b/storage/maria/ma_pagecache.h @@ -131,21 +131,21 @@ typedef struct st_pagecache_hash_link PAGECACHE_HASH_LINK; typedef struct st_pagecache { size_t mem_size; /* specified size of the cache memory */ - ulong min_warm_blocks; /* min number of warm blocks; */ - ulong age_threshold; /* age threshold for hot blocks */ + size_t min_warm_blocks; /* min number of warm blocks; */ + size_t age_threshold; /* age threshold for hot blocks */ ulonglong time; /* total number of block link operations */ - ulong hash_entries; /* max number of entries in the hash table */ - ulong changed_blocks_hash_size; /* Number of hash buckets for file blocks */ - long hash_links; /* max number of hash links */ - long hash_links_used; /* number of hash links taken from free links pool */ - long disk_blocks; /* max number of blocks in the cache */ - ulong blocks_used; /* maximum number of concurrently used blocks */ - ulong blocks_unused; /* number of currently unused blocks */ - ulong blocks_changed; /* number of currently dirty blocks */ - ulong warm_blocks; /* number of blocks in warm sub-chain */ - ulong cnt_for_resize_op; /* counter to block resize operation */ - ulong blocks_available; /* number of blocks available in the LRU chain */ - long blocks; /* max number of blocks in the cache */ + size_t hash_entries; /* max number of entries in the hash table */ + size_t changed_blocks_hash_size;/* Number of hash buckets for file blocks */ + ssize_t hash_links; /* max number of hash links */ + ssize_t hash_links_used; /* number of hash links taken from free links pool */ + ssize_t disk_blocks; /* max number of blocks in the cache */ + size_t blocks_used; /* maximum number of concurrently used blocks */ + size_t blocks_unused; /* number of currently unused blocks */ + size_t blocks_changed; /* number of currently dirty blocks */ + size_t warm_blocks; /* number of blocks in warm sub-chain */ + size_t cnt_for_resize_op; /* counter to block resize operation */ + size_t blocks_available; /* number of blocks available in the LRU chain */ + ssize_t blocks; /* max number of blocks in the cache */ uint32 block_size; /* size of the page buffer of a cache block */ PAGECACHE_HASH_LINK **hash_root;/* arr. of entries into hash table buckets */ PAGECACHE_HASH_LINK *hash_link_root;/* memory for hash table links */ @@ -170,12 +170,12 @@ typedef struct st_pagecache */ ulonglong param_buff_size; /* size the memory allocated for the cache */ - ulong param_block_size; /* size of the blocks in the key cache */ - ulong param_division_limit; /* min. percentage of warm blocks */ - ulong param_age_threshold; /* determines when hot block is downgraded */ + size_t param_block_size; /* size of the blocks in the key cache */ + size_t param_division_limit; /* min. percentage of warm blocks */ + size_t param_age_threshold; /* determines when hot block is downgraded */ /* Statistics variables. These are reset in reset_pagecache_counters(). */ - ulong global_blocks_changed; /* number of currently dirty blocks */ + size_t global_blocks_changed; /* number of currently dirty blocks */ ulonglong global_cache_w_requests;/* number of write requests (write hits) */ ulonglong global_cache_write; /* number of writes from cache to files */ ulonglong global_cache_r_requests;/* number of read requests (read hits) */ @@ -208,11 +208,11 @@ typedef enum pagecache_flush_filter_result /* The default key cache */ extern PAGECACHE dflt_pagecache_var, *dflt_pagecache; -extern ulong init_pagecache(PAGECACHE *pagecache, size_t use_mem, +extern size_t init_pagecache(PAGECACHE *pagecache, size_t use_mem, uint division_limit, uint age_threshold, uint block_size, uint changed_blocks_hash_size, myf my_read_flags); -extern ulong resize_pagecache(PAGECACHE *pagecache, +extern size_t resize_pagecache(PAGECACHE *pagecache, size_t use_mem, uint division_limit, uint age_threshold, uint changed_blocks_hash_size); extern void change_pagecache_param(PAGECACHE *pagecache, uint division_limit, diff --git a/storage/maria/ma_servicethread.c b/storage/maria/ma_servicethread.c index e5c949a7571..d92c5315933 100644 --- a/storage/maria/ma_servicethread.c +++ b/storage/maria/ma_servicethread.c @@ -33,7 +33,7 @@ int ma_service_thread_control_init(MA_SERVICE_THREAD_CONTROL *control) DBUG_ENTER("ma_service_thread_control_init"); DBUG_PRINT("init", ("control 0x%lx", (ulong) control)); control->inited= TRUE; - control->status= THREAD_DEAD; /* not yet born == dead */ + control->killed= FALSE; res= (mysql_mutex_init(key_SERVICE_THREAD_CONTROL_lock, control->LOCK_control, MY_MUTEX_INIT_SLOW) || mysql_cond_init(key_SERVICE_THREAD_CONTROL_cond, @@ -60,20 +60,17 @@ void ma_service_thread_control_end(MA_SERVICE_THREAD_CONTROL *control) DBUG_PRINT("init", ("control 0x%lx", (ulong) control)); DBUG_ASSERT(control->inited); mysql_mutex_lock(control->LOCK_control); - if (control->status != THREAD_DEAD) /* thread was started OK */ + if (!control->killed) { DBUG_PRINT("info",("killing Maria background thread")); - control->status= THREAD_DYING; /* kill it */ - do /* and wait for it to be dead */ - { - /* wake it up if it was in a sleep */ - mysql_cond_broadcast(control->COND_control); - DBUG_PRINT("info",("waiting for Maria background thread to die")); - mysql_cond_wait(control->COND_control, control->LOCK_control); - } - while (control->status != THREAD_DEAD); + control->killed= TRUE; /* kill it */ + mysql_cond_broadcast(control->COND_control); + mysql_mutex_unlock(control->LOCK_control); + DBUG_PRINT("info", ("waiting for Maria background thread to die")); + pthread_join(control->thread, NULL); } - mysql_mutex_unlock(control->LOCK_control); + else + mysql_mutex_unlock(control->LOCK_control); mysql_mutex_destroy(control->LOCK_control); mysql_cond_destroy(control->COND_control); control->inited= FALSE; @@ -100,7 +97,7 @@ my_bool my_service_thread_sleep(MA_SERVICE_THREAD_CONTROL *control, DBUG_ENTER("my_service_thread_sleep"); DBUG_PRINT("init", ("control 0x%lx", (ulong) control)); mysql_mutex_lock(control->LOCK_control); - if (control->status == THREAD_DYING) + if (control->killed) { mysql_mutex_unlock(control->LOCK_control); DBUG_RETURN(TRUE); @@ -119,34 +116,8 @@ my_bool my_service_thread_sleep(MA_SERVICE_THREAD_CONTROL *control, control->LOCK_control, &abstime); } #endif - if (control->status == THREAD_DYING) + if (control->killed) res= TRUE; mysql_mutex_unlock(control->LOCK_control); DBUG_RETURN(res); } - - -/** - inform about thread exiting - - @param control control block -*/ - -void my_service_thread_signal_end(MA_SERVICE_THREAD_CONTROL *control) -{ - DBUG_ENTER("my_service_thread_signal_end"); - DBUG_PRINT("init", ("control 0x%lx", (ulong) control)); - mysql_mutex_lock(control->LOCK_control); - control->status = THREAD_DEAD; /* indicate that we are dead */ - /* - wake up ma_service_thread_control_end which may be waiting for - our death - */ - mysql_cond_broadcast(control->COND_control); - /* - broadcast was inside unlock because ma_service_thread_control_end - destroys mutex - */ - mysql_mutex_unlock(control->LOCK_control); - DBUG_VOID_RETURN; -} diff --git a/storage/maria/ma_servicethread.h b/storage/maria/ma_servicethread.h index ed578d93c24..254225bd608 100644 --- a/storage/maria/ma_servicethread.h +++ b/storage/maria/ma_servicethread.h @@ -16,12 +16,10 @@ #include <my_pthread.h> -enum ma_service_thread_state {THREAD_RUNNING, THREAD_DYING, THREAD_DEAD}; - typedef struct st_ma_service_thread_control { - /** 'kill' flag for the background thread */ - enum ma_service_thread_state status; + pthread_t thread; + my_bool killed; /** if thread module was inited or not */ my_bool inited; /** for killing the background thread */ @@ -35,4 +33,3 @@ int ma_service_thread_control_init(MA_SERVICE_THREAD_CONTROL *control); void ma_service_thread_control_end(MA_SERVICE_THREAD_CONTROL *control); my_bool my_service_thread_sleep(MA_SERVICE_THREAD_CONTROL *control, ulonglong sleep_time); -void my_service_thread_signal_end(MA_SERVICE_THREAD_CONTROL *control); diff --git a/storage/maria/ma_sort.c b/storage/maria/ma_sort.c index ef6e8506ac6..ac166cf4084 100644 --- a/storage/maria/ma_sort.c +++ b/storage/maria/ma_sort.c @@ -364,192 +364,196 @@ err: } /* find_all_keys */ -/* Search after all keys and place them in a temp. file */ - -pthread_handler_t _ma_thr_find_all_keys(void *arg) +static my_bool _ma_thr_find_all_keys_exec(MARIA_SORT_PARAM* sort_param) { - MARIA_SORT_PARAM *sort_param= (MARIA_SORT_PARAM*) arg; - int error; - size_t memavl, old_memavl; + int error= 0; + ulonglong memavl, old_memavl; longlong sortbuff_size; ha_keys UNINIT_VAR(keys), idx; uint sort_length; uint maxbuffer; - uchar **sort_keys=0; + uchar **sort_keys= NULL; + DBUG_ENTER("_ma_thr_find_all_keys_exec"); + DBUG_PRINT("enter", ("master: %d", sort_param->master)); - error=1; + if (sort_param->sort_info->got_error) + DBUG_RETURN(TRUE); - if (my_thread_init()) - goto err; - - { /* Add extra block since DBUG_ENTER declare variables */ - DBUG_ENTER("_ma_thr_find_all_keys"); - DBUG_PRINT("enter", ("master: %d", sort_param->master)); - if (sort_param->sort_info->got_error) - goto err; + set_sort_param_read_write(sort_param); - set_sort_param_read_write(sort_param); + my_b_clear(&sort_param->tempfile); + my_b_clear(&sort_param->tempfile_for_exceptions); + bzero((char*) &sort_param->buffpek, sizeof(sort_param->buffpek)); + bzero((char*) &sort_param->unique, sizeof(sort_param->unique)); - my_b_clear(&sort_param->tempfile); - my_b_clear(&sort_param->tempfile_for_exceptions); - bzero((char*) &sort_param->buffpek,sizeof(sort_param->buffpek)); - bzero((char*) &sort_param->unique, sizeof(sort_param->unique)); + sortbuff_size= sort_param->sortbuff_size; + memavl= MY_MAX(sortbuff_size, MIN_SORT_MEMORY); + idx= (ha_keys) sort_param->sort_info->max_records; + sort_length= sort_param->key_length; + maxbuffer= 1; - sortbuff_size= sort_param->sortbuff_size; - memavl= MY_MAX(sortbuff_size, MIN_SORT_MEMORY); - idx= (ha_keys) sort_param->sort_info->max_records; - sort_length= sort_param->key_length; - maxbuffer= 1; - - while (memavl >= MIN_SORT_MEMORY) + while (memavl >= MIN_SORT_MEMORY) + { + if ((my_off_t) (idx+1)*(sort_length+sizeof(char*)) <= (my_off_t) memavl) + keys= idx+1; + else if ((sort_param->sort_info->param->testflag & + (T_FORCE_SORT_MEMORY | T_CREATE_MISSING_KEYS)) == + T_FORCE_SORT_MEMORY) { - if ((my_off_t) (idx+1)*(sort_length+sizeof(char*)) <= (my_off_t) memavl) - keys= idx+1; - else if ((sort_param->sort_info->param->testflag & - (T_FORCE_SORT_MEMORY | T_CREATE_MISSING_KEYS)) == - T_FORCE_SORT_MEMORY) - { - /* - Use all of the given sort buffer for key data. - Allocate 1000 buffers at a start for new data. More buffers - will be allocated when needed. - */ - keys= memavl / (sort_length+sizeof(char*)); - maxbuffer= (uint) MY_MIN((ulonglong) 1000, (idx / keys)+1); - } - else - { - uint maxbuffer_org; - do - { - maxbuffer_org= maxbuffer; - if (memavl < sizeof(BUFFPEK)*maxbuffer || - (keys=(memavl-sizeof(BUFFPEK)*maxbuffer)/ - (sort_length+sizeof(char*))) <= 1 || - keys < maxbuffer) - { - _ma_check_print_error(sort_param->sort_info->param, - "aria_sort_buffer_size is too small. Current aria_sort_buffer_size: %llu rows: %llu sort_length: %u", - sortbuff_size, (ulonglong) idx, sort_length); - goto err; - } - } - while ((maxbuffer= (uint) (idx/(keys-1)+1)) != maxbuffer_org); - } - if ((sort_keys= (uchar **) - my_malloc(keys*(sort_length+sizeof(char*))+ - ((sort_param->keyinfo->flag & HA_FULLTEXT) ? - HA_FT_MAXBYTELEN : 0), MYF(0)))) + /* + Use all of the given sort buffer for key data. + Allocate 1000 buffers at a start for new data. More buffers + will be allocated when needed. + */ + keys= memavl / (sort_length+sizeof(char*)); + maxbuffer= (uint) MY_MIN((ulonglong) 1000, (idx / keys)+1); + } + else + { + uint maxbuffer_org; + do { - if (my_init_dynamic_array(&sort_param->buffpek, sizeof(BUFFPEK), - maxbuffer, MY_MIN(maxbuffer/2, 1000), MYF(0))) + maxbuffer_org= maxbuffer; + if (memavl < sizeof(BUFFPEK)*maxbuffer || + (keys=(memavl-sizeof(BUFFPEK)*maxbuffer)/ + (sort_length+sizeof(char*))) <= 1 || + keys < maxbuffer) { - my_free(sort_keys); - sort_keys= (uchar **) NULL; /* for err: label */ + _ma_check_print_error(sort_param->sort_info->param, + "aria_sort_buffer_size is too small. Current aria_sort_buffer_size: %llu rows: %llu sort_length: %u", + sortbuff_size, (ulonglong) idx, sort_length); + goto err; } - else - break; } - old_memavl= memavl; - if ((memavl= memavl/4*3) < MIN_SORT_MEMORY && - old_memavl > MIN_SORT_MEMORY) - memavl= MIN_SORT_MEMORY; + while ((maxbuffer= (uint) (idx/(keys-1)+1)) != maxbuffer_org); } - if (memavl < MIN_SORT_MEMORY) + if ((sort_keys= (uchar **) + my_malloc(keys*(sort_length+sizeof(char*))+ + ((sort_param->keyinfo->flag & HA_FULLTEXT) ? + HA_FT_MAXBYTELEN : 0), MYF(0)))) { - /* purecov: begin inspected */ - _ma_check_print_error(sort_param->sort_info->param, - "aria_sort_buffer_size is too small. Current aria_sort_buffer_size: %llu rows: %llu sort_length: %u", - sortbuff_size, (ulonglong) idx, sort_length); - my_errno= ENOMEM; - goto err; - /* purecov: end inspected */ + if (my_init_dynamic_array(&sort_param->buffpek, sizeof(BUFFPEK), + maxbuffer, MY_MIN(maxbuffer / 2, 1000), MYF(0))) + { + my_free(sort_keys); + sort_keys= NULL; /* Safety against double free on error. */ + } + else + break; } + old_memavl= memavl; + if ((memavl= memavl/4*3) < MIN_SORT_MEMORY && + old_memavl > MIN_SORT_MEMORY) + memavl= MIN_SORT_MEMORY; + } + if (memavl < MIN_SORT_MEMORY) + { + /* purecov: begin inspected */ + _ma_check_print_error(sort_param->sort_info->param, + "aria_sort_buffer_size is too small. Current aria_sort_buffer_size: %llu rows: %llu sort_length: %u", + sortbuff_size, (ulonglong) idx, sort_length); + my_errno= ENOMEM; + goto err; + /* purecov: end inspected */ + } - if (sort_param->sort_info->param->testflag & T_VERBOSE) - my_fprintf(stdout, - "Key %d - Allocating buffer for %llu keys\n", - sort_param->key + 1, (ulonglong) keys); - sort_param->sort_keys= sort_keys; + if (sort_param->sort_info->param->testflag & T_VERBOSE) + my_fprintf(stdout, + "Key %d - Allocating buffer for %llu keys\n", + sort_param->key + 1, (ulonglong) keys); + sort_param->sort_keys= sort_keys; - idx= error= 0; - sort_keys[0]= (uchar*) (sort_keys+keys); + idx= error= 0; + sort_keys[0]= (uchar*) (sort_keys+keys); - DBUG_PRINT("info", ("reading keys")); - while (!(error= sort_param->sort_info->got_error) && - !(error= (*sort_param->key_read)(sort_param, sort_keys[idx]))) + DBUG_PRINT("info", ("reading keys")); + while (!(error= sort_param->sort_info->got_error) && + !(error= (*sort_param->key_read)(sort_param, sort_keys[idx]))) + { + if (sort_param->real_key_length > sort_param->key_length) { - if (sort_param->real_key_length > sort_param->key_length) - { - if (write_key(sort_param, sort_keys[idx], - &sort_param->tempfile_for_exceptions)) - goto err; - continue; - } - - if (++idx == keys) - { - if (sort_param->write_keys(sort_param, sort_keys, idx - 1, - (BUFFPEK *)alloc_dynamic(&sort_param-> - buffpek), - &sort_param->tempfile)) - goto err; - sort_keys[0]= (uchar*) (sort_keys+keys); - memcpy(sort_keys[0], sort_keys[idx - 1], - (size_t) sort_param->key_length); - idx= 1; - } - sort_keys[idx]= sort_keys[idx - 1] + sort_param->key_length; + if (write_key(sort_param, sort_keys[idx], + &sort_param->tempfile_for_exceptions)) + goto err; + continue; } - if (error > 0) - goto err; - if (sort_param->buffpek.elements) + + if (++idx == keys) { - if (sort_param->write_keys(sort_param,sort_keys, idx, - (BUFFPEK *) alloc_dynamic(&sort_param-> - buffpek), + if (sort_param->write_keys(sort_param, sort_keys, idx - 1, + (BUFFPEK *)alloc_dynamic(&sort_param->buffpek), &sort_param->tempfile)) goto err; - sort_param->keys= (sort_param->buffpek.elements - 1) * (keys - 1) + idx; + sort_keys[0]= (uchar*) (sort_keys+keys); + memcpy(sort_keys[0], sort_keys[idx - 1], (size_t) sort_param->key_length); + idx= 1; } - else - sort_param->keys= idx; + sort_keys[idx]= sort_keys[idx - 1] + sort_param->key_length; + } + if (error > 0) + goto err; + if (sort_param->buffpek.elements) + { + if (sort_param->write_keys(sort_param,sort_keys, idx, + (BUFFPEK *) alloc_dynamic(&sort_param->buffpek), + &sort_param->tempfile)) + goto err; + sort_param->keys= (sort_param->buffpek.elements - 1) * (keys - 1) + idx; + } + else + sort_param->keys= idx; - goto ok; + DBUG_RETURN(FALSE); err: - DBUG_PRINT("error", ("got some error")); - sort_param->sort_info->got_error= 1; /* no need to protect with a mutex */ - my_free(sort_keys); - sort_param->sort_keys= 0; - delete_dynamic(& sort_param->buffpek); - close_cached_file(&sort_param->tempfile); - close_cached_file(&sort_param->tempfile_for_exceptions); - -ok: - free_root(&sort_param->wordroot, MYF(0)); - /* - Detach from the share if the writer is involved. Avoid others to - be blocked. This includes a flush of the write buffer. This will - also indicate EOF to the readers. - That means that a writer always gets here first and readers - - only when they see EOF. But if a reader finishes prematurely - because of an error it may reach this earlier - don't allow it - to detach the writer thread. - */ - if (sort_param->master && sort_param->sort_info->info->rec_cache.share) - remove_io_thread(&sort_param->sort_info->info->rec_cache); - - /* Readers detach from the share if any. Avoid others to be blocked. */ - if (sort_param->read_cache.share) - remove_io_thread(&sort_param->read_cache); - - mysql_mutex_lock(&sort_param->sort_info->mutex); - if (!--sort_param->sort_info->threads_running) - mysql_cond_signal(&sort_param->sort_info->cond); - mysql_mutex_unlock(&sort_param->sort_info->mutex); - DBUG_PRINT("exit", ("======== ending thread ========")); - } + DBUG_PRINT("error", ("got some error")); + my_free(sort_keys); + sort_param->sort_keys= 0; + delete_dynamic(& sort_param->buffpek); + close_cached_file(&sort_param->tempfile); + close_cached_file(&sort_param->tempfile_for_exceptions); + + DBUG_RETURN(TRUE); +} + +/* Search after all keys and place them in a temp. file */ + +pthread_handler_t _ma_thr_find_all_keys(void *arg) +{ + MARIA_SORT_PARAM *sort_param= (MARIA_SORT_PARAM*) arg; + my_bool error= FALSE; + /* If my_thread_init fails */ + if (my_thread_init() || _ma_thr_find_all_keys_exec(sort_param)) + error= TRUE; + + /* + Thread must clean up after itself. + */ + free_root(&sort_param->wordroot, MYF(0)); + /* + Detach from the share if the writer is involved. Avoid others to + be blocked. This includes a flush of the write buffer. This will + also indicate EOF to the readers. + That means that a writer always gets here first and readers - + only when they see EOF. But if a reader finishes prematurely + because of an error it may reach this earlier - don't allow it + to detach the writer thread. + */ + if (sort_param->master && sort_param->sort_info->info->rec_cache.share) + remove_io_thread(&sort_param->sort_info->info->rec_cache); + + /* Readers detach from the share if any. Avoid others to be blocked. */ + if (sort_param->read_cache.share) + remove_io_thread(&sort_param->read_cache); + + mysql_mutex_lock(&sort_param->sort_info->mutex); + if (error) + sort_param->sort_info->got_error= 1; + + if (!--sort_param->sort_info->threads_running) + mysql_cond_signal(&sort_param->sort_info->cond); + mysql_mutex_unlock(&sort_param->sort_info->mutex); + my_thread_end(); return NULL; } @@ -559,7 +563,7 @@ int _ma_thr_write_keys(MARIA_SORT_PARAM *sort_param) { MARIA_SORT_INFO *sort_info=sort_param->sort_info; HA_CHECK *param=sort_info->param; - ulong UNINIT_VAR(length), keys; + size_t UNINIT_VAR(length), keys; double *rec_per_key_part= param->new_rec_per_key_part; int got_error=sort_info->got_error; uint i; diff --git a/storage/maria/maria_def.h b/storage/maria/maria_def.h index 7337b01a981..a4fac8c088a 100644 --- a/storage/maria/maria_def.h +++ b/storage/maria/maria_def.h @@ -67,7 +67,8 @@ typedef struct st_maria_sort_info pgcache_page_no_t page; ha_rows max_records; uint current_key, total_keys; - uint got_error, threads_running; + volatile uint got_error; + uint threads_running; myf myf_rw; enum data_file_type new_data_file_type, org_data_file_type; } MARIA_SORT_INFO; |