summaryrefslogtreecommitdiff
path: root/sql/log.cc
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2010-10-29 13:58:47 +0200
committerunknown <knielsen@knielsen-hq.org>2010-10-29 13:58:47 +0200
commit5614ebe7ed8e56cbd345158395c1c1930b0752d1 (patch)
treebfa51ddf7459c7333d3a335c97af811dc0f05e0e /sql/log.cc
parentb91ad17cea8572f7cd21fb97d0b0ddce9f8b3c46 (diff)
downloadmariadb-git-5614ebe7ed8e56cbd345158395c1c1930b0752d1.tar.gz
MWL#116: after-architecture-review code refactoring and cleanup.
Remove the extra class hierarchy with classes TC_LOG_queued, TC_LOG_unordered, and TC_LOG_group_commit, folding the code into the TC_LOG_MMAP and TC_LOG_BINLOG classes. In particular TC_LOG_BINLOG is greatly simplified by this, unifying the code path for transactional and non-transactional commit. Remove unnecessary locking of LOCK_log in MYSQL_BIN_LOG::write() (backport of same fix from mysql-5.5).
Diffstat (limited to 'sql/log.cc')
-rw-r--r--sql/log.cc658
1 files changed, 177 insertions, 481 deletions
diff --git a/sql/log.cc b/sql/log.cc
index e29758b7f0b..f2884c1ad38 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -155,19 +155,14 @@ class binlog_trx_data {
public:
binlog_trx_data()
: at_least_one_stmt_committed(0), incident(FALSE), m_pending(0),
- before_stmt_pos(MY_OFF_T_UNDEF), using_xa(0), commit_bin_log_file_pos(0)
+ before_stmt_pos(MY_OFF_T_UNDEF), commit_bin_log_file_pos(0), using_xa(0)
{
trans_log.end_of_file= max_binlog_cache_size;
- (void) my_pthread_mutex_init(&LOCK_binlog_participant, MY_MUTEX_INIT_SLOW,
- "LOCK_binlog_participant", MYF(0));
- (void) pthread_cond_init(&COND_binlog_participant, 0);
}
~binlog_trx_data()
{
DBUG_ASSERT(pending() == NULL);
- (void) pthread_cond_destroy(&COND_binlog_participant);
- (void) pthread_mutex_destroy(&LOCK_binlog_participant);
close_cached_file(&trans_log);
}
@@ -265,46 +260,17 @@ public:
Binlog position before the start of the current statement.
*/
my_off_t before_stmt_pos;
-
- /* 0 or error when writing to binlog; set during group commit. */
- int error;
- /* If error != 0, value of errno (for my_error() reporting). */
- int commit_errno;
- /* Link for queueing transactions up for group commit to binlog. */
- binlog_trx_data *next;
- /*
- Flag set true when group commit for this transaction is finished; used
- with pthread_cond_wait() to wait until commit is done.
- This flag is protected by LOCK_binlog_participant.
- */
- bool done;
/*
- Flag set if this transaction is the group commit leader that will handle
- the actual writing to the binlog.
- This flag is protected by LOCK_binlog_participant.
+ Binlog position after current commit, available to storage engines during
+ commit_ordered() and commit().
*/
- bool group_commit_leader;
+ ulonglong commit_bin_log_file_pos;
+
/*
Flag set true if this transaction is committed with log_xid() as part of
XA, false if not.
*/
bool using_xa;
- /*
- Extra events (BEGIN, COMMIT/ROLLBACK/XID, and possibly INCIDENT) to be
- written during group commit. The incident_event is only valid if
- has_incident() is true.
- */
- Log_event *begin_event;
- Log_event *end_event;
- Log_event *incident_event;
- /* Mutex and condition for wakeup after group commit. */
- pthread_mutex_t LOCK_binlog_participant;
- pthread_cond_t COND_binlog_participant;
- /*
- Binlog position after current commit, available to storage engines during
- commit() and commit_ordered().
- */
- ulonglong commit_bin_log_file_pos;
};
handlerton *binlog_hton;
@@ -1441,30 +1407,6 @@ static int binlog_close_connection(handlerton *hton, THD *thd)
return 0;
}
-/* Helper functions for binlog_flush_trx_cache(). */
-static int
-binlog_flush_trx_cache_prepare(THD *thd)
-{
- if (thd->binlog_flush_pending_rows_event(TRUE))
- return 1;
- return 0;
-}
-
-static void
-binlog_flush_trx_cache_finish(THD *thd, binlog_trx_data *trx_data)
-{
- IO_CACHE *trans_log= &trx_data->trans_log;
-
- trx_data->reset();
-
- statistic_increment(binlog_cache_use, &LOCK_status);
- if (trans_log->disk_writes != 0)
- {
- statistic_increment(binlog_cache_disk_use, &LOCK_status);
- trans_log->disk_writes= 0;
- }
-}
-
/*
End a transaction, writing events to the binary log.
@@ -1487,14 +1429,15 @@ binlog_flush_trx_cache_finish(THD *thd, binlog_trx_data *trx_data)
*/
static int
binlog_flush_trx_cache(THD *thd, binlog_trx_data *trx_data,
- Log_event *end_ev)
+ Log_event *end_ev, bool all)
{
DBUG_ENTER("binlog_flush_trx_cache");
+ IO_CACHE *trans_log= &trx_data->trans_log;
DBUG_PRINT("info", ("thd->options={ %s%s}",
FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT),
FLAGSTR(thd->options, OPTION_BEGIN)));
- if (binlog_flush_trx_cache_prepare(thd))
+ if (thd->binlog_flush_pending_rows_event(TRUE))
DBUG_RETURN(1);
/*
@@ -1507,9 +1450,17 @@ binlog_flush_trx_cache(THD *thd, binlog_trx_data *trx_data,
were, we would have to ensure that we're not ending a statement
inside a stored function.
*/
- int error= mysql_bin_log.write_transaction_to_binlog(thd, trx_data, end_ev);
+ int error= mysql_bin_log.write_transaction_to_binlog(thd, trx_data,
+ end_ev, all);
- binlog_flush_trx_cache_finish(thd, trx_data);
+ trx_data->reset();
+
+ statistic_increment(binlog_cache_use, &LOCK_status);
+ if (trans_log->disk_writes != 0)
+ {
+ statistic_increment(binlog_cache_disk_use, &LOCK_status);
+ trans_log->disk_writes= 0;
+ }
DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL);
DBUG_RETURN(error);
@@ -1578,51 +1529,11 @@ static LEX_STRING const write_error_msg=
static int binlog_prepare(handlerton *hton, THD *thd, bool all)
{
/*
- If this prepare is for a single statement in the middle of a transactions,
- not the actual transaction commit, then we do nothing. The real work is
- only done later, in the prepare for making persistent changes.
+ do nothing.
+ just pretend we can do 2pc, so that MySQL won't
+ switch to 1pc.
+ real work will be done in MYSQL_BIN_LOG::log_and_order()
*/
- if (!all && (thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT)))
- return 0;
-
- binlog_trx_data *trx_data=
- (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
-
- trx_data->using_xa= TRUE;
-
- if (binlog_flush_trx_cache_prepare(thd))
- return 1;
-
- my_xid xid= thd->transaction.xid_state.xid.get_my_xid();
- if (!xid)
- {
- /* Skip logging this transaction, marked by setting end_event to NULL. */
- trx_data->end_event= NULL;
- return 0;
- }
-
- /*
- Allocate the extra events that will be logged to the binlog in binlog group
- commit. Use placement new to allocate them on the THD memroot, as they need
- to remain live until log_xid() returns.
- */
- size_t needed_size= sizeof(Query_log_event) + sizeof(Xid_log_event);
- if (trx_data->has_incident())
- needed_size+= sizeof(Incident_log_event);
- uchar *mem= (uchar *)thd->alloc(needed_size);
- if (!mem)
- return 1;
-
- trx_data->begin_event= new ((void *)mem)
- Query_log_event(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0);
- mem+= sizeof(Query_log_event);
-
- trx_data->end_event= new ((void *)mem) Xid_log_event(thd, xid);
-
- if (trx_data->has_incident())
- trx_data->incident_event= new ((void *)(mem + sizeof(Xid_log_event)))
- Incident_log_event(thd, INCIDENT_LOST_EVENTS, write_error_msg);
-
return 0;
}
@@ -1646,11 +1557,11 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all)
binlog_trx_data *const trx_data=
(binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
- if (trx_data->using_xa)
+ if (trx_data->empty())
{
// we're here because trans_log was flushed in MYSQL_BIN_LOG::log_xid()
- binlog_flush_trx_cache_finish(thd, trx_data);
- DBUG_RETURN(error);
+ trx_data->reset();
+ DBUG_RETURN(0);
}
/*
@@ -1673,7 +1584,7 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all)
!stmt_has_updated_trans_table(thd) && stmt_has_updated_non_trans_table(thd)))
{
Query_log_event end_ev(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0);
- error= binlog_flush_trx_cache(thd, trx_data, &end_ev);
+ error= binlog_flush_trx_cache(thd, trx_data, &end_ev, all);
}
trx_data->at_least_one_stmt_committed = my_b_tell(&trx_data->trans_log) > 0;
@@ -1757,7 +1668,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
thd->current_stmt_binlog_row_based))
{
Query_log_event end_ev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0);
- error= binlog_flush_trx_cache(thd, trx_data, &end_ev);
+ error= binlog_flush_trx_cache(thd, trx_data, &end_ev, all);
}
/*
Otherwise, we simply truncate the cache as there is no change on
@@ -2599,6 +2510,7 @@ const char *MYSQL_LOG::generate_name(const char *log_name,
MYSQL_BIN_LOG::MYSQL_BIN_LOG()
:bytes_written(0), prepared_xids(0), file_id(1), open_count(1),
need_start_event(TRUE),
+ group_commit_queue(0), num_commits(0), num_group_commits(0),
is_relay_log(0),
description_event_for_exec(0), description_event_for_queue(0)
{
@@ -2626,7 +2538,6 @@ void MYSQL_BIN_LOG::cleanup()
delete description_event_for_exec;
(void) pthread_mutex_destroy(&LOCK_log);
(void) pthread_mutex_destroy(&LOCK_index);
- (void) pthread_mutex_destroy(&LOCK_queue);
(void) pthread_cond_destroy(&update_cond);
}
DBUG_VOID_RETURN;
@@ -2655,8 +2566,6 @@ void MYSQL_BIN_LOG::init_pthread_objects()
*/
(void) my_pthread_mutex_init(&LOCK_index, MY_MUTEX_INIT_SLOW, "LOCK_index",
MYF_NO_DEADLOCK_DETECTION);
- (void) my_pthread_mutex_init(&LOCK_queue, MY_MUTEX_INIT_FAST, "LOCK_queue",
- MYF(0));
(void) pthread_cond_init(&update_cond, 0);
}
@@ -4461,11 +4370,6 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
}
/*
- Flush the pending rows event to the transaction cache or to the
- log file. Since this function potentially aquire the LOCK_log
- mutex, we do this before aquiring the LOCK_log mutex in this
- function.
-
We only end the statement if we are in a top-level statement. If
we are inside a stored function, we do not end the statement since
this will close all tables on the slave.
@@ -4475,8 +4379,6 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
if (thd->binlog_flush_pending_rows_event(end_stmt))
DBUG_RETURN(error);
- pthread_mutex_lock(&LOCK_log);
-
/*
In most cases this is only called if 'is_open()' is true; in fact this is
mostly called if is_open() *was* true a few instructions before, but it
@@ -4497,7 +4399,6 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
thd->lex->sql_command != SQLCOM_SAVEPOINT &&
!binlog_filter->db_ok(local_db)))
{
- VOID(pthread_mutex_unlock(&LOCK_log));
DBUG_RETURN(0);
}
#endif /* HAVE_REPLICATION */
@@ -4539,15 +4440,11 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
thd->binlog_start_trans_and_stmt();
file= trans_log;
}
- /*
- TODO as Mats suggested, for all the cases above where we write to
- trans_log, it sounds unnecessary to lock LOCK_log. We should rather
- test first if we want to write to trans_log, and if not, lock
- LOCK_log.
- */
}
#endif /* USING_TRANSACTIONS */
DBUG_PRINT("info",("event type: %d",event_info->get_type_code()));
+ if (file == &log_file)
+ pthread_mutex_lock(&LOCK_log);
/*
No check for auto events flag here - this write method should
@@ -4572,7 +4469,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT,
thd->first_successful_insert_id_in_prev_stmt_for_binlog);
if (e.write(file))
- goto err;
+ goto err_unlock;
}
if (thd->auto_inc_intervals_in_cur_stmt_for_binlog.nb_elements() > 0)
{
@@ -4583,13 +4480,13 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
thd->auto_inc_intervals_in_cur_stmt_for_binlog.
minimum());
if (e.write(file))
- goto err;
+ goto err_unlock;
}
if (thd->rand_used)
{
Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2);
if (e.write(file))
- goto err;
+ goto err_unlock;
}
if (thd->user_var_events.elements)
{
@@ -4604,7 +4501,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
user_var_event->type,
user_var_event->charset_number);
if (e.write(file))
- goto err;
+ goto err_unlock;
}
}
}
@@ -4616,23 +4513,26 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
if (event_info->write(file) ||
DBUG_EVALUATE_IF("injecting_fault_writing", 1, 0))
- goto err;
+ goto err_unlock;
if (file == &log_file) // we are writing to the real log (disk)
{
if (flush_and_sync())
- goto err;
+ goto err_unlock;
signal_update();
rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
}
error=0;
+err_unlock:
+ if (file == &log_file)
+ pthread_mutex_unlock(&LOCK_log);
+
err:
if (error)
set_write_error(thd);
}
- pthread_mutex_unlock(&LOCK_log);
DBUG_RETURN(error);
}
@@ -4957,10 +4857,16 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd)
bool
MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data,
- Log_event *end_ev)
+ Log_event *end_ev, bool all)
{
+ group_commit_entry entry;
DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog");
+ entry.thd= thd;
+ entry.trx_data= trx_data;
+ entry.error= 0;
+ entry.all= all;
+
/*
Create the necessary events here, where we have the correct THD (and
thread context).
@@ -4969,23 +4875,23 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data,
thread.
*/
Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0);
- trx_data->begin_event= &qinfo;
- trx_data->end_event= end_ev;
+ entry.begin_event= &qinfo;
+ entry.end_event= end_ev;
if (trx_data->has_incident())
{
Incident_log_event inc_ev(thd, INCIDENT_LOST_EVENTS, write_error_msg);
- trx_data->incident_event= &inc_ev;
- DBUG_RETURN(write_transaction_to_binlog_events(trx_data));
+ entry.incident_event= &inc_ev;
+ DBUG_RETURN(write_transaction_to_binlog_events(&entry));
}
else
{
- trx_data->incident_event= NULL;
- DBUG_RETURN(write_transaction_to_binlog_events(trx_data));
+ entry.incident_event= NULL;
+ DBUG_RETURN(write_transaction_to_binlog_events(&entry));
}
}
bool
-MYSQL_BIN_LOG::write_transaction_to_binlog_events(binlog_trx_data *trx_data)
+MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
{
/*
To facilitate group commit for the binlog, we first queue up ourselves in
@@ -4995,91 +4901,61 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(binlog_trx_data *trx_data)
the commit and wake them up.
*/
- pthread_mutex_lock(&trx_data->LOCK_binlog_participant);
-
- pthread_mutex_lock(&LOCK_queue);
- binlog_trx_data *orig_queue= group_commit_queue;
- trx_data->next= orig_queue;
- group_commit_queue= trx_data;
- pthread_mutex_unlock(&LOCK_queue);
+ entry->thd->clear_wakeup_ready();
+ pthread_mutex_lock(&LOCK_prepare_ordered);
+ group_commit_entry *orig_queue= group_commit_queue;
+ entry->next= orig_queue;
+ group_commit_queue= entry;
- if (orig_queue != NULL)
+ if (entry->trx_data->using_xa)
{
- trx_data->group_commit_leader= FALSE;
- trx_data->done= FALSE;
- trx_group_commit_participant(trx_data);
+ DEBUG_SYNC(entry->thd, "commit_before_prepare_ordered");
+ run_prepare_ordered(entry->thd, entry->all);
+ DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered");
}
- else
- {
- trx_data->group_commit_leader= TRUE;
- pthread_mutex_unlock(&trx_data->LOCK_binlog_participant);
- trx_group_commit_leader(NULL);
- }
-
- return trx_group_commit_finish(trx_data);
-}
-
-/*
- Participate as secondary transaction in group commit.
-
- Another thread is already waiting to obtain the LOCK_log, and should include
- this thread in the group commit once the log is obtained. So here we put
- ourself in the queue and wait to be signalled that the group commit is done.
+ pthread_mutex_unlock(&LOCK_prepare_ordered);
- Note that this function must be called with trx_data->LOCK_binlog_participant
- locked; the mutex will be released before return.
-*/
-void
-MYSQL_BIN_LOG::trx_group_commit_participant(binlog_trx_data *trx_data)
-{
- safe_mutex_assert_owner(&trx_data->LOCK_binlog_participant);
+ /*
+ The first in the queue handle group commit for all; the others just wait
+ to be signalled when group commit is done.
+ */
+ if (orig_queue != NULL)
+ entry->thd->wait_for_wakeup_ready();
+ else
+ trx_group_commit_leader(entry);
- /* Wait until trx_data.done == true and woken up by the leader. */
- while (!trx_data->done)
- pthread_cond_wait(&trx_data->COND_binlog_participant,
- &trx_data->LOCK_binlog_participant);
- pthread_mutex_unlock(&trx_data->LOCK_binlog_participant);
-}
+ if (!entry->error)
+ return 0;
-bool
-MYSQL_BIN_LOG::trx_group_commit_finish(binlog_trx_data *trx_data)
-{
- DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_finish");
- DBUG_PRINT("info", ("trx_data->error=%d\n", trx_data->error));
- if (trx_data->error)
+ switch (entry->error)
{
- switch (trx_data->error)
- {
- case ER_ERROR_ON_WRITE:
- my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, trx_data->commit_errno);
- break;
- case ER_ERROR_ON_READ:
- my_error(ER_ERROR_ON_READ, MYF(ME_NOREFRESH),
- trx_data->trans_log.file_name, trx_data->commit_errno);
- break;
- default:
- /*
- There are not (and should not be) any errors thrown not covered above.
- But just in case one is added later without updating the above switch
- statement, include a catch-all.
- */
- my_printf_error(trx_data->error,
- "Error writing transaction to binary log: %d",
- MYF(ME_NOREFRESH), trx_data->error);
- }
-
+ case ER_ERROR_ON_WRITE:
+ my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, entry->commit_errno);
+ break;
+ case ER_ERROR_ON_READ:
+ my_error(ER_ERROR_ON_READ, MYF(ME_NOREFRESH),
+ entry->trx_data->trans_log.file_name, entry->commit_errno);
+ break;
+ default:
/*
- Since we return error, this transaction XID will not be committed, so
- we need to mark it as not needed for recovery (unlog() is not called
- for a transaction if log_xid() fails).
- */
- if (trx_data->end_event->get_type_code() == XID_EVENT)
- mark_xid_done();
-
- DBUG_RETURN(1);
+ There are not (and should not be) any errors thrown not covered above.
+ But just in case one is added later without updating the above switch
+ statement, include a catch-all.
+ */
+ my_printf_error(entry->error,
+ "Error writing transaction to binary log: %d",
+ MYF(ME_NOREFRESH), entry->error);
}
- DBUG_RETURN(0);
+ /*
+ Since we return error, this transaction XID will not be committed, so
+ we need to mark it as not needed for recovery (unlog() is not called
+ for a transaction if log_xid() fails).
+ */
+ if (entry->trx_data->using_xa)
+ mark_xid_done();
+
+ return 1;
}
/*
@@ -5093,69 +4969,36 @@ MYSQL_BIN_LOG::trx_group_commit_finish(binlog_trx_data *trx_data)
*/
void
-MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first)
+MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
{
+ DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_leader");
uint xid_count= 0;
uint write_count= 0;
- /* First, put anything from group_log_xid into the queue. */
- binlog_trx_data *full_queue= NULL;
- binlog_trx_data **next_ptr= &full_queue;
- for (TC_group_commit_entry *entry= first; entry; entry= entry->next)
- {
- binlog_trx_data *const trx_data=
- (binlog_trx_data*) thd_get_ha_data(entry->thd, binlog_hton);
-
- /* Skip log_xid for transactions without xid, marked by NULL end_event. */
- if (!trx_data->end_event)
- continue;
-
- trx_data->error= 0;
- *next_ptr= trx_data;
- next_ptr= &(trx_data->next);
- }
-
/*
- Next, lock the LOCK_log(), and once we get it, add any additional writes
+ Lock the LOCK_log(), and once we get it, collect any additional writes
that queued up while we were waiting.
-
- Note that if some writer not going through log_xid() comes in and gets the
- LOCK_log before us, they will not be able to include us in their group
- commit (and they are not able to handle ensuring same commit order between
- us and participating transactional storage engines anyway).
-
- On the other hand, when we get the LOCK_log, we will be able to include
- any non-trasactional writes that queued up in our group commit. This
- should hopefully not be too big of a problem, as group commit is most
- important for the transactional case anyway when durability (fsync) is
- enabled.
*/
VOID(pthread_mutex_lock(&LOCK_log));
+ DEBUG_SYNC(leader->thd, "commit_after_get_LOCK_log");
- /*
- As the queue is in reverse order of entering, reverse the queue as we add
- it to the existing one. Note that there is no ordering defined between
- transactional and non-transactional commits.
- */
- pthread_mutex_lock(&LOCK_queue);
- binlog_trx_data *current= group_commit_queue;
+ pthread_mutex_lock(&LOCK_prepare_ordered);
+ group_commit_entry *current= group_commit_queue;
group_commit_queue= NULL;
- pthread_mutex_unlock(&LOCK_queue);
- binlog_trx_data *xtra_queue= NULL;
+ pthread_mutex_unlock(&LOCK_prepare_ordered);
+
+ /* As the queue is in reverse order of entering, reverse it. */
+ group_commit_entry *queue= NULL;
while (current)
{
- current->error= 0;
- binlog_trx_data *next= current->next;
- current->next= xtra_queue;
- xtra_queue= current;
+ group_commit_entry *next= current->next;
+ current->next= queue;
+ queue= current;
current= next;
}
- *next_ptr= xtra_queue;
+ DBUG_ASSERT(leader == queue /* the leader should be first in queue */);
- /*
- Now we have in full_queue the list of transactions to be committed in
- order.
- */
+ /* Now we have in queue the list of transactions to be committed in order. */
DBUG_ASSERT(is_open());
if (likely(is_open())) // Should always be true
{
@@ -5169,9 +5012,14 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first)
current->error and let the thread do the error reporting itself once
we wake it up.
*/
- for (current= full_queue; current != NULL; current= current->next)
+ for (current= queue; current != NULL; current= current->next)
{
- IO_CACHE *cache= &current->trans_log;
+ binlog_trx_data *trx_data= current->trx_data;
+ IO_CACHE *cache= &trx_data->trans_log;
+
+ /* Skip log_xid for transactions without xid, marked by NULL end_event. */
+ if (!current->end_event)
+ continue;
/*
We only bother to write to the binary log if there is anything
@@ -5186,9 +5034,9 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first)
write_count++;
}
- current->commit_bin_log_file_pos=
+ trx_data->commit_bin_log_file_pos=
log_file.pos_in_file + (log_file.write_pos - log_file.write_buffer);
- if (current->end_event->get_type_code() == XID_EVENT)
+ if (trx_data->using_xa)
xid_count++;
}
@@ -5196,7 +5044,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first)
{
if (flush_and_sync())
{
- for (current= full_queue; current != NULL; current= current->next)
+ for (current= queue; current != NULL; current= current->next)
{
if (!current->error)
{
@@ -5213,7 +5061,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first)
/*
if any commit_events are Xid_log_event, increase the number of
- prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated
+ prepared_xids (it's decreased in ::unlog()). Binlog cannot be rotated
if there're prepared xids in it - see the comment in new_file() for
an explanation.
If no Xid_log_events (then it's all Query_log_event) rotate binlog,
@@ -5227,37 +5075,49 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first)
rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
}
- VOID(pthread_mutex_unlock(&LOCK_log));
-
+ DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_commit_ordered");
+ pthread_mutex_lock(&LOCK_commit_ordered);
/*
- Signal those that are not part of group_log_xid, and are not group leaders
- running the queue.
+ We cannot unlock LOCK_log until we have locked LOCK_commit_ordered;
+ otherwise scheduling could allow the next group commit to run ahead of us,
+ messing up the order of commit_ordered() calls. But as soon as
+ LOCK_commit_ordered is obtained, we can let the next group commit start.
+ */
+ pthread_mutex_unlock(&LOCK_log);
+ DEBUG_SYNC(leader->thd, "commit_after_release_LOCK_log");
+ ++num_group_commits;
- Since a group leader runs the queue itself if a group_log_xid does not get
- to do it forst, such leader threads do not need wait or wakeup.
+ /*
+ Wakeup each participant waiting for our group commit, first calling the
+ commit_ordered() methods for any transactions doing 2-phase commit.
*/
- for (current= xtra_queue; current != NULL; current= current->next)
+ current= queue;
+ while (current != NULL)
{
- /*
- Note that we need to take LOCK_binlog_participant even in the case of a
- leader!
+ DEBUG_SYNC(leader->thd, "commit_loop_entry_commit_ordered");
+ ++num_commits;
+ if (current->trx_data->using_xa && !current->error)
+ run_commit_ordered(current->thd, current->all);
- Otherwise there is a race between setting and testing the
- group_commit_leader flag.
+ /*
+ Careful not to access current->next after waking up the other thread! As
+ it may change immediately after wakeup.
*/
- pthread_mutex_lock(&current->LOCK_binlog_participant);
- if (!current->group_commit_leader)
- {
- current->done= true;
- pthread_cond_signal(&current->COND_binlog_participant);
- }
- pthread_mutex_unlock(&current->LOCK_binlog_participant);
+ group_commit_entry *next= current->next;
+ if (current != leader) // Don't wake up ourself
+ current->thd->signal_wakeup_ready();
+ current= next;
}
+ DEBUG_SYNC(leader->thd, "commit_after_group_run_commit_ordered");
+ pthread_mutex_unlock(&LOCK_commit_ordered);
+
+ DBUG_VOID_RETURN;
}
int
-MYSQL_BIN_LOG::write_transaction(binlog_trx_data *trx_data)
+MYSQL_BIN_LOG::write_transaction(group_commit_entry *entry)
{
+ binlog_trx_data *trx_data= entry->trx_data;
IO_CACHE *cache= &trx_data->trans_log;
/*
Log "BEGIN" at the beginning of every transaction. Here, a transaction is
@@ -5272,7 +5132,7 @@ MYSQL_BIN_LOG::write_transaction(binlog_trx_data *trx_data)
in wrong positions being shown to the user, MASTER_POS_WAIT
undue waiting etc.
*/
- if (trx_data->begin_event->write(&log_file))
+ if (entry->begin_event->write(&log_file))
return ER_ERROR_ON_WRITE;
DBUG_EXECUTE_IF("crash_before_writing_xid",
@@ -5289,10 +5149,10 @@ MYSQL_BIN_LOG::write_transaction(binlog_trx_data *trx_data)
if (write_cache(cache))
return ER_ERROR_ON_WRITE;
- if (trx_data->end_event->write(&log_file))
+ if (entry->end_event->write(&log_file))
return ER_ERROR_ON_WRITE;
- if (trx_data->has_incident() && trx_data->incident_event->write(&log_file))
+ if (entry->incident_event && entry->incident_event->write(&log_file))
return ER_ERROR_ON_WRITE;
if (cache->error) // Error on read
@@ -5754,30 +5614,6 @@ TC_LOG::run_commit_ordered(THD *thd, bool all)
}
}
-TC_LOG_queued::TC_LOG_queued() : group_commit_queue(NULL)
-{
-}
-
-TC_LOG_queued::~TC_LOG_queued()
-{
-}
-
-TC_LOG_queued::TC_group_commit_entry *
-TC_LOG_queued::reverse_queue(TC_LOG_queued::TC_group_commit_entry *queue)
-{
- TC_group_commit_entry *entry= queue;
- TC_group_commit_entry *prev= NULL;
- while (entry)
- {
- TC_group_commit_entry *next= entry->next;
- entry->next= prev;
- prev= entry;
- entry= next;
- }
-
- return prev;
-}
-
int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all,
bool need_prepare_ordered,
bool need_commit_ordered)
@@ -5886,142 +5722,6 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all,
}
-TC_LOG_group_commit::TC_LOG_group_commit()
- : num_commits(0), num_group_commits(0)
-{
-}
-
-TC_LOG_group_commit::~TC_LOG_group_commit()
-{
-}
-
-void
-TC_LOG_group_commit::init()
-{
- my_pthread_mutex_init(&LOCK_group_commit, MY_MUTEX_INIT_SLOW,
- "LOCK_group_commit", MYF(0));
-}
-
-void
-TC_LOG_group_commit::deinit()
-{
- pthread_mutex_destroy(&LOCK_group_commit);
-}
-
-int TC_LOG_group_commit::log_and_order(THD *thd, my_xid xid, bool all,
- bool need_prepare_ordered,
- bool need_commit_ordered)
-{
- IF_DBUG(int err;)
- int cookie;
- struct TC_group_commit_entry entry;
- bool is_group_commit_leader;
-
- thd->clear_wakeup_ready();
- entry.thd= thd;
- entry.all= all;
- entry.xid_error= 0;
-
- pthread_mutex_lock(&LOCK_prepare_ordered);
- TC_group_commit_entry *previous_queue= group_commit_queue;
- entry.next= previous_queue;
- group_commit_queue= &entry;
-
- DEBUG_SYNC(thd, "commit_before_prepare_ordered");
- run_prepare_ordered(thd, all);
- DEBUG_SYNC(thd, "commit_after_prepare_ordered");
- pthread_mutex_unlock(&LOCK_prepare_ordered);
-
- is_group_commit_leader= (previous_queue == NULL);
-
- if (is_group_commit_leader)
- {
- TC_group_commit_entry *current;
-
- pthread_mutex_lock(&LOCK_group_commit);
- DEBUG_SYNC(thd, "commit_after_get_LOCK_group_commit");
-
- pthread_mutex_lock(&LOCK_prepare_ordered);
- TC_group_commit_entry *queue= group_commit_queue;
- group_commit_queue= NULL;
- pthread_mutex_unlock(&LOCK_prepare_ordered);
-
- /*
- Since we enqueue at the head, the queue is actually in reverse order.
- So reverse it back into correct commit order before returning.
- */
- queue= reverse_queue(queue);
-
- /* The first in the queue is the leader. */
- DBUG_ASSERT(queue == &entry && queue->thd == thd);
-
- DEBUG_SYNC(thd, "commit_before_group_log_xid");
- /* This will set individual error codes in each thd->xid_error. */
- group_log_xid(queue);
- DEBUG_SYNC(thd, "commit_after_group_log_xid");
-
- /*
- Call commit_ordered methods for all transactions in the queue
- (that did not get an error in group_log_xid()).
-
- We do this under an additional global LOCK_commit_ordered; this is
- so that transactions that do not need 2-phase commit do not have
- to wait for the potentially long duration of LOCK_group_commit.
- */
- current= queue;
-
- DEBUG_SYNC(thd, "commit_before_get_LOCK_commit_ordered");
- pthread_mutex_lock(&LOCK_commit_ordered);
- /*
- We cannot unlock LOCK_group_commit until we have locked
- LOCK_commit_ordered; otherwise scheduling could allow the next
- group commit to run ahead of us, messing up the order of
- commit_ordered() calls. But as soon as LOCK_commit_ordered is
- obtained, we can let the next group commit start.
- */
- pthread_mutex_unlock(&LOCK_group_commit);
- DEBUG_SYNC(thd, "commit_after_release_LOCK_group_commit");
-
- ++num_group_commits;
- do
- {
- DEBUG_SYNC(thd, "commit_loop_entry_commit_ordered");
- ++num_commits;
- if (!current->xid_error)
- run_commit_ordered(current->thd, current->all);
-
- /*
- Careful not to access current->next_commit_ordered after waking up
- the other thread! As it may change immediately after wakeup.
- */
- TC_group_commit_entry *next= current->next;
- if (current != &entry) // Don't wake up ourself
- current->thd->signal_wakeup_ready();
- current= next;
- } while (current != NULL);
- DEBUG_SYNC(thd, "commit_after_group_run_commit_ordered");
-
- pthread_mutex_unlock(&LOCK_commit_ordered);
- }
- else
- {
- /* If not leader, just wait until leader wakes us up. */
- thd->wait_for_wakeup_ready();
- }
-
- /*
- Now that we're back in our own thread context, do any delayed processing
- and error reporting.
- */
- IF_DBUG(err= entry.xid_error;)
- cookie= xid_log_after(&entry);
- /* The cookie must be non-zero in the non-error case. */
- DBUG_ASSERT(err || cookie);
-
- return cookie;
-}
-
-
/********* transaction coordinator log for 2pc - mmap() based solution *******/
/*
@@ -6567,7 +6267,6 @@ int TC_LOG_BINLOG::open(const char *opt_name)
DBUG_ASSERT(total_ha_2pc > 1);
DBUG_ASSERT(opt_name && opt_name[0]);
- TC_LOG_group_commit::init();
pthread_mutex_init(&LOCK_prep_xids, MY_MUTEX_INIT_FAST);
pthread_cond_init (&COND_prep_xids, 0);
@@ -6651,36 +6350,33 @@ void TC_LOG_BINLOG::close()
DBUG_ASSERT(prepared_xids==0);
pthread_mutex_destroy(&LOCK_prep_xids);
pthread_cond_destroy (&COND_prep_xids);
- TC_LOG_group_commit::deinit();
}
/*
Do a binlog log_xid() for a group of transactions, linked through
thd->next_commit_ordered.
*/
-void
-TC_LOG_BINLOG::group_log_xid(TC_group_commit_entry *first)
-{
- DBUG_ENTER("TC_LOG_BINLOG::group_log_xid");
- trx_group_commit_leader(first);
- for (TC_group_commit_entry *entry= first; entry; entry= entry->next)
- {
- binlog_trx_data *const trx_data=
- (binlog_trx_data*) thd_get_ha_data(entry->thd, binlog_hton);
- entry->xid_error= trx_data->error;
- }
- DBUG_VOID_RETURN;
-}
-
int
-TC_LOG_BINLOG::xid_log_after(TC_group_commit_entry *entry)
+TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all,
+ bool need_prepare_ordered __attribute__((unused)),
+ bool need_commit_ordered __attribute__((unused)))
{
+ int err;
+ DBUG_ENTER("TC_LOG_BINLOG::log_and_order");
+
binlog_trx_data *const trx_data=
- (binlog_trx_data*) thd_get_ha_data(entry->thd, binlog_hton);
- if (trx_group_commit_finish(trx_data))
- return 0; // Returning zero cookie signals error
+ (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
+
+ trx_data->using_xa= TRUE;
+ if (xid)
+ {
+ Xid_log_event xid_event(thd, xid);
+ err= binlog_flush_trx_cache(thd, trx_data, &xid_event, all);
+ }
else
- return 1;
+ err= binlog_flush_trx_cache(thd, trx_data, NULL, all);
+
+ DBUG_RETURN(!err);
}
/*