summaryrefslogtreecommitdiff
path: root/mysys
diff options
context:
space:
mode:
authorunknown <tsmith/tim@siva.hindu.god>2006-10-24 14:42:08 -0600
committerunknown <tsmith/tim@siva.hindu.god>2006-10-24 14:42:08 -0600
commitd4bfae38ac685d1f277fcc1b42ac003f98d8c940 (patch)
treef6b3ce247f0c1b8d045348dca4533d282d470935 /mysys
parent752e5f1ec77f79009804a6ed316843b184b46394 (diff)
parent69c7200219dd2010de7ca33d89432aff4fbd9e3a (diff)
downloadmariadb-git-d4bfae38ac685d1f277fcc1b42ac003f98d8c940.tar.gz
Merge siva.hindu.god:/usr/home/tim/m/bk/g50
into siva.hindu.god:/usr/home/tim/m/bk/50 configure.in: Auto merged mysql-test/r/rename.result: Auto merged mysql-test/r/subselect.result: Auto merged mysql-test/r/view.result: Auto merged mysql-test/t/subselect.test: Auto merged mysql-test/t/view.test: Auto merged mysys/mf_iocache.c: Auto merged sql/item_func.cc: Auto merged sql/opt_range.cc: Auto merged sql/opt_range.h: Auto merged sql/sql_lex.cc: Auto merged sql/sql_lex.h: Auto merged sql/sql_select.cc: Auto merged sql/sql_table.cc: Auto merged sql/table.cc: Auto merged myisam/sort.c: Manual merge mysql-test/r/innodb_mysql.result: Manual merge mysql-test/t/innodb_mysql.test: Manual merge
Diffstat (limited to 'mysys')
-rw-r--r--mysys/mf_iocache.c582
1 files changed, 500 insertions, 82 deletions
diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c
index eb8b992c601..4559c0c42b0 100644
--- a/mysys/mf_iocache.c
+++ b/mysys/mf_iocache.c
@@ -70,7 +70,6 @@ static void my_aiowait(my_aio_result *result);
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
#define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1))
-
/*
Setup internal pointers inside IO_CACHE
@@ -506,65 +505,366 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
DBUG_RETURN(0);
}
+
#ifdef THREAD
-/* Prepare IO_CACHE for shared use */
-void init_io_cache_share(IO_CACHE *info, IO_CACHE_SHARE *s, uint num_threads)
+/*
+ Prepare IO_CACHE for shared use.
+
+ SYNOPSIS
+ init_io_cache_share()
+ read_cache A read cache. This will be copied for
+ every thread after setup.
+ cshare The share.
+ write_cache If non-NULL a write cache that is to be
+ synchronized with the read caches.
+ num_threads Number of threads sharing the cache
+ including the write thread if any.
+
+ DESCRIPTION
+
+ The shared cache is used so: One IO_CACHE is initialized with
+ init_io_cache(). This includes the allocation of a buffer. Then a
+ share is allocated and init_io_cache_share() is called with the io
+ cache and the share. Then the io cache is copied for each thread. So
+ every thread has its own copy of IO_CACHE. But the allocated buffer
+ is shared because cache->buffer is the same for all caches.
+
+ One thread reads data from the file into the buffer. All threads
+ read from the buffer, but every thread maintains its own set of
+ pointers into the buffer. When all threads have used up the buffer
+ contents, one of the threads reads the next block of data into the
+ buffer. To accomplish this, each thread enters the cache lock before
+ accessing the buffer. They wait in lock_io_cache() until all threads
+ joined the lock. The last thread entering the lock is in charge of
+ reading from file to buffer. It wakes all threads when done.
+
+ Synchronizing a write cache to the read caches works so: Whenever
+ the write buffer needs a flush, the write thread enters the lock and
+ waits for all other threads to enter the lock too. They do this when
+ they have used up the read buffer. When all threads are in the lock,
+ the write thread copies the write buffer to the read buffer and
+ wakes all threads.
+
+ share->running_threads is the number of threads not being in the
+ cache lock. When entering lock_io_cache() the number is decreased.
+ When the thread that fills the buffer enters unlock_io_cache() the
+ number is reset to the number of threads. The condition
+ running_threads == 0 means that all threads are in the lock. Bumping
+ up the number to the full count is non-intuitive. But increasing the
+ number by one for each thread that leaves the lock could lead to a
+ solo run of one thread. The last thread to join a lock reads from
+ file to buffer, wakes the other threads, processes the data in the
+ cache and enters the lock again. If no other thread left the lock
+ meanwhile, it would think it's the last one again and read the next
+ block...
+
+ The share has copies of 'error', 'buffer', 'read_end', and
+ 'pos_in_file' from the thread that filled the buffer. We may not be
+ able to access this information directly from its cache because the
+ thread may be removed from the share before the variables could be
+ copied by all other threads. Or, if a write buffer is synchronized,
+ it would change its 'pos_in_file' after waking the other threads,
+ possibly before they could copy its value.
+
+ However, the 'buffer' variable in the share is for a synchronized
+ write cache. It needs to know where to put the data. Otherwise it
+ would need access to the read cache of one of the threads that is
+ not yet removed from the share.
+
+ RETURN
+ void
+*/
+
+void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
+ IO_CACHE *write_cache, uint num_threads)
{
- DBUG_ASSERT(info->type == READ_CACHE);
- pthread_mutex_init(&s->mutex, MY_MUTEX_INIT_FAST);
- pthread_cond_init (&s->cond, 0);
- s->total=s->count=num_threads-1;
- s->active=0;
- info->share=s;
- info->read_function=_my_b_read_r;
- info->current_pos= info->current_end= 0;
+ DBUG_ENTER("init_io_cache_share");
+ DBUG_PRINT("io_cache_share", ("read_cache: 0x%lx share: 0x%lx "
+ "write_cache: 0x%lx threads: %u",
+ read_cache, cshare, write_cache, num_threads));
+
+ DBUG_ASSERT(num_threads > 1);
+ DBUG_ASSERT(read_cache->type == READ_CACHE);
+ DBUG_ASSERT(!write_cache || (write_cache->type == WRITE_CACHE));
+
+ pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
+ pthread_cond_init(&cshare->cond, 0);
+ pthread_cond_init(&cshare->cond_writer, 0);
+
+ cshare->running_threads= num_threads;
+ cshare->total_threads= num_threads;
+ cshare->error= 0; /* Initialize. */
+ cshare->buffer= read_cache->buffer;
+ cshare->read_end= NULL; /* See function comment of lock_io_cache(). */
+ cshare->pos_in_file= 0; /* See function comment of lock_io_cache(). */
+ cshare->source_cache= write_cache; /* Can be NULL. */
+
+ read_cache->share= cshare;
+ read_cache->read_function= _my_b_read_r;
+ read_cache->current_pos= NULL;
+ read_cache->current_end= NULL;
+
+ if (write_cache)
+ write_cache->share= cshare;
+
+ DBUG_VOID_RETURN;
}
+
/*
- Remove a thread from shared access to IO_CACHE
- Every thread should do that on exit for not
- to deadlock other threads
+ Remove a thread from shared access to IO_CACHE.
+
+ SYNOPSIS
+ remove_io_thread()
+ cache The IO_CACHE to be removed from the share.
+
+ NOTE
+
+ Every thread must do that on exit for not to deadlock other threads.
+
+ The last thread destroys the pthread resources.
+
+ A writer flushes its cache first.
+
+ RETURN
+ void
*/
-void remove_io_thread(IO_CACHE *info)
+
+void remove_io_thread(IO_CACHE *cache)
{
- IO_CACHE_SHARE *s=info->share;
+ IO_CACHE_SHARE *cshare= cache->share;
+ uint total;
+ DBUG_ENTER("remove_io_thread");
+
+ /* If the writer goes, it needs to flush the write cache. */
+ if (cache == cshare->source_cache)
+ flush_io_cache(cache);
- pthread_mutex_lock(&s->mutex);
- s->total--;
- if (! s->count--)
- pthread_cond_signal(&s->cond);
- pthread_mutex_unlock(&s->mutex);
+ pthread_mutex_lock(&cshare->mutex);
+ DBUG_PRINT("io_cache_share", ("%s: 0x%lx",
+ (cache == cshare->source_cache) ?
+ "writer" : "reader", cache));
+
+ /* Remove from share. */
+ total= --cshare->total_threads;
+ DBUG_PRINT("io_cache_share", ("remaining threads: %u", total));
+
+ /* Detach from share. */
+ cache->share= NULL;
+
+ /* If the writer goes, let the readers know. */
+ if (cache == cshare->source_cache)
+ {
+ DBUG_PRINT("io_cache_share", ("writer leaves"));
+ cshare->source_cache= NULL;
+ }
+
+ /* If all threads are waiting for me to join the lock, wake them. */
+ if (!--cshare->running_threads)
+ {
+ DBUG_PRINT("io_cache_share", ("the last running thread leaves, wake all"));
+ pthread_cond_signal(&cshare->cond_writer);
+ pthread_cond_broadcast(&cshare->cond);
+ }
+
+ pthread_mutex_unlock(&cshare->mutex);
+
+ if (!total)
+ {
+ DBUG_PRINT("io_cache_share", ("last thread removed, destroy share"));
+ pthread_cond_destroy (&cshare->cond_writer);
+ pthread_cond_destroy (&cshare->cond);
+ pthread_mutex_destroy(&cshare->mutex);
+ }
+
+ DBUG_VOID_RETURN;
}
-static int lock_io_cache(IO_CACHE *info, my_off_t pos)
-{
- int total;
- IO_CACHE_SHARE *s=info->share;
- pthread_mutex_lock(&s->mutex);
- if (!s->count)
+/*
+ Lock IO cache and wait for all other threads to join.
+
+ SYNOPSIS
+ lock_io_cache()
+ cache The cache of the thread entering the lock.
+ pos File position of the block to read.
+ Unused for the write thread.
+
+ DESCRIPTION
+
+ Wait for all threads to finish with the current buffer. We want
+ all threads to proceed in concert. The last thread to join
+ lock_io_cache() will read the block from file and all threads start
+ to use it. Then they will join again for reading the next block.
+
+ The waiting threads detect a fresh buffer by comparing
+ cshare->pos_in_file with the position they want to process next.
+ Since the first block may start at position 0, we take
+ cshare->read_end as an additional condition. This variable is
+ initialized to NULL and will be set after a block of data is written
+ to the buffer.
+
+ RETURN
+ 1 OK, lock in place, go ahead and read.
+ 0 OK, unlocked, another thread did the read.
+*/
+
+static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
+{
+ IO_CACHE_SHARE *cshare= cache->share;
+ DBUG_ENTER("lock_io_cache");
+
+ /* Enter the lock. */
+ pthread_mutex_lock(&cshare->mutex);
+ cshare->running_threads--;
+ DBUG_PRINT("io_cache_share", ("%s: 0x%lx pos: %lu running: %u",
+ (cache == cshare->source_cache) ?
+ "writer" : "reader", cache, (ulong) pos,
+ cshare->running_threads));
+
+ if (cshare->source_cache)
{
- s->count=s->total;
- return 1;
+ /* A write cache is synchronized to the read caches. */
+
+ if (cache == cshare->source_cache)
+ {
+ /* The writer waits until all readers are here. */
+ while (cshare->running_threads)
+ {
+ DBUG_PRINT("io_cache_share", ("writer waits in lock"));
+ pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
+ }
+ DBUG_PRINT("io_cache_share", ("writer awoke, going to copy"));
+
+ /* Stay locked. Leave the lock later by unlock_io_cache(). */
+ DBUG_RETURN(1);
+ }
+
+ /* The last thread wakes the writer. */
+ if (!cshare->running_threads)
+ {
+ DBUG_PRINT("io_cache_share", ("waking writer"));
+ pthread_cond_signal(&cshare->cond_writer);
+ }
+
+ /*
+ Readers wait until the data is copied from the writer. Another
+ reason to stop waiting is the removal of the write thread. If this
+ happens, we leave the lock with old data in the buffer.
+ */
+ while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
+ cshare->source_cache)
+ {
+ DBUG_PRINT("io_cache_share", ("reader waits in lock"));
+ pthread_cond_wait(&cshare->cond, &cshare->mutex);
+ }
+
+ /*
+ If the writer was removed from the share while this thread was
+ asleep, we need to simulate an EOF condition. The writer cannot
+ reset the share variables as they might still be in use by readers
+ of the last block. When we awake here then because the last
+ joining thread signalled us. If the writer is not the last, it
+ will not signal. So it is safe to clear the buffer here.
+ */
+ if (!cshare->read_end || (cshare->pos_in_file < pos))
+ {
+ DBUG_PRINT("io_cache_share", ("reader found writer removed. EOF"));
+ cshare->read_end= cshare->buffer; /* Empty buffer. */
+ cshare->error= 0; /* EOF is not an error. */
+ }
}
+ else
+ {
+ /*
+ There are read caches only. The last thread arriving in
+ lock_io_cache() continues with a locked cache and reads the block.
+ */
+ if (!cshare->running_threads)
+ {
+ DBUG_PRINT("io_cache_share", ("last thread joined, going to read"));
+ /* Stay locked. Leave the lock later by unlock_io_cache(). */
+ DBUG_RETURN(1);
+ }
- total=s->total;
- s->count--;
- while (!s->active || s->active->pos_in_file < pos)
- pthread_cond_wait(&s->cond, &s->mutex);
+ /*
+ All other threads wait until the requested block is read by the
+ last thread arriving. Another reason to stop waiting is the
+ removal of a thread. If this leads to all threads being in the
+ lock, we have to continue also. The first of the awaken threads
+ will then do the read.
+ */
+ while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
+ cshare->running_threads)
+ {
+ DBUG_PRINT("io_cache_share", ("reader waits in lock"));
+ pthread_cond_wait(&cshare->cond, &cshare->mutex);
+ }
- if (s->total < total &&
- (!s->active || s->active->pos_in_file < pos))
- return 1;
+ /* If the block is not yet read, continue with a locked cache and read. */
+ if (!cshare->read_end || (cshare->pos_in_file < pos))
+ {
+ DBUG_PRINT("io_cache_share", ("reader awoke, going to read"));
+ /* Stay locked. Leave the lock later by unlock_io_cache(). */
+ DBUG_RETURN(1);
+ }
- pthread_mutex_unlock(&s->mutex);
- return 0;
+ /* Another thread did read the block already. */
+ }
+ DBUG_PRINT("io_cache_share", ("reader awoke, going to process %u bytes",
+ cshare->read_end ? (uint)
+ (cshare->read_end - cshare->buffer) : 0));
+
+ /*
+ Leave the lock. Do not call unlock_io_cache() later. The thread that
+ filled the buffer did this and marked all threads as running.
+ */
+ pthread_mutex_unlock(&cshare->mutex);
+ DBUG_RETURN(0);
}
-static void unlock_io_cache(IO_CACHE *info)
+
+/*
+ Unlock IO cache.
+
+ SYNOPSIS
+ unlock_io_cache()
+ cache The cache of the thread leaving the lock.
+
+ NOTE
+ This is called by the thread that filled the buffer. It marks all
+ threads as running and awakes them. This must not be done by any
+ other thread.
+
+ Do not signal cond_writer. Either there is no writer or the writer
+ is the only one who can call this function.
+
+ The reason for resetting running_threads to total_threads before
+ waking all other threads is that it could be possible that this
+ thread is so fast with processing the buffer that it enters the lock
+ before even one other thread has left it. If every awoken thread
+ would increase running_threads by one, this thread could think that
+ he is again the last to join and would not wait for the other
+ threads to process the data.
+
+ RETURN
+ void
+*/
+
+static void unlock_io_cache(IO_CACHE *cache)
{
- pthread_cond_broadcast(&info->share->cond);
- pthread_mutex_unlock(&info->share->mutex);
+ IO_CACHE_SHARE *cshare= cache->share;
+ DBUG_ENTER("unlock_io_cache");
+ DBUG_PRINT("io_cache_share", ("%s: 0x%lx pos: %lu running: %u",
+ (cache == cshare->source_cache) ?
+ "writer" : "reader",
+ cache, (ulong) cshare->pos_in_file,
+ cshare->total_threads));
+
+ cshare->running_threads= cshare->total_threads;
+ pthread_cond_broadcast(&cshare->cond);
+ pthread_mutex_unlock(&cshare->mutex);
+ DBUG_VOID_RETURN;
}
@@ -573,7 +873,7 @@ static void unlock_io_cache(IO_CACHE *info)
SYNOPSIS
_my_b_read_r()
- info IO_CACHE pointer
+ cache IO_CACHE pointer
Buffer Buffer to retrieve count bytes from file
Count Number of bytes to read into Buffer
@@ -585,7 +885,7 @@ static void unlock_io_cache(IO_CACHE *info)
It works as follows: when a thread tries to read from a file (that
is, after using all the data from the (shared) buffer), it just
- hangs on lock_io_cache(), wating for other threads. When the very
+ hangs on lock_io_cache(), waiting for other threads. When the very
last thread attempts a read, lock_io_cache() returns 1, the thread
does actual IO and unlock_io_cache(), which signals all the waiting
threads that data is in the buffer.
@@ -605,16 +905,17 @@ static void unlock_io_cache(IO_CACHE *info)
1 Error: can't read requested characters
*/
-int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count)
+int _my_b_read_r(register IO_CACHE *cache, byte *Buffer, uint Count)
{
my_off_t pos_in_file;
uint length, diff_length, left_length;
+ IO_CACHE_SHARE *cshare= cache->share;
DBUG_ENTER("_my_b_read_r");
- if ((left_length= (uint) (info->read_end - info->read_pos)))
+ if ((left_length= (uint) (cache->read_end - cache->read_pos)))
{
DBUG_ASSERT(Count >= left_length); /* User is not using my_b_read() */
- memcpy(Buffer, info->read_pos, (size_t) (left_length));
+ memcpy(Buffer, cache->read_pos, (size_t) (left_length));
Buffer+= left_length;
Count-= left_length;
}
@@ -622,55 +923,133 @@ int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count)
{
int cnt, len;
- pos_in_file= info->pos_in_file + (info->read_end - info->buffer);
+ pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
diff_length= (uint) (pos_in_file & (IO_SIZE-1));
length=IO_ROUND_UP(Count+diff_length)-diff_length;
- length=(length <= info->read_length) ?
- length + IO_ROUND_DN(info->read_length - length) :
- length - IO_ROUND_UP(length - info->read_length) ;
- if (info->type != READ_FIFO &&
- (length > (info->end_of_file - pos_in_file)))
- length= (uint) (info->end_of_file - pos_in_file);
+ length= ((length <= cache->read_length) ?
+ length + IO_ROUND_DN(cache->read_length - length) :
+ length - IO_ROUND_UP(length - cache->read_length));
+ if (cache->type != READ_FIFO &&
+ (length > (cache->end_of_file - pos_in_file)))
+ length= (uint) (cache->end_of_file - pos_in_file);
if (length == 0)
{
- info->error= (int) left_length;
+ cache->error= (int) left_length;
DBUG_RETURN(1);
}
- if (lock_io_cache(info, pos_in_file))
+ if (lock_io_cache(cache, pos_in_file))
{
- info->share->active=info;
- if (info->seek_not_done) /* File touched, do seek */
- VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)));
- len=(int)my_read(info->file,info->buffer, length, info->myflags);
- info->read_end=info->buffer + (len == -1 ? 0 : len);
- info->error=(len == (int)length ? 0 : len);
- info->pos_in_file=pos_in_file;
- unlock_io_cache(info);
+ /* With a synchronized write/read cache we won't come here... */
+ DBUG_ASSERT(!cshare->source_cache);
+ /*
+ ... unless the writer has gone before this thread entered the
+ lock. Simulate EOF in this case. It can be distinguished by
+ cache->file.
+ */
+ if (cache->file < 0)
+ len= 0;
+ else
+ {
+ if (cache->seek_not_done) /* File touched, do seek */
+ VOID(my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0)));
+ len= (int) my_read(cache->file, cache->buffer, length, cache->myflags);
+ }
+ DBUG_PRINT("io_cache_share", ("read %d bytes", len));
+
+ cache->read_end= cache->buffer + (len == -1 ? 0 : len);
+ cache->error= (len == (int)length ? 0 : len);
+ cache->pos_in_file= pos_in_file;
+
+ /* Copy important values to the share. */
+ cshare->error= cache->error;
+ cshare->read_end= cache->read_end;
+ cshare->pos_in_file= pos_in_file;
+
+ /* Mark all threads as running and wake them. */
+ unlock_io_cache(cache);
}
else
{
- info->error= info->share->active->error;
- info->read_end= info->share->active->read_end;
- info->pos_in_file= info->share->active->pos_in_file;
- len= (int) (info->error == -1 ? -1 : info->read_end-info->buffer);
+ /*
+ With a synchronized write/read cache readers always come here.
+ Copy important values from the share.
+ */
+ cache->error= cshare->error;
+ cache->read_end= cshare->read_end;
+ cache->pos_in_file= cshare->pos_in_file;
+
+ len= (int) ((cache->error == -1) ? -1 : cache->read_end - cache->buffer);
}
- info->read_pos=info->buffer;
- info->seek_not_done=0;
+ cache->read_pos= cache->buffer;
+ cache->seek_not_done= 0;
if (len <= 0)
{
- info->error= (int) left_length;
+ DBUG_PRINT("io_cache_share", ("reader error. len %d left %u",
+ len, left_length));
+ cache->error= (int) left_length;
DBUG_RETURN(1);
}
cnt= ((uint) len > Count) ? (int) Count : len;
- memcpy(Buffer, info->read_pos, (size_t) cnt);
+ memcpy(Buffer, cache->read_pos, (size_t) cnt);
Count -= cnt;
Buffer+= cnt;
left_length+= cnt;
- info->read_pos+= cnt;
+ cache->read_pos+= cnt;
}
DBUG_RETURN(0);
}
-#endif
+
+
+/*
+ Copy data from write cache to read cache.
+
+ SYNOPSIS
+ copy_to_read_buffer()
+ write_cache The write cache.
+ write_buffer The source of data, mostly the cache buffer.
+ write_length The number of bytes to copy.
+
+ NOTE
+ The write thread will wait for all read threads to join the cache
+ lock. Then it copies the data over and wakes the read threads.
+
+ RETURN
+ void
+*/
+
+static void copy_to_read_buffer(IO_CACHE *write_cache,
+ const byte *write_buffer, uint write_length)
+{
+ IO_CACHE_SHARE *cshare= write_cache->share;
+
+ DBUG_ASSERT(cshare->source_cache == write_cache);
+ /*
+ write_length is usually less or equal to buffer_length.
+ It can be bigger if _my_b_write() is called with a big length.
+ */
+ while (write_length)
+ {
+ uint copy_length= min(write_length, write_cache->buffer_length);
+ int __attribute__((unused)) rc;
+
+ rc= lock_io_cache(write_cache, write_cache->pos_in_file);
+ /* The writing thread does always have the lock when it awakes. */
+ DBUG_ASSERT(rc);
+
+ memcpy(cshare->buffer, write_buffer, copy_length);
+
+ cshare->error= 0;
+ cshare->read_end= cshare->buffer + copy_length;
+ cshare->pos_in_file= write_cache->pos_in_file;
+
+ /* Mark all threads as running and wake them. */
+ unlock_io_cache(write_cache);
+
+ write_buffer+= copy_length;
+ write_length-= copy_length;
+ }
+}
+#endif /*THREAD*/
/*
@@ -1022,6 +1401,7 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count)
Buffer+=rest_length;
Count-=rest_length;
info->write_pos+=rest_length;
+
if (my_b_flush_io_cache(info,1))
return 1;
if (Count >= IO_SIZE)
@@ -1034,6 +1414,23 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count)
}
if (my_write(info->file,Buffer,(uint) length,info->myflags | MY_NABP))
return info->error= -1;
+
+#ifdef THREAD
+ /*
+ In case of a shared I/O cache with a writer we normally do direct
+ write cache to read cache copy. Simulate this here by direct
+ caller buffer to read cache copy. Do it after the write so that
+ the cache readers actions on the flushed part can go in parallel
+ with the write of the extra stuff. copy_to_read_buffer()
+ synchronizes writer and readers so that after this call the
+ readers can act on the extra stuff while the writer can go ahead
+ and prepare the next output. copy_to_read_buffer() relies on
+ info->pos_in_file.
+ */
+ if (info->share)
+ copy_to_read_buffer(info, Buffer, length);
+#endif
+
Count-=length;
Buffer+=length;
info->pos_in_file+=length;
@@ -1054,6 +1451,14 @@ int my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count)
{
uint rest_length,length;
+#ifdef THREAD
+ /*
+ Assert that we cannot come here with a shared cache. If we do one
+ day, we might need to add a call to copy_to_read_buffer().
+ */
+ DBUG_ASSERT(!info->share);
+#endif
+
lock_append_buffer(info);
rest_length=(uint) (info->write_end - info->write_pos);
if (Count <= rest_length)
@@ -1114,6 +1519,14 @@ int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count,
uint length;
int error=0;
+#ifdef THREAD
+ /*
+ Assert that we cannot come here with a shared cache. If we do one
+ day, we might need to add a call to copy_to_read_buffer().
+ */
+ DBUG_ASSERT(!info->share);
+#endif
+
if (pos < info->pos_in_file)
{
/* Of no overlap, write everything without buffering */
@@ -1190,6 +1603,17 @@ int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
if ((length=(uint) (info->write_pos - info->write_buffer)))
{
+#ifdef THREAD
+ /*
+ In case of a shared I/O cache with a writer we do direct write
+ cache to read cache copy. Do it before the write here so that
+ the readers can work in parallel with the write.
+ copy_to_read_buffer() relies on info->pos_in_file.
+ */
+ if (info->share)
+ copy_to_read_buffer(info, info->write_buffer, length);
+#endif
+
pos_in_file=info->pos_in_file;
/*
If we have append cache, we always open the file with
@@ -1269,16 +1693,10 @@ int end_io_cache(IO_CACHE *info)
#ifdef THREAD
/*
- if IO_CACHE is shared between several threads, only one
- thread needs to call end_io_cache() - just as init_io_cache()
- should be called only once and then memcopy'ed
+ Every thread must call remove_io_thread(). The last one destroys
+ the share elements.
*/
- if (info->share)
- {
- pthread_cond_destroy(&info->share->cond);
- pthread_mutex_destroy(&info->share->mutex);
- info->share=0;
- }
+ DBUG_ASSERT(!info->share || !info->share->total_threads);
#endif
if ((pre_close=info->pre_close))