summaryrefslogtreecommitdiff
path: root/sql/sql_insert.cc
diff options
context:
space:
mode:
authorunknown <kostja@bodhi.local>2006-07-13 00:18:59 +0400
committerunknown <kostja@bodhi.local>2006-07-13 00:18:59 +0400
commit26f0d13d0163beba2ae4f96396302b0c44407c65 (patch)
tree72d131bf20c7f83d51448986e035a8f9cbfb5c73 /sql/sql_insert.cc
parente8699b56140571305b898c29bdc45a5bab117a45 (diff)
parentfe4ed2440dad75c005465315199243cdec36a7f4 (diff)
downloadmariadb-git-26f0d13d0163beba2ae4f96396302b0c44407c65.tar.gz
Merge bk-internal.mysql.com:/home/bk/mysql-5.1
into bodhi.local:/opt/local/work/mysql-5.1-runtime-merge sql/ha_ndbcluster.cc: Auto merged sql/ha_partition.cc: Auto merged sql/log_event.cc: Auto merged sql/mysql_priv.h: Auto merged sql/sql_delete.cc: Auto merged sql/sql_load.cc: Auto merged sql/sql_parse.cc: Auto merged sql/sql_table.cc: Auto merged sql/sql_trigger.cc: Auto merged sql/sql_update.cc: Auto merged sql/table.cc: Auto merged mysql-test/r/federated.result: Manual merge. mysql-test/t/federated.test: Manual merge. sql/sql_insert.cc: Manual merge.
Diffstat (limited to 'sql/sql_insert.cc')
-rw-r--r--sql/sql_insert.cc375
1 files changed, 235 insertions, 140 deletions
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index 513e780b93a..25ed03c4051 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -17,6 +17,44 @@
/* Insert of records */
+/*
+ INSERT DELAYED
+
+ Insert delayed is distinguished from a normal insert by lock_type ==
+ TL_WRITE_DELAYED instead of TL_WRITE. It first tries to open a
+ "delayed" table (delayed_get_table()), but falls back to
+ open_and_lock_tables() on error and proceeds as normal insert then.
+
+ Opening a "delayed" table means to find a delayed insert thread that
+ has the table open already. If this fails, a new thread is created and
+ waited for to open and lock the table.
+
+ If accessing the thread succeeded, in
+ delayed_insert::get_local_table() the table of the thread is copied
+ for local use. A copy is required because the normal insert logic
+ works on a target table, but the other threads table object must not
+ be used. The insert logic uses the record buffer to create a record.
+ And the delayed insert thread uses the record buffer to pass the
+ record to the table handler. So there must be different objects. Also
+ the copied table is not included in the lock, so that the statement
+ can proceed even if the real table cannot be accessed at this moment.
+
+ Copying a table object is not a trivial operation. Besides the TABLE
+ object there are the field pointer array, the field objects and the
+ record buffer. After copying the field objects, their pointers into
+ the record must be "moved" to point to the new record buffer.
+
+ After this setup the normal insert logic is used. Only that for
+ delayed inserts write_delayed() is called instead of write_record().
+ It inserts the rows into a queue and signals the delayed insert thread
+ instead of writing directly to the table.
+
+ The delayed insert thread awakes from the signal. It locks the table,
+ inserts the rows from the queue, unlocks the table, and waits for the
+ next signal. It does normally live until a FLUSH TABLES or SHUTDOWN.
+
+*/
+
#include "mysql_priv.h"
#include "sp_head.h"
#include "sql_trigger.h"
@@ -26,8 +64,8 @@
static int check_null_fields(THD *thd,TABLE *entry);
#ifndef EMBEDDED_LIBRARY
static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list);
-static int write_delayed(THD *thd,TABLE *table, enum_duplicates dup, bool ignore,
- char *query, uint query_length, bool log_on);
+static int write_delayed(THD *thd, TABLE *table, enum_duplicates dup,
+ LEX_STRING query, bool ignore, bool log_on);
static void end_delayed_insert(THD *thd);
pthread_handler_t handle_delayed_insert(void *arg);
static void unlink_blobs(register TABLE *table);
@@ -407,7 +445,6 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
table->next_number_field=table->found_next_number_field;
error=0;
- id=0;
thd->proc_info="update";
if (duplic != DUP_ERROR || ignore)
table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
@@ -512,22 +549,13 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
#ifndef EMBEDDED_LIBRARY
if (lock_type == TL_WRITE_DELAYED)
{
- error=write_delayed(thd, table, duplic, ignore, query, thd->query_length, log_on);
+ LEX_STRING const st_query = { query, thd->query_length };
+ error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
query=0;
}
else
#endif
error=write_record(thd, table ,&info);
- /*
- If auto_increment values are used, save the first one
- for LAST_INSERT_ID() and for the update log.
- We can't use insert_id() as we don't want to touch the
- last_insert_id_used flag.
- */
- if (! id && thd->insert_id_used)
- { // Get auto increment value
- id= thd->last_insert_id;
- }
if (error)
break;
thd->row_count++;
@@ -535,6 +563,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
free_underlaid_joins(thd, &thd->lex->select_lex);
joins_freed= TRUE;
+ table->file->ha_release_auto_increment();
/*
Now all rows are inserted. Time to update logs and sends response to
@@ -545,7 +574,6 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
{
if (!error)
{
- id=0; // No auto_increment id
info.copied=values_list.elements;
end_delayed_insert(thd);
}
@@ -559,11 +587,6 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
table->file->print_error(my_errno,MYF(0));
error=1;
}
- if (id && values_list.elements != 1)
- thd->insert_id(id); // For update log
- else if (table->next_number_field && info.copied)
- id=table->next_number_field->val_int(); // Return auto_increment value
-
transactional_table= table->file->has_transactions();
if ((changed= (info.copied || info.deleted || info.updated)))
@@ -612,21 +635,30 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
}
}
thd->proc_info="end";
+ /*
+ We'll report to the client this id:
+ - if the table contains an autoincrement column and we successfully
+ inserted an autogenerated value, the autogenerated value.
+ - if the table contains no autoincrement column and LAST_INSERT_ID(X) was
+ called, X.
+ - if the table contains an autoincrement column, and some rows were
+ inserted, the id of the last "inserted" row (if IGNORE, that value may not
+ have been really inserted but ignored).
+ */
+ id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
+ thd->first_successful_insert_id_in_cur_stmt :
+ (thd->arg_of_last_insert_id_function ?
+ thd->first_successful_insert_id_in_prev_stmt :
+ ((table->next_number_field && info.copied) ?
+ table->next_number_field->val_int() : 0));
table->next_number_field=0;
thd->count_cuted_fields= CHECK_FIELD_IGNORE;
- thd->next_insert_id=0; // Reset this if wrongly used
if (duplic != DUP_ERROR || ignore)
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
if (duplic == DUP_REPLACE &&
(!table->triggers || !table->triggers->has_delete_triggers()))
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
- /* Reset value of LAST_INSERT_ID if no rows where inserted */
- if (!info.copied && thd->insert_id_used)
- {
- thd->insert_id(0);
- id=0;
- }
if (error)
goto abort;
if (values_list.elements == 1 && (!(thd->options & OPTION_WARNINGS) ||
@@ -648,8 +680,6 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
thd->row_count_func= info.copied+info.deleted+info.updated;
::send_ok(thd, (ulong) thd->row_count_func, id, buff);
}
- if (table != NULL)
- table->file->release_auto_increment();
thd->abort_on_warning= 0;
DBUG_RETURN(FALSE);
@@ -659,7 +689,7 @@ abort:
end_delayed_insert(thd);
#endif
if (table != NULL)
- table->file->release_auto_increment();
+ table->file->ha_release_auto_increment();
if (!joins_freed)
free_underlaid_joins(thd, &thd->lex->select_lex);
thd->abort_on_warning= 0;
@@ -968,6 +998,8 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
int error, trg_error= 0;
char *key=0;
MY_BITMAP *save_read_set, *save_write_set;
+ ulonglong prev_insert_id= table->file->next_insert_id;
+ ulonglong insert_id_for_cur_row= 0;
DBUG_ENTER("write_record");
info->records++;
@@ -980,10 +1012,20 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
while ((error=table->file->ha_write_row(table->record[0])))
{
uint key_nr;
+ /*
+ If we do more than one iteration of this loop, from the second one the
+ row will have an explicit value in the autoinc field, which was set at
+ the first call of handler::update_auto_increment(). So we must save
+ the autogenerated value to avoid thd->insert_id_for_cur_row to become
+ 0.
+ */
+ if (table->file->insert_id_for_cur_row > 0)
+ insert_id_for_cur_row= table->file->insert_id_for_cur_row;
+ else
+ table->file->insert_id_for_cur_row= insert_id_for_cur_row;
bool is_duplicate_key_error;
if (table->file->is_fatal_error(error, HA_CHECK_DUP))
goto err;
- table->file->restore_auto_increment(); // it's too early here! BUG#20188
is_duplicate_key_error= table->file->is_fatal_error(error, 0);
if (!is_duplicate_key_error)
{
@@ -1011,7 +1053,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
if (info->handle_duplicates == DUP_REPLACE &&
table->next_number_field &&
key_nr == table->s->next_number_index &&
- table->file->auto_increment_column_changed)
+ (insert_id_for_cur_row > 0))
goto err;
if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
{
@@ -1070,22 +1112,29 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
if (res == VIEW_CHECK_ERROR)
goto before_trg_err;
- if (thd->clear_next_insert_id)
- {
- /* Reset auto-increment cacheing if we do an update */
- thd->clear_next_insert_id= 0;
- thd->next_insert_id= 0;
- }
if ((error=table->file->ha_update_row(table->record[1],
table->record[0])))
{
if (info->ignore &&
!table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
+ {
+ table->file->restore_auto_increment(prev_insert_id);
goto ok_or_after_trg_err;
+ }
goto err;
}
info->updated++;
-
+ /*
+ If ON DUP KEY UPDATE updates a row instead of inserting one, and
+ there is an auto_increment column, then SELECT LAST_INSERT_ID()
+ returns the id of the updated row:
+ */
+ if (table->next_number_field)
+ {
+ longlong field_val= table->next_number_field->val_int();
+ thd->record_first_successful_insert_id_in_cur_stmt(field_val);
+ table->file->adjust_next_insert_id_after_explicit_value(field_val);
+ }
trg_error= (table->triggers &&
table->triggers->process_triggers(thd, TRG_EVENT_UPDATE,
TRG_ACTION_AFTER, TRUE));
@@ -1114,16 +1163,11 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH) &&
(!table->triggers || !table->triggers->has_delete_triggers()))
{
- if (thd->clear_next_insert_id)
- {
- /* Reset auto-increment cacheing if we do an update */
- thd->clear_next_insert_id= 0;
- thd->next_insert_id= 0;
- }
if ((error=table->file->ha_update_row(table->record[1],
table->record[0])))
goto err;
info->deleted++;
+ thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
/*
Since we pretend that we have done insert we should call
its after triggers.
@@ -1152,6 +1196,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
}
}
}
+ thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
/*
Restore column maps if they where replaced during an duplicate key
problem.
@@ -1165,12 +1210,13 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
if (!info->ignore ||
table->file->is_fatal_error(error, HA_CHECK_DUP))
goto err;
- table->file->restore_auto_increment();
+ table->file->restore_auto_increment(prev_insert_id);
goto ok_or_after_trg_err;
}
after_trg_n_copied_inc:
info->copied++;
+ thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
trg_error= (table->triggers &&
table->triggers->process_triggers(thd, TRG_EVENT_INSERT,
TRG_ACTION_AFTER, TRUE));
@@ -1190,6 +1236,7 @@ err:
table->file->print_error(error,MYF(0));
before_trg_err:
+ table->file->restore_auto_increment(prev_insert_id);
if (key)
my_safe_afree(key, table->s->max_unique_length, MAX_KEY_LENGTH);
table->column_bitmaps_set(save_read_set, save_write_set);
@@ -1252,14 +1299,20 @@ public:
char *record;
enum_duplicates dup;
time_t start_time;
- bool query_start_used,last_insert_id_used,insert_id_used, ignore, log_query;
- ulonglong last_insert_id;
+ bool query_start_used, ignore, log_query;
+ bool stmt_depends_on_first_successful_insert_id_in_prev_stmt;
+ ulonglong first_successful_insert_id_in_prev_stmt;
timestamp_auto_set_type timestamp_field_type;
+ LEX_STRING query;
- delayed_row(enum_duplicates dup_arg, bool ignore_arg, bool log_query_arg)
- :record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg) {}
+ delayed_row(LEX_STRING const query_arg, enum_duplicates dup_arg,
+ bool ignore_arg, bool log_query_arg)
+ : record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg),
+ query(query_arg)
+ {}
~delayed_row()
{
+ x_free(query.str);
x_free(record);
}
};
@@ -1267,9 +1320,6 @@ public:
class delayed_insert :public ilink {
uint locks_in_memory;
- char *query;
- ulong query_length;
- ulong query_allocated;
public:
THD thd;
TABLE *table;
@@ -1283,7 +1333,7 @@ public:
TABLE_LIST table_list; // Argument
delayed_insert()
- :locks_in_memory(0), query(0), query_length(0), query_allocated(0),
+ :locks_in_memory(0),
table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0),
group_count(0)
{
@@ -1294,6 +1344,11 @@ public:
thd.command=COM_DELAYED_INSERT;
thd.lex->current_select= 0; // for my_message_sql
thd.lex->sql_command= SQLCOM_INSERT; // For innodb::store_lock()
+ /*
+ Statement-based replication of INSERT DELAYED has problems with RAND()
+ and user vars, so in mixed mode we go to row-based.
+ */
+ thd.set_current_stmt_binlog_row_based_if_mixed();
bzero((char*) &thd.net, sizeof(thd.net)); // Safety
bzero((char*) &table_list, sizeof(table_list)); // Safety
@@ -1309,7 +1364,6 @@ public:
}
~delayed_insert()
{
- my_free(query, MYF(MY_WME|MY_ALLOW_ZERO_PTR));
/* The following is not really needed, but just for safety */
delayed_row *row;
while ((row=rows.get()))
@@ -1329,25 +1383,6 @@ public:
VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */
}
- int set_query(char const *q, ulong qlen) {
- if (q && qlen > 0)
- {
- if (query_allocated < qlen + 1)
- {
- ulong const flags(MY_WME|MY_FREE_ON_ERROR|MY_ALLOW_ZERO_PTR);
- query= my_realloc(query, qlen + 1, MYF(flags));
- if (query == 0)
- return HA_ERR_OUT_OF_MEM;
- query_allocated= qlen;
- }
- query_length= qlen;
- memcpy(query, q, qlen + 1);
- }
- else
- query_length= 0;
- return 0;
- }
-
/* The following is for checking when we can delete ourselves */
inline void lock()
{
@@ -1520,6 +1555,7 @@ TABLE *delayed_insert::get_local_table(THD* client_thd)
TABLE *copy;
TABLE_SHARE *share= table->s;
byte *bitmap;
+ DBUG_ENTER("delayed_insert::get_local_table");
/* First request insert thread to get a lock */
status=1;
@@ -1543,6 +1579,13 @@ TABLE *delayed_insert::get_local_table(THD* client_thd)
}
}
+ /*
+ Allocate memory for the TABLE object, the field pointers array, and
+ one record buffer of reclength size. Normally a table has three
+ record buffers of rec_buff_length size, which includes alignment
+ bytes. Since the table copy is used for creating one record only,
+ the other record buffers and alignment are unnecessary.
+ */
client_thd->proc_info="allocating local table";
copy= (TABLE*) client_thd->alloc(sizeof(*copy)+
(share->fields+1)*sizeof(Field**)+
@@ -1550,23 +1593,28 @@ TABLE *delayed_insert::get_local_table(THD* client_thd)
share->column_bitmap_size*2);
if (!copy)
goto error;
- *copy= *table;
+ /* Copy the TABLE object. */
+ *copy= *table;
/* We don't need to change the file handler here */
- field= copy->field= (Field**) (copy+1);
- bitmap= (byte*) (field+share->fields+1);
- copy->record[0]= (bitmap+ share->column_bitmap_size*2);
- memcpy((char*) copy->record[0],(char*) table->record[0],share->reclength);
-
- /* Make a copy of all fields */
-
- adjust_ptrs=PTR_BYTE_DIFF(copy->record[0],table->record[0]);
-
- found_next_number_field=table->found_next_number_field;
- for (org_field=table->field ; *org_field ; org_field++,field++)
+ /* Assign the pointers for the field pointers array and the record. */
+ field= copy->field= (Field**) (copy + 1);
+ bitmap= (byte*) (field + share->fields + 1);
+ copy->record[0]= (bitmap + share->column_bitmap_size * 2);
+ memcpy((char*) copy->record[0], (char*) table->record[0], share->reclength);
+ /*
+ Make a copy of all fields.
+ The copied fields need to point into the copied record. This is done
+ by copying the field objects with their old pointer values and then
+ "move" the pointers by the distance between the original and copied
+ records. That way we preserve the relative positions in the records.
+ */
+ adjust_ptrs= PTR_BYTE_DIFF(copy->record[0], table->record[0]);
+ found_next_number_field= table->found_next_number_field;
+ for (org_field= table->field; *org_field; org_field++, field++)
{
- if (!(*field= (*org_field)->new_field(client_thd->mem_root,copy)))
- return 0;
+ if (!(*field= (*org_field)->new_field(client_thd->mem_root, copy, 1)))
+ DBUG_RETURN(0);
(*field)->orig_table= copy; // Remove connection
(*field)->move_field_offset(adjust_ptrs); // Point at copy->record[0]
if (*org_field == found_next_number_field)
@@ -1599,26 +1647,27 @@ TABLE *delayed_insert::get_local_table(THD* client_thd)
copy->read_set= &copy->def_read_set;
copy->write_set= &copy->def_write_set;
- return copy;
+ DBUG_RETURN(copy);
/* Got fatal error */
error:
tables_in_use--;
status=1;
pthread_cond_signal(&cond); // Inform thread about abort
- return 0;
+ DBUG_RETURN(0);
}
/* Put a question in queue */
-static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic,
- bool ignore, char *query, uint query_length,
- bool log_on)
+static int
+write_delayed(THD *thd,TABLE *table, enum_duplicates duplic,
+ LEX_STRING query, bool ignore, bool log_on)
{
- delayed_row *row=0;
+ delayed_row *row;
delayed_insert *di=thd->di;
DBUG_ENTER("write_delayed");
+ DBUG_PRINT("enter", ("query = '%s' length %u", query.str, query.length));
thd->proc_info="waiting for handler insert";
pthread_mutex_lock(&di->mutex);
@@ -1626,18 +1675,44 @@ static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic,
pthread_cond_wait(&di->cond_client,&di->mutex);
thd->proc_info="storing row into queue";
- if (thd->killed || !(row= new delayed_row(duplic, ignore, log_on)))
+ if (thd->killed)
goto err;
+ /*
+ Take a copy of the query string, if there is any. The string will
+ be free'ed when the row is destroyed. If there is no query string,
+ we don't do anything special.
+ */
+
+ if (query.str)
+ {
+ char *str;
+ if (!(str= my_strndup(query.str, query.length, MYF(MY_WME))))
+ goto err;
+ query.str= str;
+ }
+ row= new delayed_row(query, duplic, ignore, log_on);
+ if (row == NULL)
+ {
+ my_free(query.str, MYF(MY_WME));
+ goto err;
+ }
+
if (!(row->record= (char*) my_malloc(table->s->reclength, MYF(MY_WME))))
goto err;
memcpy(row->record, table->record[0], table->s->reclength);
- di->set_query(query, query_length);
row->start_time= thd->start_time;
row->query_start_used= thd->query_start_used;
- row->last_insert_id_used= thd->last_insert_id_used;
- row->insert_id_used= thd->insert_id_used;
- row->last_insert_id= thd->last_insert_id;
+ /*
+ those are for the binlog: LAST_INSERT_ID() has been evaluated at this
+ time, so record does not need it, but statement-based binlogging of the
+ INSERT will need when the row is actually inserted.
+ As for SET INSERT_ID, DELAYED does not honour it (BUG#20830).
+ */
+ row->stmt_depends_on_first_successful_insert_id_in_prev_stmt=
+ thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
+ row->first_successful_insert_id_in_prev_stmt=
+ thd->first_successful_insert_id_in_prev_stmt;
row->timestamp_field_type= table->timestamp_field_type;
di->rows.push_back(row);
@@ -1891,6 +1966,7 @@ pthread_handler_t handle_delayed_insert(void *arg)
MYSQL_LOCK *lock=thd->lock;
thd->lock=0;
pthread_mutex_unlock(&di->mutex);
+ di->table->file->ha_release_auto_increment();
mysql_unlock_tables(thd, lock);
di->group_count=0;
pthread_mutex_lock(&di->mutex);
@@ -1990,7 +2066,7 @@ bool delayed_insert::handle_inserts(void)
if (thd.killed || table->s->version != refresh_version)
{
thd.killed= THD::KILL_CONNECTION;
- max_rows= ~(ulong)0; // Do as much as possible
+ max_rows= ULONG_MAX; // Do as much as possible
}
/*
@@ -2002,13 +2078,6 @@ bool delayed_insert::handle_inserts(void)
table->file->extra(HA_EXTRA_WRITE_CACHE);
pthread_mutex_lock(&mutex);
- /* Reset auto-increment cacheing */
- if (thd.clear_next_insert_id)
- {
- thd.next_insert_id= 0;
- thd.clear_next_insert_id= 0;
- }
-
while ((row=rows.get()))
{
stacked_inserts--;
@@ -2017,9 +2086,12 @@ bool delayed_insert::handle_inserts(void)
thd.start_time=row->start_time;
thd.query_start_used=row->query_start_used;
- thd.last_insert_id=row->last_insert_id;
- thd.last_insert_id_used=row->last_insert_id_used;
- thd.insert_id_used=row->insert_id_used;
+ /* for the binlog, forget auto_increment ids generated by previous rows */
+// thd.auto_inc_intervals_in_cur_stmt_for_binlog.empty();
+ thd.first_successful_insert_id_in_prev_stmt=
+ row->first_successful_insert_id_in_prev_stmt;
+ thd.stmt_depends_on_first_successful_insert_id_in_prev_stmt=
+ row->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
table->timestamp_field_type= row->timestamp_field_type;
info.ignore= row->ignore;
@@ -2044,6 +2116,7 @@ bool delayed_insert::handle_inserts(void)
thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
row->log_query = 0;
}
+
if (using_ignore)
{
using_ignore=0;
@@ -2054,6 +2127,22 @@ bool delayed_insert::handle_inserts(void)
using_opt_replace= 0;
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
}
+
+ if (row->log_query && row->query.str != NULL && mysql_bin_log.is_open())
+ {
+ /*
+ If the query has several rows to insert, only the first row will come
+ here. In row-based binlogging, this means that the first row will be
+ written to binlog as one Table_map event and one Rows event (due to an
+ event flush done in binlog_query()), then all other rows of this query
+ will be binlogged together as one single Table_map event and one
+ single Rows event.
+ */
+ thd.binlog_query(THD::ROW_QUERY_TYPE,
+ row->query.str, row->query.length,
+ FALSE, FALSE);
+ }
+
if (table->s->blob_fields)
free_delayed_insert_blobs(table);
thread_safe_sub(delayed_rows_in_use,1,&LOCK_delayed_status);
@@ -2100,13 +2189,25 @@ bool delayed_insert::handle_inserts(void)
pthread_cond_broadcast(&cond_client); // If waiting clients
}
}
-
thd.proc_info=0;
pthread_mutex_unlock(&mutex);
- /* After releasing the mutex, to prevent deadlocks. */
- if (mysql_bin_log.is_open())
- thd.binlog_query(THD::ROW_QUERY_TYPE, query, query_length, FALSE, FALSE);
+#ifdef HAVE_ROW_BASED_REPLICATION
+ /*
+ We need to flush the pending event when using row-based
+ replication since the flushing normally done in binlog_query() is
+ not done last in the statement: for delayed inserts, the insert
+ statement is logged *before* all rows are inserted.
+
+ We can flush the pending event without checking the thd->lock
+ since the delayed insert *thread* is not inside a stored function
+ or trigger.
+
+ TODO: Move the logging to last in the sequence of rows.
+ */
+ if (thd.current_stmt_binlog_row_based)
+ thd.binlog_flush_pending_rows_event(TRUE);
+#endif /* HAVE_ROW_BASED_REPLICATION */
if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
{ // This shouldn't happen
@@ -2194,7 +2295,7 @@ select_insert::select_insert(TABLE_LIST *table_list_par, TABLE *table_par,
enum_duplicates duplic,
bool ignore_check_option_errors)
:table_list(table_list_par), table(table_par), fields(fields_par),
- last_insert_id(0),
+ autoinc_value_of_last_inserted_row(0),
insert_into_view(table_list_par && table_list_par->view != 0)
{
bzero((char*) &info,sizeof(info));
@@ -2410,15 +2511,20 @@ bool select_insert::send_data(List<Item> &values)
if (table->next_number_field)
{
/*
+ If no value has been autogenerated so far, we need to remember the
+ value we just saw, we may need to send it to client in the end.
+ */
+ if (thd->first_successful_insert_id_in_cur_stmt == 0) // optimization
+ autoinc_value_of_last_inserted_row=
+ table->next_number_field->val_int();
+ /*
Clear auto-increment field for the next record, if triggers are used
we will clear it twice, but this should be cheap.
*/
table->next_number_field->reset();
- if (!last_insert_id && thd->insert_id_used)
- last_insert_id= thd->insert_id();
}
}
- table->file->release_auto_increment();
+ table->file->ha_release_auto_increment();
DBUG_RETURN(error);
}
@@ -2480,8 +2586,6 @@ void select_insert::send_error(uint errcode,const char *err)
{
if (!table->file->has_transactions())
{
- if (last_insert_id)
- thd->insert_id(last_insert_id); // For binary log
if (mysql_bin_log.is_open())
{
thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length,
@@ -2501,6 +2605,7 @@ void select_insert::send_error(uint errcode,const char *err)
bool select_insert::send_eof()
{
int error,error2;
+ ulonglong id;
DBUG_ENTER("select_insert::send_eof");
error= (!thd->prelocked_mode) ? table->file->ha_end_bulk_insert():0;
@@ -2527,8 +2632,6 @@ bool select_insert::send_eof()
thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
}
- if (last_insert_id)
- thd->insert_id(last_insert_id); // For binary log
/*
Write to binlog before commiting transaction. No statement will
be written by the binlog_query() below in RBR mode. All the
@@ -2558,7 +2661,13 @@ bool select_insert::send_eof()
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
(ulong) (info.deleted+info.updated), (ulong) thd->cuted_fields);
thd->row_count_func= info.copied+info.deleted+info.updated;
- ::send_ok(thd, (ulong) thd->row_count_func, last_insert_id, buff);
+
+ id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
+ thd->first_successful_insert_id_in_cur_stmt :
+ (thd->arg_of_last_insert_id_function ?
+ thd->first_successful_insert_id_in_prev_stmt :
+ (info.copied ? autoinc_value_of_last_inserted_row : 0));
+ ::send_ok(thd, (ulong) thd->row_count_func, id, buff);
DBUG_RETURN(0);
}
@@ -2724,21 +2833,6 @@ static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
}
-class MY_HOOKS : public TABLEOP_HOOKS
-{
-public:
- MY_HOOKS(select_create *x) : ptr(x) { }
- virtual void do_prelock(TABLE **tables, uint count)
- {
- if (ptr->get_thd()->current_stmt_binlog_row_based)
- ptr->binlog_show_create_table(tables, count);
- }
-
-private:
- select_create *ptr;
-};
-
-
int
select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
{
@@ -2751,8 +2845,9 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
MY_HOOKS(select_create *x) : ptr(x) { }
virtual void do_prelock(TABLE **tables, uint count)
{
- if (ptr->get_thd()->current_stmt_binlog_row_based)
- ptr->binlog_show_create_table(tables, count);
+ if (ptr->get_thd()->current_stmt_binlog_row_based &&
+ !(ptr->get_create_info()->options & HA_LEX_CREATE_TMP_TABLE))
+ ptr->binlog_show_create_table(tables, count);
}
private: