summaryrefslogtreecommitdiff
path: root/sql/log_event.cc
diff options
context:
space:
mode:
authorunknown <mkindahl@dl145h.mysql.com>2008-01-31 17:46:50 +0100
committerunknown <mkindahl@dl145h.mysql.com>2008-01-31 17:46:50 +0100
commit248c752b2a320d62d198043866e973ebcdda436c (patch)
treec327f90b44e25f92a820d5886ba6220cf6d9c2c8 /sql/log_event.cc
parent24cda91887c1a230b4bc70e859879caede1f909e (diff)
parent0a51f0ba27351180209631c52d254bbd2e607a0b (diff)
downloadmariadb-git-248c752b2a320d62d198043866e973ebcdda436c.tar.gz
Merge dl145h.mysql.com:/data0/mkindahl/mysql-5.1
into dl145h.mysql.com:/data0/mkindahl/mysql-5.1-rpl-merge client/client_priv.h: Auto merged include/my_sys.h: Auto merged mysql-test/mysql-test-run.pl: Auto merged mysql-test/lib/mtr_report.pl: Auto merged mysql-test/suite/rpl/t/rpl_err_ignoredtable.test: Auto merged sql/item_cmpfunc.cc: Auto merged sql/log_event.cc: Auto merged sql/mysql_priv.h: Auto merged sql/mysqld.cc: Auto merged sql/set_var.cc: Auto merged sql/set_var.h: Auto merged sql/slave.cc: Auto merged sql/sql_acl.cc: Auto merged sql/sql_class.h: Auto merged sql/sql_parse.cc: Auto merged sql/sql_repl.cc: Auto merged sql/sql_view.cc: Auto merged mysql-test/suite/rpl/r/rpl_invoked_features.result: Manual merge. mysql-test/suite/rpl/t/rpl_invoked_features.test: Manual merge. sql/log.cc: Manual merge.
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r--sql/log_event.cc571
1 files changed, 377 insertions, 194 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index cf03dd5bf44..df0d1e8a020 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -36,7 +36,17 @@
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
-#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) && !defined(DBUG_OFF) && !defined(_lint)
+
+/*
+ Size of buffer for printing a double in format %.<PREC>g
+
+ optional '-' + optional zero + '.' + PREC digits + 'e' + sign +
+ exponent digits + '\0'
+*/
+#define FMT_G_BUFSIZE(PREC) (3 + (PREC) + 5 + 1)
+
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
static const char *HA_ERR(int i)
{
switch (i) {
@@ -90,7 +100,28 @@ static const char *HA_ERR(int i)
case HA_ERR_LOGGING_IMPOSSIBLE: return "HA_ERR_LOGGING_IMPOSSIBLE";
case HA_ERR_CORRUPT_EVENT: return "HA_ERR_CORRUPT_EVENT";
}
- return "<unknown error>";
+ return 0;
+}
+
+/**
+ macro to call from different branches of Rows_log_event::do_apply_event
+*/
+static void inline slave_rows_error_report(enum loglevel level, int ha_error,
+ Relay_log_info const *rli, THD *thd,
+ TABLE *table, const char * type,
+ const char *log_name, ulong pos)
+{
+ const char *handler_error= HA_ERR(ha_error);
+ rli->report(level, thd->net.client_last_errno,
+ "Could not execute %s event on table %s.%s;"
+ "%s%s handler error %s; "
+ "the event's master log %s, end_log_pos %lu",
+ type, table->s->db.str,
+ table->s->table_name.str,
+ thd->net.client_last_error[0] != 0 ? thd->net.client_last_error : "",
+ thd->net.client_last_error[0] != 0 ? ";" : "",
+ handler_error == NULL? "<unknown>" : handler_error,
+ log_name, pos);
}
#endif
@@ -460,9 +491,9 @@ static void print_set_option(IO_CACHE* file, uint32 bits_changed,
returns the human readable name of the event's type
*/
-const char* Log_event::get_type_str()
+const char* Log_event::get_type_str(Log_event_type type)
{
- switch(get_type_code()) {
+ switch(type) {
case START_EVENT_V3: return "Start_v3";
case STOP_EVENT: return "Stop";
case QUERY_EVENT: return "Query";
@@ -480,6 +511,9 @@ const char* Log_event::get_type_str()
case USER_VAR_EVENT: return "User var";
case FORMAT_DESCRIPTION_EVENT: return "Format_desc";
case TABLE_MAP_EVENT: return "Table_map";
+ case PRE_GA_WRITE_ROWS_EVENT: return "Write_rows_event_old";
+ case PRE_GA_UPDATE_ROWS_EVENT: return "Update_rows_event_old";
+ case PRE_GA_DELETE_ROWS_EVENT: return "Delete_rows_event_old";
case WRITE_ROWS_EVENT: return "Write_rows";
case UPDATE_ROWS_EVENT: return "Update_rows";
case DELETE_ROWS_EVENT: return "Delete_rows";
@@ -490,6 +524,11 @@ const char* Log_event::get_type_str()
}
}
+const char* Log_event::get_type_str()
+{
+ return get_type_str(get_type_code());
+}
+
/*
Log_event::Log_event()
@@ -997,6 +1036,8 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
DBUG_ENTER("Log_event::read_log_event(char*,...)");
DBUG_ASSERT(description_event != 0);
DBUG_PRINT("info", ("binlog_version: %d", description_event->binlog_version));
+ DBUG_DUMP("data", (unsigned char*) buf, event_len);
+
/* Check the integrity */
if (event_len < EVENT_LEN_OFFSET ||
buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT ||
@@ -1006,94 +1047,134 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
DBUG_RETURN(NULL); // general sanity check - will fail on a partial read
}
- switch(buf[EVENT_TYPE_OFFSET]) {
- case QUERY_EVENT:
- ev = new Query_log_event(buf, event_len, description_event, QUERY_EVENT);
- break;
- case LOAD_EVENT:
- ev = new Load_log_event(buf, event_len, description_event);
- break;
- case NEW_LOAD_EVENT:
- ev = new Load_log_event(buf, event_len, description_event);
- break;
- case ROTATE_EVENT:
- ev = new Rotate_log_event(buf, event_len, description_event);
- break;
+ uint event_type= buf[EVENT_TYPE_OFFSET];
+ if (event_type > description_event->number_of_event_types &&
+ event_type != FORMAT_DESCRIPTION_EVENT)
+ {
+ /*
+ It is unsafe to use the description_event if its post_header_len
+ array does not include the event type.
+ */
+ DBUG_PRINT("error", ("event type %d found, but the current "
+ "Format_description_log_event supports only %d event "
+ "types", event_type,
+ description_event->number_of_event_types));
+ ev= NULL;
+ }
+ else
+ {
+ /*
+ In some previuos versions (see comment in
+ Format_description_log_event::Format_description_log_event(char*,...)),
+ event types were assigned different id numbers than in the
+ present version. In order to replicate from such versions to the
+ present version, we must map those event type id's to our event
+ type id's. The mapping is done with the event_type_permutation
+ array, which was set up when the Format_description_log_event
+ was read.
+ */
+ if (description_event->event_type_permutation)
+ {
+ IF_DBUG({
+ int new_event_type=
+ description_event->event_type_permutation[event_type];
+ DBUG_PRINT("info",
+ ("converting event type %d to %d (%s)",
+ event_type, new_event_type,
+ get_type_str((Log_event_type)new_event_type)));
+ });
+ event_type= description_event->event_type_permutation[event_type];
+ }
+
+ switch(event_type) {
+ case QUERY_EVENT:
+ ev = new Query_log_event(buf, event_len, description_event, QUERY_EVENT);
+ break;
+ case LOAD_EVENT:
+ ev = new Load_log_event(buf, event_len, description_event);
+ break;
+ case NEW_LOAD_EVENT:
+ ev = new Load_log_event(buf, event_len, description_event);
+ break;
+ case ROTATE_EVENT:
+ ev = new Rotate_log_event(buf, event_len, description_event);
+ break;
#ifdef HAVE_REPLICATION
- case SLAVE_EVENT: /* can never happen (unused event) */
- ev = new Slave_log_event(buf, event_len);
- break;
+ case SLAVE_EVENT: /* can never happen (unused event) */
+ ev = new Slave_log_event(buf, event_len);
+ break;
#endif /* HAVE_REPLICATION */
- case CREATE_FILE_EVENT:
- ev = new Create_file_log_event(buf, event_len, description_event);
- break;
- case APPEND_BLOCK_EVENT:
- ev = new Append_block_log_event(buf, event_len, description_event);
- break;
- case DELETE_FILE_EVENT:
- ev = new Delete_file_log_event(buf, event_len, description_event);
- break;
- case EXEC_LOAD_EVENT:
- ev = new Execute_load_log_event(buf, event_len, description_event);
- break;
- case START_EVENT_V3: /* this is sent only by MySQL <=4.x */
- ev = new Start_log_event_v3(buf, description_event);
- break;
- case STOP_EVENT:
- ev = new Stop_log_event(buf, description_event);
- break;
- case INTVAR_EVENT:
- ev = new Intvar_log_event(buf, description_event);
- break;
- case XID_EVENT:
- ev = new Xid_log_event(buf, description_event);
- break;
- case RAND_EVENT:
- ev = new Rand_log_event(buf, description_event);
- break;
- case USER_VAR_EVENT:
- ev = new User_var_log_event(buf, description_event);
- break;
- case FORMAT_DESCRIPTION_EVENT:
- ev = new Format_description_log_event(buf, event_len, description_event);
- break;
+ case CREATE_FILE_EVENT:
+ ev = new Create_file_log_event(buf, event_len, description_event);
+ break;
+ case APPEND_BLOCK_EVENT:
+ ev = new Append_block_log_event(buf, event_len, description_event);
+ break;
+ case DELETE_FILE_EVENT:
+ ev = new Delete_file_log_event(buf, event_len, description_event);
+ break;
+ case EXEC_LOAD_EVENT:
+ ev = new Execute_load_log_event(buf, event_len, description_event);
+ break;
+ case START_EVENT_V3: /* this is sent only by MySQL <=4.x */
+ ev = new Start_log_event_v3(buf, description_event);
+ break;
+ case STOP_EVENT:
+ ev = new Stop_log_event(buf, description_event);
+ break;
+ case INTVAR_EVENT:
+ ev = new Intvar_log_event(buf, description_event);
+ break;
+ case XID_EVENT:
+ ev = new Xid_log_event(buf, description_event);
+ break;
+ case RAND_EVENT:
+ ev = new Rand_log_event(buf, description_event);
+ break;
+ case USER_VAR_EVENT:
+ ev = new User_var_log_event(buf, description_event);
+ break;
+ case FORMAT_DESCRIPTION_EVENT:
+ ev = new Format_description_log_event(buf, event_len, description_event);
+ break;
#if defined(HAVE_REPLICATION)
- case PRE_GA_WRITE_ROWS_EVENT:
- ev = new Write_rows_log_event_old(buf, event_len, description_event);
- break;
- case PRE_GA_UPDATE_ROWS_EVENT:
- ev = new Update_rows_log_event_old(buf, event_len, description_event);
- break;
- case PRE_GA_DELETE_ROWS_EVENT:
- ev = new Delete_rows_log_event_old(buf, event_len, description_event);
- break;
- case WRITE_ROWS_EVENT:
- ev = new Write_rows_log_event(buf, event_len, description_event);
- break;
- case UPDATE_ROWS_EVENT:
- ev = new Update_rows_log_event(buf, event_len, description_event);
- break;
- case DELETE_ROWS_EVENT:
- ev = new Delete_rows_log_event(buf, event_len, description_event);
- break;
- case TABLE_MAP_EVENT:
- ev = new Table_map_log_event(buf, event_len, description_event);
- break;
+ case PRE_GA_WRITE_ROWS_EVENT:
+ ev = new Write_rows_log_event_old(buf, event_len, description_event);
+ break;
+ case PRE_GA_UPDATE_ROWS_EVENT:
+ ev = new Update_rows_log_event_old(buf, event_len, description_event);
+ break;
+ case PRE_GA_DELETE_ROWS_EVENT:
+ ev = new Delete_rows_log_event_old(buf, event_len, description_event);
+ break;
+ case WRITE_ROWS_EVENT:
+ ev = new Write_rows_log_event(buf, event_len, description_event);
+ break;
+ case UPDATE_ROWS_EVENT:
+ ev = new Update_rows_log_event(buf, event_len, description_event);
+ break;
+ case DELETE_ROWS_EVENT:
+ ev = new Delete_rows_log_event(buf, event_len, description_event);
+ break;
+ case TABLE_MAP_EVENT:
+ ev = new Table_map_log_event(buf, event_len, description_event);
+ break;
#endif
- case BEGIN_LOAD_QUERY_EVENT:
- ev = new Begin_load_query_log_event(buf, event_len, description_event);
- break;
- case EXECUTE_LOAD_QUERY_EVENT:
- ev= new Execute_load_query_log_event(buf, event_len, description_event);
- break;
- case INCIDENT_EVENT:
- ev = new Incident_log_event(buf, event_len, description_event);
- break;
- default:
- DBUG_PRINT("error",("Unknown event code: %d",
- (int) buf[EVENT_TYPE_OFFSET]));
- ev= NULL;
- break;
+ case BEGIN_LOAD_QUERY_EVENT:
+ ev = new Begin_load_query_log_event(buf, event_len, description_event);
+ break;
+ case EXECUTE_LOAD_QUERY_EVENT:
+ ev= new Execute_load_query_log_event(buf, event_len, description_event);
+ break;
+ case INCIDENT_EVENT:
+ ev = new Incident_log_event(buf, event_len, description_event);
+ break;
+ default:
+ DBUG_PRINT("error",("Unknown event code: %d",
+ (int) buf[EVENT_TYPE_OFFSET]));
+ ev= NULL;
+ break;
+ }
}
DBUG_PRINT("read_event", ("%s(type_code: %d; event_len: %d)",
@@ -1632,6 +1713,7 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
DBUG_ASSERT(thd_arg->variables.character_set_client->number < 256*256);
DBUG_ASSERT(thd_arg->variables.collation_connection->number < 256*256);
DBUG_ASSERT(thd_arg->variables.collation_server->number < 256*256);
+ DBUG_ASSERT(thd_arg->variables.character_set_client->mbminlen == 1);
int2store(charset, thd_arg->variables.character_set_client->number);
int2store(charset+2, thd_arg->variables.collation_connection->number);
int2store(charset+4, thd_arg->variables.collation_server->number);
@@ -2135,7 +2217,7 @@ void Query_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
print_query_header(&cache, print_event_info);
my_b_write(&cache, (uchar*) query, q_len);
- my_b_printf(&cache, "%s\n", print_event_info->delimiter);
+ my_b_printf(&cache, "\n%s\n", print_event_info->delimiter);
}
#endif /* MYSQL_CLIENT */
@@ -2580,6 +2662,14 @@ void Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
my_b_printf(&cache,"ROLLBACK%s\n", print_event_info->delimiter);
#endif
}
+ if (temp_buf &&
+ print_event_info->base64_output_mode != BASE64_OUTPUT_NEVER &&
+ !print_event_info->short_form)
+ {
+ my_b_printf(&cache, "BINLOG '\n");
+ print_base64(&cache, print_event_info, FALSE);
+ print_event_info->printed_fd_event= TRUE;
+ }
DBUG_VOID_RETURN;
}
#endif /* MYSQL_CLIENT */
@@ -2715,7 +2805,7 @@ int Start_log_event_v3::do_apply_event(Relay_log_info const *rli)
Format_description_log_event::
Format_description_log_event(uint8 binlog_ver, const char* server_ver)
- :Start_log_event_v3()
+ :Start_log_event_v3(), event_type_permutation(0)
{
binlog_version= binlog_ver;
switch (binlog_ver) {
@@ -2842,7 +2932,7 @@ Format_description_log_event(const char* buf,
const
Format_description_log_event*
description_event)
- :Start_log_event_v3(buf, description_event)
+ :Start_log_event_v3(buf, description_event), event_type_permutation(0)
{
DBUG_ENTER("Format_description_log_event::Format_description_log_event(char*,...)");
buf+= LOG_EVENT_MINIMAL_HEADER_LEN;
@@ -2857,6 +2947,65 @@ Format_description_log_event(const char* buf,
number_of_event_types*
sizeof(*post_header_len), MYF(0));
calc_server_version_split();
+
+ /*
+ In some previous versions, the events were given other event type
+ id numbers than in the present version. When replicating from such
+ a version, we therefore set up an array that maps those id numbers
+ to the id numbers of the present server.
+
+ If post_header_len is null, it means malloc failed, and is_valid
+ will fail, so there is no need to do anything.
+
+ The trees which have wrong event id's are:
+ mysql-5.1-wl2325-5.0-drop6p13-alpha, mysql-5.1-wl2325-5.0-drop6,
+ mysql-5.1-wl2325-5.0, mysql-5.1-wl2325-no-dd (`grep -C2
+ BEGIN_LOAD_QUERY_EVENT /home/bk/ * /sql/log_event.h`). The
+ corresponding version (`grep mysql, configure.in` in those trees)
+ strings are 5.2.2-a_drop6p13-alpha, 5.2.2-a_drop6p13c,
+ 5.1.5-a_drop5p20, 5.1.2-a_drop5p5.
+ */
+ if (post_header_len &&
+ (strncmp(server_version, "5.1.2-a_drop5", 13) == 0 ||
+ strncmp(server_version, "5.1.5-a_drop5", 13) == 0 ||
+ strncmp(server_version, "5.2.2-a_drop6", 13) == 0))
+ {
+ if (number_of_event_types != 22)
+ {
+ DBUG_PRINT("info", (" number_of_event_types=%d",
+ number_of_event_types));
+ /* this makes is_valid() return false. */
+ my_free(post_header_len, MYF(MY_ALLOW_ZERO_PTR));
+ post_header_len= NULL;
+ DBUG_VOID_RETURN;
+ }
+ static const uint8 perm[23]=
+ {
+ UNKNOWN_EVENT, START_EVENT_V3, QUERY_EVENT, STOP_EVENT, ROTATE_EVENT,
+ INTVAR_EVENT, LOAD_EVENT, SLAVE_EVENT, CREATE_FILE_EVENT,
+ APPEND_BLOCK_EVENT, EXEC_LOAD_EVENT, DELETE_FILE_EVENT,
+ NEW_LOAD_EVENT,
+ RAND_EVENT, USER_VAR_EVENT,
+ FORMAT_DESCRIPTION_EVENT,
+ TABLE_MAP_EVENT,
+ PRE_GA_WRITE_ROWS_EVENT,
+ PRE_GA_UPDATE_ROWS_EVENT,
+ PRE_GA_DELETE_ROWS_EVENT,
+ XID_EVENT,
+ BEGIN_LOAD_QUERY_EVENT,
+ EXECUTE_LOAD_QUERY_EVENT,
+ };
+ event_type_permutation= perm;
+ /*
+ Since we use (permuted) event id's to index the post_header_len
+ array, we need to permute the post_header_len array too.
+ */
+ uint8 post_header_len_temp[23];
+ for (int i= 1; i < 23; i++)
+ post_header_len_temp[perm[i] - 1]= post_header_len[i - 1];
+ for (int i= 0; i < 22; i++)
+ post_header_len[i] = post_header_len_temp[i];
+ }
DBUG_VOID_RETURN;
}
@@ -4267,6 +4416,7 @@ Xid_log_event(const char* buf,
#ifndef MYSQL_CLIENT
bool Xid_log_event::write(IO_CACHE* file)
{
+ DBUG_EXECUTE_IF("do_not_write_xid", return 0;);
return write_header(file, sizeof(xid)) ||
my_b_safe_write(file, (uchar*) &xid, sizeof(xid));
}
@@ -4520,8 +4670,10 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
switch (type) {
case REAL_RESULT:
double real_val;
+ char real_buf[FMT_G_BUFSIZE(14)];
float8get(real_val, val);
- my_b_printf(&cache, ":=%.14g%s\n", real_val, print_event_info->delimiter);
+ my_sprintf(real_buf, (real_buf, "%.14g", real_val));
+ my_b_printf(&cache, ":=%s%s\n", real_buf, print_event_info->delimiter);
break;
case INT_RESULT:
char int_buf[22];
@@ -4925,7 +5077,7 @@ Create_file_log_event(THD* thd_arg, sql_exchange* ex,
const char* db_arg, const char* table_name_arg,
List<Item>& fields_arg, enum enum_duplicates handle_dup,
bool ignore,
- char* block_arg, uint block_len_arg, bool using_trans)
+ uchar* block_arg, uint block_len_arg, bool using_trans)
:Load_log_event(thd_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup, ignore,
using_trans),
fake_base(0), block(block_arg), event_buf(0), block_len(block_len_arg),
@@ -5023,8 +5175,8 @@ Create_file_log_event::Create_file_log_event(const char* buf, uint len,
Load_log_event::get_data_size() +
create_file_header_len + 1);
if (len < block_offset)
- return;
- block = (char*)buf + block_offset;
+ DBUG_VOID_RETURN;
+ block = (uchar*)buf + block_offset;
block_len = len - block_offset;
}
else
@@ -5182,7 +5334,7 @@ err:
#ifndef MYSQL_CLIENT
Append_block_log_event::Append_block_log_event(THD *thd_arg,
const char *db_arg,
- char *block_arg,
+ uchar *block_arg,
uint block_len_arg,
bool using_trans)
:Log_event(thd_arg,0, using_trans), block(block_arg),
@@ -5208,7 +5360,7 @@ Append_block_log_event::Append_block_log_event(const char* buf, uint len,
if (len < total_header_len)
DBUG_VOID_RETURN;
file_id= uint4korr(buf + common_header_len + AB_FILE_ID_OFFSET);
- block= (char*)buf + total_header_len;
+ block= (uchar*)buf + total_header_len;
block_len= len - total_header_len;
DBUG_VOID_RETURN;
}
@@ -5600,7 +5752,7 @@ err:
#ifndef MYSQL_CLIENT
Begin_load_query_log_event::
-Begin_load_query_log_event(THD* thd_arg, const char* db_arg, char* block_arg,
+Begin_load_query_log_event(THD* thd_arg, const char* db_arg, uchar* block_arg,
uint block_len_arg, bool using_trans)
:Append_block_log_event(thd_arg, db_arg, block_arg, block_len_arg,
using_trans)
@@ -5732,12 +5884,12 @@ void Execute_load_query_log_event::print(FILE* file,
my_b_printf(&cache, " REPLACE");
my_b_printf(&cache, " INTO");
my_b_write(&cache, (uchar*) query + fn_pos_end, q_len-fn_pos_end);
- my_b_printf(&cache, "%s\n", print_event_info->delimiter);
+ my_b_printf(&cache, "\n%s\n", print_event_info->delimiter);
}
else
{
my_b_write(&cache, (uchar*) query, q_len);
- my_b_printf(&cache, "%s\n", print_event_info->delimiter);
+ my_b_printf(&cache, "\n%s\n", print_event_info->delimiter);
}
if (!print_event_info->short_form)
@@ -6180,7 +6332,6 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
{
DBUG_ENTER("Rows_log_event::do_apply_event(Relay_log_info*)");
int error= 0;
-
/*
If m_table_id == ~0UL, then we have a dummy event that does not
contain any data. In that case, we just remove all tables in the
@@ -6226,6 +6377,24 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
*/
lex_start(thd);
+ /*
+ There are a few flags that are replicated with each row event.
+ Make sure to set/clear them before executing the main body of
+ the event.
+ */
+ if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
+ thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
+ else
+ thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
+
+ if (get_flags(RELAXED_UNIQUE_CHECKS_F))
+ thd->options|= OPTION_RELAXED_UNIQUE_CHECKS;
+ else
+ thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
+ /* A small test to verify that objects have consistent types */
+ DBUG_ASSERT(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
+
+
while ((error= lock_tables(thd, rli->tables_to_lock,
rli->tables_to_lock_count, &need_reopen)))
{
@@ -6360,22 +6529,6 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
So we call set_time(), like in SBR. Presently it changes nothing.
*/
thd->set_time((time_t)when);
- /*
- There are a few flags that are replicated with each row event.
- Make sure to set/clear them before executing the main body of
- the event.
- */
- if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
- thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
- else
- thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
-
- if (get_flags(RELAXED_UNIQUE_CHECKS_F))
- thd->options|= OPTION_RELAXED_UNIQUE_CHECKS;
- else
- thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
- /* A small test to verify that objects have consistent types */
- DBUG_ASSERT(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
/*
Now we are in a statement and will stay in a statement until we
@@ -6408,8 +6561,9 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
if (!get_flags(COMPLETE_ROWS_F))
bitmap_intersect(table->write_set,&m_cols);
+ this->slave_exec_mode= slave_exec_mode_options; // fix the mode
+
// Do event specific preparations
-
error= do_before_row_operations(rli);
// row processing loop
@@ -6428,23 +6582,41 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
{
case 0:
break;
-
- /* Some recoverable errors */
+ /*
+ The following list of "idempotent" errors
+ means that an error from the list might happen
+ because of idempotent (more than once)
+ applying of a binlog file.
+ Notice, that binlog has a ddl operation its
+ second applying may cause
+
+ case HA_ERR_TABLE_DEF_CHANGED:
+ case HA_ERR_CANNOT_ADD_FOREIGN:
+
+ which are not included into to the list.
+ */
case HA_ERR_RECORD_CHANGED:
case HA_ERR_RECORD_DELETED:
case HA_ERR_KEY_NOT_FOUND:
case HA_ERR_END_OF_FILE:
- /* Idempotency support: OK if tuple does not exist */
+ case HA_ERR_FOUND_DUPP_KEY:
+ case HA_ERR_FOUND_DUPP_UNIQUE:
+ case HA_ERR_FOREIGN_DUPLICATE_KEY:
+ case HA_ERR_NO_REFERENCED_ROW:
+ case HA_ERR_ROW_IS_REFERENCED:
+
DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
- error= 0;
+ if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
+ {
+ if (global_system_variables.log_warnings)
+ slave_rows_error_report(WARNING_LEVEL, error, rli, thd, table,
+ get_type_str(),
+ RPL_LOG_NAME, (ulong) log_pos);
+ error= 0;
+ }
break;
-
+
default:
- rli->report(ERROR_LEVEL,
- thd->is_error() ? thd->main_da.sql_errno() : 0,
- "Error in %s event: row application failed. %s",
- get_type_str(),
- thd->is_error() ? thd->main_da.message() : "");
thd->is_slave_error= 1;
break;
}
@@ -6488,17 +6660,14 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
*/
if (rli->tables_to_lock && get_flags(STMT_END_F))
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
-
+
if (error)
{ /* error has occured during the transaction */
- rli->report(ERROR_LEVEL,
- thd->is_error() ? thd->main_da.sql_errno() : 0,
- "Error in %s event: error during transaction execution "
- "on table %s.%s. %s",
- get_type_str(), table->s->db.str,
- table->s->table_name.str,
- thd->is_error() ? thd->main_da.message() : "");
-
+ slave_rows_error_report(ERROR_LEVEL, error, rli, thd, table,
+ get_type_str(), RPL_LOG_NAME, (ulong) log_pos);
+ }
+ if (error)
+ {
/*
If one day we honour --skip-slave-errors in row-based replication, and
the error should be skipped, then we would clear mappings, rollback,
@@ -6610,6 +6779,7 @@ Rows_log_event::do_update_pos(Relay_log_info *rli)
*/
thd->reset_current_stmt_binlog_row_based();
+
rli->cleanup_context(thd, 0);
if (error == 0)
{
@@ -7299,43 +7469,50 @@ Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability
{
int error= 0;
- /*
- We are using REPLACE semantics and not INSERT IGNORE semantics
- when writing rows, that is: new rows replace old rows. We need to
- inform the storage engine that it should use this behaviour.
+ /**
+ todo: to introduce a property for the event (handler?) which forces
+ applying the event in the replace (idempotent) fashion.
*/
+ if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1 ||
+ m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER)
+ {
+ /*
+ We are using REPLACE semantics and not INSERT IGNORE semantics
+ when writing rows, that is: new rows replace old rows. We need to
+ inform the storage engine that it should use this behaviour.
+ */
+
+ /* Tell the storage engine that we are using REPLACE semantics. */
+ thd->lex->duplicates= DUP_REPLACE;
+
+ /*
+ Pretend we're executing a REPLACE command: this is needed for
+ InnoDB and NDB Cluster since they are not (properly) checking the
+ lex->duplicates flag.
+ */
+ thd->lex->sql_command= SQLCOM_REPLACE;
+ /*
+ Do not raise the error flag in case of hitting to an unique attribute
+ */
+ m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
+ /*
+ NDB specific: update from ndb master wrapped as Write_rows
+ so that the event should be applied to replace slave's row
+ */
+ m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
+ /*
+ NDB specific: if update from ndb master wrapped as Write_rows
+ does not find the row it's assumed idempotent binlog applying
+ is taking place; don't raise the error.
+ */
+ m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
+ /*
+ TODO: the cluster team (Tomas?) says that it's better if the engine knows
+ how many rows are going to be inserted, then it can allocate needed memory
+ from the start.
+ */
+ }
- /* Tell the storage engine that we are using REPLACE semantics. */
- thd->lex->duplicates= DUP_REPLACE;
-
- /*
- Pretend we're executing a REPLACE command: this is needed for
- InnoDB and NDB Cluster since they are not (properly) checking the
- lex->duplicates flag.
- */
- thd->lex->sql_command= SQLCOM_REPLACE;
- /*
- Do not raise the error flag in case of hitting to an unique attribute
- */
- m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
- /*
- NDB specific: update from ndb master wrapped as Write_rows
- */
- /*
- so that the event should be applied to replace slave's row
- */
- m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
- /*
- NDB specific: if update from ndb master wrapped as Write_rows
- does not find the row it's assumed idempotent binlog applying
- is taking place; don't raise the error.
- */
- m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
- /*
- TODO: the cluster team (Tomas?) says that it's better if the engine knows
- how many rows are going to be inserted, then it can allocate needed memory
- from the start.
- */
m_table->file->ha_start_bulk_insert(0);
/*
We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill
@@ -7357,18 +7534,23 @@ Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability
}
int
-Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
+Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
int error)
{
int local_error= 0;
- m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
- m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
- /*
- reseting the extra with
- table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
- fires bug#27077
- todo: explain or fix
- */
+ if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1 ||
+ m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER)
+ {
+ m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
+ m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
+ /*
+ resetting the extra with
+ table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
+ fires bug#27077
+ explanation: file->reset() performs this duty
+ ultimately. Still todo: fix
+ */
+ }
if ((local_error= m_table->file->ha_end_bulk_insert()))
{
m_table->file->print_error(local_error, MYF(0));
@@ -7487,23 +7669,22 @@ Rows_log_event::write_row(const Relay_log_info *const rli,
while ((error= table->file->ha_write_row(table->record[0])))
{
- if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT)
- {
- table->file->print_error(error, MYF(0)); /* to check at exec_relay_log_event */
- DBUG_RETURN(error);
- }
- if ((keynum= table->file->get_dup_key(error)) < 0)
+ if (error == HA_ERR_LOCK_DEADLOCK ||
+ error == HA_ERR_LOCK_WAIT_TIMEOUT ||
+ (keynum= table->file->get_dup_key(error)) < 0 ||
+ !overwrite)
{
- DBUG_PRINT("info",("Can't locate duplicate key (get_dup_key returns %d)",keynum));
- table->file->print_error(error, MYF(0));
+ DBUG_PRINT("info",("get_dup_key returns %d)", keynum));
/*
- We failed to retrieve the duplicate key
+ Deadlock, waiting for lock or just an error from the handler
+ such as HA_ERR_FOUND_DUPP_KEY when overwrite is false.
+ Retrieval of the duplicate key number may fail
- either because the error was not "duplicate key" error
- or because the information which key is not available
*/
+ table->file->print_error(error, MYF(0));
DBUG_RETURN(error);
}
-
/*
We need to retrieve the old row into record[1] to be able to
either update or delete the offending record. We either:
@@ -7641,14 +7822,16 @@ int
Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
{
DBUG_ASSERT(m_table != NULL);
- int error= write_row(rli, TRUE /* overwrite */);
-
+ int error=
+ write_row(rli, /* if 1 then overwrite */
+ bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1);
+
if (error && !thd->is_error())
{
DBUG_ASSERT(0);
my_error(ER_UNKNOWN_ERROR, MYF(0));
}
-
+
return error;
}