diff options
author | unknown <knielsen@knielsen-hq.org> | 2014-03-04 14:32:42 +0100 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2014-03-04 14:32:42 +0100 |
commit | 5ec49e6452d4c76650c8475e387283fdbe480672 (patch) | |
tree | b2ee21e1c2b89e9050ea65ac70fc3792f9df7a69 /sql | |
parent | 016bd4fc5fff311dc4091b3b7329cd980dbaa14b (diff) | |
parent | b5b82108497b5beda3b2fbe98ecea178b5e58076 (diff) | |
download | mariadb-git-5ec49e6452d4c76650c8475e387283fdbe480672.tar.gz |
Merge MDEV-5754, MDEV-5769, and MDEV-5764 into 10.0
Diffstat (limited to 'sql')
-rw-r--r-- | sql/log_event.cc | 37 | ||||
-rw-r--r-- | sql/log_event.h | 9 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 115 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 6 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 3 | ||||
-rw-r--r-- | sql/rpl_rli.h | 1 | ||||
-rw-r--r-- | sql/slave.cc | 33 |
7 files changed, 140 insertions, 64 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index 2b55db5dc78..1e69d5bf1cc 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -3736,9 +3736,14 @@ Query_log_event::begin_event(String *packet, ulong ev_offset, DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || checksum_alg == BINLOG_CHECKSUM_ALG_OFF); - /* Currently we only need to replace GTID event. */ - DBUG_ASSERT(data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN); - if (data_len != LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN) + /* + Currently we only need to replace GTID event. + The length of GTID differs depending on whether it contains commit id. + */ + DBUG_ASSERT(data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN || + data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN + 2); + if (data_len != LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN && + data_len != LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN + 2) return 1; flags= uint2korr(p + FLAGS_OFFSET); @@ -3751,9 +3756,22 @@ Query_log_event::begin_event(String *packet, ulong ev_offset, int4store(q + Q_EXEC_TIME_OFFSET, 0); q[Q_DB_LEN_OFFSET]= 0; int2store(q + Q_ERR_CODE_OFFSET, 0); - int2store(q + Q_STATUS_VARS_LEN_OFFSET, 0); - q[Q_DATA_OFFSET]= 0; /* Zero terminator for empty db */ - q+= Q_DATA_OFFSET + 1; + if (data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN) + { + int2store(q + Q_STATUS_VARS_LEN_OFFSET, 0); + q[Q_DATA_OFFSET]= 0; /* Zero terminator for empty db */ + q+= Q_DATA_OFFSET + 1; + } + else + { + DBUG_ASSERT(data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN + 2); + /* Put in an empty time_zone_str to take up the extra 2 bytes. */ + int2store(q + Q_STATUS_VARS_LEN_OFFSET, 2); + q[Q_DATA_OFFSET]= Q_TIME_ZONE_CODE; + q[Q_DATA_OFFSET+1]= 0; /* Zero length for empty time_zone_str */ + q[Q_DATA_OFFSET+2]= 0; /* Zero terminator for empty db */ + q+= Q_DATA_OFFSET + 3; + } memcpy(q, "BEGIN", 5); if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32) @@ -6779,7 +6797,7 @@ Gtid_list_log_event::write(IO_CACHE *file) int Gtid_list_log_event::do_apply_event(rpl_group_info *rgi) { - Relay_log_info const *rli= rgi->rli; + Relay_log_info *rli= const_cast<Relay_log_info*>(rgi->rli); int ret; if (gl_flags & FLAG_IGN_GTIDS) { @@ -6799,10 +6817,11 @@ Gtid_list_log_event::do_apply_event(rpl_group_info *rgi) { char str_buf[128]; String str(str_buf, sizeof(str_buf), system_charset_info); - const_cast<Relay_log_info*>(rli)->until_gtid_pos.to_string(&str); + rli->until_gtid_pos.to_string(&str); sql_print_information("Slave SQL thread stops because it reached its" " UNTIL master_gtid_pos %s", str.c_ptr_safe()); - const_cast<Relay_log_info*>(rli)->abort_slave= true; + rli->abort_slave= true; + rli->stop_for_until= true; } return ret; } diff --git a/sql/log_event.h b/sql/log_event.h index cfdc65a2c40..a9d1b08171f 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -3123,12 +3123,15 @@ public: <td>flags</td> <td>1 byte bitfield</td> <td>Bit 0 set indicates stand-alone event (no terminating COMMIT)</td> + <td>Bit 1 set indicates group commit, and that commit id exists</td> </tr> <tr> - <td>Reserved</td> - <td>6 bytes</td> - <td>Reserved bytes, set to 0. Maybe be used for future expansion.</td> + <td>Reserved (no group commit) / commit id (group commit) (see flags bit 1)</td> + <td>6 bytes / 8 bytes</td> + <td>Reserved bytes, set to 0. Maybe be used for future expansion (no + group commit). OR commit id, same for all GTIDs in the same group + commit (see flags bit 1).</td> </tr> </table> diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 5947fb70330..154a95c1028 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -173,6 +173,7 @@ signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi) rgi->is_error= true; rgi->cleanup_context(thd, true); rgi->rli->abort_slave= true; + rgi->rli->stop_for_until= false; mysql_mutex_lock(rgi->rli->relay_log.get_log_lock()); mysql_mutex_unlock(rgi->rli->relay_log.get_log_lock()); rgi->rli->relay_log.signal_update(); @@ -1122,7 +1123,7 @@ rpl_parallel::find(uint32 domain_id) void -rpl_parallel::wait_for_done(THD *thd) +rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli) { struct rpl_parallel_entry *e; rpl_parallel_thread *rpt; @@ -1152,9 +1153,13 @@ rpl_parallel::wait_for_done(THD *thd) started executing yet. So we set e->stop_count here and use it to decide in the worker threads whether to continue executing an event group or whether to skip it, when force_abort is set. + + If we stop due to reaching the START SLAVE UNTIL condition, then we + need to continue executing any queued events up to that point. */ e->force_abort= true; - e->stop_count= e->count_committing_event_groups; + e->stop_count= rli->stop_for_until ? + e->count_queued_event_groups : e->count_committing_event_groups; mysql_mutex_unlock(&e->LOCK_parallel_entry); for (j= 0; j < e->rpl_thread_max; ++j) { @@ -1190,6 +1195,30 @@ rpl_parallel::wait_for_done(THD *thd) } +/* + This function handles the case where the SQL driver thread reached the + START SLAVE UNTIL position; we stop queueing more events but continue + processing remaining, already queued events; then use executes manual + STOP SLAVE; then this function signals to worker threads that they + should stop the processing of any remaining queued events. +*/ +void +rpl_parallel::stop_during_until() +{ + struct rpl_parallel_entry *e; + uint32 i; + + for (i= 0; i < domain_hash.records; ++i) + { + e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); + mysql_mutex_lock(&e->LOCK_parallel_entry); + if (e->force_abort) + e->stop_count= e->count_committing_event_groups; + mysql_mutex_unlock(&e->LOCK_parallel_entry); + } +} + + bool rpl_parallel::workers_idle() { @@ -1230,11 +1259,12 @@ abandon_worker_thread(THD *thd, rpl_parallel_thread *cur_thread, do_event() is executed by the sql_driver_thd thread. It's main purpose is to find a thread that can execute the query. - @retval false ok, event was accepted - @retval true error + @retval 0 ok, event was accepted + @retval 1 error + @retval -1 event should be executed serially, in the sql driver thread */ -bool +int rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ulonglong event_size) { @@ -1248,6 +1278,32 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, bool did_enter_cond= false; PSI_stage_info old_stage; + /* Handle master log name change, seen in Rotate_log_event. */ + typ= ev->get_type_code(); + if (unlikely(typ == ROTATE_EVENT)) + { + Rotate_log_event *rev= static_cast<Rotate_log_event *>(ev); + if ((rev->server_id != global_system_variables.server_id || + rli->replicate_same_server_id) && + !rev->is_relay_log_event() && + !rli->is_in_group()) + { + memcpy(rli->future_event_master_log_name, + rev->new_log_ident, rev->ident_len+1); + } + } + + /* + Execute queries non-parallel if slave_skip_counter is set, as it's is + easier to skip queries in single threaded mode. + */ + if (rli->slave_skip_counter) + return -1; + + /* Execute pre-10.0 event, which have no GTID, in single-threaded mode. */ + if (unlikely(!current) && typ != GTID_EVENT) + return -1; + /* ToDo: what to do with this lock?!? */ mysql_mutex_unlock(&rli->data_lock); @@ -1259,21 +1315,20 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, been partially queued, but after that we will just ignore any further events the SQL driver thread may try to queue, and eventually it will stop. */ - if (((typ= ev->get_type_code()) == GTID_EVENT || - !(is_group_event= Log_event::is_group_event(typ))) && - rli->abort_slave) + is_group_event= Log_event::is_group_event(typ); + if ((typ == GTID_EVENT || !is_group_event) && rli->abort_slave) sql_thread_stopping= true; if (sql_thread_stopping) { delete ev; /* - Return false ("no error"); normal stop is not an error, and otherwise the - error has already been recorded. + Return "no error"; normal stop is not an error, and otherwise the error + has already been recorded. */ - return false; + return 0; } - if (typ == GTID_EVENT || unlikely(!current)) + if (typ == GTID_EVENT) { uint32 domain_id; if (likely(typ == GTID_EVENT)) @@ -1288,7 +1343,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, { my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); delete ev; - return true; + return 1; } current= e; } @@ -1307,7 +1362,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, { /* This means we were killed. The error is already signalled. */ delete ev; - return true; + return 1; } if (!(qev= cur_thread->get_qev(ev, event_size, rli))) @@ -1315,7 +1370,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, abandon_worker_thread(rli->sql_driver_thd, cur_thread, &did_enter_cond, &old_stage); delete ev; - return true; + return 1; } if (typ == GTID_EVENT) @@ -1328,7 +1383,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, abandon_worker_thread(rli->sql_driver_thd, cur_thread, &did_enter_cond, &old_stage); delete ev; - return true; + return 1; } /* @@ -1366,7 +1421,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, abandon_worker_thread(rli->sql_driver_thd, cur_thread, &did_enter_cond, &old_stage); delete ev; - return true; + return 1; } e->current_gco= rgi->gco= gco; } @@ -1380,7 +1435,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, e->current_sub_id= rgi->gtid_sub_id; ++e->count_queued_event_groups; } - else if (!is_group_event || !e) + else if (!is_group_event) { my_off_t log_pos; int err; @@ -1389,38 +1444,22 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread. Same for events not preceeded by GTID (we should not see those normally, but they might be from an old master). - - The variable `e' is NULL for the case where the master did not - have GTID, like a MariaDB 5.5 or MySQL master. */ qev->rgi= serial_rgi; - /* Handle master log name change, seen in Rotate_log_event. */ - if (typ == ROTATE_EVENT) - { - Rotate_log_event *rev= static_cast<Rotate_log_event *>(qev->ev); - if ((rev->server_id != global_system_variables.server_id || - rli->replicate_same_server_id) && - !rev->is_relay_log_event() && - !rli->is_in_group()) - { - memcpy(rli->future_event_master_log_name, - rev->new_log_ident, rev->ident_len+1); - } - } tmp= serial_rgi->is_parallel_exec; serial_rgi->is_parallel_exec= true; err= rpt_handle_event(qev, NULL); serial_rgi->is_parallel_exec= tmp; - log_pos= qev->ev->log_pos; - delete_or_keep_event_post_apply(serial_rgi, typ, qev->ev); + log_pos= ev->log_pos; + delete_or_keep_event_post_apply(serial_rgi, typ, ev); if (err) { cur_thread->free_qev(qev); abandon_worker_thread(rli->sql_driver_thd, cur_thread, &did_enter_cond, &old_stage); - return true; + return 1; } /* Queue an empty event, so that the position will be updated in a @@ -1451,5 +1490,5 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, &did_enter_cond, &old_stage); mysql_cond_signal(&cur_thread->COND_rpl_thread); - return false; + return 0; } diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 956a31e4b7f..c4bb407e5eb 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -222,10 +222,10 @@ struct rpl_parallel { ~rpl_parallel(); void reset(); rpl_parallel_entry *find(uint32 domain_id); - void wait_for_done(THD *thd); + void wait_for_done(THD *thd, Relay_log_info *rli); + void stop_during_until(); bool workers_idle(); - bool do_event(rpl_group_info *serial_rgi, Log_event *ev, - ulonglong event_size); + int do_event(rpl_group_info *serial_rgi, Log_event *ev, ulonglong event_size); }; diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 399852744f8..0fae3a3bb89 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -60,7 +60,8 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0), last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0), abort_pos_wait(0), slave_run_id(0), sql_driver_thd(), - inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE), + inited(0), abort_slave(0), stop_for_until(0), + slave_running(0), until_condition(UNTIL_NONE), until_log_pos(0), retried_trans(0), executed_entries(0), m_flags(0) { diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 0ba259b0efd..6db4ce5d61b 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -262,6 +262,7 @@ public: */ volatile bool inited; volatile bool abort_slave; + volatile bool stop_for_until; volatile uint slave_running; /* diff --git a/sql/slave.cc b/sql/slave.cc index 25480da79a1..8482924ef87 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -615,7 +615,14 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) if (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) { DBUG_PRINT("info",("Terminating SQL thread")); - mi->rli.abort_slave=1; + if (opt_slave_parallel_threads > 0 && + mi->rli.abort_slave && mi->rli.stop_for_until) + { + mi->rli.stop_for_until= false; + mi->rli.parallel.stop_during_until(); + } + else + mi->rli.abort_slave=1; if ((error=terminate_slave_thread(mi->rli.sql_driver_thd, sql_lock, &mi->rli.stop_cond, &mi->rli.slave_running, @@ -3427,6 +3434,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, message about error in query execution to be printed. */ rli->abort_slave= 1; + rli->stop_for_until= true; mysql_mutex_unlock(&rli->data_lock); delete ev; DBUG_RETURN(1); @@ -3454,13 +3462,17 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, update_state_of_relay_log(rli, ev); - /* - Execute queries in parallel, except if slave_skip_counter is set, - as it's is easier to skip queries in single threaded mode. - */ - - if (opt_slave_parallel_threads > 0 && rli->slave_skip_counter == 0) - DBUG_RETURN(rli->parallel.do_event(serial_rgi, ev, event_size)); + if (opt_slave_parallel_threads > 0) + { + int res= rli->parallel.do_event(serial_rgi, ev, event_size); + if (res >= 0) + DBUG_RETURN(res); + /* + Else we proceed to execute the event non-parallel. + This is the case for pre-10.0 events without GTID, and for handling + slave_skip_counter. + */ + } /* For GTID, allocate a new sub_id for the given domain_id. @@ -4371,6 +4383,7 @@ pthread_handler_t handle_slave_sql(void *arg) Seconds_Behind_Master grows. No big deal. */ rli->abort_slave = 0; + rli->stop_for_until= false; mysql_mutex_unlock(&rli->run_lock); mysql_cond_broadcast(&rli->start_cond); @@ -4542,7 +4555,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, } if (opt_slave_parallel_threads > 0) - rli->parallel.wait_for_done(thd); + rli->parallel.wait_for_done(thd, rli); /* Thread stopped. Print the current replication position to the log */ { @@ -4568,7 +4581,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, get the correct position printed.) */ if (opt_slave_parallel_threads > 0) - rli->parallel.wait_for_done(thd); + rli->parallel.wait_for_done(thd, rli); /* Some events set some playgrounds, which won't be cleared because thread |