diff options
Diffstat (limited to 'sql/sql_insert.cc')
-rw-r--r-- | sql/sql_insert.cc | 279 |
1 files changed, 142 insertions, 137 deletions
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index 946c0536897..894a2a21efb 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -108,7 +108,7 @@ static int check_insert_fields(THD *thd, TABLE_LIST *table_list, No fields are provided so all fields must be provided in the values. Thus we set all bits in the write set. */ - table->file->ha_set_all_bits_in_write_set(); + bitmap_set_all(table->write_set); } else { // Part field list @@ -123,7 +123,7 @@ static int check_insert_fields(THD *thd, TABLE_LIST *table_list, return -1; } - thd->dupp_field=0; + thd->dup_field= 0; select_lex->no_wrap_view_item= TRUE; /* Save the state of the current name resolution context. */ @@ -135,11 +135,7 @@ static int check_insert_fields(THD *thd, TABLE_LIST *table_list, */ table_list->next_local= 0; context->resolve_in_table_list_only(table_list); - /* - Indicate fields in list is to be updated by setting set_query_id - parameter to 2. This sets the bit in the write_set for each field. - */ - res= setup_fields(thd, 0, fields, 2, 0, 0); + res= setup_fields(thd, 0, fields, MARK_COLUMNS_WRITE, 0, 0); /* Restore the current context. */ ctx_state.restore_state(context, table_list); @@ -167,16 +163,27 @@ static int check_insert_fields(THD *thd, TABLE_LIST *table_list, table_list->table= table= tbl->table; } - if (check_unique && thd->dupp_field) + if (check_unique && thd->dup_field) { - my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0), thd->dupp_field->field_name); + my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0), thd->dup_field->field_name); return -1; } - if (table->timestamp_field && // Don't set timestamp if used - table->timestamp_field->query_id == thd->query_id) - clear_timestamp_auto_bits(table->timestamp_field_type, - TIMESTAMP_AUTO_SET_ON_INSERT); + if (table->timestamp_field) // Don't automaticly set timestamp if used + { + if (bitmap_is_set(table->write_set, + table->timestamp_field->field_index)) + clear_timestamp_auto_bits(table->timestamp_field_type, + TIMESTAMP_AUTO_SET_ON_INSERT); + else + { + bitmap_set_bit(table->write_set, + table->timestamp_field->field_index); + } + } } + if (table->found_next_number_field) + table->mark_auto_increment_column(); + table->mark_columns_needed_for_insert(); // For the values we need select_priv #ifndef NO_EMBEDDED_ACCESS_CHECKS table->grant.want_privilege= (SELECT_ACL & ~table->grant.privilege); @@ -217,40 +224,33 @@ static int check_update_fields(THD *thd, TABLE_LIST *insert_table_list, List<Item> &update_fields) { TABLE *table= insert_table_list->table; - query_id_t timestamp_query_id; - LINT_INIT(timestamp_query_id); + my_bool timestamp_mark; - /* - Change the query_id for the timestamp column so that we can - check if this is modified directly. - */ if (table->timestamp_field) { - timestamp_query_id= table->timestamp_field->query_id; - table->timestamp_field->query_id= thd->query_id - 1; + /* + Unmark the timestamp field so that we can check if this is modified + by update_fields + */ + timestamp_mark= bitmap_test_and_clear(table->write_set, + table->timestamp_field->field_index); } - /* - Check the fields we are going to modify. This will set the query_id - of all used fields to the threads query_id. It will also set all - fields into the write set of this table. - */ - if (setup_fields(thd, 0, update_fields, 2, 0, 0)) + /* Check the fields we are going to modify */ + if (setup_fields(thd, 0, update_fields, MARK_COLUMNS_WRITE, 0, 0)) return -1; if (table->timestamp_field) { /* Don't set timestamp column if this is modified. */ - if (table->timestamp_field->query_id == thd->query_id) + if (bitmap_is_set(table->write_set, + table->timestamp_field->field_index)) clear_timestamp_auto_bits(table->timestamp_field_type, TIMESTAMP_AUTO_SET_ON_UPDATE); - else - { - table->timestamp_field->query_id= timestamp_query_id; - table->file->ha_set_bit_in_write_set(table->timestamp_field->fieldnr); - } + if (timestamp_mark) + bitmap_set_bit(table->write_set, + table->timestamp_field->field_index); } - return 0; } @@ -269,8 +269,8 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, By default, both logs are enabled (this won't cause problems if the server runs without --log-update or --log-bin). */ - bool log_on= (thd->options & OPTION_BIN_LOG) || - (!(thd->security_ctx->master_access & SUPER_ACL)); + bool log_on= ((thd->options & OPTION_BIN_LOG) || + (!(thd->security_ctx->master_access & SUPER_ACL))); bool transactional_table, joins_freed= FALSE; bool changed; uint value_count; @@ -380,7 +380,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter); goto abort; } - if (setup_fields(thd, 0, *values, 0, 0, 0)) + if (setup_fields(thd, 0, *values, MARK_COLUMNS_READ, 0, 0)) goto abort; } its.rewind (); @@ -753,7 +753,7 @@ static bool check_view_insertability(THD * thd, TABLE_LIST *view) */ static bool mysql_prepare_insert_check_table(THD *thd, TABLE_LIST *table_list, - List<Item> &fields, COND **where, + List<Item> &fields, bool select_insert) { bool insert_into_view= (table_list->view != 0); @@ -761,7 +761,7 @@ static bool mysql_prepare_insert_check_table(THD *thd, TABLE_LIST *table_list, if (setup_tables(thd, &thd->lex->select_lex.context, &thd->lex->select_lex.top_join_list, - table_list, where, &thd->lex->select_lex.leaf_tables, + table_list, &thd->lex->select_lex.leaf_tables, select_insert)) DBUG_RETURN(TRUE); @@ -851,8 +851,7 @@ bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list, DBUG_RETURN(TRUE); } - if (mysql_prepare_insert_check_table(thd, table_list, fields, where, - select_insert)) + if (mysql_prepare_insert_check_table(thd, table_list, fields, select_insert)) DBUG_RETURN(TRUE); /* Save the state of the current name resolution context. */ @@ -869,7 +868,7 @@ bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list, if (values && !(res= check_insert_fields(thd, context->table_list, fields, *values, !insert_into_view) || - setup_fields(thd, 0, *values, 0, 0, 0)) && + setup_fields(thd, 0, *values, MARK_COLUMNS_READ, 0, 0)) && duplic == DUP_UPDATE) { select_lex->no_wrap_view_item= TRUE; @@ -887,7 +886,7 @@ bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list, next_name_resolution_table= ctx_state.save_next_local; } if (!res) - res= setup_fields(thd, 0, update_values, 1, 0, 0); + res= setup_fields(thd, 0, update_values, MARK_COLUMNS_READ, 0, 0); } /* Restore the current context. */ @@ -912,7 +911,7 @@ bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list, select_lex->first_execution= 0; } if (duplic == DUP_UPDATE || duplic == DUP_REPLACE) - table->file->ha_retrieve_all_pk(); + table->prepare_for_position(); DBUG_RETURN(FALSE); } @@ -959,9 +958,12 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) { int error, trg_error= 0; char *key=0; + MY_BITMAP *save_read_set, *save_write_set; DBUG_ENTER("write_record"); info->records++; + save_read_set= table->read_set; + save_write_set= table->write_set; if (info->handle_duplicates == DUP_REPLACE || info->handle_duplicates == DUP_UPDATE) @@ -977,6 +979,8 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) error=HA_WRITE_SKIP; /* Database can't find key */ goto err; } + /* Read all columns for the row we are going to replace */ + table->use_all_columns(); /* Don't allow REPLACE to replace a row when a auto_increment column was used. This ensures that we don't get a problem when the @@ -987,9 +991,9 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) key_nr == table->s->next_number_index && table->file->auto_increment_column_changed) goto err; - if (table->file->table_flags() & HA_DUPP_POS) + if (table->file->ha_table_flags() & HA_DUPLICATE_POS) { - if (table->file->rnd_pos(table->record[1],table->file->dupp_ref)) + if (table->file->rnd_pos(table->record[1],table->file->dup_ref)) goto err; } else @@ -1050,7 +1054,8 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) thd->clear_next_insert_id= 0; thd->next_insert_id= 0; } - if ((error=table->file->ha_update_row(table->record[1],table->record[0]))) + if ((error=table->file->ha_update_row(table->record[1], + table->record[0]))) { if ((error == HA_ERR_FOUND_DUPP_KEY) && info->ignore) goto ok_or_after_trg_err; @@ -1127,6 +1132,13 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) trg_error= (table->triggers && table->triggers->process_triggers(thd, TRG_EVENT_INSERT, TRG_ACTION_AFTER, TRUE)); + /* + Restore column maps if they where replaced during an duplicate key + problem. + */ + if (table->read_set != save_read_set || + table->write_set != save_write_set) + table->column_bitmaps_set(save_read_set, save_write_set); } else if ((error=table->file->ha_write_row(table->record[0]))) { @@ -1160,6 +1172,7 @@ err: before_trg_err: if (key) my_safe_afree(key, table->s->max_unique_length, MAX_KEY_LENGTH); + table->column_bitmaps_set(save_read_set, save_write_set); DBUG_RETURN(1); } @@ -1172,9 +1185,11 @@ int check_that_all_fields_are_given_values(THD *thd, TABLE *entry, TABLE_LIST *table_list) { int err= 0; + MY_BITMAP *write_set= entry->write_set; + for (Field **field=entry->field ; *field ; field++) { - if ((*field)->query_id != thd->query_id && + if (!bitmap_is_set(write_set, (*field)->field_index) && ((*field)->flags & NO_DEFAULT_VALUE_FLAG) && ((*field)->real_type() != FIELD_TYPE_ENUM)) { @@ -1506,6 +1521,7 @@ TABLE *delayed_insert::get_local_table(THD* client_thd) Field **field,**org_field, *found_next_number_field; TABLE *copy; TABLE_SHARE *share= table->s; + byte *bitmap; /* First request insert thread to get a lock */ status=1; @@ -1532,14 +1548,16 @@ TABLE *delayed_insert::get_local_table(THD* client_thd) client_thd->proc_info="allocating local table"; copy= (TABLE*) client_thd->alloc(sizeof(*copy)+ (share->fields+1)*sizeof(Field**)+ - share->reclength); + share->reclength + + share->column_bitmap_size*2); if (!copy) goto error; *copy= *table; /* We don't need to change the file handler here */ - field=copy->field=(Field**) (copy+1); - copy->record[0]=(byte*) (field+share->fields+1); + field= copy->field= (Field**) (copy+1); + bitmap= (byte*) (field+share->fields+1); + copy->record[0]= (bitmap+ share->column_bitmap_size*2); memcpy((char*) copy->record[0],(char*) table->record[0],share->reclength); /* Make a copy of all fields */ @@ -1568,13 +1586,21 @@ TABLE *delayed_insert::get_local_table(THD* client_thd) copy->timestamp_field_type= copy->timestamp_field->get_auto_set_type(); } - /* Adjust in_use for pointing to client thread */ copy->in_use= client_thd; /* Adjust lock_count. This table object is not part of a lock. */ copy->lock_count= 0; + /* Adjust bitmaps */ + copy->def_read_set.bitmap= (my_bitmap_map*) bitmap; + copy->def_write_set.bitmap= ((my_bitmap_map*) + (bitmap + share->column_bitmap_size)); + copy->tmp_set.bitmap= 0; // To catch errors + bzero((char*) bitmap, share->column_bitmap_size*2); + copy->read_set= ©->def_read_set; + copy->write_set= ©->def_write_set; + return copy; /* Got fatal error */ @@ -1742,7 +1768,7 @@ pthread_handler_t handle_delayed_insert(void *arg) thd->fatal_error(); // Abort waiting inserts goto err; } - if (!(di->table->file->table_flags() & HA_CAN_INSERT_DELAYED)) + if (!(di->table->file->ha_table_flags() & HA_CAN_INSERT_DELAYED)) { thd->fatal_error(); my_error(ER_ILLEGAL_HA, MYF(0), di->table_list.table_name); @@ -1952,6 +1978,7 @@ bool delayed_insert::handle_inserts(void) pthread_mutex_unlock(&mutex); table->next_number_field=table->found_next_number_field; + table->use_all_columns(); thd.proc_info="upgrading lock"; if (thr_upgrade_write_delay_lock(*thd.lock->locks)) @@ -2058,7 +2085,6 @@ bool delayed_insert::handle_inserts(void) } thd.proc_info=0; - table->next_number_field=0; pthread_mutex_unlock(&mutex); /* After releasing the mutex, to prevent deadlocks. */ @@ -2181,7 +2207,7 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) lex->current_select= &lex->select_lex; res= check_insert_fields(thd, table_list, *fields, values, !insert_into_view) || - setup_fields(thd, 0, values, 0, 0, 0); + setup_fields(thd, 0, values, MARK_COLUMNS_READ, 0, 0); if (info.handle_duplicates == DUP_UPDATE) { @@ -2211,7 +2237,8 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) context->first_name_resolution_table-> next_name_resolution_table= ctx_state.save_next_local; } - res= res || setup_fields(thd, 0, *info.update_values, 1, 0, 0); + res= res || setup_fields(thd, 0, *info.update_values, MARK_COLUMNS_READ, + 0, 0); /* Restore the current context. */ ctx_state.restore_state(context, table_list); @@ -2264,16 +2291,6 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) check_that_all_fields_are_given_values(thd, table, table_list)) || table_list->prepare_where(thd, 0, TRUE) || table_list->prepare_check_option(thd)); - - /* - For non-transactional non-temporary tables, we set the - OPTION_STATUS_NO_TRANS_UPDATE flag here. The send_eof() function - is used by both the select_insert and the select_create classes, - so setting it there would clash. - */ - if (!(table->file->has_transactions() || table->s->tmp_table)) - thd->options|=OPTION_STATUS_NO_TRANS_UPDATE; - DBUG_RETURN(res); } @@ -2391,7 +2408,9 @@ void select_insert::send_error(uint errcode,const char *err) { DBUG_ENTER("select_insert::send_error"); - my_message(errcode, err, MYF(0)); + /* Avoid an extra 'unknown error' message if we already reported an error */ + if (errcode != ER_UNKNOWN_ERROR && !thd->net.report_error) + my_message(errcode, err, MYF(0)); if (!table) { @@ -2419,11 +2438,8 @@ void select_insert::send_error(uint errcode,const char *err) INSERT-SELECT. When replicating a CREATE-SELECT statement, we shall not write the - events to the binary log. To prevent the ha_rollback_stmt() below - from writing to the binary log, we have to pretend that the table - is transactional, even if it actually is not. Therefore, the - OPTION_STATUS_NO_TRANS_UPDATE is cleared in - select_create::prepare() and will remain cleared here. + events to the binary log and should thus not set + OPTION_STATUS_NO_TRANS_UPDATE. When replicating INSERT-SELECT, we shall not write the events to the binary log for transactional table, but shall write all events @@ -2431,22 +2447,22 @@ void select_insert::send_error(uint errcode,const char *err) this case, the OPTION_STATUS_NO_TRANS_UPDATE is set if there is a write to a non-transactional table, otherwise it is cleared. */ - if ((info.copied || info.deleted || info.updated) && - !table->file->has_transactions()) + if (info.copied || info.deleted || info.updated) { - if (last_insert_id) - thd->insert_id(last_insert_id); // For binary log - if (mysql_bin_log.is_open()) + if (!table->file->has_transactions()) { - thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length, - table->file->has_transactions(), FALSE); + if (last_insert_id) + thd->insert_id(last_insert_id); // For binary log + if (mysql_bin_log.is_open()) + { + thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length, + table->file->has_transactions(), FALSE); + } + if (!thd->current_stmt_binlog_row_based && !table->s->tmp_table && + !can_rollback_data()) + thd->options|= OPTION_STATUS_NO_TRANS_UPDATE; + query_cache_invalidate3(thd, table, 1); } - if (!thd->current_stmt_binlog_row_based && !table->s->tmp_table) - thd->options|=OPTION_STATUS_NO_TRANS_UPDATE; - } - if (info.copied || info.deleted || info.updated) - { - query_cache_invalidate3(thd, table, 1); } ha_rollback_stmt(thd); DBUG_VOID_RETURN; @@ -2461,22 +2477,25 @@ bool select_insert::send_eof() error= (!thd->prelocked_mode) ? table->file->end_bulk_insert():0; table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); - /* - We must invalidate the table in the query cache before binlog writing - and ha_autocommit_or_rollback. - - If nothing was inserted in the table, there is no need to emit a - ROLLBACK statement to the binary log, so in that case we clear - OPTION_STATUS_NO_TRANS_UPDATE. - - Observe that select_insert::send_eof() is used by both - select_insert and select_create and that they set the flag in - different manners. See Note 1 below for more info. - */ if (info.copied || info.deleted || info.updated) + { + /* + We must invalidate the table in the query cache before binlog writing + and ha_autocommit_or_rollback. + */ query_cache_invalidate3(thd, table, 1); - else - thd->options&= ~OPTION_STATUS_NO_TRANS_UPDATE; + /* + Mark that we have done permanent changes if all of the below is true + - Table doesn't support transactions + - It's a normal (not temporary) table. (Changes to temporary tables + are not logged in RBR) + - We are using statement based replication + */ + if (!table->file->has_transactions() && + (!table->s->tmp_table || + !thd->current_stmt_binlog_row_based)) + thd->options|= OPTION_STATUS_NO_TRANS_UPDATE; + } if (last_insert_id) thd->insert_id(last_insert_id); // For binary log @@ -2518,31 +2537,30 @@ bool select_insert::send_eof() CREATE TABLE (SELECT) ... ***************************************************************************/ -int -select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) +class MY_HOOKS : public TABLEOP_HOOKS { - DBUG_ENTER("select_create::prepare"); - - class MY_HOOKS : public TABLEOP_HOOKS { - public: - MY_HOOKS(select_create *x) : ptr(x) { } - virtual void do_prelock(TABLE **tables, uint count) +public: + MY_HOOKS(select_create *x) : ptr(x) { } + virtual void do_prelock(TABLE **tables, uint count) { if (ptr->get_thd()->current_stmt_binlog_row_based) ptr->binlog_show_create_table(tables, count); } - private: - select_create *ptr; - }; +private: + select_create *ptr; +}; + +int select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) +{ MY_HOOKS hooks(this); + DBUG_ENTER("select_create::prepare"); unit= u; - table= create_table_from_items(thd, create_info, create_table, - extra_fields, keys, &values, &lock, - &hooks); - if (!table) + if (!(table= create_table_from_items(thd, create_info, create_table, + extra_fields, keys, &values, &lock, + &hooks))) DBUG_RETURN(-1); // abort() deletes table if (table->s->fields < values.elements) @@ -2551,16 +2569,15 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) DBUG_RETURN(-1); } - /* First field to copy */ + /* First field to copy */ field= table->field+table->s->fields - values.elements; /* Mark all fields that are given values */ for (Field **f= field ; *f ; f++) - (*f)->query_id= thd->query_id; + bitmap_set_bit(table->write_set, (*f)->field_index); /* Don't set timestamp if used */ table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET; - table->next_number_field=table->found_next_number_field; restore_record(table,s->default_values); // Get empty record @@ -2579,8 +2596,7 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) } -void -select_create::binlog_show_create_table(TABLE **tables, uint count) +void select_create::binlog_show_create_table(TABLE **tables, uint count) { /* Note 1: In RBR mode, we generate a CREATE TABLE statement for the @@ -2598,25 +2614,22 @@ select_create::binlog_show_create_table(TABLE **tables, uint count) since there potentially are sub-selects or accesses to information schema that will do a close_thread_tables(), destroying the statement transaction cache. - - To ensure that the event kaboodle is not written to the binary log - on rollback, we clear the OPTION_STATUS_NO_TRANS_UPDATE bit of - thd->options. - */ + */ DBUG_ASSERT(thd->current_stmt_binlog_row_based); DBUG_ASSERT(tables && *tables && count > 0); - thd->options&= ~OPTION_STATUS_NO_TRANS_UPDATE; char buf[2048]; String query(buf, sizeof(buf), system_charset_info); - query.length(0); // Have to zero it since constructor doesn't - + int result; TABLE_LIST table_list; + memset(&table_list, 0, sizeof(table_list)); table_list.table = *tables; + query.length(0); // Have to zero it since constructor doesn't - int result= store_create_info(thd, &table_list, &query, create_info); + result= store_create_info(thd, &table_list, &query, create_info); DBUG_ASSERT(result == 0); /* store_create_info() always return 0 */ + thd->binlog_query(THD::STMT_QUERY_TYPE, query.ptr(), query.length(), /* is_trans */ TRUE, @@ -2653,17 +2666,9 @@ bool select_create::send_eof() table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); VOID(pthread_mutex_lock(&LOCK_open)); mysql_unlock_tables(thd, lock); - /* - TODO: - Check if we can remove the following two rows. - We should be able to just keep the table in the table cache. - */ if (!table->s->tmp_table) { - ulong version= table->s->version; - hash_delete(&open_cache,(byte*) table); - /* Tell threads waiting for refresh that something has happened */ - if (version != refresh_version) + if (close_thread_table(thd, &table)) VOID(pthread_cond_broadcast(&COND_refresh)); } lock=0; |