diff options
Diffstat (limited to 'rts/win32/IOManager.c')
-rw-r--r-- | rts/win32/IOManager.c | 510 |
1 files changed, 510 insertions, 0 deletions
diff --git a/rts/win32/IOManager.c b/rts/win32/IOManager.c new file mode 100644 index 0000000000..a67c3504c1 --- /dev/null +++ b/rts/win32/IOManager.c @@ -0,0 +1,510 @@ +/* IOManager.c + * + * Non-blocking / asynchronous I/O for Win32. + * + * (c) sof, 2002-2003. + */ +#include "Rts.h" +#include "IOManager.h" +#include "WorkQueue.h" +#include "ConsoleHandler.h" +#include <stdio.h> +#include <stdlib.h> +#include <io.h> +#include <winsock.h> +#include <process.h> + +/* + * Internal state maintained by the IO manager. + */ +typedef struct IOManagerState { + CritSection manLock; + WorkQueue* workQueue; + int queueSize; + int numWorkers; + int workersIdle; + HANDLE hExitEvent; + unsigned int requestID; + /* fields for keeping track of active WorkItems */ + CritSection active_work_lock; + WorkItem* active_work_items; +} IOManagerState; + +/* ToDo: wrap up this state via a IOManager handle instead? */ +static IOManagerState* ioMan; + +static void RegisterWorkItem ( IOManagerState* iom, WorkItem* wi); +static void DeregisterWorkItem( IOManagerState* iom, WorkItem* wi); + +/* + * The routine executed by each worker thread. + */ +static +unsigned +WINAPI +IOWorkerProc(PVOID param) +{ + HANDLE hWaits[2]; + DWORD rc; + IOManagerState* iom = (IOManagerState*)param; + WorkQueue* pq = iom->workQueue; + WorkItem* work; + int len = 0, fd = 0; + DWORD errCode = 0; + void* complData; + + hWaits[0] = (HANDLE)iom->hExitEvent; + hWaits[1] = GetWorkQueueHandle(pq); + + while (1) { + /* The error code is communicated back on completion of request; reset. */ + errCode = 0; + + EnterCriticalSection(&iom->manLock); + /* Signal that the worker is idle. + * + * 'workersIdle' is used when determining whether or not to + * increase the worker thread pool when adding a new request. + * (see addIORequest().) + */ + iom->workersIdle++; + LeaveCriticalSection(&iom->manLock); + + /* + * A possible future refinement is to make long-term idle threads + * wake up and decide to shut down should the number of idle threads + * be above some threshold. + * + */ + rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE ); + + if (rc == WAIT_OBJECT_0) { + // we received the exit event + return 0; + } + + EnterCriticalSection(&iom->manLock); + /* Signal that the thread is 'non-idle' and about to consume + * a work item. + */ + iom->workersIdle--; + iom->queueSize--; + LeaveCriticalSection(&iom->manLock); + + if ( rc == (WAIT_OBJECT_0 + 1) ) { + /* work item available, fetch it. */ + if (FetchWork(pq,(void**)&work)) { + work->abandonOp = 0; + RegisterWorkItem(iom,work); + if ( work->workKind & WORKER_READ ) { + if ( work->workKind & WORKER_FOR_SOCKET ) { + len = recv(work->workData.ioData.fd, + work->workData.ioData.buf, + work->workData.ioData.len, + 0); + if (len == SOCKET_ERROR) { + errCode = WSAGetLastError(); + } + } else { + while (1) { + /* Do the read(), with extra-special handling for Ctrl+C */ + len = read(work->workData.ioData.fd, + work->workData.ioData.buf, + work->workData.ioData.len); + if ( len == 0 && work->workData.ioData.len != 0 ) { + /* Given the following scenario: + * - a console handler has been registered that handles Ctrl+C + * events. + * - we've not tweaked the 'console mode' settings to turn on + * ENABLE_PROCESSED_INPUT. + * - we're blocked waiting on input from standard input. + * - the user hits Ctrl+C. + * + * The OS will invoke the console handler (in a separate OS thread), + * and the above read() (i.e., under the hood, a ReadFile() op) returns + * 0, with the error set to ERROR_OPERATION_ABORTED. We don't + * want to percolate this error condition back to the Haskell user. + * Do this by waiting for the completion of the Haskell console handler. + * If upon completion of the console handler routine, the Haskell thread + * that issued the request is found to have been thrown an exception, + * the worker abandons the request (since that's what the Haskell thread + * has done.) If the Haskell thread hasn't been interrupted, the worker + * retries the read request as if nothing happened. + */ + if ( (GetLastError()) == ERROR_OPERATION_ABORTED ) { + /* For now, only abort when dealing with the standard input handle. + * i.e., for all others, an error is raised. + */ + HANDLE h = (HANDLE)GetStdHandle(STD_INPUT_HANDLE); + if ( _get_osfhandle(work->workData.ioData.fd) == (long)h ) { + if (rts_waitConsoleHandlerCompletion()) { + /* If the Scheduler has set work->abandonOp, the Haskell thread has + * been thrown an exception (=> the worker must abandon this request.) + * We test for this below before invoking the on-completion routine. + */ + if (work->abandonOp) { + break; + } else { + continue; + } + } + } else { + break; /* Treat it like an error */ + } + } else { + break; + } + } else { + break; + } + } + if (len == -1) { errCode = errno; } + } + complData = work->workData.ioData.buf; + fd = work->workData.ioData.fd; + } else if ( work->workKind & WORKER_WRITE ) { + if ( work->workKind & WORKER_FOR_SOCKET ) { + len = send(work->workData.ioData.fd, + work->workData.ioData.buf, + work->workData.ioData.len, + 0); + if (len == SOCKET_ERROR) { + errCode = WSAGetLastError(); + } + } else { + len = write(work->workData.ioData.fd, + work->workData.ioData.buf, + work->workData.ioData.len); + if (len == -1) { errCode = errno; } + } + complData = work->workData.ioData.buf; + fd = work->workData.ioData.fd; + } else if ( work->workKind & WORKER_DELAY ) { + /* Approximate implementation of threadDelay; + * + * Note: Sleep() is in milliseconds, not micros. + */ + Sleep(work->workData.delayData.msecs / 1000); + len = work->workData.delayData.msecs; + complData = NULL; + fd = 0; + errCode = 0; + } else if ( work->workKind & WORKER_DO_PROC ) { + /* perform operation/proc on behalf of Haskell thread. */ + if (work->workData.procData.proc) { + /* The procedure is assumed to encode result + success/failure + * via its param. + */ + errCode=work->workData.procData.proc(work->workData.procData.param); + } else { + errCode=1; + } + complData = work->workData.procData.param; + } else { + fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind); + fflush(stderr); + continue; + } + if (!work->abandonOp) { + work->onCompletion(work->requestID, + fd, + len, + complData, + errCode); + } + /* Free the WorkItem */ + DeregisterWorkItem(iom,work); + free(work); + } else { + fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr); + return 1; + } + } else { + fprintf(stderr, "waiting failed (%lu); fatal.\n", rc); fflush(stderr); + return 1; + } + } + return 0; +} + +static +BOOL +NewIOWorkerThread(IOManagerState* iom) +{ + unsigned threadId; + return ( 0 != _beginthreadex(NULL, + 0, + IOWorkerProc, + (LPVOID)iom, + 0, + &threadId) ); +} + +BOOL +StartIOManager(void) +{ + HANDLE hExit; + WorkQueue* wq; + + wq = NewWorkQueue(); + if ( !wq ) return FALSE; + + ioMan = (IOManagerState*)malloc(sizeof(IOManagerState)); + + if (!ioMan) { + FreeWorkQueue(wq); + return FALSE; + } + + /* A manual-reset event */ + hExit = CreateEvent ( NULL, TRUE, FALSE, NULL ); + if ( !hExit ) { + FreeWorkQueue(wq); + free(ioMan); + return FALSE; + } + + ioMan->hExitEvent = hExit; + InitializeCriticalSection(&ioMan->manLock); + ioMan->workQueue = wq; + ioMan->numWorkers = 0; + ioMan->workersIdle = 0; + ioMan->queueSize = 0; + ioMan->requestID = 1; + InitializeCriticalSection(&ioMan->active_work_lock); + ioMan->active_work_items = NULL; + + return TRUE; +} + +/* + * Function: depositWorkItem() + * + * Local function which deposits a WorkItem onto a work queue, + * deciding in the process whether or not the thread pool needs + * to be augmented with another thread to handle the new request. + * + */ +static +int +depositWorkItem( unsigned int reqID, + WorkItem* wItem ) +{ + EnterCriticalSection(&ioMan->manLock); + +#if 0 + fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers); + fflush(stderr); +#endif + /* A new worker thread is created when there are fewer idle threads + * than non-consumed queue requests. This ensures that requests will + * be dealt with in a timely manner. + * + * [Long explanation of why the previous thread pool policy lead to + * trouble] + * + * Previously, the thread pool was augmented iff no idle worker threads + * were available. That strategy runs the risk of repeatedly adding to + * the request queue without expanding the thread pool to handle this + * sudden spike in queued requests. + * [How? Assume workersIdle is 1, and addIORequest() is called. No new + * thread is created and the request is simply queued. If addIORequest() + * is called again _before the OS schedules a worker thread to pull the + * request off the queue_, workersIdle is still 1 and another request is + * simply added to the queue. Once the worker thread is run, only one + * request is de-queued, leaving the 2nd request in the queue] + * + * Assuming none of the queued requests take an inordinate amount of to + * complete, the request queue would eventually be drained. But if that's + * not the case, the later requests will end up languishing in the queue + * indefinitely. The non-timely handling of requests may cause CH applications + * to misbehave / hang; bad. + * + */ + ioMan->queueSize++; + if ( (ioMan->workersIdle < ioMan->queueSize) ) { + /* see if giving up our quantum ferrets out some idle threads. + */ + LeaveCriticalSection(&ioMan->manLock); + Sleep(0); + EnterCriticalSection(&ioMan->manLock); + if ( (ioMan->workersIdle < ioMan->queueSize) ) { + /* No, go ahead and create another. */ + ioMan->numWorkers++; + LeaveCriticalSection(&ioMan->manLock); + NewIOWorkerThread(ioMan); + } else { + LeaveCriticalSection(&ioMan->manLock); + } + } else { + LeaveCriticalSection(&ioMan->manLock); + } + + if (SubmitWork(ioMan->workQueue,wItem)) { + /* Note: the work item has potentially been consumed by a worker thread + * (and freed) at this point, so we cannot use wItem's requestID. + */ + return reqID; + } else { + return 0; + } +} + +/* + * Function: AddIORequest() + * + * Conduit to underlying WorkQueue's SubmitWork(); adds IO + * request to work queue, deciding whether or not to augment + * the thread pool in the process. + */ +int +AddIORequest ( int fd, + BOOL forWriting, + BOOL isSocket, + int len, + char* buffer, + CompletionProc onCompletion) +{ + WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem)); + unsigned int reqID = ioMan->requestID++; + if (!ioMan || !wItem) return 0; + + /* Fill in the blanks */ + wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) | + ( forWriting ? WORKER_WRITE : WORKER_READ ); + wItem->workData.ioData.fd = fd; + wItem->workData.ioData.len = len; + wItem->workData.ioData.buf = buffer; + wItem->link = NULL; + + wItem->onCompletion = onCompletion; + wItem->requestID = reqID; + + return depositWorkItem(reqID, wItem); +} + +/* + * Function: AddDelayRequest() + * + * Like AddIORequest(), but this time adding a delay request to + * the request queue. + */ +BOOL +AddDelayRequest ( unsigned int msecs, + CompletionProc onCompletion) +{ + WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem)); + unsigned int reqID = ioMan->requestID++; + if (!ioMan || !wItem) return FALSE; + + /* Fill in the blanks */ + wItem->workKind = WORKER_DELAY; + wItem->workData.delayData.msecs = msecs; + wItem->onCompletion = onCompletion; + wItem->requestID = reqID; + wItem->link = NULL; + + return depositWorkItem(reqID, wItem); +} + +/* + * Function: AddProcRequest() + * + * Add an asynchronous procedure request. + */ +BOOL +AddProcRequest ( void* proc, + void* param, + CompletionProc onCompletion) +{ + WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem)); + unsigned int reqID = ioMan->requestID++; + if (!ioMan || !wItem) return FALSE; + + /* Fill in the blanks */ + wItem->workKind = WORKER_DO_PROC; + wItem->workData.procData.proc = proc; + wItem->workData.procData.param = param; + wItem->onCompletion = onCompletion; + wItem->requestID = reqID; + wItem->abandonOp = 0; + wItem->link = NULL; + + return depositWorkItem(reqID, wItem); +} + +void ShutdownIOManager ( void ) +{ + SetEvent(ioMan->hExitEvent); + // ToDo: we can't free this now, because the worker thread(s) + // haven't necessarily finished with it yet. Perhaps it should + // have a reference count or something. + // free(ioMan); + // ioMan = NULL; +} + +/* Keep track of WorkItems currently being serviced. */ +static +void +RegisterWorkItem(IOManagerState* ioMan, + WorkItem* wi) +{ + EnterCriticalSection(&ioMan->active_work_lock); + wi->link = ioMan->active_work_items; + ioMan->active_work_items = wi; + LeaveCriticalSection(&ioMan->active_work_lock); +} + +static +void +DeregisterWorkItem(IOManagerState* ioMan, + WorkItem* wi) +{ + WorkItem *ptr, *prev; + + EnterCriticalSection(&ioMan->active_work_lock); + for(prev=NULL,ptr=ioMan->active_work_items;ptr;prev=ptr,ptr=ptr->link) { + if (wi->requestID == ptr->requestID) { + if (prev==NULL) { + ioMan->active_work_items = ptr->link; + } else { + prev->link = ptr->link; + } + LeaveCriticalSection(&ioMan->active_work_lock); + return; + } + } + fprintf(stderr, "DeregisterWorkItem: unable to locate work item %d\n", wi->requestID); + LeaveCriticalSection(&ioMan->active_work_lock); +} + + +/* + * Function: abandonWorkRequest() + * + * Signal that a work request isn't of interest. Called by the Scheduler + * if a blocked Haskell thread has an exception thrown to it. + * + * Note: we're not aborting the system call that a worker might be blocked on + * here, just disabling the propagation of its result once its finished. We + * may have to go the whole hog here and switch to overlapped I/O so that we + * can abort blocked system calls. + */ +void +abandonWorkRequest ( int reqID ) +{ + WorkItem *ptr; + EnterCriticalSection(&ioMan->active_work_lock); + for(ptr=ioMan->active_work_items;ptr;ptr=ptr->link) { + if (ptr->requestID == (unsigned int)reqID ) { + ptr->abandonOp = 1; + LeaveCriticalSection(&ioMan->active_work_lock); + return; + } + } + /* Note: if the request ID isn't present, the worker will have + * finished sometime since awaitRequests() last drained the completed + * request table; i.e., not an error. + */ + LeaveCriticalSection(&ioMan->active_work_lock); +} |