diff options
author | unknown <knielsen@knielsen-hq.org> | 2010-10-29 13:58:47 +0200 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2010-10-29 13:58:47 +0200 |
commit | 5614ebe7ed8e56cbd345158395c1c1930b0752d1 (patch) | |
tree | bfa51ddf7459c7333d3a335c97af811dc0f05e0e /sql/log.cc | |
parent | b91ad17cea8572f7cd21fb97d0b0ddce9f8b3c46 (diff) | |
download | mariadb-git-5614ebe7ed8e56cbd345158395c1c1930b0752d1.tar.gz |
MWL#116: after-architecture-review code refactoring and cleanup.
Remove the extra class hierarchy with classes TC_LOG_queued, TC_LOG_unordered,
and TC_LOG_group_commit, folding the code into the TC_LOG_MMAP and
TC_LOG_BINLOG classes. In particular TC_LOG_BINLOG is greatly simplified by
this, unifying the code path for transactional and non-transactional
commit.
Remove unnecessary locking of LOCK_log in MYSQL_BIN_LOG::write() (backport
of same fix from mysql-5.5).
Diffstat (limited to 'sql/log.cc')
-rw-r--r-- | sql/log.cc | 658 |
1 files changed, 177 insertions, 481 deletions
diff --git a/sql/log.cc b/sql/log.cc index e29758b7f0b..f2884c1ad38 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -155,19 +155,14 @@ class binlog_trx_data { public: binlog_trx_data() : at_least_one_stmt_committed(0), incident(FALSE), m_pending(0), - before_stmt_pos(MY_OFF_T_UNDEF), using_xa(0), commit_bin_log_file_pos(0) + before_stmt_pos(MY_OFF_T_UNDEF), commit_bin_log_file_pos(0), using_xa(0) { trans_log.end_of_file= max_binlog_cache_size; - (void) my_pthread_mutex_init(&LOCK_binlog_participant, MY_MUTEX_INIT_SLOW, - "LOCK_binlog_participant", MYF(0)); - (void) pthread_cond_init(&COND_binlog_participant, 0); } ~binlog_trx_data() { DBUG_ASSERT(pending() == NULL); - (void) pthread_cond_destroy(&COND_binlog_participant); - (void) pthread_mutex_destroy(&LOCK_binlog_participant); close_cached_file(&trans_log); } @@ -265,46 +260,17 @@ public: Binlog position before the start of the current statement. */ my_off_t before_stmt_pos; - - /* 0 or error when writing to binlog; set during group commit. */ - int error; - /* If error != 0, value of errno (for my_error() reporting). */ - int commit_errno; - /* Link for queueing transactions up for group commit to binlog. */ - binlog_trx_data *next; - /* - Flag set true when group commit for this transaction is finished; used - with pthread_cond_wait() to wait until commit is done. - This flag is protected by LOCK_binlog_participant. - */ - bool done; /* - Flag set if this transaction is the group commit leader that will handle - the actual writing to the binlog. - This flag is protected by LOCK_binlog_participant. + Binlog position after current commit, available to storage engines during + commit_ordered() and commit(). */ - bool group_commit_leader; + ulonglong commit_bin_log_file_pos; + /* Flag set true if this transaction is committed with log_xid() as part of XA, false if not. */ bool using_xa; - /* - Extra events (BEGIN, COMMIT/ROLLBACK/XID, and possibly INCIDENT) to be - written during group commit. The incident_event is only valid if - has_incident() is true. - */ - Log_event *begin_event; - Log_event *end_event; - Log_event *incident_event; - /* Mutex and condition for wakeup after group commit. */ - pthread_mutex_t LOCK_binlog_participant; - pthread_cond_t COND_binlog_participant; - /* - Binlog position after current commit, available to storage engines during - commit() and commit_ordered(). - */ - ulonglong commit_bin_log_file_pos; }; handlerton *binlog_hton; @@ -1441,30 +1407,6 @@ static int binlog_close_connection(handlerton *hton, THD *thd) return 0; } -/* Helper functions for binlog_flush_trx_cache(). */ -static int -binlog_flush_trx_cache_prepare(THD *thd) -{ - if (thd->binlog_flush_pending_rows_event(TRUE)) - return 1; - return 0; -} - -static void -binlog_flush_trx_cache_finish(THD *thd, binlog_trx_data *trx_data) -{ - IO_CACHE *trans_log= &trx_data->trans_log; - - trx_data->reset(); - - statistic_increment(binlog_cache_use, &LOCK_status); - if (trans_log->disk_writes != 0) - { - statistic_increment(binlog_cache_disk_use, &LOCK_status); - trans_log->disk_writes= 0; - } -} - /* End a transaction, writing events to the binary log. @@ -1487,14 +1429,15 @@ binlog_flush_trx_cache_finish(THD *thd, binlog_trx_data *trx_data) */ static int binlog_flush_trx_cache(THD *thd, binlog_trx_data *trx_data, - Log_event *end_ev) + Log_event *end_ev, bool all) { DBUG_ENTER("binlog_flush_trx_cache"); + IO_CACHE *trans_log= &trx_data->trans_log; DBUG_PRINT("info", ("thd->options={ %s%s}", FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT), FLAGSTR(thd->options, OPTION_BEGIN))); - if (binlog_flush_trx_cache_prepare(thd)) + if (thd->binlog_flush_pending_rows_event(TRUE)) DBUG_RETURN(1); /* @@ -1507,9 +1450,17 @@ binlog_flush_trx_cache(THD *thd, binlog_trx_data *trx_data, were, we would have to ensure that we're not ending a statement inside a stored function. */ - int error= mysql_bin_log.write_transaction_to_binlog(thd, trx_data, end_ev); + int error= mysql_bin_log.write_transaction_to_binlog(thd, trx_data, + end_ev, all); - binlog_flush_trx_cache_finish(thd, trx_data); + trx_data->reset(); + + statistic_increment(binlog_cache_use, &LOCK_status); + if (trans_log->disk_writes != 0) + { + statistic_increment(binlog_cache_disk_use, &LOCK_status); + trans_log->disk_writes= 0; + } DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL); DBUG_RETURN(error); @@ -1578,51 +1529,11 @@ static LEX_STRING const write_error_msg= static int binlog_prepare(handlerton *hton, THD *thd, bool all) { /* - If this prepare is for a single statement in the middle of a transactions, - not the actual transaction commit, then we do nothing. The real work is - only done later, in the prepare for making persistent changes. + do nothing. + just pretend we can do 2pc, so that MySQL won't + switch to 1pc. + real work will be done in MYSQL_BIN_LOG::log_and_order() */ - if (!all && (thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) - return 0; - - binlog_trx_data *trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - - trx_data->using_xa= TRUE; - - if (binlog_flush_trx_cache_prepare(thd)) - return 1; - - my_xid xid= thd->transaction.xid_state.xid.get_my_xid(); - if (!xid) - { - /* Skip logging this transaction, marked by setting end_event to NULL. */ - trx_data->end_event= NULL; - return 0; - } - - /* - Allocate the extra events that will be logged to the binlog in binlog group - commit. Use placement new to allocate them on the THD memroot, as they need - to remain live until log_xid() returns. - */ - size_t needed_size= sizeof(Query_log_event) + sizeof(Xid_log_event); - if (trx_data->has_incident()) - needed_size+= sizeof(Incident_log_event); - uchar *mem= (uchar *)thd->alloc(needed_size); - if (!mem) - return 1; - - trx_data->begin_event= new ((void *)mem) - Query_log_event(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0); - mem+= sizeof(Query_log_event); - - trx_data->end_event= new ((void *)mem) Xid_log_event(thd, xid); - - if (trx_data->has_incident()) - trx_data->incident_event= new ((void *)(mem + sizeof(Xid_log_event))) - Incident_log_event(thd, INCIDENT_LOST_EVENTS, write_error_msg); - return 0; } @@ -1646,11 +1557,11 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) binlog_trx_data *const trx_data= (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - if (trx_data->using_xa) + if (trx_data->empty()) { // we're here because trans_log was flushed in MYSQL_BIN_LOG::log_xid() - binlog_flush_trx_cache_finish(thd, trx_data); - DBUG_RETURN(error); + trx_data->reset(); + DBUG_RETURN(0); } /* @@ -1673,7 +1584,7 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) !stmt_has_updated_trans_table(thd) && stmt_has_updated_non_trans_table(thd))) { Query_log_event end_ev(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0); - error= binlog_flush_trx_cache(thd, trx_data, &end_ev); + error= binlog_flush_trx_cache(thd, trx_data, &end_ev, all); } trx_data->at_least_one_stmt_committed = my_b_tell(&trx_data->trans_log) > 0; @@ -1757,7 +1668,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) thd->current_stmt_binlog_row_based)) { Query_log_event end_ev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0); - error= binlog_flush_trx_cache(thd, trx_data, &end_ev); + error= binlog_flush_trx_cache(thd, trx_data, &end_ev, all); } /* Otherwise, we simply truncate the cache as there is no change on @@ -2599,6 +2510,7 @@ const char *MYSQL_LOG::generate_name(const char *log_name, MYSQL_BIN_LOG::MYSQL_BIN_LOG() :bytes_written(0), prepared_xids(0), file_id(1), open_count(1), need_start_event(TRUE), + group_commit_queue(0), num_commits(0), num_group_commits(0), is_relay_log(0), description_event_for_exec(0), description_event_for_queue(0) { @@ -2626,7 +2538,6 @@ void MYSQL_BIN_LOG::cleanup() delete description_event_for_exec; (void) pthread_mutex_destroy(&LOCK_log); (void) pthread_mutex_destroy(&LOCK_index); - (void) pthread_mutex_destroy(&LOCK_queue); (void) pthread_cond_destroy(&update_cond); } DBUG_VOID_RETURN; @@ -2655,8 +2566,6 @@ void MYSQL_BIN_LOG::init_pthread_objects() */ (void) my_pthread_mutex_init(&LOCK_index, MY_MUTEX_INIT_SLOW, "LOCK_index", MYF_NO_DEADLOCK_DETECTION); - (void) my_pthread_mutex_init(&LOCK_queue, MY_MUTEX_INIT_FAST, "LOCK_queue", - MYF(0)); (void) pthread_cond_init(&update_cond, 0); } @@ -4461,11 +4370,6 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) } /* - Flush the pending rows event to the transaction cache or to the - log file. Since this function potentially aquire the LOCK_log - mutex, we do this before aquiring the LOCK_log mutex in this - function. - We only end the statement if we are in a top-level statement. If we are inside a stored function, we do not end the statement since this will close all tables on the slave. @@ -4475,8 +4379,6 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) if (thd->binlog_flush_pending_rows_event(end_stmt)) DBUG_RETURN(error); - pthread_mutex_lock(&LOCK_log); - /* In most cases this is only called if 'is_open()' is true; in fact this is mostly called if is_open() *was* true a few instructions before, but it @@ -4497,7 +4399,6 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) thd->lex->sql_command != SQLCOM_SAVEPOINT && !binlog_filter->db_ok(local_db))) { - VOID(pthread_mutex_unlock(&LOCK_log)); DBUG_RETURN(0); } #endif /* HAVE_REPLICATION */ @@ -4539,15 +4440,11 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) thd->binlog_start_trans_and_stmt(); file= trans_log; } - /* - TODO as Mats suggested, for all the cases above where we write to - trans_log, it sounds unnecessary to lock LOCK_log. We should rather - test first if we want to write to trans_log, and if not, lock - LOCK_log. - */ } #endif /* USING_TRANSACTIONS */ DBUG_PRINT("info",("event type: %d",event_info->get_type_code())); + if (file == &log_file) + pthread_mutex_lock(&LOCK_log); /* No check for auto events flag here - this write method should @@ -4572,7 +4469,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT, thd->first_successful_insert_id_in_prev_stmt_for_binlog); if (e.write(file)) - goto err; + goto err_unlock; } if (thd->auto_inc_intervals_in_cur_stmt_for_binlog.nb_elements() > 0) { @@ -4583,13 +4480,13 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) thd->auto_inc_intervals_in_cur_stmt_for_binlog. minimum()); if (e.write(file)) - goto err; + goto err_unlock; } if (thd->rand_used) { Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2); if (e.write(file)) - goto err; + goto err_unlock; } if (thd->user_var_events.elements) { @@ -4604,7 +4501,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) user_var_event->type, user_var_event->charset_number); if (e.write(file)) - goto err; + goto err_unlock; } } } @@ -4616,23 +4513,26 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) if (event_info->write(file) || DBUG_EVALUATE_IF("injecting_fault_writing", 1, 0)) - goto err; + goto err_unlock; if (file == &log_file) // we are writing to the real log (disk) { if (flush_and_sync()) - goto err; + goto err_unlock; signal_update(); rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); } error=0; +err_unlock: + if (file == &log_file) + pthread_mutex_unlock(&LOCK_log); + err: if (error) set_write_error(thd); } - pthread_mutex_unlock(&LOCK_log); DBUG_RETURN(error); } @@ -4957,10 +4857,16 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd) bool MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data, - Log_event *end_ev) + Log_event *end_ev, bool all) { + group_commit_entry entry; DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog"); + entry.thd= thd; + entry.trx_data= trx_data; + entry.error= 0; + entry.all= all; + /* Create the necessary events here, where we have the correct THD (and thread context). @@ -4969,23 +4875,23 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data, thread. */ Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0); - trx_data->begin_event= &qinfo; - trx_data->end_event= end_ev; + entry.begin_event= &qinfo; + entry.end_event= end_ev; if (trx_data->has_incident()) { Incident_log_event inc_ev(thd, INCIDENT_LOST_EVENTS, write_error_msg); - trx_data->incident_event= &inc_ev; - DBUG_RETURN(write_transaction_to_binlog_events(trx_data)); + entry.incident_event= &inc_ev; + DBUG_RETURN(write_transaction_to_binlog_events(&entry)); } else { - trx_data->incident_event= NULL; - DBUG_RETURN(write_transaction_to_binlog_events(trx_data)); + entry.incident_event= NULL; + DBUG_RETURN(write_transaction_to_binlog_events(&entry)); } } bool -MYSQL_BIN_LOG::write_transaction_to_binlog_events(binlog_trx_data *trx_data) +MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) { /* To facilitate group commit for the binlog, we first queue up ourselves in @@ -4995,91 +4901,61 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(binlog_trx_data *trx_data) the commit and wake them up. */ - pthread_mutex_lock(&trx_data->LOCK_binlog_participant); - - pthread_mutex_lock(&LOCK_queue); - binlog_trx_data *orig_queue= group_commit_queue; - trx_data->next= orig_queue; - group_commit_queue= trx_data; - pthread_mutex_unlock(&LOCK_queue); + entry->thd->clear_wakeup_ready(); + pthread_mutex_lock(&LOCK_prepare_ordered); + group_commit_entry *orig_queue= group_commit_queue; + entry->next= orig_queue; + group_commit_queue= entry; - if (orig_queue != NULL) + if (entry->trx_data->using_xa) { - trx_data->group_commit_leader= FALSE; - trx_data->done= FALSE; - trx_group_commit_participant(trx_data); + DEBUG_SYNC(entry->thd, "commit_before_prepare_ordered"); + run_prepare_ordered(entry->thd, entry->all); + DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered"); } - else - { - trx_data->group_commit_leader= TRUE; - pthread_mutex_unlock(&trx_data->LOCK_binlog_participant); - trx_group_commit_leader(NULL); - } - - return trx_group_commit_finish(trx_data); -} - -/* - Participate as secondary transaction in group commit. - - Another thread is already waiting to obtain the LOCK_log, and should include - this thread in the group commit once the log is obtained. So here we put - ourself in the queue and wait to be signalled that the group commit is done. + pthread_mutex_unlock(&LOCK_prepare_ordered); - Note that this function must be called with trx_data->LOCK_binlog_participant - locked; the mutex will be released before return. -*/ -void -MYSQL_BIN_LOG::trx_group_commit_participant(binlog_trx_data *trx_data) -{ - safe_mutex_assert_owner(&trx_data->LOCK_binlog_participant); + /* + The first in the queue handle group commit for all; the others just wait + to be signalled when group commit is done. + */ + if (orig_queue != NULL) + entry->thd->wait_for_wakeup_ready(); + else + trx_group_commit_leader(entry); - /* Wait until trx_data.done == true and woken up by the leader. */ - while (!trx_data->done) - pthread_cond_wait(&trx_data->COND_binlog_participant, - &trx_data->LOCK_binlog_participant); - pthread_mutex_unlock(&trx_data->LOCK_binlog_participant); -} + if (!entry->error) + return 0; -bool -MYSQL_BIN_LOG::trx_group_commit_finish(binlog_trx_data *trx_data) -{ - DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_finish"); - DBUG_PRINT("info", ("trx_data->error=%d\n", trx_data->error)); - if (trx_data->error) + switch (entry->error) { - switch (trx_data->error) - { - case ER_ERROR_ON_WRITE: - my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, trx_data->commit_errno); - break; - case ER_ERROR_ON_READ: - my_error(ER_ERROR_ON_READ, MYF(ME_NOREFRESH), - trx_data->trans_log.file_name, trx_data->commit_errno); - break; - default: - /* - There are not (and should not be) any errors thrown not covered above. - But just in case one is added later without updating the above switch - statement, include a catch-all. - */ - my_printf_error(trx_data->error, - "Error writing transaction to binary log: %d", - MYF(ME_NOREFRESH), trx_data->error); - } - + case ER_ERROR_ON_WRITE: + my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, entry->commit_errno); + break; + case ER_ERROR_ON_READ: + my_error(ER_ERROR_ON_READ, MYF(ME_NOREFRESH), + entry->trx_data->trans_log.file_name, entry->commit_errno); + break; + default: /* - Since we return error, this transaction XID will not be committed, so - we need to mark it as not needed for recovery (unlog() is not called - for a transaction if log_xid() fails). - */ - if (trx_data->end_event->get_type_code() == XID_EVENT) - mark_xid_done(); - - DBUG_RETURN(1); + There are not (and should not be) any errors thrown not covered above. + But just in case one is added later without updating the above switch + statement, include a catch-all. + */ + my_printf_error(entry->error, + "Error writing transaction to binary log: %d", + MYF(ME_NOREFRESH), entry->error); } - DBUG_RETURN(0); + /* + Since we return error, this transaction XID will not be committed, so + we need to mark it as not needed for recovery (unlog() is not called + for a transaction if log_xid() fails). + */ + if (entry->trx_data->using_xa) + mark_xid_done(); + + return 1; } /* @@ -5093,69 +4969,36 @@ MYSQL_BIN_LOG::trx_group_commit_finish(binlog_trx_data *trx_data) */ void -MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first) +MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) { + DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_leader"); uint xid_count= 0; uint write_count= 0; - /* First, put anything from group_log_xid into the queue. */ - binlog_trx_data *full_queue= NULL; - binlog_trx_data **next_ptr= &full_queue; - for (TC_group_commit_entry *entry= first; entry; entry= entry->next) - { - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(entry->thd, binlog_hton); - - /* Skip log_xid for transactions without xid, marked by NULL end_event. */ - if (!trx_data->end_event) - continue; - - trx_data->error= 0; - *next_ptr= trx_data; - next_ptr= &(trx_data->next); - } - /* - Next, lock the LOCK_log(), and once we get it, add any additional writes + Lock the LOCK_log(), and once we get it, collect any additional writes that queued up while we were waiting. - - Note that if some writer not going through log_xid() comes in and gets the - LOCK_log before us, they will not be able to include us in their group - commit (and they are not able to handle ensuring same commit order between - us and participating transactional storage engines anyway). - - On the other hand, when we get the LOCK_log, we will be able to include - any non-trasactional writes that queued up in our group commit. This - should hopefully not be too big of a problem, as group commit is most - important for the transactional case anyway when durability (fsync) is - enabled. */ VOID(pthread_mutex_lock(&LOCK_log)); + DEBUG_SYNC(leader->thd, "commit_after_get_LOCK_log"); - /* - As the queue is in reverse order of entering, reverse the queue as we add - it to the existing one. Note that there is no ordering defined between - transactional and non-transactional commits. - */ - pthread_mutex_lock(&LOCK_queue); - binlog_trx_data *current= group_commit_queue; + pthread_mutex_lock(&LOCK_prepare_ordered); + group_commit_entry *current= group_commit_queue; group_commit_queue= NULL; - pthread_mutex_unlock(&LOCK_queue); - binlog_trx_data *xtra_queue= NULL; + pthread_mutex_unlock(&LOCK_prepare_ordered); + + /* As the queue is in reverse order of entering, reverse it. */ + group_commit_entry *queue= NULL; while (current) { - current->error= 0; - binlog_trx_data *next= current->next; - current->next= xtra_queue; - xtra_queue= current; + group_commit_entry *next= current->next; + current->next= queue; + queue= current; current= next; } - *next_ptr= xtra_queue; + DBUG_ASSERT(leader == queue /* the leader should be first in queue */); - /* - Now we have in full_queue the list of transactions to be committed in - order. - */ + /* Now we have in queue the list of transactions to be committed in order. */ DBUG_ASSERT(is_open()); if (likely(is_open())) // Should always be true { @@ -5169,9 +5012,14 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first) current->error and let the thread do the error reporting itself once we wake it up. */ - for (current= full_queue; current != NULL; current= current->next) + for (current= queue; current != NULL; current= current->next) { - IO_CACHE *cache= ¤t->trans_log; + binlog_trx_data *trx_data= current->trx_data; + IO_CACHE *cache= &trx_data->trans_log; + + /* Skip log_xid for transactions without xid, marked by NULL end_event. */ + if (!current->end_event) + continue; /* We only bother to write to the binary log if there is anything @@ -5186,9 +5034,9 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first) write_count++; } - current->commit_bin_log_file_pos= + trx_data->commit_bin_log_file_pos= log_file.pos_in_file + (log_file.write_pos - log_file.write_buffer); - if (current->end_event->get_type_code() == XID_EVENT) + if (trx_data->using_xa) xid_count++; } @@ -5196,7 +5044,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first) { if (flush_and_sync()) { - for (current= full_queue; current != NULL; current= current->next) + for (current= queue; current != NULL; current= current->next) { if (!current->error) { @@ -5213,7 +5061,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first) /* if any commit_events are Xid_log_event, increase the number of - prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated + prepared_xids (it's decreased in ::unlog()). Binlog cannot be rotated if there're prepared xids in it - see the comment in new_file() for an explanation. If no Xid_log_events (then it's all Query_log_event) rotate binlog, @@ -5227,37 +5075,49 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first) rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); } - VOID(pthread_mutex_unlock(&LOCK_log)); - + DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_commit_ordered"); + pthread_mutex_lock(&LOCK_commit_ordered); /* - Signal those that are not part of group_log_xid, and are not group leaders - running the queue. + We cannot unlock LOCK_log until we have locked LOCK_commit_ordered; + otherwise scheduling could allow the next group commit to run ahead of us, + messing up the order of commit_ordered() calls. But as soon as + LOCK_commit_ordered is obtained, we can let the next group commit start. + */ + pthread_mutex_unlock(&LOCK_log); + DEBUG_SYNC(leader->thd, "commit_after_release_LOCK_log"); + ++num_group_commits; - Since a group leader runs the queue itself if a group_log_xid does not get - to do it forst, such leader threads do not need wait or wakeup. + /* + Wakeup each participant waiting for our group commit, first calling the + commit_ordered() methods for any transactions doing 2-phase commit. */ - for (current= xtra_queue; current != NULL; current= current->next) + current= queue; + while (current != NULL) { - /* - Note that we need to take LOCK_binlog_participant even in the case of a - leader! + DEBUG_SYNC(leader->thd, "commit_loop_entry_commit_ordered"); + ++num_commits; + if (current->trx_data->using_xa && !current->error) + run_commit_ordered(current->thd, current->all); - Otherwise there is a race between setting and testing the - group_commit_leader flag. + /* + Careful not to access current->next after waking up the other thread! As + it may change immediately after wakeup. */ - pthread_mutex_lock(¤t->LOCK_binlog_participant); - if (!current->group_commit_leader) - { - current->done= true; - pthread_cond_signal(¤t->COND_binlog_participant); - } - pthread_mutex_unlock(¤t->LOCK_binlog_participant); + group_commit_entry *next= current->next; + if (current != leader) // Don't wake up ourself + current->thd->signal_wakeup_ready(); + current= next; } + DEBUG_SYNC(leader->thd, "commit_after_group_run_commit_ordered"); + pthread_mutex_unlock(&LOCK_commit_ordered); + + DBUG_VOID_RETURN; } int -MYSQL_BIN_LOG::write_transaction(binlog_trx_data *trx_data) +MYSQL_BIN_LOG::write_transaction(group_commit_entry *entry) { + binlog_trx_data *trx_data= entry->trx_data; IO_CACHE *cache= &trx_data->trans_log; /* Log "BEGIN" at the beginning of every transaction. Here, a transaction is @@ -5272,7 +5132,7 @@ MYSQL_BIN_LOG::write_transaction(binlog_trx_data *trx_data) in wrong positions being shown to the user, MASTER_POS_WAIT undue waiting etc. */ - if (trx_data->begin_event->write(&log_file)) + if (entry->begin_event->write(&log_file)) return ER_ERROR_ON_WRITE; DBUG_EXECUTE_IF("crash_before_writing_xid", @@ -5289,10 +5149,10 @@ MYSQL_BIN_LOG::write_transaction(binlog_trx_data *trx_data) if (write_cache(cache)) return ER_ERROR_ON_WRITE; - if (trx_data->end_event->write(&log_file)) + if (entry->end_event->write(&log_file)) return ER_ERROR_ON_WRITE; - if (trx_data->has_incident() && trx_data->incident_event->write(&log_file)) + if (entry->incident_event && entry->incident_event->write(&log_file)) return ER_ERROR_ON_WRITE; if (cache->error) // Error on read @@ -5754,30 +5614,6 @@ TC_LOG::run_commit_ordered(THD *thd, bool all) } } -TC_LOG_queued::TC_LOG_queued() : group_commit_queue(NULL) -{ -} - -TC_LOG_queued::~TC_LOG_queued() -{ -} - -TC_LOG_queued::TC_group_commit_entry * -TC_LOG_queued::reverse_queue(TC_LOG_queued::TC_group_commit_entry *queue) -{ - TC_group_commit_entry *entry= queue; - TC_group_commit_entry *prev= NULL; - while (entry) - { - TC_group_commit_entry *next= entry->next; - entry->next= prev; - prev= entry; - entry= next; - } - - return prev; -} - int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all, bool need_prepare_ordered, bool need_commit_ordered) @@ -5886,142 +5722,6 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all, } -TC_LOG_group_commit::TC_LOG_group_commit() - : num_commits(0), num_group_commits(0) -{ -} - -TC_LOG_group_commit::~TC_LOG_group_commit() -{ -} - -void -TC_LOG_group_commit::init() -{ - my_pthread_mutex_init(&LOCK_group_commit, MY_MUTEX_INIT_SLOW, - "LOCK_group_commit", MYF(0)); -} - -void -TC_LOG_group_commit::deinit() -{ - pthread_mutex_destroy(&LOCK_group_commit); -} - -int TC_LOG_group_commit::log_and_order(THD *thd, my_xid xid, bool all, - bool need_prepare_ordered, - bool need_commit_ordered) -{ - IF_DBUG(int err;) - int cookie; - struct TC_group_commit_entry entry; - bool is_group_commit_leader; - - thd->clear_wakeup_ready(); - entry.thd= thd; - entry.all= all; - entry.xid_error= 0; - - pthread_mutex_lock(&LOCK_prepare_ordered); - TC_group_commit_entry *previous_queue= group_commit_queue; - entry.next= previous_queue; - group_commit_queue= &entry; - - DEBUG_SYNC(thd, "commit_before_prepare_ordered"); - run_prepare_ordered(thd, all); - DEBUG_SYNC(thd, "commit_after_prepare_ordered"); - pthread_mutex_unlock(&LOCK_prepare_ordered); - - is_group_commit_leader= (previous_queue == NULL); - - if (is_group_commit_leader) - { - TC_group_commit_entry *current; - - pthread_mutex_lock(&LOCK_group_commit); - DEBUG_SYNC(thd, "commit_after_get_LOCK_group_commit"); - - pthread_mutex_lock(&LOCK_prepare_ordered); - TC_group_commit_entry *queue= group_commit_queue; - group_commit_queue= NULL; - pthread_mutex_unlock(&LOCK_prepare_ordered); - - /* - Since we enqueue at the head, the queue is actually in reverse order. - So reverse it back into correct commit order before returning. - */ - queue= reverse_queue(queue); - - /* The first in the queue is the leader. */ - DBUG_ASSERT(queue == &entry && queue->thd == thd); - - DEBUG_SYNC(thd, "commit_before_group_log_xid"); - /* This will set individual error codes in each thd->xid_error. */ - group_log_xid(queue); - DEBUG_SYNC(thd, "commit_after_group_log_xid"); - - /* - Call commit_ordered methods for all transactions in the queue - (that did not get an error in group_log_xid()). - - We do this under an additional global LOCK_commit_ordered; this is - so that transactions that do not need 2-phase commit do not have - to wait for the potentially long duration of LOCK_group_commit. - */ - current= queue; - - DEBUG_SYNC(thd, "commit_before_get_LOCK_commit_ordered"); - pthread_mutex_lock(&LOCK_commit_ordered); - /* - We cannot unlock LOCK_group_commit until we have locked - LOCK_commit_ordered; otherwise scheduling could allow the next - group commit to run ahead of us, messing up the order of - commit_ordered() calls. But as soon as LOCK_commit_ordered is - obtained, we can let the next group commit start. - */ - pthread_mutex_unlock(&LOCK_group_commit); - DEBUG_SYNC(thd, "commit_after_release_LOCK_group_commit"); - - ++num_group_commits; - do - { - DEBUG_SYNC(thd, "commit_loop_entry_commit_ordered"); - ++num_commits; - if (!current->xid_error) - run_commit_ordered(current->thd, current->all); - - /* - Careful not to access current->next_commit_ordered after waking up - the other thread! As it may change immediately after wakeup. - */ - TC_group_commit_entry *next= current->next; - if (current != &entry) // Don't wake up ourself - current->thd->signal_wakeup_ready(); - current= next; - } while (current != NULL); - DEBUG_SYNC(thd, "commit_after_group_run_commit_ordered"); - - pthread_mutex_unlock(&LOCK_commit_ordered); - } - else - { - /* If not leader, just wait until leader wakes us up. */ - thd->wait_for_wakeup_ready(); - } - - /* - Now that we're back in our own thread context, do any delayed processing - and error reporting. - */ - IF_DBUG(err= entry.xid_error;) - cookie= xid_log_after(&entry); - /* The cookie must be non-zero in the non-error case. */ - DBUG_ASSERT(err || cookie); - - return cookie; -} - - /********* transaction coordinator log for 2pc - mmap() based solution *******/ /* @@ -6567,7 +6267,6 @@ int TC_LOG_BINLOG::open(const char *opt_name) DBUG_ASSERT(total_ha_2pc > 1); DBUG_ASSERT(opt_name && opt_name[0]); - TC_LOG_group_commit::init(); pthread_mutex_init(&LOCK_prep_xids, MY_MUTEX_INIT_FAST); pthread_cond_init (&COND_prep_xids, 0); @@ -6651,36 +6350,33 @@ void TC_LOG_BINLOG::close() DBUG_ASSERT(prepared_xids==0); pthread_mutex_destroy(&LOCK_prep_xids); pthread_cond_destroy (&COND_prep_xids); - TC_LOG_group_commit::deinit(); } /* Do a binlog log_xid() for a group of transactions, linked through thd->next_commit_ordered. */ -void -TC_LOG_BINLOG::group_log_xid(TC_group_commit_entry *first) -{ - DBUG_ENTER("TC_LOG_BINLOG::group_log_xid"); - trx_group_commit_leader(first); - for (TC_group_commit_entry *entry= first; entry; entry= entry->next) - { - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(entry->thd, binlog_hton); - entry->xid_error= trx_data->error; - } - DBUG_VOID_RETURN; -} - int -TC_LOG_BINLOG::xid_log_after(TC_group_commit_entry *entry) +TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all, + bool need_prepare_ordered __attribute__((unused)), + bool need_commit_ordered __attribute__((unused))) { + int err; + DBUG_ENTER("TC_LOG_BINLOG::log_and_order"); + binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(entry->thd, binlog_hton); - if (trx_group_commit_finish(trx_data)) - return 0; // Returning zero cookie signals error + (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + + trx_data->using_xa= TRUE; + if (xid) + { + Xid_log_event xid_event(thd, xid); + err= binlog_flush_trx_cache(thd, trx_data, &xid_event, all); + } else - return 1; + err= binlog_flush_trx_cache(thd, trx_data, NULL, all); + + DBUG_RETURN(!err); } /* |