author     Vladislav Vaintroub <wlad@mariadb.com>  2020-12-11 02:29:46 +0100
committer  Vladislav Vaintroub <wlad@mariadb.com>  2021-01-26 21:13:28 +0100
commit     c4d15d677a981cec07d3313ae7a444a08f36dfbb (patch)
tree       1511ba3c830b84882f1433d8e42ffd20e8af7038
parent     3f871b339429441ad907ecf7dfabdc414797e664 (diff)
download   mariadb-git-bb-10.6-MDEV-24341.tar.gz
MDEV-24341 reduce or eliminate waits for Innodb redo log group commit in foreground threads  (bb-10.6-MDEV-24341)
-rw-r--r--  sql/net_serv.cc                       16
-rw-r--r--  sql/scheduler.h                        2
-rw-r--r--  sql/sql_class.cc                      53
-rw-r--r--  sql/sql_class.h                      147
-rw-r--r--  sql/sql_parse.cc                      81
-rw-r--r--  sql/sql_parse.h                       13
-rw-r--r--  sql/threadpool.h                       3
-rw-r--r--  sql/threadpool_common.cc              67
-rw-r--r--  sql/threadpool_generic.cc              5
-rw-r--r--  sql/threadpool_win.cc                  7
-rw-r--r--  storage/innobase/include/log0log.h    12
-rw-r--r--  storage/innobase/log/log0log.cc       10
-rw-r--r--  storage/innobase/log/log0sync.cc      90
-rw-r--r--  storage/innobase/log/log0sync.h       13
-rw-r--r--  storage/innobase/trx/trx0trx.cc       69
15 files changed, 512 insertions, 76 deletions
diff --git a/sql/net_serv.cc b/sql/net_serv.cc
index a96c43a94fe..409d3cac85e 100644
--- a/sql/net_serv.cc
+++ b/sql/net_serv.cc
@@ -640,8 +640,20 @@ net_real_write(NET *net,const uchar *packet, size_t len)
my_bool net_blocking = vio_is_blocking(net->vio);
DBUG_ENTER("net_real_write");
-#if defined(MYSQL_SERVER) && defined(USE_QUERY_CACHE)
- query_cache_insert(net->thd, (char*) packet, len, net->pkt_nr);
+#if defined(MYSQL_SERVER)
+ THD *thd= (THD *)net->thd;
+#if defined(USE_QUERY_CACHE)
+ query_cache_insert(thd, (char*) packet, len, net->pkt_nr);
+#endif
+ if (likely(thd))
+ {
+ /*
+      Wait until pending operations (currently, the engine's
+      asynchronous group commit) have finished before replying
+      to the client, to keep the durability promise.
+ */
+ thd->async_state.wait_for_pending_ops();
+ }
#endif
if (unlikely(net->error == 2))
diff --git a/sql/scheduler.h b/sql/scheduler.h
index ebf8d6e9e64..68387390d81 100644
--- a/sql/scheduler.h
+++ b/sql/scheduler.h
@@ -40,6 +40,8 @@ struct scheduler_functions
void (*thd_wait_end)(THD *thd);
void (*post_kill_notification)(THD *thd);
void (*end)(void);
+ /** resume previous unfinished command (threadpool only)*/
+ void (*thd_resume)(THD* thd);
};
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 3b8414f0c71..5b1d21bb44b 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -682,7 +682,8 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
m_stmt_da(&main_da),
tdc_hash_pins(0),
xid_hash_pins(0),
- m_tmp_tables_locked(false)
+ m_tmp_tables_locked(false),
+ async_state()
#ifdef HAVE_REPLICATION
,
current_linfo(0),
@@ -4947,6 +4948,56 @@ void reset_thd(MYSQL_THD thd)
free_root(thd->mem_root, MYF(MY_KEEP_PREALLOC));
}
+/**
+  This function can be used by a storage engine
+  to indicate the start of an asynchronous operation.
+
+  Such an asynchronous operation needs to be finished
+  before the response is written to the client.
+
+  An example of such an operation is InnoDB's asynchronous
+  group commit. The server needs to wait for its completion
+  before writing the response to the client, to provide the
+  durability guarantee; in other words, the server cannot send
+  the OK packet before the modified data is durable in the redo log.
+*/
+extern "C" MYSQL_THD thd_increment_pending_ops(void)
+{
+ THD *thd = current_thd;
+ if (!thd)
+ return NULL;
+ thd->async_state.inc_pending_ops();
+ return thd;
+}
+
+/**
+  This function can be used by a plugin/engine to indicate the
+  end of an asynchronous operation (such as the end of a group
+  commit write/flush).
+
+ @param thd THD
+*/
+extern "C" void thd_decrement_pending_ops(MYSQL_THD thd)
+{
+ DBUG_ASSERT(thd);
+ thd_async_state::enum_async_state state;
+ if (thd->async_state.dec_pending_ops(&state) == 0)
+ {
+ switch(state)
+ {
+ case thd_async_state::enum_async_state::SUSPENDED:
+ DBUG_ASSERT(thd->scheduler->thd_resume);
+ thd->scheduler->thd_resume(thd);
+ break;
+ case thd_async_state::enum_async_state::NONE:
+ break;
+ default:
+ DBUG_ASSERT(0);
+ }
+ }
+}
+
+
unsigned long long thd_get_query_id(const MYSQL_THD thd)
{
return((unsigned long long)thd->query_id);
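The two functions above are the engine-facing half of the mechanism. Below is a minimal usage sketch, not part of the patch: my_async_flush() and my_sync_flush() are hypothetical engine routines, and the callback cast mirrors what trx0trx.cc does later in this commit.

extern "C" MYSQL_THD thd_increment_pending_ops(void);
extern "C" void thd_decrement_pending_ops(MYSQL_THD);

/* Hypothetical engine-side commit path using the new server API. */
static void engine_commit_durably(unsigned long long lsn)
{
  if (MYSQL_THD thd= thd_increment_pending_ops())
  {
    /*
      A client THD exists: queue the flush and return at once.
      The engine must call thd_decrement_pending_ops(thd) from the thread
      that completes the flush; that releases the client response (or
      resumes a suspended threadpool THD).
    */
    my_async_flush(lsn, (void (*)(void *)) thd_decrement_pending_ops, thd); /* hypothetical */
  }
  else
  {
    /* Background thread, no THD: fall back to a synchronous flush. */
    my_sync_flush(lsn); /* hypothetical */
  }
}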
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 4532c69e8e1..fb365dbf144 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -2308,6 +2308,152 @@ struct THD_count
~THD_count() { thread_count--; }
};
+/**
+ Support structure for asynchronous group commit
+  (or, generally, any asynchronous operation that needs to finish
+  before the server writes the response to the client)
+*/
+struct thd_async_state
+{
+ enum class enum_async_state
+ {
+ NONE,
+ SUSPENDED, /* do_command() did not finish, and needs to be resumed */
+ RESUMED /* do_command() is resumed*/
+ };
+ enum_async_state m_state{enum_async_state::NONE};
+
+  /* State needed to resume do_command() where it left off last time */
+ enum enum_server_command m_command{COM_SLEEP};
+ LEX_STRING m_packet{};
+
+ mysql_mutex_t m_mtx;
+ mysql_cond_t m_cond;
+
+ /** Pending counter*/
+ Atomic_counter<int> m_pending_ops=0;
+
+#ifndef DBUG_OFF
+ /* Checks */
+ pthread_t m_dbg_thread;
+#endif
+
+ thd_async_state()
+ {
+ mysql_mutex_init(PSI_NOT_INSTRUMENTED, &m_mtx, 0);
+ mysql_cond_init(PSI_INSTRUMENT_ME, &m_cond, 0);
+ }
+
+ /*
+    Currently only used with the threadpool: a THD can be "suspended" and "resumed".
+    Suspending only means leaving do_command() early, after saving some state.
+    Resuming continues the suspended THD's do_command() from where it left off.
+ */
+ bool try_suspend()
+ {
+ bool ret;
+ mysql_mutex_lock(&m_mtx);
+ DBUG_ASSERT(m_state == enum_async_state::NONE);
+ DBUG_ASSERT(m_pending_ops >= 0);
+
+ if(m_pending_ops)
+ {
+ ret=true;
+ m_state= enum_async_state::SUSPENDED;
+ }
+ else
+ {
+ /*
+        If there are no pending operations, we cannot suspend,
+        since nobody would resume us.
+ */
+ ret=false;
+ }
+ mysql_mutex_unlock(&m_mtx);
+ return ret;
+ }
+
+ ~thd_async_state()
+ {
+ wait_for_pending_ops();
+ mysql_mutex_destroy(&m_mtx);
+ mysql_cond_destroy(&m_cond);
+ }
+
+ /*
+    Increment the count of pending asynchronous operations.
+    The client response may not be written while this count is > 0.
+    So, without the threadpool, the query needs to wait for the
+    operations to finish.
+    With the threadpool, the THD can be suspended, and resumed
+    when this counter drops to 0.
+ */
+ void inc_pending_ops()
+ {
+ mysql_mutex_lock(&m_mtx);
+
+#ifndef DBUG_OFF
+ /*
+ Check that increments are always done by the same thread.
+ */
+ if (!m_pending_ops)
+ m_dbg_thread= pthread_self();
+ else
+ DBUG_ASSERT(pthread_equal(pthread_self(),m_dbg_thread));
+#endif
+
+ m_pending_ops++;
+ mysql_mutex_unlock(&m_mtx);
+ }
+
+ int dec_pending_ops(enum_async_state* state)
+ {
+ int ret;
+ mysql_mutex_lock(&m_mtx);
+ ret= --m_pending_ops;
+ if (!ret)
+ mysql_cond_signal(&m_cond);
+ *state = m_state;
+ mysql_mutex_unlock(&m_mtx);
+ return ret;
+ }
+
+ /*
+    This is used for a "dirty" read of the pending-ops counter,
+    in places where a dirty read is acceptable.
+ */
+ int pending_ops()
+ {
+ return m_pending_ops;
+ }
+
+ /* Wait for pending operations to finish.*/
+ void wait_for_pending_ops()
+ {
+ /*
+ It is fine to read m_pending_ops and compare it with 0,
+ without mutex protection.
+
+      The value is only incremented by the current thread, and will
+      be decremented by another one, so a "dirty" read may show a
+      positive number when the real value is already 0. That is not a
+      problem; the only cost is rechecking under the mutex.
+ */
+ if (!pending_ops())
+ return;
+
+ mysql_mutex_lock(&m_mtx);
+ DBUG_ASSERT(m_pending_ops >= 0);
+ while (m_pending_ops)
+ mysql_cond_wait(&m_cond, &m_mtx);
+ mysql_mutex_unlock(&m_mtx);
+ }
+};
+
+extern "C" MYSQL_THD thd_increment_pending_ops(void);
+extern "C" void thd_decrement_pending_ops(MYSQL_THD);
+
/**
@class THD
@@ -5024,6 +5170,7 @@ private:
}
public:
+ thd_async_state async_state;
#ifdef HAVE_REPLICATION
/*
If we do a purge of binary logs, log index info of the threads
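As a rough illustration of how the two schedulers are expected to drive thd_async_state (the real call sites in this patch are net_real_write() and tp_callback()), consider the sketch below; finish_command() is a hypothetical wrapper introduced only for this example.

/* Hypothetical wrapper: how each scheduler deals with pending async ops. */
static void finish_command(THD *thd, bool threadpool)
{
  if (!threadpool)
  {
    /* one-thread-per-connection: block here until the engine has called
       thd_decrement_pending_ops(); only then may the OK packet be sent */
    thd->async_state.wait_for_pending_ops();
    return;
  }
  if (thd->async_state.try_suspend())
    return; /* worker returns to the pool; scheduler->thd_resume() runs later */

  /* The last pending op finished in the meantime, so nobody will resume us.
     Continue inline, as tp_callback() does, by marking the THD RESUMED. */
  thd->async_state.m_state= thd_async_state::enum_async_state::RESUMED;
}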
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index d976e75843a..6c6068945aa 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -1168,25 +1168,50 @@ static enum enum_server_command fetch_command(THD *thd, char *packet)
/**
Read one command from connection and execute it (query or simple command).
- This function is called in loop from thread function.
+  This function is to be used by the different schedulers (one-thread-per-connection,
+  pool-of-threads).
For profiling to work, it must never be called recursively.
+ @param thd - client connection context
+
+  @param blocking - whether to wait for the command to finish.
+  If true, waits for outstanding operations (e.g. group commit) to finish
+  before returning. Otherwise, it may return DISPATCH_COMMAND_WOULDBLOCK;
+  in that case, another do_command() call is needed to finish the current
+  command.
+
+ @retval
+ DISPATCH_COMMAND_SUCCESS - success
@retval
- 0 success
+  DISPATCH_COMMAND_ERROR - request of THD shutdown (see dispatch_command() description)
@retval
- 1 request of thread shutdown (see dispatch_command() description)
+  DISPATCH_COMMAND_WOULDBLOCK - need to wait for asynchronous operations
+  to finish; only returned if blocking=false.
*/
-bool do_command(THD *thd)
+dispatch_command_return do_command(THD *thd, bool blocking)
{
- bool return_value;
+ dispatch_command_return return_value;
char *packet= 0;
ulong packet_length;
NET *net= &thd->net;
enum enum_server_command command;
DBUG_ENTER("do_command");
+ DBUG_ASSERT(!thd->async_state.pending_ops());
+ if (thd->async_state.m_state == thd_async_state::enum_async_state::RESUMED)
+ {
+ /*
+      Resuming a previously suspended command.
+      Restore the saved state.
+ */
+ command = thd->async_state.m_command;
+ packet = thd->async_state.m_packet.str;
+ packet_length = (ulong)thd->async_state.m_packet.length;
+ goto resume;
+ }
+
/*
indicator of uninitialized lex => normal flow of errors handling
(see my_message_sql)
@@ -1253,12 +1278,12 @@ bool do_command(THD *thd)
if (net->error != 3)
{
- return_value= TRUE; // We have to close it.
+ return_value= DISPATCH_COMMAND_ERROR; // We have to close it.
goto out;
}
net->error= 0;
- return_value= FALSE;
+ return_value= DISPATCH_COMMAND_SUCCESS;
goto out;
}
@@ -1325,7 +1350,7 @@ bool do_command(THD *thd)
MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
thd->m_statement_psi= NULL;
thd->m_digest= NULL;
- return_value= FALSE;
+ return_value= DISPATCH_COMMAND_SUCCESS;
wsrep_after_command_before_result(thd);
goto out;
@@ -1351,7 +1376,7 @@ bool do_command(THD *thd)
thd->m_statement_psi= NULL;
thd->m_digest= NULL;
- return_value= FALSE;
+ return_value= DISPATCH_COMMAND_SUCCESS;
wsrep_after_command_before_result(thd);
goto out;
}
@@ -1362,8 +1387,18 @@ bool do_command(THD *thd)
DBUG_ASSERT(packet_length);
DBUG_ASSERT(!thd->apc_target.is_enabled());
+
+resume:
return_value= dispatch_command(command, thd, packet+1,
- (uint) (packet_length-1));
+ (uint) (packet_length-1), blocking);
+ if (return_value == DISPATCH_COMMAND_WOULDBLOCK)
+ {
+ /* Save current state, and resume later.*/
+ thd->async_state.m_command= command;
+ thd->async_state.m_packet={packet,packet_length};
+ DBUG_RETURN(return_value);
+ }
+
DBUG_ASSERT(!thd->apc_target.is_enabled());
out:
@@ -1522,8 +1557,8 @@ public:
1 request of thread shutdown, i. e. if command is
COM_QUIT/COM_SHUTDOWN
*/
-bool dispatch_command(enum enum_server_command command, THD *thd,
- char* packet, uint packet_length)
+dispatch_command_return dispatch_command(enum enum_server_command command, THD *thd,
+ char* packet, uint packet_length, bool blocking)
{
NET *net= &thd->net;
bool error= 0;
@@ -1535,6 +1570,12 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
"<?>")));
bool drop_more_results= 0;
+ if (thd->async_state.m_state == thd_async_state::enum_async_state::RESUMED)
+ {
+ thd->async_state.m_state = thd_async_state::enum_async_state::NONE;
+ goto resume;
+ }
+
/* keep it withing 1 byte */
compile_time_assert(COM_END == 255);
@@ -2265,6 +2306,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
general_log_print(thd, command, NullS);
my_eof(thd);
break;
+
case COM_SLEEP:
case COM_CONNECT: // Impossible here
case COM_TIME: // Impossible from client
@@ -2278,7 +2320,18 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
}
dispatch_end:
- do_end_of_statement= true;
+ /*
+    For the threadpool, i.e. a non-blocking call: if not all async operations
+    have finished, return without cleanup. The cleanup will be done
+    later, when command execution is resumed.
+ */
+ if (!blocking && !error && thd->async_state.pending_ops())
+ {
+ DBUG_RETURN(DISPATCH_COMMAND_WOULDBLOCK);
+ }
+
+resume:
+
#ifdef WITH_WSREP
/*
Next test should really be WSREP(thd), but that causes a failure when doing
@@ -2382,7 +2435,7 @@ dispatch_end:
/* Check that some variables are reset properly */
DBUG_ASSERT(thd->abort_on_warning == 0);
thd->lex->restore_set_statement_var();
- DBUG_RETURN(error);
+ DBUG_RETURN(error?DISPATCH_COMMAND_ERROR: DISPATCH_COMMAND_SUCCESS);
}
static bool slow_filter_masked(THD *thd, ulonglong mask)
diff --git a/sql/sql_parse.h b/sql/sql_parse.h
index 37861cf224f..793081b2930 100644
--- a/sql/sql_parse.h
+++ b/sql/sql_parse.h
@@ -100,9 +100,16 @@ bool multi_delete_set_locks_and_link_aux_tables(LEX *lex);
void create_table_set_open_action_and_adjust_tables(LEX *lex);
int bootstrap(MYSQL_FILE *file);
int mysql_execute_command(THD *thd);
-bool do_command(THD *thd);
-bool dispatch_command(enum enum_server_command command, THD *thd,
- char* packet, uint packet_length);
+enum dispatch_command_return
+{
+ DISPATCH_COMMAND_SUCCESS=0,
+ DISPATCH_COMMAND_ERROR= 1,
+ DISPATCH_COMMAND_WOULDBLOCK= 2
+};
+
+dispatch_command_return do_command(THD *thd, bool blocking = true);
+dispatch_command_return dispatch_command(enum enum_server_command command, THD *thd,
+ char* packet, uint packet_length, bool blocking = true);
void log_slow_statement(THD *thd);
bool append_file_to_dir(THD *thd, const char **filename_ptr,
const LEX_CSTRING *table_name);
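For illustration, a caller of the new non-blocking contract could look like the sketch below; serve_connection() is hypothetical, and the threadpool's threadpool_process_request() in this commit follows the same pattern.

/* Hypothetical driver loop over the new dispatch_command_return codes. */
static void serve_connection(THD *thd)
{
  for (;;)
  {
    switch (do_command(thd, /* blocking= */ false))
    {
    case DISPATCH_COMMAND_SUCCESS:
      break;    /* command finished; read the next one */
    case DISPATCH_COMMAND_WOULDBLOCK:
      return;   /* async ops still pending; do_command() must be called
                   again (with state RESUMED) once they complete */
    case DISPATCH_COMMAND_ERROR:
      return;   /* COM_QUIT or a fatal error; close the connection */
    }
  }
}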
diff --git a/sql/threadpool.h b/sql/threadpool.h
index 11132b3bf95..7737d056b4a 100644
--- a/sql/threadpool.h
+++ b/sql/threadpool.h
@@ -133,6 +133,7 @@ struct TP_pool
virtual int set_stall_limit(uint){ return 0; }
virtual int get_thread_count() { return tp_stats.num_worker_threads; }
virtual int get_idle_thread_count(){ return 0; }
+ virtual void resume(TP_connection* c)=0;
};
#ifdef _WIN32
@@ -146,6 +147,7 @@ struct TP_pool_win:TP_pool
virtual void add(TP_connection *);
virtual int set_max_threads(uint);
virtual int set_min_threads(uint);
+ void resume(TP_connection *c);
};
#endif
@@ -159,6 +161,7 @@ struct TP_pool_generic :TP_pool
virtual int set_pool_size(uint);
virtual int set_stall_limit(uint);
virtual int get_idle_thread_count();
+ void resume(TP_connection* c);
};
#endif /* HAVE_POOL_OF_THREADS */
diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc
index 168579d984b..f0a0b416b9c 100644
--- a/sql/threadpool_common.cc
+++ b/sql/threadpool_common.cc
@@ -23,6 +23,8 @@
#include <sql_audit.h>
#include <debug_sync.h>
#include <threadpool.h>
+#include <sql_class.h>
+#include <sql_parse.h>
#ifdef WITH_WSREP
#include "wsrep_trans_observer.h"
@@ -51,7 +53,7 @@ TP_STATISTICS tp_stats;
static void threadpool_remove_connection(THD *thd);
-static int threadpool_process_request(THD *thd);
+static dispatch_command_return threadpool_process_request(THD *thd);
static THD* threadpool_add_connection(CONNECT *connect, TP_connection *c);
extern bool do_command(THD*);
@@ -195,10 +197,30 @@ void tp_callback(TP_connection *c)
}
c->connect= 0;
}
- else if (threadpool_process_request(thd))
+ else
{
- /* QUIT or an error occurred. */
- goto error;
+retry:
+ switch(threadpool_process_request(thd))
+ {
+ case DISPATCH_COMMAND_WOULDBLOCK:
+ if (!thd->async_state.try_suspend())
+ {
+ /*
+          All async operations finished meanwhile, thus nobody will wake up
+          this THD. Therefore, we'll resume it "manually" here.
+ */
+ thd->async_state.m_state = thd_async_state::enum_async_state::RESUMED;
+ goto retry;
+ }
+ worker_context.restore();
+ return;
+ case DISPATCH_COMMAND_ERROR:
+ /* QUIT or an error occurred. */
+ goto error;
+ case DISPATCH_COMMAND_SUCCESS:
+ break;
+ }
+ thd->async_state.m_state= thd_async_state::enum_async_state::NONE;
}
/* Set priority */
@@ -331,10 +353,13 @@ static bool has_unread_data(THD* thd)
/**
Process a single client request or a single batch.
*/
-static int threadpool_process_request(THD *thd)
+static dispatch_command_return threadpool_process_request(THD *thd)
{
- int retval= 0;
+ dispatch_command_return retval= DISPATCH_COMMAND_SUCCESS;
+
thread_attach(thd);
+ if(thd->async_state.m_state == thd_async_state::enum_async_state::RESUMED)
+ goto resume;
if (thd->killed >= KILL_CONNECTION)
{
@@ -342,7 +367,7 @@ static int threadpool_process_request(THD *thd)
killed flag was set by timeout handler
or KILL command. Return error.
*/
- retval= 1;
+ retval= DISPATCH_COMMAND_ERROR;
if(thd->killed == KILL_WAIT_TIMEOUT)
handle_wait_timeout(thd);
goto end;
@@ -365,19 +390,27 @@ static int threadpool_process_request(THD *thd)
if (mysql_audit_release_required(thd))
mysql_audit_release(thd);
- if ((retval= do_command(thd)) != 0)
- goto end;
+resume:
+ retval= do_command(thd, false);
+ switch(retval)
+ {
+ case DISPATCH_COMMAND_WOULDBLOCK:
+ case DISPATCH_COMMAND_ERROR:
+ goto end;
+ case DISPATCH_COMMAND_SUCCESS:
+ break;
+ }
if (!thd_is_connection_alive(thd))
{
- retval= 1;
+ retval=DISPATCH_COMMAND_ERROR;
goto end;
}
set_thd_idle(thd);
if (!has_unread_data(thd))
- {
+ {
/* More info on this debug sync is in sql_parse.cc*/
DEBUG_SYNC(thd, "before_do_command_net_read");
goto end;
@@ -527,6 +560,15 @@ static void tp_post_kill_notification(THD *thd)
post_kill_notification(thd);
}
+/* Resume previously suspended THD */
+static void tp_resume(THD* thd)
+{
+ DBUG_ASSERT(thd->async_state.m_state == thd_async_state::enum_async_state::SUSPENDED);
+ thd->async_state.m_state = thd_async_state::enum_async_state::RESUMED;
+ TP_connection* c = get_TP_connection(thd);
+ pool->resume(c);
+}
+
static scheduler_functions tp_scheduler_functions=
{
0, // max_threads
@@ -537,7 +579,8 @@ static scheduler_functions tp_scheduler_functions=
tp_wait_begin, // thd_wait_begin
tp_wait_end, // thd_wait_end
tp_post_kill_notification, // post kill notification
- tp_end // end
+ tp_end, // end
+ tp_resume
};
void pool_of_threads_scheduler(struct scheduler_functions *func,
diff --git a/sql/threadpool_generic.cc b/sql/threadpool_generic.cc
index 709bb95eb98..19193be0354 100644
--- a/sql/threadpool_generic.cc
+++ b/sql/threadpool_generic.cc
@@ -1327,7 +1327,10 @@ void TP_pool_generic::add(TP_connection *c)
DBUG_VOID_RETURN;
}
-
+void TP_pool_generic::resume(TP_connection* c)
+{
+ add(c);
+}
/**
MySQL scheduler callback: wait begin
diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc
index df8a6c216a3..ed68e31c755 100644
--- a/sql/threadpool_win.cc
+++ b/sql/threadpool_win.cc
@@ -125,6 +125,12 @@ void TP_pool_win::add(TP_connection *c)
}
}
+void TP_pool_win::resume(TP_connection* c)
+{
+ DBUG_ASSERT(c->state == TP_STATE_RUNNING);
+ SubmitThreadpoolWork(((TP_connection_win*)c)->work);
+}
+
#define CHECK_ALLOC_ERROR(op) \
do \
{ \
@@ -438,3 +444,4 @@ TP_connection *TP_pool_win::new_connection(CONNECT *connect)
}
return c;
}
+
diff --git a/storage/innobase/include/log0log.h b/storage/innobase/include/log0log.h
index 1a79738bc9b..a3085f0f2dd 100644
--- a/storage/innobase/include/log0log.h
+++ b/storage/innobase/include/log0log.h
@@ -103,15 +103,21 @@ bool
log_set_capacity(ulonglong file_size)
MY_ATTRIBUTE((warn_unused_result));
-/** Ensure that the log has been written to the log file up to a given
+/**
+Ensure that the log has been written to the log file up to a given
log entry (such as that of a transaction commit). Start a new write, or
wait and check if an already running write is covering the request.
@param[in] lsn log sequence number that should be
included in the redo log file write
@param[in] flush_to_disk whether the written log should also
be flushed to the file system
-@param[in] rotate_key whether to rotate the encryption key */
-void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key = false);
+@param[in] rotate_key whether to rotate the encryption key
+@param[in]  cb  completion callback. If not NULL, the callback will be
+  called once the lsn has been written or flushed.
+*/
+struct completion_callback;
+void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key = false,
+ const completion_callback* cb=nullptr);
/** write to the log file up to the last log entry.
@param[in] sync whether we want the written log
diff --git a/storage/innobase/log/log0log.cc b/storage/innobase/log/log0log.cc
index 44eaecb7a29..19a2ef7f9eb 100644
--- a/storage/innobase/log/log0log.cc
+++ b/storage/innobase/log/log0log.cc
@@ -771,6 +771,7 @@ bool log_write_lock_own()
}
#endif
+
/** Ensure that the log has been written to the log file up to a given
log entry (such as that of a transaction commit). Start a new write, or
wait and check if an already running write is covering the request.
@@ -779,7 +780,8 @@ included in the redo log file write
@param[in] flush_to_disk whether the written log should also
be flushed to the file system
@param[in] rotate_key whether to rotate the encryption key */
-void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key)
+void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key,
+ const completion_callback *callback)
{
ut_ad(!srv_read_only_mode);
ut_ad(!rotate_key || flush_to_disk);
@@ -788,16 +790,18 @@ void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key)
{
/* Recovery is running and no operations on the log files are
allowed yet (the variable name .._no_ibuf_.. is misleading) */
+ ut_a(!callback);
return;
}
if (flush_to_disk &&
- flush_lock.acquire(lsn) != group_commit_lock::ACQUIRED)
+ flush_lock.acquire(lsn, callback) != group_commit_lock::ACQUIRED)
{
return;
}
- if (write_lock.acquire(lsn) == group_commit_lock::ACQUIRED)
+ if (write_lock.acquire(lsn, flush_to_disk?0:callback) ==
+ group_commit_lock::ACQUIRED)
{
mysql_mutex_lock(&log_sys.mutex);
lsn_t write_lsn= log_sys.get_lsn();
diff --git a/storage/innobase/log/log0sync.cc b/storage/innobase/log/log0sync.cc
index 2a6e1b8b853..1c6011a69ff 100644
--- a/storage/innobase/log/log0sync.cc
+++ b/storage/innobase/log/log0sync.cc
@@ -77,6 +77,7 @@ Note that if write operation is very fast, a) or b) can be fine as alternative.
#include <log0types.h>
#include "log0sync.h"
#include <mysql/service_thd_wait.h>
+#include <sql_class.h>
/**
Helper class , used in group commit lock.
@@ -158,10 +159,10 @@ void binary_semaphore::wake()
/* A thread helper structure, used in group commit lock below*/
struct group_commit_waiter_t
{
- lsn_t m_value;
- binary_semaphore m_sema;
- group_commit_waiter_t* m_next;
- group_commit_waiter_t() :m_value(), m_sema(), m_next() {}
+ lsn_t m_value=0;
+ binary_semaphore m_sema{};
+ group_commit_waiter_t* m_next{};
+ bool m_group_commit_leader=false;
};
group_commit_lock::group_commit_lock() :
@@ -188,7 +189,13 @@ void group_commit_lock::set_pending(group_commit_lock::value_type num)
const unsigned int MAX_SPINS = 1; /** max spins in acquire */
thread_local group_commit_waiter_t thread_local_waiter;
-group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
+static inline void do_completion_callback(const completion_callback* cb)
+{
+ if (cb)
+ cb->m_callback(cb->m_param);
+}
+
+group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num, const completion_callback *callback)
{
unsigned int spins = MAX_SPINS;
@@ -197,6 +204,7 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
if (num <= value())
{
/* No need to wait.*/
+ do_completion_callback(callback);
return lock_return_code::EXPIRED;
}
@@ -212,17 +220,23 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
}
thread_local_waiter.m_value = num;
+ thread_local_waiter.m_group_commit_leader= false;
std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock);
while (num > value())
{
lk.lock();
/* Re-read current value after acquiring the lock*/
- if (num <= value())
+ if (num <= value() &&
+ (!thread_local_waiter.m_group_commit_leader || m_lock))
{
+ lk.unlock();
+ do_completion_callback(callback);
+ thread_local_waiter.m_group_commit_leader=false;
return lock_return_code::EXPIRED;
}
+ thread_local_waiter.m_group_commit_leader= false;
if (!m_lock)
{
/* Take the lock, become group commit leader.*/
@@ -230,9 +244,21 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
#ifndef DBUG_OFF
m_owner_id = std::this_thread::get_id();
#endif
+ if (callback)
+ m_pending_callbacks.push_back({num,*callback});
return lock_return_code::ACQUIRED;
}
+ if (callback && m_waiters_list)
+ {
+ /*
+ We need to have at least one waiter,
+ so it can become the new group commit leader.
+ */
+ m_pending_callbacks.push_back({num, *callback});
+ return lock_return_code::CALLBACK_QUEUED;
+ }
+
/* Add yourself to waiters list.*/
thread_local_waiter.m_next = m_waiters_list;
m_waiters_list = &thread_local_waiter;
@@ -244,11 +270,15 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
thd_wait_end(0);
}
+ do_completion_callback(callback);
return lock_return_code::EXPIRED;
}
void group_commit_lock::release(value_type num)
{
+ completion_callback callbacks[1000];
+ size_t callback_count = 0;
+
std::unique_lock<std::mutex> lk(m_mtx);
m_lock = false;
@@ -262,12 +292,21 @@ void group_commit_lock::release(value_type num)
*/
group_commit_waiter_t* cur, * prev, * next;
group_commit_waiter_t* wakeup_list = nullptr;
- int extra_wake = 0;
+ for (auto& c : m_pending_callbacks)
+ {
+ if (c.first <= num)
+ {
+ if (callback_count < array_elements(callbacks))
+ callbacks[callback_count++] = c.second;
+ else
+ c.second.m_callback(c.second.m_param);
+ }
+ }
for (prev= nullptr, cur= m_waiters_list; cur; cur= next)
{
next= cur->m_next;
- if (cur->m_value <= num || extra_wake++ == 0)
+ if (cur->m_value <= num)
{
/* Move current waiter to wakeup_list*/
@@ -291,8 +330,43 @@ void group_commit_lock::release(value_type num)
prev= cur;
}
}
+
+ auto it= std::remove_if(
+ m_pending_callbacks.begin(), m_pending_callbacks.end(),
+ [num](const std::pair<value_type,completion_callback> &c) { return c.first <= num; });
+
+ m_pending_callbacks.erase(it, m_pending_callbacks.end());
+
+ if (m_pending_callbacks.size() || m_waiters_list)
+ {
+ /*
+      Ensure that, after this thread releases the lock,
+      there is a new group commit leader.
+      We take it from the waiters list or the wakeup list. It
+      might look like a spurious wakeup, but in fact we just
+      ensure the waiter does not wait forever.
+ */
+ if (!m_waiters_list && !wakeup_list)
+ abort(); /* Assert, also in release version */
+
+ if (m_waiters_list)
+ {
+ /* Move one waiter to wakeup list */
+ auto e= m_waiters_list;
+ m_waiters_list= m_waiters_list->m_next;
+ e->m_next= wakeup_list;
+ e->m_group_commit_leader= true;
+ wakeup_list = e;
+ }
+ else //if (wakeup_list)
+ wakeup_list->m_group_commit_leader=true;
+ }
+
lk.unlock();
+ for (size_t i = 0; i < callback_count; i++)
+ callbacks[i].m_callback(callbacks[i].m_param);
+
for (cur= wakeup_list; cur; cur= next)
{
next= cur->m_next;
diff --git a/storage/innobase/log/log0sync.h b/storage/innobase/log/log0sync.h
index 40afbf74ecd..ed928582826 100644
--- a/storage/innobase/log/log0sync.h
+++ b/storage/innobase/log/log0sync.h
@@ -18,8 +18,14 @@ this program; if not, write to the Free Software Foundation, Inc.,
#include <atomic>
#include <thread>
#include <log0types.h>
+#include <vector>
struct group_commit_waiter_t;
+struct completion_callback
+{
+ void (*m_callback)(void*);
+ void* m_param;
+};
/**
Special synchronization primitive, which is helpful for
@@ -63,14 +69,17 @@ class group_commit_lock
std::atomic<value_type> m_pending_value;
bool m_lock;
group_commit_waiter_t* m_waiters_list;
+ std::vector<std::pair<value_type,completion_callback>> m_pending_callbacks;
+
public:
group_commit_lock();
enum lock_return_code
{
ACQUIRED,
- EXPIRED
+ EXPIRED,
+ CALLBACK_QUEUED
};
- lock_return_code acquire(value_type num);
+ lock_return_code acquire(value_type num, const completion_callback *cb);
void release(value_type num);
value_type value() const;
value_type pending() const;
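A minimal sketch of how a caller avoids blocking on redo durability by queueing a completion_callback; on_redo_durable() and commit_redo_async() are hypothetical names, and the real user added by this commit is trx_flush_log_if_needed_low() in trx0trx.cc.

extern "C" void thd_decrement_pending_ops(MYSQL_THD);

/* Hypothetical: fires from whichever thread completes the write/flush. */
static void on_redo_durable(void *arg)
{
  thd_decrement_pending_ops(static_cast<MYSQL_THD>(arg));
}

/* Hypothetical: request durability up to lsn without waiting for it. */
static void commit_redo_async(lsn_t lsn, MYSQL_THD thd)
{
  completion_callback cb;
  cb.m_callback= on_redo_durable;
  cb.m_param= thd;
  /* The callback fires once lsn is written/flushed: either immediately,
     if lsn is already durable, or later from group_commit_lock::release(). */
  log_write_up_to(lsn, /* flush_to_disk= */ true, /* rotate_key= */ false, &cb);
}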
diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc
index 2ba782e2f1c..d904bd19e7e 100644
--- a/storage/innobase/trx/trx0trx.cc
+++ b/storage/innobase/trx/trx0trx.cc
@@ -23,6 +23,7 @@ The transaction
Created 3/26/1996 Heikki Tuuri
*******************************************************/
+#define MYSQL_SERVER
#include "trx0trx.h"
@@ -1165,35 +1166,49 @@ trx_finalize_for_fts(
trx->fts_trx = NULL;
}
-/**********************************************************************//**
-If required, flushes the log to disk based on the value of
-innodb_flush_log_at_trx_commit. */
-static
-void
-trx_flush_log_if_needed_low(
-/*========================*/
- lsn_t lsn) /*!< in: lsn up to which logs are to be
- flushed. */
+extern "C" MYSQL_THD thd_increment_pending_ops();
+extern "C" void thd_decrement_pending_ops(MYSQL_THD);
+
+
+#include "../log/log0sync.h"
+
+/*
+  If required, initiates a write, and optionally a flush, of the log
+  to disk.
+  @param[in] lsn - lsn up to which the log is to be flushed.
+  @param[in] trx_state - if trx_state is TRX_STATE_PREPARED, the function
+  will also wait for the flush to complete.
+*/
+static void trx_flush_log_if_needed_low(lsn_t lsn, trx_state_t trx_state)
{
- bool flush = srv_file_flush_method != SRV_NOSYNC;
+ if (!srv_flush_log_at_trx_commit)
+ return;
- switch (srv_flush_log_at_trx_commit) {
- case 3:
- case 2:
- /* Write the log but do not flush it to disk */
- flush = false;
- /* fall through */
- case 1:
- /* Write the log and optionally flush it to disk */
- log_write_up_to(lsn, flush);
- srv_inc_activity_count();
- return;
- case 0:
- /* Do nothing */
- return;
- }
+ if (log_sys.get_flushed_lsn() > lsn)
+ return;
- ut_error;
+ bool flush= srv_file_flush_method != SRV_NOSYNC &&
+ srv_flush_log_at_trx_commit == 1;
+
+ if (trx_state == TRX_STATE_PREPARED)
+ {
+    /* XA, which is used together with the binlog as well.
+       Be conservative and use a synchronous wait. */
+ log_write_up_to(lsn, flush);
+ return;
+ }
+
+ completion_callback cb;
+ if ((cb.m_param = thd_increment_pending_ops()))
+ {
+ cb.m_callback = (void (*)(void *)) thd_decrement_pending_ops;
+ log_write_up_to(lsn, flush, false, &cb);
+ }
+ else
+ {
+ /* No THD, synchronous write */
+ log_write_up_to(lsn, flush);
+ }
}
/**********************************************************************//**
@@ -1208,7 +1223,7 @@ trx_flush_log_if_needed(
trx_t* trx) /*!< in/out: transaction */
{
trx->op_info = "flushing log";
- trx_flush_log_if_needed_low(lsn);
+ trx_flush_log_if_needed_low(lsn,trx->state);
trx->op_info = "";
}