summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2014-05-15 15:52:08 +0200
committerKristian Nielsen <knielsen@knielsen-hq.org>2014-05-15 15:52:08 +0200
commit787c470cef54574e744eb5dfd9153d837fe67e45 (patch)
tree1168c4a1f2bd4371f56e7b231da7f0f18dcdb1f9 /sql
parentd60915692cd02cc70b7eb8245c9ac6eab5df3d0c (diff)
downloadmariadb-git-787c470cef54574e744eb5dfd9153d837fe67e45.tar.gz
MDEV-5262: Missing retry after temp error in parallel replication
Handle retry of event groups that span multiple relay log files. - If retry reaches the end of one relay log file, move on to the next. - Handle refcounting of relay log files, and avoid purging relay log files until all event groups have completed that might have needed them for transaction retry.
Diffstat (limited to 'sql')
-rw-r--r--sql/log.cc26
-rw-r--r--sql/rpl_parallel.cc154
-rw-r--r--sql/rpl_parallel.h2
-rw-r--r--sql/rpl_rli.cc5
-rw-r--r--sql/rpl_rli.h21
-rw-r--r--sql/slave.cc1
6 files changed, 162 insertions, 47 deletions
diff --git a/sql/log.cc b/sql/log.cc
index 4f9a1f2b746..116ac6aed52 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -4097,6 +4097,7 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
{
int error;
char *to_purge_if_included= NULL;
+ inuse_relaylog *ir;
DBUG_ENTER("purge_first_log");
DBUG_ASSERT(is_open());
@@ -4104,7 +4105,30 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->event_relay_log_name));
mysql_mutex_lock(&LOCK_index);
- to_purge_if_included= my_strdup(rli->group_relay_log_name, MYF(0));
+
+ ir= rli->inuse_relaylog_list;
+ while (ir)
+ {
+ inuse_relaylog *next= ir->next;
+ if (!ir->completed || ir->dequeued_count < ir->queued_count)
+ {
+ included= false;
+ break;
+ }
+ if (!included && 0 == strcmp(ir->name, rli->group_relay_log_name))
+ break;
+ if (!next)
+ {
+ rli->last_inuse_relaylog= NULL;
+ included= 1;
+ to_purge_if_included= my_strdup(ir->name, MYF(0));
+ }
+ my_free(ir);
+ ir= next;
+ }
+ rli->inuse_relaylog_list= ir;
+ if (ir)
+ to_purge_if_included= my_strdup(ir->name, MYF(0));
/*
Read the next log file name from the index file and pass it back to
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 0b35e3c9fdc..67d61b7cf11 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -204,20 +204,14 @@ dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
}
#endif
-static int
-retry_handle_relay_log_rotate(Log_event *ev, IO_CACHE *rlog)
-{
- /* ToDo */
- return 0;
-}
-
static int
retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
rpl_parallel_thread::queued_event *orig_qev)
{
IO_CACHE rlog;
- File fd;
+ LOG_INFO linfo;
+ File fd= (File)-1;
const char *errmsg= NULL;
inuse_relaylog *ir= rgi->relay_log;
uint64 event_count;
@@ -241,7 +235,10 @@ do_retry:
strcpy(log_name, ir->name);
if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
- return 1;
+ {
+ err= 1;
+ goto err;
+ }
cur_offset= rgi->retry_start_offset;
my_b_seek(&rlog, cur_offset);
@@ -249,43 +246,85 @@ do_retry:
{
Log_event_type event_type;
Log_event *ev;
+ rpl_parallel_thread::queued_event *qev;
- old_offset= cur_offset;
- ev= Log_event::read_log_event(&rlog, 0,
- rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */,
- opt_slave_sql_verify_checksum);
- cur_offset= my_b_tell(&rlog);
-
- if (!ev)
- {
- err= 1;
- goto err;
- }
- ev->thd= thd;
- event_type= ev->get_type_code();
- if (Log_event::is_group_event(event_type))
+ /* The loop is here so we can try again the next relay log file on EOF. */
+ for (;;)
{
- rpl_parallel_thread::queued_event *qev;
+ old_offset= cur_offset;
+ ev= Log_event::read_log_event(&rlog, 0,
+ rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */,
+ opt_slave_sql_verify_checksum);
+ cur_offset= my_b_tell(&rlog);
- mysql_mutex_lock(&rpt->LOCK_rpl_thread);
- qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset,
- cur_offset - old_offset);
- mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
- if (!qev)
+ if (ev)
+ break;
+ if (rlog.error < 0)
{
- delete ev;
- my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ errmsg= "slave SQL thread aborted because of I/O error";
err= 1;
goto err;
}
- err= rpt_handle_event(qev, rpt);
- ++event_count;
- mysql_mutex_lock(&rpt->LOCK_rpl_thread);
- rpt->free_qev(qev);
- mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ if (rlog.error > 0)
+ {
+ sql_print_error("Slave SQL thread: I/O error reading "
+ "event(errno: %d cur_log->error: %d)",
+ my_errno, rlog.error);
+ errmsg= "Aborting slave SQL thread because of partial event read";
+ err= 1;
+ goto err;
+ }
+ /* EOF. Move to the next relay log. */
+ end_io_cache(&rlog);
+ mysql_file_close(fd, MYF(MY_WME));
+ fd= (File)-1;
+
+ /* Find the next relay log file. */
+ if((err= rli->relay_log.find_log_pos(&linfo, log_name, 1)) ||
+ (err= rli->relay_log.find_next_log(&linfo, 1)))
+ {
+ char buff[22];
+ sql_print_error("next log error: %d offset: %s log: %s",
+ err,
+ llstr(linfo.index_file_offset, buff),
+ log_name);
+ goto err;
+ }
+ strmake_buf(log_name ,linfo.log_file_name);
+
+ if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
+ {
+ err= 1;
+ goto err;
+ }
+ /* Loop to try again on the new log file. */
}
- else
- err= retry_handle_relay_log_rotate(ev, &rlog);
+
+ event_type= ev->get_type_code();
+ if (!Log_event::is_group_event(event_type))
+ {
+ delete ev;
+ continue;
+ }
+ ev->thd= thd;
+
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset,
+ cur_offset - old_offset);
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ if (!qev)
+ {
+ delete ev;
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ err= 1;
+ goto err;
+ }
+ err= rpt_handle_event(qev, rpt);
+ ++event_count;
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ rpt->free_qev(qev);
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+
delete_or_keep_event_post_apply(rgi, event_type, ev);
DBUG_EXECUTE_IF("rpl_parallel_simulate_double_temp_err_gtid_0_x_100",
if (retries == 0) err= dbug_simulate_tmp_error(rgi, thd););
@@ -300,6 +339,7 @@ do_retry:
{
end_io_cache(&rlog);
mysql_file_close(fd, MYF(MY_WME));
+ fd= (File)-1;
goto do_retry;
}
sql_print_error("Slave worker thread retried transaction %lu time(s) "
@@ -309,15 +349,17 @@ do_retry:
}
goto err;
}
-
- // ToDo: handle too many retries.
-
} while (event_count < events_to_execute);
err:
- end_io_cache(&rlog);
- mysql_file_close(fd, MYF(MY_WME));
+ if (fd >= 0)
+ {
+ end_io_cache(&rlog);
+ mysql_file_close(fd, MYF(MY_WME));
+ }
+ if (errmsg)
+ sql_print_error("Error reading relay log event: %s", errmsg);
return err;
}
@@ -340,6 +382,8 @@ handle_rpl_parallel_thread(void *arg)
rpl_sql_thread_info sql_info(NULL);
size_t total_event_size;
int err;
+ inuse_relaylog *last_ir;
+ uint64 accumulated_ir_count;
struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg;
@@ -683,12 +727,34 @@ handle_rpl_parallel_thread(void *arg)
rpt->free_rgi(rgis_to_free);
rgis_to_free= next;
}
+ last_ir= NULL;
+ accumulated_ir_count= 0;
while (qevs_to_free)
{
rpl_parallel_thread::queued_event *next= qevs_to_free->next;
+ inuse_relaylog *ir= qevs_to_free->ir;
+ /* Batch up refcount update to reduce use of synchronised operations. */
+ if (last_ir != ir)
+ {
+ if (last_ir)
+ {
+ my_atomic_rwlock_wrlock(&rli->inuse_relaylog_atomic_lock);
+ my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
+ my_atomic_rwlock_wrunlock(&rli->inuse_relaylog_atomic_lock);
+ accumulated_ir_count= 0;
+ }
+ last_ir= ir;
+ }
+ ++accumulated_ir_count;
rpt->free_qev(qevs_to_free);
qevs_to_free= next;
}
+ if (last_ir)
+ {
+ my_atomic_rwlock_wrlock(&rli->inuse_relaylog_atomic_lock);
+ my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
+ my_atomic_rwlock_wrunlock(&rli->inuse_relaylog_atomic_lock);
+ }
if ((events= rpt->event_queue) != NULL)
{
@@ -1711,6 +1777,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
Queue the event for processing.
*/
rli->event_relay_log_pos= rli->future_event_relay_log_pos;
+ qev->ir= rli->last_inuse_relaylog;
+ ++qev->ir->queued_count;
cur_thread->enqueue(qev);
unlock_or_exit_cond(rli->sql_driver_thd, &cur_thread->LOCK_rpl_thread,
&did_enter_cond, &old_stage);
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index 3b6641523f6..3934fd98648 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -9,6 +9,7 @@ struct rpl_parallel_entry;
struct rpl_parallel_thread_pool;
class Relay_log_info;
+struct inuse_relaylog;
/*
@@ -73,6 +74,7 @@ struct rpl_parallel_thread {
queued_event *next;
Log_event *ev;
rpl_group_info *rgi;
+ inuse_relaylog *ir;
ulonglong future_event_relay_log_pos;
char event_relay_log_name[FN_REFLEN];
char future_event_master_log_name[FN_REFLEN];
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 3a3e22f970a..688068b850f 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -92,6 +92,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL);
mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL);
mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL);
+ my_atomic_rwlock_init(&inuse_relaylog_atomic_lock);
relay_log.init_pthread_objects();
DBUG_VOID_RETURN;
}
@@ -117,6 +118,7 @@ Relay_log_info::~Relay_log_info()
mysql_cond_destroy(&start_cond);
mysql_cond_destroy(&stop_cond);
mysql_cond_destroy(&log_space_cond);
+ my_atomic_rwlock_destroy(&inuse_relaylog_atomic_lock);
relay_log.cleanup();
DBUG_VOID_RETURN;
}
@@ -1365,7 +1367,10 @@ Relay_log_info::alloc_inuse_relaylog(const char *name)
if (!inuse_relaylog_list)
inuse_relaylog_list= ir;
else
+ {
+ last_inuse_relaylog->completed= true;
last_inuse_relaylog->next= ir;
+ }
last_inuse_relaylog= ir;
return 0;
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index c2cdbcdc573..932db0a0b7d 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -170,6 +170,7 @@ public:
*/
inuse_relaylog *inuse_relaylog_list;
inuse_relaylog *last_inuse_relaylog;
+ my_atomic_rwlock_t inuse_relaylog_atomic_lock;
/*
Needed to deal properly with cur_log getting closed and re-opened with
@@ -481,12 +482,26 @@ private:
Each rpl_group_info has a pointer to one of those, corresponding to the
first GTID event.
- A reference count keeps track of how long a relay log is potentially in use.
+ A pair of reference count keeps track of how long a relay log is potentially
+ in use. When the `completed' flag is set, all events have been read out of
+ the relay log, but the log might still be needed for retry in worker
+ threads. As worker threads complete an event group, they increment
+ atomically the `dequeued_count' with number of events queued. Thus, when
+ completed is set and dequeued_count equals queued_count, the relay log file
+ is finally done with and can be purged.
+
+ By separating the queued and dequeued count, only the dequeued_count needs
+ multi-thread synchronisation; the completed flag and queued_count fields
+ are only accessed by the SQL driver thread and need no synchronisation.
*/
struct inuse_relaylog {
inuse_relaylog *next;
- uint64 queued_count;
- uint64 dequeued_count;
+ /* Number of events in this relay log queued for worker threads. */
+ int64 queued_count;
+ /* Number of events completed by worker threads. */
+ volatile int64 dequeued_count;
+ /* Set when all events have been read from a relaylog. */
+ bool completed;
char name[FN_REFLEN];
};
diff --git a/sql/slave.cc b/sql/slave.cc
index ab505a4011f..59375297448 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -6397,6 +6397,7 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
DBUG_ASSERT(rli->cur_log_fd >= 0);
mysql_file_close(rli->cur_log_fd, MYF(MY_WME));
rli->cur_log_fd = -1;
+ rli->last_inuse_relaylog->completed= true;
if (relay_log_purge)
{