summaryrefslogtreecommitdiff
path: root/storage/innobase/fts/fts0opt.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/innobase/fts/fts0opt.cc')
-rw-r--r--storage/innobase/fts/fts0opt.cc301
1 files changed, 115 insertions, 186 deletions
diff --git a/storage/innobase/fts/fts0opt.cc b/storage/innobase/fts/fts0opt.cc
index 9e3b4b3121d..7c40a25e6e7 100644
--- a/storage/innobase/fts/fts0opt.cc
+++ b/storage/innobase/fts/fts0opt.cc
@@ -66,8 +66,9 @@ static const ulint FTS_OPTIMIZE_INTERVAL_IN_SECS = 300;
/** Server is shutting down, so does we exiting the optimize thread */
static bool fts_opt_start_shutdown = false;
-/** Event to wait for shutdown of the optimize thread */
-static os_event_t fts_opt_shutdown_event = NULL;
+/** Condition variable for shutting down the optimize thread.
+Protected by fts_optimize_wq->mutex. */
+static pthread_cond_t fts_opt_shutdown_cond;
/** Initial size of nodes in fts_word_t. */
static const ulint FTS_WORD_NODES_INIT_SIZE = 64;
@@ -82,9 +83,8 @@ enum fts_msg_type_t {
FTS_MSG_ADD_TABLE, /*!< Add table to the optimize thread's
work queue */
- FTS_MSG_DEL_TABLE, /*!< Remove a table from the optimize
+ FTS_MSG_DEL_TABLE /*!< Remove a table from the optimize
threads work queue */
- FTS_MSG_SYNC_TABLE /*!< Sync fts cache of a table */
};
/** Compressed list of words that have been read from FTS INDEX
@@ -203,12 +203,12 @@ struct fts_slot_t {
};
/** A table remove message for the FTS optimize thread. */
-struct fts_msg_del_t {
- dict_table_t* table; /*!< The table to remove */
-
- os_event_t event; /*!< Event to synchronize acknowledgement
- of receipt and processing of the
- this message by the consumer */
+struct fts_msg_del_t
+{
+ /** the table to remove */
+ dict_table_t *table;
+ /** condition variable to signal message consumption */
+ pthread_cond_t *cond;
};
/** The FTS optimize message work queue message type. */
@@ -899,7 +899,7 @@ fts_index_fetch_words(
}
}
- fts_que_graph_free(graph);
+ que_graph_free(graph);
/* Check if max word to fetch is exceeded */
if (optim->zip->n_words >= n_words) {
@@ -1012,10 +1012,7 @@ fts_table_fetch_doc_ids(
error = fts_eval_sql(trx, graph);
fts_sql_commit(trx);
-
- mutex_enter(&dict_sys.mutex);
que_graph_free(graph);
- mutex_exit(&dict_sys.mutex);
if (error == DB_SUCCESS) {
ib_vector_sort(doc_ids->doc_ids, fts_doc_id_cmp);
@@ -1470,7 +1467,7 @@ fts_optimize_write_word(
" when deleting a word from the FTS index.";
}
- fts_que_graph_free(graph);
+ que_graph_free(graph);
graph = NULL;
/* Even if the operation needs to be rolled back and redone,
@@ -1502,7 +1499,7 @@ fts_optimize_write_word(
}
if (graph != NULL) {
- fts_que_graph_free(graph);
+ que_graph_free(graph);
}
return(error);
@@ -1614,8 +1611,21 @@ fts_optimize_create(
optim->fts_index_table.table = table;
/* The common prefix for all this parent table's aux tables. */
- optim->name_prefix = fts_get_table_name_prefix(
- &optim->fts_common_table);
+ char table_id[FTS_AUX_MIN_TABLE_ID_LENGTH];
+ const size_t table_id_len = 1
+ + size_t(fts_get_table_id(&optim->fts_common_table, table_id));
+ dict_sys.freeze(SRW_LOCK_CALL);
+ /* Include the separator as well. */
+ const size_t dbname_len = table->name.dblen() + 1;
+ ut_ad(dbname_len > 1);
+ const size_t prefix_name_len = dbname_len + 4 + table_id_len;
+ char* prefix_name = static_cast<char*>(
+ ut_malloc_nokey(prefix_name_len));
+ memcpy(prefix_name, table->name.m_name, dbname_len);
+ dict_sys.unfreeze();
+ memcpy(prefix_name + dbname_len, "FTS_", 4);
+ memcpy(prefix_name + dbname_len + 4, table_id, table_id_len);
+ optim->name_prefix =prefix_name;
return(optim);
}
@@ -1827,7 +1837,7 @@ fts_optimize_words(
charset, word->f_str,
word->f_len)
&& graph) {
- fts_que_graph_free(graph);
+ que_graph_free(graph);
graph = NULL;
}
}
@@ -1846,7 +1856,7 @@ fts_optimize_words(
}
if (graph != NULL) {
- fts_que_graph_free(graph);
+ que_graph_free(graph);
}
}
@@ -2079,7 +2089,7 @@ fts_optimize_purge_deleted_doc_ids(
}
}
- fts_que_graph_free(graph);
+ que_graph_free(graph);
return(error);
}
@@ -2116,7 +2126,7 @@ fts_optimize_purge_deleted_doc_id_snapshot(
graph = fts_parse_sql(NULL, info, fts_end_delete_sql);
error = fts_eval_sql(optim->trx, graph);
- fts_que_graph_free(graph);
+ que_graph_free(graph);
return(error);
}
@@ -2184,7 +2194,7 @@ fts_optimize_create_deleted_doc_id_snapshot(
error = fts_eval_sql(optim->trx, graph);
- fts_que_graph_free(graph);
+ que_graph_free(graph);
if (error != DB_SUCCESS) {
fts_sql_rollback(optim->trx);
@@ -2537,9 +2547,9 @@ fts_optimize_create_msg(
}
/** Add message to wqueue, signal thread pool*/
-static void add_msg(fts_msg_t *msg, bool wq_locked= false)
+static void add_msg(fts_msg_t *msg)
{
- ib_wqueue_add(fts_optimize_wq, msg, msg->heap, wq_locked);
+ ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);
srv_thread_pool->submit_task(&task);
}
@@ -2563,17 +2573,17 @@ void fts_optimize_add_table(dict_table_t* table)
}
/* Make sure table with FTS index cannot be evicted */
- dict_table_prevent_eviction(table);
+ dict_sys.prevent_eviction(table);
msg = fts_optimize_create_msg(FTS_MSG_ADD_TABLE, table);
- mutex_enter(&fts_optimize_wq->mutex);
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
- add_msg(msg, true);
+ add_msg(msg);
table->fts->in_queue = true;
- mutex_exit(&fts_optimize_wq->mutex);
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
}
/**********************************************************************//**
@@ -2584,102 +2594,34 @@ fts_optimize_remove_table(
/*======================*/
dict_table_t* table) /*!< in: table to remove */
{
- fts_msg_t* msg;
- os_event_t event;
- fts_msg_del_t* remove;
-
- /* if the optimize system not yet initialized, return */
- if (!fts_optimize_wq) {
- return;
- }
-
- /* FTS optimizer thread is already exited */
- if (fts_opt_start_shutdown) {
- ib::info() << "Try to remove table " << table->name
- << " after FTS optimize thread exiting.";
- /* If the table can't be removed then wait till
- fts optimize thread shuts down */
- while (fts_optimize_wq) {
- os_thread_sleep(10000);
- }
- return;
- }
-
- mutex_enter(&fts_optimize_wq->mutex);
-
- if (!table->fts->in_queue) {
- mutex_exit(&fts_optimize_wq->mutex);
- return;
- }
-
- msg = fts_optimize_create_msg(FTS_MSG_DEL_TABLE, NULL);
-
- /* We will wait on this event until signalled by the consumer. */
- event = os_event_create(0);
-
- remove = static_cast<fts_msg_del_t*>(
- mem_heap_alloc(msg->heap, sizeof(*remove)));
-
- remove->table = table;
- remove->event = event;
- msg->ptr = remove;
-
- ut_ad(!mutex_own(&dict_sys.mutex));
-
- add_msg(msg, true);
-
- mutex_exit(&fts_optimize_wq->mutex);
-
- os_event_wait(event);
-
- os_event_destroy(event);
-
-#ifdef UNIV_DEBUG
- if (!fts_opt_start_shutdown) {
- mutex_enter(&fts_optimize_wq->mutex);
- ut_ad(!table->fts->in_queue);
- mutex_exit(&fts_optimize_wq->mutex);
- }
-#endif /* UNIV_DEBUG */
-}
-
-/** Send sync fts cache for the table.
-@param[in] table table to sync */
-void
-fts_optimize_request_sync_table(
- dict_table_t* table)
-{
- /* if the optimize system not yet initialized, return */
- if (!fts_optimize_wq) {
- return;
- }
-
- /* FTS optimizer thread is already exited */
- if (fts_opt_start_shutdown) {
- ib::info() << "Try to sync table " << table->name
- << " after FTS optimize thread exiting.";
- return;
- }
-
- mutex_enter(&fts_optimize_wq->mutex);
-
- if (table->fts->sync_message) {
- /* If the table already has SYNC message in
- fts_optimize_wq queue then ignore it */
- mutex_exit(&fts_optimize_wq->mutex);
- return;
- }
-
- fts_msg_t* msg = fts_optimize_create_msg(FTS_MSG_SYNC_TABLE, table);
+ if (!fts_optimize_wq)
+ return;
- add_msg(msg, true);
+ if (fts_opt_start_shutdown)
+ {
+ ib::info() << "Try to remove table " << table->name
+ << " after FTS optimize thread exiting.";
+ while (fts_optimize_wq)
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ return;
+ }
- table->fts->sync_message = true;
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
- DBUG_EXECUTE_IF("fts_optimize_wq_count_check",
- DBUG_ASSERT(fts_optimize_wq->length <= 1000););
+ if (table->fts->in_queue)
+ {
+ fts_msg_t *msg= fts_optimize_create_msg(FTS_MSG_DEL_TABLE, nullptr);
+ pthread_cond_t cond;
+ pthread_cond_init(&cond, nullptr);
+ msg->ptr= new(mem_heap_alloc(msg->heap, sizeof(fts_msg_del_t)))
+ fts_msg_del_t{table, &cond};
+ add_msg(msg);
+ my_cond_wait(&cond, &fts_optimize_wq->mutex.m_mutex);
+ pthread_cond_destroy(&cond);
+ ut_ad(!table->fts->in_queue);
+ }
- mutex_exit(&fts_optimize_wq->mutex);
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
}
/** Add a table to fts_slots if it doesn't already exist. */
@@ -2714,9 +2656,10 @@ static bool fts_optimize_new_table(dict_table_t* table)
}
/** Remove a table from fts_slots if it exists.
-@param[in,out] table table to be removed from fts_slots */
-static bool fts_optimize_del_table(const dict_table_t* table)
+@param remove table to be removed from fts_slots */
+static bool fts_optimize_del_table(fts_msg_del_t *remove)
{
+ const dict_table_t* table = remove->table;
ut_ad(table);
for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
fts_slot_t* slot;
@@ -2729,14 +2672,18 @@ static bool fts_optimize_del_table(const dict_table_t* table)
<< table->name;
}
- mutex_enter(&fts_optimize_wq->mutex);
- slot->table->fts->in_queue = false;
- mutex_exit(&fts_optimize_wq->mutex);
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
+ table->fts->in_queue = false;
+ pthread_cond_signal(remove->cond);
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
slot->table = NULL;
return true;
}
}
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
+ pthread_cond_signal(remove->cond);
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
return false;
}
@@ -2818,19 +2765,21 @@ static void fts_optimize_sync_table(dict_table_t *table,
if (sync_table->fts && sync_table->fts->cache && sync_table->is_accessible())
{
- fts_sync_table(sync_table, false);
+ fts_sync_table(sync_table);
+
if (process_message)
{
- mutex_enter(&fts_optimize_wq->mutex);
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
sync_table->fts->sync_message = false;
- mutex_exit(&fts_optimize_wq->mutex);
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
}
}
- DBUG_EXECUTE_IF("ib_optimize_wq_hang", os_thread_sleep(6000000););
+ DBUG_EXECUTE_IF("ib_optimize_wq_hang",
+ std::this_thread::sleep_for(std::chrono::seconds(6)););
if (mdl_ticket)
- dict_table_close(sync_table, false, false, fts_opt_thd, mdl_ticket);
+ dict_table_close(sync_table, false, fts_opt_thd, mdl_ticket);
}
/**********************************************************************//**
@@ -2840,15 +2789,16 @@ static void fts_optimize_callback(void *)
{
ut_ad(!srv_read_only_mode);
- if (!fts_optimize_wq) {
+ static ulint current;
+ static bool done;
+ static ulint n_optimize;
+
+ if (!fts_optimize_wq || done) {
/* Possibly timer initiated callback, can come after FTS_MSG_STOP.*/
return;
}
- static ulint current = 0;
- static ibool done = FALSE;
static ulint n_tables = ib_vector_size(fts_slots);
- static ulint n_optimize = 0;
while (!done && srv_shutdown_state <= SRV_SHUTDOWN_INITIATED) {
/* If there is no message in the queue and we have tables
@@ -2897,7 +2847,7 @@ retry_later:
switch (msg->type) {
case FTS_MSG_STOP:
- done = TRUE;
+ done = true;
break;
case FTS_MSG_ADD_TABLE:
@@ -2912,31 +2862,10 @@ retry_later:
case FTS_MSG_DEL_TABLE:
if (fts_optimize_del_table(
static_cast<fts_msg_del_t*>(
- msg->ptr)->table)) {
+ msg->ptr))) {
--n_tables;
}
-
- /* Signal the producer that we have
- removed the table. */
- os_event_set(
- ((fts_msg_del_t*) msg->ptr)->event);
- break;
-
- case FTS_MSG_SYNC_TABLE:
- if (UNIV_UNLIKELY(wsrep_sst_disable_writes)) {
- add_msg(msg);
- goto retry_later;
- }
-
- DBUG_EXECUTE_IF(
- "fts_instrument_msg_sync_sleep",
- os_thread_sleep(300000););
-
- fts_optimize_sync_table(
- static_cast<dict_table_t*>(msg->ptr),
- true);
break;
-
default:
ut_error;
}
@@ -2960,15 +2889,12 @@ retry_later:
}
ib_vector_free(fts_slots);
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
fts_slots = NULL;
+ pthread_cond_broadcast(&fts_opt_shutdown_cond);
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
- ib_wqueue_free(fts_optimize_wq);
- fts_optimize_wq = NULL;
-
- destroy_background_thd(fts_opt_thd);
ib::info() << "FTS optimize thread exiting.";
-
- os_event_set(fts_opt_shutdown_event);
}
/**********************************************************************//**
@@ -2998,7 +2924,7 @@ fts_optimize_init(void)
/* Add fts tables to fts_slots which could be skipped
during dict_load_table_one() because fts_optimize_thread
wasn't even started. */
- mutex_enter(&dict_sys.mutex);
+ dict_sys.freeze(SRW_LOCK_CALL);
for (dict_table_t* table = UT_LIST_GET_FIRST(dict_sys.table_LRU);
table != NULL;
table = UT_LIST_GET_NEXT(table_LRU, table)) {
@@ -3013,9 +2939,9 @@ fts_optimize_init(void)
fts_optimize_new_table(table);
table->fts->in_queue = true;
}
- mutex_exit(&dict_sys.mutex);
+ dict_sys.unfreeze();
- fts_opt_shutdown_event = os_event_create(0);
+ pthread_cond_init(&fts_opt_shutdown_cond, nullptr);
last_check_sync_time = time(NULL);
}
@@ -3025,17 +2951,15 @@ fts_optimize_shutdown()
{
ut_ad(!srv_read_only_mode);
- fts_msg_t* msg;
-
/* If there is an ongoing activity on dictionary, such as
srv_master_evict_from_table_cache(), wait for it */
- dict_mutex_enter_for_mysql();
-
+ dict_sys.freeze(SRW_LOCK_CALL);
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
/* Tells FTS optimizer system that we are exiting from
optimizer thread, message send their after will not be
processed */
fts_opt_start_shutdown = true;
- dict_mutex_exit_for_mysql();
+ dict_sys.unfreeze();
/* We tell the OPTIMIZE thread to switch to state done, we
can't delete the work queue here because the add thread needs
@@ -3043,14 +2967,21 @@ fts_optimize_shutdown()
timer->disarm();
task_group.cancel_pending(&task);
- msg = fts_optimize_create_msg(FTS_MSG_STOP, NULL);
+ add_msg(fts_optimize_create_msg(FTS_MSG_STOP, nullptr));
- add_msg(msg);
-
- os_event_wait(fts_opt_shutdown_event);
+ while (fts_slots) {
+ my_cond_wait(&fts_opt_shutdown_cond,
+ &fts_optimize_wq->mutex.m_mutex);
+ }
- os_event_destroy(fts_opt_shutdown_event);
+ destroy_background_thd(fts_opt_thd);
fts_opt_thd = NULL;
+ pthread_cond_destroy(&fts_opt_shutdown_cond);
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
+
+ ib_wqueue_free(fts_optimize_wq);
+ fts_optimize_wq = NULL;
+
delete timer;
timer = NULL;
}
@@ -3061,17 +2992,15 @@ void fts_sync_during_ddl(dict_table_t* table)
{
if (!fts_optimize_wq)
return;
- mutex_enter(&fts_optimize_wq->mutex);
- if (!table->fts->sync_message)
- {
- mutex_exit(&fts_optimize_wq->mutex);
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
+ const auto sync_message= table->fts->sync_message;
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
+ if (!sync_message)
return;
- }
- mutex_exit(&fts_optimize_wq->mutex);
- fts_sync_table(table, false);
+ fts_sync_table(table);
- mutex_enter(&fts_optimize_wq->mutex);
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
table->fts->sync_message = false;
- mutex_exit(&fts_optimize_wq->mutex);
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
}