diff options
Diffstat (limited to 'sql/sql_class.cc')
-rw-r--r-- | sql/sql_class.cc | 451 |
1 files changed, 449 insertions, 2 deletions
diff --git a/sql/sql_class.cc b/sql/sql_class.cc index a28324c5e28..08d89228a72 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -27,6 +27,8 @@ #endif #include "mysql_priv.h" +#include <my_bitmap.h> +#include "log_event.h" #include <m_ctype.h> #include <sys/stat.h> #include <thr_alarm.h> @@ -174,7 +176,7 @@ Open_tables_state::Open_tables_state(ulong version_arg) THD::THD() :Statement(CONVENTIONAL_EXECUTION, 0, ALLOC_ROOT_MIN_BLOCK_SIZE, 0), - Open_tables_state(refresh_version), + Open_tables_state(refresh_version), rli_fake(0), lock_id(&main_lock_id), user_time(0), in_sub_stmt(0), global_read_lock(0), is_fatal_error(0), rand_used(0), time_zone_used(0), @@ -227,6 +229,9 @@ THD::THD() ull=0; system_thread= cleanup_done= abort_on_warning= no_warnings_for_error= 0; peer_port= 0; // For SHOW PROCESSLIST +#ifdef HAVE_ROW_BASED_REPLICATION + transaction.m_pending_rows_event= 0; +#endif #ifdef __WIN__ real_id = 0; #endif @@ -440,6 +445,11 @@ THD::~THD() #ifndef DBUG_OFF dbug_sentry= THD_SENTRY_GONE; #endif +#ifndef EMBEDDED_LIBRARY + if (rli_fake) + delete rli_fake; +#endif + DBUG_VOID_RETURN; } @@ -1959,7 +1969,8 @@ void THD::reset_sub_statement_state(Sub_statement_state *backup, backup->client_capabilities= client_capabilities; backup->savepoints= transaction.savepoints; - if (!lex->requires_prelocking() || is_update_query(lex->sql_command)) + if ((!lex->requires_prelocking() || is_update_query(lex->sql_command)) && + !binlog_row_based) options&= ~OPTION_BIN_LOG; /* Disable result sets */ client_capabilities &= ~CLIENT_MULTI_RESULTS; @@ -2101,3 +2112,439 @@ void xid_cache_delete(XID_STATE *xid_state) pthread_mutex_unlock(&LOCK_xid_cache); } +/* + Implementation of interface to write rows to the binary log through the + thread. The thread is responsible for writing the rows it has + inserted/updated/deleted. +*/ + +#ifndef MYSQL_CLIENT +#ifdef HAVE_ROW_BASED_REPLICATION + +/* + Template member function for ensuring that there is an rows log + event of the apropriate type before proceeding. + + PRE CONDITION: + - Events of type 'RowEventT' have the type code 'type_code'. + + POST CONDITION: + If a non-NULL pointer is returned, the pending event for thread 'thd' will + be an event of type 'RowEventT' (which have the type code 'type_code') + will either empty or have enough space to hold 'needed' bytes. In + addition, the columns bitmap will be correct for the row, meaning that + the pending event will be flushed if the columns in the event differ from + the columns suppled to the function. + + RETURNS + If no error, a non-NULL pending event (either one which already existed or + the newly created one). + If error, NULL. + */ + +template <class RowsEventT> Rows_log_event* +THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id, + MY_BITMAP const* cols, + my_size_t colcnt, + my_size_t needed, + bool is_transactional) +{ + /* Pre-conditions */ + DBUG_ASSERT(table->s->table_map_id != ULONG_MAX); + + /* Fetch the type code for the RowsEventT template parameter */ + int const type_code= RowsEventT::TYPE_CODE; + + /* + There is no good place to set up the transactional data, so we + have to do it here. + */ + if (binlog_setup_trx_data()) + return NULL; + + Rows_log_event* pending= binlog_get_pending_rows_event(); + + if (unlikely(pending && !pending->is_valid())) + return NULL; + + /* + Check if the current event is non-NULL and a write-rows + event. Also check if the table provided is mapped: if it is not, + then we have switched to writing to a new table. + If there is no pending event, we need to create one. If there is a pending + event, but it's not about the same table id, or not of the same type + (between Write, Update and Delete), or not the same affected columns, or + going to be too big, flush this event to disk and create a new pending + event. + */ + if (!pending || + pending->server_id != serv_id || + pending->get_table_id() != table->s->table_map_id || + pending->get_type_code() != type_code || + pending->get_data_size() + needed > opt_binlog_rows_event_max_size || + pending->get_width() != colcnt || + !bitmap_cmp(pending->get_cols(), cols)) + { + /* Create a new RowsEventT... */ + Rows_log_event* const + ev= new RowsEventT(this, table, table->s->table_map_id, cols, + is_transactional); + if (unlikely(!ev)) + return NULL; + ev->server_id= serv_id; // I don't like this, it's too easy to forget. + /* + flush the pending event and replace it with the newly created + event... + */ + if (unlikely(mysql_bin_log.flush_and_set_pending_rows_event(this, ev))) + { + delete ev; + return NULL; + } + + return ev; /* This is the new pending event */ + } + return pending; /* This is the current pending event */ +} + +/* + Instansiate the versions we need, we have -fno-implicit-template as + compiling option. +*/ +template Rows_log_event* +THD::binlog_prepare_pending_rows_event<Write_rows_log_event> +(TABLE*, uint32, MY_BITMAP const*, my_size_t colcnt, size_t, bool); + +template Rows_log_event* +THD::binlog_prepare_pending_rows_event<Delete_rows_log_event> +(TABLE*, uint32, MY_BITMAP const*, my_size_t colcnt, size_t, bool); + +template Rows_log_event* +THD::binlog_prepare_pending_rows_event<Update_rows_log_event> +(TABLE*, uint32, MY_BITMAP const*, my_size_t colcnt, size_t, bool); + +static char const* +field_type_name(enum_field_types type) +{ + switch (type) + { + case MYSQL_TYPE_DECIMAL: + return "MYSQL_TYPE_DECIMAL"; + case MYSQL_TYPE_TINY: + return "MYSQL_TYPE_TINY"; + case MYSQL_TYPE_SHORT: + return "MYSQL_TYPE_SHORT"; + case MYSQL_TYPE_LONG: + return "MYSQL_TYPE_LONG"; + case MYSQL_TYPE_FLOAT: + return "MYSQL_TYPE_FLOAT"; + case MYSQL_TYPE_DOUBLE: + return "MYSQL_TYPE_DOUBLE"; + case MYSQL_TYPE_NULL: + return "MYSQL_TYPE_NULL"; + case MYSQL_TYPE_TIMESTAMP: + return "MYSQL_TYPE_TIMESTAMP"; + case MYSQL_TYPE_LONGLONG: + return "MYSQL_TYPE_LONGLONG"; + case MYSQL_TYPE_INT24: + return "MYSQL_TYPE_INT24"; + case MYSQL_TYPE_DATE: + return "MYSQL_TYPE_DATE"; + case MYSQL_TYPE_TIME: + return "MYSQL_TYPE_TIME"; + case MYSQL_TYPE_DATETIME: + return "MYSQL_TYPE_DATETIME"; + case MYSQL_TYPE_YEAR: + return "MYSQL_TYPE_YEAR"; + case MYSQL_TYPE_NEWDATE: + return "MYSQL_TYPE_NEWDATE"; + case MYSQL_TYPE_VARCHAR: + return "MYSQL_TYPE_VARCHAR"; + case MYSQL_TYPE_BIT: + return "MYSQL_TYPE_BIT"; + case MYSQL_TYPE_NEWDECIMAL: + return "MYSQL_TYPE_NEWDECIMAL"; + case MYSQL_TYPE_ENUM: + return "MYSQL_TYPE_ENUM"; + case MYSQL_TYPE_SET: + return "MYSQL_TYPE_SET"; + case MYSQL_TYPE_TINY_BLOB: + return "MYSQL_TYPE_TINY_BLOB"; + case MYSQL_TYPE_MEDIUM_BLOB: + return "MYSQL_TYPE_MEDIUM_BLOB"; + case MYSQL_TYPE_LONG_BLOB: + return "MYSQL_TYPE_LONG_BLOB"; + case MYSQL_TYPE_BLOB: + return "MYSQL_TYPE_BLOB"; + case MYSQL_TYPE_VAR_STRING: + return "MYSQL_TYPE_VAR_STRING"; + case MYSQL_TYPE_STRING: + return "MYSQL_TYPE_STRING"; + case MYSQL_TYPE_GEOMETRY: + return "MYSQL_TYPE_GEOMETRY"; + } + return "Unknown"; +} + +my_size_t THD::max_row_length_blob(TABLE *table, const byte *data) const +{ + my_size_t length= 0; + TABLE_SHARE *table_s= table->s; + uint* const beg= table_s->blob_field; + uint* const end= beg + table_s->blob_fields; + + for (uint *ptr= beg ; ptr != end ; ++ptr) + { + Field_blob* const blob= (Field_blob*) table->field[*ptr]; + length+= blob->get_length(data + blob->offset()) + 2; + } + + return length; +} + +my_size_t THD::pack_row(TABLE *table, MY_BITMAP const* cols, byte *row_data, + const byte *record) const +{ + Field **p_field= table->field, *field= *p_field; + int n_null_bytes= table->s->null_bytes; + my_ptrdiff_t const offset= record - (byte*) table->record[0]; + + memcpy(row_data, record, n_null_bytes); + byte *ptr= row_data+n_null_bytes; + + for (int i= 0 ; field ; i++, p_field++, field= *p_field) + { + if (bitmap_is_set(cols,i)) + ptr= field->pack(ptr, field->ptr + offset); + } + + /* + my_ptrdiff_t is signed, size_t is unsigned. Assert that the + conversion will work correctly. + */ + DBUG_ASSERT(ptr - row_data >= 0); + return (static_cast<size_t>(ptr - row_data)); +} + +int THD::binlog_write_row(TABLE* table, bool is_trans, + MY_BITMAP const* cols, my_size_t colcnt, + byte const *record) +{ + DBUG_ASSERT(binlog_row_based && mysql_bin_log.is_open()); + + /* + Pack records into format for transfer. We are allocating more + memory than needed, but that doesn't matter. + */ + bool error= 0; + byte *row_data= table->write_row_record; + my_size_t const max_len= max_row_length(table, record); + + /* + * Allocate room for a row (if needed) + */ + if (!row_data) + { + if (!table->s->blob_fields) + { + /* multiply max_len by 2 so it can be used for update_row as well */ + table->write_row_record= alloc_root(&table->mem_root, 2*max_len); + if (!table->write_row_record) + return HA_ERR_OUT_OF_MEM; + row_data= table->write_row_record; + } + else if (unlikely(!(row_data= my_malloc(max_len, MYF(MY_WME))))) + return HA_ERR_OUT_OF_MEM; + } + my_size_t const len= pack_row(table, cols, row_data, record); + + Rows_log_event* const + ev= binlog_prepare_pending_rows_event<Write_rows_log_event> + (table, server_id, cols, colcnt, len, is_trans); + + /* add_row_data copies row_data to internal buffer */ + error= likely(ev != 0) ? ev->add_row_data(row_data,len) : HA_ERR_OUT_OF_MEM ; + + if (table->write_row_record == 0) + my_free(row_data, MYF(MY_WME)); + + return error; +} + +int THD::binlog_update_row(TABLE* table, bool is_trans, + MY_BITMAP const* cols, my_size_t colcnt, + const byte *before_record, + const byte *after_record) +{ + DBUG_ASSERT(binlog_row_based && mysql_bin_log.is_open()); + + bool error= 0; + my_size_t const before_maxlen = max_row_length(table, before_record); + my_size_t const after_maxlen = max_row_length(table, after_record); + + byte *row_data= table->write_row_record; + byte *before_row, *after_row; + if (row_data != 0) + { + before_row= row_data; + after_row= before_row + before_maxlen; + } + else + { + if (unlikely(!(row_data= my_multi_malloc(MYF(MY_WME), + &before_row, before_maxlen, + &after_row, after_maxlen, + NULL)))) + return HA_ERR_OUT_OF_MEM; + } + + my_size_t const before_size= pack_row(table, cols, before_row, + before_record); + my_size_t const after_size= pack_row(table, cols, after_row, + after_record); + + Rows_log_event* const + ev= binlog_prepare_pending_rows_event<Update_rows_log_event> + (table, server_id, cols, colcnt, before_size + after_size, is_trans); + + error= (unlikely(!ev)) || ev->add_row_data(before_row, before_size) || + ev->add_row_data(after_row, after_size); + + if (!table->write_row_record) + { + /* add_row_data copies row_data to internal buffer */ + my_free(row_data, MYF(MY_WME)); + } + + return error; +} + +int THD::binlog_delete_row(TABLE* table, bool is_trans, + MY_BITMAP const* cols, my_size_t colcnt, + byte const *record) +{ + DBUG_ASSERT(binlog_row_based && mysql_bin_log.is_open()); + + /* + Pack records into format for transfer. We are allocating more + memory than needed, but that doesn't matter. + */ + bool error= 0; + my_size_t const max_len= max_row_length(table, record); + byte *row_data= table->write_row_record; + if (!row_data && unlikely(!(row_data= my_malloc(max_len, MYF(MY_WME))))) + return HA_ERR_OUT_OF_MEM; + my_size_t const len= pack_row(table, cols, row_data, record); + + Rows_log_event* const + ev= binlog_prepare_pending_rows_event<Delete_rows_log_event> + (table, server_id, cols, colcnt, len, is_trans); + + error= (unlikely(!ev)) || ev->add_row_data(row_data, len); + + /* add_row_data copies row_data */ + if (table->write_row_record == 0) + my_free(row_data, MYF(MY_WME)); + + return error; +} + + +int THD::binlog_flush_pending_rows_event(bool stmt_end) +{ + DBUG_ENTER("THD::binlog_flush_pending_rows_event"); + if (!binlog_row_based || !mysql_bin_log.is_open()) + DBUG_RETURN(0); + + /* + Mark the event as the last event of a statement if the stmt_end + flag is set. + */ + int error= 0; + if (Rows_log_event *pending= binlog_get_pending_rows_event()) + { + if (stmt_end) + { + pending->set_flags(Rows_log_event::STMT_END_F); + pending->flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F; + } + + /* + We only bother to set the pending event if it is non-NULL. This + is essential for correctness, since there is not necessarily a + trx_data created for the thread if the pending event is NULL. + */ + error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0); + } + + DBUG_RETURN(error); +} + + +void THD::binlog_delete_pending_rows_event() +{ + if (Rows_log_event *pending= binlog_get_pending_rows_event()) + { + delete pending; + binlog_set_pending_rows_event(0); + } +} + +#endif /* HAVE_ROW_BASED_REPLICATION */ + +/* + Member function that will log query, either row-based or + statement-based depending on the value of the 'binlog_row_based' + variable and the value of the 'qtype' flag. + + This function should be called after the all calls to ha_*_row() + functions have been issued, but before tables are unlocked and + closed. + + RETURN VALUE + Error code, or 0 if no error. +*/ +int THD::binlog_query(THD::enum_binlog_query_type qtype, + char const *query, ulong query_len, + bool is_trans, bool suppress_use) +{ + DBUG_ENTER("THD::binlog_query"); + DBUG_ASSERT(query && mysql_bin_log.is_open()); + int error= binlog_flush_pending_rows_event(true); + switch (qtype) + { + case THD::MYSQL_QUERY_TYPE: + /* + Using this query type is a conveniece hack, since we have been + moving back and forth between using RBR for replication of + system tables and not using it. + + Make sure to change in check_table_binlog_row_based() according + to how you treat this. + */ + case THD::ROW_QUERY_TYPE: + if (binlog_row_based) + DBUG_RETURN(binlog_flush_pending_rows_event(true)); + /* Otherwise, we fall through */ + case THD::STMT_QUERY_TYPE: + /* + Most callers of binlog_query() ignore the error code, assuming + that the statement will always be written to the binlog. In + case of error above, we therefore just continue and write the + statement to the binary log. + */ + { + Query_log_event qinfo(this, query, query_len, is_trans, suppress_use); + qinfo.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F; + DBUG_RETURN(mysql_bin_log.write(&qinfo)); + } + break; + + case THD::QUERY_TYPE_COUNT: + default: + DBUG_ASSERT(0 <= qtype && qtype < QUERY_TYPE_COUNT); + } + DBUG_RETURN(0); +} + +#endif /* !defined(MYSQL_CLIENT) */ |