summaryrefslogtreecommitdiff
path: root/storage/innobase/row/row0ftsort.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/innobase/row/row0ftsort.cc')
-rw-r--r--storage/innobase/row/row0ftsort.cc142
1 files changed, 89 insertions, 53 deletions
diff --git a/storage/innobase/row/row0ftsort.cc b/storage/innobase/row/row0ftsort.cc
index 275fedbfb5d..087d2152826 100644
--- a/storage/innobase/row/row0ftsort.cc
+++ b/storage/innobase/row/row0ftsort.cc
@@ -265,6 +265,9 @@ row_fts_psort_info_init(
psort_info[j].child_status = 0;
psort_info[j].state = 0;
psort_info[j].psort_common = common_info;
+ psort_info[j].error = DB_SUCCESS;
+ psort_info[j].memory_used = 0;
+ mutex_create(fts_pll_tokenize_mutex_key, &psort_info[j].mutex, SYNC_FTS_TOKENIZE);
}
/* Initialize merge_info structures parallel merge and insert
@@ -312,6 +315,8 @@ row_fts_psort_info_destroy(
}
mem_free(psort_info[j].merge_file[i]);
}
+
+ mutex_free(&psort_info[j].mutex);
}
os_event_free(merge_info[0].psort_common->sort_event);
@@ -545,6 +550,35 @@ row_merge_fts_doc_tokenize(
}
/*********************************************************************//**
+Get next doc item from fts_doc_list */
+UNIV_INLINE
+void
+row_merge_fts_get_next_doc_item(
+/*============================*/
+ fts_psort_t* psort_info, /*!< in: psort_info */
+ fts_doc_item_t** doc_item) /*!< in/out: doc item */
+{
+ if (*doc_item != NULL) {
+ ut_free(*doc_item);
+ }
+
+ mutex_enter(&psort_info->mutex);
+
+ *doc_item = UT_LIST_GET_FIRST(psort_info->fts_doc_list);
+ if (*doc_item != NULL) {
+ UT_LIST_REMOVE(doc_list, psort_info->fts_doc_list,
+ *doc_item);
+
+ ut_ad(psort_info->memory_used >= sizeof(fts_doc_item_t)
+ + (*doc_item)->field->len);
+ psort_info->memory_used -= sizeof(fts_doc_item_t)
+ + (*doc_item)->field->len;
+ }
+
+ mutex_exit(&psort_info->mutex);
+}
+
+/*********************************************************************//**
Function performs parallel tokenization of the incoming doc strings.
It also performs the initial in memory sort of the parsed records.
@return OS_THREAD_DUMMY_RETURN */
@@ -557,7 +591,6 @@ fts_parallel_tokenization(
fts_psort_t* psort_info = (fts_psort_t*) arg;
ulint i;
fts_doc_item_t* doc_item = NULL;
- fts_doc_item_t* prev_doc_item = NULL;
row_merge_buf_t** buf;
ibool processed = FALSE;
merge_file_t** merge_file;
@@ -575,7 +608,7 @@ fts_parallel_tokenization(
dict_field_t* idx_field;
fts_tokenize_ctx_t t_ctx;
ulint retried = 0;
- ut_ad(psort_info);
+ dberr_t error = DB_SUCCESS;
ut_ad(psort_info);
@@ -599,11 +632,7 @@ fts_parallel_tokenization(
block = psort_info->merge_block;
zip_size = dict_table_zip_size(table);
- doc_item = UT_LIST_GET_FIRST(psort_info->fts_doc_list);
-
- if (doc_item) {
- prev_doc_item = doc_item;
- }
+ row_merge_fts_get_next_doc_item(psort_info, &doc_item);
t_ctx.cached_stopword = table->fts->cache->stopword_info.cached_stopword;
processed = TRUE;
@@ -613,17 +642,8 @@ loop:
last_doc_id = doc_item->doc_id;
- if (!(dfield->data)
- || dfield_get_len(dfield) == UNIV_SQL_NULL) {
- num_doc_processed++;
- doc_item = UT_LIST_GET_NEXT(doc_list, doc_item);
-
- /* Always remember the last doc_item we processed */
- if (doc_item) {
- prev_doc_item = doc_item;
- }
- continue;
- }
+ ut_ad (dfield->data != NULL
+ && dfield_get_len(dfield) != UNIV_SQL_NULL);
/* If finish processing the last item, update "doc" with
strings in the doc_item, otherwise continue processing last
@@ -671,11 +691,13 @@ loop:
num_doc_processed++;
if (fts_enable_diag_print && num_doc_processed % 10000 == 1) {
- fprintf(stderr, "number of doc processed %d\n",
+ ib_logf(IB_LOG_LEVEL_INFO,
+ "number of doc processed %d\n",
(int) num_doc_processed);
#ifdef FTS_INTERNAL_DIAG_PRINT
for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
- fprintf(stderr, "ID %d, partition %d, word "
+ ib_logf(IB_LOG_LEVEL_INFO,
+ "ID %d, partition %d, word "
"%d\n",(int) psort_info->psort_id,
(int) i, (int) mycount[i]);
}
@@ -684,19 +706,10 @@ loop:
mem_heap_empty(blob_heap);
- if (doc_item->field->data) {
- ut_free(doc_item->field->data);
- doc_item->field->data = NULL;
- }
-
- doc_item = UT_LIST_GET_NEXT(doc_list, doc_item);
+ row_merge_fts_get_next_doc_item(psort_info, &doc_item);
- /* Always remember the last doc_item we processed */
- if (doc_item) {
- prev_doc_item = doc_item;
- if (last_doc_id != doc_item->doc_id) {
- t_ctx.init_pos = 0;
- }
+ if (doc_item && last_doc_id != doc_item->doc_id) {
+ t_ctx.init_pos = 0;
}
}
@@ -707,9 +720,14 @@ loop:
row_merge_buf_write(buf[t_ctx.buf_used],
merge_file[t_ctx.buf_used],
block[t_ctx.buf_used]);
- row_merge_write(merge_file[t_ctx.buf_used]->fd,
- merge_file[t_ctx.buf_used]->offset++,
- block[t_ctx.buf_used]);
+
+ if (!row_merge_write(merge_file[t_ctx.buf_used]->fd,
+ merge_file[t_ctx.buf_used]->offset++,
+ block[t_ctx.buf_used])) {
+ error = DB_TEMP_FILE_WRITE_FAILURE;
+ goto func_exit;
+ }
+
UNIV_MEM_INVALID(block[t_ctx.buf_used][0], srv_sort_buf_size);
buf[t_ctx.buf_used] = row_merge_buf_empty(buf[t_ctx.buf_used]);
mycount[t_ctx.buf_used] += t_ctx.rows_added[t_ctx.buf_used];
@@ -721,13 +739,13 @@ loop:
/* Parent done scanning, and if finish processing all the docs, exit */
if (psort_info->state == FTS_PARENT_COMPLETE) {
- if (num_doc_processed >= UT_LIST_GET_LEN(
- psort_info->fts_doc_list)) {
+ if (UT_LIST_GET_LEN(psort_info->fts_doc_list) == 0) {
goto exit;
} else if (retried > 10000) {
ut_ad(!doc_item);
/* retied too many times and cannot get new record */
- fprintf(stderr, "InnoDB: FTS parallel sort processed "
+ ib_logf(IB_LOG_LEVEL_ERROR,
+ "InnoDB: FTS parallel sort processed "
"%lu records, the sort queue has "
"%lu records. But sort cannot get "
"the next records", num_doc_processed,
@@ -735,21 +753,18 @@ loop:
psort_info->fts_doc_list));
goto exit;
}
+ } else if (psort_info->state == FTS_PARENT_EXITING) {
+ /* Parent abort */
+ goto func_exit;
}
- if (doc_item) {
- doc_item = UT_LIST_GET_NEXT(doc_list, doc_item);
- } else if (prev_doc_item) {
- os_thread_yield();
- doc_item = UT_LIST_GET_NEXT(doc_list, prev_doc_item);
- } else {
+ if (doc_item == NULL) {
os_thread_yield();
- doc_item = UT_LIST_GET_FIRST(psort_info->fts_doc_list);
}
- if (doc_item) {
- prev_doc_item = doc_item;
+ row_merge_fts_get_next_doc_item(psort_info, &doc_item);
+ if (doc_item != NULL) {
if (last_doc_id != doc_item->doc_id) {
t_ctx.init_pos = 0;
}
@@ -799,9 +814,12 @@ exit:
never flush to temp file, it can be held all in
memory */
if (merge_file[i]->offset != 0) {
- row_merge_write(merge_file[i]->fd,
+ if (!row_merge_write(merge_file[i]->fd,
merge_file[i]->offset++,
- block[i]);
+ block[i])) {
+ error = DB_TEMP_FILE_WRITE_FAILURE;
+ goto func_exit;
+ }
UNIV_MEM_INVALID(block[i][0],
srv_sort_buf_size);
@@ -817,19 +835,24 @@ exit:
}
for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
-
if (!merge_file[i]->offset) {
continue;
}
tmpfd[i] = row_merge_file_create_low();
if (tmpfd[i] < 0) {
+ error = DB_OUT_OF_MEMORY;
+ goto func_exit;
+ }
+
+ error = row_merge_sort(psort_info->psort_common->trx,
+ psort_info->psort_common->dup,
+ merge_file[i], block[i], &tmpfd[i]);
+ if (error != DB_SUCCESS) {
+ close(tmpfd[i]);
goto func_exit;
}
- row_merge_sort(psort_info->psort_common->trx,
- psort_info->psort_common->dup,
- merge_file[i], block[i], &tmpfd[i]);
total_rec += merge_file[i]->n_rec;
close(tmpfd[i]);
}
@@ -841,6 +864,19 @@ func_exit:
mem_heap_free(blob_heap);
+ mutex_enter(&psort_info->mutex);
+ psort_info->error = error;
+ mutex_exit(&psort_info->mutex);
+
+ if (UT_LIST_GET_LEN(psort_info->fts_doc_list) > 0) {
+ ut_ad(error != DB_SUCCESS);
+ }
+
+ /* Free fts doc list in case of error. */
+ do {
+ row_merge_fts_get_next_doc_item(psort_info, &doc_item);
+ } while (doc_item != NULL);
+
psort_info->child_status = FTS_CHILD_COMPLETE;
os_event_set(psort_info->psort_common->sort_event);
psort_info->child_status = FTS_CHILD_EXITING;