diff options
Diffstat (limited to 'sql/sql_insert.cc')
-rw-r--r-- | sql/sql_insert.cc | 187 |
1 files changed, 111 insertions, 76 deletions
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index 73f8c5e4418..bd21d929291 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -446,7 +446,6 @@ void upgrade_lock_type(THD *thd, thr_lock_type *lock_type, client connection and the delayed thread. */ if (specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE) || - thd->slave_thread || thd->variables.max_insert_delayed_threads == 0 || thd->prelocked_mode || thd->lex->uses_stored_routines()) @@ -454,6 +453,14 @@ void upgrade_lock_type(THD *thd, thr_lock_type *lock_type, *lock_type= TL_WRITE; return; } + if (thd->slave_thread) + { + /* Try concurrent insert */ + *lock_type= (duplic == DUP_UPDATE || duplic == DUP_REPLACE) ? + TL_WRITE : TL_WRITE_CONCURRENT_INSERT; + return; + } + bool log_on= (thd->options & OPTION_BIN_LOG || ! (thd->security_ctx->master_access & SUPER_ACL)); if (log_on && mysql_bin_log.is_open() && is_multi_insert) @@ -560,6 +567,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, int error, res; bool transactional_table, joins_freed= FALSE; bool changed; + bool was_insert_delayed= (table_list->lock_type == TL_WRITE_DELAYED); uint value_count; ulong counter = 1; ulonglong id; @@ -732,7 +740,6 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, if (lock_type != TL_WRITE_DELAYED && !thd->prelocked_mode) table->file->start_bulk_insert(values_list.elements); - thd->no_trans_update.stmt= FALSE; thd->abort_on_warning= (!ignore && (thd->variables.sql_mode & (MODE_STRICT_TRANS_TABLES | MODE_STRICT_ALL_TABLES))); @@ -859,14 +866,16 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, transactional_table= table->file->has_transactions(); - if ((changed= (info.copied || info.deleted || info.updated))) + if ((changed= (info.copied || info.deleted || info.updated)) || + was_insert_delayed) { /* Invalidate the table in the query cache if something changed. For the transactional algorithm to work the invalidation must be before binlog writing and ha_autocommit_or_rollback */ - query_cache_invalidate3(thd, table_list, 1); + if (changed) + query_cache_invalidate3(thd, table_list, 1); if (error <= 0 || !transactional_table) { if (mysql_bin_log.is_open()) @@ -904,10 +913,12 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, if (mysql_bin_log.write(&qinfo) && transactional_table) error=1; } - if (!transactional_table) - thd->no_trans_update.all= TRUE; + if (thd->transaction.stmt.modified_non_trans_table) + thd->transaction.all.modified_non_trans_table= TRUE; } } + DBUG_ASSERT(transactional_table || !changed || + thd->transaction.stmt.modified_non_trans_table); if (transactional_table) error=ha_autocommit_or_rollback(thd,error); @@ -1308,7 +1319,7 @@ static int last_uniq_key(TABLE *table,uint keynr) then both on update triggers will work instead. Similarly both on delete triggers will be invoked if we will delete conflicting records. - Sets thd->no_trans_update.stmt to TRUE if table which is updated didn't have + Sets thd->transaction.stmt.modified_non_trans_table to TRUE if table which is updated didn't have transactions. RETURN VALUE @@ -1475,7 +1486,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) goto err; info->deleted++; if (!table->file->has_transactions()) - thd->no_trans_update.stmt= TRUE; + thd->transaction.stmt.modified_non_trans_table= TRUE; if (table->triggers && table->triggers->process_triggers(thd, TRG_EVENT_DELETE, TRG_ACTION_AFTER, TRUE)) @@ -1507,7 +1518,7 @@ ok_or_after_trg_err: if (key) my_safe_afree(key,table->s->max_unique_length,MAX_KEY_LENGTH); if (!table->file->has_transactions()) - thd->no_trans_update.stmt= TRUE; + thd->transaction.stmt.modified_non_trans_table= TRUE; DBUG_RETURN(trg_error); err: @@ -1702,18 +1713,18 @@ Delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list) thd->proc_info="waiting for delay_list"; pthread_mutex_lock(&LOCK_delayed_insert); // Protect master list I_List_iterator<Delayed_insert> it(delayed_threads); - Delayed_insert *tmp; - while ((tmp=it++)) + Delayed_insert *di; + while ((di= it++)) { - if (!strcmp(tmp->thd.db,table_list->db) && - !strcmp(table_list->table_name,tmp->table->s->table_name)) + if (!strcmp(table_list->db, di->table_list.db) && + !strcmp(table_list->table_name, di->table_list.table_name)) { - tmp->lock(); + di->lock(); break; } } pthread_mutex_unlock(&LOCK_delayed_insert); // For unlink from list - return tmp; + return di; } @@ -1739,21 +1750,41 @@ Delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list) Two latter cases indicate a request for lock upgrade. XXX: why do we regard INSERT DELAYED into a view as an error and - do not simply a lock upgrade? + do not simply perform a lock upgrade? + + TODO: The approach with using two mutexes to work with the + delayed thread list -- LOCK_delayed_insert and + LOCK_delayed_create -- is redundant, and we only need one of + them to protect the list. The reason we have two locks is that + we do not want to block look-ups in the list while we're waiting + for the newly created thread to open the delayed table. However, + this wait itself is redundant -- we always call get_local_table + later on, and there wait again until the created thread acquires + a table lock. + + As is redundant the concept of locks_in_memory, since we already + have another counter with similar semantics - tables_in_use, + both of them are devoted to counting the number of producers for + a given consumer (delayed insert thread), only at different + stages of producer-consumer relationship. + + 'dead' and 'status' variables in Delayed_insert are redundant + too, since there is already 'di->thd.killed' and + di->stacked_inserts. */ static bool delayed_get_table(THD *thd, TABLE_LIST *table_list) { int error; - Delayed_insert *tmp; + Delayed_insert *di; DBUG_ENTER("delayed_get_table"); /* Must be set in the parser */ DBUG_ASSERT(table_list->db); /* Find the thread which handles this table. */ - if (!(tmp=find_handler(thd,table_list))) + if (!(di= find_handler(thd, table_list))) { /* No match. Create a new thread to handle the table, but @@ -1767,9 +1798,9 @@ bool delayed_get_table(THD *thd, TABLE_LIST *table_list) The first search above was done without LOCK_delayed_create. Another thread might have created the handler in between. Search again. */ - if (! (tmp= find_handler(thd, table_list))) + if (! (di= find_handler(thd, table_list))) { - if (!(tmp=new Delayed_insert())) + if (!(di= new Delayed_insert())) { my_error(ER_OUTOFMEMORY,MYF(0),sizeof(Delayed_insert)); thd->fatal_error(); @@ -1778,28 +1809,30 @@ bool delayed_get_table(THD *thd, TABLE_LIST *table_list) pthread_mutex_lock(&LOCK_thread_count); thread_count++; pthread_mutex_unlock(&LOCK_thread_count); - tmp->thd.set_db(table_list->db, strlen(table_list->db)); - tmp->thd.query= my_strdup(table_list->table_name,MYF(MY_WME)); - if (tmp->thd.db == NULL || tmp->thd.query == NULL) + di->thd.set_db(table_list->db, strlen(table_list->db)); + di->thd.query= my_strdup(table_list->table_name, MYF(MY_WME)); + if (di->thd.db == NULL || di->thd.query == NULL) { /* The error is reported */ - delete tmp; + delete di; thd->fatal_error(); goto end_create; } - tmp->table_list= *table_list; // Needed to open table - tmp->table_list.alias= tmp->table_list.table_name= tmp->thd.query; - tmp->lock(); - pthread_mutex_lock(&tmp->mutex); - if ((error=pthread_create(&tmp->thd.real_id,&connection_attrib, - handle_delayed_insert,(void*) tmp))) + di->table_list= *table_list; // Needed to open table + /* Replace volatile strings with local copies */ + di->table_list.alias= di->table_list.table_name= di->thd.query; + di->table_list.db= di->thd.db; + di->lock(); + pthread_mutex_lock(&di->mutex); + if ((error= pthread_create(&di->thd.real_id, &connection_attrib, + handle_delayed_insert, (void*) di))) { DBUG_PRINT("error", ("Can't create thread to handle delayed insert (error %d)", error)); - pthread_mutex_unlock(&tmp->mutex); - tmp->unlock(); - delete tmp; + pthread_mutex_unlock(&di->mutex); + di->unlock(); + delete di; my_error(ER_CANT_CREATE_THREAD, MYF(0), error); thd->fatal_error(); goto end_create; @@ -1807,15 +1840,15 @@ bool delayed_get_table(THD *thd, TABLE_LIST *table_list) /* Wait until table is open */ thd->proc_info="waiting for handler open"; - while (!tmp->thd.killed && !tmp->table && !thd->killed) + while (!di->thd.killed && !di->table && !thd->killed) { - pthread_cond_wait(&tmp->cond_client,&tmp->mutex); + pthread_cond_wait(&di->cond_client, &di->mutex); } - pthread_mutex_unlock(&tmp->mutex); + pthread_mutex_unlock(&di->mutex); thd->proc_info="got old table"; - if (tmp->thd.killed) + if (di->thd.killed) { - if (tmp->thd.net.report_error) + if (di->thd.net.report_error) { /* Copy the error message. Note that we don't treat fatal @@ -1823,31 +1856,34 @@ bool delayed_get_table(THD *thd, TABLE_LIST *table_list) main thread. Use of my_message will enable stored procedures continue handlers. */ - my_message(tmp->thd.net.last_errno, tmp->thd.net.last_error, + my_message(di->thd.net.last_errno, di->thd.net.last_error, MYF(0)); } - tmp->unlock(); + di->unlock(); goto end_create; } if (thd->killed) { - tmp->unlock(); + di->unlock(); goto end_create; } + pthread_mutex_lock(&LOCK_delayed_insert); + delayed_threads.append(di); + pthread_mutex_unlock(&LOCK_delayed_insert); } pthread_mutex_unlock(&LOCK_delayed_create); } - pthread_mutex_lock(&tmp->mutex); - table_list->table= tmp->get_local_table(thd); - pthread_mutex_unlock(&tmp->mutex); + pthread_mutex_lock(&di->mutex); + table_list->table= di->get_local_table(thd); + pthread_mutex_unlock(&di->mutex); if (table_list->table) { DBUG_ASSERT(thd->net.report_error == 0); - thd->di=tmp; + thd->di= di; } /* Unlock the delayed insert object after its last access. */ - tmp->unlock(); + di->unlock(); DBUG_RETURN(table_list->table == NULL); end_create: @@ -2077,26 +2113,26 @@ void kill_delayed_threads(void) VOID(pthread_mutex_lock(&LOCK_delayed_insert)); // For unlink from list I_List_iterator<Delayed_insert> it(delayed_threads); - Delayed_insert *tmp; - while ((tmp=it++)) + Delayed_insert *di; + while ((di= it++)) { - tmp->thd.killed= THD::KILL_CONNECTION; - if (tmp->thd.mysys_var) + di->thd.killed= THD::KILL_CONNECTION; + if (di->thd.mysys_var) { - pthread_mutex_lock(&tmp->thd.mysys_var->mutex); - if (tmp->thd.mysys_var->current_cond) + pthread_mutex_lock(&di->thd.mysys_var->mutex); + if (di->thd.mysys_var->current_cond) { /* We need the following test because the main mutex may be locked in handle_delayed_insert() */ - if (&tmp->mutex != tmp->thd.mysys_var->current_mutex) - pthread_mutex_lock(tmp->thd.mysys_var->current_mutex); - pthread_cond_broadcast(tmp->thd.mysys_var->current_cond); - if (&tmp->mutex != tmp->thd.mysys_var->current_mutex) - pthread_mutex_unlock(tmp->thd.mysys_var->current_mutex); + if (&di->mutex != di->thd.mysys_var->current_mutex) + pthread_mutex_lock(di->thd.mysys_var->current_mutex); + pthread_cond_broadcast(di->thd.mysys_var->current_cond); + if (&di->mutex != di->thd.mysys_var->current_mutex) + pthread_mutex_unlock(di->thd.mysys_var->current_mutex); } - pthread_mutex_unlock(&tmp->thd.mysys_var->mutex); + pthread_mutex_unlock(&di->thd.mysys_var->mutex); } } VOID(pthread_mutex_unlock(&LOCK_delayed_insert)); // For unlink from list @@ -2176,11 +2212,6 @@ pthread_handler_t handle_delayed_insert(void *arg) } di->table->copy_blobs=1; - /* One can now use this */ - pthread_mutex_lock(&LOCK_delayed_insert); - delayed_threads.append(di); - pthread_mutex_unlock(&LOCK_delayed_insert); - /* Tell client that the thread is initialized */ pthread_cond_signal(&di->cond_client); @@ -2767,7 +2798,6 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) } if (info.handle_duplicates == DUP_UPDATE) table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE); - thd->no_trans_update.stmt= FALSE; thd->abort_on_warning= (!info.ignore && (thd->variables.sql_mode & (MODE_STRICT_TRANS_TABLES | @@ -2904,7 +2934,8 @@ void select_insert::send_error(uint errcode,const char *err) bool select_insert::send_eof() { - int error,error2; + int error, error2; + bool changed, transactional_table= table->file->has_transactions(); DBUG_ENTER("select_insert::send_eof"); error= (!thd->prelocked_mode) ? table->file->end_bulk_insert():0; @@ -2916,12 +2947,14 @@ bool select_insert::send_eof() and ha_autocommit_or_rollback */ - if (info.copied || info.deleted || info.updated) + if (changed= (info.copied || info.deleted || info.updated)) { query_cache_invalidate3(thd, table, 1); - if (!(table->file->has_transactions() || table->s->tmp_table)) - thd->no_trans_update.all= TRUE; + if (thd->transaction.stmt.modified_non_trans_table) + thd->transaction.all.modified_non_trans_table= TRUE; } + DBUG_ASSERT(transactional_table || !changed || + thd->transaction.stmt.modified_non_trans_table); if (last_insert_id) thd->insert_id(info.copied ? last_insert_id : 0); // For binary log @@ -2931,7 +2964,7 @@ bool select_insert::send_eof() if (!error) thd->clear_error(); Query_log_event qinfo(thd, thd->query, thd->query_length, - table->file->has_transactions(), FALSE); + transactional_table, FALSE); mysql_bin_log.write(&qinfo); } if ((error2=ha_autocommit_or_rollback(thd,error)) && ! error) @@ -2957,6 +2990,7 @@ bool select_insert::send_eof() void select_insert::abort() { + bool changed, transactional_table; DBUG_ENTER("select_insert::abort"); if (!table) @@ -2967,6 +3001,7 @@ void select_insert::abort() */ DBUG_VOID_RETURN; } + transactional_table= table->file->has_transactions(); if (!thd->prelocked_mode) table->file->end_bulk_insert(); /* @@ -2975,21 +3010,22 @@ void select_insert::abort() error while inserting into a MyISAM table) we must write to the binlog (and the error code will make the slave stop). */ - if ((info.copied || info.deleted || info.updated) && - !table->file->has_transactions()) + if ((changed= info.copied || info.deleted || info.updated) && + !transactional_table) { if (last_insert_id) thd->insert_id(last_insert_id); // For binary log if (mysql_bin_log.is_open()) { Query_log_event qinfo(thd, thd->query, thd->query_length, - table->file->has_transactions(), FALSE); + transactional_table, FALSE); mysql_bin_log.write(&qinfo); } - if (!table->s->tmp_table) - thd->no_trans_update.all= TRUE; + if (thd->transaction.stmt.modified_non_trans_table) + thd->transaction.all.modified_non_trans_table= TRUE; } - if (info.copied || info.deleted || info.updated) + DBUG_ASSERT(transactional_table || !changed || thd->transaction.stmt.modified_non_trans_table); + if (changed) { query_cache_invalidate3(thd, table, 1); } @@ -3236,7 +3272,6 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE); if (!thd->prelocked_mode) table->file->start_bulk_insert((ha_rows) 0); - thd->no_trans_update.stmt= FALSE; thd->abort_on_warning= (!info.ignore && (thd->variables.sql_mode & (MODE_STRICT_TRANS_TABLES | |