summaryrefslogtreecommitdiff
path: root/rts/win32/IOManager.c
diff options
context:
space:
mode:
Diffstat (limited to 'rts/win32/IOManager.c')
-rw-r--r--rts/win32/IOManager.c510
1 files changed, 510 insertions, 0 deletions
diff --git a/rts/win32/IOManager.c b/rts/win32/IOManager.c
new file mode 100644
index 0000000000..a67c3504c1
--- /dev/null
+++ b/rts/win32/IOManager.c
@@ -0,0 +1,510 @@
+/* IOManager.c
+ *
+ * Non-blocking / asynchronous I/O for Win32.
+ *
+ * (c) sof, 2002-2003.
+ */
+#include "Rts.h"
+#include "IOManager.h"
+#include "WorkQueue.h"
+#include "ConsoleHandler.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <io.h>
+#include <winsock.h>
+#include <process.h>
+
+/*
+ * Internal state maintained by the IO manager.
+ */
+typedef struct IOManagerState {
+ CritSection manLock;
+ WorkQueue* workQueue;
+ int queueSize;
+ int numWorkers;
+ int workersIdle;
+ HANDLE hExitEvent;
+ unsigned int requestID;
+ /* fields for keeping track of active WorkItems */
+ CritSection active_work_lock;
+ WorkItem* active_work_items;
+} IOManagerState;
+
+/* ToDo: wrap up this state via a IOManager handle instead? */
+static IOManagerState* ioMan;
+
+static void RegisterWorkItem ( IOManagerState* iom, WorkItem* wi);
+static void DeregisterWorkItem( IOManagerState* iom, WorkItem* wi);
+
+/*
+ * The routine executed by each worker thread.
+ */
+static
+unsigned
+WINAPI
+IOWorkerProc(PVOID param)
+{
+ HANDLE hWaits[2];
+ DWORD rc;
+ IOManagerState* iom = (IOManagerState*)param;
+ WorkQueue* pq = iom->workQueue;
+ WorkItem* work;
+ int len = 0, fd = 0;
+ DWORD errCode = 0;
+ void* complData;
+
+ hWaits[0] = (HANDLE)iom->hExitEvent;
+ hWaits[1] = GetWorkQueueHandle(pq);
+
+ while (1) {
+ /* The error code is communicated back on completion of request; reset. */
+ errCode = 0;
+
+ EnterCriticalSection(&iom->manLock);
+ /* Signal that the worker is idle.
+ *
+ * 'workersIdle' is used when determining whether or not to
+ * increase the worker thread pool when adding a new request.
+ * (see addIORequest().)
+ */
+ iom->workersIdle++;
+ LeaveCriticalSection(&iom->manLock);
+
+ /*
+ * A possible future refinement is to make long-term idle threads
+ * wake up and decide to shut down should the number of idle threads
+ * be above some threshold.
+ *
+ */
+ rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
+
+ if (rc == WAIT_OBJECT_0) {
+ // we received the exit event
+ return 0;
+ }
+
+ EnterCriticalSection(&iom->manLock);
+ /* Signal that the thread is 'non-idle' and about to consume
+ * a work item.
+ */
+ iom->workersIdle--;
+ iom->queueSize--;
+ LeaveCriticalSection(&iom->manLock);
+
+ if ( rc == (WAIT_OBJECT_0 + 1) ) {
+ /* work item available, fetch it. */
+ if (FetchWork(pq,(void**)&work)) {
+ work->abandonOp = 0;
+ RegisterWorkItem(iom,work);
+ if ( work->workKind & WORKER_READ ) {
+ if ( work->workKind & WORKER_FOR_SOCKET ) {
+ len = recv(work->workData.ioData.fd,
+ work->workData.ioData.buf,
+ work->workData.ioData.len,
+ 0);
+ if (len == SOCKET_ERROR) {
+ errCode = WSAGetLastError();
+ }
+ } else {
+ while (1) {
+ /* Do the read(), with extra-special handling for Ctrl+C */
+ len = read(work->workData.ioData.fd,
+ work->workData.ioData.buf,
+ work->workData.ioData.len);
+ if ( len == 0 && work->workData.ioData.len != 0 ) {
+ /* Given the following scenario:
+ * - a console handler has been registered that handles Ctrl+C
+ * events.
+ * - we've not tweaked the 'console mode' settings to turn on
+ * ENABLE_PROCESSED_INPUT.
+ * - we're blocked waiting on input from standard input.
+ * - the user hits Ctrl+C.
+ *
+ * The OS will invoke the console handler (in a separate OS thread),
+ * and the above read() (i.e., under the hood, a ReadFile() op) returns
+ * 0, with the error set to ERROR_OPERATION_ABORTED. We don't
+ * want to percolate this error condition back to the Haskell user.
+ * Do this by waiting for the completion of the Haskell console handler.
+ * If upon completion of the console handler routine, the Haskell thread
+ * that issued the request is found to have been thrown an exception,
+ * the worker abandons the request (since that's what the Haskell thread
+ * has done.) If the Haskell thread hasn't been interrupted, the worker
+ * retries the read request as if nothing happened.
+ */
+ if ( (GetLastError()) == ERROR_OPERATION_ABORTED ) {
+ /* For now, only abort when dealing with the standard input handle.
+ * i.e., for all others, an error is raised.
+ */
+ HANDLE h = (HANDLE)GetStdHandle(STD_INPUT_HANDLE);
+ if ( _get_osfhandle(work->workData.ioData.fd) == (long)h ) {
+ if (rts_waitConsoleHandlerCompletion()) {
+ /* If the Scheduler has set work->abandonOp, the Haskell thread has
+ * been thrown an exception (=> the worker must abandon this request.)
+ * We test for this below before invoking the on-completion routine.
+ */
+ if (work->abandonOp) {
+ break;
+ } else {
+ continue;
+ }
+ }
+ } else {
+ break; /* Treat it like an error */
+ }
+ } else {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+ if (len == -1) { errCode = errno; }
+ }
+ complData = work->workData.ioData.buf;
+ fd = work->workData.ioData.fd;
+ } else if ( work->workKind & WORKER_WRITE ) {
+ if ( work->workKind & WORKER_FOR_SOCKET ) {
+ len = send(work->workData.ioData.fd,
+ work->workData.ioData.buf,
+ work->workData.ioData.len,
+ 0);
+ if (len == SOCKET_ERROR) {
+ errCode = WSAGetLastError();
+ }
+ } else {
+ len = write(work->workData.ioData.fd,
+ work->workData.ioData.buf,
+ work->workData.ioData.len);
+ if (len == -1) { errCode = errno; }
+ }
+ complData = work->workData.ioData.buf;
+ fd = work->workData.ioData.fd;
+ } else if ( work->workKind & WORKER_DELAY ) {
+ /* Approximate implementation of threadDelay;
+ *
+ * Note: Sleep() is in milliseconds, not micros.
+ */
+ Sleep(work->workData.delayData.msecs / 1000);
+ len = work->workData.delayData.msecs;
+ complData = NULL;
+ fd = 0;
+ errCode = 0;
+ } else if ( work->workKind & WORKER_DO_PROC ) {
+ /* perform operation/proc on behalf of Haskell thread. */
+ if (work->workData.procData.proc) {
+ /* The procedure is assumed to encode result + success/failure
+ * via its param.
+ */
+ errCode=work->workData.procData.proc(work->workData.procData.param);
+ } else {
+ errCode=1;
+ }
+ complData = work->workData.procData.param;
+ } else {
+ fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
+ fflush(stderr);
+ continue;
+ }
+ if (!work->abandonOp) {
+ work->onCompletion(work->requestID,
+ fd,
+ len,
+ complData,
+ errCode);
+ }
+ /* Free the WorkItem */
+ DeregisterWorkItem(iom,work);
+ free(work);
+ } else {
+ fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
+ return 1;
+ }
+ } else {
+ fprintf(stderr, "waiting failed (%lu); fatal.\n", rc); fflush(stderr);
+ return 1;
+ }
+ }
+ return 0;
+}
+
+static
+BOOL
+NewIOWorkerThread(IOManagerState* iom)
+{
+ unsigned threadId;
+ return ( 0 != _beginthreadex(NULL,
+ 0,
+ IOWorkerProc,
+ (LPVOID)iom,
+ 0,
+ &threadId) );
+}
+
+BOOL
+StartIOManager(void)
+{
+ HANDLE hExit;
+ WorkQueue* wq;
+
+ wq = NewWorkQueue();
+ if ( !wq ) return FALSE;
+
+ ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
+
+ if (!ioMan) {
+ FreeWorkQueue(wq);
+ return FALSE;
+ }
+
+ /* A manual-reset event */
+ hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
+ if ( !hExit ) {
+ FreeWorkQueue(wq);
+ free(ioMan);
+ return FALSE;
+ }
+
+ ioMan->hExitEvent = hExit;
+ InitializeCriticalSection(&ioMan->manLock);
+ ioMan->workQueue = wq;
+ ioMan->numWorkers = 0;
+ ioMan->workersIdle = 0;
+ ioMan->queueSize = 0;
+ ioMan->requestID = 1;
+ InitializeCriticalSection(&ioMan->active_work_lock);
+ ioMan->active_work_items = NULL;
+
+ return TRUE;
+}
+
+/*
+ * Function: depositWorkItem()
+ *
+ * Local function which deposits a WorkItem onto a work queue,
+ * deciding in the process whether or not the thread pool needs
+ * to be augmented with another thread to handle the new request.
+ *
+ */
+static
+int
+depositWorkItem( unsigned int reqID,
+ WorkItem* wItem )
+{
+ EnterCriticalSection(&ioMan->manLock);
+
+#if 0
+ fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers);
+ fflush(stderr);
+#endif
+ /* A new worker thread is created when there are fewer idle threads
+ * than non-consumed queue requests. This ensures that requests will
+ * be dealt with in a timely manner.
+ *
+ * [Long explanation of why the previous thread pool policy lead to
+ * trouble]
+ *
+ * Previously, the thread pool was augmented iff no idle worker threads
+ * were available. That strategy runs the risk of repeatedly adding to
+ * the request queue without expanding the thread pool to handle this
+ * sudden spike in queued requests.
+ * [How? Assume workersIdle is 1, and addIORequest() is called. No new
+ * thread is created and the request is simply queued. If addIORequest()
+ * is called again _before the OS schedules a worker thread to pull the
+ * request off the queue_, workersIdle is still 1 and another request is
+ * simply added to the queue. Once the worker thread is run, only one
+ * request is de-queued, leaving the 2nd request in the queue]
+ *
+ * Assuming none of the queued requests take an inordinate amount of to
+ * complete, the request queue would eventually be drained. But if that's
+ * not the case, the later requests will end up languishing in the queue
+ * indefinitely. The non-timely handling of requests may cause CH applications
+ * to misbehave / hang; bad.
+ *
+ */
+ ioMan->queueSize++;
+ if ( (ioMan->workersIdle < ioMan->queueSize) ) {
+ /* see if giving up our quantum ferrets out some idle threads.
+ */
+ LeaveCriticalSection(&ioMan->manLock);
+ Sleep(0);
+ EnterCriticalSection(&ioMan->manLock);
+ if ( (ioMan->workersIdle < ioMan->queueSize) ) {
+ /* No, go ahead and create another. */
+ ioMan->numWorkers++;
+ LeaveCriticalSection(&ioMan->manLock);
+ NewIOWorkerThread(ioMan);
+ } else {
+ LeaveCriticalSection(&ioMan->manLock);
+ }
+ } else {
+ LeaveCriticalSection(&ioMan->manLock);
+ }
+
+ if (SubmitWork(ioMan->workQueue,wItem)) {
+ /* Note: the work item has potentially been consumed by a worker thread
+ * (and freed) at this point, so we cannot use wItem's requestID.
+ */
+ return reqID;
+ } else {
+ return 0;
+ }
+}
+
+/*
+ * Function: AddIORequest()
+ *
+ * Conduit to underlying WorkQueue's SubmitWork(); adds IO
+ * request to work queue, deciding whether or not to augment
+ * the thread pool in the process.
+ */
+int
+AddIORequest ( int fd,
+ BOOL forWriting,
+ BOOL isSocket,
+ int len,
+ char* buffer,
+ CompletionProc onCompletion)
+{
+ WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
+ unsigned int reqID = ioMan->requestID++;
+ if (!ioMan || !wItem) return 0;
+
+ /* Fill in the blanks */
+ wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
+ ( forWriting ? WORKER_WRITE : WORKER_READ );
+ wItem->workData.ioData.fd = fd;
+ wItem->workData.ioData.len = len;
+ wItem->workData.ioData.buf = buffer;
+ wItem->link = NULL;
+
+ wItem->onCompletion = onCompletion;
+ wItem->requestID = reqID;
+
+ return depositWorkItem(reqID, wItem);
+}
+
+/*
+ * Function: AddDelayRequest()
+ *
+ * Like AddIORequest(), but this time adding a delay request to
+ * the request queue.
+ */
+BOOL
+AddDelayRequest ( unsigned int msecs,
+ CompletionProc onCompletion)
+{
+ WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
+ unsigned int reqID = ioMan->requestID++;
+ if (!ioMan || !wItem) return FALSE;
+
+ /* Fill in the blanks */
+ wItem->workKind = WORKER_DELAY;
+ wItem->workData.delayData.msecs = msecs;
+ wItem->onCompletion = onCompletion;
+ wItem->requestID = reqID;
+ wItem->link = NULL;
+
+ return depositWorkItem(reqID, wItem);
+}
+
+/*
+ * Function: AddProcRequest()
+ *
+ * Add an asynchronous procedure request.
+ */
+BOOL
+AddProcRequest ( void* proc,
+ void* param,
+ CompletionProc onCompletion)
+{
+ WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
+ unsigned int reqID = ioMan->requestID++;
+ if (!ioMan || !wItem) return FALSE;
+
+ /* Fill in the blanks */
+ wItem->workKind = WORKER_DO_PROC;
+ wItem->workData.procData.proc = proc;
+ wItem->workData.procData.param = param;
+ wItem->onCompletion = onCompletion;
+ wItem->requestID = reqID;
+ wItem->abandonOp = 0;
+ wItem->link = NULL;
+
+ return depositWorkItem(reqID, wItem);
+}
+
+void ShutdownIOManager ( void )
+{
+ SetEvent(ioMan->hExitEvent);
+ // ToDo: we can't free this now, because the worker thread(s)
+ // haven't necessarily finished with it yet. Perhaps it should
+ // have a reference count or something.
+ // free(ioMan);
+ // ioMan = NULL;
+}
+
+/* Keep track of WorkItems currently being serviced. */
+static
+void
+RegisterWorkItem(IOManagerState* ioMan,
+ WorkItem* wi)
+{
+ EnterCriticalSection(&ioMan->active_work_lock);
+ wi->link = ioMan->active_work_items;
+ ioMan->active_work_items = wi;
+ LeaveCriticalSection(&ioMan->active_work_lock);
+}
+
+static
+void
+DeregisterWorkItem(IOManagerState* ioMan,
+ WorkItem* wi)
+{
+ WorkItem *ptr, *prev;
+
+ EnterCriticalSection(&ioMan->active_work_lock);
+ for(prev=NULL,ptr=ioMan->active_work_items;ptr;prev=ptr,ptr=ptr->link) {
+ if (wi->requestID == ptr->requestID) {
+ if (prev==NULL) {
+ ioMan->active_work_items = ptr->link;
+ } else {
+ prev->link = ptr->link;
+ }
+ LeaveCriticalSection(&ioMan->active_work_lock);
+ return;
+ }
+ }
+ fprintf(stderr, "DeregisterWorkItem: unable to locate work item %d\n", wi->requestID);
+ LeaveCriticalSection(&ioMan->active_work_lock);
+}
+
+
+/*
+ * Function: abandonWorkRequest()
+ *
+ * Signal that a work request isn't of interest. Called by the Scheduler
+ * if a blocked Haskell thread has an exception thrown to it.
+ *
+ * Note: we're not aborting the system call that a worker might be blocked on
+ * here, just disabling the propagation of its result once its finished. We
+ * may have to go the whole hog here and switch to overlapped I/O so that we
+ * can abort blocked system calls.
+ */
+void
+abandonWorkRequest ( int reqID )
+{
+ WorkItem *ptr;
+ EnterCriticalSection(&ioMan->active_work_lock);
+ for(ptr=ioMan->active_work_items;ptr;ptr=ptr->link) {
+ if (ptr->requestID == (unsigned int)reqID ) {
+ ptr->abandonOp = 1;
+ LeaveCriticalSection(&ioMan->active_work_lock);
+ return;
+ }
+ }
+ /* Note: if the request ID isn't present, the worker will have
+ * finished sometime since awaitRequests() last drained the completed
+ * request table; i.e., not an error.
+ */
+ LeaveCriticalSection(&ioMan->active_work_lock);
+}