diff options
Diffstat (limited to 'sql/log.cc')
-rw-r--r-- | sql/log.cc | 1276 |
1 files changed, 1040 insertions, 236 deletions
diff --git a/sql/log.cc b/sql/log.cc index f52e68dd1b9..8440a835158 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -38,6 +38,7 @@ #endif #include <mysql/plugin.h> +#include "debug_sync.h" /* max size of the log message */ #define MAX_LOG_BUFFER_SIZE 1024 @@ -154,9 +155,12 @@ class binlog_trx_data { public: binlog_trx_data() : at_least_one_stmt_committed(0), incident(FALSE), m_pending(0), - before_stmt_pos(MY_OFF_T_UNDEF) + before_stmt_pos(MY_OFF_T_UNDEF), using_xa(0) { trans_log.end_of_file= max_binlog_cache_size; + (void) my_pthread_mutex_init(&LOCK_group_commit, MY_MUTEX_INIT_SLOW, + "LOCK_group_commit", MYF(0)); + (void) pthread_cond_init(&COND_group_commit, 0); } ~binlog_trx_data() @@ -208,11 +212,12 @@ public: completely. */ void reset() { - if (!empty()) + if (trans_log.type != WRITE_CACHE || !empty()) truncate(0); before_stmt_pos= MY_OFF_T_UNDEF; incident= FALSE; trans_log.end_of_file= max_binlog_cache_size; + using_xa= FALSE; DBUG_ASSERT(empty()); } @@ -257,6 +262,41 @@ public: Binlog position before the start of the current statement. */ my_off_t before_stmt_pos; + + /* 0 or error when writing to binlog; set during group commit. */ + int error; + /* If error != 0, value of errno (for my_error() reporting). */ + int commit_errno; + /* Link for queueing transactions up for group commit to binlog. */ + binlog_trx_data *next; + /* + Flag set true when group commit for this transaction is finished; used + with pthread_cond_wait() to wait until commit is done. + This flag is protected by LOCK_group_commit. + */ + bool done; + /* + Flag set if this transaction is the group commit leader that will handle + the actual writing to the binlog. + This flag is protected by LOCK_group_commit. + */ + bool group_commit_leader; + /* + Flag set true if this transaction is committed with log_xid() as part of + XA, false if not. + */ + bool using_xa; + /* + Extra events (BEGIN, COMMIT/ROLLBACK/XID, and possibly INCIDENT) to be + written during group commit. The incident_event is only valid if + has_incident() is true. + */ + Log_event *begin_event; + Log_event *end_event; + Log_event *incident_event; + /* Mutex and condition for wakeup after group commit. */ + pthread_mutex_t LOCK_group_commit; + pthread_cond_t COND_group_commit; }; handlerton *binlog_hton; @@ -1391,117 +1431,188 @@ static int binlog_close_connection(handlerton *hton, THD *thd) return 0; } +/* Helper functions for binlog_flush_trx_cache(). */ +static int +binlog_flush_trx_cache_prepare(THD *thd) +{ + if (thd->binlog_flush_pending_rows_event(TRUE)) + return 1; + return 0; +} + +static void +binlog_flush_trx_cache_finish(THD *thd, binlog_trx_data *trx_data) +{ + IO_CACHE *trans_log= &trx_data->trans_log; + + trx_data->reset(); + + statistic_increment(binlog_cache_use, &LOCK_status); + if (trans_log->disk_writes != 0) + { + statistic_increment(binlog_cache_disk_use, &LOCK_status); + trans_log->disk_writes= 0; + } +} + +/* + End a transaction, writing events to the binary log. + + SYNOPSIS + binlog_flush_trx_cache() + + thd The thread whose transaction should be ended + trx_data Pointer to the transaction data to use + end_ev The end event to use (COMMIT, ROLLBACK, or commit XID) + + DESCRIPTION + + End the currently open transaction. The transaction can be either + a real transaction or a statement transaction. + + This can be to commit a transaction, with a COMMIT query event or an XA + commit XID event. But it can also be to rollback a transaction with a + ROLLBACK query event, used for rolling back transactions which also + contain updates to non-transactional tables. + */ +static int +binlog_flush_trx_cache(THD *thd, binlog_trx_data *trx_data, + Log_event *end_ev) +{ + DBUG_ENTER("binlog_flush_trx_cache"); + DBUG_PRINT("info", ("thd->options={ %s%s}", + FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT), + FLAGSTR(thd->options, OPTION_BEGIN))); + + if (binlog_flush_trx_cache_prepare(thd)) + DBUG_RETURN(1); + + /* + Doing a commit or a rollback including non-transactional tables, + i.e., ending a transaction where we might write the transaction + cache to the binary log. + + We can always end the statement when ending a transaction since + transactions are not allowed inside stored functions. If they + were, we would have to ensure that we're not ending a statement + inside a stored function. + */ + int error= mysql_bin_log.write_transaction_to_binlog(thd, trx_data, end_ev); + + binlog_flush_trx_cache_finish(thd, trx_data); + + DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL); + DBUG_RETURN(error); +} + /* - End a transaction. + Discard a transaction, ie. ROLLBACK with only transactional table updates. SYNOPSIS - binlog_end_trans() + binlog_truncate_trx_cache() thd The thread whose transaction should be ended trx_data Pointer to the transaction data to use - end_ev The end event to use, or NULL all True if the entire transaction should be ended, false if only the statement transaction should be ended. DESCRIPTION - End the currently open transaction. The transaction can be either - a real transaction (if 'all' is true) or a statement transaction - (if 'all' is false). + Rollback (and end) a transaction that only modifies transactional + tables. The transaction can be either a real transaction (if 'all' is + true) or a statement transaction (if 'all' is false). - If 'end_ev' is NULL, the transaction is a rollback of only - transactional tables, so the transaction cache will be truncated - to either just before the last opened statement transaction (if - 'all' is false), or reset completely (if 'all' is true). + The transaction cache will be truncated to either just before the last + opened statement transaction (if 'all' is false), or reset completely (if + 'all' is true). */ static int -binlog_end_trans(THD *thd, binlog_trx_data *trx_data, - Log_event *end_ev, bool all) +binlog_truncate_trx_cache(THD *thd, binlog_trx_data *trx_data, bool all) { - DBUG_ENTER("binlog_end_trans"); - int error=0; - IO_CACHE *trans_log= &trx_data->trans_log; - DBUG_PRINT("enter", ("transaction: %s end_ev: 0x%lx", - all ? "all" : "stmt", (long) end_ev)); + DBUG_ENTER("binlog_truncate_trx_cache"); + int error= 0; + DBUG_PRINT("enter", ("transaction: %s", all ? "all" : "stmt")); DBUG_PRINT("info", ("thd->options={ %s%s}", FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT), FLAGSTR(thd->options, OPTION_BEGIN))); /* - NULL denotes ROLLBACK with nothing to replicate: i.e., rollback of - only transactional tables. If the transaction contain changes to - any non-transactiona tables, we need write the transaction and log - a ROLLBACK last. + ROLLBACK with nothing to replicate: i.e., rollback of only transactional + tables. */ - if (end_ev != NULL) - { - if (thd->binlog_flush_pending_rows_event(TRUE)) - DBUG_RETURN(1); - /* - Doing a commit or a rollback including non-transactional tables, - i.e., ending a transaction where we might write the transaction - cache to the binary log. - - We can always end the statement when ending a transaction since - transactions are not allowed inside stored functions. If they - were, we would have to ensure that we're not ending a statement - inside a stored function. - */ - error= mysql_bin_log.write(thd, &trx_data->trans_log, end_ev, - trx_data->has_incident()); - trx_data->reset(); - /* - We need to step the table map version after writing the - transaction cache to disk. - */ - mysql_bin_log.update_table_map_version(); - statistic_increment(binlog_cache_use, &LOCK_status); - if (trans_log->disk_writes != 0) - { - statistic_increment(binlog_cache_disk_use, &LOCK_status); - trans_log->disk_writes= 0; - } - } - else - { - /* - If rolling back an entire transaction or a single statement not - inside a transaction, we reset the transaction cache. - - If rolling back a statement in a transaction, we truncate the - transaction cache to remove the statement. - */ - thd->binlog_remove_pending_rows_event(TRUE); - if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) - { - if (trx_data->has_incident()) - error= mysql_bin_log.write_incident(thd, TRUE); - trx_data->reset(); - } - else // ...statement - trx_data->truncate(trx_data->before_stmt_pos); + /* + If rolling back an entire transaction or a single statement not + inside a transaction, we reset the transaction cache. - /* - We need to step the table map version on a rollback to ensure - that a new table map event is generated instead of the one that - was written to the thrown-away transaction cache. - */ - mysql_bin_log.update_table_map_version(); + If rolling back a statement in a transaction, we truncate the + transaction cache to remove the statement. + */ + thd->binlog_remove_pending_rows_event(TRUE); + if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) + { + if (trx_data->has_incident()) + error= mysql_bin_log.write_incident(thd); + trx_data->reset(); } + else // ...statement + trx_data->truncate(trx_data->before_stmt_pos); DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL); DBUG_RETURN(error); } +static LEX_STRING const write_error_msg= + { C_STRING_WITH_LEN("error writing to the binary log") }; + static int binlog_prepare(handlerton *hton, THD *thd, bool all) { /* - do nothing. - just pretend we can do 2pc, so that MySQL won't - switch to 1pc. - real work will be done in MYSQL_BIN_LOG::log_xid() + If this prepare is for a single statement in the middle of a transactions, + not the actual transaction commit, then we do nothing. The real work is + only done later, in the prepare for making persistent changes. */ + if (!all && (thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) + return 0; + + binlog_trx_data *trx_data= + (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + + trx_data->using_xa= TRUE; + + if (binlog_flush_trx_cache_prepare(thd)) + return 1; + + my_xid xid= thd->transaction.xid_state.xid.get_my_xid(); + if (!xid) + { + /* Skip logging this transaction, marked by setting end_event to NULL. */ + trx_data->end_event= NULL; + return 0; + } + + /* + Allocate the extra events that will be logged to the binlog in binlog group + commit. Use placement new to allocate them on the THD memroot, as they need + to remain live until log_xid() returns. + */ + size_t needed_size= sizeof(Query_log_event) + sizeof(Xid_log_event); + if (trx_data->has_incident()) + needed_size+= sizeof(Incident_log_event); + uchar *mem= (uchar *)thd->alloc(needed_size); + if (!mem) + return 1; + + trx_data->begin_event= new ((void *)mem) + Query_log_event(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0); + mem+= sizeof(Query_log_event); + + trx_data->end_event= new ((void *)mem) Xid_log_event(thd, xid); + + if (trx_data->has_incident()) + trx_data->incident_event= new ((void *)(mem + sizeof(Xid_log_event))) + Incident_log_event(thd, INCIDENT_LOST_EVENTS, write_error_msg); + return 0; } @@ -1525,11 +1636,11 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) binlog_trx_data *const trx_data= (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - if (trx_data->empty()) + if (trx_data->using_xa) { // we're here because trans_log was flushed in MYSQL_BIN_LOG::log_xid() - trx_data->reset(); - DBUG_RETURN(0); + binlog_flush_trx_cache_finish(thd, trx_data); + DBUG_RETURN(error); } /* @@ -1556,8 +1667,8 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) !stmt_has_updated_trans_table(thd) && thd->transaction.stmt.modified_non_trans_table)) { - Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0); - error= binlog_end_trans(thd, trx_data, &qev, all); + Query_log_event end_ev(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0); + error= binlog_flush_trx_cache(thd, trx_data, &end_ev); } trx_data->at_least_one_stmt_committed = my_b_tell(&trx_data->trans_log) > 0; @@ -1621,7 +1732,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) (thd->options & OPTION_KEEP_LOG)) && mysql_bin_log.check_write_error(thd)) trx_data->set_incident(); - error= binlog_end_trans(thd, trx_data, 0, all); + error= binlog_truncate_trx_cache(thd, trx_data, all); } else { @@ -1641,8 +1752,8 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) thd->current_stmt_binlog_row_based) || ((thd->options & OPTION_KEEP_LOG))) { - Query_log_event qev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0); - error= binlog_end_trans(thd, trx_data, &qev, all); + Query_log_event end_ev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0); + error= binlog_flush_trx_cache(thd, trx_data, &end_ev); } /* Otherwise, we simply truncate the cache as there is no change on @@ -1650,7 +1761,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) */ else if ((all && !thd->transaction.all.modified_non_trans_table) || (!all && !thd->transaction.stmt.modified_non_trans_table)) - error= binlog_end_trans(thd, trx_data, 0, all); + error= binlog_truncate_trx_cache(thd, trx_data, all); } if (!all) trx_data->before_stmt_pos = MY_OFF_T_UNDEF; // part of the stmt rollback @@ -2464,7 +2575,7 @@ const char *MYSQL_LOG::generate_name(const char *log_name, MYSQL_BIN_LOG::MYSQL_BIN_LOG() :bytes_written(0), prepared_xids(0), file_id(1), open_count(1), - need_start_event(TRUE), m_table_map_version(0), + need_start_event(TRUE), is_relay_log(0), description_event_for_exec(0), description_event_for_queue(0) { @@ -2492,6 +2603,7 @@ void MYSQL_BIN_LOG::cleanup() delete description_event_for_exec; (void) pthread_mutex_destroy(&LOCK_log); (void) pthread_mutex_destroy(&LOCK_index); + (void) pthread_mutex_destroy(&LOCK_queue); (void) pthread_cond_destroy(&update_cond); } DBUG_VOID_RETURN; @@ -2520,6 +2632,8 @@ void MYSQL_BIN_LOG::init_pthread_objects() */ (void) my_pthread_mutex_init(&LOCK_index, MY_MUTEX_INIT_SLOW, "LOCK_index", MYF_NO_DEADLOCK_DETECTION); + (void) my_pthread_mutex_init(&LOCK_queue, MY_MUTEX_INIT_FAST, "LOCK_queue", + MYF(0)); (void) pthread_cond_init(&update_cond, 0); } @@ -3943,6 +4057,10 @@ err: } +#ifndef DBUG_OFF +static ulong opt_binlog_dbug_fsync_sleep= 0; +#endif + bool MYSQL_BIN_LOG::flush_and_sync() { int err=0, fd=log_file.file; @@ -3953,6 +4071,11 @@ bool MYSQL_BIN_LOG::flush_and_sync() { sync_binlog_counter= 0; err=my_sync(fd, MYF(MY_WME)); +#ifndef DBUG_OFF + ulong usec_sleep= opt_binlog_dbug_fsync_sleep; + if (usec_sleep > 0) + my_sleep(usec_sleep); +#endif } return err; } @@ -4113,7 +4236,6 @@ int THD::binlog_write_table_map(TABLE *table, bool is_trans) DBUG_RETURN(error); binlog_table_maps++; - table->s->table_map_version= mysql_bin_log.table_map_version(); DBUG_RETURN(0); } @@ -4194,64 +4316,41 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, if (Rows_log_event* pending= trx_data->pending()) { - IO_CACHE *file= &log_file; - /* Decide if we should write to the log file directly or to the transaction log. */ if (pending->get_cache_stmt() || my_b_tell(&trx_data->trans_log)) - file= &trx_data->trans_log; - - /* - If we are writing to the log file directly, we could avoid - locking the log. This does not work since we need to step the - m_table_map_version below, and that change has to be protected - by the LOCK_log mutex. - */ - pthread_mutex_lock(&LOCK_log); - - /* - Write pending event to log file or transaction cache - */ - if (pending->write(file)) { - pthread_mutex_unlock(&LOCK_log); - set_write_error(thd); - DBUG_RETURN(1); + /* Write to transaction log/cache. */ + if (pending->write(&trx_data->trans_log)) + { + set_write_error(thd); + DBUG_RETURN(1); + } } - - /* - We step the table map version if we are writing an event - representing the end of a statement. We do this regardless of - wheather we write to the transaction cache or to directly to the - file. - - In an ideal world, we could avoid stepping the table map version - if we were writing to a transaction cache, since we could then - reuse the table map that was written earlier in the transaction - cache. This does not work since STMT_END_F implies closing all - table mappings on the slave side. - - TODO: Find a solution so that table maps does not have to be - written several times within a transaction. - */ - if (pending->get_flags(Rows_log_event::STMT_END_F)) - ++m_table_map_version; - - delete pending; - - if (file == &log_file) + else { + /* Write directly to log file. */ + pthread_mutex_lock(&LOCK_log); + if (pending->write(&log_file)) + { + pthread_mutex_unlock(&LOCK_log); + set_write_error(thd); + DBUG_RETURN(1); + } + error= flush_and_sync(); if (!error) { signal_update(); rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); } + + pthread_mutex_unlock(&LOCK_log); } - pthread_mutex_unlock(&LOCK_log); + delete pending; } thd->binlog_set_pending_rows_event(event); @@ -4450,9 +4549,6 @@ err: set_write_error(thd); } - if (event_info->flags & LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F) - ++m_table_map_version; - pthread_mutex_unlock(&LOCK_log); DBUG_RETURN(error); } @@ -4575,18 +4671,14 @@ uint MYSQL_BIN_LOG::next_file_id() SYNOPSIS write_cache() cache Cache to write to the binary log - lock_log True if the LOCK_log mutex should be aquired, false otherwise - sync_log True if the log should be flushed and sync:ed DESCRIPTION Write the contents of the cache to the binary log. The cache will be reset as a READ_CACHE to be able to read the contents from it. */ -int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache, bool lock_log, bool sync_log) +int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache) { - Mutex_sentry sentry(lock_log ? &LOCK_log : NULL); - if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) return ER_ERROR_ON_WRITE; uint length= my_b_bytes_in_cache(cache), group, carry, hdr_offs; @@ -4697,6 +4789,7 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache, bool lock_log, bool sync_log) } /* Write data to the binary log file */ + DBUG_EXECUTE_IF("fail_binlog_write_1", return ER_ERROR_ON_WRITE;); if (my_b_write(&log_file, cache->read_pos, length)) return ER_ERROR_ON_WRITE; cache->read_pos=cache->read_end; // Mark buffer used up @@ -4704,9 +4797,6 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache, bool lock_log, bool sync_log) DBUG_ASSERT(carry == 0); - if (sync_log) - flush_and_sync(); - return 0; // All OK } @@ -4739,26 +4829,22 @@ int query_error_code(THD *thd, bool not_killed) return error; } -bool MYSQL_BIN_LOG::write_incident(THD *thd, bool lock) +bool MYSQL_BIN_LOG::write_incident(THD *thd) { uint error= 0; DBUG_ENTER("MYSQL_BIN_LOG::write_incident"); - LEX_STRING const write_error_msg= - { C_STRING_WITH_LEN("error writing to the binary log") }; Incident incident= INCIDENT_LOST_EVENTS; Incident_log_event ev(thd, incident, write_error_msg); - if (lock) - pthread_mutex_lock(&LOCK_log); + + pthread_mutex_lock(&LOCK_log); error= ev.write(&log_file); - if (lock) + if (!error && !(error= flush_and_sync())) { - if (!error && !(error= flush_and_sync())) - { - signal_update(); - rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); - } - pthread_mutex_unlock(&LOCK_log); + signal_update(); + rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); } + pthread_mutex_unlock(&LOCK_log); + DBUG_RETURN(error); } @@ -4786,103 +4872,366 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd, bool lock) 'cache' needs to be reinitialized after this functions returns. */ -bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event, - bool incident) +bool +MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data, + Log_event *end_ev) +{ + DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog"); + + /* + Create the necessary events here, where we have the correct THD (and + thread context). + + Due to group commit the actual writing to binlog may happen in a different + thread. + */ + Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0); + trx_data->begin_event= &qinfo; + trx_data->end_event= end_ev; + if (trx_data->has_incident()) + { + Incident_log_event inc_ev(thd, INCIDENT_LOST_EVENTS, write_error_msg); + trx_data->incident_event= &inc_ev; + DBUG_RETURN(write_transaction_to_binlog_events(trx_data)); + } + else + { + trx_data->incident_event= NULL; + DBUG_RETURN(write_transaction_to_binlog_events(trx_data)); + } +} + +bool +MYSQL_BIN_LOG::write_transaction_to_binlog_events(binlog_trx_data *trx_data) { - DBUG_ENTER("MYSQL_BIN_LOG::write(THD *, IO_CACHE *, Log_event *)"); + /* + To facilitate group commit for the binlog, we first queue up ourselves in + the group commit queue. Then the first thread to enter the queue waits for + the LOCK_log mutex, and commits for everyone in the queue once it gets the + lock. Any other threads in the queue just wait for the first one to finish + the commit and wake them up. + */ + + pthread_mutex_lock(&trx_data->LOCK_group_commit); + const binlog_trx_data *orig_queue= atomic_enqueue_trx(trx_data); + + if (orig_queue != NULL) + { + trx_data->group_commit_leader= FALSE; + trx_data->done= FALSE; + trx_group_commit_participant(trx_data); + } + else + { + trx_data->group_commit_leader= TRUE; + pthread_mutex_unlock(&trx_data->LOCK_group_commit); + trx_group_commit_leader(NULL); + } + + return trx_group_commit_finish(trx_data); +} + +/* + Participate as secondary transaction in group commit. + + Another thread is already waiting to obtain the LOCK_log, and should include + this thread in the group commit once the log is obtained. So here we put + ourself in the queue and wait to be signalled that the group commit is done. + + Note that this function must be called with the trs_data->LOCK_group_commit + locked; the mutex will be released before return. +*/ +void +MYSQL_BIN_LOG::trx_group_commit_participant(binlog_trx_data *trx_data) +{ + safe_mutex_assert_owner(&trx_data->LOCK_group_commit); + + /* Wait until trx_data.done == true and woken up by the leader. */ + while (!trx_data->done) + pthread_cond_wait(&trx_data->COND_group_commit, + &trx_data->LOCK_group_commit); + pthread_mutex_unlock(&trx_data->LOCK_group_commit); +} + +bool +MYSQL_BIN_LOG::trx_group_commit_finish(binlog_trx_data *trx_data) +{ + DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_finish"); + DBUG_PRINT("info", ("trx_data->error=%d\n", trx_data->error)); + if (trx_data->error) + { + switch (trx_data->error) + { + case ER_ERROR_ON_WRITE: + my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, trx_data->commit_errno); + break; + case ER_ERROR_ON_READ: + my_error(ER_ERROR_ON_READ, MYF(ME_NOREFRESH), + trx_data->trans_log.file_name, trx_data->commit_errno); + break; + default: + /* + There are not (and should not be) any errors thrown not covered above. + But just in case one is added later without updating the above switch + statement, include a catch-all. + */ + my_printf_error(trx_data->error, + "Error writing transaction to binary log: %d", + MYF(ME_NOREFRESH), trx_data->error); + } + + /* + Since we return error, this transaction XID will not be committed, so + we need to mark it as not needed for recovery (unlog() is not called + for a transaction if log_xid() fails). + */ + if (trx_data->end_event->get_type_code() == XID_EVENT) + mark_xid_done(); + + DBUG_RETURN(1); + } + + DBUG_RETURN(0); +} + +/* + Do binlog group commit as the lead thread. + + This must be called when this thread/transaction is queued at the start of + the group_commit_queue. It will wait to obtain the LOCK_log mutex, then group + commit all the transactions in the queue (more may have entered while waiting + for LOCK_log). After commit is done, all other threads in the queue will be + signalled. + + */ +void +MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first) +{ + uint xid_count= 0; + uint write_count= 0; + + /* First, put anything from group_log_xid into the queue. */ + binlog_trx_data *full_queue= NULL; + binlog_trx_data **next_ptr= &full_queue; + for (TC_group_commit_entry *entry= first; entry; entry= entry->next) + { + binlog_trx_data *const trx_data= + (binlog_trx_data*) thd_get_ha_data(entry->thd, binlog_hton); + + /* Skip log_xid for transactions without xid, marked by NULL end_event. */ + if (!trx_data->end_event) + continue; + + trx_data->error= 0; + *next_ptr= trx_data; + next_ptr= &(trx_data->next); + } + + /* + Next, lock the LOCK_log(), and once we get it, add any additional writes + that queued up while we were waiting. + + Note that if some writer not going through log_xid() comes in and gets the + LOCK_log before us, they will not be able to include us in their group + commit (and they are not able to handle ensuring same commit order between + us and participating transactional storage engines anyway). + + On the other hand, when we get the LOCK_log, we will be able to include + any non-trasactional writes that queued up in our group commit. This + should hopefully not be too big of a problem, as group commit is most + important for the transactional case anyway when durability (fsync) is + enabled. + */ VOID(pthread_mutex_lock(&LOCK_log)); - /* NULL would represent nothing to replicate after ROLLBACK */ - DBUG_ASSERT(commit_event != NULL); + /* + As the queue is in reverse order of entering, reverse the queue as we add + it to the existing one. Note that there is no ordering defined between + transactional and non-transactional commits. + */ + binlog_trx_data *current= atomic_grab_trx_queue(); + binlog_trx_data *xtra_queue= NULL; + while (current) + { + current->error= 0; + binlog_trx_data *next= current->next; + current->next= xtra_queue; + xtra_queue= current; + current= next; + } + *next_ptr= xtra_queue; + /* + Now we have in full_queue the list of transactions to be committed in + order. + */ DBUG_ASSERT(is_open()); if (likely(is_open())) // Should always be true { /* - We only bother to write to the binary log if there is anything - to write. - */ - if (my_b_tell(cache) > 0) + Commit every transaction in the queue. + + Note that we are doing this in a different thread than the one running + the transaction! So we are limited in the operations we can do. In + particular, we cannot call my_error() on behalf of a transaction, as + that obtains the THD from thread local storage. Instead, we must set + current->error and let the thread do the error reporting itself once + we wake it up. + */ + for (current= full_queue; current != NULL; current= current->next) { - /* - Log "BEGIN" at the beginning of every transaction. Here, a - transaction is either a BEGIN..COMMIT block or a single - statement in autocommit mode. - */ - Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0); + IO_CACHE *cache= ¤t->trans_log; /* - Now this Query_log_event has artificial log_pos 0. It must be - adjusted to reflect the real position in the log. Not doing it - would confuse the slave: it would prevent this one from - knowing where he is in the master's binlog, which would result - in wrong positions being shown to the user, MASTER_POS_WAIT - undue waiting etc. + We only bother to write to the binary log if there is anything + to write. */ - if (qinfo.write(&log_file)) - goto err; - - DBUG_EXECUTE_IF("crash_before_writing_xid", - { - if ((write_error= write_cache(cache, false, true))) - DBUG_PRINT("info", ("error writing binlog cache: %d", - write_error)); - DBUG_PRINT("info", ("crashing before writing xid")); - abort(); - }); - - if ((write_error= write_cache(cache, false, false))) - goto err; + if (my_b_tell(cache) > 0) + { + current->error= write_transaction(current); + if (current->error) + current->commit_errno= errno; - if (commit_event && commit_event->write(&log_file)) - goto err; + write_count++; + } - if (incident && write_incident(thd, FALSE)) - goto err; + if (current->end_event->get_type_code() == XID_EVENT) + xid_count++; + } + if (write_count > 0) + { if (flush_and_sync()) - goto err; - DBUG_EXECUTE_IF("half_binlogged_transaction", DBUG_ABORT();); - if (cache->error) // Error on read { - sql_print_error(ER(ER_ERROR_ON_READ), cache->file_name, errno); - write_error=1; // Don't give more errors - goto err; + for (current= full_queue; current != NULL; current= current->next) + { + if (!current->error) + { + current->error= ER_ERROR_ON_WRITE; + current->commit_errno= errno; + } + } + } + else + { + signal_update(); } - signal_update(); } /* - if commit_event is Xid_log_event, increase the number of + if any commit_events are Xid_log_event, increase the number of prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated if there're prepared xids in it - see the comment in new_file() for an explanation. - If the commit_event is not Xid_log_event (then it's a Query_log_event) - rotate binlog, if necessary. + If no Xid_log_events (then it's all Query_log_event) rotate binlog, + if necessary. */ - if (commit_event && commit_event->get_type_code() == XID_EVENT) + if (xid_count > 0) { - pthread_mutex_lock(&LOCK_prep_xids); - prepared_xids++; - pthread_mutex_unlock(&LOCK_prep_xids); + mark_xids_active(xid_count); } else rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); } + VOID(pthread_mutex_unlock(&LOCK_log)); - DBUG_RETURN(0); + /* + Signal those that are not part of group_log_xid, and are not group leaders + running the queue. -err: - if (!write_error) + Since a group leader runs the queue itself if a group_log_xid does not get + to do it forst, such leader threads do not need wait or wakeup. + */ + for (current= xtra_queue; current != NULL; current= current->next) { - write_error= 1; - sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno); + /* + Note that we need to take LOCK_group_commit even in the case of a leader! + + Otherwise there is a race between setting and testing the + group_commit_leader flag. + */ + pthread_mutex_lock(¤t->LOCK_group_commit); + if (!current->group_commit_leader) + { + current->done= true; + pthread_cond_signal(¤t->COND_group_commit); + } + pthread_mutex_unlock(¤t->LOCK_group_commit); } - VOID(pthread_mutex_unlock(&LOCK_log)); - DBUG_RETURN(1); } +int +MYSQL_BIN_LOG::write_transaction(binlog_trx_data *trx_data) +{ + IO_CACHE *cache= &trx_data->trans_log; + /* + Log "BEGIN" at the beginning of every transaction. Here, a transaction is + either a BEGIN..COMMIT block or a single statement in autocommit mode. The + event was constructed in write_transaction_to_binlog(), in the thread + running the transaction. + + Now this Query_log_event has artificial log_pos 0. It must be + adjusted to reflect the real position in the log. Not doing it + would confuse the slave: it would prevent this one from + knowing where he is in the master's binlog, which would result + in wrong positions being shown to the user, MASTER_POS_WAIT + undue waiting etc. + */ + if (trx_data->begin_event->write(&log_file)) + return ER_ERROR_ON_WRITE; + + DBUG_EXECUTE_IF("crash_before_writing_xid", + { + if ((write_cache(cache))) + DBUG_PRINT("info", ("error writing binlog cache")); + else + flush_and_sync(); + + DBUG_PRINT("info", ("crashing before writing xid")); + abort(); + }); + + if (write_cache(cache)) + return ER_ERROR_ON_WRITE; + + if (trx_data->end_event->write(&log_file)) + return ER_ERROR_ON_WRITE; + + if (trx_data->has_incident() && trx_data->incident_event->write(&log_file)) + return ER_ERROR_ON_WRITE; + + if (cache->error) // Error on read + return ER_ERROR_ON_READ; + + return 0; +} + +binlog_trx_data * +MYSQL_BIN_LOG::atomic_enqueue_trx(binlog_trx_data *trx_data) +{ + my_atomic_rwlock_wrlock(&LOCK_queue); + trx_data->next= group_commit_queue; + while (!my_atomic_casptr((void **)(&group_commit_queue), + (void **)(&trx_data->next), + trx_data)) + ; + my_atomic_rwlock_wrunlock(&LOCK_queue); + return trx_data->next; +} + +binlog_trx_data * +MYSQL_BIN_LOG::atomic_grab_trx_queue() +{ + my_atomic_rwlock_wrlock(&LOCK_queue); + binlog_trx_data *queue= group_commit_queue; + while (!my_atomic_casptr((void **)(&group_commit_queue), + (void **)(&queue), + NULL)) + ; + my_atomic_rwlock_wrunlock(&LOCK_queue); + return queue; +} /** Wait until we get a signal that the binary log has been updated. @@ -5276,6 +5625,344 @@ void sql_print_information(const char *format, ...) } +static my_bool mutexes_inited; +pthread_mutex_t LOCK_prepare_ordered; +pthread_mutex_t LOCK_commit_ordered; + +void +TC_init() +{ + my_pthread_mutex_init(&LOCK_prepare_ordered, MY_MUTEX_INIT_SLOW, + "LOCK_prepare_ordered", MYF(0)); + my_pthread_mutex_init(&LOCK_commit_ordered, MY_MUTEX_INIT_SLOW, + "LOCK_commit_ordered", MYF(0)); + mutexes_inited= TRUE; +} + +void +TC_destroy() +{ + if (mutexes_inited) + { + pthread_mutex_destroy(&LOCK_prepare_ordered); + pthread_mutex_destroy(&LOCK_commit_ordered); + mutexes_inited= FALSE; + } +} + +void +TC_LOG::run_prepare_ordered(THD *thd, bool all) +{ + Ha_trx_info *ha_info= + all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list; + + for (; ha_info; ha_info= ha_info->next()) + { + handlerton *ht= ha_info->ht(); + if (!ht->prepare_ordered) + continue; + safe_mutex_assert_owner(&LOCK_prepare_ordered); + ht->prepare_ordered(ht, thd, all); + } +} + +void +TC_LOG::run_commit_ordered(THD *thd, bool all) +{ + Ha_trx_info *ha_info= + all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list; + + for (; ha_info; ha_info= ha_info->next()) + { + handlerton *ht= ha_info->ht(); + if (!ht->commit_ordered) + continue; + safe_mutex_assert_owner(&LOCK_commit_ordered); + ht->commit_ordered(ht, thd, all); + DEBUG_SYNC(thd, "commit_after_run_commit_ordered"); + } +} + +TC_LOG_queued::TC_LOG_queued() : group_commit_queue(NULL) +{ +} + +TC_LOG_queued::~TC_LOG_queued() +{ +} + +TC_LOG_queued::TC_group_commit_entry * +TC_LOG_queued::reverse_queue(TC_LOG_queued::TC_group_commit_entry *queue) +{ + TC_group_commit_entry *entry= queue; + TC_group_commit_entry *prev= NULL; + while (entry) + { + TC_group_commit_entry *next= entry->next; + entry->next= prev; + prev= entry; + entry= next; + } + + return prev; +} + +void +TC_LOG_queued::group_commit_wait_for_wakeup(TC_group_commit_entry *entry) +{ + THD *thd= entry->thd; + pthread_mutex_lock(&thd->LOCK_commit_ordered); + while (!entry->group_commit_ready) + pthread_cond_wait(&thd->COND_commit_ordered, + &thd->LOCK_commit_ordered); + pthread_mutex_unlock(&thd->LOCK_commit_ordered); +} + +void +TC_LOG_queued::group_commit_wakeup_other(TC_group_commit_entry *other) +{ + THD *thd= other->thd; + pthread_mutex_lock(&thd->LOCK_commit_ordered); + other->group_commit_ready= TRUE; + pthread_cond_signal(&thd->COND_commit_ordered); + pthread_mutex_unlock(&thd->LOCK_commit_ordered); +} + +TC_LOG_unordered::TC_LOG_unordered() : group_commit_queue_busy(0) +{ + pthread_cond_init(&COND_queue_busy, 0); +} + +TC_LOG_unordered::~TC_LOG_unordered() +{ + pthread_cond_destroy(&COND_queue_busy); +} + +int TC_LOG_unordered::log_and_order(THD *thd, my_xid xid, bool all, + bool need_prepare_ordered, + bool need_commit_ordered) +{ + int cookie; + struct TC_group_commit_entry entry; + bool is_group_commit_leader; + LINT_INIT(is_group_commit_leader); + + if (need_prepare_ordered) + { + pthread_mutex_lock(&LOCK_prepare_ordered); + run_prepare_ordered(thd, all); + if (need_commit_ordered) + { + /* + Must put us in queue so we can run_commit_ordered() in same sequence + as we did run_prepare_ordered(). + */ + entry.thd= thd; + entry.group_commit_ready= false; + TC_group_commit_entry *previous_queue= group_commit_queue; + entry.next= previous_queue; + group_commit_queue= &entry; + is_group_commit_leader= (previous_queue == NULL); + } + pthread_mutex_unlock(&LOCK_prepare_ordered); + } + + if (xid) + cookie= log_xid(thd, xid); + else + cookie= 0; + + if (need_commit_ordered) + { + if (need_prepare_ordered) + { + /* + We did the run_prepare_ordered() serialised, then ran the log_xid() in + parallel. Now we have to do run_commit_ordered() serialised in the + same sequence as run_prepare_ordered(). + + We do this starting from the head of the queue, each thread doing + run_commit_ordered() and signalling the next in queue. + */ + if (is_group_commit_leader) + { + /* The first in queue starts the ball rolling. */ + pthread_mutex_lock(&LOCK_prepare_ordered); + while (group_commit_queue_busy) + pthread_cond_wait(&COND_queue_busy, &LOCK_prepare_ordered); + TC_group_commit_entry *queue= group_commit_queue; + group_commit_queue= NULL; + /* + Mark the queue busy while we bounce it from one thread to the + next. + */ + group_commit_queue_busy= TRUE; + pthread_mutex_unlock(&LOCK_prepare_ordered); + + queue= reverse_queue(queue); + DBUG_ASSERT(queue == &entry && queue->thd == thd); + } + else + { + /* Not first in queue; just wait until previous thread wakes us up. */ + group_commit_wait_for_wakeup(&entry); + } + } + + /* Only run commit_ordered() if log_xid was successful. */ + if (cookie) + { + pthread_mutex_lock(&LOCK_commit_ordered); + run_commit_ordered(thd, all); + pthread_mutex_unlock(&LOCK_commit_ordered); + } + + if (need_prepare_ordered) + { + TC_group_commit_entry *next= entry.next; + if (next) + { + group_commit_wakeup_other(next); + } + else + { + pthread_mutex_lock(&LOCK_prepare_ordered); + group_commit_queue_busy= FALSE; + pthread_cond_signal(&COND_queue_busy); + pthread_mutex_unlock(&LOCK_prepare_ordered); + } + } + } + + return cookie; +} + + +TC_LOG_group_commit::TC_LOG_group_commit() + : num_commits(0), num_group_commits(0) +{ + my_pthread_mutex_init(&LOCK_group_commit, MY_MUTEX_INIT_SLOW, + "LOCK_group_commit", MYF(0)); +} + +TC_LOG_group_commit::~TC_LOG_group_commit() +{ + pthread_mutex_destroy(&LOCK_group_commit); +} + +int TC_LOG_group_commit::log_and_order(THD *thd, my_xid xid, bool all, + bool need_prepare_ordered, + bool need_commit_ordered) +{ + IF_DBUG(int err;) + int cookie; + struct TC_group_commit_entry entry; + bool is_group_commit_leader; + + entry.thd= thd; + entry.all= all; + entry.group_commit_ready= false; + entry.xid_error= 0; + + pthread_mutex_lock(&LOCK_prepare_ordered); + TC_group_commit_entry *previous_queue= group_commit_queue; + entry.next= previous_queue; + group_commit_queue= &entry; + + DEBUG_SYNC(thd, "commit_before_prepare_ordered"); + run_prepare_ordered(thd, all); + DEBUG_SYNC(thd, "commit_after_prepare_ordered"); + pthread_mutex_unlock(&LOCK_prepare_ordered); + + is_group_commit_leader= (previous_queue == NULL); + + if (is_group_commit_leader) + { + TC_group_commit_entry *current; + + pthread_mutex_lock(&LOCK_group_commit); + DEBUG_SYNC(thd, "commit_after_get_LOCK_group_commit"); + + pthread_mutex_lock(&LOCK_prepare_ordered); + TC_group_commit_entry *queue= group_commit_queue; + group_commit_queue= NULL; + pthread_mutex_unlock(&LOCK_prepare_ordered); + + /* + Since we enqueue at the head, the queue is actually in reverse order. + So reverse it back into correct commit order before returning. + */ + queue= reverse_queue(queue); + + /* The first in the queue is the leader. */ + DBUG_ASSERT(queue == &entry && queue->thd == thd); + + DEBUG_SYNC(thd, "commit_before_group_log_xid"); + /* This will set individual error codes in each thd->xid_error. */ + group_log_xid(queue); + DEBUG_SYNC(thd, "commit_after_group_log_xid"); + + /* + Call commit_ordered methods for all transactions in the queue + (that did not get an error in group_log_xid()). + + We do this under an additional global LOCK_commit_ordered; this is + so that transactions that do not need 2-phase commit do not have + to wait for the potentially long duration of LOCK_group_commit. + */ + current= queue; + + DEBUG_SYNC(thd, "commit_before_get_LOCK_commit_ordered"); + pthread_mutex_lock(&LOCK_commit_ordered); + /* + We cannot unlock LOCK_group_commit until we have locked + LOCK_commit_ordered; otherwise scheduling could allow the next + group commit to run ahead of us, messing up the order of + commit_ordered() calls. But as soon as LOCK_commit_ordered is + obtained, we can let the next group commit start. + */ + pthread_mutex_unlock(&LOCK_group_commit); + DEBUG_SYNC(thd, "commit_after_release_LOCK_group_commit"); + + ++num_group_commits; + do + { + ++num_commits; + if (!current->xid_error) + run_commit_ordered(current->thd, current->all); + + /* + Careful not to access current->next_commit_ordered after waking up + the other thread! As it may change immediately after wakeup. + */ + TC_group_commit_entry *next= current->next; + if (current != &entry) // Don't wake up ourself + group_commit_wakeup_other(current); + current= next; + } while (current != NULL); + DEBUG_SYNC(thd, "commit_after_group_run_commit_ordered"); + + pthread_mutex_unlock(&LOCK_commit_ordered); + } + else + { + /* If not leader, just wait until leader wakes us up. */ + group_commit_wait_for_wakeup(&entry); + } + + /* + Now that we're back in our own thread context, do any delayed processing + and error reporting. + */ + IF_DBUG(err= entry.xid_error;) + cookie= xid_log_after(&entry); + /* The cookie must be non-zero in the non-error case. */ + DBUG_ASSERT(err || cookie); + + return cookie; +} + + /********* transaction coordinator log for 2pc - mmap() based solution *******/ /* @@ -5878,30 +6565,68 @@ void TC_LOG_BINLOG::close() pthread_cond_destroy (&COND_prep_xids); } -/** - @todo - group commit +/* + Do a binlog log_xid() for a group of transactions, linked through + thd->next_commit_ordered. +*/ +void +TC_LOG_BINLOG::group_log_xid(TC_group_commit_entry *first) +{ + DBUG_ENTER("TC_LOG_BINLOG::group_log_xid"); + trx_group_commit_leader(first); + for (TC_group_commit_entry *entry= first; entry; entry= entry->next) + { + binlog_trx_data *const trx_data= + (binlog_trx_data*) thd_get_ha_data(entry->thd, binlog_hton); + entry->xid_error= trx_data->error; + } + DBUG_VOID_RETURN; +} - @retval - 0 error - @retval - 1 success +int +TC_LOG_BINLOG::xid_log_after(TC_group_commit_entry *entry) +{ + binlog_trx_data *const trx_data= + (binlog_trx_data*) thd_get_ha_data(entry->thd, binlog_hton); + if (trx_group_commit_finish(trx_data)) + return 0; // Returning zero cookie signals error + else + return 1; +} + +/* + After an XID is logged, we need to hold on to the current binlog file until + it is fully committed in the storage engine. The reason is that crash + recovery only looks at the latest binlog, so we must make sure there are no + outstanding prepared (but not committed) transactions before rotating the + binlog. + + To handle this, we keep a count of outstanding XIDs. This function is used + to increase this count when committing one or more transactions to the + binary log. */ -int TC_LOG_BINLOG::log_xid(THD *thd, my_xid xid) +void +TC_LOG_BINLOG::mark_xids_active(uint xid_count) { - DBUG_ENTER("TC_LOG_BINLOG::log"); - Xid_log_event xle(thd, xid); - binlog_trx_data *trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - /* - We always commit the entire transaction when writing an XID. Also - note that the return value is inverted. - */ - DBUG_RETURN(!binlog_end_trans(thd, trx_data, &xle, TRUE)); + DBUG_ENTER("TC_LOG_BINLOG::mark_xids_active"); + DBUG_PRINT("info", ("xid_count=%u", xid_count)); + pthread_mutex_lock(&LOCK_prep_xids); + prepared_xids+= xid_count; + pthread_mutex_unlock(&LOCK_prep_xids); + DBUG_VOID_RETURN; } -void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) +/* + Once an XID is committed, it is safe to rotate the binary log, as it can no + longer be needed during crash recovery. + + This function is called to mark an XID this way. It needs to decrease the + count of pending XIDs, and signal the log rotator thread when it reaches zero. +*/ +void +TC_LOG_BINLOG::mark_xid_done() { + DBUG_ENTER("TC_LOG_BINLOG::mark_xid_done"); pthread_mutex_lock(&LOCK_prep_xids); DBUG_ASSERT(prepared_xids > 0); if (--prepared_xids == 0) { @@ -5909,7 +6634,16 @@ void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) pthread_cond_signal(&COND_prep_xids); } pthread_mutex_unlock(&LOCK_prep_xids); - rotate_and_purge(0); // as ::write() did not rotate + DBUG_VOID_RETURN; +} + +void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) +{ + DBUG_ENTER("TC_LOG_BINLOG::unlog"); + if (xid) + mark_xid_done(); + rotate_and_purge(0); // as ::write_transaction_to_binlog() did not rotate + DBUG_VOID_RETURN; } int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle) @@ -5981,6 +6715,72 @@ ulonglong mysql_bin_log_file_pos(void) #endif /* INNODB_COMPATIBILITY_HOOKS */ +static ulonglong binlog_status_var_num_commits; +static ulonglong binlog_status_var_num_group_commits; + +static SHOW_VAR binlog_status_vars_detail[]= +{ + {"commits", + (char *)&binlog_status_var_num_commits, SHOW_LONGLONG}, + {"group_commits", + (char *)&binlog_status_var_num_group_commits, SHOW_LONGLONG}, + {NullS, NullS, SHOW_LONG} +}; + +static int show_binlog_vars(THD *thd, SHOW_VAR *var, char *buff) +{ + mysql_bin_log.set_status_variables(); + var->type= SHOW_ARRAY; + var->value= (char *)&binlog_status_vars_detail; + return 0; +} + +static SHOW_VAR binlog_status_vars_top[]= { + {"binlog", (char *) &show_binlog_vars, SHOW_FUNC}, + {NullS, NullS, SHOW_LONG} +}; + +#ifndef DBUG_OFF +static MYSQL_SYSVAR_ULONG( + dbug_fsync_sleep, + opt_binlog_dbug_fsync_sleep, + PLUGIN_VAR_RQCMDARG, + "Extra sleep (in microseconds) to add to binlog fsync(), for debugging", + NULL, + NULL, + 0, + 0, + ULONG_MAX, + 0); + +static struct st_mysql_sys_var *binlog_sys_vars[]= +{ + MYSQL_SYSVAR(dbug_fsync_sleep), + NULL +}; +#endif + + +/* + Copy out current values of status variables, for SHOW STATUS or + information_schema.global_status. + + This is called only under LOCK_status, so we can fill in a static array. +*/ +void +TC_LOG_BINLOG::set_status_variables() +{ + ulonglong num_commits, num_group_commits; + + pthread_mutex_lock(&LOCK_commit_ordered); + num_commits= this->num_commits; + num_group_commits= this->num_group_commits; + pthread_mutex_unlock(&LOCK_commit_ordered); + + binlog_status_var_num_commits= num_commits; + binlog_status_var_num_group_commits= num_group_commits; +} + struct st_mysql_storage_engine binlog_storage_engine= { MYSQL_HANDLERTON_INTERFACE_VERSION }; @@ -5995,8 +6795,12 @@ mysql_declare_plugin(binlog) binlog_init, /* Plugin Init */ NULL, /* Plugin Deinit */ 0x0100 /* 1.0 */, - NULL, /* status variables */ + binlog_status_vars_top, /* status variables */ +#ifndef DBUG_OFF + binlog_sys_vars, /* system variables */ +#else NULL, /* system variables */ +#endif NULL /* config options */ } mysql_declare_plugin_end; |