summaryrefslogtreecommitdiff
path: root/sql/log_event.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r--sql/log_event.cc250
1 files changed, 194 insertions, 56 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 16290c58685..d45fa1e1ba7 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -840,10 +840,9 @@ Log_event::do_shall_skip(Relay_log_info *rli)
if ((server_id == ::server_id && !rli->replicate_same_server_id) ||
(rli->slave_skip_counter == 1 && rli->is_in_group()))
return EVENT_SKIP_IGNORE;
- else if (rli->slave_skip_counter > 0)
+ if (rli->slave_skip_counter > 0)
return EVENT_SKIP_COUNT;
- else
- return EVENT_SKIP_NOT;
+ return EVENT_SKIP_NOT;
}
@@ -1140,7 +1139,7 @@ failed my_b_read"));
goto err;
}
if ((res= read_log_event(buf, data_len, &error, description_event)))
- res->register_temp_buf(buf);
+ res->register_temp_buf(buf, TRUE);
err:
UNLOCK_MUTEX;
@@ -1655,11 +1654,11 @@ log_event_print_value(IO_CACHE *file, const uchar *ptr,
int i, end;
char buff[512], *pos;
pos= buff;
- pos+= sprintf(buff, "%s", dec.sign() ? "-" : "");
+ pos+= my_sprintf(buff, (buff, "%s", dec.sign() ? "-" : ""));
end= ROUND_UP(dec.frac) + ROUND_UP(dec.intg)-1;
for (i=0; i < end; i++)
- pos+= sprintf(pos, "%09d.", dec.buf[i]);
- pos+= sprintf(pos, "%09d", dec.buf[i]);
+ pos+= my_sprintf(pos, (pos, "%09d.", dec.buf[i]));
+ pos+= my_sprintf(pos, (pos, "%09d", dec.buf[i]));
my_b_printf(file, "%s", buff);
my_snprintf(typestr, typestr_length, "DECIMAL(%d,%d)",
precision, decimals);
@@ -1708,13 +1707,13 @@ log_event_print_value(IO_CACHE *file, const uchar *ptr,
case MYSQL_TYPE_DATETIME:
{
- size_t d, t;
+ ulong d, t;
uint64 i64= uint8korr(ptr); /* YYYYMMDDhhmmss */
- d= i64 / 1000000;
- t= i64 % 1000000;
+ d= (ulong) (i64 / 1000000);
+ t= (ulong) (i64 % 1000000);
my_b_printf(file, "%04d-%02d-%02d %02d:%02d:%02d",
- d / 10000, (d % 10000) / 100, d % 100,
- t / 10000, (t % 10000) / 100, t % 100);
+ (int) (d / 10000), (int) (d % 10000) / 100, (int) (d % 100),
+ (int) (t / 10000), (int) (t % 10000) / 100, (int) t % 100);
my_snprintf(typestr, typestr_length, "DATETIME");
return 8;
}
@@ -2165,7 +2164,7 @@ static void write_str_with_code_and_len(uchar **dst, const char *src,
*/
DBUG_ASSERT(len <= 255);
DBUG_ASSERT(src);
- *((*dst)++)= code;
+ *((*dst)++)= (uchar) code;
*((*dst)++)= (uchar) len;
bmove(*dst, src, len);
(*dst)+= len;
@@ -4755,7 +4754,7 @@ int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli,
*/
lex_start(thd);
thd->lex->local_file= local_fname;
- mysql_reset_thd_for_next_command(thd);
+ mysql_reset_thd_for_next_command(thd, 0);
if (!use_rli_only_for_errors)
{
@@ -5628,8 +5627,9 @@ void User_var_log_event::pack_info(Protocol* protocol)
}
break;
case ROW_RESULT:
+ case IMPOSSIBLE_RESULT:
default:
- DBUG_ASSERT(1);
+ DBUG_ASSERT(0);
return;
}
}
@@ -5743,8 +5743,9 @@ bool User_var_log_event::write(IO_CACHE* file)
pos= (uchar*) val;
break;
case ROW_RESULT:
+ case IMPOSSIBLE_RESULT:
default:
- DBUG_ASSERT(1);
+ DBUG_ASSERT(0);
return 0;
}
int4store(buf1 + 2 + UV_CHARSET_NUMBER_SIZE, val_len);
@@ -5864,7 +5865,7 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
break;
case ROW_RESULT:
default:
- DBUG_ASSERT(1);
+ DBUG_ASSERT(0);
return;
}
}
@@ -5926,8 +5927,9 @@ int User_var_log_event::do_apply_event(Relay_log_info const *rli)
it= new Item_string(val, val_len, charset);
break;
case ROW_RESULT:
+ case IMPOSSIBLE_RESULT:
default:
- DBUG_ASSERT(1);
+ DBUG_ASSERT(0);
return 0;
}
}
@@ -6368,7 +6370,7 @@ void Create_file_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
void Create_file_log_event::pack_info(Protocol *protocol)
{
- char buf[NAME_LEN*2 + 30 + 21*2], *pos;
+ char buf[SAFE_NAME_LEN*2 + 30 + 21*2], *pos;
pos= strmov(buf, "db=");
memcpy(pos, db, db_len);
pos= strmov(pos + db_len, ";table=");
@@ -6554,9 +6556,10 @@ void Append_block_log_event::print(FILE* file,
void Append_block_log_event::pack_info(Protocol *protocol)
{
char buf[256];
- size_t length;
- length= my_snprintf(buf, sizeof(buf), ";file_id=%u;block_len=%u",
- file_id, block_len);
+ uint length;
+ length= (uint) my_sprintf(buf,
+ (buf, ";file_id=%u;block_len=%u", file_id,
+ block_len));
protocol->store(buf, length, &my_charset_bin);
}
@@ -6591,7 +6594,7 @@ int Append_block_log_event::do_apply_event(Relay_log_info const *rli)
as the present method does not call mysql_parse().
*/
lex_start(thd);
- mysql_reset_thd_for_next_command(thd);
+ mysql_reset_thd_for_next_command(thd, 0);
/* old copy may exist already */
mysql_file_delete(key_file_log_event_data, fname, MYF(0));
if ((fd= mysql_file_create(key_file_log_event_data,
@@ -6711,9 +6714,9 @@ void Delete_file_log_event::print(FILE* file,
void Delete_file_log_event::pack_info(Protocol *protocol)
{
char buf[64];
- size_t length;
- length= my_snprintf(buf, sizeof(buf), ";file_id=%u", (uint) file_id);
- protocol->store(buf, length, &my_charset_bin);
+ uint length;
+ length= (uint) my_sprintf(buf, (buf, ";file_id=%u", (uint) file_id));
+ protocol->store(buf, (int32) length, &my_charset_bin);
}
#endif
@@ -6809,9 +6812,9 @@ void Execute_load_log_event::print(FILE* file,
void Execute_load_log_event::pack_info(Protocol *protocol)
{
char buf[64];
- size_t length;
- length= my_snprintf(buf, sizeof(buf), ";file_id=%u", (uint) file_id);
- protocol->store(buf, length, &my_charset_bin);
+ uint length;
+ length= (uint) my_sprintf(buf, (buf, ";file_id=%u", (uint) file_id));
+ protocol->store(buf, (int32) length, &my_charset_bin);
}
@@ -7445,7 +7448,7 @@ int Rows_log_event::do_add_row_data(uchar *row_data, size_t length)
Don't print debug messages when running valgrind since they can
trigger false warnings.
*/
-#ifndef HAVE_purify
+#ifndef HAVE_valgrind
DBUG_DUMP("row_data", row_data, min(length, 32));
#endif
@@ -7537,7 +7540,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
we need to do any changes to that value after this function.
*/
lex_start(thd);
- mysql_reset_thd_for_next_command(thd);
+ mysql_reset_thd_for_next_command(thd, 0);
/*
The current statement is just about to begin and
has not yet modified anything. Note, all.modified is reset
@@ -8242,7 +8245,7 @@ Table_map_log_event::Table_map_log_event(const char *buf, uint event_len,
Don't print debug messages when running valgrind since they can
trigger false warnings.
*/
-#ifndef HAVE_purify
+#ifndef HAVE_valgrind
DBUG_DUMP("event buffer", (uchar*) buf, event_len);
#endif
@@ -8330,6 +8333,111 @@ Table_map_log_event::~Table_map_log_event()
my_free(m_memory);
}
+
+#ifdef MYSQL_CLIENT
+
+/*
+ Rewrite database name for the event to name specified by new_db
+ SYNOPSIS
+ new_db Database name to change to
+ new_len Length
+ desc Event describing binlog that we're writing to.
+
+ DESCRIPTION
+ Reset db name. This function assumes that temp_buf member contains event
+ representation taken from a binary log. It resets m_dbnam and m_dblen and
+ rewrites temp_buf with new db name.
+
+ RETURN
+ 0 - Success
+ other - Error
+*/
+
+int Table_map_log_event::rewrite_db(const char* new_db, size_t new_len,
+ const Format_description_log_event* desc)
+{
+ DBUG_ENTER("Table_map_log_event::rewrite_db");
+ DBUG_ASSERT(temp_buf);
+
+ uint header_len= min(desc->common_header_len,
+ LOG_EVENT_MINIMAL_HEADER_LEN) + TABLE_MAP_HEADER_LEN;
+ int len_diff;
+
+ if (!(len_diff= new_len - m_dblen))
+ {
+ memcpy((void*) (temp_buf + header_len + 1), new_db, m_dblen + 1);
+ memcpy((void*) m_dbnam, new_db, m_dblen + 1);
+ DBUG_RETURN(0);
+ }
+
+ // Create new temp_buf
+ ulong event_cur_len= uint4korr(temp_buf + EVENT_LEN_OFFSET);
+ ulong event_new_len= event_cur_len + len_diff;
+ char* new_temp_buf= (char*) my_malloc(event_new_len, MYF(MY_WME));
+
+ if (!new_temp_buf)
+ {
+ sql_print_error("Table_map_log_event::rewrite_db: "
+ "failed to allocate new temp_buf (%d bytes required)",
+ event_new_len);
+ DBUG_RETURN(-1);
+ }
+
+ // Rewrite temp_buf
+ char* ptr= new_temp_buf;
+ ulong cnt= 0;
+
+ // Copy header and change event length
+ memcpy(ptr, temp_buf, header_len);
+ int4store(ptr + EVENT_LEN_OFFSET, event_new_len);
+ ptr += header_len;
+ cnt += header_len;
+
+ // Write new db name length and new name
+ *ptr++ = new_len;
+ memcpy(ptr, new_db, new_len + 1);
+ ptr += new_len + 1;
+ cnt += m_dblen + 2;
+
+ // Copy rest part
+ memcpy(ptr, temp_buf + cnt, event_cur_len - cnt);
+
+ // Reregister temp buf
+ free_temp_buf();
+ register_temp_buf(new_temp_buf, TRUE);
+
+ // Reset m_dbnam and m_dblen members
+ m_dblen= new_len;
+
+ // m_dbnam resides in m_memory together with m_tblnam and m_coltype
+ uchar* memory= m_memory;
+ char const* tblnam= m_tblnam;
+ uchar* coltype= m_coltype;
+
+ m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
+ &m_dbnam, (uint) m_dblen + 1,
+ &m_tblnam, (uint) m_tbllen + 1,
+ &m_coltype, (uint) m_colcnt,
+ NullS);
+
+ if (!m_memory)
+ {
+ sql_print_error("Table_map_log_event::rewrite_db: "
+ "failed to allocate new m_memory (%d + %d + %d bytes required)",
+ m_dblen + 1, m_tbllen + 1, m_colcnt);
+ DBUG_RETURN(-1);
+ }
+
+ memcpy((void*)m_dbnam, new_db, m_dblen + 1);
+ memcpy((void*)m_tblnam, tblnam, m_tbllen + 1);
+ memcpy(m_coltype, coltype, m_colcnt);
+
+ my_free(memory, MYF(MY_WME));
+ DBUG_RETURN(0);
+}
+#endif /* MYSQL_CLIENT */
+
+
/*
Return value is an error code, one of:
@@ -8805,7 +8913,7 @@ Rows_log_event::write_row(const Relay_log_info *const rli,
if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
{
DBUG_PRINT("info",("Locating offending record using rnd_pos()"));
- error= table->file->rnd_pos(table->record[1], table->file->dup_ref);
+ error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref);
if (error)
{
DBUG_PRINT("info",("rnd_pos() returns error %d",error));
@@ -8837,10 +8945,10 @@ Rows_log_event::write_row(const Relay_log_info *const rli,
key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
0);
- error= table->file->index_read_idx_map(table->record[1], keynum,
- (const uchar*)key.get(),
- HA_WHOLE_KEY,
- HA_READ_KEY_EXACT);
+ error= table->file->ha_index_read_idx_map(table->record[1], keynum,
+ (const uchar*)key.get(),
+ HA_WHOLE_KEY,
+ HA_READ_KEY_EXACT);
if (error)
{
DBUG_PRINT("info",("index_read_idx() returns %s", HA_ERR(error)));
@@ -9062,10 +9170,10 @@ record_compare_exit:
/**
Locate the current row in event's table.
- The current row is pointed by @c m_curr_row. Member @c m_width tells how many
- columns are there in the row (this can be differnet from the number of columns
- in the table). It is assumed that event's table is already open and pointed
- by @c m_table.
+ The current row is pointed by @c m_curr_row. Member @c m_width tells
+ how many columns are there in the row (this can be differnet from
+ the number of columns in the table). It is assumed that event's
+ table is already open and pointed by @c m_table.
If a corresponding record is found in the table it is stored in
@c m_table->record[0]. Note that when record is located based on a primary
@@ -9126,13 +9234,14 @@ int Rows_log_event::find_row(const Relay_log_info *rli)
length. Something along these lines should work:
ADD>>> store_record(table,record[1]);
- int error= table->file->rnd_pos(table->record[0], table->file->ref);
+ int error= table->file->ha_rnd_pos(table->record[0],
+ table->file->ref);
ADD>>> DBUG_ASSERT(memcmp(table->record[1], table->record[0],
table->s->reclength) == 0);
*/
DBUG_PRINT("info",("locating record using primary key (position)"));
- int error= table->file->rnd_pos_by_record(table->record[0]);
+ int error= table->file->ha_rnd_pos_by_record(table->record[0]);
if (error)
{
DBUG_PRINT("info",("rnd_pos returns error %d",error));
@@ -9178,7 +9287,7 @@ int Rows_log_event::find_row(const Relay_log_info *rli)
Don't print debug messages when running valgrind since they can
trigger false warnings.
*/
-#ifndef HAVE_purify
+#ifndef HAVE_valgrind
DBUG_DUMP("key data", m_key, table->key_info->key_length);
#endif
@@ -9192,9 +9301,9 @@ int Rows_log_event::find_row(const Relay_log_info *rli)
table->record[0][table->s->null_bytes - 1]|=
256U - (1U << table->s->last_null_bit_pos);
- if ((error= table->file->index_read_map(table->record[0], m_key,
- HA_WHOLE_KEY,
- HA_READ_KEY_EXACT)))
+ if ((error= table->file->ha_index_read_map(table->record[0], m_key,
+ HA_WHOLE_KEY,
+ HA_READ_KEY_EXACT)))
{
DBUG_PRINT("info",("no record matching the key found in the table"));
if (error == HA_ERR_RECORD_DELETED)
@@ -9208,7 +9317,7 @@ int Rows_log_event::find_row(const Relay_log_info *rli)
Don't print debug messages when running valgrind since they can
trigger false warnings.
*/
-#ifndef HAVE_purify
+#ifndef HAVE_valgrind
DBUG_PRINT("info",("found first matching record"));
DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
#endif
@@ -9283,7 +9392,7 @@ int Rows_log_event::find_row(const Relay_log_info *rli)
256U - (1U << table->s->last_null_bit_pos);
}
- while ((error= table->file->index_next(table->record[0])))
+ while ((error= table->file->ha_index_next(table->record[0])))
{
/* We just skip records that has already been deleted */
if (error == HA_ERR_RECORD_DELETED)
@@ -9307,11 +9416,10 @@ int Rows_log_event::find_row(const Relay_log_info *rli)
int restart_count= 0; // Number of times scanning has restarted from top
/* We don't have a key: search the table using rnd_next() */
- if ((error= table->file->ha_rnd_init(1)))
+ if ((error= table->file->ha_rnd_init_with_error(1)))
{
DBUG_PRINT("info",("error initializing table scan"
" (ha_rnd_init returns %d)",error));
- table->file->print_error(error, MYF(0));
goto err;
}
@@ -9319,7 +9427,7 @@ int Rows_log_event::find_row(const Relay_log_info *rli)
do
{
restart_rnd_next:
- error= table->file->rnd_next(table->record[0]);
+ error= table->file->ha_rnd_next(table->record[0]);
DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
switch (error) {
@@ -9336,7 +9444,14 @@ int Rows_log_event::find_row(const Relay_log_info *rli)
case HA_ERR_END_OF_FILE:
if (++restart_count < 2)
- table->file->ha_rnd_init(1);
+ {
+ int error2;
+ if ((error2= table->file->ha_rnd_init_with_error(1)))
+ {
+ error= error2;
+ goto err;
+ }
+ }
break;
default:
@@ -9602,7 +9717,7 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
Now we have the right row to update. The old row (the one we're
looking for) is in record[1] and the new row is in record[0].
*/
-#ifndef HAVE_purify
+#ifndef HAVE_valgrind
/*
Don't print debug messages when running valgrind since they can
trigger false warnings.
@@ -9680,7 +9795,6 @@ Incident_log_event::description() const
};
DBUG_PRINT("info", ("m_incident: %d", m_incident));
-
return description[m_incident];
}
@@ -9776,7 +9890,6 @@ st_print_event_info::st_print_event_info()
}
#endif
-
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
Heartbeat_log_event::Heartbeat_log_event(const char* buf, uint event_len,
const Format_description_log_event* description_event)
@@ -9788,3 +9901,28 @@ Heartbeat_log_event::Heartbeat_log_event(const char* buf, uint event_len,
log_ident= buf + header_size;
}
#endif
+
+#if defined(MYSQL_SERVER)
+/*
+ Access to the current replication position.
+
+ There is a dummy replacement for this in the embedded library that returns
+ FALSE; this is used by XtraDB to allow it to access replication stuff while
+ still being able to use the same plugin in both stand-alone and embedded.
+*/
+bool rpl_get_position_info(const char **log_file_name, ulonglong *log_pos,
+ const char **group_relay_log_name,
+ ulonglong *relay_log_pos)
+{
+#if defined(EMBEDDED_LIBRARY) || !defined(HAVE_REPLICATION)
+ return FALSE;
+#else
+ const Relay_log_info *rli= &(active_mi->rli);
+ *log_file_name= rli->group_master_log_name;
+ *log_pos= rli->group_master_log_pos +
+ (rli->future_event_relay_log_pos - rli->group_relay_log_pos);
+ *group_relay_log_name= rli->group_relay_log_name;
+ *relay_log_pos= rli->future_event_relay_log_pos;
+ return TRUE;
+#endif
+}