diff options
Diffstat (limited to 'sql/table.cc')
-rw-r--r-- | sql/table.cc | 764 |
1 files changed, 688 insertions, 76 deletions
diff --git a/sql/table.cc b/sql/table.cc index 2129da90a29..988e082addf 100644 --- a/sql/table.cc +++ b/sql/table.cc @@ -18,6 +18,7 @@ #include "mysql_priv.h" #include "sql_trigger.h" +#include "create_options.h" #include <m_ctype.h> #include "my_md5.h" @@ -33,6 +34,12 @@ LEX_STRING GENERAL_LOG_NAME= {C_STRING_WITH_LEN("general_log")}; /* SLOW_LOG name */ LEX_STRING SLOW_LOG_NAME= {C_STRING_WITH_LEN("slow_log")}; +/* + Keyword added as a prefix when parsing the defining expression for a + virtual column read from the column definition saved in the frm file +*/ +LEX_STRING parse_vcol_keyword= { C_STRING_WITH_LEN("PARSE_VCOL_EXPR ") }; + /* Functions defined in this file */ void open_table_error(TABLE_SHARE *share, int error, int db_errno, @@ -672,12 +679,14 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, uint interval_count, interval_parts, read_length, int_length; uint db_create_options, keys, key_parts, n_length; uint key_info_length, com_length, null_bit_pos; - uint extra_rec_buf_length; + uint vcol_screen_length; + uint extra_rec_buf_length, options_len; uint i,j; bool use_hash; - char *keynames, *names, *comment_pos; + char *keynames, *names, *comment_pos, *vcol_screen_pos; uchar *record; - uchar *disk_buff, *strpos, *null_flags, *null_pos; + uchar *disk_buff, *strpos, *null_flags, *null_pos, *options; + uchar *buff= 0; ulong pos, record_offset, *rec_per_key, rec_buff_length; handler *handler_file= 0; KEY *keyinfo; @@ -795,7 +804,6 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, for (i=0 ; i < keys ; i++, keyinfo++) { - keyinfo->table= 0; // Updated in open_frm if (new_frm_ver >= 3) { keyinfo->flags= (uint) uint2korr(strpos) ^ HA_NOSAME; @@ -847,6 +855,7 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, strpos+= (strmov(keynames, (char *) strpos) - keynames)+1; share->reclength = uint2korr((head+16)); + share->stored_rec_length= share->reclength; if (*(head+26) == 1) share->system= 1; /* one-record-database */ #ifdef HAVE_CRYPTED_FRM @@ -864,15 +873,14 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, if ((n_length= uint4korr(head+55))) { /* Read extra data segment */ - uchar *buff, *next_chunk, *buff_end; + uchar *next_chunk, *buff_end; DBUG_PRINT("info", ("extra segment size is %u bytes", n_length)); if (!(next_chunk= buff= (uchar*) my_malloc(n_length, MYF(MY_WME)))) goto err; if (my_pread(file, buff, n_length, record_offset + share->reclength, MYF(MY_NABP))) { - my_free(buff, MYF(0)); - goto err; + goto free_and_err; } share->connect_string.length= uint2korr(buff); if (!(share->connect_string.str= strmake_root(&share->mem_root, @@ -880,8 +888,7 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, share->connect_string. length))) { - my_free(buff, MYF(0)); - goto err; + goto free_and_err; } next_chunk+= share->connect_string.length + 2; buff_end= buff + n_length; @@ -901,8 +908,7 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, plugin_data(tmp_plugin, handlerton *))) { /* bad file, legacy_db_type did not match the name */ - my_free(buff, MYF(0)); - goto err; + goto free_and_err; } /* tmp_plugin is locked with a local lock. @@ -931,8 +937,7 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, error= 8; my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--skip-partition"); - my_free(buff, MYF(0)); - goto err; + goto free_and_err; } plugin_unlock(NULL, share->db_plugin); share->db_plugin= ha_lock_engine(NULL, partition_hton); @@ -946,8 +951,7 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, /* purecov: begin inspected */ error= 8; my_error(ER_UNKNOWN_STORAGE_ENGINE, MYF(0), name.str); - my_free(buff, MYF(0)); - goto err; + goto free_and_err; /* purecov: end */ } next_chunk+= str_db_type_length + 2; @@ -963,40 +967,26 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, memdup_root(&share->mem_root, next_chunk + 4, partition_info_len + 1))) { - my_free(buff, MYF(0)); - goto err; + goto free_and_err; } } #else if (partition_info_len) { DBUG_PRINT("info", ("WITH_PARTITION_STORAGE_ENGINE is not defined")); - my_free(buff, MYF(0)); - goto err; + goto free_and_err; } #endif next_chunk+= 5 + partition_info_len; } -#if MYSQL_VERSION_ID < 50200 - if (share->mysql_version >= 50106 && share->mysql_version <= 50109) - { - /* - Partition state array was here in version 5.1.6 to 5.1.9, this code - makes it possible to load a 5.1.6 table in later versions. Can most - likely be removed at some point in time. Will only be used for - upgrades within 5.1 series of versions. Upgrade to 5.2 can only be - done from newer 5.1 versions. - */ - next_chunk+= 4; - } - else if (share->mysql_version >= 50110) -#endif + if (share->mysql_version >= 50110) { /* New auto_partitioned indicator introduced in 5.1.11 */ #ifdef WITH_PARTITION_STORAGE_ENGINE share->auto_partitioned= *next_chunk; #endif next_chunk++; + DBUG_ASSERT(next_chunk <= buff_end); } keyinfo= share->key_info; for (i= 0; i < keys; i++, keyinfo++) @@ -1008,8 +998,7 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, { DBUG_PRINT("error", ("fulltext key uses parser that is not defined in .frm")); - my_free(buff, MYF(0)); - goto err; + goto free_and_err; } parser_name.str= (char*) next_chunk; parser_name.length= strlen((char*) next_chunk); @@ -1019,12 +1008,22 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, if (! keyinfo->parser) { my_error(ER_PLUGIN_IS_NOT_LOADED, MYF(0), parser_name.str); - my_free(buff, MYF(0)); - goto err; + goto free_and_err; } } } - my_free(buff, MYF(0)); + DBUG_ASSERT(next_chunk <= buff_end); + if (share->db_create_options & HA_OPTION_TEXT_CREATE_OPTIONS) + { + /* + store options position, but skip till the time we will + know number of fields + */ + options_len= uint4korr(next_chunk); + options= next_chunk + 4; + next_chunk+= options_len + 4; + } + DBUG_ASSERT(next_chunk <= buff_end); } share->key_block_size= uint2korr(head+62); @@ -1034,21 +1033,21 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, share->rec_buff_length= rec_buff_length; if (!(record= (uchar *) alloc_root(&share->mem_root, rec_buff_length))) - goto err; /* purecov: inspected */ + goto free_and_err; /* purecov: inspected */ share->default_values= record; if (my_pread(file, record, (size_t) share->reclength, record_offset, MYF(MY_NABP))) - goto err; /* purecov: inspected */ + goto free_and_err; /* purecov: inspected */ VOID(my_seek(file,pos,MY_SEEK_SET,MYF(0))); if (my_read(file, head,288,MYF(MY_NABP))) - goto err; + goto free_and_err; #ifdef HAVE_CRYPTED_FRM if (crypted) { crypted->decode((char*) head+256,288-256); if (sint2korr(head+284) != 0) // Should be 0 - goto err; // Wrong password + goto free_and_err; // Wrong password } #endif @@ -1060,11 +1059,15 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, int_length= uint2korr(head+274); share->null_fields= uint2korr(head+282); com_length= uint2korr(head+284); + vcol_screen_length= uint2korr(head+286); + share->vfields= 0; + share->stored_fields= share->fields; share->comment.length= (int) (head[46]); share->comment.str= strmake_root(&share->mem_root, (char*) head+47, share->comment.length); - DBUG_PRINT("info",("i_count: %d i_parts: %d index: %d n_length: %d int_length: %d com_length: %d", interval_count,interval_parts, share->keys,n_length,int_length, com_length)); + DBUG_PRINT("info",("i_count: %d i_parts: %d index: %d n_length: %d int_length: %d com_length: %d vcol_screen_length: %d", interval_count,interval_parts, share->keys,n_length,int_length, com_length, vcol_screen_length)); + if (!(field_ptr = (Field **) alloc_root(&share->mem_root, @@ -1072,14 +1075,16 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, interval_count*sizeof(TYPELIB)+ (share->fields+interval_parts+ keys+3)*sizeof(char *)+ - (n_length+int_length+com_length))))) - goto err; /* purecov: inspected */ + (n_length+int_length+com_length+ + vcol_screen_length))))) + goto free_and_err; /* purecov: inspected */ share->field= field_ptr; read_length=(uint) (share->fields * field_pack_length + - pos+ (uint) (n_length+int_length+com_length)); + pos+ (uint) (n_length+int_length+com_length+ + vcol_screen_length)); if (read_string(file,(uchar**) &disk_buff,read_length)) - goto err; /* purecov: inspected */ + goto free_and_err; /* purecov: inspected */ #ifdef HAVE_CRYPTED_FRM if (crypted) { @@ -1098,11 +1103,15 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, memcpy((char*) names, strpos+(share->fields*field_pack_length), (uint) (n_length+int_length)); comment_pos= names+(n_length+int_length); - memcpy(comment_pos, disk_buff+read_length-com_length, com_length); + memcpy(comment_pos, disk_buff+read_length-com_length-vcol_screen_length, + com_length); + vcol_screen_pos= names+(n_length+int_length+com_length); + memcpy(vcol_screen_pos, disk_buff+read_length-vcol_screen_length, + vcol_screen_length); fix_type_pointers(&interval_array, &share->fieldnames, 1, &names); if (share->fieldnames.count != share->fields) - goto err; + goto free_and_err; fix_type_pointers(&interval_array, share->intervals, interval_count, &names); @@ -1116,7 +1125,7 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, uint count= (uint) (interval->count + 1) * sizeof(uint); if (!(interval->type_lengths= (uint *) alloc_root(&share->mem_root, count))) - goto err; + goto free_and_err; for (count= 0; count < interval->count; count++) { char *val= (char*) interval->type_names[count]; @@ -1132,7 +1141,7 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, /* Allocate handler */ if (!(handler_file= get_new_handler(share, thd->mem_root, share->db_type()))) - goto err; + goto free_and_err; record= share->default_values-1; /* Fieldstart = 1 */ null_bits_are_used= share->null_fields != 0; @@ -1167,10 +1176,14 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, for (i=0 ; i < share->fields; i++, strpos+=field_pack_length, field_ptr++) { uint pack_flag, interval_nr, unireg_type, recpos, field_length; + uint vcol_info_length=0; + uint vcol_expr_length=0; enum_field_types field_type; CHARSET_INFO *charset=NULL; Field::geometry_type geom_type= Field::GEOM_GEOMETRY; LEX_STRING comment; + Virtual_column_info *vcol_info= 0; + bool fld_stored_in_db= TRUE; if (new_frm_ver >= 3) { @@ -1191,7 +1204,7 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, charset= &my_charset_bin; #else error= 4; // unsupported field type - goto err; + goto free_and_err; #endif } else @@ -1202,9 +1215,21 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, { error= 5; // Unknown or unavailable charset errarg= (int) strpos[14]; - goto err; + goto free_and_err; } } + + if ((uchar)field_type == (uchar)MYSQL_TYPE_VIRTUAL) + { + DBUG_ASSERT(interval_nr); // Expect non-null expression + /* + The interval_id byte in the .frm file stores the length of the + expression statement for a virtual column. + */ + vcol_info_length= interval_nr; + interval_nr= 0; + } + if (!comment_length) { comment.str= (char*) ""; @@ -1216,6 +1241,34 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, comment.length= comment_length; comment_pos+= comment_length; } + + if (vcol_info_length) + { + /* + Get virtual column data stored in the .frm file as follows: + byte 1 = 1 (always 1 to allow for future extensions) + byte 2 = sql_type + byte 3 = flags (as of now, 0 - no flags, 1 - field is physically stored) + byte 4-... = virtual column expression (text data) + */ + vcol_info= new Virtual_column_info(); + if ((uint)vcol_screen_pos[0] != 1) + { + error= 4; + goto free_and_err; + } + field_type= (enum_field_types) (uchar) vcol_screen_pos[1]; + fld_stored_in_db= (bool) (uint) vcol_screen_pos[2]; + vcol_expr_length= vcol_info_length-(uint)FRM_VCOL_HEADER_SIZE; + if (!(vcol_info->expr_str.str= + (char *)memdup_root(&share->mem_root, + vcol_screen_pos+(uint)FRM_VCOL_HEADER_SIZE, + vcol_expr_length))) + goto free_and_err; + vcol_info->expr_str.length= vcol_expr_length; + vcol_screen_pos+= vcol_info_length; + share->vfields++; + } } else { @@ -1301,11 +1354,13 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, if (!reg_field) // Not supported field type { error= 4; - goto err; /* purecov: inspected */ + goto free_and_err; /* purecov: inspected */ } reg_field->field_index= i; reg_field->comment=comment; + reg_field->vcol_info= vcol_info; + reg_field->stored_in_db= fld_stored_in_db; if (field_type == MYSQL_TYPE_BIT && !f_bit_as_char(pack_flag)) { null_bits_are_used= 1; @@ -1329,7 +1384,9 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, share->timestamp_field_offset= i; if (use_hash) - if (my_hash_insert(&share->name_hash, (uchar*) field_ptr) ) + { + if (my_hash_insert(&share->name_hash, + (uchar*) field_ptr)) { /* Set return code 8 here to indicate that an error has @@ -1337,10 +1394,20 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, sent (OOM). */ error= 8; - goto err; + goto free_and_err; } + } + if (!reg_field->stored_in_db) + { + share->stored_fields--; + if (share->stored_rec_length>=recpos) + share->stored_rec_length= recpos-1; + } } *field_ptr=0; // End marker + /* Sanity checks: */ + DBUG_ASSERT(share->fields>=share->stored_fields); + DBUG_ASSERT(share->reclength>=share->stored_rec_length); /* Fix key->name and key_part->field */ if (key_parts) @@ -1355,6 +1422,19 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, { uint usable_parts= 0; keyinfo->name=(char*) share->keynames.type_names[key]; + keyinfo->name_length= strlen(keyinfo->name); + keyinfo->cache_name= + (uchar*) alloc_root(&share->mem_root, + share->table_cache_key.length+ + keyinfo->name_length + 1); + if (keyinfo->cache_name) // If not out of memory + { + uchar *pos= keyinfo->cache_name; + memcpy(pos, share->table_cache_key.str, share->table_cache_key.length); + memcpy(pos + share->table_cache_key.length, keyinfo->name, + keyinfo->name_length+1); + } + /* Fix fulltext keys for old .frm files */ if (share->key_info[key].flags & HA_FULLTEXT) share->key_info[key].algorithm= HA_KEY_ALG_FULLTEXT; @@ -1391,7 +1471,7 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, if (!key_part->fieldnr) { error= 4; // Wrong file - goto err; + goto free_and_err; } field= key_part->field= share->field[key_part->fieldnr-1]; key_part->type= field->key_type(); @@ -1417,12 +1497,6 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, keyinfo->extra_length+=HA_KEY_BLOB_LENGTH; key_part->store_length+=HA_KEY_BLOB_LENGTH; keyinfo->key_length+= HA_KEY_BLOB_LENGTH; - /* - Mark that there may be many matching values for one key - combination ('a', 'a ', 'a '...) - */ - if (!(field->flags & BINARY_FLAG)) - keyinfo->flags|= HA_END_SPACE_KEY; } if (field->type() == MYSQL_TYPE_BIT) key_part->key_part_flag|= HA_BIT_PART; @@ -1508,6 +1582,15 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, */ if (field->real_maybe_null()) key_part->key_part_flag|= HA_NULL_PART; + /* + Sometimes we can compare key parts for equality with memcmp. + But not always. + */ + if (!(key_part->key_part_flag & (HA_BLOB_PART | HA_VAR_LENGTH_PART | + HA_BIT_PART)) && + key_part->type != HA_KEYTYPE_FLOAT && + key_part->type == HA_KEYTYPE_DOUBLE) + key_part->key_part_flag|= HA_CAN_MEMCMP; } keyinfo->usable_key_parts= usable_parts; // Filesort @@ -1556,6 +1639,16 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, null_length, 255); } + if (share->db_create_options & HA_OPTION_TEXT_CREATE_OPTIONS) + { + DBUG_ASSERT(options_len); + if (engine_table_options_frm_read(options, options_len, share)) + goto free_and_err; + } + if (parse_engine_table_options(thd, handler_file->partition_ht(), share)) + goto free_and_err; + my_free(buff, MYF(MY_ALLOW_ZERO_PTR)); + if (share->found_next_number_field) { reg_field= *share->found_next_number_field; @@ -1617,6 +1710,8 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, #endif DBUG_RETURN (0); + free_and_err: + my_free(buff, MYF(MY_ALLOW_ZERO_PTR)); err: share->error= error; share->open_errno= my_errno; @@ -1630,6 +1725,315 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head, DBUG_RETURN(error); } /* open_binary_frm */ +/* + @brief + Clear GET_FIXED_FIELDS_FLAG in all fields of a table + + @param + table The table for whose fields the flags are to be cleared + + @note + This routine is used for error handling purposes. + + @return + none +*/ + +static void clear_field_flag(TABLE *table) +{ + Field **ptr; + DBUG_ENTER("clear_field_flag"); + + for (ptr= table->field; *ptr; ptr++) + (*ptr)->flags&= (~GET_FIXED_FIELDS_FLAG); + DBUG_VOID_RETURN; +} + + +/* + @brief + Perform semantic analysis of the defining expression for a virtual column + + @param + thd The thread object + @param + table The table containing the virtual column + @param + vcol_field The virtual field whose defining expression is to be analyzed + + @details + The function performs semantic analysis of the defining expression for + the virtual column vcol_field. The expression is used to compute the + values of this column. + + @note + The function exploits the fact that the fix_fields method sets the flag + GET_FIXED_FIELDS_FLAG for all fields in the item tree. + This flag must always be unset before returning from this function + since it is used for other purposes as well. + + @retval + TRUE An error occurred, something was wrong with the function + @retval + FALSE Otherwise +*/ + +bool fix_vcol_expr(THD *thd, + TABLE *table, + Field *vcol_field) +{ + Virtual_column_info *vcol_info= vcol_field->vcol_info; + Item* func_expr= vcol_info->expr_item; + uint dir_length, home_dir_length; + bool result= TRUE; + TABLE_LIST tables; + TABLE_LIST *save_table_list, *save_first_table, *save_last_table; + int error; + Name_resolution_context *context; + const char *save_where; + char* db_name; + char db_name_string[FN_REFLEN]; + bool save_use_only_table_context; + Field **ptr, *field; + enum_mark_columns save_mark_used_columns= thd->mark_used_columns; + DBUG_ASSERT(func_expr); + DBUG_ENTER("fix_vcol_expr"); + + /* + Set-up the TABLE_LIST object to be a list with a single table + Set the object to zero to create NULL pointers and set alias + and real name to table name and get database name from file name. + */ + + bzero((void*)&tables, sizeof(TABLE_LIST)); + tables.alias= tables.table_name= (char*) table->s->table_name.str; + tables.table= table; + tables.next_local= 0; + tables.next_name_resolution_table= 0; + strmov(db_name_string, table->s->normalized_path.str); + dir_length= dirname_length(db_name_string); + db_name_string[dir_length - 1]= 0; + home_dir_length= dirname_length(db_name_string); + db_name= &db_name_string[home_dir_length]; + tables.db= db_name; + + thd->mark_used_columns= MARK_COLUMNS_NONE; + + context= thd->lex->current_context(); + table->map= 1; //To ensure correct calculation of const item + table->get_fields_in_item_tree= TRUE; + save_table_list= context->table_list; + save_first_table= context->first_name_resolution_table; + save_last_table= context->last_name_resolution_table; + context->table_list= &tables; + context->first_name_resolution_table= &tables; + context->last_name_resolution_table= NULL; + func_expr->walk(&Item::change_context_processor, 0, (uchar*) context); + save_where= thd->where; + thd->where= "virtual column function"; + + /* Save the context before fixing the fields*/ + save_use_only_table_context= thd->lex->use_only_table_context; + thd->lex->use_only_table_context= TRUE; + /* Fix fields referenced to by the virtual column function */ + error= func_expr->fix_fields(thd, (Item**)0); + /* Restore the original context*/ + thd->lex->use_only_table_context= save_use_only_table_context; + context->table_list= save_table_list; + context->first_name_resolution_table= save_first_table; + context->last_name_resolution_table= save_last_table; + + if (unlikely(error)) + { + DBUG_PRINT("info", + ("Field in virtual column expression does not belong to the table")); + goto end; + } + thd->where= save_where; + if (unlikely(func_expr->result_type() == ROW_RESULT)) + { + my_error(ER_ROW_EXPR_FOR_VCOL, MYF(0)); + goto end; + } +#ifdef PARANOID + /* + Walk through the Item tree checking if all items are valid + to be part of the virtual column + */ + error= func_expr->walk(&Item::check_vcol_func_processor, 0, NULL); + if (error) + { + my_error(ER_VIRTUAL_COLUMN_FUNCTION_IS_NOT_ALLOWED, MYF(0), field_name); + goto end; + } +#endif + if (unlikely(func_expr->const_item())) + { + my_error(ER_CONST_EXPR_IN_VCOL, MYF(0)); + goto end; + } + /* Ensure that this virtual column is not based on another virtual field. */ + ptr= table->field; + while ((field= *(ptr++))) + { + if ((field->flags & GET_FIXED_FIELDS_FLAG) && + (field->vcol_info)) + { + my_error(ER_VCOL_BASED_ON_VCOL, MYF(0)); + goto end; + } + } + result= FALSE; + +end: + + /* Clear GET_FIXED_FIELDS_FLAG for the fields of the table */ + clear_field_flag(table); + + table->get_fields_in_item_tree= FALSE; + thd->mark_used_columns= save_mark_used_columns; + table->map= 0; //Restore old value + + DBUG_RETURN(result); +} + +/* + @brief + Unpack the definition of a virtual column from its linear representation + + @parm + thd The thread object + @param + table The table containing the virtual column + @param + field The field for the virtual + @param + vcol_expr The string representation of the defining expression + @param[out] + error_reported The flag to inform the caller that no other error + messages are to be generated + + @details + The function takes string representation 'vcol_expr' of the defining + expression for the virtual field 'field' of the table 'table' and + parses it, building an item object for it. The pointer to this item is + placed into in field->vcol_info.expr_item. After this the function performs + semantic analysis of the item by calling the the function fix_vcol_expr. + Since the defining expression is part of the table definition the item for + it is created in table->memroot within the special arena TABLE::expr_arena. + + @note + Before passing 'vcol_expr" to the parser the function embraces it in + parenthesis and prepands it a special keyword. + + @retval + FALSE If a success + @retval + TRUE Otherwise +*/ +bool unpack_vcol_info_from_frm(THD *thd, + TABLE *table, + Field *field, + LEX_STRING *vcol_expr, + bool *error_reported) +{ + bool rc; + char *vcol_expr_str; + int str_len; + CHARSET_INFO *old_character_set_client; + Query_arena *backup_stmt_arena_ptr; + Query_arena backup_arena; + Query_arena *vcol_arena; + Parser_state parser_state; + DBUG_ENTER("unpack_vcol_info_from_frm"); + DBUG_ASSERT(vcol_expr); + + /* + Step 1: Construct the input string for the parser. + The string to be parsed has to be of the following format: + "PARSE_VCOL_EXPR (<expr_string_from_frm>)". + */ + + if (!(vcol_expr_str= (char*) alloc_root(&table->mem_root, + vcol_expr->length + + parse_vcol_keyword.length + 3))) + { + DBUG_RETURN(TRUE); + } + memcpy(vcol_expr_str, + (char*) parse_vcol_keyword.str, + parse_vcol_keyword.length); + str_len= parse_vcol_keyword.length; + memcpy(vcol_expr_str + str_len, "(", 1); + str_len++; + memcpy(vcol_expr_str + str_len, + (char*) vcol_expr->str, + vcol_expr->length); + str_len+= vcol_expr->length; + memcpy(vcol_expr_str + str_len, ")", 1); + str_len++; + memcpy(vcol_expr_str + str_len, "\0", 1); + str_len++; + + if (parser_state.init(thd, vcol_expr_str, str_len)) + goto err; + + /* + Step 2: Setup thd for parsing. + */ + backup_stmt_arena_ptr= thd->stmt_arena; + vcol_arena= table->expr_arena; + if (!vcol_arena) + { + Query_arena expr_arena(&table->mem_root, Query_arena::INITIALIZED); + if (!(vcol_arena= (Query_arena *) alloc_root(&table->mem_root, + sizeof(Query_arena)))) + goto err; + *vcol_arena= expr_arena; + table->expr_arena= vcol_arena; + } + thd->set_n_backup_active_arena(vcol_arena, &backup_arena); + thd->stmt_arena= vcol_arena; + + thd->lex->parse_vcol_expr= TRUE; + old_character_set_client= thd->variables.character_set_client; + + /* + Step 3: Use the parser to build an Item object from vcol_expr_str. + */ + if (parse_sql(thd, &parser_state, NULL)) + { + goto err; + } + /* From now on use vcol_info generated by the parser. */ + field->vcol_info= thd->lex->vcol_info; + + /* Validate the Item tree. */ + if (fix_vcol_expr(thd, table, field)) + { + *error_reported= TRUE; + field->vcol_info= 0; + goto err; + } + rc= FALSE; + goto end; + +err: + rc= TRUE; + thd->lex->parse_vcol_expr= FALSE; + thd->free_items(); +end: + thd->stmt_arena= backup_stmt_arena_ptr; + if (vcol_arena) + thd->restore_active_arena(vcol_arena, &backup_arena); + thd->variables.character_set_client= old_character_set_client; + + DBUG_RETURN(rc); +} + +/* + Read data from a binary .frm file from MySQL 3.23 - 5.0 into TABLE_SHARE +*/ /* Open a table based on a TABLE_SHARE @@ -1664,7 +2068,7 @@ int open_table_from_share(THD *thd, TABLE_SHARE *share, const char *alias, uint records, i, bitmap_size; bool error_reported= FALSE; uchar *record, *bitmaps; - Field **field_ptr; + Field **field_ptr, **vfield_ptr; DBUG_ENTER("open_table_from_share"); DBUG_PRINT("enter",("name: '%s.%s' form: 0x%lx", share->db.str, share->table_name.str, (long) outparam)); @@ -1818,6 +2222,34 @@ int open_table_from_share(THD *thd, TABLE_SHARE *share, const char *alias, } } + /* + Process virtual columns, if any. + */ + if (!(vfield_ptr = (Field **) alloc_root(&outparam->mem_root, + (uint) ((share->vfields+1)* + sizeof(Field*))))) + goto err; + + outparam->vfield= vfield_ptr; + + for (field_ptr= outparam->field; *field_ptr; field_ptr++) + { + if ((*field_ptr)->vcol_info) + { + if (unpack_vcol_info_from_frm(thd, + outparam, + *field_ptr, + &(*field_ptr)->vcol_info->expr_str, + &error_reported)) + { + error= 4; // in case no error is reported + goto err; + } + *(vfield_ptr++)= *field_ptr; + } + } + *vfield_ptr= 0; // End marker + #ifdef WITH_PARTITION_STORAGE_ENGINE if (share->partition_info_len && outparam->file) { @@ -1885,17 +2317,33 @@ partititon_err: } #endif + /* Check virtual columns against table's storage engine. */ + if (share->vfields && + ((outparam->file && + !outparam->file->check_if_supported_virtual_columns()) || + (!outparam->file && share->db_type() && + share->db_type()->db_type == DB_TYPE_CSV_DB))) // Workaround for CSV + { + my_error(ER_UNSUPPORTED_ACTION_ON_VIRTUAL_COLUMN, + MYF(0), + "Specified storage engine"); + error_reported= TRUE; + goto err; + } + /* Allocate bitmaps */ bitmap_size= share->column_bitmap_size; - if (!(bitmaps= (uchar*) alloc_root(&outparam->mem_root, bitmap_size*3))) + if (!(bitmaps= (uchar*) alloc_root(&outparam->mem_root, bitmap_size*4))) goto err; bitmap_init(&outparam->def_read_set, (my_bitmap_map*) bitmaps, share->fields, FALSE); bitmap_init(&outparam->def_write_set, (my_bitmap_map*) (bitmaps+bitmap_size), share->fields, FALSE); - bitmap_init(&outparam->tmp_set, + bitmap_init(&outparam->def_vcol_set, (my_bitmap_map*) (bitmaps+bitmap_size*2), share->fields, FALSE); + bitmap_init(&outparam->tmp_set, + (my_bitmap_map*) (bitmaps+bitmap_size*3), share->fields, FALSE); outparam->default_column_bitmaps(); /* The table struct is now initialized; Open the table */ @@ -2000,10 +2448,14 @@ int closefrm(register TABLE *table, bool free_share) } my_free((char*) table->alias, MYF(MY_ALLOW_ZERO_PTR)); table->alias= 0; + if (table->expr_arena) + table->expr_arena->free_items(); if (table->field) { for (Field **ptr=table->field ; *ptr ; ptr++) + { delete *ptr; + } table->field= 0; } delete table->file; @@ -2467,6 +2919,7 @@ File create_frm(THD *thd, const char *name, const char *db, ulong length; uchar fill[IO_SIZE]; int create_flags= O_RDWR | O_TRUNC; + DBUG_ENTER("create_frm"); if (create_info->options & HA_LEX_CREATE_TMP_TABLE) create_flags|= O_EXCL | O_NOFOLLOW; @@ -2548,7 +3001,7 @@ File create_frm(THD *thd, const char *name, const char *db, { VOID(my_close(file,MYF(0))); VOID(my_delete(name,MYF(0))); - return(-1); + DBUG_RETURN(-1); } } } @@ -2559,7 +3012,7 @@ File create_frm(THD *thd, const char *name, const char *db, else my_error(ER_CANT_CREATE_TABLE,MYF(0),table,my_errno); } - return (file); + DBUG_RETURN(file); } /* create_frm */ @@ -2578,6 +3031,7 @@ void update_create_info_from_table(HA_CREATE_INFO *create_info, TABLE *table) create_info->comment= share->comment; create_info->transactional= share->transactional; create_info->page_checksum= share->page_checksum; + create_info->option_list= share->option_list; DBUG_VOID_RETURN; } @@ -3735,11 +4189,8 @@ bool TABLE_LIST::prepare_view_securety_context(THD *thd) { DBUG_PRINT("info", ("This table is suid view => load contest")); DBUG_ASSERT(view && view_sctx); - if (acl_getroot_no_password(view_sctx, - definer.user.str, - definer.host.str, - definer.host.str, - thd->db)) + if (acl_getroot(view_sctx, definer.user.str, definer.host.str, + definer.host.str, thd->db)) { if ((thd->lex->sql_command == SQLCOM_SHOW_CREATE) || (thd->lex->sql_command == SQLCOM_SHOW_FIELDS)) @@ -4332,9 +4783,10 @@ void st_table::clear_column_bitmaps() Reset column read/write usage. It's identical to: bitmap_clear_all(&table->def_read_set); bitmap_clear_all(&table->def_write_set); + bitmap_clear_all(&table->def_vcol_set); */ - bzero((char*) def_read_set.bitmap, s->column_bitmap_size*2); - column_bitmaps_set(&def_read_set, &def_write_set); + bzero((char*) def_read_set.bitmap, s->column_bitmap_size*3); + column_bitmaps_set(&def_read_set, &def_write_set, &def_vcol_set); } @@ -4439,7 +4891,14 @@ void st_table::mark_columns_used_by_index_no_reset(uint index, KEY_PART_INFO *key_part_end= (key_part + key_info[index].key_parts); for (;key_part != key_part_end; key_part++) + { bitmap_set_bit(bitmap, key_part->fieldnr-1); + if (key_part->field->vcol_info && + key_part->field->vcol_info->expr_item) + key_part->field->vcol_info-> + expr_item->walk(&Item::register_field_in_bitmap, + 1, (uchar *) bitmap); + } } @@ -4566,6 +5025,8 @@ void st_table::mark_columns_needed_for_update() file->column_bitmaps_signal(); } } + /* Mark all virtual columns needed for update */ + mark_virtual_columns_for_write(FALSE); DBUG_VOID_RETURN; } @@ -4592,9 +5053,109 @@ void st_table::mark_columns_needed_for_insert() } if (found_next_number_field) mark_auto_increment_column(); + /* Mark virtual columns for insert */ + mark_virtual_columns_for_write(TRUE); +} + + +/* + @brief Mark a column as virtual used by the query + + @param field the field for the column to be marked + + @details + The function marks the column for 'field' as virtual (computed) + in the bitmap vcol_set. + If the column is marked for the first time the expression to compute + the column is traversed and all columns that are occurred there are + marked in the read_set of the table. + + @retval + TRUE if column is marked for the first time + @retval + FALSE otherwise +*/ + +bool st_table::mark_virtual_col(Field *field) +{ + bool res; + DBUG_ASSERT(field->vcol_info); + if (!(res= bitmap_fast_test_and_set(vcol_set, field->field_index))) + { + Item *vcol_item= field->vcol_info->expr_item; + DBUG_ASSERT(vcol_item); + vcol_item->walk(&Item::register_field_in_read_map, 1, (uchar *) 0); + } + return res; } +/* + @brief Mark virtual columns for update/insert commands + + @param insert_fl <-> virtual columns are marked for insert command + + @details + The function marks virtual columns used in a update/insert commands + in the vcol_set bitmap. + For an insert command a virtual column is always marked in write_set if + it is a stored column. + If a virtual column is from write_set it is always marked in vcol_set. + If a stored virtual column is not from write_set but it is computed + through columns from write_set it is also marked in vcol_set, and, + besides, it is added to write_set. + + @return void + + @note + Let table t1 have columns a,b,c and let column c be a stored virtual + column computed through columns a and b. Then for the query + UPDATE t1 SET a=1 + column c will be placed into vcol_set and into write_set while + column b will be placed into read_set. + If column c was a virtual column, but not a stored virtual column + then it would not be added to any of the sets. Column b would not + be added to read_set either. +*/ + +void st_table::mark_virtual_columns_for_write(bool insert_fl) +{ + Field **vfield_ptr, *tmp_vfield; + bool bitmap_updated= FALSE; + + for (vfield_ptr= vfield; *vfield_ptr; vfield_ptr++) + { + tmp_vfield= *vfield_ptr; + if (bitmap_is_set(write_set, tmp_vfield->field_index)) + bitmap_updated= mark_virtual_col(tmp_vfield); + else if (tmp_vfield->stored_in_db) + { + bool mark_fl= insert_fl; + if (!mark_fl) + { + MY_BITMAP *save_read_set; + Item *vcol_item= tmp_vfield->vcol_info->expr_item; + DBUG_ASSERT(vcol_item); + bitmap_clear_all(&tmp_set); + save_read_set= read_set; + read_set= &tmp_set; + vcol_item->walk(&Item::register_field_in_read_map, 1, (uchar *) 0); + read_set= save_read_set; + bitmap_intersect(&tmp_set, write_set); + mark_fl= !bitmap_is_clear_all(&tmp_set); + } + if (mark_fl) + { + bitmap_set_bit(write_set, tmp_vfield->field_index); + mark_virtual_col(tmp_vfield); + bitmap_updated= TRUE; + } + } + } + if (bitmap_updated) + file->column_bitmaps_signal(); +} + /** @brief Check if this is part of a MERGE table with attached children. @@ -4602,7 +5163,7 @@ void st_table::mark_columns_needed_for_insert() @retval TRUE children are attached @retval FALSE no MERGE part or children not attached - @detail + @details A MERGE table consists of a parent TABLE and zero or more child TABLEs. Each of these TABLEs is called a part of a MERGE table. */ @@ -4863,6 +5424,57 @@ size_t max_row_length(TABLE *table, const uchar *data) return length; } +/* + @brief Compute values for virtual columns used in query + + @param thd Thread handle + @param table The TABLE object + @param for_write Requests to compute only fields needed for write + + @details + The function computes the values of the virtual columns of the table and + stores them in the table record buffer. + Only fields from vcol_set are computed, and, when the flag for_write is not + set to TRUE, a virtual field is computed only if it's not stored. + The flag for_write is set to TRUE for row insert/update operations. + + @retval + 0 Success + @retval + >0 Error occurred when storing a virtual field value +*/ + +int update_virtual_fields(THD *thd, TABLE *table, bool for_write) +{ + DBUG_ENTER("update_virtual_fields"); + Field **vfield_ptr, *vfield; + int error= 0; + if (!table || !table->vfield) + DBUG_RETURN(0); + + thd->reset_arena_for_cached_items(table->expr_arena); + /* Iterate over virtual fields in the table */ + for (vfield_ptr= table->vfield; *vfield_ptr; vfield_ptr++) + { + vfield= (*vfield_ptr); + DBUG_ASSERT(vfield->vcol_info && vfield->vcol_info->expr_item); + /* Only update those fields that are marked in the vcol_set bitmap */ + if (bitmap_is_set(table->vcol_set, vfield->field_index) && + (for_write || !vfield->stored_in_db)) + { + /* Compute the actual value of the virtual fields */ + error= vfield->vcol_info->expr_item->save_in_field(vfield, 0); + DBUG_PRINT("info", ("field '%s' - updated", vfield->field_name)); + } + else + { + DBUG_PRINT("info", ("field '%s' - skipped", vfield->field_name)); + } + } + thd->reset_arena_for_cached_items(0); + DBUG_RETURN(0); +} + /***************************************************************************** ** Instansiate templates *****************************************************************************/ |