diff options
-rw-r--r-- | sql/log.cc | 280 | ||||
-rw-r--r-- | sql/log.h | 2 | ||||
-rw-r--r-- | sql/log_event.cc | 72 | ||||
-rw-r--r-- | sql/log_event.h | 74 | ||||
-rw-r--r-- | sql/log_event_old.cc | 6 | ||||
-rw-r--r-- | sql/log_event_old.h | 12 | ||||
-rw-r--r-- | sql/rpl_gtid.cc | 2 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 15 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 6 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 8 | ||||
-rw-r--r-- | sql/rpl_rli.h | 14 | ||||
-rw-r--r-- | sql/rpl_utility.cc | 2 | ||||
-rw-r--r-- | sql/slave.cc | 2 | ||||
-rw-r--r-- | sql/sql_binlog.cc | 2 | ||||
-rw-r--r-- | sql/sql_class.cc | 7 | ||||
-rw-r--r-- | sql/sql_class.h | 2 |
16 files changed, 313 insertions, 193 deletions
diff --git a/sql/log.cc b/sql/log.cc index 61d4428fc18..763eb4177ea 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -6542,26 +6542,87 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, } } + +/* + Put a transaction that is ready to commit in the group commit queue. + The transaction is identified by the ENTRY object passed into this function. + + To facilitate group commit for the binlog, we first queue up ourselves in + this function. Then later the first thread to enter the queue waits for + the LOCK_log mutex, and commits for everyone in the queue once it gets the + lock. Any other threads in the queue just wait for the first one to finish + the commit and wake them up. This way, all transactions in the queue get + committed in a single disk operation. + + The return value of this function is TRUE if queued as the first entry in + the queue (meaning this is the leader), FALSE otherwise. + + The main work in this function is when the commit in one transaction has + been marked to wait for the commit of another transaction to happen + first. This is used to support in-order parallel replication, where + transactions can execute out-of-order but need to be committed in-order with + how they happened on the master. The waiting of one commit on another needs + to be integrated with the group commit queue, to ensure that the waiting + transaction can participate in the same group commit as the waited-for + transaction. + + So when we put a transaction in the queue, we check if there were other + transactions already prepared to commit but just waiting for the first one + to commit. If so, we add those to the queue as well, transitively for all + waiters. +*/ + bool -MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry, - wait_for_commit *wfc) +MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry) { group_commit_entry *orig_queue; wait_for_commit *list, *cur, *last; + wait_for_commit *wfc; /* - To facilitate group commit for the binlog, we first queue up ourselves in - the group commit queue. Then the first thread to enter the queue waits for - the LOCK_log mutex, and commits for everyone in the queue once it gets the - lock. Any other threads in the queue just wait for the first one to finish - the commit and wake them up. - - To support in-order parallel replication with group commit, after we add - some transaction to the queue, we check if there were other transactions - already prepared to commit but just waiting for the first one to commit. - If so, we add those to the queue as well, transitively for all waiters. + Check if we need to wait for another transaction to commit before us. + + It is safe to do a quick check without lock first in the case where we do + not have to wait. But if the quick check shows we need to wait, we must do + another safe check under lock, to avoid the race where the other + transaction wakes us up between the check and the wait. */ + wfc= entry->thd->wait_for_commit_ptr; + entry->queued_by_other= false; + if (wfc && wfc->waiting_for_commit) + { + mysql_mutex_lock(&wfc->LOCK_wait_commit); + /* Do an extra check here, this time safely under lock. */ + if (wfc->waiting_for_commit) + { + /* + By setting wfc->opaque_pointer to our own entry, we mark that we are + ready to commit, but waiting for another transaction to commit before + us. + + This other transaction may then take over the commit process for us to + get us included in its own group commit. If this happens, the + queued_by_other flag is set. + */ + wfc->opaque_pointer= entry; + do + { + mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit); + } while (wfc->waiting_for_commit); + wfc->opaque_pointer= NULL; + } + mysql_mutex_unlock(&wfc->LOCK_wait_commit); + } + /* + If the transaction we were waiting for has already put us into the group + commit queue (and possibly already done the entire binlog commit for us), + then there is nothing else to do. + */ + if (entry->queued_by_other) + return false; + + /* Now enqueue ourselves in the group commit queue. */ entry->thd->clear_wakeup_ready(); mysql_mutex_lock(&LOCK_prepare_ordered); orig_queue= group_commit_queue; @@ -6574,6 +6635,23 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry, This would be natural to do with recursion, but we want to avoid potentially unbounded recursion blowing the C stack, so we use the list approach instead. + + We keep a list of all the waiters that need to be processed in `list', + linked through the next_subsequent_commit pointer. Initially this list + contains only the entry passed into this function. + + We process entries in the list one by one. The element currently being + processed is pointed to by `cur`, and the element at the end of the list + is pointed to by `last` (we do not use NULL to terminate the list). + + As we process an element, it is first added to the group_commit_queue. + Then any waiters for that element are added at the end of the list, to + be processed in subsequent iterations. This continues until the list + is exhausted, with all elements ever added eventually processed. + + The end result is a breath-first traversal of the tree of waiters, + re-using the next_subsequent_commit pointers in place of extra stack + space in a recursive traversal. */ list= wfc; cur= list; @@ -6594,6 +6672,12 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry, if (!cur) break; // Can happen if initial entry has no wait_for_commit + /* + Check if this transaction has other transaction waiting for it to commit. + + If so, process the waiting transactions, and their waiters and so on, + transitively. + */ if (cur->subsequent_commits_list) { bool have_lock; @@ -6601,63 +6685,66 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry, mysql_mutex_lock(&cur->LOCK_wait_commit); have_lock= true; + /* + Grab the list, now safely under lock, and process it if still + non-empty. + */ waiter= cur->subsequent_commits_list; - /* Check again, now safely under lock. */ - if (waiter) + cur->subsequent_commits_list= NULL; + while (waiter) { - /* Grab the list of waiters and process it. */ - cur->subsequent_commits_list= NULL; - do + wait_for_commit *next= waiter->next_subsequent_commit; + group_commit_entry *entry2= + (group_commit_entry *)waiter->opaque_pointer; + if (entry2) { - wait_for_commit *next= waiter->next_subsequent_commit; - group_commit_entry *entry2= - (group_commit_entry *)waiter->opaque_pointer; - if (entry2) - { - /* - This is another transaction ready to be written to the binary - log. We can put it into the queue directly, without needing a - separate context switch to the other thread. We just set a flag - so that the other thread will know when it wakes up that it was - already processed. - - So put it at the end of the list to be processed in a subsequent - iteration of the outer loop. - */ - entry2->queued_by_other= true; - last->next_subsequent_commit= waiter; - last= waiter; - /* - As a small optimisation, we do not actually need to set - waiter->next_subsequent_commit to NULL, as we can use the - pointer `last' to check for end-of-list. - */ - } - else - { - /* - Wake up the waiting transaction. + /* + This is another transaction ready to be written to the binary + log. We can put it into the queue directly, without needing a + separate context switch to the other thread. We just set a flag + so that the other thread will know when it wakes up that it was + already processed. + + So put it at the end of the list to be processed in a subsequent + iteration of the outer loop. + */ + entry2->queued_by_other= true; + last->next_subsequent_commit= waiter; + last= waiter; + /* + As a small optimisation, we do not actually need to set + waiter->next_subsequent_commit to NULL, as we can use the + pointer `last' to check for end-of-list. + */ + } + else + { + /* + Wake up the waiting transaction. - For this, we need to set the "wakeup running" flag and release - the waitee lock to avoid a deadlock, see comments on - THD::wakeup_subsequent_commits2() for details. - */ - if (have_lock) - { - cur->wakeup_subsequent_commits_running= true; - mysql_mutex_unlock(&cur->LOCK_wait_commit); - have_lock= false; - } - waiter->wakeup(); + For this, we need to set the "wakeup running" flag and release + the waitee lock to avoid a deadlock, see comments on + THD::wakeup_subsequent_commits2() for details. + */ + if (have_lock) + { + have_lock= false; + cur->wakeup_subsequent_commits_running= true; + mysql_mutex_unlock(&cur->LOCK_wait_commit); } - waiter= next; - } while (waiter); + waiter->wakeup(); + } + waiter= next; } if (have_lock) mysql_mutex_unlock(&cur->LOCK_wait_commit); } if (cur == last) break; + /* + Move to the next entry in the flattened list of waiting transactions + that still need to be processed transitively. + */ cur= cur->next_subsequent_commit; entry= (group_commit_entry *)cur->opaque_pointer; DBUG_ASSERT(entry != NULL); @@ -6691,31 +6778,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry, bool MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) { - wait_for_commit *wfc; - bool is_leader; - - wfc= entry->thd->wait_for_commit_ptr; - entry->queued_by_other= false; - if (wfc && wfc->waiting_for_commit) - { - mysql_mutex_lock(&wfc->LOCK_wait_commit); - /* Do an extra check here, this time safely under lock. */ - if (wfc->waiting_for_commit) - { - wfc->opaque_pointer= entry; - do - { - mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit); - } while (wfc->waiting_for_commit); - wfc->opaque_pointer= NULL; - } - mysql_mutex_unlock(&wfc->LOCK_wait_commit); - } - - if (entry->queued_by_other) - is_leader= false; - else - is_leader= queue_for_group_commit(entry, wfc); + bool is_leader= queue_for_group_commit(entry); /* The first in the queue handles group commit for all; the others just wait @@ -6756,6 +6819,16 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) if (next) { + /* + Wake up the next thread in the group commit. + + The next thread can be waiting in two different ways, depending on + whether it put itself in the queue, or if it was put in queue by us + because it had to wait for us to commit first. + + So execute the appropriate wakeup, identified by the queued_by_other + field. + */ if (next->queued_by_other) next->thd->wait_for_commit_ptr->wakeup(); else @@ -6840,14 +6913,18 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) */ mysql_mutex_lock(&LOCK_log); DEBUG_SYNC(leader->thd, "commit_after_get_LOCK_log"); - binlog_id= current_binlog_id; mysql_mutex_lock(&LOCK_prepare_ordered); if (opt_binlog_commit_wait_count) wait_for_sufficient_commits(); + /* + Note that wait_for_sufficient_commits() may have released and + re-acquired the LOCK_log and LOCK_prepare_ordered if it needed to wait. + */ current= group_commit_queue; group_commit_queue= NULL; mysql_mutex_unlock(&LOCK_prepare_ordered); + binlog_id= current_binlog_id; /* As the queue is in reverse order of entering, reverse it. */ last_in_queue= current; @@ -7141,6 +7218,13 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, } +/* + Wait for sufficient commits to queue up for group commit, according to the + values of binlog_commit_wait_count and binlog_commit_wait_usec. + + Note that this function may release and re-acquire LOCK_log and + LOCK_prepare_ordered if it needs to wait. +*/ void MYSQL_BIN_LOG::wait_for_sufficient_commits() { @@ -7152,11 +7236,9 @@ MYSQL_BIN_LOG::wait_for_sufficient_commits() mysql_mutex_assert_owner(&LOCK_log); mysql_mutex_assert_owner(&LOCK_prepare_ordered); - count= 0; - for (e= last_head= group_commit_queue; e; e= e->next) - ++count; - if (count >= opt_binlog_commit_wait_count) - return; + for (e= last_head= group_commit_queue, count= 0; e; e= e->next) + if (++count >= opt_binlog_commit_wait_count) + return; mysql_mutex_unlock(&LOCK_log); set_timespec_nsec(wait_until, (ulonglong)1000*opt_binlog_commit_wait_usec); @@ -7178,7 +7260,25 @@ MYSQL_BIN_LOG::wait_for_sufficient_commits() last_head= head; } - mysql_mutex_lock(&LOCK_log); + /* + We must not wait for LOCK_log while holding LOCK_prepare_ordered. + LOCK_log can be held for long periods (eg. we do I/O under it), while + LOCK_prepare_ordered must only be held for short periods. + + In addition, waiting for LOCK_log while holding LOCK_prepare_ordered would + violate locking order of LOCK_log-before-LOCK_prepare_ordered. This could + cause SAFEMUTEX warnings (even if it cannot actually deadlock with current + code, as there can be at most one group commit leader thread at a time). + + So release and re-acquire LOCK_prepare_ordered if we need to wait for the + LOCK_log. + */ + if (mysql_mutex_trylock(&LOCK_log)) + { + mysql_mutex_unlock(&LOCK_prepare_ordered); + mysql_mutex_lock(&LOCK_log); + mysql_mutex_lock(&LOCK_prepare_ordered); + } } diff --git a/sql/log.h b/sql/log.h index efb560dc245..8b5fe17e660 100644 --- a/sql/log.h +++ b/sql/log.h @@ -540,7 +540,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG void do_checkpoint_request(ulong binlog_id); void purge(); int write_transaction_or_stmt(group_commit_entry *entry, uint64 commit_id); - bool queue_for_group_commit(group_commit_entry *entry, wait_for_commit *wfc); + bool queue_for_group_commit(group_commit_entry *entry); bool write_transaction_to_binlog_events(group_commit_entry *entry); void trx_group_commit_leader(group_commit_entry *leader); bool is_xidlist_idle_nolock(); diff --git a/sql/log_event.cc b/sql/log_event.cc index f07c58f4d6b..c0a2ebfa365 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -937,7 +937,7 @@ Log_event::Log_event(const char* buf, #ifndef MYSQL_CLIENT #ifdef HAVE_REPLICATION -int Log_event::do_update_pos(struct rpl_group_info *rgi) +int Log_event::do_update_pos(rpl_group_info *rgi) { Relay_log_info *rli= rgi->rli; /* @@ -3756,7 +3756,7 @@ void Query_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -int Query_log_event::do_apply_event(struct rpl_group_info *rgi) +int Query_log_event::do_apply_event(rpl_group_info *rgi) { return do_apply_event(rgi, query, q_len); } @@ -3807,8 +3807,8 @@ bool test_if_equal_repl_errors(int expected_error, int actual_error) mismatch. This mismatch could be implemented with a new ER_ code, and to ignore it you would use --slave-skip-errors... */ -int Query_log_event::do_apply_event(struct rpl_group_info *rgi, - const char *query_arg, uint32 q_len_arg) +int Query_log_event::do_apply_event(rpl_group_info *rgi, + const char *query_arg, uint32 q_len_arg) { LEX_STRING new_db; int expected_error,actual_error= 0; @@ -4244,7 +4244,7 @@ end: DBUG_RETURN(thd->is_slave_error); } -int Query_log_event::do_update_pos(struct rpl_group_info *rgi) +int Query_log_event::do_update_pos(rpl_group_info *rgi) { Relay_log_info *rli= rgi->rli; /* @@ -4461,7 +4461,7 @@ bool Start_log_event_v3::write(IO_CACHE* file) other words, no deadlock problem. */ -int Start_log_event_v3::do_apply_event(struct rpl_group_info *rgi) +int Start_log_event_v3::do_apply_event(rpl_group_info *rgi) { DBUG_ENTER("Start_log_event_v3::do_apply_event"); int error= 0; @@ -4810,7 +4810,7 @@ bool Format_description_log_event::write(IO_CACHE* file) #endif #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -int Format_description_log_event::do_apply_event(struct rpl_group_info *rgi) +int Format_description_log_event::do_apply_event(rpl_group_info *rgi) { int ret= 0; Relay_log_info const *rli= rgi->rli; @@ -4867,7 +4867,7 @@ int Format_description_log_event::do_apply_event(struct rpl_group_info *rgi) DBUG_RETURN(ret); } -int Format_description_log_event::do_update_pos(struct rpl_group_info *rgi) +int Format_description_log_event::do_update_pos(rpl_group_info *rgi) { Relay_log_info *rli= rgi->rli; if (server_id == (uint32) global_system_variables.server_id) @@ -5516,7 +5516,7 @@ void Load_log_event::set_fields(const char* affected_db, 1 Failure */ -int Load_log_event::do_apply_event(NET* net, struct rpl_group_info *rgi, +int Load_log_event::do_apply_event(NET* net, rpl_group_info *rgi, bool use_rli_only_for_errors) { LEX_STRING new_db; @@ -5919,7 +5919,7 @@ bool Rotate_log_event::write(IO_CACHE* file) @retval 0 ok */ -int Rotate_log_event::do_update_pos(struct rpl_group_info *rgi) +int Rotate_log_event::do_update_pos(rpl_group_info *rgi) { Relay_log_info *rli= rgi->rli; DBUG_ENTER("Rotate_log_event::do_update_pos"); @@ -6096,7 +6096,7 @@ bool Binlog_checkpoint_log_event::write(IO_CACHE *file) Gtid_log_event::Gtid_log_event(const char *buf, uint event_len, const Format_description_log_event *description_event) - : Log_event(buf, description_event), seq_no(0) + : Log_event(buf, description_event), seq_no(0), commit_id(0) { uint8 header_size= description_event->common_header_len; uint8 post_header_len= description_event->post_header_len[GTID_EVENT-1]; @@ -6120,8 +6120,6 @@ Gtid_log_event::Gtid_log_event(const char *buf, uint event_len, ++buf; commit_id= uint8korr(buf); } - else - commit_id= 0; } @@ -6254,7 +6252,7 @@ Gtid_log_event::pack_info(THD *thd, Protocol *protocol) static char gtid_begin_string[] = "BEGIN"; int -Gtid_log_event::do_apply_event(struct rpl_group_info *rgi) +Gtid_log_event::do_apply_event(rpl_group_info *rgi) { thd->variables.server_id= this->server_id; thd->variables.gtid_domain_id= this->domain_id; @@ -6295,7 +6293,7 @@ Gtid_log_event::do_apply_event(struct rpl_group_info *rgi) int -Gtid_log_event::do_update_pos(struct rpl_group_info *rgi) +Gtid_log_event::do_update_pos(rpl_group_info *rgi) { Relay_log_info *rli= rgi->rli; rli->inc_event_relay_log_pos(); @@ -6477,7 +6475,7 @@ Gtid_list_log_event::write(IO_CACHE *file) int -Gtid_list_log_event::do_apply_event(struct rpl_group_info *rgi) +Gtid_list_log_event::do_apply_event(rpl_group_info *rgi) { Relay_log_info const *rli= rgi->rli; int ret= Log_event::do_apply_event(rgi); @@ -6707,7 +6705,7 @@ void Intvar_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) Intvar_log_event::do_apply_event() */ -int Intvar_log_event::do_apply_event(struct rpl_group_info *rgi) +int Intvar_log_event::do_apply_event(rpl_group_info *rgi) { Relay_log_info *rli= rgi->rli; /* @@ -6731,7 +6729,7 @@ int Intvar_log_event::do_apply_event(struct rpl_group_info *rgi) return 0; } -int Intvar_log_event::do_update_pos(struct rpl_group_info *rgi) +int Intvar_log_event::do_update_pos(rpl_group_info *rgi) { Relay_log_info *rli= rgi->rli; rli->inc_event_relay_log_pos(); @@ -6818,7 +6816,7 @@ void Rand_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -int Rand_log_event::do_apply_event(struct rpl_group_info *rgi) +int Rand_log_event::do_apply_event(rpl_group_info *rgi) { Relay_log_info const *rli= rgi->rli; /* @@ -6835,7 +6833,7 @@ int Rand_log_event::do_apply_event(struct rpl_group_info *rgi) return 0; } -int Rand_log_event::do_update_pos(struct rpl_group_info *rgi) +int Rand_log_event::do_update_pos(rpl_group_info *rgi) { Relay_log_info *rli= rgi->rli; rli->inc_event_relay_log_pos(); @@ -6950,7 +6948,7 @@ void Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -int Xid_log_event::do_apply_event(struct rpl_group_info *rgi) +int Xid_log_event::do_apply_event(rpl_group_info *rgi) { bool res; int err; @@ -7416,7 +7414,7 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) */ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -int User_var_log_event::do_apply_event(struct rpl_group_info *rgi) +int User_var_log_event::do_apply_event(rpl_group_info *rgi) { Item *it= 0; CHARSET_INFO *charset; @@ -7505,7 +7503,7 @@ int User_var_log_event::do_apply_event(struct rpl_group_info *rgi) DBUG_RETURN(0); } -int User_var_log_event::do_update_pos(struct rpl_group_info *rgi) +int User_var_log_event::do_update_pos(rpl_group_info *rgi) { Relay_log_info *rli= rgi->rli; rli->inc_event_relay_log_pos(); @@ -7682,7 +7680,7 @@ Slave_log_event::Slave_log_event(const char* buf, #ifndef MYSQL_CLIENT -int Slave_log_event::do_apply_event(struct rpl_group_info *rgi) +int Slave_log_event::do_apply_event(rpl_group_info *rgi) { if (mysql_bin_log.is_open()) return mysql_bin_log.write(this); @@ -7726,7 +7724,7 @@ void Stop_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) Start_log_event_v3::do_apply_event(), not here. Because if we come here, the master was sane. */ -int Stop_log_event::do_update_pos(struct rpl_group_info *rgi) +int Stop_log_event::do_update_pos(rpl_group_info *rgi) { Relay_log_info *rli= rgi->rli; /* @@ -7958,7 +7956,7 @@ void Create_file_log_event::pack_info(THD *thd, Protocol *protocol) */ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -int Create_file_log_event::do_apply_event(struct rpl_group_info *rgi) +int Create_file_log_event::do_apply_event(rpl_group_info *rgi) { char proc_info[17+FN_REFLEN+10], *fname_buf; char *ext; @@ -8140,7 +8138,7 @@ int Append_block_log_event::get_create_or_append() const Append_block_log_event::do_apply_event() */ -int Append_block_log_event::do_apply_event(struct rpl_group_info *rgi) +int Append_block_log_event::do_apply_event(rpl_group_info *rgi) { char proc_info[17+FN_REFLEN+10], *fname= proc_info+17; int fd; @@ -8291,7 +8289,7 @@ void Delete_file_log_event::pack_info(THD *thd, Protocol *protocol) */ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -int Delete_file_log_event::do_apply_event(struct rpl_group_info *rgi) +int Delete_file_log_event::do_apply_event(rpl_group_info *rgi) { char fname[FN_REFLEN+10]; Relay_log_info const *rli= rgi->rli; @@ -8391,7 +8389,7 @@ void Execute_load_log_event::pack_info(THD *thd, Protocol *protocol) Execute_load_log_event::do_apply_event() */ -int Execute_load_log_event::do_apply_event(struct rpl_group_info *rgi) +int Execute_load_log_event::do_apply_event(rpl_group_info *rgi) { char fname[FN_REFLEN+10]; char *ext; @@ -8664,7 +8662,7 @@ void Execute_load_query_log_event::pack_info(THD *thd, Protocol *protocol) int -Execute_load_query_log_event::do_apply_event(struct rpl_group_info *rgi) +Execute_load_query_log_event::do_apply_event(rpl_group_info *rgi) { char *p; char *buf; @@ -9072,7 +9070,7 @@ int Rows_log_event::do_add_row_data(uchar *row_data, size_t length) #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) -int Rows_log_event::do_apply_event(struct rpl_group_info *rgi) +int Rows_log_event::do_apply_event(rpl_group_info *rgi) { Relay_log_info const *rli= rgi->rli; DBUG_ENTER("Rows_log_event::do_apply_event(Relay_log_info*)"); @@ -9538,7 +9536,7 @@ static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd) @retval non-zero Error in the statement commit */ int -Rows_log_event::do_update_pos(struct rpl_group_info *rgi) +Rows_log_event::do_update_pos(rpl_group_info *rgi) { Relay_log_info *rli= rgi->rli; DBUG_ENTER("Rows_log_event::do_update_pos"); @@ -9777,7 +9775,7 @@ void Annotate_rows_log_event::print(FILE *file, PRINT_EVENT_INFO *pinfo) #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) -int Annotate_rows_log_event::do_apply_event(struct rpl_group_info *rgi) +int Annotate_rows_log_event::do_apply_event(rpl_group_info *rgi) { m_save_thd_query_txt= thd->query(); m_save_thd_query_len= thd->query_length(); @@ -9787,7 +9785,7 @@ int Annotate_rows_log_event::do_apply_event(struct rpl_group_info *rgi) #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) -int Annotate_rows_log_event::do_update_pos(struct rpl_group_info *rgi) +int Annotate_rows_log_event::do_update_pos(rpl_group_info *rgi) { Relay_log_info *rli= rgi->rli; rli->inc_event_relay_log_pos(); @@ -10296,7 +10294,7 @@ check_table_map(Relay_log_info const *rli, RPL_TABLE_LIST *table_list) DBUG_RETURN(res); } -int Table_map_log_event::do_apply_event(struct rpl_group_info *rgi) +int Table_map_log_event::do_apply_event(rpl_group_info *rgi) { RPL_TABLE_LIST *table_list; char *db_mem, *tname_mem; @@ -10415,7 +10413,7 @@ Table_map_log_event::do_shall_skip(Relay_log_info *rli) return continue_group(rli); } -int Table_map_log_event::do_update_pos(struct rpl_group_info *rgi) +int Table_map_log_event::do_update_pos(rpl_group_info *rgi) { Relay_log_info *rli= rgi->rli; rli->inc_event_relay_log_pos(); @@ -11847,7 +11845,7 @@ Incident_log_event::print(FILE *file, #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) int -Incident_log_event::do_apply_event(struct rpl_group_info *rgi) +Incident_log_event::do_apply_event(rpl_group_info *rgi) { Relay_log_info const *rli= rgi->rli; DBUG_ENTER("Incident_log_event::do_apply_event"); diff --git a/sql/log_event.h b/sql/log_event.h index 6d6a330fc48..1dc7f516727 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -1317,7 +1317,7 @@ public: @see do_apply_event */ - int apply_event(struct rpl_group_info *rgi) + int apply_event(rpl_group_info *rgi) { return do_apply_event(rgi); } @@ -1331,7 +1331,7 @@ public: @see do_update_pos */ - int update_pos(struct rpl_group_info *rgi) + int update_pos(rpl_group_info *rgi) { return do_update_pos(rgi); } @@ -1432,7 +1432,7 @@ protected: @retval 0 Event applied successfully @retval errno Error code if event application failed */ - virtual int do_apply_event(struct rpl_group_info *rgi) + virtual int do_apply_event(rpl_group_info *rgi) { return 0; /* Default implementation does nothing */ } @@ -1461,7 +1461,7 @@ protected: 1). Observe that handler errors are returned by the do_apply_event() function, and not by this one. */ - virtual int do_update_pos(struct rpl_group_info *rgi); + virtual int do_update_pos(rpl_group_info *rgi); /** @@ -1986,10 +1986,10 @@ public: public: /* !!! Public in this patch to allow old usage */ #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); - virtual int do_apply_event(struct rpl_group_info *rgi); - virtual int do_update_pos(struct rpl_group_info *rgi); + virtual int do_apply_event(rpl_group_info *rgi); + virtual int do_update_pos(rpl_group_info *rgi); - int do_apply_event(struct rpl_group_info *rgi, + int do_apply_event(rpl_group_info *rgi, const char *query_arg, uint32 q_len_arg); static bool peek_is_commit_rollback(const char *event_start, @@ -2103,7 +2103,7 @@ public: private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(struct rpl_group_info *rgi); + virtual int do_apply_event(rpl_group_info *rgi); #endif }; @@ -2416,12 +2416,12 @@ public: public: /* !!! Public in this patch to allow old usage */ #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(struct rpl_group_info *rgi) + virtual int do_apply_event(rpl_group_info *rgi) { return do_apply_event(thd->slave_net,rgi,0); } - int do_apply_event(NET *net, struct rpl_group_info *rgi, + int do_apply_event(NET *net, rpl_group_info *rgi, bool use_rli_only_for_errors); #endif }; @@ -2500,7 +2500,7 @@ public: protected: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(struct rpl_group_info *rgi); + virtual int do_apply_event(rpl_group_info *rgi); virtual enum_skip_reason do_shall_skip(Relay_log_info*) { /* @@ -2596,8 +2596,8 @@ public: static bool is_version_before_checksum(const master_version_split *version_split); protected: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(struct rpl_group_info *rgi); - virtual int do_update_pos(struct rpl_group_info *rgi); + virtual int do_apply_event(rpl_group_info *rgi); + virtual int do_update_pos(rpl_group_info *rgi); virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); #endif }; @@ -2675,8 +2675,8 @@ Intvar_log_event(THD* thd_arg,uchar type_arg, ulonglong val_arg, private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(struct rpl_group_info *rgi); - virtual int do_update_pos(struct rpl_group_info *rgi); + virtual int do_apply_event(rpl_group_info *rgi); + virtual int do_update_pos(rpl_group_info *rgi); virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); #endif }; @@ -2754,8 +2754,8 @@ class Rand_log_event: public Log_event private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(struct rpl_group_info *rgi); - virtual int do_update_pos(struct rpl_group_info *rgi); + virtual int do_apply_event(rpl_group_info *rgi); + virtual int do_update_pos(rpl_group_info *rgi); virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); #endif }; @@ -2803,7 +2803,7 @@ class Xid_log_event: public Log_event private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(struct rpl_group_info *rgi); + virtual int do_apply_event(rpl_group_info *rgi); enum_skip_reason do_shall_skip(Relay_log_info *rli); #endif }; @@ -2870,8 +2870,8 @@ public: private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(struct rpl_group_info *rgi); - virtual int do_update_pos(struct rpl_group_info *rgi); + virtual int do_apply_event(rpl_group_info *rgi); + virtual int do_update_pos(rpl_group_info *rgi); virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); #endif }; @@ -2905,7 +2905,7 @@ public: private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_update_pos(struct rpl_group_info *rgi); + virtual int do_update_pos(rpl_group_info *rgi); virtual enum_skip_reason do_shall_skip(Relay_log_info *rli) { /* @@ -3007,7 +3007,7 @@ public: private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_update_pos(struct rpl_group_info *rgi); + virtual int do_update_pos(rpl_group_info *rgi); virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); #endif }; @@ -3119,8 +3119,8 @@ public: uint16 flags, bool is_transactional, uint64 commit_id); #ifdef HAVE_REPLICATION void pack_info(THD *thd, Protocol *protocol); - virtual int do_apply_event(struct rpl_group_info *rgi); - virtual int do_update_pos(struct rpl_group_info *rgi); + virtual int do_apply_event(rpl_group_info *rgi); + virtual int do_update_pos(rpl_group_info *rgi); virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); #endif #else @@ -3249,7 +3249,7 @@ public: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) bool to_packet(String *packet); bool write(IO_CACHE *file); - virtual int do_apply_event(struct rpl_group_info *rgi); + virtual int do_apply_event(rpl_group_info *rgi); #endif static bool peek(const char *event_start, uint32 event_len, uint8 checksum_alg, @@ -3328,7 +3328,7 @@ public: private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(struct rpl_group_info *rgi); + virtual int do_apply_event(rpl_group_info *rgi); #endif }; @@ -3383,7 +3383,7 @@ public: private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(struct rpl_group_info *rgi); + virtual int do_apply_event(rpl_group_info *rgi); #endif }; @@ -3424,7 +3424,7 @@ public: private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(struct rpl_group_info *rgi); + virtual int do_apply_event(rpl_group_info *rgi); #endif }; @@ -3464,7 +3464,7 @@ public: private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(struct rpl_group_info *rgi); + virtual int do_apply_event(rpl_group_info *rgi); #endif }; @@ -3563,7 +3563,7 @@ public: private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(struct rpl_group_info *rgi); + virtual int do_apply_event(rpl_group_info *rgi); #endif }; @@ -3635,8 +3635,8 @@ public: #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) private: - virtual int do_apply_event(struct rpl_group_info *rgi); - virtual int do_update_pos(struct rpl_group_info *rgi); + virtual int do_apply_event(rpl_group_info *rgi); + virtual int do_update_pos(rpl_group_info *rgi); virtual enum_skip_reason do_shall_skip(Relay_log_info*); #endif @@ -4050,8 +4050,8 @@ public: private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(struct rpl_group_info *rgi); - virtual int do_update_pos(struct rpl_group_info *rgi); + virtual int do_apply_event(rpl_group_info *rgi); + virtual int do_update_pos(rpl_group_info *rgi); virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); #endif @@ -4278,8 +4278,8 @@ protected: private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(struct rpl_group_info *rgi); - virtual int do_update_pos(struct rpl_group_info *rgi); + virtual int do_apply_event(rpl_group_info *rgi); + virtual int do_update_pos(rpl_group_info *rgi); virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); /* @@ -4612,7 +4612,7 @@ public: #endif #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(struct rpl_group_info *rgi); + virtual int do_apply_event(rpl_group_info *rgi); #endif virtual bool write_data_header(IO_CACHE *file); diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc index d3e9d47d64a..db1b3fb5a9f 100644 --- a/sql/log_event_old.cc +++ b/sql/log_event_old.cc @@ -36,7 +36,7 @@ // Old implementation of do_apply_event() int -Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, struct rpl_group_info *rgi) +Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi) { DBUG_ENTER("Old_rows_log_event::do_apply_event(st_relay_log_info*)"); int error= 0; @@ -1451,7 +1451,7 @@ int Old_rows_log_event::do_add_row_data(uchar *row_data, size_t length) #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) -int Old_rows_log_event::do_apply_event(struct rpl_group_info *rgi) +int Old_rows_log_event::do_apply_event(rpl_group_info *rgi) { DBUG_ENTER("Old_rows_log_event::do_apply_event(Relay_log_info*)"); int error= 0; @@ -1834,7 +1834,7 @@ Old_rows_log_event::do_shall_skip(Relay_log_info *rli) } int -Old_rows_log_event::do_update_pos(struct rpl_group_info *rgi) +Old_rows_log_event::do_update_pos(rpl_group_info *rgi) { Relay_log_info *rli= rgi->rli; DBUG_ENTER("Old_rows_log_event::do_update_pos"); diff --git a/sql/log_event_old.h b/sql/log_event_old.h index ad51349ef80..7c35b875dc4 100644 --- a/sql/log_event_old.h +++ b/sql/log_event_old.h @@ -214,8 +214,8 @@ protected: private: #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) - virtual int do_apply_event(struct rpl_group_info *rgi); - virtual int do_update_pos(struct rpl_group_info *rgi); + virtual int do_apply_event(rpl_group_info *rgi); + virtual int do_update_pos(rpl_group_info *rgi); virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); /* @@ -275,7 +275,7 @@ private: #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) - int do_apply_event(Old_rows_log_event*, struct rpl_group_info *rgi); + int do_apply_event(Old_rows_log_event*, rpl_group_info *rgi); /* Primitive to prepare for a sequence of row executions. @@ -403,7 +403,7 @@ private: #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) // use old definition of do_apply_event() - virtual int do_apply_event(struct rpl_group_info *rgi) + virtual int do_apply_event(rpl_group_info *rgi) { return Old_rows_log_event::do_apply_event(this, rgi); } // primitives for old version of do_apply_event() @@ -481,7 +481,7 @@ private: #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) // use old definition of do_apply_event() - virtual int do_apply_event(struct rpl_group_info *rgi) + virtual int do_apply_event(rpl_group_info *rgi) { return Old_rows_log_event::do_apply_event(this, rgi); } // primitives for old version of do_apply_event() @@ -556,7 +556,7 @@ private: #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) // use old definition of do_apply_event() - virtual int do_apply_event(struct rpl_group_info *rgi) + virtual int do_apply_event(rpl_group_info *rgi) { return Old_rows_log_event::do_apply_event(this, rgi); } // primitives for old version of do_apply_event() diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index bc826e9bdb5..a1b14ad3255 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -62,7 +62,7 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid) int -rpl_slave_state::record_and_update_gtid(THD *thd, struct rpl_group_info *rgi) +rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi) { uint64 sub_id; diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 7970f15eb49..7cf2c9162ff 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -62,7 +62,7 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, struct rpl_parallel_thread *rpt) { int err; - struct rpl_group_info *rgi= qev->rgi; + rpl_group_info *rgi= qev->rgi; Relay_log_info *rli= rgi->rli; THD *thd= rgi->thd; @@ -128,8 +128,9 @@ handle_rpl_parallel_thread(void *arg) old_msg= thd->proc_info; thd->enter_cond(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread, "Waiting for work from SQL thread"); - while (!rpt->stop && !thd->killed && !(events= rpt->event_queue)) + while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed) mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread); + /* Mark that this thread is now executing */ rpt->free= false; rpt->event_queue= rpt->last_in_queue= NULL; thd->exit_cond(old_msg); @@ -145,9 +146,15 @@ handle_rpl_parallel_thread(void *arg) uint64 wait_start_sub_id; bool end_of_group; + /* Handle a new event group, which will be initiated by a GTID event. */ if (event_type == GTID_EVENT) { in_event_group= true; + /* + If the standalone flag is set, then this event group consists of a + single statement (possibly preceeded by some Intvar_log_event and + similar), without any terminating COMMIT/ROLLBACK/XID. + */ group_standalone= (0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 & Gtid_log_event::FL_STANDALONE)); @@ -540,12 +547,12 @@ rpl_parallel::wait_for_done() bool -rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev) +rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) { rpl_parallel_entry *e; rpl_parallel_thread *cur_thread; rpl_parallel_thread::queued_event *qev; - struct rpl_group_info *rgi; + rpl_group_info *rgi; Relay_log_info *rli= serial_rgi->rli; enum Log_event_type typ; diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 304263c3477..adbb1a18526 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -23,7 +23,7 @@ struct rpl_parallel_thread { struct queued_event { queued_event *next; Log_event *ev; - struct rpl_group_info *rgi; + rpl_group_info *rgi; } *event_queue, *last_in_queue; }; @@ -59,7 +59,7 @@ struct rpl_parallel_entry { mysql_mutex_t LOCK_parallel_entry; mysql_cond_t COND_parallel_entry; uint64 current_sub_id; - struct rpl_group_info *current_group_info; + rpl_group_info *current_group_info; /* The sub_id of the last event group in the previous batch of group-committed transactions. @@ -78,7 +78,7 @@ struct rpl_parallel { ~rpl_parallel(); rpl_parallel_entry *find(uint32 domain_id); void wait_for_done(); - bool do_event(struct rpl_group_info *serial_rgi, Log_event *ev); + bool do_event(rpl_group_info *serial_rgi, Log_event *ev); }; diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 73658d10624..49547718230 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -1193,7 +1193,7 @@ bool Relay_log_info::cached_charset_compare(char *charset) const void Relay_log_info::stmt_done(my_off_t event_master_log_pos, time_t event_creation_time, THD *thd, - struct rpl_group_info *rgi) + rpl_group_info *rgi) { #ifndef DBUG_OFF extern uint debug_not_change_ts_if_art_event; @@ -1265,6 +1265,11 @@ void Relay_log_info::cleanup_context(THD *thd, bool error) { DBUG_ENTER("Relay_log_info::cleanup_context"); + /* + In parallel replication, different THDs can be used from different + parallel threads. But in single-threaded mode, only the THD of the main + SQL thread is allowed. + */ DBUG_ASSERT(opt_slave_parallel_threads > 0 || sql_thd == thd); /* 1) Instances of Table_map_log_event, if ::do_apply_event() was called on them, @@ -1552,6 +1557,7 @@ event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev) uint64 sub_id= rpl_global_gtid_slave_state.next_subid(gev->domain_id); if (!sub_id) { + /* Out of memory caused hash insertion to fail. */ return 1; } rgi->gtid_sub_id= sub_id; diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 91c5c65d33b..4d954d1c8aa 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -422,7 +422,7 @@ public: */ void stmt_done(my_off_t event_log_pos, time_t event_creation_time, THD *thd, - struct rpl_group_info *rgi); + rpl_group_info *rgi); /** @@ -521,10 +521,14 @@ private: /* This is data for various state needed to be kept for the processing of - one event group in the SQL thread. + one event group (transaction) during replication. - For single-threaded replication it is linked from the RLI, for parallel - replication it is linked into each event group being executed in parallel. + In single-threaded replication, there will be one global rpl_group_info and + one global Relay_log_info per master connection. They will be linked + together. + + In parallel replication, there will be one rpl_group_info object for + each running thd. All rpl_group_info will share the same Relay_log_info. */ struct rpl_group_info { @@ -555,7 +559,7 @@ struct rpl_group_info for the wrong commit). */ uint64 wait_commit_sub_id; - struct rpl_group_info *wait_commit_group_info; + rpl_group_info *wait_commit_group_info; /* If non-zero, the event group must wait for this sub_id to be committed before the execution of the event group is allowed to start. diff --git a/sql/rpl_utility.cc b/sql/rpl_utility.cc index f734b95edc1..40fda63f396 100644 --- a/sql/rpl_utility.cc +++ b/sql/rpl_utility.cc @@ -1143,7 +1143,7 @@ bool Deferred_log_events::is_empty() return array.elements == 0; } -bool Deferred_log_events::execute(struct rpl_group_info *rgi) +bool Deferred_log_events::execute(rpl_group_info *rgi) { bool res= false; diff --git a/sql/slave.cc b/sql/slave.cc index 777ab9c8468..e0cc595213d 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3019,7 +3019,7 @@ static int has_temporary_error(THD *thd) ev->update_pos(). */ int apply_event_and_update_pos(Log_event* ev, THD* thd, - struct rpl_group_info *rgi, + rpl_group_info *rgi, rpl_parallel_thread *rpt) { int exec_res= 0; diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc index 1b6713f1bc3..04cb4adcb2c 100644 --- a/sql/sql_binlog.cc +++ b/sql/sql_binlog.cc @@ -80,7 +80,7 @@ void mysql_client_binlog_statement(THD* thd) my_bool have_fd_event= TRUE; int err; Relay_log_info *rli; - struct rpl_group_info *rgi; + rpl_group_info *rgi; rli= thd->rli_fake; if (!rli) diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 43d810d27d4..66b28c87ac9 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -5666,6 +5666,10 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee) waiting_for_commit= false; else { + /* + Put ourself at the head of the waitee's list of transactions that must + wait for it to commit first. + */ this->next_subsequent_commit= waitee->subsequent_commits_list; waitee->subsequent_commits_list= this; } @@ -5704,7 +5708,7 @@ wait_for_commit::wait_for_prior_commit2() The waiter needs to lock the waitee to delete itself from the list in unregister_wait_for_prior_commit(). Thus wakeup_subsequent_commits() can not - hold its own lock while locking waiters, lest we deadlock. + hold its own lock while locking waiters, as this could lead to deadlock. So we need to prevent unregister_wait_for_prior_commit() running while wakeup is in progress - otherwise the unregister could complete before the wakeup, @@ -5727,6 +5731,7 @@ wait_for_commit::wait_for_prior_commit2() would not be woken up until next wakeup, which could be potentially much later than necessary. */ + void wait_for_commit::wakeup_subsequent_commits2() { diff --git a/sql/sql_class.h b/sql/sql_class.h index e7f593db62b..c34c100171d 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -1615,7 +1615,7 @@ struct wait_for_commit */ bool waiting_for_commit; /* - Flag set when wakeup_subsequent_commits_running() is active, see commonts + Flag set when wakeup_subsequent_commits_running() is active, see comments on that function for details. */ bool wakeup_subsequent_commits_running; |