diff options
author | Tamar Christina <tamar@zhox.com> | 2019-06-16 21:30:14 +0100 |
---|---|---|
committer | Ben Gamari <ben@smart-cactus.org> | 2020-07-15 16:41:01 -0400 |
commit | 4bf542bf1cdf2fa468457fc0af21333478293476 (patch) | |
tree | 8378f0fa182d8e829e15fc5d102ba01aa8bd038e /rts/win32 | |
parent | 050da6dd42d0cb293c7fce4a5ccdeb5abe1aadb4 (diff) | |
download | haskell-4bf542bf1cdf2fa468457fc0af21333478293476.tar.gz |
winio: Multiple refactorings and support changes.
Diffstat (limited to 'rts/win32')
-rw-r--r-- | rts/win32/AsyncIO.c | 384 | ||||
-rw-r--r-- | rts/win32/AsyncIO.h | 24 | ||||
-rw-r--r-- | rts/win32/AwaitEvent.c | 16 | ||||
-rw-r--r-- | rts/win32/ConsoleHandler.c | 7 | ||||
-rw-r--r-- | rts/win32/ConsoleHandler.h | 14 | ||||
-rw-r--r-- | rts/win32/IOManager.c | 3 | ||||
-rw-r--r-- | rts/win32/IOManager.h | 5 | ||||
-rw-r--r-- | rts/win32/ThrIOManager.c | 60 | ||||
-rw-r--r-- | rts/win32/libHSbase.def | 2 |
9 files changed, 61 insertions, 454 deletions
diff --git a/rts/win32/AsyncIO.c b/rts/win32/AsyncIO.c deleted file mode 100644 index 49da79d2dd..0000000000 --- a/rts/win32/AsyncIO.c +++ /dev/null @@ -1,384 +0,0 @@ -/* AsyncIO.c - * - * Integrating Win32 asynchronous I/O with the GHC RTS. - * - * (c) sof, 2002-2003. - */ - -#if !defined(THREADED_RTS) - -#include "Rts.h" -#include "RtsUtils.h" -#include <windows.h> -#include <stdio.h> -#include "Schedule.h" -#include "Capability.h" -#include "win32/AsyncIO.h" -#include "win32/IOManager.h" - -/* - * Overview: - * - * Haskell code issue asynchronous I/O requests via the - * async{Read,Write,DoOp}# primops. These cause addIORequest() - * to be invoked, which forwards the request to the underlying - * asynchronous I/O subsystem. Each request is tagged with a unique - * ID. - * - * addIORequest() returns this ID, so that when the blocked CH - * thread is added onto blocked_queue, its TSO is annotated with - * it. Upon completion of an I/O request, the async I/O handling - * code makes a back-call to signal its completion; the local - * onIOComplete() routine. It adds the IO request ID (along with - * its result data) to a queue of completed requests before returning. - * - * The queue of completed IO request is read by the thread operating - * the RTS scheduler. It de-queues the CH threads corresponding - * to the request IDs, making them runnable again. - * - */ - -typedef struct CompletedReq { - unsigned int reqID; - HsInt len; - HsInt errCode; -} CompletedReq; - -#define MAX_REQUESTS 200 - -static CRITICAL_SECTION queue_lock; -static HANDLE completed_req_event = INVALID_HANDLE_VALUE; -static HANDLE abandon_req_wait = INVALID_HANDLE_VALUE; -static HANDLE wait_handles[2]; -static CompletedReq completedTable[MAX_REQUESTS]; -static int completed_hw; -static HANDLE completed_table_sema; -static int issued_reqs; - -static void -onIOComplete(unsigned int reqID, - int fd STG_UNUSED, - HsInt len, - void* buf STG_UNUSED, - HsInt errCode) -{ - DWORD dwRes; - /* Deposit result of request in queue/table..when there's room. */ - dwRes = WaitForSingleObject(completed_table_sema, INFINITE); - switch (dwRes) { - case WAIT_OBJECT_0: - case WAIT_ABANDONED: - break; - default: - /* Not likely */ - fprintf(stderr, - "onIOComplete: failed to grab table semaphore (res=%d, err=%d), " - "dropping request 0x%x\n", reqID, dwRes, GetLastError()); - fflush(stderr); - return; - } - EnterCriticalSection(&queue_lock); - if (completed_hw == MAX_REQUESTS) { - /* Shouldn't happen */ - fprintf(stderr, "onIOComplete: ERROR -- Request table overflow (%d); " - "dropping.\n", reqID); - fflush(stderr); - } else { -#if 0 - fprintf(stderr, "onCompl: %d %d %d %d %d\n", - reqID, len, errCode, issued_reqs, completed_hw); - fflush(stderr); -#endif - completedTable[completed_hw].reqID = reqID; - completedTable[completed_hw].len = len; - completedTable[completed_hw].errCode = errCode; - completed_hw++; - issued_reqs--; - if (completed_hw == 1) { - /* The event is used to wake up the scheduler thread should it - * be blocked waiting for requests to complete. The event resets - * once that thread has cleared out the request queue/table. - */ - SetEvent(completed_req_event); - } - } - LeaveCriticalSection(&queue_lock); -} - -unsigned int -addIORequest(int fd, - bool forWriting, - bool isSock, - HsInt len, - char* buf) -{ - EnterCriticalSection(&queue_lock); - issued_reqs++; - LeaveCriticalSection(&queue_lock); -#if 0 - fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); - fflush(stderr); -#endif - return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete); -} - -unsigned int -addDelayRequest(HsInt usecs) -{ - EnterCriticalSection(&queue_lock); - issued_reqs++; - LeaveCriticalSection(&queue_lock); -#if 0 - fprintf(stderr, "addDelayReq: %d\n", usecs); fflush(stderr); -#endif - return AddDelayRequest(usecs,onIOComplete); -} - -unsigned int -addDoProcRequest(void* proc, void* param) -{ - EnterCriticalSection(&queue_lock); - issued_reqs++; - LeaveCriticalSection(&queue_lock); -#if 0 - fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr); -#endif - return AddProcRequest(proc,param,onIOComplete); -} - - -int -startupAsyncIO() -{ - if (!StartIOManager()) { - return 0; - } - InitializeCriticalSection(&queue_lock); - /* Create a pair of events: - * - * - completed_req_event -- signals the deposit of request result; - * manual reset. - * - abandon_req_wait -- external OS thread tells current - * RTS/Scheduler thread to abandon wait - * for IO request completion. - * Auto reset. - */ - completed_req_event = CreateEvent (NULL, TRUE, FALSE, NULL); - abandon_req_wait = CreateEvent (NULL, FALSE, FALSE, NULL); - wait_handles[0] = completed_req_event; - wait_handles[1] = abandon_req_wait; - completed_hw = 0; - if ( !(completed_table_sema = CreateSemaphore(NULL, MAX_REQUESTS, - MAX_REQUESTS, NULL)) ) { - DWORD rc = GetLastError(); - fprintf(stderr, "startupAsyncIO: CreateSemaphore failed 0x%x\n", - (int)rc); - fflush(stderr); - } - - return ( completed_req_event != INVALID_HANDLE_VALUE && - abandon_req_wait != INVALID_HANDLE_VALUE && - completed_table_sema != NULL ); -} - -void -shutdownAsyncIO(bool wait_threads) -{ - ShutdownIOManager(wait_threads); - if (completed_req_event != INVALID_HANDLE_VALUE) { - CloseHandle(completed_req_event); - completed_req_event = INVALID_HANDLE_VALUE; - } - if (abandon_req_wait != INVALID_HANDLE_VALUE) { - CloseHandle(abandon_req_wait); - abandon_req_wait = INVALID_HANDLE_VALUE; - } - if (completed_table_sema != NULL) { - CloseHandle(completed_table_sema); - completed_table_sema = NULL; - } - DeleteCriticalSection(&queue_lock); -} - -/* - * Function: awaitRequests(wait) - * - * Check for the completion of external IO work requests. Worker - * threads signal completion of IO requests by depositing them - * in a table (completedTable). awaitRequests() matches up - * requests in that table with threads on the blocked_queue, - * making the threads whose IO requests have completed runnable - * again. - * - * awaitRequests() is called by the scheduler periodically _or_ if - * it is out of work, and need to wait for the completion of IO - * requests to make further progress. In the latter scenario, - * awaitRequests() will simply block waiting for worker threads - * to complete if the 'completedTable' is empty. - */ -int -awaitRequests(bool wait) -{ -#if !defined(THREADED_RTS) - // none of this is actually used in the threaded RTS - -start: -#if 0 - fprintf(stderr, "awaitRequests(): %d %d %d\n", - issued_reqs, completed_hw, wait); - fflush(stderr); -#endif - EnterCriticalSection(&queue_lock); - // Nothing immediately available & we won't wait - if ((!wait && completed_hw == 0) -#if 0 - // If we just return when wait==false, we'll go into a busy - // wait loop, so I disabled this condition --SDM 18/12/2003 - (issued_reqs == 0 && completed_hw == 0) -#endif - ) { - LeaveCriticalSection(&queue_lock); - return 0; - } - if (completed_hw == 0) { - // empty table, drop lock and wait - LeaveCriticalSection(&queue_lock); - if ( wait && sched_state == SCHED_RUNNING ) { - DWORD dwRes = WaitForMultipleObjects(2, wait_handles, - FALSE, INFINITE); - switch (dwRes) { - case WAIT_OBJECT_0: - // a request was completed - break; - case WAIT_OBJECT_0 + 1: - case WAIT_TIMEOUT: - // timeout (unlikely) or told to abandon waiting - return 0; - case WAIT_FAILED: { - DWORD dw = GetLastError(); - fprintf(stderr, "awaitRequests: wait failed -- " - "error code: %lu\n", dw); fflush(stderr); - return 0; - } - default: - fprintf(stderr, "awaitRequests: unexpected wait return " - "code %lu\n", dwRes); fflush(stderr); - return 0; - } - } else { - return 0; - } - goto start; - } else { - int i; - StgTSO *tso, *prev; - - for (i=0; i < completed_hw; i++) { - /* For each of the completed requests, match up their Ids - * with those of the threads on the blocked_queue. If the - * thread that made the IO request has been subsequently - * killed (and removed from blocked_queue), no match will - * be found for that request Id. - * - * i.e., killing a Haskell thread doesn't attempt to cancel - * the IO request it is blocked on. - * - */ - unsigned int rID = completedTable[i].reqID; - - prev = NULL; - for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; - tso = tso->_link) { - - switch(tso->why_blocked) { - case BlockedOnRead: - case BlockedOnWrite: - case BlockedOnDoProc: - if (tso->block_info.async_result->reqID == rID) { - // Found the thread blocked waiting on request; - // stodgily fill - // in its result block. - tso->block_info.async_result->len = - completedTable[i].len; - tso->block_info.async_result->errCode = - completedTable[i].errCode; - - // Drop the matched TSO from blocked_queue - if (prev) { - setTSOLink(&MainCapability, prev, tso->_link); - } else { - blocked_queue_hd = tso->_link; - } - if (blocked_queue_tl == tso) { - blocked_queue_tl = prev ? prev : END_TSO_QUEUE; - } - - // Terminates the run queue + this inner for-loop. - tso->_link = END_TSO_QUEUE; - tso->why_blocked = NotBlocked; - // save the StgAsyncIOResult in the - // stg_block_async_info stack frame, because - // the block_info field will be overwritten by - // pushOnRunQueue(). - tso->stackobj->sp[1] = (W_)tso->block_info.async_result; - pushOnRunQueue(&MainCapability, tso); - break; - } - break; - default: - if (tso->why_blocked != NotBlocked) { - barf("awaitRequests: odd thread state"); - } - break; - } - - prev = tso; - } - /* Signal that there's completed table slots available */ - if ( !ReleaseSemaphore(completed_table_sema, 1, NULL) ) { - DWORD dw = GetLastError(); - fprintf(stderr, "awaitRequests: failed to signal semaphore " - "(error code=0x%x)\n", (int)dw); - fflush(stderr); - } - } - completed_hw = 0; - ResetEvent(completed_req_event); - LeaveCriticalSection(&queue_lock); - return 1; - } -#endif /* !THREADED_RTS */ -} - -/* - * Function: abandonRequestWait() - * - * Wake up a thread that's blocked waiting for new IO requests - * to complete (via awaitRequests().) - */ -void -abandonRequestWait( void ) -{ - /* the event is auto-reset, but in case there's no thread - * already waiting on the event, we want to return it to - * a non-signalled state. - * - * Careful! There is no synchronisation between - * abandonRequestWait and awaitRequest, which means that - * abandonRequestWait might be called just before a thread - * goes into a wait, and we miss the abandon signal. So we - * must SetEvent() here rather than PulseEvent() to ensure - * that the event isn't lost. We can re-optimise by resetting - * the event somewhere safe if we know the event has been - * properly serviced (see resetAbandon() below). --SDM 18/12/2003 - */ - SetEvent(abandon_req_wait); -} - -void -resetAbandonRequestWait( void ) -{ - ResetEvent(abandon_req_wait); -} - -#endif /* !defined(THREADED_RTS) */ diff --git a/rts/win32/AsyncIO.h b/rts/win32/AsyncIO.h deleted file mode 100644 index 75d0e0460d..0000000000 --- a/rts/win32/AsyncIO.h +++ /dev/null @@ -1,24 +0,0 @@ -/* AsyncIO.h - * - * Integrating Win32 asynchronous I/O with the GHC RTS. - * - * (c) sof, 2002-2003. - */ - -#pragma once - -extern unsigned int -addIORequest(int fd, - bool forWriting, - bool isSock, - HsInt len, - char* buf); -extern unsigned int addDelayRequest(HsInt usecs); -extern unsigned int addDoProcRequest(void* proc, void* param); -extern int startupAsyncIO(void); -extern void shutdownAsyncIO(bool wait_threads); - -extern int awaitRequests(bool wait); - -extern void abandonRequestWait(void); -extern void resetAbandonRequestWait(void); diff --git a/rts/win32/AwaitEvent.c b/rts/win32/AwaitEvent.c index b639121c87..6a621d6ef5 100644 --- a/rts/win32/AwaitEvent.c +++ b/rts/win32/AwaitEvent.c @@ -14,15 +14,18 @@ * */ #include "Rts.h" +#include "RtsFlags.h" #include "Schedule.h" #include "AwaitEvent.h" #include <windows.h> -#include "win32/AsyncIO.h" +#include "win32/AsyncMIO.h" +#include "win32/AsyncWinIO.h" #include "win32/ConsoleHandler.h" +#include <stdbool.h> // Used to avoid calling abandonRequestWait() if we don't need to. // Protected by sched_mutex. -static uint32_t workerWaitingForRequests = 0; +static bool workerWaitingForRequests = false; void awaitEvent(bool wait) @@ -30,9 +33,12 @@ awaitEvent(bool wait) do { /* Try to de-queue completed IO requests */ - workerWaitingForRequests = 1; - awaitRequests(wait); - workerWaitingForRequests = 0; + workerWaitingForRequests = true; + if (is_io_mng_native_p()) + awaitAsyncRequests(wait); + else + awaitRequests(wait); + workerWaitingForRequests = false; // If a signal was raised, we need to service it // XXX the scheduler loop really should be calling diff --git a/rts/win32/ConsoleHandler.c b/rts/win32/ConsoleHandler.c index 3ddf4103da..05d15868eb 100644 --- a/rts/win32/ConsoleHandler.c +++ b/rts/win32/ConsoleHandler.c @@ -1,13 +1,15 @@ /* * Console control handler support. * + * NOTE: This is the MIO manager, only used for --io-manager=posix. + * For the WINIO manager see base in the GHC.Event modules. */ #include "Rts.h" #include <windows.h> #include "ConsoleHandler.h" #include "Schedule.h" #include "RtsUtils.h" -#include "AsyncIO.h" +#include "AsyncMIO.h" #include "RtsSignals.h" extern int stg_InstallConsoleEvent(int action, StgStablePtr *handler); @@ -86,7 +88,6 @@ static BOOL WINAPI shutdown_handler(DWORD dwCtrlType) return false; case CTRL_C_EVENT: case CTRL_BREAK_EVENT: - // If we're already trying to interrupt the RTS, terminate with // extreme prejudice. So the first ^C tries to exit the program // cleanly, and the second one just kills it. @@ -223,12 +224,12 @@ static BOOL WINAPI generic_handler(DWORD dwCtrlType) #if defined(THREADED_RTS) sendIOManagerEvent((StgWord8) ((dwCtrlType<<1) | 1)); + interruptIOManagerEvent (); #else if ( stg_pending_events < N_PENDING_EVENTS ) { stg_pending_buf[stg_pending_events] = dwCtrlType; stg_pending_events++; } - // we need to wake up awaitEvent() abandonRequestWait(); #endif diff --git a/rts/win32/ConsoleHandler.h b/rts/win32/ConsoleHandler.h index 06af9dd0d0..bb7278abba 100644 --- a/rts/win32/ConsoleHandler.h +++ b/rts/win32/ConsoleHandler.h @@ -1,6 +1,8 @@ /* * Console control handler support. * + * NOTE: This is the MIO manager, only used for --io-manager=posix. + * For the WINIO manager see base in the GHC.Event modules. */ #pragma once @@ -16,24 +18,24 @@ */ #if !defined(THREADED_RTS) -/* +/* * under THREADED_RTS, console events are passed to the IO manager * thread, which starts up the handler. See ThrIOManager.c. */ /* - * Function: signals_pending() - * + * Function: signals_pending() + * * Used by the RTS to check whether new signals have been 'recently' reported. - * If so, the RTS arranges for the delivered signals to be handled by - * de-queueing them from their table, running the associated Haskell + * If so, the RTS arranges for the delivered signals to be handled by + * de-queueing them from their table, running the associated Haskell * signal handler. */ extern StgInt stg_pending_events; #define signals_pending() ( stg_pending_events > 0) -/* +/* * Function: anyUserHandlers() * * Used by the Scheduler to decide whether its worth its while to stick diff --git a/rts/win32/IOManager.c b/rts/win32/IOManager.c index e5da32b982..47bcf4bcf4 100644 --- a/rts/win32/IOManager.c +++ b/rts/win32/IOManager.c @@ -3,6 +3,9 @@ * Non-blocking / asynchronous I/O for Win32. * * (c) sof, 2002-2003. + * + * NOTE: This is the MIO manager, only used for --io-manager=posix. + * For the WINIO manager see base in the GHC.Event modules. */ #if !defined(THREADED_RTS) diff --git a/rts/win32/IOManager.h b/rts/win32/IOManager.h index a5bd61ab1b..cb876db9cc 100644 --- a/rts/win32/IOManager.h +++ b/rts/win32/IOManager.h @@ -3,6 +3,9 @@ * Non-blocking / asynchronous I/O for Win32. * * (c) sof, 2002-2003 + * + * NOTE: This is the MIO manager, only used for --io-manager=posix. + * For the WINIO manager see base in the GHC.Event modules. */ #pragma once @@ -102,3 +105,5 @@ extern int AddProcRequest ( void* proc, CompletionProc onCompletion); extern void abandonWorkRequest ( int reqID ); + +extern void interruptIOManagerEvent ( void ); diff --git a/rts/win32/ThrIOManager.c b/rts/win32/ThrIOManager.c index 44414b92c3..b70a178faf 100644 --- a/rts/win32/ThrIOManager.c +++ b/rts/win32/ThrIOManager.c @@ -5,6 +5,7 @@ * The IO manager thread in THREADED_RTS. * See also libraries/base/GHC/Conc.hs. * + * NOTE: This is used by both MIO and WINIO * ---------------------------------------------------------------------------*/ #include "Rts.h" @@ -15,28 +16,14 @@ // Here's the Event that we use to wake up the IO manager thread static HANDLE io_manager_event = INVALID_HANDLE_VALUE; -// must agree with values in GHC.Conc: -#define IO_MANAGER_WAKEUP 0xffffffff -#define IO_MANAGER_DIE 0xfffffffe -// spurious wakeups are returned as zero. -// console events are ((event<<1) | 1) - -#if defined(THREADED_RTS) - #define EVENT_BUFSIZ 256 Mutex event_buf_mutex; StgWord32 event_buf[EVENT_BUFSIZ]; uint32_t next_event; -#endif - HANDLE getIOManagerEvent (void) { - // This function has to exist even in the non-THREADED_RTS, - // because code in GHC.Conc refers to it. It won't ever be called - // unless we're in the threaded RTS, however. -#if defined(THREADED_RTS) HANDLE hRes; ACQUIRE_LOCK(&event_buf_mutex); @@ -57,18 +44,12 @@ getIOManagerEvent (void) RELEASE_LOCK(&event_buf_mutex); return hRes; -#else - return NULL; -#endif } HsWord32 readIOManagerEvent (void) { - // This function must exist even in non-THREADED_RTS, - // see getIOManagerEvent() above. -#if defined(THREADED_RTS) HsWord32 res; ACQUIRE_LOCK(&event_buf_mutex); @@ -77,7 +58,11 @@ readIOManagerEvent (void) if (next_event == 0) { res = 0; // no event to return } else { - res = (HsWord32)(event_buf[--next_event]); + do { + // Dequeue as many wakeup events as possible. + res = (HsWord32)(event_buf[--next_event]); + } while (res == IO_MANAGER_WAKEUP && next_event); + if (next_event == 0) { if (!ResetEvent(io_manager_event)) { sysErrorBelch("readIOManagerEvent"); @@ -91,34 +76,45 @@ readIOManagerEvent (void) RELEASE_LOCK(&event_buf_mutex); - // debugBelch("readIOManagerEvent: %d\n", res); + //debugBelch("readIOManagerEvent: %d\n", res); return res; -#else - return 0; -#endif } void sendIOManagerEvent (HsWord32 event) { -#if defined(THREADED_RTS) ACQUIRE_LOCK(&event_buf_mutex); - // debugBelch("sendIOManagerEvent: %d\n", event); + //debugBelch("sendIOManagerEvent: %d to %p\n", event, io_manager_event); if (io_manager_event != INVALID_HANDLE_VALUE) { if (next_event == EVENT_BUFSIZ) { errorBelch("event buffer overflowed; event dropped"); } else { + event_buf[next_event++] = (StgWord32)event; if (!SetEvent(io_manager_event)) { - sysErrorBelch("sendIOManagerEvent"); + sysErrorBelch("sendIOManagerEvent: SetEvent"); stg_exit(EXIT_FAILURE); } - event_buf[next_event++] = (StgWord32)event; } } RELEASE_LOCK(&event_buf_mutex); -#endif +} + +void +interruptIOManagerEvent (void) +{ + if (is_io_mng_native_p ()) { + ACQUIRE_LOCK(&event_buf_mutex); + + /* How expensive is this??. */ + Capability *cap; + cap = rts_lock(); + rts_evalIO(&cap, interruptIOManager_closure, NULL); + rts_unlock(cap); + + RELEASE_LOCK(&event_buf_mutex); + } } void @@ -127,7 +123,6 @@ ioManagerWakeup (void) sendIOManagerEvent(IO_MANAGER_WAKEUP); } -#if defined(THREADED_RTS) void ioManagerDie (void) { @@ -145,7 +140,9 @@ ioManagerDie (void) void ioManagerStart (void) { +#if defined(THREADED_RTS) initMutex(&event_buf_mutex); +#endif next_event = 0; // Make sure the IO manager thread is running @@ -156,4 +153,3 @@ ioManagerStart (void) rts_unlock(cap); } } -#endif diff --git a/rts/win32/libHSbase.def b/rts/win32/libHSbase.def index de4db2244b..fb705bbd9f 100644 --- a/rts/win32/libHSbase.def +++ b/rts/win32/libHSbase.def @@ -27,8 +27,10 @@ EXPORTS base_GHCziPtr_FunPtr_con_info base_GHCziConcziIO_ensureIOManagerIsRunning_closure + base_GHCziConcziIO_interruptIOManager_closure base_GHCziConcziIO_ioManagerCapabilitiesChanged_closure base_GHCziConcziSync_runSparks_closure + base_GHCziEventziWindows_processRemoteCompletion_closure base_GHCziTopHandler_flushStdHandles_closure |