diff options
author | Simon Marlow <simonmar@microsoft.com> | 2006-04-07 02:05:11 +0000 |
---|---|---|
committer | Simon Marlow <simonmar@microsoft.com> | 2006-04-07 02:05:11 +0000 |
commit | 0065d5ab628975892cea1ec7303f968c3338cbe1 (patch) | |
tree | 8e2afe0ab48ee33cf95009809d67c9649573ef92 /rts/win32/IOManager.c | |
parent | 28a464a75e14cece5db40f2765a29348273ff2d2 (diff) | |
download | haskell-0065d5ab628975892cea1ec7303f968c3338cbe1.tar.gz |
Reorganisation of the source tree
Most of the other users of the fptools build system have migrated to
Cabal, and with the move to darcs we can now flatten the source tree
without losing history, so here goes.
The main change is that the ghc/ subdir is gone, and most of what it
contained is now at the top level. The build system now makes no
pretense at being multi-project, it is just the GHC build system.
No doubt this will break many things, and there will be a period of
instability while we fix the dependencies. A straightforward build
should work, but I haven't yet fixed binary/source distributions.
Changes to the Building Guide will follow, too.
Diffstat (limited to 'rts/win32/IOManager.c')
-rw-r--r-- | rts/win32/IOManager.c | 510 |
1 files changed, 510 insertions, 0 deletions
diff --git a/rts/win32/IOManager.c b/rts/win32/IOManager.c new file mode 100644 index 0000000000..a67c3504c1 --- /dev/null +++ b/rts/win32/IOManager.c @@ -0,0 +1,510 @@ +/* IOManager.c + * + * Non-blocking / asynchronous I/O for Win32. + * + * (c) sof, 2002-2003. + */ +#include "Rts.h" +#include "IOManager.h" +#include "WorkQueue.h" +#include "ConsoleHandler.h" +#include <stdio.h> +#include <stdlib.h> +#include <io.h> +#include <winsock.h> +#include <process.h> + +/* + * Internal state maintained by the IO manager. + */ +typedef struct IOManagerState { + CritSection manLock; + WorkQueue* workQueue; + int queueSize; + int numWorkers; + int workersIdle; + HANDLE hExitEvent; + unsigned int requestID; + /* fields for keeping track of active WorkItems */ + CritSection active_work_lock; + WorkItem* active_work_items; +} IOManagerState; + +/* ToDo: wrap up this state via a IOManager handle instead? */ +static IOManagerState* ioMan; + +static void RegisterWorkItem ( IOManagerState* iom, WorkItem* wi); +static void DeregisterWorkItem( IOManagerState* iom, WorkItem* wi); + +/* + * The routine executed by each worker thread. + */ +static +unsigned +WINAPI +IOWorkerProc(PVOID param) +{ + HANDLE hWaits[2]; + DWORD rc; + IOManagerState* iom = (IOManagerState*)param; + WorkQueue* pq = iom->workQueue; + WorkItem* work; + int len = 0, fd = 0; + DWORD errCode = 0; + void* complData; + + hWaits[0] = (HANDLE)iom->hExitEvent; + hWaits[1] = GetWorkQueueHandle(pq); + + while (1) { + /* The error code is communicated back on completion of request; reset. */ + errCode = 0; + + EnterCriticalSection(&iom->manLock); + /* Signal that the worker is idle. + * + * 'workersIdle' is used when determining whether or not to + * increase the worker thread pool when adding a new request. + * (see addIORequest().) + */ + iom->workersIdle++; + LeaveCriticalSection(&iom->manLock); + + /* + * A possible future refinement is to make long-term idle threads + * wake up and decide to shut down should the number of idle threads + * be above some threshold. + * + */ + rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE ); + + if (rc == WAIT_OBJECT_0) { + // we received the exit event + return 0; + } + + EnterCriticalSection(&iom->manLock); + /* Signal that the thread is 'non-idle' and about to consume + * a work item. + */ + iom->workersIdle--; + iom->queueSize--; + LeaveCriticalSection(&iom->manLock); + + if ( rc == (WAIT_OBJECT_0 + 1) ) { + /* work item available, fetch it. */ + if (FetchWork(pq,(void**)&work)) { + work->abandonOp = 0; + RegisterWorkItem(iom,work); + if ( work->workKind & WORKER_READ ) { + if ( work->workKind & WORKER_FOR_SOCKET ) { + len = recv(work->workData.ioData.fd, + work->workData.ioData.buf, + work->workData.ioData.len, + 0); + if (len == SOCKET_ERROR) { + errCode = WSAGetLastError(); + } + } else { + while (1) { + /* Do the read(), with extra-special handling for Ctrl+C */ + len = read(work->workData.ioData.fd, + work->workData.ioData.buf, + work->workData.ioData.len); + if ( len == 0 && work->workData.ioData.len != 0 ) { + /* Given the following scenario: + * - a console handler has been registered that handles Ctrl+C + * events. + * - we've not tweaked the 'console mode' settings to turn on + * ENABLE_PROCESSED_INPUT. + * - we're blocked waiting on input from standard input. + * - the user hits Ctrl+C. + * + * The OS will invoke the console handler (in a separate OS thread), + * and the above read() (i.e., under the hood, a ReadFile() op) returns + * 0, with the error set to ERROR_OPERATION_ABORTED. We don't + * want to percolate this error condition back to the Haskell user. + * Do this by waiting for the completion of the Haskell console handler. + * If upon completion of the console handler routine, the Haskell thread + * that issued the request is found to have been thrown an exception, + * the worker abandons the request (since that's what the Haskell thread + * has done.) If the Haskell thread hasn't been interrupted, the worker + * retries the read request as if nothing happened. + */ + if ( (GetLastError()) == ERROR_OPERATION_ABORTED ) { + /* For now, only abort when dealing with the standard input handle. + * i.e., for all others, an error is raised. + */ + HANDLE h = (HANDLE)GetStdHandle(STD_INPUT_HANDLE); + if ( _get_osfhandle(work->workData.ioData.fd) == (long)h ) { + if (rts_waitConsoleHandlerCompletion()) { + /* If the Scheduler has set work->abandonOp, the Haskell thread has + * been thrown an exception (=> the worker must abandon this request.) + * We test for this below before invoking the on-completion routine. + */ + if (work->abandonOp) { + break; + } else { + continue; + } + } + } else { + break; /* Treat it like an error */ + } + } else { + break; + } + } else { + break; + } + } + if (len == -1) { errCode = errno; } + } + complData = work->workData.ioData.buf; + fd = work->workData.ioData.fd; + } else if ( work->workKind & WORKER_WRITE ) { + if ( work->workKind & WORKER_FOR_SOCKET ) { + len = send(work->workData.ioData.fd, + work->workData.ioData.buf, + work->workData.ioData.len, + 0); + if (len == SOCKET_ERROR) { + errCode = WSAGetLastError(); + } + } else { + len = write(work->workData.ioData.fd, + work->workData.ioData.buf, + work->workData.ioData.len); + if (len == -1) { errCode = errno; } + } + complData = work->workData.ioData.buf; + fd = work->workData.ioData.fd; + } else if ( work->workKind & WORKER_DELAY ) { + /* Approximate implementation of threadDelay; + * + * Note: Sleep() is in milliseconds, not micros. + */ + Sleep(work->workData.delayData.msecs / 1000); + len = work->workData.delayData.msecs; + complData = NULL; + fd = 0; + errCode = 0; + } else if ( work->workKind & WORKER_DO_PROC ) { + /* perform operation/proc on behalf of Haskell thread. */ + if (work->workData.procData.proc) { + /* The procedure is assumed to encode result + success/failure + * via its param. + */ + errCode=work->workData.procData.proc(work->workData.procData.param); + } else { + errCode=1; + } + complData = work->workData.procData.param; + } else { + fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind); + fflush(stderr); + continue; + } + if (!work->abandonOp) { + work->onCompletion(work->requestID, + fd, + len, + complData, + errCode); + } + /* Free the WorkItem */ + DeregisterWorkItem(iom,work); + free(work); + } else { + fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr); + return 1; + } + } else { + fprintf(stderr, "waiting failed (%lu); fatal.\n", rc); fflush(stderr); + return 1; + } + } + return 0; +} + +static +BOOL +NewIOWorkerThread(IOManagerState* iom) +{ + unsigned threadId; + return ( 0 != _beginthreadex(NULL, + 0, + IOWorkerProc, + (LPVOID)iom, + 0, + &threadId) ); +} + +BOOL +StartIOManager(void) +{ + HANDLE hExit; + WorkQueue* wq; + + wq = NewWorkQueue(); + if ( !wq ) return FALSE; + + ioMan = (IOManagerState*)malloc(sizeof(IOManagerState)); + + if (!ioMan) { + FreeWorkQueue(wq); + return FALSE; + } + + /* A manual-reset event */ + hExit = CreateEvent ( NULL, TRUE, FALSE, NULL ); + if ( !hExit ) { + FreeWorkQueue(wq); + free(ioMan); + return FALSE; + } + + ioMan->hExitEvent = hExit; + InitializeCriticalSection(&ioMan->manLock); + ioMan->workQueue = wq; + ioMan->numWorkers = 0; + ioMan->workersIdle = 0; + ioMan->queueSize = 0; + ioMan->requestID = 1; + InitializeCriticalSection(&ioMan->active_work_lock); + ioMan->active_work_items = NULL; + + return TRUE; +} + +/* + * Function: depositWorkItem() + * + * Local function which deposits a WorkItem onto a work queue, + * deciding in the process whether or not the thread pool needs + * to be augmented with another thread to handle the new request. + * + */ +static +int +depositWorkItem( unsigned int reqID, + WorkItem* wItem ) +{ + EnterCriticalSection(&ioMan->manLock); + +#if 0 + fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers); + fflush(stderr); +#endif + /* A new worker thread is created when there are fewer idle threads + * than non-consumed queue requests. This ensures that requests will + * be dealt with in a timely manner. + * + * [Long explanation of why the previous thread pool policy lead to + * trouble] + * + * Previously, the thread pool was augmented iff no idle worker threads + * were available. That strategy runs the risk of repeatedly adding to + * the request queue without expanding the thread pool to handle this + * sudden spike in queued requests. + * [How? Assume workersIdle is 1, and addIORequest() is called. No new + * thread is created and the request is simply queued. If addIORequest() + * is called again _before the OS schedules a worker thread to pull the + * request off the queue_, workersIdle is still 1 and another request is + * simply added to the queue. Once the worker thread is run, only one + * request is de-queued, leaving the 2nd request in the queue] + * + * Assuming none of the queued requests take an inordinate amount of to + * complete, the request queue would eventually be drained. But if that's + * not the case, the later requests will end up languishing in the queue + * indefinitely. The non-timely handling of requests may cause CH applications + * to misbehave / hang; bad. + * + */ + ioMan->queueSize++; + if ( (ioMan->workersIdle < ioMan->queueSize) ) { + /* see if giving up our quantum ferrets out some idle threads. + */ + LeaveCriticalSection(&ioMan->manLock); + Sleep(0); + EnterCriticalSection(&ioMan->manLock); + if ( (ioMan->workersIdle < ioMan->queueSize) ) { + /* No, go ahead and create another. */ + ioMan->numWorkers++; + LeaveCriticalSection(&ioMan->manLock); + NewIOWorkerThread(ioMan); + } else { + LeaveCriticalSection(&ioMan->manLock); + } + } else { + LeaveCriticalSection(&ioMan->manLock); + } + + if (SubmitWork(ioMan->workQueue,wItem)) { + /* Note: the work item has potentially been consumed by a worker thread + * (and freed) at this point, so we cannot use wItem's requestID. + */ + return reqID; + } else { + return 0; + } +} + +/* + * Function: AddIORequest() + * + * Conduit to underlying WorkQueue's SubmitWork(); adds IO + * request to work queue, deciding whether or not to augment + * the thread pool in the process. + */ +int +AddIORequest ( int fd, + BOOL forWriting, + BOOL isSocket, + int len, + char* buffer, + CompletionProc onCompletion) +{ + WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem)); + unsigned int reqID = ioMan->requestID++; + if (!ioMan || !wItem) return 0; + + /* Fill in the blanks */ + wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) | + ( forWriting ? WORKER_WRITE : WORKER_READ ); + wItem->workData.ioData.fd = fd; + wItem->workData.ioData.len = len; + wItem->workData.ioData.buf = buffer; + wItem->link = NULL; + + wItem->onCompletion = onCompletion; + wItem->requestID = reqID; + + return depositWorkItem(reqID, wItem); +} + +/* + * Function: AddDelayRequest() + * + * Like AddIORequest(), but this time adding a delay request to + * the request queue. + */ +BOOL +AddDelayRequest ( unsigned int msecs, + CompletionProc onCompletion) +{ + WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem)); + unsigned int reqID = ioMan->requestID++; + if (!ioMan || !wItem) return FALSE; + + /* Fill in the blanks */ + wItem->workKind = WORKER_DELAY; + wItem->workData.delayData.msecs = msecs; + wItem->onCompletion = onCompletion; + wItem->requestID = reqID; + wItem->link = NULL; + + return depositWorkItem(reqID, wItem); +} + +/* + * Function: AddProcRequest() + * + * Add an asynchronous procedure request. + */ +BOOL +AddProcRequest ( void* proc, + void* param, + CompletionProc onCompletion) +{ + WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem)); + unsigned int reqID = ioMan->requestID++; + if (!ioMan || !wItem) return FALSE; + + /* Fill in the blanks */ + wItem->workKind = WORKER_DO_PROC; + wItem->workData.procData.proc = proc; + wItem->workData.procData.param = param; + wItem->onCompletion = onCompletion; + wItem->requestID = reqID; + wItem->abandonOp = 0; + wItem->link = NULL; + + return depositWorkItem(reqID, wItem); +} + +void ShutdownIOManager ( void ) +{ + SetEvent(ioMan->hExitEvent); + // ToDo: we can't free this now, because the worker thread(s) + // haven't necessarily finished with it yet. Perhaps it should + // have a reference count or something. + // free(ioMan); + // ioMan = NULL; +} + +/* Keep track of WorkItems currently being serviced. */ +static +void +RegisterWorkItem(IOManagerState* ioMan, + WorkItem* wi) +{ + EnterCriticalSection(&ioMan->active_work_lock); + wi->link = ioMan->active_work_items; + ioMan->active_work_items = wi; + LeaveCriticalSection(&ioMan->active_work_lock); +} + +static +void +DeregisterWorkItem(IOManagerState* ioMan, + WorkItem* wi) +{ + WorkItem *ptr, *prev; + + EnterCriticalSection(&ioMan->active_work_lock); + for(prev=NULL,ptr=ioMan->active_work_items;ptr;prev=ptr,ptr=ptr->link) { + if (wi->requestID == ptr->requestID) { + if (prev==NULL) { + ioMan->active_work_items = ptr->link; + } else { + prev->link = ptr->link; + } + LeaveCriticalSection(&ioMan->active_work_lock); + return; + } + } + fprintf(stderr, "DeregisterWorkItem: unable to locate work item %d\n", wi->requestID); + LeaveCriticalSection(&ioMan->active_work_lock); +} + + +/* + * Function: abandonWorkRequest() + * + * Signal that a work request isn't of interest. Called by the Scheduler + * if a blocked Haskell thread has an exception thrown to it. + * + * Note: we're not aborting the system call that a worker might be blocked on + * here, just disabling the propagation of its result once its finished. We + * may have to go the whole hog here and switch to overlapped I/O so that we + * can abort blocked system calls. + */ +void +abandonWorkRequest ( int reqID ) +{ + WorkItem *ptr; + EnterCriticalSection(&ioMan->active_work_lock); + for(ptr=ioMan->active_work_items;ptr;ptr=ptr->link) { + if (ptr->requestID == (unsigned int)reqID ) { + ptr->abandonOp = 1; + LeaveCriticalSection(&ioMan->active_work_lock); + return; + } + } + /* Note: if the request ID isn't present, the worker will have + * finished sometime since awaitRequests() last drained the completed + * request table; i.e., not an error. + */ + LeaveCriticalSection(&ioMan->active_work_lock); +} |