summaryrefslogtreecommitdiff
path: root/rts/win32
diff options
context:
space:
mode:
Diffstat (limited to 'rts/win32')
-rw-r--r--rts/win32/AsyncIO.c345
-rw-r--r--rts/win32/AsyncIO.h25
-rw-r--r--rts/win32/AwaitEvent.c51
-rw-r--r--rts/win32/ConsoleHandler.c313
-rw-r--r--rts/win32/ConsoleHandler.h63
-rw-r--r--rts/win32/GetTime.c101
-rw-r--r--rts/win32/IOManager.c510
-rw-r--r--rts/win32/IOManager.h110
-rw-r--r--rts/win32/OSThreads.c199
-rw-r--r--rts/win32/Ticker.c124
-rw-r--r--rts/win32/WorkQueue.c215
-rw-r--r--rts/win32/WorkQueue.h37
12 files changed, 2093 insertions, 0 deletions
diff --git a/rts/win32/AsyncIO.c b/rts/win32/AsyncIO.c
new file mode 100644
index 0000000000..7bcf571cf8
--- /dev/null
+++ b/rts/win32/AsyncIO.c
@@ -0,0 +1,345 @@
+/* AsyncIO.c
+ *
+ * Integrating Win32 asynchronous I/O with the GHC RTS.
+ *
+ * (c) sof, 2002-2003.
+ */
+#include "Rts.h"
+#include "RtsUtils.h"
+#include <windows.h>
+#include <stdio.h>
+#include "Schedule.h"
+#include "RtsFlags.h"
+#include "Capability.h"
+#include "win32/AsyncIO.h"
+#include "win32/IOManager.h"
+
+/*
+ * Overview:
+ *
+ * Haskell code issue asynchronous I/O requests via the
+ * async{Read,Write,DoOp}# primops. These cause addIORequest()
+ * to be invoked, which forwards the request to the underlying
+ * asynchronous I/O subsystem. Each request is tagged with a unique
+ * ID.
+ *
+ * addIORequest() returns this ID, so that when the blocked CH
+ * thread is added onto blocked_queue, its TSO is annotated with
+ * it. Upon completion of an I/O request, the async I/O handling
+ * code makes a back-call to signal its completion; the local
+ * onIOComplete() routine. It adds the IO request ID (along with
+ * its result data) to a queue of completed requests before returning.
+ *
+ * The queue of completed IO request is read by the thread operating
+ * the RTS scheduler. It de-queues the CH threads corresponding
+ * to the request IDs, making them runnable again.
+ *
+ */
+
+typedef struct CompletedReq {
+ unsigned int reqID;
+ int len;
+ int errCode;
+} CompletedReq;
+
+#define MAX_REQUESTS 200
+
+static CRITICAL_SECTION queue_lock;
+static HANDLE completed_req_event;
+static HANDLE abandon_req_wait;
+static HANDLE wait_handles[2];
+static CompletedReq completedTable[MAX_REQUESTS];
+static int completed_hw;
+static HANDLE completed_table_sema;
+static int issued_reqs;
+
+static void
+onIOComplete(unsigned int reqID,
+ int fd STG_UNUSED,
+ int len,
+ void* buf STG_UNUSED,
+ int errCode)
+{
+ DWORD dwRes;
+ /* Deposit result of request in queue/table..when there's room. */
+ dwRes = WaitForSingleObject(completed_table_sema, INFINITE);
+ switch (dwRes) {
+ case WAIT_OBJECT_0:
+ break;
+ default:
+ /* Not likely */
+ fprintf(stderr, "onIOComplete: failed to grab table semaphore, dropping request 0x%x\n", reqID);
+ fflush(stderr);
+ return;
+ }
+ EnterCriticalSection(&queue_lock);
+ if (completed_hw == MAX_REQUESTS) {
+ /* Shouldn't happen */
+ fprintf(stderr, "onIOComplete: ERROR -- Request table overflow (%d); dropping.\n", reqID);
+ fflush(stderr);
+ } else {
+#if 0
+ fprintf(stderr, "onCompl: %d %d %d %d %d\n",
+ reqID, len, errCode, issued_reqs, completed_hw);
+ fflush(stderr);
+#endif
+ completedTable[completed_hw].reqID = reqID;
+ completedTable[completed_hw].len = len;
+ completedTable[completed_hw].errCode = errCode;
+ completed_hw++;
+ issued_reqs--;
+ if (completed_hw == 1) {
+ /* The event is used to wake up the scheduler thread should it
+ * be blocked waiting for requests to complete. The event resets once
+ * that thread has cleared out the request queue/table.
+ */
+ SetEvent(completed_req_event);
+ }
+ }
+ LeaveCriticalSection(&queue_lock);
+}
+
+unsigned int
+addIORequest(int fd,
+ int forWriting,
+ int isSock,
+ int len,
+ char* buf)
+{
+ EnterCriticalSection(&queue_lock);
+ issued_reqs++;
+ LeaveCriticalSection(&queue_lock);
+#if 0
+ fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
+#endif
+ return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete);
+}
+
+unsigned int
+addDelayRequest(int msecs)
+{
+ EnterCriticalSection(&queue_lock);
+ issued_reqs++;
+ LeaveCriticalSection(&queue_lock);
+#if 0
+ fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr);
+#endif
+ return AddDelayRequest(msecs,onIOComplete);
+}
+
+unsigned int
+addDoProcRequest(void* proc, void* param)
+{
+ EnterCriticalSection(&queue_lock);
+ issued_reqs++;
+ LeaveCriticalSection(&queue_lock);
+#if 0
+ fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr);
+#endif
+ return AddProcRequest(proc,param,onIOComplete);
+}
+
+
+int
+startupAsyncIO()
+{
+ if (!StartIOManager()) {
+ return 0;
+ }
+ InitializeCriticalSection(&queue_lock);
+ /* Create a pair of events:
+ *
+ * - completed_req_event -- signals the deposit of request result; manual reset.
+ * - abandon_req_wait -- external OS thread tells current RTS/Scheduler
+ * thread to abandon wait for IO request completion.
+ * Auto reset.
+ */
+ completed_req_event = CreateEvent (NULL, TRUE, FALSE, NULL);
+ abandon_req_wait = CreateEvent (NULL, FALSE, FALSE, NULL);
+ wait_handles[0] = completed_req_event;
+ wait_handles[1] = abandon_req_wait;
+ completed_hw = 0;
+ if ( !(completed_table_sema = CreateSemaphore (NULL, MAX_REQUESTS, MAX_REQUESTS, NULL)) ) {
+ DWORD rc = GetLastError();
+ fprintf(stderr, "startupAsyncIO: CreateSemaphore failed 0x%x\n", rc);
+ fflush(stderr);
+ }
+
+ return ( completed_req_event != INVALID_HANDLE_VALUE &&
+ abandon_req_wait != INVALID_HANDLE_VALUE &&
+ completed_table_sema != NULL );
+}
+
+void
+shutdownAsyncIO()
+{
+ CloseHandle(completed_req_event);
+ ShutdownIOManager();
+}
+
+/*
+ * Function: awaitRequests(wait)
+ *
+ * Check for the completion of external IO work requests. Worker
+ * threads signal completion of IO requests by depositing them
+ * in a table (completedTable). awaitRequests() matches up
+ * requests in that table with threads on the blocked_queue,
+ * making the threads whose IO requests have completed runnable
+ * again.
+ *
+ * awaitRequests() is called by the scheduler periodically _or_ if
+ * it is out of work, and need to wait for the completion of IO
+ * requests to make further progress. In the latter scenario,
+ * awaitRequests() will simply block waiting for worker threads
+ * to complete if the 'completedTable' is empty.
+ */
+int
+awaitRequests(rtsBool wait)
+{
+#ifndef THREADED_RTS
+ // none of this is actually used in the threaded RTS
+
+start:
+#if 0
+ fprintf(stderr, "awaitRequests(): %d %d %d\n", issued_reqs, completed_hw, wait);
+ fflush(stderr);
+#endif
+ EnterCriticalSection(&queue_lock);
+ /* Nothing immediately available & we won't wait */
+ if ((!wait && completed_hw == 0)
+#if 0
+ // If we just return when wait==rtsFalse, we'll go into a busy
+ // wait loop, so I disabled this condition --SDM 18/12/2003
+ (issued_reqs == 0 && completed_hw == 0)
+#endif
+ ) {
+ LeaveCriticalSection(&queue_lock);
+ return 0;
+ }
+ if (completed_hw == 0) {
+ /* empty table, drop lock and wait */
+ LeaveCriticalSection(&queue_lock);
+ if ( wait && sched_state == SCHED_RUNNING ) {
+ DWORD dwRes = WaitForMultipleObjects(2, wait_handles, FALSE, INFINITE);
+ switch (dwRes) {
+ case WAIT_OBJECT_0:
+ /* a request was completed */
+ break;
+ case WAIT_OBJECT_0 + 1:
+ case WAIT_TIMEOUT:
+ /* timeout (unlikely) or told to abandon waiting */
+ return 0;
+ case WAIT_FAILED: {
+ DWORD dw = GetLastError();
+ fprintf(stderr, "awaitRequests: wait failed -- error code: %lu\n", dw); fflush(stderr);
+ return 0;
+ }
+ default:
+ fprintf(stderr, "awaitRequests: unexpected wait return code %lu\n", dwRes); fflush(stderr);
+ return 0;
+ }
+ } else {
+ return 0;
+ }
+ goto start;
+ } else {
+ int i;
+ StgTSO *tso, *prev;
+
+ for (i=0; i < completed_hw; i++) {
+ /* For each of the completed requests, match up their Ids
+ * with those of the threads on the blocked_queue. If the
+ * thread that made the IO request has been subsequently
+ * killed (and removed from blocked_queue), no match will
+ * be found for that request Id.
+ *
+ * i.e., killing a Haskell thread doesn't attempt to cancel
+ * the IO request it is blocked on.
+ *
+ */
+ unsigned int rID = completedTable[i].reqID;
+
+ prev = NULL;
+ for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; prev = tso, tso = tso->link) {
+
+ switch(tso->why_blocked) {
+ case BlockedOnRead:
+ case BlockedOnWrite:
+ case BlockedOnDoProc:
+ if (tso->block_info.async_result->reqID == rID) {
+ /* Found the thread blocked waiting on request; stodgily fill
+ * in its result block.
+ */
+ tso->block_info.async_result->len = completedTable[i].len;
+ tso->block_info.async_result->errCode = completedTable[i].errCode;
+
+ /* Drop the matched TSO from blocked_queue */
+ if (prev) {
+ prev->link = tso->link;
+ } else {
+ blocked_queue_hd = tso->link;
+ }
+ if (blocked_queue_tl == tso) {
+ blocked_queue_tl = prev ? prev : END_TSO_QUEUE;
+ }
+
+ /* Terminates the run queue + this inner for-loop. */
+ tso->link = END_TSO_QUEUE;
+ tso->why_blocked = NotBlocked;
+ pushOnRunQueue(&MainCapability, tso);
+ break;
+ }
+ break;
+ default:
+ if (tso->why_blocked != NotBlocked) {
+ barf("awaitRequests: odd thread state");
+ }
+ break;
+ }
+ }
+ /* Signal that there's completed table slots available */
+ if ( !ReleaseSemaphore(completed_table_sema, 1, NULL) ) {
+ DWORD dw = GetLastError();
+ fprintf(stderr, "awaitRequests: failed to signal semaphore (error code=0x%x)\n", dw);
+ fflush(stderr);
+ }
+ }
+ completed_hw = 0;
+ ResetEvent(completed_req_event);
+ LeaveCriticalSection(&queue_lock);
+ return 1;
+ }
+#endif /* !THREADED_RTS */
+}
+
+/*
+ * Function: abandonRequestWait()
+ *
+ * Wake up a thread that's blocked waiting for new IO requests
+ * to complete (via awaitRequests().)
+ */
+void
+abandonRequestWait( void )
+{
+ /* the event is auto-reset, but in case there's no thread
+ * already waiting on the event, we want to return it to
+ * a non-signalled state.
+ *
+ * Careful! There is no synchronisation between
+ * abandonRequestWait and awaitRequest, which means that
+ * abandonRequestWait might be called just before a thread
+ * goes into a wait, and we miss the abandon signal. So we
+ * must SetEvent() here rather than PulseEvent() to ensure
+ * that the event isn't lost. We can re-optimise by resetting
+ * the event somewhere safe if we know the event has been
+ * properly serviced (see resetAbandon() below). --SDM 18/12/2003
+ */
+ SetEvent(abandon_req_wait);
+}
+
+void
+resetAbandonRequestWait( void )
+{
+ ResetEvent(abandon_req_wait);
+}
+
diff --git a/rts/win32/AsyncIO.h b/rts/win32/AsyncIO.h
new file mode 100644
index 0000000000..2077ea0cf7
--- /dev/null
+++ b/rts/win32/AsyncIO.h
@@ -0,0 +1,25 @@
+/* AsyncIO.h
+ *
+ * Integrating Win32 asynchronous I/O with the GHC RTS.
+ *
+ * (c) sof, 2002-2003.
+ */
+#ifndef __ASYNCHIO_H__
+#define __ASYNCHIO_H__
+extern unsigned int
+addIORequest(int fd,
+ int forWriting,
+ int isSock,
+ int len,
+ char* buf);
+extern unsigned int addDelayRequest(int msecs);
+extern unsigned int addDoProcRequest(void* proc, void* param);
+extern int startupAsyncIO(void);
+extern void shutdownAsyncIO(void);
+
+extern int awaitRequests(rtsBool wait);
+
+extern void abandonRequestWait(void);
+extern void resetAbandonRequestWait(void);
+
+#endif /* __ASYNCHIO_H__ */
diff --git a/rts/win32/AwaitEvent.c b/rts/win32/AwaitEvent.c
new file mode 100644
index 0000000000..43e188fb34
--- /dev/null
+++ b/rts/win32/AwaitEvent.c
@@ -0,0 +1,51 @@
+#if !defined(THREADED_RTS) /* to the end */
+/*
+ * Wait/check for external events. Periodically, the
+ * Scheduler checks for the completion of external operations,
+ * like the expiration of timers, completion of I/O requests
+ * issued by Haskell threads.
+ *
+ * If the Scheduler is otherwise out of work, it'll block
+ * herein waiting for external events to occur.
+ *
+ * This file mirrors the select()-based functionality
+ * for POSIX / Unix platforms in rts/Select.c, but for
+ * Win32.
+ *
+ */
+#include "Rts.h"
+#include "Schedule.h"
+#include "AwaitEvent.h"
+#include <windows.h>
+#include "win32/AsyncIO.h"
+
+// Used to avoid calling abandonRequestWait() if we don't need to.
+// Protected by sched_mutex.
+static nat workerWaitingForRequests = 0;
+
+void
+awaitEvent(rtsBool wait)
+{
+ int ret;
+
+ do {
+ /* Try to de-queue completed IO requests
+ */
+ workerWaitingForRequests = 1;
+ ret = awaitRequests(wait);
+ workerWaitingForRequests = 0;
+ if (!ret) {
+ return; /* still hold the lock */
+ }
+
+ // Return to the scheduler if:
+ //
+ // - we were interrupted
+ // - new threads have arrived
+
+ } while (wait
+ && sched_state == SCHED_RUNNING
+ && emptyRunQueue(&MainCapability)
+ );
+}
+#endif
diff --git a/rts/win32/ConsoleHandler.c b/rts/win32/ConsoleHandler.c
new file mode 100644
index 0000000000..d7096db632
--- /dev/null
+++ b/rts/win32/ConsoleHandler.c
@@ -0,0 +1,313 @@
+/*
+ * Console control handler support.
+ *
+ */
+#include "Rts.h"
+#include <windows.h>
+#include "ConsoleHandler.h"
+#include "SchedAPI.h"
+#include "Schedule.h"
+#include "RtsUtils.h"
+#include "RtsFlags.h"
+#include "AsyncIO.h"
+#include "RtsSignals.h"
+
+extern int stg_InstallConsoleEvent(int action, StgStablePtr *handler);
+
+static BOOL WINAPI shutdown_handler(DWORD dwCtrlType);
+static BOOL WINAPI generic_handler(DWORD dwCtrlType);
+
+static rtsBool deliver_event = rtsTrue;
+static StgInt console_handler = STG_SIG_DFL;
+
+static HANDLE hConsoleEvent = INVALID_HANDLE_VALUE;
+
+#define N_PENDING_EVENTS 16
+StgInt stg_pending_events = 0; /* number of undelivered events */
+DWORD stg_pending_buf[N_PENDING_EVENTS]; /* their associated event numbers. */
+
+/*
+ * Function: initUserSignals()
+ *
+ * Initialize the console handling substrate.
+ */
+void
+initUserSignals(void)
+{
+ stg_pending_events = 0;
+ console_handler = STG_SIG_DFL;
+ if (hConsoleEvent == INVALID_HANDLE_VALUE) {
+ hConsoleEvent =
+ CreateEvent ( NULL, /* default security attributes */
+ TRUE, /* manual-reset event */
+ FALSE, /* initially non-signalled */
+ NULL); /* no name */
+ }
+ return;
+}
+
+/*
+ * Function: shutdown_handler()
+ *
+ * Local function that performs the default handling of Ctrl+C kind
+ * events; gently shutting down the RTS
+ *
+ * To repeat Signals.c remark -- user code may choose to override the
+ * default handler. Which is fine, assuming they put back the default
+ * handler when/if they de-install the custom handler.
+ *
+ */
+static BOOL WINAPI shutdown_handler(DWORD dwCtrlType)
+{
+ switch (dwCtrlType) {
+
+ case CTRL_CLOSE_EVENT:
+ /* see generic_handler() comment re: this event */
+ return FALSE;
+ case CTRL_C_EVENT:
+ case CTRL_BREAK_EVENT:
+
+ // If we're already trying to interrupt the RTS, terminate with
+ // extreme prejudice. So the first ^C tries to exit the program
+ // cleanly, and the second one just kills it.
+ if (sched_state >= SCHED_INTERRUPTING) {
+ stg_exit(EXIT_INTERRUPTED);
+ } else {
+ interruptStgRts();
+ /* Cheesy pulsing of an event to wake up a waiting RTS thread, if any */
+ abandonRequestWait();
+ resetAbandonRequestWait();
+ }
+ return TRUE;
+
+ /* shutdown + logoff events are not handled here. */
+ default:
+ return FALSE;
+ }
+}
+
+
+/*
+ * Function: initDefaultHandlers()
+ *
+ * Install any default signal/console handlers. Currently we install a
+ * Ctrl+C handler that shuts down the RTS in an orderly manner.
+ */
+void initDefaultHandlers(void)
+{
+ if ( !SetConsoleCtrlHandler(shutdown_handler, TRUE) ) {
+ errorBelch("warning: failed to install default console handler");
+ }
+}
+
+
+/*
+ * Function: blockUserSignals()
+ *
+ * Temporarily block the delivery of further console events. Needed to
+ * avoid race conditions when GCing the stack of outstanding handlers or
+ * when emptying the stack by running the handlers.
+ *
+ */
+void
+blockUserSignals(void)
+{
+ deliver_event = rtsFalse;
+}
+
+
+/*
+ * Function: unblockUserSignals()
+ *
+ * The inverse of blockUserSignals(); re-enable the deliver of console events.
+ */
+void
+unblockUserSignals(void)
+{
+ deliver_event = rtsTrue;
+}
+
+
+/*
+ * Function: awaitUserSignals()
+ *
+ * Wait for the next console event. Currently a NOP (returns immediately.)
+ */
+void awaitUserSignals(void)
+{
+ return;
+}
+
+
+/*
+ * Function: startSignalHandlers()
+ *
+ * Run the handlers associated with the stacked up console events. Console
+ * event delivery is blocked for the duration of this call.
+ */
+void startSignalHandlers(Capability *cap)
+{
+ StgStablePtr handler;
+
+ if (console_handler < 0) {
+ return;
+ }
+
+ blockUserSignals();
+ ACQUIRE_LOCK(&sched_mutex);
+
+ handler = deRefStablePtr((StgStablePtr)console_handler);
+ while (stg_pending_events > 0) {
+ stg_pending_events--;
+ scheduleThread(cap,
+ createIOThread(cap,
+ RtsFlags.GcFlags.initialStkSize,
+ rts_apply(cap,
+ (StgClosure *)handler,
+ rts_mkInt(cap,
+ stg_pending_buf[stg_pending_events]))));
+ }
+
+ RELEASE_LOCK(&sched_mutex);
+ unblockUserSignals();
+}
+
+/*
+ * Function: markSignalHandlers()
+ *
+ * Evacuate the handler stack. _Assumes_ that console event delivery
+ * has already been blocked.
+ */
+void markSignalHandlers (evac_fn evac)
+{
+ if (console_handler >= 0) {
+ StgPtr p = deRefStablePtr((StgStablePtr)console_handler);
+ evac((StgClosure**)(void *)&p);
+ }
+}
+
+
+/*
+ * Function: generic_handler()
+ *
+ * Local function which handles incoming console event (done in a sep OS thread),
+ * recording the event in stg_pending_events.
+ */
+static BOOL WINAPI generic_handler(DWORD dwCtrlType)
+{
+ ACQUIRE_LOCK(&sched_mutex);
+
+ /* Ultra-simple -- up the counter + signal a switch. */
+ switch(dwCtrlType) {
+ case CTRL_CLOSE_EVENT:
+ /* Don't support the delivery of this event; if we
+ * indicate that we've handled it here and the Haskell handler
+ * doesn't take proper action (e.g., terminate the OS process),
+ * the user of the app will be unable to kill/close it. Not
+ * good, so disable the delivery for now.
+ */
+ return FALSE;
+ default:
+ if (!deliver_event) return TRUE;
+
+ if ( stg_pending_events < N_PENDING_EVENTS ) {
+ stg_pending_buf[stg_pending_events] = dwCtrlType;
+ stg_pending_events++;
+ }
+ /* Cheesy pulsing of an event to wake up a waiting RTS thread, if any */
+ abandonRequestWait();
+ resetAbandonRequestWait();
+ return TRUE;
+ }
+
+ RELEASE_LOCK(&sched_mutex);
+}
+
+
+/*
+ * Function: rts_InstallConsoleEvent()
+ *
+ * Install/remove a console event handler.
+ */
+int
+rts_InstallConsoleEvent(int action, StgStablePtr *handler)
+{
+ StgInt previous_hdlr = console_handler;
+
+ switch (action) {
+ case STG_SIG_IGN:
+ console_handler = STG_SIG_IGN;
+ if ( !SetConsoleCtrlHandler(NULL, TRUE) ) {
+ errorBelch("warning: unable to ignore console events");
+ }
+ break;
+ case STG_SIG_DFL:
+ console_handler = STG_SIG_IGN;
+ if ( !SetConsoleCtrlHandler(NULL, FALSE) ) {
+ errorBelch("warning: unable to restore default console event handling");
+ }
+ break;
+ case STG_SIG_HAN:
+ console_handler = (StgInt)*handler;
+ if ( previous_hdlr < 0 ) {
+ /* Only install generic_handler() once */
+ if ( !SetConsoleCtrlHandler(generic_handler, TRUE) ) {
+ errorBelch("warning: unable to install console event handler");
+ }
+ }
+ break;
+ }
+
+ if (previous_hdlr == STG_SIG_DFL ||
+ previous_hdlr == STG_SIG_IGN) {
+ return previous_hdlr;
+ } else {
+ *handler = (StgStablePtr)previous_hdlr;
+ return STG_SIG_HAN;
+ }
+}
+
+/*
+ * Function: rts_HandledConsoleEvent()
+ *
+ * Signal that a Haskell console event handler has completed its run.
+ * The explicit notification that a Haskell handler has completed is
+ * required to better handle the delivery of Ctrl-C/Break events whilst
+ * an async worker thread is handling a read request on stdin. The
+ * Win32 console implementation will abort such a read request when Ctrl-C
+ * is delivered. That leaves the worker thread in a bind: should it
+ * abandon the request (the Haskell thread reading from stdin has been
+ * thrown an exception to signal the delivery of Ctrl-C & hence have
+ * aborted the I/O request) or simply ignore the aborted read and retry?
+ * (the Haskell thread reading from stdin isn't concerned with the
+ * delivery and handling of Ctrl-C.) With both scenarios being
+ * possible, the worker thread needs to be told -- that is, did the
+ * console event handler cause the IO request to be abandoned?
+ *
+ */
+void
+rts_ConsoleHandlerDone(int ev)
+{
+ if ( (DWORD)ev == CTRL_BREAK_EVENT ||
+ (DWORD)ev == CTRL_C_EVENT ) {
+ /* only these two cause stdin system calls to abort.. */
+ SetEvent(hConsoleEvent); /* event is manual-reset */
+ Sleep(0); /* yield */
+ ResetEvent(hConsoleEvent); /* turn it back off again */
+ }
+}
+
+/*
+ * Function: rts_waitConsoleHandlerCompletion()
+ *
+ * Esoteric entry point used by worker thread that got woken
+ * up as part Ctrl-C delivery.
+ */
+int
+rts_waitConsoleHandlerCompletion()
+{
+ /* As long as the worker doesn't need to do a multiple wait,
+ * let's keep this HANDLE private to this 'module'.
+ */
+ return (WaitForSingleObject(hConsoleEvent, INFINITE) == WAIT_OBJECT_0);
+}
diff --git a/rts/win32/ConsoleHandler.h b/rts/win32/ConsoleHandler.h
new file mode 100644
index 0000000000..b09adf71cb
--- /dev/null
+++ b/rts/win32/ConsoleHandler.h
@@ -0,0 +1,63 @@
+/*
+ * Console control handler support.
+ *
+ */
+#ifndef __CONSOLEHANDLER_H__
+#define __CONSOLEHANDLER_H__
+
+/*
+ * Console control handlers lets an application handle Ctrl+C, Ctrl+Break etc.
+ * in Haskell under Win32. Akin to the Unix signal SIGINT.
+ *
+ * The API offered by ConsoleHandler.h is identical to that of the signal handling
+ * code (which isn't supported under win32.) Unsurprisingly, the underlying impl
+ * is derived from the signal handling code also.
+ */
+
+/*
+ * Function: signals_pending()
+ *
+ * Used by the RTS to check whether new signals have been 'recently' reported.
+ * If so, the RTS arranges for the delivered signals to be handled by
+ * de-queueing them from their table, running the associated Haskell
+ * signal handler.
+ */
+extern StgInt stg_pending_events;
+
+#define signals_pending() ( stg_pending_events > 0)
+
+/*
+ * Function: anyUserHandlers()
+ *
+ * Used by the Scheduler to decide whether its worth its while to stick
+ * around waiting for an external signal when there are no threads
+ * runnable. A console handler is used to handle termination events (Ctrl+C)
+ * and isn't considered a 'user handler'.
+ */
+#define anyUserHandlers() (rtsFalse)
+
+/*
+ * Function: startSignalHandlers()
+ *
+ * Run the handlers associated with the queued up console events. Console
+ * event delivery is blocked for the duration of this call.
+ */
+extern void startSignalHandlers(Capability *cap);
+
+/*
+ * Function: handleSignalsInThisThread()
+ *
+ * Have current (OS) thread assume responsibility of handling console events/signals.
+ * Currently not used (by the console event handling code.)
+ */
+extern void handleSignalsInThisThread(void);
+
+/*
+ * Function: rts_waitConsoleHandlerCompletion()
+ *
+ * Esoteric entry point used by worker thread that got woken
+ * up as part Ctrl-C delivery.
+ */
+extern int rts_waitConsoleHandlerCompletion(void);
+
+#endif /* __CONSOLEHANDLER_H__ */
diff --git a/rts/win32/GetTime.c b/rts/win32/GetTime.c
new file mode 100644
index 0000000000..584b994d53
--- /dev/null
+++ b/rts/win32/GetTime.c
@@ -0,0 +1,101 @@
+/* -----------------------------------------------------------------------------
+ *
+ * (c) The GHC Team 2005
+ *
+ * Machine-dependent time measurement functions
+ *
+ * ---------------------------------------------------------------------------*/
+
+#include "Rts.h"
+#include "GetTime.h"
+
+#include <windows.h>
+
+#ifdef HAVE_TIME_H
+# include <time.h>
+#endif
+
+#define HNS_PER_SEC 10000000LL /* FILETIMES are in units of 100ns */
+/* Convert FILETIMEs into secs */
+
+static INLINE_ME Ticks
+fileTimeToTicks(FILETIME ft)
+{
+ Ticks t;
+ t = ((Ticks)ft.dwHighDateTime << 32) | ft.dwLowDateTime;
+ t = (t * TICKS_PER_SECOND) / HNS_PER_SEC;
+ return t;
+}
+
+static int is_win9x = -1;
+
+static INLINE_ME rtsBool
+isWin9x(void)
+{
+ if (is_win9x < 0) {
+ /* figure out whether we're on a Win9x box or not. */
+ OSVERSIONINFO oi;
+ BOOL b;
+
+ /* Need to init the size field first.*/
+ oi.dwOSVersionInfoSize = sizeof(OSVERSIONINFO);
+ b = GetVersionEx(&oi);
+
+ is_win9x = ( (b && (oi.dwPlatformId & VER_PLATFORM_WIN32_WINDOWS)) ? 1 : 0);
+ }
+ return is_win9x;
+}
+
+
+void
+getProcessTimes(Ticks *user, Ticks *elapsed)
+{
+ *user = getProcessCPUTime();
+ *elapsed = getProcessElapsedTime();
+}
+
+Ticks
+getProcessCPUTime(void)
+{
+ FILETIME creationTime, exitTime, userTime, kernelTime = {0,0};
+
+ if (isWin9x()) return getProcessElapsedTime();
+
+ if (!GetProcessTimes(GetCurrentProcess(), &creationTime,
+ &exitTime, &kernelTime, &userTime)) {
+ return 0;
+ }
+
+ return fileTimeToTicks(userTime);
+}
+
+Ticks
+getProcessElapsedTime(void)
+{
+ FILETIME system_time;
+ GetSystemTimeAsFileTime(&system_time);
+ return fileTimeToTicks(system_time);
+}
+
+Ticks
+getThreadCPUTime(void)
+{
+ FILETIME creationTime, exitTime, userTime, kernelTime = {0,0};
+
+ if (isWin9x()) return getProcessCPUTime();
+
+ if (!GetThreadTimes(GetCurrentThread(), &creationTime,
+ &exitTime, &kernelTime, &userTime)) {
+ return 0;
+ }
+
+ return fileTimeToTicks(userTime);
+}
+
+nat
+getPageFaults(void)
+{
+ /* ToDo (on NT): better, get this via the performance data
+ that's stored in the registry. */
+ return 0;
+}
diff --git a/rts/win32/IOManager.c b/rts/win32/IOManager.c
new file mode 100644
index 0000000000..a67c3504c1
--- /dev/null
+++ b/rts/win32/IOManager.c
@@ -0,0 +1,510 @@
+/* IOManager.c
+ *
+ * Non-blocking / asynchronous I/O for Win32.
+ *
+ * (c) sof, 2002-2003.
+ */
+#include "Rts.h"
+#include "IOManager.h"
+#include "WorkQueue.h"
+#include "ConsoleHandler.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <io.h>
+#include <winsock.h>
+#include <process.h>
+
+/*
+ * Internal state maintained by the IO manager.
+ */
+typedef struct IOManagerState {
+ CritSection manLock;
+ WorkQueue* workQueue;
+ int queueSize;
+ int numWorkers;
+ int workersIdle;
+ HANDLE hExitEvent;
+ unsigned int requestID;
+ /* fields for keeping track of active WorkItems */
+ CritSection active_work_lock;
+ WorkItem* active_work_items;
+} IOManagerState;
+
+/* ToDo: wrap up this state via a IOManager handle instead? */
+static IOManagerState* ioMan;
+
+static void RegisterWorkItem ( IOManagerState* iom, WorkItem* wi);
+static void DeregisterWorkItem( IOManagerState* iom, WorkItem* wi);
+
+/*
+ * The routine executed by each worker thread.
+ */
+static
+unsigned
+WINAPI
+IOWorkerProc(PVOID param)
+{
+ HANDLE hWaits[2];
+ DWORD rc;
+ IOManagerState* iom = (IOManagerState*)param;
+ WorkQueue* pq = iom->workQueue;
+ WorkItem* work;
+ int len = 0, fd = 0;
+ DWORD errCode = 0;
+ void* complData;
+
+ hWaits[0] = (HANDLE)iom->hExitEvent;
+ hWaits[1] = GetWorkQueueHandle(pq);
+
+ while (1) {
+ /* The error code is communicated back on completion of request; reset. */
+ errCode = 0;
+
+ EnterCriticalSection(&iom->manLock);
+ /* Signal that the worker is idle.
+ *
+ * 'workersIdle' is used when determining whether or not to
+ * increase the worker thread pool when adding a new request.
+ * (see addIORequest().)
+ */
+ iom->workersIdle++;
+ LeaveCriticalSection(&iom->manLock);
+
+ /*
+ * A possible future refinement is to make long-term idle threads
+ * wake up and decide to shut down should the number of idle threads
+ * be above some threshold.
+ *
+ */
+ rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
+
+ if (rc == WAIT_OBJECT_0) {
+ // we received the exit event
+ return 0;
+ }
+
+ EnterCriticalSection(&iom->manLock);
+ /* Signal that the thread is 'non-idle' and about to consume
+ * a work item.
+ */
+ iom->workersIdle--;
+ iom->queueSize--;
+ LeaveCriticalSection(&iom->manLock);
+
+ if ( rc == (WAIT_OBJECT_0 + 1) ) {
+ /* work item available, fetch it. */
+ if (FetchWork(pq,(void**)&work)) {
+ work->abandonOp = 0;
+ RegisterWorkItem(iom,work);
+ if ( work->workKind & WORKER_READ ) {
+ if ( work->workKind & WORKER_FOR_SOCKET ) {
+ len = recv(work->workData.ioData.fd,
+ work->workData.ioData.buf,
+ work->workData.ioData.len,
+ 0);
+ if (len == SOCKET_ERROR) {
+ errCode = WSAGetLastError();
+ }
+ } else {
+ while (1) {
+ /* Do the read(), with extra-special handling for Ctrl+C */
+ len = read(work->workData.ioData.fd,
+ work->workData.ioData.buf,
+ work->workData.ioData.len);
+ if ( len == 0 && work->workData.ioData.len != 0 ) {
+ /* Given the following scenario:
+ * - a console handler has been registered that handles Ctrl+C
+ * events.
+ * - we've not tweaked the 'console mode' settings to turn on
+ * ENABLE_PROCESSED_INPUT.
+ * - we're blocked waiting on input from standard input.
+ * - the user hits Ctrl+C.
+ *
+ * The OS will invoke the console handler (in a separate OS thread),
+ * and the above read() (i.e., under the hood, a ReadFile() op) returns
+ * 0, with the error set to ERROR_OPERATION_ABORTED. We don't
+ * want to percolate this error condition back to the Haskell user.
+ * Do this by waiting for the completion of the Haskell console handler.
+ * If upon completion of the console handler routine, the Haskell thread
+ * that issued the request is found to have been thrown an exception,
+ * the worker abandons the request (since that's what the Haskell thread
+ * has done.) If the Haskell thread hasn't been interrupted, the worker
+ * retries the read request as if nothing happened.
+ */
+ if ( (GetLastError()) == ERROR_OPERATION_ABORTED ) {
+ /* For now, only abort when dealing with the standard input handle.
+ * i.e., for all others, an error is raised.
+ */
+ HANDLE h = (HANDLE)GetStdHandle(STD_INPUT_HANDLE);
+ if ( _get_osfhandle(work->workData.ioData.fd) == (long)h ) {
+ if (rts_waitConsoleHandlerCompletion()) {
+ /* If the Scheduler has set work->abandonOp, the Haskell thread has
+ * been thrown an exception (=> the worker must abandon this request.)
+ * We test for this below before invoking the on-completion routine.
+ */
+ if (work->abandonOp) {
+ break;
+ } else {
+ continue;
+ }
+ }
+ } else {
+ break; /* Treat it like an error */
+ }
+ } else {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+ if (len == -1) { errCode = errno; }
+ }
+ complData = work->workData.ioData.buf;
+ fd = work->workData.ioData.fd;
+ } else if ( work->workKind & WORKER_WRITE ) {
+ if ( work->workKind & WORKER_FOR_SOCKET ) {
+ len = send(work->workData.ioData.fd,
+ work->workData.ioData.buf,
+ work->workData.ioData.len,
+ 0);
+ if (len == SOCKET_ERROR) {
+ errCode = WSAGetLastError();
+ }
+ } else {
+ len = write(work->workData.ioData.fd,
+ work->workData.ioData.buf,
+ work->workData.ioData.len);
+ if (len == -1) { errCode = errno; }
+ }
+ complData = work->workData.ioData.buf;
+ fd = work->workData.ioData.fd;
+ } else if ( work->workKind & WORKER_DELAY ) {
+ /* Approximate implementation of threadDelay;
+ *
+ * Note: Sleep() is in milliseconds, not micros.
+ */
+ Sleep(work->workData.delayData.msecs / 1000);
+ len = work->workData.delayData.msecs;
+ complData = NULL;
+ fd = 0;
+ errCode = 0;
+ } else if ( work->workKind & WORKER_DO_PROC ) {
+ /* perform operation/proc on behalf of Haskell thread. */
+ if (work->workData.procData.proc) {
+ /* The procedure is assumed to encode result + success/failure
+ * via its param.
+ */
+ errCode=work->workData.procData.proc(work->workData.procData.param);
+ } else {
+ errCode=1;
+ }
+ complData = work->workData.procData.param;
+ } else {
+ fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
+ fflush(stderr);
+ continue;
+ }
+ if (!work->abandonOp) {
+ work->onCompletion(work->requestID,
+ fd,
+ len,
+ complData,
+ errCode);
+ }
+ /* Free the WorkItem */
+ DeregisterWorkItem(iom,work);
+ free(work);
+ } else {
+ fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
+ return 1;
+ }
+ } else {
+ fprintf(stderr, "waiting failed (%lu); fatal.\n", rc); fflush(stderr);
+ return 1;
+ }
+ }
+ return 0;
+}
+
+static
+BOOL
+NewIOWorkerThread(IOManagerState* iom)
+{
+ unsigned threadId;
+ return ( 0 != _beginthreadex(NULL,
+ 0,
+ IOWorkerProc,
+ (LPVOID)iom,
+ 0,
+ &threadId) );
+}
+
+BOOL
+StartIOManager(void)
+{
+ HANDLE hExit;
+ WorkQueue* wq;
+
+ wq = NewWorkQueue();
+ if ( !wq ) return FALSE;
+
+ ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
+
+ if (!ioMan) {
+ FreeWorkQueue(wq);
+ return FALSE;
+ }
+
+ /* A manual-reset event */
+ hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
+ if ( !hExit ) {
+ FreeWorkQueue(wq);
+ free(ioMan);
+ return FALSE;
+ }
+
+ ioMan->hExitEvent = hExit;
+ InitializeCriticalSection(&ioMan->manLock);
+ ioMan->workQueue = wq;
+ ioMan->numWorkers = 0;
+ ioMan->workersIdle = 0;
+ ioMan->queueSize = 0;
+ ioMan->requestID = 1;
+ InitializeCriticalSection(&ioMan->active_work_lock);
+ ioMan->active_work_items = NULL;
+
+ return TRUE;
+}
+
+/*
+ * Function: depositWorkItem()
+ *
+ * Local function which deposits a WorkItem onto a work queue,
+ * deciding in the process whether or not the thread pool needs
+ * to be augmented with another thread to handle the new request.
+ *
+ */
+static
+int
+depositWorkItem( unsigned int reqID,
+ WorkItem* wItem )
+{
+ EnterCriticalSection(&ioMan->manLock);
+
+#if 0
+ fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers);
+ fflush(stderr);
+#endif
+ /* A new worker thread is created when there are fewer idle threads
+ * than non-consumed queue requests. This ensures that requests will
+ * be dealt with in a timely manner.
+ *
+ * [Long explanation of why the previous thread pool policy lead to
+ * trouble]
+ *
+ * Previously, the thread pool was augmented iff no idle worker threads
+ * were available. That strategy runs the risk of repeatedly adding to
+ * the request queue without expanding the thread pool to handle this
+ * sudden spike in queued requests.
+ * [How? Assume workersIdle is 1, and addIORequest() is called. No new
+ * thread is created and the request is simply queued. If addIORequest()
+ * is called again _before the OS schedules a worker thread to pull the
+ * request off the queue_, workersIdle is still 1 and another request is
+ * simply added to the queue. Once the worker thread is run, only one
+ * request is de-queued, leaving the 2nd request in the queue]
+ *
+ * Assuming none of the queued requests take an inordinate amount of to
+ * complete, the request queue would eventually be drained. But if that's
+ * not the case, the later requests will end up languishing in the queue
+ * indefinitely. The non-timely handling of requests may cause CH applications
+ * to misbehave / hang; bad.
+ *
+ */
+ ioMan->queueSize++;
+ if ( (ioMan->workersIdle < ioMan->queueSize) ) {
+ /* see if giving up our quantum ferrets out some idle threads.
+ */
+ LeaveCriticalSection(&ioMan->manLock);
+ Sleep(0);
+ EnterCriticalSection(&ioMan->manLock);
+ if ( (ioMan->workersIdle < ioMan->queueSize) ) {
+ /* No, go ahead and create another. */
+ ioMan->numWorkers++;
+ LeaveCriticalSection(&ioMan->manLock);
+ NewIOWorkerThread(ioMan);
+ } else {
+ LeaveCriticalSection(&ioMan->manLock);
+ }
+ } else {
+ LeaveCriticalSection(&ioMan->manLock);
+ }
+
+ if (SubmitWork(ioMan->workQueue,wItem)) {
+ /* Note: the work item has potentially been consumed by a worker thread
+ * (and freed) at this point, so we cannot use wItem's requestID.
+ */
+ return reqID;
+ } else {
+ return 0;
+ }
+}
+
+/*
+ * Function: AddIORequest()
+ *
+ * Conduit to underlying WorkQueue's SubmitWork(); adds IO
+ * request to work queue, deciding whether or not to augment
+ * the thread pool in the process.
+ */
+int
+AddIORequest ( int fd,
+ BOOL forWriting,
+ BOOL isSocket,
+ int len,
+ char* buffer,
+ CompletionProc onCompletion)
+{
+ WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
+ unsigned int reqID = ioMan->requestID++;
+ if (!ioMan || !wItem) return 0;
+
+ /* Fill in the blanks */
+ wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
+ ( forWriting ? WORKER_WRITE : WORKER_READ );
+ wItem->workData.ioData.fd = fd;
+ wItem->workData.ioData.len = len;
+ wItem->workData.ioData.buf = buffer;
+ wItem->link = NULL;
+
+ wItem->onCompletion = onCompletion;
+ wItem->requestID = reqID;
+
+ return depositWorkItem(reqID, wItem);
+}
+
+/*
+ * Function: AddDelayRequest()
+ *
+ * Like AddIORequest(), but this time adding a delay request to
+ * the request queue.
+ */
+BOOL
+AddDelayRequest ( unsigned int msecs,
+ CompletionProc onCompletion)
+{
+ WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
+ unsigned int reqID = ioMan->requestID++;
+ if (!ioMan || !wItem) return FALSE;
+
+ /* Fill in the blanks */
+ wItem->workKind = WORKER_DELAY;
+ wItem->workData.delayData.msecs = msecs;
+ wItem->onCompletion = onCompletion;
+ wItem->requestID = reqID;
+ wItem->link = NULL;
+
+ return depositWorkItem(reqID, wItem);
+}
+
+/*
+ * Function: AddProcRequest()
+ *
+ * Add an asynchronous procedure request.
+ */
+BOOL
+AddProcRequest ( void* proc,
+ void* param,
+ CompletionProc onCompletion)
+{
+ WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
+ unsigned int reqID = ioMan->requestID++;
+ if (!ioMan || !wItem) return FALSE;
+
+ /* Fill in the blanks */
+ wItem->workKind = WORKER_DO_PROC;
+ wItem->workData.procData.proc = proc;
+ wItem->workData.procData.param = param;
+ wItem->onCompletion = onCompletion;
+ wItem->requestID = reqID;
+ wItem->abandonOp = 0;
+ wItem->link = NULL;
+
+ return depositWorkItem(reqID, wItem);
+}
+
+void ShutdownIOManager ( void )
+{
+ SetEvent(ioMan->hExitEvent);
+ // ToDo: we can't free this now, because the worker thread(s)
+ // haven't necessarily finished with it yet. Perhaps it should
+ // have a reference count or something.
+ // free(ioMan);
+ // ioMan = NULL;
+}
+
+/* Keep track of WorkItems currently being serviced. */
+static
+void
+RegisterWorkItem(IOManagerState* ioMan,
+ WorkItem* wi)
+{
+ EnterCriticalSection(&ioMan->active_work_lock);
+ wi->link = ioMan->active_work_items;
+ ioMan->active_work_items = wi;
+ LeaveCriticalSection(&ioMan->active_work_lock);
+}
+
+static
+void
+DeregisterWorkItem(IOManagerState* ioMan,
+ WorkItem* wi)
+{
+ WorkItem *ptr, *prev;
+
+ EnterCriticalSection(&ioMan->active_work_lock);
+ for(prev=NULL,ptr=ioMan->active_work_items;ptr;prev=ptr,ptr=ptr->link) {
+ if (wi->requestID == ptr->requestID) {
+ if (prev==NULL) {
+ ioMan->active_work_items = ptr->link;
+ } else {
+ prev->link = ptr->link;
+ }
+ LeaveCriticalSection(&ioMan->active_work_lock);
+ return;
+ }
+ }
+ fprintf(stderr, "DeregisterWorkItem: unable to locate work item %d\n", wi->requestID);
+ LeaveCriticalSection(&ioMan->active_work_lock);
+}
+
+
+/*
+ * Function: abandonWorkRequest()
+ *
+ * Signal that a work request isn't of interest. Called by the Scheduler
+ * if a blocked Haskell thread has an exception thrown to it.
+ *
+ * Note: we're not aborting the system call that a worker might be blocked on
+ * here, just disabling the propagation of its result once its finished. We
+ * may have to go the whole hog here and switch to overlapped I/O so that we
+ * can abort blocked system calls.
+ */
+void
+abandonWorkRequest ( int reqID )
+{
+ WorkItem *ptr;
+ EnterCriticalSection(&ioMan->active_work_lock);
+ for(ptr=ioMan->active_work_items;ptr;ptr=ptr->link) {
+ if (ptr->requestID == (unsigned int)reqID ) {
+ ptr->abandonOp = 1;
+ LeaveCriticalSection(&ioMan->active_work_lock);
+ return;
+ }
+ }
+ /* Note: if the request ID isn't present, the worker will have
+ * finished sometime since awaitRequests() last drained the completed
+ * request table; i.e., not an error.
+ */
+ LeaveCriticalSection(&ioMan->active_work_lock);
+}
diff --git a/rts/win32/IOManager.h b/rts/win32/IOManager.h
new file mode 100644
index 0000000000..4893e2387c
--- /dev/null
+++ b/rts/win32/IOManager.h
@@ -0,0 +1,110 @@
+/* IOManager.h
+ *
+ * Non-blocking / asynchronous I/O for Win32.
+ *
+ * (c) sof, 2002-2003
+ */
+#ifndef __IOMANAGER_H__
+#define __IOMANAGER_H__
+/* On the yucky side..suppress -Wmissing-declarations warnings when
+ * including <windows.h>
+ */
+extern void* GetCurrentFiber ( void );
+extern void* GetFiberData ( void );
+#include <windows.h>
+
+/*
+ The IOManager subsystem provides a non-blocking view
+ of I/O operations. It lets one (or more) OS thread(s)
+ issue multiple I/O requests, which the IOManager then
+ handles independently of/concurrent to the thread(s)
+ that issued the request. Upon completion, the issuing
+ thread can inspect the result of the I/O operation &
+ take appropriate action.
+
+ The IOManager is intended used with the GHC RTS to
+ implement non-blocking I/O in Concurrent Haskell.
+ */
+
+/*
+ * Our WorkQueue holds WorkItems, encoding IO and
+ * delay requests.
+ *
+ */
+typedef void (*CompletionProc)(unsigned int requestID,
+ int fd,
+ int len,
+ void* buf,
+ int errCode);
+
+/*
+ * Asynchronous procedure calls executed by a worker thread
+ * take a generic state argument pointer and return an int by
+ * default.
+ */
+typedef int (*DoProcProc)(void *param);
+
+typedef union workData {
+ struct {
+ int fd;
+ int len;
+ char *buf;
+ } ioData;
+ struct {
+ int msecs;
+ } delayData;
+ struct {
+ DoProcProc proc;
+ void* param;
+ } procData;
+} WorkData;
+
+typedef struct WorkItem {
+ unsigned int workKind;
+ WorkData workData;
+ unsigned int requestID;
+ CompletionProc onCompletion;
+ unsigned int abandonOp;
+ struct WorkItem *link;
+} WorkItem;
+
+extern CompletionProc onComplete;
+
+/* the kind of operations supported; you could easily imagine
+ * that instead of passing a tag describing the work to be performed,
+ * a function pointer is passed instead. Maybe later.
+ */
+#define WORKER_READ 1
+#define WORKER_WRITE 2
+#define WORKER_DELAY 4
+#define WORKER_FOR_SOCKET 8
+#define WORKER_DO_PROC 16
+
+/*
+ * Starting up and shutting down.
+ */
+extern BOOL StartIOManager ( void );
+extern void ShutdownIOManager ( void );
+
+/*
+ * Adding I/O and delay requests. With each request a
+ * completion routine is supplied, which the worker thread
+ * will invoke upon completion.
+ */
+extern int AddDelayRequest ( unsigned int msecs,
+ CompletionProc onCompletion);
+
+extern int AddIORequest ( int fd,
+ BOOL forWriting,
+ BOOL isSocket,
+ int len,
+ char* buffer,
+ CompletionProc onCompletion);
+
+extern int AddProcRequest ( void* proc,
+ void* data,
+ CompletionProc onCompletion);
+
+extern void abandonWorkRequest ( int reqID );
+
+#endif /* __IOMANAGER_H__ */
diff --git a/rts/win32/OSThreads.c b/rts/win32/OSThreads.c
new file mode 100644
index 0000000000..c772be38f4
--- /dev/null
+++ b/rts/win32/OSThreads.c
@@ -0,0 +1,199 @@
+/* ---------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 2001-2005
+ *
+ * Accessing OS threads functionality in a (mostly) OS-independent
+ * manner.
+ *
+ * --------------------------------------------------------------------------*/
+
+#include "Rts.h"
+#if defined(THREADED_RTS)
+#include "OSThreads.h"
+#include "RtsUtils.h"
+
+/* For reasons not yet clear, the entire contents of process.h is protected
+ * by __STRICT_ANSI__ not being defined.
+ */
+#undef __STRICT_ANSI__
+#include <process.h>
+
+/* Win32 threads and synchronisation objects */
+
+/* A Condition is represented by a Win32 Event object;
+ * a Mutex by a Mutex kernel object.
+ *
+ * ToDo: go through the defn and usage of these to
+ * make sure the semantics match up with that of
+ * the (assumed) pthreads behaviour. This is really
+ * just a first pass at getting something compilable.
+ */
+
+void
+initCondition( Condition* pCond )
+{
+ HANDLE h = CreateEvent(NULL,
+ FALSE, /* auto reset */
+ FALSE, /* initially not signalled */
+ NULL); /* unnamed => process-local. */
+
+ if ( h == NULL ) {
+ errorBelch("initCondition: unable to create");
+ }
+ *pCond = h;
+ return;
+}
+
+void
+closeCondition( Condition* pCond )
+{
+ if ( CloseHandle(*pCond) == 0 ) {
+ errorBelch("closeCondition: failed to close");
+ }
+ return;
+}
+
+rtsBool
+broadcastCondition ( Condition* pCond )
+{
+ PulseEvent(*pCond);
+ return rtsTrue;
+}
+
+rtsBool
+signalCondition ( Condition* pCond )
+{
+ if (SetEvent(*pCond) == 0) {
+ barf("SetEvent: %d", GetLastError());
+ }
+ return rtsTrue;
+}
+
+rtsBool
+waitCondition ( Condition* pCond, Mutex* pMut )
+{
+ RELEASE_LOCK(pMut);
+ WaitForSingleObject(*pCond, INFINITE);
+ /* Hmm..use WaitForMultipleObjects() ? */
+ ACQUIRE_LOCK(pMut);
+ return rtsTrue;
+}
+
+void
+yieldThread()
+{
+ Sleep(0);
+ return;
+}
+
+void
+shutdownThread()
+{
+ _endthreadex(0);
+}
+
+int
+createOSThread (OSThreadId* pId, OSThreadProc *startProc, void *param)
+{
+
+ return (_beginthreadex ( NULL, /* default security attributes */
+ 0,
+ (unsigned (__stdcall *)(void *)) startProc,
+ param,
+ 0,
+ (unsigned*)pId) == 0);
+}
+
+OSThreadId
+osThreadId()
+{
+ return GetCurrentThreadId();
+}
+
+#ifdef USE_CRITICAL_SECTIONS
+void
+initMutex (Mutex* pMut)
+{
+ InitializeCriticalSectionAndSpinCount(pMut,4000);
+}
+#else
+void
+initMutex (Mutex* pMut)
+{
+ HANDLE h = CreateMutex ( NULL, /* default sec. attributes */
+ FALSE, /* not owned => initially signalled */
+ NULL
+ );
+ *pMut = h;
+ return;
+}
+#endif
+
+void
+newThreadLocalKey (ThreadLocalKey *key)
+{
+ DWORD r;
+ r = TlsAlloc();
+ if (r == TLS_OUT_OF_INDEXES) {
+ barf("newThreadLocalKey: out of keys");
+ }
+ *key = r;
+}
+
+void *
+getThreadLocalVar (ThreadLocalKey *key)
+{
+ void *r;
+ r = TlsGetValue(*key);
+#ifdef DEBUG
+ // r is allowed to be NULL - it can mean that either there was an
+ // error or the stored value is in fact NULL.
+ if (GetLastError() != NO_ERROR) {
+ barf("getThreadLocalVar: key not found");
+ }
+#endif
+ return r;
+}
+
+void
+setThreadLocalVar (ThreadLocalKey *key, void *value)
+{
+ BOOL b;
+ b = TlsSetValue(*key, value);
+ if (!b) {
+ barf("setThreadLocalVar: %d", GetLastError());
+ }
+}
+
+
+static unsigned __stdcall
+forkOS_createThreadWrapper ( void * entry )
+{
+ Capability *cap;
+ cap = rts_lock();
+ cap = rts_evalStableIO(cap, (HsStablePtr) entry, NULL);
+ rts_unlock(cap);
+ return 0;
+}
+
+int
+forkOS_createThread ( HsStablePtr entry )
+{
+ unsigned long pId;
+ return (_beginthreadex ( NULL, /* default security attributes */
+ 0,
+ forkOS_createThreadWrapper,
+ (void*)entry,
+ 0,
+ (unsigned*)&pId) == 0);
+}
+
+#else /* !defined(THREADED_RTS) */
+
+int
+forkOS_createThread ( HsStablePtr entry STG_UNUSED )
+{
+ return -1;
+}
+
+#endif /* !defined(THREADED_RTS) */
diff --git a/rts/win32/Ticker.c b/rts/win32/Ticker.c
new file mode 100644
index 0000000000..ab791d8dc7
--- /dev/null
+++ b/rts/win32/Ticker.c
@@ -0,0 +1,124 @@
+/*
+ * RTS periodic timers.
+ *
+ */
+#include "Rts.h"
+#include "Timer.h"
+#include "Ticker.h"
+#include <windows.h>
+#include <stdio.h>
+#include <process.h>
+#include "OSThreads.h"
+
+/*
+ * Provide a timer service for the RTS, periodically
+ * notifying it that a number of 'ticks' has passed.
+ *
+ */
+
+/* To signal shutdown of the timer service, we use a local
+ * event which the timer thread listens to (and stopVirtTimer()
+ * signals.)
+ */
+static HANDLE hStopEvent = INVALID_HANDLE_VALUE;
+static HANDLE tickThread = INVALID_HANDLE_VALUE;
+
+static TickProc tickProc = NULL;
+
+/*
+ * Ticking is done by a separate thread which periodically
+ * wakes up to handle a tick.
+ *
+ * This is the portable way of providing a timer service under
+ * Win32; features like waitable timers or timer queues are only
+ * supported by a subset of the Win32 platforms (notably not
+ * under Win9x.)
+ *
+ */
+static
+unsigned
+WINAPI
+TimerProc(PVOID param)
+{
+ int ms = (int)param;
+ DWORD waitRes;
+
+ /* interpret a < 0 timeout period as 'instantaneous' */
+ if (ms < 0) ms = 0;
+
+ while (1) {
+ waitRes = WaitForSingleObject(hStopEvent, ms);
+
+ switch (waitRes) {
+ case WAIT_OBJECT_0:
+ /* event has become signalled */
+ tickProc = NULL;
+ CloseHandle(hStopEvent);
+ return 0;
+ case WAIT_TIMEOUT:
+ /* tick */
+ tickProc(0);
+ break;
+ case WAIT_FAILED: {
+ DWORD dw = GetLastError();
+ fprintf(stderr, "TimerProc: wait failed -- error code: %lu\n", dw); fflush(stderr);
+ break;
+ }
+ default:
+ fprintf(stderr, "TimerProc: unexpected result %lu\n", waitRes); fflush(stderr);
+ break;
+ }
+ }
+ return 0;
+}
+
+
+int
+startTicker(nat ms, TickProc handle_tick)
+{
+ unsigned threadId;
+ /* 'hStopEvent' is a manual-reset event that's signalled upon
+ * shutdown of timer service (=> timer thread.)
+ */
+ hStopEvent = CreateEvent ( NULL,
+ TRUE,
+ FALSE,
+ NULL);
+ if (hStopEvent == INVALID_HANDLE_VALUE) {
+ return 0;
+ }
+ tickProc = handle_tick;
+ tickThread = (HANDLE)(long)_beginthreadex( NULL,
+ 0,
+ TimerProc,
+ (LPVOID)ms,
+ 0,
+ &threadId);
+ return (tickThread != 0);
+}
+
+int
+stopTicker(void)
+{
+ // We must wait for the ticker thread to terminate, since if we
+ // are in a DLL that is about to be unloaded, the ticker thread
+ // cannot be allowed to return to a missing DLL.
+
+ if (hStopEvent != INVALID_HANDLE_VALUE &&
+ tickThread != INVALID_HANDLE_VALUE) {
+ DWORD exitCode;
+ SetEvent(hStopEvent);
+ while (1) {
+ WaitForSingleObject(tickThread, 20);
+ if (!GetExitCodeThread(tickThread, &exitCode)) {
+ return 1;
+ }
+ if (exitCode != STILL_ACTIVE) {
+ tickThread = INVALID_HANDLE_VALUE;
+ return 0;
+ }
+ TerminateThread(tickThread, 0);
+ }
+ }
+ return 0;
+}
diff --git a/rts/win32/WorkQueue.c b/rts/win32/WorkQueue.c
new file mode 100644
index 0000000000..85a23608be
--- /dev/null
+++ b/rts/win32/WorkQueue.c
@@ -0,0 +1,215 @@
+/*
+ * A fixed-size queue; MT-friendly.
+ *
+ * (c) sof, 2002-2003.
+ */
+#include "WorkQueue.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+static void queue_error_rc( char* loc, DWORD err);
+static void queue_error( char* loc, char* reason);
+
+
+/* Wrapper around OS call to create semaphore */
+static Semaphore
+newSemaphore(int initCount, int max)
+{
+ Semaphore s;
+ s = CreateSemaphore ( NULL, /* LPSECURITY_ATTRIBUTES (default) */
+ initCount, /* LONG lInitialCount */
+ max, /* LONG lMaxCount */
+ NULL); /* LPCTSTR (anonymous / no object name) */
+ if ( NULL == s) {
+ queue_error_rc("newSemaphore", GetLastError());
+ return NULL;
+ }
+ return s;
+}
+
+/*
+ * Function: NewWorkQueue
+ *
+ * The queue constructor - semaphores are initialised to match
+ * max number of queue entries.
+ *
+ */
+WorkQueue*
+NewWorkQueue()
+{
+ WorkQueue* wq = (WorkQueue*)malloc(sizeof(WorkQueue));
+
+ if (!wq) {
+ queue_error("NewWorkQueue", "malloc() failed");
+ return wq;
+ }
+
+ wq->head = 0;
+ wq->tail = 0;
+
+ InitializeCriticalSection(&wq->queueLock);
+ wq->workAvailable = newSemaphore(0, WORKQUEUE_SIZE);
+ wq->roomAvailable = newSemaphore(WORKQUEUE_SIZE, WORKQUEUE_SIZE);
+
+ /* Fail if we were unable to create any of the sync objects. */
+ if ( NULL == wq->workAvailable ||
+ NULL == wq->roomAvailable ) {
+ FreeWorkQueue(wq);
+ return NULL;
+ }
+
+ return wq;
+}
+
+void
+FreeWorkQueue ( WorkQueue* pq )
+{
+ /* Close the semaphores; any threads blocked waiting
+ * on either will as a result be woken up.
+ */
+ if ( pq->workAvailable ) {
+ CloseHandle(pq->workAvailable);
+ }
+ if ( pq->roomAvailable ) {
+ CloseHandle(pq->workAvailable);
+ }
+ free(pq);
+ return;
+}
+
+HANDLE
+GetWorkQueueHandle ( WorkQueue* pq )
+{
+ if (!pq) return NULL;
+
+ return pq->workAvailable;
+}
+
+/*
+ * Function: GetWork
+ *
+ * Fetch a work item from the queue, blocking if none available.
+ * Return value indicates of FALSE indicates error/fatal condition.
+ */
+BOOL
+GetWork ( WorkQueue* pq, void** ppw )
+{
+ DWORD rc;
+
+ if (!pq) {
+ queue_error("GetWork", "NULL WorkQueue object");
+ return FALSE;
+ }
+ if (!ppw) {
+ queue_error("GetWork", "NULL WorkItem object");
+ return FALSE;
+ }
+
+ /* Block waiting for work item to become available */
+ if ( (rc = WaitForSingleObject( pq->workAvailable, INFINITE)) != WAIT_OBJECT_0 ) {
+ queue_error_rc("GetWork.WaitForSingleObject(workAvailable)",
+ ( (WAIT_FAILED == rc) ? GetLastError() : rc));
+ return FALSE;
+ }
+
+ return FetchWork(pq,ppw);
+}
+
+/*
+ * Function: FetchWork
+ *
+ * Fetch a work item from the queue, blocking if none available.
+ * Return value indicates of FALSE indicates error/fatal condition.
+ */
+BOOL
+FetchWork ( WorkQueue* pq, void** ppw )
+{
+ DWORD rc;
+
+ if (!pq) {
+ queue_error("FetchWork", "NULL WorkQueue object");
+ return FALSE;
+ }
+ if (!ppw) {
+ queue_error("FetchWork", "NULL WorkItem object");
+ return FALSE;
+ }
+
+ EnterCriticalSection(&pq->queueLock);
+ *ppw = pq->items[pq->head];
+ /* For sanity's sake, zero out the pointer. */
+ pq->items[pq->head] = NULL;
+ pq->head = (pq->head + 1) % WORKQUEUE_SIZE;
+ rc = ReleaseSemaphore(pq->roomAvailable,1, NULL);
+ LeaveCriticalSection(&pq->queueLock);
+ if ( 0 == rc ) {
+ queue_error_rc("FetchWork.ReleaseSemaphore()", GetLastError());
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+/*
+ * Function: SubmitWork
+ *
+ * Add work item to the queue, blocking if no room available.
+ * Return value indicates of FALSE indicates error/fatal condition.
+ */
+BOOL
+SubmitWork ( WorkQueue* pq, void* pw )
+{
+ DWORD rc;
+
+ if (!pq) {
+ queue_error("SubmitWork", "NULL WorkQueue object");
+ return FALSE;
+ }
+ if (!pw) {
+ queue_error("SubmitWork", "NULL WorkItem object");
+ return FALSE;
+ }
+
+ /* Block waiting for work item to become available */
+ if ( (rc = WaitForSingleObject( pq->roomAvailable, INFINITE)) != WAIT_OBJECT_0 ) {
+ queue_error_rc("SubmitWork.WaitForSingleObject(workAvailable)",
+ ( (WAIT_FAILED == rc) ? GetLastError() : rc));
+
+ return FALSE;
+ }
+
+ EnterCriticalSection(&pq->queueLock);
+ pq->items[pq->tail] = pw;
+ pq->tail = (pq->tail + 1) % WORKQUEUE_SIZE;
+ rc = ReleaseSemaphore(pq->workAvailable,1, NULL);
+ LeaveCriticalSection(&pq->queueLock);
+ if ( 0 == rc ) {
+ queue_error_rc("SubmitWork.ReleaseSemaphore()", GetLastError());
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+/* Error handling */
+
+static void
+queue_error_rc( char* loc,
+ DWORD err)
+{
+ fprintf(stderr, "%s failed: return code = 0x%lx\n", loc, err);
+ fflush(stderr);
+ return;
+}
+
+
+static void
+queue_error( char* loc,
+ char* reason)
+{
+ fprintf(stderr, "%s failed: %s\n", loc, reason);
+ fflush(stderr);
+ return;
+}
+
diff --git a/rts/win32/WorkQueue.h b/rts/win32/WorkQueue.h
new file mode 100644
index 0000000000..bde82a3a77
--- /dev/null
+++ b/rts/win32/WorkQueue.h
@@ -0,0 +1,37 @@
+/* WorkQueue.h
+ *
+ * A fixed-size queue; MT-friendly.
+ *
+ * (c) sof, 2002-2003
+ *
+ */
+#ifndef __WORKQUEUE_H__
+#define __WORKQUEUE_H__
+#include <windows.h>
+
+/* This is a fixed-size queue. */
+#define WORKQUEUE_SIZE 16
+
+typedef HANDLE Semaphore;
+typedef CRITICAL_SECTION CritSection;
+
+typedef struct WorkQueue {
+ /* the master lock, need to be grabbed prior to
+ using any of the other elements of the struct. */
+ CritSection queueLock;
+ /* consumers/workers block waiting for 'workAvailable' */
+ Semaphore workAvailable;
+ Semaphore roomAvailable;
+ int head;
+ int tail;
+ void** items[WORKQUEUE_SIZE];
+} WorkQueue;
+
+extern WorkQueue* NewWorkQueue ( void );
+extern void FreeWorkQueue ( WorkQueue* pq );
+extern HANDLE GetWorkQueueHandle ( WorkQueue* pq );
+extern BOOL GetWork ( WorkQueue* pq, void** ppw );
+extern BOOL FetchWork ( WorkQueue* pq, void** ppw );
+extern int SubmitWork ( WorkQueue* pq, void* pw );
+
+#endif /* __WORKQUEUE_H__ */