diff options
Diffstat (limited to 'sql/log.cc')
-rw-r--r-- | sql/log.cc | 427 |
1 files changed, 346 insertions, 81 deletions
diff --git a/sql/log.cc b/sql/log.cc index d30cf3266f9..44d3869e9d5 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -47,6 +47,19 @@ static int binlog_commit(THD *thd, bool all); static int binlog_rollback(THD *thd, bool all); static int binlog_prepare(THD *thd, bool all); +/* + This is a POD. Please keep it that way! + + Don't add constructors, destructors, or virtual functions. +*/ +struct binlog_trx_data { + bool empty() const { + return pending == NULL && my_b_tell(&trans_log) == 0; + } + IO_CACHE trans_log; // The transaction cache + Rows_log_event *pending; // The pending binrows event +}; + handlerton binlog_hton = { MYSQL_HANDLERTON_INTERFACE_VERSION, "binlog", @@ -92,19 +105,45 @@ bool binlog_init() static int binlog_close_connection(THD *thd) { - IO_CACHE *trans_log= (IO_CACHE*)thd->ha_data[binlog_hton.slot]; - DBUG_ASSERT(mysql_bin_log.is_open() && !my_b_tell(trans_log)); + binlog_trx_data *const trx_data= + (binlog_trx_data*) thd->ha_data[binlog_hton.slot]; + IO_CACHE *trans_log= &trx_data->trans_log; + DBUG_ASSERT(mysql_bin_log.is_open() && trx_data->empty()); close_cached_file(trans_log); - my_free((gptr)trans_log, MYF(0)); + thd->ha_data[binlog_hton.slot]= 0; + my_free((gptr)trx_data, MYF(0)); return 0; } -static int binlog_end_trans(THD *thd, IO_CACHE *trans_log, Log_event *end_ev) +static int +binlog_end_trans(THD *thd, binlog_trx_data *trx_data, Log_event *end_ev) { - int error=0; DBUG_ENTER("binlog_end_trans"); + int error=0; + IO_CACHE *trans_log= &trx_data->trans_log; + if (end_ev) + { + thd->binlog_flush_pending_rows_event(true); error= mysql_bin_log.write(thd, trans_log, end_ev); + } + else + { + thd->binlog_delete_pending_rows_event(); + } + + /* + We need to step the table map version both after writing the + entire transaction to the log file and after rolling back the + transaction. + + We need to step the table map version after writing the + transaction cache to disk. 
In addition, we need to step the table + map version on a rollback to ensure that a new table map event is + generated instead of the one that was written to the thrown-away + transaction cache. + */ + ++mysql_bin_log.m_table_map_version; statistic_increment(binlog_cache_use, &LOCK_status); if (trans_log->disk_writes != 0) @@ -130,32 +169,36 @@ static int binlog_prepare(THD *thd, bool all) static int binlog_commit(THD *thd, bool all) { - IO_CACHE *trans_log= (IO_CACHE*)thd->ha_data[binlog_hton.slot]; DBUG_ENTER("binlog_commit"); + binlog_trx_data *const trx_data= + (binlog_trx_data*) thd->ha_data[binlog_hton.slot]; + IO_CACHE *trans_log= &trx_data->trans_log; DBUG_ASSERT(mysql_bin_log.is_open() && (all || !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))); - if (!my_b_tell(trans_log)) + if (trx_data->empty()) { // we're here because trans_log was flushed in MYSQL_LOG::log() DBUG_RETURN(0); } Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, FALSE); - DBUG_RETURN(binlog_end_trans(thd, trans_log, &qev)); + DBUG_RETURN(binlog_end_trans(thd, trx_data, &qev)); } static int binlog_rollback(THD *thd, bool all) { - int error=0; - IO_CACHE *trans_log= (IO_CACHE*)thd->ha_data[binlog_hton.slot]; DBUG_ENTER("binlog_rollback"); + int error=0; + binlog_trx_data *const trx_data= + (binlog_trx_data*) thd->ha_data[binlog_hton.slot]; + IO_CACHE *trans_log= &trx_data->trans_log; /* First assert is guaranteed - see trans_register_ha() call below. The second must be true. If it is not, we're registering unnecessary, doing extra work. 
The cause should be found and eliminated */ DBUG_ASSERT(all || !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))); - DBUG_ASSERT(mysql_bin_log.is_open() && my_b_tell(trans_log)); + DBUG_ASSERT(mysql_bin_log.is_open() && !trx_data->empty()); /* Update the binary log with a BEGIN/ROLLBACK block if we have cached some queries and we updated some non-transactional @@ -165,10 +208,10 @@ static int binlog_rollback(THD *thd, bool all) if (unlikely(thd->options & OPTION_STATUS_NO_TRANS_UPDATE)) { Query_log_event qev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, FALSE); - error= binlog_end_trans(thd, trans_log, &qev); + error= binlog_end_trans(thd, trx_data, &qev); } else - error= binlog_end_trans(thd, trans_log, 0); + error= binlog_end_trans(thd, trx_data, 0); DBUG_RETURN(error); } @@ -195,8 +238,10 @@ static int binlog_rollback(THD *thd, bool all) static int binlog_savepoint_set(THD *thd, void *sv) { - IO_CACHE *trans_log= (IO_CACHE*)thd->ha_data[binlog_hton.slot]; DBUG_ENTER("binlog_savepoint_set"); + binlog_trx_data *const trx_data= + (binlog_trx_data*) thd->ha_data[binlog_hton.slot]; + IO_CACHE *trans_log= &trx_data->trans_log; DBUG_ASSERT(mysql_bin_log.is_open() && my_b_tell(trans_log)); *(my_off_t *)sv= my_b_tell(trans_log); @@ -207,8 +252,10 @@ static int binlog_savepoint_set(THD *thd, void *sv) static int binlog_savepoint_rollback(THD *thd, void *sv) { - IO_CACHE *trans_log= (IO_CACHE*)thd->ha_data[binlog_hton.slot]; DBUG_ENTER("binlog_savepoint_rollback"); + binlog_trx_data *const trx_data= + (binlog_trx_data*) thd->ha_data[binlog_hton.slot]; + IO_CACHE *trans_log= &trx_data->trans_log; DBUG_ASSERT(mysql_bin_log.is_open() && my_b_tell(trans_log)); /* @@ -367,6 +414,7 @@ MYSQL_LOG::MYSQL_LOG() :bytes_written(0), last_time(0), query_start(0), name(0), prepared_xids(0), log_type(LOG_CLOSED), file_id(1), open_count(1), write_error(FALSE), inited(FALSE), need_start_event(TRUE), + m_table_map_version(0), description_event_for_exec(0), description_event_for_queue(0) { 
/* @@ -1363,7 +1411,7 @@ void MYSQL_LOG::new_file(bool need_lock) to change base names at some point. */ THD *thd = current_thd; /* may be 0 if we are reacting to SIGHUP */ - Rotate_log_event r(thd,new_name+dirname_length(new_name), + Rotate_log_event r(new_name+dirname_length(new_name), 0, LOG_EVENT_OFFSET, 0); r.write(&log_file); bytes_written += r.data_written; @@ -1589,6 +1637,162 @@ bool MYSQL_LOG::is_query_in_union(THD *thd, query_id_t query_id_param) query_id_param >= thd->binlog_evt_union.first_query_id); } + +/* + These functions are placed in this file since they need access to + binlog_hton, which has internal linkage. +*/ + +int THD::binlog_setup_trx_data() +{ + DBUG_ENTER("THD::binlog_setup_trx_data"); + binlog_trx_data *trx_data= + (binlog_trx_data*) ha_data[binlog_hton.slot]; + + if (trx_data) + DBUG_RETURN(0); // Already set up + + ha_data[binlog_hton.slot]= trx_data= + (binlog_trx_data*) my_malloc(sizeof(binlog_trx_data), MYF(MY_ZEROFILL)); + if (!trx_data || + open_cached_file(&trx_data->trans_log, mysql_tmpdir, + LOG_PREFIX, binlog_cache_size, MYF(MY_WME))) + { + my_free((gptr)trx_data, MYF(MY_ALLOW_ZERO_PTR)); + ha_data[binlog_hton.slot]= 0; + DBUG_RETURN(1); // Didn't manage to set it up + } + trx_data->trans_log.end_of_file= max_binlog_cache_size; + DBUG_RETURN(0); +} + +Rows_log_event* +THD::binlog_get_pending_rows_event() const +{ + binlog_trx_data *const trx_data= + (binlog_trx_data*) ha_data[binlog_hton.slot]; + /* + This is less than ideal, but here's the story: If there is no + trx_data, prepare_pending_rows_event() has never been called + (since the trx_data is set up there). In that case, we just return + NULL. + */ + return trx_data ? 
trx_data->pending : NULL; +} + +void +THD::binlog_set_pending_rows_event(Rows_log_event* ev) +{ + binlog_trx_data *const trx_data= + (binlog_trx_data*) ha_data[binlog_hton.slot]; + DBUG_ASSERT(trx_data); + trx_data->pending= ev; +} + + +/* + Moves the last bunch of rows from the pending Rows event to the binlog + (either cached binlog if transaction, or disk binlog). Sets a new pending + event. +*/ +int MYSQL_LOG::flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event) +{ + DBUG_ENTER("MYSQL_LOG::flush_and_set_pending_rows_event(event)"); + DBUG_ASSERT(binlog_row_based && mysql_bin_log.is_open()); + DBUG_PRINT("enter", ("event=%p", event)); + + int error= 0; + + binlog_trx_data *const trx_data= + (binlog_trx_data*) thd->ha_data[binlog_hton.slot]; + + DBUG_ASSERT(trx_data); + + if (Rows_log_event* pending= trx_data->pending) + { + IO_CACHE *file= &log_file; + + /* + Decide if we should write to the log file directly or to the + transaction log. + */ + if (pending->get_cache_stmt() || my_b_tell(&trx_data->trans_log)) + file= &trx_data->trans_log; + + /* + If we are writing to the log file directly, we could avoid + locking the log. This does not work since we need to step the + m_table_map_version below, and that change has to be protected + by the LOCK_log mutex. + */ + pthread_mutex_lock(&LOCK_log); + + /* + Write a table map if necessary + */ + if (pending->maybe_write_table_map(thd, file, this)) + { + pthread_mutex_unlock(&LOCK_log); + DBUG_RETURN(2); + } + + /* + Write pending event to log file or transaction cache + */ + if (pending->write(file)) + { + pthread_mutex_unlock(&LOCK_log); + DBUG_RETURN(1); + } + + /* + We step the table map version if we are writing an event + representing the end of a statement. We do this regardless of + whether we write to the transaction cache or directly to the + file. 
+ + In an ideal world, we could avoid stepping the table map version + if we were writing to a transaction cache, since we could then + reuse the table map that was written earlier in the transaction + cache. This does not work since STMT_END_F implies closing all + table mappings on the slave side. + + TODO: Find a solution so that table maps do not have to be + written several times within a transaction. + */ + if (pending->get_flags(Rows_log_event::STMT_END_F)) + ++m_table_map_version; + + delete pending; + + if (file == &log_file) + { + error= flush_and_sync(); + if (!error) + { + signal_update(); + rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); + } + } + + pthread_mutex_unlock(&LOCK_log); + } + else if (event && event->get_cache_stmt()) /* && pending == 0 */ + { + /* + If we are setting a non-null event for a table that is + transactional, we start a transaction here as well. + */ + trans_register_ha(thd, + thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN), + &binlog_hton); + } + + trx_data->pending= event; + + DBUG_RETURN(error); +} + /* Write an event to the binary log */ @@ -1609,7 +1813,29 @@ bool MYSQL_LOG::write(Log_event *event_info) thd->binlog_evt_union.unioned_events_trans |= event_info->cache_stmt; DBUG_RETURN(0); } - + + /* + Flush the pending rows event to the transaction cache or to the + log file. Since this function potentially acquires the LOCK_log + mutex, we do this before acquiring the LOCK_log mutex in this + function. + + This is not optimal, but necessary in the current implementation + since there is code that writes rows to system tables without + using some way to flush the pending event (e.g., binlog_query()). + + TODO: There shall be no writes to any system table after calling + binlog_query(), so these writes have to be moved to before the call + of binlog_query() for correct functioning. + + This is necessary not only for RBR, but the master might crash + after binlogging the query but before changing the system tables. 
+ This means that the slave and the master are not in the same state + (after the master has restarted), so therefore we have to + eliminate this problem. + */ + thd->binlog_flush_pending_rows_event(true); + pthread_mutex_lock(&LOCK_log); /* @@ -1649,37 +1875,26 @@ bool MYSQL_LOG::write(Log_event *event_info) */ if (opt_using_transactions && thd) { - IO_CACHE *trans_log= (IO_CACHE*)thd->ha_data[binlog_hton.slot]; + if (thd->binlog_setup_trx_data()) + goto err; - if (event_info->get_cache_stmt()) - { - if (!trans_log) - { - thd->ha_data[binlog_hton.slot]= trans_log= (IO_CACHE *) - my_malloc(sizeof(IO_CACHE), MYF(MY_ZEROFILL)); - if (!trans_log || open_cached_file(trans_log, mysql_tmpdir, - LOG_PREFIX, - binlog_cache_size, MYF(MY_WME))) - { - my_free((gptr)trans_log, MYF(MY_ALLOW_ZERO_PTR)); - thd->ha_data[binlog_hton.slot]= trans_log= 0; - goto err; - } - trans_log->end_of_file= max_binlog_cache_size; - trans_register_ha(thd, - thd->options & (OPTION_NOT_AUTOCOMMIT | - OPTION_BEGIN), - &binlog_hton); - } - else if (!my_b_tell(trans_log)) - trans_register_ha(thd, - thd->options & (OPTION_NOT_AUTOCOMMIT | - OPTION_BEGIN), - &binlog_hton); - file= trans_log; - } - else if (trans_log && my_b_tell(trans_log)) + binlog_trx_data *const trx_data= + (binlog_trx_data*) thd->ha_data[binlog_hton.slot]; + IO_CACHE *trans_log= &trx_data->trans_log; + + if (event_info->get_cache_stmt() && !my_b_tell(trans_log)) + trans_register_ha(thd, + thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN), + &binlog_hton); + + if (event_info->get_cache_stmt() || my_b_tell(trans_log)) file= trans_log; + /* + Note: as Mats suggested, for all the cases above where we write to + trans_log, it sounds unnecessary to lock LOCK_log. We should rather + test first if we want to write to trans_log, and if not, lock + LOCK_log. TODO. 
+ */ } #endif DBUG_PRINT("info",("event type=%d",event_info->get_type_code())); @@ -1694,42 +1909,49 @@ bool MYSQL_LOG::write(Log_event *event_info) of the SQL command */ + /* + If row-based binlogging, Insert_id, Rand and other kind of "setting + context" events are not needed. + */ if (thd) { - if (thd->last_insert_id_used) + if (!binlog_row_based) { - Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT, - thd->current_insert_id); - if (e.write(file)) - goto err; - } - if (thd->insert_id_used) - { - Intvar_log_event e(thd,(uchar) INSERT_ID_EVENT,thd->last_insert_id); - if (e.write(file)) - goto err; - } - if (thd->rand_used) - { - Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2); - if (e.write(file)) - goto err; - } - if (thd->user_var_events.elements) - { - for (uint i= 0; i < thd->user_var_events.elements; i++) - { - BINLOG_USER_VAR_EVENT *user_var_event; - get_dynamic(&thd->user_var_events,(gptr) &user_var_event, i); - User_var_log_event e(thd, user_var_event->user_var_event->name.str, - user_var_event->user_var_event->name.length, - user_var_event->value, - user_var_event->length, - user_var_event->type, - user_var_event->charset_number); - if (e.write(file)) - goto err; - } + if (thd->last_insert_id_used) + { + Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT, + thd->current_insert_id); + if (e.write(file)) + goto err; + } + if (thd->insert_id_used) + { + Intvar_log_event e(thd,(uchar) INSERT_ID_EVENT,thd->last_insert_id); + if (e.write(file)) + goto err; + } + if (thd->rand_used) + { + Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2); + if (e.write(file)) + goto err; + } + if (thd->user_var_events.elements) + { + for (uint i= 0; i < thd->user_var_events.elements; i++) + { + BINLOG_USER_VAR_EVENT *user_var_event; + get_dynamic(&thd->user_var_events,(gptr) &user_var_event, i); + User_var_log_event e(thd, user_var_event->user_var_event->name.str, + user_var_event->user_var_event->name.length, + user_var_event->value, + 
user_var_event->length, + user_var_event->type, + user_var_event->charset_number); + if (e.write(file)) + goto err; + } + } } } @@ -1760,6 +1982,9 @@ err: } } + if (event_info->flags & LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F) + ++m_table_map_version; + pthread_mutex_unlock(&LOCK_log); DBUG_RETURN(error); } @@ -2307,6 +2532,44 @@ void MYSQL_LOG::signal_update() DBUG_VOID_RETURN; } +#ifndef MYSQL_CLIENT +bool MYSQL_LOG::write_table_map(THD *thd, IO_CACHE *file, TABLE* table, + bool is_transactional) +{ + DBUG_ENTER("MYSQL_LOG::write_table_map()"); + DBUG_PRINT("enter", ("table=%p (%s: %u)", + table, table->s->table_name, table->s->table_map_id)); + + /* Pre-conditions */ + DBUG_ASSERT(binlog_row_based && is_open()); + DBUG_ASSERT(table->s->table_map_id != ULONG_MAX); + +#ifndef DBUG_OFF + /* + We only need to execute under the LOCK_log mutex if we are writing + to the log file; otherwise, we are writing to a thread-specific + transaction cache and there is no need to serialize this event + with events in other threads. 
+ */ + if (file == &log_file) + safe_mutex_assert_owner(&LOCK_log); +#endif + + Table_map_log_event::flag_set const + flags= Table_map_log_event::NO_FLAGS; + + Table_map_log_event + the_event(thd, table, table->s->table_map_id, is_transactional, flags); + + if (the_event.write(file)) + DBUG_RETURN(1); + + table->s->table_map_version= m_table_map_version; + DBUG_RETURN(0); +} +#endif /* !defined(MYSQL_CLIENT) */ + + #ifdef __NT__ void print_buffer_to_nt_eventlog(enum loglevel level, char *buff, uint length, int buffLen) @@ -3013,9 +3276,11 @@ void TC_LOG_BINLOG::close() */ int TC_LOG_BINLOG::log(THD *thd, my_xid xid) { + DBUG_ENTER("TC_LOG_BINLOG::log"); Xid_log_event xle(thd, xid); - IO_CACHE *trans_log= (IO_CACHE*)thd->ha_data[binlog_hton.slot]; - return !binlog_end_trans(thd, trans_log, &xle); // invert return value + binlog_trx_data *trx_data= + (binlog_trx_data*) thd->ha_data[binlog_hton.slot]; + DBUG_RETURN(!binlog_end_trans(thd, trx_data, &xle)); // invert return value } void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) |