summaryrefslogtreecommitdiff
path: root/sql/rpl_parallel.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r--sql/rpl_parallel.cc226
1 files changed, 183 insertions, 43 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 43e52c00e8d..35cddee6d4d 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -47,12 +47,9 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
if (!(ev->is_artificial_event() || ev->is_relay_log_event() ||
(ev->when == 0)))
rgi->last_master_timestamp= ev->when + (time_t)ev->exec_time;
- mysql_mutex_lock(&rli->data_lock);
- /* Mutex will be released in apply_event_and_update_pos(). */
- err= apply_event_and_update_pos(ev, thd, rgi, rpt);
+ err= apply_event_and_update_pos_for_parallel(ev, thd, rgi);
- thread_safe_increment64(&rli->executed_entries,
- &slave_executed_entries_lock);
+ thread_safe_increment64(&rli->executed_entries);
/* ToDo: error handling. */
return err;
}
@@ -108,6 +105,25 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
}
+/*
+ Wait for any pending deadlock kills. Since deadlock kills happen
+ asynchronously, we need to be sure they will be completed before starting a
+ new transaction. Otherwise the new transaction might suffer a spurious kill.
+*/
+static void
+wait_for_pending_deadlock_kill(THD *thd, rpl_group_info *rgi)
+{
+ PSI_stage_info old_stage;
+
+ mysql_mutex_lock(&thd->LOCK_wakeup_ready);
+ thd->ENTER_COND(&thd->COND_wakeup_ready, &thd->LOCK_wakeup_ready,
+ &stage_waiting_for_deadlock_kill, &old_stage);
+ while (rgi->killed_for_retry == rpl_group_info::RETRY_KILL_PENDING)
+ mysql_cond_wait(&thd->COND_wakeup_ready, &thd->LOCK_wakeup_ready);
+ thd->EXIT_COND(&old_stage);
+}
+
+
static void
finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
rpl_parallel_entry *entry, rpl_group_info *rgi)
@@ -213,6 +229,8 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
entry->stop_on_error_sub_id= sub_id;
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+ if (rgi->killed_for_retry == rpl_group_info::RETRY_KILL_PENDING)
+ wait_for_pending_deadlock_kill(thd, rgi);
thd->clear_error();
thd->reset_killed();
/*
@@ -582,7 +600,13 @@ dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
asynchroneously to allow the former to complete its commit.
In this case, we convert the 'killed' error into a deadlock error, and retry
- the later transaction. */
+ the later transaction.
+
+ If we are doing optimistic parallel apply of transactions not known to be
+ safe, we convert any error to a deadlock error, but then at retry we will
+ wait for prior transactions to commit first, so that the retries can be
+ done non-speculative.
+*/
static void
convert_kill_to_deadlock_error(rpl_group_info *rgi)
{
@@ -592,12 +616,13 @@ convert_kill_to_deadlock_error(rpl_group_info *rgi)
if (!thd->get_stmt_da()->is_error())
return;
err_code= thd->get_stmt_da()->sql_errno();
- if ((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) &&
- rgi->killed_for_retry)
+ if ((rgi->speculation == rpl_group_info::SPECULATE_OPTIMISTIC &&
+ err_code != ER_PRIOR_COMMIT_FAILED) ||
+ ((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) &&
+ rgi->killed_for_retry))
{
thd->clear_error();
my_error(ER_LOCK_DEADLOCK, MYF(0));
- rgi->killed_for_retry= false;
thd->reset_killed();
}
}
@@ -688,14 +713,14 @@ do_retry:
thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
DBUG_EXECUTE_IF("inject_mdev8031", {
/* Simulate that we get deadlock killed at this exact point. */
- rgi->killed_for_retry= true;
- mysql_mutex_lock(&thd->LOCK_thd_data);
- thd->killed= KILL_CONNECTION;
- mysql_mutex_unlock(&thd->LOCK_thd_data);
+ rgi->killed_for_retry= rpl_group_info::RETRY_KILL_KILLED;
+ thd->set_killed(KILL_CONNECTION);
});
rgi->cleanup_context(thd, 1);
+ wait_for_pending_deadlock_kill(thd, rgi);
thd->reset_killed();
thd->clear_error();
+ rgi->killed_for_retry = rpl_group_info::RETRY_KILL_NONE;
/*
If we retry due to a deadlock kill that occurred during the commit step, we
@@ -723,7 +748,10 @@ do_retry:
killed by the same earlier transaction.
*/
if (!(err= thd->wait_for_prior_commit()))
+ {
+ rgi->speculation = rpl_group_info::SPECULATE_WAIT;
break;
+ }
convert_kill_to_deadlock_error(rgi);
if (!has_temporary_error(thd))
@@ -830,11 +858,9 @@ do_retry:
if (retries < 2)
{
/* Simulate that we get deadlock killed during open_binlog(). */
- mysql_reset_thd_for_next_command(thd);
- rgi->killed_for_retry= true;
- mysql_mutex_lock(&thd->LOCK_thd_data);
- thd->killed= KILL_CONNECTION;
- mysql_mutex_unlock(&thd->LOCK_thd_data);
+ thd->reset_for_next_command();
+ rgi->killed_for_retry= rpl_group_info::RETRY_KILL_KILLED;
+ thd->set_killed(KILL_CONNECTION);
thd->send_kill_message();
fd= (File)-1;
err= 1;
@@ -846,16 +872,26 @@ do_retry:
err= 1;
goto check_retry;
}
+ description_event->reset_crypto();
/* Loop to try again on the new log file. */
}
event_type= ev->get_type_code();
if (event_type == FORMAT_DESCRIPTION_EVENT)
{
+ Format_description_log_event *newde= (Format_description_log_event*)ev;
+ newde->copy_crypto_data(description_event);
delete description_event;
- description_event= (Format_description_log_event *)ev;
+ description_event= newde;
+ continue;
+ }
+ else if (event_type == START_ENCRYPTION_EVENT)
+ {
+ description_event->start_decryption((Start_encryption_log_event*)ev);
+ delete ev;
continue;
- } else if (!Log_event::is_group_event(event_type))
+ }
+ else if (!Log_event::is_group_event(event_type))
{
delete ev;
continue;
@@ -963,7 +999,7 @@ handle_rpl_parallel_thread(void *arg)
thd->security_ctx->skip_grants();
thd->variables.max_allowed_packet= slave_max_allowed_packet;
thd->slave_thread= 1;
- thd->enable_slow_log= opt_log_slow_slave_statements;
+ thd->variables.sql_log_slow= opt_log_slow_slave_statements;
thd->variables.log_slow_filter= global_system_variables.log_slow_filter;
set_slave_thread_options(thd);
thd->client_capabilities = CLIENT_LOCAL_FILES;
@@ -972,6 +1008,13 @@ handle_rpl_parallel_thread(void *arg)
thd->set_time();
thd->variables.lock_wait_timeout= LONG_TIMEOUT;
thd->system_thread_info.rpl_sql_info= &sql_info;
+ /*
+ We need to use (at least) REPEATABLE READ isolation level. Otherwise
+ speculative parallel apply can run out-of-order and give wrong results
+ for statement-based replication.
+ */
+ thd->variables.tx_isolation= ISO_REPEATABLE_READ;
+
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->thd= thd;
@@ -1076,6 +1119,7 @@ handle_rpl_parallel_thread(void *arg)
rpt->loc_free_rgi(group_rgi);
}
+ thd->tx_isolation= (enum_tx_isolation)thd->variables.tx_isolation;
in_event_group= true;
/*
If the standalone flag is set, then this event group consists of a
@@ -1131,6 +1175,18 @@ handle_rpl_parallel_thread(void *arg)
/* We have to apply the event. */
}
}
+ /*
+ If we are optimistically running transactions in parallel, but this
+ particular event group should not run in parallel with what came
+ before, then wait now for the prior transaction to complete its
+ commit.
+ */
+ if (rgi->speculation == rpl_group_info::SPECULATE_WAIT &&
+ (err= thd->wait_for_prior_commit()))
+ {
+ slave_output_error_info(rgi, thd);
+ signal_error_to_sql_driver_thread(thd, rgi, 1);
+ }
}
group_rgi= rgi;
@@ -1334,10 +1390,11 @@ handle_rpl_parallel_thread(void *arg)
thd->reset_db(NULL, 0);
thd_proc_info(thd, "Slave worker thread exiting");
thd->temporary_tables= 0;
+
mysql_mutex_lock(&LOCK_thread_count);
- THD_CHECK_SENTRY(thd);
- delete thd;
+ thd->unlink();
mysql_mutex_unlock(&LOCK_thread_count);
+ delete thd;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->running= false;
@@ -1377,6 +1434,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
uint32 new_count, bool force)
{
uint32 i;
+ rpl_parallel_thread **old_list= NULL;
rpl_parallel_thread **new_list= NULL;
rpl_parallel_thread *new_free_list= NULL;
rpl_parallel_thread *rpt_array= NULL;
@@ -1498,10 +1556,14 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
}
}
- my_free(pool->threads);
+ old_list= pool->threads;
+ if (new_count < pool->count)
+ pool->count= new_count;
pool->threads= new_list;
+ if (new_count > pool->count)
+ pool->count= new_count;
+ my_free(old_list);
pool->free_list= new_free_list;
- pool->count= new_count;
for (i= 0; i < pool->count; ++i)
{
mysql_mutex_lock(&pool->threads[i]->LOCK_rpl_thread);
@@ -1608,9 +1670,7 @@ rpl_parallel_thread::inuse_relaylog_refcount_update()
inuse_relaylog *ir= accumulated_ir_last;
if (ir)
{
- my_atomic_rwlock_wrlock(&ir->inuse_relaylog_atomic_lock);
my_atomic_add64(&ir->dequeued_count, accumulated_ir_count);
- my_atomic_rwlock_wrunlock(&ir->inuse_relaylog_atomic_lock);
accumulated_ir_count= 0;
accumulated_ir_last= NULL;
}
@@ -1749,7 +1809,7 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
rgi->relay_log= rli->last_inuse_relaylog;
rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size;
rgi->retry_event_count= 0;
- rgi->killed_for_retry= false;
+ rgi->killed_for_retry= rpl_group_info::RETRY_KILL_NONE;
return rgi;
}
@@ -1799,6 +1859,7 @@ rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev,
gco->next_gco= NULL;
gco->prior_sub_id= prior_sub_id;
gco->installed= false;
+ gco->flags= 0;
return gco;
}
@@ -2381,6 +2442,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
bool did_enter_cond= false;
PSI_stage_info old_stage;
+ DBUG_EXECUTE_IF("slave_crash_if_parallel_apply", DBUG_SUICIDE(););
/* Handle master log name change, seen in Rotate_log_event. */
typ= ev->get_type_code();
if (unlikely(typ == ROTATE_EVENT))
@@ -2502,7 +2564,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
{
rpl_gtid gtid;
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
- uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
+ uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ||
+ rli->mi->parallel_mode <= SLAVE_PARALLEL_MINIMAL ?
0 : gtid_ev->domain_id);
if (!(e= find(domain_id)))
{
@@ -2559,6 +2622,12 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
if (typ == GTID_EVENT)
{
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
+ bool new_gco;
+ enum_slave_parallel_mode mode= rli->mi->parallel_mode;
+ uchar gtid_flags= gtid_ev->flags2;
+ group_commit_orderer *gco;
+ uint8 force_switch_flag;
+ enum rpl_group_info::enum_speculation speculation;
if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e, event_size)))
{
@@ -2585,19 +2654,93 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
rgi->wait_commit_sub_id= e->current_sub_id;
rgi->wait_commit_group_info= e->current_group_info;
- if (!((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) &&
- e->last_commit_id == gtid_ev->commit_id))
+ speculation= rpl_group_info::SPECULATE_NO;
+ new_gco= true;
+ force_switch_flag= 0;
+ gco= e->current_gco;
+ if (likely(gco))
+ {
+ uint8 flags= gco->flags;
+
+ if (mode <= SLAVE_PARALLEL_MINIMAL ||
+ !(gtid_flags & Gtid_log_event::FL_GROUP_COMMIT_ID) ||
+ e->last_commit_id != gtid_ev->commit_id)
+ flags|= group_commit_orderer::MULTI_BATCH;
+ /* Make sure we do not attempt to run DDL in parallel speculatively. */
+ if (gtid_flags & Gtid_log_event::FL_DDL)
+ flags|= (force_switch_flag= group_commit_orderer::FORCE_SWITCH);
+
+ if (!(flags & group_commit_orderer::MULTI_BATCH))
+ {
+ /*
+ Still the same batch of event groups that group-committed together
+ on the master, so we can run in parallel.
+ */
+ new_gco= false;
+ }
+ else if ((mode >= SLAVE_PARALLEL_OPTIMISTIC) &&
+ !(flags & group_commit_orderer::FORCE_SWITCH))
+ {
+ /*
+ In transactional parallel mode, we optimistically attempt to run
+ non-DDL in parallel. In case of conflicts, we catch the conflict as
+ a deadlock or other error, roll back and retry serially.
+
+ The assumption is that only a few event groups will be
+ non-transactional or otherwise unsuitable for parallel apply. Those
+ transactions are still scheduled in parallel, but we set a flag that
+ will make the worker thread wait for everything before to complete
+ before starting.
+ */
+ new_gco= false;
+ if (!(gtid_flags & Gtid_log_event::FL_TRANSACTIONAL) ||
+ ( (!(gtid_flags & Gtid_log_event::FL_ALLOW_PARALLEL) ||
+ (gtid_flags & Gtid_log_event::FL_WAITED)) &&
+ (mode < SLAVE_PARALLEL_AGGRESSIVE)))
+ {
+ /*
+ This transaction should not be speculatively run in parallel with
+ what came before, either because it cannot safely be rolled back in
+ case of a conflict, or because it was marked as likely to conflict
+ and require expensive rollback and retry.
+
+ Here we mark it as such, and then the worker thread will do a
+ wait_for_prior_commit() before starting it. We do not introduce a
+ new group_commit_orderer, since we still want following transactions
+ to run in parallel with transactions prior to this one.
+ */
+ speculation= rpl_group_info::SPECULATE_WAIT;
+ }
+ else
+ speculation= rpl_group_info::SPECULATE_OPTIMISTIC;
+ }
+ gco->flags= flags;
+ }
+ else
+ {
+ if (gtid_flags & Gtid_log_event::FL_DDL)
+ force_switch_flag= group_commit_orderer::FORCE_SWITCH;
+ }
+ rgi->speculation= speculation;
+
+ if (gtid_flags & Gtid_log_event::FL_GROUP_COMMIT_ID)
+ e->last_commit_id= gtid_ev->commit_id;
+ else
+ e->last_commit_id= 0;
+
+ if (new_gco)
{
/*
- A new batch of transactions that group-committed together on the master.
+ Do not run this event group in parallel with what came before; instead
+ wait for everything prior to at least have started its commit phase, to
+ avoid any risk of performing any conflicting action too early.
- Remember the count that marks the end of the previous group committed
- batch, and allocate a new gco.
+ Remember the count that marks the end of the previous batch of event
+ groups that run in parallel, and allocate a new gco.
*/
uint64 count= e->count_queued_event_groups;
- group_commit_orderer *gco;
- if (!(gco= cur_thread->get_gco(count, e->current_gco, e->current_sub_id)))
+ if (!(gco= cur_thread->get_gco(count, gco, e->current_sub_id)))
{
cur_thread->free_rgi(rgi);
cur_thread->free_qev(qev);
@@ -2606,14 +2749,11 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
delete ev;
return 1;
}
- e->current_gco= rgi->gco= gco;
+ gco->flags|= force_switch_flag;
+ e->current_gco= gco;
}
- else
- rgi->gco= e->current_gco;
- if (gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID)
- e->last_commit_id= gtid_ev->commit_id;
- else
- e->last_commit_id= 0;
+ rgi->gco= gco;
+
qev->rgi= e->current_group_info= rgi;
e->current_sub_id= rgi->gtid_sub_id;
++e->count_queued_event_groups;