summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/log.cc280
-rw-r--r--sql/log.h2
-rw-r--r--sql/log_event.cc72
-rw-r--r--sql/log_event.h74
-rw-r--r--sql/log_event_old.cc6
-rw-r--r--sql/log_event_old.h12
-rw-r--r--sql/rpl_gtid.cc2
-rw-r--r--sql/rpl_parallel.cc15
-rw-r--r--sql/rpl_parallel.h6
-rw-r--r--sql/rpl_rli.cc8
-rw-r--r--sql/rpl_rli.h14
-rw-r--r--sql/rpl_utility.cc2
-rw-r--r--sql/slave.cc2
-rw-r--r--sql/sql_binlog.cc2
-rw-r--r--sql/sql_class.cc7
-rw-r--r--sql/sql_class.h2
16 files changed, 313 insertions, 193 deletions
diff --git a/sql/log.cc b/sql/log.cc
index 61d4428fc18..763eb4177ea 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -6542,26 +6542,87 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
}
}
+
+/*
+ Put a transaction that is ready to commit in the group commit queue.
+ The transaction is identified by the ENTRY object passed into this function.
+
+ To facilitate group commit for the binlog, we first queue up ourselves in
+ this function. Then later the first thread to enter the queue waits for
+ the LOCK_log mutex, and commits for everyone in the queue once it gets the
+ lock. Any other threads in the queue just wait for the first one to finish
+ the commit and wake them up. This way, all transactions in the queue get
+ committed in a single disk operation.
+
+ The return value of this function is TRUE if queued as the first entry in
+ the queue (meaning this is the leader), FALSE otherwise.
+
+ The main work in this function is when the commit in one transaction has
+ been marked to wait for the commit of another transaction to happen
+ first. This is used to support in-order parallel replication, where
+ transactions can execute out-of-order but need to be committed in-order with
+ how they happened on the master. The waiting of one commit on another needs
+ to be integrated with the group commit queue, to ensure that the waiting
+ transaction can participate in the same group commit as the waited-for
+ transaction.
+
+ So when we put a transaction in the queue, we check if there were other
+ transactions already prepared to commit but just waiting for the first one
+ to commit. If so, we add those to the queue as well, transitively for all
+ waiters.
+*/
+
bool
-MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry,
- wait_for_commit *wfc)
+MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry)
{
group_commit_entry *orig_queue;
wait_for_commit *list, *cur, *last;
+ wait_for_commit *wfc;
/*
- To facilitate group commit for the binlog, we first queue up ourselves in
- the group commit queue. Then the first thread to enter the queue waits for
- the LOCK_log mutex, and commits for everyone in the queue once it gets the
- lock. Any other threads in the queue just wait for the first one to finish
- the commit and wake them up.
-
- To support in-order parallel replication with group commit, after we add
- some transaction to the queue, we check if there were other transactions
- already prepared to commit but just waiting for the first one to commit.
- If so, we add those to the queue as well, transitively for all waiters.
+ Check if we need to wait for another transaction to commit before us.
+
+ It is safe to do a quick check without lock first in the case where we do
+ not have to wait. But if the quick check shows we need to wait, we must do
+ another safe check under lock, to avoid the race where the other
+ transaction wakes us up between the check and the wait.
*/
+ wfc= entry->thd->wait_for_commit_ptr;
+ entry->queued_by_other= false;
+ if (wfc && wfc->waiting_for_commit)
+ {
+ mysql_mutex_lock(&wfc->LOCK_wait_commit);
+ /* Do an extra check here, this time safely under lock. */
+ if (wfc->waiting_for_commit)
+ {
+ /*
+ By setting wfc->opaque_pointer to our own entry, we mark that we are
+ ready to commit, but waiting for another transaction to commit before
+ us.
+
+ This other transaction may then take over the commit process for us to
+ get us included in its own group commit. If this happens, the
+ queued_by_other flag is set.
+ */
+ wfc->opaque_pointer= entry;
+ do
+ {
+ mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
+ } while (wfc->waiting_for_commit);
+ wfc->opaque_pointer= NULL;
+ }
+ mysql_mutex_unlock(&wfc->LOCK_wait_commit);
+ }
+ /*
+ If the transaction we were waiting for has already put us into the group
+ commit queue (and possibly already done the entire binlog commit for us),
+ then there is nothing else to do.
+ */
+ if (entry->queued_by_other)
+ return false;
+
+ /* Now enqueue ourselves in the group commit queue. */
entry->thd->clear_wakeup_ready();
mysql_mutex_lock(&LOCK_prepare_ordered);
orig_queue= group_commit_queue;
@@ -6574,6 +6635,23 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry,
This would be natural to do with recursion, but we want to avoid
potentially unbounded recursion blowing the C stack, so we use the list
approach instead.
+
+ We keep a list of all the waiters that need to be processed in `list',
+ linked through the next_subsequent_commit pointer. Initially this list
+ contains only the entry passed into this function.
+
+ We process entries in the list one by one. The element currently being
+ processed is pointed to by `cur`, and the element at the end of the list
+ is pointed to by `last` (we do not use NULL to terminate the list).
+
+ As we process an element, it is first added to the group_commit_queue.
+ Then any waiters for that element are added at the end of the list, to
+ be processed in subsequent iterations. This continues until the list
+ is exhausted, with all elements ever added eventually processed.
+
+ The end result is a breath-first traversal of the tree of waiters,
+ re-using the next_subsequent_commit pointers in place of extra stack
+ space in a recursive traversal.
*/
list= wfc;
cur= list;
@@ -6594,6 +6672,12 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry,
if (!cur)
break; // Can happen if initial entry has no wait_for_commit
+ /*
+ Check if this transaction has other transaction waiting for it to commit.
+
+ If so, process the waiting transactions, and their waiters and so on,
+ transitively.
+ */
if (cur->subsequent_commits_list)
{
bool have_lock;
@@ -6601,63 +6685,66 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry,
mysql_mutex_lock(&cur->LOCK_wait_commit);
have_lock= true;
+ /*
+ Grab the list, now safely under lock, and process it if still
+ non-empty.
+ */
waiter= cur->subsequent_commits_list;
- /* Check again, now safely under lock. */
- if (waiter)
+ cur->subsequent_commits_list= NULL;
+ while (waiter)
{
- /* Grab the list of waiters and process it. */
- cur->subsequent_commits_list= NULL;
- do
+ wait_for_commit *next= waiter->next_subsequent_commit;
+ group_commit_entry *entry2=
+ (group_commit_entry *)waiter->opaque_pointer;
+ if (entry2)
{
- wait_for_commit *next= waiter->next_subsequent_commit;
- group_commit_entry *entry2=
- (group_commit_entry *)waiter->opaque_pointer;
- if (entry2)
- {
- /*
- This is another transaction ready to be written to the binary
- log. We can put it into the queue directly, without needing a
- separate context switch to the other thread. We just set a flag
- so that the other thread will know when it wakes up that it was
- already processed.
-
- So put it at the end of the list to be processed in a subsequent
- iteration of the outer loop.
- */
- entry2->queued_by_other= true;
- last->next_subsequent_commit= waiter;
- last= waiter;
- /*
- As a small optimisation, we do not actually need to set
- waiter->next_subsequent_commit to NULL, as we can use the
- pointer `last' to check for end-of-list.
- */
- }
- else
- {
- /*
- Wake up the waiting transaction.
+ /*
+ This is another transaction ready to be written to the binary
+ log. We can put it into the queue directly, without needing a
+ separate context switch to the other thread. We just set a flag
+ so that the other thread will know when it wakes up that it was
+ already processed.
+
+ So put it at the end of the list to be processed in a subsequent
+ iteration of the outer loop.
+ */
+ entry2->queued_by_other= true;
+ last->next_subsequent_commit= waiter;
+ last= waiter;
+ /*
+ As a small optimisation, we do not actually need to set
+ waiter->next_subsequent_commit to NULL, as we can use the
+ pointer `last' to check for end-of-list.
+ */
+ }
+ else
+ {
+ /*
+ Wake up the waiting transaction.
- For this, we need to set the "wakeup running" flag and release
- the waitee lock to avoid a deadlock, see comments on
- THD::wakeup_subsequent_commits2() for details.
- */
- if (have_lock)
- {
- cur->wakeup_subsequent_commits_running= true;
- mysql_mutex_unlock(&cur->LOCK_wait_commit);
- have_lock= false;
- }
- waiter->wakeup();
+ For this, we need to set the "wakeup running" flag and release
+ the waitee lock to avoid a deadlock, see comments on
+ THD::wakeup_subsequent_commits2() for details.
+ */
+ if (have_lock)
+ {
+ have_lock= false;
+ cur->wakeup_subsequent_commits_running= true;
+ mysql_mutex_unlock(&cur->LOCK_wait_commit);
}
- waiter= next;
- } while (waiter);
+ waiter->wakeup();
+ }
+ waiter= next;
}
if (have_lock)
mysql_mutex_unlock(&cur->LOCK_wait_commit);
}
if (cur == last)
break;
+ /*
+ Move to the next entry in the flattened list of waiting transactions
+ that still need to be processed transitively.
+ */
cur= cur->next_subsequent_commit;
entry= (group_commit_entry *)cur->opaque_pointer;
DBUG_ASSERT(entry != NULL);
@@ -6691,31 +6778,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry,
bool
MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
{
- wait_for_commit *wfc;
- bool is_leader;
-
- wfc= entry->thd->wait_for_commit_ptr;
- entry->queued_by_other= false;
- if (wfc && wfc->waiting_for_commit)
- {
- mysql_mutex_lock(&wfc->LOCK_wait_commit);
- /* Do an extra check here, this time safely under lock. */
- if (wfc->waiting_for_commit)
- {
- wfc->opaque_pointer= entry;
- do
- {
- mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
- } while (wfc->waiting_for_commit);
- wfc->opaque_pointer= NULL;
- }
- mysql_mutex_unlock(&wfc->LOCK_wait_commit);
- }
-
- if (entry->queued_by_other)
- is_leader= false;
- else
- is_leader= queue_for_group_commit(entry, wfc);
+ bool is_leader= queue_for_group_commit(entry);
/*
The first in the queue handles group commit for all; the others just wait
@@ -6756,6 +6819,16 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
if (next)
{
+ /*
+ Wake up the next thread in the group commit.
+
+ The next thread can be waiting in two different ways, depending on
+ whether it put itself in the queue, or if it was put in queue by us
+ because it had to wait for us to commit first.
+
+ So execute the appropriate wakeup, identified by the queued_by_other
+ field.
+ */
if (next->queued_by_other)
next->thd->wait_for_commit_ptr->wakeup();
else
@@ -6840,14 +6913,18 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
*/
mysql_mutex_lock(&LOCK_log);
DEBUG_SYNC(leader->thd, "commit_after_get_LOCK_log");
- binlog_id= current_binlog_id;
mysql_mutex_lock(&LOCK_prepare_ordered);
if (opt_binlog_commit_wait_count)
wait_for_sufficient_commits();
+ /*
+ Note that wait_for_sufficient_commits() may have released and
+ re-acquired the LOCK_log and LOCK_prepare_ordered if it needed to wait.
+ */
current= group_commit_queue;
group_commit_queue= NULL;
mysql_mutex_unlock(&LOCK_prepare_ordered);
+ binlog_id= current_binlog_id;
/* As the queue is in reverse order of entering, reverse it. */
last_in_queue= current;
@@ -7141,6 +7218,13 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
}
+/*
+ Wait for sufficient commits to queue up for group commit, according to the
+ values of binlog_commit_wait_count and binlog_commit_wait_usec.
+
+ Note that this function may release and re-acquire LOCK_log and
+ LOCK_prepare_ordered if it needs to wait.
+*/
void
MYSQL_BIN_LOG::wait_for_sufficient_commits()
{
@@ -7152,11 +7236,9 @@ MYSQL_BIN_LOG::wait_for_sufficient_commits()
mysql_mutex_assert_owner(&LOCK_log);
mysql_mutex_assert_owner(&LOCK_prepare_ordered);
- count= 0;
- for (e= last_head= group_commit_queue; e; e= e->next)
- ++count;
- if (count >= opt_binlog_commit_wait_count)
- return;
+ for (e= last_head= group_commit_queue, count= 0; e; e= e->next)
+ if (++count >= opt_binlog_commit_wait_count)
+ return;
mysql_mutex_unlock(&LOCK_log);
set_timespec_nsec(wait_until, (ulonglong)1000*opt_binlog_commit_wait_usec);
@@ -7178,7 +7260,25 @@ MYSQL_BIN_LOG::wait_for_sufficient_commits()
last_head= head;
}
- mysql_mutex_lock(&LOCK_log);
+ /*
+ We must not wait for LOCK_log while holding LOCK_prepare_ordered.
+ LOCK_log can be held for long periods (eg. we do I/O under it), while
+ LOCK_prepare_ordered must only be held for short periods.
+
+ In addition, waiting for LOCK_log while holding LOCK_prepare_ordered would
+ violate locking order of LOCK_log-before-LOCK_prepare_ordered. This could
+ cause SAFEMUTEX warnings (even if it cannot actually deadlock with current
+ code, as there can be at most one group commit leader thread at a time).
+
+ So release and re-acquire LOCK_prepare_ordered if we need to wait for the
+ LOCK_log.
+ */
+ if (mysql_mutex_trylock(&LOCK_log))
+ {
+ mysql_mutex_unlock(&LOCK_prepare_ordered);
+ mysql_mutex_lock(&LOCK_log);
+ mysql_mutex_lock(&LOCK_prepare_ordered);
+ }
}
diff --git a/sql/log.h b/sql/log.h
index efb560dc245..8b5fe17e660 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -540,7 +540,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
void do_checkpoint_request(ulong binlog_id);
void purge();
int write_transaction_or_stmt(group_commit_entry *entry, uint64 commit_id);
- bool queue_for_group_commit(group_commit_entry *entry, wait_for_commit *wfc);
+ bool queue_for_group_commit(group_commit_entry *entry);
bool write_transaction_to_binlog_events(group_commit_entry *entry);
void trx_group_commit_leader(group_commit_entry *leader);
bool is_xidlist_idle_nolock();
diff --git a/sql/log_event.cc b/sql/log_event.cc
index f07c58f4d6b..c0a2ebfa365 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -937,7 +937,7 @@ Log_event::Log_event(const char* buf,
#ifndef MYSQL_CLIENT
#ifdef HAVE_REPLICATION
-int Log_event::do_update_pos(struct rpl_group_info *rgi)
+int Log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
/*
@@ -3756,7 +3756,7 @@ void Query_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
-int Query_log_event::do_apply_event(struct rpl_group_info *rgi)
+int Query_log_event::do_apply_event(rpl_group_info *rgi)
{
return do_apply_event(rgi, query, q_len);
}
@@ -3807,8 +3807,8 @@ bool test_if_equal_repl_errors(int expected_error, int actual_error)
mismatch. This mismatch could be implemented with a new ER_ code, and
to ignore it you would use --slave-skip-errors...
*/
-int Query_log_event::do_apply_event(struct rpl_group_info *rgi,
- const char *query_arg, uint32 q_len_arg)
+int Query_log_event::do_apply_event(rpl_group_info *rgi,
+ const char *query_arg, uint32 q_len_arg)
{
LEX_STRING new_db;
int expected_error,actual_error= 0;
@@ -4244,7 +4244,7 @@ end:
DBUG_RETURN(thd->is_slave_error);
}
-int Query_log_event::do_update_pos(struct rpl_group_info *rgi)
+int Query_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
/*
@@ -4461,7 +4461,7 @@ bool Start_log_event_v3::write(IO_CACHE* file)
other words, no deadlock problem.
*/
-int Start_log_event_v3::do_apply_event(struct rpl_group_info *rgi)
+int Start_log_event_v3::do_apply_event(rpl_group_info *rgi)
{
DBUG_ENTER("Start_log_event_v3::do_apply_event");
int error= 0;
@@ -4810,7 +4810,7 @@ bool Format_description_log_event::write(IO_CACHE* file)
#endif
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
-int Format_description_log_event::do_apply_event(struct rpl_group_info *rgi)
+int Format_description_log_event::do_apply_event(rpl_group_info *rgi)
{
int ret= 0;
Relay_log_info const *rli= rgi->rli;
@@ -4867,7 +4867,7 @@ int Format_description_log_event::do_apply_event(struct rpl_group_info *rgi)
DBUG_RETURN(ret);
}
-int Format_description_log_event::do_update_pos(struct rpl_group_info *rgi)
+int Format_description_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
if (server_id == (uint32) global_system_variables.server_id)
@@ -5516,7 +5516,7 @@ void Load_log_event::set_fields(const char* affected_db,
1 Failure
*/
-int Load_log_event::do_apply_event(NET* net, struct rpl_group_info *rgi,
+int Load_log_event::do_apply_event(NET* net, rpl_group_info *rgi,
bool use_rli_only_for_errors)
{
LEX_STRING new_db;
@@ -5919,7 +5919,7 @@ bool Rotate_log_event::write(IO_CACHE* file)
@retval
0 ok
*/
-int Rotate_log_event::do_update_pos(struct rpl_group_info *rgi)
+int Rotate_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
DBUG_ENTER("Rotate_log_event::do_update_pos");
@@ -6096,7 +6096,7 @@ bool Binlog_checkpoint_log_event::write(IO_CACHE *file)
Gtid_log_event::Gtid_log_event(const char *buf, uint event_len,
const Format_description_log_event *description_event)
- : Log_event(buf, description_event), seq_no(0)
+ : Log_event(buf, description_event), seq_no(0), commit_id(0)
{
uint8 header_size= description_event->common_header_len;
uint8 post_header_len= description_event->post_header_len[GTID_EVENT-1];
@@ -6120,8 +6120,6 @@ Gtid_log_event::Gtid_log_event(const char *buf, uint event_len,
++buf;
commit_id= uint8korr(buf);
}
- else
- commit_id= 0;
}
@@ -6254,7 +6252,7 @@ Gtid_log_event::pack_info(THD *thd, Protocol *protocol)
static char gtid_begin_string[] = "BEGIN";
int
-Gtid_log_event::do_apply_event(struct rpl_group_info *rgi)
+Gtid_log_event::do_apply_event(rpl_group_info *rgi)
{
thd->variables.server_id= this->server_id;
thd->variables.gtid_domain_id= this->domain_id;
@@ -6295,7 +6293,7 @@ Gtid_log_event::do_apply_event(struct rpl_group_info *rgi)
int
-Gtid_log_event::do_update_pos(struct rpl_group_info *rgi)
+Gtid_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
rli->inc_event_relay_log_pos();
@@ -6477,7 +6475,7 @@ Gtid_list_log_event::write(IO_CACHE *file)
int
-Gtid_list_log_event::do_apply_event(struct rpl_group_info *rgi)
+Gtid_list_log_event::do_apply_event(rpl_group_info *rgi)
{
Relay_log_info const *rli= rgi->rli;
int ret= Log_event::do_apply_event(rgi);
@@ -6707,7 +6705,7 @@ void Intvar_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
Intvar_log_event::do_apply_event()
*/
-int Intvar_log_event::do_apply_event(struct rpl_group_info *rgi)
+int Intvar_log_event::do_apply_event(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
/*
@@ -6731,7 +6729,7 @@ int Intvar_log_event::do_apply_event(struct rpl_group_info *rgi)
return 0;
}
-int Intvar_log_event::do_update_pos(struct rpl_group_info *rgi)
+int Intvar_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
rli->inc_event_relay_log_pos();
@@ -6818,7 +6816,7 @@ void Rand_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
-int Rand_log_event::do_apply_event(struct rpl_group_info *rgi)
+int Rand_log_event::do_apply_event(rpl_group_info *rgi)
{
Relay_log_info const *rli= rgi->rli;
/*
@@ -6835,7 +6833,7 @@ int Rand_log_event::do_apply_event(struct rpl_group_info *rgi)
return 0;
}
-int Rand_log_event::do_update_pos(struct rpl_group_info *rgi)
+int Rand_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
rli->inc_event_relay_log_pos();
@@ -6950,7 +6948,7 @@ void Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
-int Xid_log_event::do_apply_event(struct rpl_group_info *rgi)
+int Xid_log_event::do_apply_event(rpl_group_info *rgi)
{
bool res;
int err;
@@ -7416,7 +7414,7 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
*/
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
-int User_var_log_event::do_apply_event(struct rpl_group_info *rgi)
+int User_var_log_event::do_apply_event(rpl_group_info *rgi)
{
Item *it= 0;
CHARSET_INFO *charset;
@@ -7505,7 +7503,7 @@ int User_var_log_event::do_apply_event(struct rpl_group_info *rgi)
DBUG_RETURN(0);
}
-int User_var_log_event::do_update_pos(struct rpl_group_info *rgi)
+int User_var_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
rli->inc_event_relay_log_pos();
@@ -7682,7 +7680,7 @@ Slave_log_event::Slave_log_event(const char* buf,
#ifndef MYSQL_CLIENT
-int Slave_log_event::do_apply_event(struct rpl_group_info *rgi)
+int Slave_log_event::do_apply_event(rpl_group_info *rgi)
{
if (mysql_bin_log.is_open())
return mysql_bin_log.write(this);
@@ -7726,7 +7724,7 @@ void Stop_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
Start_log_event_v3::do_apply_event(), not here. Because if we come
here, the master was sane.
*/
-int Stop_log_event::do_update_pos(struct rpl_group_info *rgi)
+int Stop_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
/*
@@ -7958,7 +7956,7 @@ void Create_file_log_event::pack_info(THD *thd, Protocol *protocol)
*/
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
-int Create_file_log_event::do_apply_event(struct rpl_group_info *rgi)
+int Create_file_log_event::do_apply_event(rpl_group_info *rgi)
{
char proc_info[17+FN_REFLEN+10], *fname_buf;
char *ext;
@@ -8140,7 +8138,7 @@ int Append_block_log_event::get_create_or_append() const
Append_block_log_event::do_apply_event()
*/
-int Append_block_log_event::do_apply_event(struct rpl_group_info *rgi)
+int Append_block_log_event::do_apply_event(rpl_group_info *rgi)
{
char proc_info[17+FN_REFLEN+10], *fname= proc_info+17;
int fd;
@@ -8291,7 +8289,7 @@ void Delete_file_log_event::pack_info(THD *thd, Protocol *protocol)
*/
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
-int Delete_file_log_event::do_apply_event(struct rpl_group_info *rgi)
+int Delete_file_log_event::do_apply_event(rpl_group_info *rgi)
{
char fname[FN_REFLEN+10];
Relay_log_info const *rli= rgi->rli;
@@ -8391,7 +8389,7 @@ void Execute_load_log_event::pack_info(THD *thd, Protocol *protocol)
Execute_load_log_event::do_apply_event()
*/
-int Execute_load_log_event::do_apply_event(struct rpl_group_info *rgi)
+int Execute_load_log_event::do_apply_event(rpl_group_info *rgi)
{
char fname[FN_REFLEN+10];
char *ext;
@@ -8664,7 +8662,7 @@ void Execute_load_query_log_event::pack_info(THD *thd, Protocol *protocol)
int
-Execute_load_query_log_event::do_apply_event(struct rpl_group_info *rgi)
+Execute_load_query_log_event::do_apply_event(rpl_group_info *rgi)
{
char *p;
char *buf;
@@ -9072,7 +9070,7 @@ int Rows_log_event::do_add_row_data(uchar *row_data, size_t length)
#endif
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
-int Rows_log_event::do_apply_event(struct rpl_group_info *rgi)
+int Rows_log_event::do_apply_event(rpl_group_info *rgi)
{
Relay_log_info const *rli= rgi->rli;
DBUG_ENTER("Rows_log_event::do_apply_event(Relay_log_info*)");
@@ -9538,7 +9536,7 @@ static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd)
@retval non-zero Error in the statement commit
*/
int
-Rows_log_event::do_update_pos(struct rpl_group_info *rgi)
+Rows_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
DBUG_ENTER("Rows_log_event::do_update_pos");
@@ -9777,7 +9775,7 @@ void Annotate_rows_log_event::print(FILE *file, PRINT_EVENT_INFO *pinfo)
#endif
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
-int Annotate_rows_log_event::do_apply_event(struct rpl_group_info *rgi)
+int Annotate_rows_log_event::do_apply_event(rpl_group_info *rgi)
{
m_save_thd_query_txt= thd->query();
m_save_thd_query_len= thd->query_length();
@@ -9787,7 +9785,7 @@ int Annotate_rows_log_event::do_apply_event(struct rpl_group_info *rgi)
#endif
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
-int Annotate_rows_log_event::do_update_pos(struct rpl_group_info *rgi)
+int Annotate_rows_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
rli->inc_event_relay_log_pos();
@@ -10296,7 +10294,7 @@ check_table_map(Relay_log_info const *rli, RPL_TABLE_LIST *table_list)
DBUG_RETURN(res);
}
-int Table_map_log_event::do_apply_event(struct rpl_group_info *rgi)
+int Table_map_log_event::do_apply_event(rpl_group_info *rgi)
{
RPL_TABLE_LIST *table_list;
char *db_mem, *tname_mem;
@@ -10415,7 +10413,7 @@ Table_map_log_event::do_shall_skip(Relay_log_info *rli)
return continue_group(rli);
}
-int Table_map_log_event::do_update_pos(struct rpl_group_info *rgi)
+int Table_map_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
rli->inc_event_relay_log_pos();
@@ -11847,7 +11845,7 @@ Incident_log_event::print(FILE *file,
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
int
-Incident_log_event::do_apply_event(struct rpl_group_info *rgi)
+Incident_log_event::do_apply_event(rpl_group_info *rgi)
{
Relay_log_info const *rli= rgi->rli;
DBUG_ENTER("Incident_log_event::do_apply_event");
diff --git a/sql/log_event.h b/sql/log_event.h
index 6d6a330fc48..1dc7f516727 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -1317,7 +1317,7 @@ public:
@see do_apply_event
*/
- int apply_event(struct rpl_group_info *rgi)
+ int apply_event(rpl_group_info *rgi)
{
return do_apply_event(rgi);
}
@@ -1331,7 +1331,7 @@ public:
@see do_update_pos
*/
- int update_pos(struct rpl_group_info *rgi)
+ int update_pos(rpl_group_info *rgi)
{
return do_update_pos(rgi);
}
@@ -1432,7 +1432,7 @@ protected:
@retval 0 Event applied successfully
@retval errno Error code if event application failed
*/
- virtual int do_apply_event(struct rpl_group_info *rgi)
+ virtual int do_apply_event(rpl_group_info *rgi)
{
return 0; /* Default implementation does nothing */
}
@@ -1461,7 +1461,7 @@ protected:
1). Observe that handler errors are returned by the
do_apply_event() function, and not by this one.
*/
- virtual int do_update_pos(struct rpl_group_info *rgi);
+ virtual int do_update_pos(rpl_group_info *rgi);
/**
@@ -1986,10 +1986,10 @@ public:
public: /* !!! Public in this patch to allow old usage */
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
- virtual int do_apply_event(struct rpl_group_info *rgi);
- virtual int do_update_pos(struct rpl_group_info *rgi);
+ virtual int do_apply_event(rpl_group_info *rgi);
+ virtual int do_update_pos(rpl_group_info *rgi);
- int do_apply_event(struct rpl_group_info *rgi,
+ int do_apply_event(rpl_group_info *rgi,
const char *query_arg,
uint32 q_len_arg);
static bool peek_is_commit_rollback(const char *event_start,
@@ -2103,7 +2103,7 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
- virtual int do_apply_event(struct rpl_group_info *rgi);
+ virtual int do_apply_event(rpl_group_info *rgi);
#endif
};
@@ -2416,12 +2416,12 @@ public:
public: /* !!! Public in this patch to allow old usage */
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
- virtual int do_apply_event(struct rpl_group_info *rgi)
+ virtual int do_apply_event(rpl_group_info *rgi)
{
return do_apply_event(thd->slave_net,rgi,0);
}
- int do_apply_event(NET *net, struct rpl_group_info *rgi,
+ int do_apply_event(NET *net, rpl_group_info *rgi,
bool use_rli_only_for_errors);
#endif
};
@@ -2500,7 +2500,7 @@ public:
protected:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
- virtual int do_apply_event(struct rpl_group_info *rgi);
+ virtual int do_apply_event(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info*)
{
/*
@@ -2596,8 +2596,8 @@ public:
static bool is_version_before_checksum(const master_version_split *version_split);
protected:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
- virtual int do_apply_event(struct rpl_group_info *rgi);
- virtual int do_update_pos(struct rpl_group_info *rgi);
+ virtual int do_apply_event(rpl_group_info *rgi);
+ virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif
};
@@ -2675,8 +2675,8 @@ Intvar_log_event(THD* thd_arg,uchar type_arg, ulonglong val_arg,
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
- virtual int do_apply_event(struct rpl_group_info *rgi);
- virtual int do_update_pos(struct rpl_group_info *rgi);
+ virtual int do_apply_event(rpl_group_info *rgi);
+ virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif
};
@@ -2754,8 +2754,8 @@ class Rand_log_event: public Log_event
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
- virtual int do_apply_event(struct rpl_group_info *rgi);
- virtual int do_update_pos(struct rpl_group_info *rgi);
+ virtual int do_apply_event(rpl_group_info *rgi);
+ virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif
};
@@ -2803,7 +2803,7 @@ class Xid_log_event: public Log_event
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
- virtual int do_apply_event(struct rpl_group_info *rgi);
+ virtual int do_apply_event(rpl_group_info *rgi);
enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif
};
@@ -2870,8 +2870,8 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
- virtual int do_apply_event(struct rpl_group_info *rgi);
- virtual int do_update_pos(struct rpl_group_info *rgi);
+ virtual int do_apply_event(rpl_group_info *rgi);
+ virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif
};
@@ -2905,7 +2905,7 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
- virtual int do_update_pos(struct rpl_group_info *rgi);
+ virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli)
{
/*
@@ -3007,7 +3007,7 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
- virtual int do_update_pos(struct rpl_group_info *rgi);
+ virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif
};
@@ -3119,8 +3119,8 @@ public:
uint16 flags, bool is_transactional, uint64 commit_id);
#ifdef HAVE_REPLICATION
void pack_info(THD *thd, Protocol *protocol);
- virtual int do_apply_event(struct rpl_group_info *rgi);
- virtual int do_update_pos(struct rpl_group_info *rgi);
+ virtual int do_apply_event(rpl_group_info *rgi);
+ virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif
#else
@@ -3249,7 +3249,7 @@ public:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
bool to_packet(String *packet);
bool write(IO_CACHE *file);
- virtual int do_apply_event(struct rpl_group_info *rgi);
+ virtual int do_apply_event(rpl_group_info *rgi);
#endif
static bool peek(const char *event_start, uint32 event_len,
uint8 checksum_alg,
@@ -3328,7 +3328,7 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
- virtual int do_apply_event(struct rpl_group_info *rgi);
+ virtual int do_apply_event(rpl_group_info *rgi);
#endif
};
@@ -3383,7 +3383,7 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
- virtual int do_apply_event(struct rpl_group_info *rgi);
+ virtual int do_apply_event(rpl_group_info *rgi);
#endif
};
@@ -3424,7 +3424,7 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
- virtual int do_apply_event(struct rpl_group_info *rgi);
+ virtual int do_apply_event(rpl_group_info *rgi);
#endif
};
@@ -3464,7 +3464,7 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
- virtual int do_apply_event(struct rpl_group_info *rgi);
+ virtual int do_apply_event(rpl_group_info *rgi);
#endif
};
@@ -3563,7 +3563,7 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
- virtual int do_apply_event(struct rpl_group_info *rgi);
+ virtual int do_apply_event(rpl_group_info *rgi);
#endif
};
@@ -3635,8 +3635,8 @@ public:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
private:
- virtual int do_apply_event(struct rpl_group_info *rgi);
- virtual int do_update_pos(struct rpl_group_info *rgi);
+ virtual int do_apply_event(rpl_group_info *rgi);
+ virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info*);
#endif
@@ -4050,8 +4050,8 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
- virtual int do_apply_event(struct rpl_group_info *rgi);
- virtual int do_update_pos(struct rpl_group_info *rgi);
+ virtual int do_apply_event(rpl_group_info *rgi);
+ virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif
@@ -4278,8 +4278,8 @@ protected:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
- virtual int do_apply_event(struct rpl_group_info *rgi);
- virtual int do_update_pos(struct rpl_group_info *rgi);
+ virtual int do_apply_event(rpl_group_info *rgi);
+ virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
/*
@@ -4612,7 +4612,7 @@ public:
#endif
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
- virtual int do_apply_event(struct rpl_group_info *rgi);
+ virtual int do_apply_event(rpl_group_info *rgi);
#endif
virtual bool write_data_header(IO_CACHE *file);
diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc
index d3e9d47d64a..db1b3fb5a9f 100644
--- a/sql/log_event_old.cc
+++ b/sql/log_event_old.cc
@@ -36,7 +36,7 @@
// Old implementation of do_apply_event()
int
-Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, struct rpl_group_info *rgi)
+Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi)
{
DBUG_ENTER("Old_rows_log_event::do_apply_event(st_relay_log_info*)");
int error= 0;
@@ -1451,7 +1451,7 @@ int Old_rows_log_event::do_add_row_data(uchar *row_data, size_t length)
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
-int Old_rows_log_event::do_apply_event(struct rpl_group_info *rgi)
+int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
{
DBUG_ENTER("Old_rows_log_event::do_apply_event(Relay_log_info*)");
int error= 0;
@@ -1834,7 +1834,7 @@ Old_rows_log_event::do_shall_skip(Relay_log_info *rli)
}
int
-Old_rows_log_event::do_update_pos(struct rpl_group_info *rgi)
+Old_rows_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
DBUG_ENTER("Old_rows_log_event::do_update_pos");
diff --git a/sql/log_event_old.h b/sql/log_event_old.h
index ad51349ef80..7c35b875dc4 100644
--- a/sql/log_event_old.h
+++ b/sql/log_event_old.h
@@ -214,8 +214,8 @@ protected:
private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
- virtual int do_apply_event(struct rpl_group_info *rgi);
- virtual int do_update_pos(struct rpl_group_info *rgi);
+ virtual int do_apply_event(rpl_group_info *rgi);
+ virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
/*
@@ -275,7 +275,7 @@ private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
- int do_apply_event(Old_rows_log_event*, struct rpl_group_info *rgi);
+ int do_apply_event(Old_rows_log_event*, rpl_group_info *rgi);
/*
Primitive to prepare for a sequence of row executions.
@@ -403,7 +403,7 @@ private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
// use old definition of do_apply_event()
- virtual int do_apply_event(struct rpl_group_info *rgi)
+ virtual int do_apply_event(rpl_group_info *rgi)
{ return Old_rows_log_event::do_apply_event(this, rgi); }
// primitives for old version of do_apply_event()
@@ -481,7 +481,7 @@ private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
// use old definition of do_apply_event()
- virtual int do_apply_event(struct rpl_group_info *rgi)
+ virtual int do_apply_event(rpl_group_info *rgi)
{ return Old_rows_log_event::do_apply_event(this, rgi); }
// primitives for old version of do_apply_event()
@@ -556,7 +556,7 @@ private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
// use old definition of do_apply_event()
- virtual int do_apply_event(struct rpl_group_info *rgi)
+ virtual int do_apply_event(rpl_group_info *rgi)
{ return Old_rows_log_event::do_apply_event(this, rgi); }
// primitives for old version of do_apply_event()
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index bc826e9bdb5..a1b14ad3255 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -62,7 +62,7 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid)
int
-rpl_slave_state::record_and_update_gtid(THD *thd, struct rpl_group_info *rgi)
+rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
{
uint64 sub_id;
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 7970f15eb49..7cf2c9162ff 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -62,7 +62,7 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
struct rpl_parallel_thread *rpt)
{
int err;
- struct rpl_group_info *rgi= qev->rgi;
+ rpl_group_info *rgi= qev->rgi;
Relay_log_info *rli= rgi->rli;
THD *thd= rgi->thd;
@@ -128,8 +128,9 @@ handle_rpl_parallel_thread(void *arg)
old_msg= thd->proc_info;
thd->enter_cond(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread,
"Waiting for work from SQL thread");
- while (!rpt->stop && !thd->killed && !(events= rpt->event_queue))
+ while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed)
mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
+ /* Mark that this thread is now executing */
rpt->free= false;
rpt->event_queue= rpt->last_in_queue= NULL;
thd->exit_cond(old_msg);
@@ -145,9 +146,15 @@ handle_rpl_parallel_thread(void *arg)
uint64 wait_start_sub_id;
bool end_of_group;
+ /* Handle a new event group, which will be initiated by a GTID event. */
if (event_type == GTID_EVENT)
{
in_event_group= true;
+ /*
+ If the standalone flag is set, then this event group consists of a
+ single statement (possibly preceeded by some Intvar_log_event and
+ similar), without any terminating COMMIT/ROLLBACK/XID.
+ */
group_standalone=
(0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 &
Gtid_log_event::FL_STANDALONE));
@@ -540,12 +547,12 @@ rpl_parallel::wait_for_done()
bool
-rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev)
+rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
{
rpl_parallel_entry *e;
rpl_parallel_thread *cur_thread;
rpl_parallel_thread::queued_event *qev;
- struct rpl_group_info *rgi;
+ rpl_group_info *rgi;
Relay_log_info *rli= serial_rgi->rli;
enum Log_event_type typ;
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index 304263c3477..adbb1a18526 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -23,7 +23,7 @@ struct rpl_parallel_thread {
struct queued_event {
queued_event *next;
Log_event *ev;
- struct rpl_group_info *rgi;
+ rpl_group_info *rgi;
} *event_queue, *last_in_queue;
};
@@ -59,7 +59,7 @@ struct rpl_parallel_entry {
mysql_mutex_t LOCK_parallel_entry;
mysql_cond_t COND_parallel_entry;
uint64 current_sub_id;
- struct rpl_group_info *current_group_info;
+ rpl_group_info *current_group_info;
/*
The sub_id of the last event group in the previous batch of group-committed
transactions.
@@ -78,7 +78,7 @@ struct rpl_parallel {
~rpl_parallel();
rpl_parallel_entry *find(uint32 domain_id);
void wait_for_done();
- bool do_event(struct rpl_group_info *serial_rgi, Log_event *ev);
+ bool do_event(rpl_group_info *serial_rgi, Log_event *ev);
};
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 73658d10624..49547718230 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -1193,7 +1193,7 @@ bool Relay_log_info::cached_charset_compare(char *charset) const
void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
time_t event_creation_time, THD *thd,
- struct rpl_group_info *rgi)
+ rpl_group_info *rgi)
{
#ifndef DBUG_OFF
extern uint debug_not_change_ts_if_art_event;
@@ -1265,6 +1265,11 @@ void Relay_log_info::cleanup_context(THD *thd, bool error)
{
DBUG_ENTER("Relay_log_info::cleanup_context");
+ /*
+ In parallel replication, different THDs can be used from different
+ parallel threads. But in single-threaded mode, only the THD of the main
+ SQL thread is allowed.
+ */
DBUG_ASSERT(opt_slave_parallel_threads > 0 || sql_thd == thd);
/*
1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
@@ -1552,6 +1557,7 @@ event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
uint64 sub_id= rpl_global_gtid_slave_state.next_subid(gev->domain_id);
if (!sub_id)
{
+ /* Out of memory caused hash insertion to fail. */
return 1;
}
rgi->gtid_sub_id= sub_id;
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 91c5c65d33b..4d954d1c8aa 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -422,7 +422,7 @@ public:
*/
void stmt_done(my_off_t event_log_pos,
time_t event_creation_time, THD *thd,
- struct rpl_group_info *rgi);
+ rpl_group_info *rgi);
/**
@@ -521,10 +521,14 @@ private:
/*
This is data for various state needed to be kept for the processing of
- one event group in the SQL thread.
+ one event group (transaction) during replication.
- For single-threaded replication it is linked from the RLI, for parallel
- replication it is linked into each event group being executed in parallel.
+ In single-threaded replication, there will be one global rpl_group_info and
+ one global Relay_log_info per master connection. They will be linked
+ together.
+
+ In parallel replication, there will be one rpl_group_info object for
+ each running thd. All rpl_group_info will share the same Relay_log_info.
*/
struct rpl_group_info
{
@@ -555,7 +559,7 @@ struct rpl_group_info
for the wrong commit).
*/
uint64 wait_commit_sub_id;
- struct rpl_group_info *wait_commit_group_info;
+ rpl_group_info *wait_commit_group_info;
/*
If non-zero, the event group must wait for this sub_id to be committed
before the execution of the event group is allowed to start.
diff --git a/sql/rpl_utility.cc b/sql/rpl_utility.cc
index f734b95edc1..40fda63f396 100644
--- a/sql/rpl_utility.cc
+++ b/sql/rpl_utility.cc
@@ -1143,7 +1143,7 @@ bool Deferred_log_events::is_empty()
return array.elements == 0;
}
-bool Deferred_log_events::execute(struct rpl_group_info *rgi)
+bool Deferred_log_events::execute(rpl_group_info *rgi)
{
bool res= false;
diff --git a/sql/slave.cc b/sql/slave.cc
index 777ab9c8468..e0cc595213d 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -3019,7 +3019,7 @@ static int has_temporary_error(THD *thd)
ev->update_pos().
*/
int apply_event_and_update_pos(Log_event* ev, THD* thd,
- struct rpl_group_info *rgi,
+ rpl_group_info *rgi,
rpl_parallel_thread *rpt)
{
int exec_res= 0;
diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc
index 1b6713f1bc3..04cb4adcb2c 100644
--- a/sql/sql_binlog.cc
+++ b/sql/sql_binlog.cc
@@ -80,7 +80,7 @@ void mysql_client_binlog_statement(THD* thd)
my_bool have_fd_event= TRUE;
int err;
Relay_log_info *rli;
- struct rpl_group_info *rgi;
+ rpl_group_info *rgi;
rli= thd->rli_fake;
if (!rli)
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 43d810d27d4..66b28c87ac9 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -5666,6 +5666,10 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
waiting_for_commit= false;
else
{
+ /*
+ Put ourself at the head of the waitee's list of transactions that must
+ wait for it to commit first.
+ */
this->next_subsequent_commit= waitee->subsequent_commits_list;
waitee->subsequent_commits_list= this;
}
@@ -5704,7 +5708,7 @@ wait_for_commit::wait_for_prior_commit2()
The waiter needs to lock the waitee to delete itself from the list in
unregister_wait_for_prior_commit(). Thus wakeup_subsequent_commits() can not
- hold its own lock while locking waiters, lest we deadlock.
+ hold its own lock while locking waiters, as this could lead to deadlock.
So we need to prevent unregister_wait_for_prior_commit() running while wakeup
is in progress - otherwise the unregister could complete before the wakeup,
@@ -5727,6 +5731,7 @@ wait_for_commit::wait_for_prior_commit2()
would not be woken up until next wakeup, which could be potentially much
later than necessary.
*/
+
void
wait_for_commit::wakeup_subsequent_commits2()
{
diff --git a/sql/sql_class.h b/sql/sql_class.h
index e7f593db62b..c34c100171d 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -1615,7 +1615,7 @@ struct wait_for_commit
*/
bool waiting_for_commit;
/*
- Flag set when wakeup_subsequent_commits_running() is active, see commonts
+ Flag set when wakeup_subsequent_commits_running() is active, see comments
on that function for details.
*/
bool wakeup_subsequent_commits_running;