summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDuncan Coutts <duncan@well-typed.com>2020-12-29 16:39:36 +0000
committerMarge Bot <ben+marge-bot@smart-cactus.org>2022-11-22 02:06:17 -0500
commit5cf709c541a46a17ef2e36d589ba13d949d058e1 (patch)
treedbd2e595dbd3af2a3f904b084de3aa74e2099a8e
parent8901285ef64ab4aa16aa34096a1e2be42d4be246 (diff)
downloadhaskell-5cf709c541a46a17ef2e36d589ba13d949d058e1.tar.gz
Move APPEND_TO_BLOCKED_QUEUE from cmm to C
The I/O and delay blocking primitives for the non-threaded way currently access the blocked_queue and sleeping_queue directly. We want to move where those queues are to make their ownership clearer: to have them clearly belong to the I/O manager impls rather than to the scheduler. Ultimately we will want to change their representation too. It's inconvenient to do that if these queues are accessed directly from cmm code. So as a first step, replace the APPEND_TO_BLOCKED_QUEUE with a C version appendToIOBlockedQueue(), and replace the open-coded sleeping_queue insertion with insertIntoSleepingQueue().
-rw-r--r--rts/IOManager.c31
-rw-r--r--rts/IOManager.h20
-rw-r--r--rts/PrimOps.cmm42
-rw-r--r--rts/Schedule.h16
4 files changed, 58 insertions, 51 deletions
diff --git a/rts/IOManager.c b/rts/IOManager.c
index 5a6347c537..5a3b0d0698 100644
--- a/rts/IOManager.c
+++ b/rts/IOManager.c
@@ -19,6 +19,7 @@
#include "rts/IOInterface.h" // exported
#include "IOManager.h" // RTS internal
#include "Capability.h"
+#include "Schedule.h"
#include "RtsFlags.h"
#include "RtsUtils.h"
@@ -172,3 +173,33 @@ setIOManagerControlFd(uint32_t cap_no USED_IF_THREADS, int fd USED_IF_THREADS) {
#endif
}
#endif
+
+#if !defined(THREADED_RTS)
+void appendToIOBlockedQueue(StgTSO *tso)
+{
+ ASSERT(tso->_link == END_TSO_QUEUE);
+ if (blocked_queue_hd == END_TSO_QUEUE) {
+ blocked_queue_hd = tso;
+ } else {
+ setTSOLink(&MainCapability, blocked_queue_tl, tso);
+ }
+ blocked_queue_tl = tso;
+}
+
+void insertIntoSleepingQueue(StgTSO *tso, LowResTime target)
+{
+ StgTSO *prev = NULL;
+ StgTSO *t = sleeping_queue;
+ while (t != END_TSO_QUEUE && t->block_info.target < target) {
+ prev = t;
+ t = t->_link;
+ }
+
+ tso->_link = t;
+ if (prev == NULL) {
+ sleeping_queue = tso;
+ } else {
+ setTSOLink(&MainCapability, prev, tso);
+ }
+}
+#endif
diff --git a/rts/IOManager.h b/rts/IOManager.h
index ceb38508ea..f1d8cfe5c2 100644
--- a/rts/IOManager.h
+++ b/rts/IOManager.h
@@ -22,6 +22,7 @@
#include "BeginPrivate.h"
#include "sm/GC.h" // for evac_fn
+#include "posix/Select.h" // for LowResTime TODO: switch to normal Time
/* The per-capability data structures belonging to the I/O manager.
@@ -105,6 +106,25 @@ void wakeupIOManager(void);
void markCapabilityIOManager(evac_fn evac, void *user, CapIOManager *iomgr);
+#if !defined(THREADED_RTS)
+/* Add a thread to the end of the queue of threads blocked on I/O.
+ *
+ * This is used by the select() and the Windows MIO non-threaded I/O manager
+ * implementation.
+ */
+void appendToIOBlockedQueue(StgTSO *tso);
+
+/* Insert a thread into the queue of threads blocked on timers.
+ *
+ * This is used by the select() I/O manager implementation only.
+ *
+ * The sleeping queue is defined for other non-threaded I/O managers but not
+ * used. This is a wart that should be excised.
+ */
+void insertIntoSleepingQueue(StgTSO *tso, LowResTime target);
+#endif
+
+
/* Pedantic warning cleanliness
*/
#if !defined(THREADED_RTS) && defined(mingw32_HOST_OS)
diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm
index fe74b3fa81..537050857d 100644
--- a/rts/PrimOps.cmm
+++ b/rts/PrimOps.cmm
@@ -2571,18 +2571,6 @@ stg_whereFromzh (P_ clos)
Thread I/O blocking primitives
-------------------------------------------------------------------------- */
-/* Add a thread to the end of the blocked queue. (C-- version of the C
- * macro in Schedule.h).
- */
-#define APPEND_TO_BLOCKED_QUEUE(tso) \
- ASSERT(StgTSO__link(tso) == END_TSO_QUEUE); \
- if (W_[blocked_queue_hd] == END_TSO_QUEUE) { \
- W_[blocked_queue_hd] = tso; \
- } else { \
- ccall setTSOLink(MyCapability() "ptr", W_[blocked_queue_tl] "ptr", tso); \
- } \
- W_[blocked_queue_tl] = tso;
-
stg_waitReadzh ( W_ fd )
{
#if defined(THREADED_RTS)
@@ -2594,7 +2582,7 @@ stg_waitReadzh ( W_ fd )
StgTSO_block_info(CurrentTSO) = fd;
// No locking - we're not going to use this interface in the
// threaded RTS anyway.
- APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
+ ccall appendToIOBlockedQueue(CurrentTSO "ptr");
jump stg_block_noregs();
#endif
}
@@ -2610,7 +2598,7 @@ stg_waitWritezh ( W_ fd )
StgTSO_block_info(CurrentTSO) = fd;
// No locking - we're not going to use this interface in the
// threaded RTS anyway.
- APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
+ ccall appendToIOBlockedQueue(CurrentTSO "ptr");
jump stg_block_noregs();
#endif
}
@@ -2647,32 +2635,16 @@ stg_delayzh ( W_ us_delay )
* delayed thread on the blocked_queue.
*/
StgTSO_why_blocked(CurrentTSO) = BlockedOnDoProc::I16;
- APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
+ ccall appendToIOBlockedQueue(CurrentTSO "ptr");
jump stg_block_async_void();
#else
-
(target) = ccall getDelayTarget(us_delay);
StgTSO_block_info(CurrentTSO) = target;
- /* Insert the new thread in the sleeping queue. */
- prev = NULL;
- t = W_[sleeping_queue];
-while:
- if (t != END_TSO_QUEUE && StgTSO_block_info(t) < target) {
- prev = t;
- t = StgTSO__link(t);
- goto while;
- }
-
- StgTSO__link(CurrentTSO) = t;
- if (prev == NULL) {
- W_[sleeping_queue] = CurrentTSO;
- } else {
- ccall setTSOLink(MyCapability() "ptr", prev "ptr", CurrentTSO);
- }
+ ccall insertIntoSleepingQueue(CurrentTSO "ptr", target);
jump stg_block_noregs();
#endif
#endif /* !THREADED_RTS */
@@ -2700,7 +2672,7 @@ stg_asyncReadzh ( W_ fd, W_ is_sock, W_ len, W_ buf )
StgAsyncIOResult_len(ares) = 0;
StgAsyncIOResult_errCode(ares) = 0;
StgTSO_block_info(CurrentTSO) = ares;
- APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
+ ccall appendToIOBlockedQueue(CurrentTSO "ptr");
jump stg_block_async();
#endif
}
@@ -2725,7 +2697,7 @@ stg_asyncWritezh ( W_ fd, W_ is_sock, W_ len, W_ buf )
StgAsyncIOResult_len(ares) = 0;
StgAsyncIOResult_errCode(ares) = 0;
StgTSO_block_info(CurrentTSO) = ares;
- APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
+ ccall appendToIOBlockedQueue(CurrentTSO "ptr");
jump stg_block_async();
#endif
}
@@ -2750,7 +2722,7 @@ stg_asyncDoProczh ( W_ proc, W_ param )
StgAsyncIOResult_len(ares) = 0;
StgAsyncIOResult_errCode(ares) = 0;
StgTSO_block_info(CurrentTSO) = ares;
- APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
+ ccall appendToIOBlockedQueue(CurrentTSO "ptr");
jump stg_block_async();
#endif
}
diff --git a/rts/Schedule.h b/rts/Schedule.h
index eda0aa3f79..d12a85cfbd 100644
--- a/rts/Schedule.h
+++ b/rts/Schedule.h
@@ -155,22 +155,6 @@ peekRunQueue (Capability *cap)
void promoteInRunQueue (Capability *cap, StgTSO *tso);
-/* Add a thread to the end of the blocked queue.
- */
-#if !defined(THREADED_RTS)
-INLINE_HEADER void
-appendToBlockedQueue(StgTSO *tso)
-{
- ASSERT(tso->_link == END_TSO_QUEUE);
- if (blocked_queue_hd == END_TSO_QUEUE) {
- blocked_queue_hd = tso;
- } else {
- setTSOLink(&MainCapability, blocked_queue_tl, tso);
- }
- blocked_queue_tl = tso;
-}
-#endif
-
/* Check whether various thread queues are empty
*/
INLINE_HEADER bool