summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/log_event.h20
-rw-r--r--sql/mysqld.cc5
-rw-r--r--sql/mysqld.h2
-rw-r--r--sql/rpl_parallel.cc177
-rw-r--r--sql/rpl_parallel.h11
-rw-r--r--sql/rpl_rli.cc26
-rw-r--r--sql/rpl_rli.h27
-rw-r--r--sql/slave.cc70
-rw-r--r--sql/sql_class.cc7
-rw-r--r--sql/sql_class.h1
10 files changed, 258 insertions, 88 deletions
diff --git a/sql/log_event.h b/sql/log_event.h
index 8a60296695b..491666e2fdb 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -1376,6 +1376,26 @@ public:
}
}
+ static bool is_group_event(enum Log_event_type ev_type)
+ {
+ switch (ev_type)
+ {
+ case START_EVENT_V3:
+ case STOP_EVENT:
+ case ROTATE_EVENT:
+ case SLAVE_EVENT:
+ case FORMAT_DESCRIPTION_EVENT:
+ case INCIDENT_EVENT:
+ case HEARTBEAT_LOG_EVENT:
+ case BINLOG_CHECKPOINT_EVENT:
+ case GTID_LIST_EVENT:
+ return false;
+
+ default:
+ return true;
+ }
+ }
+
protected:
/**
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index bbb7c0d67bf..52c754993ac 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -772,7 +772,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_PARTITION_LOCK_auto_inc;
PSI_mutex_key key_RELAYLOG_LOCK_index;
PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
- key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool;
+ key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool, key_LOCK_parallel_entry;
PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
@@ -850,7 +850,8 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_slave_state, "LOCK_slave_state", 0},
{ &key_LOCK_binlog_state, "LOCK_binlog_state", 0},
{ &key_LOCK_rpl_thread, "LOCK_rpl_thread", 0},
- { &key_LOCK_rpl_thread_pool, "LOCK_rpl_thread_pool", 0}
+ { &key_LOCK_rpl_thread_pool, "LOCK_rpl_thread_pool", 0},
+ { &key_LOCK_parallel_entry, "LOCK_parallel_entry", 0}
};
PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
diff --git a/sql/mysqld.h b/sql/mysqld.h
index ed6d05807b0..d3b17cfefe1 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -249,7 +249,7 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc;
extern PSI_mutex_key key_RELAYLOG_LOCK_index;
extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
- key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool;
+ key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool, key_LOCK_parallel_entry;
extern PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 8ea4799e94a..1a6eb9e3d50 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -4,6 +4,51 @@
#include "rpl_mi.h"
+/*
+ Code for optional parallel execution of replicated events on the slave.
+
+ ToDo list:
+
+ - Review every field in Relay_log_info, and all code that accesses it.
+ Split out the necessary parts into rpl_group_info, to avoid conflicts
+ between parallel execution of events. (Such as deferred events ...)
+
+ - Error handling. If we fail in one of multiple parallel executions, we
+ need to make a best effort to complete prior transactions and roll back
+ following transactions, so slave binlog position will be correct.
+
+ - Stopping the slave needs to handle stopping all parallel executions. And
+ the logic in sql_slave_killed() that waits for current event group to
+ complete needs to be extended appropriately...
+
+ - We need some user-configurable limit on how far ahead the SQL thread will
+ fetch and queue events for parallel execution (otherwise if slave gets
+ behind we will fill up memory with pending malloc()'ed events).
+
+ - Fix update of relay-log.info and master.info. In non-GTID replication,
+ they must be serialised to preserve correctness. In GTID replication, we
+ should not update them at all except at slave thread stop.
+
+ - All the waits (eg. in struct wait_for_commit and in
+ rpl_parallel_thread_pool::get_thread()) need to be killable. And on kill,
+ everything needs to be correctly rolled back and stopped in all threads,
+ to ensure a consistent slave replication state.
+
+ - We need some knob on the master to allow the user to deliberately delay
+ commits waiting for more transactions to join group commit, to increase
+ potential for parallel execution on the slave.
+
+ - Handle the case of a partial event group. This occurs when the master
+ crashes in the middle of writing the event group to the binlog. The
+ slave rolls back the transaction; parallel execution needs to be able
+ to deal with this wrt. commit_orderer and such.
+
+ - We should fail if we connect to the master with opt_slave_parallel_threads
+ greater than zero and master does not support GTID. Just to avoid a bunch
+ of potential problems, we won't be able to do any parallel replication
+ in this case anyway.
+*/
+
struct rpl_parallel_thread_pool global_rpl_thread_pool;
@@ -18,13 +63,14 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
thd->rli_slave= rli;
thd->rpl_filter = rli->mi->rpl_filter;
+ /* ToDo: Get rid of rli->group_info, it is not thread safe. */
+ rli->group_info= rgi;
+
/* ToDo: Access to thd, and what about rli, split out a parallel part? */
mysql_mutex_lock(&rli->data_lock);
err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt);
/* ToDo: error handling. */
/* ToDo: also free qev->ev, or hold on to it for a bit if necessary. */
- my_free(rgi);
- rgi= NULL;
}
@@ -90,31 +136,72 @@ handle_rpl_parallel_thread(void *arg)
{
struct rpl_parallel_thread::queued_event *next= events->next;
Log_event_type event_type= events->ev->get_type_code();
+ rpl_group_info *rgi= events->rgi;
+ rpl_parallel_entry *entry= rgi->parallel_entry;
+ uint64 wait_for_sub_id;
+
if (event_type == GTID_EVENT)
{
+ in_event_group= true;
group_standalone=
(0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 &
Gtid_log_event::FL_STANDALONE));
- in_event_group= true;
- }
- else
- {
- if (group_standalone)
+
+ /*
+ Register ourself to wait for the previous commit, if we need to do
+ such registration _and_ that previous commit has not already
+ occured.
+ */
+ if ((wait_for_sub_id= rgi->wait_commit_sub_id))
{
- if (!Log_event::is_part_of_group(event_type))
- in_event_group= false;
+ mysql_mutex_lock(&entry->LOCK_parallel_entry);
+ if (wait_for_sub_id > entry->last_committed_sub_id)
+ {
+ wait_for_commit *waitee=
+ &rgi->wait_commit_group_info->commit_orderer;
+ rgi->commit_orderer.register_wait_for_prior_commit(waitee);
+ }
+ mysql_mutex_unlock(&entry->LOCK_parallel_entry);
}
- else if (event_type == XID_EVENT)
- in_event_group= false;
- else if (event_type == QUERY_EVENT)
+
+ DBUG_ASSERT(!thd->wait_for_commit_ptr);
+ thd->wait_for_commit_ptr= &rgi->commit_orderer;
+ }
+
+ rpt_handle_event(events, thd, rpt);
+
+ if (in_event_group)
+ {
+ if ((group_standalone && !Log_event::is_part_of_group(event_type)) ||
+ event_type == XID_EVENT ||
+ (event_type == QUERY_EVENT &&
+ (!strcmp("COMMIT", ((Query_log_event *)events->ev)->query) ||
+ !strcmp("ROLLBACK", ((Query_log_event *)events->ev)->query))))
{
- Query_log_event *query= static_cast<Query_log_event *>(events->ev);
- if (!strcmp("COMMIT", query->query) ||
- !strcmp("ROLLBACK", query->query))
- in_event_group= false;
+ in_event_group= false;
+
+ rgi->commit_orderer.unregister_wait_for_prior_commit();
+ thd->wait_for_commit_ptr= NULL;
+
+ /*
+ Record that we have finished, so other event groups will no
+ longer attempt to wait for us to commit.
+
+ We can race here with the next transactions, but that is fine, as
+ long as we check that we do not decrease last_committed_sub_id. If
+ this commit is done, then any prior commits will also have been
+ done and also no longer need waiting for.
+ */
+ mysql_mutex_lock(&entry->LOCK_parallel_entry);
+ if (entry->last_committed_sub_id < rgi->gtid_sub_id)
+ entry->last_committed_sub_id= rgi->gtid_sub_id;
+ mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+
+ rgi->commit_orderer.wakeup_subsequent_commits();
+ delete rgi;
}
}
- rpt_handle_event(events, thd, rpt);
+
my_free(events);
events= next;
}
@@ -365,19 +452,17 @@ rpl_parallel::find(uint32 domain_id)
(const uchar *)&domain_id, 0)))
{
/* Allocate a new, empty one. */
- if (!(e= (struct rpl_parallel_entry *)my_malloc(sizeof(*e), MYF(0))))
+ if (!(e= (struct rpl_parallel_entry *)my_malloc(sizeof(*e),
+ MYF(MY_ZEROFILL))))
return NULL;
e->domain_id= domain_id;
- e->last_server_id= 0;
- e->last_seq_no= 0;
- e->last_commit_id= 0;
- e->active= false;
- e->rpl_thread= NULL;
if (my_hash_insert(&domain_hash, (uchar *)e))
{
my_free(e);
return NULL;
}
+ mysql_mutex_init(key_LOCK_parallel_entry, &e->LOCK_parallel_entry,
+ MY_MUTEX_INIT_FAST);
}
return e;
@@ -385,11 +470,15 @@ rpl_parallel::find(uint32 domain_id)
bool
-rpl_parallel::do_event(Relay_log_info *rli, Log_event *ev, THD *parent_thd)
+rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev,
+ THD *parent_thd)
{
rpl_parallel_entry *e;
rpl_parallel_thread *cur_thread;
rpl_parallel_thread::queued_event *qev;
+ struct rpl_group_info *rgi;
+ Relay_log_info *rli= serial_rgi->rli;
+ enum Log_event_type typ;
/* ToDo: what to do with this lock?!? */
mysql_mutex_unlock(&rli->data_lock);
@@ -401,17 +490,17 @@ rpl_parallel::do_event(Relay_log_info *rli, Log_event *ev, THD *parent_thd)
return true;
}
qev->ev= ev;
- qev->rgi= rli->group_info;
- rli->group_info= NULL; /* Avoid conflict with groups applied in parallel */
qev->next= NULL;
- if (ev->get_type_code() == GTID_EVENT)
+ if ((typ= ev->get_type_code()) == GTID_EVENT)
{
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
- if (!(e= find(gtid_ev->domain_id)))
+ if (!(e= find(gtid_ev->domain_id)) ||
+ !(e->current_group_info= rgi= new rpl_group_info(rli)) ||
+ event_group_new_gtid(rgi, gtid_ev))
{
- my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
return true;
}
@@ -448,7 +537,7 @@ rpl_parallel::do_event(Relay_log_info *rli, Log_event *ev, THD *parent_thd)
e->last_commit_id= 0;
}
cur_thread= e->rpl_thread= global_rpl_thread_pool.get_thread(e);
- e->rpl_thread->wait_for= NULL; /* ToDo */
+ rgi->wait_commit_sub_id= 0;
/* get_thread() returns with the LOCK_rpl_thread locked. */
}
else if ((gtid_ev->flags & Gtid_log_event::FL_GROUP_COMMIT_ID) &&
@@ -464,8 +553,8 @@ rpl_parallel::do_event(Relay_log_info *rli, Log_event *ev, THD *parent_thd)
servers in the replication hierarchy.
*/
rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e);
- rpt->wait_for= cur_thread; /* ToDo */
- mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
+ rgi->wait_commit_sub_id= e->current_sub_id;
+ rgi->wait_commit_group_info= e->current_group_info;
e->rpl_thread= cur_thread= rpt;
/* get_thread() returns with the LOCK_rpl_thread locked. */
}
@@ -476,18 +565,25 @@ rpl_parallel::do_event(Relay_log_info *rli, Log_event *ev, THD *parent_thd)
domain, and we have to wait for that to finish before we can start on
the next one. So just re-use the thread.
*/
+ rgi->wait_commit_sub_id= 0;
}
- current= e;
+ e->current_sub_id= rgi->gtid_sub_id;
+ current= rgi->parallel_entry= e;
+ }
+ else if (!Log_event::is_group_event(typ) || !current)
+ {
+ /*
+ Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread.
+ Same for events not preceeded by GTID (we should not see those normally,
+ but they might be from an old master).
+ */
+ qev->rgi= serial_rgi;
+ rpt_handle_event(qev, parent_thd, NULL);
+ return false;
}
else
{
- if (!current)
- {
- /* We have no domain_id yet, just run non-parallel. */
- rpt_handle_event(qev, parent_thd, NULL);
- return false;
- }
cur_thread= current->rpl_thread;
if (cur_thread)
{
@@ -503,9 +599,10 @@ rpl_parallel::do_event(Relay_log_info *rli, Log_event *ev, THD *parent_thd)
{
cur_thread= current->rpl_thread=
global_rpl_thread_pool.get_thread(current);
- cur_thread->wait_for= NULL; /* ToDo */
}
}
+ qev->rgi= current->current_group_info;
+
/*
Queue the event for processing.
*/
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index c5bb39cb6fc..b0367efdea6 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -25,7 +25,6 @@ struct rpl_parallel_thread {
Log_event *ev;
struct rpl_group_info *rgi;
} *event_queue, *last_in_queue;
- rpl_parallel_thread *wait_for; /* ToDo: change this ... */
};
@@ -52,6 +51,14 @@ struct rpl_parallel_entry {
uint64 last_commit_id;
bool active;
rpl_parallel_thread *rpl_thread;
+ /*
+ The sub_id of the last transaction to commit within this domain_id.
+ Must be accessed under LOCK_parallel_entry protection.
+ */
+ uint64 last_committed_sub_id;
+ mysql_mutex_t LOCK_parallel_entry;
+ uint64 current_sub_id;
+ struct rpl_group_info *current_group_info;
};
struct rpl_parallel {
HASH domain_hash;
@@ -60,7 +67,7 @@ struct rpl_parallel {
rpl_parallel();
~rpl_parallel();
rpl_parallel_entry *find(uint32 domain_id);
- bool do_event(Relay_log_info *rli, Log_event *ev, THD *thd);
+ bool do_event(struct rpl_group_info *serial_rgi, Log_event *ev, THD *thd);
};
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 5d5bca1189c..264c0b2cc22 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -113,8 +113,6 @@ Relay_log_info::~Relay_log_info()
mysql_cond_destroy(&sleep_cond);
relay_log.cleanup();
free_annotate_event();
- if (group_info)
- my_free(group_info);
DBUG_VOID_RETURN;
}
@@ -1532,4 +1530,28 @@ end:
DBUG_RETURN(err);
}
+
+rpl_group_info::rpl_group_info(Relay_log_info *rli_)
+ : rli(rli_), gtid_sub_id(0), wait_commit_sub_id(0), wait_commit_group_info(0),
+ parallel_entry(0)
+{
+ bzero(&current_gtid, sizeof(current_gtid));
+}
+
+
+int
+event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
+{
+ uint64 sub_id= rpl_global_gtid_slave_state.next_subid(gev->domain_id);
+ if (!sub_id)
+ {
+ return 1;
+ }
+ rgi->gtid_sub_id= sub_id;
+ rgi->current_gtid.server_id= gev->server_id;
+ rgi->current_gtid.domain_id= gev->domain_id;
+ rgi->current_gtid.seq_no= gev->seq_no;
+ return 0;
+}
+
#endif
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index c02ae6e3adb..f1f96344c65 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -314,7 +314,7 @@ public:
char slave_patternload_file[FN_REFLEN];
size_t slave_patternload_file_size;
- /* Various data related to the currently executing event group. */
+ /* ToDo: We need to remove this, always use the per-transaction one to work with parallel replication. */
struct rpl_group_info *group_info;
rpl_parallel parallel;
@@ -610,6 +610,30 @@ struct rpl_group_info
*/
uint64 gtid_sub_id;
rpl_gtid current_gtid;
+ /*
+ This is used to keep transaction commit order.
+ We will signal this when we commit, and can register it to wait for the
+ commit_orderer of the previous commit to signal us.
+ */
+ wait_for_commit commit_orderer;
+ /*
+ If non-zero, the sub_id of a prior event group whose commit we have to wait
+ for before committing ourselves. Then wait_commit_group_info points to the
+ event group to wait for.
+
+ Before using this, rpl_parallel_entry::last_committed_sub_id should be
+ compared against wait_commit_sub_id. Only if last_committed_sub_id is
+ smaller than wait_commit_sub_id must the wait be done (otherwise the
+ waited-for transaction is already committed, so we would otherwise wait
+ for the wrong commit).
+ */
+ uint64 wait_commit_sub_id;
+ struct rpl_group_info *wait_commit_group_info;
+
+ struct rpl_parallel_entry *parallel_entry;
+
+ rpl_group_info(Relay_log_info *rli);
+ ~rpl_group_info() { };
};
@@ -620,5 +644,6 @@ int init_relay_log_info(Relay_log_info* rli, const char* info_fname);
extern struct rpl_slave_state rpl_global_gtid_slave_state;
int rpl_load_gtid_slave_state(THD *thd);
+int event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev);
#endif /* RPL_RLI_H */
diff --git a/sql/slave.cc b/sql/slave.cc
index ace5c7f837b..072ec90076d 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -3177,7 +3177,8 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
@retval 1 The event was not applied.
*/
-static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
+static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
+ rpl_group_info *serial_rgi)
{
DBUG_ENTER("exec_relay_log_event");
@@ -3201,6 +3202,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
if (ev)
{
int exec_res;
+ Log_event_type typ= ev->get_type_code();
/*
This tests if the position of the beginning of the current event
@@ -3230,8 +3232,8 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
read hanging if the realy log does not have any more events.
*/
DBUG_EXECUTE_IF("incomplete_group_in_relay_log",
- if ((ev->get_type_code() == XID_EVENT) ||
- ((ev->get_type_code() == QUERY_EVENT) &&
+ if ((typ == XID_EVENT) ||
+ ((typ == QUERY_EVENT) &&
strcmp("COMMIT", ((Query_log_event *) ev)->query) == 0))
{
DBUG_ASSERT(thd->transaction.all.modified_non_trans_table);
@@ -3244,11 +3246,25 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
}
if (opt_slave_parallel_threads > 0)
- DBUG_RETURN(rli->parallel.do_event(rli, ev, thd));
+ DBUG_RETURN(rli->parallel.do_event(serial_rgi, ev, thd));
+
+ /*
+ For GTID, allocate a new sub_id for the given domain_id.
+ The sub_id must be allocated in increasing order of binlog order.
+ */
+ if (typ == GTID_EVENT &&
+ event_group_new_gtid(serial_rgi, static_cast<Gtid_log_event *>(ev)))
+ {
+ sql_print_error("Error reading relay log event: %s",
+ "slave SQL thread aborted because of out-of-memory error");
+ mysql_mutex_unlock(&rli->data_lock);
+ delete ev;
+ DBUG_RETURN(1);
+ }
exec_res= apply_event_and_update_pos(ev, thd, rli->group_info, NULL);
- switch (ev->get_type_code()) {
+ switch (typ) {
case FORMAT_DESCRIPTION_EVENT:
/*
Format_description_log_event should not be deleted because it
@@ -4001,6 +4017,7 @@ pthread_handler_t handle_slave_sql(void *arg)
Master_info *mi= ((Master_info*)arg);
Relay_log_info* rli = &mi->rli;
const char *errmsg;
+ rpl_group_info serial_rgi(rli);
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
my_thread_init();
@@ -4205,6 +4222,13 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
}
mysql_mutex_unlock(&rli->data_lock);
+ /*
+ ToDo: Get rid of this, all accesses to rpl_group_info must be made
+ per-worker-thread to work with parallel replication.
+ */
+ if (opt_slave_parallel_threads <= 0)
+ rli->group_info= &serial_rgi;
+
/* Read queries from the IO/THREAD until this thread is killed */
while (!sql_slave_killed(thd,rli))
@@ -4227,7 +4251,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
saved_skip= 0;
}
- if (exec_relay_log_event(thd,rli))
+ if (exec_relay_log_event(thd, rli, &serial_rgi))
{
DBUG_PRINT("info", ("exec_relay_log_event() failed"));
// do not scare the user if SQL thread was simply killed or stopped
@@ -5736,7 +5760,6 @@ static Log_event* next_event(Relay_log_info* rli)
mysql_mutex_t *log_lock = rli->relay_log.get_log_lock();
const char* errmsg=0;
THD* thd = rli->sql_thd;
- struct rpl_group_info *rgi;
DBUG_ENTER("next_event");
DBUG_ASSERT(thd != 0);
@@ -5824,45 +5847,12 @@ static Log_event* next_event(Relay_log_info* rli)
opt_slave_sql_verify_checksum)))
{
- if (!(rgi= rli->group_info))
- {
- if (!(rgi= rli->group_info= (struct rpl_group_info *)
- my_malloc(sizeof(*rgi), MYF(0))))
- {
- errmsg = "slave SQL thread aborted because of out-of-memory error";
- if (hot_log)
- mysql_mutex_unlock(log_lock);
- goto err;
- }
- bzero(rgi, sizeof(*rgi));
- }
- rgi->rli= rli;
DBUG_ASSERT(thd==rli->sql_thd);
/*
read it while we have a lock, to avoid a mutex lock in
inc_event_relay_log_pos()
*/
rli->future_event_relay_log_pos= my_b_tell(cur_log);
- /*
- For GTID, allocate a new sub_id for the given domain_id.
- The sub_id must be allocated in increasing order of binlog order.
- */
- if (ev->get_type_code() == GTID_EVENT)
- {
- Gtid_log_event *gev= static_cast<Gtid_log_event *>(ev);
- uint64 sub_id= rpl_global_gtid_slave_state.next_subid(gev->domain_id);
- if (!sub_id)
- {
- errmsg = "slave SQL thread aborted because of out-of-memory error";
- if (hot_log)
- mysql_mutex_unlock(log_lock);
- goto err;
- }
- rgi->gtid_sub_id= sub_id;
- rgi->current_gtid.server_id= gev->server_id;
- rgi->current_gtid.domain_id= gev->domain_id;
- rgi->current_gtid.seq_no= gev->seq_no;
- }
if (hot_log)
mysql_mutex_unlock(log_lock);
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index fa53b38ab70..aec65dc385c 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -5602,6 +5602,13 @@ wait_for_commit::wait_for_commit()
}
+wait_for_commit::~wait_for_commit()
+{
+ mysql_mutex_destroy(&LOCK_wait_commit);
+ mysql_cond_destroy(&COND_wait_commit);
+}
+
+
void
wait_for_commit::wakeup()
{
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 4e1917f62b7..3b7cfb42ec7 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -1659,6 +1659,7 @@ struct wait_for_commit
void unregister_wait_for_prior_commit2();
wait_for_commit();
+ ~wait_for_commit();
};