diff options
author | Tamar Christina <tamar@zhox.com> | 2019-06-16 21:32:25 +0100 |
---|---|---|
committer | Ben Gamari <ben@smart-cactus.org> | 2020-07-15 16:41:02 -0400 |
commit | 64d8f2fe2d27743e2986d2176b1aa934e5484d7a (patch) | |
tree | ddf3b643330f032b332e1f476d2809fe35ae6072 /rts | |
parent | 4489af6bad11a198e9e6c192f41e17020f28d0c1 (diff) | |
download | haskell-64d8f2fe2d27743e2986d2176b1aa934e5484d7a.tar.gz |
winio: core non-threaded I/O manager
Diffstat (limited to 'rts')
-rw-r--r-- | rts/win32/AsyncMIO.c | 387 | ||||
-rw-r--r-- | rts/win32/AsyncMIO.h | 29 | ||||
-rw-r--r-- | rts/win32/AsyncWinIO.c | 402 | ||||
-rw-r--r-- | rts/win32/AsyncWinIO.h | 26 |
4 files changed, 844 insertions, 0 deletions
diff --git a/rts/win32/AsyncMIO.c b/rts/win32/AsyncMIO.c new file mode 100644 index 0000000000..5d55f79d74 --- /dev/null +++ b/rts/win32/AsyncMIO.c @@ -0,0 +1,387 @@ +/* AsyncIO.c + * + * Integrating Win32 asynchronous I/O with the GHC RTS. + * + * (c) sof, 2002-2003. + * + * NOTE: This is the MIO manager, only used for --io-manager=posix. + * For the WINIO manager see base in the GHC.Event modules. + */ + +#if !defined(THREADED_RTS) + +#include "Rts.h" +#include "RtsUtils.h" +#include <windows.h> +#include <stdio.h> +#include "Schedule.h" +#include "Capability.h" +#include "win32/AsyncMIO.h" +#include "win32/IOManager.h" + +/* + * Overview: + * + * Haskell code issue asynchronous I/O requests via the + * async{Read,Write,DoOp}# primops. These cause addIORequest() + * to be invoked, which forwards the request to the underlying + * asynchronous I/O subsystem. Each request is tagged with a unique + * ID. + * + * addIORequest() returns this ID, so that when the blocked CH + * thread is added onto blocked_queue, its TSO is annotated with + * it. Upon completion of an I/O request, the async I/O handling + * code makes a back-call to signal its completion; the local + * onIOComplete() routine. It adds the IO request ID (along with + * its result data) to a queue of completed requests before returning. + * + * The queue of completed IO request is read by the thread operating + * the RTS scheduler. It de-queues the CH threads corresponding + * to the request IDs, making them runnable again. + * + */ + +typedef struct CompletedReq { + unsigned int reqID; + HsInt len; + HsInt errCode; +} CompletedReq; + +#define MAX_REQUESTS 200 + +static Mutex queue_lock; +static HANDLE completed_req_event = INVALID_HANDLE_VALUE; +static HANDLE abandon_req_wait = INVALID_HANDLE_VALUE; +static HANDLE wait_handles[2]; +static CompletedReq completedTable[MAX_REQUESTS]; +static int completed_hw; +static HANDLE completed_table_sema; +static int issued_reqs; + +static void +onIOComplete(unsigned int reqID, + int fd STG_UNUSED, + HsInt len, + void* buf STG_UNUSED, + HsInt errCode) +{ + DWORD dwRes; + /* Deposit result of request in queue/table..when there's room. */ + dwRes = WaitForSingleObject(completed_table_sema, INFINITE); + switch (dwRes) { + case WAIT_OBJECT_0: + break; + default: + /* Not likely */ + fprintf(stderr, + "onIOComplete: failed to grab table semaphore (res=%d, err=%ld), " + "dropping request 0x%lx\n", reqID, dwRes, GetLastError()); + fflush(stderr); + return; + } + OS_ACQUIRE_LOCK(&queue_lock); + if (completed_hw == MAX_REQUESTS) { + /* Shouldn't happen */ + fprintf(stderr, "onIOComplete: ERROR -- Request table overflow (%d); " + "dropping.\n", reqID); + fflush(stderr); + } else { +#if 0 + fprintf(stderr, "onCompl: %d %d %d %d %d\n", + reqID, len, errCode, issued_reqs, completed_hw); + fflush(stderr); +#endif + completedTable[completed_hw].reqID = reqID; + completedTable[completed_hw].len = len; + completedTable[completed_hw].errCode = errCode; + completed_hw++; + issued_reqs--; + if (completed_hw == 1) { + /* The event is used to wake up the scheduler thread should it + * be blocked waiting for requests to complete. The event resets + * once that thread has cleared out the request queue/table. + */ + SetEvent(completed_req_event); + } + } + OS_RELEASE_LOCK(&queue_lock); +} + +unsigned int +addIORequest(int fd, + bool forWriting, + bool isSock, + HsInt len, + char* buf) +{ + OS_ACQUIRE_LOCK(&queue_lock); + issued_reqs++; + OS_RELEASE_LOCK(&queue_lock); +#if 0 + fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); + fflush(stderr); +#endif + return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete); +} + +unsigned int +addDelayRequest(HsInt usecs) +{ + OS_ACQUIRE_LOCK(&queue_lock); + issued_reqs++; + OS_RELEASE_LOCK(&queue_lock); +#if 0 + fprintf(stderr, "addDelayReq: %d\n", usecs); fflush(stderr); +#endif + return AddDelayRequest(usecs,onIOComplete); +} + +unsigned int +addDoProcRequest(void* proc, void* param) +{ + OS_ACQUIRE_LOCK(&queue_lock); + issued_reqs++; + OS_RELEASE_LOCK(&queue_lock); +#if 0 + fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr); +#endif + return AddProcRequest(proc,param,onIOComplete); +} + + +int +startupAsyncIO() +{ + if (!StartIOManager()) { + return 0; + } + OS_INIT_LOCK(&queue_lock); + /* Create a pair of events: + * + * - completed_req_event -- signals the deposit of request result; + * manual reset. + * - abandon_req_wait -- external OS thread tells current + * RTS/Scheduler thread to abandon wait + * for IO request completion. + * Auto reset. + */ + completed_req_event = CreateEvent (NULL, TRUE, FALSE, NULL); + abandon_req_wait = CreateEvent (NULL, FALSE, FALSE, NULL); + wait_handles[0] = completed_req_event; + wait_handles[1] = abandon_req_wait; + completed_hw = 0; + if ( !(completed_table_sema = CreateSemaphore(NULL, MAX_REQUESTS, + MAX_REQUESTS, NULL)) ) { + DWORD rc = GetLastError(); + fprintf(stderr, "startupAsyncIO: CreateSemaphore failed 0x%x\n", + (int)rc); + fflush(stderr); + } + + return ( completed_req_event != INVALID_HANDLE_VALUE && + abandon_req_wait != INVALID_HANDLE_VALUE && + completed_table_sema != NULL ); +} + +void +shutdownAsyncIO(bool wait_threads) +{ + ShutdownIOManager(wait_threads); + if (completed_req_event != INVALID_HANDLE_VALUE) { + CloseHandle(completed_req_event); + completed_req_event = INVALID_HANDLE_VALUE; + } + if (abandon_req_wait != INVALID_HANDLE_VALUE) { + CloseHandle(abandon_req_wait); + abandon_req_wait = INVALID_HANDLE_VALUE; + } + if (completed_table_sema != NULL) { + CloseHandle(completed_table_sema); + completed_table_sema = NULL; + } + OS_CLOSE_LOCK(&queue_lock); +} + +/* + * Function: awaitRequests(wait) + * + * Check for the completion of external IO work requests. Worker + * threads signal completion of IO requests by depositing them + * in a table (completedTable). awaitRequests() matches up + * requests in that table with threads on the blocked_queue, + * making the threads whose IO requests have completed runnable + * again. + * + * awaitRequests() is called by the scheduler periodically _or_ if + * it is out of work, and need to wait for the completion of IO + * requests to make further progress. In the latter scenario, + * awaitRequests() will simply block waiting for worker threads + * to complete if the 'completedTable' is empty. + */ +int +awaitRequests(bool wait) +{ +#if !defined(THREADED_RTS) + // none of this is actually used in the threaded RTS + +start: +#if 0 + fprintf(stderr, "awaitRequests(): %d %d %d\n", + issued_reqs, completed_hw, wait); + fflush(stderr); +#endif + OS_ACQUIRE_LOCK(&queue_lock); + // Nothing immediately available & we won't wait + if ((!wait && completed_hw == 0) +#if 0 + // If we just return when wait==false, we'll go into a busy + // wait loop, so I disabled this condition --SDM 18/12/2003 + (issued_reqs == 0 && completed_hw == 0) +#endif + ) { + OS_RELEASE_LOCK(&queue_lock); + return 0; + } + if (completed_hw == 0) { + // empty table, drop lock and wait + OS_RELEASE_LOCK(&queue_lock); + if ( wait && sched_state == SCHED_RUNNING ) { + DWORD dwRes = WaitForMultipleObjects(2, wait_handles, + FALSE, INFINITE); + switch (dwRes) { + case WAIT_OBJECT_0: + // a request was completed + break; + case WAIT_OBJECT_0 + 1: + case WAIT_TIMEOUT: + // timeout (unlikely) or told to abandon waiting + return 0; + case WAIT_FAILED: { + DWORD dw = GetLastError(); + fprintf(stderr, "awaitRequests: wait failed -- " + "error code: %lu\n", dw); fflush(stderr); + return 0; + } + default: + fprintf(stderr, "awaitRequests: unexpected wait return " + "code %lu\n", dwRes); fflush(stderr); + return 0; + } + } else { + return 0; + } + goto start; + } else { + int i; + StgTSO *tso, *prev; + + for (i=0; i < completed_hw; i++) { + /* For each of the completed requests, match up their Ids + * with those of the threads on the blocked_queue. If the + * thread that made the IO request has been subsequently + * killed (and removed from blocked_queue), no match will + * be found for that request Id. + * + * i.e., killing a Haskell thread doesn't attempt to cancel + * the IO request it is blocked on. + * + */ + unsigned int rID = completedTable[i].reqID; + + prev = NULL; + for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; + tso = tso->_link) { + + switch(tso->why_blocked) { + case BlockedOnRead: + case BlockedOnWrite: + case BlockedOnDoProc: + if (tso->block_info.async_result->reqID == rID) { + // Found the thread blocked waiting on request; + // stodgily fill + // in its result block. + tso->block_info.async_result->len = + completedTable[i].len; + tso->block_info.async_result->errCode = + completedTable[i].errCode; + + // Drop the matched TSO from blocked_queue + if (prev) { + setTSOLink(&MainCapability, prev, tso->_link); + } else { + blocked_queue_hd = tso->_link; + } + if (blocked_queue_tl == tso) { + blocked_queue_tl = prev ? prev : END_TSO_QUEUE; + } + + // Terminates the run queue + this inner for-loop. + tso->_link = END_TSO_QUEUE; + tso->why_blocked = NotBlocked; + // save the StgAsyncIOResult in the + // stg_block_async_info stack frame, because + // the block_info field will be overwritten by + // pushOnRunQueue(). + tso->stackobj->sp[1] = (W_)tso->block_info.async_result; + pushOnRunQueue(&MainCapability, tso); + break; + } + break; + default: + if (tso->why_blocked != NotBlocked) { + barf("awaitRequests: odd thread state"); + } + break; + } + + prev = tso; + } + /* Signal that there's completed table slots available */ + if ( !ReleaseSemaphore(completed_table_sema, 1, NULL) ) { + DWORD dw = GetLastError(); + fprintf(stderr, "awaitRequests: failed to signal semaphore " + "(error code=0x%x)\n", (int)dw); + fflush(stderr); + } + } + completed_hw = 0; + ResetEvent(completed_req_event); + OS_RELEASE_LOCK(&queue_lock); + return 1; + } +#endif /* !THREADED_RTS */ +} + +/* + * Function: abandonRequestWait() + * + * Wake up a thread that's blocked waiting for new IO requests + * to complete (via awaitRequests().) + */ +void +abandonRequestWait( void ) +{ + /* the event is auto-reset, but in case there's no thread + * already waiting on the event, we want to return it to + * a non-signalled state. + * + * Careful! There is no synchronisation between + * abandonRequestWait and awaitRequest, which means that + * abandonRequestWait might be called just before a thread + * goes into a wait, and we miss the abandon signal. So we + * must SetEvent() here rather than PulseEvent() to ensure + * that the event isn't lost. We can re-optimise by resetting + * the event somewhere safe if we know the event has been + * properly serviced (see resetAbandon() below). --SDM 18/12/2003 + */ + SetEvent(abandon_req_wait); + interruptIOManagerEvent (); +} + +void +resetAbandonRequestWait( void ) +{ + ResetEvent(abandon_req_wait); +} + +#endif /* !defined(THREADED_RTS) */ diff --git a/rts/win32/AsyncMIO.h b/rts/win32/AsyncMIO.h new file mode 100644 index 0000000000..63d8f34827 --- /dev/null +++ b/rts/win32/AsyncMIO.h @@ -0,0 +1,29 @@ +/* AsyncIO.h + * + * Integrating Win32 asynchronous I/O with the GHC RTS. + * + * (c) sof, 2002-2003. + * + * NOTE: This is the MIO manager, only used for --io-manager=posix. + * For the WINIO manager see AsyncWinIO.h. + */ + +#pragma once + +#include "Rts.h" + +extern unsigned int +addIORequest(int fd, + bool forWriting, + bool isSock, + HsInt len, + char* buf); +extern unsigned int addDelayRequest(HsInt usecs); +extern unsigned int addDoProcRequest(void* proc, void* param); +extern int startupAsyncIO(void); +extern void shutdownAsyncIO(bool wait_threads); + +extern int awaitRequests(bool wait); + +extern void abandonRequestWait(void); +extern void resetAbandonRequestWait(void); diff --git a/rts/win32/AsyncWinIO.c b/rts/win32/AsyncWinIO.c new file mode 100644 index 0000000000..00c2fe2913 --- /dev/null +++ b/rts/win32/AsyncWinIO.c @@ -0,0 +1,402 @@ +/* AsyncIO.h + * + * Integrating Win32 asynchronous IOCP with the GHC RTS. + * + * (c) Tamar Christina, 2018 - 2019 + * + * NOTE: This is the WinIO manager, only used for --io-manager=native. + * For the MIO manager see AsyncIO.h. + */ + +#include "Rts.h" +#include <rts/IOManager.h> +#include "AsyncWinIO.h" +#include "Prelude.h" +#include "Capability.h" +#include "Schedule.h" + +#include <stdbool.h> +#include <windows.h> +#include <stdint.h> +#include <stdio.h> + +/* Note [Non-Threaded WINIO design] + Compared to Async MIO, Async WINIO does all of the heavy processing at the + Haskell side of things. The same code as the threaded WINIO is re-used for + the Non-threaded version. Of course since we are in a non-threaded rts we + can't block on foreign calls without hanging the application. + + This file thus serves as a back-end service that continuously reads pending + evens from the given I/O completion port and notified the Haskell I/O manager + of work that has been completed. This does incur a slight cost in that the + rts has to actually schedule the Haskell thread to do the work, however this + shouldn't be a problem for performance. + + It is however a problem for the workload buffer we use as we are not allowed + to service new requests until the old ones have actually been read and + processes by the Haskell I/O side. + + To account for this the I/O manager works in two stages. + + 1) Like the threaded version, any long wait we do, we prefer to do it in an + alterable state so that we can respond immediately to new requests. Note + that once we know which completion port handle we are bound to we no longer + need the Haskell side to tell us of new work. We can simply handle any new + work pre-emptively. + + 2) We block in a non-alertable state whenever + a) The Completion port handle is yet unknown. + b) The RTS requested the I/O manager be shutdown via an event + c) We are waiting on the Haskell I/O manager to service a previous + request as to allow us to re-use the buffer. + + We would ideally like to spend as little time as possible in 2). + + The workflow for this I/O manager is as follows: + + +------------------------+ + | Worker thread creation | + +-----------+------------+ + | + | + +-------------v---------------+ + +------> Block in unalertable wait +-----+ + | +-------------+---------------+ | + | | | + | | | + | +-----------v------------+ | + | |Init by Haskell I/O call| | If init already + wait for I/O | +-----------+------------+ | + processing in | | | + Haskell side | | | + | +--------v---------+ | + Also process | | alertable wait <-----------+ + events like | +--------+---------+ + shutdown | | + | | + | +-------v--------+ + +------------+process response| + +----------------+ + + + As a design decision to keep this side as light as possible no bookkeeping + is done here to track requests. That is, this file has no wait of knowing + of the remaining outstanding I/O requests, how many it actually completed + in the last call as that list may contain spurious events. + + It works around this by having the Haskell side tell it how much work it + still has left to do. + + Unlike the Threaded version we use a single worker thread to handle + completions and so it won't scale as well. But if high scalability is needed + then use the threaded runtime. This could would have to become threadsafe + in order to use multiple threads, but this is non-trivial as the non-threaded + rts has no locks around any of the key parts. + + See also Note [WINIO Manager design]. */ + +/* The IOCP Handle all I/O requests are associated with for this RTS. */ +static HANDLE completionPortHandle = INVALID_HANDLE_VALUE; +/* Number of currently still outstanding I/O requests. */ +uint64_t outstanding_requests = 0; +/* Boolean controlling if the I/O manager is/should still be running. */ +bool running = false; +/* Boolean to indicate whether we have outstanding I/O requests that still need + to be processed by the I/O manager on the Haskell side. */ +bool outstanding_service_requests = false; +/* Indicates wether we have hit one case where we serviced as much requests as + we could because the buffer got full. In such cases for the next requests + we expand the buffers so we have room to process requests in bigger + batches. */ +bool queue_full = false; + +/* Timeout to use for the next alertable wait. INFINITE means never timeout. + Also see note [WINIO Timer management]. */ +DWORD timeout = INFINITE; +DWORD WINAPI runner (LPVOID lpParam); +HANDLE workerThread = NULL; +DWORD workerThreadId = 0; + +/* Synchronization mutex for modifying the above state variables in a thread + safe way. */ +SRWLOCK lock; +/* Conditional variable to wake the I/O manager up from a non-alertable waiting + state. */ +CONDITION_VARIABLE wakeEvent; +/* Conditional variable to force the system thread to wait for a request to + complete. */ +CONDITION_VARIABLE threadIOWait; +/* The last event that was sent to the I/O manager. */ +HsWord32 lastEvent = 0; + +/* Number of callbacks to reserve slots for in ENTRIES. This is also the + total number of concurrent I/O requests we can handle in one go. */ +uint32_t num_callbacks = 32; +/* Buffer for I/O request information. */ +OVERLAPPED_ENTRY *entries; +/* Number of I/O calls verified to have completed in the last round by the + Haskell I/O Manager. */ +uint32_t num_last_completed; + +static void notifyRtsOfFinishedCall (uint32_t num); + +/* Create and initialize the non-threaded I/O manager. */ +bool startupAsyncWinIO(void) +{ + running = true; + outstanding_service_requests = false; + completionPortHandle = INVALID_HANDLE_VALUE; + outstanding_requests = 0; + + InitializeSRWLock (&lock); + InitializeConditionVariable (&wakeEvent); + InitializeConditionVariable (&threadIOWait); + + entries = calloc (sizeof (OVERLAPPED_ENTRY), num_callbacks); + + /* Start the I/O manager before creating the worker thread to prevent a busy + wait or spin-lock, this will call registerNewIOCPHandle allowing us to + skip the initial un-alertable wait. */ + ioManagerStart (); + + workerThread = CreateThread (NULL, 0, runner, NULL, 0, &workerThreadId); + if (!workerThread) + { + barf ("could not create I/O manager thread."); + return false; + } + + return true; +} + +/* Terminate the I/O manager, if WAIT_THREADS then the call will block until + all helper threads are finished. */ +void shutdownAsyncWinIO(bool wait_threads) +{ + if (workerThread != NULL) + { + if (wait_threads) + { + AcquireSRWLockExclusive (&lock); + + running = false; + ioManagerWakeup (); + PostQueuedCompletionStatus (completionPortHandle, 0, 0, NULL); + WakeConditionVariable (&wakeEvent); + WakeConditionVariable (&threadIOWait); + + ReleaseSRWLockExclusive (&lock); + + /* Now wait for the thread to actually finish. */ + WaitForSingleObject (workerThread, INFINITE); + } + completionPortHandle = INVALID_HANDLE_VALUE; + workerThread = NULL; + workerThreadId = 0; + free (entries); + entries = NULL; + } + + /* Call back into the Haskell side to terminate things there too. */ + ioManagerDie (); +} + +/* Register the I/O completetion port handle PORT that the I/O manager will be + monitoring. All handles are expected to be associated with this handle. */ +void registerNewIOCPHandle (HANDLE port) +{ + AcquireSRWLockExclusive (&lock); + + completionPortHandle = port; + + ReleaseSRWLockExclusive (&lock); +} + +/* Callback hook so the Haskell part of the I/O manager can notify this manager + that a request someone is waiting on was completed synchronously. This means + we need to wake up the scheduler as there is work to be done. */ + +void completeSynchronousRequest (void) +{ + AcquireSRWLockExclusive (&lock); + + WakeConditionVariable (&threadIOWait); + + ReleaseSRWLockExclusive (&lock); +} + + +/* Register a new I/O request that the I/O manager should handle. PORT is the + completion port handle that the request is associated with, MSSEC is the + maximum amount of time in milliseconds that an alertable wait should be done + for before the RTS requested to be notified of progress and NUM_REQ is the + total overall number of outstanding I/O requests. */ + +void registerAlertableWait (HANDLE port, DWORD mssec, uint64_t num_req) +{ + bool interrupt = false; + bool wakeup = false; + AcquireSRWLockExclusive (&lock); + + /* Decide if we may have to wake up the I/O manager. */ + wakeup = outstanding_requests == 0 && num_req > 0; + + outstanding_requests = num_req; + /* If the new timeout is earlier than the old one we have to reschedule the + wait. Do this by interrupting the current operation and setting the new + timeout, since it must be the shortest one in the queue. */ + if (timeout > mssec) + { + timeout = mssec; + interrupt = true; + } + + ReleaseSRWLockExclusive (&lock); + + if (wakeup) + WakeConditionVariable (&wakeEvent); + else if (interrupt) + PostQueuedCompletionStatus (port, 0, 0, NULL); +} + +/* Exported callback function that will be called by the RTS to collect the + finished overlapped entried belonging to the completed I/O requests. The + number of read entries will be returned in NUM. + + NOTE: This function isn't thread safe, but is intended to be called only + when requested to by the I/O manager via notifyRtsOfFinishedCall. In + that context it is thread safe as we're guaranteeing that the I/O + manager is blocked waiting for the read to happen followed by a + servicedIOEntries call. */ +OVERLAPPED_ENTRY* getOverlappedEntries (uint32_t *num) +{ + *num = num_last_completed; + return entries; +} + +/* Internal function to notify the RTS of NUM completed I/O requests. */ +static void notifyRtsOfFinishedCall (uint32_t num) +{ + num_last_completed = num; +#if !defined(THREADED_RTS) + Capability *cap = &MainCapability; + StgTSO * tso = createStrictIOThread (cap, RtsFlags.GcFlags.initialStkSize, + processRemoteCompletion_closure); + AcquireSRWLockExclusive (&lock); + if (num > 0) + outstanding_service_requests = true; + + scheduleThread (cap, tso); + + WakeConditionVariable (&threadIOWait); + ReleaseSRWLockExclusive (&lock); +#endif +} + +/* Called by the scheduler when we have ran out of work to do and we have at + least one thread blocked on an I/O Port. When WAIT then if this function + returns you will have at least one action to service, though this may be a + wake-up action. */ + +void awaitAsyncRequests (bool wait) +{ + AcquireSRWLockExclusive (&lock); + /* We don't deal with spurious requests here, that's left up to AwaitEvent.c + because in principle we need to check if the capability work queue is now + not empty but we can't do that here. Also these locks don't guarantee + fairness, as such a request may have completed without us seeing a + timeslice in between. */ + if (wait && !outstanding_service_requests) + SleepConditionVariableSRW (&threadIOWait, &lock, INFINITE, 0); + ReleaseSRWLockExclusive (&lock); +} + + +/* Exported function that will be called by the RTS in order to indicate that + the RTS has successfully finished servicing I/O request returned with + getOverlappedEntries. Some of the overlapped requests may not have been + an I/O request so the RTS also returns the amount of REMAINING overlapped + entried it still expects to be processed. */ + +void servicedIOEntries (uint64_t remaining) +{ + AcquireSRWLockExclusive (&lock); + + outstanding_requests = remaining; + if (outstanding_requests <= 0) + timeout = INFINITE; + outstanding_service_requests = false; + + if (queue_full) + { + num_callbacks *= 2; + OVERLAPPED_ENTRY *new + = realloc (entries, + sizeof (OVERLAPPED_ENTRY) * num_callbacks); + if (new) + entries = new; + } + + ReleaseSRWLockExclusive (&lock); + + WakeConditionVariable (&wakeEvent); +} +/* Main thread runner for the non-threaded I/O Manager. */ + +DWORD WINAPI runner (LPVOID lpParam STG_UNUSED) +{ + while (running) + { + AcquireSRWLockExclusive (&lock); + + lastEvent = readIOManagerEvent (); + /* Non-alertable wait. While here we can't server any I/O requests so we + would ideally like to spent as little time here as possible. As such + there are only 3 reasons to enter this state: + + 1) I/O manager hasn't been fully initialized yet. + 2) I/O manager was told to shutdown, instead of doing that we just + block indefinitely so we don't have to recreate the thread to start + back up. + 3) We are waiting for the RTS to service the last round of requests. */ + while (completionPortHandle == INVALID_HANDLE_VALUE + || lastEvent == IO_MANAGER_DIE + || outstanding_service_requests) + { + SleepConditionVariableSRW (&wakeEvent, &lock, INFINITE, 0); + HsWord32 nextEvent = readIOManagerEvent (); + lastEvent = nextEvent ? nextEvent : lastEvent; + } + + ReleaseSRWLockExclusive (&lock); + + ULONG num_removed = -1; + ZeroMemory (entries, sizeof (entries[0]) * num_callbacks); + if (GetQueuedCompletionStatusEx (completionPortHandle, entries, + num_callbacks, &num_removed, timeout, + false)) + { + if (outstanding_requests == 0) + num_removed = 0; + + if (num_removed > 0) + { + queue_full = num_removed == num_callbacks; + notifyRtsOfFinishedCall (num_removed); + } + } + else if (WAIT_TIMEOUT == GetLastError ()) + { + num_removed = 0; + notifyRtsOfFinishedCall (num_removed); + } + + AcquireSRWLockExclusive (&lock); + + if (!running) + ExitThread (0); + + ReleaseSRWLockExclusive (&lock); + } + return 0; +} diff --git a/rts/win32/AsyncWinIO.h b/rts/win32/AsyncWinIO.h new file mode 100644 index 0000000000..34a0f502de --- /dev/null +++ b/rts/win32/AsyncWinIO.h @@ -0,0 +1,26 @@ +/* AsyncIO.h + * + * Integrating Win32 asynchronous IOCP with the GHC RTS. + * + * (c) Tamar Christina, 2018 + * + * NOTE: This is the WinIO manager, only used for --io-manager=native. + * For the MIO manager see AsyncIO.h. + */ + +#pragma once + +#include "Rts.h" +#include <stdbool.h> +#include <windows.h> + +extern bool startupAsyncWinIO(void); +extern void shutdownAsyncWinIO(bool wait_threads); +extern void awaitAsyncRequests(bool wait); +extern void registerNewIOCPHandle (HANDLE port); +extern void registerAlertableWait (HANDLE port, DWORD mssec, uint64_t num_req); + +extern OVERLAPPED_ENTRY* getOverlappedEntries (uint32_t *num); +extern void servicedIOEntries (uint64_t remaining); +extern void completeSynchronousRequest (void); + |