diff options
author | Ben Gamari <ben@smart-cactus.org> | 2023-03-24 20:41:58 -0400 |
---|---|---|
committer | Ben Gamari <ben@smart-cactus.org> | 2023-04-24 06:03:49 -0400 |
commit | 3c1e4a696def01b1a42d673228b7dd7be5f6cd7b (patch) | |
tree | a9c7a094704d77a61ed34442b0308b0948084b45 /rts | |
parent | 5946bdf1054038b676e6214cd8be0c20ec4d4fbc (diff) | |
download | haskell-3c1e4a696def01b1a42d673228b7dd7be5f6cd7b.tar.gz |
rts: Fix synchronization on thread blocking state
Diffstat (limited to 'rts')
-rw-r--r-- | rts/Exception.cmm | 2 | ||||
-rw-r--r-- | rts/PrimOps.cmm | 50 | ||||
-rw-r--r-- | rts/RaiseAsync.c | 4 | ||||
-rw-r--r-- | rts/STM.c | 2 | ||||
-rw-r--r-- | rts/Schedule.c | 3 | ||||
-rw-r--r-- | rts/StgMiscClosures.cmm | 2 | ||||
-rw-r--r-- | rts/Threads.c | 11 | ||||
-rw-r--r-- | rts/TraverseHeap.c | 12 | ||||
-rw-r--r-- | rts/include/rts/storage/TSO.h | 2 | ||||
-rw-r--r-- | rts/include/stg/SMP.h | 13 | ||||
-rw-r--r-- | rts/posix/Select.c | 4 | ||||
-rw-r--r-- | rts/sm/Compact.c | 15 | ||||
-rw-r--r-- | rts/sm/NonMovingMark.c | 15 | ||||
-rw-r--r-- | rts/sm/Scav.c | 19 | ||||
-rw-r--r-- | rts/win32/AsyncMIO.c | 2 |
15 files changed, 89 insertions, 67 deletions
diff --git a/rts/Exception.cmm b/rts/Exception.cmm index ff577aace6..96b52e3c3f 100644 --- a/rts/Exception.cmm +++ b/rts/Exception.cmm @@ -351,9 +351,9 @@ stg_killThreadzh (P_ target, P_ exception) if (msg == NULL) { return (); } else { - StgTSO_why_blocked(CurrentTSO) = BlockedOnMsgThrowTo; updateRemembSetPushPtr(StgTSO_block_info(CurrentTSO)); StgTSO_block_info(CurrentTSO) = msg; + %release StgTSO_why_blocked(CurrentTSO) = BlockedOnMsgThrowTo; // we must block, and unlock the message before returning jump stg_block_throwto (target, exception); } diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm index 51bb8981f7..d0aa154a12 100644 --- a/rts/PrimOps.cmm +++ b/rts/PrimOps.cmm @@ -1743,7 +1743,7 @@ stg_takeMVarzh ( P_ mvar /* :: MVar a */ ) } StgTSO__link(CurrentTSO) = q; StgTSO_block_info(CurrentTSO) = mvar; - StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16; + %release StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16; StgMVar_tail(mvar) = q; jump stg_block_takemvar(mvar); @@ -1910,7 +1910,7 @@ stg_putMVarzh ( P_ mvar, /* :: MVar a */ } StgTSO__link(CurrentTSO) = q; StgTSO_block_info(CurrentTSO) = mvar; - StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16; + %release StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16; StgMVar_tail(mvar) = q; jump stg_block_putmvar(mvar,val); @@ -2047,11 +2047,11 @@ loop: } } - ASSERT(StgTSO_block_info(tso) == mvar); // save why_blocked here, because waking up the thread destroys // this information W_ why_blocked; - why_blocked = TO_W_(StgTSO_why_blocked(tso)); + why_blocked = TO_W_(StgTSO_why_blocked(tso)); // TODO: Missing barrier + ASSERT(StgTSO_block_info(tso) == mvar); // actually perform the takeMVar W_ stack; @@ -2112,7 +2112,7 @@ stg_readMVarzh ( P_ mvar, /* :: MVar a */ ) StgTSO__link(CurrentTSO) = q; StgTSO_block_info(CurrentTSO) = mvar; - StgTSO_why_blocked(CurrentTSO) = BlockedOnMVarRead::I16; + %release StgTSO_why_blocked(CurrentTSO) = BlockedOnMVarRead::I16; StgMVar_head(mvar) = q; if (StgMVar_tail(mvar) == stg_END_TSO_QUEUE_closure) { @@ -2240,17 +2240,16 @@ stg_readIOPortzh ( P_ ioport /* :: IOPort a */ ) StgMVarTSOQueue_tso(q) = CurrentTSO; SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM); - // See Note [Heap memory barriers] - RELEASE_FENCE; StgMVar_head(ioport) = q; StgTSO__link(CurrentTSO) = q; StgTSO_block_info(CurrentTSO) = ioport; - StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16; + + // See Note [Heap memory barriers] + %release StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16; //Unlocks the closure as well jump stg_block_readmvar(ioport); - } //This way we can check of there has been a read already. @@ -2328,11 +2327,11 @@ loop: // next element in the waiting list here, as there can only ever // be one thread blocked on a port. - ASSERT(StgTSO_block_info(tso) == ioport); // save why_blocked here, because waking up the thread destroys // this information W_ why_blocked; - why_blocked = TO_W_(StgTSO_why_blocked(tso)); + why_blocked = TO_W_(StgTSO_why_blocked(tso)); // TODO Missing acquire + ASSERT(StgTSO_block_info(tso) == ioport); // actually perform the takeMVar W_ stack; @@ -2574,8 +2573,8 @@ stg_waitReadzh ( W_ fd ) #else ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16); - StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16; StgTSO_block_info(CurrentTSO) = fd; + %release StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16; // No locking - we're not going to use this interface in the // threaded RTS anyway. ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr"); @@ -2590,8 +2589,8 @@ stg_waitWritezh ( W_ fd ) #else ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16); - StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16; StgTSO_block_info(CurrentTSO) = fd; + %release StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16; // No locking - we're not going to use this interface in the // threaded RTS anyway. ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr"); @@ -2613,7 +2612,6 @@ stg_delayzh ( W_ us_delay ) #else ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16); - StgTSO_why_blocked(CurrentTSO) = BlockedOnDelay::I16; #if defined(mingw32_HOST_OS) @@ -2630,12 +2628,13 @@ stg_delayzh ( W_ us_delay ) * simplifies matters, so change the status to OnDoProc put the * delayed thread on the blocked_queue. */ - StgTSO_why_blocked(CurrentTSO) = BlockedOnDoProc::I16; + %release StgTSO_why_blocked(CurrentTSO) = BlockedOnDoProc::I16; ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr"); jump stg_block_async_void(); #else + %relaxed StgTSO_why_blocked(CurrentTSO) = BlockedOnDelay::I16; (target) = ccall getDelayTarget(us_delay); StgTSO_block_info(CurrentTSO) = target; @@ -2657,9 +2656,6 @@ stg_asyncReadzh ( W_ fd, W_ is_sock, W_ len, W_ buf ) ccall barf("asyncRead# on threaded RTS") never returns; #else - ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16); - StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16; - /* could probably allocate this on the heap instead */ ("ptr" ares) = ccall stgMallocBytes(SIZEOF_StgAsyncIOResult, "stg_asyncReadzh"); @@ -2668,6 +2664,10 @@ stg_asyncReadzh ( W_ fd, W_ is_sock, W_ len, W_ buf ) StgAsyncIOResult_len(ares) = 0; StgAsyncIOResult_errCode(ares) = 0; StgTSO_block_info(CurrentTSO) = ares; + + ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16); + %release StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16; + ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr"); jump stg_block_async(); #endif @@ -2682,9 +2682,6 @@ stg_asyncWritezh ( W_ fd, W_ is_sock, W_ len, W_ buf ) ccall barf("asyncWrite# on threaded RTS") never returns; #else - ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16); - StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16; - ("ptr" ares) = ccall stgMallocBytes(SIZEOF_StgAsyncIOResult, "stg_asyncWritezh"); (reqID) = ccall addIORequest(fd, 1/*TRUE*/,is_sock,len,buf "ptr"); @@ -2693,6 +2690,10 @@ stg_asyncWritezh ( W_ fd, W_ is_sock, W_ len, W_ buf ) StgAsyncIOResult_len(ares) = 0; StgAsyncIOResult_errCode(ares) = 0; StgTSO_block_info(CurrentTSO) = ares; + + ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16); + %release StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16; + ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr"); jump stg_block_async(); #endif @@ -2707,9 +2708,6 @@ stg_asyncDoProczh ( W_ proc, W_ param ) ccall barf("asyncDoProc# on threaded RTS") never returns; #else - ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16); - StgTSO_why_blocked(CurrentTSO) = BlockedOnDoProc::I16; - /* could probably allocate this on the heap instead */ ("ptr" ares) = ccall stgMallocBytes(SIZEOF_StgAsyncIOResult, "stg_asyncDoProczh"); @@ -2718,6 +2716,10 @@ stg_asyncDoProczh ( W_ proc, W_ param ) StgAsyncIOResult_len(ares) = 0; StgAsyncIOResult_errCode(ares) = 0; StgTSO_block_info(CurrentTSO) = ares; + + ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16); + %release StgTSO_why_blocked(CurrentTSO) = BlockedOnDoProc::I16; + ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr"); jump stg_block_async(); #endif diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c index f727383082..0209e334f2 100644 --- a/rts/RaiseAsync.c +++ b/rts/RaiseAsync.c @@ -266,7 +266,7 @@ check_target: return THROWTO_BLOCKED; } - status = target->why_blocked; + status = ACQUIRE_LOAD(&target->why_blocked); switch (status) { case NotBlocked: @@ -728,7 +728,7 @@ removeFromQueues(Capability *cap, StgTSO *tso) } done: - tso->why_blocked = NotBlocked; + RELAXED_STORE(&tso->why_blocked, NotBlocked); appendToRunQueue(cap, tso); } @@ -340,8 +340,8 @@ static StgBool cond_lock_tvar(Capability *cap, static void park_tso(StgTSO *tso) { ASSERT(tso -> why_blocked == NotBlocked); - tso -> why_blocked = BlockedOnSTM; tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE; + RELEASE_STORE(&tso -> why_blocked, BlockedOnSTM); TRACE("park_tso on tso=%p", tso); } diff --git a/rts/Schedule.c b/rts/Schedule.c index 66043c26ab..d8c71203e8 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -512,7 +512,8 @@ run_thread: #endif if (ret == ThreadBlocked) { - if (t->why_blocked == BlockedOnBlackHole) { + uint16_t why_blocked = ACQUIRE_LOAD(&t->why_blocked); + if (why_blocked == BlockedOnBlackHole) { StgTSO *owner = blackHoleOwner(t->block_info.bh->bh); traceEventStopThread(cap, t, t->why_blocked + 6, owner != NULL ? owner->id : 0); diff --git a/rts/StgMiscClosures.cmm b/rts/StgMiscClosures.cmm index 222a12e9c6..efe71bb20b 100644 --- a/rts/StgMiscClosures.cmm +++ b/rts/StgMiscClosures.cmm @@ -605,8 +605,8 @@ retry: if (r == 0) { goto retry; } else { - StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16; StgTSO_block_info(CurrentTSO) = msg; + %release StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16; jump stg_block_blackhole(node); } } diff --git a/rts/Threads.c b/rts/Threads.c index b36b4ddb81..29a9525574 100644 --- a/rts/Threads.c +++ b/rts/Threads.c @@ -94,8 +94,8 @@ createThread(Capability *cap, W_ size) // Always start with the compiled code evaluator tso->what_next = ThreadRunGHC; - tso->why_blocked = NotBlocked; tso->block_info.closure = (StgClosure *)END_TSO_QUEUE; + tso->why_blocked = NotBlocked; tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE; tso->bq = (StgBlockingQueue *)END_TSO_QUEUE; tso->flags = 0; @@ -285,7 +285,7 @@ tryWakeupThread (Capability *cap, StgTSO *tso) } #endif - switch (tso->why_blocked) + switch (ACQUIRE_LOAD(&tso->why_blocked)) { case BlockedOnMVar: case BlockedOnMVarRead: @@ -825,10 +825,11 @@ loop: } } - ASSERT(tso->block_info.closure == (StgClosure*)mvar); // save why_blocked here, because waking up the thread destroys // this information - StgWord why_blocked = RELAXED_LOAD(&tso->why_blocked); + StgWord why_blocked = ACQUIRE_LOAD(&tso->why_blocked); + ASSERT(why_blocked == BlockedOnMVarRead); + ASSERT(tso->block_info.closure == (StgClosure*)mvar); // actually perform the takeMVar StgStack* stack = tso->stackobj; @@ -902,7 +903,7 @@ StgMutArrPtrs *listThreads(Capability *cap) void printThreadBlockage(StgTSO *tso) { - switch (tso->why_blocked) { + switch (ACQUIRE_LOAD(&tso->why_blocked)) { #if defined(mingw32_HOST_OS) case BlockedOnDoProc: debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID); diff --git a/rts/TraverseHeap.c b/rts/TraverseHeap.c index 027f99ebe0..5224e66aa9 100644 --- a/rts/TraverseHeap.c +++ b/rts/TraverseHeap.c @@ -1239,12 +1239,14 @@ inner_loop: traversePushClosure(ts, (StgClosure *) tso->blocked_exceptions, c, sep, child_data); traversePushClosure(ts, (StgClosure *) tso->bq, c, sep, child_data); traversePushClosure(ts, (StgClosure *) tso->trec, c, sep, child_data); - if ( tso->why_blocked == BlockedOnMVar - || tso->why_blocked == BlockedOnMVarRead - || tso->why_blocked == BlockedOnBlackHole - || tso->why_blocked == BlockedOnMsgThrowTo - ) { + switch (ACQUIRE_LOAD(&tso->why_blocked)) { + case BlockedOnMVar: + case BlockedOnMVarRead: + case BlockedOnBlackHole: + case BlockedOnMsgThrowTo: traversePushClosure(ts, tso->block_info.closure, c, sep, child_data); + break; + default: } goto loop; } diff --git a/rts/include/rts/storage/TSO.h b/rts/include/rts/storage/TSO.h index 4ca19853d7..7c171fbdbd 100644 --- a/rts/include/rts/storage/TSO.h +++ b/rts/include/rts/storage/TSO.h @@ -128,7 +128,7 @@ typedef struct StgTSO_ { StgWord16 what_next; // Values defined in Constants.h StgWord16 why_blocked; // Values defined in Constants.h StgWord32 flags; // Values defined in Constants.h - StgTSOBlockInfo block_info; + StgTSOBlockInfo block_info; // Barrier provided by why_blocked StgThreadID id; StgWord32 saved_errno; StgWord32 dirty; /* non-zero => dirty */ diff --git a/rts/include/stg/SMP.h b/rts/include/stg/SMP.h index 6cdbb15d14..c8facda14a 100644 --- a/rts/include/stg/SMP.h +++ b/rts/include/stg/SMP.h @@ -177,6 +177,7 @@ EXTERN_INLINE void load_load_barrier(void); * - StgMutArrPtrs: payload * - StgSmallMutArrPtrs: payload * - StgThunk although this is a somewhat special case; see below + * - StgTSO: block_info * * Writing to a mutable pointer field must be done via a release-store. * Reading from such a field is done via an acquire-load. @@ -211,8 +212,8 @@ EXTERN_INLINE void load_load_barrier(void); * particular, they have the unique property of being updatable, transforming * from a thunk to an indirection. This transformation requires its own * synchronization protocol. In particular, we must ensure that a reader - * examining a thunk being updated can see the indirectee. Consequently, a - * thunk update (see rts/Updates.h) does the following: + * examining a thunk being updated by another core can see the indirectee. + * Consequently, a thunk update (see rts/Updates.h) does the following: * * 1. Use a relaxed-store to place the new indirectee into the thunk's * indirectee field @@ -268,6 +269,14 @@ EXTERN_INLINE void load_load_barrier(void); * the capability-local mut_list. Consequently this does not require any memory * barrier. * + * Barriers in thread blocking + * --------------------------- + * When a thread blocks (e.g. on an MVar) it will typically allocate a heap object + * to record its blocked-ness (e.g. a StgMVarTSOQueue), expose this via + * StgTSO.block_info, and update StgTSO.why_blocked to record the reason for + * its blocking. The visibility of the block_info is guaranteed by the ordering + * of the why_blocked update. + * * Barriers in thread migration * ---------------------------- * When a thread is migrated from one capability to another we must take care diff --git a/rts/posix/Select.c b/rts/posix/Select.c index 89a46fd763..848739a904 100644 --- a/rts/posix/Select.c +++ b/rts/posix/Select.c @@ -105,7 +105,7 @@ static bool wakeUpSleepingThreads (Capability *cap, LowResTime now) break; } iomgr->sleeping_queue = tso->_link; - tso->why_blocked = NotBlocked; + RELAXED_STORE(&tso->why_blocked, NotBlocked); tso->_link = END_TSO_QUEUE; IF_DEBUG(scheduler, debugBelch("Waking up sleeping thread %" FMT_StgThreadID "\n", tso->id)); @@ -268,7 +268,7 @@ awaitEvent(Capability *cap, bool wait) * So the (int) cast should be removed across the code base once * GHC requires a version of FreeBSD that has that change in it. */ - switch (tso->why_blocked) { + switch (ACQUIRE_LOAD(&tso->why_blocked)) { case BlockedOnRead: { int fd = tso->block_info.fd; diff --git a/rts/sm/Compact.c b/rts/sm/Compact.c index f6e65ecc9a..c1b445ac88 100644 --- a/rts/sm/Compact.c +++ b/rts/sm/Compact.c @@ -463,13 +463,16 @@ thread_TSO (StgTSO *tso) thread_(&tso->_link); thread_(&tso->global_link); - if ( tso->why_blocked == BlockedOnMVar - || tso->why_blocked == BlockedOnMVarRead - || tso->why_blocked == BlockedOnBlackHole - || tso->why_blocked == BlockedOnMsgThrowTo - || tso->why_blocked == NotBlocked - ) { + switch (ACQUIRE_LOAD(&tso->why_blocked)) { + case BlockedOnMVar: + case BlockedOnMVarRead: + case BlockedOnBlackHole: + case BlockedOnMsgThrowTo: + case NotBlocked: thread_(&tso->block_info.closure); + break; + default: + break; } thread_(&tso->blocked_exceptions); thread_(&tso->bq); diff --git a/rts/sm/NonMovingMark.c b/rts/sm/NonMovingMark.c index b0931f5767..60ada48193 100644 --- a/rts/sm/NonMovingMark.c +++ b/rts/sm/NonMovingMark.c @@ -1051,13 +1051,16 @@ trace_tso (MarkQueue *queue, StgTSO *tso) if (tso->label != NULL) { markQueuePushClosure_(queue, (StgClosure *) tso->label); } - if ( tso->why_blocked == BlockedOnMVar - || tso->why_blocked == BlockedOnMVarRead - || tso->why_blocked == BlockedOnBlackHole - || tso->why_blocked == BlockedOnMsgThrowTo - || tso->why_blocked == NotBlocked - ) { + switch (ACQUIRE_LOAD(&tso->why_blocked)) { + case BlockedOnMVar: + case BlockedOnMVarRead: + case BlockedOnBlackHole: + case BlockedOnMsgThrowTo: + case NotBlocked: markQueuePushClosure_(queue, tso->block_info.closure); + break; + default: + break; } } diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c index 8debec6a66..a09130a091 100644 --- a/rts/sm/Scav.c +++ b/rts/sm/Scav.c @@ -137,23 +137,24 @@ scavengeTSO (StgTSO *tso) evacuate((StgClosure **)&tso->label); } - if ( tso->why_blocked == BlockedOnMVar - || tso->why_blocked == BlockedOnMVarRead - || tso->why_blocked == BlockedOnBlackHole - || tso->why_blocked == BlockedOnMsgThrowTo - || tso->why_blocked == NotBlocked - ) { + switch (ACQUIRE_LOAD(&tso->why_blocked)) { + case BlockedOnMVar: + case BlockedOnMVarRead: + case BlockedOnBlackHole: + case BlockedOnMsgThrowTo: + case NotBlocked: evacuate(&tso->block_info.closure); - } + break; + default: #if defined(THREADED_RTS) // in the THREADED_RTS, block_info.closure must always point to a // valid closure, because we assume this in throwTo(). In the // non-threaded RTS it might be a FD (for // BlockedOnRead/BlockedOnWrite) or a time value (BlockedOnDelay) - else { tso->block_info.closure = (StgClosure *)END_TSO_QUEUE; - } #endif + break; + } tso->dirty = gct->failed_to_evac; diff --git a/rts/win32/AsyncMIO.c b/rts/win32/AsyncMIO.c index 00d1638d63..3539c36304 100644 --- a/rts/win32/AsyncMIO.c +++ b/rts/win32/AsyncMIO.c @@ -294,7 +294,7 @@ start: for(tso = iomgr->blocked_queue_hd; tso != END_TSO_QUEUE; tso = tso->_link) { - switch(tso->why_blocked) { + switch(ACQUIRE_LOAD(&tso->why_blocked)) { case BlockedOnRead: case BlockedOnWrite: case BlockedOnDoProc: |