Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r--  sql/rpl_parallel.cc  276
1 file changed, 181 insertions(+), 95 deletions(-)
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 9b91206ca75..2050ccdb3d7 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -8,6 +8,15 @@
Code for optional parallel execution of replicated events on the slave.
*/
+
+/*
+ Maximum number of queued events to accumulate in a local free list, before
+ moving them to the global free list. There is additionally a limit on how
+ much to accumulate, based on opt_slave_parallel_max_queued.
+*/
+#define QEV_BATCH_FREE 200
+
+
struct rpl_parallel_thread_pool global_rpl_thread_pool;
static void signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi,
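
The QEV_BATCH_FREE constant introduced above caps how many freed queued events a worker accumulates on its thread-local free list before splicing them back onto the shared free list under LOCK_rpl_thread. The following standalone sketch illustrates the pattern in isolation; Node, SharedFreeList and LocalCache are invented names for this illustration, not the actual MariaDB types:

    #include <cstddef>
    #include <mutex>

    // Minimal stand-ins; the real code uses rpl_parallel_thread::queued_event,
    // qev_free_list and LOCK_rpl_thread.
    struct Node { Node *next= nullptr; };

    struct SharedFreeList {
      std::mutex lock;            // plays the role of LOCK_rpl_thread
      Node *head= nullptr;
    };

    struct LocalCache {
      Node *head= nullptr;
      Node **last_next= nullptr;  // &next of the tail node, for O(1) splicing
      size_t pending= 0;
      static const size_t BATCH= 200;   // analogous to QEV_BATCH_FREE

      void free_one(Node *n, SharedFreeList *shared)
      {
        if (!head)
          last_next= &n->next;    // first node freed becomes the local tail
        else
          n->next= head;
        head= n;                  // prepend, as loc_free_qev() does
        if (++pending >= BATCH)
          flush(shared);
      }

      void flush(SharedFreeList *shared)
      {
        if (!head)
          return;
        std::lock_guard<std::mutex> guard(shared->lock);
        *last_next= shared->head; // splice the whole local list in one step
        shared->head= head;
        head= nullptr;
        pending= 0;
      }
    };

Remembering the tail pointer is what keeps the splice in flush() down to two pointer assignments, no matter how many events were batched, so the shared mutex is held only briefly once per batch.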
@@ -290,6 +299,7 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
THD *thd= rgi->thd;
rpl_parallel_entry *entry= rgi->parallel_entry;
ulong retries= 0;
+ Format_description_log_event *description_event= NULL;
do_retry:
event_count= 0;
@@ -355,6 +365,14 @@ do_retry:
goto err;
}
cur_offset= rgi->retry_start_offset;
+ delete description_event;
+ description_event=
+ read_relay_log_description_event(&rlog, cur_offset, &errmsg);
+ if (!description_event)
+ {
+ err= 1;
+ goto err;
+ }
my_b_seek(&rlog, cur_offset);
do
@@ -367,8 +385,7 @@ do_retry:
for (;;)
{
old_offset= cur_offset;
- ev= Log_event::read_log_event(&rlog, 0,
- rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */,
+ ev= Log_event::read_log_event(&rlog, 0, description_event,
opt_slave_sql_verify_checksum);
cur_offset= my_b_tell(&rlog);
@@ -416,7 +433,12 @@ do_retry:
}
event_type= ev->get_type_code();
- if (!Log_event::is_group_event(event_type))
+ if (event_type == FORMAT_DESCRIPTION_EVENT)
+ {
+ delete description_event;
+ description_event= (Format_description_log_event *)ev;
+ continue;
+ } else if (!Log_event::is_group_event(event_type))
{
delete ev;
continue;
@@ -424,7 +446,7 @@ do_retry:
ev->thd= thd;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
- qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset,
+ qev= rpt->retry_get_qev(ev, orig_qev, log_name, old_offset,
cur_offset - old_offset);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
if (!qev)
@@ -472,6 +494,8 @@ do_retry:
err:
+ if (description_event)
+ delete description_event;
if (fd >= 0)
{
end_io_cache(&rlog);
@@ -495,14 +519,8 @@ handle_rpl_parallel_thread(void *arg)
rpl_group_info *group_rgi= NULL;
group_commit_orderer *gco, *tmp_gco;
uint64 event_gtid_sub_id= 0;
- rpl_parallel_thread::queued_event *qevs_to_free;
- rpl_group_info *rgis_to_free;
- group_commit_orderer *gcos_to_free;
rpl_sql_thread_info sql_info(NULL);
- size_t total_event_size;
int err;
- inuse_relaylog *last_ir;
- uint64 accumulated_ir_count;
struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg;
@@ -544,6 +562,8 @@ handle_rpl_parallel_thread(void *arg)
while (!rpt->stop)
{
+ rpl_parallel_thread::queued_event *qev, *next_qev;
+
thd->ENTER_COND(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread,
&stage_waiting_for_work_from_sql_thread, &old_stage);
/*
@@ -565,28 +585,21 @@ handle_rpl_parallel_thread(void *arg)
thd->EXIT_COND(&old_stage);
more_events:
- qevs_to_free= NULL;
- rgis_to_free= NULL;
- gcos_to_free= NULL;
- total_event_size= 0;
- while (events)
+ for (qev= events; qev; qev= next_qev)
{
- struct rpl_parallel_thread::queued_event *next= events->next;
Log_event_type event_type;
- rpl_group_info *rgi= events->rgi;
+ rpl_group_info *rgi= qev->rgi;
rpl_parallel_entry *entry= rgi->parallel_entry;
bool end_of_group, group_ending;
- total_event_size+= events->event_size;
- if (events->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE)
+ next_qev= qev->next;
+ if (qev->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE)
{
- handle_queued_pos_update(thd, events);
- events->next= qevs_to_free;
- qevs_to_free= events;
- events= next;
+ handle_queued_pos_update(thd, qev);
+ rpt->loc_free_qev(qev);
continue;
}
- else if (events->typ ==
+ else if (qev->typ ==
rpl_parallel_thread::queued_event::QUEUED_MASTER_RESTART)
{
if (in_event_group)
@@ -598,29 +611,34 @@ handle_rpl_parallel_thread(void *arg)
group_rgi->cleanup_context(thd, 1);
in_event_group= false;
finish_event_group(thd, group_rgi->gtid_sub_id,
- events->entry_for_queued, group_rgi);
+ qev->entry_for_queued, group_rgi);
- group_rgi->next= rgis_to_free;
- rgis_to_free= group_rgi;
+ rpt->loc_free_rgi(group_rgi);
thd->rgi_slave= group_rgi= NULL;
}
- events->next= qevs_to_free;
- qevs_to_free= events;
- events= next;
+ rpt->loc_free_qev(qev);
continue;
}
- DBUG_ASSERT(events->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT);
+ DBUG_ASSERT(qev->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT);
thd->rgi_slave= group_rgi= rgi;
gco= rgi->gco;
/* Handle a new event group, which will be initiated by a GTID event. */
- if ((event_type= events->ev->get_type_code()) == GTID_EVENT)
+ if ((event_type= qev->ev->get_type_code()) == GTID_EVENT)
{
bool did_enter_cond= false;
PSI_stage_info old_stage;
uint64 wait_count;
+ DBUG_EXECUTE_IF("rpl_parallel_scheduled_gtid_0_x_100", {
+ if (rgi->current_gtid.domain_id == 0 &&
+ rgi->current_gtid.seq_no == 100) {
+ debug_sync_set_action(thd,
+ STRING_WITH_LEN("now SIGNAL scheduled_gtid_0_x_100"));
+ }
+ });
+
in_event_group= true;
/*
If the standalone flag is set, then this event group consists of a
@@ -628,7 +646,7 @@ handle_rpl_parallel_thread(void *arg)
similar), without any terminating COMMIT/ROLLBACK/XID.
*/
group_standalone=
- (0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 &
+ (0 != (static_cast<Gtid_log_event *>(qev->ev)->flags2 &
Gtid_log_event::FL_STANDALONE));
event_gtid_sub_id= rgi->gtid_sub_id;
@@ -656,7 +674,7 @@ handle_rpl_parallel_thread(void *arg)
DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
thd->ENTER_COND(&gco->COND_group_commit_orderer,
&entry->LOCK_parallel_entry,
- &stage_waiting_for_prior_transaction_to_commit,
+ &stage_waiting_for_prior_transaction_to_start_commit,
&old_stage);
did_enter_cond= true;
do
@@ -689,8 +707,7 @@ handle_rpl_parallel_thread(void *arg)
*/
DBUG_ASSERT(!tmp_gco->prev_gco);
gco->prev_gco= NULL;
- tmp_gco->next_gco= gcos_to_free;
- gcos_to_free= tmp_gco;
+ rpt->loc_free_gco(tmp_gco);
}
if (entry->force_abort && wait_count > entry->stop_count)
@@ -751,7 +768,7 @@ handle_rpl_parallel_thread(void *arg)
}
}
- group_ending= is_group_ending(events->ev, event_type);
+ group_ending= is_group_ending(qev->ev, event_type);
if (group_ending && likely(!rgi->worker_error))
{
DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit");
@@ -767,20 +784,32 @@ handle_rpl_parallel_thread(void *arg)
if (likely(!rgi->worker_error) && !skip_event_group)
{
++rgi->retry_event_count;
- err= rpt_handle_event(events, rpt);
- delete_or_keep_event_post_apply(rgi, event_type, events->ev);
+#ifndef DBUG_OFF
+ err= 0;
+ DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_xid",
+ if (event_type == XID_EVENT)
+ {
+ thd->clear_error();
+ thd->get_stmt_da()->reset_diagnostics_area();
+ my_error(ER_LOCK_DEADLOCK, MYF(0));
+ err= 1;
+ });
+ if (!err)
+#endif
+ err= rpt_handle_event(qev, rpt);
+ delete_or_keep_event_post_apply(rgi, event_type, qev->ev);
DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_x_100",
err= dbug_simulate_tmp_error(rgi, thd););
if (err)
{
convert_kill_to_deadlock_error(rgi);
if (has_temporary_error(thd) && slave_trans_retries > 0)
- err= retry_event_group(rgi, rpt, events);
+ err= retry_event_group(rgi, rpt, qev);
}
}
else
{
- delete events->ev;
+ delete qev->ev;
err= thd->wait_for_prior_commit();
}
@@ -789,8 +818,7 @@ handle_rpl_parallel_thread(void *arg)
((group_standalone && !Log_event::is_part_of_group(event_type)) ||
group_ending);
- events->next= qevs_to_free;
- qevs_to_free= events;
+ rpt->loc_free_qev(qev);
if (unlikely(err))
{
@@ -805,61 +833,20 @@ handle_rpl_parallel_thread(void *arg)
{
in_event_group= false;
finish_event_group(thd, event_gtid_sub_id, entry, rgi);
- rgi->next= rgis_to_free;
- rgis_to_free= rgi;
+ rpt->loc_free_rgi(rgi);
thd->rgi_slave= group_rgi= rgi= NULL;
skip_event_group= false;
DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
}
-
- events= next;
}
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
- /* Signal that our queue can now accept more events. */
- rpt->dequeue2(total_event_size);
- mysql_cond_signal(&rpt->COND_rpl_thread_queue);
- /* We need to delay the free here, to when we have the lock. */
- while (gcos_to_free)
- {
- group_commit_orderer *next= gcos_to_free->next_gco;
- rpt->free_gco(gcos_to_free);
- gcos_to_free= next;
- }
- while (rgis_to_free)
- {
- rpl_group_info *next= rgis_to_free->next;
- rpt->free_rgi(rgis_to_free);
- rgis_to_free= next;
- }
- last_ir= NULL;
- accumulated_ir_count= 0;
- while (qevs_to_free)
- {
- rpl_parallel_thread::queued_event *next= qevs_to_free->next;
- inuse_relaylog *ir= qevs_to_free->ir;
- /* Batch up refcount update to reduce use of synchronised operations. */
- if (last_ir != ir)
- {
- if (last_ir)
- {
- my_atomic_rwlock_wrlock(&last_ir->inuse_relaylog_atomic_lock);
- my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
- my_atomic_rwlock_wrunlock(&last_ir->inuse_relaylog_atomic_lock);
- accumulated_ir_count= 0;
- }
- last_ir= ir;
- }
- ++accumulated_ir_count;
- rpt->free_qev(qevs_to_free);
- qevs_to_free= next;
- }
- if (last_ir)
- {
- my_atomic_rwlock_wrlock(&last_ir->inuse_relaylog_atomic_lock);
- my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
- my_atomic_rwlock_wrunlock(&last_ir->inuse_relaylog_atomic_lock);
- }
+ /*
+ Now that we have the lock, we can move everything from our local free
+ lists to the real free lists that are also accessible from the SQL
+ driver thread.
+ */
+ rpt->batch_free();
if ((events= rpt->event_queue) != NULL)
{
@@ -872,6 +859,7 @@ handle_rpl_parallel_thread(void *arg)
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
goto more_events;
}
+ rpt->inuse_relaylog_refcount_update();
if (in_event_group && group_rgi->parallel_entry->force_abort)
{
@@ -1107,6 +1095,51 @@ err:
}
+void
+rpl_parallel_thread::batch_free()
+{
+ mysql_mutex_assert_owner(&LOCK_rpl_thread);
+ if (loc_qev_list)
+ {
+ *loc_qev_last_ptr_ptr= qev_free_list;
+ qev_free_list= loc_qev_list;
+ loc_qev_list= NULL;
+ dequeue2(loc_qev_size);
+ /* Signal that our queue can now accept more events. */
+ mysql_cond_signal(&COND_rpl_thread_queue);
+ loc_qev_size= 0;
+ qev_free_pending= 0;
+ }
+ if (loc_rgi_list)
+ {
+ *loc_rgi_last_ptr_ptr= rgi_free_list;
+ rgi_free_list= loc_rgi_list;
+ loc_rgi_list= NULL;
+ }
+ if (loc_gco_list)
+ {
+ *loc_gco_last_ptr_ptr= gco_free_list;
+ gco_free_list= loc_gco_list;
+ loc_gco_list= NULL;
+ }
+}
+
+
+void
+rpl_parallel_thread::inuse_relaylog_refcount_update()
+{
+ inuse_relaylog *ir= accumulated_ir_last;
+ if (ir)
+ {
+ my_atomic_rwlock_wrlock(&ir->rli->inuse_relaylog_atomic_lock);
+ my_atomic_add64(&ir->dequeued_count, accumulated_ir_count);
+ my_atomic_rwlock_wrunlock(&ir->rli->inuse_relaylog_atomic_lock);
+ accumulated_ir_count= 0;
+ accumulated_ir_last= NULL;
+ }
+}
+
+
rpl_parallel_thread::queued_event *
rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size)
{
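
The new inuse_relaylog_refcount_update() helper above batches the reference-count updates: instead of one atomic add per freed event, the worker counts consecutive events that belong to the same inuse_relaylog and publishes the sum with a single atomic add. A rough sketch of the idea, using std::atomic in place of my_atomic_add64 and invented RelayLog/Worker types:

    #include <atomic>
    #include <cstdint>

    // Sketch of batching reference-count updates: one atomic add per run of
    // events from the same relay log, instead of one per event.
    struct RelayLog { std::atomic<uint64_t> dequeued_count{0}; };

    struct Worker {
      RelayLog *accumulated_last= nullptr;
      uint64_t  accumulated_count= 0;

      void note_freed_event(RelayLog *ir)
      {
        if (ir != accumulated_last)
        {
          flush_refcounts();           // publish the previous run, if any
          accumulated_last= ir;
        }
        ++accumulated_count;
      }

      void flush_refcounts()
      {
        if (accumulated_last)
        {
          // one atomic add covers the whole run (my_atomic_add64 in the patch)
          accumulated_last->dequeued_count.fetch_add(accumulated_count,
                                                     std::memory_order_relaxed);
          accumulated_count= 0;
          accumulated_last= nullptr;
        }
      }
    };

In the patch itself the per-event bookkeeping happens in loc_free_qev(), and the accumulated count is flushed either when a freed event refers to a different relay log or once the worker has drained its queue (the inuse_relaylog_refcount_update() call added in handle_rpl_parallel_thread).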
@@ -1161,6 +1194,43 @@ rpl_parallel_thread::retry_get_qev(Log_event *ev, queued_event *orig_qev,
void
+rpl_parallel_thread::loc_free_qev(rpl_parallel_thread::queued_event *qev)
+{
+ inuse_relaylog *ir= qev->ir;
+ inuse_relaylog *last_ir= accumulated_ir_last;
+ if (ir != last_ir)
+ {
+ if (last_ir)
+ inuse_relaylog_refcount_update();
+ accumulated_ir_last= ir;
+ }
+ ++accumulated_ir_count;
+ if (!loc_qev_list)
+ loc_qev_last_ptr_ptr= &qev->next;
+ else
+ qev->next= loc_qev_list;
+ loc_qev_list= qev;
+ loc_qev_size+= qev->event_size;
+ /*
+ We want to release to the global free list only occasionally, to avoid
+ having to take the LOCK_rpl_thread mutex too many times.
+
+ However, we do need to release regularly. If we let the unreleased part
+ grow too large, then the SQL driver thread may go to sleep waiting for
+ the queue to drop below opt_slave_parallel_max_queued, and this in turn
+ can stall all other worker threads waiting for more work to do.
+ */
+ if (++qev_free_pending >= QEV_BATCH_FREE ||
+ loc_qev_size >= opt_slave_parallel_max_queued/3)
+ {
+ mysql_mutex_lock(&LOCK_rpl_thread);
+ batch_free();
+ mysql_mutex_unlock(&LOCK_rpl_thread);
+ }
+}
+
+
+void
rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev)
{
mysql_mutex_assert_owner(&LOCK_rpl_thread);
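
loc_free_qev() above flushes the local free list on two triggers: a count threshold (QEV_BATCH_FREE events) and a size threshold (one third of opt_slave_parallel_max_queued bytes), so that the SQL driver thread, which stops queueing once a worker holds more than opt_slave_parallel_max_queued bytes, is woken before it can stall for long. The decision amounts to the following restatement (illustrative helper, not code from the patch):

    #include <cstddef>
    #include <cstdint>

    static const uint32_t QEV_BATCH_FREE_SKETCH= 200;  // mirrors QEV_BATCH_FREE

    // Restatement of the flush decision in loc_free_qev(); qev_free_pending is
    // assumed to already count the event being freed.
    static inline bool should_flush_local_free_list(uint32_t qev_free_pending,
                                                    size_t loc_qev_size,
                                                    size_t max_queued)
    {
      return qev_free_pending >= QEV_BATCH_FREE_SKETCH ||  // 200 events batched
             loc_qev_size >= max_queued / 3;               // or 1/3 of byte budget
    }

For example, if opt_slave_parallel_max_queued were 131072 bytes, the size trigger would fire once the local list pins roughly 43 KB worth of events, even if fewer than 200 events have been freed.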
@@ -1209,6 +1279,19 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
void
+rpl_parallel_thread::loc_free_rgi(rpl_group_info *rgi)
+{
+ DBUG_ASSERT(rgi->commit_orderer.waitee == NULL);
+ rgi->free_annotate_event();
+ if (!loc_rgi_list)
+ loc_rgi_last_ptr_ptr= &rgi->next;
+ else
+ rgi->next= loc_rgi_list;
+ loc_rgi_list= rgi;
+}
+
+
+void
rpl_parallel_thread::free_rgi(rpl_group_info *rgi)
{
mysql_mutex_assert_owner(&LOCK_rpl_thread);
@@ -1242,12 +1325,14 @@ rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev)
void
-rpl_parallel_thread::free_gco(group_commit_orderer *gco)
+rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
{
- mysql_mutex_assert_owner(&LOCK_rpl_thread);
DBUG_ASSERT(!gco->prev_gco /* Must not free until wait has completed. */);
- gco->next_gco= gco_free_list;
- gco_free_list= gco;
+ if (!loc_gco_list)
+ loc_gco_last_ptr_ptr= &gco->next_gco;
+ else
+ gco->next_gco= loc_gco_list;
+ loc_gco_list= gco;
}
@@ -1683,6 +1768,7 @@ rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi,
qev->ir= rli->last_inuse_relaylog;
++qev->ir->queued_count;
thr->enqueue(qev);
+ mysql_cond_signal(&thr->COND_rpl_thread);
mysql_mutex_unlock(&thr->LOCK_rpl_thread);
return 0;
}