diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-11-01 09:17:06 +0100 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-11-01 09:17:06 +0100 |
commit | cb86ce60b9bade5ae7712d8f3f74668208ee3fd2 (patch) | |
tree | daff81c02baa6c2581d6abe3d746b8f35ee44f32 /sql/rpl_parallel.cc | |
parent | f4d5d849fd3b526d38ca6eb083fd0b290eb0eda7 (diff) | |
parent | 39df665a3332bd9bfb2529419f534a49cfac388c (diff) | |
download | mariadb-git-cb86ce60b9bade5ae7712d8f3f74668208ee3fd2.tar.gz |
Merge MDEV-4506: Parallel replication into 10.0-base.
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r-- | sql/rpl_parallel.cc | 956 |
1 files changed, 956 insertions, 0 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc new file mode 100644 index 00000000000..8328dd24128 --- /dev/null +++ b/sql/rpl_parallel.cc @@ -0,0 +1,956 @@ +#include "my_global.h" +#include "rpl_parallel.h" +#include "slave.h" +#include "rpl_mi.h" + + +/* + Code for optional parallel execution of replicated events on the slave. + + ToDo list: + + - Retry of failed transactions is not yet implemented for the parallel case. + + - All the waits (eg. in struct wait_for_commit and in + rpl_parallel_thread_pool::get_thread()) need to be killable. And on kill, + everything needs to be correctly rolled back and stopped in all threads, + to ensure a consistent slave replication state. + + - Handle the case of a partial event group. This occurs when the master + crashes in the middle of writing the event group to the binlog. The + slave rolls back the transaction; parallel execution needs to be able + to deal with this wrt. commit_orderer and such. + See Format_description_log_event::do_apply_event(). +*/ + +struct rpl_parallel_thread_pool global_rpl_thread_pool; + + +static int +rpt_handle_event(rpl_parallel_thread::queued_event *qev, + struct rpl_parallel_thread *rpt) +{ + int err __attribute__((unused)); + rpl_group_info *rgi= qev->rgi; + Relay_log_info *rli= rgi->rli; + THD *thd= rgi->thd; + + thd->rgi_slave= rgi; + thd->rpl_filter = rli->mi->rpl_filter; + + /* ToDo: Access to thd, and what about rli, split out a parallel part? */ + mysql_mutex_lock(&rli->data_lock); + qev->ev->thd= thd; + strcpy(rgi->event_relay_log_name_buf, qev->event_relay_log_name); + rgi->event_relay_log_name= rgi->event_relay_log_name_buf; + rgi->event_relay_log_pos= qev->event_relay_log_pos; + rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos; + strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name); + err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt); + thd->rgi_slave= NULL; + + thread_safe_increment64(&rli->executed_entries, + &slave_executed_entries_lock); + /* ToDo: error handling. */ + return err; +} + + +static void +handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev) +{ + int cmp; + Relay_log_info *rli; + /* + Events that are not part of an event group, such as Format Description, + Stop, GTID List and such, are executed directly in the driver SQL thread, + to keep the relay log state up-to-date. But the associated position update + is done here, in sync with other normal events as they are queued to + worker threads. + */ + if ((thd->variables.option_bits & OPTION_BEGIN) && + opt_using_transactions) + return; + rli= qev->rgi->rli; + mysql_mutex_lock(&rli->data_lock); + cmp= strcmp(rli->group_relay_log_name, qev->event_relay_log_name); + if (cmp < 0) + { + rli->group_relay_log_pos= qev->future_event_relay_log_pos; + strmake_buf(rli->group_relay_log_name, qev->event_relay_log_name); + rli->notify_group_relay_log_name_update(); + } else if (cmp == 0 && + rli->group_relay_log_pos < qev->future_event_relay_log_pos) + rli->group_relay_log_pos= qev->future_event_relay_log_pos; + + cmp= strcmp(rli->group_master_log_name, qev->future_event_master_log_name); + if (cmp < 0) + { + strcpy(rli->group_master_log_name, qev->future_event_master_log_name); + rli->notify_group_master_log_name_update(); + rli->group_master_log_pos= qev->future_event_master_log_pos; + } + else if (cmp == 0 + && rli->group_master_log_pos < qev->future_event_master_log_pos) + rli->group_master_log_pos= qev->future_event_master_log_pos; + mysql_mutex_unlock(&rli->data_lock); + mysql_cond_broadcast(&rli->data_cond); +} + + +static bool +sql_worker_killed(THD *thd, rpl_group_info *rgi, bool in_event_group) +{ + if (!rgi->rli->abort_slave && !abort_loop) + return false; + + /* + Do not abort in the middle of an event group that cannot be rolled back. + */ + if ((thd->transaction.all.modified_non_trans_table || + (thd->variables.option_bits & OPTION_KEEP_LOG)) + && in_event_group) + return false; + /* ToDo: should we add some timeout like in sql_slave_killed? + if (rgi->last_event_start_time == 0) + rgi->last_event_start_time= my_time(0); + */ + + return true; +} + + +pthread_handler_t +handle_rpl_parallel_thread(void *arg) +{ + THD *thd; + const char* old_msg; + struct rpl_parallel_thread::queued_event *events; + bool group_standalone= true; + bool in_event_group= false; + uint64 event_gtid_sub_id= 0; + int err; + + struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg; + + my_thread_init(); + thd = new THD; + thd->thread_stack = (char*)&thd; + mysql_mutex_lock(&LOCK_thread_count); + thd->thread_id= thd->variables.pseudo_thread_id= thread_id++; + threads.append(thd); + mysql_mutex_unlock(&LOCK_thread_count); + set_current_thd(thd); + pthread_detach_this_thread(); + thd->init_for_queries(); + thd->variables.binlog_annotate_row_events= 0; + init_thr_lock(); + thd->store_globals(); + thd->system_thread= SYSTEM_THREAD_SLAVE_SQL; + thd->security_ctx->skip_grants(); + thd->variables.max_allowed_packet= slave_max_allowed_packet; + thd->slave_thread= 1; + thd->enable_slow_log= opt_log_slow_slave_statements; + thd->variables.log_slow_filter= global_system_variables.log_slow_filter; + set_slave_thread_options(thd); + thd->client_capabilities = CLIENT_LOCAL_FILES; + thd_proc_info(thd, "Waiting for work from main SQL threads"); + thd->set_time(); + thd->variables.lock_wait_timeout= LONG_TIMEOUT; + + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + rpt->thd= thd; + + while (rpt->delay_start) + mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread); + + rpt->running= true; + mysql_cond_signal(&rpt->COND_rpl_thread); + + while (!rpt->stop && !thd->killed) + { + rpl_parallel_thread *list; + + old_msg= thd->proc_info; + thd->enter_cond(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread, + "Waiting for work from SQL thread"); + while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed) + mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread); + rpt->dequeue(events); + thd->exit_cond(old_msg); + mysql_cond_signal(&rpt->COND_rpl_thread); + + more_events: + while (events) + { + struct rpl_parallel_thread::queued_event *next= events->next; + Log_event_type event_type; + rpl_group_info *rgi= events->rgi; + rpl_parallel_entry *entry= rgi->parallel_entry; + uint64 wait_for_sub_id; + uint64 wait_start_sub_id; + bool end_of_group; + + if (!events->ev) + { + handle_queued_pos_update(thd, events); + my_free(events); + events= next; + continue; + } + + err= 0; + /* Handle a new event group, which will be initiated by a GTID event. */ + if ((event_type= events->ev->get_type_code()) == GTID_EVENT) + { + in_event_group= true; + /* + If the standalone flag is set, then this event group consists of a + single statement (possibly preceeded by some Intvar_log_event and + similar), without any terminating COMMIT/ROLLBACK/XID. + */ + group_standalone= + (0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 & + Gtid_log_event::FL_STANDALONE)); + + /* Save this, as it gets cleared when the event group commits. */ + event_gtid_sub_id= rgi->gtid_sub_id; + + rgi->thd= thd; + + /* + Register ourself to wait for the previous commit, if we need to do + such registration _and_ that previous commit has not already + occured. + + Also do not start parallel execution of this event group until all + prior groups have committed that are not safe to run in parallel with. + */ + wait_for_sub_id= rgi->wait_commit_sub_id; + wait_start_sub_id= rgi->wait_start_sub_id; + if (wait_for_sub_id || wait_start_sub_id) + { + mysql_mutex_lock(&entry->LOCK_parallel_entry); + if (wait_start_sub_id) + { + while (wait_start_sub_id > entry->last_committed_sub_id) + mysql_cond_wait(&entry->COND_parallel_entry, + &entry->LOCK_parallel_entry); + } + rgi->wait_start_sub_id= 0; /* No need to check again. */ + if (wait_for_sub_id > entry->last_committed_sub_id) + { + wait_for_commit *waitee= + &rgi->wait_commit_group_info->commit_orderer; + rgi->commit_orderer.register_wait_for_prior_commit(waitee); + } + mysql_mutex_unlock(&entry->LOCK_parallel_entry); + } + + if(thd->wait_for_commit_ptr) + { + /* + This indicates that we get a new GTID event in the middle of + a not completed event group. This is corrupt binlog (the master + will never write such binlog), so it does not happen unless + someone tries to inject wrong crafted binlog, but let us still + try to handle it somewhat nicely. + */ + rgi->cleanup_context(thd, true); + thd->wait_for_commit_ptr->unregister_wait_for_prior_commit(); + thd->wait_for_commit_ptr->wakeup_subsequent_commits(err); + } + thd->wait_for_commit_ptr= &rgi->commit_orderer; + } + + /* + If the SQL thread is stopping, we just skip execution of all the + following event groups. We still do all the normal waiting and wakeup + processing between the event groups as a simple way to ensure that + everything is stopped and cleaned up correctly. + */ + if (!rgi->is_error && !sql_worker_killed(thd, rgi, in_event_group)) + err= rpt_handle_event(events, rpt); + else + err= thd->wait_for_prior_commit(); + + end_of_group= + in_event_group && + ((group_standalone && !Log_event::is_part_of_group(event_type)) || + event_type == XID_EVENT || + (event_type == QUERY_EVENT && + (((Query_log_event *)events->ev)->is_commit() || + ((Query_log_event *)events->ev)->is_rollback()))); + + delete_or_keep_event_post_apply(rgi, event_type, events->ev); + my_free(events); + + if (err) + { + rgi->is_error= true; + slave_output_error_info(rgi->rli, thd); + rgi->cleanup_context(thd, true); + rgi->rli->abort_slave= true; + } + if (end_of_group) + { + in_event_group= false; + + /* + Remove any left-over registration to wait for a prior commit to + complete. Normally, such wait would already have been removed at + this point by wait_for_prior_commit(), but eg. in error case we + might have skipped waiting, so we would need to remove it explicitly. + */ + rgi->commit_orderer.unregister_wait_for_prior_commit(); + thd->wait_for_commit_ptr= NULL; + + /* + Record that this event group has finished (eg. transaction is + committed, if transactional), so other event groups will no longer + attempt to wait for us to commit. Once we have increased + entry->last_committed_sub_id, no other threads will execute + register_wait_for_prior_commit() against us. Thus, by doing one + extra (usually redundant) wakeup_subsequent_commits() we can ensure + that no register_wait_for_prior_commit() can ever happen without a + subsequent wakeup_subsequent_commits() to wake it up. + + We can race here with the next transactions, but that is fine, as + long as we check that we do not decrease last_committed_sub_id. If + this commit is done, then any prior commits will also have been + done and also no longer need waiting for. + */ + mysql_mutex_lock(&entry->LOCK_parallel_entry); + if (entry->last_committed_sub_id < event_gtid_sub_id) + { + entry->last_committed_sub_id= event_gtid_sub_id; + mysql_cond_broadcast(&entry->COND_parallel_entry); + } + mysql_mutex_unlock(&entry->LOCK_parallel_entry); + + rgi->commit_orderer.wakeup_subsequent_commits(err); + delete rgi; + } + + events= next; + } + + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + if ((events= rpt->event_queue) != NULL) + { + /* + Take next group of events from the replication pool. + This is faster than having to wakeup the pool manager thread to give us + a new event. + */ + rpt->dequeue(events); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + mysql_cond_signal(&rpt->COND_rpl_thread); + goto more_events; + } + + if (!in_event_group) + { + rpt->current_entry= NULL; + if (!rpt->stop) + { + mysql_mutex_lock(&rpt->pool->LOCK_rpl_thread_pool); + list= rpt->pool->free_list; + rpt->next= list; + rpt->pool->free_list= rpt; + if (!list) + mysql_cond_broadcast(&rpt->pool->COND_rpl_thread_pool); + mysql_mutex_unlock(&rpt->pool->LOCK_rpl_thread_pool); + } + } + } + + rpt->thd= NULL; + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + + thd->clear_error(); + thd->catalog= 0; + thd->reset_query(); + thd->reset_db(NULL, 0); + thd_proc_info(thd, "Slave worker thread exiting"); + thd->temporary_tables= 0; + mysql_mutex_lock(&LOCK_thread_count); + THD_CHECK_SENTRY(thd); + delete thd; + mysql_mutex_unlock(&LOCK_thread_count); + + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + rpt->running= false; + mysql_cond_signal(&rpt->COND_rpl_thread); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + + my_thread_end(); + + return NULL; +} + + +int +rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, + uint32 new_count, bool skip_check) +{ + uint32 i; + rpl_parallel_thread **new_list= NULL; + rpl_parallel_thread *new_free_list= NULL; + rpl_parallel_thread *rpt_array= NULL; + + /* + Allocate the new list of threads up-front. + That way, if we fail half-way, we only need to free whatever we managed + to allocate, and will not be left with a half-functional thread pool. + */ + if (new_count && + !my_multi_malloc(MYF(MY_WME|MY_ZEROFILL), + &new_list, new_count*sizeof(*new_list), + &rpt_array, new_count*sizeof(*rpt_array), + NULL)) + { + my_error(ER_OUTOFMEMORY, MYF(0), (int(new_count*sizeof(*new_list) + + new_count*sizeof(*rpt_array)))); + goto err;; + } + + for (i= 0; i < new_count; ++i) + { + pthread_t th; + + new_list[i]= &rpt_array[i]; + new_list[i]->delay_start= true; + mysql_mutex_init(key_LOCK_rpl_thread, &new_list[i]->LOCK_rpl_thread, + MY_MUTEX_INIT_SLOW); + mysql_cond_init(key_COND_rpl_thread, &new_list[i]->COND_rpl_thread, NULL); + new_list[i]->pool= pool; + if (mysql_thread_create(key_rpl_parallel_thread, &th, NULL, + handle_rpl_parallel_thread, new_list[i])) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + goto err; + } + new_list[i]->next= new_free_list; + new_free_list= new_list[i]; + } + + if (!skip_check) + { + mysql_mutex_lock(&LOCK_active_mi); + if (master_info_index->give_error_if_slave_running()) + { + mysql_mutex_unlock(&LOCK_active_mi); + goto err; + } + if (pool->changing) + { + mysql_mutex_unlock(&LOCK_active_mi); + my_error(ER_CHANGE_SLAVE_PARALLEL_THREADS_ACTIVE, MYF(0)); + goto err; + } + pool->changing= true; + mysql_mutex_unlock(&LOCK_active_mi); + } + + /* + Grab each old thread in turn, and signal it to stop. + + Note that since we require all replication threads to be stopped before + changing the parallel replication worker thread pool, all the threads will + be already idle and will terminate immediately. + */ + for (i= 0; i < pool->count; ++i) + { + rpl_parallel_thread *rpt= pool->get_thread(NULL); + rpt->stop= true; + mysql_cond_signal(&rpt->COND_rpl_thread); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + } + + for (i= 0; i < pool->count; ++i) + { + rpl_parallel_thread *rpt= pool->threads[i]; + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + while (rpt->running) + mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + mysql_mutex_destroy(&rpt->LOCK_rpl_thread); + mysql_cond_destroy(&rpt->COND_rpl_thread); + } + + my_free(pool->threads); + pool->threads= new_list; + pool->free_list= new_free_list; + pool->count= new_count; + for (i= 0; i < pool->count; ++i) + { + mysql_mutex_lock(&pool->threads[i]->LOCK_rpl_thread); + pool->threads[i]->delay_start= false; + mysql_cond_signal(&pool->threads[i]->COND_rpl_thread); + while (!pool->threads[i]->running) + mysql_cond_wait(&pool->threads[i]->COND_rpl_thread, + &pool->threads[i]->LOCK_rpl_thread); + mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread); + } + + if (!skip_check) + { + mysql_mutex_lock(&LOCK_active_mi); + pool->changing= false; + mysql_mutex_unlock(&LOCK_active_mi); + } + return 0; + +err: + if (new_list) + { + while (new_free_list) + { + mysql_mutex_lock(&new_free_list->LOCK_rpl_thread); + new_free_list->delay_start= false; + new_free_list->stop= true; + mysql_cond_signal(&new_free_list->COND_rpl_thread); + while (!new_free_list->running) + mysql_cond_wait(&new_free_list->COND_rpl_thread, + &new_free_list->LOCK_rpl_thread); + while (new_free_list->running) + mysql_cond_wait(&new_free_list->COND_rpl_thread, + &new_free_list->LOCK_rpl_thread); + mysql_mutex_unlock(&new_free_list->LOCK_rpl_thread); + new_free_list= new_free_list->next; + } + my_free(new_list); + } + if (!skip_check) + { + mysql_mutex_lock(&LOCK_active_mi); + pool->changing= false; + mysql_mutex_unlock(&LOCK_active_mi); + } + return 1; +} + + +rpl_parallel_thread_pool::rpl_parallel_thread_pool() + : count(0), threads(0), free_list(0), changing(false), inited(false) +{ +} + + +int +rpl_parallel_thread_pool::init(uint32 size) +{ + count= 0; + threads= NULL; + free_list= NULL; + + mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool, + MY_MUTEX_INIT_SLOW); + mysql_cond_init(key_COND_rpl_thread_pool, &COND_rpl_thread_pool, NULL); + changing= false; + inited= true; + + return rpl_parallel_change_thread_count(this, size, true); +} + + +void +rpl_parallel_thread_pool::destroy() +{ + if (!inited) + return; + rpl_parallel_change_thread_count(this, 0, true); + mysql_mutex_destroy(&LOCK_rpl_thread_pool); + mysql_cond_destroy(&COND_rpl_thread_pool); + inited= false; +} + + +/* + Wait for a worker thread to become idle. When one does, grab the thread for + our use and return it. + + Note that we return with the worker threads's LOCK_rpl_thread mutex locked. +*/ +struct rpl_parallel_thread * +rpl_parallel_thread_pool::get_thread(rpl_parallel_entry *entry) +{ + rpl_parallel_thread *rpt; + + mysql_mutex_lock(&LOCK_rpl_thread_pool); + while ((rpt= free_list) == NULL) + mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool); + free_list= rpt->next; + mysql_mutex_unlock(&LOCK_rpl_thread_pool); + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + rpt->current_entry= entry; + + return rpt; +} + + +static void +free_rpl_parallel_entry(void *element) +{ + rpl_parallel_entry *e= (rpl_parallel_entry *)element; + mysql_cond_destroy(&e->COND_parallel_entry); + mysql_mutex_destroy(&e->LOCK_parallel_entry); + my_free(e); +} + + +rpl_parallel::rpl_parallel() : + current(NULL), sql_thread_stopping(false) +{ + my_hash_init(&domain_hash, &my_charset_bin, 32, + offsetof(rpl_parallel_entry, domain_id), sizeof(uint32), + NULL, free_rpl_parallel_entry, HASH_UNIQUE); +} + + +void +rpl_parallel::reset() +{ + my_hash_reset(&domain_hash); + current= NULL; + sql_thread_stopping= false; +} + + +rpl_parallel::~rpl_parallel() +{ + my_hash_free(&domain_hash); +} + + +rpl_parallel_entry * +rpl_parallel::find(uint32 domain_id) +{ + struct rpl_parallel_entry *e; + + if (!(e= (rpl_parallel_entry *)my_hash_search(&domain_hash, + (const uchar *)&domain_id, 0))) + { + /* Allocate a new, empty one. */ + if (!(e= (struct rpl_parallel_entry *)my_malloc(sizeof(*e), + MYF(MY_ZEROFILL)))) + return NULL; + e->domain_id= domain_id; + if (my_hash_insert(&domain_hash, (uchar *)e)) + { + my_free(e); + return NULL; + } + mysql_mutex_init(key_LOCK_parallel_entry, &e->LOCK_parallel_entry, + MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_parallel_entry, &e->COND_parallel_entry, NULL); + } + + return e; +} + + +void +rpl_parallel::wait_for_done() +{ + struct rpl_parallel_entry *e; + uint32 i; + + for (i= 0; i < domain_hash.records; ++i) + { + e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); + mysql_mutex_lock(&e->LOCK_parallel_entry); + while (e->current_sub_id > e->last_committed_sub_id) + mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry); + mysql_mutex_unlock(&e->LOCK_parallel_entry); + } +} + + +/* + do_event() is executed by the sql_driver_thd thread. + It's main purpose is to find a thread that can execute the query. + + @retval false ok, event was accepted + @retval true error +*/ + +bool +rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, + ulonglong event_size) +{ + rpl_parallel_entry *e; + rpl_parallel_thread *cur_thread; + rpl_parallel_thread::queued_event *qev; + rpl_group_info *rgi= NULL; + Relay_log_info *rli= serial_rgi->rli; + enum Log_event_type typ; + bool is_group_event; + + /* ToDo: what to do with this lock?!? */ + mysql_mutex_unlock(&rli->data_lock); + + /* + Stop queueing additional event groups once the SQL thread is requested to + stop. + */ + if (((typ= ev->get_type_code()) == GTID_EVENT || + !(is_group_event= Log_event::is_group_event(typ))) && + rli->abort_slave) + sql_thread_stopping= true; + if (sql_thread_stopping) + { + /* QQ: Need a better comment why we return false here */ + return false; + } + + if (!(qev= (rpl_parallel_thread::queued_event *)my_malloc(sizeof(*qev), + MYF(0)))) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return true; + } + qev->ev= ev; + qev->event_size= event_size; + qev->next= NULL; + strcpy(qev->event_relay_log_name, rli->event_relay_log_name); + qev->event_relay_log_pos= rli->event_relay_log_pos; + qev->future_event_relay_log_pos= rli->future_event_relay_log_pos; + strcpy(qev->future_event_master_log_name, rli->future_event_master_log_name); + + if (typ == GTID_EVENT) + { + Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); + uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ? + 0 : gtid_ev->domain_id); + + if (!(e= find(domain_id)) || + !(rgi= new rpl_group_info(rli)) || + event_group_new_gtid(rgi, gtid_ev)) + { + my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); + delete rgi; + return true; + } + rgi->is_parallel_exec = true; + if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on())) + rgi->deferred_events= new Deferred_log_events(rli); + + if ((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) && + e->last_commit_id == gtid_ev->commit_id) + { + /* + We are already executing something else in this domain. But the two + event groups were committed together in the same group commit on the + master, so we can still do them in parallel here on the slave. + + However, the commit of this event must wait for the commit of the prior + event, to preserve binlog commit order and visibility across all + servers in the replication hierarchy. + */ + rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e); + rgi->wait_commit_sub_id= e->current_sub_id; + rgi->wait_commit_group_info= e->current_group_info; + rgi->wait_start_sub_id= e->prev_groupcommit_sub_id; + e->rpl_thread= cur_thread= rpt; + /* get_thread() returns with the LOCK_rpl_thread locked. */ + } + else + { + /* + Check if we already have a worker thread for this entry. + + We continue to queue more events up for the worker thread while it is + still executing the first ones, to be able to start executing a large + event group without having to wait for the end to be fetched from the + master. And we continue to queue up more events after the first group, + so that we can continue to process subsequent parts of the relay log in + parallel without having to wait for previous long-running events to + complete. + + But if the worker thread is idle at any point, it may return to the + idle list or start servicing a different request. So check this, and + allocate a new thread if the old one is no longer processing for us. + */ + cur_thread= e->rpl_thread; + if (cur_thread) + { + mysql_mutex_lock(&cur_thread->LOCK_rpl_thread); + for (;;) + { + if (cur_thread->current_entry != e) + { + /* + The worker thread became idle, and returned to the free list and + possibly was allocated to a different request. This also means + that everything previously queued has already been executed, + else the worker thread would not have become idle. So we should + allocate a new worker thread. + */ + mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); + e->rpl_thread= cur_thread= NULL; + break; + } + else if (cur_thread->queued_size <= opt_slave_parallel_max_queued) + break; // The thread is ready to queue into + else + { + /* + We have reached the limit of how much memory we are allowed to + use for queuing events, so wait for the thread to consume some + of its queue. + */ + mysql_cond_wait(&cur_thread->COND_rpl_thread, + &cur_thread->LOCK_rpl_thread); + } + } + } + + if (!cur_thread) + { + /* + Nothing else is currently running in this domain. We can + spawn a new thread to do this event group in parallel with + anything else that might be running in other domains. + */ + cur_thread= e->rpl_thread= global_rpl_thread_pool.get_thread(e); + /* get_thread() returns with the LOCK_rpl_thread locked. */ + } + else + { + /* + We are still executing the previous event group for this replication + domain, and we have to wait for that to finish before we can start on + the next one. So just re-use the thread. + */ + } + + rgi->wait_commit_sub_id= 0; + rgi->wait_start_sub_id= 0; + e->prev_groupcommit_sub_id= e->current_sub_id; + } + + if (gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) + { + e->last_server_id= gtid_ev->server_id; + e->last_seq_no= gtid_ev->seq_no; + e->last_commit_id= gtid_ev->commit_id; + } + else + { + e->last_server_id= 0; + e->last_seq_no= 0; + e->last_commit_id= 0; + } + + qev->rgi= e->current_group_info= rgi; + e->current_sub_id= rgi->gtid_sub_id; + current= rgi->parallel_entry= e; + } + else if (!is_group_event || !current) + { + my_off_t log_pos; + int err; + bool tmp; + /* + Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread. + Same for events not preceeded by GTID (we should not see those normally, + but they might be from an old master). + + The varuable `current' is NULL for the case where the master did not + have GTID, like a MariaDB 5.5 or MySQL master. + */ + qev->rgi= serial_rgi; + /* Handle master log name change, seen in Rotate_log_event. */ + if (typ == ROTATE_EVENT) + { + Rotate_log_event *rev= static_cast<Rotate_log_event *>(qev->ev); + if ((rev->server_id != global_system_variables.server_id || + rli->replicate_same_server_id) && + !rev->is_relay_log_event() && + !rli->is_in_group()) + { + memcpy(rli->future_event_master_log_name, + rev->new_log_ident, rev->ident_len+1); + } + } + + tmp= serial_rgi->is_parallel_exec; + serial_rgi->is_parallel_exec= true; + err= rpt_handle_event(qev, NULL); + serial_rgi->is_parallel_exec= tmp; + log_pos= qev->ev->log_pos; + delete_or_keep_event_post_apply(serial_rgi, typ, qev->ev); + + if (err) + { + my_free(qev); + return true; + } + qev->ev= NULL; + qev->future_event_master_log_pos= log_pos; + if (!current) + { + handle_queued_pos_update(rli->sql_driver_thd, qev); + my_free(qev); + return false; + } + /* + Queue an empty event, so that the position will be updated in a + reasonable way relative to other events: + + - If the currently executing events are queued serially for a single + thread, the position will only be updated when everything before has + completed. + + - If we are executing multiple independent events in parallel, then at + least the position will not be updated until one of them has reached + the current point. + */ + cur_thread= current->rpl_thread; + if (cur_thread) + { + mysql_mutex_lock(&cur_thread->LOCK_rpl_thread); + if (cur_thread->current_entry != current) + { + /* Not ours anymore, we need to grab a new one. */ + mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); + cur_thread= NULL; + } + } + if (!cur_thread) + cur_thread= current->rpl_thread= + global_rpl_thread_pool.get_thread(current); + } + else + { + cur_thread= current->rpl_thread; + if (cur_thread) + { + mysql_mutex_lock(&cur_thread->LOCK_rpl_thread); + if (cur_thread->current_entry != current) + { + /* Not ours anymore, we need to grab a new one. */ + mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); + cur_thread= NULL; + } + } + if (!cur_thread) + { + cur_thread= current->rpl_thread= + global_rpl_thread_pool.get_thread(current); + } + qev->rgi= current->current_group_info; + } + + /* + Queue the event for processing. + */ + rli->event_relay_log_pos= rli->future_event_relay_log_pos; + cur_thread->enqueue(qev); + mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); + mysql_cond_signal(&cur_thread->COND_rpl_thread); + + return false; +} |