summaryrefslogtreecommitdiff
path: root/sql/sql_insert.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_insert.cc')
-rw-r--r--sql/sql_insert.cc135
1 files changed, 91 insertions, 44 deletions
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index 010ebc3b798..ced85159a53 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -21,12 +21,14 @@
#include "sql_acl.h"
static int check_null_fields(THD *thd,TABLE *entry);
+#ifndef EMBEDDED_LIBRARY
static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list);
static int write_delayed(THD *thd,TABLE *table, enum_duplicates dup,
char *query, uint query_length, bool log_on);
static void end_delayed_insert(THD *thd);
extern "C" pthread_handler_decl(handle_delayed_insert,arg);
static void unlink_blobs(register TABLE *table);
+#endif
/* Define to force use of my_malloc() if the allocated memory block is big */
@@ -39,9 +41,9 @@ static void unlink_blobs(register TABLE *table);
#endif
/*
- Check if insert fields are correct
- Updates table->time_stamp to point to timestamp field or 0, depending on
- if timestamp should be updated or not.
+ Check if insert fields are correct.
+ Sets table->timestamp_default_now/on_update_now to 0 o leaves it to point
+ to timestamp field, depending on if timestamp should be updated or not.
*/
int
@@ -62,7 +64,7 @@ check_insert_fields(THD *thd,TABLE *table,List<Item> &fields,
check_grant_all_columns(thd,INSERT_ACL,table))
return -1;
#endif
- table->time_stamp=0; // This is saved by caller
+ table->timestamp_default_now= table->timestamp_on_update_now= 0;
}
else
{ // Part field list
@@ -84,15 +86,15 @@ check_insert_fields(THD *thd,TABLE *table,List<Item> &fields,
if (setup_tables(&table_list) ||
setup_fields(thd, 0, &table_list,fields,1,0,0))
return -1;
+
if (thd->dupp_field)
{
my_error(ER_FIELD_SPECIFIED_TWICE,MYF(0), thd->dupp_field->field_name);
return -1;
}
- table->time_stamp=0;
if (table->timestamp_field && // Don't set timestamp if used
- table->timestamp_field->query_id != thd->query_id)
- table->time_stamp= table->timestamp_field->offset()+1;
+ table->timestamp_field->query_id == thd->query_id)
+ table->timestamp_default_now= table->timestamp_on_update_now= 0;
}
// For the values we need select_priv
#ifndef NO_EMBEDDED_ACCESS_CHECKS
@@ -109,7 +111,7 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list,
List<Item> &update_values,
enum_duplicates duplic)
{
- int error;
+ int error, res;
/*
log_on is about delayed inserts only.
By default, both logs are enabled (this won't cause problems if the server
@@ -124,7 +126,9 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list,
TABLE *table;
List_iterator_fast<List_item> its(values_list);
List_item *values;
- char *query=thd->query;
+#ifndef EMBEDDED_LIBRARY
+ char *query= thd->query;
+#endif
thr_lock_type lock_type = table_list->lock_type;
TABLE_LIST *insert_table_list= (TABLE_LIST*)
thd->lex->select_lex.table_list.first;
@@ -135,15 +139,20 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list,
if we are told to replace duplicates, the insert cannot be concurrent
delayed insert changed to regular in slave thread
*/
+#ifdef EMBEDDED_LIBRARY
+ if (lock_type == TL_WRITE_DELAYED)
+ lock_type=TL_WRITE;
+#else
if ((lock_type == TL_WRITE_DELAYED &&
((specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE)) ||
- thd->slave_thread || !max_insert_delayed_threads)) ||
+ thd->slave_thread || !thd->variables.max_insert_delayed_threads)) ||
(lock_type == TL_WRITE_CONCURRENT_INSERT && duplic == DUP_REPLACE) ||
(duplic == DUP_UPDATE))
lock_type=TL_WRITE;
+#endif
table_list->lock_type= lock_type;
- int res;
+#ifndef EMBEDDED_LIBRARY
if (lock_type == TL_WRITE_DELAYED)
{
if (thd->locked_tables)
@@ -172,10 +181,10 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list,
}
}
else
+#endif /* EMBEDDED_LIBRARY */
res= open_and_lock_tables(thd, table_list);
if (res)
DBUG_RETURN(-1);
- fix_tables_pointers(thd->lex->all_selects_list);
table= table_list->table;
thd->proc_info="init";
@@ -185,7 +194,8 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list,
if (duplic == DUP_UPDATE && !table->insert_values)
{
/* it should be allocated before Item::fix_fields() */
- table->insert_values=(byte *)alloc_root(&table->mem_root, table->rec_buff_length);
+ table->insert_values=
+ (byte *)alloc_root(&thd->mem_root, table->rec_buff_length);
if (!table->insert_values)
goto abort;
}
@@ -223,7 +233,7 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list,
Fill in the given fields and dump it to the table file
*/
- info.records=info.deleted=info.copied=0;
+ info.records= info.deleted= info.copied= info.updated= 0;
info.handle_duplicates=duplic;
info.update_fields=&update_fields;
info.update_values=&update_values;
@@ -289,12 +299,14 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list,
break;
}
}
+#ifndef EMBEDDED_LIBRARY
if (lock_type == TL_WRITE_DELAYED)
{
error=write_delayed(thd,table,duplic,query, thd->query_length, log_on);
query=0;
}
else
+#endif
error=write_record(table,&info);
if (error)
break;
@@ -315,6 +327,7 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list,
Now all rows are inserted. Time to update logs and sends response to
user
*/
+#ifndef EMBEDDED_LIBRARY
if (lock_type == TL_WRITE_DELAYED)
{
if (!error)
@@ -326,6 +339,7 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list,
query_cache_invalidate3(thd, table_list, 1);
}
else
+#endif
{
if (bulk_insert)
{
@@ -356,7 +370,7 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list,
For the transactional algorithm to work the invalidation must be
before binlog writing and ha_autocommit_...
*/
- if (info.copied || info.deleted)
+ if (info.copied || info.deleted || info.updated)
{
query_cache_invalidate3(thd, table_list, 1);
}
@@ -364,7 +378,8 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list,
transactional_table= table->file->has_transactions();
log_delayed= (transactional_table || table->tmp_table);
- if ((info.copied || info.deleted) && (error <= 0 || !transactional_table))
+ if ((info.copied || info.deleted || info.updated) &&
+ (error <= 0 || !transactional_table))
{
if (mysql_bin_log.is_open())
{
@@ -404,7 +419,10 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list,
goto abort;
if (values_list.elements == 1 && (!(thd->options & OPTION_WARNINGS) ||
!thd->cuted_fields))
- send_ok(thd,info.copied+info.deleted,id);
+ {
+ thd->row_count_func= info.copied+info.deleted+info.updated;
+ send_ok(thd, thd->row_count_func, id);
+ }
else
{
char buff[160];
@@ -414,16 +432,19 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list,
(ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
else
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
- (ulong) info.deleted, (ulong) thd->cuted_fields);
- ::send_ok(thd,info.copied+info.deleted,(ulonglong)id,buff);
+ (ulong) info.deleted+info.updated, (ulong) thd->cuted_fields);
+ thd->row_count_func= info.copied+info.deleted+info.updated;
+ ::send_ok(thd, thd->row_count_func, (ulonglong)id,buff);
}
free_underlaid_joins(thd, &thd->lex->select_lex);
table->insert_values=0;
DBUG_RETURN(0);
abort:
+#ifndef EMBEDDED_LIBRARY
if (lock_type == TL_WRITE_DELAYED)
end_delayed_insert(thd);
+#endif
free_underlaid_joins(thd, &thd->lex->select_lex);
table->insert_values=0;
DBUG_RETURN(-1);
@@ -517,12 +538,22 @@ int write_record(TABLE *table,COPY_INFO *info)
goto err;
if ((error=table->file->update_row(table->record[1],table->record[0])))
goto err;
- info->deleted++;
+ info->updated++;
break;
}
else /* DUP_REPLACE */
{
- if (last_uniq_key(table,key_nr))
+ /*
+ The manual defines the REPLACE semantics that it is either
+ an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
+ InnoDB do not function in the defined way if we allow MySQL
+ to convert the latter operation internally to an UPDATE.
+ We also should not perform this conversion if we have
+ timestamp field with ON UPDATE which is different from DEFAULT.
+ */
+ if (last_uniq_key(table,key_nr) &&
+ !table->file->referenced_by_foreign_key() &&
+ table->timestamp_default_now == table->timestamp_on_update_now)
{
if ((error=table->file->update_row(table->record[1],
table->record[0])))
@@ -588,6 +619,8 @@ static int check_null_fields(THD *thd __attribute__((unused)),
A thread is created for each table that one uses with the DELAYED attribute.
*****************************************************************************/
+#ifndef EMBEDDED_LIBRARY
+
class delayed_row :public ilink {
public:
char *record,*query;
@@ -595,7 +628,8 @@ public:
time_t start_time;
bool query_start_used,last_insert_id_used,insert_id_used, log_query;
ulonglong last_insert_id;
- ulong time_stamp;
+ ulong timestamp_default_now;
+ ulong timestamp_on_update_now;
uint query_length;
delayed_row(enum_duplicates dup_arg, bool log_query_arg)
@@ -633,7 +667,8 @@ public:
thd.command=COM_DELAYED_INSERT;
thd.lex->current_select= 0; /* for my_message_sql */
- bzero((char*) &thd.net,sizeof(thd.net)); // Safety
+ bzero((char*) &thd.net, sizeof(thd.net)); // Safety
+ bzero((char*) &table_list, sizeof(table_list)); // Safety
thd.system_thread= SYSTEM_THREAD_DELAYED_INSERT;
thd.host_or_ip= "";
bzero((char*) &info,sizeof(info));
@@ -728,7 +763,7 @@ static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list)
if (!(tmp=find_handler(thd,table_list)))
{
/* Don't create more than max_insert_delayed_threads */
- if (delayed_insert_threads >= max_insert_delayed_threads)
+ if (delayed_insert_threads >= thd->variables.max_insert_delayed_threads)
DBUG_RETURN(0);
thd->proc_info="Creating delayed handler";
pthread_mutex_lock(&LOCK_delayed_create);
@@ -874,6 +909,7 @@ TABLE *delayed_insert::get_local_table(THD* client_thd)
{
if (!(*field= (*org_field)->new_field(&client_thd->mem_root,copy)))
return 0;
+ (*field)->orig_table= copy; // Remove connection
(*field)->move_field(adjust_ptrs); // Point at copy->record[0]
if (*org_field == found_next_number_field)
(*field)->table->found_next_number_field= *field;
@@ -884,9 +920,10 @@ TABLE *delayed_insert::get_local_table(THD* client_thd)
if (table->timestamp_field)
{
/* Restore offset as this may have been reset in handle_inserts */
- copy->time_stamp=table->timestamp_field->offset()+1;
copy->timestamp_field=
(Field_timestamp*) copy->field[table->timestamp_field_offset];
+ copy->timestamp_field->unireg_check= table->timestamp_field->unireg_check;
+ copy->timestamp_field->set_timestamp_offsets();
}
/* _rowid is not used with delayed insert */
@@ -937,7 +974,8 @@ static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic,
row->last_insert_id_used= thd->last_insert_id_used;
row->insert_id_used= thd->insert_id_used;
row->last_insert_id= thd->last_insert_id;
- row->time_stamp= table->time_stamp;
+ row->timestamp_default_now= table->timestamp_default_now;
+ row->timestamp_on_update_now= table->timestamp_on_update_now;
di->rows.push_back(row);
di->stacked_inserts++;
@@ -1276,7 +1314,8 @@ bool delayed_insert::handle_inserts(void)
thd.last_insert_id=row->last_insert_id;
thd.last_insert_id_used=row->last_insert_id_used;
thd.insert_id_used=row->insert_id_used;
- table->time_stamp=row->time_stamp;
+ table->timestamp_default_now= row->timestamp_default_now;
+ table->timestamp_on_update_now= row->timestamp_on_update_now;
info.handle_duplicates= row->dup;
if (info.handle_duplicates == DUP_IGNORE ||
@@ -1309,8 +1348,15 @@ bool delayed_insert::handle_inserts(void)
pthread_mutex_lock(&mutex);
delete row;
- /* Let READ clients do something once in a while */
- if (group_count++ == max_rows)
+ /*
+ Let READ clients do something once in a while
+ We should however not break in the middle of a multi-line insert
+ if we have binary logging enabled as we don't want other commands
+ on this table until all entries has been processed
+ */
+ if (group_count++ >= max_rows && (row= rows.head()) &&
+ (!(row->log_query & using_bin_log) ||
+ row->query))
{
group_count=0;
if (stacked_inserts || tables_in_use) // Let these wait a while
@@ -1367,8 +1413,7 @@ bool delayed_insert::handle_inserts(void)
pthread_mutex_lock(&mutex);
DBUG_RETURN(1);
}
-
-
+#endif /* EMBEDDED_LIBRARY */
/***************************************************************************
Store records in INSERT ... SELECT *
@@ -1453,7 +1498,8 @@ void select_insert::send_error(uint errcode,const char *err)
error while inserting into a MyISAM table) we must write to the binlog (and
the error code will make the slave stop).
*/
- if ((info.copied || info.deleted) && !table->file->has_transactions())
+ if ((info.copied || info.deleted || info.updated) &&
+ !table->file->has_transactions())
{
if (last_insert_id)
thd->insert_id(last_insert_id); // For binary log
@@ -1466,7 +1512,7 @@ void select_insert::send_error(uint errcode,const char *err)
if (!table->tmp_table)
thd->options|=OPTION_STATUS_NO_TRANS_UPDATE;
}
- if (info.copied || info.deleted)
+ if (info.copied || info.deleted || info.updated)
{
query_cache_invalidate3(thd, table, 1);
}
@@ -1489,7 +1535,7 @@ bool select_insert::send_eof()
and ha_autocommit_...
*/
- if (info.copied || info.deleted)
+ if (info.copied || info.deleted || info.updated)
{
query_cache_invalidate3(thd, table, 1);
if (!(table->file->has_transactions() || table->tmp_table))
@@ -1522,8 +1568,9 @@ bool select_insert::send_eof()
(ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
else
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
- (ulong) info.deleted, (ulong) thd->cuted_fields);
- ::send_ok(thd,info.copied+info.deleted,last_insert_id,buff);
+ (ulong) info.deleted+info.updated, (ulong) thd->cuted_fields);
+ thd->row_count_func= info.copied+info.deleted+info.updated;
+ ::send_ok(thd, thd->row_count_func, last_insert_id, buff);
DBUG_RETURN(0);
}
@@ -1538,8 +1585,8 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
DBUG_ENTER("select_create::prepare");
unit= u;
- table=create_table_from_items(thd, create_info, db, name,
- extra_fields, keys, &values, &lock);
+ table= create_table_from_items(thd, create_info, db, name,
+ extra_fields, keys, &values, &lock);
if (!table)
DBUG_RETURN(-1); // abort() deletes table
@@ -1554,11 +1601,9 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
/* First field to copy */
field=table->field+table->fields - values.elements;
- if (table->timestamp_field) // Don't set timestamp if used
- {
- table->timestamp_field->set_time();
- table->time_stamp=0; // This should be saved
- }
+ /* Don't set timestamp if used */
+ table->timestamp_default_now= table->timestamp_on_update_now= 0;
+
table->next_number_field=table->found_next_number_field;
restore_record(table,default_values); // Get empty record
@@ -1644,7 +1689,9 @@ void select_create::abort()
#ifdef __GNUC__
template class List_iterator_fast<List_item>;
+#ifndef EMBEDDED_LIBRARY
template class I_List<delayed_insert>;
template class I_List_iterator<delayed_insert>;
template class I_List<delayed_row>;
-#endif
+#endif /* EMBEDDED_LIBRARY */
+#endif /* __GNUC__ */