diff options
Diffstat (limited to 'sql/sql_load.cc')
-rw-r--r-- | sql/sql_load.cc | 156 |
1 files changed, 115 insertions, 41 deletions
diff --git a/sql/sql_load.cc b/sql/sql_load.cc index 1dcc8c2130e..e58c7493a0f 100644 --- a/sql/sql_load.cc +++ b/sql/sql_load.cc @@ -1,15 +1,15 @@ /* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB - + This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. - + This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. - + You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ @@ -20,6 +20,7 @@ #include "mysql_priv.h" #include <my_dir.h> #include <m_ctype.h> +#include "sql_repl.h" class READ_INFO { File file; @@ -32,6 +33,7 @@ class READ_INFO { int field_term_char,line_term_char,enclosed_char,escape_char; int *stack,*stack_pos; bool found_end_of_line,start_of_line,eof; + bool need_end_io_cache; IO_CACHE cache; NET *io_net; @@ -50,6 +52,22 @@ public: char unescape(char chr); int terminator(char *ptr,uint length); bool find_start_of_fields(); + /* + We need to force cache close before destructor is invoked to log + the last read block + */ + void end_io_cache() + { + ::end_io_cache(&cache); + need_end_io_cache = 0; + } + + /* + Either this method, or we need to make cache public + Arg must be set from mysql_load() since constructor does not see + either the table or THD value + */ + void set_io_cache_arg(void* arg) { cache.arg = arg; } }; static int read_fixed_length(THD *thd,COPY_INFO &info,TABLE *table, @@ -67,20 +85,24 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, File file; TABLE *table; int error; - uint save_skip_lines = ex->skip_lines; String *field_term=ex->field_term,*escaped=ex->escaped, *enclosed=ex->enclosed; bool is_fifo=0; + LOAD_FILE_INFO lf_info; + char * db = table_list->db ? table_list->db : thd->db; bool using_transactions; DBUG_ENTER("mysql_load"); +#ifdef EMBEDDED_LIBRARY + read_file_from_client = 0; //server is always in the same process +#endif + if (escaped->length() > 1 || enclosed->length() > 1) { my_message(ER_WRONG_FIELD_TERMINATORS,ER(ER_WRONG_FIELD_TERMINATORS), MYF(0)); DBUG_RETURN(-1); } - if (!(table = open_ltable(thd,table_list,lock_type))) DBUG_RETURN(-1); if (!fields.elements) @@ -92,7 +114,7 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, else { // Part field list thd->dupp_field=0; - if (setup_tables(table_list) || setup_fields(thd,table_list,fields,1,0)) + if (setup_tables(table_list) || setup_fields(thd,table_list,fields,1,0,0)) DBUG_RETURN(-1); if (thd->dupp_field) { @@ -103,7 +125,7 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, uint tot_length=0; bool use_blobs=0,use_timestamp=0; - List_iterator<Item> it(fields); + List_iterator_fast<Item> it(fields); Item_field *field; while ((field=(Item_field*) it++)) @@ -133,12 +155,7 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, if (read_file_from_client) { - char tmp [FN_REFLEN+1],*end; - DBUG_PRINT("info",("reading local file")); - tmp[0] = (char) 251; /* NULL_LENGTH */ - end=strnmov(tmp+1,ex->file_name,sizeof(tmp)-2); - (void) my_net_write(&thd->net,tmp,(uint) (end-tmp)); - (void) net_flush(&thd->net); + (void)net_request_file(&thd->net,ex->file_name); file = -1; } else @@ -161,9 +178,10 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, MY_STAT stat_info; if (!my_stat(name,&stat_info,MYF(MY_WME))) DBUG_RETURN(-1); - - // the file must be: - if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others + + // if we are not in slave thread, the file must be: + if (!thd->slave_thread && + !((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others #ifndef __EMX__ (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink #endif @@ -196,13 +214,27 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, DBUG_RETURN(-1); // Can't allocate buffers } + if (!opt_old_rpl_compat && mysql_bin_log.is_open()) + { + lf_info.thd = thd; + lf_info.ex = ex; + lf_info.db = db; + lf_info.table_name = table_list->real_name; + lf_info.fields = &fields; + lf_info.handle_dup = handle_duplicates; + lf_info.wrote_create_file = 0; + lf_info.last_pos_in_file = HA_POS_ERROR; + read_info.set_io_cache_arg((void*) &lf_info); + } restore_record(table,2); thd->count_cuted_fields=1; /* calc cuted fields */ thd->cuted_fields=0L; if (ex->line_term->length() && field_term->length()) { - while (ex->skip_lines--) + // ex->skip_lines needs to be preserved for logging + uint skip_lines = ex->skip_lines; + while (skip_lines--) { if (read_info.next_line()) break; @@ -214,7 +246,10 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, if (use_timestamp) table->time_stamp=0; table->next_number_field=table->found_next_number_field; - VOID(table->file->extra(HA_EXTRA_WRITE_CACHE)); + VOID(table->file->extra_opt(HA_EXTRA_WRITE_CACHE, + thd->variables.read_buff_size)); + VOID(table->file->extra_opt(HA_EXTRA_BULK_INSERT_BEGIN, + thd->variables.bulk_insert_buff_size)); if (handle_duplicates == DUP_IGNORE || handle_duplicates == DUP_REPLACE) table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); @@ -224,9 +259,10 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, error=read_fixed_length(thd,info,table,fields,read_info); else error=read_sep_field(thd,info,table,fields,read_info,*enclosed); - if (table->file->extra(HA_EXTRA_NO_CACHE) || - table->file->activate_all_index(thd)) - error=1; /* purecov: inspected */ + if (table->file->extra(HA_EXTRA_NO_CACHE)) + error=1; /* purecov: inspected */ + if (table->file->activate_all_index(thd)) + error=1; /* purecov: inspected */ table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); table->time_stamp=save_time_stamp; table->next_number_field=0; @@ -245,23 +281,42 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, { if (using_transactions) ha_autocommit_or_rollback(thd,error); - DBUG_RETURN(-1); // Error on read + if (!opt_old_rpl_compat && mysql_bin_log.is_open()) + { + if (lf_info.wrote_create_file) + { + Delete_file_log_event d(thd); + mysql_bin_log.write(&d); + } + } + DBUG_RETURN(-1); // Error on read } sprintf(name,ER(ER_LOAD_INFO),info.records,info.deleted, info.records-info.copied,thd->cuted_fields); send_ok(&thd->net,info.copied+info.deleted,0L,name); // on the slave thd->query is never initialized - if(!thd->slave_thread) + if (!thd->slave_thread) mysql_update_log.write(thd,thd->query,thd->query_length); - + if (!using_transactions) thd->options|=OPTION_STATUS_NO_TRANS_UPDATE; - if (!read_file_from_client && mysql_bin_log.is_open()) + if (mysql_bin_log.is_open()) { - ex->skip_lines = save_skip_lines; - Load_log_event qinfo(thd, ex, table->table_cache_key, table->table_name, - fields, handle_duplicates); - mysql_bin_log.write(&qinfo); + if (opt_old_rpl_compat && !read_file_from_client) + { + Load_log_event qinfo(thd, ex, db, table->table_name, fields, + handle_duplicates); + mysql_bin_log.write(&qinfo); + } + if (!opt_old_rpl_compat) + { + read_info.end_io_cache(); // make sure last block gets logged + if (lf_info.wrote_create_file) + { + Execute_load_log_event e(thd); + mysql_bin_log.write(&e); + } + } } if (using_transactions) error=ha_autocommit_or_rollback(thd,error); @@ -277,7 +332,7 @@ static int read_fixed_length(THD *thd,COPY_INFO &info,TABLE *table,List<Item> &fields, READ_INFO &read_info) { - List_iterator<Item> it(fields); + List_iterator_fast<Item> it(fields); Item_field *sql_field; DBUG_ENTER("read_fixed_length"); @@ -325,7 +380,7 @@ read_fixed_length(THD *thd,COPY_INFO &info,TABLE *table,List<Item> &fields, DBUG_RETURN(1); if (table->next_number_field) table->next_number_field->reset(); // Clear for next record - if (read_info.next_line()) // Skipp to next line + if (read_info.next_line()) // Skip to next line break; if (read_info.line_cuted) thd->cuted_fields++; /* To long row */ @@ -340,7 +395,7 @@ read_sep_field(THD *thd,COPY_INFO &info,TABLE *table, List<Item> &fields, READ_INFO &read_info, String &enclosed) { - List_iterator<Item> it(fields); + List_iterator_fast<Item> it(fields); Item_field *sql_field; uint enclosed_length; DBUG_ENTER("read_sep_field"); @@ -390,7 +445,7 @@ read_sep_field(THD *thd,COPY_INFO &info,TABLE *table, { // Last record if (sql_field == (Item_field*) fields.head()) break; - for ( ; sql_field ; sql_field=(Item_field*) it++) + for (; sql_field ; sql_field=(Item_field*) it++) { sql_field->field->set_null(); sql_field->field->reset(); @@ -401,7 +456,7 @@ read_sep_field(THD *thd,COPY_INFO &info,TABLE *table, DBUG_RETURN(1); if (table->next_number_field) table->next_number_field->reset(); // Clear for next record - if (read_info.next_line()) // Skipp to next line + if (read_info.next_line()) // Skip to next line break; if (read_info.line_cuted) thd->cuted_fields++; /* To long row */ @@ -430,8 +485,10 @@ READ_INFO::unescape(char chr) } - /* Read a line using buffering */ - /* If last line is empty (in line mode) then it isn't outputed */ +/* + Read a line using buffering + If last line is empty (in line mode) then it isn't outputed +*/ READ_INFO::READ_INFO(File file_par, uint tot_length, String &field_term, @@ -488,6 +545,22 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, String &field_term, my_free((gptr) buffer,MYF(0)); /* purecov: inspected */ error=1; } + else + { + /* + init_io_cache() will not initialize read_function member + if the cache is READ_NET. The reason is explained in + mysys/mf_iocache.c. So we work around the problem with a + manual assignment + */ + if (get_it_from_net) + cache.read_function = _my_b_net_read; + + need_end_io_cache = 1; + if (!opt_old_rpl_compat && mysql_bin_log.is_open()) + cache.pre_read = cache.pre_close = + (IO_CACHE_CALLBACK) log_loaded_block; + } } } @@ -496,7 +569,8 @@ READ_INFO::~READ_INFO() { if (!error) { - end_io_cache(&cache); + if (need_end_io_cache) + ::end_io_cache(&cache); my_free((gptr) buffer,MYF(0)); error=1; } @@ -536,10 +610,10 @@ int READ_INFO::read_field() if (found_end_of_line) return 1; // One have to call next_line - /* Skipp until we find 'line_start' */ + /* Skip until we find 'line_start' */ if (start_of_line) - { // Skipp until line_start + { // Skip until line_start start_of_line=0; if (find_start_of_fields()) return 1; @@ -682,7 +756,7 @@ found_eof: /* ** One can't use fixed length with multi-byte charset ** */ - + int READ_INFO::read_fixed_length() { int chr; @@ -691,7 +765,7 @@ int READ_INFO::read_fixed_length() return 1; // One have to call next_line if (start_of_line) - { // Skipp until line_start + { // Skip until line_start start_of_line=0; if (find_start_of_fields()) return 1; |