summaryrefslogtreecommitdiff
path: root/sql/handler.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/handler.cc')
-rw-r--r--sql/handler.cc315
1 files changed, 134 insertions, 181 deletions
diff --git a/sql/handler.cc b/sql/handler.cc
index 9a3766c90de..c3f78693849 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -3620,8 +3620,9 @@ int handler::update_auto_increment()
variables->auto_increment_increment);
auto_inc_intervals_count++;
/* Row-based replication does not need to store intervals in binlog */
- if (((WSREP(thd) && wsrep_emulate_bin_log ) || mysql_bin_log.is_open())
- && !thd->is_current_stmt_binlog_format_row())
+ if (((WSREP_NNULL(thd) && wsrep_emulate_bin_log) ||
+ mysql_bin_log.is_open()) &&
+ !thd->is_current_stmt_binlog_format_row())
thd->auto_inc_intervals_in_cur_stmt_for_binlog.
append(auto_inc_interval_for_cur_row.minimum(),
auto_inc_interval_for_cur_row.values(),
@@ -6366,32 +6367,35 @@ bool ha_show_status(THD *thd, handlerton *db_type, enum ha_stat_type stat)
1 Row needs to be logged
*/
-bool handler::check_table_binlog_row_based(bool binlog_row)
+bool handler::check_table_binlog_row_based()
{
- if (table->versioned(VERS_TRX_ID))
- return false;
- if (unlikely((table->in_use->variables.sql_log_bin_off)))
- return 0; /* Called by partitioning engine */
-#ifdef WITH_WSREP
- if (!table->in_use->variables.sql_log_bin &&
- wsrep_thd_is_applying(table->in_use))
- return 0; /* wsrep patch sets sql_log_bin to silence binlogging
- from high priority threads */
-#endif /* WITH_WSREP */
if (unlikely((!check_table_binlog_row_based_done)))
{
check_table_binlog_row_based_done= 1;
check_table_binlog_row_based_result=
- check_table_binlog_row_based_internal(binlog_row);
+ check_table_binlog_row_based_internal();
}
return check_table_binlog_row_based_result;
}
-bool handler::check_table_binlog_row_based_internal(bool binlog_row)
+bool handler::check_table_binlog_row_based_internal()
{
THD *thd= table->in_use;
+#ifdef WITH_WSREP
+ if (!thd->variables.sql_log_bin &&
+ wsrep_thd_is_applying(table->in_use))
+ {
+ /*
+ wsrep patch sets sql_log_bin to silence binlogging from high
+ priority threads
+ */
+ return 0;
+ }
+#endif
return (table->s->can_do_row_logging &&
+ !table->versioned(VERS_TRX_ID) &&
+ !(thd->variables.option_bits & OPTION_BIN_TMP_LOG_OFF) &&
thd->is_current_stmt_binlog_format_row() &&
/*
Wsrep partially enables binary logging if it have not been
@@ -6407,9 +6411,9 @@ bool handler::check_table_binlog_row_based_internal(bool binlog_row)
Otherwise, return 'true' if binary logging is on.
*/
- IF_WSREP(((WSREP_EMULATE_BINLOG(thd) &&
+ IF_WSREP(((WSREP_EMULATE_BINLOG_NNULL(thd) &&
wsrep_thd_is_local(thd)) ||
- ((WSREP(thd) ||
+ ((WSREP_NNULL(thd) ||
(thd->variables.option_bits & OPTION_BIN_LOG)) &&
mysql_bin_log.is_open())),
(thd->variables.option_bits & OPTION_BIN_LOG) &&
@@ -6417,151 +6421,22 @@ bool handler::check_table_binlog_row_based_internal(bool binlog_row)
}
-/** @brief
- Write table maps for all (manually or automatically) locked tables
- to the binary log. Also, if binlog_annotate_row_events is ON,
- write Annotate_rows event before the first table map.
-
- SYNOPSIS
- write_locked_table_maps()
- thd Pointer to THD structure
-
- DESCRIPTION
- This function will generate and write table maps for all tables
- that are locked by the thread 'thd'.
-
- RETURN VALUE
- 0 All OK
- 1 Failed to write all table maps
-
- SEE ALSO
- THD::lock
-*/
-
-static int write_locked_table_maps(THD *thd)
-{
- DBUG_ENTER("write_locked_table_maps");
- DBUG_PRINT("enter", ("thd:%p thd->lock:%p "
- "thd->extra_lock: %p",
- thd, thd->lock, thd->extra_lock));
-
- DBUG_PRINT("debug", ("get_binlog_table_maps(): %d", thd->get_binlog_table_maps()));
-
- MYSQL_LOCK *locks[2];
- locks[0]= thd->extra_lock;
- locks[1]= thd->lock;
- my_bool with_annotate= IF_WSREP(!wsrep_fragments_certified_for_stmt(thd),
- true) &&
- thd->variables.binlog_annotate_row_events &&
- thd->query() && thd->query_length();
-
- for (uint i= 0 ; i < sizeof(locks)/sizeof(*locks) ; ++i )
- {
- MYSQL_LOCK const *const lock= locks[i];
- if (lock == NULL)
- continue;
-
- TABLE **const end_ptr= lock->table + lock->table_count;
- for (TABLE **table_ptr= lock->table ;
- table_ptr != end_ptr ;
- ++table_ptr)
- {
- TABLE *const table= *table_ptr;
- if (table->current_lock == F_WRLCK &&
- table->file->check_table_binlog_row_based(0))
- {
- if (binlog_write_table_map(thd, table, with_annotate))
- DBUG_RETURN(1);
- with_annotate= 0;
- }
- }
- }
- DBUG_RETURN(0);
-}
-
-
-int binlog_write_table_map(THD *thd, TABLE *table, bool with_annotate)
-{
- DBUG_ENTER("binlog_write_table_map");
- DBUG_PRINT("info", ("table %s", table->s->table_name.str));
- /*
- We need to have a transactional behavior for SQLCOM_CREATE_TABLE
- (e.g. CREATE TABLE... SELECT * FROM TABLE) in order to keep a
- compatible behavior with the STMT based replication even when
- the table is not transactional. In other words, if the operation
- fails while executing the insert phase nothing is written to the
- binlog.
-
- Note that at this point, we check the type of a set of tables to
- create the table map events. In the function binlog_log_row(),
- which calls the current function, we check the type of the table
- of the current row.
- */
- bool const has_trans= ((sql_command_flags[thd->lex->sql_command] &
- (CF_SCHEMA_CHANGE | CF_ADMIN_COMMAND)) ||
- table->file->has_transactions());
- int const error= thd->binlog_write_table_map(table, has_trans,
- &with_annotate);
- /*
- If an error occurs, it is the responsibility of the caller to
- roll back the transaction.
- */
- if (unlikely(error))
- DBUG_RETURN(1);
- DBUG_RETURN(0);
-}
-
-
-static int binlog_log_row_internal(TABLE* table,
- const uchar *before_record,
- const uchar *after_record,
- Log_func *log_func)
-{
- bool error= 0;
- THD *const thd= table->in_use;
-
- /*
- If there are no table maps written to the binary log, this is
- the first row handled in this statement. In that case, we need
- to write table maps for all locked tables to the binary log.
- */
- if (likely(!(error= ((thd->get_binlog_table_maps() == 0 &&
- write_locked_table_maps(thd))))))
- {
- /*
- We need to have a transactional behavior for SQLCOM_CREATE_TABLE
- (i.e. CREATE TABLE... SELECT * FROM TABLE) in order to keep a
- compatible behavior with the STMT based replication even when
- the table is not transactional. In other words, if the operation
- fails while executing the insert phase nothing is written to the
- binlog. We need the same also for ALTER TABLE in the case we convert
- a shared table to a not shared table as in this case we will log all
- rows.
- */
- bool const has_trans= ((sql_command_flags[thd->lex->sql_command] &
- (CF_SCHEMA_CHANGE | CF_ADMIN_COMMAND)) ||
- table->file->has_transactions());
- error= (*log_func)(thd, table, has_trans, before_record, after_record);
- }
- return error ? HA_ERR_RBR_LOGGING_FAILED : 0;
-}
-
-int binlog_log_row(TABLE* table, const uchar *before_record,
- const uchar *after_record, Log_func *log_func)
+int handler::binlog_log_row(TABLE *table,
+ const uchar *before_record,
+ const uchar *after_record,
+ Log_func *log_func)
{
-#ifdef WITH_WSREP
- THD *const thd= table->in_use;
+ bool error;
+ THD *thd= table->in_use;
+ DBUG_ENTER("binlog_log_row");
- /* only InnoDB tables will be replicated through binlog emulation */
- if ((WSREP_EMULATE_BINLOG(thd) &&
- !(table->file->partition_ht()->flags & HTON_WSREP_REPLICATION)) ||
- thd->wsrep_ignore_table == true)
- return 0;
-#endif
+ if (!thd->binlog_table_maps &&
+ thd->binlog_write_table_maps())
+ DBUG_RETURN(HA_ERR_RBR_LOGGING_FAILED);
- if (!table->file->check_table_binlog_row_based(1))
- return 0;
- return binlog_log_row_internal(table, before_record, after_record, log_func);
+ error= (*log_func)(thd, table, row_logging_has_trans,
+ before_record, after_record);
+ DBUG_RETURN(error ? HA_ERR_RBR_LOGGING_FAILED : 0);
}
@@ -6647,6 +6522,7 @@ int handler::ha_external_lock(THD *thd, int lock_type)
int handler::ha_reset()
{
DBUG_ENTER("ha_reset");
+
/* Check that we have called all proper deallocation functions */
DBUG_ASSERT((uchar*) table->def_read_set.bitmap +
table->s->column_bitmap_size ==
@@ -6662,6 +6538,10 @@ int handler::ha_reset()
pushed_cond= NULL;
tracker= NULL;
mark_trx_read_write_done= 0;
+ /*
+ Disable row logging.
+ */
+ row_logging= row_logging_init= 0;
clear_cached_table_binlog_row_based_flag();
/* Reset information about pushed engine conditions */
cancel_pushed_idx_cond();
@@ -6678,8 +6558,8 @@ static int wsrep_after_row(THD *thd)
/* enforce wsrep_max_ws_rows */
thd->wsrep_affected_rows++;
if (wsrep_max_ws_rows &&
- wsrep_thd_is_local(thd) &&
- thd->wsrep_affected_rows > wsrep_max_ws_rows)
+ thd->wsrep_affected_rows > wsrep_max_ws_rows &&
+ wsrep_thd_is_local(thd))
{
trans_rollback_stmt(thd) || trans_rollback(thd);
my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0));
@@ -6864,6 +6744,74 @@ static int check_duplicate_long_entries_update(TABLE *table, uchar *new_rec)
/**
+ Check if galera disables binary logging for this table
+
+ @return 0 Binary logging disabled
+ @return 1 Binary logging can be enabled
+*/
+
+
+static inline bool wsrep_check_if_binlog_row(TABLE *table)
+{
+#ifdef WITH_WSREP
+ THD *const thd= table->in_use;
+
+ /* only InnoDB tables will be replicated through binlog emulation */
+ if ((WSREP_EMULATE_BINLOG(thd) &&
+ !(table->file->partition_ht()->flags & HTON_WSREP_REPLICATION)) ||
+ thd->wsrep_ignore_table == true)
+ return 0;
+#endif
+ return 1;
+}
+
+
+/**
+ Prepare handler for row logging
+
+ @return 0 if handler will not participate in row logging
+ @return 1 handler will participate in row logging
+
+ This function is always safe to call on an opened table.
+*/
+
+bool handler::prepare_for_row_logging()
+{
+ DBUG_ENTER("handler::prepare_for_row_logging");
+
+ /* Check if we should have row logging */
+ if (wsrep_check_if_binlog_row(table) &&
+ check_table_binlog_row_based())
+ {
+ /*
+ Row logging enabled. Intialize all variables and write
+ annotated and table maps
+ */
+ row_logging= row_logging_init= 1;
+
+ /*
+ We need to have a transactional behavior for SQLCOM_CREATE_TABLE
+ (e.g. CREATE TABLE... SELECT * FROM TABLE) in order to keep a
+ compatible behavior with the STMT based replication even when
+ the table is not transactional. In other words, if the operation
+ fails while executing the insert phase nothing is written to the
+ binlog.
+ */
+ row_logging_has_trans=
+ ((sql_command_flags[table->in_use->lex->sql_command] &
+ (CF_SCHEMA_CHANGE | CF_ADMIN_COMMAND)) ||
+ table->file->has_transactions());
+ }
+ else
+ {
+ /* Check row_logging has not been properly cleared from previous command */
+ DBUG_ASSERT(row_logging == 0);
+ }
+ DBUG_RETURN(row_logging);
+}
+
+
+/*
Do all initialization needed for insert
@param force_update_handler Set to TRUE if we should always create an
@@ -6891,7 +6839,6 @@ int handler::prepare_for_insert(bool force_update_handler)
int handler::ha_write_row(const uchar *buf)
{
int error;
- Log_func *log_func= Write_rows_log_event::binlog_row_logging_function;
DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
m_lock_type == F_WRLCK);
DBUG_ENTER("handler::ha_write_row");
@@ -6911,13 +6858,17 @@ int handler::ha_write_row(const uchar *buf)
{ error= write_row(buf); })
MYSQL_INSERT_ROW_DONE(error);
- if (likely(!error) && !row_already_logged)
+ if (likely(!error))
{
rows_changed++;
- error= binlog_log_row(table, 0, buf, log_func);
+ if (row_logging)
+ {
+ Log_func *log_func= Write_rows_log_event::binlog_row_logging_function;
+ error= binlog_log_row(table, 0, buf, log_func);
+ }
#ifdef WITH_WSREP
- if (table_share->tmp_table == NO_TMP_TABLE &&
- WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd())))
+ if (WSREP_NNULL(ha_thd()) && table_share->tmp_table == NO_TMP_TABLE &&
+ !error && (error= wsrep_after_row(ha_thd())))
{
DBUG_RETURN(error);
}
@@ -6932,10 +6883,8 @@ int handler::ha_write_row(const uchar *buf)
int handler::ha_update_row(const uchar *old_data, const uchar *new_data)
{
int error;
- Log_func *log_func= Update_rows_log_event::binlog_row_logging_function;
DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
m_lock_type == F_WRLCK);
-
/*
Some storage engines require that the new record is in record[0]
(and the old record is in record[1]).
@@ -6950,20 +6899,22 @@ int handler::ha_update_row(const uchar *old_data, const uchar *new_data)
(error= check_duplicate_long_entries_update(table, (uchar*) new_data)))
return error;
- TABLE_IO_WAIT(tracker, PSI_TABLE_UPDATE_ROW, active_index, error,
+ TABLE_IO_WAIT(tracker, PSI_TABLE_UPDATE_ROW, active_index, 0,
{ error= update_row(old_data, new_data);})
MYSQL_UPDATE_ROW_DONE(error);
- if (likely(!error) && !row_already_logged)
+ if (likely(!error))
{
rows_changed++;
- error= binlog_log_row(table, old_data, new_data, log_func);
-#ifdef WITH_WSREP
- if (table_share->tmp_table == NO_TMP_TABLE &&
- WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd())))
+ if (row_logging)
{
- return error;
+ Log_func *log_func= Update_rows_log_event::binlog_row_logging_function;
+ error= binlog_log_row(table, old_data, new_data, log_func);
}
+#ifdef WITH_WSREP
+ if (WSREP_NNULL(ha_thd()) && table_share->tmp_table == NO_TMP_TABLE &&
+ !error && (error= wsrep_after_row(ha_thd())))
+ return error;
#endif /* WITH_WSREP */
}
return error;
@@ -7000,7 +6951,6 @@ int handler::update_first_row(const uchar *new_data)
int handler::ha_delete_row(const uchar *buf)
{
int error;
- Log_func *log_func= Delete_rows_log_event::binlog_row_logging_function;
DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
m_lock_type == F_WRLCK);
/*
@@ -7019,10 +6969,14 @@ int handler::ha_delete_row(const uchar *buf)
if (likely(!error))
{
rows_changed++;
- error= binlog_log_row(table, buf, 0, log_func);
+ if (row_logging)
+ {
+ Log_func *log_func= Delete_rows_log_event::binlog_row_logging_function;
+ error= binlog_log_row(table, buf, 0, log_func);
+ }
#ifdef WITH_WSREP
- if (table_share->tmp_table == NO_TMP_TABLE &&
- WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd())))
+ if (WSREP_NNULL(ha_thd()) && table_share->tmp_table == NO_TMP_TABLE &&
+ !error && (error= wsrep_after_row(ha_thd())))
{
return error;
}
@@ -7049,11 +7003,10 @@ int handler::ha_delete_row(const uchar *buf)
int handler::ha_direct_update_rows(ha_rows *update_rows, ha_rows *found_rows)
{
int error;
-
MYSQL_UPDATE_ROW_START(table_share->db.str, table_share->table_name.str);
mark_trx_read_write();
- error = direct_update_rows(update_rows, found_rows);
+ error= direct_update_rows(update_rows, found_rows);
MYSQL_UPDATE_ROW_DONE(error);
return error;
}