author    | Ben Gamari <ben@smart-cactus.org> | 2019-09-27 20:37:41 +0000 |
----------|-----------------------------------|---------------------------|
committer | Ben Gamari <ben@smart-cactus.org> | 2020-10-24 21:00:37 -0400 |
commit    | d079b9435382882b0b069ea40bcd287db18082d3 (patch) | |
tree      | 5d956d0041fea0e1f058e00be6a8ca84b7d38adb | |
parent    | f88710185acc0e02b334b96004f4b8fae38c5eb9 (diff) | |
download  | haskell-d079b9435382882b0b069ea40bcd287db18082d3.tar.gz | |
rts: Avoid data races in message handling
-rw-r--r-- | rts/Capability.h |  8 |
-rw-r--r-- | rts/Messages.c   | 36 |
-rw-r--r-- | rts/RaiseAsync.c |  9 |
3 files changed, 26 insertions, 27 deletions
```diff
diff --git a/rts/Capability.h b/rts/Capability.h
index 7eaffa0225..aab2d84ae7 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -482,8 +482,12 @@ contextSwitchCapability (Capability *cap)
 
 INLINE_HEADER bool emptyInbox(Capability *cap)
 {
-    return (cap->inbox == (Message*)END_TSO_QUEUE &&
-            cap->putMVars == NULL);
+    // This may race with writes to putMVars and inbox but this harmless for the
+    // intended uses of this function.
+    TSAN_ANNOTATE_BENIGN_RACE(&cap->putMVars, "emptyInbox(cap->putMVars)");
+    TSAN_ANNOTATE_BENIGN_RACE(&cap->inbox, "emptyInbox(cap->inbox)");
+    return (RELAXED_LOAD(&cap->inbox) == (Message*)END_TSO_QUEUE &&
+            RELAXED_LOAD(&cap->putMVars) == NULL);
 }
 
 #endif
diff --git a/rts/Messages.c b/rts/Messages.c
index 2f80370845..71c2bfdb38 100644
--- a/rts/Messages.c
+++ b/rts/Messages.c
@@ -69,7 +69,8 @@ executeMessage (Capability *cap, Message *m)
 loop:
     write_barrier(); // allow m->header to be modified by another thread
-    i = m->header.info;
+    // TODO: Is the above barrier actually needed? Why is it a write barrier? Something is fishy here.
+    i = ACQUIRE_LOAD(&m->header.info);
 
     if (i == &stg_MSG_TRY_WAKEUP_info)
     {
         StgTSO *tso = ((MessageWakeup *)m)->tso;
@@ -127,7 +128,7 @@ loop:
     else if (i == &stg_WHITEHOLE_info)
     {
 #if defined(PROF_SPIN)
-        ++whitehole_executeMessage_spin;
+        NONATOMIC_ADD(&whitehole_executeMessage_spin, 1);
 #endif
         goto loop;
     }
@@ -169,8 +170,7 @@ uint32_t messageBlackHole(Capability *cap, MessageBlackHole *msg)
     debugTraceCap(DEBUG_sched, cap, "message: thread %d blocking on "
                   "blackhole %p", (W_)msg->tso->id, msg->bh);
 
-    info = bh->header.info;
-    load_load_barrier(); // See Note [Heap memory barriers] in SMP.h
+    info = ACQUIRE_LOAD(&bh->header.info);
 
     // If we got this message in our inbox, it might be that the
     // BLACKHOLE has already been updated, and GC has shorted out the
@@ -190,11 +190,8 @@ uint32_t messageBlackHole(Capability *cap, MessageBlackHole *msg)
     // The blackhole must indirect to a TSO, a BLOCKING_QUEUE, an IND,
     // or a value.
 loop:
-    // NB. VOLATILE_LOAD(), because otherwise gcc hoists the load
-    // and turns this into an infinite loop.
-    p = UNTAG_CLOSURE((StgClosure*)VOLATILE_LOAD(&((StgInd*)bh)->indirectee));
-    info = p->header.info;
-    load_load_barrier(); // See Note [Heap memory barriers] in SMP.h
+    p = UNTAG_CLOSURE(ACQUIRE_LOAD(&((StgInd*)bh)->indirectee));
+    info = RELAXED_LOAD(&p->header.info);
 
     if (info == &stg_IND_info)
     {
@@ -240,9 +237,8 @@ loop:
         // We are about to make the newly-constructed message visible to other cores;
         // a barrier is necessary to ensure that all writes are visible.
         // See Note [Heap memory barriers] in SMP.h.
-        write_barrier();
         dirty_TSO(cap, owner); // we will modify owner->bq
-        owner->bq = bq;
+        RELEASE_STORE(&owner->bq, bq);
 
         // If the owner of the blackhole is currently runnable, then
         // bump it to the front of the run queue. This gives the
@@ -258,11 +254,11 @@ loop:
         }
 
         // point to the BLOCKING_QUEUE from the BLACKHOLE
-        write_barrier(); // make the BQ visible, see Note [Heap memory barriers].
+        // RELEASE to make the BQ visible, see Note [Heap memory barriers].
+        RELEASE_STORE(&((StgInd*)bh)->indirectee, (StgClosure *)bq);
         IF_NONMOVING_WRITE_BARRIER_ENABLED {
             updateRemembSetPushClosure(cap, (StgClosure*)p);
         }
-        ((StgInd*)bh)->indirectee = (StgClosure *)bq;
         recordClosureMutated(cap,bh); // bh was mutated
 
         debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d",
@@ -295,14 +291,14 @@ loop:
             // makes it into the update remembered set
            updateRemembSetPushClosure(cap, (StgClosure*)bq->queue);
         }
-        msg->link = bq->queue;
+        RELAXED_STORE(&msg->link, bq->queue);
         bq->queue = msg;
         // No barrier is necessary here: we are only exposing the
         // closure to the GC. See Note [Heap memory barriers] in SMP.h.
         recordClosureMutated(cap,(StgClosure*)msg);
 
         if (info == &stg_BLOCKING_QUEUE_CLEAN_info) {
-            bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
+            RELAXED_STORE(&bq->header.info, &stg_BLOCKING_QUEUE_DIRTY_info);
             // No barrier is necessary here: we are only exposing the
             // closure to the GC. See Note [Heap memory barriers] in SMP.h.
             recordClosureMutated(cap,(StgClosure*)bq);
@@ -333,7 +329,7 @@ StgTSO * blackHoleOwner (StgClosure *bh)
     const StgInfoTable *info;
     StgClosure *p;
 
-    info = bh->header.info;
+    info = RELAXED_LOAD(&bh->header.info);
 
     if (info != &stg_BLACKHOLE_info &&
         info != &stg_CAF_BLACKHOLE_info &&
@@ -345,10 +341,8 @@ StgTSO * blackHoleOwner (StgClosure *bh)
     // The blackhole must indirect to a TSO, a BLOCKING_QUEUE, an IND,
     // or a value.
 loop:
-    // NB. VOLATILE_LOAD(), because otherwise gcc hoists the load
-    // and turns this into an infinite loop.
-    p = UNTAG_CLOSURE((StgClosure*)VOLATILE_LOAD(&((StgInd*)bh)->indirectee));
-    info = p->header.info;
+    p = UNTAG_CLOSURE(ACQUIRE_LOAD(&((StgInd*)bh)->indirectee));
+    info = RELAXED_LOAD(&p->header.info);
 
     if (info == &stg_IND_info) goto loop;
 
@@ -360,7 +354,7 @@ loop:
         info == &stg_BLOCKING_QUEUE_DIRTY_info)
     {
         StgBlockingQueue *bq = (StgBlockingQueue *)p;
-        return bq->owner;
+        return RELAXED_LOAD(&bq->owner);
     }
 
     return NULL; // not blocked
diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c
index 719c05435d..893d96ec60 100644
--- a/rts/RaiseAsync.c
+++ b/rts/RaiseAsync.c
@@ -232,7 +232,7 @@ uint32_t
 throwToMsg (Capability *cap, MessageThrowTo *msg)
 {
     StgWord status;
-    StgTSO *target = msg->target;
+    StgTSO *target = ACQUIRE_LOAD(&msg->target);
     Capability *target_cap;
 
     goto check_target;
@@ -245,8 +245,9 @@ check_target:
     ASSERT(target != END_TSO_QUEUE);
 
     // Thread already dead?
-    if (target->what_next == ThreadComplete
-        || target->what_next == ThreadKilled) {
+    StgWord16 what_next = RELAXED_LOAD(&target->what_next);
+    if (what_next == ThreadComplete
+        || what_next == ThreadKilled) {
         return THROWTO_SUCCESS;
     }
 
@@ -988,7 +989,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
         sp[0] = (W_)raise;
         sp[-1] = (W_)&stg_enter_info;
         stack->sp = sp-1;
-        tso->what_next = ThreadRunGHC;
+        RELAXED_STORE(&tso->what_next, ThreadRunGHC);
         goto done;
     }
```
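For reference, the memory-ordering macros this patch introduces (`RELAXED_LOAD`, `ACQUIRE_LOAD`, `RELAXED_STORE`, `RELEASE_STORE`, `NONATOMIC_ADD`) are thin wrappers defined by the RTS itself. Below is a minimal sketch of what they expand to, assuming the GCC/Clang `__atomic` builtins; the authoritative definitions live in the RTS headers (`includes/stg/SMP.h`) and differ in detail (non-threaded fallbacks, TSAN instrumentation).

```c
/* Sketch only: simplified relative to the real RTS headers. */
#define RELAXED_LOAD(ptr)        __atomic_load_n(ptr, __ATOMIC_RELAXED)
#define ACQUIRE_LOAD(ptr)        __atomic_load_n(ptr, __ATOMIC_ACQUIRE)
#define RELAXED_STORE(ptr, val)  __atomic_store_n(ptr, val, __ATOMIC_RELAXED)
#define RELEASE_STORE(ptr, val)  __atomic_store_n(ptr, val, __ATOMIC_RELEASE)

/* A knowingly non-atomic increment of a counter whose races are benign,
 * e.g. the PROF_SPIN statistics counter bumped in executeMessage. */
#define NONATOMIC_ADD(ptr, val)  RELAXED_STORE(ptr, RELAXED_LOAD(ptr) + (val))
```

The key pairing in the patch is release/acquire on the blackhole's `indirectee`: `messageBlackHole` publishes the new `BLOCKING_QUEUE` with `RELEASE_STORE(&((StgInd*)bh)->indirectee, ...)`, and readers (the retry loop in `messageBlackHole` and `blackHoleOwner`) observe it with `ACQUIRE_LOAD`, replacing the previous explicit `write_barrier()`/`load_load_barrier()` pairs.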