From 083d7aeb5b0f9e47bf34459415367502492c5e06 Mon Sep 17 00:00:00 2001 From: Duncan Coutts Date: Sun, 3 Jan 2021 12:22:37 +0000 Subject: Move win32/IOManager to win32/MIOManager It is only for MIO, and we want to use the generic name IOManager for the name of the common parts of the interface and dispatch. --- rts/win32/IOManager.c | 616 -------------------------------------------------- 1 file changed, 616 deletions(-) delete mode 100644 rts/win32/IOManager.c (limited to 'rts/win32/IOManager.c') diff --git a/rts/win32/IOManager.c b/rts/win32/IOManager.c deleted file mode 100644 index 271f0e5335..0000000000 --- a/rts/win32/IOManager.c +++ /dev/null @@ -1,616 +0,0 @@ -/* IOManager.c - * - * Non-blocking / asynchronous I/O for Win32. - * - * (c) sof, 2002-2003. - * - * NOTE: This is the MIO manager, only used for --io-manager=posix. - * For the WINIO manager see base in the GHC.Event modules. - */ - -#if !defined(THREADED_RTS) - -#include "Rts.h" -#include "RtsUtils.h" -#include "IOManager.h" -#include "WorkQueue.h" -#include "ConsoleHandler.h" -#include -#include -#include -#include -#include -#include - -/* - * Internal state maintained by the IO manager. - */ -typedef struct IOManagerState { - Mutex manLock; - WorkQueue* workQueue; - int queueSize; - int numWorkers; - int workersIdle; - HANDLE hExitEvent; - unsigned int requestID; - /* fields for keeping track of active WorkItems */ - Mutex active_work_lock; - WorkItem* active_work_items; - UINT sleepResolution; -} IOManagerState; - -/* ToDo: wrap up this state via a IOManager handle instead? */ -static IOManagerState* ioMan; - -static void RegisterWorkItem ( IOManagerState* iom, WorkItem* wi); -static void DeregisterWorkItem( IOManagerState* iom, WorkItem* wi); - -/* - * The routine executed by each worker thread. - */ -static -unsigned -WINAPI -IOWorkerProc(PVOID param) -{ - HANDLE hWaits[2]; - DWORD rc; - IOManagerState* iom = (IOManagerState*)param; - WorkQueue* pq = iom->workQueue; - WorkItem* work; - int len = 0, fd = 0; - DWORD errCode = 0; - void* complData; - - hWaits[0] = (HANDLE)iom->hExitEvent; - hWaits[1] = GetWorkQueueHandle(pq); - - while (1) { - // The error code is communicated back on completion of request; reset. - errCode = 0; - - OS_ACQUIRE_LOCK(&iom->manLock); - /* Signal that the worker is idle. - * - * 'workersIdle' is used when determining whether or not to - * increase the worker thread pool when adding a new request. - * (see addIORequest().) - */ - iom->workersIdle++; - OS_RELEASE_LOCK(&iom->manLock); - - /* - * A possible future refinement is to make long-term idle threads - * wake up and decide to shut down should the number of idle threads - * be above some threshold. - * - */ - rc = WaitForMultipleObjects( 2, hWaits, false, INFINITE ); - - if (rc == WAIT_OBJECT_0) { - // we received the exit event - OS_ACQUIRE_LOCK(&iom->manLock); - ioMan->numWorkers--; - OS_RELEASE_LOCK(&iom->manLock); - return 0; - } - - OS_ACQUIRE_LOCK(&iom->manLock); - /* Signal that the thread is 'non-idle' and about to consume - * a work item. - */ - iom->workersIdle--; - iom->queueSize--; - OS_RELEASE_LOCK(&iom->manLock); - - if ( rc == (WAIT_OBJECT_0 + 1) ) { - /* work item available, fetch it. */ - if (FetchWork(pq,(void**)&work)) { - work->abandonOp = 0; - RegisterWorkItem(iom,work); - if ( work->workKind & WORKER_READ ) { - if ( work->workKind & WORKER_FOR_SOCKET ) { - len = recv(work->workData.ioData.fd, - work->workData.ioData.buf, - work->workData.ioData.len, - 0); - if (len == SOCKET_ERROR) { - errCode = WSAGetLastError(); - } - } else { - while (1) { - // Do the read(), with extra-special handling for Ctrl+C - len = read(work->workData.ioData.fd, - work->workData.ioData.buf, - work->workData.ioData.len); - if ( len == 0 && work->workData.ioData.len != 0 ) { - /* Given the following scenario: - * - a console handler has been registered - * that handles Ctrl+C events. - * - we've not tweaked the 'console mode' - * settings to turn on - * ENABLE_PROCESSED_INPUT. - * - we're blocked waiting on input from - standard input. - * - the user hits Ctrl+C. - * - * The OS will invoke the console handler - * (in a separate OS thread), and the - * above read() (i.e., under the hood, a - * ReadFile() op) returns 0, with the - * error set to - * ERROR_OPERATION_ABORTED. We don't want - * to percolate this error condition back - * to the Haskell user. Do this by - * waiting for the completion of the - * Haskell console handler. If upon - * completion of the console handler - * routine, the Haskell thread that issued - * the request is found to have been - * thrown an exception, the worker - * abandons the request (since that's what - * the Haskell thread has done.) If the - * Haskell thread hasn't been interrupted, - * the worker retries the read request as - * if nothing happened. - */ - if ( (GetLastError()) == ERROR_OPERATION_ABORTED ) { - /* For now, only abort when dealing - * with the standard input handle. - * i.e., for all others, an error is - * raised. - */ - HANDLE h = - (HANDLE)GetStdHandle(STD_INPUT_HANDLE); - int iofd = work->workData.ioData.fd; - if ( _get_osfhandle(iofd) == (intptr_t)h ) { - if (rts_waitConsoleHandlerCompletion()) { - /* If the Scheduler has set - * work->abandonOp, the - * Haskell thread has been - * thrown an exception (=> the - * worker must abandon this - * request.) We test for this - * below before invoking the - * on-completion routine. - */ - if (work->abandonOp) { - break; - } else { - continue; - } - } - } else { - break; /* Treat it like an error */ - } - } else { - break; - } - } else { - break; - } - } - if (len == -1) { errCode = errno; } - } - complData = work->workData.ioData.buf; - fd = work->workData.ioData.fd; - } else if ( work->workKind & WORKER_WRITE ) { - if ( work->workKind & WORKER_FOR_SOCKET ) { - len = send(work->workData.ioData.fd, - work->workData.ioData.buf, - work->workData.ioData.len, - 0); - if (len == SOCKET_ERROR) { - errCode = WSAGetLastError(); - } - } else { - len = write(work->workData.ioData.fd, - work->workData.ioData.buf, - work->workData.ioData.len); - if (len == -1) { - errCode = errno; - // write() gets errno wrong for - // ERROR_NO_DATA, we have to fix it here: - if (errCode == EINVAL && - GetLastError() == ERROR_NO_DATA) { - errCode = EPIPE; - } - } - } - complData = work->workData.ioData.buf; - fd = work->workData.ioData.fd; - } else if ( work->workKind & WORKER_DELAY ) { - /* Approximate implementation of threadDelay; - * - * Note: Sleep() is in milliseconds, not micros. - * - * MSDN says of Sleep: - * If dwMilliseconds is greater than one tick - * but less than two, the wait can be anywhere - * between one and two ticks, and so on. - * - * so we need to add (milliseconds-per-tick - 1) - * to the amount of time we sleep for. - * - * test ThreadDelay001 fails if we get this wrong. - */ - Sleep(((work->workData.delayData.usecs + 999) / 1000) - + iom->sleepResolution - 1); - len = work->workData.delayData.usecs; - complData = NULL; - fd = 0; - errCode = 0; - } else if ( work->workKind & WORKER_DO_PROC ) { - // perform operation/proc on behalf of Haskell thread. - if (work->workData.procData.proc) { - // The procedure is assumed to encode result + - // success/failure via its param. - void* param = work->workData.procData.param; - errCode=work->workData.procData.proc(param); - } else { - errCode=1; - } - complData = work->workData.procData.param; - } else { - fprintf(stderr, "unknown work request type (%d), " - "ignoring.\n", work->workKind); - fflush(stderr); - continue; - } - if (!work->abandonOp) { - work->onCompletion(work->requestID, - fd, - len, - complData, - errCode); - } - // Free the WorkItem - DeregisterWorkItem(iom,work); - stgFree(work); - } else { - fprintf(stderr, "unable to fetch work; fatal.\n"); - fflush(stderr); - OS_ACQUIRE_LOCK(&iom->manLock); - ioMan->numWorkers--; - OS_RELEASE_LOCK(&iom->manLock); - return 1; - } - } else { - fprintf(stderr, "waiting failed (%lu); fatal.\n", rc); - fflush(stderr); - OS_ACQUIRE_LOCK(&iom->manLock); - ioMan->numWorkers--; - OS_RELEASE_LOCK(&iom->manLock); - return 1; - } - } - return 0; -} - -static -bool -NewIOWorkerThread(IOManagerState* iom) -{ - unsigned threadId; - return ( 0 != _beginthreadex(NULL, - 0, - IOWorkerProc, - (LPVOID)iom, - 0, - &threadId) ); -} - -bool -StartIOManager(void) -{ - HANDLE hExit; - WorkQueue* wq; - UINT sleepResolution; - TIMECAPS timecaps; - MMRESULT mmresult; - - mmresult = timeGetDevCaps(&timecaps, sizeof(timecaps)); - if (mmresult != MMSYSERR_NOERROR) { - return false; - } - sleepResolution = timecaps.wPeriodMin; - mmresult = timeBeginPeriod(sleepResolution); - if (mmresult != MMSYSERR_NOERROR) { - return false; - } - - wq = NewWorkQueue(); - if ( !wq ) return false; - - ioMan = (IOManagerState*)stgMallocBytes(sizeof(IOManagerState), "StartIOManager"); - - if (!ioMan) { - FreeWorkQueue(wq); - return false; - } - - /* A manual-reset event */ - hExit = CreateEvent ( NULL, true, false, NULL ); - if ( !hExit ) { - FreeWorkQueue(wq); - stgFree(ioMan); - return false; - } - - ioMan->hExitEvent = hExit; - OS_INIT_LOCK(&ioMan->manLock); - ioMan->workQueue = wq; - ioMan->numWorkers = 0; - ioMan->workersIdle = 0; - ioMan->queueSize = 0; - ioMan->requestID = 1; - OS_INIT_LOCK(&ioMan->active_work_lock); - ioMan->active_work_items = NULL; - ioMan->sleepResolution = sleepResolution; - - return true; -} - -/* - * Function: depositWorkItem() - * - * Local function which deposits a WorkItem onto a work queue, - * deciding in the process whether or not the thread pool needs - * to be augmented with another thread to handle the new request. - * - */ -static -int -depositWorkItem( unsigned int reqID, - WorkItem* wItem ) -{ - OS_ACQUIRE_LOCK(&ioMan->manLock); - -#if 0 - fprintf(stderr, "depositWorkItem: %d/%d\n", - ioMan->workersIdle, ioMan->numWorkers); - fflush(stderr); -#endif - /* A new worker thread is created when there are fewer idle threads - * than non-consumed queue requests. This ensures that requests will - * be dealt with in a timely manner. - * - * [Long explanation of why the previous thread pool policy lead to - * trouble] - * - * Previously, the thread pool was augmented iff no idle worker threads - * were available. That strategy runs the risk of repeatedly adding to - * the request queue without expanding the thread pool to handle this - * sudden spike in queued requests. - * [How? Assume workersIdle is 1, and addIORequest() is called. No new - * thread is created and the request is simply queued. If addIORequest() - * is called again _before the OS schedules a worker thread to pull the - * request off the queue_, workersIdle is still 1 and another request is - * simply added to the queue. Once the worker thread is run, only one - * request is de-queued, leaving the 2nd request in the queue] - * - * Assuming none of the queued requests take an inordinate amount - * of to complete, the request queue would eventually be - * drained. But if that's not the case, the later requests will - * end up languishing in the queue indefinitely. The non-timely - * handling of requests may cause CH applications to misbehave / - * hang; bad. - * - */ - ioMan->queueSize++; - if ( (ioMan->workersIdle < ioMan->queueSize) ) { - /* see if giving up our quantum ferrets out some idle threads. - */ - OS_RELEASE_LOCK(&ioMan->manLock); - Sleep(0); - OS_ACQUIRE_LOCK(&ioMan->manLock); - if ( (ioMan->workersIdle < ioMan->queueSize) ) { - /* No, go ahead and create another. */ - ioMan->numWorkers++; - if (!NewIOWorkerThread(ioMan)) { - ioMan->numWorkers--; - } - } - } - OS_RELEASE_LOCK(&ioMan->manLock); - - if (SubmitWork(ioMan->workQueue,wItem)) { - /* Note: the work item has potentially been consumed by a worker thread - * (and freed) at this point, so we cannot use wItem's requestID. - */ - return reqID; - } else { - return 0; - } -} - -/* - * Function: AddIORequest() - * - * Conduit to underlying WorkQueue's SubmitWork(); adds IO - * request to work queue, deciding whether or not to augment - * the thread pool in the process. - */ -int -AddIORequest ( int fd, - bool forWriting, - bool isSocket, - HsInt len, - char* buffer, - CompletionProc onCompletion) -{ - ASSERT(ioMan); - - WorkItem* wItem = (WorkItem*)stgMallocBytes(sizeof(WorkItem), "AddIORequest"); - - unsigned int reqID = ioMan->requestID++; - - /* Fill in the blanks */ - wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) | - ( forWriting ? WORKER_WRITE : WORKER_READ ); - wItem->workData.ioData.fd = fd; - wItem->workData.ioData.len = len; - wItem->workData.ioData.buf = buffer; - wItem->link = NULL; - - wItem->onCompletion = onCompletion; - wItem->requestID = reqID; - - return depositWorkItem(reqID, wItem); -} - -/* - * Function: AddDelayRequest() - * - * Like AddIORequest(), but this time adding a delay request to - * the request queue. - */ -BOOL -AddDelayRequest ( HsInt usecs, - CompletionProc onCompletion) -{ - ASSERT(ioMan); - - WorkItem* wItem = (WorkItem*)stgMallocBytes(sizeof(WorkItem), "AddDelayRequest"); - - unsigned int reqID = ioMan->requestID++; - - /* Fill in the blanks */ - wItem->workKind = WORKER_DELAY; - wItem->workData.delayData.usecs = usecs; - wItem->onCompletion = onCompletion; - wItem->requestID = reqID; - wItem->link = NULL; - - return depositWorkItem(reqID, wItem); -} - -/* - * Function: AddProcRequest() - * - * Add an asynchronous procedure request. - */ -BOOL -AddProcRequest ( void* proc, - void* param, - CompletionProc onCompletion) -{ - ASSERT(ioMan); - - WorkItem* wItem = (WorkItem*)stgMallocBytes(sizeof(WorkItem), "AddProcRequest"); - if (!wItem) return false; - - unsigned int reqID = ioMan->requestID++; - - /* Fill in the blanks */ - wItem->workKind = WORKER_DO_PROC; - wItem->workData.procData.proc = proc; - wItem->workData.procData.param = param; - wItem->onCompletion = onCompletion; - wItem->requestID = reqID; - wItem->abandonOp = 0; - wItem->link = NULL; - - return depositWorkItem(reqID, wItem); -} - -void ShutdownIOManager ( bool wait_threads ) -{ - int num; - MMRESULT mmresult; - - SetEvent(ioMan->hExitEvent); - - if (wait_threads) { - /* Wait for all worker threads to die. */ - for (;;) { - OS_ACQUIRE_LOCK(&ioMan->manLock); - num = ioMan->numWorkers; - OS_RELEASE_LOCK(&ioMan->manLock); - if (num == 0) - break; - Sleep(10); - } - FreeWorkQueue(ioMan->workQueue); - CloseHandle(ioMan->hExitEvent); - OS_CLOSE_LOCK(&ioMan->active_work_lock); - OS_CLOSE_LOCK(&ioMan->manLock); - - mmresult = timeEndPeriod(ioMan->sleepResolution); - if (mmresult != MMSYSERR_NOERROR) { - barf("timeEndPeriod failed"); - } - - stgFree(ioMan); - ioMan = NULL; - } -} - -/* Keep track of WorkItems currently being serviced. */ -static -void -RegisterWorkItem(IOManagerState* ioMan, - WorkItem* wi) -{ - OS_ACQUIRE_LOCK(&ioMan->active_work_lock); - wi->link = ioMan->active_work_items; - ioMan->active_work_items = wi; - OS_RELEASE_LOCK(&ioMan->active_work_lock); -} - -static -void -DeregisterWorkItem(IOManagerState* ioMan, - WorkItem* wi) -{ - WorkItem *ptr, *prev; - - OS_ACQUIRE_LOCK(&ioMan->active_work_lock); - for(prev=NULL,ptr=ioMan->active_work_items;ptr;prev=ptr,ptr=ptr->link) { - if (wi->requestID == ptr->requestID) { - if (prev==NULL) { - ioMan->active_work_items = ptr->link; - } else { - prev->link = ptr->link; - } - OS_RELEASE_LOCK(&ioMan->active_work_lock); - return; - } - } - fprintf(stderr, "DeregisterWorkItem: unable to locate work item %d\n", - wi->requestID); - OS_RELEASE_LOCK(&ioMan->active_work_lock); -} - - -/* - * Function: abandonWorkRequest() - * - * Signal that a work request isn't of interest. Called by the Scheduler - * if a blocked Haskell thread has an exception thrown to it. - * - * Note: we're not aborting the system call that a worker might be blocked on - * here, just disabling the propagation of its result once its finished. We - * may have to go the whole hog here and switch to overlapped I/O so that we - * can abort blocked system calls. - */ -void -abandonWorkRequest ( int reqID ) -{ - WorkItem *ptr; - OS_ACQUIRE_LOCK(&ioMan->active_work_lock); - for(ptr=ioMan->active_work_items;ptr;ptr=ptr->link) { - if (ptr->requestID == (unsigned int)reqID ) { - ptr->abandonOp = 1; - OS_RELEASE_LOCK(&ioMan->active_work_lock); - return; - } - } - /* Note: if the request ID isn't present, the worker will have - * finished sometime since awaitRequests() last drained the completed - * request table; i.e., not an error. - */ - OS_RELEASE_LOCK(&ioMan->active_work_lock); -} - -#endif -- cgit v1.2.1