diff options
Diffstat (limited to 'rts/win32')
-rw-r--r-- | rts/win32/AsyncIO.c | 345 | ||||
-rw-r--r-- | rts/win32/AsyncIO.h | 25 | ||||
-rw-r--r-- | rts/win32/AwaitEvent.c | 51 | ||||
-rw-r--r-- | rts/win32/ConsoleHandler.c | 313 | ||||
-rw-r--r-- | rts/win32/ConsoleHandler.h | 63 | ||||
-rw-r--r-- | rts/win32/GetTime.c | 101 | ||||
-rw-r--r-- | rts/win32/IOManager.c | 510 | ||||
-rw-r--r-- | rts/win32/IOManager.h | 110 | ||||
-rw-r--r-- | rts/win32/OSThreads.c | 199 | ||||
-rw-r--r-- | rts/win32/Ticker.c | 124 | ||||
-rw-r--r-- | rts/win32/WorkQueue.c | 215 | ||||
-rw-r--r-- | rts/win32/WorkQueue.h | 37 |
12 files changed, 2093 insertions, 0 deletions
diff --git a/rts/win32/AsyncIO.c b/rts/win32/AsyncIO.c new file mode 100644 index 0000000000..7bcf571cf8 --- /dev/null +++ b/rts/win32/AsyncIO.c @@ -0,0 +1,345 @@ +/* AsyncIO.c + * + * Integrating Win32 asynchronous I/O with the GHC RTS. + * + * (c) sof, 2002-2003. + */ +#include "Rts.h" +#include "RtsUtils.h" +#include <windows.h> +#include <stdio.h> +#include "Schedule.h" +#include "RtsFlags.h" +#include "Capability.h" +#include "win32/AsyncIO.h" +#include "win32/IOManager.h" + +/* + * Overview: + * + * Haskell code issue asynchronous I/O requests via the + * async{Read,Write,DoOp}# primops. These cause addIORequest() + * to be invoked, which forwards the request to the underlying + * asynchronous I/O subsystem. Each request is tagged with a unique + * ID. + * + * addIORequest() returns this ID, so that when the blocked CH + * thread is added onto blocked_queue, its TSO is annotated with + * it. Upon completion of an I/O request, the async I/O handling + * code makes a back-call to signal its completion; the local + * onIOComplete() routine. It adds the IO request ID (along with + * its result data) to a queue of completed requests before returning. + * + * The queue of completed IO request is read by the thread operating + * the RTS scheduler. It de-queues the CH threads corresponding + * to the request IDs, making them runnable again. + * + */ + +typedef struct CompletedReq { + unsigned int reqID; + int len; + int errCode; +} CompletedReq; + +#define MAX_REQUESTS 200 + +static CRITICAL_SECTION queue_lock; +static HANDLE completed_req_event; +static HANDLE abandon_req_wait; +static HANDLE wait_handles[2]; +static CompletedReq completedTable[MAX_REQUESTS]; +static int completed_hw; +static HANDLE completed_table_sema; +static int issued_reqs; + +static void +onIOComplete(unsigned int reqID, + int fd STG_UNUSED, + int len, + void* buf STG_UNUSED, + int errCode) +{ + DWORD dwRes; + /* Deposit result of request in queue/table..when there's room. */ + dwRes = WaitForSingleObject(completed_table_sema, INFINITE); + switch (dwRes) { + case WAIT_OBJECT_0: + break; + default: + /* Not likely */ + fprintf(stderr, "onIOComplete: failed to grab table semaphore, dropping request 0x%x\n", reqID); + fflush(stderr); + return; + } + EnterCriticalSection(&queue_lock); + if (completed_hw == MAX_REQUESTS) { + /* Shouldn't happen */ + fprintf(stderr, "onIOComplete: ERROR -- Request table overflow (%d); dropping.\n", reqID); + fflush(stderr); + } else { +#if 0 + fprintf(stderr, "onCompl: %d %d %d %d %d\n", + reqID, len, errCode, issued_reqs, completed_hw); + fflush(stderr); +#endif + completedTable[completed_hw].reqID = reqID; + completedTable[completed_hw].len = len; + completedTable[completed_hw].errCode = errCode; + completed_hw++; + issued_reqs--; + if (completed_hw == 1) { + /* The event is used to wake up the scheduler thread should it + * be blocked waiting for requests to complete. The event resets once + * that thread has cleared out the request queue/table. + */ + SetEvent(completed_req_event); + } + } + LeaveCriticalSection(&queue_lock); +} + +unsigned int +addIORequest(int fd, + int forWriting, + int isSock, + int len, + char* buf) +{ + EnterCriticalSection(&queue_lock); + issued_reqs++; + LeaveCriticalSection(&queue_lock); +#if 0 + fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr); +#endif + return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete); +} + +unsigned int +addDelayRequest(int msecs) +{ + EnterCriticalSection(&queue_lock); + issued_reqs++; + LeaveCriticalSection(&queue_lock); +#if 0 + fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr); +#endif + return AddDelayRequest(msecs,onIOComplete); +} + +unsigned int +addDoProcRequest(void* proc, void* param) +{ + EnterCriticalSection(&queue_lock); + issued_reqs++; + LeaveCriticalSection(&queue_lock); +#if 0 + fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr); +#endif + return AddProcRequest(proc,param,onIOComplete); +} + + +int +startupAsyncIO() +{ + if (!StartIOManager()) { + return 0; + } + InitializeCriticalSection(&queue_lock); + /* Create a pair of events: + * + * - completed_req_event -- signals the deposit of request result; manual reset. + * - abandon_req_wait -- external OS thread tells current RTS/Scheduler + * thread to abandon wait for IO request completion. + * Auto reset. + */ + completed_req_event = CreateEvent (NULL, TRUE, FALSE, NULL); + abandon_req_wait = CreateEvent (NULL, FALSE, FALSE, NULL); + wait_handles[0] = completed_req_event; + wait_handles[1] = abandon_req_wait; + completed_hw = 0; + if ( !(completed_table_sema = CreateSemaphore (NULL, MAX_REQUESTS, MAX_REQUESTS, NULL)) ) { + DWORD rc = GetLastError(); + fprintf(stderr, "startupAsyncIO: CreateSemaphore failed 0x%x\n", rc); + fflush(stderr); + } + + return ( completed_req_event != INVALID_HANDLE_VALUE && + abandon_req_wait != INVALID_HANDLE_VALUE && + completed_table_sema != NULL ); +} + +void +shutdownAsyncIO() +{ + CloseHandle(completed_req_event); + ShutdownIOManager(); +} + +/* + * Function: awaitRequests(wait) + * + * Check for the completion of external IO work requests. Worker + * threads signal completion of IO requests by depositing them + * in a table (completedTable). awaitRequests() matches up + * requests in that table with threads on the blocked_queue, + * making the threads whose IO requests have completed runnable + * again. + * + * awaitRequests() is called by the scheduler periodically _or_ if + * it is out of work, and need to wait for the completion of IO + * requests to make further progress. In the latter scenario, + * awaitRequests() will simply block waiting for worker threads + * to complete if the 'completedTable' is empty. + */ +int +awaitRequests(rtsBool wait) +{ +#ifndef THREADED_RTS + // none of this is actually used in the threaded RTS + +start: +#if 0 + fprintf(stderr, "awaitRequests(): %d %d %d\n", issued_reqs, completed_hw, wait); + fflush(stderr); +#endif + EnterCriticalSection(&queue_lock); + /* Nothing immediately available & we won't wait */ + if ((!wait && completed_hw == 0) +#if 0 + // If we just return when wait==rtsFalse, we'll go into a busy + // wait loop, so I disabled this condition --SDM 18/12/2003 + (issued_reqs == 0 && completed_hw == 0) +#endif + ) { + LeaveCriticalSection(&queue_lock); + return 0; + } + if (completed_hw == 0) { + /* empty table, drop lock and wait */ + LeaveCriticalSection(&queue_lock); + if ( wait && sched_state == SCHED_RUNNING ) { + DWORD dwRes = WaitForMultipleObjects(2, wait_handles, FALSE, INFINITE); + switch (dwRes) { + case WAIT_OBJECT_0: + /* a request was completed */ + break; + case WAIT_OBJECT_0 + 1: + case WAIT_TIMEOUT: + /* timeout (unlikely) or told to abandon waiting */ + return 0; + case WAIT_FAILED: { + DWORD dw = GetLastError(); + fprintf(stderr, "awaitRequests: wait failed -- error code: %lu\n", dw); fflush(stderr); + return 0; + } + default: + fprintf(stderr, "awaitRequests: unexpected wait return code %lu\n", dwRes); fflush(stderr); + return 0; + } + } else { + return 0; + } + goto start; + } else { + int i; + StgTSO *tso, *prev; + + for (i=0; i < completed_hw; i++) { + /* For each of the completed requests, match up their Ids + * with those of the threads on the blocked_queue. If the + * thread that made the IO request has been subsequently + * killed (and removed from blocked_queue), no match will + * be found for that request Id. + * + * i.e., killing a Haskell thread doesn't attempt to cancel + * the IO request it is blocked on. + * + */ + unsigned int rID = completedTable[i].reqID; + + prev = NULL; + for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; prev = tso, tso = tso->link) { + + switch(tso->why_blocked) { + case BlockedOnRead: + case BlockedOnWrite: + case BlockedOnDoProc: + if (tso->block_info.async_result->reqID == rID) { + /* Found the thread blocked waiting on request; stodgily fill + * in its result block. + */ + tso->block_info.async_result->len = completedTable[i].len; + tso->block_info.async_result->errCode = completedTable[i].errCode; + + /* Drop the matched TSO from blocked_queue */ + if (prev) { + prev->link = tso->link; + } else { + blocked_queue_hd = tso->link; + } + if (blocked_queue_tl == tso) { + blocked_queue_tl = prev ? prev : END_TSO_QUEUE; + } + + /* Terminates the run queue + this inner for-loop. */ + tso->link = END_TSO_QUEUE; + tso->why_blocked = NotBlocked; + pushOnRunQueue(&MainCapability, tso); + break; + } + break; + default: + if (tso->why_blocked != NotBlocked) { + barf("awaitRequests: odd thread state"); + } + break; + } + } + /* Signal that there's completed table slots available */ + if ( !ReleaseSemaphore(completed_table_sema, 1, NULL) ) { + DWORD dw = GetLastError(); + fprintf(stderr, "awaitRequests: failed to signal semaphore (error code=0x%x)\n", dw); + fflush(stderr); + } + } + completed_hw = 0; + ResetEvent(completed_req_event); + LeaveCriticalSection(&queue_lock); + return 1; + } +#endif /* !THREADED_RTS */ +} + +/* + * Function: abandonRequestWait() + * + * Wake up a thread that's blocked waiting for new IO requests + * to complete (via awaitRequests().) + */ +void +abandonRequestWait( void ) +{ + /* the event is auto-reset, but in case there's no thread + * already waiting on the event, we want to return it to + * a non-signalled state. + * + * Careful! There is no synchronisation between + * abandonRequestWait and awaitRequest, which means that + * abandonRequestWait might be called just before a thread + * goes into a wait, and we miss the abandon signal. So we + * must SetEvent() here rather than PulseEvent() to ensure + * that the event isn't lost. We can re-optimise by resetting + * the event somewhere safe if we know the event has been + * properly serviced (see resetAbandon() below). --SDM 18/12/2003 + */ + SetEvent(abandon_req_wait); +} + +void +resetAbandonRequestWait( void ) +{ + ResetEvent(abandon_req_wait); +} + diff --git a/rts/win32/AsyncIO.h b/rts/win32/AsyncIO.h new file mode 100644 index 0000000000..2077ea0cf7 --- /dev/null +++ b/rts/win32/AsyncIO.h @@ -0,0 +1,25 @@ +/* AsyncIO.h + * + * Integrating Win32 asynchronous I/O with the GHC RTS. + * + * (c) sof, 2002-2003. + */ +#ifndef __ASYNCHIO_H__ +#define __ASYNCHIO_H__ +extern unsigned int +addIORequest(int fd, + int forWriting, + int isSock, + int len, + char* buf); +extern unsigned int addDelayRequest(int msecs); +extern unsigned int addDoProcRequest(void* proc, void* param); +extern int startupAsyncIO(void); +extern void shutdownAsyncIO(void); + +extern int awaitRequests(rtsBool wait); + +extern void abandonRequestWait(void); +extern void resetAbandonRequestWait(void); + +#endif /* __ASYNCHIO_H__ */ diff --git a/rts/win32/AwaitEvent.c b/rts/win32/AwaitEvent.c new file mode 100644 index 0000000000..43e188fb34 --- /dev/null +++ b/rts/win32/AwaitEvent.c @@ -0,0 +1,51 @@ +#if !defined(THREADED_RTS) /* to the end */ +/* + * Wait/check for external events. Periodically, the + * Scheduler checks for the completion of external operations, + * like the expiration of timers, completion of I/O requests + * issued by Haskell threads. + * + * If the Scheduler is otherwise out of work, it'll block + * herein waiting for external events to occur. + * + * This file mirrors the select()-based functionality + * for POSIX / Unix platforms in rts/Select.c, but for + * Win32. + * + */ +#include "Rts.h" +#include "Schedule.h" +#include "AwaitEvent.h" +#include <windows.h> +#include "win32/AsyncIO.h" + +// Used to avoid calling abandonRequestWait() if we don't need to. +// Protected by sched_mutex. +static nat workerWaitingForRequests = 0; + +void +awaitEvent(rtsBool wait) +{ + int ret; + + do { + /* Try to de-queue completed IO requests + */ + workerWaitingForRequests = 1; + ret = awaitRequests(wait); + workerWaitingForRequests = 0; + if (!ret) { + return; /* still hold the lock */ + } + + // Return to the scheduler if: + // + // - we were interrupted + // - new threads have arrived + + } while (wait + && sched_state == SCHED_RUNNING + && emptyRunQueue(&MainCapability) + ); +} +#endif diff --git a/rts/win32/ConsoleHandler.c b/rts/win32/ConsoleHandler.c new file mode 100644 index 0000000000..d7096db632 --- /dev/null +++ b/rts/win32/ConsoleHandler.c @@ -0,0 +1,313 @@ +/* + * Console control handler support. + * + */ +#include "Rts.h" +#include <windows.h> +#include "ConsoleHandler.h" +#include "SchedAPI.h" +#include "Schedule.h" +#include "RtsUtils.h" +#include "RtsFlags.h" +#include "AsyncIO.h" +#include "RtsSignals.h" + +extern int stg_InstallConsoleEvent(int action, StgStablePtr *handler); + +static BOOL WINAPI shutdown_handler(DWORD dwCtrlType); +static BOOL WINAPI generic_handler(DWORD dwCtrlType); + +static rtsBool deliver_event = rtsTrue; +static StgInt console_handler = STG_SIG_DFL; + +static HANDLE hConsoleEvent = INVALID_HANDLE_VALUE; + +#define N_PENDING_EVENTS 16 +StgInt stg_pending_events = 0; /* number of undelivered events */ +DWORD stg_pending_buf[N_PENDING_EVENTS]; /* their associated event numbers. */ + +/* + * Function: initUserSignals() + * + * Initialize the console handling substrate. + */ +void +initUserSignals(void) +{ + stg_pending_events = 0; + console_handler = STG_SIG_DFL; + if (hConsoleEvent == INVALID_HANDLE_VALUE) { + hConsoleEvent = + CreateEvent ( NULL, /* default security attributes */ + TRUE, /* manual-reset event */ + FALSE, /* initially non-signalled */ + NULL); /* no name */ + } + return; +} + +/* + * Function: shutdown_handler() + * + * Local function that performs the default handling of Ctrl+C kind + * events; gently shutting down the RTS + * + * To repeat Signals.c remark -- user code may choose to override the + * default handler. Which is fine, assuming they put back the default + * handler when/if they de-install the custom handler. + * + */ +static BOOL WINAPI shutdown_handler(DWORD dwCtrlType) +{ + switch (dwCtrlType) { + + case CTRL_CLOSE_EVENT: + /* see generic_handler() comment re: this event */ + return FALSE; + case CTRL_C_EVENT: + case CTRL_BREAK_EVENT: + + // If we're already trying to interrupt the RTS, terminate with + // extreme prejudice. So the first ^C tries to exit the program + // cleanly, and the second one just kills it. + if (sched_state >= SCHED_INTERRUPTING) { + stg_exit(EXIT_INTERRUPTED); + } else { + interruptStgRts(); + /* Cheesy pulsing of an event to wake up a waiting RTS thread, if any */ + abandonRequestWait(); + resetAbandonRequestWait(); + } + return TRUE; + + /* shutdown + logoff events are not handled here. */ + default: + return FALSE; + } +} + + +/* + * Function: initDefaultHandlers() + * + * Install any default signal/console handlers. Currently we install a + * Ctrl+C handler that shuts down the RTS in an orderly manner. + */ +void initDefaultHandlers(void) +{ + if ( !SetConsoleCtrlHandler(shutdown_handler, TRUE) ) { + errorBelch("warning: failed to install default console handler"); + } +} + + +/* + * Function: blockUserSignals() + * + * Temporarily block the delivery of further console events. Needed to + * avoid race conditions when GCing the stack of outstanding handlers or + * when emptying the stack by running the handlers. + * + */ +void +blockUserSignals(void) +{ + deliver_event = rtsFalse; +} + + +/* + * Function: unblockUserSignals() + * + * The inverse of blockUserSignals(); re-enable the deliver of console events. + */ +void +unblockUserSignals(void) +{ + deliver_event = rtsTrue; +} + + +/* + * Function: awaitUserSignals() + * + * Wait for the next console event. Currently a NOP (returns immediately.) + */ +void awaitUserSignals(void) +{ + return; +} + + +/* + * Function: startSignalHandlers() + * + * Run the handlers associated with the stacked up console events. Console + * event delivery is blocked for the duration of this call. + */ +void startSignalHandlers(Capability *cap) +{ + StgStablePtr handler; + + if (console_handler < 0) { + return; + } + + blockUserSignals(); + ACQUIRE_LOCK(&sched_mutex); + + handler = deRefStablePtr((StgStablePtr)console_handler); + while (stg_pending_events > 0) { + stg_pending_events--; + scheduleThread(cap, + createIOThread(cap, + RtsFlags.GcFlags.initialStkSize, + rts_apply(cap, + (StgClosure *)handler, + rts_mkInt(cap, + stg_pending_buf[stg_pending_events])))); + } + + RELEASE_LOCK(&sched_mutex); + unblockUserSignals(); +} + +/* + * Function: markSignalHandlers() + * + * Evacuate the handler stack. _Assumes_ that console event delivery + * has already been blocked. + */ +void markSignalHandlers (evac_fn evac) +{ + if (console_handler >= 0) { + StgPtr p = deRefStablePtr((StgStablePtr)console_handler); + evac((StgClosure**)(void *)&p); + } +} + + +/* + * Function: generic_handler() + * + * Local function which handles incoming console event (done in a sep OS thread), + * recording the event in stg_pending_events. + */ +static BOOL WINAPI generic_handler(DWORD dwCtrlType) +{ + ACQUIRE_LOCK(&sched_mutex); + + /* Ultra-simple -- up the counter + signal a switch. */ + switch(dwCtrlType) { + case CTRL_CLOSE_EVENT: + /* Don't support the delivery of this event; if we + * indicate that we've handled it here and the Haskell handler + * doesn't take proper action (e.g., terminate the OS process), + * the user of the app will be unable to kill/close it. Not + * good, so disable the delivery for now. + */ + return FALSE; + default: + if (!deliver_event) return TRUE; + + if ( stg_pending_events < N_PENDING_EVENTS ) { + stg_pending_buf[stg_pending_events] = dwCtrlType; + stg_pending_events++; + } + /* Cheesy pulsing of an event to wake up a waiting RTS thread, if any */ + abandonRequestWait(); + resetAbandonRequestWait(); + return TRUE; + } + + RELEASE_LOCK(&sched_mutex); +} + + +/* + * Function: rts_InstallConsoleEvent() + * + * Install/remove a console event handler. + */ +int +rts_InstallConsoleEvent(int action, StgStablePtr *handler) +{ + StgInt previous_hdlr = console_handler; + + switch (action) { + case STG_SIG_IGN: + console_handler = STG_SIG_IGN; + if ( !SetConsoleCtrlHandler(NULL, TRUE) ) { + errorBelch("warning: unable to ignore console events"); + } + break; + case STG_SIG_DFL: + console_handler = STG_SIG_IGN; + if ( !SetConsoleCtrlHandler(NULL, FALSE) ) { + errorBelch("warning: unable to restore default console event handling"); + } + break; + case STG_SIG_HAN: + console_handler = (StgInt)*handler; + if ( previous_hdlr < 0 ) { + /* Only install generic_handler() once */ + if ( !SetConsoleCtrlHandler(generic_handler, TRUE) ) { + errorBelch("warning: unable to install console event handler"); + } + } + break; + } + + if (previous_hdlr == STG_SIG_DFL || + previous_hdlr == STG_SIG_IGN) { + return previous_hdlr; + } else { + *handler = (StgStablePtr)previous_hdlr; + return STG_SIG_HAN; + } +} + +/* + * Function: rts_HandledConsoleEvent() + * + * Signal that a Haskell console event handler has completed its run. + * The explicit notification that a Haskell handler has completed is + * required to better handle the delivery of Ctrl-C/Break events whilst + * an async worker thread is handling a read request on stdin. The + * Win32 console implementation will abort such a read request when Ctrl-C + * is delivered. That leaves the worker thread in a bind: should it + * abandon the request (the Haskell thread reading from stdin has been + * thrown an exception to signal the delivery of Ctrl-C & hence have + * aborted the I/O request) or simply ignore the aborted read and retry? + * (the Haskell thread reading from stdin isn't concerned with the + * delivery and handling of Ctrl-C.) With both scenarios being + * possible, the worker thread needs to be told -- that is, did the + * console event handler cause the IO request to be abandoned? + * + */ +void +rts_ConsoleHandlerDone(int ev) +{ + if ( (DWORD)ev == CTRL_BREAK_EVENT || + (DWORD)ev == CTRL_C_EVENT ) { + /* only these two cause stdin system calls to abort.. */ + SetEvent(hConsoleEvent); /* event is manual-reset */ + Sleep(0); /* yield */ + ResetEvent(hConsoleEvent); /* turn it back off again */ + } +} + +/* + * Function: rts_waitConsoleHandlerCompletion() + * + * Esoteric entry point used by worker thread that got woken + * up as part Ctrl-C delivery. + */ +int +rts_waitConsoleHandlerCompletion() +{ + /* As long as the worker doesn't need to do a multiple wait, + * let's keep this HANDLE private to this 'module'. + */ + return (WaitForSingleObject(hConsoleEvent, INFINITE) == WAIT_OBJECT_0); +} diff --git a/rts/win32/ConsoleHandler.h b/rts/win32/ConsoleHandler.h new file mode 100644 index 0000000000..b09adf71cb --- /dev/null +++ b/rts/win32/ConsoleHandler.h @@ -0,0 +1,63 @@ +/* + * Console control handler support. + * + */ +#ifndef __CONSOLEHANDLER_H__ +#define __CONSOLEHANDLER_H__ + +/* + * Console control handlers lets an application handle Ctrl+C, Ctrl+Break etc. + * in Haskell under Win32. Akin to the Unix signal SIGINT. + * + * The API offered by ConsoleHandler.h is identical to that of the signal handling + * code (which isn't supported under win32.) Unsurprisingly, the underlying impl + * is derived from the signal handling code also. + */ + +/* + * Function: signals_pending() + * + * Used by the RTS to check whether new signals have been 'recently' reported. + * If so, the RTS arranges for the delivered signals to be handled by + * de-queueing them from their table, running the associated Haskell + * signal handler. + */ +extern StgInt stg_pending_events; + +#define signals_pending() ( stg_pending_events > 0) + +/* + * Function: anyUserHandlers() + * + * Used by the Scheduler to decide whether its worth its while to stick + * around waiting for an external signal when there are no threads + * runnable. A console handler is used to handle termination events (Ctrl+C) + * and isn't considered a 'user handler'. + */ +#define anyUserHandlers() (rtsFalse) + +/* + * Function: startSignalHandlers() + * + * Run the handlers associated with the queued up console events. Console + * event delivery is blocked for the duration of this call. + */ +extern void startSignalHandlers(Capability *cap); + +/* + * Function: handleSignalsInThisThread() + * + * Have current (OS) thread assume responsibility of handling console events/signals. + * Currently not used (by the console event handling code.) + */ +extern void handleSignalsInThisThread(void); + +/* + * Function: rts_waitConsoleHandlerCompletion() + * + * Esoteric entry point used by worker thread that got woken + * up as part Ctrl-C delivery. + */ +extern int rts_waitConsoleHandlerCompletion(void); + +#endif /* __CONSOLEHANDLER_H__ */ diff --git a/rts/win32/GetTime.c b/rts/win32/GetTime.c new file mode 100644 index 0000000000..584b994d53 --- /dev/null +++ b/rts/win32/GetTime.c @@ -0,0 +1,101 @@ +/* ----------------------------------------------------------------------------- + * + * (c) The GHC Team 2005 + * + * Machine-dependent time measurement functions + * + * ---------------------------------------------------------------------------*/ + +#include "Rts.h" +#include "GetTime.h" + +#include <windows.h> + +#ifdef HAVE_TIME_H +# include <time.h> +#endif + +#define HNS_PER_SEC 10000000LL /* FILETIMES are in units of 100ns */ +/* Convert FILETIMEs into secs */ + +static INLINE_ME Ticks +fileTimeToTicks(FILETIME ft) +{ + Ticks t; + t = ((Ticks)ft.dwHighDateTime << 32) | ft.dwLowDateTime; + t = (t * TICKS_PER_SECOND) / HNS_PER_SEC; + return t; +} + +static int is_win9x = -1; + +static INLINE_ME rtsBool +isWin9x(void) +{ + if (is_win9x < 0) { + /* figure out whether we're on a Win9x box or not. */ + OSVERSIONINFO oi; + BOOL b; + + /* Need to init the size field first.*/ + oi.dwOSVersionInfoSize = sizeof(OSVERSIONINFO); + b = GetVersionEx(&oi); + + is_win9x = ( (b && (oi.dwPlatformId & VER_PLATFORM_WIN32_WINDOWS)) ? 1 : 0); + } + return is_win9x; +} + + +void +getProcessTimes(Ticks *user, Ticks *elapsed) +{ + *user = getProcessCPUTime(); + *elapsed = getProcessElapsedTime(); +} + +Ticks +getProcessCPUTime(void) +{ + FILETIME creationTime, exitTime, userTime, kernelTime = {0,0}; + + if (isWin9x()) return getProcessElapsedTime(); + + if (!GetProcessTimes(GetCurrentProcess(), &creationTime, + &exitTime, &kernelTime, &userTime)) { + return 0; + } + + return fileTimeToTicks(userTime); +} + +Ticks +getProcessElapsedTime(void) +{ + FILETIME system_time; + GetSystemTimeAsFileTime(&system_time); + return fileTimeToTicks(system_time); +} + +Ticks +getThreadCPUTime(void) +{ + FILETIME creationTime, exitTime, userTime, kernelTime = {0,0}; + + if (isWin9x()) return getProcessCPUTime(); + + if (!GetThreadTimes(GetCurrentThread(), &creationTime, + &exitTime, &kernelTime, &userTime)) { + return 0; + } + + return fileTimeToTicks(userTime); +} + +nat +getPageFaults(void) +{ + /* ToDo (on NT): better, get this via the performance data + that's stored in the registry. */ + return 0; +} diff --git a/rts/win32/IOManager.c b/rts/win32/IOManager.c new file mode 100644 index 0000000000..a67c3504c1 --- /dev/null +++ b/rts/win32/IOManager.c @@ -0,0 +1,510 @@ +/* IOManager.c + * + * Non-blocking / asynchronous I/O for Win32. + * + * (c) sof, 2002-2003. + */ +#include "Rts.h" +#include "IOManager.h" +#include "WorkQueue.h" +#include "ConsoleHandler.h" +#include <stdio.h> +#include <stdlib.h> +#include <io.h> +#include <winsock.h> +#include <process.h> + +/* + * Internal state maintained by the IO manager. + */ +typedef struct IOManagerState { + CritSection manLock; + WorkQueue* workQueue; + int queueSize; + int numWorkers; + int workersIdle; + HANDLE hExitEvent; + unsigned int requestID; + /* fields for keeping track of active WorkItems */ + CritSection active_work_lock; + WorkItem* active_work_items; +} IOManagerState; + +/* ToDo: wrap up this state via a IOManager handle instead? */ +static IOManagerState* ioMan; + +static void RegisterWorkItem ( IOManagerState* iom, WorkItem* wi); +static void DeregisterWorkItem( IOManagerState* iom, WorkItem* wi); + +/* + * The routine executed by each worker thread. + */ +static +unsigned +WINAPI +IOWorkerProc(PVOID param) +{ + HANDLE hWaits[2]; + DWORD rc; + IOManagerState* iom = (IOManagerState*)param; + WorkQueue* pq = iom->workQueue; + WorkItem* work; + int len = 0, fd = 0; + DWORD errCode = 0; + void* complData; + + hWaits[0] = (HANDLE)iom->hExitEvent; + hWaits[1] = GetWorkQueueHandle(pq); + + while (1) { + /* The error code is communicated back on completion of request; reset. */ + errCode = 0; + + EnterCriticalSection(&iom->manLock); + /* Signal that the worker is idle. + * + * 'workersIdle' is used when determining whether or not to + * increase the worker thread pool when adding a new request. + * (see addIORequest().) + */ + iom->workersIdle++; + LeaveCriticalSection(&iom->manLock); + + /* + * A possible future refinement is to make long-term idle threads + * wake up and decide to shut down should the number of idle threads + * be above some threshold. + * + */ + rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE ); + + if (rc == WAIT_OBJECT_0) { + // we received the exit event + return 0; + } + + EnterCriticalSection(&iom->manLock); + /* Signal that the thread is 'non-idle' and about to consume + * a work item. + */ + iom->workersIdle--; + iom->queueSize--; + LeaveCriticalSection(&iom->manLock); + + if ( rc == (WAIT_OBJECT_0 + 1) ) { + /* work item available, fetch it. */ + if (FetchWork(pq,(void**)&work)) { + work->abandonOp = 0; + RegisterWorkItem(iom,work); + if ( work->workKind & WORKER_READ ) { + if ( work->workKind & WORKER_FOR_SOCKET ) { + len = recv(work->workData.ioData.fd, + work->workData.ioData.buf, + work->workData.ioData.len, + 0); + if (len == SOCKET_ERROR) { + errCode = WSAGetLastError(); + } + } else { + while (1) { + /* Do the read(), with extra-special handling for Ctrl+C */ + len = read(work->workData.ioData.fd, + work->workData.ioData.buf, + work->workData.ioData.len); + if ( len == 0 && work->workData.ioData.len != 0 ) { + /* Given the following scenario: + * - a console handler has been registered that handles Ctrl+C + * events. + * - we've not tweaked the 'console mode' settings to turn on + * ENABLE_PROCESSED_INPUT. + * - we're blocked waiting on input from standard input. + * - the user hits Ctrl+C. + * + * The OS will invoke the console handler (in a separate OS thread), + * and the above read() (i.e., under the hood, a ReadFile() op) returns + * 0, with the error set to ERROR_OPERATION_ABORTED. We don't + * want to percolate this error condition back to the Haskell user. + * Do this by waiting for the completion of the Haskell console handler. + * If upon completion of the console handler routine, the Haskell thread + * that issued the request is found to have been thrown an exception, + * the worker abandons the request (since that's what the Haskell thread + * has done.) If the Haskell thread hasn't been interrupted, the worker + * retries the read request as if nothing happened. + */ + if ( (GetLastError()) == ERROR_OPERATION_ABORTED ) { + /* For now, only abort when dealing with the standard input handle. + * i.e., for all others, an error is raised. + */ + HANDLE h = (HANDLE)GetStdHandle(STD_INPUT_HANDLE); + if ( _get_osfhandle(work->workData.ioData.fd) == (long)h ) { + if (rts_waitConsoleHandlerCompletion()) { + /* If the Scheduler has set work->abandonOp, the Haskell thread has + * been thrown an exception (=> the worker must abandon this request.) + * We test for this below before invoking the on-completion routine. + */ + if (work->abandonOp) { + break; + } else { + continue; + } + } + } else { + break; /* Treat it like an error */ + } + } else { + break; + } + } else { + break; + } + } + if (len == -1) { errCode = errno; } + } + complData = work->workData.ioData.buf; + fd = work->workData.ioData.fd; + } else if ( work->workKind & WORKER_WRITE ) { + if ( work->workKind & WORKER_FOR_SOCKET ) { + len = send(work->workData.ioData.fd, + work->workData.ioData.buf, + work->workData.ioData.len, + 0); + if (len == SOCKET_ERROR) { + errCode = WSAGetLastError(); + } + } else { + len = write(work->workData.ioData.fd, + work->workData.ioData.buf, + work->workData.ioData.len); + if (len == -1) { errCode = errno; } + } + complData = work->workData.ioData.buf; + fd = work->workData.ioData.fd; + } else if ( work->workKind & WORKER_DELAY ) { + /* Approximate implementation of threadDelay; + * + * Note: Sleep() is in milliseconds, not micros. + */ + Sleep(work->workData.delayData.msecs / 1000); + len = work->workData.delayData.msecs; + complData = NULL; + fd = 0; + errCode = 0; + } else if ( work->workKind & WORKER_DO_PROC ) { + /* perform operation/proc on behalf of Haskell thread. */ + if (work->workData.procData.proc) { + /* The procedure is assumed to encode result + success/failure + * via its param. + */ + errCode=work->workData.procData.proc(work->workData.procData.param); + } else { + errCode=1; + } + complData = work->workData.procData.param; + } else { + fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind); + fflush(stderr); + continue; + } + if (!work->abandonOp) { + work->onCompletion(work->requestID, + fd, + len, + complData, + errCode); + } + /* Free the WorkItem */ + DeregisterWorkItem(iom,work); + free(work); + } else { + fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr); + return 1; + } + } else { + fprintf(stderr, "waiting failed (%lu); fatal.\n", rc); fflush(stderr); + return 1; + } + } + return 0; +} + +static +BOOL +NewIOWorkerThread(IOManagerState* iom) +{ + unsigned threadId; + return ( 0 != _beginthreadex(NULL, + 0, + IOWorkerProc, + (LPVOID)iom, + 0, + &threadId) ); +} + +BOOL +StartIOManager(void) +{ + HANDLE hExit; + WorkQueue* wq; + + wq = NewWorkQueue(); + if ( !wq ) return FALSE; + + ioMan = (IOManagerState*)malloc(sizeof(IOManagerState)); + + if (!ioMan) { + FreeWorkQueue(wq); + return FALSE; + } + + /* A manual-reset event */ + hExit = CreateEvent ( NULL, TRUE, FALSE, NULL ); + if ( !hExit ) { + FreeWorkQueue(wq); + free(ioMan); + return FALSE; + } + + ioMan->hExitEvent = hExit; + InitializeCriticalSection(&ioMan->manLock); + ioMan->workQueue = wq; + ioMan->numWorkers = 0; + ioMan->workersIdle = 0; + ioMan->queueSize = 0; + ioMan->requestID = 1; + InitializeCriticalSection(&ioMan->active_work_lock); + ioMan->active_work_items = NULL; + + return TRUE; +} + +/* + * Function: depositWorkItem() + * + * Local function which deposits a WorkItem onto a work queue, + * deciding in the process whether or not the thread pool needs + * to be augmented with another thread to handle the new request. + * + */ +static +int +depositWorkItem( unsigned int reqID, + WorkItem* wItem ) +{ + EnterCriticalSection(&ioMan->manLock); + +#if 0 + fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers); + fflush(stderr); +#endif + /* A new worker thread is created when there are fewer idle threads + * than non-consumed queue requests. This ensures that requests will + * be dealt with in a timely manner. + * + * [Long explanation of why the previous thread pool policy lead to + * trouble] + * + * Previously, the thread pool was augmented iff no idle worker threads + * were available. That strategy runs the risk of repeatedly adding to + * the request queue without expanding the thread pool to handle this + * sudden spike in queued requests. + * [How? Assume workersIdle is 1, and addIORequest() is called. No new + * thread is created and the request is simply queued. If addIORequest() + * is called again _before the OS schedules a worker thread to pull the + * request off the queue_, workersIdle is still 1 and another request is + * simply added to the queue. Once the worker thread is run, only one + * request is de-queued, leaving the 2nd request in the queue] + * + * Assuming none of the queued requests take an inordinate amount of to + * complete, the request queue would eventually be drained. But if that's + * not the case, the later requests will end up languishing in the queue + * indefinitely. The non-timely handling of requests may cause CH applications + * to misbehave / hang; bad. + * + */ + ioMan->queueSize++; + if ( (ioMan->workersIdle < ioMan->queueSize) ) { + /* see if giving up our quantum ferrets out some idle threads. + */ + LeaveCriticalSection(&ioMan->manLock); + Sleep(0); + EnterCriticalSection(&ioMan->manLock); + if ( (ioMan->workersIdle < ioMan->queueSize) ) { + /* No, go ahead and create another. */ + ioMan->numWorkers++; + LeaveCriticalSection(&ioMan->manLock); + NewIOWorkerThread(ioMan); + } else { + LeaveCriticalSection(&ioMan->manLock); + } + } else { + LeaveCriticalSection(&ioMan->manLock); + } + + if (SubmitWork(ioMan->workQueue,wItem)) { + /* Note: the work item has potentially been consumed by a worker thread + * (and freed) at this point, so we cannot use wItem's requestID. + */ + return reqID; + } else { + return 0; + } +} + +/* + * Function: AddIORequest() + * + * Conduit to underlying WorkQueue's SubmitWork(); adds IO + * request to work queue, deciding whether or not to augment + * the thread pool in the process. + */ +int +AddIORequest ( int fd, + BOOL forWriting, + BOOL isSocket, + int len, + char* buffer, + CompletionProc onCompletion) +{ + WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem)); + unsigned int reqID = ioMan->requestID++; + if (!ioMan || !wItem) return 0; + + /* Fill in the blanks */ + wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) | + ( forWriting ? WORKER_WRITE : WORKER_READ ); + wItem->workData.ioData.fd = fd; + wItem->workData.ioData.len = len; + wItem->workData.ioData.buf = buffer; + wItem->link = NULL; + + wItem->onCompletion = onCompletion; + wItem->requestID = reqID; + + return depositWorkItem(reqID, wItem); +} + +/* + * Function: AddDelayRequest() + * + * Like AddIORequest(), but this time adding a delay request to + * the request queue. + */ +BOOL +AddDelayRequest ( unsigned int msecs, + CompletionProc onCompletion) +{ + WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem)); + unsigned int reqID = ioMan->requestID++; + if (!ioMan || !wItem) return FALSE; + + /* Fill in the blanks */ + wItem->workKind = WORKER_DELAY; + wItem->workData.delayData.msecs = msecs; + wItem->onCompletion = onCompletion; + wItem->requestID = reqID; + wItem->link = NULL; + + return depositWorkItem(reqID, wItem); +} + +/* + * Function: AddProcRequest() + * + * Add an asynchronous procedure request. + */ +BOOL +AddProcRequest ( void* proc, + void* param, + CompletionProc onCompletion) +{ + WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem)); + unsigned int reqID = ioMan->requestID++; + if (!ioMan || !wItem) return FALSE; + + /* Fill in the blanks */ + wItem->workKind = WORKER_DO_PROC; + wItem->workData.procData.proc = proc; + wItem->workData.procData.param = param; + wItem->onCompletion = onCompletion; + wItem->requestID = reqID; + wItem->abandonOp = 0; + wItem->link = NULL; + + return depositWorkItem(reqID, wItem); +} + +void ShutdownIOManager ( void ) +{ + SetEvent(ioMan->hExitEvent); + // ToDo: we can't free this now, because the worker thread(s) + // haven't necessarily finished with it yet. Perhaps it should + // have a reference count or something. + // free(ioMan); + // ioMan = NULL; +} + +/* Keep track of WorkItems currently being serviced. */ +static +void +RegisterWorkItem(IOManagerState* ioMan, + WorkItem* wi) +{ + EnterCriticalSection(&ioMan->active_work_lock); + wi->link = ioMan->active_work_items; + ioMan->active_work_items = wi; + LeaveCriticalSection(&ioMan->active_work_lock); +} + +static +void +DeregisterWorkItem(IOManagerState* ioMan, + WorkItem* wi) +{ + WorkItem *ptr, *prev; + + EnterCriticalSection(&ioMan->active_work_lock); + for(prev=NULL,ptr=ioMan->active_work_items;ptr;prev=ptr,ptr=ptr->link) { + if (wi->requestID == ptr->requestID) { + if (prev==NULL) { + ioMan->active_work_items = ptr->link; + } else { + prev->link = ptr->link; + } + LeaveCriticalSection(&ioMan->active_work_lock); + return; + } + } + fprintf(stderr, "DeregisterWorkItem: unable to locate work item %d\n", wi->requestID); + LeaveCriticalSection(&ioMan->active_work_lock); +} + + +/* + * Function: abandonWorkRequest() + * + * Signal that a work request isn't of interest. Called by the Scheduler + * if a blocked Haskell thread has an exception thrown to it. + * + * Note: we're not aborting the system call that a worker might be blocked on + * here, just disabling the propagation of its result once its finished. We + * may have to go the whole hog here and switch to overlapped I/O so that we + * can abort blocked system calls. + */ +void +abandonWorkRequest ( int reqID ) +{ + WorkItem *ptr; + EnterCriticalSection(&ioMan->active_work_lock); + for(ptr=ioMan->active_work_items;ptr;ptr=ptr->link) { + if (ptr->requestID == (unsigned int)reqID ) { + ptr->abandonOp = 1; + LeaveCriticalSection(&ioMan->active_work_lock); + return; + } + } + /* Note: if the request ID isn't present, the worker will have + * finished sometime since awaitRequests() last drained the completed + * request table; i.e., not an error. + */ + LeaveCriticalSection(&ioMan->active_work_lock); +} diff --git a/rts/win32/IOManager.h b/rts/win32/IOManager.h new file mode 100644 index 0000000000..4893e2387c --- /dev/null +++ b/rts/win32/IOManager.h @@ -0,0 +1,110 @@ +/* IOManager.h + * + * Non-blocking / asynchronous I/O for Win32. + * + * (c) sof, 2002-2003 + */ +#ifndef __IOMANAGER_H__ +#define __IOMANAGER_H__ +/* On the yucky side..suppress -Wmissing-declarations warnings when + * including <windows.h> + */ +extern void* GetCurrentFiber ( void ); +extern void* GetFiberData ( void ); +#include <windows.h> + +/* + The IOManager subsystem provides a non-blocking view + of I/O operations. It lets one (or more) OS thread(s) + issue multiple I/O requests, which the IOManager then + handles independently of/concurrent to the thread(s) + that issued the request. Upon completion, the issuing + thread can inspect the result of the I/O operation & + take appropriate action. + + The IOManager is intended used with the GHC RTS to + implement non-blocking I/O in Concurrent Haskell. + */ + +/* + * Our WorkQueue holds WorkItems, encoding IO and + * delay requests. + * + */ +typedef void (*CompletionProc)(unsigned int requestID, + int fd, + int len, + void* buf, + int errCode); + +/* + * Asynchronous procedure calls executed by a worker thread + * take a generic state argument pointer and return an int by + * default. + */ +typedef int (*DoProcProc)(void *param); + +typedef union workData { + struct { + int fd; + int len; + char *buf; + } ioData; + struct { + int msecs; + } delayData; + struct { + DoProcProc proc; + void* param; + } procData; +} WorkData; + +typedef struct WorkItem { + unsigned int workKind; + WorkData workData; + unsigned int requestID; + CompletionProc onCompletion; + unsigned int abandonOp; + struct WorkItem *link; +} WorkItem; + +extern CompletionProc onComplete; + +/* the kind of operations supported; you could easily imagine + * that instead of passing a tag describing the work to be performed, + * a function pointer is passed instead. Maybe later. + */ +#define WORKER_READ 1 +#define WORKER_WRITE 2 +#define WORKER_DELAY 4 +#define WORKER_FOR_SOCKET 8 +#define WORKER_DO_PROC 16 + +/* + * Starting up and shutting down. + */ +extern BOOL StartIOManager ( void ); +extern void ShutdownIOManager ( void ); + +/* + * Adding I/O and delay requests. With each request a + * completion routine is supplied, which the worker thread + * will invoke upon completion. + */ +extern int AddDelayRequest ( unsigned int msecs, + CompletionProc onCompletion); + +extern int AddIORequest ( int fd, + BOOL forWriting, + BOOL isSocket, + int len, + char* buffer, + CompletionProc onCompletion); + +extern int AddProcRequest ( void* proc, + void* data, + CompletionProc onCompletion); + +extern void abandonWorkRequest ( int reqID ); + +#endif /* __IOMANAGER_H__ */ diff --git a/rts/win32/OSThreads.c b/rts/win32/OSThreads.c new file mode 100644 index 0000000000..c772be38f4 --- /dev/null +++ b/rts/win32/OSThreads.c @@ -0,0 +1,199 @@ +/* --------------------------------------------------------------------------- + * + * (c) The GHC Team, 2001-2005 + * + * Accessing OS threads functionality in a (mostly) OS-independent + * manner. + * + * --------------------------------------------------------------------------*/ + +#include "Rts.h" +#if defined(THREADED_RTS) +#include "OSThreads.h" +#include "RtsUtils.h" + +/* For reasons not yet clear, the entire contents of process.h is protected + * by __STRICT_ANSI__ not being defined. + */ +#undef __STRICT_ANSI__ +#include <process.h> + +/* Win32 threads and synchronisation objects */ + +/* A Condition is represented by a Win32 Event object; + * a Mutex by a Mutex kernel object. + * + * ToDo: go through the defn and usage of these to + * make sure the semantics match up with that of + * the (assumed) pthreads behaviour. This is really + * just a first pass at getting something compilable. + */ + +void +initCondition( Condition* pCond ) +{ + HANDLE h = CreateEvent(NULL, + FALSE, /* auto reset */ + FALSE, /* initially not signalled */ + NULL); /* unnamed => process-local. */ + + if ( h == NULL ) { + errorBelch("initCondition: unable to create"); + } + *pCond = h; + return; +} + +void +closeCondition( Condition* pCond ) +{ + if ( CloseHandle(*pCond) == 0 ) { + errorBelch("closeCondition: failed to close"); + } + return; +} + +rtsBool +broadcastCondition ( Condition* pCond ) +{ + PulseEvent(*pCond); + return rtsTrue; +} + +rtsBool +signalCondition ( Condition* pCond ) +{ + if (SetEvent(*pCond) == 0) { + barf("SetEvent: %d", GetLastError()); + } + return rtsTrue; +} + +rtsBool +waitCondition ( Condition* pCond, Mutex* pMut ) +{ + RELEASE_LOCK(pMut); + WaitForSingleObject(*pCond, INFINITE); + /* Hmm..use WaitForMultipleObjects() ? */ + ACQUIRE_LOCK(pMut); + return rtsTrue; +} + +void +yieldThread() +{ + Sleep(0); + return; +} + +void +shutdownThread() +{ + _endthreadex(0); +} + +int +createOSThread (OSThreadId* pId, OSThreadProc *startProc, void *param) +{ + + return (_beginthreadex ( NULL, /* default security attributes */ + 0, + (unsigned (__stdcall *)(void *)) startProc, + param, + 0, + (unsigned*)pId) == 0); +} + +OSThreadId +osThreadId() +{ + return GetCurrentThreadId(); +} + +#ifdef USE_CRITICAL_SECTIONS +void +initMutex (Mutex* pMut) +{ + InitializeCriticalSectionAndSpinCount(pMut,4000); +} +#else +void +initMutex (Mutex* pMut) +{ + HANDLE h = CreateMutex ( NULL, /* default sec. attributes */ + FALSE, /* not owned => initially signalled */ + NULL + ); + *pMut = h; + return; +} +#endif + +void +newThreadLocalKey (ThreadLocalKey *key) +{ + DWORD r; + r = TlsAlloc(); + if (r == TLS_OUT_OF_INDEXES) { + barf("newThreadLocalKey: out of keys"); + } + *key = r; +} + +void * +getThreadLocalVar (ThreadLocalKey *key) +{ + void *r; + r = TlsGetValue(*key); +#ifdef DEBUG + // r is allowed to be NULL - it can mean that either there was an + // error or the stored value is in fact NULL. + if (GetLastError() != NO_ERROR) { + barf("getThreadLocalVar: key not found"); + } +#endif + return r; +} + +void +setThreadLocalVar (ThreadLocalKey *key, void *value) +{ + BOOL b; + b = TlsSetValue(*key, value); + if (!b) { + barf("setThreadLocalVar: %d", GetLastError()); + } +} + + +static unsigned __stdcall +forkOS_createThreadWrapper ( void * entry ) +{ + Capability *cap; + cap = rts_lock(); + cap = rts_evalStableIO(cap, (HsStablePtr) entry, NULL); + rts_unlock(cap); + return 0; +} + +int +forkOS_createThread ( HsStablePtr entry ) +{ + unsigned long pId; + return (_beginthreadex ( NULL, /* default security attributes */ + 0, + forkOS_createThreadWrapper, + (void*)entry, + 0, + (unsigned*)&pId) == 0); +} + +#else /* !defined(THREADED_RTS) */ + +int +forkOS_createThread ( HsStablePtr entry STG_UNUSED ) +{ + return -1; +} + +#endif /* !defined(THREADED_RTS) */ diff --git a/rts/win32/Ticker.c b/rts/win32/Ticker.c new file mode 100644 index 0000000000..ab791d8dc7 --- /dev/null +++ b/rts/win32/Ticker.c @@ -0,0 +1,124 @@ +/* + * RTS periodic timers. + * + */ +#include "Rts.h" +#include "Timer.h" +#include "Ticker.h" +#include <windows.h> +#include <stdio.h> +#include <process.h> +#include "OSThreads.h" + +/* + * Provide a timer service for the RTS, periodically + * notifying it that a number of 'ticks' has passed. + * + */ + +/* To signal shutdown of the timer service, we use a local + * event which the timer thread listens to (and stopVirtTimer() + * signals.) + */ +static HANDLE hStopEvent = INVALID_HANDLE_VALUE; +static HANDLE tickThread = INVALID_HANDLE_VALUE; + +static TickProc tickProc = NULL; + +/* + * Ticking is done by a separate thread which periodically + * wakes up to handle a tick. + * + * This is the portable way of providing a timer service under + * Win32; features like waitable timers or timer queues are only + * supported by a subset of the Win32 platforms (notably not + * under Win9x.) + * + */ +static +unsigned +WINAPI +TimerProc(PVOID param) +{ + int ms = (int)param; + DWORD waitRes; + + /* interpret a < 0 timeout period as 'instantaneous' */ + if (ms < 0) ms = 0; + + while (1) { + waitRes = WaitForSingleObject(hStopEvent, ms); + + switch (waitRes) { + case WAIT_OBJECT_0: + /* event has become signalled */ + tickProc = NULL; + CloseHandle(hStopEvent); + return 0; + case WAIT_TIMEOUT: + /* tick */ + tickProc(0); + break; + case WAIT_FAILED: { + DWORD dw = GetLastError(); + fprintf(stderr, "TimerProc: wait failed -- error code: %lu\n", dw); fflush(stderr); + break; + } + default: + fprintf(stderr, "TimerProc: unexpected result %lu\n", waitRes); fflush(stderr); + break; + } + } + return 0; +} + + +int +startTicker(nat ms, TickProc handle_tick) +{ + unsigned threadId; + /* 'hStopEvent' is a manual-reset event that's signalled upon + * shutdown of timer service (=> timer thread.) + */ + hStopEvent = CreateEvent ( NULL, + TRUE, + FALSE, + NULL); + if (hStopEvent == INVALID_HANDLE_VALUE) { + return 0; + } + tickProc = handle_tick; + tickThread = (HANDLE)(long)_beginthreadex( NULL, + 0, + TimerProc, + (LPVOID)ms, + 0, + &threadId); + return (tickThread != 0); +} + +int +stopTicker(void) +{ + // We must wait for the ticker thread to terminate, since if we + // are in a DLL that is about to be unloaded, the ticker thread + // cannot be allowed to return to a missing DLL. + + if (hStopEvent != INVALID_HANDLE_VALUE && + tickThread != INVALID_HANDLE_VALUE) { + DWORD exitCode; + SetEvent(hStopEvent); + while (1) { + WaitForSingleObject(tickThread, 20); + if (!GetExitCodeThread(tickThread, &exitCode)) { + return 1; + } + if (exitCode != STILL_ACTIVE) { + tickThread = INVALID_HANDLE_VALUE; + return 0; + } + TerminateThread(tickThread, 0); + } + } + return 0; +} diff --git a/rts/win32/WorkQueue.c b/rts/win32/WorkQueue.c new file mode 100644 index 0000000000..85a23608be --- /dev/null +++ b/rts/win32/WorkQueue.c @@ -0,0 +1,215 @@ +/* + * A fixed-size queue; MT-friendly. + * + * (c) sof, 2002-2003. + */ +#include "WorkQueue.h" +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +static void queue_error_rc( char* loc, DWORD err); +static void queue_error( char* loc, char* reason); + + +/* Wrapper around OS call to create semaphore */ +static Semaphore +newSemaphore(int initCount, int max) +{ + Semaphore s; + s = CreateSemaphore ( NULL, /* LPSECURITY_ATTRIBUTES (default) */ + initCount, /* LONG lInitialCount */ + max, /* LONG lMaxCount */ + NULL); /* LPCTSTR (anonymous / no object name) */ + if ( NULL == s) { + queue_error_rc("newSemaphore", GetLastError()); + return NULL; + } + return s; +} + +/* + * Function: NewWorkQueue + * + * The queue constructor - semaphores are initialised to match + * max number of queue entries. + * + */ +WorkQueue* +NewWorkQueue() +{ + WorkQueue* wq = (WorkQueue*)malloc(sizeof(WorkQueue)); + + if (!wq) { + queue_error("NewWorkQueue", "malloc() failed"); + return wq; + } + + wq->head = 0; + wq->tail = 0; + + InitializeCriticalSection(&wq->queueLock); + wq->workAvailable = newSemaphore(0, WORKQUEUE_SIZE); + wq->roomAvailable = newSemaphore(WORKQUEUE_SIZE, WORKQUEUE_SIZE); + + /* Fail if we were unable to create any of the sync objects. */ + if ( NULL == wq->workAvailable || + NULL == wq->roomAvailable ) { + FreeWorkQueue(wq); + return NULL; + } + + return wq; +} + +void +FreeWorkQueue ( WorkQueue* pq ) +{ + /* Close the semaphores; any threads blocked waiting + * on either will as a result be woken up. + */ + if ( pq->workAvailable ) { + CloseHandle(pq->workAvailable); + } + if ( pq->roomAvailable ) { + CloseHandle(pq->workAvailable); + } + free(pq); + return; +} + +HANDLE +GetWorkQueueHandle ( WorkQueue* pq ) +{ + if (!pq) return NULL; + + return pq->workAvailable; +} + +/* + * Function: GetWork + * + * Fetch a work item from the queue, blocking if none available. + * Return value indicates of FALSE indicates error/fatal condition. + */ +BOOL +GetWork ( WorkQueue* pq, void** ppw ) +{ + DWORD rc; + + if (!pq) { + queue_error("GetWork", "NULL WorkQueue object"); + return FALSE; + } + if (!ppw) { + queue_error("GetWork", "NULL WorkItem object"); + return FALSE; + } + + /* Block waiting for work item to become available */ + if ( (rc = WaitForSingleObject( pq->workAvailable, INFINITE)) != WAIT_OBJECT_0 ) { + queue_error_rc("GetWork.WaitForSingleObject(workAvailable)", + ( (WAIT_FAILED == rc) ? GetLastError() : rc)); + return FALSE; + } + + return FetchWork(pq,ppw); +} + +/* + * Function: FetchWork + * + * Fetch a work item from the queue, blocking if none available. + * Return value indicates of FALSE indicates error/fatal condition. + */ +BOOL +FetchWork ( WorkQueue* pq, void** ppw ) +{ + DWORD rc; + + if (!pq) { + queue_error("FetchWork", "NULL WorkQueue object"); + return FALSE; + } + if (!ppw) { + queue_error("FetchWork", "NULL WorkItem object"); + return FALSE; + } + + EnterCriticalSection(&pq->queueLock); + *ppw = pq->items[pq->head]; + /* For sanity's sake, zero out the pointer. */ + pq->items[pq->head] = NULL; + pq->head = (pq->head + 1) % WORKQUEUE_SIZE; + rc = ReleaseSemaphore(pq->roomAvailable,1, NULL); + LeaveCriticalSection(&pq->queueLock); + if ( 0 == rc ) { + queue_error_rc("FetchWork.ReleaseSemaphore()", GetLastError()); + return FALSE; + } + + return TRUE; +} + +/* + * Function: SubmitWork + * + * Add work item to the queue, blocking if no room available. + * Return value indicates of FALSE indicates error/fatal condition. + */ +BOOL +SubmitWork ( WorkQueue* pq, void* pw ) +{ + DWORD rc; + + if (!pq) { + queue_error("SubmitWork", "NULL WorkQueue object"); + return FALSE; + } + if (!pw) { + queue_error("SubmitWork", "NULL WorkItem object"); + return FALSE; + } + + /* Block waiting for work item to become available */ + if ( (rc = WaitForSingleObject( pq->roomAvailable, INFINITE)) != WAIT_OBJECT_0 ) { + queue_error_rc("SubmitWork.WaitForSingleObject(workAvailable)", + ( (WAIT_FAILED == rc) ? GetLastError() : rc)); + + return FALSE; + } + + EnterCriticalSection(&pq->queueLock); + pq->items[pq->tail] = pw; + pq->tail = (pq->tail + 1) % WORKQUEUE_SIZE; + rc = ReleaseSemaphore(pq->workAvailable,1, NULL); + LeaveCriticalSection(&pq->queueLock); + if ( 0 == rc ) { + queue_error_rc("SubmitWork.ReleaseSemaphore()", GetLastError()); + return FALSE; + } + + return TRUE; +} + +/* Error handling */ + +static void +queue_error_rc( char* loc, + DWORD err) +{ + fprintf(stderr, "%s failed: return code = 0x%lx\n", loc, err); + fflush(stderr); + return; +} + + +static void +queue_error( char* loc, + char* reason) +{ + fprintf(stderr, "%s failed: %s\n", loc, reason); + fflush(stderr); + return; +} + diff --git a/rts/win32/WorkQueue.h b/rts/win32/WorkQueue.h new file mode 100644 index 0000000000..bde82a3a77 --- /dev/null +++ b/rts/win32/WorkQueue.h @@ -0,0 +1,37 @@ +/* WorkQueue.h + * + * A fixed-size queue; MT-friendly. + * + * (c) sof, 2002-2003 + * + */ +#ifndef __WORKQUEUE_H__ +#define __WORKQUEUE_H__ +#include <windows.h> + +/* This is a fixed-size queue. */ +#define WORKQUEUE_SIZE 16 + +typedef HANDLE Semaphore; +typedef CRITICAL_SECTION CritSection; + +typedef struct WorkQueue { + /* the master lock, need to be grabbed prior to + using any of the other elements of the struct. */ + CritSection queueLock; + /* consumers/workers block waiting for 'workAvailable' */ + Semaphore workAvailable; + Semaphore roomAvailable; + int head; + int tail; + void** items[WORKQUEUE_SIZE]; +} WorkQueue; + +extern WorkQueue* NewWorkQueue ( void ); +extern void FreeWorkQueue ( WorkQueue* pq ); +extern HANDLE GetWorkQueueHandle ( WorkQueue* pq ); +extern BOOL GetWork ( WorkQueue* pq, void** ppw ); +extern BOOL FetchWork ( WorkQueue* pq, void** ppw ); +extern int SubmitWork ( WorkQueue* pq, void* pw ); + +#endif /* __WORKQUEUE_H__ */ |