summaryrefslogtreecommitdiff
path: root/myisam/sort.c
diff options
context:
space:
mode:
Diffstat (limited to 'myisam/sort.c')
-rw-r--r--myisam/sort.c134
1 files changed, 80 insertions, 54 deletions
diff --git a/myisam/sort.c b/myisam/sort.c
index 74e0d973c9c..cb7893b37bc 100644
--- a/myisam/sort.c
+++ b/myisam/sort.c
@@ -148,7 +148,8 @@ int _create_index_by_sort(MI_SORT_PARAM *info,my_bool no_messages,
skr=maxbuffer;
if (memavl < sizeof(BUFFPEK)*(uint) maxbuffer ||
(keys=(memavl-sizeof(BUFFPEK)*(uint) maxbuffer)/
- (sort_length+sizeof(char*))) <= 1)
+ (sort_length+sizeof(char*))) <= 1 ||
+ keys < (uint) maxbuffer)
{
mi_check_print_error(info->sort_info->param,
"sort_buffer_size is to small");
@@ -309,7 +310,7 @@ static ha_rows NEAR_F find_all_keys(MI_SORT_PARAM *info, uint keys,
pthread_handler_t thr_find_all_keys(void *arg)
{
- MI_SORT_PARAM *info= (MI_SORT_PARAM*) arg;
+ MI_SORT_PARAM *sort_param= (MI_SORT_PARAM*) arg;
int error;
uint memavl,old_memavl,keys,sort_length;
uint idx, maxbuffer;
@@ -321,32 +322,34 @@ pthread_handler_t thr_find_all_keys(void *arg)
if (my_thread_init())
goto err;
- if (info->sort_info->got_error)
+ DBUG_ENTER("thr_find_all_keys");
+ DBUG_PRINT("enter", ("master: %d", sort_param->master));
+ if (sort_param->sort_info->got_error)
goto err;
- if (info->keyinfo->flag & HA_VAR_LENGTH_KEY)
+ if (sort_param->keyinfo->flag & HA_VAR_LENGTH_KEY)
{
- info->write_keys=write_keys_varlen;
- info->read_to_buffer=read_to_buffer_varlen;
- info->write_key=write_merge_key_varlen;
+ sort_param->write_keys= write_keys_varlen;
+ sort_param->read_to_buffer= read_to_buffer_varlen;
+ sort_param->write_key= write_merge_key_varlen;
}
else
{
- info->write_keys=write_keys;
- info->read_to_buffer=read_to_buffer;
- info->write_key=write_merge_key;
+ sort_param->write_keys= write_keys;
+ sort_param->read_to_buffer= read_to_buffer;
+ sort_param->write_key= write_merge_key;
}
- my_b_clear(&info->tempfile);
- my_b_clear(&info->tempfile_for_exceptions);
- bzero((char*) &info->buffpek,sizeof(info->buffpek));
- bzero((char*) &info->unique, sizeof(info->unique));
+ my_b_clear(&sort_param->tempfile);
+ my_b_clear(&sort_param->tempfile_for_exceptions);
+ bzero((char*) &sort_param->buffpek, sizeof(sort_param->buffpek));
+ bzero((char*) &sort_param->unique, sizeof(sort_param->unique));
sort_keys= (uchar **) NULL;
- memavl=max(info->sortbuff_size, MIN_SORT_MEMORY);
- idx= info->sort_info->max_records;
- sort_length= info->key_length;
- maxbuffer= 1;
+ memavl= max(sort_param->sortbuff_size, MIN_SORT_MEMORY);
+ idx= sort_param->sort_info->max_records;
+ sort_length= sort_param->key_length;
+ maxbuffer= 1;
while (memavl >= MIN_SORT_MEMORY)
{
@@ -361,20 +364,22 @@ pthread_handler_t thr_find_all_keys(void *arg)
skr=maxbuffer;
if (memavl < sizeof(BUFFPEK)*maxbuffer ||
(keys=(memavl-sizeof(BUFFPEK)*maxbuffer)/
- (sort_length+sizeof(char*))) <= 1)
+ (sort_length+sizeof(char*))) <= 1 ||
+ keys < (uint) maxbuffer)
{
- mi_check_print_error(info->sort_info->param,
+ mi_check_print_error(sort_param->sort_info->param,
"sort_buffer_size is to small");
goto err;
}
}
while ((maxbuffer= (int) (idx/(keys-1)+1)) != skr);
}
- if ((sort_keys=(uchar **)my_malloc(keys*(sort_length+sizeof(char*))+
- ((info->keyinfo->flag & HA_FULLTEXT) ?
- HA_FT_MAXBYTELEN : 0), MYF(0))))
+ if ((sort_keys= (uchar**)
+ my_malloc(keys*(sort_length+sizeof(char*))+
+ ((sort_param->keyinfo->flag & HA_FULLTEXT) ?
+ HA_FT_MAXBYTELEN : 0), MYF(0))))
{
- if (my_init_dynamic_array(&info->buffpek, sizeof(BUFFPEK),
+ if (my_init_dynamic_array(&sort_param->buffpek, sizeof(BUFFPEK),
maxbuffer, maxbuffer/2))
{
my_free((gptr) sort_keys,MYF(0));
@@ -389,69 +394,87 @@ pthread_handler_t thr_find_all_keys(void *arg)
}
if (memavl < MIN_SORT_MEMORY)
{
- mi_check_print_error(info->sort_info->param,"Sort buffer to small"); /* purecov: tested */
+ mi_check_print_error(sort_param->sort_info->param, "Sort buffer too small");
goto err; /* purecov: tested */
}
- if (info->sort_info->param->testflag & T_VERBOSE)
- printf("Key %d - Allocating buffer for %d keys\n",info->key+1,keys);
- info->sort_keys=sort_keys;
+ if (sort_param->sort_info->param->testflag & T_VERBOSE)
+ printf("Key %d - Allocating buffer for %d keys\n",
+ sort_param->key + 1, keys);
+ sort_param->sort_keys= sort_keys;
idx=error=0;
sort_keys[0]=(uchar*) (sort_keys+keys);
- while (!(error=info->sort_info->got_error) &&
- !(error=(*info->key_read)(info,sort_keys[idx])))
+ DBUG_PRINT("info", ("reading keys"));
+ while (!(error= sort_param->sort_info->got_error) &&
+ !(error= (*sort_param->key_read)(sort_param, sort_keys[idx])))
{
- if (info->real_key_length > info->key_length)
+ if (sort_param->real_key_length > sort_param->key_length)
{
- if (write_key(info,sort_keys[idx], &info->tempfile_for_exceptions))
+ if (write_key(sort_param, sort_keys[idx],
+ &sort_param->tempfile_for_exceptions))
goto err;
continue;
}
if (++idx == keys)
{
- if (info->write_keys(info,sort_keys,idx-1,
- (BUFFPEK *)alloc_dynamic(&info->buffpek),
- &info->tempfile))
+ if (sort_param->write_keys(sort_param, sort_keys, idx - 1,
+ (BUFFPEK*) alloc_dynamic(&sort_param->buffpek),
+ &sort_param->tempfile))
goto err;
sort_keys[0]=(uchar*) (sort_keys+keys);
- memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length);
+ memcpy(sort_keys[0], sort_keys[idx - 1], (size_t) sort_param->key_length);
idx=1;
}
- sort_keys[idx]=sort_keys[idx-1]+info->key_length;
+ sort_keys[idx]= sort_keys[idx - 1] + sort_param->key_length;
}
if (error > 0)
goto err;
- if (info->buffpek.elements)
+ if (sort_param->buffpek.elements)
{
- if (info->write_keys(info,sort_keys, idx,
- (BUFFPEK *) alloc_dynamic(&info->buffpek), &info->tempfile))
+ if (sort_param->write_keys(sort_param, sort_keys, idx,
+ (BUFFPEK*) alloc_dynamic(&sort_param->buffpek),
+ &sort_param->tempfile))
goto err;
- info->keys=(info->buffpek.elements-1)*(keys-1)+idx;
+ sort_param->keys= (sort_param->buffpek.elements - 1) * (keys - 1) + idx;
}
else
- info->keys=idx;
+ sort_param->keys= idx;
- info->sort_keys_length=keys;
+ sort_param->sort_keys_length= keys;
goto ok;
err:
- info->sort_info->got_error=1; /* no need to protect this with a mutex */
+ DBUG_PRINT("error", ("got some error"));
+ sort_param->sort_info->got_error= 1; /* no need to protect with a mutex */
if (sort_keys)
my_free((gptr) sort_keys,MYF(0));
- info->sort_keys=0;
- delete_dynamic(& info->buffpek);
- close_cached_file(&info->tempfile);
- close_cached_file(&info->tempfile_for_exceptions);
+ sort_param->sort_keys= 0;
+ delete_dynamic(& sort_param->buffpek);
+ close_cached_file(&sort_param->tempfile);
+ close_cached_file(&sort_param->tempfile_for_exceptions);
ok:
- remove_io_thread(&info->read_cache);
- pthread_mutex_lock(&info->sort_info->mutex);
- info->sort_info->threads_running--;
- pthread_cond_signal(&info->sort_info->cond);
- pthread_mutex_unlock(&info->sort_info->mutex);
+ /*
+ Detach from the share if the writer is involved. Avoid others to
+ be blocked. This includes a flush of the write buffer. This will
+ also indicate EOF to the readers.
+ */
+ if (sort_param->sort_info->info->rec_cache.share)
+ remove_io_thread(&sort_param->sort_info->info->rec_cache);
+
+ /* Readers detach from the share if any. Avoid others to be blocked. */
+ if (sort_param->read_cache.share)
+ remove_io_thread(&sort_param->read_cache);
+
+ pthread_mutex_lock(&sort_param->sort_info->mutex);
+ if (!--sort_param->sort_info->threads_running)
+ pthread_cond_signal(&sort_param->sort_info->cond);
+ pthread_mutex_unlock(&sort_param->sort_info->mutex);
+
+ DBUG_PRINT("exit", ("======== ending thread ========"));
my_thread_end();
return NULL;
}
@@ -469,6 +492,7 @@ int thr_write_keys(MI_SORT_PARAM *sort_param)
MYISAM_SHARE *share=info->s;
MI_SORT_PARAM *sinfo;
byte *mergebuf=0;
+ DBUG_ENTER("thr_write_keys");
LINT_INIT(length);
for (i= 0, sinfo= sort_param ;
@@ -478,6 +502,8 @@ int thr_write_keys(MI_SORT_PARAM *sort_param)
if (!sinfo->sort_keys)
{
got_error=1;
+ my_free(mi_get_rec_buff_ptr(info, sinfo->rec_buff),
+ MYF(MY_ALLOW_ZERO_PTR));
continue;
}
if (!got_error)
@@ -605,7 +631,7 @@ int thr_write_keys(MI_SORT_PARAM *sort_param)
}
}
my_free((gptr) mergebuf,MYF(MY_ALLOW_ZERO_PTR));
- return got_error;
+ DBUG_RETURN(got_error);
}
#endif /* THREAD */