summaryrefslogtreecommitdiff
path: root/mysys
diff options
context:
space:
mode:
authorunknown <bell@desktop.sanja.is.com.ua>2007-02-02 09:41:32 +0200
committerunknown <bell@desktop.sanja.is.com.ua>2007-02-02 09:41:32 +0200
commit025400922118f11a15be54c66455f20e2f72c0b4 (patch)
treef3e73bf1a50802f9f1fd8f4bcee5361654350e32 /mysys
parent5c7960965c4c178d3a02f9893ea65b2802b38b8f (diff)
downloadmariadb-git-025400922118f11a15be54c66455f20e2f72c0b4.tar.gz
postreview changes for page cache and pre review commit for loghandler
storage/maria/unittest/test_file.c: Rename: unittest/mysys/test_file.c -> storage/maria/unittest/test_file.c storage/maria/unittest/test_file.h: Rename: unittest/mysys/test_file.h -> storage/maria/unittest/test_file.h include/pagecache.h: A waiting queue mechanism moved to separate file wqueue.* Pointer name changed for compatibility mysys/Makefile.am: A waiting queue mechanism moved to separate file wqueue.* mysys/mf_keycache.c: fixed unsigned comparison mysys/mf_pagecache.c: A waiting queue mechanism moved to separate file wqueue.* Fixed bug in unregistering block during write storage/maria/Makefile.am: The loghandler files added storage/maria/ma_control_file.h: Now we have loghandler and can compile control file storage/maria/maria_def.h: Including files need for compilation of maria storage/maria/unittest/Makefile.am: unit tests of loghandler storage/maria/unittest/ma_control_file-t.c: Used maria def storage/maria/unittest/mf_pagecache_consist.c: fixed memory overrun storage/maria/unittest/mf_pagecache_single.c: fixed used uninitialized memory unittest/mysys/Makefile.am: unittests of pagecache moved to maria becase pagecache need loghandler include/wqueue.h: New BitKeeper file ``include/wqueue.h'' mysys/wqueue.c: New BitKeeper file ``mysys/wqueue.c'' storage/maria/ma_loghandler.c: New BitKeeper file ``storage/maria/ma_loghandler.c'' storage/maria/ma_loghandler.h: New BitKeeper file ``storage/maria/ma_loghandler.h'' storage/maria/ma_loghandler_lsn.h: New BitKeeper file ``storage/maria/ma_loghandler_lsn.h'' storage/maria/unittest/ma_test_loghandler-t.c: New BitKeeper file ``storage/maria/unittest/ma_test_loghandler-t.c'' storage/maria/unittest/ma_test_loghandler_multigroup-t.c: New BitKeeper file ``storage/maria/unittest/ma_test_loghandler_multigroup-t.c'' storage/maria/unittest/ma_test_loghandler_multithread-t.c: New BitKeeper file ``storage/maria/unittest/ma_test_loghandler_multithread-t.c'' storage/maria/unittest/ma_test_loghandler_pagecache-t.c: New BitKeeper file ``storage/maria/unittest/ma_test_loghandler_pagecache-t.c''
Diffstat (limited to 'mysys')
-rw-r--r--mysys/Makefile.am2
-rw-r--r--mysys/mf_keycache.c2
-rwxr-xr-xmysys/mf_pagecache.c776
-rw-r--r--mysys/wqueue.c167
4 files changed, 516 insertions, 431 deletions
diff --git a/mysys/Makefile.am b/mysys/Makefile.am
index 4d9570febbd..612411404c4 100644
--- a/mysys/Makefile.am
+++ b/mysys/Makefile.am
@@ -56,7 +56,7 @@ libmysys_a_SOURCES = my_init.c my_getwd.c mf_getdate.c my_mmap.c \
my_handler.c my_netware.c my_largepage.c \
my_memmem.c \
my_windac.c my_access.c base64.c my_libwrap.c \
- mf_pagecache.c
+ mf_pagecache.c wqueue.c
EXTRA_DIST = thr_alarm.c thr_lock.c my_pthread.c my_thr_init.c \
thr_mutex.c thr_rwlock.c \
CMakeLists.txt mf_soundex.c \
diff --git a/mysys/mf_keycache.c b/mysys/mf_keycache.c
index 9a99a278bc5..9cb428ab200 100644
--- a/mysys/mf_keycache.c
+++ b/mysys/mf_keycache.c
@@ -1008,12 +1008,12 @@ static void unlink_block(KEY_CACHE *keycache, BLOCK_LINK *block)
KEYCACHE_THREAD_TRACE("unlink_block");
#if defined(KEYCACHE_DEBUG)
+ KEYCACHE_DBUG_ASSERT(keycache->blocks_available != 0);
keycache->blocks_available--;
KEYCACHE_DBUG_PRINT("unlink_block",
("unlinked block %u status=%x #requests=%u #available=%u",
BLOCK_NUMBER(block), block->status,
block->requests, keycache->blocks_available));
- KEYCACHE_DBUG_ASSERT(keycache->blocks_available >= 0);
#endif
}
diff --git a/mysys/mf_pagecache.c b/mysys/mf_pagecache.c
index 4b92f68d9bf..97cb542f329 100755
--- a/mysys/mf_pagecache.c
+++ b/mysys/mf_pagecache.c
@@ -26,7 +26,7 @@
When a new block is required it is first tried to pop one from the stack.
If the stack is empty, it is tried to get a never-used block from the pool.
If this is empty too, then a block is taken from the LRU ring, flushing it
- to disk, if necessary. This is handled in find_key_block().
+ to disk, if necessary. This is handled in find_block().
With the new free list, the blocks can have three temperatures:
hot, warm and cold (which is free). This is remembered in the block header
by the enum BLOCK_TEMPERATURE temperature variable. Remembering the
@@ -91,13 +91,16 @@
/*
In key cache we have external raw locking here we use
SERIALIZED_READ_FROM_CACHE to avoid problem of reading
- not consistent data from te page
+ not consistent data from the page.
+ (keycache functions (key_cache_read(), key_cache_insert() and
+ key_cache_write()) rely on external MyISAM lock, we don't)
*/
#define SERIALIZED_READ_FROM_CACHE yes
#define BLOCK_INFO(B) \
DBUG_PRINT("info", \
- ("block 0x%lx, file %lu, page %lu, s %0x, hshL 0x%lx, req %u/%u", \
+ ("block 0x%lx file %lu page %lu s %0x hshL 0x%lx req %u/%u " \
+ "wrlock: %c", \
(ulong)(B), \
(ulong)((B)->hash_link ? \
(B)->hash_link->file.file : \
@@ -110,7 +113,8 @@
(uint) (B)->requests, \
(uint)((B)->hash_link ? \
(B)->hash_link->requests : \
- 0)))
+ 0), \
+ ((block->status & BLOCK_WRLOCK)?'Y':'N')))
/* TODO: put it to my_static.c */
my_bool my_disable_flush_pagecache_blocks= 0;
@@ -138,7 +142,7 @@ typedef pthread_cond_t KEYCACHE_CONDVAR;
struct st_pagecache_page
{
PAGECACHE_FILE file; /* file to which the page belongs to */
- maria_page_no_t pageno; /* number of the page in the file */
+ pgcache_page_no_t pageno; /* number of the page in the file */
};
/* element in the chain of a hash table bucket */
@@ -149,7 +153,7 @@ struct st_pagecache_hash_link
struct st_pagecache_block_link
*block; /* reference to the block for the page: */
PAGECACHE_FILE file; /* from such a file */
- maria_page_no_t pageno; /* this page */
+ pgcache_page_no_t pageno; /* this page */
uint requests; /* number of requests for the page */
};
@@ -162,7 +166,7 @@ struct st_pagecache_hash_link
#define BLOCK_CHANGED 32 /* block buffer contains a dirty page */
#define BLOCK_WRLOCK 64 /* write locked block */
-/* page status, returned by find_key_block */
+/* page status, returned by find_block */
#define PAGE_READ 0
#define PAGE_TO_BE_READ 1
#define PAGE_WAIT_TO_BE_READ 2
@@ -232,7 +236,7 @@ typedef struct st_pagecache_lock_info
node the node which should be linked
*/
-void info_link(PAGECACHE_PIN_INFO **list, PAGECACHE_PIN_INFO *node)
+static void info_link(PAGECACHE_PIN_INFO **list, PAGECACHE_PIN_INFO *node)
{
if ((node->next= *list))
node->next->prev= &(node->next);
@@ -249,7 +253,7 @@ void info_link(PAGECACHE_PIN_INFO **list, PAGECACHE_PIN_INFO *node)
node the node which should be unlinked
*/
-void info_unlink(PAGECACHE_PIN_INFO *node)
+static void info_unlink(PAGECACHE_PIN_INFO *node)
{
if ((*node->prev= node->next))
node->next->prev= node->prev;
@@ -271,8 +275,8 @@ void info_unlink(PAGECACHE_PIN_INFO *node)
pointer to the information node of the thread in the list
*/
-PAGECACHE_PIN_INFO *info_find(PAGECACHE_PIN_INFO *list,
- struct st_my_thread_var *thread)
+static PAGECACHE_PIN_INFO *info_find(PAGECACHE_PIN_INFO *list,
+ struct st_my_thread_var *thread)
{
register PAGECACHE_PIN_INFO *i= list;
for(; i != 0; i= i->next)
@@ -291,7 +295,7 @@ struct st_pagecache_block_link
*next_changed, **prev_changed; /* for lists of file dirty/clean blocks */
struct st_pagecache_hash_link
*hash_link; /* backward ptr to referring hash_link */
- PAGECACHE_WQUEUE
+ WQUEUE
wqueue[COND_SIZE]; /* queues on waiting requests for new/old pages */
uint requests; /* number of requests for the block */
byte *buffer; /* buffer for the block page */
@@ -310,8 +314,8 @@ struct st_pagecache_block_link
#ifdef PAGECACHE_DEBUG
/* debug checks */
-my_bool info_check_pin(PAGECACHE_BLOCK_LINK *block,
- enum pagecache_page_pin mode)
+static my_bool info_check_pin(PAGECACHE_BLOCK_LINK *block,
+ enum pagecache_page_pin mode)
{
struct st_my_thread_var *thread= my_thread_var;
DBUG_ENTER("info_check_pin");
@@ -367,9 +371,9 @@ my_bool info_check_pin(PAGECACHE_BLOCK_LINK *block,
1 - Error
*/
-my_bool info_check_lock(PAGECACHE_BLOCK_LINK *block,
- enum pagecache_page_lock lock,
- enum pagecache_page_pin pin)
+static my_bool info_check_lock(PAGECACHE_BLOCK_LINK *block,
+ enum pagecache_page_lock lock,
+ enum pagecache_page_pin pin)
{
struct st_my_thread_var *thread= my_thread_var;
DBUG_ENTER("info_check_lock");
@@ -379,47 +383,47 @@ my_bool info_check_lock(PAGECACHE_BLOCK_LINK *block,
switch(lock)
{
case PAGECACHE_LOCK_LEFT_UNLOCKED:
- DBUG_ASSERT(pin == PAGECACHE_PIN_LEFT_UNPINNED);
- if (info)
+ if (pin != PAGECACHE_PIN_LEFT_UNPINNED ||
+ info)
goto error;
break;
case PAGECACHE_LOCK_LEFT_READLOCKED:
- DBUG_ASSERT(pin == PAGECACHE_PIN_LEFT_UNPINNED ||
- pin == PAGECACHE_PIN_LEFT_PINNED);
- if (info == 0 || info->write_lock)
+ if ((pin != PAGECACHE_PIN_LEFT_UNPINNED &&
+ pin != PAGECACHE_PIN_LEFT_PINNED) ||
+ info == 0 || info->write_lock)
goto error;
break;
case PAGECACHE_LOCK_LEFT_WRITELOCKED:
- DBUG_ASSERT(pin == PAGECACHE_PIN_LEFT_PINNED);
- if (info == 0 || !info->write_lock)
+ if (pin != PAGECACHE_PIN_LEFT_PINNED ||
+ info == 0 || !info->write_lock)
goto error;
break;
case PAGECACHE_LOCK_READ:
- DBUG_ASSERT(pin == PAGECACHE_PIN_LEFT_UNPINNED ||
- pin == PAGECACHE_PIN);
- if (info != 0)
+ if ((pin != PAGECACHE_PIN_LEFT_UNPINNED &&
+ pin != PAGECACHE_PIN) ||
+ info != 0)
goto error;
break;
case PAGECACHE_LOCK_WRITE:
- DBUG_ASSERT(pin == PAGECACHE_PIN);
- if (info != 0)
+ if (pin != PAGECACHE_PIN ||
+ info != 0)
goto error;
break;
case PAGECACHE_LOCK_READ_UNLOCK:
- DBUG_ASSERT(pin == PAGECACHE_PIN_LEFT_UNPINNED ||
- pin == PAGECACHE_UNPIN);
- if (info == 0 || info->write_lock)
+ if ((pin != PAGECACHE_PIN_LEFT_UNPINNED &&
+ pin != PAGECACHE_UNPIN) ||
+ info == 0 || info->write_lock)
goto error;
break;
case PAGECACHE_LOCK_WRITE_UNLOCK:
- DBUG_ASSERT(pin == PAGECACHE_UNPIN);
- if (info == 0 || !info->write_lock)
+ if (pin != PAGECACHE_UNPIN ||
+ info == 0 || !info->write_lock)
goto error;
break;
case PAGECACHE_LOCK_WRITE_TO_READ:
- DBUG_ASSERT(pin == PAGECACHE_PIN_LEFT_PINNED ||
- pin == PAGECACHE_UNPIN);
- if (info == 0 || !info->write_lock)
+ if ((pin != PAGECACHE_PIN_LEFT_PINNED &&
+ pin != PAGECACHE_UNPIN) ||
+ info == 0 || !info->write_lock)
goto error;
break;
}
@@ -439,12 +443,6 @@ error:
#define FLUSH_CACHE 2000 /* sort this many blocks at once */
static int flush_all_key_blocks(PAGECACHE *pagecache);
-#ifdef THREAD
-static void link_into_queue(PAGECACHE_WQUEUE *wqueue,
- struct st_my_thread_var *thread);
-static void unlink_from_queue(PAGECACHE_WQUEUE *wqueue,
- struct st_my_thread_var *thread);
-#endif
static void free_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block);
static void test_key_cache(PAGECACHE *pagecache,
const char *where, my_bool lock);
@@ -551,6 +549,7 @@ static int ___pagecache_pthread_cond_signal(pthread_cond_t *cond);
#define pagecache_pthread_cond_signal pthread_cond_signal
#endif /* defined(PAGECACHE_DEBUG) */
+extern my_bool translog_flush(LSN *lsn);
/*
Write page to the disk
@@ -567,18 +566,28 @@ static int ___pagecache_pthread_cond_signal(pthread_cond_t *cond);
0 - OK
!=0 - Error
*/
-uint pagecache_fwrite(PAGECACHE *pagecache,
- PAGECACHE_FILE *filedesc,
- byte *buffer,
- maria_page_no_t pageno,
- enum pagecache_page_type type,
- myf flags)
+
+static uint pagecache_fwrite(PAGECACHE *pagecache,
+ PAGECACHE_FILE *filedesc,
+ byte *buffer,
+ pgcache_page_no_t pageno,
+ enum pagecache_page_type type,
+ myf flags)
{
DBUG_ENTER("pagecache_fwrite");
if (type == PAGECACHE_LSN_PAGE)
{
+ LSN lsn;
DBUG_PRINT("info", ("Log handler call"));
- /* TODO: put here loghandler call */
+ /* TODO: integrate with page format */
+#define PAGE_LSN_OFFSET 0
+ lsn7korr(&lsn, buffer + PAGE_LSN_OFFSET);
+ /*
+ check CONTROL_FILE_IMPOSSIBLE_FILENO &
+ CONTROL_FILE_IMPOSSIBLE_LOG_OFFSET
+ */
+ DBUG_ASSERT(lsn.file_no != 0 && lsn.rec_offset != 0);
+ translog_flush(&lsn);
}
DBUG_RETURN(my_pwrite(filedesc->file, buffer, pagecache->block_size,
(pageno)<<(pagecache->shift), flags));
@@ -628,8 +637,6 @@ static uint next_power(uint value)
division_limit division limit (may be zero)
age_threshold age threshold (may be zero)
block_size size of block (should be power of 2)
- loghandler logfandler pointer to call it in case of
- pages with LSN
RETURN VALUE
number of blocks in the key cache, if successful,
@@ -647,12 +654,11 @@ static uint next_power(uint value)
int init_pagecache(PAGECACHE *pagecache, my_size_t use_mem,
uint division_limit, uint age_threshold,
- uint block_size,
- LOG_HANDLER *loghandler)
+ uint block_size)
{
- int blocks, hash_links, length;
+ uint blocks, hash_links, length;
int error;
- DBUG_ENTER("init_key_cache");
+ DBUG_ENTER("init_pagecache");
DBUG_ASSERT(block_size >= 512);
PAGECACHE_DEBUG_OPEN;
@@ -662,8 +668,6 @@ int init_pagecache(PAGECACHE *pagecache, my_size_t use_mem,
DBUG_RETURN(0);
}
- pagecache->loghandler= loghandler;
-
pagecache->global_cache_w_requests= pagecache->global_cache_r_requests= 0;
pagecache->global_cache_read= pagecache->global_cache_write= 0;
pagecache->disk_blocks= -1;
@@ -692,8 +696,8 @@ int init_pagecache(PAGECACHE *pagecache, my_size_t use_mem,
for ( ; ; )
{
/* Set my_hash_entries to the next bigger 2 power */
- if ((pagecache->hash_entries= next_power((uint)blocks)) <
- ((uint)blocks) * 5/4)
+ if ((pagecache->hash_entries= next_power(blocks)) <
+ (blocks) * 5/4)
pagecache->hash_entries<<= 1;
hash_links= 2 * blocks;
#if defined(MAX_THREADS)
@@ -704,7 +708,7 @@ int init_pagecache(PAGECACHE *pagecache, my_size_t use_mem,
ALIGN_SIZE(hash_links * sizeof(PAGECACHE_HASH_LINK)) +
ALIGN_SIZE(sizeof(PAGECACHE_HASH_LINK*) *
pagecache->hash_entries))) +
- ((ulong) blocks << pagecache->shift) > use_mem)
+ (((ulong) blocks) << pagecache->shift) > use_mem)
blocks--;
/* Allocate memory for cache page buffers */
if ((pagecache->block_mem=
@@ -760,10 +764,10 @@ int init_pagecache(PAGECACHE *pagecache, my_size_t use_mem,
pagecache->warm_blocks= 0;
pagecache->min_warm_blocks= (division_limit ?
blocks * division_limit / 100 + 1 :
- (ulong)blocks);
+ blocks);
pagecache->age_threshold= (age_threshold ?
blocks * age_threshold / 100 :
- (ulong)blocks);
+ blocks);
pagecache->cnt_for_resize_op= 0;
pagecache->resize_in_flush= 0;
@@ -842,7 +846,8 @@ int resize_pagecache(PAGECACHE *pagecache,
{
int blocks;
struct st_my_thread_var *thread;
- PAGECACHE_WQUEUE *wqueue;
+ WQUEUE *wqueue;
+
DBUG_ENTER("resize_pagecache");
if (!pagecache->inited)
@@ -859,7 +864,7 @@ int resize_pagecache(PAGECACHE *pagecache,
#ifdef THREAD
wqueue= &pagecache->resize_queue;
thread= my_thread_var;
- link_into_queue(wqueue, thread);
+ wqueue_link_into_queue(wqueue, thread);
while (wqueue->last_thread->next != thread)
{
@@ -892,12 +897,11 @@ int resize_pagecache(PAGECACHE *pagecache,
end_pagecache(pagecache, 0); /* Don't free mutex */
/* The following will work even if use_mem is 0 */
blocks= init_pagecache(pagecache, pagecache->block_size, use_mem,
- division_limit, age_threshold,
- pagecache->loghandler);
+ division_limit, age_threshold);
finish:
#ifdef THREAD
- unlink_from_queue(wqueue, thread);
+ wqueue_unlink_from_queue(wqueue, thread);
/* Signal for the next resize request to proceeed if any */
if (wqueue->last_thread)
{
@@ -1027,146 +1031,6 @@ void end_pagecache(PAGECACHE *pagecache, my_bool cleanup)
} /* end_pagecache */
-#ifdef THREAD
-/*
- Link a thread into double-linked queue of waiting threads.
-
- SYNOPSIS
- link_into_queue()
- wqueue pointer to the queue structure
- thread pointer to the thread to be added to the queue
-
- RETURN VALUE
- none
-
- NOTES.
- Queue is represented by a circular list of the thread structures
- The list is double-linked of the type (**prev,*next), accessed by
- a pointer to the last element.
-*/
-
-static void link_into_queue(PAGECACHE_WQUEUE *wqueue,
- struct st_my_thread_var *thread)
-{
- struct st_my_thread_var *last;
- if (! (last= wqueue->last_thread))
- {
- /* Queue is empty */
- thread->next= thread;
- thread->prev= &thread->next;
- }
- else
- {
- thread->prev= last->next->prev;
- last->next->prev= &thread->next;
- thread->next= last->next;
- last->next= thread;
- }
- wqueue->last_thread= thread;
-}
-
-/*
- Unlink a thread from double-linked queue of waiting threads
-
- SYNOPSIS
- unlink_from_queue()
- wqueue pointer to the queue structure
- thread pointer to the thread to be removed from the queue
-
- RETURN VALUE
- none
-
- NOTES.
- See NOTES for link_into_queue
-*/
-
-static void unlink_from_queue(PAGECACHE_WQUEUE *wqueue,
- struct st_my_thread_var *thread)
-{
- KEYCACHE_DBUG_PRINT("unlink_from_queue", ("thread %ld", thread->id));
- if (thread->next == thread)
- /* The queue contains only one member */
- wqueue->last_thread= NULL;
- else
- {
- thread->next->prev= thread->prev;
- *thread->prev=thread->next;
- if (wqueue->last_thread == thread)
- wqueue->last_thread= STRUCT_PTR(struct st_my_thread_var, next,
- thread->prev);
- }
- thread->next= NULL;
-}
-
-
-/*
- Add a thread to single-linked queue of waiting threads
-
- SYNOPSIS
- add_to_queue()
- wqueue pointer to the queue structure
- thread pointer to the thread to be added to the queue
-
- RETURN VALUE
- none
-
- NOTES.
- Queue is represented by a circular list of the thread structures
- The list is single-linked of the type (*next), accessed by a pointer
- to the last element.
-*/
-
-static inline void add_to_queue(PAGECACHE_WQUEUE *wqueue,
- struct st_my_thread_var *thread)
-{
- struct st_my_thread_var *last;
- if (! (last= wqueue->last_thread))
- thread->next= thread;
- else
- {
- thread->next= last->next;
- last->next= thread;
- }
- wqueue->last_thread= thread;
-}
-
-
-/*
- Remove all threads from queue signaling them to proceed
-
- SYNOPSIS
- realease_queue()
- wqueue pointer to the queue structure
- thread pointer to the thread to be added to the queue
-
- RETURN VALUE
- none
-
- NOTES.
- See notes for add_to_queue
- When removed from the queue each thread is signaled via condition
- variable thread->suspend.
-*/
-
-static void release_queue(PAGECACHE_WQUEUE *wqueue)
-{
- struct st_my_thread_var *last= wqueue->last_thread;
- struct st_my_thread_var *next= last->next;
- struct st_my_thread_var *thread;
- do
- {
- thread=next;
- KEYCACHE_DBUG_PRINT("release_queue: signal", ("thread %ld", thread->id));
- pagecache_pthread_cond_signal(&thread->suspend);
- next=thread->next;
- thread->next= NULL;
- }
- while (thread != last);
- wqueue->last_thread= NULL;
-}
-#endif
-
-
/*
Unlink a block from the chain of dirty/clean blocks
*/
@@ -1273,6 +1137,7 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block,
PAGECACHE_BLOCK_LINK *ins;
PAGECACHE_BLOCK_LINK **ptr_ins;
+ BLOCK_INFO(block);
KEYCACHE_DBUG_ASSERT(! (block->hash_link && block->hash_link->requests));
#ifdef THREAD
if (!hot && pagecache->waiting_for_block.last_thread)
@@ -1297,7 +1162,7 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block,
{
KEYCACHE_DBUG_PRINT("link_block: signal", ("thread %ld", thread->id));
pagecache_pthread_cond_signal(&thread->suspend);
- unlink_from_queue(&pagecache->waiting_for_block, thread);
+ wqueue_unlink_from_queue(&pagecache->waiting_for_block, thread);
block->requests++;
}
}
@@ -1363,6 +1228,8 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block,
static void unlink_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block)
{
+ DBUG_ENTER("unlink_block");
+ DBUG_PRINT("unlink_block", ("unlink 0x%lx", (ulong)block));
if (block->next_used == block)
/* The list contains only one member */
pagecache->used_last= pagecache->used_ins= NULL;
@@ -1381,14 +1248,15 @@ static void unlink_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block)
KEYCACHE_THREAD_TRACE("unlink_block");
#if defined(PAGECACHE_DEBUG)
+ KEYCACHE_DBUG_ASSERT(pagecache->blocks_available != 0);
pagecache->blocks_available--;
KEYCACHE_DBUG_PRINT("unlink_block",
("unlinked block 0x%lx (%u) status=%x #requests=%u #available=%u",
(ulong)block, BLOCK_NUMBER(pagecache, block), block->status,
block->requests, pagecache->blocks_available));
BLOCK_INFO(block);
- KEYCACHE_DBUG_ASSERT(pagecache->blocks_available >= 0);
#endif
+ DBUG_VOID_RETURN;
}
@@ -1591,7 +1459,7 @@ static void unlink_hash(PAGECACHE *pagecache, PAGECACHE_HASH_LINK *hash_link)
{
KEYCACHE_DBUG_PRINT("unlink_hash: signal", ("thread %ld", thread->id));
pagecache_pthread_cond_signal(&thread->suspend);
- unlink_from_queue(&pagecache->waiting_for_hash_link, thread);
+ wqueue_unlink_from_queue(&pagecache->waiting_for_hash_link, thread);
}
}
while (thread != last_thread);
@@ -1618,7 +1486,7 @@ static void unlink_hash(PAGECACHE *pagecache, PAGECACHE_HASH_LINK *hash_link)
pagecache Pagecache reference
file file ID
pageno page number in the file
- start where to put pointer to found hash link (for
+ start where to put pointer to found hash bucket (for
direct referring it)
RETURN
@@ -1627,7 +1495,7 @@ static void unlink_hash(PAGECACHE *pagecache, PAGECACHE_HASH_LINK *hash_link)
static PAGECACHE_HASH_LINK *get_present_hash_link(PAGECACHE *pagecache,
PAGECACHE_FILE *file,
- maria_page_no_t pageno,
+ pgcache_page_no_t pageno,
PAGECACHE_HASH_LINK ***start)
{
reg1 PAGECACHE_HASH_LINK *hash_link;
@@ -1670,6 +1538,12 @@ static PAGECACHE_HASH_LINK *get_present_hash_link(PAGECACHE *pagecache,
KEYCACHE_DBUG_ASSERT(cnt <= pagecache->hash_links_used);
#endif
}
+ if (hash_link)
+ {
+ /* Register the request for the page */
+ hash_link->requests++;
+ }
+
DBUG_RETURN(hash_link);
}
@@ -1680,7 +1554,7 @@ static PAGECACHE_HASH_LINK *get_present_hash_link(PAGECACHE *pagecache,
static PAGECACHE_HASH_LINK *get_hash_link(PAGECACHE *pagecache,
PAGECACHE_FILE *file,
- maria_page_no_t pageno)
+ pgcache_page_no_t pageno)
{
reg1 PAGECACHE_HASH_LINK *hash_link;
PAGECACHE_HASH_LINK **start;
@@ -1693,7 +1567,7 @@ restart:
/* try to find the page in the cache */
hash_link= get_present_hash_link(pagecache, file, pageno,
&start);
- if (! hash_link)
+ if (!hash_link)
{
/* There is no hash link in the hash table for the pair (file, pageno) */
if (pagecache->free_hash_list)
@@ -1714,7 +1588,7 @@ restart:
page.file= *file;
page.pageno= pageno;
thread->opt_info= (void *) &page;
- link_into_queue(&pagecache->waiting_for_hash_link, thread);
+ wqueue_link_into_queue(&pagecache->waiting_for_hash_link, thread);
KEYCACHE_DBUG_PRINT("get_hash_link: wait",
("suspend thread %ld", thread->id));
pagecache_pthread_cond_wait(&thread->suspend,
@@ -1723,14 +1597,15 @@ restart:
#else
KEYCACHE_DBUG_ASSERT(0);
#endif
+ DBUG_PRINT("info", ("restarting..."));
goto restart;
}
hash_link->file= *file;
hash_link->pageno= pageno;
link_hash(start, hash_link);
+ /* Register the request for the page */
+ hash_link->requests++;
}
- /* Register the request for the page */
- hash_link->requests++;
return hash_link;
}
@@ -1743,7 +1618,7 @@ restart:
SYNOPSIS
- find_key_block()
+ find_block()
pagecache pointer to a page cache data structure
file handler for the file to read page from
pageno number of the page in the file
@@ -1773,29 +1648,29 @@ restart:
waits until first of this operations links any block back.
*/
-static PAGECACHE_BLOCK_LINK *find_key_block(PAGECACHE *pagecache,
- PAGECACHE_FILE *file,
- maria_page_no_t pageno,
- int init_hits_left,
- my_bool wrmode,
- my_bool reg_req,
- int *page_st)
+static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache,
+ PAGECACHE_FILE *file,
+ pgcache_page_no_t pageno,
+ int init_hits_left,
+ my_bool wrmode,
+ my_bool reg_req,
+ int *page_st)
{
PAGECACHE_HASH_LINK *hash_link;
PAGECACHE_BLOCK_LINK *block;
int error= 0;
int page_status;
- DBUG_ENTER("find_key_block");
- KEYCACHE_THREAD_TRACE("find_key_block:begin");
+ DBUG_ENTER("find_block");
+ KEYCACHE_THREAD_TRACE("find_block:begin");
DBUG_PRINT("enter", ("fd: %u pos %lu wrmode: %lu",
(uint) file->file, (ulong) pageno, (uint) wrmode));
- KEYCACHE_DBUG_PRINT("find_key_block", ("fd: %u pos: %lu wrmode: %lu",
- (uint) file->file, (ulong) pageno,
- (uint) wrmode));
+ KEYCACHE_DBUG_PRINT("find_block", ("fd: %u pos: %lu wrmode: %lu",
+ (uint) file->file, (ulong) pageno,
+ (uint) wrmode));
#if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
DBUG_EXECUTE("check_pagecache",
- test_key_cache(pagecache, "start of find_key_block", 0););
+ test_key_cache(pagecache, "start of find_block", 0););
#endif
restart:
@@ -1840,10 +1715,10 @@ restart:
{
#ifdef THREAD
struct st_my_thread_var *thread= my_thread_var;
- add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
+ wqueue_add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
do
{
- KEYCACHE_DBUG_PRINT("find_key_block: wait",
+ KEYCACHE_DBUG_PRINT("find_block: wait",
("suspend thread %ld", thread->id));
pagecache_pthread_cond_wait(&thread->suspend,
&pagecache->cache_lock);
@@ -1871,7 +1746,7 @@ restart:
{
/* This is a request for a page to be removed from cache */
- KEYCACHE_DBUG_PRINT("find_key_block",
+ KEYCACHE_DBUG_PRINT("find_block",
("request for old page in block %u "
"wrmode: %d block->status: %d",
BLOCK_NUMBER(pagecache, block), wrmode,
@@ -1888,17 +1763,17 @@ restart:
else
{
hash_link->requests--;
- KEYCACHE_DBUG_PRINT("find_key_block",
+ KEYCACHE_DBUG_PRINT("find_block",
("request waiting for old page to be saved"));
{
#ifdef THREAD
struct st_my_thread_var *thread= my_thread_var;
/* Put the request into the queue of those waiting for the old page */
- add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
+ wqueue_add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
/* Wait until the request can be resubmitted */
do
{
- KEYCACHE_DBUG_PRINT("find_key_block: wait",
+ KEYCACHE_DBUG_PRINT("find_block: wait",
("suspend thread %ld", thread->id));
pagecache_pthread_cond_wait(&thread->suspend,
&pagecache->cache_lock);
@@ -1909,11 +1784,13 @@ restart:
/* No parallel requests in single-threaded case */
#endif
}
- KEYCACHE_DBUG_PRINT("find_key_block",
+ KEYCACHE_DBUG_PRINT("find_block",
("request for old page resubmitted"));
+ DBUG_PRINT("info", ("restarting..."));
/* Resubmit the request */
goto restart;
}
+ block->status&= ~BLOCK_IN_SWITCH;
}
else
{
@@ -1941,7 +1818,8 @@ restart:
pagecache->blocks_used++;
}
pagecache->blocks_unused--;
- DBUG_ASSERT((block->status & BLOCK_WRLOCK) == 0);
+ DBUG_ASSERT((block->status & BLOCK_WRLOCK));
+ DBUG_ASSERT(block->pins > 0);
block->status= 0;
#ifndef DBUG_OFF
block->type= PAGECACHE_EMPTY_PAGE;
@@ -1954,7 +1832,9 @@ restart:
block->hash_link= hash_link;
hash_link->block= block;
page_status= PAGE_TO_BE_READ;
- KEYCACHE_DBUG_PRINT("find_key_block",
+ DBUG_PRINT("info", ("page to be read set for page 0x%lx",
+ (ulong)block));
+ KEYCACHE_DBUG_PRINT("find_block",
("got free or never used block %u",
BLOCK_NUMBER(pagecache, block)));
}
@@ -1973,10 +1853,10 @@ restart:
{
struct st_my_thread_var *thread= my_thread_var;
thread->opt_info= (void *) hash_link;
- link_into_queue(&pagecache->waiting_for_block, thread);
+ wqueue_link_into_queue(&pagecache->waiting_for_block, thread);
do
{
- KEYCACHE_DBUG_PRINT("find_key_block: wait",
+ KEYCACHE_DBUG_PRINT("find_block: wait",
("suspend thread %ld", thread->id));
pagecache_pthread_cond_wait(&thread->suspend,
&pagecache->cache_lock);
@@ -2001,19 +1881,18 @@ restart:
reg_requests(pagecache, block,1);
hash_link->block= block;
}
- else
- {
- DBUG_ASSERT((block->status & BLOCK_WRLOCK) == 0);
- }
+ DBUG_ASSERT((block->status & BLOCK_WRLOCK) == 0);
+ DBUG_ASSERT(block->pins > 0);
if (block->hash_link != hash_link &&
! (block->status & BLOCK_IN_SWITCH) )
{
/* this is a primary request for a new page */
DBUG_ASSERT((block->status & BLOCK_WRLOCK) == 0);
+ DBUG_ASSERT(block->pins > 0);
block->status|= (BLOCK_IN_SWITCH | BLOCK_WRLOCK);
- KEYCACHE_DBUG_PRINT("find_key_block",
+ KEYCACHE_DBUG_PRINT("find_block",
("got block %u for new page",
BLOCK_NUMBER(pagecache, block)));
@@ -2021,7 +1900,7 @@ restart:
{
/* The block contains a dirty page - push it out of the cache */
- KEYCACHE_DBUG_PRINT("find_key_block", ("block is dirty"));
+ KEYCACHE_DBUG_PRINT("find_block", ("block is dirty"));
pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
/*
@@ -2054,7 +1933,7 @@ restart:
unlink_hash(pagecache, block->hash_link);
/* All pending requests for this page must be resubmitted */
if (block->wqueue[COND_FOR_SAVED].last_thread)
- release_queue(&block->wqueue[COND_FOR_SAVED]);
+ wqueue_release_queue(&block->wqueue[COND_FOR_SAVED]);
}
link_to_file_list(pagecache, block, file,
(my_bool)(block->hash_link ? 1 : 0));
@@ -2065,6 +1944,8 @@ restart:
#endif
block->hash_link= hash_link;
page_status= PAGE_TO_BE_READ;
+ DBUG_PRINT("info", ("page to be read set for page 0x%lx",
+ (ulong)block));
KEYCACHE_DBUG_ASSERT(block->hash_link->block == block);
KEYCACHE_DBUG_ASSERT(hash_link->block->hash_link == hash_link);
@@ -2072,7 +1953,7 @@ restart:
else
{
/* This is for secondary requests for a new page only */
- KEYCACHE_DBUG_PRINT("find_key_block",
+ KEYCACHE_DBUG_PRINT("find_block",
("block->hash_link: %p hash_link: %p "
"block->status: %u", block->hash_link,
hash_link, block->status ));
@@ -2087,7 +1968,7 @@ restart:
{
if (reg_req)
reg_requests(pagecache, block, 1);
- KEYCACHE_DBUG_PRINT("find_key_block",
+ KEYCACHE_DBUG_PRINT("find_block",
("block->hash_link: %p hash_link: %p "
"block->status: %u", block->hash_link,
hash_link, block->status ));
@@ -2098,12 +1979,12 @@ restart:
}
KEYCACHE_DBUG_ASSERT(page_status != -1);
- *page_st=page_status;
+ *page_st= page_status;
DBUG_PRINT("info",
("block: 0x%lx fd: %u pos %lu block->status %u page_status %lu",
(ulong) block, (uint) file->file,
(ulong) pageno, block->status, (uint) page_status));
- KEYCACHE_DBUG_PRINT("find_key_block",
+ KEYCACHE_DBUG_PRINT("find_block",
("block: 0x%lx fd: %u pos %lu block->status %u page_status %lu",
(ulong) block,
(uint) file->file, (ulong) pageno, block->status,
@@ -2111,16 +1992,16 @@ restart:
#if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
DBUG_EXECUTE("check_pagecache",
- test_key_cache(pagecache, "end of find_key_block",0););
+ test_key_cache(pagecache, "end of find_block",0););
#endif
- KEYCACHE_THREAD_TRACE("find_key_block:end");
+ KEYCACHE_THREAD_TRACE("find_block:end");
DBUG_RETURN(block);
}
-void pagecache_add_pin(PAGECACHE_BLOCK_LINK *block)
+static void add_pin(PAGECACHE_BLOCK_LINK *block)
{
- DBUG_ENTER("pagecache_add_pin");
+ DBUG_ENTER("add_pin");
DBUG_PRINT("enter", ("block 0x%lx pins: %u",
(ulong) block,
block->pins));
@@ -2137,9 +2018,9 @@ void pagecache_add_pin(PAGECACHE_BLOCK_LINK *block)
DBUG_VOID_RETURN;
}
-void pagecache_remove_pin(PAGECACHE_BLOCK_LINK *block)
+static void remove_pin(PAGECACHE_BLOCK_LINK *block)
{
- DBUG_ENTER("pagecache_remove_pin");
+ DBUG_ENTER("remove_pin");
DBUG_PRINT("enter", ("block 0x%lx pins: %u",
(ulong) block,
block->pins));
@@ -2157,7 +2038,7 @@ void pagecache_remove_pin(PAGECACHE_BLOCK_LINK *block)
DBUG_VOID_RETURN;
}
#ifdef PAGECACHE_DEBUG
-void pagecache_add_lock(PAGECACHE_BLOCK_LINK *block, my_bool wl)
+static void info_add_lock(PAGECACHE_BLOCK_LINK *block, my_bool wl)
{
PAGECACHE_LOCK_INFO *info=
(PAGECACHE_LOCK_INFO *)my_malloc(sizeof(PAGECACHE_LOCK_INFO), MYF(0));
@@ -2166,7 +2047,7 @@ void pagecache_add_lock(PAGECACHE_BLOCK_LINK *block, my_bool wl)
info_link((PAGECACHE_PIN_INFO **)&block->lock_list,
(PAGECACHE_PIN_INFO *)info);
}
-void pagecache_remove_lock(PAGECACHE_BLOCK_LINK *block)
+static void info_remove_lock(PAGECACHE_BLOCK_LINK *block)
{
PAGECACHE_LOCK_INFO *info=
(PAGECACHE_LOCK_INFO *)info_find((PAGECACHE_PIN_INFO *)block->lock_list,
@@ -2175,7 +2056,7 @@ void pagecache_remove_lock(PAGECACHE_BLOCK_LINK *block)
info_unlink((PAGECACHE_PIN_INFO *)info);
my_free((gptr)info, MYF(0));
}
-void pagecache_change_lock(PAGECACHE_BLOCK_LINK *block, my_bool wl)
+static void info_change_lock(PAGECACHE_BLOCK_LINK *block, my_bool wl)
{
PAGECACHE_LOCK_INFO *info=
(PAGECACHE_LOCK_INFO *)info_find((PAGECACHE_PIN_INFO *)block->lock_list,
@@ -2184,40 +2065,47 @@ void pagecache_change_lock(PAGECACHE_BLOCK_LINK *block, my_bool wl)
info->write_lock= wl;
}
#else
-#define pagecache_add_lock(B,W)
-#define pagecache_remove_lock(B)
-#define pagecache_change_lock(B,W)
+#define info_add_lock(B,W)
+#define info_remove_lock(B)
+#define info_change_lock(B,W)
#endif
/*
- Put on the block "update" type lock
+ Put on the block write lock
SYNOPSIS
- pagecache_lock_block()
+ get_wrlock()
pagecache pointer to a page cache data structure
block the block to work with
RETURN
0 - OK
- 1 - Try to lock the block failed
+ 1 - Can't lock this block, need retry
*/
-my_bool pagecache_lock_block(PAGECACHE *pagecache,
- PAGECACHE_BLOCK_LINK *block)
-{
- DBUG_ENTER("pagecache_lock_block");
+static my_bool get_wrlock(PAGECACHE *pagecache,
+ PAGECACHE_BLOCK_LINK *block)
+{
+ PAGECACHE_FILE file= block->hash_link->file;
+ pgcache_page_no_t pageno= block->hash_link->pageno;
+ DBUG_ENTER("get_wrlock");
+ DBUG_PRINT("info", ("the block 0x%lx "
+ "files %d(%d) pages %d(%d)",
+ (ulong)block,
+ file.file, block->hash_link->file.file,
+ pageno, block->hash_link->pageno));
BLOCK_INFO(block);
while (block->status & BLOCK_WRLOCK)
{
- DBUG_PRINT("info", ("fail to lock, waiting..."));
+ DBUG_PRINT("info", ("fail to lock, waiting... 0x%lx", (ulong)block));
/* Lock failed we will wait */
#ifdef THREAD
struct st_my_thread_var *thread= my_thread_var;
- add_to_queue(&block->wqueue[COND_FOR_WRLOCK], thread);
+ wqueue_add_to_queue(&block->wqueue[COND_FOR_WRLOCK], thread);
dec_counter_for_resize_op(pagecache);
do
{
- KEYCACHE_DBUG_PRINT("pagecache_lock_block: wait",
+ KEYCACHE_DBUG_PRINT("get_wrlock: wait",
("suspend thread %ld", thread->id));
pagecache_pthread_cond_wait(&thread->suspend,
&pagecache->cache_lock);
@@ -2227,35 +2115,61 @@ my_bool pagecache_lock_block(PAGECACHE *pagecache,
DBUG_ASSERT(0);
#endif
BLOCK_INFO(block);
- DBUG_RETURN(1);
+ if ((block->status & (BLOCK_REASSIGNED | BLOCK_IN_SWITCH)) ||
+ file.file != block->hash_link->file.file ||
+ pageno != block->hash_link->pageno)
+ {
+ DBUG_PRINT("info", ("the block 0x%lx changed => need retry"
+ "status %x files %d != %d or pages %d !=%d",
+ (ulong)block, block->status,
+ file.file, block->hash_link->file.file,
+ pageno, block->hash_link->pageno));
+ DBUG_RETURN(1);
+ }
}
- /* we are doing it by global cache mutex protectio, so it is OK */
+ DBUG_ASSERT(block->pins == 0);
+ /* we are doing it by global cache mutex protection, so it is OK */
block->status|= BLOCK_WRLOCK;
DBUG_PRINT("info", ("WR lock set, block 0x%lx", (ulong)block));
DBUG_RETURN(0);
}
-void pagecache_unlock_block(PAGECACHE_BLOCK_LINK *block)
+
+/*
+ Remove write lock from the block
+
+ SYNOPSIS
+ release_wrlock()
+ pagecache pointer to a page cache data structure
+ block the block to work with
+
+ RETURN
+ 0 - OK
+*/
+
+static void release_wrlock(PAGECACHE_BLOCK_LINK *block)
{
- DBUG_ENTER("pagecache_unlock_block");
+ DBUG_ENTER("release_wrlock");
BLOCK_INFO(block);
DBUG_ASSERT(block->status & BLOCK_WRLOCK);
+ DBUG_ASSERT(block->pins > 0);
block->status&= ~BLOCK_WRLOCK;
DBUG_PRINT("info", ("WR lock reset, block 0x%lx", (ulong)block));
#ifdef THREAD
/* release all threads waiting for write lock */
if (block->wqueue[COND_FOR_WRLOCK].last_thread)
- release_queue(&block->wqueue[COND_FOR_WRLOCK]);
+ wqueue_release_queue(&block->wqueue[COND_FOR_WRLOCK]);
#endif
BLOCK_INFO(block);
DBUG_VOID_RETURN;
}
+
/*
- Try to lock/uplock and pin/unpin the block
+ Try to lock/unlock and pin/unpin the block
SYNOPSIS
- pagecache_make_lock_and_pin()
+ make_lock_and_pin()
pagecache pointer to a page cache data structure
block the block to work with
lock lock change mode
@@ -2266,12 +2180,12 @@ void pagecache_unlock_block(PAGECACHE_BLOCK_LINK *block)
1 - Try to lock the block failed
*/
-my_bool pagecache_make_lock_and_pin(PAGECACHE *pagecache,
- PAGECACHE_BLOCK_LINK *block,
- enum pagecache_page_lock lock,
- enum pagecache_page_pin pin)
+static my_bool make_lock_and_pin(PAGECACHE *pagecache,
+ PAGECACHE_BLOCK_LINK *block,
+ enum pagecache_page_lock lock,
+ enum pagecache_page_pin pin)
{
- DBUG_ENTER("pagecache_make_lock_and_pin");
+ DBUG_ENTER("make_lock_and_pin");
DBUG_PRINT("enter", ("block: 0x%lx (%u), wrlock: %c pins: %u, lock %s, pin: %s",
(ulong)block, BLOCK_NUMBER(pagecache, block),
((block->status & BLOCK_WRLOCK)?'Y':'N'),
@@ -2287,53 +2201,47 @@ my_bool pagecache_make_lock_and_pin(PAGECACHE *pagecache,
{
case PAGECACHE_LOCK_WRITE: /* free -> write */
/* Writelock and pin the buffer */
- if (pagecache_lock_block(pagecache, block))
+ if (get_wrlock(pagecache, block))
{
- DBUG_PRINT("info", ("restart"));
- /* in case of fail pagecache_lock_block unlock cache */
- DBUG_RETURN(1);
+ /* can't lock => need retry */
+ goto retry;
}
- /* The cache is locked so nothing afraid off */
- pagecache_add_pin(block);
- pagecache_add_lock(block, 1);
+
+ /* The cache is locked so nothing afraid of */
+ add_pin(block);
+ info_add_lock(block, 1);
break;
case PAGECACHE_LOCK_WRITE_TO_READ: /* write -> read */
case PAGECACHE_LOCK_WRITE_UNLOCK: /* write -> free */
/*
- Removes writelog and puts read lock (which is nothing in our
+ Removes write lock and puts read lock (which is nothing in our
implementation)
*/
- pagecache_unlock_block(block);
+ release_wrlock(block);
case PAGECACHE_LOCK_READ_UNLOCK: /* read -> free */
case PAGECACHE_LOCK_LEFT_READLOCKED: /* read -> read */
-#ifndef DBUG_OFF
if (pin == PAGECACHE_UNPIN)
{
- pagecache_remove_pin(block);
+ remove_pin(block);
}
-#endif
-#ifdef PAGECACHE_DEBUG
if (lock == PAGECACHE_LOCK_WRITE_TO_READ)
{
- pagecache_change_lock(block, 0);
+ info_change_lock(block, 0);
}
else if (lock == PAGECACHE_LOCK_WRITE_UNLOCK ||
lock == PAGECACHE_LOCK_READ_UNLOCK)
{
- pagecache_remove_lock(block);
+ info_remove_lock(block);
}
-#endif
break;
case PAGECACHE_LOCK_READ: /* free -> read */
-#ifndef DBUG_OFF
if (pin == PAGECACHE_PIN)
{
/* The cache is locked so nothing afraid off */
- pagecache_add_pin(block);
+ add_pin(block);
}
- pagecache_add_lock(block, 0);
+ info_add_lock(block, 0);
break;
-#endif
case PAGECACHE_LOCK_LEFT_UNLOCKED: /* free -> free */
case PAGECACHE_LOCK_LEFT_WRITELOCKED: /* write -> write */
break; /* do nothing */
@@ -2343,6 +2251,16 @@ my_bool pagecache_make_lock_and_pin(PAGECACHE *pagecache,
BLOCK_INFO(block);
DBUG_RETURN(0);
+retry:
+ DBUG_PRINT("INFO", ("Retry block 0x%lx", (ulong)block));
+ BLOCK_INFO(block);
+ DBUG_ASSERT(block->hash_link->requests != 0);
+ block->hash_link->requests--;
+ DBUG_ASSERT(block->requests != 0);
+ unreg_request(pagecache, block, 1);
+ BLOCK_INFO(block);
+ DBUG_RETURN(1);
+
}
@@ -2355,6 +2273,8 @@ my_bool pagecache_make_lock_and_pin(PAGECACHE *pagecache,
pagecache pointer to a page cache data structure
block block to which buffer the data is to be read
primary <-> the current thread will read the data
+ validator validator of read from the disk data
+ validator_data pointer to the data need by the validator
RETURN VALUE
None
@@ -2368,13 +2288,15 @@ my_bool pagecache_make_lock_and_pin(PAGECACHE *pagecache,
static void read_block(PAGECACHE *pagecache,
PAGECACHE_BLOCK_LINK *block,
- my_bool primary)
+ my_bool primary,
+ pagecache_disk_read_validator validator,
+ gptr validator_data)
{
uint got_length;
/* On entry cache_lock is locked */
- KEYCACHE_THREAD_TRACE("read_block");
+ DBUG_ENTER("read_block");
if (primary)
{
/*
@@ -2382,8 +2304,8 @@ static void read_block(PAGECACHE *pagecache,
that submitted primary requests
*/
- KEYCACHE_DBUG_PRINT("read_block",
- ("page to be read by primary request"));
+ DBUG_PRINT("read_block",
+ ("page to be read by primary request"));
/* Page is not in buffer yet, is to be read from disk */
pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
@@ -2400,11 +2322,15 @@ static void read_block(PAGECACHE *pagecache,
else
block->status= (BLOCK_READ | (block->status & BLOCK_WRLOCK));
- KEYCACHE_DBUG_PRINT("read_block",
- ("primary request: new page in cache"));
+ if (validator != NULL &&
+ (*validator)(block->buffer, validator_data))
+ block->status|= BLOCK_ERROR;
+
+ DBUG_PRINT("read_block",
+ ("primary request: new page in cache"));
/* Signal that all pending requests for this page now can be processed */
if (block->wqueue[COND_FOR_REQUESTED].last_thread)
- release_queue(&block->wqueue[COND_FOR_REQUESTED]);
+ wqueue_release_queue(&block->wqueue[COND_FOR_REQUESTED]);
}
else
{
@@ -2412,17 +2338,17 @@ static void read_block(PAGECACHE *pagecache,
This code is executed only by threads
that submitted secondary requests
*/
- KEYCACHE_DBUG_PRINT("read_block",
- ("secondary request waiting for new page to be read"));
+ DBUG_PRINT("read_block",
+ ("secondary request waiting for new page to be read"));
{
#ifdef THREAD
struct st_my_thread_var *thread= my_thread_var;
/* Put the request into a queue and wait until it can be processed */
- add_to_queue(&block->wqueue[COND_FOR_REQUESTED], thread);
+ wqueue_add_to_queue(&block->wqueue[COND_FOR_REQUESTED], thread);
do
{
- KEYCACHE_DBUG_PRINT("read_block: wait",
- ("suspend thread %ld", thread->id));
+ DBUG_PRINT("read_block: wait",
+ ("suspend thread %ld", thread->id));
pagecache_pthread_cond_wait(&thread->suspend,
&pagecache->cache_lock);
}
@@ -2432,9 +2358,10 @@ static void read_block(PAGECACHE *pagecache,
/* No parallel requests in single-threaded case */
#endif
}
- KEYCACHE_DBUG_PRINT("read_block",
- ("secondary request: new page in cache"));
+ DBUG_PRINT("read_block",
+ ("secondary request: new page in cache"));
}
+ DBUG_VOID_RETURN;
}
@@ -2454,11 +2381,11 @@ static void read_block(PAGECACHE *pagecache,
void pagecache_unlock_page(PAGECACHE *pagecache,
PAGECACHE_FILE *file,
- maria_page_no_t pageno,
+ pgcache_page_no_t pageno,
enum pagecache_page_lock lock,
enum pagecache_page_pin pin,
my_bool stamp_this_page,
- LSN first_REDO_LSN_for_page)
+ LSN_PTR first_REDO_LSN_for_page)
{
PAGECACHE_BLOCK_LINK *block;
int page_st;
@@ -2471,24 +2398,6 @@ void pagecache_unlock_page(PAGECACHE *pagecache,
DBUG_ASSERT(pin != PAGECACHE_PIN &&
lock != PAGECACHE_LOCK_READ &&
lock != PAGECACHE_LOCK_WRITE);
- if (pin == PAGECACHE_PIN_LEFT_UNPINNED &&
- lock == PAGECACHE_LOCK_READ_UNLOCK)
- {
-#ifndef DBUG_OFF
- if (
-#endif
- /* block do not need here so we do not provide it */
- pagecache_make_lock_and_pin(pagecache, 0, lock, pin)
-#ifndef DBUG_OFF
- )
- {
- DBUG_ASSERT(0); /* should not happend */
- }
-#else
- ;
-#endif
- DBUG_VOID_RETURN;
- }
pagecache_pthread_mutex_lock(&pagecache->cache_lock);
/*
@@ -2498,7 +2407,7 @@ void pagecache_unlock_page(PAGECACHE *pagecache,
DBUG_ASSERT(pagecache->can_be_used);
inc_counter_for_resize_op(pagecache);
- block= find_key_block(pagecache, file, pageno, 0, 0, 0, &page_st);
+ block= find_block(pagecache, file, pageno, 0, 0, 0, &page_st);
BLOCK_INFO(block);
DBUG_ASSERT(block != 0 && page_st == PAGE_READ);
if (stamp_this_page)
@@ -2511,7 +2420,7 @@ void pagecache_unlock_page(PAGECACHE *pagecache,
#ifndef DBUG_OFF
if (
#endif
- pagecache_make_lock_and_pin(pagecache, block, lock, pin)
+ make_lock_and_pin(pagecache, block, lock, pin)
#ifndef DBUG_OFF
)
{
@@ -2549,7 +2458,7 @@ void pagecache_unlock_page(PAGECACHE *pagecache,
void pagecache_unpin_page(PAGECACHE *pagecache,
PAGECACHE_FILE *file,
- maria_page_no_t pageno)
+ pgcache_page_no_t pageno)
{
PAGECACHE_BLOCK_LINK *block;
int page_st;
@@ -2565,7 +2474,7 @@ void pagecache_unpin_page(PAGECACHE *pagecache,
DBUG_ASSERT(pagecache->can_be_used);
inc_counter_for_resize_op(pagecache);
- block= find_key_block(pagecache, file, pageno, 0, 0, 0, &page_st);
+ block= find_block(pagecache, file, pageno, 0, 0, 0, &page_st);
DBUG_ASSERT(block != 0 && page_st == PAGE_READ);
#ifndef DBUG_OFF
@@ -2576,9 +2485,9 @@ void pagecache_unpin_page(PAGECACHE *pagecache,
a) we can't pin without any lock
b) we can't unpin keeping write lock
*/
- pagecache_make_lock_and_pin(pagecache, block,
- PAGECACHE_LOCK_LEFT_READLOCKED,
- PAGECACHE_UNPIN)
+ make_lock_and_pin(pagecache, block,
+ PAGECACHE_LOCK_LEFT_READLOCKED,
+ PAGECACHE_UNPIN)
#ifndef DBUG_OFF
)
{
@@ -2622,7 +2531,7 @@ void pagecache_unlock(PAGECACHE *pagecache,
enum pagecache_page_lock lock,
enum pagecache_page_pin pin,
my_bool stamp_this_page,
- LSN first_REDO_LSN_for_page)
+ LSN_PTR first_REDO_LSN_for_page)
{
PAGECACHE_BLOCK_LINK *block= (PAGECACHE_BLOCK_LINK *)link;
DBUG_ENTER("pagecache_unlock");
@@ -2643,7 +2552,7 @@ void pagecache_unlock(PAGECACHE *pagecache,
if (
#endif
/* block do not need here so we do not provide it */
- pagecache_make_lock_and_pin(pagecache, 0, lock, pin)
+ make_lock_and_pin(pagecache, 0, lock, pin)
#ifndef DBUG_OFF
)
{
@@ -2673,7 +2582,7 @@ void pagecache_unlock(PAGECACHE *pagecache,
#ifndef DBUG_OFF
if (
#endif
- pagecache_make_lock_and_pin(pagecache, block, lock, pin)
+ make_lock_and_pin(pagecache, block, lock, pin)
#ifndef DBUG_OFF
)
{
@@ -2736,9 +2645,9 @@ void pagecache_unpin(PAGECACHE *pagecache,
a) we can't pin without any lock
b) we can't unpin keeping write lock
*/
- pagecache_make_lock_and_pin(pagecache, block,
- PAGECACHE_LOCK_LEFT_READLOCKED,
- PAGECACHE_UNPIN)
+ make_lock_and_pin(pagecache, block,
+ PAGECACHE_LOCK_LEFT_READLOCKED,
+ PAGECACHE_UNPIN)
#ifndef DBUG_OFF
)
{
@@ -2767,7 +2676,7 @@ void pagecache_unpin(PAGECACHE *pagecache,
Read a block of data from a cached file into a buffer;
SYNOPSIS
- pagecache_read()
+ pagecache_valid_read()
pagecache pointer to a page cache data structure
file handler for the file for the block of data to be read
pageno number of the block of data in the file
@@ -2776,16 +2685,12 @@ void pagecache_unpin(PAGECACHE *pagecache,
type type of the page
lock lock change
link link to the page if we pin it
+ validator validator of read from the disk data
+ validator_data pointer to the data need by the validator
RETURN VALUE
Returns address from where the data is placed if sucessful, 0 - otherwise.
- NOTES.
-
- The function ensures that a block of data of size length from file
- positioned at pageno is in the buffers for some key cache blocks.
- Then the function copies the data into the buffer buff.
-
Pin will be choosen according to lock parameter (see lock_to_pin)
*/
static enum pagecache_page_pin lock_to_pin[]=
@@ -2800,19 +2705,21 @@ static enum pagecache_page_pin lock_to_pin[]=
PAGECACHE_UNPIN /*PAGECACHE_LOCK_WRITE_TO_READ*/
};
-byte *pagecache_read(PAGECACHE *pagecache,
- PAGECACHE_FILE *file,
- maria_page_no_t pageno,
- uint level,
- byte *buff,
- enum pagecache_page_type type,
- enum pagecache_page_lock lock,
- PAGECACHE_PAGE_LINK *link)
+byte *pagecache_valid_read(PAGECACHE *pagecache,
+ PAGECACHE_FILE *file,
+ pgcache_page_no_t pageno,
+ uint level,
+ byte *buff,
+ enum pagecache_page_type type,
+ enum pagecache_page_lock lock,
+ PAGECACHE_PAGE_LINK *link,
+ pagecache_disk_read_validator validator,
+ gptr validator_data)
{
int error= 0;
enum pagecache_page_pin pin= lock_to_pin[lock];
PAGECACHE_PAGE_LINK fake_link;
- DBUG_ENTER("page_cache_read");
+ DBUG_ENTER("pagecache_valid_read");
DBUG_PRINT("enter", ("fd: %u page: %lu level: %u t:%s l%s p%s",
(uint) file->file, (ulong) pageno, level,
page_cache_page_type_str[type],
@@ -2829,7 +2736,7 @@ restart:
if (pagecache->can_be_used)
{
/* Key cache is used */
- reg1 PAGECACHE_BLOCK_LINK *block;
+ PAGECACHE_BLOCK_LINK *block;
uint status;
int page_st;
@@ -2842,29 +2749,33 @@ restart:
inc_counter_for_resize_op(pagecache);
pagecache->global_cache_r_requests++;
- block= find_key_block(pagecache, file, pageno, level,
- ((lock == PAGECACHE_LOCK_WRITE) ? 1 : 0),
- (((pin == PAGECACHE_PIN_LEFT_PINNED) ||
- (pin == PAGECACHE_UNPIN)) ? 0 : 1),
- &page_st);
+ block= find_block(pagecache, file, pageno, level,
+ test(lock == PAGECACHE_LOCK_WRITE),
+ test((pin == PAGECACHE_PIN_LEFT_PINNED) ||
+ (pin == PAGECACHE_UNPIN)),
+ &page_st);
DBUG_ASSERT(block->type == PAGECACHE_EMPTY_PAGE ||
block->type == type);
block->type= type;
- if (pagecache_make_lock_and_pin(pagecache, block, lock, pin))
+ if (block->status != BLOCK_ERROR && page_st != PAGE_READ)
+ {
+ DBUG_PRINT("info", ("read block 0x%lx", (ulong)block));
+ /* The requested page is to be read into the block buffer */
+ read_block(pagecache, block,
+ (my_bool)(page_st == PAGE_TO_BE_READ),
+ validator, validator_data);
+ DBUG_PRINT("info", ("read is done"));
+ }
+ if (make_lock_and_pin(pagecache, block, lock, pin))
{
/*
- We failed to write lock the block, cache is unlocked, and last write
- lock is released, we will try to get the block again.
+ We failed to write lock the block, cache is unlocked,
+ we will try to get the block again.
*/
pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+ DBUG_PRINT("info", ("restarting..."));
goto restart;
}
- if (block->status != BLOCK_ERROR && page_st != PAGE_READ)
- {
- /* The requested page is to be read into the block buffer */
- read_block(pagecache, block,
- (my_bool)(page_st == PAGE_TO_BE_READ));
- }
if (! ((status= block->status) & BLOCK_ERROR))
{
@@ -2933,7 +2844,7 @@ no_key_cache: /* Key cache is not used */
*/
my_bool pagecache_delete_page(PAGECACHE *pagecache,
PAGECACHE_FILE *file,
- maria_page_no_t pageno,
+ pgcache_page_no_t pageno,
enum pagecache_page_lock lock,
my_bool flush)
{
@@ -2969,13 +2880,14 @@ restart:
}
block= link->block;
DBUG_ASSERT(block != 0);
- if (pagecache_make_lock_and_pin(pagecache, block, lock, pin))
+ if (make_lock_and_pin(pagecache, block, lock, pin))
{
/*
We failed to writelock the block, cache is unlocked, and last write
lock is released, we will try to get the block again.
*/
pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+ DBUG_PRINT("info", ("restarting..."));
goto restart;
}
@@ -2983,7 +2895,7 @@ restart:
{
/* The block contains a dirty page - push it out of the cache */
- KEYCACHE_DBUG_PRINT("find_key_block", ("block is dirty"));
+ KEYCACHE_DBUG_PRINT("find_block", ("block is dirty"));
pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
/*
@@ -3015,9 +2927,10 @@ restart:
}
/* Cache is locked, so we can relese page before freeing it */
- pagecache_make_lock_and_pin(pagecache, block,
- PAGECACHE_LOCK_WRITE_UNLOCK,
- PAGECACHE_UNPIN);
+ make_lock_and_pin(pagecache, block,
+ PAGECACHE_LOCK_WRITE_UNLOCK,
+ PAGECACHE_UNPIN);
+ link->requests--;
if (pin == PAGECACHE_PIN_LEFT_PINNED)
unreg_request(pagecache, block, 1);
free_block(pagecache, block);
@@ -3053,11 +2966,12 @@ end:
0 if a success, 1 - otherwise.
*/
+/* description of how to change lock before and after write */
struct write_lock_change
{
- int need_lock_change;
- enum pagecache_page_lock new_lock;
- enum pagecache_page_lock unlock_lock;
+ int need_lock_change; /* need changing of lock at the end of write */
+ enum pagecache_page_lock new_lock; /* lock at the beginning */
+ enum pagecache_page_lock unlock_lock; /* lock at the end */
};
static struct write_lock_change write_lock_change_table[]=
@@ -3084,10 +2998,11 @@ static struct write_lock_change write_lock_change_table[]=
PAGECACHE_LOCK_WRITE_TO_READ}/*PAGECACHE_LOCK_WRITE_TO_READ*/
};
+/* description of how to change pin before and after write */
struct write_pin_change
{
- enum pagecache_page_pin new_pin;
- enum pagecache_page_pin unlock_pin;
+ enum pagecache_page_pin new_pin; /* pin status at the beginning */
+ enum pagecache_page_pin unlock_pin; /* pin status at the end */
};
static struct write_pin_change write_pin_change_table[]=
@@ -3104,7 +3019,7 @@ static struct write_pin_change write_pin_change_table[]=
my_bool pagecache_write(PAGECACHE *pagecache,
PAGECACHE_FILE *file,
- maria_page_no_t pageno,
+ pgcache_page_no_t pageno,
uint level,
byte *buff,
enum pagecache_page_type type,
@@ -3113,7 +3028,7 @@ my_bool pagecache_write(PAGECACHE *pagecache,
enum pagecache_write_mode write_mode,
PAGECACHE_PAGE_LINK *link)
{
- reg1 PAGECACHE_BLOCK_LINK *block;
+ reg1 PAGECACHE_BLOCK_LINK *block= NULL;
PAGECACHE_PAGE_LINK fake_link;
int error= 0;
int need_lock_change= write_lock_change_table[lock].need_lock_change;
@@ -3133,7 +3048,7 @@ my_bool pagecache_write(PAGECACHE *pagecache,
if (write_mode == PAGECACHE_WRITE_NOW)
{
- /* we allow direct write if wwe do not use long term lockings */
+ /* we allow direct write if we do not use long term lockings */
DBUG_ASSERT(lock == PAGECACHE_LOCK_LEFT_UNLOCKED);
/* Force writing from buff into disk */
pagecache->global_cache_write++;
@@ -3167,10 +3082,10 @@ restart:
lock != PAGECACHE_LOCK_LEFT_WRITELOCKED &&
lock != PAGECACHE_LOCK_WRITE_UNLOCK &&
lock != PAGECACHE_LOCK_WRITE_TO_READ);
- block= find_key_block(pagecache, file, pageno, level,
- (need_wrlock ? 1 : 0),
- (need_wrlock ? 1 : 0),
- &page_st);
+ block= find_block(pagecache, file, pageno, level,
+ (need_wrlock ? 1 : 0),
+ (need_wrlock ? 1 : 0),
+ &page_st);
}
if (!block)
{
@@ -3186,24 +3101,25 @@ restart:
block->type == type);
block->type= type;
- if (pagecache_make_lock_and_pin(pagecache, block,
- write_lock_change_table[lock].new_lock,
- (need_lock_change ?
- write_pin_change_table[pin].new_pin :
- pin)))
+ if (make_lock_and_pin(pagecache, block,
+ write_lock_change_table[lock].new_lock,
+ (need_lock_change ?
+ write_pin_change_table[pin].new_pin :
+ pin)))
{
/*
We failed to writelock the block, cache is unlocked, and last write
lock is released, we will try to get the block again.
*/
pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
+ DBUG_PRINT("info", ("restarting..."));
goto restart;
}
if (write_mode == PAGECACHE_WRITE_DONE)
{
- if (block->status != BLOCK_ERROR && page_st != PAGE_READ)
+ if ((block->status & BLOCK_ERROR) && page_st != PAGE_READ)
{
/* Copy data from buff */
bmove512(block->buffer, buff, pagecache->block_size);
@@ -3212,7 +3128,7 @@ restart:
("primary request: new page in cache"));
/* Signal that all pending requests for this now can be processed. */
if (block->wqueue[COND_FOR_REQUESTED].last_thread)
- release_queue(&block->wqueue[COND_FOR_REQUESTED]);
+ wqueue_release_queue(&block->wqueue[COND_FOR_REQUESTED]);
}
}
else
@@ -3220,7 +3136,8 @@ restart:
if (write_mode == PAGECACHE_WRITE_NOW)
{
/* buff has been written to disk at start */
- if (block->status & BLOCK_CHANGED)
+ if ((block->status & BLOCK_CHANGED) &&
+ !(block->status & BLOCK_ERROR))
link_to_file_list(pagecache, block, &block->hash_link->file, 1);
}
else
@@ -3231,8 +3148,8 @@ restart:
if (! (block->status & BLOCK_ERROR))
{
bmove512(block->buffer, buff, pagecache->block_size);
+ block->status|= BLOCK_READ;
}
- block->status|= BLOCK_READ;
}
@@ -3242,9 +3159,9 @@ restart:
int rc=
#endif
#warning we are doing an unlock here, so need to give the page its rec_lsn!
- pagecache_make_lock_and_pin(pagecache, block,
- write_lock_change_table[lock].unlock_lock,
- write_pin_change_table[pin].unlock_pin);
+ make_lock_and_pin(pagecache, block,
+ write_lock_change_table[lock].unlock_lock,
+ write_pin_change_table[pin].unlock_pin);
#ifndef DBUG_OFF
DBUG_ASSERT(rc == 0);
#endif
@@ -3255,10 +3172,7 @@ restart:
block->hash_link->requests--;
if (pin != PAGECACHE_PIN_LEFT_PINNED && pin != PAGECACHE_PIN)
{
- if (write_mode != PAGECACHE_WRITE_DONE)
- {
- unreg_request(pagecache, block, 1);
- }
+ unreg_request(pagecache, block, 1);
}
else
*link= (PAGECACHE_PAGE_LINK)block;
@@ -3290,6 +3204,7 @@ end:
DBUG_EXECUTE("exec",
test_key_cache(pagecache, "end of key_cache_write", 1););
#endif
+ BLOCK_INFO(block);
DBUG_RETURN(error);
}
@@ -3321,6 +3236,7 @@ static void free_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block)
unlink_changed(block);
DBUG_ASSERT((block->status & BLOCK_WRLOCK) == 0);
+ DBUG_ASSERT(block->pins > 0);
block->status= 0;
#ifndef DBUG_OFF
block->type= PAGECACHE_EMPTY_PAGE;
@@ -3344,7 +3260,7 @@ static void free_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block)
/* All pending requests for this page must be resubmitted. */
if (block->wqueue[COND_FOR_SAVED].last_thread)
- release_queue(&block->wqueue[COND_FOR_SAVED]);
+ wqueue_release_queue(&block->wqueue[COND_FOR_SAVED]);
}
@@ -3398,12 +3314,13 @@ static int flush_cached_blocks(PAGECACHE *pagecache,
}
/* if the block is not pinned then it is not write locked */
DBUG_ASSERT((block->status & BLOCK_WRLOCK) == 0);
+ DBUG_ASSERT(block->pins > 0);
#ifndef DBUG_OFF
{
int rc=
#endif
- pagecache_make_lock_and_pin(pagecache, block,
- PAGECACHE_LOCK_WRITE, PAGECACHE_PIN);
+ make_lock_and_pin(pagecache, block,
+ PAGECACHE_LOCK_WRITE, PAGECACHE_PIN);
#ifndef DBUG_OFF
DBUG_ASSERT(rc == 0);
}
@@ -3427,9 +3344,9 @@ static int flush_cached_blocks(PAGECACHE *pagecache,
MYF(MY_NABP | MY_WAIT_IF_FULL));
pagecache_pthread_mutex_lock(&pagecache->cache_lock);
- pagecache_make_lock_and_pin(pagecache, block,
- PAGECACHE_LOCK_WRITE_UNLOCK,
- PAGECACHE_UNPIN);
+ make_lock_and_pin(pagecache, block,
+ PAGECACHE_LOCK_WRITE_UNLOCK,
+ PAGECACHE_UNPIN);
pagecache->global_cache_write++;
if (error)
@@ -3443,7 +3360,7 @@ static int flush_cached_blocks(PAGECACHE *pagecache,
It might happen only during an operation to resize the key cache.
*/
if (block->wqueue[COND_FOR_SAVED].last_thread)
- release_queue(&block->wqueue[COND_FOR_SAVED]);
+ wqueue_release_queue(&block->wqueue[COND_FOR_SAVED]);
/* type will never be FLUSH_IGNORE_CHANGED here */
if (! (type == FLUSH_KEEP || type == FLUSH_FORCE_WRITE))
{
@@ -3577,6 +3494,7 @@ restart:
if ((error= flush_cached_blocks(pagecache, file, cache,
end,type)))
last_errno=error;
+ DBUG_PRINT("info", ("restarting..."));
/*
Restart the scan as some other thread might have changed
the changed blocks chain: the blocks that were in switch
@@ -3622,7 +3540,7 @@ removes a page from the list of dirty pages, while it's still dirty. A \
{
#ifdef THREAD
struct st_my_thread_var *thread= my_thread_var;
- add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
+ wqueue_add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
do
{
KEYCACHE_DBUG_PRINT("flush_pagecache_blocks_int: wait",
@@ -3761,7 +3679,7 @@ static int flush_all_key_blocks(PAGECACHE *pagecache)
0 on success (always because it can't fail)
*/
-int reset_key_cache_counters(const char *name, PAGECACHE *key_cache)
+static int reset_key_cache_counters(const char *name, PAGECACHE *key_cache)
{
DBUG_ENTER("reset_key_cache_counters");
if (!key_cache->inited)
diff --git a/mysys/wqueue.c b/mysys/wqueue.c
new file mode 100644
index 00000000000..28e044ff606
--- /dev/null
+++ b/mysys/wqueue.c
@@ -0,0 +1,167 @@
+
+#include <wqueue.h>
+
+#define STRUCT_PTR(TYPE, MEMBER, a) \
+ (TYPE *) ((char *) (a) - offsetof(TYPE, MEMBER))
+/*
+ Link a thread into double-linked queue of waiting threads.
+
+ SYNOPSIS
+ wqueue_link_into_queue()
+ wqueue pointer to the queue structure
+ thread pointer to the thread to be added to the queue
+
+ RETURN VALUE
+ none
+
+ NOTES.
+ Queue is represented by a circular list of the thread structures
+ The list is double-linked of the type (**prev,*next), accessed by
+ a pointer to the last element.
+*/
+
+void wqueue_link_into_queue(WQUEUE *wqueue, struct st_my_thread_var *thread)
+{
+ struct st_my_thread_var *last;
+ if (!(last= wqueue->last_thread))
+ {
+ /* Queue is empty */
+ thread->next= thread;
+ thread->prev= &thread->next;
+ }
+ else
+ {
+ thread->prev= last->next->prev;
+ last->next->prev= &thread->next;
+ thread->next= last->next;
+ last->next= thread;
+ }
+ wqueue->last_thread= thread;
+}
+
+
+/*
+ Add a thread to single-linked queue of waiting threads
+
+ SYNOPSIS
+ wqueue_add_to_queue()
+ wqueue pointer to the queue structure
+ thread pointer to the thread to be added to the queue
+
+ RETURN VALUE
+ none
+
+ NOTES.
+ Queue is represented by a circular list of the thread structures
+ The list is single-linked of the type (*next), accessed by a pointer
+ to the last element.
+*/
+
+void wqueue_add_to_queue(WQUEUE *wqueue, struct st_my_thread_var *thread)
+{
+ struct st_my_thread_var *last;
+ if (!(last= wqueue->last_thread))
+ thread->next= thread;
+ else
+ {
+ thread->next= last->next;
+ last->next= thread;
+ }
+ wqueue->last_thread= thread;
+}
+
+/*
+ Unlink a thread from double-linked queue of waiting threads
+
+ SYNOPSIS
+ wqueue_unlink_from_queue()
+ wqueue pointer to the queue structure
+ thread pointer to the thread to be removed from the queue
+
+ RETURN VALUE
+ none
+
+ NOTES.
+ See NOTES for link_into_queue
+*/
+
+void wqueue_unlink_from_queue(WQUEUE *wqueue, struct st_my_thread_var *thread)
+{
+ if (thread->next == thread)
+ /* The queue contains only one member */
+ wqueue->last_thread= NULL;
+ else
+ {
+ thread->next->prev= thread->prev;
+ *thread->prev= thread->next;
+ if (wqueue->last_thread == thread)
+ wqueue->last_thread= STRUCT_PTR(struct st_my_thread_var, next,
+ thread->prev);
+ }
+ thread->next= NULL;
+}
+
+
+/*
+ Remove all threads from queue signaling them to proceed
+
+ SYNOPSIS
+ wqueue_realease_queue()
+ wqueue pointer to the queue structure
+ thread pointer to the thread to be added to the queue
+
+ RETURN VALUE
+ none
+
+ NOTES.
+ See notes for add_to_queue
+ When removed from the queue each thread is signaled via condition
+ variable thread->suspend.
+*/
+
+void wqueue_release_queue(WQUEUE *wqueue)
+{
+ struct st_my_thread_var *last= wqueue->last_thread;
+ struct st_my_thread_var *next= last->next;
+ struct st_my_thread_var *thread;
+ do
+ {
+ thread= next;
+ pthread_cond_signal(&thread->suspend);
+ next= thread->next;
+ thread->next= NULL;
+ }
+ while (thread != last);
+ wqueue->last_thread= NULL;
+}
+
+
+/*
+ Add thread and wait
+
+ SYNOPSYS
+ wqueue_add_and_wait()
+ wqueue queue to add to
+ thread thread which is waiting
+ lock mutex need for the operation
+*/
+
+void wqueue_add_and_wait(WQUEUE *wqueue,
+ struct st_my_thread_var *thread, pthread_mutex_t *lock)
+{
+ DBUG_ENTER("wqueue_add_and_wait");
+ DBUG_PRINT("enter", ("thread ox%lxcond 0x%lx, mutex 0x%lx",
+ (ulong) thread, (ulong) &thread->suspend, (ulong) lock));
+ wqueue_add_to_queue(wqueue, thread);
+ do
+ {
+ DBUG_PRINT("info", ("wait... cond 0x%lx, mutex 0x%lx",
+ (ulong) &thread->suspend, (ulong) lock));
+ pthread_cond_wait(&thread->suspend, lock);
+ DBUG_PRINT("info", ("wait done cond 0x%lx, mutex 0x%lx, next 0x%lx",
+ (ulong) &thread->suspend, (ulong) lock,
+ (ulong) thread->next));
+ }
+ while (thread->next);
+ DBUG_VOID_RETURN;
+}