summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDuncan Coutts <duncan@well-typed.com>2020-12-29 17:39:19 +0000
committerMarge Bot <ben+marge-bot@smart-cactus.org>2022-11-22 02:06:17 -0500
commitced9acdbe757331d8d0046df30a06e61b05dd204 (patch)
tree52f6796b25bf23c5eeaa0b88e4b794b09d30386a
parent5cf709c541a46a17ef2e36d589ba13d949d058e1 (diff)
downloadhaskell-ced9acdbe757331d8d0046df30a06e61b05dd204.tar.gz
Move {blocked,sleeping}_queue from scheduler global vars to CapIOManager
The blocked_queue_{hd,tl} and the sleeping_queue are currently cooperatively managed between the scheduler and (some but not all of) the non-threaded I/O manager implementations. They lived as global vars with the scheduler, but are poked by I/O primops and the I/O manager backends. This patch is a step on the path towards making the management of I/O or timer blocking belong to the I/O managers and not the scheduler. Specifically, this patch moves the {blocked,sleeping}_queue from being global vars in the scheduler to being members of the CapIOManager struct within each Capability. They are not yet exclusively used by the I/O managers: they are still poked from a couple other places, notably in the scheduler before calling awaitEvent.
-rw-r--r--rts/IOManager.c34
-rw-r--r--rts/IOManager.h16
-rw-r--r--rts/RaiseAsync.c5
-rw-r--r--rts/Schedule.c35
-rw-r--r--rts/Schedule.h15
-rw-r--r--rts/posix/Select.c28
-rw-r--r--rts/win32/AsyncMIO.c11
7 files changed, 80 insertions, 64 deletions
diff --git a/rts/IOManager.c b/rts/IOManager.c
index 5a3b0d0698..b4ce11ff76 100644
--- a/rts/IOManager.c
+++ b/rts/IOManager.c
@@ -43,9 +43,15 @@ void initCapabilityIOManager(CapIOManager **piomgr)
(CapIOManager *) stgMallocBytes(sizeof(CapIOManager),
"initCapabilityIOManager");
-#if defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
+#if defined(THREADED_RTS)
+#if !defined(mingw32_HOST_OS)
iomgr->control_fd = -1;
#endif
+#else // !defined(THREADED_RTS)
+ iomgr->blocked_queue_hd = END_TSO_QUEUE;
+ iomgr->blocked_queue_tl = END_TSO_QUEUE;
+ iomgr->sleeping_queue = END_TSO_QUEUE;
+#endif
*piomgr = iomgr;
}
@@ -150,11 +156,17 @@ void wakeupIOManager(void)
#endif
}
-
-void markCapabilityIOManager(evac_fn evac STG_UNUSED,
- void *user STG_UNUSED,
- CapIOManager *iomgr STG_UNUSED)
+void markCapabilityIOManager(evac_fn evac USED_IF_NOT_THREADS,
+ void *user USED_IF_NOT_THREADS,
+ CapIOManager *iomgr USED_IF_NOT_THREADS)
{
+
+#if !defined(THREADED_RTS)
+ evac(user, (StgClosure **)(void *)&iomgr->blocked_queue_hd);
+ evac(user, (StgClosure **)(void *)&iomgr->blocked_queue_tl);
+ evac(user, (StgClosure **)(void *)&iomgr->sleeping_queue);
+#endif
+
}
@@ -178,18 +190,18 @@ setIOManagerControlFd(uint32_t cap_no USED_IF_THREADS, int fd USED_IF_THREADS) {
void appendToIOBlockedQueue(StgTSO *tso)
{
ASSERT(tso->_link == END_TSO_QUEUE);
- if (blocked_queue_hd == END_TSO_QUEUE) {
- blocked_queue_hd = tso;
+ if (MainCapability.iomgr->blocked_queue_hd == END_TSO_QUEUE) {
+ MainCapability.iomgr->blocked_queue_hd = tso;
} else {
- setTSOLink(&MainCapability, blocked_queue_tl, tso);
+ setTSOLink(&MainCapability, MainCapability.iomgr->blocked_queue_tl, tso);
}
- blocked_queue_tl = tso;
+ MainCapability.iomgr->blocked_queue_tl = tso;
}
void insertIntoSleepingQueue(StgTSO *tso, LowResTime target)
{
StgTSO *prev = NULL;
- StgTSO *t = sleeping_queue;
+ StgTSO *t = MainCapability.iomgr->sleeping_queue;
while (t != END_TSO_QUEUE && t->block_info.target < target) {
prev = t;
t = t->_link;
@@ -197,7 +209,7 @@ void insertIntoSleepingQueue(StgTSO *tso, LowResTime target)
tso->_link = t;
if (prev == NULL) {
- sleeping_queue = tso;
+ MainCapability.iomgr->sleeping_queue = tso;
} else {
setTSOLink(&MainCapability, prev, tso);
}
diff --git a/rts/IOManager.h b/rts/IOManager.h
index f1d8cfe5c2..515cfc2f85 100644
--- a/rts/IOManager.h
+++ b/rts/IOManager.h
@@ -43,6 +43,22 @@ typedef struct {
/* Control FD for the MIO manager for this capability */
int control_fd;
#endif
+#else // !defined(THREADED_RTS)
+ /* Thread queue for threads blocked on I/O completion.
+ * Used by the select() and Win32 MIO I/O managers. It is not used by
+ * the WinIO I/O manager, though it remains defined in this case.
+ */
+ StgTSO *blocked_queue_hd;
+ StgTSO *blocked_queue_tl;
+
+ /* Thread queue for threads blocked on timeouts.
+ * Used by the select() I/O manager only. It is grossly inefficient, like
+ * everything else to do with the select() I/O manager.
+ *
+ * TODO: It is not used by any of the Windows I/O managers, though it
+ * remains defined for them. This is an oddity that should be resolved.
+ */
+ StgTSO *sleeping_queue;
#endif
} CapIOManager;
diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c
index b668b6a178..f727383082 100644
--- a/rts/RaiseAsync.c
+++ b/rts/RaiseAsync.c
@@ -708,7 +708,8 @@ removeFromQueues(Capability *cap, StgTSO *tso)
#if defined(mingw32_HOST_OS)
case BlockedOnDoProc:
#endif
- removeThreadFromDeQueue(cap, &blocked_queue_hd, &blocked_queue_tl, tso);
+ removeThreadFromDeQueue(cap, &cap->iomgr->blocked_queue_hd,
+ &cap->iomgr->blocked_queue_tl, tso);
#if defined(mingw32_HOST_OS)
/* (Cooperatively) signal that the worker thread should abort
* the request.
@@ -718,7 +719,7 @@ removeFromQueues(Capability *cap, StgTSO *tso)
goto done;
case BlockedOnDelay:
- removeThreadFromQueue(cap, &sleeping_queue, tso);
+ removeThreadFromQueue(cap, &cap->iomgr->sleeping_queue, tso);
goto done;
#endif
diff --git a/rts/Schedule.c b/rts/Schedule.c
index c1ef05930b..95eba3f958 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -70,13 +70,6 @@
* Global variables
* -------------------------------------------------------------------------- */
-#if !defined(THREADED_RTS)
-// Blocked/sleeping threads
-StgTSO *blocked_queue_hd = NULL;
-StgTSO *blocked_queue_tl = NULL;
-StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
-#endif
-
// Bytes allocated since the last time a HeapOverflow exception was thrown by
// the RTS
uint64_t allocated_bytes_at_heapoverflow = 0;
@@ -910,7 +903,13 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
// run queue is empty, and there are no other tasks running, we
// can wait indefinitely for something to happen.
//
- if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
+ // TODO: this empty-queue test is highly dubious because it only makes
+ // sense for some I/O managers. The sleeping_queue is _only_ used by the
+ // select() I/O manager. The WinIO I/O manager does not use either the
+ // sleeping_queue or the blocked_queue, so both queues will _always_ be
+ // empty and so awaitEvent will _never_ be called here for WinIO. This may
+ // explain why there is a second call to awaitEvent below for mingw32.
+ if ( !EMPTY_BLOCKED_QUEUE(cap) || !EMPTY_SLEEPING_QUEUE(cap) )
{
awaitEvent (emptyRunQueue(cap));
}
@@ -2365,11 +2364,6 @@ deleteAllThreads ()
// somewhere, and the main scheduler loop has to deal with it.
// Also, the run queue is the only thing keeping these threads from
// being GC'd, and we don't want the "main thread has been GC'd" panic.
-
-#if !defined(THREADED_RTS)
- ASSERT(blocked_queue_hd == END_TSO_QUEUE);
- ASSERT(sleeping_queue == END_TSO_QUEUE);
-#endif
}
/* -----------------------------------------------------------------------------
@@ -2703,12 +2697,6 @@ startWorkerTasks (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
void
initScheduler(void)
{
-#if !defined(THREADED_RTS)
- blocked_queue_hd = END_TSO_QUEUE;
- blocked_queue_tl = END_TSO_QUEUE;
- sleeping_queue = END_TSO_QUEUE;
-#endif
-
sched_state = SCHED_RUNNING;
SEQ_CST_STORE(&recent_activity, ACTIVITY_YES);
@@ -2793,14 +2781,9 @@ freeScheduler( void )
#endif
}
-void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
- void *user USED_IF_NOT_THREADS)
+void markScheduler (evac_fn evac STG_UNUSED,
+ void *user STG_UNUSED)
{
-#if !defined(THREADED_RTS)
- evac(user, (StgClosure **)(void *)&blocked_queue_hd);
- evac(user, (StgClosure **)(void *)&blocked_queue_tl);
- evac(user, (StgClosure **)(void *)&sleeping_queue);
-#endif
}
/* -----------------------------------------------------------------------------
diff --git a/rts/Schedule.h b/rts/Schedule.h
index d12a85cfbd..a5b3bb3f26 100644
--- a/rts/Schedule.h
+++ b/rts/Schedule.h
@@ -105,14 +105,6 @@ extern volatile StgWord sched_state;
*/
extern volatile StgWord recent_activity;
-/* Thread queues.
- * Locks required : sched_mutex
- */
-#if !defined(THREADED_RTS)
-extern StgTSO *blocked_queue_hd, *blocked_queue_tl;
-extern StgTSO *sleeping_queue;
-#endif
-
extern bool heap_overflow;
#if defined(THREADED_RTS)
@@ -184,8 +176,8 @@ truncateRunQueue(Capability *cap)
}
#if !defined(THREADED_RTS)
-#define EMPTY_BLOCKED_QUEUE() (emptyQueue(blocked_queue_hd))
-#define EMPTY_SLEEPING_QUEUE() (emptyQueue(sleeping_queue))
+#define EMPTY_BLOCKED_QUEUE(cap) (emptyQueue(cap->iomgr->blocked_queue_hd))
+#define EMPTY_SLEEPING_QUEUE(cap) (emptyQueue(cap->iomgr->sleeping_queue))
#endif
INLINE_HEADER bool
@@ -193,7 +185,8 @@ emptyThreadQueues(Capability *cap)
{
return emptyRunQueue(cap)
#if !defined(THREADED_RTS)
- && EMPTY_BLOCKED_QUEUE() && EMPTY_SLEEPING_QUEUE()
+ // TODO replace this by a test that deferrs to the active I/O manager
+ && EMPTY_BLOCKED_QUEUE(cap) && EMPTY_SLEEPING_QUEUE(cap)
#endif
;
}
diff --git a/rts/posix/Select.c b/rts/posix/Select.c
index 2f9f95728c..f02fd63981 100644
--- a/rts/posix/Select.c
+++ b/rts/posix/Select.c
@@ -98,12 +98,12 @@ static bool wakeUpSleepingThreads (LowResTime now)
StgTSO *tso;
bool flag = false;
- while (sleeping_queue != END_TSO_QUEUE) {
- tso = sleeping_queue;
+ while (MainCapability.iomgr->sleeping_queue != END_TSO_QUEUE) {
+ tso = MainCapability.iomgr->sleeping_queue;
if (((long)now - (long)tso->block_info.target) < 0) {
break;
}
- sleeping_queue = tso->_link;
+ MainCapability.iomgr->sleeping_queue = tso->_link;
tso->why_blocked = NotBlocked;
tso->_link = END_TSO_QUEUE;
IF_DEBUG(scheduler, debugBelch("Waking up sleeping thread %"
@@ -253,7 +253,9 @@ awaitEvent(bool wait)
FD_ZERO(&rfd);
FD_ZERO(&wfd);
- for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
+ for(tso = MainCapability.iomgr->blocked_queue_hd;
+ tso != END_TSO_QUEUE;
+ tso = next) {
next = tso->_link;
/* On older FreeBSDs, FD_SETSIZE is unsigned. Cast it to signed int
@@ -298,7 +300,7 @@ awaitEvent(bool wait)
tv.tv_sec = 0;
tv.tv_usec = 0;
ptv = &tv;
- } else if (sleeping_queue != END_TSO_QUEUE) {
+ } else if (MainCapability.iomgr->sleeping_queue != END_TSO_QUEUE) {
/* SUSv2 allows implementations to have an implementation defined
* maximum timeout for select(2). The standard requires
* implementations to silently truncate values exceeding this maximum
@@ -317,7 +319,10 @@ awaitEvent(bool wait)
*/
const time_t max_seconds = 2678400; // 31 * 24 * 60 * 60
- Time min = LowResTimeToTime(sleeping_queue->block_info.target - now);
+ Time min = LowResTimeToTime(
+ MainCapability.iomgr->sleeping_queue->block_info.target
+ - now
+ );
tv.tv_sec = TimeToSeconds(min);
if (tv.tv_sec < max_seconds) {
tv.tv_usec = TimeToUS(min) % 1000000;
@@ -385,7 +390,9 @@ awaitEvent(bool wait)
* traversed blocked TSOs. As a result you
* can't use functions accessing 'blocked_queue_hd'.
*/
- for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
+ for(tso = MainCapability.iomgr->blocked_queue_hd;
+ tso != END_TSO_QUEUE;
+ tso = next) {
next = tso->_link;
int fd;
enum FdState fd_state = RTS_FD_IS_BLOCKING;
@@ -435,7 +442,7 @@ awaitEvent(bool wait)
break;
case RTS_FD_IS_BLOCKING:
if (prev == NULL)
- blocked_queue_hd = tso;
+ MainCapability.iomgr->blocked_queue_hd = tso;
else
setTSOLink(&MainCapability, prev, tso);
prev = tso;
@@ -444,10 +451,11 @@ awaitEvent(bool wait)
}
if (prev == NULL)
- blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
+ MainCapability.iomgr->blocked_queue_hd =
+ MainCapability.iomgr->blocked_queue_tl = END_TSO_QUEUE;
else {
prev->_link = END_TSO_QUEUE;
- blocked_queue_tl = prev;
+ MainCapability.iomgr->blocked_queue_tl = prev;
}
}
diff --git a/rts/win32/AsyncMIO.c b/rts/win32/AsyncMIO.c
index 6a2a43bd08..65204bd416 100644
--- a/rts/win32/AsyncMIO.c
+++ b/rts/win32/AsyncMIO.c
@@ -224,6 +224,8 @@ awaitRequests(bool wait)
#if !defined(THREADED_RTS)
// none of this is actually used in the threaded RTS
+ CapIOManager *iomgr = MainCapability.iomgr;
+
start:
#if 0
fprintf(stderr, "awaitRequests(): %d %d %d\n",
@@ -289,7 +291,7 @@ start:
unsigned int rID = completedTable[i].reqID;
prev = NULL;
- for(tso = blocked_queue_hd; tso != END_TSO_QUEUE;
+ for(tso = iomgr->blocked_queue_hd; tso != END_TSO_QUEUE;
tso = tso->_link) {
switch(tso->why_blocked) {
@@ -309,10 +311,11 @@ start:
if (prev) {
setTSOLink(&MainCapability, prev, tso->_link);
} else {
- blocked_queue_hd = tso->_link;
+ iomgr->blocked_queue_hd = tso->_link;
}
- if (blocked_queue_tl == tso) {
- blocked_queue_tl = prev ? prev : END_TSO_QUEUE;
+ if (iomgr->blocked_queue_tl == tso) {
+ iomgr->blocked_queue_tl = prev ? prev
+ : END_TSO_QUEUE;
}
// Terminates the run queue + this inner for-loop.