diff options
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r-- | sql/rpl_parallel.cc | 468 |
1 files changed, 379 insertions, 89 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index e72d3470a7f..0d23248539c 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -7,15 +7,6 @@ /* Code for optional parallel execution of replicated events on the slave. - - ToDo list: - - - Retry of failed transactions is not yet implemented for the parallel case. - - - All the waits (eg. in struct wait_for_commit and in - rpl_parallel_thread_pool::get_thread()) need to be killable. And on kill, - everything needs to be correctly rolled back and stopped in all threads, - to ensure a consistent slave replication state. */ struct rpl_parallel_thread_pool global_rpl_thread_pool; @@ -32,7 +23,6 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, Relay_log_info *rli= rgi->rli; THD *thd= rgi->thd; - thd->rgi_slave= rgi; thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter; /* ToDo: Access to thd, and what about rli, split out a parallel part? */ @@ -44,7 +34,6 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos; strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name); err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt); - thd->rgi_slave= NULL; thread_safe_increment64(&rli->executed_entries, &slave_executed_entries_lock); @@ -165,6 +154,7 @@ finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry, mysql_mutex_unlock(&entry->LOCK_parallel_entry); thd->clear_error(); + thd->reset_killed(); thd->get_stmt_da()->reset_diagnostics_area(); wfc->wakeup_subsequent_commits(rgi->worker_error); } @@ -197,6 +187,281 @@ unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond, } +static void +register_wait_for_prior_event_group_commit(rpl_group_info *rgi, + rpl_parallel_entry *entry) +{ + mysql_mutex_assert_owner(&entry->LOCK_parallel_entry); + if (rgi->wait_commit_sub_id > entry->last_committed_sub_id) + { + /* + Register that the commit of this event group must wait for the + commit of the previous event group to complete before it may + complete itself, so that we preserve commit order. + */ + wait_for_commit *waitee= + &rgi->wait_commit_group_info->commit_orderer; + rgi->commit_orderer.register_wait_for_prior_commit(waitee); + } +} + + +#ifndef DBUG_OFF +static int +dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd) +{ + if (rgi->current_gtid.domain_id == 0 && rgi->current_gtid.seq_no == 100 && + rgi->retry_event_count == 4) + { + thd->clear_error(); + thd->get_stmt_da()->reset_diagnostics_area(); + my_error(ER_LOCK_DEADLOCK, MYF(0)); + return 1; + } + return 0; +} +#endif + + +/* + If we detect a deadlock due to eg. storage engine locks that conflict with + the fixed commit order, then the later transaction will be killed + asynchroneously to allow the former to complete its commit. + + In this case, we convert the 'killed' error into a deadlock error, and retry + the later transaction. */ +static void +convert_kill_to_deadlock_error(rpl_group_info *rgi) +{ + THD *thd= rgi->thd; + int err_code; + + if (!thd->get_stmt_da()->is_error()) + return; + err_code= thd->get_stmt_da()->sql_errno(); + if ((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) && + rgi->killed_for_retry) + { + thd->clear_error(); + my_error(ER_LOCK_DEADLOCK, MYF(0)); + rgi->killed_for_retry= false; + thd->reset_killed(); + } +} + + +static bool +is_group_ending(Log_event *ev, Log_event_type event_type) +{ + return event_type == XID_EVENT || + (event_type == QUERY_EVENT && + (((Query_log_event *)ev)->is_commit() || + ((Query_log_event *)ev)->is_rollback())); +} + + +static int +retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt, + rpl_parallel_thread::queued_event *orig_qev) +{ + IO_CACHE rlog; + LOG_INFO linfo; + File fd= (File)-1; + const char *errmsg= NULL; + inuse_relaylog *ir= rgi->relay_log; + uint64 event_count; + uint64 events_to_execute= rgi->retry_event_count; + Relay_log_info *rli= rgi->rli; + int err; + ulonglong cur_offset, old_offset; + char log_name[FN_REFLEN]; + THD *thd= rgi->thd; + rpl_parallel_entry *entry= rgi->parallel_entry; + ulong retries= 0; + +do_retry: + event_count= 0; + err= 0; + + /* + If we already started committing before getting the deadlock (or other + error) that caused us to need to retry, we have already signalled + subsequent transactions that we have started committing. This is + potentially a problem, as now we will rollback, and if subsequent + transactions would start to execute now, they could see an unexpected + state of the database and get eg. key not found or duplicate key error. + + However, to get a deadlock in the first place, there must have been + another earlier transaction that is waiting for us. Thus that other + transaction has _not_ yet started to commit, and any subsequent + transactions will still be waiting at this point. + + So here, we decrement back the count of transactions that started + committing (if we already incremented it), undoing the effect of an + earlier mark_start_commit(). Then later, when the retry succeeds and we + commit again, we can do a new mark_start_commit() and eventually wake up + subsequent transactions at the proper time. + + We need to do the unmark before the rollback, to be sure that the + transaction we deadlocked with will not signal that it started to commit + until after the unmark. + */ + rgi->unmark_start_commit(); + + /* + We might get the deadlock error that causes the retry during commit, while + sitting in wait_for_prior_commit(). If this happens, we will have a + pending error in the wait_for_commit object. So clear this by + unregistering (and later re-registering) the wait. + */ + if(thd->wait_for_commit_ptr) + thd->wait_for_commit_ptr->unregister_wait_for_prior_commit(); + rgi->cleanup_context(thd, 1); + + mysql_mutex_lock(&rli->data_lock); + ++rli->retried_trans; + statistic_increment(slave_retried_transactions, LOCK_status); + mysql_mutex_unlock(&rli->data_lock); + + mysql_mutex_lock(&entry->LOCK_parallel_entry); + register_wait_for_prior_event_group_commit(rgi, entry); + mysql_mutex_unlock(&entry->LOCK_parallel_entry); + + strmake_buf(log_name, ir->name); + if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0) + { + err= 1; + goto err; + } + cur_offset= rgi->retry_start_offset; + my_b_seek(&rlog, cur_offset); + + do + { + Log_event_type event_type; + Log_event *ev; + rpl_parallel_thread::queued_event *qev; + + /* The loop is here so we can try again the next relay log file on EOF. */ + for (;;) + { + old_offset= cur_offset; + ev= Log_event::read_log_event(&rlog, 0, + rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */, + opt_slave_sql_verify_checksum); + cur_offset= my_b_tell(&rlog); + + if (ev) + break; + if (rlog.error < 0) + { + errmsg= "slave SQL thread aborted because of I/O error"; + err= 1; + goto err; + } + if (rlog.error > 0) + { + sql_print_error("Slave SQL thread: I/O error reading " + "event(errno: %d cur_log->error: %d)", + my_errno, rlog.error); + errmsg= "Aborting slave SQL thread because of partial event read"; + err= 1; + goto err; + } + /* EOF. Move to the next relay log. */ + end_io_cache(&rlog); + mysql_file_close(fd, MYF(MY_WME)); + fd= (File)-1; + + /* Find the next relay log file. */ + if((err= rli->relay_log.find_log_pos(&linfo, log_name, 1)) || + (err= rli->relay_log.find_next_log(&linfo, 1))) + { + char buff[22]; + sql_print_error("next log error: %d offset: %s log: %s", + err, + llstr(linfo.index_file_offset, buff), + log_name); + goto err; + } + strmake_buf(log_name ,linfo.log_file_name); + + if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0) + { + err= 1; + goto err; + } + /* Loop to try again on the new log file. */ + } + + event_type= ev->get_type_code(); + if (!Log_event::is_group_event(event_type)) + { + delete ev; + continue; + } + ev->thd= thd; + + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset, + cur_offset - old_offset); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + if (!qev) + { + delete ev; + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + err= 1; + goto err; + } + if (is_group_ending(ev, event_type)) + rgi->mark_start_commit(); + + err= rpt_handle_event(qev, rpt); + ++event_count; + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + rpt->free_qev(qev); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + + delete_or_keep_event_post_apply(rgi, event_type, ev); + DBUG_EXECUTE_IF("rpl_parallel_simulate_double_temp_err_gtid_0_x_100", + if (retries == 0) err= dbug_simulate_tmp_error(rgi, thd);); + DBUG_EXECUTE_IF("rpl_parallel_simulate_infinite_temp_err_gtid_0_x_100", + err= dbug_simulate_tmp_error(rgi, thd);); + if (err) + { + convert_kill_to_deadlock_error(rgi); + if (has_temporary_error(thd)) + { + ++retries; + if (retries < slave_trans_retries) + { + end_io_cache(&rlog); + mysql_file_close(fd, MYF(MY_WME)); + fd= (File)-1; + goto do_retry; + } + sql_print_error("Slave worker thread retried transaction %lu time(s) " + "in vain, giving up. Consider raising the value of " + "the slave_transaction_retries variable.", + slave_trans_retries); + } + goto err; + } + } while (event_count < events_to_execute); + +err: + + if (fd >= 0) + { + end_io_cache(&rlog); + mysql_file_close(fd, MYF(MY_WME)); + } + if (errmsg) + sql_print_error("Error reading relay log event: %s", errmsg); + return err; +} + + pthread_handler_t handle_rpl_parallel_thread(void *arg) { @@ -215,6 +480,8 @@ handle_rpl_parallel_thread(void *arg) rpl_sql_thread_info sql_info(NULL); size_t total_event_size; int err; + inuse_relaylog *last_ir; + uint64 accumulated_ir_count; struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg; @@ -244,39 +511,6 @@ handle_rpl_parallel_thread(void *arg) thd->set_time(); thd->variables.lock_wait_timeout= LONG_TIMEOUT; thd->system_thread_info.rpl_sql_info= &sql_info; - /* - For now, we need to run the replication parallel worker threads in - READ COMMITTED. This is needed because gap locks are not symmetric. - For example, a gap lock from a DELETE blocks an insert intention lock, - but not vice versa. So an INSERT followed by DELETE can group commit - on the master, but if we are unlucky with thread scheduling we can - then deadlock on the slave because the INSERT ends up waiting for a - gap lock from the DELETE (and the DELETE in turn waits for the INSERT - in wait_for_prior_commit()). See also MDEV-5914. - - It should be mostly safe to run in READ COMMITTED in the slave anyway. - The commit order is already fixed from on the master, so we do not - risk logging into the binlog in an incorrect order between worker - threads (one that would cause different results if executed on a - lower-level slave that uses this slave as a master). The only - potential problem is with transactions run in a different master - connection (using multi-source replication), or run directly on the - slave by an application; when using READ COMMITTED we are not - guaranteed serialisability of binlogged statements. - - In practice, this is unlikely to be an issue. In GTID mode, such - parallel transactions from multi-source or application must in any - case use a different replication domain, in which case binlog order - by definition must be independent between the different domain. Even - in non-GTID mode, normally one will assume that the external - transactions are not conflicting with those applied by the slave, so - that isolation level should make no difference. It would be rather - strange if the result of applying query events from one master would - depend on the timing and nature of other queries executed from - different multi-source connections or done directly on the slave by - an application. Still, something to be aware of. - */ - thd->variables.tx_isolation= ISO_READ_COMMITTED; mysql_mutex_lock(&rpt->LOCK_rpl_thread); rpt->thd= thd; @@ -332,7 +566,7 @@ handle_rpl_parallel_thread(void *arg) continue; } - group_rgi= rgi; + thd->rgi_slave= group_rgi= rgi; gco= rgi->gco; /* Handle a new event group, which will be initiated by a GTID event. */ if ((event_type= events->ev->get_type_code()) == GTID_EVENT) @@ -341,7 +575,6 @@ handle_rpl_parallel_thread(void *arg) PSI_stage_info old_stage; uint64 wait_count; - thd->tx_isolation= (enum_tx_isolation)thd->variables.tx_isolation; in_event_group= true; /* If the standalone flag is set, then this event group consists of a @@ -352,9 +585,7 @@ handle_rpl_parallel_thread(void *arg) (0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 & Gtid_log_event::FL_STANDALONE)); - /* Save this, as it gets cleared when the event group commits. */ event_gtid_sub_id= rgi->gtid_sub_id; - rgi->thd= thd; /* @@ -388,7 +619,7 @@ handle_rpl_parallel_thread(void *arg) { DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed"); thd->send_kill_message(); - slave_output_error_info(rgi->rli, thd); + slave_output_error_info(rgi, thd); signal_error_to_sql_driver_thread(thd, rgi, 1); /* Even though we were killed, we need to continue waiting for the @@ -430,17 +661,9 @@ handle_rpl_parallel_thread(void *arg) if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id)) skip_event_group= true; - else if (rgi->wait_commit_sub_id > entry->last_committed_sub_id) - { - /* - Register that the commit of this event group must wait for the - commit of the previous event group to complete before it may - complete itself, so that we preserve commit order. - */ - wait_for_commit *waitee= - &rgi->wait_commit_group_info->commit_orderer; - rgi->commit_orderer.register_wait_for_prior_commit(waitee); - } + else + register_wait_for_prior_event_group_commit(rgi, entry); + unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry, &did_enter_cond, &old_stage); @@ -467,7 +690,7 @@ handle_rpl_parallel_thread(void *arg) if (res < 0) { /* Error. */ - slave_output_error_info(rgi->rli, thd); + slave_output_error_info(rgi, thd); signal_error_to_sql_driver_thread(thd, rgi, 1); } else if (!res) @@ -482,11 +705,8 @@ handle_rpl_parallel_thread(void *arg) } } - group_ending= event_type == XID_EVENT || - (event_type == QUERY_EVENT && - (((Query_log_event *)events->ev)->is_commit() || - ((Query_log_event *)events->ev)->is_rollback())); - if (group_ending) + group_ending= is_group_ending(events->ev, event_type); + if (group_ending && likely(!rgi->worker_error)) { DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit"); rgi->mark_start_commit(); @@ -498,24 +718,42 @@ handle_rpl_parallel_thread(void *arg) processing between the event groups as a simple way to ensure that everything is stopped and cleaned up correctly. */ - if (!rgi->worker_error && !skip_event_group) + if (likely(!rgi->worker_error) && !skip_event_group) + { + ++rgi->retry_event_count; err= rpt_handle_event(events, rpt); + delete_or_keep_event_post_apply(rgi, event_type, events->ev); + DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_x_100", + err= dbug_simulate_tmp_error(rgi, thd);); + if (err) + { + convert_kill_to_deadlock_error(rgi); + if (has_temporary_error(thd) && slave_trans_retries > 0) + err= retry_event_group(rgi, rpt, events); + } + } else + { + delete events->ev; err= thd->wait_for_prior_commit(); + } end_of_group= in_event_group && ((group_standalone && !Log_event::is_part_of_group(event_type)) || group_ending); - delete_or_keep_event_post_apply(rgi, event_type, events->ev); events->next= qevs_to_free; qevs_to_free= events; - if (unlikely(err) && !rgi->worker_error) + if (unlikely(err)) { - slave_output_error_info(rgi->rli, thd); - signal_error_to_sql_driver_thread(thd, rgi, err); + if (!rgi->worker_error) + { + slave_output_error_info(rgi, thd); + signal_error_to_sql_driver_thread(thd, rgi, err); + } + thd->reset_killed(); } if (end_of_group) { @@ -523,7 +761,7 @@ handle_rpl_parallel_thread(void *arg) finish_event_group(thd, event_gtid_sub_id, entry, rgi); rgi->next= rgis_to_free; rgis_to_free= rgi; - group_rgi= rgi= NULL; + thd->rgi_slave= group_rgi= rgi= NULL; skip_event_group= false; DEBUG_SYNC(thd, "rpl_parallel_end_of_group"); } @@ -548,12 +786,34 @@ handle_rpl_parallel_thread(void *arg) rpt->free_rgi(rgis_to_free); rgis_to_free= next; } + last_ir= NULL; + accumulated_ir_count= 0; while (qevs_to_free) { rpl_parallel_thread::queued_event *next= qevs_to_free->next; + inuse_relaylog *ir= qevs_to_free->ir; + /* Batch up refcount update to reduce use of synchronised operations. */ + if (last_ir != ir) + { + if (last_ir) + { + my_atomic_rwlock_wrlock(&rli->inuse_relaylog_atomic_lock); + my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count); + my_atomic_rwlock_wrunlock(&rli->inuse_relaylog_atomic_lock); + accumulated_ir_count= 0; + } + last_ir= ir; + } + ++accumulated_ir_count; rpt->free_qev(qevs_to_free); qevs_to_free= next; } + if (last_ir) + { + my_atomic_rwlock_wrlock(&rli->inuse_relaylog_atomic_lock); + my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count); + my_atomic_rwlock_wrunlock(&rli->inuse_relaylog_atomic_lock); + } if ((events= rpt->event_queue) != NULL) { @@ -584,7 +844,7 @@ handle_rpl_parallel_thread(void *arg) in_event_group= false; mysql_mutex_lock(&rpt->LOCK_rpl_thread); rpt->free_rgi(group_rgi); - group_rgi= NULL; + thd->rgi_slave= group_rgi= NULL; skip_event_group= false; } if (!in_event_group) @@ -802,8 +1062,7 @@ err: rpl_parallel_thread::queued_event * -rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size, - Relay_log_info *rli) +rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size) { queued_event *qev; mysql_mutex_assert_owner(&LOCK_rpl_thread); @@ -817,6 +1076,17 @@ rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size, qev->ev= ev; qev->event_size= event_size; qev->next= NULL; + return qev; +} + + +rpl_parallel_thread::queued_event * +rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size, + Relay_log_info *rli) +{ + queued_event *qev= get_qev_common(ev, event_size); + if (!qev) + return NULL; strcpy(qev->event_relay_log_name, rli->event_relay_log_name); qev->event_relay_log_pos= rli->event_relay_log_pos; qev->future_event_relay_log_pos= rli->future_event_relay_log_pos; @@ -825,6 +1095,24 @@ rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size, } +rpl_parallel_thread::queued_event * +rpl_parallel_thread::retry_get_qev(Log_event *ev, queued_event *orig_qev, + const char *relay_log_name, + ulonglong event_pos, ulonglong event_size) +{ + queued_event *qev= get_qev_common(ev, event_size); + if (!qev) + return NULL; + qev->rgi= orig_qev->rgi; + strcpy(qev->event_relay_log_name, relay_log_name); + qev->event_relay_log_pos= event_pos; + qev->future_event_relay_log_pos= event_pos+event_size; + strcpy(qev->future_event_master_log_name, + orig_qev->future_event_master_log_name); + return qev; +} + + void rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev) { @@ -836,7 +1124,7 @@ rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev) rpl_group_info* rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev, - rpl_parallel_entry *e) + rpl_parallel_entry *e, ulonglong event_size) { rpl_group_info *rgi; mysql_mutex_assert_owner(&LOCK_rpl_thread); @@ -864,6 +1152,10 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev, return NULL; } rgi->parallel_entry= e; + rgi->relay_log= rli->last_inuse_relaylog; + rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size; + rgi->retry_event_count= 0; + rgi->killed_for_retry= false; return rgi; } @@ -1018,10 +1310,11 @@ rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt) if it is still available. Otherwise a new worker thread is allocated. */ rpl_parallel_thread * -rpl_parallel_entry::choose_thread(Relay_log_info *rli, bool *did_enter_cond, +rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond, PSI_stage_info *old_stage, bool reuse) { uint32 idx; + Relay_log_info *rli= rgi->rli; rpl_parallel_thread *thr; idx= rpl_thread_idx; @@ -1066,7 +1359,7 @@ rpl_parallel_entry::choose_thread(Relay_log_info *rli, bool *did_enter_cond, debug_sync_set_action(rli->sql_driver_thd, STRING_WITH_LEN("now SIGNAL wait_queue_killed")); };); - slave_output_error_info(rli, rli->sql_driver_thd); + slave_output_error_info(rgi, rli->sql_driver_thd); return NULL; } else @@ -1390,15 +1683,9 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, if (typ == GTID_EVENT) { - uint32 domain_id; - if (likely(typ == GTID_EVENT)) - { - Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); - domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ? - 0 : gtid_ev->domain_id); - } - else - domain_id= 0; + Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); + uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ? + 0 : gtid_ev->domain_id); if (!(e= find(domain_id))) { my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); @@ -1417,7 +1704,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, instead re-use a thread that we queued for previously. */ cur_thread= - e->choose_thread(rli, &did_enter_cond, &old_stage, typ != GTID_EVENT); + e->choose_thread(serial_rgi, &did_enter_cond, &old_stage, + typ != GTID_EVENT); if (!cur_thread) { /* This means we were killed. The error is already signalled. */ @@ -1437,7 +1725,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, { Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); - if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e))) + if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e, event_size))) { cur_thread->free_qev(qev); abandon_worker_thread(rli->sql_driver_thd, cur_thread, @@ -1549,6 +1837,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, Queue the event for processing. */ rli->event_relay_log_pos= rli->future_event_relay_log_pos; + qev->ir= rli->last_inuse_relaylog; + ++qev->ir->queued_count; cur_thread->enqueue(qev); unlock_or_exit_cond(rli->sql_driver_thd, &cur_thread->LOCK_rpl_thread, &did_enter_cond, &old_stage); |