summaryrefslogtreecommitdiff
path: root/sql/rpl_parallel.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r--sql/rpl_parallel.cc468
1 files changed, 379 insertions, 89 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index e72d3470a7f..0d23248539c 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -7,15 +7,6 @@
/*
Code for optional parallel execution of replicated events on the slave.
-
- ToDo list:
-
- - Retry of failed transactions is not yet implemented for the parallel case.
-
- - All the waits (eg. in struct wait_for_commit and in
- rpl_parallel_thread_pool::get_thread()) need to be killable. And on kill,
- everything needs to be correctly rolled back and stopped in all threads,
- to ensure a consistent slave replication state.
*/
struct rpl_parallel_thread_pool global_rpl_thread_pool;
@@ -32,7 +23,6 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
Relay_log_info *rli= rgi->rli;
THD *thd= rgi->thd;
- thd->rgi_slave= rgi;
thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter;
/* ToDo: Access to thd, and what about rli, split out a parallel part? */
@@ -44,7 +34,6 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos;
strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name);
err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt);
- thd->rgi_slave= NULL;
thread_safe_increment64(&rli->executed_entries,
&slave_executed_entries_lock);
@@ -165,6 +154,7 @@ finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry,
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
thd->clear_error();
+ thd->reset_killed();
thd->get_stmt_da()->reset_diagnostics_area();
wfc->wakeup_subsequent_commits(rgi->worker_error);
}
@@ -197,6 +187,281 @@ unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond,
}
+static void
+register_wait_for_prior_event_group_commit(rpl_group_info *rgi,
+ rpl_parallel_entry *entry)
+{
+ mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
+ if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
+ {
+ /*
+ Register that the commit of this event group must wait for the
+ commit of the previous event group to complete before it may
+ complete itself, so that we preserve commit order.
+ */
+ wait_for_commit *waitee=
+ &rgi->wait_commit_group_info->commit_orderer;
+ rgi->commit_orderer.register_wait_for_prior_commit(waitee);
+ }
+}
+
+
+#ifndef DBUG_OFF
+static int
+dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
+{
+ if (rgi->current_gtid.domain_id == 0 && rgi->current_gtid.seq_no == 100 &&
+ rgi->retry_event_count == 4)
+ {
+ thd->clear_error();
+ thd->get_stmt_da()->reset_diagnostics_area();
+ my_error(ER_LOCK_DEADLOCK, MYF(0));
+ return 1;
+ }
+ return 0;
+}
+#endif
+
+
+/*
+ If we detect a deadlock due to eg. storage engine locks that conflict with
+ the fixed commit order, then the later transaction will be killed
+ asynchroneously to allow the former to complete its commit.
+
+ In this case, we convert the 'killed' error into a deadlock error, and retry
+ the later transaction. */
+static void
+convert_kill_to_deadlock_error(rpl_group_info *rgi)
+{
+ THD *thd= rgi->thd;
+ int err_code;
+
+ if (!thd->get_stmt_da()->is_error())
+ return;
+ err_code= thd->get_stmt_da()->sql_errno();
+ if ((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) &&
+ rgi->killed_for_retry)
+ {
+ thd->clear_error();
+ my_error(ER_LOCK_DEADLOCK, MYF(0));
+ rgi->killed_for_retry= false;
+ thd->reset_killed();
+ }
+}
+
+
+static bool
+is_group_ending(Log_event *ev, Log_event_type event_type)
+{
+ return event_type == XID_EVENT ||
+ (event_type == QUERY_EVENT &&
+ (((Query_log_event *)ev)->is_commit() ||
+ ((Query_log_event *)ev)->is_rollback()));
+}
+
+
+static int
+retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
+ rpl_parallel_thread::queued_event *orig_qev)
+{
+ IO_CACHE rlog;
+ LOG_INFO linfo;
+ File fd= (File)-1;
+ const char *errmsg= NULL;
+ inuse_relaylog *ir= rgi->relay_log;
+ uint64 event_count;
+ uint64 events_to_execute= rgi->retry_event_count;
+ Relay_log_info *rli= rgi->rli;
+ int err;
+ ulonglong cur_offset, old_offset;
+ char log_name[FN_REFLEN];
+ THD *thd= rgi->thd;
+ rpl_parallel_entry *entry= rgi->parallel_entry;
+ ulong retries= 0;
+
+do_retry:
+ event_count= 0;
+ err= 0;
+
+ /*
+ If we already started committing before getting the deadlock (or other
+ error) that caused us to need to retry, we have already signalled
+ subsequent transactions that we have started committing. This is
+ potentially a problem, as now we will rollback, and if subsequent
+ transactions would start to execute now, they could see an unexpected
+ state of the database and get eg. key not found or duplicate key error.
+
+ However, to get a deadlock in the first place, there must have been
+ another earlier transaction that is waiting for us. Thus that other
+ transaction has _not_ yet started to commit, and any subsequent
+ transactions will still be waiting at this point.
+
+ So here, we decrement back the count of transactions that started
+ committing (if we already incremented it), undoing the effect of an
+ earlier mark_start_commit(). Then later, when the retry succeeds and we
+ commit again, we can do a new mark_start_commit() and eventually wake up
+ subsequent transactions at the proper time.
+
+ We need to do the unmark before the rollback, to be sure that the
+ transaction we deadlocked with will not signal that it started to commit
+ until after the unmark.
+ */
+ rgi->unmark_start_commit();
+
+ /*
+ We might get the deadlock error that causes the retry during commit, while
+ sitting in wait_for_prior_commit(). If this happens, we will have a
+ pending error in the wait_for_commit object. So clear this by
+ unregistering (and later re-registering) the wait.
+ */
+ if(thd->wait_for_commit_ptr)
+ thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
+ rgi->cleanup_context(thd, 1);
+
+ mysql_mutex_lock(&rli->data_lock);
+ ++rli->retried_trans;
+ statistic_increment(slave_retried_transactions, LOCK_status);
+ mysql_mutex_unlock(&rli->data_lock);
+
+ mysql_mutex_lock(&entry->LOCK_parallel_entry);
+ register_wait_for_prior_event_group_commit(rgi, entry);
+ mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+
+ strmake_buf(log_name, ir->name);
+ if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
+ {
+ err= 1;
+ goto err;
+ }
+ cur_offset= rgi->retry_start_offset;
+ my_b_seek(&rlog, cur_offset);
+
+ do
+ {
+ Log_event_type event_type;
+ Log_event *ev;
+ rpl_parallel_thread::queued_event *qev;
+
+ /* The loop is here so we can try again the next relay log file on EOF. */
+ for (;;)
+ {
+ old_offset= cur_offset;
+ ev= Log_event::read_log_event(&rlog, 0,
+ rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */,
+ opt_slave_sql_verify_checksum);
+ cur_offset= my_b_tell(&rlog);
+
+ if (ev)
+ break;
+ if (rlog.error < 0)
+ {
+ errmsg= "slave SQL thread aborted because of I/O error";
+ err= 1;
+ goto err;
+ }
+ if (rlog.error > 0)
+ {
+ sql_print_error("Slave SQL thread: I/O error reading "
+ "event(errno: %d cur_log->error: %d)",
+ my_errno, rlog.error);
+ errmsg= "Aborting slave SQL thread because of partial event read";
+ err= 1;
+ goto err;
+ }
+ /* EOF. Move to the next relay log. */
+ end_io_cache(&rlog);
+ mysql_file_close(fd, MYF(MY_WME));
+ fd= (File)-1;
+
+ /* Find the next relay log file. */
+ if((err= rli->relay_log.find_log_pos(&linfo, log_name, 1)) ||
+ (err= rli->relay_log.find_next_log(&linfo, 1)))
+ {
+ char buff[22];
+ sql_print_error("next log error: %d offset: %s log: %s",
+ err,
+ llstr(linfo.index_file_offset, buff),
+ log_name);
+ goto err;
+ }
+ strmake_buf(log_name ,linfo.log_file_name);
+
+ if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
+ {
+ err= 1;
+ goto err;
+ }
+ /* Loop to try again on the new log file. */
+ }
+
+ event_type= ev->get_type_code();
+ if (!Log_event::is_group_event(event_type))
+ {
+ delete ev;
+ continue;
+ }
+ ev->thd= thd;
+
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset,
+ cur_offset - old_offset);
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ if (!qev)
+ {
+ delete ev;
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ err= 1;
+ goto err;
+ }
+ if (is_group_ending(ev, event_type))
+ rgi->mark_start_commit();
+
+ err= rpt_handle_event(qev, rpt);
+ ++event_count;
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ rpt->free_qev(qev);
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+
+ delete_or_keep_event_post_apply(rgi, event_type, ev);
+ DBUG_EXECUTE_IF("rpl_parallel_simulate_double_temp_err_gtid_0_x_100",
+ if (retries == 0) err= dbug_simulate_tmp_error(rgi, thd););
+ DBUG_EXECUTE_IF("rpl_parallel_simulate_infinite_temp_err_gtid_0_x_100",
+ err= dbug_simulate_tmp_error(rgi, thd););
+ if (err)
+ {
+ convert_kill_to_deadlock_error(rgi);
+ if (has_temporary_error(thd))
+ {
+ ++retries;
+ if (retries < slave_trans_retries)
+ {
+ end_io_cache(&rlog);
+ mysql_file_close(fd, MYF(MY_WME));
+ fd= (File)-1;
+ goto do_retry;
+ }
+ sql_print_error("Slave worker thread retried transaction %lu time(s) "
+ "in vain, giving up. Consider raising the value of "
+ "the slave_transaction_retries variable.",
+ slave_trans_retries);
+ }
+ goto err;
+ }
+ } while (event_count < events_to_execute);
+
+err:
+
+ if (fd >= 0)
+ {
+ end_io_cache(&rlog);
+ mysql_file_close(fd, MYF(MY_WME));
+ }
+ if (errmsg)
+ sql_print_error("Error reading relay log event: %s", errmsg);
+ return err;
+}
+
+
pthread_handler_t
handle_rpl_parallel_thread(void *arg)
{
@@ -215,6 +480,8 @@ handle_rpl_parallel_thread(void *arg)
rpl_sql_thread_info sql_info(NULL);
size_t total_event_size;
int err;
+ inuse_relaylog *last_ir;
+ uint64 accumulated_ir_count;
struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg;
@@ -244,39 +511,6 @@ handle_rpl_parallel_thread(void *arg)
thd->set_time();
thd->variables.lock_wait_timeout= LONG_TIMEOUT;
thd->system_thread_info.rpl_sql_info= &sql_info;
- /*
- For now, we need to run the replication parallel worker threads in
- READ COMMITTED. This is needed because gap locks are not symmetric.
- For example, a gap lock from a DELETE blocks an insert intention lock,
- but not vice versa. So an INSERT followed by DELETE can group commit
- on the master, but if we are unlucky with thread scheduling we can
- then deadlock on the slave because the INSERT ends up waiting for a
- gap lock from the DELETE (and the DELETE in turn waits for the INSERT
- in wait_for_prior_commit()). See also MDEV-5914.
-
- It should be mostly safe to run in READ COMMITTED in the slave anyway.
- The commit order is already fixed from on the master, so we do not
- risk logging into the binlog in an incorrect order between worker
- threads (one that would cause different results if executed on a
- lower-level slave that uses this slave as a master). The only
- potential problem is with transactions run in a different master
- connection (using multi-source replication), or run directly on the
- slave by an application; when using READ COMMITTED we are not
- guaranteed serialisability of binlogged statements.
-
- In practice, this is unlikely to be an issue. In GTID mode, such
- parallel transactions from multi-source or application must in any
- case use a different replication domain, in which case binlog order
- by definition must be independent between the different domain. Even
- in non-GTID mode, normally one will assume that the external
- transactions are not conflicting with those applied by the slave, so
- that isolation level should make no difference. It would be rather
- strange if the result of applying query events from one master would
- depend on the timing and nature of other queries executed from
- different multi-source connections or done directly on the slave by
- an application. Still, something to be aware of.
- */
- thd->variables.tx_isolation= ISO_READ_COMMITTED;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->thd= thd;
@@ -332,7 +566,7 @@ handle_rpl_parallel_thread(void *arg)
continue;
}
- group_rgi= rgi;
+ thd->rgi_slave= group_rgi= rgi;
gco= rgi->gco;
/* Handle a new event group, which will be initiated by a GTID event. */
if ((event_type= events->ev->get_type_code()) == GTID_EVENT)
@@ -341,7 +575,6 @@ handle_rpl_parallel_thread(void *arg)
PSI_stage_info old_stage;
uint64 wait_count;
- thd->tx_isolation= (enum_tx_isolation)thd->variables.tx_isolation;
in_event_group= true;
/*
If the standalone flag is set, then this event group consists of a
@@ -352,9 +585,7 @@ handle_rpl_parallel_thread(void *arg)
(0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 &
Gtid_log_event::FL_STANDALONE));
- /* Save this, as it gets cleared when the event group commits. */
event_gtid_sub_id= rgi->gtid_sub_id;
-
rgi->thd= thd;
/*
@@ -388,7 +619,7 @@ handle_rpl_parallel_thread(void *arg)
{
DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
thd->send_kill_message();
- slave_output_error_info(rgi->rli, thd);
+ slave_output_error_info(rgi, thd);
signal_error_to_sql_driver_thread(thd, rgi, 1);
/*
Even though we were killed, we need to continue waiting for the
@@ -430,17 +661,9 @@ handle_rpl_parallel_thread(void *arg)
if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
skip_event_group= true;
- else if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
- {
- /*
- Register that the commit of this event group must wait for the
- commit of the previous event group to complete before it may
- complete itself, so that we preserve commit order.
- */
- wait_for_commit *waitee=
- &rgi->wait_commit_group_info->commit_orderer;
- rgi->commit_orderer.register_wait_for_prior_commit(waitee);
- }
+ else
+ register_wait_for_prior_event_group_commit(rgi, entry);
+
unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry,
&did_enter_cond, &old_stage);
@@ -467,7 +690,7 @@ handle_rpl_parallel_thread(void *arg)
if (res < 0)
{
/* Error. */
- slave_output_error_info(rgi->rli, thd);
+ slave_output_error_info(rgi, thd);
signal_error_to_sql_driver_thread(thd, rgi, 1);
}
else if (!res)
@@ -482,11 +705,8 @@ handle_rpl_parallel_thread(void *arg)
}
}
- group_ending= event_type == XID_EVENT ||
- (event_type == QUERY_EVENT &&
- (((Query_log_event *)events->ev)->is_commit() ||
- ((Query_log_event *)events->ev)->is_rollback()));
- if (group_ending)
+ group_ending= is_group_ending(events->ev, event_type);
+ if (group_ending && likely(!rgi->worker_error))
{
DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit");
rgi->mark_start_commit();
@@ -498,24 +718,42 @@ handle_rpl_parallel_thread(void *arg)
processing between the event groups as a simple way to ensure that
everything is stopped and cleaned up correctly.
*/
- if (!rgi->worker_error && !skip_event_group)
+ if (likely(!rgi->worker_error) && !skip_event_group)
+ {
+ ++rgi->retry_event_count;
err= rpt_handle_event(events, rpt);
+ delete_or_keep_event_post_apply(rgi, event_type, events->ev);
+ DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_x_100",
+ err= dbug_simulate_tmp_error(rgi, thd););
+ if (err)
+ {
+ convert_kill_to_deadlock_error(rgi);
+ if (has_temporary_error(thd) && slave_trans_retries > 0)
+ err= retry_event_group(rgi, rpt, events);
+ }
+ }
else
+ {
+ delete events->ev;
err= thd->wait_for_prior_commit();
+ }
end_of_group=
in_event_group &&
((group_standalone && !Log_event::is_part_of_group(event_type)) ||
group_ending);
- delete_or_keep_event_post_apply(rgi, event_type, events->ev);
events->next= qevs_to_free;
qevs_to_free= events;
- if (unlikely(err) && !rgi->worker_error)
+ if (unlikely(err))
{
- slave_output_error_info(rgi->rli, thd);
- signal_error_to_sql_driver_thread(thd, rgi, err);
+ if (!rgi->worker_error)
+ {
+ slave_output_error_info(rgi, thd);
+ signal_error_to_sql_driver_thread(thd, rgi, err);
+ }
+ thd->reset_killed();
}
if (end_of_group)
{
@@ -523,7 +761,7 @@ handle_rpl_parallel_thread(void *arg)
finish_event_group(thd, event_gtid_sub_id, entry, rgi);
rgi->next= rgis_to_free;
rgis_to_free= rgi;
- group_rgi= rgi= NULL;
+ thd->rgi_slave= group_rgi= rgi= NULL;
skip_event_group= false;
DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
}
@@ -548,12 +786,34 @@ handle_rpl_parallel_thread(void *arg)
rpt->free_rgi(rgis_to_free);
rgis_to_free= next;
}
+ last_ir= NULL;
+ accumulated_ir_count= 0;
while (qevs_to_free)
{
rpl_parallel_thread::queued_event *next= qevs_to_free->next;
+ inuse_relaylog *ir= qevs_to_free->ir;
+ /* Batch up refcount update to reduce use of synchronised operations. */
+ if (last_ir != ir)
+ {
+ if (last_ir)
+ {
+ my_atomic_rwlock_wrlock(&rli->inuse_relaylog_atomic_lock);
+ my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
+ my_atomic_rwlock_wrunlock(&rli->inuse_relaylog_atomic_lock);
+ accumulated_ir_count= 0;
+ }
+ last_ir= ir;
+ }
+ ++accumulated_ir_count;
rpt->free_qev(qevs_to_free);
qevs_to_free= next;
}
+ if (last_ir)
+ {
+ my_atomic_rwlock_wrlock(&rli->inuse_relaylog_atomic_lock);
+ my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
+ my_atomic_rwlock_wrunlock(&rli->inuse_relaylog_atomic_lock);
+ }
if ((events= rpt->event_queue) != NULL)
{
@@ -584,7 +844,7 @@ handle_rpl_parallel_thread(void *arg)
in_event_group= false;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->free_rgi(group_rgi);
- group_rgi= NULL;
+ thd->rgi_slave= group_rgi= NULL;
skip_event_group= false;
}
if (!in_event_group)
@@ -802,8 +1062,7 @@ err:
rpl_parallel_thread::queued_event *
-rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
- Relay_log_info *rli)
+rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size)
{
queued_event *qev;
mysql_mutex_assert_owner(&LOCK_rpl_thread);
@@ -817,6 +1076,17 @@ rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
qev->ev= ev;
qev->event_size= event_size;
qev->next= NULL;
+ return qev;
+}
+
+
+rpl_parallel_thread::queued_event *
+rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
+ Relay_log_info *rli)
+{
+ queued_event *qev= get_qev_common(ev, event_size);
+ if (!qev)
+ return NULL;
strcpy(qev->event_relay_log_name, rli->event_relay_log_name);
qev->event_relay_log_pos= rli->event_relay_log_pos;
qev->future_event_relay_log_pos= rli->future_event_relay_log_pos;
@@ -825,6 +1095,24 @@ rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
}
+rpl_parallel_thread::queued_event *
+rpl_parallel_thread::retry_get_qev(Log_event *ev, queued_event *orig_qev,
+ const char *relay_log_name,
+ ulonglong event_pos, ulonglong event_size)
+{
+ queued_event *qev= get_qev_common(ev, event_size);
+ if (!qev)
+ return NULL;
+ qev->rgi= orig_qev->rgi;
+ strcpy(qev->event_relay_log_name, relay_log_name);
+ qev->event_relay_log_pos= event_pos;
+ qev->future_event_relay_log_pos= event_pos+event_size;
+ strcpy(qev->future_event_master_log_name,
+ orig_qev->future_event_master_log_name);
+ return qev;
+}
+
+
void
rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev)
{
@@ -836,7 +1124,7 @@ rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev)
rpl_group_info*
rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
- rpl_parallel_entry *e)
+ rpl_parallel_entry *e, ulonglong event_size)
{
rpl_group_info *rgi;
mysql_mutex_assert_owner(&LOCK_rpl_thread);
@@ -864,6 +1152,10 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
return NULL;
}
rgi->parallel_entry= e;
+ rgi->relay_log= rli->last_inuse_relaylog;
+ rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size;
+ rgi->retry_event_count= 0;
+ rgi->killed_for_retry= false;
return rgi;
}
@@ -1018,10 +1310,11 @@ rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt)
if it is still available. Otherwise a new worker thread is allocated.
*/
rpl_parallel_thread *
-rpl_parallel_entry::choose_thread(Relay_log_info *rli, bool *did_enter_cond,
+rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
PSI_stage_info *old_stage, bool reuse)
{
uint32 idx;
+ Relay_log_info *rli= rgi->rli;
rpl_parallel_thread *thr;
idx= rpl_thread_idx;
@@ -1066,7 +1359,7 @@ rpl_parallel_entry::choose_thread(Relay_log_info *rli, bool *did_enter_cond,
debug_sync_set_action(rli->sql_driver_thd,
STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
};);
- slave_output_error_info(rli, rli->sql_driver_thd);
+ slave_output_error_info(rgi, rli->sql_driver_thd);
return NULL;
}
else
@@ -1390,15 +1683,9 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
if (typ == GTID_EVENT)
{
- uint32 domain_id;
- if (likely(typ == GTID_EVENT))
- {
- Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
- domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
- 0 : gtid_ev->domain_id);
- }
- else
- domain_id= 0;
+ Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
+ uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
+ 0 : gtid_ev->domain_id);
if (!(e= find(domain_id)))
{
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
@@ -1417,7 +1704,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
instead re-use a thread that we queued for previously.
*/
cur_thread=
- e->choose_thread(rli, &did_enter_cond, &old_stage, typ != GTID_EVENT);
+ e->choose_thread(serial_rgi, &did_enter_cond, &old_stage,
+ typ != GTID_EVENT);
if (!cur_thread)
{
/* This means we were killed. The error is already signalled. */
@@ -1437,7 +1725,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
{
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
- if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e)))
+ if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e, event_size)))
{
cur_thread->free_qev(qev);
abandon_worker_thread(rli->sql_driver_thd, cur_thread,
@@ -1549,6 +1837,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
Queue the event for processing.
*/
rli->event_relay_log_pos= rli->future_event_relay_log_pos;
+ qev->ir= rli->last_inuse_relaylog;
+ ++qev->ir->queued_count;
cur_thread->enqueue(qev);
unlock_or_exit_cond(rli->sql_driver_thd, &cur_thread->LOCK_rpl_thread,
&did_enter_cond, &old_stage);