From 86362129a2f70349cc79adb0825d5bc8f9a61f27 Mon Sep 17 00:00:00 2001 From: Kristian Nielsen Date: Wed, 25 Jun 2014 15:17:03 +0200 Subject: MDEV-6120: When slave stops with error, error message should indicate the failing GTID If replication breaks in GTID mode, it is not trivial to determine the GTID of the failing event group. This is a problem, as such GTID is needed eg. to explicitly set @@gtid_slave_pos to skip to after that event group, or to compare errors on different servers, etc. Fix by ensuring that relevant slave errors logged to the error log include the GTID of the event group containing the problem event. --- sql/rpl_parallel.cc | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) (limited to 'sql/rpl_parallel.cc') diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index e72d3470a7f..974a02e3968 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -388,7 +388,7 @@ handle_rpl_parallel_thread(void *arg) { DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed"); thd->send_kill_message(); - slave_output_error_info(rgi->rli, thd); + slave_output_error_info(rgi, thd); signal_error_to_sql_driver_thread(thd, rgi, 1); /* Even though we were killed, we need to continue waiting for the @@ -467,7 +467,7 @@ handle_rpl_parallel_thread(void *arg) if (res < 0) { /* Error. */ - slave_output_error_info(rgi->rli, thd); + slave_output_error_info(rgi, thd); signal_error_to_sql_driver_thread(thd, rgi, 1); } else if (!res) @@ -514,7 +514,7 @@ handle_rpl_parallel_thread(void *arg) if (unlikely(err) && !rgi->worker_error) { - slave_output_error_info(rgi->rli, thd); + slave_output_error_info(rgi, thd); signal_error_to_sql_driver_thread(thd, rgi, err); } if (end_of_group) @@ -1018,10 +1018,11 @@ rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt) if it is still available. Otherwise a new worker thread is allocated. 
*/ rpl_parallel_thread * -rpl_parallel_entry::choose_thread(Relay_log_info *rli, bool *did_enter_cond, +rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond, PSI_stage_info *old_stage, bool reuse) { uint32 idx; + Relay_log_info *rli= rgi->rli; rpl_parallel_thread *thr; idx= rpl_thread_idx; @@ -1066,7 +1067,7 @@ rpl_parallel_entry::choose_thread(Relay_log_info *rli, bool *did_enter_cond, debug_sync_set_action(rli->sql_driver_thd, STRING_WITH_LEN("now SIGNAL wait_queue_killed")); };); - slave_output_error_info(rli, rli->sql_driver_thd); + slave_output_error_info(rgi, rli->sql_driver_thd); return NULL; } else @@ -1417,7 +1418,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, instead re-use a thread that we queued for previously. */ cur_thread= - e->choose_thread(rli, &did_enter_cond, &old_stage, typ != GTID_EVENT); + e->choose_thread(serial_rgi, &did_enter_cond, &old_stage, + typ != GTID_EVENT); if (!cur_thread) { /* This means we were killed. The error is already signalled. */ -- cgit v1.2.1 From 370318f8948a9f6f4366588beff4f3b8b4344a20 Mon Sep 17 00:00:00 2001 From: Kristian Nielsen Date: Fri, 27 Jun 2014 13:34:29 +0200 Subject: MDEV-6386: Assertion `thd->transaction.stmt.is_empty() || thd->in_sub_stmt || (thd->state_flags & Open_tables_state::BACKUPS_AVAIL)' fails with parallel replication The direct cause of the assertion was missing error handling in record_gtid(). If ha_commit_trans() fails for the statement commit, there was missing code to catch the error and do ha_rollback_trans() in this case; this caused close_thread_tables() to assert. Normally, this error case is not hit, but in this case it was triggered due to another bug: When a transaction T1 fails during parallel replication, the code would signal following transactions that they could start to run without properly marking the error condition. 
This caused subsequent transactions to incorrectly start replicating, only to get an error later during their own commit step. This was particularly serious if the subsequent transactions were DDL or MyISAM updates, which cannot be rolled back and would leave replication in an inconsistent state. Fixed by 1) in case of error, only signal following transactions to continue once the error has been properly marked and those transactions will know not to start; and 2) implement proper error handling in record_gtid() in the case that statement commit fails. --- sql/rpl_parallel.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'sql/rpl_parallel.cc') diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 974a02e3968..53769107661 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -486,7 +486,7 @@ handle_rpl_parallel_thread(void *arg) (event_type == QUERY_EVENT && (((Query_log_event *)events->ev)->is_commit() || ((Query_log_event *)events->ev)->is_rollback())); - if (group_ending) + if (group_ending && likely(!rgi->worker_error)) { DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit"); rgi->mark_start_commit(); @@ -498,7 +498,7 @@ handle_rpl_parallel_thread(void *arg) processing between the event groups as a simple way to ensure that everything is stopped and cleaned up correctly. */ - if (!rgi->worker_error && !skip_event_group) + if (likely(!rgi->worker_error) && !skip_event_group) err= rpt_handle_event(events, rpt); else err= thd->wait_for_prior_commit(); -- cgit v1.2.1 From b0b60f249807b6c2d423313350d9ad66693c2d1e Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 8 May 2014 14:20:18 +0200 Subject: MDEV-5262: Missing retry after temp error in parallel replication Start implementing that an event group can be re-tried in parallel replication if it fails with a temporary error (like deadlock). Patch is very incomplete, just some very basic retry works. 
Stuff still missing (not complete list): - Handle moving to the next relay log file, if event group to be retried spans multiple relay log files. - Handle refcounting of relay log files, to ensure that we do not purge a relay log file and then later attempt to re-execute events out of it. - Handle description_event_for_exec - we need to save this somehow for the possible retry - and use the correct one in case it differs between relay logs. - Do another retry attempt in case the first retry also fails. - Limit the max number of retries. - Lots of testing will be needed for the various edge cases. --- sql/rpl_parallel.cc | 163 +++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 150 insertions(+), 13 deletions(-) (limited to 'sql/rpl_parallel.cc') diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 53769107661..f0147527957 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -7,15 +7,6 @@ /* Code for optional parallel execution of replicated events on the slave. - - ToDo list: - - - Retry of failed transactions is not yet implemented for the parallel case. - - - All the waits (eg. in struct wait_for_commit and in - rpl_parallel_thread_pool::get_thread()) need to be killable. And on kill, - everything needs to be correctly rolled back and stopped in all threads, - to ensure a consistent slave replication state. 
*/ struct rpl_parallel_thread_pool global_rpl_thread_pool; @@ -197,6 +188,105 @@ unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond, } +static int +retry_handle_relay_log_rotate(Log_event *ev, IO_CACHE *rlog) +{ + /* ToDo */ + return 0; +} + + +static int +retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt, + rpl_parallel_thread::queued_event *orig_qev) +{ + IO_CACHE rlog; + File fd; + const char *errmsg= NULL; + inuse_relaylog *ir= rgi->relay_log; + uint64 event_count= 0; + uint64 events_to_execute= rgi->retry_event_count; + Relay_log_info *rli= rgi->rli; + int err= 0; + ulonglong cur_offset, old_offset; + char log_name[FN_REFLEN]; + THD *thd= rgi->thd; + +do_retry: + rgi->cleanup_context(thd, 1); + + mysql_mutex_lock(&rli->data_lock); + ++rli->retried_trans; + statistic_increment(slave_retried_transactions, LOCK_status); + mysql_mutex_unlock(&rli->data_lock); + + strcpy(log_name, ir->name); + if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0) + return 1; + cur_offset= rgi->retry_start_offset; + my_b_seek(&rlog, cur_offset); + + do + { + Log_event_type event_type; + Log_event *ev; + + old_offset= cur_offset; + ev= Log_event::read_log_event(&rlog, 0, + rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */, + opt_slave_sql_verify_checksum); + cur_offset= my_b_tell(&rlog); + + if (!ev) + { + err= 1; + goto err; + } + ev->thd= thd; + event_type= ev->get_type_code(); + if (Log_event::is_group_event(event_type)) + { + rpl_parallel_thread::queued_event *qev; + + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset, + cur_offset - old_offset); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + if (!qev) + { + delete ev; + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + err= 1; + goto err; + } + err= rpt_handle_event(qev, rpt); + ++event_count; + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + rpt->free_qev(qev); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + } + else + err= 
retry_handle_relay_log_rotate(ev, &rlog); + delete_or_keep_event_post_apply(rgi, event_type, ev); + + if (err) + { + /* ToDo: Need to here also handle second retry. */ + goto err; + } + + // ToDo: handle too many retries. + + } while (event_count < events_to_execute); + +err: + + end_io_cache(&rlog); + mysql_file_close(fd, MYF(MY_WME)); + return err; +} + + pthread_handler_t handle_rpl_parallel_thread(void *arg) { @@ -499,7 +589,23 @@ handle_rpl_parallel_thread(void *arg) everything is stopped and cleaned up correctly. */ if (likely(!rgi->worker_error) && !skip_event_group) + { + ++rgi->retry_event_count; err= rpt_handle_event(events, rpt); + DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_1_100", + if (rgi->current_gtid.domain_id == 0 && + rgi->current_gtid.server_id == 1 && + rgi->current_gtid.seq_no == 100 && + rgi->retry_event_count == 4) + { + thd->clear_error(); + thd->get_stmt_da()->reset_diagnostics_area(); + my_error(ER_LOCK_DEADLOCK, MYF(0)); + err= 1; + };); + if (err && has_temporary_error(thd)) + err= retry_event_group(rgi, rpt, events); + } else err= thd->wait_for_prior_commit(); @@ -802,8 +908,7 @@ err: rpl_parallel_thread::queued_event * -rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size, - Relay_log_info *rli) +rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size) { queued_event *qev; mysql_mutex_assert_owner(&LOCK_rpl_thread); @@ -817,6 +922,17 @@ rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size, qev->ev= ev; qev->event_size= event_size; qev->next= NULL; + return qev; +} + + +rpl_parallel_thread::queued_event * +rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size, + Relay_log_info *rli) +{ + queued_event *qev= get_qev_common(ev, event_size); + if (!qev) + return NULL; strcpy(qev->event_relay_log_name, rli->event_relay_log_name); qev->event_relay_log_pos= rli->event_relay_log_pos; qev->future_event_relay_log_pos= rli->future_event_relay_log_pos; @@ -825,6 +941,24 @@ 
rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size, } +rpl_parallel_thread::queued_event * +rpl_parallel_thread::retry_get_qev(Log_event *ev, queued_event *orig_qev, + const char *relay_log_name, + ulonglong event_pos, ulonglong event_size) +{ + queued_event *qev= get_qev_common(ev, event_size); + if (!qev) + return NULL; + qev->rgi= orig_qev->rgi; + strcpy(qev->event_relay_log_name, relay_log_name); + qev->event_relay_log_pos= event_pos; + qev->future_event_relay_log_pos= event_pos+event_size; + strcpy(qev->future_event_master_log_name, + orig_qev->future_event_master_log_name); + return qev; +} + + void rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev) { @@ -836,7 +970,7 @@ rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev) rpl_group_info* rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev, - rpl_parallel_entry *e) + rpl_parallel_entry *e, ulonglong event_size) { rpl_group_info *rgi; mysql_mutex_assert_owner(&LOCK_rpl_thread); @@ -864,6 +998,9 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev, return NULL; } rgi->parallel_entry= e; + rgi->relay_log= rli->last_inuse_relaylog; + rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size; + rgi->retry_event_count= 0; return rgi; } @@ -1439,7 +1576,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, { Gtid_log_event *gtid_ev= static_cast(ev); - if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e))) + if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e, event_size))) { cur_thread->free_qev(qev); abandon_worker_thread(rli->sql_driver_thd, cur_thread, -- cgit v1.2.1 From d60915692cd02cc70b7eb8245c9ac6eab5df3d0c Mon Sep 17 00:00:00 2001 From: unknown Date: Tue, 13 May 2014 13:42:06 +0200 Subject: MDEV-5262: Missing retry after temp error in parallel replication Implement that if first retry fails, we can do another attempt. 
Add testcases to test multi-retry that succeeds in second attempt, and multi-retry that eventually fails due to exceeding slave_trans_retries. --- sql/rpl_parallel.cc | 73 +++++++++++++++++++++++++++++++++++------------------ 1 file changed, 48 insertions(+), 25 deletions(-) (limited to 'sql/rpl_parallel.cc') diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index f0147527957..0b35e3c9fdc 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -188,6 +188,22 @@ unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond, } +#ifndef DBUG_OFF +static int +dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd) +{ + if (rgi->current_gtid.domain_id == 0 && rgi->current_gtid.seq_no == 100 && + rgi->retry_event_count == 4) + { + thd->clear_error(); + thd->get_stmt_da()->reset_diagnostics_area(); + my_error(ER_LOCK_DEADLOCK, MYF(0)); + return 1; + } + return 0; +} +#endif + static int retry_handle_relay_log_rotate(Log_event *ev, IO_CACHE *rlog) { @@ -204,15 +220,18 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt, File fd; const char *errmsg= NULL; inuse_relaylog *ir= rgi->relay_log; - uint64 event_count= 0; + uint64 event_count; uint64 events_to_execute= rgi->retry_event_count; Relay_log_info *rli= rgi->rli; - int err= 0; + int err; ulonglong cur_offset, old_offset; char log_name[FN_REFLEN]; THD *thd= rgi->thd; + ulong retries= 0; do_retry: + event_count= 0; + err= 0; rgi->cleanup_context(thd, 1); mysql_mutex_lock(&rli->data_lock); @@ -268,10 +287,26 @@ do_retry: else err= retry_handle_relay_log_rotate(ev, &rlog); delete_or_keep_event_post_apply(rgi, event_type, ev); - + DBUG_EXECUTE_IF("rpl_parallel_simulate_double_temp_err_gtid_0_x_100", + if (retries == 0) err= dbug_simulate_tmp_error(rgi, thd);); + DBUG_EXECUTE_IF("rpl_parallel_simulate_infinite_temp_err_gtid_0_x_100", + err= dbug_simulate_tmp_error(rgi, thd);); if (err) { - /* ToDo: Need to here also handle second retry. 
*/ + if (has_temporary_error(thd)) + { + ++retries; + if (retries < slave_trans_retries) + { + end_io_cache(&rlog); + mysql_file_close(fd, MYF(MY_WME)); + goto do_retry; + } + sql_print_error("Slave worker thread retried transaction %lu time(s) " + "in vain, giving up. Consider raising the value of " + "the slave_transaction_retries variable.", + slave_trans_retries); + } goto err; } @@ -592,29 +627,23 @@ handle_rpl_parallel_thread(void *arg) { ++rgi->retry_event_count; err= rpt_handle_event(events, rpt); - DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_1_100", - if (rgi->current_gtid.domain_id == 0 && - rgi->current_gtid.server_id == 1 && - rgi->current_gtid.seq_no == 100 && - rgi->retry_event_count == 4) - { - thd->clear_error(); - thd->get_stmt_da()->reset_diagnostics_area(); - my_error(ER_LOCK_DEADLOCK, MYF(0)); - err= 1; - };); + delete_or_keep_event_post_apply(rgi, event_type, events->ev); + DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_x_100", + err= dbug_simulate_tmp_error(rgi, thd);); if (err && has_temporary_error(thd)) err= retry_event_group(rgi, rpt, events); } else + { + delete events->ev; err= thd->wait_for_prior_commit(); + } end_of_group= in_event_group && ((group_standalone && !Log_event::is_part_of_group(event_type)) || group_ending); - delete_or_keep_event_post_apply(rgi, event_type, events->ev); events->next= qevs_to_free; qevs_to_free= events; @@ -1528,15 +1557,9 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, if (typ == GTID_EVENT) { - uint32 domain_id; - if (likely(typ == GTID_EVENT)) - { - Gtid_log_event *gtid_ev= static_cast(ev); - domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ? - 0 : gtid_ev->domain_id); - } - else - domain_id= 0; + Gtid_log_event *gtid_ev= static_cast(ev); + uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ? 
+ 0 : gtid_ev->domain_id); if (!(e= find(domain_id))) { my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); -- cgit v1.2.1 From 787c470cef54574e744eb5dfd9153d837fe67e45 Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 15 May 2014 15:52:08 +0200 Subject: MDEV-5262: Missing retry after temp error in parallel replication Handle retry of event groups that span multiple relay log files. - If retry reaches the end of one relay log file, move on to the next. - Handle refcounting of relay log files, and avoid purging relay log files until all event groups have completed that might have needed them for transaction retry. --- sql/rpl_parallel.cc | 154 +++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 111 insertions(+), 43 deletions(-) (limited to 'sql/rpl_parallel.cc') diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 0b35e3c9fdc..67d61b7cf11 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -204,20 +204,14 @@ dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd) } #endif -static int -retry_handle_relay_log_rotate(Log_event *ev, IO_CACHE *rlog) -{ - /* ToDo */ - return 0; -} - static int retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt, rpl_parallel_thread::queued_event *orig_qev) { IO_CACHE rlog; - File fd; + LOG_INFO linfo; + File fd= (File)-1; const char *errmsg= NULL; inuse_relaylog *ir= rgi->relay_log; uint64 event_count; @@ -241,7 +235,10 @@ do_retry: strcpy(log_name, ir->name); if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0) - return 1; + { + err= 1; + goto err; + } cur_offset= rgi->retry_start_offset; my_b_seek(&rlog, cur_offset); @@ -249,43 +246,85 @@ do_retry: { Log_event_type event_type; Log_event *ev; + rpl_parallel_thread::queued_event *qev; - old_offset= cur_offset; - ev= Log_event::read_log_event(&rlog, 0, - rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */, - opt_slave_sql_verify_checksum); - cur_offset= my_b_tell(&rlog); - - if (!ev) - { - err= 1; - goto err; - } - ev->thd= thd; - 
event_type= ev->get_type_code(); - if (Log_event::is_group_event(event_type)) + /* The loop is here so we can try again the next relay log file on EOF. */ + for (;;) { - rpl_parallel_thread::queued_event *qev; + old_offset= cur_offset; + ev= Log_event::read_log_event(&rlog, 0, + rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */, + opt_slave_sql_verify_checksum); + cur_offset= my_b_tell(&rlog); - mysql_mutex_lock(&rpt->LOCK_rpl_thread); - qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset, - cur_offset - old_offset); - mysql_mutex_unlock(&rpt->LOCK_rpl_thread); - if (!qev) + if (ev) + break; + if (rlog.error < 0) { - delete ev; - my_error(ER_OUT_OF_RESOURCES, MYF(0)); + errmsg= "slave SQL thread aborted because of I/O error"; err= 1; goto err; } - err= rpt_handle_event(qev, rpt); - ++event_count; - mysql_mutex_lock(&rpt->LOCK_rpl_thread); - rpt->free_qev(qev); - mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + if (rlog.error > 0) + { + sql_print_error("Slave SQL thread: I/O error reading " + "event(errno: %d cur_log->error: %d)", + my_errno, rlog.error); + errmsg= "Aborting slave SQL thread because of partial event read"; + err= 1; + goto err; + } + /* EOF. Move to the next relay log. */ + end_io_cache(&rlog); + mysql_file_close(fd, MYF(MY_WME)); + fd= (File)-1; + + /* Find the next relay log file. */ + if((err= rli->relay_log.find_log_pos(&linfo, log_name, 1)) || + (err= rli->relay_log.find_next_log(&linfo, 1))) + { + char buff[22]; + sql_print_error("next log error: %d offset: %s log: %s", + err, + llstr(linfo.index_file_offset, buff), + log_name); + goto err; + } + strmake_buf(log_name ,linfo.log_file_name); + + if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0) + { + err= 1; + goto err; + } + /* Loop to try again on the new log file. 
*/ } - else - err= retry_handle_relay_log_rotate(ev, &rlog); + + event_type= ev->get_type_code(); + if (!Log_event::is_group_event(event_type)) + { + delete ev; + continue; + } + ev->thd= thd; + + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset, + cur_offset - old_offset); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + if (!qev) + { + delete ev; + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + err= 1; + goto err; + } + err= rpt_handle_event(qev, rpt); + ++event_count; + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + rpt->free_qev(qev); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + delete_or_keep_event_post_apply(rgi, event_type, ev); DBUG_EXECUTE_IF("rpl_parallel_simulate_double_temp_err_gtid_0_x_100", if (retries == 0) err= dbug_simulate_tmp_error(rgi, thd);); @@ -300,6 +339,7 @@ do_retry: { end_io_cache(&rlog); mysql_file_close(fd, MYF(MY_WME)); + fd= (File)-1; goto do_retry; } sql_print_error("Slave worker thread retried transaction %lu time(s) " @@ -309,15 +349,17 @@ do_retry: } goto err; } - - // ToDo: handle too many retries. - } while (event_count < events_to_execute); err: - end_io_cache(&rlog); - mysql_file_close(fd, MYF(MY_WME)); + if (fd >= 0) + { + end_io_cache(&rlog); + mysql_file_close(fd, MYF(MY_WME)); + } + if (errmsg) + sql_print_error("Error reading relay log event: %s", errmsg); return err; } @@ -340,6 +382,8 @@ handle_rpl_parallel_thread(void *arg) rpl_sql_thread_info sql_info(NULL); size_t total_event_size; int err; + inuse_relaylog *last_ir; + uint64 accumulated_ir_count; struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg; @@ -683,12 +727,34 @@ handle_rpl_parallel_thread(void *arg) rpt->free_rgi(rgis_to_free); rgis_to_free= next; } + last_ir= NULL; + accumulated_ir_count= 0; while (qevs_to_free) { rpl_parallel_thread::queued_event *next= qevs_to_free->next; + inuse_relaylog *ir= qevs_to_free->ir; + /* Batch up refcount update to reduce use of synchronised operations. 
*/ + if (last_ir != ir) + { + if (last_ir) + { + my_atomic_rwlock_wrlock(&rli->inuse_relaylog_atomic_lock); + my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count); + my_atomic_rwlock_wrunlock(&rli->inuse_relaylog_atomic_lock); + accumulated_ir_count= 0; + } + last_ir= ir; + } + ++accumulated_ir_count; rpt->free_qev(qevs_to_free); qevs_to_free= next; } + if (last_ir) + { + my_atomic_rwlock_wrlock(&rli->inuse_relaylog_atomic_lock); + my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count); + my_atomic_rwlock_wrunlock(&rli->inuse_relaylog_atomic_lock); + } if ((events= rpt->event_queue) != NULL) { @@ -1711,6 +1777,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, Queue the event for processing. */ rli->event_relay_log_pos= rli->future_event_relay_log_pos; + qev->ir= rli->last_inuse_relaylog; + ++qev->ir->queued_count; cur_thread->enqueue(qev); unlock_or_exit_cond(rli->sql_driver_thd, &cur_thread->LOCK_rpl_thread, &did_enter_cond, &old_stage); -- cgit v1.2.1 From 629b822913348cec56ec7a80a236f0ba2e613585 Mon Sep 17 00:00:00 2001 From: unknown Date: Tue, 3 Jun 2014 10:31:11 +0200 Subject: MDEV-5262, MDEV-5914, MDEV-5941, MDEV-6020: Deadlocks during parallel replication causing replication to fail. In parallel replication, we run transactions from the master in parallel, but force them to commit in the same order they did on the master. If we force T1 to commit before T2, but T2 holds eg. a row lock that is needed by T1, we get a deadlock when T2 waits until T1 has committed. Usually, we do not run T1 and T2 in parallel if there is a chance that they can have conflicting locks like this, but there are certain edge cases where it can occasionally happen (eg. MDEV-5914, MDEV-5941, MDEV-6020). The bug was that this would cause replication to hang, eventually getting a lock timeout and causing the slave to stop with error. 
With this patch, InnoDB will report back to the upper layer whenever a transaction T1 is about to do a lock wait on T2. If T1 and T2 are parallel replication transactions, and T2 needs to commit later than T1, we can thus detect the deadlock; we then kill T2, setting a flag that causes it to catch the kill and convert it to a deadlock error; this error will then cause T2 to roll back and release its locks (so that T1 can commit), and later T2 will be re-tried and eventually also committed. The kill happens asynchronously in a slave background thread; this is necessary, as the reporting from InnoDB about lock waits happens deep inside the locking code, at a point where it is not possible to directly call THD::awake() due to mutexes held. Deadlock is assumed to be (very) rarely occurring, so this patch tries to minimise the performance impact on the normal case where no deadlocks occur, rather than optimise the handling of the occasional deadlock. Also fix transaction retry due to deadlock when it happens after a transaction already signalled to later transactions that it started to commit. In this case we need to undo this signalling (and later redo it when we commit again during retry), so following transactions will not start too early. Also add a missing thd->send_kill_message() that got triggered during testing (this corrects an incorrect fix for MySQL Bug#58933). 
--- sql/rpl_parallel.cc | 135 ++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 115 insertions(+), 20 deletions(-) (limited to 'sql/rpl_parallel.cc') diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 67d61b7cf11..65461b3f990 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -156,6 +156,7 @@ finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry, mysql_mutex_unlock(&entry->LOCK_parallel_entry); thd->clear_error(); + thd->reset_killed(); thd->get_stmt_da()->reset_diagnostics_area(); wfc->wakeup_subsequent_commits(rgi->worker_error); } @@ -188,6 +189,25 @@ unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond, } +static void +register_wait_for_prior_event_group_commit(rpl_group_info *rgi, + rpl_parallel_entry *entry) +{ + mysql_mutex_assert_owner(&entry->LOCK_parallel_entry); + if (rgi->wait_commit_sub_id > entry->last_committed_sub_id) + { + /* + Register that the commit of this event group must wait for the + commit of the previous event group to complete before it may + complete itself, so that we preserve commit order. + */ + wait_for_commit *waitee= + &rgi->wait_commit_group_info->commit_orderer; + rgi->commit_orderer.register_wait_for_prior_commit(waitee); + } +} + + #ifndef DBUG_OFF static int dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd) @@ -205,6 +225,40 @@ dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd) #endif +/* + If we detect a deadlock due to eg. storage engine locks that conflict with + the fixed commit order, then the later transaction will be killed + asynchroneously to allow the former to complete its commit. + + In this case, we convert the 'killed' error into a deadlock error, and retry + the later transaction. 
*/ +static void +convert_kill_to_deadlock_error(rpl_group_info *rgi) +{ + THD *thd= rgi->thd; + + if (thd->get_stmt_da()->sql_errno() == ER_QUERY_INTERRUPTED && + rgi->killed_for_retry) + { + thd->clear_error(); + thd->get_stmt_da()->reset_diagnostics_area(); + my_error(ER_LOCK_DEADLOCK, MYF(0)); + rgi->killed_for_retry= false; + thd->reset_killed(); + } +} + + +static bool +is_group_ending(Log_event *ev, Log_event_type event_type) +{ + return event_type == XID_EVENT || + (event_type == QUERY_EVENT && + (((Query_log_event *)ev)->is_commit() || + ((Query_log_event *)ev)->is_rollback())); +} + + static int retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt, rpl_parallel_thread::queued_event *orig_qev) @@ -221,11 +275,46 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt, ulonglong cur_offset, old_offset; char log_name[FN_REFLEN]; THD *thd= rgi->thd; + rpl_parallel_entry *entry= rgi->parallel_entry; ulong retries= 0; do_retry: event_count= 0; err= 0; + + /* + If we already started committing before getting the deadlock (or other + error) that caused us to need to retry, we have already signalled + subsequent transactions that we have started committing. This is + potentially a problem, as now we will rollback, and if subsequent + transactions would start to execute now, they could see an unexpected + state of the database and get eg. key not found or duplicate key error. + + However, to get a deadlock in the first place, there must have been + another earlier transaction that is waiting for us. Thus that other + transaction has _not_ yet started to commit, and any subsequent + transactions will still be waiting at this point. + + So here, we decrement back the count of transactions that started + committing (if we already incremented it), undoing the effect of an + earlier mark_start_commit(). 
Then later, when the retry succeeds and we + commit again, we can do a new mark_start_commit() and eventually wake up + subsequent transactions at the proper time. + + We need to do the unmark before the rollback, to be sure that the + transaction we deadlocked with will not signal that it started to commit + until after the unmark. + */ + rgi->unmark_start_commit(); + + /* + We might get the deadlock error that causes the retry during commit, while + sitting in wait_for_prior_commit(). If this happens, we will have a + pending error in the wait_for_commit object. So clear this by + unregistering (and later re-registering) the wait. + */ + if(thd->wait_for_commit_ptr) + thd->wait_for_commit_ptr->unregister_wait_for_prior_commit(); rgi->cleanup_context(thd, 1); mysql_mutex_lock(&rli->data_lock); @@ -233,6 +322,10 @@ do_retry: statistic_increment(slave_retried_transactions, LOCK_status); mysql_mutex_unlock(&rli->data_lock); + mysql_mutex_lock(&entry->LOCK_parallel_entry); + register_wait_for_prior_event_group_commit(rgi, entry); + mysql_mutex_unlock(&entry->LOCK_parallel_entry); + strcpy(log_name, ir->name); if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0) { @@ -319,6 +412,9 @@ do_retry: err= 1; goto err; } + if (is_group_ending(ev, event_type)) + rgi->mark_start_commit(); + err= rpt_handle_event(qev, rpt); ++event_count; mysql_mutex_lock(&rpt->LOCK_rpl_thread); @@ -332,6 +428,7 @@ do_retry: err= dbug_simulate_tmp_error(rgi, thd);); if (err) { + convert_kill_to_deadlock_error(rgi); if (has_temporary_error(thd)) { ++retries; @@ -599,17 +696,9 @@ handle_rpl_parallel_thread(void *arg) if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id)) skip_event_group= true; - else if (rgi->wait_commit_sub_id > entry->last_committed_sub_id) - { - /* - Register that the commit of this event group must wait for the - commit of the previous event group to complete before it may - complete itself, so that we preserve commit order. 
- */ - wait_for_commit *waitee= - &rgi->wait_commit_group_info->commit_orderer; - rgi->commit_orderer.register_wait_for_prior_commit(waitee); - } + else + register_wait_for_prior_event_group_commit(rgi, entry); + unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry, &did_enter_cond, &old_stage); @@ -651,10 +740,7 @@ handle_rpl_parallel_thread(void *arg) } } - group_ending= event_type == XID_EVENT || - (event_type == QUERY_EVENT && - (((Query_log_event *)events->ev)->is_commit() || - ((Query_log_event *)events->ev)->is_rollback())); + group_ending= is_group_ending(events->ev, event_type); if (group_ending && likely(!rgi->worker_error)) { DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit"); @@ -674,8 +760,12 @@ handle_rpl_parallel_thread(void *arg) delete_or_keep_event_post_apply(rgi, event_type, events->ev); DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_x_100", err= dbug_simulate_tmp_error(rgi, thd);); - if (err && has_temporary_error(thd)) - err= retry_event_group(rgi, rpt, events); + if (err) + { + convert_kill_to_deadlock_error(rgi); + if (has_temporary_error(thd) && slave_trans_retries > 0) + err= retry_event_group(rgi, rpt, events); + } } else { @@ -691,10 +781,14 @@ handle_rpl_parallel_thread(void *arg) events->next= qevs_to_free; qevs_to_free= events; - if (unlikely(err) && !rgi->worker_error) + if (unlikely(err)) { - slave_output_error_info(rgi, thd); - signal_error_to_sql_driver_thread(thd, rgi, err); + if (!rgi->worker_error) + { + slave_output_error_info(rgi, thd); + signal_error_to_sql_driver_thread(thd, rgi, err); + } + thd->reset_killed(); } if (end_of_group) { @@ -1096,6 +1190,7 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev, rgi->relay_log= rli->last_inuse_relaylog; rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size; rgi->retry_event_count= 0; + rgi->killed_for_retry= false; return rgi; } -- cgit v1.2.1 From bd4153a8c2978af6d39a60a7f1c4e13c68fbbaab Mon Sep 17 00:00:00 2001 From: unknown 
Date: Tue, 10 Jun 2014 10:13:15 +0200 Subject: MDEV-5262, MDEV-5914, MDEV-5941, MDEV-6020: Deadlocks during parallel replication causing replication to fail. Remove the temporary fix for MDEV-5914, which used READ COMMITTED for parallel replication worker threads. Replace it with a better, more selective solution. The issue is with certain edge cases of InnoDB gap locks, for example between INSERT and ranged DELETE. It is possible for the gap lock set by the DELETE to block the INSERT, if the DELETE runs first, while the record lock set by INSERT does not block the DELETE, if the INSERT runs first. This can cause a conflict between the two in parallel replication on the slave even though they ran without conflicts on the master. With this patch, InnoDB will ask the server layer about the two involved transactions before blocking on a gap lock. If the server layer tells InnoDB that the transactions are already fixed wrt. commit order, as they are in parallel replication, InnoDB will ignore the gap lock and allow the two transactions to proceed in parallel, avoiding the conflict. Improve the fix for MDEV-6020. When InnoDB itself detects a deadlock, it now asks the server layer for any preferences about which transaction to roll back. In case of parallel replication with two transactions T1 and T2 fixed to commit T1 before T2, the server layer will ask InnoDB to roll back T2 as the deadlock victim, not T1. This helps in some cases to avoid excessive deadlock rollback, as T2 will in any case need to wait for T1 to complete before it can itself commit. Also some misc. fixes found during development and testing: - Remove thd_rpl_is_parallel(), it is not used or needed. - Use KILL_CONNECTION instead of KILL_QUERY when a parallel replication worker thread is killed to resolve a deadlock with fixed commit ordering. There are some cases, eg. 
in sql/sql_parse.cc, where a KILL_QUERY can be ignored if the query otherwise completed successfully, and this could cause the deadlock kill to be lost, so that the deadlock was not correctly resolved. - Fix random test failure due to missing wait_for_binlog_checkpoint.inc. - Make sure that deadlock or other temporary errors during parallel replication are not printed to the error log; there were some places around the replication code with extra error logging. These conditions can occur occasionally and are handled automatically without breaking replication, so they should not pollute the error log. - Fix handling of rgi->gtid_sub_id. We need to be able to access this also at the end of a transaction, to be able to detect and resolve deadlocks due to commit ordering. But this value was also used as a flag to mark whether record_gtid() had been called, by being set to zero, losing the value. Now, introduce a separate flag rgi->gtid_pending, so rgi->gtid_sub_id remains valid for the entire duration of the transaction. - Fix one place where the code to handle ignored errors called reset_killed() unconditionally, even if no error was caught that should be ignored. This could cause loss of a deadlock kill signal, breaking deadlock detection and resolution. - Fix a couple of missing mysql_reset_thd_for_next_command(). This could cause a prior error condition to remain for the next event executed, causing assertions about errors already being set and possibly giving incorrect error handling for following event executions. - Fix code that cleared thd->rgi_slave in the parallel replication worker threads after each event execution; this caused the deadlock detection and handling code to not be able to correctly process the associated transactions as belonging to replication worker threads. - Remove useless error code in slave_background_kill_request(). - Fix bug where wfc->wakeup_error was not cleared at wait_for_commit::unregister_wait_for_prior_commit().
This could cause the error condition to wrongly propagate to a later wait_for_prior_commit(), causing spurious ER_PRIOR_COMMIT_FAILED errors. - Do not put the binlog background thread into the processlist. It causes too many result differences in mtr, but also it probably is not useful for users to pollute the process list with a system thread that does not really perform any user-visible tasks... --- sql/rpl_parallel.cc | 47 +++++------------------------------------------ 1 file changed, 5 insertions(+), 42 deletions(-) (limited to 'sql/rpl_parallel.cc') diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 65461b3f990..621ebc024bb 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -23,7 +23,6 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, Relay_log_info *rli= rgi->rli; THD *thd= rgi->thd; - thd->rgi_slave= rgi; thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter; /* ToDo: Access to thd, and what about rli, split out a parallel part? */ @@ -35,7 +34,6 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos; strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name); err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt); - thd->rgi_slave= NULL; thread_safe_increment64(&rli->executed_entries, &slave_executed_entries_lock); @@ -236,8 +234,9 @@ static void convert_kill_to_deadlock_error(rpl_group_info *rgi) { THD *thd= rgi->thd; + int err_code= thd->get_stmt_da()->sql_errno(); - if (thd->get_stmt_da()->sql_errno() == ER_QUERY_INTERRUPTED && + if ((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) && rgi->killed_for_retry) { thd->clear_error(); @@ -510,39 +509,6 @@ handle_rpl_parallel_thread(void *arg) thd->set_time(); thd->variables.lock_wait_timeout= LONG_TIMEOUT; thd->system_thread_info.rpl_sql_info= &sql_info; - /* - For now, we need to run the replication parallel worker threads in - READ COMMITTED. 
This is needed because gap locks are not symmetric. - For example, a gap lock from a DELETE blocks an insert intention lock, - but not vice versa. So an INSERT followed by DELETE can group commit - on the master, but if we are unlucky with thread scheduling we can - then deadlock on the slave because the INSERT ends up waiting for a - gap lock from the DELETE (and the DELETE in turn waits for the INSERT - in wait_for_prior_commit()). See also MDEV-5914. - - It should be mostly safe to run in READ COMMITTED in the slave anyway. - The commit order is already fixed from on the master, so we do not - risk logging into the binlog in an incorrect order between worker - threads (one that would cause different results if executed on a - lower-level slave that uses this slave as a master). The only - potential problem is with transactions run in a different master - connection (using multi-source replication), or run directly on the - slave by an application; when using READ COMMITTED we are not - guaranteed serialisability of binlogged statements. - - In practice, this is unlikely to be an issue. In GTID mode, such - parallel transactions from multi-source or application must in any - case use a different replication domain, in which case binlog order - by definition must be independent between the different domain. Even - in non-GTID mode, normally one will assume that the external - transactions are not conflicting with those applied by the slave, so - that isolation level should make no difference. It would be rather - strange if the result of applying query events from one master would - depend on the timing and nature of other queries executed from - different multi-source connections or done directly on the slave by - an application. Still, something to be aware of. 
- */ - thd->variables.tx_isolation= ISO_READ_COMMITTED; mysql_mutex_lock(&rpt->LOCK_rpl_thread); rpt->thd= thd; @@ -598,7 +564,7 @@ handle_rpl_parallel_thread(void *arg) continue; } - group_rgi= rgi; + thd->rgi_slave= group_rgi= rgi; gco= rgi->gco; /* Handle a new event group, which will be initiated by a GTID event. */ if ((event_type= events->ev->get_type_code()) == GTID_EVENT) @@ -607,7 +573,6 @@ handle_rpl_parallel_thread(void *arg) PSI_stage_info old_stage; uint64 wait_count; - thd->tx_isolation= (enum_tx_isolation)thd->variables.tx_isolation; in_event_group= true; /* If the standalone flag is set, then this event group consists of a @@ -618,9 +583,7 @@ handle_rpl_parallel_thread(void *arg) (0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 & Gtid_log_event::FL_STANDALONE)); - /* Save this, as it gets cleared when the event group commits. */ event_gtid_sub_id= rgi->gtid_sub_id; - rgi->thd= thd; /* @@ -796,7 +759,7 @@ handle_rpl_parallel_thread(void *arg) finish_event_group(thd, event_gtid_sub_id, entry, rgi); rgi->next= rgis_to_free; rgis_to_free= rgi; - group_rgi= rgi= NULL; + thd->rgi_slave= group_rgi= rgi= NULL; skip_event_group= false; DEBUG_SYNC(thd, "rpl_parallel_end_of_group"); } @@ -879,7 +842,7 @@ handle_rpl_parallel_thread(void *arg) in_event_group= false; mysql_mutex_lock(&rpt->LOCK_rpl_thread); rpt->free_rgi(group_rgi); - group_rgi= NULL; + thd->rgi_slave= group_rgi= NULL; skip_event_group= false; } if (!in_event_group) -- cgit v1.2.1 From 98fc5b3af8b1954e4480ac33d30493aa4de66ec4 Mon Sep 17 00:00:00 2001 From: Kristian Nielsen Date: Tue, 8 Jul 2014 12:54:47 +0200 Subject: MDEV-5262, MDEV-5914, MDEV-5941, MDEV-6020: Deadlocks during parallel replication causing replication to fail. After-review changes. For this patch in 10.0, we do not introduce a new public storage engine API, we just fix the InnoDB/XtraDB issues. In 10.1, we will make a better public API that can be used for all storage engines (MDEV-6429).
Eliminate the background thread that did deadlock kills asynchronously. Instead, we ensure that the InnoDB/XtraDB code can handle doing the kill from inside the deadlock detection code (when thd_report_wait_for() needs to kill a later thread to resolve a deadlock). (We preserve the part of the original patch that introduces dedicated mutex and condition for the slave init thread, to remove the abuse of LOCK_thread_count for start/stop synchronisation of the slave init thread). --- sql/rpl_parallel.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'sql/rpl_parallel.cc') diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 621ebc024bb..98753865568 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -240,7 +240,6 @@ convert_kill_to_deadlock_error(rpl_group_info *rgi) rgi->killed_for_retry) { thd->clear_error(); - thd->get_stmt_da()->reset_diagnostics_area(); my_error(ER_LOCK_DEADLOCK, MYF(0)); rgi->killed_for_retry= false; thd->reset_killed(); @@ -325,7 +324,7 @@ do_retry: register_wait_for_prior_event_group_commit(rgi, entry); mysql_mutex_unlock(&entry->LOCK_parallel_entry); - strcpy(log_name, ir->name); + strmake_buf(log_name, ir->name); if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0) { err= 1; -- cgit v1.2.1 From 8f21a3166908d71b5828d50bfce65b480508c6c1 Mon Sep 17 00:00:00 2001 From: Kristian Nielsen Date: Thu, 10 Jul 2014 13:55:53 +0200 Subject: MDEV-6435: Assertion `m_status == DA_ERROR' failed in Diagnostics_area::sql_errno() with parallel replication When a MyISAM query is killed midway, the query is logged to the binlog marked with the error. The slave does not attempt to run the query, but aborts with a suitable error message in the error log for the DBA to act on. In this case, the parallel replication code would check the sql_errno() code, even though no my_error() had been set. In debug builds, this causes an assertion. Fixed the code to check that we actually have an error set before querying for an error code.
--- sql/rpl_parallel.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'sql/rpl_parallel.cc') diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 98753865568..0d23248539c 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -234,8 +234,11 @@ static void convert_kill_to_deadlock_error(rpl_group_info *rgi) { THD *thd= rgi->thd; - int err_code= thd->get_stmt_da()->sql_errno(); + int err_code; + if (!thd->get_stmt_da()->is_error()) + return; + err_code= thd->get_stmt_da()->sql_errno(); if ((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) && rgi->killed_for_retry) { -- cgit v1.2.1 From ec05fea0a003cbbccd0dc830477e93b040b113ab Mon Sep 17 00:00:00 2001 From: Kristian Nielsen Date: Wed, 13 Aug 2014 13:34:28 +0200 Subject: MDEV-6549, failing to update gtid_slave_pos for a transaction that was retried. The bug was that in some cases, if a replicated transaction was rolled back due to deadlock, during the subsequent retry of that transaction, the gtid_slave_pos would _not_ be updated with the new GTID, leaving the GTID position of the slave incorrect. Fix this by ensuring during the retry that we clear the flag that marks that the GTID has already been recorded in gtid_slave_pos, so that the update of gtid_slave_pos will be done again during the retry. In the original bug, the symptom was an assertion due to OPTION_GTID_BEGIN not being cleared during the retry of the transaction. The reason was some code in handling of a COMMIT query event, which would not clear the flag when not recording a GTID in gtid_slave_pos. This commit also fixes that code to always clear the OPTION_GTID_BEGIN flag for clarity, though it is actually not possible for OPTION_GTID_BEGIN to become set unless a GTID is pending for update (after fixing the bug described above). 
--- sql/rpl_parallel.cc | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'sql/rpl_parallel.cc') diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 0d23248539c..91bd636d3f5 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -318,6 +318,15 @@ do_retry: thd->wait_for_commit_ptr->unregister_wait_for_prior_commit(); rgi->cleanup_context(thd, 1); + /* + If we retry due to a deadlock kill that occured during the commit step, we + might have already updated (but not committed) an update of table + mysql.gtid_slave_pos, and cleared the gtid_pending flag. Now we have + rolled back any such update, so we must set the gtid_pending flag back to + true so that we will do a new update when/if we succeed with the retry. + */ + rgi->gtid_pending= true; + mysql_mutex_lock(&rli->data_lock); ++rli->retried_trans; statistic_increment(slave_retried_transactions, LOCK_status); -- cgit v1.2.1 From cfa1ce81bb7992c362958bb95f41325ce2109834 Mon Sep 17 00:00:00 2001 From: Kristian Nielsen Date: Fri, 15 Aug 2014 11:31:13 +0200 Subject: MDEV-6551: Some replication errors are ignored if slave_parallel_threads > 0 The problem occurred when using parallel replication, and an error occurred that caused the SQL thread to stop when the IO thread had already reached a following binlog file from the master (or otherwise performed a relay log rotation). In this case, the Rotate Event at the end of the relay log file could still be executed, even though an earlier event in that relay log file had gotten an error. This would cause the position to be incorrectly updated, so that upon restart of the SQL thread, the event that had failed would be silently skipped and ignored, causing replication corruption. Fixed by checking before executing Rotate Event, whether an earlier event has failed. If so, the Rotate Event is not executed, just dequeued, same as for other normal events following a failing event.
--- sql/rpl_parallel.cc | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) (limited to 'sql/rpl_parallel.cc') diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 91bd636d3f5..eeb66821809 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -22,18 +22,22 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, rpl_group_info *rgi= qev->rgi; Relay_log_info *rli= rgi->rli; THD *thd= rgi->thd; + Log_event *ev; + + DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_EVENT); + ev= qev->ev; thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter; + ev->thd= thd; - /* ToDo: Access to thd, and what about rli, split out a parallel part? */ - mysql_mutex_lock(&rli->data_lock); - qev->ev->thd= thd; strcpy(rgi->event_relay_log_name_buf, qev->event_relay_log_name); rgi->event_relay_log_name= rgi->event_relay_log_name_buf; rgi->event_relay_log_pos= qev->event_relay_log_pos; rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos; strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name); - err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt); + mysql_mutex_lock(&rli->data_lock); + /* Mutex will be released in apply_event_and_update_pos(). */ + err= apply_event_and_update_pos(ev, thd, rgi, rpt); thread_safe_increment64(&rli->executed_entries, &slave_executed_entries_lock); @@ -47,6 +51,8 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev) { int cmp; Relay_log_info *rli; + rpl_parallel_entry *e; + /* Events that are not part of an event group, such as Format Description, Stop, GTID List and such, are executed directly in the driver SQL thread, @@ -57,6 +63,13 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev) if ((thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions) return; + + /* Do not update position if an earlier event group caused an error abort. 
*/ + DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE); + e= qev->entry_for_queued; + if (e->stop_on_error_sub_id < (uint64)ULONGLONG_MAX || e->force_abort) + return; + rli= qev->rgi->rli; mysql_mutex_lock(&rli->data_lock); cmp= strcmp(rli->group_relay_log_name, qev->event_relay_log_name); @@ -566,7 +579,7 @@ handle_rpl_parallel_thread(void *arg) bool end_of_group, group_ending; total_event_size+= events->event_size; - if (!events->ev) + if (events->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE) { handle_queued_pos_update(thd, events); events->next= qevs_to_free; @@ -574,6 +587,7 @@ handle_rpl_parallel_thread(void *arg) events= next; continue; } + DBUG_ASSERT(events->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT); thd->rgi_slave= group_rgi= rgi; gco= rgi->gco; @@ -1082,6 +1096,7 @@ rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size) my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*qev)); return NULL; } + qev->typ= rpl_parallel_thread::queued_event::QUEUED_EVENT; qev->ev= ev; qev->event_size= event_size; qev->next= NULL; @@ -1824,7 +1839,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, return 1; } /* - Queue an empty event, so that the position will be updated in a + Queue a position update, so that the position will be updated in a reasonable way relative to other events: - If the currently executing events are queued serially for a single @@ -1835,7 +1850,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, least the position will not be updated until one of them has reached the current point. 
*/ - qev->ev= NULL; + qev->typ= rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE; + qev->entry_for_queued= e; } else { -- cgit v1.2.1 From 4cb1e0eea0c965df3e91c1637f17127c1e7d6db7 Mon Sep 17 00:00:00 2001 From: Kristian Nielsen Date: Wed, 2 Jul 2014 12:51:45 +0200 Subject: MDEV-6321: close_temporary_tables() in format description event not serialised correctly When a master server starts up, it logs a special format_description event at the start of a new binlog to mark that it has restarted. This is used by a slave to drop all temporary tables - this is needed in case the master crashed and did not have a chance to send explicit DROP TEMPORARY TABLE statements to the slave. In parallel replication, we need to be careful when dropping the temporary tables - we need to be sure that no prior events are still executing that might be using the temporary tables to be dropped, _and_ that no following events have started executing that might have created new temporary tables that should not be dropped. This was not handled correctly, which could cause errors about access to non-existent temporary tables or even crashes. This patch implements that such format_description events cause serialisation of event execution; all prior events are executed to completion first, then the format_description event is executed, dropping temporary tables, then following events are queued for execution. Master restarts should be sufficiently infrequent that the resulting loss of parallelism should be of minimal impact.
--- sql/rpl_parallel.cc | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) (limited to 'sql/rpl_parallel.cc') diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index eeb66821809..9ba155bebb4 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -1617,6 +1617,36 @@ rpl_parallel::workers_idle() } +void +rpl_parallel::wait_for_workers_idle(THD *thd) +{ + uint32 i, max_i; + + max_i= domain_hash.records; + for (i= 0; i < max_i; ++i) + { + bool active; + wait_for_commit my_orderer; + struct rpl_parallel_entry *e; + + e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); + mysql_mutex_lock(&e->LOCK_parallel_entry); + if ((active= (e->current_sub_id > e->last_committed_sub_id))) + { + wait_for_commit *waitee= &e->current_group_info->commit_orderer; + my_orderer.register_wait_for_prior_commit(waitee); + thd->wait_for_commit_ptr= &my_orderer; + } + mysql_mutex_unlock(&e->LOCK_parallel_entry); + if (active) + { + my_orderer.wait_for_prior_commit(thd); + thd->wait_for_commit_ptr= NULL; + } + } +} + + /* This is used when we get an error during processing in do_event(); We will not queue any event to the thread, but we still need to wake it up @@ -1684,6 +1714,24 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, /* ToDo: what to do with this lock?!? */ mysql_mutex_unlock(&rli->data_lock); + if (typ == FORMAT_DESCRIPTION_EVENT) + { + Format_description_log_event *fdev= + static_cast<Format_description_log_event *>(ev); + if (fdev->created) + { + /* + This format description event marks a new binlog after a master server + restart. We are going to close all temporary tables to clean up any + possible left-overs after a prior master crash. + + Thus we need to wait for all prior events to execute to completion, + in case they need access to any of the temporary tables. + */ + wait_for_workers_idle(rli->sql_driver_thd); + } + } + /* Stop queueing additional event groups once the SQL thread is requested to stop.
-- cgit v1.2.1 From 453c29c3f772d7bec69be2a2bf5a5747444f0a77 Mon Sep 17 00:00:00 2001 From: Kristian Nielsen Date: Tue, 19 Aug 2014 14:26:42 +0200 Subject: MDEV-6321: close_temporary_tables() in format description event not serialised correctly Follow-up patch, fixing a possible deadlock issue. If the master crashes in the middle of an event group, there can be an active transaction in a worker thread when we encounter the following master restart format description event. In this case, we need to notify that worker thread to abort and roll back the partial event group. Otherwise a deadlock occurs: the worker thread waits for the commit that never arrives, and the SQL driver thread waits for the worker thread to complete its event group, which it never does. --- sql/rpl_parallel.cc | 81 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) (limited to 'sql/rpl_parallel.cc') diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 9ba155bebb4..21234d7fd38 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -587,6 +587,30 @@ handle_rpl_parallel_thread(void *arg) events= next; continue; } + else if (events->typ == + rpl_parallel_thread::queued_event::QUEUED_MASTER_RESTART) + { + if (in_event_group) + { + /* + Master restarted (crashed) in the middle of an event group. + So we need to roll back and discard that event group. 
+ */ + group_rgi->cleanup_context(thd, 1); + in_event_group= false; + finish_event_group(thd, group_rgi->gtid_sub_id, + events->entry_for_queued, group_rgi); + + group_rgi->next= rgis_to_free; + rgis_to_free= group_rgi; + thd->rgi_slave= group_rgi= NULL; + } + + events->next= qevs_to_free; + qevs_to_free= events; + events= next; + continue; + } DBUG_ASSERT(events->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT); thd->rgi_slave= group_rgi= rgi; @@ -1617,6 +1641,54 @@ rpl_parallel::workers_idle() } +int +rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi, + Format_description_log_event *fdev) +{ + uint32 idx; + rpl_parallel_thread *thr; + rpl_parallel_thread::queued_event *qev; + Relay_log_info *rli= rgi->rli; + + /* + We only need to queue the server restart if we still have a thread working + on a (potentially partial) event group. + + If the last thread we queued for has finished, then it cannot have any + partial event group that needs aborting. + + Thus there is no need for the full complexity of choose_thread(). We only + need to check if we have a current worker thread, and queue for it if so. + */ + idx= rpl_thread_idx; + thr= rpl_threads[idx]; + if (!thr) + return 0; + mysql_mutex_lock(&thr->LOCK_rpl_thread); + if (thr->current_owner != &rpl_threads[idx]) + { + /* No active worker thread, so no need to queue the master restart. 
*/ + mysql_mutex_unlock(&thr->LOCK_rpl_thread); + return 0; + } + + if (!(qev= thr->get_qev(fdev, 0, rli))) + { + mysql_mutex_unlock(&thr->LOCK_rpl_thread); + return 1; + } + + qev->rgi= rgi; + qev->typ= rpl_parallel_thread::queued_event::QUEUED_MASTER_RESTART; + qev->entry_for_queued= this; + qev->ir= rli->last_inuse_relaylog; + ++qev->ir->queued_count; + thr->enqueue(qev); + mysql_mutex_unlock(&thr->LOCK_rpl_thread); + return 0; +} + + void rpl_parallel::wait_for_workers_idle(THD *thd) { @@ -1727,7 +1799,16 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, Thus we need to wait for all prior events to execute to completion, in case they need access to any of the temporary tables. + + We also need to notify the worker thread running the prior incomplete + event group (if any), as such event group signifies an incompletely + written group cut short by a master crash, and must be rolled back. */ + if (current->queue_master_restart(serial_rgi, fdev)) + { + delete ev; + return 1; + } wait_for_workers_idle(rli->sql_driver_thd); } } -- cgit v1.2.1 From c6a60f6d79862bd82d517a25c6e13e6d141173b6 Mon Sep 17 00:00:00 2001 From: Kristian Nielsen Date: Wed, 20 Aug 2014 10:59:39 +0200 Subject: MDEV-6321: close_temporary_tables() in format description event not serialised correctly After-review fixes. Mainly catching if the wait in wait_for_workers_idle() is aborted due to kill. In this case, we should return an error and not proceed to execute the format description event, as other threads might still be running for a bit until the error is caught in all threads. 
--- sql/rpl_parallel.cc | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) (limited to 'sql/rpl_parallel.cc') diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 21234d7fd38..038abb351ea 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -1689,11 +1689,15 @@ rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi, } -void +int rpl_parallel::wait_for_workers_idle(THD *thd) { uint32 i, max_i; + /* + The domain_hash is only accessed by the SQL driver thread, so it is safe + to iterate over without a lock. + */ max_i= domain_hash.records; for (i= 0; i < max_i; ++i) { @@ -1712,10 +1716,13 @@ rpl_parallel::wait_for_workers_idle(THD *thd) mysql_mutex_unlock(&e->LOCK_parallel_entry); if (active) { - my_orderer.wait_for_prior_commit(thd); + int err= my_orderer.wait_for_prior_commit(thd); thd->wait_for_commit_ptr= NULL; + if (err) + return err; } } + return 0; } @@ -1804,12 +1811,12 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, event group (if any), as such event group signifies an incompletely written group cut short by a master crash, and must be rolled back. */ - if (current->queue_master_restart(serial_rgi, fdev)) + if (current->queue_master_restart(serial_rgi, fdev) || + wait_for_workers_idle(rli->sql_driver_thd)) { delete ev; return 1; } - wait_for_workers_idle(rli->sql_driver_thd); } } -- cgit v1.2.1 From 4d4ce59d2be82a1a0aebe8d527da1e660395c063 Mon Sep 17 00:00:00 2001 From: Sergei Golubchik Date: Mon, 8 Sep 2014 12:59:57 +0200 Subject: compilation fixes for WITH_ATOMIC_OPS=rwlocks --- sql/rpl_parallel.cc | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) (limited to 'sql/rpl_parallel.cc') diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 038abb351ea..cda224ff01b 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -4,7 +4,6 @@ #include "rpl_mi.h" #include "debug_sync.h" - /* Code for optional parallel execution of replicated events on the slave. 
*/ @@ -844,9 +843,9 @@ handle_rpl_parallel_thread(void *arg) { if (last_ir) { - my_atomic_rwlock_wrlock(&rli->inuse_relaylog_atomic_lock); + my_atomic_rwlock_wrlock(&last_ir->inuse_relaylog_atomic_lock); my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count); - my_atomic_rwlock_wrunlock(&rli->inuse_relaylog_atomic_lock); + my_atomic_rwlock_wrunlock(&last_ir->inuse_relaylog_atomic_lock); accumulated_ir_count= 0; } last_ir= ir; @@ -857,9 +856,9 @@ handle_rpl_parallel_thread(void *arg) } if (last_ir) { - my_atomic_rwlock_wrlock(&rli->inuse_relaylog_atomic_lock); + my_atomic_rwlock_wrlock(&last_ir->inuse_relaylog_atomic_lock); my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count); - my_atomic_rwlock_wrunlock(&rli->inuse_relaylog_atomic_lock); + my_atomic_rwlock_wrunlock(&last_ir->inuse_relaylog_atomic_lock); } if ((events= rpt->event_queue) != NULL) -- cgit v1.2.1