diff options
-rw-r--r-- | include/my_pthread.h | 5 | ||||
-rw-r--r-- | include/waiting_threads.h | 4 | ||||
-rw-r--r-- | mysys/waiting_threads.c | 36 | ||||
-rw-r--r-- | storage/maria/ma_state.c | 2 | ||||
-rw-r--r-- | storage/maria/ma_static.c | 3 | ||||
-rw-r--r-- | storage/maria/ma_write.c | 59 | ||||
-rw-r--r-- | storage/maria/maria_def.h | 6 | ||||
-rw-r--r-- | storage/maria/trnman.c | 70 | ||||
-rw-r--r-- | storage/maria/trnman.h | 16 | ||||
-rw-r--r-- | storage/maria/trnman_public.h | 1 |
10 files changed, 163 insertions, 39 deletions
diff --git a/include/my_pthread.h b/include/my_pthread.h index 28ce083d744..337b167e548 100644 --- a/include/my_pthread.h +++ b/include/my_pthread.h @@ -437,9 +437,10 @@ int my_pthread_mutex_trylock(pthread_mutex_t *mutex); #ifndef set_timespec_time_nsec #define set_timespec_time_nsec(ABSTIME,TIME,NSEC) do { \ - ulonglong now= (TIME) + (NSEC/100); \ + ulonglong nsec= (NSEC); \ + ulonglong now= (TIME) + (nsec/100); \ (ABSTIME).TV_sec= (now / ULL(10000000)); \ - (ABSTIME).TV_nsec= (now % ULL(10000000) * 100 + ((NSEC) % 100)); \ + (ABSTIME).TV_nsec= (now % ULL(10000000) * 100 + (nsec % 100)); \ } while(0) #endif /* !set_timespec_time_nsec */ diff --git a/include/waiting_threads.h b/include/waiting_threads.h index 92fbbf998be..6355a83b13d 100644 --- a/include/waiting_threads.h +++ b/include/waiting_threads.h @@ -13,6 +13,9 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +#ifndef _waiting_threads_h +#define _waiting_threads_h + #include <my_global.h> #include <my_sys.h> #include <lf.h> @@ -152,3 +155,4 @@ void wt_thd_release(WT_THD *, WT_RESOURCE_ID *); #define wt_thd_release_all(THD) wt_thd_release((THD), 0) int wt_resource_id_memcmp(void *, void *); +#endif diff --git a/mysys/waiting_threads.c b/mysys/waiting_threads.c index 1c87886f405..491e7c3a726 100644 --- a/mysys/waiting_threads.c +++ b/mysys/waiting_threads.c @@ -227,6 +227,20 @@ struct deadlock_arg { WT_RESOURCE *rc; }; +static void change_victim(WT_THD* found, struct deadlock_arg *arg) +{ + if (found->weight < arg->victim->weight) + { + if (arg->victim != arg->thd) + { + rc_unlock(arg->victim->waiting_for); /* release the previous victim */ + DBUG_ASSERT(arg->rc == found->waiting_for); + } + arg->victim= found; + arg->rc= 0; + } +} + /* loop detection in a wait-for graph with a limited search depth. */ @@ -294,16 +308,8 @@ retry: break; case WT_DEADLOCK: ret= WT_DEADLOCK; - if (cursor->weight < arg->victim->weight) - { - if (arg->victim != arg->thd) - { - rc_unlock(arg->victim->waiting_for); /* release the previous victim */ - DBUG_ASSERT(arg->rc == cursor->waiting_for); - } - arg->victim= cursor; - } - else if (arg->rc) + change_victim(cursor, arg); + if (arg->rc) rc_unlock(arg->rc); goto end; case WT_OK: @@ -329,13 +335,15 @@ static int deadlock(WT_THD *thd, WT_THD *blocker, uint depth, int ret; DBUG_ENTER("deadlock"); ret= deadlock_search(&arg, blocker, depth); - if (arg.rc) - rc_unlock(arg.rc); if (ret == WT_DEPTH_EXCEEDED) { increment_cycle_stats(WT_CYCLE_STATS, max_depth); ret= WT_OK; } + if (ret == WT_DEADLOCK && depth) + change_victim(blocker, &arg); + if (arg.rc) + rc_unlock(arg.rc); if (ret == WT_DEADLOCK && arg.victim != thd) { DBUG_PRINT("wt", ("killing %s", arg.victim->name)); @@ -570,7 +578,7 @@ int wt_thd_cond_timedwait(WT_THD *thd, pthread_mutex_t *mutex) ret= WT_OK; rc_unlock(rc); - set_timespec_time_nsec(timeout, starttime, wt_timeout_short*1000); + set_timespec_time_nsec(timeout, starttime, wt_timeout_short*ULL(1000)); if (ret == WT_TIMEOUT) ret= pthread_cond_timedwait(&rc->cond, mutex, &timeout); if (ret == WT_TIMEOUT) @@ -579,7 +587,7 @@ int wt_thd_cond_timedwait(WT_THD *thd, pthread_mutex_t *mutex) ret= WT_DEADLOCK; else if (wt_timeout_long > wt_timeout_short) { - set_timespec_time_nsec(timeout, starttime, wt_timeout_long*1000); + set_timespec_time_nsec(timeout, starttime, wt_timeout_long*ULL(1000)); if (!thd->killed) ret= pthread_cond_timedwait(&rc->cond, mutex, &timeout); } diff --git a/storage/maria/ma_state.c b/storage/maria/ma_state.c index ec8ea2010ae..422e35795bd 100644 --- a/storage/maria/ma_state.c +++ b/storage/maria/ma_state.c @@ -88,7 +88,7 @@ my_bool _ma_setup_live_state(MARIA_HA *info) It's enough to compare trids here (instead of calling tranman_can_read_from) as history->trid is a commit_trid */ - while (trn->trid < history->trid) + while (trn->trid < history->trid && history->trid != ~(TrID)0) history= history->next; pthread_mutex_unlock(&share->intern_lock); /* The current item can't be deleted as it's the first one visible for us */ diff --git a/storage/maria/ma_static.c b/storage/maria/ma_static.c index 07425a5db91..3af40550a69 100644 --- a/storage/maria/ma_static.c +++ b/storage/maria/ma_static.c @@ -64,6 +64,9 @@ HASH maria_stored_state; */ TRN dummy_transaction_object; +/* a WT_RESOURCE_TYPE for transactions waiting on a unique key conflict */ +WT_RESOURCE_TYPE ma_rc_dup_unique={ wt_resource_id_memcmp, 0}; + /* Enough for comparing if number is zero */ uchar maria_zero_string[]= {0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0}; diff --git a/storage/maria/ma_write.c b/storage/maria/ma_write.c index 0ad8db0ae00..87a49d61285 100644 --- a/storage/maria/ma_write.c +++ b/storage/maria/ma_write.c @@ -180,15 +180,47 @@ int maria_write(MARIA_HA *info, uchar *record) } else { - if (keyinfo->ck_insert(info, - (*keyinfo->make_key)(info, &int_key, i, - buff, record, filepos, - info->trn->trid))) + while (keyinfo->ck_insert(info, + (*keyinfo->make_key)(info, &int_key, i, + buff, record, filepos, + info->trn->trid))) { + TRN *blocker=trnman_trid_to_trn(info->trn, info->dup_key_trid); + DBUG_PRINT("error",("Got error: %d on write",my_errno)); + /* + if blocker TRN was not found, it means that the conflicting + transaction was committed long time ago. It could not be + aborted, as it would have to wait on the key tree lock + to remove the conflicting key it has inserted. + */ if (local_lock_tree) rw_unlock(&keyinfo->root_lock); - DBUG_PRINT("error",("Got error: %d on write",my_errno)); - goto err; + if (!blocker) + goto err; + if (blocker->commit_trid != ~(TrID)0) + { /* committed, albeit recently */ + pthread_mutex_unlock(& blocker->state_lock); + goto err; + } + { /* running. now we wait */ + WT_RESOURCE_ID rc; + int res; + + rc.type= &ma_rc_dup_unique; + rc.value.ptr= blocker; /* TODO savepoint id when we'll have them */ + res= wt_thd_will_wait_for(& info->trn->wt, & blocker->wt, & rc); + if (res != WT_OK) + { + pthread_mutex_unlock(& blocker->state_lock); + goto err; + } + res=wt_thd_cond_timedwait(& info->trn->wt, & blocker->state_lock); + pthread_mutex_unlock(& blocker->state_lock); + if (res != WT_OK) + goto err; + } + if (local_lock_tree) + rw_wrlock(&keyinfo->root_lock); } } @@ -597,9 +629,22 @@ static int w_search(register MARIA_HA *info, uint32 comp_flag, MARIA_KEY *key, else /* not HA_FULLTEXT, normal HA_NOSAME key */ { DBUG_PRINT("warning", ("Duplicate key")); + /* + FIXME + When the index will support true versioning - with multiple + identical values in the UNIQUE index, invisible to each other - + the following should be changed to "continue inserting keys, at the + end (of the row or statement) wait". Until it's done we cannot properly + support deadlock timeouts. + */ + /* + transaction that has inserted the conflicting key is in progress. + wait for it to be committed or aborted. + */ + info->dup_key_trid= _ma_trid_from_key(&tmp_key); info->dup_key_pos= dup_key_pos; my_afree((uchar*) temp_buff); - my_errno=HA_ERR_FOUND_DUPP_KEY; + my_errno= HA_ERR_FOUND_DUPP_KEY; DBUG_RETURN(-1); } } diff --git a/storage/maria/maria_def.h b/storage/maria/maria_def.h index c1160993607..3d633355729 100644 --- a/storage/maria/maria_def.h +++ b/storage/maria/maria_def.h @@ -29,6 +29,7 @@ #include "ma_loghandler.h" #include "ma_control_file.h" #include "ma_state.h" +#include <waiting_threads.h> /* For testing recovery */ #ifdef TO_BE_REMOVED @@ -492,13 +493,14 @@ struct st_maria_handler uint32 int_keytree_version; /* -""- */ int (*read_record)(MARIA_HA *, uchar*, MARIA_RECORD_POS); invalidator_by_filename invalidator; /* query cache invalidator */ - ulonglong last_auto_increment; /* auto value at start of statement */ + ulonglong last_auto_increment; /* auto value at start of statement */ ulong this_unique; /* uniq filenumber or thread */ ulong last_unique; /* last unique number */ ulong this_loop; /* counter for this open */ ulong last_loop; /* last used counter */ MARIA_RECORD_POS save_lastpos; MARIA_RECORD_POS dup_key_pos; + TrID dup_key_trid; my_off_t pos; /* Intern variable */ my_off_t last_keypage; /* Last key page read */ my_off_t last_search_keypage; /* Last keypage when searching */ @@ -759,6 +761,7 @@ extern char *maria_data_root; extern uchar maria_zero_string[]; extern my_bool maria_inited, maria_in_ha_maria; extern HASH maria_stored_state; +extern WT_RESOURCE_TYPE ma_rc_dup_unique; /* This is used by _ma_calc_xxx_key_length och _ma_store_key */ typedef struct st_maria_s_param @@ -782,7 +785,6 @@ typedef struct st_pinned_page my_bool changed; } MARIA_PINNED_PAGE; - /* Prototypes for intern functions */ extern int _ma_read_dynamic_record(MARIA_HA *, uchar *, MARIA_RECORD_POS); extern int _ma_read_rnd_dynamic_record(MARIA_HA *, uchar *, MARIA_RECORD_POS, diff --git a/storage/maria/trnman.c b/storage/maria/trnman.c index 9f6d0829521..d2077f8f4c6 100644 --- a/storage/maria/trnman.c +++ b/storage/maria/trnman.c @@ -46,7 +46,7 @@ static TRN *pool; /* a hash for committed transactions that maps trid to a TRN structure */ static LF_HASH trid_to_trn; -/* an array that maps short_trid of an active transaction to a TRN structure */ +/* an array that maps short_id of an active transaction to a TRN structure */ static TRN **short_trid_to_active_trn; /* locks for short_trid_to_active_trn and pool */ @@ -114,11 +114,13 @@ int trnman_init(TrID initial_trid) { DBUG_ENTER("trnman_init"); + wt_init(); /* FIXME this should be done in the server, not in the engine! */ + short_trid_to_active_trn= (TRN **)my_malloc(SHORT_TRID_MAX*sizeof(TRN*), MYF(MY_WME|MY_ZEROFILL)); if (unlikely(!short_trid_to_active_trn)) DBUG_RETURN(1); - short_trid_to_active_trn--; /* min short_trid is 1 */ + short_trid_to_active_trn--; /* min short_id is 1 */ /* Initialize lists. @@ -179,6 +181,8 @@ void trnman_destroy() { TRN *trn= pool; pool= pool->next; + pthread_mutex_destroy(&trn->state_lock); + wt_thd_destroy(&trn->wt); my_free((void *)trn, MYF(0)); } lf_hash_destroy(&trid_to_trn); @@ -188,6 +192,9 @@ void trnman_destroy() my_atomic_rwlock_destroy(&LOCK_pool); my_free((void *)(short_trid_to_active_trn+1), MYF(0)); short_trid_to_active_trn= NULL; + + wt_end(); + DBUG_VOID_RETURN; } @@ -206,11 +213,13 @@ static TrID new_trid() DBUG_RETURN(++global_trid_generator); } -static void set_short_trid(TRN *trn) +static uint get_short_trid(TRN *trn) { int i= (int) ((global_trid_generator + (intptr)trn) * 312089 % SHORT_TRID_MAX + 1); - for ( ; !trn->short_id ; i= 1) + uint res=0; + + for ( ; !res ; i= 1) { my_atomic_rwlock_wrlock(&LOCK_short_trid_to_trn); for ( ; i <= SHORT_TRID_MAX; i++) /* the range is [1..SHORT_TRID_MAX] */ @@ -219,12 +228,13 @@ static void set_short_trid(TRN *trn) if (short_trid_to_active_trn[i] == NULL && my_atomic_casptr((void **)&short_trid_to_active_trn[i], &tmp, trn)) { - trn->short_id= i; + res= i; break; } } my_atomic_rwlock_wrunlock(&LOCK_short_trid_to_trn); } + return res; } /* @@ -243,7 +253,7 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond) we have a mutex, to do simple things under it - allocate a TRN, increment trnman_active_transactions, set trn->min_read_from. - Note that all the above is fast. generating short_trid may be slow, + Note that all the above is fast. generating short_id may be slow, as it involves scanning a large array - so it's done outside of the mutex. */ @@ -280,6 +290,8 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond) return 0; } trnman_allocated_transactions++; + pthread_mutex_init(&trn->state_lock, MY_MUTEX_INIT_FAST); + wt_thd_init(&trn->wt); } trn->pins= lf_hash_get_pins(&trid_to_trn); if (!trn->pins) @@ -293,7 +305,6 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond) trn->min_read_from= active_list_min.next->trid; trn->trid= new_trid(); - trn->short_id= 0; trn->next= &active_list_max; trn->prev= active_list_max.prev; @@ -320,7 +331,9 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond) only after the following function TRN is considered initialized, so it must be done the last */ - set_short_trid(trn); + pthread_mutex_lock(&trn->state_lock); + trn->short_id= get_short_trid(trn); + pthread_mutex_unlock(&trn->state_lock); res= lf_hash_insert(&trid_to_trn, trn->pins, &trn); DBUG_ASSERT(res <= 0); @@ -364,6 +377,7 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit) /* if a rollback, all UNDO records should have been executed */ DBUG_ASSERT(commit || trn->undo_lsn == 0); DBUG_PRINT("info", ("pthread_mutex_lock LOCK_trn_list")); + pthread_mutex_lock(&LOCK_trn_list); /* remove from active list */ @@ -402,7 +416,11 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit) */ if (commit && active_list_min.next != &active_list_max) { + pthread_mutex_lock(&trn->state_lock); trn->commit_trid= global_trid_generator; + wt_thd_release_all(& trn->wt); + pthread_mutex_unlock(&trn->state_lock); + trn->next= &committed_list_max; trn->prev= committed_list_max.prev; trnman_committed_transactions++; @@ -436,11 +454,14 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit) TRN *t= free_me; free_me= free_me->next; - /* - ignore OOM here. it's harmless, and there's nothing we could do, anyway - */ + /* ignore OOM. it's harmless, and we can do nothing here anyway */ (void)lf_hash_delete(&trid_to_trn, pins, &t->trid, sizeof(TrID)); + pthread_mutex_lock(&trn->state_lock); + trn->short_id= 0; + wt_thd_release_all(& trn->wt); + pthread_mutex_unlock(&trn->state_lock); + trnman_free_trn(t); } @@ -533,6 +554,33 @@ int trnman_can_read_from(TRN *trn, TrID trid) return can; } +TRN *trnman_trid_to_trn(TRN *trn, TrID trid) +{ + TRN **found; + LF_REQUIRE_PINS(3); + + if (trid < trn->min_read_from) + return 0; /* it's committed eons ago */ + + found= lf_hash_search(&trid_to_trn, trn->pins, &trid, sizeof(trid)); + if (found == NULL || found == MY_ERRPTR) + return 0; /* no luck */ + + /* we've found something */ + pthread_mutex_lock(&(*found)->state_lock); + + if ((*found)->short_id == 0) + { + pthread_mutex_unlock(&(*found)->state_lock); + lf_hash_search_unpin(trn->pins); + return 0; /* but it was a ghost */ + } + lf_hash_search_unpin(trn->pins); + + /* Gotcha! */ + return *found; /* note that TRN is returned locked !!! */ +} + /* TODO: the stubs below are waiting for savepoints to be implemented */ void trnman_new_statement(TRN *trn __attribute__ ((unused))) diff --git a/storage/maria/trnman.h b/storage/maria/trnman.h index c9b26292110..f9f1d91d50e 100644 --- a/storage/maria/trnman.h +++ b/storage/maria/trnman.h @@ -21,19 +21,32 @@ C_MODE_START #include <lf.h> #include "trnman_public.h" #include "ma_loghandler_lsn.h" +#include <waiting_threads.h> /* trid - 6 uchar transaction identifier. Assigned when a transaction is created. Transaction can always be identified by its trid, even after transaction has ended. - short_trid - 2-byte transaction identifier, identifies a running + short_id - 2-byte transaction identifier, identifies a running transaction, is reassigned when transaction ends. + + when short_id is 0, TRN is not initialized, for all practical purposes + it could be considered unused. + + when commit_trid is ~(TrID)0 the transaction is running, otherwise it's + committed. + + state_lock mutex protects the state of a TRN, that is whether a TRN + is committed/running/unused. Meaning that modifications of short_id and + commit_trid happen under this mutex. */ struct st_transaction { LF_PINS *pins; + WT_THD wt; + pthread_mutex_t state_lock; void *used_tables; /* Tables used by transaction */ TRN *next, *prev; TrID trid, min_read_from, commit_trid; @@ -41,7 +54,6 @@ struct st_transaction LSN_WITH_FLAGS first_undo_lsn; uint locked_tables; uint16 short_id; - /* Note! if short_id is 0, trn is NOT initialized */ }; #define TRANSACTION_LOGGED_LONG_ID ULL(0x8000000000000000) diff --git a/storage/maria/trnman_public.h b/storage/maria/trnman_public.h index d8238a334e5..fe172cbbb54 100644 --- a/storage/maria/trnman_public.h +++ b/storage/maria/trnman_public.h @@ -45,6 +45,7 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit); #define trnman_rollback_trn(T) trnman_end_trn(T, FALSE) void trnman_free_trn(TRN *trn); int trnman_can_read_from(TRN *trn, TrID trid); +TRN *trnman_trid_to_trn(TRN *trn, TrID trid); void trnman_new_statement(TRN *trn); void trnman_rollback_statement(TRN *trn); my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com, |