diff options
Diffstat (limited to 'sql/handler.cc')
-rw-r--r-- | sql/handler.cc | 315 |
1 files changed, 134 insertions, 181 deletions
diff --git a/sql/handler.cc b/sql/handler.cc index 9a3766c90de..c3f78693849 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -3620,8 +3620,9 @@ int handler::update_auto_increment() variables->auto_increment_increment); auto_inc_intervals_count++; /* Row-based replication does not need to store intervals in binlog */ - if (((WSREP(thd) && wsrep_emulate_bin_log ) || mysql_bin_log.is_open()) - && !thd->is_current_stmt_binlog_format_row()) + if (((WSREP_NNULL(thd) && wsrep_emulate_bin_log) || + mysql_bin_log.is_open()) && + !thd->is_current_stmt_binlog_format_row()) thd->auto_inc_intervals_in_cur_stmt_for_binlog. append(auto_inc_interval_for_cur_row.minimum(), auto_inc_interval_for_cur_row.values(), @@ -6366,32 +6367,35 @@ bool ha_show_status(THD *thd, handlerton *db_type, enum ha_stat_type stat) 1 Row needs to be logged */ -bool handler::check_table_binlog_row_based(bool binlog_row) +bool handler::check_table_binlog_row_based() { - if (table->versioned(VERS_TRX_ID)) - return false; - if (unlikely((table->in_use->variables.sql_log_bin_off))) - return 0; /* Called by partitioning engine */ -#ifdef WITH_WSREP - if (!table->in_use->variables.sql_log_bin && - wsrep_thd_is_applying(table->in_use)) - return 0; /* wsrep patch sets sql_log_bin to silence binlogging - from high priority threads */ -#endif /* WITH_WSREP */ if (unlikely((!check_table_binlog_row_based_done))) { check_table_binlog_row_based_done= 1; check_table_binlog_row_based_result= - check_table_binlog_row_based_internal(binlog_row); + check_table_binlog_row_based_internal(); } return check_table_binlog_row_based_result; } -bool handler::check_table_binlog_row_based_internal(bool binlog_row) +bool handler::check_table_binlog_row_based_internal() { THD *thd= table->in_use; +#ifdef WITH_WSREP + if (!thd->variables.sql_log_bin && + wsrep_thd_is_applying(table->in_use)) + { + /* + wsrep patch sets sql_log_bin to silence binlogging from high + priority threads + */ + return 0; + } +#endif return (table->s->can_do_row_logging && + !table->versioned(VERS_TRX_ID) && + !(thd->variables.option_bits & OPTION_BIN_TMP_LOG_OFF) && thd->is_current_stmt_binlog_format_row() && /* Wsrep partially enables binary logging if it have not been @@ -6407,9 +6411,9 @@ bool handler::check_table_binlog_row_based_internal(bool binlog_row) Otherwise, return 'true' if binary logging is on. */ - IF_WSREP(((WSREP_EMULATE_BINLOG(thd) && + IF_WSREP(((WSREP_EMULATE_BINLOG_NNULL(thd) && wsrep_thd_is_local(thd)) || - ((WSREP(thd) || + ((WSREP_NNULL(thd) || (thd->variables.option_bits & OPTION_BIN_LOG)) && mysql_bin_log.is_open())), (thd->variables.option_bits & OPTION_BIN_LOG) && @@ -6417,151 +6421,22 @@ bool handler::check_table_binlog_row_based_internal(bool binlog_row) } -/** @brief - Write table maps for all (manually or automatically) locked tables - to the binary log. Also, if binlog_annotate_row_events is ON, - write Annotate_rows event before the first table map. - - SYNOPSIS - write_locked_table_maps() - thd Pointer to THD structure - - DESCRIPTION - This function will generate and write table maps for all tables - that are locked by the thread 'thd'. - - RETURN VALUE - 0 All OK - 1 Failed to write all table maps - - SEE ALSO - THD::lock -*/ - -static int write_locked_table_maps(THD *thd) -{ - DBUG_ENTER("write_locked_table_maps"); - DBUG_PRINT("enter", ("thd:%p thd->lock:%p " - "thd->extra_lock: %p", - thd, thd->lock, thd->extra_lock)); - - DBUG_PRINT("debug", ("get_binlog_table_maps(): %d", thd->get_binlog_table_maps())); - - MYSQL_LOCK *locks[2]; - locks[0]= thd->extra_lock; - locks[1]= thd->lock; - my_bool with_annotate= IF_WSREP(!wsrep_fragments_certified_for_stmt(thd), - true) && - thd->variables.binlog_annotate_row_events && - thd->query() && thd->query_length(); - - for (uint i= 0 ; i < sizeof(locks)/sizeof(*locks) ; ++i ) - { - MYSQL_LOCK const *const lock= locks[i]; - if (lock == NULL) - continue; - - TABLE **const end_ptr= lock->table + lock->table_count; - for (TABLE **table_ptr= lock->table ; - table_ptr != end_ptr ; - ++table_ptr) - { - TABLE *const table= *table_ptr; - if (table->current_lock == F_WRLCK && - table->file->check_table_binlog_row_based(0)) - { - if (binlog_write_table_map(thd, table, with_annotate)) - DBUG_RETURN(1); - with_annotate= 0; - } - } - } - DBUG_RETURN(0); -} - - -int binlog_write_table_map(THD *thd, TABLE *table, bool with_annotate) -{ - DBUG_ENTER("binlog_write_table_map"); - DBUG_PRINT("info", ("table %s", table->s->table_name.str)); - /* - We need to have a transactional behavior for SQLCOM_CREATE_TABLE - (e.g. CREATE TABLE... SELECT * FROM TABLE) in order to keep a - compatible behavior with the STMT based replication even when - the table is not transactional. In other words, if the operation - fails while executing the insert phase nothing is written to the - binlog. - - Note that at this point, we check the type of a set of tables to - create the table map events. In the function binlog_log_row(), - which calls the current function, we check the type of the table - of the current row. - */ - bool const has_trans= ((sql_command_flags[thd->lex->sql_command] & - (CF_SCHEMA_CHANGE | CF_ADMIN_COMMAND)) || - table->file->has_transactions()); - int const error= thd->binlog_write_table_map(table, has_trans, - &with_annotate); - /* - If an error occurs, it is the responsibility of the caller to - roll back the transaction. - */ - if (unlikely(error)) - DBUG_RETURN(1); - DBUG_RETURN(0); -} - - -static int binlog_log_row_internal(TABLE* table, - const uchar *before_record, - const uchar *after_record, - Log_func *log_func) -{ - bool error= 0; - THD *const thd= table->in_use; - - /* - If there are no table maps written to the binary log, this is - the first row handled in this statement. In that case, we need - to write table maps for all locked tables to the binary log. - */ - if (likely(!(error= ((thd->get_binlog_table_maps() == 0 && - write_locked_table_maps(thd)))))) - { - /* - We need to have a transactional behavior for SQLCOM_CREATE_TABLE - (i.e. CREATE TABLE... SELECT * FROM TABLE) in order to keep a - compatible behavior with the STMT based replication even when - the table is not transactional. In other words, if the operation - fails while executing the insert phase nothing is written to the - binlog. We need the same also for ALTER TABLE in the case we convert - a shared table to a not shared table as in this case we will log all - rows. - */ - bool const has_trans= ((sql_command_flags[thd->lex->sql_command] & - (CF_SCHEMA_CHANGE | CF_ADMIN_COMMAND)) || - table->file->has_transactions()); - error= (*log_func)(thd, table, has_trans, before_record, after_record); - } - return error ? HA_ERR_RBR_LOGGING_FAILED : 0; -} - -int binlog_log_row(TABLE* table, const uchar *before_record, - const uchar *after_record, Log_func *log_func) +int handler::binlog_log_row(TABLE *table, + const uchar *before_record, + const uchar *after_record, + Log_func *log_func) { -#ifdef WITH_WSREP - THD *const thd= table->in_use; + bool error; + THD *thd= table->in_use; + DBUG_ENTER("binlog_log_row"); - /* only InnoDB tables will be replicated through binlog emulation */ - if ((WSREP_EMULATE_BINLOG(thd) && - !(table->file->partition_ht()->flags & HTON_WSREP_REPLICATION)) || - thd->wsrep_ignore_table == true) - return 0; -#endif + if (!thd->binlog_table_maps && + thd->binlog_write_table_maps()) + DBUG_RETURN(HA_ERR_RBR_LOGGING_FAILED); - if (!table->file->check_table_binlog_row_based(1)) - return 0; - return binlog_log_row_internal(table, before_record, after_record, log_func); + error= (*log_func)(thd, table, row_logging_has_trans, + before_record, after_record); + DBUG_RETURN(error ? HA_ERR_RBR_LOGGING_FAILED : 0); } @@ -6647,6 +6522,7 @@ int handler::ha_external_lock(THD *thd, int lock_type) int handler::ha_reset() { DBUG_ENTER("ha_reset"); + /* Check that we have called all proper deallocation functions */ DBUG_ASSERT((uchar*) table->def_read_set.bitmap + table->s->column_bitmap_size == @@ -6662,6 +6538,10 @@ int handler::ha_reset() pushed_cond= NULL; tracker= NULL; mark_trx_read_write_done= 0; + /* + Disable row logging. + */ + row_logging= row_logging_init= 0; clear_cached_table_binlog_row_based_flag(); /* Reset information about pushed engine conditions */ cancel_pushed_idx_cond(); @@ -6678,8 +6558,8 @@ static int wsrep_after_row(THD *thd) /* enforce wsrep_max_ws_rows */ thd->wsrep_affected_rows++; if (wsrep_max_ws_rows && - wsrep_thd_is_local(thd) && - thd->wsrep_affected_rows > wsrep_max_ws_rows) + thd->wsrep_affected_rows > wsrep_max_ws_rows && + wsrep_thd_is_local(thd)) { trans_rollback_stmt(thd) || trans_rollback(thd); my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0)); @@ -6864,6 +6744,74 @@ static int check_duplicate_long_entries_update(TABLE *table, uchar *new_rec) /** + Check if galera disables binary logging for this table + + @return 0 Binary logging disabled + @return 1 Binary logging can be enabled +*/ + + +static inline bool wsrep_check_if_binlog_row(TABLE *table) +{ +#ifdef WITH_WSREP + THD *const thd= table->in_use; + + /* only InnoDB tables will be replicated through binlog emulation */ + if ((WSREP_EMULATE_BINLOG(thd) && + !(table->file->partition_ht()->flags & HTON_WSREP_REPLICATION)) || + thd->wsrep_ignore_table == true) + return 0; +#endif + return 1; +} + + +/** + Prepare handler for row logging + + @return 0 if handler will not participate in row logging + @return 1 handler will participate in row logging + + This function is always safe to call on an opened table. +*/ + +bool handler::prepare_for_row_logging() +{ + DBUG_ENTER("handler::prepare_for_row_logging"); + + /* Check if we should have row logging */ + if (wsrep_check_if_binlog_row(table) && + check_table_binlog_row_based()) + { + /* + Row logging enabled. Intialize all variables and write + annotated and table maps + */ + row_logging= row_logging_init= 1; + + /* + We need to have a transactional behavior for SQLCOM_CREATE_TABLE + (e.g. CREATE TABLE... SELECT * FROM TABLE) in order to keep a + compatible behavior with the STMT based replication even when + the table is not transactional. In other words, if the operation + fails while executing the insert phase nothing is written to the + binlog. + */ + row_logging_has_trans= + ((sql_command_flags[table->in_use->lex->sql_command] & + (CF_SCHEMA_CHANGE | CF_ADMIN_COMMAND)) || + table->file->has_transactions()); + } + else + { + /* Check row_logging has not been properly cleared from previous command */ + DBUG_ASSERT(row_logging == 0); + } + DBUG_RETURN(row_logging); +} + + +/* Do all initialization needed for insert @param force_update_handler Set to TRUE if we should always create an @@ -6891,7 +6839,6 @@ int handler::prepare_for_insert(bool force_update_handler) int handler::ha_write_row(const uchar *buf) { int error; - Log_func *log_func= Write_rows_log_event::binlog_row_logging_function; DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE || m_lock_type == F_WRLCK); DBUG_ENTER("handler::ha_write_row"); @@ -6911,13 +6858,17 @@ int handler::ha_write_row(const uchar *buf) { error= write_row(buf); }) MYSQL_INSERT_ROW_DONE(error); - if (likely(!error) && !row_already_logged) + if (likely(!error)) { rows_changed++; - error= binlog_log_row(table, 0, buf, log_func); + if (row_logging) + { + Log_func *log_func= Write_rows_log_event::binlog_row_logging_function; + error= binlog_log_row(table, 0, buf, log_func); + } #ifdef WITH_WSREP - if (table_share->tmp_table == NO_TMP_TABLE && - WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd()))) + if (WSREP_NNULL(ha_thd()) && table_share->tmp_table == NO_TMP_TABLE && + !error && (error= wsrep_after_row(ha_thd()))) { DBUG_RETURN(error); } @@ -6932,10 +6883,8 @@ int handler::ha_write_row(const uchar *buf) int handler::ha_update_row(const uchar *old_data, const uchar *new_data) { int error; - Log_func *log_func= Update_rows_log_event::binlog_row_logging_function; DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE || m_lock_type == F_WRLCK); - /* Some storage engines require that the new record is in record[0] (and the old record is in record[1]). @@ -6950,20 +6899,22 @@ int handler::ha_update_row(const uchar *old_data, const uchar *new_data) (error= check_duplicate_long_entries_update(table, (uchar*) new_data))) return error; - TABLE_IO_WAIT(tracker, PSI_TABLE_UPDATE_ROW, active_index, error, + TABLE_IO_WAIT(tracker, PSI_TABLE_UPDATE_ROW, active_index, 0, { error= update_row(old_data, new_data);}) MYSQL_UPDATE_ROW_DONE(error); - if (likely(!error) && !row_already_logged) + if (likely(!error)) { rows_changed++; - error= binlog_log_row(table, old_data, new_data, log_func); -#ifdef WITH_WSREP - if (table_share->tmp_table == NO_TMP_TABLE && - WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd()))) + if (row_logging) { - return error; + Log_func *log_func= Update_rows_log_event::binlog_row_logging_function; + error= binlog_log_row(table, old_data, new_data, log_func); } +#ifdef WITH_WSREP + if (WSREP_NNULL(ha_thd()) && table_share->tmp_table == NO_TMP_TABLE && + !error && (error= wsrep_after_row(ha_thd()))) + return error; #endif /* WITH_WSREP */ } return error; @@ -7000,7 +6951,6 @@ int handler::update_first_row(const uchar *new_data) int handler::ha_delete_row(const uchar *buf) { int error; - Log_func *log_func= Delete_rows_log_event::binlog_row_logging_function; DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE || m_lock_type == F_WRLCK); /* @@ -7019,10 +6969,14 @@ int handler::ha_delete_row(const uchar *buf) if (likely(!error)) { rows_changed++; - error= binlog_log_row(table, buf, 0, log_func); + if (row_logging) + { + Log_func *log_func= Delete_rows_log_event::binlog_row_logging_function; + error= binlog_log_row(table, buf, 0, log_func); + } #ifdef WITH_WSREP - if (table_share->tmp_table == NO_TMP_TABLE && - WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd()))) + if (WSREP_NNULL(ha_thd()) && table_share->tmp_table == NO_TMP_TABLE && + !error && (error= wsrep_after_row(ha_thd()))) { return error; } @@ -7049,11 +7003,10 @@ int handler::ha_delete_row(const uchar *buf) int handler::ha_direct_update_rows(ha_rows *update_rows, ha_rows *found_rows) { int error; - MYSQL_UPDATE_ROW_START(table_share->db.str, table_share->table_name.str); mark_trx_read_write(); - error = direct_update_rows(update_rows, found_rows); + error= direct_update_rows(update_rows, found_rows); MYSQL_UPDATE_ROW_DONE(error); return error; } |