diff options
author | unknown <antony@ppcg5.local> | 2007-07-06 14:29:58 -0700 |
---|---|---|
committer | unknown <antony@ppcg5.local> | 2007-07-06 14:29:58 -0700 |
commit | 7cbbce05f9b4afac7d80b2ec993c90d8c34ad516 (patch) | |
tree | ef4f11959a41ab0909ae86a33eccb8a4123f95ae /sql | |
parent | 0e0b20dab1792131b6cfd607f761227a56d280f9 (diff) | |
parent | 8d8ca72deca3e60affcd35b4ff9ca668f08c29ab (diff) | |
download | mariadb-git-7cbbce05f9b4afac7d80b2ec993c90d8c34ad516.tar.gz |
Merge acurtis@bk-internal.mysql.com:/home/bk/mysql-5.0
into ppcg5.local:/private/Network/Servers/anubis.xiphis.org/home/antony/work/mysql-5.0-engines.merge
Diffstat (limited to 'sql')
-rw-r--r-- | sql/ha_archive.cc | 22 | ||||
-rw-r--r-- | sql/ha_federated.cc | 435 | ||||
-rw-r--r-- | sql/ha_federated.h | 23 | ||||
-rw-r--r-- | sql/sql_insert.cc | 8 |
4 files changed, 379 insertions, 109 deletions
diff --git a/sql/ha_archive.cc b/sql/ha_archive.cc index e2a2211259f..0c558bf2515 100644 --- a/sql/ha_archive.cc +++ b/sql/ha_archive.cc @@ -205,7 +205,7 @@ bool archive_db_init() else { zoffset_size= 2 << ((zlibCompileFlags() >> 6) & 3); - switch (sizeof(z_off_t)) { + switch (zoffset_size) { case 2: max_zfile_size= INT_MAX16; break; @@ -676,6 +676,7 @@ int ha_archive::real_write_row(byte *buf, gzFile writer) total_row_length+= ((Field_blob*) table->field[*ptr])->get_length(); if (share->approx_file_size > max_zfile_size - total_row_length) { + gzflush(writer, Z_SYNC_FLUSH); info(HA_STATUS_TIME); share->approx_file_size= (ulong) data_file_length; if (share->approx_file_size > max_zfile_size - total_row_length) @@ -1204,7 +1205,6 @@ bool ha_archive::is_crashed() const int ha_archive::check(THD* thd, HA_CHECK_OPT* check_opt) { int rc= 0; - byte *buf; const char *old_proc_info=thd->proc_info; ha_rows count= share->rows_recorded; DBUG_ENTER("ha_archive::check"); @@ -1213,25 +1213,13 @@ int ha_archive::check(THD* thd, HA_CHECK_OPT* check_opt) /* Flush any waiting data */ gzflush(share->archive_write, Z_SYNC_FLUSH); - /* - First we create a buffer that we can use for reading rows, and can pass - to get_row(). - */ - if (!(buf= (byte*) my_malloc(table->s->reclength, MYF(MY_WME)))) - rc= HA_ERR_OUT_OF_MEM; - /* Now we will rewind the archive file so that we are positioned at the start of the file. */ - if (!rc) - read_data_header(archive); - - if (!rc) - while (!(rc= get_row(archive, buf))) - count--; - - my_free((char*)buf, MYF(0)); + read_data_header(archive); + while (!(rc= get_row(archive, table->record[0]))) + count--; thd->proc_info= old_proc_info; diff --git a/sql/ha_federated.cc b/sql/ha_federated.cc index dd4dd725be4..3cf9c2a8b99 100644 --- a/sql/ha_federated.cc +++ b/sql/ha_federated.cc @@ -348,6 +348,11 @@ pthread_mutex_t federated_mutex; // This is the mutex we use to // init the hash static int federated_init= FALSE; // Variable for checking the // init state of hash +static char ident_quote_char= '`'; // Character for quoting + // identifiers +static char value_quote_char= '\''; // Character for quoting + // literals +static const int bulk_padding= 64; // bytes "overhead" in packet /* Federated storage engine handlerton */ @@ -440,6 +445,58 @@ bool federated_db_end() return FALSE; } + +/** + @brief Append identifiers to the string. + + @param[in,out] string The target string. + @param[in] name Identifier name + @param[in] length Length of identifier name in bytes + @param[in] quote_char Quote char to use for quoting identifier. + + @return Operation Status + @retval FALSE OK + @retval TRUE There was an error appending to the string. + + @note This function is based upon the append_identifier() function + in sql_show.cc except that quoting always occurs. +*/ + +static bool append_ident(String *string, const char *name, uint length, + const char quote_char) +{ + bool result; + uint clen; + const char *name_end; + DBUG_ENTER("append_ident"); + + if (quote_char) + { + string->reserve(length * 2 + 2); + if ((result= string->append("e_char, 1, system_charset_info))) + goto err; + + for (name_end= name+length; name < name_end; name+= clen) + { + uchar c= *(uchar *) name; + if (!(clen= my_mbcharlen(system_charset_info, c))) + clen= 1; + if (clen == 1 && c == (uchar) quote_char && + (result= string->append("e_char, 1, system_charset_info))) + goto err; + if ((result= string->append(name, clen, string->charset()))) + goto err; + } + result= string->append("e_char, 1, system_charset_info); + } + else + result= string->append(name, length, system_charset_info); + +err: + DBUG_RETURN(result); +} + + /* Check (in create) whether the tables exists, and that it can be connected to @@ -458,7 +515,6 @@ bool federated_db_end() static int check_foreign_data_source(FEDERATED_SHARE *share, bool table_create_flag) { - char escaped_table_name[NAME_LEN*2]; char query_buffer[FEDERATED_QUERY_BUFFER_SIZE]; char error_buffer[FEDERATED_QUERY_BUFFER_SIZE]; uint error_code; @@ -499,7 +555,6 @@ static int check_foreign_data_source(FEDERATED_SHARE *share, } else { - int escaped_table_name_length= 0; /* Since we do not support transactions at this version, we can let the client API silently reconnect. For future versions, we will need more @@ -517,14 +572,8 @@ static int check_foreign_data_source(FEDERATED_SHARE *share, query.append(FEDERATED_SELECT); query.append(FEDERATED_STAR); query.append(FEDERATED_FROM); - query.append(FEDERATED_BTICK); - escaped_table_name_length= - escape_string_for_mysql(&my_charset_bin, (char*)escaped_table_name, - sizeof(escaped_table_name), - share->table_name, - share->table_name_length); - query.append(escaped_table_name, escaped_table_name_length); - query.append(FEDERATED_BTICK); + append_ident(&query, share->table_name, share->table_name_length, + ident_quote_char); query.append(FEDERATED_WHERE); query.append(FEDERATED_FALSE); @@ -725,7 +774,9 @@ error: ha_federated::ha_federated(TABLE *table_arg) :handler(&federated_hton, table_arg), mysql(0), stored_result(0) -{} +{ + bzero(&bulk_insert, sizeof(bulk_insert)); +} /* @@ -784,9 +835,8 @@ uint ha_federated::convert_row_to_internal_format(byte *record, static bool emit_key_part_name(String *to, KEY_PART_INFO *part) { DBUG_ENTER("emit_key_part_name"); - if (to->append(FEDERATED_BTICK) || - to->append(part->field->field_name) || - to->append(FEDERATED_BTICK)) + if (append_ident(to, part->field->field_name, + strlen(part->field->field_name), ident_quote_char)) DBUG_RETURN(1); // Out of memory DBUG_RETURN(0); } @@ -1309,31 +1359,28 @@ static FEDERATED_SHARE *get_share(const char *table_name, TABLE *table) query.append(FEDERATED_SELECT); for (field= table->field; *field; field++) { - query.append(FEDERATED_BTICK); - query.append((*field)->field_name); - query.append(FEDERATED_BTICK); + append_ident(&query, (*field)->field_name, + strlen((*field)->field_name), ident_quote_char); query.append(FEDERATED_COMMA); } query.length(query.length()- strlen(FEDERATED_COMMA)); query.append(FEDERATED_FROM); - query.append(FEDERATED_BTICK); + + tmp_share.table_name_length= strlen(tmp_share.table_name); + append_ident(&query, tmp_share.table_name, + tmp_share.table_name_length, ident_quote_char); if (!(share= (FEDERATED_SHARE *) my_multi_malloc(MYF(MY_WME), &share, sizeof(*share), - &select_query, - query.length()+table->s->connect_string.length+1, + &select_query, query.length()+1, NullS))) goto error; memcpy(share, &tmp_share, sizeof(tmp_share)); + memcpy(select_query, query.ptr(), query.length()+1); - share->table_name_length= strlen(share->table_name); - /* TODO: share->table_name to LEX_STRING object */ - query.append(share->table_name, share->table_name_length); - query.append(FEDERATED_BTICK); share->select_query= select_query; - strmov(share->select_query, query.ptr()); share->use_count= 0; DBUG_PRINT("info", ("share->select_query %s", share->select_query)); @@ -1467,6 +1514,8 @@ int ha_federated::open(const char *name, int mode, uint test_if_locked) table->s->reclength); DBUG_PRINT("info", ("ref_length: %u", ref_length)); + reset(); + DBUG_RETURN(0); } @@ -1538,6 +1587,83 @@ inline uint field_in_record_is_null(TABLE *table, DBUG_RETURN(0); } + +/** + @brief Construct the INSERT statement. + + @details This method will construct the INSERT statement and appends it to + the supplied query string buffer. + + @return + @retval FALSE No error + @retval TRUE Failure +*/ + +bool ha_federated::append_stmt_insert(String *query) +{ + char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE]; + Field **field; + uint tmp_length; + + /* The main insert query string */ + String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin); + DBUG_ENTER("ha_federated::append_stmt_insert"); + + insert_string.length(0); + + if (replace_duplicates) + insert_string.append(STRING_WITH_LEN("REPLACE INTO ")); + else if (ignore_duplicates && !insert_dup_update) + insert_string.append(STRING_WITH_LEN("INSERT IGNORE INTO ")); + else + insert_string.append(STRING_WITH_LEN("INSERT INTO ")); + append_ident(&insert_string, share->table_name, share->table_name_length, + ident_quote_char); + insert_string.append(FEDERATED_OPENPAREN); + tmp_length= insert_string.length() - strlen(FEDERATED_COMMA); + + /* + loop through the field pointer array, add any fields to both the values + list and the fields list that match the current query id + */ + for (field= table->field; *field; field++) + { + /* append the field name */ + append_ident(&insert_string, (*field)->field_name, + strlen((*field)->field_name), ident_quote_char); + + /* append commas between both fields and fieldnames */ + /* + unfortunately, we can't use the logic + if *(fields + 1) to make the following + appends conditional because we may not append + if the next field doesn't match the condition: + (((*field)->query_id && (*field)->query_id == current_query_id) + */ + insert_string.append(FEDERATED_COMMA); + } + + /* + remove trailing comma + */ + insert_string.length(insert_string.length() - strlen(FEDERATED_COMMA)); + + /* + if there were no fields, we don't want to add a closing paren + AND, we don't want to chop off the last char '(' + insert will be "INSERT INTO t1 VALUES ();" + */ + if (insert_string.length() > tmp_length) + { + insert_string.append(FEDERATED_CLOSEPAREN); + } + + insert_string.append(FEDERATED_VALUES); + + DBUG_RETURN(query->append(insert_string)); +} + + /* write_row() inserts a row. No extra() hint is given currently if a bulk load is happeneding. buf() is a byte array of data. You can use the field @@ -1554,13 +1680,14 @@ inline uint field_in_record_is_null(TABLE *table, int ha_federated::write_row(byte *buf) { - char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE]; char values_buffer[FEDERATED_QUERY_BUFFER_SIZE]; char insert_field_value_buffer[STRING_BUFFER_USUAL_SIZE]; Field **field; + uint tmp_length; + int error= 0; + bool use_bulk_insert; + bool auto_increment_update_required= (table->next_number_field != NULL); - /* The main insert query string */ - String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin); /* The string containing the values to be added to the insert */ String values_string(values_buffer, sizeof(values_buffer), &my_charset_bin); /* The actual value of the field, to be added to the values_string */ @@ -1568,7 +1695,6 @@ int ha_federated::write_row(byte *buf) sizeof(insert_field_value_buffer), &my_charset_bin); values_string.length(0); - insert_string.length(0); insert_field_value_string.length(0); DBUG_ENTER("ha_federated::write_row"); @@ -1578,15 +1704,19 @@ int ha_federated::write_row(byte *buf) /* start both our field and field values strings + We must disable multi-row insert for "INSERT...ON DUPLICATE KEY UPDATE" + Ignore duplicates is always true when insert_dup_update is true. + When replace_duplicates == TRUE, we can safely enable multi-row insert. + When performing multi-row insert, we only collect the columns values for + the row. The start of the statement is only created when the first + row is copied in to the bulk_insert string. */ - insert_string.append(FEDERATED_INSERT); - insert_string.append(FEDERATED_BTICK); - insert_string.append(share->table_name, share->table_name_length); - insert_string.append(FEDERATED_BTICK); - insert_string.append(FEDERATED_OPENPAREN); + if (!(use_bulk_insert= bulk_insert.str && + (!insert_dup_update || replace_duplicates))) + append_stmt_insert(&values_string); - values_string.append(FEDERATED_VALUES); values_string.append(FEDERATED_OPENPAREN); + tmp_length= values_string.length(); /* loop through the field pointer array, add any fields to both the values @@ -1599,14 +1729,12 @@ int ha_federated::write_row(byte *buf) else { (*field)->val_str(&insert_field_value_string); - values_string.append('\''); + values_string.append(value_quote_char); insert_field_value_string.print(&values_string); - values_string.append('\''); + values_string.append(value_quote_char); insert_field_value_string.length(0); } - /* append the field name */ - insert_string.append((*field)->field_name); /* append the value */ values_string.append(insert_field_value_string); @@ -1620,32 +1748,61 @@ int ha_federated::write_row(byte *buf) if the next field doesn't match the condition: (((*field)->query_id && (*field)->query_id == current_query_id) */ - insert_string.append(FEDERATED_COMMA); values_string.append(FEDERATED_COMMA); } /* - remove trailing comma - */ - insert_string.length(insert_string.length() - strlen(FEDERATED_COMMA)); - /* if there were no fields, we don't want to add a closing paren AND, we don't want to chop off the last char '(' insert will be "INSERT INTO t1 VALUES ();" */ - if (table->s->fields) + if (values_string.length() > tmp_length) { /* chops off leading commas */ values_string.length(values_string.length() - strlen(FEDERATED_COMMA)); - insert_string.append(FEDERATED_CLOSEPAREN); } /* we always want to append this, even if there aren't any fields */ values_string.append(FEDERATED_CLOSEPAREN); - /* add the values */ - insert_string.append(values_string); + if (use_bulk_insert) + { + /* + Send the current bulk insert out if appending the current row would + cause the statement to overflow the packet size, otherwise set + auto_increment_update_required to FALSE as no query was executed. + */ + if (bulk_insert.length + values_string.length() + bulk_padding > + mysql->net.max_packet_size && bulk_insert.length) + { + error= mysql_real_query(mysql, bulk_insert.str, bulk_insert.length); + bulk_insert.length= 0; + } + else + auto_increment_update_required= FALSE; + + if (bulk_insert.length == 0) + { + char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE]; + String insert_string(insert_buffer, sizeof(insert_buffer), + &my_charset_bin); + insert_string.length(0); + append_stmt_insert(&insert_string); + dynstr_append_mem(&bulk_insert, insert_string.ptr(), + insert_string.length()); + } + else + dynstr_append_mem(&bulk_insert, ",", 1); - if (mysql_real_query(mysql, insert_string.ptr(), insert_string.length())) + dynstr_append_mem(&bulk_insert, values_string.ptr(), + values_string.length()); + } + else + { + error= mysql_real_query(mysql, values_string.ptr(), + values_string.length()); + } + + if (error) { DBUG_RETURN(stash_remote_error()); } @@ -1653,12 +1810,79 @@ int ha_federated::write_row(byte *buf) If the table we've just written a record to contains an auto_increment field, then store the last_insert_id() value from the foreign server */ - if (table->next_number_field) + if (auto_increment_update_required) update_auto_increment(); DBUG_RETURN(0); } + +/** + @brief Prepares the storage engine for bulk inserts. + + @param[in] rows estimated number of rows in bulk insert + or 0 if unknown. + + @details Initializes memory structures required for bulk insert. +*/ + +void ha_federated::start_bulk_insert(ha_rows rows) +{ + uint page_size; + DBUG_ENTER("ha_federated::start_bulk_insert"); + + dynstr_free(&bulk_insert); + + /** + We don't bother with bulk-insert semantics when the estimated rows == 1 + The rows value will be 0 if the server does not know how many rows + would be inserted. This can occur when performing INSERT...SELECT + */ + + if (rows == 1) + DBUG_VOID_RETURN; + + page_size= (uint) my_getpagesize(); + + if (init_dynamic_string(&bulk_insert, NULL, page_size, page_size)) + DBUG_VOID_RETURN; + + bulk_insert.length= 0; + DBUG_VOID_RETURN; +} + + +/** + @brief End bulk insert. + + @details This method will send any remaining rows to the remote server. + Finally, it will deinitialize the bulk insert data structure. + + @return Operation status + @retval 0 No error + @retval != 0 Error occured at remote server. Also sets my_errno. +*/ + +int ha_federated::end_bulk_insert() +{ + int error= 0; + DBUG_ENTER("ha_federated::end_bulk_insert"); + + if (bulk_insert.str && bulk_insert.length) + { + if (mysql_real_query(mysql, bulk_insert.str, bulk_insert.length)) + error= stash_remote_error(); + else + if (table->next_number_field) + update_auto_increment(); + } + + dynstr_free(&bulk_insert); + + DBUG_RETURN(my_errno= error); +} + + /* ha_federated::update_auto_increment @@ -1688,9 +1912,8 @@ int ha_federated::optimize(THD* thd, HA_CHECK_OPT* check_opt) query.set_charset(system_charset_info); query.append(FEDERATED_OPTIMIZE); - query.append(FEDERATED_BTICK); - query.append(share->table_name, share->table_name_length); - query.append(FEDERATED_BTICK); + append_ident(&query, share->table_name, share->table_name_length, + ident_quote_char); if (mysql_real_query(mysql, query.ptr(), query.length())) { @@ -1711,9 +1934,8 @@ int ha_federated::repair(THD* thd, HA_CHECK_OPT* check_opt) query.set_charset(system_charset_info); query.append(FEDERATED_REPAIR); - query.append(FEDERATED_BTICK); - query.append(share->table_name, share->table_name_length); - query.append(FEDERATED_BTICK); + append_ident(&query, share->table_name, share->table_name_length, + ident_quote_char); if (check_opt->flags & T_QUICK) query.append(FEDERATED_QUICK); if (check_opt->flags & T_EXTEND) @@ -1788,10 +2010,12 @@ int ha_federated::update_row(const byte *old_data, byte *new_data) update_string.length(0); where_string.length(0); - update_string.append(FEDERATED_UPDATE); - update_string.append(FEDERATED_BTICK); - update_string.append(share->table_name); - update_string.append(FEDERATED_BTICK); + if (ignore_duplicates) + update_string.append(STRING_WITH_LEN("UPDATE IGNORE ")); + else + update_string.append(STRING_WITH_LEN("UPDATE ")); + append_ident(&update_string, share->table_name, + share->table_name_length, ident_quote_char); update_string.append(FEDERATED_SET); /* @@ -1806,8 +2030,11 @@ int ha_federated::update_row(const byte *old_data, byte *new_data) for (Field **field= table->field; *field; field++) { - where_string.append((*field)->field_name); - update_string.append((*field)->field_name); + uint field_name_length= strlen((*field)->field_name); + append_ident(&where_string, (*field)->field_name, field_name_length, + ident_quote_char); + append_ident(&update_string, (*field)->field_name, field_name_length, + ident_quote_char); update_string.append(FEDERATED_EQ); if ((*field)->is_null()) @@ -1816,9 +2043,9 @@ int ha_federated::update_row(const byte *old_data, byte *new_data) { /* otherwise = */ (*field)->val_str(&field_value); - update_string.append('\''); + update_string.append(value_quote_char); field_value.print(&update_string); - update_string.append('\''); + update_string.append(value_quote_char); field_value.length(0); } @@ -1829,9 +2056,9 @@ int ha_federated::update_row(const byte *old_data, byte *new_data) where_string.append(FEDERATED_EQ); (*field)->val_str(&field_value, (char*) (old_data + (*field)->offset())); - where_string.append('\''); + where_string.append(value_quote_char); field_value.print(&where_string); - where_string.append('\''); + where_string.append(value_quote_char); field_value.length(0); } @@ -1888,16 +2115,16 @@ int ha_federated::delete_row(const byte *buf) delete_string.length(0); delete_string.append(FEDERATED_DELETE); delete_string.append(FEDERATED_FROM); - delete_string.append(FEDERATED_BTICK); - delete_string.append(share->table_name); - delete_string.append(FEDERATED_BTICK); + append_ident(&delete_string, share->table_name, + share->table_name_length, ident_quote_char); delete_string.append(FEDERATED_WHERE); for (Field **field= table->field; *field; field++) { Field *cur_field= *field; data_string.length(0); - delete_string.append(cur_field->field_name); + append_ident(&delete_string, (*field)->field_name, + strlen((*field)->field_name), ident_quote_char); if (cur_field->is_null()) { @@ -1907,9 +2134,9 @@ int ha_federated::delete_row(const byte *buf) { delete_string.append(FEDERATED_EQ); cur_field->val_str(&data_string); - delete_string.append('\''); + delete_string.append(value_quote_char); data_string.print(&delete_string); - delete_string.append('\''); + delete_string.append(value_quote_char); } delete_string.append(FEDERATED_AND); @@ -2397,7 +2624,6 @@ int ha_federated::info(uint flag) { char error_buffer[FEDERATED_QUERY_BUFFER_SIZE]; char status_buf[FEDERATED_QUERY_BUFFER_SIZE]; - char escaped_table_name[FEDERATED_QUERY_BUFFER_SIZE]; int error; uint error_code; MYSQL_RES *result= 0; @@ -2411,14 +2637,8 @@ int ha_federated::info(uint flag) { status_query_string.length(0); status_query_string.append(FEDERATED_INFO); - status_query_string.append(FEDERATED_SQUOTE); - - escape_string_for_mysql(&my_charset_bin, (char *)escaped_table_name, - sizeof(escaped_table_name), - share->table_name, - share->table_name_length); - status_query_string.append(escaped_table_name); - status_query_string.append(FEDERATED_SQUOTE); + append_ident(&status_query_string, share->table_name, + share->table_name_length, value_quote_char); if (mysql_real_query(mysql, status_query_string.ptr(), status_query_string.length())) @@ -2484,6 +2704,51 @@ error: } +/** + @brief Handles extra signals from MySQL server + + @param[in] operation Hint for storage engine + + @return Operation Status + @retval 0 OK + */ +int ha_federated::extra(ha_extra_function operation) +{ + DBUG_ENTER("ha_federated::extra"); + switch (operation) { + case HA_EXTRA_IGNORE_DUP_KEY: + ignore_duplicates= TRUE; + break; + case HA_EXTRA_NO_IGNORE_DUP_KEY: + insert_dup_update= FALSE; + ignore_duplicates= FALSE; + break; + case HA_EXTRA_WRITE_CAN_REPLACE: + replace_duplicates= TRUE; + break; + case HA_EXTRA_WRITE_CANNOT_REPLACE: + /* + We use this flag to ensure that we do not create an "INSERT IGNORE" + statement when inserting new rows into the remote table. + */ + replace_duplicates= FALSE; + break; + case HA_EXTRA_INSERT_WITH_UPDATE: + insert_dup_update= TRUE; + break; + case HA_EXTRA_RESET: + insert_dup_update= FALSE; + ignore_duplicates= FALSE; + replace_duplicates= FALSE; + break; + default: + /* do nothing */ + DBUG_PRINT("info",("unhandled operation: %d", (uint) operation)); + } + DBUG_RETURN(0); +} + + /* Used to delete all rows in a table. Both for cases of truncate and for cases where the optimizer realizes that all rows will be @@ -2506,9 +2771,8 @@ int ha_federated::delete_all_rows() query.set_charset(system_charset_info); query.append(FEDERATED_TRUNCATE); - query.append(FEDERATED_BTICK); - query.append(share->table_name); - query.append(FEDERATED_BTICK); + append_ident(&query, share->table_name, share->table_name_length, + ident_quote_char); /* TRUNCATE won't return anything in mysql_affected_rows @@ -2616,6 +2880,9 @@ int ha_federated::stash_remote_error() DBUG_ENTER("ha_federated::stash_remote_error()"); remote_error_number= mysql_errno(mysql); strmake(remote_error_buf, mysql_error(mysql), sizeof(remote_error_buf)-1); + if (remote_error_number == ER_DUP_ENTRY || + remote_error_number == ER_DUP_KEY) + DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY); DBUG_RETURN(HA_FEDERATED_ERROR_WITH_REMOTE_SYSTEM); } diff --git a/sql/ha_federated.h b/sql/ha_federated.h index 09c934cb493..b5e1c217eb5 100644 --- a/sql/ha_federated.h +++ b/sql/ha_federated.h @@ -157,6 +157,9 @@ class ha_federated: public handler MYSQL_ROW_OFFSET current_position; // Current position used by ::position() int remote_error_number; char remote_error_buf[FEDERATED_QUERY_BUFFER_SIZE]; + bool ignore_duplicates, replace_duplicates; + bool insert_dup_update; + DYNAMIC_STRING bulk_insert; private: /* @@ -171,6 +174,14 @@ private: bool records_in_range); int stash_remote_error(); + bool append_stmt_insert(String *query); + + int read_next(byte *buf, MYSQL_RES *result); + int index_read_idx_with_result_set(byte *buf, uint index, + const byte *key, + uint key_len, + ha_rkey_function find_flag, + MYSQL_RES **result); public: ha_federated(TABLE *table_arg); ~ha_federated() @@ -256,6 +267,8 @@ public: int open(const char *name, int mode, uint test_if_locked); // required int close(void); // required + void start_bulk_insert(ha_rows rows); + int end_bulk_insert(); int write_row(byte *buf); int update_row(const byte *old_data, byte *new_data); int delete_row(const byte *buf); @@ -284,6 +297,7 @@ public: int rnd_pos(byte *buf, byte *pos); //required void position(const byte *record); //required int info(uint); //required + int extra(ha_extra_function operation); void update_auto_increment(void); int repair(THD* thd, HA_CHECK_OPT* check_opt); @@ -298,14 +312,7 @@ public: THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to, enum thr_lock_type lock_type); //required - virtual bool get_error_message(int error, String *buf); - - int read_next(byte *buf, MYSQL_RES *result); - int index_read_idx_with_result_set(byte *buf, uint index, - const byte *key, - uint key_len, - ha_rkey_function find_flag, - MYSQL_RES **result); + bool get_error_message(int error, String *buf); }; bool federated_db_init(void); diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index 9e094c31dce..73f8c5e4418 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -715,6 +715,8 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, */ table->file->extra(HA_EXTRA_RETRIEVE_ALL_COLS); } + if (duplic == DUP_UPDATE) + table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE); /* let's *try* to start bulk inserts. It won't necessary start them as values_list.elements should be greater than @@ -2434,6 +2436,8 @@ bool Delayed_insert::handle_inserts(void) table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); using_opt_replace= 1; } + if (info.handle_duplicates == DUP_UPDATE) + table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE); thd.clear_error(); // reset error for binlog if (write_record(&thd, table, &info)) { @@ -2761,6 +2765,8 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); table->file->extra(HA_EXTRA_RETRIEVE_ALL_COLS); } + if (info.handle_duplicates == DUP_UPDATE) + table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE); thd->no_trans_update.stmt= FALSE; thd->abort_on_warning= (!info.ignore && (thd->variables.sql_mode & @@ -3226,6 +3232,8 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); table->file->extra(HA_EXTRA_RETRIEVE_ALL_COLS); } + if (info.handle_duplicates == DUP_UPDATE) + table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE); if (!thd->prelocked_mode) table->file->start_bulk_insert((ha_rows) 0); thd->no_trans_update.stmt= FALSE; |