diff options
Diffstat (limited to 'mysys/thr_lock.c')
-rw-r--r-- | mysys/thr_lock.c | 313 |
1 files changed, 194 insertions, 119 deletions
diff --git a/mysys/thr_lock.c b/mysys/thr_lock.c index d96d08ea0c3..135a2a5618f 100644 --- a/mysys/thr_lock.c +++ b/mysys/thr_lock.c @@ -24,7 +24,7 @@ Locks are prioritized according to: The current lock types are: -TL_READ # Low priority read +TL_READ # Low priority read TL_READ_WITH_SHARED_LOCKS TL_READ_HIGH_PRIORITY # High priority read TL_READ_NO_INSERT # Read without concurrent inserts @@ -56,8 +56,17 @@ check_status: In MyISAM this is a simple check if the insert can be done at the end of the datafile. update_status: - Before a write lock is released, this function is called. - In MyISAM this functions updates the count and length of the datafile + in thr_reschedule_write_lock(), when an insert delayed thread + downgrades TL_WRITE lock to TL_WRITE_DELAYED, to allow SELECT + threads to proceed. + A storage engine should also call update_status internally + in the ::external_lock(F_UNLCK) method. + In MyISAM and CSV this functions updates the length of the datafile. + MySQL does in some exceptional cases (when doing DLL statements on + open tables calls thr_unlock() followed by thr_lock() without calling + ::external_lock() in between. In this case thr_unlock() is called with + the THR_UNLOCK_UPDATE_STATUS flag and thr_unlock() will call + update_status for write locks. get_status: When one gets a lock this functions is called. In MyISAM this stores the number of rows and size of the datafile @@ -66,6 +75,8 @@ get_status: The lock algorithm allows one to have one TL_WRITE_CONCURRENT_INSERT or one TL_WRITE_DELAYED lock at the same time as multiple read locks. +In addition, if lock->allow_multiple_concurrent_insert is set then there can +be any number of TL_WRITE_CONCURRENT_INSERT locks aktive at the same time. */ #if !defined(MAIN) && !defined(DBUG_OFF) && !defined(EXTRA_DEBUG) @@ -106,8 +117,30 @@ static inline mysql_cond_t *get_cond(void) return &my_thread_var->suspend; } + /* -** For the future (now the thread specific cond is alloced by my_pthread.c) + Priority for locks (decides in which order locks are locked) + We want all write locks to be first, followed by read locks. + Locks from MERGE tables has a little lower priority than other + locks, to allow one to release merge tables without having + to unlock and re-lock other locks. + The lower the number, the higher the priority for the lock. + Read locks should have 4, write locks should have 0. + UNLOCK is 8, to force these last in thr_merge_locks. + For MERGE tables we add 2 (THR_LOCK_MERGE_PRIV) to the lock priority. + THR_LOCK_LATE_PRIV (1) is used when one locks other tables to be merged + with existing locks. This way we prioritize the original locks over the + new locks. +*/ + +static uint lock_priority[(uint)TL_WRITE_ONLY+1] = +{ 8, 4, 4, 4, 4, 4, 0, 0, 0, 0, 0, 0, 0, 0}; + +#define LOCK_CMP(A,B) ((uchar*) ((A)->lock) + lock_priority[(uint) (A)->type] + (A)->priority < (uchar*) ((B)->lock) + lock_priority[(uint) (B)->type] + (B)->priority) + + +/* + For the future (now the thread specific cond is alloced by my_pthread.c) */ my_bool init_thr_lock() @@ -154,7 +187,8 @@ static int check_lock(struct st_lock_list *list, const char* lock_type, } if (same_owner && !thr_lock_owner_equal(data->owner, first_owner) && - last_lock_type != TL_WRITE_ALLOW_WRITE) + last_lock_type != TL_WRITE_ALLOW_WRITE && + last_lock_type != TL_WRITE_CONCURRENT_INSERT) { fprintf(stderr, "Warning: Found locks from different threads in %s: %s\n", @@ -207,7 +241,7 @@ static void check_locks(THR_LOCK *lock, const char *where, THR_LOCK_DATA *data; for (data=lock->read.data ; data ; data=data->next) { - if ((int) data->type == (int) TL_READ_NO_INSERT) + if (data->type == TL_READ_NO_INSERT) count++; /* Protect against infinite loop. */ DBUG_ASSERT(count <= lock->read_no_write_count); @@ -255,7 +289,22 @@ static void check_locks(THR_LOCK *lock, const char *where, } } else - { /* Have write lock */ + { + /* We have at least one write lock */ + if (lock->write.data->type == TL_WRITE_CONCURRENT_INSERT) + { + THR_LOCK_DATA *data; + for (data=lock->write.data->next ; data ; data=data->next) + { + if (data->type != TL_WRITE_CONCURRENT_INSERT) + { + fprintf(stderr, + "Warning at '%s': Found TL_WRITE_CONCURRENT_INSERT lock mixed with other write locks\n", + where); + break; + } + } + } if (lock->write_wait.data) { if (!allow_no_locks && @@ -362,6 +411,7 @@ void thr_lock_data_init(THR_LOCK *lock,THR_LOCK_DATA *data, void *param) data->owner= 0; /* no owner yet */ data->status_param=param; data->cond=0; + data->priority= 0; } @@ -518,7 +568,8 @@ wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data, { result= THR_LOCK_SUCCESS; if (data->lock->get_status) - (*data->lock->get_status)(data->status_param, 0); + (*data->lock->get_status)(data->status_param, + data->type == TL_WRITE_CONCURRENT_INSERT); check_locks(data->lock,"got wait_for_lock",0); } mysql_mutex_unlock(&data->lock->mutex); @@ -535,7 +586,7 @@ wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data, } -enum enum_thr_lock_result +static enum enum_thr_lock_result thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner, enum thr_lock_type lock_type, ulong lock_wait_timeout) { @@ -548,6 +599,7 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner, data->cond=0; /* safety */ data->type=lock_type; data->owner= owner; /* Must be reset ! */ + data->priority&= ~THR_LOCK_LATE_PRIV; mysql_mutex_lock(&lock->mutex); DBUG_PRINT("lock",("data: 0x%lx thread: 0x%lx lock: 0x%lx type: %d", (long) data, data->owner->thread_id, @@ -620,11 +672,11 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner, (*lock->read.last)=data; /* Add to running FIFO */ data->prev=lock->read.last; lock->read.last= &data->next; - if (lock->get_status) - (*lock->get_status)(data->status_param, 0); if (lock_type == TL_READ_NO_INSERT) lock->read_no_write_count++; check_locks(lock,"read lock with no write locks",0); + if (lock->get_status) + (*lock->get_status)(data->status_param, 0); statistic_increment(locks_immediate,&THR_LOCK_lock); goto end; } @@ -682,7 +734,8 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner, /* The idea is to allow us to get a lock at once if we already have a write lock or if there is no pending write locks and if all - write locks are of TL_WRITE_ALLOW_WRITE type. + write locks are of the same type and are either + TL_WRITE_ALLOW_WRITE or TL_WRITE_CONCURRENT_INSERT Note that, since lock requests for the same table are sorted in such way that requests with higher thr_lock_type value come first @@ -713,15 +766,13 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner, lock->write.data->type == TL_WRITE_LOW_PRIORITY)) && lock->write.data->type != TL_WRITE_DELAYED)); - if ((lock_type == TL_WRITE_ALLOW_WRITE && + if (((lock_type == TL_WRITE_ALLOW_WRITE || + (lock_type == TL_WRITE_CONCURRENT_INSERT && + lock->allow_multiple_concurrent_insert)) && ! lock->write_wait.data && - lock->write.data->type == TL_WRITE_ALLOW_WRITE) || + lock->write.data->type == lock_type) || has_old_lock(lock->write.data, data->owner)) { - /* - We have already got a write lock or all locks are - TL_WRITE_ALLOW_WRITE - */ DBUG_PRINT("info", ("write_wait.data: 0x%lx old_type: %d", (ulong) lock->write_wait.data, lock->write.data->type)); @@ -730,8 +781,9 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner, data->prev=lock->write.last; lock->write.last= &data->next; check_locks(lock,"second write lock",0); - if (data->lock->get_status) - (*data->lock->get_status)(data->status_param, 0); + if (lock->get_status) + (*lock->get_status)(data->status_param, + lock_type == TL_WRITE_CONCURRENT_INSERT); statistic_increment(locks_immediate,&THR_LOCK_lock); goto end; } @@ -764,8 +816,8 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner, (*lock->write.last)=data; /* Add as current write lock */ data->prev=lock->write.last; lock->write.last= &data->next; - if (data->lock->get_status) - (*data->lock->get_status)(data->status_param, concurrent_insert); + if (lock->get_status) + (*lock->get_status)(data->status_param, concurrent_insert); check_locks(lock,"only write lock",0); statistic_increment(locks_immediate,&THR_LOCK_lock); goto end; @@ -836,7 +888,7 @@ static inline void free_all_read_locks(THR_LOCK *lock, /* Unlock lock and free next thread on same lock */ -void thr_unlock(THR_LOCK_DATA *data) +void thr_unlock(THR_LOCK_DATA *data, uint unlock_flags) { THR_LOCK *lock=data->lock; enum thr_lock_type lock_type=data->type; @@ -860,15 +912,20 @@ void thr_unlock(THR_LOCK_DATA *data) } else lock->write.last=data->prev; - if (lock_type >= TL_WRITE_CONCURRENT_INSERT) + + if (unlock_flags & THR_UNLOCK_UPDATE_STATUS) { - if (lock->update_status) - (*lock->update_status)(data->status_param); - } - else - { - if (lock->restore_status) - (*lock->restore_status)(data->status_param); + /* External lock was not called; Update or restore status */ + if (lock_type >= TL_WRITE_CONCURRENT_INSERT) + { + if (lock->update_status) + (*lock->update_status)(data->status_param); + } + else + { + if (lock->restore_status) + (*lock->restore_status)(data->status_param); + } } if (lock_type == TL_READ_NO_INSERT) lock->read_no_write_count--; @@ -892,7 +949,6 @@ static void wake_up_waiters(THR_LOCK *lock) { THR_LOCK_DATA *data; enum thr_lock_type lock_type; - DBUG_ENTER("wake_up_waiters"); if (!lock->write.data) /* If no active write locks */ @@ -1006,14 +1062,12 @@ end: /* -** Get all locks in a specific order to avoid dead-locks -** Sort acording to lock position and put write_locks before read_locks if -** lock on same lock. + Get all locks in a specific order to avoid dead-locks + Sort acording to lock position and put write_locks before read_locks if + lock on same lock. Locks on MERGE tables has lower priority than other + locks of the same type. See comment for lock_priority. */ - -#define LOCK_CMP(A,B) ((uchar*) (A->lock) - (uint) ((A)->type) < (uchar*) (B->lock)- (uint) ((B)->type)) - static void sort_locks(THR_LOCK_DATA **data,uint count) { THR_LOCK_DATA **pos,**end,**prev,*tmp; @@ -1039,11 +1093,15 @@ enum enum_thr_lock_result thr_multi_lock(THR_LOCK_DATA **data, uint count, THR_LOCK_INFO *owner, ulong lock_wait_timeout) { - THR_LOCK_DATA **pos,**end; + THR_LOCK_DATA **pos, **end, **first_lock; DBUG_ENTER("thr_multi_lock"); DBUG_PRINT("lock",("data: 0x%lx count: %d", (long) data, count)); + if (count > 1) sort_locks(data,count); + else if (count == 0) + DBUG_RETURN(THR_LOCK_SUCCESS); + /* lock everything */ for (pos=data,end=data+count; pos < end ; pos++) { @@ -1051,7 +1109,7 @@ thr_multi_lock(THR_LOCK_DATA **data, uint count, THR_LOCK_INFO *owner, lock_wait_timeout); if (result != THR_LOCK_SUCCESS) { /* Aborted */ - thr_multi_unlock(data,(uint) (pos-data)); + thr_multi_unlock(data,(uint) (pos-data), 0); DBUG_RETURN(result); } DEBUG_SYNC_C("thr_multi_lock_after_thr_lock"); @@ -1060,93 +1118,103 @@ thr_multi_lock(THR_LOCK_DATA **data, uint count, THR_LOCK_INFO *owner, (long) pos[0]->lock, pos[0]->type); fflush(stdout); #endif } - thr_lock_merge_status(data, count); + + /* + Call start_trans for all locks. + If we lock the same table multiple times, we must use the same + status_param; We ensure this by calling copy_status() for all + copies of the same tables. + */ + if ((*data)->lock->start_trans) + ((*data)->lock->start_trans)((*data)->status_param); + for (first_lock=data, pos= data+1 ; pos < end ; pos++) + { + /* Get the current status (row count, checksum, trid etc) */ + if ((*pos)->lock->start_trans) + (*(*pos)->lock->start_trans)((*pos)->status_param); + /* + If same table as previous table use pointer to previous status + information to ensure that all read/write tables shares same + state. + */ + if (pos[0]->lock == pos[-1]->lock && pos[0]->lock->copy_status) + (pos[0]->lock->copy_status)((*pos)->status_param, + (*first_lock)->status_param); + else + { + /* Different lock, use this as base for next lock */ + first_lock= pos; + } + } DBUG_RETURN(THR_LOCK_SUCCESS); } /** - Ensure that all locks for a given table have the same - status_param. - - This is a MyISAM and possibly Maria specific crutch. MyISAM - engine stores data file length, record count and other table - properties in status_param member of handler. When a table is - locked, connection-local copy is made from a global copy - (myisam_share) by mi_get_status(). When a table is unlocked, - the changed status is transferred back to the global share by - mi_update_status(). - - One thing MyISAM doesn't do is to ensure that when the same - table is opened twice in a connection all instances share the - same status_param. This is necessary, however: for one, to keep - all instances of a connection "on the same page" with regard to - the current state of the table. For other, unless this is done, - myisam_share will always get updated from the last unlocked - instance (in mi_update_status()), and when this instance was not - the one that was used to update data, records may be lost. - - For each table, this function looks up the last lock_data in the - list of acquired locks, and makes sure that all other instances - share status_param with it. + Merge two sets of locks. + + @param data All locks. First old locks, then new locks. + @param old_count Original number of locks. These are first in 'data'. + @param new_count How many new locks + + The merge is needed if the new locks contains same tables as the old + locks, in which case we have to ensure that same tables shares the + same status (as after a thr_multi_lock()). */ -void -thr_lock_merge_status(THR_LOCK_DATA **data, uint count) +void thr_merge_locks(THR_LOCK_DATA **data, uint old_count, uint new_count) { -#if !defined(DONT_USE_RW_LOCKS) - THR_LOCK_DATA **pos= data; - THR_LOCK_DATA **end= data + count; - if (count > 1) + THR_LOCK_DATA **pos, **end, **first_lock= 0; + DBUG_ENTER("thr_merge_lock"); + + /* Remove marks on old locks to make them sort before new ones */ + for (pos=data, end= pos + old_count; pos < end ; pos++) + (*pos)->priority&= ~THR_LOCK_LATE_PRIV; + + /* Mark new locks with LATE_PRIV to make them sort after org ones */ + for (pos=data + old_count, end= pos + new_count; pos < end ; pos++) + (*pos)->priority|= THR_LOCK_LATE_PRIV; + + sort_locks(data, old_count + new_count); + + for (pos=data ; pos < end ; pos++) { - THR_LOCK_DATA *last_lock= end[-1]; - pos=end-1; - do + /* Check if lock was unlocked before */ + if (pos[0]->type == TL_UNLOCK || ! pos[0]->lock->fix_status) { - pos--; - if (last_lock->lock == (*pos)->lock && - last_lock->lock->copy_status) - { - if (last_lock->type <= TL_READ_NO_INSERT) - { - THR_LOCK_DATA **read_lock; - /* - If we are locking the same table with read locks we must ensure - that all tables share the status of the last write lock or - the same read lock. - */ - for (; - (*pos)->type <= TL_READ_NO_INSERT && - pos != data && - pos[-1]->lock == (*pos)->lock ; - pos--) ; - - read_lock = pos+1; - do - { - (last_lock->lock->copy_status)((*read_lock)->status_param, - (*pos)->status_param); - } while (*(read_lock++) != last_lock); - last_lock= (*pos); /* Point at last write lock */ - } - else - (*last_lock->lock->copy_status)((*pos)->status_param, - last_lock->status_param); - } - else - last_lock=(*pos); - } while (pos != data); + DBUG_PRINT("info", ("lock skipped. unlocked: %d fix_status: %d", + pos[0]->type == TL_UNLOCK, + pos[0]->lock->fix_status == 0)); + continue; + } + + /* + If same table as previous table use pointer to previous status + information to ensure that all read/write tables shares same + state. + */ + if (first_lock && pos[0]->lock == first_lock[0]->lock) + (pos[0]->lock->fix_status)((*first_lock)->status_param, + (*pos)->status_param); + else + { + /* Different lock, use this as base for next lock */ + first_lock= pos; + (pos[0]->lock->fix_status)((*first_lock)->status_param, 0); + } } -#endif + DBUG_VOID_RETURN; } - /* free all locks */ -void thr_multi_unlock(THR_LOCK_DATA **data,uint count) +/* Unlock all locks */ + +void thr_multi_unlock(THR_LOCK_DATA **data,uint count, uint unlock_flags) { THR_LOCK_DATA **pos,**end; DBUG_ENTER("thr_multi_unlock"); - DBUG_PRINT("lock",("data: 0x%lx count: %d", (long) data, count)); + DBUG_PRINT("lock",("data: 0x%lx count: %d flags: %u", (long) data, count, + unlock_flags)); for (pos=data,end=data+count; pos < end ; pos++) { @@ -1156,7 +1224,7 @@ void thr_multi_unlock(THR_LOCK_DATA **data,uint count) fflush(stdout); #endif if ((*pos)->type != TL_UNLOCK) - thr_unlock(*pos); + thr_unlock(*pos, unlock_flags); else { DBUG_PRINT("lock",("Free lock: data: 0x%lx thread: 0x%lx lock: 0x%lx", @@ -1306,6 +1374,7 @@ my_bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data, ulong lock_wait_timeout) { THR_LOCK *lock=data->lock; + enum enum_thr_lock_result res; DBUG_ENTER("thr_upgrade_write_delay_lock"); mysql_mutex_lock(&lock->mutex); @@ -1326,6 +1395,8 @@ my_bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data, if (data->lock->get_status) (*data->lock->get_status)(data->status_param, 0); mysql_mutex_unlock(&lock->mutex); + if (lock->start_trans) + (*lock->start_trans)(data->status_param); DBUG_RETURN(0); } @@ -1346,7 +1417,10 @@ my_bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data, { check_locks(lock,"waiting for lock",0); } - DBUG_RETURN(wait_for_lock(&lock->write_wait,data,1, lock_wait_timeout)); + res= wait_for_lock(&lock->write_wait, data, 1, lock_wait_timeout); + if (res == THR_LOCK_SUCCESS && lock->start_trans) + DBUG_RETURN((*lock->start_trans)(data->status_param)); + DBUG_RETURN(0); } @@ -1466,7 +1540,7 @@ struct st_test { enum thr_lock_type lock_type; }; -THR_LOCK locks[5]; /* 4 locks */ +THR_LOCK locks[6]; /* Number of locks +1 */ struct st_test test_0[] = {{0,TL_READ}}; /* One lock */ struct st_test test_1[] = {{0,TL_READ},{0,TL_WRITE}}; /* Read and write lock of lock 0 */ @@ -1547,7 +1621,6 @@ static void *test_thread(void *arg) printf("Thread %s (%d) started\n",my_thread_name(),param); fflush(stdout); - thr_lock_info_init(&lock_info); for (i=0; i < lock_counts[param] ; i++) thr_lock_data_init(locks+tests[param][i].lock_nr,data+i,NULL); @@ -1574,7 +1647,7 @@ static void *test_thread(void *arg) } } mysql_mutex_unlock(&LOCK_thread_count); - thr_multi_unlock(multi_locks,lock_counts[param]); + thr_multi_unlock(multi_locks,lock_counts[param], THR_UNLOCK_UPDATE_STATUS); } printf("Thread %s (%d) ended\n",my_thread_name(),param); fflush(stdout); @@ -1592,7 +1665,8 @@ int main(int argc __attribute__((unused)),char **argv __attribute__((unused))) { pthread_t tid; pthread_attr_t thr_attr; - int i,*param,error; + int *param,error; + uint i; MY_INIT(argv[0]); if (argc > 1 && argv[1][0] == '-' && argv[1][1] == '#') DBUG_PUSH(argv[1]+2); @@ -1612,13 +1686,14 @@ int main(int argc __attribute__((unused)),char **argv __attribute__((unused))) exit(1); } - for (i=0 ; i < (int) array_elements(locks) ; i++) + for (i=0 ; i < array_elements(locks) ; i++) { thr_lock_init(locks+i); locks[i].check_status= test_check_status; locks[i].update_status=test_update_status; locks[i].copy_status= test_copy_status; locks[i].get_status= test_get_status; + locks[i].allow_multiple_concurrent_insert= 1; } if ((error=pthread_attr_init(&thr_attr))) { @@ -1644,7 +1719,7 @@ int main(int argc __attribute__((unused)),char **argv __attribute__((unused))) #ifdef HAVE_THR_SETCONCURRENCY (void) thr_setconcurrency(2); #endif - for (i=0 ; i < (int) array_elements(lock_counts) ; i++) + for (i=0 ; i < array_elements(lock_counts) ; i++) { param=(int*) malloc(sizeof(int)); *param=i; @@ -1678,7 +1753,7 @@ int main(int argc __attribute__((unused)),char **argv __attribute__((unused))) } if ((error= mysql_mutex_unlock(&LOCK_thread_count))) fprintf(stderr, "Got error: %d from mysql_mutex_unlock\n", error); - for (i=0 ; i < (int) array_elements(locks) ; i++) + for (i=0 ; i < array_elements(locks) ; i++) thr_lock_delete(locks+i); #ifdef EXTRA_DEBUG if (found_errors) |