diff options
author | unknown <serg@janus.mylan> | 2006-10-18 17:24:07 +0200 |
---|---|---|
committer | unknown <serg@janus.mylan> | 2006-10-18 17:24:07 +0200 |
commit | 12a55aeabc353fdc1c3829ddd8baacb142160c80 (patch) | |
tree | 660c00763097e4fd6cbb954397696684f6d4b810 | |
parent | 67a2b7cf298d0a351256ab13c2577c32e20bd723 (diff) | |
download | mariadb-git-12a55aeabc353fdc1c3829ddd8baacb142160c80.tar.gz |
lock manager passed unit tests
storage/maria/trnman.c:
comments
include/my_dbug.h:
make DBUG_ASSERT always a statement
storage/maria/lockman.h:
comments
include/lf.h:
lf_pinbox - don't use a fixed-size purgatory.
mysys/lf_alloc-pin.c:
lf_pinbox - don't use a fixed-size purgatory.
mysys/lf_hash.c:
lf_pinbox - don't use a fixed-size purgatory.
storage/maria/lockman.c:
removed IGNORE_ME/UPGDARED matching - it was wrong in the first place.
updated for "lf_pinbox - don't use a fixed-size purgatory"
storage/maria/unittest/lockman-t.c:
IGNORE_ME/UPGRADED pair counting bugtest.
more tests
unittest/mysys/my_atomic-t.c:
lf_pinbox - don't use a fixed-size purgatory.
-rw-r--r-- | include/lf.h | 15 | ||||
-rw-r--r-- | include/my_dbug.h | 2 | ||||
-rw-r--r-- | mysys/lf_alloc-pin.c | 172 | ||||
-rw-r--r-- | mysys/lf_hash.c | 4 | ||||
-rw-r--r-- | storage/maria/lockman.c | 199 | ||||
-rw-r--r-- | storage/maria/lockman.h | 9 | ||||
-rw-r--r-- | storage/maria/trnman.c | 22 | ||||
-rw-r--r-- | storage/maria/unittest/lockman-t.c | 85 | ||||
-rw-r--r-- | unittest/mysys/my_atomic-t.c | 4 |
9 files changed, 330 insertions, 182 deletions
diff --git a/include/lf.h b/include/lf.h index 4c6765b2d40..45c5f109e1c 100644 --- a/include/lf.h +++ b/include/lf.h @@ -96,20 +96,21 @@ typedef void lf_pinbox_free_func(void *, void *); typedef struct { LF_DYNARRAY pinstack; lf_pinbox_free_func *free_func; - void * free_func_arg; + void *free_func_arg; + uint free_ptr_offset; uint32 volatile pinstack_top_ver; /* this is a versioned pointer */ uint32 volatile pins_in_stack; /* number of elements in array */ } LF_PINBOX; -/* we want sizeof(LF_PINS) to be close to 128 to avoid false sharing */ +/* we want sizeof(LF_PINS) to be 128 to avoid false sharing */ typedef struct { void * volatile pin[LF_PINBOX_PINS]; - void * purgatory[LF_PURGATORY_SIZE]; LF_PINBOX *pinbox; + void *purgatory; uint32 purgatory_count; uint32 volatile link; char pad[128-sizeof(uint32)*2 - -sizeof(void *)*(LF_PINBOX_PINS+LF_PURGATORY_SIZE+1)]; + -sizeof(void *)*(LF_PINBOX_PINS+2)]; } LF_PINS; #define lf_rwlock_by_pins(PINS) \ @@ -147,8 +148,8 @@ typedef struct { #define _lf_assert_pin(PINS, PIN) assert((PINS)->pin[PIN] != 0) #define _lf_assert_unpin(PINS, PIN) assert((PINS)->pin[PIN]==0) -void lf_pinbox_init(LF_PINBOX *pinbox, lf_pinbox_free_func *free_func, - void * free_func_arg); +void lf_pinbox_init(LF_PINBOX *pinbox, uint free_ptr_offset, + lf_pinbox_free_func *free_func, void * free_func_arg); void lf_pinbox_destroy(LF_PINBOX *pinbox); lock_wrap(lf_pinbox_get_pins, LF_PINS *, @@ -181,7 +182,7 @@ typedef struct st_lf_allocator { uint32 volatile mallocs; } LF_ALLOCATOR; -void lf_alloc_init(LF_ALLOCATOR *allocator, uint size); +void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset); void lf_alloc_destroy(LF_ALLOCATOR *allocator); uint lf_alloc_in_pool(LF_ALLOCATOR *allocator); #define _lf_alloc_free(PINS, PTR) _lf_pinbox_free((PINS), (PTR)) diff --git a/include/my_dbug.h b/include/my_dbug.h index a397861c1af..e3fdf9d6461 100644 --- a/include/my_dbug.h +++ b/include/my_dbug.h @@ -100,7 +100,7 @@ extern FILE *_db_fp_(void); #define DBUG_LONGJMP(a1) longjmp(a1) #define DBUG_DUMP(keyword,a1,a2) #define DBUG_END() -#define DBUG_ASSERT(A) +#define DBUG_ASSERT(A) do { } while(0) #define DBUG_LOCK_FILE #define DBUG_FILE (stderr) #define DBUG_UNLOCK_FILE diff --git a/mysys/lf_alloc-pin.c b/mysys/lf_alloc-pin.c index cf1612b73d1..842c9690463 100644 --- a/mysys/lf_alloc-pin.c +++ b/mysys/lf_alloc-pin.c @@ -15,21 +15,7 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ /* - concurrent allocator based on pinning addresses - - strictly speaking it's not lock-free, as it can be blocked - if a thread's purgatory is full and all addresses from there - are pinned. - - But until the above happens, it's wait-free. - - It can be made strictly wait-free by increasing purgatory size. - If it's larger than pins_in_stack*LF_PINBOX_PINS, then apocalyptical - condition above will never happen. But than the memory requirements - will be O(pins_in_stack^2). - - Note, that for large purgatory sizes it makes sense to remove - purgatory array, and link objects in a list using embedded pointer. + wait-free concurrent allocator based on pinning addresses TODO test with more than 256 threads TODO test w/o alloca @@ -43,15 +29,17 @@ static void _lf_pinbox_real_free(LF_PINS *pins); -void lf_pinbox_init(LF_PINBOX *pinbox, lf_pinbox_free_func *free_func, - void *free_func_arg) +void lf_pinbox_init(LF_PINBOX *pinbox, uint free_ptr_offset, + lf_pinbox_free_func *free_func,void *free_func_arg) { DBUG_ASSERT(sizeof(LF_PINS) == 128); + DBUG_ASSERT(free_ptr_offset % sizeof(void *) == 0); lf_dynarray_init(&pinbox->pinstack, sizeof(LF_PINS)); - pinbox->pinstack_top_ver=0; - pinbox->pins_in_stack=0; - pinbox->free_func=free_func; - pinbox->free_func_arg=free_func_arg; + pinbox->pinstack_top_ver= 0; + pinbox->pins_in_stack= 0; + pinbox->free_ptr_offset= free_ptr_offset; + pinbox->free_func= free_func; + pinbox->free_func_arg= free_func_arg; } void lf_pinbox_destroy(LF_PINBOX *pinbox) @@ -64,58 +52,64 @@ LF_PINS *_lf_pinbox_get_pins(LF_PINBOX *pinbox) uint32 pins, next, top_ver; LF_PINS *el; - top_ver=pinbox->pinstack_top_ver; + top_ver= pinbox->pinstack_top_ver; do { - if (!(pins=top_ver % LF_PINBOX_MAX_PINS)) + if (!(pins= top_ver % LF_PINBOX_MAX_PINS)) { - pins=my_atomic_add32(&pinbox->pins_in_stack, 1)+1; - el=(LF_PINS *)_lf_dynarray_lvalue(&pinbox->pinstack, pins); + pins= my_atomic_add32(&pinbox->pins_in_stack, 1)+1; + el= (LF_PINS *)_lf_dynarray_lvalue(&pinbox->pinstack, pins); break; } - el=(LF_PINS *)_lf_dynarray_value(&pinbox->pinstack, pins); - next=el->link; + el= (LF_PINS *)_lf_dynarray_value(&pinbox->pinstack, pins); + next= el->link; } while (!my_atomic_cas32(&pinbox->pinstack_top_ver, &top_ver, top_ver-pins+next+LF_PINBOX_MAX_PINS)); - el->link=pins; - el->purgatory_count=0; - el->pinbox=pinbox; + el->link= pins; + el->purgatory_count= 0; + el->pinbox= pinbox; return el; } void _lf_pinbox_put_pins(LF_PINS *pins) { - LF_PINBOX *pinbox=pins->pinbox; + LF_PINBOX *pinbox= pins->pinbox; uint32 top_ver, nr; - nr=pins->link; + nr= pins->link; #ifdef MY_LF_EXTRA_DEBUG { int i; - for (i=0; i < LF_PINBOX_PINS; i++) + for (i= 0; i < LF_PINBOX_PINS; i++) assert(pins->pin[i] == 0); } #endif + /* + Note - this will deadlock if other threads will wait for + the caller to do something after _lf_pinbox_put_pins(), + and they would have pinned addresses that the caller wants to free. + Thus: only free pins when all work is done and nobody can wait for you!!! + */ while (pins->purgatory_count) { _lf_pinbox_real_free(pins); - if (pins->purgatory_count && my_getncpus() == 1) + if (pins->purgatory_count) { my_atomic_rwlock_wrunlock(&pins->pinbox->pinstack.lock); pthread_yield(); my_atomic_rwlock_wrlock(&pins->pinbox->pinstack.lock); } } - top_ver=pinbox->pinstack_top_ver; + top_ver= pinbox->pinstack_top_ver; if (nr == pinbox->pins_in_stack) { - int32 tmp=nr; + int32 tmp= nr; if (my_atomic_cas32(&pinbox->pins_in_stack, &tmp, tmp-1)) goto ret; } do { - pins->link=top_ver % LF_PINBOX_MAX_PINS; + pins->link= top_ver % LF_PINBOX_MAX_PINS; } while (!my_atomic_cas32(&pinbox->pinstack_top_ver, &top_ver, top_ver-pins->link+nr+LF_PINBOX_MAX_PINS)); ret: @@ -127,19 +121,20 @@ static int ptr_cmp(void **a, void **b) return *a < *b ? -1 : *a == *b ? 0 : 1; } +#define add_to_purgatory(PINS, ADDR) \ + do \ + { \ + *(void **)((char *)(ADDR)+(PINS)->pinbox->free_ptr_offset)= \ + (PINS)->purgatory; \ + (PINS)->purgatory= (ADDR); \ + (PINS)->purgatory_count++; \ + } while (0) + void _lf_pinbox_free(LF_PINS *pins, void *addr) { - while (pins->purgatory_count == LF_PURGATORY_SIZE) - { + if (pins->purgatory_count % LF_PURGATORY_SIZE) _lf_pinbox_real_free(pins); - if (pins->purgatory_count == LF_PURGATORY_SIZE && my_getncpus() == 1) - { - my_atomic_rwlock_wrunlock(&pins->pinbox->pinstack.lock); - pthread_yield(); - my_atomic_rwlock_wrlock(&pins->pinbox->pinstack.lock); - } - } - pins->purgatory[pins->purgatory_count++]=addr; + add_to_purgatory(pins, addr); } struct st_harvester { @@ -178,11 +173,11 @@ static int match_pins(LF_PINS *el, void *addr) static void _lf_pinbox_real_free(LF_PINS *pins) { int npins; - void **addr=0; - void **start, **cur, **end=pins->purgatory+pins->purgatory_count; - LF_PINBOX *pinbox=pins->pinbox; + void *list; + void **addr; + LF_PINBOX *pinbox= pins->pinbox; - npins=pinbox->pins_in_stack+1; + npins= pinbox->pins_in_stack+1; #ifdef HAVE_ALLOCA /* create a sorted list of pinned addresses, to speed up searches */ @@ -190,64 +185,64 @@ static void _lf_pinbox_real_free(LF_PINS *pins) { struct st_harvester hv; addr= (void **) alloca(sizeof(void *)*LF_PINBOX_PINS*npins); - hv.granary=addr; - hv.npins=npins; + hv.granary= addr; + hv.npins= npins; _lf_dynarray_iterate(&pinbox->pinstack, (lf_dynarray_func)harvest_pins, &hv); - npins=hv.granary-addr; + npins= hv.granary-addr; if (npins) qsort(addr, npins, sizeof(void *), (qsort_cmp)ptr_cmp); } + else #endif + addr= 0; - start= cur= pins->purgatory; - end= start+pins->purgatory_count; - for (; cur < end; cur++) + list= pins->purgatory; + pins->purgatory= 0; + pins->purgatory_count= 0; + while (list) { + void *cur= list; + list= *(void **)((char *)cur+pinbox->free_ptr_offset); if (npins) { if (addr) { void **a,**b,**c; - for (a=addr, b=addr+npins-1, c=a+(b-a)/2; b-a>1; c=a+(b-a)/2) - if (*cur == *c) - a=b=c; - else if (*cur > *c) - a=c; + for (a= addr, b= addr+npins-1, c= a+(b-a)/2; b-a>1; c= a+(b-a)/2) + if (cur == *c) + a= b= c; + else if (cur > *c) + a= c; else - b=c; - if (*cur == *a || *cur == *b) + b= c; + if (cur == *a || cur == *b) goto found; } else { if (_lf_dynarray_iterate(&pinbox->pinstack, - (lf_dynarray_func)match_pins, *cur)) + (lf_dynarray_func)match_pins, cur)) goto found; } } /* not pinned - freeing */ - pinbox->free_func(*cur, pinbox->free_func_arg); + pinbox->free_func(cur, pinbox->free_func_arg); continue; found: /* pinned - keeping */ - *start++=*cur; + add_to_purgatory(pins, cur); } - pins->purgatory_count=start-pins->purgatory; -#ifdef MY_LF_EXTRA_DEBUG - while (start < pins->purgatory + LF_PURGATORY_SIZE) - *start++=0; -#endif } static void alloc_free(void *node, LF_ALLOCATOR *allocator) { void *tmp; - tmp=allocator->top; + tmp= allocator->top; do { - (*(void **)node)=tmp; + (*(void **)node)= tmp; } while (!my_atomic_casptr((void **)&allocator->top, (void **)&tmp, node) && LF_BACKOFF); } @@ -255,18 +250,18 @@ static void alloc_free(void *node, LF_ALLOCATOR *allocator) LF_REQUIRE_PINS(1); void *_lf_alloc_new(LF_PINS *pins) { - LF_ALLOCATOR *allocator=(LF_ALLOCATOR *)(pins->pinbox->free_func_arg); + LF_ALLOCATOR *allocator= (LF_ALLOCATOR *)(pins->pinbox->free_func_arg); void *node; for (;;) { do { - node=allocator->top; + node= allocator->top; _lf_pin(pins, 0, node); - } while (node !=allocator->top && LF_BACKOFF); + } while (node != allocator->top && LF_BACKOFF); if (!node) { - if (!(node=my_malloc(allocator->element_size, MYF(MY_WME|MY_ZEROFILL)))) + if (!(node= my_malloc(allocator->element_size, MYF(MY_WME|MY_ZEROFILL)))) goto ret; #ifdef MY_LF_EXTRA_DEBUG my_atomic_add32(&allocator->mallocs, 1); @@ -282,27 +277,27 @@ ret: return node; } -void lf_alloc_init(LF_ALLOCATOR *allocator, uint size) +void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset) { - lf_pinbox_init(&allocator->pinbox, + lf_pinbox_init(&allocator->pinbox, free_ptr_offset, (lf_pinbox_free_func *)alloc_free, allocator); - allocator->top=0; - allocator->mallocs=0; - allocator->element_size=size; + allocator->top= 0; + allocator->mallocs= 0; + allocator->element_size= size; DBUG_ASSERT(size >= (int)sizeof(void *)); } void lf_alloc_destroy(LF_ALLOCATOR *allocator) { - void *el=allocator->top; + void *el= allocator->top; while (el) { - void *tmp=*(void **)el; + void *tmp= *(void **)el; my_free(el, MYF(0)); - el=tmp; + el= tmp; } lf_pinbox_destroy(&allocator->pinbox); - allocator->top=0; + allocator->top= 0; } /* @@ -313,7 +308,8 @@ uint lf_alloc_in_pool(LF_ALLOCATOR *allocator) { uint i; void *node; - for (node=allocator->top, i=0; node; node=*(void **)node, i++) /* no op */; + for (node= allocator->top, i= 0; node; node= *(void **)node, i++) + /* no op */; return i; } diff --git a/mysys/lf_hash.c b/mysys/lf_hash.c index a0425e89556..45b45f7531e 100644 --- a/mysys/lf_hash.c +++ b/mysys/lf_hash.c @@ -20,6 +20,7 @@ TODO try to get rid of dummy nodes ? for non-unique hash, count only _distinct_ values + (but how to do it in lf_hash_delete ?) */ #include <my_global.h> #include <my_sys.h> @@ -222,7 +223,8 @@ void lf_hash_init(LF_HASH *hash, uint element_size, uint flags, uint key_offset, uint key_length, hash_get_key get_key, CHARSET_INFO *charset) { - lf_alloc_init(&hash->alloc,sizeof(LF_SLIST)+element_size); + lf_alloc_init(&hash->alloc, sizeof(LF_SLIST)+element_size, + offsetof(LF_SLIST, key)); lf_dynarray_init(&hash->array, sizeof(LF_SLIST **)); hash->size=1; hash->count=0; diff --git a/storage/maria/lockman.c b/storage/maria/lockman.c index e8ddbd1a25a..1712f6f2221 100644 --- a/storage/maria/lockman.c +++ b/storage/maria/lockman.c @@ -1,4 +1,4 @@ -// TODO lock escalation, instant duration locks +// TODO instant duration locks // automatically place S instead of LS if possible /* TODO optimization: table locks - they have completely @@ -21,6 +21,94 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +/* + Generic Lock Manager + + Lock manager handles locks on "resources", a resource must be uniquely + identified by a 64-bit number. Lock manager itself does not imply + anything about the nature of a resource - it can be a row, a table, a + database, or just anything. + + Locks belong to "lock owners". A Lock owner is uniquely identified by a + 16-bit number. A function loid2lo must be provided by the application + that takes such a number as an argument and returns a LOCK_OWNER + structure. + + Lock levels are completely defined by three tables. Lock compatibility + matrix specifies which locks can be held at the same time on a resource. + Lock combining matrix specifies what lock level has the same behaviour as + a pair of two locks of given levels. getlock_result matrix simplifies + intention locking and lock escalation for an application, basically it + defines which locks are intention locks and which locks are "loose" + locks. It is only used to provide better diagnostics for the + application, lock manager itself does not differentiate between normal, + intention, and loose locks. + + Internally lock manager is based on a lock-free hash, see lf_hash.c for + details. All locks are stored in a hash, with a resource id as a search + key, so all locks for the same resource will be considered collisions and + will be put in a one (lock-free) linked list. The main lock-handling + logic is in the inner loop that searches for a lock in such a linked + list - lockfind(). + + This works as follows. Locks generally are added to the end of the list + (with one exception, see below). When scanning the list it is always + possible to determine what locks are granted (active) and what locks are + waiting - first lock is obviously active, the second is active if it's + compatible with the first, and so on, a lock is active if it's compatible + with all previous locks and all locks before it are also active. + To calculate the "compatible with all previous locks" all locks are + accumulated in prev_lock variable using lock_combining_matrix. + + Lock upgrades: when a thread that has a lock on a given resource, + requests a new lock on the same resource and the old lock is not enough + to satisfy new lock requirements (which is defined by + lock_combining_matrix[old_lock][new_lock] != old_lock), a new lock is + placed in the list. Depending on other locks it is immediately active or + it will wait for other locks. Here's an exception to "locks are added + to the end" rule - upgraded locks are added after the last active lock + but before all waiting locks. Old lock (the one we upgraded from) is + not removed from the list, indeed we may need to return to it later if + the new lock was in a savepoint that gets rolled back. So old lock is + marked as "ignored" (IGNORE_ME flag). New lock gets an UPGRADED flag. + + Loose locks add an important exception to the above. Loose locks do not + always commute with other locks. In the list IX-LS both locks are active, + while in the LS-IX list only the first lock is active. This creates a + problem in lock upgrades. If the list was IX-LS and the owner of the + first lock wants to place LS lock (which can be immediately granted), the + IX lock is upgraded to LSIX and the list becomes IX-LS-LSIX, which, + according to the lock compatibility matrix means that the last lock is + waiting - of course it all happened because IX and LS were swapped and + they don't commute. To work around this there's ACTIVE flag which is set + in every lock that never waited (was placed active), and this flag + overrides "compatible with all previous locks" rule. + + When a lock is placed to the end of the list it's either compatible with + all locks and all locks are active - new lock becomes active at once, or + it conflicts with some of the locks, in this case in the 'blocker' + variable a conflicting lock is returned and the calling thread waits on a + pthread condition in the LOCK_OWNER structure of the owner of the + conflicting lock. Or a new lock is compatible with all locks, but some + existing locks are not compatible with previous locks (example: request IS, + when the list is S-IX) - that is not all locks are active. In this case a + first waiting lock is returned in the 'blocker' variable, + lockman_getlock() notices that a "blocker" does not conflict with the + requested lock, and "dereferences" it, to find the lock that it's waiting + on. The calling thread than begins to wait on the same lock. + + To better support table-row relations where one needs to lock the table + with an intention lock before locking the row, extended diagnostics is + provided. When an intention lock (presumably on a table) is granted, + lockman_getlock() returns one of GOT_THE_LOCK (no need to lock the row, + perhaps the thread already has a normal lock on this table), + GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE (need to lock the row, as usual), + GOT_THE_LOCK_NEED_TO_INSTANT_LOCK_A_SUBRESOURCE (only need to check + whether it's possible to lock the row, but no need to lock it - perhaps + the thread has a loose lock on this table). This is defined by + getlock_result[] table. +*/ + #include <my_global.h> #include <my_sys.h> #include <my_bit.h> @@ -36,11 +124,6 @@ ') Though you can take LS lock while somebody has S lock, it makes no sense - it's simpler to take S lock too. - ") Strictly speaking you can take LX lock while somebody has S lock. - But in this case you lock no rows, because all rows are locked by this - somebody. So we prefer to define that LX cannot be taken when S - exists. Same about LX and X. - 1 - compatible 0 - incompatible -1 - "impossible", so that we can assert the impossibility. @@ -107,8 +190,8 @@ static enum lock_type lock_combining_matrix[10][10]= Defines below help to preserve the table structure. I/L/A values are self explanatory x means the combination is possible (assert should not crash) - but cannot happen in row locks, only in table locks (S,X), or - lock escalations (LS,LX) + but it cannot happen in row locks, only in table locks (S,X), + or lock escalations (LS,LX) */ #define I GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE #define L GOT_THE_LOCK_NEED_TO_INSTANT_LOCK_A_SUBRESOURCE @@ -138,8 +221,7 @@ typedef struct lockman_lock { uint64 resource; struct lockman_lock *lonext; intptr volatile link; - uint32 hashnr; -//#warning TODO - remove hashnr from LOCK + uint32 hashnr; // TODO - remove hashnr from LOCK uint16 loid; uchar lock; /* sizeof(uchar) <= sizeof(enum) */ uchar flags; @@ -147,6 +229,7 @@ typedef struct lockman_lock { #define IGNORE_ME 1 #define UPGRADED 2 +#define ACTIVE 4 typedef struct { intptr volatile *prev; @@ -171,7 +254,7 @@ static int lockfind(LOCK * volatile *head, LOCK *node, my_bool cur_active, compatible, upgrading, prev_active; enum lock_type lock, prev_lock, cur_lock; uint16 loid, cur_loid; - int upgraded_pairs, cur_flags, flags; + int cur_flags, flags; hashnr= node->hashnr; resource= node->resource; @@ -187,7 +270,6 @@ retry: upgrading= FALSE; cursor->blocker= cursor->upgrade_from= 0; _lf_unpin(pins, 3); - upgraded_pairs= 0; do { cursor->curr= PTR(*cursor->prev); _lf_pin(pins,1,cursor->curr); @@ -217,28 +299,24 @@ retry: (cur_hashnr == hashnr && cur_resource >= resource)) { if (cur_hashnr > hashnr || cur_resource > resource) - { - if (upgraded_pairs != 0) - goto retry; break; - } /* ok, we have a lock for this resource */ DBUG_ASSERT(lock_compatibility_matrix[prev_lock][cur_lock] >= 0); DBUG_ASSERT(lock_compatibility_matrix[cur_lock][lock] >= 0); - if (cur_flags & UPGRADED) - upgraded_pairs++; if ((cur_flags & IGNORE_ME) && ! (flags & IGNORE_ME)) { DBUG_ASSERT(cur_active); - upgraded_pairs--; if (cur_loid == loid) cursor->upgrade_from= cursor->curr; } else { prev_active= cur_active; - cur_active&= lock_compatibility_matrix[prev_lock][cur_lock]; - if (upgrading && !cur_active && upgraded_pairs == 0) + if (cur_flags & ACTIVE) + DBUG_ASSERT(prev_active == TRUE); + else + cur_active&= lock_compatibility_matrix[prev_lock][cur_lock]; + if (upgrading && !cur_active) break; if (prev_active && !cur_active) { @@ -253,8 +331,14 @@ retry: if (lock_combining_matrix[cur_lock][lock] == cur_lock) { /* new lock is compatible */ - return cur_active ? ALREADY_HAVE_THE_LOCK - : ALREADY_HAVE_THE_REQUEST; + if (cur_active) + { + cursor->blocker= cursor->curr; /* loose-locks! */ + _lf_unpin(pins, 3); /* loose-locks! */ + return ALREADY_HAVE_THE_LOCK; + } + else + return ALREADY_HAVE_THE_REQUEST; } /* not compatible, upgrading */ upgrading= TRUE; @@ -268,9 +352,9 @@ retry: cursor->blocker= cursor->curr; _lf_pin(pins, 3, cursor->curr); } - prev_lock= lock_combining_matrix[prev_lock][cur_lock]; - DBUG_ASSERT(prev_lock != N); } + prev_lock= lock_combining_matrix[prev_lock][cur_lock]; + DBUG_ASSERT(prev_lock != N); } } cursor->prev= &(cursor->curr->link); @@ -335,6 +419,10 @@ static int lockinsert(LOCK * volatile *head, LOCK *node, LF_PINS *pins, node->flags|= UPGRADED; node->lock= lock_combining_matrix[cursor.upgrade_from->lock][node->lock]; } + if (!(res & NEED_TO_WAIT)) + node->flags|= ACTIVE; + else + node->flags&= ~ACTIVE; /* if we're retrying on REPEAT_ONCE_MORE */ node->link= (intptr)cursor.curr; DBUG_ASSERT(node->link != (intptr)node); DBUG_ASSERT(cursor.prev != &node->link); @@ -349,13 +437,13 @@ static int lockinsert(LOCK * volatile *head, LOCK *node, LF_PINS *pins, _lf_unpin(pins, 1); _lf_unpin(pins, 2); /* - note that cursor.curr is NOT pinned on return. - this is ok as it's either a dummy node for initialize_bucket + note that blocker is not necessarily pinned here (when it's == curr). + this is ok as it's either a dummy node then for initialize_bucket and dummy nodes don't need pinning, or it's a lock of the same transaction for lockman_getlock, and it cannot be removed by another thread */ - *blocker= cursor.blocker ? cursor.blocker : cursor.curr; + *blocker= cursor.blocker; return res; } @@ -419,7 +507,7 @@ static int lockdelete(LOCK * volatile *head, LOCK *node, LF_PINS *pins) void lockman_init(LOCKMAN *lm, loid_to_lo_func *func, uint timeout) { - lf_alloc_init(&lm->alloc,sizeof(LOCK)); + lf_alloc_init(&lm->alloc,sizeof(LOCK), offsetof(LOCK,lonext)); lf_dynarray_init(&lm->array, sizeof(LOCK **)); lm->size= 1; lm->count= 0; @@ -516,13 +604,13 @@ enum lockman_getlock_result lockman_getlock(LOCKMAN *lm, LOCK_OWNER *lo, res= lockinsert(el, node, pins, &blocker); if (res & ALREADY_HAVE) { + int r; old_lock= blocker->lock; - _lf_assert_unpin(pins, 3); /* unpin should not be needed */ _lf_alloc_free(pins, node); lf_rwunlock_by_pins(pins); - res= getlock_result[old_lock][lock]; - DBUG_ASSERT(res); - return res; + r= getlock_result[old_lock][lock]; + DBUG_ASSERT(r); + return r; } /* a new value was added to the hash */ csize= lm->size; @@ -537,9 +625,8 @@ enum lockman_getlock_result lockman_getlock(LOCKMAN *lm, LOCK_OWNER *lo, struct timespec timeout; _lf_assert_pin(pins, 3); /* blocker must be pinned here */ - lf_rwunlock_by_pins(pins); - wait_for_lo= lm->loid_to_lo(blocker->loid); + /* now, this is tricky. blocker is not necessarily a LOCK we're waiting for. If it's compatible with what we want, @@ -550,12 +637,9 @@ enum lockman_getlock_result lockman_getlock(LOCKMAN *lm, LOCK_OWNER *lo, if (lock_compatibility_matrix[blocker->lock][lock]) { blocker= wait_for_lo->all_locks; - lf_pin(pins, 3, blocker); + _lf_pin(pins, 3, blocker); if (blocker != wait_for_lo->all_locks) - { - lf_rwlock_by_pins(pins); continue; - } wait_for_lo= wait_for_lo->waiting_for; } @@ -565,11 +649,11 @@ enum lockman_getlock_result lockman_getlock(LOCKMAN *lm, LOCK_OWNER *lo, to an unrelated - albeit valid - LOCK_OWNER */ if (!wait_for_lo) - { - /* blocker transaction has ended, short id was released */ - lf_rwlock_by_pins(pins); continue; - } + + lo->waiting_for= wait_for_lo; + lf_rwunlock_by_pins(pins); + /* We lock a mutex - it may belong to a wrong LOCK_OWNER, but it must belong to _some_ LOCK_OWNER. It means, we can never free() a LOCK_OWNER, @@ -587,9 +671,8 @@ enum lockman_getlock_result lockman_getlock(LOCKMAN *lm, LOCK_OWNER *lo, lf_rwlock_by_pins(pins); continue; } - /* yuck. waiting */ - lo->waiting_for= wait_for_lo; + /* yuck. waiting */ deadline= my_getsystime() + lm->lock_timeout * 10000; timeout.tv_sec= deadline/10000000; timeout.tv_nsec= (deadline % 10000000) * 100; @@ -607,11 +690,12 @@ enum lockman_getlock_result lockman_getlock(LOCKMAN *lm, LOCK_OWNER *lo, Instead we're relying on the caller to abort the transaction, and release all locks at once - see lockman_release_locks() */ + _lf_unpin(pins, 3); lf_rwunlock_by_pins(pins); return DIDNT_GET_THE_LOCK; } - lo->waiting_for= 0; } + lo->waiting_for= 0; _lf_assert_unpin(pins, 3); /* unpin should not be needed */ lf_rwunlock_by_pins(pins); return getlock_result[lock][lock]; @@ -626,14 +710,15 @@ enum lockman_getlock_result lockman_getlock(LOCKMAN *lm, LOCK_OWNER *lo, */ int lockman_release_locks(LOCKMAN *lm, LOCK_OWNER *lo) { - LOCK * volatile *el, *node; + LOCK * volatile *el, *node, *next; uint bucket; LF_PINS *pins= lo->pins; pthread_mutex_lock(lo->mutex); lf_rwlock_by_pins(pins); - for (node= lo->all_locks; node; node= node->lonext) + for (node= lo->all_locks; node; node= next) { + next= node->lonext; bucket= calc_hash(node->resource) % lm->size; el= _lf_dynarray_lvalue(&lm->array, bucket); if (*el == NULL) @@ -650,12 +735,12 @@ int lockman_release_locks(LOCKMAN *lm, LOCK_OWNER *lo) } #ifdef MY_LF_EXTRA_DEBUG +static char *lock2str[]= +{ "N", "S", "X", "IS", "IX", "SIX", "LS", "LX", "SLX", "LSIX" }; /* NOTE the function below is NOT thread-safe !!! */ -static char *lock2str[]= -{ "N", "S", "X", "IS", "IX", "SIX", "LS", "LX", "SLX", "LSIX" }; void print_lockhash(LOCKMAN *lm) { LOCK *el= *(LOCK **)_lf_dynarray_lvalue(&lm->array, 0); @@ -664,17 +749,21 @@ void print_lockhash(LOCKMAN *lm) { intptr next= el->link; if (el->hashnr & 1) + { printf("0x%08x { resource %llu, loid %u, lock %s", el->hashnr, el->resource, el->loid, lock2str[el->lock]); + if (el->flags & IGNORE_ME) printf(" IGNORE_ME"); + if (el->flags & UPGRADED) printf(" UPGRADED"); + if (el->flags & ACTIVE) printf(" ACTIVE"); + if (DELETED(next)) printf(" ***DELETED***"); + printf("}\n"); + } else { - printf("0x%08x { dummy ", el->hashnr); + /*printf("0x%08x { dummy }\n", el->hashnr);*/ DBUG_ASSERT(el->resource == 0 && el->loid == 0 && el->lock == X); } - if (el->flags & IGNORE_ME) printf(" IGNORE_ME"); - if (el->flags & UPGRADED) printf(" UPGRADED"); - printf("}\n"); - el= (LOCK *)next; + el= PTR(next); } } #endif diff --git a/storage/maria/lockman.h b/storage/maria/lockman.h index a3c96786935..9edd79eb7f1 100644 --- a/storage/maria/lockman.h +++ b/storage/maria/lockman.h @@ -18,7 +18,10 @@ #define _lockman_h /* - N - "no lock", not a lock, used sometimes to simplify the code + Lock levels: + ^^^^^^^^^^^ + + N - "no lock", not a lock, used sometimes internally to simplify the code S - Shared X - eXclusive IS - Intention Shared @@ -35,8 +38,8 @@ struct lockman_lock; typedef struct st_lock_owner LOCK_OWNER; struct st_lock_owner { - LF_PINS *pins; - struct lockman_lock *all_locks; + LF_PINS *pins; /* must be allocated from lockman's pinbox */ + struct lockman_lock *all_locks; /* a LIFO */ LOCK_OWNER *waiting_for; pthread_cond_t *cond; /* transactions waiting for this, wait on 'cond' */ pthread_mutex_t *mutex; /* mutex is required to use 'cond' */ diff --git a/storage/maria/trnman.c b/storage/maria/trnman.c index 49f49a3e26b..ecabf100cb8 100644 --- a/storage/maria/trnman.c +++ b/storage/maria/trnman.c @@ -128,6 +128,11 @@ static void set_short_trid(TRN *trn) trn->locks.loid= i; } +/* + DESCRIPTION + start a new transaction, allocate and initialize transaction object + mutex and cond will be used for lock waits +*/ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond) { TRN *trn; @@ -148,6 +153,7 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond) trnman_active_transactions++; trn= pool; + /* popping an element from a stack */ my_atomic_rwlock_wrlock(&LOCK_pool); while (trn && !my_atomic_casptr((void **)&pool, (void **)&trn, (void *)trn->next)) @@ -213,9 +219,12 @@ void trnman_end_trn(TRN *trn, my_bool commit) LF_PINS *pins= trn->pins; pthread_mutex_lock(&LOCK_trn_list); + + /* remove from active list */ trn->next->prev= trn->prev; trn->prev->next= trn->next; + /* if this transaction was the oldest - clean up committed list */ if (trn->prev == &active_list_min) { TRN *t; @@ -232,6 +241,7 @@ void trnman_end_trn(TRN *trn, my_bool commit) } } + /* add transaction to the committed list (for read-from relations) */ if (commit && active_list_min.next != &active_list_max) { trn->commit_trid= global_trid_generator; @@ -243,7 +253,7 @@ void trnman_end_trn(TRN *trn, my_bool commit) res= lf_hash_insert(&trid_to_trn, pins, &trn); DBUG_ASSERT(res == 0); } - else + else /* or free it right away */ { trn->next= free_me; free_me= trn; @@ -251,6 +261,7 @@ void trnman_end_trn(TRN *trn, my_bool commit) trnman_active_transactions--; pthread_mutex_unlock(&LOCK_trn_list); + /* the rest is done outside of a critical section */ lockman_release_locks(&maria_lockman, &trn->locks); trn->locks.mutex= 0; trn->locks.cond= 0; @@ -258,7 +269,6 @@ void trnman_end_trn(TRN *trn, my_bool commit) my_atomic_storeptr((void **)&short_trid_to_trn[trn->locks.loid], 0); my_atomic_rwlock_rdunlock(&LOCK_short_trid_to_trn); - while (free_me) // XXX send them to the purge thread { int res; @@ -288,7 +298,7 @@ void trnman_free_trn(TRN *trn) do { /* - without volatile cast gcc-3.4.4 moved the assignment + without this volatile cast gcc-3.4.4 moved the assignment down after the loop at -O2 */ *(TRN * volatile *)&(trn->next)= tmp; @@ -317,13 +327,13 @@ my_bool trnman_can_read_from(TRN *trn, TrID trid) LF_REQUIRE_PINS(3); if (trid < trn->min_read_from) - return TRUE; + return TRUE; /* can read */ if (trid > trn->trid) - return FALSE; + return FALSE; /* cannot read */ found= lf_hash_search(&trid_to_trn, trn->pins, &trid, sizeof(trid)); if (!found) - return FALSE; /* not in the hash of committed transactions = cannot read*/ + return FALSE; /* not in the hash of committed transactions = cannot read */ can= (*found)->commit_trid < trn->trid; lf_unpin(trn->pins, 2); diff --git a/storage/maria/unittest/lockman-t.c b/storage/maria/unittest/lockman-t.c index 8b62ccfe094..638078fea65 100644 --- a/storage/maria/unittest/lockman-t.c +++ b/storage/maria/unittest/lockman-t.c @@ -14,7 +14,7 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -//#define EXTRA_VERBOSE +#undef EXTRA_VERBOSE #include <tap.h> @@ -24,7 +24,7 @@ #include <lf.h> #include "../lockman.h" -#define Nlos 10 +#define Nlos 100 LOCK_OWNER loarray[Nlos]; pthread_mutex_t mutexes[Nlos]; pthread_cond_t conds[Nlos]; @@ -51,8 +51,7 @@ LOCK_OWNER *loid2lo(uint16 loid) #define lock_ok_a(O,R,L) test_lock(O,R,L,"",GOT_THE_LOCK) #define lock_ok_i(O,R,L) test_lock(O,R,L,"",GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE) #define lock_ok_l(O,R,L) test_lock(O,R,L,"",GOT_THE_LOCK_NEED_TO_INSTANT_LOCK_A_SUBRESOURCE) -#define lock_conflict(O,R,L) test_lock(O,R,L,"cannot ",DIDNT_GET_THE_LOCK); \ - unlock_all(O) +#define lock_conflict(O,R,L) test_lock(O,R,L,"cannot ",DIDNT_GET_THE_LOCK); void test_lockman_simple() { @@ -64,7 +63,8 @@ void test_lockman_simple() lock_ok_a(1, 1, X); lock_ok_i(2, 2, IX); /* failures */ - lock_conflict(2,1,X); /* this removes all locks of lo2 */ + lock_conflict(2,1,X); + unlock_all(2); lock_ok_a(1,2,S); lock_ok_a(1,2,IS); lock_ok_a(1,2,LS); @@ -72,8 +72,36 @@ void test_lockman_simple() lock_ok_a(2,3,LS); lock_ok_i(1,3,IX); lock_ok_l(2,3,IS); - lockman_release_locks(&lockman, loid2lo(1)); - lockman_release_locks(&lockman, loid2lo(2)); + unlock_all(1); + unlock_all(2); + + lock_ok_i(1,1,IX); + lock_conflict(2,1,S); + lock_ok_a(1,1,LS); + unlock_all(1); + unlock_all(2); + + lock_ok_i(1,1,IX); + lock_ok_a(2,1,LS); + lock_ok_a(1,1,LS); + lock_ok_i(1,1,IX); + lock_ok_i(3,1,IS); + unlock_all(1); + unlock_all(2); + unlock_all(3); + + lock_ok_i(1,4,IS); + lock_ok_i(2,4,IS); + lock_ok_i(3,4,IS); + lock_ok_a(3,4,LS); + lock_ok_i(4,4,IS); + lock_conflict(4,4,IX); + lock_conflict(2,4,IX); + lock_ok_a(1,4,LS); + unlock_all(1); + unlock_all(2); + unlock_all(3); + unlock_all(4); } @@ -82,11 +110,13 @@ pthread_mutex_t rt_mutex; pthread_cond_t rt_cond; int rt_num_threads; int litmus; +int thread_number= 0, timeouts=0; void run_test(const char *test, pthread_handler handler, int n, int m) { pthread_t t; ulonglong now= my_getsystime(); + thread_number= timeouts= 0; litmus= 0; diag("Testing %s with %d threads, %d iterations... ", test, n, m); @@ -100,13 +130,12 @@ void run_test(const char *test, pthread_handler handler, int n, int m) ok(litmus == 0, "tested %s in %g secs (%d)", test, ((double)now)/1e7, litmus); } -int thread_number= 0, timeouts=0; -#define Nrows 1000 -#define Ntables 10 -#define TABLE_LOCK_RATIO 10 +int Nrows= 100; +int Ntables= 10; +int table_lock_ratio= 10; enum lock_type lock_array[6]={S,X,LS,LX,IS,IX}; char *lock2str[6]={"S","X","LS","LX","IS","IX"}; -char *res2str[6]={ +char *res2str[4]={ "DIDN'T GET THE LOCK", "GOT THE LOCK", "GOT THE LOCK NEED TO LOCK A SUBRESOURCE", @@ -128,10 +157,11 @@ pthread_handler_t test_lockman(void *arg) row= x % Nrows + Ntables; table= row % Ntables; locklevel= (x/Nrows) & 3; - if ((x/Nrows/4) % TABLE_LOCK_RATIO == 0) + if (table_lock_ratio && (x/Nrows/4) % table_lock_ratio == 0) { /* table lock */ res= lockman_getlock(&lockman, lo, table, lock_array[locklevel]); - DIAG(("loid=%2d, table %d lock %s, res=%s", loid, table, lock2str[locklevel], res2str[res])); + DIAG(("loid=%2d, table %d lock %s, res=%s", loid, table, + lock2str[locklevel], res2str[res])); if (res == DIDNT_GET_THE_LOCK) { lockman_release_locks(&lockman, lo); @@ -145,7 +175,8 @@ pthread_handler_t test_lockman(void *arg) { /* row lock */ locklevel&= 1; res= lockman_getlock(&lockman, lo, table, lock_array[locklevel + 4]); - DIAG(("loid=%2d, row %d lock %s, res=%s", loid, row, lock2str[locklevel+4], res2str[res])); + DIAG(("loid=%2d, row %d lock %s, res=%s", loid, row, + lock2str[locklevel+4], res2str[res])); switch (res) { case DIDNT_GET_THE_LOCK: @@ -159,7 +190,8 @@ pthread_handler_t test_lockman(void *arg) /* not implemented, so take a regular lock */ case GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE: res= lockman_getlock(&lockman, lo, row, lock_array[locklevel]); - DIAG(("loid=%2d, ROW %d lock %s, res=%s", loid, row, lock2str[locklevel], res2str[res])); + DIAG(("loid=%2d, ROW %d lock %s, res=%s", loid, row, + lock2str[locklevel], res2str[res])); if (res == DIDNT_GET_THE_LOCK) { lockman_release_locks(&lockman, lo); @@ -196,7 +228,7 @@ int main() my_init(); - plan(14); + plan(31); if (my_atomic_initialize()) return exit_status(); @@ -222,11 +254,21 @@ int main() test_lockman_simple(); -#define CYCLES 100 +#define CYCLES 1000 #define THREADS Nlos /* don't change this line */ + /* mixed load, stress-test with random locks */ + Nrows= 100; + Ntables= 10; + table_lock_ratio= 10; run_test("lockman", test_lockman, THREADS,CYCLES); + /* "real-life" simulation - many rows, no table locks */ + Nrows= 1000000; + Ntables= 10; + table_lock_ratio= 0; + run_test("lockman", test_lockman, THREADS,10000); + for (i= 0; i < Nlos; i++) { lockman_release_locks(&lockman, &loarray[i]); @@ -235,7 +277,12 @@ int main() lf_pinbox_put_pins(loarray[i].pins); } - lockman_destroy(&lockman); + { + ulonglong now= my_getsystime(); + lockman_destroy(&lockman); + now= my_getsystime()-now; + diag("lockman_destroy: %g", ((double)now)/1e7); + } pthread_mutex_destroy(&rt_mutex); pthread_cond_destroy(&rt_cond); diff --git a/unittest/mysys/my_atomic-t.c b/unittest/mysys/my_atomic-t.c index 881f80a66f8..e2177b94720 100644 --- a/unittest/mysys/my_atomic-t.c +++ b/unittest/mysys/my_atomic-t.c @@ -154,7 +154,7 @@ pthread_handler_t test_lf_pinbox(void *arg) typedef union { int32 data; - void *not_used; /* to guarantee sizeof(TLA) >= sizeof(void *) */ + void *not_used; } TLA; pthread_handler_t test_lf_alloc(void *arg) @@ -294,7 +294,7 @@ int main() pthread_mutex_init(&mutex, 0); pthread_cond_init(&cond, 0); my_atomic_rwlock_init(&rwl); - lf_alloc_init(&lf_allocator, sizeof(TLA)); + lf_alloc_init(&lf_allocator, sizeof(TLA), offsetof(TLA, not_used)); lf_hash_init(&lf_hash, sizeof(int), LF_HASH_UNIQUE, 0, sizeof(int), 0, &my_charset_bin); |