diff options
author | Duncan Coutts <duncan@well-typed.com> | 2020-12-29 16:39:36 +0000 |
---|---|---|
committer | Marge Bot <ben+marge-bot@smart-cactus.org> | 2022-11-22 02:06:17 -0500 |
commit | 5cf709c541a46a17ef2e36d589ba13d949d058e1 (patch) | |
tree | dbd2e595dbd3af2a3f904b084de3aa74e2099a8e | |
parent | 8901285ef64ab4aa16aa34096a1e2be42d4be246 (diff) | |
download | haskell-5cf709c541a46a17ef2e36d589ba13d949d058e1.tar.gz |
Move APPEND_TO_BLOCKED_QUEUE from cmm to C
The I/O and delay blocking primitives for the non-threaded way
currently access the blocked_queue and sleeping_queue directly.
We want to move where those queues are to make their ownership clearer:
to have them clearly belong to the I/O manager impls rather than to the
scheduler. Ultimately we will want to change their representation too.
It's inconvenient to do that if these queues are accessed directly from
cmm code. So as a first step, replace the APPEND_TO_BLOCKED_QUEUE with a
C version appendToIOBlockedQueue(), and replace the open-coded
sleeping_queue insertion with insertIntoSleepingQueue().
-rw-r--r-- | rts/IOManager.c | 31 | ||||
-rw-r--r-- | rts/IOManager.h | 20 | ||||
-rw-r--r-- | rts/PrimOps.cmm | 42 | ||||
-rw-r--r-- | rts/Schedule.h | 16 |
4 files changed, 58 insertions, 51 deletions
diff --git a/rts/IOManager.c b/rts/IOManager.c index 5a6347c537..5a3b0d0698 100644 --- a/rts/IOManager.c +++ b/rts/IOManager.c @@ -19,6 +19,7 @@ #include "rts/IOInterface.h" // exported #include "IOManager.h" // RTS internal #include "Capability.h" +#include "Schedule.h" #include "RtsFlags.h" #include "RtsUtils.h" @@ -172,3 +173,33 @@ setIOManagerControlFd(uint32_t cap_no USED_IF_THREADS, int fd USED_IF_THREADS) { #endif } #endif + +#if !defined(THREADED_RTS) +void appendToIOBlockedQueue(StgTSO *tso) +{ + ASSERT(tso->_link == END_TSO_QUEUE); + if (blocked_queue_hd == END_TSO_QUEUE) { + blocked_queue_hd = tso; + } else { + setTSOLink(&MainCapability, blocked_queue_tl, tso); + } + blocked_queue_tl = tso; +} + +void insertIntoSleepingQueue(StgTSO *tso, LowResTime target) +{ + StgTSO *prev = NULL; + StgTSO *t = sleeping_queue; + while (t != END_TSO_QUEUE && t->block_info.target < target) { + prev = t; + t = t->_link; + } + + tso->_link = t; + if (prev == NULL) { + sleeping_queue = tso; + } else { + setTSOLink(&MainCapability, prev, tso); + } +} +#endif diff --git a/rts/IOManager.h b/rts/IOManager.h index ceb38508ea..f1d8cfe5c2 100644 --- a/rts/IOManager.h +++ b/rts/IOManager.h @@ -22,6 +22,7 @@ #include "BeginPrivate.h" #include "sm/GC.h" // for evac_fn +#include "posix/Select.h" // for LowResTime TODO: switch to normal Time /* The per-capability data structures belonging to the I/O manager. @@ -105,6 +106,25 @@ void wakeupIOManager(void); void markCapabilityIOManager(evac_fn evac, void *user, CapIOManager *iomgr); +#if !defined(THREADED_RTS) +/* Add a thread to the end of the queue of threads blocked on I/O. + * + * This is used by the select() and the Windows MIO non-threaded I/O manager + * implementation. + */ +void appendToIOBlockedQueue(StgTSO *tso); + +/* Insert a thread into the queue of threads blocked on timers. + * + * This is used by the select() I/O manager implementation only. + * + * The sleeping queue is defined for other non-threaded I/O managers but not + * used. This is a wart that should be excised. + */ +void insertIntoSleepingQueue(StgTSO *tso, LowResTime target); +#endif + + /* Pedantic warning cleanliness */ #if !defined(THREADED_RTS) && defined(mingw32_HOST_OS) diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm index fe74b3fa81..537050857d 100644 --- a/rts/PrimOps.cmm +++ b/rts/PrimOps.cmm @@ -2571,18 +2571,6 @@ stg_whereFromzh (P_ clos) Thread I/O blocking primitives -------------------------------------------------------------------------- */ -/* Add a thread to the end of the blocked queue. (C-- version of the C - * macro in Schedule.h). - */ -#define APPEND_TO_BLOCKED_QUEUE(tso) \ - ASSERT(StgTSO__link(tso) == END_TSO_QUEUE); \ - if (W_[blocked_queue_hd] == END_TSO_QUEUE) { \ - W_[blocked_queue_hd] = tso; \ - } else { \ - ccall setTSOLink(MyCapability() "ptr", W_[blocked_queue_tl] "ptr", tso); \ - } \ - W_[blocked_queue_tl] = tso; - stg_waitReadzh ( W_ fd ) { #if defined(THREADED_RTS) @@ -2594,7 +2582,7 @@ stg_waitReadzh ( W_ fd ) StgTSO_block_info(CurrentTSO) = fd; // No locking - we're not going to use this interface in the // threaded RTS anyway. - APPEND_TO_BLOCKED_QUEUE(CurrentTSO); + ccall appendToIOBlockedQueue(CurrentTSO "ptr"); jump stg_block_noregs(); #endif } @@ -2610,7 +2598,7 @@ stg_waitWritezh ( W_ fd ) StgTSO_block_info(CurrentTSO) = fd; // No locking - we're not going to use this interface in the // threaded RTS anyway. - APPEND_TO_BLOCKED_QUEUE(CurrentTSO); + ccall appendToIOBlockedQueue(CurrentTSO "ptr"); jump stg_block_noregs(); #endif } @@ -2647,32 +2635,16 @@ stg_delayzh ( W_ us_delay ) * delayed thread on the blocked_queue. */ StgTSO_why_blocked(CurrentTSO) = BlockedOnDoProc::I16; - APPEND_TO_BLOCKED_QUEUE(CurrentTSO); + ccall appendToIOBlockedQueue(CurrentTSO "ptr"); jump stg_block_async_void(); #else - (target) = ccall getDelayTarget(us_delay); StgTSO_block_info(CurrentTSO) = target; - /* Insert the new thread in the sleeping queue. */ - prev = NULL; - t = W_[sleeping_queue]; -while: - if (t != END_TSO_QUEUE && StgTSO_block_info(t) < target) { - prev = t; - t = StgTSO__link(t); - goto while; - } - - StgTSO__link(CurrentTSO) = t; - if (prev == NULL) { - W_[sleeping_queue] = CurrentTSO; - } else { - ccall setTSOLink(MyCapability() "ptr", prev "ptr", CurrentTSO); - } + ccall insertIntoSleepingQueue(CurrentTSO "ptr", target); jump stg_block_noregs(); #endif #endif /* !THREADED_RTS */ @@ -2700,7 +2672,7 @@ stg_asyncReadzh ( W_ fd, W_ is_sock, W_ len, W_ buf ) StgAsyncIOResult_len(ares) = 0; StgAsyncIOResult_errCode(ares) = 0; StgTSO_block_info(CurrentTSO) = ares; - APPEND_TO_BLOCKED_QUEUE(CurrentTSO); + ccall appendToIOBlockedQueue(CurrentTSO "ptr"); jump stg_block_async(); #endif } @@ -2725,7 +2697,7 @@ stg_asyncWritezh ( W_ fd, W_ is_sock, W_ len, W_ buf ) StgAsyncIOResult_len(ares) = 0; StgAsyncIOResult_errCode(ares) = 0; StgTSO_block_info(CurrentTSO) = ares; - APPEND_TO_BLOCKED_QUEUE(CurrentTSO); + ccall appendToIOBlockedQueue(CurrentTSO "ptr"); jump stg_block_async(); #endif } @@ -2750,7 +2722,7 @@ stg_asyncDoProczh ( W_ proc, W_ param ) StgAsyncIOResult_len(ares) = 0; StgAsyncIOResult_errCode(ares) = 0; StgTSO_block_info(CurrentTSO) = ares; - APPEND_TO_BLOCKED_QUEUE(CurrentTSO); + ccall appendToIOBlockedQueue(CurrentTSO "ptr"); jump stg_block_async(); #endif } diff --git a/rts/Schedule.h b/rts/Schedule.h index eda0aa3f79..d12a85cfbd 100644 --- a/rts/Schedule.h +++ b/rts/Schedule.h @@ -155,22 +155,6 @@ peekRunQueue (Capability *cap) void promoteInRunQueue (Capability *cap, StgTSO *tso); -/* Add a thread to the end of the blocked queue. - */ -#if !defined(THREADED_RTS) -INLINE_HEADER void -appendToBlockedQueue(StgTSO *tso) -{ - ASSERT(tso->_link == END_TSO_QUEUE); - if (blocked_queue_hd == END_TSO_QUEUE) { - blocked_queue_hd = tso; - } else { - setTSOLink(&MainCapability, blocked_queue_tl, tso); - } - blocked_queue_tl = tso; -} -#endif - /* Check whether various thread queues are empty */ INLINE_HEADER bool |