diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/field.cc | 98 | ||||
-rw-r--r-- | sql/field.h | 3 | ||||
-rw-r--r-- | sql/handler.cc | 138 | ||||
-rw-r--r-- | sql/handler.h | 20 | ||||
-rw-r--r-- | sql/opt_range.cc | 341 | ||||
-rw-r--r-- | sql/opt_range.h | 15 | ||||
-rw-r--r-- | sql/sql_class.cc | 3 | ||||
-rw-r--r-- | sql/sql_select.cc | 74 | ||||
-rw-r--r-- | sql/table.cc | 1 |
9 files changed, 424 insertions, 269 deletions
diff --git a/sql/field.cc b/sql/field.cc index 2da84ba1316..d865d2cc9e9 100644 --- a/sql/field.cc +++ b/sql/field.cc @@ -4359,7 +4359,7 @@ int Field_varstring::store(const char *from,uint length,CHARSET_INFO *cs) set_warning(MYSQL_ERROR::WARN_LEVEL_WARN, ER_WARN_DATA_TRUNCATED); error= 1; } - memcpy(ptr+2,from,length); + memcpy(ptr+HA_KEY_BLOB_LENGTH,from,length); int2store(ptr, length); return error; } @@ -4388,18 +4388,18 @@ int Field_varstring::store(longlong nr) double Field_varstring::val_real(void) { int not_used; - uint length=uint2korr(ptr)+2; + uint length=uint2korr(ptr)+HA_KEY_BLOB_LENGTH; CHARSET_INFO *cs=charset(); - return my_strntod(cs,ptr+2,length,(char**)0, ¬_used); + return my_strntod(cs, ptr+HA_KEY_BLOB_LENGTH, length, (char**)0, ¬_used); } longlong Field_varstring::val_int(void) { int not_used; - uint length=uint2korr(ptr)+2; + uint length=uint2korr(ptr)+HA_KEY_BLOB_LENGTH; CHARSET_INFO *cs=charset(); - return my_strntoll(cs,ptr+2,length,10,NULL, ¬_used); + return my_strntoll(cs,ptr+HA_KEY_BLOB_LENGTH,length,10,NULL, ¬_used); } @@ -4407,7 +4407,7 @@ String *Field_varstring::val_str(String *val_buffer __attribute__((unused)), String *val_ptr) { uint length=uint2korr(ptr); - val_ptr->set((const char*) ptr+2,length,field_charset); + val_ptr->set((const char*) ptr+HA_KEY_BLOB_LENGTH,length,field_charset); return val_ptr; } @@ -4417,18 +4417,21 @@ int Field_varstring::cmp(const char *a_ptr, const char *b_ptr) uint a_length=uint2korr(a_ptr); uint b_length=uint2korr(b_ptr); int diff; - diff=my_strnncoll(field_charset, - (const uchar*)a_ptr+2,min(a_length,b_length), - (const uchar*)b_ptr+2,min(a_length,b_length)); + diff= my_strnncoll(field_charset, + (const uchar*) a_ptr+HA_KEY_BLOB_LENGTH, + min(a_length,b_length), + (const uchar*) b_ptr+HA_KEY_BLOB_LENGTH, + min(a_length,b_length)); return diff ? diff : (int) (a_length - b_length); } void Field_varstring::sort_string(char *to,uint length) { uint tot_length=uint2korr(ptr); - tot_length=my_strnxfrm(field_charset, - (unsigned char *) to, length, - (unsigned char *)ptr+2, tot_length); + tot_length= my_strnxfrm(field_charset, + (uchar*) to, length, + (uchar*) ptr+HA_KEY_BLOB_LENGTH, + tot_length); if (tot_length < length) field_charset->cset->fill(field_charset, to+tot_length,length-tot_length, binary() ? (char) 0 : ' '); @@ -4453,7 +4456,7 @@ char *Field_varstring::pack(char *to, const char *from, uint max_length) if (max_length > 255) *to++= (char) (length >> 8); if (length) - memcpy(to, from+2, length); + memcpy(to, from+HA_KEY_BLOB_LENGTH, length); return to+length; } @@ -4473,7 +4476,7 @@ const char *Field_varstring::unpack(char *to, const char *from) to[1] = *from++; } if (length) - memcpy(to+2, from, length); + memcpy(to+HA_KEY_BLOB_LENGTH, from, length); return from+length; } @@ -4484,8 +4487,8 @@ int Field_varstring::pack_cmp(const char *a, const char *b, uint key_length) uint b_length; if (key_length > 255) { - a_length=uint2korr(a); a+=2; - b_length=uint2korr(b); b+=2; + a_length=uint2korr(a); a+= 2; + b_length=uint2korr(b); b+= 2; } else { @@ -4493,32 +4496,32 @@ int Field_varstring::pack_cmp(const char *a, const char *b, uint key_length) b_length= (uint) (uchar) *b++; } return my_strnncoll(field_charset, - (const uchar *)a,a_length, - (const uchar *)b,b_length); + (const uchar*) a, a_length, + (const uchar*) b, b_length); } int Field_varstring::pack_cmp(const char *b, uint key_length) { - char *a=ptr+2; - uint a_length=uint2korr(ptr); + char *a= ptr+HA_KEY_BLOB_LENGTH; + uint a_length= uint2korr(ptr); uint b_length; if (key_length > 255) { - b_length=uint2korr(b); b+=2; + b_length=uint2korr(b); b+= 2; } else { b_length= (uint) (uchar) *b++; } return my_strnncoll(field_charset, - (const uchar *)a,a_length, - (const uchar *)b,b_length); + (const uchar*) a, a_length, + (const uchar*) b, b_length); } uint Field_varstring::packed_col_length(const char *data_ptr, uint length) { if (length > 255) - return uint2korr(data_ptr)+2; + return uint2korr(data_ptr)+HA_KEY_BLOB_LENGTH; else return (uint) ((uchar) *data_ptr)+1; } @@ -4531,22 +4534,21 @@ uint Field_varstring::max_packed_col_length(uint max_length) void Field_varstring::get_key_image(char *buff, uint length, CHARSET_INFO *cs, imagetype type) { - length-= HA_KEY_BLOB_LENGTH; uint f_length=uint2korr(ptr); if (f_length > length) f_length= length; int2store(buff,length); - memcpy(buff+2,ptr+2,length); + memcpy(buff+HA_KEY_BLOB_LENGTH, ptr+HA_KEY_BLOB_LENGTH, length); #ifdef HAVE_purify if (f_length < length) - bzero(buff+2+f_length, (length-f_length)); + bzero(buff+HA_KEY_BLOB_LENGTH+f_length, (length-f_length)); #endif } void Field_varstring::set_key_image(char *buff,uint length, CHARSET_INFO *cs) { length=uint2korr(buff); // Real length is here - (void) Field_varstring::store(buff+2, length, cs); + (void) Field_varstring::store(buff+HA_KEY_BLOB_LENGTH, length, cs); } @@ -4799,7 +4801,6 @@ int Field_blob::cmp_binary(const char *a_ptr, const char *b_ptr, void Field_blob::get_key_image(char *buff,uint length, CHARSET_INFO *cs, imagetype type) { - length-= HA_KEY_BLOB_LENGTH; uint32 blob_length= get_length(ptr); char *blob; @@ -4838,18 +4839,18 @@ void Field_blob::get_key_image(char *buff,uint length, Must clear this as we do a memcmp in opt_range.cc to detect identical keys */ - bzero(buff+2+blob_length, (length-blob_length)); + bzero(buff+HA_KEY_BLOB_LENGTH+blob_length, (length-blob_length)); length=(uint) blob_length; } int2store(buff,length); get_ptr(&blob); - memcpy(buff+2,blob,length); + memcpy(buff+HA_KEY_BLOB_LENGTH, blob, length); } void Field_blob::set_key_image(char *buff,uint length, CHARSET_INFO *cs) { - length=uint2korr(buff); - (void) Field_blob::store(buff+2,length,cs); + length= uint2korr(buff); + (void) Field_blob::store(buff+HA_KEY_BLOB_LENGTH, length, cs); } @@ -4857,16 +4858,16 @@ int Field_blob::key_cmp(const byte *key_ptr, uint max_key_length) { char *blob1; uint blob_length=get_length(ptr); - max_key_length-=2; memcpy_fixed(&blob1,ptr+packlength,sizeof(char*)); return Field_blob::cmp(blob1,min(blob_length, max_key_length), - (char*) key_ptr+2,uint2korr(key_ptr)); + (char*) key_ptr+HA_KEY_BLOB_LENGTH, + uint2korr(key_ptr)); } int Field_blob::key_cmp(const byte *a,const byte *b) { - return Field_blob::cmp((char*) a+2,uint2korr(a), - (char*) b+2,uint2korr(b)); + return Field_blob::cmp((char*) a+HA_KEY_BLOB_LENGTH, uint2korr(a), + (char*) b+HA_KEY_BLOB_LENGTH, uint2korr(b)); } @@ -4882,8 +4883,8 @@ void Field_blob::sort_string(char *to,uint length) memcpy_fixed(&blob,ptr+packlength,sizeof(char*)); blob_length=my_strnxfrm(field_charset, - (unsigned char *)to, length, - (unsigned char *)blob, blob_length); + (uchar*) to, length, + (uchar*) blob, blob_length); if (blob_length < length) field_charset->cset->fill(field_charset, to+blob_length, length-blob_length, @@ -4965,8 +4966,8 @@ int Field_blob::pack_cmp(const char *a, const char *b, uint key_length) b_length= (uint) (uchar) *b++; } return my_strnncoll(field_charset, - (const uchar *)a,a_length, - (const uchar *)b,b_length); + (const uchar*) a, a_length, + (const uchar*) b, b_length); } @@ -4988,8 +4989,8 @@ int Field_blob::pack_cmp(const char *b, uint key_length) b_length= (uint) (uchar) *b++; } return my_strnncoll(field_charset, - (const uchar *)a,a_length, - (const uchar *)b,b_length); + (const uchar*) a, a_length, + (const uchar*) b, b_length); } /* Create a packed key that will be used for storage from a MySQL row */ @@ -5025,7 +5026,7 @@ char *Field_blob::pack_key_from_key_image(char *to, const char *from, if (max_length > 255) *to++= (char) (length >> 8); if (length) - memcpy(to, from+2, length); + memcpy(to, from+HA_KEY_BLOB_LENGTH, length); return to+length; } @@ -5048,11 +5049,12 @@ uint Field_blob::max_packed_col_length(uint max_length) void Field_geom::get_key_image(char *buff, uint length, CHARSET_INFO *cs, imagetype type) { - length-= HA_KEY_BLOB_LENGTH; - ulong blob_length= get_length(ptr); char *blob; const char *dummy; MBR mbr; + ulong blob_length= get_length(ptr); + Geometry_buffer buffer; + Geometry *gobj; if (blob_length < SRID_SIZE) { @@ -5060,8 +5062,6 @@ void Field_geom::get_key_image(char *buff, uint length, CHARSET_INFO *cs, return; } get_ptr(&blob); - Geometry_buffer buffer; - Geometry *gobj; gobj= Geometry::create_from_wkb(&buffer, blob + SRID_SIZE, blob_length - SRID_SIZE); if (gobj->get_mbr(&mbr, &dummy)) @@ -5554,7 +5554,7 @@ uint32 calc_pack_length(enum_field_types type,uint32 length) switch (type) { case FIELD_TYPE_STRING: case FIELD_TYPE_DECIMAL: return (length); - case FIELD_TYPE_VAR_STRING: return (length+2); + case FIELD_TYPE_VAR_STRING: return (length+HA_KEY_BLOB_LENGTH); case FIELD_TYPE_YEAR: case FIELD_TYPE_TINY : return 1; case FIELD_TYPE_SHORT : return 2; diff --git a/sql/field.h b/sql/field.h index 8ebc7412c35..4efabcead18 100644 --- a/sql/field.h +++ b/sql/field.h @@ -212,7 +212,8 @@ public: { memcpy(buff,ptr,length); } inline void set_image(char *buff,uint length, CHARSET_INFO *cs) { memcpy(ptr,buff,length); } - virtual void get_key_image(char *buff,uint length, CHARSET_INFO *cs, imagetype type) + virtual void get_key_image(char *buff,uint length, CHARSET_INFO *cs, + imagetype type) { get_image(buff,length,cs); } virtual void set_key_image(char *buff,uint length, CHARSET_INFO *cs) { set_image(buff,length,cs); } diff --git a/sql/handler.cc b/sql/handler.cc index ddf2e68db47..2b22563bc3c 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -1287,3 +1287,141 @@ int ha_change_key_cache(KEY_CACHE *old_key_cache, mi_change_key_cache(old_key_cache, new_key_cache); return 0; } + + +/* + Read first row between two ranges. + Store ranges for future calls to read_range_next + + SYNOPSIS + read_range_first() + start_key Start key. Is 0 if no min range + end_key End key. Is 0 if no max range + sorted Set to 1 if result should be sorted per key + + NOTES + Record is read into table->record[0] + + RETURN + 0 Found row + HA_ERR_END_OF_FILE No rows in range + # Error code +*/ + +int handler::read_range_first(const key_range *start_key, + const key_range *end_key, + bool sorted) +{ + int result; + DBUG_ENTER("handler::read_range_first"); + + end_range= 0; + if (end_key) + { + end_range= &save_end_range; + save_end_range= *end_key; + key_compare_result_on_equal= ((end_key->flag == HA_READ_BEFORE_KEY) ? 1 : + (end_key->flag == HA_READ_AFTER_KEY) ? -1 : 0); + } + range_key_part= table->key_info[active_index].key_part; + + + if (!start_key) // Read first record + result= index_first(table->record[0]); + else + result= index_read(table->record[0], + start_key->key, + start_key->length, + start_key->flag); + if (result) + DBUG_RETURN((result == HA_ERR_KEY_NOT_FOUND || + result == HA_ERR_END_OF_FILE) ? HA_ERR_END_OF_FILE : + result); + + DBUG_RETURN (compare_key(end_range) <= 0 ? 0 : HA_ERR_END_OF_FILE); +} + + +/* + Read next row between two ranges. + + SYNOPSIS + read_range_next() + eq_range Set to 1 if start_key == end_key + + NOTES + Record is read into table->record[0] + + RETURN + 0 Found row + HA_ERR_END_OF_FILE No rows in range + # Error code +*/ + +int handler::read_range_next(bool eq_range) +{ + int result; + DBUG_ENTER("handler::read_range_next"); + + if (eq_range) + result= index_next_same(table->record[0], + end_range->key, + end_range->length); + else + result= index_next(table->record[0]); + if (result) + DBUG_RETURN(result); + DBUG_RETURN(compare_key(end_range) <= 0 ? 0 : HA_ERR_END_OF_FILE); +} + + +/* + Compare if found key is over max-value + + SYNOPSIS + compare_key + range key to compare to row + + NOTES + For this to work, the row must be stored in table->record[0] + + RETURN + 0 Key is equal to range or 'range' == 0 (no range) + -1 Key is less than range + 1 Key is larger than range +*/ + +int handler::compare_key(key_range *range) +{ + KEY_PART_INFO *key_part= range_key_part; + uint store_length; + + if (!range) + return 0; // No max range + + for (const char *key=range->key, *end=key+range->length; + key < end; + key+= store_length, key_part++) + { + int cmp; + store_length= key_part->store_length; + if (key_part->null_bit) + { + if (*key) + { + if (!key_part->field->is_null()) + return 1; + continue; + } + else if (key_part->field->is_null()) + return 0; + key++; // Skip null byte + store_length--; + } + if ((cmp=key_part->field->key_cmp((byte*) key, key_part->length)) < 0) + return -1; + if (cmp > 0) + return 1; + } + return key_compare_result_on_equal; +} diff --git a/sql/handler.h b/sql/handler.h index 80a3c2e7f1b..f1bce5950c4 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -218,6 +218,14 @@ typedef struct st_ha_check_opt } HA_CHECK_OPT; +typedef struct st_key_range +{ + const byte *key; + uint length; + enum ha_rkey_function flag; +} key_range; + + class handler :public Sql_alloc { protected: @@ -239,6 +247,12 @@ public: time_t create_time; /* When table was created */ time_t check_time; time_t update_time; + + /* The following are for read_range() */ + key_range save_end_range, *end_range; + KEY_PART_INFO *range_key_part; + int key_compare_result_on_equal; + uint errkey; /* Last dup key */ uint sortkey, key_used_on_scan; uint active_index; @@ -250,6 +264,7 @@ public: bool auto_increment_column_changed; bool implicit_emptied; /* Can be !=0 only if HEAP */ + handler(TABLE *table_arg) :table(table_arg), ref(0), data_file_length(0), max_data_file_length(0), index_file_length(0), delete_length(0), auto_increment_value(0), @@ -298,6 +313,11 @@ public: { return (my_errno=HA_ERR_WRONG_COMMAND); } + virtual int handler::read_range_first(const key_range *start_key, + const key_range *end_key, + bool sorted); + virtual int handler::read_range_next(bool eq_range); + int handler::compare_key(key_range *range); virtual int ft_init() { return -1; } virtual FT_INFO *ft_init_ext(uint flags,uint inx,const byte *key, uint keylen) diff --git a/sql/opt_range.cc b/sql/opt_range.cc index 38ff7f14c40..3ef1323893f 100644 --- a/sql/opt_range.cc +++ b/sql/opt_range.cc @@ -177,11 +177,11 @@ public: if (maybe_null && *min_value) { **min_key=1; - bzero(*min_key+1,length); + bzero(*min_key+1,length-1); } else - memcpy(*min_key,min_value,length+(int) maybe_null); - (*min_key)+= length+(int) maybe_null; + memcpy(*min_key,min_value,length); + (*min_key)+= length; } if (!(max_flag & NO_MAX_RANGE) && !(max_key_flag & (NO_MAX_RANGE | NEAR_MAX))) @@ -189,18 +189,18 @@ public: if (maybe_null && *max_value) { **max_key=1; - bzero(*max_key+1,length); + bzero(*max_key+1,length-1); } else - memcpy(*max_key,max_value,length+(int) maybe_null); - (*max_key)+= length+(int) maybe_null; + memcpy(*max_key,max_value,length); + (*max_key)+= length; } } void store_min_key(KEY_PART *key,char **range_key, uint *range_key_flag) { SEL_ARG *key_tree= first(); - key_tree->store(key[key_tree->part].part_length, + key_tree->store(key[key_tree->part].store_length, range_key,*range_key_flag,range_key,NO_MAX_RANGE); *range_key_flag|= key_tree->min_flag; if (key_tree->next_key_part && @@ -213,7 +213,7 @@ public: void store_max_key(KEY_PART *key,char **range_key, uint *range_key_flag) { SEL_ARG *key_tree= last(); - key_tree->store(key[key_tree->part].part_length, + key_tree->store(key[key_tree->part].store_length, range_key, NO_MIN_RANGE, range_key,*range_key_flag); (*range_key_flag)|= key_tree->max_flag; if (key_tree->next_key_part && @@ -646,6 +646,7 @@ int SQL_SELECT::test_quick_select(THD *thd, key_map keys_to_use, MEM_ROOT *old_root,alloc; SEL_TREE *tree; KEY_PART *key_parts; + KEY *key_info; PARAM param; /* set up parameter that is passed to all functions */ @@ -671,24 +672,26 @@ int SQL_SELECT::test_quick_select(THD *thd, key_map keys_to_use, old_root=my_pthread_getspecific_ptr(MEM_ROOT*,THR_MALLOC); my_pthread_setspecific_ptr(THR_MALLOC,&alloc); - for (idx=0 ; idx < head->keys ; idx++) + key_info= head->key_info; + for (idx=0 ; idx < head->keys ; idx++, key_info++) { + KEY_PART_INFO *key_part_info; if (!keys_to_use.is_set(idx)) continue; - KEY *key_info= &head->key_info[idx]; if (key_info->flags & HA_FULLTEXT) continue; // ToDo: ft-keys in non-ft ranges, if possible SerG param.key[param.keys]=key_parts; - for (uint part=0 ; part < key_info->key_parts ; part++,key_parts++) + key_part_info= key_info->key_part; + for (uint part=0 ; part < key_info->key_parts ; + part++, key_parts++, key_part_info++) { - key_parts->key=param.keys; - key_parts->part=part; - key_parts->part_length= key_info->key_part[part].length; - key_parts->field= key_info->key_part[part].field; - key_parts->null_bit= key_info->key_part[part].null_bit; - if (key_parts->field->type() == FIELD_TYPE_BLOB) - key_parts->part_length+=HA_KEY_BLOB_LENGTH; + key_parts->key= param.keys; + key_parts->part= part; + key_parts->length= key_part_info->length; + key_parts->store_length= key_part_info->store_length; + key_parts->field= key_part_info->field; + key_parts->null_bit= key_part_info->null_bit; key_parts->image_type = (key_info->flags & HA_SPATIAL) ? Field::itMBR : Field::itRAW; } @@ -1043,18 +1046,26 @@ get_mm_leaf(PARAM *param, COND *conf_func, Field *field, KEY_PART *key_part, DBUG_RETURN(0); // Can only optimize strings offset=maybe_null; - length=key_part->part_length; - if (field->type() == FIELD_TYPE_BLOB) + length=key_part->store_length; + + if (length != key_part->length + maybe_null) { - offset+=HA_KEY_BLOB_LENGTH; - field_length=key_part->part_length-HA_KEY_BLOB_LENGTH; + /* key packed with length prefix */ + offset+= HA_KEY_BLOB_LENGTH; + field_length= length - HA_KEY_BLOB_LENGTH; } else { - if (length < field_length) - length=field_length; // Only if overlapping key + if (unlikely(length < field_length)) + { + /* + This can only happen in a table created with UNIREG where one key + overlaps many fields + */ + length= field_length; + } else - field_length=length; + field_length= length; } length+=offset; if (!(min_str= (char*) alloc_root(param->mem_root, length*2))) @@ -1067,7 +1078,7 @@ get_mm_leaf(PARAM *param, COND *conf_func, Field *field, KEY_PART *key_part, res->ptr(), res->length(), ((Item_func_like*)(param->cond))->escape, wild_one, wild_many, - field_length, + field_length-maybe_null, min_str+offset, max_str+offset, &min_length, &max_length); if (like_error) // Can't optimize with LIKE @@ -1105,13 +1116,13 @@ get_mm_leaf(PARAM *param, COND *conf_func, Field *field, KEY_PART *key_part, if (field->key_type() == HA_KEYTYPE_VARTEXT) copies= 2; str= str2= (char*) alloc_root(param->mem_root, - (key_part->part_length+maybe_null)*copies+1); + (key_part->store_length)*copies+1); if (!str) DBUG_RETURN(0); if (maybe_null) *str= (char) field->is_real_null(); // Set to 1 if null - field->get_key_image(str+maybe_null,key_part->part_length, - field->charset(),key_part->image_type); + field->get_key_image(str+maybe_null, key_part->length, + field->charset(), key_part->image_type); if (copies == 2) { /* @@ -1120,16 +1131,17 @@ get_mm_leaf(PARAM *param, COND *conf_func, Field *field, KEY_PART *key_part, all rows between 'X' and 'X ...' */ uint length= uint2korr(str+maybe_null); - str2= str+ key_part->part_length + maybe_null; + str2= str+ key_part->store_length; /* remove end space */ while (length > 0 && str[length+HA_KEY_BLOB_LENGTH+maybe_null-1] == ' ') length--; int2store(str+maybe_null, length); /* Create key that is space filled */ memcpy(str2, str, length + HA_KEY_BLOB_LENGTH + maybe_null); - bfill(str2+ length+ HA_KEY_BLOB_LENGTH +maybe_null, - key_part->part_length-length - HA_KEY_BLOB_LENGTH, ' '); - int2store(str2+maybe_null, key_part->part_length - HA_KEY_BLOB_LENGTH); + my_fill_8bit(field->charset(), + str2+ length+ HA_KEY_BLOB_LENGTH +maybe_null, + key_part->length-length, ' '); + int2store(str2+maybe_null, key_part->length); } if (!(tree=new SEL_ARG(field,str,str2))) DBUG_RETURN(0); // out of memory @@ -1156,39 +1168,39 @@ get_mm_leaf(PARAM *param, COND *conf_func, Field *field, KEY_PART *key_part, tree->max_flag=NO_MAX_RANGE; break; case Item_func::SP_EQUALS_FUNC: - tree->min_flag=GEOM_FLAG | HA_READ_MBR_EQUAL;// NEAR_MIN;//512; - tree->max_flag=NO_MAX_RANGE; - break; + tree->min_flag=GEOM_FLAG | HA_READ_MBR_EQUAL;// NEAR_MIN;//512; + tree->max_flag=NO_MAX_RANGE; + break; case Item_func::SP_DISJOINT_FUNC: - tree->min_flag=GEOM_FLAG | HA_READ_MBR_DISJOINT;// NEAR_MIN;//512; - tree->max_flag=NO_MAX_RANGE; - break; + tree->min_flag=GEOM_FLAG | HA_READ_MBR_DISJOINT;// NEAR_MIN;//512; + tree->max_flag=NO_MAX_RANGE; + break; case Item_func::SP_INTERSECTS_FUNC: - tree->min_flag=GEOM_FLAG | HA_READ_MBR_INTERSECT;// NEAR_MIN;//512; - tree->max_flag=NO_MAX_RANGE; - break; + tree->min_flag=GEOM_FLAG | HA_READ_MBR_INTERSECT;// NEAR_MIN;//512; + tree->max_flag=NO_MAX_RANGE; + break; case Item_func::SP_TOUCHES_FUNC: - tree->min_flag=GEOM_FLAG | HA_READ_MBR_INTERSECT;// NEAR_MIN;//512; - tree->max_flag=NO_MAX_RANGE; - break; + tree->min_flag=GEOM_FLAG | HA_READ_MBR_INTERSECT;// NEAR_MIN;//512; + tree->max_flag=NO_MAX_RANGE; + break; case Item_func::SP_CROSSES_FUNC: - tree->min_flag=GEOM_FLAG | HA_READ_MBR_INTERSECT;// NEAR_MIN;//512; - tree->max_flag=NO_MAX_RANGE; - break; + tree->min_flag=GEOM_FLAG | HA_READ_MBR_INTERSECT;// NEAR_MIN;//512; + tree->max_flag=NO_MAX_RANGE; + break; case Item_func::SP_WITHIN_FUNC: - tree->min_flag=GEOM_FLAG | HA_READ_MBR_WITHIN;// NEAR_MIN;//512; - tree->max_flag=NO_MAX_RANGE; - break; + tree->min_flag=GEOM_FLAG | HA_READ_MBR_WITHIN;// NEAR_MIN;//512; + tree->max_flag=NO_MAX_RANGE; + break; case Item_func::SP_CONTAINS_FUNC: - tree->min_flag=GEOM_FLAG | HA_READ_MBR_CONTAIN;// NEAR_MIN;//512; - tree->max_flag=NO_MAX_RANGE; - break; + tree->min_flag=GEOM_FLAG | HA_READ_MBR_CONTAIN;// NEAR_MIN;//512; + tree->max_flag=NO_MAX_RANGE; + break; case Item_func::SP_OVERLAPS_FUNC: - tree->min_flag=GEOM_FLAG | HA_READ_MBR_INTERSECT;// NEAR_MIN;//512; - tree->max_flag=NO_MAX_RANGE; - break; + tree->min_flag=GEOM_FLAG | HA_READ_MBR_INTERSECT;// NEAR_MIN;//512; + tree->max_flag=NO_MAX_RANGE; + break; default: break; @@ -2236,7 +2248,7 @@ check_quick_keys(PARAM *param,uint idx,SEL_ARG *key_tree, uint tmp_min_flag,tmp_max_flag,keynr; char *tmp_min_key=min_key,*tmp_max_key=max_key; - key_tree->store(param->key[idx][key_tree->part].part_length, + key_tree->store(param->key[idx][key_tree->part].store_length, &tmp_min_key,min_key_flag,&tmp_max_key,max_key_flag); uint min_key_length= (uint) (tmp_min_key- param->min_key); uint max_key_length= (uint) (tmp_max_key- param->max_key); @@ -2332,8 +2344,14 @@ get_quick_select(PARAM *param,uint idx,SEL_ARG *key_tree) { QUICK_SELECT *quick; DBUG_ENTER("get_quick_select"); - if ((quick=new QUICK_SELECT(param->thd, param->table, - param->real_keynr[idx]))) + + if (param->table->key_info[param->real_keynr[idx]].flags & HA_SPATIAL) + quick=new QUICK_SELECT_GEOM(param->thd, param->table, param->real_keynr[idx], + 0); + else + quick=new QUICK_SELECT(param->thd, param->table, param->real_keynr[idx]); + + if (quick) { if (quick->error || get_quick_keys(param,quick,param->key[idx],key_tree,param->min_key,0, @@ -2373,7 +2391,7 @@ get_quick_keys(PARAM *param,QUICK_SELECT *quick,KEY_PART *key, return 1; } char *tmp_min_key=min_key,*tmp_max_key=max_key; - key_tree->store(key[key_tree->part].part_length, + key_tree->store(key[key_tree->part].store_length, &tmp_min_key,min_key_flag,&tmp_max_key,max_key_flag); if (key_tree->next_key_part && @@ -2491,19 +2509,17 @@ static bool null_part_in_key(KEY_PART *key_part, const char *key, uint length) { for (const char *end=key+length ; key < end; - key+= key_part++->part_length) + key+= key_part++->store_length) { - if (key_part->null_bit) - { - if (*key++) - return 1; - } + if (key_part->null_bit && *key) + return 1; } return 0; } + /**************************************************************************** -** Create a QUICK RANGE based on a key + Create a QUICK RANGE based on a key ****************************************************************************/ QUICK_SELECT *get_quick_select_for_ref(THD *thd, TABLE *table, TABLE_REF *ref) @@ -2541,9 +2557,8 @@ QUICK_SELECT *get_quick_select_for_ref(THD *thd, TABLE *table, TABLE_REF *ref) { key_part->part=part; key_part->field= key_info->key_part[part].field; - key_part->part_length= key_info->key_part[part].length; - if (key_part->field->type() == FIELD_TYPE_BLOB) - key_part->part_length+=HA_KEY_BLOB_LENGTH; + key_part->length= key_info->key_part[part].length; + key_part->store_length= key_info->key_part[part].store_length; key_part->null_bit= key_info->key_part[part].null_bit; } if (quick->ranges.push_back(range)) @@ -2585,111 +2600,74 @@ int QUICK_SELECT::get_next() for (;;) { int result; + key_range start_key, end_key; if (range) - { // Already read through key - result=((range->flag & (EQ_RANGE | GEOM_FLAG)) ? - file->index_next_same(record, (byte*) range->min_key, - range->min_length) : - file->index_next(record)); - - if (!result) - { - if ((range->flag & GEOM_FLAG) || !cmp_next(*it.ref())) - DBUG_RETURN(0); - } - else if (result != HA_ERR_END_OF_FILE) + { + // Already read through key + result= file->read_range_next(test(range->flag & EQ_RANGE)); + if (result != HA_ERR_END_OF_FILE) DBUG_RETURN(result); } - if (!(range=it++)) + if (!(range= it++)) DBUG_RETURN(HA_ERR_END_OF_FILE); // All ranges used - if (range->flag & GEOM_FLAG) - { - if ((result = file->index_read(record, - (byte*) (range->min_key), - range->min_length, - (ha_rkey_function)(range->flag ^ - GEOM_FLAG)))) - { - if (result != HA_ERR_KEY_NOT_FOUND) - DBUG_RETURN(result); - range=0; // Not found, to next range - continue; - } - DBUG_RETURN(0); - } + start_key.key= range->min_key; + start_key.length= range->min_length; + start_key.flag= ((range->flag & NEAR_MIN) ? HA_READ_AFTER_KEY : + (range->flag & EQ_RANGE) ? + HA_READ_KEY_EXACT : HA_READ_KEY_OR_NEXT); + end_key.key= range->max_key; + end_key.length= range->max_length; + /* + We use READ_AFTER_KEY here because if we are reading on a key + prefix we want to find all keys with this prefix + */ + end_key.flag= (range->flag & NEAR_MAX ? HA_READ_BEFORE_KEY : + HA_READ_AFTER_KEY); - if (range->flag & NO_MIN_RANGE) // Read first record - { - int local_error; - if ((local_error=file->index_first(record))) - DBUG_RETURN(local_error); // Empty table - if (cmp_next(range) == 0) - DBUG_RETURN(0); - range=0; // No matching records; go to next range - continue; - } - if ((result = file->index_read(record, - (byte*) (range->min_key + - test(range->flag & GEOM_FLAG)), - range->min_length, - (range->flag & NEAR_MIN) ? - HA_READ_AFTER_KEY: - (range->flag & EQ_RANGE) ? - HA_READ_KEY_EXACT : - HA_READ_KEY_OR_NEXT))) + result= file->read_range_first(range->min_length ? &start_key : 0, + range->max_length ? &end_key : 0, + sorted); + if (range->flag == (UNIQUE_RANGE | EQ_RANGE)) + range=0; // Stop searching - { - if (result != HA_ERR_KEY_NOT_FOUND) - DBUG_RETURN(result); - range=0; // Not found, to next range - continue; - } - if (cmp_next(range) == 0) - { - if (range->flag == (UNIQUE_RANGE | EQ_RANGE)) - range=0; // Stop searching - DBUG_RETURN(0); // Found key is in range - } - range=0; // To next range + if (result != HA_ERR_END_OF_FILE) + DBUG_RETURN(result); + range=0; // No matching rows; go to next range } } -/* - Compare if found key is over max-value - Returns 0 if key <= range->max_key -*/ +/* Get next for geometrical indexes */ -int QUICK_SELECT::cmp_next(QUICK_RANGE *range_arg) +int QUICK_SELECT_GEOM::get_next() { - if (range_arg->flag & NO_MAX_RANGE) - return 0; /* key can't be to large */ + DBUG_ENTER(" QUICK_SELECT_GEOM::get_next"); - KEY_PART *key_part=key_parts; - for (char *key=range_arg->max_key, *end=key+range_arg->max_length; - key < end; - key+= key_part++->part_length) + for (;;) { - int cmp; - if (key_part->null_bit) + int result; + if (range) { - if (*key++) - { - if (!key_part->field->is_null()) - return 1; - continue; - } - else if (key_part->field->is_null()) - return 0; + // Already read through key + result= file->index_next_same(record, (byte*) range->min_key, + range->min_length); + if (result != HA_ERR_END_OF_FILE) + DBUG_RETURN(result); } - if ((cmp=key_part->field->key_cmp((byte*) key, key_part->part_length)) < 0) - return 0; - if (cmp > 0) - return 1; + + if (!(range= it++)) + DBUG_RETURN(HA_ERR_END_OF_FILE); // All ranges used + + result= file->index_read(record, + (byte*) range->min_key, + range->min_length, + (ha_rkey_function)(range->flag ^ GEOM_FLAG)); + if (result != HA_ERR_KEY_NOT_FOUND) + DBUG_RETURN(result); + range=0; // Not found, to next range } - return (range_arg->flag & NEAR_MAX) ? 1 : 0; // Exact match } @@ -2841,15 +2819,18 @@ int QUICK_SELECT_DESC::cmp_prev(QUICK_RANGE *range_arg) return 0; /* key can't be to small */ KEY_PART *key_part = key_parts; + uint store_length; + for (char *key = range_arg->min_key, *end = key + range_arg->min_length; key < end; - key += key_part++->part_length) + key += store_length, key_part++) { int cmp; + store_length= key_part->store_length; if (key_part->null_bit) { // this key part allows null values; NULL is lower than everything else - if (*key++) + if (*key) { // the range is expecting a null value if (!key_part->field->is_null()) @@ -2858,9 +2839,11 @@ int QUICK_SELECT_DESC::cmp_prev(QUICK_RANGE *range_arg) } else if (key_part->field->is_null()) return 1; // null -- outside the range + key++; + store_length--; } if ((cmp = key_part->field->key_cmp((byte*) key, - key_part->part_length)) > 0) + key_part->length)) > 0) return 0; if (cmp < 0) return 1; @@ -2888,23 +2871,20 @@ bool QUICK_SELECT_DESC::range_reads_after_key(QUICK_RANGE *range_arg) bool QUICK_SELECT_DESC::test_if_null_range(QUICK_RANGE *range_arg, uint used_key_parts) { - uint offset,end; + uint offset, end; KEY_PART *key_part = key_parts, *key_part_end= key_part+used_key_parts; for (offset= 0, end = min(range_arg->min_length, range_arg->max_length) ; offset < end && key_part != key_part_end ; - offset += key_part++->part_length) + offset+= key_part++->store_length) { - uint null_length=test(key_part->null_bit); if (!memcmp((char*) range_arg->min_key+offset, (char*) range_arg->max_key+offset, - key_part->part_length + null_length)) - { - offset+=null_length; + key_part->store_length)) continue; - } - if (null_length && range_arg->min_key[offset]) + + if (key_part->null_bit && range_arg->min_key[offset]) return 1; // min_key is null and max_key isn't // Range doesn't cover NULL. This is ok if there is no more null parts break; @@ -2946,33 +2926,34 @@ static void print_key(KEY_PART *key_part,const char *key,uint used_length) { char buff[1024]; + const char *key_end= key+used_length; String tmp(buff,sizeof(buff),&my_charset_bin); + uint store_length; - for (uint length=0; - length < used_length ; - length+=key_part->part_length, key+=key_part->part_length, key_part++) + for (; key < key_end; key+=store_length, key_part++) { - Field *field=key_part->field; - if (length != 0) - fputc('/',DBUG_FILE); + Field *field= key_part->field; + store_length= key_part->store_length; + if (field->real_maybe_null()) { - length++; // null byte is not in part_length - if (*key++) + if (*key) { fwrite("NULL",sizeof(char),4,DBUG_FILE); continue; } + key++; // Skip null byte + store_length--; } - field->set_key_image((char*) key,key_part->part_length - - ((field->type() == FIELD_TYPE_BLOB) ? - HA_KEY_BLOB_LENGTH : 0), - field->charset()); + field->set_key_image((char*) key, key_part->length, field->charset()); field->val_str(&tmp); fwrite(tmp.ptr(),sizeof(char),tmp.length(),DBUG_FILE); + if (key+store_length < key_end) + fputc('/',DBUG_FILE); } } + static void print_quick(QUICK_SELECT *quick,const key_map* needed_reg) { QUICK_RANGE *range; diff --git a/sql/opt_range.h b/sql/opt_range.h index bf10c02c295..2df9d93e1ef 100644 --- a/sql/opt_range.h +++ b/sql/opt_range.h @@ -35,7 +35,7 @@ typedef struct st_key_part { - uint16 key,part,part_length; + uint16 key,part, store_length, length; uint8 null_bit; Field *field; Field::imagetype image_type; @@ -68,7 +68,7 @@ class QUICK_RANGE :public Sql_alloc { class QUICK_SELECT { public: - bool next,dont_free; + bool next,dont_free,sorted; int error; uint index, max_used_key_length, used_key_parts; TABLE *head; @@ -89,11 +89,20 @@ public: int init() { return error=file->index_init(index); } virtual int get_next(); virtual bool reverse_sorted() { return 0; } - int cmp_next(QUICK_RANGE *range); bool unique_key_range(); }; +class QUICK_SELECT_GEOM: public QUICK_SELECT +{ +public: + QUICK_SELECT_GEOM(THD *thd, TABLE *table, uint index_arg, bool no_alloc) + :QUICK_SELECT(thd, table, index_arg, no_alloc) + {}; + virtual int get_next(); +}; + + class QUICK_SELECT_DESC: public QUICK_SELECT { public: diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 1b4c8bec416..5cad16f31df 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -86,7 +86,8 @@ extern "C" void free_user_var(user_var_entry *entry) THD::THD():user_time(0), current_statement(0), is_fatal_error(0), last_insert_id_used(0), insert_id_used(0), rand_used(0), in_lock_tables(0), - global_read_lock(0), bootstrap(0) + global_read_lock(0), bootstrap(0), + no_table_fix_fields_cache(0) { host= user= priv_user= db= ip=0; host_or_ip= "connecting host"; diff --git a/sql/sql_select.cc b/sql/sql_select.cc index 86b1dcfb4ee..2eb3293e57d 100644 --- a/sql/sql_select.cc +++ b/sql/sql_select.cc @@ -3746,7 +3746,8 @@ make_join_readinfo(JOIN *join, uint options) table->key_read=1; table->file->extra(HA_EXTRA_KEYREAD); } - else if (!table->used_keys.is_clear_all() && ! (tab->select && tab->select->quick)) + else if (!table->used_keys.is_clear_all() && + !(tab->select && tab->select->quick)) { // Only read index tree tab->index=find_shortest_key(table, & table->used_keys); tab->table->file->index_init(tab->index); @@ -6907,6 +6908,7 @@ static int test_if_order_by_key(ORDER *order, TABLE *table, uint idx, key_part_end=key_part+table->key_info[idx].key_parts; key_part_map const_key_parts=table->const_key_parts[idx]; int reverse=0; + DBUG_ENTER("test_if_order_by_key"); for (; order ; order=order->next, const_key_parts>>=1) { @@ -6917,25 +6919,24 @@ static int test_if_order_by_key(ORDER *order, TABLE *table, uint idx, Skip key parts that are constants in the WHERE clause. These are already skipped in the ORDER BY by const_expression_in_where() */ - while (const_key_parts & 1) - { - key_part++; const_key_parts>>=1; - } + for (; const_key_parts & 1 ; const_key_parts>>= 1) + key_part++; + if (key_part == key_part_end || key_part->field != field) - return 0; + DBUG_RETURN(0); /* set flag to 1 if we can use read-next on key, else to -1 */ - flag=(order->asc == !(key_part->key_part_flag & HA_REVERSE_SORT)) - ? 1 : -1; + flag= ((order->asc == !(key_part->key_part_flag & HA_REVERSE_SORT)) ? 1 : -1); if (reverse && flag != reverse) - return 0; + DBUG_RETURN(0); reverse=flag; // Remember if reverse key_part++; } *used_key_parts= (uint) (key_part - table->key_info[idx].key_part); - return reverse; + DBUG_RETURN(reverse); } + static uint find_shortest_key(TABLE *table, const key_map *usable_keys) { uint min_length= (uint) ~0; @@ -6958,18 +6959,20 @@ static uint find_shortest_key(TABLE *table, const key_map *usable_keys) } /* + Test if a second key is the subkey of the first one. + SYNOPSIS is_subkey() - key_part - first key parts - ref_key_part - second key parts - ref_key_part_end - last+1 part of the second key - DESCRIPTION - Test if a second key is the subkey of the first one. + key_part First key parts + ref_key_part Second key parts + ref_key_part_end Last+1 part of the second key + NOTE Second key MUST be shorter than the first one. + RETURN - 1 - is the subkey - 0 - otherwise + 1 is a subkey + 0 no sub key */ inline bool @@ -6983,20 +6986,21 @@ is_subkey(KEY_PART_INFO *key_part, KEY_PART_INFO *ref_key_part, } /* + Test if we can use one of the 'usable_keys' instead of 'ref' key for sorting + SYNOPSIS test_if_subkey() - ref - number of key, used for WHERE clause - usable_keys - keys for testing - DESCRIPTION - Test if we can use one of the 'usable_keys' instead of 'ref' key. + ref Number of key, used for WHERE clause + usable_keys Keys for testing + RETURN - MAX_KEY - if we can't use other key - the number of found key - otherwise + MAX_KEY If we can't use other key + the number of found key Otherwise */ static uint test_if_subkey(ORDER *order, TABLE *table, uint ref, uint ref_key_parts, - const key_map& usable_keys) + const key_map *usable_keys) { uint nr; uint min_length= (uint) ~0; @@ -7007,7 +7011,7 @@ test_if_subkey(ORDER *order, TABLE *table, uint ref, uint ref_key_parts, for (nr= 0 ; nr < table->keys ; nr++) { - if (usable_keys.is_set(nr) && + if (usable_keys->is_set(nr) && table->key_info[nr].key_length < min_length && table->key_info[nr].key_parts >= ref_key_parts && is_subkey(table->key_info[nr].key_part, ref_key_part, @@ -7051,12 +7055,12 @@ test_if_skip_sort_order(JOIN_TAB *tab,ORDER *order,ha_rows select_limit, if ((*tmp_order->item)->type() != Item::FIELD_ITEM) { usable_keys.clear_all(); - break; + DBUG_RETURN(0); } - usable_keys.intersect( - ((Item_field*) (*tmp_order->item))->field->part_of_sortkey); + usable_keys.intersect(((Item_field*) (*tmp_order->item))-> + field->part_of_sortkey); if (usable_keys.is_clear_all()) - break; // No usable keys + DBUG_RETURN(0); // No usable keys } ref_key= -1; @@ -7092,9 +7096,9 @@ test_if_skip_sort_order(JOIN_TAB *tab,ORDER *order,ha_rows select_limit, keys */ if (table->used_keys.is_set(ref_key)) - usable_keys.merge(table->used_keys); + usable_keys.intersect(table->used_keys); if ((new_ref_key= test_if_subkey(order, table, ref_key, ref_key_parts, - usable_keys)) < MAX_KEY) + &usable_keys)) < MAX_KEY) { /* Found key that can be used to retrieve data in sorted order */ if (tab->ref.key >= 0) @@ -7154,6 +7158,8 @@ test_if_skip_sort_order(JOIN_TAB *tab,ORDER *order,ha_rows select_limit, /* fall through */ } } + else if (select && select->quick) + select->quick->sorted= 1; DBUG_RETURN(1); /* No need to sort */ } } @@ -7292,9 +7298,9 @@ create_sort_index(THD *thd, JOIN *join, ORDER *order, For impossible ranges (like when doing a lookup on NULL on a NOT NULL field, quick will contain an empty record set. */ - if (!(select->quick= tab->type == JT_FT ? - new FT_SELECT(thd, table, tab->ref.key) : - get_quick_select_for_ref(thd, table, &tab->ref))) + if (!(select->quick= (tab->type == JT_FT ? + new FT_SELECT(thd, table, tab->ref.key) : + get_quick_select_for_ref(thd, table, &tab->ref)))) goto err; } } diff --git a/sql/table.cc b/sql/table.cc index f137abf2ef7..1b7d30560ef 100644 --- a/sql/table.cc +++ b/sql/table.cc @@ -553,7 +553,6 @@ int openfrm(const char *name, const char *alias, uint db_stat, uint prgflag, keyinfo->key_length+= HA_KEY_NULL_LENGTH; } if (field->type() == FIELD_TYPE_BLOB || - field->type() == FIELD_TYPE_GEOMETRY || field->real_type() == FIELD_TYPE_VAR_STRING) { if (field->type() == FIELD_TYPE_BLOB) |