diff options
author | Monty <monty@mariadb.org> | 2020-01-28 23:23:51 +0200 |
---|---|---|
committer | Monty <monty@mariadb.org> | 2020-03-24 21:00:03 +0200 |
commit | 91ab42a823b244a3d4b051ab79701b7a552f274a (patch) | |
tree | 21719a1583bb31297a1a9c07b6b90bee460a28a3 | |
parent | f51df1dc78ad2b8f1d3173f43e35dddecaf8a6e2 (diff) | |
download | mariadb-git-91ab42a823b244a3d4b051ab79701b7a552f274a.tar.gz |
Clean up and speed up interfaces for binary row logging
MDEV-21605 Clean up and speed up interfaces for binary row logging
MDEV-21617 Bug fix for previous version of this code
The intention is to have as few 'if' as possible in ha_write() and
related functions. This is done by pre-calculating once per statement the
row_logging state for all tables.
Benefits are simpler and faster code both when binary logging is disabled
and when it's enabled.
Changes:
- Added handler->row_logging to make it easy to check it table should be
row logged. This also made it easier to disabling row logging for system,
internal and temporary tables.
- The tables row_logging capabilities are checked once per "statements
that updates tables" in THD::binlog_prepare_for_row_logging() which
is called when needed from THD::decide_logging_format().
- Removed most usage of tmp_disable_binlog(), reenable_binlog() and
temporary saving and setting of thd->variables.option_bits.
- Moved checks that can't change during a statement from
check_table_binlog_row_based() to check_table_binlog_row_based_internal()
- Removed flag row_already_logged (used by sequence engine)
- Moved binlog_log_row() to a handler::
- Moved write_locked_table_maps() to THD::binlog_write_table_maps() as
most other related binlog functions are in THD.
- Removed binlog_write_table_map() and binlog_log_row_internal() as
they are now obsolete as 'has_transactions()' is pre-calculated in
prepare_for_row_logging().
- Remove 'is_transactional' argument from binlog_write_table_map() as this
can now be read from handler.
- Changed order of 'if's in handler::external_lock() and wsrep_mysqld.h
to first evaluate fast and likely cases before more complex ones.
- Added error checking in ha_write_row() and related functions if
binlog_log_row() failed.
- Don't clear check_table_binlog_row_based_result in
clear_cached_table_binlog_row_based_flag() as it's not needed.
- THD::clear_binlog_table_maps() has been replaced with
THD::reset_binlog_for_next_statement()
- Added 'MYSQL_OPEN_IGNORE_LOGGING_FORMAT' flag to open_and_lock_tables()
to avoid calculating of binary log format for internal opens. This flag
is also used to avoid reading statistics tables for internal tables.
- Added OPTION_BINLOG_LOG_OFF as a simple way to turn of binlog temporary
for create (instead of using THD::sql_log_bin_off.
- Removed flag THD::sql_log_bin_off (not needed anymore)
- Speed up THD::decide_logging_format() by remembering if blackhole engine
is used and avoid a loop over all tables if it's not used
(the common case).
- THD::decide_logging_format() is not called anymore if no tables are used
for the statement. This will speed up pure stored procedure code with
about 5%+ according to some simple tests.
- We now get annotated events on slave if a CREATE ... SELECT statement
is transformed on the slave from statement to row logging.
- In the original code, the master could come into a state where row
logging is enforced for all future events if statement could be used.
This is now partly fixed.
Other changes:
- Ensure that all tables used by a statement has query_id set.
- Had to restore the row_logging flag for not used tables in
THD::binlog_write_table_maps (not normal scenario)
- Removed injector::transaction::use_table(server_id_type sid, table tbl)
as it's not used.
- Cleaned up set_slave_thread_options()
- Some more DBUG_ENTER/DBUG_RETURN, code comments and minor indentation
changes.
- Ensure we only call THD::decide_logging_format_low() once in
mysql_insert() (inefficiency).
- Don't annotate INSERT DELAYED
- Removed zeroing pos_in_table_list in THD::open_temporary_table() as it's
already 0
35 files changed, 570 insertions, 428 deletions
diff --git a/mysql-test/suite/binlog/t/foreign_key.test b/mysql-test/suite/binlog/t/foreign_key.test new file mode 100644 index 00000000000..87c719e4b6f --- /dev/null +++ b/mysql-test/suite/binlog/t/foreign_key.test @@ -0,0 +1,22 @@ +--source include/have_innodb.inc +--source include/have_binlog_format_row.inc + +reset master; + +CREATE TABLE t1 ( + id INT, + k INT, + c CHAR(8), + KEY (k), + PRIMARY KEY (id), + FOREIGN KEY (id) REFERENCES t1 (k) +) ENGINE=InnoDB; +LOCK TABLES t1 WRITE; +SET SESSION FOREIGN_KEY_CHECKS= OFF; +SET AUTOCOMMIT=OFF; +INSERT INTO t1 VALUES (1,1,'foo'); +DROP TABLE t1; +SET SESSION FOREIGN_KEY_CHECKS= ON; +SET AUTOCOMMIT=ON; + +source include/show_binlog_events.inc; diff --git a/mysql-test/suite/rpl/r/create_or_replace_mix.result b/mysql-test/suite/rpl/r/create_or_replace_mix.result index 661278aa7ef..6c83d27eef9 100644 --- a/mysql-test/suite/rpl/r/create_or_replace_mix.result +++ b/mysql-test/suite/rpl/r/create_or_replace_mix.result @@ -85,9 +85,12 @@ master-bin.000001 # Gtid # # GTID #-#-# master-bin.000001 # Query # # use `test`; DROP TABLE IF EXISTS `t1` /* generated by server */ master-bin.000001 # Gtid # # GTID #-#-# master-bin.000001 # Query # # use `test`; create table t1 (a int) -master-bin.000001 # Gtid # # BEGIN GTID #-#-# +master-bin.000001 # Gtid # # GTID #-#-# master-bin.000001 # Query # # use `test`; DROP TABLE IF EXISTS `test`.`t1`/* Generated to handle failed CREATE OR REPLACE */ -master-bin.000001 # Query # # ROLLBACK +master-bin.000001 # Gtid # # GTID #-#-# +master-bin.000001 # Query # # use `test`; create temporary table t9 (a int) +master-bin.000001 # Gtid # # GTID #-#-# +master-bin.000001 # Query # # DROP TEMPORARY TABLE IF EXISTS `test`.`t9`/* Generated to handle failed CREATE OR REPLACE */ connection server_2; show tables; Tables_in_test @@ -154,7 +157,7 @@ slave-bin.000001 # Query # # use `test`; create table t4 (server_2_to_be_delete slave-bin.000001 # Gtid # # GTID #-#-# slave-bin.000001 # Query # # use `test`; create table t1 (new_table int) slave-bin.000001 # Gtid # # BEGIN GTID #-#-# -slave-bin.000001 # Query # # use `test`; CREATE TABLE `t2` ( +slave-bin.000001 # Query # # use `test`; CREATE OR REPLACE TABLE `t2` ( `a` int(11) DEFAULT NULL ) slave-bin.000001 # Annotate_rows # # create table t2 select * from t9 @@ -223,26 +226,12 @@ Log_name Pos Event_type Server_id End_log_pos Info slave-bin.000001 # Gtid # # GTID #-#-# slave-bin.000001 # Query # # use `test`; create table t1 (a int) slave-bin.000001 # Gtid # # BEGIN GTID #-#-# -slave-bin.000001 # Annotate_rows # # insert into t1 values (0),(1),(2) -slave-bin.000001 # Table_map # # table_id: # (test.t1) -slave-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F +slave-bin.000001 # Query # # use `test`; insert into t1 values (0),(1),(2) slave-bin.000001 # Query # # COMMIT -slave-bin.000001 # Gtid # # BEGIN GTID #-#-# -slave-bin.000001 # Query # # use `test`; CREATE TABLE `t2` ( - `a` int(11) DEFAULT NULL -) ENGINE=MyISAM -slave-bin.000001 # Annotate_rows # # create table t2 engine=myisam select * from t1 -slave-bin.000001 # Table_map # # table_id: # (test.t2) -slave-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F -slave-bin.000001 # Query # # COMMIT -slave-bin.000001 # Gtid # # BEGIN GTID #-#-# -slave-bin.000001 # Query # # use `test`; CREATE OR REPLACE TABLE `t2` ( - `a` int(11) DEFAULT NULL -) ENGINE=InnoDB -slave-bin.000001 # Annotate_rows # # create or replace table t2 engine=innodb select * from t1 -slave-bin.000001 # Table_map # # table_id: # (test.t2) -slave-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F -slave-bin.000001 # Xid # # COMMIT /* XID */ +slave-bin.000001 # Gtid # # GTID #-#-# +slave-bin.000001 # Query # # use `test`; create table t2 engine=myisam select * from t1 +slave-bin.000001 # Gtid # # GTID #-#-# +slave-bin.000001 # Query # # use `test`; create or replace table t2 engine=innodb select * from t1 connection server_1; drop table t1; # diff --git a/mysql-test/suite/rpl/r/create_or_replace_statement.result b/mysql-test/suite/rpl/r/create_or_replace_statement.result index f95b451e5ec..6c83d27eef9 100644 --- a/mysql-test/suite/rpl/r/create_or_replace_statement.result +++ b/mysql-test/suite/rpl/r/create_or_replace_statement.result @@ -160,6 +160,7 @@ slave-bin.000001 # Gtid # # BEGIN GTID #-#-# slave-bin.000001 # Query # # use `test`; CREATE OR REPLACE TABLE `t2` ( `a` int(11) DEFAULT NULL ) +slave-bin.000001 # Annotate_rows # # create table t2 select * from t9 slave-bin.000001 # Table_map # # table_id: # (test.t2) slave-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F slave-bin.000001 # Query # # COMMIT @@ -171,6 +172,7 @@ slave-bin.000001 # Gtid # # BEGIN GTID #-#-# slave-bin.000001 # Query # # use `test`; CREATE TABLE `t5` ( `a` int(11) DEFAULT NULL ) +slave-bin.000001 # Annotate_rows # # create table t5 select * from t9 slave-bin.000001 # Table_map # # table_id: # (test.t5) slave-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F slave-bin.000001 # Query # # COMMIT diff --git a/mysql-test/suite/rpl/t/create_or_replace.inc b/mysql-test/suite/rpl/t/create_or_replace.inc index 35a6ead60ca..df46cc36e97 100644 --- a/mysql-test/suite/rpl/t/create_or_replace.inc +++ b/mysql-test/suite/rpl/t/create_or_replace.inc @@ -56,6 +56,7 @@ create or replace table t1 (a int primary key) select a from t2; # Same with temporary table create temporary table t9 (a int); + --error ER_DUP_ENTRY create or replace temporary table t9 (a int primary key) select a from t2; diff --git a/sql/events.cc b/sql/events.cc index 91adc0c23ba..acf472736e8 100644 --- a/sql/events.cc +++ b/sql/events.cc @@ -1241,15 +1241,9 @@ Events::load_events_from_db(THD *thd) TRUE); /* All the dmls to mysql.events tables are stmt bin-logged. */ - bool save_binlog_row_based; - if ((save_binlog_row_based= thd->is_current_stmt_binlog_format_row())) - thd->set_current_stmt_binlog_format_stmt(); - + table->file->row_logging= 0; (void) table->file->ha_update_row(table->record[1], table->record[0]); - if (save_binlog_row_based) - thd->set_current_stmt_binlog_format_row(); - delete et; continue; } diff --git a/sql/ha_partition.cc b/sql/ha_partition.cc index 41cf8bb2ccd..813351c658f 100644 --- a/sql/ha_partition.cc +++ b/sql/ha_partition.cc @@ -2131,12 +2131,10 @@ int ha_partition::copy_partitions(ulonglong * const copied, } else { - THD *thd= ha_thd(); /* Copy record to new handler */ (*copied)++; - tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */ + DBUG_ASSERT(!m_new_file[new_part]->row_logging); result= m_new_file[new_part]->ha_write_row(m_rec0); - reenable_binlog(thd); if (result) goto error; } @@ -4374,11 +4372,10 @@ int ha_partition::write_row(const uchar * buf) start_part_bulk_insert(thd, part_id); - tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */ + DBUG_ASSERT(!m_file[part_id]->row_logging); error= m_file[part_id]->ha_write_row(buf); if (have_auto_increment && !table->s->next_number_keypart) set_auto_increment_if_higher(table->next_number_field); - reenable_binlog(thd); exit: table->auto_increment_field_not_null= saved_auto_inc_field_not_null; @@ -4457,12 +4454,11 @@ int ha_partition::update_row(const uchar *old_data, const uchar *new_data) m_last_part= new_part_id; start_part_bulk_insert(thd, new_part_id); + DBUG_ASSERT(!m_file[new_part_id]->row_logging); if (new_part_id == old_part_id) { DBUG_PRINT("info", ("Update in partition %u", (uint) new_part_id)); - tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */ error= m_file[new_part_id]->ha_update_row(old_data, new_data); - reenable_binlog(thd); goto exit; } else @@ -4481,16 +4477,12 @@ int ha_partition::update_row(const uchar *old_data, const uchar *new_data) table->next_number_field= NULL; DBUG_PRINT("info", ("Update from partition %u to partition %u", (uint) old_part_id, (uint) new_part_id)); - tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */ error= m_file[new_part_id]->ha_write_row((uchar*) new_data); - reenable_binlog(thd); table->next_number_field= saved_next_number_field; if (unlikely(error)) goto exit; - tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */ error= m_file[old_part_id]->ha_delete_row(old_data); - reenable_binlog(thd); if (unlikely(error)) goto exit; } @@ -4592,9 +4584,8 @@ int ha_partition::delete_row(const uchar *buf) if (!bitmap_is_set(&(m_part_info->lock_partitions), m_last_part)) DBUG_RETURN(HA_ERR_NOT_IN_LOCK_PARTITIONS); - tmp_disable_binlog(thd); + DBUG_ASSERT(!m_file[m_last_part]->row_logging); error= m_file[m_last_part]->ha_delete_row(buf); - reenable_binlog(thd); DBUG_RETURN(error); } diff --git a/sql/ha_sequence.cc b/sql/ha_sequence.cc index 6cb9937ebb4..71da208d775 100644 --- a/sql/ha_sequence.cc +++ b/sql/ha_sequence.cc @@ -202,7 +202,11 @@ int ha_sequence::write_row(const uchar *buf) DBUG_ENTER("ha_sequence::write_row"); DBUG_ASSERT(table->record[0] == buf); - row_already_logged= 0; + /* + Log to binary log even if this function has been called before + (The function ends by setting row_logging to 0) + */ + row_logging= row_logging_init; if (unlikely(sequence->initialized == SEQUENCE::SEQ_IN_PREPARE)) { /* This calls is from ha_open() as part of create table */ @@ -218,6 +222,7 @@ int ha_sequence::write_row(const uchar *buf) sequence->copy(&tmp_seq); if (likely(!(error= file->write_row(buf)))) sequence->initialized= SEQUENCE::SEQ_READY_TO_USE; + row_logging= 0; DBUG_RETURN(error); } if (unlikely(sequence->initialized != SEQUENCE::SEQ_READY_TO_USE)) @@ -262,10 +267,12 @@ int ha_sequence::write_row(const uchar *buf) sequence->copy(&tmp_seq); rows_changed++; /* We have to do the logging while we hold the sequence mutex */ - error= binlog_log_row(table, 0, buf, log_func); - row_already_logged= 1; + if (row_logging) + error= binlog_log_row(table, 0, buf, log_func); } + /* Row is already logged, don't log it again in ha_write_row() */ + row_logging= 0; sequence->all_values_used= 0; if (!sequence_locked) sequence->write_unlock(table); diff --git a/sql/handler.cc b/sql/handler.cc index 9a3766c90de..c3f78693849 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -3620,8 +3620,9 @@ int handler::update_auto_increment() variables->auto_increment_increment); auto_inc_intervals_count++; /* Row-based replication does not need to store intervals in binlog */ - if (((WSREP(thd) && wsrep_emulate_bin_log ) || mysql_bin_log.is_open()) - && !thd->is_current_stmt_binlog_format_row()) + if (((WSREP_NNULL(thd) && wsrep_emulate_bin_log) || + mysql_bin_log.is_open()) && + !thd->is_current_stmt_binlog_format_row()) thd->auto_inc_intervals_in_cur_stmt_for_binlog. append(auto_inc_interval_for_cur_row.minimum(), auto_inc_interval_for_cur_row.values(), @@ -6366,32 +6367,35 @@ bool ha_show_status(THD *thd, handlerton *db_type, enum ha_stat_type stat) 1 Row needs to be logged */ -bool handler::check_table_binlog_row_based(bool binlog_row) +bool handler::check_table_binlog_row_based() { - if (table->versioned(VERS_TRX_ID)) - return false; - if (unlikely((table->in_use->variables.sql_log_bin_off))) - return 0; /* Called by partitioning engine */ -#ifdef WITH_WSREP - if (!table->in_use->variables.sql_log_bin && - wsrep_thd_is_applying(table->in_use)) - return 0; /* wsrep patch sets sql_log_bin to silence binlogging - from high priority threads */ -#endif /* WITH_WSREP */ if (unlikely((!check_table_binlog_row_based_done))) { check_table_binlog_row_based_done= 1; check_table_binlog_row_based_result= - check_table_binlog_row_based_internal(binlog_row); + check_table_binlog_row_based_internal(); } return check_table_binlog_row_based_result; } -bool handler::check_table_binlog_row_based_internal(bool binlog_row) +bool handler::check_table_binlog_row_based_internal() { THD *thd= table->in_use; +#ifdef WITH_WSREP + if (!thd->variables.sql_log_bin && + wsrep_thd_is_applying(table->in_use)) + { + /* + wsrep patch sets sql_log_bin to silence binlogging from high + priority threads + */ + return 0; + } +#endif return (table->s->can_do_row_logging && + !table->versioned(VERS_TRX_ID) && + !(thd->variables.option_bits & OPTION_BIN_TMP_LOG_OFF) && thd->is_current_stmt_binlog_format_row() && /* Wsrep partially enables binary logging if it have not been @@ -6407,9 +6411,9 @@ bool handler::check_table_binlog_row_based_internal(bool binlog_row) Otherwise, return 'true' if binary logging is on. */ - IF_WSREP(((WSREP_EMULATE_BINLOG(thd) && + IF_WSREP(((WSREP_EMULATE_BINLOG_NNULL(thd) && wsrep_thd_is_local(thd)) || - ((WSREP(thd) || + ((WSREP_NNULL(thd) || (thd->variables.option_bits & OPTION_BIN_LOG)) && mysql_bin_log.is_open())), (thd->variables.option_bits & OPTION_BIN_LOG) && @@ -6417,151 +6421,22 @@ bool handler::check_table_binlog_row_based_internal(bool binlog_row) } -/** @brief - Write table maps for all (manually or automatically) locked tables - to the binary log. Also, if binlog_annotate_row_events is ON, - write Annotate_rows event before the first table map. - - SYNOPSIS - write_locked_table_maps() - thd Pointer to THD structure - - DESCRIPTION - This function will generate and write table maps for all tables - that are locked by the thread 'thd'. - - RETURN VALUE - 0 All OK - 1 Failed to write all table maps - - SEE ALSO - THD::lock -*/ - -static int write_locked_table_maps(THD *thd) -{ - DBUG_ENTER("write_locked_table_maps"); - DBUG_PRINT("enter", ("thd:%p thd->lock:%p " - "thd->extra_lock: %p", - thd, thd->lock, thd->extra_lock)); - - DBUG_PRINT("debug", ("get_binlog_table_maps(): %d", thd->get_binlog_table_maps())); - - MYSQL_LOCK *locks[2]; - locks[0]= thd->extra_lock; - locks[1]= thd->lock; - my_bool with_annotate= IF_WSREP(!wsrep_fragments_certified_for_stmt(thd), - true) && - thd->variables.binlog_annotate_row_events && - thd->query() && thd->query_length(); - - for (uint i= 0 ; i < sizeof(locks)/sizeof(*locks) ; ++i ) - { - MYSQL_LOCK const *const lock= locks[i]; - if (lock == NULL) - continue; - - TABLE **const end_ptr= lock->table + lock->table_count; - for (TABLE **table_ptr= lock->table ; - table_ptr != end_ptr ; - ++table_ptr) - { - TABLE *const table= *table_ptr; - if (table->current_lock == F_WRLCK && - table->file->check_table_binlog_row_based(0)) - { - if (binlog_write_table_map(thd, table, with_annotate)) - DBUG_RETURN(1); - with_annotate= 0; - } - } - } - DBUG_RETURN(0); -} - - -int binlog_write_table_map(THD *thd, TABLE *table, bool with_annotate) -{ - DBUG_ENTER("binlog_write_table_map"); - DBUG_PRINT("info", ("table %s", table->s->table_name.str)); - /* - We need to have a transactional behavior for SQLCOM_CREATE_TABLE - (e.g. CREATE TABLE... SELECT * FROM TABLE) in order to keep a - compatible behavior with the STMT based replication even when - the table is not transactional. In other words, if the operation - fails while executing the insert phase nothing is written to the - binlog. - - Note that at this point, we check the type of a set of tables to - create the table map events. In the function binlog_log_row(), - which calls the current function, we check the type of the table - of the current row. - */ - bool const has_trans= ((sql_command_flags[thd->lex->sql_command] & - (CF_SCHEMA_CHANGE | CF_ADMIN_COMMAND)) || - table->file->has_transactions()); - int const error= thd->binlog_write_table_map(table, has_trans, - &with_annotate); - /* - If an error occurs, it is the responsibility of the caller to - roll back the transaction. - */ - if (unlikely(error)) - DBUG_RETURN(1); - DBUG_RETURN(0); -} - - -static int binlog_log_row_internal(TABLE* table, - const uchar *before_record, - const uchar *after_record, - Log_func *log_func) -{ - bool error= 0; - THD *const thd= table->in_use; - - /* - If there are no table maps written to the binary log, this is - the first row handled in this statement. In that case, we need - to write table maps for all locked tables to the binary log. - */ - if (likely(!(error= ((thd->get_binlog_table_maps() == 0 && - write_locked_table_maps(thd)))))) - { - /* - We need to have a transactional behavior for SQLCOM_CREATE_TABLE - (i.e. CREATE TABLE... SELECT * FROM TABLE) in order to keep a - compatible behavior with the STMT based replication even when - the table is not transactional. In other words, if the operation - fails while executing the insert phase nothing is written to the - binlog. We need the same also for ALTER TABLE in the case we convert - a shared table to a not shared table as in this case we will log all - rows. - */ - bool const has_trans= ((sql_command_flags[thd->lex->sql_command] & - (CF_SCHEMA_CHANGE | CF_ADMIN_COMMAND)) || - table->file->has_transactions()); - error= (*log_func)(thd, table, has_trans, before_record, after_record); - } - return error ? HA_ERR_RBR_LOGGING_FAILED : 0; -} - -int binlog_log_row(TABLE* table, const uchar *before_record, - const uchar *after_record, Log_func *log_func) +int handler::binlog_log_row(TABLE *table, + const uchar *before_record, + const uchar *after_record, + Log_func *log_func) { -#ifdef WITH_WSREP - THD *const thd= table->in_use; + bool error; + THD *thd= table->in_use; + DBUG_ENTER("binlog_log_row"); - /* only InnoDB tables will be replicated through binlog emulation */ - if ((WSREP_EMULATE_BINLOG(thd) && - !(table->file->partition_ht()->flags & HTON_WSREP_REPLICATION)) || - thd->wsrep_ignore_table == true) - return 0; -#endif + if (!thd->binlog_table_maps && + thd->binlog_write_table_maps()) + DBUG_RETURN(HA_ERR_RBR_LOGGING_FAILED); - if (!table->file->check_table_binlog_row_based(1)) - return 0; - return binlog_log_row_internal(table, before_record, after_record, log_func); + error= (*log_func)(thd, table, row_logging_has_trans, + before_record, after_record); + DBUG_RETURN(error ? HA_ERR_RBR_LOGGING_FAILED : 0); } @@ -6647,6 +6522,7 @@ int handler::ha_external_lock(THD *thd, int lock_type) int handler::ha_reset() { DBUG_ENTER("ha_reset"); + /* Check that we have called all proper deallocation functions */ DBUG_ASSERT((uchar*) table->def_read_set.bitmap + table->s->column_bitmap_size == @@ -6662,6 +6538,10 @@ int handler::ha_reset() pushed_cond= NULL; tracker= NULL; mark_trx_read_write_done= 0; + /* + Disable row logging. + */ + row_logging= row_logging_init= 0; clear_cached_table_binlog_row_based_flag(); /* Reset information about pushed engine conditions */ cancel_pushed_idx_cond(); @@ -6678,8 +6558,8 @@ static int wsrep_after_row(THD *thd) /* enforce wsrep_max_ws_rows */ thd->wsrep_affected_rows++; if (wsrep_max_ws_rows && - wsrep_thd_is_local(thd) && - thd->wsrep_affected_rows > wsrep_max_ws_rows) + thd->wsrep_affected_rows > wsrep_max_ws_rows && + wsrep_thd_is_local(thd)) { trans_rollback_stmt(thd) || trans_rollback(thd); my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0)); @@ -6864,6 +6744,74 @@ static int check_duplicate_long_entries_update(TABLE *table, uchar *new_rec) /** + Check if galera disables binary logging for this table + + @return 0 Binary logging disabled + @return 1 Binary logging can be enabled +*/ + + +static inline bool wsrep_check_if_binlog_row(TABLE *table) +{ +#ifdef WITH_WSREP + THD *const thd= table->in_use; + + /* only InnoDB tables will be replicated through binlog emulation */ + if ((WSREP_EMULATE_BINLOG(thd) && + !(table->file->partition_ht()->flags & HTON_WSREP_REPLICATION)) || + thd->wsrep_ignore_table == true) + return 0; +#endif + return 1; +} + + +/** + Prepare handler for row logging + + @return 0 if handler will not participate in row logging + @return 1 handler will participate in row logging + + This function is always safe to call on an opened table. +*/ + +bool handler::prepare_for_row_logging() +{ + DBUG_ENTER("handler::prepare_for_row_logging"); + + /* Check if we should have row logging */ + if (wsrep_check_if_binlog_row(table) && + check_table_binlog_row_based()) + { + /* + Row logging enabled. Intialize all variables and write + annotated and table maps + */ + row_logging= row_logging_init= 1; + + /* + We need to have a transactional behavior for SQLCOM_CREATE_TABLE + (e.g. CREATE TABLE... SELECT * FROM TABLE) in order to keep a + compatible behavior with the STMT based replication even when + the table is not transactional. In other words, if the operation + fails while executing the insert phase nothing is written to the + binlog. + */ + row_logging_has_trans= + ((sql_command_flags[table->in_use->lex->sql_command] & + (CF_SCHEMA_CHANGE | CF_ADMIN_COMMAND)) || + table->file->has_transactions()); + } + else + { + /* Check row_logging has not been properly cleared from previous command */ + DBUG_ASSERT(row_logging == 0); + } + DBUG_RETURN(row_logging); +} + + +/* Do all initialization needed for insert @param force_update_handler Set to TRUE if we should always create an @@ -6891,7 +6839,6 @@ int handler::prepare_for_insert(bool force_update_handler) int handler::ha_write_row(const uchar *buf) { int error; - Log_func *log_func= Write_rows_log_event::binlog_row_logging_function; DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE || m_lock_type == F_WRLCK); DBUG_ENTER("handler::ha_write_row"); @@ -6911,13 +6858,17 @@ int handler::ha_write_row(const uchar *buf) { error= write_row(buf); }) MYSQL_INSERT_ROW_DONE(error); - if (likely(!error) && !row_already_logged) + if (likely(!error)) { rows_changed++; - error= binlog_log_row(table, 0, buf, log_func); + if (row_logging) + { + Log_func *log_func= Write_rows_log_event::binlog_row_logging_function; + error= binlog_log_row(table, 0, buf, log_func); + } #ifdef WITH_WSREP - if (table_share->tmp_table == NO_TMP_TABLE && - WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd()))) + if (WSREP_NNULL(ha_thd()) && table_share->tmp_table == NO_TMP_TABLE && + !error && (error= wsrep_after_row(ha_thd()))) { DBUG_RETURN(error); } @@ -6932,10 +6883,8 @@ int handler::ha_write_row(const uchar *buf) int handler::ha_update_row(const uchar *old_data, const uchar *new_data) { int error; - Log_func *log_func= Update_rows_log_event::binlog_row_logging_function; DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE || m_lock_type == F_WRLCK); - /* Some storage engines require that the new record is in record[0] (and the old record is in record[1]). @@ -6950,20 +6899,22 @@ int handler::ha_update_row(const uchar *old_data, const uchar *new_data) (error= check_duplicate_long_entries_update(table, (uchar*) new_data))) return error; - TABLE_IO_WAIT(tracker, PSI_TABLE_UPDATE_ROW, active_index, error, + TABLE_IO_WAIT(tracker, PSI_TABLE_UPDATE_ROW, active_index, 0, { error= update_row(old_data, new_data);}) MYSQL_UPDATE_ROW_DONE(error); - if (likely(!error) && !row_already_logged) + if (likely(!error)) { rows_changed++; - error= binlog_log_row(table, old_data, new_data, log_func); -#ifdef WITH_WSREP - if (table_share->tmp_table == NO_TMP_TABLE && - WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd()))) + if (row_logging) { - return error; + Log_func *log_func= Update_rows_log_event::binlog_row_logging_function; + error= binlog_log_row(table, old_data, new_data, log_func); } +#ifdef WITH_WSREP + if (WSREP_NNULL(ha_thd()) && table_share->tmp_table == NO_TMP_TABLE && + !error && (error= wsrep_after_row(ha_thd()))) + return error; #endif /* WITH_WSREP */ } return error; @@ -7000,7 +6951,6 @@ int handler::update_first_row(const uchar *new_data) int handler::ha_delete_row(const uchar *buf) { int error; - Log_func *log_func= Delete_rows_log_event::binlog_row_logging_function; DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE || m_lock_type == F_WRLCK); /* @@ -7019,10 +6969,14 @@ int handler::ha_delete_row(const uchar *buf) if (likely(!error)) { rows_changed++; - error= binlog_log_row(table, buf, 0, log_func); + if (row_logging) + { + Log_func *log_func= Delete_rows_log_event::binlog_row_logging_function; + error= binlog_log_row(table, buf, 0, log_func); + } #ifdef WITH_WSREP - if (table_share->tmp_table == NO_TMP_TABLE && - WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd()))) + if (WSREP_NNULL(ha_thd()) && table_share->tmp_table == NO_TMP_TABLE && + !error && (error= wsrep_after_row(ha_thd()))) { return error; } @@ -7049,11 +7003,10 @@ int handler::ha_delete_row(const uchar *buf) int handler::ha_direct_update_rows(ha_rows *update_rows, ha_rows *found_rows) { int error; - MYSQL_UPDATE_ROW_START(table_share->db.str, table_share->table_name.str); mark_trx_read_write(); - error = direct_update_rows(update_rows, found_rows); + error= direct_update_rows(update_rows, found_rows); MYSQL_UPDATE_ROW_DONE(error); return error; } diff --git a/sql/handler.h b/sql/handler.h index 176a67e2494..9d159771097 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -610,6 +610,7 @@ given at all. */ #define HA_CREATE_USED_SEQUENCE (1UL << 25) typedef ulonglong alter_table_operations; +typedef bool Log_func(THD*, TABLE*, bool, const uchar*, const uchar*); /* These flags are set by the parser and describes the type of @@ -3050,8 +3051,6 @@ public: bool mark_trx_read_write_done; /* mark_trx_read_write was called */ bool check_table_binlog_row_based_done; /* check_table_binlog.. was called */ bool check_table_binlog_row_based_result; /* cached check_table_binlog... */ - /* Set to 1 if handler logged last insert/update/delete operation */ - bool row_already_logged; /* TRUE <=> the engine guarantees that returned records are within the range being scanned. @@ -3192,10 +3191,16 @@ public: void end_psi_batch_mode(); bool set_top_table_fields; + struct TABLE *top_table; Field **top_table_field; uint top_table_fields; + /* If we have row logging enabled for this table */ + bool row_logging, row_logging_init; + /* If the row logging should be done in transaction cache */ + bool row_logging_has_trans; + private: /** The lock type set by when calling::ha_external_lock(). This is @@ -3213,7 +3218,6 @@ private: /** Stores next_insert_id for handling duplicate key errors. */ ulonglong m_prev_insert_id; - public: handler(handlerton *ht_arg, TABLE_SHARE *share_arg) :table_share(share_arg), table(0), @@ -3223,7 +3227,6 @@ public: mark_trx_read_write_done(0), check_table_binlog_row_based_done(0), check_table_binlog_row_based_result(0), - row_already_logged(0), in_range_check_pushed_down(FALSE), errkey(-1), key_used_on_scan(MAX_KEY), active_index(MAX_KEY), keyread(MAX_KEY), @@ -3242,6 +3245,7 @@ public: m_psi_locker(NULL), set_top_table_fields(FALSE), top_table(0), top_table_field(0), top_table_fields(0), + row_logging(0), row_logging_init(0), m_lock_type(F_UNLCK), ha_share(NULL), m_prev_insert_id(0) { DBUG_PRINT("info", @@ -4598,13 +4602,17 @@ protected: virtual int delete_table(const char *name); public: - bool check_table_binlog_row_based(bool binlog_row); + bool check_table_binlog_row_based(); + bool prepare_for_row_logging(); int prepare_for_insert(bool force_update_handler= 0); + int binlog_log_row(TABLE *table, + const uchar *before_record, + const uchar *after_record, + Log_func *log_func); inline void clear_cached_table_binlog_row_based_flag() { check_table_binlog_row_based_done= 0; - check_table_binlog_row_based_result= 0; } private: /* Cache result to avoid extra calls */ @@ -4619,7 +4627,7 @@ private: private: void mark_trx_read_write_internal(); - bool check_table_binlog_row_based_internal(bool binlog_row); + bool check_table_binlog_row_based_internal(); protected: /* @@ -5202,7 +5210,6 @@ int binlog_log_row(TABLE* table, if (unlikely(this_tracker)) \ tracker->stop_tracking(table->in_use); \ } -int binlog_write_table_map(THD *thd, TABLE *table, bool with_annotate); void print_keydup_error(TABLE *table, KEY *key, const char *msg, myf errflag); void print_keydup_error(TABLE *table, KEY *key, myf errflag); diff --git a/sql/log.cc b/sql/log.cc index 271fe647d87..6e11283904e 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -514,6 +514,12 @@ void Log_event_writer::add_status(enum_logged_status status) cache_data->add_status(status); } +void Log_event_writer::set_incident() +{ + cache_data->set_incident(); +} + + class binlog_cache_mngr { public: binlog_cache_mngr(my_off_t param_max_binlog_stmt_cache_size, @@ -714,7 +720,6 @@ bool Log_to_csv_event_handler:: uint field_index; Silence_log_table_errors error_handler; Open_tables_backup open_tables_backup; - ulonglong save_thd_options; bool save_time_zone_used; DBUG_ENTER("log_general"); @@ -724,9 +729,6 @@ bool Log_to_csv_event_handler:: */ save_time_zone_used= thd->time_zone_used; - save_thd_options= thd->variables.option_bits; - thd->variables.option_bits&= ~OPTION_BIN_LOG; - table_list.init_one_table(&MYSQL_SCHEMA_NAME, &GENERAL_LOG_NAME, 0, TL_WRITE_CONCURRENT_INSERT); @@ -806,7 +808,6 @@ bool Log_to_csv_event_handler:: table->field[field_index]->set_default(); } - /* log table entries are not replicated */ if (table->file->ha_write_row(table->record[0])) goto err; @@ -827,7 +828,6 @@ err: if (need_close) close_log_table(thd, &open_tables_backup); - thd->variables.option_bits= save_thd_options; thd->time_zone_used= save_time_zone_used; DBUG_RETURN(result); } @@ -880,7 +880,6 @@ bool Log_to_csv_event_handler:: ulong lock_time= (ulong) MY_MIN(lock_utime/1000000, TIME_MAX_VALUE_SECONDS); ulong query_time_micro= (ulong) (query_utime % 1000000); ulong lock_time_micro= (ulong) (lock_utime % 1000000); - DBUG_ENTER("Log_to_csv_event_handler::log_slow"); thd->push_internal_handler(& error_handler); @@ -997,7 +996,6 @@ bool Log_to_csv_event_handler:: 0, TRUE)) goto err; - /* log table entries are not replicated */ if (table->file->ha_write_row(table->record[0])) goto err; @@ -1982,7 +1980,7 @@ binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all) if (cache_mngr->trx_cache.has_incident()) error= mysql_bin_log.write_incident(thd); - thd->clear_binlog_table_maps(); + thd->reset_binlog_for_next_statement(); cache_mngr->reset(false, true); } @@ -2253,6 +2251,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid() */ cache_mngr->reset(false, true); + thd->reset_binlog_for_next_statement(); DBUG_RETURN(error); } if (!wsrep_emulate_bin_log && mysql_bin_log.check_write_error(thd)) @@ -2297,6 +2296,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) */ if (!all) cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF); + thd->reset_binlog_for_next_statement(); DBUG_RETURN(error); } @@ -2478,14 +2478,14 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) /* When a SAVEPOINT is executed inside a stored function/trigger we force the - pending event to be flushed with a STMT_END_F flag and clear the table maps + pending event to be flushed with a STMT_END_F flag and reset binlog as well to ensure that following DMLs will have a clean state to start with. ROLLBACK inside a stored routine has to finalize possibly existing current row-based pending event with cleaning up table maps. That ensures that following DMLs will have a clean state to start with. */ if (thd->in_sub_stmt) - thd->clear_binlog_table_maps(); + thd->reset_binlog_for_next_statement(); DBUG_RETURN(0); } @@ -5773,7 +5773,7 @@ binlog_cache_mngr *THD::binlog_setup_trx_data() We only update the saved position if the old one was undefined, the reason is that there are some cases (e.g., for CREATE-SELECT) where the position is saved twice (e.g., both in - select_create::prepare() and THD::binlog_write_table_map()) , but + select_create::prepare() and binlog_write_table_map()) , but we should use the first. This means that calls to this function can be used to start the statement before the first table map event, to include some extra events. @@ -5900,45 +5900,149 @@ binlog_start_consistent_snapshot(handlerton *hton, THD *thd) DBUG_RETURN(err); } + +/** + Prepare all tables that are updated for row logging + + Annotate events and table maps are written by binlog_write_table_maps() +*/ + +void THD::binlog_prepare_for_row_logging() +{ + DBUG_ENTER("THD::binlog_prepare_for_row_logging"); + for (TABLE *table= open_tables ; table; table= table->next) + { + if (table->query_id == query_id && table->current_lock == F_WRLCK) + table->file->prepare_for_row_logging(); + } + DBUG_VOID_RETURN; +} + +/** + Write annnotated row event (the query) if needed +*/ + +bool THD::binlog_write_annotated_row(Log_event_writer *writer) +{ + int error; + DBUG_ENTER("THD::binlog_write_annotated_row"); + + if (!(IF_WSREP(!wsrep_fragments_certified_for_stmt(this), true) && + variables.binlog_annotate_row_events && + query_length())) + DBUG_RETURN(0); + + Annotate_rows_log_event anno(this, 0, false); + if (unlikely((error= writer->write(&anno)))) + { + if (my_errno == EFBIG) + writer->set_incident(); + DBUG_RETURN(error); + } + DBUG_RETURN(0); +} + + +/** + Write table map events for all tables that are using row logging. + This includes all tables used by this statement, including tables + used in triggers. + + Also write annotate events and start transactions. + This is using the "tables_with_row_logging" list prepared by + THD::binlog_prepare_for_row_logging +*/ + +bool THD::binlog_write_table_maps() +{ + bool with_annotate; + MYSQL_LOCK *locks[2], **locks_end= locks; + DBUG_ENTER("THD::binlog_write_table_maps"); + + DBUG_ASSERT(!binlog_table_maps); + DBUG_ASSERT(is_current_stmt_binlog_format_row()); + + /* Initialize cache_mngr once per statement */ + binlog_start_trans_and_stmt(); + with_annotate= 1; // Write annotate with first map + + if ((*locks_end= extra_lock)) + locks_end++; + if ((*locks_end= lock)) + locks_end++; + + for (MYSQL_LOCK **cur_lock= locks ; cur_lock < locks_end ; cur_lock++) + { + TABLE **const end_ptr= (*cur_lock)->table + (*cur_lock)->table_count; + for (TABLE **table_ptr= (*cur_lock)->table; + table_ptr != end_ptr ; + ++table_ptr) + { + TABLE *table= *table_ptr; + bool restore= 0; + /* + We have to also write table maps for tables that have not yet been + used, like for tables in after triggers + */ + if (!table->file->row_logging && + table->query_id != query_id && table->current_lock == F_WRLCK) + { + if (table->file->prepare_for_row_logging()) + restore= 1; + } + if (table->file->row_logging) + { + if (binlog_write_table_map(table, with_annotate)) + DBUG_RETURN(1); + with_annotate= 0; + } + if (restore) + { + /* + Restore original setting so that it doesn't cause problem for the + next statement + */ + table->file->row_logging= table->file->row_logging_init= 0; + } + } + } + binlog_table_maps= 1; // Table maps written + DBUG_RETURN(0); +} + + /** This function writes a table map to the binary log. Note that in order to keep the signature uniform with related methods, we use a redundant parameter to indicate whether a transactional table was changed or not. - If with_annotate != NULL and - *with_annotate = TRUE write also Annotate_rows before the table map. - @param table a pointer to the table. - @param is_transactional @c true indicates a transactional table, - otherwise @c false a non-transactional. + @param with_annotate If true call binlog_write_annotated_row() + @return nonzero if an error pops up when writing the table map event. */ -int THD::binlog_write_table_map(TABLE *table, bool is_transactional, - bool *with_annotate) + +bool THD::binlog_write_table_map(TABLE *table, bool with_annotate) { int error; + bool is_transactional= table->file->row_logging_has_trans; DBUG_ENTER("THD::binlog_write_table_map"); DBUG_PRINT("enter", ("table: %p (%s: #%lu)", table, table->s->table_name.str, table->s->table_map_id)); + /* Pre-conditions */ + DBUG_ASSERT(table->s->table_map_id != ULONG_MAX); + /* Ensure that all events in a GTID group are in the same cache */ if (variables.option_bits & OPTION_GTID_BEGIN) is_transactional= 1; - /* Pre-conditions */ - DBUG_ASSERT(is_current_stmt_binlog_format_row()); - DBUG_ASSERT(WSREP_EMULATE_BINLOG_NNULL(this) || mysql_bin_log.is_open()); - DBUG_ASSERT(table->s->table_map_id != ULONG_MAX); - Table_map_log_event the_event(this, table, table->s->table_map_id, is_transactional); - if (binlog_table_maps == 0) - binlog_start_trans_and_stmt(); - binlog_cache_mngr *const cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); binlog_cache_data *cache_data= (cache_mngr-> @@ -5946,25 +6050,17 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional, IO_CACHE *file= &cache_data->cache_log; Log_event_writer writer(file, cache_data); - if (with_annotate && *with_annotate) - { - Annotate_rows_log_event anno(table->in_use, is_transactional, false); - /* Annotate event should be written not more than once */ - *with_annotate= 0; - if (unlikely((error= writer.write(&anno)))) - { - if (my_errno == EFBIG) - cache_data->set_incident(); - DBUG_RETURN(error); - } - } + if (with_annotate) + if (binlog_write_annotated_row(&writer)) + DBUG_RETURN(1); + if (unlikely((error= writer.write(&the_event)))) DBUG_RETURN(error); - binlog_table_maps++; DBUG_RETURN(0); } + /** This function retrieves a pending row event from a cache which is specified through the parameter @c is_transactional. Respectively, when it @@ -10848,7 +10944,7 @@ void wsrep_thd_binlog_trx_reset(THD * thd) cache_mngr->stmt_cache.reset(); } } - thd->clear_binlog_table_maps(); + thd->reset_binlog_for_next_statement(); DBUG_VOID_RETURN; } @@ -10875,7 +10971,6 @@ bool wsrep_stmt_rollback_is_safe(THD* thd) binlog_cache_mngr *cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); - if (binlog_hton && cache_mngr) { binlog_cache_data * trx_cache = &cache_mngr->trx_cache; diff --git a/sql/log.h b/sql/log.h index 1071538fbfd..063513fe908 100644 --- a/sql/log.h +++ b/sql/log.h @@ -1144,6 +1144,7 @@ File open_binlog(IO_CACHE *log, const char *log_file_name, void make_default_log_name(char **out, const char* log_ext, bool once); void binlog_reset_cache(THD *thd); +bool write_annotated_row(THD *thd); extern MYSQL_PLUGIN_IMPORT MYSQL_BIN_LOG mysql_bin_log; extern handlerton *binlog_hton; diff --git a/sql/log_event.h b/sql/log_event.h index 6401ae326e3..bbbe5c915ad 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -942,6 +942,7 @@ public: int write_footer(); my_off_t pos() { return my_b_safe_tell(file); } void add_status(enum_logged_status status); + void set_incident(); Log_event_writer(IO_CACHE *file_arg, binlog_cache_data *cache_data_arg, Binlog_crypt_data *cr= 0) diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc index bc7e7d7b7a1..dace82aa835 100644 --- a/sql/log_event_server.cc +++ b/sql/log_event_server.cc @@ -799,7 +799,10 @@ my_bool Log_event::need_checksum() int Log_event_writer::write_internal(const uchar *pos, size_t len) { if (my_b_safe_write(file, pos, len)) + { + DBUG_PRINT("error", ("write to log failed: %d", my_errno)); return 1; + } bytes_written+= len; return 0; } @@ -6153,13 +6156,10 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, (tbl->s->db.str[tbl->s->db.length] == 0)); DBUG_ASSERT(tbl->s->table_name.str[tbl->s->table_name.length] == 0); -#ifdef MYSQL_SERVER binlog_type_info_array= (Binlog_type_info *)thd->alloc(m_table->s->fields * sizeof(Binlog_type_info)); for (uint i= 0; i < m_table->s->fields; i++) binlog_type_info_array[i]= m_table->field[i]->binlog_type_info(); -#endif - m_data_size= TABLE_MAP_HEADER_LEN; DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", m_data_size= 6;); @@ -7079,14 +7079,14 @@ bool Rows_log_event::process_triggers(trg_event_type event, m_table->triggers->mark_fields_used(event); if (slave_run_triggers_for_rbr == SLAVE_RUN_TRIGGERS_FOR_RBR_YES) { - tmp_disable_binlog(thd); /* Do not replicate the low-level changes. */ result= m_table->triggers->process_triggers(thd, event, - time_type, old_row_is_record1); - reenable_binlog(thd); + time_type, + old_row_is_record1); } else result= m_table->triggers->process_triggers(thd, event, - time_type, old_row_is_record1); + time_type, + old_row_is_record1); DBUG_RETURN(result); } diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index 94c36dfcb20..02f1cd616d3 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -423,10 +423,12 @@ rpl_slave_state::truncate_state_table(THD *thd) TABLE_LIST tlist; int err= 0; - tmp_disable_binlog(thd); - tlist.init_one_table(&MYSQL_SCHEMA_NAME, &rpl_gtid_slave_state_table_name, NULL, TL_WRITE); - if (!(err= open_and_lock_tables(thd, &tlist, FALSE, 0))) + tlist.init_one_table(&MYSQL_SCHEMA_NAME, &rpl_gtid_slave_state_table_name, + NULL, TL_WRITE); + if (!(err= open_and_lock_tables(thd, &tlist, FALSE, + MYSQL_OPEN_IGNORE_LOGGING_FORMAT))) { + DBUG_ASSERT(!tlist.table->file->row_logging); tdc_remove_table(thd, TDC_RT_REMOVE_UNUSED, "mysql", rpl_gtid_slave_state_table_name.str); err= tlist.table->file->ha_truncate(); @@ -445,8 +447,6 @@ rpl_slave_state::truncate_state_table(THD *thd) } thd->mdl_context.release_transactional_locks(); } - - reenable_binlog(thd); return err; } @@ -676,6 +676,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, table_opened= true; table= tlist.table; hton= table->s->db_type(); + table->file->row_logging= 0; // No binary logging if ((err= gtid_check_rpl_slave_state_table(table))) goto end; diff --git a/sql/rpl_injector.cc b/sql/rpl_injector.cc index b0b6e30bed6..0726697211b 100644 --- a/sql/rpl_injector.cc +++ b/sql/rpl_injector.cc @@ -100,6 +100,7 @@ int injector::transaction::commit() } +#ifdef TO_BE_DELETED int injector::transaction::use_table(server_id_type sid, table tbl) { DBUG_ENTER("injector::transaction::use_table"); @@ -111,12 +112,12 @@ int injector::transaction::use_table(server_id_type sid, table tbl) server_id_type save_id= m_thd->variables.server_id; m_thd->set_server_id(sid); - error= m_thd->binlog_write_table_map(tbl.get_table(), - tbl.is_transactional()); + DBUG_ASSERT(tbl.is_transactional() == tbl.get_table()->file->row_logging_has_trans); + error= m_thd->binlog_write_table_map(tbl.get_table(), 0); m_thd->set_server_id(save_id); DBUG_RETURN(error); } - +#endif injector::transaction::binlog_pos injector::transaction::start_pos() const diff --git a/sql/rpl_injector.h b/sql/rpl_injector.h index ecf16ba28cf..669a8e29543 100644 --- a/sql/rpl_injector.h +++ b/sql/rpl_injector.h @@ -177,8 +177,9 @@ public: >0 Failure */ +#ifdef TO_BE_DELETED int use_table(server_id_type sid, table tbl); - +#endif /* Commit a transaction. diff --git a/sql/slave.cc b/sql/slave.cc index b6cfd2deef4..6a014e9bfa4 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3582,20 +3582,19 @@ void set_slave_thread_options(THD* thd) when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but only for client threads. */ - ulonglong options= thd->variables.option_bits | OPTION_BIG_SELECTS; - if (opt_log_slave_updates) - options|= OPTION_BIN_LOG; - else + ulonglong options= (thd->variables.option_bits | + OPTION_BIG_SELECTS | OPTION_BIN_LOG); + if (!opt_log_slave_updates) options&= ~OPTION_BIN_LOG; - thd->variables.option_bits= options; - thd->variables.completion_type= 0; - /* For easier test in LOGGER::log_command */ if (thd->variables.log_disabled_statements & LOG_DISABLE_SLAVE) - thd->variables.option_bits|= OPTION_LOG_OFF; + options|= OPTION_LOG_OFF; + thd->variables.option_bits= options; - thd->variables.sql_log_slow= !MY_TEST(thd->variables.log_slow_disabled_statements & - LOG_SLOW_DISABLE_SLAVE); + thd->variables.completion_type= 0; + thd->variables.sql_log_slow= + !MY_TEST(thd->variables.log_slow_disabled_statements & + LOG_SLOW_DISABLE_SLAVE); DBUG_VOID_RETURN; } @@ -8196,7 +8195,7 @@ void Rows_event_tracker::reset() /* - Update log event tracking data. + Update log event tracking data. The first- and last- seen event binlog position get memorized, as well as the end-of-statement status of the last one. @@ -8206,6 +8205,7 @@ void Rows_event_tracker::update(const char* file_name, my_off_t pos, const char* buf, const Format_description_log_event *fdle) { + DBUG_ENTER("Rows_event_tracker::update"); if (!first_seen) { first_seen= pos; @@ -8214,6 +8214,7 @@ void Rows_event_tracker::update(const char* file_name, my_off_t pos, last_seen= pos; DBUG_ASSERT(stmt_end_seen == 0); // We can only have one stmt_end_seen= get_row_event_stmt_end(buf, fdle); + DBUG_VOID_RETURN; }; diff --git a/sql/sql_acl.cc b/sql/sql_acl.cc index a89b887ca95..7b392f9b1e7 100644 --- a/sql/sql_acl.cc +++ b/sql/sql_acl.cc @@ -1942,7 +1942,9 @@ class Grant_tables if (res) DBUG_RETURN(res); - if (lock_tables(thd, first, counter, MYSQL_LOCK_IGNORE_TIMEOUT)) + if (lock_tables(thd, first, counter, + MYSQL_LOCK_IGNORE_TIMEOUT | + MYSQL_OPEN_IGNORE_LOGGING_FORMAT)) DBUG_RETURN(-1); p_user_table->set_table(tables[USER_TABLE].table); @@ -4397,7 +4399,8 @@ static USER_AUTH auth_no_password; static int replace_user_table(THD *thd, const User_table &user_table, LEX_USER * const combo, privilege_t rights, - const bool revoke_grant, const bool can_create_user, + const bool revoke_grant, + const bool can_create_user, const bool no_auto_create) { int error = -1; @@ -5426,6 +5429,7 @@ static int replace_column_table(GRANT_TABLE *g_t, List_iterator <LEX_COLUMN> iter(columns); class LEX_COLUMN *column; + int error= table->file->ha_index_init(0, 1); if (unlikely(error)) { @@ -5776,7 +5780,7 @@ static int replace_routine_table(THD *thd, GRANT_NAME *grant_name, */ table->use_all_columns(); - restore_record(table, s->default_values); // Get empty record + restore_record(table, s->default_values); // Get empty record table->field[0]->store(combo.host.str,combo.host.length, &my_charset_latin1); table->field[1]->store(db,(uint) strlen(db), &my_charset_latin1); table->field[2]->store(combo.user.str,combo.user.length, &my_charset_latin1); diff --git a/sql/sql_admin.cc b/sql/sql_admin.cc index cfc48c82b4d..cdec2c08e5c 100644 --- a/sql/sql_admin.cc +++ b/sql/sql_admin.cc @@ -1027,9 +1027,7 @@ send_result_message: *save_next_global= table->next_global; table->next_local= table->next_global= 0; - tmp_disable_binlog(thd); // binlogging is done by caller if wanted result_code= admin_recreate_table(thd, table); - reenable_binlog(thd); trans_commit_stmt(thd); trans_commit(thd); close_thread_tables(thd); diff --git a/sql/sql_base.cc b/sql/sql_base.cc index 54ea259ee3b..90e563c824e 100644 --- a/sql/sql_base.cc +++ b/sql/sql_base.cc @@ -734,8 +734,9 @@ bool close_cached_connection_tables(THD *thd, LEX_CSTRING *connection) Clear 'check_table_binlog_row_based_done' flag. For tables which were used by current substatement the flag is cleared as part of 'ha_reset()' call. For the rest of the open tables not used by current substament if this - flag is enabled as part of current substatement execution, clear the flag - explicitly. + flag is enabled as part of current substatement execution, + (for example when THD::binlog_write_table_maps() calls + prepare_for_row_logging()), clear the flag explicitly. NOTE The reason we reset query_id is that it's not enough to just test @@ -759,7 +760,7 @@ static void mark_used_tables_as_free_for_reuse(THD *thd, TABLE *table) table->query_id= 0; table->file->ha_reset(); } - else if (table->file->check_table_binlog_row_based_done) + else table->file->clear_cached_table_binlog_row_based_flag(); } DBUG_VOID_RETURN; @@ -1660,9 +1661,10 @@ static int set_partitions_as_used(TABLE_LIST *tl, TABLE *t) needed to remedy problem before retrying again. @retval FALSE 't' was not locked, not a VIEW or an error happened. */ + bool is_locked_view(THD *thd, TABLE_LIST *t) { - DBUG_ENTER("check_locked_view"); + DBUG_ENTER("is_locked_view"); /* Is this table a view and not a base table? (it is work around to allow to open view with locked tables, @@ -2212,8 +2214,8 @@ retry_share: my_error(ER_NOT_SEQUENCE, MYF(0), table_list->db.str, table_list->alias.str); DBUG_RETURN(true); } - table->init(thd, table_list); + DBUG_ASSERT(thd->locked_tables_mode || table->file->row_logging == 0); DBUG_RETURN(FALSE); @@ -3738,10 +3740,11 @@ open_and_process_table(THD *thd, TABLE_LIST *tables, uint *counter, uint flags, temporary table or SEQUENCE (see sequence_insert()). */ DBUG_ASSERT(is_temporary_table(tables) || tables->table->s->sequence); - if (tables->sequence && tables->table->s->table_type != TABLE_TYPE_SEQUENCE) + if (tables->sequence && + tables->table->s->table_type != TABLE_TYPE_SEQUENCE) { - my_error(ER_NOT_SEQUENCE, MYF(0), tables->db.str, tables->alias.str); - DBUG_RETURN(true); + my_error(ER_NOT_SEQUENCE, MYF(0), tables->db.str, tables->alias.str); + DBUG_RETURN(true); } } else if (tables->open_type == OT_TEMPORARY_ONLY) @@ -5214,7 +5217,9 @@ bool open_and_lock_tables(THD *thd, const DDL_options_st &options, if (lock_tables(thd, tables, counter, flags)) goto err; - (void) read_statistics_for_tables_if_needed(thd, tables); + /* Don't read statistics tables when opening internal tables */ + if (!(flags & MYSQL_OPEN_IGNORE_LOGGING_FORMAT)) + (void) read_statistics_for_tables_if_needed(thd, tables); if (derived) { @@ -5348,12 +5353,19 @@ bool open_tables_only_view_structure(THD *thd, TABLE_LIST *table_list, static void mark_real_tables_as_free_for_reuse(TABLE_LIST *table_list) { TABLE_LIST *table; + DBUG_ENTER("mark_real_tables_as_free_for_reuse"); + + /* + We have to make two loops as HA_EXTRA_DETACH_CHILDREN may + remove items from the table list that we have to reset + */ for (table= table_list; table; table= table->next_global) + { if (!table->placeholder()) - { table->table->query_id= 0; - } + } for (table= table_list; table; table= table->next_global) + { if (!table->placeholder()) { /* @@ -5364,6 +5376,8 @@ static void mark_real_tables_as_free_for_reuse(TABLE_LIST *table_list) */ table->table->file->extra(HA_EXTRA_DETACH_CHILDREN); } + } + DBUG_VOID_RETURN; } @@ -5428,7 +5442,7 @@ err: bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags) { - TABLE_LIST *table; + TABLE_LIST *table, *first_not_own; DBUG_ENTER("lock_tables"); /* We can't meet statement requiring prelocking if we already @@ -5438,7 +5452,9 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags) !thd->lex->requires_prelocking()); if (!tables && !thd->lex->requires_prelocking()) - DBUG_RETURN(thd->decide_logging_format(tables)); + DBUG_RETURN(0); + + first_not_own= thd->lex->first_not_own_table(); /* Check for thd->locked_tables_mode to avoid a redundant @@ -5454,13 +5470,26 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags) { DBUG_ASSERT(thd->lock == 0); // You must lock everything at once TABLE **start,**ptr; + bool found_first_not_own= 0; if (!(ptr=start=(TABLE**) thd->alloc(sizeof(TABLE*)*count))) DBUG_RETURN(TRUE); + + /* + Collect changes tables for table lock. + Mark own tables with query id as this is needed by + prepare_for_row_logging() + */ for (table= tables; table; table= table->next_global) { + if (table == first_not_own) + found_first_not_own= 1; if (!table->placeholder()) - *(ptr++)= table->table; + { + *(ptr++)= table->table; + if (!found_first_not_own) + table->table->query_id= thd->query_id; + } } DEBUG_SYNC(thd, "before_lock_tables_takes_lock"); @@ -5474,7 +5503,6 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags) if (thd->lex->requires_prelocking() && thd->lex->sql_command != SQLCOM_LOCK_TABLES) { - TABLE_LIST *first_not_own= thd->lex->first_not_own_table(); /* We just have done implicit LOCK TABLES, and now we have to emulate first open_and_lock_tables() after it. @@ -5492,7 +5520,6 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags) { if (!table->placeholder()) { - table->table->query_id= thd->query_id; if (check_lock_and_start_stmt(thd, thd->lex, table)) { mysql_unlock_tables(thd, thd->lock); @@ -5512,7 +5539,6 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags) } else { - TABLE_LIST *first_not_own= thd->lex->first_not_own_table(); /* When open_and_lock_tables() is called for a single table out of a table list, the 'next_global' chain is temporarily broken. We @@ -5528,6 +5554,7 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags) if (table->placeholder()) continue; + table->table->query_id= thd->query_id; /* In a stored function or trigger we should ensure that we won't change a table that is already used by the calling statement. @@ -5567,7 +5594,7 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, uint flags) } bool res= fix_all_session_vcol_exprs(thd, tables); - if (!res) + if (!res && !(flags & MYSQL_OPEN_IGNORE_LOGGING_FORMAT)) res= thd->decide_logging_format(tables); DBUG_RETURN(res); @@ -9005,6 +9032,7 @@ open_system_tables_for_read(THD *thd, TABLE_LIST *table_list, */ if (open_and_lock_tables(thd, table_list, FALSE, (MYSQL_OPEN_IGNORE_FLUSH | + MYSQL_OPEN_IGNORE_LOGGING_FORMAT | (table_list->lock_type < TL_WRITE_ALLOW_WRITE ? MYSQL_LOCK_IGNORE_TIMEOUT : 0)))) { @@ -9016,6 +9044,7 @@ open_system_tables_for_read(THD *thd, TABLE_LIST *table_list, for (TABLE_LIST *tables= table_list; tables; tables= tables->next_global) { DBUG_ASSERT(tables->table->s->table_category == TABLE_CATEGORY_SYSTEM); + tables->table->file->row_logging= 0; tables->table->use_all_columns(); } lex->restore_backup_query_tables_list(&query_tables_list_backup); @@ -9103,8 +9132,9 @@ open_system_table_for_update(THD *thd, TABLE_LIST *one_table) { DBUG_ASSERT(table->s->table_category == TABLE_CATEGORY_SYSTEM); table->use_all_columns(); + /* This table instance is not row logged */ + table->file->row_logging= 0; } - DBUG_RETURN(table); } @@ -9137,6 +9167,8 @@ open_log_table(THD *thd, TABLE_LIST *one_table, Open_tables_backup *backup) if ((table= open_ltable(thd, one_table, one_table->lock_type, flags))) { DBUG_ASSERT(table->s->table_category == TABLE_CATEGORY_LOG); + DBUG_ASSERT(!table->file->row_logging); + /* Make sure all columns get assigned to a default value */ table->use_all_columns(); DBUG_ASSERT(table->s->no_replicate); diff --git a/sql/sql_base.h b/sql/sql_base.h index 572dd487d4a..929dc245073 100644 --- a/sql/sql_base.h +++ b/sql/sql_base.h @@ -125,6 +125,11 @@ TABLE *open_ltable(THD *thd, TABLE_LIST *table_list, thr_lock_type update, */ #define MYSQL_OPEN_IGNORE_REPAIR 0x10000 +/** + Don't call decide_logging_format. Used for statistic tables etc +*/ +#define MYSQL_OPEN_IGNORE_LOGGING_FORMAT 0x20000 + /** Please refer to the internals manual. */ #define MYSQL_OPEN_REOPEN (MYSQL_OPEN_IGNORE_FLUSH |\ MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK |\ diff --git a/sql/sql_class.cc b/sql/sql_class.cc index f67b42230c3..64f8e99c0c4 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -636,7 +636,6 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) m_current_stage_key(0), m_psi(0), in_sub_stmt(0), log_all_errors(0), binlog_unsafe_warning_flags(0), - binlog_table_maps(0), bulk_param(0), table_map_for_update(0), m_examined_row_count(0), @@ -797,6 +796,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) bzero((void*) ha_data, sizeof(ha_data)); mysys_var=0; binlog_evt_union.do_union= FALSE; + binlog_table_maps= FALSE; enable_slow_log= 0; durability_property= HA_REGULAR_DURABILITY; @@ -1315,8 +1315,6 @@ void THD::init() else variables.option_bits&= ~OPTION_BIN_LOG; - variables.sql_log_bin_off= 0; - select_commands= update_commands= other_commands= 0; /* Set to handle counting of aborted connections */ userstat_running= opt_userstat_running; @@ -5837,8 +5835,9 @@ int THD::decide_logging_format(TABLE_LIST *tables) { DBUG_ENTER("THD::decide_logging_format"); DBUG_PRINT("info", ("Query: %.*s", (uint) query_length(), query())); - DBUG_PRINT("info", ("variables.binlog_format: %lu", - variables.binlog_format)); + DBUG_PRINT("info", ("binlog_format: %lu", (ulong) variables.binlog_format)); + DBUG_PRINT("info", ("current_stmt_binlog_format: %lu", + (ulong) current_stmt_binlog_format)); DBUG_PRINT("info", ("lex->get_stmt_unsafe_flags(): 0x%x", lex->get_stmt_unsafe_flags())); @@ -5861,18 +5860,14 @@ int THD::decide_logging_format(TABLE_LIST *tables) DBUG_RETURN(-1); } } +#endif /* WITH_WSREP */ if ((WSREP_EMULATE_BINLOG_NNULL(this) || - (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG))) && - !(wsrep_binlog_format() == BINLOG_FORMAT_STMT && - !binlog_filter->db_ok(db.str))) -#else - if (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG) && + (mysql_bin_log.is_open() && + (variables.option_bits & OPTION_BIN_LOG))) && !(wsrep_binlog_format() == BINLOG_FORMAT_STMT && !binlog_filter->db_ok(db.str))) -#endif /* WITH_WSREP */ { - if (is_bulk_op()) { if (wsrep_binlog_format() == BINLOG_FORMAT_STMT) @@ -5915,6 +5910,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) bool has_auto_increment_write_tables_not_first= FALSE; bool found_first_not_own_table= FALSE; bool has_write_tables_with_unsafe_statements= FALSE; + bool blackhole_table_found= 0; /* A pointer to a previous table that was changed. @@ -6040,6 +6036,10 @@ int THD::decide_logging_format(TABLE_LIST *tables) if (prev_write_table && prev_write_table->file->ht != table->file->ht) multi_write_engine= TRUE; + + if (table->file->ht->db_type == DB_TYPE_BLACKHOLE_DB) + blackhole_table_found= 1; + if (share->non_determinstic_insert && !(sql_command_flags[lex->sql_command] & CF_SCHEMA_CHANGE)) has_write_tables_with_unsafe_statements= true; @@ -6285,7 +6285,8 @@ int THD::decide_logging_format(TABLE_LIST *tables) is_current_stmt_binlog_format_row() ? "ROW" : "STATEMENT")); - if (variables.binlog_format == BINLOG_FORMAT_ROW && + if (blackhole_table_found && + variables.binlog_format == BINLOG_FORMAT_ROW && (sql_command_flags[lex->sql_command] & (CF_UPDATES_DATA | CF_DELETES_DATA))) { @@ -6301,8 +6302,8 @@ int THD::decide_logging_format(TABLE_LIST *tables) if (table->table->file->ht->db_type == DB_TYPE_BLACKHOLE_DB && table->lock_type >= TL_WRITE_ALLOW_WRITE) { - table_names.append(&table->table_name); - table_names.append(","); + table_names.append(&table->table_name); + table_names.append(","); } } if (!table_names.is_empty()) @@ -6322,9 +6323,12 @@ int THD::decide_logging_format(TABLE_LIST *tables) table_names.c_ptr()); } } + + if (is_write && is_current_stmt_binlog_format_row()) + binlog_prepare_for_row_logging(); } -#ifndef DBUG_OFF else + { DBUG_PRINT("info", ("decision: no logging since " "mysql_bin_log.is_open() = %d " "and (options & OPTION_BIN_LOG) = 0x%llx " @@ -6334,22 +6338,23 @@ int THD::decide_logging_format(TABLE_LIST *tables) (variables.option_bits & OPTION_BIN_LOG), (uint) wsrep_binlog_format(), binlog_filter->db_ok(db.str))); -#endif - + if (WSREP_NNULL(this) && is_current_stmt_binlog_format_row()) + binlog_prepare_for_row_logging(); + } DBUG_RETURN(0); } int THD::decide_logging_format_low(TABLE *table) { + DBUG_ENTER("decide_logging_format_low"); /* - INSERT...ON DUPLICATE KEY UPDATE on a table with more than one unique keys - can be unsafe. - */ - if(wsrep_binlog_format() <= BINLOG_FORMAT_STMT && - !is_current_stmt_binlog_format_row() && - !lex->is_stmt_unsafe() && - lex->sql_command == SQLCOM_INSERT && - lex->duplicates == DUP_UPDATE) + INSERT...ON DUPLICATE KEY UPDATE on a table with more than one unique keys + can be unsafe. + */ + if (wsrep_binlog_format() <= BINLOG_FORMAT_STMT && + !is_current_stmt_binlog_format_row() && + !lex->is_stmt_unsafe() && + lex->duplicates == DUP_UPDATE) { uint unique_keys= 0; uint keys= table->s->keys, i= 0; @@ -6376,27 +6381,22 @@ exit:; lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_TWO_KEYS); binlog_unsafe_warning_flags|= lex->get_stmt_unsafe_flags(); set_current_stmt_binlog_format_row_if_mixed(); - return 1; + if (is_current_stmt_binlog_format_row()) + binlog_prepare_for_row_logging(); + DBUG_RETURN(1); } } - return 0; + DBUG_RETURN(0); } -/* - Implementation of interface to write rows to the binary log through the - thread. The thread is responsible for writing the rows it has - inserted/updated/deleted. -*/ - #ifndef MYSQL_CLIENT - /* Template member function for ensuring that there is an rows log event of the apropriate type before proceeding. PRE CONDITION: - Events of type 'RowEventT' have the type code 'type_code'. - + POST CONDITION: If a non-NULL pointer is returned, the pending event for thread 'thd' will be an event of type 'RowEventT' (which have the type code 'type_code') @@ -6814,7 +6814,8 @@ void THD::binlog_prepare_row_images(TABLE *table) */ if (table->s->primary_key < MAX_KEY && (thd->variables.binlog_row_image < BINLOG_ROW_IMAGE_FULL) && - !ha_check_storage_engine_flag(table->s->db_type(), HTON_NO_BINLOG_ROW_OPT)) + !ha_check_storage_engine_flag(table->s->db_type(), + HTON_NO_BINLOG_ROW_OPT)) { /** Just to be sure that tmp_set is currently not in use as @@ -6859,7 +6860,7 @@ void THD::binlog_prepare_row_images(TABLE *table) -int THD::binlog_remove_pending_rows_event(bool clear_maps, +int THD::binlog_remove_pending_rows_event(bool reset_stmt, bool is_transactional) { DBUG_ENTER("THD::binlog_remove_pending_rows_event"); @@ -6873,12 +6874,12 @@ int THD::binlog_remove_pending_rows_event(bool clear_maps, mysql_bin_log.remove_pending_rows_event(this, is_transactional); - if (clear_maps) - binlog_table_maps= 0; - + if (reset_stmt) + reset_binlog_for_next_statement(); DBUG_RETURN(0); } + int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional) { DBUG_ENTER("THD::binlog_flush_pending_rows_event"); @@ -6904,9 +6905,8 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional) if (stmt_end) { pending->set_flags(Rows_log_event::STMT_END_F); - binlog_table_maps= 0; + reset_binlog_for_next_statement(); } - error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0, is_transactional); } @@ -7278,8 +7278,11 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, suppress_use, errcode); error= mysql_bin_log.write(&qinfo); } + /* + row logged binlog may not have been reset in the case of locked tables + */ + reset_binlog_for_next_statement(); - binlog_table_maps= 0; DBUG_RETURN(error >= 0 ? error : 1); } diff --git a/sql/sql_class.h b/sql/sql_class.h index 85a99918a2f..af56eecfe60 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -75,14 +75,15 @@ void set_thd_stage_info(void *thd, #include "wsrep_condition_variable.h" class Wsrep_applier_service; - #endif /* WITH_WSREP */ + class Reprepare_observer; class Relay_log_info; struct rpl_group_info; class Rpl_filter; class Query_log_event; class Load_log_event; +class Log_event_writer; class sp_rcontext; class sp_cache; class Lex_input_stream; @@ -737,11 +738,6 @@ typedef struct system_variables my_bool query_cache_strip_comments; my_bool sql_log_slow; my_bool sql_log_bin; - /* - A flag to help detect whether binary logging was temporarily disabled - (see tmp_disable_binlog(A) macro). - */ - my_bool sql_log_bin_off; my_bool binlog_annotate_row_events; my_bool binlog_direct_non_trans_update; my_bool column_compression_zlib_wrap; @@ -2576,14 +2572,17 @@ public: */ void binlog_start_trans_and_stmt(); void binlog_set_stmt_begin(); - int binlog_write_table_map(TABLE *table, bool is_transactional, - bool *with_annotate= 0); int binlog_write_row(TABLE* table, bool is_transactional, const uchar *buf); int binlog_delete_row(TABLE* table, bool is_transactional, const uchar *buf); int binlog_update_row(TABLE* table, bool is_transactional, const uchar *old_data, const uchar *new_data); + bool prepare_handlers_for_update(uint flag); + bool binlog_write_annotated_row(Log_event_writer *writer); + void binlog_prepare_for_row_logging(); + bool binlog_write_table_maps(); + bool binlog_write_table_map(TABLE *table, bool with_annotate); static void binlog_prepare_row_images(TABLE* table); void set_server_id(uint32 sid) { variables.server_id = sid; } @@ -2677,22 +2676,20 @@ private: */ enum_binlog_format current_stmt_binlog_format; - /* - Number of outstanding table maps, i.e., table maps in the - transaction cache. - */ - uint binlog_table_maps; public: + + /* 1 if binlog table maps has been written */ + bool binlog_table_maps; + void issue_unsafe_warnings(); void reset_unsafe_warnings() { binlog_unsafe_warning_flags= 0; } - uint get_binlog_table_maps() const { - return binlog_table_maps; - } - void clear_binlog_table_maps() { + void reset_binlog_for_next_statement() + { binlog_table_maps= 0; } + #endif /* MYSQL_CLIENT */ public: @@ -5170,11 +5167,10 @@ my_eof(THD *thd) #define tmp_disable_binlog(A) \ {ulonglong tmp_disable_binlog__save_options= (A)->variables.option_bits; \ (A)->variables.option_bits&= ~OPTION_BIN_LOG; \ - (A)->variables.sql_log_bin_off= 1; + (A)->variables.option_bits|= OPTION_BIN_TMP_LOG_OFF; #define reenable_binlog(A) \ - (A)->variables.option_bits= tmp_disable_binlog__save_options; \ - (A)->variables.sql_log_bin_off= 0;} + (A)->variables.option_bits= tmp_disable_binlog__save_options; } inline date_conv_mode_t sql_mode_for_dates(THD *thd) diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index 801871fcb5a..d5120d95420 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -959,6 +959,7 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list, goto values_loop_end; THD_STAGE_INFO(thd, stage_update); + thd->decide_logging_format_low(table); do { DBUG_PRINT("info", ("iteration %llu", iteration)); @@ -1071,7 +1072,6 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list, break; } - thd->decide_logging_format_low(table); #ifndef EMBEDDED_LIBRARY if (lock_type == TL_WRITE_DELAYED) { @@ -3051,6 +3051,8 @@ bool Delayed_insert::open_and_lock_table() return TRUE; } table->copy_blobs= 1; + + table->file->prepare_for_row_logging(); return FALSE; } @@ -3110,6 +3112,8 @@ pthread_handler_t handle_delayed_insert(void *arg) at which rows are inserted cannot be determined in mixed mode. */ thd->set_current_stmt_binlog_format_row_if_mixed(); + /* Don't annotate insert delayed binlog events */ + thd->variables.binlog_annotate_row_events= 0; /* Clone tickets representing protection against GRL and the lock on @@ -3314,8 +3318,19 @@ pthread_handler_t handle_delayed_insert(void *arg) di->table->file->delete_update_handler(); di->group_count=0; mysql_audit_release(thd); + /* + Reset binlog. We can't call ha_reset() for the table as this will + reset the table maps we have calculated earlier. + */ mysql_mutex_lock(&di->mutex); } + + /* + Reset binlog. We can't call ha_reset() for the table as this will + reset the table maps we have calculated earlier. + */ + thd->reset_binlog_for_next_statement(); + if (di->tables_in_use) mysql_cond_broadcast(&di->cond_client); // If waiting clients } @@ -3407,9 +3422,7 @@ bool Delayed_insert::handle_inserts(void) { int error; ulong max_rows; - bool has_trans = TRUE; - bool using_ignore= 0, using_opt_replace= 0, - using_bin_log= mysql_bin_log.is_open(); + bool using_ignore= 0, using_opt_replace= 0, using_bin_log; delayed_row *row; DBUG_ENTER("handle_inserts"); @@ -3443,7 +3456,13 @@ bool Delayed_insert::handle_inserts(void) if (table->file->ha_rnd_init_with_error(0)) goto err; + /* + We have to call prepare_for_row_logging() as the second call to + handler_writes() will not have called decide_logging_format. + */ + table->file->prepare_for_row_logging(); table->file->prepare_for_insert(); + using_bin_log= table->file->row_logging; /* We can't use row caching when using the binary log because if @@ -3452,6 +3471,7 @@ bool Delayed_insert::handle_inserts(void) */ if (!using_bin_log) table->file->extra(HA_EXTRA_WRITE_CACHE); + mysql_mutex_lock(&mutex); while ((row=rows.get())) @@ -3480,8 +3500,8 @@ bool Delayed_insert::handle_inserts(void) Guaranteed that the INSERT DELAYED STMT will not be here in SBR when mysql binlog is enabled. */ - DBUG_ASSERT(!(mysql_bin_log.is_open() && - !thd.is_current_stmt_binlog_format_row())); + DBUG_ASSERT(!mysql_bin_log.is_open() || + thd.is_current_stmt_binlog_format_row()); /* This is the first value of an INSERT statement. @@ -3639,10 +3659,9 @@ bool Delayed_insert::handle_inserts(void) TODO: Move the logging to last in the sequence of rows. */ - has_trans= thd.lex->sql_command == SQLCOM_CREATE_TABLE || - table->file->has_transactions(); - if (thd.is_current_stmt_binlog_format_row() && - thd.binlog_flush_pending_rows_event(TRUE, has_trans)) + if (table->file->row_logging && + thd.binlog_flush_pending_rows_event(TRUE, + table->file->row_logging_has_trans)) goto err; if (unlikely((error=table->file->extra(HA_EXTRA_NO_CACHE)))) @@ -4548,6 +4567,18 @@ TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items, /* purecov: end */ } table->s->table_creation_was_logged= save_table_creation_was_logged; + if (!table->s->tmp_table) + table->file->prepare_for_row_logging(); + + /* + If slave is converting a statement event to row events, log the original + create statement as an annotated row + */ +#ifdef HAVE_REPLICATION + if (thd->slave_thread && opt_replicate_annotate_row_events && + thd->is_current_stmt_binlog_format_row()) + thd->variables.binlog_annotate_row_events= 1; +#endif DBUG_RETURN(table); } @@ -4792,6 +4823,7 @@ bool binlog_create_table(THD *thd, TABLE *table) logged */ thd->set_current_stmt_binlog_format_row(); + table->file->prepare_for_row_logging(); return binlog_show_create_table(thd, table, 0) != 0; } @@ -4854,6 +4886,9 @@ bool select_create::send_eof() if (table->s->tmp_table) thd->transaction.stmt.mark_created_temp_table(); + if (thd->slave_thread) + thd->variables.binlog_annotate_row_events= 0; + if (prepare_eof()) { abort_result_set(); diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index f91589e581d..211a51a7a7e 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -7436,6 +7436,12 @@ void THD::reset_for_next_command(bool do_clear_error) DBUG_ENTER("THD::reset_for_next_command"); DBUG_ASSERT(!spcont); /* not for substatements of routines */ DBUG_ASSERT(!in_sub_stmt); + /* + Table maps should have been reset after previous statement except in the + case where we have locked tables + */ + DBUG_ASSERT(binlog_table_maps == 0 || + locked_tables_mode == LTM_LOCK_TABLES); if (likely(do_clear_error)) { diff --git a/sql/sql_plugin.cc b/sql/sql_plugin.cc index 1d18b0ef392..abbea912c68 100644 --- a/sql/sql_plugin.cc +++ b/sql/sql_plugin.cc @@ -2168,14 +2168,13 @@ static bool finalize_install(THD *thd, TABLE *table, const LEX_CSTRING *name, of the insert into the plugin table, so that it is not replicated in row based mode. */ - tmp_disable_binlog(thd); + DBUG_ASSERT(!table->file->row_logging); table->use_all_columns(); restore_record(table, s->default_values); table->field[0]->store(name->str, name->length, system_charset_info); table->field[1]->store(tmp->plugin_dl->dl.str, tmp->plugin_dl->dl.length, files_charset_info); error= table->file->ha_write_row(table->record[0]); - reenable_binlog(thd); if (unlikely(error)) { table->file->print_error(error, MYF(0)); @@ -2322,9 +2321,8 @@ static bool do_uninstall(THD *thd, TABLE *table, const LEX_CSTRING *name) of the delete from the plugin table, so that it is not replicated in row based mode. */ - tmp_disable_binlog(thd); + table->file->row_logging= 0; // No logging error= table->file->ha_delete_row(table->record[0]); - reenable_binlog(thd); if (unlikely(error)) { table->file->print_error(error, MYF(0)); diff --git a/sql/sql_priv.h b/sql/sql_priv.h index c3ea9226643..493313da90a 100644 --- a/sql/sql_priv.h +++ b/sql/sql_priv.h @@ -180,6 +180,8 @@ #define OPTION_NO_QUERY_CACHE (1ULL << 39) // SELECT, user #define OPTION_PROCEDURE_CLAUSE (1ULL << 40) // Internal usage #define SELECT_NO_UNLOCK (1ULL << 41) // SELECT, intern +#define SELECT_NO_UNLOCK (1ULL << 41) // SELECT, intern +#define OPTION_BIN_TMP_LOG_OFF (1ULL << 42) // disable binlog, intern #define OPTION_LEX_FOUND_COMMENT (1ULL << 0) // intern, parser diff --git a/sql/sql_sequence.cc b/sql/sql_sequence.cc index a5676b8989a..f41221e11de 100644 --- a/sql/sql_sequence.cc +++ b/sql/sql_sequence.cc @@ -580,7 +580,6 @@ void sequence_definition::adjust_values(longlong next_value) int sequence_definition::write_initial_sequence(TABLE *table) { int error; - THD *thd= table->in_use; MY_BITMAP *save_write_set; store_fields(table); @@ -588,15 +587,14 @@ int sequence_definition::write_initial_sequence(TABLE *table) table->s->sequence->copy(this); /* Sequence values will be replicated as a statement - like 'create sequence'. So disable binary log temporarily + like 'create sequence'. So disable row logging for this table & statement */ - tmp_disable_binlog(thd); + table->file->row_logging= table->file->row_logging_init= 0; save_write_set= table->write_set; table->write_set= &table->s->all_set; table->s->sequence->initialized= SEQUENCE::SEQ_IN_PREPARE; error= table->file->ha_write_row(table->record[0]); table->s->sequence->initialized= SEQUENCE::SEQ_UNINTIALIZED; - reenable_binlog(thd); table->write_set= save_write_set; if (unlikely(error)) table->file->print_error(error, MYF(0)); diff --git a/sql/sql_servers.cc b/sql/sql_servers.cc index 1d12e95bbbb..fbf03dce212 100644 --- a/sql/sql_servers.cc +++ b/sql/sql_servers.cc @@ -404,6 +404,7 @@ insert_server(THD *thd, FOREIGN_SERVER *server) /* need to open before acquiring THR_LOCK_plugin or it will deadlock */ if (! (table= open_ltable(thd, &tables, TL_WRITE, MYSQL_LOCK_IGNORE_TIMEOUT))) goto end; + table->file->row_logging= 0; // Don't log to binary log /* insert the server into the table */ if (unlikely(error= insert_server_record(table, server))) @@ -542,9 +543,9 @@ int insert_server_record(TABLE *table, FOREIGN_SERVER *server) { int error; DBUG_ENTER("insert_server_record"); - tmp_disable_binlog(table->in_use); - table->use_all_columns(); + DBUG_ASSERT(!table->file->row_logging); + table->use_all_columns(); empty_record(table); /* set the field that's the PK to the value we're looking for */ @@ -577,8 +578,6 @@ int insert_server_record(TABLE *table, FOREIGN_SERVER *server) } else error= ER_FOREIGN_SERVER_EXISTS; - - reenable_binlog(table->in_use); DBUG_RETURN(error); } @@ -895,7 +894,8 @@ update_server_record(TABLE *table, FOREIGN_SERVER *server) { int error=0; DBUG_ENTER("update_server_record"); - tmp_disable_binlog(table->in_use); + DBUG_ASSERT(!table->file->row_logging); + table->use_all_columns(); /* set the field that's the PK to the value we're looking for */ table->field[0]->store(server->server_name, @@ -931,7 +931,6 @@ update_server_record(TABLE *table, FOREIGN_SERVER *server) } end: - reenable_binlog(table->in_use); DBUG_RETURN(error); } @@ -956,7 +955,8 @@ delete_server_record(TABLE *table, LEX_CSTRING *name) { int error; DBUG_ENTER("delete_server_record"); - tmp_disable_binlog(table->in_use); + DBUG_ASSERT(!table->file->row_logging); + table->use_all_columns(); /* set the field that's the PK to the value we're looking for */ @@ -980,7 +980,6 @@ delete_server_record(TABLE *table, LEX_CSTRING *name) table->file->print_error(error, MYF(0)); } - reenable_binlog(table->in_use); DBUG_RETURN(error); } diff --git a/sql/sql_table.cc b/sql/sql_table.cc index c5ce3a46499..05d4b450f30 100644 --- a/sql/sql_table.cc +++ b/sql/sql_table.cc @@ -10471,6 +10471,7 @@ do_continue:; No additional logging of query is needed */ binlog_done= 1; + DBUG_ASSERT(new_table->file->row_logging); new_table->mark_columns_needed_for_insert(); thd->binlog_write_table_map(new_table, 1); } diff --git a/sql/sql_trigger.cc b/sql/sql_trigger.cc index 5b8ae46d33f..726ba6d3cf2 100644 --- a/sql/sql_trigger.cc +++ b/sql/sql_trigger.cc @@ -1330,7 +1330,8 @@ bool Table_triggers_list::prepare_record_accessors(TABLE *table) */ bool Table_triggers_list::check_n_load(THD *thd, const LEX_CSTRING *db, - const LEX_CSTRING *table_name, TABLE *table, + const LEX_CSTRING *table_name, + TABLE *table, bool names_only) { char path_buff[FN_REFLEN]; diff --git a/sql/table.cc b/sql/table.cc index ee5b29ed8f1..811ea28b49c 100644 --- a/sql/table.cc +++ b/sql/table.cc @@ -5269,7 +5269,6 @@ void TABLE::init(THD *thd, TABLE_LIST *tl) /* used in RBR Triggers */ master_had_triggers= 0; #endif - /* Catch wrong handling of the auto_increment_field_not_null. */ DBUG_ASSERT(!auto_increment_field_not_null); auto_increment_field_not_null= FALSE; @@ -5284,7 +5283,6 @@ void TABLE::init(THD *thd, TABLE_LIST *tl) } notnull_cond= 0; - DBUG_ASSERT(!file->keyread_enabled()); restore_record(this, s->default_values); @@ -7293,8 +7291,7 @@ void TABLE::mark_columns_per_binlog_row_image() If in RBR we may need to mark some extra columns, depending on the binlog-row-image command line argument. */ - if ((WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) && - thd->is_current_stmt_binlog_format_row() && + if (file->row_logging && !ha_check_storage_engine_flag(s->db_type(), HTON_NO_BINLOG_ROW_OPT)) { /* if there is no PK, then mark all columns for the BI. */ diff --git a/sql/temporary_tables.cc b/sql/temporary_tables.cc index 46465294893..407072a7b49 100644 --- a/sql/temporary_tables.cc +++ b/sql/temporary_tables.cc @@ -1124,13 +1124,11 @@ TABLE *THD::open_temporary_table(TMP_TABLE_SHARE *share, table->reginfo.lock_type= TL_WRITE; /* Simulate locked */ table->grant.privilege= TMP_TABLE_ACLS; + table->query_id= query_id; share->tmp_table= (table->file->has_transaction_manager() ? TRANSACTIONAL_TMP_TABLE : NON_TRANSACTIONAL_TMP_TABLE); share->not_usable_by_query_cache= 1; - table->pos_in_table_list= 0; - table->query_id= query_id; - /* Add table to the head of table list. */ share->all_tmp_tables.push_front(table); diff --git a/sql/transaction.cc b/sql/transaction.cc index 82e04d35479..db40bd6177b 100644 --- a/sql/transaction.cc +++ b/sql/transaction.cc @@ -302,8 +302,10 @@ bool trans_commit_implicit(THD *thd) DBUG_RETURN(TRUE); if (thd->variables.option_bits & OPTION_GTID_BEGIN) + { DBUG_PRINT("error", ("OPTION_GTID_BEGIN is set. " "Master and slave will have different GTID values")); + } if (thd->in_multi_stmt_transaction_mode() || (thd->variables.option_bits & OPTION_TABLE_LOCK)) @@ -361,9 +363,9 @@ bool trans_rollback(THD *thd) #ifdef HAVE_REPLICATION repl_semisync_master.wait_after_rollback(thd, FALSE); #endif - thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); /* Reset the binlog transaction marker */ - thd->variables.option_bits&= ~OPTION_GTID_BEGIN; + thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG | + OPTION_GTID_BEGIN); thd->transaction.all.reset(); thd->lex->start_transaction_opt= 0; diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index 8ccab45700e..46ca4959741 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -222,7 +222,7 @@ extern wsrep_seqno_t wsrep_locked_seqno; /* use xxxxxx_NNULL macros when thd pointer is guaranteed to be non-null to * avoid compiler warnings (GCC 6 and later) */ #define WSREP_NNULL(thd) \ - (WSREP_ON && thd->variables.wsrep_on) + (thd->variables.wsrep_on && WSREP_ON) #define WSREP(thd) \ (thd && WSREP_NNULL(thd)) |