summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2014-03-04 14:32:42 +0100
committerunknown <knielsen@knielsen-hq.org>2014-03-04 14:32:42 +0100
commit5ec49e6452d4c76650c8475e387283fdbe480672 (patch)
treeb2ee21e1c2b89e9050ea65ac70fc3792f9df7a69 /sql
parent016bd4fc5fff311dc4091b3b7329cd980dbaa14b (diff)
parentb5b82108497b5beda3b2fbe98ecea178b5e58076 (diff)
downloadmariadb-git-5ec49e6452d4c76650c8475e387283fdbe480672.tar.gz
Merge MDEV-5754, MDEV-5769, and MDEV-5764 into 10.0
Diffstat (limited to 'sql')
-rw-r--r--sql/log_event.cc37
-rw-r--r--sql/log_event.h9
-rw-r--r--sql/rpl_parallel.cc115
-rw-r--r--sql/rpl_parallel.h6
-rw-r--r--sql/rpl_rli.cc3
-rw-r--r--sql/rpl_rli.h1
-rw-r--r--sql/slave.cc33
7 files changed, 140 insertions, 64 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 2b55db5dc78..1e69d5bf1cc 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -3736,9 +3736,14 @@ Query_log_event::begin_event(String *packet, ulong ev_offset,
DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
checksum_alg == BINLOG_CHECKSUM_ALG_OFF);
- /* Currently we only need to replace GTID event. */
- DBUG_ASSERT(data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN);
- if (data_len != LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN)
+ /*
+ Currently we only need to replace GTID event.
+ The length of GTID differs depending on whether it contains commit id.
+ */
+ DBUG_ASSERT(data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN ||
+ data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN + 2);
+ if (data_len != LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN &&
+ data_len != LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN + 2)
return 1;
flags= uint2korr(p + FLAGS_OFFSET);
@@ -3751,9 +3756,22 @@ Query_log_event::begin_event(String *packet, ulong ev_offset,
int4store(q + Q_EXEC_TIME_OFFSET, 0);
q[Q_DB_LEN_OFFSET]= 0;
int2store(q + Q_ERR_CODE_OFFSET, 0);
- int2store(q + Q_STATUS_VARS_LEN_OFFSET, 0);
- q[Q_DATA_OFFSET]= 0; /* Zero terminator for empty db */
- q+= Q_DATA_OFFSET + 1;
+ if (data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN)
+ {
+ int2store(q + Q_STATUS_VARS_LEN_OFFSET, 0);
+ q[Q_DATA_OFFSET]= 0; /* Zero terminator for empty db */
+ q+= Q_DATA_OFFSET + 1;
+ }
+ else
+ {
+ DBUG_ASSERT(data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN + 2);
+ /* Put in an empty time_zone_str to take up the extra 2 bytes. */
+ int2store(q + Q_STATUS_VARS_LEN_OFFSET, 2);
+ q[Q_DATA_OFFSET]= Q_TIME_ZONE_CODE;
+ q[Q_DATA_OFFSET+1]= 0; /* Zero length for empty time_zone_str */
+ q[Q_DATA_OFFSET+2]= 0; /* Zero terminator for empty db */
+ q+= Q_DATA_OFFSET + 3;
+ }
memcpy(q, "BEGIN", 5);
if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
@@ -6779,7 +6797,7 @@ Gtid_list_log_event::write(IO_CACHE *file)
int
Gtid_list_log_event::do_apply_event(rpl_group_info *rgi)
{
- Relay_log_info const *rli= rgi->rli;
+ Relay_log_info *rli= const_cast<Relay_log_info*>(rgi->rli);
int ret;
if (gl_flags & FLAG_IGN_GTIDS)
{
@@ -6799,10 +6817,11 @@ Gtid_list_log_event::do_apply_event(rpl_group_info *rgi)
{
char str_buf[128];
String str(str_buf, sizeof(str_buf), system_charset_info);
- const_cast<Relay_log_info*>(rli)->until_gtid_pos.to_string(&str);
+ rli->until_gtid_pos.to_string(&str);
sql_print_information("Slave SQL thread stops because it reached its"
" UNTIL master_gtid_pos %s", str.c_ptr_safe());
- const_cast<Relay_log_info*>(rli)->abort_slave= true;
+ rli->abort_slave= true;
+ rli->stop_for_until= true;
}
return ret;
}
diff --git a/sql/log_event.h b/sql/log_event.h
index cfdc65a2c40..a9d1b08171f 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -3123,12 +3123,15 @@ public:
<td>flags</td>
<td>1 byte bitfield</td>
<td>Bit 0 set indicates stand-alone event (no terminating COMMIT)</td>
+ <td>Bit 1 set indicates group commit, and that commit id exists</td>
</tr>
<tr>
- <td>Reserved</td>
- <td>6 bytes</td>
- <td>Reserved bytes, set to 0. Maybe be used for future expansion.</td>
+ <td>Reserved (no group commit) / commit id (group commit) (see flags bit 1)</td>
+ <td>6 bytes / 8 bytes</td>
+ <td>Reserved bytes, set to 0. Maybe be used for future expansion (no
+ group commit). OR commit id, same for all GTIDs in the same group
+ commit (see flags bit 1).</td>
</tr>
</table>
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 5947fb70330..154a95c1028 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -173,6 +173,7 @@ signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi)
rgi->is_error= true;
rgi->cleanup_context(thd, true);
rgi->rli->abort_slave= true;
+ rgi->rli->stop_for_until= false;
mysql_mutex_lock(rgi->rli->relay_log.get_log_lock());
mysql_mutex_unlock(rgi->rli->relay_log.get_log_lock());
rgi->rli->relay_log.signal_update();
@@ -1122,7 +1123,7 @@ rpl_parallel::find(uint32 domain_id)
void
-rpl_parallel::wait_for_done(THD *thd)
+rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
{
struct rpl_parallel_entry *e;
rpl_parallel_thread *rpt;
@@ -1152,9 +1153,13 @@ rpl_parallel::wait_for_done(THD *thd)
started executing yet. So we set e->stop_count here and use it to
decide in the worker threads whether to continue executing an event
group or whether to skip it, when force_abort is set.
+
+ If we stop due to reaching the START SLAVE UNTIL condition, then we
+ need to continue executing any queued events up to that point.
*/
e->force_abort= true;
- e->stop_count= e->count_committing_event_groups;
+ e->stop_count= rli->stop_for_until ?
+ e->count_queued_event_groups : e->count_committing_event_groups;
mysql_mutex_unlock(&e->LOCK_parallel_entry);
for (j= 0; j < e->rpl_thread_max; ++j)
{
@@ -1190,6 +1195,30 @@ rpl_parallel::wait_for_done(THD *thd)
}
+/*
+ This function handles the case where the SQL driver thread reached the
+ START SLAVE UNTIL position; we stop queueing more events but continue
+ processing remaining, already queued events; then use executes manual
+ STOP SLAVE; then this function signals to worker threads that they
+ should stop the processing of any remaining queued events.
+*/
+void
+rpl_parallel::stop_during_until()
+{
+ struct rpl_parallel_entry *e;
+ uint32 i;
+
+ for (i= 0; i < domain_hash.records; ++i)
+ {
+ e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
+ mysql_mutex_lock(&e->LOCK_parallel_entry);
+ if (e->force_abort)
+ e->stop_count= e->count_committing_event_groups;
+ mysql_mutex_unlock(&e->LOCK_parallel_entry);
+ }
+}
+
+
bool
rpl_parallel::workers_idle()
{
@@ -1230,11 +1259,12 @@ abandon_worker_thread(THD *thd, rpl_parallel_thread *cur_thread,
do_event() is executed by the sql_driver_thd thread.
It's main purpose is to find a thread that can execute the query.
- @retval false ok, event was accepted
- @retval true error
+ @retval 0 ok, event was accepted
+ @retval 1 error
+ @retval -1 event should be executed serially, in the sql driver thread
*/
-bool
+int
rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
ulonglong event_size)
{
@@ -1248,6 +1278,32 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
bool did_enter_cond= false;
PSI_stage_info old_stage;
+ /* Handle master log name change, seen in Rotate_log_event. */
+ typ= ev->get_type_code();
+ if (unlikely(typ == ROTATE_EVENT))
+ {
+ Rotate_log_event *rev= static_cast<Rotate_log_event *>(ev);
+ if ((rev->server_id != global_system_variables.server_id ||
+ rli->replicate_same_server_id) &&
+ !rev->is_relay_log_event() &&
+ !rli->is_in_group())
+ {
+ memcpy(rli->future_event_master_log_name,
+ rev->new_log_ident, rev->ident_len+1);
+ }
+ }
+
+ /*
+ Execute queries non-parallel if slave_skip_counter is set, as it's is
+ easier to skip queries in single threaded mode.
+ */
+ if (rli->slave_skip_counter)
+ return -1;
+
+ /* Execute pre-10.0 event, which have no GTID, in single-threaded mode. */
+ if (unlikely(!current) && typ != GTID_EVENT)
+ return -1;
+
/* ToDo: what to do with this lock?!? */
mysql_mutex_unlock(&rli->data_lock);
@@ -1259,21 +1315,20 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
been partially queued, but after that we will just ignore any further
events the SQL driver thread may try to queue, and eventually it will stop.
*/
- if (((typ= ev->get_type_code()) == GTID_EVENT ||
- !(is_group_event= Log_event::is_group_event(typ))) &&
- rli->abort_slave)
+ is_group_event= Log_event::is_group_event(typ);
+ if ((typ == GTID_EVENT || !is_group_event) && rli->abort_slave)
sql_thread_stopping= true;
if (sql_thread_stopping)
{
delete ev;
/*
- Return false ("no error"); normal stop is not an error, and otherwise the
- error has already been recorded.
+ Return "no error"; normal stop is not an error, and otherwise the error
+ has already been recorded.
*/
- return false;
+ return 0;
}
- if (typ == GTID_EVENT || unlikely(!current))
+ if (typ == GTID_EVENT)
{
uint32 domain_id;
if (likely(typ == GTID_EVENT))
@@ -1288,7 +1343,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
{
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
delete ev;
- return true;
+ return 1;
}
current= e;
}
@@ -1307,7 +1362,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
{
/* This means we were killed. The error is already signalled. */
delete ev;
- return true;
+ return 1;
}
if (!(qev= cur_thread->get_qev(ev, event_size, rli)))
@@ -1315,7 +1370,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
abandon_worker_thread(rli->sql_driver_thd, cur_thread,
&did_enter_cond, &old_stage);
delete ev;
- return true;
+ return 1;
}
if (typ == GTID_EVENT)
@@ -1328,7 +1383,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
abandon_worker_thread(rli->sql_driver_thd, cur_thread,
&did_enter_cond, &old_stage);
delete ev;
- return true;
+ return 1;
}
/*
@@ -1366,7 +1421,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
abandon_worker_thread(rli->sql_driver_thd, cur_thread,
&did_enter_cond, &old_stage);
delete ev;
- return true;
+ return 1;
}
e->current_gco= rgi->gco= gco;
}
@@ -1380,7 +1435,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
e->current_sub_id= rgi->gtid_sub_id;
++e->count_queued_event_groups;
}
- else if (!is_group_event || !e)
+ else if (!is_group_event)
{
my_off_t log_pos;
int err;
@@ -1389,38 +1444,22 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread.
Same for events not preceeded by GTID (we should not see those normally,
but they might be from an old master).
-
- The variable `e' is NULL for the case where the master did not
- have GTID, like a MariaDB 5.5 or MySQL master.
*/
qev->rgi= serial_rgi;
- /* Handle master log name change, seen in Rotate_log_event. */
- if (typ == ROTATE_EVENT)
- {
- Rotate_log_event *rev= static_cast<Rotate_log_event *>(qev->ev);
- if ((rev->server_id != global_system_variables.server_id ||
- rli->replicate_same_server_id) &&
- !rev->is_relay_log_event() &&
- !rli->is_in_group())
- {
- memcpy(rli->future_event_master_log_name,
- rev->new_log_ident, rev->ident_len+1);
- }
- }
tmp= serial_rgi->is_parallel_exec;
serial_rgi->is_parallel_exec= true;
err= rpt_handle_event(qev, NULL);
serial_rgi->is_parallel_exec= tmp;
- log_pos= qev->ev->log_pos;
- delete_or_keep_event_post_apply(serial_rgi, typ, qev->ev);
+ log_pos= ev->log_pos;
+ delete_or_keep_event_post_apply(serial_rgi, typ, ev);
if (err)
{
cur_thread->free_qev(qev);
abandon_worker_thread(rli->sql_driver_thd, cur_thread,
&did_enter_cond, &old_stage);
- return true;
+ return 1;
}
/*
Queue an empty event, so that the position will be updated in a
@@ -1451,5 +1490,5 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
&did_enter_cond, &old_stage);
mysql_cond_signal(&cur_thread->COND_rpl_thread);
- return false;
+ return 0;
}
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index 956a31e4b7f..c4bb407e5eb 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -222,10 +222,10 @@ struct rpl_parallel {
~rpl_parallel();
void reset();
rpl_parallel_entry *find(uint32 domain_id);
- void wait_for_done(THD *thd);
+ void wait_for_done(THD *thd, Relay_log_info *rli);
+ void stop_during_until();
bool workers_idle();
- bool do_event(rpl_group_info *serial_rgi, Log_event *ev,
- ulonglong event_size);
+ int do_event(rpl_group_info *serial_rgi, Log_event *ev, ulonglong event_size);
};
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 399852744f8..0fae3a3bb89 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -60,7 +60,8 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0),
abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
- inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
+ inited(0), abort_slave(0), stop_for_until(0),
+ slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0),
m_flags(0)
{
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 0ba259b0efd..6db4ce5d61b 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -262,6 +262,7 @@ public:
*/
volatile bool inited;
volatile bool abort_slave;
+ volatile bool stop_for_until;
volatile uint slave_running;
/*
diff --git a/sql/slave.cc b/sql/slave.cc
index 25480da79a1..8482924ef87 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -615,7 +615,14 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
if (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL))
{
DBUG_PRINT("info",("Terminating SQL thread"));
- mi->rli.abort_slave=1;
+ if (opt_slave_parallel_threads > 0 &&
+ mi->rli.abort_slave && mi->rli.stop_for_until)
+ {
+ mi->rli.stop_for_until= false;
+ mi->rli.parallel.stop_during_until();
+ }
+ else
+ mi->rli.abort_slave=1;
if ((error=terminate_slave_thread(mi->rli.sql_driver_thd, sql_lock,
&mi->rli.stop_cond,
&mi->rli.slave_running,
@@ -3427,6 +3434,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
message about error in query execution to be printed.
*/
rli->abort_slave= 1;
+ rli->stop_for_until= true;
mysql_mutex_unlock(&rli->data_lock);
delete ev;
DBUG_RETURN(1);
@@ -3454,13 +3462,17 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
update_state_of_relay_log(rli, ev);
- /*
- Execute queries in parallel, except if slave_skip_counter is set,
- as it's is easier to skip queries in single threaded mode.
- */
-
- if (opt_slave_parallel_threads > 0 && rli->slave_skip_counter == 0)
- DBUG_RETURN(rli->parallel.do_event(serial_rgi, ev, event_size));
+ if (opt_slave_parallel_threads > 0)
+ {
+ int res= rli->parallel.do_event(serial_rgi, ev, event_size);
+ if (res >= 0)
+ DBUG_RETURN(res);
+ /*
+ Else we proceed to execute the event non-parallel.
+ This is the case for pre-10.0 events without GTID, and for handling
+ slave_skip_counter.
+ */
+ }
/*
For GTID, allocate a new sub_id for the given domain_id.
@@ -4371,6 +4383,7 @@ pthread_handler_t handle_slave_sql(void *arg)
Seconds_Behind_Master grows. No big deal.
*/
rli->abort_slave = 0;
+ rli->stop_for_until= false;
mysql_mutex_unlock(&rli->run_lock);
mysql_cond_broadcast(&rli->start_cond);
@@ -4542,7 +4555,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
}
if (opt_slave_parallel_threads > 0)
- rli->parallel.wait_for_done(thd);
+ rli->parallel.wait_for_done(thd, rli);
/* Thread stopped. Print the current replication position to the log */
{
@@ -4568,7 +4581,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
get the correct position printed.)
*/
if (opt_slave_parallel_threads > 0)
- rli->parallel.wait_for_done(thd);
+ rli->parallel.wait_for_done(thd, rli);
/*
Some events set some playgrounds, which won't be cleared because thread