summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2013-06-24 10:50:25 +0200
committerunknown <knielsen@knielsen-hq.org>2013-06-24 10:50:25 +0200
commit26a9fbc416cc8afaf2099ce293334e85c76b50cb (patch)
treea49b66fcdbc8775fca1b610019ab5dcc0094af20 /sql
parent6a0a4f00a1741df68c0d201e090f5d28f59410c8 (diff)
downloadmariadb-git-26a9fbc416cc8afaf2099ce293334e85c76b50cb.tar.gz
MDEV-4506: Parallel replication of group-committed transactions: Intermediate commit
First very rough sketch. We spawn and retire a pool of slave threads. Test main.alias works, most likely not much else does.
Diffstat (limited to 'sql')
-rw-r--r--sql/CMakeLists.txt2
-rw-r--r--sql/log.cc16
-rw-r--r--sql/log.h5
-rw-r--r--sql/log_event.cc55
-rw-r--r--sql/log_event.h13
-rw-r--r--sql/mysqld.cc19
-rw-r--r--sql/mysqld.h8
-rw-r--r--sql/rpl_parallel.cc509
-rw-r--r--sql/rpl_parallel.h74
-rw-r--r--sql/rpl_rli.h2
-rw-r--r--sql/share/errmsg-utf8.txt2
-rw-r--r--sql/slave.cc14
-rw-r--r--sql/slave.h4
-rw-r--r--sql/sys_vars.cc46
14 files changed, 739 insertions, 30 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt
index 36ab121cadf..2d7499c8b9e 100644
--- a/sql/CMakeLists.txt
+++ b/sql/CMakeLists.txt
@@ -88,7 +88,7 @@ SET (SQL_SOURCE
threadpool_common.cc
../sql-common/mysql_async.c
my_apc.cc my_apc.h
- rpl_gtid.cc
+ rpl_gtid.cc rpl_parallel.cc
${GEN_SOURCES}
${MYSYS_LIBWRAP_SOURCE}
)
diff --git a/sql/log.cc b/sql/log.cc
index 19fc3cc7b6f..d312f4bc936 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -5355,7 +5355,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
/* Generate a new global transaction ID, and write it to the binlog */
bool
MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
- bool is_transactional)
+ bool is_transactional, uint64 commit_id)
{
rpl_gtid gtid;
uint32 domain_id= thd->variables.gtid_domain_id;
@@ -5393,7 +5393,8 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
return true;
Gtid_log_event gtid_event(thd, seq_no, domain_id, standalone,
- LOG_EVENT_SUPPRESS_USE_F, is_transactional);
+ LOG_EVENT_SUPPRESS_USE_F, is_transactional,
+ commit_id);
/* Write the event to the binary log. */
if (gtid_event.write(&mysql_bin_log.log_file))
@@ -5651,7 +5652,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
my_org_b_tell= my_b_tell(file);
mysql_mutex_lock(&LOCK_log);
prev_binlog_id= current_binlog_id;
- if (write_gtid_event(thd, true, using_trans))
+ if (write_gtid_event(thd, true, using_trans, 0))
goto err;
}
else
@@ -6667,6 +6668,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
group_commit_entry *queue= NULL;
bool check_purge= false;
ulong binlog_id;
+ uint64 commit_id;
DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_leader");
LINT_INIT(binlog_id);
@@ -6701,6 +6703,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
DBUG_ASSERT(is_open());
if (likely(is_open())) // Should always be true
{
+ commit_id= (last_in_queue == leader ? 0 : (uint64)leader->thd->query_id);
/*
Commit every transaction in the queue.
@@ -6721,7 +6724,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
*/
DBUG_ASSERT(!cache_mngr->stmt_cache.empty() || !cache_mngr->trx_cache.empty());
- if ((current->error= write_transaction_or_stmt(current)))
+ if ((current->error= write_transaction_or_stmt(current, commit_id)))
current->commit_errno= errno;
strmake_buf(cache_mngr->last_commit_pos_file, log_file_name);
@@ -6896,11 +6899,12 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
int
-MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry)
+MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
+ uint64 commit_id)
{
binlog_cache_mngr *mngr= entry->cache_mngr;
- if (write_gtid_event(entry->thd, false, entry->using_trx_cache))
+ if (write_gtid_event(entry->thd, false, entry->using_trx_cache, commit_id))
return ER_ERROR_ON_WRITE;
if (entry->using_stmt_cache && !mngr->stmt_cache.empty() &&
diff --git a/sql/log.h b/sql/log.h
index 018ac64eff7..0b1344aa523 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -525,7 +525,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
int new_file_impl(bool need_lock);
void do_checkpoint_request(ulong binlog_id);
void purge();
- int write_transaction_or_stmt(group_commit_entry *entry);
+ int write_transaction_or_stmt(group_commit_entry *entry, uint64 commit_id);
bool write_transaction_to_binlog_events(group_commit_entry *entry);
void trx_group_commit_leader(group_commit_entry *leader);
bool is_xidlist_idle_nolock();
@@ -775,7 +775,8 @@ public:
inline uint32 get_open_count() { return open_count; }
void set_status_variables(THD *thd);
bool is_xidlist_idle();
- bool write_gtid_event(THD *thd, bool standalone, bool is_transactional);
+ bool write_gtid_event(THD *thd, bool standalone, bool is_transactional,
+ uint64 commit_id);
int read_state_from_file();
int write_state_to_file();
int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size);
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 3076cbb1766..431f8b47f2d 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -6101,6 +6101,18 @@ Gtid_log_event::Gtid_log_event(const char *buf, uint event_len,
domain_id= uint4korr(buf);
buf+= 4;
flags2= *buf;
+ if (flags2 & FL_GROUP_COMMIT_ID)
+ {
+ if (event_len < (uint)header_size + GTID_HEADER_LEN + 2)
+ {
+ seq_no= 0; // So is_valid() returns false
+ return;
+ }
+ ++buf;
+ commit_id= uint8korr(buf);
+ }
+ else
+ commit_id= 0;
}
@@ -6108,10 +6120,11 @@ Gtid_log_event::Gtid_log_event(const char *buf, uint event_len,
Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
uint32 domain_id_arg, bool standalone,
- uint16 flags_arg, bool is_transactional)
+ uint16 flags_arg, bool is_transactional,
+ uint64 commit_id_arg)
: Log_event(thd_arg, flags_arg, is_transactional),
- seq_no(seq_no_arg), domain_id(domain_id_arg),
- flags2(standalone ? FL_STANDALONE : 0)
+ seq_no(seq_no_arg), commit_id(commit_id_arg), domain_id(domain_id_arg),
+ flags2((standalone ? FL_STANDALONE : 0) | (commit_id_arg ? FL_GROUP_COMMIT_ID : 0))
{
cache_type= Log_event::EVENT_NO_CACHE;
}
@@ -6156,13 +6169,24 @@ Gtid_log_event::peek(const char *event_start, size_t event_len,
bool
Gtid_log_event::write(IO_CACHE *file)
{
- uchar buf[GTID_HEADER_LEN];
+ uchar buf[GTID_HEADER_LEN+2];
+ size_t write_len;
+
int8store(buf, seq_no);
int4store(buf+8, domain_id);
buf[12]= flags2;
- bzero(buf+13, GTID_HEADER_LEN-13);
- return write_header(file, GTID_HEADER_LEN) ||
- wrapper_my_b_safe_write(file, buf, GTID_HEADER_LEN) ||
+ if (flags2 & FL_GROUP_COMMIT_ID)
+ {
+ int8store(buf+13, commit_id);
+ write_len= GTID_HEADER_LEN + 2;
+ }
+ else
+ {
+ bzero(buf+13, GTID_HEADER_LEN-13);
+ write_len= GTID_HEADER_LEN;
+ }
+ return write_header(file, write_len) ||
+ wrapper_my_b_safe_write(file, buf, write_len) ||
write_footer(file);
}
@@ -6201,7 +6225,7 @@ Gtid_log_event::make_compatible_event(String *packet, bool *need_dummy_event,
void
Gtid_log_event::pack_info(THD *thd, Protocol *protocol)
{
- char buf[6+5+10+1+10+1+20+1];
+ char buf[6+5+10+1+10+1+20+1+4+20+1];
char *p;
p = strmov(buf, (flags2 & FL_STANDALONE ? "GTID " : "BEGIN GTID "));
p= longlong10_to_str(domain_id, p, 10);
@@ -6209,6 +6233,11 @@ Gtid_log_event::pack_info(THD *thd, Protocol *protocol)
p= longlong10_to_str(server_id, p, 10);
*p++= '-';
p= longlong10_to_str(seq_no, p, 10);
+ if (flags2 & FL_GROUP_COMMIT_ID)
+ {
+ p= strmov(p, " cid=");
+ p= longlong10_to_str(commit_id, p, 10);
+ }
protocol->store(buf, p-buf, &my_charset_bin);
}
@@ -6295,12 +6324,20 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
Write_on_release_cache cache(&print_event_info->head_cache, file,
Write_on_release_cache::FLUSH_F);
char buf[21];
+ char buf2[21];
if (!print_event_info->short_form)
{
print_header(&cache, print_event_info, FALSE);
longlong10_to_str(seq_no, buf, 10);
- my_b_printf(&cache, "\tGTID %u-%u-%s\n", domain_id, server_id, buf);
+ if (flags2 & FL_GROUP_COMMIT_ID)
+ {
+ longlong10_to_str(commit_id, buf2, 10);
+ my_b_printf(&cache, "\tGTID %u-%u-%s cid=%s\n",
+ domain_id, server_id, buf, buf2);
+ }
+ else
+ my_b_printf(&cache, "\tGTID %u-%u-%s\n", domain_id, server_id, buf);
if (!print_event_info->domain_id_printed ||
print_event_info->domain_id != domain_id)
diff --git a/sql/log_event.h b/sql/log_event.h
index b54e2028ef2..641ab3e37b7 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -3080,6 +3080,7 @@ class Gtid_log_event: public Log_event
{
public:
uint64 seq_no;
+ uint64 commit_id;
uint32 domain_id;
uchar flags2;
@@ -3087,10 +3088,15 @@ public:
/* FL_STANDALONE is set when there is no terminating COMMIT event. */
static const uchar FL_STANDALONE= 1;
+ /*
+ FL_GROUP_COMMIT_ID is set when event group is part of a group commit on the
+ master. Groups with same commit_id are part of the same group commit.
+ */
+ static const uchar FL_GROUP_COMMIT_ID= 2;
#ifdef MYSQL_SERVER
Gtid_log_event(THD *thd_arg, uint64 seq_no, uint32 domain_id, bool standalone,
- uint16 flags, bool is_transactional);
+ uint16 flags, bool is_transactional, uint64 commit_id);
#ifdef HAVE_REPLICATION
void pack_info(THD *thd, Protocol *protocol);
virtual int do_apply_event(Relay_log_info const *rli);
@@ -3104,7 +3110,10 @@ public:
const Format_description_log_event *description_event);
~Gtid_log_event() { }
Log_event_type get_type_code() { return GTID_EVENT; }
- int get_data_size() { return GTID_HEADER_LEN; }
+ int get_data_size()
+ {
+ return GTID_HEADER_LEN + ((flags2 & FL_GROUP_COMMIT_ID) ? 2 : 0);
+ }
bool is_valid() const { return seq_no != 0; }
#ifdef MYSQL_SERVER
bool write(IO_CACHE *file);
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index d5af1634a8a..4e2679f1c91 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -543,6 +543,8 @@ ulong rpl_recovery_rank=0;
*/
ulong stored_program_cache_size= 0;
+ulong opt_slave_parallel_threads= 0;
+
const double log_10[] = {
1e000, 1e001, 1e002, 1e003, 1e004, 1e005, 1e006, 1e007, 1e008, 1e009,
1e010, 1e011, 1e012, 1e013, 1e014, 1e015, 1e016, 1e017, 1e018, 1e019,
@@ -769,7 +771,8 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_LOCK_thread_count, key_LOCK_thread_cache,
key_PARTITION_LOCK_auto_inc;
PSI_mutex_key key_RELAYLOG_LOCK_index;
-PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
+PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
+ key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool;
PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
@@ -844,7 +847,9 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_thread_cache, "LOCK_thread_cache", PSI_FLAG_GLOBAL},
{ &key_PARTITION_LOCK_auto_inc, "HA_DATA_PARTITION::LOCK_auto_inc", 0},
{ &key_LOCK_slave_state, "LOCK_slave_state", 0},
- { &key_LOCK_binlog_state, "LOCK_binlog_state", 0}
+ { &key_LOCK_binlog_state, "LOCK_binlog_state", 0},
+ { &key_LOCK_rpl_thread, "LOCK_rpl_thread", 0},
+ { &key_LOCK_rpl_thread_pool, "LOCK_rpl_thread_pool", 0}
};
PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
@@ -886,6 +891,7 @@ PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond,
PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready;
PSI_cond_key key_RELAYLOG_COND_queue_busy;
PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
+PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool;
static PSI_cond_info all_server_conds[]=
{
@@ -926,13 +932,15 @@ static PSI_cond_info all_server_conds[]=
{ &key_user_level_lock_cond, "User_level_lock::cond", 0},
{ &key_COND_thread_count, "COND_thread_count", PSI_FLAG_GLOBAL},
{ &key_COND_thread_cache, "COND_thread_cache", PSI_FLAG_GLOBAL},
- { &key_COND_flush_thread_cache, "COND_flush_thread_cache", PSI_FLAG_GLOBAL}
+ { &key_COND_flush_thread_cache, "COND_flush_thread_cache", PSI_FLAG_GLOBAL},
+ { &key_COND_rpl_thread, "COND_rpl_thread", 0},
+ { &key_COND_rpl_thread_pool, "COND_rpl_thread_pool", 0}
};
PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_main,
key_thread_one_connection, key_thread_signal_hand,
- key_thread_slave_init;
+ key_thread_slave_init, key_rpl_parallel_thread;
static PSI_thread_info all_server_threads[]=
{
@@ -958,7 +966,8 @@ static PSI_thread_info all_server_threads[]=
{ &key_thread_main, "main", PSI_FLAG_GLOBAL},
{ &key_thread_one_connection, "one_connection", 0},
{ &key_thread_signal_hand, "signal_handler", PSI_FLAG_GLOBAL},
- { &key_thread_slave_init, "slave_init", PSI_FLAG_GLOBAL}
+ { &key_thread_slave_init, "slave_init", PSI_FLAG_GLOBAL},
+ { &key_rpl_parallel_thread, "rpl_parallel_thread", 0}
};
PSI_file_key key_file_binlog, key_file_binlog_index, key_file_casetest,
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 02d6b41cf69..ff2dfffa991 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -176,6 +176,7 @@ extern ulong slave_max_allowed_packet;
extern ulong opt_binlog_rows_event_max_size;
extern ulong rpl_recovery_rank, thread_cache_size;
extern ulong stored_program_cache_size;
+extern ulong opt_slave_parallel_threads;
extern ulong back_log;
extern ulong executed_events;
extern char language[FN_REFLEN];
@@ -247,7 +248,8 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data,
key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc;
extern PSI_mutex_key key_RELAYLOG_LOCK_index;
-extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
+extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
+ key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool;
extern PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
@@ -280,10 +282,12 @@ extern PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond,
extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready;
extern PSI_cond_key key_RELAYLOG_COND_queue_busy;
extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
+extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool;
extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_kill_server, key_thread_main,
- key_thread_one_connection, key_thread_signal_hand, key_thread_slave_init;
+ key_thread_one_connection, key_thread_signal_hand, key_thread_slave_init,
+ key_rpl_parallel_thread;
extern PSI_file_key key_file_binlog, key_file_binlog_index, key_file_casetest,
key_file_dbopt, key_file_des_key_file, key_file_ERRMSG, key_select_to_file,
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
new file mode 100644
index 00000000000..a4a87c1e92e
--- /dev/null
+++ b/sql/rpl_parallel.cc
@@ -0,0 +1,509 @@
+#include "my_global.h"
+#include "rpl_parallel.h"
+#include "slave.h"
+#include "rpl_mi.h"
+
+
+struct rpl_parallel_thread_pool global_rpl_thread_pool;
+
+
+static void
+rpt_handle_event(rpl_parallel_thread::queued_event *qev,
+ THD *thd,
+ struct rpl_parallel_thread *rpt)
+{
+ int err;
+
+ /* ToDo: Access to thd, and what about rli, split out a parallel part? */
+ err= apply_event_and_update_pos(qev->ev, thd, qev->rli, rpt);
+ /* ToDo: error handling. */
+ /* ToDo: also free qev->ev, or hold on to it for a bit if necessary. */
+}
+
+
+pthread_handler_t
+handle_rpl_parallel_thread(void *arg)
+{
+ THD *thd;
+ const char* old_msg;
+ struct rpl_parallel_thread::queued_event *events;
+ bool group_standalone= true;
+ bool in_event_group= false;
+
+ struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg;
+
+ my_thread_init();
+ thd = new THD;
+ thd->thread_stack = (char*)&thd;
+ mysql_mutex_lock(&LOCK_thread_count);
+ thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
+ threads.append(thd);
+ mysql_mutex_unlock(&LOCK_thread_count);
+ set_current_thd(thd);
+ pthread_detach_this_thread();
+ thd->init_for_queries();
+ thd->variables.binlog_annotate_row_events= 0;
+ init_thr_lock();
+ thd->store_globals();
+ thd->system_thread= SYSTEM_THREAD_SLAVE_SQL;
+ thd->security_ctx->skip_grants();
+ thd->variables.max_allowed_packet= slave_max_allowed_packet;
+ thd->slave_thread= 1;
+ thd->enable_slow_log= opt_log_slow_slave_statements;
+ thd->variables.log_slow_filter= global_system_variables.log_slow_filter;
+ set_slave_thread_options(thd);
+ thd->client_capabilities = CLIENT_LOCAL_FILES;
+ thd_proc_info(thd, "Waiting for work from main SQL threads");
+ thd->set_time();
+ thd->variables.lock_wait_timeout= LONG_TIMEOUT;
+
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ rpt->thd= thd;
+
+ while (rpt->delay_start)
+ mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
+
+ rpt->running= true;
+
+ while (!rpt->stop && !thd->killed)
+ {
+ rpl_parallel_thread *list;
+
+ old_msg= thd->proc_info;
+ thd->enter_cond(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread,
+ "Waiting for work from SQL thread");
+ while (!rpt->stop && !thd->killed && !(events= rpt->event_queue))
+ mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
+ rpt->free= false;
+ rpt->event_queue= rpt->last_in_queue= NULL;
+ thd->exit_cond(old_msg);
+
+ more_events:
+ while (events)
+ {
+ struct rpl_parallel_thread::queued_event *next= events->next;
+ Log_event_type event_type= events->ev->get_type_code();
+ if (event_type == GTID_EVENT)
+ {
+ group_standalone=
+ (0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 &
+ Gtid_log_event::FL_STANDALONE));
+ in_event_group= true;
+ }
+ else
+ {
+ if (group_standalone)
+ {
+ if (!Log_event::is_part_of_group(event_type))
+ in_event_group= false;
+ }
+ else if (event_type == XID_EVENT)
+ in_event_group= false;
+ else if (event_type == QUERY_EVENT)
+ {
+ Query_log_event *query= static_cast<Query_log_event *>(events->ev);
+ if (!strcmp("COMMIT", query->query) ||
+ !strcmp("ROLLBACK", query->query))
+ in_event_group= false;
+ }
+ }
+ rpt_handle_event(events, thd, rpt);
+ free(events);
+ events= next;
+ }
+
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ if ((events= rpt->event_queue) != NULL)
+ {
+ rpt->event_queue= rpt->last_in_queue= NULL;
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ goto more_events;
+ }
+
+ if (!in_event_group)
+ {
+ rpt->current_entry= NULL;
+ if (!rpt->free)
+ {
+ mysql_mutex_lock(&rpt->pool->LOCK_rpl_thread_pool);
+ list= rpt->pool->free_list;
+ rpt->next= list;
+ rpt->pool->free_list= list;
+ if (!list)
+ mysql_cond_signal(&rpt->pool->COND_rpl_thread_pool);
+ mysql_mutex_unlock(&rpt->pool->LOCK_rpl_thread_pool);
+ rpt->free= true;
+ }
+ }
+ }
+
+ rpt->running= false;
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+
+ return NULL;
+}
+
+
+int
+rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
+ uint32 new_count, bool skip_check)
+{
+ uint32 i;
+ rpl_parallel_thread **new_list= NULL;
+ rpl_parallel_thread *new_free_list= NULL;
+
+ /*
+ Allocate the new list of threads up-front.
+ That way, if we fail half-way, we only need to free whatever we managed
+ to allocate, and will not be left with a half-functional thread pool.
+ */
+ if (new_count &&
+ !(new_list= (rpl_parallel_thread **)my_malloc(new_count*sizeof(*new_list),
+ MYF(MY_WME))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int(new_count*sizeof(*new_list))));
+ goto err;;
+ }
+
+ for (i= 0; i < new_count; ++i)
+ {
+ pthread_t th;
+
+ if (!(new_list[i]= (rpl_parallel_thread *)my_malloc(sizeof(*(new_list[i])),
+ MYF(MY_WME))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*(new_list[i])));
+ goto err;
+ }
+ new_list[i]->delay_start= true;
+ new_list[i]->running= false;
+ new_list[i]->stop= false;
+ new_list[i]->free= false;
+ mysql_mutex_init(key_LOCK_rpl_thread, &new_list[i]->LOCK_rpl_thread,
+ MY_MUTEX_INIT_SLOW);
+ mysql_cond_init(key_COND_rpl_thread, &new_list[i]->COND_rpl_thread, NULL);
+ new_list[i]->pool= pool;
+ new_list[i]->current_entry= NULL;
+ new_list[i]->event_queue= NULL;
+ new_list[i]->last_in_queue= NULL;
+ if (mysql_thread_create(key_rpl_parallel_thread, &th, NULL,
+ handle_rpl_parallel_thread, new_list[i]))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ my_free(new_list[i]);
+ goto err;
+ }
+ new_list[i]->next= new_free_list;
+ new_free_list= new_list[i];
+ }
+
+ if (!skip_check)
+ {
+ mysql_mutex_lock(&LOCK_active_mi);
+ if (master_info_index->give_error_if_slave_running())
+ {
+ mysql_mutex_unlock(&LOCK_active_mi);
+ goto err;
+ }
+ if (pool->changing)
+ {
+ mysql_mutex_unlock(&LOCK_active_mi);
+ my_error(ER_CHANGE_SLAVE_PARALLEL_THREADS_ACTIVE, MYF(0));
+ goto err;
+ }
+ pool->changing= true;
+ mysql_mutex_unlock(&LOCK_active_mi);
+ }
+
+ for (i= 0; i < pool->count; ++i)
+ {
+ rpl_parallel_thread *rpt= pool->get_thread(NULL);
+ rpt->stop= true;
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ }
+
+ for (i= 0; i < pool->count; ++i)
+ {
+ rpl_parallel_thread *rpt= pool->threads[i];
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ while (rpt->running)
+ mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ delete rpt;
+ }
+
+ my_free(pool->threads);
+ pool->threads= new_list;
+ pool->free_list= new_free_list;
+ pool->count= new_count;
+ for (i= 0; i < pool->count; ++i)
+ {
+ mysql_mutex_lock(&pool->threads[i]->LOCK_rpl_thread);
+ pool->threads[i]->delay_start= false;
+ mysql_cond_signal(&pool->threads[i]->COND_rpl_thread);
+ mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread);
+ }
+
+ if (!skip_check)
+ {
+ mysql_mutex_lock(&LOCK_active_mi);
+ pool->changing= false;
+ mysql_mutex_unlock(&LOCK_active_mi);
+ }
+ return 0;
+
+err:
+ if (new_list)
+ {
+ while (new_free_list)
+ {
+ rpl_parallel_thread *next= new_free_list->next;
+ mysql_mutex_lock(&new_free_list->LOCK_rpl_thread);
+ new_free_list->delay_start= false;
+ new_free_list->stop= true;
+ while (!new_free_list->running)
+ mysql_cond_wait(&new_free_list->COND_rpl_thread,
+ &new_free_list->LOCK_rpl_thread);
+ while (new_free_list->running)
+ mysql_cond_wait(&new_free_list->COND_rpl_thread,
+ &new_free_list->LOCK_rpl_thread);
+ my_free(new_free_list);
+ new_free_list= next;
+ }
+ my_free(new_list);
+ }
+ if (!skip_check)
+ {
+ mysql_mutex_lock(&LOCK_active_mi);
+ pool->changing= false;
+ mysql_mutex_unlock(&LOCK_active_mi);
+ }
+ return 1;
+}
+
+
+rpl_parallel_thread_pool::rpl_parallel_thread_pool()
+ : count(0), threads(0), free_list(0), changing(false), inited(false)
+{
+}
+
+
+int
+rpl_parallel_thread_pool::init(uint32 size)
+{
+ count= 0;
+ threads= NULL;
+ free_list= NULL;
+
+ mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool,
+ MY_MUTEX_INIT_SLOW);
+ mysql_cond_init(key_COND_rpl_thread_pool, &COND_rpl_thread_pool, NULL);
+ changing= false;
+ inited= true;
+
+ return rpl_parallel_change_thread_count(this, size, true);
+}
+
+
+void
+rpl_parallel_thread_pool::destroy()
+{
+ if (!inited)
+ return;
+ rpl_parallel_change_thread_count(this, 0, true);
+ mysql_mutex_destroy(&LOCK_rpl_thread_pool);
+ mysql_cond_destroy(&COND_rpl_thread_pool);
+}
+
+
+struct rpl_parallel_thread *
+rpl_parallel_thread_pool::get_thread(rpl_parallel_entry *entry)
+{
+ rpl_parallel_thread *rpt;
+
+ mysql_mutex_lock(&LOCK_rpl_thread_pool);
+ while ((rpt= free_list) == NULL)
+ mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool);
+ free_list= rpt->next;
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ mysql_mutex_unlock(&LOCK_rpl_thread_pool);
+ rpt->current_entry= entry;
+
+ return rpt;
+}
+
+
+rpl_parallel::rpl_parallel() :
+ current(NULL)
+{
+ my_hash_init(&domain_hash, &my_charset_bin, 32,
+ offsetof(rpl_parallel_entry, domain_id), sizeof(uint32),
+ NULL, NULL, HASH_UNIQUE);
+}
+
+
+rpl_parallel::~rpl_parallel()
+{
+ my_hash_free(&domain_hash);
+}
+
+
+rpl_parallel_entry *
+rpl_parallel::find(uint32 domain_id)
+{
+ struct rpl_parallel_entry *e;
+
+ if (!(e= (rpl_parallel_entry *)my_hash_search(&domain_hash,
+ (const uchar *)&domain_id, 0)))
+ {
+ /* Allocate a new, empty one. */
+ if (!(e= (struct rpl_parallel_entry *)my_malloc(sizeof(*e), MYF(0))))
+ return NULL;
+ e->domain_id= domain_id;
+ e->last_server_id= 0;
+ e->last_seq_no= 0;
+ e->last_commit_id= 0;
+ e->active= false;
+ e->rpl_thread= NULL;
+ if (my_hash_insert(&domain_hash, (uchar *)e))
+ {
+ my_free(e);
+ return NULL;
+ }
+ }
+
+ return e;
+}
+
+
+bool
+rpl_parallel::do_event(Relay_log_info *rli, Log_event *ev, THD *parent_thd)
+{
+ rpl_parallel_entry *e;
+ rpl_parallel_thread *cur_thread;
+ rpl_parallel_thread::queued_event *qev;
+
+ if (!(qev= (rpl_parallel_thread::queued_event *)my_malloc(sizeof(*qev),
+ MYF(0))))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return true;
+ }
+ qev->ev= ev;
+ qev->rli= rli;
+ qev->next= NULL;
+
+ if (ev->get_type_code() == GTID_EVENT)
+ {
+ Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
+
+ if (!(e= find(gtid_ev->domain_id)))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return true;
+ }
+
+ /* Check if we already have a worker thread for this entry. */
+ cur_thread= e->rpl_thread;
+ if (cur_thread)
+ {
+ mysql_mutex_lock(&cur_thread->LOCK_rpl_thread);
+ if (cur_thread->current_entry != e)
+ {
+ /* Not ours anymore, we need to grab a new one. */
+ mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
+ e->rpl_thread= cur_thread= NULL;
+ }
+ }
+
+ if (!cur_thread)
+ {
+ /*
+ Nothing else is currently running in this domain. We can spawn a new
+ thread to do this event group in parallel with anything else that might
+ be running in other domains.
+ */
+ if (gtid_ev->flags & Gtid_log_event::FL_GROUP_COMMIT_ID)
+ {
+ e->last_server_id= gtid_ev->server_id;
+ e->last_seq_no= gtid_ev->seq_no;
+ e->last_commit_id= gtid_ev->commit_id;
+ }
+ else
+ {
+ e->last_server_id= 0;
+ e->last_seq_no= 0;
+ e->last_commit_id= 0;
+ }
+ cur_thread= e->rpl_thread= global_rpl_thread_pool.get_thread(e);
+ e->rpl_thread->wait_for= NULL; /* ToDo */
+ /* get_thread() returns with the LOCK_rpl_thread locked. */
+ }
+ else if ((gtid_ev->flags & Gtid_log_event::FL_GROUP_COMMIT_ID) &&
+ e->last_commit_id == gtid_ev->commit_id)
+ {
+ /*
+ We are already executing something else in this domain. But the two
+ event groups were committed together in the same group commit on the
+ master, so we can still do them in parallel here on the slave.
+
+ However, the commit of this event must wait for the commit of the prior
+ event, to preserve binlog commit order and visibility across all
+ servers in the replication hierarchy.
+ */
+ rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e);
+ rpt->wait_for= cur_thread; /* ToDo */
+ mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
+ e->rpl_thread= cur_thread= rpt;
+ /* get_thread() returns with the LOCK_rpl_thread locked. */
+ }
+ else
+ {
+ /*
+ We are still executing the previous event group for this replication
+ domain, and we have to wait for that to finish before we can start on
+ the next one. So just re-use the thread.
+ */
+ }
+
+ current= e;
+ }
+ else
+ {
+ if (!current)
+ {
+ /* We have no domain_id yet, just run non-parallel. */
+ rpt_handle_event(qev, parent_thd, NULL);
+ return false;
+ }
+ cur_thread= current->rpl_thread;
+ if (cur_thread)
+ {
+ mysql_mutex_lock(&cur_thread->LOCK_rpl_thread);
+ if (cur_thread->current_entry != current)
+ {
+ /* Not ours anymore, we need to grab a new one. */
+ mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
+ cur_thread= NULL;
+ }
+ }
+ if (!cur_thread)
+ {
+ cur_thread= current->rpl_thread=
+ global_rpl_thread_pool.get_thread(current);
+ cur_thread->wait_for= NULL; /* ToDo */
+ }
+ }
+ /*
+ Queue the event for processing.
+ */
+ if (cur_thread->last_in_queue)
+ cur_thread->last_in_queue->next= qev;
+ else
+ cur_thread->event_queue= qev;
+ cur_thread->last_in_queue= qev;
+ mysql_cond_signal(&cur_thread->COND_rpl_thread);
+ mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
+
+ return false;
+}
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
new file mode 100644
index 00000000000..7e966f1615c
--- /dev/null
+++ b/sql/rpl_parallel.h
@@ -0,0 +1,74 @@
+#ifndef RPL_PARALLEL_H
+#define RPL_PARALLEL_H
+
+#include "log_event.h"
+
+
+struct rpl_parallel;
+struct rpl_parallel_entry;
+struct rpl_parallel_thread_pool;
+
+class Relay_log_info;
+struct rpl_parallel_thread {
+ bool delay_start;
+ bool running;
+ bool stop;
+ bool free;
+ mysql_mutex_t LOCK_rpl_thread;
+ mysql_cond_t COND_rpl_thread;
+ struct rpl_parallel_thread *next; /* For free list. */
+ struct rpl_parallel_thread_pool *pool;
+ THD *thd;
+ struct rpl_parallel_entry *current_entry;
+ struct queued_event {
+ queued_event *next;
+ Log_event *ev;
+ Relay_log_info *rli;
+ } *event_queue, *last_in_queue;
+ rpl_parallel_thread *wait_for; /* ToDo: change this ... */
+};
+
+
+struct rpl_parallel_thread_pool {
+ uint32 count;
+ struct rpl_parallel_thread **threads;
+ struct rpl_parallel_thread *free_list;
+ mysql_mutex_t LOCK_rpl_thread_pool;
+ mysql_cond_t COND_rpl_thread_pool;
+ bool changing;
+ bool inited;
+
+ rpl_parallel_thread_pool();
+ int init(uint32 size);
+ void destroy();
+ struct rpl_parallel_thread *get_thread(rpl_parallel_entry *entry);
+};
+
+
+struct rpl_parallel_entry {
+ uint32 domain_id;
+ uint32 last_server_id;
+ uint64 last_seq_no;
+ uint64 last_commit_id;
+ bool active;
+ rpl_parallel_thread *rpl_thread;
+};
+struct rpl_parallel {
+ HASH domain_hash;
+ rpl_parallel_entry *current;
+
+ rpl_parallel();
+ ~rpl_parallel();
+ rpl_parallel_entry *find(uint32 domain_id);
+ bool do_event(Relay_log_info *rli, Log_event *ev, THD *thd);
+};
+
+
+extern struct rpl_parallel_thread_pool global_rpl_thread_pool;
+
+
+extern int rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
+ uint32 new_count,
+ bool skip_check= false);
+
+#endif /* RPL_PARALLEL_H */
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 6dd757343fd..452457e9e5a 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -22,6 +22,7 @@
#include "log.h" /* LOG_INFO, MYSQL_BIN_LOG */
#include "sql_class.h" /* THD */
#include "log_event.h"
+#include "rpl_parallel.h"
struct RPL_TABLE_LIST;
class Master_info;
@@ -318,6 +319,7 @@ public:
*/
uint64 gtid_sub_id;
rpl_gtid current_gtid;
+ rpl_parallel parallel;
Relay_log_info(bool is_slave_recovery);
~Relay_log_info();
diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt
index 48809417a6c..85baddd3c49 100644
--- a/sql/share/errmsg-utf8.txt
+++ b/sql/share/errmsg-utf8.txt
@@ -6555,3 +6555,5 @@ ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_GTID_DOMAIN_ID_SEQ_NO
eng "Cannot modify @@session.gtid_domain_id or @@session.gtid_seq_no inside a transaction"
ER_STORED_FUNCTION_PREVENTS_SWITCH_GTID_DOMAIN_ID_SEQ_NO
eng "Cannot modify @@session.gtid_domain_id or @@session.gtid_seq_no inside a stored function or trigger"
+ER_CHANGE_SLAVE_PARALLEL_THREADS_ACTIVE
+ eng "Cannot change @@slave_parallel_threads while another change is in progress"
diff --git a/sql/slave.cc b/sql/slave.cc
index 1734b2c4f76..419fa579a09 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -57,6 +57,8 @@
#include "rpl_tblmap.h"
#include "debug_sync.h"
+#include "rpl_parallel.h"
+
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
@@ -360,6 +362,9 @@ int init_slave()
goto err;
}
+ if (global_rpl_thread_pool.init(opt_slave_parallel_threads))
+ return 1;
+
/*
If --slave-skip-errors=... was not used, the string value for the
system variable has not been set up yet. Do it now.
@@ -947,6 +952,7 @@ void end_slave()
master_info_index= 0;
active_mi= 0;
mysql_mutex_unlock(&LOCK_active_mi);
+ global_rpl_thread_pool.destroy();
free_all_rpl_filters();
DBUG_VOID_RETURN;
}
@@ -3012,7 +3018,8 @@ static int has_temporary_error(THD *thd)
@retval 2 No error calling ev->apply_event(), but error calling
ev->update_pos().
*/
-int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli)
+int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli,
+ rpl_parallel_thread *rpt)
{
int exec_res= 0;
@@ -3234,7 +3241,10 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
};);
}
- exec_res= apply_event_and_update_pos(ev, thd, rli);
+ if (opt_slave_parallel_threads > 0)
+ DBUG_RETURN(rli->parallel.do_event(rli, ev, thd));
+
+ exec_res= apply_event_and_update_pos(ev, thd, rli, NULL);
switch (ev->get_type_code()) {
case FORMAT_DESCRIPTION_EVENT:
diff --git a/sql/slave.h b/sql/slave.h
index 565f40b7236..69b0e011a39 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -51,6 +51,7 @@
class Relay_log_info;
class Master_info;
class Master_info_index;
+struct rpl_parallel_thread;
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
@@ -227,7 +228,8 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
void set_slave_thread_options(THD* thd);
void set_slave_thread_default_charset(THD *thd, Relay_log_info const *rli);
int rotate_relay_log(Master_info* mi);
-int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli);
+int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli,
+ rpl_parallel_thread *rpt);
pthread_handler_t handle_slave_io(void *arg);
pthread_handler_t handle_slave_sql(void *arg);
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 4066a04aea7..f63960a4e36 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -57,6 +57,7 @@
#include "threadpool.h"
#include "sql_repl.h"
#include "opt_range.h"
+#include "rpl_parallel.h"
/*
The rule for this file: everything should be 'static'. When a sys_var
@@ -1434,6 +1435,51 @@ static Sys_var_mybool Sys_gtid_strict_mode(
"generate an out-of-order binlog if executed.",
GLOBAL_VAR(opt_gtid_strict_mode),
CMD_LINE(OPT_ARG), DEFAULT(FALSE));
+
+
+static bool
+check_slave_parallel_threads(sys_var *self, THD *thd, set_var *var)
+{
+ bool running;
+
+ mysql_mutex_unlock(&LOCK_global_system_variables);
+ mysql_mutex_lock(&LOCK_active_mi);
+ running= master_info_index->give_error_if_slave_running();
+ mysql_mutex_unlock(&LOCK_active_mi);
+ mysql_mutex_lock(&LOCK_global_system_variables);
+ if (running)
+ return true;
+
+ return false;
+}
+
+static bool
+fix_slave_parallel_threads(sys_var *self, THD *thd, enum_var_type type)
+{
+ bool running;
+
+ mysql_mutex_unlock(&LOCK_global_system_variables);
+ mysql_mutex_lock(&LOCK_active_mi);
+ running= master_info_index->give_error_if_slave_running();
+ mysql_mutex_unlock(&LOCK_active_mi);
+ mysql_mutex_lock(&LOCK_global_system_variables);
+ if (running || rpl_parallel_change_thread_count(&global_rpl_thread_pool,
+ opt_slave_parallel_threads))
+ return true;
+
+ return false;
+}
+
+
+static Sys_var_ulong Sys_slave_parallel_threads(
+ "slave_parallel_threads",
+ "If non-zero, number of threads to spawn to apply in parallel events "
+ "on the slave that were group-committed on the master or were logged "
+ "with GTID in different replication domains.",
+ GLOBAL_VAR(opt_slave_parallel_threads), CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(0,16383), DEFAULT(0), BLOCK_SIZE(1), NO_MUTEX_GUARD,
+ NOT_IN_BINLOG, ON_CHECK(check_slave_parallel_threads),
+ ON_UPDATE(fix_slave_parallel_threads));
#endif