summaryrefslogtreecommitdiff
path: root/include
diff options
context:
space:
mode:
authorserg@serg.mysql.com <>2002-06-19 23:54:45 +0000
committerserg@serg.mysql.com <>2002-06-19 23:54:45 +0000
commit5c83ae3fca5a3a56eb31a6827dae0779a94e52b8 (patch)
tree9939048d2a201bb88a20ae302cbc0cdfa14fd75d /include
parent287491719819f27942b55fa46da082a4d0cd4257 (diff)
downloadmariadb-git-5c83ae3fca5a3a56eb31a6827dae0779a94e52b8.tar.gz
multithreaded repair-by-sort code
parallel read access to IO_CACHE
Diffstat (limited to 'include')
-rw-r--r--include/my_sys.h59
-rw-r--r--include/myisam.h71
2 files changed, 95 insertions, 35 deletions
diff --git a/include/my_sys.h b/include/my_sys.h
index 3ff35763bce..3935a4d5cc0 100644
--- a/include/my_sys.h
+++ b/include/my_sys.h
@@ -36,7 +36,7 @@ extern int NEAR my_errno; /* Last error in mysys */
#include <m_ctype.h> /* for CHARSET_INFO */
#endif
-#include <stdarg.h>
+#include <stdarg.h>
#define MYSYS_PROGRAM_USES_CURSES() { error_handler_hook = my_message_curses; mysys_uses_curses=1; }
#define MYSYS_PROGRAM_DONT_USE_CURSES() { error_handler_hook = my_message_no_curses; mysys_uses_curses=0;}
@@ -278,7 +278,7 @@ extern struct my_file_info
{
my_string name;
enum file_type type;
-#if defined(THREAD) && !defined(HAVE_PREAD)
+#if defined(THREAD) && !defined(HAVE_PREAD)
pthread_mutex_t mutex;
#endif
} my_file_info[MY_NFILE];
@@ -299,6 +299,45 @@ typedef struct st_dynamic_string {
struct st_io_cache;
typedef int (*IO_CACHE_CALLBACK)(struct st_io_cache*);
+#ifdef THREAD
+typedef struct st_io_cache_share
+{
+ /* to sync on reads into buffer */
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ int count;
+ /* actual IO_CACHE that filled the buffer */
+ struct st_io_cache *active;
+ /* the following will go implemented whenever the need arises */
+#ifdef NOT_IMPLEMENTED
+ /* whether the structure should be free'd */
+ my_bool alloced;
+#endif
+} IO_CACHE_SHARE;
+
+#define lock_io_cache(info) \
+ ( \
+ (errno=pthread_mutex_lock(&((info)->share->mutex))) ? -1 : ( \
+ (info)->share->count ? ( \
+ --((info)->share->count), \
+ pthread_cond_wait(&((info)->share->cond), \
+ &((info)->share->mutex)), \
+ (++((info)->share->count) ? \
+ pthread_mutex_unlock(&((info)->share->mutex)) : 0)) \
+ : 1 ) \
+ )
+
+#define unlock_io_cache(info) \
+ ( \
+ pthread_cond_broadcast(&((info)->share->cond)), \
+ pthread_mutex_unlock (&((info)->share->mutex)) \
+ )
+/* -- to catch errors
+#else
+#define lock_io_cache(info)
+#define unlock_io_cache(info)
+*/
+#endif
typedef struct st_io_cache /* Used when cacheing files */
{
@@ -331,10 +370,16 @@ typedef struct st_io_cache /* Used when cacheing files */
WRITE_CACHE, and &read_pos and &read_end respectively otherwise
*/
byte **current_pos, **current_end;
-/* The lock is for append buffer used in SEQ_READ_APPEND cache */
#ifdef THREAD
+ /* The lock is for append buffer used in SEQ_READ_APPEND cache
+ need mutex copying from append buffer to read buffer. */
pthread_mutex_t append_buffer_lock;
- /* need mutex copying from append buffer to read buffer */
+ /* The following is used when several threads are reading the
+ same file in parallel. They are synchronized on disk
+ accesses reading the cached part of the file asynchronously.
+ It should be set to NULL to disable the feature. Only
+ READ_CACHE mode is supported. */
+ IO_CACHE_SHARE *share;
#endif
/* a caller will use my_b_read() macro to read from the cache
if the data is already in cache, it will be simply copied with
@@ -626,6 +671,12 @@ extern my_bool reinit_io_cache(IO_CACHE *info,enum cache_type type,
my_off_t seek_offset,pbool use_async_io,
pbool clear_cache);
extern int _my_b_read(IO_CACHE *info,byte *Buffer,uint Count);
+#ifdef THREAD
+extern int _my_b_read_r(IO_CACHE *info,byte *Buffer,uint Count);
+extern int init_io_cache_share(IO_CACHE *info,
+ IO_CACHE_SHARE *s, uint num_threads);
+extern int remove_io_thread(IO_CACHE *info);
+#endif
extern int _my_b_seq_read(IO_CACHE *info,byte *Buffer,uint Count);
extern int _my_b_net_read(IO_CACHE *info,byte *Buffer,uint Count);
extern int _my_b_get(IO_CACHE *info);
diff --git a/include/myisam.h b/include/myisam.h
index b7010dab814..bf7abddbd0e 100644
--- a/include/myisam.h
+++ b/include/myisam.h
@@ -314,24 +314,6 @@ typedef struct st_sort_key_blocks { /* Used when sorting */
int inited;
} SORT_KEY_BLOCKS;
-struct st_mi_check_param;
-
-typedef struct st_sort_info {
- MI_INFO *info;
- struct st_mi_check_param *param;
- enum data_file_type new_data_file_type;
- SORT_KEY_BLOCKS *key_block,*key_block_end;
- uint key,find_length,real_key_length;
- my_off_t pos,max_pos,filepos,start_recpos,filelength,dupp,buff_length;
- ha_rows max_records;
- ulonglong unique[MI_MAX_KEY_SEG+1];
- my_bool fix_datafile;
- char *record,*buff;
- void *wordlist, *wordptr;
- MI_KEYDEF *keyinfo;
- MI_KEYSEG *keyseg;
-} SORT_INFO;
-
typedef struct st_mi_check_param
{
ulonglong auto_increment_value;
@@ -354,7 +336,6 @@ typedef struct st_mi_check_param
int tmpfile_createflag;
myf myf_rw;
IO_CACHE read_cache;
- SORT_INFO sort_info;
ulonglong unique_count[MI_MAX_KEY_SEG+1];
ha_checksum key_crc[MI_MAX_POSSIBLE_KEY];
ulong rec_per_key_part[MI_MAX_KEY_SEG*MI_MAX_POSSIBLE_KEY];
@@ -363,17 +344,42 @@ typedef struct st_mi_check_param
char *op_name;
} MI_CHECK;
-
-typedef struct st_mi_sortinfo {
+typedef struct st_sort_info {
+ MI_INFO *info;
+ MI_CHECK *param;
+ enum data_file_type new_data_file_type;
+ SORT_KEY_BLOCKS *key_block,*key_block_end;
+ uint kei, total_keys;
+ my_off_t filelength,dupp,buff_length;
ha_rows max_records;
+ char *buff;
+ myf myf_rw;
+ /* sync things*/
+ uint got_error, threads_running;
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+} SORT_INFO;
+
+typedef struct st_mi_sort_param {
+ pthread_t thr;
+ IO_CACHE read_cache;
+ ulonglong unique[MI_MAX_KEY_SEG+1];
+ uint key, key_length,real_key_length,sortbuff_size;
+ uint maxbuffers, keys, find_length, sort_keys_length;
+ uchar **sort_keys;
+ void *wordlist, *wordptr;
+ MI_KEYDEF *keyinfo;
SORT_INFO *sort_info;
+ IO_CACHE tempfile, tempfile_for_exceptions;
+ DYNAMIC_ARRAY buffpek;
+ my_off_t pos,max_pos,filepos,start_recpos;
+ my_bool fix_datafile;
+ char *record;
char *tmpdir;
- int (*key_cmp)(SORT_INFO *info, const void *, const void *);
- int (*key_read)(SORT_INFO *info,void *buff);
- int (*key_write)(SORT_INFO *info, const void *buff);
- void (*lock_in_memory)(MI_CHECK *info);
- uint key_length;
- myf myf_rw;
+ int (*key_cmp)(struct st_mi_sort_param *, const void *, const void *);
+ int (*key_read)(struct st_mi_sort_param *,void *);
+ int (*key_write)(struct st_mi_sort_param *, const void *);
+ void (*lock_in_memory)(MI_CHECK *);
} MI_SORT_PARAM;
/* functions in mi_check */
@@ -398,14 +404,17 @@ int flush_blocks(MI_CHECK *param, File file);
void update_auto_increment_key(MI_CHECK *param, MI_INFO *info,
my_bool repair);
int update_state_info(MI_CHECK *param, MI_INFO *info,uint update);
+void update_key_parts(MI_KEYDEF *keyinfo, ulong *rec_per_key_part,
+ ulonglong *unique, ulonglong records);
int filecopy(MI_CHECK *param, File to,File from,my_off_t start,
my_off_t length, const char *type);
int movepoint(MI_INFO *info,byte *record,my_off_t oldpos,
my_off_t newpos, uint prot_key);
-int sort_write_record(SORT_INFO *sort_info);
- int write_data_suffix(MI_CHECK *param, MI_INFO *info);
-int _create_index_by_sort(MI_SORT_PARAM *info,my_bool no_messages,
- ulong);
+int sort_write_record(MI_SORT_PARAM *sort_param);
+int write_data_suffix(SORT_INFO *sort_info, my_bool fix_datafile);
+int _create_index_by_sort(MI_SORT_PARAM *info,my_bool no_messages, ulong);
+void *_thr_find_all_keys(MI_SORT_PARAM *info);
+int _thr_write_keys(MI_SORT_PARAM *sort_param);
int test_if_almost_full(MI_INFO *info);
int recreate_table(MI_CHECK *param, MI_INFO **org_info, char *filename);
void mi_disable_non_unique_index(MI_INFO *info, ha_rows rows);