summaryrefslogtreecommitdiff
path: root/sql/sql_load.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_load.cc')
-rw-r--r--sql/sql_load.cc156
1 files changed, 115 insertions, 41 deletions
diff --git a/sql/sql_load.cc b/sql/sql_load.cc
index 1dcc8c2130e..e58c7493a0f 100644
--- a/sql/sql_load.cc
+++ b/sql/sql_load.cc
@@ -1,15 +1,15 @@
/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
-
+
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
-
+
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
-
+
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
@@ -20,6 +20,7 @@
#include "mysql_priv.h"
#include <my_dir.h>
#include <m_ctype.h>
+#include "sql_repl.h"
class READ_INFO {
File file;
@@ -32,6 +33,7 @@ class READ_INFO {
int field_term_char,line_term_char,enclosed_char,escape_char;
int *stack,*stack_pos;
bool found_end_of_line,start_of_line,eof;
+ bool need_end_io_cache;
IO_CACHE cache;
NET *io_net;
@@ -50,6 +52,22 @@ public:
char unescape(char chr);
int terminator(char *ptr,uint length);
bool find_start_of_fields();
+ /*
+ We need to force cache close before destructor is invoked to log
+ the last read block
+ */
+ void end_io_cache()
+ {
+ ::end_io_cache(&cache);
+ need_end_io_cache = 0;
+ }
+
+ /*
+ Either this method, or we need to make cache public
+ Arg must be set from mysql_load() since constructor does not see
+ either the table or THD value
+ */
+ void set_io_cache_arg(void* arg) { cache.arg = arg; }
};
static int read_fixed_length(THD *thd,COPY_INFO &info,TABLE *table,
@@ -67,20 +85,24 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
File file;
TABLE *table;
int error;
- uint save_skip_lines = ex->skip_lines;
String *field_term=ex->field_term,*escaped=ex->escaped,
*enclosed=ex->enclosed;
bool is_fifo=0;
+ LOAD_FILE_INFO lf_info;
+ char * db = table_list->db ? table_list->db : thd->db;
bool using_transactions;
DBUG_ENTER("mysql_load");
+#ifdef EMBEDDED_LIBRARY
+ read_file_from_client = 0; //server is always in the same process
+#endif
+
if (escaped->length() > 1 || enclosed->length() > 1)
{
my_message(ER_WRONG_FIELD_TERMINATORS,ER(ER_WRONG_FIELD_TERMINATORS),
MYF(0));
DBUG_RETURN(-1);
}
-
if (!(table = open_ltable(thd,table_list,lock_type)))
DBUG_RETURN(-1);
if (!fields.elements)
@@ -92,7 +114,7 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
else
{ // Part field list
thd->dupp_field=0;
- if (setup_tables(table_list) || setup_fields(thd,table_list,fields,1,0))
+ if (setup_tables(table_list) || setup_fields(thd,table_list,fields,1,0,0))
DBUG_RETURN(-1);
if (thd->dupp_field)
{
@@ -103,7 +125,7 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
uint tot_length=0;
bool use_blobs=0,use_timestamp=0;
- List_iterator<Item> it(fields);
+ List_iterator_fast<Item> it(fields);
Item_field *field;
while ((field=(Item_field*) it++))
@@ -133,12 +155,7 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
if (read_file_from_client)
{
- char tmp [FN_REFLEN+1],*end;
- DBUG_PRINT("info",("reading local file"));
- tmp[0] = (char) 251; /* NULL_LENGTH */
- end=strnmov(tmp+1,ex->file_name,sizeof(tmp)-2);
- (void) my_net_write(&thd->net,tmp,(uint) (end-tmp));
- (void) net_flush(&thd->net);
+ (void)net_request_file(&thd->net,ex->file_name);
file = -1;
}
else
@@ -161,9 +178,10 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
MY_STAT stat_info;
if (!my_stat(name,&stat_info,MYF(MY_WME)))
DBUG_RETURN(-1);
-
- // the file must be:
- if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
+
+ // if we are not in slave thread, the file must be:
+ if (!thd->slave_thread &&
+ !((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
#ifndef __EMX__
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
#endif
@@ -196,13 +214,27 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
DBUG_RETURN(-1); // Can't allocate buffers
}
+ if (!opt_old_rpl_compat && mysql_bin_log.is_open())
+ {
+ lf_info.thd = thd;
+ lf_info.ex = ex;
+ lf_info.db = db;
+ lf_info.table_name = table_list->real_name;
+ lf_info.fields = &fields;
+ lf_info.handle_dup = handle_duplicates;
+ lf_info.wrote_create_file = 0;
+ lf_info.last_pos_in_file = HA_POS_ERROR;
+ read_info.set_io_cache_arg((void*) &lf_info);
+ }
restore_record(table,2);
thd->count_cuted_fields=1; /* calc cuted fields */
thd->cuted_fields=0L;
if (ex->line_term->length() && field_term->length())
{
- while (ex->skip_lines--)
+ // ex->skip_lines needs to be preserved for logging
+ uint skip_lines = ex->skip_lines;
+ while (skip_lines--)
{
if (read_info.next_line())
break;
@@ -214,7 +246,10 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
if (use_timestamp)
table->time_stamp=0;
table->next_number_field=table->found_next_number_field;
- VOID(table->file->extra(HA_EXTRA_WRITE_CACHE));
+ VOID(table->file->extra_opt(HA_EXTRA_WRITE_CACHE,
+ thd->variables.read_buff_size));
+ VOID(table->file->extra_opt(HA_EXTRA_BULK_INSERT_BEGIN,
+ thd->variables.bulk_insert_buff_size));
if (handle_duplicates == DUP_IGNORE ||
handle_duplicates == DUP_REPLACE)
table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
@@ -224,9 +259,10 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
error=read_fixed_length(thd,info,table,fields,read_info);
else
error=read_sep_field(thd,info,table,fields,read_info,*enclosed);
- if (table->file->extra(HA_EXTRA_NO_CACHE) ||
- table->file->activate_all_index(thd))
- error=1; /* purecov: inspected */
+ if (table->file->extra(HA_EXTRA_NO_CACHE))
+ error=1; /* purecov: inspected */
+ if (table->file->activate_all_index(thd))
+ error=1; /* purecov: inspected */
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
table->time_stamp=save_time_stamp;
table->next_number_field=0;
@@ -245,23 +281,42 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
{
if (using_transactions)
ha_autocommit_or_rollback(thd,error);
- DBUG_RETURN(-1); // Error on read
+ if (!opt_old_rpl_compat && mysql_bin_log.is_open())
+ {
+ if (lf_info.wrote_create_file)
+ {
+ Delete_file_log_event d(thd);
+ mysql_bin_log.write(&d);
+ }
+ }
+ DBUG_RETURN(-1); // Error on read
}
sprintf(name,ER(ER_LOAD_INFO),info.records,info.deleted,
info.records-info.copied,thd->cuted_fields);
send_ok(&thd->net,info.copied+info.deleted,0L,name);
// on the slave thd->query is never initialized
- if(!thd->slave_thread)
+ if (!thd->slave_thread)
mysql_update_log.write(thd,thd->query,thd->query_length);
-
+
if (!using_transactions)
thd->options|=OPTION_STATUS_NO_TRANS_UPDATE;
- if (!read_file_from_client && mysql_bin_log.is_open())
+ if (mysql_bin_log.is_open())
{
- ex->skip_lines = save_skip_lines;
- Load_log_event qinfo(thd, ex, table->table_cache_key, table->table_name,
- fields, handle_duplicates);
- mysql_bin_log.write(&qinfo);
+ if (opt_old_rpl_compat && !read_file_from_client)
+ {
+ Load_log_event qinfo(thd, ex, db, table->table_name, fields,
+ handle_duplicates);
+ mysql_bin_log.write(&qinfo);
+ }
+ if (!opt_old_rpl_compat)
+ {
+ read_info.end_io_cache(); // make sure last block gets logged
+ if (lf_info.wrote_create_file)
+ {
+ Execute_load_log_event e(thd);
+ mysql_bin_log.write(&e);
+ }
+ }
}
if (using_transactions)
error=ha_autocommit_or_rollback(thd,error);
@@ -277,7 +332,7 @@ static int
read_fixed_length(THD *thd,COPY_INFO &info,TABLE *table,List<Item> &fields,
READ_INFO &read_info)
{
- List_iterator<Item> it(fields);
+ List_iterator_fast<Item> it(fields);
Item_field *sql_field;
DBUG_ENTER("read_fixed_length");
@@ -325,7 +380,7 @@ read_fixed_length(THD *thd,COPY_INFO &info,TABLE *table,List<Item> &fields,
DBUG_RETURN(1);
if (table->next_number_field)
table->next_number_field->reset(); // Clear for next record
- if (read_info.next_line()) // Skipp to next line
+ if (read_info.next_line()) // Skip to next line
break;
if (read_info.line_cuted)
thd->cuted_fields++; /* To long row */
@@ -340,7 +395,7 @@ read_sep_field(THD *thd,COPY_INFO &info,TABLE *table,
List<Item> &fields, READ_INFO &read_info,
String &enclosed)
{
- List_iterator<Item> it(fields);
+ List_iterator_fast<Item> it(fields);
Item_field *sql_field;
uint enclosed_length;
DBUG_ENTER("read_sep_field");
@@ -390,7 +445,7 @@ read_sep_field(THD *thd,COPY_INFO &info,TABLE *table,
{ // Last record
if (sql_field == (Item_field*) fields.head())
break;
- for ( ; sql_field ; sql_field=(Item_field*) it++)
+ for (; sql_field ; sql_field=(Item_field*) it++)
{
sql_field->field->set_null();
sql_field->field->reset();
@@ -401,7 +456,7 @@ read_sep_field(THD *thd,COPY_INFO &info,TABLE *table,
DBUG_RETURN(1);
if (table->next_number_field)
table->next_number_field->reset(); // Clear for next record
- if (read_info.next_line()) // Skipp to next line
+ if (read_info.next_line()) // Skip to next line
break;
if (read_info.line_cuted)
thd->cuted_fields++; /* To long row */
@@ -430,8 +485,10 @@ READ_INFO::unescape(char chr)
}
- /* Read a line using buffering */
- /* If last line is empty (in line mode) then it isn't outputed */
+/*
+ Read a line using buffering
+ If last line is empty (in line mode) then it isn't outputed
+*/
READ_INFO::READ_INFO(File file_par, uint tot_length, String &field_term,
@@ -488,6 +545,22 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, String &field_term,
my_free((gptr) buffer,MYF(0)); /* purecov: inspected */
error=1;
}
+ else
+ {
+ /*
+ init_io_cache() will not initialize read_function member
+ if the cache is READ_NET. The reason is explained in
+ mysys/mf_iocache.c. So we work around the problem with a
+ manual assignment
+ */
+ if (get_it_from_net)
+ cache.read_function = _my_b_net_read;
+
+ need_end_io_cache = 1;
+ if (!opt_old_rpl_compat && mysql_bin_log.is_open())
+ cache.pre_read = cache.pre_close =
+ (IO_CACHE_CALLBACK) log_loaded_block;
+ }
}
}
@@ -496,7 +569,8 @@ READ_INFO::~READ_INFO()
{
if (!error)
{
- end_io_cache(&cache);
+ if (need_end_io_cache)
+ ::end_io_cache(&cache);
my_free((gptr) buffer,MYF(0));
error=1;
}
@@ -536,10 +610,10 @@ int READ_INFO::read_field()
if (found_end_of_line)
return 1; // One have to call next_line
- /* Skipp until we find 'line_start' */
+ /* Skip until we find 'line_start' */
if (start_of_line)
- { // Skipp until line_start
+ { // Skip until line_start
start_of_line=0;
if (find_start_of_fields())
return 1;
@@ -682,7 +756,7 @@ found_eof:
/*
** One can't use fixed length with multi-byte charset **
*/
-
+
int READ_INFO::read_fixed_length()
{
int chr;
@@ -691,7 +765,7 @@ int READ_INFO::read_fixed_length()
return 1; // One have to call next_line
if (start_of_line)
- { // Skipp until line_start
+ { // Skip until line_start
start_of_line=0;
if (find_start_of_fields())
return 1;