summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Gamari <ben@smart-cactus.org>2019-09-30 16:04:17 +0000
committerMoritz Angermann <moritz.angermann@gmail.com>2020-09-18 08:09:58 +0000
commitbc911406b246162e862282ae527c629689236b57 (patch)
tree2a42cf42a7a3c6116b2f42e88780632d448e6caa
parenta63d774c1464e69e7ee48d77d612612a57596241 (diff)
downloadhaskell-bc911406b246162e862282ae527c629689236b57.tar.gz
rts/STM: Use atomics
This fixes a potentially harmful race where we failed to synchronize before looking at a TVar's current_value. Also did a bit of refactoring to avoid abstract over management of max_commits.
-rw-r--r--rts/STM.c76
1 files changed, 47 insertions, 29 deletions
diff --git a/rts/STM.c b/rts/STM.c
index dc0b0ebb78..8a99e34cf4 100644
--- a/rts/STM.c
+++ b/rts/STM.c
@@ -207,7 +207,7 @@ static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED,
StgClosure *expected) {
StgClosure *result;
TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
- result = s -> current_value;
+ result = RELAXED_LOAD(&s->current_value);
TRACE("%p : %s", trec, (result == expected) ? "success" : "failure");
return (result == expected);
}
@@ -228,7 +228,7 @@ static void lock_stm(StgTRecHeader *trec) {
static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
TRACE("%p : unlock_stm()", trec);
ASSERT(smp_locked == trec);
- smp_locked = 0;
+ SEQ_CST_STORE(&smp_locked, 0);
}
static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED,
@@ -236,7 +236,7 @@ static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED,
StgClosure *result;
TRACE("%p : lock_tvar(%p)", trec, s);
ASSERT(smp_locked == trec);
- result = s -> current_value;
+ result = RELAXED_LOAD(&s->current_value);
return result;
}
@@ -248,8 +248,8 @@ static void *unlock_tvar(Capability *cap,
TRACE("%p : unlock_tvar(%p, %p)", trec, s, c);
ASSERT(smp_locked == trec);
if (force_update) {
- s -> current_value = c;
- dirty_TVAR(cap,s);
+ RELAXED_STORE(&s->current_value, c);
+ dirty_TVAR(cap, s);
}
}
@@ -259,7 +259,7 @@ static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED,
StgClosure *result;
TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
ASSERT(smp_locked == trec);
- result = s -> current_value;
+ result = RELAXED_LOAD(&s->current_value);
TRACE("%p : %d", result ? "success" : "failure");
return (result == expected);
}
@@ -285,7 +285,7 @@ static StgClosure *lock_tvar(StgTRecHeader *trec,
TRACE("%p : lock_tvar(%p)", trec, s);
do {
do {
- result = s -> current_value;
+ result = RELAXED_LOAD(&s->current_value);
} while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info);
} while (cas((void *)&(s -> current_value),
(StgWord)result, (StgWord)trec) != (StgWord)result);
@@ -298,9 +298,9 @@ static void unlock_tvar(Capability *cap,
StgClosure *c,
StgBool force_update STG_UNUSED) {
TRACE("%p : unlock_tvar(%p, %p)", trec, s, c);
- ASSERT(s -> current_value == (StgClosure *)trec);
- s -> current_value = c;
- dirty_TVAR(cap,s);
+ ASSERT(RELAXED_LOAD(&s->current_value) == (StgClosure *)trec);
+ RELEASE_STORE(&s->current_value, c);
+ dirty_TVAR(cap, s);
}
static StgBool cond_lock_tvar(StgTRecHeader *trec,
@@ -514,8 +514,8 @@ static void build_watch_queue_entries_for_trec(Capability *cap,
StgTVarWatchQueue *fq;
s = e -> tvar;
TRACE("%p : adding tso=%p to watch queue for tvar=%p", trec, tso, s);
- ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
- NACQ_ASSERT(s -> current_value == e -> expected_value);
+ ACQ_ASSERT(RELAXED_LOAD(&s->current_value) == (StgClosure *)trec);
+ NACQ_ASSERT(RELAXED_LOAD(&s->current_value) == e -> expected_value);
fq = s -> first_watch_queue_entry;
q = alloc_stg_tvar_watch_queue(cap, (StgClosure*) tso);
q -> next_queue_entry = fq;
@@ -551,7 +551,7 @@ static void remove_watch_queue_entries_for_trec(Capability *cap,
trec,
q -> closure,
s);
- ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
+ ACQ_ASSERT(RELAXED_LOAD(&s->current_value) == (StgClosure *)trec);
nq = q -> next_queue_entry;
pq = q -> prev_queue_entry;
if (nq != END_STM_WATCH_QUEUE) {
@@ -709,7 +709,7 @@ static StgBool entry_is_read_only(TRecEntry *e) {
static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) {
StgClosure *c;
StgBool result;
- c = s -> current_value;
+ c = RELAXED_LOAD(&s->current_value);
result = (c == (StgClosure *) h);
return result;
}
@@ -782,13 +782,16 @@ static StgBool validate_and_acquire_ownership (Capability *cap,
ASSERT(config_use_read_phase);
IF_STM_FG_LOCKS({
TRACE("%p : will need to check %p", trec, s);
- if (s -> current_value != e -> expected_value) {
+ // The memory ordering here must ensure that we have two distinct
+ // reads to current_value, with the read from num_updates between
+ // them.
+ if (SEQ_CST_LOAD(&s->current_value) != e -> expected_value) {
TRACE("%p : doesn't match", trec);
result = false;
BREAK_FOR_EACH;
}
- e -> num_updates = s -> num_updates;
- if (s -> current_value != e -> expected_value) {
+ e->num_updates = SEQ_CST_LOAD(&s->num_updates);
+ if (SEQ_CST_LOAD(&s->current_value) != e -> expected_value) {
TRACE("%p : doesn't match (race)", trec);
result = false;
BREAK_FOR_EACH;
@@ -810,7 +813,7 @@ static StgBool validate_and_acquire_ownership (Capability *cap,
// check_read_only : check that we've seen an atomic snapshot of the
// non-updated TVars accessed by a trec. This checks that the last TRec to
// commit an update to the TVar is unchanged since the value was stashed in
-// validate_and_acquire_ownership. If no udpate is seen to any TVar than
+// validate_and_acquire_ownership. If no update is seen to any TVar then
// all of them contained their expected values at the start of the call to
// check_read_only.
//
@@ -829,11 +832,16 @@ static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
if (entry_is_read_only(e)) {
TRACE("%p : check_read_only for TVar %p, saw %ld", trec, s, e -> num_updates);
+ // We must first load current_value then num_updates; this is inverse of
+ // the order of the stores in stmCommitTransaction.
+ StgClosure *current_value = SEQ_CST_LOAD(&s->current_value);
+ StgInt num_updates = SEQ_CST_LOAD(&s->num_updates);
+
// Note we need both checks and in this order as the TVar could be
// locked by another transaction that is committing but has not yet
// incremented `num_updates` (See #7815).
- if (s -> current_value != e -> expected_value ||
- s -> num_updates != e -> num_updates) {
+ if (current_value != e->expected_value ||
+ num_updates != e->num_updates) {
TRACE("%p : mismatch", trec);
result = false;
BREAK_FOR_EACH;
@@ -869,17 +877,22 @@ void stmPreGCHook (Capability *cap) {
#define TOKEN_BATCH_SIZE 1024
+#if defined(THREADED_RTS)
+
static volatile StgInt64 max_commits = 0;
-#if defined(THREADED_RTS)
static volatile StgWord token_locked = false;
+static StgInt64 getMaxCommits(void) {
+ return RELAXED_LOAD(&max_commits);
+}
+
static void getTokenBatch(Capability *cap) {
while (cas((void *)&token_locked, false, true) == true) { /* nothing */ }
- max_commits += TOKEN_BATCH_SIZE;
- TRACE("%p : cap got token batch, max_commits=%" FMT_Int64, cap, max_commits);
+ NONATOMIC_ADD(&max_commits, TOKEN_BATCH_SIZE);
+ TRACE("%p : cap got token batch, max_commits=%" FMT_Int64, cap, RELAXED_LOAD(&max_commits));
cap -> transaction_tokens = TOKEN_BATCH_SIZE;
- token_locked = false;
+ RELEASE_STORE(&token_locked, false);
}
static void getToken(Capability *cap) {
@@ -889,6 +902,10 @@ static void getToken(Capability *cap) {
cap -> transaction_tokens --;
}
#else
+static StgInt64 getMaxCommits(void) {
+ return 0;
+}
+
static void getToken(Capability *cap STG_UNUSED) {
// Nothing
}
@@ -1044,7 +1061,7 @@ static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeade
/*......................................................................*/
StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
- StgInt64 max_commits_at_start = max_commits;
+ StgInt64 max_commits_at_start = getMaxCommits();
TRACE("%p : stmCommitTransaction()", trec);
ASSERT(trec != NO_TREC);
@@ -1070,7 +1087,7 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
result = check_read_only(trec);
TRACE("%p : read-check %s", trec, result ? "succeeded" : "failed");
- max_commits_at_end = max_commits;
+ max_commits_at_end = getMaxCommits();
max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) +
(n_capabilities * TOKEN_BATCH_SIZE));
if (((max_concurrent_commits >> 32) > 0) || shake()) {
@@ -1095,7 +1112,8 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
TRACE("%p : writing %p to %p, waking waiters", trec, e -> new_value, s);
unpark_waiters_on(cap,s);
IF_STM_FG_LOCKS({
- s -> num_updates ++;
+ // We have locked the TVar therefore nonatomic addition is sufficient
+ NONATOMIC_ADD(&s->num_updates, 1);
});
unlock_tvar(cap, trec, s, e -> new_value, true);
}
@@ -1251,12 +1269,12 @@ StgBool stmReWait(Capability *cap, StgTSO *tso) {
static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *tvar) {
StgClosure *result;
- result = tvar -> current_value;
+ result = ACQUIRE_LOAD(&tvar->current_value);
#if defined(STM_FG_LOCKS)
while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info) {
TRACE("%p : read_current_value(%p) saw %p", trec, tvar, result);
- result = tvar -> current_value;
+ result = ACQUIRE_LOAD(&tvar->current_value);
}
#endif