diff options
-rw-r--r-- | mysql-test/r/federated_innodb.result | 34 | ||||
-rw-r--r-- | mysql-test/t/federated_innodb-slave.opt | 1 | ||||
-rw-r--r-- | mysql-test/t/federated_innodb.test | 34 | ||||
-rw-r--r-- | sql/ha_federated.cc | 232 | ||||
-rw-r--r-- | sql/ha_federated.h | 20 |
5 files changed, 283 insertions, 38 deletions
diff --git a/mysql-test/r/federated_innodb.result b/mysql-test/r/federated_innodb.result new file mode 100644 index 00000000000..70ba3acb279 --- /dev/null +++ b/mysql-test/r/federated_innodb.result @@ -0,0 +1,34 @@ +stop slave; +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9; +reset master; +reset slave; +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9; +start slave; +stop slave; +DROP DATABASE IF EXISTS federated; +CREATE DATABASE federated; +DROP DATABASE IF EXISTS federated; +CREATE DATABASE federated; +create table federated.t1 (a int primary key, b varchar(64)) +engine=myisam; +create table federated.t1 (a int primary key, b varchar(64)) +engine=federated +connection='mysql://root@127.0.0.1:SLAVE_PORT/federated/t1'; +insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe"); +ERROR 23000: Can't write; duplicate key in table 't1' +select * from federated.t1; +a b +1 Larry +2 Curly +truncate federated.t1; +alter table federated.t1 engine=innodb; +insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe"); +ERROR 23000: Can't write; duplicate key in table 't1' +select * from federated.t1; +a b +drop table federated.t1; +drop table federated.t1; +DROP TABLE IF EXISTS federated.t1; +DROP DATABASE IF EXISTS federated; +DROP TABLE IF EXISTS federated.t1; +DROP DATABASE IF EXISTS federated; diff --git a/mysql-test/t/federated_innodb-slave.opt b/mysql-test/t/federated_innodb-slave.opt new file mode 100644 index 00000000000..627becdbfb5 --- /dev/null +++ b/mysql-test/t/federated_innodb-slave.opt @@ -0,0 +1 @@ +--innodb diff --git a/mysql-test/t/federated_innodb.test b/mysql-test/t/federated_innodb.test new file mode 100644 index 00000000000..772e37a2929 --- /dev/null +++ b/mysql-test/t/federated_innodb.test @@ -0,0 +1,34 @@ +source include/federated.inc; +source include/have_innodb.inc; + +# +# Bug#25513 Federated transaction failures +# +connection slave; +create table federated.t1 (a int primary key, b varchar(64)) + engine=myisam; +connection master; +--replace_result $SLAVE_MYPORT SLAVE_PORT +eval create table federated.t1 (a int primary key, b varchar(64)) + engine=federated + connection='mysql://root@127.0.0.1:$SLAVE_MYPORT/federated/t1'; + +--error ER_DUP_KEY +insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe"); +select * from federated.t1; + +connection slave; +truncate federated.t1; +alter table federated.t1 engine=innodb; +connection master; + +--error ER_DUP_KEY +insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe"); +select * from federated.t1; + +drop table federated.t1; +connection slave; +drop table federated.t1; + + +source include/federated_cleanup.inc; diff --git a/sql/ha_federated.cc b/sql/ha_federated.cc index e691831bbc9..8551db6be16 100644 --- a/sql/ha_federated.cc +++ b/sql/ha_federated.cc @@ -352,6 +352,7 @@ static char ident_quote_char= '`'; // Character for quoting // identifiers static char value_quote_char= '\''; // Character for quoting // literals +static const int bulk_padding= 64; // bytes "overhead" in packet /* Federated storage engine handlerton */ @@ -773,7 +774,9 @@ error: ha_federated::ha_federated(TABLE *table_arg) :handler(&federated_hton, table_arg), mysql(0), stored_result(0) -{} +{ + bzero(&bulk_insert, sizeof(bulk_insert)); +} /* @@ -1584,6 +1587,83 @@ inline uint field_in_record_is_null(TABLE *table, DBUG_RETURN(0); } + +/** + @brief Construct the INSERT statement. + + @details This method will construct the INSERT statement and appends it to + the supplied query string buffer. + + @return + @retval FALSE No error + @retval TRUE Failure +*/ + +bool ha_federated::append_stmt_insert(String *query) +{ + char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE]; + Field **field; + uint tmp_length; + + /* The main insert query string */ + String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin); + DBUG_ENTER("ha_federated::append_stmt_insert"); + + insert_string.length(0); + + if (replace_duplicates) + insert_string.append(STRING_WITH_LEN("REPLACE INTO ")); + else if (ignore_duplicates && !insert_dup_update) + insert_string.append(STRING_WITH_LEN("INSERT IGNORE INTO ")); + else + insert_string.append(STRING_WITH_LEN("INSERT INTO ")); + append_ident(&insert_string, share->table_name, share->table_name_length, + ident_quote_char); + insert_string.append(FEDERATED_OPENPAREN); + tmp_length= insert_string.length() - strlen(FEDERATED_COMMA); + + /* + loop through the field pointer array, add any fields to both the values + list and the fields list that match the current query id + */ + for (field= table->field; *field; field++) + { + /* append the field name */ + append_ident(&insert_string, (*field)->field_name, + strlen((*field)->field_name), ident_quote_char); + + /* append commas between both fields and fieldnames */ + /* + unfortunately, we can't use the logic + if *(fields + 1) to make the following + appends conditional because we may not append + if the next field doesn't match the condition: + (((*field)->query_id && (*field)->query_id == current_query_id) + */ + insert_string.append(FEDERATED_COMMA); + } + + /* + remove trailing comma + */ + insert_string.length(insert_string.length() - strlen(FEDERATED_COMMA)); + + /* + if there were no fields, we don't want to add a closing paren + AND, we don't want to chop off the last char '(' + insert will be "INSERT INTO t1 VALUES ();" + */ + if (insert_string.length() > tmp_length) + { + insert_string.append(FEDERATED_CLOSEPAREN); + } + + insert_string.append(FEDERATED_VALUES); + + DBUG_RETURN(query->append(insert_string)); +} + + /* write_row() inserts a row. No extra() hint is given currently if a bulk load is happeneding. buf() is a byte array of data. You can use the field @@ -1600,13 +1680,14 @@ inline uint field_in_record_is_null(TABLE *table, int ha_federated::write_row(byte *buf) { - char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE]; char values_buffer[FEDERATED_QUERY_BUFFER_SIZE]; char insert_field_value_buffer[STRING_BUFFER_USUAL_SIZE]; Field **field; + uint tmp_length; + int error= 0; + bool use_bulk_insert; + bool auto_increment_update_required= table->next_number_field; - /* The main insert query string */ - String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin); /* The string containing the values to be added to the insert */ String values_string(values_buffer, sizeof(values_buffer), &my_charset_bin); /* The actual value of the field, to be added to the values_string */ @@ -1614,7 +1695,6 @@ int ha_federated::write_row(byte *buf) sizeof(insert_field_value_buffer), &my_charset_bin); values_string.length(0); - insert_string.length(0); insert_field_value_string.length(0); DBUG_ENTER("ha_federated::write_row"); @@ -1624,19 +1704,19 @@ int ha_federated::write_row(byte *buf) /* start both our field and field values strings + We must disable multi-row insert for "INSERT...ON DUPLICATE KEY UPDATE" + Ignore duplicates is always true when insert_dup_update is true. + When replace_duplicates == TRUE, we can safely enable multi-row insert. + When performing multi-row insert, we only collect the columns values for + the row. The start of the statement is only created when the first + row is copied in to the bulk_insert string. */ - if (replace_duplicates) - insert_string.append(STRING_WITH_LEN("REPLACE INTO ")); - else if (ignore_duplicates && !insert_dup_update) - insert_string.append(STRING_WITH_LEN("INSERT IGNORE INTO ")); - else - insert_string.append(STRING_WITH_LEN("INSERT INTO ")); - append_ident(&insert_string, share->table_name, - share->table_name_length, ident_quote_char); - insert_string.append(FEDERATED_OPENPAREN); + if (!(use_bulk_insert= bulk_insert.str && + (!insert_dup_update || replace_duplicates))) + append_stmt_insert(&values_string); - values_string.append(FEDERATED_VALUES); values_string.append(FEDERATED_OPENPAREN); + tmp_length= values_string.length(); /* loop through the field pointer array, add any fields to both the values @@ -1655,9 +1735,6 @@ int ha_federated::write_row(byte *buf) insert_field_value_string.length(0); } - /* append the field name */ - append_ident(&insert_string, (*field)->field_name, - strlen((*field)->field_name), ident_quote_char); /* append the value */ values_string.append(insert_field_value_string); @@ -1671,32 +1748,61 @@ int ha_federated::write_row(byte *buf) if the next field doesn't match the condition: (((*field)->query_id && (*field)->query_id == current_query_id) */ - insert_string.append(FEDERATED_COMMA); values_string.append(FEDERATED_COMMA); } /* - remove trailing comma - */ - insert_string.length(insert_string.length() - strlen(FEDERATED_COMMA)); - /* if there were no fields, we don't want to add a closing paren AND, we don't want to chop off the last char '(' insert will be "INSERT INTO t1 VALUES ();" */ - if (table->s->fields) + if (values_string.length() > tmp_length) { /* chops off leading commas */ values_string.length(values_string.length() - strlen(FEDERATED_COMMA)); - insert_string.append(FEDERATED_CLOSEPAREN); } /* we always want to append this, even if there aren't any fields */ values_string.append(FEDERATED_CLOSEPAREN); - /* add the values */ - insert_string.append(values_string); + if (use_bulk_insert) + { + /* + Send the current bulk insert out if appending the current row would + cause the statement to overflow the packet size, otherwise set + auto_increment_update_required to FALSE as no query was executed. + */ + if (bulk_insert.length + values_string.length() + bulk_padding > + mysql->net.max_packet_size && bulk_insert.length) + { + error= mysql_real_query(mysql, bulk_insert.str, bulk_insert.length); + bulk_insert.length= 0; + } + else + auto_increment_update_required= FALSE; + + if (bulk_insert.length == 0) + { + char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE]; + String insert_string(insert_buffer, sizeof(insert_buffer), + &my_charset_bin); + insert_string.length(0); + append_stmt_insert(&insert_string); + dynstr_append_mem(&bulk_insert, insert_string.ptr(), + insert_string.length()); + } + else + dynstr_append_mem(&bulk_insert, ",", 1); - if (mysql_real_query(mysql, insert_string.ptr(), insert_string.length())) + dynstr_append_mem(&bulk_insert, values_string.ptr(), + values_string.length()); + } + else + { + error= mysql_real_query(mysql, values_string.ptr(), + values_string.length()); + } + + if (error) { DBUG_RETURN(stash_remote_error()); } @@ -1704,12 +1810,79 @@ int ha_federated::write_row(byte *buf) If the table we've just written a record to contains an auto_increment field, then store the last_insert_id() value from the foreign server */ - if (table->next_number_field) + if (auto_increment_update_required) update_auto_increment(); DBUG_RETURN(0); } + +/** + @brief Prepares the storage engine for bulk inserts. + + @param[in] rows estimated number of rows in bulk insert + or 0 if unknown. + + @details Initializes memory structures required for bulk insert. +*/ + +void ha_federated::start_bulk_insert(ha_rows rows) +{ + uint page_size; + DBUG_ENTER("ha_federated::start_bulk_insert"); + + dynstr_free(&bulk_insert); + + /** + We don't bother with bulk-insert semantics when the estimated rows == 1 + The rows value will be 0 if the server does not know how many rows + would be inserted. This can occur when performing INSERT...SELECT + */ + + if (rows == 1) + DBUG_VOID_RETURN; + + page_size= (uint) my_getpagesize(); + + if (init_dynamic_string(&bulk_insert, NULL, page_size, page_size)) + DBUG_VOID_RETURN; + + bulk_insert.length= 0; + DBUG_VOID_RETURN; +} + + +/** + @brief End bulk insert. + + @details This method will send any remaining rows to the remote server. + Finally, it will deinitialize the bulk insert data structure. + + @return Operation status + @retval 0 No error + @retval != 0 Error occured at remote server. Also sets my_errno. +*/ + +int ha_federated::end_bulk_insert() +{ + int error= 0; + DBUG_ENTER("ha_federated::end_bulk_insert"); + + if (bulk_insert.str && bulk_insert.length) + { + if (mysql_real_query(mysql, bulk_insert.str, bulk_insert.length)) + error= stash_remote_error(); + else + if (table->next_number_field) + update_auto_increment(); + } + + dynstr_free(&bulk_insert); + + DBUG_RETURN(my_errno= error); +} + + /* ha_federated::update_auto_increment @@ -2451,7 +2624,6 @@ int ha_federated::info(uint flag) { char error_buffer[FEDERATED_QUERY_BUFFER_SIZE]; char status_buf[FEDERATED_QUERY_BUFFER_SIZE]; - char escaped_table_name[FEDERATED_QUERY_BUFFER_SIZE]; int error; uint error_code; MYSQL_RES *result= 0; diff --git a/sql/ha_federated.h b/sql/ha_federated.h index 28c89561b2c..b5e1c217eb5 100644 --- a/sql/ha_federated.h +++ b/sql/ha_federated.h @@ -159,6 +159,7 @@ class ha_federated: public handler char remote_error_buf[FEDERATED_QUERY_BUFFER_SIZE]; bool ignore_duplicates, replace_duplicates; bool insert_dup_update; + DYNAMIC_STRING bulk_insert; private: /* @@ -173,6 +174,14 @@ private: bool records_in_range); int stash_remote_error(); + bool append_stmt_insert(String *query); + + int read_next(byte *buf, MYSQL_RES *result); + int index_read_idx_with_result_set(byte *buf, uint index, + const byte *key, + uint key_len, + ha_rkey_function find_flag, + MYSQL_RES **result); public: ha_federated(TABLE *table_arg); ~ha_federated() @@ -258,6 +267,8 @@ public: int open(const char *name, int mode, uint test_if_locked); // required int close(void); // required + void start_bulk_insert(ha_rows rows); + int end_bulk_insert(); int write_row(byte *buf); int update_row(const byte *old_data, byte *new_data); int delete_row(const byte *buf); @@ -301,14 +312,7 @@ public: THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to, enum thr_lock_type lock_type); //required - virtual bool get_error_message(int error, String *buf); - - int read_next(byte *buf, MYSQL_RES *result); - int index_read_idx_with_result_set(byte *buf, uint index, - const byte *key, - uint key_len, - ha_rkey_function find_flag, - MYSQL_RES **result); + bool get_error_message(int error, String *buf); }; bool federated_db_init(void); |