summaryrefslogtreecommitdiff
path: root/storage/innobase/fts
diff options
context:
space:
mode:
Diffstat (limited to 'storage/innobase/fts')
-rw-r--r--storage/innobase/fts/fts0fts.cc325
-rw-r--r--storage/innobase/fts/fts0opt.cc56
2 files changed, 288 insertions, 93 deletions
diff --git a/storage/innobase/fts/fts0fts.cc b/storage/innobase/fts/fts0fts.cc
index 8623e653991..d2b90542ec6 100644
--- a/storage/innobase/fts/fts0fts.cc
+++ b/storage/innobase/fts/fts0fts.cc
@@ -38,22 +38,6 @@ Full Text Search interface
#include "dict0stats.h"
#include "btr0pcur.h"
-/** The SYNC state of the cache. There is one instance of this struct
-associated with each ADD thread. */
-struct fts_sync_t {
- /** Transaction used for SYNCing the cache to disk */
- trx_t *trx;
- /** Table with FTS index(es) */
- dict_table_t *table;
- /** Max size in bytes of the cache */
- ulint max_cache_size;
- /** The doc id at which the cache was noted as being
- full, we use this to set the upper_limit field */
- doc_id_t max_doc_id;
- /** SYNC start time; only used if fts_enable_diag_print */
- time_t start_time;
-};
-
static const ulint FTS_MAX_ID_LEN = 32;
/** Column name from the FTS config table */
@@ -201,8 +185,15 @@ struct fts_tokenize_param_t {
/** Run SYNC on the table, i.e., write out data from the cache to the
FTS auxiliary INDEX table and clear the cache at the end.
@param[in,out] sync sync state
+@param[in] unlock_cache whether unlock cache lock when write node
+@param[in] wait whether wait when a sync is in progress
@return DB_SUCCESS if all OK */
-static dberr_t fts_sync(fts_sync_t *sync);
+static
+dberr_t
+fts_sync(
+ fts_sync_t* sync,
+ bool unlock_cache,
+ bool wait);
/****************************************************************//**
Release all resources help by the words rb tree e.g., the node ilist. */
@@ -275,6 +266,7 @@ fts_cache_destroy(fts_cache_t* cache)
mysql_mutex_destroy(&cache->init_lock);
mysql_mutex_destroy(&cache->deleted_lock);
mysql_mutex_destroy(&cache->doc_id_lock);
+ pthread_cond_destroy(&cache->sync->cond);
if (cache->stopword_info.cached_stopword) {
rbt_free(cache->stopword_info.cached_stopword);
@@ -574,6 +566,7 @@ fts_index_cache_init(
for (i = 0; i < FTS_NUM_AUX_INDEX; ++i) {
ut_a(index_cache->ins_graph[i] == NULL);
+ ut_a(index_cache->sel_graph[i] == NULL);
}
}
@@ -643,6 +636,7 @@ fts_cache_create(
mem_heap_zalloc(heap, sizeof(fts_sync_t)));
cache->sync->table = table;
+ pthread_cond_init(&cache->sync->cond, nullptr);
/* Create the index cache vector that will hold the inverted indexes. */
cache->indexes = ib_vector_create(
@@ -968,6 +962,10 @@ fts_cache_index_cache_create(
mem_heap_zalloc(static_cast<mem_heap_t*>(
cache->self_heap->arg), n_bytes));
+ index_cache->sel_graph = static_cast<que_t**>(
+ mem_heap_zalloc(static_cast<mem_heap_t*>(
+ cache->self_heap->arg), n_bytes));
+
fts_index_cache_init(cache->sync_heap, index_cache);
if (cache->get_docs) {
@@ -1041,6 +1039,13 @@ fts_cache_clear(
index_cache->ins_graph[j] = NULL;
}
+
+ if (index_cache->sel_graph[j] != NULL) {
+
+ que_graph_free(index_cache->sel_graph[j]);
+
+ index_cache->sel_graph[j] = NULL;
+ }
}
index_cache->doc_stats = NULL;
@@ -1333,7 +1338,8 @@ fts_cache_add_doc(
ib_vector_last(word->nodes));
}
- if (!fts_node || fts_node->ilist_size > FTS_ILIST_MAX_SIZE
+ if (fts_node == NULL || fts_node->synced
+ || fts_node->ilist_size > FTS_ILIST_MAX_SIZE
|| doc_id < fts_node->last_doc_id) {
fts_node = static_cast<fts_node_t*>(
@@ -3320,7 +3326,7 @@ fts_add_doc_from_tuple(
if (cache->total_size > fts_max_cache_size / 5
|| fts_need_sync) {
- fts_sync(cache->sync);
+ fts_sync(cache->sync, true, false);
}
mtr_start(&mtr);
@@ -3356,7 +3362,7 @@ fts_add_doc_by_id(
dict_index_t* fts_id_index;
ibool is_id_cluster;
fts_cache_t* cache = ftt->table->fts->cache;
- bool need_sync= false;
+
ut_ad(cache->get_docs);
/* If Doc ID has been supplied by the user, then the table
@@ -3496,32 +3502,44 @@ fts_add_doc_by_id(
get_doc->index_cache,
doc_id, doc.tokens);
- /** FTS cache sync should happen
- frequently. Because user thread
- shouldn't hold the cache lock for
- longer time. So cache should sync
- whenever cache size exceeds 512 KB */
- need_sync =
- cache->total_size > 512*1024;
+ bool need_sync = !cache->sync->in_progress
+ && (fts_need_sync
+ || (cache->total_size
+ - cache->total_size_at_sync)
+ > fts_max_cache_size / 10);
+ if (need_sync) {
+ cache->total_size_at_sync =
+ cache->total_size;
+ }
mysql_mutex_unlock(&table->fts->cache->lock);
DBUG_EXECUTE_IF(
"fts_instrument_sync",
- fts_sync_table(table);
+ fts_optimize_request_sync_table(table);
+ mysql_mutex_lock(&cache->lock);
+ if (cache->sync->in_progress)
+ my_cond_wait(
+ &cache->sync->cond,
+ &cache->lock.m_mutex);
+ mysql_mutex_unlock(&cache->lock);
);
DBUG_EXECUTE_IF(
"fts_instrument_sync_debug",
- fts_sync(cache->sync);
+ fts_sync(cache->sync, true, true);
);
DEBUG_SYNC_C("fts_instrument_sync_request");
DBUG_EXECUTE_IF(
"fts_instrument_sync_request",
- need_sync= true;
+ fts_optimize_request_sync_table(table);
);
+ if (need_sync) {
+ fts_optimize_request_sync_table(table);
+ }
+
mtr_start(&mtr);
if (i < num_idx - 1) {
@@ -3547,10 +3565,6 @@ func_exit:
ut_free(pcur.old_rec_buf);
mem_heap_free(heap);
-
- if (need_sync) {
- fts_sync_table(table);
- }
}
@@ -3910,13 +3924,15 @@ static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
fts_sync_write_words(
trx_t* trx,
- fts_index_cache_t* index_cache)
+ fts_index_cache_t* index_cache,
+ bool unlock_cache)
{
fts_table_t fts_table;
ulint n_nodes = 0;
ulint n_words = 0;
const ib_rbt_node_t* rbt_node;
dberr_t error = DB_SUCCESS;
+ ibool print_error = FALSE;
dict_table_t* table = index_cache->index->table;
FTS_INIT_INDEX_TABLE(
@@ -3947,36 +3963,53 @@ fts_sync_write_words(
fts_table.suffix = fts_get_suffix(selected);
+ /* We iterate over all the nodes even if there was an error */
for (i = 0; i < ib_vector_size(word->nodes); ++i) {
fts_node_t* fts_node = static_cast<fts_node_t*>(
ib_vector_get(word->nodes, i));
- error = fts_write_node(
- trx, &index_cache->ins_graph[selected],
- &fts_table, &word->text, fts_node);
+ if (fts_node->synced) {
+ continue;
+ } else {
+ fts_node->synced = true;
+ }
+
+ /*FIXME: we need to handle the error properly. */
+ if (error == DB_SUCCESS) {
+ if (unlock_cache) {
+ mysql_mutex_unlock(
+ &table->fts->cache->lock);
+ }
+
+ error = fts_write_node(
+ trx,
+ &index_cache->ins_graph[selected],
+ &fts_table, &word->text, fts_node);
- DEBUG_SYNC_C("fts_write_node");
- DBUG_EXECUTE_IF("fts_write_node_crash",
+ DEBUG_SYNC_C("fts_write_node");
+ DBUG_EXECUTE_IF("fts_write_node_crash",
DBUG_SUICIDE(););
- DBUG_EXECUTE_IF("fts_instrument_sync_sleep",
+ DBUG_EXECUTE_IF(
+ "fts_instrument_sync_sleep",
std::this_thread::sleep_for(
std::chrono::seconds(1)););
- if (error != DB_SUCCESS) {
- goto err_exit;
+ if (unlock_cache) {
+ mysql_mutex_lock(
+ &table->fts->cache->lock);
+ }
}
}
n_nodes += ib_vector_size(word->nodes);
- if (UNIV_UNLIKELY(error != DB_SUCCESS)) {
-err_exit:
+ if (UNIV_UNLIKELY(error != DB_SUCCESS) && !print_error) {
ib::error() << "(" << error << ") writing"
" word node to FTS auxiliary index table "
<< table->name;
- break;
+ print_error = TRUE;
}
}
@@ -4035,44 +4068,58 @@ fts_sync_index(
ut_ad(rbt_validate(index_cache->words));
- return(fts_sync_write_words(trx, index_cache));
+ return(fts_sync_write_words(trx, index_cache, sync->unlock_cache));
}
-/** Rollback a sync operation
-@param[in,out] sync sync state */
+/** Check if index cache has been synced completely
+@param[in,out] index_cache index cache
+@return true if index is synced, otherwise false. */
static
-void
-fts_sync_rollback(
- fts_sync_t* sync)
+bool
+fts_sync_index_check(
+ fts_index_cache_t* index_cache)
{
- trx_t* trx = sync->trx;
- fts_cache_t* cache = sync->table->fts->cache;
-
- for (ulint i = 0; i < ib_vector_size(cache->indexes); ++i) {
- ulint j;
- fts_index_cache_t* index_cache;
-
- index_cache = static_cast<fts_index_cache_t*>(
- ib_vector_get(cache->indexes, i));
+ const ib_rbt_node_t* rbt_node;
- for (j = 0; fts_index_selector[j].value; ++j) {
+ for (rbt_node = rbt_first(index_cache->words);
+ rbt_node != NULL;
+ rbt_node = rbt_next(index_cache->words, rbt_node)) {
- if (index_cache->ins_graph[j] != NULL) {
+ fts_tokenizer_word_t* word;
+ word = rbt_value(fts_tokenizer_word_t, rbt_node);
- que_graph_free(index_cache->ins_graph[j]);
+ fts_node_t* fts_node;
+ fts_node = static_cast<fts_node_t*>(ib_vector_last(word->nodes));
- index_cache->ins_graph[j] = NULL;
- }
+ if (!fts_node->synced) {
+ return(false);
}
}
- mysql_mutex_unlock(&cache->lock);
+ return(true);
+}
- fts_sql_rollback(trx);
+/** Reset synced flag in index cache when rollback
+@param[in,out] index_cache index cache */
+static
+void
+fts_sync_index_reset(
+ fts_index_cache_t* index_cache)
+{
+ const ib_rbt_node_t* rbt_node;
- /* Avoid assertion in trx_t::free(). */
- trx->dict_operation_lock_mode = false;
- trx->free();
+ for (rbt_node = rbt_first(index_cache->words);
+ rbt_node != NULL;
+ rbt_node = rbt_next(index_cache->words, rbt_node)) {
+
+ fts_tokenizer_word_t* word;
+ word = rbt_value(fts_tokenizer_word_t, rbt_node);
+
+ fts_node_t* fts_node;
+ fts_node = static_cast<fts_node_t*>(ib_vector_last(word->nodes));
+
+ fts_node->synced = false;
+ }
}
/** Commit the SYNC, change state of processed doc ids etc.
@@ -4105,20 +4152,19 @@ fts_sync_commit(
sync, cache->deleted_doc_ids);
}
+ /* We need to do this within the deleted lock since fts_delete() can
+ attempt to add a deleted doc id to the cache deleted id array. */
+ fts_cache_clear(cache);
+ DEBUG_SYNC_C("fts_deleted_doc_ids_clear");
+ fts_cache_init(cache);
+ mysql_mutex_unlock(&cache->lock);
+
if (UNIV_LIKELY(error == DB_SUCCESS)) {
- /* We need to do this within the deleted lock
- since fts_delete() can attempt to add a deleted
- doc id to the cache deleted id array. */
- fts_cache_clear(cache);
- DEBUG_SYNC_C("fts_deleted_doc_ids_clear");
- fts_cache_init(cache);
- mysql_mutex_unlock(&cache->lock);
fts_sql_commit(trx);
} else {
+ fts_sql_rollback(trx);
ib::error() << "(" << error << ") during SYNC of "
"table " << sync->table->name;
- fts_sync_rollback(sync);
- return error;
}
if (UNIV_UNLIKELY(fts_enable_diag_print) && elapsed_time) {
@@ -4138,13 +4184,66 @@ fts_sync_commit(
return(error);
}
+/** Rollback a sync operation
+@param[in,out] sync sync state */
+static
+void
+fts_sync_rollback(
+ fts_sync_t* sync)
+{
+ trx_t* trx = sync->trx;
+ fts_cache_t* cache = sync->table->fts->cache;
+
+ for (ulint i = 0; i < ib_vector_size(cache->indexes); ++i) {
+ ulint j;
+ fts_index_cache_t* index_cache;
+
+ index_cache = static_cast<fts_index_cache_t*>(
+ ib_vector_get(cache->indexes, i));
+
+ /* Reset synced flag so nodes will not be skipped
+ in the next sync, see fts_sync_write_words(). */
+ fts_sync_index_reset(index_cache);
+
+ for (j = 0; fts_index_selector[j].value; ++j) {
+
+ if (index_cache->ins_graph[j] != NULL) {
+
+ que_graph_free(index_cache->ins_graph[j]);
+
+ index_cache->ins_graph[j] = NULL;
+ }
+
+ if (index_cache->sel_graph[j] != NULL) {
+
+ que_graph_free(index_cache->sel_graph[j]);
+
+ index_cache->sel_graph[j] = NULL;
+ }
+ }
+ }
+
+ mysql_mutex_unlock(&cache->lock);
+
+ fts_sql_rollback(trx);
+
+ /* Avoid assertion in trx_t::free(). */
+ trx->dict_operation_lock_mode = false;
+ trx->free();
+}
+
/** Run SYNC on the table, i.e., write out data from the cache to the
FTS auxiliary INDEX table and clear the cache at the end.
@param[in,out] sync sync state
@param[in] unlock_cache whether unlock cache lock when write node
@param[in] wait whether wait when a sync is in progress
@return DB_SUCCESS if all OK */
-static dberr_t fts_sync(fts_sync_t *sync)
+static
+dberr_t
+fts_sync(
+ fts_sync_t* sync,
+ bool unlock_cache,
+ bool wait)
{
if (srv_read_only_mode) {
return DB_READ_ONLY;
@@ -4155,13 +4254,33 @@ static dberr_t fts_sync(fts_sync_t *sync)
fts_cache_t* cache = sync->table->fts->cache;
mysql_mutex_lock(&cache->lock);
+
+ /* Check if cache is being synced.
+ Note: we release cache lock in fts_sync_write_words() to
+ avoid long wait for the lock by other threads. */
+ if (sync->in_progress) {
+ if (!wait) {
+ mysql_mutex_unlock(&cache->lock);
+ return(DB_SUCCESS);
+ }
+ do {
+ my_cond_wait(&sync->cond, &cache->lock.m_mutex);
+ } while (sync->in_progress);
+ }
+
+ sync->unlock_cache = unlock_cache;
+ sync->in_progress = true;
+
DEBUG_SYNC_C("fts_sync_begin");
fts_sync_begin(sync);
+begin_sync:
const size_t fts_cache_size= fts_max_cache_size;
if (cache->total_size > fts_cache_size) {
/* Avoid the case: sync never finish when
insert/update keeps comming. */
+ ut_ad(sync->unlock_cache);
+ sync->unlock_cache = false;
ib::warn() << "Total InnoDB FTS size "
<< cache->total_size << " for the table "
<< cache->sync->table->name
@@ -4185,23 +4304,52 @@ static dberr_t fts_sync(fts_sync_t *sync)
error = fts_sync_index(sync, index_cache);
if (error != DB_SUCCESS) {
- goto err_exit;
+ goto end_sync;
+ }
+
+ if (!sync->unlock_cache
+ && cache->total_size < fts_max_cache_size) {
+ /* Reset the unlock cache if the value
+ is less than innodb_ft_cache_size */
+ sync->unlock_cache = true;
}
}
DBUG_EXECUTE_IF("fts_instrument_sync_interrupted",
+ sync->interrupted = true;
error = DB_INTERRUPTED;
- goto err_exit;
+ goto end_sync;
);
- if (error == DB_SUCCESS) {
+ /* Make sure all the caches are synced. */
+ for (i = 0; i < ib_vector_size(cache->indexes); ++i) {
+ fts_index_cache_t* index_cache;
+
+ index_cache = static_cast<fts_index_cache_t*>(
+ ib_vector_get(cache->indexes, i));
+
+ if (index_cache->index->to_be_dropped
+ || fts_sync_index_check(index_cache)) {
+ continue;
+ }
+
+ goto begin_sync;
+ }
+
+end_sync:
+ if (error == DB_SUCCESS && !sync->interrupted) {
error = fts_sync_commit(sync);
} else {
-err_exit:
fts_sync_rollback(sync);
- return error;
}
+ mysql_mutex_lock(&cache->lock);
+ ut_ad(sync->in_progress);
+ sync->interrupted = false;
+ sync->in_progress = false;
+ pthread_cond_broadcast(&sync->cond);
+ mysql_mutex_unlock(&cache->lock);
+
/* We need to check whether an optimize is required, for that
we make copies of the two variables that control the trigger. These
variables can change behind our back and we don't want to hold the
@@ -4213,7 +4361,6 @@ err_exit:
mysql_mutex_unlock(&cache->deleted_lock);
- DEBUG_SYNC_C("fts_sync_end");
return(error);
}
@@ -4222,12 +4369,12 @@ FTS auxiliary INDEX table and clear the cache at the end.
@param[in,out] table fts table
@param[in] wait whether wait for existing sync to finish
@return DB_SUCCESS on success, error code on failure. */
-dberr_t fts_sync_table(dict_table_t* table)
+dberr_t fts_sync_table(dict_table_t* table, bool wait)
{
ut_ad(table->fts);
return table->space && !table->corrupted && table->fts->cache
- ? fts_sync(table->fts->cache->sync)
+ ? fts_sync(table->fts->cache->sync, !wait, wait)
: DB_SUCCESS;
}
diff --git a/storage/innobase/fts/fts0opt.cc b/storage/innobase/fts/fts0opt.cc
index 7c40a25e6e7..fe31767d901 100644
--- a/storage/innobase/fts/fts0opt.cc
+++ b/storage/innobase/fts/fts0opt.cc
@@ -83,8 +83,9 @@ enum fts_msg_type_t {
FTS_MSG_ADD_TABLE, /*!< Add table to the optimize thread's
work queue */
- FTS_MSG_DEL_TABLE /*!< Remove a table from the optimize
+ FTS_MSG_DEL_TABLE, /*!< Remove a table from the optimize
threads work queue */
+ FTS_MSG_SYNC_TABLE /*!< Sync fts cache of a table */
};
/** Compressed list of words that have been read from FTS INDEX
@@ -2624,6 +2625,36 @@ fts_optimize_remove_table(
mysql_mutex_unlock(&fts_optimize_wq->mutex);
}
+/** Send sync fts cache for the table.
+@param[in] table table to sync */
+void
+fts_optimize_request_sync_table(
+ dict_table_t* table)
+{
+ /* if the optimize system not yet initialized, return */
+ if (!fts_optimize_wq) {
+ return;
+ }
+
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
+
+ /* FTS optimizer thread is already exited */
+ if (fts_opt_start_shutdown) {
+ ib::info() << "Try to sync table " << table->name
+ << " after FTS optimize thread exiting.";
+ } else if (table->fts->sync_message) {
+ /* If the table already has SYNC message in
+ fts_optimize_wq queue then ignore it */
+ } else {
+ add_msg(fts_optimize_create_msg(FTS_MSG_SYNC_TABLE, table));
+ table->fts->sync_message = true;
+ DBUG_EXECUTE_IF("fts_optimize_wq_count_check",
+ DBUG_ASSERT(fts_optimize_wq->length <= 1000););
+ }
+
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
+}
+
/** Add a table to fts_slots if it doesn't already exist. */
static bool fts_optimize_new_table(dict_table_t* table)
{
@@ -2765,8 +2796,7 @@ static void fts_optimize_sync_table(dict_table_t *table,
if (sync_table->fts && sync_table->fts->cache && sync_table->is_accessible())
{
- fts_sync_table(sync_table);
-
+ fts_sync_table(sync_table, false);
if (process_message)
{
mysql_mutex_lock(&fts_optimize_wq->mutex);
@@ -2866,6 +2896,24 @@ retry_later:
--n_tables;
}
break;
+
+ case FTS_MSG_SYNC_TABLE:
+ if (UNIV_UNLIKELY(wsrep_sst_disable_writes)) {
+ add_msg(msg);
+ goto retry_later;
+ }
+
+ DBUG_EXECUTE_IF(
+ "fts_instrument_msg_sync_sleep",
+ std::this_thread::sleep_for(
+ std::chrono::milliseconds(
+ 300)););
+
+ fts_optimize_sync_table(
+ static_cast<dict_table_t*>(msg->ptr),
+ true);
+ break;
+
default:
ut_error;
}
@@ -2998,7 +3046,7 @@ void fts_sync_during_ddl(dict_table_t* table)
if (!sync_message)
return;
- fts_sync_table(table);
+ fts_sync_table(table, false);
mysql_mutex_lock(&fts_optimize_wq->mutex);
table->fts->sync_message = false;