summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2013-06-26 12:10:35 +0200
committerunknown <knielsen@knielsen-hq.org>2013-06-26 12:10:35 +0200
commit7e5dc4f074b7d1cee4721e6fa49d6e5628ef793f (patch)
tree1df1473973c1cb97d8d1a4f0a25028d3b74c39cb
parent535de71728ff92747b46e985b339d23b4587a9c4 (diff)
downloadmariadb-git-7e5dc4f074b7d1cee4721e6fa49d6e5628ef793f.tar.gz
MDEV-4506: Parallel replication. Intermediate commit.
Implement facility for the commit in one thread to wait for the commit of another to complete first. The wait is done in a way that does not hinder that a waiter and a waitee can group commit together with a single fsync() in both binlog and InnoDB. The wait is done efficiently with respect to locking. The patch was originally made to support TaoBao parallel replication with in-order commit; now it will be adapted to also be used for parallel replication of group-committed transactions. A waiter THD registers itself with a prior waitee THD. The waiter will then complete its commit at the earliest in the same group commit of the waitee (when using binlog). The wait can also be done explicitly by the waitee.
-rw-r--r--include/mysql/plugin.h35
-rw-r--r--include/mysql/plugin_audit.h.pp1
-rw-r--r--include/mysql/plugin_auth.h.pp1
-rw-r--r--include/mysql/plugin_ftparser.h.pp1
-rw-r--r--sql/handler.cc8
-rw-r--r--sql/log.cc193
-rw-r--r--sql/log.h13
-rw-r--r--sql/mysqld.cc7
-rw-r--r--sql/mysqld.h5
-rw-r--r--sql/sql_class.cc208
-rw-r--r--sql/sql_class.h122
-rw-r--r--storage/innobase/handler/ha_innodb.cc5
-rw-r--r--storage/xtradb/handler/ha_innodb.cc5
13 files changed, 586 insertions, 18 deletions
diff --git a/include/mysql/plugin.h b/include/mysql/plugin.h
index 38573180232..ab72a9d106b 100644
--- a/include/mysql/plugin.h
+++ b/include/mysql/plugin.h
@@ -683,6 +683,41 @@ void *thd_get_ha_data(const MYSQL_THD thd, const struct handlerton *hton);
*/
void thd_set_ha_data(MYSQL_THD thd, const struct handlerton *hton,
const void *ha_data);
+
+
+/**
+ Signal that the first part of handler commit is finished, and that the
+ committed transaction is now visible and has fixed commit ordering with
+ respect to other transactions. The commit need _not_ be durable yet, and
+ typically will not be when this call makes sense.
+
+ This call is optional, if the storage engine does not call it the upper
+ layer will after the handler commit() method is done. However, the storage
+ engine may choose to call it itself to increase the possibility for group
+ commit.
+
+ In-order parallel replication uses this to apply different transaction in
+ parallel, but delay the commits of later transactions until earlier
+ transactions have committed first, thus achieving increased performance on
+ multi-core systems while still preserving full transaction consistency.
+
+ The storage engine can call this from within the commit() method, typically
+ after the commit record has been written to the transaction log, but before
+ the log has been fsync()'ed. This will allow the next replicated transaction
+ to proceed to commit before the first one has done fsync() or similar. Thus,
+ it becomes possible for multiple sequential replicated transactions to share
+ a single fsync() inside the engine in group commit.
+
+ Note that this method should _not_ be called from within the commit_ordered()
+ method, or any other place in the storage engine. When commit_ordered() is
+ used (typically when binlog is enabled), the transaction coordinator takes
+ care of this and makes group commit in the storage engine possible without
+ any other action needed on the part of the storage engine. This function
+ thd_wakeup_subsequent_commits() is only needed when no transaction
+ coordinator is used, meaning a single storage engine and no binary log.
+*/
+void thd_wakeup_subsequent_commits(MYSQL_THD thd);
+
#ifdef __cplusplus
}
#endif
diff --git a/include/mysql/plugin_audit.h.pp b/include/mysql/plugin_audit.h.pp
index d630359f5fe..564dd6272f5 100644
--- a/include/mysql/plugin_audit.h.pp
+++ b/include/mysql/plugin_audit.h.pp
@@ -236,6 +236,7 @@ void mysql_query_cache_invalidate4(void* thd,
void *thd_get_ha_data(const void* thd, const struct handlerton *hton);
void thd_set_ha_data(void* thd, const struct handlerton *hton,
const void *ha_data);
+void thd_wakeup_subsequent_commits(void* thd);
struct mysql_event_general
{
unsigned int event_subclass;
diff --git a/include/mysql/plugin_auth.h.pp b/include/mysql/plugin_auth.h.pp
index 6a877980c25..edfd7095203 100644
--- a/include/mysql/plugin_auth.h.pp
+++ b/include/mysql/plugin_auth.h.pp
@@ -236,6 +236,7 @@ void mysql_query_cache_invalidate4(void* thd,
void *thd_get_ha_data(const void* thd, const struct handlerton *hton);
void thd_set_ha_data(void* thd, const struct handlerton *hton,
const void *ha_data);
+void thd_wakeup_subsequent_commits(void* thd);
#include <mysql/plugin_auth_common.h>
typedef struct st_plugin_vio_info
{
diff --git a/include/mysql/plugin_ftparser.h.pp b/include/mysql/plugin_ftparser.h.pp
index ab15c9d176d..0cc51e259dc 100644
--- a/include/mysql/plugin_ftparser.h.pp
+++ b/include/mysql/plugin_ftparser.h.pp
@@ -189,6 +189,7 @@ void mysql_query_cache_invalidate4(void* thd,
void *thd_get_ha_data(const void* thd, const struct handlerton *hton);
void thd_set_ha_data(void* thd, const struct handlerton *hton,
const void *ha_data);
+void thd_wakeup_subsequent_commits(void* thd);
enum enum_ftparser_mode
{
MYSQL_FTPARSER_SIMPLE_MODE= 0,
diff --git a/sql/handler.cc b/sql/handler.cc
index 660697cd74b..25b2ee13187 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -1455,6 +1455,8 @@ int ha_commit_one_phase(THD *thd, bool all)
*/
bool is_real_trans=all || thd->transaction.all.ha_list == 0;
DBUG_ENTER("ha_commit_one_phase");
+ if (is_real_trans)
+ thd->wait_for_prior_commit();
int res= commit_one_phase_2(thd, all, trans, is_real_trans);
DBUG_RETURN(res);
}
@@ -1494,7 +1496,10 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
}
/* Free resources and perform other cleanup even for 'empty' transactions. */
if (is_real_trans)
+ {
+ thd->wakeup_subsequent_commits();
thd->transaction.cleanup();
+ }
DBUG_RETURN(error);
}
@@ -1569,7 +1574,10 @@ int ha_rollback_trans(THD *thd, bool all)
}
/* Always cleanup. Even if nht==0. There may be savepoints. */
if (is_real_trans)
+ {
+ thd->wakeup_subsequent_commits();
thd->transaction.cleanup();
+ }
if (all)
thd->transaction_rollback_request= FALSE;
diff --git a/sql/log.cc b/sql/log.cc
index d312f4bc936..e3eb5f9a331 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -6542,44 +6542,199 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
}
bool
-MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
+MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry,
+ wait_for_commit *wfc)
{
+ group_commit_entry *orig_queue;
+ wait_for_commit *list, *cur, *last;
+
/*
To facilitate group commit for the binlog, we first queue up ourselves in
the group commit queue. Then the first thread to enter the queue waits for
the LOCK_log mutex, and commits for everyone in the queue once it gets the
lock. Any other threads in the queue just wait for the first one to finish
the commit and wake them up.
+
+ To support in-order parallel replication with group commit, after we add
+ some transaction to the queue, we check if there were other transactions
+ already prepared to commit but just waiting for the first one to commit.
+ If so, we add those to the queue as well, transitively for all waiters.
*/
entry->thd->clear_wakeup_ready();
mysql_mutex_lock(&LOCK_prepare_ordered);
- group_commit_entry *orig_queue= group_commit_queue;
- entry->next= orig_queue;
- group_commit_queue= entry;
+ orig_queue= group_commit_queue;
+
+ /*
+ Iteratively process everything added to the queue, looking for waiters,
+ and their waiters, and so on. If a waiter is ready to commit, we
+ immediately add it to the queue; if not we just wake it up.
+
+ This would be natural to do with recursion, but we want to avoid
+ potentially unbounded recursion blowing the C stack, so we use the list
+ approach instead.
+ */
+ list= wfc;
+ cur= list;
+ last= list;
+ for (;;)
+ {
+ /* Add the entry to the group commit queue. */
+ entry->next= group_commit_queue;
+ group_commit_queue= entry;
+
+ if (entry->cache_mngr->using_xa)
+ {
+ DEBUG_SYNC(entry->thd, "commit_before_prepare_ordered");
+ run_prepare_ordered(entry->thd, entry->all);
+ DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered");
+ }
+
+ if (!cur)
+ break; // Can happen if initial entry has no wait_for_commit
+
+ if (cur->subsequent_commits_list)
+ {
+ bool have_lock;
+ wait_for_commit *waiter;
+
+ mysql_mutex_lock(&cur->LOCK_wait_commit);
+ have_lock= true;
+ waiter= cur->subsequent_commits_list;
+ /* Check again, now safely under lock. */
+ if (waiter)
+ {
+ /* Grab the list of waiters and process it. */
+ cur->subsequent_commits_list= NULL;
+ do
+ {
+ wait_for_commit *next= waiter->next_subsequent_commit;
+ group_commit_entry *entry2=
+ (group_commit_entry *)waiter->opaque_pointer;
+ if (entry2)
+ {
+ /*
+ This is another transaction ready to be written to the binary
+ log. We can put it into the queue directly, without needing a
+ separate context switch to the other thread. We just set a flag
+ so that the other thread will know when it wakes up that it was
+ already processed.
+
+ So put it at the end of the list to be processed in a subsequent
+ iteration of the outer loop.
+ */
+ entry2->queued_by_other= true;
+ last->next_subsequent_commit= waiter;
+ last= waiter;
+ /*
+ As a small optimisation, we do not actually need to set
+ waiter->next_subsequent_commit to NULL, as we can use the
+ pointer `last' to check for end-of-list.
+ */
+ }
+ else
+ {
+ /*
+ Wake up the waiting transaction.
+
+ For this, we need to set the "wakeup running" flag and release
+ the waitee lock to avoid a deadlock, see comments on
+ THD::wakeup_subsequent_commits2() for details.
+ */
+ if (have_lock)
+ {
+ cur->wakeup_subsequent_commits_running= true;
+ mysql_mutex_unlock(&cur->LOCK_wait_commit);
+ have_lock= false;
+ }
+ waiter->wakeup();
+ }
+ waiter= next;
+ } while (waiter);
+ }
+ if (have_lock)
+ mysql_mutex_unlock(&cur->LOCK_wait_commit);
+ }
+ if (cur == last)
+ break;
+ cur= cur->next_subsequent_commit;
+ entry= (group_commit_entry *)cur->opaque_pointer;
+ DBUG_ASSERT(entry != NULL);
+ }
- if (entry->cache_mngr->using_xa)
+ /* Now we need to clear the wakeup_subsequent_commits_running flags. */
+ if (list)
{
- DEBUG_SYNC(entry->thd, "commit_before_prepare_ordered");
- run_prepare_ordered(entry->thd, entry->all);
- DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered");
+ for (;;)
+ {
+ if (list->wakeup_subsequent_commits_running)
+ {
+ mysql_mutex_lock(&list->LOCK_wait_commit);
+ list->wakeup_subsequent_commits_running= false;
+ mysql_mutex_unlock(&list->LOCK_wait_commit);
+ }
+ if (list == last)
+ break;
+ list= list->next_subsequent_commit;
+ }
}
+
mysql_mutex_unlock(&LOCK_prepare_ordered);
DEBUG_SYNC(entry->thd, "commit_after_release_LOCK_prepare_ordered");
+ return orig_queue == NULL;
+}
+
+bool
+MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
+{
+ wait_for_commit *wfc;
+ bool is_leader;
+
+ wfc= entry->thd->wait_for_commit_ptr;
+ entry->queued_by_other= false;
+ if (wfc && wfc->waiting_for_commit)
+ {
+ mysql_mutex_lock(&wfc->LOCK_wait_commit);
+ /* Do an extra check here, this time safely under lock. */
+ if (wfc->waiting_for_commit)
+ {
+ wfc->opaque_pointer= entry;
+ do
+ {
+ mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
+ } while (wfc->waiting_for_commit);
+ wfc->opaque_pointer= NULL;
+ }
+ mysql_mutex_unlock(&wfc->LOCK_wait_commit);
+ }
+
+ if (entry->queued_by_other)
+ is_leader= false;
+ else
+ is_leader= queue_for_group_commit(entry, wfc);
+
/*
- The first in the queue handle group commit for all; the others just wait
+ The first in the queue handles group commit for all; the others just wait
to be signalled when group commit is done.
*/
- if (orig_queue != NULL)
+ if (is_leader)
+ trx_group_commit_leader(entry);
+ else if (!entry->queued_by_other)
entry->thd->wait_for_wakeup_ready();
else
- trx_group_commit_leader(entry);
+ {
+ /*
+ If we were queued by another prior commit, then we are woken up
+ only when the leader has already completed the commit for us.
+ So nothing to do here then.
+ */
+ }
if (!opt_optimize_thread_scheduling)
{
/* For the leader, trx_group_commit_leader() already took the lock. */
- if (orig_queue != NULL)
+ if (!is_leader)
mysql_mutex_lock(&LOCK_commit_ordered);
DEBUG_SYNC(entry->thd, "commit_loop_entry_commit_ordered");
@@ -6598,7 +6753,10 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
if (next)
{
- next->thd->signal_wakeup_ready();
+ if (next->queued_by_other)
+ next->thd->wait_for_commit_ptr->wakeup();
+ else
+ next->thd->signal_wakeup_ready();
}
else
{
@@ -6884,7 +7042,12 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
*/
next= current->next;
if (current != leader) // Don't wake up ourself
- current->thd->signal_wakeup_ready();
+ {
+ if (current->queued_by_other)
+ current->thd->wait_for_commit_ptr->wakeup();
+ else
+ current->thd->signal_wakeup_ready();
+ }
current= next;
}
DEBUG_SYNC(leader->thd, "commit_after_group_run_commit_ordered");
@@ -7514,6 +7677,8 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all,
mysql_mutex_unlock(&LOCK_prepare_ordered);
}
+ thd->wait_for_prior_commit();
+
cookie= 0;
if (xid)
cookie= log_one_transaction(xid);
diff --git a/sql/log.h b/sql/log.h
index 0b1344aa523..2345f0acf9c 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -45,6 +45,15 @@ class TC_LOG
virtual int open(const char *opt_name)=0;
virtual void close()=0;
+ /*
+ Transaction coordinator 2-phase commit.
+
+ Must invoke the run_prepare_ordered and run_commit_ordered methods, as
+ described below for these methods.
+
+ In addition, must invoke THD::wait_for_prior_commit(), or equivalent
+ wait, to ensure that one commit waits for another if registered to do so.
+ */
virtual int log_and_order(THD *thd, my_xid xid, bool all,
bool need_prepare_ordered,
bool need_commit_ordered) = 0;
@@ -397,6 +406,7 @@ private:
class binlog_cache_mngr;
struct rpl_gtid;
+class wait_for_commit;
class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
{
private:
@@ -445,6 +455,8 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
group commit, only used when opt_optimize_thread_scheduling is not set.
*/
bool check_purge;
+ /* Flag used to optimise around wait_for_prior_commit. */
+ bool queued_by_other;
ulong binlog_id;
};
@@ -526,6 +538,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
void do_checkpoint_request(ulong binlog_id);
void purge();
int write_transaction_or_stmt(group_commit_entry *entry, uint64 commit_id);
+ bool queue_for_group_commit(group_commit_entry *entry, wait_for_commit *wfc);
bool write_transaction_to_binlog_events(group_commit_entry *entry);
void trx_group_commit_leader(group_commit_entry *leader);
bool is_xidlist_idle_nolock();
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 4e2679f1c91..bbb7c0d67bf 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -777,7 +777,7 @@ PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
key_LOCK_global_index_stats,
- key_LOCK_wakeup_ready;
+ key_LOCK_wakeup_ready, key_LOCK_wait_commit;
PSI_mutex_key key_LOCK_rpl_gtid_state;
@@ -825,6 +825,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_global_index_stats, "LOCK_global_index_stats", PSI_FLAG_GLOBAL},
{ &key_LOCK_wakeup_ready, "THD::LOCK_wakeup_ready", 0},
{ &key_LOCK_rpl_gtid_state, "LOCK_rpl_gtid_state", PSI_FLAG_GLOBAL},
+ { &key_LOCK_wait_commit, "wait_for_commit::LOCK_wait_commit", 0},
{ &key_LOCK_thd_data, "THD::LOCK_thd_data", 0},
{ &key_LOCK_user_conn, "LOCK_user_conn", PSI_FLAG_GLOBAL},
{ &key_LOCK_uuid_short_generator, "LOCK_uuid_short_generator", PSI_FLAG_GLOBAL},
@@ -888,7 +889,8 @@ PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond,
key_TABLE_SHARE_cond, key_user_level_lock_cond,
key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache,
key_BINLOG_COND_queue_busy;
-PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready;
+PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
+ key_COND_wait_commit;
PSI_cond_key key_RELAYLOG_COND_queue_busy;
PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool;
@@ -912,6 +914,7 @@ static PSI_cond_info all_server_conds[]=
{ &key_RELAYLOG_update_cond, "MYSQL_RELAY_LOG::update_cond", 0},
{ &key_RELAYLOG_COND_queue_busy, "MYSQL_RELAY_LOG::COND_queue_busy", 0},
{ &key_COND_wakeup_ready, "THD::COND_wakeup_ready", 0},
+ { &key_COND_wait_commit, "wait_for_commit::COND_wait_commit", 0},
{ &key_COND_cache_status_changed, "Query_cache::COND_cache_status_changed", 0},
{ &key_COND_manager, "COND_manager", PSI_FLAG_GLOBAL},
{ &key_COND_rpl_status, "COND_rpl_status", PSI_FLAG_GLOBAL},
diff --git a/sql/mysqld.h b/sql/mysqld.h
index ff2dfffa991..ed6d05807b0 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -253,7 +253,7 @@ extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
extern PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
- key_LOCK_global_index_stats, key_LOCK_wakeup_ready;
+ key_LOCK_global_index_stats, key_LOCK_wakeup_ready, key_LOCK_wait_commit;
extern PSI_mutex_key key_LOCK_rpl_gtid_state;
@@ -279,7 +279,8 @@ extern PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond,
key_relay_log_info_sleep_cond,
key_TABLE_SHARE_cond, key_user_level_lock_cond,
key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache;
-extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready;
+extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
+ key_COND_wait_commit;
extern PSI_cond_key key_RELAYLOG_COND_queue_busy;
extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool;
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 8e91c4d7901..fa53b38ab70 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -605,6 +605,17 @@ void thd_set_ha_data(THD *thd, const struct handlerton *hton,
}
+/**
+ Allow storage engine to wakeup commits waiting in THD::wait_for_prior_commit.
+ @see thd_wakeup_subsequent_commits() definition in plugin.h
+*/
+extern "C"
+void thd_wakeup_subsequent_commits(THD *thd)
+{
+ thd->wakeup_subsequent_commits();
+}
+
+
extern "C"
long long thd_test_options(const THD *thd, long long test_options)
{
@@ -788,6 +799,7 @@ THD::THD()
#if defined(ENABLED_DEBUG_SYNC)
debug_sync_control(0),
#endif /* defined(ENABLED_DEBUG_SYNC) */
+ wait_for_commit_ptr(0),
main_warning_info(0, false, false)
{
ulong tmp;
@@ -5580,6 +5592,202 @@ THD::signal_wakeup_ready()
}
+wait_for_commit::wait_for_commit()
+ : subsequent_commits_list(0), next_subsequent_commit(0), waitee(0),
+ opaque_pointer(0),
+ waiting_for_commit(false), wakeup_subsequent_commits_running(false)
+{
+ mysql_mutex_init(key_LOCK_wait_commit, &LOCK_wait_commit, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wait_commit, &COND_wait_commit, 0);
+}
+
+
+void
+wait_for_commit::wakeup()
+{
+ /*
+ We signal each waiter on their own condition and mutex (rather than using
+ pthread_cond_broadcast() or something like that).
+
+ Otherwise we would need to somehow ensure that they were done
+ waking up before we could allow this THD to be destroyed, which would
+ be annoying and unnecessary.
+ */
+ mysql_mutex_lock(&LOCK_wait_commit);
+ waiting_for_commit= false;
+ mysql_cond_signal(&COND_wait_commit);
+ mysql_mutex_unlock(&LOCK_wait_commit);
+}
+
+
+/*
+ Register that the next commit of this THD should wait to complete until
+ commit in another THD (the waitee) has completed.
+
+ The wait may occur explicitly, with the waiter sitting in
+ wait_for_prior_commit() until the waitee calls wakeup_subsequent_commits().
+
+ Alternatively, the TC (eg. binlog) may do the commits of both waitee and
+ waiter at once during group commit, resolving both of them in the right
+ order.
+
+ Only one waitee can be registered for a waiter; it must be removed by
+ wait_for_prior_commit() or unregister_wait_for_prior_commit() before a new
+ one is registered. But it is ok for several waiters to register a wait for
+ the same waitee. It is also permissible for one THD to be both a waiter and
+ a waitee at the same time.
+*/
+void
+wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
+{
+ waiting_for_commit= true;
+ DBUG_ASSERT(!this->waitee /* No prior registration allowed */);
+ this->waitee= waitee;
+
+ mysql_mutex_lock(&waitee->LOCK_wait_commit);
+ /*
+ If waitee is in the middle of wakeup, then there is nothing to wait for,
+ so we need not register. This is necessary to avoid a race in unregister,
+ see comments on wakeup_subsequent_commits2() for details.
+ */
+ if (waitee->wakeup_subsequent_commits_running)
+ waiting_for_commit= false;
+ else
+ {
+ this->next_subsequent_commit= waitee->subsequent_commits_list;
+ waitee->subsequent_commits_list= this;
+ }
+ mysql_mutex_unlock(&waitee->LOCK_wait_commit);
+}
+
+
+/*
+ Wait for commit of another transaction to complete, as already registered
+ with register_wait_for_prior_commit(). If the commit already completed,
+ returns immediately.
+*/
+void
+wait_for_commit::wait_for_prior_commit2()
+{
+ mysql_mutex_lock(&LOCK_wait_commit);
+ while (waiting_for_commit)
+ mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
+ mysql_mutex_unlock(&LOCK_wait_commit);
+ waitee= NULL;
+}
+
+
+/*
+ Wakeup anyone waiting for us to have committed.
+
+ Note about locking:
+
+ We have a potential race or deadlock between wakeup_subsequent_commits() in
+ the waitee and unregister_wait_for_prior_commit() in the waiter.
+
+ Both waiter and waitee needs to take their own lock before it is safe to take
+ a lock on the other party - else the other party might disappear and invalid
+ memory data could be accessed. But if we take the two locks in different
+ order, we may end up in a deadlock.
+
+ The waiter needs to lock the waitee to delete itself from the list in
+ unregister_wait_for_prior_commit(). Thus wakeup_subsequent_commits() can not
+ hold its own lock while locking waiters, lest we deadlock.
+
+ So we need to prevent unregister_wait_for_prior_commit() running while wakeup
+ is in progress - otherwise the unregister could complete before the wakeup,
+ leading to incorrect spurious wakeup or accessing invalid memory.
+
+ However, if we are in the middle of running wakeup_subsequent_commits(), then
+ there is no need for unregister_wait_for_prior_commit() in the first place -
+ the waiter can just do a normal wait_for_prior_commit(), as it will be
+ immediately woken up.
+
+ So the solution to the potential race/deadlock is to set a flag in the waitee
+ that wakeup_subsequent_commits() is in progress. When this flag is set,
+ unregister_wait_for_prior_commit() becomes just wait_for_prior_commit().
+
+ Then also register_wait_for_prior_commit() needs to check if
+ wakeup_subsequent_commits() is running, and skip the registration if
+ so. This is needed in case a new waiter manages to register itself and
+ immediately try to unregister while wakeup_subsequent_commits() is
+ running. Else the new waiter would also wait rather than unregister, but it
+ would not be woken up until next wakeup, which could be potentially much
+ later than necessary.
+*/
+void
+wait_for_commit::wakeup_subsequent_commits2()
+{
+ wait_for_commit *waiter;
+
+ mysql_mutex_lock(&LOCK_wait_commit);
+ wakeup_subsequent_commits_running= true;
+ waiter= subsequent_commits_list;
+ subsequent_commits_list= NULL;
+ mysql_mutex_unlock(&LOCK_wait_commit);
+
+ while (waiter)
+ {
+ /*
+ Important: we must grab the next pointer before waking up the waiter;
+ once the wakeup is done, the field could be invalidated at any time.
+ */
+ wait_for_commit *next= waiter->next_subsequent_commit;
+ waiter->wakeup();
+ waiter= next;
+ }
+
+ mysql_mutex_lock(&LOCK_wait_commit);
+ wakeup_subsequent_commits_running= false;
+ mysql_mutex_unlock(&LOCK_wait_commit);
+}
+
+
+/* Cancel a previously registered wait for another THD to commit before us. */
+void
+wait_for_commit::unregister_wait_for_prior_commit2()
+{
+ mysql_mutex_lock(&LOCK_wait_commit);
+ if (waiting_for_commit)
+ {
+ wait_for_commit *loc_waitee= this->waitee;
+ wait_for_commit **next_ptr_ptr, *cur;
+ mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
+ if (loc_waitee->wakeup_subsequent_commits_running)
+ {
+ /*
+ When a wakeup is running, we cannot safely remove ourselves from the
+ list without corrupting it. Instead we can just wait, as wakeup is
+ already in progress and will thus be immediate.
+
+ See comments on wakeup_subsequent_commits2() for more details.
+ */
+ mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+ while (waiting_for_commit)
+ mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
+ }
+ else
+ {
+ /* Remove ourselves from the list in the waitee. */
+ next_ptr_ptr= &loc_waitee->subsequent_commits_list;
+ while ((cur= *next_ptr_ptr) != NULL)
+ {
+ if (cur == this)
+ {
+ *next_ptr_ptr= this->next_subsequent_commit;
+ break;
+ }
+ next_ptr_ptr= &cur->next_subsequent_commit;
+ }
+ waiting_for_commit= false;
+ mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+ }
+ }
+ mysql_mutex_unlock(&LOCK_wait_commit);
+ this->waitee= NULL;
+}
+
+
bool Discrete_intervals_list::append(ulonglong start, ulonglong val,
ulonglong incr)
{
diff --git a/sql/sql_class.h b/sql/sql_class.h
index bb5b2c4e775..4e1917f62b7 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -1553,6 +1553,115 @@ private:
};
+/*
+ Class to facilitate the commit of one transactions waiting for the commit of
+ another transaction to complete first.
+
+ This is used during (parallel) replication, to allow different transactions
+ to be applied in parallel, but still commit in order.
+
+ The transaction that wants to wait for a prior commit must first register
+ to wait with register_wait_for_prior_commit(waitee). Such registration
+ must be done holding the waitee->LOCK_wait_commit, to prevent the other
+ THD from disappearing during the registration.
+
+ Then during commit, if a THD is registered to wait, it will call
+ wait_for_prior_commit() as part of ha_commit_trans(). If no wait is
+ registered, or if the waitee for has already completed commit, then
+ wait_for_prior_commit() returns immediately.
+
+ And when a THD that may be waited for has completed commit (more precisely
+ commit_ordered()), then it must call wakeup_subsequent_commits() to wake
+ up any waiters. Note that this must be done at a point that is guaranteed
+ to be later than any waiters registering themselves. It is safe to call
+ wakeup_subsequent_commits() multiple times, as waiters are removed from
+ registration as part of the wakeup.
+
+ The reason for separate register and wait calls is that this allows to
+ register the wait early, at a point where the waited-for THD is known to
+ exist. And then the actual wait can be done much later, where the
+ waited-for THD may have been long gone. By registering early, the waitee
+ can signal before disappearing.
+*/
+struct wait_for_commit
+{
+ /*
+ The LOCK_wait_commit protects the fields subsequent_commits_list and
+ wakeup_subsequent_commits_running (for a waitee), and the flag
+ waiting_for_commit and associated COND_wait_commit (for a waiter).
+ */
+ mysql_mutex_t LOCK_wait_commit;
+ mysql_cond_t COND_wait_commit;
+ /* List of threads that did register_wait_for_prior_commit() on us. */
+ wait_for_commit *subsequent_commits_list;
+ /* Link field for entries in subsequent_commits_list. */
+ wait_for_commit *next_subsequent_commit;
+ /* Our waitee, if we did register_wait_for_prior_commit(), else NULL. */
+ wait_for_commit *waitee;
+ /*
+ Generic pointer for use by the transaction coordinator to optimise the
+ waiting for improved group commit.
+
+ Currently used by binlog TC to signal that a waiter is ready to commit, so
+ that the waitee can grab it and group commit it directly. It is free to be
+ used by another transaction coordinator for similar purposes.
+ */
+ void *opaque_pointer;
+ /*
+ The waiting_for_commit flag is cleared when a waiter has been woken
+ up. The COND_wait_commit condition is signalled when this has been
+ cleared.
+ */
+ bool waiting_for_commit;
+ /*
+ Flag set when wakeup_subsequent_commits_running() is active, see commonts
+ on that function for details.
+ */
+ bool wakeup_subsequent_commits_running;
+
+ void register_wait_for_prior_commit(wait_for_commit *waitee);
+ void wait_for_prior_commit()
+ {
+ /*
+ Quick inline check, to avoid function call and locking in the common case
+ where no wakeup is registered, or a registered wait was already signalled.
+ */
+ if (waiting_for_commit)
+ wait_for_prior_commit2();
+ }
+ void wakeup_subsequent_commits()
+ {
+ /*
+ Do the check inline, so only the wakeup case takes the cost of a function
+ call for every commmit.
+
+ Note that the check is done without locking. It is the responsibility of
+ the user of the wakeup facility to ensure that no waiters can register
+ themselves after the last call to wakeup_subsequent_commits().
+
+ This avoids having to take another lock for every commit, which would be
+ pointless anyway - even if we check under lock, there is nothing to
+ prevent a waiter from arriving just after releasing the lock.
+ */
+ if (subsequent_commits_list)
+ wakeup_subsequent_commits2();
+ }
+ void unregister_wait_for_prior_commit()
+ {
+ if (waiting_for_commit)
+ unregister_wait_for_prior_commit2();
+ }
+
+ void wakeup();
+
+ void wait_for_prior_commit2();
+ void wakeup_subsequent_commits2();
+ void unregister_wait_for_prior_commit2();
+
+ wait_for_commit();
+};
+
+
extern "C" void my_message_sql(uint error, const char *str, myf MyFlags);
class THD;
@@ -3194,6 +3303,19 @@ public:
void wait_for_wakeup_ready();
/* Wake this thread up from wait_for_wakeup_ready(). */
void signal_wakeup_ready();
+
+ wait_for_commit *wait_for_commit_ptr;
+ void wait_for_prior_commit()
+ {
+ if (wait_for_commit_ptr)
+ wait_for_commit_ptr->wait_for_prior_commit();
+ }
+ void wakeup_subsequent_commits()
+ {
+ if (wait_for_commit_ptr)
+ wait_for_commit_ptr->wakeup_subsequent_commits();
+ }
+
private:
/** The current internal error handler for this thread, or NULL. */
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index 4b77936550b..4d4bb7bd1f3 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -2924,6 +2924,11 @@ innobase_commit(
/* We were instructed to commit the whole transaction, or
this is an SQL statement end and autocommit is on */
+ /* At this point commit order is fixed and transaction is
+ visible to others. So we can wakeup other commits waiting for
+ this one, to allow then to group commit with us. */
+ thd_wakeup_subsequent_commits(thd);
+
/* We did the first part already in innobase_commit_ordered(),
Now finish by doing a write + flush of logs. */
trx_commit_complete_for_mysql(trx);
diff --git a/storage/xtradb/handler/ha_innodb.cc b/storage/xtradb/handler/ha_innodb.cc
index f7507a04412..e80810d3948 100644
--- a/storage/xtradb/handler/ha_innodb.cc
+++ b/storage/xtradb/handler/ha_innodb.cc
@@ -3585,6 +3585,11 @@ innobase_commit(
/* We were instructed to commit the whole transaction, or
this is an SQL statement end and autocommit is on */
+ /* At this point commit order is fixed and transaction is
+ visible to others. So we can wakeup other commits waiting for
+ this one, to allow then to group commit with us. */
+ thd_wakeup_subsequent_commits(thd);
+
/* We did the first part already in innobase_commit_ordered(),
Now finish by doing a write + flush of logs. */
trx_commit_complete_for_mysql(trx);