summaryrefslogtreecommitdiff
path: root/sql/sql_insert.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_insert.cc')
-rw-r--r--sql/sql_insert.cc187
1 files changed, 111 insertions, 76 deletions
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index 73f8c5e4418..bd21d929291 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -446,7 +446,6 @@ void upgrade_lock_type(THD *thd, thr_lock_type *lock_type,
client connection and the delayed thread.
*/
if (specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE) ||
- thd->slave_thread ||
thd->variables.max_insert_delayed_threads == 0 ||
thd->prelocked_mode ||
thd->lex->uses_stored_routines())
@@ -454,6 +453,14 @@ void upgrade_lock_type(THD *thd, thr_lock_type *lock_type,
*lock_type= TL_WRITE;
return;
}
+ if (thd->slave_thread)
+ {
+ /* Try concurrent insert */
+ *lock_type= (duplic == DUP_UPDATE || duplic == DUP_REPLACE) ?
+ TL_WRITE : TL_WRITE_CONCURRENT_INSERT;
+ return;
+ }
+
bool log_on= (thd->options & OPTION_BIN_LOG ||
! (thd->security_ctx->master_access & SUPER_ACL));
if (log_on && mysql_bin_log.is_open() && is_multi_insert)
@@ -560,6 +567,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
int error, res;
bool transactional_table, joins_freed= FALSE;
bool changed;
+ bool was_insert_delayed= (table_list->lock_type == TL_WRITE_DELAYED);
uint value_count;
ulong counter = 1;
ulonglong id;
@@ -732,7 +740,6 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
if (lock_type != TL_WRITE_DELAYED && !thd->prelocked_mode)
table->file->start_bulk_insert(values_list.elements);
- thd->no_trans_update.stmt= FALSE;
thd->abort_on_warning= (!ignore && (thd->variables.sql_mode &
(MODE_STRICT_TRANS_TABLES |
MODE_STRICT_ALL_TABLES)));
@@ -859,14 +866,16 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
transactional_table= table->file->has_transactions();
- if ((changed= (info.copied || info.deleted || info.updated)))
+ if ((changed= (info.copied || info.deleted || info.updated)) ||
+ was_insert_delayed)
{
/*
Invalidate the table in the query cache if something changed.
For the transactional algorithm to work the invalidation must be
before binlog writing and ha_autocommit_or_rollback
*/
- query_cache_invalidate3(thd, table_list, 1);
+ if (changed)
+ query_cache_invalidate3(thd, table_list, 1);
if (error <= 0 || !transactional_table)
{
if (mysql_bin_log.is_open())
@@ -904,10 +913,12 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
if (mysql_bin_log.write(&qinfo) && transactional_table)
error=1;
}
- if (!transactional_table)
- thd->no_trans_update.all= TRUE;
+ if (thd->transaction.stmt.modified_non_trans_table)
+ thd->transaction.all.modified_non_trans_table= TRUE;
}
}
+ DBUG_ASSERT(transactional_table || !changed ||
+ thd->transaction.stmt.modified_non_trans_table);
if (transactional_table)
error=ha_autocommit_or_rollback(thd,error);
@@ -1308,7 +1319,7 @@ static int last_uniq_key(TABLE *table,uint keynr)
then both on update triggers will work instead. Similarly both on
delete triggers will be invoked if we will delete conflicting records.
- Sets thd->no_trans_update.stmt to TRUE if table which is updated didn't have
+ Sets thd->transaction.stmt.modified_non_trans_table to TRUE if table which is updated didn't have
transactions.
RETURN VALUE
@@ -1475,7 +1486,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
goto err;
info->deleted++;
if (!table->file->has_transactions())
- thd->no_trans_update.stmt= TRUE;
+ thd->transaction.stmt.modified_non_trans_table= TRUE;
if (table->triggers &&
table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
TRG_ACTION_AFTER, TRUE))
@@ -1507,7 +1518,7 @@ ok_or_after_trg_err:
if (key)
my_safe_afree(key,table->s->max_unique_length,MAX_KEY_LENGTH);
if (!table->file->has_transactions())
- thd->no_trans_update.stmt= TRUE;
+ thd->transaction.stmt.modified_non_trans_table= TRUE;
DBUG_RETURN(trg_error);
err:
@@ -1702,18 +1713,18 @@ Delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
thd->proc_info="waiting for delay_list";
pthread_mutex_lock(&LOCK_delayed_insert); // Protect master list
I_List_iterator<Delayed_insert> it(delayed_threads);
- Delayed_insert *tmp;
- while ((tmp=it++))
+ Delayed_insert *di;
+ while ((di= it++))
{
- if (!strcmp(tmp->thd.db,table_list->db) &&
- !strcmp(table_list->table_name,tmp->table->s->table_name))
+ if (!strcmp(table_list->db, di->table_list.db) &&
+ !strcmp(table_list->table_name, di->table_list.table_name))
{
- tmp->lock();
+ di->lock();
break;
}
}
pthread_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
- return tmp;
+ return di;
}
@@ -1739,21 +1750,41 @@ Delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
Two latter cases indicate a request for lock upgrade.
XXX: why do we regard INSERT DELAYED into a view as an error and
- do not simply a lock upgrade?
+ do not simply perform a lock upgrade?
+
+ TODO: The approach with using two mutexes to work with the
+ delayed thread list -- LOCK_delayed_insert and
+ LOCK_delayed_create -- is redundant, and we only need one of
+ them to protect the list. The reason we have two locks is that
+ we do not want to block look-ups in the list while we're waiting
+ for the newly created thread to open the delayed table. However,
+ this wait itself is redundant -- we always call get_local_table
+ later on, and there wait again until the created thread acquires
+ a table lock.
+
+ As is redundant the concept of locks_in_memory, since we already
+ have another counter with similar semantics - tables_in_use,
+ both of them are devoted to counting the number of producers for
+ a given consumer (delayed insert thread), only at different
+ stages of producer-consumer relationship.
+
+ 'dead' and 'status' variables in Delayed_insert are redundant
+ too, since there is already 'di->thd.killed' and
+ di->stacked_inserts.
*/
static
bool delayed_get_table(THD *thd, TABLE_LIST *table_list)
{
int error;
- Delayed_insert *tmp;
+ Delayed_insert *di;
DBUG_ENTER("delayed_get_table");
/* Must be set in the parser */
DBUG_ASSERT(table_list->db);
/* Find the thread which handles this table. */
- if (!(tmp=find_handler(thd,table_list)))
+ if (!(di= find_handler(thd, table_list)))
{
/*
No match. Create a new thread to handle the table, but
@@ -1767,9 +1798,9 @@ bool delayed_get_table(THD *thd, TABLE_LIST *table_list)
The first search above was done without LOCK_delayed_create.
Another thread might have created the handler in between. Search again.
*/
- if (! (tmp= find_handler(thd, table_list)))
+ if (! (di= find_handler(thd, table_list)))
{
- if (!(tmp=new Delayed_insert()))
+ if (!(di= new Delayed_insert()))
{
my_error(ER_OUTOFMEMORY,MYF(0),sizeof(Delayed_insert));
thd->fatal_error();
@@ -1778,28 +1809,30 @@ bool delayed_get_table(THD *thd, TABLE_LIST *table_list)
pthread_mutex_lock(&LOCK_thread_count);
thread_count++;
pthread_mutex_unlock(&LOCK_thread_count);
- tmp->thd.set_db(table_list->db, strlen(table_list->db));
- tmp->thd.query= my_strdup(table_list->table_name,MYF(MY_WME));
- if (tmp->thd.db == NULL || tmp->thd.query == NULL)
+ di->thd.set_db(table_list->db, strlen(table_list->db));
+ di->thd.query= my_strdup(table_list->table_name, MYF(MY_WME));
+ if (di->thd.db == NULL || di->thd.query == NULL)
{
/* The error is reported */
- delete tmp;
+ delete di;
thd->fatal_error();
goto end_create;
}
- tmp->table_list= *table_list; // Needed to open table
- tmp->table_list.alias= tmp->table_list.table_name= tmp->thd.query;
- tmp->lock();
- pthread_mutex_lock(&tmp->mutex);
- if ((error=pthread_create(&tmp->thd.real_id,&connection_attrib,
- handle_delayed_insert,(void*) tmp)))
+ di->table_list= *table_list; // Needed to open table
+ /* Replace volatile strings with local copies */
+ di->table_list.alias= di->table_list.table_name= di->thd.query;
+ di->table_list.db= di->thd.db;
+ di->lock();
+ pthread_mutex_lock(&di->mutex);
+ if ((error= pthread_create(&di->thd.real_id, &connection_attrib,
+ handle_delayed_insert, (void*) di)))
{
DBUG_PRINT("error",
("Can't create thread to handle delayed insert (error %d)",
error));
- pthread_mutex_unlock(&tmp->mutex);
- tmp->unlock();
- delete tmp;
+ pthread_mutex_unlock(&di->mutex);
+ di->unlock();
+ delete di;
my_error(ER_CANT_CREATE_THREAD, MYF(0), error);
thd->fatal_error();
goto end_create;
@@ -1807,15 +1840,15 @@ bool delayed_get_table(THD *thd, TABLE_LIST *table_list)
/* Wait until table is open */
thd->proc_info="waiting for handler open";
- while (!tmp->thd.killed && !tmp->table && !thd->killed)
+ while (!di->thd.killed && !di->table && !thd->killed)
{
- pthread_cond_wait(&tmp->cond_client,&tmp->mutex);
+ pthread_cond_wait(&di->cond_client, &di->mutex);
}
- pthread_mutex_unlock(&tmp->mutex);
+ pthread_mutex_unlock(&di->mutex);
thd->proc_info="got old table";
- if (tmp->thd.killed)
+ if (di->thd.killed)
{
- if (tmp->thd.net.report_error)
+ if (di->thd.net.report_error)
{
/*
Copy the error message. Note that we don't treat fatal
@@ -1823,31 +1856,34 @@ bool delayed_get_table(THD *thd, TABLE_LIST *table_list)
main thread. Use of my_message will enable stored
procedures continue handlers.
*/
- my_message(tmp->thd.net.last_errno, tmp->thd.net.last_error,
+ my_message(di->thd.net.last_errno, di->thd.net.last_error,
MYF(0));
}
- tmp->unlock();
+ di->unlock();
goto end_create;
}
if (thd->killed)
{
- tmp->unlock();
+ di->unlock();
goto end_create;
}
+ pthread_mutex_lock(&LOCK_delayed_insert);
+ delayed_threads.append(di);
+ pthread_mutex_unlock(&LOCK_delayed_insert);
}
pthread_mutex_unlock(&LOCK_delayed_create);
}
- pthread_mutex_lock(&tmp->mutex);
- table_list->table= tmp->get_local_table(thd);
- pthread_mutex_unlock(&tmp->mutex);
+ pthread_mutex_lock(&di->mutex);
+ table_list->table= di->get_local_table(thd);
+ pthread_mutex_unlock(&di->mutex);
if (table_list->table)
{
DBUG_ASSERT(thd->net.report_error == 0);
- thd->di=tmp;
+ thd->di= di;
}
/* Unlock the delayed insert object after its last access. */
- tmp->unlock();
+ di->unlock();
DBUG_RETURN(table_list->table == NULL);
end_create:
@@ -2077,26 +2113,26 @@ void kill_delayed_threads(void)
VOID(pthread_mutex_lock(&LOCK_delayed_insert)); // For unlink from list
I_List_iterator<Delayed_insert> it(delayed_threads);
- Delayed_insert *tmp;
- while ((tmp=it++))
+ Delayed_insert *di;
+ while ((di= it++))
{
- tmp->thd.killed= THD::KILL_CONNECTION;
- if (tmp->thd.mysys_var)
+ di->thd.killed= THD::KILL_CONNECTION;
+ if (di->thd.mysys_var)
{
- pthread_mutex_lock(&tmp->thd.mysys_var->mutex);
- if (tmp->thd.mysys_var->current_cond)
+ pthread_mutex_lock(&di->thd.mysys_var->mutex);
+ if (di->thd.mysys_var->current_cond)
{
/*
We need the following test because the main mutex may be locked
in handle_delayed_insert()
*/
- if (&tmp->mutex != tmp->thd.mysys_var->current_mutex)
- pthread_mutex_lock(tmp->thd.mysys_var->current_mutex);
- pthread_cond_broadcast(tmp->thd.mysys_var->current_cond);
- if (&tmp->mutex != tmp->thd.mysys_var->current_mutex)
- pthread_mutex_unlock(tmp->thd.mysys_var->current_mutex);
+ if (&di->mutex != di->thd.mysys_var->current_mutex)
+ pthread_mutex_lock(di->thd.mysys_var->current_mutex);
+ pthread_cond_broadcast(di->thd.mysys_var->current_cond);
+ if (&di->mutex != di->thd.mysys_var->current_mutex)
+ pthread_mutex_unlock(di->thd.mysys_var->current_mutex);
}
- pthread_mutex_unlock(&tmp->thd.mysys_var->mutex);
+ pthread_mutex_unlock(&di->thd.mysys_var->mutex);
}
}
VOID(pthread_mutex_unlock(&LOCK_delayed_insert)); // For unlink from list
@@ -2176,11 +2212,6 @@ pthread_handler_t handle_delayed_insert(void *arg)
}
di->table->copy_blobs=1;
- /* One can now use this */
- pthread_mutex_lock(&LOCK_delayed_insert);
- delayed_threads.append(di);
- pthread_mutex_unlock(&LOCK_delayed_insert);
-
/* Tell client that the thread is initialized */
pthread_cond_signal(&di->cond_client);
@@ -2767,7 +2798,6 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
}
if (info.handle_duplicates == DUP_UPDATE)
table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
- thd->no_trans_update.stmt= FALSE;
thd->abort_on_warning= (!info.ignore &&
(thd->variables.sql_mode &
(MODE_STRICT_TRANS_TABLES |
@@ -2904,7 +2934,8 @@ void select_insert::send_error(uint errcode,const char *err)
bool select_insert::send_eof()
{
- int error,error2;
+ int error, error2;
+ bool changed, transactional_table= table->file->has_transactions();
DBUG_ENTER("select_insert::send_eof");
error= (!thd->prelocked_mode) ? table->file->end_bulk_insert():0;
@@ -2916,12 +2947,14 @@ bool select_insert::send_eof()
and ha_autocommit_or_rollback
*/
- if (info.copied || info.deleted || info.updated)
+ if (changed= (info.copied || info.deleted || info.updated))
{
query_cache_invalidate3(thd, table, 1);
- if (!(table->file->has_transactions() || table->s->tmp_table))
- thd->no_trans_update.all= TRUE;
+ if (thd->transaction.stmt.modified_non_trans_table)
+ thd->transaction.all.modified_non_trans_table= TRUE;
}
+ DBUG_ASSERT(transactional_table || !changed ||
+ thd->transaction.stmt.modified_non_trans_table);
if (last_insert_id)
thd->insert_id(info.copied ? last_insert_id : 0); // For binary log
@@ -2931,7 +2964,7 @@ bool select_insert::send_eof()
if (!error)
thd->clear_error();
Query_log_event qinfo(thd, thd->query, thd->query_length,
- table->file->has_transactions(), FALSE);
+ transactional_table, FALSE);
mysql_bin_log.write(&qinfo);
}
if ((error2=ha_autocommit_or_rollback(thd,error)) && ! error)
@@ -2957,6 +2990,7 @@ bool select_insert::send_eof()
void select_insert::abort()
{
+ bool changed, transactional_table;
DBUG_ENTER("select_insert::abort");
if (!table)
@@ -2967,6 +3001,7 @@ void select_insert::abort()
*/
DBUG_VOID_RETURN;
}
+ transactional_table= table->file->has_transactions();
if (!thd->prelocked_mode)
table->file->end_bulk_insert();
/*
@@ -2975,21 +3010,22 @@ void select_insert::abort()
error while inserting into a MyISAM table) we must write to the binlog (and
the error code will make the slave stop).
*/
- if ((info.copied || info.deleted || info.updated) &&
- !table->file->has_transactions())
+ if ((changed= info.copied || info.deleted || info.updated) &&
+ !transactional_table)
{
if (last_insert_id)
thd->insert_id(last_insert_id); // For binary log
if (mysql_bin_log.is_open())
{
Query_log_event qinfo(thd, thd->query, thd->query_length,
- table->file->has_transactions(), FALSE);
+ transactional_table, FALSE);
mysql_bin_log.write(&qinfo);
}
- if (!table->s->tmp_table)
- thd->no_trans_update.all= TRUE;
+ if (thd->transaction.stmt.modified_non_trans_table)
+ thd->transaction.all.modified_non_trans_table= TRUE;
}
- if (info.copied || info.deleted || info.updated)
+ DBUG_ASSERT(transactional_table || !changed || thd->transaction.stmt.modified_non_trans_table);
+ if (changed)
{
query_cache_invalidate3(thd, table, 1);
}
@@ -3236,7 +3272,6 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
if (!thd->prelocked_mode)
table->file->start_bulk_insert((ha_rows) 0);
- thd->no_trans_update.stmt= FALSE;
thd->abort_on_warning= (!info.ignore &&
(thd->variables.sql_mode &
(MODE_STRICT_TRANS_TABLES |