summaryrefslogtreecommitdiff
path: root/sql/sql_insert.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_insert.cc')
-rw-r--r--sql/sql_insert.cc1346
1 files changed, 1346 insertions, 0 deletions
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
new file mode 100644
index 00000000000..a7a6a6c24c7
--- /dev/null
+++ b/sql/sql_insert.cc
@@ -0,0 +1,1346 @@
+/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+
+/* Insert of records */
+
+#include "mysql_priv.h"
+#include "sql_acl.h"
+
+static int check_null_fields(THD *thd,TABLE *entry);
+static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list);
+static int write_delayed(THD *thd,TABLE *table, enum_duplicates dup,
+ char *query, uint query_length, bool log_on);
+static void end_delayed_insert(THD *thd);
+static pthread_handler_decl(handle_delayed_insert,arg);
+static void unlink_blobs(register TABLE *table);
+
+/* Define to force use of my_malloc() if the allocated memory block is big */
+
+#ifndef HAVE_ALLOCA
+#define my_safe_alloca(size, min_length) my_alloca(size)
+#define my_safe_afree(ptr, size, min_length) my_afree(ptr)
+#else
+#define my_safe_alloca(size, min_length) ((size <= min_length) ? my_alloca(size) : my_malloc(size,MYF(0)))
+#define my_safe_afree(ptr, size, min_length) if (size > min_length) my_free(ptr,MYF(0))
+#endif
+
+
+/*
+ Check if insert fields are correct
+ Resets form->time_stamp if a timestamp value is set
+*/
+
+static int
+check_insert_fields(THD *thd,TABLE *table,List<Item> &fields,
+ List<Item> &values, ulong counter)
+{
+ if (fields.elements == 0 && values.elements != 0)
+ {
+ if (values.elements != table->fields)
+ {
+ my_printf_error(ER_WRONG_VALUE_COUNT_ON_ROW,
+ ER(ER_WRONG_VALUE_COUNT_ON_ROW),
+ MYF(0),counter);
+ return -1;
+ }
+ if (grant_option &&
+ check_grant_all_columns(thd,INSERT_ACL,table))
+ return -1;
+ table->time_stamp=0; // This should be saved
+ }
+ else
+ { // Part field list
+ if (fields.elements != values.elements)
+ {
+ my_printf_error(ER_WRONG_VALUE_COUNT_ON_ROW,
+ ER(ER_WRONG_VALUE_COUNT_ON_ROW),
+ MYF(0),counter);
+ return -1;
+ }
+ TABLE_LIST table_list;
+ bzero((char*) &table_list,sizeof(table_list));
+ table_list.name=table->table_name;
+ table_list.table=table;
+ table_list.grant=table->grant;
+
+ thd->dupp_field=0;
+ if (setup_fields(thd,&table_list,fields,1,0))
+ return -1;
+ if (thd->dupp_field)
+ {
+ my_error(ER_FIELD_SPECIFIED_TWICE,MYF(0), thd->dupp_field->field_name);
+ return -1;
+ }
+ if (table->timestamp_field && // Don't set timestamp if used
+ table->timestamp_field->query_id == thd->query_id)
+ table->time_stamp=0; // This should be saved
+ }
+ // For the values we need select_priv
+ table->grant.want_privilege=(SELECT_ACL & ~table->grant.privilege);
+ return 0;
+}
+
+
+int mysql_insert(THD *thd,TABLE_LIST *table_list, List<Item> &fields,
+ List<List_item> &values_list,enum_duplicates duplic,
+ thr_lock_type lock_type)
+{
+ int error;
+ bool log_on= ((thd->options & OPTION_UPDATE_LOG) ||
+ !(thd->master_access & PROCESS_ACL));
+ uint value_count;
+ uint save_time_stamp;
+ ulong counter = 1;
+ ulonglong id;
+ COPY_INFO info;
+ TABLE *table;
+ List_iterator<List_item> its(values_list);
+ List_item *values;
+ char *query=thd->query;
+ DBUG_ENTER("mysql_insert");
+
+ if (lock_type == TL_WRITE_DELAYED &&
+ (specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE)) ||
+ lock_type == TL_WRITE_CONCURRENT_INSERT && duplic == DUP_REPLACE)
+ lock_type=TL_WRITE;
+
+ if (lock_type == TL_WRITE_DELAYED)
+ {
+ if (thd->locked_tables)
+ {
+ if (find_locked_table(thd,
+ table_list->db ? table_list->db : thd->db,
+ table_list->real_name))
+ {
+ my_printf_error(ER_DELAYED_INSERT_TABLE_LOCKED,
+ ER(ER_DELAYED_INSERT_TABLE_LOCKED),
+ MYF(0), table_list->real_name);
+ DBUG_RETURN(-1);
+ }
+ }
+ if (!(table = delayed_get_table(thd,table_list)) && !thd->fatal_error)
+ table = open_ltable(thd,table_list,lock_type=thd->update_lock_default);
+ }
+ else
+ table = open_ltable(thd,table_list,lock_type);
+ if (!table)
+ DBUG_RETURN(-1);
+ thd->proc_info="init";
+ save_time_stamp=table->time_stamp;
+ values= its++;
+ if (check_insert_fields(thd,table,fields,*values,1) ||
+ setup_fields(thd,table_list,*values,0,0))
+ {
+ table->time_stamp=save_time_stamp;
+ goto abort;
+ }
+ value_count= values->elements;
+ while ((values = its++))
+ {
+ counter++;
+ if (values->elements != value_count)
+ {
+ my_printf_error(ER_WRONG_VALUE_COUNT_ON_ROW,
+ ER(ER_WRONG_VALUE_COUNT_ON_ROW),
+ MYF(0),counter);
+ table->time_stamp=save_time_stamp;
+ goto abort;
+ }
+ if (setup_fields(thd,table_list,*values,0,0))
+ {
+ table->time_stamp=save_time_stamp;
+ goto abort;
+ }
+ }
+ its.rewind ();
+ /*
+ ** Fill in the given fields and dump it to the table file
+ */
+
+ info.records=info.deleted=info.copied=0;
+ info.handle_duplicates=duplic;
+ // Don't count warnings for simple inserts
+ if (values_list.elements > 1 || (thd->options & OPTION_WARNINGS))
+ thd->count_cuted_fields = 1;
+ thd->cuted_fields = 0L;
+ table->next_number_field=table->found_next_number_field;
+
+ error=0;
+ id=0;
+ thd->proc_info="update";
+ while ((values = its++))
+ {
+ if (fields.elements || !value_count)
+ {
+ restore_record(table,2); // Get empty record
+ if (fill_record(fields,*values) || check_null_fields(thd,table))
+ {
+ if (values_list.elements != 1)
+ {
+ info.records++;
+ continue;
+ }
+ error=1;
+ break;
+ }
+ }
+ else
+ {
+ table->record[0][0]=table->record[2][0]; // Fix delete marker
+ if (fill_record(table->field,*values))
+ {
+ if (values_list.elements != 1)
+ {
+ info.records++;
+ continue;
+ }
+ error=1;
+ break;
+ }
+ }
+ if (lock_type == TL_WRITE_DELAYED)
+ {
+ error=write_delayed(thd,table,duplic,query, thd->query_length, log_on);
+ query=0;
+ }
+ else
+ error=write_record(table,&info);
+ if (error)
+ break;
+ /*
+ If auto_increment values are used, save the first one
+ for LAST_INSERT_ID() and for the update log.
+ We can't use insert_id() as we don't want to touch the
+ last_insert_id_used flag.
+ */
+ if (! id && thd->insert_id_used)
+ { // Get auto increment value
+ id= thd->last_insert_id;
+ }
+ }
+ if (lock_type == TL_WRITE_DELAYED)
+ {
+ id=0; // No auto_increment id
+ info.copied=values_list.elements;
+ end_delayed_insert(thd);
+ }
+ else
+ {
+ if (id && values_list.elements != 1)
+ thd->insert_id(id); // For update log
+ else if (table->next_number_field)
+ id=table->next_number_field->val_int(); // Return auto_increment value
+ if (info.copied || info.deleted)
+ {
+ mysql_update_log.write(thd->query, thd->query_length);
+ Query_log_event qinfo(thd, thd->query);
+ mysql_bin_log.write(&qinfo);
+ }
+ error=ha_autocommit_or_rollback(thd,error);
+ if (thd->lock)
+ {
+ mysql_unlock_tables(thd, thd->lock);
+ thd->lock=0;
+ }
+ }
+ thd->proc_info="end";
+ table->time_stamp=save_time_stamp; // Restore auto timestamp ptr
+ table->next_number_field=0;
+ thd->count_cuted_fields=0;
+ thd->next_insert_id=0; // Reset this if wrongly used
+
+ if (error)
+ goto abort;
+
+ if (values_list.elements == 1 && (!(thd->options & OPTION_WARNINGS) ||
+ !thd->cuted_fields))
+ send_ok(&thd->net,info.copied+info.deleted,id);
+ else {
+ char buff[160];
+ if (duplic == DUP_IGNORE)
+ sprintf(buff,ER(ER_INSERT_INFO),info.records,info.records-info.copied,
+ thd->cuted_fields);
+ else
+ sprintf(buff,ER(ER_INSERT_INFO),info.records,info.deleted,
+ thd->cuted_fields);
+ ::send_ok(&thd->net,info.copied+info.deleted,0L,buff);
+ }
+ DBUG_RETURN(0);
+
+abort:
+ if (lock_type == TL_WRITE_DELAYED)
+ end_delayed_insert(thd);
+ DBUG_RETURN(-1);
+}
+
+
+ /* Check if there is more uniq keys after field */
+
+static int last_uniq_key(TABLE *table,uint keynr)
+{
+ while (++keynr < table->keys)
+ if (table->key_info[keynr].flags & HA_NOSAME)
+ return 0;
+ return 1;
+}
+
+
+/*
+** Write a record to table with optional deleting of conflicting records
+*/
+
+
+int write_record(TABLE *table,COPY_INFO *info)
+{
+ int error;
+ char *key=0;
+
+ info->records++;
+ if (info->handle_duplicates == DUP_REPLACE)
+ {
+ while ((error=table->file->write_row(table->record[0])))
+ {
+ if (error != HA_WRITE_SKIPP)
+ goto err;
+ uint key_nr;
+ if ((int) (key_nr = table->file->get_dup_key(error)) < 0)
+ {
+ error=HA_WRITE_SKIPP; /* Database can't find key */
+ goto err;
+ }
+ if (table->file->option_flag() & HA_DUPP_POS)
+ {
+ if (table->file->rnd_pos(table->record[1],table->file->dupp_ref))
+ goto err;
+ }
+ else
+ {
+ if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not neaded with NISAM */
+ {
+ error=my_errno;
+ goto err;
+ }
+
+ if (!key)
+ {
+ if (!(key=(char*) my_safe_alloca(table->max_unique_length,
+ MAX_KEY_LENGTH)))
+ {
+ error=ENOMEM;
+ goto err;
+ }
+ }
+ key_copy((byte*) key,table,key_nr,0);
+ if ((error=(table->file->index_read_idx(table->record[1],key_nr,
+ (byte*) key,0,
+ HA_READ_KEY_EXACT))))
+ goto err;
+ }
+ if (last_uniq_key(table,key_nr))
+ {
+ if ((error=table->file->update_row(table->record[1],table->record[0])))
+ goto err;
+ info->deleted++;
+ break; /* Update logfile and count */
+ }
+ else if ((error=table->file->delete_row(table->record[1])))
+ goto err;
+ info->deleted++;
+ }
+ info->copied++;
+ }
+ else if ((error=table->file->write_row(table->record[0])))
+ {
+ if (info->handle_duplicates != DUP_IGNORE ||
+ (error != HA_ERR_FOUND_DUPP_KEY && error != HA_ERR_FOUND_DUPP_UNIQUE))
+ goto err;
+ }
+ else
+ info->copied++;
+ if (key)
+ my_safe_afree(key,table->max_unique_length,MAX_KEY_LENGTH);
+ return 0;
+
+err:
+ if (key)
+ my_afree(key);
+ table->file->print_error(error,MYF(0));
+ return 1;
+}
+
+
+/******************************************************************************
+ Check that all fields with arn't null_fields are used
+ if DONT_USE_DEFAULT_FIELDS isn't defined use default value for not
+ set fields.
+******************************************************************************/
+
+static int check_null_fields(THD *thd __attribute__((unused)),
+ TABLE *entry __attribute__((unused)))
+{
+#ifdef DONT_USE_DEFAULT_FIELDS
+ for (Field **field=entry->field ; *field ; field++)
+ {
+ if ((*field)->query_id != thd->query_id && !(*field)->maybe_null() &&
+ *field != entry->timestamp_field &&
+ *field != entry->next_number_field)
+ {
+ my_printf_error(ER_BAD_NULL_ERROR, ER(ER_BAD_NULL_ERROR),MYF(0),
+ (*field)->field_name);
+ return 1;
+ }
+ }
+#endif
+ return 0;
+}
+
+/*****************************************************************************
+** Handling of delayed inserts
+**
+** A thread is created for each table that one uses with the DELAYED
+** attribute.
+*****************************************************************************/
+
+class delayed_row :public ilink {
+public:
+ char *record,*query;
+ enum_duplicates dup;
+ time_t start_time;
+ bool query_start_used,last_insert_id_used,insert_id_used,log_query;
+ ulonglong last_insert_id;
+ ulong time_stamp;
+ uint query_length;
+
+ delayed_row(enum_duplicates dup_arg, bool log_query_arg)
+ :record(0),query(0),dup(dup_arg),log_query(log_query_arg) {}
+ ~delayed_row()
+ {
+ x_free(record);
+ }
+};
+
+
+class delayed_insert :public ilink {
+ uint locks_in_memory;
+public:
+ THD thd;
+ TABLE *table;
+ pthread_mutex_t mutex;
+ pthread_cond_t cond,cond_client;
+ volatile uint tables_in_use,stacked_inserts;
+ volatile bool status,dead;
+ COPY_INFO info;
+ I_List<delayed_row> rows;
+ uint group_count;
+ TABLE_LIST *table_list; // Argument
+
+ delayed_insert()
+ :locks_in_memory(0),
+ table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0),
+ group_count(0)
+ {
+ thd.user=thd.host=(char*) "";
+ thd.current_tablenr=0;
+ thd.version=refresh_version;
+ thd.command=COM_DELAYED_INSERT;
+
+ bzero((char*) &thd.net,sizeof(thd.net)); // Safety
+ thd.system_thread=1;
+ bzero((char*) &info,sizeof(info));
+ pthread_mutex_init(&mutex,NULL);
+ pthread_cond_init(&cond,NULL);
+ pthread_cond_init(&cond_client,NULL);
+ VOID(pthread_mutex_lock(&LOCK_thread_count));
+ delayed_insert_threads++;
+ VOID(pthread_mutex_unlock(&LOCK_thread_count));
+ }
+ ~delayed_insert()
+ {
+ /* The following is not really neaded, but just for safety */
+ delayed_row *row;
+ while ((row=rows.get()))
+ delete row;
+ pthread_mutex_destroy(&mutex);
+ if (table)
+ close_thread_tables(&thd);
+ VOID(pthread_mutex_lock(&LOCK_thread_count));
+ thd.unlink(); // Must be unlinked under lock
+ x_free(thd.query);
+ thd.user=thd.host=0;
+ thread_count--;
+ delayed_insert_threads--;
+ VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */
+ VOID(pthread_mutex_unlock(&LOCK_thread_count));
+ }
+
+ /* The following is for checking when we can delete ourselves */
+ inline void lock()
+ {
+ locks_in_memory++; // Assume LOCK_delay_insert
+ }
+ void unlock()
+ {
+ pthread_mutex_lock(&LOCK_delayed_insert);
+ if (!--locks_in_memory)
+ {
+ pthread_mutex_lock(&mutex);
+ if (thd.killed && ! stacked_inserts && ! tables_in_use)
+ {
+ pthread_cond_signal(&cond);
+ status=1;
+ }
+ pthread_mutex_unlock(&mutex);
+ }
+ pthread_mutex_unlock(&LOCK_delayed_insert);
+ }
+ inline uint lock_count() { return locks_in_memory; }
+
+ TABLE* get_local_table(THD* client_thd);
+ bool handle_inserts(void);
+};
+
+
+I_List<delayed_insert> delayed_threads;
+
+
+delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
+{
+ thd->proc_info="waiting for delay_list";
+ pthread_mutex_lock(&LOCK_delayed_insert); // Protect master list
+ I_List_iterator<delayed_insert> it(delayed_threads);
+ delayed_insert *tmp;
+ while ((tmp=it++))
+ {
+ if (!strcmp(tmp->thd.db,table_list->db) &&
+ !strcmp(table_list->real_name,tmp->table->real_name))
+ {
+ tmp->lock();
+ break;
+ }
+ }
+ pthread_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
+ return tmp;
+}
+
+
+static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list)
+{
+ int error;
+ delayed_insert *tmp;
+ DBUG_ENTER("delayed_get_table");
+
+ if (!table_list->db)
+ table_list->db=thd->db;
+
+ /* no match; create a new thread to handle the table */
+ if (!(tmp=find_handler(thd,table_list)))
+ {
+ thd->proc_info="Creating delayed handler";
+ pthread_mutex_lock(&LOCK_delayed_create);
+ if (!(tmp=find_handler(thd,table_list))) // Was just created
+ {
+ if (!(tmp=new delayed_insert()))
+ {
+ thd->fatal_error=1;
+ my_error(ER_OUTOFMEMORY,MYF(0),sizeof(delayed_insert));
+ pthread_mutex_unlock(&LOCK_delayed_create);
+ DBUG_RETURN(0);
+ }
+ if (!(tmp->thd.db=my_strdup(table_list->db,MYF(MY_WME))) ||
+ !(tmp->thd.query=my_strdup(table_list->real_name,MYF(MY_FAE))))
+ {
+ delete tmp;
+ thd->fatal_error=1;
+ my_error(ER_OUT_OF_RESOURCES,MYF(0));
+ pthread_mutex_unlock(&LOCK_delayed_create);
+ DBUG_RETURN(0);
+ }
+ tmp->table_list=table_list; // Needed to open table
+ tmp->lock();
+ pthread_mutex_lock(&LOCK_thread_count);
+ thread_count++;
+ pthread_mutex_unlock(&LOCK_thread_count);
+ pthread_mutex_lock(&tmp->mutex);
+ if ((error=pthread_create(&tmp->thd.real_id,&connection_attrib,
+ handle_delayed_insert,(void*) tmp)))
+ {
+ DBUG_PRINT("error",
+ ("Can't create thread to handle delayed insert (error %d)",
+ error));
+ pthread_mutex_unlock(&tmp->mutex);
+ tmp->unlock();
+ delete tmp;
+ thd->fatal_error=1;
+ pthread_mutex_unlock(&LOCK_delayed_create);
+ net_printf(&thd->net,ER_CANT_CREATE_THREAD,error);
+ DBUG_RETURN(0);
+ }
+
+ /* Wait until table is open */
+ thd->proc_info="waiting for handler open";
+ while (!tmp->thd.killed && !tmp->table && !thd->killed)
+ {
+ pthread_cond_wait(&tmp->cond_client,&tmp->mutex);
+ }
+ pthread_mutex_unlock(&tmp->mutex);
+ thd->proc_info="got old table";
+ if (tmp->thd.killed)
+ {
+ if (tmp->thd.fatal_error)
+ {
+ /* Copy error message and abort */
+ thd->fatal_error=1;
+ strmov(thd->net.last_error,tmp->thd.net.last_error);
+ thd->net.last_errno=thd->net.last_errno;
+ }
+ tmp->unlock();
+ pthread_mutex_unlock(&LOCK_delayed_create);
+ DBUG_RETURN(0); // Continue with normal insert
+ }
+ if (thd->killed)
+ {
+ tmp->unlock();
+ pthread_mutex_unlock(&LOCK_delayed_create);
+ DBUG_RETURN(0);
+ }
+ }
+ pthread_mutex_unlock(&LOCK_delayed_create);
+ }
+
+ pthread_mutex_lock(&tmp->mutex);
+ TABLE *table=tmp->get_local_table(thd);
+ pthread_mutex_unlock(&tmp->mutex);
+ tmp->unlock();
+ if (table)
+ thd->di=tmp;
+ else if (tmp->thd.fatal_error)
+ thd->fatal_error=1;
+ DBUG_RETURN((table_list->table=table));
+}
+
+
+/*
+ As we can't let many threads modify the same TABLE structure, we create
+ an own structure for each tread. This includes a row buffer to save the
+ column values and new fields that points to the new row buffer.
+ The memory is allocated in the client thread and is freed automaticly.
+*/
+
+TABLE *delayed_insert::get_local_table(THD* client_thd)
+{
+ my_ptrdiff_t adjust_ptrs;
+ Field **field,**org_field;
+ TABLE *copy;
+
+ /* First request insert thread to get a lock */
+ status=1;
+ tables_in_use++;
+ if (!thd.lock) // Table is not locked
+ {
+ client_thd->proc_info="waiting for handler lock";
+ pthread_cond_signal(&cond); // Tell handler to lock table
+ while (!dead && !thd.lock && ! client_thd->killed)
+ {
+ pthread_cond_wait(&cond_client,&mutex);
+ }
+ client_thd->proc_info="got handler lock";
+ if (client_thd->killed)
+ goto error;
+ if (dead)
+ {
+ strmov(client_thd->net.last_error,thd.net.last_error);
+ client_thd->net.last_errno=thd.net.last_errno;
+ goto error;
+ }
+ }
+
+ client_thd->proc_info="allocating local table";
+ copy= (TABLE*) sql_alloc(sizeof(*copy)+
+ (table->fields+1)*sizeof(Field**)+
+ table->reclength);
+ if (!copy)
+ goto error;
+ *copy= *table;
+ bzero((char*) &copy->name_hash,sizeof(copy->name_hash)); // No name hashing
+ /* We don't need to change the file handler here */
+
+ field=copy->field=(Field**) (copy+1);
+ copy->record[0]=(byte*) (field+table->fields+1);
+ memcpy((char*) copy->record[0],(char*) table->record[0],table->reclength);
+
+ /* Make a copy of all fields */
+
+ adjust_ptrs=PTR_BYTE_DIFF(copy->record[0],table->record[0]);
+
+ for (org_field=table->field ; *org_field ; org_field++,field++)
+ {
+ if (!(*field= (*org_field)->new_field(copy)))
+ return 0;
+ (*field)->move_field(adjust_ptrs); // Point at copy->record[0]
+ }
+ *field=0;
+
+ /* Adjust timestamp */
+ if (table->timestamp_field)
+ {
+ /* Restore offset as this may have been reset in handle_inserts */
+ copy->time_stamp=table->timestamp_field->offset()+1;
+ copy->timestamp_field=
+ (Field_timestamp*) copy->field[table->timestamp_field_offset];
+ }
+
+ /* _rowid is not used with delayed insert */
+ copy->rowid_field=0;
+ return copy;
+
+ /* Got fatal error */
+ error:
+ tables_in_use--;
+ status=1;
+ pthread_cond_signal(&cond); // Inform thread about abort
+ return 0;
+}
+
+
+/* Put a question in queue */
+
+static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic,
+ char *query, uint query_length, bool log_on)
+{
+ delayed_row *row=0;
+ delayed_insert *di=thd->di;
+ DBUG_ENTER("write_delayed");
+
+ thd->proc_info="waiting for handler insert";
+ pthread_mutex_lock(&di->mutex);
+ while (di->stacked_inserts >= delayed_queue_size && !thd->killed)
+ pthread_cond_wait(&di->cond_client,&di->mutex);
+ thd->proc_info="storing row into queue";
+
+ if (thd->killed || !(row= new delayed_row(duplic, log_on)))
+ goto err;
+
+ if (!query)
+ query_length=0;
+ if (!(row->record= (char*) my_malloc(table->reclength+query_length+1,
+ MYF(MY_WME))))
+ goto err;
+ memcpy(row->record,table->record[0],table->reclength);
+ if (query_length)
+ {
+ row->query=row->record+table->reclength;
+ memcpy(row->query,query,query_length+1);
+ }
+ row->query_length= query_length;
+ row->start_time= thd->start_time;
+ row->query_start_used= thd->query_start_used;
+ row->last_insert_id_used= thd->last_insert_id_used;
+ row->insert_id_used= thd->insert_id_used;
+ row->last_insert_id= thd->last_insert_id;
+ row->time_stamp= table->time_stamp;
+
+ di->rows.push_back(row);
+ di->stacked_inserts++;
+ di->status=1;
+ if (table->blob_fields)
+ unlink_blobs(table);
+ pthread_cond_signal(&di->cond);
+
+ thread_safe_increment(delayed_rows_in_use,&LOCK_delayed_status);
+ pthread_mutex_unlock(&di->mutex);
+ DBUG_RETURN(0);
+
+ err:
+ delete row;
+ pthread_mutex_unlock(&di->mutex);
+ DBUG_RETURN(1);
+}
+
+
+static void end_delayed_insert(THD *thd)
+{
+ delayed_insert *di=thd->di;
+ pthread_mutex_lock(&di->mutex);
+ if (!--di->tables_in_use || di->thd.killed)
+ { // Unlock table
+ di->status=1;
+ pthread_cond_signal(&di->cond);
+ }
+ pthread_mutex_unlock(&di->mutex);
+}
+
+
+/* We kill all delayed threads when doing flush-tables */
+
+void kill_delayed_threads(void)
+{
+ VOID(pthread_mutex_lock(&LOCK_delayed_insert)); // For unlink from list
+
+ I_List_iterator<delayed_insert> it(delayed_threads);
+ delayed_insert *tmp;
+ while ((tmp=it++))
+ {
+ pthread_mutex_lock(&tmp->mutex);
+ tmp->thd.killed=1;
+ if (tmp->thd.mysys_var)
+ {
+ pthread_mutex_lock(&tmp->thd.mysys_var->mutex);
+ if (tmp->thd.mysys_var->current_mutex)
+ {
+ if (&tmp->mutex != tmp->thd.mysys_var->current_mutex)
+ pthread_mutex_lock(tmp->thd.mysys_var->current_mutex);
+ pthread_cond_broadcast(tmp->thd.mysys_var->current_cond);
+ if (&tmp->mutex != tmp->thd.mysys_var->current_mutex)
+ pthread_mutex_unlock(tmp->thd.mysys_var->current_mutex);
+ }
+ pthread_mutex_unlock(&tmp->thd.mysys_var->mutex);
+ }
+ pthread_mutex_unlock(&tmp->mutex);
+ }
+ VOID(pthread_mutex_unlock(&LOCK_delayed_insert)); // For unlink from list
+}
+
+
+/*
+ * Create a new delayed insert thread
+*/
+
+static pthread_handler_decl(handle_delayed_insert,arg)
+{
+ delayed_insert *di=(delayed_insert*) arg;
+ THD *thd= &di->thd;
+
+ pthread_detach_this_thread();
+ /* Add thread to THD list so that's it's visible in 'show processlist' */
+ pthread_mutex_lock(&LOCK_thread_count);
+ thd->thread_id=thread_id++;
+ threads.append(thd);
+ pthread_mutex_unlock(&LOCK_thread_count);
+
+ pthread_mutex_lock(&di->mutex);
+#ifndef __WIN__ /* Win32 calls this in pthread_create */
+ if (my_thread_init())
+ {
+ strmov(thd->net.last_error,ER(thd->net.last_errno=ER_OUT_OF_RESOURCES));
+ goto end;
+ }
+#endif
+
+ DBUG_ENTER("handle_delayed_insert");
+ if (init_thr_lock() ||
+ my_pthread_setspecific_ptr(THR_THD, thd) ||
+ my_pthread_setspecific_ptr(THR_NET, &thd->net))
+ {
+ strmov(thd->net.last_error,ER(thd->net.last_errno=ER_OUT_OF_RESOURCES));
+ goto end;
+ }
+ thd->mysys_var=my_thread_var;
+ thd->dbug_thread_id=my_thread_id();
+#ifndef __WIN__
+ sigset_t set;
+ VOID(sigemptyset(&set)); // Get mask in use
+ VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
+#endif
+
+ /* open table */
+
+ if (!(di->table=open_ltable(thd,di->table_list,TL_WRITE_DELAYED)))
+ {
+ thd->fatal_error=1; // Abort waiting inserts
+ goto end;
+ }
+ di->table->copy_blobs=1;
+
+ /* One can now use this */
+ pthread_mutex_lock(&LOCK_delayed_insert);
+ delayed_threads.append(di);
+ pthread_mutex_unlock(&LOCK_delayed_insert);
+
+ /* Tell client that the thread is initialized */
+ pthread_cond_signal(&di->cond_client);
+
+ /* Now wait until we get an insert or lock to handle */
+ /* We will not abort as long as a client thread uses this thread */
+
+ for (;;)
+ {
+ if (thd->killed)
+ {
+ uint lock_count;
+ /*
+ Remove this from delay insert list so that no one can request a
+ table from this
+ */
+ pthread_mutex_unlock(&di->mutex);
+ pthread_mutex_lock(&LOCK_delayed_insert);
+ di->unlink();
+ lock_count=di->lock_count();
+ pthread_mutex_unlock(&LOCK_delayed_insert);
+ pthread_mutex_lock(&di->mutex);
+ if (!lock_count && !di->tables_in_use && !di->stacked_inserts)
+ break; // Time to die
+ }
+
+ if (!di->status && !di->stacked_inserts)
+ {
+ struct timespec abstime;
+#if defined(HAVE_TIMESPEC_TS_SEC)
+ abstime.ts_sec=time((time_t*) 0)+(time_t) delayed_insert_timeout;
+ abstime.ts_nsec=0;
+#elif defined(__WIN__)
+ abstime.tv_sec=time((time_t*) 0)+(time_t) delayed_insert_timeout;
+ abstime.tv_nsec=0;
+#else
+ struct timeval tv;
+ gettimeofday(&tv,0);
+ abstime.tv_sec=tv.tv_sec+(time_t) delayed_insert_timeout;
+ abstime.tv_nsec=tv.tv_usec*1000;
+#endif
+
+ /* Information for pthread_kill */
+ pthread_mutex_lock(&di->thd.mysys_var->mutex);
+ di->thd.mysys_var->current_mutex= &di->mutex;
+ di->thd.mysys_var->current_cond= &di->cond;
+ pthread_mutex_unlock(&di->thd.mysys_var->mutex);
+ di->thd.proc_info=0;
+
+ for ( ; ;)
+ {
+ int error;
+#if (defined(HAVE_BROKEN_COND_TIMEDWAIT) || defined(HAVE_LINUXTHREADS))
+ error=pthread_cond_wait(&di->cond,&di->mutex);
+#else
+ error=pthread_cond_timedwait(&di->cond,&di->mutex,&abstime);
+#ifdef EXTRA_DEBUG
+ if (error && error != EINTR)
+ {
+ fprintf(stderr, "Got error %d from pthread_cond_timedwait\n",error);
+ DBUG_PRINT("error",("Got error %d from pthread_cond_timedwait",
+ error));
+ }
+#endif
+#endif
+ if (thd->killed || di->status)
+ break;
+ if (error == ETIME || error == ETIMEDOUT)
+ {
+ thd->killed=1;
+ break;
+ }
+ }
+ pthread_mutex_lock(&di->thd.mysys_var->mutex);
+ di->thd.mysys_var->current_mutex= 0;
+ di->thd.mysys_var->current_cond= 0;
+ pthread_mutex_unlock(&di->thd.mysys_var->mutex);
+ }
+
+ if (di->tables_in_use && ! thd->lock)
+ {
+ /* request for new delayed insert */
+ if (!(thd->lock=mysql_lock_tables(thd,&di->table,1)))
+ {
+ di->dead=thd->killed=1; // Fatal error
+ }
+ pthread_cond_broadcast(&di->cond_client);
+ }
+ if (di->stacked_inserts)
+ {
+ if (di->handle_inserts())
+ {
+ di->dead=thd->killed=1; // Some fatal error
+ }
+ }
+ di->status=0;
+ if (!di->stacked_inserts && !di->tables_in_use && thd->lock)
+ {
+ /* No one is doing a insert delayed;
+ Unlock it so that other threads can use it */
+ MYSQL_LOCK *lock=thd->lock;
+ thd->lock=0;
+ pthread_mutex_unlock(&di->mutex);
+ mysql_unlock_tables(thd, lock);
+ di->group_count=0;
+ pthread_mutex_lock(&di->mutex);
+ }
+ if (di->tables_in_use)
+ pthread_cond_broadcast(&di->cond_client); // If waiting clients
+ }
+
+end:
+ /*
+ di should be unlinked from the thread handler list and have no active
+ clients
+ */
+
+ close_thread_tables(thd); // Free the table
+ di->table=0;
+ di->dead=thd->killed=1; // If error
+ pthread_cond_broadcast(&di->cond_client); // Safety
+ pthread_mutex_unlock(&di->mutex);
+
+ pthread_mutex_lock(&LOCK_delayed_create); // Because of delayed_get_table
+ pthread_mutex_lock(&LOCK_delayed_insert);
+ delete di;
+ pthread_mutex_unlock(&LOCK_delayed_insert);
+ pthread_mutex_unlock(&LOCK_delayed_create);
+
+ my_thread_end();
+ pthread_exit(0);
+ DBUG_RETURN(0);
+}
+
+
+/* Remove pointers from temporary fields to allocated values */
+
+static void unlink_blobs(register TABLE *table)
+{
+ for (Field **ptr=table->field ; *ptr ; ptr++)
+ {
+ if ((*ptr)->flags & BLOB_FLAG)
+ ((Field_blob *) (*ptr))->clear_temporary();
+ }
+}
+
+/* Free blobs stored in current row */
+
+static void free_delayed_insert_blobs(register TABLE *table)
+{
+ for (Field **ptr=table->field ; *ptr ; ptr++)
+ {
+ if ((*ptr)->flags & BLOB_FLAG)
+ {
+ char *str;
+ ((Field_blob *) (*ptr))->get_ptr(&str);
+ my_free(str,MYF(MY_ALLOW_ZERO_PTR));
+ ((Field_blob *) (*ptr))->reset();
+ }
+ }
+}
+
+
+bool delayed_insert::handle_inserts(void)
+{
+ int error;
+ uint max_rows;
+ DBUG_ENTER("handle_inserts");
+
+ /* Allow client to insert new rows */
+ pthread_mutex_unlock(&mutex);
+
+ table->next_number_field=table->found_next_number_field;
+
+ thd.proc_info="upgrading lock";
+ if (thr_upgrade_write_delay_lock(*thd.lock->locks))
+ {
+ /* This can only happen if thread is killed by shutdown */
+ sql_print_error(ER(ER_DELAYED_CANT_CHANGE_LOCK),table->real_name);
+ goto err;
+ }
+
+ thd.proc_info="insert";
+ max_rows=delayed_insert_limit;
+ if (thd.killed || table->version != refresh_version)
+ {
+ thd.killed=1;
+ max_rows= ~0; // Do as much as possible
+ }
+
+ table->file->extra(HA_EXTRA_WRITE_CACHE);
+ pthread_mutex_lock(&mutex);
+ delayed_row *row;
+ while ((row=rows.get()))
+ {
+ stacked_inserts--;
+ pthread_mutex_unlock(&mutex);
+ memcpy(table->record[0],row->record,table->reclength);
+
+ thd.start_time=row->start_time;
+ thd.query_start_used=row->query_start_used;
+ thd.last_insert_id=row->last_insert_id;
+ thd.last_insert_id_used=row->last_insert_id_used;
+ thd.insert_id_used=row->insert_id_used;
+ table->time_stamp=row->time_stamp;
+
+ info.handle_duplicates= row->dup;
+ if (write_record(table,&info))
+ {
+ info.error++; // Ignore errors
+ pthread_mutex_lock(&LOCK_delayed_status);
+ delayed_insert_errors++;
+ pthread_mutex_unlock(&LOCK_delayed_status);
+ }
+ if (row->query && row->log_query)
+ {
+ mysql_update_log.write(row->query, row->query_length);
+ Query_log_event qinfo(&thd, row->query);
+ mysql_bin_log.write(&qinfo);
+ }
+ if (table->blob_fields)
+ free_delayed_insert_blobs(table);
+ thread_safe_sub(delayed_rows_in_use,1,&LOCK_delayed_status);
+ thread_safe_increment(delayed_insert_writes,&LOCK_delayed_status);
+ pthread_mutex_lock(&mutex);
+
+ delete row;
+ /* Let READ clients do something once in a while */
+ if (group_count++ == max_rows)
+ {
+ group_count=0;
+ if (stacked_inserts || tables_in_use) // Let these wait a while
+ {
+ if (tables_in_use)
+ pthread_cond_broadcast(&cond_client); // If waiting clients
+ thd.proc_info="reschedule";
+ pthread_mutex_unlock(&mutex);
+ if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
+ {
+ /* This should never happen */
+ table->file->print_error(error,MYF(0));
+ sql_print_error("%s",thd.net.last_error);
+ goto err;
+ }
+ if (thr_reschedule_write_lock(*thd.lock->locks))
+ {
+ /* This should never happen */
+ sql_print_error(ER(ER_DELAYED_CANT_CHANGE_LOCK),table->real_name);
+ }
+ table->file->extra(HA_EXTRA_WRITE_CACHE);
+ pthread_mutex_lock(&mutex);
+ thd.proc_info="insert";
+ }
+ if (tables_in_use)
+ pthread_cond_broadcast(&cond_client); // If waiting clients
+ }
+ }
+
+ thd.proc_info=0;
+ table->next_number_field=0;
+ pthread_mutex_unlock(&mutex);
+ if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
+ { // This shouldn't happen
+ table->file->print_error(error,MYF(0));
+ sql_print_error("%s",thd.net.last_error);
+ goto err;
+ }
+ pthread_mutex_lock(&mutex);
+ DBUG_RETURN(0);
+
+ err:
+ thread_safe_increment(delayed_insert_errors, &LOCK_delayed_status);
+ pthread_mutex_lock(&mutex);
+ DBUG_RETURN(1);
+}
+
+
+
+/***************************************************************************
+** store records in INSERT ... SELECT *
+***************************************************************************/
+
+int
+select_insert::prepare(List<Item> &values)
+{
+ TABLE *form=table;
+ DBUG_ENTER("select_insert::prepare");
+
+ save_time_stamp=table->time_stamp;
+ if (check_insert_fields(thd,table,*fields,values,1))
+ DBUG_RETURN(1);
+
+ if (fields->elements)
+ {
+ restore_record(form,2); // Get empty record
+ }
+ else
+ form->record[0][0]=form->record[2][0]; // Fix delete marker
+ form->next_number_field=form->found_next_number_field;
+ thd->count_cuted_fields=1; /* calc cuted fields */
+ thd->cuted_fields=0;
+ if (info.handle_duplicates != DUP_REPLACE)
+ form->file->extra(HA_EXTRA_WRITE_CACHE);
+ DBUG_RETURN(0);
+}
+
+select_insert::~select_insert()
+{
+ if (table)
+ {
+ if (save_time_stamp)
+ table->time_stamp=save_time_stamp;
+ table->next_number_field=0;
+ }
+ thd->count_cuted_fields=0;
+}
+
+
+bool select_insert::send_data(List<Item> &values)
+{
+ if (thd->offset_limit)
+ { // using limit offset,count
+ thd->offset_limit--;
+ return 0;
+ }
+ if (fields->elements)
+ fill_record(*fields,values);
+ else
+ fill_record(table->field,values);
+ if (write_record(table,&info))
+ return 1;
+ if (table->next_number_field) // Clear for next record
+ {
+ table->next_number_field->reset();
+ if (! last_insert_id && thd->insert_id_used)
+ last_insert_id=thd->insert_id();
+ }
+ return 0;
+}
+
+
+void select_insert::send_error(uint errcode,const char *err)
+{
+ ::send_error(&thd->net,errcode,err);
+ VOID(table->file->extra(HA_EXTRA_NO_CACHE));
+}
+
+
+bool select_insert::send_eof()
+{
+ int error;
+ if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
+ {
+ table->file->print_error(error,MYF(0));
+ ::send_error(&thd->net);
+ return 1;
+ }
+ else
+ {
+ char buff[160];
+ if (info.handle_duplicates == DUP_IGNORE)
+ sprintf(buff,ER(ER_INSERT_INFO),info.records,info.records-info.copied,
+ thd->cuted_fields);
+ else
+ sprintf(buff,ER(ER_INSERT_INFO),info.records,info.deleted,
+ thd->cuted_fields);
+ if (last_insert_id)
+ thd->insert_id(last_insert_id); // For update log
+ ::send_ok(&thd->net,info.copied,last_insert_id,buff);
+ mysql_update_log.write(thd->query,thd->query_length);
+ Query_log_event qinfo(thd, thd->query);
+ mysql_bin_log.write(&qinfo);
+ return 0;
+ }
+}
+
+
+/***************************************************************************
+** CREATE TABLE (SELECT) ...
+***************************************************************************/
+
+int
+select_create::prepare(List<Item> &values)
+{
+ DBUG_ENTER("select_create::prepare");
+
+ table=create_table_from_items(thd, create_info, db, name,
+ extra_fields, keys, &values, &lock);
+ if (!table)
+ DBUG_RETURN(-1); // abort() deletes table
+
+ /* First field to copy */
+ field=table->field+table->fields - values.elements;
+
+ save_time_stamp=table->time_stamp;
+ if (table->timestamp_field) // Don't set timestamp if used
+ {
+ table->timestamp_field->set_time();
+ table->time_stamp=0; // This should be saved
+ }
+ table->next_number_field=table->found_next_number_field;
+
+ restore_record(table,2); // Get empty record
+ thd->count_cuted_fields=1; // count warnings
+ thd->cuted_fields=0;
+ DBUG_RETURN(0);
+}
+
+
+bool select_create::send_data(List<Item> &values)
+{
+ if (thd->offset_limit)
+ { // using limit offset,count
+ thd->offset_limit--;
+ return 0;
+ }
+ fill_record(field,values);
+ if (write_record(table,&info))
+ return 1;
+ if (table->next_number_field) // Clear for next record
+ {
+ table->next_number_field->reset();
+ if (! last_insert_id && thd->insert_id_used)
+ last_insert_id=thd->insert_id();
+ }
+ return 0;
+}
+
+extern HASH open_cache;
+
+bool select_create::send_eof()
+{
+ bool tmp=select_insert::send_eof();
+ if (tmp)
+ abort();
+ else
+ {
+ VOID(pthread_mutex_lock(&LOCK_open));
+ mysql_unlock_tables(thd, lock);
+ hash_delete(&open_cache,(byte*) table);
+ lock=0; table=0;
+ VOID(pthread_mutex_unlock(&LOCK_open));
+ }
+ return tmp;
+}
+
+void select_create::abort()
+{
+ VOID(pthread_mutex_lock(&LOCK_open));
+ if (lock)
+ {
+ mysql_unlock_tables(thd, lock);
+ lock=0;
+ }
+ if (table)
+ {
+ enum db_type table_type=table->db_type;
+ hash_delete(&open_cache,(byte*) table);
+ quick_rm_table(table_type,db,name);
+ table=0;
+ }
+ VOID(pthread_mutex_unlock(&LOCK_open));
+}
+
+
+/*****************************************************************************
+** Instansiate templates
+*****************************************************************************/
+
+#ifdef __GNUC__
+template class List_iterator<List_item>;
+template class I_List<delayed_insert>;
+template class I_List_iterator<delayed_insert>;
+template class I_List<delayed_row>;
+#endif