-rw-r--r--  mysql-test/r/federated_innodb.result      34
-rw-r--r--  mysql-test/t/federated_innodb-slave.opt    1
-rw-r--r--  mysql-test/t/federated_innodb.test        34
-rw-r--r--  sql/ha_federated.cc                      232
-rw-r--r--  sql/ha_federated.h                        20
5 files changed, 283 insertions, 38 deletions
diff --git a/mysql-test/r/federated_innodb.result b/mysql-test/r/federated_innodb.result
new file mode 100644
index 00000000000..70ba3acb279
--- /dev/null
+++ b/mysql-test/r/federated_innodb.result
@@ -0,0 +1,34 @@
+stop slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+reset master;
+reset slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+start slave;
+stop slave;
+DROP DATABASE IF EXISTS federated;
+CREATE DATABASE federated;
+DROP DATABASE IF EXISTS federated;
+CREATE DATABASE federated;
+create table federated.t1 (a int primary key, b varchar(64))
+engine=myisam;
+create table federated.t1 (a int primary key, b varchar(64))
+engine=federated
+connection='mysql://root@127.0.0.1:SLAVE_PORT/federated/t1';
+insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe");
+ERROR 23000: Can't write; duplicate key in table 't1'
+select * from federated.t1;
+a b
+1 Larry
+2 Curly
+truncate federated.t1;
+alter table federated.t1 engine=innodb;
+insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe");
+ERROR 23000: Can't write; duplicate key in table 't1'
+select * from federated.t1;
+a b
+drop table federated.t1;
+drop table federated.t1;
+DROP TABLE IF EXISTS federated.t1;
+DROP DATABASE IF EXISTS federated;
+DROP TABLE IF EXISTS federated.t1;
+DROP DATABASE IF EXISTS federated;
diff --git a/mysql-test/t/federated_innodb-slave.opt b/mysql-test/t/federated_innodb-slave.opt
new file mode 100644
index 00000000000..627becdbfb5
--- /dev/null
+++ b/mysql-test/t/federated_innodb-slave.opt
@@ -0,0 +1 @@
+--innodb
diff --git a/mysql-test/t/federated_innodb.test b/mysql-test/t/federated_innodb.test
new file mode 100644
index 00000000000..772e37a2929
--- /dev/null
+++ b/mysql-test/t/federated_innodb.test
@@ -0,0 +1,34 @@
+source include/federated.inc;
+source include/have_innodb.inc;
+
+#
+# Bug#25513 Federated transaction failures
+#
+connection slave;
+create table federated.t1 (a int primary key, b varchar(64))
+ engine=myisam;
+connection master;
+--replace_result $SLAVE_MYPORT SLAVE_PORT
+eval create table federated.t1 (a int primary key, b varchar(64))
+ engine=federated
+ connection='mysql://root@127.0.0.1:$SLAVE_MYPORT/federated/t1';
+
+--error ER_DUP_KEY
+insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe");
+select * from federated.t1;
+
+connection slave;
+truncate federated.t1;
+alter table federated.t1 engine=innodb;
+connection master;
+
+--error ER_DUP_KEY
+insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe");
+select * from federated.t1;
+
+drop table federated.t1;
+connection slave;
+drop table federated.t1;
+
+
+source include/federated_cleanup.inc;
diff --git a/sql/ha_federated.cc b/sql/ha_federated.cc
index e691831bbc9..8551db6be16 100644
--- a/sql/ha_federated.cc
+++ b/sql/ha_federated.cc
@@ -352,6 +352,7 @@ static char ident_quote_char= '`'; // Character for quoting
// identifiers
static char value_quote_char= '\''; // Character for quoting
// literals
+static const int bulk_padding= 64; // bytes "overhead" in packet
/* Federated storage engine handlerton */
@@ -773,7 +774,9 @@ error:
ha_federated::ha_federated(TABLE *table_arg)
:handler(&federated_hton, table_arg),
mysql(0), stored_result(0)
-{}
+{
+ bzero(&bulk_insert, sizeof(bulk_insert));
+}
/*
@@ -1584,6 +1587,83 @@ inline uint field_in_record_is_null(TABLE *table,
DBUG_RETURN(0);
}
+
+/**
+ @brief Construct the INSERT statement.
+
+ @details This method constructs the INSERT statement and appends it to
+ the supplied query string buffer.
+
+ @return
+ @retval FALSE No error
+ @retval TRUE Failure
+*/
+
+bool ha_federated::append_stmt_insert(String *query)
+{
+ char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
+ Field **field;
+ uint tmp_length;
+
+ /* The main insert query string */
+ String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin);
+ DBUG_ENTER("ha_federated::append_stmt_insert");
+
+ insert_string.length(0);
+
+ if (replace_duplicates)
+ insert_string.append(STRING_WITH_LEN("REPLACE INTO "));
+ else if (ignore_duplicates && !insert_dup_update)
+ insert_string.append(STRING_WITH_LEN("INSERT IGNORE INTO "));
+ else
+ insert_string.append(STRING_WITH_LEN("INSERT INTO "));
+ append_ident(&insert_string, share->table_name, share->table_name_length,
+ ident_quote_char);
+ insert_string.append(FEDERATED_OPENPAREN);
+ tmp_length= insert_string.length() - strlen(FEDERATED_COMMA);
+
+ /*
+ loop through the field pointer array, add any fields to both the values
+ list and the fields list that match the current query id
+ */
+ for (field= table->field; *field; field++)
+ {
+ /* append the field name */
+ append_ident(&insert_string, (*field)->field_name,
+ strlen((*field)->field_name), ident_quote_char);
+
+ /* append commas between both fields and fieldnames */
+ /*
+ unfortunately, we can't use the logic
+ if *(fields + 1) to make the following
+ appends conditional because we may not append
+ if the next field doesn't match the condition:
+ (((*field)->query_id && (*field)->query_id == current_query_id)
+ */
+ insert_string.append(FEDERATED_COMMA);
+ }
+
+ /*
+ remove trailing comma
+ */
+ insert_string.length(insert_string.length() - strlen(FEDERATED_COMMA));
+
+ /*
+ if there were no fields, we don't want to add a closing paren
+ AND, we don't want to chop off the last char '('
+ insert will be "INSERT INTO t1 VALUES ();"
+ */
+ if (insert_string.length() > tmp_length)
+ {
+ insert_string.append(FEDERATED_CLOSEPAREN);
+ }
+
+ insert_string.append(FEDERATED_VALUES);
+
+ DBUG_RETURN(query->append(insert_string));
+}
+
+
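For illustration only, here is a minimal standalone sketch of the statement prefix append_stmt_insert() produces. It uses std::string, a plain vector of field names, and a simplified comma rule in place of the server's String class, the TABLE's Field array, and the FEDERATED_COMMA append-then-trim logic, so it approximates the code above rather than reproducing it:

// Standalone sketch: build the prefix "INSERT [IGNORE] INTO `tbl` (`f1`, ...) VALUES "
// to which row tuples are appended later. std::string/std::vector stand in for
// the server's String class and Field array (assumptions, not the real API).
#include <iostream>
#include <string>
#include <vector>

static std::string build_insert_prefix(const std::string &table,
                                       const std::vector<std::string> &fields,
                                       bool ignore_duplicates)
{
  std::string stmt= ignore_duplicates ? "INSERT IGNORE INTO " : "INSERT INTO ";
  stmt+= "`" + table + "` (";
  for (size_t i= 0; i < fields.size(); i++)
  {
    if (i)
      stmt+= ", ";                      // comma between field names
    stmt+= "`" + fields[i] + "`";
  }
  stmt+= ") VALUES ";                   // row tuples follow in write_row()
  return stmt;
}

int main()
{
  // Hypothetical columns matching the t1 table used in the test case above.
  std::cout << build_insert_prefix("t1", {"a", "b"}, true) << "\n";
  // Prints: INSERT IGNORE INTO `t1` (`a`, `b`) VALUES
}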
/*
write_row() inserts a row. No extra() hint is given currently if a bulk load
is happeneding. buf() is a byte array of data. You can use the field
@@ -1600,13 +1680,14 @@ inline uint field_in_record_is_null(TABLE *table,
int ha_federated::write_row(byte *buf)
{
- char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
char values_buffer[FEDERATED_QUERY_BUFFER_SIZE];
char insert_field_value_buffer[STRING_BUFFER_USUAL_SIZE];
Field **field;
+ uint tmp_length;
+ int error= 0;
+ bool use_bulk_insert;
+ bool auto_increment_update_required= table->next_number_field;
- /* The main insert query string */
- String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin);
/* The string containing the values to be added to the insert */
String values_string(values_buffer, sizeof(values_buffer), &my_charset_bin);
/* The actual value of the field, to be added to the values_string */
@@ -1614,7 +1695,6 @@ int ha_federated::write_row(byte *buf)
sizeof(insert_field_value_buffer),
&my_charset_bin);
values_string.length(0);
- insert_string.length(0);
insert_field_value_string.length(0);
DBUG_ENTER("ha_federated::write_row");
@@ -1624,19 +1704,19 @@ int ha_federated::write_row(byte *buf)
/*
start both our field and field values strings
+ We must disable multi-row insert for "INSERT...ON DUPLICATE KEY UPDATE".
+ Ignore duplicates is always true when insert_dup_update is true.
+ When replace_duplicates == TRUE, we can safely enable multi-row insert.
+ When performing a multi-row insert, we only collect the column values for
+ the row. The start of the statement is only created when the first
+ row is copied into the bulk_insert string.
*/
- if (replace_duplicates)
- insert_string.append(STRING_WITH_LEN("REPLACE INTO "));
- else if (ignore_duplicates && !insert_dup_update)
- insert_string.append(STRING_WITH_LEN("INSERT IGNORE INTO "));
- else
- insert_string.append(STRING_WITH_LEN("INSERT INTO "));
- append_ident(&insert_string, share->table_name,
- share->table_name_length, ident_quote_char);
- insert_string.append(FEDERATED_OPENPAREN);
+ if (!(use_bulk_insert= bulk_insert.str &&
+ (!insert_dup_update || replace_duplicates)))
+ append_stmt_insert(&values_string);
- values_string.append(FEDERATED_VALUES);
values_string.append(FEDERATED_OPENPAREN);
+ tmp_length= values_string.length();
/*
loop through the field pointer array, add any fields to both the values
@@ -1655,9 +1735,6 @@ int ha_federated::write_row(byte *buf)
insert_field_value_string.length(0);
}
- /* append the field name */
- append_ident(&insert_string, (*field)->field_name,
- strlen((*field)->field_name), ident_quote_char);
/* append the value */
values_string.append(insert_field_value_string);
@@ -1671,32 +1748,61 @@ int ha_federated::write_row(byte *buf)
if the next field doesn't match the condition:
(((*field)->query_id && (*field)->query_id == current_query_id)
*/
- insert_string.append(FEDERATED_COMMA);
values_string.append(FEDERATED_COMMA);
}
/*
- remove trailing comma
- */
- insert_string.length(insert_string.length() - strlen(FEDERATED_COMMA));
- /*
if there were no fields, we don't want to add a closing paren
AND, we don't want to chop off the last char '('
insert will be "INSERT INTO t1 VALUES ();"
*/
- if (table->s->fields)
+ if (values_string.length() > tmp_length)
{
/* chops off leading commas */
values_string.length(values_string.length() - strlen(FEDERATED_COMMA));
- insert_string.append(FEDERATED_CLOSEPAREN);
}
/* we always want to append this, even if there aren't any fields */
values_string.append(FEDERATED_CLOSEPAREN);
- /* add the values */
- insert_string.append(values_string);
+ if (use_bulk_insert)
+ {
+ /*
+ Send the current bulk insert out if appending the current row would
+ cause the statement to overflow the packet size, otherwise set
+ auto_increment_update_required to FALSE as no query was executed.
+ */
+ if (bulk_insert.length + values_string.length() + bulk_padding >
+ mysql->net.max_packet_size && bulk_insert.length)
+ {
+ error= mysql_real_query(mysql, bulk_insert.str, bulk_insert.length);
+ bulk_insert.length= 0;
+ }
+ else
+ auto_increment_update_required= FALSE;
+
+ if (bulk_insert.length == 0)
+ {
+ char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
+ String insert_string(insert_buffer, sizeof(insert_buffer),
+ &my_charset_bin);
+ insert_string.length(0);
+ append_stmt_insert(&insert_string);
+ dynstr_append_mem(&bulk_insert, insert_string.ptr(),
+ insert_string.length());
+ }
+ else
+ dynstr_append_mem(&bulk_insert, ",", 1);
- if (mysql_real_query(mysql, insert_string.ptr(), insert_string.length()))
+ dynstr_append_mem(&bulk_insert, values_string.ptr(),
+ values_string.length());
+ }
+ else
+ {
+ error= mysql_real_query(mysql, values_string.ptr(),
+ values_string.length());
+ }
+
+ if (error)
{
DBUG_RETURN(stash_remote_error());
}
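The buffering added above can be pictured with a small self-contained sketch. BulkInsertSketch, send(), add_row(), max_packet and padding are hypothetical stand-ins for the handler, mysql_real_query(), the row-append path, mysql->net.max_packet_size and bulk_padding; only the overflow test mirrors the condition in the patch:

// Sketch of write_row() buffering: rows accumulate in one statement, which is
// sent whenever appending the next row would exceed the packet limit.
// std::string stands in for DYNAMIC_STRING (assumption, not the server type).
#include <iostream>
#include <string>

struct BulkInsertSketch
{
  std::string stmt;                     // accumulated multi-row INSERT
  size_t max_packet;                    // stand-in for net.max_packet_size
  size_t padding;                       // stand-in for bulk_padding

  void send()                           // stand-in for mysql_real_query()
  {
    if (!stmt.empty())
      std::cout << "SEND: " << stmt << "\n";
    stmt.clear();
  }

  void add_row(const std::string &prefix, const std::string &values)
  {
    // Flush first if this row would overflow the packet (same test as above).
    if (!stmt.empty() && stmt.size() + values.size() + padding > max_packet)
      send();
    if (stmt.empty())
      stmt= prefix;                     // "INSERT INTO `t1` (`a`, `b`) VALUES "
    else
      stmt+= ",";                       // separate row tuples
    stmt+= values;
  }
};

int main()
{
  BulkInsertSketch bulk{ "", 70, 8 };   // deliberately tiny limit for the demo
  const std::string prefix= "INSERT INTO `t1` (`a`, `b`) VALUES ";
  bulk.add_row(prefix, "(1,'Larry')");
  bulk.add_row(prefix, "(2,'Curly')");
  bulk.add_row(prefix, "(3,'Moe')");    // would exceed 70 bytes: rows 1-2 are sent first
  bulk.send();                          // end_bulk_insert() equivalent: flush the rest
}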
@@ -1704,12 +1810,79 @@ int ha_federated::write_row(byte *buf)
If the table we've just written a record to contains an auto_increment
field, then store the last_insert_id() value from the foreign server
*/
- if (table->next_number_field)
+ if (auto_increment_update_required)
update_auto_increment();
DBUG_RETURN(0);
}
+
+/**
+ @brief Prepares the storage engine for bulk inserts.
+
+ @param[in] rows estimated number of rows in bulk insert
+ or 0 if unknown.
+
+ @details Initializes memory structures required for bulk insert.
+*/
+
+void ha_federated::start_bulk_insert(ha_rows rows)
+{
+ uint page_size;
+ DBUG_ENTER("ha_federated::start_bulk_insert");
+
+ dynstr_free(&bulk_insert);
+
+ /**
+ We don't bother with bulk-insert semantics when the estimated rows == 1.
+ The rows value will be 0 if the server does not know how many rows
+ would be inserted. This can occur when performing INSERT...SELECT.
+ */
+
+ if (rows == 1)
+ DBUG_VOID_RETURN;
+
+ page_size= (uint) my_getpagesize();
+
+ if (init_dynamic_string(&bulk_insert, NULL, page_size, page_size))
+ DBUG_VOID_RETURN;
+
+ bulk_insert.length= 0;
+ DBUG_VOID_RETURN;
+}
+
+
+/**
+ @brief End bulk insert.
+
+ @details This method will send any remaining rows to the remote server.
+ Finally, it will deinitialize the bulk insert data structure.
+
+ @return Operation status
+ @retval 0 No error
+ @retval != 0 Error occurred at remote server. Also sets my_errno.
+*/
+
+int ha_federated::end_bulk_insert()
+{
+ int error= 0;
+ DBUG_ENTER("ha_federated::end_bulk_insert");
+
+ if (bulk_insert.str && bulk_insert.length)
+ {
+ if (mysql_real_query(mysql, bulk_insert.str, bulk_insert.length))
+ error= stash_remote_error();
+ else
+ if (table->next_number_field)
+ update_auto_increment();
+ }
+
+ dynstr_free(&bulk_insert);
+
+ DBUG_RETURN(my_errno= error);
+}
+
+
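Taken together, start_bulk_insert(), write_row() and end_bulk_insert() form a simple buffer-and-flush lifecycle. The sketch below shows the call order the server is assumed to follow; FederatedLikeHandler and its members are hypothetical stand-ins for ha_federated, the DYNAMIC_STRING bulk_insert buffer and mysql_real_query(), and row formatting is omitted:

// Lifecycle sketch: start_bulk_insert() decides whether to buffer, write_row()
// appends one row per call, end_bulk_insert() sends whatever is still buffered.
#include <iostream>
#include <string>

class FederatedLikeHandler
{
  std::string bulk;                     // stand-in for DYNAMIC_STRING bulk_insert
  bool bulk_enabled= false;

  void flush()                          // stand-in for mysql_real_query()
  {
    if (!bulk.empty())
      std::cout << "SEND: " << bulk << "\n";
    bulk.clear();
  }

public:
  void start_bulk_insert(size_t rows)
  {
    // Skip bulk semantics for a single-row insert; rows == 0 means "unknown"
    // (e.g. INSERT ... SELECT) and still enables buffering.
    bulk_enabled= (rows != 1);
  }

  void write_row(const std::string &values)
  {
    if (!bulk_enabled)
    {
      std::cout << "SEND: INSERT INTO `t1` VALUES " << values << "\n";
      return;
    }
    bulk+= bulk.empty() ? "INSERT INTO `t1` VALUES " : ",";
    bulk+= values;
  }

  int end_bulk_insert()
  {
    flush();                            // send any remaining buffered rows
    bulk_enabled= false;
    return 0;
  }
};

int main()
{
  FederatedLikeHandler h;
  h.start_bulk_insert(0);               // row count unknown, buffering enabled
  h.write_row("(1,'Larry')");
  h.write_row("(2,'Curly')");
  h.end_bulk_insert();                  // one multi-row statement is sent here
}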
/*
ha_federated::update_auto_increment
@@ -2451,7 +2624,6 @@ int ha_federated::info(uint flag)
{
char error_buffer[FEDERATED_QUERY_BUFFER_SIZE];
char status_buf[FEDERATED_QUERY_BUFFER_SIZE];
- char escaped_table_name[FEDERATED_QUERY_BUFFER_SIZE];
int error;
uint error_code;
MYSQL_RES *result= 0;
diff --git a/sql/ha_federated.h b/sql/ha_federated.h
index 28c89561b2c..b5e1c217eb5 100644
--- a/sql/ha_federated.h
+++ b/sql/ha_federated.h
@@ -159,6 +159,7 @@ class ha_federated: public handler
char remote_error_buf[FEDERATED_QUERY_BUFFER_SIZE];
bool ignore_duplicates, replace_duplicates;
bool insert_dup_update;
+ DYNAMIC_STRING bulk_insert;
private:
/*
@@ -173,6 +174,14 @@ private:
bool records_in_range);
int stash_remote_error();
+ bool append_stmt_insert(String *query);
+
+ int read_next(byte *buf, MYSQL_RES *result);
+ int index_read_idx_with_result_set(byte *buf, uint index,
+ const byte *key,
+ uint key_len,
+ ha_rkey_function find_flag,
+ MYSQL_RES **result);
public:
ha_federated(TABLE *table_arg);
~ha_federated()
@@ -258,6 +267,8 @@ public:
int open(const char *name, int mode, uint test_if_locked); // required
int close(void); // required
+ void start_bulk_insert(ha_rows rows);
+ int end_bulk_insert();
int write_row(byte *buf);
int update_row(const byte *old_data, byte *new_data);
int delete_row(const byte *buf);
@@ -301,14 +312,7 @@ public:
THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to,
enum thr_lock_type lock_type); //required
- virtual bool get_error_message(int error, String *buf);
-
- int read_next(byte *buf, MYSQL_RES *result);
- int index_read_idx_with_result_set(byte *buf, uint index,
- const byte *key,
- uint key_len,
- ha_rkey_function find_flag,
- MYSQL_RES **result);
+ bool get_error_message(int error, String *buf);
};
bool federated_db_init(void);