diff options
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r-- | sql/rpl_parallel.cc | 644 |
1 files changed, 547 insertions, 97 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 90ee2360eb7..9b91206ca75 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -4,18 +4,8 @@ #include "rpl_mi.h" #include "debug_sync.h" - /* Code for optional parallel execution of replicated events on the slave. - - ToDo list: - - - Retry of failed transactions is not yet implemented for the parallel case. - - - All the waits (eg. in struct wait_for_commit and in - rpl_parallel_thread_pool::get_thread()) need to be killable. And on kill, - everything needs to be correctly rolled back and stopped in all threads, - to ensure a consistent slave replication state. */ struct rpl_parallel_thread_pool global_rpl_thread_pool; @@ -31,20 +21,22 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, rpl_group_info *rgi= qev->rgi; Relay_log_info *rli= rgi->rli; THD *thd= rgi->thd; + Log_event *ev; + + DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_EVENT); + ev= qev->ev; - thd->rgi_slave= rgi; thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter; + ev->thd= thd; - /* ToDo: Access to thd, and what about rli, split out a parallel part? */ - mysql_mutex_lock(&rli->data_lock); - qev->ev->thd= thd; strcpy(rgi->event_relay_log_name_buf, qev->event_relay_log_name); rgi->event_relay_log_name= rgi->event_relay_log_name_buf; rgi->event_relay_log_pos= qev->event_relay_log_pos; rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos; strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name); - err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt); - thd->rgi_slave= NULL; + mysql_mutex_lock(&rli->data_lock); + /* Mutex will be released in apply_event_and_update_pos(). */ + err= apply_event_and_update_pos(ev, thd, rgi, rpt); thread_safe_increment64(&rli->executed_entries, &slave_executed_entries_lock); @@ -58,6 +50,8 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev) { int cmp; Relay_log_info *rli; + rpl_parallel_entry *e; + /* Events that are not part of an event group, such as Format Description, Stop, GTID List and such, are executed directly in the driver SQL thread, @@ -68,6 +62,13 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev) if ((thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions) return; + + /* Do not update position if an earlier event group caused an error abort. */ + DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE); + e= qev->entry_for_queued; + if (e->stop_on_error_sub_id < (uint64)ULONGLONG_MAX || e->force_abort) + return; + rli= qev->rgi->rli; mysql_mutex_lock(&rli->data_lock); cmp= strcmp(rli->group_relay_log_name, qev->event_relay_log_name); @@ -165,6 +166,7 @@ finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry, mysql_mutex_unlock(&entry->LOCK_parallel_entry); thd->clear_error(); + thd->reset_killed(); thd->get_stmt_da()->reset_diagnostics_area(); wfc->wakeup_subsequent_commits(rgi->worker_error); } @@ -197,6 +199,290 @@ unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond, } +static void +register_wait_for_prior_event_group_commit(rpl_group_info *rgi, + rpl_parallel_entry *entry) +{ + mysql_mutex_assert_owner(&entry->LOCK_parallel_entry); + if (rgi->wait_commit_sub_id > entry->last_committed_sub_id) + { + /* + Register that the commit of this event group must wait for the + commit of the previous event group to complete before it may + complete itself, so that we preserve commit order. + */ + wait_for_commit *waitee= + &rgi->wait_commit_group_info->commit_orderer; + rgi->commit_orderer.register_wait_for_prior_commit(waitee); + } +} + + +#ifndef DBUG_OFF +static int +dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd) +{ + if (rgi->current_gtid.domain_id == 0 && rgi->current_gtid.seq_no == 100 && + rgi->retry_event_count == 4) + { + thd->clear_error(); + thd->get_stmt_da()->reset_diagnostics_area(); + my_error(ER_LOCK_DEADLOCK, MYF(0)); + return 1; + } + return 0; +} +#endif + + +/* + If we detect a deadlock due to eg. storage engine locks that conflict with + the fixed commit order, then the later transaction will be killed + asynchroneously to allow the former to complete its commit. + + In this case, we convert the 'killed' error into a deadlock error, and retry + the later transaction. */ +static void +convert_kill_to_deadlock_error(rpl_group_info *rgi) +{ + THD *thd= rgi->thd; + int err_code; + + if (!thd->get_stmt_da()->is_error()) + return; + err_code= thd->get_stmt_da()->sql_errno(); + if ((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) && + rgi->killed_for_retry) + { + thd->clear_error(); + my_error(ER_LOCK_DEADLOCK, MYF(0)); + rgi->killed_for_retry= false; + thd->reset_killed(); + } +} + + +static bool +is_group_ending(Log_event *ev, Log_event_type event_type) +{ + return event_type == XID_EVENT || + (event_type == QUERY_EVENT && + (((Query_log_event *)ev)->is_commit() || + ((Query_log_event *)ev)->is_rollback())); +} + + +static int +retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt, + rpl_parallel_thread::queued_event *orig_qev) +{ + IO_CACHE rlog; + LOG_INFO linfo; + File fd= (File)-1; + const char *errmsg= NULL; + inuse_relaylog *ir= rgi->relay_log; + uint64 event_count; + uint64 events_to_execute= rgi->retry_event_count; + Relay_log_info *rli= rgi->rli; + int err; + ulonglong cur_offset, old_offset; + char log_name[FN_REFLEN]; + THD *thd= rgi->thd; + rpl_parallel_entry *entry= rgi->parallel_entry; + ulong retries= 0; + +do_retry: + event_count= 0; + err= 0; + + /* + If we already started committing before getting the deadlock (or other + error) that caused us to need to retry, we have already signalled + subsequent transactions that we have started committing. This is + potentially a problem, as now we will rollback, and if subsequent + transactions would start to execute now, they could see an unexpected + state of the database and get eg. key not found or duplicate key error. + + However, to get a deadlock in the first place, there must have been + another earlier transaction that is waiting for us. Thus that other + transaction has _not_ yet started to commit, and any subsequent + transactions will still be waiting at this point. + + So here, we decrement back the count of transactions that started + committing (if we already incremented it), undoing the effect of an + earlier mark_start_commit(). Then later, when the retry succeeds and we + commit again, we can do a new mark_start_commit() and eventually wake up + subsequent transactions at the proper time. + + We need to do the unmark before the rollback, to be sure that the + transaction we deadlocked with will not signal that it started to commit + until after the unmark. + */ + rgi->unmark_start_commit(); + + /* + We might get the deadlock error that causes the retry during commit, while + sitting in wait_for_prior_commit(). If this happens, we will have a + pending error in the wait_for_commit object. So clear this by + unregistering (and later re-registering) the wait. + */ + if(thd->wait_for_commit_ptr) + thd->wait_for_commit_ptr->unregister_wait_for_prior_commit(); + rgi->cleanup_context(thd, 1); + + /* + If we retry due to a deadlock kill that occured during the commit step, we + might have already updated (but not committed) an update of table + mysql.gtid_slave_pos, and cleared the gtid_pending flag. Now we have + rolled back any such update, so we must set the gtid_pending flag back to + true so that we will do a new update when/if we succeed with the retry. + */ + rgi->gtid_pending= true; + + mysql_mutex_lock(&rli->data_lock); + ++rli->retried_trans; + statistic_increment(slave_retried_transactions, LOCK_status); + mysql_mutex_unlock(&rli->data_lock); + + mysql_mutex_lock(&entry->LOCK_parallel_entry); + register_wait_for_prior_event_group_commit(rgi, entry); + mysql_mutex_unlock(&entry->LOCK_parallel_entry); + + strmake_buf(log_name, ir->name); + if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0) + { + err= 1; + goto err; + } + cur_offset= rgi->retry_start_offset; + my_b_seek(&rlog, cur_offset); + + do + { + Log_event_type event_type; + Log_event *ev; + rpl_parallel_thread::queued_event *qev; + + /* The loop is here so we can try again the next relay log file on EOF. */ + for (;;) + { + old_offset= cur_offset; + ev= Log_event::read_log_event(&rlog, 0, + rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */, + opt_slave_sql_verify_checksum); + cur_offset= my_b_tell(&rlog); + + if (ev) + break; + if (rlog.error < 0) + { + errmsg= "slave SQL thread aborted because of I/O error"; + err= 1; + goto err; + } + if (rlog.error > 0) + { + sql_print_error("Slave SQL thread: I/O error reading " + "event(errno: %d cur_log->error: %d)", + my_errno, rlog.error); + errmsg= "Aborting slave SQL thread because of partial event read"; + err= 1; + goto err; + } + /* EOF. Move to the next relay log. */ + end_io_cache(&rlog); + mysql_file_close(fd, MYF(MY_WME)); + fd= (File)-1; + + /* Find the next relay log file. */ + if((err= rli->relay_log.find_log_pos(&linfo, log_name, 1)) || + (err= rli->relay_log.find_next_log(&linfo, 1))) + { + char buff[22]; + sql_print_error("next log error: %d offset: %s log: %s", + err, + llstr(linfo.index_file_offset, buff), + log_name); + goto err; + } + strmake_buf(log_name ,linfo.log_file_name); + + if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0) + { + err= 1; + goto err; + } + /* Loop to try again on the new log file. */ + } + + event_type= ev->get_type_code(); + if (!Log_event::is_group_event(event_type)) + { + delete ev; + continue; + } + ev->thd= thd; + + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset, + cur_offset - old_offset); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + if (!qev) + { + delete ev; + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + err= 1; + goto err; + } + if (is_group_ending(ev, event_type)) + rgi->mark_start_commit(); + + err= rpt_handle_event(qev, rpt); + ++event_count; + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + rpt->free_qev(qev); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + + delete_or_keep_event_post_apply(rgi, event_type, ev); + DBUG_EXECUTE_IF("rpl_parallel_simulate_double_temp_err_gtid_0_x_100", + if (retries == 0) err= dbug_simulate_tmp_error(rgi, thd);); + DBUG_EXECUTE_IF("rpl_parallel_simulate_infinite_temp_err_gtid_0_x_100", + err= dbug_simulate_tmp_error(rgi, thd);); + if (err) + { + convert_kill_to_deadlock_error(rgi); + if (has_temporary_error(thd)) + { + ++retries; + if (retries < slave_trans_retries) + { + end_io_cache(&rlog); + mysql_file_close(fd, MYF(MY_WME)); + fd= (File)-1; + goto do_retry; + } + sql_print_error("Slave worker thread retried transaction %lu time(s) " + "in vain, giving up. Consider raising the value of " + "the slave_transaction_retries variable.", + slave_trans_retries); + } + goto err; + } + } while (event_count < events_to_execute); + +err: + + if (fd >= 0) + { + end_io_cache(&rlog); + mysql_file_close(fd, MYF(MY_WME)); + } + if (errmsg) + sql_print_error("Error reading relay log event: %s", errmsg); + return err; +} + + pthread_handler_t handle_rpl_parallel_thread(void *arg) { @@ -215,6 +501,8 @@ handle_rpl_parallel_thread(void *arg) rpl_sql_thread_info sql_info(NULL); size_t total_event_size; int err; + inuse_relaylog *last_ir; + uint64 accumulated_ir_count; struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg; @@ -244,39 +532,6 @@ handle_rpl_parallel_thread(void *arg) thd->set_time(); thd->variables.lock_wait_timeout= LONG_TIMEOUT; thd->system_thread_info.rpl_sql_info= &sql_info; - /* - For now, we need to run the replication parallel worker threads in - READ COMMITTED. This is needed because gap locks are not symmetric. - For example, a gap lock from a DELETE blocks an insert intention lock, - but not vice versa. So an INSERT followed by DELETE can group commit - on the master, but if we are unlucky with thread scheduling we can - then deadlock on the slave because the INSERT ends up waiting for a - gap lock from the DELETE (and the DELETE in turn waits for the INSERT - in wait_for_prior_commit()). See also MDEV-5914. - - It should be mostly safe to run in READ COMMITTED in the slave anyway. - The commit order is already fixed from on the master, so we do not - risk logging into the binlog in an incorrect order between worker - threads (one that would cause different results if executed on a - lower-level slave that uses this slave as a master). The only - potential problem is with transactions run in a different master - connection (using multi-source replication), or run directly on the - slave by an application; when using READ COMMITTED we are not - guaranteed serialisability of binlogged statements. - - In practice, this is unlikely to be an issue. In GTID mode, such - parallel transactions from multi-source or application must in any - case use a different replication domain, in which case binlog order - by definition must be independent between the different domain. Even - in non-GTID mode, normally one will assume that the external - transactions are not conflicting with those applied by the slave, so - that isolation level should make no difference. It would be rather - strange if the result of applying query events from one master would - depend on the timing and nature of other queries executed from - different multi-source connections or done directly on the slave by - an application. Still, something to be aware of. - */ - thd->variables.tx_isolation= ISO_READ_COMMITTED; mysql_mutex_lock(&rpt->LOCK_rpl_thread); rpt->thd= thd; @@ -323,7 +578,7 @@ handle_rpl_parallel_thread(void *arg) bool end_of_group, group_ending; total_event_size+= events->event_size; - if (!events->ev) + if (events->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE) { handle_queued_pos_update(thd, events); events->next= qevs_to_free; @@ -331,8 +586,33 @@ handle_rpl_parallel_thread(void *arg) events= next; continue; } + else if (events->typ == + rpl_parallel_thread::queued_event::QUEUED_MASTER_RESTART) + { + if (in_event_group) + { + /* + Master restarted (crashed) in the middle of an event group. + So we need to roll back and discard that event group. + */ + group_rgi->cleanup_context(thd, 1); + in_event_group= false; + finish_event_group(thd, group_rgi->gtid_sub_id, + events->entry_for_queued, group_rgi); + + group_rgi->next= rgis_to_free; + rgis_to_free= group_rgi; + thd->rgi_slave= group_rgi= NULL; + } + + events->next= qevs_to_free; + qevs_to_free= events; + events= next; + continue; + } + DBUG_ASSERT(events->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT); - group_rgi= rgi; + thd->rgi_slave= group_rgi= rgi; gco= rgi->gco; /* Handle a new event group, which will be initiated by a GTID event. */ if ((event_type= events->ev->get_type_code()) == GTID_EVENT) @@ -341,7 +621,6 @@ handle_rpl_parallel_thread(void *arg) PSI_stage_info old_stage; uint64 wait_count; - thd->tx_isolation= (enum_tx_isolation)thd->variables.tx_isolation; in_event_group= true; /* If the standalone flag is set, then this event group consists of a @@ -352,9 +631,7 @@ handle_rpl_parallel_thread(void *arg) (0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 & Gtid_log_event::FL_STANDALONE)); - /* Save this, as it gets cleared when the event group commits. */ event_gtid_sub_id= rgi->gtid_sub_id; - rgi->thd= thd; /* @@ -388,7 +665,7 @@ handle_rpl_parallel_thread(void *arg) { DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed"); thd->send_kill_message(); - slave_output_error_info(rgi->rli, thd); + slave_output_error_info(rgi, thd); signal_error_to_sql_driver_thread(thd, rgi, 1); /* Even though we were killed, we need to continue waiting for the @@ -430,17 +707,9 @@ handle_rpl_parallel_thread(void *arg) if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id)) skip_event_group= true; - else if (rgi->wait_commit_sub_id > entry->last_committed_sub_id) - { - /* - Register that the commit of this event group must wait for the - commit of the previous event group to complete before it may - complete itself, so that we preserve commit order. - */ - wait_for_commit *waitee= - &rgi->wait_commit_group_info->commit_orderer; - rgi->commit_orderer.register_wait_for_prior_commit(waitee); - } + else + register_wait_for_prior_event_group_commit(rgi, entry); + unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry, &did_enter_cond, &old_stage); @@ -467,7 +736,7 @@ handle_rpl_parallel_thread(void *arg) if (res < 0) { /* Error. */ - slave_output_error_info(rgi->rli, thd); + slave_output_error_info(rgi, thd); signal_error_to_sql_driver_thread(thd, rgi, 1); } else if (!res) @@ -482,11 +751,8 @@ handle_rpl_parallel_thread(void *arg) } } - group_ending= event_type == XID_EVENT || - (event_type == QUERY_EVENT && - (((Query_log_event *)events->ev)->is_commit() || - ((Query_log_event *)events->ev)->is_rollback())); - if (group_ending) + group_ending= is_group_ending(events->ev, event_type); + if (group_ending && likely(!rgi->worker_error)) { DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit"); rgi->mark_start_commit(); @@ -498,24 +764,42 @@ handle_rpl_parallel_thread(void *arg) processing between the event groups as a simple way to ensure that everything is stopped and cleaned up correctly. */ - if (!rgi->worker_error && !skip_event_group) + if (likely(!rgi->worker_error) && !skip_event_group) + { + ++rgi->retry_event_count; err= rpt_handle_event(events, rpt); + delete_or_keep_event_post_apply(rgi, event_type, events->ev); + DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_x_100", + err= dbug_simulate_tmp_error(rgi, thd);); + if (err) + { + convert_kill_to_deadlock_error(rgi); + if (has_temporary_error(thd) && slave_trans_retries > 0) + err= retry_event_group(rgi, rpt, events); + } + } else + { + delete events->ev; err= thd->wait_for_prior_commit(); + } end_of_group= in_event_group && ((group_standalone && !Log_event::is_part_of_group(event_type)) || group_ending); - delete_or_keep_event_post_apply(rgi, event_type, events->ev); events->next= qevs_to_free; qevs_to_free= events; - if (unlikely(err) && !rgi->worker_error) + if (unlikely(err)) { - slave_output_error_info(rgi->rli, thd); - signal_error_to_sql_driver_thread(thd, rgi, err); + if (!rgi->worker_error) + { + slave_output_error_info(rgi, thd); + signal_error_to_sql_driver_thread(thd, rgi, err); + } + thd->reset_killed(); } if (end_of_group) { @@ -523,7 +807,7 @@ handle_rpl_parallel_thread(void *arg) finish_event_group(thd, event_gtid_sub_id, entry, rgi); rgi->next= rgis_to_free; rgis_to_free= rgi; - group_rgi= rgi= NULL; + thd->rgi_slave= group_rgi= rgi= NULL; skip_event_group= false; DEBUG_SYNC(thd, "rpl_parallel_end_of_group"); } @@ -548,12 +832,34 @@ handle_rpl_parallel_thread(void *arg) rpt->free_rgi(rgis_to_free); rgis_to_free= next; } + last_ir= NULL; + accumulated_ir_count= 0; while (qevs_to_free) { rpl_parallel_thread::queued_event *next= qevs_to_free->next; + inuse_relaylog *ir= qevs_to_free->ir; + /* Batch up refcount update to reduce use of synchronised operations. */ + if (last_ir != ir) + { + if (last_ir) + { + my_atomic_rwlock_wrlock(&last_ir->inuse_relaylog_atomic_lock); + my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count); + my_atomic_rwlock_wrunlock(&last_ir->inuse_relaylog_atomic_lock); + accumulated_ir_count= 0; + } + last_ir= ir; + } + ++accumulated_ir_count; rpt->free_qev(qevs_to_free); qevs_to_free= next; } + if (last_ir) + { + my_atomic_rwlock_wrlock(&last_ir->inuse_relaylog_atomic_lock); + my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count); + my_atomic_rwlock_wrunlock(&last_ir->inuse_relaylog_atomic_lock); + } if ((events= rpt->event_queue) != NULL) { @@ -584,7 +890,7 @@ handle_rpl_parallel_thread(void *arg) in_event_group= false; mysql_mutex_lock(&rpt->LOCK_rpl_thread); rpt->free_rgi(group_rgi); - group_rgi= NULL; + thd->rgi_slave= group_rgi= NULL; skip_event_group= false; } if (!in_event_group) @@ -802,8 +1108,7 @@ err: rpl_parallel_thread::queued_event * -rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size, - Relay_log_info *rli) +rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size) { queued_event *qev; mysql_mutex_assert_owner(&LOCK_rpl_thread); @@ -814,9 +1119,21 @@ rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size, my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*qev)); return NULL; } + qev->typ= rpl_parallel_thread::queued_event::QUEUED_EVENT; qev->ev= ev; qev->event_size= event_size; qev->next= NULL; + return qev; +} + + +rpl_parallel_thread::queued_event * +rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size, + Relay_log_info *rli) +{ + queued_event *qev= get_qev_common(ev, event_size); + if (!qev) + return NULL; strcpy(qev->event_relay_log_name, rli->event_relay_log_name); qev->event_relay_log_pos= rli->event_relay_log_pos; qev->future_event_relay_log_pos= rli->future_event_relay_log_pos; @@ -825,6 +1142,24 @@ rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size, } +rpl_parallel_thread::queued_event * +rpl_parallel_thread::retry_get_qev(Log_event *ev, queued_event *orig_qev, + const char *relay_log_name, + ulonglong event_pos, ulonglong event_size) +{ + queued_event *qev= get_qev_common(ev, event_size); + if (!qev) + return NULL; + qev->rgi= orig_qev->rgi; + strcpy(qev->event_relay_log_name, relay_log_name); + qev->event_relay_log_pos= event_pos; + qev->future_event_relay_log_pos= event_pos+event_size; + strcpy(qev->future_event_master_log_name, + orig_qev->future_event_master_log_name); + return qev; +} + + void rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev) { @@ -836,7 +1171,7 @@ rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev) rpl_group_info* rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev, - rpl_parallel_entry *e) + rpl_parallel_entry *e, ulonglong event_size) { rpl_group_info *rgi; mysql_mutex_assert_owner(&LOCK_rpl_thread); @@ -864,6 +1199,10 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev, return NULL; } rgi->parallel_entry= e; + rgi->relay_log= rli->last_inuse_relaylog; + rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size; + rgi->retry_event_count= 0; + rgi->killed_for_retry= false; return rgi; } @@ -1018,10 +1357,11 @@ rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt) if it is still available. Otherwise a new worker thread is allocated. */ rpl_parallel_thread * -rpl_parallel_entry::choose_thread(Relay_log_info *rli, bool *did_enter_cond, +rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond, PSI_stage_info *old_stage, bool reuse) { uint32 idx; + Relay_log_info *rli= rgi->rli; rpl_parallel_thread *thr; idx= rpl_thread_idx; @@ -1066,7 +1406,7 @@ rpl_parallel_entry::choose_thread(Relay_log_info *rli, bool *did_enter_cond, debug_sync_set_action(rli->sql_driver_thd, STRING_WITH_LEN("now SIGNAL wait_queue_killed")); };); - slave_output_error_info(rli, rli->sql_driver_thd); + slave_output_error_info(rgi, rli->sql_driver_thd); return NULL; } else @@ -1300,6 +1640,91 @@ rpl_parallel::workers_idle() } +int +rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi, + Format_description_log_event *fdev) +{ + uint32 idx; + rpl_parallel_thread *thr; + rpl_parallel_thread::queued_event *qev; + Relay_log_info *rli= rgi->rli; + + /* + We only need to queue the server restart if we still have a thread working + on a (potentially partial) event group. + + If the last thread we queued for has finished, then it cannot have any + partial event group that needs aborting. + + Thus there is no need for the full complexity of choose_thread(). We only + need to check if we have a current worker thread, and queue for it if so. + */ + idx= rpl_thread_idx; + thr= rpl_threads[idx]; + if (!thr) + return 0; + mysql_mutex_lock(&thr->LOCK_rpl_thread); + if (thr->current_owner != &rpl_threads[idx]) + { + /* No active worker thread, so no need to queue the master restart. */ + mysql_mutex_unlock(&thr->LOCK_rpl_thread); + return 0; + } + + if (!(qev= thr->get_qev(fdev, 0, rli))) + { + mysql_mutex_unlock(&thr->LOCK_rpl_thread); + return 1; + } + + qev->rgi= rgi; + qev->typ= rpl_parallel_thread::queued_event::QUEUED_MASTER_RESTART; + qev->entry_for_queued= this; + qev->ir= rli->last_inuse_relaylog; + ++qev->ir->queued_count; + thr->enqueue(qev); + mysql_mutex_unlock(&thr->LOCK_rpl_thread); + return 0; +} + + +int +rpl_parallel::wait_for_workers_idle(THD *thd) +{ + uint32 i, max_i; + + /* + The domain_hash is only accessed by the SQL driver thread, so it is safe + to iterate over without a lock. + */ + max_i= domain_hash.records; + for (i= 0; i < max_i; ++i) + { + bool active; + wait_for_commit my_orderer; + struct rpl_parallel_entry *e; + + e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); + mysql_mutex_lock(&e->LOCK_parallel_entry); + if ((active= (e->current_sub_id > e->last_committed_sub_id))) + { + wait_for_commit *waitee= &e->current_group_info->commit_orderer; + my_orderer.register_wait_for_prior_commit(waitee); + thd->wait_for_commit_ptr= &my_orderer; + } + mysql_mutex_unlock(&e->LOCK_parallel_entry); + if (active) + { + int err= my_orderer.wait_for_prior_commit(thd); + thd->wait_for_commit_ptr= NULL; + if (err) + return err; + } + } + return 0; +} + + /* This is used when we get an error during processing in do_event(); We will not queue any event to the thread, but we still need to wake it up @@ -1367,6 +1792,33 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, /* ToDo: what to do with this lock?!? */ mysql_mutex_unlock(&rli->data_lock); + if (typ == FORMAT_DESCRIPTION_EVENT) + { + Format_description_log_event *fdev= + static_cast<Format_description_log_event *>(ev); + if (fdev->created) + { + /* + This format description event marks a new binlog after a master server + restart. We are going to close all temporary tables to clean up any + possible left-overs after a prior master crash. + + Thus we need to wait for all prior events to execute to completion, + in case they need access to any of the temporary tables. + + We also need to notify the worker thread running the prior incomplete + event group (if any), as such event group signifies an incompletely + written group cut short by a master crash, and must be rolled back. + */ + if (current->queue_master_restart(serial_rgi, fdev) || + wait_for_workers_idle(rli->sql_driver_thd)) + { + delete ev; + return 1; + } + } + } + /* Stop queueing additional event groups once the SQL thread is requested to stop. @@ -1390,15 +1842,9 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, if (typ == GTID_EVENT) { - uint32 domain_id; - if (likely(typ == GTID_EVENT)) - { - Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); - domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ? - 0 : gtid_ev->domain_id); - } - else - domain_id= 0; + Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); + uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ? + 0 : gtid_ev->domain_id); if (!(e= find(domain_id))) { my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); @@ -1417,7 +1863,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, instead re-use a thread that we queued for previously. */ cur_thread= - e->choose_thread(rli, &did_enter_cond, &old_stage, typ != GTID_EVENT); + e->choose_thread(serial_rgi, &did_enter_cond, &old_stage, + typ != GTID_EVENT); if (!cur_thread) { /* This means we were killed. The error is already signalled. */ @@ -1437,7 +1884,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, { Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); - if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e))) + if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e, event_size))) { cur_thread->free_qev(qev); abandon_worker_thread(rli->sql_driver_thd, cur_thread, @@ -1527,7 +1974,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, return 1; } /* - Queue an empty event, so that the position will be updated in a + Queue a position update, so that the position will be updated in a reasonable way relative to other events: - If the currently executing events are queued serially for a single @@ -1538,7 +1985,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, least the position will not be updated until one of them has reached the current point. */ - qev->ev= NULL; + qev->typ= rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE; + qev->entry_for_queued= e; } else { @@ -1549,6 +1997,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, Queue the event for processing. */ rli->event_relay_log_pos= rli->future_event_relay_log_pos; + qev->ir= rli->last_inuse_relaylog; + ++qev->ir->queued_count; cur_thread->enqueue(qev); unlock_or_exit_cond(rli->sql_driver_thd, &cur_thread->LOCK_rpl_thread, &did_enter_cond, &old_stage); |