diff options
author | Marko Mäkelä <marko.makela@mariadb.com> | 2021-02-16 15:44:45 +0200 |
---|---|---|
committer | Marko Mäkelä <marko.makela@mariadb.com> | 2021-02-16 15:44:45 +0200 |
commit | 2a55c8208f42aa5951b9478fe6abaf63d9d5d402 (patch) | |
tree | c603a87008b17e2f0f333db2a06dcbad085723c0 | |
parent | 2ec4b277cc12c005fbe9f308bb8457f055250e97 (diff) | |
parent | 1146e98b3af3e2b15df0598a860f4571663a98d0 (diff) | |
download | mariadb-git-2a55c8208f42aa5951b9478fe6abaf63d9d5d402.tar.gz |
Merge 10.6
-rw-r--r-- | mysql-test/suite/innodb/r/innodb-truncate.result | 13 | ||||
-rw-r--r-- | mysql-test/suite/innodb/t/innodb-truncate.test | 16 | ||||
-rw-r--r-- | sql/net_serv.cc | 16 | ||||
-rw-r--r-- | sql/scheduler.h | 2 | ||||
-rw-r--r-- | sql/sql_class.cc | 53 | ||||
-rw-r--r-- | sql/sql_class.h | 159 | ||||
-rw-r--r-- | sql/sql_parse.cc | 93 | ||||
-rw-r--r-- | sql/sql_parse.h | 13 | ||||
-rw-r--r-- | sql/threadpool.h | 3 | ||||
-rw-r--r-- | sql/threadpool_common.cc | 67 | ||||
-rw-r--r-- | sql/threadpool_generic.cc | 5 | ||||
-rw-r--r-- | sql/threadpool_win.cc | 7 | ||||
-rw-r--r-- | storage/innobase/include/log0log.h | 12 | ||||
-rw-r--r-- | storage/innobase/lock/lock0lock.cc | 1 | ||||
-rw-r--r-- | storage/innobase/log/log0log.cc | 10 | ||||
-rw-r--r-- | storage/innobase/log/log0sync.cc | 89 | ||||
-rw-r--r-- | storage/innobase/log/log0sync.h | 15 | ||||
-rw-r--r-- | storage/innobase/trx/trx0trx.cc | 122 |
18 files changed, 594 insertions, 102 deletions
diff --git a/mysql-test/suite/innodb/r/innodb-truncate.result b/mysql-test/suite/innodb/r/innodb-truncate.result index 8610a892cc6..3e2ab7936dc 100644 --- a/mysql-test/suite/innodb/r/innodb-truncate.result +++ b/mysql-test/suite/innodb/r/innodb-truncate.result @@ -91,3 +91,16 @@ ALTER TABLE t1 FORCE; TRUNCATE TABLE t1; ERROR 42000: Cannot truncate a table referenced in a foreign key constraint (`test`.`t2`, CONSTRAINT `t2_ibfk_1` FOREIGN KEY (`f2`) REFERENCES `test`.`t3` (`f2`)) DROP TABLE t2, t1; +# +# MDEV-24861 Assertion `trx->rsegs.m_redo.rseg' failed +# in innodb_prepare_commit_versioned +# +CREATE TABLE t1 (id INT PRIMARY KEY, f TEXT UNIQUE, +s BIGINT UNSIGNED AS ROW START, e BIGINT UNSIGNED AS ROW END, +PERIOD FOR SYSTEM_TIME(s,e)) +ENGINE=InnoDB WITH SYSTEM VERSIONING; +CREATE TABLE t2 (id INT PRIMARY KEY) ENGINE=InnoDB; +ALTER TABLE t1 FORCE; +TRUNCATE TABLE t2; +DROP TABLE t1, t2; +# End of 10.6 tests diff --git a/mysql-test/suite/innodb/t/innodb-truncate.test b/mysql-test/suite/innodb/t/innodb-truncate.test index 71c0fcfea8b..4d39fcaef6d 100644 --- a/mysql-test/suite/innodb/t/innodb-truncate.test +++ b/mysql-test/suite/innodb/t/innodb-truncate.test @@ -92,3 +92,19 @@ ALTER TABLE t1 FORCE; --error ER_TRUNCATE_ILLEGAL_FK TRUNCATE TABLE t1; DROP TABLE t2, t1; + +--echo # +--echo # MDEV-24861 Assertion `trx->rsegs.m_redo.rseg' failed +--echo # in innodb_prepare_commit_versioned +--echo # + +CREATE TABLE t1 (id INT PRIMARY KEY, f TEXT UNIQUE, + s BIGINT UNSIGNED AS ROW START, e BIGINT UNSIGNED AS ROW END, + PERIOD FOR SYSTEM_TIME(s,e)) +ENGINE=InnoDB WITH SYSTEM VERSIONING; +CREATE TABLE t2 (id INT PRIMARY KEY) ENGINE=InnoDB; +ALTER TABLE t1 FORCE; +TRUNCATE TABLE t2; +DROP TABLE t1, t2; + +--echo # End of 10.6 tests diff --git a/sql/net_serv.cc b/sql/net_serv.cc index a96c43a94fe..409d3cac85e 100644 --- a/sql/net_serv.cc +++ b/sql/net_serv.cc @@ -640,8 +640,20 @@ net_real_write(NET *net,const uchar *packet, size_t len) my_bool net_blocking = vio_is_blocking(net->vio); DBUG_ENTER("net_real_write"); -#if defined(MYSQL_SERVER) && defined(USE_QUERY_CACHE) - query_cache_insert(net->thd, (char*) packet, len, net->pkt_nr); +#if defined(MYSQL_SERVER) + THD *thd= (THD *)net->thd; +#if defined(USE_QUERY_CACHE) + query_cache_insert(thd, (char*) packet, len, net->pkt_nr); +#endif + if (likely(thd)) + { + /* + Wait until pending operations (currently it is engine + asynchronous group commit) are finished before replying + to the client, to keep durability promise. + */ + thd->async_state.wait_for_pending_ops(); + } #endif if (unlikely(net->error == 2)) diff --git a/sql/scheduler.h b/sql/scheduler.h index ebf8d6e9e64..68387390d81 100644 --- a/sql/scheduler.h +++ b/sql/scheduler.h @@ -40,6 +40,8 @@ struct scheduler_functions void (*thd_wait_end)(THD *thd); void (*post_kill_notification)(THD *thd); void (*end)(void); + /** resume previous unfinished command (threadpool only)*/ + void (*thd_resume)(THD* thd); }; diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 3b8414f0c71..5b1d21bb44b 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -682,7 +682,8 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) m_stmt_da(&main_da), tdc_hash_pins(0), xid_hash_pins(0), - m_tmp_tables_locked(false) + m_tmp_tables_locked(false), + async_state() #ifdef HAVE_REPLICATION , current_linfo(0), @@ -4947,6 +4948,56 @@ void reset_thd(MYSQL_THD thd) free_root(thd->mem_root, MYF(MY_KEEP_PREALLOC)); } +/** + This function can be used by storage engine + to indicate a start of an async operation. + + This asynchronous is such operation needs to be + finished before we write response to the client +. + An example of this operation is Innodb's asynchronous + group commit. Server needs to wait for the end of it + before writing response to client, to provide durability + guarantees, in other words, server can't send OK packet + before modified data is durable in redo log. +*/ +extern "C" MYSQL_THD thd_increment_pending_ops(void) +{ + THD *thd = current_thd; + if (!thd) + return NULL; + thd->async_state.inc_pending_ops(); + return thd; +} + +/** + This function can be used by plugin/engine to indicate + end of async operation (such as end of group commit + write flush) + + @param thd THD +*/ +extern "C" void thd_decrement_pending_ops(MYSQL_THD thd) +{ + DBUG_ASSERT(thd); + thd_async_state::enum_async_state state; + if (thd->async_state.dec_pending_ops(&state) == 0) + { + switch(state) + { + case thd_async_state::enum_async_state::SUSPENDED: + DBUG_ASSERT(thd->scheduler->thd_resume); + thd->scheduler->thd_resume(thd); + break; + case thd_async_state::enum_async_state::NONE: + break; + default: + DBUG_ASSERT(0); + } + } +} + + unsigned long long thd_get_query_id(const MYSQL_THD thd) { return((unsigned long long)thd->query_id); diff --git a/sql/sql_class.h b/sql/sql_class.h index 50b746fe514..5612ed5a790 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -2309,6 +2309,164 @@ struct THD_count ~THD_count() { thread_count--; } }; +/** + Support structure for asynchronous group commit, or more generally + any asynchronous operation that needs to finish before server writes + response to client. + + An engine, or any other server component, can signal that there is + a pending operation by incrementing a counter, i.e inc_pending_ops() + and that pending operation is finished by decrementing that counter + dec_pending_ops(). + + NOTE: Currently, pending operations can not fail, i.e there is no + way to pass a return code in dec_pending_ops() + + The server does not write response to the client before the counter + becomes 0. In case of group commit it ensures that data is persistent + before success reported to client, i.e durability in ACID. +*/ +struct thd_async_state +{ + enum class enum_async_state + { + NONE, + SUSPENDED, /* do_command() did not finish, and needs to be resumed */ + RESUMED /* do_command() is resumed*/ + }; + enum_async_state m_state{enum_async_state::NONE}; + + /* Stuff we need to resume do_command where we finished last time*/ + enum enum_server_command m_command{COM_SLEEP}; + LEX_STRING m_packet{}; + + mysql_mutex_t m_mtx; + mysql_cond_t m_cond; + + /** Pending counter*/ + Atomic_counter<int> m_pending_ops=0; + +#ifndef DBUG_OFF + /* Checks */ + pthread_t m_dbg_thread; +#endif + + thd_async_state() + { + mysql_mutex_init(PSI_NOT_INSTRUMENTED, &m_mtx, 0); + mysql_cond_init(PSI_INSTRUMENT_ME, &m_cond, 0); + } + + /* + Currently only used with threadpool, one can "suspend" and "resume" a THD. + Suspend only means leaving do_command earlier, after saving some state. + Resume is continuing suspended THD's do_command(), from where it finished last time. + */ + bool try_suspend() + { + bool ret; + mysql_mutex_lock(&m_mtx); + DBUG_ASSERT(m_state == enum_async_state::NONE); + DBUG_ASSERT(m_pending_ops >= 0); + + if(m_pending_ops) + { + ret=true; + m_state= enum_async_state::SUSPENDED; + } + else + { + /* + If there is no pending operations, can't suspend, since + nobody can resume it. + */ + ret=false; + } + mysql_mutex_unlock(&m_mtx); + return ret; + } + + ~thd_async_state() + { + wait_for_pending_ops(); + mysql_mutex_destroy(&m_mtx); + mysql_cond_destroy(&m_cond); + } + + /* + Increment pending asynchronous operations. + The client response may not be written if + this count > 0. + So, without threadpool query needs to wait for + the operations to finish. + With threadpool, THD can be suspended and resumed + when this counter goes to 0. + */ + void inc_pending_ops() + { + mysql_mutex_lock(&m_mtx); + +#ifndef DBUG_OFF + /* + Check that increments are always done by the same thread. + */ + if (!m_pending_ops) + m_dbg_thread= pthread_self(); + else + DBUG_ASSERT(pthread_equal(pthread_self(),m_dbg_thread)); +#endif + + m_pending_ops++; + mysql_mutex_unlock(&m_mtx); + } + + int dec_pending_ops(enum_async_state* state) + { + int ret; + mysql_mutex_lock(&m_mtx); + ret= --m_pending_ops; + if (!ret) + mysql_cond_signal(&m_cond); + *state = m_state; + mysql_mutex_unlock(&m_mtx); + return ret; + } + + /* + This is used for "dirty" reading pending ops, + when dirty read is OK. + */ + int pending_ops() + { + return m_pending_ops; + } + + /* Wait for pending operations to finish.*/ + void wait_for_pending_ops() + { + /* + It is fine to read m_pending_ops and compare it with 0, + without mutex protection. + + The value is only incremented by the current thread, and will + be decremented by another one, thus "dirty" may show positive number + when it is really 0, but this is not a problem, and the only + bad thing from that will be rechecking under mutex. + */ + if (!pending_ops()) + return; + + mysql_mutex_lock(&m_mtx); + DBUG_ASSERT(m_pending_ops >= 0); + while (m_pending_ops) + mysql_cond_wait(&m_cond, &m_mtx); + mysql_mutex_unlock(&m_mtx); + } +}; + +extern "C" MYSQL_THD thd_increment_pending_ops(void); +extern "C" void thd_decrement_pending_ops(MYSQL_THD); + /** @class THD @@ -5025,6 +5183,7 @@ private: } public: + thd_async_state async_state; #ifdef HAVE_REPLICATION /* If we do a purge of binary logs, log index info of the threads diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 97b7dd427a5..078d48872b0 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -1168,25 +1168,55 @@ static enum enum_server_command fetch_command(THD *thd, char *packet) /** Read one command from connection and execute it (query or simple command). - This function is called in loop from thread function. + This function is to be used by different schedulers (one-thread-per-connection, + pool-of-threads) For profiling to work, it must never be called recursively. + @param thd - client connection context + + @param blocking - wait for command to finish. + if false (nonblocking), then the function might + return when command is "half-finished", with + DISPATCH_COMMAND_WOULDBLOCK. + Currenly, this can *only* happen when using + threadpool. The command will resume, after all outstanding + async operations (i.e group commit) finish. + Threadpool scheduler takes care of "resume". + + @retval + DISPATCH_COMMAND_SUCCESS - success @retval - 0 success + DISPATCH_COMMAND_CLOSE_CONNECTION request of THD shutdown + (s. dispatch_command() description) @retval - 1 request of thread shutdown (see dispatch_command() description) + DISPATCH_COMMAND_WOULDBLOCK - need to wait for asyncronous operations + to finish. Only returned if parameter + 'blocking' is false. */ -bool do_command(THD *thd) +dispatch_command_return do_command(THD *thd, bool blocking) { - bool return_value; + dispatch_command_return return_value; char *packet= 0; ulong packet_length; NET *net= &thd->net; enum enum_server_command command; DBUG_ENTER("do_command"); + DBUG_ASSERT(!thd->async_state.pending_ops()); + if (thd->async_state.m_state == thd_async_state::enum_async_state::RESUMED) + { + /* + Resuming previously suspended command. + Restore the state + */ + command = thd->async_state.m_command; + packet = thd->async_state.m_packet.str; + packet_length = (ulong)thd->async_state.m_packet.length; + goto resume; + } + /* indicator of uninitialized lex => normal flow of errors handling (see my_message_sql) @@ -1253,12 +1283,12 @@ bool do_command(THD *thd) if (net->error != 3) { - return_value= TRUE; // We have to close it. + return_value= DISPATCH_COMMAND_CLOSE_CONNECTION; // We have to close it. goto out; } net->error= 0; - return_value= FALSE; + return_value= DISPATCH_COMMAND_SUCCESS; goto out; } @@ -1325,7 +1355,7 @@ bool do_command(THD *thd) MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da()); thd->m_statement_psi= NULL; thd->m_digest= NULL; - return_value= FALSE; + return_value= DISPATCH_COMMAND_SUCCESS; wsrep_after_command_before_result(thd); goto out; @@ -1351,7 +1381,7 @@ bool do_command(THD *thd) thd->m_statement_psi= NULL; thd->m_digest= NULL; - return_value= FALSE; + return_value= DISPATCH_COMMAND_SUCCESS; wsrep_after_command_before_result(thd); goto out; } @@ -1362,8 +1392,18 @@ bool do_command(THD *thd) DBUG_ASSERT(packet_length); DBUG_ASSERT(!thd->apc_target.is_enabled()); + +resume: return_value= dispatch_command(command, thd, packet+1, - (uint) (packet_length-1)); + (uint) (packet_length-1), blocking); + if (return_value == DISPATCH_COMMAND_WOULDBLOCK) + { + /* Save current state, and resume later.*/ + thd->async_state.m_command= command; + thd->async_state.m_packet={packet,packet_length}; + DBUG_RETURN(return_value); + } + DBUG_ASSERT(!thd->apc_target.is_enabled()); out: @@ -1510,6 +1550,13 @@ public: @param packet_length length of packet + 1 (to show that data is null-terminated) except for COM_SLEEP, where it can be zero. + @param blocking if false (nonblocking), then the function might + return when command is "half-finished", with + DISPATCH_COMMAND_WOULDBLOCK. + Currenly, this can *only* happen when using threadpool. + The current command will resume, after all outstanding + async operations (i.e group commit) finish. + Threadpool scheduler takes care of "resume". @todo set thd->lex->sql_command to SQLCOM_END here. @@ -1522,8 +1569,8 @@ public: 1 request of thread shutdown, i. e. if command is COM_QUIT/COM_SHUTDOWN */ -bool dispatch_command(enum enum_server_command command, THD *thd, - char* packet, uint packet_length) +dispatch_command_return dispatch_command(enum enum_server_command command, THD *thd, + char* packet, uint packet_length, bool blocking) { NET *net= &thd->net; bool error= 0; @@ -1535,6 +1582,12 @@ bool dispatch_command(enum enum_server_command command, THD *thd, "<?>"))); bool drop_more_results= 0; + if (thd->async_state.m_state == thd_async_state::enum_async_state::RESUMED) + { + thd->async_state.m_state = thd_async_state::enum_async_state::NONE; + goto resume; + } + /* keep it withing 1 byte */ compile_time_assert(COM_END == 255); @@ -2265,6 +2318,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, general_log_print(thd, command, NullS); my_eof(thd); break; + case COM_SLEEP: case COM_CONNECT: // Impossible here case COM_TIME: // Impossible from client @@ -2278,7 +2332,18 @@ bool dispatch_command(enum enum_server_command command, THD *thd, } dispatch_end: - do_end_of_statement= true; + /* + For the threadpool i.e if non-blocking call, if not all async operations + are finished, return without cleanup. The cleanup will be done on + later, when command execution is resumed. + */ + if (!blocking && !error && thd->async_state.pending_ops()) + { + DBUG_RETURN(DISPATCH_COMMAND_WOULDBLOCK); + } + +resume: + #ifdef WITH_WSREP /* Next test should really be WSREP(thd), but that causes a failure when doing @@ -2382,7 +2447,7 @@ dispatch_end: /* Check that some variables are reset properly */ DBUG_ASSERT(thd->abort_on_warning == 0); thd->lex->restore_set_statement_var(); - DBUG_RETURN(error); + DBUG_RETURN(error?DISPATCH_COMMAND_CLOSE_CONNECTION: DISPATCH_COMMAND_SUCCESS); } static bool slow_filter_masked(THD *thd, ulonglong mask) diff --git a/sql/sql_parse.h b/sql/sql_parse.h index 37861cf224f..44fd2185726 100644 --- a/sql/sql_parse.h +++ b/sql/sql_parse.h @@ -100,9 +100,16 @@ bool multi_delete_set_locks_and_link_aux_tables(LEX *lex); void create_table_set_open_action_and_adjust_tables(LEX *lex); int bootstrap(MYSQL_FILE *file); int mysql_execute_command(THD *thd); -bool do_command(THD *thd); -bool dispatch_command(enum enum_server_command command, THD *thd, - char* packet, uint packet_length); +enum dispatch_command_return +{ + DISPATCH_COMMAND_SUCCESS=0, + DISPATCH_COMMAND_CLOSE_CONNECTION= 1, + DISPATCH_COMMAND_WOULDBLOCK= 2 +}; + +dispatch_command_return do_command(THD *thd, bool blocking = true); +dispatch_command_return dispatch_command(enum enum_server_command command, THD *thd, + char* packet, uint packet_length, bool blocking = true); void log_slow_statement(THD *thd); bool append_file_to_dir(THD *thd, const char **filename_ptr, const LEX_CSTRING *table_name); diff --git a/sql/threadpool.h b/sql/threadpool.h index 11132b3bf95..7737d056b4a 100644 --- a/sql/threadpool.h +++ b/sql/threadpool.h @@ -133,6 +133,7 @@ struct TP_pool virtual int set_stall_limit(uint){ return 0; } virtual int get_thread_count() { return tp_stats.num_worker_threads; } virtual int get_idle_thread_count(){ return 0; } + virtual void resume(TP_connection* c)=0; }; #ifdef _WIN32 @@ -146,6 +147,7 @@ struct TP_pool_win:TP_pool virtual void add(TP_connection *); virtual int set_max_threads(uint); virtual int set_min_threads(uint); + void resume(TP_connection *c); }; #endif @@ -159,6 +161,7 @@ struct TP_pool_generic :TP_pool virtual int set_pool_size(uint); virtual int set_stall_limit(uint); virtual int get_idle_thread_count(); + void resume(TP_connection* c); }; #endif /* HAVE_POOL_OF_THREADS */ diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc index 168579d984b..07555ac21ed 100644 --- a/sql/threadpool_common.cc +++ b/sql/threadpool_common.cc @@ -23,6 +23,8 @@ #include <sql_audit.h> #include <debug_sync.h> #include <threadpool.h> +#include <sql_class.h> +#include <sql_parse.h> #ifdef WITH_WSREP #include "wsrep_trans_observer.h" @@ -51,7 +53,7 @@ TP_STATISTICS tp_stats; static void threadpool_remove_connection(THD *thd); -static int threadpool_process_request(THD *thd); +static dispatch_command_return threadpool_process_request(THD *thd); static THD* threadpool_add_connection(CONNECT *connect, TP_connection *c); extern bool do_command(THD*); @@ -195,10 +197,30 @@ void tp_callback(TP_connection *c) } c->connect= 0; } - else if (threadpool_process_request(thd)) + else { - /* QUIT or an error occurred. */ - goto error; +retry: + switch(threadpool_process_request(thd)) + { + case DISPATCH_COMMAND_WOULDBLOCK: + if (!thd->async_state.try_suspend()) + { + /* + All async operations finished meanwhile, thus nobody is will wake up + this THD. Therefore, we'll resume "manually" here. + */ + thd->async_state.m_state = thd_async_state::enum_async_state::RESUMED; + goto retry; + } + worker_context.restore(); + return; + case DISPATCH_COMMAND_CLOSE_CONNECTION: + /* QUIT or an error occurred. */ + goto error; + case DISPATCH_COMMAND_SUCCESS: + break; + } + thd->async_state.m_state= thd_async_state::enum_async_state::NONE; } /* Set priority */ @@ -331,10 +353,13 @@ static bool has_unread_data(THD* thd) /** Process a single client request or a single batch. */ -static int threadpool_process_request(THD *thd) +static dispatch_command_return threadpool_process_request(THD *thd) { - int retval= 0; + dispatch_command_return retval= DISPATCH_COMMAND_SUCCESS; + thread_attach(thd); + if(thd->async_state.m_state == thd_async_state::enum_async_state::RESUMED) + goto resume; if (thd->killed >= KILL_CONNECTION) { @@ -342,7 +367,7 @@ static int threadpool_process_request(THD *thd) killed flag was set by timeout handler or KILL command. Return error. */ - retval= 1; + retval= DISPATCH_COMMAND_CLOSE_CONNECTION; if(thd->killed == KILL_WAIT_TIMEOUT) handle_wait_timeout(thd); goto end; @@ -365,19 +390,27 @@ static int threadpool_process_request(THD *thd) if (mysql_audit_release_required(thd)) mysql_audit_release(thd); - if ((retval= do_command(thd)) != 0) - goto end; +resume: + retval= do_command(thd, false); + switch(retval) + { + case DISPATCH_COMMAND_WOULDBLOCK: + case DISPATCH_COMMAND_CLOSE_CONNECTION: + goto end; + case DISPATCH_COMMAND_SUCCESS: + break; + } if (!thd_is_connection_alive(thd)) { - retval= 1; + retval=DISPATCH_COMMAND_CLOSE_CONNECTION; goto end; } set_thd_idle(thd); if (!has_unread_data(thd)) - { + { /* More info on this debug sync is in sql_parse.cc*/ DEBUG_SYNC(thd, "before_do_command_net_read"); goto end; @@ -527,6 +560,15 @@ static void tp_post_kill_notification(THD *thd) post_kill_notification(thd); } +/* Resume previously suspended THD */ +static void tp_resume(THD* thd) +{ + DBUG_ASSERT(thd->async_state.m_state == thd_async_state::enum_async_state::SUSPENDED); + thd->async_state.m_state = thd_async_state::enum_async_state::RESUMED; + TP_connection* c = get_TP_connection(thd); + pool->resume(c); +} + static scheduler_functions tp_scheduler_functions= { 0, // max_threads @@ -537,7 +579,8 @@ static scheduler_functions tp_scheduler_functions= tp_wait_begin, // thd_wait_begin tp_wait_end, // thd_wait_end tp_post_kill_notification, // post kill notification - tp_end // end + tp_end, // end + tp_resume }; void pool_of_threads_scheduler(struct scheduler_functions *func, diff --git a/sql/threadpool_generic.cc b/sql/threadpool_generic.cc index 709bb95eb98..19193be0354 100644 --- a/sql/threadpool_generic.cc +++ b/sql/threadpool_generic.cc @@ -1327,7 +1327,10 @@ void TP_pool_generic::add(TP_connection *c) DBUG_VOID_RETURN; } - +void TP_pool_generic::resume(TP_connection* c) +{ + add(c); +} /** MySQL scheduler callback: wait begin diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc index df8a6c216a3..ed68e31c755 100644 --- a/sql/threadpool_win.cc +++ b/sql/threadpool_win.cc @@ -125,6 +125,12 @@ void TP_pool_win::add(TP_connection *c) } } +void TP_pool_win::resume(TP_connection* c) +{ + DBUG_ASSERT(c->state == TP_STATE_RUNNING); + SubmitThreadpoolWork(((TP_connection_win*)c)->work); +} + #define CHECK_ALLOC_ERROR(op) \ do \ { \ @@ -438,3 +444,4 @@ TP_connection *TP_pool_win::new_connection(CONNECT *connect) } return c; } + diff --git a/storage/innobase/include/log0log.h b/storage/innobase/include/log0log.h index 1a79738bc9b..a3085f0f2dd 100644 --- a/storage/innobase/include/log0log.h +++ b/storage/innobase/include/log0log.h @@ -103,15 +103,21 @@ bool log_set_capacity(ulonglong file_size) MY_ATTRIBUTE((warn_unused_result)); -/** Ensure that the log has been written to the log file up to a given +/** +Ensure that the log has been written to the log file up to a given log entry (such as that of a transaction commit). Start a new write, or wait and check if an already running write is covering the request. @param[in] lsn log sequence number that should be included in the redo log file write @param[in] flush_to_disk whether the written log should also be flushed to the file system -@param[in] rotate_key whether to rotate the encryption key */ -void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key = false); +@param[in] rotate_key whether to rotate the encryption key +@param[in] cb completion callback. If not NULL, the callback will be called + whenever lsn is written or flushed. +*/ +struct completion_callback; +void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key = false, + const completion_callback* cb=nullptr); /** write to the log file up to the last log entry. @param[in] sync whether we want the written log diff --git a/storage/innobase/lock/lock0lock.cc b/storage/innobase/lock/lock0lock.cc index 39b62fcc74f..8a3351dff56 100644 --- a/storage/innobase/lock/lock0lock.cc +++ b/storage/innobase/lock/lock0lock.cc @@ -123,6 +123,7 @@ void lock_sys_t::hash_table::resize(ulint n) } } + aligned_free(array); array= new_array; n_cells= new_n_cells; } diff --git a/storage/innobase/log/log0log.cc b/storage/innobase/log/log0log.cc index 95167d6a487..ff81f9099df 100644 --- a/storage/innobase/log/log0log.cc +++ b/storage/innobase/log/log0log.cc @@ -771,6 +771,7 @@ bool log_write_lock_own() } #endif + /** Ensure that the log has been written to the log file up to a given log entry (such as that of a transaction commit). Start a new write, or wait and check if an already running write is covering the request. @@ -779,7 +780,8 @@ included in the redo log file write @param[in] flush_to_disk whether the written log should also be flushed to the file system @param[in] rotate_key whether to rotate the encryption key */ -void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key) +void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key, + const completion_callback *callback) { ut_ad(!srv_read_only_mode); ut_ad(!rotate_key || flush_to_disk); @@ -788,16 +790,18 @@ void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key) { /* Recovery is running and no operations on the log files are allowed yet (the variable name .._no_ibuf_.. is misleading) */ + ut_a(!callback); return; } if (flush_to_disk && - flush_lock.acquire(lsn) != group_commit_lock::ACQUIRED) + flush_lock.acquire(lsn, callback) != group_commit_lock::ACQUIRED) { return; } - if (write_lock.acquire(lsn) == group_commit_lock::ACQUIRED) + if (write_lock.acquire(lsn, flush_to_disk?0:callback) == + group_commit_lock::ACQUIRED) { mysql_mutex_lock(&log_sys.mutex); lsn_t write_lsn= log_sys.get_lsn(); diff --git a/storage/innobase/log/log0sync.cc b/storage/innobase/log/log0sync.cc index 2a6e1b8b853..30433007639 100644 --- a/storage/innobase/log/log0sync.cc +++ b/storage/innobase/log/log0sync.cc @@ -77,6 +77,7 @@ Note that if write operation is very fast, a) or b) can be fine as alternative. #include <log0types.h> #include "log0sync.h" #include <mysql/service_thd_wait.h> +#include <sql_class.h> /** Helper class , used in group commit lock. @@ -158,10 +159,10 @@ void binary_semaphore::wake() /* A thread helper structure, used in group commit lock below*/ struct group_commit_waiter_t { - lsn_t m_value; - binary_semaphore m_sema; - group_commit_waiter_t* m_next; - group_commit_waiter_t() :m_value(), m_sema(), m_next() {} + lsn_t m_value=0; + binary_semaphore m_sema{}; + group_commit_waiter_t* m_next= nullptr; + bool m_group_commit_leader=false; }; group_commit_lock::group_commit_lock() : @@ -188,7 +189,13 @@ void group_commit_lock::set_pending(group_commit_lock::value_type num) const unsigned int MAX_SPINS = 1; /** max spins in acquire */ thread_local group_commit_waiter_t thread_local_waiter; -group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num) +static inline void do_completion_callback(const completion_callback* cb) +{ + if (cb) + cb->m_callback(cb->m_param); +} + +group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num, const completion_callback *callback) { unsigned int spins = MAX_SPINS; @@ -197,6 +204,7 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num) if (num <= value()) { /* No need to wait.*/ + do_completion_callback(callback); return lock_return_code::EXPIRED; } @@ -212,17 +220,23 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num) } thread_local_waiter.m_value = num; + thread_local_waiter.m_group_commit_leader= false; std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock); while (num > value()) { lk.lock(); /* Re-read current value after acquiring the lock*/ - if (num <= value()) + if (num <= value() && + (!thread_local_waiter.m_group_commit_leader || m_lock)) { + lk.unlock(); + do_completion_callback(callback); + thread_local_waiter.m_group_commit_leader=false; return lock_return_code::EXPIRED; } + thread_local_waiter.m_group_commit_leader= false; if (!m_lock) { /* Take the lock, become group commit leader.*/ @@ -230,9 +244,21 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num) #ifndef DBUG_OFF m_owner_id = std::this_thread::get_id(); #endif + if (callback) + m_pending_callbacks.push_back({num,*callback}); return lock_return_code::ACQUIRED; } + if (callback && m_waiters_list) + { + /* + We need to have at least one waiter, + so it can become the new group commit leader. + */ + m_pending_callbacks.push_back({num, *callback}); + return lock_return_code::CALLBACK_QUEUED; + } + /* Add yourself to waiters list.*/ thread_local_waiter.m_next = m_waiters_list; m_waiters_list = &thread_local_waiter; @@ -244,11 +270,15 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num) thd_wait_end(0); } + do_completion_callback(callback); return lock_return_code::EXPIRED; } void group_commit_lock::release(value_type num) { + completion_callback callbacks[1000]; + size_t callback_count = 0; + std::unique_lock<std::mutex> lk(m_mtx); m_lock = false; @@ -262,12 +292,21 @@ void group_commit_lock::release(value_type num) */ group_commit_waiter_t* cur, * prev, * next; group_commit_waiter_t* wakeup_list = nullptr; - int extra_wake = 0; + for (auto& c : m_pending_callbacks) + { + if (c.first <= num) + { + if (callback_count < array_elements(callbacks)) + callbacks[callback_count++] = c.second; + else + c.second.m_callback(c.second.m_param); + } + } for (prev= nullptr, cur= m_waiters_list; cur; cur= next) { next= cur->m_next; - if (cur->m_value <= num || extra_wake++ == 0) + if (cur->m_value <= num) { /* Move current waiter to wakeup_list*/ @@ -291,8 +330,42 @@ void group_commit_lock::release(value_type num) prev= cur; } } + + auto it= std::remove_if( + m_pending_callbacks.begin(), m_pending_callbacks.end(), + [num](const pending_cb &c) { return c.first <= num; }); + + m_pending_callbacks.erase(it, m_pending_callbacks.end()); + + if (m_pending_callbacks.size() || m_waiters_list) + { + /* + Ensure that after this thread released the lock, + there is a new group commit leader + We take this from waiters list or wakeup list. It + might look like a spurious wake, but in fact we just + ensure the waiter do not wait for eternity. + */ + if (m_waiters_list) + { + /* Move one waiter to wakeup list */ + auto e= m_waiters_list; + m_waiters_list= m_waiters_list->m_next; + e->m_next= wakeup_list; + e->m_group_commit_leader= true; + wakeup_list = e; + } + else if (wakeup_list) + { + wakeup_list->m_group_commit_leader=true; + } + } + lk.unlock(); + for (size_t i = 0; i < callback_count; i++) + callbacks[i].m_callback(callbacks[i].m_param); + for (cur= wakeup_list; cur; cur= next) { next= cur->m_next; diff --git a/storage/innobase/log/log0sync.h b/storage/innobase/log/log0sync.h index 40afbf74ecd..ec4ad6eb99b 100644 --- a/storage/innobase/log/log0sync.h +++ b/storage/innobase/log/log0sync.h @@ -18,8 +18,14 @@ this program; if not, write to the Free Software Foundation, Inc., #include <atomic> #include <thread> #include <log0types.h> +#include <vector> struct group_commit_waiter_t; +struct completion_callback +{ + void (*m_callback)(void*); + void* m_param; +}; /** Special synchronization primitive, which is helpful for @@ -63,14 +69,19 @@ class group_commit_lock std::atomic<value_type> m_pending_value; bool m_lock; group_commit_waiter_t* m_waiters_list; + + typedef std::pair<value_type, completion_callback> pending_cb; + std::vector<pending_cb> m_pending_callbacks; + public: group_commit_lock(); enum lock_return_code { ACQUIRED, - EXPIRED + EXPIRED, + CALLBACK_QUEUED }; - lock_return_code acquire(value_type num); + lock_return_code acquire(value_type num, const completion_callback *cb); void release(value_type num); value_type value() const; value_type pending() const; diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc index 2d9a7d43408..413668e0359 100644 --- a/storage/innobase/trx/trx0trx.cc +++ b/storage/innobase/trx/trx0trx.cc @@ -1162,35 +1162,49 @@ trx_finalize_for_fts( trx->fts_trx = NULL; } -/**********************************************************************//** -If required, flushes the log to disk based on the value of -innodb_flush_log_at_trx_commit. */ -static -void -trx_flush_log_if_needed_low( -/*========================*/ - lsn_t lsn) /*!< in: lsn up to which logs are to be - flushed. */ +extern "C" MYSQL_THD thd_increment_pending_ops(); +extern "C" void thd_decrement_pending_ops(MYSQL_THD); + + +#include "../log/log0sync.h" + +/* + If required, initiates write and optionally flush of the log to + disk + @param[in] lsn - lsn up to which logs are to be flushed. + @param[in] trx_state - if trx_state is PREPARED, the function will + also wait for the flush to complete. +*/ +static void trx_flush_log_if_needed_low(lsn_t lsn, trx_state_t trx_state) { - bool flush = srv_file_flush_method != SRV_NOSYNC; + if (!srv_flush_log_at_trx_commit) + return; - switch (srv_flush_log_at_trx_commit) { - case 3: - case 2: - /* Write the log but do not flush it to disk */ - flush = false; - /* fall through */ - case 1: - /* Write the log and optionally flush it to disk */ - log_write_up_to(lsn, flush); - srv_inc_activity_count(); - return; - case 0: - /* Do nothing */ - return; - } + if (log_sys.get_flushed_lsn() > lsn) + return; - ut_error; + bool flush= srv_file_flush_method != SRV_NOSYNC && + srv_flush_log_at_trx_commit == 1; + + if (trx_state == TRX_STATE_PREPARED) + { + /* XA, which is used with binlog as well. + Be conservative, use synchronous wait.*/ + log_write_up_to(lsn, flush); + return; + } + + completion_callback cb; + if ((cb.m_param = thd_increment_pending_ops())) + { + cb.m_callback = (void (*)(void *)) thd_decrement_pending_ops; + log_write_up_to(lsn, flush, false, &cb); + } + else + { + /* No THD, synchronous write */ + log_write_up_to(lsn, flush); + } } /**********************************************************************//** @@ -1205,51 +1219,53 @@ trx_flush_log_if_needed( trx_t* trx) /*!< in/out: transaction */ { trx->op_info = "flushing log"; - trx_flush_log_if_needed_low(lsn); + trx_flush_log_if_needed_low(lsn,trx->state); trx->op_info = ""; } /** Process tables that were modified by the committing transaction. */ inline void trx_t::commit_tables() { - if (!undo_no || mod_tables.empty()) + if (mod_tables.empty()) return; + if (undo_no) + { #if defined SAFE_MUTEX && defined UNIV_DEBUG - const bool preserve_tables= !innodb_evict_tables_on_commit_debug || - is_recovered || /* avoid trouble with XA recovery */ + const bool preserve_tables= !innodb_evict_tables_on_commit_debug || + is_recovered || /* avoid trouble with XA recovery */ # if 1 /* if dict_stats_exec_sql() were not playing dirty tricks */ - dict_sys.mutex_is_locked(); + dict_sys.mutex_is_locked(); # else /* this would be more proper way to do it */ - dict_operation_lock_mode || dict_operation; + dict_operation_lock_mode || dict_operation; # endif #endif - const trx_id_t max_trx_id= trx_sys.get_max_trx_id(); - const auto now= start_time; + const trx_id_t max_trx_id= trx_sys.get_max_trx_id(); + const auto now= start_time; - for (const auto& p : mod_tables) - { - dict_table_t *table= p.first; - table->update_time= now; - table->query_cache_inv_trx_id= max_trx_id; + for (const auto& p : mod_tables) + { + dict_table_t *table= p.first; + table->update_time= now; + table->query_cache_inv_trx_id= max_trx_id; #if defined SAFE_MUTEX && defined UNIV_DEBUG - if (preserve_tables || table->get_ref_count() || table->is_temporary() || - UT_LIST_GET_LEN(table->locks)) - /* do not evict when committing DDL operations or if some other - transaction is holding the table handle */ - continue; - /* recheck while holding the mutex that blocks - table->acquire() */ - dict_sys.mutex_lock(); - { - LockMutexGuard g{SRW_LOCK_CALL}; - if (!table->get_ref_count() && !UT_LIST_GET_LEN(table->locks)) - dict_sys.remove(table, true); - } - dict_sys.mutex_unlock(); + if (preserve_tables || table->get_ref_count() || table->is_temporary() || + UT_LIST_GET_LEN(table->locks)) + /* do not evict when committing DDL operations or if some other + transaction is holding the table handle */ + continue; + /* recheck while holding the mutex that blocks table->acquire() */ + dict_sys.mutex_lock(); + { + LockMutexGuard g{SRW_LOCK_CALL}; + if (!table->get_ref_count() && !UT_LIST_GET_LEN(table->locks)) + dict_sys.remove(table, true); + } + dict_sys.mutex_unlock(); #endif + } } mod_tables.clear(); |