diff options
Diffstat (limited to 'sql/sql_load.cc')
-rw-r--r-- | sql/sql_load.cc | 162 |
1 files changed, 76 insertions, 86 deletions
diff --git a/sql/sql_load.cc b/sql/sql_load.cc index d972947c718..9d367149eaa 100644 --- a/sql/sql_load.cc +++ b/sql/sql_load.cc @@ -1,6 +1,6 @@ /* Copyright (c) 2000, 2016, Oracle and/or its affiliates. - Copyright (c) 2010, 2017, MariaDB Corporation + Copyright (c) 2010, 2018, MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -97,18 +97,57 @@ public: #define GET (stack_pos != stack ? *--stack_pos : my_b_get(&cache)) #define PUSH(A) *(stack_pos++)=(A) +#ifdef WITH_WSREP +/** If requested by wsrep_load_data_splitting, commit and restart +the transaction after every 10,000 inserted rows. */ + +static bool wsrep_load_data_split(THD *thd, const TABLE *table, + const COPY_INFO &info) +{ + DBUG_ENTER("wsrep_load_data_split"); + + if (!wsrep_load_data_splitting || !wsrep_on(thd) + || !info.records || (info.records % 10000) + || !thd->transaction.stmt.ha_list + || thd->transaction.stmt.ha_list->ht() != binlog_hton + || !thd->transaction.stmt.ha_list->next() + || thd->transaction.stmt.ha_list->next()->next()) + DBUG_RETURN(false); + + if (handlerton* hton= thd->transaction.stmt.ha_list->next()->ht()) + { + if (hton->db_type != DB_TYPE_INNODB) + DBUG_RETURN(false); + WSREP_DEBUG("intermediate transaction commit in LOAD DATA"); + if (wsrep_run_wsrep_commit(thd, true) != WSREP_TRX_OK) DBUG_RETURN(true); + if (binlog_hton->commit(binlog_hton, thd, true)) DBUG_RETURN(true); + wsrep_post_commit(thd, true); + hton->commit(hton, thd, true); + table->file->extra(HA_EXTRA_FAKE_START_STMT); + } + + DBUG_RETURN(false); +} +# define WSREP_LOAD_DATA_SPLIT(thd,table,info) \ + if (wsrep_load_data_split(thd,table,info)) \ + { \ + table->auto_increment_field_not_null= FALSE; \ + DBUG_RETURN(1); \ + } +#else /* WITH_WSREP */ +#define WSREP_LOAD_DATA_SPLIT(thd,table,info) /* empty */ +#endif /* WITH_WSREP */ + class READ_INFO { File file; String data; /* Read buffer */ uint fixed_length; /* Length of the fixed length record */ - uint max_length; /* Max length of row */ Term_string m_field_term; /* FIELDS TERMINATED BY 'string' */ Term_string m_line_term; /* LINES TERMINATED BY 'string' */ Term_string m_line_start; /* LINES STARTING BY 'string' */ int enclosed_char,escape_char; int *stack,*stack_pos; bool found_end_of_line,start_of_line,eof; - NET *io_net; int level; /* for load xml */ bool getbyte(char *to) @@ -283,13 +322,13 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, killed_state killed_status; bool is_concurrent; #endif - const char *db = table_list->db; // This is never null + const char *db= table_list->db.str; // This is never null /* If path for file is not defined, we will use the current database. If this is not set, we will use the directory where the table to be loaded is located */ - const char *tdb= thd->db ? thd->db : db; // Result is never null + const char *tdb= thd->db.str ? thd->db.str : db; // Result is never null ulong skip_lines= ex->skip_lines; bool transactional_table __attribute__((unused)); DBUG_ENTER("mysql_load"); @@ -340,7 +379,7 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, !table_list->single_table_updatable() || // and derived tables check_key_in_view(thd, table_list)) { - my_error(ER_NON_UPDATABLE_TABLE, MYF(0), table_list->alias, "LOAD"); + my_error(ER_NON_UPDATABLE_TABLE, MYF(0), table_list->alias.str, "LOAD"); DBUG_RETURN(TRUE); } if (table_list->prepare_where(thd, 0, TRUE) || @@ -359,7 +398,7 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, */ if (unique_table(thd, table_list, table_list->next_global, 0)) { - my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name, + my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name.str, "LOAD DATA"); DBUG_RETURN(TRUE); } @@ -370,6 +409,13 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, is_concurrent= (table_list->lock_type == TL_WRITE_CONCURRENT_INSERT); #endif + if (table->versioned(VERS_TIMESTAMP) && handle_duplicates == DUP_REPLACE) + { + // Additional memory may be required to create historical items. + if (table_list->set_insert_values(thd->mem_root)) + DBUG_RETURN(TRUE); + } + if (!fields_vars.elements) { Field_iterator_table_ref field_iterator; @@ -671,8 +717,8 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, writing binary log will be ignored */ if (thd->transaction.stmt.modified_non_trans_table) (void) write_execute_load_query_log_event(thd, ex, - table_list->db, - table_list->table_name, + table_list->db.str, + table_list->table_name.str, is_concurrent, handle_duplicates, ignore, transactional_table, @@ -722,7 +768,8 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, { int errcode= query_error_code(thd, killed_status == NOT_KILLED); error= write_execute_load_query_log_event(thd, ex, - table_list->db, table_list->table_name, + table_list->db.str, + table_list->table_name.str, is_concurrent, handle_duplicates, ignore, transactional_table, @@ -771,7 +818,7 @@ static bool write_execute_load_query_log_event(THD *thd, sql_exchange* ex, List<Item> fv; Item *item, *val; int n; - const char *tdb= (thd->db != NULL ? thd->db : db_arg); + const char *tdb= (thd->db.str != NULL ? thd->db.str : db_arg); const char *qualify_db= NULL; char command_buffer[1024]; String query_str(command_buffer, sizeof(command_buffer), @@ -787,7 +834,7 @@ static bool write_execute_load_query_log_event(THD *thd, sql_exchange* ex, lle.set_fname_outside_temp_buf(ex->file_name, strlen(ex->file_name)); query_str.length(0); - if (!thd->db || strcmp(db_arg, thd->db)) + if (!thd->db.str || strcmp(db_arg, thd->db.str)) { /* If used database differs from table's database, @@ -814,7 +861,7 @@ static bool write_execute_load_query_log_event(THD *thd, sql_exchange* ex, if (n++) query_str.append(", "); if (item->real_type() == Item::FIELD_ITEM) - append_identifier(thd, &query_str, item->name.str, item->name.length); + append_identifier(thd, &query_str, &item->name); else { /* Actually Item_user_var_as_out_param despite claiming STRING_ITEM. */ @@ -838,7 +885,7 @@ static bool write_execute_load_query_log_event(THD *thd, sql_exchange* ex, val= lv++; if (n++) query_str.append(STRING_WITH_LEN(", ")); - append_identifier(thd, &query_str, item->name.str, item->name.length); + append_identifier(thd, &query_str, &item->name); query_str.append(&val->name); } } @@ -927,7 +974,7 @@ read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, */ while ((sql_field= (Item_field*) it++)) { - Field *field= sql_field->field; + Field *field= sql_field->field; table->auto_increment_field_not_null= auto_increment_field_not_null; /* No fields specified in fields_vars list can be null in this format. @@ -989,6 +1036,7 @@ read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, DBUG_RETURN(-1); } + WSREP_LOAD_DATA_SPLIT(thd, table, info); err= write_record(thd, table, &info); table->auto_increment_field_not_null= FALSE; if (err) @@ -1015,7 +1063,6 @@ continue_loop:; } - static int read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, List<Item> &fields_vars, List<Item> &set_fields, @@ -1094,28 +1141,9 @@ read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, } else { - Field *field= real_item->field; - if (field->reset()) - { - my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0), field->field_name.str, - thd->get_stmt_da()->current_row_for_warning()); + DBUG_ASSERT(real_item->field->table == table); + if (real_item->field->load_data_set_null(thd)) DBUG_RETURN(1); - } - field->set_null(); - if (!field->maybe_null()) - { - /* - Timestamp fields that are NOT NULL are autoupdated if there is no - corresponding value in the data file. - */ - if (field->type() == MYSQL_TYPE_TIMESTAMP) - field->set_time(); - else if (field != table->next_number_field) - field->set_warning(Sql_condition::WARN_LEVEL_WARN, - ER_WARN_NULL_TO_NOTNULL, 1); - } - /* Do not auto-update this field. */ - field->set_has_explicit_value(); } continue; @@ -1214,6 +1242,7 @@ read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, DBUG_RETURN(-1); } + WSREP_LOAD_DATA_SPLIT(thd, table, info); err= write_record(thd, table, &info); table->auto_increment_field_not_null= FALSE; if (err) @@ -1262,6 +1291,7 @@ read_xml_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, for ( ; ; it.rewind()) { + bool err; if (thd->killed) { thd->send_kill_message(); @@ -1313,21 +1343,9 @@ read_xml_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, } else { - Field *field= real_item->field; - field->reset(); - field->set_null(); - if (field == table->next_number_field) - table->auto_increment_field_not_null= TRUE; - if (!field->maybe_null()) - { - if (field->type() == FIELD_TYPE_TIMESTAMP) - field->set_time(); - else if (field != table->next_number_field) - field->set_warning(Sql_condition::WARN_LEVEL_WARN, - ER_WARN_NULL_TO_NOTNULL, 1); - } - /* Do not auto-update this field. */ - field->set_has_explicit_value(); + DBUG_ASSERT(real_item->field->table == table); + if (real_item->field->load_data_set_null(thd)) + DBUG_RETURN(1); } continue; } @@ -1361,39 +1379,8 @@ read_xml_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, skip_lines--; continue; } - - if (item) - { - /* Have not read any field, thus input file is simply ended */ - if (item == fields_vars.head()) - break; - - for ( ; item; item= it++) - { - Item_field *real_item= item->field_for_view_update(); - if (item->type() == Item::STRING_ITEM) - ((Item_user_var_as_out_param *)item)->set_null_value(cs); - else if (!real_item) - { - my_error(ER_NONUPDATEABLE_COLUMN, MYF(0), item->name.str); - DBUG_RETURN(1); - } - else - { - /* - QQ: We probably should not throw warning for each field. - But how about intention to always have the same number - of warnings in THD::cuted_fields (and get rid of cuted_fields - in the end ?) - */ - thd->cuted_fields++; - push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, - ER_WARN_TOO_FEW_RECORDS, - ER_THD(thd, ER_WARN_TOO_FEW_RECORDS), - thd->get_stmt_da()->current_row_for_warning()); - } - } - } + + DBUG_ASSERT(!item); if (thd->killed || fill_record_n_invoke_before_triggers(thd, table, set_fields, set_values, @@ -1410,7 +1397,10 @@ read_xml_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list, DBUG_RETURN(-1); } - if (write_record(thd, table, &info)) + WSREP_LOAD_DATA_SPLIT(thd, table, info); + err= write_record(thd, table, &info); + table->auto_increment_field_not_null= false; + if (err) DBUG_RETURN(1); /* |