diff options
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r-- | sql/log_event.cc | 250 |
1 files changed, 194 insertions, 56 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index 16290c58685..d45fa1e1ba7 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -840,10 +840,9 @@ Log_event::do_shall_skip(Relay_log_info *rli) if ((server_id == ::server_id && !rli->replicate_same_server_id) || (rli->slave_skip_counter == 1 && rli->is_in_group())) return EVENT_SKIP_IGNORE; - else if (rli->slave_skip_counter > 0) + if (rli->slave_skip_counter > 0) return EVENT_SKIP_COUNT; - else - return EVENT_SKIP_NOT; + return EVENT_SKIP_NOT; } @@ -1140,7 +1139,7 @@ failed my_b_read")); goto err; } if ((res= read_log_event(buf, data_len, &error, description_event))) - res->register_temp_buf(buf); + res->register_temp_buf(buf, TRUE); err: UNLOCK_MUTEX; @@ -1655,11 +1654,11 @@ log_event_print_value(IO_CACHE *file, const uchar *ptr, int i, end; char buff[512], *pos; pos= buff; - pos+= sprintf(buff, "%s", dec.sign() ? "-" : ""); + pos+= my_sprintf(buff, (buff, "%s", dec.sign() ? "-" : "")); end= ROUND_UP(dec.frac) + ROUND_UP(dec.intg)-1; for (i=0; i < end; i++) - pos+= sprintf(pos, "%09d.", dec.buf[i]); - pos+= sprintf(pos, "%09d", dec.buf[i]); + pos+= my_sprintf(pos, (pos, "%09d.", dec.buf[i])); + pos+= my_sprintf(pos, (pos, "%09d", dec.buf[i])); my_b_printf(file, "%s", buff); my_snprintf(typestr, typestr_length, "DECIMAL(%d,%d)", precision, decimals); @@ -1708,13 +1707,13 @@ log_event_print_value(IO_CACHE *file, const uchar *ptr, case MYSQL_TYPE_DATETIME: { - size_t d, t; + ulong d, t; uint64 i64= uint8korr(ptr); /* YYYYMMDDhhmmss */ - d= i64 / 1000000; - t= i64 % 1000000; + d= (ulong) (i64 / 1000000); + t= (ulong) (i64 % 1000000); my_b_printf(file, "%04d-%02d-%02d %02d:%02d:%02d", - d / 10000, (d % 10000) / 100, d % 100, - t / 10000, (t % 10000) / 100, t % 100); + (int) (d / 10000), (int) (d % 10000) / 100, (int) (d % 100), + (int) (t / 10000), (int) (t % 10000) / 100, (int) t % 100); my_snprintf(typestr, typestr_length, "DATETIME"); return 8; } @@ -2165,7 +2164,7 @@ static void write_str_with_code_and_len(uchar **dst, const char *src, */ DBUG_ASSERT(len <= 255); DBUG_ASSERT(src); - *((*dst)++)= code; + *((*dst)++)= (uchar) code; *((*dst)++)= (uchar) len; bmove(*dst, src, len); (*dst)+= len; @@ -4755,7 +4754,7 @@ int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli, */ lex_start(thd); thd->lex->local_file= local_fname; - mysql_reset_thd_for_next_command(thd); + mysql_reset_thd_for_next_command(thd, 0); if (!use_rli_only_for_errors) { @@ -5628,8 +5627,9 @@ void User_var_log_event::pack_info(Protocol* protocol) } break; case ROW_RESULT: + case IMPOSSIBLE_RESULT: default: - DBUG_ASSERT(1); + DBUG_ASSERT(0); return; } } @@ -5743,8 +5743,9 @@ bool User_var_log_event::write(IO_CACHE* file) pos= (uchar*) val; break; case ROW_RESULT: + case IMPOSSIBLE_RESULT: default: - DBUG_ASSERT(1); + DBUG_ASSERT(0); return 0; } int4store(buf1 + 2 + UV_CHARSET_NUMBER_SIZE, val_len); @@ -5864,7 +5865,7 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) break; case ROW_RESULT: default: - DBUG_ASSERT(1); + DBUG_ASSERT(0); return; } } @@ -5926,8 +5927,9 @@ int User_var_log_event::do_apply_event(Relay_log_info const *rli) it= new Item_string(val, val_len, charset); break; case ROW_RESULT: + case IMPOSSIBLE_RESULT: default: - DBUG_ASSERT(1); + DBUG_ASSERT(0); return 0; } } @@ -6368,7 +6370,7 @@ void Create_file_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) void Create_file_log_event::pack_info(Protocol *protocol) { - char buf[NAME_LEN*2 + 30 + 21*2], *pos; + char buf[SAFE_NAME_LEN*2 + 30 + 21*2], *pos; pos= strmov(buf, "db="); memcpy(pos, db, db_len); pos= strmov(pos + db_len, ";table="); @@ -6554,9 +6556,10 @@ void Append_block_log_event::print(FILE* file, void Append_block_log_event::pack_info(Protocol *protocol) { char buf[256]; - size_t length; - length= my_snprintf(buf, sizeof(buf), ";file_id=%u;block_len=%u", - file_id, block_len); + uint length; + length= (uint) my_sprintf(buf, + (buf, ";file_id=%u;block_len=%u", file_id, + block_len)); protocol->store(buf, length, &my_charset_bin); } @@ -6591,7 +6594,7 @@ int Append_block_log_event::do_apply_event(Relay_log_info const *rli) as the present method does not call mysql_parse(). */ lex_start(thd); - mysql_reset_thd_for_next_command(thd); + mysql_reset_thd_for_next_command(thd, 0); /* old copy may exist already */ mysql_file_delete(key_file_log_event_data, fname, MYF(0)); if ((fd= mysql_file_create(key_file_log_event_data, @@ -6711,9 +6714,9 @@ void Delete_file_log_event::print(FILE* file, void Delete_file_log_event::pack_info(Protocol *protocol) { char buf[64]; - size_t length; - length= my_snprintf(buf, sizeof(buf), ";file_id=%u", (uint) file_id); - protocol->store(buf, length, &my_charset_bin); + uint length; + length= (uint) my_sprintf(buf, (buf, ";file_id=%u", (uint) file_id)); + protocol->store(buf, (int32) length, &my_charset_bin); } #endif @@ -6809,9 +6812,9 @@ void Execute_load_log_event::print(FILE* file, void Execute_load_log_event::pack_info(Protocol *protocol) { char buf[64]; - size_t length; - length= my_snprintf(buf, sizeof(buf), ";file_id=%u", (uint) file_id); - protocol->store(buf, length, &my_charset_bin); + uint length; + length= (uint) my_sprintf(buf, (buf, ";file_id=%u", (uint) file_id)); + protocol->store(buf, (int32) length, &my_charset_bin); } @@ -7445,7 +7448,7 @@ int Rows_log_event::do_add_row_data(uchar *row_data, size_t length) Don't print debug messages when running valgrind since they can trigger false warnings. */ -#ifndef HAVE_purify +#ifndef HAVE_valgrind DBUG_DUMP("row_data", row_data, min(length, 32)); #endif @@ -7537,7 +7540,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) we need to do any changes to that value after this function. */ lex_start(thd); - mysql_reset_thd_for_next_command(thd); + mysql_reset_thd_for_next_command(thd, 0); /* The current statement is just about to begin and has not yet modified anything. Note, all.modified is reset @@ -8242,7 +8245,7 @@ Table_map_log_event::Table_map_log_event(const char *buf, uint event_len, Don't print debug messages when running valgrind since they can trigger false warnings. */ -#ifndef HAVE_purify +#ifndef HAVE_valgrind DBUG_DUMP("event buffer", (uchar*) buf, event_len); #endif @@ -8330,6 +8333,111 @@ Table_map_log_event::~Table_map_log_event() my_free(m_memory); } + +#ifdef MYSQL_CLIENT + +/* + Rewrite database name for the event to name specified by new_db + SYNOPSIS + new_db Database name to change to + new_len Length + desc Event describing binlog that we're writing to. + + DESCRIPTION + Reset db name. This function assumes that temp_buf member contains event + representation taken from a binary log. It resets m_dbnam and m_dblen and + rewrites temp_buf with new db name. + + RETURN + 0 - Success + other - Error +*/ + +int Table_map_log_event::rewrite_db(const char* new_db, size_t new_len, + const Format_description_log_event* desc) +{ + DBUG_ENTER("Table_map_log_event::rewrite_db"); + DBUG_ASSERT(temp_buf); + + uint header_len= min(desc->common_header_len, + LOG_EVENT_MINIMAL_HEADER_LEN) + TABLE_MAP_HEADER_LEN; + int len_diff; + + if (!(len_diff= new_len - m_dblen)) + { + memcpy((void*) (temp_buf + header_len + 1), new_db, m_dblen + 1); + memcpy((void*) m_dbnam, new_db, m_dblen + 1); + DBUG_RETURN(0); + } + + // Create new temp_buf + ulong event_cur_len= uint4korr(temp_buf + EVENT_LEN_OFFSET); + ulong event_new_len= event_cur_len + len_diff; + char* new_temp_buf= (char*) my_malloc(event_new_len, MYF(MY_WME)); + + if (!new_temp_buf) + { + sql_print_error("Table_map_log_event::rewrite_db: " + "failed to allocate new temp_buf (%d bytes required)", + event_new_len); + DBUG_RETURN(-1); + } + + // Rewrite temp_buf + char* ptr= new_temp_buf; + ulong cnt= 0; + + // Copy header and change event length + memcpy(ptr, temp_buf, header_len); + int4store(ptr + EVENT_LEN_OFFSET, event_new_len); + ptr += header_len; + cnt += header_len; + + // Write new db name length and new name + *ptr++ = new_len; + memcpy(ptr, new_db, new_len + 1); + ptr += new_len + 1; + cnt += m_dblen + 2; + + // Copy rest part + memcpy(ptr, temp_buf + cnt, event_cur_len - cnt); + + // Reregister temp buf + free_temp_buf(); + register_temp_buf(new_temp_buf, TRUE); + + // Reset m_dbnam and m_dblen members + m_dblen= new_len; + + // m_dbnam resides in m_memory together with m_tblnam and m_coltype + uchar* memory= m_memory; + char const* tblnam= m_tblnam; + uchar* coltype= m_coltype; + + m_memory= (uchar*) my_multi_malloc(MYF(MY_WME), + &m_dbnam, (uint) m_dblen + 1, + &m_tblnam, (uint) m_tbllen + 1, + &m_coltype, (uint) m_colcnt, + NullS); + + if (!m_memory) + { + sql_print_error("Table_map_log_event::rewrite_db: " + "failed to allocate new m_memory (%d + %d + %d bytes required)", + m_dblen + 1, m_tbllen + 1, m_colcnt); + DBUG_RETURN(-1); + } + + memcpy((void*)m_dbnam, new_db, m_dblen + 1); + memcpy((void*)m_tblnam, tblnam, m_tbllen + 1); + memcpy(m_coltype, coltype, m_colcnt); + + my_free(memory, MYF(MY_WME)); + DBUG_RETURN(0); +} +#endif /* MYSQL_CLIENT */ + + /* Return value is an error code, one of: @@ -8805,7 +8913,7 @@ Rows_log_event::write_row(const Relay_log_info *const rli, if (table->file->ha_table_flags() & HA_DUPLICATE_POS) { DBUG_PRINT("info",("Locating offending record using rnd_pos()")); - error= table->file->rnd_pos(table->record[1], table->file->dup_ref); + error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref); if (error) { DBUG_PRINT("info",("rnd_pos() returns error %d",error)); @@ -8837,10 +8945,10 @@ Rows_log_event::write_row(const Relay_log_info *const rli, key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum, 0); - error= table->file->index_read_idx_map(table->record[1], keynum, - (const uchar*)key.get(), - HA_WHOLE_KEY, - HA_READ_KEY_EXACT); + error= table->file->ha_index_read_idx_map(table->record[1], keynum, + (const uchar*)key.get(), + HA_WHOLE_KEY, + HA_READ_KEY_EXACT); if (error) { DBUG_PRINT("info",("index_read_idx() returns %s", HA_ERR(error))); @@ -9062,10 +9170,10 @@ record_compare_exit: /** Locate the current row in event's table. - The current row is pointed by @c m_curr_row. Member @c m_width tells how many - columns are there in the row (this can be differnet from the number of columns - in the table). It is assumed that event's table is already open and pointed - by @c m_table. + The current row is pointed by @c m_curr_row. Member @c m_width tells + how many columns are there in the row (this can be differnet from + the number of columns in the table). It is assumed that event's + table is already open and pointed by @c m_table. If a corresponding record is found in the table it is stored in @c m_table->record[0]. Note that when record is located based on a primary @@ -9126,13 +9234,14 @@ int Rows_log_event::find_row(const Relay_log_info *rli) length. Something along these lines should work: ADD>>> store_record(table,record[1]); - int error= table->file->rnd_pos(table->record[0], table->file->ref); + int error= table->file->ha_rnd_pos(table->record[0], + table->file->ref); ADD>>> DBUG_ASSERT(memcmp(table->record[1], table->record[0], table->s->reclength) == 0); */ DBUG_PRINT("info",("locating record using primary key (position)")); - int error= table->file->rnd_pos_by_record(table->record[0]); + int error= table->file->ha_rnd_pos_by_record(table->record[0]); if (error) { DBUG_PRINT("info",("rnd_pos returns error %d",error)); @@ -9178,7 +9287,7 @@ int Rows_log_event::find_row(const Relay_log_info *rli) Don't print debug messages when running valgrind since they can trigger false warnings. */ -#ifndef HAVE_purify +#ifndef HAVE_valgrind DBUG_DUMP("key data", m_key, table->key_info->key_length); #endif @@ -9192,9 +9301,9 @@ int Rows_log_event::find_row(const Relay_log_info *rli) table->record[0][table->s->null_bytes - 1]|= 256U - (1U << table->s->last_null_bit_pos); - if ((error= table->file->index_read_map(table->record[0], m_key, - HA_WHOLE_KEY, - HA_READ_KEY_EXACT))) + if ((error= table->file->ha_index_read_map(table->record[0], m_key, + HA_WHOLE_KEY, + HA_READ_KEY_EXACT))) { DBUG_PRINT("info",("no record matching the key found in the table")); if (error == HA_ERR_RECORD_DELETED) @@ -9208,7 +9317,7 @@ int Rows_log_event::find_row(const Relay_log_info *rli) Don't print debug messages when running valgrind since they can trigger false warnings. */ -#ifndef HAVE_purify +#ifndef HAVE_valgrind DBUG_PRINT("info",("found first matching record")); DBUG_DUMP("record[0]", table->record[0], table->s->reclength); #endif @@ -9283,7 +9392,7 @@ int Rows_log_event::find_row(const Relay_log_info *rli) 256U - (1U << table->s->last_null_bit_pos); } - while ((error= table->file->index_next(table->record[0]))) + while ((error= table->file->ha_index_next(table->record[0]))) { /* We just skip records that has already been deleted */ if (error == HA_ERR_RECORD_DELETED) @@ -9307,11 +9416,10 @@ int Rows_log_event::find_row(const Relay_log_info *rli) int restart_count= 0; // Number of times scanning has restarted from top /* We don't have a key: search the table using rnd_next() */ - if ((error= table->file->ha_rnd_init(1))) + if ((error= table->file->ha_rnd_init_with_error(1))) { DBUG_PRINT("info",("error initializing table scan" " (ha_rnd_init returns %d)",error)); - table->file->print_error(error, MYF(0)); goto err; } @@ -9319,7 +9427,7 @@ int Rows_log_event::find_row(const Relay_log_info *rli) do { restart_rnd_next: - error= table->file->rnd_next(table->record[0]); + error= table->file->ha_rnd_next(table->record[0]); DBUG_PRINT("info", ("error: %s", HA_ERR(error))); switch (error) { @@ -9336,7 +9444,14 @@ int Rows_log_event::find_row(const Relay_log_info *rli) case HA_ERR_END_OF_FILE: if (++restart_count < 2) - table->file->ha_rnd_init(1); + { + int error2; + if ((error2= table->file->ha_rnd_init_with_error(1))) + { + error= error2; + goto err; + } + } break; default: @@ -9602,7 +9717,7 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli) Now we have the right row to update. The old row (the one we're looking for) is in record[1] and the new row is in record[0]. */ -#ifndef HAVE_purify +#ifndef HAVE_valgrind /* Don't print debug messages when running valgrind since they can trigger false warnings. @@ -9680,7 +9795,6 @@ Incident_log_event::description() const }; DBUG_PRINT("info", ("m_incident: %d", m_incident)); - return description[m_incident]; } @@ -9776,7 +9890,6 @@ st_print_event_info::st_print_event_info() } #endif - #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) Heartbeat_log_event::Heartbeat_log_event(const char* buf, uint event_len, const Format_description_log_event* description_event) @@ -9788,3 +9901,28 @@ Heartbeat_log_event::Heartbeat_log_event(const char* buf, uint event_len, log_ident= buf + header_size; } #endif + +#if defined(MYSQL_SERVER) +/* + Access to the current replication position. + + There is a dummy replacement for this in the embedded library that returns + FALSE; this is used by XtraDB to allow it to access replication stuff while + still being able to use the same plugin in both stand-alone and embedded. +*/ +bool rpl_get_position_info(const char **log_file_name, ulonglong *log_pos, + const char **group_relay_log_name, + ulonglong *relay_log_pos) +{ +#if defined(EMBEDDED_LIBRARY) || !defined(HAVE_REPLICATION) + return FALSE; +#else + const Relay_log_info *rli= &(active_mi->rli); + *log_file_name= rli->group_master_log_name; + *log_pos= rli->group_master_log_pos + + (rli->future_event_relay_log_pos - rli->group_relay_log_pos); + *group_relay_log_name= rli->group_relay_log_name; + *relay_log_pos= rli->future_event_relay_log_pos; + return TRUE; +#endif +} |