diff options
author | unknown <knielsen@knielsen-hq.org> | 2014-05-15 15:52:08 +0200 |
---|---|---|
committer | Kristian Nielsen <knielsen@knielsen-hq.org> | 2014-05-15 15:52:08 +0200 |
commit | 787c470cef54574e744eb5dfd9153d837fe67e45 (patch) | |
tree | 1168c4a1f2bd4371f56e7b231da7f0f18dcdb1f9 /sql | |
parent | d60915692cd02cc70b7eb8245c9ac6eab5df3d0c (diff) | |
download | mariadb-git-787c470cef54574e744eb5dfd9153d837fe67e45.tar.gz |
MDEV-5262: Missing retry after temp error in parallel replication
Handle retry of event groups that span multiple relay log files.
- If retry reaches the end of one relay log file, move on to the next.
- Handle refcounting of relay log files, and avoid purging relay log
files until all event groups have completed that might have needed
them for transaction retry.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/log.cc | 26 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 154 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 2 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 5 | ||||
-rw-r--r-- | sql/rpl_rli.h | 21 | ||||
-rw-r--r-- | sql/slave.cc | 1 |
6 files changed, 162 insertions, 47 deletions
diff --git a/sql/log.cc b/sql/log.cc index 4f9a1f2b746..116ac6aed52 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -4097,6 +4097,7 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included) { int error; char *to_purge_if_included= NULL; + inuse_relaylog *ir; DBUG_ENTER("purge_first_log"); DBUG_ASSERT(is_open()); @@ -4104,7 +4105,30 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included) DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->event_relay_log_name)); mysql_mutex_lock(&LOCK_index); - to_purge_if_included= my_strdup(rli->group_relay_log_name, MYF(0)); + + ir= rli->inuse_relaylog_list; + while (ir) + { + inuse_relaylog *next= ir->next; + if (!ir->completed || ir->dequeued_count < ir->queued_count) + { + included= false; + break; + } + if (!included && 0 == strcmp(ir->name, rli->group_relay_log_name)) + break; + if (!next) + { + rli->last_inuse_relaylog= NULL; + included= 1; + to_purge_if_included= my_strdup(ir->name, MYF(0)); + } + my_free(ir); + ir= next; + } + rli->inuse_relaylog_list= ir; + if (ir) + to_purge_if_included= my_strdup(ir->name, MYF(0)); /* Read the next log file name from the index file and pass it back to diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 0b35e3c9fdc..67d61b7cf11 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -204,20 +204,14 @@ dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd) } #endif -static int -retry_handle_relay_log_rotate(Log_event *ev, IO_CACHE *rlog) -{ - /* ToDo */ - return 0; -} - static int retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt, rpl_parallel_thread::queued_event *orig_qev) { IO_CACHE rlog; - File fd; + LOG_INFO linfo; + File fd= (File)-1; const char *errmsg= NULL; inuse_relaylog *ir= rgi->relay_log; uint64 event_count; @@ -241,7 +235,10 @@ do_retry: strcpy(log_name, ir->name); if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0) - return 1; + { + err= 1; + goto err; + } cur_offset= rgi->retry_start_offset; my_b_seek(&rlog, cur_offset); @@ -249,43 +246,85 @@ do_retry: { Log_event_type event_type; Log_event *ev; + rpl_parallel_thread::queued_event *qev; - old_offset= cur_offset; - ev= Log_event::read_log_event(&rlog, 0, - rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */, - opt_slave_sql_verify_checksum); - cur_offset= my_b_tell(&rlog); - - if (!ev) - { - err= 1; - goto err; - } - ev->thd= thd; - event_type= ev->get_type_code(); - if (Log_event::is_group_event(event_type)) + /* The loop is here so we can try again the next relay log file on EOF. */ + for (;;) { - rpl_parallel_thread::queued_event *qev; + old_offset= cur_offset; + ev= Log_event::read_log_event(&rlog, 0, + rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */, + opt_slave_sql_verify_checksum); + cur_offset= my_b_tell(&rlog); - mysql_mutex_lock(&rpt->LOCK_rpl_thread); - qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset, - cur_offset - old_offset); - mysql_mutex_unlock(&rpt->LOCK_rpl_thread); - if (!qev) + if (ev) + break; + if (rlog.error < 0) { - delete ev; - my_error(ER_OUT_OF_RESOURCES, MYF(0)); + errmsg= "slave SQL thread aborted because of I/O error"; err= 1; goto err; } - err= rpt_handle_event(qev, rpt); - ++event_count; - mysql_mutex_lock(&rpt->LOCK_rpl_thread); - rpt->free_qev(qev); - mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + if (rlog.error > 0) + { + sql_print_error("Slave SQL thread: I/O error reading " + "event(errno: %d cur_log->error: %d)", + my_errno, rlog.error); + errmsg= "Aborting slave SQL thread because of partial event read"; + err= 1; + goto err; + } + /* EOF. Move to the next relay log. */ + end_io_cache(&rlog); + mysql_file_close(fd, MYF(MY_WME)); + fd= (File)-1; + + /* Find the next relay log file. */ + if((err= rli->relay_log.find_log_pos(&linfo, log_name, 1)) || + (err= rli->relay_log.find_next_log(&linfo, 1))) + { + char buff[22]; + sql_print_error("next log error: %d offset: %s log: %s", + err, + llstr(linfo.index_file_offset, buff), + log_name); + goto err; + } + strmake_buf(log_name ,linfo.log_file_name); + + if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0) + { + err= 1; + goto err; + } + /* Loop to try again on the new log file. */ } - else - err= retry_handle_relay_log_rotate(ev, &rlog); + + event_type= ev->get_type_code(); + if (!Log_event::is_group_event(event_type)) + { + delete ev; + continue; + } + ev->thd= thd; + + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset, + cur_offset - old_offset); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + if (!qev) + { + delete ev; + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + err= 1; + goto err; + } + err= rpt_handle_event(qev, rpt); + ++event_count; + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + rpt->free_qev(qev); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + delete_or_keep_event_post_apply(rgi, event_type, ev); DBUG_EXECUTE_IF("rpl_parallel_simulate_double_temp_err_gtid_0_x_100", if (retries == 0) err= dbug_simulate_tmp_error(rgi, thd);); @@ -300,6 +339,7 @@ do_retry: { end_io_cache(&rlog); mysql_file_close(fd, MYF(MY_WME)); + fd= (File)-1; goto do_retry; } sql_print_error("Slave worker thread retried transaction %lu time(s) " @@ -309,15 +349,17 @@ do_retry: } goto err; } - - // ToDo: handle too many retries. - } while (event_count < events_to_execute); err: - end_io_cache(&rlog); - mysql_file_close(fd, MYF(MY_WME)); + if (fd >= 0) + { + end_io_cache(&rlog); + mysql_file_close(fd, MYF(MY_WME)); + } + if (errmsg) + sql_print_error("Error reading relay log event: %s", errmsg); return err; } @@ -340,6 +382,8 @@ handle_rpl_parallel_thread(void *arg) rpl_sql_thread_info sql_info(NULL); size_t total_event_size; int err; + inuse_relaylog *last_ir; + uint64 accumulated_ir_count; struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg; @@ -683,12 +727,34 @@ handle_rpl_parallel_thread(void *arg) rpt->free_rgi(rgis_to_free); rgis_to_free= next; } + last_ir= NULL; + accumulated_ir_count= 0; while (qevs_to_free) { rpl_parallel_thread::queued_event *next= qevs_to_free->next; + inuse_relaylog *ir= qevs_to_free->ir; + /* Batch up refcount update to reduce use of synchronised operations. */ + if (last_ir != ir) + { + if (last_ir) + { + my_atomic_rwlock_wrlock(&rli->inuse_relaylog_atomic_lock); + my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count); + my_atomic_rwlock_wrunlock(&rli->inuse_relaylog_atomic_lock); + accumulated_ir_count= 0; + } + last_ir= ir; + } + ++accumulated_ir_count; rpt->free_qev(qevs_to_free); qevs_to_free= next; } + if (last_ir) + { + my_atomic_rwlock_wrlock(&rli->inuse_relaylog_atomic_lock); + my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count); + my_atomic_rwlock_wrunlock(&rli->inuse_relaylog_atomic_lock); + } if ((events= rpt->event_queue) != NULL) { @@ -1711,6 +1777,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, Queue the event for processing. */ rli->event_relay_log_pos= rli->future_event_relay_log_pos; + qev->ir= rli->last_inuse_relaylog; + ++qev->ir->queued_count; cur_thread->enqueue(qev); unlock_or_exit_cond(rli->sql_driver_thd, &cur_thread->LOCK_rpl_thread, &did_enter_cond, &old_stage); diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 3b6641523f6..3934fd98648 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -9,6 +9,7 @@ struct rpl_parallel_entry; struct rpl_parallel_thread_pool; class Relay_log_info; +struct inuse_relaylog; /* @@ -73,6 +74,7 @@ struct rpl_parallel_thread { queued_event *next; Log_event *ev; rpl_group_info *rgi; + inuse_relaylog *ir; ulonglong future_event_relay_log_pos; char event_relay_log_name[FN_REFLEN]; char future_event_master_log_name[FN_REFLEN]; diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 3a3e22f970a..688068b850f 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -92,6 +92,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL); mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL); mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL); + my_atomic_rwlock_init(&inuse_relaylog_atomic_lock); relay_log.init_pthread_objects(); DBUG_VOID_RETURN; } @@ -117,6 +118,7 @@ Relay_log_info::~Relay_log_info() mysql_cond_destroy(&start_cond); mysql_cond_destroy(&stop_cond); mysql_cond_destroy(&log_space_cond); + my_atomic_rwlock_destroy(&inuse_relaylog_atomic_lock); relay_log.cleanup(); DBUG_VOID_RETURN; } @@ -1365,7 +1367,10 @@ Relay_log_info::alloc_inuse_relaylog(const char *name) if (!inuse_relaylog_list) inuse_relaylog_list= ir; else + { + last_inuse_relaylog->completed= true; last_inuse_relaylog->next= ir; + } last_inuse_relaylog= ir; return 0; diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index c2cdbcdc573..932db0a0b7d 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -170,6 +170,7 @@ public: */ inuse_relaylog *inuse_relaylog_list; inuse_relaylog *last_inuse_relaylog; + my_atomic_rwlock_t inuse_relaylog_atomic_lock; /* Needed to deal properly with cur_log getting closed and re-opened with @@ -481,12 +482,26 @@ private: Each rpl_group_info has a pointer to one of those, corresponding to the first GTID event. - A reference count keeps track of how long a relay log is potentially in use. + A pair of reference count keeps track of how long a relay log is potentially + in use. When the `completed' flag is set, all events have been read out of + the relay log, but the log might still be needed for retry in worker + threads. As worker threads complete an event group, they increment + atomically the `dequeued_count' with number of events queued. Thus, when + completed is set and dequeued_count equals queued_count, the relay log file + is finally done with and can be purged. + + By separating the queued and dequeued count, only the dequeued_count needs + multi-thread synchronisation; the completed flag and queued_count fields + are only accessed by the SQL driver thread and need no synchronisation. */ struct inuse_relaylog { inuse_relaylog *next; - uint64 queued_count; - uint64 dequeued_count; + /* Number of events in this relay log queued for worker threads. */ + int64 queued_count; + /* Number of events completed by worker threads. */ + volatile int64 dequeued_count; + /* Set when all events have been read from a relaylog. */ + bool completed; char name[FN_REFLEN]; }; diff --git a/sql/slave.cc b/sql/slave.cc index ab505a4011f..59375297448 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -6397,6 +6397,7 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size) DBUG_ASSERT(rli->cur_log_fd >= 0); mysql_file_close(rli->cur_log_fd, MYF(MY_WME)); rli->cur_log_fd = -1; + rli->last_inuse_relaylog->completed= true; if (relay_log_purge) { |