summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <serg@janus.mylan>2006-10-18 17:24:07 +0200
committerunknown <serg@janus.mylan>2006-10-18 17:24:07 +0200
commit12a55aeabc353fdc1c3829ddd8baacb142160c80 (patch)
tree660c00763097e4fd6cbb954397696684f6d4b810
parent67a2b7cf298d0a351256ab13c2577c32e20bd723 (diff)
downloadmariadb-git-12a55aeabc353fdc1c3829ddd8baacb142160c80.tar.gz
lock manager passed unit tests
storage/maria/trnman.c: comments include/my_dbug.h: make DBUG_ASSERT always a statement storage/maria/lockman.h: comments include/lf.h: lf_pinbox - don't use a fixed-size purgatory. mysys/lf_alloc-pin.c: lf_pinbox - don't use a fixed-size purgatory. mysys/lf_hash.c: lf_pinbox - don't use a fixed-size purgatory. storage/maria/lockman.c: removed IGNORE_ME/UPGDARED matching - it was wrong in the first place. updated for "lf_pinbox - don't use a fixed-size purgatory" storage/maria/unittest/lockman-t.c: IGNORE_ME/UPGRADED pair counting bugtest. more tests unittest/mysys/my_atomic-t.c: lf_pinbox - don't use a fixed-size purgatory.
-rw-r--r--include/lf.h15
-rw-r--r--include/my_dbug.h2
-rw-r--r--mysys/lf_alloc-pin.c172
-rw-r--r--mysys/lf_hash.c4
-rw-r--r--storage/maria/lockman.c199
-rw-r--r--storage/maria/lockman.h9
-rw-r--r--storage/maria/trnman.c22
-rw-r--r--storage/maria/unittest/lockman-t.c85
-rw-r--r--unittest/mysys/my_atomic-t.c4
9 files changed, 330 insertions, 182 deletions
diff --git a/include/lf.h b/include/lf.h
index 4c6765b2d40..45c5f109e1c 100644
--- a/include/lf.h
+++ b/include/lf.h
@@ -96,20 +96,21 @@ typedef void lf_pinbox_free_func(void *, void *);
typedef struct {
LF_DYNARRAY pinstack;
lf_pinbox_free_func *free_func;
- void * free_func_arg;
+ void *free_func_arg;
+ uint free_ptr_offset;
uint32 volatile pinstack_top_ver; /* this is a versioned pointer */
uint32 volatile pins_in_stack; /* number of elements in array */
} LF_PINBOX;
-/* we want sizeof(LF_PINS) to be close to 128 to avoid false sharing */
+/* we want sizeof(LF_PINS) to be 128 to avoid false sharing */
typedef struct {
void * volatile pin[LF_PINBOX_PINS];
- void * purgatory[LF_PURGATORY_SIZE];
LF_PINBOX *pinbox;
+ void *purgatory;
uint32 purgatory_count;
uint32 volatile link;
char pad[128-sizeof(uint32)*2
- -sizeof(void *)*(LF_PINBOX_PINS+LF_PURGATORY_SIZE+1)];
+ -sizeof(void *)*(LF_PINBOX_PINS+2)];
} LF_PINS;
#define lf_rwlock_by_pins(PINS) \
@@ -147,8 +148,8 @@ typedef struct {
#define _lf_assert_pin(PINS, PIN) assert((PINS)->pin[PIN] != 0)
#define _lf_assert_unpin(PINS, PIN) assert((PINS)->pin[PIN]==0)
-void lf_pinbox_init(LF_PINBOX *pinbox, lf_pinbox_free_func *free_func,
- void * free_func_arg);
+void lf_pinbox_init(LF_PINBOX *pinbox, uint free_ptr_offset,
+ lf_pinbox_free_func *free_func, void * free_func_arg);
void lf_pinbox_destroy(LF_PINBOX *pinbox);
lock_wrap(lf_pinbox_get_pins, LF_PINS *,
@@ -181,7 +182,7 @@ typedef struct st_lf_allocator {
uint32 volatile mallocs;
} LF_ALLOCATOR;
-void lf_alloc_init(LF_ALLOCATOR *allocator, uint size);
+void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset);
void lf_alloc_destroy(LF_ALLOCATOR *allocator);
uint lf_alloc_in_pool(LF_ALLOCATOR *allocator);
#define _lf_alloc_free(PINS, PTR) _lf_pinbox_free((PINS), (PTR))
diff --git a/include/my_dbug.h b/include/my_dbug.h
index a397861c1af..e3fdf9d6461 100644
--- a/include/my_dbug.h
+++ b/include/my_dbug.h
@@ -100,7 +100,7 @@ extern FILE *_db_fp_(void);
#define DBUG_LONGJMP(a1) longjmp(a1)
#define DBUG_DUMP(keyword,a1,a2)
#define DBUG_END()
-#define DBUG_ASSERT(A)
+#define DBUG_ASSERT(A) do { } while(0)
#define DBUG_LOCK_FILE
#define DBUG_FILE (stderr)
#define DBUG_UNLOCK_FILE
diff --git a/mysys/lf_alloc-pin.c b/mysys/lf_alloc-pin.c
index cf1612b73d1..842c9690463 100644
--- a/mysys/lf_alloc-pin.c
+++ b/mysys/lf_alloc-pin.c
@@ -15,21 +15,7 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/*
- concurrent allocator based on pinning addresses
-
- strictly speaking it's not lock-free, as it can be blocked
- if a thread's purgatory is full and all addresses from there
- are pinned.
-
- But until the above happens, it's wait-free.
-
- It can be made strictly wait-free by increasing purgatory size.
- If it's larger than pins_in_stack*LF_PINBOX_PINS, then apocalyptical
- condition above will never happen. But than the memory requirements
- will be O(pins_in_stack^2).
-
- Note, that for large purgatory sizes it makes sense to remove
- purgatory array, and link objects in a list using embedded pointer.
+ wait-free concurrent allocator based on pinning addresses
TODO test with more than 256 threads
TODO test w/o alloca
@@ -43,15 +29,17 @@
static void _lf_pinbox_real_free(LF_PINS *pins);
-void lf_pinbox_init(LF_PINBOX *pinbox, lf_pinbox_free_func *free_func,
- void *free_func_arg)
+void lf_pinbox_init(LF_PINBOX *pinbox, uint free_ptr_offset,
+ lf_pinbox_free_func *free_func,void *free_func_arg)
{
DBUG_ASSERT(sizeof(LF_PINS) == 128);
+ DBUG_ASSERT(free_ptr_offset % sizeof(void *) == 0);
lf_dynarray_init(&pinbox->pinstack, sizeof(LF_PINS));
- pinbox->pinstack_top_ver=0;
- pinbox->pins_in_stack=0;
- pinbox->free_func=free_func;
- pinbox->free_func_arg=free_func_arg;
+ pinbox->pinstack_top_ver= 0;
+ pinbox->pins_in_stack= 0;
+ pinbox->free_ptr_offset= free_ptr_offset;
+ pinbox->free_func= free_func;
+ pinbox->free_func_arg= free_func_arg;
}
void lf_pinbox_destroy(LF_PINBOX *pinbox)
@@ -64,58 +52,64 @@ LF_PINS *_lf_pinbox_get_pins(LF_PINBOX *pinbox)
uint32 pins, next, top_ver;
LF_PINS *el;
- top_ver=pinbox->pinstack_top_ver;
+ top_ver= pinbox->pinstack_top_ver;
do
{
- if (!(pins=top_ver % LF_PINBOX_MAX_PINS))
+ if (!(pins= top_ver % LF_PINBOX_MAX_PINS))
{
- pins=my_atomic_add32(&pinbox->pins_in_stack, 1)+1;
- el=(LF_PINS *)_lf_dynarray_lvalue(&pinbox->pinstack, pins);
+ pins= my_atomic_add32(&pinbox->pins_in_stack, 1)+1;
+ el= (LF_PINS *)_lf_dynarray_lvalue(&pinbox->pinstack, pins);
break;
}
- el=(LF_PINS *)_lf_dynarray_value(&pinbox->pinstack, pins);
- next=el->link;
+ el= (LF_PINS *)_lf_dynarray_value(&pinbox->pinstack, pins);
+ next= el->link;
} while (!my_atomic_cas32(&pinbox->pinstack_top_ver, &top_ver,
top_ver-pins+next+LF_PINBOX_MAX_PINS));
- el->link=pins;
- el->purgatory_count=0;
- el->pinbox=pinbox;
+ el->link= pins;
+ el->purgatory_count= 0;
+ el->pinbox= pinbox;
return el;
}
void _lf_pinbox_put_pins(LF_PINS *pins)
{
- LF_PINBOX *pinbox=pins->pinbox;
+ LF_PINBOX *pinbox= pins->pinbox;
uint32 top_ver, nr;
- nr=pins->link;
+ nr= pins->link;
#ifdef MY_LF_EXTRA_DEBUG
{
int i;
- for (i=0; i < LF_PINBOX_PINS; i++)
+ for (i= 0; i < LF_PINBOX_PINS; i++)
assert(pins->pin[i] == 0);
}
#endif
+ /*
+ Note - this will deadlock if other threads will wait for
+ the caller to do something after _lf_pinbox_put_pins(),
+ and they would have pinned addresses that the caller wants to free.
+ Thus: only free pins when all work is done and nobody can wait for you!!!
+ */
while (pins->purgatory_count)
{
_lf_pinbox_real_free(pins);
- if (pins->purgatory_count && my_getncpus() == 1)
+ if (pins->purgatory_count)
{
my_atomic_rwlock_wrunlock(&pins->pinbox->pinstack.lock);
pthread_yield();
my_atomic_rwlock_wrlock(&pins->pinbox->pinstack.lock);
}
}
- top_ver=pinbox->pinstack_top_ver;
+ top_ver= pinbox->pinstack_top_ver;
if (nr == pinbox->pins_in_stack)
{
- int32 tmp=nr;
+ int32 tmp= nr;
if (my_atomic_cas32(&pinbox->pins_in_stack, &tmp, tmp-1))
goto ret;
}
do
{
- pins->link=top_ver % LF_PINBOX_MAX_PINS;
+ pins->link= top_ver % LF_PINBOX_MAX_PINS;
} while (!my_atomic_cas32(&pinbox->pinstack_top_ver, &top_ver,
top_ver-pins->link+nr+LF_PINBOX_MAX_PINS));
ret:
@@ -127,19 +121,20 @@ static int ptr_cmp(void **a, void **b)
return *a < *b ? -1 : *a == *b ? 0 : 1;
}
+#define add_to_purgatory(PINS, ADDR) \
+ do \
+ { \
+ *(void **)((char *)(ADDR)+(PINS)->pinbox->free_ptr_offset)= \
+ (PINS)->purgatory; \
+ (PINS)->purgatory= (ADDR); \
+ (PINS)->purgatory_count++; \
+ } while (0)
+
void _lf_pinbox_free(LF_PINS *pins, void *addr)
{
- while (pins->purgatory_count == LF_PURGATORY_SIZE)
- {
+ if (pins->purgatory_count % LF_PURGATORY_SIZE)
_lf_pinbox_real_free(pins);
- if (pins->purgatory_count == LF_PURGATORY_SIZE && my_getncpus() == 1)
- {
- my_atomic_rwlock_wrunlock(&pins->pinbox->pinstack.lock);
- pthread_yield();
- my_atomic_rwlock_wrlock(&pins->pinbox->pinstack.lock);
- }
- }
- pins->purgatory[pins->purgatory_count++]=addr;
+ add_to_purgatory(pins, addr);
}
struct st_harvester {
@@ -178,11 +173,11 @@ static int match_pins(LF_PINS *el, void *addr)
static void _lf_pinbox_real_free(LF_PINS *pins)
{
int npins;
- void **addr=0;
- void **start, **cur, **end=pins->purgatory+pins->purgatory_count;
- LF_PINBOX *pinbox=pins->pinbox;
+ void *list;
+ void **addr;
+ LF_PINBOX *pinbox= pins->pinbox;
- npins=pinbox->pins_in_stack+1;
+ npins= pinbox->pins_in_stack+1;
#ifdef HAVE_ALLOCA
/* create a sorted list of pinned addresses, to speed up searches */
@@ -190,64 +185,64 @@ static void _lf_pinbox_real_free(LF_PINS *pins)
{
struct st_harvester hv;
addr= (void **) alloca(sizeof(void *)*LF_PINBOX_PINS*npins);
- hv.granary=addr;
- hv.npins=npins;
+ hv.granary= addr;
+ hv.npins= npins;
_lf_dynarray_iterate(&pinbox->pinstack,
(lf_dynarray_func)harvest_pins, &hv);
- npins=hv.granary-addr;
+ npins= hv.granary-addr;
if (npins)
qsort(addr, npins, sizeof(void *), (qsort_cmp)ptr_cmp);
}
+ else
#endif
+ addr= 0;
- start= cur= pins->purgatory;
- end= start+pins->purgatory_count;
- for (; cur < end; cur++)
+ list= pins->purgatory;
+ pins->purgatory= 0;
+ pins->purgatory_count= 0;
+ while (list)
{
+ void *cur= list;
+ list= *(void **)((char *)cur+pinbox->free_ptr_offset);
if (npins)
{
if (addr)
{
void **a,**b,**c;
- for (a=addr, b=addr+npins-1, c=a+(b-a)/2; b-a>1; c=a+(b-a)/2)
- if (*cur == *c)
- a=b=c;
- else if (*cur > *c)
- a=c;
+ for (a= addr, b= addr+npins-1, c= a+(b-a)/2; b-a>1; c= a+(b-a)/2)
+ if (cur == *c)
+ a= b= c;
+ else if (cur > *c)
+ a= c;
else
- b=c;
- if (*cur == *a || *cur == *b)
+ b= c;
+ if (cur == *a || cur == *b)
goto found;
}
else
{
if (_lf_dynarray_iterate(&pinbox->pinstack,
- (lf_dynarray_func)match_pins, *cur))
+ (lf_dynarray_func)match_pins, cur))
goto found;
}
}
/* not pinned - freeing */
- pinbox->free_func(*cur, pinbox->free_func_arg);
+ pinbox->free_func(cur, pinbox->free_func_arg);
continue;
found:
/* pinned - keeping */
- *start++=*cur;
+ add_to_purgatory(pins, cur);
}
- pins->purgatory_count=start-pins->purgatory;
-#ifdef MY_LF_EXTRA_DEBUG
- while (start < pins->purgatory + LF_PURGATORY_SIZE)
- *start++=0;
-#endif
}
static void alloc_free(void *node, LF_ALLOCATOR *allocator)
{
void *tmp;
- tmp=allocator->top;
+ tmp= allocator->top;
do
{
- (*(void **)node)=tmp;
+ (*(void **)node)= tmp;
} while (!my_atomic_casptr((void **)&allocator->top, (void **)&tmp, node) &&
LF_BACKOFF);
}
@@ -255,18 +250,18 @@ static void alloc_free(void *node, LF_ALLOCATOR *allocator)
LF_REQUIRE_PINS(1);
void *_lf_alloc_new(LF_PINS *pins)
{
- LF_ALLOCATOR *allocator=(LF_ALLOCATOR *)(pins->pinbox->free_func_arg);
+ LF_ALLOCATOR *allocator= (LF_ALLOCATOR *)(pins->pinbox->free_func_arg);
void *node;
for (;;)
{
do
{
- node=allocator->top;
+ node= allocator->top;
_lf_pin(pins, 0, node);
- } while (node !=allocator->top && LF_BACKOFF);
+ } while (node != allocator->top && LF_BACKOFF);
if (!node)
{
- if (!(node=my_malloc(allocator->element_size, MYF(MY_WME|MY_ZEROFILL))))
+ if (!(node= my_malloc(allocator->element_size, MYF(MY_WME|MY_ZEROFILL))))
goto ret;
#ifdef MY_LF_EXTRA_DEBUG
my_atomic_add32(&allocator->mallocs, 1);
@@ -282,27 +277,27 @@ ret:
return node;
}
-void lf_alloc_init(LF_ALLOCATOR *allocator, uint size)
+void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset)
{
- lf_pinbox_init(&allocator->pinbox,
+ lf_pinbox_init(&allocator->pinbox, free_ptr_offset,
(lf_pinbox_free_func *)alloc_free, allocator);
- allocator->top=0;
- allocator->mallocs=0;
- allocator->element_size=size;
+ allocator->top= 0;
+ allocator->mallocs= 0;
+ allocator->element_size= size;
DBUG_ASSERT(size >= (int)sizeof(void *));
}
void lf_alloc_destroy(LF_ALLOCATOR *allocator)
{
- void *el=allocator->top;
+ void *el= allocator->top;
while (el)
{
- void *tmp=*(void **)el;
+ void *tmp= *(void **)el;
my_free(el, MYF(0));
- el=tmp;
+ el= tmp;
}
lf_pinbox_destroy(&allocator->pinbox);
- allocator->top=0;
+ allocator->top= 0;
}
/*
@@ -313,7 +308,8 @@ uint lf_alloc_in_pool(LF_ALLOCATOR *allocator)
{
uint i;
void *node;
- for (node=allocator->top, i=0; node; node=*(void **)node, i++) /* no op */;
+ for (node= allocator->top, i= 0; node; node= *(void **)node, i++)
+ /* no op */;
return i;
}
diff --git a/mysys/lf_hash.c b/mysys/lf_hash.c
index a0425e89556..45b45f7531e 100644
--- a/mysys/lf_hash.c
+++ b/mysys/lf_hash.c
@@ -20,6 +20,7 @@
TODO
try to get rid of dummy nodes ?
for non-unique hash, count only _distinct_ values
+ (but how to do it in lf_hash_delete ?)
*/
#include <my_global.h>
#include <my_sys.h>
@@ -222,7 +223,8 @@ void lf_hash_init(LF_HASH *hash, uint element_size, uint flags,
uint key_offset, uint key_length, hash_get_key get_key,
CHARSET_INFO *charset)
{
- lf_alloc_init(&hash->alloc,sizeof(LF_SLIST)+element_size);
+ lf_alloc_init(&hash->alloc, sizeof(LF_SLIST)+element_size,
+ offsetof(LF_SLIST, key));
lf_dynarray_init(&hash->array, sizeof(LF_SLIST **));
hash->size=1;
hash->count=0;
diff --git a/storage/maria/lockman.c b/storage/maria/lockman.c
index e8ddbd1a25a..1712f6f2221 100644
--- a/storage/maria/lockman.c
+++ b/storage/maria/lockman.c
@@ -1,4 +1,4 @@
-// TODO lock escalation, instant duration locks
+// TODO instant duration locks
// automatically place S instead of LS if possible
/*
TODO optimization: table locks - they have completely
@@ -21,6 +21,94 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+/*
+ Generic Lock Manager
+
+ Lock manager handles locks on "resources", a resource must be uniquely
+ identified by a 64-bit number. Lock manager itself does not imply
+ anything about the nature of a resource - it can be a row, a table, a
+ database, or just anything.
+
+ Locks belong to "lock owners". A Lock owner is uniquely identified by a
+ 16-bit number. A function loid2lo must be provided by the application
+ that takes such a number as an argument and returns a LOCK_OWNER
+ structure.
+
+ Lock levels are completely defined by three tables. Lock compatibility
+ matrix specifies which locks can be held at the same time on a resource.
+ Lock combining matrix specifies what lock level has the same behaviour as
+ a pair of two locks of given levels. getlock_result matrix simplifies
+ intention locking and lock escalation for an application, basically it
+ defines which locks are intention locks and which locks are "loose"
+ locks. It is only used to provide better diagnostics for the
+ application, lock manager itself does not differentiate between normal,
+ intention, and loose locks.
+
+ Internally lock manager is based on a lock-free hash, see lf_hash.c for
+ details. All locks are stored in a hash, with a resource id as a search
+ key, so all locks for the same resource will be considered collisions and
+ will be put in a one (lock-free) linked list. The main lock-handling
+ logic is in the inner loop that searches for a lock in such a linked
+ list - lockfind().
+
+ This works as follows. Locks generally are added to the end of the list
+ (with one exception, see below). When scanning the list it is always
+ possible to determine what locks are granted (active) and what locks are
+ waiting - first lock is obviously active, the second is active if it's
+ compatible with the first, and so on, a lock is active if it's compatible
+ with all previous locks and all locks before it are also active.
+ To calculate the "compatible with all previous locks" all locks are
+ accumulated in prev_lock variable using lock_combining_matrix.
+
+ Lock upgrades: when a thread that has a lock on a given resource,
+ requests a new lock on the same resource and the old lock is not enough
+ to satisfy new lock requirements (which is defined by
+ lock_combining_matrix[old_lock][new_lock] != old_lock), a new lock is
+ placed in the list. Depending on other locks it is immediately active or
+ it will wait for other locks. Here's an exception to "locks are added
+ to the end" rule - upgraded locks are added after the last active lock
+ but before all waiting locks. Old lock (the one we upgraded from) is
+ not removed from the list, indeed we may need to return to it later if
+ the new lock was in a savepoint that gets rolled back. So old lock is
+ marked as "ignored" (IGNORE_ME flag). New lock gets an UPGRADED flag.
+
+ Loose locks add an important exception to the above. Loose locks do not
+ always commute with other locks. In the list IX-LS both locks are active,
+ while in the LS-IX list only the first lock is active. This creates a
+ problem in lock upgrades. If the list was IX-LS and the owner of the
+ first lock wants to place LS lock (which can be immediately granted), the
+ IX lock is upgraded to LSIX and the list becomes IX-LS-LSIX, which,
+ according to the lock compatibility matrix means that the last lock is
+ waiting - of course it all happened because IX and LS were swapped and
+ they don't commute. To work around this there's ACTIVE flag which is set
+ in every lock that never waited (was placed active), and this flag
+ overrides "compatible with all previous locks" rule.
+
+ When a lock is placed to the end of the list it's either compatible with
+ all locks and all locks are active - new lock becomes active at once, or
+ it conflicts with some of the locks, in this case in the 'blocker'
+ variable a conflicting lock is returned and the calling thread waits on a
+ pthread condition in the LOCK_OWNER structure of the owner of the
+ conflicting lock. Or a new lock is compatible with all locks, but some
+ existing locks are not compatible with previous locks (example: request IS,
+ when the list is S-IX) - that is not all locks are active. In this case a
+ first waiting lock is returned in the 'blocker' variable,
+ lockman_getlock() notices that a "blocker" does not conflict with the
+ requested lock, and "dereferences" it, to find the lock that it's waiting
+ on. The calling thread than begins to wait on the same lock.
+
+ To better support table-row relations where one needs to lock the table
+ with an intention lock before locking the row, extended diagnostics is
+ provided. When an intention lock (presumably on a table) is granted,
+ lockman_getlock() returns one of GOT_THE_LOCK (no need to lock the row,
+ perhaps the thread already has a normal lock on this table),
+ GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE (need to lock the row, as usual),
+ GOT_THE_LOCK_NEED_TO_INSTANT_LOCK_A_SUBRESOURCE (only need to check
+ whether it's possible to lock the row, but no need to lock it - perhaps
+ the thread has a loose lock on this table). This is defined by
+ getlock_result[] table.
+*/
+
#include <my_global.h>
#include <my_sys.h>
#include <my_bit.h>
@@ -36,11 +124,6 @@
') Though you can take LS lock while somebody has S lock, it makes no
sense - it's simpler to take S lock too.
- ") Strictly speaking you can take LX lock while somebody has S lock.
- But in this case you lock no rows, because all rows are locked by this
- somebody. So we prefer to define that LX cannot be taken when S
- exists. Same about LX and X.
-
1 - compatible
0 - incompatible
-1 - "impossible", so that we can assert the impossibility.
@@ -107,8 +190,8 @@ static enum lock_type lock_combining_matrix[10][10]=
Defines below help to preserve the table structure.
I/L/A values are self explanatory
x means the combination is possible (assert should not crash)
- but cannot happen in row locks, only in table locks (S,X), or
- lock escalations (LS,LX)
+ but it cannot happen in row locks, only in table locks (S,X),
+ or lock escalations (LS,LX)
*/
#define I GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE
#define L GOT_THE_LOCK_NEED_TO_INSTANT_LOCK_A_SUBRESOURCE
@@ -138,8 +221,7 @@ typedef struct lockman_lock {
uint64 resource;
struct lockman_lock *lonext;
intptr volatile link;
- uint32 hashnr;
-//#warning TODO - remove hashnr from LOCK
+ uint32 hashnr; // TODO - remove hashnr from LOCK
uint16 loid;
uchar lock; /* sizeof(uchar) <= sizeof(enum) */
uchar flags;
@@ -147,6 +229,7 @@ typedef struct lockman_lock {
#define IGNORE_ME 1
#define UPGRADED 2
+#define ACTIVE 4
typedef struct {
intptr volatile *prev;
@@ -171,7 +254,7 @@ static int lockfind(LOCK * volatile *head, LOCK *node,
my_bool cur_active, compatible, upgrading, prev_active;
enum lock_type lock, prev_lock, cur_lock;
uint16 loid, cur_loid;
- int upgraded_pairs, cur_flags, flags;
+ int cur_flags, flags;
hashnr= node->hashnr;
resource= node->resource;
@@ -187,7 +270,6 @@ retry:
upgrading= FALSE;
cursor->blocker= cursor->upgrade_from= 0;
_lf_unpin(pins, 3);
- upgraded_pairs= 0;
do {
cursor->curr= PTR(*cursor->prev);
_lf_pin(pins,1,cursor->curr);
@@ -217,28 +299,24 @@ retry:
(cur_hashnr == hashnr && cur_resource >= resource))
{
if (cur_hashnr > hashnr || cur_resource > resource)
- {
- if (upgraded_pairs != 0)
- goto retry;
break;
- }
/* ok, we have a lock for this resource */
DBUG_ASSERT(lock_compatibility_matrix[prev_lock][cur_lock] >= 0);
DBUG_ASSERT(lock_compatibility_matrix[cur_lock][lock] >= 0);
- if (cur_flags & UPGRADED)
- upgraded_pairs++;
if ((cur_flags & IGNORE_ME) && ! (flags & IGNORE_ME))
{
DBUG_ASSERT(cur_active);
- upgraded_pairs--;
if (cur_loid == loid)
cursor->upgrade_from= cursor->curr;
}
else
{
prev_active= cur_active;
- cur_active&= lock_compatibility_matrix[prev_lock][cur_lock];
- if (upgrading && !cur_active && upgraded_pairs == 0)
+ if (cur_flags & ACTIVE)
+ DBUG_ASSERT(prev_active == TRUE);
+ else
+ cur_active&= lock_compatibility_matrix[prev_lock][cur_lock];
+ if (upgrading && !cur_active)
break;
if (prev_active && !cur_active)
{
@@ -253,8 +331,14 @@ retry:
if (lock_combining_matrix[cur_lock][lock] == cur_lock)
{
/* new lock is compatible */
- return cur_active ? ALREADY_HAVE_THE_LOCK
- : ALREADY_HAVE_THE_REQUEST;
+ if (cur_active)
+ {
+ cursor->blocker= cursor->curr; /* loose-locks! */
+ _lf_unpin(pins, 3); /* loose-locks! */
+ return ALREADY_HAVE_THE_LOCK;
+ }
+ else
+ return ALREADY_HAVE_THE_REQUEST;
}
/* not compatible, upgrading */
upgrading= TRUE;
@@ -268,9 +352,9 @@ retry:
cursor->blocker= cursor->curr;
_lf_pin(pins, 3, cursor->curr);
}
- prev_lock= lock_combining_matrix[prev_lock][cur_lock];
- DBUG_ASSERT(prev_lock != N);
}
+ prev_lock= lock_combining_matrix[prev_lock][cur_lock];
+ DBUG_ASSERT(prev_lock != N);
}
}
cursor->prev= &(cursor->curr->link);
@@ -335,6 +419,10 @@ static int lockinsert(LOCK * volatile *head, LOCK *node, LF_PINS *pins,
node->flags|= UPGRADED;
node->lock= lock_combining_matrix[cursor.upgrade_from->lock][node->lock];
}
+ if (!(res & NEED_TO_WAIT))
+ node->flags|= ACTIVE;
+ else
+ node->flags&= ~ACTIVE; /* if we're retrying on REPEAT_ONCE_MORE */
node->link= (intptr)cursor.curr;
DBUG_ASSERT(node->link != (intptr)node);
DBUG_ASSERT(cursor.prev != &node->link);
@@ -349,13 +437,13 @@ static int lockinsert(LOCK * volatile *head, LOCK *node, LF_PINS *pins,
_lf_unpin(pins, 1);
_lf_unpin(pins, 2);
/*
- note that cursor.curr is NOT pinned on return.
- this is ok as it's either a dummy node for initialize_bucket
+ note that blocker is not necessarily pinned here (when it's == curr).
+ this is ok as it's either a dummy node then for initialize_bucket
and dummy nodes don't need pinning,
or it's a lock of the same transaction for lockman_getlock,
and it cannot be removed by another thread
*/
- *blocker= cursor.blocker ? cursor.blocker : cursor.curr;
+ *blocker= cursor.blocker;
return res;
}
@@ -419,7 +507,7 @@ static int lockdelete(LOCK * volatile *head, LOCK *node, LF_PINS *pins)
void lockman_init(LOCKMAN *lm, loid_to_lo_func *func, uint timeout)
{
- lf_alloc_init(&lm->alloc,sizeof(LOCK));
+ lf_alloc_init(&lm->alloc,sizeof(LOCK), offsetof(LOCK,lonext));
lf_dynarray_init(&lm->array, sizeof(LOCK **));
lm->size= 1;
lm->count= 0;
@@ -516,13 +604,13 @@ enum lockman_getlock_result lockman_getlock(LOCKMAN *lm, LOCK_OWNER *lo,
res= lockinsert(el, node, pins, &blocker);
if (res & ALREADY_HAVE)
{
+ int r;
old_lock= blocker->lock;
- _lf_assert_unpin(pins, 3); /* unpin should not be needed */
_lf_alloc_free(pins, node);
lf_rwunlock_by_pins(pins);
- res= getlock_result[old_lock][lock];
- DBUG_ASSERT(res);
- return res;
+ r= getlock_result[old_lock][lock];
+ DBUG_ASSERT(r);
+ return r;
}
/* a new value was added to the hash */
csize= lm->size;
@@ -537,9 +625,8 @@ enum lockman_getlock_result lockman_getlock(LOCKMAN *lm, LOCK_OWNER *lo,
struct timespec timeout;
_lf_assert_pin(pins, 3); /* blocker must be pinned here */
- lf_rwunlock_by_pins(pins);
-
wait_for_lo= lm->loid_to_lo(blocker->loid);
+
/*
now, this is tricky. blocker is not necessarily a LOCK
we're waiting for. If it's compatible with what we want,
@@ -550,12 +637,9 @@ enum lockman_getlock_result lockman_getlock(LOCKMAN *lm, LOCK_OWNER *lo,
if (lock_compatibility_matrix[blocker->lock][lock])
{
blocker= wait_for_lo->all_locks;
- lf_pin(pins, 3, blocker);
+ _lf_pin(pins, 3, blocker);
if (blocker != wait_for_lo->all_locks)
- {
- lf_rwlock_by_pins(pins);
continue;
- }
wait_for_lo= wait_for_lo->waiting_for;
}
@@ -565,11 +649,11 @@ enum lockman_getlock_result lockman_getlock(LOCKMAN *lm, LOCK_OWNER *lo,
to an unrelated - albeit valid - LOCK_OWNER
*/
if (!wait_for_lo)
- {
- /* blocker transaction has ended, short id was released */
- lf_rwlock_by_pins(pins);
continue;
- }
+
+ lo->waiting_for= wait_for_lo;
+ lf_rwunlock_by_pins(pins);
+
/*
We lock a mutex - it may belong to a wrong LOCK_OWNER, but it must
belong to _some_ LOCK_OWNER. It means, we can never free() a LOCK_OWNER,
@@ -587,9 +671,8 @@ enum lockman_getlock_result lockman_getlock(LOCKMAN *lm, LOCK_OWNER *lo,
lf_rwlock_by_pins(pins);
continue;
}
- /* yuck. waiting */
- lo->waiting_for= wait_for_lo;
+ /* yuck. waiting */
deadline= my_getsystime() + lm->lock_timeout * 10000;
timeout.tv_sec= deadline/10000000;
timeout.tv_nsec= (deadline % 10000000) * 100;
@@ -607,11 +690,12 @@ enum lockman_getlock_result lockman_getlock(LOCKMAN *lm, LOCK_OWNER *lo,
Instead we're relying on the caller to abort the transaction,
and release all locks at once - see lockman_release_locks()
*/
+ _lf_unpin(pins, 3);
lf_rwunlock_by_pins(pins);
return DIDNT_GET_THE_LOCK;
}
- lo->waiting_for= 0;
}
+ lo->waiting_for= 0;
_lf_assert_unpin(pins, 3); /* unpin should not be needed */
lf_rwunlock_by_pins(pins);
return getlock_result[lock][lock];
@@ -626,14 +710,15 @@ enum lockman_getlock_result lockman_getlock(LOCKMAN *lm, LOCK_OWNER *lo,
*/
int lockman_release_locks(LOCKMAN *lm, LOCK_OWNER *lo)
{
- LOCK * volatile *el, *node;
+ LOCK * volatile *el, *node, *next;
uint bucket;
LF_PINS *pins= lo->pins;
pthread_mutex_lock(lo->mutex);
lf_rwlock_by_pins(pins);
- for (node= lo->all_locks; node; node= node->lonext)
+ for (node= lo->all_locks; node; node= next)
{
+ next= node->lonext;
bucket= calc_hash(node->resource) % lm->size;
el= _lf_dynarray_lvalue(&lm->array, bucket);
if (*el == NULL)
@@ -650,12 +735,12 @@ int lockman_release_locks(LOCKMAN *lm, LOCK_OWNER *lo)
}
#ifdef MY_LF_EXTRA_DEBUG
+static char *lock2str[]=
+{ "N", "S", "X", "IS", "IX", "SIX", "LS", "LX", "SLX", "LSIX" };
/*
NOTE
the function below is NOT thread-safe !!!
*/
-static char *lock2str[]=
-{ "N", "S", "X", "IS", "IX", "SIX", "LS", "LX", "SLX", "LSIX" };
void print_lockhash(LOCKMAN *lm)
{
LOCK *el= *(LOCK **)_lf_dynarray_lvalue(&lm->array, 0);
@@ -664,17 +749,21 @@ void print_lockhash(LOCKMAN *lm)
{
intptr next= el->link;
if (el->hashnr & 1)
+ {
printf("0x%08x { resource %llu, loid %u, lock %s",
el->hashnr, el->resource, el->loid, lock2str[el->lock]);
+ if (el->flags & IGNORE_ME) printf(" IGNORE_ME");
+ if (el->flags & UPGRADED) printf(" UPGRADED");
+ if (el->flags & ACTIVE) printf(" ACTIVE");
+ if (DELETED(next)) printf(" ***DELETED***");
+ printf("}\n");
+ }
else
{
- printf("0x%08x { dummy ", el->hashnr);
+ /*printf("0x%08x { dummy }\n", el->hashnr);*/
DBUG_ASSERT(el->resource == 0 && el->loid == 0 && el->lock == X);
}
- if (el->flags & IGNORE_ME) printf(" IGNORE_ME");
- if (el->flags & UPGRADED) printf(" UPGRADED");
- printf("}\n");
- el= (LOCK *)next;
+ el= PTR(next);
}
}
#endif
diff --git a/storage/maria/lockman.h b/storage/maria/lockman.h
index a3c96786935..9edd79eb7f1 100644
--- a/storage/maria/lockman.h
+++ b/storage/maria/lockman.h
@@ -18,7 +18,10 @@
#define _lockman_h
/*
- N - "no lock", not a lock, used sometimes to simplify the code
+ Lock levels:
+ ^^^^^^^^^^^
+
+ N - "no lock", not a lock, used sometimes internally to simplify the code
S - Shared
X - eXclusive
IS - Intention Shared
@@ -35,8 +38,8 @@ struct lockman_lock;
typedef struct st_lock_owner LOCK_OWNER;
struct st_lock_owner {
- LF_PINS *pins;
- struct lockman_lock *all_locks;
+ LF_PINS *pins; /* must be allocated from lockman's pinbox */
+ struct lockman_lock *all_locks; /* a LIFO */
LOCK_OWNER *waiting_for;
pthread_cond_t *cond; /* transactions waiting for this, wait on 'cond' */
pthread_mutex_t *mutex; /* mutex is required to use 'cond' */
diff --git a/storage/maria/trnman.c b/storage/maria/trnman.c
index 49f49a3e26b..ecabf100cb8 100644
--- a/storage/maria/trnman.c
+++ b/storage/maria/trnman.c
@@ -128,6 +128,11 @@ static void set_short_trid(TRN *trn)
trn->locks.loid= i;
}
+/*
+ DESCRIPTION
+ start a new transaction, allocate and initialize transaction object
+ mutex and cond will be used for lock waits
+*/
TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond)
{
TRN *trn;
@@ -148,6 +153,7 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond)
trnman_active_transactions++;
trn= pool;
+ /* popping an element from a stack */
my_atomic_rwlock_wrlock(&LOCK_pool);
while (trn && !my_atomic_casptr((void **)&pool, (void **)&trn,
(void *)trn->next))
@@ -213,9 +219,12 @@ void trnman_end_trn(TRN *trn, my_bool commit)
LF_PINS *pins= trn->pins;
pthread_mutex_lock(&LOCK_trn_list);
+
+ /* remove from active list */
trn->next->prev= trn->prev;
trn->prev->next= trn->next;
+ /* if this transaction was the oldest - clean up committed list */
if (trn->prev == &active_list_min)
{
TRN *t;
@@ -232,6 +241,7 @@ void trnman_end_trn(TRN *trn, my_bool commit)
}
}
+ /* add transaction to the committed list (for read-from relations) */
if (commit && active_list_min.next != &active_list_max)
{
trn->commit_trid= global_trid_generator;
@@ -243,7 +253,7 @@ void trnman_end_trn(TRN *trn, my_bool commit)
res= lf_hash_insert(&trid_to_trn, pins, &trn);
DBUG_ASSERT(res == 0);
}
- else
+ else /* or free it right away */
{
trn->next= free_me;
free_me= trn;
@@ -251,6 +261,7 @@ void trnman_end_trn(TRN *trn, my_bool commit)
trnman_active_transactions--;
pthread_mutex_unlock(&LOCK_trn_list);
+ /* the rest is done outside of a critical section */
lockman_release_locks(&maria_lockman, &trn->locks);
trn->locks.mutex= 0;
trn->locks.cond= 0;
@@ -258,7 +269,6 @@ void trnman_end_trn(TRN *trn, my_bool commit)
my_atomic_storeptr((void **)&short_trid_to_trn[trn->locks.loid], 0);
my_atomic_rwlock_rdunlock(&LOCK_short_trid_to_trn);
-
while (free_me) // XXX send them to the purge thread
{
int res;
@@ -288,7 +298,7 @@ void trnman_free_trn(TRN *trn)
do
{
/*
- without volatile cast gcc-3.4.4 moved the assignment
+ without this volatile cast gcc-3.4.4 moved the assignment
down after the loop at -O2
*/
*(TRN * volatile *)&(trn->next)= tmp;
@@ -317,13 +327,13 @@ my_bool trnman_can_read_from(TRN *trn, TrID trid)
LF_REQUIRE_PINS(3);
if (trid < trn->min_read_from)
- return TRUE;
+ return TRUE; /* can read */
if (trid > trn->trid)
- return FALSE;
+ return FALSE; /* cannot read */
found= lf_hash_search(&trid_to_trn, trn->pins, &trid, sizeof(trid));
if (!found)
- return FALSE; /* not in the hash of committed transactions = cannot read*/
+ return FALSE; /* not in the hash of committed transactions = cannot read */
can= (*found)->commit_trid < trn->trid;
lf_unpin(trn->pins, 2);
diff --git a/storage/maria/unittest/lockman-t.c b/storage/maria/unittest/lockman-t.c
index 8b62ccfe094..638078fea65 100644
--- a/storage/maria/unittest/lockman-t.c
+++ b/storage/maria/unittest/lockman-t.c
@@ -14,7 +14,7 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-//#define EXTRA_VERBOSE
+#undef EXTRA_VERBOSE
#include <tap.h>
@@ -24,7 +24,7 @@
#include <lf.h>
#include "../lockman.h"
-#define Nlos 10
+#define Nlos 100
LOCK_OWNER loarray[Nlos];
pthread_mutex_t mutexes[Nlos];
pthread_cond_t conds[Nlos];
@@ -51,8 +51,7 @@ LOCK_OWNER *loid2lo(uint16 loid)
#define lock_ok_a(O,R,L) test_lock(O,R,L,"",GOT_THE_LOCK)
#define lock_ok_i(O,R,L) test_lock(O,R,L,"",GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE)
#define lock_ok_l(O,R,L) test_lock(O,R,L,"",GOT_THE_LOCK_NEED_TO_INSTANT_LOCK_A_SUBRESOURCE)
-#define lock_conflict(O,R,L) test_lock(O,R,L,"cannot ",DIDNT_GET_THE_LOCK); \
- unlock_all(O)
+#define lock_conflict(O,R,L) test_lock(O,R,L,"cannot ",DIDNT_GET_THE_LOCK);
void test_lockman_simple()
{
@@ -64,7 +63,8 @@ void test_lockman_simple()
lock_ok_a(1, 1, X);
lock_ok_i(2, 2, IX);
/* failures */
- lock_conflict(2,1,X); /* this removes all locks of lo2 */
+ lock_conflict(2,1,X);
+ unlock_all(2);
lock_ok_a(1,2,S);
lock_ok_a(1,2,IS);
lock_ok_a(1,2,LS);
@@ -72,8 +72,36 @@ void test_lockman_simple()
lock_ok_a(2,3,LS);
lock_ok_i(1,3,IX);
lock_ok_l(2,3,IS);
- lockman_release_locks(&lockman, loid2lo(1));
- lockman_release_locks(&lockman, loid2lo(2));
+ unlock_all(1);
+ unlock_all(2);
+
+ lock_ok_i(1,1,IX);
+ lock_conflict(2,1,S);
+ lock_ok_a(1,1,LS);
+ unlock_all(1);
+ unlock_all(2);
+
+ lock_ok_i(1,1,IX);
+ lock_ok_a(2,1,LS);
+ lock_ok_a(1,1,LS);
+ lock_ok_i(1,1,IX);
+ lock_ok_i(3,1,IS);
+ unlock_all(1);
+ unlock_all(2);
+ unlock_all(3);
+
+ lock_ok_i(1,4,IS);
+ lock_ok_i(2,4,IS);
+ lock_ok_i(3,4,IS);
+ lock_ok_a(3,4,LS);
+ lock_ok_i(4,4,IS);
+ lock_conflict(4,4,IX);
+ lock_conflict(2,4,IX);
+ lock_ok_a(1,4,LS);
+ unlock_all(1);
+ unlock_all(2);
+ unlock_all(3);
+ unlock_all(4);
}
@@ -82,11 +110,13 @@ pthread_mutex_t rt_mutex;
pthread_cond_t rt_cond;
int rt_num_threads;
int litmus;
+int thread_number= 0, timeouts=0;
void run_test(const char *test, pthread_handler handler, int n, int m)
{
pthread_t t;
ulonglong now= my_getsystime();
+ thread_number= timeouts= 0;
litmus= 0;
diag("Testing %s with %d threads, %d iterations... ", test, n, m);
@@ -100,13 +130,12 @@ void run_test(const char *test, pthread_handler handler, int n, int m)
ok(litmus == 0, "tested %s in %g secs (%d)", test, ((double)now)/1e7, litmus);
}
-int thread_number= 0, timeouts=0;
-#define Nrows 1000
-#define Ntables 10
-#define TABLE_LOCK_RATIO 10
+int Nrows= 100;
+int Ntables= 10;
+int table_lock_ratio= 10;
enum lock_type lock_array[6]={S,X,LS,LX,IS,IX};
char *lock2str[6]={"S","X","LS","LX","IS","IX"};
-char *res2str[6]={
+char *res2str[4]={
"DIDN'T GET THE LOCK",
"GOT THE LOCK",
"GOT THE LOCK NEED TO LOCK A SUBRESOURCE",
@@ -128,10 +157,11 @@ pthread_handler_t test_lockman(void *arg)
row= x % Nrows + Ntables;
table= row % Ntables;
locklevel= (x/Nrows) & 3;
- if ((x/Nrows/4) % TABLE_LOCK_RATIO == 0)
+ if (table_lock_ratio && (x/Nrows/4) % table_lock_ratio == 0)
{ /* table lock */
res= lockman_getlock(&lockman, lo, table, lock_array[locklevel]);
- DIAG(("loid=%2d, table %d lock %s, res=%s", loid, table, lock2str[locklevel], res2str[res]));
+ DIAG(("loid=%2d, table %d lock %s, res=%s", loid, table,
+ lock2str[locklevel], res2str[res]));
if (res == DIDNT_GET_THE_LOCK)
{
lockman_release_locks(&lockman, lo);
@@ -145,7 +175,8 @@ pthread_handler_t test_lockman(void *arg)
{ /* row lock */
locklevel&= 1;
res= lockman_getlock(&lockman, lo, table, lock_array[locklevel + 4]);
- DIAG(("loid=%2d, row %d lock %s, res=%s", loid, row, lock2str[locklevel+4], res2str[res]));
+ DIAG(("loid=%2d, row %d lock %s, res=%s", loid, row,
+ lock2str[locklevel+4], res2str[res]));
switch (res)
{
case DIDNT_GET_THE_LOCK:
@@ -159,7 +190,8 @@ pthread_handler_t test_lockman(void *arg)
/* not implemented, so take a regular lock */
case GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE:
res= lockman_getlock(&lockman, lo, row, lock_array[locklevel]);
- DIAG(("loid=%2d, ROW %d lock %s, res=%s", loid, row, lock2str[locklevel], res2str[res]));
+ DIAG(("loid=%2d, ROW %d lock %s, res=%s", loid, row,
+ lock2str[locklevel], res2str[res]));
if (res == DIDNT_GET_THE_LOCK)
{
lockman_release_locks(&lockman, lo);
@@ -196,7 +228,7 @@ int main()
my_init();
- plan(14);
+ plan(31);
if (my_atomic_initialize())
return exit_status();
@@ -222,11 +254,21 @@ int main()
test_lockman_simple();
-#define CYCLES 100
+#define CYCLES 1000
#define THREADS Nlos /* don't change this line */
+ /* mixed load, stress-test with random locks */
+ Nrows= 100;
+ Ntables= 10;
+ table_lock_ratio= 10;
run_test("lockman", test_lockman, THREADS,CYCLES);
+ /* "real-life" simulation - many rows, no table locks */
+ Nrows= 1000000;
+ Ntables= 10;
+ table_lock_ratio= 0;
+ run_test("lockman", test_lockman, THREADS,10000);
+
for (i= 0; i < Nlos; i++)
{
lockman_release_locks(&lockman, &loarray[i]);
@@ -235,7 +277,12 @@ int main()
lf_pinbox_put_pins(loarray[i].pins);
}
- lockman_destroy(&lockman);
+ {
+ ulonglong now= my_getsystime();
+ lockman_destroy(&lockman);
+ now= my_getsystime()-now;
+ diag("lockman_destroy: %g", ((double)now)/1e7);
+ }
pthread_mutex_destroy(&rt_mutex);
pthread_cond_destroy(&rt_cond);
diff --git a/unittest/mysys/my_atomic-t.c b/unittest/mysys/my_atomic-t.c
index 881f80a66f8..e2177b94720 100644
--- a/unittest/mysys/my_atomic-t.c
+++ b/unittest/mysys/my_atomic-t.c
@@ -154,7 +154,7 @@ pthread_handler_t test_lf_pinbox(void *arg)
typedef union {
int32 data;
- void *not_used; /* to guarantee sizeof(TLA) >= sizeof(void *) */
+ void *not_used;
} TLA;
pthread_handler_t test_lf_alloc(void *arg)
@@ -294,7 +294,7 @@ int main()
pthread_mutex_init(&mutex, 0);
pthread_cond_init(&cond, 0);
my_atomic_rwlock_init(&rwl);
- lf_alloc_init(&lf_allocator, sizeof(TLA));
+ lf_alloc_init(&lf_allocator, sizeof(TLA), offsetof(TLA, not_used));
lf_hash_init(&lf_hash, sizeof(int), LF_HASH_UNIQUE, 0, sizeof(int), 0,
&my_charset_bin);