summaryrefslogtreecommitdiff
path: root/sql/log_event.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r--sql/log_event.cc518
1 files changed, 335 insertions, 183 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 7a1462974a2..fe86e78f7e3 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -1,4 +1,4 @@
-/* Copyright 2000-2008 MySQL AB, 2008 Sun Microsystems, Inc.
+/* Copyright 2000-2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -29,14 +29,15 @@
#include "rpl_rli.h"
#include "rpl_mi.h"
#include "rpl_filter.h"
-#include "rpl_utility.h"
#include "rpl_record.h"
+#include "transaction.h"
#include <my_dir.h>
#endif /* MYSQL_CLIENT */
#include <base64.h>
#include <my_bitmap.h>
+#include "rpl_utility.h"
#define log_cs &my_charset_latin1
@@ -497,7 +498,7 @@ static void cleanup_load_tmpdir()
if (is_prefix(file->name, prefbuf))
{
fn_format(fname,file->name,slave_load_tmpdir,"",MY_UNPACK_FILENAME);
- my_delete(fname, MYF(0));
+ mysql_file_delete(key_file_misc, fname, MYF(0));
}
}
@@ -665,10 +666,11 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
{
server_id= thd->server_id;
when= thd->start_time;
- cache_stmt= using_trans;
+ cache_type= (using_trans || stmt_has_updated_trans_table(thd)
+ ? Log_event::EVENT_TRANSACTIONAL_CACHE :
+ Log_event::EVENT_STMT_CACHE);
}
-
/**
This minimal constructor is for when you are not even sure that there
is a valid THD. For example in the server when we are shutting down or
@@ -677,8 +679,8 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
*/
Log_event::Log_event()
- :temp_buf(0), exec_time(0), flags(0), cache_stmt(0),
- thd(0)
+ :temp_buf(0), exec_time(0), flags(0),
+ cache_type(Log_event::EVENT_INVALID_CACHE), thd(0)
{
server_id= ::server_id;
/*
@@ -697,7 +699,7 @@ Log_event::Log_event()
Log_event::Log_event(const char* buf,
const Format_description_log_event* description_event)
- :temp_buf(0), cache_stmt(0)
+ :temp_buf(0), cache_type(Log_event::EVENT_INVALID_CACHE)
{
#ifndef MYSQL_CLIENT
thd = 0;
@@ -967,7 +969,7 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
*/
int Log_event::read_log_event(IO_CACHE* file, String* packet,
- pthread_mutex_t* log_lock)
+ mysql_mutex_t* log_lock)
{
ulong data_len;
int result=0;
@@ -975,7 +977,7 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
DBUG_ENTER("Log_event::read_log_event");
if (log_lock)
- pthread_mutex_lock(log_lock);
+ mysql_mutex_lock(log_lock);
if (my_b_read(file, (uchar*) buf, sizeof(buf)))
{
/*
@@ -1033,14 +1035,14 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
end:
if (log_lock)
- pthread_mutex_unlock(log_lock);
+ mysql_mutex_unlock(log_lock);
DBUG_RETURN(result);
}
#endif /* !MYSQL_CLIENT */
#ifndef MYSQL_CLIENT
-#define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock);
-#define LOCK_MUTEX if (log_lock) pthread_mutex_lock(log_lock);
+#define UNLOCK_MUTEX if (log_lock) mysql_mutex_unlock(log_lock);
+#define LOCK_MUTEX if (log_lock) mysql_mutex_lock(log_lock);
#else
#define UNLOCK_MUTEX
#define LOCK_MUTEX
@@ -1052,7 +1054,7 @@ end:
Allocates memory; The caller is responsible for clean-up.
*/
Log_event* Log_event::read_log_event(IO_CACHE* file,
- pthread_mutex_t* log_lock,
+ mysql_mutex_t* log_lock,
const Format_description_log_event
*description_event)
#else
@@ -1200,15 +1202,11 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
*/
if (description_event->event_type_permutation)
{
-#ifndef DBUG_OFF
- int new_event_type=
- description_event->event_type_permutation[event_type];
- DBUG_PRINT("info",
- ("converting event type %d to %d (%s)",
- event_type, new_event_type,
- get_type_str((Log_event_type)new_event_type)));
-#endif
- event_type= description_event->event_type_permutation[event_type];
+ int new_event_type= description_event->event_type_permutation[event_type];
+ DBUG_PRINT("info", ("converting event type %d to %d (%s)",
+ event_type, new_event_type,
+ get_type_str((Log_event_type)new_event_type)));
+ event_type= new_event_type;
}
switch(event_type) {
@@ -1571,37 +1569,14 @@ log_event_print_value(IO_CACHE *file, const uchar *ptr,
/* a long CHAR() field: see #37426 */
length= byte1 | (((byte0 & 0x30) ^ 0x30) << 4);
type= byte0 | 0x30;
- goto beg;
- }
-
- switch (byte0)
- {
- case MYSQL_TYPE_SET:
- case MYSQL_TYPE_ENUM:
- case MYSQL_TYPE_STRING:
- type= byte0;
- length= byte1;
- break;
-
- default:
-
- {
- char tmp[5];
- my_snprintf(tmp, sizeof(tmp), "%04X", meta);
- my_b_printf(file,
- "!! Don't know how to handle column type=%d meta=%d (%s)",
- type, meta, tmp);
- return 0;
- }
}
+ else
+ length = meta & 0xFF;
}
else
length= meta;
}
-
-beg:
-
switch (type) {
case MYSQL_TYPE_LONG:
{
@@ -1738,6 +1713,33 @@ beg:
return 3;
}
+ case MYSQL_TYPE_NEWDATE:
+ {
+ uint32 tmp= uint3korr(ptr);
+ int part;
+ char buf[10];
+ char *pos= &buf[10];
+
+ /* Copied from field.cc */
+ *pos--=0; // End NULL
+ part=(int) (tmp & 31);
+ *pos--= (char) ('0'+part%10);
+ *pos--= (char) ('0'+part/10);
+ *pos--= ':';
+ part=(int) (tmp >> 5 & 15);
+ *pos--= (char) ('0'+part%10);
+ *pos--= (char) ('0'+part/10);
+ *pos--= ':';
+ part=(int) (tmp >> 9);
+ *pos--= (char) ('0'+part%10); part/=10;
+ *pos--= (char) ('0'+part%10); part/=10;
+ *pos--= (char) ('0'+part%10); part/=10;
+ *pos= (char) ('0'+part);
+ my_b_printf(file , "'%s'", buf);
+ my_snprintf(typestr, typestr_length, "DATE");
+ return 3;
+ }
+
case MYSQL_TYPE_DATE:
{
uint i32= uint3korr(ptr);
@@ -1756,7 +1758,7 @@ beg:
}
case MYSQL_TYPE_ENUM:
- switch (length) {
+ switch (meta & 0xFF) {
case 1:
my_b_printf(file, "%d", (int) *ptr);
my_snprintf(typestr, typestr_length, "ENUM(1 byte)");
@@ -1769,15 +1771,15 @@ beg:
return 2;
}
default:
- my_b_printf(file, "!! Unknown ENUM packlen=%d", length);
+ my_b_printf(file, "!! Unknown ENUM packlen=%d", meta & 0xFF);
return 0;
}
break;
case MYSQL_TYPE_SET:
- my_b_write_bit(file, ptr , length * 8);
- my_snprintf(typestr, typestr_length, "SET(%d bytes)", length);
- return length;
+ my_b_write_bit(file, ptr , (meta & 0xFF) * 8);
+ my_snprintf(typestr, typestr_length, "SET(%d bytes)", meta & 0xFF);
+ return meta & 0xFF;
case MYSQL_TYPE_BLOB:
switch (meta) {
@@ -2369,7 +2371,7 @@ Query_log_event::Query_log_event()
*/
Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
ulong query_length, bool using_trans,
- bool suppress_use, int errcode)
+ bool direct, bool suppress_use, int errcode)
:Log_event(thd_arg,
(thd_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F :
@@ -2428,7 +2430,7 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
the autocommit flag as written by the master to the binlog. This
behavior may change after WL#4162 has been implemented.
*/
- flags2= (uint32) (thd_arg->options &
+ flags2= (uint32) (thd_arg->variables.option_bits &
(OPTIONS_WRITTEN_TO_BIN_LOG & ~OPTION_NOT_AUTOCOMMIT));
DBUG_ASSERT(thd_arg->variables.character_set_client->number < 256*256);
DBUG_ASSERT(thd_arg->variables.collation_connection->number < 256*256);
@@ -2449,6 +2451,96 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
}
else
time_zone_len= 0;
+
+ /*
+ In what follows, we decide whether to write to the binary log or to use a
+ cache.
+ */
+ LEX *lex= thd->lex;
+ bool implicit_commit= FALSE;
+ cache_type= Log_event::EVENT_INVALID_CACHE;
+ switch (lex->sql_command)
+ {
+ case SQLCOM_ALTER_DB:
+ case SQLCOM_CREATE_FUNCTION:
+ case SQLCOM_DROP_FUNCTION:
+ case SQLCOM_DROP_PROCEDURE:
+ case SQLCOM_INSTALL_PLUGIN:
+ case SQLCOM_UNINSTALL_PLUGIN:
+ case SQLCOM_ALTER_TABLESPACE:
+ implicit_commit= TRUE;
+ break;
+ case SQLCOM_DROP_TABLE:
+ implicit_commit= !(lex->drop_temporary && thd->in_multi_stmt_transaction());
+ break;
+ case SQLCOM_ALTER_TABLE:
+ case SQLCOM_CREATE_TABLE:
+ implicit_commit= !((lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) &&
+ thd->in_multi_stmt_transaction()) &&
+ !(lex->select_lex.item_list.elements &&
+ thd->is_current_stmt_binlog_format_row());
+ break;
+ case SQLCOM_SET_OPTION:
+ implicit_commit= (lex->autocommit ? TRUE : FALSE);
+ break;
+ /*
+ Replace what follows after CF_AUTO_COMMIT_TRANS is backported by:
+
+ default:
+ implicit_commit= ((sql_command_flags[lex->sql_command] &
+ CF_AUTO_COMMIT_TRANS));
+ break;
+ */
+ case SQLCOM_CREATE_INDEX:
+ case SQLCOM_TRUNCATE:
+ case SQLCOM_CREATE_DB:
+ case SQLCOM_DROP_DB:
+ case SQLCOM_ALTER_DB_UPGRADE:
+ case SQLCOM_RENAME_TABLE:
+ case SQLCOM_DROP_INDEX:
+ case SQLCOM_CREATE_VIEW:
+ case SQLCOM_DROP_VIEW:
+ case SQLCOM_CREATE_TRIGGER:
+ case SQLCOM_DROP_TRIGGER:
+ case SQLCOM_CREATE_EVENT:
+ case SQLCOM_ALTER_EVENT:
+ case SQLCOM_DROP_EVENT:
+ case SQLCOM_REPAIR:
+ case SQLCOM_OPTIMIZE:
+ case SQLCOM_ANALYZE:
+ case SQLCOM_CREATE_USER:
+ case SQLCOM_DROP_USER:
+ case SQLCOM_RENAME_USER:
+ case SQLCOM_REVOKE_ALL:
+ case SQLCOM_REVOKE:
+ case SQLCOM_GRANT:
+ case SQLCOM_CREATE_PROCEDURE:
+ case SQLCOM_CREATE_SPFUNCTION:
+ case SQLCOM_ALTER_PROCEDURE:
+ case SQLCOM_ALTER_FUNCTION:
+ case SQLCOM_ASSIGN_TO_KEYCACHE:
+ case SQLCOM_PRELOAD_KEYS:
+ case SQLCOM_FLUSH:
+ case SQLCOM_RESET:
+ case SQLCOM_CHECK:
+ implicit_commit= TRUE;
+ break;
+ default:
+ implicit_commit= FALSE;
+ break;
+ }
+
+ if (implicit_commit || direct)
+ {
+ cache_type= Log_event::EVENT_NO_CACHE;
+ }
+ else
+ {
+ cache_type= (using_trans || stmt_has_updated_trans_table(thd)
+ ? Log_event::EVENT_TRANSACTIONAL_CACHE :
+ Log_event::EVENT_STMT_CACHE);
+ }
+ DBUG_ASSERT(cache_type != Log_event::EVENT_INVALID_CACHE);
DBUG_PRINT("info",("Query_log_event has flags2: %lu sql_mode: %lu",
(ulong) flags2, sql_mode));
}
@@ -3067,7 +3159,7 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli,
}
else
{
- const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
+ const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
}
/*
@@ -3086,10 +3178,7 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli,
rpl_filter->db_ok(thd->db))
{
thd->set_time((time_t)when);
- thd->set_query((char*)query_arg, q_len_arg);
- VOID(pthread_mutex_lock(&LOCK_thread_count));
- thd->query_id = next_query_id();
- VOID(pthread_mutex_unlock(&LOCK_thread_count));
+ thd->set_query_and_id((char*)query_arg, q_len_arg, next_query_id());
thd->variables.pseudo_thread_id= thread_id; // for temp tables
DBUG_PRINT("query",("%s", thd->query()));
@@ -3098,13 +3187,13 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli,
{
if (flags2_inited)
/*
- all bits of thd->options which are 1 in OPTIONS_WRITTEN_TO_BIN_LOG
+ all bits of thd->variables.option_bits which are 1 in OPTIONS_WRITTEN_TO_BIN_LOG
must take their value from flags2.
*/
- thd->options= flags2|(thd->options & ~OPTIONS_WRITTEN_TO_BIN_LOG);
+ thd->variables.option_bits= flags2|(thd->variables.option_bits & ~OPTIONS_WRITTEN_TO_BIN_LOG);
/*
else, we are in a 3.23/4.0 binlog; we previously received a
- Rotate_log_event which reset thd->options and sql_mode etc, so
+ Rotate_log_event which reset thd->variables.option_bits and sql_mode etc, so
nothing to do.
*/
/*
@@ -3285,7 +3374,7 @@ Default database: '%s'. Query: '%s'",
them back here.
*/
if (expected_error && expected_error == actual_error)
- ha_autocommit_or_rollback(thd, TRUE);
+ trans_rollback_stmt(thd);
}
/*
If we expected a non-zero error code and get nothing and, it is a concurrency
@@ -3294,7 +3383,8 @@ Default database: '%s'. Query: '%s'",
else if (expected_error && !actual_error &&
(concurrency_error_code(expected_error) ||
ignored_error_code(expected_error)))
- ha_autocommit_or_rollback(thd, TRUE);
+ trans_rollback_stmt(thd);
+
/*
Other cases: mostly we expected no error and get one.
*/
@@ -3404,13 +3494,13 @@ Query_log_event::do_shall_skip(Relay_log_info *rli)
{
if (strcmp("BEGIN", query) == 0)
{
- thd->options|= OPTION_BEGIN;
+ thd->variables.option_bits|= OPTION_BEGIN;
DBUG_RETURN(Log_event::continue_group(rli));
}
if (strcmp("COMMIT", query) == 0 || strcmp("ROLLBACK", query) == 0)
{
- thd->options&= ~OPTION_BEGIN;
+ thd->variables.option_bits&= ~OPTION_BEGIN;
DBUG_RETURN(Log_event::EVENT_SKIP_COUNT);
}
}
@@ -3666,8 +3756,7 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
#ifndef DBUG_OFF
// Allows us to sanity-check that all events initialized their
// events (see the end of this 'if' block).
- memset(post_header_len, 255,
- number_of_event_types*sizeof(uint8));
+ memset(post_header_len, 255, number_of_event_types*sizeof(uint8));
#endif
/* Note: all event types must explicitly fill in their lengths here. */
@@ -3949,7 +4038,6 @@ int Format_description_log_event::do_apply_event(Relay_log_info const *rli)
int ret= 0;
DBUG_ENTER("Format_description_log_event::do_apply_event");
-#ifdef USING_TRANSACTIONS
/*
As a transaction NEVER spans on 2 or more binlogs:
if we have an active transaction at this point, the master died
@@ -3971,7 +4059,7 @@ int Format_description_log_event::do_apply_event(Relay_log_info const *rli)
"its binary log, thus rolled back too.");
const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 1);
}
-#endif
+
/*
If this event comes from ourselves, there is no cleaning task to
perform, we don't call Start_log_event_v3::do_apply_event()
@@ -4640,13 +4728,14 @@ int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli,
if (rpl_filter->db_ok(thd->db))
{
thd->set_time((time_t)when);
- VOID(pthread_mutex_lock(&LOCK_thread_count));
- thd->query_id = next_query_id();
- VOID(pthread_mutex_unlock(&LOCK_thread_count));
+ thd->set_query_id(next_query_id());
thd->warning_info->opt_clear_warning_info(thd->query_id);
TABLE_LIST tables;
- tables.init_one_table(thd->db, table_name, TL_WRITE);
+ tables.init_one_table(thd->strmake(thd->db, thd->db_length),
+ thd->db_length,
+ table_name, strlen(table_name),
+ table_name, TL_WRITE);
tables.updating= 1;
// the table will be opened in mysql_load
@@ -4994,7 +5083,7 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli)
!is_relay_log_event() &&
!rli->is_in_group())
{
- pthread_mutex_lock(&rli->data_lock);
+ mysql_mutex_lock(&rli->data_lock);
DBUG_PRINT("info", ("old group_master_log_name: '%s' "
"old group_master_log_pos: %lu",
rli->group_master_log_name,
@@ -5006,11 +5095,11 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli)
"new group_master_log_pos: %lu",
rli->group_master_log_name,
(ulong) rli->group_master_log_pos));
- pthread_mutex_unlock(&rli->data_lock);
+ mysql_mutex_unlock(&rli->data_lock);
flush_relay_log_info(rli);
/*
- Reset thd->options and sql_mode etc, because this could be the signal of
+ Reset thd->variables.option_bits and sql_mode etc, because this could be the signal of
a master's downgrade from 5.0 to 4.0.
However, no need to reset description_event_for_exec: indeed, if the next
master is 5.0 (even 5.0.1) we will soon get a Format_desc; if the next
@@ -5369,10 +5458,16 @@ void Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
int Xid_log_event::do_apply_event(Relay_log_info const *rli)
{
+ bool res;
/* For a slave Xid_log_event is COMMIT */
general_log_print(thd, COM_QUERY,
"COMMIT /* implicit, from Xid_log_event */");
- return end_trans(thd, COMMIT);
+ if (!(res= trans_commit(thd)))
+ {
+ close_thread_tables(thd);
+ thd->mdl_context.release_transactional_locks();
+ }
+ return res;
}
Log_event::enum_skip_reason
@@ -5380,7 +5475,7 @@ Xid_log_event::do_shall_skip(Relay_log_info *rli)
{
DBUG_ENTER("Xid_log_event::do_shall_skip");
if (rli->slave_skip_counter > 0) {
- thd->options&= ~OPTION_BEGIN;
+ thd->variables.option_bits&= ~OPTION_BEGIN;
DBUG_RETURN(Log_event::EVENT_SKIP_COUNT);
}
DBUG_RETURN(Log_event::do_shall_skip(rli));
@@ -5412,16 +5507,18 @@ void User_var_log_event::pack_info(Protocol* protocol)
case REAL_RESULT:
double real_val;
float8get(real_val, val);
- if (!(buf= (char*) my_malloc(val_offset + FLOATING_POINT_BUFFER,
+ if (!(buf= (char*) my_malloc(val_offset + MY_GCVT_MAX_FIELD_WIDTH + 1,
MYF(MY_WME))))
return;
- event_len+= my_sprintf(buf + val_offset,
- (buf + val_offset, "%.14g", real_val));
+ event_len+= my_gcvt(real_val, MY_GCVT_ARG_DOUBLE, MY_GCVT_MAX_FIELD_WIDTH,
+ buf + val_offset, NULL);
break;
case INT_RESULT:
if (!(buf= (char*) my_malloc(val_offset + 22, MYF(MY_WME))))
return;
- event_len= longlong10_to_str(uint8korr(val), buf + val_offset,-10)-buf;
+ event_len= longlong10_to_str(uint8korr(val), buf + val_offset,
+ ((flags & User_var_log_event::UNSIGNED_F) ?
+ 10 : -10))-buf;
break;
case DECIMAL_RESULT:
{
@@ -5479,12 +5576,14 @@ User_var_log_event(const char* buf,
:Log_event(buf, description_event)
{
/* The Post-Header is empty. The Variable Data part begins immediately. */
+ const char *start= buf;
buf+= description_event->common_header_len +
description_event->post_header_len[USER_VAR_EVENT-1];
name_len= uint4korr(buf);
name= (char *) buf + UV_NAME_LEN_SIZE;
buf+= UV_NAME_LEN_SIZE + name_len;
is_null= (bool) *buf;
+ flags= User_var_log_event::UNDEF_F; // defaults to UNDEF_F
if (is_null)
{
type= STRING_RESULT;
@@ -5500,6 +5599,27 @@ User_var_log_event(const char* buf,
UV_CHARSET_NUMBER_SIZE);
val= (char *) (buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE);
+
+ /**
+ We need to check if this is from an old server
+ that did not pack information for flags.
+ We do this by checking if there are extra bytes
+ after the packed value. If there are we take the
+ extra byte and it's value is assumed to contain
+ the flags value.
+
+ Old events will not have this extra byte, thence,
+ we keep the flags set to UNDEF_F.
+ */
+ uint bytes_read= ((val + val_len) - start);
+ DBUG_ASSERT(bytes_read==data_written ||
+ bytes_read==(data_written-1));
+ if ((data_written - bytes_read) > 0)
+ {
+ flags= (uint) *(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
+ UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE +
+ val_len);
+ }
}
}
@@ -5511,6 +5631,7 @@ bool User_var_log_event::write(IO_CACHE* file)
char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE];
uchar buf2[max(8, DECIMAL_MAX_FIELD_SIZE + 2)], *pos= buf2;
+ uint unsigned_len= 0;
uint buf1_length;
ulong event_length;
@@ -5532,6 +5653,7 @@ bool User_var_log_event::write(IO_CACHE* file)
break;
case INT_RESULT:
int8store(buf2, *(longlong*) val);
+ unsigned_len= 1;
break;
case DECIMAL_RESULT:
{
@@ -5556,13 +5678,14 @@ bool User_var_log_event::write(IO_CACHE* file)
}
/* Length of the whole event */
- event_length= sizeof(buf)+ name_len + buf1_length + val_len;
+ event_length= sizeof(buf)+ name_len + buf1_length + val_len + unsigned_len;
return (write_header(file, event_length) ||
my_b_safe_write(file, (uchar*) buf, sizeof(buf)) ||
my_b_safe_write(file, (uchar*) name, name_len) ||
my_b_safe_write(file, (uchar*) buf1, buf1_length) ||
- my_b_safe_write(file, pos, val_len));
+ my_b_safe_write(file, pos, val_len) ||
+ my_b_safe_write(file, &flags, unsigned_len));
}
#endif
@@ -5603,7 +5726,8 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
break;
case INT_RESULT:
char int_buf[22];
- longlong10_to_str(uint8korr(val), int_buf, -10);
+ longlong10_to_str(uint8korr(val), int_buf,
+ ((flags & User_var_log_event::UNSIGNED_F) ? 10 : -10));
my_b_printf(&cache, ":=%s%s\n", int_buf, print_event_info->delimiter);
break;
case DECIMAL_RESULT:
@@ -5750,7 +5874,8 @@ int User_var_log_event::do_apply_event(Relay_log_info const *rli)
a single record and with a single column. Thus, like
a column value, it could always have IMPLICIT derivation.
*/
- e.update_hash(val, val_len, type, charset, DERIVATION_IMPLICIT, 0);
+ e.update_hash(val, val_len, type, charset, DERIVATION_IMPLICIT,
+ (flags & User_var_log_event::UNSIGNED_F));
free_root(thd->mem_root,0);
return 0;
@@ -5827,8 +5952,8 @@ Slave_log_event::Slave_log_event(THD* thd_arg,
Master_info* mi = rli->mi;
// TODO: re-write this better without holding both locks at the same time
- pthread_mutex_lock(&mi->data_lock);
- pthread_mutex_lock(&rli->data_lock);
+ mysql_mutex_lock(&mi->data_lock);
+ mysql_mutex_lock(&rli->data_lock);
master_host_len = strlen(mi->host);
master_log_len = strlen(rli->group_master_log_name);
// on OOM, just do not initialize the structure and print the error
@@ -5846,8 +5971,8 @@ Slave_log_event::Slave_log_event(THD* thd_arg,
}
else
sql_print_error("Out of memory while recording slave event");
- pthread_mutex_unlock(&rli->data_lock);
- pthread_mutex_unlock(&mi->data_lock);
+ mysql_mutex_unlock(&rli->data_lock);
+ mysql_mutex_unlock(&mi->data_lock);
DBUG_VOID_RETURN;
}
#endif /* !MYSQL_CLIENT */
@@ -5981,7 +6106,7 @@ int Stop_log_event::do_update_pos(Relay_log_info *rli)
could give false triggers in MASTER_POS_WAIT() that we have reached
the target position when in fact we have not.
*/
- if (thd->options & OPTION_BEGIN)
+ if (thd->variables.option_bits & OPTION_BEGIN)
rli->inc_event_relay_log_pos();
else
{
@@ -6204,10 +6329,12 @@ int Create_file_log_event::do_apply_event(Relay_log_info const *rli)
fname_buf= strmov(proc_info, "Making temp file ");
ext= slave_load_file_stem(fname_buf, file_id, server_id, ".info");
thd_proc_info(thd, proc_info);
- my_delete(fname_buf, MYF(0)); // old copy may exist already
- if ((fd= my_create(fname_buf, CREATE_MODE,
- O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
- MYF(MY_WME))) < 0 ||
+ /* old copy may exist already */
+ mysql_file_delete(key_file_log_event_info, fname_buf, MYF(0));
+ if ((fd= mysql_file_create(key_file_log_event_info,
+ fname_buf, CREATE_MODE,
+ O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
+ MYF(MY_WME))) < 0 ||
init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
MYF(MY_WME|MY_NABP)))
{
@@ -6229,20 +6356,22 @@ int Create_file_log_event::do_apply_event(Relay_log_info const *rli)
goto err;
}
end_io_cache(&file);
- my_close(fd, MYF(0));
+ mysql_file_close(fd, MYF(0));
// fname_buf now already has .data, not .info, because we did our trick
- my_delete(fname_buf, MYF(0)); // old copy may exist already
- if ((fd= my_create(fname_buf, CREATE_MODE,
- O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
- MYF(MY_WME))) < 0)
+ /* old copy may exist already */
+ mysql_file_delete(key_file_log_event_data, fname_buf, MYF(0));
+ if ((fd= mysql_file_create(key_file_log_event_data,
+ fname_buf, CREATE_MODE,
+ O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
+ MYF(MY_WME))) < 0)
{
rli->report(ERROR_LEVEL, my_errno,
"Error in Create_file event: could not open file '%s'",
fname_buf);
goto err;
}
- if (my_write(fd, (uchar*) block, block_len, MYF(MY_WME+MY_NABP)))
+ if (mysql_file_write(fd, (uchar*) block, block_len, MYF(MY_WME+MY_NABP)))
{
rli->report(ERROR_LEVEL, my_errno,
"Error in Create_file event: write to '%s' failed",
@@ -6255,7 +6384,7 @@ err:
if (error)
end_io_cache(&file);
if (fd >= 0)
- my_close(fd, MYF(0));
+ mysql_file_close(fd, MYF(0));
thd_proc_info(thd, 0);
return error != 0;
}
@@ -6387,10 +6516,12 @@ int Append_block_log_event::do_apply_event(Relay_log_info const *rli)
*/
lex_start(thd);
mysql_reset_thd_for_next_command(thd);
- my_delete(fname, MYF(0)); // old copy may exist already
- if ((fd= my_create(fname, CREATE_MODE,
- O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
- MYF(MY_WME))) < 0)
+ /* old copy may exist already */
+ mysql_file_delete(key_file_log_event_data, fname, MYF(0));
+ if ((fd= mysql_file_create(key_file_log_event_data,
+ fname, CREATE_MODE,
+ O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
+ MYF(MY_WME))) < 0)
{
rli->report(ERROR_LEVEL, my_errno,
"Error in %s event: could not create file '%s'",
@@ -6398,8 +6529,10 @@ int Append_block_log_event::do_apply_event(Relay_log_info const *rli)
goto err;
}
}
- else if ((fd = my_open(fname, O_WRONLY | O_APPEND | O_BINARY | O_NOFOLLOW,
- MYF(MY_WME))) < 0)
+ else if ((fd= mysql_file_open(key_file_log_event_data,
+ fname,
+ O_WRONLY | O_APPEND | O_BINARY | O_NOFOLLOW,
+ MYF(MY_WME))) < 0)
{
rli->report(ERROR_LEVEL, my_errno,
"Error in %s event: could not open file '%s'",
@@ -6407,10 +6540,12 @@ int Append_block_log_event::do_apply_event(Relay_log_info const *rli)
goto err;
}
- DBUG_EXECUTE_IF("remove_slave_load_file_before_write",
- my_close(fd,MYF(0)); fd= -1; my_delete(fname, MYF(0)););
+ DBUG_EXECUTE_IF("remove_slave_load_file_before_write",
+ {
+ my_delete_allow_opened(fname, MYF(0));
+ });
- if (my_write(fd, (uchar*) block, block_len, MYF(MY_WME+MY_NABP)))
+ if (mysql_file_write(fd, (uchar*) block, block_len, MYF(MY_WME+MY_NABP)))
{
rli->report(ERROR_LEVEL, my_errno,
"Error in %s event: write to '%s' failed",
@@ -6421,7 +6556,7 @@ int Append_block_log_event::do_apply_event(Relay_log_info const *rli)
err:
if (fd >= 0)
- my_close(fd, MYF(0));
+ mysql_file_close(fd, MYF(0));
thd_proc_info(thd, 0);
DBUG_RETURN(error);
}
@@ -6515,9 +6650,9 @@ int Delete_file_log_event::do_apply_event(Relay_log_info const *rli)
{
char fname[FN_REFLEN+10];
char *ext= slave_load_file_stem(fname, file_id, server_id, ".data");
- (void) my_delete(fname, MYF(MY_WME));
+ mysql_file_delete(key_file_log_event_data, fname, MYF(MY_WME));
strmov(ext, ".info");
- (void) my_delete(fname, MYF(MY_WME));
+ mysql_file_delete(key_file_log_event_info, fname, MYF(MY_WME));
return 0;
}
#endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
@@ -6618,8 +6753,9 @@ int Execute_load_log_event::do_apply_event(Relay_log_info const *rli)
Load_log_event *lev= 0;
ext= slave_load_file_stem(fname, file_id, server_id, ".info");
- if ((fd = my_open(fname, O_RDONLY | O_BINARY | O_NOFOLLOW,
- MYF(MY_WME))) < 0 ||
+ if ((fd= mysql_file_open(key_file_log_event_info,
+ fname, O_RDONLY | O_BINARY | O_NOFOLLOW,
+ MYF(MY_WME))) < 0 ||
init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
MYF(MY_WME|MY_NABP)))
{
@@ -6629,7 +6765,7 @@ int Execute_load_log_event::do_apply_event(Relay_log_info const *rli)
goto err;
}
if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
- (pthread_mutex_t*)0,
+ (mysql_mutex_t*)0,
rli->relay_log.description_event_for_exec)) ||
lev->get_type_code() != NEW_LOAD_EVENT)
{
@@ -6669,24 +6805,24 @@ int Execute_load_log_event::do_apply_event(Relay_log_info const *rli)
}
/*
We have an open file descriptor to the .info file; we need to close it
- or Windows will refuse to delete the file in my_delete().
+ or Windows will refuse to delete the file in mysql_file_delete().
*/
if (fd >= 0)
{
- my_close(fd, MYF(0));
+ mysql_file_close(fd, MYF(0));
end_io_cache(&file);
fd= -1;
}
- (void) my_delete(fname, MYF(MY_WME));
+ mysql_file_delete(key_file_log_event_info, fname, MYF(MY_WME));
memcpy(ext, ".data", 6);
- (void) my_delete(fname, MYF(MY_WME));
+ mysql_file_delete(key_file_log_event_data, fname, MYF(MY_WME));
error = 0;
err:
delete lev;
if (fd >= 0)
{
- my_close(fd, MYF(0));
+ mysql_file_close(fd, MYF(0));
end_io_cache(&file);
}
return error;
@@ -6751,9 +6887,9 @@ Execute_load_query_log_event(THD *thd_arg, const char* query_arg,
ulong query_length_arg, uint fn_pos_start_arg,
uint fn_pos_end_arg,
enum_load_dup_handling dup_handling_arg,
- bool using_trans, bool suppress_use,
+ bool using_trans, bool direct, bool suppress_use,
int errcode):
- Query_log_event(thd_arg, query_arg, query_length_arg, using_trans,
+ Query_log_event(thd_arg, query_arg, query_length_arg, using_trans, direct,
suppress_use, errcode),
file_id(thd_arg->file_id), fn_pos_start(fn_pos_start_arg),
fn_pos_end(fn_pos_end_arg), dup_handling(dup_handling_arg)
@@ -6925,7 +7061,7 @@ Execute_load_query_log_event::do_apply_event(Relay_log_info const *rli)
file so that we can re-execute this event at START SLAVE.
*/
if (!error)
- (void) my_delete(fname, MYF(MY_WME));
+ mysql_file_delete(key_file_log_event_data, fname, MYF(MY_WME));
my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
return error;
@@ -7048,9 +7184,9 @@ Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid,
DBUG_ASSERT((tbl_arg && tbl_arg->s && tid != ~0UL) ||
(!tbl_arg && !cols && tid == ~0UL));
- if (thd_arg->options & OPTION_NO_FOREIGN_KEY_CHECKS)
+ if (thd_arg->variables.option_bits & OPTION_NO_FOREIGN_KEY_CHECKS)
set_flags(NO_FOREIGN_KEY_CHECKS_F);
- if (thd_arg->options & OPTION_RELAXED_UNIQUE_CHECKS)
+ if (thd_arg->variables.option_bits & OPTION_RELAXED_UNIQUE_CHECKS)
set_flags(RELAXED_UNIQUE_CHECKS_F);
/* if bitmap_init fails, caught in is_valid() */
if (likely(!bitmap_init(&m_cols,
@@ -7295,8 +7431,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
*/
DBUG_ASSERT(get_flags(STMT_END_F));
- const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
- close_thread_tables(thd);
+ const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
thd->clear_error();
DBUG_RETURN(0);
}
@@ -7322,7 +7457,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
We also call the mysql_reset_thd_for_next_command(), since this
is the logical start of the next "statement". Note that this
- call might reset the value of current_stmt_binlog_row_based, so
+ call might reset the value of current_stmt_binlog_format, so
we need to do any changes to that value after this function.
*/
lex_start(thd);
@@ -7334,16 +7469,12 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
*/
thd->transaction.stmt.modified_non_trans_table= FALSE;
/*
- Check if the slave is set to use SBR. If so, it should switch
- to using RBR until the end of the "statement", i.e., next
- STMT_END_F or next error.
+ This is a row injection, so we flag the "statement" as
+ such. Note that this code is called both when the slave does row
+ injections and when the BINLOG statement is used to do row
+ injections.
*/
- if (!thd->current_stmt_binlog_row_based &&
- mysql_bin_log.is_open() && (thd->options & OPTION_BIN_LOG))
- {
- thd->set_current_stmt_binlog_row_based();
- }
-
+ thd->lex->set_stmt_row_injection();
/*
There are a few flags that are replicated with each row event.
@@ -7351,18 +7482,18 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
the event.
*/
if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
- thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
+ thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
else
- thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
+ thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
if (get_flags(RELAXED_UNIQUE_CHECKS_F))
- thd->options|= OPTION_RELAXED_UNIQUE_CHECKS;
+ thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
else
- thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
+ thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
/* A small test to verify that objects have consistent types */
- DBUG_ASSERT(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
+ DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
- if (simple_open_n_lock_tables(thd, rli->tables_to_lock))
+ if (open_and_lock_tables(thd, rli->tables_to_lock, FALSE, 0))
{
uint actual_error= thd->stmt_da->sql_errno();
if (thd->is_slave_error || thd->is_fatal_error)
@@ -7379,7 +7510,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
"unexpected success or fatal error"));
thd->is_slave_error= 1;
}
- const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
+ const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
DBUG_RETURN(actual_error);
}
@@ -7392,11 +7523,18 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
*/
{
+ DBUG_PRINT("debug", ("Checking compability of tables to lock - tables_to_lock: %p",
+ rli->tables_to_lock));
RPL_TABLE_LIST *ptr= rli->tables_to_lock;
for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global))
{
- if (ptr->m_tabledef.compatible_with(rli, ptr->table))
+ TABLE *conv_table;
+ if (!ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli),
+ ptr->table, &conv_table))
{
+ DBUG_PRINT("debug", ("Table: %s.%s is not compatible with master",
+ ptr->table->s->db.str,
+ ptr->table->s->table_name.str));
/*
We should not honour --slave-skip-errors at this point as we are
having severe errors which should not be skiped.
@@ -7404,15 +7542,20 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
mysql_unlock_tables(thd, thd->lock);
thd->lock= 0;
thd->is_slave_error= 1;
- const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
+ const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
DBUG_RETURN(ERR_BAD_TABLE_DEF);
}
+ DBUG_PRINT("debug", ("Table: %s.%s is compatible with master"
+ " - conv_table: %p",
+ ptr->table->s->db.str,
+ ptr->table->s->table_name.str, conv_table));
+ ptr->m_conv_table= conv_table;
}
}
/*
- ... and then we add all the tables to the table map and remove
- them from tables to lock.
+ ... and then we add all the tables to the table map and but keep
+ them in the tables to lock list.
We also invalidate the query cache for all the tables, since
they will now be changed.
@@ -7514,8 +7657,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
{
int actual_error= convert_handler_error(error, thd, table);
bool idempotent_error= (idempotent_error_code(error) &&
- ((bit_is_set(slave_exec_mode,
- SLAVE_EXEC_MODE_IDEMPOTENT)) == 1));
+ (slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT));
bool ignored_error= (idempotent_error == 0 ?
ignored_error_code(actual_error) : 0);
@@ -7580,26 +7722,27 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
error= 0;
}
- if (!cache_stmt)
+ if (!use_trans_cache())
{
DBUG_PRINT("info", ("Marked that we need to keep log"));
- thd->options|= OPTION_KEEP_LOG;
+ thd->variables.option_bits|= OPTION_KEEP_LOG;
}
} // if (table)
- /*
- We need to delay this clear until here bacause unpack_current_row() uses
- master-side table definitions stored in rli.
- */
- if (rli->tables_to_lock && get_flags(STMT_END_F))
- const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
if (error)
{
slave_rows_error_report(ERROR_LEVEL, error, rli, thd, table,
get_type_str(),
RPL_LOG_NAME, (ulong) log_pos);
- thd->reset_current_stmt_binlog_row_based();
+ /*
+ @todo We should probably not call
+ reset_current_stmt_binlog_format_row() from here.
+
+ Note: this applies to log_event_old.cc too.
+ /Sven
+ */
+ thd->reset_current_stmt_binlog_format_row();
const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error);
thd->is_slave_error= 1;
DBUG_RETURN(error);
@@ -7660,7 +7803,7 @@ static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd)
(assume the last master's transaction is ignored by the slave because of
replicate-ignore rules).
*/
- error= thd->binlog_flush_pending_rows_event(true);
+ error= thd->binlog_flush_pending_rows_event(TRUE);
/*
If this event is not in a transaction, the call below will, if some
@@ -7671,7 +7814,7 @@ static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd)
are involved, commit the transaction and flush the pending event to the
binlog.
*/
- error|= ha_autocommit_or_rollback(thd, error);
+ error|= (error ? trans_rollback_stmt(thd) : trans_commit_stmt(thd));
/*
Now what if this is not a transactional engine? we still need to
@@ -7683,7 +7826,17 @@ static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd)
event flushed.
*/
- thd->reset_current_stmt_binlog_row_based();
+ /*
+ @todo We should probably not call
+ reset_current_stmt_binlog_format_row() from here.
+
+ Note: this applies to log_event_old.cc too
+
+ Btw, the previous comment about transactional engines does not
+ seem related to anything that happens here.
+ /Sven
+ */
+ thd->reset_current_stmt_binlog_format_row();
const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0);
}
@@ -7890,7 +8043,10 @@ int Table_map_log_event::save_field_metadata()
DBUG_ENTER("Table_map_log_event::save_field_metadata");
int index= 0;
for (unsigned int i= 0 ; i < m_table->s->fields ; i++)
+ {
+ DBUG_PRINT("debug", ("field_type: %d", m_coltype[i]));
index+= m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]);
+ }
DBUG_RETURN(index);
}
#endif /* !defined(MYSQL_CLIENT) */
@@ -7903,7 +8059,7 @@ int Table_map_log_event::save_field_metadata()
#if !defined(MYSQL_CLIENT)
Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
bool is_transactional)
- : Log_event(thd, 0, true),
+ : Log_event(thd, 0, is_transactional),
m_table(tbl),
m_dbnam(tbl->s->db.str),
m_dblen(m_dbnam ? tbl->s->db.length : 0),
@@ -8128,9 +8284,7 @@ int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
DBUG_ASSERT(rli->sql_thd == thd);
/* Step the query id to mark what columns that are actually used. */
- pthread_mutex_lock(&LOCK_thread_count);
- thd->query_id= next_query_id();
- pthread_mutex_unlock(&LOCK_thread_count);
+ thd->set_query_id(next_query_id());
if (!(memory= my_multi_malloc(MYF(MY_WME),
&table_list, (uint) sizeof(RPL_TABLE_LIST),
@@ -8139,15 +8293,15 @@ int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
NullS)))
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
- bzero(table_list, sizeof(*table_list));
- table_list->db = db_mem;
- table_list->alias= table_list->table_name = tname_mem;
- table_list->lock_type= TL_WRITE;
- table_list->next_global= table_list->next_local= 0;
+ strmov(db_mem, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len));
+ strmov(tname_mem, m_tblnam);
+
+ table_list->init_one_table(db_mem, strlen(db_mem),
+ tname_mem, strlen(tname_mem),
+ tname_mem, TL_WRITE);
+
table_list->table_id= m_table_id;
table_list->updating= 1;
- strmov(table_list->db, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len));
- strmov(table_list->table_name, m_tblnam);
int error= 0;
@@ -8331,8 +8485,8 @@ Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability
todo: to introduce a property for the event (handler?) which forces
applying the event in the replace (idempotent) fashion.
*/
- if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1 ||
- m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER)
+ if ((slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT) ||
+ (m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER))
{
/*
We are using REPLACE semantics and not INSERT IGNORE semantics
@@ -8410,7 +8564,7 @@ Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *
int local_error= 0;
m_table->next_number_field=0;
m_table->auto_increment_field_not_null= FALSE;
- if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1 ||
+ if ((slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT) ||
m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER)
{
m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
@@ -8711,9 +8865,7 @@ int
Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
{
DBUG_ASSERT(m_table != NULL);
- int error=
- write_row(rli, /* if 1 then overwrite */
- bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1);
+ int error= write_row(rli, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT);
if (error && !thd->is_error())
{