summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/rpl_parallel.cc19
-rw-r--r--sql/rpl_rli.cc8
-rw-r--r--sql/rpl_rli.h1
-rw-r--r--sql/slave.cc168
-rw-r--r--sql/slave.h1
5 files changed, 103 insertions, 94 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index e65c543148e..bbc917b6e9d 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -9,11 +9,6 @@
ToDo list:
- - Error handling. If we fail in one of multiple parallel executions, we
- need to make a best effort to complete prior transactions and roll back
- following transactions, so slave binlog position will be correct.
- And all the retry logic for temporary errors like deadlock.
-
- Retry of failed transactions is not yet implemented for the parallel case.
- All the waits (eg. in struct wait_for_commit and in
@@ -212,7 +207,7 @@ handle_rpl_parallel_thread(void *arg)
processing between the event groups as a simple way to ensure that
everything is stopped and cleaned up correctly.
*/
- if (!sql_worker_killed(thd, rgi, in_event_group))
+ if (!rgi->is_error && !sql_worker_killed(thd, rgi, in_event_group))
err= rpt_handle_event(events, rpt);
else
err= thd->wait_for_prior_commit();
@@ -228,6 +223,13 @@ handle_rpl_parallel_thread(void *arg)
delete_or_keep_event_post_apply(rgi, event_type, events->ev);
my_free(events);
+ if (err)
+ {
+ rgi->is_error= true;
+ slave_output_error_info(rgi->rli, thd);
+ rgi->cleanup_context(thd, true);
+ rgi->rli->abort_slave= true;
+ }
if (end_of_group)
{
in_event_group= false;
@@ -785,6 +787,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
}
else if (!is_group_event || !current)
{
+ int err;
/*
Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread.
Same for events not preceeded by GTID (we should not see those normally,
@@ -802,11 +805,11 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
rev->new_log_ident, rev->ident_len+1);
}
- rpt_handle_event(qev, NULL);
+ err= rpt_handle_event(qev, NULL);
delete_or_keep_event_post_apply(serial_rgi, typ, qev->ev);
my_free(qev);
- return false;
+ return (err != 0);
}
else
{
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index ebbe5f4407c..b558f2db64c 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -1274,12 +1274,10 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
DBA aware of the problem in the error log.
*/
}
+ DBUG_EXECUTE_IF("inject_crash_before_flush_rli", DBUG_SUICIDE(););
if (mi->using_gtid == Master_info::USE_GTID_NO)
- {
- DBUG_EXECUTE_IF("inject_crash_before_flush_rli", DBUG_SUICIDE(););
flush_relay_log_info(this);
- DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE(););
- }
+ DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE(););
/*
Note that Rotate_log_event::do_apply_event() does not call this
function, so there is no chance that a fake rotate event resets
@@ -1453,7 +1451,7 @@ rpl_group_info::rpl_group_info(Relay_log_info *rli_)
wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0),
deferred_events(NULL), m_annotate_event(0), tables_to_lock(0),
tables_to_lock_count(0), trans_retries(0), last_event_start_time(0),
- is_parallel_exec(false),
+ is_parallel_exec(false), is_error(false),
row_stmt_start_timestamp(0), long_find_row_note_printed(false)
{
bzero(&current_gtid, sizeof(current_gtid));
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 38268ee85c5..2f049c41d0f 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -558,6 +558,7 @@ struct rpl_group_info
*/
char future_event_master_log_name[FN_REFLEN];
bool is_parallel_exec;
+ bool is_error;
private:
/*
diff --git a/sql/slave.cc b/sql/slave.cc
index fcc92f42536..acb42feb6e7 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -4078,6 +4078,92 @@ end:
}
+void
+slave_output_error_info(Relay_log_info *rli, THD *thd)
+{
+ /*
+ Write as much diagnostic information as possible to the error log
+ after an event failed to execute, so that the user can locate the
+ error.
+
+ Emitted in order:
+ 1. the error held in thd's diagnostics area, if thd->is_error();
+ 2. every entry of thd's warning list, as a warning;
+ 3. a final "slave SQL thread aborted" error giving RPL_LOG_NAME and
+ rli->group_master_log_pos, followed by the GTID position when
+ rli->mi->using_gtid is not USE_GTID_NO.
+
+ rli  Relay log info; supplies the last reported error number, the
+ group master log position and the Master_info GTID mode.
+ thd  Thread whose diagnostics area and warning list are dumped.
+ */
+ uint32 const last_errno= rli->last_error().number;
+ char llbuff[22];
+
+ if (thd->is_error())
+ {
+ char const *const errmsg= thd->stmt_da->message();
+
+ DBUG_PRINT("info",
+ ("thd->stmt_da->sql_errno()=%d; rli->last_error.number=%d",
+ thd->stmt_da->sql_errno(), last_errno));
+ if (last_errno == 0)
+ {
+ /*
+ No error has been recorded on rli yet, i.e. the failure was not
+ reported while executing exec_relay_log_event(). Report the
+ error from thd's diagnostics area through rli now.
+ */
+ rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(), "%s", errmsg);
+ }
+ else if (last_errno != thd->stmt_da->sql_errno())
+ {
+ /*
+ * An error was already reported while executing
+ * exec_relay_log_event(), but its error code differs from the one
+ * held in the thread. Print the thread's error as additional
+ * information to help find what caused the problem.
+ * (When both codes are equal nothing more is printed here.)
+ */
+ sql_print_error("Slave (additional info): %s Error_code: %d",
+ errmsg, thd->stmt_da->sql_errno());
+ }
+ }
+
+ /* Print every warning issued, whether or not thd holds an error. */
+ List_iterator_fast<MYSQL_ERROR> it(thd->warning_info->warn_list());
+ MYSQL_ERROR *err;
+ /*
+ Remember whether any warning is ER_CANT_OPEN_LIBRARY: in that case
+ the final message below tells the user to install the missing
+ user-defined library instead of printing the generic
+ "Error running query" text.
+ */
+ bool udf_error = false;
+ while ((err= it++))
+ {
+ if (err->get_sql_errno() == ER_CANT_OPEN_LIBRARY)
+ udf_error = true;
+ sql_print_warning("Slave: %s Error_code: %d", err->get_message_text(), err->get_sql_errno());
+ }
+ if (udf_error)
+ {
+ String tmp;
+ if (rli->mi->using_gtid != Master_info::USE_GTID_NO)
+ {
+ tmp.append(STRING_WITH_LEN("; GTID position '"));
+ rpl_append_gtid_state(&tmp, false);
+ tmp.append(STRING_WITH_LEN("'"));
+ }
+ sql_print_error("Error loading user-defined library, slave SQL "
+ "thread aborted. Install the missing library, and restart the "
+ "slave SQL thread with \"SLAVE START\". We stopped at log '%s' "
+ "position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos,
+ llbuff), tmp.c_ptr_safe());
+ }
+ else
+ {
+ String tmp;
+ if (rli->mi->using_gtid != Master_info::USE_GTID_NO)
+ {
+ tmp.append(STRING_WITH_LEN("; GTID position '"));
+ rpl_append_gtid_state(&tmp, false);
+ tmp.append(STRING_WITH_LEN("'"));
+ }
+ sql_print_error("\
+Error running query, slave SQL thread aborted. Fix the problem, and restart \
+the slave SQL thread with \"SLAVE START\". We stopped at log \
+'%s' position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff),
+ tmp.c_ptr_safe());
+ }
+}
+
+
/**
Slave SQL thread entry point.
@@ -4335,87 +4421,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
DBUG_PRINT("info", ("exec_relay_log_event() failed"));
// do not scare the user if SQL thread was simply killed or stopped
if (!sql_slave_killed(serial_rgi))
- {
- /*
- retrieve as much info as possible from the thd and, error
- codes and warnings and print this to the error log as to
- allow the user to locate the error
- */
- uint32 const last_errno= rli->last_error().number;
-
- if (thd->is_error())
- {
- char const *const errmsg= thd->stmt_da->message();
-
- DBUG_PRINT("info",
- ("thd->stmt_da->sql_errno()=%d; rli->last_error.number=%d",
- thd->stmt_da->sql_errno(), last_errno));
- if (last_errno == 0)
- {
- /*
- This function is reporting an error which was not reported
- while executing exec_relay_log_event().
- */
- rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(), "%s", errmsg);
- }
- else if (last_errno != thd->stmt_da->sql_errno())
- {
- /*
- * An error was reported while executing exec_relay_log_event()
- * however the error code differs from what is in the thread.
- * This function prints out more information to help finding
- * what caused the problem.
- */
- sql_print_error("Slave (additional info): %s Error_code: %d",
- errmsg, thd->stmt_da->sql_errno());
- }
- }
-
- /* Print any warnings issued */
- List_iterator_fast<MYSQL_ERROR> it(thd->warning_info->warn_list());
- MYSQL_ERROR *err;
- /*
- Added controlled slave thread cancel for replication
- of user-defined variables.
- */
- bool udf_error = false;
- while ((err= it++))
- {
- if (err->get_sql_errno() == ER_CANT_OPEN_LIBRARY)
- udf_error = true;
- sql_print_warning("Slave: %s Error_code: %d", err->get_message_text(), err->get_sql_errno());
- }
- if (udf_error)
- {
- String tmp;
- if (mi->using_gtid != Master_info::USE_GTID_NO)
- {
- tmp.append(STRING_WITH_LEN("; GTID position '"));
- rpl_append_gtid_state(&tmp, false);
- tmp.append(STRING_WITH_LEN("'"));
- }
- sql_print_error("Error loading user-defined library, slave SQL "
- "thread aborted. Install the missing library, and restart the "
- "slave SQL thread with \"SLAVE START\". We stopped at log '%s' "
- "position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos,
- llbuff), tmp.c_ptr_safe());
- }
- else
- {
- String tmp;
- if (mi->using_gtid != Master_info::USE_GTID_NO)
- {
- tmp.append(STRING_WITH_LEN("; GTID position '"));
- rpl_append_gtid_state(&tmp, false);
- tmp.append(STRING_WITH_LEN("'"));
- }
- sql_print_error("\
-Error running query, slave SQL thread aborted. Fix the problem, and restart \
-the slave SQL thread with \"SLAVE START\". We stopped at log \
-'%s' position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff),
- tmp.c_ptr_safe());
- }
- }
+ slave_output_error_info(rli, thd);
goto err;
}
}
diff --git a/sql/slave.h b/sql/slave.h
index 4e64754a877..3981a9d4f2c 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -233,6 +233,7 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
rpl_parallel_thread *rpt);
pthread_handler_t handle_slave_io(void *arg);
+void slave_output_error_info(Relay_log_info *rli, THD *thd);
pthread_handler_t handle_slave_sql(void *arg);
bool net_request_file(NET* net, const char* fname);