diff options
author | Ben Gamari <ben@smart-cactus.org> | 2019-09-30 16:04:17 +0000 |
---|---|---|
committer | Moritz Angermann <moritz.angermann@gmail.com> | 2020-09-18 08:09:58 +0000 |
commit | bc911406b246162e862282ae527c629689236b57 (patch) | |
tree | 2a42cf42a7a3c6116b2f42e88780632d448e6caa | |
parent | a63d774c1464e69e7ee48d77d612612a57596241 (diff) | |
download | haskell-bc911406b246162e862282ae527c629689236b57.tar.gz |
rts/STM: Use atomics
This fixes a potentially harmful race where we failed to synchronize
before looking at a TVar's current_value.
Also did a bit of refactoring to avoid abstract over management of
max_commits.
-rw-r--r-- | rts/STM.c | 76 |
1 files changed, 47 insertions, 29 deletions
@@ -207,7 +207,7 @@ static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, StgClosure *expected) { StgClosure *result; TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected); - result = s -> current_value; + result = RELAXED_LOAD(&s->current_value); TRACE("%p : %s", trec, (result == expected) ? "success" : "failure"); return (result == expected); } @@ -228,7 +228,7 @@ static void lock_stm(StgTRecHeader *trec) { static void unlock_stm(StgTRecHeader *trec STG_UNUSED) { TRACE("%p : unlock_stm()", trec); ASSERT(smp_locked == trec); - smp_locked = 0; + SEQ_CST_STORE(&smp_locked, 0); } static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, @@ -236,7 +236,7 @@ static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, StgClosure *result; TRACE("%p : lock_tvar(%p)", trec, s); ASSERT(smp_locked == trec); - result = s -> current_value; + result = RELAXED_LOAD(&s->current_value); return result; } @@ -248,8 +248,8 @@ static void *unlock_tvar(Capability *cap, TRACE("%p : unlock_tvar(%p, %p)", trec, s, c); ASSERT(smp_locked == trec); if (force_update) { - s -> current_value = c; - dirty_TVAR(cap,s); + RELAXED_STORE(&s->current_value, c); + dirty_TVAR(cap, s); } } @@ -259,7 +259,7 @@ static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, StgClosure *result; TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected); ASSERT(smp_locked == trec); - result = s -> current_value; + result = RELAXED_LOAD(&s->current_value); TRACE("%p : %d", result ? "success" : "failure"); return (result == expected); } @@ -285,7 +285,7 @@ static StgClosure *lock_tvar(StgTRecHeader *trec, TRACE("%p : lock_tvar(%p)", trec, s); do { do { - result = s -> current_value; + result = RELAXED_LOAD(&s->current_value); } while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info); } while (cas((void *)&(s -> current_value), (StgWord)result, (StgWord)trec) != (StgWord)result); @@ -298,9 +298,9 @@ static void unlock_tvar(Capability *cap, StgClosure *c, StgBool force_update STG_UNUSED) { TRACE("%p : unlock_tvar(%p, %p)", trec, s, c); - ASSERT(s -> current_value == (StgClosure *)trec); - s -> current_value = c; - dirty_TVAR(cap,s); + ASSERT(RELAXED_LOAD(&s->current_value) == (StgClosure *)trec); + RELEASE_STORE(&s->current_value, c); + dirty_TVAR(cap, s); } static StgBool cond_lock_tvar(StgTRecHeader *trec, @@ -514,8 +514,8 @@ static void build_watch_queue_entries_for_trec(Capability *cap, StgTVarWatchQueue *fq; s = e -> tvar; TRACE("%p : adding tso=%p to watch queue for tvar=%p", trec, tso, s); - ACQ_ASSERT(s -> current_value == (StgClosure *)trec); - NACQ_ASSERT(s -> current_value == e -> expected_value); + ACQ_ASSERT(RELAXED_LOAD(&s->current_value) == (StgClosure *)trec); + NACQ_ASSERT(RELAXED_LOAD(&s->current_value) == e -> expected_value); fq = s -> first_watch_queue_entry; q = alloc_stg_tvar_watch_queue(cap, (StgClosure*) tso); q -> next_queue_entry = fq; @@ -551,7 +551,7 @@ static void remove_watch_queue_entries_for_trec(Capability *cap, trec, q -> closure, s); - ACQ_ASSERT(s -> current_value == (StgClosure *)trec); + ACQ_ASSERT(RELAXED_LOAD(&s->current_value) == (StgClosure *)trec); nq = q -> next_queue_entry; pq = q -> prev_queue_entry; if (nq != END_STM_WATCH_QUEUE) { @@ -709,7 +709,7 @@ static StgBool entry_is_read_only(TRecEntry *e) { static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) { StgClosure *c; StgBool result; - c = s -> current_value; + c = RELAXED_LOAD(&s->current_value); result = (c == (StgClosure *) h); return result; } @@ -782,13 +782,16 @@ static StgBool validate_and_acquire_ownership (Capability *cap, ASSERT(config_use_read_phase); IF_STM_FG_LOCKS({ TRACE("%p : will need to check %p", trec, s); - if (s -> current_value != e -> expected_value) { + // The memory ordering here must ensure that we have two distinct + // reads to current_value, with the read from num_updates between + // them. + if (SEQ_CST_LOAD(&s->current_value) != e -> expected_value) { TRACE("%p : doesn't match", trec); result = false; BREAK_FOR_EACH; } - e -> num_updates = s -> num_updates; - if (s -> current_value != e -> expected_value) { + e->num_updates = SEQ_CST_LOAD(&s->num_updates); + if (SEQ_CST_LOAD(&s->current_value) != e -> expected_value) { TRACE("%p : doesn't match (race)", trec); result = false; BREAK_FOR_EACH; @@ -810,7 +813,7 @@ static StgBool validate_and_acquire_ownership (Capability *cap, // check_read_only : check that we've seen an atomic snapshot of the // non-updated TVars accessed by a trec. This checks that the last TRec to // commit an update to the TVar is unchanged since the value was stashed in -// validate_and_acquire_ownership. If no udpate is seen to any TVar than +// validate_and_acquire_ownership. If no update is seen to any TVar then // all of them contained their expected values at the start of the call to // check_read_only. // @@ -829,11 +832,16 @@ static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) { if (entry_is_read_only(e)) { TRACE("%p : check_read_only for TVar %p, saw %ld", trec, s, e -> num_updates); + // We must first load current_value then num_updates; this is inverse of + // the order of the stores in stmCommitTransaction. + StgClosure *current_value = SEQ_CST_LOAD(&s->current_value); + StgInt num_updates = SEQ_CST_LOAD(&s->num_updates); + // Note we need both checks and in this order as the TVar could be // locked by another transaction that is committing but has not yet // incremented `num_updates` (See #7815). - if (s -> current_value != e -> expected_value || - s -> num_updates != e -> num_updates) { + if (current_value != e->expected_value || + num_updates != e->num_updates) { TRACE("%p : mismatch", trec); result = false; BREAK_FOR_EACH; @@ -869,17 +877,22 @@ void stmPreGCHook (Capability *cap) { #define TOKEN_BATCH_SIZE 1024 +#if defined(THREADED_RTS) + static volatile StgInt64 max_commits = 0; -#if defined(THREADED_RTS) static volatile StgWord token_locked = false; +static StgInt64 getMaxCommits(void) { + return RELAXED_LOAD(&max_commits); +} + static void getTokenBatch(Capability *cap) { while (cas((void *)&token_locked, false, true) == true) { /* nothing */ } - max_commits += TOKEN_BATCH_SIZE; - TRACE("%p : cap got token batch, max_commits=%" FMT_Int64, cap, max_commits); + NONATOMIC_ADD(&max_commits, TOKEN_BATCH_SIZE); + TRACE("%p : cap got token batch, max_commits=%" FMT_Int64, cap, RELAXED_LOAD(&max_commits)); cap -> transaction_tokens = TOKEN_BATCH_SIZE; - token_locked = false; + RELEASE_STORE(&token_locked, false); } static void getToken(Capability *cap) { @@ -889,6 +902,10 @@ static void getToken(Capability *cap) { cap -> transaction_tokens --; } #else +static StgInt64 getMaxCommits(void) { + return 0; +} + static void getToken(Capability *cap STG_UNUSED) { // Nothing } @@ -1044,7 +1061,7 @@ static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeade /*......................................................................*/ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) { - StgInt64 max_commits_at_start = max_commits; + StgInt64 max_commits_at_start = getMaxCommits(); TRACE("%p : stmCommitTransaction()", trec); ASSERT(trec != NO_TREC); @@ -1070,7 +1087,7 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) { result = check_read_only(trec); TRACE("%p : read-check %s", trec, result ? "succeeded" : "failed"); - max_commits_at_end = max_commits; + max_commits_at_end = getMaxCommits(); max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) + (n_capabilities * TOKEN_BATCH_SIZE)); if (((max_concurrent_commits >> 32) > 0) || shake()) { @@ -1095,7 +1112,8 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) { TRACE("%p : writing %p to %p, waking waiters", trec, e -> new_value, s); unpark_waiters_on(cap,s); IF_STM_FG_LOCKS({ - s -> num_updates ++; + // We have locked the TVar therefore nonatomic addition is sufficient + NONATOMIC_ADD(&s->num_updates, 1); }); unlock_tvar(cap, trec, s, e -> new_value, true); } @@ -1251,12 +1269,12 @@ StgBool stmReWait(Capability *cap, StgTSO *tso) { static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *tvar) { StgClosure *result; - result = tvar -> current_value; + result = ACQUIRE_LOAD(&tvar->current_value); #if defined(STM_FG_LOCKS) while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info) { TRACE("%p : read_current_value(%p) saw %p", trec, tvar, result); - result = tvar -> current_value; + result = ACQUIRE_LOAD(&tvar->current_value); } #endif |