summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThirunarayanan Balathandayuthapani <thiru@mariadb.com>2023-04-04 10:47:20 +0530
committerThirunarayanan Balathandayuthapani <thiru@mariadb.com>2023-04-19 15:20:07 +0530
commit2fa4d72c40e7cab539f0fda525c0df5ecd198bf7 (patch)
tree725c36689b22393c308ce692f03ebf76f5756325
parent485a1b1f116f0c5e73fce3a97ffdac84c861b3c2 (diff)
downloadmariadb-git-bb-10.6-MDEV-30996_fts.tar.gz
MDEV-30996 insert.. select in presence of full text index freezes all other commits at commit timebb-10.6-MDEV-30996_fts
- When binlog is enabled, trasaction takes a lot of time to do sync operation on innodb fts table. This leads to block of other transaction commit. To avoid this failure, remove the fulltext sync operation during transaction commit. - This patch does the revert of MDEV-25581 and also introduces the multiple fts background threads to process and optimize the fts table. - Introduced the new variable innodb_fulltext_bg_threads which does background processing of message and optimization. Minimum value is 1 and Maximum value can be 255 - By introducing the above variable, InnoDB can do sync on multiple tables parallel
-rw-r--r--mysql-test/suite/innodb_fts/r/concurrent_insert.result2
-rw-r--r--mysql-test/suite/innodb_fts/r/sync.result16
-rw-r--r--mysql-test/suite/innodb_fts/r/sync_block.result83
-rw-r--r--mysql-test/suite/innodb_fts/t/concurrent_insert.test2
-rw-r--r--mysql-test/suite/innodb_fts/t/sync.test4
-rw-r--r--mysql-test/suite/innodb_fts/t/sync_block.test124
-rw-r--r--mysql-test/suite/sys_vars/r/innodb_fulltext_bg_threads.result43
-rw-r--r--mysql-test/suite/sys_vars/r/sysvars_innodb.result12
-rw-r--r--mysql-test/suite/sys_vars/t/innodb_fulltext_bg_threads.test45
-rw-r--r--storage/innobase/fts/fts0fts.cc224
-rw-r--r--storage/innobase/fts/fts0opt.cc790
-rw-r--r--storage/innobase/handler/ha_innodb.cc18
-rw-r--r--storage/innobase/include/fts0fts.h27
-rw-r--r--storage/innobase/include/fts0types.h2
-rw-r--r--storage/innobase/include/srv0srv.h2
15 files changed, 1036 insertions, 358 deletions
diff --git a/mysql-test/suite/innodb_fts/r/concurrent_insert.result b/mysql-test/suite/innodb_fts/r/concurrent_insert.result
index 2335982816b..bc47511b046 100644
--- a/mysql-test/suite/innodb_fts/r/concurrent_insert.result
+++ b/mysql-test/suite/innodb_fts/r/concurrent_insert.result
@@ -19,7 +19,7 @@ INSERT INTO t2 VALUES('mariadb');
connection default;
SET @saved_dbug = @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug ='+d,fts_instrument_sync_request,ib_optimize_wq_hang';
-SET DEBUG_SYNC= 'fts_sync_end
+SET DEBUG_SYNC= 'fts_instrument_sync_request
SIGNAL drop_index_start WAIT_FOR sync_op';
INSERT INTO t1 VALUES('Keyword');
connect con1,localhost,root,,,;
diff --git a/mysql-test/suite/innodb_fts/r/sync.result b/mysql-test/suite/innodb_fts/r/sync.result
index 74a5d2f13fb..928efffdb21 100644
--- a/mysql-test/suite/innodb_fts/r/sync.result
+++ b/mysql-test/suite/innodb_fts/r/sync.result
@@ -11,19 +11,19 @@ INSERT INTO t1(title) VALUES('database');
connection con1;
SET @old_dbug = @@SESSION.debug_dbug;
SET debug_dbug = '+d,fts_instrument_sync_debug';
-SET DEBUG_SYNC= 'fts_sync_end SIGNAL written WAIT_FOR selected';
+SET DEBUG_SYNC= 'fts_write_node SIGNAL written WAIT_FOR selected';
INSERT INTO t1(title) VALUES('mysql database');
connection default;
SET DEBUG_SYNC= 'now WAIT_FOR written';
SET GLOBAL innodb_ft_aux_table="test/t1";
SELECT * FROM INFORMATION_SCHEMA.INNODB_FT_INDEX_CACHE;
WORD FIRST_DOC_ID LAST_DOC_ID DOC_COUNT DOC_ID POSITION
-SELECT * FROM INFORMATION_SCHEMA.INNODB_FT_INDEX_TABLE;
-WORD FIRST_DOC_ID LAST_DOC_ID DOC_COUNT DOC_ID POSITION
database 2 3 2 2 0
database 2 3 2 3 6
mysql 1 3 2 1 0
mysql 1 3 2 3 0
+SELECT * FROM INFORMATION_SCHEMA.INNODB_FT_INDEX_TABLE;
+WORD FIRST_DOC_ID LAST_DOC_ID DOC_COUNT DOC_ID POSITION
SET GLOBAL innodb_ft_aux_table=default;
SELECT * FROM t1 WHERE MATCH(title) AGAINST('mysql database');
FTS_DOC_ID title
@@ -59,7 +59,7 @@ INSERT INTO t1(title) VALUES('mysql');
INSERT INTO t1(title) VALUES('database');
connection con1;
SET debug_dbug = '+d,fts_instrument_sync_debug';
-SET DEBUG_SYNC= 'fts_sync_end SIGNAL written WAIT_FOR inserted';
+SET DEBUG_SYNC= 'fts_write_node SIGNAL written WAIT_FOR inserted';
INSERT INTO t1(title) VALUES('mysql database');
connection default;
SET DEBUG_SYNC= 'now WAIT_FOR written';
@@ -70,14 +70,14 @@ SET debug_dbug = @old_dbug;
SET GLOBAL innodb_ft_aux_table="test/t1";
SELECT * FROM INFORMATION_SCHEMA.INNODB_FT_INDEX_CACHE;
WORD FIRST_DOC_ID LAST_DOC_ID DOC_COUNT DOC_ID POSITION
-database 4 4 1 4 6
-mysql 4 4 1 4 0
SELECT * FROM INFORMATION_SCHEMA.INNODB_FT_INDEX_TABLE;
WORD FIRST_DOC_ID LAST_DOC_ID DOC_COUNT DOC_ID POSITION
database 2 3 2 2 0
database 2 3 2 3 6
-mysql 1 3 2 1 0
-mysql 1 3 2 3 0
+database 4 4 1 4 6
+mysql 1 4 3 1 0
+mysql 1 4 3 3 0
+mysql 1 4 3 4 0
SET GLOBAL innodb_ft_aux_table=default;
SELECT * FROM t1 WHERE MATCH(title) AGAINST('mysql database');
FTS_DOC_ID title
diff --git a/mysql-test/suite/innodb_fts/r/sync_block.result b/mysql-test/suite/innodb_fts/r/sync_block.result
new file mode 100644
index 00000000000..65bee127e80
--- /dev/null
+++ b/mysql-test/suite/innodb_fts/r/sync_block.result
@@ -0,0 +1,83 @@
+SET @old_log_output = @@global.log_output;
+SET @old_slow_query_log = @@global.slow_query_log;
+SET @old_general_log = @@global.general_log;
+SET @old_long_query_time = @@global.long_query_time;
+SET @old_debug = @@global.debug_dbug;
+SET GLOBAL log_output = 'TABLE';
+SET GLOBAL general_log = 1;
+SET GLOBAL slow_query_log = 1;
+SET GLOBAL long_query_time = 1;
+connect con1,localhost,root,,;
+connect con2,localhost,root,,;
+connection default;
+# Case 1: Sync blocks DML(insert) on the same table.
+CREATE TABLE t1 (
+FTS_DOC_ID BIGINT UNSIGNED AUTO_INCREMENT NOT NULL PRIMARY KEY,
+title VARCHAR(200),
+FULLTEXT(title)
+) ENGINE = InnoDB;
+connection con1;
+SET GLOBAL debug_dbug='+d,fts_instrument_sync_debug,fts_instrument_sync_sleep';
+SET DEBUG_SYNC= 'fts_sync_begin SIGNAL begin WAIT_FOR continue';
+INSERT INTO t1(title) VALUES('mysql database');
+connection con2;
+SET DEBUG_SYNC= 'now WAIT_FOR begin';
+SELECT * FROM t1 WHERE MATCH(title) AGAINST('mysql database');
+connection default;
+SET DEBUG_SYNC= 'now SIGNAL continue';
+connection con1;
+/* connection con1 */ INSERT INTO t1(title) VALUES('mysql database');
+connection con2;
+/* conneciton con2 */ SELECT * FROM t1 WHERE MATCH(title) AGAINST('mysql database');
+FTS_DOC_ID title
+connection default;
+# make con1 & con2 show up in mysql.slow_log
+SELECT SLEEP(2);
+SLEEP(2)
+0
+# slow log results should only contain INSERT INTO t1.
+SELECT sql_text FROM mysql.slow_log WHERE query_time >= '00:00:02';
+sql_text
+INSERT INTO t1(title) VALUES('mysql database')
+SET GLOBAL debug_dbug = @old_debug;
+TRUNCATE TABLE mysql.slow_log;
+DROP TABLE t1;
+# Case 2: Sync blocks DML(insert) on other tables.
+CREATE TABLE t1 (
+FTS_DOC_ID BIGINT UNSIGNED AUTO_INCREMENT NOT NULL PRIMARY KEY,
+title VARCHAR(200),
+FULLTEXT(title)
+) ENGINE = InnoDB;
+CREATE TABLE t2(id INT);
+connection con1;
+SET GLOBAL debug_dbug='+d,fts_instrument_sync_request,fts_instrument_sync_sleep';
+SET DEBUG_SYNC= 'fts_instrument_sync_request SIGNAL begin WAIT_FOR continue';
+INSERT INTO t1(title) VALUES('mysql database');
+connection con2;
+SET DEBUG_SYNC= 'now WAIT_FOR begin';
+INSERT INTO t2 VALUES(1);
+connection default;
+SET DEBUG_SYNC= 'now SIGNAL continue';
+connection con1;
+/* connection con1 */ INSERT INTO t1(title) VALUES('mysql database');
+connection con2;
+/* conneciton con2 */ INSERT INTO t2 VALUES(1);
+connection default;
+SET DEBUG_SYNC = 'RESET';
+# make con1 & con2 show up in mysql.slow_log
+SELECT SLEEP(2);
+SLEEP(2)
+0
+# slow log results should be empty here.
+SELECT sql_text FROM mysql.slow_log WHERE query_time >= '00:00:02';
+sql_text
+SET GLOBAL debug_dbug = @old_debug;
+TRUNCATE TABLE mysql.slow_log;
+DROP TABLE t1,t2;
+disconnect con1;
+disconnect con2;
+# Restore slow log settings.
+SET GLOBAL log_output = @old_log_output;
+SET GLOBAL general_log = @old_general_log;
+SET GLOBAL slow_query_log = @old_slow_query_log;
+SET GLOBAL long_query_time = @old_long_query_time;
diff --git a/mysql-test/suite/innodb_fts/t/concurrent_insert.test b/mysql-test/suite/innodb_fts/t/concurrent_insert.test
index b6991f6e503..9b4d9517b1a 100644
--- a/mysql-test/suite/innodb_fts/t/concurrent_insert.test
+++ b/mysql-test/suite/innodb_fts/t/concurrent_insert.test
@@ -31,7 +31,7 @@ INSERT INTO t2 VALUES('mariadb');
connection default;
SET @saved_dbug = @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug ='+d,fts_instrument_sync_request,ib_optimize_wq_hang';
-SET DEBUG_SYNC= 'fts_sync_end
+SET DEBUG_SYNC= 'fts_instrument_sync_request
SIGNAL drop_index_start WAIT_FOR sync_op';
send INSERT INTO t1 VALUES('Keyword');
diff --git a/mysql-test/suite/innodb_fts/t/sync.test b/mysql-test/suite/innodb_fts/t/sync.test
index 7c5c835f2ee..168309a5c92 100644
--- a/mysql-test/suite/innodb_fts/t/sync.test
+++ b/mysql-test/suite/innodb_fts/t/sync.test
@@ -27,7 +27,7 @@ connection con1;
SET @old_dbug = @@SESSION.debug_dbug;
SET debug_dbug = '+d,fts_instrument_sync_debug';
-SET DEBUG_SYNC= 'fts_sync_end SIGNAL written WAIT_FOR selected';
+SET DEBUG_SYNC= 'fts_write_node SIGNAL written WAIT_FOR selected';
send INSERT INTO t1(title) VALUES('mysql database');
@@ -74,7 +74,7 @@ connection con1;
SET debug_dbug = '+d,fts_instrument_sync_debug';
-SET DEBUG_SYNC= 'fts_sync_end SIGNAL written WAIT_FOR inserted';
+SET DEBUG_SYNC= 'fts_write_node SIGNAL written WAIT_FOR inserted';
send INSERT INTO t1(title) VALUES('mysql database');
diff --git a/mysql-test/suite/innodb_fts/t/sync_block.test b/mysql-test/suite/innodb_fts/t/sync_block.test
new file mode 100644
index 00000000000..895d2ba8a59
--- /dev/null
+++ b/mysql-test/suite/innodb_fts/t/sync_block.test
@@ -0,0 +1,124 @@
+#
+# BUG#22516559 MYSQL INSTANCE STALLS WHEN SYNCING FTS INDEX
+#
+
+--source include/have_innodb.inc
+--source include/have_debug.inc
+--source include/have_debug_sync.inc
+--source include/have_log_bin.inc
+--source include/count_sessions.inc
+
+SET @old_log_output = @@global.log_output;
+SET @old_slow_query_log = @@global.slow_query_log;
+SET @old_general_log = @@global.general_log;
+SET @old_long_query_time = @@global.long_query_time;
+SET @old_debug = @@global.debug_dbug;
+
+SET GLOBAL log_output = 'TABLE';
+SET GLOBAL general_log = 1;
+SET GLOBAL slow_query_log = 1;
+SET GLOBAL long_query_time = 1;
+
+connect (con1,localhost,root,,);
+connect (con2,localhost,root,,);
+connection default;
+
+--echo # Case 1: Sync blocks DML(insert) on the same table.
+CREATE TABLE t1 (
+ FTS_DOC_ID BIGINT UNSIGNED AUTO_INCREMENT NOT NULL PRIMARY KEY,
+ title VARCHAR(200),
+ FULLTEXT(title)
+) ENGINE = InnoDB;
+
+connection con1;
+
+SET GLOBAL debug_dbug='+d,fts_instrument_sync_debug,fts_instrument_sync_sleep';
+
+SET DEBUG_SYNC= 'fts_sync_begin SIGNAL begin WAIT_FOR continue';
+
+send INSERT INTO t1(title) VALUES('mysql database');
+
+connection con2;
+
+SET DEBUG_SYNC= 'now WAIT_FOR begin';
+
+send SELECT * FROM t1 WHERE MATCH(title) AGAINST('mysql database');
+
+connection default;
+SET DEBUG_SYNC= 'now SIGNAL continue';
+
+connection con1;
+--echo /* connection con1 */ INSERT INTO t1(title) VALUES('mysql database');
+--reap
+
+connection con2;
+--echo /* conneciton con2 */ SELECT * FROM t1 WHERE MATCH(title) AGAINST('mysql database');
+--reap
+
+connection default;
+-- echo # make con1 & con2 show up in mysql.slow_log
+SELECT SLEEP(2);
+-- echo # slow log results should only contain INSERT INTO t1.
+SELECT sql_text FROM mysql.slow_log WHERE query_time >= '00:00:02';
+
+SET GLOBAL debug_dbug = @old_debug;
+TRUNCATE TABLE mysql.slow_log;
+
+DROP TABLE t1;
+
+--echo # Case 2: Sync blocks DML(insert) on other tables.
+CREATE TABLE t1 (
+ FTS_DOC_ID BIGINT UNSIGNED AUTO_INCREMENT NOT NULL PRIMARY KEY,
+ title VARCHAR(200),
+ FULLTEXT(title)
+) ENGINE = InnoDB;
+
+CREATE TABLE t2(id INT);
+
+connection con1;
+
+SET GLOBAL debug_dbug='+d,fts_instrument_sync_request,fts_instrument_sync_sleep';
+
+SET DEBUG_SYNC= 'fts_instrument_sync_request SIGNAL begin WAIT_FOR continue';
+
+send INSERT INTO t1(title) VALUES('mysql database');
+
+connection con2;
+
+SET DEBUG_SYNC= 'now WAIT_FOR begin';
+
+send INSERT INTO t2 VALUES(1);
+
+connection default;
+SET DEBUG_SYNC= 'now SIGNAL continue';
+
+connection con1;
+--echo /* connection con1 */ INSERT INTO t1(title) VALUES('mysql database');
+--reap
+
+connection con2;
+--echo /* conneciton con2 */ INSERT INTO t2 VALUES(1);
+--reap
+
+connection default;
+SET DEBUG_SYNC = 'RESET';
+-- echo # make con1 & con2 show up in mysql.slow_log
+SELECT SLEEP(2);
+-- echo # slow log results should be empty here.
+SELECT sql_text FROM mysql.slow_log WHERE query_time >= '00:00:02';
+
+SET GLOBAL debug_dbug = @old_debug;
+TRUNCATE TABLE mysql.slow_log;
+
+DROP TABLE t1,t2;
+
+disconnect con1;
+disconnect con2;
+
+--source include/wait_until_count_sessions.inc
+
+-- echo # Restore slow log settings.
+SET GLOBAL log_output = @old_log_output;
+SET GLOBAL general_log = @old_general_log;
+SET GLOBAL slow_query_log = @old_slow_query_log;
+SET GLOBAL long_query_time = @old_long_query_time;
diff --git a/mysql-test/suite/sys_vars/r/innodb_fulltext_bg_threads.result b/mysql-test/suite/sys_vars/r/innodb_fulltext_bg_threads.result
new file mode 100644
index 00000000000..ae8a0adf4bc
--- /dev/null
+++ b/mysql-test/suite/sys_vars/r/innodb_fulltext_bg_threads.result
@@ -0,0 +1,43 @@
+SET @start_global_value = @@global.innodb_fulltext_bg_threads;
+select @@global.innodb_fulltext_bg_threads;
+@@global.innodb_fulltext_bg_threads
+2
+select @@session.innodb_fulltext_bg_threads;
+ERROR HY000: Variable 'innodb_fulltext_bg_threads' is a GLOBAL variable
+show global variables like 'innodb_fulltext_bg_threads';
+Variable_name Value
+innodb_fulltext_bg_threads 2
+show session variables like 'innodb_fulltext_bg_threads';
+Variable_name Value
+innodb_fulltext_bg_threads 2
+select * from information_schema.global_variables
+where variable_name='innodb_fulltext_bg_threads';
+VARIABLE_NAME VARIABLE_VALUE
+INNODB_FULLTEXT_BG_THREADS 2
+select * from information_schema.session_variables
+where variable_name='innodb_fulltext_bg_threads';
+VARIABLE_NAME VARIABLE_VALUE
+INNODB_FULLTEXT_BG_THREADS 2
+set global innodb_fulltext_bg_threads=5;
+select @@global.innodb_fulltext_bg_threads;
+@@global.innodb_fulltext_bg_threads
+5
+set global innodb_fulltext_bg_threads=2;
+select @@global.innodb_fulltext_bg_threads;
+@@global.innodb_fulltext_bg_threads
+2
+set session innodb_fulltext_bg_threads=1;
+ERROR HY000: Variable 'innodb_fulltext_bg_threads' is a GLOBAL variable and should be set with SET GLOBAL
+set global innodb_fulltext_bg_threads=1.1;
+ERROR 42000: Incorrect argument type to variable 'innodb_fulltext_bg_threads'
+set global innodb_fulltext_bg_threads=1e1;
+ERROR 42000: Incorrect argument type to variable 'innodb_fulltext_bg_threads'
+set global innodb_fulltext_bg_threads="foo";
+ERROR 42000: Incorrect argument type to variable 'innodb_fulltext_bg_threads'
+set global innodb_fulltext_bg_threads=0;
+Warnings:
+Warning 1292 Truncated incorrect innodb_fulltext_bg_threads value: '0'
+select @@global.innodb_fulltext_bg_threads;
+@@global.innodb_fulltext_bg_threads
+1
+SET @@global.innodb_fulltext_bg_threads = @start_global_value;
diff --git a/mysql-test/suite/sys_vars/r/sysvars_innodb.result b/mysql-test/suite/sys_vars/r/sysvars_innodb.result
index e07725abbeb..98e975d36de 100644
--- a/mysql-test/suite/sys_vars/r/sysvars_innodb.result
+++ b/mysql-test/suite/sys_vars/r/sysvars_innodb.result
@@ -907,6 +907,18 @@ NUMERIC_BLOCK_SIZE NULL
ENUM_VALUE_LIST NULL
READ_ONLY NO
COMMAND_LINE_ARGUMENT OPTIONAL
+VARIABLE_NAME INNODB_FULLTEXT_BG_THREADS
+SESSION_VALUE NULL
+DEFAULT_VALUE 2
+VARIABLE_SCOPE GLOBAL
+VARIABLE_TYPE INT UNSIGNED
+VARIABLE_COMMENT Number of threads performing background fulltext message process and optimization
+NUMERIC_MIN_VALUE 1
+NUMERIC_MAX_VALUE 255
+NUMERIC_BLOCK_SIZE 0
+ENUM_VALUE_LIST NULL
+READ_ONLY NO
+COMMAND_LINE_ARGUMENT REQUIRED
VARIABLE_NAME INNODB_IMMEDIATE_SCRUB_DATA_UNCOMPRESSED
SESSION_VALUE NULL
DEFAULT_VALUE OFF
diff --git a/mysql-test/suite/sys_vars/t/innodb_fulltext_bg_threads.test b/mysql-test/suite/sys_vars/t/innodb_fulltext_bg_threads.test
new file mode 100644
index 00000000000..bf6576514b6
--- /dev/null
+++ b/mysql-test/suite/sys_vars/t/innodb_fulltext_bg_threads.test
@@ -0,0 +1,45 @@
+# Variable name: innodb_fulltext_bg_threads
+# Scope: Global
+# Access type: Dynamic
+# Data type: numeric
+
+--source include/have_innodb.inc
+
+SET @start_global_value = @@global.innodb_fulltext_bg_threads;
+
+#
+# exists as global only
+#
+select @@global.innodb_fulltext_bg_threads;
+--error ER_INCORRECT_GLOBAL_LOCAL_VAR
+select @@session.innodb_fulltext_bg_threads;
+show global variables like 'innodb_fulltext_bg_threads';
+show session variables like 'innodb_fulltext_bg_threads';
+select * from information_schema.global_variables
+where variable_name='innodb_fulltext_bg_threads';
+select * from information_schema.session_variables
+where variable_name='innodb_fulltext_bg_threads';
+
+#
+# show that it's writable
+#
+set global innodb_fulltext_bg_threads=5;
+select @@global.innodb_fulltext_bg_threads;
+set global innodb_fulltext_bg_threads=2;
+select @@global.innodb_fulltext_bg_threads;
+--error ER_GLOBAL_VARIABLE
+set session innodb_fulltext_bg_threads=1;
+
+#
+# incorrect types
+#
+--error ER_WRONG_TYPE_FOR_VAR
+set global innodb_fulltext_bg_threads=1.1;
+--error ER_WRONG_TYPE_FOR_VAR
+set global innodb_fulltext_bg_threads=1e1;
+--error ER_WRONG_TYPE_FOR_VAR
+set global innodb_fulltext_bg_threads="foo";
+set global innodb_fulltext_bg_threads=0;
+select @@global.innodb_fulltext_bg_threads;
+
+SET @@global.innodb_fulltext_bg_threads = @start_global_value;
diff --git a/storage/innobase/fts/fts0fts.cc b/storage/innobase/fts/fts0fts.cc
index eed2eb72cd1..6ef53af9bf7 100644
--- a/storage/innobase/fts/fts0fts.cc
+++ b/storage/innobase/fts/fts0fts.cc
@@ -40,18 +40,22 @@ Full Text Search interface
/** The SYNC state of the cache. There is one instance of this struct
associated with each ADD thread. */
-struct fts_sync_t {
- /** Transaction used for SYNCing the cache to disk */
- trx_t *trx;
- /** Table with FTS index(es) */
- dict_table_t *table;
- /** Max size in bytes of the cache */
- ulint max_cache_size;
- /** The doc id at which the cache was noted as being
- full, we use this to set the upper_limit field */
- doc_id_t max_doc_id;
- /** SYNC start time; only used if fts_enable_diag_print */
- time_t start_time;
+struct fts_sync_t
+{
+ /** Transaction used for SYNCing the cache to disk */
+ trx_t *trx;
+ /** Table with FTS index(es) */
+ dict_table_t *table;
+ /** The doc id at which the cache was noted as being
+ full, we use this to set the upper_limit field */
+ doc_id_t max_doc_id;
+ /** Flag to indicate whether sync is in progress */
+ bool in_progress;
+ /** Flag whether unlock the cache when writing fts node */
+ bool unlock_cache;
+ /** condition variable for in_progress;
+ used with table->fts->cache->lock */
+ pthread_cond_t cond;
};
static const ulint FTS_MAX_ID_LEN = 32;
@@ -201,8 +205,15 @@ struct fts_tokenize_param_t {
/** Run SYNC on the table, i.e., write out data from the cache to the
FTS auxiliary INDEX table and clear the cache at the end.
@param[in,out] sync sync state
+@param[in] unlock_cache whether unlock cache lock when write node
+@param[in] wait whether wait when a sync is in progress
@return DB_SUCCESS if all OK */
-static dberr_t fts_sync(fts_sync_t *sync);
+static
+dberr_t
+fts_sync(
+ fts_sync_t* sync,
+ bool unlock_cache,
+ bool wait);
/****************************************************************//**
Release all resources help by the words rb tree e.g., the node ilist. */
@@ -275,6 +286,7 @@ fts_cache_destroy(fts_cache_t* cache)
mysql_mutex_destroy(&cache->init_lock);
mysql_mutex_destroy(&cache->deleted_lock);
mysql_mutex_destroy(&cache->doc_id_lock);
+ pthread_cond_destroy(&cache->sync->cond);
if (cache->stopword_info.cached_stopword) {
rbt_free(cache->stopword_info.cached_stopword);
@@ -643,6 +655,7 @@ fts_cache_create(
mem_heap_zalloc(heap, sizeof(fts_sync_t)));
cache->sync->table = table;
+ pthread_cond_init(&cache->sync->cond, nullptr);
/* Create the index cache vector that will hold the inverted indexes. */
cache->indexes = ib_vector_create(
@@ -1333,7 +1346,8 @@ fts_cache_add_doc(
ib_vector_last(word->nodes));
}
- if (!fts_node || fts_node->ilist_size > FTS_ILIST_MAX_SIZE
+ if (fts_node == NULL || fts_node->synced
+ || fts_node->ilist_size > FTS_ILIST_MAX_SIZE
|| doc_id < fts_node->last_doc_id) {
fts_node = static_cast<fts_node_t*>(
@@ -3320,7 +3334,7 @@ fts_add_doc_from_tuple(
if (cache->total_size > fts_max_cache_size / 5
|| fts_need_sync) {
- fts_sync(cache->sync);
+ fts_sync(cache->sync, true, false);
}
mtr_start(&mtr);
@@ -3356,7 +3370,6 @@ fts_add_doc_by_id(
dict_index_t* fts_id_index;
ibool is_id_cluster;
fts_cache_t* cache = ftt->table->fts->cache;
- bool need_sync= false;
ut_ad(cache->get_docs);
/* If Doc ID has been supplied by the user, then the table
@@ -3496,32 +3509,44 @@ fts_add_doc_by_id(
get_doc->index_cache,
doc_id, doc.tokens);
- /** FTS cache sync should happen
- frequently. Because user thread
- shouldn't hold the cache lock for
- longer time. So cache should sync
- whenever cache size exceeds 512 KB */
- need_sync =
- cache->total_size > 512*1024;
+ bool need_sync = !cache->sync->in_progress
+ && (fts_need_sync
+ || (cache->total_size
+ - cache->total_size_at_sync)
+ > fts_max_cache_size / 10);
+ if (need_sync) {
+ cache->total_size_at_sync =
+ cache->total_size;
+ }
mysql_mutex_unlock(&table->fts->cache->lock);
DBUG_EXECUTE_IF(
"fts_instrument_sync",
- fts_sync_table(table);
+ fts_optimize_request_sync_table(table);
+ mysql_mutex_lock(&cache->lock);
+ if (cache->sync->in_progress)
+ my_cond_wait(
+ &cache->sync->cond,
+ &cache->lock.m_mutex);
+ mysql_mutex_unlock(&cache->lock);
);
DBUG_EXECUTE_IF(
"fts_instrument_sync_debug",
- fts_sync(cache->sync);
+ fts_sync(cache->sync, true, true);
);
DEBUG_SYNC_C("fts_instrument_sync_request");
DBUG_EXECUTE_IF(
"fts_instrument_sync_request",
- need_sync= true;
+ fts_optimize_request_sync_table(table);
);
+ if (need_sync) {
+ fts_optimize_request_sync_table(table);
+ }
+
mtr_start(&mtr);
if (i < num_idx - 1) {
@@ -3547,10 +3572,6 @@ func_exit:
ut_free(pcur.old_rec_buf);
mem_heap_free(heap);
-
- if (need_sync) {
- fts_sync_table(table);
- }
}
@@ -3910,7 +3931,8 @@ static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
fts_sync_write_words(
trx_t* trx,
- fts_index_cache_t* index_cache)
+ fts_index_cache_t* index_cache,
+ bool unlock_cache)
{
fts_table_t fts_table;
ulint n_nodes = 0;
@@ -3952,6 +3974,17 @@ fts_sync_write_words(
fts_node_t* fts_node = static_cast<fts_node_t*>(
ib_vector_get(word->nodes, i));
+ if (fts_node->synced) {
+ continue;
+ } else {
+ fts_node->synced = true;
+ }
+
+ if (unlock_cache) {
+ mysql_mutex_unlock(
+ &table->fts->cache->lock);
+ }
+
error = fts_write_node(
trx, &index_cache->ins_graph[selected],
&fts_table, &word->text, fts_node);
@@ -3964,6 +3997,11 @@ fts_sync_write_words(
std::this_thread::sleep_for(
std::chrono::seconds(1)););
+ if (unlock_cache) {
+ mysql_mutex_lock(
+ &table->fts->cache->lock);
+ }
+
if (error != DB_SUCCESS) {
goto err_exit;
}
@@ -4001,8 +4039,6 @@ fts_sync_begin(
n_nodes = 0;
elapsed_time = 0;
- sync->start_time = time(NULL);
-
sync->trx = trx_create();
trx_start_internal(sync->trx);
@@ -4035,7 +4071,36 @@ fts_sync_index(
ut_ad(rbt_validate(index_cache->words));
- return(fts_sync_write_words(trx, index_cache));
+ return(fts_sync_write_words(trx, index_cache, sync->unlock_cache));
+}
+
+/** Check if index cache has been synced completely
+@param[in,out] index_cache index cache
+@return true if index is synced, otherwise false. */
+static bool fts_sync_index_check(fts_index_cache_t *index_cache)
+{
+ for (const ib_rbt_node_t *rbt_node = rbt_first(index_cache->words);
+ rbt_node != NULL;
+ rbt_node = rbt_next(index_cache->words, rbt_node))
+ {
+ fts_tokenizer_word_t *word= rbt_value(fts_tokenizer_word_t, rbt_node);
+ if (!((fts_node_t*)ib_vector_last(word->nodes))->synced)
+ return false;
+ }
+ return true;
+}
+
+/** Reset synced flag in index cache when rollback
+@param[in,out] index_cache index cache */
+static void fts_sync_index_reset(fts_index_cache_t *index_cache)
+{
+ for (const ib_rbt_node_t *rbt_node = rbt_first(index_cache->words);
+ rbt_node != NULL;
+ rbt_node = rbt_next(index_cache->words, rbt_node))
+ {
+ fts_tokenizer_word_t *word= rbt_value(fts_tokenizer_word_t, rbt_node);
+ ((fts_node_t*)ib_vector_last(word->nodes))->synced= false;
+ }
}
/** Rollback a sync operation
@@ -4055,6 +4120,10 @@ fts_sync_rollback(
index_cache = static_cast<fts_index_cache_t*>(
ib_vector_get(cache->indexes, i));
+ /* Reset synced flag so nodes will not be skipped
+ in the next sync, see fts_sync_write_words(). */
+ fts_sync_index_reset(index_cache);
+
for (j = 0; fts_index_selector[j].value; ++j) {
if (index_cache->ins_graph[j] != NULL) {
@@ -4121,16 +4190,6 @@ fts_sync_commit(
return error;
}
- if (UNIV_UNLIKELY(fts_enable_diag_print) && elapsed_time) {
- ib::info() << "SYNC for table " << sync->table->name
- << ": SYNC time: "
- << (time(NULL) - sync->start_time)
- << " secs: elapsed "
- << static_cast<double>(n_nodes)
- / static_cast<double>(elapsed_time)
- << " ins/sec";
- }
-
/* Avoid assertion in trx_t::free(). */
trx->dict_operation_lock_mode = false;
trx->free();
@@ -4144,7 +4203,12 @@ FTS auxiliary INDEX table and clear the cache at the end.
@param[in] unlock_cache whether unlock cache lock when write node
@param[in] wait whether wait when a sync is in progress
@return DB_SUCCESS if all OK */
-static dberr_t fts_sync(fts_sync_t *sync)
+static
+dberr_t
+fts_sync(
+ fts_sync_t* sync,
+ bool unlock_cache,
+ bool wait)
{
if (srv_read_only_mode) {
return DB_READ_ONLY;
@@ -4155,13 +4219,31 @@ static dberr_t fts_sync(fts_sync_t *sync)
fts_cache_t* cache = sync->table->fts->cache;
mysql_mutex_lock(&cache->lock);
+
+ /* Check if cache is being synced.
+ Note: we release cache lock in fts_sync_write_words() to
+ avoid long wait for the lock by other threads. */
+ while (sync->in_progress) {
+ if (!wait) {
+ mysql_mutex_unlock(&cache->lock);
+ return(DB_SUCCESS);
+ }
+ my_cond_wait(&sync->cond, &cache->lock.m_mutex);
+ }
+
+ sync->unlock_cache = unlock_cache;
+ sync->in_progress = true;
+
DEBUG_SYNC_C("fts_sync_begin");
fts_sync_begin(sync);
-
+ time_t start_time= time(NULL);
+begin_sync:
const size_t fts_cache_size= fts_max_cache_size;
if (cache->total_size > fts_cache_size) {
/* Avoid the case: sync never finish when
insert/update keeps comming. */
+ ut_ad(sync->unlock_cache);
+ sync->unlock_cache = false;
ib::warn() << "Total InnoDB FTS size "
<< cache->total_size << " for the table "
<< cache->sync->table->name
@@ -4185,23 +4267,53 @@ static dberr_t fts_sync(fts_sync_t *sync)
error = fts_sync_index(sync, index_cache);
if (error != DB_SUCCESS) {
- goto err_exit;
+ goto end_sync;
}
}
DBUG_EXECUTE_IF("fts_instrument_sync_interrupted",
error = DB_INTERRUPTED;
- goto err_exit;
+ goto end_sync;
);
+ /* Make sure all the caches are synced. */
+ for (i = 0; i < ib_vector_size(cache->indexes); ++i) {
+ fts_index_cache_t* index_cache;
+
+ index_cache = static_cast<fts_index_cache_t*>(
+ ib_vector_get(cache->indexes, i));
+
+ if (index_cache->index->to_be_dropped
+ || fts_sync_index_check(index_cache)) {
+ continue;
+ }
+
+ goto begin_sync;
+ }
+
+end_sync:
if (error == DB_SUCCESS) {
error = fts_sync_commit(sync);
+ if (UNIV_UNLIKELY(fts_enable_diag_print)
+ && elapsed_time) {
+ ib::info() << "SYNC for table "
+ << sync->table->name << ": SYNC time: "
+ << (time(NULL) - start_time)
+ << " secs: elapsed "
+ << static_cast<double>(n_nodes)
+ / static_cast<double>(elapsed_time)
+ << " ins/sec";
+ }
} else {
-err_exit:
fts_sync_rollback(sync);
- return error;
}
+ mysql_mutex_lock(&cache->lock);
+ ut_ad(sync->in_progress);
+ sync->in_progress = false;
+ pthread_cond_broadcast(&sync->cond);
+ mysql_mutex_unlock(&cache->lock);
+
/* We need to check whether an optimize is required, for that
we make copies of the two variables that control the trigger. These
variables can change behind our back and we don't want to hold the
@@ -4213,7 +4325,6 @@ err_exit:
mysql_mutex_unlock(&cache->deleted_lock);
- DEBUG_SYNC_C("fts_sync_end");
return(error);
}
@@ -4222,12 +4333,12 @@ FTS auxiliary INDEX table and clear the cache at the end.
@param[in,out] table fts table
@param[in] wait whether wait for existing sync to finish
@return DB_SUCCESS on success, error code on failure. */
-dberr_t fts_sync_table(dict_table_t* table)
+dberr_t fts_sync_table(dict_table_t* table, bool wait)
{
ut_ad(table->fts);
return table->space && !table->corrupted && table->fts->cache
- ? fts_sync(table->fts->cache->sync)
+ ? fts_sync(table->fts->cache->sync, !wait, wait)
: DB_SUCCESS;
}
@@ -5138,8 +5249,9 @@ fts_t::fts_t(
added_synced(0), dict_locked(0),
add_wq(NULL),
cache(NULL),
- doc_col(ULINT_UNDEFINED), in_queue(false), sync_message(false),
- fts_heap(heap)
+ doc_col(ULINT_UNDEFINED),
+ wait_in_queue(false), sync_message(false),
+ in_queue(false), in_process(false), fts_heap(heap)
{
ut_a(table->fts == NULL);
@@ -5148,6 +5260,7 @@ fts_t::fts_t(
indexes = ib_vector_create(heap_alloc, sizeof(dict_index_t*), 4);
dict_table_get_all_fts_indexes(table, indexes);
+ pthread_cond_init(&fts_queue_cond, nullptr);
}
/** fts_t destructor. */
@@ -5160,6 +5273,7 @@ fts_t::~fts_t()
fts_cache_destroy(cache);
}
+ pthread_cond_destroy(&fts_queue_cond);
/* There is no need to call ib_vector_free() on this->indexes
because it is stored in this->fts_heap. */
mem_heap_free(fts_heap);
diff --git a/storage/innobase/fts/fts0opt.cc b/storage/innobase/fts/fts0opt.cc
index 7c40a25e6e7..e65ef841f31 100644
--- a/storage/innobase/fts/fts0opt.cc
+++ b/storage/innobase/fts/fts0opt.cc
@@ -45,20 +45,86 @@ extern Atomic_relaxed<bool> wsrep_sst_disable_writes;
constexpr bool wsrep_sst_disable_writes= false;
#endif
+uint srv_n_fts_threads;
+
+/** Count the number of fts threads started */
+static uint srv_n_fts_threads_started;
+
/** The FTS optimize thread's work queue. */
ib_wqueue_t* fts_optimize_wq;
-static void fts_optimize_callback(void *);
-static void timer_callback(void*);
-static tpool::timer* timer;
-static tpool::task_group task_group(1);
-static tpool::task task(fts_optimize_callback,0, &task_group);
+/** Indicate whether the background fts thread was initalized */
+static bool fts_bg_threads_inited= false;
+
+/** Condition valriable for srv_n_fts_threads_started */
+static pthread_cond_t fts_start_cond;
+
+/** Condition variable to signal fts background threads */
+static pthread_cond_t fts_wait_cond;
/** FTS optimize thread, for MDL acquisition */
static THD *fts_opt_thd;
-/** The FTS vector to store fts_slot_t */
-static ib_vector_t* fts_slots;
+struct fts_msg_del_t;
+
+struct fts_slot_t;
+
+/** Slots to store the fts table which can be used by
+fulltext background thread for optimization */
+struct fts_slots_t
+{
+ ib_vector_t *fts_slots;
+ mysql_mutex_t mutex;
+ ulint n_optimize_count;
+
+ void init(ib_alloc_t *heap_alloc);
+
+ void free()
+ {
+ mysql_mutex_destroy(&mutex);
+ ib_vector_free(fts_slots);
+ fts_slots= nullptr;
+ }
+
+ ulint get_n_optimize()
+ {
+ mysql_mutex_lock(&mutex);
+ ulint n_count= n_optimize_count;
+ mysql_mutex_unlock(&mutex);
+ return n_count;
+ }
+
+ /** Add new table to the fts slots */
+ void add_new_table(dict_table_t *table);
+
+ /** Delete the table from fts slots */
+ void delete_table(fts_msg_del_t *remove);
+
+ /** Update the tables which need optimization */
+ void update_need_sync();
+
+ /** Whether all table fts cache memory exceeds the threshold */
+ bool is_sync_need();
+
+ ulint get_n_tables()
+ {
+ mysql_mutex_lock(&mutex);
+ ulint n_tables= ib_vector_size(fts_slots);
+ mysql_mutex_unlock(&mutex);
+ return n_tables;
+ }
+
+ /** Get the table which is ready for optimization */
+ dict_table_t* get_optimize_table();
+
+ /** Update the slot which has optimization was done */
+ void update_slot(dict_table_t *table, bool threshold, dberr_t *err);
+
+ /** Iterate all table in fts slots during shutdown */
+ dict_table_t* get_all_tables_during_exit();
+};
+
+static fts_slots_t fts_opt_slots;
/** Default optimize interval in secs. */
static const ulint FTS_OPTIMIZE_INTERVAL_IN_SECS = 300;
@@ -83,8 +149,9 @@ enum fts_msg_type_t {
FTS_MSG_ADD_TABLE, /*!< Add table to the optimize thread's
work queue */
- FTS_MSG_DEL_TABLE /*!< Remove a table from the optimize
+ FTS_MSG_DEL_TABLE, /*!< Remove a table from the optimize
threads work queue */
+ FTS_MSG_SYNC_TABLE /*!< Sync fts cache of a table */
};
/** Compressed list of words that have been read from FTS INDEX
@@ -259,6 +326,217 @@ static const char* fts_end_delete_sql =
"DELETE FROM $BEING_DELETED;\n"
"DELETE FROM $BEING_DELETED_CACHE;\n";
+void fts_slots_t::init(ib_alloc_t *heap_alloc)
+{
+ fts_slots= ib_vector_create(heap_alloc, sizeof(fts_slot_t), 4);
+ mysql_mutex_init(0, &mutex, nullptr);
+ n_optimize_count= 0;
+}
+
+void fts_slots_t::add_new_table(dict_table_t *table)
+{
+ fts_slot_t* slot;
+ fts_slot_t* empty= NULL;
+ mysql_mutex_lock(&mutex);
+ /* Search for duplicates, also find a free slot if one exists. */
+ for (ulint i= 0; i < ib_vector_size(fts_slots); ++i)
+ {
+ slot= static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i));
+ if (!slot->table)
+ empty= slot;
+ else if (slot->table == table)
+ {
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
+ table->fts->in_queue= true;
+ table->fts->wait_in_queue= false;
+ pthread_cond_signal(&table->fts->fts_queue_cond);
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
+ mysql_mutex_unlock(&mutex);
+ return;
+ }
+ }
+
+ slot= empty ? empty : static_cast<fts_slot_t*>(ib_vector_push(
+ fts_slots, NULL));
+ memset(slot, 0x0, sizeof(*slot));
+ slot->table = table;
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
+ table->fts->wait_in_queue= false;
+ table->fts->in_queue= true;
+ pthread_cond_signal(&fts_wait_cond);
+ pthread_cond_signal(&table->fts->fts_queue_cond);
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
+ mysql_mutex_unlock(&mutex);
+ return;
+}
+
+void fts_slots_t::delete_table(fts_msg_del_t *remove)
+{
+ const dict_table_t* table= remove->table;
+ ut_ad(table);
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
+ /* Make sure that InnoDB table was added in fts_slots */
+ while (!table->fts->in_queue || table->fts->in_process)
+ my_cond_wait(&table->fts->fts_queue_cond,
+ &fts_optimize_wq->mutex.m_mutex);
+
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
+ mysql_mutex_lock(&mutex);
+ for (ulint i = 0; i < ib_vector_size(fts_slots); ++i)
+ {
+ fts_slot_t *slot= static_cast<fts_slot_t*>(
+ ib_vector_get(fts_slots, i));
+ if (slot->table == table)
+ {
+ if (UNIV_UNLIKELY(fts_enable_diag_print))
+ ib::info() << "FTS Optimize Removing table " << table->name;
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
+ table->fts->in_queue= false;
+ pthread_cond_signal(remove->cond);
+ slot->table= nullptr;
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
+ mysql_mutex_unlock(&mutex);
+ return;
+ }
+ }
+
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
+ pthread_cond_signal(remove->cond);
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
+ mysql_mutex_unlock(&mutex);
+}
+
+void fts_slots_t::update_need_sync()
+{
+ ulint n_tables = 0;
+ mysql_mutex_lock(&mutex);
+ const time_t current_time = time(NULL);
+ for (ulint i = 0; i < ib_vector_size(fts_slots); ++i)
+ {
+ const fts_slot_t* slot = static_cast<const fts_slot_t*>(
+ ib_vector_get_const(fts_slots, i));
+ if (!slot->table)
+ continue;
+ const time_t end = slot->running ? slot->last_run : slot->completed;
+ ulint interval = ulint(current_time - end);
+
+ if (lint(interval) < 0
+ || interval >= FTS_OPTIMIZE_INTERVAL_IN_SECS)
+ ++n_tables;
+ }
+
+ n_optimize_count= n_tables;
+ mysql_mutex_unlock(&mutex);
+}
+
+bool fts_slots_t::is_sync_need()
+{
+ ulint total_memory= 0;
+ mysql_mutex_lock(&mutex);
+ for (ulint i = 0; i < ib_vector_size(fts_slots); ++i)
+ {
+ const fts_slot_t* slot = static_cast<const fts_slot_t*>(
+ ib_vector_get_const(fts_slots, i));
+
+ if (!slot->table) continue;
+ if (slot->table->fts && slot->table->fts->cache)
+ total_memory += slot->table->fts->cache->total_size;
+
+ if (total_memory > fts_max_total_cache_size)
+ {
+ mysql_mutex_unlock(&mutex);
+ return true;
+ }
+ }
+ mysql_mutex_unlock(&mutex);
+ return false;
+}
+
+dict_table_t* fts_slots_t::get_optimize_table()
+{
+ static ulint current= 0;
+ fts_slot_t* slot;
+ mysql_mutex_lock(&mutex);
+read_slot:
+ if (current >= ib_vector_size(fts_slots))
+ {
+ current= 0;
+ mysql_mutex_unlock(&mutex);
+ return nullptr;
+ }
+
+ slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, current));
+
+ /* Handle the case of empty slots. */
+ if (slot->table)
+ {
+ slot->running = true;
+ const time_t now = time(NULL);
+ const ulint interval = ulint(now - slot->last_run);
+ /* Avoid optimizing tables that were optimized recently. */
+ if (slot->last_run > 0 && lint(interval) >= 0
+ && interval < FTS_OPTIMIZE_INTERVAL_IN_SECS)
+ {
+ current++;
+ goto read_slot;
+ }
+ }
+ dict_table_t *table= slot->table;
+ /* Update in_process */
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
+ table->fts->in_process= true;
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
+ mysql_mutex_unlock(&mutex);
+ return table;
+}
+
+void fts_slots_t::update_slot(
+ dict_table_t *table, bool threshold, dberr_t *err)
+{
+ mysql_mutex_lock(&mutex);
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
+ table->fts->in_process= false;
+ pthread_cond_broadcast(&table->fts->fts_queue_cond);
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
+ for (ulint i = 0; i < ib_vector_size(fts_slots); ++i)
+ {
+ fts_slot_t* slot = static_cast<fts_slot_t*>(
+ ib_vector_get(fts_slots, i));
+ if (slot->table == table)
+ {
+ if (threshold)
+ {
+ slot->last_run= time(NULL);
+ if (*err == DB_SUCCESS)
+ {
+ slot->running= false;
+ slot->completed= slot->last_run;
+ }
+ } else slot->last_run= time(NULL);
+ }
+ }
+ mysql_mutex_unlock(&mutex);
+}
+
+dict_table_t* fts_slots_t::get_all_tables_during_exit()
+{
+ static ulint i= 0;
+ fts_slot_t* slot;
+ mysql_mutex_lock(&mutex);
+read_again:
+ if (i >= ib_vector_size(fts_slots))
+ {
+ mysql_mutex_unlock(&mutex);
+ return nullptr;
+ }
+ slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i++));
+ /* Handle the case of empty slots. */
+ if (!slot->table) { goto read_again; }
+ dict_table_t *table= slot->table;
+ mysql_mutex_unlock(&mutex);
+ return table;
+}
+
/**********************************************************************//**
Initialize fts_zip_t. */
static
@@ -2388,41 +2666,15 @@ Run OPTIMIZE on the given table by a background thread.
@return DB_SUCCESS if all OK */
static MY_ATTRIBUTE((nonnull))
dberr_t
-fts_optimize_table_bk(
-/*==================*/
- fts_slot_t* slot) /*!< in: table to optimiza */
+fts_optimize_table_bk(dict_table_t *table, bool &threshold)
{
- const time_t now = time(NULL);
- const ulint interval = ulint(now - slot->last_run);
-
- /* Avoid optimizing tables that were optimized recently. */
- if (slot->last_run > 0
- && lint(interval) >= 0
- && interval < FTS_OPTIMIZE_INTERVAL_IN_SECS) {
-
- return(DB_SUCCESS);
- }
-
- dict_table_t* table = slot->table;
- dberr_t error;
-
+ dberr_t error = DB_SUCCESS;
if (table->is_accessible()
&& table->fts && table->fts->cache
&& table->fts->cache->deleted >= FTS_OPTIMIZE_THRESHOLD) {
error = fts_optimize_table(table);
-
- slot->last_run = time(NULL);
-
- if (error == DB_SUCCESS) {
- slot->running = false;
- slot->completed = slot->last_run;
- }
- } else {
- /* Note time this run completed. */
- slot->last_run = now;
- error = DB_SUCCESS;
+ threshold= true;
}
-
return(error);
}
/*********************************************************************//**
@@ -2550,16 +2802,8 @@ fts_optimize_create_msg(
static void add_msg(fts_msg_t *msg)
{
ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);
- srv_thread_pool->submit_task(&task);
-}
-
-/**
-Called by "idle" timer. Submits optimize task, which
-will only recalculate is_sync_needed, in case the queue is empty.
-*/
-static void timer_callback(void*)
-{
- srv_thread_pool->submit_task(&task);
+ /* Signal all the fts threads */
+ pthread_cond_signal(&fts_wait_cond);
}
/** Add the table to add to the OPTIMIZER's list.
@@ -2579,9 +2823,15 @@ void fts_optimize_add_table(dict_table_t* table)
mysql_mutex_lock(&fts_optimize_wq->mutex);
+ if (table->fts->in_queue || table->fts->wait_in_queue) {
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
+ mem_heap_free(msg->heap);
+ return;
+ }
+
add_msg(msg);
- table->fts->in_queue = true;
+ table->fts->wait_in_queue = true;
mysql_mutex_unlock(&fts_optimize_wq->mutex);
}
@@ -2608,7 +2858,7 @@ fts_optimize_remove_table(
mysql_mutex_lock(&fts_optimize_wq->mutex);
- if (table->fts->in_queue)
+ if (table->fts->wait_in_queue || table->fts->in_queue)
{
fts_msg_t *msg= fts_optimize_create_msg(FTS_MSG_DEL_TABLE, nullptr);
pthread_cond_t cond;
@@ -2624,130 +2874,34 @@ fts_optimize_remove_table(
mysql_mutex_unlock(&fts_optimize_wq->mutex);
}
-/** Add a table to fts_slots if it doesn't already exist. */
-static bool fts_optimize_new_table(dict_table_t* table)
-{
- ut_ad(table);
-
- ulint i;
- fts_slot_t* slot;
- fts_slot_t* empty = NULL;
-
- /* Search for duplicates, also find a free slot if one exists. */
- for (i = 0; i < ib_vector_size(fts_slots); ++i) {
-
- slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i));
-
- if (!slot->table) {
- empty = slot;
- } else if (slot->table == table) {
- /* Already exists in our optimize queue. */
- return false;
- }
- }
-
- slot = empty ? empty : static_cast<fts_slot_t*>(
- ib_vector_push(fts_slots, NULL));
-
- memset(slot, 0x0, sizeof(*slot));
-
- slot->table = table;
- return true;
-}
-
-/** Remove a table from fts_slots if it exists.
-@param remove table to be removed from fts_slots */
-static bool fts_optimize_del_table(fts_msg_del_t *remove)
+/** Send sync fts cache for the table.
+@param[in] table table to sync */
+void
+fts_optimize_request_sync_table(
+ dict_table_t* table)
{
- const dict_table_t* table = remove->table;
- ut_ad(table);
- for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
- fts_slot_t* slot;
-
- slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i));
-
- if (slot->table == table) {
- if (UNIV_UNLIKELY(fts_enable_diag_print)) {
- ib::info() << "FTS Optimize Removing table "
- << table->name;
- }
-
- mysql_mutex_lock(&fts_optimize_wq->mutex);
- table->fts->in_queue = false;
- pthread_cond_signal(remove->cond);
- mysql_mutex_unlock(&fts_optimize_wq->mutex);
- slot->table = NULL;
- return true;
- }
+ /* if the optimize system not yet initialized, return */
+ if (!fts_optimize_wq) {
+ return;
}
mysql_mutex_lock(&fts_optimize_wq->mutex);
- pthread_cond_signal(remove->cond);
- mysql_mutex_unlock(&fts_optimize_wq->mutex);
- return false;
-}
-
-/**********************************************************************//**
-Calculate how many tables in fts_slots need to be optimized.
-@return no. of tables to optimize */
-static ulint fts_optimize_how_many()
-{
- ulint n_tables = 0;
- const time_t current_time = time(NULL);
-
- for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
- const fts_slot_t* slot = static_cast<const fts_slot_t*>(
- ib_vector_get_const(fts_slots, i));
- if (!slot->table) {
- continue;
- }
-
- const time_t end = slot->running
- ? slot->last_run : slot->completed;
- ulint interval = ulint(current_time - end);
-
- if (lint(interval) < 0
- || interval >= FTS_OPTIMIZE_INTERVAL_IN_SECS) {
- ++n_tables;
- }
- }
-
- return(n_tables);
-}
-
-/**********************************************************************//**
-Check if the total memory used by all FTS table exceeds the maximum limit.
-@return true if a sync is needed, false otherwise */
-static bool fts_is_sync_needed()
-{
- ulint total_memory = 0;
- const time_t now = time(NULL);
- double time_diff = difftime(now, last_check_sync_time);
-
- if (fts_need_sync || (time_diff >= 0 && time_diff < 5)) {
- return(false);
- }
-
- last_check_sync_time = now;
-
- for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
- const fts_slot_t* slot = static_cast<const fts_slot_t*>(
- ib_vector_get_const(fts_slots, i));
-
- if (!slot->table) {
- continue;
- }
-
- if (slot->table->fts && slot->table->fts->cache) {
- total_memory += slot->table->fts->cache->total_size;
- }
- if (total_memory > fts_max_total_cache_size) {
- return(true);
- }
+ /* FTS optimizer thread is already exited */
+ if (fts_opt_start_shutdown) {
+ ib::info() << "Try to sync table " << table->name
+ << " after FTS optimize thread exiting.";
+ } else if (table->fts->sync_message) {
+ /* If the table already has SYNC message in
+ fts_optimize_wq queue then ignore it */
+ } else {
+ add_msg(fts_optimize_create_msg(FTS_MSG_SYNC_TABLE, table));
+ table->fts->sync_message = true;
+ DBUG_EXECUTE_IF("fts_optimize_wq_count_check",
+ DBUG_ASSERT(fts_optimize_wq->length <= 1000););
}
- return(false);
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
}
/** Sync fts cache of a table
@@ -2765,12 +2919,25 @@ static void fts_optimize_sync_table(dict_table_t *table,
if (sync_table->fts && sync_table->fts->cache && sync_table->is_accessible())
{
- fts_sync_table(sync_table);
-
+ if (process_message)
+ {
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
+ if (!sync_table->fts->in_process)
+ sync_table->fts->in_process = true;
+ else
+ {
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
+ goto func_exit;
+ }
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
+ }
+ fts_sync_table(sync_table, false);
if (process_message)
{
mysql_mutex_lock(&fts_optimize_wq->mutex);
sync_table->fts->sync_message = false;
+ sync_table->fts->in_process = false;
+ pthread_cond_broadcast(&sync_table->fts->fts_queue_cond);
mysql_mutex_unlock(&fts_optimize_wq->mutex);
}
}
@@ -2778,131 +2945,147 @@ static void fts_optimize_sync_table(dict_table_t *table,
DBUG_EXECUTE_IF("ib_optimize_wq_hang",
std::this_thread::sleep_for(std::chrono::seconds(6)););
+func_exit:
if (mdl_ticket)
dict_table_close(sync_table, false, fts_opt_thd, mdl_ticket);
}
-/**********************************************************************//**
-Optimize all FTS tables.
-@return Dummy return */
-static void fts_optimize_callback(void *)
+static void fts_process_msg(fts_msg_t *msg)
{
- ut_ad(!srv_read_only_mode);
-
- static ulint current;
- static bool done;
- static ulint n_optimize;
-
- if (!fts_optimize_wq || done) {
- /* Possibly timer initiated callback, can come after FTS_MSG_STOP.*/
- return;
- }
-
- static ulint n_tables = ib_vector_size(fts_slots);
-
- while (!done && srv_shutdown_state <= SRV_SHUTDOWN_INITIATED) {
- /* If there is no message in the queue and we have tables
- to optimize then optimize the tables. */
-
- if (!done
- && ib_wqueue_is_empty(fts_optimize_wq)
- && n_tables > 0
- && n_optimize > 0) {
-
- /* The queue is empty but we have tables
- to optimize. */
- if (UNIV_UNLIKELY(wsrep_sst_disable_writes)) {
-retry_later:
- if (fts_is_sync_needed()) {
- fts_need_sync = true;
- }
- if (n_tables) {
- timer->set_time(5000, 0);
- }
- return;
- }
-
- fts_slot_t* slot = static_cast<fts_slot_t*>(
- ib_vector_get(fts_slots, current));
+ switch (msg->type)
+ {
+ case FTS_MSG_ADD_TABLE:
+ fts_opt_slots.add_new_table(static_cast<dict_table_t*>(msg->ptr));
+ break;
+ case FTS_MSG_DEL_TABLE:
+ fts_opt_slots.delete_table(static_cast<fts_msg_del_t*>(msg->ptr));
+ break;
+ case FTS_MSG_SYNC_TABLE:
+ DBUG_EXECUTE_IF("fts_instrument_msg_sync_sleep",
+ std::this_thread::sleep_for(
+ std::chrono::milliseconds(300)););
+
+ fts_optimize_sync_table(static_cast<dict_table_t*>(msg->ptr), true);
+ break;
+ default: ut_error;
+ }
+ return;
+}
- /* Handle the case of empty slots. */
- if (slot->table) {
- slot->running = true;
- fts_optimize_table_bk(slot);
- }
+/** Fulltext thread information. */
+struct fts_thread_info
+{
+ uint thread_no;
+ fts_thread_info(uint no): thread_no(no) {}
+ bool should_shutdown() const
+ {
+ switch(srv_shutdown_state)
+ {
+ case SRV_SHUTDOWN_NONE:
+ return thread_no >= srv_n_fts_threads;
+ case SRV_SHUTDOWN_EXIT_THREADS:
+ case SRV_SHUTDOWN_CLEANUP:
+ case SRV_SHUTDOWN_INITIATED:
+ return true;
+ case SRV_SHUTDOWN_LAST_PHASE:
+ break;
+ }
+ ut_ad(0);
+ return false;
+ }
+};
- /* Wrap around the counter. */
- if (++current >= ib_vector_size(fts_slots)) {
- n_optimize = fts_optimize_how_many();
- current = 0;
- }
- } else if (n_optimize == 0
- || !ib_wqueue_is_empty(fts_optimize_wq)) {
- fts_msg_t* msg = static_cast<fts_msg_t*>
- (ib_wqueue_nowait(fts_optimize_wq));
- /* Timeout ? */
- if (!msg) {
- goto retry_later;
- }
+static void fts_optimize_func()
+{
+ ut_ad(!srv_read_only_mode);
- switch (msg->type) {
- case FTS_MSG_STOP:
- done = true;
- break;
+ if (!fts_optimize_wq) return;
- case FTS_MSG_ADD_TABLE:
- ut_a(!done);
- if (fts_optimize_new_table(
- static_cast<dict_table_t*>(
- msg->ptr))) {
- ++n_tables;
- }
- break;
-
- case FTS_MSG_DEL_TABLE:
- if (fts_optimize_del_table(
- static_cast<fts_msg_del_t*>(
- msg->ptr))) {
- --n_tables;
- }
- break;
- default:
- ut_error;
- }
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
+ fts_thread_info fts_thd(srv_n_fts_threads_started++);
+ pthread_cond_signal(&fts_start_cond);
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
+ while (!fts_thd.should_shutdown())
+ {
+ if (ib_wqueue_is_empty(fts_optimize_wq))
+ {
+ if (fts_opt_slots.get_n_optimize() == 0)
+ {
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
+ my_cond_wait(&fts_wait_cond, &fts_optimize_wq->mutex.m_mutex);
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
+ }
+ else
+ {
+ dict_table_t* table= fts_opt_slots.get_optimize_table();
+ if (!table) fts_opt_slots.update_need_sync();
+ else
+ {
+ bool threshold= false;
+ dberr_t err= fts_optimize_table_bk(table, threshold);
+ fts_opt_slots.update_slot(table, threshold, &err);
+ }
+ }
+ }
+ else
+ {
+ fts_msg_t* msg = static_cast<fts_msg_t*>(
+ ib_wqueue_nowait(fts_optimize_wq));
+ if (!msg) fts_opt_slots.update_need_sync();
+ else
+ {
+ fts_process_msg(msg);
+ mem_heap_free(msg->heap);
+ }
+ }
+ }
- mem_heap_free(msg->heap);
- n_optimize = done ? 0 : fts_optimize_how_many();
- }
- }
+ /** Exit thread doesn't have to process all msg
+ and sync all tables */
+ if (srv_shutdown_state >= SRV_SHUTDOWN_INITIATED)
+ goto func_exit;
- /* Server is being shutdown, sync the data from FTS cache to disk
- if needed */
- if (n_tables > 0) {
- for (ulint i = 0; i < ib_vector_size(fts_slots); i++) {
- fts_slot_t* slot = static_cast<fts_slot_t*>(
- ib_vector_get(fts_slots, i));
+ /* Process all messages from fts_optimize_wq */
+ while (!ib_wqueue_is_empty(fts_optimize_wq))
+ {
+ fts_msg_t* msg = static_cast<fts_msg_t*>(
+ ib_wqueue_nowait(fts_optimize_wq));
+ if (!msg)
+ break;
+ fts_process_msg(msg);
+ mem_heap_free(msg->heap);
+ }
- if (slot->table) {
- fts_optimize_sync_table(slot->table);
- }
- }
- }
+ /* Sync all the table during shutdown */
+ while (dict_table_t *table= fts_opt_slots.get_all_tables_during_exit())
+ fts_optimize_sync_table(table);
- ib_vector_free(fts_slots);
- mysql_mutex_lock(&fts_optimize_wq->mutex);
- fts_slots = NULL;
- pthread_cond_broadcast(&fts_opt_shutdown_cond);
- mysql_mutex_unlock(&fts_optimize_wq->mutex);
+func_exit:
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
+ srv_n_fts_threads_started--;
+ if (srv_n_fts_threads_started == 0)
+ {
+ fts_opt_slots.free();
+ pthread_cond_broadcast(&fts_opt_shutdown_cond);
+ }
+ pthread_cond_signal(&fts_start_cond);
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
- ib::info() << "FTS optimize thread exiting.";
+ ib::info() << "FTS optimize thread exiting.";
}
+void fts_bg_set_thread_cnt(const uint new_cnt);
+
/**********************************************************************//**
Startup the optimize thread and create the work queue. */
void
fts_optimize_init(void)
/*===================*/
{
+ if (fts_bg_threads_inited) {
+ return;
+ }
+
mem_heap_t* heap;
ib_alloc_t* heap_alloc;
@@ -2913,12 +3096,12 @@ fts_optimize_init(void)
/* Create FTS optimize work queue */
fts_optimize_wq = ib_wqueue_create();
- timer = srv_thread_pool->create_timer(timer_callback);
/* Create FTS vector to store fts_slot_t */
heap = mem_heap_create(sizeof(dict_table_t*) * 64);
heap_alloc = ib_heap_allocator_create(heap);
- fts_slots = ib_vector_create(heap_alloc, sizeof(fts_slot_t), 4);
+
+ fts_opt_slots.init(heap_alloc);
fts_opt_thd = innobase_create_background_thd("InnoDB FTS optimizer");
/* Add fts tables to fts_slots which could be skipped
@@ -2936,13 +3119,52 @@ fts_optimize_init(void)
need to acquire fts_optimize_wq->mutex for adding the fts
table to the fts slots. */
ut_ad(!table->can_be_evicted);
- fts_optimize_new_table(table);
+ fts_opt_slots.add_new_table(table);
table->fts->in_queue = true;
}
dict_sys.unfreeze();
pthread_cond_init(&fts_opt_shutdown_cond, nullptr);
+ pthread_cond_init(&fts_start_cond, nullptr);
+ pthread_cond_init(&fts_wait_cond, nullptr);
last_check_sync_time = time(NULL);
+ uint cnt = srv_n_fts_threads;
+ srv_n_fts_threads = 0;
+ fts_bg_threads_inited= true;
+ fts_bg_set_thread_cnt(cnt);
+}
+
+void fts_bg_set_thread_cnt(const uint new_cnt)
+{
+ if (!fts_bg_threads_inited)
+ {
+ if (srv_shutdown_state != SRV_SHUTDOWN_NONE)
+ return;
+ fts_optimize_init();
+ }
+
+ mysql_mutex_lock(&fts_optimize_wq->mutex);
+ if (new_cnt > srv_n_fts_threads)
+ {
+ uint add= new_cnt - srv_n_fts_threads;
+ srv_n_fts_threads= new_cnt;
+ for (uint i= 0; i < add; i++)
+ {
+ std::thread thd(fts_optimize_func);
+ ib::info() << "Creating #" << i + 1 << " fulltext thread id "
+ << thd.get_id() << " total threads " << new_cnt <<".";
+ thd.detach();
+ }
+ } else if (new_cnt < srv_n_fts_threads)
+ srv_n_fts_threads= new_cnt;
+ while (srv_n_fts_threads_started != srv_n_fts_threads)
+ {
+ pthread_cond_broadcast(&fts_wait_cond);
+ my_cond_wait(&fts_start_cond, &fts_optimize_wq->mutex.m_mutex);
+ }
+
+ pthread_cond_signal(&fts_wait_cond);
+ mysql_mutex_unlock(&fts_optimize_wq->mutex);
}
/** Shutdown fts optimize thread. */
@@ -2961,29 +3183,21 @@ fts_optimize_shutdown()
fts_opt_start_shutdown = true;
dict_sys.unfreeze();
- /* We tell the OPTIMIZE thread to switch to state done, we
- can't delete the work queue here because the add thread needs
- deregister the FTS tables. */
- timer->disarm();
- task_group.cancel_pending(&task);
-
- add_msg(fts_optimize_create_msg(FTS_MSG_STOP, nullptr));
-
- while (fts_slots) {
+ while (srv_n_fts_threads_started) {
+ pthread_cond_broadcast(&fts_wait_cond);
my_cond_wait(&fts_opt_shutdown_cond,
- &fts_optimize_wq->mutex.m_mutex);
+ &fts_optimize_wq->mutex.m_mutex);
}
destroy_background_thd(fts_opt_thd);
fts_opt_thd = NULL;
pthread_cond_destroy(&fts_opt_shutdown_cond);
+ pthread_cond_destroy(&fts_start_cond);
+ pthread_cond_destroy(&fts_wait_cond);
mysql_mutex_unlock(&fts_optimize_wq->mutex);
ib_wqueue_free(fts_optimize_wq);
fts_optimize_wq = NULL;
-
- delete timer;
- timer = NULL;
}
/** Sync the table during commit phase
@@ -2998,7 +3212,7 @@ void fts_sync_during_ddl(dict_table_t* table)
if (!sync_message)
return;
- fts_sync_table(table);
+ fts_sync_table(table, false);
mysql_mutex_lock(&fts_optimize_wq->mutex);
table->fts->sync_message = false;
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index f102789d7ab..14c8486761a 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -18603,6 +18603,15 @@ innodb_encrypt_tables_update(THD*, st_mysql_sys_var*, void*, const void* save)
mysql_mutex_lock(&LOCK_global_system_variables);
}
+static
+void
+innodb_fulltext_threads_update(THD*, st_mysql_sys_var*, void*, const void* save)
+{
+ mysql_mutex_unlock(&LOCK_global_system_variables);
+ fts_bg_set_thread_cnt(*static_cast<const uint*>(save));
+ mysql_mutex_lock(&LOCK_global_system_variables);
+}
+
static SHOW_VAR innodb_status_variables_export[]= {
SHOW_FUNC_ENTRY("Innodb", &show_innodb_vars),
{NullS, NullS, SHOW_LONG}
@@ -19747,6 +19756,14 @@ static MYSQL_SYSVAR_BOOL(encrypt_temporary_tables, innodb_encrypt_temporary_tabl
"Enrypt the temporary table data.",
NULL, NULL, false);
+static MYSQL_SYSVAR_UINT(fulltext_bg_threads, srv_n_fts_threads,
+ PLUGIN_VAR_RQCMDARG,
+ "Number of threads performing background "
+ "fulltext message process and optimization ",
+ NULL,
+ innodb_fulltext_threads_update,
+ 2, 1, 255, 0);
+
static struct st_mysql_sys_var* innobase_system_variables[]= {
MYSQL_SYSVAR(autoextend_increment),
MYSQL_SYSVAR(buffer_pool_size),
@@ -19913,6 +19930,7 @@ static struct st_mysql_sys_var* innobase_system_variables[]= {
MYSQL_SYSVAR(buf_dump_status_frequency),
MYSQL_SYSVAR(background_thread),
MYSQL_SYSVAR(encrypt_temporary_tables),
+ MYSQL_SYSVAR(fulltext_bg_threads),
NULL
};
diff --git a/storage/innobase/include/fts0fts.h b/storage/innobase/include/fts0fts.h
index 720fe7f25b9..2b45da79650 100644
--- a/storage/innobase/include/fts0fts.h
+++ b/storage/innobase/include/fts0fts.h
@@ -332,14 +332,26 @@ public:
/** Vector of FTS indexes, this is mainly for caching purposes. */
ib_vector_t* indexes;
- /** Whether the table exists in fts_optimize_wq;
+ /** Whether the addition of new table message in fts_optimize_wq;
protected by fts_optimize_wq mutex */
- bool in_queue;
+ bool wait_in_queue;
/** Whether the sync message exists in fts_optimize_wq;
protected by fts_optimize_wq mutex */
bool sync_message;
+ /** Whether the table is in fts_slots
+ protected by fts_optimize_wq mutex */
+ bool in_queue;
+
+ /** Whether the table is picked by fts_bg_threads
+ protected by fts_optimize_wq mutex */
+ bool in_process;
+
+ /** Condition variable to wake up the background thread
+ when table is in fts queue */
+ pthread_cond_t fts_queue_cond;
+
/** Heap for fts_t allocation. */
mem_heap_t* fts_heap;
};
@@ -648,6 +660,12 @@ fts_optimize_remove_table(
void
fts_optimize_shutdown();
+/** Send sync fts cache for the table.
+@param[in] table table to sync */
+void
+fts_optimize_request_sync_table(
+ dict_table_t* table);
+
/**********************************************************************//**
Take a FTS savepoint. */
void
@@ -702,8 +720,9 @@ fts_savepoint_rollback_last_stmt(
/** Run SYNC on the table, i.e., write out data from the cache to the
FTS auxiliary INDEX table and clear the cache at the end.
@param[in,out] table fts table
+@param[in] wait whether to wait for existing sync to finish
@return DB_SUCCESS on success, error code on failure. */
-dberr_t fts_sync_table(dict_table_t* table);
+dberr_t fts_sync_table(dict_table_t* table, bool wait = true);
/****************************************************************//**
Create an FTS index cache. */
@@ -938,3 +957,5 @@ fts_update_sync_doc_id(const dict_table_t *table,
/** Sync the table during commit phase
@param[in] table table to be synced */
void fts_sync_during_ddl(dict_table_t* table);
+
+void fts_bg_set_thread_cnt(const uint new_cnt);
diff --git a/storage/innobase/include/fts0types.h b/storage/innobase/include/fts0types.h
index 04e99d595c5..8c91c22bcce 100644
--- a/storage/innobase/include/fts0types.h
+++ b/storage/innobase/include/fts0types.h
@@ -175,6 +175,8 @@ struct fts_node_t {
ulint ilist_size_alloc;
/*!< Allocated size of ilist in
bytes */
+ /** Flag to indicate whether node is synced */
+ bool synced;
};
/** A tokenizer word. Contains information about one word. */
diff --git a/storage/innobase/include/srv0srv.h b/storage/innobase/include/srv0srv.h
index 96cfe886c02..5959f567c8e 100644
--- a/storage/innobase/include/srv0srv.h
+++ b/storage/innobase/include/srv0srv.h
@@ -263,6 +263,8 @@ extern unsigned long long srv_max_undo_log_size;
extern uint srv_n_fil_crypt_threads;
extern uint srv_n_fil_crypt_threads_started;
+extern uint srv_n_fts_threads;
+
/** Rate at which UNDO records should be purged. */
extern ulong srv_purge_rseg_truncate_frequency;