diff options
Diffstat (limited to 'mysys/mf_keycache.c')
-rw-r--r-- | mysys/mf_keycache.c | 121 |
1 files changed, 91 insertions, 30 deletions
diff --git a/mysys/mf_keycache.c b/mysys/mf_keycache.c index cee1b7eb4e9..f14af44dbb9 100644 --- a/mysys/mf_keycache.c +++ b/mysys/mf_keycache.c @@ -184,10 +184,18 @@ static void test_key_cache(KEY_CACHE *keycache, static FILE *keycache_debug_log=NULL; static void keycache_debug_print _VARARGS((const char *fmt,...)); #define KEYCACHE_DEBUG_OPEN \ - if (!keycache_debug_log) keycache_debug_log=fopen(KEYCACHE_DEBUG_LOG, "w") + if (!keycache_debug_log) \ + { \ + keycache_debug_log= fopen(KEYCACHE_DEBUG_LOG, "w"); \ + (void) setvbuf(keycache_debug_log, NULL, _IOLBF, BUFSIZ); \ + } #define KEYCACHE_DEBUG_CLOSE \ - if (keycache_debug_log) { fclose(keycache_debug_log); keycache_debug_log=0; } + if (keycache_debug_log) \ + { \ + fclose(keycache_debug_log); \ + keycache_debug_log= 0; \ + } #else #define KEYCACHE_DEBUG_OPEN #define KEYCACHE_DEBUG_CLOSE @@ -213,7 +221,7 @@ static long keycache_thread_id; #define KEYCACHE_THREAD_TRACE_BEGIN(l) \ { struct st_my_thread_var *thread_var= my_thread_var; \ - keycache_thread_id= my_thread_var->id; \ + keycache_thread_id= thread_var->id; \ KEYCACHE_DBUG_PRINT(l,("[thread %ld",keycache_thread_id)) } #define KEYCACHE_THREAD_TRACE_END(l) \ @@ -240,12 +248,10 @@ static int keycache_pthread_cond_wait(pthread_cond_t *cond, static int keycache_pthread_mutex_lock(pthread_mutex_t *mutex); static void keycache_pthread_mutex_unlock(pthread_mutex_t *mutex); static int keycache_pthread_cond_signal(pthread_cond_t *cond); -static int keycache_pthread_cond_broadcast(pthread_cond_t *cond); #else #define keycache_pthread_mutex_lock pthread_mutex_lock #define keycache_pthread_mutex_unlock pthread_mutex_unlock #define keycache_pthread_cond_signal pthread_cond_signal -#define keycache_pthread_cond_broadcast pthread_cond_broadcast #endif /* defined(KEYCACHE_DEBUG) */ static uint next_power(uint value) @@ -508,6 +514,8 @@ int resize_key_cache(KEY_CACHE *keycache, uint key_cache_block_size, keycache->can_be_used= 0; while (keycache->cnt_for_resize_op) { + KEYCACHE_DBUG_PRINT("resize_key_cache: wait", + ("suspend thread %ld", thread->id)); keycache_pthread_cond_wait(&thread->suspend, &keycache->cache_lock); } @@ -520,7 +528,11 @@ finish: unlink_from_queue(wqueue, thread); /* Signal for the next resize request to proceeed if any */ if (wqueue->last_thread) + { + KEYCACHE_DBUG_PRINT("resize_key_cache: signal", + ("thread %ld", wqueue->last_thread->next->id)); keycache_pthread_cond_signal(&wqueue->last_thread->next->suspend); + } keycache_pthread_mutex_unlock(&keycache->cache_lock); return blocks; } @@ -544,7 +556,11 @@ static inline void dec_counter_for_resize_op(KEY_CACHE *keycache) struct st_my_thread_var *last_thread; if (!--keycache->cnt_for_resize_op && (last_thread= keycache->resize_queue.last_thread)) + { + KEYCACHE_DBUG_PRINT("dec_counter_for_resize_op: signal", + ("thread %ld", last_thread->next->id)); keycache_pthread_cond_signal(&last_thread->next->suspend); + } } /* @@ -761,8 +777,8 @@ static void release_queue(KEYCACHE_WQUEUE *wqueue) do { thread=next; - keycache_pthread_cond_signal(&thread->suspend); KEYCACHE_DBUG_PRINT("release_queue: signal", ("thread %ld", thread->id)); + keycache_pthread_cond_signal(&thread->suspend); next=thread->next; thread->next= NULL; } @@ -876,7 +892,8 @@ static void link_block(KEY_CACHE *keycache, BLOCK_LINK *block, my_bool hot, BLOCK_LINK **pins; KEYCACHE_DBUG_ASSERT(! (block->hash_link && block->hash_link->requests)); - if (!hot && keycache->waiting_for_block.last_thread) { + if (!hot && keycache->waiting_for_block.last_thread) + { /* Signal that in the LRU warm sub-chain an available block has appeared */ struct st_my_thread_var *last_thread= keycache->waiting_for_block.last_thread; @@ -894,6 +911,7 @@ static void link_block(KEY_CACHE *keycache, BLOCK_LINK *block, my_bool hot, */ if ((HASH_LINK *) thread->opt_info == hash_link) { + KEYCACHE_DBUG_PRINT("link_block: signal", ("thread %ld", thread->id)); keycache_pthread_cond_signal(&thread->suspend); unlink_from_queue(&keycache->waiting_for_block, thread); block->requests++; @@ -1000,11 +1018,10 @@ static void reg_requests(KEY_CACHE *keycache, BLOCK_LINK *block, int count) linking it to the LRU chain if it's the last request SYNOPSIS - - unreg_block() - keycache pointer to a key cache data structure - block pointer to the block to link to the LRU chain - at_end <-> to link the block at the end of the LRU chain + unreg_request() + keycache pointer to a key cache data structure + block pointer to the block to link to the LRU chain + at_end <-> to link the block at the end of the LRU chain RETURN VALUE none @@ -1086,6 +1103,9 @@ static inline void wait_for_readers(KEY_CACHE *keycache, BLOCK_LINK *block) struct st_my_thread_var *thread= my_thread_var; while (block->hash_link->requests) { + KEYCACHE_DBUG_PRINT("wait_for_readers: wait", + ("suspend thread %ld block %u", + thread->id, BLOCK_NUMBER(block))); block->condvar= &thread->suspend; keycache_pthread_cond_wait(&thread->suspend, &keycache->cache_lock); block->condvar= NULL; @@ -1143,6 +1163,7 @@ static void unlink_hash(KEY_CACHE *keycache, HASH_LINK *hash_link) */ if (page->file == hash_link->file && page->filepos == hash_link->diskpos) { + KEYCACHE_DBUG_PRINT("unlink_hash: signal", ("thread %ld", thread->id)); keycache_pthread_cond_signal(&thread->suspend); unlink_from_queue(&keycache->waiting_for_hash_link, thread); } @@ -1225,6 +1246,8 @@ restart: page.filepos= filepos; thread->opt_info= (void *) &page; link_into_queue(&keycache->waiting_for_hash_link, thread); + KEYCACHE_DBUG_PRINT("get_hash_link: wait", + ("suspend thread %ld", thread->id)); keycache_pthread_cond_wait(&thread->suspend, &keycache->cache_lock); thread->opt_info= NULL; @@ -1343,6 +1366,8 @@ restart: add_to_queue(&block->wqueue[COND_FOR_SAVED], thread); do { + KEYCACHE_DBUG_PRINT("find_key_block: wait", + ("suspend thread %ld", thread->id)); keycache_pthread_cond_wait(&thread->suspend, &keycache->cache_lock); } @@ -1360,7 +1385,9 @@ restart: /* This is a request for a page to be removed from cache */ KEYCACHE_DBUG_PRINT("find_key_block", - ("request for old page in block %u",BLOCK_NUMBER(block))); + ("request for old page in block %u " + "wrmode: %d block->status: %d", + BLOCK_NUMBER(block), wrmode, block->status)); /* Only reading requests can proceed until the old dirty page is flushed, all others are to be suspended, then resubmitted @@ -1379,6 +1406,8 @@ restart: /* Wait until the request can be resubmitted */ do { + KEYCACHE_DBUG_PRINT("find_key_block: wait", + ("suspend thread %ld", thread->id)); keycache_pthread_cond_wait(&thread->suspend, &keycache->cache_lock); } @@ -1448,6 +1477,8 @@ restart: link_into_queue(&keycache->waiting_for_block, thread); do { + KEYCACHE_DBUG_PRINT("find_key_block: wait", + ("suspend thread %ld", thread->id)); keycache_pthread_cond_wait(&thread->suspend, &keycache->cache_lock); } @@ -1528,9 +1559,13 @@ restart: else { /* This is for secondary requests for a new page only */ - page_status= block->hash_link == hash_link && - (block->status & BLOCK_READ) ? - PAGE_READ : PAGE_WAIT_TO_BE_READ; + KEYCACHE_DBUG_PRINT("find_key_block", + ("block->hash_link: %p hash_link: %p " + "block->status: %u", block->hash_link, + hash_link, block->status )); + page_status= (((block->hash_link == hash_link) && + (block->status & BLOCK_READ)) ? + PAGE_READ : PAGE_WAIT_TO_BE_READ); } } keycache->global_cache_read++; @@ -1538,17 +1573,22 @@ restart: else { reg_requests(keycache, block, 1); - page_status = block->hash_link == hash_link && - (block->status & BLOCK_READ) ? - PAGE_READ : PAGE_WAIT_TO_BE_READ; + KEYCACHE_DBUG_PRINT("find_key_block", + ("block->hash_link: %p hash_link: %p " + "block->status: %u", block->hash_link, + hash_link, block->status )); + page_status= (((block->hash_link == hash_link) && + (block->status & BLOCK_READ)) ? + PAGE_READ : PAGE_WAIT_TO_BE_READ); } } KEYCACHE_DBUG_ASSERT(page_status != -1); *page_st=page_status; KEYCACHE_DBUG_PRINT("find_key_block", - ("fd: %u pos %lu page_status %lu", - (uint) file,(ulong) filepos,(uint) page_status)); + ("fd: %u pos %lu block->status %u page_status %lu", + (uint) file, (ulong) filepos, block->status, + (uint) page_status)); #if !defined(DBUG_OFF) && defined(EXTRA_DEBUG) DBUG_EXECUTE("check_keycache2", @@ -1604,6 +1644,10 @@ static void read_block(KEY_CACHE *keycache, /* Page is not in buffer yet, is to be read from disk */ keycache_pthread_mutex_unlock(&keycache->cache_lock); + /* + Here other threads may step in and register as secondary readers. + They will register in block->wqueue[COND_FOR_REQUESTED]. + */ got_length= my_pread(block->hash_link->file, block->buffer, read_length, block->hash_link->diskpos, MYF(0)); keycache_pthread_mutex_lock(&keycache->cache_lock); @@ -1634,6 +1678,8 @@ static void read_block(KEY_CACHE *keycache, add_to_queue(&block->wqueue[COND_FOR_REQUESTED], thread); do { + KEYCACHE_DBUG_PRINT("read_block: wait", + ("suspend thread %ld", thread->id)); keycache_pthread_cond_wait(&thread->suspend, &keycache->cache_lock); } @@ -1855,6 +1901,10 @@ int key_cache_insert(KEY_CACHE *keycache, /* The requested page is to be read into the block buffer */ #if !defined(SERIALIZED_READ_FROM_CACHE) keycache_pthread_mutex_unlock(&keycache->cache_lock); + /* + Here other threads may step in and register as secondary readers. + They will register in block->wqueue[COND_FOR_REQUESTED]. + */ #endif /* Copy data from buff */ @@ -1865,9 +1915,15 @@ int key_cache_insert(KEY_CACHE *keycache, #if !defined(SERIALIZED_READ_FROM_CACHE) keycache_pthread_mutex_lock(&keycache->cache_lock); + /* Here we are alone again. */ #endif block->status= BLOCK_READ; block->length= read_length+offset; + KEYCACHE_DBUG_PRINT("key_cache_insert", + ("primary request: new page in cache")); + /* Signal that all pending requests for this now can be processed. */ + if (block->wqueue[COND_FOR_REQUESTED].last_thread) + release_queue(&block->wqueue[COND_FOR_REQUESTED]); } remove_reader(block); @@ -2074,9 +2130,16 @@ static void free_block(KEY_CACHE *keycache, BLOCK_LINK *block) { KEYCACHE_THREAD_TRACE("free block"); KEYCACHE_DBUG_PRINT("free_block", - ("block %u to be freed",BLOCK_NUMBER(block))); + ("block %u to be freed, hash_link %p", + BLOCK_NUMBER(block), block->hash_link)); if (block->hash_link) { + /* + While waiting for readers to finish, new readers might request the + block. But since we set block->status|= BLOCK_REASSIGNED, they + will wait on block->wqueue[COND_FOR_SAVED]. They must be signalled + later. + */ block->status|= BLOCK_REASSIGNED; wait_for_readers(keycache, block); unlink_hash(keycache, block->hash_link); @@ -2102,6 +2165,10 @@ static void free_block(KEY_CACHE *keycache, BLOCK_LINK *block) keycache->free_block_list= block; /* Keep track of the number of currently unused blocks. */ keycache->blocks_unused++; + + /* All pending requests for this page must be resubmitted. */ + if (block->wqueue[COND_FOR_SAVED].last_thread) + release_queue(&block->wqueue[COND_FOR_SAVED]); } @@ -2334,6 +2401,8 @@ restart: add_to_queue(&block->wqueue[COND_FOR_SAVED], thread); do { + KEYCACHE_DBUG_PRINT("flush_key_blocks_int: wait", + ("suspend thread %ld", thread->id)); keycache_pthread_cond_wait(&thread->suspend, &keycache->cache_lock); } @@ -2685,14 +2754,6 @@ static int keycache_pthread_cond_signal(pthread_cond_t *cond) } -static int keycache_pthread_cond_broadcast(pthread_cond_t *cond) -{ - int rc; - KEYCACHE_THREAD_TRACE("signal"); - rc= pthread_cond_broadcast(cond); - return rc; -} - #if defined(KEYCACHE_DEBUG_LOG) |