summaryrefslogtreecommitdiff
path: root/sql/sql_load.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_load.cc')
-rw-r--r--sql/sql_load.cc162
1 files changed, 76 insertions, 86 deletions
diff --git a/sql/sql_load.cc b/sql/sql_load.cc
index d972947c718..9d367149eaa 100644
--- a/sql/sql_load.cc
+++ b/sql/sql_load.cc
@@ -1,6 +1,6 @@
/*
Copyright (c) 2000, 2016, Oracle and/or its affiliates.
- Copyright (c) 2010, 2017, MariaDB Corporation
+ Copyright (c) 2010, 2018, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -97,18 +97,57 @@ public:
#define GET (stack_pos != stack ? *--stack_pos : my_b_get(&cache))
#define PUSH(A) *(stack_pos++)=(A)
+#ifdef WITH_WSREP
+/** If requested by wsrep_load_data_splitting, commit and restart
+the transaction after every 10,000 inserted rows. */
+
+static bool wsrep_load_data_split(THD *thd, const TABLE *table,
+ const COPY_INFO &info)
+{
+ DBUG_ENTER("wsrep_load_data_split");
+
+ if (!wsrep_load_data_splitting || !wsrep_on(thd)
+ || !info.records || (info.records % 10000)
+ || !thd->transaction.stmt.ha_list
+ || thd->transaction.stmt.ha_list->ht() != binlog_hton
+ || !thd->transaction.stmt.ha_list->next()
+ || thd->transaction.stmt.ha_list->next()->next())
+ DBUG_RETURN(false);
+
+ if (handlerton* hton= thd->transaction.stmt.ha_list->next()->ht())
+ {
+ if (hton->db_type != DB_TYPE_INNODB)
+ DBUG_RETURN(false);
+ WSREP_DEBUG("intermediate transaction commit in LOAD DATA");
+ if (wsrep_run_wsrep_commit(thd, true) != WSREP_TRX_OK) DBUG_RETURN(true);
+ if (binlog_hton->commit(binlog_hton, thd, true)) DBUG_RETURN(true);
+ wsrep_post_commit(thd, true);
+ hton->commit(hton, thd, true);
+ table->file->extra(HA_EXTRA_FAKE_START_STMT);
+ }
+
+ DBUG_RETURN(false);
+}
+# define WSREP_LOAD_DATA_SPLIT(thd,table,info) \
+ if (wsrep_load_data_split(thd,table,info)) \
+ { \
+ table->auto_increment_field_not_null= FALSE; \
+ DBUG_RETURN(1); \
+ }
+#else /* WITH_WSREP */
+#define WSREP_LOAD_DATA_SPLIT(thd,table,info) /* empty */
+#endif /* WITH_WSREP */
+
class READ_INFO {
File file;
String data; /* Read buffer */
uint fixed_length; /* Length of the fixed length record */
- uint max_length; /* Max length of row */
Term_string m_field_term; /* FIELDS TERMINATED BY 'string' */
Term_string m_line_term; /* LINES TERMINATED BY 'string' */
Term_string m_line_start; /* LINES STARTING BY 'string' */
int enclosed_char,escape_char;
int *stack,*stack_pos;
bool found_end_of_line,start_of_line,eof;
- NET *io_net;
int level; /* for load xml */
bool getbyte(char *to)
@@ -283,13 +322,13 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
killed_state killed_status;
bool is_concurrent;
#endif
- const char *db = table_list->db; // This is never null
+ const char *db= table_list->db.str; // This is never null
/*
If path for file is not defined, we will use the current database.
If this is not set, we will use the directory where the table to be
loaded is located
*/
- const char *tdb= thd->db ? thd->db : db; // Result is never null
+ const char *tdb= thd->db.str ? thd->db.str : db; // Result is never null
ulong skip_lines= ex->skip_lines;
bool transactional_table __attribute__((unused));
DBUG_ENTER("mysql_load");
@@ -340,7 +379,7 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
!table_list->single_table_updatable() || // and derived tables
check_key_in_view(thd, table_list))
{
- my_error(ER_NON_UPDATABLE_TABLE, MYF(0), table_list->alias, "LOAD");
+ my_error(ER_NON_UPDATABLE_TABLE, MYF(0), table_list->alias.str, "LOAD");
DBUG_RETURN(TRUE);
}
if (table_list->prepare_where(thd, 0, TRUE) ||
@@ -359,7 +398,7 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
*/
if (unique_table(thd, table_list, table_list->next_global, 0))
{
- my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name,
+ my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name.str,
"LOAD DATA");
DBUG_RETURN(TRUE);
}
@@ -370,6 +409,13 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
is_concurrent= (table_list->lock_type == TL_WRITE_CONCURRENT_INSERT);
#endif
+ if (table->versioned(VERS_TIMESTAMP) && handle_duplicates == DUP_REPLACE)
+ {
+ // Additional memory may be required to create historical items.
+ if (table_list->set_insert_values(thd->mem_root))
+ DBUG_RETURN(TRUE);
+ }
+
if (!fields_vars.elements)
{
Field_iterator_table_ref field_iterator;
@@ -671,8 +717,8 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
writing binary log will be ignored */
if (thd->transaction.stmt.modified_non_trans_table)
(void) write_execute_load_query_log_event(thd, ex,
- table_list->db,
- table_list->table_name,
+ table_list->db.str,
+ table_list->table_name.str,
is_concurrent,
handle_duplicates, ignore,
transactional_table,
@@ -722,7 +768,8 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
{
int errcode= query_error_code(thd, killed_status == NOT_KILLED);
error= write_execute_load_query_log_event(thd, ex,
- table_list->db, table_list->table_name,
+ table_list->db.str,
+ table_list->table_name.str,
is_concurrent,
handle_duplicates, ignore,
transactional_table,
@@ -771,7 +818,7 @@ static bool write_execute_load_query_log_event(THD *thd, sql_exchange* ex,
List<Item> fv;
Item *item, *val;
int n;
- const char *tdb= (thd->db != NULL ? thd->db : db_arg);
+ const char *tdb= (thd->db.str != NULL ? thd->db.str : db_arg);
const char *qualify_db= NULL;
char command_buffer[1024];
String query_str(command_buffer, sizeof(command_buffer),
@@ -787,7 +834,7 @@ static bool write_execute_load_query_log_event(THD *thd, sql_exchange* ex,
lle.set_fname_outside_temp_buf(ex->file_name, strlen(ex->file_name));
query_str.length(0);
- if (!thd->db || strcmp(db_arg, thd->db))
+ if (!thd->db.str || strcmp(db_arg, thd->db.str))
{
/*
If used database differs from table's database,
@@ -814,7 +861,7 @@ static bool write_execute_load_query_log_event(THD *thd, sql_exchange* ex,
if (n++)
query_str.append(", ");
if (item->real_type() == Item::FIELD_ITEM)
- append_identifier(thd, &query_str, item->name.str, item->name.length);
+ append_identifier(thd, &query_str, &item->name);
else
{
/* Actually Item_user_var_as_out_param despite claiming STRING_ITEM. */
@@ -838,7 +885,7 @@ static bool write_execute_load_query_log_event(THD *thd, sql_exchange* ex,
val= lv++;
if (n++)
query_str.append(STRING_WITH_LEN(", "));
- append_identifier(thd, &query_str, item->name.str, item->name.length);
+ append_identifier(thd, &query_str, &item->name);
query_str.append(&val->name);
}
}
@@ -927,7 +974,7 @@ read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
*/
while ((sql_field= (Item_field*) it++))
{
- Field *field= sql_field->field;
+ Field *field= sql_field->field;
table->auto_increment_field_not_null= auto_increment_field_not_null;
/*
No fields specified in fields_vars list can be null in this format.
@@ -989,6 +1036,7 @@ read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
DBUG_RETURN(-1);
}
+ WSREP_LOAD_DATA_SPLIT(thd, table, info);
err= write_record(thd, table, &info);
table->auto_increment_field_not_null= FALSE;
if (err)
@@ -1015,7 +1063,6 @@ continue_loop:;
}
-
static int
read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
List<Item> &fields_vars, List<Item> &set_fields,
@@ -1094,28 +1141,9 @@ read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
}
else
{
- Field *field= real_item->field;
- if (field->reset())
- {
- my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0), field->field_name.str,
- thd->get_stmt_da()->current_row_for_warning());
+ DBUG_ASSERT(real_item->field->table == table);
+ if (real_item->field->load_data_set_null(thd))
DBUG_RETURN(1);
- }
- field->set_null();
- if (!field->maybe_null())
- {
- /*
- Timestamp fields that are NOT NULL are autoupdated if there is no
- corresponding value in the data file.
- */
- if (field->type() == MYSQL_TYPE_TIMESTAMP)
- field->set_time();
- else if (field != table->next_number_field)
- field->set_warning(Sql_condition::WARN_LEVEL_WARN,
- ER_WARN_NULL_TO_NOTNULL, 1);
- }
- /* Do not auto-update this field. */
- field->set_has_explicit_value();
}
continue;
@@ -1214,6 +1242,7 @@ read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
DBUG_RETURN(-1);
}
+ WSREP_LOAD_DATA_SPLIT(thd, table, info);
err= write_record(thd, table, &info);
table->auto_increment_field_not_null= FALSE;
if (err)
@@ -1262,6 +1291,7 @@ read_xml_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
for ( ; ; it.rewind())
{
+ bool err;
if (thd->killed)
{
thd->send_kill_message();
@@ -1313,21 +1343,9 @@ read_xml_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
}
else
{
- Field *field= real_item->field;
- field->reset();
- field->set_null();
- if (field == table->next_number_field)
- table->auto_increment_field_not_null= TRUE;
- if (!field->maybe_null())
- {
- if (field->type() == FIELD_TYPE_TIMESTAMP)
- field->set_time();
- else if (field != table->next_number_field)
- field->set_warning(Sql_condition::WARN_LEVEL_WARN,
- ER_WARN_NULL_TO_NOTNULL, 1);
- }
- /* Do not auto-update this field. */
- field->set_has_explicit_value();
+ DBUG_ASSERT(real_item->field->table == table);
+ if (real_item->field->load_data_set_null(thd))
+ DBUG_RETURN(1);
}
continue;
}
@@ -1361,39 +1379,8 @@ read_xml_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
skip_lines--;
continue;
}
-
- if (item)
- {
- /* Have not read any field, thus input file is simply ended */
- if (item == fields_vars.head())
- break;
-
- for ( ; item; item= it++)
- {
- Item_field *real_item= item->field_for_view_update();
- if (item->type() == Item::STRING_ITEM)
- ((Item_user_var_as_out_param *)item)->set_null_value(cs);
- else if (!real_item)
- {
- my_error(ER_NONUPDATEABLE_COLUMN, MYF(0), item->name.str);
- DBUG_RETURN(1);
- }
- else
- {
- /*
- QQ: We probably should not throw warning for each field.
- But how about intention to always have the same number
- of warnings in THD::cuted_fields (and get rid of cuted_fields
- in the end ?)
- */
- thd->cuted_fields++;
- push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
- ER_WARN_TOO_FEW_RECORDS,
- ER_THD(thd, ER_WARN_TOO_FEW_RECORDS),
- thd->get_stmt_da()->current_row_for_warning());
- }
- }
- }
+
+ DBUG_ASSERT(!item);
if (thd->killed ||
fill_record_n_invoke_before_triggers(thd, table, set_fields, set_values,
@@ -1410,7 +1397,10 @@ read_xml_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
DBUG_RETURN(-1);
}
- if (write_record(thd, table, &info))
+ WSREP_LOAD_DATA_SPLIT(thd, table, info);
+ err= write_record(thd, table, &info);
+ table->auto_increment_field_not_null= false;
+ if (err)
DBUG_RETURN(1);
/*