diff options
author | Simon Marlow <simonmar@microsoft.com> | 2006-04-07 02:05:11 +0000 |
---|---|---|
committer | Simon Marlow <simonmar@microsoft.com> | 2006-04-07 02:05:11 +0000 |
commit | 0065d5ab628975892cea1ec7303f968c3338cbe1 (patch) | |
tree | 8e2afe0ab48ee33cf95009809d67c9649573ef92 /rts/win32 | |
parent | 28a464a75e14cece5db40f2765a29348273ff2d2 (diff) | |
download | haskell-0065d5ab628975892cea1ec7303f968c3338cbe1.tar.gz |
Reorganisation of the source tree
Most of the other users of the fptools build system have migrated to
Cabal, and with the move to darcs we can now flatten the source tree
without losing history, so here goes.
The main change is that the ghc/ subdir is gone, and most of what it
contained is now at the top level. The build system now makes no
pretense at being multi-project, it is just the GHC build system.
No doubt this will break many things, and there will be a period of
instability while we fix the dependencies. A straightforward build
should work, but I haven't yet fixed binary/source distributions.
Changes to the Building Guide will follow, too.
Diffstat (limited to 'rts/win32')
-rw-r--r-- | rts/win32/AsyncIO.c | 345 | ||||
-rw-r--r-- | rts/win32/AsyncIO.h | 25 | ||||
-rw-r--r-- | rts/win32/AwaitEvent.c | 51 | ||||
-rw-r--r-- | rts/win32/ConsoleHandler.c | 313 | ||||
-rw-r--r-- | rts/win32/ConsoleHandler.h | 63 | ||||
-rw-r--r-- | rts/win32/GetTime.c | 101 | ||||
-rw-r--r-- | rts/win32/IOManager.c | 510 | ||||
-rw-r--r-- | rts/win32/IOManager.h | 110 | ||||
-rw-r--r-- | rts/win32/OSThreads.c | 199 | ||||
-rw-r--r-- | rts/win32/Ticker.c | 124 | ||||
-rw-r--r-- | rts/win32/WorkQueue.c | 215 | ||||
-rw-r--r-- | rts/win32/WorkQueue.h | 37 |
12 files changed, 2093 insertions, 0 deletions
diff --git a/rts/win32/AsyncIO.c b/rts/win32/AsyncIO.c new file mode 100644 index 0000000000..7bcf571cf8 --- /dev/null +++ b/rts/win32/AsyncIO.c @@ -0,0 +1,345 @@ +/* AsyncIO.c + * + * Integrating Win32 asynchronous I/O with the GHC RTS. + * + * (c) sof, 2002-2003. + */ +#include "Rts.h" +#include "RtsUtils.h" +#include <windows.h> +#include <stdio.h> +#include "Schedule.h" +#include "RtsFlags.h" +#include "Capability.h" +#include "win32/AsyncIO.h" +#include "win32/IOManager.h" + +/* + * Overview: + * + * Haskell code issue asynchronous I/O requests via the + * async{Read,Write,DoOp}# primops. These cause addIORequest() + * to be invoked, which forwards the request to the underlying + * asynchronous I/O subsystem. Each request is tagged with a unique + * ID. + * + * addIORequest() returns this ID, so that when the blocked CH + * thread is added onto blocked_queue, its TSO is annotated with + * it. Upon completion of an I/O request, the async I/O handling + * code makes a back-call to signal its completion; the local + * onIOComplete() routine. It adds the IO request ID (along with + * its result data) to a queue of completed requests before returning. + * + * The queue of completed IO request is read by the thread operating + * the RTS scheduler. It de-queues the CH threads corresponding + * to the request IDs, making them runnable again. + * + */ + +typedef struct CompletedReq { + unsigned int reqID; + int len; + int errCode; +} CompletedReq; + +#define MAX_REQUESTS 200 + +static CRITICAL_SECTION queue_lock; +static HANDLE completed_req_event; +static HANDLE abandon_req_wait; +static HANDLE wait_handles[2]; +static CompletedReq completedTable[MAX_REQUESTS]; +static int completed_hw; +static HANDLE completed_table_sema; +static int issued_reqs; + +static void +onIOComplete(unsigned int reqID, + int fd STG_UNUSED, + int len, + void* buf STG_UNUSED, + int errCode) +{ + DWORD dwRes; + /* Deposit result of request in queue/table..when there's room. */ + dwRes = WaitForSingleObject(completed_table_sema, INFINITE); + switch (dwRes) { + case WAIT_OBJECT_0: + break; + default: + /* Not likely */ + fprintf(stderr, "onIOComplete: failed to grab table semaphore, dropping request 0x%x\n", reqID); + fflush(stderr); + return; + } + EnterCriticalSection(&queue_lock); + if (completed_hw == MAX_REQUESTS) { + /* Shouldn't happen */ + fprintf(stderr, "onIOComplete: ERROR -- Request table overflow (%d); dropping.\n", reqID); + fflush(stderr); + } else { +#if 0 + fprintf(stderr, "onCompl: %d %d %d %d %d\n", + reqID, len, errCode, issued_reqs, completed_hw); + fflush(stderr); +#endif + completedTable[completed_hw].reqID = reqID; + completedTable[completed_hw].len = len; + completedTable[completed_hw].errCode = errCode; + completed_hw++; + issued_reqs--; + if (completed_hw == 1) { + /* The event is used to wake up the scheduler thread should it + * be blocked waiting for requests to complete. The event resets once + * that thread has cleared out the request queue/table. + */ + SetEvent(completed_req_event); + } + } + LeaveCriticalSection(&queue_lock); +} + +unsigned int +addIORequest(int fd, + int forWriting, + int isSock, + int len, + char* buf) +{ + EnterCriticalSection(&queue_lock); + issued_reqs++; + LeaveCriticalSection(&queue_lock); +#if 0 + fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr); +#endif + return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete); +} + +unsigned int +addDelayRequest(int msecs) +{ + EnterCriticalSection(&queue_lock); + issued_reqs++; + LeaveCriticalSection(&queue_lock); +#if 0 + fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr); +#endif + return AddDelayRequest(msecs,onIOComplete); +} + +unsigned int +addDoProcRequest(void* proc, void* param) +{ + EnterCriticalSection(&queue_lock); + issued_reqs++; + LeaveCriticalSection(&queue_lock); +#if 0 + fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr); +#endif + return AddProcRequest(proc,param,onIOComplete); +} + + +int +startupAsyncIO() +{ + if (!StartIOManager()) { + return 0; + } + InitializeCriticalSection(&queue_lock); + /* Create a pair of events: + * + * - completed_req_event -- signals the deposit of request result; manual reset. + * - abandon_req_wait -- external OS thread tells current RTS/Scheduler + * thread to abandon wait for IO request completion. + * Auto reset. + */ + completed_req_event = CreateEvent (NULL, TRUE, FALSE, NULL); + abandon_req_wait = CreateEvent (NULL, FALSE, FALSE, NULL); + wait_handles[0] = completed_req_event; + wait_handles[1] = abandon_req_wait; + completed_hw = 0; + if ( !(completed_table_sema = CreateSemaphore (NULL, MAX_REQUESTS, MAX_REQUESTS, NULL)) ) { + DWORD rc = GetLastError(); + fprintf(stderr, "startupAsyncIO: CreateSemaphore failed 0x%x\n", rc); + fflush(stderr); + } + + return ( completed_req_event != INVALID_HANDLE_VALUE && + abandon_req_wait != INVALID_HANDLE_VALUE && + completed_table_sema != NULL ); +} + +void +shutdownAsyncIO() +{ + CloseHandle(completed_req_event); + ShutdownIOManager(); +} + +/* + * Function: awaitRequests(wait) + * + * Check for the completion of external IO work requests. Worker + * threads signal completion of IO requests by depositing them + * in a table (completedTable). awaitRequests() matches up + * requests in that table with threads on the blocked_queue, + * making the threads whose IO requests have completed runnable + * again. + * + * awaitRequests() is called by the scheduler periodically _or_ if + * it is out of work, and need to wait for the completion of IO + * requests to make further progress. In the latter scenario, + * awaitRequests() will simply block waiting for worker threads + * to complete if the 'completedTable' is empty. + */ +int +awaitRequests(rtsBool wait) +{ +#ifndef THREADED_RTS + // none of this is actually used in the threaded RTS + +start: +#if 0 + fprintf(stderr, "awaitRequests(): %d %d %d\n", issued_reqs, completed_hw, wait); + fflush(stderr); +#endif + EnterCriticalSection(&queue_lock); + /* Nothing immediately available & we won't wait */ + if ((!wait && completed_hw == 0) +#if 0 + // If we just return when wait==rtsFalse, we'll go into a busy + // wait loop, so I disabled this condition --SDM 18/12/2003 + (issued_reqs == 0 && completed_hw == 0) +#endif + ) { + LeaveCriticalSection(&queue_lock); + return 0; + } + if (completed_hw == 0) { + /* empty table, drop lock and wait */ + LeaveCriticalSection(&queue_lock); + if ( wait && sched_state == SCHED_RUNNING ) { + DWORD dwRes = WaitForMultipleObjects(2, wait_handles, FALSE, INFINITE); + switch (dwRes) { + case WAIT_OBJECT_0: + /* a request was completed */ + break; + case WAIT_OBJECT_0 + 1: + case WAIT_TIMEOUT: + /* timeout (unlikely) or told to abandon waiting */ + return 0; + case WAIT_FAILED: { + DWORD dw = GetLastError(); + fprintf(stderr, "awaitRequests: wait failed -- error code: %lu\n", dw); fflush(stderr); + return 0; + } + default: + fprintf(stderr, "awaitRequests: unexpected wait return code %lu\n", dwRes); fflush(stderr); + return 0; + } + } else { + return 0; + } + goto start; + } else { + int i; + StgTSO *tso, *prev; + + for (i=0; i < completed_hw; i++) { + /* For each of the completed requests, match up their Ids + * with those of the threads on the blocked_queue. If the + * thread that made the IO request has been subsequently + * killed (and removed from blocked_queue), no match will + * be found for that request Id. + * + * i.e., killing a Haskell thread doesn't attempt to cancel + * the IO request it is blocked on. + * + */ + unsigned int rID = completedTable[i].reqID; + + prev = NULL; + for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; prev = tso, tso = tso->link) { + + switch(tso->why_blocked) { + case BlockedOnRead: + case BlockedOnWrite: + case BlockedOnDoProc: + if (tso->block_info.async_result->reqID == rID) { + /* Found the thread blocked waiting on request; stodgily fill + * in its result block. + */ + tso->block_info.async_result->len = completedTable[i].len; + tso->block_info.async_result->errCode = completedTable[i].errCode; + + /* Drop the matched TSO from blocked_queue */ + if (prev) { + prev->link = tso->link; + } else { + blocked_queue_hd = tso->link; + } + if (blocked_queue_tl == tso) { + blocked_queue_tl = prev ? prev : END_TSO_QUEUE; + } + + /* Terminates the run queue + this inner for-loop. */ + tso->link = END_TSO_QUEUE; + tso->why_blocked = NotBlocked; + pushOnRunQueue(&MainCapability, tso); + break; + } + break; + default: + if (tso->why_blocked != NotBlocked) { + barf("awaitRequests: odd thread state"); + } + break; + } + } + /* Signal that there's completed table slots available */ + if ( !ReleaseSemaphore(completed_table_sema, 1, NULL) ) { + DWORD dw = GetLastError(); + fprintf(stderr, "awaitRequests: failed to signal semaphore (error code=0x%x)\n", dw); + fflush(stderr); + } + } + completed_hw = 0; + ResetEvent(completed_req_event); + LeaveCriticalSection(&queue_lock); + return 1; + } +#endif /* !THREADED_RTS */ +} + +/* + * Function: abandonRequestWait() + * + * Wake up a thread that's blocked waiting for new IO requests + * to complete (via awaitRequests().) + */ +void +abandonRequestWait( void ) +{ + /* the event is auto-reset, but in case there's no thread + * already waiting on the event, we want to return it to + * a non-signalled state. + * + * Careful! There is no synchronisation between + * abandonRequestWait and awaitRequest, which means that + * abandonRequestWait might be called just before a thread + * goes into a wait, and we miss the abandon signal. So we + * must SetEvent() here rather than PulseEvent() to ensure + * that the event isn't lost. We can re-optimise by resetting + * the event somewhere safe if we know the event has been + * properly serviced (see resetAbandon() below). --SDM 18/12/2003 + */ + SetEvent(abandon_req_wait); +} + +void +resetAbandonRequestWait( void ) +{ + ResetEvent(abandon_req_wait); +} + diff --git a/rts/win32/AsyncIO.h b/rts/win32/AsyncIO.h new file mode 100644 index 0000000000..2077ea0cf7 --- /dev/null +++ b/rts/win32/AsyncIO.h @@ -0,0 +1,25 @@ +/* AsyncIO.h + * + * Integrating Win32 asynchronous I/O with the GHC RTS. + * + * (c) sof, 2002-2003. + */ +#ifndef __ASYNCHIO_H__ +#define __ASYNCHIO_H__ +extern unsigned int +addIORequest(int fd, + int forWriting, + int isSock, + int len, + char* buf); +extern unsigned int addDelayRequest(int msecs); +extern unsigned int addDoProcRequest(void* proc, void* param); +extern int startupAsyncIO(void); +extern void shutdownAsyncIO(void); + +extern int awaitRequests(rtsBool wait); + +extern void abandonRequestWait(void); +extern void resetAbandonRequestWait(void); + +#endif /* __ASYNCHIO_H__ */ diff --git a/rts/win32/AwaitEvent.c b/rts/win32/AwaitEvent.c new file mode 100644 index 0000000000..43e188fb34 --- /dev/null +++ b/rts/win32/AwaitEvent.c @@ -0,0 +1,51 @@ +#if !defined(THREADED_RTS) /* to the end */ +/* + * Wait/check for external events. Periodically, the + * Scheduler checks for the completion of external operations, + * like the expiration of timers, completion of I/O requests + * issued by Haskell threads. + * + * If the Scheduler is otherwise out of work, it'll block + * herein waiting for external events to occur. + * + * This file mirrors the select()-based functionality + * for POSIX / Unix platforms in rts/Select.c, but for + * Win32. + * + */ +#include "Rts.h" +#include "Schedule.h" +#include "AwaitEvent.h" +#include <windows.h> +#include "win32/AsyncIO.h" + +// Used to avoid calling abandonRequestWait() if we don't need to. +// Protected by sched_mutex. +static nat workerWaitingForRequests = 0; + +void +awaitEvent(rtsBool wait) +{ + int ret; + + do { + /* Try to de-queue completed IO requests + */ + workerWaitingForRequests = 1; + ret = awaitRequests(wait); + workerWaitingForRequests = 0; + if (!ret) { + return; /* still hold the lock */ + } + + // Return to the scheduler if: + // + // - we were interrupted + // - new threads have arrived + + } while (wait + && sched_state == SCHED_RUNNING + && emptyRunQueue(&MainCapability) + ); +} +#endif diff --git a/rts/win32/ConsoleHandler.c b/rts/win32/ConsoleHandler.c new file mode 100644 index 0000000000..d7096db632 --- /dev/null +++ b/rts/win32/ConsoleHandler.c @@ -0,0 +1,313 @@ +/* + * Console control handler support. + * + */ +#include "Rts.h" +#include <windows.h> +#include "ConsoleHandler.h" +#include "SchedAPI.h" +#include "Schedule.h" +#include "RtsUtils.h" +#include "RtsFlags.h" +#include "AsyncIO.h" +#include "RtsSignals.h" + +extern int stg_InstallConsoleEvent(int action, StgStablePtr *handler); + +static BOOL WINAPI shutdown_handler(DWORD dwCtrlType); +static BOOL WINAPI generic_handler(DWORD dwCtrlType); + +static rtsBool deliver_event = rtsTrue; +static StgInt console_handler = STG_SIG_DFL; + +static HANDLE hConsoleEvent = INVALID_HANDLE_VALUE; + +#define N_PENDING_EVENTS 16 +StgInt stg_pending_events = 0; /* number of undelivered events */ +DWORD stg_pending_buf[N_PENDING_EVENTS]; /* their associated event numbers. */ + +/* + * Function: initUserSignals() + * + * Initialize the console handling substrate. + */ +void +initUserSignals(void) +{ + stg_pending_events = 0; + console_handler = STG_SIG_DFL; + if (hConsoleEvent == INVALID_HANDLE_VALUE) { + hConsoleEvent = + CreateEvent ( NULL, /* default security attributes */ + TRUE, /* manual-reset event */ + FALSE, /* initially non-signalled */ + NULL); /* no name */ + } + return; +} + +/* + * Function: shutdown_handler() + * + * Local function that performs the default handling of Ctrl+C kind + * events; gently shutting down the RTS + * + * To repeat Signals.c remark -- user code may choose to override the + * default handler. Which is fine, assuming they put back the default + * handler when/if they de-install the custom handler. + * + */ +static BOOL WINAPI shutdown_handler(DWORD dwCtrlType) +{ + switch (dwCtrlType) { + + case CTRL_CLOSE_EVENT: + /* see generic_handler() comment re: this event */ + return FALSE; + case CTRL_C_EVENT: + case CTRL_BREAK_EVENT: + + // If we're already trying to interrupt the RTS, terminate with + // extreme prejudice. So the first ^C tries to exit the program + // cleanly, and the second one just kills it. + if (sched_state >= SCHED_INTERRUPTING) { + stg_exit(EXIT_INTERRUPTED); + } else { + interruptStgRts(); + /* Cheesy pulsing of an event to wake up a waiting RTS thread, if any */ + abandonRequestWait(); + resetAbandonRequestWait(); + } + return TRUE; + + /* shutdown + logoff events are not handled here. */ + default: + return FALSE; + } +} + + +/* + * Function: initDefaultHandlers() + * + * Install any default signal/console handlers. Currently we install a + * Ctrl+C handler that shuts down the RTS in an orderly manner. + */ +void initDefaultHandlers(void) +{ + if ( !SetConsoleCtrlHandler(shutdown_handler, TRUE) ) { + errorBelch("warning: failed to install default console handler"); + } +} + + +/* + * Function: blockUserSignals() + * + * Temporarily block the delivery of further console events. Needed to + * avoid race conditions when GCing the stack of outstanding handlers or + * when emptying the stack by running the handlers. + * + */ +void +blockUserSignals(void) +{ + deliver_event = rtsFalse; +} + + +/* + * Function: unblockUserSignals() + * + * The inverse of blockUserSignals(); re-enable the deliver of console events. + */ +void +unblockUserSignals(void) +{ + deliver_event = rtsTrue; +} + + +/* + * Function: awaitUserSignals() + * + * Wait for the next console event. Currently a NOP (returns immediately.) + */ +void awaitUserSignals(void) +{ + return; +} + + +/* + * Function: startSignalHandlers() + * + * Run the handlers associated with the stacked up console events. Console + * event delivery is blocked for the duration of this call. + */ +void startSignalHandlers(Capability *cap) +{ + StgStablePtr handler; + + if (console_handler < 0) { + return; + } + + blockUserSignals(); + ACQUIRE_LOCK(&sched_mutex); + + handler = deRefStablePtr((StgStablePtr)console_handler); + while (stg_pending_events > 0) { + stg_pending_events--; + scheduleThread(cap, + createIOThread(cap, + RtsFlags.GcFlags.initialStkSize, + rts_apply(cap, + (StgClosure *)handler, + rts_mkInt(cap, + stg_pending_buf[stg_pending_events])))); + } + + RELEASE_LOCK(&sched_mutex); + unblockUserSignals(); +} + +/* + * Function: markSignalHandlers() + * + * Evacuate the handler stack. _Assumes_ that console event delivery + * has already been blocked. + */ +void markSignalHandlers (evac_fn evac) +{ + if (console_handler >= 0) { + StgPtr p = deRefStablePtr((StgStablePtr)console_handler); + evac((StgClosure**)(void *)&p); + } +} + + +/* + * Function: generic_handler() + * + * Local function which handles incoming console event (done in a sep OS thread), + * recording the event in stg_pending_events. + */ +static BOOL WINAPI generic_handler(DWORD dwCtrlType) +{ + ACQUIRE_LOCK(&sched_mutex); + + /* Ultra-simple -- up the counter + signal a switch. */ + switch(dwCtrlType) { + case CTRL_CLOSE_EVENT: + /* Don't support the delivery of this event; if we + * indicate that we've handled it here and the Haskell handler + * doesn't take proper action (e.g., terminate the OS process), + * the user of the app will be unable to kill/close it. Not + * good, so disable the delivery for now. + */ + return FALSE; + default: + if (!deliver_event) return TRUE; + + if ( stg_pending_events < N_PENDING_EVENTS ) { + stg_pending_buf[stg_pending_events] = dwCtrlType; + stg_pending_events++; + } + /* Cheesy pulsing of an event to wake up a waiting RTS thread, if any */ + abandonRequestWait(); + resetAbandonRequestWait(); + return TRUE; + } + + RELEASE_LOCK(&sched_mutex); +} + + +/* + * Function: rts_InstallConsoleEvent() + * + * Install/remove a console event handler. + */ +int +rts_InstallConsoleEvent(int action, StgStablePtr *handler) +{ + StgInt previous_hdlr = console_handler; + + switch (action) { + case STG_SIG_IGN: + console_handler = STG_SIG_IGN; + if ( !SetConsoleCtrlHandler(NULL, TRUE) ) { + errorBelch("warning: unable to ignore console events"); + } + break; + case STG_SIG_DFL: + console_handler = STG_SIG_IGN; + if ( !SetConsoleCtrlHandler(NULL, FALSE) ) { + errorBelch("warning: unable to restore default console event handling"); + } + break; + case STG_SIG_HAN: + console_handler = (StgInt)*handler; + if ( previous_hdlr < 0 ) { + /* Only install generic_handler() once */ + if ( !SetConsoleCtrlHandler(generic_handler, TRUE) ) { + errorBelch("warning: unable to install console event handler"); + } + } + break; + } + + if (previous_hdlr == STG_SIG_DFL || + previous_hdlr == STG_SIG_IGN) { + return previous_hdlr; + } else { + *handler = (StgStablePtr)previous_hdlr; + return STG_SIG_HAN; + } +} + +/* + * Function: rts_HandledConsoleEvent() + * + * Signal that a Haskell console event handler has completed its run. + * The explicit notification that a Haskell handler has completed is + * required to better handle the delivery of Ctrl-C/Break events whilst + * an async worker thread is handling a read request on stdin. The + * Win32 console implementation will abort such a read request when Ctrl-C + * is delivered. That leaves the worker thread in a bind: should it + * abandon the request (the Haskell thread reading from stdin has been + * thrown an exception to signal the delivery of Ctrl-C & hence have + * aborted the I/O request) or simply ignore the aborted read and retry? + * (the Haskell thread reading from stdin isn't concerned with the + * delivery and handling of Ctrl-C.) With both scenarios being + * possible, the worker thread needs to be told -- that is, did the + * console event handler cause the IO request to be abandoned? + * + */ +void +rts_ConsoleHandlerDone(int ev) +{ + if ( (DWORD)ev == CTRL_BREAK_EVENT || + (DWORD)ev == CTRL_C_EVENT ) { + /* only these two cause stdin system calls to abort.. */ + SetEvent(hConsoleEvent); /* event is manual-reset */ + Sleep(0); /* yield */ + ResetEvent(hConsoleEvent); /* turn it back off again */ + } +} + +/* + * Function: rts_waitConsoleHandlerCompletion() + * + * Esoteric entry point used by worker thread that got woken + * up as part Ctrl-C delivery. + */ +int +rts_waitConsoleHandlerCompletion() +{ + /* As long as the worker doesn't need to do a multiple wait, + * let's keep this HANDLE private to this 'module'. + */ + return (WaitForSingleObject(hConsoleEvent, INFINITE) == WAIT_OBJECT_0); +} diff --git a/rts/win32/ConsoleHandler.h b/rts/win32/ConsoleHandler.h new file mode 100644 index 0000000000..b09adf71cb --- /dev/null +++ b/rts/win32/ConsoleHandler.h @@ -0,0 +1,63 @@ +/* + * Console control handler support. + * + */ +#ifndef __CONSOLEHANDLER_H__ +#define __CONSOLEHANDLER_H__ + +/* + * Console control handlers lets an application handle Ctrl+C, Ctrl+Break etc. + * in Haskell under Win32. Akin to the Unix signal SIGINT. + * + * The API offered by ConsoleHandler.h is identical to that of the signal handling + * code (which isn't supported under win32.) Unsurprisingly, the underlying impl + * is derived from the signal handling code also. + */ + +/* + * Function: signals_pending() + * + * Used by the RTS to check whether new signals have been 'recently' reported. + * If so, the RTS arranges for the delivered signals to be handled by + * de-queueing them from their table, running the associated Haskell + * signal handler. + */ +extern StgInt stg_pending_events; + +#define signals_pending() ( stg_pending_events > 0) + +/* + * Function: anyUserHandlers() + * + * Used by the Scheduler to decide whether its worth its while to stick + * around waiting for an external signal when there are no threads + * runnable. A console handler is used to handle termination events (Ctrl+C) + * and isn't considered a 'user handler'. + */ +#define anyUserHandlers() (rtsFalse) + +/* + * Function: startSignalHandlers() + * + * Run the handlers associated with the queued up console events. Console + * event delivery is blocked for the duration of this call. + */ +extern void startSignalHandlers(Capability *cap); + +/* + * Function: handleSignalsInThisThread() + * + * Have current (OS) thread assume responsibility of handling console events/signals. + * Currently not used (by the console event handling code.) + */ +extern void handleSignalsInThisThread(void); + +/* + * Function: rts_waitConsoleHandlerCompletion() + * + * Esoteric entry point used by worker thread that got woken + * up as part Ctrl-C delivery. + */ +extern int rts_waitConsoleHandlerCompletion(void); + +#endif /* __CONSOLEHANDLER_H__ */ diff --git a/rts/win32/GetTime.c b/rts/win32/GetTime.c new file mode 100644 index 0000000000..584b994d53 --- /dev/null +++ b/rts/win32/GetTime.c @@ -0,0 +1,101 @@ +/* ----------------------------------------------------------------------------- + * + * (c) The GHC Team 2005 + * + * Machine-dependent time measurement functions + * + * ---------------------------------------------------------------------------*/ + +#include "Rts.h" +#include "GetTime.h" + +#include <windows.h> + +#ifdef HAVE_TIME_H +# include <time.h> +#endif + +#define HNS_PER_SEC 10000000LL /* FILETIMES are in units of 100ns */ +/* Convert FILETIMEs into secs */ + +static INLINE_ME Ticks +fileTimeToTicks(FILETIME ft) +{ + Ticks t; + t = ((Ticks)ft.dwHighDateTime << 32) | ft.dwLowDateTime; + t = (t * TICKS_PER_SECOND) / HNS_PER_SEC; + return t; +} + +static int is_win9x = -1; + +static INLINE_ME rtsBool +isWin9x(void) +{ + if (is_win9x < 0) { + /* figure out whether we're on a Win9x box or not. */ + OSVERSIONINFO oi; + BOOL b; + + /* Need to init the size field first.*/ + oi.dwOSVersionInfoSize = sizeof(OSVERSIONINFO); + b = GetVersionEx(&oi); + + is_win9x = ( (b && (oi.dwPlatformId & VER_PLATFORM_WIN32_WINDOWS)) ? 1 : 0); + } + return is_win9x; +} + + +void +getProcessTimes(Ticks *user, Ticks *elapsed) +{ + *user = getProcessCPUTime(); + *elapsed = getProcessElapsedTime(); +} + +Ticks +getProcessCPUTime(void) +{ + FILETIME creationTime, exitTime, userTime, kernelTime = {0,0}; + + if (isWin9x()) return getProcessElapsedTime(); + + if (!GetProcessTimes(GetCurrentProcess(), &creationTime, + &exitTime, &kernelTime, &userTime)) { + return 0; + } + + return fileTimeToTicks(userTime); +} + +Ticks +getProcessElapsedTime(void) +{ + FILETIME system_time; + GetSystemTimeAsFileTime(&system_time); + return fileTimeToTicks(system_time); +} + +Ticks +getThreadCPUTime(void) +{ + FILETIME creationTime, exitTime, userTime, kernelTime = {0,0}; + + if (isWin9x()) return getProcessCPUTime(); + + if (!GetThreadTimes(GetCurrentThread(), &creationTime, + &exitTime, &kernelTime, &userTime)) { + return 0; + } + + return fileTimeToTicks(userTime); +} + +nat +getPageFaults(void) +{ + /* ToDo (on NT): better, get this via the performance data + that's stored in the registry. */ + return 0; +} diff --git a/rts/win32/IOManager.c b/rts/win32/IOManager.c new file mode 100644 index 0000000000..a67c3504c1 --- /dev/null +++ b/rts/win32/IOManager.c @@ -0,0 +1,510 @@ +/* IOManager.c + * + * Non-blocking / asynchronous I/O for Win32. + * + * (c) sof, 2002-2003. + */ +#include "Rts.h" +#include "IOManager.h" +#include "WorkQueue.h" +#include "ConsoleHandler.h" +#include <stdio.h> +#include <stdlib.h> +#include <io.h> +#include <winsock.h> +#include <process.h> + +/* + * Internal state maintained by the IO manager. + */ +typedef struct IOManagerState { + CritSection manLock; + WorkQueue* workQueue; + int queueSize; + int numWorkers; + int workersIdle; + HANDLE hExitEvent; + unsigned int requestID; + /* fields for keeping track of active WorkItems */ + CritSection active_work_lock; + WorkItem* active_work_items; +} IOManagerState; + +/* ToDo: wrap up this state via a IOManager handle instead? */ +static IOManagerState* ioMan; + +static void RegisterWorkItem ( IOManagerState* iom, WorkItem* wi); +static void DeregisterWorkItem( IOManagerState* iom, WorkItem* wi); + +/* + * The routine executed by each worker thread. + */ +static +unsigned +WINAPI +IOWorkerProc(PVOID param) +{ + HANDLE hWaits[2]; + DWORD rc; + IOManagerState* iom = (IOManagerState*)param; + WorkQueue* pq = iom->workQueue; + WorkItem* work; + int len = 0, fd = 0; + DWORD errCode = 0; + void* complData; + + hWaits[0] = (HANDLE)iom->hExitEvent; + hWaits[1] = GetWorkQueueHandle(pq); + + while (1) { + /* The error code is communicated back on completion of request; reset. */ + errCode = 0; + + EnterCriticalSection(&iom->manLock); + /* Signal that the worker is idle. + * + * 'workersIdle' is used when determining whether or not to + * increase the worker thread pool when adding a new request. + * (see addIORequest().) + */ + iom->workersIdle++; + LeaveCriticalSection(&iom->manLock); + + /* + * A possible future refinement is to make long-term idle threads + * wake up and decide to shut down should the number of idle threads + * be above some threshold. + * + */ + rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE ); + + if (rc == WAIT_OBJECT_0) { + // we received the exit event + return 0; + } + + EnterCriticalSection(&iom->manLock); + /* Signal that the thread is 'non-idle' and about to consume + * a work item. + */ + iom->workersIdle--; + iom->queueSize--; + LeaveCriticalSection(&iom->manLock); + + if ( rc == (WAIT_OBJECT_0 + 1) ) { + /* work item available, fetch it. */ + if (FetchWork(pq,(void**)&work)) { + work->abandonOp = 0; + RegisterWorkItem(iom,work); + if ( work->workKind & WORKER_READ ) { + if ( work->workKind & WORKER_FOR_SOCKET ) { + len = recv(work->workData.ioData.fd, + work->workData.ioData.buf, + work->workData.ioData.len, + 0); + if (len == SOCKET_ERROR) { + errCode = WSAGetLastError(); + } + } else { + while (1) { + /* Do the read(), with extra-special handling for Ctrl+C */ + len = read(work->workData.ioData.fd, + work->workData.ioData.buf, + work->workData.ioData.len); + if ( len == 0 && work->workData.ioData.len != 0 ) { + /* Given the following scenario: + * - a console handler has been registered that handles Ctrl+C + * events. + * - we've not tweaked the 'console mode' settings to turn on + * ENABLE_PROCESSED_INPUT. + * - we're blocked waiting on input from standard input. + * - the user hits Ctrl+C. + * + * The OS will invoke the console handler (in a separate OS thread), + * and the above read() (i.e., under the hood, a ReadFile() op) returns + * 0, with the error set to ERROR_OPERATION_ABORTED. We don't + * want to percolate this error condition back to the Haskell user. + * Do this by waiting for the completion of the Haskell console handler. + * If upon completion of the console handler routine, the Haskell thread + * that issued the request is found to have been thrown an exception, + * the worker abandons the request (since that's what the Haskell thread + * has done.) If the Haskell thread hasn't been interrupted, the worker + * retries the read request as if nothing happened. + */ + if ( (GetLastError()) == ERROR_OPERATION_ABORTED ) { + /* For now, only abort when dealing with the standard input handle. + * i.e., for all others, an error is raised. + */ + HANDLE h = (HANDLE)GetStdHandle(STD_INPUT_HANDLE); + if ( _get_osfhandle(work->workData.ioData.fd) == (long)h ) { + if (rts_waitConsoleHandlerCompletion()) { + /* If the Scheduler has set work->abandonOp, the Haskell thread has + * been thrown an exception (=> the worker must abandon this request.) + * We test for this below before invoking the on-completion routine. + */ + if (work->abandonOp) { + break; + } else { + continue; + } + } + } else { + break; /* Treat it like an error */ + } + } else { + break; + } + } else { + break; + } + } + if (len == -1) { errCode = errno; } + } + complData = work->workData.ioData.buf; + fd = work->workData.ioData.fd; + } else if ( work->workKind & WORKER_WRITE ) { + if ( work->workKind & WORKER_FOR_SOCKET ) { + len = send(work->workData.ioData.fd, + work->workData.ioData.buf, + work->workData.ioData.len, + 0); + if (len == SOCKET_ERROR) { + errCode = WSAGetLastError(); + } + } else { + len = write(work->workData.ioData.fd, + work->workData.ioData.buf, + work->workData.ioData.len); + if (len == -1) { errCode = errno; } + } + complData = work->workData.ioData.buf; + fd = work->workData.ioData.fd; + } else if ( work->workKind & WORKER_DELAY ) { + /* Approximate implementation of threadDelay; + * + * Note: Sleep() is in milliseconds, not micros. + */ + Sleep(work->workData.delayData.msecs / 1000); + len = work->workData.delayData.msecs; + complData = NULL; + fd = 0; + errCode = 0; + } else if ( work->workKind & WORKER_DO_PROC ) { + /* perform operation/proc on behalf of Haskell thread. */ + if (work->workData.procData.proc) { + /* The procedure is assumed to encode result + success/failure + * via its param. + */ + errCode=work->workData.procData.proc(work->workData.procData.param); + } else { + errCode=1; + } + complData = work->workData.procData.param; + } else { + fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind); + fflush(stderr); + continue; + } + if (!work->abandonOp) { + work->onCompletion(work->requestID, + fd, + len, + complData, + errCode); + } + /* Free the WorkItem */ + DeregisterWorkItem(iom,work); + free(work); + } else { + fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr); + return 1; + } + } else { + fprintf(stderr, "waiting failed (%lu); fatal.\n", rc); fflush(stderr); + return 1; + } + } + return 0; +} + +static +BOOL +NewIOWorkerThread(IOManagerState* iom) +{ + unsigned threadId; + return ( 0 != _beginthreadex(NULL, + 0, + IOWorkerProc, + (LPVOID)iom, + 0, + &threadId) ); +} + +BOOL +StartIOManager(void) +{ + HANDLE hExit; + WorkQueue* wq; + + wq = NewWorkQueue(); + if ( !wq ) return FALSE; + + ioMan = (IOManagerState*)malloc(sizeof(IOManagerState)); + + if (!ioMan) { + FreeWorkQueue(wq); + return FALSE; + } + + /* A manual-reset event */ + hExit = CreateEvent ( NULL, TRUE, FALSE, NULL ); + if ( !hExit ) { + FreeWorkQueue(wq); + free(ioMan); + return FALSE; + } + + ioMan->hExitEvent = hExit; + InitializeCriticalSection(&ioMan->manLock); + ioMan->workQueue = wq; + ioMan->numWorkers = 0; + ioMan->workersIdle = 0; + ioMan->queueSize = 0; + ioMan->requestID = 1; + InitializeCriticalSection(&ioMan->active_work_lock); + ioMan->active_work_items = NULL; + + return TRUE; +} + +/* + * Function: depositWorkItem() + * + * Local function which deposits a WorkItem onto a work queue, + * deciding in the process whether or not the thread pool needs + * to be augmented with another thread to handle the new request. + * + */ +static +int +depositWorkItem( unsigned int reqID, + WorkItem* wItem ) +{ + EnterCriticalSection(&ioMan->manLock); + +#if 0 + fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers); + fflush(stderr); +#endif + /* A new worker thread is created when there are fewer idle threads + * than non-consumed queue requests. This ensures that requests will + * be dealt with in a timely manner. + * + * [Long explanation of why the previous thread pool policy lead to + * trouble] + * + * Previously, the thread pool was augmented iff no idle worker threads + * were available. That strategy runs the risk of repeatedly adding to + * the request queue without expanding the thread pool to handle this + * sudden spike in queued requests. + * [How? Assume workersIdle is 1, and addIORequest() is called. No new + * thread is created and the request is simply queued. If addIORequest() + * is called again _before the OS schedules a worker thread to pull the + * request off the queue_, workersIdle is still 1 and another request is + * simply added to the queue. Once the worker thread is run, only one + * request is de-queued, leaving the 2nd request in the queue] + * + * Assuming none of the queued requests take an inordinate amount of to + * complete, the request queue would eventually be drained. But if that's + * not the case, the later requests will end up languishing in the queue + * indefinitely. The non-timely handling of requests may cause CH applications + * to misbehave / hang; bad. + * + */ + ioMan->queueSize++; + if ( (ioMan->workersIdle < ioMan->queueSize) ) { + /* see if giving up our quantum ferrets out some idle threads. + */ + LeaveCriticalSection(&ioMan->manLock); + Sleep(0); + EnterCriticalSection(&ioMan->manLock); + if ( (ioMan->workersIdle < ioMan->queueSize) ) { + /* No, go ahead and create another. */ + ioMan->numWorkers++; + LeaveCriticalSection(&ioMan->manLock); + NewIOWorkerThread(ioMan); + } else { + LeaveCriticalSection(&ioMan->manLock); + } + } else { + LeaveCriticalSection(&ioMan->manLock); + } + + if (SubmitWork(ioMan->workQueue,wItem)) { + /* Note: the work item has potentially been consumed by a worker thread + * (and freed) at this point, so we cannot use wItem's requestID. + */ + return reqID; + } else { + return 0; + } +} + +/* + * Function: AddIORequest() + * + * Conduit to underlying WorkQueue's SubmitWork(); adds IO + * request to work queue, deciding whether or not to augment + * the thread pool in the process. + */ +int +AddIORequest ( int fd, + BOOL forWriting, + BOOL isSocket, + int len, + char* buffer, + CompletionProc onCompletion) +{ + WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem)); + unsigned int reqID = ioMan->requestID++; + if (!ioMan || !wItem) return 0; + + /* Fill in the blanks */ + wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) | + ( forWriting ? WORKER_WRITE : WORKER_READ ); + wItem->workData.ioData.fd = fd; + wItem->workData.ioData.len = len; + wItem->workData.ioData.buf = buffer; + wItem->link = NULL; + + wItem->onCompletion = onCompletion; + wItem->requestID = reqID; + + return depositWorkItem(reqID, wItem); +} + +/* + * Function: AddDelayRequest() + * + * Like AddIORequest(), but this time adding a delay request to + * the request queue. + */ +BOOL +AddDelayRequest ( unsigned int msecs, + CompletionProc onCompletion) +{ + WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem)); + unsigned int reqID = ioMan->requestID++; + if (!ioMan || !wItem) return FALSE; + + /* Fill in the blanks */ + wItem->workKind = WORKER_DELAY; + wItem->workData.delayData.msecs = msecs; + wItem->onCompletion = onCompletion; + wItem->requestID = reqID; + wItem->link = NULL; + + return depositWorkItem(reqID, wItem); +} + +/* + * Function: AddProcRequest() + * + * Add an asynchronous procedure request. + */ +BOOL +AddProcRequest ( void* proc, + void* param, + CompletionProc onCompletion) +{ + WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem)); + unsigned int reqID = ioMan->requestID++; + if (!ioMan || !wItem) return FALSE; + + /* Fill in the blanks */ + wItem->workKind = WORKER_DO_PROC; + wItem->workData.procData.proc = proc; + wItem->workData.procData.param = param; + wItem->onCompletion = onCompletion; + wItem->requestID = reqID; + wItem->abandonOp = 0; + wItem->link = NULL; + + return depositWorkItem(reqID, wItem); +} + +void ShutdownIOManager ( void ) +{ + SetEvent(ioMan->hExitEvent); + // ToDo: we can't free this now, because the worker thread(s) + // haven't necessarily finished with it yet. Perhaps it should + // have a reference count or something. + // free(ioMan); + // ioMan = NULL; +} + +/* Keep track of WorkItems currently being serviced. */ +static +void +RegisterWorkItem(IOManagerState* ioMan, + WorkItem* wi) +{ + EnterCriticalSection(&ioMan->active_work_lock); + wi->link = ioMan->active_work_items; + ioMan->active_work_items = wi; + LeaveCriticalSection(&ioMan->active_work_lock); +} + +static +void +DeregisterWorkItem(IOManagerState* ioMan, + WorkItem* wi) +{ + WorkItem *ptr, *prev; + + EnterCriticalSection(&ioMan->active_work_lock); + for(prev=NULL,ptr=ioMan->active_work_items;ptr;prev=ptr,ptr=ptr->link) { + if (wi->requestID == ptr->requestID) { + if (prev==NULL) { + ioMan->active_work_items = ptr->link; + } else { + prev->link = ptr->link; + } + LeaveCriticalSection(&ioMan->active_work_lock); + return; + } + } + fprintf(stderr, "DeregisterWorkItem: unable to locate work item %d\n", wi->requestID); + LeaveCriticalSection(&ioMan->active_work_lock); +} + + +/* + * Function: abandonWorkRequest() + * + * Signal that a work request isn't of interest. Called by the Scheduler + * if a blocked Haskell thread has an exception thrown to it. + * + * Note: we're not aborting the system call that a worker might be blocked on + * here, just disabling the propagation of its result once its finished. We + * may have to go the whole hog here and switch to overlapped I/O so that we + * can abort blocked system calls. + */ +void +abandonWorkRequest ( int reqID ) +{ + WorkItem *ptr; + EnterCriticalSection(&ioMan->active_work_lock); + for(ptr=ioMan->active_work_items;ptr;ptr=ptr->link) { + if (ptr->requestID == (unsigned int)reqID ) { + ptr->abandonOp = 1; + LeaveCriticalSection(&ioMan->active_work_lock); + return; + } + } + /* Note: if the request ID isn't present, the worker will have + * finished sometime since awaitRequests() last drained the completed + * request table; i.e., not an error. + */ + LeaveCriticalSection(&ioMan->active_work_lock); +} diff --git a/rts/win32/IOManager.h b/rts/win32/IOManager.h new file mode 100644 index 0000000000..4893e2387c --- /dev/null +++ b/rts/win32/IOManager.h @@ -0,0 +1,110 @@ +/* IOManager.h + * + * Non-blocking / asynchronous I/O for Win32. + * + * (c) sof, 2002-2003 + */ +#ifndef __IOMANAGER_H__ +#define __IOMANAGER_H__ +/* On the yucky side..suppress -Wmissing-declarations warnings when + * including <windows.h> + */ +extern void* GetCurrentFiber ( void ); +extern void* GetFiberData ( void ); +#include <windows.h> + +/* + The IOManager subsystem provides a non-blocking view + of I/O operations. It lets one (or more) OS thread(s) + issue multiple I/O requests, which the IOManager then + handles independently of/concurrent to the thread(s) + that issued the request. Upon completion, the issuing + thread can inspect the result of the I/O operation & + take appropriate action. + + The IOManager is intended used with the GHC RTS to + implement non-blocking I/O in Concurrent Haskell. + */ + +/* + * Our WorkQueue holds WorkItems, encoding IO and + * delay requests. + * + */ +typedef void (*CompletionProc)(unsigned int requestID, + int fd, + int len, + void* buf, + int errCode); + +/* + * Asynchronous procedure calls executed by a worker thread + * take a generic state argument pointer and return an int by + * default. + */ +typedef int (*DoProcProc)(void *param); + +typedef union workData { + struct { + int fd; + int len; + char *buf; + } ioData; + struct { + int msecs; + } delayData; + struct { + DoProcProc proc; + void* param; + } procData; +} WorkData; + +typedef struct WorkItem { + unsigned int workKind; + WorkData workData; + unsigned int requestID; + CompletionProc onCompletion; + unsigned int abandonOp; + struct WorkItem *link; +} WorkItem; + +extern CompletionProc onComplete; + +/* the kind of operations supported; you could easily imagine + * that instead of passing a tag describing the work to be performed, + * a function pointer is passed instead. Maybe later. + */ +#define WORKER_READ 1 +#define WORKER_WRITE 2 +#define WORKER_DELAY 4 +#define WORKER_FOR_SOCKET 8 +#define WORKER_DO_PROC 16 + +/* + * Starting up and shutting down. + */ +extern BOOL StartIOManager ( void ); +extern void ShutdownIOManager ( void ); + +/* + * Adding I/O and delay requests. With each request a + * completion routine is supplied, which the worker thread + * will invoke upon completion. + */ +extern int AddDelayRequest ( unsigned int msecs, + CompletionProc onCompletion); + +extern int AddIORequest ( int fd, + BOOL forWriting, + BOOL isSocket, + int len, + char* buffer, + CompletionProc onCompletion); + +extern int AddProcRequest ( void* proc, + void* data, + CompletionProc onCompletion); + +extern void abandonWorkRequest ( int reqID ); + +#endif /* __IOMANAGER_H__ */ diff --git a/rts/win32/OSThreads.c b/rts/win32/OSThreads.c new file mode 100644 index 0000000000..c772be38f4 --- /dev/null +++ b/rts/win32/OSThreads.c @@ -0,0 +1,199 @@ +/* --------------------------------------------------------------------------- + * + * (c) The GHC Team, 2001-2005 + * + * Accessing OS threads functionality in a (mostly) OS-independent + * manner. + * + * --------------------------------------------------------------------------*/ + +#include "Rts.h" +#if defined(THREADED_RTS) +#include "OSThreads.h" +#include "RtsUtils.h" + +/* For reasons not yet clear, the entire contents of process.h is protected + * by __STRICT_ANSI__ not being defined. + */ +#undef __STRICT_ANSI__ +#include <process.h> + +/* Win32 threads and synchronisation objects */ + +/* A Condition is represented by a Win32 Event object; + * a Mutex by a Mutex kernel object. + * + * ToDo: go through the defn and usage of these to + * make sure the semantics match up with that of + * the (assumed) pthreads behaviour. This is really + * just a first pass at getting something compilable. + */ + +void +initCondition( Condition* pCond ) +{ + HANDLE h = CreateEvent(NULL, + FALSE, /* auto reset */ + FALSE, /* initially not signalled */ + NULL); /* unnamed => process-local. */ + + if ( h == NULL ) { + errorBelch("initCondition: unable to create"); + } + *pCond = h; + return; +} + +void +closeCondition( Condition* pCond ) +{ + if ( CloseHandle(*pCond) == 0 ) { + errorBelch("closeCondition: failed to close"); + } + return; +} + +rtsBool +broadcastCondition ( Condition* pCond ) +{ + PulseEvent(*pCond); + return rtsTrue; +} + +rtsBool +signalCondition ( Condition* pCond ) +{ + if (SetEvent(*pCond) == 0) { + barf("SetEvent: %d", GetLastError()); + } + return rtsTrue; +} + +rtsBool +waitCondition ( Condition* pCond, Mutex* pMut ) +{ + RELEASE_LOCK(pMut); + WaitForSingleObject(*pCond, INFINITE); + /* Hmm..use WaitForMultipleObjects() ? */ + ACQUIRE_LOCK(pMut); + return rtsTrue; +} + +void +yieldThread() +{ + Sleep(0); + return; +} + +void +shutdownThread() +{ + _endthreadex(0); +} + +int +createOSThread (OSThreadId* pId, OSThreadProc *startProc, void *param) +{ + + return (_beginthreadex ( NULL, /* default security attributes */ + 0, + (unsigned (__stdcall *)(void *)) startProc, + param, + 0, + (unsigned*)pId) == 0); +} + +OSThreadId +osThreadId() +{ + return GetCurrentThreadId(); +} + +#ifdef USE_CRITICAL_SECTIONS +void +initMutex (Mutex* pMut) +{ + InitializeCriticalSectionAndSpinCount(pMut,4000); +} +#else +void +initMutex (Mutex* pMut) +{ + HANDLE h = CreateMutex ( NULL, /* default sec. attributes */ + FALSE, /* not owned => initially signalled */ + NULL + ); + *pMut = h; + return; +} +#endif + +void +newThreadLocalKey (ThreadLocalKey *key) +{ + DWORD r; + r = TlsAlloc(); + if (r == TLS_OUT_OF_INDEXES) { + barf("newThreadLocalKey: out of keys"); + } + *key = r; +} + +void * +getThreadLocalVar (ThreadLocalKey *key) +{ + void *r; + r = TlsGetValue(*key); +#ifdef DEBUG + // r is allowed to be NULL - it can mean that either there was an + // error or the stored value is in fact NULL. + if (GetLastError() != NO_ERROR) { + barf("getThreadLocalVar: key not found"); + } +#endif + return r; +} + +void +setThreadLocalVar (ThreadLocalKey *key, void *value) +{ + BOOL b; + b = TlsSetValue(*key, value); + if (!b) { + barf("setThreadLocalVar: %d", GetLastError()); + } +} + + +static unsigned __stdcall +forkOS_createThreadWrapper ( void * entry ) +{ + Capability *cap; + cap = rts_lock(); + cap = rts_evalStableIO(cap, (HsStablePtr) entry, NULL); + rts_unlock(cap); + return 0; +} + +int +forkOS_createThread ( HsStablePtr entry ) +{ + unsigned long pId; + return (_beginthreadex ( NULL, /* default security attributes */ + 0, + forkOS_createThreadWrapper, + (void*)entry, + 0, + (unsigned*)&pId) == 0); +} + +#else /* !defined(THREADED_RTS) */ + +int +forkOS_createThread ( HsStablePtr entry STG_UNUSED ) +{ + return -1; +} + +#endif /* !defined(THREADED_RTS) */ diff --git a/rts/win32/Ticker.c b/rts/win32/Ticker.c new file mode 100644 index 0000000000..ab791d8dc7 --- /dev/null +++ b/rts/win32/Ticker.c @@ -0,0 +1,124 @@ +/* + * RTS periodic timers. + * + */ +#include "Rts.h" +#include "Timer.h" +#include "Ticker.h" +#include <windows.h> +#include <stdio.h> +#include <process.h> +#include "OSThreads.h" + +/* + * Provide a timer service for the RTS, periodically + * notifying it that a number of 'ticks' has passed. + * + */ + +/* To signal shutdown of the timer service, we use a local + * event which the timer thread listens to (and stopVirtTimer() + * signals.) + */ +static HANDLE hStopEvent = INVALID_HANDLE_VALUE; +static HANDLE tickThread = INVALID_HANDLE_VALUE; + +static TickProc tickProc = NULL; + +/* + * Ticking is done by a separate thread which periodically + * wakes up to handle a tick. + * + * This is the portable way of providing a timer service under + * Win32; features like waitable timers or timer queues are only + * supported by a subset of the Win32 platforms (notably not + * under Win9x.) + * + */ +static +unsigned +WINAPI +TimerProc(PVOID param) +{ + int ms = (int)param; + DWORD waitRes; + + /* interpret a < 0 timeout period as 'instantaneous' */ + if (ms < 0) ms = 0; + + while (1) { + waitRes = WaitForSingleObject(hStopEvent, ms); + + switch (waitRes) { + case WAIT_OBJECT_0: + /* event has become signalled */ + tickProc = NULL; + CloseHandle(hStopEvent); + return 0; + case WAIT_TIMEOUT: + /* tick */ + tickProc(0); + break; + case WAIT_FAILED: { + DWORD dw = GetLastError(); + fprintf(stderr, "TimerProc: wait failed -- error code: %lu\n", dw); fflush(stderr); + break; + } + default: + fprintf(stderr, "TimerProc: unexpected result %lu\n", waitRes); fflush(stderr); + break; + } + } + return 0; +} + + +int +startTicker(nat ms, TickProc handle_tick) +{ + unsigned threadId; + /* 'hStopEvent' is a manual-reset event that's signalled upon + * shutdown of timer service (=> timer thread.) + */ + hStopEvent = CreateEvent ( NULL, + TRUE, + FALSE, + NULL); + if (hStopEvent == INVALID_HANDLE_VALUE) { + return 0; + } + tickProc = handle_tick; + tickThread = (HANDLE)(long)_beginthreadex( NULL, + 0, + TimerProc, + (LPVOID)ms, + 0, + &threadId); + return (tickThread != 0); +} + +int +stopTicker(void) +{ + // We must wait for the ticker thread to terminate, since if we + // are in a DLL that is about to be unloaded, the ticker thread + // cannot be allowed to return to a missing DLL. + + if (hStopEvent != INVALID_HANDLE_VALUE && + tickThread != INVALID_HANDLE_VALUE) { + DWORD exitCode; + SetEvent(hStopEvent); + while (1) { + WaitForSingleObject(tickThread, 20); + if (!GetExitCodeThread(tickThread, &exitCode)) { + return 1; + } + if (exitCode != STILL_ACTIVE) { + tickThread = INVALID_HANDLE_VALUE; + return 0; + } + TerminateThread(tickThread, 0); + } + } + return 0; +} diff --git a/rts/win32/WorkQueue.c b/rts/win32/WorkQueue.c new file mode 100644 index 0000000000..85a23608be --- /dev/null +++ b/rts/win32/WorkQueue.c @@ -0,0 +1,215 @@ +/* + * A fixed-size queue; MT-friendly. + * + * (c) sof, 2002-2003. + */ +#include "WorkQueue.h" +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +static void queue_error_rc( char* loc, DWORD err); +static void queue_error( char* loc, char* reason); + + +/* Wrapper around OS call to create semaphore */ +static Semaphore +newSemaphore(int initCount, int max) +{ + Semaphore s; + s = CreateSemaphore ( NULL, /* LPSECURITY_ATTRIBUTES (default) */ + initCount, /* LONG lInitialCount */ + max, /* LONG lMaxCount */ + NULL); /* LPCTSTR (anonymous / no object name) */ + if ( NULL == s) { + queue_error_rc("newSemaphore", GetLastError()); + return NULL; + } + return s; +} + +/* + * Function: NewWorkQueue + * + * The queue constructor - semaphores are initialised to match + * max number of queue entries. + * + */ +WorkQueue* +NewWorkQueue() +{ + WorkQueue* wq = (WorkQueue*)malloc(sizeof(WorkQueue)); + + if (!wq) { + queue_error("NewWorkQueue", "malloc() failed"); + return wq; + } + + wq->head = 0; + wq->tail = 0; + + InitializeCriticalSection(&wq->queueLock); + wq->workAvailable = newSemaphore(0, WORKQUEUE_SIZE); + wq->roomAvailable = newSemaphore(WORKQUEUE_SIZE, WORKQUEUE_SIZE); + + /* Fail if we were unable to create any of the sync objects. */ + if ( NULL == wq->workAvailable || + NULL == wq->roomAvailable ) { + FreeWorkQueue(wq); + return NULL; + } + + return wq; +} + +void +FreeWorkQueue ( WorkQueue* pq ) +{ + /* Close the semaphores; any threads blocked waiting + * on either will as a result be woken up. + */ + if ( pq->workAvailable ) { + CloseHandle(pq->workAvailable); + } + if ( pq->roomAvailable ) { + CloseHandle(pq->workAvailable); + } + free(pq); + return; +} + +HANDLE +GetWorkQueueHandle ( WorkQueue* pq ) +{ + if (!pq) return NULL; + + return pq->workAvailable; +} + +/* + * Function: GetWork + * + * Fetch a work item from the queue, blocking if none available. + * Return value indicates of FALSE indicates error/fatal condition. + */ +BOOL +GetWork ( WorkQueue* pq, void** ppw ) +{ + DWORD rc; + + if (!pq) { + queue_error("GetWork", "NULL WorkQueue object"); + return FALSE; + } + if (!ppw) { + queue_error("GetWork", "NULL WorkItem object"); + return FALSE; + } + + /* Block waiting for work item to become available */ + if ( (rc = WaitForSingleObject( pq->workAvailable, INFINITE)) != WAIT_OBJECT_0 ) { + queue_error_rc("GetWork.WaitForSingleObject(workAvailable)", + ( (WAIT_FAILED == rc) ? GetLastError() : rc)); + return FALSE; + } + + return FetchWork(pq,ppw); +} + +/* + * Function: FetchWork + * + * Fetch a work item from the queue, blocking if none available. + * Return value indicates of FALSE indicates error/fatal condition. + */ +BOOL +FetchWork ( WorkQueue* pq, void** ppw ) +{ + DWORD rc; + + if (!pq) { + queue_error("FetchWork", "NULL WorkQueue object"); + return FALSE; + } + if (!ppw) { + queue_error("FetchWork", "NULL WorkItem object"); + return FALSE; + } + + EnterCriticalSection(&pq->queueLock); + *ppw = pq->items[pq->head]; + /* For sanity's sake, zero out the pointer. */ + pq->items[pq->head] = NULL; + pq->head = (pq->head + 1) % WORKQUEUE_SIZE; + rc = ReleaseSemaphore(pq->roomAvailable,1, NULL); + LeaveCriticalSection(&pq->queueLock); + if ( 0 == rc ) { + queue_error_rc("FetchWork.ReleaseSemaphore()", GetLastError()); + return FALSE; + } + + return TRUE; +} + +/* + * Function: SubmitWork + * + * Add work item to the queue, blocking if no room available. + * Return value indicates of FALSE indicates error/fatal condition. + */ +BOOL +SubmitWork ( WorkQueue* pq, void* pw ) +{ + DWORD rc; + + if (!pq) { + queue_error("SubmitWork", "NULL WorkQueue object"); + return FALSE; + } + if (!pw) { + queue_error("SubmitWork", "NULL WorkItem object"); + return FALSE; + } + + /* Block waiting for work item to become available */ + if ( (rc = WaitForSingleObject( pq->roomAvailable, INFINITE)) != WAIT_OBJECT_0 ) { + queue_error_rc("SubmitWork.WaitForSingleObject(workAvailable)", + ( (WAIT_FAILED == rc) ? GetLastError() : rc)); + + return FALSE; + } + + EnterCriticalSection(&pq->queueLock); + pq->items[pq->tail] = pw; + pq->tail = (pq->tail + 1) % WORKQUEUE_SIZE; + rc = ReleaseSemaphore(pq->workAvailable,1, NULL); + LeaveCriticalSection(&pq->queueLock); + if ( 0 == rc ) { + queue_error_rc("SubmitWork.ReleaseSemaphore()", GetLastError()); + return FALSE; + } + + return TRUE; +} + +/* Error handling */ + +static void +queue_error_rc( char* loc, + DWORD err) +{ + fprintf(stderr, "%s failed: return code = 0x%lx\n", loc, err); + fflush(stderr); + return; +} + + +static void +queue_error( char* loc, + char* reason) +{ + fprintf(stderr, "%s failed: %s\n", loc, reason); + fflush(stderr); + return; +} + diff --git a/rts/win32/WorkQueue.h b/rts/win32/WorkQueue.h new file mode 100644 index 0000000000..bde82a3a77 --- /dev/null +++ b/rts/win32/WorkQueue.h @@ -0,0 +1,37 @@ +/* WorkQueue.h + * + * A fixed-size queue; MT-friendly. + * + * (c) sof, 2002-2003 + * + */ +#ifndef __WORKQUEUE_H__ +#define __WORKQUEUE_H__ +#include <windows.h> + +/* This is a fixed-size queue. */ +#define WORKQUEUE_SIZE 16 + +typedef HANDLE Semaphore; +typedef CRITICAL_SECTION CritSection; + +typedef struct WorkQueue { + /* the master lock, need to be grabbed prior to + using any of the other elements of the struct. */ + CritSection queueLock; + /* consumers/workers block waiting for 'workAvailable' */ + Semaphore workAvailable; + Semaphore roomAvailable; + int head; + int tail; + void** items[WORKQUEUE_SIZE]; +} WorkQueue; + +extern WorkQueue* NewWorkQueue ( void ); +extern void FreeWorkQueue ( WorkQueue* pq ); +extern HANDLE GetWorkQueueHandle ( WorkQueue* pq ); +extern BOOL GetWork ( WorkQueue* pq, void** ppw ); +extern BOOL FetchWork ( WorkQueue* pq, void** ppw ); +extern int SubmitWork ( WorkQueue* pq, void* pw ); + +#endif /* __WORKQUEUE_H__ */ |