diff options
-rw-r--r-- | sql/rpl_parallel.cc | 19 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 8 | ||||
-rw-r--r-- | sql/rpl_rli.h | 1 | ||||
-rw-r--r-- | sql/slave.cc | 168 | ||||
-rw-r--r-- | sql/slave.h | 1 |
5 files changed, 103 insertions, 94 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index e65c543148e..bbc917b6e9d 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -9,11 +9,6 @@ ToDo list: - - Error handling. If we fail in one of multiple parallel executions, we - need to make a best effort to complete prior transactions and roll back - following transactions, so slave binlog position will be correct. - And all the retry logic for temporary errors like deadlock. - - Retry of failed transactions is not yet implemented for the parallel case. - All the waits (eg. in struct wait_for_commit and in @@ -212,7 +207,7 @@ handle_rpl_parallel_thread(void *arg) processing between the event groups as a simple way to ensure that everything is stopped and cleaned up correctly. */ - if (!sql_worker_killed(thd, rgi, in_event_group)) + if (!rgi->is_error && !sql_worker_killed(thd, rgi, in_event_group)) err= rpt_handle_event(events, rpt); else err= thd->wait_for_prior_commit(); @@ -228,6 +223,13 @@ handle_rpl_parallel_thread(void *arg) delete_or_keep_event_post_apply(rgi, event_type, events->ev); my_free(events); + if (err) + { + rgi->is_error= true; + slave_output_error_info(rgi->rli, thd); + rgi->cleanup_context(thd, true); + rgi->rli->abort_slave= true; + } if (end_of_group) { in_event_group= false; @@ -785,6 +787,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, } else if (!is_group_event || !current) { + int err; /* Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread. 
Same for events not preceeded by GTID (we should not see those normally, @@ -802,11 +805,11 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, rev->new_log_ident, rev->ident_len+1); } - rpt_handle_event(qev, NULL); + err= rpt_handle_event(qev, NULL); delete_or_keep_event_post_apply(serial_rgi, typ, qev->ev); my_free(qev); - return false; + return (err != 0); } else { diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index ebbe5f4407c..b558f2db64c 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -1274,12 +1274,10 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos, DBA aware of the problem in the error log. */ } + DBUG_EXECUTE_IF("inject_crash_before_flush_rli", DBUG_SUICIDE();); if (mi->using_gtid == Master_info::USE_GTID_NO) - { - DBUG_EXECUTE_IF("inject_crash_before_flush_rli", DBUG_SUICIDE();); flush_relay_log_info(this); - DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE();); - } + DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE();); /* Note that Rotate_log_event::do_apply_event() does not call this function, so there is no chance that a fake rotate event resets @@ -1453,7 +1451,7 @@ rpl_group_info::rpl_group_info(Relay_log_info *rli_) wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0), deferred_events(NULL), m_annotate_event(0), tables_to_lock(0), tables_to_lock_count(0), trans_retries(0), last_event_start_time(0), - is_parallel_exec(false), + is_parallel_exec(false), is_error(false), row_stmt_start_timestamp(0), long_find_row_note_printed(false) { bzero(&current_gtid, sizeof(current_gtid)); diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 38268ee85c5..2f049c41d0f 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -558,6 +558,7 @@ struct rpl_group_info */ char future_event_master_log_name[FN_REFLEN]; bool is_parallel_exec; + bool is_error; private: /* diff --git a/sql/slave.cc b/sql/slave.cc index fcc92f42536..acb42feb6e7 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -4078,6 +4078,92 @@ end: } 
+void +slave_output_error_info(Relay_log_info *rli, THD *thd) +{ + /* + retrieve as much info as possible from the thd and, error + codes and warnings and print this to the error log as to + allow the user to locate the error + */ + uint32 const last_errno= rli->last_error().number; + char llbuff[22]; + + if (thd->is_error()) + { + char const *const errmsg= thd->stmt_da->message(); + + DBUG_PRINT("info", + ("thd->stmt_da->sql_errno()=%d; rli->last_error.number=%d", + thd->stmt_da->sql_errno(), last_errno)); + if (last_errno == 0) + { + /* + This function is reporting an error which was not reported + while executing exec_relay_log_event(). + */ + rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(), "%s", errmsg); + } + else if (last_errno != thd->stmt_da->sql_errno()) + { + /* + * An error was reported while executing exec_relay_log_event() + * however the error code differs from what is in the thread. + * This function prints out more information to help finding + * what caused the problem. + */ + sql_print_error("Slave (additional info): %s Error_code: %d", + errmsg, thd->stmt_da->sql_errno()); + } + } + + /* Print any warnings issued */ + List_iterator_fast<MYSQL_ERROR> it(thd->warning_info->warn_list()); + MYSQL_ERROR *err; + /* + Added controlled slave thread cancel for replication + of user-defined variables. + */ + bool udf_error = false; + while ((err= it++)) + { + if (err->get_sql_errno() == ER_CANT_OPEN_LIBRARY) + udf_error = true; + sql_print_warning("Slave: %s Error_code: %d", err->get_message_text(), err->get_sql_errno()); + } + if (udf_error) + { + String tmp; + if (rli->mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN("; GTID position '")); + rpl_append_gtid_state(&tmp, false); + tmp.append(STRING_WITH_LEN("'")); + } + sql_print_error("Error loading user-defined library, slave SQL " + "thread aborted. Install the missing library, and restart the " + "slave SQL thread with \"SLAVE START\". 
We stopped at log '%s' " + "position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, + llbuff), tmp.c_ptr_safe()); + } + else + { + String tmp; + if (rli->mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN("; GTID position '")); + rpl_append_gtid_state(&tmp, false); + tmp.append(STRING_WITH_LEN("'")); + } + sql_print_error("\ +Error running query, slave SQL thread aborted. Fix the problem, and restart \ +the slave SQL thread with \"SLAVE START\". We stopped at log \ +'%s' position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff), + tmp.c_ptr_safe()); + } +} + + /** Slave SQL thread entry point. @@ -4335,87 +4421,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, DBUG_PRINT("info", ("exec_relay_log_event() failed")); // do not scare the user if SQL thread was simply killed or stopped if (!sql_slave_killed(serial_rgi)) - { - /* - retrieve as much info as possible from the thd and, error - codes and warnings and print this to the error log as to - allow the user to locate the error - */ - uint32 const last_errno= rli->last_error().number; - - if (thd->is_error()) - { - char const *const errmsg= thd->stmt_da->message(); - - DBUG_PRINT("info", - ("thd->stmt_da->sql_errno()=%d; rli->last_error.number=%d", - thd->stmt_da->sql_errno(), last_errno)); - if (last_errno == 0) - { - /* - This function is reporting an error which was not reported - while executing exec_relay_log_event(). - */ - rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(), "%s", errmsg); - } - else if (last_errno != thd->stmt_da->sql_errno()) - { - /* - * An error was reported while executing exec_relay_log_event() - * however the error code differs from what is in the thread. - * This function prints out more information to help finding - * what caused the problem. 
- */ - sql_print_error("Slave (additional info): %s Error_code: %d", - errmsg, thd->stmt_da->sql_errno()); - } - } - - /* Print any warnings issued */ - List_iterator_fast<MYSQL_ERROR> it(thd->warning_info->warn_list()); - MYSQL_ERROR *err; - /* - Added controlled slave thread cancel for replication - of user-defined variables. - */ - bool udf_error = false; - while ((err= it++)) - { - if (err->get_sql_errno() == ER_CANT_OPEN_LIBRARY) - udf_error = true; - sql_print_warning("Slave: %s Error_code: %d", err->get_message_text(), err->get_sql_errno()); - } - if (udf_error) - { - String tmp; - if (mi->using_gtid != Master_info::USE_GTID_NO) - { - tmp.append(STRING_WITH_LEN("; GTID position '")); - rpl_append_gtid_state(&tmp, false); - tmp.append(STRING_WITH_LEN("'")); - } - sql_print_error("Error loading user-defined library, slave SQL " - "thread aborted. Install the missing library, and restart the " - "slave SQL thread with \"SLAVE START\". We stopped at log '%s' " - "position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, - llbuff), tmp.c_ptr_safe()); - } - else - { - String tmp; - if (mi->using_gtid != Master_info::USE_GTID_NO) - { - tmp.append(STRING_WITH_LEN("; GTID position '")); - rpl_append_gtid_state(&tmp, false); - tmp.append(STRING_WITH_LEN("'")); - } - sql_print_error("\ -Error running query, slave SQL thread aborted. Fix the problem, and restart \ -the slave SQL thread with \"SLAVE START\". 
We stopped at log \ -'%s' position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff), - tmp.c_ptr_safe()); - } - } + slave_output_error_info(rli, thd); goto err; } } diff --git a/sql/slave.h b/sql/slave.h index 4e64754a877..3981a9d4f2c 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -233,6 +233,7 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, rpl_parallel_thread *rpt); pthread_handler_t handle_slave_io(void *arg); +void slave_output_error_info(Relay_log_info *rli, THD *thd); pthread_handler_t handle_slave_sql(void *arg); bool net_request_file(NET* net, const char* fname); |