summaryrefslogtreecommitdiff
path: root/sql/log_event.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r--sql/log_event.cc666
1 files changed, 416 insertions, 250 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index c07bb573188..29d1fd8a084 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -1,4 +1,4 @@
-/* Copyright 2000-2008 MySQL AB, 2008 Sun Microsystems, Inc.
+/* Copyright 2000-2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -16,7 +16,7 @@
#ifdef MYSQL_CLIENT
-#include "mysql_priv.h"
+#include "sql_priv.h"
#else
@@ -24,19 +24,32 @@
#pragma implementation // gcc: Class implementation
#endif
-#include "mysql_priv.h"
+#include "sql_priv.h"
+#include "unireg.h"
+#include "my_global.h" // REQUIRED by log_event.h > m_string.h > my_bitmap.h
+#include "log_event.h"
+#include "sql_base.h" // close_thread_tables
+#include "sql_cache.h" // QUERY_CACHE_FLAGS_SIZE
+#include "sql_locale.h" // MY_LOCALE, my_locale_by_number, my_locale_en_US
+#include "key.h" // key_copy
+#include "lock.h" // mysql_unlock_tables
+#include "sql_parse.h" // mysql_test_parse_for_slave
+#include "tztime.h" // struct Time_zone
+#include "sql_load.h" // mysql_load
+#include "sql_db.h" // load_db_opt_by_name
#include "slave.h"
#include "rpl_rli.h"
#include "rpl_mi.h"
#include "rpl_filter.h"
-#include "rpl_utility.h"
#include "rpl_record.h"
+#include "transaction.h"
#include <my_dir.h>
#endif /* MYSQL_CLIENT */
#include <base64.h>
#include <my_bitmap.h>
+#include "rpl_utility.h"
#define log_cs &my_charset_latin1
@@ -134,7 +147,7 @@ static void inline slave_rows_error_report(enum loglevel level, int ha_error,
char buff[MAX_SLAVE_ERRMSG], *slider;
const char *buff_end= buff + sizeof(buff);
uint len;
- List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
+ List_iterator_fast<MYSQL_ERROR> it(thd->warning_info->warn_list());
MYSQL_ERROR *err;
buff[0]= 0;
@@ -142,10 +155,11 @@ static void inline slave_rows_error_report(enum loglevel level, int ha_error,
slider += len, err= it++)
{
len= my_snprintf(slider, buff_end - slider,
- " %s, Error_code: %d;", err->msg, err->code);
+ " %s, Error_code: %d;", err->get_message_text(),
+ err->get_sql_errno());
}
- rli->report(level, thd->is_error()? thd->main_da.sql_errno() : 0,
+ rli->report(level, thd->is_error()? thd->stmt_da->sql_errno() : 0,
"Could not execute %s event on table %s.%s;"
"%s handler error %s; "
"the event's master log %s, end_log_pos %lu",
@@ -353,13 +367,13 @@ inline int ignored_error_code(int err_code)
*/
int convert_handler_error(int error, THD* thd, TABLE *table)
{
- uint actual_error= (thd->is_error() ? thd->main_da.sql_errno() :
+ uint actual_error= (thd->is_error() ? thd->stmt_da->sql_errno() :
0);
if (actual_error == 0)
{
table->file->print_error(error, MYF(0));
- actual_error= (thd->is_error() ? thd->main_da.sql_errno() :
+ actual_error= (thd->is_error() ? thd->stmt_da->sql_errno() :
ER_UNKNOWN_ERROR);
if (actual_error == ER_UNKNOWN_ERROR)
if (global_system_variables.log_warnings)
@@ -496,7 +510,7 @@ static void cleanup_load_tmpdir()
if (is_prefix(file->name, prefbuf))
{
fn_format(fname,file->name,slave_load_tmpdir,"",MY_UNPACK_FILENAME);
- my_delete(fname, MYF(0));
+ mysql_file_delete(key_file_misc, fname, MYF(0));
}
}
@@ -664,10 +678,12 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
{
server_id= thd->server_id;
when= thd->start_time;
- cache_stmt= using_trans;
+ cache_type= (using_trans || stmt_has_updated_trans_table(thd)
+ || thd->thread_temporary_used
+ ? Log_event::EVENT_TRANSACTIONAL_CACHE :
+ Log_event::EVENT_STMT_CACHE);
}
-
/**
This minimal constructor is for when you are not even sure that there
is a valid THD. For example in the server when we are shutting down or
@@ -676,8 +692,8 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
*/
Log_event::Log_event()
- :temp_buf(0), exec_time(0), flags(0), cache_stmt(0),
- thd(0)
+ :temp_buf(0), exec_time(0), flags(0),
+ cache_type(Log_event::EVENT_INVALID_CACHE), thd(0)
{
server_id= ::server_id;
/*
@@ -696,7 +712,7 @@ Log_event::Log_event()
Log_event::Log_event(const char* buf,
const Format_description_log_event* description_event)
- :temp_buf(0), cache_stmt(0)
+ :temp_buf(0), cache_type(Log_event::EVENT_INVALID_CACHE)
{
#ifndef MYSQL_CLIENT
thd = 0;
@@ -966,7 +982,7 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
*/
int Log_event::read_log_event(IO_CACHE* file, String* packet,
- pthread_mutex_t* log_lock)
+ mysql_mutex_t* log_lock)
{
ulong data_len;
int result=0;
@@ -974,7 +990,7 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
DBUG_ENTER("Log_event::read_log_event");
if (log_lock)
- pthread_mutex_lock(log_lock);
+ mysql_mutex_lock(log_lock);
if (my_b_read(file, (uchar*) buf, sizeof(buf)))
{
/*
@@ -1032,14 +1048,14 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
end:
if (log_lock)
- pthread_mutex_unlock(log_lock);
+ mysql_mutex_unlock(log_lock);
DBUG_RETURN(result);
}
#endif /* !MYSQL_CLIENT */
#ifndef MYSQL_CLIENT
-#define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock);
-#define LOCK_MUTEX if (log_lock) pthread_mutex_lock(log_lock);
+#define UNLOCK_MUTEX if (log_lock) mysql_mutex_unlock(log_lock);
+#define LOCK_MUTEX if (log_lock) mysql_mutex_lock(log_lock);
#else
#define UNLOCK_MUTEX
#define LOCK_MUTEX
@@ -1051,7 +1067,7 @@ end:
Allocates memory; The caller is responsible for clean-up.
*/
Log_event* Log_event::read_log_event(IO_CACHE* file,
- pthread_mutex_t* log_lock,
+ mysql_mutex_t* log_lock,
const Format_description_log_event
*description_event)
#else
@@ -1199,15 +1215,11 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
*/
if (description_event->event_type_permutation)
{
- IF_DBUG({
- int new_event_type=
- description_event->event_type_permutation[event_type];
- DBUG_PRINT("info",
- ("converting event type %d to %d (%s)",
- event_type, new_event_type,
- get_type_str((Log_event_type)new_event_type)));
- });
- event_type= description_event->event_type_permutation[event_type];
+ int new_event_type= description_event->event_type_permutation[event_type];
+ DBUG_PRINT("info", ("converting event type %d to %d (%s)",
+ event_type, new_event_type,
+ get_type_str((Log_event_type)new_event_type)));
+ event_type= new_event_type;
}
switch(event_type) {
@@ -1570,37 +1582,14 @@ log_event_print_value(IO_CACHE *file, const uchar *ptr,
/* a long CHAR() field: see #37426 */
length= byte1 | (((byte0 & 0x30) ^ 0x30) << 4);
type= byte0 | 0x30;
- goto beg;
- }
-
- switch (byte0)
- {
- case MYSQL_TYPE_SET:
- case MYSQL_TYPE_ENUM:
- case MYSQL_TYPE_STRING:
- type= byte0;
- length= byte1;
- break;
-
- default:
-
- {
- char tmp[5];
- my_snprintf(tmp, sizeof(tmp), "%04X", meta);
- my_b_printf(file,
- "!! Don't know how to handle column type=%d meta=%d (%s)",
- type, meta, tmp);
- return 0;
- }
}
+ else
+ length = meta & 0xFF;
}
else
length= meta;
}
-
-beg:
-
switch (type) {
case MYSQL_TYPE_LONG:
{
@@ -1737,6 +1726,33 @@ beg:
return 3;
}
+ case MYSQL_TYPE_NEWDATE:
+ {
+ uint32 tmp= uint3korr(ptr);
+ int part;
+ char buf[11];
+ char *pos= &buf[10]; // start from '\0' to the beginning
+
+ /* Copied from field.cc */
+ *pos--=0; // End NULL
+ part=(int) (tmp & 31);
+ *pos--= (char) ('0'+part%10);
+ *pos--= (char) ('0'+part/10);
+ *pos--= ':';
+ part=(int) (tmp >> 5 & 15);
+ *pos--= (char) ('0'+part%10);
+ *pos--= (char) ('0'+part/10);
+ *pos--= ':';
+ part=(int) (tmp >> 9);
+ *pos--= (char) ('0'+part%10); part/=10;
+ *pos--= (char) ('0'+part%10); part/=10;
+ *pos--= (char) ('0'+part%10); part/=10;
+ *pos= (char) ('0'+part);
+ my_b_printf(file , "'%s'", buf);
+ my_snprintf(typestr, typestr_length, "DATE");
+ return 3;
+ }
+
case MYSQL_TYPE_DATE:
{
uint i32= uint3korr(ptr);
@@ -1755,7 +1771,7 @@ beg:
}
case MYSQL_TYPE_ENUM:
- switch (length) {
+ switch (meta & 0xFF) {
case 1:
my_b_printf(file, "%d", (int) *ptr);
my_snprintf(typestr, typestr_length, "ENUM(1 byte)");
@@ -1768,15 +1784,15 @@ beg:
return 2;
}
default:
- my_b_printf(file, "!! Unknown ENUM packlen=%d", length);
+ my_b_printf(file, "!! Unknown ENUM packlen=%d", meta & 0xFF);
return 0;
}
break;
case MYSQL_TYPE_SET:
- my_b_write_bit(file, ptr , length * 8);
- my_snprintf(typestr, typestr_length, "SET(%d bytes)", length);
- return length;
+ my_b_write_bit(file, ptr , (meta & 0xFF) * 8);
+ my_snprintf(typestr, typestr_length, "SET(%d bytes)", meta & 0xFF);
+ return meta & 0xFF;
case MYSQL_TYPE_BLOB:
switch (meta) {
@@ -2368,7 +2384,7 @@ Query_log_event::Query_log_event()
*/
Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
ulong query_length, bool using_trans,
- bool suppress_use, int errcode)
+ bool direct, bool suppress_use, int errcode)
:Log_event(thd_arg,
(thd_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F :
@@ -2427,7 +2443,7 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
the autocommit flag as written by the master to the binlog. This
behavior may change after WL#4162 has been implemented.
*/
- flags2= (uint32) (thd_arg->options &
+ flags2= (uint32) (thd_arg->variables.option_bits &
(OPTIONS_WRITTEN_TO_BIN_LOG & ~OPTION_NOT_AUTOCOMMIT));
DBUG_ASSERT(thd_arg->variables.character_set_client->number < 256*256);
DBUG_ASSERT(thd_arg->variables.collation_connection->number < 256*256);
@@ -2448,6 +2464,100 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
}
else
time_zone_len= 0;
+
+ /*
+ In what follows, we decide whether to write to the binary log or to use a
+ cache.
+ */
+ LEX *lex= thd->lex;
+ bool implicit_commit= FALSE;
+ bool force_trans= FALSE;
+ cache_type= Log_event::EVENT_INVALID_CACHE;
+ switch (lex->sql_command)
+ {
+ case SQLCOM_ALTER_DB:
+ case SQLCOM_CREATE_FUNCTION:
+ case SQLCOM_DROP_FUNCTION:
+ case SQLCOM_DROP_PROCEDURE:
+ case SQLCOM_INSTALL_PLUGIN:
+ case SQLCOM_UNINSTALL_PLUGIN:
+ case SQLCOM_ALTER_TABLESPACE:
+ implicit_commit= TRUE;
+ break;
+ case SQLCOM_DROP_TABLE:
+ force_trans= lex->drop_temporary && thd->in_multi_stmt_transaction();
+ implicit_commit= !force_trans;
+ break;
+ case SQLCOM_ALTER_TABLE:
+ case SQLCOM_CREATE_TABLE:
+ force_trans= (lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) &&
+ thd->in_multi_stmt_transaction();
+ implicit_commit= !force_trans &&
+ !(lex->select_lex.item_list.elements &&
+ thd->is_current_stmt_binlog_format_row());
+ break;
+ case SQLCOM_SET_OPTION:
+ implicit_commit= (lex->autocommit ? TRUE : FALSE);
+ break;
+ /*
+ Replace what follows after CF_AUTO_COMMIT_TRANS is backported by:
+
+ default:
+ implicit_commit= ((sql_command_flags[lex->sql_command] &
+ CF_AUTO_COMMIT_TRANS));
+ break;
+ */
+ case SQLCOM_CREATE_INDEX:
+ case SQLCOM_TRUNCATE:
+ case SQLCOM_CREATE_DB:
+ case SQLCOM_DROP_DB:
+ case SQLCOM_ALTER_DB_UPGRADE:
+ case SQLCOM_RENAME_TABLE:
+ case SQLCOM_DROP_INDEX:
+ case SQLCOM_CREATE_VIEW:
+ case SQLCOM_DROP_VIEW:
+ case SQLCOM_CREATE_TRIGGER:
+ case SQLCOM_DROP_TRIGGER:
+ case SQLCOM_CREATE_EVENT:
+ case SQLCOM_ALTER_EVENT:
+ case SQLCOM_DROP_EVENT:
+ case SQLCOM_REPAIR:
+ case SQLCOM_OPTIMIZE:
+ case SQLCOM_ANALYZE:
+ case SQLCOM_CREATE_USER:
+ case SQLCOM_DROP_USER:
+ case SQLCOM_RENAME_USER:
+ case SQLCOM_REVOKE_ALL:
+ case SQLCOM_REVOKE:
+ case SQLCOM_GRANT:
+ case SQLCOM_CREATE_PROCEDURE:
+ case SQLCOM_CREATE_SPFUNCTION:
+ case SQLCOM_ALTER_PROCEDURE:
+ case SQLCOM_ALTER_FUNCTION:
+ case SQLCOM_ASSIGN_TO_KEYCACHE:
+ case SQLCOM_PRELOAD_KEYS:
+ case SQLCOM_FLUSH:
+ case SQLCOM_RESET:
+ case SQLCOM_CHECK:
+ implicit_commit= TRUE;
+ break;
+ default:
+ implicit_commit= FALSE;
+ break;
+ }
+
+ if (implicit_commit || direct)
+ {
+ cache_type= Log_event::EVENT_NO_CACHE;
+ }
+ else
+ {
+ cache_type= ((using_trans || stmt_has_updated_trans_table(thd) ||
+ force_trans || thd->thread_temporary_used)
+ ? Log_event::EVENT_TRANSACTIONAL_CACHE :
+ Log_event::EVENT_STMT_CACHE);
+ }
+ DBUG_ASSERT(cache_type != Log_event::EVENT_INVALID_CACHE);
DBUG_PRINT("info",("Query_log_event has flags2: %lu sql_mode: %lu",
(ulong) flags2, sql_mode));
}
@@ -3066,7 +3176,7 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli,
}
else
{
- const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
+ const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
}
/*
@@ -3082,10 +3192,7 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli,
if (is_trans_keyword() || rpl_filter->db_ok(thd->db))
{
thd->set_time((time_t)when);
- thd->set_query((char*)query_arg, q_len_arg);
- VOID(pthread_mutex_lock(&LOCK_thread_count));
- thd->query_id = next_query_id();
- VOID(pthread_mutex_unlock(&LOCK_thread_count));
+ thd->set_query_and_id((char*)query_arg, q_len_arg, next_query_id());
thd->variables.pseudo_thread_id= thread_id; // for temp tables
DBUG_PRINT("query",("%s", thd->query()));
@@ -3094,13 +3201,13 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli,
{
if (flags2_inited)
/*
- all bits of thd->options which are 1 in OPTIONS_WRITTEN_TO_BIN_LOG
+ all bits of thd->variables.option_bits which are 1 in OPTIONS_WRITTEN_TO_BIN_LOG
must take their value from flags2.
*/
- thd->options= flags2|(thd->options & ~OPTIONS_WRITTEN_TO_BIN_LOG);
+ thd->variables.option_bits= flags2|(thd->variables.option_bits & ~OPTIONS_WRITTEN_TO_BIN_LOG);
/*
else, we are in a 3.23/4.0 binlog; we previously received a
- Rotate_log_event which reset thd->options and sql_mode etc, so
+ Rotate_log_event which reset thd->variables.option_bits and sql_mode etc, so
nothing to do.
*/
/*
@@ -3180,8 +3287,8 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli,
thd->table_map_for_update= (table_map)table_map_for_update;
/* Execute the query (note that we bypass dispatch_command()) */
- const char* found_semicolon= NULL;
- mysql_parse(thd, thd->query(), thd->query_length(), &found_semicolon);
+ Parser_state parser_state(thd, thd->query(), thd->query_length());
+ mysql_parse(thd, thd->query(), thd->query_length(), &parser_state);
log_slow_statement(thd);
/*
@@ -3222,7 +3329,7 @@ START SLAVE; . Query: '%s'", expected_error, thd->query());
}
/* If the query was not ignored, it is printed to the general log */
- if (!thd->is_error() || thd->main_da.sql_errno() != ER_SLAVE_IGNORED_TABLE)
+ if (!thd->is_error() || thd->stmt_da->sql_errno() != ER_SLAVE_IGNORED_TABLE)
general_log_write(thd, COM_QUERY, thd->query(), thd->query_length());
compare_errors:
@@ -3235,14 +3342,14 @@ compare_errors:
not exist errors", we silently clear the error if TEMPORARY was used.
*/
if (thd->lex->sql_command == SQLCOM_DROP_TABLE && thd->lex->drop_temporary &&
- thd->is_error() && thd->main_da.sql_errno() == ER_BAD_TABLE_ERROR &&
+ thd->is_error() && thd->stmt_da->sql_errno() == ER_BAD_TABLE_ERROR &&
!expected_error)
- thd->main_da.reset_diagnostics_area();
+ thd->stmt_da->reset_diagnostics_area();
/*
If we expected a non-zero error code, and we don't get the same error
code, and it should be ignored or is related to a concurrency issue.
*/
- actual_error= thd->is_error() ? thd->main_da.sql_errno() : 0;
+ actual_error= thd->is_error() ? thd->stmt_da->sql_errno() : 0;
DBUG_PRINT("info",("expected_error: %d sql_errno: %d",
expected_error, actual_error));
if ((expected_error && expected_error != actual_error &&
@@ -3257,7 +3364,7 @@ Error on master: '%s' (%d), Error on slave: '%s' (%d). \
Default database: '%s'. Query: '%s'",
ER_SAFE(expected_error),
expected_error,
- actual_error ? thd->main_da.message() : "no error",
+ actual_error ? thd->stmt_da->message() : "no error",
actual_error,
print_slave_db_safe(db), query_arg);
thd->is_slave_error= 1;
@@ -3281,7 +3388,7 @@ Default database: '%s'. Query: '%s'",
them back here.
*/
if (expected_error && expected_error == actual_error)
- ha_autocommit_or_rollback(thd, TRUE);
+ trans_rollback_stmt(thd);
}
/*
If we expected a non-zero error code and get nothing and, it is a concurrency
@@ -3290,7 +3397,8 @@ Default database: '%s'. Query: '%s'",
else if (expected_error && !actual_error &&
(concurrency_error_code(expected_error) ||
ignored_error_code(expected_error)))
- ha_autocommit_or_rollback(thd, TRUE);
+ trans_rollback_stmt(thd);
+
/*
Other cases: mostly we expected no error and get one.
*/
@@ -3298,7 +3406,7 @@ Default database: '%s'. Query: '%s'",
{
rli->report(ERROR_LEVEL, actual_error,
"Error '%s' on query. Default database: '%s'. Query: '%s'",
- (actual_error ? thd->main_da.message() :
+ (actual_error ? thd->stmt_da->message() :
"unexpected success or fatal error"),
print_slave_db_safe(thd->db), query_arg);
thd->is_slave_error= 1;
@@ -3327,6 +3435,21 @@ Default database: '%s'. Query: '%s'",
*/
} /* End of if (db_ok(... */
+ {/**
+ The following failure injecion works in cooperation with tests
+ setting @@global.debug= 'd,stop_slave_middle_group'.
+ The sql thread receives the killed status and will proceed
+ to shutdown trying to finish incomplete events group.
+ */
+ DBUG_EXECUTE_IF("stop_slave_middle_group",
+ if (strcmp("COMMIT", query) != 0 &&
+ strcmp("BEGIN", query) != 0)
+ {
+ if (thd->transaction.all.modified_non_trans_table)
+ const_cast<Relay_log_info*>(rli)->abort_slave= 1;
+ };);
+ }
+
end:
/*
Probably we have set thd->query, thd->db, thd->catalog to point to places
@@ -3385,13 +3508,13 @@ Query_log_event::do_shall_skip(Relay_log_info *rli)
{
if (strcmp("BEGIN", query) == 0)
{
- thd->options|= OPTION_BEGIN;
+ thd->variables.option_bits|= OPTION_BEGIN;
DBUG_RETURN(Log_event::continue_group(rli));
}
if (strcmp("COMMIT", query) == 0 || strcmp("ROLLBACK", query) == 0)
{
- thd->options&= ~OPTION_BEGIN;
+ thd->variables.option_bits&= ~OPTION_BEGIN;
DBUG_RETURN(Log_event::EVENT_SKIP_COUNT);
}
}
@@ -3644,10 +3767,11 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
*/
if (post_header_len)
{
+#ifndef DBUG_OFF
// Allows us to sanity-check that all events initialized their
// events (see the end of this 'if' block).
- IF_DBUG(memset(post_header_len, 255,
- number_of_event_types*sizeof(uint8)););
+ memset(post_header_len, 255, number_of_event_types*sizeof(uint8));
+#endif
/* Note: all event types must explicitly fill in their lengths here. */
post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
@@ -3698,13 +3822,12 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
post_header_len[UPDATE_ROWS_EVENT-1]=
post_header_len[DELETE_ROWS_EVENT-1]= 6;);
post_header_len[INCIDENT_EVENT-1]= INCIDENT_HEADER_LEN;
+ post_header_len[HEARTBEAT_LOG_EVENT-1]= 0;
// Sanity-check that all post header lengths are initialized.
- IF_DBUG({
- int i;
- for (i=0; i<number_of_event_types; i++)
- assert(post_header_len[i] != 255);
- });
+ int i;
+ for (i=0; i<number_of_event_types; i++)
+ DBUG_ASSERT(post_header_len[i] != 255);
}
break;
@@ -3929,7 +4052,6 @@ int Format_description_log_event::do_apply_event(Relay_log_info const *rli)
int ret= 0;
DBUG_ENTER("Format_description_log_event::do_apply_event");
-#ifdef USING_TRANSACTIONS
/*
As a transaction NEVER spans on 2 or more binlogs:
if we have an active transaction at this point, the master died
@@ -3951,7 +4073,7 @@ int Format_description_log_event::do_apply_event(Relay_log_info const *rli)
"its binary log, thus rolled back too.");
const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 1);
}
-#endif
+
/*
If this event comes from ourselves, there is no cleaning task to
perform, we don't call Start_log_event_v3::do_apply_event()
@@ -4620,22 +4742,14 @@ int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli,
if (rpl_filter->db_ok(thd->db))
{
thd->set_time((time_t)when);
- VOID(pthread_mutex_lock(&LOCK_thread_count));
- thd->query_id = next_query_id();
- VOID(pthread_mutex_unlock(&LOCK_thread_count));
- /*
- Initing thd->row_count is not necessary in theory as this variable has no
- influence in the case of the slave SQL thread (it is used to generate a
- "data truncated" warning but which is absorbed and never gets to the
- error log); still we init it to avoid a Valgrind message.
- */
- mysql_reset_errors(thd, 0);
+ thd->set_query_id(next_query_id());
+ thd->warning_info->opt_clear_warning_info(thd->query_id);
TABLE_LIST tables;
- bzero((char*) &tables,sizeof(tables));
- tables.db= thd->strmake(thd->db, thd->db_length);
- tables.alias = tables.table_name = (char*) table_name;
- tables.lock_type = TL_WRITE;
+ tables.init_one_table(thd->strmake(thd->db, thd->db_length),
+ thd->db_length,
+ table_name, strlen(table_name),
+ table_name, TL_WRITE);
tables.updating= 1;
// the table will be opened in mysql_load
@@ -4789,8 +4903,8 @@ error:
int sql_errno;
if (thd->is_error())
{
- err= thd->main_da.message();
- sql_errno= thd->main_da.sql_errno();
+ err= thd->stmt_da->message();
+ sql_errno= thd->stmt_da->sql_errno();
}
else
{
@@ -4983,7 +5097,7 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli)
!is_relay_log_event() &&
!rli->is_in_group())
{
- pthread_mutex_lock(&rli->data_lock);
+ mysql_mutex_lock(&rli->data_lock);
DBUG_PRINT("info", ("old group_master_log_name: '%s' "
"old group_master_log_pos: %lu",
rli->group_master_log_name,
@@ -4995,11 +5109,11 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli)
"new group_master_log_pos: %lu",
rli->group_master_log_name,
(ulong) rli->group_master_log_pos));
- pthread_mutex_unlock(&rli->data_lock);
+ mysql_mutex_unlock(&rli->data_lock);
flush_relay_log_info(rli);
/*
- Reset thd->options and sql_mode etc, because this could be the signal of
+ Reset thd->variables.option_bits and sql_mode etc, because this could be the signal of
a master's downgrade from 5.0 to 4.0.
However, no need to reset description_event_for_exec: indeed, if the next
master is 5.0 (even 5.0.1) we will soon get a Format_desc; if the next
@@ -5358,10 +5472,16 @@ void Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
int Xid_log_event::do_apply_event(Relay_log_info const *rli)
{
+ bool res;
/* For a slave Xid_log_event is COMMIT */
general_log_print(thd, COM_QUERY,
"COMMIT /* implicit, from Xid_log_event */");
- return end_trans(thd, COMMIT);
+ if (!(res= trans_commit(thd)))
+ {
+ close_thread_tables(thd);
+ thd->mdl_context.release_transactional_locks();
+ }
+ return res;
}
Log_event::enum_skip_reason
@@ -5369,7 +5489,7 @@ Xid_log_event::do_shall_skip(Relay_log_info *rli)
{
DBUG_ENTER("Xid_log_event::do_shall_skip");
if (rli->slave_skip_counter > 0) {
- thd->options&= ~OPTION_BEGIN;
+ thd->variables.option_bits&= ~OPTION_BEGIN;
DBUG_RETURN(Log_event::EVENT_SKIP_COUNT);
}
DBUG_RETURN(Log_event::do_shall_skip(rli));
@@ -5401,16 +5521,18 @@ void User_var_log_event::pack_info(Protocol* protocol)
case REAL_RESULT:
double real_val;
float8get(real_val, val);
- if (!(buf= (char*) my_malloc(val_offset + FLOATING_POINT_BUFFER,
+ if (!(buf= (char*) my_malloc(val_offset + MY_GCVT_MAX_FIELD_WIDTH + 1,
MYF(MY_WME))))
return;
- event_len+= my_sprintf(buf + val_offset,
- (buf + val_offset, "%.14g", real_val));
+ event_len+= my_gcvt(real_val, MY_GCVT_ARG_DOUBLE, MY_GCVT_MAX_FIELD_WIDTH,
+ buf + val_offset, NULL);
break;
case INT_RESULT:
if (!(buf= (char*) my_malloc(val_offset + 22, MYF(MY_WME))))
return;
- event_len= longlong10_to_str(uint8korr(val), buf + val_offset,-10)-buf;
+ event_len= longlong10_to_str(uint8korr(val), buf + val_offset,
+ ((flags & User_var_log_event::UNSIGNED_F) ?
+ 10 : -10))-buf;
break;
case DECIMAL_RESULT:
{
@@ -5468,12 +5590,14 @@ User_var_log_event(const char* buf,
:Log_event(buf, description_event)
{
/* The Post-Header is empty. The Variable Data part begins immediately. */
+ const char *start= buf;
buf+= description_event->common_header_len +
description_event->post_header_len[USER_VAR_EVENT-1];
name_len= uint4korr(buf);
name= (char *) buf + UV_NAME_LEN_SIZE;
buf+= UV_NAME_LEN_SIZE + name_len;
is_null= (bool) *buf;
+ flags= User_var_log_event::UNDEF_F; // defaults to UNDEF_F
if (is_null)
{
type= STRING_RESULT;
@@ -5489,6 +5613,27 @@ User_var_log_event(const char* buf,
UV_CHARSET_NUMBER_SIZE);
val= (char *) (buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE);
+
+ /**
+ We need to check if this is from an old server
+ that did not pack information for flags.
+ We do this by checking if there are extra bytes
+ after the packed value. If there are we take the
+ extra byte and it's value is assumed to contain
+ the flags value.
+
+ Old events will not have this extra byte, thence,
+ we keep the flags set to UNDEF_F.
+ */
+ uint bytes_read= ((val + val_len) - start);
+ DBUG_ASSERT(bytes_read==data_written ||
+ bytes_read==(data_written-1));
+ if ((data_written - bytes_read) > 0)
+ {
+ flags= (uint) *(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
+ UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE +
+ val_len);
+ }
}
}
@@ -5500,6 +5645,7 @@ bool User_var_log_event::write(IO_CACHE* file)
char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE];
uchar buf2[max(8, DECIMAL_MAX_FIELD_SIZE + 2)], *pos= buf2;
+ uint unsigned_len= 0;
uint buf1_length;
ulong event_length;
@@ -5521,6 +5667,7 @@ bool User_var_log_event::write(IO_CACHE* file)
break;
case INT_RESULT:
int8store(buf2, *(longlong*) val);
+ unsigned_len= 1;
break;
case DECIMAL_RESULT:
{
@@ -5545,13 +5692,14 @@ bool User_var_log_event::write(IO_CACHE* file)
}
/* Length of the whole event */
- event_length= sizeof(buf)+ name_len + buf1_length + val_len;
+ event_length= sizeof(buf)+ name_len + buf1_length + val_len + unsigned_len;
return (write_header(file, event_length) ||
my_b_safe_write(file, (uchar*) buf, sizeof(buf)) ||
my_b_safe_write(file, (uchar*) name, name_len) ||
my_b_safe_write(file, (uchar*) buf1, buf1_length) ||
- my_b_safe_write(file, pos, val_len));
+ my_b_safe_write(file, pos, val_len) ||
+ my_b_safe_write(file, &flags, unsigned_len));
}
#endif
@@ -5592,7 +5740,8 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
break;
case INT_RESULT:
char int_buf[22];
- longlong10_to_str(uint8korr(val), int_buf, -10);
+ longlong10_to_str(uint8korr(val), int_buf,
+ ((flags & User_var_log_event::UNSIGNED_F) ? 10 : -10));
my_b_printf(&cache, ":=%s%s\n", int_buf, print_event_info->delimiter);
break;
case DECIMAL_RESULT:
@@ -5739,7 +5888,8 @@ int User_var_log_event::do_apply_event(Relay_log_info const *rli)
a single record and with a single column. Thus, like
a column value, it could always have IMPLICIT derivation.
*/
- e.update_hash(val, val_len, type, charset, DERIVATION_IMPLICIT, 0);
+ e.update_hash(val, val_len, type, charset, DERIVATION_IMPLICIT,
+ (flags & User_var_log_event::UNSIGNED_F));
free_root(thd->mem_root,0);
return 0;
@@ -5816,8 +5966,8 @@ Slave_log_event::Slave_log_event(THD* thd_arg,
Master_info* mi = rli->mi;
// TODO: re-write this better without holding both locks at the same time
- pthread_mutex_lock(&mi->data_lock);
- pthread_mutex_lock(&rli->data_lock);
+ mysql_mutex_lock(&mi->data_lock);
+ mysql_mutex_lock(&rli->data_lock);
master_host_len = strlen(mi->host);
master_log_len = strlen(rli->group_master_log_name);
// on OOM, just do not initialize the structure and print the error
@@ -5835,8 +5985,8 @@ Slave_log_event::Slave_log_event(THD* thd_arg,
}
else
sql_print_error("Out of memory while recording slave event");
- pthread_mutex_unlock(&rli->data_lock);
- pthread_mutex_unlock(&mi->data_lock);
+ mysql_mutex_unlock(&rli->data_lock);
+ mysql_mutex_unlock(&mi->data_lock);
DBUG_VOID_RETURN;
}
#endif /* !MYSQL_CLIENT */
@@ -5970,7 +6120,7 @@ int Stop_log_event::do_update_pos(Relay_log_info *rli)
could give false triggers in MASTER_POS_WAIT() that we have reached
the target position when in fact we have not.
*/
- if (thd->options & OPTION_BEGIN)
+ if (thd->variables.option_bits & OPTION_BEGIN)
rli->inc_event_relay_log_pos();
else
{
@@ -6193,10 +6343,12 @@ int Create_file_log_event::do_apply_event(Relay_log_info const *rli)
fname_buf= strmov(proc_info, "Making temp file ");
ext= slave_load_file_stem(fname_buf, file_id, server_id, ".info");
thd_proc_info(thd, proc_info);
- my_delete(fname_buf, MYF(0)); // old copy may exist already
- if ((fd= my_create(fname_buf, CREATE_MODE,
- O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
- MYF(MY_WME))) < 0 ||
+ /* old copy may exist already */
+ mysql_file_delete(key_file_log_event_info, fname_buf, MYF(0));
+ if ((fd= mysql_file_create(key_file_log_event_info,
+ fname_buf, CREATE_MODE,
+ O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
+ MYF(MY_WME))) < 0 ||
init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
MYF(MY_WME|MY_NABP)))
{
@@ -6218,20 +6370,22 @@ int Create_file_log_event::do_apply_event(Relay_log_info const *rli)
goto err;
}
end_io_cache(&file);
- my_close(fd, MYF(0));
+ mysql_file_close(fd, MYF(0));
// fname_buf now already has .data, not .info, because we did our trick
- my_delete(fname_buf, MYF(0)); // old copy may exist already
- if ((fd= my_create(fname_buf, CREATE_MODE,
- O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
- MYF(MY_WME))) < 0)
+ /* old copy may exist already */
+ mysql_file_delete(key_file_log_event_data, fname_buf, MYF(0));
+ if ((fd= mysql_file_create(key_file_log_event_data,
+ fname_buf, CREATE_MODE,
+ O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
+ MYF(MY_WME))) < 0)
{
rli->report(ERROR_LEVEL, my_errno,
"Error in Create_file event: could not open file '%s'",
fname_buf);
goto err;
}
- if (my_write(fd, (uchar*) block, block_len, MYF(MY_WME+MY_NABP)))
+ if (mysql_file_write(fd, (uchar*) block, block_len, MYF(MY_WME+MY_NABP)))
{
rli->report(ERROR_LEVEL, my_errno,
"Error in Create_file event: write to '%s' failed",
@@ -6244,7 +6398,7 @@ err:
if (error)
end_io_cache(&file);
if (fd >= 0)
- my_close(fd, MYF(0));
+ mysql_file_close(fd, MYF(0));
thd_proc_info(thd, 0);
return error != 0;
}
@@ -6376,10 +6530,12 @@ int Append_block_log_event::do_apply_event(Relay_log_info const *rli)
*/
lex_start(thd);
mysql_reset_thd_for_next_command(thd);
- my_delete(fname, MYF(0)); // old copy may exist already
- if ((fd= my_create(fname, CREATE_MODE,
- O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
- MYF(MY_WME))) < 0)
+ /* old copy may exist already */
+ mysql_file_delete(key_file_log_event_data, fname, MYF(0));
+ if ((fd= mysql_file_create(key_file_log_event_data,
+ fname, CREATE_MODE,
+ O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
+ MYF(MY_WME))) < 0)
{
rli->report(ERROR_LEVEL, my_errno,
"Error in %s event: could not create file '%s'",
@@ -6387,8 +6543,10 @@ int Append_block_log_event::do_apply_event(Relay_log_info const *rli)
goto err;
}
}
- else if ((fd = my_open(fname, O_WRONLY | O_APPEND | O_BINARY | O_NOFOLLOW,
- MYF(MY_WME))) < 0)
+ else if ((fd= mysql_file_open(key_file_log_event_data,
+ fname,
+ O_WRONLY | O_APPEND | O_BINARY | O_NOFOLLOW,
+ MYF(MY_WME))) < 0)
{
rli->report(ERROR_LEVEL, my_errno,
"Error in %s event: could not open file '%s'",
@@ -6396,10 +6554,12 @@ int Append_block_log_event::do_apply_event(Relay_log_info const *rli)
goto err;
}
- DBUG_EXECUTE_IF("remove_slave_load_file_before_write",
- my_close(fd,MYF(0)); fd= -1; my_delete(fname, MYF(0)););
+ DBUG_EXECUTE_IF("remove_slave_load_file_before_write",
+ {
+ my_delete_allow_opened(fname, MYF(0));
+ });
- if (my_write(fd, (uchar*) block, block_len, MYF(MY_WME+MY_NABP)))
+ if (mysql_file_write(fd, (uchar*) block, block_len, MYF(MY_WME+MY_NABP)))
{
rli->report(ERROR_LEVEL, my_errno,
"Error in %s event: write to '%s' failed",
@@ -6410,7 +6570,7 @@ int Append_block_log_event::do_apply_event(Relay_log_info const *rli)
err:
if (fd >= 0)
- my_close(fd, MYF(0));
+ mysql_file_close(fd, MYF(0));
thd_proc_info(thd, 0);
DBUG_RETURN(error);
}
@@ -6504,9 +6664,9 @@ int Delete_file_log_event::do_apply_event(Relay_log_info const *rli)
{
char fname[FN_REFLEN+10];
char *ext= slave_load_file_stem(fname, file_id, server_id, ".data");
- (void) my_delete(fname, MYF(MY_WME));
+ mysql_file_delete(key_file_log_event_data, fname, MYF(MY_WME));
strmov(ext, ".info");
- (void) my_delete(fname, MYF(MY_WME));
+ mysql_file_delete(key_file_log_event_info, fname, MYF(MY_WME));
return 0;
}
#endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
@@ -6607,8 +6767,9 @@ int Execute_load_log_event::do_apply_event(Relay_log_info const *rli)
Load_log_event *lev= 0;
ext= slave_load_file_stem(fname, file_id, server_id, ".info");
- if ((fd = my_open(fname, O_RDONLY | O_BINARY | O_NOFOLLOW,
- MYF(MY_WME))) < 0 ||
+ if ((fd= mysql_file_open(key_file_log_event_info,
+ fname, O_RDONLY | O_BINARY | O_NOFOLLOW,
+ MYF(MY_WME))) < 0 ||
init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
MYF(MY_WME|MY_NABP)))
{
@@ -6618,7 +6779,7 @@ int Execute_load_log_event::do_apply_event(Relay_log_info const *rli)
goto err;
}
if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
- (pthread_mutex_t*)0,
+ (mysql_mutex_t*)0,
rli->relay_log.description_event_for_exec)) ||
lev->get_type_code() != NEW_LOAD_EVENT)
{
@@ -6658,24 +6819,24 @@ int Execute_load_log_event::do_apply_event(Relay_log_info const *rli)
}
/*
We have an open file descriptor to the .info file; we need to close it
- or Windows will refuse to delete the file in my_delete().
+ or Windows will refuse to delete the file in mysql_file_delete().
*/
if (fd >= 0)
{
- my_close(fd, MYF(0));
+ mysql_file_close(fd, MYF(0));
end_io_cache(&file);
fd= -1;
}
- (void) my_delete(fname, MYF(MY_WME));
+ mysql_file_delete(key_file_log_event_info, fname, MYF(MY_WME));
memcpy(ext, ".data", 6);
- (void) my_delete(fname, MYF(MY_WME));
+ mysql_file_delete(key_file_log_event_data, fname, MYF(MY_WME));
error = 0;
err:
delete lev;
if (fd >= 0)
{
- my_close(fd, MYF(0));
+ mysql_file_close(fd, MYF(0));
end_io_cache(&file);
}
return error;
@@ -6740,9 +6901,9 @@ Execute_load_query_log_event(THD *thd_arg, const char* query_arg,
ulong query_length_arg, uint fn_pos_start_arg,
uint fn_pos_end_arg,
enum_load_dup_handling dup_handling_arg,
- bool using_trans, bool suppress_use,
+ bool using_trans, bool direct, bool suppress_use,
int errcode):
- Query_log_event(thd_arg, query_arg, query_length_arg, using_trans,
+ Query_log_event(thd_arg, query_arg, query_length_arg, using_trans, direct,
suppress_use, errcode),
file_id(thd_arg->file_id), fn_pos_start(fn_pos_start_arg),
fn_pos_end(fn_pos_end_arg), dup_handling(dup_handling_arg)
@@ -6914,7 +7075,7 @@ Execute_load_query_log_event::do_apply_event(Relay_log_info const *rli)
file so that we can re-execute this event at START SLAVE.
*/
if (!error)
- (void) my_delete(fname, MYF(MY_WME));
+ mysql_file_delete(key_file_log_event_data, fname, MYF(MY_WME));
my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
return error;
@@ -7037,9 +7198,9 @@ Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid,
DBUG_ASSERT((tbl_arg && tbl_arg->s && tid != ~0UL) ||
(!tbl_arg && !cols && tid == ~0UL));
- if (thd_arg->options & OPTION_NO_FOREIGN_KEY_CHECKS)
+ if (thd_arg->variables.option_bits & OPTION_NO_FOREIGN_KEY_CHECKS)
set_flags(NO_FOREIGN_KEY_CHECKS_F);
- if (thd_arg->options & OPTION_RELAXED_UNIQUE_CHECKS)
+ if (thd_arg->variables.option_bits & OPTION_RELAXED_UNIQUE_CHECKS)
set_flags(RELAXED_UNIQUE_CHECKS_F);
/* if bitmap_init fails, caught in is_valid() */
if (likely(!bitmap_init(&m_cols,
@@ -7284,8 +7445,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
*/
DBUG_ASSERT(get_flags(STMT_END_F));
- const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
- close_thread_tables(thd);
+ const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
thd->clear_error();
DBUG_RETURN(0);
}
@@ -7311,7 +7471,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
We also call the mysql_reset_thd_for_next_command(), since this
is the logical start of the next "statement". Note that this
- call might reset the value of current_stmt_binlog_row_based, so
+ call might reset the value of current_stmt_binlog_format, so
we need to do any changes to that value after this function.
*/
lex_start(thd);
@@ -7323,16 +7483,12 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
*/
thd->transaction.stmt.modified_non_trans_table= FALSE;
/*
- Check if the slave is set to use SBR. If so, it should switch
- to using RBR until the end of the "statement", i.e., next
- STMT_END_F or next error.
+ This is a row injection, so we flag the "statement" as
+ such. Note that this code is called both when the slave does row
+ injections and when the BINLOG statement is used to do row
+ injections.
*/
- if (!thd->current_stmt_binlog_row_based &&
- mysql_bin_log.is_open() && (thd->options & OPTION_BIN_LOG))
- {
- thd->set_current_stmt_binlog_row_based();
- }
-
+ thd->lex->set_stmt_row_injection();
/*
There are a few flags that are replicated with each row event.
@@ -7340,20 +7496,20 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
the event.
*/
if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
- thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
+ thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
else
- thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
+ thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
if (get_flags(RELAXED_UNIQUE_CHECKS_F))
- thd->options|= OPTION_RELAXED_UNIQUE_CHECKS;
+ thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
else
- thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
+ thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
/* A small test to verify that objects have consistent types */
- DBUG_ASSERT(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
+ DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
- if (simple_open_n_lock_tables(thd, rli->tables_to_lock))
+ if (open_and_lock_tables(thd, rli->tables_to_lock, FALSE, 0))
{
- uint actual_error= thd->main_da.sql_errno();
+ uint actual_error= thd->stmt_da->sql_errno();
if (thd->is_slave_error || thd->is_fatal_error)
{
/*
@@ -7363,12 +7519,12 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
having severe errors which should not be skiped.
*/
rli->report(ERROR_LEVEL, actual_error,
- "Error '%s' on opening tables",
- (actual_error ? thd->main_da.message() :
+ "Error executing row event: '%s'",
+ (actual_error ? thd->stmt_da->message() :
"unexpected success or fatal error"));
thd->is_slave_error= 1;
}
- const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
+ const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
DBUG_RETURN(actual_error);
}
@@ -7381,11 +7537,18 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
*/
{
+ DBUG_PRINT("debug", ("Checking compability of tables to lock - tables_to_lock: %p",
+ rli->tables_to_lock));
RPL_TABLE_LIST *ptr= rli->tables_to_lock;
for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global))
{
- if (ptr->m_tabledef.compatible_with(rli, ptr->table))
+ TABLE *conv_table;
+ if (!ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli),
+ ptr->table, &conv_table))
{
+ DBUG_PRINT("debug", ("Table: %s.%s is not compatible with master",
+ ptr->table->s->db.str,
+ ptr->table->s->table_name.str));
/*
We should not honour --slave-skip-errors at this point as we are
having severe errors which should not be skiped.
@@ -7393,15 +7556,20 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
mysql_unlock_tables(thd, thd->lock);
thd->lock= 0;
thd->is_slave_error= 1;
- const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
+ const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
DBUG_RETURN(ERR_BAD_TABLE_DEF);
}
+ DBUG_PRINT("debug", ("Table: %s.%s is compatible with master"
+ " - conv_table: %p",
+ ptr->table->s->db.str,
+ ptr->table->s->table_name.str, conv_table));
+ ptr->m_conv_table= conv_table;
}
}
/*
- ... and then we add all the tables to the table map and remove
- them from tables to lock.
+ ... and then we add all the tables to the table map and but keep
+ them in the tables to lock list.
We also invalidate the query cache for all the tables, since
they will now be changed.
@@ -7503,8 +7671,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
{
int actual_error= convert_handler_error(error, thd, table);
bool idempotent_error= (idempotent_error_code(error) &&
- ((bit_is_set(slave_exec_mode,
- SLAVE_EXEC_MODE_IDEMPOTENT)) == 1));
+ (slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT));
bool ignored_error= (idempotent_error == 0 ?
ignored_error_code(actual_error) : 0);
@@ -7546,8 +7713,16 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
thd->transaction.stmt.modified_non_trans_table= TRUE;
} // row processing loop
- DBUG_EXECUTE_IF("STOP_SLAVE_after_first_Rows_event",
- const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
+ {/**
+ The following failure injecion works in cooperation with tests
+ setting @@global.debug= 'd,stop_slave_middle_group'.
+ The sql thread receives the killed status and will proceed
+ to shutdown trying to finish incomplete events group.
+ */
+ DBUG_EXECUTE_IF("stop_slave_middle_group",
+ if (thd->transaction.all.modified_non_trans_table)
+ const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
+ }
if ((error= do_after_row_operations(rli, error)) &&
ignored_error_code(convert_handler_error(error, thd, table)))
@@ -7560,57 +7735,26 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
error= 0;
}
-
- if (!cache_stmt)
- {
- DBUG_PRINT("info", ("Marked that we need to keep log"));
- thd->options|= OPTION_KEEP_LOG;
- }
} // if (table)
- /*
- We need to delay this clear until here bacause unpack_current_row() uses
- master-side table definitions stored in rli.
- */
- if (rli->tables_to_lock && get_flags(STMT_END_F))
- const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
if (error)
{
slave_rows_error_report(ERROR_LEVEL, error, rli, thd, table,
get_type_str(),
RPL_LOG_NAME, (ulong) log_pos);
- thd->reset_current_stmt_binlog_row_based();
+ /*
+ @todo We should probably not call
+ reset_current_stmt_binlog_format_row() from here.
+
+ Note: this applies to log_event_old.cc too.
+ /Sven
+ */
+ thd->reset_current_stmt_binlog_format_row();
const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error);
thd->is_slave_error= 1;
DBUG_RETURN(error);
}
- /*
- This code would ideally be placed in do_update_pos() instead, but
- since we have no access to table there, we do the setting of
- last_event_start_time here instead.
- */
- else if (table && (table->s->primary_key == MAX_KEY) &&
- !cache_stmt && get_flags(STMT_END_F) == RLE_NO_FLAGS)
- {
- /*
- ------------ Temporary fix until WL#2975 is implemented ---------
-
- This event is not the last one (no STMT_END_F). If we stop now
- (in case of terminate_slave_thread()), how will we restart? We
- have to restart from Table_map_log_event, but as this table is
- not transactional, the rows already inserted will still be
- present, and idempotency is not guaranteed (no PK) so we risk
- that repeating leads to double insert. So we desperately try to
- continue, hope we'll eventually leave this buggy situation (by
- executing the final Rows_log_event). If we are in a hopeless
- wait (reached end of last relay log and nothing gets appended
- there), we timeout after one minute, and notify DBA about the
- problem. When WL#2975 is implemented, just remove the member
- Relay_log_info::last_event_start_time and all its occurrences.
- */
- const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0);
- }
if (get_flags(STMT_END_F))
if ((error= rows_event_stmt_cleanup(rli, thd)))
@@ -7667,7 +7811,7 @@ static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd)
(assume the last master's transaction is ignored by the slave because of
replicate-ignore rules).
*/
- error= thd->binlog_flush_pending_rows_event(true);
+ error= thd->binlog_flush_pending_rows_event(TRUE);
/*
If this event is not in a transaction, the call below will, if some
@@ -7678,7 +7822,7 @@ static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd)
are involved, commit the transaction and flush the pending event to the
binlog.
*/
- error|= ha_autocommit_or_rollback(thd, error);
+ error|= (error ? trans_rollback_stmt(thd) : trans_commit_stmt(thd));
/*
Now what if this is not a transactional engine? we still need to
@@ -7690,7 +7834,17 @@ static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd)
event flushed.
*/
- thd->reset_current_stmt_binlog_row_based();
+ /*
+ @todo We should probably not call
+ reset_current_stmt_binlog_format_row() from here.
+
+ Note: this applies to log_event_old.cc too
+
+ Btw, the previous comment about transactional engines does not
+ seem related to anything that happens here.
+ /Sven
+ */
+ thd->reset_current_stmt_binlog_format_row();
const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0);
}
@@ -7897,7 +8051,10 @@ int Table_map_log_event::save_field_metadata()
DBUG_ENTER("Table_map_log_event::save_field_metadata");
int index= 0;
for (unsigned int i= 0 ; i < m_table->s->fields ; i++)
+ {
+ DBUG_PRINT("debug", ("field_type: %d", m_coltype[i]));
index+= m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]);
+ }
DBUG_RETURN(index);
}
#endif /* !defined(MYSQL_CLIENT) */
@@ -7910,7 +8067,7 @@ int Table_map_log_event::save_field_metadata()
#if !defined(MYSQL_CLIENT)
Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
bool is_transactional)
- : Log_event(thd, 0, true),
+ : Log_event(thd, 0, is_transactional),
m_table(tbl),
m_dbnam(tbl->s->db.str),
m_dblen(m_dbnam ? tbl->s->db.length : 0),
@@ -8135,9 +8292,7 @@ int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
DBUG_ASSERT(rli->sql_thd == thd);
/* Step the query id to mark what columns that are actually used. */
- pthread_mutex_lock(&LOCK_thread_count);
- thd->query_id= next_query_id();
- pthread_mutex_unlock(&LOCK_thread_count);
+ thd->set_query_id(next_query_id());
if (!(memory= my_multi_malloc(MYF(MY_WME),
&table_list, (uint) sizeof(RPL_TABLE_LIST),
@@ -8146,15 +8301,15 @@ int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
NullS)))
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
- bzero(table_list, sizeof(*table_list));
- table_list->db = db_mem;
- table_list->alias= table_list->table_name = tname_mem;
- table_list->lock_type= TL_WRITE;
- table_list->next_global= table_list->next_local= 0;
+ strmov(db_mem, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len));
+ strmov(tname_mem, m_tblnam);
+
+ table_list->init_one_table(db_mem, strlen(db_mem),
+ tname_mem, strlen(tname_mem),
+ tname_mem, TL_WRITE);
+
table_list->table_id= m_table_id;
table_list->updating= 1;
- strmov(table_list->db, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len));
- strmov(table_list->table_name, m_tblnam);
int error= 0;
@@ -8338,8 +8493,8 @@ Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability
todo: to introduce a property for the event (handler?) which forces
applying the event in the replace (idempotent) fashion.
*/
- if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1 ||
- m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER)
+ if ((slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT) ||
+ (m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER))
{
/*
We are using REPLACE semantics and not INSERT IGNORE semantics
@@ -8417,7 +8572,7 @@ Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *
int local_error= 0;
m_table->next_number_field=0;
m_table->auto_increment_field_not_null= FALSE;
- if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1 ||
+ if ((slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT) ||
m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER)
{
m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
@@ -8718,9 +8873,7 @@ int
Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
{
DBUG_ASSERT(m_table != NULL);
- int error=
- write_row(rli, /* if 1 then overwrite */
- bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1);
+ int error= write_row(rli, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT);
if (error && !thd->is_error())
{
@@ -9565,3 +9718,16 @@ st_print_event_info::st_print_event_info()
open_cached_file(&body_cache, NULL, NULL, 0, flags);
}
#endif
+
+
+#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
+Heartbeat_log_event::Heartbeat_log_event(const char* buf, uint event_len,
+ const Format_description_log_event* description_event)
+ :Log_event(buf, description_event)
+{
+ uint8 header_size= description_event->common_header_len;
+ ident_len = event_len - header_size;
+ set_if_smaller(ident_len,FN_REFLEN-1);
+ log_ident= buf + header_size;
+}
+#endif