summaryrefslogtreecommitdiff
path: root/sql/sql_class.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_class.cc')
-rw-r--r--sql/sql_class.cc451
1 files changed, 449 insertions, 2 deletions
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index a28324c5e28..08d89228a72 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -27,6 +27,8 @@
#endif
#include "mysql_priv.h"
+#include <my_bitmap.h>
+#include "log_event.h"
#include <m_ctype.h>
#include <sys/stat.h>
#include <thr_alarm.h>
@@ -174,7 +176,7 @@ Open_tables_state::Open_tables_state(ulong version_arg)
THD::THD()
:Statement(CONVENTIONAL_EXECUTION, 0, ALLOC_ROOT_MIN_BLOCK_SIZE, 0),
- Open_tables_state(refresh_version),
+ Open_tables_state(refresh_version), rli_fake(0),
lock_id(&main_lock_id),
user_time(0), in_sub_stmt(0), global_read_lock(0), is_fatal_error(0),
rand_used(0), time_zone_used(0),
@@ -227,6 +229,9 @@ THD::THD()
ull=0;
system_thread= cleanup_done= abort_on_warning= no_warnings_for_error= 0;
peer_port= 0; // For SHOW PROCESSLIST
+#ifdef HAVE_ROW_BASED_REPLICATION
+ transaction.m_pending_rows_event= 0;
+#endif
#ifdef __WIN__
real_id = 0;
#endif
@@ -440,6 +445,11 @@ THD::~THD()
#ifndef DBUG_OFF
dbug_sentry= THD_SENTRY_GONE;
#endif
+#ifndef EMBEDDED_LIBRARY
+ if (rli_fake)
+ delete rli_fake;
+#endif
+
DBUG_VOID_RETURN;
}
@@ -1959,7 +1969,8 @@ void THD::reset_sub_statement_state(Sub_statement_state *backup,
backup->client_capabilities= client_capabilities;
backup->savepoints= transaction.savepoints;
- if (!lex->requires_prelocking() || is_update_query(lex->sql_command))
+ if ((!lex->requires_prelocking() || is_update_query(lex->sql_command)) &&
+ !binlog_row_based)
options&= ~OPTION_BIN_LOG;
/* Disable result sets */
client_capabilities &= ~CLIENT_MULTI_RESULTS;
@@ -2101,3 +2112,439 @@ void xid_cache_delete(XID_STATE *xid_state)
pthread_mutex_unlock(&LOCK_xid_cache);
}
+/*
+ Implementation of interface to write rows to the binary log through the
+ thread. The thread is responsible for writing the rows it has
+ inserted/updated/deleted.
+*/
+
+#ifndef MYSQL_CLIENT
+#ifdef HAVE_ROW_BASED_REPLICATION
+
+/*
+ Template member function for ensuring that there is an rows log
+ event of the apropriate type before proceeding.
+
+ PRE CONDITION:
+ - Events of type 'RowEventT' have the type code 'type_code'.
+
+ POST CONDITION:
+ If a non-NULL pointer is returned, the pending event for thread 'thd' will
+ be an event of type 'RowEventT' (which have the type code 'type_code')
+ will either empty or have enough space to hold 'needed' bytes. In
+ addition, the columns bitmap will be correct for the row, meaning that
+ the pending event will be flushed if the columns in the event differ from
+ the columns suppled to the function.
+
+ RETURNS
+ If no error, a non-NULL pending event (either one which already existed or
+ the newly created one).
+ If error, NULL.
+ */
+
+template <class RowsEventT> Rows_log_event*
+THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id,
+ MY_BITMAP const* cols,
+ my_size_t colcnt,
+ my_size_t needed,
+ bool is_transactional)
+{
+ /* Pre-conditions */
+ DBUG_ASSERT(table->s->table_map_id != ULONG_MAX);
+
+ /* Fetch the type code for the RowsEventT template parameter */
+ int const type_code= RowsEventT::TYPE_CODE;
+
+ /*
+ There is no good place to set up the transactional data, so we
+ have to do it here.
+ */
+ if (binlog_setup_trx_data())
+ return NULL;
+
+ Rows_log_event* pending= binlog_get_pending_rows_event();
+
+ if (unlikely(pending && !pending->is_valid()))
+ return NULL;
+
+ /*
+ Check if the current event is non-NULL and a write-rows
+ event. Also check if the table provided is mapped: if it is not,
+ then we have switched to writing to a new table.
+ If there is no pending event, we need to create one. If there is a pending
+ event, but it's not about the same table id, or not of the same type
+ (between Write, Update and Delete), or not the same affected columns, or
+ going to be too big, flush this event to disk and create a new pending
+ event.
+ */
+ if (!pending ||
+ pending->server_id != serv_id ||
+ pending->get_table_id() != table->s->table_map_id ||
+ pending->get_type_code() != type_code ||
+ pending->get_data_size() + needed > opt_binlog_rows_event_max_size ||
+ pending->get_width() != colcnt ||
+ !bitmap_cmp(pending->get_cols(), cols))
+ {
+ /* Create a new RowsEventT... */
+ Rows_log_event* const
+ ev= new RowsEventT(this, table, table->s->table_map_id, cols,
+ is_transactional);
+ if (unlikely(!ev))
+ return NULL;
+ ev->server_id= serv_id; // I don't like this, it's too easy to forget.
+ /*
+ flush the pending event and replace it with the newly created
+ event...
+ */
+ if (unlikely(mysql_bin_log.flush_and_set_pending_rows_event(this, ev)))
+ {
+ delete ev;
+ return NULL;
+ }
+
+ return ev; /* This is the new pending event */
+ }
+ return pending; /* This is the current pending event */
+}
+
+/*
+ Instansiate the versions we need, we have -fno-implicit-template as
+ compiling option.
+*/
+template Rows_log_event*
+THD::binlog_prepare_pending_rows_event<Write_rows_log_event>
+(TABLE*, uint32, MY_BITMAP const*, my_size_t colcnt, size_t, bool);
+
+template Rows_log_event*
+THD::binlog_prepare_pending_rows_event<Delete_rows_log_event>
+(TABLE*, uint32, MY_BITMAP const*, my_size_t colcnt, size_t, bool);
+
+template Rows_log_event*
+THD::binlog_prepare_pending_rows_event<Update_rows_log_event>
+(TABLE*, uint32, MY_BITMAP const*, my_size_t colcnt, size_t, bool);
+
+static char const*
+field_type_name(enum_field_types type)
+{
+ switch (type)
+ {
+ case MYSQL_TYPE_DECIMAL:
+ return "MYSQL_TYPE_DECIMAL";
+ case MYSQL_TYPE_TINY:
+ return "MYSQL_TYPE_TINY";
+ case MYSQL_TYPE_SHORT:
+ return "MYSQL_TYPE_SHORT";
+ case MYSQL_TYPE_LONG:
+ return "MYSQL_TYPE_LONG";
+ case MYSQL_TYPE_FLOAT:
+ return "MYSQL_TYPE_FLOAT";
+ case MYSQL_TYPE_DOUBLE:
+ return "MYSQL_TYPE_DOUBLE";
+ case MYSQL_TYPE_NULL:
+ return "MYSQL_TYPE_NULL";
+ case MYSQL_TYPE_TIMESTAMP:
+ return "MYSQL_TYPE_TIMESTAMP";
+ case MYSQL_TYPE_LONGLONG:
+ return "MYSQL_TYPE_LONGLONG";
+ case MYSQL_TYPE_INT24:
+ return "MYSQL_TYPE_INT24";
+ case MYSQL_TYPE_DATE:
+ return "MYSQL_TYPE_DATE";
+ case MYSQL_TYPE_TIME:
+ return "MYSQL_TYPE_TIME";
+ case MYSQL_TYPE_DATETIME:
+ return "MYSQL_TYPE_DATETIME";
+ case MYSQL_TYPE_YEAR:
+ return "MYSQL_TYPE_YEAR";
+ case MYSQL_TYPE_NEWDATE:
+ return "MYSQL_TYPE_NEWDATE";
+ case MYSQL_TYPE_VARCHAR:
+ return "MYSQL_TYPE_VARCHAR";
+ case MYSQL_TYPE_BIT:
+ return "MYSQL_TYPE_BIT";
+ case MYSQL_TYPE_NEWDECIMAL:
+ return "MYSQL_TYPE_NEWDECIMAL";
+ case MYSQL_TYPE_ENUM:
+ return "MYSQL_TYPE_ENUM";
+ case MYSQL_TYPE_SET:
+ return "MYSQL_TYPE_SET";
+ case MYSQL_TYPE_TINY_BLOB:
+ return "MYSQL_TYPE_TINY_BLOB";
+ case MYSQL_TYPE_MEDIUM_BLOB:
+ return "MYSQL_TYPE_MEDIUM_BLOB";
+ case MYSQL_TYPE_LONG_BLOB:
+ return "MYSQL_TYPE_LONG_BLOB";
+ case MYSQL_TYPE_BLOB:
+ return "MYSQL_TYPE_BLOB";
+ case MYSQL_TYPE_VAR_STRING:
+ return "MYSQL_TYPE_VAR_STRING";
+ case MYSQL_TYPE_STRING:
+ return "MYSQL_TYPE_STRING";
+ case MYSQL_TYPE_GEOMETRY:
+ return "MYSQL_TYPE_GEOMETRY";
+ }
+ return "Unknown";
+}
+
+my_size_t THD::max_row_length_blob(TABLE *table, const byte *data) const
+{
+ my_size_t length= 0;
+ TABLE_SHARE *table_s= table->s;
+ uint* const beg= table_s->blob_field;
+ uint* const end= beg + table_s->blob_fields;
+
+ for (uint *ptr= beg ; ptr != end ; ++ptr)
+ {
+ Field_blob* const blob= (Field_blob*) table->field[*ptr];
+ length+= blob->get_length(data + blob->offset()) + 2;
+ }
+
+ return length;
+}
+
+my_size_t THD::pack_row(TABLE *table, MY_BITMAP const* cols, byte *row_data,
+ const byte *record) const
+{
+ Field **p_field= table->field, *field= *p_field;
+ int n_null_bytes= table->s->null_bytes;
+ my_ptrdiff_t const offset= record - (byte*) table->record[0];
+
+ memcpy(row_data, record, n_null_bytes);
+ byte *ptr= row_data+n_null_bytes;
+
+ for (int i= 0 ; field ; i++, p_field++, field= *p_field)
+ {
+ if (bitmap_is_set(cols,i))
+ ptr= field->pack(ptr, field->ptr + offset);
+ }
+
+ /*
+ my_ptrdiff_t is signed, size_t is unsigned. Assert that the
+ conversion will work correctly.
+ */
+ DBUG_ASSERT(ptr - row_data >= 0);
+ return (static_cast<size_t>(ptr - row_data));
+}
+
+int THD::binlog_write_row(TABLE* table, bool is_trans,
+ MY_BITMAP const* cols, my_size_t colcnt,
+ byte const *record)
+{
+ DBUG_ASSERT(binlog_row_based && mysql_bin_log.is_open());
+
+ /*
+ Pack records into format for transfer. We are allocating more
+ memory than needed, but that doesn't matter.
+ */
+ bool error= 0;
+ byte *row_data= table->write_row_record;
+ my_size_t const max_len= max_row_length(table, record);
+
+ /*
+ * Allocate room for a row (if needed)
+ */
+ if (!row_data)
+ {
+ if (!table->s->blob_fields)
+ {
+ /* multiply max_len by 2 so it can be used for update_row as well */
+ table->write_row_record= alloc_root(&table->mem_root, 2*max_len);
+ if (!table->write_row_record)
+ return HA_ERR_OUT_OF_MEM;
+ row_data= table->write_row_record;
+ }
+ else if (unlikely(!(row_data= my_malloc(max_len, MYF(MY_WME)))))
+ return HA_ERR_OUT_OF_MEM;
+ }
+ my_size_t const len= pack_row(table, cols, row_data, record);
+
+ Rows_log_event* const
+ ev= binlog_prepare_pending_rows_event<Write_rows_log_event>
+ (table, server_id, cols, colcnt, len, is_trans);
+
+ /* add_row_data copies row_data to internal buffer */
+ error= likely(ev != 0) ? ev->add_row_data(row_data,len) : HA_ERR_OUT_OF_MEM ;
+
+ if (table->write_row_record == 0)
+ my_free(row_data, MYF(MY_WME));
+
+ return error;
+}
+
+int THD::binlog_update_row(TABLE* table, bool is_trans,
+ MY_BITMAP const* cols, my_size_t colcnt,
+ const byte *before_record,
+ const byte *after_record)
+{
+ DBUG_ASSERT(binlog_row_based && mysql_bin_log.is_open());
+
+ bool error= 0;
+ my_size_t const before_maxlen = max_row_length(table, before_record);
+ my_size_t const after_maxlen = max_row_length(table, after_record);
+
+ byte *row_data= table->write_row_record;
+ byte *before_row, *after_row;
+ if (row_data != 0)
+ {
+ before_row= row_data;
+ after_row= before_row + before_maxlen;
+ }
+ else
+ {
+ if (unlikely(!(row_data= my_multi_malloc(MYF(MY_WME),
+ &before_row, before_maxlen,
+ &after_row, after_maxlen,
+ NULL))))
+ return HA_ERR_OUT_OF_MEM;
+ }
+
+ my_size_t const before_size= pack_row(table, cols, before_row,
+ before_record);
+ my_size_t const after_size= pack_row(table, cols, after_row,
+ after_record);
+
+ Rows_log_event* const
+ ev= binlog_prepare_pending_rows_event<Update_rows_log_event>
+ (table, server_id, cols, colcnt, before_size + after_size, is_trans);
+
+ error= (unlikely(!ev)) || ev->add_row_data(before_row, before_size) ||
+ ev->add_row_data(after_row, after_size);
+
+ if (!table->write_row_record)
+ {
+ /* add_row_data copies row_data to internal buffer */
+ my_free(row_data, MYF(MY_WME));
+ }
+
+ return error;
+}
+
+int THD::binlog_delete_row(TABLE* table, bool is_trans,
+ MY_BITMAP const* cols, my_size_t colcnt,
+ byte const *record)
+{
+ DBUG_ASSERT(binlog_row_based && mysql_bin_log.is_open());
+
+ /*
+ Pack records into format for transfer. We are allocating more
+ memory than needed, but that doesn't matter.
+ */
+ bool error= 0;
+ my_size_t const max_len= max_row_length(table, record);
+ byte *row_data= table->write_row_record;
+ if (!row_data && unlikely(!(row_data= my_malloc(max_len, MYF(MY_WME)))))
+ return HA_ERR_OUT_OF_MEM;
+ my_size_t const len= pack_row(table, cols, row_data, record);
+
+ Rows_log_event* const
+ ev= binlog_prepare_pending_rows_event<Delete_rows_log_event>
+ (table, server_id, cols, colcnt, len, is_trans);
+
+ error= (unlikely(!ev)) || ev->add_row_data(row_data, len);
+
+ /* add_row_data copies row_data */
+ if (table->write_row_record == 0)
+ my_free(row_data, MYF(MY_WME));
+
+ return error;
+}
+
+
+int THD::binlog_flush_pending_rows_event(bool stmt_end)
+{
+ DBUG_ENTER("THD::binlog_flush_pending_rows_event");
+ if (!binlog_row_based || !mysql_bin_log.is_open())
+ DBUG_RETURN(0);
+
+ /*
+ Mark the event as the last event of a statement if the stmt_end
+ flag is set.
+ */
+ int error= 0;
+ if (Rows_log_event *pending= binlog_get_pending_rows_event())
+ {
+ if (stmt_end)
+ {
+ pending->set_flags(Rows_log_event::STMT_END_F);
+ pending->flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
+ }
+
+ /*
+ We only bother to set the pending event if it is non-NULL. This
+ is essential for correctness, since there is not necessarily a
+ trx_data created for the thread if the pending event is NULL.
+ */
+ error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0);
+ }
+
+ DBUG_RETURN(error);
+}
+
+
+void THD::binlog_delete_pending_rows_event()
+{
+ if (Rows_log_event *pending= binlog_get_pending_rows_event())
+ {
+ delete pending;
+ binlog_set_pending_rows_event(0);
+ }
+}
+
+#endif /* HAVE_ROW_BASED_REPLICATION */
+
+/*
+ Member function that will log query, either row-based or
+ statement-based depending on the value of the 'binlog_row_based'
+ variable and the value of the 'qtype' flag.
+
+ This function should be called after the all calls to ha_*_row()
+ functions have been issued, but before tables are unlocked and
+ closed.
+
+ RETURN VALUE
+ Error code, or 0 if no error.
+*/
+int THD::binlog_query(THD::enum_binlog_query_type qtype,
+ char const *query, ulong query_len,
+ bool is_trans, bool suppress_use)
+{
+ DBUG_ENTER("THD::binlog_query");
+ DBUG_ASSERT(query && mysql_bin_log.is_open());
+ int error= binlog_flush_pending_rows_event(true);
+ switch (qtype)
+ {
+ case THD::MYSQL_QUERY_TYPE:
+ /*
+ Using this query type is a conveniece hack, since we have been
+ moving back and forth between using RBR for replication of
+ system tables and not using it.
+
+ Make sure to change in check_table_binlog_row_based() according
+ to how you treat this.
+ */
+ case THD::ROW_QUERY_TYPE:
+ if (binlog_row_based)
+ DBUG_RETURN(binlog_flush_pending_rows_event(true));
+ /* Otherwise, we fall through */
+ case THD::STMT_QUERY_TYPE:
+ /*
+ Most callers of binlog_query() ignore the error code, assuming
+ that the statement will always be written to the binlog. In
+ case of error above, we therefore just continue and write the
+ statement to the binary log.
+ */
+ {
+ Query_log_event qinfo(this, query, query_len, is_trans, suppress_use);
+ qinfo.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
+ DBUG_RETURN(mysql_bin_log.write(&qinfo));
+ }
+ break;
+
+ case THD::QUERY_TYPE_COUNT:
+ default:
+ DBUG_ASSERT(0 <= qtype && qtype < QUERY_TYPE_COUNT);
+ }
+ DBUG_RETURN(0);
+}
+
+#endif /* !defined(MYSQL_CLIENT) */