diff options
-rw-r--r-- | mysql-test/r/group_commit.result | 6 | ||||
-rw-r--r-- | mysql-test/r/group_commit_binlog_pos.result | 2 | ||||
-rw-r--r-- | mysql-test/t/group_commit.test | 6 | ||||
-rw-r--r-- | mysql-test/t/group_commit_binlog_pos.test | 2 | ||||
-rw-r--r-- | sql/log.cc | 658 | ||||
-rw-r--r-- | sql/log.h | 143 |
6 files changed, 218 insertions, 599 deletions
diff --git a/mysql-test/r/group_commit.result b/mysql-test/r/group_commit.result index c7993227f8f..9e80dc6da6e 100644 --- a/mysql-test/r/group_commit.result +++ b/mysql-test/r/group_commit.result @@ -3,11 +3,11 @@ SELECT variable_value INTO @commits FROM information_schema.global_status WHERE variable_name = 'binlog_commits'; SELECT variable_value INTO @group_commits FROM information_schema.global_status WHERE variable_name = 'binlog_group_commits'; -SET DEBUG_SYNC= "commit_after_group_log_xid SIGNAL group1_running WAIT_FOR group2_queued"; +SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group1_running WAIT_FOR group2_queued"; INSERT INTO t1 VALUES ("con1"); set DEBUG_SYNC= "now WAIT_FOR group1_running"; SET DEBUG_SYNC= "commit_after_prepare_ordered SIGNAL group2_con2"; -SET DEBUG_SYNC= "commit_after_release_LOCK_group_commit WAIT_FOR group3_committed"; +SET DEBUG_SYNC= "commit_after_release_LOCK_log WAIT_FOR group3_committed"; SET DEBUG_SYNC= "commit_after_group_run_commit_ordered SIGNAL group2_visible WAIT_FOR group2_checked"; INSERT INTO t1 VALUES ("con2"); SET DEBUG_SYNC= "now WAIT_FOR group2_con2"; @@ -25,7 +25,7 @@ SELECT * FROM t1 ORDER BY a; a con1 SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group3_con5"; -SET DEBUG_SYNC= "commit_after_get_LOCK_group_commit SIGNAL con5_leader WAIT_FOR con6_queued"; +SET DEBUG_SYNC= "commit_after_get_LOCK_log SIGNAL con5_leader WAIT_FOR con6_queued"; INSERT INTO t1 VALUES ("con5"); SET DEBUG_SYNC= "now WAIT_FOR con5_leader"; SET DEBUG_SYNC= "commit_after_prepare_ordered SIGNAL con6_queued"; diff --git a/mysql-test/r/group_commit_binlog_pos.result b/mysql-test/r/group_commit_binlog_pos.result index a0bb5ee2d8e..67ae30bbb79 100644 --- a/mysql-test/r/group_commit_binlog_pos.result +++ b/mysql-test/r/group_commit_binlog_pos.result @@ -1,6 +1,6 @@ CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=innodb; INSERT INTO t1 VALUES (0); -SET DEBUG_SYNC= "commit_after_get_LOCK_group_commit SIGNAL con1_waiting 
WAIT_FOR con3_queued"; +SET DEBUG_SYNC= "commit_after_get_LOCK_log SIGNAL con1_waiting WAIT_FOR con3_queued"; SET DEBUG_SYNC= "commit_loop_entry_commit_ordered SIGNAL con1_loop WAIT_FOR con1_loop_cont EXECUTE 3"; INSERT INTO t1 VALUES (1); SET DEBUG_SYNC= "now WAIT_FOR con1_waiting"; diff --git a/mysql-test/t/group_commit.test b/mysql-test/t/group_commit.test index df4ea6654d4..7c87c166844 100644 --- a/mysql-test/t/group_commit.test +++ b/mysql-test/t/group_commit.test @@ -27,7 +27,7 @@ connect(con6,localhost,root,,); # group2 to queue up before finishing. connection con1; -SET DEBUG_SYNC= "commit_after_group_log_xid SIGNAL group1_running WAIT_FOR group2_queued"; +SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group1_running WAIT_FOR group2_queued"; send INSERT INTO t1 VALUES ("con1"); # Make group2 (with three threads) queue up. @@ -37,7 +37,7 @@ send INSERT INTO t1 VALUES ("con1"); connection con2; set DEBUG_SYNC= "now WAIT_FOR group1_running"; SET DEBUG_SYNC= "commit_after_prepare_ordered SIGNAL group2_con2"; -SET DEBUG_SYNC= "commit_after_release_LOCK_group_commit WAIT_FOR group3_committed"; +SET DEBUG_SYNC= "commit_after_release_LOCK_log WAIT_FOR group3_committed"; SET DEBUG_SYNC= "commit_after_group_run_commit_ordered SIGNAL group2_visible WAIT_FOR group2_checked"; send INSERT INTO t1 VALUES ("con2"); connection con3; @@ -69,7 +69,7 @@ SELECT * FROM t1 ORDER BY a; connection con5; SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group3_con5"; -SET DEBUG_SYNC= "commit_after_get_LOCK_group_commit SIGNAL con5_leader WAIT_FOR con6_queued"; +SET DEBUG_SYNC= "commit_after_get_LOCK_log SIGNAL con5_leader WAIT_FOR con6_queued"; send INSERT INTO t1 VALUES ("con5"); connection con6; diff --git a/mysql-test/t/group_commit_binlog_pos.test b/mysql-test/t/group_commit_binlog_pos.test index f8c5e719f11..00cf6ab685f 100644 --- a/mysql-test/t/group_commit_binlog_pos.test +++ b/mysql-test/t/group_commit_binlog_pos.test @@ -23,7 +23,7 @@ 
connect(con3,localhost,root,,); # Queue up three commits for group commit. connection con1; -SET DEBUG_SYNC= "commit_after_get_LOCK_group_commit SIGNAL con1_waiting WAIT_FOR con3_queued"; +SET DEBUG_SYNC= "commit_after_get_LOCK_log SIGNAL con1_waiting WAIT_FOR con3_queued"; SET DEBUG_SYNC= "commit_loop_entry_commit_ordered SIGNAL con1_loop WAIT_FOR con1_loop_cont EXECUTE 3"; send INSERT INTO t1 VALUES (1); diff --git a/sql/log.cc b/sql/log.cc index e29758b7f0b..f2884c1ad38 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -155,19 +155,14 @@ class binlog_trx_data { public: binlog_trx_data() : at_least_one_stmt_committed(0), incident(FALSE), m_pending(0), - before_stmt_pos(MY_OFF_T_UNDEF), using_xa(0), commit_bin_log_file_pos(0) + before_stmt_pos(MY_OFF_T_UNDEF), commit_bin_log_file_pos(0), using_xa(0) { trans_log.end_of_file= max_binlog_cache_size; - (void) my_pthread_mutex_init(&LOCK_binlog_participant, MY_MUTEX_INIT_SLOW, - "LOCK_binlog_participant", MYF(0)); - (void) pthread_cond_init(&COND_binlog_participant, 0); } ~binlog_trx_data() { DBUG_ASSERT(pending() == NULL); - (void) pthread_cond_destroy(&COND_binlog_participant); - (void) pthread_mutex_destroy(&LOCK_binlog_participant); close_cached_file(&trans_log); } @@ -265,46 +260,17 @@ public: Binlog position before the start of the current statement. */ my_off_t before_stmt_pos; - - /* 0 or error when writing to binlog; set during group commit. */ - int error; - /* If error != 0, value of errno (for my_error() reporting). */ - int commit_errno; - /* Link for queueing transactions up for group commit to binlog. */ - binlog_trx_data *next; - /* - Flag set true when group commit for this transaction is finished; used - with pthread_cond_wait() to wait until commit is done. - This flag is protected by LOCK_binlog_participant. - */ - bool done; /* - Flag set if this transaction is the group commit leader that will handle - the actual writing to the binlog. - This flag is protected by LOCK_binlog_participant. 
+ Binlog position after current commit, available to storage engines during + commit_ordered() and commit(). */ - bool group_commit_leader; + ulonglong commit_bin_log_file_pos; + /* Flag set true if this transaction is committed with log_xid() as part of XA, false if not. */ bool using_xa; - /* - Extra events (BEGIN, COMMIT/ROLLBACK/XID, and possibly INCIDENT) to be - written during group commit. The incident_event is only valid if - has_incident() is true. - */ - Log_event *begin_event; - Log_event *end_event; - Log_event *incident_event; - /* Mutex and condition for wakeup after group commit. */ - pthread_mutex_t LOCK_binlog_participant; - pthread_cond_t COND_binlog_participant; - /* - Binlog position after current commit, available to storage engines during - commit() and commit_ordered(). - */ - ulonglong commit_bin_log_file_pos; }; handlerton *binlog_hton; @@ -1441,30 +1407,6 @@ static int binlog_close_connection(handlerton *hton, THD *thd) return 0; } -/* Helper functions for binlog_flush_trx_cache(). */ -static int -binlog_flush_trx_cache_prepare(THD *thd) -{ - if (thd->binlog_flush_pending_rows_event(TRUE)) - return 1; - return 0; -} - -static void -binlog_flush_trx_cache_finish(THD *thd, binlog_trx_data *trx_data) -{ - IO_CACHE *trans_log= &trx_data->trans_log; - - trx_data->reset(); - - statistic_increment(binlog_cache_use, &LOCK_status); - if (trans_log->disk_writes != 0) - { - statistic_increment(binlog_cache_disk_use, &LOCK_status); - trans_log->disk_writes= 0; - } -} - /* End a transaction, writing events to the binary log. 
@@ -1487,14 +1429,15 @@ binlog_flush_trx_cache_finish(THD *thd, binlog_trx_data *trx_data) */ static int binlog_flush_trx_cache(THD *thd, binlog_trx_data *trx_data, - Log_event *end_ev) + Log_event *end_ev, bool all) { DBUG_ENTER("binlog_flush_trx_cache"); + IO_CACHE *trans_log= &trx_data->trans_log; DBUG_PRINT("info", ("thd->options={ %s%s}", FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT), FLAGSTR(thd->options, OPTION_BEGIN))); - if (binlog_flush_trx_cache_prepare(thd)) + if (thd->binlog_flush_pending_rows_event(TRUE)) DBUG_RETURN(1); /* @@ -1507,9 +1450,17 @@ binlog_flush_trx_cache(THD *thd, binlog_trx_data *trx_data, were, we would have to ensure that we're not ending a statement inside a stored function. */ - int error= mysql_bin_log.write_transaction_to_binlog(thd, trx_data, end_ev); + int error= mysql_bin_log.write_transaction_to_binlog(thd, trx_data, + end_ev, all); - binlog_flush_trx_cache_finish(thd, trx_data); + trx_data->reset(); + + statistic_increment(binlog_cache_use, &LOCK_status); + if (trans_log->disk_writes != 0) + { + statistic_increment(binlog_cache_disk_use, &LOCK_status); + trans_log->disk_writes= 0; + } DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL); DBUG_RETURN(error); @@ -1578,51 +1529,11 @@ static LEX_STRING const write_error_msg= static int binlog_prepare(handlerton *hton, THD *thd, bool all) { /* - If this prepare is for a single statement in the middle of a transactions, - not the actual transaction commit, then we do nothing. The real work is - only done later, in the prepare for making persistent changes. + do nothing. + just pretend we can do 2pc, so that MySQL won't + switch to 1pc. 
+ real work will be done in MYSQL_BIN_LOG::log_and_order() */ - if (!all && (thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) - return 0; - - binlog_trx_data *trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - - trx_data->using_xa= TRUE; - - if (binlog_flush_trx_cache_prepare(thd)) - return 1; - - my_xid xid= thd->transaction.xid_state.xid.get_my_xid(); - if (!xid) - { - /* Skip logging this transaction, marked by setting end_event to NULL. */ - trx_data->end_event= NULL; - return 0; - } - - /* - Allocate the extra events that will be logged to the binlog in binlog group - commit. Use placement new to allocate them on the THD memroot, as they need - to remain live until log_xid() returns. - */ - size_t needed_size= sizeof(Query_log_event) + sizeof(Xid_log_event); - if (trx_data->has_incident()) - needed_size+= sizeof(Incident_log_event); - uchar *mem= (uchar *)thd->alloc(needed_size); - if (!mem) - return 1; - - trx_data->begin_event= new ((void *)mem) - Query_log_event(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0); - mem+= sizeof(Query_log_event); - - trx_data->end_event= new ((void *)mem) Xid_log_event(thd, xid); - - if (trx_data->has_incident()) - trx_data->incident_event= new ((void *)(mem + sizeof(Xid_log_event))) - Incident_log_event(thd, INCIDENT_LOST_EVENTS, write_error_msg); - return 0; } @@ -1646,11 +1557,11 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) binlog_trx_data *const trx_data= (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - if (trx_data->using_xa) + if (trx_data->empty()) { // we're here because trans_log was flushed in MYSQL_BIN_LOG::log_xid() - binlog_flush_trx_cache_finish(thd, trx_data); - DBUG_RETURN(error); + trx_data->reset(); + DBUG_RETURN(0); } /* @@ -1673,7 +1584,7 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) !stmt_has_updated_trans_table(thd) && stmt_has_updated_non_trans_table(thd))) { Query_log_event end_ev(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0); - 
error= binlog_flush_trx_cache(thd, trx_data, &end_ev); + error= binlog_flush_trx_cache(thd, trx_data, &end_ev, all); } trx_data->at_least_one_stmt_committed = my_b_tell(&trx_data->trans_log) > 0; @@ -1757,7 +1668,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) thd->current_stmt_binlog_row_based)) { Query_log_event end_ev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0); - error= binlog_flush_trx_cache(thd, trx_data, &end_ev); + error= binlog_flush_trx_cache(thd, trx_data, &end_ev, all); } /* Otherwise, we simply truncate the cache as there is no change on @@ -2599,6 +2510,7 @@ const char *MYSQL_LOG::generate_name(const char *log_name, MYSQL_BIN_LOG::MYSQL_BIN_LOG() :bytes_written(0), prepared_xids(0), file_id(1), open_count(1), need_start_event(TRUE), + group_commit_queue(0), num_commits(0), num_group_commits(0), is_relay_log(0), description_event_for_exec(0), description_event_for_queue(0) { @@ -2626,7 +2538,6 @@ void MYSQL_BIN_LOG::cleanup() delete description_event_for_exec; (void) pthread_mutex_destroy(&LOCK_log); (void) pthread_mutex_destroy(&LOCK_index); - (void) pthread_mutex_destroy(&LOCK_queue); (void) pthread_cond_destroy(&update_cond); } DBUG_VOID_RETURN; @@ -2655,8 +2566,6 @@ void MYSQL_BIN_LOG::init_pthread_objects() */ (void) my_pthread_mutex_init(&LOCK_index, MY_MUTEX_INIT_SLOW, "LOCK_index", MYF_NO_DEADLOCK_DETECTION); - (void) my_pthread_mutex_init(&LOCK_queue, MY_MUTEX_INIT_FAST, "LOCK_queue", - MYF(0)); (void) pthread_cond_init(&update_cond, 0); } @@ -4461,11 +4370,6 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) } /* - Flush the pending rows event to the transaction cache or to the - log file. Since this function potentially aquire the LOCK_log - mutex, we do this before aquiring the LOCK_log mutex in this - function. - We only end the statement if we are in a top-level statement. If we are inside a stored function, we do not end the statement since this will close all tables on the slave. 
@@ -4475,8 +4379,6 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) if (thd->binlog_flush_pending_rows_event(end_stmt)) DBUG_RETURN(error); - pthread_mutex_lock(&LOCK_log); - /* In most cases this is only called if 'is_open()' is true; in fact this is mostly called if is_open() *was* true a few instructions before, but it @@ -4497,7 +4399,6 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) thd->lex->sql_command != SQLCOM_SAVEPOINT && !binlog_filter->db_ok(local_db))) { - VOID(pthread_mutex_unlock(&LOCK_log)); DBUG_RETURN(0); } #endif /* HAVE_REPLICATION */ @@ -4539,15 +4440,11 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) thd->binlog_start_trans_and_stmt(); file= trans_log; } - /* - TODO as Mats suggested, for all the cases above where we write to - trans_log, it sounds unnecessary to lock LOCK_log. We should rather - test first if we want to write to trans_log, and if not, lock - LOCK_log. - */ } #endif /* USING_TRANSACTIONS */ DBUG_PRINT("info",("event type: %d",event_info->get_type_code())); + if (file == &log_file) + pthread_mutex_lock(&LOCK_log); /* No check for auto events flag here - this write method should @@ -4572,7 +4469,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT, thd->first_successful_insert_id_in_prev_stmt_for_binlog); if (e.write(file)) - goto err; + goto err_unlock; } if (thd->auto_inc_intervals_in_cur_stmt_for_binlog.nb_elements() > 0) { @@ -4583,13 +4480,13 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) thd->auto_inc_intervals_in_cur_stmt_for_binlog. 
minimum()); if (e.write(file)) - goto err; + goto err_unlock; } if (thd->rand_used) { Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2); if (e.write(file)) - goto err; + goto err_unlock; } if (thd->user_var_events.elements) { @@ -4604,7 +4501,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) user_var_event->type, user_var_event->charset_number); if (e.write(file)) - goto err; + goto err_unlock; } } } @@ -4616,23 +4513,26 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) if (event_info->write(file) || DBUG_EVALUATE_IF("injecting_fault_writing", 1, 0)) - goto err; + goto err_unlock; if (file == &log_file) // we are writing to the real log (disk) { if (flush_and_sync()) - goto err; + goto err_unlock; signal_update(); rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); } error=0; +err_unlock: + if (file == &log_file) + pthread_mutex_unlock(&LOCK_log); + err: if (error) set_write_error(thd); } - pthread_mutex_unlock(&LOCK_log); DBUG_RETURN(error); } @@ -4957,10 +4857,16 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd) bool MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data, - Log_event *end_ev) + Log_event *end_ev, bool all) { + group_commit_entry entry; DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog"); + entry.thd= thd; + entry.trx_data= trx_data; + entry.error= 0; + entry.all= all; + /* Create the necessary events here, where we have the correct THD (and thread context). @@ -4969,23 +4875,23 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data, thread. 
*/ Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0); - trx_data->begin_event= &qinfo; - trx_data->end_event= end_ev; + entry.begin_event= &qinfo; + entry.end_event= end_ev; if (trx_data->has_incident()) { Incident_log_event inc_ev(thd, INCIDENT_LOST_EVENTS, write_error_msg); - trx_data->incident_event= &inc_ev; - DBUG_RETURN(write_transaction_to_binlog_events(trx_data)); + entry.incident_event= &inc_ev; + DBUG_RETURN(write_transaction_to_binlog_events(&entry)); } else { - trx_data->incident_event= NULL; - DBUG_RETURN(write_transaction_to_binlog_events(trx_data)); + entry.incident_event= NULL; + DBUG_RETURN(write_transaction_to_binlog_events(&entry)); } } bool -MYSQL_BIN_LOG::write_transaction_to_binlog_events(binlog_trx_data *trx_data) +MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) { /* To facilitate group commit for the binlog, we first queue up ourselves in @@ -4995,91 +4901,61 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(binlog_trx_data *trx_data) the commit and wake them up. 
*/ - pthread_mutex_lock(&trx_data->LOCK_binlog_participant); - - pthread_mutex_lock(&LOCK_queue); - binlog_trx_data *orig_queue= group_commit_queue; - trx_data->next= orig_queue; - group_commit_queue= trx_data; - pthread_mutex_unlock(&LOCK_queue); + entry->thd->clear_wakeup_ready(); + pthread_mutex_lock(&LOCK_prepare_ordered); + group_commit_entry *orig_queue= group_commit_queue; + entry->next= orig_queue; + group_commit_queue= entry; - if (orig_queue != NULL) + if (entry->trx_data->using_xa) { - trx_data->group_commit_leader= FALSE; - trx_data->done= FALSE; - trx_group_commit_participant(trx_data); + DEBUG_SYNC(entry->thd, "commit_before_prepare_ordered"); + run_prepare_ordered(entry->thd, entry->all); + DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered"); } - else - { - trx_data->group_commit_leader= TRUE; - pthread_mutex_unlock(&trx_data->LOCK_binlog_participant); - trx_group_commit_leader(NULL); - } - - return trx_group_commit_finish(trx_data); -} - -/* - Participate as secondary transaction in group commit. - - Another thread is already waiting to obtain the LOCK_log, and should include - this thread in the group commit once the log is obtained. So here we put - ourself in the queue and wait to be signalled that the group commit is done. + pthread_mutex_unlock(&LOCK_prepare_ordered); - Note that this function must be called with trx_data->LOCK_binlog_participant - locked; the mutex will be released before return. -*/ -void -MYSQL_BIN_LOG::trx_group_commit_participant(binlog_trx_data *trx_data) -{ - safe_mutex_assert_owner(&trx_data->LOCK_binlog_participant); + /* + The first in the queue handle group commit for all; the others just wait + to be signalled when group commit is done. + */ + if (orig_queue != NULL) + entry->thd->wait_for_wakeup_ready(); + else + trx_group_commit_leader(entry); - /* Wait until trx_data.done == true and woken up by the leader. 
*/ - while (!trx_data->done) - pthread_cond_wait(&trx_data->COND_binlog_participant, - &trx_data->LOCK_binlog_participant); - pthread_mutex_unlock(&trx_data->LOCK_binlog_participant); -} + if (!entry->error) + return 0; -bool -MYSQL_BIN_LOG::trx_group_commit_finish(binlog_trx_data *trx_data) -{ - DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_finish"); - DBUG_PRINT("info", ("trx_data->error=%d\n", trx_data->error)); - if (trx_data->error) + switch (entry->error) { - switch (trx_data->error) - { - case ER_ERROR_ON_WRITE: - my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, trx_data->commit_errno); - break; - case ER_ERROR_ON_READ: - my_error(ER_ERROR_ON_READ, MYF(ME_NOREFRESH), - trx_data->trans_log.file_name, trx_data->commit_errno); - break; - default: - /* - There are not (and should not be) any errors thrown not covered above. - But just in case one is added later without updating the above switch - statement, include a catch-all. - */ - my_printf_error(trx_data->error, - "Error writing transaction to binary log: %d", - MYF(ME_NOREFRESH), trx_data->error); - } - + case ER_ERROR_ON_WRITE: + my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, entry->commit_errno); + break; + case ER_ERROR_ON_READ: + my_error(ER_ERROR_ON_READ, MYF(ME_NOREFRESH), + entry->trx_data->trans_log.file_name, entry->commit_errno); + break; + default: /* - Since we return error, this transaction XID will not be committed, so - we need to mark it as not needed for recovery (unlog() is not called - for a transaction if log_xid() fails). - */ - if (trx_data->end_event->get_type_code() == XID_EVENT) - mark_xid_done(); - - DBUG_RETURN(1); + There are not (and should not be) any errors thrown not covered above. + But just in case one is added later without updating the above switch + statement, include a catch-all. 
+ */ + my_printf_error(entry->error, + "Error writing transaction to binary log: %d", + MYF(ME_NOREFRESH), entry->error); } - DBUG_RETURN(0); + /* + Since we return error, this transaction XID will not be committed, so + we need to mark it as not needed for recovery (unlog() is not called + for a transaction if log_xid() fails). + */ + if (entry->trx_data->using_xa) + mark_xid_done(); + + return 1; } /* @@ -5093,69 +4969,36 @@ MYSQL_BIN_LOG::trx_group_commit_finish(binlog_trx_data *trx_data) */ void -MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first) +MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) { + DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_leader"); uint xid_count= 0; uint write_count= 0; - /* First, put anything from group_log_xid into the queue. */ - binlog_trx_data *full_queue= NULL; - binlog_trx_data **next_ptr= &full_queue; - for (TC_group_commit_entry *entry= first; entry; entry= entry->next) - { - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(entry->thd, binlog_hton); - - /* Skip log_xid for transactions without xid, marked by NULL end_event. */ - if (!trx_data->end_event) - continue; - - trx_data->error= 0; - *next_ptr= trx_data; - next_ptr= &(trx_data->next); - } - /* - Next, lock the LOCK_log(), and once we get it, add any additional writes + Lock the LOCK_log(), and once we get it, collect any additional writes that queued up while we were waiting. - - Note that if some writer not going through log_xid() comes in and gets the - LOCK_log before us, they will not be able to include us in their group - commit (and they are not able to handle ensuring same commit order between - us and participating transactional storage engines anyway). - - On the other hand, when we get the LOCK_log, we will be able to include - any non-trasactional writes that queued up in our group commit. 
This - should hopefully not be too big of a problem, as group commit is most - important for the transactional case anyway when durability (fsync) is - enabled. */ VOID(pthread_mutex_lock(&LOCK_log)); + DEBUG_SYNC(leader->thd, "commit_after_get_LOCK_log"); - /* - As the queue is in reverse order of entering, reverse the queue as we add - it to the existing one. Note that there is no ordering defined between - transactional and non-transactional commits. - */ - pthread_mutex_lock(&LOCK_queue); - binlog_trx_data *current= group_commit_queue; + pthread_mutex_lock(&LOCK_prepare_ordered); + group_commit_entry *current= group_commit_queue; group_commit_queue= NULL; - pthread_mutex_unlock(&LOCK_queue); - binlog_trx_data *xtra_queue= NULL; + pthread_mutex_unlock(&LOCK_prepare_ordered); + + /* As the queue is in reverse order of entering, reverse it. */ + group_commit_entry *queue= NULL; while (current) { - current->error= 0; - binlog_trx_data *next= current->next; - current->next= xtra_queue; - xtra_queue= current; + group_commit_entry *next= current->next; + current->next= queue; + queue= current; current= next; } - *next_ptr= xtra_queue; + DBUG_ASSERT(leader == queue /* the leader should be first in queue */); - /* - Now we have in full_queue the list of transactions to be committed in - order. - */ + /* Now we have in queue the list of transactions to be committed in order. */ DBUG_ASSERT(is_open()); if (likely(is_open())) // Should always be true { @@ -5169,9 +5012,14 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first) current->error and let the thread do the error reporting itself once we wake it up. */ - for (current= full_queue; current != NULL; current= current->next) + for (current= queue; current != NULL; current= current->next) { - IO_CACHE *cache= &current->trans_log; + binlog_trx_data *trx_data= current->trx_data; + IO_CACHE *cache= &trx_data->trans_log; + + /* Skip log_xid for transactions without xid, marked by NULL end_event. 
*/ + if (!current->end_event) + continue; /* We only bother to write to the binary log if there is anything @@ -5186,9 +5034,9 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first) write_count++; } - current->commit_bin_log_file_pos= + trx_data->commit_bin_log_file_pos= log_file.pos_in_file + (log_file.write_pos - log_file.write_buffer); - if (current->end_event->get_type_code() == XID_EVENT) + if (trx_data->using_xa) xid_count++; } @@ -5196,7 +5044,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first) { if (flush_and_sync()) { - for (current= full_queue; current != NULL; current= current->next) + for (current= queue; current != NULL; current= current->next) { if (!current->error) { @@ -5213,7 +5061,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first) /* if any commit_events are Xid_log_event, increase the number of - prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated + prepared_xids (it's decreased in ::unlog()). Binlog cannot be rotated if there're prepared xids in it - see the comment in new_file() for an explanation. If no Xid_log_events (then it's all Query_log_event) rotate binlog, @@ -5227,37 +5075,49 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first) rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); } - VOID(pthread_mutex_unlock(&LOCK_log)); - + DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_commit_ordered"); + pthread_mutex_lock(&LOCK_commit_ordered); /* - Signal those that are not part of group_log_xid, and are not group leaders - running the queue. + We cannot unlock LOCK_log until we have locked LOCK_commit_ordered; + otherwise scheduling could allow the next group commit to run ahead of us, + messing up the order of commit_ordered() calls. But as soon as + LOCK_commit_ordered is obtained, we can let the next group commit start. 
+ */ - pthread_mutex_unlock(&LOCK_log); + DEBUG_SYNC(leader->thd, "commit_after_release_LOCK_log"); + ++num_group_commits; - Since a group leader runs the queue itself if a group_log_xid does not get - to do it forst, such leader threads do not need wait or wakeup. + /* + Wakeup each participant waiting for our group commit, first calling the + commit_ordered() methods for any transactions doing 2-phase commit. */ - for (current= xtra_queue; current != NULL; current= current->next) + current= queue; + while (current != NULL) { - /* - Note that we need to take LOCK_binlog_participant even in the case of a - leader! + DEBUG_SYNC(leader->thd, "commit_loop_entry_commit_ordered"); + ++num_commits; + if (current->trx_data->using_xa && !current->error) + run_commit_ordered(current->thd, current->all); - Otherwise there is a race between setting and testing the - group_commit_leader flag. + /* + Careful not to access current->next after waking up the other thread! As + it may change immediately after wakeup. */ - pthread_mutex_lock(&current->LOCK_binlog_participant); - if (!current->group_commit_leader) - { - current->done= true; - pthread_cond_signal(&current->COND_binlog_participant); - } - pthread_mutex_unlock(&current->LOCK_binlog_participant); + group_commit_entry *next= current->next; + if (current != leader) // Don't wake up ourself + current->thd->signal_wakeup_ready(); + current= next; } + DEBUG_SYNC(leader->thd, "commit_after_group_run_commit_ordered"); + pthread_mutex_unlock(&LOCK_commit_ordered); + + DBUG_VOID_RETURN; } int -MYSQL_BIN_LOG::write_transaction(binlog_trx_data *trx_data) +MYSQL_BIN_LOG::write_transaction(group_commit_entry *entry) { + binlog_trx_data *trx_data= entry->trx_data; IO_CACHE *cache= &trx_data->trans_log; /* Log "BEGIN" at the beginning of every transaction. Here, a transaction is @@ -5272,7 +5132,7 @@ MYSQL_BIN_LOG::write_transaction(binlog_trx_data *trx_data) in wrong positions being shown to the user, MASTER_POS_WAIT undue waiting etc. 
*/ - if (trx_data->begin_event->write(&log_file)) + if (entry->begin_event->write(&log_file)) return ER_ERROR_ON_WRITE; DBUG_EXECUTE_IF("crash_before_writing_xid", @@ -5289,10 +5149,10 @@ MYSQL_BIN_LOG::write_transaction(binlog_trx_data *trx_data) if (write_cache(cache)) return ER_ERROR_ON_WRITE; - if (trx_data->end_event->write(&log_file)) + if (entry->end_event->write(&log_file)) return ER_ERROR_ON_WRITE; - if (trx_data->has_incident() && trx_data->incident_event->write(&log_file)) + if (entry->incident_event && entry->incident_event->write(&log_file)) return ER_ERROR_ON_WRITE; if (cache->error) // Error on read @@ -5754,30 +5614,6 @@ TC_LOG::run_commit_ordered(THD *thd, bool all) } } -TC_LOG_queued::TC_LOG_queued() : group_commit_queue(NULL) -{ -} - -TC_LOG_queued::~TC_LOG_queued() -{ -} - -TC_LOG_queued::TC_group_commit_entry * -TC_LOG_queued::reverse_queue(TC_LOG_queued::TC_group_commit_entry *queue) -{ - TC_group_commit_entry *entry= queue; - TC_group_commit_entry *prev= NULL; - while (entry) - { - TC_group_commit_entry *next= entry->next; - entry->next= prev; - prev= entry; - entry= next; - } - - return prev; -} - int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all, bool need_prepare_ordered, bool need_commit_ordered) @@ -5886,142 +5722,6 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all, } -TC_LOG_group_commit::TC_LOG_group_commit() - : num_commits(0), num_group_commits(0) -{ -} - -TC_LOG_group_commit::~TC_LOG_group_commit() -{ -} - -void -TC_LOG_group_commit::init() -{ - my_pthread_mutex_init(&LOCK_group_commit, MY_MUTEX_INIT_SLOW, - "LOCK_group_commit", MYF(0)); -} - -void -TC_LOG_group_commit::deinit() -{ - pthread_mutex_destroy(&LOCK_group_commit); -} - -int TC_LOG_group_commit::log_and_order(THD *thd, my_xid xid, bool all, - bool need_prepare_ordered, - bool need_commit_ordered) -{ - IF_DBUG(int err;) - int cookie; - struct TC_group_commit_entry entry; - bool is_group_commit_leader; - - thd->clear_wakeup_ready(); - entry.thd= 
thd; - entry.all= all; - entry.xid_error= 0; - - pthread_mutex_lock(&LOCK_prepare_ordered); - TC_group_commit_entry *previous_queue= group_commit_queue; - entry.next= previous_queue; - group_commit_queue= &entry; - - DEBUG_SYNC(thd, "commit_before_prepare_ordered"); - run_prepare_ordered(thd, all); - DEBUG_SYNC(thd, "commit_after_prepare_ordered"); - pthread_mutex_unlock(&LOCK_prepare_ordered); - - is_group_commit_leader= (previous_queue == NULL); - - if (is_group_commit_leader) - { - TC_group_commit_entry *current; - - pthread_mutex_lock(&LOCK_group_commit); - DEBUG_SYNC(thd, "commit_after_get_LOCK_group_commit"); - - pthread_mutex_lock(&LOCK_prepare_ordered); - TC_group_commit_entry *queue= group_commit_queue; - group_commit_queue= NULL; - pthread_mutex_unlock(&LOCK_prepare_ordered); - - /* - Since we enqueue at the head, the queue is actually in reverse order. - So reverse it back into correct commit order before returning. - */ - queue= reverse_queue(queue); - - /* The first in the queue is the leader. */ - DBUG_ASSERT(queue == &entry && queue->thd == thd); - - DEBUG_SYNC(thd, "commit_before_group_log_xid"); - /* This will set individual error codes in each thd->xid_error. */ - group_log_xid(queue); - DEBUG_SYNC(thd, "commit_after_group_log_xid"); - - /* - Call commit_ordered methods for all transactions in the queue - (that did not get an error in group_log_xid()). - - We do this under an additional global LOCK_commit_ordered; this is - so that transactions that do not need 2-phase commit do not have - to wait for the potentially long duration of LOCK_group_commit. - */ - current= queue; - - DEBUG_SYNC(thd, "commit_before_get_LOCK_commit_ordered"); - pthread_mutex_lock(&LOCK_commit_ordered); - /* - We cannot unlock LOCK_group_commit until we have locked - LOCK_commit_ordered; otherwise scheduling could allow the next - group commit to run ahead of us, messing up the order of - commit_ordered() calls. 
But as soon as LOCK_commit_ordered is - obtained, we can let the next group commit start. - */ - pthread_mutex_unlock(&LOCK_group_commit); - DEBUG_SYNC(thd, "commit_after_release_LOCK_group_commit"); - - ++num_group_commits; - do - { - DEBUG_SYNC(thd, "commit_loop_entry_commit_ordered"); - ++num_commits; - if (!current->xid_error) - run_commit_ordered(current->thd, current->all); - - /* - Careful not to access current->next_commit_ordered after waking up - the other thread! As it may change immediately after wakeup. - */ - TC_group_commit_entry *next= current->next; - if (current != &entry) // Don't wake up ourself - current->thd->signal_wakeup_ready(); - current= next; - } while (current != NULL); - DEBUG_SYNC(thd, "commit_after_group_run_commit_ordered"); - - pthread_mutex_unlock(&LOCK_commit_ordered); - } - else - { - /* If not leader, just wait until leader wakes us up. */ - thd->wait_for_wakeup_ready(); - } - - /* - Now that we're back in our own thread context, do any delayed processing - and error reporting. - */ - IF_DBUG(err= entry.xid_error;) - cookie= xid_log_after(&entry); - /* The cookie must be non-zero in the non-error case. */ - DBUG_ASSERT(err || cookie); - - return cookie; -} - - /********* transaction coordinator log for 2pc - mmap() based solution *******/ /* @@ -6567,7 +6267,6 @@ int TC_LOG_BINLOG::open(const char *opt_name) DBUG_ASSERT(total_ha_2pc > 1); DBUG_ASSERT(opt_name && opt_name[0]); - TC_LOG_group_commit::init(); pthread_mutex_init(&LOCK_prep_xids, MY_MUTEX_INIT_FAST); pthread_cond_init (&COND_prep_xids, 0); @@ -6651,36 +6350,33 @@ void TC_LOG_BINLOG::close() DBUG_ASSERT(prepared_xids==0); pthread_mutex_destroy(&LOCK_prep_xids); pthread_cond_destroy (&COND_prep_xids); - TC_LOG_group_commit::deinit(); } /* Do a binlog log_xid() for a group of transactions, linked through thd->next_commit_ordered. 
*/ -void -TC_LOG_BINLOG::group_log_xid(TC_group_commit_entry *first) -{ - DBUG_ENTER("TC_LOG_BINLOG::group_log_xid"); - trx_group_commit_leader(first); - for (TC_group_commit_entry *entry= first; entry; entry= entry->next) - { - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(entry->thd, binlog_hton); - entry->xid_error= trx_data->error; - } - DBUG_VOID_RETURN; -} - int -TC_LOG_BINLOG::xid_log_after(TC_group_commit_entry *entry) +TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all, + bool need_prepare_ordered __attribute__((unused)), + bool need_commit_ordered __attribute__((unused))) { + int err; + DBUG_ENTER("TC_LOG_BINLOG::log_and_order"); + binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(entry->thd, binlog_hton); - if (trx_group_commit_finish(trx_data)) - return 0; // Returning zero cookie signals error + (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + + trx_data->using_xa= TRUE; + if (xid) + { + Xid_log_event xid_event(thd, xid); + err= binlog_flush_trx_cache(thd, trx_data, &xid_event, all); + } else - return 1; + err= binlog_flush_trx_cache(thd, trx_data, NULL, all); + + DBUG_RETURN(!err); } /* diff --git a/sql/log.h b/sql/log.h index 37f5462f198..c5a2a72647b 100644 --- a/sql/log.h +++ b/sql/log.h @@ -73,101 +73,6 @@ extern pthread_mutex_t LOCK_commit_ordered; extern void TC_init(); extern void TC_destroy(); -/* - Base class for two TC implementations TC_LOG_unordered and - TC_LOG_group_commit that both use a queue of threads waiting for group - commit. -*/ -class TC_LOG_queued: public TC_LOG -{ -protected: - TC_LOG_queued(); - ~TC_LOG_queued(); - - /* Structure used to link list of THDs waiting for group commit. */ - struct TC_group_commit_entry - { - struct TC_group_commit_entry *next; - THD *thd; - /* This is the `all' parameter for ha_commit_trans() etc. */ - bool all; - /* - Set by TC_LOG_group_commit::group_log_xid(), to return per-thd error and - cookie. 
- */ - int xid_error; - }; - - TC_group_commit_entry * reverse_queue(TC_group_commit_entry *queue); - - /* - This is a queue of threads waiting for being allowed to commit. - Access to the queue must be protected by LOCK_prepare_ordered. - */ - TC_group_commit_entry *group_commit_queue; -}; - -class TC_LOG_group_commit: public TC_LOG_queued -{ -public: - TC_LOG_group_commit(); - ~TC_LOG_group_commit(); - void init(); - void deinit(); - - int log_and_order(THD *thd, my_xid xid, bool all, - bool need_prepare_ordered, bool need_commit_ordered); - -protected: - /* Total number of committed transactions. */ - ulonglong num_commits; - /* Number of group commits done. */ - ulonglong num_group_commits; - - /* - When using this class, this method is used instead of log_xid() to do - logging of a group of transactions all at once. - - The transactions will be linked through THD::next_commit_ordered. - - Additionally, when this method is used instead of log_xid(), the order in - which handler->prepare_ordered() and handler->commit_ordered() are called - is guaranteed to be the same as the order of calls and THD list elements - for group_log_xid(). - - This can be used to efficiently implement group commit that at the same - time preserves the order of commits among handlers and TC (eg. to get same - commit order in InnoDB and binary log). - - For TCs that do not need this, it can be preferable to use plain log_xid() - with class TC_LOG_unordered instead, as it allows threads to run log_xid() - in parallel with each other. In contrast, group_log_xid() runs under a - global mutex, so it is guaranteed that only once call into it will be - active at once. - - Since this call handles multiple threads/THDs at once, my_error() (and - other code that relies on thread local storage) cannot be used in this - method. Instead, the implementation must record any error and report it as - the return value from xid_log_after(), which will be invoked individually - for each thread. 
- - In the success case, this method must set thd->xid_cookie for each thread - to the cookie that is normally returned from log_xid() (which must be - non-zero in the non-error case). - */ - virtual void group_log_xid(TC_group_commit_entry *first) = 0; - /* - Called for each transaction (in corrent thread context) after - group_log_xid() has finished, but with no guarantee on ordering among - threads. - Can be used to do error reporting etc. */ - virtual int xid_log_after(TC_group_commit_entry *entry) = 0; - -private: - /* Mutex used to serialise calls to group_log_xid(). */ - pthread_mutex_t LOCK_group_commit; -}; - class TC_LOG_DUMMY: public TC_LOG // use it to disable the logging { public: @@ -398,17 +303,33 @@ private: }; class binlog_trx_data; -class MYSQL_BIN_LOG: public TC_LOG_group_commit, private MYSQL_LOG +class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG { private: + struct group_commit_entry + { + struct group_commit_entry *next; + THD *thd; + binlog_trx_data *trx_data; + /* + Extra events (BEGIN, COMMIT/ROLLBACK/XID, and possibly INCIDENT) to be + written during group commit. The incident_event is only valid if + trx_data->has_incident() is true. + */ + Log_event *begin_event; + Log_event *end_event; + Log_event *incident_event; + /* Set during group commit to record any per-thread error. */ + int error; + int commit_errno; + /* This is the `all' parameter for ha_commit_ordered(). */ + bool all; + /* True if we come in through XA log_and_order(), false otherwise. */ + }; + /* LOCK_log and LOCK_index are inited by init_pthread_objects() */ pthread_mutex_t LOCK_index; pthread_mutex_t LOCK_prep_xids; - /* - Mutex to protect the queue of non-transactional binlog writes waiting to - participate in group commit. 
- */ - pthread_mutex_t LOCK_queue; pthread_cond_t COND_prep_xids; pthread_cond_t update_cond; @@ -449,7 +370,11 @@ class MYSQL_BIN_LOG: public TC_LOG_group_commit, private MYSQL_LOG */ bool no_auto_events; /* Queue of transactions queued up to participate in group commit. */ - binlog_trx_data *group_commit_queue; + group_commit_entry *group_commit_queue; + /* Total number of committed transactions. */ + ulonglong num_commits; + /* Number of group commits done. */ + ulonglong num_group_commits; int write_to_file(IO_CACHE *cache); /* @@ -459,10 +384,9 @@ class MYSQL_BIN_LOG: public TC_LOG_group_commit, private MYSQL_LOG */ void new_file_without_locking(); void new_file_impl(bool need_lock); - int write_transaction(binlog_trx_data *trx_data); - bool write_transaction_to_binlog_events(binlog_trx_data *trx_data); - void trx_group_commit_participant(binlog_trx_data *trx_data); - void trx_group_commit_leader(TC_group_commit_entry *first); + int write_transaction(group_commit_entry *entry); + bool write_transaction_to_binlog_events(group_commit_entry *entry); + void trx_group_commit_leader(group_commit_entry *leader); void mark_xid_done(); void mark_xids_active(uint xid_count); @@ -493,8 +417,8 @@ public: int open(const char *opt_name); void close(); - void group_log_xid(TC_group_commit_entry *first); - int xid_log_after(TC_group_commit_entry *entry); + int log_and_order(THD *thd, my_xid xid, bool all, + bool need_prepare_ordered, bool need_commit_ordered); void unlog(ulong cookie, my_xid xid); int recover(IO_CACHE *log, Format_description_log_event *fdle); #if !defined(MYSQL_CLIENT) @@ -540,8 +464,7 @@ public: void reset_gathered_updates(THD *thd); bool write(Log_event* event_info); // binary log write bool write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data, - Log_event *end_ev); - bool trx_group_commit_finish(binlog_trx_data *trx_data); + Log_event *end_ev, bool all); bool write_incident(THD *thd); int write_cache(IO_CACHE *cache); |