diff options
author | Thirunarayanan Balathandayuthapani <thiru@mariadb.com> | 2023-04-19 15:13:20 +0530 |
---|---|---|
committer | Thirunarayanan Balathandayuthapani <thiru@mariadb.com> | 2023-04-19 15:13:39 +0530 |
commit | 5f7dfabdaa658ef77c5b511e25134b60e9a129c0 (patch) | |
tree | 070daf69ebc208a3ddb6ea051749062f727d75bb | |
parent | 9aa6e38ac52c99a4d8541af39aac31e3f84a3001 (diff) | |
download | mariadb-git-bb-10.6-MDEV-30996.tar.gz |
MDEV-30996 INSERT..SELECT in presence of fulltext indexbb-10.6-MDEV-30996
freezes all other commits at commit time
- Introduced new variable innodb_fts_threads which processing
fulltext message, optimization of fulltext table.
Minimum value is 1, default value is 2 and maximum value is 255
- By having multiple fts threads, InnoDB can do sync of multiple
table at the same time.
- Introduce the class fts_slots_t, which can be used to store
the fts table in the slot.
-rw-r--r-- | mysql-test/suite/sys_vars/r/sysvars_innodb.result | 12 | ||||
-rw-r--r-- | storage/innobase/fts/fts0fts.cc | 42 | ||||
-rw-r--r-- | storage/innobase/fts/fts0opt.cc | 676 | ||||
-rw-r--r-- | storage/innobase/handler/ha_innodb.cc | 16 | ||||
-rw-r--r-- | storage/innobase/include/fts0fts.h | 18 | ||||
-rw-r--r-- | storage/innobase/include/fts0types.h | 5 | ||||
-rw-r--r-- | storage/innobase/include/srv0srv.h | 1 |
7 files changed, 433 insertions, 337 deletions
diff --git a/mysql-test/suite/sys_vars/r/sysvars_innodb.result b/mysql-test/suite/sys_vars/r/sysvars_innodb.result index e07725abbeb..c0e0ac70d47 100644 --- a/mysql-test/suite/sys_vars/r/sysvars_innodb.result +++ b/mysql-test/suite/sys_vars/r/sysvars_innodb.result @@ -763,6 +763,18 @@ NUMERIC_BLOCK_SIZE 0 ENUM_VALUE_LIST NULL READ_ONLY YES COMMAND_LINE_ARGUMENT REQUIRED +VARIABLE_NAME INNODB_FTS_THREADS +SESSION_VALUE NULL +DEFAULT_VALUE 2 +VARIABLE_SCOPE GLOBAL +VARIABLE_TYPE INT UNSIGNED +VARIABLE_COMMENT Number of threads performing background fts operation +NUMERIC_MIN_VALUE 1 +NUMERIC_MAX_VALUE 255 +NUMERIC_BLOCK_SIZE 0 +ENUM_VALUE_LIST NULL +READ_ONLY NO +COMMAND_LINE_ARGUMENT REQUIRED VARIABLE_NAME INNODB_FT_AUX_TABLE SESSION_VALUE NULL DEFAULT_VALUE diff --git a/storage/innobase/fts/fts0fts.cc b/storage/innobase/fts/fts0fts.cc index 94cdfd6da01..66853a4e688 100644 --- a/storage/innobase/fts/fts0fts.cc +++ b/storage/innobase/fts/fts0fts.cc @@ -4248,6 +4248,25 @@ static dberr_t fts_sync(fts_sync_t *sync, bool unlock_cache, bool wait) sync->unlock_cache = unlock_cache; sync->in_progress = true; + if (cache->total_size == 0) + { +func_exit: + sync->in_progress = false; + pthread_cond_broadcast(&sync->cond); + mysql_mutex_unlock(&cache->lock); + /* We need to check whether an optimize is required, for + that we make copies of the two variables that control + the trigger. These variables can change behind our + back and we don't want to hold the lock for longer + than is needed. */ + mysql_mutex_lock(&cache->deleted_lock); + cache->added = 0; + cache->deleted = 0; + mysql_mutex_unlock(&cache->deleted_lock); + DEBUG_SYNC_C("fts_sync_end"); + return(error); + } + DEBUG_SYNC_C("fts_sync_begin"); fts_sync_begin(sync); @@ -4309,27 +4328,11 @@ begin_sync: } else { err_exit: fts_sync_rollback(sync); - return error; } mysql_mutex_lock(&cache->lock); ut_ad(sync->in_progress); - sync->in_progress = false; - pthread_cond_broadcast(&sync->cond); - mysql_mutex_unlock(&cache->lock); - /* We need to check whether an optimize is required, for that - we make copies of the two variables that control the trigger. These - variables can change behind our back and we don't want to hold the - lock for longer than is needed. */ - mysql_mutex_lock(&cache->deleted_lock); - - cache->added = 0; - cache->deleted = 0; - - mysql_mutex_unlock(&cache->deleted_lock); - - DEBUG_SYNC_C("fts_sync_end"); - return(error); + goto func_exit; } /** Run SYNC on the table, i.e., write out data from the cache to the @@ -5253,7 +5256,8 @@ fts_t::fts_t( added_synced(0), dict_locked(0), add_wq(NULL), cache(NULL), - doc_col(ULINT_UNDEFINED), in_queue(false), sync_message(false), + doc_col(ULINT_UNDEFINED), wait_in_queue(false), + in_queue(false), sync_message(false), in_process(false), fts_heap(heap) { ut_a(table->fts == NULL); @@ -5263,6 +5267,7 @@ fts_t::fts_t( indexes = ib_vector_create(heap_alloc, sizeof(dict_index_t*), 4); dict_table_get_all_fts_indexes(table, indexes); + pthread_cond_init(&fts_queue_cond, nullptr); } /** fts_t destructor. */ @@ -5275,6 +5280,7 @@ fts_t::~fts_t() fts_cache_destroy(cache); } + pthread_cond_destroy(&fts_queue_cond); /* There is no need to call ib_vector_free() on this->indexes because it is stored in this->fts_heap. */ mem_heap_free(fts_heap); diff --git a/storage/innobase/fts/fts0opt.cc b/storage/innobase/fts/fts0opt.cc index 5702d3e8753..8f6bf513be8 100644 --- a/storage/innobase/fts/fts0opt.cc +++ b/storage/innobase/fts/fts0opt.cc @@ -38,6 +38,8 @@ Completed 2011/7/10 Sunny and Jimmy Yang #include "fts0opt.h" #include "fts0vlc.h" #include "wsrep.h" +#include <mutex> +#include <thread> #ifdef WITH_WSREP extern Atomic_relaxed<bool> wsrep_sst_disable_writes; @@ -47,19 +49,29 @@ constexpr bool wsrep_sst_disable_writes= false; /** The FTS optimize thread's work queue. */ ib_wqueue_t* fts_optimize_wq; -static void fts_optimize_callback(void *); +static void fts_optimize_func(void *); static void timer_callback(void*); static tpool::timer* timer; -static tpool::task_group task_group(1); -static tpool::task task(fts_optimize_callback,0, &task_group); +static tpool::task_group *task_group= nullptr; +static tpool::task *task= nullptr; + +/** Mutex to protect srv_n_fts_threads_entered */ +std::mutex fts_thread_mutex; + +/** Number of innodb fts threads */ +extern uint innodb_n_fts_threads; + +/** Number of innodb fts threads currently executing +fts_optimize_func() */ +static uint srv_n_fts_threads_entered; + +/** FTS subsystem processed shutdown message */ +static bool fts_shutdown_processed; /** FTS optimize thread, for MDL acquisition */ static THD *fts_opt_thd; -/** The FTS vector to store fts_slot_t */ -static ib_vector_t* fts_slots; - /** Default optimize interval in secs. */ static const ulint FTS_OPTIMIZE_INTERVAL_IN_SECS = 300; @@ -201,6 +213,10 @@ struct fts_slot_t { /** time(NULL) of latest successful fts_optimize_table() */ time_t completed; + + fts_slot_t(dict_table_t *new_table): + table(new_table), running(false), added(0), + deleted(0), last_run(0), completed(0) {} }; /** A table remove message for the FTS optimize thread. */ @@ -2384,48 +2400,24 @@ fts_optimize_reset_start_time( return(error); } -/*********************************************************************//** -Run OPTIMIZE on the given table by a background thread. +/** Run OPTIMIZE on the given table by a background thread. +@param table table to be optimized +@param threshold processed optimization @return DB_SUCCESS if all OK */ static MY_ATTRIBUTE((nonnull)) -dberr_t -fts_optimize_table_bk( -/*==================*/ - fts_slot_t* slot) /*!< in: table to optimiza */ +dberr_t fts_optimize_table_bk(dict_table_t *table, bool &threshold) { - const time_t now = time(NULL); - const ulint interval = ulint(now - slot->last_run); - - /* Avoid optimizing tables that were optimized recently. */ - if (slot->last_run > 0 - && lint(interval) >= 0 - && interval < FTS_OPTIMIZE_INTERVAL_IN_SECS) { - - return(DB_SUCCESS); - } - - dict_table_t* table = slot->table; - dberr_t error; - - if (table->is_accessible() - && table->fts && table->fts->cache - && table->fts->cache->deleted >= FTS_OPTIMIZE_THRESHOLD) { - error = fts_optimize_table(table); - - slot->last_run = time(NULL); - - if (error == DB_SUCCESS) { - slot->running = false; - slot->completed = slot->last_run; - } - } else { - /* Note time this run completed. */ - slot->last_run = now; - error = DB_SUCCESS; - } - - return(error); + dberr_t error= DB_SUCCESS; + if (table->is_accessible() + && table->fts && table->fts->cache + && table->fts->cache->deleted >= FTS_OPTIMIZE_THRESHOLD) + { + error = fts_optimize_table(table); + threshold = true; + } + return(error); } + /*********************************************************************//** Run OPTIMIZE on the given table. @return DB_SUCCESS if all OK */ @@ -2551,7 +2543,7 @@ fts_optimize_create_msg( static void add_msg(fts_msg_t *msg) { ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true); - srv_thread_pool->submit_task(&task); + srv_thread_pool->submit_task(task); } /** @@ -2560,7 +2552,7 @@ will only recalculate is_sync_needed, in case the queue is empty. */ static void timer_callback(void*) { - srv_thread_pool->submit_task(&task); + srv_thread_pool->submit_task(task); } /** Add the table to add to the OPTIMIZER's list. @@ -2580,9 +2572,15 @@ void fts_optimize_add_table(dict_table_t* table) mysql_mutex_lock(&fts_optimize_wq->mutex); + if (table->fts->in_queue || table->fts->wait_in_queue) { + mysql_mutex_unlock(&fts_optimize_wq->mutex); + mem_heap_free(msg->heap); + return; + } + add_msg(msg); - table->fts->in_queue = true; + table->fts->wait_in_queue = true; mysql_mutex_unlock(&fts_optimize_wq->mutex); } @@ -2609,19 +2607,28 @@ fts_optimize_remove_table( mysql_mutex_lock(&fts_optimize_wq->mutex); - if (table->fts->in_queue) + if (!table->fts->wait_in_queue && !table->fts->in_queue) { - fts_msg_t *msg= fts_optimize_create_msg(FTS_MSG_DEL_TABLE, nullptr); - pthread_cond_t cond; - pthread_cond_init(&cond, nullptr); - msg->ptr= new(mem_heap_alloc(msg->heap, sizeof(fts_msg_del_t))) - fts_msg_del_t{table, &cond}; - add_msg(msg); - my_cond_wait(&cond, &fts_optimize_wq->mutex.m_mutex); - pthread_cond_destroy(&cond); - ut_ad(!table->fts->in_queue); + mysql_mutex_unlock(&fts_optimize_wq->mutex); + return; } + /* Make sure that InnoDB table was added in fts_slots */ + while (!table->fts->in_queue || table->fts->sync_message + || table->fts->in_process) + my_cond_wait(&table->fts->fts_queue_cond, + &fts_optimize_wq->mutex.m_mutex); + + fts_msg_t *msg= fts_optimize_create_msg(FTS_MSG_DEL_TABLE, nullptr); + pthread_cond_t cond; + pthread_cond_init(&cond, nullptr); + msg->ptr= new(mem_heap_alloc(msg->heap, sizeof(fts_msg_del_t))) + fts_msg_del_t{table, &cond}; + add_msg(msg); + my_cond_wait(&cond, &fts_optimize_wq->mutex.m_mutex); + pthread_cond_destroy(&cond); + ut_ad(!table->fts->in_queue); + mysql_mutex_unlock(&fts_optimize_wq->mutex); } @@ -2648,132 +2655,6 @@ void fts_optimize_request_sync_table(dict_table_t *table) mysql_mutex_unlock(&fts_optimize_wq->mutex); } -/** Add a table to fts_slots if it doesn't already exist. */ -static bool fts_optimize_new_table(dict_table_t* table) -{ - ut_ad(table); - - ulint i; - fts_slot_t* slot; - fts_slot_t* empty = NULL; - - /* Search for duplicates, also find a free slot if one exists. */ - for (i = 0; i < ib_vector_size(fts_slots); ++i) { - - slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i)); - - if (!slot->table) { - empty = slot; - } else if (slot->table == table) { - /* Already exists in our optimize queue. */ - return false; - } - } - - slot = empty ? empty : static_cast<fts_slot_t*>( - ib_vector_push(fts_slots, NULL)); - - memset(slot, 0x0, sizeof(*slot)); - - slot->table = table; - return true; -} - -/** Remove a table from fts_slots if it exists. -@param remove table to be removed from fts_slots */ -static bool fts_optimize_del_table(fts_msg_del_t *remove) -{ - const dict_table_t* table = remove->table; - ut_ad(table); - for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) { - fts_slot_t* slot; - - slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i)); - - if (slot->table == table) { - if (UNIV_UNLIKELY(fts_enable_diag_print)) { - ib::info() << "FTS Optimize Removing table " - << table->name; - } - - mysql_mutex_lock(&fts_optimize_wq->mutex); - table->fts->in_queue = false; - pthread_cond_signal(remove->cond); - mysql_mutex_unlock(&fts_optimize_wq->mutex); - slot->table = NULL; - return true; - } - } - - mysql_mutex_lock(&fts_optimize_wq->mutex); - pthread_cond_signal(remove->cond); - mysql_mutex_unlock(&fts_optimize_wq->mutex); - return false; -} - -/**********************************************************************//** -Calculate how many tables in fts_slots need to be optimized. -@return no. of tables to optimize */ -static ulint fts_optimize_how_many() -{ - ulint n_tables = 0; - const time_t current_time = time(NULL); - - for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) { - const fts_slot_t* slot = static_cast<const fts_slot_t*>( - ib_vector_get_const(fts_slots, i)); - if (!slot->table) { - continue; - } - - const time_t end = slot->running - ? slot->last_run : slot->completed; - ulint interval = ulint(current_time - end); - - if (lint(interval) < 0 - || interval >= FTS_OPTIMIZE_INTERVAL_IN_SECS) { - ++n_tables; - } - } - - return(n_tables); -} - -/**********************************************************************//** -Check if the total memory used by all FTS table exceeds the maximum limit. -@return true if a sync is needed, false otherwise */ -static bool fts_is_sync_needed() -{ - ulint total_memory = 0; - const time_t now = time(NULL); - double time_diff = difftime(now, last_check_sync_time); - - if (fts_need_sync || (time_diff >= 0 && time_diff < 5)) { - return(false); - } - - last_check_sync_time = now; - - for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) { - const fts_slot_t* slot = static_cast<const fts_slot_t*>( - ib_vector_get_const(fts_slots, i)); - - if (!slot->table) { - continue; - } - - if (slot->table->fts && slot->table->fts->cache) { - total_memory += slot->table->fts->cache->total_size; - } - - if (total_memory > fts_max_total_cache_size) { - return(true); - } - } - - return(false); -} - /** Sync fts cache of a table @param[in,out] table table to be synced @param[in] process_message processing messages from fts_optimize_wq */ @@ -2785,7 +2666,16 @@ static void fts_optimize_sync_table(dict_table_t *table, &mdl_ticket); if (!sync_table) + { + if (process_message) + { + mysql_mutex_lock(&fts_optimize_wq->mutex); + table->fts->sync_message= false; + pthread_cond_broadcast(&table->fts->fts_queue_cond); + mysql_mutex_unlock(&fts_optimize_wq->mutex); + } return; + } if (sync_table->fts && sync_table->fts->cache && sync_table->is_accessible()) { @@ -2795,6 +2685,7 @@ static void fts_optimize_sync_table(dict_table_t *table, { mysql_mutex_lock(&fts_optimize_wq->mutex); sync_table->fts->sync_message = false; + pthread_cond_broadcast(&table->fts->fts_queue_cond); mysql_mutex_unlock(&fts_optimize_wq->mutex); } } @@ -2806,134 +2697,286 @@ static void fts_optimize_sync_table(dict_table_t *table, dict_table_close(sync_table, false, fts_opt_thd, mdl_ticket); } -/**********************************************************************//** -Optimize all FTS tables. -@return Dummy return */ -static void fts_optimize_callback(void *) +class fts_slots_t { - ut_ad(!srv_read_only_mode); - - static ulint current; - static bool done; - static ulint n_optimize; - - if (!fts_optimize_wq || done) { - /* Possibly timer initiated callback, can come after FTS_MSG_STOP.*/ - return; - } - - static ulint n_tables = ib_vector_size(fts_slots); - - while (!done && srv_shutdown_state <= SRV_SHUTDOWN_INITIATED) { - /* If there is no message in the queue and we have tables - to optimize then optimize the tables. */ + typedef std::vector<fts_slot_t*, ut_allocator<fts_slot_t*> > fts_slots_vec; + fts_slots_vec *m_fts_slots; + std::mutex m_mutex; + ulint m_n_optimize_tables; +public: + fts_slots_t() + { + m_fts_slots= new fts_slots_vec(); + } - if (!done - && ib_wqueue_is_empty(fts_optimize_wq) - && n_tables > 0 - && n_optimize > 0) { + ~fts_slots_t() + { + for (fts_slots_vec::iterator it= m_fts_slots->begin(); + it != m_fts_slots->end(); it++) + delete *it; + m_fts_slots->clear(); + delete m_fts_slots; + } - /* The queue is empty but we have tables - to optimize. */ - if (UNIV_UNLIKELY(wsrep_sst_disable_writes)) { -retry_later: - if (fts_is_sync_needed()) { - fts_need_sync = true; - } - if (n_tables) { - timer->set_time(5000, 0); - } - return; - } + ulint get_n_optimize_tables() + { + std::unique_lock<std::mutex> lk(m_mutex); + return m_n_optimize_tables; + } - fts_slot_t* slot = static_cast<fts_slot_t*>( - ib_vector_get(fts_slots, current)); + void add_new_table(dict_table_t *table) + { + std::unique_lock<std::mutex> lk(m_mutex); + for (fts_slots_vec::iterator it= m_fts_slots->begin(); + it != m_fts_slots->end(); it++) + if ((*it)->table == table) return; + m_fts_slots->push_back(new fts_slot_t(table)); + mysql_mutex_lock(&fts_optimize_wq->mutex); + table->fts->wait_in_queue = false; + table->fts->in_queue = true; + pthread_cond_broadcast(&table->fts->fts_queue_cond); + mysql_mutex_unlock(&fts_optimize_wq->mutex); + } - /* Handle the case of empty slots. */ - if (slot->table) { - slot->running = true; - fts_optimize_table_bk(slot); - } + void delete_table(const dict_table_t *table) + { + std::unique_lock<std::mutex> lk(m_mutex); + for (fts_slots_vec::iterator it= m_fts_slots->begin(); + it != m_fts_slots->end(); it++) + { + if ((*it)->table == table) + { + m_fts_slots->erase(it); + mysql_mutex_lock(&fts_optimize_wq->mutex); + table->fts->in_queue= false; + mysql_mutex_unlock(&fts_optimize_wq->mutex); + return; + } + } + } - /* Wrap around the counter. */ - if (++current >= ib_vector_size(fts_slots)) { - n_optimize = fts_optimize_how_many(); - current = 0; - } - } else if (n_optimize == 0 - || !ib_wqueue_is_empty(fts_optimize_wq)) { - fts_msg_t* msg = static_cast<fts_msg_t*> - (ib_wqueue_nowait(fts_optimize_wq)); - /* Timeout ? */ - if (!msg) { - goto retry_later; - } + void update_need_sync() + { + std::unique_lock<std::mutex> lk(m_mutex); + const time_t current_time = time(NULL); + m_n_optimize_tables= 0; + for (fts_slots_vec::iterator it= m_fts_slots->begin(); + it != m_fts_slots->end(); it++) + { + const fts_slot_t* slot= *it; + if (!slot->table) continue; + const time_t end= slot->running + ? slot->last_run : slot->completed; + ulint interval= ulint(current_time - end); + if (lint(interval) < 0 || + interval >= FTS_OPTIMIZE_INTERVAL_IN_SECS) + ++m_n_optimize_tables; + } + } + + bool is_sync_need() + { + std::unique_lock<std::mutex> lk(m_mutex); + ulint total_memory= 0; + for (fts_slots_vec::iterator it= m_fts_slots->begin(); + it != m_fts_slots->end(); it++) + { + const fts_slot_t* slot= *it; + if (!slot->table) continue; + if (slot->table->fts && slot->table->fts->cache) + total_memory+= slot->table->fts->cache->total_size; + if (total_memory > fts_max_total_cache_size) return true; + } + return false; + } + + dict_table_t *get_optimize_table() + { + std::unique_lock<std::mutex> lk(m_mutex); + if (m_fts_slots->empty()) return nullptr; + static ulint current= 0; + ulint size= m_fts_slots->size(); + if (current >= size) current= 0; + ulint start_val= current; + bool iteration= false; +read_slot: + /* Traversed the entire list */ + if (start_val == current && iteration) { return nullptr; } + current++; + /* Reset and traverse from start */ + if (current == size) current = 0; + iteration= true; + fts_slot_t *slot= m_fts_slots->at(current); + if (slot->table && !slot->running) + { + slot->running = true; + const time_t now = time(NULL); + const ulint interval = ulint(now - slot->last_run); + /* Avoid optimizing tables that were optimized recently. */ + if (slot->last_run > 0 && lint(interval) >= 0 + && interval < FTS_OPTIMIZE_INTERVAL_IN_SECS) + goto read_slot; + } else goto read_slot; + + mysql_mutex_lock(&fts_optimize_wq->mutex); + slot->table->fts->in_process= true; + mysql_mutex_unlock(&fts_optimize_wq->mutex); + return slot->table; + } + + void update_slot(dict_table_t *table, bool threshold, dberr_t *err) + { + std::unique_lock<std::mutex> lk(m_mutex); + for (fts_slots_vec::iterator it= m_fts_slots->begin(); + it != m_fts_slots->end(); it++) + { + fts_slot_t* slot= *it; + if (slot->table != table) continue; + else + { + slot->last_run= time(nullptr); + slot->running= false; + + mysql_mutex_lock(&fts_optimize_wq->mutex); + table->fts->in_process= false; + pthread_cond_broadcast(&table->fts->fts_queue_cond); + mysql_mutex_unlock(&fts_optimize_wq->mutex); + + if (!threshold) return; + if (*err == DB_SUCCESS) + slot->completed= slot->last_run; + } + } + } + + dict_table_t* pop_table() + { + std::unique_lock<std::mutex> lk(m_mutex); + if (m_fts_slots->empty()) return nullptr; + dict_table_t *table= (*m_fts_slots->begin())->table; + m_fts_slots->erase(m_fts_slots->begin()); + return table; + } - switch (msg->type) { - case FTS_MSG_STOP: - done = true; - break; +}; - case FTS_MSG_ADD_TABLE: - ut_a(!done); - if (fts_optimize_new_table( - static_cast<dict_table_t*>( - msg->ptr))) { - ++n_tables; - } - break; +fts_slots_t *fts_table_slots; - case FTS_MSG_DEL_TABLE: - if (fts_optimize_del_table( - static_cast<fts_msg_del_t*>( - msg->ptr))) { - --n_tables; - } - break; - case FTS_MSG_SYNC_TABLE: - if (UNIV_UNLIKELY(wsrep_sst_disable_writes)) { - add_msg(msg); - goto retry_later; - } +static void fts_process_msg(fts_msg_t *msg) +{ + switch (msg->type) + { + case FTS_MSG_STOP: + { + std::unique_lock<std::mutex> lk(fts_thread_mutex); + fts_shutdown_processed= true; + } + break; + case FTS_MSG_ADD_TABLE: + fts_table_slots->add_new_table( + static_cast<dict_table_t*>(msg->ptr)); + break; + case FTS_MSG_DEL_TABLE: + { + fts_msg_del_t* remove= static_cast<fts_msg_del_t*>(msg->ptr); + fts_table_slots->delete_table(remove->table); + mysql_mutex_lock(&fts_optimize_wq->mutex); + pthread_cond_signal(remove->cond); + mysql_mutex_unlock(&fts_optimize_wq->mutex); + break; + } + case FTS_MSG_SYNC_TABLE: + DBUG_EXECUTE_IF("fts_instrument_msg_sync_sleep", + std::this_thread::sleep_for( + std::chrono::milliseconds(300));); + fts_optimize_sync_table( + static_cast<dict_table_t*>(msg->ptr), true); + break; + default: ut_error; + } + mem_heap_free(msg->heap); + return; +} - DBUG_EXECUTE_IF( - "fts_instrument_msg_sync_sleep", - std::this_thread::sleep_for( - std::chrono::milliseconds( - 300));); - fts_optimize_sync_table( - static_cast<dict_table_t*>( - msg->ptr), true); - break; - default: - ut_error; - } +static void fts_optimize_func(void *) +{ + ut_ad(!srv_read_only_mode); - mem_heap_free(msg->heap); - n_optimize = done ? 0 : fts_optimize_how_many(); - } - } + std::unique_lock<std::mutex> lk(fts_thread_mutex); + /** Acquire mutex to check fts_optimize_wq and how many + threads entered this function */ + if (!fts_optimize_wq || !fts_table_slots) + /* Possibly timer initiated callback */ + return; - /* Server is being shutdown, sync the data from FTS cache to disk - if needed */ - if (n_tables > 0) { - for (ulint i = 0; i < ib_vector_size(fts_slots); i++) { - fts_slot_t* slot = static_cast<fts_slot_t*>( - ib_vector_get(fts_slots, i)); + srv_n_fts_threads_entered++; + lk.unlock(); + while (srv_shutdown_state <= SRV_SHUTDOWN_INITIATED) + { + lk.lock(); + if (fts_shutdown_processed == true) { lk.unlock(); break; } + lk.unlock(); + ulint n_opt_tables= fts_table_slots->get_n_optimize_tables(); + if (ib_wqueue_is_empty(fts_optimize_wq)) + { + /* Queue is empty, but we have table to optimize */ + if (UNIV_UNLIKELY(wsrep_sst_disable_writes)) + { +retry_later: + if (fts_table_slots->is_sync_need()) fts_need_sync= true; + if (n_opt_tables) timer->set_time(5000, 0); + lk.lock(); + srv_n_fts_threads_entered--; + return; + } + if (n_opt_tables == 0) goto retry_later; + else + { + dict_table_t* table= fts_table_slots->get_optimize_table(); + if (!table) fts_table_slots->update_need_sync(); + else + { + bool threshold= false; + dberr_t err= fts_optimize_table_bk(table, threshold); + fts_table_slots->update_slot(table, threshold, &err); + } + } + } + else + { + fts_msg_t* msg = static_cast<fts_msg_t*>( + ib_wqueue_nowait(fts_optimize_wq)); + if (!msg) { goto retry_later; } + else fts_process_msg(msg); + } + } - if (slot->table) { - fts_optimize_sync_table(slot->table); - } - } - } + /* Process all messages from fts_optimize_wq */ + while (!ib_wqueue_is_empty(fts_optimize_wq)) + { + fts_msg_t *msg= static_cast<fts_msg_t*>( + ib_wqueue_nowait(fts_optimize_wq)); + if (!msg) continue; + fts_process_msg(msg); + } - ib_vector_free(fts_slots); - mysql_mutex_lock(&fts_optimize_wq->mutex); - fts_slots = NULL; - pthread_cond_broadcast(&fts_opt_shutdown_cond); - mysql_mutex_unlock(&fts_optimize_wq->mutex); + /* Sync all tables during shutdown */ + while (dict_table_t *table= fts_table_slots->pop_table()) + fts_optimize_sync_table(table); - ib::info() << "FTS optimize thread exiting."; + /* Decrement the working threads */ + lk.lock(); + srv_n_fts_threads_entered--; + /* Free the slots and broadcast the shutdown signal */ + if (srv_n_fts_threads_entered == 0) + { + mysql_mutex_lock(&fts_optimize_wq->mutex); + delete fts_table_slots; + fts_table_slots= nullptr; + pthread_cond_broadcast(&fts_opt_shutdown_cond); + mysql_mutex_unlock(&fts_optimize_wq->mutex); + ib::info() << "FTS optimize threads exiting."; + } } /**********************************************************************//** @@ -2942,9 +2985,6 @@ void fts_optimize_init(void) /*===================*/ { - mem_heap_t* heap; - ib_alloc_t* heap_alloc; - ut_ad(!srv_read_only_mode); /* For now we only support one optimize thread. */ @@ -2952,13 +2992,10 @@ fts_optimize_init(void) /* Create FTS optimize work queue */ fts_optimize_wq = ib_wqueue_create(); + fts_table_slots= new fts_slots_t(); + fts_set_n_threads(innodb_n_fts_threads); timer = srv_thread_pool->create_timer(timer_callback); - /* Create FTS vector to store fts_slot_t */ - heap = mem_heap_create(sizeof(dict_table_t*) * 64); - heap_alloc = ib_heap_allocator_create(heap); - fts_slots = ib_vector_create(heap_alloc, sizeof(fts_slot_t), 4); - fts_opt_thd = innobase_create_background_thd("InnoDB FTS optimizer"); /* Add fts tables to fts_slots which could be skipped during dict_load_table_one() because fts_optimize_thread @@ -2975,7 +3012,7 @@ fts_optimize_init(void) need to acquire fts_optimize_wq->mutex for adding the fts table to the fts slots. */ ut_ad(!table->can_be_evicted); - fts_optimize_new_table(table); + fts_table_slots->add_new_table(table); table->fts->in_queue = true; } dict_sys.unfreeze(); @@ -3004,11 +3041,10 @@ fts_optimize_shutdown() can't delete the work queue here because the add thread needs deregister the FTS tables. */ timer->disarm(); - task_group.cancel_pending(&task); + task_group->cancel_pending(task); add_msg(fts_optimize_create_msg(FTS_MSG_STOP, nullptr)); - - while (fts_slots) { + while (fts_table_slots) { my_cond_wait(&fts_opt_shutdown_cond, &fts_optimize_wq->mutex.m_mutex); } @@ -3021,6 +3057,8 @@ fts_optimize_shutdown() ib_wqueue_free(fts_optimize_wq); fts_optimize_wq = NULL; + delete task_group; + delete task; delete timer; timer = NULL; } @@ -3031,15 +3069,21 @@ void fts_sync_during_ddl(dict_table_t* table) { if (!fts_optimize_wq) return; - mysql_mutex_lock(&fts_optimize_wq->mutex); - const auto sync_message= table->fts->sync_message; - mysql_mutex_unlock(&fts_optimize_wq->mutex); - if (!sync_message) - return; fts_sync_table(table, false); +} - mysql_mutex_lock(&fts_optimize_wq->mutex); - table->fts->sync_message = false; - mysql_mutex_unlock(&fts_optimize_wq->mutex); +/* Use the mutex to set the number of fts threads */ +std::mutex fts_threads_cnt_mutex; +void fts_set_n_threads(const uint new_cnt) +{ + std::unique_lock<std::mutex> lk(fts_threads_cnt_mutex); + if (task_group) + task_group->set_max_tasks(new_cnt); + else + { + task_group= new tpool::task_group(new_cnt); + task= new tpool::task(fts_optimize_func,0,task_group); + } + innodb_n_fts_threads= new_cnt; } diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index f102789d7ab..3d7fd9a9969 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -206,6 +206,8 @@ my_bool innodb_evict_tables_on_commit_debug; /** File format constraint for ALTER TABLE */ ulong innodb_instant_alter_column_allowed; +uint innodb_n_fts_threads; + /** Note we cannot use rec_format_enum because we do not allow COMPRESSED row format for innodb_default_row_format option. */ enum default_row_format_enum { @@ -18603,6 +18605,12 @@ innodb_encrypt_tables_update(THD*, st_mysql_sys_var*, void*, const void* save) mysql_mutex_lock(&LOCK_global_system_variables); } +static void innodb_fts_threads_update( + THD*,st_mysql_sys_var*,void*, const void* save) +{ + fts_set_n_threads(*static_cast<const uint*>(save)); +} + static SHOW_VAR innodb_status_variables_export[]= { SHOW_FUNC_ENTRY("Innodb", &show_innodb_vars), {NullS, NullS, SHOW_LONG} @@ -19747,6 +19755,13 @@ static MYSQL_SYSVAR_BOOL(encrypt_temporary_tables, innodb_encrypt_temporary_tabl "Enrypt the temporary table data.", NULL, NULL, false); +static MYSQL_SYSVAR_UINT(fts_threads, innodb_n_fts_threads, + PLUGIN_VAR_RQCMDARG, + "Number of threads performing background fts operation ", + NULL, + innodb_fts_threads_update, + 2, 1, 255, 0); + static struct st_mysql_sys_var* innobase_system_variables[]= { MYSQL_SYSVAR(autoextend_increment), MYSQL_SYSVAR(buffer_pool_size), @@ -19913,6 +19928,7 @@ static struct st_mysql_sys_var* innobase_system_variables[]= { MYSQL_SYSVAR(buf_dump_status_frequency), MYSQL_SYSVAR(background_thread), MYSQL_SYSVAR(encrypt_temporary_tables), + MYSQL_SYSVAR(fts_threads), NULL }; diff --git a/storage/innobase/include/fts0fts.h b/storage/innobase/include/fts0fts.h index f88c7990ba9..f3c8ae76e0a 100644 --- a/storage/innobase/include/fts0fts.h +++ b/storage/innobase/include/fts0fts.h @@ -332,7 +332,11 @@ public: /** Vector of FTS indexes, this is mainly for caching purposes. */ ib_vector_t* indexes; - /** Whether the table exists in fts_optimize_wq; + /** Whether the addition of new table message in + fts_optimize_wq; protected by fts_optimize_wq mutex */ + bool wait_in_queue; + + /** Whether the table is in fts_optimize_wq; protected by fts_optimize_wq mutex */ bool in_queue; @@ -340,6 +344,15 @@ public: protected by fts_optimize_wq mutex */ bool sync_message; + /** Whether the table is picked by fts threads + protected by fts_optimize_wq mutex */ + bool in_process; + + /** Condition variable to wake up the background thread + when table is in fts queue or sync or in_process + condition */ + pthread_cond_t fts_queue_cond; + /** Heap for fts_t allocation. */ mem_heap_t* fts_heap; }; @@ -943,3 +956,6 @@ fts_update_sync_doc_id(const dict_table_t *table, /** Sync the table during commit phase @param[in] table table to be synced */ void fts_sync_during_ddl(dict_table_t* table); + +/** Set the number of fts threads */ +void fts_set_n_threads(const uint new_cnt); diff --git a/storage/innobase/include/fts0types.h b/storage/innobase/include/fts0types.h index f3e22c72156..f8c9adf4c9e 100644 --- a/storage/innobase/include/fts0types.h +++ b/storage/innobase/include/fts0types.h @@ -175,8 +175,9 @@ struct fts_node_t { ulint ilist_size_alloc; /*!< Allocated size of ilist in bytes */ - bool synced; /*!< flag whether the node is -synced */ + /** Flag to indicate whether node is synced. + Protected by cache->lock */ + bool synced; }; /** A tokenizer word. Contains information about one word. */ diff --git a/storage/innobase/include/srv0srv.h b/storage/innobase/include/srv0srv.h index 96cfe886c02..69af8de3732 100644 --- a/storage/innobase/include/srv0srv.h +++ b/storage/innobase/include/srv0srv.h @@ -383,6 +383,7 @@ extern ulong srv_max_purge_lag_delay; extern my_bool innodb_encrypt_temporary_tables; extern my_bool srv_immediate_scrub_data_uncompressed; + /*-------------------------------------------*/ /** Modes of operation */ |