summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Gamari <ben@smart-cactus.org>2019-09-27 20:37:41 +0000
committerBen Gamari <ben@smart-cactus.org>2020-10-24 21:00:37 -0400
commitd079b9435382882b0b069ea40bcd287db18082d3 (patch)
tree5d956d0041fea0e1f058e00be6a8ca84b7d38adb
parentf88710185acc0e02b334b96004f4b8fae38c5eb9 (diff)
downloadhaskell-d079b9435382882b0b069ea40bcd287db18082d3.tar.gz
rts: Avoid data races in message handling
-rw-r--r--rts/Capability.h8
-rw-r--r--rts/Messages.c36
-rw-r--r--rts/RaiseAsync.c9
3 files changed, 26 insertions, 27 deletions
diff --git a/rts/Capability.h b/rts/Capability.h
index 7eaffa0225..aab2d84ae7 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -482,8 +482,12 @@ contextSwitchCapability (Capability *cap)
INLINE_HEADER bool emptyInbox(Capability *cap)
{
- return (cap->inbox == (Message*)END_TSO_QUEUE &&
- cap->putMVars == NULL);
+ // This may race with writes to putMVars and inbox but this harmless for the
+ // intended uses of this function.
+ TSAN_ANNOTATE_BENIGN_RACE(&cap->putMVars, "emptyInbox(cap->putMVars)");
+ TSAN_ANNOTATE_BENIGN_RACE(&cap->inbox, "emptyInbox(cap->inbox)");
+ return (RELAXED_LOAD(&cap->inbox) == (Message*)END_TSO_QUEUE &&
+ RELAXED_LOAD(&cap->putMVars) == NULL);
}
#endif
diff --git a/rts/Messages.c b/rts/Messages.c
index 2f80370845..71c2bfdb38 100644
--- a/rts/Messages.c
+++ b/rts/Messages.c
@@ -69,7 +69,8 @@ executeMessage (Capability *cap, Message *m)
loop:
write_barrier(); // allow m->header to be modified by another thread
- i = m->header.info;
+ // TODO: Is the above barrier actually needed? Why is it a write barrier? Something is fishy here.
+ i = ACQUIRE_LOAD(&m->header.info);
if (i == &stg_MSG_TRY_WAKEUP_info)
{
StgTSO *tso = ((MessageWakeup *)m)->tso;
@@ -127,7 +128,7 @@ loop:
else if (i == &stg_WHITEHOLE_info)
{
#if defined(PROF_SPIN)
- ++whitehole_executeMessage_spin;
+ NONATOMIC_ADD(&whitehole_executeMessage_spin, 1);
#endif
goto loop;
}
@@ -169,8 +170,7 @@ uint32_t messageBlackHole(Capability *cap, MessageBlackHole *msg)
debugTraceCap(DEBUG_sched, cap, "message: thread %d blocking on "
"blackhole %p", (W_)msg->tso->id, msg->bh);
- info = bh->header.info;
- load_load_barrier(); // See Note [Heap memory barriers] in SMP.h
+ info = ACQUIRE_LOAD(&bh->header.info);
// If we got this message in our inbox, it might be that the
// BLACKHOLE has already been updated, and GC has shorted out the
@@ -190,11 +190,8 @@ uint32_t messageBlackHole(Capability *cap, MessageBlackHole *msg)
// The blackhole must indirect to a TSO, a BLOCKING_QUEUE, an IND,
// or a value.
loop:
- // NB. VOLATILE_LOAD(), because otherwise gcc hoists the load
- // and turns this into an infinite loop.
- p = UNTAG_CLOSURE((StgClosure*)VOLATILE_LOAD(&((StgInd*)bh)->indirectee));
- info = p->header.info;
- load_load_barrier(); // See Note [Heap memory barriers] in SMP.h
+ p = UNTAG_CLOSURE(ACQUIRE_LOAD(&((StgInd*)bh)->indirectee));
+ info = RELAXED_LOAD(&p->header.info);
if (info == &stg_IND_info)
{
@@ -240,9 +237,8 @@ loop:
// We are about to make the newly-constructed message visible to other cores;
// a barrier is necessary to ensure that all writes are visible.
// See Note [Heap memory barriers] in SMP.h.
- write_barrier();
dirty_TSO(cap, owner); // we will modify owner->bq
- owner->bq = bq;
+ RELEASE_STORE(&owner->bq, bq);
// If the owner of the blackhole is currently runnable, then
// bump it to the front of the run queue. This gives the
@@ -258,11 +254,11 @@ loop:
}
// point to the BLOCKING_QUEUE from the BLACKHOLE
- write_barrier(); // make the BQ visible, see Note [Heap memory barriers].
+ // RELEASE to make the BQ visible, see Note [Heap memory barriers].
+ RELEASE_STORE(&((StgInd*)bh)->indirectee, (StgClosure *)bq);
IF_NONMOVING_WRITE_BARRIER_ENABLED {
updateRemembSetPushClosure(cap, (StgClosure*)p);
}
- ((StgInd*)bh)->indirectee = (StgClosure *)bq;
recordClosureMutated(cap,bh); // bh was mutated
debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d",
@@ -295,14 +291,14 @@ loop:
// makes it into the update remembered set
updateRemembSetPushClosure(cap, (StgClosure*)bq->queue);
}
- msg->link = bq->queue;
+ RELAXED_STORE(&msg->link, bq->queue);
bq->queue = msg;
// No barrier is necessary here: we are only exposing the
// closure to the GC. See Note [Heap memory barriers] in SMP.h.
recordClosureMutated(cap,(StgClosure*)msg);
if (info == &stg_BLOCKING_QUEUE_CLEAN_info) {
- bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
+ RELAXED_STORE(&bq->header.info, &stg_BLOCKING_QUEUE_DIRTY_info);
// No barrier is necessary here: we are only exposing the
// closure to the GC. See Note [Heap memory barriers] in SMP.h.
recordClosureMutated(cap,(StgClosure*)bq);
@@ -333,7 +329,7 @@ StgTSO * blackHoleOwner (StgClosure *bh)
const StgInfoTable *info;
StgClosure *p;
- info = bh->header.info;
+ info = RELAXED_LOAD(&bh->header.info);
if (info != &stg_BLACKHOLE_info &&
info != &stg_CAF_BLACKHOLE_info &&
@@ -345,10 +341,8 @@ StgTSO * blackHoleOwner (StgClosure *bh)
// The blackhole must indirect to a TSO, a BLOCKING_QUEUE, an IND,
// or a value.
loop:
- // NB. VOLATILE_LOAD(), because otherwise gcc hoists the load
- // and turns this into an infinite loop.
- p = UNTAG_CLOSURE((StgClosure*)VOLATILE_LOAD(&((StgInd*)bh)->indirectee));
- info = p->header.info;
+ p = UNTAG_CLOSURE(ACQUIRE_LOAD(&((StgInd*)bh)->indirectee));
+ info = RELAXED_LOAD(&p->header.info);
if (info == &stg_IND_info) goto loop;
@@ -360,7 +354,7 @@ loop:
info == &stg_BLOCKING_QUEUE_DIRTY_info)
{
StgBlockingQueue *bq = (StgBlockingQueue *)p;
- return bq->owner;
+ return RELAXED_LOAD(&bq->owner);
}
return NULL; // not blocked
diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c
index 719c05435d..893d96ec60 100644
--- a/rts/RaiseAsync.c
+++ b/rts/RaiseAsync.c
@@ -232,7 +232,7 @@ uint32_t
throwToMsg (Capability *cap, MessageThrowTo *msg)
{
StgWord status;
- StgTSO *target = msg->target;
+ StgTSO *target = ACQUIRE_LOAD(&msg->target);
Capability *target_cap;
goto check_target;
@@ -245,8 +245,9 @@ check_target:
ASSERT(target != END_TSO_QUEUE);
// Thread already dead?
- if (target->what_next == ThreadComplete
- || target->what_next == ThreadKilled) {
+ StgWord16 what_next = RELAXED_LOAD(&target->what_next);
+ if (what_next == ThreadComplete
+ || what_next == ThreadKilled) {
return THROWTO_SUCCESS;
}
@@ -988,7 +989,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
sp[0] = (W_)raise;
sp[-1] = (W_)&stg_enter_info;
stack->sp = sp-1;
- tso->what_next = ThreadRunGHC;
+ RELAXED_STORE(&tso->what_next, ThreadRunGHC);
goto done;
}