summaryrefslogtreecommitdiff
path: root/mysys/thr_lock.c
diff options
context:
space:
mode:
Diffstat (limited to 'mysys/thr_lock.c')
-rw-r--r--mysys/thr_lock.c313
1 files changed, 194 insertions, 119 deletions
diff --git a/mysys/thr_lock.c b/mysys/thr_lock.c
index d96d08ea0c3..135a2a5618f 100644
--- a/mysys/thr_lock.c
+++ b/mysys/thr_lock.c
@@ -24,7 +24,7 @@ Locks are prioritized according to:
The current lock types are:
-TL_READ # Low priority read
+TL_READ # Low priority read
TL_READ_WITH_SHARED_LOCKS
TL_READ_HIGH_PRIORITY # High priority read
TL_READ_NO_INSERT # Read without concurrent inserts
@@ -56,8 +56,17 @@ check_status:
In MyISAM this is a simple check if the insert can be done
at the end of the datafile.
update_status:
- Before a write lock is released, this function is called.
- In MyISAM this functions updates the count and length of the datafile
+ in thr_reschedule_write_lock(), when an insert delayed thread
+ downgrades TL_WRITE lock to TL_WRITE_DELAYED, to allow SELECT
+ threads to proceed.
+ A storage engine should also call update_status internally
+ in the ::external_lock(F_UNLCK) method.
+ In MyISAM and CSV this functions updates the length of the datafile.
+ MySQL does in some exceptional cases (when doing DLL statements on
+ open tables calls thr_unlock() followed by thr_lock() without calling
+ ::external_lock() in between. In this case thr_unlock() is called with
+ the THR_UNLOCK_UPDATE_STATUS flag and thr_unlock() will call
+ update_status for write locks.
get_status:
When one gets a lock this functions is called.
In MyISAM this stores the number of rows and size of the datafile
@@ -66,6 +75,8 @@ get_status:
The lock algorithm allows one to have one TL_WRITE_CONCURRENT_INSERT or
one TL_WRITE_DELAYED lock at the same time as multiple read locks.
+In addition, if lock->allow_multiple_concurrent_insert is set then there can
+be any number of TL_WRITE_CONCURRENT_INSERT locks aktive at the same time.
*/
#if !defined(MAIN) && !defined(DBUG_OFF) && !defined(EXTRA_DEBUG)
@@ -106,8 +117,30 @@ static inline mysql_cond_t *get_cond(void)
return &my_thread_var->suspend;
}
+
/*
-** For the future (now the thread specific cond is alloced by my_pthread.c)
+ Priority for locks (decides in which order locks are locked)
+ We want all write locks to be first, followed by read locks.
+ Locks from MERGE tables has a little lower priority than other
+ locks, to allow one to release merge tables without having
+ to unlock and re-lock other locks.
+ The lower the number, the higher the priority for the lock.
+ Read locks should have 4, write locks should have 0.
+ UNLOCK is 8, to force these last in thr_merge_locks.
+ For MERGE tables we add 2 (THR_LOCK_MERGE_PRIV) to the lock priority.
+ THR_LOCK_LATE_PRIV (1) is used when one locks other tables to be merged
+ with existing locks. This way we prioritize the original locks over the
+ new locks.
+*/
+
+static uint lock_priority[(uint)TL_WRITE_ONLY+1] =
+{ 8, 4, 4, 4, 4, 4, 0, 0, 0, 0, 0, 0, 0, 0};
+
+#define LOCK_CMP(A,B) ((uchar*) ((A)->lock) + lock_priority[(uint) (A)->type] + (A)->priority < (uchar*) ((B)->lock) + lock_priority[(uint) (B)->type] + (B)->priority)
+
+
+/*
+ For the future (now the thread specific cond is alloced by my_pthread.c)
*/
my_bool init_thr_lock()
@@ -154,7 +187,8 @@ static int check_lock(struct st_lock_list *list, const char* lock_type,
}
if (same_owner &&
!thr_lock_owner_equal(data->owner, first_owner) &&
- last_lock_type != TL_WRITE_ALLOW_WRITE)
+ last_lock_type != TL_WRITE_ALLOW_WRITE &&
+ last_lock_type != TL_WRITE_CONCURRENT_INSERT)
{
fprintf(stderr,
"Warning: Found locks from different threads in %s: %s\n",
@@ -207,7 +241,7 @@ static void check_locks(THR_LOCK *lock, const char *where,
THR_LOCK_DATA *data;
for (data=lock->read.data ; data ; data=data->next)
{
- if ((int) data->type == (int) TL_READ_NO_INSERT)
+ if (data->type == TL_READ_NO_INSERT)
count++;
/* Protect against infinite loop. */
DBUG_ASSERT(count <= lock->read_no_write_count);
@@ -255,7 +289,22 @@ static void check_locks(THR_LOCK *lock, const char *where,
}
}
else
- { /* Have write lock */
+ {
+ /* We have at least one write lock */
+ if (lock->write.data->type == TL_WRITE_CONCURRENT_INSERT)
+ {
+ THR_LOCK_DATA *data;
+ for (data=lock->write.data->next ; data ; data=data->next)
+ {
+ if (data->type != TL_WRITE_CONCURRENT_INSERT)
+ {
+ fprintf(stderr,
+ "Warning at '%s': Found TL_WRITE_CONCURRENT_INSERT lock mixed with other write locks\n",
+ where);
+ break;
+ }
+ }
+ }
if (lock->write_wait.data)
{
if (!allow_no_locks &&
@@ -362,6 +411,7 @@ void thr_lock_data_init(THR_LOCK *lock,THR_LOCK_DATA *data, void *param)
data->owner= 0; /* no owner yet */
data->status_param=param;
data->cond=0;
+ data->priority= 0;
}
@@ -518,7 +568,8 @@ wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
{
result= THR_LOCK_SUCCESS;
if (data->lock->get_status)
- (*data->lock->get_status)(data->status_param, 0);
+ (*data->lock->get_status)(data->status_param,
+ data->type == TL_WRITE_CONCURRENT_INSERT);
check_locks(data->lock,"got wait_for_lock",0);
}
mysql_mutex_unlock(&data->lock->mutex);
@@ -535,7 +586,7 @@ wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
}
-enum enum_thr_lock_result
+static enum enum_thr_lock_result
thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner,
enum thr_lock_type lock_type, ulong lock_wait_timeout)
{
@@ -548,6 +599,7 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner,
data->cond=0; /* safety */
data->type=lock_type;
data->owner= owner; /* Must be reset ! */
+ data->priority&= ~THR_LOCK_LATE_PRIV;
mysql_mutex_lock(&lock->mutex);
DBUG_PRINT("lock",("data: 0x%lx thread: 0x%lx lock: 0x%lx type: %d",
(long) data, data->owner->thread_id,
@@ -620,11 +672,11 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner,
(*lock->read.last)=data; /* Add to running FIFO */
data->prev=lock->read.last;
lock->read.last= &data->next;
- if (lock->get_status)
- (*lock->get_status)(data->status_param, 0);
if (lock_type == TL_READ_NO_INSERT)
lock->read_no_write_count++;
check_locks(lock,"read lock with no write locks",0);
+ if (lock->get_status)
+ (*lock->get_status)(data->status_param, 0);
statistic_increment(locks_immediate,&THR_LOCK_lock);
goto end;
}
@@ -682,7 +734,8 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner,
/*
The idea is to allow us to get a lock at once if we already have
a write lock or if there is no pending write locks and if all
- write locks are of TL_WRITE_ALLOW_WRITE type.
+ write locks are of the same type and are either
+ TL_WRITE_ALLOW_WRITE or TL_WRITE_CONCURRENT_INSERT
Note that, since lock requests for the same table are sorted in
such way that requests with higher thr_lock_type value come first
@@ -713,15 +766,13 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner,
lock->write.data->type == TL_WRITE_LOW_PRIORITY)) &&
lock->write.data->type != TL_WRITE_DELAYED));
- if ((lock_type == TL_WRITE_ALLOW_WRITE &&
+ if (((lock_type == TL_WRITE_ALLOW_WRITE ||
+ (lock_type == TL_WRITE_CONCURRENT_INSERT &&
+ lock->allow_multiple_concurrent_insert)) &&
! lock->write_wait.data &&
- lock->write.data->type == TL_WRITE_ALLOW_WRITE) ||
+ lock->write.data->type == lock_type) ||
has_old_lock(lock->write.data, data->owner))
{
- /*
- We have already got a write lock or all locks are
- TL_WRITE_ALLOW_WRITE
- */
DBUG_PRINT("info", ("write_wait.data: 0x%lx old_type: %d",
(ulong) lock->write_wait.data,
lock->write.data->type));
@@ -730,8 +781,9 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner,
data->prev=lock->write.last;
lock->write.last= &data->next;
check_locks(lock,"second write lock",0);
- if (data->lock->get_status)
- (*data->lock->get_status)(data->status_param, 0);
+ if (lock->get_status)
+ (*lock->get_status)(data->status_param,
+ lock_type == TL_WRITE_CONCURRENT_INSERT);
statistic_increment(locks_immediate,&THR_LOCK_lock);
goto end;
}
@@ -764,8 +816,8 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner,
(*lock->write.last)=data; /* Add as current write lock */
data->prev=lock->write.last;
lock->write.last= &data->next;
- if (data->lock->get_status)
- (*data->lock->get_status)(data->status_param, concurrent_insert);
+ if (lock->get_status)
+ (*lock->get_status)(data->status_param, concurrent_insert);
check_locks(lock,"only write lock",0);
statistic_increment(locks_immediate,&THR_LOCK_lock);
goto end;
@@ -836,7 +888,7 @@ static inline void free_all_read_locks(THR_LOCK *lock,
/* Unlock lock and free next thread on same lock */
-void thr_unlock(THR_LOCK_DATA *data)
+void thr_unlock(THR_LOCK_DATA *data, uint unlock_flags)
{
THR_LOCK *lock=data->lock;
enum thr_lock_type lock_type=data->type;
@@ -860,15 +912,20 @@ void thr_unlock(THR_LOCK_DATA *data)
}
else
lock->write.last=data->prev;
- if (lock_type >= TL_WRITE_CONCURRENT_INSERT)
+
+ if (unlock_flags & THR_UNLOCK_UPDATE_STATUS)
{
- if (lock->update_status)
- (*lock->update_status)(data->status_param);
- }
- else
- {
- if (lock->restore_status)
- (*lock->restore_status)(data->status_param);
+ /* External lock was not called; Update or restore status */
+ if (lock_type >= TL_WRITE_CONCURRENT_INSERT)
+ {
+ if (lock->update_status)
+ (*lock->update_status)(data->status_param);
+ }
+ else
+ {
+ if (lock->restore_status)
+ (*lock->restore_status)(data->status_param);
+ }
}
if (lock_type == TL_READ_NO_INSERT)
lock->read_no_write_count--;
@@ -892,7 +949,6 @@ static void wake_up_waiters(THR_LOCK *lock)
{
THR_LOCK_DATA *data;
enum thr_lock_type lock_type;
-
DBUG_ENTER("wake_up_waiters");
if (!lock->write.data) /* If no active write locks */
@@ -1006,14 +1062,12 @@ end:
/*
-** Get all locks in a specific order to avoid dead-locks
-** Sort acording to lock position and put write_locks before read_locks if
-** lock on same lock.
+ Get all locks in a specific order to avoid dead-locks
+ Sort acording to lock position and put write_locks before read_locks if
+ lock on same lock. Locks on MERGE tables has lower priority than other
+ locks of the same type. See comment for lock_priority.
*/
-
-#define LOCK_CMP(A,B) ((uchar*) (A->lock) - (uint) ((A)->type) < (uchar*) (B->lock)- (uint) ((B)->type))
-
static void sort_locks(THR_LOCK_DATA **data,uint count)
{
THR_LOCK_DATA **pos,**end,**prev,*tmp;
@@ -1039,11 +1093,15 @@ enum enum_thr_lock_result
thr_multi_lock(THR_LOCK_DATA **data, uint count, THR_LOCK_INFO *owner,
ulong lock_wait_timeout)
{
- THR_LOCK_DATA **pos,**end;
+ THR_LOCK_DATA **pos, **end, **first_lock;
DBUG_ENTER("thr_multi_lock");
DBUG_PRINT("lock",("data: 0x%lx count: %d", (long) data, count));
+
if (count > 1)
sort_locks(data,count);
+ else if (count == 0)
+ DBUG_RETURN(THR_LOCK_SUCCESS);
+
/* lock everything */
for (pos=data,end=data+count; pos < end ; pos++)
{
@@ -1051,7 +1109,7 @@ thr_multi_lock(THR_LOCK_DATA **data, uint count, THR_LOCK_INFO *owner,
lock_wait_timeout);
if (result != THR_LOCK_SUCCESS)
{ /* Aborted */
- thr_multi_unlock(data,(uint) (pos-data));
+ thr_multi_unlock(data,(uint) (pos-data), 0);
DBUG_RETURN(result);
}
DEBUG_SYNC_C("thr_multi_lock_after_thr_lock");
@@ -1060,93 +1118,103 @@ thr_multi_lock(THR_LOCK_DATA **data, uint count, THR_LOCK_INFO *owner,
(long) pos[0]->lock, pos[0]->type); fflush(stdout);
#endif
}
- thr_lock_merge_status(data, count);
+
+ /*
+ Call start_trans for all locks.
+ If we lock the same table multiple times, we must use the same
+ status_param; We ensure this by calling copy_status() for all
+ copies of the same tables.
+ */
+ if ((*data)->lock->start_trans)
+ ((*data)->lock->start_trans)((*data)->status_param);
+ for (first_lock=data, pos= data+1 ; pos < end ; pos++)
+ {
+ /* Get the current status (row count, checksum, trid etc) */
+ if ((*pos)->lock->start_trans)
+ (*(*pos)->lock->start_trans)((*pos)->status_param);
+ /*
+ If same table as previous table use pointer to previous status
+ information to ensure that all read/write tables shares same
+ state.
+ */
+ if (pos[0]->lock == pos[-1]->lock && pos[0]->lock->copy_status)
+ (pos[0]->lock->copy_status)((*pos)->status_param,
+ (*first_lock)->status_param);
+ else
+ {
+ /* Different lock, use this as base for next lock */
+ first_lock= pos;
+ }
+ }
DBUG_RETURN(THR_LOCK_SUCCESS);
}
/**
- Ensure that all locks for a given table have the same
- status_param.
-
- This is a MyISAM and possibly Maria specific crutch. MyISAM
- engine stores data file length, record count and other table
- properties in status_param member of handler. When a table is
- locked, connection-local copy is made from a global copy
- (myisam_share) by mi_get_status(). When a table is unlocked,
- the changed status is transferred back to the global share by
- mi_update_status().
-
- One thing MyISAM doesn't do is to ensure that when the same
- table is opened twice in a connection all instances share the
- same status_param. This is necessary, however: for one, to keep
- all instances of a connection "on the same page" with regard to
- the current state of the table. For other, unless this is done,
- myisam_share will always get updated from the last unlocked
- instance (in mi_update_status()), and when this instance was not
- the one that was used to update data, records may be lost.
-
- For each table, this function looks up the last lock_data in the
- list of acquired locks, and makes sure that all other instances
- share status_param with it.
+ Merge two sets of locks.
+
+ @param data All locks. First old locks, then new locks.
+ @param old_count Original number of locks. These are first in 'data'.
+ @param new_count How many new locks
+
+ The merge is needed if the new locks contains same tables as the old
+ locks, in which case we have to ensure that same tables shares the
+ same status (as after a thr_multi_lock()).
*/
-void
-thr_lock_merge_status(THR_LOCK_DATA **data, uint count)
+void thr_merge_locks(THR_LOCK_DATA **data, uint old_count, uint new_count)
{
-#if !defined(DONT_USE_RW_LOCKS)
- THR_LOCK_DATA **pos= data;
- THR_LOCK_DATA **end= data + count;
- if (count > 1)
+ THR_LOCK_DATA **pos, **end, **first_lock= 0;
+ DBUG_ENTER("thr_merge_lock");
+
+ /* Remove marks on old locks to make them sort before new ones */
+ for (pos=data, end= pos + old_count; pos < end ; pos++)
+ (*pos)->priority&= ~THR_LOCK_LATE_PRIV;
+
+ /* Mark new locks with LATE_PRIV to make them sort after org ones */
+ for (pos=data + old_count, end= pos + new_count; pos < end ; pos++)
+ (*pos)->priority|= THR_LOCK_LATE_PRIV;
+
+ sort_locks(data, old_count + new_count);
+
+ for (pos=data ; pos < end ; pos++)
{
- THR_LOCK_DATA *last_lock= end[-1];
- pos=end-1;
- do
+ /* Check if lock was unlocked before */
+ if (pos[0]->type == TL_UNLOCK || ! pos[0]->lock->fix_status)
{
- pos--;
- if (last_lock->lock == (*pos)->lock &&
- last_lock->lock->copy_status)
- {
- if (last_lock->type <= TL_READ_NO_INSERT)
- {
- THR_LOCK_DATA **read_lock;
- /*
- If we are locking the same table with read locks we must ensure
- that all tables share the status of the last write lock or
- the same read lock.
- */
- for (;
- (*pos)->type <= TL_READ_NO_INSERT &&
- pos != data &&
- pos[-1]->lock == (*pos)->lock ;
- pos--) ;
-
- read_lock = pos+1;
- do
- {
- (last_lock->lock->copy_status)((*read_lock)->status_param,
- (*pos)->status_param);
- } while (*(read_lock++) != last_lock);
- last_lock= (*pos); /* Point at last write lock */
- }
- else
- (*last_lock->lock->copy_status)((*pos)->status_param,
- last_lock->status_param);
- }
- else
- last_lock=(*pos);
- } while (pos != data);
+ DBUG_PRINT("info", ("lock skipped. unlocked: %d fix_status: %d",
+ pos[0]->type == TL_UNLOCK,
+ pos[0]->lock->fix_status == 0));
+ continue;
+ }
+
+ /*
+ If same table as previous table use pointer to previous status
+ information to ensure that all read/write tables shares same
+ state.
+ */
+ if (first_lock && pos[0]->lock == first_lock[0]->lock)
+ (pos[0]->lock->fix_status)((*first_lock)->status_param,
+ (*pos)->status_param);
+ else
+ {
+ /* Different lock, use this as base for next lock */
+ first_lock= pos;
+ (pos[0]->lock->fix_status)((*first_lock)->status_param, 0);
+ }
}
-#endif
+ DBUG_VOID_RETURN;
}
- /* free all locks */
-void thr_multi_unlock(THR_LOCK_DATA **data,uint count)
+/* Unlock all locks */
+
+void thr_multi_unlock(THR_LOCK_DATA **data,uint count, uint unlock_flags)
{
THR_LOCK_DATA **pos,**end;
DBUG_ENTER("thr_multi_unlock");
- DBUG_PRINT("lock",("data: 0x%lx count: %d", (long) data, count));
+ DBUG_PRINT("lock",("data: 0x%lx count: %d flags: %u", (long) data, count,
+ unlock_flags));
for (pos=data,end=data+count; pos < end ; pos++)
{
@@ -1156,7 +1224,7 @@ void thr_multi_unlock(THR_LOCK_DATA **data,uint count)
fflush(stdout);
#endif
if ((*pos)->type != TL_UNLOCK)
- thr_unlock(*pos);
+ thr_unlock(*pos, unlock_flags);
else
{
DBUG_PRINT("lock",("Free lock: data: 0x%lx thread: 0x%lx lock: 0x%lx",
@@ -1306,6 +1374,7 @@ my_bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data,
ulong lock_wait_timeout)
{
THR_LOCK *lock=data->lock;
+ enum enum_thr_lock_result res;
DBUG_ENTER("thr_upgrade_write_delay_lock");
mysql_mutex_lock(&lock->mutex);
@@ -1326,6 +1395,8 @@ my_bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data,
if (data->lock->get_status)
(*data->lock->get_status)(data->status_param, 0);
mysql_mutex_unlock(&lock->mutex);
+ if (lock->start_trans)
+ (*lock->start_trans)(data->status_param);
DBUG_RETURN(0);
}
@@ -1346,7 +1417,10 @@ my_bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data,
{
check_locks(lock,"waiting for lock",0);
}
- DBUG_RETURN(wait_for_lock(&lock->write_wait,data,1, lock_wait_timeout));
+ res= wait_for_lock(&lock->write_wait, data, 1, lock_wait_timeout);
+ if (res == THR_LOCK_SUCCESS && lock->start_trans)
+ DBUG_RETURN((*lock->start_trans)(data->status_param));
+ DBUG_RETURN(0);
}
@@ -1466,7 +1540,7 @@ struct st_test {
enum thr_lock_type lock_type;
};
-THR_LOCK locks[5]; /* 4 locks */
+THR_LOCK locks[6]; /* Number of locks +1 */
struct st_test test_0[] = {{0,TL_READ}}; /* One lock */
struct st_test test_1[] = {{0,TL_READ},{0,TL_WRITE}}; /* Read and write lock of lock 0 */
@@ -1547,7 +1621,6 @@ static void *test_thread(void *arg)
printf("Thread %s (%d) started\n",my_thread_name(),param); fflush(stdout);
-
thr_lock_info_init(&lock_info);
for (i=0; i < lock_counts[param] ; i++)
thr_lock_data_init(locks+tests[param][i].lock_nr,data+i,NULL);
@@ -1574,7 +1647,7 @@ static void *test_thread(void *arg)
}
}
mysql_mutex_unlock(&LOCK_thread_count);
- thr_multi_unlock(multi_locks,lock_counts[param]);
+ thr_multi_unlock(multi_locks,lock_counts[param], THR_UNLOCK_UPDATE_STATUS);
}
printf("Thread %s (%d) ended\n",my_thread_name(),param); fflush(stdout);
@@ -1592,7 +1665,8 @@ int main(int argc __attribute__((unused)),char **argv __attribute__((unused)))
{
pthread_t tid;
pthread_attr_t thr_attr;
- int i,*param,error;
+ int *param,error;
+ uint i;
MY_INIT(argv[0]);
if (argc > 1 && argv[1][0] == '-' && argv[1][1] == '#')
DBUG_PUSH(argv[1]+2);
@@ -1612,13 +1686,14 @@ int main(int argc __attribute__((unused)),char **argv __attribute__((unused)))
exit(1);
}
- for (i=0 ; i < (int) array_elements(locks) ; i++)
+ for (i=0 ; i < array_elements(locks) ; i++)
{
thr_lock_init(locks+i);
locks[i].check_status= test_check_status;
locks[i].update_status=test_update_status;
locks[i].copy_status= test_copy_status;
locks[i].get_status= test_get_status;
+ locks[i].allow_multiple_concurrent_insert= 1;
}
if ((error=pthread_attr_init(&thr_attr)))
{
@@ -1644,7 +1719,7 @@ int main(int argc __attribute__((unused)),char **argv __attribute__((unused)))
#ifdef HAVE_THR_SETCONCURRENCY
(void) thr_setconcurrency(2);
#endif
- for (i=0 ; i < (int) array_elements(lock_counts) ; i++)
+ for (i=0 ; i < array_elements(lock_counts) ; i++)
{
param=(int*) malloc(sizeof(int));
*param=i;
@@ -1678,7 +1753,7 @@ int main(int argc __attribute__((unused)),char **argv __attribute__((unused)))
}
if ((error= mysql_mutex_unlock(&LOCK_thread_count)))
fprintf(stderr, "Got error: %d from mysql_mutex_unlock\n", error);
- for (i=0 ; i < (int) array_elements(locks) ; i++)
+ for (i=0 ; i < array_elements(locks) ; i++)
thr_lock_delete(locks+i);
#ifdef EXTRA_DEBUG
if (found_errors)