diff options
author | istruewing@chilla.local <> | 2006-10-11 17:57:47 +0200 |
---|---|---|
committer | istruewing@chilla.local <> | 2006-10-11 17:57:47 +0200 |
commit | 35c94b6e2dd23458db7f3aa3da0d613493c6bb37 (patch) | |
tree | 1c500053f455af38dba9c498ee43b3b219adde3a /mysys | |
parent | 7683b297cfa3aa0227bb7d851328f85958fa8a67 (diff) | |
parent | 014c1c885e5f99238c321a39d8db75e7ded0b963 (diff) | |
download | mariadb-git-35c94b6e2dd23458db7f3aa3da0d613493c6bb37.tar.gz |
Merge bk-internal.mysql.com:/home/bk/mysql-5.0-engines
into chilla.local:/home/mydev/mysql-5.0-bug8283
Diffstat (limited to 'mysys')
-rw-r--r-- | mysys/mf_iocache.c | 582 |
1 files changed, 500 insertions, 82 deletions
diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c index cd2a140182e..b1cf940d70d 100644 --- a/mysys/mf_iocache.c +++ b/mysys/mf_iocache.c @@ -70,7 +70,6 @@ static void my_aiowait(my_aio_result *result); #define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1)) #define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1)) - /* Setup internal pointers inside IO_CACHE @@ -502,65 +501,366 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count) DBUG_RETURN(0); } + #ifdef THREAD -/* Prepare IO_CACHE for shared use */ -void init_io_cache_share(IO_CACHE *info, IO_CACHE_SHARE *s, uint num_threads) +/* + Prepare IO_CACHE for shared use. + + SYNOPSIS + init_io_cache_share() + read_cache A read cache. This will be copied for + every thread after setup. + cshare The share. + write_cache If non-NULL a write cache that is to be + synchronized with the read caches. + num_threads Number of threads sharing the cache + including the write thread if any. + + DESCRIPTION + + The shared cache is used so: One IO_CACHE is initialized with + init_io_cache(). This includes the allocation of a buffer. Then a + share is allocated and init_io_cache_share() is called with the io + cache and the share. Then the io cache is copied for each thread. So + every thread has its own copy of IO_CACHE. But the allocated buffer + is shared because cache->buffer is the same for all caches. + + One thread reads data from the file into the buffer. All threads + read from the buffer, but every thread maintains its own set of + pointers into the buffer. When all threads have used up the buffer + contents, one of the threads reads the next block of data into the + buffer. To accomplish this, each thread enters the cache lock before + accessing the buffer. They wait in lock_io_cache() until all threads + joined the lock. The last thread entering the lock is in charge of + reading from file to buffer. It wakes all threads when done. + + Synchronizing a write cache to the read caches works so: Whenever + the write buffer needs a flush, the write thread enters the lock and + waits for all other threads to enter the lock too. They do this when + they have used up the read buffer. When all threads are in the lock, + the write thread copies the write buffer to the read buffer and + wakes all threads. + + share->running_threads is the number of threads not being in the + cache lock. When entering lock_io_cache() the number is decreased. + When the thread that fills the buffer enters unlock_io_cache() the + number is reset to the number of threads. The condition + running_threads == 0 means that all threads are in the lock. Bumping + up the number to the full count is non-intuitive. But increasing the + number by one for each thread that leaves the lock could lead to a + solo run of one thread. The last thread to join a lock reads from + file to buffer, wakes the other threads, processes the data in the + cache and enters the lock again. If no other thread left the lock + meanwhile, it would think it's the last one again and read the next + block... + + The share has copies of 'error', 'buffer', 'read_end', and + 'pos_in_file' from the thread that filled the buffer. We may not be + able to access this information directly from its cache because the + thread may be removed from the share before the variables could be + copied by all other threads. Or, if a write buffer is synchronized, + it would change its 'pos_in_file' after waking the other threads, + possibly before they could copy its value. + + However, the 'buffer' variable in the share is for a synchronized + write cache. It needs to know where to put the data. Otherwise it + would need access to the read cache of one of the threads that is + not yet removed from the share. + + RETURN + void +*/ + +void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare, + IO_CACHE *write_cache, uint num_threads) { - DBUG_ASSERT(info->type == READ_CACHE); - pthread_mutex_init(&s->mutex, MY_MUTEX_INIT_FAST); - pthread_cond_init (&s->cond, 0); - s->total=s->count=num_threads-1; - s->active=0; - info->share=s; - info->read_function=_my_b_read_r; - info->current_pos= info->current_end= 0; + DBUG_ENTER("init_io_cache_share"); + DBUG_PRINT("io_cache_share", ("read_cache: 0x%lx share: 0x%lx " + "write_cache: 0x%lx threads: %u", + read_cache, cshare, write_cache, num_threads)); + + DBUG_ASSERT(num_threads > 1); + DBUG_ASSERT(read_cache->type == READ_CACHE); + DBUG_ASSERT(!write_cache || (write_cache->type == WRITE_CACHE)); + + pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST); + pthread_cond_init(&cshare->cond, 0); + pthread_cond_init(&cshare->cond_writer, 0); + + cshare->running_threads= num_threads; + cshare->total_threads= num_threads; + cshare->error= 0; /* Initialize. */ + cshare->buffer= read_cache->buffer; + cshare->read_end= NULL; /* See function comment of lock_io_cache(). */ + cshare->pos_in_file= 0; /* See function comment of lock_io_cache(). */ + cshare->source_cache= write_cache; /* Can be NULL. */ + + read_cache->share= cshare; + read_cache->read_function= _my_b_read_r; + read_cache->current_pos= NULL; + read_cache->current_end= NULL; + + if (write_cache) + write_cache->share= cshare; + + DBUG_VOID_RETURN; } + /* - Remove a thread from shared access to IO_CACHE - Every thread should do that on exit for not - to deadlock other threads + Remove a thread from shared access to IO_CACHE. + + SYNOPSIS + remove_io_thread() + cache The IO_CACHE to be removed from the share. + + NOTE + + Every thread must do that on exit for not to deadlock other threads. + + The last thread destroys the pthread resources. + + A writer flushes its cache first. + + RETURN + void */ -void remove_io_thread(IO_CACHE *info) + +void remove_io_thread(IO_CACHE *cache) { - IO_CACHE_SHARE *s=info->share; + IO_CACHE_SHARE *cshare= cache->share; + uint total; + DBUG_ENTER("remove_io_thread"); + + /* If the writer goes, it needs to flush the write cache. */ + if (cache == cshare->source_cache) + flush_io_cache(cache); - pthread_mutex_lock(&s->mutex); - s->total--; - if (! s->count--) - pthread_cond_signal(&s->cond); - pthread_mutex_unlock(&s->mutex); + pthread_mutex_lock(&cshare->mutex); + DBUG_PRINT("io_cache_share", ("%s: 0x%lx", + (cache == cshare->source_cache) ? + "writer" : "reader", cache)); + + /* Remove from share. */ + total= --cshare->total_threads; + DBUG_PRINT("io_cache_share", ("remaining threads: %u", total)); + + /* Detach from share. */ + cache->share= NULL; + + /* If the writer goes, let the readers know. */ + if (cache == cshare->source_cache) + { + DBUG_PRINT("io_cache_share", ("writer leaves")); + cshare->source_cache= NULL; + } + + /* If all threads are waiting for me to join the lock, wake them. */ + if (!--cshare->running_threads) + { + DBUG_PRINT("io_cache_share", ("the last running thread leaves, wake all")); + pthread_cond_signal(&cshare->cond_writer); + pthread_cond_broadcast(&cshare->cond); + } + + pthread_mutex_unlock(&cshare->mutex); + + if (!total) + { + DBUG_PRINT("io_cache_share", ("last thread removed, destroy share")); + pthread_cond_destroy (&cshare->cond_writer); + pthread_cond_destroy (&cshare->cond); + pthread_mutex_destroy(&cshare->mutex); + } + + DBUG_VOID_RETURN; } -static int lock_io_cache(IO_CACHE *info, my_off_t pos) -{ - int total; - IO_CACHE_SHARE *s=info->share; - pthread_mutex_lock(&s->mutex); - if (!s->count) +/* + Lock IO cache and wait for all other threads to join. + + SYNOPSIS + lock_io_cache() + cache The cache of the thread entering the lock. + pos File position of the block to read. + Unused for the write thread. + + DESCRIPTION + + Wait for all threads to finish with the current buffer. We want + all threads to proceed in concert. The last thread to join + lock_io_cache() will read the block from file and all threads start + to use it. Then they will join again for reading the next block. + + The waiting threads detect a fresh buffer by comparing + cshare->pos_in_file with the position they want to process next. + Since the first block may start at position 0, we take + cshare->read_end as an additional condition. This variable is + initialized to NULL and will be set after a block of data is written + to the buffer. + + RETURN + 1 OK, lock in place, go ahead and read. + 0 OK, unlocked, another thread did the read. +*/ + +static int lock_io_cache(IO_CACHE *cache, my_off_t pos) +{ + IO_CACHE_SHARE *cshare= cache->share; + DBUG_ENTER("lock_io_cache"); + + /* Enter the lock. */ + pthread_mutex_lock(&cshare->mutex); + cshare->running_threads--; + DBUG_PRINT("io_cache_share", ("%s: 0x%lx pos: %lu running: %u", + (cache == cshare->source_cache) ? + "writer" : "reader", cache, (ulong) pos, + cshare->running_threads)); + + if (cshare->source_cache) { - s->count=s->total; - return 1; + /* A write cache is synchronized to the read caches. */ + + if (cache == cshare->source_cache) + { + /* The writer waits until all readers are here. */ + while (cshare->running_threads) + { + DBUG_PRINT("io_cache_share", ("writer waits in lock")); + pthread_cond_wait(&cshare->cond_writer, &cshare->mutex); + } + DBUG_PRINT("io_cache_share", ("writer awoke, going to copy")); + + /* Stay locked. Leave the lock later by unlock_io_cache(). */ + DBUG_RETURN(1); + } + + /* The last thread wakes the writer. */ + if (!cshare->running_threads) + { + DBUG_PRINT("io_cache_share", ("waking writer")); + pthread_cond_signal(&cshare->cond_writer); + } + + /* + Readers wait until the data is copied from the writer. Another + reason to stop waiting is the removal of the write thread. If this + happens, we leave the lock with old data in the buffer. + */ + while ((!cshare->read_end || (cshare->pos_in_file < pos)) && + cshare->source_cache) + { + DBUG_PRINT("io_cache_share", ("reader waits in lock")); + pthread_cond_wait(&cshare->cond, &cshare->mutex); + } + + /* + If the writer was removed from the share while this thread was + asleep, we need to simulate an EOF condition. The writer cannot + reset the share variables as they might still be in use by readers + of the last block. When we awake here then because the last + joining thread signalled us. If the writer is not the last, it + will not signal. So it is safe to clear the buffer here. + */ + if (!cshare->read_end || (cshare->pos_in_file < pos)) + { + DBUG_PRINT("io_cache_share", ("reader found writer removed. EOF")); + cshare->read_end= cshare->buffer; /* Empty buffer. */ + cshare->error= 0; /* EOF is not an error. */ + } } + else + { + /* + There are read caches only. The last thread arriving in + lock_io_cache() continues with a locked cache and reads the block. + */ + if (!cshare->running_threads) + { + DBUG_PRINT("io_cache_share", ("last thread joined, going to read")); + /* Stay locked. Leave the lock later by unlock_io_cache(). */ + DBUG_RETURN(1); + } - total=s->total; - s->count--; - while (!s->active || s->active->pos_in_file < pos) - pthread_cond_wait(&s->cond, &s->mutex); + /* + All other threads wait until the requested block is read by the + last thread arriving. Another reason to stop waiting is the + removal of a thread. If this leads to all threads being in the + lock, we have to continue also. The first of the awaken threads + will then do the read. + */ + while ((!cshare->read_end || (cshare->pos_in_file < pos)) && + cshare->running_threads) + { + DBUG_PRINT("io_cache_share", ("reader waits in lock")); + pthread_cond_wait(&cshare->cond, &cshare->mutex); + } - if (s->total < total && - (!s->active || s->active->pos_in_file < pos)) - return 1; + /* If the block is not yet read, continue with a locked cache and read. */ + if (!cshare->read_end || (cshare->pos_in_file < pos)) + { + DBUG_PRINT("io_cache_share", ("reader awoke, going to read")); + /* Stay locked. Leave the lock later by unlock_io_cache(). */ + DBUG_RETURN(1); + } - pthread_mutex_unlock(&s->mutex); - return 0; + /* Another thread did read the block already. */ + } + DBUG_PRINT("io_cache_share", ("reader awoke, going to process %u bytes", + cshare->read_end ? (uint) + (cshare->read_end - cshare->buffer) : 0)); + + /* + Leave the lock. Do not call unlock_io_cache() later. The thread that + filled the buffer did this and marked all threads as running. + */ + pthread_mutex_unlock(&cshare->mutex); + DBUG_RETURN(0); } -static void unlock_io_cache(IO_CACHE *info) + +/* + Unlock IO cache. + + SYNOPSIS + unlock_io_cache() + cache The cache of the thread leaving the lock. + + NOTE + This is called by the thread that filled the buffer. It marks all + threads as running and awakes them. This must not be done by any + other thread. + + Do not signal cond_writer. Either there is no writer or the writer + is the only one who can call this function. + + The reason for resetting running_threads to total_threads before + waking all other threads is that it could be possible that this + thread is so fast with processing the buffer that it enters the lock + before even one other thread has left it. If every awoken thread + would increase running_threads by one, this thread could think that + he is again the last to join and would not wait for the other + threads to process the data. + + RETURN + void +*/ + +static void unlock_io_cache(IO_CACHE *cache) { - pthread_cond_broadcast(&info->share->cond); - pthread_mutex_unlock(&info->share->mutex); + IO_CACHE_SHARE *cshare= cache->share; + DBUG_ENTER("unlock_io_cache"); + DBUG_PRINT("io_cache_share", ("%s: 0x%lx pos: %lu running: %u", + (cache == cshare->source_cache) ? + "writer" : "reader", + cache, (ulong) cshare->pos_in_file, + cshare->total_threads)); + + cshare->running_threads= cshare->total_threads; + pthread_cond_broadcast(&cshare->cond); + pthread_mutex_unlock(&cshare->mutex); + DBUG_VOID_RETURN; } @@ -569,7 +869,7 @@ static void unlock_io_cache(IO_CACHE *info) SYNOPSIS _my_b_read_r() - info IO_CACHE pointer + cache IO_CACHE pointer Buffer Buffer to retrieve count bytes from file Count Number of bytes to read into Buffer @@ -581,7 +881,7 @@ static void unlock_io_cache(IO_CACHE *info) It works as follows: when a thread tries to read from a file (that is, after using all the data from the (shared) buffer), it just - hangs on lock_io_cache(), wating for other threads. When the very + hangs on lock_io_cache(), waiting for other threads. When the very last thread attempts a read, lock_io_cache() returns 1, the thread does actual IO and unlock_io_cache(), which signals all the waiting threads that data is in the buffer. @@ -601,16 +901,17 @@ static void unlock_io_cache(IO_CACHE *info) 1 Error: can't read requested characters */ -int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count) +int _my_b_read_r(register IO_CACHE *cache, byte *Buffer, uint Count) { my_off_t pos_in_file; uint length, diff_length, left_length; + IO_CACHE_SHARE *cshare= cache->share; DBUG_ENTER("_my_b_read_r"); - if ((left_length= (uint) (info->read_end - info->read_pos))) + if ((left_length= (uint) (cache->read_end - cache->read_pos))) { DBUG_ASSERT(Count >= left_length); /* User is not using my_b_read() */ - memcpy(Buffer, info->read_pos, (size_t) (left_length)); + memcpy(Buffer, cache->read_pos, (size_t) (left_length)); Buffer+= left_length; Count-= left_length; } @@ -618,55 +919,133 @@ int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count) { int cnt, len; - pos_in_file= info->pos_in_file + (info->read_end - info->buffer); + pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer); diff_length= (uint) (pos_in_file & (IO_SIZE-1)); length=IO_ROUND_UP(Count+diff_length)-diff_length; - length=(length <= info->read_length) ? - length + IO_ROUND_DN(info->read_length - length) : - length - IO_ROUND_UP(length - info->read_length) ; - if (info->type != READ_FIFO && - (length > (info->end_of_file - pos_in_file))) - length= (uint) (info->end_of_file - pos_in_file); + length= ((length <= cache->read_length) ? + length + IO_ROUND_DN(cache->read_length - length) : + length - IO_ROUND_UP(length - cache->read_length)); + if (cache->type != READ_FIFO && + (length > (cache->end_of_file - pos_in_file))) + length= (uint) (cache->end_of_file - pos_in_file); if (length == 0) { - info->error= (int) left_length; + cache->error= (int) left_length; DBUG_RETURN(1); } - if (lock_io_cache(info, pos_in_file)) + if (lock_io_cache(cache, pos_in_file)) { - info->share->active=info; - if (info->seek_not_done) /* File touched, do seek */ - VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0))); - len=(int)my_read(info->file,info->buffer, length, info->myflags); - info->read_end=info->buffer + (len == -1 ? 0 : len); - info->error=(len == (int)length ? 0 : len); - info->pos_in_file=pos_in_file; - unlock_io_cache(info); + /* With a synchronized write/read cache we won't come here... */ + DBUG_ASSERT(!cshare->source_cache); + /* + ... unless the writer has gone before this thread entered the + lock. Simulate EOF in this case. It can be distinguished by + cache->file. + */ + if (cache->file < 0) + len= 0; + else + { + if (cache->seek_not_done) /* File touched, do seek */ + VOID(my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))); + len= (int) my_read(cache->file, cache->buffer, length, cache->myflags); + } + DBUG_PRINT("io_cache_share", ("read %d bytes", len)); + + cache->read_end= cache->buffer + (len == -1 ? 0 : len); + cache->error= (len == (int)length ? 0 : len); + cache->pos_in_file= pos_in_file; + + /* Copy important values to the share. */ + cshare->error= cache->error; + cshare->read_end= cache->read_end; + cshare->pos_in_file= pos_in_file; + + /* Mark all threads as running and wake them. */ + unlock_io_cache(cache); } else { - info->error= info->share->active->error; - info->read_end= info->share->active->read_end; - info->pos_in_file= info->share->active->pos_in_file; - len= (int) (info->error == -1 ? -1 : info->read_end-info->buffer); + /* + With a synchronized write/read cache readers always come here. + Copy important values from the share. + */ + cache->error= cshare->error; + cache->read_end= cshare->read_end; + cache->pos_in_file= cshare->pos_in_file; + + len= (int) ((cache->error == -1) ? -1 : cache->read_end - cache->buffer); } - info->read_pos=info->buffer; - info->seek_not_done=0; + cache->read_pos= cache->buffer; + cache->seek_not_done= 0; if (len <= 0) { - info->error= (int) left_length; + DBUG_PRINT("io_cache_share", ("reader error. len %d left %u", + len, left_length)); + cache->error= (int) left_length; DBUG_RETURN(1); } cnt= ((uint) len > Count) ? (int) Count : len; - memcpy(Buffer, info->read_pos, (size_t) cnt); + memcpy(Buffer, cache->read_pos, (size_t) cnt); Count -= cnt; Buffer+= cnt; left_length+= cnt; - info->read_pos+= cnt; + cache->read_pos+= cnt; } DBUG_RETURN(0); } -#endif + + +/* + Copy data from write cache to read cache. + + SYNOPSIS + copy_to_read_buffer() + write_cache The write cache. + write_buffer The source of data, mostly the cache buffer. + write_length The number of bytes to copy. + + NOTE + The write thread will wait for all read threads to join the cache + lock. Then it copies the data over and wakes the read threads. + + RETURN + void +*/ + +static void copy_to_read_buffer(IO_CACHE *write_cache, + const byte *write_buffer, uint write_length) +{ + IO_CACHE_SHARE *cshare= write_cache->share; + + DBUG_ASSERT(cshare->source_cache == write_cache); + /* + write_length is usually less or equal to buffer_length. + It can be bigger if _my_b_write() is called with a big length. + */ + while (write_length) + { + uint copy_length= min(write_length, write_cache->buffer_length); + int __attribute__((unused)) rc; + + rc= lock_io_cache(write_cache, write_cache->pos_in_file); + /* The writing thread does always have the lock when it awakes. */ + DBUG_ASSERT(rc); + + memcpy(cshare->buffer, write_buffer, copy_length); + + cshare->error= 0; + cshare->read_end= cshare->buffer + copy_length; + cshare->pos_in_file= write_cache->pos_in_file; + + /* Mark all threads as running and wake them. */ + unlock_io_cache(write_cache); + + write_buffer+= copy_length; + write_length-= copy_length; + } +} +#endif /*THREAD*/ /* @@ -1018,6 +1397,7 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count) Buffer+=rest_length; Count-=rest_length; info->write_pos+=rest_length; + if (my_b_flush_io_cache(info,1)) return 1; if (Count >= IO_SIZE) @@ -1030,6 +1410,23 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count) } if (my_write(info->file,Buffer,(uint) length,info->myflags | MY_NABP)) return info->error= -1; + +#ifdef THREAD + /* + In case of a shared I/O cache with a writer we normally do direct + write cache to read cache copy. Simulate this here by direct + caller buffer to read cache copy. Do it after the write so that + the cache readers actions on the flushed part can go in parallel + with the write of the extra stuff. copy_to_read_buffer() + synchronizes writer and readers so that after this call the + readers can act on the extra stuff while the writer can go ahead + and prepare the next output. copy_to_read_buffer() relies on + info->pos_in_file. + */ + if (info->share) + copy_to_read_buffer(info, Buffer, length); +#endif + Count-=length; Buffer+=length; info->pos_in_file+=length; @@ -1050,6 +1447,14 @@ int my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count) { uint rest_length,length; +#ifdef THREAD + /* + Assert that we cannot come here with a shared cache. If we do one + day, we might need to add a call to copy_to_read_buffer(). + */ + DBUG_ASSERT(!info->share); +#endif + lock_append_buffer(info); rest_length=(uint) (info->write_end - info->write_pos); if (Count <= rest_length) @@ -1110,6 +1515,14 @@ int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count, uint length; int error=0; +#ifdef THREAD + /* + Assert that we cannot come here with a shared cache. If we do one + day, we might need to add a call to copy_to_read_buffer(). + */ + DBUG_ASSERT(!info->share); +#endif + if (pos < info->pos_in_file) { /* Of no overlap, write everything without buffering */ @@ -1186,6 +1599,17 @@ int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock) if ((length=(uint) (info->write_pos - info->write_buffer))) { +#ifdef THREAD + /* + In case of a shared I/O cache with a writer we do direct write + cache to read cache copy. Do it before the write here so that + the readers can work in parallel with the write. + copy_to_read_buffer() relies on info->pos_in_file. + */ + if (info->share) + copy_to_read_buffer(info, info->write_buffer, length); +#endif + pos_in_file=info->pos_in_file; /* If we have append cache, we always open the file with @@ -1265,16 +1689,10 @@ int end_io_cache(IO_CACHE *info) #ifdef THREAD /* - if IO_CACHE is shared between several threads, only one - thread needs to call end_io_cache() - just as init_io_cache() - should be called only once and then memcopy'ed + Every thread must call remove_io_thread(). The last one destroys + the share elements. */ - if (info->share) - { - pthread_cond_destroy(&info->share->cond); - pthread_mutex_destroy(&info->share->mutex); - info->share=0; - } + DBUG_ASSERT(!info->share || !info->share->total_threads); #endif if ((pre_close=info->pre_close)) |