author    Marko Mäkelä <marko.makela@mariadb.com>  2021-02-16 15:44:45 +0200
committer Marko Mäkelä <marko.makela@mariadb.com>  2021-02-16 15:44:45 +0200
commit    2a55c8208f42aa5951b9478fe6abaf63d9d5d402 (patch)
tree      c603a87008b17e2f0f333db2a06dcbad085723c0
parent    2ec4b277cc12c005fbe9f308bb8457f055250e97 (diff)
parent    1146e98b3af3e2b15df0598a860f4571663a98d0 (diff)
download  mariadb-git-2a55c8208f42aa5951b9478fe6abaf63d9d5d402.tar.gz
Merge 10.6
-rw-r--r--  mysql-test/suite/innodb/r/innodb-truncate.result | 13
-rw-r--r--  mysql-test/suite/innodb/t/innodb-truncate.test | 16
-rw-r--r--  sql/net_serv.cc | 16
-rw-r--r--  sql/scheduler.h | 2
-rw-r--r--  sql/sql_class.cc | 53
-rw-r--r--  sql/sql_class.h | 159
-rw-r--r--  sql/sql_parse.cc | 93
-rw-r--r--  sql/sql_parse.h | 13
-rw-r--r--  sql/threadpool.h | 3
-rw-r--r--  sql/threadpool_common.cc | 67
-rw-r--r--  sql/threadpool_generic.cc | 5
-rw-r--r--  sql/threadpool_win.cc | 7
-rw-r--r--  storage/innobase/include/log0log.h | 12
-rw-r--r--  storage/innobase/lock/lock0lock.cc | 1
-rw-r--r--  storage/innobase/log/log0log.cc | 10
-rw-r--r--  storage/innobase/log/log0sync.cc | 89
-rw-r--r--  storage/innobase/log/log0sync.h | 15
-rw-r--r--  storage/innobase/trx/trx0trx.cc | 122
18 files changed, 594 insertions, 102 deletions
diff --git a/mysql-test/suite/innodb/r/innodb-truncate.result b/mysql-test/suite/innodb/r/innodb-truncate.result
index 8610a892cc6..3e2ab7936dc 100644
--- a/mysql-test/suite/innodb/r/innodb-truncate.result
+++ b/mysql-test/suite/innodb/r/innodb-truncate.result
@@ -91,3 +91,16 @@ ALTER TABLE t1 FORCE;
TRUNCATE TABLE t1;
ERROR 42000: Cannot truncate a table referenced in a foreign key constraint (`test`.`t2`, CONSTRAINT `t2_ibfk_1` FOREIGN KEY (`f2`) REFERENCES `test`.`t3` (`f2`))
DROP TABLE t2, t1;
+#
+# MDEV-24861 Assertion `trx->rsegs.m_redo.rseg' failed
+# in innodb_prepare_commit_versioned
+#
+CREATE TABLE t1 (id INT PRIMARY KEY, f TEXT UNIQUE,
+s BIGINT UNSIGNED AS ROW START, e BIGINT UNSIGNED AS ROW END,
+PERIOD FOR SYSTEM_TIME(s,e))
+ENGINE=InnoDB WITH SYSTEM VERSIONING;
+CREATE TABLE t2 (id INT PRIMARY KEY) ENGINE=InnoDB;
+ALTER TABLE t1 FORCE;
+TRUNCATE TABLE t2;
+DROP TABLE t1, t2;
+# End of 10.6 tests
diff --git a/mysql-test/suite/innodb/t/innodb-truncate.test b/mysql-test/suite/innodb/t/innodb-truncate.test
index 71c0fcfea8b..4d39fcaef6d 100644
--- a/mysql-test/suite/innodb/t/innodb-truncate.test
+++ b/mysql-test/suite/innodb/t/innodb-truncate.test
@@ -92,3 +92,19 @@ ALTER TABLE t1 FORCE;
--error ER_TRUNCATE_ILLEGAL_FK
TRUNCATE TABLE t1;
DROP TABLE t2, t1;
+
+--echo #
+--echo # MDEV-24861 Assertion `trx->rsegs.m_redo.rseg' failed
+--echo # in innodb_prepare_commit_versioned
+--echo #
+
+CREATE TABLE t1 (id INT PRIMARY KEY, f TEXT UNIQUE,
+ s BIGINT UNSIGNED AS ROW START, e BIGINT UNSIGNED AS ROW END,
+ PERIOD FOR SYSTEM_TIME(s,e))
+ENGINE=InnoDB WITH SYSTEM VERSIONING;
+CREATE TABLE t2 (id INT PRIMARY KEY) ENGINE=InnoDB;
+ALTER TABLE t1 FORCE;
+TRUNCATE TABLE t2;
+DROP TABLE t1, t2;
+
+--echo # End of 10.6 tests
diff --git a/sql/net_serv.cc b/sql/net_serv.cc
index a96c43a94fe..409d3cac85e 100644
--- a/sql/net_serv.cc
+++ b/sql/net_serv.cc
@@ -640,8 +640,20 @@ net_real_write(NET *net,const uchar *packet, size_t len)
my_bool net_blocking = vio_is_blocking(net->vio);
DBUG_ENTER("net_real_write");
-#if defined(MYSQL_SERVER) && defined(USE_QUERY_CACHE)
- query_cache_insert(net->thd, (char*) packet, len, net->pkt_nr);
+#if defined(MYSQL_SERVER)
+ THD *thd= (THD *)net->thd;
+#if defined(USE_QUERY_CACHE)
+ query_cache_insert(thd, (char*) packet, len, net->pkt_nr);
+#endif
+ if (likely(thd))
+ {
+ /*
+      Wait until pending operations (currently, the engine's
+      asynchronous group commit) are finished before replying
+      to the client, to keep the durability promise.
+ */
+ thd->async_state.wait_for_pending_ops();
+ }
#endif
if (unlikely(net->error == 2))
diff --git a/sql/scheduler.h b/sql/scheduler.h
index ebf8d6e9e64..68387390d81 100644
--- a/sql/scheduler.h
+++ b/sql/scheduler.h
@@ -40,6 +40,8 @@ struct scheduler_functions
void (*thd_wait_end)(THD *thd);
void (*post_kill_notification)(THD *thd);
void (*end)(void);
+ /** resume previous unfinished command (threadpool only)*/
+ void (*thd_resume)(THD* thd);
};
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 3b8414f0c71..5b1d21bb44b 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -682,7 +682,8 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
m_stmt_da(&main_da),
tdc_hash_pins(0),
xid_hash_pins(0),
- m_tmp_tables_locked(false)
+ m_tmp_tables_locked(false),
+ async_state()
#ifdef HAVE_REPLICATION
,
current_linfo(0),
@@ -4947,6 +4948,56 @@ void reset_thd(MYSQL_THD thd)
free_root(thd->mem_root, MYF(MY_KEEP_PREALLOC));
}
+/**
+  This function can be used by a storage engine
+  to indicate the start of an async operation.
+
+  Such an asynchronous operation needs to be
+  finished before we write the response to the client.
+
+  An example of such an operation is InnoDB's asynchronous
+  group commit. The server needs to wait for it to finish
+  before writing the response to the client, to provide durability
+  guarantees; in other words, the server can't send the OK packet
+  before the modified data is durable in the redo log.
+*/
+extern "C" MYSQL_THD thd_increment_pending_ops(void)
+{
+ THD *thd = current_thd;
+ if (!thd)
+ return NULL;
+ thd->async_state.inc_pending_ops();
+ return thd;
+}
+
+/**
+  This function can be used by a plugin/engine to indicate
+  the end of an async operation (such as the end of the group
+  commit write/flush).
+
+ @param thd THD
+*/
+extern "C" void thd_decrement_pending_ops(MYSQL_THD thd)
+{
+ DBUG_ASSERT(thd);
+ thd_async_state::enum_async_state state;
+ if (thd->async_state.dec_pending_ops(&state) == 0)
+ {
+ switch(state)
+ {
+ case thd_async_state::enum_async_state::SUSPENDED:
+ DBUG_ASSERT(thd->scheduler->thd_resume);
+ thd->scheduler->thd_resume(thd);
+ break;
+ case thd_async_state::enum_async_state::NONE:
+ break;
+ default:
+ DBUG_ASSERT(0);
+ }
+ }
+}
+
+
unsigned long long thd_get_query_id(const MYSQL_THD thd)
{
return((unsigned long long)thd->query_id);
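
The comment on thd_increment_pending_ops()/thd_decrement_pending_ops() above describes the
engine-facing half of the pending-operations protocol. Below is a minimal usage sketch from
the storage-engine side, assuming a hypothetical asynchronous flush facility;
my_engine_submit_async_flush() and my_engine_flush_up_to() are illustrative names, not part
of this patch:

    /* Sketch only: the two service calls must bracket one async operation. */
    extern "C" MYSQL_THD thd_increment_pending_ops(void);
    extern "C" void thd_decrement_pending_ops(MYSQL_THD);

    static void on_flush_done(void *param)
    {
      /* Runs in the completion thread once the durable write has finished;
         this releases the pending client response. */
      thd_decrement_pending_ops(static_cast<MYSQL_THD>(param));
    }

    static void engine_commit_low(lsn_t commit_lsn)
    {
      if (MYSQL_THD thd= thd_increment_pending_ops())
        /* Queue the flush; the server defers the OK packet until
           on_flush_done() has run. */
        my_engine_submit_async_flush(commit_lsn, on_flush_done, thd);  /* hypothetical */
      else
        /* No THD (background or bootstrap context): wait synchronously. */
        my_engine_flush_up_to(commit_lsn);                             /* hypothetical */
    }

This is the same pairing that trx_flush_log_if_needed_low() further down sets up via a
completion_callback.
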
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 50b746fe514..5612ed5a790 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -2309,6 +2309,164 @@ struct THD_count
~THD_count() { thread_count--; }
};
+/**
+ Support structure for asynchronous group commit, or more generally
+ any asynchronous operation that needs to finish before server writes
+ response to client.
+
+  An engine, or any other server component, can signal that there is
+  a pending operation by incrementing a counter with inc_pending_ops(),
+  and that the pending operation is finished by decrementing that counter
+  with dec_pending_ops().
+
+  NOTE: Currently, pending operations cannot fail, i.e. there is no
+  way to pass a return code in dec_pending_ops().
+
+  The server does not write the response to the client before the counter
+  becomes 0. In the case of group commit this ensures that data is persistent
+  before success is reported to the client, i.e. durability in ACID.
+*/
+struct thd_async_state
+{
+ enum class enum_async_state
+ {
+ NONE,
+ SUSPENDED, /* do_command() did not finish, and needs to be resumed */
+ RESUMED /* do_command() is resumed*/
+ };
+ enum_async_state m_state{enum_async_state::NONE};
+
+  /* State we need to resume do_command() from where it left off last time */
+ enum enum_server_command m_command{COM_SLEEP};
+ LEX_STRING m_packet{};
+
+ mysql_mutex_t m_mtx;
+ mysql_cond_t m_cond;
+
+ /** Pending counter*/
+ Atomic_counter<int> m_pending_ops=0;
+
+#ifndef DBUG_OFF
+ /* Checks */
+ pthread_t m_dbg_thread;
+#endif
+
+ thd_async_state()
+ {
+ mysql_mutex_init(PSI_NOT_INSTRUMENTED, &m_mtx, 0);
+ mysql_cond_init(PSI_INSTRUMENT_ME, &m_cond, 0);
+ }
+
+ /*
+    Currently only used with the threadpool: one can "suspend" and "resume" a THD.
+    Suspending only means leaving do_command() early, after saving some state.
+    Resuming continues the suspended THD's do_command() from where it left off.
+ */
+ bool try_suspend()
+ {
+ bool ret;
+ mysql_mutex_lock(&m_mtx);
+ DBUG_ASSERT(m_state == enum_async_state::NONE);
+ DBUG_ASSERT(m_pending_ops >= 0);
+
+ if(m_pending_ops)
+ {
+ ret=true;
+ m_state= enum_async_state::SUSPENDED;
+ }
+ else
+ {
+ /*
+        If there are no pending operations, we can't suspend, since
+        nobody would resume us.
+ */
+ ret=false;
+ }
+ mysql_mutex_unlock(&m_mtx);
+ return ret;
+ }
+
+ ~thd_async_state()
+ {
+ wait_for_pending_ops();
+ mysql_mutex_destroy(&m_mtx);
+ mysql_cond_destroy(&m_cond);
+ }
+
+ /*
+ Increment pending asynchronous operations.
+ The client response may not be written if
+ this count > 0.
+    So, without the threadpool, the query needs to wait for
+    the operations to finish.
+    With the threadpool, the THD can be suspended, and resumed
+    when this counter goes back to 0.
+ */
+ void inc_pending_ops()
+ {
+ mysql_mutex_lock(&m_mtx);
+
+#ifndef DBUG_OFF
+ /*
+ Check that increments are always done by the same thread.
+ */
+ if (!m_pending_ops)
+ m_dbg_thread= pthread_self();
+ else
+ DBUG_ASSERT(pthread_equal(pthread_self(),m_dbg_thread));
+#endif
+
+ m_pending_ops++;
+ mysql_mutex_unlock(&m_mtx);
+ }
+
+ int dec_pending_ops(enum_async_state* state)
+ {
+ int ret;
+ mysql_mutex_lock(&m_mtx);
+ ret= --m_pending_ops;
+ if (!ret)
+ mysql_cond_signal(&m_cond);
+ *state = m_state;
+ mysql_mutex_unlock(&m_mtx);
+ return ret;
+ }
+
+ /*
+ This is used for "dirty" reading pending ops,
+ when dirty read is OK.
+ */
+ int pending_ops()
+ {
+ return m_pending_ops;
+ }
+
+ /* Wait for pending operations to finish.*/
+ void wait_for_pending_ops()
+ {
+ /*
+ It is fine to read m_pending_ops and compare it with 0,
+ without mutex protection.
+
+      The value is only incremented by the current thread, and will
+      be decremented by another one, thus a "dirty" read may show a positive
+      number when it is really 0; this is not a problem, as the only
+      consequence is rechecking under the mutex.
+ */
+ if (!pending_ops())
+ return;
+
+ mysql_mutex_lock(&m_mtx);
+ DBUG_ASSERT(m_pending_ops >= 0);
+ while (m_pending_ops)
+ mysql_cond_wait(&m_cond, &m_mtx);
+ mysql_mutex_unlock(&m_mtx);
+ }
+};
+
+extern "C" MYSQL_THD thd_increment_pending_ops(void);
+extern "C" void thd_decrement_pending_ops(MYSQL_THD);
+
/**
@class THD
@@ -5025,6 +5183,7 @@ private:
}
public:
+ thd_async_state async_state;
#ifdef HAVE_REPLICATION
/*
If we do a purge of binary logs, log index info of the threads
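
The thd_async_state block above interleaves a counter, a mutex/condvar pair, and a suspend
flag. The following is a compressed model of that handshake using standard C++ primitives, a
sketch meant only to make explicit the race that try_suspend() guards against; it is not the
server code:

    #include <condition_variable>
    #include <mutex>

    struct pending_ops_model
    {
      std::mutex mtx;
      std::condition_variable cond;
      int pending= 0;
      bool suspended= false;

      /* Engine side: register one pending async operation. */
      void inc()
      {
        std::lock_guard<std::mutex> g(mtx);
        ++pending;
      }

      /* Completion side: the last decrement either wakes a waiter or
         resumes a suspended connection. */
      void dec(void (*resume)(void*), void *conn)
      {
        std::unique_lock<std::mutex> g(mtx);
        if (--pending == 0)
        {
          if (suspended)
          {
            suspended= false;
            g.unlock();
            resume(conn);
          }
          else
            cond.notify_one();
        }
      }

      /* Threadpool worker: suspend only while something is still pending,
         otherwise nobody would ever call resume(). */
      bool try_suspend()
      {
        std::lock_guard<std::mutex> g(mtx);
        if (pending == 0)
          return false;
        suspended= true;
        return true;
      }

      /* One-thread-per-connection: block until the counter drains. */
      void wait()
      {
        std::unique_lock<std::mutex> g(mtx);
        cond.wait(g, [this] { return pending == 0; });
      }
    };

The key point is the try_suspend() branch: suspending with a zero counter would leave nobody
to call resume(), which is why tp_callback() in threadpool_common.cc below retries the
command instead.
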
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 97b7dd427a5..078d48872b0 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -1168,25 +1168,55 @@ static enum enum_server_command fetch_command(THD *thd, char *packet)
/**
Read one command from connection and execute it (query or simple command).
- This function is called in loop from thread function.
+  This function is to be used by the different schedulers
+  (one-thread-per-connection, pool-of-threads).
For profiling to work, it must never be called recursively.
+ @param thd - client connection context
+
+  @param blocking - wait for the command to finish.
+                    If false (non-blocking), the function might
+                    return while the command is "half-finished", with
+                    DISPATCH_COMMAND_WOULDBLOCK.
+                    Currently, this can *only* happen when using the
+                    threadpool. The command will resume after all outstanding
+                    async operations (i.e. group commit) finish.
+                    The threadpool scheduler takes care of the "resume".
+
+ @retval
+ DISPATCH_COMMAND_SUCCESS - success
@retval
- 0 success
+ DISPATCH_COMMAND_CLOSE_CONNECTION request of THD shutdown
+ (s. dispatch_command() description)
@retval
- 1 request of thread shutdown (see dispatch_command() description)
+    DISPATCH_COMMAND_WOULDBLOCK - need to wait for asynchronous operations
+ to finish. Only returned if parameter
+ 'blocking' is false.
*/
-bool do_command(THD *thd)
+dispatch_command_return do_command(THD *thd, bool blocking)
{
- bool return_value;
+ dispatch_command_return return_value;
char *packet= 0;
ulong packet_length;
NET *net= &thd->net;
enum enum_server_command command;
DBUG_ENTER("do_command");
+ DBUG_ASSERT(!thd->async_state.pending_ops());
+ if (thd->async_state.m_state == thd_async_state::enum_async_state::RESUMED)
+ {
+ /*
+     Resuming a previously suspended command.
+     Restore the saved state.
+ */
+ command = thd->async_state.m_command;
+ packet = thd->async_state.m_packet.str;
+ packet_length = (ulong)thd->async_state.m_packet.length;
+ goto resume;
+ }
+
/*
indicator of uninitialized lex => normal flow of errors handling
(see my_message_sql)
@@ -1253,12 +1283,12 @@ bool do_command(THD *thd)
if (net->error != 3)
{
- return_value= TRUE; // We have to close it.
+ return_value= DISPATCH_COMMAND_CLOSE_CONNECTION; // We have to close it.
goto out;
}
net->error= 0;
- return_value= FALSE;
+ return_value= DISPATCH_COMMAND_SUCCESS;
goto out;
}
@@ -1325,7 +1355,7 @@ bool do_command(THD *thd)
MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
thd->m_statement_psi= NULL;
thd->m_digest= NULL;
- return_value= FALSE;
+ return_value= DISPATCH_COMMAND_SUCCESS;
wsrep_after_command_before_result(thd);
goto out;
@@ -1351,7 +1381,7 @@ bool do_command(THD *thd)
thd->m_statement_psi= NULL;
thd->m_digest= NULL;
- return_value= FALSE;
+ return_value= DISPATCH_COMMAND_SUCCESS;
wsrep_after_command_before_result(thd);
goto out;
}
@@ -1362,8 +1392,18 @@ bool do_command(THD *thd)
DBUG_ASSERT(packet_length);
DBUG_ASSERT(!thd->apc_target.is_enabled());
+
+resume:
return_value= dispatch_command(command, thd, packet+1,
- (uint) (packet_length-1));
+ (uint) (packet_length-1), blocking);
+ if (return_value == DISPATCH_COMMAND_WOULDBLOCK)
+ {
+ /* Save current state, and resume later.*/
+ thd->async_state.m_command= command;
+ thd->async_state.m_packet={packet,packet_length};
+ DBUG_RETURN(return_value);
+ }
+
DBUG_ASSERT(!thd->apc_target.is_enabled());
out:
@@ -1510,6 +1550,13 @@ public:
@param packet_length length of packet + 1 (to show that data is
null-terminated) except for COM_SLEEP, where it
can be zero.
+  @param blocking     if false (non-blocking), then the function might
+                      return while the command is "half-finished", with
+                      DISPATCH_COMMAND_WOULDBLOCK.
+                      Currently, this can *only* happen when using the threadpool.
+                      The current command will resume after all outstanding
+                      async operations (i.e. group commit) finish.
+                      The threadpool scheduler takes care of the "resume".
@todo
set thd->lex->sql_command to SQLCOM_END here.
@@ -1522,8 +1569,8 @@ public:
1 request of thread shutdown, i. e. if command is
COM_QUIT/COM_SHUTDOWN
*/
-bool dispatch_command(enum enum_server_command command, THD *thd,
- char* packet, uint packet_length)
+dispatch_command_return dispatch_command(enum enum_server_command command, THD *thd,
+ char* packet, uint packet_length, bool blocking)
{
NET *net= &thd->net;
bool error= 0;
@@ -1535,6 +1582,12 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
"<?>")));
bool drop_more_results= 0;
+ if (thd->async_state.m_state == thd_async_state::enum_async_state::RESUMED)
+ {
+ thd->async_state.m_state = thd_async_state::enum_async_state::NONE;
+ goto resume;
+ }
+
  /* keep it within 1 byte */
compile_time_assert(COM_END == 255);
@@ -2265,6 +2318,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
general_log_print(thd, command, NullS);
my_eof(thd);
break;
+
case COM_SLEEP:
case COM_CONNECT: // Impossible here
case COM_TIME: // Impossible from client
@@ -2278,7 +2332,18 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
}
dispatch_end:
- do_end_of_statement= true;
+ /*
+    For the threadpool, i.e. a non-blocking call: if not all async operations
+    are finished, return without cleanup. The cleanup will be done
+    later, when command execution is resumed.
+ */
+ if (!blocking && !error && thd->async_state.pending_ops())
+ {
+ DBUG_RETURN(DISPATCH_COMMAND_WOULDBLOCK);
+ }
+
+resume:
+
#ifdef WITH_WSREP
/*
Next test should really be WSREP(thd), but that causes a failure when doing
@@ -2382,7 +2447,7 @@ dispatch_end:
/* Check that some variables are reset properly */
DBUG_ASSERT(thd->abort_on_warning == 0);
thd->lex->restore_set_statement_var();
- DBUG_RETURN(error);
+ DBUG_RETURN(error?DISPATCH_COMMAND_CLOSE_CONNECTION: DISPATCH_COMMAND_SUCCESS);
}
static bool slow_filter_masked(THD *thd, ulonglong mask)
diff --git a/sql/sql_parse.h b/sql/sql_parse.h
index 37861cf224f..44fd2185726 100644
--- a/sql/sql_parse.h
+++ b/sql/sql_parse.h
@@ -100,9 +100,16 @@ bool multi_delete_set_locks_and_link_aux_tables(LEX *lex);
void create_table_set_open_action_and_adjust_tables(LEX *lex);
int bootstrap(MYSQL_FILE *file);
int mysql_execute_command(THD *thd);
-bool do_command(THD *thd);
-bool dispatch_command(enum enum_server_command command, THD *thd,
- char* packet, uint packet_length);
+enum dispatch_command_return
+{
+ DISPATCH_COMMAND_SUCCESS=0,
+ DISPATCH_COMMAND_CLOSE_CONNECTION= 1,
+ DISPATCH_COMMAND_WOULDBLOCK= 2
+};
+
+dispatch_command_return do_command(THD *thd, bool blocking = true);
+dispatch_command_return dispatch_command(enum enum_server_command command, THD *thd,
+ char* packet, uint packet_length, bool blocking = true);
void log_slow_statement(THD *thd);
bool append_file_to_dir(THD *thd, const char **filename_ptr,
const LEX_CSTRING *table_name);
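
With the return codes made explicit in sql_parse.h, the two calling conventions look roughly
as follows. This is a condensed sketch of the intended contract, not the server's actual
connection loops; error handling and connection-liveness checks are omitted:

    /* One-thread-per-connection style: blocking calls never return WOULDBLOCK. */
    static void serve_connection_blocking(THD *thd)
    {
      while (do_command(thd) != DISPATCH_COMMAND_CLOSE_CONNECTION)
      {
        /* loop to the next command */
      }
    }

    /* Threadpool style: one non-blocking step per scheduler callback. */
    static bool serve_one_step_nonblocking(THD *thd)
    {
      switch (do_command(thd, /* blocking= */ false))
      {
      case DISPATCH_COMMAND_WOULDBLOCK:
        /* Command is half-finished; the scheduler's thd_resume() hook will
           re-enter do_command() once the pending async operations complete. */
        return true;
      case DISPATCH_COMMAND_CLOSE_CONNECTION:
        return false;               /* caller tears the connection down */
      case DISPATCH_COMMAND_SUCCESS:
        return true;                /* ready for the next command */
      }
      return false;
    }
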
diff --git a/sql/threadpool.h b/sql/threadpool.h
index 11132b3bf95..7737d056b4a 100644
--- a/sql/threadpool.h
+++ b/sql/threadpool.h
@@ -133,6 +133,7 @@ struct TP_pool
virtual int set_stall_limit(uint){ return 0; }
virtual int get_thread_count() { return tp_stats.num_worker_threads; }
virtual int get_idle_thread_count(){ return 0; }
+ virtual void resume(TP_connection* c)=0;
};
#ifdef _WIN32
@@ -146,6 +147,7 @@ struct TP_pool_win:TP_pool
virtual void add(TP_connection *);
virtual int set_max_threads(uint);
virtual int set_min_threads(uint);
+ void resume(TP_connection *c);
};
#endif
@@ -159,6 +161,7 @@ struct TP_pool_generic :TP_pool
virtual int set_pool_size(uint);
virtual int set_stall_limit(uint);
virtual int get_idle_thread_count();
+ void resume(TP_connection* c);
};
#endif /* HAVE_POOL_OF_THREADS */
diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc
index 168579d984b..07555ac21ed 100644
--- a/sql/threadpool_common.cc
+++ b/sql/threadpool_common.cc
@@ -23,6 +23,8 @@
#include <sql_audit.h>
#include <debug_sync.h>
#include <threadpool.h>
+#include <sql_class.h>
+#include <sql_parse.h>
#ifdef WITH_WSREP
#include "wsrep_trans_observer.h"
@@ -51,7 +53,7 @@ TP_STATISTICS tp_stats;
static void threadpool_remove_connection(THD *thd);
-static int threadpool_process_request(THD *thd);
+static dispatch_command_return threadpool_process_request(THD *thd);
static THD* threadpool_add_connection(CONNECT *connect, TP_connection *c);
extern bool do_command(THD*);
@@ -195,10 +197,30 @@ void tp_callback(TP_connection *c)
}
c->connect= 0;
}
- else if (threadpool_process_request(thd))
+ else
{
- /* QUIT or an error occurred. */
- goto error;
+retry:
+ switch(threadpool_process_request(thd))
+ {
+ case DISPATCH_COMMAND_WOULDBLOCK:
+ if (!thd->async_state.try_suspend())
+ {
+ /*
+          All async operations finished in the meantime, thus nobody will wake up
+          this THD. Therefore, we'll resume it "manually" here.
+ */
+ thd->async_state.m_state = thd_async_state::enum_async_state::RESUMED;
+ goto retry;
+ }
+ worker_context.restore();
+ return;
+ case DISPATCH_COMMAND_CLOSE_CONNECTION:
+ /* QUIT or an error occurred. */
+ goto error;
+ case DISPATCH_COMMAND_SUCCESS:
+ break;
+ }
+ thd->async_state.m_state= thd_async_state::enum_async_state::NONE;
}
/* Set priority */
@@ -331,10 +353,13 @@ static bool has_unread_data(THD* thd)
/**
Process a single client request or a single batch.
*/
-static int threadpool_process_request(THD *thd)
+static dispatch_command_return threadpool_process_request(THD *thd)
{
- int retval= 0;
+ dispatch_command_return retval= DISPATCH_COMMAND_SUCCESS;
+
thread_attach(thd);
+ if(thd->async_state.m_state == thd_async_state::enum_async_state::RESUMED)
+ goto resume;
if (thd->killed >= KILL_CONNECTION)
{
@@ -342,7 +367,7 @@ static int threadpool_process_request(THD *thd)
killed flag was set by timeout handler
or KILL command. Return error.
*/
- retval= 1;
+ retval= DISPATCH_COMMAND_CLOSE_CONNECTION;
if(thd->killed == KILL_WAIT_TIMEOUT)
handle_wait_timeout(thd);
goto end;
@@ -365,19 +390,27 @@ static int threadpool_process_request(THD *thd)
if (mysql_audit_release_required(thd))
mysql_audit_release(thd);
- if ((retval= do_command(thd)) != 0)
- goto end;
+resume:
+ retval= do_command(thd, false);
+ switch(retval)
+ {
+ case DISPATCH_COMMAND_WOULDBLOCK:
+ case DISPATCH_COMMAND_CLOSE_CONNECTION:
+ goto end;
+ case DISPATCH_COMMAND_SUCCESS:
+ break;
+ }
if (!thd_is_connection_alive(thd))
{
- retval= 1;
+ retval=DISPATCH_COMMAND_CLOSE_CONNECTION;
goto end;
}
set_thd_idle(thd);
if (!has_unread_data(thd))
- {
+ {
/* More info on this debug sync is in sql_parse.cc*/
DEBUG_SYNC(thd, "before_do_command_net_read");
goto end;
@@ -527,6 +560,15 @@ static void tp_post_kill_notification(THD *thd)
post_kill_notification(thd);
}
+/* Resume previously suspended THD */
+static void tp_resume(THD* thd)
+{
+ DBUG_ASSERT(thd->async_state.m_state == thd_async_state::enum_async_state::SUSPENDED);
+ thd->async_state.m_state = thd_async_state::enum_async_state::RESUMED;
+ TP_connection* c = get_TP_connection(thd);
+ pool->resume(c);
+}
+
static scheduler_functions tp_scheduler_functions=
{
0, // max_threads
@@ -537,7 +579,8 @@ static scheduler_functions tp_scheduler_functions=
tp_wait_begin, // thd_wait_begin
tp_wait_end, // thd_wait_end
tp_post_kill_notification, // post kill notification
- tp_end // end
+ tp_end, // end
+ tp_resume
};
void pool_of_threads_scheduler(struct scheduler_functions *func,
diff --git a/sql/threadpool_generic.cc b/sql/threadpool_generic.cc
index 709bb95eb98..19193be0354 100644
--- a/sql/threadpool_generic.cc
+++ b/sql/threadpool_generic.cc
@@ -1327,7 +1327,10 @@ void TP_pool_generic::add(TP_connection *c)
DBUG_VOID_RETURN;
}
-
+void TP_pool_generic::resume(TP_connection* c)
+{
+ add(c);
+}
/**
MySQL scheduler callback: wait begin
diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc
index df8a6c216a3..ed68e31c755 100644
--- a/sql/threadpool_win.cc
+++ b/sql/threadpool_win.cc
@@ -125,6 +125,12 @@ void TP_pool_win::add(TP_connection *c)
}
}
+void TP_pool_win::resume(TP_connection* c)
+{
+ DBUG_ASSERT(c->state == TP_STATE_RUNNING);
+ SubmitThreadpoolWork(((TP_connection_win*)c)->work);
+}
+
#define CHECK_ALLOC_ERROR(op) \
do \
{ \
@@ -438,3 +444,4 @@ TP_connection *TP_pool_win::new_connection(CONNECT *connect)
}
return c;
}
+
diff --git a/storage/innobase/include/log0log.h b/storage/innobase/include/log0log.h
index 1a79738bc9b..a3085f0f2dd 100644
--- a/storage/innobase/include/log0log.h
+++ b/storage/innobase/include/log0log.h
@@ -103,15 +103,21 @@ bool
log_set_capacity(ulonglong file_size)
MY_ATTRIBUTE((warn_unused_result));
-/** Ensure that the log has been written to the log file up to a given
+/**
+Ensure that the log has been written to the log file up to a given
log entry (such as that of a transaction commit). Start a new write, or
wait and check if an already running write is covering the request.
@param[in] lsn log sequence number that should be
included in the redo log file write
@param[in] flush_to_disk whether the written log should also
be flushed to the file system
-@param[in] rotate_key whether to rotate the encryption key */
-void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key = false);
+@param[in] rotate_key whether to rotate the encryption key
+@param[in] cb completion callback. If not NULL, the callback will be called
+                once the lsn has been written or flushed.
+*/
+struct completion_callback;
+void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key = false,
+ const completion_callback* cb=nullptr);
/** write to the log file up to the last log entry.
@param[in] sync whether we want the written log
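
A short sketch of a caller using the new callback parameter of log_write_up_to(); the
on_durable() function, notify_waiter() helper, and cookie are illustrative assumptions, the
only point being a caller that wants to be notified instead of blocking:

    static void on_durable(void *cookie)
    {
      /* May run immediately in the caller's thread (lsn already durable)
         or later in the group commit leader's thread after the flush. */
      notify_waiter(cookie);                       /* hypothetical */
    }

    void request_durability(lsn_t lsn, void *cookie)
    {
      completion_callback cb{on_durable, cookie};
      /* Returns without waiting; on_durable() fires once lsn is flushed. */
      log_write_up_to(lsn, /* flush_to_disk= */ true,
                      /* rotate_key= */ false, &cb);
    }

trx_flush_log_if_needed_low() in trx0trx.cc below is the real call site, with
thd_decrement_pending_ops() as the callback.
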
diff --git a/storage/innobase/lock/lock0lock.cc b/storage/innobase/lock/lock0lock.cc
index 39b62fcc74f..8a3351dff56 100644
--- a/storage/innobase/lock/lock0lock.cc
+++ b/storage/innobase/lock/lock0lock.cc
@@ -123,6 +123,7 @@ void lock_sys_t::hash_table::resize(ulint n)
}
}
+ aligned_free(array);
array= new_array;
n_cells= new_n_cells;
}
diff --git a/storage/innobase/log/log0log.cc b/storage/innobase/log/log0log.cc
index 95167d6a487..ff81f9099df 100644
--- a/storage/innobase/log/log0log.cc
+++ b/storage/innobase/log/log0log.cc
@@ -771,6 +771,7 @@ bool log_write_lock_own()
}
#endif
+
/** Ensure that the log has been written to the log file up to a given
log entry (such as that of a transaction commit). Start a new write, or
wait and check if an already running write is covering the request.
@@ -779,7 +780,8 @@ included in the redo log file write
@param[in] flush_to_disk whether the written log should also
be flushed to the file system
@param[in] rotate_key whether to rotate the encryption key */
-void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key)
+void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key,
+ const completion_callback *callback)
{
ut_ad(!srv_read_only_mode);
ut_ad(!rotate_key || flush_to_disk);
@@ -788,16 +790,18 @@ void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key)
{
/* Recovery is running and no operations on the log files are
allowed yet (the variable name .._no_ibuf_.. is misleading) */
+ ut_a(!callback);
return;
}
if (flush_to_disk &&
- flush_lock.acquire(lsn) != group_commit_lock::ACQUIRED)
+ flush_lock.acquire(lsn, callback) != group_commit_lock::ACQUIRED)
{
return;
}
- if (write_lock.acquire(lsn) == group_commit_lock::ACQUIRED)
+ if (write_lock.acquire(lsn, flush_to_disk?0:callback) ==
+ group_commit_lock::ACQUIRED)
{
mysql_mutex_lock(&log_sys.mutex);
lsn_t write_lsn= log_sys.get_lsn();
diff --git a/storage/innobase/log/log0sync.cc b/storage/innobase/log/log0sync.cc
index 2a6e1b8b853..30433007639 100644
--- a/storage/innobase/log/log0sync.cc
+++ b/storage/innobase/log/log0sync.cc
@@ -77,6 +77,7 @@ Note that if write operation is very fast, a) or b) can be fine as alternative.
#include <log0types.h>
#include "log0sync.h"
#include <mysql/service_thd_wait.h>
+#include <sql_class.h>
/**
Helper class , used in group commit lock.
@@ -158,10 +159,10 @@ void binary_semaphore::wake()
/* A thread helper structure, used in group commit lock below*/
struct group_commit_waiter_t
{
- lsn_t m_value;
- binary_semaphore m_sema;
- group_commit_waiter_t* m_next;
- group_commit_waiter_t() :m_value(), m_sema(), m_next() {}
+ lsn_t m_value=0;
+ binary_semaphore m_sema{};
+ group_commit_waiter_t* m_next= nullptr;
+ bool m_group_commit_leader=false;
};
group_commit_lock::group_commit_lock() :
@@ -188,7 +189,13 @@ void group_commit_lock::set_pending(group_commit_lock::value_type num)
const unsigned int MAX_SPINS = 1; /** max spins in acquire */
thread_local group_commit_waiter_t thread_local_waiter;
-group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
+static inline void do_completion_callback(const completion_callback* cb)
+{
+ if (cb)
+ cb->m_callback(cb->m_param);
+}
+
+group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num, const completion_callback *callback)
{
unsigned int spins = MAX_SPINS;
@@ -197,6 +204,7 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
if (num <= value())
{
/* No need to wait.*/
+ do_completion_callback(callback);
return lock_return_code::EXPIRED;
}
@@ -212,17 +220,23 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
}
thread_local_waiter.m_value = num;
+ thread_local_waiter.m_group_commit_leader= false;
std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock);
while (num > value())
{
lk.lock();
/* Re-read current value after acquiring the lock*/
- if (num <= value())
+ if (num <= value() &&
+ (!thread_local_waiter.m_group_commit_leader || m_lock))
{
+ lk.unlock();
+ do_completion_callback(callback);
+ thread_local_waiter.m_group_commit_leader=false;
return lock_return_code::EXPIRED;
}
+ thread_local_waiter.m_group_commit_leader= false;
if (!m_lock)
{
/* Take the lock, become group commit leader.*/
@@ -230,9 +244,21 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
#ifndef DBUG_OFF
m_owner_id = std::this_thread::get_id();
#endif
+ if (callback)
+ m_pending_callbacks.push_back({num,*callback});
return lock_return_code::ACQUIRED;
}
+ if (callback && m_waiters_list)
+ {
+ /*
+ We need to have at least one waiter,
+ so it can become the new group commit leader.
+ */
+ m_pending_callbacks.push_back({num, *callback});
+ return lock_return_code::CALLBACK_QUEUED;
+ }
+
/* Add yourself to waiters list.*/
thread_local_waiter.m_next = m_waiters_list;
m_waiters_list = &thread_local_waiter;
@@ -244,11 +270,15 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
thd_wait_end(0);
}
+ do_completion_callback(callback);
return lock_return_code::EXPIRED;
}
void group_commit_lock::release(value_type num)
{
+ completion_callback callbacks[1000];
+ size_t callback_count = 0;
+
std::unique_lock<std::mutex> lk(m_mtx);
m_lock = false;
@@ -262,12 +292,21 @@ void group_commit_lock::release(value_type num)
*/
group_commit_waiter_t* cur, * prev, * next;
group_commit_waiter_t* wakeup_list = nullptr;
- int extra_wake = 0;
+ for (auto& c : m_pending_callbacks)
+ {
+ if (c.first <= num)
+ {
+ if (callback_count < array_elements(callbacks))
+ callbacks[callback_count++] = c.second;
+ else
+ c.second.m_callback(c.second.m_param);
+ }
+ }
for (prev= nullptr, cur= m_waiters_list; cur; cur= next)
{
next= cur->m_next;
- if (cur->m_value <= num || extra_wake++ == 0)
+ if (cur->m_value <= num)
{
/* Move current waiter to wakeup_list*/
@@ -291,8 +330,42 @@ void group_commit_lock::release(value_type num)
prev= cur;
}
}
+
+ auto it= std::remove_if(
+ m_pending_callbacks.begin(), m_pending_callbacks.end(),
+ [num](const pending_cb &c) { return c.first <= num; });
+
+ m_pending_callbacks.erase(it, m_pending_callbacks.end());
+
+ if (m_pending_callbacks.size() || m_waiters_list)
+ {
+ /*
+      Ensure that after this thread releases the lock,
+      there is a new group commit leader.
+      We take it from the waiters list or the wakeup list. It
+      might look like a spurious wake, but in fact we just
+      ensure the waiter does not wait for eternity.
+ */
+ if (m_waiters_list)
+ {
+ /* Move one waiter to wakeup list */
+ auto e= m_waiters_list;
+ m_waiters_list= m_waiters_list->m_next;
+ e->m_next= wakeup_list;
+ e->m_group_commit_leader= true;
+ wakeup_list = e;
+ }
+ else if (wakeup_list)
+ {
+ wakeup_list->m_group_commit_leader=true;
+ }
+ }
+
lk.unlock();
+ for (size_t i = 0; i < callback_count; i++)
+ callbacks[i].m_callback(callbacks[i].m_param);
+
for (cur= wakeup_list; cur; cur= next)
{
next= cur->m_next;
diff --git a/storage/innobase/log/log0sync.h b/storage/innobase/log/log0sync.h
index 40afbf74ecd..ec4ad6eb99b 100644
--- a/storage/innobase/log/log0sync.h
+++ b/storage/innobase/log/log0sync.h
@@ -18,8 +18,14 @@ this program; if not, write to the Free Software Foundation, Inc.,
#include <atomic>
#include <thread>
#include <log0types.h>
+#include <vector>
struct group_commit_waiter_t;
+struct completion_callback
+{
+ void (*m_callback)(void*);
+ void* m_param;
+};
/**
Special synchronization primitive, which is helpful for
@@ -63,14 +69,19 @@ class group_commit_lock
std::atomic<value_type> m_pending_value;
bool m_lock;
group_commit_waiter_t* m_waiters_list;
+
+ typedef std::pair<value_type, completion_callback> pending_cb;
+ std::vector<pending_cb> m_pending_callbacks;
+
public:
group_commit_lock();
enum lock_return_code
{
ACQUIRED,
- EXPIRED
+ EXPIRED,
+ CALLBACK_QUEUED
};
- lock_return_code acquire(value_type num);
+ lock_return_code acquire(value_type num, const completion_callback *cb);
void release(value_type num);
value_type value() const;
value_type pending() const;
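
The extended acquire() now has three outcomes. Below is a condensed sketch of the caller
contract behind log_write_up_to(), with write_redo_up_to() standing in as a hypothetical
helper for the actual write path:

    static void write_with_group_commit(group_commit_lock &write_lock,
                                        lsn_t lsn, const completion_callback *cb)
    {
      switch (write_lock.acquire(lsn, cb))
      {
      case group_commit_lock::ACQUIRED:
        /* This thread is the group commit leader: write on behalf of every
           waiter up to pending(), then release(), which runs the queued
           callbacks and wakes (or promotes) the next leader. */
        write_lock.release(write_redo_up_to(write_lock.pending()));  /* hypothetical */
        break;
      case group_commit_lock::EXPIRED:
        /* lsn was already covered; cb (if any) was invoked inside acquire(). */
        break;
      case group_commit_lock::CALLBACK_QUEUED:
        /* cb was stored; the current or a future leader will invoke it once
           value() reaches lsn. Nothing more to do in this thread. */
        break;
      }
    }
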
diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc
index 2d9a7d43408..413668e0359 100644
--- a/storage/innobase/trx/trx0trx.cc
+++ b/storage/innobase/trx/trx0trx.cc
@@ -1162,35 +1162,49 @@ trx_finalize_for_fts(
trx->fts_trx = NULL;
}
-/**********************************************************************//**
-If required, flushes the log to disk based on the value of
-innodb_flush_log_at_trx_commit. */
-static
-void
-trx_flush_log_if_needed_low(
-/*========================*/
- lsn_t lsn) /*!< in: lsn up to which logs are to be
- flushed. */
+extern "C" MYSQL_THD thd_increment_pending_ops();
+extern "C" void thd_decrement_pending_ops(MYSQL_THD);
+
+
+#include "../log/log0sync.h"
+
+/*
+  If required, initiates a write and optionally a flush of the log
+  to disk.
+ @param[in] lsn - lsn up to which logs are to be flushed.
+ @param[in] trx_state - if trx_state is PREPARED, the function will
+ also wait for the flush to complete.
+*/
+static void trx_flush_log_if_needed_low(lsn_t lsn, trx_state_t trx_state)
{
- bool flush = srv_file_flush_method != SRV_NOSYNC;
+ if (!srv_flush_log_at_trx_commit)
+ return;
- switch (srv_flush_log_at_trx_commit) {
- case 3:
- case 2:
- /* Write the log but do not flush it to disk */
- flush = false;
- /* fall through */
- case 1:
- /* Write the log and optionally flush it to disk */
- log_write_up_to(lsn, flush);
- srv_inc_activity_count();
- return;
- case 0:
- /* Do nothing */
- return;
- }
+ if (log_sys.get_flushed_lsn() > lsn)
+ return;
- ut_error;
+ bool flush= srv_file_flush_method != SRV_NOSYNC &&
+ srv_flush_log_at_trx_commit == 1;
+
+ if (trx_state == TRX_STATE_PREPARED)
+ {
+ /* XA, which is used with binlog as well.
+ Be conservative, use synchronous wait.*/
+ log_write_up_to(lsn, flush);
+ return;
+ }
+
+ completion_callback cb;
+ if ((cb.m_param = thd_increment_pending_ops()))
+ {
+ cb.m_callback = (void (*)(void *)) thd_decrement_pending_ops;
+ log_write_up_to(lsn, flush, false, &cb);
+ }
+ else
+ {
+ /* No THD, synchronous write */
+ log_write_up_to(lsn, flush);
+ }
}
/**********************************************************************//**
@@ -1205,51 +1219,53 @@ trx_flush_log_if_needed(
trx_t* trx) /*!< in/out: transaction */
{
trx->op_info = "flushing log";
- trx_flush_log_if_needed_low(lsn);
+ trx_flush_log_if_needed_low(lsn,trx->state);
trx->op_info = "";
}
/** Process tables that were modified by the committing transaction. */
inline void trx_t::commit_tables()
{
- if (!undo_no || mod_tables.empty())
+ if (mod_tables.empty())
return;
+ if (undo_no)
+ {
#if defined SAFE_MUTEX && defined UNIV_DEBUG
- const bool preserve_tables= !innodb_evict_tables_on_commit_debug ||
- is_recovered || /* avoid trouble with XA recovery */
+ const bool preserve_tables= !innodb_evict_tables_on_commit_debug ||
+ is_recovered || /* avoid trouble with XA recovery */
# if 1 /* if dict_stats_exec_sql() were not playing dirty tricks */
- dict_sys.mutex_is_locked();
+ dict_sys.mutex_is_locked();
# else /* this would be more proper way to do it */
- dict_operation_lock_mode || dict_operation;
+ dict_operation_lock_mode || dict_operation;
# endif
#endif
- const trx_id_t max_trx_id= trx_sys.get_max_trx_id();
- const auto now= start_time;
+ const trx_id_t max_trx_id= trx_sys.get_max_trx_id();
+ const auto now= start_time;
- for (const auto& p : mod_tables)
- {
- dict_table_t *table= p.first;
- table->update_time= now;
- table->query_cache_inv_trx_id= max_trx_id;
+ for (const auto& p : mod_tables)
+ {
+ dict_table_t *table= p.first;
+ table->update_time= now;
+ table->query_cache_inv_trx_id= max_trx_id;
#if defined SAFE_MUTEX && defined UNIV_DEBUG
- if (preserve_tables || table->get_ref_count() || table->is_temporary() ||
- UT_LIST_GET_LEN(table->locks))
- /* do not evict when committing DDL operations or if some other
- transaction is holding the table handle */
- continue;
- /* recheck while holding the mutex that blocks
- table->acquire() */
- dict_sys.mutex_lock();
- {
- LockMutexGuard g{SRW_LOCK_CALL};
- if (!table->get_ref_count() && !UT_LIST_GET_LEN(table->locks))
- dict_sys.remove(table, true);
- }
- dict_sys.mutex_unlock();
+ if (preserve_tables || table->get_ref_count() || table->is_temporary() ||
+ UT_LIST_GET_LEN(table->locks))
+ /* do not evict when committing DDL operations or if some other
+ transaction is holding the table handle */
+ continue;
+ /* recheck while holding the mutex that blocks table->acquire() */
+ dict_sys.mutex_lock();
+ {
+ LockMutexGuard g{SRW_LOCK_CALL};
+ if (!table->get_ref_count() && !UT_LIST_GET_LEN(table->locks))
+ dict_sys.remove(table, true);
+ }
+ dict_sys.mutex_unlock();
#endif
+ }
}
mod_tables.clear();