diff options
Diffstat (limited to 'sql/sql_insert.cc')
-rw-r--r-- | sql/sql_insert.cc | 1613 |
1 files changed, 1262 insertions, 351 deletions
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index 283fe571d53..7274d38a7cc 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -17,17 +17,59 @@ /* Insert of records */ +/* + INSERT DELAYED + + Insert delayed is distinguished from a normal insert by lock_type == + TL_WRITE_DELAYED instead of TL_WRITE. It first tries to open a + "delayed" table (delayed_get_table()), but falls back to + open_and_lock_tables() on error and proceeds as normal insert then. + + Opening a "delayed" table means to find a delayed insert thread that + has the table open already. If this fails, a new thread is created and + waited for to open and lock the table. + + If accessing the thread succeeded, in + delayed_insert::get_local_table() the table of the thread is copied + for local use. A copy is required because the normal insert logic + works on a target table, but the other threads table object must not + be used. The insert logic uses the record buffer to create a record. + And the delayed insert thread uses the record buffer to pass the + record to the table handler. So there must be different objects. Also + the copied table is not included in the lock, so that the statement + can proceed even if the real table cannot be accessed at this moment. + + Copying a table object is not a trivial operation. Besides the TABLE + object there are the field pointer array, the field objects and the + record buffer. After copying the field objects, their pointers into + the record must be "moved" to point to the new record buffer. + + After this setup the normal insert logic is used. Only that for + delayed inserts write_delayed() is called instead of write_record(). + It inserts the rows into a queue and signals the delayed insert thread + instead of writing directly to the table. + + The delayed insert thread awakes from the signal. It locks the table, + inserts the rows from the queue, unlocks the table, and waits for the + next signal. It does normally live until a FLUSH TABLES or SHUTDOWN. + +*/ + #include "mysql_priv.h" +#include "sp_head.h" +#include "sql_trigger.h" +#include "sql_select.h" static int check_null_fields(THD *thd,TABLE *entry); #ifndef EMBEDDED_LIBRARY static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list); static int write_delayed(THD *thd,TABLE *table, enum_duplicates dup, bool ignore, - char *query, uint query_length, int log_on); + char *query, uint query_length, bool log_on); static void end_delayed_insert(THD *thd); -extern "C" pthread_handler_decl(handle_delayed_insert,arg); +pthread_handler_t handle_delayed_insert(void *arg); static void unlink_blobs(register TABLE *table); #endif +static bool check_view_insertability(THD *thd, TABLE_LIST *view); /* Define to force use of my_malloc() if the allocated memory block is big */ @@ -39,9 +81,6 @@ static void unlink_blobs(register TABLE *table); #define my_safe_afree(ptr, size, min_length) if (size > min_length) my_free(ptr,MYF(0)) #endif -#define DELAYED_LOG_UPDATE 1 -#define DELAYED_LOG_BIN 2 - /* Check if insert fields are correct. @@ -52,6 +91,7 @@ static void unlink_blobs(register TABLE *table); table The table for insert. fields The insert fields. values The insert values. + check_unique If duplicate values should be rejected. NOTE Clears TIMESTAMP_AUTO_SET_ON_INSERT from table->timestamp_field_type @@ -63,50 +103,101 @@ static void unlink_blobs(register TABLE *table); -1 Error */ -static int check_insert_fields(THD *thd, TABLE *table, List<Item> &fields, - List<Item> &values) +static int check_insert_fields(THD *thd, TABLE_LIST *table_list, + List<Item> &fields, List<Item> &values, + bool check_unique) { + TABLE *table= table_list->table; + + if (!table_list->updatable) + { + my_error(ER_NON_UPDATABLE_TABLE, MYF(0), table_list->alias, "INSERT"); + return -1; + } + if (fields.elements == 0 && values.elements != 0) { - if (values.elements != table->fields) + if (!table) { - my_printf_error(ER_WRONG_VALUE_COUNT_ON_ROW, - ER(ER_WRONG_VALUE_COUNT_ON_ROW), - MYF(0), 1L); + my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0), + table_list->view_db.str, table_list->view_name.str); return -1; } -#ifndef NO_EMBEDDED_ACCESS_CHECKS - if (grant_option && - check_grant_all_columns(thd,INSERT_ACL,table)) + if (values.elements != table->s->fields) + { + my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L); return -1; + } +#ifndef NO_EMBEDDED_ACCESS_CHECKS + if (grant_option) + { + Field_iterator_table fields; + fields.set_table(table); + if (check_grant_all_columns(thd, INSERT_ACL, &table->grant, + table->s->db, table->s->table_name, + &fields)) + return -1; + } #endif clear_timestamp_auto_bits(table->timestamp_field_type, TIMESTAMP_AUTO_SET_ON_INSERT); } else { // Part field list + SELECT_LEX *select_lex= &thd->lex->select_lex; + Name_resolution_context *context= &select_lex->context; + Name_resolution_context_state ctx_state; + int res; + if (fields.elements != values.elements) { - my_printf_error(ER_WRONG_VALUE_COUNT_ON_ROW, - ER(ER_WRONG_VALUE_COUNT_ON_ROW), - MYF(0), 1L); + my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L); return -1; } - TABLE_LIST table_list; - bzero((char*) &table_list,sizeof(table_list)); - table_list.db= table->table_cache_key; - table_list.real_name= table_list.alias= table->table_name; - table_list.table=table; - table_list.grant=table->grant; thd->dupp_field=0; - if (setup_tables(&table_list) || - setup_fields(thd, 0, &table_list,fields,1,0,0)) + select_lex->no_wrap_view_item= TRUE; + + /* Save the state of the current name resolution context. */ + ctx_state.save_state(context, table_list); + + /* + Perform name resolution only in the first table - 'table_list', + which is the table that is inserted into. + */ + table_list->next_local= 0; + context->resolve_in_table_list_only(table_list); + res= setup_fields(thd, 0, fields, 1, 0, 0); + + /* Restore the current context. */ + ctx_state.restore_state(context, table_list); + thd->lex->select_lex.no_wrap_view_item= FALSE; + + if (res) return -1; - if (thd->dupp_field) + if (table_list->effective_algorithm == VIEW_ALGORITHM_MERGE) { - my_error(ER_FIELD_SPECIFIED_TWICE,MYF(0), thd->dupp_field->field_name); + /* it is join view => we need to find table for update */ + List_iterator_fast<Item> it(fields); + Item *item; + TABLE_LIST *tbl= 0; // reset for call to check_single_table() + table_map map= 0; + + while ((item= it++)) + map|= item->used_tables(); + if (table_list->check_single_table(&tbl, map, table_list) || tbl == 0) + { + my_error(ER_VIEW_MULTIUPDATE, MYF(0), + table_list->view_db.str, table_list->view_name.str); + return -1; + } + table_list->table= table= tbl->table; + } + + if (check_unique && thd->dupp_field) + { + my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0), thd->dupp_field->field_name); return -1; } if (table->timestamp_field && // Don't set timestamp if used @@ -116,8 +207,17 @@ static int check_insert_fields(THD *thd, TABLE *table, List<Item> &fields, } // For the values we need select_priv #ifndef NO_EMBEDDED_ACCESS_CHECKS - table->grant.want_privilege=(SELECT_ACL & ~table->grant.privilege); + table->grant.want_privilege= (SELECT_ACL & ~table->grant.privilege); #endif + + if (check_key_in_view(thd, table_list) || + (table_list->view && + check_view_insertability(thd, table_list))) + { + my_error(ER_NON_UPDATABLE_TABLE, MYF(0), table_list->alias, "INSERT"); + return -1; + } + return 0; } @@ -141,11 +241,11 @@ static int check_insert_fields(THD *thd, TABLE *table, List<Item> &fields, -1 Error */ -static int check_update_fields(THD *thd, TABLE *table, - TABLE_LIST *insert_table_list, +static int check_update_fields(THD *thd, TABLE_LIST *insert_table_list, List<Item> &update_fields) { - ulong timestamp_query_id; + TABLE *table= insert_table_list->table; + query_id_t timestamp_query_id; LINT_INIT(timestamp_query_id); /* @@ -155,14 +255,14 @@ static int check_update_fields(THD *thd, TABLE *table, if (table->timestamp_field) { timestamp_query_id= table->timestamp_field->query_id; - table->timestamp_field->query_id= thd->query_id-1; + table->timestamp_field->query_id= thd->query_id - 1; } /* Check the fields we are going to modify. This will set the query_id of all used fields to the threads query_id. */ - if (setup_fields(thd, 0, insert_table_list, update_fields, 1, 0, 0)) + if (setup_fields(thd, 0, update_fields, 1, 0, 0)) return -1; if (table->timestamp_field) @@ -179,13 +279,40 @@ static int check_update_fields(THD *thd, TABLE *table, } -int mysql_insert(THD *thd,TABLE_LIST *table_list, - List<Item> &fields, - List<List_item> &values_list, - List<Item> &update_fields, - List<Item> &update_values, - enum_duplicates duplic, - bool ignore) +/* + Mark fields used by triggers for INSERT-like statement. + + SYNOPSIS + mark_fields_used_by_triggers_for_insert_stmt() + thd The current thread + table Table to which insert will happen + duplic Type of duplicate handling for insert which will happen + + NOTE + For REPLACE there is no sense in marking particular fields + used by ON DELETE trigger as to execute it properly we have + to retrieve and store values for all table columns anyway. +*/ + +void mark_fields_used_by_triggers_for_insert_stmt(THD *thd, TABLE *table, + enum_duplicates duplic) +{ + if (table->triggers) + { + table->triggers->mark_fields_used(thd, TRG_EVENT_INSERT); + if (duplic == DUP_UPDATE) + table->triggers->mark_fields_used(thd, TRG_EVENT_UPDATE); + } +} + + +bool mysql_insert(THD *thd,TABLE_LIST *table_list, + List<Item> &fields, + List<List_item> &values_list, + List<Item> &update_fields, + List<Item> &update_values, + enum_duplicates duplic, + bool ignore) { int error, res; /* @@ -193,27 +320,26 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, By default, both logs are enabled (this won't cause problems if the server runs without --log-update or --log-bin). */ - int log_on= DELAYED_LOG_UPDATE | DELAYED_LOG_BIN ; - bool transactional_table, log_delayed, joins_freed= FALSE; + bool log_on= (thd->options & OPTION_BIN_LOG) || + (!(thd->security_ctx->master_access & SUPER_ACL)); + bool transactional_table, joins_freed= FALSE; + bool changed; uint value_count; ulong counter = 1; ulonglong id; COPY_INFO info; - TABLE *table; + TABLE *table= 0; List_iterator_fast<List_item> its(values_list); List_item *values; + Name_resolution_context *context; + Name_resolution_context_state ctx_state; #ifndef EMBEDDED_LIBRARY char *query= thd->query; #endif thr_lock_type lock_type = table_list->lock_type; - TABLE_LIST *insert_table_list= (TABLE_LIST*) - thd->lex->select_lex.table_list.first; + Item *unused_conds= 0; DBUG_ENTER("mysql_insert"); - if (!(thd->options & OPTION_UPDATE_LOG)) - log_on&= ~(int) DELAYED_LOG_UPDATE; - if (!(thd->options & OPTION_BIN_LOG)) - log_on&= ~(int) DELAYED_LOG_BIN; /* in safe mode or with skip-new change delayed insert to be regular if we are told to replace duplicates, the insert cannot be concurrent @@ -237,23 +363,29 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, { if (thd->locked_tables) { - if (find_locked_table(thd, - table_list->db ? table_list->db : thd->db, - table_list->real_name)) + DBUG_ASSERT(table_list->db); /* Must be set in the parser */ + if (find_locked_table(thd, table_list->db, table_list->table_name)) { - my_printf_error(ER_DELAYED_INSERT_TABLE_LOCKED, - ER(ER_DELAYED_INSERT_TABLE_LOCKED), - MYF(0), table_list->real_name); - DBUG_RETURN(-1); + my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0), + table_list->table_name); + DBUG_RETURN(TRUE); } } if ((table= delayed_get_table(thd,table_list)) && !thd->is_fatal_error) { - res= 0; - if (table_list->next) /* if sub select */ - res= open_and_lock_tables(thd, table_list->next); + /* + Open tables used for sub-selects or in stored functions, will also + cache these functions. + */ + res= open_and_lock_tables(thd, table_list->next_global); + /* + First is not processed by open_and_lock_tables() => we need set + updateability flags "by hands". + */ + if (!table_list->derived && !table_list->view) + table_list->updatable= 1; // usual table } - else + else if (thd->net.last_errno != ER_WRONG_OBJECT) { /* Too many delayed insert threads; Use a normal insert */ table_list->lock_type= lock_type= TL_WRITE; @@ -264,49 +396,65 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, #endif /* EMBEDDED_LIBRARY */ res= open_and_lock_tables(thd, table_list); if (res || thd->is_fatal_error) - DBUG_RETURN(-1); + DBUG_RETURN(TRUE); - table= table_list->table; thd->proc_info="init"; thd->used_tables=0; values= its++; - if (mysql_prepare_insert(thd, table_list, insert_table_list, - insert_table_list, table, - fields, values, update_fields, - update_values, duplic)) + if (mysql_prepare_insert(thd, table_list, table, fields, values, + update_fields, update_values, duplic, &unused_conds, + FALSE)) goto abort; + /* mysql_prepare_insert set table_list->table if it was not set */ + table= table_list->table; + + context= &thd->lex->select_lex.context; + /* Save the state of the current name resolution context. */ + ctx_state.save_state(context, table_list); + + /* + Perform name resolution only in the first table - 'table_list', + which is the table that is inserted into. + */ + table_list->next_local= 0; + context->resolve_in_table_list_only(table_list); + value_count= values->elements; while ((values= its++)) { counter++; if (values->elements != value_count) { - my_printf_error(ER_WRONG_VALUE_COUNT_ON_ROW, - ER(ER_WRONG_VALUE_COUNT_ON_ROW), - MYF(0),counter); + my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter); goto abort; } - if (setup_fields(thd, 0, insert_table_list, *values, 0, 0, 0)) + if (setup_fields(thd, 0, *values, 0, 0, 0)) goto abort; } its.rewind (); + + /* Restore the current context. */ + ctx_state.restore_state(context, table_list); + /* Fill in the given fields and dump it to the table file */ - info.records= info.deleted= info.copied= info.updated= 0; info.ignore= ignore; info.handle_duplicates=duplic; info.update_fields= &update_fields; info.update_values= &update_values; + info.view= (table_list->view ? table_list : 0); + /* Count warnings for all inserts. For single line insert, generate an error if try to set a NOT NULL field - to NULL + to NULL. */ - thd->count_cuted_fields= ((values_list.elements == 1) ? + thd->count_cuted_fields= ((values_list.elements == 1 && + !ignore) ? CHECK_FIELD_ERROR_FOR_NULL : CHECK_FIELD_WARN); thd->cuted_fields = 0L; @@ -317,30 +465,70 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, thd->proc_info="update"; if (duplic != DUP_ERROR || ignore) table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); + if (duplic == DUP_REPLACE) + { + if (!table->triggers || !table->triggers->has_delete_triggers()) + table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); + /* + REPLACE should change values of all columns so we should mark + all columns as columns to be set. As nice side effect we will + retrieve columns which values are needed for ON DELETE triggers. + */ + table->file->extra(HA_EXTRA_RETRIEVE_ALL_COLS); + } /* let's *try* to start bulk inserts. It won't necessary start them as values_list.elements should be greater than some - handler dependent - threshold. + We should not start bulk inserts if this statement uses + functions or invokes triggers since they may access + to the same table and therefore should not see its + inconsistent state created by this optimization. So we call start_bulk_insert to perform nesessary checks on values_list.elements, and - if nothing else - to initialize the code to make the call of end_bulk_insert() below safe. */ - if (lock_type != TL_WRITE_DELAYED) + if (lock_type != TL_WRITE_DELAYED && !thd->prelocked_mode) table->file->start_bulk_insert(values_list.elements); + thd->no_trans_update= 0; + thd->abort_on_warning= (!ignore && + (thd->variables.sql_mode & + (MODE_STRICT_TRANS_TABLES | + MODE_STRICT_ALL_TABLES))); + + if ((fields.elements || !value_count) && + check_that_all_fields_are_given_values(thd, table, table_list)) + { + /* thd->net.report_error is now set, which will abort the next loop */ + error= 1; + } + + mark_fields_used_by_triggers_for_insert_stmt(thd, table, duplic); + + if (table_list->prepare_where(thd, 0, TRUE) || + table_list->prepare_check_option(thd)) + error= 1; + while ((values= its++)) { if (fields.elements || !value_count) { - restore_record(table,default_values); // Get empty record - if (fill_record(fields, *values, 0)|| thd->net.report_error || - check_null_fields(thd,table)) + restore_record(table,s->default_values); // Get empty record + if (fill_record_n_invoke_before_triggers(thd, fields, *values, 0, + table->triggers, + TRG_EVENT_INSERT)) { if (values_list.elements != 1 && !thd->net.report_error) { info.records++; continue; } + /* + TODO: set thd->abort_on_warning if values_list.elements == 1 + and check that all items return warning in case of problem with + storing field. + */ error=1; break; } @@ -348,10 +536,19 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, else { if (thd->used_tables) // Column used in values() - restore_record(table,default_values); // Get empty record + restore_record(table,s->default_values); // Get empty record else - table->record[0][0]=table->default_values[0]; // Fix delete marker - if (fill_record(table->field,*values, 0) || thd->net.report_error) + { + /* + Fix delete marker. No need to restore rest of record since it will + be overwritten by fill_record() anyway (and fill_record() does not + use default values in this case). + */ + table->record[0][0]= table->s->default_values[0]; + } + if (fill_record_n_invoke_before_triggers(thd, table->field, *values, 0, + table->triggers, + TRG_EVENT_INSERT)) { if (values_list.elements != 1 && ! thd->net.report_error) { @@ -362,6 +559,18 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, break; } } + + if ((res= table_list->view_check_option(thd, + (values_list.elements == 1 ? + 0 : + ignore))) == + VIEW_CHECK_SKIP) + continue; + else if (res == VIEW_CHECK_ERROR) + { + error= 1; + break; + } #ifndef EMBEDDED_LIBRARY if (lock_type == TL_WRITE_DELAYED) { @@ -370,9 +579,7 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, } else #endif - error=write_record(table,&info); - if (error) - break; + error=write_record(thd, table ,&info); /* If auto_increment values are used, save the first one for LAST_INSERT_ID() and for the update log. @@ -383,6 +590,8 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, { // Get auto increment value id= thd->last_insert_id; } + if (error) + break; thd->row_count++; } @@ -407,7 +616,7 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, else #endif { - if (table->file->end_bulk_insert() && !error) + if (!thd->prelocked_mode && table->file->end_bulk_insert() && !error) { table->file->print_error(my_errno,MYF(0)); error=1; @@ -417,32 +626,30 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, else if (table->next_number_field && info.copied) id=table->next_number_field->val_int(); // Return auto_increment value - /* - Invalidate the table in the query cache if something changed. - For the transactional algorithm to work the invalidation must be - before binlog writing and ha_autocommit_... - */ - if (info.copied || info.deleted || info.updated) - query_cache_invalidate3(thd, table_list, 1); - transactional_table= table->file->has_transactions(); - log_delayed= (transactional_table || table->tmp_table); - if ((info.copied || info.deleted || info.updated) && - (error <= 0 || !transactional_table)) + if ((changed= (info.copied || info.deleted || info.updated))) { - mysql_update_log.write(thd, thd->query, thd->query_length); - if (mysql_bin_log.is_open()) + /* + Invalidate the table in the query cache if something changed. + For the transactional algorithm to work the invalidation must be + before binlog writing and ha_autocommit_or_rollback + */ + query_cache_invalidate3(thd, table_list, 1); + if (error <= 0 || !transactional_table) { - if (error <= 0) - thd->clear_error(); - Query_log_event qinfo(thd, thd->query, thd->query_length, - log_delayed, FALSE); - if (mysql_bin_log.write(&qinfo) && transactional_table) - error=1; + if (mysql_bin_log.is_open()) + { + if (error <= 0) + thd->clear_error(); + Query_log_event qinfo(thd, thd->query, thd->query_length, + transactional_table, FALSE); + if (mysql_bin_log.write(&qinfo) && transactional_table) + error=1; + } + if (!transactional_table) + thd->options|=OPTION_STATUS_NO_TRANS_UPDATE; } - if (!log_delayed) - thd->options|=OPTION_STATUS_NO_TRANS_UPDATE; } if (transactional_table) error=ha_autocommit_or_rollback(thd,error); @@ -450,6 +657,16 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, if (thd->lock) { mysql_unlock_tables(thd, thd->lock); + /* + Invalidate the table in the query cache if something changed + after unlocking when changes become fisible. + TODO: this is workaround. right way will be move invalidating in + the unlock procedure. + */ + if (lock_type == TL_WRITE_CONCURRENT_INSERT && changed) + { + query_cache_invalidate3(thd, table_list, 1); + } thd->lock=0; } } @@ -459,6 +676,9 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, thd->next_insert_id=0; // Reset this if wrongly used if (duplic != DUP_ERROR || ignore) table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); + if (duplic == DUP_REPLACE && + (!table->triggers || !table->triggers->has_delete_triggers())) + table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE); /* Reset value of LAST_INSERT_ID if no rows where inserted */ if (!info.copied && thd->insert_id_used) @@ -470,7 +690,10 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, goto abort; if (values_list.elements == 1 && (!(thd->options & OPTION_WARNINGS) || !thd->cuted_fields)) - send_ok(thd,info.copied+info.deleted+info.updated,id); + { + thd->row_count_func= info.copied+info.deleted+info.updated; + send_ok(thd, (ulong) thd->row_count_func, id); + } else { char buff[160]; @@ -481,10 +704,11 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, else sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records, (ulong) (info.deleted+info.updated), (ulong) thd->cuted_fields); - ::send_ok(thd,info.copied+info.deleted+info.updated,(ulonglong)id,buff); + thd->row_count_func= info.copied+info.deleted+info.updated; + ::send_ok(thd, (ulong) thd->row_count_func, id, buff); } - table->insert_values=0; - DBUG_RETURN(0); + thd->abort_on_warning= 0; + DBUG_RETURN(FALSE); abort: #ifndef EMBEDDED_LIBRARY @@ -493,8 +717,145 @@ abort: #endif if (!joins_freed) free_underlaid_joins(thd, &thd->lex->select_lex); - table->insert_values=0; - DBUG_RETURN(-1); + thd->abort_on_warning= 0; + DBUG_RETURN(TRUE); +} + + +/* + Additional check for insertability for VIEW + + SYNOPSIS + check_view_insertability() + thd - thread handler + view - reference on VIEW + + IMPLEMENTATION + A view is insertable if the folloings are true: + - All columns in the view are columns from a table + - All not used columns in table have a default values + - All field in view are unique (not referring to the same column) + + RETURN + FALSE - OK + view->contain_auto_increment is 1 if and only if the view contains an + auto_increment field + + TRUE - can't be used for insert +*/ + +static bool check_view_insertability(THD * thd, TABLE_LIST *view) +{ + uint num= view->view->select_lex.item_list.elements; + TABLE *table= view->table; + Field_translator *trans_start= view->field_translation, + *trans_end= trans_start + num; + Field_translator *trans; + Field **field_ptr= table->field; + uint used_fields_buff_size= (table->s->fields + 7) / 8; + uchar *used_fields_buff= (uchar*)thd->alloc(used_fields_buff_size); + MY_BITMAP used_fields; + bool save_set_query_id= thd->set_query_id; + DBUG_ENTER("check_key_in_view"); + + if (!used_fields_buff) + DBUG_RETURN(TRUE); // EOM + + DBUG_ASSERT(view->table != 0 && view->field_translation != 0); + + VOID(bitmap_init(&used_fields, used_fields_buff, used_fields_buff_size * 8, + 0)); + bitmap_clear_all(&used_fields); + + view->contain_auto_increment= 0; + /* + we must not set query_id for fields as they're not + really used in this context + */ + thd->set_query_id= 0; + /* check simplicity and prepare unique test of view */ + for (trans= trans_start; trans != trans_end; trans++) + { + if (!trans->item->fixed && trans->item->fix_fields(thd, &trans->item)) + { + thd->set_query_id= save_set_query_id; + DBUG_RETURN(TRUE); + } + Item_field *field; + /* simple SELECT list entry (field without expression) */ + if (!(field= trans->item->filed_for_view_update())) + { + thd->set_query_id= save_set_query_id; + DBUG_RETURN(TRUE); + } + if (field->field->unireg_check == Field::NEXT_NUMBER) + view->contain_auto_increment= 1; + /* prepare unique test */ + /* + remove collation (or other transparent for update function) if we have + it + */ + trans->item= field; + } + thd->set_query_id= save_set_query_id; + /* unique test */ + for (trans= trans_start; trans != trans_end; trans++) + { + /* Thanks to test above, we know that all columns are of type Item_field */ + Item_field *field= (Item_field *)trans->item; + /* check fields belong to table in which we are inserting */ + if (field->field->table == table && + bitmap_fast_test_and_set(&used_fields, field->field->field_index)) + DBUG_RETURN(TRUE); + } + + DBUG_RETURN(FALSE); +} + + +/* + Check if table can be updated + + SYNOPSIS + mysql_prepare_insert_check_table() + thd Thread handle + table_list Table list + fields List of fields to be updated + where Pointer to where clause + select_insert Check is making for SELECT ... INSERT + + RETURN + FALSE ok + TRUE ERROR +*/ + +static bool mysql_prepare_insert_check_table(THD *thd, TABLE_LIST *table_list, + List<Item> &fields, COND **where, + bool select_insert) +{ + bool insert_into_view= (table_list->view != 0); + DBUG_ENTER("mysql_prepare_insert_check_table"); + + if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context, + &thd->lex->select_lex.top_join_list, + table_list, where, + &thd->lex->select_lex.leaf_tables, + select_insert, INSERT_ACL)) + DBUG_RETURN(TRUE); + + if (insert_into_view && !fields.elements) + { + thd->lex->empty_field_list_on_rset= 1; + if (!table_list->table) + { + my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0), + table_list->view_db.str, table_list->view_name.str); + DBUG_RETURN(TRUE); + } + DBUG_RETURN(insert_view_fields(thd, &fields, table_list)); + } + + DBUG_RETURN(FALSE); } @@ -503,15 +864,12 @@ abort: SYNOPSIS mysql_prepare_insert() - thd thread handler - table_list global table list (not including first table for - INSERT ... SELECT) - insert_table_list Table we are inserting into (for INSERT ... SELECT) - dup_table_list Tables to be used in ON DUPLICATE KEY - It's either all global tables or only the table we - insert into, depending on if we are using GROUP BY - in the SELECT clause). - values Values to insert. NULL for INSERT ... SELECT + thd Thread handler + table_list Global/local table list + table Table to insert into (can be NULL if table should + be taken from table_list->table) + where Where clause (for insert ... select) + select_insert TRUE if INSERT ... SELECT statement TODO (in far future) In cases of: @@ -522,51 +880,118 @@ abort: WARNING You MUST set table->insert_values to 0 after calling this function before releasing the table object. - + RETURN VALUE - 0 OK - -1 error (message is not sent to user) + FALSE OK + TRUE error */ -int mysql_prepare_insert(THD *thd, TABLE_LIST *table_list, - TABLE_LIST *insert_table_list, - TABLE_LIST *dup_table_list, - TABLE *table, - List<Item> &fields, List_item *values, - List<Item> &update_fields, List<Item> &update_values, - enum_duplicates duplic) +bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list, + TABLE *table, List<Item> &fields, List_item *values, + List<Item> &update_fields, List<Item> &update_values, + enum_duplicates duplic, + COND **where, bool select_insert) { + SELECT_LEX *select_lex= &thd->lex->select_lex; + Name_resolution_context *context= &select_lex->context; + Name_resolution_context_state ctx_state; + bool insert_into_view= (table_list->view != 0); + bool res= 0; DBUG_ENTER("mysql_prepare_insert"); + DBUG_PRINT("enter", ("table_list 0x%lx, table 0x%lx, view %d", + (ulong)table_list, (ulong)table, + (int)insert_into_view)); - if (duplic == DUP_UPDATE && !table->insert_values) + /* + For subqueries in VALUES() we should not see the table in which we are + inserting (for INSERT ... SELECT this is done by changing table_list, + because INSERT ... SELECT share SELECT_LEX it with SELECT. + */ + if (!select_insert) + { + for (SELECT_LEX_UNIT *un= select_lex->first_inner_unit(); + un; + un= un->next_unit()) + { + for (SELECT_LEX *sl= un->first_select(); + sl; + sl= sl->next_select()) + { + sl->context.outer_context= 0; + } + } + } + + if (duplic == DUP_UPDATE) { /* it should be allocated before Item::fix_fields() */ - table->insert_values= - (byte *)alloc_root(thd->mem_root, table->rec_buff_length); - if (!table->insert_values) - DBUG_RETURN(-1); + if (table_list->set_insert_values(thd->mem_root)) + DBUG_RETURN(TRUE); } - if (setup_tables(insert_table_list)) - DBUG_RETURN(-1); - if (values) - { - if (check_insert_fields(thd, table, fields, *values) || - setup_fields(thd, 0, insert_table_list, *values, 0, 0, 0) || - (duplic == DUP_UPDATE && - (check_update_fields(thd, table, insert_table_list, update_fields) || - setup_fields(thd, 0, dup_table_list, update_values, 1, 0, 0)))) - DBUG_RETURN(-1); - if (find_real_table_in_list(table_list->next, table_list->db, - table_list->real_name)) + + if (mysql_prepare_insert_check_table(thd, table_list, fields, where, + select_insert)) + DBUG_RETURN(TRUE); + + /* Save the state of the current name resolution context. */ + ctx_state.save_state(context, table_list); + + /* + Perform name resolution only in the first table - 'table_list', + which is the table that is inserted into. + */ + table_list->next_local= 0; + context->resolve_in_table_list_only(table_list); + + /* Prepare the fields in the statement. */ + if (values && + !(res= check_insert_fields(thd, context->table_list, fields, *values, + !insert_into_view) || + setup_fields(thd, 0, *values, 0, 0, 0)) && + duplic == DUP_UPDATE) + { + select_lex->no_wrap_view_item= TRUE; + res= check_update_fields(thd, context->table_list, update_fields); + select_lex->no_wrap_view_item= FALSE; + /* + When we are not using GROUP BY we can refer to other tables in the + ON DUPLICATE KEY part. + */ + if (select_lex->group_list.elements == 0) + { + context->table_list->next_local= ctx_state.save_next_local; + /* first_name_resolution_table was set by resolve_in_table_list_only() */ + context->first_name_resolution_table-> + next_name_resolution_table= ctx_state.save_next_local; + } + if (!res) + res= setup_fields(thd, 0, update_values, 1, 0, 0); + } + + /* Restore the current context. */ + ctx_state.restore_state(context, table_list); + + if (res) + DBUG_RETURN(res); + + if (!table) + table= table_list->table; + + if (!select_insert) + { + Item *fake_conds= 0; + TABLE_LIST *duplicate; + if ((duplicate= unique_table(thd, table_list, table_list->next_global))) { - my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->real_name); - DBUG_RETURN(-1); + update_non_unique_table_error(table_list, "INSERT", duplicate); + DBUG_RETURN(TRUE); } + select_lex->fix_prepare_information(thd, &fake_conds); + select_lex->first_execution= 0; } if (duplic == DUP_UPDATE || duplic == DUP_REPLACE) table->file->extra(HA_EXTRA_RETRIEVE_PRIMARY_KEY); - - DBUG_RETURN(0); + DBUG_RETURN(FALSE); } @@ -574,7 +999,7 @@ int mysql_prepare_insert(THD *thd, TABLE_LIST *table_list, static int last_uniq_key(TABLE *table,uint keynr) { - while (++keynr < table->keys) + while (++keynr < table->s->keys) if (table->key_info[keynr].flags & HA_NOSAME) return 0; return 1; @@ -582,13 +1007,35 @@ static int last_uniq_key(TABLE *table,uint keynr) /* - Write a record to table with optional deleting of conflicting records + Write a record to table with optional deleting of conflicting records, + invoke proper triggers if needed. + + SYNOPSIS + write_record() + thd - thread context + table - table to which record should be written + info - COPY_INFO structure describing handling of duplicates + and which is used for counting number of records inserted + and deleted. + + NOTE + Once this record will be written to table after insert trigger will + be invoked. If instead of inserting new record we will update old one + then both on update triggers will work instead. Similarly both on + delete triggers will be invoked if we will delete conflicting records. + + Sets thd->no_trans_update if table which is updated didn't have + transactions. + + RETURN VALUE + 0 - success + non-0 - error */ -int write_record(TABLE *table,COPY_INFO *info) +int write_record(THD *thd, TABLE *table,COPY_INFO *info) { - int error; + int error, trg_error= 0; char *key=0; DBUG_ENTER("write_record"); @@ -598,9 +1045,9 @@ int write_record(TABLE *table,COPY_INFO *info) { while ((error=table->file->write_row(table->record[0]))) { + uint key_nr; if (error != HA_WRITE_SKIP) goto err; - uint key_nr; if ((int) (key_nr = table->file->get_dup_key(error)) < 0) { error=HA_WRITE_SKIP; /* Database can't find key */ @@ -613,7 +1060,7 @@ int write_record(TABLE *table,COPY_INFO *info) */ if (info->handle_duplicates == DUP_REPLACE && table->next_number_field && - key_nr == table->next_number_index && + key_nr == table->s->next_number_index && table->file->auto_increment_column_changed) goto err; if (table->file->table_flags() & HA_DUPP_POS) @@ -631,14 +1078,14 @@ int write_record(TABLE *table,COPY_INFO *info) if (!key) { - if (!(key=(char*) my_safe_alloca(table->max_unique_length, + if (!(key=(char*) my_safe_alloca(table->s->max_unique_length, MAX_KEY_LENGTH))) { error=ENOMEM; goto err; } } - key_copy((byte*) key,table,key_nr,0); + key_copy((byte*) key,table->record[0],table->key_info+key_nr,0); if ((error=(table->file->index_read_idx(table->record[1],key_nr, (byte*) key, table->key_info[key_nr]. @@ -648,24 +1095,50 @@ int write_record(TABLE *table,COPY_INFO *info) } if (info->handle_duplicates == DUP_UPDATE) { - /* we don't check for other UNIQUE keys - the first row - that matches, is updated. If update causes a conflict again, - an error is returned + int res= 0; + /* + We don't check for other UNIQUE keys - the first row + that matches, is updated. If update causes a conflict again, + an error is returned */ DBUG_ASSERT(table->insert_values != NULL); store_record(table,insert_values); restore_record(table,record[1]); - DBUG_ASSERT(info->update_fields->elements==info->update_values->elements); - if (fill_record(*info->update_fields, *info->update_values, 0)) - goto err; + DBUG_ASSERT(info->update_fields->elements == + info->update_values->elements); + if (fill_record_n_invoke_before_triggers(thd, *info->update_fields, + *info->update_values, 0, + table->triggers, + TRG_EVENT_UPDATE)) + goto before_trg_err; + + /* CHECK OPTION for VIEW ... ON DUPLICATE KEY UPDATE ... */ + if (info->view && + (res= info->view->view_check_option(current_thd, info->ignore)) == + VIEW_CHECK_SKIP) + goto ok_or_after_trg_err; + if (res == VIEW_CHECK_ERROR) + goto before_trg_err; + if ((error=table->file->update_row(table->record[1],table->record[0]))) { if ((error == HA_ERR_FOUND_DUPP_KEY) && info->ignore) - break; + { + table->file->restore_auto_increment(); + goto ok_or_after_trg_err; + } goto err; } info->updated++; - break; + + if (table->next_number_field) + table->file->adjust_next_insert_id_after_explicit_value(table->next_number_field->val_int()); + + trg_error= (table->triggers && + table->triggers->process_triggers(thd, TRG_EVENT_UPDATE, + TRG_ACTION_AFTER, TRUE)); + info->copied++; + goto ok_or_after_trg_err; } else /* DUP_REPLACE */ { @@ -676,69 +1149,128 @@ int write_record(TABLE *table,COPY_INFO *info) to convert the latter operation internally to an UPDATE. We also should not perform this conversion if we have timestamp field with ON UPDATE which is different from DEFAULT. + Another case when conversion should not be performed is when + we have ON DELETE trigger on table so user may notice that + we cheat here. Note that it is ok to do such conversion for + tables which have ON UPDATE but have no ON DELETE triggers, + we just should not expose this fact to users by invoking + ON UPDATE triggers. */ if (last_uniq_key(table,key_nr) && !table->file->referenced_by_foreign_key() && (table->timestamp_field_type == TIMESTAMP_NO_AUTO_SET || - table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH)) + table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH) && + (!table->triggers || !table->triggers->has_delete_triggers())) { if ((error=table->file->update_row(table->record[1], table->record[0]))) goto err; info->deleted++; - break; /* Update logfile and count */ + /* + Since we pretend that we have done insert we should call + its after triggers. + */ + goto after_trg_n_copied_inc; + } + else + { + if (table->triggers && + table->triggers->process_triggers(thd, TRG_EVENT_DELETE, + TRG_ACTION_BEFORE, TRUE)) + goto before_trg_err; + if ((error=table->file->delete_row(table->record[1]))) + goto err; + info->deleted++; + if (!table->file->has_transactions()) + thd->no_trans_update= 1; + if (table->triggers && + table->triggers->process_triggers(thd, TRG_EVENT_DELETE, + TRG_ACTION_AFTER, TRUE)) + { + trg_error= 1; + goto ok_or_after_trg_err; + } + /* Let us attempt do write_row() once more */ } - else if ((error=table->file->delete_row(table->record[1]))) - goto err; - info->deleted++; } } - info->copied++; } else if ((error=table->file->write_row(table->record[0]))) { if (!info->ignore || (error != HA_ERR_FOUND_DUPP_KEY && error != HA_ERR_FOUND_DUPP_UNIQUE)) goto err; + table->file->restore_auto_increment(); + goto ok_or_after_trg_err; } - else - info->copied++; + +after_trg_n_copied_inc: + info->copied++; + trg_error= (table->triggers && + table->triggers->process_triggers(thd, TRG_EVENT_INSERT, + TRG_ACTION_AFTER, TRUE)); + +ok_or_after_trg_err: if (key) - my_safe_afree(key,table->max_unique_length,MAX_KEY_LENGTH); - DBUG_RETURN(0); + my_safe_afree(key,table->s->max_unique_length,MAX_KEY_LENGTH); + if (!table->file->has_transactions()) + thd->no_trans_update= 1; + DBUG_RETURN(trg_error); err: - if (key) - my_safe_afree(key,table->max_unique_length,MAX_KEY_LENGTH); info->last_errno= error; + /* current_select is NULL if this is a delayed insert */ + if (thd->lex->current_select) + thd->lex->current_select->no_error= 0; // Give error table->file->print_error(error,MYF(0)); + +before_trg_err: + table->file->restore_auto_increment(); + if (key) + my_safe_afree(key, table->s->max_unique_length, MAX_KEY_LENGTH); DBUG_RETURN(1); } /****************************************************************************** Check that all fields with arn't null_fields are used - If DONT_USE_DEFAULT_FIELDS isn't defined use default value for not set - fields. ******************************************************************************/ -static int check_null_fields(THD *thd __attribute__((unused)), - TABLE *entry __attribute__((unused))) +int check_that_all_fields_are_given_values(THD *thd, TABLE *entry, + TABLE_LIST *table_list) { -#ifdef DONT_USE_DEFAULT_FIELDS + int err= 0; for (Field **field=entry->field ; *field ; field++) { - if ((*field)->query_id != thd->query_id && !(*field)->maybe_null() && - *field != entry->timestamp_field && - *field != entry->next_number_field) + if ((*field)->query_id != thd->query_id && + ((*field)->flags & NO_DEFAULT_VALUE_FLAG) && + ((*field)->real_type() != FIELD_TYPE_ENUM)) { - my_printf_error(ER_BAD_NULL_ERROR, ER(ER_BAD_NULL_ERROR),MYF(0), - (*field)->field_name); - return 1; + bool view= FALSE; + if (table_list) + { + table_list= table_list->top_table(); + view= test(table_list->view); + } + if (view) + { + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN, + ER_NO_DEFAULT_FOR_VIEW_FIELD, + ER(ER_NO_DEFAULT_FOR_VIEW_FIELD), + table_list->view_db.str, + table_list->view_name.str); + } + else + { + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN, + ER_NO_DEFAULT_FOR_FIELD, + ER(ER_NO_DEFAULT_FOR_FIELD), + (*field)->field_name); + } + err= 1; } } -#endif - return 0; + return thd->abort_on_warning ? err : 0; } /***************************************************************************** @@ -753,14 +1285,13 @@ public: char *record,*query; enum_duplicates dup; time_t start_time; - bool query_start_used,last_insert_id_used,insert_id_used, ignore; - int log_query; + bool query_start_used,last_insert_id_used,insert_id_used, ignore, log_query; ulonglong last_insert_id; timestamp_auto_set_type timestamp_field_type; uint query_length; - delayed_row(enum_duplicates dup_arg, bool ignore_arg, int log_query_arg) - :record(0),query(0),dup(dup_arg),ignore(ignore_arg),log_query(log_query_arg) {} + delayed_row(enum_duplicates dup_arg, bool ignore_arg, bool log_query_arg) + :record(0), query(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg) {} ~delayed_row() { x_free(record); @@ -787,8 +1318,8 @@ public: table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0), group_count(0) { - thd.user=thd.priv_user=(char*) delayed_user; - thd.host=(char*) my_localhost; + thd.security_ctx->user=thd.security_ctx->priv_user=(char*) delayed_user; + thd.security_ctx->host=(char*) my_localhost; thd.current_tablenr=0; thd.version=refresh_version; thd.command=COM_DELAYED_INSERT; @@ -798,7 +1329,7 @@ public: bzero((char*) &thd.net, sizeof(thd.net)); // Safety bzero((char*) &table_list, sizeof(table_list)); // Safety thd.system_thread= SYSTEM_THREAD_DELAYED_INSERT; - thd.host_or_ip= ""; + thd.security_ctx->host_or_ip= ""; bzero((char*) &info,sizeof(info)); pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST); pthread_cond_init(&cond,NULL); @@ -821,7 +1352,7 @@ public: pthread_cond_destroy(&cond_client); thd.unlink(); // Must be unlinked under lock x_free(thd.query); - thd.user=thd.host=0; + thd.security_ctx->user= thd.security_ctx->host=0; thread_count--; delayed_insert_threads--; VOID(pthread_mutex_unlock(&LOCK_thread_count)); @@ -867,7 +1398,7 @@ delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list) while ((tmp=it++)) { if (!strcmp(tmp->thd.db,table_list->db) && - !strcmp(table_list->real_name,tmp->table->real_name)) + !strcmp(table_list->table_name,tmp->table->s->table_name)) { tmp->lock(); break; @@ -885,8 +1416,8 @@ static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list) TABLE *table; DBUG_ENTER("delayed_get_table"); - if (!table_list->db) - table_list->db=thd->db; + /* Must be set in the parser */ + DBUG_ASSERT(table_list->db); /* Find the thread which handles this table. */ if (!(tmp=find_handler(thd,table_list))) @@ -905,18 +1436,6 @@ static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list) */ if (! (tmp= find_handler(thd, table_list))) { - /* - Avoid that a global read lock steps in while we are creating the - new thread. It would block trying to open the table. Hence, the - DI thread and this thread would wait until after the global - readlock is gone. Since the insert thread needs to wait for a - global read lock anyway, we do it right now. Note that - wait_if_global_read_lock() sets a protection against a new - global read lock when it succeeds. This needs to be released by - start_waiting_global_read_lock(). - */ - if (wait_if_global_read_lock(thd, 0, 1)) - goto err; if (!(tmp=new delayed_insert())) { my_error(ER_OUTOFMEMORY,MYF(0),sizeof(delayed_insert)); @@ -925,16 +1444,16 @@ static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list) pthread_mutex_lock(&LOCK_thread_count); thread_count++; pthread_mutex_unlock(&LOCK_thread_count); - if (!(tmp->thd.db=my_strdup(table_list->db,MYF(MY_WME))) || - !(tmp->thd.query=my_strdup(table_list->real_name,MYF(MY_WME)))) + tmp->thd.set_db(table_list->db, strlen(table_list->db)); + tmp->thd.query= my_strdup(table_list->table_name,MYF(MY_WME)); + if (tmp->thd.db == NULL || tmp->thd.query == NULL) { delete tmp; - my_error(ER_OUT_OF_RESOURCES,MYF(0)); + my_message(ER_OUT_OF_RESOURCES, ER(ER_OUT_OF_RESOURCES), MYF(0)); goto err1; } tmp->table_list= *table_list; // Needed to open table - tmp->table_list.db= tmp->thd.db; - tmp->table_list.alias= tmp->table_list.real_name=tmp->thd.query; + tmp->table_list.alias= tmp->table_list.table_name= tmp->thd.query; tmp->lock(); pthread_mutex_lock(&tmp->mutex); if ((error=pthread_create(&tmp->thd.real_id,&connection_attrib, @@ -946,7 +1465,7 @@ static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list) pthread_mutex_unlock(&tmp->mutex); tmp->unlock(); delete tmp; - net_printf(thd,ER_CANT_CREATE_THREAD,error); + my_error(ER_CANT_CREATE_THREAD, MYF(0), error); goto err1; } @@ -957,11 +1476,6 @@ static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list) pthread_cond_wait(&tmp->cond_client,&tmp->mutex); } pthread_mutex_unlock(&tmp->mutex); - /* - Release the protection against the global read lock and wake - everyone, who might want to set a global read lock. - */ - start_waiting_global_read_lock(thd); thd->proc_info="got old table"; if (tmp->thd.killed) { @@ -985,7 +1499,7 @@ static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list) } pthread_mutex_lock(&tmp->mutex); - table=tmp->get_local_table(thd); + table= tmp->get_local_table(thd); pthread_mutex_unlock(&tmp->mutex); if (table) thd->di=tmp; @@ -997,11 +1511,6 @@ static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list) err1: thd->fatal_error(); - /* - Release the protection against the global read lock and wake - everyone, who might want to set a global read lock. - */ - start_waiting_global_read_lock(thd); err: pthread_mutex_unlock(&LOCK_delayed_create); DBUG_RETURN(0); // Continue with normal insert @@ -1020,6 +1529,7 @@ TABLE *delayed_insert::get_local_table(THD* client_thd) my_ptrdiff_t adjust_ptrs; Field **field,**org_field, *found_next_number_field; TABLE *copy; + DBUG_ENTER("delayed_insert::get_local_table"); /* First request insert thread to get a lock */ status=1; @@ -1043,29 +1553,47 @@ TABLE *delayed_insert::get_local_table(THD* client_thd) } } + /* + Allocate memory for the TABLE object, the field pointers array, and + one record buffer of reclength size. Normally a table has three + record buffers of rec_buff_length size, which includes alignment + bytes. Since the table copy is used for creating one record only, + the other record buffers and alignment are unnecessary. + */ client_thd->proc_info="allocating local table"; copy= (TABLE*) client_thd->alloc(sizeof(*copy)+ - (table->fields+1)*sizeof(Field**)+ - table->reclength); + (table->s->fields+1)*sizeof(Field**)+ + table->s->reclength); if (!copy) goto error; + + /* Copy the TABLE object. */ *copy= *table; - bzero((char*) ©->name_hash,sizeof(copy->name_hash)); // No name hashing + copy->s= ©->share_not_to_be_used; + // No name hashing + bzero((char*) ©->s->name_hash,sizeof(copy->s->name_hash)); /* We don't need to change the file handler here */ - field=copy->field=(Field**) (copy+1); - copy->record[0]=(byte*) (field+table->fields+1); - memcpy((char*) copy->record[0],(char*) table->record[0],table->reclength); + /* Assign the pointers for the field pointers array and the record. */ + field= copy->field= (Field**) (copy + 1); + copy->record[0]= (byte*) (field + table->s->fields + 1); + memcpy((char*) copy->record[0], (char*) table->record[0], + table->s->reclength); - /* Make a copy of all fields */ - - adjust_ptrs=PTR_BYTE_DIFF(copy->record[0],table->record[0]); + /* + Make a copy of all fields. + The copied fields need to point into the copied record. This is done + by copying the field objects with their old pointer values and then + "move" the pointers by the distance between the original and copied + records. That way we preserve the relative positions in the records. + */ + adjust_ptrs= PTR_BYTE_DIFF(copy->record[0], table->record[0]); - found_next_number_field=table->found_next_number_field; - for (org_field=table->field ; *org_field ; org_field++,field++) + found_next_number_field= table->found_next_number_field; + for (org_field= table->field; *org_field; org_field++, field++) { - if (!(*field= (*org_field)->new_field(client_thd->mem_root,copy))) - return 0; + if (!(*field= (*org_field)->new_field(client_thd->mem_root, copy, 1))) + DBUG_RETURN(0); (*field)->orig_table= copy; // Remove connection (*field)->move_field(adjust_ptrs); // Point at copy->record[0] if (*org_field == found_next_number_field) @@ -1078,7 +1606,7 @@ TABLE *delayed_insert::get_local_table(THD* client_thd) { /* Restore offset as this may have been reset in handle_inserts */ copy->timestamp_field= - (Field_timestamp*) copy->field[table->timestamp_field_offset]; + (Field_timestamp*) copy->field[table->s->timestamp_field_offset]; copy->timestamp_field->unireg_check= table->timestamp_field->unireg_check; copy->timestamp_field_type= copy->timestamp_field->get_auto_set_type(); } @@ -1088,22 +1616,25 @@ TABLE *delayed_insert::get_local_table(THD* client_thd) /* Adjust in_use for pointing to client thread */ copy->in_use= client_thd; - - return copy; + + /* Adjust lock_count. This table object is not part of a lock. */ + copy->lock_count= 0; + + DBUG_RETURN(copy); /* Got fatal error */ error: tables_in_use--; status=1; pthread_cond_signal(&cond); // Inform thread about abort - return 0; + DBUG_RETURN(0); } /* Put a question in queue */ static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic, bool ignore, - char *query, uint query_length, int log_on) + char *query, uint query_length, bool log_on) { delayed_row *row=0; delayed_insert *di=thd->di; @@ -1120,13 +1651,13 @@ static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic, bool igno if (!query) query_length=0; - if (!(row->record= (char*) my_malloc(table->reclength+query_length+1, + if (!(row->record= (char*) my_malloc(table->s->reclength+query_length+1, MYF(MY_WME)))) goto err; - memcpy(row->record,table->record[0],table->reclength); + memcpy(row->record, table->record[0], table->s->reclength); if (query_length) { - row->query=row->record+table->reclength; + row->query= row->record+table->s->reclength; memcpy(row->query,query,query_length+1); } row->query_length= query_length; @@ -1140,7 +1671,7 @@ static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic, bool igno di->rows.push_back(row); di->stacked_inserts++; di->status=1; - if (table->blob_fields) + if (table->s->blob_fields) unlink_blobs(table); pthread_cond_signal(&di->cond); @@ -1183,7 +1714,7 @@ void kill_delayed_threads(void) { /* Ensure that the thread doesn't kill itself while we are looking at it */ pthread_mutex_lock(&tmp->mutex); - tmp->thd.killed=1; + tmp->thd.killed= THD::KILL_CONNECTION; if (tmp->thd.mysys_var) { pthread_mutex_lock(&tmp->thd.mysys_var->mutex); @@ -1211,7 +1742,7 @@ void kill_delayed_threads(void) * Create a new delayed insert thread */ -extern "C" pthread_handler_decl(handle_delayed_insert,arg) +pthread_handler_t handle_delayed_insert(void *arg) { delayed_insert *di=(delayed_insert*) arg; THD *thd= &di->thd; @@ -1222,7 +1753,7 @@ extern "C" pthread_handler_decl(handle_delayed_insert,arg) thd->thread_id=thread_id++; thd->end_time(); threads.append(thd); - thd->killed=abort_loop; + thd->killed=abort_loop ? THD::KILL_CONNECTION : THD::NOT_KILLED; pthread_mutex_unlock(&LOCK_thread_count); /* @@ -1243,6 +1774,7 @@ extern "C" pthread_handler_decl(handle_delayed_insert,arg) #endif DBUG_ENTER("handle_delayed_insert"); + thd->thread_stack= (char*) &thd; if (init_thr_lock() || thd->store_globals()) { thd->fatal_error(); @@ -1265,7 +1797,7 @@ extern "C" pthread_handler_decl(handle_delayed_insert,arg) if (!(di->table->file->table_flags() & HA_CAN_INSERT_DELAYED)) { thd->fatal_error(); - my_error(ER_ILLEGAL_HA, MYF(0), di->table_list.real_name); + my_error(ER_ILLEGAL_HA, MYF(0), di->table_list.table_name); goto end; } di->table->copy_blobs=1; @@ -1283,7 +1815,7 @@ extern "C" pthread_handler_decl(handle_delayed_insert,arg) for (;;) { - if (thd->killed) + if (thd->killed == THD::KILL_CONNECTION) { uint lock_count; /* @@ -1329,9 +1861,9 @@ extern "C" pthread_handler_decl(handle_delayed_insert,arg) #endif if (thd->killed || di->status) break; - if (error == ETIME || error == ETIMEDOUT) + if (error == ETIMEDOUT || error == ETIME) { - thd->killed=1; + thd->killed= THD::KILL_CONNECTION; break; } } @@ -1347,6 +1879,7 @@ extern "C" pthread_handler_decl(handle_delayed_insert,arg) if (di->tables_in_use && ! thd->lock) { + bool not_used; /* Request for new delayed insert. Lock the table, but avoid to be blocked by a global read lock. @@ -1358,10 +1891,12 @@ extern "C" pthread_handler_decl(handle_delayed_insert,arg) inserts are done. */ if (! (thd->lock= mysql_lock_tables(thd, &di->table, 1, - MYSQL_LOCK_IGNORE_GLOBAL_READ_LOCK))) + MYSQL_LOCK_IGNORE_GLOBAL_READ_LOCK, + ¬_used))) { - di->dead= 1; // Some fatal error - thd->killed= 1; + /* Fatal error */ + di->dead= 1; + thd->killed= THD::KILL_CONNECTION; } pthread_cond_broadcast(&di->cond_client); } @@ -1369,8 +1904,9 @@ extern "C" pthread_handler_decl(handle_delayed_insert,arg) { if (di->handle_inserts()) { - di->dead= 1; // Some fatal error - thd->killed= 1; + /* Some fatal error */ + di->dead= 1; + thd->killed= THD::KILL_CONNECTION; } } di->status=0; @@ -1400,7 +1936,7 @@ end: close_thread_tables(thd); // Free the table di->table=0; di->dead= 1; // If error - thd->killed= 1; + thd->killed= THD::KILL_CONNECTION; // If error pthread_cond_broadcast(&di->cond_client); // Safety pthread_mutex_unlock(&di->mutex); @@ -1448,7 +1984,8 @@ bool delayed_insert::handle_inserts(void) { int error; ulong max_rows; - bool using_ignore=0, using_bin_log=mysql_bin_log.is_open(); + bool using_ignore= 0, using_opt_replace= 0; + bool using_bin_log= mysql_bin_log.is_open(); delayed_row *row; DBUG_ENTER("handle_inserts"); @@ -1461,15 +1998,15 @@ bool delayed_insert::handle_inserts(void) if (thr_upgrade_write_delay_lock(*thd.lock->locks)) { /* This can only happen if thread is killed by shutdown */ - sql_print_error(ER(ER_DELAYED_CANT_CHANGE_LOCK),table->real_name); + sql_print_error(ER(ER_DELAYED_CANT_CHANGE_LOCK),table->s->table_name); goto err; } thd.proc_info="insert"; max_rows= delayed_insert_limit; - if (thd.killed || table->version != refresh_version) + if (thd.killed || table->s->version != refresh_version) { - thd.killed=1; + thd.killed= THD::KILL_CONNECTION; max_rows= ~(ulong)0; // Do as much as possible } @@ -1481,11 +2018,19 @@ bool delayed_insert::handle_inserts(void) if (!using_bin_log) table->file->extra(HA_EXTRA_WRITE_CACHE); pthread_mutex_lock(&mutex); + + /* Reset auto-increment cacheing */ + if (thd.clear_next_insert_id) + { + thd.next_insert_id= 0; + thd.clear_next_insert_id= 0; + } + while ((row=rows.get())) { stacked_inserts--; pthread_mutex_unlock(&mutex); - memcpy(table->record[0],row->record,table->reclength); + memcpy(table->record[0],row->record,table->s->reclength); thd.start_time=row->start_time; thd.query_start_used=row->query_start_used; @@ -1502,8 +2047,15 @@ bool delayed_insert::handle_inserts(void) table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); using_ignore=1; } + if (info.handle_duplicates == DUP_REPLACE && + (!table->triggers || + !table->triggers->has_delete_triggers())) + { + table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); + using_opt_replace= 1; + } thd.clear_error(); // reset error for binlog - if (write_record(table,&info)) + if (write_record(&thd, table, &info)) { info.error_count++; // Ignore errors thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status); @@ -1514,17 +2066,17 @@ bool delayed_insert::handle_inserts(void) using_ignore=0; table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); } - if (row->query) + if (using_opt_replace) { - if (row->log_query & DELAYED_LOG_UPDATE) - mysql_update_log.write(&thd,row->query, row->query_length); - if (row->log_query & DELAYED_LOG_BIN && using_bin_log) - { - Query_log_event qinfo(&thd, row->query, row->query_length,0, FALSE); - mysql_bin_log.write(&qinfo); - } + using_opt_replace= 0; + table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE); + } + if (row->query && row->log_query && using_bin_log) + { + Query_log_event qinfo(&thd, row->query, row->query_length, 0, FALSE); + mysql_bin_log.write(&qinfo); } - if (table->blob_fields) + if (table->s->blob_fields) free_delayed_insert_blobs(table); thread_safe_sub(delayed_rows_in_use,1,&LOCK_delayed_status); thread_safe_increment(delayed_insert_writes,&LOCK_delayed_status); @@ -1538,7 +2090,7 @@ bool delayed_insert::handle_inserts(void) on this table until all entries has been processed */ if (group_count++ >= max_rows && (row= rows.head()) && - (!(row->log_query & DELAYED_LOG_BIN && using_bin_log) || + (!(row->log_query & using_bin_log) || row->query)) { group_count=0; @@ -1559,7 +2111,7 @@ bool delayed_insert::handle_inserts(void) if (thr_reschedule_write_lock(*thd.lock->locks)) { /* This should never happen */ - sql_print_error(ER(ER_DELAYED_CANT_CHANGE_LOCK),table->real_name); + sql_print_error(ER(ER_DELAYED_CANT_CHANGE_LOCK),table->s->table_name); } if (!using_bin_log) table->file->extra(HA_EXTRA_WRITE_CACHE); @@ -1602,13 +2154,83 @@ bool delayed_insert::handle_inserts(void) Store records in INSERT ... SELECT * ***************************************************************************/ + +/* + make insert specific preparation and checks after opening tables + + SYNOPSIS + mysql_insert_select_prepare() + thd thread handler + + RETURN + FALSE OK + TRUE Error +*/ + +bool mysql_insert_select_prepare(THD *thd) +{ + LEX *lex= thd->lex; + SELECT_LEX *select_lex= &lex->select_lex; + TABLE_LIST *first_select_leaf_table; + DBUG_ENTER("mysql_insert_select_prepare"); + + /* + SELECT_LEX do not belong to INSERT statement, so we can't add WHERE + clause if table is VIEW + */ + + if (mysql_prepare_insert(thd, lex->query_tables, + lex->query_tables->table, lex->field_list, 0, + lex->update_list, lex->value_list, + lex->duplicates, + &select_lex->where, TRUE)) + DBUG_RETURN(TRUE); + + /* + exclude first table from leaf tables list, because it belong to + INSERT + */ + DBUG_ASSERT(select_lex->leaf_tables != 0); + lex->leaf_tables_insert= select_lex->leaf_tables; + /* skip all leaf tables belonged to view where we are insert */ + for (first_select_leaf_table= select_lex->leaf_tables->next_leaf; + first_select_leaf_table && + first_select_leaf_table->belong_to_view && + first_select_leaf_table->belong_to_view == + lex->leaf_tables_insert->belong_to_view; + first_select_leaf_table= first_select_leaf_table->next_leaf) + {} + select_lex->leaf_tables= first_select_leaf_table; + DBUG_RETURN(FALSE); +} + + +select_insert::select_insert(TABLE_LIST *table_list_par, TABLE *table_par, + List<Item> *fields_par, + List<Item> *update_fields, + List<Item> *update_values, + enum_duplicates duplic, + bool ignore_check_option_errors) + :table_list(table_list_par), table(table_par), fields(fields_par), + last_insert_id(0), + insert_into_view(table_list_par && table_list_par->view != 0) +{ + bzero((char*) &info,sizeof(info)); + info.handle_duplicates= duplic; + info.ignore= ignore_check_option_errors; + info.update_fields= update_fields; + info.update_values= update_values; + if (table_list_par) + info.view= (table_list_par->view ? table_list_par : 0); +} + + int select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) { - int res; LEX *lex= thd->lex; + int res; SELECT_LEX *lex_current_select_save= lex->current_select; - bool lex_select_no_error= lex->select_lex.no_error; DBUG_ENTER("select_insert::prepare"); unit= u; @@ -1616,29 +2238,129 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) Since table in which we are going to insert is added to the first select, LEX::current_select should point to the first select while we are fixing fields from insert list. - Since these checks may cause the query to fail, we don't want the - error messages to be converted into warnings, must force no_error=0 */ lex->current_select= &lex->select_lex; - lex->select_lex.no_error= 0; - res= - check_insert_fields(thd, table, *fields, values) || - setup_fields(thd, 0, insert_table_list, values, 0, 0, 0) || - (info.handle_duplicates == DUP_UPDATE && - (check_update_fields(thd, table, insert_table_list, *info.update_fields) || - setup_fields(thd, 0, dup_table_list, *info.update_values, 1, 0, 0))); + res= check_insert_fields(thd, table_list, *fields, values, + !insert_into_view) || + setup_fields(thd, 0, values, 0, 0, 0); + + if (info.handle_duplicates == DUP_UPDATE) + { + /* Save the state of the current name resolution context. */ + Name_resolution_context *context= &lex->select_lex.context; + Name_resolution_context_state ctx_state; + + /* Save the state of the current name resolution context. */ + ctx_state.save_state(context, table_list); + + /* Perform name resolution only in the first table - 'table_list'. */ + table_list->next_local= 0; + context->resolve_in_table_list_only(table_list); + + lex->select_lex.no_wrap_view_item= TRUE; + res= res || check_update_fields(thd, context->table_list, + *info.update_fields); + lex->select_lex.no_wrap_view_item= FALSE; + /* + When we are not using GROUP BY we can refer to other tables in the + ON DUPLICATE KEY part + */ + if (lex->select_lex.group_list.elements == 0) + { + context->table_list->next_local= ctx_state.save_next_local; + /* first_name_resolution_table was set by resolve_in_table_list_only() */ + context->first_name_resolution_table-> + next_name_resolution_table= ctx_state.save_next_local; + } + res= res || setup_fields(thd, 0, *info.update_values, 1, 0, 0); + + /* Restore the current context. */ + ctx_state.restore_state(context, table_list); + } + lex->current_select= lex_current_select_save; - lex->select_lex.no_error= lex_select_no_error; if (res) DBUG_RETURN(1); + /* + if it is INSERT into join view then check_insert_fields already found + real table for insert + */ + table= table_list->table; - restore_record(table,default_values); // Get empty record + /* + Is table which we are changing used somewhere in other parts of + query + */ + if (!(lex->current_select->options & OPTION_BUFFER_RESULT) && + unique_table(thd, table_list, table_list->next_global)) + { + /* Using same table for INSERT and SELECT */ + lex->current_select->options|= OPTION_BUFFER_RESULT; + lex->current_select->join->select_options|= OPTION_BUFFER_RESULT; + } + else if (!thd->prelocked_mode) + { + /* + We must not yet prepare the result table if it is the same as one of the + source tables (INSERT SELECT). The preparation may disable + indexes on the result table, which may be used during the select, if it + is the same table (Bug #6034). Do the preparation after the select phase + in select_insert::prepare2(). + We won't start bulk inserts at all if this statement uses functions or + should invoke triggers since they may access to the same table too. + */ + table->file->start_bulk_insert((ha_rows) 0); + } + restore_record(table,s->default_values); // Get empty record table->next_number_field=table->found_next_number_field; thd->cuted_fields=0; - if (info.ignore || - info.handle_duplicates != DUP_ERROR) + if (info.ignore || info.handle_duplicates != DUP_ERROR) table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); - table->file->start_bulk_insert((ha_rows) 0); + if (info.handle_duplicates == DUP_REPLACE) + { + if (!table->triggers || !table->triggers->has_delete_triggers()) + table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); + table->file->extra(HA_EXTRA_RETRIEVE_ALL_COLS); + } + thd->no_trans_update= 0; + thd->abort_on_warning= (!info.ignore && + (thd->variables.sql_mode & + (MODE_STRICT_TRANS_TABLES | + MODE_STRICT_ALL_TABLES))); + res= ((fields->elements && + check_that_all_fields_are_given_values(thd, table, table_list)) || + table_list->prepare_where(thd, 0, TRUE) || + table_list->prepare_check_option(thd)); + + if (!res) + mark_fields_used_by_triggers_for_insert_stmt(thd, table, + info.handle_duplicates); + DBUG_RETURN(res); +} + + +/* + Finish the preparation of the result table. + + SYNOPSIS + select_insert::prepare2() + void + + DESCRIPTION + If the result table is the same as one of the source tables (INSERT SELECT), + the result table is not finally prepared at the join prepair phase. + Do the final preparation now. + + RETURN + 0 OK +*/ + +int select_insert::prepare2(void) +{ + DBUG_ENTER("select_insert::prepare2"); + if (thd->lex->current_select->options & OPTION_BUFFER_RESULT && + !thd->prelocked_mode) + table->file->start_bulk_insert((ha_rows) 0); DBUG_RETURN(0); } @@ -1651,12 +2373,15 @@ void select_insert::cleanup() select_insert::~select_insert() { + DBUG_ENTER("~select_insert"); if (table) { table->next_number_field=0; table->file->reset(); } thd->count_cuted_fields= CHECK_FIELD_IGNORE; + thd->abort_on_warning= 0; + DBUG_VOID_RETURN; } @@ -1669,25 +2394,44 @@ bool select_insert::send_data(List<Item> &values) unit->offset_limit_cnt--; DBUG_RETURN(0); } - thd->count_cuted_fields= CHECK_FIELD_WARN; // calc cuted fields + + thd->count_cuted_fields= CHECK_FIELD_WARN; // Calculate cuted fields store_values(values); - error=thd->net.report_error || write_record(table,&info); thd->count_cuted_fields= CHECK_FIELD_IGNORE; - - if (!error) + if (thd->net.report_error) + DBUG_RETURN(1); + if (table_list) // Not CREATE ... SELECT { - /* - Restore fields of the record since it is possible that they were - changed by ON DUPLICATE KEY UPDATE clause. - */ - if (info.handle_duplicates == DUP_UPDATE) - restore_record(table, default_values); - - if (table->next_number_field) // Clear for next record + switch (table_list->view_check_option(thd, info.ignore)) { + case VIEW_CHECK_SKIP: + DBUG_RETURN(0); + case VIEW_CHECK_ERROR: + DBUG_RETURN(1); + } + } + if (!(error= write_record(thd, table, &info))) + { + if (table->triggers || info.handle_duplicates == DUP_UPDATE) { + /* + Restore fields of the record since it is possible that they were + changed by ON DUPLICATE KEY UPDATE clause. + + If triggers exist then whey can modify some fields which were not + originally touched by INSERT ... SELECT, so we have to restore + their original values for the next row. + */ + restore_record(table, s->default_values); + } + if (table->next_number_field) + { + /* + Clear auto-increment field for the next record, if triggers are used + we will clear it twice, but this should be cheap. + */ table->next_number_field->reset(); - if (! last_insert_id && thd->insert_id_used) - last_insert_id=thd->insert_id(); + if (!last_insert_id && thd->insert_id_used) + last_insert_id= thd->insert_id(); } } DBUG_RETURN(error); @@ -1697,17 +2441,18 @@ bool select_insert::send_data(List<Item> &values) void select_insert::store_values(List<Item> &values) { if (fields->elements) - fill_record(*fields, values, 1); + fill_record_n_invoke_before_triggers(thd, *fields, values, 1, + table->triggers, TRG_EVENT_INSERT); else - fill_record(table->field, values, 1); + fill_record_n_invoke_before_triggers(thd, table->field, values, 1, + table->triggers, TRG_EVENT_INSERT); } void select_insert::send_error(uint errcode,const char *err) { DBUG_ENTER("select_insert::send_error"); - /* TODO error should be sent at the query processing end */ - ::send_error(thd,errcode,err); + my_message(errcode, err, MYF(0)); if (!table) { @@ -1717,7 +2462,8 @@ void select_insert::send_error(uint errcode,const char *err) */ DBUG_VOID_RETURN; } - table->file->end_bulk_insert(); + if (!thd->prelocked_mode) + table->file->end_bulk_insert(); /* If at least one row has been inserted/modified and will stay in the table (the table doesn't have transactions) (example: we got a duplicate key @@ -1729,18 +2475,19 @@ void select_insert::send_error(uint errcode,const char *err) { if (last_insert_id) thd->insert_id(last_insert_id); // For binary log - mysql_update_log.write(thd,thd->query,thd->query_length); if (mysql_bin_log.is_open()) { Query_log_event qinfo(thd, thd->query, thd->query_length, table->file->has_transactions(), FALSE); mysql_bin_log.write(&qinfo); } - if (!table->tmp_table) + if (!table->s->tmp_table) thd->options|=OPTION_STATUS_NO_TRANS_UPDATE; } if (info.copied || info.deleted || info.updated) + { query_cache_invalidate3(thd, table, 1); + } ha_rollback_stmt(thd); DBUG_VOID_RETURN; } @@ -1751,25 +2498,25 @@ bool select_insert::send_eof() int error,error2; DBUG_ENTER("select_insert::send_eof"); - error=table->file->end_bulk_insert(); + error= (!thd->prelocked_mode) ? table->file->end_bulk_insert():0; table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); + table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE); /* We must invalidate the table in the query cache before binlog writing - and ha_autocommit_... + and ha_autocommit_or_rollback */ if (info.copied || info.deleted || info.updated) { query_cache_invalidate3(thd, table, 1); - if (!(table->file->has_transactions() || table->tmp_table)) + if (!(table->file->has_transactions() || table->s->tmp_table)) thd->options|=OPTION_STATUS_NO_TRANS_UPDATE; } if (last_insert_id) thd->insert_id(last_insert_id); // For binary log /* Write to binlog before commiting transaction */ - mysql_update_log.write(thd,thd->query,thd->query_length); if (mysql_bin_log.is_open()) { if (!error) @@ -1783,8 +2530,6 @@ bool select_insert::send_eof() if (error) { table->file->print_error(error,MYF(0)); - //TODO error should be sent at the query processing end - ::send_error(thd); DBUG_RETURN(1); } char buff[160]; @@ -1794,7 +2539,8 @@ bool select_insert::send_eof() else sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records, (ulong) (info.deleted+info.updated), (ulong) thd->cuted_fields); - ::send_ok(thd,info.copied+info.deleted+info.updated,last_insert_id,buff); + thd->row_count_func= info.copied+info.deleted+info.updated; + ::send_ok(thd, (ulong) thd->row_count_func, last_insert_id, buff); DBUG_RETURN(0); } @@ -1803,46 +2549,209 @@ bool select_insert::send_eof() CREATE TABLE (SELECT) ... ***************************************************************************/ +/* + Create table from lists of fields and items (or open existing table + with same name). + + SYNOPSIS + create_table_from_items() + thd in Thread object + create_info in Create information (like MAX_ROWS, ENGINE or + temporary table flag) + create_table in Pointer to TABLE_LIST object providing database + and name for table to be created or to be open + extra_fields in/out Initial list of fields for table to be created + keys in List of keys for table to be created + items in List of items which should be used to produce rest + of fields for the table (corresponding fields will + be added to the end of 'extra_fields' list) + lock out Pointer to the MYSQL_LOCK object for table created + (open) will be returned in this parameter. Since + this table is not included in THD::lock caller is + responsible for explicitly unlocking this table. + + NOTES + If 'create_info->options' bitmask has HA_LEX_CREATE_IF_NOT_EXISTS + flag and table with name provided already exists then this function will + simply open existing table. + Also note that create, open and lock sequence in this function is not + atomic and thus contains gap for deadlock and can cause other troubles. + Since this function contains some logic specific to CREATE TABLE ... SELECT + it should be changed before it can be used in other contexts. + + RETURN VALUES + non-zero Pointer to TABLE object for table created or opened + 0 Error +*/ + +static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info, + TABLE_LIST *create_table, + List<create_field> *extra_fields, + List<Key> *keys, List<Item> *items, + MYSQL_LOCK **lock) +{ + TABLE tmp_table; // Used during 'create_field()' + TABLE *table= 0; + uint select_field_count= items->elements; + /* Add selected items to field list */ + List_iterator_fast<Item> it(*items); + Item *item; + Field *tmp_field; + bool not_used; + DBUG_ENTER("create_table_from_items"); + + tmp_table.alias= 0; + tmp_table.timestamp_field= 0; + tmp_table.s= &tmp_table.share_not_to_be_used; + tmp_table.s->db_create_options=0; + tmp_table.s->blob_ptr_size= portable_sizeof_char_ptr; + tmp_table.s->db_low_byte_first= test(create_info->db_type == DB_TYPE_MYISAM || + create_info->db_type == DB_TYPE_HEAP); + tmp_table.null_row=tmp_table.maybe_null=0; + + while ((item=it++)) + { + create_field *cr_field; + Field *field, *def_field; + if (item->type() == Item::FUNC_ITEM) + field= item->tmp_table_field(&tmp_table); + else + field= create_tmp_field(thd, &tmp_table, item, item->type(), + (Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0, + 0); + if (!field || + !(cr_field=new create_field(field,(item->type() == Item::FIELD_ITEM ? + ((Item_field *)item)->field : + (Field*) 0)))) + DBUG_RETURN(0); + if (item->maybe_null) + cr_field->flags &= ~NOT_NULL_FLAG; + extra_fields->push_back(cr_field); + } + /* + create and lock table + + We don't log the statement, it will be logged later. + + If this is a HEAP table, the automatic DELETE FROM which is written to the + binlog when a HEAP table is opened for the first time since startup, must + not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we + don't want to delete from it) 2) it would be written before the CREATE + TABLE, which is a wrong order. So we keep binary logging disabled when we + open_table(). + NOTE: By locking table which we just have created (or for which we just have + have found that it already exists) separately from other tables used by the + statement we create potential window for deadlock. + TODO: create and open should be done atomic ! + */ + { + tmp_disable_binlog(thd); + if (!mysql_create_table(thd, create_table->db, create_table->table_name, + create_info, *extra_fields, *keys, 0, + select_field_count)) + { + /* + If we are here in prelocked mode we either create temporary table + or prelocked mode is caused by the SELECT part of this statement. + */ + DBUG_ASSERT(!thd->prelocked_mode || + create_info->options & HA_LEX_CREATE_TMP_TABLE || + thd->lex->requires_prelocking()); + + /* + NOTE: We don't want to ignore set of locked tables here if we are + under explicit LOCK TABLES since it will open gap for deadlock + too wide (and also is not backward compatible). + */ + if (! (table= open_table(thd, create_table, thd->mem_root, (bool*) 0, + (MYSQL_LOCK_IGNORE_FLUSH | + ((thd->prelocked_mode == PRELOCKED) ? + MYSQL_OPEN_IGNORE_LOCKED_TABLES:0))))) + quick_rm_table(create_info->db_type, create_table->db, + table_case_name(create_info, create_table->table_name)); + } + reenable_binlog(thd); + if (!table) // open failed + DBUG_RETURN(0); + } + + /* + FIXME: What happens if trigger manages to be created while we are + obtaining this lock ? May be it is sensible just to disable + trigger execution in this case ? Or will MYSQL_LOCK_IGNORE_FLUSH + save us from that ? + */ + table->reginfo.lock_type=TL_WRITE; + if (! ((*lock)= mysql_lock_tables(thd, &table, 1, + MYSQL_LOCK_IGNORE_FLUSH, ¬_used))) + { + VOID(pthread_mutex_lock(&LOCK_open)); + hash_delete(&open_cache,(byte*) table); + VOID(pthread_mutex_unlock(&LOCK_open)); + quick_rm_table(create_info->db_type, create_table->db, + table_case_name(create_info, create_table->table_name)); + DBUG_RETURN(0); + } + table->file->extra(HA_EXTRA_WRITE_CACHE); + DBUG_RETURN(table); +} + + int select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) { DBUG_ENTER("select_create::prepare"); unit= u; - table= create_table_from_items(thd, create_info, db, name, + table= create_table_from_items(thd, create_info, create_table, extra_fields, keys, &values, &lock); if (!table) DBUG_RETURN(-1); // abort() deletes table - if (table->fields < values.elements) + if (table->s->fields < values.elements) { - my_printf_error(ER_WRONG_VALUE_COUNT_ON_ROW, - ER(ER_WRONG_VALUE_COUNT_ON_ROW), - MYF(0),1); + my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1); DBUG_RETURN(-1); } /* First field to copy */ - field=table->field+table->fields - values.elements; + field= table->field+table->s->fields - values.elements; + + /* Mark all fields that are given values */ + for (Field **f= field ; *f ; f++) + (*f)->query_id= thd->query_id; /* Don't set timestamp if used */ table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET; table->next_number_field=table->found_next_number_field; - restore_record(table,default_values); // Get empty record + restore_record(table,s->default_values); // Get empty record thd->cuted_fields=0; - if (info.ignore || - info.handle_duplicates != DUP_ERROR) + if (info.ignore || info.handle_duplicates != DUP_ERROR) table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); - table->file->start_bulk_insert((ha_rows) 0); - DBUG_RETURN(0); + if (info.handle_duplicates == DUP_REPLACE) + { + if (!table->triggers || !table->triggers->has_delete_triggers()) + table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); + table->file->extra(HA_EXTRA_RETRIEVE_ALL_COLS); + } + if (!thd->prelocked_mode) + table->file->start_bulk_insert((ha_rows) 0); + thd->no_trans_update= 0; + thd->abort_on_warning= (!info.ignore && + (thd->variables.sql_mode & + (MODE_STRICT_TRANS_TABLES | + MODE_STRICT_ALL_TABLES))); + DBUG_RETURN(check_that_all_fields_are_given_values(thd, table, + table_list)); } void select_create::store_values(List<Item> &values) { - fill_record(field, values, 1); + fill_record_n_invoke_before_triggers(thd, field, values, 1, + table->triggers, TRG_EVENT_INSERT); } @@ -1866,6 +2775,7 @@ bool select_create::send_eof() else { table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); + table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE); VOID(pthread_mutex_lock(&LOCK_open)); mysql_unlock_tables(thd, lock); /* @@ -1873,13 +2783,13 @@ bool select_create::send_eof() Check if we can remove the following two rows. We should be able to just keep the table in the table cache. */ - if (!table->tmp_table) + if (!table->s->tmp_table) { - ulong version= table->version; + ulong version= table->s->version; hash_delete(&open_cache,(byte*) table); /* Tell threads waiting for refresh that something has happened */ if (version != refresh_version) - VOID(pthread_cond_broadcast(&COND_refresh)); + broadcast_refresh(); } lock=0; table=0; @@ -1899,19 +2809,20 @@ void select_create::abort() if (table) { table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); - enum db_type table_type=table->db_type; - if (!table->tmp_table) + table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE); + enum db_type table_type=table->s->db_type; + if (!table->s->tmp_table) { - ulong version= table->version; + ulong version= table->s->version; hash_delete(&open_cache,(byte*) table); if (!create_info->table_existed) - quick_rm_table(table_type, db, name); + quick_rm_table(table_type, create_table->db, create_table->table_name); /* Tell threads waiting for refresh that something has happened */ if (version != refresh_version) - VOID(pthread_cond_broadcast(&COND_refresh)); + broadcast_refresh(); } else if (!create_info->table_existed) - close_temporary_table(thd, db, name); + close_temporary_table(thd, create_table->db, create_table->table_name); table=0; } VOID(pthread_mutex_unlock(&LOCK_open)); @@ -1922,11 +2833,11 @@ void select_create::abort() Instansiate templates *****************************************************************************/ -#ifdef __GNUC__ +#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION template class List_iterator_fast<List_item>; #ifndef EMBEDDED_LIBRARY template class I_List<delayed_insert>; template class I_List_iterator<delayed_insert>; template class I_List<delayed_row>; #endif /* EMBEDDED_LIBRARY */ -#endif /* __GNUC__ */ +#endif /* HAVE_EXPLICIT_TEMPLATE_INSTANTIATION */ |