diff options
-rw-r--r-- | rts/IOManager.c | 34 | ||||
-rw-r--r-- | rts/IOManager.h | 16 | ||||
-rw-r--r-- | rts/RaiseAsync.c | 5 | ||||
-rw-r--r-- | rts/Schedule.c | 35 | ||||
-rw-r--r-- | rts/Schedule.h | 15 | ||||
-rw-r--r-- | rts/posix/Select.c | 28 | ||||
-rw-r--r-- | rts/win32/AsyncMIO.c | 11 |
7 files changed, 80 insertions, 64 deletions
diff --git a/rts/IOManager.c b/rts/IOManager.c index 5a3b0d0698..b4ce11ff76 100644 --- a/rts/IOManager.c +++ b/rts/IOManager.c @@ -43,9 +43,15 @@ void initCapabilityIOManager(CapIOManager **piomgr) (CapIOManager *) stgMallocBytes(sizeof(CapIOManager), "initCapabilityIOManager"); -#if defined(THREADED_RTS) && !defined(mingw32_HOST_OS) +#if defined(THREADED_RTS) +#if !defined(mingw32_HOST_OS) iomgr->control_fd = -1; #endif +#else // !defined(THREADED_RTS) + iomgr->blocked_queue_hd = END_TSO_QUEUE; + iomgr->blocked_queue_tl = END_TSO_QUEUE; + iomgr->sleeping_queue = END_TSO_QUEUE; +#endif *piomgr = iomgr; } @@ -150,11 +156,17 @@ void wakeupIOManager(void) #endif } - -void markCapabilityIOManager(evac_fn evac STG_UNUSED, - void *user STG_UNUSED, - CapIOManager *iomgr STG_UNUSED) +void markCapabilityIOManager(evac_fn evac USED_IF_NOT_THREADS, + void *user USED_IF_NOT_THREADS, + CapIOManager *iomgr USED_IF_NOT_THREADS) { + +#if !defined(THREADED_RTS) + evac(user, (StgClosure **)(void *)&iomgr->blocked_queue_hd); + evac(user, (StgClosure **)(void *)&iomgr->blocked_queue_tl); + evac(user, (StgClosure **)(void *)&iomgr->sleeping_queue); +#endif + } @@ -178,18 +190,18 @@ setIOManagerControlFd(uint32_t cap_no USED_IF_THREADS, int fd USED_IF_THREADS) { void appendToIOBlockedQueue(StgTSO *tso) { ASSERT(tso->_link == END_TSO_QUEUE); - if (blocked_queue_hd == END_TSO_QUEUE) { - blocked_queue_hd = tso; + if (MainCapability.iomgr->blocked_queue_hd == END_TSO_QUEUE) { + MainCapability.iomgr->blocked_queue_hd = tso; } else { - setTSOLink(&MainCapability, blocked_queue_tl, tso); + setTSOLink(&MainCapability, MainCapability.iomgr->blocked_queue_tl, tso); } - blocked_queue_tl = tso; + MainCapability.iomgr->blocked_queue_tl = tso; } void insertIntoSleepingQueue(StgTSO *tso, LowResTime target) { StgTSO *prev = NULL; - StgTSO *t = sleeping_queue; + StgTSO *t = MainCapability.iomgr->sleeping_queue; while (t != END_TSO_QUEUE && t->block_info.target < target) { prev = t; t = t->_link; @@ -197,7 +209,7 @@ void insertIntoSleepingQueue(StgTSO *tso, LowResTime target) tso->_link = t; if (prev == NULL) { - sleeping_queue = tso; + MainCapability.iomgr->sleeping_queue = tso; } else { setTSOLink(&MainCapability, prev, tso); } diff --git a/rts/IOManager.h b/rts/IOManager.h index f1d8cfe5c2..515cfc2f85 100644 --- a/rts/IOManager.h +++ b/rts/IOManager.h @@ -43,6 +43,22 @@ typedef struct { /* Control FD for the MIO manager for this capability */ int control_fd; #endif +#else // !defined(THREADED_RTS) + /* Thread queue for threads blocked on I/O completion. + * Used by the select() and Win32 MIO I/O managers. It is not used by + * the WinIO I/O manager, though it remains defined in this case. + */ + StgTSO *blocked_queue_hd; + StgTSO *blocked_queue_tl; + + /* Thread queue for threads blocked on timeouts. + * Used by the select() I/O manager only. It is grossly inefficient, like + * everything else to do with the select() I/O manager. + * + * TODO: It is not used by any of the Windows I/O managers, though it + * remains defined for them. This is an oddity that should be resolved. + */ + StgTSO *sleeping_queue; #endif } CapIOManager; diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c index b668b6a178..f727383082 100644 --- a/rts/RaiseAsync.c +++ b/rts/RaiseAsync.c @@ -708,7 +708,8 @@ removeFromQueues(Capability *cap, StgTSO *tso) #if defined(mingw32_HOST_OS) case BlockedOnDoProc: #endif - removeThreadFromDeQueue(cap, &blocked_queue_hd, &blocked_queue_tl, tso); + removeThreadFromDeQueue(cap, &cap->iomgr->blocked_queue_hd, + &cap->iomgr->blocked_queue_tl, tso); #if defined(mingw32_HOST_OS) /* (Cooperatively) signal that the worker thread should abort * the request. @@ -718,7 +719,7 @@ removeFromQueues(Capability *cap, StgTSO *tso) goto done; case BlockedOnDelay: - removeThreadFromQueue(cap, &sleeping_queue, tso); + removeThreadFromQueue(cap, &cap->iomgr->sleeping_queue, tso); goto done; #endif diff --git a/rts/Schedule.c b/rts/Schedule.c index c1ef05930b..95eba3f958 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -70,13 +70,6 @@ * Global variables * -------------------------------------------------------------------------- */ -#if !defined(THREADED_RTS) -// Blocked/sleeping threads -StgTSO *blocked_queue_hd = NULL; -StgTSO *blocked_queue_tl = NULL; -StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table? -#endif - // Bytes allocated since the last time a HeapOverflow exception was thrown by // the RTS uint64_t allocated_bytes_at_heapoverflow = 0; @@ -910,7 +903,13 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS) // run queue is empty, and there are no other tasks running, we // can wait indefinitely for something to happen. // - if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) ) + // TODO: this empty-queue test is highly dubious because it only makes + // sense for some I/O managers. The sleeping_queue is _only_ used by the + // select() I/O manager. The WinIO I/O manager does not use either the + // sleeping_queue or the blocked_queue, so both queues will _always_ be + // empty and so awaitEvent will _never_ be called here for WinIO. This may + // explain why there is a second call to awaitEvent below for mingw32. + if ( !EMPTY_BLOCKED_QUEUE(cap) || !EMPTY_SLEEPING_QUEUE(cap) ) { awaitEvent (emptyRunQueue(cap)); } @@ -2365,11 +2364,6 @@ deleteAllThreads () // somewhere, and the main scheduler loop has to deal with it. // Also, the run queue is the only thing keeping these threads from // being GC'd, and we don't want the "main thread has been GC'd" panic. - -#if !defined(THREADED_RTS) - ASSERT(blocked_queue_hd == END_TSO_QUEUE); - ASSERT(sleeping_queue == END_TSO_QUEUE); -#endif } /* ----------------------------------------------------------------------------- @@ -2703,12 +2697,6 @@ startWorkerTasks (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS) void initScheduler(void) { -#if !defined(THREADED_RTS) - blocked_queue_hd = END_TSO_QUEUE; - blocked_queue_tl = END_TSO_QUEUE; - sleeping_queue = END_TSO_QUEUE; -#endif - sched_state = SCHED_RUNNING; SEQ_CST_STORE(&recent_activity, ACTIVITY_YES); @@ -2793,14 +2781,9 @@ freeScheduler( void ) #endif } -void markScheduler (evac_fn evac USED_IF_NOT_THREADS, - void *user USED_IF_NOT_THREADS) +void markScheduler (evac_fn evac STG_UNUSED, + void *user STG_UNUSED) { -#if !defined(THREADED_RTS) - evac(user, (StgClosure **)(void *)&blocked_queue_hd); - evac(user, (StgClosure **)(void *)&blocked_queue_tl); - evac(user, (StgClosure **)(void *)&sleeping_queue); -#endif } /* ----------------------------------------------------------------------------- diff --git a/rts/Schedule.h b/rts/Schedule.h index d12a85cfbd..a5b3bb3f26 100644 --- a/rts/Schedule.h +++ b/rts/Schedule.h @@ -105,14 +105,6 @@ extern volatile StgWord sched_state; */ extern volatile StgWord recent_activity; -/* Thread queues. - * Locks required : sched_mutex - */ -#if !defined(THREADED_RTS) -extern StgTSO *blocked_queue_hd, *blocked_queue_tl; -extern StgTSO *sleeping_queue; -#endif - extern bool heap_overflow; #if defined(THREADED_RTS) @@ -184,8 +176,8 @@ truncateRunQueue(Capability *cap) } #if !defined(THREADED_RTS) -#define EMPTY_BLOCKED_QUEUE() (emptyQueue(blocked_queue_hd)) -#define EMPTY_SLEEPING_QUEUE() (emptyQueue(sleeping_queue)) +#define EMPTY_BLOCKED_QUEUE(cap) (emptyQueue(cap->iomgr->blocked_queue_hd)) +#define EMPTY_SLEEPING_QUEUE(cap) (emptyQueue(cap->iomgr->sleeping_queue)) #endif INLINE_HEADER bool @@ -193,7 +185,8 @@ emptyThreadQueues(Capability *cap) { return emptyRunQueue(cap) #if !defined(THREADED_RTS) - && EMPTY_BLOCKED_QUEUE() && EMPTY_SLEEPING_QUEUE() + // TODO replace this by a test that deferrs to the active I/O manager + && EMPTY_BLOCKED_QUEUE(cap) && EMPTY_SLEEPING_QUEUE(cap) #endif ; } diff --git a/rts/posix/Select.c b/rts/posix/Select.c index 2f9f95728c..f02fd63981 100644 --- a/rts/posix/Select.c +++ b/rts/posix/Select.c @@ -98,12 +98,12 @@ static bool wakeUpSleepingThreads (LowResTime now) StgTSO *tso; bool flag = false; - while (sleeping_queue != END_TSO_QUEUE) { - tso = sleeping_queue; + while (MainCapability.iomgr->sleeping_queue != END_TSO_QUEUE) { + tso = MainCapability.iomgr->sleeping_queue; if (((long)now - (long)tso->block_info.target) < 0) { break; } - sleeping_queue = tso->_link; + MainCapability.iomgr->sleeping_queue = tso->_link; tso->why_blocked = NotBlocked; tso->_link = END_TSO_QUEUE; IF_DEBUG(scheduler, debugBelch("Waking up sleeping thread %" @@ -253,7 +253,9 @@ awaitEvent(bool wait) FD_ZERO(&rfd); FD_ZERO(&wfd); - for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) { + for(tso = MainCapability.iomgr->blocked_queue_hd; + tso != END_TSO_QUEUE; + tso = next) { next = tso->_link; /* On older FreeBSDs, FD_SETSIZE is unsigned. Cast it to signed int @@ -298,7 +300,7 @@ awaitEvent(bool wait) tv.tv_sec = 0; tv.tv_usec = 0; ptv = &tv; - } else if (sleeping_queue != END_TSO_QUEUE) { + } else if (MainCapability.iomgr->sleeping_queue != END_TSO_QUEUE) { /* SUSv2 allows implementations to have an implementation defined * maximum timeout for select(2). The standard requires * implementations to silently truncate values exceeding this maximum @@ -317,7 +319,10 @@ awaitEvent(bool wait) */ const time_t max_seconds = 2678400; // 31 * 24 * 60 * 60 - Time min = LowResTimeToTime(sleeping_queue->block_info.target - now); + Time min = LowResTimeToTime( + MainCapability.iomgr->sleeping_queue->block_info.target + - now + ); tv.tv_sec = TimeToSeconds(min); if (tv.tv_sec < max_seconds) { tv.tv_usec = TimeToUS(min) % 1000000; @@ -385,7 +390,9 @@ awaitEvent(bool wait) * traversed blocked TSOs. As a result you * can't use functions accessing 'blocked_queue_hd'. */ - for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) { + for(tso = MainCapability.iomgr->blocked_queue_hd; + tso != END_TSO_QUEUE; + tso = next) { next = tso->_link; int fd; enum FdState fd_state = RTS_FD_IS_BLOCKING; @@ -435,7 +442,7 @@ awaitEvent(bool wait) break; case RTS_FD_IS_BLOCKING: if (prev == NULL) - blocked_queue_hd = tso; + MainCapability.iomgr->blocked_queue_hd = tso; else setTSOLink(&MainCapability, prev, tso); prev = tso; @@ -444,10 +451,11 @@ awaitEvent(bool wait) } if (prev == NULL) - blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE; + MainCapability.iomgr->blocked_queue_hd = + MainCapability.iomgr->blocked_queue_tl = END_TSO_QUEUE; else { prev->_link = END_TSO_QUEUE; - blocked_queue_tl = prev; + MainCapability.iomgr->blocked_queue_tl = prev; } } diff --git a/rts/win32/AsyncMIO.c b/rts/win32/AsyncMIO.c index 6a2a43bd08..65204bd416 100644 --- a/rts/win32/AsyncMIO.c +++ b/rts/win32/AsyncMIO.c @@ -224,6 +224,8 @@ awaitRequests(bool wait) #if !defined(THREADED_RTS) // none of this is actually used in the threaded RTS + CapIOManager *iomgr = MainCapability.iomgr; + start: #if 0 fprintf(stderr, "awaitRequests(): %d %d %d\n", @@ -289,7 +291,7 @@ start: unsigned int rID = completedTable[i].reqID; prev = NULL; - for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; + for(tso = iomgr->blocked_queue_hd; tso != END_TSO_QUEUE; tso = tso->_link) { switch(tso->why_blocked) { @@ -309,10 +311,11 @@ start: if (prev) { setTSOLink(&MainCapability, prev, tso->_link); } else { - blocked_queue_hd = tso->_link; + iomgr->blocked_queue_hd = tso->_link; } - if (blocked_queue_tl == tso) { - blocked_queue_tl = prev ? prev : END_TSO_QUEUE; + if (iomgr->blocked_queue_tl == tso) { + iomgr->blocked_queue_tl = prev ? prev + : END_TSO_QUEUE; } // Terminates the run queue + this inner for-loop. |