summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/my_pthread.h5
-rw-r--r--include/waiting_threads.h4
-rw-r--r--mysys/waiting_threads.c36
-rw-r--r--storage/maria/ma_state.c2
-rw-r--r--storage/maria/ma_static.c3
-rw-r--r--storage/maria/ma_write.c59
-rw-r--r--storage/maria/maria_def.h6
-rw-r--r--storage/maria/trnman.c70
-rw-r--r--storage/maria/trnman.h16
-rw-r--r--storage/maria/trnman_public.h1
10 files changed, 163 insertions, 39 deletions
diff --git a/include/my_pthread.h b/include/my_pthread.h
index 28ce083d744..337b167e548 100644
--- a/include/my_pthread.h
+++ b/include/my_pthread.h
@@ -437,9 +437,10 @@ int my_pthread_mutex_trylock(pthread_mutex_t *mutex);
#ifndef set_timespec_time_nsec
#define set_timespec_time_nsec(ABSTIME,TIME,NSEC) do { \
- ulonglong now= (TIME) + (NSEC/100); \
+ ulonglong nsec= (NSEC); \
+ ulonglong now= (TIME) + (nsec/100); \
(ABSTIME).TV_sec= (now / ULL(10000000)); \
- (ABSTIME).TV_nsec= (now % ULL(10000000) * 100 + ((NSEC) % 100)); \
+ (ABSTIME).TV_nsec= (now % ULL(10000000) * 100 + (nsec % 100)); \
} while(0)
#endif /* !set_timespec_time_nsec */
diff --git a/include/waiting_threads.h b/include/waiting_threads.h
index 92fbbf998be..6355a83b13d 100644
--- a/include/waiting_threads.h
+++ b/include/waiting_threads.h
@@ -13,6 +13,9 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+#ifndef _waiting_threads_h
+#define _waiting_threads_h
+
#include <my_global.h>
#include <my_sys.h>
#include <lf.h>
@@ -152,3 +155,4 @@ void wt_thd_release(WT_THD *, WT_RESOURCE_ID *);
#define wt_thd_release_all(THD) wt_thd_release((THD), 0)
int wt_resource_id_memcmp(void *, void *);
+#endif
diff --git a/mysys/waiting_threads.c b/mysys/waiting_threads.c
index 1c87886f405..491e7c3a726 100644
--- a/mysys/waiting_threads.c
+++ b/mysys/waiting_threads.c
@@ -227,6 +227,20 @@ struct deadlock_arg {
WT_RESOURCE *rc;
};
+static void change_victim(WT_THD* found, struct deadlock_arg *arg)
+{
+ if (found->weight < arg->victim->weight)
+ {
+ if (arg->victim != arg->thd)
+ {
+ rc_unlock(arg->victim->waiting_for); /* release the previous victim */
+ DBUG_ASSERT(arg->rc == found->waiting_for);
+ }
+ arg->victim= found;
+ arg->rc= 0;
+ }
+}
+
/*
loop detection in a wait-for graph with a limited search depth.
*/
@@ -294,16 +308,8 @@ retry:
break;
case WT_DEADLOCK:
ret= WT_DEADLOCK;
- if (cursor->weight < arg->victim->weight)
- {
- if (arg->victim != arg->thd)
- {
- rc_unlock(arg->victim->waiting_for); /* release the previous victim */
- DBUG_ASSERT(arg->rc == cursor->waiting_for);
- }
- arg->victim= cursor;
- }
- else if (arg->rc)
+ change_victim(cursor, arg);
+ if (arg->rc)
rc_unlock(arg->rc);
goto end;
case WT_OK:
@@ -329,13 +335,15 @@ static int deadlock(WT_THD *thd, WT_THD *blocker, uint depth,
int ret;
DBUG_ENTER("deadlock");
ret= deadlock_search(&arg, blocker, depth);
- if (arg.rc)
- rc_unlock(arg.rc);
if (ret == WT_DEPTH_EXCEEDED)
{
increment_cycle_stats(WT_CYCLE_STATS, max_depth);
ret= WT_OK;
}
+ if (ret == WT_DEADLOCK && depth)
+ change_victim(blocker, &arg);
+ if (arg.rc)
+ rc_unlock(arg.rc);
if (ret == WT_DEADLOCK && arg.victim != thd)
{
DBUG_PRINT("wt", ("killing %s", arg.victim->name));
@@ -570,7 +578,7 @@ int wt_thd_cond_timedwait(WT_THD *thd, pthread_mutex_t *mutex)
ret= WT_OK;
rc_unlock(rc);
- set_timespec_time_nsec(timeout, starttime, wt_timeout_short*1000);
+ set_timespec_time_nsec(timeout, starttime, wt_timeout_short*ULL(1000));
if (ret == WT_TIMEOUT)
ret= pthread_cond_timedwait(&rc->cond, mutex, &timeout);
if (ret == WT_TIMEOUT)
@@ -579,7 +587,7 @@ int wt_thd_cond_timedwait(WT_THD *thd, pthread_mutex_t *mutex)
ret= WT_DEADLOCK;
else if (wt_timeout_long > wt_timeout_short)
{
- set_timespec_time_nsec(timeout, starttime, wt_timeout_long*1000);
+ set_timespec_time_nsec(timeout, starttime, wt_timeout_long*ULL(1000));
if (!thd->killed)
ret= pthread_cond_timedwait(&rc->cond, mutex, &timeout);
}
diff --git a/storage/maria/ma_state.c b/storage/maria/ma_state.c
index ec8ea2010ae..422e35795bd 100644
--- a/storage/maria/ma_state.c
+++ b/storage/maria/ma_state.c
@@ -88,7 +88,7 @@ my_bool _ma_setup_live_state(MARIA_HA *info)
It's enough to compare trids here (instead of calling
tranman_can_read_from) as history->trid is a commit_trid
*/
- while (trn->trid < history->trid)
+ while (trn->trid < history->trid && history->trid != ~(TrID)0)
history= history->next;
pthread_mutex_unlock(&share->intern_lock);
/* The current item can't be deleted as it's the first one visible for us */
diff --git a/storage/maria/ma_static.c b/storage/maria/ma_static.c
index 07425a5db91..3af40550a69 100644
--- a/storage/maria/ma_static.c
+++ b/storage/maria/ma_static.c
@@ -64,6 +64,9 @@ HASH maria_stored_state;
*/
TRN dummy_transaction_object;
+/* a WT_RESOURCE_TYPE for transactions waiting on a unique key conflict */
+WT_RESOURCE_TYPE ma_rc_dup_unique={ wt_resource_id_memcmp, 0};
+
/* Enough for comparing if number is zero */
uchar maria_zero_string[]= {0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0};
diff --git a/storage/maria/ma_write.c b/storage/maria/ma_write.c
index 0ad8db0ae00..87a49d61285 100644
--- a/storage/maria/ma_write.c
+++ b/storage/maria/ma_write.c
@@ -180,15 +180,47 @@ int maria_write(MARIA_HA *info, uchar *record)
}
else
{
- if (keyinfo->ck_insert(info,
- (*keyinfo->make_key)(info, &int_key, i,
- buff, record, filepos,
- info->trn->trid)))
+ while (keyinfo->ck_insert(info,
+ (*keyinfo->make_key)(info, &int_key, i,
+ buff, record, filepos,
+ info->trn->trid)))
{
+ TRN *blocker=trnman_trid_to_trn(info->trn, info->dup_key_trid);
+ DBUG_PRINT("error",("Got error: %d on write",my_errno));
+ /*
+ if blocker TRN was not found, it means that the conflicting
+ transaction was committed long time ago. It could not be
+ aborted, as it would have to wait on the key tree lock
+ to remove the conflicting key it has inserted.
+ */
if (local_lock_tree)
rw_unlock(&keyinfo->root_lock);
- DBUG_PRINT("error",("Got error: %d on write",my_errno));
- goto err;
+ if (!blocker)
+ goto err;
+ if (blocker->commit_trid != ~(TrID)0)
+ { /* committed, albeit recently */
+ pthread_mutex_unlock(& blocker->state_lock);
+ goto err;
+ }
+ { /* running. now we wait */
+ WT_RESOURCE_ID rc;
+ int res;
+
+ rc.type= &ma_rc_dup_unique;
+ rc.value.ptr= blocker; /* TODO savepoint id when we'll have them */
+ res= wt_thd_will_wait_for(& info->trn->wt, & blocker->wt, & rc);
+ if (res != WT_OK)
+ {
+ pthread_mutex_unlock(& blocker->state_lock);
+ goto err;
+ }
+ res=wt_thd_cond_timedwait(& info->trn->wt, & blocker->state_lock);
+ pthread_mutex_unlock(& blocker->state_lock);
+ if (res != WT_OK)
+ goto err;
+ }
+ if (local_lock_tree)
+ rw_wrlock(&keyinfo->root_lock);
}
}
@@ -597,9 +629,22 @@ static int w_search(register MARIA_HA *info, uint32 comp_flag, MARIA_KEY *key,
else /* not HA_FULLTEXT, normal HA_NOSAME key */
{
DBUG_PRINT("warning", ("Duplicate key"));
+ /*
+ FIXME
+ When the index will support true versioning - with multiple
+ identical values in the UNIQUE index, invisible to each other -
+ the following should be changed to "continue inserting keys, at the
+ end (of the row or statement) wait". Until it's done we cannot properly
+ support deadlock timeouts.
+ */
+ /*
+ transaction that has inserted the conflicting key is in progress.
+ wait for it to be committed or aborted.
+ */
+ info->dup_key_trid= _ma_trid_from_key(&tmp_key);
info->dup_key_pos= dup_key_pos;
my_afree((uchar*) temp_buff);
- my_errno=HA_ERR_FOUND_DUPP_KEY;
+ my_errno= HA_ERR_FOUND_DUPP_KEY;
DBUG_RETURN(-1);
}
}
diff --git a/storage/maria/maria_def.h b/storage/maria/maria_def.h
index c1160993607..3d633355729 100644
--- a/storage/maria/maria_def.h
+++ b/storage/maria/maria_def.h
@@ -29,6 +29,7 @@
#include "ma_loghandler.h"
#include "ma_control_file.h"
#include "ma_state.h"
+#include <waiting_threads.h>
/* For testing recovery */
#ifdef TO_BE_REMOVED
@@ -492,13 +493,14 @@ struct st_maria_handler
uint32 int_keytree_version; /* -""- */
int (*read_record)(MARIA_HA *, uchar*, MARIA_RECORD_POS);
invalidator_by_filename invalidator; /* query cache invalidator */
- ulonglong last_auto_increment; /* auto value at start of statement */
+ ulonglong last_auto_increment; /* auto value at start of statement */
ulong this_unique; /* uniq filenumber or thread */
ulong last_unique; /* last unique number */
ulong this_loop; /* counter for this open */
ulong last_loop; /* last used counter */
MARIA_RECORD_POS save_lastpos;
MARIA_RECORD_POS dup_key_pos;
+ TrID dup_key_trid;
my_off_t pos; /* Intern variable */
my_off_t last_keypage; /* Last key page read */
my_off_t last_search_keypage; /* Last keypage when searching */
@@ -759,6 +761,7 @@ extern char *maria_data_root;
extern uchar maria_zero_string[];
extern my_bool maria_inited, maria_in_ha_maria;
extern HASH maria_stored_state;
+extern WT_RESOURCE_TYPE ma_rc_dup_unique;
/* This is used by _ma_calc_xxx_key_length och _ma_store_key */
typedef struct st_maria_s_param
@@ -782,7 +785,6 @@ typedef struct st_pinned_page
my_bool changed;
} MARIA_PINNED_PAGE;
-
/* Prototypes for intern functions */
extern int _ma_read_dynamic_record(MARIA_HA *, uchar *, MARIA_RECORD_POS);
extern int _ma_read_rnd_dynamic_record(MARIA_HA *, uchar *, MARIA_RECORD_POS,
diff --git a/storage/maria/trnman.c b/storage/maria/trnman.c
index 9f6d0829521..d2077f8f4c6 100644
--- a/storage/maria/trnman.c
+++ b/storage/maria/trnman.c
@@ -46,7 +46,7 @@ static TRN *pool;
/* a hash for committed transactions that maps trid to a TRN structure */
static LF_HASH trid_to_trn;
-/* an array that maps short_trid of an active transaction to a TRN structure */
+/* an array that maps short_id of an active transaction to a TRN structure */
static TRN **short_trid_to_active_trn;
/* locks for short_trid_to_active_trn and pool */
@@ -114,11 +114,13 @@ int trnman_init(TrID initial_trid)
{
DBUG_ENTER("trnman_init");
+ wt_init(); /* FIXME this should be done in the server, not in the engine! */
+
short_trid_to_active_trn= (TRN **)my_malloc(SHORT_TRID_MAX*sizeof(TRN*),
MYF(MY_WME|MY_ZEROFILL));
if (unlikely(!short_trid_to_active_trn))
DBUG_RETURN(1);
- short_trid_to_active_trn--; /* min short_trid is 1 */
+ short_trid_to_active_trn--; /* min short_id is 1 */
/*
Initialize lists.
@@ -179,6 +181,8 @@ void trnman_destroy()
{
TRN *trn= pool;
pool= pool->next;
+ pthread_mutex_destroy(&trn->state_lock);
+ wt_thd_destroy(&trn->wt);
my_free((void *)trn, MYF(0));
}
lf_hash_destroy(&trid_to_trn);
@@ -188,6 +192,9 @@ void trnman_destroy()
my_atomic_rwlock_destroy(&LOCK_pool);
my_free((void *)(short_trid_to_active_trn+1), MYF(0));
short_trid_to_active_trn= NULL;
+
+ wt_end();
+
DBUG_VOID_RETURN;
}
@@ -206,11 +213,13 @@ static TrID new_trid()
DBUG_RETURN(++global_trid_generator);
}
-static void set_short_trid(TRN *trn)
+static uint get_short_trid(TRN *trn)
{
int i= (int) ((global_trid_generator + (intptr)trn) * 312089 %
SHORT_TRID_MAX + 1);
- for ( ; !trn->short_id ; i= 1)
+ uint res=0;
+
+ for ( ; !res ; i= 1)
{
my_atomic_rwlock_wrlock(&LOCK_short_trid_to_trn);
for ( ; i <= SHORT_TRID_MAX; i++) /* the range is [1..SHORT_TRID_MAX] */
@@ -219,12 +228,13 @@ static void set_short_trid(TRN *trn)
if (short_trid_to_active_trn[i] == NULL &&
my_atomic_casptr((void **)&short_trid_to_active_trn[i], &tmp, trn))
{
- trn->short_id= i;
+ res= i;
break;
}
}
my_atomic_rwlock_wrunlock(&LOCK_short_trid_to_trn);
}
+ return res;
}
/*
@@ -243,7 +253,7 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond)
we have a mutex, to do simple things under it - allocate a TRN,
increment trnman_active_transactions, set trn->min_read_from.
- Note that all the above is fast. generating short_trid may be slow,
+ Note that all the above is fast. generating short_id may be slow,
as it involves scanning a large array - so it's done outside of the
mutex.
*/
@@ -280,6 +290,8 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond)
return 0;
}
trnman_allocated_transactions++;
+ pthread_mutex_init(&trn->state_lock, MY_MUTEX_INIT_FAST);
+ wt_thd_init(&trn->wt);
}
trn->pins= lf_hash_get_pins(&trid_to_trn);
if (!trn->pins)
@@ -293,7 +305,6 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond)
trn->min_read_from= active_list_min.next->trid;
trn->trid= new_trid();
- trn->short_id= 0;
trn->next= &active_list_max;
trn->prev= active_list_max.prev;
@@ -320,7 +331,9 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond)
only after the following function TRN is considered initialized,
so it must be done the last
*/
- set_short_trid(trn);
+ pthread_mutex_lock(&trn->state_lock);
+ trn->short_id= get_short_trid(trn);
+ pthread_mutex_unlock(&trn->state_lock);
res= lf_hash_insert(&trid_to_trn, trn->pins, &trn);
DBUG_ASSERT(res <= 0);
@@ -364,6 +377,7 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit)
/* if a rollback, all UNDO records should have been executed */
DBUG_ASSERT(commit || trn->undo_lsn == 0);
DBUG_PRINT("info", ("pthread_mutex_lock LOCK_trn_list"));
+
pthread_mutex_lock(&LOCK_trn_list);
/* remove from active list */
@@ -402,7 +416,11 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit)
*/
if (commit && active_list_min.next != &active_list_max)
{
+ pthread_mutex_lock(&trn->state_lock);
trn->commit_trid= global_trid_generator;
+ wt_thd_release_all(& trn->wt);
+ pthread_mutex_unlock(&trn->state_lock);
+
trn->next= &committed_list_max;
trn->prev= committed_list_max.prev;
trnman_committed_transactions++;
@@ -436,11 +454,14 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit)
TRN *t= free_me;
free_me= free_me->next;
- /*
- ignore OOM here. it's harmless, and there's nothing we could do, anyway
- */
+ /* ignore OOM. it's harmless, and we can do nothing here anyway */
(void)lf_hash_delete(&trid_to_trn, pins, &t->trid, sizeof(TrID));
+ pthread_mutex_lock(&trn->state_lock);
+ trn->short_id= 0;
+ wt_thd_release_all(& trn->wt);
+ pthread_mutex_unlock(&trn->state_lock);
+
trnman_free_trn(t);
}
@@ -533,6 +554,33 @@ int trnman_can_read_from(TRN *trn, TrID trid)
return can;
}
+TRN *trnman_trid_to_trn(TRN *trn, TrID trid)
+{
+ TRN **found;
+ LF_REQUIRE_PINS(3);
+
+ if (trid < trn->min_read_from)
+ return 0; /* it's committed eons ago */
+
+ found= lf_hash_search(&trid_to_trn, trn->pins, &trid, sizeof(trid));
+ if (found == NULL || found == MY_ERRPTR)
+ return 0; /* no luck */
+
+ /* we've found something */
+ pthread_mutex_lock(&(*found)->state_lock);
+
+ if ((*found)->short_id == 0)
+ {
+ pthread_mutex_unlock(&(*found)->state_lock);
+ lf_hash_search_unpin(trn->pins);
+ return 0; /* but it was a ghost */
+ }
+ lf_hash_search_unpin(trn->pins);
+
+ /* Gotcha! */
+ return *found; /* note that TRN is returned locked !!! */
+}
+
/* TODO: the stubs below are waiting for savepoints to be implemented */
void trnman_new_statement(TRN *trn __attribute__ ((unused)))
diff --git a/storage/maria/trnman.h b/storage/maria/trnman.h
index c9b26292110..f9f1d91d50e 100644
--- a/storage/maria/trnman.h
+++ b/storage/maria/trnman.h
@@ -21,19 +21,32 @@ C_MODE_START
#include <lf.h>
#include "trnman_public.h"
#include "ma_loghandler_lsn.h"
+#include <waiting_threads.h>
/*
trid - 6 uchar transaction identifier. Assigned when a transaction
is created. Transaction can always be identified by its trid,
even after transaction has ended.
- short_trid - 2-byte transaction identifier, identifies a running
+ short_id - 2-byte transaction identifier, identifies a running
transaction, is reassigned when transaction ends.
+
+ when short_id is 0, TRN is not initialized, for all practical purposes
+ it could be considered unused.
+
+ when commit_trid is ~(TrID)0 the transaction is running, otherwise it's
+ committed.
+
+ state_lock mutex protects the state of a TRN, that is whether a TRN
+ is committed/running/unused. Meaning that modifications of short_id and
+ commit_trid happen under this mutex.
*/
struct st_transaction
{
LF_PINS *pins;
+ WT_THD wt;
+ pthread_mutex_t state_lock;
void *used_tables; /* Tables used by transaction */
TRN *next, *prev;
TrID trid, min_read_from, commit_trid;
@@ -41,7 +54,6 @@ struct st_transaction
LSN_WITH_FLAGS first_undo_lsn;
uint locked_tables;
uint16 short_id;
- /* Note! if short_id is 0, trn is NOT initialized */
};
#define TRANSACTION_LOGGED_LONG_ID ULL(0x8000000000000000)
diff --git a/storage/maria/trnman_public.h b/storage/maria/trnman_public.h
index d8238a334e5..fe172cbbb54 100644
--- a/storage/maria/trnman_public.h
+++ b/storage/maria/trnman_public.h
@@ -45,6 +45,7 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit);
#define trnman_rollback_trn(T) trnman_end_trn(T, FALSE)
void trnman_free_trn(TRN *trn);
int trnman_can_read_from(TRN *trn, TrID trid);
+TRN *trnman_trid_to_trn(TRN *trn, TrID trid);
void trnman_new_statement(TRN *trn);
void trnman_rollback_statement(TRN *trn);
my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com,