diff options
Diffstat (limited to 'storage/innobase/row/row0ftsort.cc')
-rw-r--r-- | storage/innobase/row/row0ftsort.cc | 142 |
1 files changed, 89 insertions, 53 deletions
diff --git a/storage/innobase/row/row0ftsort.cc b/storage/innobase/row/row0ftsort.cc index 275fedbfb5d..087d2152826 100644 --- a/storage/innobase/row/row0ftsort.cc +++ b/storage/innobase/row/row0ftsort.cc @@ -265,6 +265,9 @@ row_fts_psort_info_init( psort_info[j].child_status = 0; psort_info[j].state = 0; psort_info[j].psort_common = common_info; + psort_info[j].error = DB_SUCCESS; + psort_info[j].memory_used = 0; + mutex_create(fts_pll_tokenize_mutex_key, &psort_info[j].mutex, SYNC_FTS_TOKENIZE); } /* Initialize merge_info structures parallel merge and insert @@ -312,6 +315,8 @@ row_fts_psort_info_destroy( } mem_free(psort_info[j].merge_file[i]); } + + mutex_free(&psort_info[j].mutex); } os_event_free(merge_info[0].psort_common->sort_event); @@ -545,6 +550,35 @@ row_merge_fts_doc_tokenize( } /*********************************************************************//** +Get next doc item from fts_doc_list */ +UNIV_INLINE +void +row_merge_fts_get_next_doc_item( +/*============================*/ + fts_psort_t* psort_info, /*!< in: psort_info */ + fts_doc_item_t** doc_item) /*!< in/out: doc item */ +{ + if (*doc_item != NULL) { + ut_free(*doc_item); + } + + mutex_enter(&psort_info->mutex); + + *doc_item = UT_LIST_GET_FIRST(psort_info->fts_doc_list); + if (*doc_item != NULL) { + UT_LIST_REMOVE(doc_list, psort_info->fts_doc_list, + *doc_item); + + ut_ad(psort_info->memory_used >= sizeof(fts_doc_item_t) + + (*doc_item)->field->len); + psort_info->memory_used -= sizeof(fts_doc_item_t) + + (*doc_item)->field->len; + } + + mutex_exit(&psort_info->mutex); +} + +/*********************************************************************//** Function performs parallel tokenization of the incoming doc strings. It also performs the initial in memory sort of the parsed records. @return OS_THREAD_DUMMY_RETURN */ @@ -557,7 +591,6 @@ fts_parallel_tokenization( fts_psort_t* psort_info = (fts_psort_t*) arg; ulint i; fts_doc_item_t* doc_item = NULL; - fts_doc_item_t* prev_doc_item = NULL; row_merge_buf_t** buf; ibool processed = FALSE; merge_file_t** merge_file; @@ -575,7 +608,7 @@ fts_parallel_tokenization( dict_field_t* idx_field; fts_tokenize_ctx_t t_ctx; ulint retried = 0; - ut_ad(psort_info); + dberr_t error = DB_SUCCESS; ut_ad(psort_info); @@ -599,11 +632,7 @@ fts_parallel_tokenization( block = psort_info->merge_block; zip_size = dict_table_zip_size(table); - doc_item = UT_LIST_GET_FIRST(psort_info->fts_doc_list); - - if (doc_item) { - prev_doc_item = doc_item; - } + row_merge_fts_get_next_doc_item(psort_info, &doc_item); t_ctx.cached_stopword = table->fts->cache->stopword_info.cached_stopword; processed = TRUE; @@ -613,17 +642,8 @@ loop: last_doc_id = doc_item->doc_id; - if (!(dfield->data) - || dfield_get_len(dfield) == UNIV_SQL_NULL) { - num_doc_processed++; - doc_item = UT_LIST_GET_NEXT(doc_list, doc_item); - - /* Always remember the last doc_item we processed */ - if (doc_item) { - prev_doc_item = doc_item; - } - continue; - } + ut_ad (dfield->data != NULL + && dfield_get_len(dfield) != UNIV_SQL_NULL); /* If finish processing the last item, update "doc" with strings in the doc_item, otherwise continue processing last @@ -671,11 +691,13 @@ loop: num_doc_processed++; if (fts_enable_diag_print && num_doc_processed % 10000 == 1) { - fprintf(stderr, "number of doc processed %d\n", + ib_logf(IB_LOG_LEVEL_INFO, + "number of doc processed %d\n", (int) num_doc_processed); #ifdef FTS_INTERNAL_DIAG_PRINT for (i = 0; i < FTS_NUM_AUX_INDEX; i++) { - fprintf(stderr, "ID %d, partition %d, word " + ib_logf(IB_LOG_LEVEL_INFO, + "ID %d, partition %d, word " "%d\n",(int) psort_info->psort_id, (int) i, (int) mycount[i]); } @@ -684,19 +706,10 @@ loop: mem_heap_empty(blob_heap); - if (doc_item->field->data) { - ut_free(doc_item->field->data); - doc_item->field->data = NULL; - } - - doc_item = UT_LIST_GET_NEXT(doc_list, doc_item); + row_merge_fts_get_next_doc_item(psort_info, &doc_item); - /* Always remember the last doc_item we processed */ - if (doc_item) { - prev_doc_item = doc_item; - if (last_doc_id != doc_item->doc_id) { - t_ctx.init_pos = 0; - } + if (doc_item && last_doc_id != doc_item->doc_id) { + t_ctx.init_pos = 0; } } @@ -707,9 +720,14 @@ loop: row_merge_buf_write(buf[t_ctx.buf_used], merge_file[t_ctx.buf_used], block[t_ctx.buf_used]); - row_merge_write(merge_file[t_ctx.buf_used]->fd, - merge_file[t_ctx.buf_used]->offset++, - block[t_ctx.buf_used]); + + if (!row_merge_write(merge_file[t_ctx.buf_used]->fd, + merge_file[t_ctx.buf_used]->offset++, + block[t_ctx.buf_used])) { + error = DB_TEMP_FILE_WRITE_FAILURE; + goto func_exit; + } + UNIV_MEM_INVALID(block[t_ctx.buf_used][0], srv_sort_buf_size); buf[t_ctx.buf_used] = row_merge_buf_empty(buf[t_ctx.buf_used]); mycount[t_ctx.buf_used] += t_ctx.rows_added[t_ctx.buf_used]; @@ -721,13 +739,13 @@ loop: /* Parent done scanning, and if finish processing all the docs, exit */ if (psort_info->state == FTS_PARENT_COMPLETE) { - if (num_doc_processed >= UT_LIST_GET_LEN( - psort_info->fts_doc_list)) { + if (UT_LIST_GET_LEN(psort_info->fts_doc_list) == 0) { goto exit; } else if (retried > 10000) { ut_ad(!doc_item); /* retied too many times and cannot get new record */ - fprintf(stderr, "InnoDB: FTS parallel sort processed " + ib_logf(IB_LOG_LEVEL_ERROR, + "InnoDB: FTS parallel sort processed " "%lu records, the sort queue has " "%lu records. But sort cannot get " "the next records", num_doc_processed, @@ -735,21 +753,18 @@ loop: psort_info->fts_doc_list)); goto exit; } + } else if (psort_info->state == FTS_PARENT_EXITING) { + /* Parent abort */ + goto func_exit; } - if (doc_item) { - doc_item = UT_LIST_GET_NEXT(doc_list, doc_item); - } else if (prev_doc_item) { - os_thread_yield(); - doc_item = UT_LIST_GET_NEXT(doc_list, prev_doc_item); - } else { + if (doc_item == NULL) { os_thread_yield(); - doc_item = UT_LIST_GET_FIRST(psort_info->fts_doc_list); } - if (doc_item) { - prev_doc_item = doc_item; + row_merge_fts_get_next_doc_item(psort_info, &doc_item); + if (doc_item != NULL) { if (last_doc_id != doc_item->doc_id) { t_ctx.init_pos = 0; } @@ -799,9 +814,12 @@ exit: never flush to temp file, it can be held all in memory */ if (merge_file[i]->offset != 0) { - row_merge_write(merge_file[i]->fd, + if (!row_merge_write(merge_file[i]->fd, merge_file[i]->offset++, - block[i]); + block[i])) { + error = DB_TEMP_FILE_WRITE_FAILURE; + goto func_exit; + } UNIV_MEM_INVALID(block[i][0], srv_sort_buf_size); @@ -817,19 +835,24 @@ exit: } for (i = 0; i < FTS_NUM_AUX_INDEX; i++) { - if (!merge_file[i]->offset) { continue; } tmpfd[i] = row_merge_file_create_low(); if (tmpfd[i] < 0) { + error = DB_OUT_OF_MEMORY; + goto func_exit; + } + + error = row_merge_sort(psort_info->psort_common->trx, + psort_info->psort_common->dup, + merge_file[i], block[i], &tmpfd[i]); + if (error != DB_SUCCESS) { + close(tmpfd[i]); goto func_exit; } - row_merge_sort(psort_info->psort_common->trx, - psort_info->psort_common->dup, - merge_file[i], block[i], &tmpfd[i]); total_rec += merge_file[i]->n_rec; close(tmpfd[i]); } @@ -841,6 +864,19 @@ func_exit: mem_heap_free(blob_heap); + mutex_enter(&psort_info->mutex); + psort_info->error = error; + mutex_exit(&psort_info->mutex); + + if (UT_LIST_GET_LEN(psort_info->fts_doc_list) > 0) { + ut_ad(error != DB_SUCCESS); + } + + /* Free fts doc list in case of error. */ + do { + row_merge_fts_get_next_doc_item(psort_info, &doc_item); + } while (doc_item != NULL); + psort_info->child_status = FTS_CHILD_COMPLETE; os_event_set(psort_info->psort_common->sort_event); psort_info->child_status = FTS_CHILD_EXITING; |