diff options
Diffstat (limited to 'storage/innobase/fts/fts0opt.cc')
-rw-r--r-- | storage/innobase/fts/fts0opt.cc | 301 |
1 files changed, 115 insertions, 186 deletions
diff --git a/storage/innobase/fts/fts0opt.cc b/storage/innobase/fts/fts0opt.cc index 9e3b4b3121d..7c40a25e6e7 100644 --- a/storage/innobase/fts/fts0opt.cc +++ b/storage/innobase/fts/fts0opt.cc @@ -66,8 +66,9 @@ static const ulint FTS_OPTIMIZE_INTERVAL_IN_SECS = 300; /** Server is shutting down, so does we exiting the optimize thread */ static bool fts_opt_start_shutdown = false; -/** Event to wait for shutdown of the optimize thread */ -static os_event_t fts_opt_shutdown_event = NULL; +/** Condition variable for shutting down the optimize thread. +Protected by fts_optimize_wq->mutex. */ +static pthread_cond_t fts_opt_shutdown_cond; /** Initial size of nodes in fts_word_t. */ static const ulint FTS_WORD_NODES_INIT_SIZE = 64; @@ -82,9 +83,8 @@ enum fts_msg_type_t { FTS_MSG_ADD_TABLE, /*!< Add table to the optimize thread's work queue */ - FTS_MSG_DEL_TABLE, /*!< Remove a table from the optimize + FTS_MSG_DEL_TABLE /*!< Remove a table from the optimize threads work queue */ - FTS_MSG_SYNC_TABLE /*!< Sync fts cache of a table */ }; /** Compressed list of words that have been read from FTS INDEX @@ -203,12 +203,12 @@ struct fts_slot_t { }; /** A table remove message for the FTS optimize thread. */ -struct fts_msg_del_t { - dict_table_t* table; /*!< The table to remove */ - - os_event_t event; /*!< Event to synchronize acknowledgement - of receipt and processing of the - this message by the consumer */ +struct fts_msg_del_t +{ + /** the table to remove */ + dict_table_t *table; + /** condition variable to signal message consumption */ + pthread_cond_t *cond; }; /** The FTS optimize message work queue message type. */ @@ -899,7 +899,7 @@ fts_index_fetch_words( } } - fts_que_graph_free(graph); + que_graph_free(graph); /* Check if max word to fetch is exceeded */ if (optim->zip->n_words >= n_words) { @@ -1012,10 +1012,7 @@ fts_table_fetch_doc_ids( error = fts_eval_sql(trx, graph); fts_sql_commit(trx); - - mutex_enter(&dict_sys.mutex); que_graph_free(graph); - mutex_exit(&dict_sys.mutex); if (error == DB_SUCCESS) { ib_vector_sort(doc_ids->doc_ids, fts_doc_id_cmp); @@ -1470,7 +1467,7 @@ fts_optimize_write_word( " when deleting a word from the FTS index."; } - fts_que_graph_free(graph); + que_graph_free(graph); graph = NULL; /* Even if the operation needs to be rolled back and redone, @@ -1502,7 +1499,7 @@ fts_optimize_write_word( } if (graph != NULL) { - fts_que_graph_free(graph); + que_graph_free(graph); } return(error); @@ -1614,8 +1611,21 @@ fts_optimize_create( optim->fts_index_table.table = table; /* The common prefix for all this parent table's aux tables. */ - optim->name_prefix = fts_get_table_name_prefix( - &optim->fts_common_table); + char table_id[FTS_AUX_MIN_TABLE_ID_LENGTH]; + const size_t table_id_len = 1 + + size_t(fts_get_table_id(&optim->fts_common_table, table_id)); + dict_sys.freeze(SRW_LOCK_CALL); + /* Include the separator as well. */ + const size_t dbname_len = table->name.dblen() + 1; + ut_ad(dbname_len > 1); + const size_t prefix_name_len = dbname_len + 4 + table_id_len; + char* prefix_name = static_cast<char*>( + ut_malloc_nokey(prefix_name_len)); + memcpy(prefix_name, table->name.m_name, dbname_len); + dict_sys.unfreeze(); + memcpy(prefix_name + dbname_len, "FTS_", 4); + memcpy(prefix_name + dbname_len + 4, table_id, table_id_len); + optim->name_prefix =prefix_name; return(optim); } @@ -1827,7 +1837,7 @@ fts_optimize_words( charset, word->f_str, word->f_len) && graph) { - fts_que_graph_free(graph); + que_graph_free(graph); graph = NULL; } } @@ -1846,7 +1856,7 @@ fts_optimize_words( } if (graph != NULL) { - fts_que_graph_free(graph); + que_graph_free(graph); } } @@ -2079,7 +2089,7 @@ fts_optimize_purge_deleted_doc_ids( } } - fts_que_graph_free(graph); + que_graph_free(graph); return(error); } @@ -2116,7 +2126,7 @@ fts_optimize_purge_deleted_doc_id_snapshot( graph = fts_parse_sql(NULL, info, fts_end_delete_sql); error = fts_eval_sql(optim->trx, graph); - fts_que_graph_free(graph); + que_graph_free(graph); return(error); } @@ -2184,7 +2194,7 @@ fts_optimize_create_deleted_doc_id_snapshot( error = fts_eval_sql(optim->trx, graph); - fts_que_graph_free(graph); + que_graph_free(graph); if (error != DB_SUCCESS) { fts_sql_rollback(optim->trx); @@ -2537,9 +2547,9 @@ fts_optimize_create_msg( } /** Add message to wqueue, signal thread pool*/ -static void add_msg(fts_msg_t *msg, bool wq_locked= false) +static void add_msg(fts_msg_t *msg) { - ib_wqueue_add(fts_optimize_wq, msg, msg->heap, wq_locked); + ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true); srv_thread_pool->submit_task(&task); } @@ -2563,17 +2573,17 @@ void fts_optimize_add_table(dict_table_t* table) } /* Make sure table with FTS index cannot be evicted */ - dict_table_prevent_eviction(table); + dict_sys.prevent_eviction(table); msg = fts_optimize_create_msg(FTS_MSG_ADD_TABLE, table); - mutex_enter(&fts_optimize_wq->mutex); + mysql_mutex_lock(&fts_optimize_wq->mutex); - add_msg(msg, true); + add_msg(msg); table->fts->in_queue = true; - mutex_exit(&fts_optimize_wq->mutex); + mysql_mutex_unlock(&fts_optimize_wq->mutex); } /**********************************************************************//** @@ -2584,102 +2594,34 @@ fts_optimize_remove_table( /*======================*/ dict_table_t* table) /*!< in: table to remove */ { - fts_msg_t* msg; - os_event_t event; - fts_msg_del_t* remove; - - /* if the optimize system not yet initialized, return */ - if (!fts_optimize_wq) { - return; - } - - /* FTS optimizer thread is already exited */ - if (fts_opt_start_shutdown) { - ib::info() << "Try to remove table " << table->name - << " after FTS optimize thread exiting."; - /* If the table can't be removed then wait till - fts optimize thread shuts down */ - while (fts_optimize_wq) { - os_thread_sleep(10000); - } - return; - } - - mutex_enter(&fts_optimize_wq->mutex); - - if (!table->fts->in_queue) { - mutex_exit(&fts_optimize_wq->mutex); - return; - } - - msg = fts_optimize_create_msg(FTS_MSG_DEL_TABLE, NULL); - - /* We will wait on this event until signalled by the consumer. */ - event = os_event_create(0); - - remove = static_cast<fts_msg_del_t*>( - mem_heap_alloc(msg->heap, sizeof(*remove))); - - remove->table = table; - remove->event = event; - msg->ptr = remove; - - ut_ad(!mutex_own(&dict_sys.mutex)); - - add_msg(msg, true); - - mutex_exit(&fts_optimize_wq->mutex); - - os_event_wait(event); - - os_event_destroy(event); - -#ifdef UNIV_DEBUG - if (!fts_opt_start_shutdown) { - mutex_enter(&fts_optimize_wq->mutex); - ut_ad(!table->fts->in_queue); - mutex_exit(&fts_optimize_wq->mutex); - } -#endif /* UNIV_DEBUG */ -} - -/** Send sync fts cache for the table. -@param[in] table table to sync */ -void -fts_optimize_request_sync_table( - dict_table_t* table) -{ - /* if the optimize system not yet initialized, return */ - if (!fts_optimize_wq) { - return; - } - - /* FTS optimizer thread is already exited */ - if (fts_opt_start_shutdown) { - ib::info() << "Try to sync table " << table->name - << " after FTS optimize thread exiting."; - return; - } - - mutex_enter(&fts_optimize_wq->mutex); - - if (table->fts->sync_message) { - /* If the table already has SYNC message in - fts_optimize_wq queue then ignore it */ - mutex_exit(&fts_optimize_wq->mutex); - return; - } - - fts_msg_t* msg = fts_optimize_create_msg(FTS_MSG_SYNC_TABLE, table); + if (!fts_optimize_wq) + return; - add_msg(msg, true); + if (fts_opt_start_shutdown) + { + ib::info() << "Try to remove table " << table->name + << " after FTS optimize thread exiting."; + while (fts_optimize_wq) + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + return; + } - table->fts->sync_message = true; + mysql_mutex_lock(&fts_optimize_wq->mutex); - DBUG_EXECUTE_IF("fts_optimize_wq_count_check", - DBUG_ASSERT(fts_optimize_wq->length <= 1000);); + if (table->fts->in_queue) + { + fts_msg_t *msg= fts_optimize_create_msg(FTS_MSG_DEL_TABLE, nullptr); + pthread_cond_t cond; + pthread_cond_init(&cond, nullptr); + msg->ptr= new(mem_heap_alloc(msg->heap, sizeof(fts_msg_del_t))) + fts_msg_del_t{table, &cond}; + add_msg(msg); + my_cond_wait(&cond, &fts_optimize_wq->mutex.m_mutex); + pthread_cond_destroy(&cond); + ut_ad(!table->fts->in_queue); + } - mutex_exit(&fts_optimize_wq->mutex); + mysql_mutex_unlock(&fts_optimize_wq->mutex); } /** Add a table to fts_slots if it doesn't already exist. */ @@ -2714,9 +2656,10 @@ static bool fts_optimize_new_table(dict_table_t* table) } /** Remove a table from fts_slots if it exists. -@param[in,out] table table to be removed from fts_slots */ -static bool fts_optimize_del_table(const dict_table_t* table) +@param remove table to be removed from fts_slots */ +static bool fts_optimize_del_table(fts_msg_del_t *remove) { + const dict_table_t* table = remove->table; ut_ad(table); for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) { fts_slot_t* slot; @@ -2729,14 +2672,18 @@ static bool fts_optimize_del_table(const dict_table_t* table) << table->name; } - mutex_enter(&fts_optimize_wq->mutex); - slot->table->fts->in_queue = false; - mutex_exit(&fts_optimize_wq->mutex); + mysql_mutex_lock(&fts_optimize_wq->mutex); + table->fts->in_queue = false; + pthread_cond_signal(remove->cond); + mysql_mutex_unlock(&fts_optimize_wq->mutex); slot->table = NULL; return true; } } + mysql_mutex_lock(&fts_optimize_wq->mutex); + pthread_cond_signal(remove->cond); + mysql_mutex_unlock(&fts_optimize_wq->mutex); return false; } @@ -2818,19 +2765,21 @@ static void fts_optimize_sync_table(dict_table_t *table, if (sync_table->fts && sync_table->fts->cache && sync_table->is_accessible()) { - fts_sync_table(sync_table, false); + fts_sync_table(sync_table); + if (process_message) { - mutex_enter(&fts_optimize_wq->mutex); + mysql_mutex_lock(&fts_optimize_wq->mutex); sync_table->fts->sync_message = false; - mutex_exit(&fts_optimize_wq->mutex); + mysql_mutex_unlock(&fts_optimize_wq->mutex); } } - DBUG_EXECUTE_IF("ib_optimize_wq_hang", os_thread_sleep(6000000);); + DBUG_EXECUTE_IF("ib_optimize_wq_hang", + std::this_thread::sleep_for(std::chrono::seconds(6));); if (mdl_ticket) - dict_table_close(sync_table, false, false, fts_opt_thd, mdl_ticket); + dict_table_close(sync_table, false, fts_opt_thd, mdl_ticket); } /**********************************************************************//** @@ -2840,15 +2789,16 @@ static void fts_optimize_callback(void *) { ut_ad(!srv_read_only_mode); - if (!fts_optimize_wq) { + static ulint current; + static bool done; + static ulint n_optimize; + + if (!fts_optimize_wq || done) { /* Possibly timer initiated callback, can come after FTS_MSG_STOP.*/ return; } - static ulint current = 0; - static ibool done = FALSE; static ulint n_tables = ib_vector_size(fts_slots); - static ulint n_optimize = 0; while (!done && srv_shutdown_state <= SRV_SHUTDOWN_INITIATED) { /* If there is no message in the queue and we have tables @@ -2897,7 +2847,7 @@ retry_later: switch (msg->type) { case FTS_MSG_STOP: - done = TRUE; + done = true; break; case FTS_MSG_ADD_TABLE: @@ -2912,31 +2862,10 @@ retry_later: case FTS_MSG_DEL_TABLE: if (fts_optimize_del_table( static_cast<fts_msg_del_t*>( - msg->ptr)->table)) { + msg->ptr))) { --n_tables; } - - /* Signal the producer that we have - removed the table. */ - os_event_set( - ((fts_msg_del_t*) msg->ptr)->event); - break; - - case FTS_MSG_SYNC_TABLE: - if (UNIV_UNLIKELY(wsrep_sst_disable_writes)) { - add_msg(msg); - goto retry_later; - } - - DBUG_EXECUTE_IF( - "fts_instrument_msg_sync_sleep", - os_thread_sleep(300000);); - - fts_optimize_sync_table( - static_cast<dict_table_t*>(msg->ptr), - true); break; - default: ut_error; } @@ -2960,15 +2889,12 @@ retry_later: } ib_vector_free(fts_slots); + mysql_mutex_lock(&fts_optimize_wq->mutex); fts_slots = NULL; + pthread_cond_broadcast(&fts_opt_shutdown_cond); + mysql_mutex_unlock(&fts_optimize_wq->mutex); - ib_wqueue_free(fts_optimize_wq); - fts_optimize_wq = NULL; - - destroy_background_thd(fts_opt_thd); ib::info() << "FTS optimize thread exiting."; - - os_event_set(fts_opt_shutdown_event); } /**********************************************************************//** @@ -2998,7 +2924,7 @@ fts_optimize_init(void) /* Add fts tables to fts_slots which could be skipped during dict_load_table_one() because fts_optimize_thread wasn't even started. */ - mutex_enter(&dict_sys.mutex); + dict_sys.freeze(SRW_LOCK_CALL); for (dict_table_t* table = UT_LIST_GET_FIRST(dict_sys.table_LRU); table != NULL; table = UT_LIST_GET_NEXT(table_LRU, table)) { @@ -3013,9 +2939,9 @@ fts_optimize_init(void) fts_optimize_new_table(table); table->fts->in_queue = true; } - mutex_exit(&dict_sys.mutex); + dict_sys.unfreeze(); - fts_opt_shutdown_event = os_event_create(0); + pthread_cond_init(&fts_opt_shutdown_cond, nullptr); last_check_sync_time = time(NULL); } @@ -3025,17 +2951,15 @@ fts_optimize_shutdown() { ut_ad(!srv_read_only_mode); - fts_msg_t* msg; - /* If there is an ongoing activity on dictionary, such as srv_master_evict_from_table_cache(), wait for it */ - dict_mutex_enter_for_mysql(); - + dict_sys.freeze(SRW_LOCK_CALL); + mysql_mutex_lock(&fts_optimize_wq->mutex); /* Tells FTS optimizer system that we are exiting from optimizer thread, message send their after will not be processed */ fts_opt_start_shutdown = true; - dict_mutex_exit_for_mysql(); + dict_sys.unfreeze(); /* We tell the OPTIMIZE thread to switch to state done, we can't delete the work queue here because the add thread needs @@ -3043,14 +2967,21 @@ fts_optimize_shutdown() timer->disarm(); task_group.cancel_pending(&task); - msg = fts_optimize_create_msg(FTS_MSG_STOP, NULL); + add_msg(fts_optimize_create_msg(FTS_MSG_STOP, nullptr)); - add_msg(msg); - - os_event_wait(fts_opt_shutdown_event); + while (fts_slots) { + my_cond_wait(&fts_opt_shutdown_cond, + &fts_optimize_wq->mutex.m_mutex); + } - os_event_destroy(fts_opt_shutdown_event); + destroy_background_thd(fts_opt_thd); fts_opt_thd = NULL; + pthread_cond_destroy(&fts_opt_shutdown_cond); + mysql_mutex_unlock(&fts_optimize_wq->mutex); + + ib_wqueue_free(fts_optimize_wq); + fts_optimize_wq = NULL; + delete timer; timer = NULL; } @@ -3061,17 +2992,15 @@ void fts_sync_during_ddl(dict_table_t* table) { if (!fts_optimize_wq) return; - mutex_enter(&fts_optimize_wq->mutex); - if (!table->fts->sync_message) - { - mutex_exit(&fts_optimize_wq->mutex); + mysql_mutex_lock(&fts_optimize_wq->mutex); + const auto sync_message= table->fts->sync_message; + mysql_mutex_unlock(&fts_optimize_wq->mutex); + if (!sync_message) return; - } - mutex_exit(&fts_optimize_wq->mutex); - fts_sync_table(table, false); + fts_sync_table(table); - mutex_enter(&fts_optimize_wq->mutex); + mysql_mutex_lock(&fts_optimize_wq->mutex); table->fts->sync_message = false; - mutex_exit(&fts_optimize_wq->mutex); + mysql_mutex_unlock(&fts_optimize_wq->mutex); } |