diff options
author | Alfranio Correia <alfranio.correia@sun.com> | 2009-11-03 19:02:56 +0000 |
---|---|---|
committer | Alfranio Correia <alfranio.correia@sun.com> | 2009-11-03 19:02:56 +0000 |
commit | 19c380aaff1f1f3c0d21ac0c18904c21d7bdce76 (patch) | |
tree | 0e262b0d25432c224e2d4c8e1a1d608f78444771 /sql | |
parent | 97565b8d1ab24b6f861300c75a4f4f3c7d711be6 (diff) | |
download | mariadb-git-19c380aaff1f1f3c0d21ac0c18904c21d7bdce76.tar.gz |
WL#2687 WL#5072 BUG#40278 BUG#47175
Non-transactional updates that take place inside a transaction present problems
for logging because they are visible to other clients before the transaction
is committed, and they are not rolled back even if the transaction is rolled
back. It is not always possible to log correctly in statement format when both
transactional and non-transactional tables are used in the same transaction.
In the current patch, we ensure that such scenario is completely safe under the
ROW and MIXED modes.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/ha_ndbcluster_binlog.cc | 2 | ||||
-rw-r--r-- | sql/handler.cc | 32 | ||||
-rw-r--r-- | sql/log.cc | 1041 | ||||
-rw-r--r-- | sql/log.h | 10 | ||||
-rw-r--r-- | sql/log_event.cc | 113 | ||||
-rw-r--r-- | sql/log_event.h | 58 | ||||
-rw-r--r-- | sql/log_event_old.cc | 9 | ||||
-rw-r--r-- | sql/mysql_priv.h | 3 | ||||
-rw-r--r-- | sql/share/errmsg.txt | 2 | ||||
-rw-r--r-- | sql/sp.cc | 2 | ||||
-rw-r--r-- | sql/sp_head.cc | 2 | ||||
-rw-r--r-- | sql/sql_acl.cc | 2 | ||||
-rw-r--r-- | sql/sql_base.cc | 4 | ||||
-rw-r--r-- | sql/sql_class.cc | 120 | ||||
-rw-r--r-- | sql/sql_class.h | 35 | ||||
-rw-r--r-- | sql/sql_db.cc | 10 | ||||
-rw-r--r-- | sql/sql_delete.cc | 20 | ||||
-rw-r--r-- | sql/sql_insert.cc | 37 | ||||
-rw-r--r-- | sql/sql_lex.cc | 3 | ||||
-rw-r--r-- | sql/sql_lex.h | 7 | ||||
-rw-r--r-- | sql/sql_load.cc | 4 | ||||
-rw-r--r-- | sql/sql_table.cc | 10 | ||||
-rw-r--r-- | sql/sql_update.cc | 33 | ||||
-rw-r--r-- | sql/sql_view.cc | 2 |
24 files changed, 1011 insertions, 550 deletions
diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc index aba71b6f61d..cc4f5a9c2da 100644 --- a/sql/ha_ndbcluster_binlog.cc +++ b/sql/ha_ndbcluster_binlog.cc @@ -1856,7 +1856,7 @@ static void ndb_binlog_query(THD *thd, Cluster_schema *schema) thd->db= schema->db; int errcode = query_error_code(thd, thd->killed == THD::NOT_KILLED); thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query, - schema->query_length, FALSE, + schema->query_length, FALSE, TRUE, schema->name[0] == 0 || thd->db[0] == 0, errcode); thd->server_id= thd_server_id_save; diff --git a/sql/handler.cc b/sql/handler.cc index e3d22fe1bec..abffdaf49d1 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -4537,7 +4537,21 @@ static int write_locked_table_maps(THD *thd) if (table->current_lock == F_WRLCK && check_table_binlog_row_based(thd, table)) { - int const has_trans= table->file->has_transactions(); + /* + We need to have a transactional behavior for SQLCOM_CREATE_TABLE + (e.g. CREATE TABLE... SELECT * FROM TABLE) in order to keep a + compatible behavior with the STMT based replication even when + the table is not transactional. In other words, if the operation + fails while executing the insert phase nothing is written to the + binlog. + + Note that at this point, we check the type of a set of tables to + create the table map events. In the function binlog_log_row(), + which calls the current function, we check the type of the table + of the current row. + */ + bool const has_trans= thd->lex->sql_command == SQLCOM_CREATE_TABLE || + table->file->has_transactions(); int const error= thd->binlog_write_table_map(table, has_trans); /* If an error occurs, it is the responsibility of the caller to @@ -4586,10 +4600,20 @@ static int binlog_log_row(TABLE* table, { bitmap_set_all(&cols); if (likely(!(error= write_locked_table_maps(thd)))) - error= (*log_func)(thd, table, table->file->has_transactions(), - &cols, table->s->fields, + { + /* + We need to have a transactional behavior for SQLCOM_CREATE_TABLE + (i.e. CREATE TABLE... SELECT * FROM TABLE) in order to keep a + compatible behavior with the STMT based replication even when + the table is not transactional. In other words, if the operation + fails while executing the insert phase nothing is written to the + binlog. + */ + bool const has_trans= thd->lex->sql_command == SQLCOM_CREATE_TABLE || + table->file->has_transactions(); + error= (*log_func)(thd, table, has_trans, &cols, table->s->fields, before_record, after_record); - + } if (!use_bitbuf) bitmap_free(&cols); } diff --git a/sql/log.cc b/sql/log.cc index 491030aca34..fa063196e14 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -148,112 +148,155 @@ private: }; /* - Helper class to store binary log transaction data. + Helper classes to store non-transactional and transactional data + before copying it to the binary log. */ -class binlog_trx_data { +class binlog_cache_data +{ public: - binlog_trx_data() - : at_least_one_stmt(0), incident(FALSE), m_pending(0), - before_stmt_pos(MY_OFF_T_UNDEF) + binlog_cache_data(): m_pending(0), before_stmt_pos (MY_OFF_T_UNDEF), + incident(FALSE) { - trans_log.end_of_file= max_binlog_cache_size; + cache_log.end_of_file= max_binlog_cache_size; } - ~binlog_trx_data() + ~binlog_cache_data() { - DBUG_ASSERT(pending() == NULL); - close_cached_file(&trans_log); + DBUG_ASSERT(empty()); + close_cached_file(&cache_log); } - my_off_t position() const { - return my_b_tell(&trans_log); + bool empty() const + { + return pending() == NULL && my_b_tell(&cache_log) == 0; } - bool empty() const + Rows_log_event *pending() const { - return pending() == NULL && my_b_tell(&trans_log) == 0; + return m_pending; } - /* - Truncate the transaction cache to a certain position. This - includes deleting the pending event. - */ - void truncate(my_off_t pos) + void set_pending(Rows_log_event *const pending) { - DBUG_PRINT("info", ("truncating to position %lu", (ulong) pos)); - DBUG_PRINT("info", ("before_stmt_pos=%lu", (ulong) pos)); - delete pending(); - set_pending(0); - reinit_io_cache(&trans_log, WRITE_CACHE, pos, 0, 0); - trans_log.end_of_file= max_binlog_cache_size; - if (pos < before_stmt_pos) - before_stmt_pos= MY_OFF_T_UNDEF; + m_pending= pending; + } - /* - The only valid positions that can be truncated to are at the - beginning of a statement. We are relying on this fact to be able - to set the at_least_one_stmt flag correctly. In other word, if - we are truncating to the beginning of the transaction cache, - there will be no statements in the cache, otherwhise, we will - have at least one statement in the transaction cache. - */ - at_least_one_stmt= (pos > 0); + void set_incident(void) + { + incident= TRUE; + } + + bool has_incident(void) + { + return(incident); } - /* - Reset the entire contents of the transaction cache, emptying it - completely. - */ - void reset() { - if (!empty()) - truncate(0); - before_stmt_pos= MY_OFF_T_UNDEF; + void reset() + { + truncate(0); incident= FALSE; - trans_log.end_of_file= max_binlog_cache_size; + before_stmt_pos= MY_OFF_T_UNDEF; + cache_log.end_of_file= max_binlog_cache_size; DBUG_ASSERT(empty()); } - Rows_log_event *pending() const + my_off_t get_byte_position() const { - return m_pending; + return my_b_tell(&cache_log); } - void set_pending(Rows_log_event *const pending) + my_off_t get_prev_position() { - m_pending= pending; + return(before_stmt_pos); } - IO_CACHE trans_log; // The transaction cache - - void set_incident(void) + void set_prev_position(my_off_t pos) { - incident= TRUE; + before_stmt_pos= pos; } - bool has_incident(void) + void restore_prev_position() { - return(incident); + truncate(before_stmt_pos); + } + + void restore_savepoint(my_off_t pos) + { + truncate(pos); + if (pos < before_stmt_pos) + before_stmt_pos= MY_OFF_T_UNDEF; } - /** - Boolean that is true if there is at least one statement in the - transaction cache. + /* + Cache to store data before copying it to the binary log. */ - bool at_least_one_stmt; - bool incident; + IO_CACHE cache_log; private: /* - Pending binrows event. This event is the event where the rows are - currently written. + Pending binrows event. This event is the event where the rows are currently + written. */ Rows_log_event *m_pending; -public: /* Binlog position before the start of the current statement. */ my_off_t before_stmt_pos; + + /* + This indicates that some events did not get into the cache and most likely + it is corrupted. + */ + bool incident; + + /* + It truncates the cache to a certain position. This includes deleting the + pending event. + */ + void truncate(my_off_t pos) + { + DBUG_PRINT("info", ("truncating to position %lu", (ulong) pos)); + if (pending()) + { + delete pending(); + set_pending(0); + } + reinit_io_cache(&cache_log, WRITE_CACHE, pos, 0, 0); + cache_log.end_of_file= max_binlog_cache_size; + } + + binlog_cache_data& operator=(const binlog_cache_data& info); + binlog_cache_data(const binlog_cache_data& info); +}; + +class binlog_cache_mngr { +public: + binlog_cache_mngr() {} + + void reset_cache(binlog_cache_data* cache_data) + { + cache_data->reset(); + } + + binlog_cache_data* get_binlog_cache_data(bool is_transactional) + { + return (is_transactional ? &trx_cache : &stmt_cache); + } + + IO_CACHE* get_binlog_cache_log(bool is_transactional) + { + return (is_transactional ? &trx_cache.cache_log : &stmt_cache.cache_log); + } + + binlog_cache_data stmt_cache; + + binlog_cache_data trx_cache; + +private: + + binlog_cache_mngr& operator=(const binlog_cache_mngr& info); + binlog_cache_mngr(const binlog_cache_mngr& info); }; handlerton *binlog_hton; @@ -1265,26 +1308,6 @@ int LOGGER::set_handlers(uint error_log_printer, return 0; } -/** - This function checks if a transactional talbe was updated by the - current statement. - - @param thd The client thread that executed the current statement. - @return - @c true if a transactional table was updated, @false otherwise. -*/ -static bool stmt_has_updated_trans_table(THD *thd) -{ - Ha_trx_info *ha_info; - - for (ha_info= thd->transaction.stmt.ha_list; ha_info; ha_info= ha_info->next()) - { - if (ha_info->is_trx_read_write() && ha_info->ht() != binlog_hton) - return (TRUE); - } - return (FALSE); -} - /* Save position of binary log transaction cache. @@ -1307,10 +1330,10 @@ binlog_trans_log_savepos(THD *thd, my_off_t *pos) DBUG_ASSERT(pos != NULL); if (thd_get_ha_data(thd, binlog_hton) == NULL) thd->binlog_setup_trx_data(); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); DBUG_ASSERT(mysql_bin_log.is_open()); - *pos= trx_data->position(); + *pos= cache_mngr->trx_cache.get_byte_position(); DBUG_PRINT("return", ("*pos: %lu", (ulong) *pos)); DBUG_VOID_RETURN; } @@ -1341,9 +1364,9 @@ binlog_trans_log_truncate(THD *thd, my_off_t pos) /* Only true if binlog_trans_log_savepos() wasn't called before */ DBUG_ASSERT(pos != ~(my_off_t) 0); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - trx_data->truncate(pos); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + cache_mngr->trx_cache.restore_savepoint(pos); DBUG_VOID_RETURN; } @@ -1372,115 +1395,127 @@ int binlog_init(void *p) static int binlog_close_connection(handlerton *hton, THD *thd) { - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - DBUG_ASSERT(trx_data->empty()); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + DBUG_ASSERT(cache_mngr->trx_cache.empty() && cache_mngr->stmt_cache.empty()); thd_set_ha_data(thd, binlog_hton, NULL); - trx_data->~binlog_trx_data(); - my_free((uchar*)trx_data, MYF(0)); + cache_mngr->~binlog_cache_mngr(); + my_free((uchar*)cache_mngr, MYF(0)); return 0; } -/* - End a transaction. +/** + This function flushes a transactional cache upon commit/rollback. - SYNOPSIS - binlog_end_trans() + @param thd The thread whose transaction should be flushed + @param cache_mngr Pointer to the cache data to be flushed + @param end_ev The end event either commit/rollback. - thd The thread whose transaction should be ended - trx_data Pointer to the transaction data to use - end_ev The end event to use, or NULL - all True if the entire transaction should be ended, false if - only the statement transaction should be ended. + @return + nonzero if an error pops up when flushing the transactional cache. +*/ +static int +binlog_flush_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, + Log_event *end_ev) +{ + DBUG_ENTER("binlog_flush_trx_cache"); + int error=0; + IO_CACHE *cache_log= &cache_mngr->trx_cache.cache_log; - DESCRIPTION + /* + This function handles transactional changes and as such + this flag equals to true. + */ + bool const is_transactional= TRUE; - End the currently open transaction. The transaction can be either - a real transaction (if 'all' is true) or a statement transaction - (if 'all' is false). + if (thd->binlog_flush_pending_rows_event(TRUE, is_transactional)) + DBUG_RETURN(1); + /* + Doing a commit or a rollback including non-transactional tables, + i.e., ending a transaction where we might write the transaction + cache to the binary log. + + We can always end the statement when ending a transaction since + transactions are not allowed inside stored functions. If they + were, we would have to ensure that we're not ending a statement + inside a stored function. + */ + error= mysql_bin_log.write(thd, &cache_mngr->trx_cache.cache_log, end_ev, + cache_mngr->trx_cache.has_incident()); + cache_mngr->reset_cache(&cache_mngr->trx_cache); - If 'end_ev' is NULL, the transaction is a rollback of only - transactional tables, so the transaction cache will be truncated - to either just before the last opened statement transaction (if - 'all' is false), or reset completely (if 'all' is true). - */ + /* + We need to step the table map version after writing the + transaction cache to disk. + */ + mysql_bin_log.update_table_map_version(); + statistic_increment(binlog_cache_use, &LOCK_status); + if (cache_log->disk_writes != 0) + { + statistic_increment(binlog_cache_disk_use, &LOCK_status); + cache_log->disk_writes= 0; + } + + DBUG_ASSERT(cache_mngr->trx_cache.empty()); + DBUG_RETURN(error); +} + +/** + This function truncates the transactional cache upon committing or rolling + back either a transaction or a statement. + + @param thd The thread whose transaction should be flushed + @param cache_mngr Pointer to the cache data to be flushed + @param all @c true means truncate the transaction, otherwise the + statement must be truncated. + + @return + nonzero if an error pops up when truncating the transactional cache. +*/ static int -binlog_end_trans(THD *thd, binlog_trx_data *trx_data, - Log_event *end_ev, bool all) +binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all) { - DBUG_ENTER("binlog_end_trans"); + DBUG_ENTER("binlog_truncate_trx_cache"); int error=0; - IO_CACHE *trans_log= &trx_data->trans_log; - DBUG_PRINT("enter", ("transaction: %s end_ev: 0x%lx", - all ? "all" : "stmt", (long) end_ev)); - DBUG_PRINT("info", ("thd->options={ %s%s}", - FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT), - FLAGSTR(thd->options, OPTION_BEGIN))); + /* + This function handles transactional changes and as such this flag + equals to true. + */ + bool const is_transactional= TRUE; + DBUG_PRINT("info", ("thd->options={ %s%s}, transaction: %s", + FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT), + FLAGSTR(thd->options, OPTION_BEGIN), + all ? "all" : "stmt")); /* - NULL denotes ROLLBACK with nothing to replicate: i.e., rollback of - only transactional tables. If the transaction contain changes to - any non-transactiona tables, we need write the transaction and log - a ROLLBACK last. + If rolling back an entire transaction or a single statement not + inside a transaction, we reset the transaction cache. */ - if (end_ev != NULL) + thd->binlog_remove_pending_rows_event(TRUE, is_transactional); + if (all || !thd->in_multi_stmt_transaction()) { - if (thd->binlog_flush_pending_rows_event(TRUE)) - DBUG_RETURN(1); - /* - Doing a commit or a rollback including non-transactional tables, - i.e., ending a transaction where we might write the transaction - cache to the binary log. - - We can always end the statement when ending a transaction since - transactions are not allowed inside stored functions. If they - were, we would have to ensure that we're not ending a statement - inside a stored function. - */ - error= mysql_bin_log.write(thd, &trx_data->trans_log, end_ev, - trx_data->has_incident()); - trx_data->reset(); + if (cache_mngr->trx_cache.has_incident()) + error= mysql_bin_log.write_incident(thd, TRUE); - /* - We need to step the table map version after writing the - transaction cache to disk. - */ - mysql_bin_log.update_table_map_version(); - statistic_increment(binlog_cache_use, &LOCK_status); - if (trans_log->disk_writes != 0) - { - statistic_increment(binlog_cache_disk_use, &LOCK_status); - trans_log->disk_writes= 0; - } + cache_mngr->reset_cache(&cache_mngr->trx_cache); + + thd->clear_binlog_table_maps(); } + /* + If rolling back a statement in a transaction, we truncate the + transaction cache to remove the statement. + */ else - { - /* - If rolling back an entire transaction or a single statement not - inside a transaction, we reset the transaction cache. + cache_mngr->trx_cache.restore_prev_position(); - If rolling back a statement in a transaction, we truncate the - transaction cache to remove the statement. - */ - thd->binlog_remove_pending_rows_event(TRUE); - if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) - { - if (trx_data->has_incident()) - mysql_bin_log.write_incident(thd, TRUE); - trx_data->reset(); - } - else // ...statement - trx_data->truncate(trx_data->before_stmt_pos); - - /* - We need to step the table map version on a rollback to ensure - that a new table map event is generated instead of the one that - was written to the thrown-away transaction cache. - */ - mysql_bin_log.update_table_map_version(); - } + /* + We need to step the table map version on a rollback to ensure that a new + table map event is generated instead of the one that was written to the + thrown-away transaction cache. + */ + mysql_bin_log.update_table_map_version(); - DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL); + DBUG_ASSERT(thd->binlog_get_pending_rows_event(is_transactional) == NULL); DBUG_RETURN(error); } @@ -1496,10 +1531,56 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all) } /** + This function flushes the non-transactional to the binary log upon + committing or rolling back a statement. + + @param thd The thread whose transaction should be flushed + @param cache_mngr Pointer to the cache data to be flushed + + @return + nonzero if an error pops up when flushing the non-transactional cache. +*/ +static int +binlog_flush_stmt_cache(THD *thd, binlog_cache_mngr *cache_mngr) +{ + int error= 0; + DBUG_ENTER("binlog_flush_stmt_cache"); + /* + If we are flushing the statement cache, it means that the changes get + through otherwise the cache is empty and this routine should not be called. + */ + DBUG_ASSERT(cache_mngr->stmt_cache.has_incident() == FALSE); + /* + This function handles non-transactional changes and as such this flag equals + to false. + */ + bool const is_transactional= FALSE; + IO_CACHE *cache_log= &cache_mngr->stmt_cache.cache_log; + thd->binlog_flush_pending_rows_event(TRUE, is_transactional); + Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, FALSE, TRUE, 0); + if ((error= mysql_bin_log.write(thd, cache_log, &qev, + cache_mngr->stmt_cache.has_incident()))) + DBUG_RETURN(error); + cache_mngr->reset_cache(&cache_mngr->stmt_cache); + + /* + We need to step the table map version after writing the + transaction cache to disk. + */ + mysql_bin_log.update_table_map_version(); + statistic_increment(binlog_cache_use, &LOCK_status); + if (cache_log->disk_writes != 0) + { + statistic_increment(binlog_cache_disk_use, &LOCK_status); + cache_log->disk_writes= 0; + } + DBUG_RETURN(error); +} + +/** This function is called once after each statement. - It has the responsibility to flush the transaction cache to the - binlog file on commits. + It has the responsibility to flush the caches to the binary log on commits. @param hton The binlog handlerton. @param thd The client thread that executes the transaction. @@ -1512,54 +1593,53 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) { int error= 0; DBUG_ENTER("binlog_commit"); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + bool const in_transaction= thd->in_multi_stmt_transaction(); + + DBUG_PRINT("debug", + ("all: %d, in_transaction: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s", + all, + YESNO(in_transaction), + YESNO(thd->transaction.all.modified_non_trans_table), + YESNO(thd->transaction.stmt.modified_non_trans_table))); + + if (!cache_mngr->stmt_cache.empty()) + { + binlog_flush_stmt_cache(thd, cache_mngr); + } - if (trx_data->empty()) + if (cache_mngr->trx_cache.empty()) { - // we're here because trans_log was flushed in MYSQL_BIN_LOG::log_xid() - trx_data->reset(); + /* + we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid() + */ + cache_mngr->reset_cache(&cache_mngr->trx_cache); DBUG_RETURN(0); } /* We commit the transaction if: - - We are not in a transaction and committing a statement, or - - - We are in a transaction and a full transaction is committed - - Otherwise, we accumulate the statement + - We are in a transaction and a full transaction is committed. + Otherwise, we accumulate the changes. */ - ulonglong const in_transaction= - thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN); - DBUG_PRINT("debug", - ("all: %d, empty: %s, in_transaction: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s", - all, - YESNO(trx_data->empty()), - YESNO(in_transaction), - YESNO(thd->transaction.all.modified_non_trans_table), - YESNO(thd->transaction.stmt.modified_non_trans_table))); if (!in_transaction || all) { - Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0); - error= binlog_end_trans(thd, trx_data, &qev, all); - goto end; + Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, FALSE, TRUE, 0); + error= binlog_flush_trx_cache(thd, cache_mngr, &qev); } -end: - if (!all) - trx_data->before_stmt_pos = MY_OFF_T_UNDEF; // part of the stmt commit - DBUG_RETURN(error); -} + /* + This is part of the stmt rollback. + */ + if (!all) + cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF); + DBUG_RETURN(error); + } /** - This function is called when a transaction involving a transactional - table is rolled back. - - It has the responsibility to flush the transaction cache to the - binlog file. However, if the transaction does not involve - non-transactional tables, nothing needs to be logged. + This function is called when a transaction or a statement is rolled back. @param hton The binlog handlerton. @param thd The client thread that executes the transaction. @@ -1572,18 +1652,38 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) { DBUG_ENTER("binlog_rollback"); int error=0; - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - - if (trx_data->empty()) { - trx_data->reset(); - DBUG_RETURN(0); - } + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); DBUG_PRINT("debug", ("all: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s", YESNO(all), YESNO(thd->transaction.all.modified_non_trans_table), YESNO(thd->transaction.stmt.modified_non_trans_table))); + + /* + If an incident event is set we do not flush the content of the statement + cache because it may be corrupted. + */ + if (cache_mngr->stmt_cache.has_incident()) + { + mysql_bin_log.write_incident(thd, TRUE); + cache_mngr->reset_cache(&cache_mngr->stmt_cache); + } + else if (!cache_mngr->stmt_cache.empty()) + { + binlog_flush_stmt_cache(thd, cache_mngr); + } + + if (cache_mngr->trx_cache.empty()) + { + /* + we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid() + */ + cache_mngr->reset_cache(&cache_mngr->trx_cache); + DBUG_RETURN(0); + } + + if (mysql_bin_log.check_write_error(thd)) { /* @@ -1594,49 +1694,46 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) */ DBUG_ASSERT(!all); /* - We reach this point if either only transactional tables were modified or - the effect of a statement that did not get into the binlog needs to be - rolled back. In the latter case, if a statement changed non-transactional - tables or had the OPTION_KEEP_LOG associated, we write an incident event - to the binlog in order to stop slaves and notify users that some changes - on the master did not get into the binlog and slaves will be inconsistent. - On the other hand, if a statement is transactional, we just safely roll it - back. + We reach this point if the effect of a statement did not properly get into + a cache and need to be rolled back. */ - if ((thd->transaction.stmt.modified_non_trans_table || - (thd->options & OPTION_KEEP_LOG)) && - mysql_bin_log.check_write_error(thd)) - trx_data->set_incident(); - error= binlog_end_trans(thd, trx_data, 0, all); + error= binlog_truncate_trx_cache(thd, cache_mngr, all); } else - { - /* - We flush the cache with a rollback, wrapped in a beging/rollback if: + { + /* + We flush the cache wrapped in a beging/rollback if: . aborting a transcation that modified a non-transactional table or; . aborting a statement that modified both transactional and - non-transctional tables but which is not in the boundaries of any - transaction; + non-transctional tables but which is not in the boundaries of any + transaction; . the OPTION_KEEP_LOG is activate. */ - if ((all && thd->transaction.all.modified_non_trans_table) || + if (thd->variables.binlog_format == BINLOG_FORMAT_STMT && + ((all && thd->transaction.all.modified_non_trans_table) || (!all && thd->transaction.stmt.modified_non_trans_table && - !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) || - ((thd->options & OPTION_KEEP_LOG))) + !thd->in_multi_stmt_transaction()) || + (thd->options & OPTION_KEEP_LOG))) { - Query_log_event qev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0); - error= binlog_end_trans(thd, trx_data, &qev, all); + Query_log_event qev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, FALSE, TRUE, 0); + error= binlog_flush_trx_cache(thd, cache_mngr, &qev); } /* Otherwise, we simply truncate the cache as there is no change on non-transactional tables as follows. */ - else if ((all && !thd->transaction.all.modified_non_trans_table) || - (!all && !thd->transaction.stmt.modified_non_trans_table)) - error= binlog_end_trans(thd, trx_data, 0, all); + else if (all || (!all && + (!thd->transaction.stmt.modified_non_trans_table || + !thd->in_multi_stmt_transaction() || + thd->variables.binlog_format != BINLOG_FORMAT_STMT))) + error= binlog_truncate_trx_cache(thd, cache_mngr, all); } + + /* + This is part of the stmt rollback. + */ if (!all) - trx_data->before_stmt_pos = MY_OFF_T_UNDEF; // part of the stmt rollback + cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF); DBUG_RETURN(error); } @@ -1712,7 +1809,8 @@ static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv) int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED); int const error= thd->binlog_query(THD::STMT_QUERY_TYPE, - thd->query, thd->query_length, TRUE, FALSE, errcode); + thd->query, thd->query_length, TRUE, FALSE, FALSE, + errcode); DBUG_RETURN(error); } @@ -1731,7 +1829,8 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED); int error= thd->binlog_query(THD::STMT_QUERY_TYPE, - thd->query, thd->query_length, TRUE, FALSE, errcode); + thd->query, thd->query_length, TRUE, FALSE, FALSE, + errcode); DBUG_RETURN(error); } binlog_trans_log_truncate(thd, *(my_off_t*)sv); @@ -3737,27 +3836,67 @@ bool MYSQL_BIN_LOG::is_query_in_union(THD *thd, query_id_t query_id_param) int THD::binlog_setup_trx_data() { DBUG_ENTER("THD::binlog_setup_trx_data"); - binlog_trx_data *trx_data= - (binlog_trx_data*) thd_get_ha_data(this, binlog_hton); + binlog_cache_mngr *cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); - if (trx_data) + if (cache_mngr) DBUG_RETURN(0); // Already set up - trx_data= (binlog_trx_data*) my_malloc(sizeof(binlog_trx_data), MYF(MY_ZEROFILL)); - if (!trx_data || - open_cached_file(&trx_data->trans_log, mysql_tmpdir, + cache_mngr= (binlog_cache_mngr*) my_malloc(sizeof(binlog_cache_mngr), MYF(MY_ZEROFILL)); + if (!cache_mngr || + open_cached_file(&cache_mngr->stmt_cache.cache_log, mysql_tmpdir, + LOG_PREFIX, binlog_cache_size, MYF(MY_WME)) || + open_cached_file(&cache_mngr->trx_cache.cache_log, mysql_tmpdir, LOG_PREFIX, binlog_cache_size, MYF(MY_WME))) { - my_free((uchar*)trx_data, MYF(MY_ALLOW_ZERO_PTR)); + my_free((uchar*)cache_mngr, MYF(MY_ALLOW_ZERO_PTR)); DBUG_RETURN(1); // Didn't manage to set it up } - thd_set_ha_data(this, binlog_hton, trx_data); + thd_set_ha_data(this, binlog_hton, cache_mngr); - trx_data= new (thd_get_ha_data(this, binlog_hton)) binlog_trx_data; + cache_mngr= new (thd_get_ha_data(this, binlog_hton)) binlog_cache_mngr; DBUG_RETURN(0); } +/** + This function checks if a transactional talbe was updated by the + current transaction. + + @param thd The client thread that executed the current statement. + @return + @c true if a transactional table was updated, @false otherwise. +*/ +bool +trans_has_updated_trans_table(THD* thd) +{ + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + + return (cache_mngr ? my_b_tell (&cache_mngr->trx_cache.cache_log) : 0); +} + +/** + This function checks if a transactional talbe was updated by the + current statement. + + @param thd The client thread that executed the current statement. + @return + @c true if a transactional table was updated, @false otherwise. +*/ +bool +stmt_has_updated_trans_table(THD *thd) +{ + Ha_trx_info *ha_info; + + for (ha_info= thd->transaction.stmt.ha_list; ha_info; ha_info= ha_info->next()) + { + if (ha_info->is_trx_read_write() && ha_info->ht() != binlog_hton) + return (TRUE); + } + return (FALSE); +} + /* Function to start a statement and optionally a transaction for the binary log. @@ -3771,11 +3910,10 @@ int THD::binlog_setup_trx_data() - Start a transaction if not in autocommit mode or if a BEGIN statement has been seen. - - Start a statement transaction to allow us to truncate the binary - log. + - Start a statement transaction to allow us to truncate the cache. - Save the currrent binlog position so that we can roll back the - statement by truncating the transaction log. + statement by truncating the cache. We only update the saved position if the old one was undefined, the reason is that there are some cases (e.g., for CREATE-SELECT) @@ -3789,15 +3927,15 @@ int THD::binlog_setup_trx_data() void THD::binlog_start_trans_and_stmt() { - binlog_trx_data *trx_data= (binlog_trx_data*) thd_get_ha_data(this, binlog_hton); + binlog_cache_mngr *cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); DBUG_ENTER("binlog_start_trans_and_stmt"); - DBUG_PRINT("enter", ("trx_data: 0x%lx trx_data->before_stmt_pos: %lu", - (long) trx_data, - (trx_data ? (ulong) trx_data->before_stmt_pos : + DBUG_PRINT("enter", ("cache_mngr: %p cache_mngr->trx_cache.get_prev_position(): %lu", + cache_mngr, + (cache_mngr ? (ulong) cache_mngr->trx_cache.get_prev_position() : (ulong) 0))); - if (trx_data == NULL || - trx_data->before_stmt_pos == MY_OFF_T_UNDEF) + if (cache_mngr == NULL || + cache_mngr->trx_cache.get_prev_position() == MY_OFF_T_UNDEF) { this->binlog_set_stmt_begin(); if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) @@ -3818,27 +3956,35 @@ THD::binlog_start_trans_and_stmt() } void THD::binlog_set_stmt_begin() { - binlog_trx_data *trx_data= - (binlog_trx_data*) thd_get_ha_data(this, binlog_hton); + binlog_cache_mngr *cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); /* - The call to binlog_trans_log_savepos() might create the trx_data + The call to binlog_trans_log_savepos() might create the cache_mngr structure, if it didn't exist before, so we save the position into an auto variable and then write it into the transaction - data for the binary log (i.e., trx_data). + data for the binary log (i.e., cache_mngr). */ my_off_t pos= 0; binlog_trans_log_savepos(this, &pos); - trx_data= (binlog_trx_data*) thd_get_ha_data(this, binlog_hton); - trx_data->before_stmt_pos= pos; + cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); + cache_mngr->trx_cache.set_prev_position(pos); } -/* - Write a table map to the binary log. - */ - -int THD::binlog_write_table_map(TABLE *table, bool is_trans) +/** + This function writes a table map to the binary log. + Note that in order to keep the signature uniform with related methods, + we use a redundant parameter to indicate whether a transactional table + was changed or not. + + @param table a pointer to the table. + @param is_transactional @c true indicates a transactional table, + otherwise @c false a non-transactional. + @return + nonzero if an error pops up when writing the table map event. +*/ +int THD::binlog_write_table_map(TABLE *table, bool is_transactional) { int error; DBUG_ENTER("THD::binlog_write_table_map"); @@ -3854,12 +4000,17 @@ int THD::binlog_write_table_map(TABLE *table, bool is_trans) flags= Table_map_log_event::TM_NO_FLAGS; Table_map_log_event - the_event(this, table, table->s->table_map_id, is_trans, flags); + the_event(this, table, table->s->table_map_id, is_transactional, flags); - if (is_trans && binlog_table_maps == 0) + if (binlog_table_maps == 0) binlog_start_trans_and_stmt(); - if ((error= mysql_bin_log.write(&the_event))) + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); + + IO_CACHE *file= cache_mngr->get_binlog_cache_log(is_transactional); + + if ((error= the_event.write(file))) DBUG_RETURN(error); binlog_table_maps++; @@ -3867,144 +4018,163 @@ int THD::binlog_write_table_map(TABLE *table, bool is_trans) DBUG_RETURN(0); } +/** + This function retrieves a pending row event from a cache which is + specified through the parameter @c is_transactional. Respectively, when it + is @c true, the pending event is returned from the transactional cache. + Otherwise from the non-transactional cache. + + @param is_transactional @c true indicates a transactional cache, + otherwise @c false a non-transactional. + @return + The row event if any. +*/ Rows_log_event* -THD::binlog_get_pending_rows_event() const +THD::binlog_get_pending_rows_event(bool is_transactional) const { - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(this, binlog_hton); + Rows_log_event* rows= NULL; + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); + /* - This is less than ideal, but here's the story: If there is no - trx_data, prepare_pending_rows_event() has never been called - (since the trx_data is set up there). In that case, we just return - NULL. + This is less than ideal, but here's the story: If there is no cache_mngr, + prepare_pending_rows_event() has never been called (since the cache_mngr + is set up there). In that case, we just return NULL. */ - return trx_data ? trx_data->pending() : NULL; + if (cache_mngr) + { + binlog_cache_data *cache_data= + cache_mngr->get_binlog_cache_data(is_transactional); + + rows= cache_data->pending(); + } + return (rows); } +/** + This function stores a pending row event into a cache which is specified + through the parameter @c is_transactional. Respectively, when it is @c + true, the pending event is stored into the transactional cache. Otherwise + into the non-transactional cache. + + @param evt a pointer to the row event. + @param is_transactional @c true indicates a transactional cache, + otherwise @c false a non-transactional. +*/ void -THD::binlog_set_pending_rows_event(Rows_log_event* ev) +THD::binlog_set_pending_rows_event(Rows_log_event* ev, bool is_transactional) { if (thd_get_ha_data(this, binlog_hton) == NULL) binlog_setup_trx_data(); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(this, binlog_hton); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); + + DBUG_ASSERT(cache_mngr); + + binlog_cache_data *cache_data= + cache_mngr->get_binlog_cache_data(is_transactional); - DBUG_ASSERT(trx_data); - trx_data->set_pending(ev); + cache_data->set_pending(ev); } /** - Remove the pending rows event, discarding any outstanding rows. - - If there is no pending rows event available, this is effectively a + This function removes the pending rows event, discarding any outstanding + rows. If there is no pending rows event available, this is effectively a no-op. - */ + + @param thd a pointer to the user thread. + @param is_transactional @c true indicates a transactional cache, + otherwise @c false a non-transactional. +*/ int -MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd) +MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, bool is_transactional) { DBUG_ENTER("MYSQL_BIN_LOG::remove_pending_rows_event"); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + + DBUG_ASSERT(cache_mngr); - DBUG_ASSERT(trx_data); + binlog_cache_data *cache_data= + cache_mngr->get_binlog_cache_data(is_transactional); - if (Rows_log_event* pending= trx_data->pending()) + if (Rows_log_event* pending= cache_data->pending()) { delete pending; - trx_data->set_pending(NULL); + cache_data->set_pending(NULL); } DBUG_RETURN(0); } /* - Moves the last bunch of rows from the pending Rows event to the binlog - (either cached binlog if transaction, or disk binlog). Sets a new pending - event. + Moves the last bunch of rows from the pending Rows event to a cache (either + transactional cache if is_transaction is @c true, or the non-transactional + cache otherwise. Sets a new pending event. + + @param thd a pointer to the user thread. + @param evt a pointer to the row event. + @param is_transactional @c true indicates a transactional cache, + otherwise @c false a non-transactional. */ int MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, - Rows_log_event* event) + Rows_log_event* event, + bool is_transactional) { DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)"); DBUG_ASSERT(mysql_bin_log.is_open()); DBUG_PRINT("enter", ("event: 0x%lx", (long) event)); int error= 0; + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + DBUG_ASSERT(cache_mngr); - DBUG_ASSERT(trx_data); + binlog_cache_data *cache_data= + cache_mngr->get_binlog_cache_data(is_transactional); - DBUG_PRINT("info", ("trx_data->pending(): 0x%lx", (long) trx_data->pending())); + DBUG_PRINT("info", ("cache_mngr->pending(): 0x%lx", (long) cache_data->pending())); - if (Rows_log_event* pending= trx_data->pending()) + if (Rows_log_event* pending= cache_data->pending()) { - IO_CACHE *file= &log_file; + IO_CACHE *file= &cache_data->cache_log; /* - Decide if we should write to the log file directly or to the - transaction log. - */ - if (pending->get_cache_stmt() || my_b_tell(&trx_data->trans_log)) - file= &trx_data->trans_log; - - /* - If we are writing to the log file directly, we could avoid - locking the log. This does not work since we need to step the - m_table_map_version below, and that change has to be protected - by the LOCK_log mutex. - */ - pthread_mutex_lock(&LOCK_log); - - /* - Write pending event to log file or transaction cache + Write pending event to the cache. */ if (pending->write(file)) { - pthread_mutex_unlock(&LOCK_log); set_write_error(thd); + if (check_write_error(thd) && cache_data && + thd->transaction.stmt.modified_non_trans_table) + cache_data->set_incident(); DBUG_RETURN(1); } /* We step the table map version if we are writing an event - representing the end of a statement. We do this regardless of - wheather we write to the transaction cache or to directly to the - file. - - In an ideal world, we could avoid stepping the table map version - if we were writing to a transaction cache, since we could then - reuse the table map that was written earlier in the transaction - cache. This does not work since STMT_END_F implies closing all - table mappings on the slave side. + representing the end of a statement. + In an ideal world, we could avoid stepping the table map version, + since we could then reuse the table map that was written earlier + in the cache. This does not work since STMT_END_F implies closing + all table mappings on the slave side. + TODO: Find a solution so that table maps does not have to be written several times within a transaction. - */ + */ if (pending->get_flags(Rows_log_event::STMT_END_F)) ++m_table_map_version; delete pending; - - if (file == &log_file) - { - error= flush_and_sync(0); - if (!error) - { - signal_update(); - rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); - } - } - - pthread_mutex_unlock(&LOCK_log); } - thd->binlog_set_pending_rows_event(event); + thd->binlog_set_pending_rows_event(event, is_transactional); DBUG_RETURN(error); } @@ -4018,6 +4188,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) THD *thd= event_info->thd; bool error= 1; DBUG_ENTER("MYSQL_BIN_LOG::write(Log_event *)"); + binlog_cache_data *cache_data= 0; if (thd->binlog_evt_union.do_union) { @@ -4026,27 +4197,22 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) We will log the function call to the binary log on function exit */ thd->binlog_evt_union.unioned_events= TRUE; - thd->binlog_evt_union.unioned_events_trans |= event_info->cache_stmt; + thd->binlog_evt_union.unioned_events_trans |= + event_info->use_trans_cache(); DBUG_RETURN(0); } /* - Flush the pending rows event to the transaction cache or to the - log file. Since this function potentially aquire the LOCK_log - mutex, we do this before aquiring the LOCK_log mutex in this - function. - We only end the statement if we are in a top-level statement. If we are inside a stored function, we do not end the statement since this will close all tables on the slave. */ bool const end_stmt= thd->prelocked_mode && thd->lex->requires_prelocking(); - if (thd->binlog_flush_pending_rows_event(end_stmt)) + if (thd->binlog_flush_pending_rows_event(end_stmt, + event_info->use_trans_cache())) DBUG_RETURN(error); - pthread_mutex_lock(&LOCK_log); - /* In most cases this is only called if 'is_open()' is true; in fact this is mostly called if is_open() *was* true a few instructions before, but it @@ -4054,7 +4220,6 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) */ if (likely(is_open())) { - IO_CACHE *file= &log_file; #ifdef HAVE_REPLICATION /* In the future we need to add to the following if tests like @@ -4064,63 +4229,67 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) const char *local_db= event_info->get_db(); if ((thd && !(thd->options & OPTION_BIN_LOG)) || (!binlog_filter->db_ok(local_db))) - { - VOID(pthread_mutex_unlock(&LOCK_log)); DBUG_RETURN(0); - } #endif /* HAVE_REPLICATION */ -#if defined(USING_TRANSACTIONS) - /* - Should we write to the binlog cache or to the binlog on disk? - Write to the binlog cache if: - - it is already not empty (meaning we're in a transaction; note that the - present event could be about a non-transactional table, but still we need - to write to the binlog cache in that case to handle updates to mixed - trans/non-trans table types the best possible in binlogging) - - or if the event asks for it (cache_stmt == TRUE). - */ - if (opt_using_transactions && thd) +#if defined(USING_TRANSACTIONS) + IO_CACHE *file= NULL; + + if (event_info->use_direct_logging()) + { + file= &log_file; + pthread_mutex_lock(&LOCK_log); + } + else { if (thd->binlog_setup_trx_data()) goto err; - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - IO_CACHE *trans_log= &trx_data->trans_log; - my_off_t trans_log_pos= my_b_tell(trans_log); - if (event_info->get_cache_stmt() || trans_log_pos != 0 || - stmt_has_updated_trans_table(thd)) + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + + /* + If we are about to use write rows, we just need to check the type of + the event (either transactional or non-transactional) in order to + choose the cache. + */ + if (thd->is_current_stmt_binlog_format_row()) { - DBUG_PRINT("info", ("Using trans_log: cache: %d, trans_log_pos: %lu", - event_info->get_cache_stmt(), - (ulong) trans_log_pos)); - thd->binlog_start_trans_and_stmt(); - file= trans_log; + file= cache_mngr->get_binlog_cache_log(event_info->use_trans_cache()); + cache_data= cache_mngr->get_binlog_cache_data(event_info->use_trans_cache()); } /* - TODO as Mats suggested, for all the cases above where we write to - trans_log, it sounds unnecessary to lock LOCK_log. We should rather - test first if we want to write to trans_log, and if not, lock - LOCK_log. + However, if we are about to write statements we need to consider other + things. We use the non-transactional cache when: + + . the transactional cache is empty which means that there were no + early statement on behalf of the transaction. + . the respective event is tagged as non-transactional. */ + else if (cache_mngr->trx_cache.empty() && + !event_info->use_trans_cache()) + { + file= &cache_mngr->stmt_cache.cache_log; + cache_data= &cache_mngr->stmt_cache; + } + else + { + file= &cache_mngr->trx_cache.cache_log; + cache_data= &cache_mngr->trx_cache; + } + + thd->binlog_start_trans_and_stmt(); } #endif /* USING_TRANSACTIONS */ DBUG_PRINT("info",("event type: %d",event_info->get_type_code())); /* - No check for auto events flag here - this write method should - never be called if auto-events are enabled - */ - - /* - 1. Write first log events which describe the 'run environment' - of the SQL command - */ + No check for auto events flag here - this write method should + never be called if auto-events are enabled. - /* - If row-based binlogging, Insert_id, Rand and other kind of "setting - context" events are not needed. + Write first log events which describe the 'run environment' + of the SQL command. If row-based binlogging, Insert_id, Rand + and other kind of "setting context" events are not needed. */ if (thd) { @@ -4170,39 +4339,48 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) } /* - Write the SQL command - */ - - if (event_info->write(file) || + Write the event. + */ + if (event_info->write(file) || DBUG_EVALUATE_IF("injecting_fault_writing", 1, 0)) goto err; - if (file == &log_file) // we are writing to the real log (disk) + error= 0; + +err: + if (event_info->use_direct_logging()) { - bool synced= 0; - if (flush_and_sync(&synced)) - goto err; + if (!error) + { + bool synced; + if ((error= flush_and_sync(&synced))) + goto unlock; - if (RUN_HOOK(binlog_storage, after_flush, - (thd, log_file_name, file->pos_in_file, synced))) { - sql_print_error("Failed to run 'after_flush' hooks"); - goto err; + if ((error= RUN_HOOK(binlog_storage, after_flush, + (thd, log_file_name, file->pos_in_file, synced)))) + { + sql_print_error("Failed to run 'after_flush' hooks"); + goto unlock; + } + signal_update(); + rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); } - - signal_update(); - rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); +unlock: + pthread_mutex_unlock(&LOCK_log); } - error=0; -err: if (error) + { set_write_error(thd); + if (check_write_error(thd) && cache_data && + thd->transaction.stmt.modified_non_trans_table) + cache_data->set_incident(); + } } if (event_info->flags & LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F) ++m_table_map_version; - pthread_mutex_unlock(&LOCK_log); DBUG_RETURN(error); } @@ -4314,7 +4492,7 @@ uint MYSQL_BIN_LOG::next_file_id() write_cache() cache Cache to write to the binary log lock_log True if the LOCK_log mutex should be aquired, false otherwise - sync_log True if the log should be flushed and sync:ed + sync_log True if the log should be flushed and synced DESCRIPTION Write the contents of the cache to the binary log. The cache will @@ -4530,9 +4708,6 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event, DBUG_ENTER("MYSQL_BIN_LOG::write(THD *, IO_CACHE *, Log_event *)"); VOID(pthread_mutex_lock(&LOCK_log)); - /* NULL would represent nothing to replicate after ROLLBACK */ - DBUG_ASSERT(commit_event != NULL); - DBUG_ASSERT(is_open()); if (likely(is_open())) // Should always be true { @@ -4547,19 +4722,9 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event, transaction is either a BEGIN..COMMIT block or a single statement in autocommit mode. */ - Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0); - - /* - Now this Query_log_event has artificial log_pos 0. It must be - adjusted to reflect the real position in the log. Not doing it - would confuse the slave: it would prevent this one from - knowing where he is in the master's binlog, which would result - in wrong positions being shown to the user, MASTER_POS_WAIT - undue waiting etc. - */ + Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, FALSE, TRUE, 0); if (qinfo.write(&log_file)) goto err; - DBUG_EXECUTE_IF("crash_before_writing_xid", { if ((write_error= write_cache(cache, false, true))) @@ -5657,13 +5822,13 @@ int TC_LOG_BINLOG::log_xid(THD *thd, my_xid xid) { DBUG_ENTER("TC_LOG_BINLOG::log"); Xid_log_event xle(thd, xid); - binlog_trx_data *trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + binlog_cache_mngr *cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); /* We always commit the entire transaction when writing an XID. Also note that the return value is inverted. */ - DBUG_RETURN(!binlog_end_trans(thd, trx_data, &xle, TRUE)); + DBUG_RETURN(!binlog_flush_trx_cache(thd, cache_mngr, &xle)); } void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) diff --git a/sql/log.h b/sql/log.h index a31be6dcce6..40695ed2d2d 100644 --- a/sql/log.h +++ b/sql/log.h @@ -20,6 +20,9 @@ class Relay_log_info; class Format_description_log_event; +bool trans_has_updated_trans_table(THD* thd); +bool stmt_has_updated_trans_table(THD *thd); + /* Transaction Coordinator log - a base abstract class for two different implementations @@ -329,8 +332,9 @@ public: ulonglong table_map_version() const { return m_table_map_version; } void update_table_map_version() { ++m_table_map_version; } - int flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event); - int remove_pending_rows_event(THD *thd); + int flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event, + bool is_transactional); + int remove_pending_rows_event(THD *thd, bool is_transactional); #endif /* !defined(MYSQL_CLIENT) */ void reset_bytes_written() @@ -368,7 +372,7 @@ public: /* Use this to start writing a new log file */ void new_file(); - bool write(Log_event* event_info); // binary log write + bool write(Log_event* event_info); bool write(THD *thd, IO_CACHE *cache, Log_event *commit_event, bool incident); bool write_incident(THD *thd, bool lock); diff --git a/sql/log_event.cc b/sql/log_event.cc index e574be774e3..6c7b6fdf409 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -664,10 +664,11 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) { server_id= thd->server_id; when= thd->start_time; - cache_stmt= using_trans; + cache_type= (using_trans || stmt_has_updated_trans_table(thd) + ? Log_event::EVENT_TRANSACTIONAL_CACHE : + Log_event::EVENT_STMT_CACHE); } - /** This minimal constructor is for when you are not even sure that there is a valid THD. For example in the server when we are shutting down or @@ -676,8 +677,8 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) */ Log_event::Log_event() - :temp_buf(0), exec_time(0), flags(0), cache_stmt(0), - thd(0) + :temp_buf(0), exec_time(0), flags(0), + cache_type(Log_event::EVENT_INVALID_CACHE), thd(0) { server_id= ::server_id; /* @@ -696,7 +697,7 @@ Log_event::Log_event() Log_event::Log_event(const char* buf, const Format_description_log_event* description_event) - :temp_buf(0), cache_stmt(0) + :temp_buf(0), cache_type(Log_event::EVENT_INVALID_CACHE) { #ifndef MYSQL_CLIENT thd = 0; @@ -2352,7 +2353,7 @@ Query_log_event::Query_log_event() */ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, ulong query_length, bool using_trans, - bool suppress_use, int errcode) + bool direct, bool suppress_use, int errcode) :Log_event(thd_arg, (thd_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : @@ -2431,6 +2432,95 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, } else time_zone_len= 0; + + /* + In what follows, we decide whether to write to the binary log or to use a + cache. + */ + LEX *lex= thd->lex; + bool implicit_commit= FALSE; + cache_type= Log_event::EVENT_INVALID_CACHE; + switch (lex->sql_command) + { + case SQLCOM_ALTER_DB: + case SQLCOM_CREATE_FUNCTION: + case SQLCOM_DROP_FUNCTION: + case SQLCOM_DROP_PROCEDURE: + case SQLCOM_INSTALL_PLUGIN: + case SQLCOM_UNINSTALL_PLUGIN: + case SQLCOM_ALTER_TABLESPACE: + implicit_commit= TRUE; + break; + case SQLCOM_DROP_TABLE: + implicit_commit= !(lex->drop_temporary && thd->in_multi_stmt_transaction()); + break; + case SQLCOM_ALTER_TABLE: + case SQLCOM_CREATE_TABLE: + implicit_commit= !((lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) && + thd->in_multi_stmt_transaction()) && + !(lex->select_lex.item_list.elements && + thd->is_current_stmt_binlog_format_row()); + break; + case SQLCOM_SET_OPTION: + implicit_commit= (lex->autocommit ? TRUE : FALSE); + break; + /* + Replace what follows after CF_AUTO_COMMIT_TRANS is backported by: + + default: + implicit_commit= ((sql_command_flags[lex->sql_command] & + CF_AUTO_COMMIT_TRANS)); + break; + */ + case SQLCOM_CREATE_INDEX: + case SQLCOM_TRUNCATE: + case SQLCOM_CREATE_DB: + case SQLCOM_DROP_DB: + case SQLCOM_ALTER_DB_UPGRADE: + case SQLCOM_RENAME_TABLE: + case SQLCOM_DROP_INDEX: + case SQLCOM_CREATE_VIEW: + case SQLCOM_DROP_VIEW: + case SQLCOM_CREATE_TRIGGER: + case SQLCOM_DROP_TRIGGER: + case SQLCOM_CREATE_EVENT: + case SQLCOM_ALTER_EVENT: + case SQLCOM_DROP_EVENT: + case SQLCOM_REPAIR: + case SQLCOM_OPTIMIZE: + case SQLCOM_ANALYZE: + case SQLCOM_CREATE_USER: + case SQLCOM_DROP_USER: + case SQLCOM_RENAME_USER: + case SQLCOM_REVOKE_ALL: + case SQLCOM_REVOKE: + case SQLCOM_GRANT: + case SQLCOM_CREATE_PROCEDURE: + case SQLCOM_CREATE_SPFUNCTION: + case SQLCOM_ALTER_PROCEDURE: + case SQLCOM_ALTER_FUNCTION: + case SQLCOM_ASSIGN_TO_KEYCACHE: + case SQLCOM_PRELOAD_KEYS: + case SQLCOM_FLUSH: + case SQLCOM_CHECK: + implicit_commit= TRUE; + break; + default: + implicit_commit= FALSE; + break; + } + + if (implicit_commit || direct) + { + cache_type= Log_event::EVENT_NO_CACHE; + } + else + { + cache_type= (using_trans || stmt_has_updated_trans_table(thd) + ? Log_event::EVENT_TRANSACTIONAL_CACHE : + Log_event::EVENT_STMT_CACHE); + } + DBUG_ASSERT(cache_type != Log_event::EVENT_INVALID_CACHE); DBUG_PRINT("info",("Query_log_event has flags2: %lu sql_mode: %lu", (ulong) flags2, sql_mode)); } @@ -6684,9 +6774,9 @@ Execute_load_query_log_event(THD *thd_arg, const char* query_arg, ulong query_length_arg, uint fn_pos_start_arg, uint fn_pos_end_arg, enum_load_dup_handling dup_handling_arg, - bool using_trans, bool suppress_use, + bool using_trans, bool direct, bool suppress_use, int errcode): - Query_log_event(thd_arg, query_arg, query_length_arg, using_trans, + Query_log_event(thd_arg, query_arg, query_length_arg, using_trans, direct, suppress_use, errcode), file_id(thd_arg->file_id), fn_pos_start(fn_pos_start_arg), fn_pos_end(fn_pos_end_arg), dup_handling(dup_handling_arg) @@ -7509,7 +7599,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) error= 0; } - if (!cache_stmt) + if (!use_trans_cache()) { DBUG_PRINT("info", ("Marked that we need to keep log")); thd->options|= OPTION_KEEP_LOG; @@ -7539,7 +7629,6 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error); thd->is_slave_error= 1; } - DBUG_RETURN(error); } @@ -7587,7 +7676,7 @@ static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd) (assume the last master's transaction is ignored by the slave because of replicate-ignore rules). */ - thd->binlog_flush_pending_rows_event(true); + thd->binlog_flush_pending_rows_event(TRUE); /* If this event is not in a transaction, the call below will, if some @@ -7854,7 +7943,7 @@ int Table_map_log_event::save_field_metadata() #if !defined(MYSQL_CLIENT) Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, bool is_transactional, uint16 flags) - : Log_event(thd, 0, true), + : Log_event(thd, 0, is_transactional), m_table(tbl), m_dbnam(tbl->s->db.str), m_dblen(m_dbnam ? tbl->s->db.length : 0), diff --git a/sql/log_event.h b/sql/log_event.h index de171145acd..5a897c55061 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -870,6 +870,31 @@ public: EVENT_SKIP_COUNT }; + enum enum_event_cache_type + { + EVENT_INVALID_CACHE, + /* + If possible the event should use a non-transactional cache before + being flushed to the binary log. This means that it must be flushed + right after its correspondent statement is completed. + */ + EVENT_STMT_CACHE, + /* + The event should use a transactional cache before being flushed to + the binary log. This means that it must be flushed upon commit or + rollback. + */ + EVENT_TRANSACTIONAL_CACHE, + /* + The event must be written directly to the binary log without going + through a cache. + */ + EVENT_NO_CACHE, + /** + If there is a need for different types, introduce them before this. + */ + EVENT_CACHE_COUNT + }; /* The following type definition is to be used whenever data is placed @@ -920,8 +945,12 @@ public: LOG_EVENT_SUPPRESS_USE_F for notes. */ uint16 flags; - - bool cache_stmt; + + /* + Defines the type of the cache, if any, where the event will be + stored before being flushed to disk. + */ + uint16 cache_type; /** A storage to cache the global system variable's value. @@ -933,7 +962,7 @@ public: THD* thd; Log_event(); - Log_event(THD* thd_arg, uint16 flags_arg, bool cache_stmt); + Log_event(THD* thd_arg, uint16 flags_arg, bool is_transactional); /* read_log_event() functions read an event from a binlog or relay log; used by SHOW BINLOG EVENTS, the binlog_dump thread on the @@ -1031,7 +1060,18 @@ public: void set_relay_log_event() { flags |= LOG_EVENT_RELAY_LOG_F; } bool is_artificial_event() const { return flags & LOG_EVENT_ARTIFICIAL_F; } bool is_relay_log_event() const { return flags & LOG_EVENT_RELAY_LOG_F; } - inline bool get_cache_stmt() const { return cache_stmt; } + inline bool use_trans_cache() const + { + return (cache_type == Log_event::EVENT_TRANSACTIONAL_CACHE); + } + inline void set_direct_logging() + { + cache_type = Log_event::EVENT_NO_CACHE; + } + inline bool use_direct_logging() + { + return (cache_type == Log_event::EVENT_NO_CACHE); + } Log_event(const char* buf, const Format_description_log_event *description_event); virtual ~Log_event() { free_temp_buf();} @@ -1645,7 +1685,7 @@ public: #ifndef MYSQL_CLIENT Query_log_event(THD* thd_arg, const char* query_arg, ulong query_length, - bool using_trans, bool suppress_use, int error); + bool using_trans, bool direct, bool suppress_use, int error); const char* get_db() { return db; } #ifdef HAVE_REPLICATION void pack_info(Protocol* protocol); @@ -2895,8 +2935,8 @@ public: ulong query_length, uint fn_pos_start_arg, uint fn_pos_end_arg, enum_load_dup_handling dup_handling_arg, - bool using_trans, bool suppress_use, - int errcode); + bool using_trans, bool direct, + bool suppress_use, int errcode); #ifdef HAVE_REPLICATION void pack_info(Protocol* protocol); #endif /* HAVE_REPLICATION */ @@ -3328,10 +3368,10 @@ public: return new table_def(m_coltype, m_colcnt, m_field_metadata, m_field_metadata_size, m_null_bits); } +#endif ulong get_table_id() const { return m_table_id; } const char *get_table_name() const { return m_tblnam; } const char *get_db_name() const { return m_dbnam; } -#endif virtual Log_event_type get_type_code() { return TABLE_MAP_EVENT; } virtual bool is_valid() const { return m_memory != NULL; /* we check malloc */ } @@ -3883,6 +3923,7 @@ public: DBUG_PRINT("enter", ("m_incident: %d", m_incident)); m_message.str= NULL; /* Just as a precaution */ m_message.length= 0; + set_direct_logging(); DBUG_VOID_RETURN; } @@ -3892,6 +3933,7 @@ public: DBUG_ENTER("Incident_log_event::Incident_log_event"); DBUG_PRINT("enter", ("m_incident: %d", m_incident)); m_message= msg; + set_direct_logging(); DBUG_VOID_RETURN; } #endif diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc index e7dda24f60d..fe81e865048 100644 --- a/sql/log_event_old.cc +++ b/sql/log_event_old.cc @@ -226,7 +226,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info DBUG_EXECUTE_IF("stop_slave_middle_group", const_cast<Relay_log_info*>(rli)->abort_slave= 1;); error= do_after_row_operations(table, error); - if (!ev->cache_stmt) + if (!ev->use_trans_cache()) { DBUG_PRINT("info", ("Marked that we need to keep log")); thd->options|= OPTION_KEEP_LOG; @@ -1510,7 +1510,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) NOTE: For this new scheme there should be no pending event: need to add code to assert that is the case. */ - thd->binlog_flush_pending_rows_event(false); + thd->binlog_flush_pending_rows_event(FALSE); TABLE_LIST *tables= rli->tables_to_lock; close_tables_for_reopen(thd, &tables); @@ -1716,7 +1716,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) DBUG_EXECUTE_IF("stop_slave_middle_group", const_cast<Relay_log_info*>(rli)->abort_slave= 1;); error= do_after_row_operations(rli, error); - if (!cache_stmt) + if (!use_trans_cache()) { DBUG_PRINT("info", ("Marked that we need to keep log")); thd->options|= OPTION_KEEP_LOG; @@ -1755,7 +1755,6 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) thd->is_slave_error= 1; DBUG_RETURN(error); } - DBUG_RETURN(0); } @@ -1800,7 +1799,7 @@ Old_rows_log_event::do_update_pos(Relay_log_info *rli) (assume the last master's transaction is ignored by the slave because of replicate-ignore rules). */ - thd->binlog_flush_pending_rows_event(true); + thd->binlog_flush_pending_rows_event(TRUE); /* If this event is not in a transaction, the call below will, if some diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h index 507f4b013cc..e3a1b5e6c73 100644 --- a/sql/mysql_priv.h +++ b/sql/mysql_priv.h @@ -939,7 +939,8 @@ struct Query_cache_query_flags #endif /*HAVE_QUERY_CACHE*/ void write_bin_log(THD *thd, bool clear_error, - char const *query, ulong query_length); + char const *query, ulong query_length, + bool is_trans= FALSE); /* sql_connect.cc */ int check_user(THD *thd, enum enum_server_command command, diff --git a/sql/share/errmsg.txt b/sql/share/errmsg.txt index 3ae0683568e..7afc73582ab 100644 --- a/sql/share/errmsg.txt +++ b/sql/share/errmsg.txt @@ -6235,6 +6235,8 @@ ER_BINLOG_UNSAFE_SYSTEM_VARIABLE eng "Statement uses a system variable whose value may differ on slave." ER_BINLOG_UNSAFE_SYSTEM_FUNCTION eng "Statement uses a system function whose value may differ on slave." +ER_BINLOG_UNSAFE_NONTRANS_AFTER_TRANS + eng "Non-transactional reads or writes are unsafe if they occur after transactional reads or writes inside a transaction." ER_MESSAGE_AND_STATEMENT eng "%s Statement: %s" diff --git a/sql/sp.cc b/sql/sp.cc index 9fafe4ac355..e44b8b3ffdf 100644 --- a/sql/sp.cc +++ b/sql/sp.cc @@ -938,7 +938,7 @@ sp_create_routine(THD *thd, int type, sp_head *sp) /* Such a statement can always go directly to binlog, no trans cache */ thd->binlog_query(THD::STMT_QUERY_TYPE, log_query.c_ptr(), log_query.length(), - FALSE, FALSE, 0); + FALSE, FALSE, FALSE, 0); thd->variables.sql_mode= 0; } diff --git a/sql/sp_head.cc b/sql/sp_head.cc index ef5dc8d3351..06382f85694 100644 --- a/sql/sp_head.cc +++ b/sql/sp_head.cc @@ -1781,7 +1781,7 @@ sp_head::execute_function(THD *thd, Item **argp, uint argcount, { int errcode = query_error_code(thd, thd->killed == THD::NOT_KILLED); Query_log_event qinfo(thd, binlog_buf.ptr(), binlog_buf.length(), - thd->binlog_evt_union.unioned_events_trans, FALSE, errcode); + thd->binlog_evt_union.unioned_events_trans, FALSE, FALSE, errcode); if (mysql_bin_log.write(&qinfo) && thd->binlog_evt_union.unioned_events_trans) { diff --git a/sql/sql_acl.cc b/sql/sql_acl.cc index 10fefd5bb04..89ba9bc091b 100644 --- a/sql/sql_acl.cc +++ b/sql/sql_acl.cc @@ -1655,7 +1655,7 @@ bool change_password(THD *thd, const char *host, const char *user, new_password)); thd->clear_error(); thd->binlog_query(THD::STMT_QUERY_TYPE, buff, query_length, - FALSE, FALSE, 0); + FALSE, FALSE, FALSE, 0); } end: close_thread_tables(thd); diff --git a/sql/sql_base.cc b/sql/sql_base.cc index 10185705efe..44d15d0ed65 100644 --- a/sql/sql_base.cc +++ b/sql/sql_base.cc @@ -1542,7 +1542,7 @@ void close_temporary_tables(THD *thd) thd->variables.character_set_client= system_charset_info; Query_log_event qinfo(thd, s_query.ptr(), s_query.length() - 1 /* to remove trailing ',' */, - 0, FALSE, 0); + FALSE, TRUE, FALSE, 0); qinfo.db= db.ptr(); thd->variables.character_set_client= cs_save; mysql_bin_log.write(&qinfo); @@ -4021,7 +4021,7 @@ retry: int errcode= query_error_code(thd, TRUE); thd->binlog_query(THD::STMT_QUERY_TYPE, query, (ulong)(end-query), - FALSE, FALSE, errcode); + FALSE, FALSE, FALSE, errcode); my_free(query, MYF(0)); } else diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 2301dec5b51..94d7b679c05 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -828,7 +828,8 @@ void THD::init(void) else options &= ~OPTION_BIG_SELECTS; - transaction.all.modified_non_trans_table= transaction.stmt.modified_non_trans_table= FALSE; + transaction.all.modified_non_trans_table= + transaction.stmt.modified_non_trans_table= FALSE; open_options=ha_open_options; update_lock_default= (variables.low_priority_updates ? TL_WRITE_LOW_PRIORITY : @@ -3403,7 +3404,10 @@ int THD::decide_logging_format(TABLE_LIST *tables) HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE; my_bool multi_engine= FALSE; - void* prev_ht= NULL; + my_bool mixed_engine= FALSE; + my_bool all_trans_engines= TRUE; + TABLE* prev_write_table= NULL; + TABLE* prev_access_table= NULL; #ifndef DBUG_OFF { @@ -3432,14 +3436,94 @@ int THD::decide_logging_format(TABLE_LIST *tables) handler::Table_flags const flags= table->table->file->ha_table_flags(); DBUG_PRINT("info", ("table: %s; ha_table_flags: 0x%llx", table->table_name, flags)); - if (prev_ht && prev_ht != table->table->file->ht) + if (prev_write_table && prev_write_table->file->ht != + table->table->file->ht) multi_engine= TRUE; - prev_ht= table->table->file->ht; + all_trans_engines= all_trans_engines && + table->table->file->has_transactions(); + prev_write_table= table->table; flags_all_set &= flags; flags_some_set |= flags; } + if (prev_access_table && prev_access_table->file->ht != table->table->file->ht) + mixed_engine= mixed_engine || (prev_access_table->file->has_transactions() != + table->table->file->has_transactions()); + prev_access_table= table->table; } + /* + Set the statement as unsafe if: + + . it is a mixed statement, i.e. access transactional and non-transactional + tables, and updates at least one; + or + . an early statement updated a transactional table; + . and, the current statement updates a non-transactional table. + + Any mixed statement is classified as unsafe to ensure that mixed mode is + completely safe. Consider the following example to understand why we + decided to do this: + + Note that mixed statements such as + + 1: INSERT INTO myisam_t SELECT * FROM innodb_t; + + 2: INSERT INTO innodb_t SELECT * FROM myisam_t; + + are classified as unsafe to ensure that in mixed mode the execution is + completely safe and equivalent to the row mode. Consider the following + statements and sessions (connections) to understand the reason: + + con1: INSERT INTO innodb_t VALUES (1); + con1: INSERT INTO innodb_t VALUES (100); + + con1: BEGIN + con2: INSERT INTO myisam_t SELECT * FROM innodb_t; + con1: INSERT INTO innodb_t VALUES (200); + con1: COMMIT; + + The point is that the concurrent statements may be written into the binary log + in a way different from the execution. For example, + + BINARY LOG: + + con2: BEGIN; + con2: INSERT INTO myisam_t SELECT * FROM innodb_t; + con2: COMMIT; + con1: BEGIN + con1: INSERT INTO innodb_t VALUES (200); + con1: COMMIT; + + .... + + or + + BINARY LOG: + + con1: BEGIN + con1: INSERT INTO innodb_t VALUES (200); + con1: COMMIT; + con2: BEGIN; + con2: INSERT INTO myisam_t SELECT * FROM innodb_t; + con2: COMMIT; + + Clearly, this may become a problem in STMT mode and setting the statement + as unsafe will make rows to be written into the binary log in MIXED mode + and as such the problem will not stand. + + In STMT mode, although such statement is classified as unsafe, i.e. + + INSERT INTO myisam_t SELECT * FROM innodb_t; + + there is no enough information to avoid writing it outside the boundaries + of a transaction. This is not a problem if we are considering snapshot + isolation level but if we have pure repeatable read or serializable the + lock history on the slave will be different from the master. + */ + if (mixed_engine || + trans_has_updated_trans_table(this) && !all_trans_engines) + lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_NONTRANS_AFTER_TRANS); + DBUG_PRINT("info", ("flags_all_set: 0x%llx", flags_all_set)); DBUG_PRINT("info", ("flags_some_set: 0x%llx", flags_some_set)); DBUG_PRINT("info", ("multi_engine: %d", multi_engine)); @@ -3630,7 +3714,7 @@ THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id, if (binlog_setup_trx_data()) DBUG_RETURN(NULL); - Rows_log_event* pending= binlog_get_pending_rows_event(); + Rows_log_event* pending= binlog_get_pending_rows_event(is_transactional); if (unlikely(pending && !pending->is_valid())) DBUG_RETURN(NULL); @@ -3664,7 +3748,9 @@ THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id, flush the pending event and replace it with the newly created event... */ - if (unlikely(mysql_bin_log.flush_and_set_pending_rows_event(this, ev))) + if (unlikely( + mysql_bin_log.flush_and_set_pending_rows_event(this, ev, + is_transactional))) { delete ev; DBUG_RETURN(NULL); @@ -3988,14 +4074,15 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans, } -int THD::binlog_remove_pending_rows_event(bool clear_maps) +int THD::binlog_remove_pending_rows_event(bool clear_maps, + bool is_transactional) { DBUG_ENTER("THD::binlog_remove_pending_rows_event"); if (!mysql_bin_log.is_open()) DBUG_RETURN(0); - mysql_bin_log.remove_pending_rows_event(this); + mysql_bin_log.remove_pending_rows_event(this, is_transactional); if (clear_maps) binlog_table_maps= 0; @@ -4003,7 +4090,7 @@ int THD::binlog_remove_pending_rows_event(bool clear_maps) DBUG_RETURN(0); } -int THD::binlog_flush_pending_rows_event(bool stmt_end) +int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional) { DBUG_ENTER("THD::binlog_flush_pending_rows_event"); /* @@ -4019,7 +4106,7 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end) flag is set. */ int error= 0; - if (Rows_log_event *pending= binlog_get_pending_rows_event()) + if (Rows_log_event *pending= binlog_get_pending_rows_event(is_transactional)) { if (stmt_end) { @@ -4028,7 +4115,8 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end) binlog_table_maps= 0; } - error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0); + error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0, + is_transactional); } DBUG_RETURN(error); @@ -4144,8 +4232,8 @@ void THD::issue_unsafe_warnings() write failure), then the error code is returned. */ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, - ulong query_len, bool is_trans, bool suppress_use, - int errcode) + ulong query_len, bool is_trans, bool direct, + bool suppress_use, int errcode) { DBUG_ENTER("THD::binlog_query"); DBUG_PRINT("enter", ("qtype: %s query: '%s'", @@ -4162,7 +4250,7 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, top-most close_thread_tables(). */ if (this->prelocked_mode == NON_PRELOCKED) - if (int error= binlog_flush_pending_rows_event(TRUE)) + if (int error= binlog_flush_pending_rows_event(TRUE, is_trans)) DBUG_RETURN(error); /* @@ -4207,8 +4295,8 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, flush the pending rows event if necessary. */ { - Query_log_event qinfo(this, query_arg, query_len, is_trans, suppress_use, - errcode); + Query_log_event qinfo(this, query_arg, query_len, is_trans, direct, + suppress_use, errcode); qinfo.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F; /* Binlog table maps will be irrelevant after a Query_log_event diff --git a/sql/sql_class.h b/sql/sql_class.h index 521884f3f4d..b6950108be4 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -1427,10 +1427,15 @@ public: size_t needed, bool is_transactional, RowsEventT* hint); - Rows_log_event* binlog_get_pending_rows_event() const; - void binlog_set_pending_rows_event(Rows_log_event* ev); - int binlog_flush_pending_rows_event(bool stmt_end); - int binlog_remove_pending_rows_event(bool clear_maps); + Rows_log_event* binlog_get_pending_rows_event(bool is_transactional) const; + void binlog_set_pending_rows_event(Rows_log_event* ev, bool is_transactional); + inline int binlog_flush_pending_rows_event(bool stmt_end) + { + return (binlog_flush_pending_rows_event(stmt_end, FALSE) || + binlog_flush_pending_rows_event(stmt_end, TRUE)); + } + int binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional); + int binlog_remove_pending_rows_event(bool clear_maps, bool is_transactional); /** Determine the binlog format of the current statement. @@ -1494,6 +1499,9 @@ public: uint get_binlog_table_maps() const { return binlog_table_maps; } + void clear_binlog_table_maps() { + binlog_table_maps= 0; + } #endif /* MYSQL_CLIENT */ public: @@ -1959,8 +1967,8 @@ public: }; int binlog_query(enum_binlog_query_type qtype, - char const *query, ulong query_len, - bool is_trans, bool suppress_use, + char const *query, ulong query_len, bool is_trans, + bool direct, bool suppress_use, int errcode); #endif @@ -2009,6 +2017,21 @@ public: return 0; #endif } + /** + Returns TRUE if session is in a multi-statement transaction mode. + + OPTION_NOT_AUTOCOMMIT: When autocommit is off, a multi-statement + transaction is implicitly started on the first statement after a + previous transaction has been ended. + + OPTION_BEGIN: Regardless of the autocommit status, a multi-statement + transaction can be explicitly started with the statements "START + TRANSACTION", "BEGIN [WORK]", "[COMMIT | ROLLBACK] AND CHAIN", etc. + */ + inline bool in_multi_stmt_transaction() + { + return options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN); + } inline bool fill_derived_tables() { return !stmt_arena->is_stmt_prepare() && !lex->only_view_structure(); diff --git a/sql/sql_db.cc b/sql/sql_db.cc index 3fca5bd7df6..770f68f0282 100644 --- a/sql/sql_db.cc +++ b/sql/sql_db.cc @@ -181,7 +181,7 @@ uchar* dboptions_get_key(my_dbopt_t *opt, size_t *length, static inline void write_to_binlog(THD *thd, char *query, uint q_len, char *db, uint db_len) { - Query_log_event qinfo(thd, query, q_len, 0, 0, 0); + Query_log_event qinfo(thd, query, q_len, FALSE, TRUE, FALSE, 0); qinfo.db= db; qinfo.db_len= db_len; mysql_bin_log.write(&qinfo); @@ -722,7 +722,7 @@ not_silent: if (mysql_bin_log.is_open()) { int errcode= query_error_code(thd, TRUE); - Query_log_event qinfo(thd, query, query_length, 0, + Query_log_event qinfo(thd, query, query_length, FALSE, TRUE, /* suppress_use */ TRUE, errcode); /* @@ -811,7 +811,7 @@ bool mysql_alter_db(THD *thd, const char *db, HA_CREATE_INFO *create_info) if (mysql_bin_log.is_open()) { int errcode= query_error_code(thd, TRUE); - Query_log_event qinfo(thd, thd->query, thd->query_length, 0, + Query_log_event qinfo(thd, thd->query, thd->query_length, FALSE, TRUE, /* suppress_use */ TRUE, errcode); /* @@ -959,7 +959,7 @@ bool mysql_rm_db(THD *thd,char *db,bool if_exists, bool silent) if (mysql_bin_log.is_open()) { int errcode= query_error_code(thd, TRUE); - Query_log_event qinfo(thd, query, query_length, 0, + Query_log_event qinfo(thd, query, query_length, FALSE, TRUE, /* suppress_use */ TRUE, errcode); /* Write should use the database being created as the "current @@ -1961,7 +1961,7 @@ bool mysql_upgrade_db(THD *thd, LEX_STRING *old_db) { int errcode= query_error_code(thd, TRUE); Query_log_event qinfo(thd, thd->query, thd->query_length, - 0, TRUE, errcode); + FALSE, TRUE, TRUE, errcode); thd->clear_error(); mysql_bin_log.write(&qinfo); } diff --git a/sql/sql_delete.cc b/sql/sql_delete.cc index 7fa7e39e5bf..c851ffdcc09 100644 --- a/sql/sql_delete.cc +++ b/sql/sql_delete.cc @@ -385,7 +385,8 @@ cleanup: transactional_table= table->file->has_transactions(); if (!transactional_table && deleted > 0) - thd->transaction.stmt.modified_non_trans_table= TRUE; + thd->transaction.stmt.modified_non_trans_table= + thd->transaction.all.modified_non_trans_table= TRUE; /* See similar binlogging code in sql_update.cc, for comments */ if ((error < 0) || thd->transaction.stmt.modified_non_trans_table) @@ -414,15 +415,13 @@ cleanup: */ int log_result= thd->binlog_query(query_type, thd->query, thd->query_length, - is_trans, FALSE, errcode); + is_trans, FALSE, FALSE, errcode); if (log_result) { error=1; } } - if (thd->transaction.stmt.modified_non_trans_table) - thd->transaction.all.modified_non_trans_table= TRUE; } DBUG_ASSERT(transactional_table || !deleted || thd->transaction.stmt.modified_non_trans_table); free_underlaid_joins(thd, select_lex); @@ -808,6 +807,9 @@ void multi_delete::abort() if (deleted) query_cache_invalidate3(thd, delete_tables, 1); + if (thd->transaction.stmt.modified_non_trans_table) + thd->transaction.all.modified_non_trans_table= TRUE; + /* If rows from the first table only has been deleted and it is transactional, just do rollback. @@ -838,9 +840,8 @@ void multi_delete::abort() int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED); thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length, - transactional_tables, FALSE, errcode); + transactional_tables, FALSE, FALSE, errcode); } - thd->transaction.all.modified_non_trans_table= true; } DBUG_VOID_RETURN; } @@ -967,6 +968,9 @@ bool multi_delete::send_eof() /* reset used flags */ thd_proc_info(thd, "end"); + if (thd->transaction.stmt.modified_non_trans_table) + thd->transaction.all.modified_non_trans_table= TRUE; + /* We must invalidate the query cache before binlog writing and ha_autocommit_... @@ -986,14 +990,12 @@ bool multi_delete::send_eof() errcode= query_error_code(thd, killed_status == THD::NOT_KILLED); if (thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length, - transactional_tables, FALSE, errcode) && + transactional_tables, FALSE, FALSE, errcode) && !normal_tables) { local_error=1; // Log write failed: roll back the SQL statement } } - if (thd->transaction.stmt.modified_non_trans_table) - thd->transaction.all.modified_non_trans_table= TRUE; } if (local_error != 0) error_handled= TRUE; // to force early leave from ::send_error() diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index e5b03a1d44d..4bb2045c06c 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -856,6 +856,10 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, */ query_cache_invalidate3(thd, table_list, 1); } + + if (thd->transaction.stmt.modified_non_trans_table) + thd->transaction.all.modified_non_trans_table= TRUE; + if ((changed && error <= 0) || thd->transaction.stmt.modified_non_trans_table || was_insert_delayed) @@ -895,14 +899,12 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, DBUG_ASSERT(thd->killed != THD::KILL_BAD_DATA || error > 0); if (thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length, - transactional_table, FALSE, - errcode)) + transactional_table, FALSE, FALSE, + errcode)) { error=1; } } - if (thd->transaction.stmt.modified_non_trans_table) - thd->transaction.all.modified_non_trans_table= TRUE; } DBUG_ASSERT(transactional_table || !changed || thd->transaction.stmt.modified_non_trans_table); @@ -2565,6 +2567,7 @@ bool Delayed_insert::handle_inserts(void) { int error; ulong max_rows; + bool has_trans = TRUE; bool using_ignore= 0, using_opt_replace= 0, using_bin_log= mysql_bin_log.is_open(); delayed_row *row; @@ -2717,7 +2720,7 @@ bool Delayed_insert::handle_inserts(void) */ thd.binlog_query(THD::ROW_QUERY_TYPE, row->query.str, row->query.length, - FALSE, FALSE, errcode); + FALSE, FALSE, FALSE, errcode); thd.time_zone_used = backup_time_zone_used; thd.variables.time_zone = backup_time_zone; @@ -2785,9 +2788,11 @@ bool Delayed_insert::handle_inserts(void) or trigger. TODO: Move the logging to last in the sequence of rows. - */ + */ + has_trans= thd.lex->sql_command == SQLCOM_CREATE_TABLE || + table->file->has_transactions(); if (thd.is_current_stmt_binlog_format_row()) - thd.binlog_flush_pending_rows_event(TRUE); + thd.binlog_flush_pending_rows_event(TRUE, has_trans); if ((error=table->file->extra(HA_EXTRA_NO_CACHE))) { // This shouldn't happen @@ -3214,9 +3219,11 @@ bool select_insert::send_eof() and ha_autocommit_or_rollback. */ query_cache_invalidate3(thd, table, 1); - if (thd->transaction.stmt.modified_non_trans_table) - thd->transaction.all.modified_non_trans_table= TRUE; } + + if (thd->transaction.stmt.modified_non_trans_table) + thd->transaction.all.modified_non_trans_table= TRUE; + DBUG_ASSERT(trans_table || !changed || thd->transaction.stmt.modified_non_trans_table); @@ -3235,7 +3242,7 @@ bool select_insert::send_eof() errcode= query_error_code(thd, killed_status == THD::NOT_KILLED); thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length, - trans_table, FALSE, errcode); + trans_table, FALSE, FALSE, errcode); } table->file->ha_release_auto_increment(); @@ -3301,14 +3308,15 @@ void select_insert::abort() { transactional_table= table->file->has_transactions(); if (thd->transaction.stmt.modified_non_trans_table) { + if (!can_rollback_data()) + thd->transaction.all.modified_non_trans_table= TRUE; + if (mysql_bin_log.is_open()) { int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED); thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length, - transactional_table, FALSE, errcode); + transactional_table, FALSE, FALSE, errcode); } - if (!thd->is_current_stmt_binlog_format_row() && !can_rollback_data()) - thd->transaction.all.modified_non_trans_table= TRUE; if (changed) query_cache_invalidate3(thd, table, 1); } @@ -3709,6 +3717,7 @@ select_create::binlog_show_create_table(TABLE **tables, uint count) thd->binlog_query(THD::STMT_QUERY_TYPE, query.ptr(), query.length(), /* is_trans */ TRUE, + /* direct */ FALSE, /* suppress_use */ FALSE, errcode); } @@ -3809,7 +3818,7 @@ void select_create::abort() select_insert::abort(); thd->transaction.stmt.modified_non_trans_table= FALSE; reenable_binlog(thd); - thd->binlog_flush_pending_rows_event(TRUE); + thd->binlog_flush_pending_rows_event(TRUE, TRUE); if (m_plock) { diff --git a/sql/sql_lex.cc b/sql/sql_lex.cc index a36b947fecc..a08338f4c76 100644 --- a/sql/sql_lex.cc +++ b/sql/sql_lex.cc @@ -44,7 +44,8 @@ Query_tables_list::binlog_stmt_unsafe_errcode[BINLOG_STMT_UNSAFE_COUNT] = ER_BINLOG_UNSAFE_TWO_AUTOINC_COLUMNS, ER_BINLOG_UNSAFE_UDF, ER_BINLOG_UNSAFE_SYSTEM_VARIABLE, - ER_BINLOG_UNSAFE_SYSTEM_FUNCTION + ER_BINLOG_UNSAFE_SYSTEM_FUNCTION, + ER_BINLOG_UNSAFE_NONTRANS_AFTER_TRANS }; diff --git a/sql/sql_lex.h b/sql/sql_lex.h index db4b8701635..3e278f784a6 100644 --- a/sql/sql_lex.h +++ b/sql/sql_lex.h @@ -1094,6 +1094,13 @@ public: */ BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION, + /** + Mixing transactional and non-transactional statements are unsafe if + non-transactional reads or writes are occur after transactional + reads or writes inside a transaction. + */ + BINLOG_STMT_UNSAFE_NONTRANS_AFTER_TRANS, + /* The last element of this enumeration type. */ BINLOG_STMT_UNSAFE_COUNT }; diff --git a/sql/sql_load.cc b/sql/sql_load.cc index ab5f3165974..986af637931 100644 --- a/sql/sql_load.cc +++ b/sql/sql_load.cc @@ -530,7 +530,7 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, after this point. */ if (thd->is_current_stmt_binlog_format_row()) - thd->binlog_flush_pending_rows_event(true); + thd->binlog_flush_pending_rows_event(TRUE, transactional_table); else { /* @@ -575,7 +575,7 @@ static bool write_execute_load_query_log_event(THD *thd, (uint) ((char*)thd->lex->fname_end - (char*)thd->query), (duplicates == DUP_REPLACE) ? LOAD_DUP_REPLACE : (ignore ? LOAD_DUP_IGNORE : LOAD_DUP_ERROR), - transactional_table, FALSE, errcode); + transactional_table, FALSE, FALSE, errcode); e.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F; return mysql_bin_log.write(&e); } diff --git a/sql/sql_table.cc b/sql/sql_table.cc index 84868cfb509..1d6c5ba48b4 100644 --- a/sql/sql_table.cc +++ b/sql/sql_table.cc @@ -1730,7 +1730,7 @@ end: */ void write_bin_log(THD *thd, bool clear_error, - char const *query, ulong query_length) + char const *query, ulong query_length, bool is_trans) { if (mysql_bin_log.is_open()) { @@ -1739,8 +1739,9 @@ void write_bin_log(THD *thd, bool clear_error, thd->clear_error(); else errcode= query_error_code(thd, TRUE); + thd->binlog_query(THD::STMT_QUERY_TYPE, - query, query_length, FALSE, FALSE, errcode); + query, query_length, is_trans, FALSE, FALSE, errcode); } } @@ -2127,7 +2128,8 @@ int mysql_rm_table_part2(THD *thd, TABLE_LIST *tables, bool if_exists, */ built_tmp_query.chop(); // Chop of the last comma built_tmp_query.append(" /* generated by server */"); - write_bin_log(thd, !error, built_tmp_query.ptr(), built_tmp_query.length()); + write_bin_log(thd, !error, built_tmp_query.ptr(), built_tmp_query.length(), + thd->in_multi_stmt_transaction()); } } @@ -6525,7 +6527,7 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name, { thd->clear_error(); Query_log_event qinfo(thd, thd->query, thd->query_length, - 0, FALSE, 0); + FALSE, TRUE, FALSE, 0); mysql_bin_log.write(&qinfo); } my_ok(thd); diff --git a/sql/sql_update.cc b/sql/sql_update.cc index b0085864108..c2b0c66d3a6 100644 --- a/sql/sql_update.cc +++ b/sql/sql_update.cc @@ -789,6 +789,9 @@ int mysql_update(THD *thd, { query_cache_invalidate3(thd, table_list, 1); } + + if (thd->transaction.stmt.modified_non_trans_table) + thd->transaction.all.modified_non_trans_table= TRUE; /* error < 0 means really no error at all: we processed all rows until the @@ -811,13 +814,11 @@ int mysql_update(THD *thd, if (thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length, - transactional_table, FALSE, errcode)) + transactional_table, FALSE, FALSE, errcode)) { error=1; // Rollback update } } - if (thd->transaction.stmt.modified_non_trans_table) - thd->transaction.all.modified_non_trans_table= TRUE; } DBUG_ASSERT(transactional_table || !updated || thd->transaction.stmt.modified_non_trans_table); free_underlaid_joins(thd, select_lex); @@ -1735,10 +1736,10 @@ bool multi_update::send_data(List<Item> ¬_used_values) /* non-transactional or transactional table got modified */ /* either multi_update class' flag is raised in its branch */ if (table->file->has_transactions()) - transactional_tables= 1; + transactional_tables= TRUE; else { - trans_safe= 0; + trans_safe= FALSE; thd->transaction.stmt.modified_non_trans_table= TRUE; } } @@ -1848,7 +1849,7 @@ void multi_update::abort() int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED); thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length, - transactional_tables, FALSE, errcode); + transactional_tables, FALSE, FALSE, errcode); } thd->transaction.all.modified_non_trans_table= TRUE; } @@ -1986,10 +1987,10 @@ int multi_update::do_updates() if (updated != org_updated) { if (table->file->has_transactions()) - transactional_tables= 1; + transactional_tables= TRUE; else { - trans_safe= 0; // Can't do safe rollback + trans_safe= FALSE; // Can't do safe rollback thd->transaction.stmt.modified_non_trans_table= TRUE; } } @@ -2019,10 +2020,10 @@ err2: if (updated != org_updated) { if (table->file->has_transactions()) - transactional_tables= 1; + transactional_tables= TRUE; else { - trans_safe= 0; + trans_safe= FALSE; thd->transaction.stmt.modified_non_trans_table= TRUE; } } @@ -2068,8 +2069,9 @@ bool multi_update::send_eof() either from the query's list or via a stored routine: bug#13270,23333 */ - DBUG_ASSERT(trans_safe || !updated || - thd->transaction.stmt.modified_non_trans_table); + if (thd->transaction.stmt.modified_non_trans_table) + thd->transaction.all.modified_non_trans_table= TRUE; + if (local_error == 0 || thd->transaction.stmt.modified_non_trans_table) { if (mysql_bin_log.is_open()) @@ -2081,14 +2083,15 @@ bool multi_update::send_eof() errcode= query_error_code(thd, killed_status == THD::NOT_KILLED); if (thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length, - transactional_tables, FALSE, errcode)) + transactional_tables, FALSE, FALSE, errcode)) { local_error= 1; // Rollback update } } - if (thd->transaction.stmt.modified_non_trans_table) - thd->transaction.all.modified_non_trans_table= TRUE; } + DBUG_ASSERT(trans_safe || !updated || + thd->transaction.stmt.modified_non_trans_table); + if (local_error != 0) error_handled= TRUE; // to force early leave from ::send_error() diff --git a/sql/sql_view.cc b/sql/sql_view.cc index 8b00b8f0d9b..9b583f2e869 100644 --- a/sql/sql_view.cc +++ b/sql/sql_view.cc @@ -663,7 +663,7 @@ bool mysql_create_view(THD *thd, TABLE_LIST *views, int errcode= query_error_code(thd, TRUE); thd->binlog_query(THD::STMT_QUERY_TYPE, - buff.ptr(), buff.length(), FALSE, FALSE, errcode); + buff.ptr(), buff.length(), FALSE, FALSE, FALSE, errcode); } VOID(pthread_mutex_unlock(&LOCK_open)); |