summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThirunarayanan Balathandayuthapani <thiru@mariadb.com>2023-04-19 15:13:20 +0530
committerThirunarayanan Balathandayuthapani <thiru@mariadb.com>2023-04-19 15:13:39 +0530
commit5f7dfabdaa658ef77c5b511e25134b60e9a129c0 (patch)
tree070daf69ebc208a3ddb6ea051749062f727d75bb
parent9aa6e38ac52c99a4d8541af39aac31e3f84a3001 (diff)
downloadmariadb-git-5f7dfabdaa658ef77c5b511e25134b60e9a129c0.tar.gz
MDEV-30996 INSERT..SELECT in presence of fulltext indexbb-10.6-MDEV-30996
freezes all other commits at commit time - Introduced new variable innodb_fts_threads which processing fulltext message, optimization of fulltext table. Minimum value is 1, default value is 2 and maximum value is 255 - By having multiple fts threads, InnoDB can do sync of multiple table at the same time. - Introduce the class fts_slots_t, which can be used to store the fts table in the slot.
-rw-r--r--mysql-test/suite/sys_vars/r/sysvars_innodb.result12
-rw-r--r--storage/innobase/fts/fts0fts.cc42
-rw-r--r--storage/innobase/fts/fts0opt.cc676
-rw-r--r--storage/innobase/handler/ha_innodb.cc16
-rw-r--r--storage/innobase/include/fts0fts.h18
-rw-r--r--storage/innobase/include/fts0types.h5
-rw-r--r--storage/innobase/include/srv0srv.h1
7 files changed, 433 insertions, 337 deletions
diff --git a/mysql-test/suite/sys_vars/r/sysvars_innodb.result b/mysql-test/suite/sys_vars/r/sysvars_innodb.result
index e07725abbeb..c0e0ac70d47 100644
--- a/mysql-test/suite/sys_vars/r/sysvars_innodb.result
+++ b/mysql-test/suite/sys_vars/r/sysvars_innodb.result
@@ -763,6 +763,18 @@ NUMERIC_BLOCK_SIZE 0
ENUM_VALUE_LIST NULL
READ_ONLY YES
COMMAND_LINE_ARGUMENT REQUIRED
+VARIABLE_NAME INNODB_FTS_THREADS
+SESSION_VALUE NULL
+DEFAULT_VALUE 2
+VARIABLE_SCOPE GLOBAL
+VARIABLE_TYPE INT UNSIGNED
+VARIABLE_COMMENT Number of threads performing background fts operation
+NUMERIC_MIN_VALUE 1
+NUMERIC_MAX_VALUE 255
+NUMERIC_BLOCK_SIZE 0
+ENUM_VALUE_LIST NULL
+READ_ONLY NO
+COMMAND_LINE_ARGUMENT REQUIRED
VARIABLE_NAME INNODB_FT_AUX_TABLE
SESSION_VALUE NULL
DEFAULT_VALUE
diff --git a/storage/innobase/fts/fts0fts.cc b/storage/innobase/fts/fts0fts.cc
index 94cdfd6da01..66853a4e688 100644
--- a/storage/innobase/fts/fts0fts.cc
+++ b/storage/innobase/fts/fts0fts.cc
@@ -4248,6 +4248,25 @@ static dberr_t fts_sync(fts_sync_t *sync, bool unlock_cache, bool wait)
sync->unlock_cache = unlock_cache;
sync->in_progress = true;
+ if (cache->total_size == 0)
+ {
+func_exit:
+ sync->in_progress = false;
+ pthread_cond_broadcast(&sync->cond);
+ mysql_mutex_unlock(&cache->lock);
+ /* We need to check whether an optimize is required, for
+ that we make copies of the two variables that control
+ the trigger. These variables can change behind our
+ back and we don't want to hold the lock for longer
+ than is needed. */
+ mysql_mutex_lock(&cache->deleted_lock);
+ cache->added = 0;
+ cache->deleted = 0;
+ mysql_mutex_unlock(&cache->deleted_lock);
+ DEBUG_SYNC_C("fts_sync_end");
+ return(error);
+ }
+
DEBUG_SYNC_C("fts_sync_begin");
fts_sync_begin(sync);
@@ -4309,27 +4328,11 @@ begin_sync:
} else {
err_exit:
fts_sync_rollback(sync);
- return error;
}
mysql_mutex_lock(&cache->lock);
ut_ad(sync->in_progress);
- sync->in_progress = false;
- pthread_cond_broadcast(&sync->cond);
- mysql_mutex_unlock(&cache->lock);
- /* We need to check whether an optimize is required, for that
- we make copies of the two variables that control the trigger. These
- variables can change behind our back and we don't want to hold the
- lock for longer than is needed. */
- mysql_mutex_lock(&cache->deleted_lock);
-
- cache->added = 0;
- cache->deleted = 0;
-
- mysql_mutex_unlock(&cache->deleted_lock);
-
- DEBUG_SYNC_C("fts_sync_end");
- return(error);
+ goto func_exit;
}
/** Run SYNC on the table, i.e., write out data from the cache to the
@@ -5253,7 +5256,8 @@ fts_t::fts_t(
added_synced(0), dict_locked(0),
add_wq(NULL),
cache(NULL),
- doc_col(ULINT_UNDEFINED), in_queue(false), sync_message(false),
+ doc_col(ULINT_UNDEFINED), wait_in_queue(false),
+ in_queue(false), sync_message(false), in_process(false),
fts_heap(heap)
{
ut_a(table->fts == NULL);
@@ -5263,6 +5267,7 @@ fts_t::fts_t(
indexes = ib_vector_create(heap_alloc, sizeof(dict_index_t*), 4);
dict_table_get_all_fts_indexes(table, indexes);
+ pthread_cond_init(&fts_queue_cond, nullptr);
}
/** fts_t destructor. */
@@ -5275,6 +5280,7 @@ fts_t::~fts_t()
fts_cache_destroy(cache);
}
+ pthread_cond_destroy(&fts_queue_cond);
/* There is no need to call ib_vector_free() on this->indexes
because it is stored in this->fts_heap. */
mem_heap_free(fts_heap);
diff --git a/storage/innobase/fts/fts0opt.cc b/storage/innobase/fts/fts0opt.cc
index 5702d3e8753..8f6bf513be8 100644
--- a/storage/innobase/fts/fts0opt.cc
+++ b/storage/innobase/fts/fts0opt.cc
@@ -38,6 +38,8 @@ Completed 2011/7/10 Sunny and Jimmy Yang
#include "fts0opt.h"
#include "fts0vlc.h"
#include "wsrep.h"
+#include <mutex>
+#include <thread>
#ifdef WITH_WSREP
extern Atomic_relaxed<bool> wsrep_sst_disable_writes;
@@ -47,19 +49,29 @@ constexpr bool wsrep_sst_disable_writes= false;
/** The FTS optimize thread's work queue. */
ib_wqueue_t* fts_optimize_wq;
-static void fts_optimize_callback(void *);
+static void fts_optimize_func(void *);
static void timer_callback(void*);
static tpool::timer* timer;
-static tpool::task_group task_group(1);
-static tpool::task task(fts_optimize_callback,0, &task_group);
+static tpool::task_group *task_group= nullptr;
+static tpool::task *task= nullptr;
+
+/** Mutex to protect srv_n_fts_threads_entered */
+std::mutex fts_thread_mutex;
+
+/** Number of innodb fts threads */
+extern uint innodb_n_fts_threads;
+
+/** Number of innodb fts threads currently executing
+fts_optimize_func() */
+static uint srv_n_fts_threads_entered;
+
+/** FTS subsystem processed shutdown message */
+static bool fts_shutdown_processed;
/** FTS optimize thread, for MDL acquisition */
static THD *fts_opt_thd;
-/** The FTS vector to store fts_slot_t */
-static ib_vector_t* fts_slots;
-
/** Default optimize interval in secs. */
static const ulint FTS_OPTIMIZE_INTERVAL_IN_SECS = 300;
@@ -201,6 +213,10 @@ struct fts_slot_t {
/** time(NULL) of latest successful fts_optimize_table() */
time_t completed;
+
+ fts_slot_t(dict_table_t *new_table):
+ table(new_table), running(false), added(0),
+ deleted(0), last_run(0), completed(0) {}
};
/** A table remove message for the FTS optimize thread. */
@@ -2384,48 +2400,24 @@ fts_optimize_reset_start_time(
return(error);
}
-/*********************************************************************//**
-Run OPTIMIZE on the given table by a background thread.
+/** Run OPTIMIZE on the given table by a background thread.
+@param table table to be optimized
+@param threshold processed optimization
@return DB_SUCCESS if all OK */
static MY_ATTRIBUTE((nonnull))
-dberr_t
-fts_optimize_table_bk(
-/*==================*/
- fts_slot_t* slot) /*!< in: table to optimiza */
+dberr_t fts_optimize_table_bk(dict_table_t *table, bool &threshold)
{
- const time_t now = time(NULL);
- const ulint interval = ulint(now - slot->last_run);
-
- /* Avoid optimizing tables that were optimized recently. */
- if (slot->last_run > 0
- && lint(interval) >= 0
- && interval < FTS_OPTIMIZE_INTERVAL_IN_SECS) {
-
- return(DB_SUCCESS);
- }
-
- dict_table_t* table = slot->table;
- dberr_t error;
-
- if (table->is_accessible()
- && table->fts && table->fts->cache
- && table->fts->cache->deleted >= FTS_OPTIMIZE_THRESHOLD) {
- error = fts_optimize_table(table);
-
- slot->last_run = time(NULL);
-
- if (error == DB_SUCCESS) {
- slot->running = false;
- slot->completed = slot->last_run;
- }
- } else {
- /* Note time this run completed. */
- slot->last_run = now;
- error = DB_SUCCESS;
- }
-
- return(error);
+ dberr_t error= DB_SUCCESS;
+ if (table->is_accessible()
+ && table->fts && table->fts->cache
+ && table->fts->cache->deleted >= FTS_OPTIMIZE_THRESHOLD)
+ {
+ error = fts_optimize_table(table);
+ threshold = true;
+ }
+ return(error);
}
+
/*********************************************************************//**
Run OPTIMIZE on the given table.
@return DB_SUCCESS if all OK */
@@ -2551,7 +2543,7 @@ fts_optimize_create_msg(
static void add_msg(fts_msg_t *msg)
{
ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);
- srv_thread_pool->submit_task(&task);
+ srv_thread_pool->submit_task(task);
}
/**
@@ -2560,7 +2552,7 @@ will only recalculate is_sync_needed, in case the queue is empty.
*/
static void timer_callback(void*)
{
- srv_thread_pool->submit_task(&task);
+ srv_thread_pool->submit_task(task);
}
/** Add the table to add to the OPTIMIZER's list.
@@ -2580,9 +2572,15 @@ void fts_optimize_add_table(dict_table_t* table)
mysql_mutex_lock(&fts_optimize_wq->mutex);
+ if (table->fts->in_queue || table->fts->wait_in_queue) {
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
+ mem_heap_free(msg->heap);
+ return;
+ }
+
add_msg(msg);
- table->fts->in_queue = true;
+ table->fts->wait_in_queue = true;
mysql_mutex_unlock(&fts_optimize_wq->mutex);
}
@@ -2609,19 +2607,28 @@ fts_optimize_remove_table(
mysql_mutex_lock(&fts_optimize_wq->mutex);
- if (table->fts->in_queue)
+ if (!table->fts->wait_in_queue && !table->fts->in_queue)
{
- fts_msg_t *msg= fts_optimize_create_msg(FTS_MSG_DEL_TABLE, nullptr);
- pthread_cond_t cond;
- pthread_cond_init(&cond, nullptr);
- msg->ptr= new(mem_heap_alloc(msg->heap, sizeof(fts_msg_del_t)))
- fts_msg_del_t{table, &cond};
- add_msg(msg);
- my_cond_wait(&cond, &fts_optimize_wq->mutex.m_mutex);
- pthread_cond_destroy(&cond);
- ut_ad(!table->fts->in_queue);
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
+ return;
}
+ /* Make sure that InnoDB table was added in fts_slots */
+ while (!table->fts->in_queue || table->fts->sync_message
+ || table->fts->in_process)
+ my_cond_wait(&table->fts->fts_queue_cond,
+ &fts_optimize_wq->mutex.m_mutex);
+
+ fts_msg_t *msg= fts_optimize_create_msg(FTS_MSG_DEL_TABLE, nullptr);
+ pthread_cond_t cond;
+ pthread_cond_init(&cond, nullptr);
+ msg->ptr= new(mem_heap_alloc(msg->heap, sizeof(fts_msg_del_t)))
+ fts_msg_del_t{table, &cond};
+ add_msg(msg);
+ my_cond_wait(&cond, &fts_optimize_wq->mutex.m_mutex);
+ pthread_cond_destroy(&cond);
+ ut_ad(!table->fts->in_queue);
+
mysql_mutex_unlock(&fts_optimize_wq->mutex);
}
@@ -2648,132 +2655,6 @@ void fts_optimize_request_sync_table(dict_table_t *table)
mysql_mutex_unlock(&fts_optimize_wq->mutex);
}
-/** Add a table to fts_slots if it doesn't already exist. */
-static bool fts_optimize_new_table(dict_table_t* table)
-{
- ut_ad(table);
-
- ulint i;
- fts_slot_t* slot;
- fts_slot_t* empty = NULL;
-
- /* Search for duplicates, also find a free slot if one exists. */
- for (i = 0; i < ib_vector_size(fts_slots); ++i) {
-
- slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i));
-
- if (!slot->table) {
- empty = slot;
- } else if (slot->table == table) {
- /* Already exists in our optimize queue. */
- return false;
- }
- }
-
- slot = empty ? empty : static_cast<fts_slot_t*>(
- ib_vector_push(fts_slots, NULL));
-
- memset(slot, 0x0, sizeof(*slot));
-
- slot->table = table;
- return true;
-}
-
-/** Remove a table from fts_slots if it exists.
-@param remove table to be removed from fts_slots */
-static bool fts_optimize_del_table(fts_msg_del_t *remove)
-{
- const dict_table_t* table = remove->table;
- ut_ad(table);
- for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
- fts_slot_t* slot;
-
- slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i));
-
- if (slot->table == table) {
- if (UNIV_UNLIKELY(fts_enable_diag_print)) {
- ib::info() << "FTS Optimize Removing table "
- << table->name;
- }
-
- mysql_mutex_lock(&fts_optimize_wq->mutex);
- table->fts->in_queue = false;
- pthread_cond_signal(remove->cond);
- mysql_mutex_unlock(&fts_optimize_wq->mutex);
- slot->table = NULL;
- return true;
- }
- }
-
- mysql_mutex_lock(&fts_optimize_wq->mutex);
- pthread_cond_signal(remove->cond);
- mysql_mutex_unlock(&fts_optimize_wq->mutex);
- return false;
-}
-
-/**********************************************************************//**
-Calculate how many tables in fts_slots need to be optimized.
-@return no. of tables to optimize */
-static ulint fts_optimize_how_many()
-{
- ulint n_tables = 0;
- const time_t current_time = time(NULL);
-
- for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
- const fts_slot_t* slot = static_cast<const fts_slot_t*>(
- ib_vector_get_const(fts_slots, i));
- if (!slot->table) {
- continue;
- }
-
- const time_t end = slot->running
- ? slot->last_run : slot->completed;
- ulint interval = ulint(current_time - end);
-
- if (lint(interval) < 0
- || interval >= FTS_OPTIMIZE_INTERVAL_IN_SECS) {
- ++n_tables;
- }
- }
-
- return(n_tables);
-}
-
-/**********************************************************************//**
-Check if the total memory used by all FTS table exceeds the maximum limit.
-@return true if a sync is needed, false otherwise */
-static bool fts_is_sync_needed()
-{
- ulint total_memory = 0;
- const time_t now = time(NULL);
- double time_diff = difftime(now, last_check_sync_time);
-
- if (fts_need_sync || (time_diff >= 0 && time_diff < 5)) {
- return(false);
- }
-
- last_check_sync_time = now;
-
- for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
- const fts_slot_t* slot = static_cast<const fts_slot_t*>(
- ib_vector_get_const(fts_slots, i));
-
- if (!slot->table) {
- continue;
- }
-
- if (slot->table->fts && slot->table->fts->cache) {
- total_memory += slot->table->fts->cache->total_size;
- }
-
- if (total_memory > fts_max_total_cache_size) {
- return(true);
- }
- }
-
- return(false);
-}
-
/** Sync fts cache of a table
@param[in,out] table table to be synced
@param[in] process_message processing messages from fts_optimize_wq */
@@ -2785,7 +2666,16 @@ static void fts_optimize_sync_table(dict_table_t *table,
&mdl_ticket);
if (!sync_table)
+ {
+ if (process_message)
+ {
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
+ table->fts->sync_message= false;
+ pthread_cond_broadcast(&table->fts->fts_queue_cond);
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
+ }
return;
+ }
if (sync_table->fts && sync_table->fts->cache && sync_table->is_accessible())
{
@@ -2795,6 +2685,7 @@ static void fts_optimize_sync_table(dict_table_t *table,
{
mysql_mutex_lock(&fts_optimize_wq->mutex);
sync_table->fts->sync_message = false;
+ pthread_cond_broadcast(&table->fts->fts_queue_cond);
mysql_mutex_unlock(&fts_optimize_wq->mutex);
}
}
@@ -2806,134 +2697,286 @@ static void fts_optimize_sync_table(dict_table_t *table,
dict_table_close(sync_table, false, fts_opt_thd, mdl_ticket);
}
-/**********************************************************************//**
-Optimize all FTS tables.
-@return Dummy return */
-static void fts_optimize_callback(void *)
+class fts_slots_t
{
- ut_ad(!srv_read_only_mode);
-
- static ulint current;
- static bool done;
- static ulint n_optimize;
-
- if (!fts_optimize_wq || done) {
- /* Possibly timer initiated callback, can come after FTS_MSG_STOP.*/
- return;
- }
-
- static ulint n_tables = ib_vector_size(fts_slots);
-
- while (!done && srv_shutdown_state <= SRV_SHUTDOWN_INITIATED) {
- /* If there is no message in the queue and we have tables
- to optimize then optimize the tables. */
+ typedef std::vector<fts_slot_t*, ut_allocator<fts_slot_t*> > fts_slots_vec;
+ fts_slots_vec *m_fts_slots;
+ std::mutex m_mutex;
+ ulint m_n_optimize_tables;
+public:
+ fts_slots_t()
+ {
+ m_fts_slots= new fts_slots_vec();
+ }
- if (!done
- && ib_wqueue_is_empty(fts_optimize_wq)
- && n_tables > 0
- && n_optimize > 0) {
+ ~fts_slots_t()
+ {
+ for (fts_slots_vec::iterator it= m_fts_slots->begin();
+ it != m_fts_slots->end(); it++)
+ delete *it;
+ m_fts_slots->clear();
+ delete m_fts_slots;
+ }
- /* The queue is empty but we have tables
- to optimize. */
- if (UNIV_UNLIKELY(wsrep_sst_disable_writes)) {
-retry_later:
- if (fts_is_sync_needed()) {
- fts_need_sync = true;
- }
- if (n_tables) {
- timer->set_time(5000, 0);
- }
- return;
- }
+ ulint get_n_optimize_tables()
+ {
+ std::unique_lock<std::mutex> lk(m_mutex);
+ return m_n_optimize_tables;
+ }
- fts_slot_t* slot = static_cast<fts_slot_t*>(
- ib_vector_get(fts_slots, current));
+ void add_new_table(dict_table_t *table)
+ {
+ std::unique_lock<std::mutex> lk(m_mutex);
+ for (fts_slots_vec::iterator it= m_fts_slots->begin();
+ it != m_fts_slots->end(); it++)
+ if ((*it)->table == table) return;
+ m_fts_slots->push_back(new fts_slot_t(table));
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
+ table->fts->wait_in_queue = false;
+ table->fts->in_queue = true;
+ pthread_cond_broadcast(&table->fts->fts_queue_cond);
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
+ }
- /* Handle the case of empty slots. */
- if (slot->table) {
- slot->running = true;
- fts_optimize_table_bk(slot);
- }
+ void delete_table(const dict_table_t *table)
+ {
+ std::unique_lock<std::mutex> lk(m_mutex);
+ for (fts_slots_vec::iterator it= m_fts_slots->begin();
+ it != m_fts_slots->end(); it++)
+ {
+ if ((*it)->table == table)
+ {
+ m_fts_slots->erase(it);
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
+ table->fts->in_queue= false;
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
+ return;
+ }
+ }
+ }
- /* Wrap around the counter. */
- if (++current >= ib_vector_size(fts_slots)) {
- n_optimize = fts_optimize_how_many();
- current = 0;
- }
- } else if (n_optimize == 0
- || !ib_wqueue_is_empty(fts_optimize_wq)) {
- fts_msg_t* msg = static_cast<fts_msg_t*>
- (ib_wqueue_nowait(fts_optimize_wq));
- /* Timeout ? */
- if (!msg) {
- goto retry_later;
- }
+ void update_need_sync()
+ {
+ std::unique_lock<std::mutex> lk(m_mutex);
+ const time_t current_time = time(NULL);
+ m_n_optimize_tables= 0;
+ for (fts_slots_vec::iterator it= m_fts_slots->begin();
+ it != m_fts_slots->end(); it++)
+ {
+ const fts_slot_t* slot= *it;
+ if (!slot->table) continue;
+ const time_t end= slot->running
+ ? slot->last_run : slot->completed;
+ ulint interval= ulint(current_time - end);
+ if (lint(interval) < 0 ||
+ interval >= FTS_OPTIMIZE_INTERVAL_IN_SECS)
+ ++m_n_optimize_tables;
+ }
+ }
+
+ bool is_sync_need()
+ {
+ std::unique_lock<std::mutex> lk(m_mutex);
+ ulint total_memory= 0;
+ for (fts_slots_vec::iterator it= m_fts_slots->begin();
+ it != m_fts_slots->end(); it++)
+ {
+ const fts_slot_t* slot= *it;
+ if (!slot->table) continue;
+ if (slot->table->fts && slot->table->fts->cache)
+ total_memory+= slot->table->fts->cache->total_size;
+ if (total_memory > fts_max_total_cache_size) return true;
+ }
+ return false;
+ }
+
+ dict_table_t *get_optimize_table()
+ {
+ std::unique_lock<std::mutex> lk(m_mutex);
+ if (m_fts_slots->empty()) return nullptr;
+ static ulint current= 0;
+ ulint size= m_fts_slots->size();
+ if (current >= size) current= 0;
+ ulint start_val= current;
+ bool iteration= false;
+read_slot:
+ /* Traversed the entire list */
+ if (start_val == current && iteration) { return nullptr; }
+ current++;
+ /* Reset and traverse from start */
+ if (current == size) current = 0;
+ iteration= true;
+ fts_slot_t *slot= m_fts_slots->at(current);
+ if (slot->table && !slot->running)
+ {
+ slot->running = true;
+ const time_t now = time(NULL);
+ const ulint interval = ulint(now - slot->last_run);
+ /* Avoid optimizing tables that were optimized recently. */
+ if (slot->last_run > 0 && lint(interval) >= 0
+ && interval < FTS_OPTIMIZE_INTERVAL_IN_SECS)
+ goto read_slot;
+ } else goto read_slot;
+
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
+ slot->table->fts->in_process= true;
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
+ return slot->table;
+ }
+
+ void update_slot(dict_table_t *table, bool threshold, dberr_t *err)
+ {
+ std::unique_lock<std::mutex> lk(m_mutex);
+ for (fts_slots_vec::iterator it= m_fts_slots->begin();
+ it != m_fts_slots->end(); it++)
+ {
+ fts_slot_t* slot= *it;
+ if (slot->table != table) continue;
+ else
+ {
+ slot->last_run= time(nullptr);
+ slot->running= false;
+
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
+ table->fts->in_process= false;
+ pthread_cond_broadcast(&table->fts->fts_queue_cond);
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
+
+ if (!threshold) return;
+ if (*err == DB_SUCCESS)
+ slot->completed= slot->last_run;
+ }
+ }
+ }
+
+ dict_table_t* pop_table()
+ {
+ std::unique_lock<std::mutex> lk(m_mutex);
+ if (m_fts_slots->empty()) return nullptr;
+ dict_table_t *table= (*m_fts_slots->begin())->table;
+ m_fts_slots->erase(m_fts_slots->begin());
+ return table;
+ }
- switch (msg->type) {
- case FTS_MSG_STOP:
- done = true;
- break;
+};
- case FTS_MSG_ADD_TABLE:
- ut_a(!done);
- if (fts_optimize_new_table(
- static_cast<dict_table_t*>(
- msg->ptr))) {
- ++n_tables;
- }
- break;
+fts_slots_t *fts_table_slots;
- case FTS_MSG_DEL_TABLE:
- if (fts_optimize_del_table(
- static_cast<fts_msg_del_t*>(
- msg->ptr))) {
- --n_tables;
- }
- break;
- case FTS_MSG_SYNC_TABLE:
- if (UNIV_UNLIKELY(wsrep_sst_disable_writes)) {
- add_msg(msg);
- goto retry_later;
- }
+static void fts_process_msg(fts_msg_t *msg)
+{
+ switch (msg->type)
+ {
+ case FTS_MSG_STOP:
+ {
+ std::unique_lock<std::mutex> lk(fts_thread_mutex);
+ fts_shutdown_processed= true;
+ }
+ break;
+ case FTS_MSG_ADD_TABLE:
+ fts_table_slots->add_new_table(
+ static_cast<dict_table_t*>(msg->ptr));
+ break;
+ case FTS_MSG_DEL_TABLE:
+ {
+ fts_msg_del_t* remove= static_cast<fts_msg_del_t*>(msg->ptr);
+ fts_table_slots->delete_table(remove->table);
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
+ pthread_cond_signal(remove->cond);
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
+ break;
+ }
+ case FTS_MSG_SYNC_TABLE:
+ DBUG_EXECUTE_IF("fts_instrument_msg_sync_sleep",
+ std::this_thread::sleep_for(
+ std::chrono::milliseconds(300)););
+ fts_optimize_sync_table(
+ static_cast<dict_table_t*>(msg->ptr), true);
+ break;
+ default: ut_error;
+ }
+ mem_heap_free(msg->heap);
+ return;
+}
- DBUG_EXECUTE_IF(
- "fts_instrument_msg_sync_sleep",
- std::this_thread::sleep_for(
- std::chrono::milliseconds(
- 300)););
- fts_optimize_sync_table(
- static_cast<dict_table_t*>(
- msg->ptr), true);
- break;
- default:
- ut_error;
- }
+static void fts_optimize_func(void *)
+{
+ ut_ad(!srv_read_only_mode);
- mem_heap_free(msg->heap);
- n_optimize = done ? 0 : fts_optimize_how_many();
- }
- }
+ std::unique_lock<std::mutex> lk(fts_thread_mutex);
+ /** Acquire mutex to check fts_optimize_wq and how many
+ threads entered this function */
+ if (!fts_optimize_wq || !fts_table_slots)
+ /* Possibly timer initiated callback */
+ return;
- /* Server is being shutdown, sync the data from FTS cache to disk
- if needed */
- if (n_tables > 0) {
- for (ulint i = 0; i < ib_vector_size(fts_slots); i++) {
- fts_slot_t* slot = static_cast<fts_slot_t*>(
- ib_vector_get(fts_slots, i));
+ srv_n_fts_threads_entered++;
+ lk.unlock();
+ while (srv_shutdown_state <= SRV_SHUTDOWN_INITIATED)
+ {
+ lk.lock();
+ if (fts_shutdown_processed == true) { lk.unlock(); break; }
+ lk.unlock();
+ ulint n_opt_tables= fts_table_slots->get_n_optimize_tables();
+ if (ib_wqueue_is_empty(fts_optimize_wq))
+ {
+ /* Queue is empty, but we have table to optimize */
+ if (UNIV_UNLIKELY(wsrep_sst_disable_writes))
+ {
+retry_later:
+ if (fts_table_slots->is_sync_need()) fts_need_sync= true;
+ if (n_opt_tables) timer->set_time(5000, 0);
+ lk.lock();
+ srv_n_fts_threads_entered--;
+ return;
+ }
+ if (n_opt_tables == 0) goto retry_later;
+ else
+ {
+ dict_table_t* table= fts_table_slots->get_optimize_table();
+ if (!table) fts_table_slots->update_need_sync();
+ else
+ {
+ bool threshold= false;
+ dberr_t err= fts_optimize_table_bk(table, threshold);
+ fts_table_slots->update_slot(table, threshold, &err);
+ }
+ }
+ }
+ else
+ {
+ fts_msg_t* msg = static_cast<fts_msg_t*>(
+ ib_wqueue_nowait(fts_optimize_wq));
+ if (!msg) { goto retry_later; }
+ else fts_process_msg(msg);
+ }
+ }
- if (slot->table) {
- fts_optimize_sync_table(slot->table);
- }
- }
- }
+ /* Process all messages from fts_optimize_wq */
+ while (!ib_wqueue_is_empty(fts_optimize_wq))
+ {
+ fts_msg_t *msg= static_cast<fts_msg_t*>(
+ ib_wqueue_nowait(fts_optimize_wq));
+ if (!msg) continue;
+ fts_process_msg(msg);
+ }
- ib_vector_free(fts_slots);
- mysql_mutex_lock(&fts_optimize_wq->mutex);
- fts_slots = NULL;
- pthread_cond_broadcast(&fts_opt_shutdown_cond);
- mysql_mutex_unlock(&fts_optimize_wq->mutex);
+ /* Sync all tables during shutdown */
+ while (dict_table_t *table= fts_table_slots->pop_table())
+ fts_optimize_sync_table(table);
- ib::info() << "FTS optimize thread exiting.";
+ /* Decrement the working threads */
+ lk.lock();
+ srv_n_fts_threads_entered--;
+ /* Free the slots and broadcast the shutdown signal */
+ if (srv_n_fts_threads_entered == 0)
+ {
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
+ delete fts_table_slots;
+ fts_table_slots= nullptr;
+ pthread_cond_broadcast(&fts_opt_shutdown_cond);
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
+ ib::info() << "FTS optimize threads exiting.";
+ }
}
/**********************************************************************//**
@@ -2942,9 +2985,6 @@ void
fts_optimize_init(void)
/*===================*/
{
- mem_heap_t* heap;
- ib_alloc_t* heap_alloc;
-
ut_ad(!srv_read_only_mode);
/* For now we only support one optimize thread. */
@@ -2952,13 +2992,10 @@ fts_optimize_init(void)
/* Create FTS optimize work queue */
fts_optimize_wq = ib_wqueue_create();
+ fts_table_slots= new fts_slots_t();
+ fts_set_n_threads(innodb_n_fts_threads);
timer = srv_thread_pool->create_timer(timer_callback);
- /* Create FTS vector to store fts_slot_t */
- heap = mem_heap_create(sizeof(dict_table_t*) * 64);
- heap_alloc = ib_heap_allocator_create(heap);
- fts_slots = ib_vector_create(heap_alloc, sizeof(fts_slot_t), 4);
-
fts_opt_thd = innobase_create_background_thd("InnoDB FTS optimizer");
/* Add fts tables to fts_slots which could be skipped
during dict_load_table_one() because fts_optimize_thread
@@ -2975,7 +3012,7 @@ fts_optimize_init(void)
need to acquire fts_optimize_wq->mutex for adding the fts
table to the fts slots. */
ut_ad(!table->can_be_evicted);
- fts_optimize_new_table(table);
+ fts_table_slots->add_new_table(table);
table->fts->in_queue = true;
}
dict_sys.unfreeze();
@@ -3004,11 +3041,10 @@ fts_optimize_shutdown()
can't delete the work queue here because the add thread needs
deregister the FTS tables. */
timer->disarm();
- task_group.cancel_pending(&task);
+ task_group->cancel_pending(task);
add_msg(fts_optimize_create_msg(FTS_MSG_STOP, nullptr));
-
- while (fts_slots) {
+ while (fts_table_slots) {
my_cond_wait(&fts_opt_shutdown_cond,
&fts_optimize_wq->mutex.m_mutex);
}
@@ -3021,6 +3057,8 @@ fts_optimize_shutdown()
ib_wqueue_free(fts_optimize_wq);
fts_optimize_wq = NULL;
+ delete task_group;
+ delete task;
delete timer;
timer = NULL;
}
@@ -3031,15 +3069,21 @@ void fts_sync_during_ddl(dict_table_t* table)
{
if (!fts_optimize_wq)
return;
- mysql_mutex_lock(&fts_optimize_wq->mutex);
- const auto sync_message= table->fts->sync_message;
- mysql_mutex_unlock(&fts_optimize_wq->mutex);
- if (!sync_message)
- return;
fts_sync_table(table, false);
+}
- mysql_mutex_lock(&fts_optimize_wq->mutex);
- table->fts->sync_message = false;
- mysql_mutex_unlock(&fts_optimize_wq->mutex);
+/* Use the mutex to set the number of fts threads */
+std::mutex fts_threads_cnt_mutex;
+void fts_set_n_threads(const uint new_cnt)
+{
+ std::unique_lock<std::mutex> lk(fts_threads_cnt_mutex);
+ if (task_group)
+ task_group->set_max_tasks(new_cnt);
+ else
+ {
+ task_group= new tpool::task_group(new_cnt);
+ task= new tpool::task(fts_optimize_func,0,task_group);
+ }
+ innodb_n_fts_threads= new_cnt;
}
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index f102789d7ab..3d7fd9a9969 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -206,6 +206,8 @@ my_bool innodb_evict_tables_on_commit_debug;
/** File format constraint for ALTER TABLE */
ulong innodb_instant_alter_column_allowed;
+uint innodb_n_fts_threads;
+
/** Note we cannot use rec_format_enum because we do not allow
COMPRESSED row format for innodb_default_row_format option. */
enum default_row_format_enum {
@@ -18603,6 +18605,12 @@ innodb_encrypt_tables_update(THD*, st_mysql_sys_var*, void*, const void* save)
mysql_mutex_lock(&LOCK_global_system_variables);
}
+static void innodb_fts_threads_update(
+ THD*,st_mysql_sys_var*,void*, const void* save)
+{
+ fts_set_n_threads(*static_cast<const uint*>(save));
+}
+
static SHOW_VAR innodb_status_variables_export[]= {
SHOW_FUNC_ENTRY("Innodb", &show_innodb_vars),
{NullS, NullS, SHOW_LONG}
@@ -19747,6 +19755,13 @@ static MYSQL_SYSVAR_BOOL(encrypt_temporary_tables, innodb_encrypt_temporary_tabl
"Enrypt the temporary table data.",
NULL, NULL, false);
+static MYSQL_SYSVAR_UINT(fts_threads, innodb_n_fts_threads,
+ PLUGIN_VAR_RQCMDARG,
+ "Number of threads performing background fts operation ",
+ NULL,
+ innodb_fts_threads_update,
+ 2, 1, 255, 0);
+
static struct st_mysql_sys_var* innobase_system_variables[]= {
MYSQL_SYSVAR(autoextend_increment),
MYSQL_SYSVAR(buffer_pool_size),
@@ -19913,6 +19928,7 @@ static struct st_mysql_sys_var* innobase_system_variables[]= {
MYSQL_SYSVAR(buf_dump_status_frequency),
MYSQL_SYSVAR(background_thread),
MYSQL_SYSVAR(encrypt_temporary_tables),
+ MYSQL_SYSVAR(fts_threads),
NULL
};
diff --git a/storage/innobase/include/fts0fts.h b/storage/innobase/include/fts0fts.h
index f88c7990ba9..f3c8ae76e0a 100644
--- a/storage/innobase/include/fts0fts.h
+++ b/storage/innobase/include/fts0fts.h
@@ -332,7 +332,11 @@ public:
/** Vector of FTS indexes, this is mainly for caching purposes. */
ib_vector_t* indexes;
- /** Whether the table exists in fts_optimize_wq;
+ /** Whether the addition of new table message in
+ fts_optimize_wq; protected by fts_optimize_wq mutex */
+ bool wait_in_queue;
+
+ /** Whether the table is in fts_optimize_wq;
protected by fts_optimize_wq mutex */
bool in_queue;
@@ -340,6 +344,15 @@ public:
protected by fts_optimize_wq mutex */
bool sync_message;
+ /** Whether the table is picked by fts threads
+ protected by fts_optimize_wq mutex */
+ bool in_process;
+
+ /** Condition variable to wake up the background thread
+ when table is in fts queue or sync or in_process
+ condition */
+ pthread_cond_t fts_queue_cond;
+
/** Heap for fts_t allocation. */
mem_heap_t* fts_heap;
};
@@ -943,3 +956,6 @@ fts_update_sync_doc_id(const dict_table_t *table,
/** Sync the table during commit phase
@param[in] table table to be synced */
void fts_sync_during_ddl(dict_table_t* table);
+
+/** Set the number of fts threads */
+void fts_set_n_threads(const uint new_cnt);
diff --git a/storage/innobase/include/fts0types.h b/storage/innobase/include/fts0types.h
index f3e22c72156..f8c9adf4c9e 100644
--- a/storage/innobase/include/fts0types.h
+++ b/storage/innobase/include/fts0types.h
@@ -175,8 +175,9 @@ struct fts_node_t {
ulint ilist_size_alloc;
/*!< Allocated size of ilist in
bytes */
- bool synced; /*!< flag whether the node is
-synced */
+ /** Flag to indicate whether node is synced.
+ Protected by cache->lock */
+ bool synced;
};
/** A tokenizer word. Contains information about one word. */
diff --git a/storage/innobase/include/srv0srv.h b/storage/innobase/include/srv0srv.h
index 96cfe886c02..69af8de3732 100644
--- a/storage/innobase/include/srv0srv.h
+++ b/storage/innobase/include/srv0srv.h
@@ -383,6 +383,7 @@ extern ulong srv_max_purge_lag_delay;
extern my_bool innodb_encrypt_temporary_tables;
extern my_bool srv_immediate_scrub_data_uncompressed;
+
/*-------------------------------------------*/
/** Modes of operation */