diff options
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r-- | sql/rpl_parallel.cc | 276 |
1 files changed, 181 insertions, 95 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 9b91206ca75..2050ccdb3d7 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -8,6 +8,15 @@ Code for optional parallel execution of replicated events on the slave. */ + +/* + Maximum number of queued events to accumulate in a local free list, before + moving them to the global free list. There is additional a limit of how much + to accumulate based on opt_slave_parallel_max_queued. +*/ +#define QEV_BATCH_FREE 200 + + struct rpl_parallel_thread_pool global_rpl_thread_pool; static void signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi, @@ -290,6 +299,7 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt, THD *thd= rgi->thd; rpl_parallel_entry *entry= rgi->parallel_entry; ulong retries= 0; + Format_description_log_event *description_event= NULL; do_retry: event_count= 0; @@ -355,6 +365,14 @@ do_retry: goto err; } cur_offset= rgi->retry_start_offset; + delete description_event; + description_event= + read_relay_log_description_event(&rlog, cur_offset, &errmsg); + if (!description_event) + { + err= 1; + goto err; + } my_b_seek(&rlog, cur_offset); do @@ -367,8 +385,7 @@ do_retry: for (;;) { old_offset= cur_offset; - ev= Log_event::read_log_event(&rlog, 0, - rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */, + ev= Log_event::read_log_event(&rlog, 0, description_event, opt_slave_sql_verify_checksum); cur_offset= my_b_tell(&rlog); @@ -416,7 +433,12 @@ do_retry: } event_type= ev->get_type_code(); - if (!Log_event::is_group_event(event_type)) + if (event_type == FORMAT_DESCRIPTION_EVENT) + { + delete description_event; + description_event= (Format_description_log_event *)ev; + continue; + } else if (!Log_event::is_group_event(event_type)) { delete ev; continue; @@ -424,7 +446,7 @@ do_retry: ev->thd= thd; mysql_mutex_lock(&rpt->LOCK_rpl_thread); - qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset, + qev= rpt->retry_get_qev(ev, orig_qev, log_name, old_offset, cur_offset - old_offset); mysql_mutex_unlock(&rpt->LOCK_rpl_thread); if (!qev) @@ -472,6 +494,8 @@ do_retry: err: + if (description_event) + delete description_event; if (fd >= 0) { end_io_cache(&rlog); @@ -495,14 +519,8 @@ handle_rpl_parallel_thread(void *arg) rpl_group_info *group_rgi= NULL; group_commit_orderer *gco, *tmp_gco; uint64 event_gtid_sub_id= 0; - rpl_parallel_thread::queued_event *qevs_to_free; - rpl_group_info *rgis_to_free; - group_commit_orderer *gcos_to_free; rpl_sql_thread_info sql_info(NULL); - size_t total_event_size; int err; - inuse_relaylog *last_ir; - uint64 accumulated_ir_count; struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg; @@ -544,6 +562,8 @@ handle_rpl_parallel_thread(void *arg) while (!rpt->stop) { + rpl_parallel_thread::queued_event *qev, *next_qev; + thd->ENTER_COND(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread, &stage_waiting_for_work_from_sql_thread, &old_stage); /* @@ -565,28 +585,21 @@ handle_rpl_parallel_thread(void *arg) thd->EXIT_COND(&old_stage); more_events: - qevs_to_free= NULL; - rgis_to_free= NULL; - gcos_to_free= NULL; - total_event_size= 0; - while (events) + for (qev= events; qev; qev= next_qev) { - struct rpl_parallel_thread::queued_event *next= events->next; Log_event_type event_type; - rpl_group_info *rgi= events->rgi; + rpl_group_info *rgi= qev->rgi; rpl_parallel_entry *entry= rgi->parallel_entry; bool end_of_group, group_ending; - total_event_size+= events->event_size; - if (events->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE) + next_qev= qev->next; + if (qev->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE) { - handle_queued_pos_update(thd, events); - events->next= qevs_to_free; - qevs_to_free= events; - events= next; + handle_queued_pos_update(thd, qev); + rpt->loc_free_qev(qev); continue; } - else if (events->typ == + else if (qev->typ == rpl_parallel_thread::queued_event::QUEUED_MASTER_RESTART) { if (in_event_group) @@ -598,29 +611,34 @@ handle_rpl_parallel_thread(void *arg) group_rgi->cleanup_context(thd, 1); in_event_group= false; finish_event_group(thd, group_rgi->gtid_sub_id, - events->entry_for_queued, group_rgi); + qev->entry_for_queued, group_rgi); - group_rgi->next= rgis_to_free; - rgis_to_free= group_rgi; + rpt->loc_free_rgi(group_rgi); thd->rgi_slave= group_rgi= NULL; } - events->next= qevs_to_free; - qevs_to_free= events; - events= next; + rpt->loc_free_qev(qev); continue; } - DBUG_ASSERT(events->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT); + DBUG_ASSERT(qev->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT); thd->rgi_slave= group_rgi= rgi; gco= rgi->gco; /* Handle a new event group, which will be initiated by a GTID event. */ - if ((event_type= events->ev->get_type_code()) == GTID_EVENT) + if ((event_type= qev->ev->get_type_code()) == GTID_EVENT) { bool did_enter_cond= false; PSI_stage_info old_stage; uint64 wait_count; + DBUG_EXECUTE_IF("rpl_parallel_scheduled_gtid_0_x_100", { + if (rgi->current_gtid.domain_id == 0 && + rgi->current_gtid.seq_no == 100) { + debug_sync_set_action(thd, + STRING_WITH_LEN("now SIGNAL scheduled_gtid_0_x_100")); + } + }); + in_event_group= true; /* If the standalone flag is set, then this event group consists of a @@ -628,7 +646,7 @@ handle_rpl_parallel_thread(void *arg) similar), without any terminating COMMIT/ROLLBACK/XID. */ group_standalone= - (0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 & + (0 != (static_cast<Gtid_log_event *>(qev->ev)->flags2 & Gtid_log_event::FL_STANDALONE)); event_gtid_sub_id= rgi->gtid_sub_id; @@ -656,7 +674,7 @@ handle_rpl_parallel_thread(void *arg) DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior"); thd->ENTER_COND(&gco->COND_group_commit_orderer, &entry->LOCK_parallel_entry, - &stage_waiting_for_prior_transaction_to_commit, + &stage_waiting_for_prior_transaction_to_start_commit, &old_stage); did_enter_cond= true; do @@ -689,8 +707,7 @@ handle_rpl_parallel_thread(void *arg) */ DBUG_ASSERT(!tmp_gco->prev_gco); gco->prev_gco= NULL; - tmp_gco->next_gco= gcos_to_free; - gcos_to_free= tmp_gco; + rpt->loc_free_gco(tmp_gco); } if (entry->force_abort && wait_count > entry->stop_count) @@ -751,7 +768,7 @@ handle_rpl_parallel_thread(void *arg) } } - group_ending= is_group_ending(events->ev, event_type); + group_ending= is_group_ending(qev->ev, event_type); if (group_ending && likely(!rgi->worker_error)) { DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit"); @@ -767,20 +784,32 @@ handle_rpl_parallel_thread(void *arg) if (likely(!rgi->worker_error) && !skip_event_group) { ++rgi->retry_event_count; - err= rpt_handle_event(events, rpt); - delete_or_keep_event_post_apply(rgi, event_type, events->ev); +#ifndef DBUG_OFF + err= 0; + DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_xid", + if (event_type == XID_EVENT) + { + thd->clear_error(); + thd->get_stmt_da()->reset_diagnostics_area(); + my_error(ER_LOCK_DEADLOCK, MYF(0)); + err= 1; + }); + if (!err) +#endif + err= rpt_handle_event(qev, rpt); + delete_or_keep_event_post_apply(rgi, event_type, qev->ev); DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_x_100", err= dbug_simulate_tmp_error(rgi, thd);); if (err) { convert_kill_to_deadlock_error(rgi); if (has_temporary_error(thd) && slave_trans_retries > 0) - err= retry_event_group(rgi, rpt, events); + err= retry_event_group(rgi, rpt, qev); } } else { - delete events->ev; + delete qev->ev; err= thd->wait_for_prior_commit(); } @@ -789,8 +818,7 @@ handle_rpl_parallel_thread(void *arg) ((group_standalone && !Log_event::is_part_of_group(event_type)) || group_ending); - events->next= qevs_to_free; - qevs_to_free= events; + rpt->loc_free_qev(qev); if (unlikely(err)) { @@ -805,61 +833,20 @@ handle_rpl_parallel_thread(void *arg) { in_event_group= false; finish_event_group(thd, event_gtid_sub_id, entry, rgi); - rgi->next= rgis_to_free; - rgis_to_free= rgi; + rpt->loc_free_rgi(rgi); thd->rgi_slave= group_rgi= rgi= NULL; skip_event_group= false; DEBUG_SYNC(thd, "rpl_parallel_end_of_group"); } - - events= next; } mysql_mutex_lock(&rpt->LOCK_rpl_thread); - /* Signal that our queue can now accept more events. */ - rpt->dequeue2(total_event_size); - mysql_cond_signal(&rpt->COND_rpl_thread_queue); - /* We need to delay the free here, to when we have the lock. */ - while (gcos_to_free) - { - group_commit_orderer *next= gcos_to_free->next_gco; - rpt->free_gco(gcos_to_free); - gcos_to_free= next; - } - while (rgis_to_free) - { - rpl_group_info *next= rgis_to_free->next; - rpt->free_rgi(rgis_to_free); - rgis_to_free= next; - } - last_ir= NULL; - accumulated_ir_count= 0; - while (qevs_to_free) - { - rpl_parallel_thread::queued_event *next= qevs_to_free->next; - inuse_relaylog *ir= qevs_to_free->ir; - /* Batch up refcount update to reduce use of synchronised operations. */ - if (last_ir != ir) - { - if (last_ir) - { - my_atomic_rwlock_wrlock(&last_ir->inuse_relaylog_atomic_lock); - my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count); - my_atomic_rwlock_wrunlock(&last_ir->inuse_relaylog_atomic_lock); - accumulated_ir_count= 0; - } - last_ir= ir; - } - ++accumulated_ir_count; - rpt->free_qev(qevs_to_free); - qevs_to_free= next; - } - if (last_ir) - { - my_atomic_rwlock_wrlock(&last_ir->inuse_relaylog_atomic_lock); - my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count); - my_atomic_rwlock_wrunlock(&last_ir->inuse_relaylog_atomic_lock); - } + /* + Now that we have the lock, we can move everything from our local free + lists to the real free lists that are also accessible from the SQL + driver thread. + */ + rpt->batch_free(); if ((events= rpt->event_queue) != NULL) { @@ -872,6 +859,7 @@ handle_rpl_parallel_thread(void *arg) mysql_mutex_unlock(&rpt->LOCK_rpl_thread); goto more_events; } + rpt->inuse_relaylog_refcount_update(); if (in_event_group && group_rgi->parallel_entry->force_abort) { @@ -1107,6 +1095,51 @@ err: } +void +rpl_parallel_thread::batch_free() +{ + mysql_mutex_assert_owner(&LOCK_rpl_thread); + if (loc_qev_list) + { + *loc_qev_last_ptr_ptr= qev_free_list; + qev_free_list= loc_qev_list; + loc_qev_list= NULL; + dequeue2(loc_qev_size); + /* Signal that our queue can now accept more events. */ + mysql_cond_signal(&COND_rpl_thread_queue); + loc_qev_size= 0; + qev_free_pending= 0; + } + if (loc_rgi_list) + { + *loc_rgi_last_ptr_ptr= rgi_free_list; + rgi_free_list= loc_rgi_list; + loc_rgi_list= NULL; + } + if (loc_gco_list) + { + *loc_gco_last_ptr_ptr= gco_free_list; + gco_free_list= loc_gco_list; + loc_gco_list= NULL; + } +} + + +void +rpl_parallel_thread::inuse_relaylog_refcount_update() +{ + inuse_relaylog *ir= accumulated_ir_last; + if (ir) + { + my_atomic_rwlock_wrlock(&ir->rli->inuse_relaylog_atomic_lock); + my_atomic_add64(&ir->dequeued_count, accumulated_ir_count); + my_atomic_rwlock_wrunlock(&ir->rli->inuse_relaylog_atomic_lock); + accumulated_ir_count= 0; + accumulated_ir_last= NULL; + } +} + + rpl_parallel_thread::queued_event * rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size) { @@ -1161,6 +1194,43 @@ rpl_parallel_thread::retry_get_qev(Log_event *ev, queued_event *orig_qev, void +rpl_parallel_thread::loc_free_qev(rpl_parallel_thread::queued_event *qev) +{ + inuse_relaylog *ir= qev->ir; + inuse_relaylog *last_ir= accumulated_ir_last; + if (ir != last_ir) + { + if (last_ir) + inuse_relaylog_refcount_update(); + accumulated_ir_last= ir; + } + ++accumulated_ir_count; + if (!loc_qev_list) + loc_qev_last_ptr_ptr= &qev->next; + else + qev->next= loc_qev_list; + loc_qev_list= qev; + loc_qev_size+= qev->event_size; + /* + We want to release to the global free list only occasionally, to avoid + having to take the LOCK_rpl_thread muted too many times. + + However, we do need to release regularly. If we let the unreleased part + grow too large, then the SQL driver thread may go to sleep waiting for + the queue to drop below opt_slave_parallel_max_queued, and this in turn + can stall all other worker threads for more stuff to do. + */ + if (++qev_free_pending >= QEV_BATCH_FREE || + loc_qev_size >= opt_slave_parallel_max_queued/3) + { + mysql_mutex_lock(&LOCK_rpl_thread); + batch_free(); + mysql_mutex_unlock(&LOCK_rpl_thread); + } +} + + +void rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev) { mysql_mutex_assert_owner(&LOCK_rpl_thread); @@ -1209,6 +1279,19 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev, void +rpl_parallel_thread::loc_free_rgi(rpl_group_info *rgi) +{ + DBUG_ASSERT(rgi->commit_orderer.waitee == NULL); + rgi->free_annotate_event(); + if (!loc_rgi_list) + loc_rgi_last_ptr_ptr= &rgi->next; + else + rgi->next= loc_rgi_list; + loc_rgi_list= rgi; +} + + +void rpl_parallel_thread::free_rgi(rpl_group_info *rgi) { mysql_mutex_assert_owner(&LOCK_rpl_thread); @@ -1242,12 +1325,14 @@ rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev) void -rpl_parallel_thread::free_gco(group_commit_orderer *gco) +rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco) { - mysql_mutex_assert_owner(&LOCK_rpl_thread); DBUG_ASSERT(!gco->prev_gco /* Must not free until wait has completed. */); - gco->next_gco= gco_free_list; - gco_free_list= gco; + if (!loc_gco_list) + loc_gco_last_ptr_ptr= &gco->next_gco; + else + gco->next_gco= loc_gco_list; + loc_gco_list= gco; } @@ -1683,6 +1768,7 @@ rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi, qev->ir= rli->last_inuse_relaylog; ++qev->ir->queued_count; thr->enqueue(qev); + mysql_cond_signal(&thr->COND_rpl_thread); mysql_mutex_unlock(&thr->LOCK_rpl_thread); return 0; } |