diff options
author | unknown <mkindahl@dl145h.mysql.com> | 2008-01-31 17:46:50 +0100 |
---|---|---|
committer | unknown <mkindahl@dl145h.mysql.com> | 2008-01-31 17:46:50 +0100 |
commit | a36faa7edaec89c22a5d449a927bf9c11f2fbfde (patch) | |
tree | c327f90b44e25f92a820d5886ba6220cf6d9c2c8 /sql | |
parent | 4bacd53715ac860c6ba2d9c148f87d22cae9c62a (diff) | |
parent | 681bf14ea7a6fedb55d6e59a414d876d4d5f7313 (diff) | |
download | mariadb-git-a36faa7edaec89c22a5d449a927bf9c11f2fbfde.tar.gz |
Merge dl145h.mysql.com:/data0/mkindahl/mysql-5.1
into dl145h.mysql.com:/data0/mkindahl/mysql-5.1-rpl-merge
client/client_priv.h:
Auto merged
include/my_sys.h:
Auto merged
mysql-test/mysql-test-run.pl:
Auto merged
mysql-test/lib/mtr_report.pl:
Auto merged
mysql-test/suite/rpl/t/rpl_err_ignoredtable.test:
Auto merged
sql/item_cmpfunc.cc:
Auto merged
sql/log_event.cc:
Auto merged
sql/mysql_priv.h:
Auto merged
sql/mysqld.cc:
Auto merged
sql/set_var.cc:
Auto merged
sql/set_var.h:
Auto merged
sql/slave.cc:
Auto merged
sql/sql_acl.cc:
Auto merged
sql/sql_class.h:
Auto merged
sql/sql_parse.cc:
Auto merged
sql/sql_repl.cc:
Auto merged
sql/sql_view.cc:
Auto merged
mysql-test/suite/rpl/r/rpl_invoked_features.result:
Manual merge.
mysql-test/suite/rpl/t/rpl_invoked_features.test:
Manual merge.
sql/log.cc:
Manual merge.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/ha_ndbcluster_binlog.cc | 5 | ||||
-rw-r--r-- | sql/item_cmpfunc.cc | 2 | ||||
-rw-r--r-- | sql/log.cc | 102 | ||||
-rw-r--r-- | sql/log_event.cc | 571 | ||||
-rw-r--r-- | sql/log_event.h | 51 | ||||
-rw-r--r-- | sql/log_event_old.cc | 1712 | ||||
-rw-r--r-- | sql/log_event_old.h | 450 | ||||
-rw-r--r-- | sql/mysql_priv.h | 1 | ||||
-rw-r--r-- | sql/mysqld.cc | 19 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 5 | ||||
-rw-r--r-- | sql/rpl_utility.cc | 2 | ||||
-rw-r--r-- | sql/set_var.cc | 100 | ||||
-rw-r--r-- | sql/set_var.h | 40 | ||||
-rw-r--r-- | sql/share/errmsg.txt | 5 | ||||
-rw-r--r-- | sql/slave.cc | 310 | ||||
-rw-r--r-- | sql/slave.h | 2 | ||||
-rw-r--r-- | sql/sql_acl.cc | 27 | ||||
-rw-r--r-- | sql/sql_binlog.cc | 74 | ||||
-rw-r--r-- | sql/sql_class.h | 3 | ||||
-rw-r--r-- | sql/sql_parse.cc | 1 | ||||
-rw-r--r-- | sql/sql_repl.cc | 75 | ||||
-rw-r--r-- | sql/sql_string.cc | 4 | ||||
-rw-r--r-- | sql/sql_view.cc | 29 |
23 files changed, 3105 insertions, 485 deletions
diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc index 07b0d907229..841dce2d832 100644 --- a/sql/ha_ndbcluster_binlog.cc +++ b/sql/ha_ndbcluster_binlog.cc @@ -799,7 +799,7 @@ static int ndbcluster_create_ndb_apply_status_table(THD *thd) " log_name VARCHAR(255) BINARY NOT NULL, " " start_pos BIGINT UNSIGNED NOT NULL, " " end_pos BIGINT UNSIGNED NOT NULL, " - " PRIMARY KEY USING HASH (server_id) ) ENGINE=NDB"); + " PRIMARY KEY USING HASH (server_id) ) ENGINE=NDB CHARACTER SET latin1"); const int no_print_error[5]= {ER_TABLE_EXISTS_ERROR, 701, @@ -860,7 +860,7 @@ static int ndbcluster_create_schema_table(THD *thd) " id INT UNSIGNED NOT NULL," " version INT UNSIGNED NOT NULL," " type INT UNSIGNED NOT NULL," - " PRIMARY KEY USING HASH (db,name) ) ENGINE=NDB"); + " PRIMARY KEY USING HASH (db,name) ) ENGINE=NDB CHARACTER SET latin1"); const int no_print_error[5]= {ER_TABLE_EXISTS_ERROR, 701, @@ -4036,6 +4036,7 @@ restart: i_ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); bzero((char*) &row, sizeof(row)); + thd->variables.character_set_client= &my_charset_latin1; injector::transaction trans; // pass table map before epoch { diff --git a/sql/item_cmpfunc.cc b/sql/item_cmpfunc.cc index 62ac7bc4751..dc868376796 100644 --- a/sql/item_cmpfunc.cc +++ b/sql/item_cmpfunc.cc @@ -975,7 +975,7 @@ get_datetime_value(THD *thd, Item ***item_arg, Item **cache_arg, *is_null= item->null_value; } if (*is_null) - return -1; + return ~(ulonglong) 0; /* Convert strings to the integer DATE/DATETIME representation. Even if both dates provided in strings we can't compare them directly as diff --git a/sql/log.cc b/sql/log.cc index 3a09acd8fca..fce1eb076db 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -1420,6 +1420,21 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all) return 0; } +/** + This function is called once after each statement. + + It has the responsibility to flush the transaction cache to the + binlog file on commits. 
+ + @param hton The binlog handlerton. + @param thd The client thread that executes the transaction. + @param all true if this is the last statement before a COMMIT + statement; false if either this is a statement in a + transaction but not the last, or if this is a statement + not inside a BEGIN block and autocommit is on. + + @see handlerton::commit +*/ static int binlog_commit(handlerton *hton, THD *thd, bool all) { DBUG_ENTER("binlog_commit"); @@ -1432,7 +1447,15 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) trx_data->reset(); DBUG_RETURN(0); } - if (all) + /* + Write commit event if at least one of the following holds: + - the user sends an explicit COMMIT; or + - the autocommit flag is on, and we are not inside a BEGIN. + However, if the user has not sent an explicit COMMIT, and we are + either inside a BEGIN or run with autocommit off, then this is not + the end of a transaction and we should not write a commit event. + */ + if (all || !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) { Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, FALSE); qev.error_code= 0; // see comment in MYSQL_LOG::write(THD, IO_CACHE) @@ -1446,6 +1469,23 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) } } +/** + This function is called when a transaction involving a transactional + table is rolled back. + + It has the responsibility to flush the transaction cache to the + binlog file. However, if the transaction does not involve + non-transactional tables, nothing needs to be logged. + + @param hton The binlog handlerton. + @param thd The client thread that executes the transaction. + @param all true if this is the last statement before a COMMIT + statement; false if either this is a statement in a + transaction but not the last, or if this is a statement + not inside a BEGIN block and autocommit is on. 
+ + @see handlerton::rollback +*/ static int binlog_rollback(handlerton *hton, THD *thd, bool all) { DBUG_ENTER("binlog_rollback"); @@ -4010,32 +4050,42 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event) if (my_b_tell(cache) > 0) { /* - Log "BEGIN" at the beginning of the transaction. - which may contain more than 1 SQL statement. + Log "BEGIN" at the beginning of every transaction. Here, a + transaction is either a BEGIN..COMMIT block or a single + statement in autocommit mode. */ - if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) - { - Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, FALSE); - /* - Imagine this is rollback due to net timeout, after all statements of - the transaction succeeded. Then we want a zero-error code in BEGIN. - In other words, if there was a really serious error code it's already - in the statement's events, there is no need to put it also in this - internally generated event, and as this event is generated late it - would lead to false alarms. - This is safer than thd->clear_error() against kills at shutdown. - */ - qinfo.error_code= 0; - /* - Now this Query_log_event has artificial log_pos 0. It must be adjusted - to reflect the real position in the log. Not doing it would confuse the - slave: it would prevent this one from knowing where he is in the - master's binlog, which would result in wrong positions being shown to - the user, MASTER_POS_WAIT undue waiting etc. - */ - if (qinfo.write(&log_file)) - goto err; - } + Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, FALSE); + /* + Imagine this is rollback due to net timeout, after all + statements of the transaction succeeded. Then we want a + zero-error code in BEGIN. In other words, if there was a + really serious error code it's already in the statement's + events, there is no need to put it also in this internally + generated event, and as this event is generated late it would + lead to false alarms. 
+ + This is safer than thd->clear_error() against kills at shutdown. + */ + qinfo.error_code= 0; + /* + Now this Query_log_event has artificial log_pos 0. It must be + adjusted to reflect the real position in the log. Not doing it + would confuse the slave: it would prevent this one from + knowing where he is in the master's binlog, which would result + in wrong positions being shown to the user, MASTER_POS_WAIT + undue waiting etc. + */ + if (qinfo.write(&log_file)) + goto err; + + DBUG_EXECUTE_IF("crash_before_writing_xid", + { + if ((write_error= write_cache(cache, false, true))) + DBUG_PRINT("info", ("error writing binlog cache: %d", + write_error)); + DBUG_PRINT("info", ("crashing before writing xid")); + abort(); + }); if ((write_error= write_cache(cache, false, false))) goto err; diff --git a/sql/log_event.cc b/sql/log_event.cc index cf03dd5bf44..df0d1e8a020 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -36,7 +36,17 @@ #define FLAGSTR(V,F) ((V)&(F)?#F" ":"") -#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) && !defined(DBUG_OFF) && !defined(_lint) + +/* + Size of buffer for printing a double in format %.<PREC>g + + optional '-' + optional zero + '.' 
+ PREC digits + 'e' + sign + + exponent digits + '\0' +*/ +#define FMT_G_BUFSIZE(PREC) (3 + (PREC) + 5 + 1) + + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) static const char *HA_ERR(int i) { switch (i) { @@ -90,7 +100,28 @@ static const char *HA_ERR(int i) case HA_ERR_LOGGING_IMPOSSIBLE: return "HA_ERR_LOGGING_IMPOSSIBLE"; case HA_ERR_CORRUPT_EVENT: return "HA_ERR_CORRUPT_EVENT"; } - return "<unknown error>"; + return 0; +} + +/** + macro to call from different branches of Rows_log_event::do_apply_event +*/ +static void inline slave_rows_error_report(enum loglevel level, int ha_error, + Relay_log_info const *rli, THD *thd, + TABLE *table, const char * type, + const char *log_name, ulong pos) +{ + const char *handler_error= HA_ERR(ha_error); + rli->report(level, thd->net.client_last_errno, + "Could not execute %s event on table %s.%s;" + "%s%s handler error %s; " + "the event's master log %s, end_log_pos %lu", + type, table->s->db.str, + table->s->table_name.str, + thd->net.client_last_error[0] != 0 ? thd->net.client_last_error : "", + thd->net.client_last_error[0] != 0 ? ";" : "", + handler_error == NULL? 
"<unknown>" : handler_error, + log_name, pos); } #endif @@ -460,9 +491,9 @@ static void print_set_option(IO_CACHE* file, uint32 bits_changed, returns the human readable name of the event's type */ -const char* Log_event::get_type_str() +const char* Log_event::get_type_str(Log_event_type type) { - switch(get_type_code()) { + switch(type) { case START_EVENT_V3: return "Start_v3"; case STOP_EVENT: return "Stop"; case QUERY_EVENT: return "Query"; @@ -480,6 +511,9 @@ const char* Log_event::get_type_str() case USER_VAR_EVENT: return "User var"; case FORMAT_DESCRIPTION_EVENT: return "Format_desc"; case TABLE_MAP_EVENT: return "Table_map"; + case PRE_GA_WRITE_ROWS_EVENT: return "Write_rows_event_old"; + case PRE_GA_UPDATE_ROWS_EVENT: return "Update_rows_event_old"; + case PRE_GA_DELETE_ROWS_EVENT: return "Delete_rows_event_old"; case WRITE_ROWS_EVENT: return "Write_rows"; case UPDATE_ROWS_EVENT: return "Update_rows"; case DELETE_ROWS_EVENT: return "Delete_rows"; @@ -490,6 +524,11 @@ const char* Log_event::get_type_str() } } +const char* Log_event::get_type_str() +{ + return get_type_str(get_type_code()); +} + /* Log_event::Log_event() @@ -997,6 +1036,8 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, DBUG_ENTER("Log_event::read_log_event(char*,...)"); DBUG_ASSERT(description_event != 0); DBUG_PRINT("info", ("binlog_version: %d", description_event->binlog_version)); + DBUG_DUMP("data", (unsigned char*) buf, event_len); + /* Check the integrity */ if (event_len < EVENT_LEN_OFFSET || buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT || @@ -1006,94 +1047,134 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, DBUG_RETURN(NULL); // general sanity check - will fail on a partial read } - switch(buf[EVENT_TYPE_OFFSET]) { - case QUERY_EVENT: - ev = new Query_log_event(buf, event_len, description_event, QUERY_EVENT); - break; - case LOAD_EVENT: - ev = new Load_log_event(buf, event_len, description_event); - break; - case NEW_LOAD_EVENT: - ev = new 
Load_log_event(buf, event_len, description_event); - break; - case ROTATE_EVENT: - ev = new Rotate_log_event(buf, event_len, description_event); - break; + uint event_type= buf[EVENT_TYPE_OFFSET]; + if (event_type > description_event->number_of_event_types && + event_type != FORMAT_DESCRIPTION_EVENT) + { + /* + It is unsafe to use the description_event if its post_header_len + array does not include the event type. + */ + DBUG_PRINT("error", ("event type %d found, but the current " + "Format_description_log_event supports only %d event " + "types", event_type, + description_event->number_of_event_types)); + ev= NULL; + } + else + { + /* + In some previous versions (see comment in + Format_description_log_event::Format_description_log_event(char*,...)), + event types were assigned different id numbers than in the + present version. In order to replicate from such versions to the + present version, we must map those event type id's to our event + type id's. The mapping is done with the event_type_permutation + array, which was set up when the Format_description_log_event + was read. 
+ */ + if (description_event->event_type_permutation) + { + IF_DBUG({ + int new_event_type= + description_event->event_type_permutation[event_type]; + DBUG_PRINT("info", + ("converting event type %d to %d (%s)", + event_type, new_event_type, + get_type_str((Log_event_type)new_event_type))); + }); + event_type= description_event->event_type_permutation[event_type]; + } + + switch(event_type) { + case QUERY_EVENT: + ev = new Query_log_event(buf, event_len, description_event, QUERY_EVENT); + break; + case LOAD_EVENT: + ev = new Load_log_event(buf, event_len, description_event); + break; + case NEW_LOAD_EVENT: + ev = new Load_log_event(buf, event_len, description_event); + break; + case ROTATE_EVENT: + ev = new Rotate_log_event(buf, event_len, description_event); + break; #ifdef HAVE_REPLICATION - case SLAVE_EVENT: /* can never happen (unused event) */ - ev = new Slave_log_event(buf, event_len); - break; + case SLAVE_EVENT: /* can never happen (unused event) */ + ev = new Slave_log_event(buf, event_len); + break; #endif /* HAVE_REPLICATION */ - case CREATE_FILE_EVENT: - ev = new Create_file_log_event(buf, event_len, description_event); - break; - case APPEND_BLOCK_EVENT: - ev = new Append_block_log_event(buf, event_len, description_event); - break; - case DELETE_FILE_EVENT: - ev = new Delete_file_log_event(buf, event_len, description_event); - break; - case EXEC_LOAD_EVENT: - ev = new Execute_load_log_event(buf, event_len, description_event); - break; - case START_EVENT_V3: /* this is sent only by MySQL <=4.x */ - ev = new Start_log_event_v3(buf, description_event); - break; - case STOP_EVENT: - ev = new Stop_log_event(buf, description_event); - break; - case INTVAR_EVENT: - ev = new Intvar_log_event(buf, description_event); - break; - case XID_EVENT: - ev = new Xid_log_event(buf, description_event); - break; - case RAND_EVENT: - ev = new Rand_log_event(buf, description_event); - break; - case USER_VAR_EVENT: - ev = new User_var_log_event(buf, description_event); - 
break; - case FORMAT_DESCRIPTION_EVENT: - ev = new Format_description_log_event(buf, event_len, description_event); - break; + case CREATE_FILE_EVENT: + ev = new Create_file_log_event(buf, event_len, description_event); + break; + case APPEND_BLOCK_EVENT: + ev = new Append_block_log_event(buf, event_len, description_event); + break; + case DELETE_FILE_EVENT: + ev = new Delete_file_log_event(buf, event_len, description_event); + break; + case EXEC_LOAD_EVENT: + ev = new Execute_load_log_event(buf, event_len, description_event); + break; + case START_EVENT_V3: /* this is sent only by MySQL <=4.x */ + ev = new Start_log_event_v3(buf, description_event); + break; + case STOP_EVENT: + ev = new Stop_log_event(buf, description_event); + break; + case INTVAR_EVENT: + ev = new Intvar_log_event(buf, description_event); + break; + case XID_EVENT: + ev = new Xid_log_event(buf, description_event); + break; + case RAND_EVENT: + ev = new Rand_log_event(buf, description_event); + break; + case USER_VAR_EVENT: + ev = new User_var_log_event(buf, description_event); + break; + case FORMAT_DESCRIPTION_EVENT: + ev = new Format_description_log_event(buf, event_len, description_event); + break; #if defined(HAVE_REPLICATION) - case PRE_GA_WRITE_ROWS_EVENT: - ev = new Write_rows_log_event_old(buf, event_len, description_event); - break; - case PRE_GA_UPDATE_ROWS_EVENT: - ev = new Update_rows_log_event_old(buf, event_len, description_event); - break; - case PRE_GA_DELETE_ROWS_EVENT: - ev = new Delete_rows_log_event_old(buf, event_len, description_event); - break; - case WRITE_ROWS_EVENT: - ev = new Write_rows_log_event(buf, event_len, description_event); - break; - case UPDATE_ROWS_EVENT: - ev = new Update_rows_log_event(buf, event_len, description_event); - break; - case DELETE_ROWS_EVENT: - ev = new Delete_rows_log_event(buf, event_len, description_event); - break; - case TABLE_MAP_EVENT: - ev = new Table_map_log_event(buf, event_len, description_event); - break; + case 
PRE_GA_WRITE_ROWS_EVENT: + ev = new Write_rows_log_event_old(buf, event_len, description_event); + break; + case PRE_GA_UPDATE_ROWS_EVENT: + ev = new Update_rows_log_event_old(buf, event_len, description_event); + break; + case PRE_GA_DELETE_ROWS_EVENT: + ev = new Delete_rows_log_event_old(buf, event_len, description_event); + break; + case WRITE_ROWS_EVENT: + ev = new Write_rows_log_event(buf, event_len, description_event); + break; + case UPDATE_ROWS_EVENT: + ev = new Update_rows_log_event(buf, event_len, description_event); + break; + case DELETE_ROWS_EVENT: + ev = new Delete_rows_log_event(buf, event_len, description_event); + break; + case TABLE_MAP_EVENT: + ev = new Table_map_log_event(buf, event_len, description_event); + break; #endif - case BEGIN_LOAD_QUERY_EVENT: - ev = new Begin_load_query_log_event(buf, event_len, description_event); - break; - case EXECUTE_LOAD_QUERY_EVENT: - ev= new Execute_load_query_log_event(buf, event_len, description_event); - break; - case INCIDENT_EVENT: - ev = new Incident_log_event(buf, event_len, description_event); - break; - default: - DBUG_PRINT("error",("Unknown event code: %d", - (int) buf[EVENT_TYPE_OFFSET])); - ev= NULL; - break; + case BEGIN_LOAD_QUERY_EVENT: + ev = new Begin_load_query_log_event(buf, event_len, description_event); + break; + case EXECUTE_LOAD_QUERY_EVENT: + ev= new Execute_load_query_log_event(buf, event_len, description_event); + break; + case INCIDENT_EVENT: + ev = new Incident_log_event(buf, event_len, description_event); + break; + default: + DBUG_PRINT("error",("Unknown event code: %d", + (int) buf[EVENT_TYPE_OFFSET])); + ev= NULL; + break; + } } DBUG_PRINT("read_event", ("%s(type_code: %d; event_len: %d)", @@ -1632,6 +1713,7 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, DBUG_ASSERT(thd_arg->variables.character_set_client->number < 256*256); DBUG_ASSERT(thd_arg->variables.collation_connection->number < 256*256); DBUG_ASSERT(thd_arg->variables.collation_server->number 
< 256*256); + DBUG_ASSERT(thd_arg->variables.character_set_client->mbminlen == 1); int2store(charset, thd_arg->variables.character_set_client->number); int2store(charset+2, thd_arg->variables.collation_connection->number); int2store(charset+4, thd_arg->variables.collation_server->number); @@ -2135,7 +2217,7 @@ void Query_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) print_query_header(&cache, print_event_info); my_b_write(&cache, (uchar*) query, q_len); - my_b_printf(&cache, "%s\n", print_event_info->delimiter); + my_b_printf(&cache, "\n%s\n", print_event_info->delimiter); } #endif /* MYSQL_CLIENT */ @@ -2580,6 +2662,14 @@ void Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info) my_b_printf(&cache,"ROLLBACK%s\n", print_event_info->delimiter); #endif } + if (temp_buf && + print_event_info->base64_output_mode != BASE64_OUTPUT_NEVER && + !print_event_info->short_form) + { + my_b_printf(&cache, "BINLOG '\n"); + print_base64(&cache, print_event_info, FALSE); + print_event_info->printed_fd_event= TRUE; + } DBUG_VOID_RETURN; } #endif /* MYSQL_CLIENT */ @@ -2715,7 +2805,7 @@ int Start_log_event_v3::do_apply_event(Relay_log_info const *rli) Format_description_log_event:: Format_description_log_event(uint8 binlog_ver, const char* server_ver) - :Start_log_event_v3() + :Start_log_event_v3(), event_type_permutation(0) { binlog_version= binlog_ver; switch (binlog_ver) { @@ -2842,7 +2932,7 @@ Format_description_log_event(const char* buf, const Format_description_log_event* description_event) - :Start_log_event_v3(buf, description_event) + :Start_log_event_v3(buf, description_event), event_type_permutation(0) { DBUG_ENTER("Format_description_log_event::Format_description_log_event(char*,...)"); buf+= LOG_EVENT_MINIMAL_HEADER_LEN; @@ -2857,6 +2947,65 @@ Format_description_log_event(const char* buf, number_of_event_types* sizeof(*post_header_len), MYF(0)); calc_server_version_split(); + + /* + In some previous versions, the events were given 
other event type + id numbers than in the present version. When replicating from such + a version, we therefore set up an array that maps those id numbers + to the id numbers of the present server. + + If post_header_len is null, it means malloc failed, and is_valid + will fail, so there is no need to do anything. + + The trees which have wrong event id's are: + mysql-5.1-wl2325-5.0-drop6p13-alpha, mysql-5.1-wl2325-5.0-drop6, + mysql-5.1-wl2325-5.0, mysql-5.1-wl2325-no-dd (`grep -C2 + BEGIN_LOAD_QUERY_EVENT /home/bk/ * /sql/log_event.h`). The + corresponding version (`grep mysql, configure.in` in those trees) + strings are 5.2.2-a_drop6p13-alpha, 5.2.2-a_drop6p13c, + 5.1.5-a_drop5p20, 5.1.2-a_drop5p5. + */ + if (post_header_len && + (strncmp(server_version, "5.1.2-a_drop5", 13) == 0 || + strncmp(server_version, "5.1.5-a_drop5", 13) == 0 || + strncmp(server_version, "5.2.2-a_drop6", 13) == 0)) + { + if (number_of_event_types != 22) + { + DBUG_PRINT("info", (" number_of_event_types=%d", + number_of_event_types)); + /* this makes is_valid() return false. */ + my_free(post_header_len, MYF(MY_ALLOW_ZERO_PTR)); + post_header_len= NULL; + DBUG_VOID_RETURN; + } + static const uint8 perm[23]= + { + UNKNOWN_EVENT, START_EVENT_V3, QUERY_EVENT, STOP_EVENT, ROTATE_EVENT, + INTVAR_EVENT, LOAD_EVENT, SLAVE_EVENT, CREATE_FILE_EVENT, + APPEND_BLOCK_EVENT, EXEC_LOAD_EVENT, DELETE_FILE_EVENT, + NEW_LOAD_EVENT, + RAND_EVENT, USER_VAR_EVENT, + FORMAT_DESCRIPTION_EVENT, + TABLE_MAP_EVENT, + PRE_GA_WRITE_ROWS_EVENT, + PRE_GA_UPDATE_ROWS_EVENT, + PRE_GA_DELETE_ROWS_EVENT, + XID_EVENT, + BEGIN_LOAD_QUERY_EVENT, + EXECUTE_LOAD_QUERY_EVENT, + }; + event_type_permutation= perm; + /* + Since we use (permuted) event id's to index the post_header_len + array, we need to permute the post_header_len array too. 
+ */ + uint8 post_header_len_temp[23]; + for (int i= 1; i < 23; i++) + post_header_len_temp[perm[i] - 1]= post_header_len[i - 1]; + for (int i= 0; i < 22; i++) + post_header_len[i] = post_header_len_temp[i]; + } DBUG_VOID_RETURN; } @@ -4267,6 +4416,7 @@ Xid_log_event(const char* buf, #ifndef MYSQL_CLIENT bool Xid_log_event::write(IO_CACHE* file) { + DBUG_EXECUTE_IF("do_not_write_xid", return 0;); return write_header(file, sizeof(xid)) || my_b_safe_write(file, (uchar*) &xid, sizeof(xid)); } @@ -4520,8 +4670,10 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) switch (type) { case REAL_RESULT: double real_val; + char real_buf[FMT_G_BUFSIZE(14)]; float8get(real_val, val); - my_b_printf(&cache, ":=%.14g%s\n", real_val, print_event_info->delimiter); + my_sprintf(real_buf, (real_buf, "%.14g", real_val)); + my_b_printf(&cache, ":=%s%s\n", real_buf, print_event_info->delimiter); break; case INT_RESULT: char int_buf[22]; @@ -4925,7 +5077,7 @@ Create_file_log_event(THD* thd_arg, sql_exchange* ex, const char* db_arg, const char* table_name_arg, List<Item>& fields_arg, enum enum_duplicates handle_dup, bool ignore, - char* block_arg, uint block_len_arg, bool using_trans) + uchar* block_arg, uint block_len_arg, bool using_trans) :Load_log_event(thd_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup, ignore, using_trans), fake_base(0), block(block_arg), event_buf(0), block_len(block_len_arg), @@ -5023,8 +5175,8 @@ Create_file_log_event::Create_file_log_event(const char* buf, uint len, Load_log_event::get_data_size() + create_file_header_len + 1); if (len < block_offset) - return; - block = (char*)buf + block_offset; + DBUG_VOID_RETURN; + block = (uchar*)buf + block_offset; block_len = len - block_offset; } else @@ -5182,7 +5334,7 @@ err: #ifndef MYSQL_CLIENT Append_block_log_event::Append_block_log_event(THD *thd_arg, const char *db_arg, - char *block_arg, + uchar *block_arg, uint block_len_arg, bool using_trans) :Log_event(thd_arg,0, using_trans), 
block(block_arg), @@ -5208,7 +5360,7 @@ Append_block_log_event::Append_block_log_event(const char* buf, uint len, if (len < total_header_len) DBUG_VOID_RETURN; file_id= uint4korr(buf + common_header_len + AB_FILE_ID_OFFSET); - block= (char*)buf + total_header_len; + block= (uchar*)buf + total_header_len; block_len= len - total_header_len; DBUG_VOID_RETURN; } @@ -5600,7 +5752,7 @@ err: #ifndef MYSQL_CLIENT Begin_load_query_log_event:: -Begin_load_query_log_event(THD* thd_arg, const char* db_arg, char* block_arg, +Begin_load_query_log_event(THD* thd_arg, const char* db_arg, uchar* block_arg, uint block_len_arg, bool using_trans) :Append_block_log_event(thd_arg, db_arg, block_arg, block_len_arg, using_trans) @@ -5732,12 +5884,12 @@ void Execute_load_query_log_event::print(FILE* file, my_b_printf(&cache, " REPLACE"); my_b_printf(&cache, " INTO"); my_b_write(&cache, (uchar*) query + fn_pos_end, q_len-fn_pos_end); - my_b_printf(&cache, "%s\n", print_event_info->delimiter); + my_b_printf(&cache, "\n%s\n", print_event_info->delimiter); } else { my_b_write(&cache, (uchar*) query, q_len); - my_b_printf(&cache, "%s\n", print_event_info->delimiter); + my_b_printf(&cache, "\n%s\n", print_event_info->delimiter); } if (!print_event_info->short_form) @@ -6180,7 +6332,6 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) { DBUG_ENTER("Rows_log_event::do_apply_event(Relay_log_info*)"); int error= 0; - /* If m_table_id == ~0UL, then we have a dummy event that does not contain any data. In that case, we just remove all tables in the @@ -6226,6 +6377,24 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) */ lex_start(thd); + /* + There are a few flags that are replicated with each row event. + Make sure to set/clear them before executing the main body of + the event. 
+ */ + if (get_flags(NO_FOREIGN_KEY_CHECKS_F)) + thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS; + else + thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS; + + if (get_flags(RELAXED_UNIQUE_CHECKS_F)) + thd->options|= OPTION_RELAXED_UNIQUE_CHECKS; + else + thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS; + /* A small test to verify that objects have consistent types */ + DBUG_ASSERT(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS)); + + while ((error= lock_tables(thd, rli->tables_to_lock, rli->tables_to_lock_count, &need_reopen))) { @@ -6360,22 +6529,6 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) So we call set_time(), like in SBR. Presently it changes nothing. */ thd->set_time((time_t)when); - /* - There are a few flags that are replicated with each row event. - Make sure to set/clear them before executing the main body of - the event. - */ - if (get_flags(NO_FOREIGN_KEY_CHECKS_F)) - thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS; - else - thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS; - - if (get_flags(RELAXED_UNIQUE_CHECKS_F)) - thd->options|= OPTION_RELAXED_UNIQUE_CHECKS; - else - thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS; - /* A small test to verify that objects have consistent types */ - DBUG_ASSERT(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS)); /* Now we are in a statement and will stay in a statement until we @@ -6408,8 +6561,9 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) if (!get_flags(COMPLETE_ROWS_F)) bitmap_intersect(table->write_set,&m_cols); + this->slave_exec_mode= slave_exec_mode_options; // fix the mode + // Do event specific preparations - error= do_before_row_operations(rli); // row processing loop @@ -6428,23 +6582,41 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) { case 0: break; - - /* Some recoverable errors */ + /* + The following list of "idempotent" errors + means that an error from the list might happen + because of idempotent (more than once) + applying of a binlog 
file. + Notice that if the binlog contains a DDL operation, its + second application may cause + + case HA_ERR_TABLE_DEF_CHANGED: + case HA_ERR_CANNOT_ADD_FOREIGN: + + which are not included in the list. + */ case HA_ERR_RECORD_CHANGED: case HA_ERR_RECORD_DELETED: case HA_ERR_KEY_NOT_FOUND: case HA_ERR_END_OF_FILE: - /* Idempotency support: OK if tuple does not exist */ + case HA_ERR_FOUND_DUPP_KEY: + case HA_ERR_FOUND_DUPP_UNIQUE: + case HA_ERR_FOREIGN_DUPLICATE_KEY: + case HA_ERR_NO_REFERENCED_ROW: + case HA_ERR_ROW_IS_REFERENCED: + DBUG_PRINT("info", ("error: %s", HA_ERR(error))); - error= 0; + if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1) + { + if (global_system_variables.log_warnings) + slave_rows_error_report(WARNING_LEVEL, error, rli, thd, table, + get_type_str(), + RPL_LOG_NAME, (ulong) log_pos); + error= 0; + } break; - + default: - rli->report(ERROR_LEVEL, - thd->is_error() ? thd->main_da.sql_errno() : 0, - "Error in %s event: row application failed. %s", - get_type_str(), - thd->is_error() ? thd->main_da.message() : ""); thd->is_slave_error= 1; break; } @@ -6488,17 +6660,14 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) */ if (rli->tables_to_lock && get_flags(STMT_END_F)) const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); - + if (error) { /* error has occurred during the transaction */ - rli->report(ERROR_LEVEL, - thd->is_error() ? thd->main_da.sql_errno() : 0, - "Error in %s event: error during transaction execution " - "on table %s.%s. %s", - get_type_str(), table->s->db.str, - table->s->table_name.str, - thd->is_error() ? 
thd->main_da.message() : ""); - + slave_rows_error_report(ERROR_LEVEL, error, rli, thd, table, + get_type_str(), RPL_LOG_NAME, (ulong) log_pos); + } + if (error) + { /* If one day we honour --skip-slave-errors in row-based replication, and the error should be skipped, then we would clear mappings, rollback, @@ -6610,6 +6779,7 @@ Rows_log_event::do_update_pos(Relay_log_info *rli) */ thd->reset_current_stmt_binlog_row_based(); + rli->cleanup_context(thd, 0); if (error == 0) { @@ -7299,43 +7469,50 @@ Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability { int error= 0; - /* - We are using REPLACE semantics and not INSERT IGNORE semantics - when writing rows, that is: new rows replace old rows. We need to - inform the storage engine that it should use this behaviour. + /** + todo: to introduce a property for the event (handler?) which forces + applying the event in the replace (idempotent) fashion. */ + if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1 || + m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER) + { + /* + We are using REPLACE semantics and not INSERT IGNORE semantics + when writing rows, that is: new rows replace old rows. We need to + inform the storage engine that it should use this behaviour. + */ + + /* Tell the storage engine that we are using REPLACE semantics. */ + thd->lex->duplicates= DUP_REPLACE; + + /* + Pretend we're executing a REPLACE command: this is needed for + InnoDB and NDB Cluster since they are not (properly) checking the + lex->duplicates flag. 
+ */ + thd->lex->sql_command= SQLCOM_REPLACE; + /* + Do not raise the error flag in case of hitting to an unique attribute + */ + m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); + /* + NDB specific: update from ndb master wrapped as Write_rows + so that the event should be applied to replace slave's row + */ + m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); + /* + NDB specific: if update from ndb master wrapped as Write_rows + does not find the row it's assumed idempotent binlog applying + is taking place; don't raise the error. + */ + m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY); + /* + TODO: the cluster team (Tomas?) says that it's better if the engine knows + how many rows are going to be inserted, then it can allocate needed memory + from the start. + */ + } - /* Tell the storage engine that we are using REPLACE semantics. */ - thd->lex->duplicates= DUP_REPLACE; - - /* - Pretend we're executing a REPLACE command: this is needed for - InnoDB and NDB Cluster since they are not (properly) checking the - lex->duplicates flag. - */ - thd->lex->sql_command= SQLCOM_REPLACE; - /* - Do not raise the error flag in case of hitting to an unique attribute - */ - m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); - /* - NDB specific: update from ndb master wrapped as Write_rows - */ - /* - so that the event should be applied to replace slave's row - */ - m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); - /* - NDB specific: if update from ndb master wrapped as Write_rows - does not find the row it's assumed idempotent binlog applying - is taking place; don't raise the error. - */ - m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY); - /* - TODO: the cluster team (Tomas?) says that it's better if the engine knows - how many rows are going to be inserted, then it can allocate needed memory - from the start. 
- */ m_table->file->ha_start_bulk_insert(0); /* We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill @@ -7357,18 +7534,23 @@ Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability } int -Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const, +Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const, int error) { int local_error= 0; - m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); - m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE); - /* - reseting the extra with - table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY); - fires bug#27077 - todo: explain or fix - */ + if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1 || + m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER) + { + m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); + m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE); + /* + resetting the extra with + table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY); + fires bug#27077 + explanation: file->reset() performs this duty + ultimately. 
Still todo: fix + */ + } if ((local_error= m_table->file->ha_end_bulk_insert())) { m_table->file->print_error(local_error, MYF(0)); @@ -7487,23 +7669,22 @@ Rows_log_event::write_row(const Relay_log_info *const rli, while ((error= table->file->ha_write_row(table->record[0]))) { - if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT) - { - table->file->print_error(error, MYF(0)); /* to check at exec_relay_log_event */ - DBUG_RETURN(error); - } - if ((keynum= table->file->get_dup_key(error)) < 0) + if (error == HA_ERR_LOCK_DEADLOCK || + error == HA_ERR_LOCK_WAIT_TIMEOUT || + (keynum= table->file->get_dup_key(error)) < 0 || + !overwrite) { - DBUG_PRINT("info",("Can't locate duplicate key (get_dup_key returns %d)",keynum)); - table->file->print_error(error, MYF(0)); + DBUG_PRINT("info",("get_dup_key returns %d)", keynum)); /* - We failed to retrieve the duplicate key + Deadlock, waiting for lock or just an error from the handler + such as HA_ERR_FOUND_DUPP_KEY when overwrite is false. + Retrieval of the duplicate key number may fail - either because the error was not "duplicate key" error - or because the information which key is not available */ + table->file->print_error(error, MYF(0)); DBUG_RETURN(error); } - /* We need to retrieve the old row into record[1] to be able to either update or delete the offending record. 
We either: @@ -7641,14 +7822,16 @@ int Write_rows_log_event::do_exec_row(const Relay_log_info *const rli) { DBUG_ASSERT(m_table != NULL); - int error= write_row(rli, TRUE /* overwrite */); - + int error= + write_row(rli, /* if 1 then overwrite */ + bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1); + if (error && !thd->is_error()) { DBUG_ASSERT(0); my_error(ER_UNKNOWN_ERROR, MYF(0)); } - + return error; } diff --git a/sql/log_event.h b/sql/log_event.h index 4bd496af2a4..efb8675780e 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -567,6 +567,15 @@ class Format_description_log_event; class Relay_log_info; #ifdef MYSQL_CLIENT +enum enum_base64_output_mode { + BASE64_OUTPUT_NEVER= 0, + BASE64_OUTPUT_AUTO= 1, + BASE64_OUTPUT_ALWAYS= 2, + BASE64_OUTPUT_UNSPEC= 3, + /* insert new output modes here */ + BASE64_OUTPUT_MODE_COUNT +}; + /* A structure for mysqlbinlog to know how to print events @@ -600,7 +609,8 @@ typedef struct st_print_event_info st_print_event_info() :flags2_inited(0), sql_mode_inited(0), auto_increment_increment(1),auto_increment_offset(1), charset_inited(0), - lc_time_names_number(0), charset_database_number(0) + lc_time_names_number(0), charset_database_number(0), + base64_output_mode(BASE64_OUTPUT_UNSPEC), printed_fd_event(FALSE) { /* Currently we only use static PRINT_EVENT_INFO objects, so zeroed at @@ -627,7 +637,14 @@ typedef struct st_print_event_info /* Settings on how to print the events */ bool short_form; - bool base64_output; + enum_base64_output_mode base64_output_mode; + /* + This is set whenever a Format_description_event is printed. + Later, when an event is printed in base64, this flag is tested: if + no Format_description_event has been seen, it is unsafe to print + the base64 event, so an error message is generated. 
+ */ + bool printed_fd_event; my_off_t hexdump_from; uint8 common_header_len; char delimiter[16]; @@ -809,6 +826,12 @@ public: bool cache_stmt; + /** + A storage to cache the global system variable's value. + Handling of a separate event will be governed its member. + */ + ulong slave_exec_mode; + #ifndef MYSQL_CLIENT THD* thd; @@ -930,7 +953,13 @@ public: const char **error, const Format_description_log_event *description_event); - /* returns the human readable name of the event's type */ + /** + Returns the human readable name of the given event type. + */ + static const char* get_type_str(Log_event_type type); + /** + Returns the human readable name of this event's type. + */ const char* get_type_str(); /* Return start of query time or current time */ @@ -2077,12 +2106,16 @@ public: /* The list of post-headers' lengthes */ uint8 *post_header_len; uchar server_version_split[3]; + const uint8 *event_type_permutation; Format_description_log_event(uint8 binlog_ver, const char* server_ver=0); Format_description_log_event(const char* buf, uint event_len, const Format_description_log_event *description_event); - ~Format_description_log_event() { my_free((uchar*)post_header_len, MYF(0)); } + ~Format_description_log_event() + { + my_free((uchar*)post_header_len, MYF(MY_ALLOW_ZERO_PTR)); + } Log_event_type get_type_code() { return FORMAT_DESCRIPTION_EVENT;} #ifndef MYSQL_CLIENT bool write(IO_CACHE* file); @@ -2486,7 +2519,7 @@ protected: */ bool fake_base; public: - char* block; + uchar* block; const char *event_buf; uint block_len; uint file_id; @@ -2497,7 +2530,7 @@ public: const char* table_name_arg, List<Item>& fields_arg, enum enum_duplicates handle_dup, bool ignore, - char* block_arg, uint block_len_arg, + uchar* block_arg, uint block_len_arg, bool using_trans); #ifdef HAVE_REPLICATION void pack_info(Protocol* protocol); @@ -2552,7 +2585,7 @@ private: class Append_block_log_event: public Log_event { public: - char* block; + uchar* block; uint block_len; uint 
file_id; /* @@ -2569,7 +2602,7 @@ public: const char* db; #ifndef MYSQL_CLIENT - Append_block_log_event(THD* thd, const char* db_arg, char* block_arg, + Append_block_log_event(THD* thd, const char* db_arg, uchar* block_arg, uint block_len_arg, bool using_trans); #ifdef HAVE_REPLICATION void pack_info(Protocol* protocol); @@ -2693,7 +2726,7 @@ class Begin_load_query_log_event: public Append_block_log_event public: #ifndef MYSQL_CLIENT Begin_load_query_log_event(THD* thd_arg, const char *db_arg, - char* block_arg, uint block_len_arg, + uchar* block_arg, uint block_len_arg, bool using_trans); #ifdef HAVE_REPLICATION Begin_load_query_log_event(THD* thd); diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc index 6d5d86e42fe..7621bdc6291 100644 --- a/sql/log_event_old.cc +++ b/sql/log_event_old.cc @@ -11,9 +11,9 @@ // Old implementation of do_apply_event() int -Old_rows_log_event::do_apply_event(Rows_log_event *ev, const Relay_log_info *rli) +Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info *rli) { - DBUG_ENTER("Rows_log_event::do_apply_event(st_relay_log_info*)"); + DBUG_ENTER("Old_rows_log_event::do_apply_event(st_relay_log_info*)"); int error= 0; THD *thd= ev->thd; uchar const *row_start= ev->m_rows_buf; @@ -30,7 +30,7 @@ Old_rows_log_event::do_apply_event(Rows_log_event *ev, const Relay_log_info *rli This one is supposed to be set: just an extra check so that nothing strange has happened. 
*/ - DBUG_ASSERT(ev->get_flags(Rows_log_event::STMT_END_F)); + DBUG_ASSERT(ev->get_flags(Old_rows_log_event::STMT_END_F)); const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); close_thread_tables(thd); @@ -148,7 +148,7 @@ Old_rows_log_event::do_apply_event(Rows_log_event *ev, const Relay_log_info *rli thd->lock= 0; thd->is_slave_error= 1; const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); - DBUG_RETURN(Rows_log_event::ERR_BAD_TABLE_DEF); + DBUG_RETURN(Old_rows_log_event::ERR_BAD_TABLE_DEF); } } } @@ -163,8 +163,8 @@ Old_rows_log_event::do_apply_event(Rows_log_event *ev, const Relay_log_info *rli TODO [/Matz]: Maybe the query cache should not be invalidated here? It might be that a table is not changed, even though it was locked for the statement. We do know that each - Rows_log_event contain at least one row, so after processing one - Rows_log_event, we can invalidate the query cache for the + Old_rows_log_event contain at least one row, so after processing one + Old_rows_log_event, we can invalidate the query cache for the associated table. */ for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global) @@ -200,12 +200,12 @@ Old_rows_log_event::do_apply_event(Rows_log_event *ev, const Relay_log_info *rli Make sure to set/clear them before executing the main body of the event. */ - if (ev->get_flags(Rows_log_event::NO_FOREIGN_KEY_CHECKS_F)) + if (ev->get_flags(Old_rows_log_event::NO_FOREIGN_KEY_CHECKS_F)) thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS; else thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS; - if (ev->get_flags(Rows_log_event::RELAXED_UNIQUE_CHECKS_F)) + if (ev->get_flags(Old_rows_log_event::RELAXED_UNIQUE_CHECKS_F)) thd->options|= OPTION_RELAXED_UNIQUE_CHECKS; else thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS; @@ -275,7 +275,7 @@ Old_rows_log_event::do_apply_event(Rows_log_event *ev, const Relay_log_info *rli We need to delay this clear until the table def is no longer needed. The table def is needed in unpack_row(). 
*/ - if (rli->tables_to_lock && ev->get_flags(Rows_log_event::STMT_END_F)) + if (rli->tables_to_lock && ev->get_flags(Old_rows_log_event::STMT_END_F)) const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); if (error) @@ -311,7 +311,7 @@ Old_rows_log_event::do_apply_event(Rows_log_event *ev, const Relay_log_info *rli */ if (table && (table->s->primary_key == MAX_KEY) && !ev->cache_stmt && - ev->get_flags(Rows_log_event::STMT_END_F) == Rows_log_event::RLE_NO_FLAGS) + ev->get_flags(Old_rows_log_event::STMT_END_F) == Old_rows_log_event::RLE_NO_FLAGS) { /* ------------ Temporary fix until WL#2975 is implemented --------- @@ -323,7 +323,7 @@ Old_rows_log_event::do_apply_event(Rows_log_event *ev, const Relay_log_info *rli present, and idempotency is not guaranteed (no PK) so we risk that repeating leads to double insert. So we desperately try to continue, hope we'll eventually leave this buggy situation (by - executing the final Rows_log_event). If we are in a hopeless + executing the final Old_rows_log_event). If we are in a hopeless wait (reached end of last relay log and nothing gets appended there), we timeout after one minute, and notify DBA about the problem. When WL#2975 is implemented, just remove the member @@ -336,6 +336,7 @@ Old_rows_log_event::do_apply_event(Rows_log_event *ev, const Relay_log_info *rli } #endif + #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) /* @@ -350,6 +351,7 @@ last_uniq_key(TABLE *table, uint keyno) return 1; } + /* Compares table->record[0] and table->record[1] @@ -428,6 +430,7 @@ record_compare_exit: return result; } + /* Copy "extra" columns from record[1] to record[0]. @@ -516,6 +519,7 @@ copy_extra_record_fields(TABLE *table, DBUG_RETURN(0); // All OK } + /* Replace the provided record in the database. @@ -668,6 +672,7 @@ replace_record(THD *thd, TABLE *table, DBUG_RETURN(error); } + /** Find the row given by 'key', if the table has keys, or else use a table scan to find (and fetch) the row. 
@@ -879,6 +884,7 @@ static int find_and_fetch_row(TABLE *table, uchar *key) DBUG_RETURN(0); } + /********************************************************** Row handling primitives for Write_rows_log_event_old **********************************************************/ @@ -944,6 +950,7 @@ int Write_rows_log_event_old::do_before_row_operations(TABLE *table) return error; } + int Write_rows_log_event_old::do_after_row_operations(TABLE *table, int error) { int local_error= 0; @@ -962,6 +969,7 @@ int Write_rows_log_event_old::do_after_row_operations(TABLE *table, int error) return error? error : local_error; } + int Write_rows_log_event_old::do_prepare_row(THD *thd_arg, Relay_log_info const *rli, @@ -981,6 +989,7 @@ Write_rows_log_event_old::do_prepare_row(THD *thd_arg, return error; } + int Write_rows_log_event_old::do_exec_row(TABLE *table) { DBUG_ASSERT(table != NULL); @@ -988,6 +997,7 @@ int Write_rows_log_event_old::do_exec_row(TABLE *table) return error; } + /********************************************************** Row handling primitives for Delete_rows_log_event_old **********************************************************/ @@ -1029,6 +1039,7 @@ int Delete_rows_log_event_old::do_before_row_operations(TABLE *table) return error; } + int Delete_rows_log_event_old::do_after_row_operations(TABLE *table, int error) { /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/ @@ -1041,6 +1052,7 @@ int Delete_rows_log_event_old::do_after_row_operations(TABLE *table, int error) return error; } + int Delete_rows_log_event_old::do_prepare_row(THD *thd_arg, Relay_log_info const *rli, @@ -1074,6 +1086,7 @@ Delete_rows_log_event_old::do_prepare_row(THD *thd_arg, return error; } + int Delete_rows_log_event_old::do_exec_row(TABLE *table) { int error; @@ -1091,6 +1104,7 @@ int Delete_rows_log_event_old::do_exec_row(TABLE *table) return error; } + /********************************************************** Row handling primitives 
for Update_rows_log_event_old **********************************************************/ @@ -1124,6 +1138,7 @@ int Update_rows_log_event_old::do_before_row_operations(TABLE *table) return error; } + int Update_rows_log_event_old::do_after_row_operations(TABLE *table, int error) { /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/ @@ -1136,6 +1151,7 @@ int Update_rows_log_event_old::do_after_row_operations(TABLE *table, int error) return error; } + int Update_rows_log_event_old::do_prepare_row(THD *thd_arg, Relay_log_info const *rli, TABLE *table, @@ -1179,6 +1195,7 @@ int Update_rows_log_event_old::do_prepare_row(THD *thd_arg, return error; } + int Update_rows_log_event_old::do_exec_row(TABLE *table) { DBUG_ASSERT(table != NULL); @@ -1217,3 +1234,1676 @@ int Update_rows_log_event_old::do_exec_row(TABLE *table) } #endif + + +/************************************************************************** + Rows_log_event member functions +**************************************************************************/ + +#ifndef MYSQL_CLIENT +Old_rows_log_event::Old_rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid, + MY_BITMAP const *cols, + bool is_transactional) + : Log_event(thd_arg, 0, is_transactional), + m_row_count(0), + m_table(tbl_arg), + m_table_id(tid), + m_width(tbl_arg ? tbl_arg->s->fields : 1), + m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0) +#ifdef HAVE_REPLICATION + , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL) +#endif +{ + + // This constructor should not be reached. + assert(0); + + /* + We allow a special form of dummy event when the table, and cols + are null and the table id is ~0UL. This is a temporary + solution, to be able to terminate a started statement in the + binary log: the extraneous events will be removed in the future. 
+ */ + DBUG_ASSERT(tbl_arg && tbl_arg->s && tid != ~0UL || + !tbl_arg && !cols && tid == ~0UL); + + if (thd_arg->options & OPTION_NO_FOREIGN_KEY_CHECKS) + set_flags(NO_FOREIGN_KEY_CHECKS_F); + if (thd_arg->options & OPTION_RELAXED_UNIQUE_CHECKS) + set_flags(RELAXED_UNIQUE_CHECKS_F); + /* if bitmap_init fails, caught in is_valid() */ + if (likely(!bitmap_init(&m_cols, + m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL, + m_width, + false))) + { + /* Cols can be zero if this is a dummy binrows event */ + if (likely(cols != NULL)) + { + memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols)); + create_last_word_mask(&m_cols); + } + } + else + { + // Needed because bitmap_init() does not set it to null on failure + m_cols.bitmap= 0; + } +} +#endif + + +Old_rows_log_event::Old_rows_log_event(const char *buf, uint event_len, + Log_event_type event_type, + const Format_description_log_event + *description_event) + : Log_event(buf, description_event), + m_row_count(0), +#ifndef MYSQL_CLIENT + m_table(NULL), +#endif + m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0) +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL) +#endif +{ + DBUG_ENTER("Old_rows_log_event::Old_Rows_log_event(const char*,...)"); + uint8 const common_header_len= description_event->common_header_len; + uint8 const post_header_len= description_event->post_header_len[event_type-1]; + + DBUG_PRINT("enter",("event_len: %u common_header_len: %d " + "post_header_len: %d", + event_len, common_header_len, + post_header_len)); + + const char *post_start= buf + common_header_len; + DBUG_DUMP("post_header", (uchar*) post_start, post_header_len); + post_start+= RW_MAPID_OFFSET; + if (post_header_len == 6) + { + /* Master is of an intermediate source tree before 5.1.4. 
Id is 4 bytes */ + m_table_id= uint4korr(post_start); + post_start+= 4; + } + else + { + m_table_id= (ulong) uint6korr(post_start); + post_start+= RW_FLAGS_OFFSET; + } + + m_flags= uint2korr(post_start); + + uchar const *const var_start= + (const uchar *)buf + common_header_len + post_header_len; + uchar const *const ptr_width= var_start; + uchar *ptr_after_width= (uchar*) ptr_width; + DBUG_PRINT("debug", ("Reading from %p", ptr_after_width)); + m_width = net_field_length(&ptr_after_width); + DBUG_PRINT("debug", ("m_width=%lu", m_width)); + /* if bitmap_init fails, catched in is_valid() */ + if (likely(!bitmap_init(&m_cols, + m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL, + m_width, + false))) + { + DBUG_PRINT("debug", ("Reading from %p", ptr_after_width)); + memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8); + create_last_word_mask(&m_cols); + ptr_after_width+= (m_width + 7) / 8; + DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols)); + } + else + { + // Needed because bitmap_init() does not set it to null on failure + m_cols.bitmap= NULL; + DBUG_VOID_RETURN; + } + + const uchar* const ptr_rows_data= (const uchar*) ptr_after_width; + size_t const data_size= event_len - (ptr_rows_data - (const uchar *) buf); + DBUG_PRINT("info",("m_table_id: %lu m_flags: %d m_width: %lu data_size: %lu", + m_table_id, m_flags, m_width, (ulong) data_size)); + DBUG_DUMP("rows_data", (uchar*) ptr_rows_data, data_size); + + m_rows_buf= (uchar*) my_malloc(data_size, MYF(MY_WME)); + if (likely((bool)m_rows_buf)) + { +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + m_curr_row= m_rows_buf; +#endif + m_rows_end= m_rows_buf + data_size; + m_rows_cur= m_rows_end; + memcpy(m_rows_buf, ptr_rows_data, data_size); + } + else + m_cols.bitmap= 0; // to not free it + + DBUG_VOID_RETURN; +} + + +Old_rows_log_event::~Old_rows_log_event() +{ + if (m_cols.bitmap == m_bitbuf) // no my_malloc happened + m_cols.bitmap= 0; // so no my_free in bitmap_free + 
bitmap_free(&m_cols); // To pair with bitmap_init(). + my_free((uchar*)m_rows_buf, MYF(MY_ALLOW_ZERO_PTR)); +} + + +int Old_rows_log_event::get_data_size() +{ + uchar buf[sizeof(m_width)+1]; + uchar *end= net_store_length(buf, (m_width + 7) / 8); + + DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", + return 6 + no_bytes_in_map(&m_cols) + (end - buf) + + (m_rows_cur - m_rows_buf);); + int data_size= ROWS_HEADER_LEN; + data_size+= no_bytes_in_map(&m_cols); + data_size+= end - buf; + + data_size+= (m_rows_cur - m_rows_buf); + return data_size; +} + + +#ifndef MYSQL_CLIENT +int Old_rows_log_event::do_add_row_data(uchar *row_data, size_t length) +{ + /* + When the table has a primary key, we would probably want, by default, to + log only the primary key value instead of the entire "before image". This + would save binlog space. TODO + */ + DBUG_ENTER("Old_rows_log_event::do_add_row_data"); + DBUG_PRINT("enter", ("row_data: 0x%lx length: %lu", (ulong) row_data, + (ulong) length)); + /* + Don't print debug messages when running valgrind since they can + trigger false warnings. 
+ */ +#ifndef HAVE_purify + DBUG_DUMP("row_data", row_data, min(length, 32)); +#endif + + DBUG_ASSERT(m_rows_buf <= m_rows_cur); + DBUG_ASSERT(!m_rows_buf || m_rows_end && m_rows_buf < m_rows_end); + DBUG_ASSERT(m_rows_cur <= m_rows_end); + + /* The cast will always work since m_rows_cur <= m_rows_end */ + if (static_cast<size_t>(m_rows_end - m_rows_cur) <= length) + { + size_t const block_size= 1024; + my_ptrdiff_t const cur_size= m_rows_cur - m_rows_buf; + my_ptrdiff_t const new_alloc= + block_size * ((cur_size + length + block_size - 1) / block_size); + + uchar* const new_buf= (uchar*)my_realloc((uchar*)m_rows_buf, (uint) new_alloc, + MYF(MY_ALLOW_ZERO_PTR|MY_WME)); + if (unlikely(!new_buf)) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + + /* If the memory moved, we need to move the pointers */ + if (new_buf != m_rows_buf) + { + m_rows_buf= new_buf; + m_rows_cur= m_rows_buf + cur_size; + } + + /* + The end pointer should always be changed to point to the end of + the allocated memory. + */ + m_rows_end= m_rows_buf + new_alloc; + } + + DBUG_ASSERT(m_rows_cur + length <= m_rows_end); + memcpy(m_rows_cur, row_data, length); + m_rows_cur+= length; + m_row_count++; + DBUG_RETURN(0); +} +#endif + + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) +{ + DBUG_ENTER("Old_rows_log_event::do_apply_event(Relay_log_info*)"); + int error= 0; + + /* + If m_table_id == ~0UL, then we have a dummy event that does not + contain any data. In that case, we just remove all tables in the + tables_to_lock list, close the thread tables, and return with + success. + */ + if (m_table_id == ~0UL) + { + /* + This one is supposed to be set: just an extra check so that + nothing strange has happened. 
+ */ + DBUG_ASSERT(get_flags(STMT_END_F)); + + const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); + close_thread_tables(thd); + thd->clear_error(); + DBUG_RETURN(0); + } + + /* + 'thd' has been set by exec_relay_log_event(), just before calling + do_apply_event(). We still check here to prevent future coding + errors. + */ + DBUG_ASSERT(rli->sql_thd == thd); + + /* + If there is no locks taken, this is the first binrow event seen + after the table map events. We should then lock all the tables + used in the transaction and proceed with execution of the actual + event. + */ + if (!thd->lock) + { + bool need_reopen= 1; /* To execute the first lap of the loop below */ + + /* + lock_tables() reads the contents of thd->lex, so they must be + initialized. Contrary to in + Table_map_log_event::do_apply_event() we don't call + mysql_init_query() as that may reset the binlog format. + */ + lex_start(thd); + + while ((error= lock_tables(thd, rli->tables_to_lock, + rli->tables_to_lock_count, &need_reopen))) + { + if (!need_reopen) + { + if (thd->is_slave_error || thd->is_fatal_error) + { + /* + Error reporting borrowed from Query_log_event with many excessive + simplifications (we don't honour --slave-skip-errors) + */ + uint actual_error= thd->net.client_last_errno; + rli->report(ERROR_LEVEL, actual_error, + "Error '%s' in %s event: when locking tables", + (actual_error ? thd->net.client_last_error : + "unexpected success or fatal error"), + get_type_str()); + thd->is_fatal_error= 1; + } + else + { + rli->report(ERROR_LEVEL, error, + "Error in %s event: when locking tables", + get_type_str()); + } + const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); + DBUG_RETURN(error); + } + + /* + So we need to reopen the tables. + + We need to flush the pending RBR event, since it keeps a + pointer to an open table. + + ALTERNATIVE SOLUTION (not implemented): Extract a pointer to + the pending RBR event and reset the table pointer after the + tables has been reopened. 
+ + NOTE: For this new scheme there should be no pending event: + need to add code to assert that is the case. + */ + thd->binlog_flush_pending_rows_event(false); + TABLE_LIST *tables= rli->tables_to_lock; + close_tables_for_reopen(thd, &tables); + + uint tables_count= rli->tables_to_lock_count; + if ((error= open_tables(thd, &tables, &tables_count, 0))) + { + if (thd->is_slave_error || thd->is_fatal_error) + { + /* + Error reporting borrowed from Query_log_event with many excessive + simplifications (we don't honour --slave-skip-errors) + */ + uint actual_error= thd->net.client_last_errno; + rli->report(ERROR_LEVEL, actual_error, + "Error '%s' on reopening tables", + (actual_error ? thd->net.client_last_error : + "unexpected success or fatal error")); + thd->is_slave_error= 1; + } + const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); + DBUG_RETURN(error); + } + } + + /* + When the open and locking succeeded, we check all tables to + ensure that they still have the correct type. + + We can use a down cast here since we know that every table added + to the tables_to_lock is a RPL_TABLE_LIST. + */ + + { + RPL_TABLE_LIST *ptr= rli->tables_to_lock; + for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global)) + { + if (ptr->m_tabledef.compatible_with(rli, ptr->table)) + { + mysql_unlock_tables(thd, thd->lock); + thd->lock= 0; + thd->is_slave_error= 1; + const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); + DBUG_RETURN(ERR_BAD_TABLE_DEF); + } + } + } + + /* + ... and then we add all the tables to the table map and remove + them from tables to lock. + + We also invalidate the query cache for all the tables, since + they will now be changed. + + TODO [/Matz]: Maybe the query cache should not be invalidated + here? It might be that a table is not changed, even though it + was locked for the statement. 
We do know that each + Old_rows_log_event contain at least one row, so after processing one + Old_rows_log_event, we can invalidate the query cache for the + associated table. + */ + for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global) + { + const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table); + } +#ifdef HAVE_QUERY_CACHE + query_cache.invalidate_locked_for_write(rli->tables_to_lock); +#endif + } + + TABLE* + table= + m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id); + + if (table) + { + /* + table == NULL means that this table should not be replicated + (this was set up by Table_map_log_event::do_apply_event() + which tested replicate-* rules). + */ + + /* + It's not needed to set_time() but + 1) it continues the property that "Time" in SHOW PROCESSLIST shows how + much slave is behind + 2) it will be needed when we allow replication from a table with no + TIMESTAMP column to a table with one. + So we call set_time(), like in SBR. Presently it changes nothing. + */ + thd->set_time((time_t)when); + /* + There are a few flags that are replicated with each row event. + Make sure to set/clear them before executing the main body of + the event. + */ + if (get_flags(NO_FOREIGN_KEY_CHECKS_F)) + thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS; + else + thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS; + + if (get_flags(RELAXED_UNIQUE_CHECKS_F)) + thd->options|= OPTION_RELAXED_UNIQUE_CHECKS; + else + thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS; + /* A small test to verify that objects have consistent types */ + DBUG_ASSERT(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS)); + + /* + Now we are in a statement and will stay in a statement until we + see a STMT_END_F. + + We set this flag here, before actually applying any rows, in + case the SQL thread is stopped and we need to detect that we're + inside a statement and halting abruptly might cause problems + when restarting. 
+ */ + const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT); + + if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols)) + set_flags(COMPLETE_ROWS_F); + + /* + Set tables write and read sets. + + Read_set contains all slave columns (in case we are going to fetch + a complete record from slave) + + Write_set equals the m_cols bitmap sent from master but it can be + longer if slave has extra columns. + */ + + DBUG_PRINT_BITSET("debug", "Setting table's write_set from: %s", &m_cols); + + bitmap_set_all(table->read_set); + bitmap_set_all(table->write_set); + if (!get_flags(COMPLETE_ROWS_F)) + bitmap_intersect(table->write_set,&m_cols); + + // Do event specific preparations + + error= do_before_row_operations(rli); + + // row processing loop + + while (error == 0 && m_curr_row < m_rows_end) + { + /* in_use can have been set to NULL in close_tables_for_reopen */ + THD* old_thd= table->in_use; + if (!table->in_use) + table->in_use= thd; + + error= do_exec_row(rli); + + table->in_use = old_thd; + switch (error) + { + case 0: + break; + + /* Some recoverable errors */ + case HA_ERR_RECORD_CHANGED: + case HA_ERR_KEY_NOT_FOUND: /* Idempotency support: OK if + tuple does not exist */ + error= 0; + break; + + default: + rli->report(ERROR_LEVEL, thd->net.client_last_errno, + "Error in %s event: row application failed. %s", + get_type_str(), + thd->net.client_last_error ? thd->net.client_last_error : ""); + thd->is_slave_error= 1; + break; + } + + /* + If m_curr_row_end was not set during event execution (e.g., because + of errors) we can't proceed to the next row. If the error is transient + (i.e., error==0 at this point) we must call unpack_current_row() to set + m_curr_row_end. 
+ */ + + DBUG_PRINT("info", ("error: %d", error)); + DBUG_PRINT("info", ("curr_row: 0x%lu; curr_row_end: 0x%lu; rows_end: 0x%lu", + (ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end)); + + if (!m_curr_row_end && !error) + unpack_current_row(rli); + + // at this moment m_curr_row_end should be set + DBUG_ASSERT(error || m_curr_row_end != NULL); + DBUG_ASSERT(error || m_curr_row < m_curr_row_end); + DBUG_ASSERT(error || m_curr_row_end <= m_rows_end); + + m_curr_row= m_curr_row_end; + + } // row processing loop + + DBUG_EXECUTE_IF("STOP_SLAVE_after_first_Rows_event", + const_cast<Relay_log_info*>(rli)->abort_slave= 1;); + error= do_after_row_operations(rli, error); + if (!cache_stmt) + { + DBUG_PRINT("info", ("Marked that we need to keep log")); + thd->options|= OPTION_KEEP_LOG; + } + } // if (table) + + /* + We need to delay this clear until here bacause unpack_current_row() uses + master-side table definitions stored in rli. + */ + if (rli->tables_to_lock && get_flags(STMT_END_F)) + const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); + + if (error) + { /* error has occured during the transaction */ + rli->report(ERROR_LEVEL, thd->net.client_last_errno, + "Error in %s event: error during transaction execution " + "on table %s.%s. %s", + get_type_str(), table->s->db.str, + table->s->table_name.str, + thd->net.client_last_error ? thd->net.client_last_error : ""); + + /* + If one day we honour --skip-slave-errors in row-based replication, and + the error should be skipped, then we would clear mappings, rollback, + close tables, but the slave SQL thread would not stop and then may + assume the mapping is still available, the tables are still open... + So then we should clear mappings/rollback/close here only if this is a + STMT_END_F. + For now we code, knowing that error is not skippable and so slave SQL + thread is certainly going to stop. + rollback at the caller along with sbr. 
+ */ + thd->reset_current_stmt_binlog_row_based(); + const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error); + thd->is_slave_error= 1; + DBUG_RETURN(error); + } + + /* + This code would ideally be placed in do_update_pos() instead, but + since we have no access to table there, we do the setting of + last_event_start_time here instead. + */ + if (table && (table->s->primary_key == MAX_KEY) && + !cache_stmt && get_flags(STMT_END_F) == RLE_NO_FLAGS) + { + /* + ------------ Temporary fix until WL#2975 is implemented --------- + + This event is not the last one (no STMT_END_F). If we stop now + (in case of terminate_slave_thread()), how will we restart? We + have to restart from Table_map_log_event, but as this table is + not transactional, the rows already inserted will still be + present, and idempotency is not guaranteed (no PK) so we risk + that repeating leads to double insert. So we desperately try to + continue, hope we'll eventually leave this buggy situation (by + executing the final Old_rows_log_event). If we are in a hopeless + wait (reached end of last relay log and nothing gets appended + there), we timeout after one minute, and notify DBA about the + problem. When WL#2975 is implemented, just remove the member + Relay_log_info::last_event_start_time and all its occurrences. + */ + const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0); + } + + DBUG_RETURN(0); +} + + +Log_event::enum_skip_reason +Old_rows_log_event::do_shall_skip(Relay_log_info *rli) +{ + /* + If the slave skip counter is 1 and this event does not end a + statement, then we should not start executing on the next event. + Otherwise, we defer the decision to the normal skipping logic. 
+ */ + if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F)) + return Log_event::EVENT_SKIP_IGNORE; + else + return Log_event::do_shall_skip(rli); +} + +int +Old_rows_log_event::do_update_pos(Relay_log_info *rli) +{ + DBUG_ENTER("Old_rows_log_event::do_update_pos"); + int error= 0; + + DBUG_PRINT("info", ("flags: %s", + get_flags(STMT_END_F) ? "STMT_END_F " : "")); + + if (get_flags(STMT_END_F)) + { + /* + This is the end of a statement or transaction, so close (and + unlock) the tables we opened when processing the + Table_map_log_event starting the statement. + + OBSERVER. This will clear *all* mappings, not only those that + are open for the table. There is not good handle for on-close + actions for tables. + + NOTE. Even if we have no table ('table' == 0) we still need to be + here, so that we increase the group relay log position. If we didn't, we + could have a group relay log position which lags behind "forever" + (assume the last master's transaction is ignored by the slave because of + replicate-ignore rules). + */ + thd->binlog_flush_pending_rows_event(true); + + /* + If this event is not in a transaction, the call below will, if some + transactional storage engines are involved, commit the statement into + them and flush the pending event to binlog. + If this event is in a transaction, the call will do nothing, but a + Xid_log_event will come next which will, if some transactional engines + are involved, commit the transaction and flush the pending event to the + binlog. + */ + error= ha_autocommit_or_rollback(thd, 0); + + /* + Now what if this is not a transactional engine? we still need to + flush the pending event to the binlog; we did it with + thd->binlog_flush_pending_rows_event(). Note that we imitate + what is done for real queries: a call to + ha_autocommit_or_rollback() (sometimes only if involves a + transactional engine), and a call to be sure to have the pending + event flushed. 
+ */ + + thd->reset_current_stmt_binlog_row_based(); + rli->cleanup_context(thd, 0); + if (error == 0) + { + /* + Indicate that a statement is finished. + Step the group log position if we are not in a transaction, + otherwise increase the event log position. + */ + rli->stmt_done(log_pos, when); + + /* + Clear any errors pushed in thd->net.client_last_err* if for + example "no key found" (as this is allowed). This is a safety + measure; apparently those errors (e.g. when executing a + Delete_rows_log_event_old of a non-existing row, like in + rpl_row_mystery22.test, thd->net.client_last_error = "Can't + find record in 't1'" and last_errno=1032) do not become + visible. We still prefer to wipe them out. + */ + thd->clear_error(); + } + else + rli->report(ERROR_LEVEL, error, + "Error in %s event: commit of row events failed, " + "table `%s`.`%s`", + get_type_str(), m_table->s->db.str, + m_table->s->table_name.str); + } + else + { + rli->inc_event_relay_log_pos(); + } + + DBUG_RETURN(error); +} + +#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ + + +#ifndef MYSQL_CLIENT +bool Old_rows_log_event::write_data_header(IO_CACHE *file) +{ + uchar buf[ROWS_HEADER_LEN]; // No need to init the buffer + + // This method should not be reached. + assert(0); + + DBUG_ASSERT(m_table_id != ~0UL); + DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", + { + int4store(buf + 0, m_table_id); + int2store(buf + 4, m_flags); + return (my_b_safe_write(file, buf, 6)); + }); + int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id); + int2store(buf + RW_FLAGS_OFFSET, m_flags); + return (my_b_safe_write(file, buf, ROWS_HEADER_LEN)); +} + + +bool Old_rows_log_event::write_data_body(IO_CACHE*file) +{ + /* + Note that this should be the number of *bits*, not the number of + bytes. + */ + uchar sbuf[sizeof(m_width)]; + my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf; + + // This method should not be reached. 
+ assert(0); + + bool res= false; + uchar *const sbuf_end= net_store_length(sbuf, (size_t) m_width); + DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf)); + + DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf)); + res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf)); + + DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols)); + res= res || my_b_safe_write(file, (uchar*) m_cols.bitmap, + no_bytes_in_map(&m_cols)); + DBUG_DUMP("rows", m_rows_buf, data_size); + res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size); + + return res; + +} +#endif + + +#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) +void Old_rows_log_event::pack_info(Protocol *protocol) +{ + char buf[256]; + char const *const flagstr= + get_flags(STMT_END_F) ? " flags: STMT_END_F" : ""; + size_t bytes= my_snprintf(buf, sizeof(buf), + "table_id: %lu%s", m_table_id, flagstr); + protocol->store(buf, bytes, &my_charset_bin); +} +#endif + + +#ifdef MYSQL_CLIENT +void Old_rows_log_event::print_helper(FILE *file, + PRINT_EVENT_INFO *print_event_info, + char const *const name) +{ + IO_CACHE *const head= &print_event_info->head_cache; + IO_CACHE *const body= &print_event_info->body_cache; + if (!print_event_info->short_form) + { + bool const last_stmt_event= get_flags(STMT_END_F); + print_header(head, print_event_info, !last_stmt_event); + my_b_printf(head, "\t%s: table id %lu%s\n", + name, m_table_id, + last_stmt_event ? " flags: STMT_END_F" : ""); + print_base64(body, print_event_info, !last_stmt_event); + } + + if (get_flags(STMT_END_F)) + { + copy_event_cache_to_file_and_reinit(head, file); + copy_event_cache_to_file_and_reinit(body, file); + } +} +#endif + + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +/** + Write the current row into event's table. + + The row is located in the row buffer, pointed by @c m_curr_row member. 
+ Number of columns of the row is stored in @c m_width member (it can be + different from the number of columns in the table to which we insert). + Bitmap @c m_cols indicates which columns are present in the row. It is assumed + that event's table is already open and pointed by @c m_table. + + If the same record already exists in the table it can be either overwritten + or an error is reported depending on the value of @c overwrite flag + (error reporting not yet implemented). Note that the matching record can be + different from the row we insert if we use primary keys to identify records in + the table. + + The row to be inserted can contain values only for selected columns. The + missing columns are filled with default values using @c prepare_record() + function. If a matching record is found in the table and @c overwritte is + true, the missing columns are taken from it. + + @param rli Relay log info (needed for row unpacking). + @param overwrite + Shall we overwrite if the row already exists or signal + error (currently ignored). + + @returns Error code on failure, 0 on success. + + This method, if successful, sets @c m_curr_row_end pointer to point at the + next row in the rows buffer. This is done when unpacking the row to be + inserted. + + @note If a matching record is found, it is either updated using + @c ha_update_row() or first deleted and then new record written. +*/ + +int +Old_rows_log_event::write_row(const Relay_log_info *const rli, + const bool overwrite) +{ + DBUG_ENTER("write_row"); + DBUG_ASSERT(m_table != NULL && thd != NULL); + + TABLE *table= m_table; // pointer to event's table + int error; + int keynum; + auto_afree_ptr<char> key(NULL); + + /* fill table->record[0] with default values */ + + if ((error= prepare_record(rli, table, m_width, + TRUE /* check if columns have def. values */))) + DBUG_RETURN(error); + + /* unpack row into table->record[0] */ + error= unpack_current_row(rli); // TODO: how to handle errors? 
+ +#ifndef DBUG_OFF + DBUG_DUMP("record[0]", table->record[0], table->s->reclength); + DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set); + DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set); +#endif + + /* + Try to write record. If a corresponding record already exists in the table, + we try to change it using ha_update_row() if possible. Otherwise we delete + it and repeat the whole process again. + + TODO: Add safety measures against infinite looping. + */ + + while ((error= table->file->ha_write_row(table->record[0]))) + { + if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT) + { + table->file->print_error(error, MYF(0)); /* to check at exec_relay_log_event */ + DBUG_RETURN(error); + } + if ((keynum= table->file->get_dup_key(error)) < 0) + { + DBUG_PRINT("info",("Can't locate duplicate key (get_dup_key returns %d)",keynum)); + table->file->print_error(error, MYF(0)); + /* + We failed to retrieve the duplicate key + - either because the error was not "duplicate key" error + - or because the information which key is not available + */ + DBUG_RETURN(error); + } + + /* + We need to retrieve the old row into record[1] to be able to + either update or delete the offending record. We either: + + - use rnd_pos() with a row-id (available as dupp_row) to the + offending row, if that is possible (MyISAM and Blackhole), or else + + - use index_read_idx() with the key that is duplicated, to + retrieve the offending row. 
+ */ + if (table->file->ha_table_flags() & HA_DUPLICATE_POS) + { + DBUG_PRINT("info",("Locating offending record using rnd_pos()")); + error= table->file->rnd_pos(table->record[1], table->file->dup_ref); + if (error) + { + DBUG_PRINT("info",("rnd_pos() returns error %d",error)); + table->file->print_error(error, MYF(0)); + DBUG_RETURN(error); + } + } + else + { + DBUG_PRINT("info",("Locating offending record using index_read_idx()")); + + if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) + { + DBUG_PRINT("info",("Error when setting HA_EXTRA_FLUSH_CACHE")); + DBUG_RETURN(my_errno); + } + + if (key.get() == NULL) + { + key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length))); + if (key.get() == NULL) + { + DBUG_PRINT("info",("Can't allocate key buffer")); + DBUG_RETURN(ENOMEM); + } + } + + key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum, + 0); + error= table->file->index_read_idx_map(table->record[1], keynum, + (const uchar*)key.get(), + HA_WHOLE_KEY, + HA_READ_KEY_EXACT); + if (error) + { + DBUG_PRINT("info",("index_read_idx() returns error %d",error)); + table->file->print_error(error, MYF(0)); + DBUG_RETURN(error); + } + } + + /* + Now, record[1] should contain the offending row. That + will enable us to update it or, alternatively, delete it (so + that we can insert the new row afterwards). + */ + + /* + If row is incomplete we will use the record found to fill + missing columns. + */ + if (!get_flags(COMPLETE_ROWS_F)) + { + restore_record(table,record[1]); + error= unpack_current_row(rli); + } + +#ifndef DBUG_OFF + DBUG_PRINT("debug",("preparing for update: before and after image")); + DBUG_DUMP("record[1] (before)", table->record[1], table->s->reclength); + DBUG_DUMP("record[0] (after)", table->record[0], table->s->reclength); +#endif + + /* + REPLACE is defined as either INSERT or DELETE + INSERT. If + possible, we can replace it with an UPDATE, but that will not + work on InnoDB if FOREIGN KEY checks are necessary. 
+ + I (Matz) am not sure of the reason for the last_uniq_key() + check as, but I'm guessing that it's something along the + following lines. + + Suppose that we got the duplicate key to be a key that is not + the last unique key for the table and we perform an update: + then there might be another key for which the unique check will + fail, so we're better off just deleting the row and inserting + the correct row. + */ + if (last_uniq_key(table, keynum) && + !table->file->referenced_by_foreign_key()) + { + DBUG_PRINT("info",("Updating row using ha_update_row()")); + error=table->file->ha_update_row(table->record[1], + table->record[0]); + switch (error) { + + case HA_ERR_RECORD_IS_THE_SAME: + DBUG_PRINT("info",("ignoring HA_ERR_RECORD_IS_THE_SAME error from" + " ha_update_row()")); + error= 0; + + case 0: + break; + + default: + DBUG_PRINT("info",("ha_update_row() returns error %d",error)); + table->file->print_error(error, MYF(0)); + } + + DBUG_RETURN(error); + } + else + { + DBUG_PRINT("info",("Deleting offending row and trying to write new one again")); + if ((error= table->file->ha_delete_row(table->record[1]))) + { + DBUG_PRINT("info",("ha_delete_row() returns error %d",error)); + table->file->print_error(error, MYF(0)); + DBUG_RETURN(error); + } + /* Will retry ha_write_row() with the offending row removed. */ + } + } + + DBUG_RETURN(error); +} + + +/** + Locate the current row in event's table. + + The current row is pointed by @c m_curr_row. Member @c m_width tells how many + columns are there in the row (this can be differnet from the number of columns + in the table). It is assumed that event's table is already open and pointed + by @c m_table. + + If a corresponding record is found in the table it is stored in + @c m_table->record[0]. Note that when record is located based on a primary + key, it is possible that the record found differs from the row being located. 
+ + If no key is specified or table does not have keys, a table scan is used to + find the row. In that case the row should be complete and contain values for + all columns. However, it can still be shorter than the table, i.e. the table + can contain extra columns not present in the row. It is also possible that + the table has fewer columns than the row being located. + + @returns Error code on failure, 0 on success. + + @post In case of success @c m_table->record[0] contains the record found. + Also, the internal "cursor" of the table is positioned at the record found. + + @note If the engine allows random access of the records, a combination of + @c position() and @c rnd_pos() will be used. + */ + +int Old_rows_log_event::find_row(const Relay_log_info *rli) +{ + DBUG_ENTER("find_row"); + + DBUG_ASSERT(m_table && m_table->in_use != NULL); + + TABLE *table= m_table; + int error; + + /* unpack row - missing fields get default values */ + + // TODO: shall we check and report errors here? + prepare_record(NULL,table,m_width,FALSE /* don't check errors */); + error= unpack_current_row(rli); + +#ifndef DBUG_OFF + DBUG_PRINT("info",("looking for the following record")); + DBUG_DUMP("record[0]", table->record[0], table->s->reclength); +#endif + + if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) && + table->s->primary_key < MAX_KEY) + { + /* + Use a more efficient method to fetch the record given by + table->record[0] if the engine allows it. We first compute a + row reference using the position() member function (it will be + stored in table->file->ref) and the use rnd_pos() to position + the "cursor" (i.e., record[0] in this case) at the correct row. + + TODO: Add a check that the correct record has been fetched by + comparing with the original record. Take into account that the + record on the master and slave can be of different + length. 
Something along these lines should work: + + ADD>>> store_record(table,record[1]); + int error= table->file->rnd_pos(table->record[0], table->file->ref); + ADD>>> DBUG_ASSERT(memcmp(table->record[1], table->record[0], + table->s->reclength) == 0); + + */ + DBUG_PRINT("info",("locating record using primary key (position)")); + int error= table->file->rnd_pos_by_record(table->record[0]); + if (error) + { + DBUG_PRINT("info",("rnd_pos returns error %d",error)); + table->file->print_error(error, MYF(0)); + } + DBUG_RETURN(error); + } + + // We can't use position() - try other methods. + + /* + We need to retrieve all fields + TODO: Move this out from this function to main loop + */ + table->use_all_columns(); + + /* + Save copy of the record in table->record[1]. It might be needed + later if linear search is used to find exact match. + */ + store_record(table,record[1]); + + if (table->s->keys > 0) + { + DBUG_PRINT("info",("locating record using primary key (index_read)")); + + /* We have a key: search the table using the index */ + if (!table->file->inited && (error= table->file->ha_index_init(0, FALSE))) + { + DBUG_PRINT("info",("ha_index_init returns error %d",error)); + table->file->print_error(error, MYF(0)); + DBUG_RETURN(error); + } + + /* Fill key data for the row */ + + DBUG_ASSERT(m_key); + key_copy(m_key, table->record[0], table->key_info, 0); + + /* + Don't print debug messages when running valgrind since they can + trigger false warnings. + */ +#ifndef HAVE_purify + DBUG_DUMP("key data", m_key, table->key_info->key_length); +#endif + + /* + We need to set the null bytes to ensure that the filler bit are + all set when returning. There are storage engines that just set + the necessary bits on the bytes and don't set the filler bits + correctly. + */ + my_ptrdiff_t const pos= + table->s->null_bytes > 0 ? 
table->s->null_bytes - 1 : 0; + table->record[0][pos]= 0xFF; + + if ((error= table->file->index_read_map(table->record[0], m_key, + HA_WHOLE_KEY, + HA_READ_KEY_EXACT))) + { + DBUG_PRINT("info",("no record matching the key found in the table")); + table->file->print_error(error, MYF(0)); + table->file->ha_index_end(); + DBUG_RETURN(error); + } + + /* + Don't print debug messages when running valgrind since they can + trigger false warnings. + */ +#ifndef HAVE_purify + DBUG_PRINT("info",("found first matching record")); + DBUG_DUMP("record[0]", table->record[0], table->s->reclength); +#endif + /* + Below is a minor "optimization". If the key (i.e., key number + 0) has the HA_NOSAME flag set, we know that we have found the + correct record (since there can be no duplicates); otherwise, we + have to compare the record with the one found to see if it is + the correct one. + + CAVEAT! This behaviour is essential for the replication of, + e.g., the mysql.proc table since the correct record *shall* be + found using the primary key *only*. There shall be no + comparison of non-PK columns to decide if the correct record is + found. I can see no scenario where it would be incorrect to + chose the row to change only using a PK or an UNNI. + */ + if (table->key_info->flags & HA_NOSAME) + { + table->file->ha_index_end(); + DBUG_RETURN(0); + } + + /* + In case key is not unique, we still have to iterate over records found + and find the one which is identical to the row given. A copy of the + record we are looking for is stored in record[1]. + */ + DBUG_PRINT("info",("non-unique index, scanning it to find matching record")); + + while (record_compare(table)) + { + /* + We need to set the null bytes to ensure that the filler bit + are all set when returning. There are storage engines that + just set the necessary bits on the bytes and don't set the + filler bits correctly. + + TODO[record format ndb]: Remove this code once NDB returns the + correct record format. 
+ */ + if (table->s->null_bytes > 0) + { + table->record[0][table->s->null_bytes - 1]|= + 256U - (1U << table->s->last_null_bit_pos); + } + + if ((error= table->file->index_next(table->record[0]))) + { + DBUG_PRINT("info",("no record matching the given row found")); + table->file->print_error(error, MYF(0)); + table->file->ha_index_end(); + DBUG_RETURN(error); + } + } + + /* + Have to restart the scan to be able to fetch the next row. + */ + table->file->ha_index_end(); + } + else + { + DBUG_PRINT("info",("locating record using table scan (rnd_next)")); + + int restart_count= 0; // Number of times scanning has restarted from top + + /* We don't have a key: search the table using rnd_next() */ + if ((error= table->file->ha_rnd_init(1))) + { + DBUG_PRINT("info",("error initializing table scan" + " (ha_rnd_init returns %d)",error)); + table->file->print_error(error, MYF(0)); + DBUG_RETURN(error); + } + + /* Continue until we find the right record or have made a full loop */ + do + { + error= table->file->rnd_next(table->record[0]); + + switch (error) { + + case 0: + case HA_ERR_RECORD_DELETED: + break; + + case HA_ERR_END_OF_FILE: + if (++restart_count < 2) + table->file->ha_rnd_init(1); + break; + + default: + DBUG_PRINT("info", ("Failed to get next record" + " (rnd_next returns %d)",error)); + table->file->print_error(error, MYF(0)); + table->file->ha_rnd_end(); + DBUG_RETURN(error); + } + } + while (restart_count < 2 && record_compare(table)); + + /* + Note: above record_compare will take into accout all record fields + which might be incorrect in case a partial row was given in the event + */ + + /* + Have to restart the scan to be able to fetch the next row. 
+ */ + if (restart_count == 2) + DBUG_PRINT("info", ("Record not found")); + else + DBUG_DUMP("record found", table->record[0], table->s->reclength); + table->file->ha_rnd_end(); + + DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0); + DBUG_RETURN(error); + } + + DBUG_RETURN(0); +} + +#endif + + +/************************************************************************** + Write_rows_log_event member functions +**************************************************************************/ + +/* + Constructor used to build an event for writing to the binary log. + */ +#if !defined(MYSQL_CLIENT) +Write_rows_log_event_old::Write_rows_log_event_old(THD *thd_arg, + TABLE *tbl_arg, + ulong tid_arg, + MY_BITMAP const *cols, + bool is_transactional) + : Old_rows_log_event(thd_arg, tbl_arg, tid_arg, cols, is_transactional) +{ + + // This constructor should not be reached. + assert(0); + +} +#endif + + +/* + Constructor used by slave to read the event from the binary log. + */ +#ifdef HAVE_REPLICATION +Write_rows_log_event_old::Write_rows_log_event_old(const char *buf, + uint event_len, + const Format_description_log_event + *description_event) +: Old_rows_log_event(buf, event_len, PRE_GA_WRITE_ROWS_EVENT, + description_event) +{ +} +#endif + + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +int +Write_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const) +{ + int error= 0; + + /* + We are using REPLACE semantics and not INSERT IGNORE semantics + when writing rows, that is: new rows replace old rows. We need to + inform the storage engine that it should use this behaviour. + */ + + /* Tell the storage engine that we are using REPLACE semantics. */ + thd->lex->duplicates= DUP_REPLACE; + + /* + Pretend we're executing a REPLACE command: this is needed for + InnoDB and NDB Cluster since they are not (properly) checking the + lex->duplicates flag. 
+ */ + thd->lex->sql_command= SQLCOM_REPLACE; + /* + Do not raise the error flag in case of hitting to an unique attribute + */ + m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); + /* + NDB specific: update from ndb master wrapped as Write_rows + */ + /* + so that the event should be applied to replace slave's row + */ + m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); + /* + NDB specific: if update from ndb master wrapped as Write_rows + does not find the row it's assumed idempotent binlog applying + is taking place; don't raise the error. + */ + m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY); + /* + TODO: the cluster team (Tomas?) says that it's better if the engine knows + how many rows are going to be inserted, then it can allocate needed memory + from the start. + */ + m_table->file->ha_start_bulk_insert(0); + /* + We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill + any TIMESTAMP column with data from the row but instead will use + the event's current time. + As we replicate from TIMESTAMP to TIMESTAMP and slave has no extra + columns, we know that all TIMESTAMP columns on slave will receive explicit + data from the row, so TIMESTAMP_NO_AUTO_SET is ok. + When we allow a table without TIMESTAMP to be replicated to a table having + more columns including a TIMESTAMP column, or when we allow a TIMESTAMP + column to be replicated into a BIGINT column and the slave's table has a + TIMESTAMP column, then the slave's TIMESTAMP column will take its value + from set_time() which we called earlier (consistent with SBR). And then in + some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to + analyze if explicit data is provided for slave's TIMESTAMP columns). 
+ */ + m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET; + return error; +} + + +int +Write_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const, + int error) +{ + int local_error= 0; + m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); + m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE); + /* + reseting the extra with + table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY); + fires bug#27077 + todo: explain or fix + */ + if ((local_error= m_table->file->ha_end_bulk_insert())) + { + m_table->file->print_error(local_error, MYF(0)); + } + return error? error : local_error; +} + + +int +Write_rows_log_event_old::do_exec_row(const Relay_log_info *const rli) +{ + DBUG_ASSERT(m_table != NULL); + int error= write_row(rli, TRUE /* overwrite */); + + if (error && !thd->net.client_last_errno) + thd->net.client_last_errno= error; + + return error; +} + +#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ + + +#ifdef MYSQL_CLIENT +void Write_rows_log_event_old::print(FILE *file, + PRINT_EVENT_INFO* print_event_info) +{ + Old_rows_log_event::print_helper(file, print_event_info, "Write_rows_old"); +} +#endif + + +/************************************************************************** + Delete_rows_log_event member functions +**************************************************************************/ + +/* + Constructor used to build an event for writing to the binary log. + */ + +#ifndef MYSQL_CLIENT +Delete_rows_log_event_old::Delete_rows_log_event_old(THD *thd_arg, + TABLE *tbl_arg, + ulong tid, + MY_BITMAP const *cols, + bool is_transactional) + : Old_rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional), + m_after_image(NULL), m_memory(NULL) +{ + + // This constructor should not be reached. + assert(0); + +} +#endif /* #if !defined(MYSQL_CLIENT) */ + + +/* + Constructor used by slave to read the event from the binary log. 
+ */ +#ifdef HAVE_REPLICATION +Delete_rows_log_event_old::Delete_rows_log_event_old(const char *buf, + uint event_len, + const Format_description_log_event + *description_event) + : Old_rows_log_event(buf, event_len, PRE_GA_DELETE_ROWS_EVENT, + description_event), + m_after_image(NULL), m_memory(NULL) +{ +} +#endif + + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + +int +Delete_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const) +{ + if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) && + m_table->s->primary_key < MAX_KEY) + { + /* + We don't need to allocate any memory for m_key since it is not used. + */ + return 0; + } + + if (m_table->s->keys > 0) + { + // Allocate buffer for key searches + m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME)); + if (!m_key) + return HA_ERR_OUT_OF_MEM; + } + return 0; +} + + +int +Delete_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const, + int error) +{ + /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/ + m_table->file->ha_index_or_rnd_end(); + my_free(m_key, MYF(MY_ALLOW_ZERO_PTR)); + m_key= NULL; + + return error; +} + + +int Delete_rows_log_event_old::do_exec_row(const Relay_log_info *const rli) +{ + int error; + DBUG_ASSERT(m_table != NULL); + + if (!(error= find_row(rli))) + { + /* + Delete the record found, located in record[0] + */ + error= m_table->file->ha_delete_row(m_table->record[0]); + } + return error; +} + +#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ + + +#ifdef MYSQL_CLIENT +void Delete_rows_log_event_old::print(FILE *file, + PRINT_EVENT_INFO* print_event_info) +{ + Old_rows_log_event::print_helper(file, print_event_info, "Delete_rows_old"); +} +#endif + + +/************************************************************************** + Update_rows_log_event member functions 
+**************************************************************************/ + +/* + Constructor used to build an event for writing to the binary log. + */ +#if !defined(MYSQL_CLIENT) +Update_rows_log_event_old::Update_rows_log_event_old(THD *thd_arg, + TABLE *tbl_arg, + ulong tid, + MY_BITMAP const *cols, + bool is_transactional) + : Old_rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional), + m_after_image(NULL), m_memory(NULL) +{ + + // This constructor should not be reached. + assert(0); +} +#endif /* !defined(MYSQL_CLIENT) */ + + +/* + Constructor used by slave to read the event from the binary log. + */ +#ifdef HAVE_REPLICATION +Update_rows_log_event_old::Update_rows_log_event_old(const char *buf, + uint event_len, + const + Format_description_log_event + *description_event) + : Old_rows_log_event(buf, event_len, PRE_GA_UPDATE_ROWS_EVENT, + description_event), + m_after_image(NULL), m_memory(NULL) +{ +} +#endif + + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + +int +Update_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const) +{ + if (m_table->s->keys > 0) + { + // Allocate buffer for key searches + m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME)); + if (!m_key) + return HA_ERR_OUT_OF_MEM; + } + + m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET; + + return 0; +} + + +int +Update_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const, + int error) +{ + /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/ + m_table->file->ha_index_or_rnd_end(); + my_free(m_key, MYF(MY_ALLOW_ZERO_PTR)); // Free for multi_malloc + m_key= NULL; + + return error; +} + + +int +Update_rows_log_event_old::do_exec_row(const Relay_log_info *const rli) +{ + DBUG_ASSERT(m_table != NULL); + + int error= find_row(rli); + if (error) + { + /* + We need to read the second image in the event of error to be + able to skip to the next pair of 
updates + */ + m_curr_row= m_curr_row_end; + unpack_current_row(rli); + return error; + } + + /* + This is the situation after locating BI: + + ===|=== before image ====|=== after image ===|=== + ^ ^ + m_curr_row m_curr_row_end + + BI found in the table is stored in record[0]. We copy it to record[1] + and unpack AI to record[0]. + */ + + store_record(m_table,record[1]); + + m_curr_row= m_curr_row_end; + error= unpack_current_row(rli); // this also updates m_curr_row_end + + /* + Now we have the right row to update. The old row (the one we're + looking for) is in record[1] and the new row is in record[0]. + */ +#ifndef HAVE_purify + /* + Don't print debug messages when running valgrind since they can + trigger false warnings. + */ + DBUG_PRINT("info",("Updating row in table")); + DBUG_DUMP("old record", m_table->record[1], m_table->s->reclength); + DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength); +#endif + + error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]); + if (error == HA_ERR_RECORD_IS_THE_SAME) + error= 0; + + return error; +} + +#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ + + +#ifdef MYSQL_CLIENT +void Update_rows_log_event_old::print(FILE *file, + PRINT_EVENT_INFO* print_event_info) +{ + Old_rows_log_event::print_helper(file, print_event_info, "Update_rows_old"); +} +#endif diff --git a/sql/log_event_old.h b/sql/log_event_old.h index 81e55097905..719802a80fb 100644 --- a/sql/log_event_old.h +++ b/sql/log_event_old.h @@ -20,18 +20,261 @@ Need to include this file at the proper position of log_event.h */ + +/** + @file + + @brief This file contains classes handling old formats of row-based + binlog events. 
+*/ +/* + Around 2007-10-31, I made these classes completely separated from + the new classes (before, there was a complex class hierarchy + involving multiple inheritance; see BUG#31581), by simply copying + and pasting the entire contents of Rows_log_event into + Old_rows_log_event and the entire contents of + {Write|Update|Delete}_rows_log_event into + {Write|Update|Delete}_rows_log_event_old. For clarity, I will keep + the comments marking which code was cut-and-pasted for some time. + With the classes collapsed into one, there is probably some + redundancy (maybe some methods can be simplified and/or removed), + but we keep them this way for now. /Sven +*/ + + +/** + @class Old_rows_log_event -class Old_rows_log_event + Base class for the three types of row-based events + {Write|Update|Delete}_rows_log_event_old, with event type codes + PRE_GA_{WRITE|UPDATE|DELETE}_ROWS_EVENT. These events are never + created any more, except when reading a relay log created by an old + server. +*/ +class Old_rows_log_event : public Log_event { - public: - - virtual ~Old_rows_log_event() {} + /********** BEGIN CUT & PASTE FROM Rows_log_event **********/ +public: + /** + Enumeration of the errors that can be returned. + */ + enum enum_error + { + ERR_OPEN_FAILURE = -1, /**< Failure to open table */ + ERR_OK = 0, /**< No error */ + ERR_TABLE_LIMIT_EXCEEDED = 1, /**< No more room for tables */ + ERR_OUT_OF_MEM = 2, /**< Out of memory */ + ERR_BAD_TABLE_DEF = 3, /**< Table definition does not match */ + ERR_RBR_TO_SBR = 4 /**< daisy-chaining RBR to SBR not allowed */ + }; + + /* + These definitions allow you to combine the flags into an + appropriate flag set using the normal bitwise operators. The + implicit conversion from an enum-constant to an integer is + accepted by the compiler, which is then used to set the real set + of flags. 
+ */ + enum enum_flag + { + /* Last event of a statement */ + STMT_END_F = (1U << 0), + + /* Value of the OPTION_NO_FOREIGN_KEY_CHECKS flag in thd->options */ + NO_FOREIGN_KEY_CHECKS_F = (1U << 1), + + /* Value of the OPTION_RELAXED_UNIQUE_CHECKS flag in thd->options */ + RELAXED_UNIQUE_CHECKS_F = (1U << 2), + + /** + Indicates that rows in this event are complete, that is contain + values for all columns of the table. + */ + COMPLETE_ROWS_F = (1U << 3) + }; + + typedef uint16 flag_set; + + /* Special constants representing sets of flags */ + enum + { + RLE_NO_FLAGS = 0U + }; + + virtual ~Old_rows_log_event(); + + void set_flags(flag_set flags_arg) { m_flags |= flags_arg; } + void clear_flags(flag_set flags_arg) { m_flags &= ~flags_arg; } + flag_set get_flags(flag_set flags_arg) const { return m_flags & flags_arg; } + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + virtual void pack_info(Protocol *protocol); +#endif + +#ifdef MYSQL_CLIENT + /* not for direct call, each derived has its own ::print() */ + virtual void print(FILE *file, PRINT_EVENT_INFO *print_event_info)= 0; +#endif + +#ifndef MYSQL_CLIENT + int add_row_data(uchar *data, size_t length) + { + return do_add_row_data(data,length); + } +#endif + + /* Member functions to implement superclass interface */ + virtual int get_data_size(); + + MY_BITMAP const *get_cols() const { return &m_cols; } + size_t get_width() const { return m_width; } + ulong get_table_id() const { return m_table_id; } + +#ifndef MYSQL_CLIENT + virtual bool write_data_header(IO_CACHE *file); + virtual bool write_data_body(IO_CACHE *file); + virtual const char *get_db() { return m_table->s->db.str; } +#endif + /* + Check that malloc() succeeded in allocating memory for the rows + buffer and the COLS vector. Checking that an Update_rows_log_event_old + is valid is done in the Update_rows_log_event_old::is_valid() + function. 
+ */ + virtual bool is_valid() const + { + return m_rows_buf && m_cols.bitmap; + } + + uint m_row_count; /* The number of rows added to the event */ + +protected: + /* + The constructors are protected since you're supposed to inherit + this class, not create instances of this class. + */ +#ifndef MYSQL_CLIENT + Old_rows_log_event(THD*, TABLE*, ulong table_id, + MY_BITMAP const *cols, bool is_transactional); +#endif + Old_rows_log_event(const char *row_data, uint event_len, + Log_event_type event_type, + const Format_description_log_event *description_event); + +#ifdef MYSQL_CLIENT + void print_helper(FILE *, PRINT_EVENT_INFO *, char const *const name); +#endif + +#ifndef MYSQL_CLIENT + virtual int do_add_row_data(uchar *data, size_t length); +#endif + +#ifndef MYSQL_CLIENT + TABLE *m_table; /* The table the rows belong to */ +#endif + ulong m_table_id; /* Table ID */ + MY_BITMAP m_cols; /* Bitmap denoting columns available */ + ulong m_width; /* The width of the columns bitmap */ + + ulong m_master_reclength; /* Length of record on master side */ + + /* Bit buffers in the same memory as the class */ + uint32 m_bitbuf[128/(sizeof(uint32)*8)]; + uint32 m_bitbuf_ai[128/(sizeof(uint32)*8)]; + + uchar *m_rows_buf; /* The rows in packed format */ + uchar *m_rows_cur; /* One-after the end of the data */ + uchar *m_rows_end; /* One-after the end of the allocated space */ + + flag_set m_flags; /* Flags for row-level events */ + + /* helper functions */ + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + const uchar *m_curr_row; /* Start of the row being processed */ + const uchar *m_curr_row_end; /* One-after the end of the current row */ + uchar *m_key; /* Buffer to keep key value during searches */ + + int find_row(const Relay_log_info *const); + int write_row(const Relay_log_info *const, const bool); + + // Unpack the current row into m_table->record[0] + int unpack_current_row(const Relay_log_info *const rli) + { + DBUG_ASSERT(m_table); + 
ASSERT_OR_RETURN_ERROR(m_curr_row < m_rows_end, HA_ERR_CORRUPT_EVENT); + int const result= ::unpack_row(rli, m_table, m_width, m_curr_row, &m_cols, + &m_curr_row_end, &m_master_reclength); + ASSERT_OR_RETURN_ERROR(m_curr_row_end <= m_rows_end, HA_ERR_CORRUPT_EVENT); + return result; + } +#endif + +private: + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + virtual int do_apply_event(Relay_log_info const *rli); + virtual int do_update_pos(Relay_log_info *rli); + virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); + + /* + Primitive to prepare for a sequence of row executions. + + DESCRIPTION + + Before doing a sequence of do_prepare_row() and do_exec_row() + calls, this member function should be called to prepare for the + entire sequence. Typically, this member function will allocate + space for any buffers that are needed for the two member + functions mentioned above. + + RETURN VALUE + + The member function will return 0 if all went OK, or a non-zero + error code otherwise. + */ + virtual + int do_before_row_operations(const Slave_reporting_capability *const log) = 0; + + /* + Primitive to clean up after a sequence of row executions. + + DESCRIPTION + + After doing a sequence of do_prepare_row() and do_exec_row(), + this member function should be called to clean up and release + any allocated buffers. + + The error argument, if non-zero, indicates an error which happened during + row processing before this function was called. In this case, even if + function is successful, it should return the error code given in the argument. + */ + virtual + int do_after_row_operations(const Slave_reporting_capability *const log, + int error) = 0; + + /* + Primitive to do the actual execution necessary for a row. + + DESCRIPTION + The member function will do the actual execution needed to handle a row. + The row is located at m_curr_row. When the function returns, + m_curr_row_end should point at the next row (one byte after the end + of the current row). 
+ + RETURN VALUE + 0 if execution succeeded, 1 if execution failed. + + */ + virtual int do_exec_row(const Relay_log_info *const rli) = 0; +#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ + /********** END OF CUT & PASTE FROM Rows_log_event **********/ protected: #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) - int do_apply_event(Rows_log_event*,const Relay_log_info*); + int do_apply_event(Old_rows_log_event*,const Relay_log_info*); /* Primitive to prepare for a sequence of row executions. @@ -100,33 +343,61 @@ class Old_rows_log_event }; -class Write_rows_log_event_old - : public Write_rows_log_event, public Old_rows_log_event -{ +/** + @class Write_rows_log_event_old + Old class for binlog events that write new rows to a table (event + type code PRE_GA_WRITE_ROWS_EVENT). Such events are never produced + by this version of the server, but they may be read from a relay log + created by an old server. New servers create events of class + Write_rows_log_event (event type code WRITE_ROWS_EVENT) instead. 
+*/ +class Write_rows_log_event_old : public Old_rows_log_event +{ + /********** BEGIN CUT & PASTE FROM Write_rows_log_event **********/ public: - enum - { - /* Support interface to THD::binlog_prepare_pending_rows_event */ - TYPE_CODE = PRE_GA_WRITE_ROWS_EVENT - }; - #if !defined(MYSQL_CLIENT) - Write_rows_log_event_old(THD *thd, TABLE *table, ulong table_id, - MY_BITMAP const *cols, bool is_transactional) - : Write_rows_log_event(thd, table, table_id, cols, is_transactional) - { - } + Write_rows_log_event_old(THD*, TABLE*, ulong table_id, + MY_BITMAP const *cols, bool is_transactional); #endif -#if defined(HAVE_REPLICATION) +#ifdef HAVE_REPLICATION Write_rows_log_event_old(const char *buf, uint event_len, - const Format_description_log_event *descr) - : Write_rows_log_event(buf, event_len, descr) + const Format_description_log_event *description_event); +#endif +#if !defined(MYSQL_CLIENT) + static bool binlog_row_logging_function(THD *thd, TABLE *table, + bool is_transactional, + MY_BITMAP *cols, + uint fields, + const uchar *before_record + __attribute__((unused)), + const uchar *after_record) { + return thd->binlog_write_row(table, is_transactional, + cols, fields, after_record); } #endif private: +#ifdef MYSQL_CLIENT + void print(FILE *file, PRINT_EVENT_INFO *print_event_info); +#endif + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + virtual int do_before_row_operations(const Slave_reporting_capability *const); + virtual int do_after_row_operations(const Slave_reporting_capability *const,int); + virtual int do_exec_row(const Relay_log_info *const); +#endif + /********** END OF CUT & PASTE FROM Write_rows_log_event **********/ + +public: + enum + { + /* Support interface to THD::binlog_prepare_pending_rows_event */ + TYPE_CODE = PRE_GA_WRITE_ROWS_EVENT + }; + +private: virtual Log_event_type get_type_code() { return (Log_event_type)TYPE_CODE; } #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) @@ -145,9 +416,56 @@ private: }; -class 
Update_rows_log_event_old - : public Update_rows_log_event, public Old_rows_log_event +/** + @class Update_rows_log_event_old + + Old class for binlog events that modify existing rows to a table + (event type code PRE_GA_UPDATE_ROWS_EVENT). Such events are never + produced by this version of the server, but they may be read from a + relay log created by an old server. New servers create events of + class Update_rows_log_event (event type code UPDATE_ROWS_EVENT) + instead. +*/ +class Update_rows_log_event_old : public Old_rows_log_event { + /********** BEGIN CUT & PASTE FROM Update_rows_log_event **********/ +public: +#ifndef MYSQL_CLIENT + Update_rows_log_event_old(THD*, TABLE*, ulong table_id, + MY_BITMAP const *cols, + bool is_transactional); +#endif + +#ifdef HAVE_REPLICATION + Update_rows_log_event_old(const char *buf, uint event_len, + const Format_description_log_event *description_event); +#endif + +#if !defined(MYSQL_CLIENT) + static bool binlog_row_logging_function(THD *thd, TABLE *table, + bool is_transactional, + MY_BITMAP *cols, + uint fields, + const uchar *before_record, + const uchar *after_record) + { + return thd->binlog_update_row(table, is_transactional, + cols, fields, before_record, after_record); + } +#endif + +protected: +#ifdef MYSQL_CLIENT + void print(FILE *file, PRINT_EVENT_INFO *print_event_info); +#endif + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + virtual int do_before_row_operations(const Slave_reporting_capability *const); + virtual int do_after_row_operations(const Slave_reporting_capability *const,int); + virtual int do_exec_row(const Relay_log_info *const); +#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ + /********** END OF CUT & PASTE FROM Update_rows_log_event **********/ + uchar *m_after_image, *m_memory; public: @@ -157,23 +475,6 @@ public: TYPE_CODE = PRE_GA_UPDATE_ROWS_EVENT }; -#if !defined(MYSQL_CLIENT) - Update_rows_log_event_old(THD *thd, TABLE *table, ulong table_id, - MY_BITMAP const 
*cols, bool is_transactional) - : Update_rows_log_event(thd, table, table_id, cols, is_transactional), - m_after_image(NULL), m_memory(NULL) - { - } -#endif -#if defined(HAVE_REPLICATION) - Update_rows_log_event_old(const char *buf, uint event_len, - const Format_description_log_event *descr) - : Update_rows_log_event(buf, event_len, descr), - m_after_image(NULL), m_memory(NULL) - { - } -#endif - private: virtual Log_event_type get_type_code() { return (Log_event_type)TYPE_CODE; } @@ -192,9 +493,54 @@ private: }; -class Delete_rows_log_event_old - : public Delete_rows_log_event, public Old_rows_log_event +/** + @class Delete_rows_log_event_old + + Old class for binlog events that delete existing rows from a table + (event type code PRE_GA_DELETE_ROWS_EVENT). Such events are never + produced by this version of the server, but they may be read from a + relay log created by an old server. New servers create events of + class Delete_rows_log_event (event type code DELETE_ROWS_EVENT) + instead. 
+*/ +class Delete_rows_log_event_old : public Old_rows_log_event { + /********** BEGIN CUT & PASTE FROM Delete_rows_log_event **********/ +public: +#ifndef MYSQL_CLIENT + Delete_rows_log_event_old(THD*, TABLE*, ulong, + MY_BITMAP const *cols, bool is_transactional); +#endif +#ifdef HAVE_REPLICATION + Delete_rows_log_event_old(const char *buf, uint event_len, + const Format_description_log_event *description_event); +#endif +#if !defined(MYSQL_CLIENT) + static bool binlog_row_logging_function(THD *thd, TABLE *table, + bool is_transactional, + MY_BITMAP *cols, + uint fields, + const uchar *before_record, + const uchar *after_record + __attribute__((unused))) + { + return thd->binlog_delete_row(table, is_transactional, + cols, fields, before_record); + } +#endif + +protected: +#ifdef MYSQL_CLIENT + void print(FILE *file, PRINT_EVENT_INFO *print_event_info); +#endif + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + virtual int do_before_row_operations(const Slave_reporting_capability *const); + virtual int do_after_row_operations(const Slave_reporting_capability *const,int); + virtual int do_exec_row(const Relay_log_info *const); +#endif + /********** END CUT & PASTE FROM Delete_rows_log_event **********/ + uchar *m_after_image, *m_memory; public: @@ -204,23 +550,6 @@ public: TYPE_CODE = PRE_GA_DELETE_ROWS_EVENT }; -#if !defined(MYSQL_CLIENT) - Delete_rows_log_event_old(THD *thd, TABLE *table, ulong table_id, - MY_BITMAP const *cols, bool is_transactional) - : Delete_rows_log_event(thd, table, table_id, cols, is_transactional), - m_after_image(NULL), m_memory(NULL) - { - } -#endif -#if defined(HAVE_REPLICATION) - Delete_rows_log_event_old(const char *buf, uint event_len, - const Format_description_log_event *descr) - : Delete_rows_log_event(buf, event_len, descr), - m_after_image(NULL), m_memory(NULL) - { - } -#endif - private: virtual Log_event_type get_type_code() { return (Log_event_type)TYPE_CODE; } @@ -240,4 +569,3 @@ private: #endif - diff --git 
a/sql/mysql_priv.h b/sql/mysql_priv.h index 0adf90dc258..f1a594351bc 100644 --- a/sql/mysql_priv.h +++ b/sql/mysql_priv.h @@ -1882,6 +1882,7 @@ extern uint volatile thread_count, thread_running, global_read_lock; extern my_bool opt_sql_bin_update, opt_safe_user_create, opt_no_mix_types; extern my_bool opt_safe_show_db, opt_local_infile, opt_myisam_use_mmap; extern my_bool opt_slave_compressed_protocol, use_temp_pool; +extern ulong slave_exec_mode_options; extern my_bool opt_readonly, lower_case_file_system; extern my_bool opt_enable_named_pipe, opt_sync_frm, opt_allow_suspicious_udfs; extern my_bool opt_secure_auth; diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 7b331d42941..61f3735fe62 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -459,6 +459,8 @@ ulong thread_stack, what_to_log; ulong query_buff_size, slow_launch_time, slave_open_temp_tables; ulong open_files_limit, max_binlog_size, max_relay_log_size; ulong slave_net_timeout, slave_trans_retries; +ulong slave_exec_mode_options; +const char *slave_exec_mode_str= "STRICT"; ulong thread_cache_size=0, thread_pool_size= 0; ulong binlog_cache_size=0, max_binlog_cache_size=0; ulong query_cache_size=0; @@ -5294,7 +5296,8 @@ enum options_mysqld OPT_SECURE_FILE_PRIV, OPT_MIN_EXAMINED_ROW_LIMIT, OPT_LOG_SLOW_SLAVE_STATEMENTS, - OPT_OLD_MODE + OPT_OLD_MODE, + OPT_SLAVE_EXEC_MODE }; @@ -5996,8 +5999,11 @@ replicating a LOAD DATA INFILE command.", (uchar**) &slave_load_tmpdir, (uchar**) &slave_load_tmpdir, 0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, {"slave-skip-errors", OPT_SLAVE_SKIP_ERRORS, - "Tells the slave thread to continue replication when a query returns an error from the provided list.", + "Tells the slave thread to continue replication when a query event returns an error from the provided list.", 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + {"slave-exec-mode", OPT_SLAVE_EXEC_MODE, + "Modes for how replication events should be executed. Legal values are STRICT (default) and IDEMPOTENT. 
In IDEMPOTENT mode, replication will not stop for operations that are idempotent. In STRICT mode, replication will stop on any unexpected difference between the master and the slave.", + (uchar**) &slave_exec_mode_str, (uchar**) &slave_exec_mode_str, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, #endif {"slow-query-log", OPT_SLOW_LOG, "Enable|disable slow query log", (uchar**) &opt_slow_log, @@ -7212,6 +7218,9 @@ static void mysql_init_variables(void) /* Things with default values that are not zero */ delay_key_write_options= (uint) DELAY_KEY_WRITE_ON; + slave_exec_mode_options= 0; + slave_exec_mode_options= (uint) + find_bit_type_or_exit(slave_exec_mode_str, &slave_exec_mode_typelib, NULL); opt_specialflag= SPECIAL_ENGLISH; unix_sock= ip_sock= INVALID_SOCKET; mysql_home_ptr= mysql_home; @@ -7424,6 +7433,10 @@ mysqld_get_one_option(int optid, case OPT_SLAVE_SKIP_ERRORS: init_slave_skip_errors(argument); break; + case OPT_SLAVE_EXEC_MODE: + slave_exec_mode_options= (uint) + find_bit_type_or_exit(argument, &slave_exec_mode_typelib, ""); + break; #endif case OPT_SAFEMALLOC_MEM_LIMIT: #if !defined(DBUG_OFF) && defined(SAFEMALLOC) @@ -7974,6 +7987,8 @@ static void get_options(int *argc,char **argv) } /* Set global MyISAM variables from delay_key_write_options */ fix_delay_key_write((THD*) 0, OPT_GLOBAL); + /* Set global slave_exec_mode from its option */ + fix_slave_exec_mode(OPT_GLOBAL); #ifndef EMBEDDED_LIBRARY if (mysqld_chroot) diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 3467f6fd67c..0411b01b0cd 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -1160,6 +1160,11 @@ void Relay_log_info::cleanup_context(THD *thd, bool error) close_thread_tables(thd); clear_tables_to_lock(); clear_flag(IN_STMT); + /* + Cleanup for the flags that have been set at do_apply_event. 
+ */ + thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS; + thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS; last_event_start_time= 0; DBUG_VOID_RETURN; } diff --git a/sql/rpl_utility.cc b/sql/rpl_utility.cc index b3ca26d4c2c..4f4083d9b8f 100644 --- a/sql/rpl_utility.cc +++ b/sql/rpl_utility.cc @@ -164,7 +164,7 @@ uint32 table_def::calc_field_size(uint col, uchar *master_data) const break; } default: - length= -1; + length= ~(uint32) 0; } return length; } diff --git a/sql/set_var.cc b/sql/set_var.cc index 7dce5bf1a46..410608f154f 100644 --- a/sql/set_var.cc +++ b/sql/set_var.cc @@ -93,6 +93,16 @@ TYPELIB delay_key_write_typelib= delay_key_write_type_names, NULL }; +const char *slave_exec_mode_names[]= +{ "STRICT", "IDEMPOTENT", NullS }; +static const unsigned int slave_exec_mode_names_len[]= +{ sizeof("STRICT") - 1, sizeof("IDEMPOTENT") - 1, 0 }; +TYPELIB slave_exec_mode_typelib= +{ + array_elements(slave_exec_mode_names)-1, "", + slave_exec_mode_names, (unsigned int *) slave_exec_mode_names_len +}; + static int sys_check_ftb_syntax(THD *thd, set_var *var); static bool sys_update_ftb_syntax(THD *thd, set_var * var); static void sys_default_ftb_syntax(THD *thd, enum_var_type type); @@ -417,6 +427,11 @@ static sys_var_const_str_ptr sys_secure_file_priv(&vars, "secure_file_priv", static sys_var_long_ptr sys_server_id(&vars, "server_id", &server_id, fix_server_id); static sys_var_bool_ptr sys_slave_compressed_protocol(&vars, "slave_compressed_protocol", &opt_slave_compressed_protocol); +static sys_var_set_slave_mode slave_exec_mode(&vars, + "slave_exec_mode", + &slave_exec_mode_options, + &slave_exec_mode_typelib, + 0); static sys_var_long_ptr sys_slow_launch_time(&vars, "slow_launch_time", &slow_launch_time); static sys_var_thd_ulong sys_sort_buffer(&vars, "sort_buffer_size", @@ -1002,6 +1017,79 @@ extern void fix_delay_key_write(THD *thd, enum_var_type type) } } +bool sys_var_set::update(THD *thd, set_var *var) +{ + *value= var->save_result.ulong_value; + return 0; +}; + 
+uchar *sys_var_set::value_ptr(THD *thd, enum_var_type type, + LEX_STRING *base) +{ + char buff[256]; + String tmp(buff, sizeof(buff), &my_charset_latin1); + ulong length; + ulong val= *value; + + tmp.length(0); + for (uint i= 0; val; val>>= 1, i++) + { + if (val & 1) + { + tmp.append(enum_names->type_names[i], + enum_names->type_lengths[i]); + tmp.append(','); + } + } + + if ((length= tmp.length())) + length--; + return (uchar*) thd->strmake(tmp.ptr(), length); +} + +void sys_var_set_slave_mode::set_default(THD *thd, enum_var_type type) +{ + slave_exec_mode_options= 0; + bit_do_set(slave_exec_mode_options, SLAVE_EXEC_MODE_STRICT); +} + +bool sys_var_set_slave_mode::check(THD *thd, set_var *var) +{ + bool rc= sys_var_set::check(thd, var); + if (!rc && + bit_is_set(var->save_result.ulong_value, SLAVE_EXEC_MODE_STRICT) == 1 && + bit_is_set(var->save_result.ulong_value, SLAVE_EXEC_MODE_IDEMPOTENT) == 1) + { + rc= true; + my_error(ER_SLAVE_AMBIGOUS_EXEC_MODE, MYF(0), ""); + } + return rc; +} + +bool sys_var_set_slave_mode::update(THD *thd, set_var *var) +{ + bool rc; + pthread_mutex_lock(&LOCK_global_system_variables); + rc= sys_var_set::update(thd, var); + pthread_mutex_unlock(&LOCK_global_system_variables); + return rc; +} + +void fix_slave_exec_mode(enum_var_type type) +{ + DBUG_ENTER("fix_slave_exec_mode"); + compile_time_assert(sizeof(slave_exec_mode_options) * CHAR_BIT + > SLAVE_EXEC_MODE_LAST_BIT - 1); + if (bit_is_set(slave_exec_mode_options, SLAVE_EXEC_MODE_STRICT) == 1 && + bit_is_set(slave_exec_mode_options, SLAVE_EXEC_MODE_IDEMPOTENT) == 1) + { + sql_print_error("Ambiguous slave modes combination." 
+ " STRICT will be used"); + bit_do_clear(slave_exec_mode_options, SLAVE_EXEC_MODE_IDEMPOTENT); + } + if (bit_is_set(slave_exec_mode_options, SLAVE_EXEC_MODE_IDEMPOTENT) == 0) + bit_do_set(slave_exec_mode_options, SLAVE_EXEC_MODE_STRICT); +} bool sys_var_thd_binlog_format::is_readonly() const { @@ -1115,6 +1203,7 @@ static void fix_trans_mem_root(THD *thd, enum_var_type type) static void fix_server_id(THD *thd, enum_var_type type) { server_id_supplied = 1; + thd->server_id= server_id; } @@ -3281,7 +3370,18 @@ int set_var::light_check(THD *thd) return 0; } +/** + Update variable + @param thd thread handler + @returns 0|1 ok or ERROR + + @note ERROR can be only due to abnormal operations involving + the server's execution environment such as + out of memory, hard disk failure or the computer blows up. + Consider set_var::check() method if there is a need to return + an error due to logics. +*/ int set_var::update(THD *thd) { if (!value) diff --git a/sql/set_var.h b/sql/set_var.h index 5be54200c7d..171158fcf1e 100644 --- a/sql/set_var.h +++ b/sql/set_var.h @@ -30,7 +30,8 @@ class sys_var_pluginvar; /* opaque */ typedef struct system_variables SV; typedef struct my_locale_st MY_LOCALE; -extern TYPELIB bool_typelib, delay_key_write_typelib, sql_mode_typelib; +extern TYPELIB bool_typelib, delay_key_write_typelib, sql_mode_typelib, + slave_exec_mode_typelib; typedef int (*sys_check_func)(THD *, set_var *); typedef bool (*sys_update_func)(THD *, set_var *); @@ -804,6 +805,42 @@ public: }; +class sys_var_set :public sys_var +{ +protected: + ulong *value; + TYPELIB *enum_names; +public: + sys_var_set(sys_var_chain *chain, const char *name_arg, ulong *value_arg, + TYPELIB *typelib, sys_after_update_func func) + :sys_var(name_arg, func), value(value_arg), enum_names(typelib) + { chain_sys_var(chain); } + virtual bool check(THD *thd, set_var *var) + { + return check_set(thd, var, enum_names); + } + virtual void set_default(THD *thd, enum_var_type type) + { + *value= 0; + } + 
bool update(THD *thd, set_var *var); + uchar *value_ptr(THD *thd, enum_var_type type, LEX_STRING *base); + bool check_update_type(Item_result type) { return 0; } + SHOW_TYPE show_type() { return SHOW_CHAR; } +}; + +class sys_var_set_slave_mode :public sys_var_set +{ +public: + sys_var_set_slave_mode(sys_var_chain *chain, const char *name_arg, + ulong *value_arg, + TYPELIB *typelib, sys_after_update_func func) : + sys_var_set(chain, name_arg, value_arg, typelib, func) {} + void set_default(THD *thd, enum_var_type type); + bool check(THD *thd, set_var *var); + bool update(THD *thd, set_var *var); +}; + class sys_var_log_output :public sys_var { ulong *value; @@ -1222,6 +1259,7 @@ sys_var *find_sys_var(THD *thd, const char *str, uint length=0); int sql_set_variables(THD *thd, List<set_var_base> *var_list); bool not_all_support_one_shot(List<set_var_base> *var_list); void fix_delay_key_write(THD *thd, enum_var_type type); +void fix_slave_exec_mode(enum_var_type type); ulong fix_sql_mode(ulong sql_mode); extern sys_var_const_str sys_charset_system; extern sys_var_str sys_init_connect; diff --git a/sql/share/errmsg.txt b/sql/share/errmsg.txt index 02e20ad7c5c..026a0023660 100644 --- a/sql/share/errmsg.txt +++ b/sql/share/errmsg.txt @@ -6114,3 +6114,8 @@ ER_TRG_CANT_OPEN_TABLE ER_CANT_CREATE_SROUTINE eng "Cannot create stored routine `%-.64s`. Check warnings" +ER_SLAVE_AMBIGOUS_EXEC_MODE + eng "Ambiguous slave modes combination. %s" + +ER_NO_FORMAT_DESCRIPTION_EVENT_BEFORE_BINLOG_STATEMENT + eng "The BINLOG statement of type `%s` was not preceded by a format description BINLOG statement." 
diff --git a/sql/slave.cc b/sql/slave.cc index 45e3d4da090..4ffc2023e85 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -13,6 +13,17 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +/** + @addtogroup Replication + @{ + + @file + + @brief Code to run the io thread and the sql thread on the + replication slave. +*/ + #include "mysql_priv.h" #include <mysql.h> @@ -33,10 +44,6 @@ #include "rpl_tblmap.h" -int queue_event(Master_info* mi,const char* buf,ulong event_len); -static Log_event* next_event(Relay_log_info* rli); - - #define FLAGSTR(V,F) ((V)&(F)?#F" ":"") #define MAX_SLAVE_RETRY_PAUSE 5 @@ -132,6 +139,7 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db, const char* table_name, bool overwrite); static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi); static Log_event* next_event(Relay_log_info* rli); +static int queue_event(Master_info* mi,const char* buf,ulong event_len); static int terminate_slave_thread(THD *thd, pthread_mutex_t* term_lock, pthread_cond_t* term_cond, @@ -1775,6 +1783,175 @@ static int has_temporary_error(THD *thd) DBUG_RETURN(0); } + +/** + Applies the given event and advances the relay log position. + + In essence, this function does: + + @code + ev->apply_event(rli); + ev->update_pos(rli); + @endcode + + But it also does some maintenance, such as skipping events if + needed and reporting errors. + + If the @c skip flag is set, then it is tested whether the event + should be skipped, by looking at the slave_skip_counter and the + server id. The skip flag should be set when calling this from a + replication thread but not set when executing an explicit BINLOG + statement. + + @retval 0 OK. + + @retval 1 Error calling ev->apply_event(). + + @retval 2 No error calling ev->apply_event(), but error calling + ev->update_pos(). 
+*/ +int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli, + bool skip) +{ + int exec_res= 0; + + DBUG_ENTER("apply_event_and_update_pos"); + + DBUG_PRINT("exec_event",("%s(type_code: %d; server_id: %d)", + ev->get_type_str(), ev->get_type_code(), + ev->server_id)); + DBUG_PRINT("info", ("thd->options: %s%s; rli->last_event_start_time: %lu", + FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT), + FLAGSTR(thd->options, OPTION_BEGIN), + rli->last_event_start_time)); + + /* + Execute the event to change the database and update the binary + log coordinates, but first we set some data that is needed for + the thread. + + The event will be executed unless it is supposed to be skipped. + + Queries originating from this server must be skipped. Low-level + events (Format_description_log_event, Rotate_log_event, + Stop_log_event) from this server must also be skipped. But for + those we don't want to modify 'group_master_log_pos', because + these events did not exist on the master. + Format_description_log_event is not completely skipped. + + Skip queries specified by the user in 'slave_skip_counter'. We + can't however skip events that have something to do with the log + files themselves. + + Filtering on own server id is extremely important, to ignore + execution of events created by the creation/rotation of the relay + log (remember that now the relay log starts with its Format_desc, + has a Rotate etc). 
+ */ + + thd->server_id = ev->server_id; // use the original server id for logging + thd->set_time(); // time the query + thd->lex->current_select= 0; + if (!ev->when) + ev->when= my_time(0); + ev->thd = thd; // because up to this point, ev->thd == 0 + + if (skip) + { + int reason= ev->shall_skip(rli); + if (reason == Log_event::EVENT_SKIP_COUNT) + --rli->slave_skip_counter; + pthread_mutex_unlock(&rli->data_lock); + if (reason == Log_event::EVENT_SKIP_NOT) + exec_res= ev->apply_event(rli); +#ifndef DBUG_OFF + /* + This only prints information to the debug trace. + + TODO: Print an informational message to the error log? + */ + static const char *const explain[] = { + // EVENT_SKIP_NOT, + "not skipped", + // EVENT_SKIP_IGNORE, + "skipped because event should be ignored", + // EVENT_SKIP_COUNT + "skipped because event skip counter was non-zero" + }; + DBUG_PRINT("info", ("OPTION_BEGIN: %d; IN_STMT: %d", + thd->options & OPTION_BEGIN ? 1 : 0, + rli->get_flag(Relay_log_info::IN_STMT))); + DBUG_PRINT("skip_event", ("%s event was %s", + ev->get_type_str(), explain[reason])); +#endif + } + else + exec_res= ev->apply_event(rli); + + DBUG_PRINT("info", ("apply_event error = %d", exec_res)); + if (exec_res == 0) + { + int error= ev->update_pos(rli); + char buf[22]; + DBUG_PRINT("info", ("update_pos error = %d", error)); + DBUG_PRINT("info", ("group %s %s", + llstr(rli->group_relay_log_pos, buf), + rli->group_relay_log_name)); + DBUG_PRINT("info", ("event %s %s", + llstr(rli->event_relay_log_pos, buf), + rli->event_relay_log_name)); + /* + The update should not fail, so print an error message and + return an error code. + + TODO: Replace this with a decent error message when merged + with BUG#24954 (which adds several new error messages). + */ + if (error) + { + rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR, + "It was not possible to update the positions" + " of the relay log information: the slave may" + " be in an inconsistent state." 
+ " Stopped in %s position %s", + rli->group_relay_log_name, + llstr(rli->group_relay_log_pos, buf)); + DBUG_RETURN(2); + } + } + + DBUG_RETURN(exec_res ? 1 : 0); +} + + +/** + Top-level function for executing the next event from the relay log. + + This function reads the event from the relay log, executes it, and + advances the relay log position. It also handles errors, etc. + + This function may fail to apply the event for the following reasons: + + - The position specified by the UNTIL condition of the START SLAVE + command is reached. + + - It was not possible to read the event from the log. + + - The slave is killed. + + - An error occurred when applying the event, and the event has been + tried slave_trans_retries times. If the event has been retried + fewer times, 0 is returned. + + - init_master_info or init_relay_log_pos failed. (These are called + if a failure occurs when applying the event.) + + - An error occurred when updating the binlog position. + + @retval 0 The event was applied. + + @retval 1 The event was not applied. +*/ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) { DBUG_ENTER("exec_relay_log_event"); @@ -1820,117 +1997,26 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) } if (ev) { - int const type_code= ev->get_type_code(); - int exec_res= 0; - - DBUG_PRINT("exec_event",("%s(type_code: %d; server_id: %d)", - ev->get_type_str(), type_code, ev->server_id)); - DBUG_PRINT("info", ("thd->options: %s%s; rli->last_event_start_time: %lu", - FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT), - FLAGSTR(thd->options, OPTION_BEGIN), - rli->last_event_start_time)); - + int exec_res= apply_event_and_update_pos(ev, thd, rli, TRUE); /* - Execute the event to change the database and update the binary - log coordinates, but first we set some data that is needed for - the thread. - - The event will be executed unless it is supposed to be skipped. - - Queries originating from this server must be skipped. 
Low-level - events (Format_description_log_event, Rotate_log_event, - Stop_log_event) from this server must also be skipped. But for - those we don't want to modify 'group_master_log_pos', because - these events did not exist on the master. - Format_description_log_event is not completely skipped. - - Skip queries specified by the user in 'slave_skip_counter'. We - can't however skip events that has something to do with the log - files themselves. - - Filtering on own server id is extremely important, to ignore - execution of events created by the creation/rotation of the relay - log (remember that now the relay log starts with its Format_desc, - has a Rotate etc). + Format_description_log_event should not be deleted because it will be + used to read info about the relay log's format; it will be deleted when + the SQL thread does not need it, i.e. when this thread terminates. */ - - thd->server_id = ev->server_id; // use the original server id for logging - thd->set_time(); // time the query - thd->lex->current_select= 0; - if (!ev->when) - ev->when= my_time(0); - ev->thd = thd; // because up to this point, ev->thd == 0 - - int reason= ev->shall_skip(rli); - if (reason == Log_event::EVENT_SKIP_COUNT) - --rli->slave_skip_counter; - pthread_mutex_unlock(&rli->data_lock); - if (reason == Log_event::EVENT_SKIP_NOT) - exec_res= ev->apply_event(rli); -#ifndef DBUG_OFF - /* - This only prints information to the debug trace. - - TODO: Print an informational message to the error log? - */ - static const char *const explain[] = { - // EVENT_SKIP_NOT, - "not skipped", - // EVENT_SKIP_IGNORE, - "skipped because event should be ignored", - // EVENT_SKIP_COUNT - "skipped because event skip counter was non-zero" - }; - DBUG_PRINT("info", ("OPTION_BEGIN: %d; IN_STMT: %d", - thd->options & OPTION_BEGIN ? 
1 : 0, - rli->get_flag(Relay_log_info::IN_STMT))); - DBUG_PRINT("skip_event", ("%s event was %s", - ev->get_type_str(), explain[reason])); -#endif - - DBUG_PRINT("info", ("apply_event error = %d", exec_res)); - if (exec_res == 0) + if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT) { - int error= ev->update_pos(rli); - char buf[22]; - DBUG_PRINT("info", ("update_pos error = %d", error)); - DBUG_PRINT("info", ("group %s %s", - llstr(rli->group_relay_log_pos, buf), - rli->group_relay_log_name)); - DBUG_PRINT("info", ("event %s %s", - llstr(rli->event_relay_log_pos, buf), - rli->event_relay_log_name)); - /* - The update should not fail, so print an error message and - return an error code. - - TODO: Replace this with a decent error message when merged - with BUG#24954 (which adds several new error message). - */ - if (error) - { - rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR, - "It was not possible to update the positions" - " of the relay log information: the slave may" - " be in an inconsistent state." - " Stopped in %s position %s", - rli->group_relay_log_name, - llstr(rli->group_relay_log_pos, buf)); - DBUG_RETURN(1); - } + DBUG_PRINT("info", ("Deleting the event after it has been executed")); + delete ev; } /* - Format_description_log_event should not be deleted because it will be - used to read info about the relay log's format; it will be deleted when - the SQL thread does not need it, i.e. when this thread terminates. + update_pos failed: this should not happen, so we don't + retry. 
*/ - if (type_code != FORMAT_DESCRIPTION_EVENT) - { - DBUG_PRINT("info", ("Deleting the event after it has been executed")); - delete ev; - } + if (exec_res == 2) + DBUG_RETURN(1); + if (slave_trans_retries) { int temp_err; @@ -2760,7 +2846,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev) } if (unlikely(cev_not_written)) { - cev->block = (char*)net->read_pos; + cev->block = net->read_pos; cev->block_len = num_bytes; if (unlikely(mi->rli.relay_log.append(cev))) { @@ -2774,7 +2860,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev) } else { - aev.block = (char*)net->read_pos; + aev.block = net->read_pos; aev.block_len = num_bytes; aev.log_pos = cev->log_pos; if (unlikely(mi->rli.relay_log.append(&aev))) @@ -3074,7 +3160,7 @@ static int queue_old_event(Master_info *mi, const char *buf, any >=5.0.0 format. */ -int queue_event(Master_info* mi,const char* buf, ulong event_len) +static int queue_event(Master_info* mi,const char* buf, ulong event_len) { int error= 0; ulong inc_pos; @@ -3960,4 +4046,8 @@ template class I_List_iterator<i_string>; template class I_List_iterator<i_string_pair>; #endif +/** + @} (end of group Replication) +*/ + #endif /* HAVE_REPLICATION */ diff --git a/sql/slave.h b/sql/slave.h index 2cd9ea352ba..f1772bbc1fc 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -187,6 +187,8 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset, void set_slave_thread_options(THD* thd); void set_slave_thread_default_charset(THD *thd, Relay_log_info const *rli); void rotate_relay_log(Master_info* mi); +int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli, + bool skip); pthread_handler_t handle_slave_io(void *arg); pthread_handler_t handle_slave_sql(void *arg); diff --git a/sql/sql_acl.cc b/sql/sql_acl.cc index d2d26da229a..0d563ab9051 100644 --- a/sql/sql_acl.cc +++ b/sql/sql_acl.cc @@ -5579,6 +5579,7 @@ bool mysql_create_user(THD *thd, List <LEX_USER> &list) 
LEX_USER *user_name, *tmp_user_name; List_iterator <LEX_USER> user_list(list); TABLE_LIST tables[GRANT_TABLES]; + bool some_users_created= FALSE; DBUG_ENTER("mysql_create_user"); /* @@ -5614,6 +5615,7 @@ bool mysql_create_user(THD *thd, List <LEX_USER> &list) continue; } + some_users_created= TRUE; sql_mode= thd->variables.sql_mode; if (replace_user_table(thd, tables[0].table, *user_name, 0, 0, 1, 0)) { @@ -5624,12 +5626,14 @@ bool mysql_create_user(THD *thd, List <LEX_USER> &list) VOID(pthread_mutex_unlock(&acl_cache->lock)); - write_bin_log(thd, FALSE, thd->query, thd->query_length); + if (result) + my_error(ER_CANNOT_USER, MYF(0), "CREATE USER", wrong_users.c_ptr_safe()); + + if (some_users_created) + write_bin_log(thd, FALSE, thd->query, thd->query_length); rw_unlock(&LOCK_grant); close_thread_tables(thd); - if (result) - my_error(ER_CANNOT_USER, MYF(0), "CREATE USER", wrong_users.c_ptr_safe()); DBUG_RETURN(result); } @@ -5654,6 +5658,7 @@ bool mysql_drop_user(THD *thd, List <LEX_USER> &list) LEX_USER *user_name, *tmp_user_name; List_iterator <LEX_USER> user_list(list); TABLE_LIST tables[GRANT_TABLES]; + bool some_users_deleted= FALSE; DBUG_ENTER("mysql_drop_user"); /* @@ -5682,7 +5687,9 @@ bool mysql_drop_user(THD *thd, List <LEX_USER> &list) { append_user(&wrong_users, user_name); result= TRUE; + continue; } + some_users_deleted= TRUE; } /* Rebuild 'acl_check_hosts' since 'acl_users' has been modified */ @@ -5693,7 +5700,8 @@ bool mysql_drop_user(THD *thd, List <LEX_USER> &list) if (result) my_error(ER_CANNOT_USER, MYF(0), "DROP USER", wrong_users.c_ptr_safe()); - write_bin_log(thd, FALSE, thd->query, thd->query_length); + if (some_users_deleted) + write_bin_log(thd, FALSE, thd->query, thd->query_length); rw_unlock(&LOCK_grant); close_thread_tables(thd); @@ -5722,6 +5730,7 @@ bool mysql_rename_user(THD *thd, List <LEX_USER> &list) LEX_USER *user_to, *tmp_user_to; List_iterator <LEX_USER> user_list(list); TABLE_LIST tables[GRANT_TABLES]; + bool 
some_users_renamed= FALSE; DBUG_ENTER("mysql_rename_user"); /* @@ -5762,7 +5771,9 @@ bool mysql_rename_user(THD *thd, List <LEX_USER> &list) { append_user(&wrong_users, user_from); result= TRUE; + continue; } + some_users_renamed= TRUE; } /* Rebuild 'acl_check_hosts' since 'acl_users' has been modified */ @@ -5770,12 +5781,14 @@ bool mysql_rename_user(THD *thd, List <LEX_USER> &list) VOID(pthread_mutex_unlock(&acl_cache->lock)); - write_bin_log(thd, FALSE, thd->query, thd->query_length); + if (result) + my_error(ER_CANNOT_USER, MYF(0), "RENAME USER", wrong_users.c_ptr_safe()); + + if (some_users_renamed && mysql_bin_log.is_open()) + write_bin_log(thd, FALSE, thd->query, thd->query_length); rw_unlock(&LOCK_grant); close_thread_tables(thd); - if (result) - my_error(ER_CANNOT_USER, MYF(0), "RENAME USER", wrong_users.c_ptr_safe()); DBUG_RETURN(result); } diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc index 77c5155b41b..04f408453ea 100644 --- a/sql/sql_binlog.cc +++ b/sql/sql_binlog.cc @@ -17,15 +17,14 @@ #include "rpl_rli.h" #include "base64.h" -/* +/** Execute a BINLOG statement - TODO: This currently assumes a MySQL 5.x binlog. - When we'll have binlog with a different format, to execute the - BINLOG command properly the server will need to know which format - the BINLOG command's event is in. mysqlbinlog should then send - the Format_description_log_event of the binlog it reads and the - server thread should cache this format into + To execute the BINLOG command properly the server needs to know + which format the BINLOG command's event is in. Therefore, the first + BINLOG statement seen must be a base64 encoding of the + Format_description_log_event, as output by mysqlbinlog. This + Format_description_log_event is cached in rli->description_event_for_exec. */ @@ -47,11 +46,24 @@ void mysql_client_binlog_statement(THD* thd) /* Allocation */ + + /* + If we do not have a Format_description_event, we create a dummy + one here. 
In this case, the first event we read must be a + Format_description_event. + */ + my_bool have_fd_event= TRUE; if (!thd->rli_fake) + { thd->rli_fake= new Relay_log_info; - - const Format_description_log_event *desc= - new Format_description_log_event(4); + have_fd_event= FALSE; + } + if (thd->rli_fake && !thd->rli_fake->relay_log.description_event_for_exec) + { + thd->rli_fake->relay_log.description_event_for_exec= + new Format_description_log_event(4); + have_fd_event= FALSE; + } const char *error= 0; char *buf= (char *) my_malloc(decoded_len, MYF(MY_WME)); @@ -60,7 +72,9 @@ void mysql_client_binlog_statement(THD* thd) /* Out of memory check */ - if (!(thd->rli_fake && desc && buf)) + if (!(thd->rli_fake && + thd->rli_fake->relay_log.description_event_for_exec && + buf)) { my_error(ER_OUTOFMEMORY, MYF(0), 1); /* needed 1 bytes */ goto end; @@ -131,7 +145,28 @@ void mysql_client_binlog_statement(THD* thd) goto end; } - ev= Log_event::read_log_event(bufptr, event_len, &error, desc); + /* + If we have not seen any Format_description_event, then we must + see one; it is the only statement that can be read in base64 + without a prior Format_description_event. + */ + if (!have_fd_event) + { + if (bufptr[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) + have_fd_event= TRUE; + else + { + my_error(ER_NO_FORMAT_DESCRIPTION_EVENT_BEFORE_BINLOG_STATEMENT, + MYF(0), + Log_event::get_type_str( + (Log_event_type)bufptr[EVENT_TYPE_OFFSET])); + goto end; + } + } + + ev= Log_event::read_log_event(bufptr, event_len, &error, + thd->rli_fake->relay_log. + description_event_for_exec); DBUG_PRINT("info",("binlog base64 err=%s", error)); if (!ev) @@ -167,11 +202,10 @@ void mysql_client_binlog_statement(THD* thd) Neither do we have to update the log positions, since that is not used at all: the rli_fake instance is used only for error reporting. 
- */ + */ #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) - if (IF_DBUG(int err= ) ev->apply_event(thd->rli_fake)) + if (apply_event_and_update_pos(ev, thd, thd->rli_fake, FALSE)) { - DBUG_PRINT("info", ("apply_event() returned: %d", err)); /* TODO: Maybe a better error message since the BINLOG statement now contains several events. @@ -181,7 +215,14 @@ void mysql_client_binlog_statement(THD* thd) } #endif - delete ev; + /* + Format_description_log_event should not be deleted because it + will be used to read info about the relay log's format; it + will be deleted when the SQL thread does not need it, + i.e. when this thread terminates. + */ + if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT) + delete ev; ev= 0; } } @@ -191,7 +232,6 @@ void mysql_client_binlog_statement(THD* thd) send_ok(thd); end: - delete desc; my_free(buf, MYF(MY_ALLOW_ZERO_PTR)); DBUG_VOID_RETURN; } diff --git a/sql/sql_class.h b/sql/sql_class.h index e8f28b19213..5f2b50f48b8 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -38,6 +38,9 @@ enum enum_ha_read_modes { RFIRST, RNEXT, RPREV, RLAST, RKEY, RNEXT_SAME }; enum enum_duplicates { DUP_ERROR, DUP_REPLACE, DUP_UPDATE }; enum enum_delay_key_write { DELAY_KEY_WRITE_NONE, DELAY_KEY_WRITE_ON, DELAY_KEY_WRITE_ALL }; +enum enum_slave_exec_mode { SLAVE_EXEC_MODE_STRICT, + SLAVE_EXEC_MODE_IDEMPOTENT, + SLAVE_EXEC_MODE_LAST_BIT}; enum enum_mark_columns { MARK_COLUMNS_NONE, MARK_COLUMNS_READ, MARK_COLUMNS_WRITE}; diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index ecee3fcb97f..1ae42b3ce79 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -1310,7 +1310,6 @@ bool dispatch_command(enum enum_server_command command, THD *thd, unregister_slave(thd,1,1); /* fake COM_QUIT -- if we get here, the thread needs to terminate */ error = TRUE; - net->error = 0; break; } #endif diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 1e05e19c6f5..91b8b626570 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -1243,9 +1243,6 @@ bool 
change_master(THD* thd, Master_info* mi) DBUG_RETURN(TRUE); } } - mi->rli.group_master_log_pos = mi->master_log_pos; - DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos)); - /* Coordinates in rli were spoilt by the 'if (need_relay_log_purge)' block, so restore them to good values. If we left them to ''/0, that would work; @@ -1257,6 +1254,7 @@ bool change_master(THD* thd, Master_info* mi) That's why we always save good coords in rli. */ mi->rli.group_master_log_pos= mi->master_log_pos; + DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos)); strmake(mi->rli.group_master_log_name,mi->master_log_name, sizeof(mi->rli.group_master_log_name)-1); @@ -1376,6 +1374,11 @@ bool mysql_show_binlog_events(THD* thd) if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0) goto err; + /* + to account binlog event header size + */ + thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER; + pthread_mutex_lock(log_lock); /* @@ -1386,7 +1389,6 @@ bool mysql_show_binlog_events(THD* thd) This code will fail on a mixed relay log (one which has Format_desc then Rotate then Format_desc). */ - ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event); if (ev) { @@ -1578,39 +1580,54 @@ err: DBUG_RETURN(TRUE); } - +/** + Load data's io cache specific hook to be executed + before a chunk of data is being read into the cache's buffer + The fuction instantianates and writes into the binlog + replication events along LOAD DATA processing. 
+ + @param file pointer to io-cache + @return 0 +*/ int log_loaded_block(IO_CACHE* file) { + DBUG_ENTER("log_loaded_block"); LOAD_FILE_INFO *lf_info; - uint block_len ; - - /* file->request_pos contains position where we started last read */ - char* buffer = (char*) file->request_pos; - if (!(block_len = (char*) file->read_end - (char*) buffer)) - return 0; - lf_info = (LOAD_FILE_INFO*) file->arg; + uint block_len; + /* buffer contains position where we started last read */ + uchar* buffer= (uchar*) my_b_get_buffer_start(file); + uint max_event_size= current_thd->variables.max_allowed_packet; + lf_info= (LOAD_FILE_INFO*) file->arg; if (lf_info->thd->current_stmt_binlog_row_based) return 0; if (lf_info->last_pos_in_file != HA_POS_ERROR && - lf_info->last_pos_in_file >= file->pos_in_file) - return 0; - lf_info->last_pos_in_file = file->pos_in_file; - if (lf_info->wrote_create_file) - { - Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer, - block_len, lf_info->log_delayed); - mysql_bin_log.write(&a); - } - else + lf_info->last_pos_in_file >= my_b_get_pos_in_file(file)) + DBUG_RETURN(0); + + for (block_len= my_b_get_bytes_in_buffer(file); block_len > 0; + buffer += min(block_len, max_event_size), + block_len -= min(block_len, max_event_size)) { - Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db, - buffer, block_len, - lf_info->log_delayed); - mysql_bin_log.write(&b); - lf_info->wrote_create_file = 1; - DBUG_SYNC_POINT("debug_lock.created_file_event",10); + lf_info->last_pos_in_file= my_b_get_pos_in_file(file); + if (lf_info->wrote_create_file) + { + Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer, + min(block_len, max_event_size), + lf_info->log_delayed); + mysql_bin_log.write(&a); + } + else + { + Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db, + buffer, + min(block_len, max_event_size), + lf_info->log_delayed); + mysql_bin_log.write(&b); + lf_info->wrote_create_file= 1; + 
DBUG_SYNC_POINT("debug_lock.created_file_event",10); + } } - return 0; + DBUG_RETURN(0); } /* diff --git a/sql/sql_string.cc b/sql/sql_string.cc index 2e076af45eb..7fa3786c382 100644 --- a/sql/sql_string.cc +++ b/sql/sql_string.cc @@ -297,8 +297,8 @@ bool String::copy_aligned(const char *str,uint32 arg_length, uint32 offset, return TRUE; /* - Note, this is only safe for little-endian UCS-2. - If we add big-endian UCS-2 sometimes, this code + Note, this is only safe for big-endian UCS-2. + If we add little-endian UCS-2 sometimes, this code will be more complicated. But it's OK for now. */ bzero((char*) Ptr, offset); diff --git a/sql/sql_view.cc b/sql/sql_view.cc index 79973b85181..da301b37484 100644 --- a/sql/sql_view.cc +++ b/sql/sql_view.cc @@ -1465,6 +1465,8 @@ bool mysql_drop_view(THD *thd, TABLE_LIST *views, enum_drop_mode drop_mode) char *wrong_object_db= NULL, *wrong_object_name= NULL; bool error= FALSE; enum legacy_db_type not_used; + bool some_views_deleted= FALSE; + bool something_wrong= FALSE; DBUG_ENTER("mysql_drop_view"); VOID(pthread_mutex_lock(&LOCK_open)); @@ -1506,6 +1508,8 @@ bool mysql_drop_view(THD *thd, TABLE_LIST *views, enum_drop_mode drop_mode) if (my_delete(path, MYF(MY_WME))) error= TRUE; + some_views_deleted= TRUE; + /* For a view, there is only one table_share object which should never be used outside of LOCK_open @@ -1523,29 +1527,32 @@ bool mysql_drop_view(THD *thd, TABLE_LIST *views, enum_drop_mode drop_mode) sp_cache_invalidate(); } - if (error) - { - VOID(pthread_mutex_unlock(&LOCK_open)); - DBUG_RETURN(TRUE); - } if (wrong_object_name) { - VOID(pthread_mutex_unlock(&LOCK_open)); my_error(ER_WRONG_OBJECT, MYF(0), wrong_object_db, wrong_object_name, "VIEW"); - DBUG_RETURN(TRUE); } if (non_existant_views.length()) { - VOID(pthread_mutex_unlock(&LOCK_open)); my_error(ER_BAD_TABLE_ERROR, MYF(0), non_existant_views.c_ptr()); - DBUG_RETURN(TRUE); } - write_bin_log(thd, TRUE, thd->query, thd->query_length); + something_wrong= error || 
wrong_object_name || non_existant_views.length(); + if (some_views_deleted || !something_wrong) + { + /* if something goes wrong, bin-log with possible error code, + otherwise bin-log with error code cleared. + */ + write_bin_log(thd, !something_wrong, thd->query, thd->query_length); + } - send_ok(thd); VOID(pthread_mutex_unlock(&LOCK_open)); + + if (something_wrong) + { + DBUG_RETURN(TRUE); + } + send_ok(thd); DBUG_RETURN(FALSE); } |