diff options
Diffstat (limited to 'sql/sql_class.h')
-rw-r--r-- | sql/sql_class.h | 513 |
1 files changed, 400 insertions, 113 deletions
diff --git a/sql/sql_class.h b/sql/sql_class.h index d9497907926..c161b25b4c5 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -1,15 +1,15 @@ /* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB - + This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. - + This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. - + You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ @@ -21,12 +21,18 @@ #pragma interface /* gcc class implementation */ #endif +// TODO: create log.h and move all the log header stuff there + class Query_log_event; class Load_log_event; +class Slave_log_event; - +enum enum_enable_or_disable { LEAVE_AS_IS, ENABLE, DISABLE }; +enum enum_ha_read_modes { RFIRST, RNEXT, RPREV, RLAST, RKEY }; enum enum_duplicates { DUP_ERROR, DUP_REPLACE, DUP_IGNORE }; enum enum_log_type { LOG_CLOSED, LOG_NORMAL, LOG_NEW, LOG_BIN }; +enum enum_delay_key_write { DELAY_KEY_WRITE_NONE, DELAY_KEY_WRITE_ON, + DELAY_KEY_WRITE_ALL }; // log info errors #define LOG_INFO_EOF -1 @@ -38,10 +44,12 @@ enum enum_log_type { LOG_CLOSED, LOG_NORMAL, LOG_NEW, LOG_BIN }; #define LOG_INFO_FATAL -7 #define LOG_INFO_IN_USE -8 +struct st_relay_log_info; + typedef struct st_log_info { char log_file_name[FN_REFLEN]; - my_off_t index_file_offset; + my_off_t index_file_offset, index_file_start_offset; my_off_t pos; bool fatal; // if the purge happens to give us a negative offset pthread_mutex_t lock; @@ -49,58 +57,105 @@ typedef struct st_log_info ~st_log_info() { pthread_mutex_destroy(&lock);} } LOG_INFO; +class Log_event; class MYSQL_LOG { private: pthread_mutex_t LOCK_log, LOCK_index; + pthread_cond_t update_cond; + ulonglong bytes_written; time_t last_time,query_start; IO_CACHE log_file; - File index_file; + IO_CACHE index_file; char *name; - volatile enum_log_type log_type; char time_buff[20],db[NAME_LEN+1]; char log_file_name[FN_REFLEN],index_file_name[FN_REFLEN]; + // current file sequence number for load data infile binary logging + uint file_id; + uint open_count; // For replication + /* + For binlog - if log name can never change we should not try to rotate it + or write any rotation events. The user should use FLUSH MASTER instead + of FLUSH LOGS for purging. + */ + volatile enum_log_type log_type; + enum cache_type io_cache_type; bool write_error,inited; - bool no_rotate; // for binlog - if log name can never change - // we should not try to rotate it or write any rotation events - // the user should use FLUSH MASTER instead of FLUSH LOGS for - // purging + bool no_rotate; + bool need_start_event; + bool no_auto_events; // for relay binlog + friend class Log_event; public: MYSQL_LOG(); ~MYSQL_LOG(); - pthread_mutex_t* get_log_lock() { return &LOCK_log; } - void set_index_file_name(const char* index_file_name = 0); - void init(enum_log_type log_type_arg); - void open(const char *log_name,enum_log_type log_type, - const char *new_name=0); - void new_file(bool inside_mutex = 0); - bool open_index(int options); - void close_index(); - bool write(THD *thd, enum enum_server_command command,const char *format,...); + void reset_bytes_written() + { + bytes_written = 0; + } + void harvest_bytes_written(ulonglong* counter) + { +#ifndef DBUG_OFF + char buf1[22],buf2[22]; +#endif + DBUG_ENTER("harvest_bytes_written"); + (*counter)+=bytes_written; + DBUG_PRINT("info",("counter: %s bytes_written: %s", llstr(*counter,buf1), + llstr(bytes_written,buf2))); + bytes_written=0; + DBUG_VOID_RETURN; + } + void signal_update() { pthread_cond_broadcast(&update_cond);} + void wait_for_update(THD* thd); + void set_need_start_event() { need_start_event = 1; } + void init(enum_log_type log_type_arg, + enum cache_type io_cache_type_arg = WRITE_CACHE, + bool no_auto_events_arg = 0); + bool open(const char *log_name,enum_log_type log_type, + const char *new_name, const char *index_file_name_arg, + enum cache_type io_cache_type_arg, + bool no_auto_events_arg); + void new_file(bool need_lock= 1); + bool write(THD *thd, enum enum_server_command command, + const char *format,...); bool write(THD *thd, const char *query, uint query_length, time_t query_start=0); - bool write(Query_log_event* event_info); // binary log write - bool write(Load_log_event* event_info); + bool write(Log_event* event_info); // binary log write bool write(THD *thd, IO_CACHE *cache); + + /* + v stands for vector + invoked as appendv(buf1,len1,buf2,len2,...,bufn,lenn,0) + */ + bool appendv(const char* buf,uint len,...); + bool append(Log_event* ev); + int generate_new_name(char *new_name,const char *old_name); void make_log_name(char* buf, const char* log_ident); bool is_active(const char* log_file_name); int purge_logs(THD* thd, const char* to_log); - void close(bool exiting = 0); // if we are exiting, we also want to close the - // index file + int purge_first_log(struct st_relay_log_info* rli); + bool reset_logs(THD* thd); + // if we are exiting, we also want to close the index file + void close(bool exiting = 0); // iterating through the log index file - int find_first_log(LOG_INFO* linfo, const char* log_name); - int find_next_log(LOG_INFO* linfo); + int find_log_pos(LOG_INFO* linfo, const char* log_name, + bool need_mutex); + int find_next_log(LOG_INFO* linfo, bool need_mutex); int get_current_log(LOG_INFO* linfo); + uint next_file_id(); inline bool is_open() { return log_type != LOG_CLOSED; } - char* get_index_fname() { return index_file_name;} - char* get_log_fname() { return log_file_name; } - void lock_index() { pthread_mutex_lock(&LOCK_index);} - void unlock_index() { pthread_mutex_unlock(&LOCK_index);} - File get_index_file() { return index_file;} + inline char* get_index_fname() { return index_file_name;} + inline char* get_log_fname() { return log_file_name; } + inline pthread_mutex_t* get_log_lock() { return &LOCK_log; } + inline IO_CACHE* get_log_file() { return &log_file; } + + inline void lock_index() { pthread_mutex_lock(&LOCK_index);} + inline void unlock_index() { pthread_mutex_unlock(&LOCK_index);} + inline IO_CACHE *get_index_file() { return &index_file;} + inline uint32 get_open_count() { return open_count; } }; /* character conversion tables */ @@ -114,23 +169,25 @@ class CONVERT void convert_array(const uchar *mapping,uchar *buff,uint length); public: const char *name; - CONVERT(const char *name_par,uchar *from_par,uchar *to_par) - :from_map(from_par),to_map(to_par),name(name_par) {} + uint numb; + CONVERT(const char *name_par,uchar *from_par,uchar *to_par, uint number) + :from_map(from_par),to_map(to_par),name(name_par),numb(number) {} friend CONVERT *get_convert_set(const char *name_ptr); inline void convert(char *a,uint length) { convert_array(from_map, (uchar*) a,length); } bool store(String *, const char *,uint); + inline uint number() { return numb; } }; typedef struct st_copy_info { ha_rows records; ha_rows deleted; ha_rows copied; - ha_rows error; + ha_rows error_count; enum enum_duplicates handle_duplicates; - int escape_char; + int escape_char, last_errno; } COPY_INFO; @@ -165,11 +222,13 @@ class Key :public Sql_alloc { public: enum Keytype { PRIMARY, UNIQUE, MULTIPLE, FULLTEXT }; enum Keytype type; + enum ha_key_alg algorithm; List<key_part_spec> columns; const char *Name; Key(enum Keytype type_par,const char *name_arg,List<key_part_spec> &cols) - :type(type_par), columns(cols),Name(name_arg) {} + :type(type_par), algorithm(HA_KEY_ALG_UNDEF), columns(cols), Name(name_arg) + {} ~Key() {} const char *name() { return Name; } }; @@ -214,84 +273,190 @@ public: }; -/**************************************************************************** -** every connection is handle by a thread with a THD -****************************************************************************/ - class delayed_insert; +#define THD_SENTRY_MAGIC 0xfeedd1ff +#define THD_SENTRY_GONE 0xdeadbeef + +#define THD_CHECK_SENTRY(thd) DBUG_ASSERT(thd->dbug_sentry == THD_SENTRY_MAGIC) + +struct system_variables +{ + ulonglong myisam_max_extra_sort_file_size; + ulonglong myisam_max_sort_file_size; + ulong bulk_insert_buff_size; + ulong join_buff_size; + ulong long_query_time; + ulong max_allowed_packet; + ulong max_heap_table_size; + ulong max_sort_length; + ulong max_join_size; + ulong max_tmp_tables; + ulong myisam_sort_buff_size; + ulong net_buffer_length; + ulong net_interactive_timeout; + ulong net_read_timeout; + ulong net_wait_timeout; + ulong net_write_timeout; + ulong net_retry_count; + ulong query_cache_type; + ulong read_buff_size; + ulong read_rnd_buff_size; + ulong select_limit; + ulong sortbuff_size; + ulong tmp_table_size; + ulong tx_isolation; + ulong table_type; + + my_bool log_warnings; + my_bool low_priority_updates; + + CONVERT *convert_set; +}; + + +/* + For each client connection we create a separate thread with THD serving as + a thread/connection descriptor +*/ + class THD :public ilink { public: - NET net; - LEX lex; - MEM_ROOT mem_root; - HASH user_vars; - String packet; /* Room for 1 row */ - struct sockaddr_in remote; - struct rand_struct rand; - char *query,*thread_stack; + NET net; // client connection descriptor + LEX lex; // parse tree descriptor + MEM_ROOT mem_root; // 1 command-life memory pool + HASH user_vars; // hash for user variables + String packet; // dynamic buffer for network I/O + struct sockaddr_in remote; // client socket address + struct rand_struct rand; // used for authentication + struct system_variables variables; // Changeable local variables + pthread_mutex_t LOCK_delete; // Locked before thd is deleted + + char *query; // Points to the current query, + /* + A pointer to the stack frame of handle_one_connection(), + which is called first in the thread for handling a client + */ + char *thread_stack; + + /* + host - host of the client + user - user of the client, set to NULL until the user has been read from + the connection + priv_user - not sure why we have it, but it is set to "boot" when we run + with --bootstrap + db - currently selected database + ip - client IP + */ + char *host,*user,*priv_user,*db,*ip; - const char *proc_info; - uint client_capabilities,sql_mode,max_packet_length; - uint master_access,db_access; - TABLE *open_tables,*temporary_tables; + /* Points to info-string that will show in SHOW PROCESSLIST */ + const char *proc_info; + /* points to host if host is available, otherwise points to ip */ + const char *host_or_ip; + + uint client_capabilities; /* What the client supports */ + /* Determines if which non-standard SQL behaviour should be enabled */ + uint sql_mode; + uint max_client_packet_length; + ulong master_access; /* Global privileges from mysql.user */ + ulong db_access; /* Privileges for current db */ + + + /* + open_tables - list of regular tables in use by this thread + temporary_tables - list of temp tables in use by this thread + handler_tables - list of tables that were opened with HANDLER OPEN + and are still in use by this thread + */ + TABLE *open_tables,*temporary_tables, *handler_tables; + // TODO: document the variables below MYSQL_LOCK *lock,*locked_tables; ULL *ull; +#ifndef DBUG_OFF + uint dbug_sentry; // watch out for memory corruption +#endif struct st_my_thread_var *mysys_var; enum enum_server_command command; - uint32 server_id; + uint32 server_id; + uint32 file_id; // for LOAD DATA INFILE const char *where; - char* last_nx_table; // last non-existent table, we need this for replication - char* last_nx_db; // database of the last nx table - time_t start_time,time_after_lock,user_time; - time_t connect_time,thr_create_time; // track down slow pthread_create + time_t start_time,time_after_lock,user_time; + time_t connect_time,thr_create_time; // track down slow pthread_create thr_lock_type update_lock_default; delayed_insert *di; struct st_transactions { IO_CACHE trans_log; - THD_TRANS all; /* Trans since BEGIN WORK */ - THD_TRANS stmt; /* Trans for current statement */ + THD_TRANS all; // Trans since BEGIN WORK + THD_TRANS stmt; // Trans for current statement uint bdb_lock_count; + + /* + Tables changed in transaction (that must be invalidated in query cache). + List contain only transactional tables, that not invalidated in query + cache (instead of full list of changed in transaction tables). + */ + CHANGED_TABLE_LIST* changed_tables; + MEM_ROOT mem_root; // Transaction-life memory allocation pool + void cleanup() + { + changed_tables = 0; + free_root(&mem_root,MYF(MY_KEEP_PREALLOC)); + } } transaction; -#ifdef HAVE_GEMINI_DB - struct st_gemini gemini; -#endif - Item *free_list; - CONVERT *convert_set; + Item *free_list, *handler_items; Field *dupp_field; #ifndef __WIN__ sigset_t signals,block_signals; #endif #ifdef SIGNAL_WITH_VIO_CLOSE Vio* active_vio; - pthread_mutex_t active_vio_lock; #endif - ulonglong next_insert_id,last_insert_id,current_insert_id; - ha_rows select_limit,offset_limit,default_select_limit,cuted_fields, - max_join_size, sent_row_count, examined_row_count; - table_map used_tables; - ulong query_id,version, inactive_timeout,options,thread_id; - ulong gemini_spin_retries; - long dbug_thread_id; + ulonglong next_insert_id,last_insert_id,current_insert_id, + limit_found_rows; + ha_rows select_limit, offset_limit, cuted_fields, + sent_row_count, examined_row_count; + table_map used_tables; + USER_CONN *user_connect; + ulong query_id,version, options,thread_id, col_access; + long dbug_thread_id; pthread_t real_id; - uint current_tablenr,tmp_table,cond_count,col_access,query_length; - uint server_status,open_options; - enum_tx_isolation tx_isolation, session_tx_isolation; + uint current_tablenr,tmp_table,cond_count; + uint server_status,open_options; + uint32 query_length; + uint32 db_length; + /* variables.transaction_isolation is reset to this after each commit */ + enum_tx_isolation session_tx_isolation; char scramble[9]; + uint8 query_cache_type; // type of query cache processing bool slave_thread; bool set_query_id,locked,count_cuted_fields,some_tables_deleted; bool no_errors, allow_sum_func, password, fatal_error; bool query_start_used,last_insert_id_used,insert_id_used; bool system_thread,in_lock_tables,global_read_lock; bool query_error, bootstrap, cleanup_done; + bool safe_to_cache_query; bool volatile killed; + /* + If we do a purge of binary logs, log index info of the threads + that are currently reading it needs to be adjusted. To do that + each thread that is using LOG_INFO needs to adjust the pointer to it + */ LOG_INFO* current_linfo; - // if we do a purge of binary logs, log index info of the threads - // that are currently reading it needs to be adjusted. To do that - // each thread that is using LOG_INFO needs to adjust the pointer to it - - ulong slave_proxy_id; // in slave thread we need to know in behalf of which - // thread the query is being run to replicate temp tables properly + /* + In slave thread we need to know in behalf of which + thread the query is being run to replicate temp tables properly + */ + ulong slave_proxy_id; + NET* slave_net; // network connection from slave -> m. + my_off_t log_pos; + + /* Used by the sys_var class to store temporary values */ + union + { + my_bool my_bool_value; + long long_value; + } sys_var_tmp; THD(); ~THD(); @@ -300,28 +465,19 @@ public: #ifdef SIGNAL_WITH_VIO_CLOSE inline void set_active_vio(Vio* vio) { - pthread_mutex_lock(&active_vio_lock); + pthread_mutex_lock(&LOCK_delete); active_vio = vio; - pthread_mutex_unlock(&active_vio_lock); + pthread_mutex_unlock(&LOCK_delete); } inline void clear_active_vio() { - pthread_mutex_lock(&active_vio_lock); + pthread_mutex_lock(&LOCK_delete); active_vio = 0; - pthread_mutex_unlock(&active_vio_lock); - } - inline void close_active_vio() - { - pthread_mutex_lock(&active_vio_lock); - if(active_vio) - { - vio_close(active_vio); - active_vio = 0; - } - pthread_mutex_unlock(&active_vio_lock); + pthread_mutex_unlock(&LOCK_delete); } + void THD::close_active_vio(); #endif - void prepare_to_die(); + void awake(bool prepare_to_die); inline const char* enter_cond(pthread_cond_t *cond, pthread_mutex_t* mutex, const char* msg) { @@ -355,16 +511,18 @@ public: } return last_insert_id; } + inline ulonglong found_rows(void) + { + return limit_found_rows; + } inline bool active_transaction() { #ifdef USING_TRANSACTIONS return (transaction.all.bdb_tid != 0 || - transaction.all.innodb_active_trans != 0 || - transaction.all.gemini_tid != 0); + transaction.all.innodb_active_trans != 0); #else return 0; #endif - } inline gptr alloc(unsigned int size) { return alloc_root(&mem_root,size); } inline gptr calloc(unsigned int size) @@ -376,11 +534,30 @@ public: } inline char *strdup(const char *str) { return strdup_root(&mem_root,str); } - inline char *memdup(const char *str, unsigned int size) + inline char *strmake(const char *str, uint size) + { return strmake_root(&mem_root,str,size); } + inline char *memdup(const char *str, uint size) { return memdup_root(&mem_root,str,size); } + inline char *memdup_w_gap(const char *str, uint size, uint gap) + { + gptr ptr; + if ((ptr=alloc_root(&mem_root,size+gap))) + memcpy(ptr,str,size); + return ptr; + } + inline gptr trans_alloc(unsigned int size) + { + return alloc_root(&transaction.mem_root,size); + } + void add_changed_table(TABLE *table); + void add_changed_table(const char *key, long key_length); + CHANGED_TABLE_LIST * changed_table_dup(const char *key, long key_length); }; - +/* + Used to hold information about file and file structure in exchainge + via non-DB file (...INTO OUTFILE..., ...LOAD DATA...) +*/ class sql_exchange :public Sql_alloc { public: @@ -399,6 +576,10 @@ public: ** This is used to get result from a select */ +class JOIN; + +void send_error(NET *net,uint sql_errno=0, const char *err=0); + class select_result :public Sql_alloc { protected: THD *thd; @@ -408,7 +589,11 @@ public: virtual int prepare(List<Item> &list) { return 0; } virtual bool send_fields(List<Item> &list,uint flag)=0; virtual bool send_data(List<Item> &items)=0; - virtual void send_error(uint errcode,const char *err)=0; + virtual void initialize_tables (JOIN *join=0) {} + virtual void send_error(uint errcode,const char *err) + { + ::send_error(&thd->net,errcode,err); + } virtual bool send_eof()=0; virtual void abort() {} }; @@ -419,7 +604,6 @@ public: select_send() {} bool send_fields(List<Item> &list,uint flag); bool send_data(List<Item> &items); - void send_error(uint errcode,const char *err); bool send_eof(); }; @@ -443,6 +627,7 @@ public: bool send_eof(); }; + class select_dump :public select_result { sql_exchange *exchange; File file; @@ -463,29 +648,28 @@ public: class select_insert :public select_result { - protected: + public: TABLE *table; List<Item> *fields; - uint save_time_stamp; ulonglong last_insert_id; COPY_INFO info; + uint save_time_stamp; -public: select_insert(TABLE *table_par,List<Item> *fields_par,enum_duplicates duplic) - :table(table_par),fields(fields_par), save_time_stamp(0),last_insert_id(0) - { - bzero((char*) &info,sizeof(info)); - info.handle_duplicates=duplic; - } + :table(table_par),fields(fields_par), last_insert_id(0), save_time_stamp(0) { + bzero((char*) &info,sizeof(info)); + info.handle_duplicates=duplic; + } ~select_insert(); int prepare(List<Item> &list); - bool send_fields(List<Item> &list, - uint flag) { return 0; } + bool send_fields(List<Item> &list, uint flag) + { return 0; } bool send_data(List<Item> &items); void send_error(uint errcode,const char *err); bool send_eof(); }; + class select_create: public select_insert { ORDER *group; const char *db; @@ -512,6 +696,23 @@ public: void abort(); }; +class select_union :public select_result { + public: + TABLE *table; + COPY_INFO info; + uint save_time_stamp; + TMP_TABLE_PARAM *tmp_table_param; + + select_union(TABLE *table_par); + ~select_union(); + int prepare(List<Item> &list); + bool send_fields(List<Item> &list, uint flag) + { return 0; } + bool send_data(List<Item> &items); + bool send_eof(); + bool flush(); +}; + /* Structs used when sorting */ typedef struct st_sort_field { @@ -561,3 +762,89 @@ class user_var_entry Item_result type; }; +/* Class for unique (removing of duplicates) */ + +class Unique :public Sql_alloc +{ + DYNAMIC_ARRAY file_ptrs; + ulong max_elements, max_in_memory_size; + IO_CACHE file; + TREE tree; + byte *record_pointers; + bool flush(); + +public: + ulong elements; + Unique(qsort_cmp2 comp_func, void * comp_func_fixed_arg, + uint size, ulong max_in_memory_size_arg); + ~Unique(); + inline bool unique_add(gptr ptr) + { + if (tree.elements_in_tree > max_elements && flush()) + return 1; + return !tree_insert(&tree,ptr,0); + } + + bool get(TABLE *table); + + friend int unique_write_to_file(gptr key, element_count count, Unique *unique); + friend int unique_write_to_ptrs(gptr key, element_count count, Unique *unique); +}; + + class multi_delete : public select_result { + TABLE_LIST *delete_tables, *table_being_deleted; +#ifdef SINISAS_STRIP + IO_CACHE **tempfiles; + byte *memory_lane; +#else + Unique **tempfiles; +#endif + THD *thd; + ha_rows deleted; + uint num_of_tables; + int error; + thr_lock_type lock_option; + bool do_delete, not_trans_safe; + public: + multi_delete(THD *thd, TABLE_LIST *dt, thr_lock_type lock_option_arg, + uint num_of_tables); + ~multi_delete(); + int prepare(List<Item> &list); + bool send_fields(List<Item> &list, + uint flag) { return 0; } + bool send_data(List<Item> &items); + void initialize_tables (JOIN *join); + void send_error(uint errcode,const char *err); + int do_deletes (bool from_send_error); + bool send_eof(); + }; + + class multi_update : public select_result { + TABLE_LIST *update_tables, *table_being_updated; +// Unique **tempfiles; + COPY_INFO *infos; + TABLE **tmp_tables; + THD *thd; + ha_rows updated, found; + List<Item> fields; + List <Item> **fields_by_tables; + thr_lock_type lock_option; + enum enum_duplicates dupl; + uint num_of_tables, num_fields, num_updated, *save_time_stamps, *field_sequence; + int error; + bool do_update, not_trans_safe; + public: + multi_update(THD *thd_arg, TABLE_LIST *ut, List<Item> &fs, + enum enum_duplicates handle_duplicates, + thr_lock_type lock_option_arg, uint num); + ~multi_update(); + int prepare(List<Item> &list); + bool send_fields(List<Item> &list, + uint flag) { return 0; } + bool send_data(List<Item> &items); + void initialize_tables (JOIN *join); + void send_error(uint errcode,const char *err); + int do_updates (bool from_send_error); + bool send_eof(); + }; + |