Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r-- | sql/rpl_parallel.cc | 226 |
1 files changed, 183 insertions, 43 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 43e52c00e8d..35cddee6d4d 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -47,12 +47,9 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
   if (!(ev->is_artificial_event() || ev->is_relay_log_event() ||
         (ev->when == 0)))
     rgi->last_master_timestamp= ev->when + (time_t)ev->exec_time;
-  mysql_mutex_lock(&rli->data_lock);
-  /* Mutex will be released in apply_event_and_update_pos(). */
-  err= apply_event_and_update_pos(ev, thd, rgi, rpt);
+  err= apply_event_and_update_pos_for_parallel(ev, thd, rgi);

-  thread_safe_increment64(&rli->executed_entries,
-                          &slave_executed_entries_lock);
+  thread_safe_increment64(&rli->executed_entries);
   /* ToDo: error handling. */
   return err;
 }
@@ -108,6 +105,25 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
 }


+/*
+  Wait for any pending deadlock kills. Since deadlock kills happen
+  asynchronously, we need to be sure they will be completed before starting a
+  new transaction. Otherwise the new transaction might suffer a spurious kill.
+*/
+static void
+wait_for_pending_deadlock_kill(THD *thd, rpl_group_info *rgi)
+{
+  PSI_stage_info old_stage;
+
+  mysql_mutex_lock(&thd->LOCK_wakeup_ready);
+  thd->ENTER_COND(&thd->COND_wakeup_ready, &thd->LOCK_wakeup_ready,
+                  &stage_waiting_for_deadlock_kill, &old_stage);
+  while (rgi->killed_for_retry == rpl_group_info::RETRY_KILL_PENDING)
+    mysql_cond_wait(&thd->COND_wakeup_ready, &thd->LOCK_wakeup_ready);
+  thd->EXIT_COND(&old_stage);
+}
+
+
 static void
 finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
                    rpl_parallel_entry *entry, rpl_group_info *rgi)
@@ -213,6 +229,8 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
     entry->stop_on_error_sub_id= sub_id;
   mysql_mutex_unlock(&entry->LOCK_parallel_entry);

+  if (rgi->killed_for_retry == rpl_group_info::RETRY_KILL_PENDING)
+    wait_for_pending_deadlock_kill(thd, rgi);
   thd->clear_error();
   thd->reset_killed();
   /*
@@ -582,7 +600,13 @@ dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
   asynchroneously to allow the former to complete its commit.

   In this case, we convert the 'killed' error into a deadlock error, and retry
-  the later transaction.  */
+  the later transaction.
+
+  If we are doing optimistic parallel apply of transactions not known to be
+  safe, we convert any error to a deadlock error, but then at retry we will
+  wait for prior transactions to commit first, so that the retries can be
+  done non-speculative.
+*/
 static void
 convert_kill_to_deadlock_error(rpl_group_info *rgi)
 {
@@ -592,12 +616,13 @@ convert_kill_to_deadlock_error(rpl_group_info *rgi)
   if (!thd->get_stmt_da()->is_error())
     return;
   err_code= thd->get_stmt_da()->sql_errno();
-  if ((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) &&
-      rgi->killed_for_retry)
+  if ((rgi->speculation == rpl_group_info::SPECULATE_OPTIMISTIC &&
+       err_code != ER_PRIOR_COMMIT_FAILED) ||
+      ((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) &&
+       rgi->killed_for_retry))
   {
     thd->clear_error();
     my_error(ER_LOCK_DEADLOCK, MYF(0));
-    rgi->killed_for_retry= false;
     thd->reset_killed();
   }
 }
@@ -688,14 +713,14 @@ do_retry:
     thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
   DBUG_EXECUTE_IF("inject_mdev8031", {
       /* Simulate that we get deadlock killed at this exact point. */
-      rgi->killed_for_retry= true;
-      mysql_mutex_lock(&thd->LOCK_thd_data);
-      thd->killed= KILL_CONNECTION;
-      mysql_mutex_unlock(&thd->LOCK_thd_data);
+      rgi->killed_for_retry= rpl_group_info::RETRY_KILL_KILLED;
+      thd->set_killed(KILL_CONNECTION);
     });
   rgi->cleanup_context(thd, 1);
+  wait_for_pending_deadlock_kill(thd, rgi);
   thd->reset_killed();
   thd->clear_error();
+  rgi->killed_for_retry = rpl_group_info::RETRY_KILL_NONE;

   /*
     If we retry due to a deadlock kill that occurred during the commit step, we
@@ -723,7 +748,10 @@ do_retry:
       killed by the same earlier transaction.
     */
     if (!(err= thd->wait_for_prior_commit()))
+    {
+      rgi->speculation = rpl_group_info::SPECULATE_WAIT;
       break;
+    }
     convert_kill_to_deadlock_error(rgi);
     if (!has_temporary_error(thd))
       goto err;
@@ -830,11 +858,9 @@ do_retry:
         if (retries < 2)
         {
           /* Simulate that we get deadlock killed during open_binlog(). */
-          mysql_reset_thd_for_next_command(thd);
-          rgi->killed_for_retry= true;
-          mysql_mutex_lock(&thd->LOCK_thd_data);
-          thd->killed= KILL_CONNECTION;
-          mysql_mutex_unlock(&thd->LOCK_thd_data);
+          thd->reset_for_next_command();
+          rgi->killed_for_retry= rpl_group_info::RETRY_KILL_KILLED;
+          thd->set_killed(KILL_CONNECTION);
           thd->send_kill_message();
           fd= (File)-1;
           err= 1;
@@ -846,16 +872,26 @@ do_retry:
         err= 1;
         goto check_retry;
       }
+      description_event->reset_crypto();
       /* Loop to try again on the new log file. */
     }

     event_type= ev->get_type_code();
     if (event_type == FORMAT_DESCRIPTION_EVENT)
     {
+      Format_description_log_event *newde= (Format_description_log_event*)ev;
+      newde->copy_crypto_data(description_event);
       delete description_event;
-      description_event= (Format_description_log_event *)ev;
+      description_event= newde;
+      continue;
+    }
+    else if (event_type == START_ENCRYPTION_EVENT)
+    {
+      description_event->start_decryption((Start_encryption_log_event*)ev);
+      delete ev;
       continue;
-    } else if (!Log_event::is_group_event(event_type))
+    }
+    else if (!Log_event::is_group_event(event_type))
     {
       delete ev;
       continue;
@@ -963,7 +999,7 @@ handle_rpl_parallel_thread(void *arg)
   thd->security_ctx->skip_grants();
   thd->variables.max_allowed_packet= slave_max_allowed_packet;
   thd->slave_thread= 1;
-  thd->enable_slow_log= opt_log_slow_slave_statements;
+  thd->variables.sql_log_slow= opt_log_slow_slave_statements;
   thd->variables.log_slow_filter= global_system_variables.log_slow_filter;
   set_slave_thread_options(thd);
   thd->client_capabilities = CLIENT_LOCAL_FILES;
@@ -972,6 +1008,13 @@ handle_rpl_parallel_thread(void *arg)
   thd->set_time();
   thd->variables.lock_wait_timeout= LONG_TIMEOUT;
   thd->system_thread_info.rpl_sql_info= &sql_info;
+  /*
+    We need to use (at least) REPEATABLE READ isolation level. Otherwise
+    speculative parallel apply can run out-of-order and give wrong results
+    for statement-based replication.
+  */
+  thd->variables.tx_isolation= ISO_REPEATABLE_READ;
+
   mysql_mutex_lock(&rpt->LOCK_rpl_thread);
   rpt->thd= thd;
@@ -1076,6 +1119,7 @@ handle_rpl_parallel_thread(void *arg)
           rpt->loc_free_rgi(group_rgi);
         }

+        thd->tx_isolation= (enum_tx_isolation)thd->variables.tx_isolation;
         in_event_group= true;
         /*
           If the standalone flag is set, then this event group consists of a
@@ -1131,6 +1175,18 @@ handle_rpl_parallel_thread(void *arg)
            /* We have to apply the event. */
          }
        }
+        /*
+          If we are optimistically running transactions in parallel, but this
+          particular event group should not run in parallel with what came
+          before, then wait now for the prior transaction to complete its
+          commit.
+        */
+        if (rgi->speculation == rpl_group_info::SPECULATE_WAIT &&
+            (err= thd->wait_for_prior_commit()))
+        {
+          slave_output_error_info(rgi, thd);
+          signal_error_to_sql_driver_thread(thd, rgi, 1);
+        }
       }

       group_rgi= rgi;
@@ -1334,10 +1390,11 @@ handle_rpl_parallel_thread(void *arg)
   thd->reset_db(NULL, 0);
   thd_proc_info(thd, "Slave worker thread exiting");
   thd->temporary_tables= 0;
+
   mysql_mutex_lock(&LOCK_thread_count);
-  THD_CHECK_SENTRY(thd);
-  delete thd;
+  thd->unlink();
   mysql_mutex_unlock(&LOCK_thread_count);
+  delete thd;

   mysql_mutex_lock(&rpt->LOCK_rpl_thread);
   rpt->running= false;
@@ -1377,6 +1434,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
                                  uint32 new_count, bool force)
 {
   uint32 i;
+  rpl_parallel_thread **old_list= NULL;
   rpl_parallel_thread **new_list= NULL;
   rpl_parallel_thread *new_free_list= NULL;
   rpl_parallel_thread *rpt_array= NULL;
@@ -1498,10 +1556,14 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
     }
   }

-  my_free(pool->threads);
+  old_list= pool->threads;
+  if (new_count < pool->count)
+    pool->count= new_count;
   pool->threads= new_list;
+  if (new_count > pool->count)
+    pool->count= new_count;
+  my_free(old_list);
   pool->free_list= new_free_list;
-  pool->count= new_count;
   for (i= 0; i < pool->count; ++i)
   {
     mysql_mutex_lock(&pool->threads[i]->LOCK_rpl_thread);
@@ -1608,9 +1670,7 @@ rpl_parallel_thread::inuse_relaylog_refcount_update()
   inuse_relaylog *ir= accumulated_ir_last;
   if (ir)
   {
-    my_atomic_rwlock_wrlock(&ir->inuse_relaylog_atomic_lock);
     my_atomic_add64(&ir->dequeued_count, accumulated_ir_count);
-    my_atomic_rwlock_wrunlock(&ir->inuse_relaylog_atomic_lock);
     accumulated_ir_count= 0;
     accumulated_ir_last= NULL;
   }
@@ -1749,7 +1809,7 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
   rgi->relay_log= rli->last_inuse_relaylog;
   rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size;
   rgi->retry_event_count= 0;
-  rgi->killed_for_retry= false;
+  rgi->killed_for_retry= rpl_group_info::RETRY_KILL_NONE;

   return rgi;
 }
@@ -1799,6 +1859,7 @@ rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev,
   gco->next_gco= NULL;
   gco->prior_sub_id= prior_sub_id;
   gco->installed= false;
+  gco->flags= 0;
   return gco;
 }

@@ -2381,6 +2442,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
   bool did_enter_cond= false;
   PSI_stage_info old_stage;

+  DBUG_EXECUTE_IF("slave_crash_if_parallel_apply", DBUG_SUICIDE(););
   /* Handle master log name change, seen in Rotate_log_event. */
   typ= ev->get_type_code();
   if (unlikely(typ == ROTATE_EVENT))
@@ -2502,7 +2564,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
   {
     rpl_gtid gtid;
     Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
-    uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
+    uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ||
+                       rli->mi->parallel_mode <= SLAVE_PARALLEL_MINIMAL ?
                        0 : gtid_ev->domain_id);
     if (!(e= find(domain_id)))
     {
@@ -2559,6 +2622,12 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
   if (typ == GTID_EVENT)
   {
     Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
+    bool new_gco;
+    enum_slave_parallel_mode mode= rli->mi->parallel_mode;
+    uchar gtid_flags= gtid_ev->flags2;
+    group_commit_orderer *gco;
+    uint8 force_switch_flag;
+    enum rpl_group_info::enum_speculation speculation;

     if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e, event_size)))
     {
@@ -2585,19 +2654,93 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
     rgi->wait_commit_sub_id= e->current_sub_id;
     rgi->wait_commit_group_info= e->current_group_info;

-    if (!((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) &&
-          e->last_commit_id == gtid_ev->commit_id))
+    speculation= rpl_group_info::SPECULATE_NO;
+    new_gco= true;
+    force_switch_flag= 0;
+    gco= e->current_gco;
+    if (likely(gco))
+    {
+      uint8 flags= gco->flags;
+
+      if (mode <= SLAVE_PARALLEL_MINIMAL ||
+          !(gtid_flags & Gtid_log_event::FL_GROUP_COMMIT_ID) ||
+          e->last_commit_id != gtid_ev->commit_id)
+        flags|= group_commit_orderer::MULTI_BATCH;
+      /* Make sure we do not attempt to run DDL in parallel speculatively. */
+      if (gtid_flags & Gtid_log_event::FL_DDL)
+        flags|= (force_switch_flag= group_commit_orderer::FORCE_SWITCH);
+
+      if (!(flags & group_commit_orderer::MULTI_BATCH))
+      {
+        /*
+          Still the same batch of event groups that group-committed together
+          on the master, so we can run in parallel.
+        */
+        new_gco= false;
+      }
+      else if ((mode >= SLAVE_PARALLEL_OPTIMISTIC) &&
+               !(flags & group_commit_orderer::FORCE_SWITCH))
+      {
+        /*
+          In transactional parallel mode, we optimistically attempt to run
+          non-DDL in parallel. In case of conflicts, we catch the conflict as
+          a deadlock or other error, roll back and retry serially.
+
+          The assumption is that only a few event groups will be
+          non-transactional or otherwise unsuitable for parallel apply. Those
+          transactions are still scheduled in parallel, but we set a flag that
+          will make the worker thread wait for everything before to complete
+          before starting.
+        */
+        new_gco= false;
+        if (!(gtid_flags & Gtid_log_event::FL_TRANSACTIONAL) ||
+            ( (!(gtid_flags & Gtid_log_event::FL_ALLOW_PARALLEL) ||
+               (gtid_flags & Gtid_log_event::FL_WAITED)) &&
+              (mode < SLAVE_PARALLEL_AGGRESSIVE)))
+        {
+          /*
+            This transaction should not be speculatively run in parallel with
+            what came before, either because it cannot safely be rolled back in
+            case of a conflict, or because it was marked as likely to conflict
+            and require expensive rollback and retry.
+
+            Here we mark it as such, and then the worker thread will do a
+            wait_for_prior_commit() before starting it. We do not introduce a
+            new group_commit_orderer, since we still want following transactions
+            to run in parallel with transactions prior to this one.
+          */
+          speculation= rpl_group_info::SPECULATE_WAIT;
+        }
+        else
+          speculation= rpl_group_info::SPECULATE_OPTIMISTIC;
+      }
+      gco->flags= flags;
+    }
+    else
+    {
+      if (gtid_flags & Gtid_log_event::FL_DDL)
+        force_switch_flag= group_commit_orderer::FORCE_SWITCH;
+    }
+    rgi->speculation= speculation;
+
+    if (gtid_flags & Gtid_log_event::FL_GROUP_COMMIT_ID)
+      e->last_commit_id= gtid_ev->commit_id;
+    else
+      e->last_commit_id= 0;
+
+    if (new_gco)
     {
       /*
-        A new batch of transactions that group-committed together on the master.
+        Do not run this event group in parallel with what came before; instead
+        wait for everything prior to at least have started its commit phase, to
+        avoid any risk of performing any conflicting action too early.

-        Remember the count that marks the end of the previous group committed
-        batch, and allocate a new gco.
+        Remember the count that marks the end of the previous batch of event
+        groups that run in parallel, and allocate a new gco.
       */
       uint64 count= e->count_queued_event_groups;
-      group_commit_orderer *gco;
-      if (!(gco= cur_thread->get_gco(count, e->current_gco,
-                                     e->current_sub_id)))
+      if (!(gco= cur_thread->get_gco(count, gco, e->current_sub_id)))
       {
         cur_thread->free_rgi(rgi);
         cur_thread->free_qev(qev);
@@ -2606,14 +2749,11 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
         delete ev;
         return 1;
       }
-      e->current_gco= rgi->gco= gco;
+      gco->flags|= force_switch_flag;
+      e->current_gco= gco;
     }
-    else
-      rgi->gco= e->current_gco;
-    if (gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID)
-      e->last_commit_id= gtid_ev->commit_id;
-    else
-      e->last_commit_id= 0;
+    rgi->gco= gco;
+
     qev->rgi= e->current_group_info= rgi;
     e->current_sub_id= rgi->gtid_sub_id;
     ++e->count_queued_event_groups;
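The core of the change is the scheduling decision added to rpl_parallel::do_event() for GTID_EVENT above. The sketch below is a simplified, standalone restatement of that decision, not part of the commit: the type gtid_flags_t, the struct decision and the function schedule_event_group are stand-ins invented for illustration, while the mode and flag names mirror slave_parallel_mode and Gtid_log_event::FL_* as used in the diff. It deliberately leaves out the MULTI_BATCH/FORCE_SWITCH flags that the patch accumulates on an existing group_commit_orderer.

// Stand-in enums/types mirroring the names used in the diff above.
enum slave_parallel_mode { MINIMAL, CONSERVATIVE, OPTIMISTIC, AGGRESSIVE };
enum speculation_mode { SPECULATE_NO, SPECULATE_WAIT, SPECULATE_OPTIMISTIC };

struct gtid_flags_t {
  bool group_commit_id;   // FL_GROUP_COMMIT_ID present
  bool same_commit_id;    // commit_id equals e->last_commit_id
  bool ddl;               // FL_DDL
  bool transactional;     // FL_TRANSACTIONAL
  bool allow_parallel;    // FL_ALLOW_PARALLEL
  bool waited;            // FL_WAITED
};

struct decision {
  bool new_gco;                  // allocate a new group_commit_orderer?
  speculation_mode speculation;  // how the worker may start the group
};

static decision
schedule_event_group(slave_parallel_mode mode, gtid_flags_t f, bool have_gco)
{
  decision d= { true, SPECULATE_NO };
  if (!have_gco)
    return d;                    // first group in this domain: new gco

  bool multi_batch= (mode <= MINIMAL || !f.group_commit_id || !f.same_commit_id);
  bool force_switch= f.ddl;      // never run DDL speculatively in parallel

  if (!multi_batch)
    d.new_gco= false;            // same master group commit: run in parallel
  else if (mode >= OPTIMISTIC && !force_switch)
  {
    d.new_gco= false;            // optimistic: schedule in parallel anyway ...
    if (!f.transactional ||
        ((!f.allow_parallel || f.waited) && mode < AGGRESSIVE))
      d.speculation= SPECULATE_WAIT;        // ... but wait for prior commits
    else
      d.speculation= SPECULATE_OPTIMISTIC;  // roll back and retry on conflict
  }
  return d;
}

In the patch itself this logic is inlined in do_event(), and the resulting flags are written back into gco->flags so that later event groups of the same batch see them.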