diff options
Diffstat (limited to 'sql/sql_insert.cc')
-rw-r--r-- | sql/sql_insert.cc | 135 |
1 files changed, 91 insertions, 44 deletions
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index 010ebc3b798..ced85159a53 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -21,12 +21,14 @@ #include "sql_acl.h" static int check_null_fields(THD *thd,TABLE *entry); +#ifndef EMBEDDED_LIBRARY static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list); static int write_delayed(THD *thd,TABLE *table, enum_duplicates dup, char *query, uint query_length, bool log_on); static void end_delayed_insert(THD *thd); extern "C" pthread_handler_decl(handle_delayed_insert,arg); static void unlink_blobs(register TABLE *table); +#endif /* Define to force use of my_malloc() if the allocated memory block is big */ @@ -39,9 +41,9 @@ static void unlink_blobs(register TABLE *table); #endif /* - Check if insert fields are correct - Updates table->time_stamp to point to timestamp field or 0, depending on - if timestamp should be updated or not. + Check if insert fields are correct. + Sets table->timestamp_default_now/on_update_now to 0 o leaves it to point + to timestamp field, depending on if timestamp should be updated or not. */ int @@ -62,7 +64,7 @@ check_insert_fields(THD *thd,TABLE *table,List<Item> &fields, check_grant_all_columns(thd,INSERT_ACL,table)) return -1; #endif - table->time_stamp=0; // This is saved by caller + table->timestamp_default_now= table->timestamp_on_update_now= 0; } else { // Part field list @@ -84,15 +86,15 @@ check_insert_fields(THD *thd,TABLE *table,List<Item> &fields, if (setup_tables(&table_list) || setup_fields(thd, 0, &table_list,fields,1,0,0)) return -1; + if (thd->dupp_field) { my_error(ER_FIELD_SPECIFIED_TWICE,MYF(0), thd->dupp_field->field_name); return -1; } - table->time_stamp=0; if (table->timestamp_field && // Don't set timestamp if used - table->timestamp_field->query_id != thd->query_id) - table->time_stamp= table->timestamp_field->offset()+1; + table->timestamp_field->query_id == thd->query_id) + table->timestamp_default_now= table->timestamp_on_update_now= 0; } // For the values we need select_priv #ifndef NO_EMBEDDED_ACCESS_CHECKS @@ -109,7 +111,7 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, List<Item> &update_values, enum_duplicates duplic) { - int error; + int error, res; /* log_on is about delayed inserts only. By default, both logs are enabled (this won't cause problems if the server @@ -124,7 +126,9 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, TABLE *table; List_iterator_fast<List_item> its(values_list); List_item *values; - char *query=thd->query; +#ifndef EMBEDDED_LIBRARY + char *query= thd->query; +#endif thr_lock_type lock_type = table_list->lock_type; TABLE_LIST *insert_table_list= (TABLE_LIST*) thd->lex->select_lex.table_list.first; @@ -135,15 +139,20 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, if we are told to replace duplicates, the insert cannot be concurrent delayed insert changed to regular in slave thread */ +#ifdef EMBEDDED_LIBRARY + if (lock_type == TL_WRITE_DELAYED) + lock_type=TL_WRITE; +#else if ((lock_type == TL_WRITE_DELAYED && ((specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE)) || - thd->slave_thread || !max_insert_delayed_threads)) || + thd->slave_thread || !thd->variables.max_insert_delayed_threads)) || (lock_type == TL_WRITE_CONCURRENT_INSERT && duplic == DUP_REPLACE) || (duplic == DUP_UPDATE)) lock_type=TL_WRITE; +#endif table_list->lock_type= lock_type; - int res; +#ifndef EMBEDDED_LIBRARY if (lock_type == TL_WRITE_DELAYED) { if (thd->locked_tables) @@ -172,10 +181,10 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, } } else +#endif /* EMBEDDED_LIBRARY */ res= open_and_lock_tables(thd, table_list); if (res) DBUG_RETURN(-1); - fix_tables_pointers(thd->lex->all_selects_list); table= table_list->table; thd->proc_info="init"; @@ -185,7 +194,8 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, if (duplic == DUP_UPDATE && !table->insert_values) { /* it should be allocated before Item::fix_fields() */ - table->insert_values=(byte *)alloc_root(&table->mem_root, table->rec_buff_length); + table->insert_values= + (byte *)alloc_root(&thd->mem_root, table->rec_buff_length); if (!table->insert_values) goto abort; } @@ -223,7 +233,7 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, Fill in the given fields and dump it to the table file */ - info.records=info.deleted=info.copied=0; + info.records= info.deleted= info.copied= info.updated= 0; info.handle_duplicates=duplic; info.update_fields=&update_fields; info.update_values=&update_values; @@ -289,12 +299,14 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, break; } } +#ifndef EMBEDDED_LIBRARY if (lock_type == TL_WRITE_DELAYED) { error=write_delayed(thd,table,duplic,query, thd->query_length, log_on); query=0; } else +#endif error=write_record(table,&info); if (error) break; @@ -315,6 +327,7 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, Now all rows are inserted. Time to update logs and sends response to user */ +#ifndef EMBEDDED_LIBRARY if (lock_type == TL_WRITE_DELAYED) { if (!error) @@ -326,6 +339,7 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, query_cache_invalidate3(thd, table_list, 1); } else +#endif { if (bulk_insert) { @@ -356,7 +370,7 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, For the transactional algorithm to work the invalidation must be before binlog writing and ha_autocommit_... */ - if (info.copied || info.deleted) + if (info.copied || info.deleted || info.updated) { query_cache_invalidate3(thd, table_list, 1); } @@ -364,7 +378,8 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, transactional_table= table->file->has_transactions(); log_delayed= (transactional_table || table->tmp_table); - if ((info.copied || info.deleted) && (error <= 0 || !transactional_table)) + if ((info.copied || info.deleted || info.updated) && + (error <= 0 || !transactional_table)) { if (mysql_bin_log.is_open()) { @@ -404,7 +419,10 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, goto abort; if (values_list.elements == 1 && (!(thd->options & OPTION_WARNINGS) || !thd->cuted_fields)) - send_ok(thd,info.copied+info.deleted,id); + { + thd->row_count_func= info.copied+info.deleted+info.updated; + send_ok(thd, thd->row_count_func, id); + } else { char buff[160]; @@ -414,16 +432,19 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, (ulong) (info.records - info.copied), (ulong) thd->cuted_fields); else sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records, - (ulong) info.deleted, (ulong) thd->cuted_fields); - ::send_ok(thd,info.copied+info.deleted,(ulonglong)id,buff); + (ulong) info.deleted+info.updated, (ulong) thd->cuted_fields); + thd->row_count_func= info.copied+info.deleted+info.updated; + ::send_ok(thd, thd->row_count_func, (ulonglong)id,buff); } free_underlaid_joins(thd, &thd->lex->select_lex); table->insert_values=0; DBUG_RETURN(0); abort: +#ifndef EMBEDDED_LIBRARY if (lock_type == TL_WRITE_DELAYED) end_delayed_insert(thd); +#endif free_underlaid_joins(thd, &thd->lex->select_lex); table->insert_values=0; DBUG_RETURN(-1); @@ -517,12 +538,22 @@ int write_record(TABLE *table,COPY_INFO *info) goto err; if ((error=table->file->update_row(table->record[1],table->record[0]))) goto err; - info->deleted++; + info->updated++; break; } else /* DUP_REPLACE */ { - if (last_uniq_key(table,key_nr)) + /* + The manual defines the REPLACE semantics that it is either + an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in + InnoDB do not function in the defined way if we allow MySQL + to convert the latter operation internally to an UPDATE. + We also should not perform this conversion if we have + timestamp field with ON UPDATE which is different from DEFAULT. + */ + if (last_uniq_key(table,key_nr) && + !table->file->referenced_by_foreign_key() && + table->timestamp_default_now == table->timestamp_on_update_now) { if ((error=table->file->update_row(table->record[1], table->record[0]))) @@ -588,6 +619,8 @@ static int check_null_fields(THD *thd __attribute__((unused)), A thread is created for each table that one uses with the DELAYED attribute. *****************************************************************************/ +#ifndef EMBEDDED_LIBRARY + class delayed_row :public ilink { public: char *record,*query; @@ -595,7 +628,8 @@ public: time_t start_time; bool query_start_used,last_insert_id_used,insert_id_used, log_query; ulonglong last_insert_id; - ulong time_stamp; + ulong timestamp_default_now; + ulong timestamp_on_update_now; uint query_length; delayed_row(enum_duplicates dup_arg, bool log_query_arg) @@ -633,7 +667,8 @@ public: thd.command=COM_DELAYED_INSERT; thd.lex->current_select= 0; /* for my_message_sql */ - bzero((char*) &thd.net,sizeof(thd.net)); // Safety + bzero((char*) &thd.net, sizeof(thd.net)); // Safety + bzero((char*) &table_list, sizeof(table_list)); // Safety thd.system_thread= SYSTEM_THREAD_DELAYED_INSERT; thd.host_or_ip= ""; bzero((char*) &info,sizeof(info)); @@ -728,7 +763,7 @@ static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list) if (!(tmp=find_handler(thd,table_list))) { /* Don't create more than max_insert_delayed_threads */ - if (delayed_insert_threads >= max_insert_delayed_threads) + if (delayed_insert_threads >= thd->variables.max_insert_delayed_threads) DBUG_RETURN(0); thd->proc_info="Creating delayed handler"; pthread_mutex_lock(&LOCK_delayed_create); @@ -874,6 +909,7 @@ TABLE *delayed_insert::get_local_table(THD* client_thd) { if (!(*field= (*org_field)->new_field(&client_thd->mem_root,copy))) return 0; + (*field)->orig_table= copy; // Remove connection (*field)->move_field(adjust_ptrs); // Point at copy->record[0] if (*org_field == found_next_number_field) (*field)->table->found_next_number_field= *field; @@ -884,9 +920,10 @@ TABLE *delayed_insert::get_local_table(THD* client_thd) if (table->timestamp_field) { /* Restore offset as this may have been reset in handle_inserts */ - copy->time_stamp=table->timestamp_field->offset()+1; copy->timestamp_field= (Field_timestamp*) copy->field[table->timestamp_field_offset]; + copy->timestamp_field->unireg_check= table->timestamp_field->unireg_check; + copy->timestamp_field->set_timestamp_offsets(); } /* _rowid is not used with delayed insert */ @@ -937,7 +974,8 @@ static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic, row->last_insert_id_used= thd->last_insert_id_used; row->insert_id_used= thd->insert_id_used; row->last_insert_id= thd->last_insert_id; - row->time_stamp= table->time_stamp; + row->timestamp_default_now= table->timestamp_default_now; + row->timestamp_on_update_now= table->timestamp_on_update_now; di->rows.push_back(row); di->stacked_inserts++; @@ -1276,7 +1314,8 @@ bool delayed_insert::handle_inserts(void) thd.last_insert_id=row->last_insert_id; thd.last_insert_id_used=row->last_insert_id_used; thd.insert_id_used=row->insert_id_used; - table->time_stamp=row->time_stamp; + table->timestamp_default_now= row->timestamp_default_now; + table->timestamp_on_update_now= row->timestamp_on_update_now; info.handle_duplicates= row->dup; if (info.handle_duplicates == DUP_IGNORE || @@ -1309,8 +1348,15 @@ bool delayed_insert::handle_inserts(void) pthread_mutex_lock(&mutex); delete row; - /* Let READ clients do something once in a while */ - if (group_count++ == max_rows) + /* + Let READ clients do something once in a while + We should however not break in the middle of a multi-line insert + if we have binary logging enabled as we don't want other commands + on this table until all entries has been processed + */ + if (group_count++ >= max_rows && (row= rows.head()) && + (!(row->log_query & using_bin_log) || + row->query)) { group_count=0; if (stacked_inserts || tables_in_use) // Let these wait a while @@ -1367,8 +1413,7 @@ bool delayed_insert::handle_inserts(void) pthread_mutex_lock(&mutex); DBUG_RETURN(1); } - - +#endif /* EMBEDDED_LIBRARY */ /*************************************************************************** Store records in INSERT ... SELECT * @@ -1453,7 +1498,8 @@ void select_insert::send_error(uint errcode,const char *err) error while inserting into a MyISAM table) we must write to the binlog (and the error code will make the slave stop). */ - if ((info.copied || info.deleted) && !table->file->has_transactions()) + if ((info.copied || info.deleted || info.updated) && + !table->file->has_transactions()) { if (last_insert_id) thd->insert_id(last_insert_id); // For binary log @@ -1466,7 +1512,7 @@ void select_insert::send_error(uint errcode,const char *err) if (!table->tmp_table) thd->options|=OPTION_STATUS_NO_TRANS_UPDATE; } - if (info.copied || info.deleted) + if (info.copied || info.deleted || info.updated) { query_cache_invalidate3(thd, table, 1); } @@ -1489,7 +1535,7 @@ bool select_insert::send_eof() and ha_autocommit_... */ - if (info.copied || info.deleted) + if (info.copied || info.deleted || info.updated) { query_cache_invalidate3(thd, table, 1); if (!(table->file->has_transactions() || table->tmp_table)) @@ -1522,8 +1568,9 @@ bool select_insert::send_eof() (ulong) (info.records - info.copied), (ulong) thd->cuted_fields); else sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records, - (ulong) info.deleted, (ulong) thd->cuted_fields); - ::send_ok(thd,info.copied+info.deleted,last_insert_id,buff); + (ulong) info.deleted+info.updated, (ulong) thd->cuted_fields); + thd->row_count_func= info.copied+info.deleted+info.updated; + ::send_ok(thd, thd->row_count_func, last_insert_id, buff); DBUG_RETURN(0); } @@ -1538,8 +1585,8 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) DBUG_ENTER("select_create::prepare"); unit= u; - table=create_table_from_items(thd, create_info, db, name, - extra_fields, keys, &values, &lock); + table= create_table_from_items(thd, create_info, db, name, + extra_fields, keys, &values, &lock); if (!table) DBUG_RETURN(-1); // abort() deletes table @@ -1554,11 +1601,9 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) /* First field to copy */ field=table->field+table->fields - values.elements; - if (table->timestamp_field) // Don't set timestamp if used - { - table->timestamp_field->set_time(); - table->time_stamp=0; // This should be saved - } + /* Don't set timestamp if used */ + table->timestamp_default_now= table->timestamp_on_update_now= 0; + table->next_number_field=table->found_next_number_field; restore_record(table,default_values); // Get empty record @@ -1644,7 +1689,9 @@ void select_create::abort() #ifdef __GNUC__ template class List_iterator_fast<List_item>; +#ifndef EMBEDDED_LIBRARY template class I_List<delayed_insert>; template class I_List_iterator<delayed_insert>; template class I_List<delayed_row>; -#endif +#endif /* EMBEDDED_LIBRARY */ +#endif /* __GNUC__ */ |