diff options
author | unknown <bell@desktop.sanja.is.com.ua> | 2007-02-02 09:41:32 +0200 |
---|---|---|
committer | unknown <bell@desktop.sanja.is.com.ua> | 2007-02-02 09:41:32 +0200 |
commit | 025400922118f11a15be54c66455f20e2f72c0b4 (patch) | |
tree | f3e73bf1a50802f9f1fd8f4bcee5361654350e32 /mysys | |
parent | 5c7960965c4c178d3a02f9893ea65b2802b38b8f (diff) | |
download | mariadb-git-025400922118f11a15be54c66455f20e2f72c0b4.tar.gz |
postreview changes for page cache and pre review commit for loghandler
storage/maria/unittest/test_file.c:
Rename: unittest/mysys/test_file.c -> storage/maria/unittest/test_file.c
storage/maria/unittest/test_file.h:
Rename: unittest/mysys/test_file.h -> storage/maria/unittest/test_file.h
include/pagecache.h:
A waiting queue mechanism moved to separate file wqueue.*
Pointer name changed for compatibility
mysys/Makefile.am:
A waiting queue mechanism moved to separate file wqueue.*
mysys/mf_keycache.c:
fixed unsigned comparison
mysys/mf_pagecache.c:
A waiting queue mechanism moved to separate file wqueue.*
Fixed bug in unregistering block during write
storage/maria/Makefile.am:
The loghandler files added
storage/maria/ma_control_file.h:
Now we have loghandler and can compile control file
storage/maria/maria_def.h:
Including files need for compilation of maria
storage/maria/unittest/Makefile.am:
unit tests of loghandler
storage/maria/unittest/ma_control_file-t.c:
Used maria def
storage/maria/unittest/mf_pagecache_consist.c:
fixed memory overrun
storage/maria/unittest/mf_pagecache_single.c:
fixed used uninitialized memory
unittest/mysys/Makefile.am:
unittests of pagecache moved to maria becase pagecache need loghandler
include/wqueue.h:
New BitKeeper file ``include/wqueue.h''
mysys/wqueue.c:
New BitKeeper file ``mysys/wqueue.c''
storage/maria/ma_loghandler.c:
New BitKeeper file ``storage/maria/ma_loghandler.c''
storage/maria/ma_loghandler.h:
New BitKeeper file ``storage/maria/ma_loghandler.h''
storage/maria/ma_loghandler_lsn.h:
New BitKeeper file ``storage/maria/ma_loghandler_lsn.h''
storage/maria/unittest/ma_test_loghandler-t.c:
New BitKeeper file ``storage/maria/unittest/ma_test_loghandler-t.c''
storage/maria/unittest/ma_test_loghandler_multigroup-t.c:
New BitKeeper file ``storage/maria/unittest/ma_test_loghandler_multigroup-t.c''
storage/maria/unittest/ma_test_loghandler_multithread-t.c:
New BitKeeper file ``storage/maria/unittest/ma_test_loghandler_multithread-t.c''
storage/maria/unittest/ma_test_loghandler_pagecache-t.c:
New BitKeeper file ``storage/maria/unittest/ma_test_loghandler_pagecache-t.c''
Diffstat (limited to 'mysys')
-rw-r--r-- | mysys/Makefile.am | 2 | ||||
-rw-r--r-- | mysys/mf_keycache.c | 2 | ||||
-rwxr-xr-x | mysys/mf_pagecache.c | 776 | ||||
-rw-r--r-- | mysys/wqueue.c | 167 |
4 files changed, 516 insertions, 431 deletions
diff --git a/mysys/Makefile.am b/mysys/Makefile.am index 4d9570febbd..612411404c4 100644 --- a/mysys/Makefile.am +++ b/mysys/Makefile.am @@ -56,7 +56,7 @@ libmysys_a_SOURCES = my_init.c my_getwd.c mf_getdate.c my_mmap.c \ my_handler.c my_netware.c my_largepage.c \ my_memmem.c \ my_windac.c my_access.c base64.c my_libwrap.c \ - mf_pagecache.c + mf_pagecache.c wqueue.c EXTRA_DIST = thr_alarm.c thr_lock.c my_pthread.c my_thr_init.c \ thr_mutex.c thr_rwlock.c \ CMakeLists.txt mf_soundex.c \ diff --git a/mysys/mf_keycache.c b/mysys/mf_keycache.c index 9a99a278bc5..9cb428ab200 100644 --- a/mysys/mf_keycache.c +++ b/mysys/mf_keycache.c @@ -1008,12 +1008,12 @@ static void unlink_block(KEY_CACHE *keycache, BLOCK_LINK *block) KEYCACHE_THREAD_TRACE("unlink_block"); #if defined(KEYCACHE_DEBUG) + KEYCACHE_DBUG_ASSERT(keycache->blocks_available != 0); keycache->blocks_available--; KEYCACHE_DBUG_PRINT("unlink_block", ("unlinked block %u status=%x #requests=%u #available=%u", BLOCK_NUMBER(block), block->status, block->requests, keycache->blocks_available)); - KEYCACHE_DBUG_ASSERT(keycache->blocks_available >= 0); #endif } diff --git a/mysys/mf_pagecache.c b/mysys/mf_pagecache.c index 4b92f68d9bf..97cb542f329 100755 --- a/mysys/mf_pagecache.c +++ b/mysys/mf_pagecache.c @@ -26,7 +26,7 @@ When a new block is required it is first tried to pop one from the stack. If the stack is empty, it is tried to get a never-used block from the pool. If this is empty too, then a block is taken from the LRU ring, flushing it - to disk, if necessary. This is handled in find_key_block(). + to disk, if necessary. This is handled in find_block(). With the new free list, the blocks can have three temperatures: hot, warm and cold (which is free). This is remembered in the block header by the enum BLOCK_TEMPERATURE temperature variable. Remembering the @@ -91,13 +91,16 @@ /* In key cache we have external raw locking here we use SERIALIZED_READ_FROM_CACHE to avoid problem of reading - not consistent data from te page + not consistent data from the page. + (keycache functions (key_cache_read(), key_cache_insert() and + key_cache_write()) rely on external MyISAM lock, we don't) */ #define SERIALIZED_READ_FROM_CACHE yes #define BLOCK_INFO(B) \ DBUG_PRINT("info", \ - ("block 0x%lx, file %lu, page %lu, s %0x, hshL 0x%lx, req %u/%u", \ + ("block 0x%lx file %lu page %lu s %0x hshL 0x%lx req %u/%u " \ + "wrlock: %c", \ (ulong)(B), \ (ulong)((B)->hash_link ? \ (B)->hash_link->file.file : \ @@ -110,7 +113,8 @@ (uint) (B)->requests, \ (uint)((B)->hash_link ? \ (B)->hash_link->requests : \ - 0))) + 0), \ + ((block->status & BLOCK_WRLOCK)?'Y':'N'))) /* TODO: put it to my_static.c */ my_bool my_disable_flush_pagecache_blocks= 0; @@ -138,7 +142,7 @@ typedef pthread_cond_t KEYCACHE_CONDVAR; struct st_pagecache_page { PAGECACHE_FILE file; /* file to which the page belongs to */ - maria_page_no_t pageno; /* number of the page in the file */ + pgcache_page_no_t pageno; /* number of the page in the file */ }; /* element in the chain of a hash table bucket */ @@ -149,7 +153,7 @@ struct st_pagecache_hash_link struct st_pagecache_block_link *block; /* reference to the block for the page: */ PAGECACHE_FILE file; /* from such a file */ - maria_page_no_t pageno; /* this page */ + pgcache_page_no_t pageno; /* this page */ uint requests; /* number of requests for the page */ }; @@ -162,7 +166,7 @@ struct st_pagecache_hash_link #define BLOCK_CHANGED 32 /* block buffer contains a dirty page */ #define BLOCK_WRLOCK 64 /* write locked block */ -/* page status, returned by find_key_block */ +/* page status, returned by find_block */ #define PAGE_READ 0 #define PAGE_TO_BE_READ 1 #define PAGE_WAIT_TO_BE_READ 2 @@ -232,7 +236,7 @@ typedef struct st_pagecache_lock_info node the node which should be linked */ -void info_link(PAGECACHE_PIN_INFO **list, PAGECACHE_PIN_INFO *node) +static void info_link(PAGECACHE_PIN_INFO **list, PAGECACHE_PIN_INFO *node) { if ((node->next= *list)) node->next->prev= &(node->next); @@ -249,7 +253,7 @@ void info_link(PAGECACHE_PIN_INFO **list, PAGECACHE_PIN_INFO *node) node the node which should be unlinked */ -void info_unlink(PAGECACHE_PIN_INFO *node) +static void info_unlink(PAGECACHE_PIN_INFO *node) { if ((*node->prev= node->next)) node->next->prev= node->prev; @@ -271,8 +275,8 @@ void info_unlink(PAGECACHE_PIN_INFO *node) pointer to the information node of the thread in the list */ -PAGECACHE_PIN_INFO *info_find(PAGECACHE_PIN_INFO *list, - struct st_my_thread_var *thread) +static PAGECACHE_PIN_INFO *info_find(PAGECACHE_PIN_INFO *list, + struct st_my_thread_var *thread) { register PAGECACHE_PIN_INFO *i= list; for(; i != 0; i= i->next) @@ -291,7 +295,7 @@ struct st_pagecache_block_link *next_changed, **prev_changed; /* for lists of file dirty/clean blocks */ struct st_pagecache_hash_link *hash_link; /* backward ptr to referring hash_link */ - PAGECACHE_WQUEUE + WQUEUE wqueue[COND_SIZE]; /* queues on waiting requests for new/old pages */ uint requests; /* number of requests for the block */ byte *buffer; /* buffer for the block page */ @@ -310,8 +314,8 @@ struct st_pagecache_block_link #ifdef PAGECACHE_DEBUG /* debug checks */ -my_bool info_check_pin(PAGECACHE_BLOCK_LINK *block, - enum pagecache_page_pin mode) +static my_bool info_check_pin(PAGECACHE_BLOCK_LINK *block, + enum pagecache_page_pin mode) { struct st_my_thread_var *thread= my_thread_var; DBUG_ENTER("info_check_pin"); @@ -367,9 +371,9 @@ my_bool info_check_pin(PAGECACHE_BLOCK_LINK *block, 1 - Error */ -my_bool info_check_lock(PAGECACHE_BLOCK_LINK *block, - enum pagecache_page_lock lock, - enum pagecache_page_pin pin) +static my_bool info_check_lock(PAGECACHE_BLOCK_LINK *block, + enum pagecache_page_lock lock, + enum pagecache_page_pin pin) { struct st_my_thread_var *thread= my_thread_var; DBUG_ENTER("info_check_lock"); @@ -379,47 +383,47 @@ my_bool info_check_lock(PAGECACHE_BLOCK_LINK *block, switch(lock) { case PAGECACHE_LOCK_LEFT_UNLOCKED: - DBUG_ASSERT(pin == PAGECACHE_PIN_LEFT_UNPINNED); - if (info) + if (pin != PAGECACHE_PIN_LEFT_UNPINNED || + info) goto error; break; case PAGECACHE_LOCK_LEFT_READLOCKED: - DBUG_ASSERT(pin == PAGECACHE_PIN_LEFT_UNPINNED || - pin == PAGECACHE_PIN_LEFT_PINNED); - if (info == 0 || info->write_lock) + if ((pin != PAGECACHE_PIN_LEFT_UNPINNED && + pin != PAGECACHE_PIN_LEFT_PINNED) || + info == 0 || info->write_lock) goto error; break; case PAGECACHE_LOCK_LEFT_WRITELOCKED: - DBUG_ASSERT(pin == PAGECACHE_PIN_LEFT_PINNED); - if (info == 0 || !info->write_lock) + if (pin != PAGECACHE_PIN_LEFT_PINNED || + info == 0 || !info->write_lock) goto error; break; case PAGECACHE_LOCK_READ: - DBUG_ASSERT(pin == PAGECACHE_PIN_LEFT_UNPINNED || - pin == PAGECACHE_PIN); - if (info != 0) + if ((pin != PAGECACHE_PIN_LEFT_UNPINNED && + pin != PAGECACHE_PIN) || + info != 0) goto error; break; case PAGECACHE_LOCK_WRITE: - DBUG_ASSERT(pin == PAGECACHE_PIN); - if (info != 0) + if (pin != PAGECACHE_PIN || + info != 0) goto error; break; case PAGECACHE_LOCK_READ_UNLOCK: - DBUG_ASSERT(pin == PAGECACHE_PIN_LEFT_UNPINNED || - pin == PAGECACHE_UNPIN); - if (info == 0 || info->write_lock) + if ((pin != PAGECACHE_PIN_LEFT_UNPINNED && + pin != PAGECACHE_UNPIN) || + info == 0 || info->write_lock) goto error; break; case PAGECACHE_LOCK_WRITE_UNLOCK: - DBUG_ASSERT(pin == PAGECACHE_UNPIN); - if (info == 0 || !info->write_lock) + if (pin != PAGECACHE_UNPIN || + info == 0 || !info->write_lock) goto error; break; case PAGECACHE_LOCK_WRITE_TO_READ: - DBUG_ASSERT(pin == PAGECACHE_PIN_LEFT_PINNED || - pin == PAGECACHE_UNPIN); - if (info == 0 || !info->write_lock) + if ((pin != PAGECACHE_PIN_LEFT_PINNED && + pin != PAGECACHE_UNPIN) || + info == 0 || !info->write_lock) goto error; break; } @@ -439,12 +443,6 @@ error: #define FLUSH_CACHE 2000 /* sort this many blocks at once */ static int flush_all_key_blocks(PAGECACHE *pagecache); -#ifdef THREAD -static void link_into_queue(PAGECACHE_WQUEUE *wqueue, - struct st_my_thread_var *thread); -static void unlink_from_queue(PAGECACHE_WQUEUE *wqueue, - struct st_my_thread_var *thread); -#endif static void free_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block); static void test_key_cache(PAGECACHE *pagecache, const char *where, my_bool lock); @@ -551,6 +549,7 @@ static int ___pagecache_pthread_cond_signal(pthread_cond_t *cond); #define pagecache_pthread_cond_signal pthread_cond_signal #endif /* defined(PAGECACHE_DEBUG) */ +extern my_bool translog_flush(LSN *lsn); /* Write page to the disk @@ -567,18 +566,28 @@ static int ___pagecache_pthread_cond_signal(pthread_cond_t *cond); 0 - OK !=0 - Error */ -uint pagecache_fwrite(PAGECACHE *pagecache, - PAGECACHE_FILE *filedesc, - byte *buffer, - maria_page_no_t pageno, - enum pagecache_page_type type, - myf flags) + +static uint pagecache_fwrite(PAGECACHE *pagecache, + PAGECACHE_FILE *filedesc, + byte *buffer, + pgcache_page_no_t pageno, + enum pagecache_page_type type, + myf flags) { DBUG_ENTER("pagecache_fwrite"); if (type == PAGECACHE_LSN_PAGE) { + LSN lsn; DBUG_PRINT("info", ("Log handler call")); - /* TODO: put here loghandler call */ + /* TODO: integrate with page format */ +#define PAGE_LSN_OFFSET 0 + lsn7korr(&lsn, buffer + PAGE_LSN_OFFSET); + /* + check CONTROL_FILE_IMPOSSIBLE_FILENO & + CONTROL_FILE_IMPOSSIBLE_LOG_OFFSET + */ + DBUG_ASSERT(lsn.file_no != 0 && lsn.rec_offset != 0); + translog_flush(&lsn); } DBUG_RETURN(my_pwrite(filedesc->file, buffer, pagecache->block_size, (pageno)<<(pagecache->shift), flags)); @@ -628,8 +637,6 @@ static uint next_power(uint value) division_limit division limit (may be zero) age_threshold age threshold (may be zero) block_size size of block (should be power of 2) - loghandler logfandler pointer to call it in case of - pages with LSN RETURN VALUE number of blocks in the key cache, if successful, @@ -647,12 +654,11 @@ static uint next_power(uint value) int init_pagecache(PAGECACHE *pagecache, my_size_t use_mem, uint division_limit, uint age_threshold, - uint block_size, - LOG_HANDLER *loghandler) + uint block_size) { - int blocks, hash_links, length; + uint blocks, hash_links, length; int error; - DBUG_ENTER("init_key_cache"); + DBUG_ENTER("init_pagecache"); DBUG_ASSERT(block_size >= 512); PAGECACHE_DEBUG_OPEN; @@ -662,8 +668,6 @@ int init_pagecache(PAGECACHE *pagecache, my_size_t use_mem, DBUG_RETURN(0); } - pagecache->loghandler= loghandler; - pagecache->global_cache_w_requests= pagecache->global_cache_r_requests= 0; pagecache->global_cache_read= pagecache->global_cache_write= 0; pagecache->disk_blocks= -1; @@ -692,8 +696,8 @@ int init_pagecache(PAGECACHE *pagecache, my_size_t use_mem, for ( ; ; ) { /* Set my_hash_entries to the next bigger 2 power */ - if ((pagecache->hash_entries= next_power((uint)blocks)) < - ((uint)blocks) * 5/4) + if ((pagecache->hash_entries= next_power(blocks)) < + (blocks) * 5/4) pagecache->hash_entries<<= 1; hash_links= 2 * blocks; #if defined(MAX_THREADS) @@ -704,7 +708,7 @@ int init_pagecache(PAGECACHE *pagecache, my_size_t use_mem, ALIGN_SIZE(hash_links * sizeof(PAGECACHE_HASH_LINK)) + ALIGN_SIZE(sizeof(PAGECACHE_HASH_LINK*) * pagecache->hash_entries))) + - ((ulong) blocks << pagecache->shift) > use_mem) + (((ulong) blocks) << pagecache->shift) > use_mem) blocks--; /* Allocate memory for cache page buffers */ if ((pagecache->block_mem= @@ -760,10 +764,10 @@ int init_pagecache(PAGECACHE *pagecache, my_size_t use_mem, pagecache->warm_blocks= 0; pagecache->min_warm_blocks= (division_limit ? blocks * division_limit / 100 + 1 : - (ulong)blocks); + blocks); pagecache->age_threshold= (age_threshold ? blocks * age_threshold / 100 : - (ulong)blocks); + blocks); pagecache->cnt_for_resize_op= 0; pagecache->resize_in_flush= 0; @@ -842,7 +846,8 @@ int resize_pagecache(PAGECACHE *pagecache, { int blocks; struct st_my_thread_var *thread; - PAGECACHE_WQUEUE *wqueue; + WQUEUE *wqueue; + DBUG_ENTER("resize_pagecache"); if (!pagecache->inited) @@ -859,7 +864,7 @@ int resize_pagecache(PAGECACHE *pagecache, #ifdef THREAD wqueue= &pagecache->resize_queue; thread= my_thread_var; - link_into_queue(wqueue, thread); + wqueue_link_into_queue(wqueue, thread); while (wqueue->last_thread->next != thread) { @@ -892,12 +897,11 @@ int resize_pagecache(PAGECACHE *pagecache, end_pagecache(pagecache, 0); /* Don't free mutex */ /* The following will work even if use_mem is 0 */ blocks= init_pagecache(pagecache, pagecache->block_size, use_mem, - division_limit, age_threshold, - pagecache->loghandler); + division_limit, age_threshold); finish: #ifdef THREAD - unlink_from_queue(wqueue, thread); + wqueue_unlink_from_queue(wqueue, thread); /* Signal for the next resize request to proceeed if any */ if (wqueue->last_thread) { @@ -1027,146 +1031,6 @@ void end_pagecache(PAGECACHE *pagecache, my_bool cleanup) } /* end_pagecache */ -#ifdef THREAD -/* - Link a thread into double-linked queue of waiting threads. - - SYNOPSIS - link_into_queue() - wqueue pointer to the queue structure - thread pointer to the thread to be added to the queue - - RETURN VALUE - none - - NOTES. - Queue is represented by a circular list of the thread structures - The list is double-linked of the type (**prev,*next), accessed by - a pointer to the last element. -*/ - -static void link_into_queue(PAGECACHE_WQUEUE *wqueue, - struct st_my_thread_var *thread) -{ - struct st_my_thread_var *last; - if (! (last= wqueue->last_thread)) - { - /* Queue is empty */ - thread->next= thread; - thread->prev= &thread->next; - } - else - { - thread->prev= last->next->prev; - last->next->prev= &thread->next; - thread->next= last->next; - last->next= thread; - } - wqueue->last_thread= thread; -} - -/* - Unlink a thread from double-linked queue of waiting threads - - SYNOPSIS - unlink_from_queue() - wqueue pointer to the queue structure - thread pointer to the thread to be removed from the queue - - RETURN VALUE - none - - NOTES. - See NOTES for link_into_queue -*/ - -static void unlink_from_queue(PAGECACHE_WQUEUE *wqueue, - struct st_my_thread_var *thread) -{ - KEYCACHE_DBUG_PRINT("unlink_from_queue", ("thread %ld", thread->id)); - if (thread->next == thread) - /* The queue contains only one member */ - wqueue->last_thread= NULL; - else - { - thread->next->prev= thread->prev; - *thread->prev=thread->next; - if (wqueue->last_thread == thread) - wqueue->last_thread= STRUCT_PTR(struct st_my_thread_var, next, - thread->prev); - } - thread->next= NULL; -} - - -/* - Add a thread to single-linked queue of waiting threads - - SYNOPSIS - add_to_queue() - wqueue pointer to the queue structure - thread pointer to the thread to be added to the queue - - RETURN VALUE - none - - NOTES. - Queue is represented by a circular list of the thread structures - The list is single-linked of the type (*next), accessed by a pointer - to the last element. -*/ - -static inline void add_to_queue(PAGECACHE_WQUEUE *wqueue, - struct st_my_thread_var *thread) -{ - struct st_my_thread_var *last; - if (! (last= wqueue->last_thread)) - thread->next= thread; - else - { - thread->next= last->next; - last->next= thread; - } - wqueue->last_thread= thread; -} - - -/* - Remove all threads from queue signaling them to proceed - - SYNOPSIS - realease_queue() - wqueue pointer to the queue structure - thread pointer to the thread to be added to the queue - - RETURN VALUE - none - - NOTES. - See notes for add_to_queue - When removed from the queue each thread is signaled via condition - variable thread->suspend. -*/ - -static void release_queue(PAGECACHE_WQUEUE *wqueue) -{ - struct st_my_thread_var *last= wqueue->last_thread; - struct st_my_thread_var *next= last->next; - struct st_my_thread_var *thread; - do - { - thread=next; - KEYCACHE_DBUG_PRINT("release_queue: signal", ("thread %ld", thread->id)); - pagecache_pthread_cond_signal(&thread->suspend); - next=thread->next; - thread->next= NULL; - } - while (thread != last); - wqueue->last_thread= NULL; -} -#endif - - /* Unlink a block from the chain of dirty/clean blocks */ @@ -1273,6 +1137,7 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block, PAGECACHE_BLOCK_LINK *ins; PAGECACHE_BLOCK_LINK **ptr_ins; + BLOCK_INFO(block); KEYCACHE_DBUG_ASSERT(! (block->hash_link && block->hash_link->requests)); #ifdef THREAD if (!hot && pagecache->waiting_for_block.last_thread) @@ -1297,7 +1162,7 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block, { KEYCACHE_DBUG_PRINT("link_block: signal", ("thread %ld", thread->id)); pagecache_pthread_cond_signal(&thread->suspend); - unlink_from_queue(&pagecache->waiting_for_block, thread); + wqueue_unlink_from_queue(&pagecache->waiting_for_block, thread); block->requests++; } } @@ -1363,6 +1228,8 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block, static void unlink_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block) { + DBUG_ENTER("unlink_block"); + DBUG_PRINT("unlink_block", ("unlink 0x%lx", (ulong)block)); if (block->next_used == block) /* The list contains only one member */ pagecache->used_last= pagecache->used_ins= NULL; @@ -1381,14 +1248,15 @@ static void unlink_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block) KEYCACHE_THREAD_TRACE("unlink_block"); #if defined(PAGECACHE_DEBUG) + KEYCACHE_DBUG_ASSERT(pagecache->blocks_available != 0); pagecache->blocks_available--; KEYCACHE_DBUG_PRINT("unlink_block", ("unlinked block 0x%lx (%u) status=%x #requests=%u #available=%u", (ulong)block, BLOCK_NUMBER(pagecache, block), block->status, block->requests, pagecache->blocks_available)); BLOCK_INFO(block); - KEYCACHE_DBUG_ASSERT(pagecache->blocks_available >= 0); #endif + DBUG_VOID_RETURN; } @@ -1591,7 +1459,7 @@ static void unlink_hash(PAGECACHE *pagecache, PAGECACHE_HASH_LINK *hash_link) { KEYCACHE_DBUG_PRINT("unlink_hash: signal", ("thread %ld", thread->id)); pagecache_pthread_cond_signal(&thread->suspend); - unlink_from_queue(&pagecache->waiting_for_hash_link, thread); + wqueue_unlink_from_queue(&pagecache->waiting_for_hash_link, thread); } } while (thread != last_thread); @@ -1618,7 +1486,7 @@ static void unlink_hash(PAGECACHE *pagecache, PAGECACHE_HASH_LINK *hash_link) pagecache Pagecache reference file file ID pageno page number in the file - start where to put pointer to found hash link (for + start where to put pointer to found hash bucket (for direct referring it) RETURN @@ -1627,7 +1495,7 @@ static void unlink_hash(PAGECACHE *pagecache, PAGECACHE_HASH_LINK *hash_link) static PAGECACHE_HASH_LINK *get_present_hash_link(PAGECACHE *pagecache, PAGECACHE_FILE *file, - maria_page_no_t pageno, + pgcache_page_no_t pageno, PAGECACHE_HASH_LINK ***start) { reg1 PAGECACHE_HASH_LINK *hash_link; @@ -1670,6 +1538,12 @@ static PAGECACHE_HASH_LINK *get_present_hash_link(PAGECACHE *pagecache, KEYCACHE_DBUG_ASSERT(cnt <= pagecache->hash_links_used); #endif } + if (hash_link) + { + /* Register the request for the page */ + hash_link->requests++; + } + DBUG_RETURN(hash_link); } @@ -1680,7 +1554,7 @@ static PAGECACHE_HASH_LINK *get_present_hash_link(PAGECACHE *pagecache, static PAGECACHE_HASH_LINK *get_hash_link(PAGECACHE *pagecache, PAGECACHE_FILE *file, - maria_page_no_t pageno) + pgcache_page_no_t pageno) { reg1 PAGECACHE_HASH_LINK *hash_link; PAGECACHE_HASH_LINK **start; @@ -1693,7 +1567,7 @@ restart: /* try to find the page in the cache */ hash_link= get_present_hash_link(pagecache, file, pageno, &start); - if (! hash_link) + if (!hash_link) { /* There is no hash link in the hash table for the pair (file, pageno) */ if (pagecache->free_hash_list) @@ -1714,7 +1588,7 @@ restart: page.file= *file; page.pageno= pageno; thread->opt_info= (void *) &page; - link_into_queue(&pagecache->waiting_for_hash_link, thread); + wqueue_link_into_queue(&pagecache->waiting_for_hash_link, thread); KEYCACHE_DBUG_PRINT("get_hash_link: wait", ("suspend thread %ld", thread->id)); pagecache_pthread_cond_wait(&thread->suspend, @@ -1723,14 +1597,15 @@ restart: #else KEYCACHE_DBUG_ASSERT(0); #endif + DBUG_PRINT("info", ("restarting...")); goto restart; } hash_link->file= *file; hash_link->pageno= pageno; link_hash(start, hash_link); + /* Register the request for the page */ + hash_link->requests++; } - /* Register the request for the page */ - hash_link->requests++; return hash_link; } @@ -1743,7 +1618,7 @@ restart: SYNOPSIS - find_key_block() + find_block() pagecache pointer to a page cache data structure file handler for the file to read page from pageno number of the page in the file @@ -1773,29 +1648,29 @@ restart: waits until first of this operations links any block back. */ -static PAGECACHE_BLOCK_LINK *find_key_block(PAGECACHE *pagecache, - PAGECACHE_FILE *file, - maria_page_no_t pageno, - int init_hits_left, - my_bool wrmode, - my_bool reg_req, - int *page_st) +static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache, + PAGECACHE_FILE *file, + pgcache_page_no_t pageno, + int init_hits_left, + my_bool wrmode, + my_bool reg_req, + int *page_st) { PAGECACHE_HASH_LINK *hash_link; PAGECACHE_BLOCK_LINK *block; int error= 0; int page_status; - DBUG_ENTER("find_key_block"); - KEYCACHE_THREAD_TRACE("find_key_block:begin"); + DBUG_ENTER("find_block"); + KEYCACHE_THREAD_TRACE("find_block:begin"); DBUG_PRINT("enter", ("fd: %u pos %lu wrmode: %lu", (uint) file->file, (ulong) pageno, (uint) wrmode)); - KEYCACHE_DBUG_PRINT("find_key_block", ("fd: %u pos: %lu wrmode: %lu", - (uint) file->file, (ulong) pageno, - (uint) wrmode)); + KEYCACHE_DBUG_PRINT("find_block", ("fd: %u pos: %lu wrmode: %lu", + (uint) file->file, (ulong) pageno, + (uint) wrmode)); #if !defined(DBUG_OFF) && defined(EXTRA_DEBUG) DBUG_EXECUTE("check_pagecache", - test_key_cache(pagecache, "start of find_key_block", 0);); + test_key_cache(pagecache, "start of find_block", 0);); #endif restart: @@ -1840,10 +1715,10 @@ restart: { #ifdef THREAD struct st_my_thread_var *thread= my_thread_var; - add_to_queue(&block->wqueue[COND_FOR_SAVED], thread); + wqueue_add_to_queue(&block->wqueue[COND_FOR_SAVED], thread); do { - KEYCACHE_DBUG_PRINT("find_key_block: wait", + KEYCACHE_DBUG_PRINT("find_block: wait", ("suspend thread %ld", thread->id)); pagecache_pthread_cond_wait(&thread->suspend, &pagecache->cache_lock); @@ -1871,7 +1746,7 @@ restart: { /* This is a request for a page to be removed from cache */ - KEYCACHE_DBUG_PRINT("find_key_block", + KEYCACHE_DBUG_PRINT("find_block", ("request for old page in block %u " "wrmode: %d block->status: %d", BLOCK_NUMBER(pagecache, block), wrmode, @@ -1888,17 +1763,17 @@ restart: else { hash_link->requests--; - KEYCACHE_DBUG_PRINT("find_key_block", + KEYCACHE_DBUG_PRINT("find_block", ("request waiting for old page to be saved")); { #ifdef THREAD struct st_my_thread_var *thread= my_thread_var; /* Put the request into the queue of those waiting for the old page */ - add_to_queue(&block->wqueue[COND_FOR_SAVED], thread); + wqueue_add_to_queue(&block->wqueue[COND_FOR_SAVED], thread); /* Wait until the request can be resubmitted */ do { - KEYCACHE_DBUG_PRINT("find_key_block: wait", + KEYCACHE_DBUG_PRINT("find_block: wait", ("suspend thread %ld", thread->id)); pagecache_pthread_cond_wait(&thread->suspend, &pagecache->cache_lock); @@ -1909,11 +1784,13 @@ restart: /* No parallel requests in single-threaded case */ #endif } - KEYCACHE_DBUG_PRINT("find_key_block", + KEYCACHE_DBUG_PRINT("find_block", ("request for old page resubmitted")); + DBUG_PRINT("info", ("restarting...")); /* Resubmit the request */ goto restart; } + block->status&= ~BLOCK_IN_SWITCH; } else { @@ -1941,7 +1818,8 @@ restart: pagecache->blocks_used++; } pagecache->blocks_unused--; - DBUG_ASSERT((block->status & BLOCK_WRLOCK) == 0); + DBUG_ASSERT((block->status & BLOCK_WRLOCK)); + DBUG_ASSERT(block->pins > 0); block->status= 0; #ifndef DBUG_OFF block->type= PAGECACHE_EMPTY_PAGE; @@ -1954,7 +1832,9 @@ restart: block->hash_link= hash_link; hash_link->block= block; page_status= PAGE_TO_BE_READ; - KEYCACHE_DBUG_PRINT("find_key_block", + DBUG_PRINT("info", ("page to be read set for page 0x%lx", + (ulong)block)); + KEYCACHE_DBUG_PRINT("find_block", ("got free or never used block %u", BLOCK_NUMBER(pagecache, block))); } @@ -1973,10 +1853,10 @@ restart: { struct st_my_thread_var *thread= my_thread_var; thread->opt_info= (void *) hash_link; - link_into_queue(&pagecache->waiting_for_block, thread); + wqueue_link_into_queue(&pagecache->waiting_for_block, thread); do { - KEYCACHE_DBUG_PRINT("find_key_block: wait", + KEYCACHE_DBUG_PRINT("find_block: wait", ("suspend thread %ld", thread->id)); pagecache_pthread_cond_wait(&thread->suspend, &pagecache->cache_lock); @@ -2001,19 +1881,18 @@ restart: reg_requests(pagecache, block,1); hash_link->block= block; } - else - { - DBUG_ASSERT((block->status & BLOCK_WRLOCK) == 0); - } + DBUG_ASSERT((block->status & BLOCK_WRLOCK) == 0); + DBUG_ASSERT(block->pins > 0); if (block->hash_link != hash_link && ! (block->status & BLOCK_IN_SWITCH) ) { /* this is a primary request for a new page */ DBUG_ASSERT((block->status & BLOCK_WRLOCK) == 0); + DBUG_ASSERT(block->pins > 0); block->status|= (BLOCK_IN_SWITCH | BLOCK_WRLOCK); - KEYCACHE_DBUG_PRINT("find_key_block", + KEYCACHE_DBUG_PRINT("find_block", ("got block %u for new page", BLOCK_NUMBER(pagecache, block))); @@ -2021,7 +1900,7 @@ restart: { /* The block contains a dirty page - push it out of the cache */ - KEYCACHE_DBUG_PRINT("find_key_block", ("block is dirty")); + KEYCACHE_DBUG_PRINT("find_block", ("block is dirty")); pagecache_pthread_mutex_unlock(&pagecache->cache_lock); /* @@ -2054,7 +1933,7 @@ restart: unlink_hash(pagecache, block->hash_link); /* All pending requests for this page must be resubmitted */ if (block->wqueue[COND_FOR_SAVED].last_thread) - release_queue(&block->wqueue[COND_FOR_SAVED]); + wqueue_release_queue(&block->wqueue[COND_FOR_SAVED]); } link_to_file_list(pagecache, block, file, (my_bool)(block->hash_link ? 1 : 0)); @@ -2065,6 +1944,8 @@ restart: #endif block->hash_link= hash_link; page_status= PAGE_TO_BE_READ; + DBUG_PRINT("info", ("page to be read set for page 0x%lx", + (ulong)block)); KEYCACHE_DBUG_ASSERT(block->hash_link->block == block); KEYCACHE_DBUG_ASSERT(hash_link->block->hash_link == hash_link); @@ -2072,7 +1953,7 @@ restart: else { /* This is for secondary requests for a new page only */ - KEYCACHE_DBUG_PRINT("find_key_block", + KEYCACHE_DBUG_PRINT("find_block", ("block->hash_link: %p hash_link: %p " "block->status: %u", block->hash_link, hash_link, block->status )); @@ -2087,7 +1968,7 @@ restart: { if (reg_req) reg_requests(pagecache, block, 1); - KEYCACHE_DBUG_PRINT("find_key_block", + KEYCACHE_DBUG_PRINT("find_block", ("block->hash_link: %p hash_link: %p " "block->status: %u", block->hash_link, hash_link, block->status )); @@ -2098,12 +1979,12 @@ restart: } KEYCACHE_DBUG_ASSERT(page_status != -1); - *page_st=page_status; + *page_st= page_status; DBUG_PRINT("info", ("block: 0x%lx fd: %u pos %lu block->status %u page_status %lu", (ulong) block, (uint) file->file, (ulong) pageno, block->status, (uint) page_status)); - KEYCACHE_DBUG_PRINT("find_key_block", + KEYCACHE_DBUG_PRINT("find_block", ("block: 0x%lx fd: %u pos %lu block->status %u page_status %lu", (ulong) block, (uint) file->file, (ulong) pageno, block->status, @@ -2111,16 +1992,16 @@ restart: #if !defined(DBUG_OFF) && defined(EXTRA_DEBUG) DBUG_EXECUTE("check_pagecache", - test_key_cache(pagecache, "end of find_key_block",0);); + test_key_cache(pagecache, "end of find_block",0);); #endif - KEYCACHE_THREAD_TRACE("find_key_block:end"); + KEYCACHE_THREAD_TRACE("find_block:end"); DBUG_RETURN(block); } -void pagecache_add_pin(PAGECACHE_BLOCK_LINK *block) +static void add_pin(PAGECACHE_BLOCK_LINK *block) { - DBUG_ENTER("pagecache_add_pin"); + DBUG_ENTER("add_pin"); DBUG_PRINT("enter", ("block 0x%lx pins: %u", (ulong) block, block->pins)); @@ -2137,9 +2018,9 @@ void pagecache_add_pin(PAGECACHE_BLOCK_LINK *block) DBUG_VOID_RETURN; } -void pagecache_remove_pin(PAGECACHE_BLOCK_LINK *block) +static void remove_pin(PAGECACHE_BLOCK_LINK *block) { - DBUG_ENTER("pagecache_remove_pin"); + DBUG_ENTER("remove_pin"); DBUG_PRINT("enter", ("block 0x%lx pins: %u", (ulong) block, block->pins)); @@ -2157,7 +2038,7 @@ void pagecache_remove_pin(PAGECACHE_BLOCK_LINK *block) DBUG_VOID_RETURN; } #ifdef PAGECACHE_DEBUG -void pagecache_add_lock(PAGECACHE_BLOCK_LINK *block, my_bool wl) +static void info_add_lock(PAGECACHE_BLOCK_LINK *block, my_bool wl) { PAGECACHE_LOCK_INFO *info= (PAGECACHE_LOCK_INFO *)my_malloc(sizeof(PAGECACHE_LOCK_INFO), MYF(0)); @@ -2166,7 +2047,7 @@ void pagecache_add_lock(PAGECACHE_BLOCK_LINK *block, my_bool wl) info_link((PAGECACHE_PIN_INFO **)&block->lock_list, (PAGECACHE_PIN_INFO *)info); } -void pagecache_remove_lock(PAGECACHE_BLOCK_LINK *block) +static void info_remove_lock(PAGECACHE_BLOCK_LINK *block) { PAGECACHE_LOCK_INFO *info= (PAGECACHE_LOCK_INFO *)info_find((PAGECACHE_PIN_INFO *)block->lock_list, @@ -2175,7 +2056,7 @@ void pagecache_remove_lock(PAGECACHE_BLOCK_LINK *block) info_unlink((PAGECACHE_PIN_INFO *)info); my_free((gptr)info, MYF(0)); } -void pagecache_change_lock(PAGECACHE_BLOCK_LINK *block, my_bool wl) +static void info_change_lock(PAGECACHE_BLOCK_LINK *block, my_bool wl) { PAGECACHE_LOCK_INFO *info= (PAGECACHE_LOCK_INFO *)info_find((PAGECACHE_PIN_INFO *)block->lock_list, @@ -2184,40 +2065,47 @@ void pagecache_change_lock(PAGECACHE_BLOCK_LINK *block, my_bool wl) info->write_lock= wl; } #else -#define pagecache_add_lock(B,W) -#define pagecache_remove_lock(B) -#define pagecache_change_lock(B,W) +#define info_add_lock(B,W) +#define info_remove_lock(B) +#define info_change_lock(B,W) #endif /* - Put on the block "update" type lock + Put on the block write lock SYNOPSIS - pagecache_lock_block() + get_wrlock() pagecache pointer to a page cache data structure block the block to work with RETURN 0 - OK - 1 - Try to lock the block failed + 1 - Can't lock this block, need retry */ -my_bool pagecache_lock_block(PAGECACHE *pagecache, - PAGECACHE_BLOCK_LINK *block) -{ - DBUG_ENTER("pagecache_lock_block"); +static my_bool get_wrlock(PAGECACHE *pagecache, + PAGECACHE_BLOCK_LINK *block) +{ + PAGECACHE_FILE file= block->hash_link->file; + pgcache_page_no_t pageno= block->hash_link->pageno; + DBUG_ENTER("get_wrlock"); + DBUG_PRINT("info", ("the block 0x%lx " + "files %d(%d) pages %d(%d)", + (ulong)block, + file.file, block->hash_link->file.file, + pageno, block->hash_link->pageno)); BLOCK_INFO(block); while (block->status & BLOCK_WRLOCK) { - DBUG_PRINT("info", ("fail to lock, waiting...")); + DBUG_PRINT("info", ("fail to lock, waiting... 0x%lx", (ulong)block)); /* Lock failed we will wait */ #ifdef THREAD struct st_my_thread_var *thread= my_thread_var; - add_to_queue(&block->wqueue[COND_FOR_WRLOCK], thread); + wqueue_add_to_queue(&block->wqueue[COND_FOR_WRLOCK], thread); dec_counter_for_resize_op(pagecache); do { - KEYCACHE_DBUG_PRINT("pagecache_lock_block: wait", + KEYCACHE_DBUG_PRINT("get_wrlock: wait", ("suspend thread %ld", thread->id)); pagecache_pthread_cond_wait(&thread->suspend, &pagecache->cache_lock); @@ -2227,35 +2115,61 @@ my_bool pagecache_lock_block(PAGECACHE *pagecache, DBUG_ASSERT(0); #endif BLOCK_INFO(block); - DBUG_RETURN(1); + if ((block->status & (BLOCK_REASSIGNED | BLOCK_IN_SWITCH)) || + file.file != block->hash_link->file.file || + pageno != block->hash_link->pageno) + { + DBUG_PRINT("info", ("the block 0x%lx changed => need retry" + "status %x files %d != %d or pages %d !=%d", + (ulong)block, block->status, + file.file, block->hash_link->file.file, + pageno, block->hash_link->pageno)); + DBUG_RETURN(1); + } } - /* we are doing it by global cache mutex protectio, so it is OK */ + DBUG_ASSERT(block->pins == 0); + /* we are doing it by global cache mutex protection, so it is OK */ block->status|= BLOCK_WRLOCK; DBUG_PRINT("info", ("WR lock set, block 0x%lx", (ulong)block)); DBUG_RETURN(0); } -void pagecache_unlock_block(PAGECACHE_BLOCK_LINK *block) + +/* + Remove write lock from the block + + SYNOPSIS + release_wrlock() + pagecache pointer to a page cache data structure + block the block to work with + + RETURN + 0 - OK +*/ + +static void release_wrlock(PAGECACHE_BLOCK_LINK *block) { - DBUG_ENTER("pagecache_unlock_block"); + DBUG_ENTER("release_wrlock"); BLOCK_INFO(block); DBUG_ASSERT(block->status & BLOCK_WRLOCK); + DBUG_ASSERT(block->pins > 0); block->status&= ~BLOCK_WRLOCK; DBUG_PRINT("info", ("WR lock reset, block 0x%lx", (ulong)block)); #ifdef THREAD /* release all threads waiting for write lock */ if (block->wqueue[COND_FOR_WRLOCK].last_thread) - release_queue(&block->wqueue[COND_FOR_WRLOCK]); + wqueue_release_queue(&block->wqueue[COND_FOR_WRLOCK]); #endif BLOCK_INFO(block); DBUG_VOID_RETURN; } + /* - Try to lock/uplock and pin/unpin the block + Try to lock/unlock and pin/unpin the block SYNOPSIS - pagecache_make_lock_and_pin() + make_lock_and_pin() pagecache pointer to a page cache data structure block the block to work with lock lock change mode @@ -2266,12 +2180,12 @@ void pagecache_unlock_block(PAGECACHE_BLOCK_LINK *block) 1 - Try to lock the block failed */ -my_bool pagecache_make_lock_and_pin(PAGECACHE *pagecache, - PAGECACHE_BLOCK_LINK *block, - enum pagecache_page_lock lock, - enum pagecache_page_pin pin) +static my_bool make_lock_and_pin(PAGECACHE *pagecache, + PAGECACHE_BLOCK_LINK *block, + enum pagecache_page_lock lock, + enum pagecache_page_pin pin) { - DBUG_ENTER("pagecache_make_lock_and_pin"); + DBUG_ENTER("make_lock_and_pin"); DBUG_PRINT("enter", ("block: 0x%lx (%u), wrlock: %c pins: %u, lock %s, pin: %s", (ulong)block, BLOCK_NUMBER(pagecache, block), ((block->status & BLOCK_WRLOCK)?'Y':'N'), @@ -2287,53 +2201,47 @@ my_bool pagecache_make_lock_and_pin(PAGECACHE *pagecache, { case PAGECACHE_LOCK_WRITE: /* free -> write */ /* Writelock and pin the buffer */ - if (pagecache_lock_block(pagecache, block)) + if (get_wrlock(pagecache, block)) { - DBUG_PRINT("info", ("restart")); - /* in case of fail pagecache_lock_block unlock cache */ - DBUG_RETURN(1); + /* can't lock => need retry */ + goto retry; } - /* The cache is locked so nothing afraid off */ - pagecache_add_pin(block); - pagecache_add_lock(block, 1); + + /* The cache is locked so nothing afraid of */ + add_pin(block); + info_add_lock(block, 1); break; case PAGECACHE_LOCK_WRITE_TO_READ: /* write -> read */ case PAGECACHE_LOCK_WRITE_UNLOCK: /* write -> free */ /* - Removes writelog and puts read lock (which is nothing in our + Removes write lock and puts read lock (which is nothing in our implementation) */ - pagecache_unlock_block(block); + release_wrlock(block); case PAGECACHE_LOCK_READ_UNLOCK: /* read -> free */ case PAGECACHE_LOCK_LEFT_READLOCKED: /* read -> read */ -#ifndef DBUG_OFF if (pin == PAGECACHE_UNPIN) { - pagecache_remove_pin(block); + remove_pin(block); } -#endif -#ifdef PAGECACHE_DEBUG if (lock == PAGECACHE_LOCK_WRITE_TO_READ) { - pagecache_change_lock(block, 0); + info_change_lock(block, 0); } else if (lock == PAGECACHE_LOCK_WRITE_UNLOCK || lock == PAGECACHE_LOCK_READ_UNLOCK) { - pagecache_remove_lock(block); + info_remove_lock(block); } -#endif break; case PAGECACHE_LOCK_READ: /* free -> read */ -#ifndef DBUG_OFF if (pin == PAGECACHE_PIN) { /* The cache is locked so nothing afraid off */ - pagecache_add_pin(block); + add_pin(block); } - pagecache_add_lock(block, 0); + info_add_lock(block, 0); break; -#endif case PAGECACHE_LOCK_LEFT_UNLOCKED: /* free -> free */ case PAGECACHE_LOCK_LEFT_WRITELOCKED: /* write -> write */ break; /* do nothing */ @@ -2343,6 +2251,16 @@ my_bool pagecache_make_lock_and_pin(PAGECACHE *pagecache, BLOCK_INFO(block); DBUG_RETURN(0); +retry: + DBUG_PRINT("INFO", ("Retry block 0x%lx", (ulong)block)); + BLOCK_INFO(block); + DBUG_ASSERT(block->hash_link->requests != 0); + block->hash_link->requests--; + DBUG_ASSERT(block->requests != 0); + unreg_request(pagecache, block, 1); + BLOCK_INFO(block); + DBUG_RETURN(1); + } @@ -2355,6 +2273,8 @@ my_bool pagecache_make_lock_and_pin(PAGECACHE *pagecache, pagecache pointer to a page cache data structure block block to which buffer the data is to be read primary <-> the current thread will read the data + validator validator of read from the disk data + validator_data pointer to the data need by the validator RETURN VALUE None @@ -2368,13 +2288,15 @@ my_bool pagecache_make_lock_and_pin(PAGECACHE *pagecache, static void read_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block, - my_bool primary) + my_bool primary, + pagecache_disk_read_validator validator, + gptr validator_data) { uint got_length; /* On entry cache_lock is locked */ - KEYCACHE_THREAD_TRACE("read_block"); + DBUG_ENTER("read_block"); if (primary) { /* @@ -2382,8 +2304,8 @@ static void read_block(PAGECACHE *pagecache, that submitted primary requests */ - KEYCACHE_DBUG_PRINT("read_block", - ("page to be read by primary request")); + DBUG_PRINT("read_block", + ("page to be read by primary request")); /* Page is not in buffer yet, is to be read from disk */ pagecache_pthread_mutex_unlock(&pagecache->cache_lock); @@ -2400,11 +2322,15 @@ static void read_block(PAGECACHE *pagecache, else block->status= (BLOCK_READ | (block->status & BLOCK_WRLOCK)); - KEYCACHE_DBUG_PRINT("read_block", - ("primary request: new page in cache")); + if (validator != NULL && + (*validator)(block->buffer, validator_data)) + block->status|= BLOCK_ERROR; + + DBUG_PRINT("read_block", + ("primary request: new page in cache")); /* Signal that all pending requests for this page now can be processed */ if (block->wqueue[COND_FOR_REQUESTED].last_thread) - release_queue(&block->wqueue[COND_FOR_REQUESTED]); + wqueue_release_queue(&block->wqueue[COND_FOR_REQUESTED]); } else { @@ -2412,17 +2338,17 @@ static void read_block(PAGECACHE *pagecache, This code is executed only by threads that submitted secondary requests */ - KEYCACHE_DBUG_PRINT("read_block", - ("secondary request waiting for new page to be read")); + DBUG_PRINT("read_block", + ("secondary request waiting for new page to be read")); { #ifdef THREAD struct st_my_thread_var *thread= my_thread_var; /* Put the request into a queue and wait until it can be processed */ - add_to_queue(&block->wqueue[COND_FOR_REQUESTED], thread); + wqueue_add_to_queue(&block->wqueue[COND_FOR_REQUESTED], thread); do { - KEYCACHE_DBUG_PRINT("read_block: wait", - ("suspend thread %ld", thread->id)); + DBUG_PRINT("read_block: wait", + ("suspend thread %ld", thread->id)); pagecache_pthread_cond_wait(&thread->suspend, &pagecache->cache_lock); } @@ -2432,9 +2358,10 @@ static void read_block(PAGECACHE *pagecache, /* No parallel requests in single-threaded case */ #endif } - KEYCACHE_DBUG_PRINT("read_block", - ("secondary request: new page in cache")); + DBUG_PRINT("read_block", + ("secondary request: new page in cache")); } + DBUG_VOID_RETURN; } @@ -2454,11 +2381,11 @@ static void read_block(PAGECACHE *pagecache, void pagecache_unlock_page(PAGECACHE *pagecache, PAGECACHE_FILE *file, - maria_page_no_t pageno, + pgcache_page_no_t pageno, enum pagecache_page_lock lock, enum pagecache_page_pin pin, my_bool stamp_this_page, - LSN first_REDO_LSN_for_page) + LSN_PTR first_REDO_LSN_for_page) { PAGECACHE_BLOCK_LINK *block; int page_st; @@ -2471,24 +2398,6 @@ void pagecache_unlock_page(PAGECACHE *pagecache, DBUG_ASSERT(pin != PAGECACHE_PIN && lock != PAGECACHE_LOCK_READ && lock != PAGECACHE_LOCK_WRITE); - if (pin == PAGECACHE_PIN_LEFT_UNPINNED && - lock == PAGECACHE_LOCK_READ_UNLOCK) - { -#ifndef DBUG_OFF - if ( -#endif - /* block do not need here so we do not provide it */ - pagecache_make_lock_and_pin(pagecache, 0, lock, pin) -#ifndef DBUG_OFF - ) - { - DBUG_ASSERT(0); /* should not happend */ - } -#else - ; -#endif - DBUG_VOID_RETURN; - } pagecache_pthread_mutex_lock(&pagecache->cache_lock); /* @@ -2498,7 +2407,7 @@ void pagecache_unlock_page(PAGECACHE *pagecache, DBUG_ASSERT(pagecache->can_be_used); inc_counter_for_resize_op(pagecache); - block= find_key_block(pagecache, file, pageno, 0, 0, 0, &page_st); + block= find_block(pagecache, file, pageno, 0, 0, 0, &page_st); BLOCK_INFO(block); DBUG_ASSERT(block != 0 && page_st == PAGE_READ); if (stamp_this_page) @@ -2511,7 +2420,7 @@ void pagecache_unlock_page(PAGECACHE *pagecache, #ifndef DBUG_OFF if ( #endif - pagecache_make_lock_and_pin(pagecache, block, lock, pin) + make_lock_and_pin(pagecache, block, lock, pin) #ifndef DBUG_OFF ) { @@ -2549,7 +2458,7 @@ void pagecache_unlock_page(PAGECACHE *pagecache, void pagecache_unpin_page(PAGECACHE *pagecache, PAGECACHE_FILE *file, - maria_page_no_t pageno) + pgcache_page_no_t pageno) { PAGECACHE_BLOCK_LINK *block; int page_st; @@ -2565,7 +2474,7 @@ void pagecache_unpin_page(PAGECACHE *pagecache, DBUG_ASSERT(pagecache->can_be_used); inc_counter_for_resize_op(pagecache); - block= find_key_block(pagecache, file, pageno, 0, 0, 0, &page_st); + block= find_block(pagecache, file, pageno, 0, 0, 0, &page_st); DBUG_ASSERT(block != 0 && page_st == PAGE_READ); #ifndef DBUG_OFF @@ -2576,9 +2485,9 @@ void pagecache_unpin_page(PAGECACHE *pagecache, a) we can't pin without any lock b) we can't unpin keeping write lock */ - pagecache_make_lock_and_pin(pagecache, block, - PAGECACHE_LOCK_LEFT_READLOCKED, - PAGECACHE_UNPIN) + make_lock_and_pin(pagecache, block, + PAGECACHE_LOCK_LEFT_READLOCKED, + PAGECACHE_UNPIN) #ifndef DBUG_OFF ) { @@ -2622,7 +2531,7 @@ void pagecache_unlock(PAGECACHE *pagecache, enum pagecache_page_lock lock, enum pagecache_page_pin pin, my_bool stamp_this_page, - LSN first_REDO_LSN_for_page) + LSN_PTR first_REDO_LSN_for_page) { PAGECACHE_BLOCK_LINK *block= (PAGECACHE_BLOCK_LINK *)link; DBUG_ENTER("pagecache_unlock"); @@ -2643,7 +2552,7 @@ void pagecache_unlock(PAGECACHE *pagecache, if ( #endif /* block do not need here so we do not provide it */ - pagecache_make_lock_and_pin(pagecache, 0, lock, pin) + make_lock_and_pin(pagecache, 0, lock, pin) #ifndef DBUG_OFF ) { @@ -2673,7 +2582,7 @@ void pagecache_unlock(PAGECACHE *pagecache, #ifndef DBUG_OFF if ( #endif - pagecache_make_lock_and_pin(pagecache, block, lock, pin) + make_lock_and_pin(pagecache, block, lock, pin) #ifndef DBUG_OFF ) { @@ -2736,9 +2645,9 @@ void pagecache_unpin(PAGECACHE *pagecache, a) we can't pin without any lock b) we can't unpin keeping write lock */ - pagecache_make_lock_and_pin(pagecache, block, - PAGECACHE_LOCK_LEFT_READLOCKED, - PAGECACHE_UNPIN) + make_lock_and_pin(pagecache, block, + PAGECACHE_LOCK_LEFT_READLOCKED, + PAGECACHE_UNPIN) #ifndef DBUG_OFF ) { @@ -2767,7 +2676,7 @@ void pagecache_unpin(PAGECACHE *pagecache, Read a block of data from a cached file into a buffer; SYNOPSIS - pagecache_read() + pagecache_valid_read() pagecache pointer to a page cache data structure file handler for the file for the block of data to be read pageno number of the block of data in the file @@ -2776,16 +2685,12 @@ void pagecache_unpin(PAGECACHE *pagecache, type type of the page lock lock change link link to the page if we pin it + validator validator of read from the disk data + validator_data pointer to the data need by the validator RETURN VALUE Returns address from where the data is placed if sucessful, 0 - otherwise. - NOTES. - - The function ensures that a block of data of size length from file - positioned at pageno is in the buffers for some key cache blocks. - Then the function copies the data into the buffer buff. - Pin will be choosen according to lock parameter (see lock_to_pin) */ static enum pagecache_page_pin lock_to_pin[]= @@ -2800,19 +2705,21 @@ static enum pagecache_page_pin lock_to_pin[]= PAGECACHE_UNPIN /*PAGECACHE_LOCK_WRITE_TO_READ*/ }; -byte *pagecache_read(PAGECACHE *pagecache, - PAGECACHE_FILE *file, - maria_page_no_t pageno, - uint level, - byte *buff, - enum pagecache_page_type type, - enum pagecache_page_lock lock, - PAGECACHE_PAGE_LINK *link) +byte *pagecache_valid_read(PAGECACHE *pagecache, + PAGECACHE_FILE *file, + pgcache_page_no_t pageno, + uint level, + byte *buff, + enum pagecache_page_type type, + enum pagecache_page_lock lock, + PAGECACHE_PAGE_LINK *link, + pagecache_disk_read_validator validator, + gptr validator_data) { int error= 0; enum pagecache_page_pin pin= lock_to_pin[lock]; PAGECACHE_PAGE_LINK fake_link; - DBUG_ENTER("page_cache_read"); + DBUG_ENTER("pagecache_valid_read"); DBUG_PRINT("enter", ("fd: %u page: %lu level: %u t:%s l%s p%s", (uint) file->file, (ulong) pageno, level, page_cache_page_type_str[type], @@ -2829,7 +2736,7 @@ restart: if (pagecache->can_be_used) { /* Key cache is used */ - reg1 PAGECACHE_BLOCK_LINK *block; + PAGECACHE_BLOCK_LINK *block; uint status; int page_st; @@ -2842,29 +2749,33 @@ restart: inc_counter_for_resize_op(pagecache); pagecache->global_cache_r_requests++; - block= find_key_block(pagecache, file, pageno, level, - ((lock == PAGECACHE_LOCK_WRITE) ? 1 : 0), - (((pin == PAGECACHE_PIN_LEFT_PINNED) || - (pin == PAGECACHE_UNPIN)) ? 0 : 1), - &page_st); + block= find_block(pagecache, file, pageno, level, + test(lock == PAGECACHE_LOCK_WRITE), + test((pin == PAGECACHE_PIN_LEFT_PINNED) || + (pin == PAGECACHE_UNPIN)), + &page_st); DBUG_ASSERT(block->type == PAGECACHE_EMPTY_PAGE || block->type == type); block->type= type; - if (pagecache_make_lock_and_pin(pagecache, block, lock, pin)) + if (block->status != BLOCK_ERROR && page_st != PAGE_READ) + { + DBUG_PRINT("info", ("read block 0x%lx", (ulong)block)); + /* The requested page is to be read into the block buffer */ + read_block(pagecache, block, + (my_bool)(page_st == PAGE_TO_BE_READ), + validator, validator_data); + DBUG_PRINT("info", ("read is done")); + } + if (make_lock_and_pin(pagecache, block, lock, pin)) { /* - We failed to write lock the block, cache is unlocked, and last write - lock is released, we will try to get the block again. + We failed to write lock the block, cache is unlocked, + we will try to get the block again. */ pagecache_pthread_mutex_unlock(&pagecache->cache_lock); + DBUG_PRINT("info", ("restarting...")); goto restart; } - if (block->status != BLOCK_ERROR && page_st != PAGE_READ) - { - /* The requested page is to be read into the block buffer */ - read_block(pagecache, block, - (my_bool)(page_st == PAGE_TO_BE_READ)); - } if (! ((status= block->status) & BLOCK_ERROR)) { @@ -2933,7 +2844,7 @@ no_key_cache: /* Key cache is not used */ */ my_bool pagecache_delete_page(PAGECACHE *pagecache, PAGECACHE_FILE *file, - maria_page_no_t pageno, + pgcache_page_no_t pageno, enum pagecache_page_lock lock, my_bool flush) { @@ -2969,13 +2880,14 @@ restart: } block= link->block; DBUG_ASSERT(block != 0); - if (pagecache_make_lock_and_pin(pagecache, block, lock, pin)) + if (make_lock_and_pin(pagecache, block, lock, pin)) { /* We failed to writelock the block, cache is unlocked, and last write lock is released, we will try to get the block again. */ pagecache_pthread_mutex_unlock(&pagecache->cache_lock); + DBUG_PRINT("info", ("restarting...")); goto restart; } @@ -2983,7 +2895,7 @@ restart: { /* The block contains a dirty page - push it out of the cache */ - KEYCACHE_DBUG_PRINT("find_key_block", ("block is dirty")); + KEYCACHE_DBUG_PRINT("find_block", ("block is dirty")); pagecache_pthread_mutex_unlock(&pagecache->cache_lock); /* @@ -3015,9 +2927,10 @@ restart: } /* Cache is locked, so we can relese page before freeing it */ - pagecache_make_lock_and_pin(pagecache, block, - PAGECACHE_LOCK_WRITE_UNLOCK, - PAGECACHE_UNPIN); + make_lock_and_pin(pagecache, block, + PAGECACHE_LOCK_WRITE_UNLOCK, + PAGECACHE_UNPIN); + link->requests--; if (pin == PAGECACHE_PIN_LEFT_PINNED) unreg_request(pagecache, block, 1); free_block(pagecache, block); @@ -3053,11 +2966,12 @@ end: 0 if a success, 1 - otherwise. */ +/* description of how to change lock before and after write */ struct write_lock_change { - int need_lock_change; - enum pagecache_page_lock new_lock; - enum pagecache_page_lock unlock_lock; + int need_lock_change; /* need changing of lock at the end of write */ + enum pagecache_page_lock new_lock; /* lock at the beginning */ + enum pagecache_page_lock unlock_lock; /* lock at the end */ }; static struct write_lock_change write_lock_change_table[]= @@ -3084,10 +2998,11 @@ static struct write_lock_change write_lock_change_table[]= PAGECACHE_LOCK_WRITE_TO_READ}/*PAGECACHE_LOCK_WRITE_TO_READ*/ }; +/* description of how to change pin before and after write */ struct write_pin_change { - enum pagecache_page_pin new_pin; - enum pagecache_page_pin unlock_pin; + enum pagecache_page_pin new_pin; /* pin status at the beginning */ + enum pagecache_page_pin unlock_pin; /* pin status at the end */ }; static struct write_pin_change write_pin_change_table[]= @@ -3104,7 +3019,7 @@ static struct write_pin_change write_pin_change_table[]= my_bool pagecache_write(PAGECACHE *pagecache, PAGECACHE_FILE *file, - maria_page_no_t pageno, + pgcache_page_no_t pageno, uint level, byte *buff, enum pagecache_page_type type, @@ -3113,7 +3028,7 @@ my_bool pagecache_write(PAGECACHE *pagecache, enum pagecache_write_mode write_mode, PAGECACHE_PAGE_LINK *link) { - reg1 PAGECACHE_BLOCK_LINK *block; + reg1 PAGECACHE_BLOCK_LINK *block= NULL; PAGECACHE_PAGE_LINK fake_link; int error= 0; int need_lock_change= write_lock_change_table[lock].need_lock_change; @@ -3133,7 +3048,7 @@ my_bool pagecache_write(PAGECACHE *pagecache, if (write_mode == PAGECACHE_WRITE_NOW) { - /* we allow direct write if wwe do not use long term lockings */ + /* we allow direct write if we do not use long term lockings */ DBUG_ASSERT(lock == PAGECACHE_LOCK_LEFT_UNLOCKED); /* Force writing from buff into disk */ pagecache->global_cache_write++; @@ -3167,10 +3082,10 @@ restart: lock != PAGECACHE_LOCK_LEFT_WRITELOCKED && lock != PAGECACHE_LOCK_WRITE_UNLOCK && lock != PAGECACHE_LOCK_WRITE_TO_READ); - block= find_key_block(pagecache, file, pageno, level, - (need_wrlock ? 1 : 0), - (need_wrlock ? 1 : 0), - &page_st); + block= find_block(pagecache, file, pageno, level, + (need_wrlock ? 1 : 0), + (need_wrlock ? 1 : 0), + &page_st); } if (!block) { @@ -3186,24 +3101,25 @@ restart: block->type == type); block->type= type; - if (pagecache_make_lock_and_pin(pagecache, block, - write_lock_change_table[lock].new_lock, - (need_lock_change ? - write_pin_change_table[pin].new_pin : - pin))) + if (make_lock_and_pin(pagecache, block, + write_lock_change_table[lock].new_lock, + (need_lock_change ? + write_pin_change_table[pin].new_pin : + pin))) { /* We failed to writelock the block, cache is unlocked, and last write lock is released, we will try to get the block again. */ pagecache_pthread_mutex_unlock(&pagecache->cache_lock); + DBUG_PRINT("info", ("restarting...")); goto restart; } if (write_mode == PAGECACHE_WRITE_DONE) { - if (block->status != BLOCK_ERROR && page_st != PAGE_READ) + if ((block->status & BLOCK_ERROR) && page_st != PAGE_READ) { /* Copy data from buff */ bmove512(block->buffer, buff, pagecache->block_size); @@ -3212,7 +3128,7 @@ restart: ("primary request: new page in cache")); /* Signal that all pending requests for this now can be processed. */ if (block->wqueue[COND_FOR_REQUESTED].last_thread) - release_queue(&block->wqueue[COND_FOR_REQUESTED]); + wqueue_release_queue(&block->wqueue[COND_FOR_REQUESTED]); } } else @@ -3220,7 +3136,8 @@ restart: if (write_mode == PAGECACHE_WRITE_NOW) { /* buff has been written to disk at start */ - if (block->status & BLOCK_CHANGED) + if ((block->status & BLOCK_CHANGED) && + !(block->status & BLOCK_ERROR)) link_to_file_list(pagecache, block, &block->hash_link->file, 1); } else @@ -3231,8 +3148,8 @@ restart: if (! (block->status & BLOCK_ERROR)) { bmove512(block->buffer, buff, pagecache->block_size); + block->status|= BLOCK_READ; } - block->status|= BLOCK_READ; } @@ -3242,9 +3159,9 @@ restart: int rc= #endif #warning we are doing an unlock here, so need to give the page its rec_lsn! - pagecache_make_lock_and_pin(pagecache, block, - write_lock_change_table[lock].unlock_lock, - write_pin_change_table[pin].unlock_pin); + make_lock_and_pin(pagecache, block, + write_lock_change_table[lock].unlock_lock, + write_pin_change_table[pin].unlock_pin); #ifndef DBUG_OFF DBUG_ASSERT(rc == 0); #endif @@ -3255,10 +3172,7 @@ restart: block->hash_link->requests--; if (pin != PAGECACHE_PIN_LEFT_PINNED && pin != PAGECACHE_PIN) { - if (write_mode != PAGECACHE_WRITE_DONE) - { - unreg_request(pagecache, block, 1); - } + unreg_request(pagecache, block, 1); } else *link= (PAGECACHE_PAGE_LINK)block; @@ -3290,6 +3204,7 @@ end: DBUG_EXECUTE("exec", test_key_cache(pagecache, "end of key_cache_write", 1);); #endif + BLOCK_INFO(block); DBUG_RETURN(error); } @@ -3321,6 +3236,7 @@ static void free_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block) unlink_changed(block); DBUG_ASSERT((block->status & BLOCK_WRLOCK) == 0); + DBUG_ASSERT(block->pins > 0); block->status= 0; #ifndef DBUG_OFF block->type= PAGECACHE_EMPTY_PAGE; @@ -3344,7 +3260,7 @@ static void free_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block) /* All pending requests for this page must be resubmitted. */ if (block->wqueue[COND_FOR_SAVED].last_thread) - release_queue(&block->wqueue[COND_FOR_SAVED]); + wqueue_release_queue(&block->wqueue[COND_FOR_SAVED]); } @@ -3398,12 +3314,13 @@ static int flush_cached_blocks(PAGECACHE *pagecache, } /* if the block is not pinned then it is not write locked */ DBUG_ASSERT((block->status & BLOCK_WRLOCK) == 0); + DBUG_ASSERT(block->pins > 0); #ifndef DBUG_OFF { int rc= #endif - pagecache_make_lock_and_pin(pagecache, block, - PAGECACHE_LOCK_WRITE, PAGECACHE_PIN); + make_lock_and_pin(pagecache, block, + PAGECACHE_LOCK_WRITE, PAGECACHE_PIN); #ifndef DBUG_OFF DBUG_ASSERT(rc == 0); } @@ -3427,9 +3344,9 @@ static int flush_cached_blocks(PAGECACHE *pagecache, MYF(MY_NABP | MY_WAIT_IF_FULL)); pagecache_pthread_mutex_lock(&pagecache->cache_lock); - pagecache_make_lock_and_pin(pagecache, block, - PAGECACHE_LOCK_WRITE_UNLOCK, - PAGECACHE_UNPIN); + make_lock_and_pin(pagecache, block, + PAGECACHE_LOCK_WRITE_UNLOCK, + PAGECACHE_UNPIN); pagecache->global_cache_write++; if (error) @@ -3443,7 +3360,7 @@ static int flush_cached_blocks(PAGECACHE *pagecache, It might happen only during an operation to resize the key cache. */ if (block->wqueue[COND_FOR_SAVED].last_thread) - release_queue(&block->wqueue[COND_FOR_SAVED]); + wqueue_release_queue(&block->wqueue[COND_FOR_SAVED]); /* type will never be FLUSH_IGNORE_CHANGED here */ if (! (type == FLUSH_KEEP || type == FLUSH_FORCE_WRITE)) { @@ -3577,6 +3494,7 @@ restart: if ((error= flush_cached_blocks(pagecache, file, cache, end,type))) last_errno=error; + DBUG_PRINT("info", ("restarting...")); /* Restart the scan as some other thread might have changed the changed blocks chain: the blocks that were in switch @@ -3622,7 +3540,7 @@ removes a page from the list of dirty pages, while it's still dirty. A \ { #ifdef THREAD struct st_my_thread_var *thread= my_thread_var; - add_to_queue(&block->wqueue[COND_FOR_SAVED], thread); + wqueue_add_to_queue(&block->wqueue[COND_FOR_SAVED], thread); do { KEYCACHE_DBUG_PRINT("flush_pagecache_blocks_int: wait", @@ -3761,7 +3679,7 @@ static int flush_all_key_blocks(PAGECACHE *pagecache) 0 on success (always because it can't fail) */ -int reset_key_cache_counters(const char *name, PAGECACHE *key_cache) +static int reset_key_cache_counters(const char *name, PAGECACHE *key_cache) { DBUG_ENTER("reset_key_cache_counters"); if (!key_cache->inited) diff --git a/mysys/wqueue.c b/mysys/wqueue.c new file mode 100644 index 00000000000..28e044ff606 --- /dev/null +++ b/mysys/wqueue.c @@ -0,0 +1,167 @@ + +#include <wqueue.h> + +#define STRUCT_PTR(TYPE, MEMBER, a) \ + (TYPE *) ((char *) (a) - offsetof(TYPE, MEMBER)) +/* + Link a thread into double-linked queue of waiting threads. + + SYNOPSIS + wqueue_link_into_queue() + wqueue pointer to the queue structure + thread pointer to the thread to be added to the queue + + RETURN VALUE + none + + NOTES. + Queue is represented by a circular list of the thread structures + The list is double-linked of the type (**prev,*next), accessed by + a pointer to the last element. +*/ + +void wqueue_link_into_queue(WQUEUE *wqueue, struct st_my_thread_var *thread) +{ + struct st_my_thread_var *last; + if (!(last= wqueue->last_thread)) + { + /* Queue is empty */ + thread->next= thread; + thread->prev= &thread->next; + } + else + { + thread->prev= last->next->prev; + last->next->prev= &thread->next; + thread->next= last->next; + last->next= thread; + } + wqueue->last_thread= thread; +} + + +/* + Add a thread to single-linked queue of waiting threads + + SYNOPSIS + wqueue_add_to_queue() + wqueue pointer to the queue structure + thread pointer to the thread to be added to the queue + + RETURN VALUE + none + + NOTES. + Queue is represented by a circular list of the thread structures + The list is single-linked of the type (*next), accessed by a pointer + to the last element. +*/ + +void wqueue_add_to_queue(WQUEUE *wqueue, struct st_my_thread_var *thread) +{ + struct st_my_thread_var *last; + if (!(last= wqueue->last_thread)) + thread->next= thread; + else + { + thread->next= last->next; + last->next= thread; + } + wqueue->last_thread= thread; +} + +/* + Unlink a thread from double-linked queue of waiting threads + + SYNOPSIS + wqueue_unlink_from_queue() + wqueue pointer to the queue structure + thread pointer to the thread to be removed from the queue + + RETURN VALUE + none + + NOTES. + See NOTES for link_into_queue +*/ + +void wqueue_unlink_from_queue(WQUEUE *wqueue, struct st_my_thread_var *thread) +{ + if (thread->next == thread) + /* The queue contains only one member */ + wqueue->last_thread= NULL; + else + { + thread->next->prev= thread->prev; + *thread->prev= thread->next; + if (wqueue->last_thread == thread) + wqueue->last_thread= STRUCT_PTR(struct st_my_thread_var, next, + thread->prev); + } + thread->next= NULL; +} + + +/* + Remove all threads from queue signaling them to proceed + + SYNOPSIS + wqueue_realease_queue() + wqueue pointer to the queue structure + thread pointer to the thread to be added to the queue + + RETURN VALUE + none + + NOTES. + See notes for add_to_queue + When removed from the queue each thread is signaled via condition + variable thread->suspend. +*/ + +void wqueue_release_queue(WQUEUE *wqueue) +{ + struct st_my_thread_var *last= wqueue->last_thread; + struct st_my_thread_var *next= last->next; + struct st_my_thread_var *thread; + do + { + thread= next; + pthread_cond_signal(&thread->suspend); + next= thread->next; + thread->next= NULL; + } + while (thread != last); + wqueue->last_thread= NULL; +} + + +/* + Add thread and wait + + SYNOPSYS + wqueue_add_and_wait() + wqueue queue to add to + thread thread which is waiting + lock mutex need for the operation +*/ + +void wqueue_add_and_wait(WQUEUE *wqueue, + struct st_my_thread_var *thread, pthread_mutex_t *lock) +{ + DBUG_ENTER("wqueue_add_and_wait"); + DBUG_PRINT("enter", ("thread ox%lxcond 0x%lx, mutex 0x%lx", + (ulong) thread, (ulong) &thread->suspend, (ulong) lock)); + wqueue_add_to_queue(wqueue, thread); + do + { + DBUG_PRINT("info", ("wait... cond 0x%lx, mutex 0x%lx", + (ulong) &thread->suspend, (ulong) lock)); + pthread_cond_wait(&thread->suspend, lock); + DBUG_PRINT("info", ("wait done cond 0x%lx, mutex 0x%lx, next 0x%lx", + (ulong) &thread->suspend, (ulong) lock, + (ulong) thread->next)); + } + while (thread->next); + DBUG_VOID_RETURN; +} |