diff options
author | Ben Gamari <ben@smart-cactus.org> | 2019-09-30 16:04:17 +0000 |
---|---|---|
committer | Ben Gamari <ben@smart-cactus.org> | 2020-10-24 21:01:54 -0400 |
commit | 8cf50eb1b5f145d7bca9abae6220f4c2622e21b1 (patch) | |
tree | ff796c039fce879dc0bb61203f9d7e3ca06fcb40 /rts/STM.c | |
parent | bf1b0bc78da7dbe5f6fbda54b37a9cb165ff857f (diff) | |
download | haskell-8cf50eb1b5f145d7bca9abae6220f4c2622e21b1.tar.gz |
rts/STM: Use atomics
This fixes a potentially harmful race where we failed to synchronize
before looking at a TVar's current_value.
Also did a bit of refactoring to avoid abstract over management of
max_commits.
Diffstat (limited to 'rts/STM.c')
-rw-r--r-- | rts/STM.c | 72 |
1 files changed, 45 insertions, 27 deletions
@@ -210,7 +210,7 @@ static StgBool cond_lock_tvar(Capability *cap STG_UNUSED, StgClosure *expected) { StgClosure *result; TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected); - result = s -> current_value; + result = RELAXED_LOAD(&s->current_value); TRACE("%p : %s", trec, (result == expected) ? "success" : "failure"); return (result == expected); } @@ -231,7 +231,7 @@ static void lock_stm(StgTRecHeader *trec) { static void unlock_stm(StgTRecHeader *trec STG_UNUSED) { TRACE("%p : unlock_stm()", trec); ASSERT(smp_locked == trec); - smp_locked = 0; + SEQ_CST_STORE(&smp_locked, 0); } static StgClosure *lock_tvar(Capability *cap STG_UNUSED, @@ -240,7 +240,7 @@ static StgClosure *lock_tvar(Capability *cap STG_UNUSED, StgClosure *result; TRACE("%p : lock_tvar(%p)", trec, s); ASSERT(smp_locked == trec); - result = s -> current_value; + result = RELAXED_LOAD(&s->current_value); return result; } @@ -253,7 +253,7 @@ static void *unlock_tvar(Capability *cap, ASSERT(smp_locked == trec); if (force_update) { StgClosure *old_value = s -> current_value; - s -> current_value = c; + RELAXED_STORE(&s->current_value, c); dirty_TVAR(cap, s, old_value); } } @@ -265,7 +265,7 @@ static StgBool cond_lock_tvar(Capability *cap STG_UNUSED, StgClosure *result; TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected); ASSERT(smp_locked == trec); - result = s -> current_value; + result = RELAXED_LOAD(&s->current_value); TRACE("%p : %d", result ? "success" : "failure"); return (result == expected); } @@ -292,7 +292,7 @@ static StgClosure *lock_tvar(Capability *cap, TRACE("%p : lock_tvar(%p)", trec, s); do { do { - result = s -> current_value; + result = RELAXED_LOAD(&s->current_value); } while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info); } while (cas((void *)&(s -> current_value), (StgWord)result, (StgWord)trec) != (StgWord)result); @@ -311,8 +311,8 @@ static void unlock_tvar(Capability *cap, StgClosure *c, StgBool force_update STG_UNUSED) { TRACE("%p : unlock_tvar(%p, %p)", trec, s, c); - ASSERT(s -> current_value == (StgClosure *)trec); - s -> current_value = c; + ASSERT(RELAXED_LOAD(&s->current_value) == (StgClosure *)trec); + RELEASE_STORE(&s->current_value, c); dirty_TVAR(cap, s, (StgClosure *) trec); } @@ -532,8 +532,8 @@ static void build_watch_queue_entries_for_trec(Capability *cap, StgTVarWatchQueue *fq; s = e -> tvar; TRACE("%p : adding tso=%p to watch queue for tvar=%p", trec, tso, s); - ACQ_ASSERT(s -> current_value == (StgClosure *)trec); - NACQ_ASSERT(s -> current_value == e -> expected_value); + ACQ_ASSERT(RELAXED_LOAD(&s->current_value) == (StgClosure *)trec); + NACQ_ASSERT(RELAXED_LOAD(&s->current_value) == e -> expected_value); fq = s -> first_watch_queue_entry; q = alloc_stg_tvar_watch_queue(cap, (StgClosure*) tso); q -> next_queue_entry = fq; @@ -569,7 +569,7 @@ static void remove_watch_queue_entries_for_trec(Capability *cap, trec, q -> closure, s); - ACQ_ASSERT(s -> current_value == (StgClosure *)trec); + ACQ_ASSERT(RELAXED_LOAD(&s->current_value) == (StgClosure *)trec); nq = q -> next_queue_entry; pq = q -> prev_queue_entry; if (nq != END_STM_WATCH_QUEUE) { @@ -727,7 +727,7 @@ static StgBool entry_is_read_only(TRecEntry *e) { static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) { StgClosure *c; StgBool result; - c = s -> current_value; + c = RELAXED_LOAD(&s->current_value); result = (c == (StgClosure *) h); return result; } @@ -800,13 +800,16 @@ static StgBool validate_and_acquire_ownership (Capability *cap, ASSERT(config_use_read_phase); IF_STM_FG_LOCKS({ TRACE("%p : will need to check %p", trec, s); - if (s -> current_value != e -> expected_value) { + // The memory ordering here must ensure that we have two distinct + // reads to current_value, with the read from num_updates between + // them. + if (SEQ_CST_LOAD(&s->current_value) != e -> expected_value) { TRACE("%p : doesn't match", trec); result = false; BREAK_FOR_EACH; } - e -> num_updates = s -> num_updates; - if (s -> current_value != e -> expected_value) { + e->num_updates = SEQ_CST_LOAD(&s->num_updates); + if (SEQ_CST_LOAD(&s->current_value) != e -> expected_value) { TRACE("%p : doesn't match (race)", trec); result = false; BREAK_FOR_EACH; @@ -828,7 +831,7 @@ static StgBool validate_and_acquire_ownership (Capability *cap, // check_read_only : check that we've seen an atomic snapshot of the // non-updated TVars accessed by a trec. This checks that the last TRec to // commit an update to the TVar is unchanged since the value was stashed in -// validate_and_acquire_ownership. If no update is seen to any TVar than +// validate_and_acquire_ownership. If no update is seen to any TVar then // all of them contained their expected values at the start of the call to // check_read_only. // @@ -847,11 +850,16 @@ static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) { if (entry_is_read_only(e)) { TRACE("%p : check_read_only for TVar %p, saw %ld", trec, s, e -> num_updates); + // We must first load current_value then num_updates; this is inverse of + // the order of the stores in stmCommitTransaction. + StgClosure *current_value = SEQ_CST_LOAD(&s->current_value); + StgInt num_updates = SEQ_CST_LOAD(&s->num_updates); + // Note we need both checks and in this order as the TVar could be // locked by another transaction that is committing but has not yet // incremented `num_updates` (See #7815). - if (s -> current_value != e -> expected_value || - s -> num_updates != e -> num_updates) { + if (current_value != e->expected_value || + num_updates != e->num_updates) { TRACE("%p : mismatch", trec); result = false; BREAK_FOR_EACH; @@ -887,17 +895,22 @@ void stmPreGCHook (Capability *cap) { #define TOKEN_BATCH_SIZE 1024 +#if defined(THREADED_RTS) + static volatile StgInt64 max_commits = 0; -#if defined(THREADED_RTS) static volatile StgWord token_locked = false; +static StgInt64 getMaxCommits(void) { + return RELAXED_LOAD(&max_commits); +} + static void getTokenBatch(Capability *cap) { while (cas((void *)&token_locked, false, true) == true) { /* nothing */ } - max_commits += TOKEN_BATCH_SIZE; - TRACE("%p : cap got token batch, max_commits=%" FMT_Int64, cap, max_commits); + NONATOMIC_ADD(&max_commits, TOKEN_BATCH_SIZE); + TRACE("%p : cap got token batch, max_commits=%" FMT_Int64, cap, RELAXED_LOAD(&max_commits)); cap -> transaction_tokens = TOKEN_BATCH_SIZE; - token_locked = false; + RELEASE_STORE(&token_locked, false); } static void getToken(Capability *cap) { @@ -907,6 +920,10 @@ static void getToken(Capability *cap) { cap -> transaction_tokens --; } #else +static StgInt64 getMaxCommits(void) { + return 0; +} + static void getToken(Capability *cap STG_UNUSED) { // Nothing } @@ -1062,7 +1079,7 @@ static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeade /*......................................................................*/ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) { - StgInt64 max_commits_at_start = max_commits; + StgInt64 max_commits_at_start = getMaxCommits(); TRACE("%p : stmCommitTransaction()", trec); ASSERT(trec != NO_TREC); @@ -1088,7 +1105,7 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) { result = check_read_only(trec); TRACE("%p : read-check %s", trec, result ? "succeeded" : "failed"); - max_commits_at_end = max_commits; + max_commits_at_end = getMaxCommits(); max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) + (n_capabilities * TOKEN_BATCH_SIZE)); if (((max_concurrent_commits >> 32) > 0) || shake()) { @@ -1113,7 +1130,8 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) { TRACE("%p : writing %p to %p, waking waiters", trec, e -> new_value, s); unpark_waiters_on(cap,s); IF_STM_FG_LOCKS({ - s -> num_updates ++; + // We have locked the TVar therefore nonatomic addition is sufficient + NONATOMIC_ADD(&s->num_updates, 1); }); unlock_tvar(cap, trec, s, e -> new_value, true); } @@ -1269,12 +1287,12 @@ StgBool stmReWait(Capability *cap, StgTSO *tso) { static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *tvar) { StgClosure *result; - result = tvar -> current_value; + result = ACQUIRE_LOAD(&tvar->current_value); #if defined(STM_FG_LOCKS) while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info) { TRACE("%p : read_current_value(%p) saw %p", trec, tvar, result); - result = tvar -> current_value; + result = ACQUIRE_LOAD(&tvar->current_value); } #endif |