diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-06-24 10:50:25 +0200 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-06-24 10:50:25 +0200 |
commit | 26a9fbc416cc8afaf2099ce293334e85c76b50cb (patch) | |
tree | a49b66fcdbc8775fca1b610019ab5dcc0094af20 /sql | |
parent | 6a0a4f00a1741df68c0d201e090f5d28f59410c8 (diff) | |
download | mariadb-git-26a9fbc416cc8afaf2099ce293334e85c76b50cb.tar.gz |
MDEV-4506: Parallel replication of group-committed transactions: Intermediate commit
First very rough sketch. We spawn and retire a pool of slave threads.
Test main.alias works, most likely not much else does.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/CMakeLists.txt | 2 | ||||
-rw-r--r-- | sql/log.cc | 16 | ||||
-rw-r--r-- | sql/log.h | 5 | ||||
-rw-r--r-- | sql/log_event.cc | 55 | ||||
-rw-r--r-- | sql/log_event.h | 13 | ||||
-rw-r--r-- | sql/mysqld.cc | 19 | ||||
-rw-r--r-- | sql/mysqld.h | 8 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 509 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 74 | ||||
-rw-r--r-- | sql/rpl_rli.h | 2 | ||||
-rw-r--r-- | sql/share/errmsg-utf8.txt | 2 | ||||
-rw-r--r-- | sql/slave.cc | 14 | ||||
-rw-r--r-- | sql/slave.h | 4 | ||||
-rw-r--r-- | sql/sys_vars.cc | 46 |
14 files changed, 739 insertions, 30 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index 36ab121cadf..2d7499c8b9e 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -88,7 +88,7 @@ SET (SQL_SOURCE threadpool_common.cc ../sql-common/mysql_async.c my_apc.cc my_apc.h - rpl_gtid.cc + rpl_gtid.cc rpl_parallel.cc ${GEN_SOURCES} ${MYSYS_LIBWRAP_SOURCE} ) diff --git a/sql/log.cc b/sql/log.cc index 19fc3cc7b6f..d312f4bc936 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -5355,7 +5355,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, /* Generate a new global transaction ID, and write it to the binlog */ bool MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, - bool is_transactional) + bool is_transactional, uint64 commit_id) { rpl_gtid gtid; uint32 domain_id= thd->variables.gtid_domain_id; @@ -5393,7 +5393,8 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, return true; Gtid_log_event gtid_event(thd, seq_no, domain_id, standalone, - LOG_EVENT_SUPPRESS_USE_F, is_transactional); + LOG_EVENT_SUPPRESS_USE_F, is_transactional, + commit_id); /* Write the event to the binary log. */ if (gtid_event.write(&mysql_bin_log.log_file)) @@ -5651,7 +5652,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) my_org_b_tell= my_b_tell(file); mysql_mutex_lock(&LOCK_log); prev_binlog_id= current_binlog_id; - if (write_gtid_event(thd, true, using_trans)) + if (write_gtid_event(thd, true, using_trans, 0)) goto err; } else @@ -6667,6 +6668,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) group_commit_entry *queue= NULL; bool check_purge= false; ulong binlog_id; + uint64 commit_id; DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_leader"); LINT_INIT(binlog_id); @@ -6701,6 +6703,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) DBUG_ASSERT(is_open()); if (likely(is_open())) // Should always be true { + commit_id= (last_in_queue == leader ? 0 : (uint64)leader->thd->query_id); /* Commit every transaction in the queue. @@ -6721,7 +6724,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) */ DBUG_ASSERT(!cache_mngr->stmt_cache.empty() || !cache_mngr->trx_cache.empty()); - if ((current->error= write_transaction_or_stmt(current))) + if ((current->error= write_transaction_or_stmt(current, commit_id))) current->commit_errno= errno; strmake_buf(cache_mngr->last_commit_pos_file, log_file_name); @@ -6896,11 +6899,12 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) int -MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry) +MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, + uint64 commit_id) { binlog_cache_mngr *mngr= entry->cache_mngr; - if (write_gtid_event(entry->thd, false, entry->using_trx_cache)) + if (write_gtid_event(entry->thd, false, entry->using_trx_cache, commit_id)) return ER_ERROR_ON_WRITE; if (entry->using_stmt_cache && !mngr->stmt_cache.empty() && diff --git a/sql/log.h b/sql/log.h index 018ac64eff7..0b1344aa523 100644 --- a/sql/log.h +++ b/sql/log.h @@ -525,7 +525,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG int new_file_impl(bool need_lock); void do_checkpoint_request(ulong binlog_id); void purge(); - int write_transaction_or_stmt(group_commit_entry *entry); + int write_transaction_or_stmt(group_commit_entry *entry, uint64 commit_id); bool write_transaction_to_binlog_events(group_commit_entry *entry); void trx_group_commit_leader(group_commit_entry *leader); bool is_xidlist_idle_nolock(); @@ -775,7 +775,8 @@ public: inline uint32 get_open_count() { return open_count; } void set_status_variables(THD *thd); bool is_xidlist_idle(); - bool write_gtid_event(THD *thd, bool standalone, bool is_transactional); + bool write_gtid_event(THD *thd, bool standalone, bool is_transactional, + uint64 commit_id); int read_state_from_file(); int write_state_to_file(); int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size); diff --git a/sql/log_event.cc b/sql/log_event.cc index 3076cbb1766..431f8b47f2d 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -6101,6 +6101,18 @@ Gtid_log_event::Gtid_log_event(const char *buf, uint event_len, domain_id= uint4korr(buf); buf+= 4; flags2= *buf; + if (flags2 & FL_GROUP_COMMIT_ID) + { + if (event_len < (uint)header_size + GTID_HEADER_LEN + 2) + { + seq_no= 0; // So is_valid() returns false + return; + } + ++buf; + commit_id= uint8korr(buf); + } + else + commit_id= 0; } @@ -6108,10 +6120,11 @@ Gtid_log_event::Gtid_log_event(const char *buf, uint event_len, Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg, uint32 domain_id_arg, bool standalone, - uint16 flags_arg, bool is_transactional) + uint16 flags_arg, bool is_transactional, + uint64 commit_id_arg) : Log_event(thd_arg, flags_arg, is_transactional), - seq_no(seq_no_arg), domain_id(domain_id_arg), - flags2(standalone ? FL_STANDALONE : 0) + seq_no(seq_no_arg), commit_id(commit_id_arg), domain_id(domain_id_arg), + flags2((standalone ? FL_STANDALONE : 0) | (commit_id_arg ? FL_GROUP_COMMIT_ID : 0)) { cache_type= Log_event::EVENT_NO_CACHE; } @@ -6156,13 +6169,24 @@ Gtid_log_event::peek(const char *event_start, size_t event_len, bool Gtid_log_event::write(IO_CACHE *file) { - uchar buf[GTID_HEADER_LEN]; + uchar buf[GTID_HEADER_LEN+2]; + size_t write_len; + int8store(buf, seq_no); int4store(buf+8, domain_id); buf[12]= flags2; - bzero(buf+13, GTID_HEADER_LEN-13); - return write_header(file, GTID_HEADER_LEN) || - wrapper_my_b_safe_write(file, buf, GTID_HEADER_LEN) || + if (flags2 & FL_GROUP_COMMIT_ID) + { + int8store(buf+13, commit_id); + write_len= GTID_HEADER_LEN + 2; + } + else + { + bzero(buf+13, GTID_HEADER_LEN-13); + write_len= GTID_HEADER_LEN; + } + return write_header(file, write_len) || + wrapper_my_b_safe_write(file, buf, write_len) || write_footer(file); } @@ -6201,7 +6225,7 @@ Gtid_log_event::make_compatible_event(String *packet, bool *need_dummy_event, void Gtid_log_event::pack_info(THD *thd, Protocol *protocol) { - char buf[6+5+10+1+10+1+20+1]; + char buf[6+5+10+1+10+1+20+1+4+20+1]; char *p; p = strmov(buf, (flags2 & FL_STANDALONE ? "GTID " : "BEGIN GTID ")); p= longlong10_to_str(domain_id, p, 10); @@ -6209,6 +6233,11 @@ Gtid_log_event::pack_info(THD *thd, Protocol *protocol) p= longlong10_to_str(server_id, p, 10); *p++= '-'; p= longlong10_to_str(seq_no, p, 10); + if (flags2 & FL_GROUP_COMMIT_ID) + { + p= strmov(p, " cid="); + p= longlong10_to_str(commit_id, p, 10); + } protocol->store(buf, p-buf, &my_charset_bin); } @@ -6295,12 +6324,20 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) Write_on_release_cache cache(&print_event_info->head_cache, file, Write_on_release_cache::FLUSH_F); char buf[21]; + char buf2[21]; if (!print_event_info->short_form) { print_header(&cache, print_event_info, FALSE); longlong10_to_str(seq_no, buf, 10); - my_b_printf(&cache, "\tGTID %u-%u-%s\n", domain_id, server_id, buf); + if (flags2 & FL_GROUP_COMMIT_ID) + { + longlong10_to_str(commit_id, buf2, 10); + my_b_printf(&cache, "\tGTID %u-%u-%s cid=%s\n", + domain_id, server_id, buf, buf2); + } + else + my_b_printf(&cache, "\tGTID %u-%u-%s\n", domain_id, server_id, buf); if (!print_event_info->domain_id_printed || print_event_info->domain_id != domain_id) diff --git a/sql/log_event.h b/sql/log_event.h index b54e2028ef2..641ab3e37b7 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -3080,6 +3080,7 @@ class Gtid_log_event: public Log_event { public: uint64 seq_no; + uint64 commit_id; uint32 domain_id; uchar flags2; @@ -3087,10 +3088,15 @@ public: /* FL_STANDALONE is set when there is no terminating COMMIT event. */ static const uchar FL_STANDALONE= 1; + /* + FL_GROUP_COMMIT_ID is set when event group is part of a group commit on the + master. Groups with same commit_id are part of the same group commit. + */ + static const uchar FL_GROUP_COMMIT_ID= 2; #ifdef MYSQL_SERVER Gtid_log_event(THD *thd_arg, uint64 seq_no, uint32 domain_id, bool standalone, - uint16 flags, bool is_transactional); + uint16 flags, bool is_transactional, uint64 commit_id); #ifdef HAVE_REPLICATION void pack_info(THD *thd, Protocol *protocol); virtual int do_apply_event(Relay_log_info const *rli); @@ -3104,7 +3110,10 @@ public: const Format_description_log_event *description_event); ~Gtid_log_event() { } Log_event_type get_type_code() { return GTID_EVENT; } - int get_data_size() { return GTID_HEADER_LEN; } + int get_data_size() + { + return GTID_HEADER_LEN + ((flags2 & FL_GROUP_COMMIT_ID) ? 2 : 0); + } bool is_valid() const { return seq_no != 0; } #ifdef MYSQL_SERVER bool write(IO_CACHE *file); diff --git a/sql/mysqld.cc b/sql/mysqld.cc index d5af1634a8a..4e2679f1c91 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -543,6 +543,8 @@ ulong rpl_recovery_rank=0; */ ulong stored_program_cache_size= 0; +ulong opt_slave_parallel_threads= 0; + const double log_10[] = { 1e000, 1e001, 1e002, 1e003, 1e004, 1e005, 1e006, 1e007, 1e008, 1e009, 1e010, 1e011, 1e012, 1e013, 1e014, 1e015, 1e016, 1e017, 1e018, 1e019, @@ -769,7 +771,8 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list, key_LOCK_thread_count, key_LOCK_thread_cache, key_PARTITION_LOCK_auto_inc; PSI_mutex_key key_RELAYLOG_LOCK_index; -PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state; +PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state, + key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool; PSI_mutex_key key_LOCK_stats, key_LOCK_global_user_client_stats, key_LOCK_global_table_stats, @@ -844,7 +847,9 @@ static PSI_mutex_info all_server_mutexes[]= { &key_LOCK_thread_cache, "LOCK_thread_cache", PSI_FLAG_GLOBAL}, { &key_PARTITION_LOCK_auto_inc, "HA_DATA_PARTITION::LOCK_auto_inc", 0}, { &key_LOCK_slave_state, "LOCK_slave_state", 0}, - { &key_LOCK_binlog_state, "LOCK_binlog_state", 0} + { &key_LOCK_binlog_state, "LOCK_binlog_state", 0}, + { &key_LOCK_rpl_thread, "LOCK_rpl_thread", 0}, + { &key_LOCK_rpl_thread_pool, "LOCK_rpl_thread_pool", 0} }; PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger, @@ -886,6 +891,7 @@ PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond, PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready; PSI_cond_key key_RELAYLOG_COND_queue_busy; PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy; +PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool; static PSI_cond_info all_server_conds[]= { @@ -926,13 +932,15 @@ static PSI_cond_info all_server_conds[]= { &key_user_level_lock_cond, "User_level_lock::cond", 0}, { &key_COND_thread_count, "COND_thread_count", PSI_FLAG_GLOBAL}, { &key_COND_thread_cache, "COND_thread_cache", PSI_FLAG_GLOBAL}, - { &key_COND_flush_thread_cache, "COND_flush_thread_cache", PSI_FLAG_GLOBAL} + { &key_COND_flush_thread_cache, "COND_flush_thread_cache", PSI_FLAG_GLOBAL}, + { &key_COND_rpl_thread, "COND_rpl_thread", 0}, + { &key_COND_rpl_thread_pool, "COND_rpl_thread_pool", 0} }; PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert, key_thread_handle_manager, key_thread_main, key_thread_one_connection, key_thread_signal_hand, - key_thread_slave_init; + key_thread_slave_init, key_rpl_parallel_thread; static PSI_thread_info all_server_threads[]= { @@ -958,7 +966,8 @@ static PSI_thread_info all_server_threads[]= { &key_thread_main, "main", PSI_FLAG_GLOBAL}, { &key_thread_one_connection, "one_connection", 0}, { &key_thread_signal_hand, "signal_handler", PSI_FLAG_GLOBAL}, - { &key_thread_slave_init, "slave_init", PSI_FLAG_GLOBAL} + { &key_thread_slave_init, "slave_init", PSI_FLAG_GLOBAL}, + { &key_rpl_parallel_thread, "rpl_parallel_thread", 0} }; PSI_file_key key_file_binlog, key_file_binlog_index, key_file_casetest, diff --git a/sql/mysqld.h b/sql/mysqld.h index 02d6b41cf69..ff2dfffa991 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -176,6 +176,7 @@ extern ulong slave_max_allowed_packet; extern ulong opt_binlog_rows_event_max_size; extern ulong rpl_recovery_rank, thread_cache_size; extern ulong stored_program_cache_size; +extern ulong opt_slave_parallel_threads; extern ulong back_log; extern ulong executed_events; extern char language[FN_REFLEN]; @@ -247,7 +248,8 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list, key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data, key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc; extern PSI_mutex_key key_RELAYLOG_LOCK_index; -extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state; +extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state, + key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool; extern PSI_mutex_key key_LOCK_stats, key_LOCK_global_user_client_stats, key_LOCK_global_table_stats, @@ -280,10 +282,12 @@ extern PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond, extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready; extern PSI_cond_key key_RELAYLOG_COND_queue_busy; extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy; +extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool; extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert, key_thread_handle_manager, key_thread_kill_server, key_thread_main, - key_thread_one_connection, key_thread_signal_hand, key_thread_slave_init; + key_thread_one_connection, key_thread_signal_hand, key_thread_slave_init, + key_rpl_parallel_thread; extern PSI_file_key key_file_binlog, key_file_binlog_index, key_file_casetest, key_file_dbopt, key_file_des_key_file, key_file_ERRMSG, key_select_to_file, diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc new file mode 100644 index 00000000000..a4a87c1e92e --- /dev/null +++ b/sql/rpl_parallel.cc @@ -0,0 +1,509 @@ +#include "my_global.h" +#include "rpl_parallel.h" +#include "slave.h" +#include "rpl_mi.h" + + +struct rpl_parallel_thread_pool global_rpl_thread_pool; + + +static void +rpt_handle_event(rpl_parallel_thread::queued_event *qev, + THD *thd, + struct rpl_parallel_thread *rpt) +{ + int err; + + /* ToDo: Access to thd, and what about rli, split out a parallel part? */ + err= apply_event_and_update_pos(qev->ev, thd, qev->rli, rpt); + /* ToDo: error handling. */ + /* ToDo: also free qev->ev, or hold on to it for a bit if necessary. */ +} + + +pthread_handler_t +handle_rpl_parallel_thread(void *arg) +{ + THD *thd; + const char* old_msg; + struct rpl_parallel_thread::queued_event *events; + bool group_standalone= true; + bool in_event_group= false; + + struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg; + + my_thread_init(); + thd = new THD; + thd->thread_stack = (char*)&thd; + mysql_mutex_lock(&LOCK_thread_count); + thd->thread_id= thd->variables.pseudo_thread_id= thread_id++; + threads.append(thd); + mysql_mutex_unlock(&LOCK_thread_count); + set_current_thd(thd); + pthread_detach_this_thread(); + thd->init_for_queries(); + thd->variables.binlog_annotate_row_events= 0; + init_thr_lock(); + thd->store_globals(); + thd->system_thread= SYSTEM_THREAD_SLAVE_SQL; + thd->security_ctx->skip_grants(); + thd->variables.max_allowed_packet= slave_max_allowed_packet; + thd->slave_thread= 1; + thd->enable_slow_log= opt_log_slow_slave_statements; + thd->variables.log_slow_filter= global_system_variables.log_slow_filter; + set_slave_thread_options(thd); + thd->client_capabilities = CLIENT_LOCAL_FILES; + thd_proc_info(thd, "Waiting for work from main SQL threads"); + thd->set_time(); + thd->variables.lock_wait_timeout= LONG_TIMEOUT; + + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + rpt->thd= thd; + + while (rpt->delay_start) + mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread); + + rpt->running= true; + + while (!rpt->stop && !thd->killed) + { + rpl_parallel_thread *list; + + old_msg= thd->proc_info; + thd->enter_cond(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread, + "Waiting for work from SQL thread"); + while (!rpt->stop && !thd->killed && !(events= rpt->event_queue)) + mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread); + rpt->free= false; + rpt->event_queue= rpt->last_in_queue= NULL; + thd->exit_cond(old_msg); + + more_events: + while (events) + { + struct rpl_parallel_thread::queued_event *next= events->next; + Log_event_type event_type= events->ev->get_type_code(); + if (event_type == GTID_EVENT) + { + group_standalone= + (0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 & + Gtid_log_event::FL_STANDALONE)); + in_event_group= true; + } + else + { + if (group_standalone) + { + if (!Log_event::is_part_of_group(event_type)) + in_event_group= false; + } + else if (event_type == XID_EVENT) + in_event_group= false; + else if (event_type == QUERY_EVENT) + { + Query_log_event *query= static_cast<Query_log_event *>(events->ev); + if (!strcmp("COMMIT", query->query) || + !strcmp("ROLLBACK", query->query)) + in_event_group= false; + } + } + rpt_handle_event(events, thd, rpt); + free(events); + events= next; + } + + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + if ((events= rpt->event_queue) != NULL) + { + rpt->event_queue= rpt->last_in_queue= NULL; + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + goto more_events; + } + + if (!in_event_group) + { + rpt->current_entry= NULL; + if (!rpt->free) + { + mysql_mutex_lock(&rpt->pool->LOCK_rpl_thread_pool); + list= rpt->pool->free_list; + rpt->next= list; + rpt->pool->free_list= list; + if (!list) + mysql_cond_signal(&rpt->pool->COND_rpl_thread_pool); + mysql_mutex_unlock(&rpt->pool->LOCK_rpl_thread_pool); + rpt->free= true; + } + } + } + + rpt->running= false; + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + + return NULL; +} + + +int +rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, + uint32 new_count, bool skip_check) +{ + uint32 i; + rpl_parallel_thread **new_list= NULL; + rpl_parallel_thread *new_free_list= NULL; + + /* + Allocate the new list of threads up-front. + That way, if we fail half-way, we only need to free whatever we managed + to allocate, and will not be left with a half-functional thread pool. + */ + if (new_count && + !(new_list= (rpl_parallel_thread **)my_malloc(new_count*sizeof(*new_list), + MYF(MY_WME)))) + { + my_error(ER_OUTOFMEMORY, MYF(0), (int(new_count*sizeof(*new_list)))); + goto err;; + } + + for (i= 0; i < new_count; ++i) + { + pthread_t th; + + if (!(new_list[i]= (rpl_parallel_thread *)my_malloc(sizeof(*(new_list[i])), + MYF(MY_WME)))) + { + my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*(new_list[i]))); + goto err; + } + new_list[i]->delay_start= true; + new_list[i]->running= false; + new_list[i]->stop= false; + new_list[i]->free= false; + mysql_mutex_init(key_LOCK_rpl_thread, &new_list[i]->LOCK_rpl_thread, + MY_MUTEX_INIT_SLOW); + mysql_cond_init(key_COND_rpl_thread, &new_list[i]->COND_rpl_thread, NULL); + new_list[i]->pool= pool; + new_list[i]->current_entry= NULL; + new_list[i]->event_queue= NULL; + new_list[i]->last_in_queue= NULL; + if (mysql_thread_create(key_rpl_parallel_thread, &th, NULL, + handle_rpl_parallel_thread, new_list[i])) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + my_free(new_list[i]); + goto err; + } + new_list[i]->next= new_free_list; + new_free_list= new_list[i]; + } + + if (!skip_check) + { + mysql_mutex_lock(&LOCK_active_mi); + if (master_info_index->give_error_if_slave_running()) + { + mysql_mutex_unlock(&LOCK_active_mi); + goto err; + } + if (pool->changing) + { + mysql_mutex_unlock(&LOCK_active_mi); + my_error(ER_CHANGE_SLAVE_PARALLEL_THREADS_ACTIVE, MYF(0)); + goto err; + } + pool->changing= true; + mysql_mutex_unlock(&LOCK_active_mi); + } + + for (i= 0; i < pool->count; ++i) + { + rpl_parallel_thread *rpt= pool->get_thread(NULL); + rpt->stop= true; + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + } + + for (i= 0; i < pool->count; ++i) + { + rpl_parallel_thread *rpt= pool->threads[i]; + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + while (rpt->running) + mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + delete rpt; + } + + my_free(pool->threads); + pool->threads= new_list; + pool->free_list= new_free_list; + pool->count= new_count; + for (i= 0; i < pool->count; ++i) + { + mysql_mutex_lock(&pool->threads[i]->LOCK_rpl_thread); + pool->threads[i]->delay_start= false; + mysql_cond_signal(&pool->threads[i]->COND_rpl_thread); + mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread); + } + + if (!skip_check) + { + mysql_mutex_lock(&LOCK_active_mi); + pool->changing= false; + mysql_mutex_unlock(&LOCK_active_mi); + } + return 0; + +err: + if (new_list) + { + while (new_free_list) + { + rpl_parallel_thread *next= new_free_list->next; + mysql_mutex_lock(&new_free_list->LOCK_rpl_thread); + new_free_list->delay_start= false; + new_free_list->stop= true; + while (!new_free_list->running) + mysql_cond_wait(&new_free_list->COND_rpl_thread, + &new_free_list->LOCK_rpl_thread); + while (new_free_list->running) + mysql_cond_wait(&new_free_list->COND_rpl_thread, + &new_free_list->LOCK_rpl_thread); + my_free(new_free_list); + new_free_list= next; + } + my_free(new_list); + } + if (!skip_check) + { + mysql_mutex_lock(&LOCK_active_mi); + pool->changing= false; + mysql_mutex_unlock(&LOCK_active_mi); + } + return 1; +} + + +rpl_parallel_thread_pool::rpl_parallel_thread_pool() + : count(0), threads(0), free_list(0), changing(false), inited(false) +{ +} + + +int +rpl_parallel_thread_pool::init(uint32 size) +{ + count= 0; + threads= NULL; + free_list= NULL; + + mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool, + MY_MUTEX_INIT_SLOW); + mysql_cond_init(key_COND_rpl_thread_pool, &COND_rpl_thread_pool, NULL); + changing= false; + inited= true; + + return rpl_parallel_change_thread_count(this, size, true); +} + + +void +rpl_parallel_thread_pool::destroy() +{ + if (!inited) + return; + rpl_parallel_change_thread_count(this, 0, true); + mysql_mutex_destroy(&LOCK_rpl_thread_pool); + mysql_cond_destroy(&COND_rpl_thread_pool); +} + + +struct rpl_parallel_thread * +rpl_parallel_thread_pool::get_thread(rpl_parallel_entry *entry) +{ + rpl_parallel_thread *rpt; + + mysql_mutex_lock(&LOCK_rpl_thread_pool); + while ((rpt= free_list) == NULL) + mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool); + free_list= rpt->next; + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + mysql_mutex_unlock(&LOCK_rpl_thread_pool); + rpt->current_entry= entry; + + return rpt; +} + + +rpl_parallel::rpl_parallel() : + current(NULL) +{ + my_hash_init(&domain_hash, &my_charset_bin, 32, + offsetof(rpl_parallel_entry, domain_id), sizeof(uint32), + NULL, NULL, HASH_UNIQUE); +} + + +rpl_parallel::~rpl_parallel() +{ + my_hash_free(&domain_hash); +} + + +rpl_parallel_entry * +rpl_parallel::find(uint32 domain_id) +{ + struct rpl_parallel_entry *e; + + if (!(e= (rpl_parallel_entry *)my_hash_search(&domain_hash, + (const uchar *)&domain_id, 0))) + { + /* Allocate a new, empty one. */ + if (!(e= (struct rpl_parallel_entry *)my_malloc(sizeof(*e), MYF(0)))) + return NULL; + e->domain_id= domain_id; + e->last_server_id= 0; + e->last_seq_no= 0; + e->last_commit_id= 0; + e->active= false; + e->rpl_thread= NULL; + if (my_hash_insert(&domain_hash, (uchar *)e)) + { + my_free(e); + return NULL; + } + } + + return e; +} + + +bool +rpl_parallel::do_event(Relay_log_info *rli, Log_event *ev, THD *parent_thd) +{ + rpl_parallel_entry *e; + rpl_parallel_thread *cur_thread; + rpl_parallel_thread::queued_event *qev; + + if (!(qev= (rpl_parallel_thread::queued_event *)my_malloc(sizeof(*qev), + MYF(0)))) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return true; + } + qev->ev= ev; + qev->rli= rli; + qev->next= NULL; + + if (ev->get_type_code() == GTID_EVENT) + { + Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); + + if (!(e= find(gtid_ev->domain_id))) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return true; + } + + /* Check if we already have a worker thread for this entry. */ + cur_thread= e->rpl_thread; + if (cur_thread) + { + mysql_mutex_lock(&cur_thread->LOCK_rpl_thread); + if (cur_thread->current_entry != e) + { + /* Not ours anymore, we need to grab a new one. */ + mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); + e->rpl_thread= cur_thread= NULL; + } + } + + if (!cur_thread) + { + /* + Nothing else is currently running in this domain. We can spawn a new + thread to do this event group in parallel with anything else that might + be running in other domains. + */ + if (gtid_ev->flags & Gtid_log_event::FL_GROUP_COMMIT_ID) + { + e->last_server_id= gtid_ev->server_id; + e->last_seq_no= gtid_ev->seq_no; + e->last_commit_id= gtid_ev->commit_id; + } + else + { + e->last_server_id= 0; + e->last_seq_no= 0; + e->last_commit_id= 0; + } + cur_thread= e->rpl_thread= global_rpl_thread_pool.get_thread(e); + e->rpl_thread->wait_for= NULL; /* ToDo */ + /* get_thread() returns with the LOCK_rpl_thread locked. */ + } + else if ((gtid_ev->flags & Gtid_log_event::FL_GROUP_COMMIT_ID) && + e->last_commit_id == gtid_ev->commit_id) + { + /* + We are already executing something else in this domain. But the two + event groups were committed together in the same group commit on the + master, so we can still do them in parallel here on the slave. + + However, the commit of this event must wait for the commit of the prior + event, to preserve binlog commit order and visibility across all + servers in the replication hierarchy. + */ + rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e); + rpt->wait_for= cur_thread; /* ToDo */ + mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); + e->rpl_thread= cur_thread= rpt; + /* get_thread() returns with the LOCK_rpl_thread locked. */ + } + else + { + /* + We are still executing the previous event group for this replication + domain, and we have to wait for that to finish before we can start on + the next one. So just re-use the thread. + */ + } + + current= e; + } + else + { + if (!current) + { + /* We have no domain_id yet, just run non-parallel. */ + rpt_handle_event(qev, parent_thd, NULL); + return false; + } + cur_thread= current->rpl_thread; + if (cur_thread) + { + mysql_mutex_lock(&cur_thread->LOCK_rpl_thread); + if (cur_thread->current_entry != current) + { + /* Not ours anymore, we need to grab a new one. */ + mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); + cur_thread= NULL; + } + } + if (!cur_thread) + { + cur_thread= current->rpl_thread= + global_rpl_thread_pool.get_thread(current); + cur_thread->wait_for= NULL; /* ToDo */ + } + } + /* + Queue the event for processing. + */ + if (cur_thread->last_in_queue) + cur_thread->last_in_queue->next= qev; + else + cur_thread->event_queue= qev; + cur_thread->last_in_queue= qev; + mysql_cond_signal(&cur_thread->COND_rpl_thread); + mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); + + return false; +} diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h new file mode 100644 index 00000000000..7e966f1615c --- /dev/null +++ b/sql/rpl_parallel.h @@ -0,0 +1,74 @@ +#ifndef RPL_PARALLEL_H +#define RPL_PARALLEL_H + +#include "log_event.h" + + +struct rpl_parallel; +struct rpl_parallel_entry; +struct rpl_parallel_thread_pool; + +class Relay_log_info; +struct rpl_parallel_thread { + bool delay_start; + bool running; + bool stop; + bool free; + mysql_mutex_t LOCK_rpl_thread; + mysql_cond_t COND_rpl_thread; + struct rpl_parallel_thread *next; /* For free list. */ + struct rpl_parallel_thread_pool *pool; + THD *thd; + struct rpl_parallel_entry *current_entry; + struct queued_event { + queued_event *next; + Log_event *ev; + Relay_log_info *rli; + } *event_queue, *last_in_queue; + rpl_parallel_thread *wait_for; /* ToDo: change this ... */ +}; + + +struct rpl_parallel_thread_pool { + uint32 count; + struct rpl_parallel_thread **threads; + struct rpl_parallel_thread *free_list; + mysql_mutex_t LOCK_rpl_thread_pool; + mysql_cond_t COND_rpl_thread_pool; + bool changing; + bool inited; + + rpl_parallel_thread_pool(); + int init(uint32 size); + void destroy(); + struct rpl_parallel_thread *get_thread(rpl_parallel_entry *entry); +}; + + +struct rpl_parallel_entry { + uint32 domain_id; + uint32 last_server_id; + uint64 last_seq_no; + uint64 last_commit_id; + bool active; + rpl_parallel_thread *rpl_thread; +}; +struct rpl_parallel { + HASH domain_hash; + rpl_parallel_entry *current; + + rpl_parallel(); + ~rpl_parallel(); + rpl_parallel_entry *find(uint32 domain_id); + bool do_event(Relay_log_info *rli, Log_event *ev, THD *thd); +}; + + +extern struct rpl_parallel_thread_pool global_rpl_thread_pool; + + +extern int rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, + uint32 new_count, + bool skip_check= false); + +#endif /* RPL_PARALLEL_H */ diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 6dd757343fd..452457e9e5a 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -22,6 +22,7 @@ #include "log.h" /* LOG_INFO, MYSQL_BIN_LOG */ #include "sql_class.h" /* THD */ #include "log_event.h" +#include "rpl_parallel.h" struct RPL_TABLE_LIST; class Master_info; @@ -318,6 +319,7 @@ public: */ uint64 gtid_sub_id; rpl_gtid current_gtid; + rpl_parallel parallel; Relay_log_info(bool is_slave_recovery); ~Relay_log_info(); diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt index 48809417a6c..85baddd3c49 100644 --- a/sql/share/errmsg-utf8.txt +++ b/sql/share/errmsg-utf8.txt @@ -6555,3 +6555,5 @@ ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_GTID_DOMAIN_ID_SEQ_NO eng "Cannot modify @@session.gtid_domain_id or @@session.gtid_seq_no inside a transaction" ER_STORED_FUNCTION_PREVENTS_SWITCH_GTID_DOMAIN_ID_SEQ_NO eng "Cannot modify @@session.gtid_domain_id or @@session.gtid_seq_no inside a stored function or trigger" +ER_CHANGE_SLAVE_PARALLEL_THREADS_ACTIVE + eng "Cannot change @@slave_parallel_threads while another change is in progress" diff --git a/sql/slave.cc b/sql/slave.cc index 1734b2c4f76..419fa579a09 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -57,6 +57,8 @@ #include "rpl_tblmap.h" #include "debug_sync.h" +#include "rpl_parallel.h" + #define FLAGSTR(V,F) ((V)&(F)?#F" ":"") @@ -360,6 +362,9 @@ int init_slave() goto err; } + if (global_rpl_thread_pool.init(opt_slave_parallel_threads)) + return 1; + /* If --slave-skip-errors=... was not used, the string value for the system variable has not been set up yet. Do it now. @@ -947,6 +952,7 @@ void end_slave() master_info_index= 0; active_mi= 0; mysql_mutex_unlock(&LOCK_active_mi); + global_rpl_thread_pool.destroy(); free_all_rpl_filters(); DBUG_VOID_RETURN; } @@ -3012,7 +3018,8 @@ static int has_temporary_error(THD *thd) @retval 2 No error calling ev->apply_event(), but error calling ev->update_pos(). */ -int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) +int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli, + rpl_parallel_thread *rpt) { int exec_res= 0; @@ -3234,7 +3241,10 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) };); } - exec_res= apply_event_and_update_pos(ev, thd, rli); + if (opt_slave_parallel_threads > 0) + DBUG_RETURN(rli->parallel.do_event(rli, ev, thd)); + + exec_res= apply_event_and_update_pos(ev, thd, rli, NULL); switch (ev->get_type_code()) { case FORMAT_DESCRIPTION_EVENT: diff --git a/sql/slave.h b/sql/slave.h index 565f40b7236..69b0e011a39 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -51,6 +51,7 @@ class Relay_log_info; class Master_info; class Master_info_index; +struct rpl_parallel_thread; int init_intvar_from_file(int* var, IO_CACHE* f, int default_val); int init_strvar_from_file(char *var, int max_size, IO_CACHE *f, @@ -227,7 +228,8 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset, void set_slave_thread_options(THD* thd); void set_slave_thread_default_charset(THD *thd, Relay_log_info const *rli); int rotate_relay_log(Master_info* mi); -int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli); +int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli, + rpl_parallel_thread *rpt); pthread_handler_t handle_slave_io(void *arg); pthread_handler_t handle_slave_sql(void *arg); diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 4066a04aea7..f63960a4e36 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -57,6 +57,7 @@ #include "threadpool.h" #include "sql_repl.h" #include "opt_range.h" +#include "rpl_parallel.h" /* The rule for this file: everything should be 'static'. When a sys_var @@ -1434,6 +1435,51 @@ static Sys_var_mybool Sys_gtid_strict_mode( "generate an out-of-order binlog if executed.", GLOBAL_VAR(opt_gtid_strict_mode), CMD_LINE(OPT_ARG), DEFAULT(FALSE)); + + +static bool +check_slave_parallel_threads(sys_var *self, THD *thd, set_var *var) +{ + bool running; + + mysql_mutex_unlock(&LOCK_global_system_variables); + mysql_mutex_lock(&LOCK_active_mi); + running= master_info_index->give_error_if_slave_running(); + mysql_mutex_unlock(&LOCK_active_mi); + mysql_mutex_lock(&LOCK_global_system_variables); + if (running) + return true; + + return false; +} + +static bool +fix_slave_parallel_threads(sys_var *self, THD *thd, enum_var_type type) +{ + bool running; + + mysql_mutex_unlock(&LOCK_global_system_variables); + mysql_mutex_lock(&LOCK_active_mi); + running= master_info_index->give_error_if_slave_running(); + mysql_mutex_unlock(&LOCK_active_mi); + mysql_mutex_lock(&LOCK_global_system_variables); + if (running || rpl_parallel_change_thread_count(&global_rpl_thread_pool, + opt_slave_parallel_threads)) + return true; + + return false; +} + + +static Sys_var_ulong Sys_slave_parallel_threads( + "slave_parallel_threads", + "If non-zero, number of threads to spawn to apply in parallel events " + "on the slave that were group-committed on the master or were logged " + "with GTID in different replication domains.", + GLOBAL_VAR(opt_slave_parallel_threads), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0,16383), DEFAULT(0), BLOCK_SIZE(1), NO_MUTEX_GUARD, + NOT_IN_BINLOG, ON_CHECK(check_slave_parallel_threads), + ON_UPDATE(fix_slave_parallel_threads)); #endif |