summaryrefslogtreecommitdiff
path: root/rts
diff options
context:
space:
mode:
authorTamar Christina <tamar@zhox.com>2019-06-16 21:32:25 +0100
committerBen Gamari <ben@smart-cactus.org>2020-07-15 16:41:02 -0400
commit64d8f2fe2d27743e2986d2176b1aa934e5484d7a (patch)
treeddf3b643330f032b332e1f476d2809fe35ae6072 /rts
parent4489af6bad11a198e9e6c192f41e17020f28d0c1 (diff)
downloadhaskell-64d8f2fe2d27743e2986d2176b1aa934e5484d7a.tar.gz
winio: core non-threaded I/O manager
Diffstat (limited to 'rts')
-rw-r--r--rts/win32/AsyncMIO.c387
-rw-r--r--rts/win32/AsyncMIO.h29
-rw-r--r--rts/win32/AsyncWinIO.c402
-rw-r--r--rts/win32/AsyncWinIO.h26
4 files changed, 844 insertions, 0 deletions
diff --git a/rts/win32/AsyncMIO.c b/rts/win32/AsyncMIO.c
new file mode 100644
index 0000000000..5d55f79d74
--- /dev/null
+++ b/rts/win32/AsyncMIO.c
@@ -0,0 +1,387 @@
+/* AsyncIO.c
+ *
+ * Integrating Win32 asynchronous I/O with the GHC RTS.
+ *
+ * (c) sof, 2002-2003.
+ *
+ * NOTE: This is the MIO manager, only used for --io-manager=posix.
+ * For the WINIO manager see base in the GHC.Event modules.
+ */
+
+#if !defined(THREADED_RTS)
+
+#include "Rts.h"
+#include "RtsUtils.h"
+#include <windows.h>
+#include <stdio.h>
+#include "Schedule.h"
+#include "Capability.h"
+#include "win32/AsyncMIO.h"
+#include "win32/IOManager.h"
+
+/*
+ * Overview:
+ *
+ * Haskell code issue asynchronous I/O requests via the
+ * async{Read,Write,DoOp}# primops. These cause addIORequest()
+ * to be invoked, which forwards the request to the underlying
+ * asynchronous I/O subsystem. Each request is tagged with a unique
+ * ID.
+ *
+ * addIORequest() returns this ID, so that when the blocked CH
+ * thread is added onto blocked_queue, its TSO is annotated with
+ * it. Upon completion of an I/O request, the async I/O handling
+ * code makes a back-call to signal its completion; the local
+ * onIOComplete() routine. It adds the IO request ID (along with
+ * its result data) to a queue of completed requests before returning.
+ *
+ * The queue of completed IO request is read by the thread operating
+ * the RTS scheduler. It de-queues the CH threads corresponding
+ * to the request IDs, making them runnable again.
+ *
+ */
+
+typedef struct CompletedReq {
+ unsigned int reqID;
+ HsInt len;
+ HsInt errCode;
+} CompletedReq;
+
+#define MAX_REQUESTS 200
+
+static Mutex queue_lock;
+static HANDLE completed_req_event = INVALID_HANDLE_VALUE;
+static HANDLE abandon_req_wait = INVALID_HANDLE_VALUE;
+static HANDLE wait_handles[2];
+static CompletedReq completedTable[MAX_REQUESTS];
+static int completed_hw;
+static HANDLE completed_table_sema;
+static int issued_reqs;
+
+static void
+onIOComplete(unsigned int reqID,
+ int fd STG_UNUSED,
+ HsInt len,
+ void* buf STG_UNUSED,
+ HsInt errCode)
+{
+ DWORD dwRes;
+ /* Deposit result of request in queue/table..when there's room. */
+ dwRes = WaitForSingleObject(completed_table_sema, INFINITE);
+ switch (dwRes) {
+ case WAIT_OBJECT_0:
+ break;
+ default:
+ /* Not likely */
+ fprintf(stderr,
+ "onIOComplete: failed to grab table semaphore (res=%d, err=%ld), "
+ "dropping request 0x%lx\n", reqID, dwRes, GetLastError());
+ fflush(stderr);
+ return;
+ }
+ OS_ACQUIRE_LOCK(&queue_lock);
+ if (completed_hw == MAX_REQUESTS) {
+ /* Shouldn't happen */
+ fprintf(stderr, "onIOComplete: ERROR -- Request table overflow (%d); "
+ "dropping.\n", reqID);
+ fflush(stderr);
+ } else {
+#if 0
+ fprintf(stderr, "onCompl: %d %d %d %d %d\n",
+ reqID, len, errCode, issued_reqs, completed_hw);
+ fflush(stderr);
+#endif
+ completedTable[completed_hw].reqID = reqID;
+ completedTable[completed_hw].len = len;
+ completedTable[completed_hw].errCode = errCode;
+ completed_hw++;
+ issued_reqs--;
+ if (completed_hw == 1) {
+ /* The event is used to wake up the scheduler thread should it
+ * be blocked waiting for requests to complete. The event resets
+ * once that thread has cleared out the request queue/table.
+ */
+ SetEvent(completed_req_event);
+ }
+ }
+ OS_RELEASE_LOCK(&queue_lock);
+}
+
+unsigned int
+addIORequest(int fd,
+ bool forWriting,
+ bool isSock,
+ HsInt len,
+ char* buf)
+{
+ OS_ACQUIRE_LOCK(&queue_lock);
+ issued_reqs++;
+ OS_RELEASE_LOCK(&queue_lock);
+#if 0
+ fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len);
+ fflush(stderr);
+#endif
+ return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete);
+}
+
+unsigned int
+addDelayRequest(HsInt usecs)
+{
+ OS_ACQUIRE_LOCK(&queue_lock);
+ issued_reqs++;
+ OS_RELEASE_LOCK(&queue_lock);
+#if 0
+ fprintf(stderr, "addDelayReq: %d\n", usecs); fflush(stderr);
+#endif
+ return AddDelayRequest(usecs,onIOComplete);
+}
+
+unsigned int
+addDoProcRequest(void* proc, void* param)
+{
+ OS_ACQUIRE_LOCK(&queue_lock);
+ issued_reqs++;
+ OS_RELEASE_LOCK(&queue_lock);
+#if 0
+ fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr);
+#endif
+ return AddProcRequest(proc,param,onIOComplete);
+}
+
+
+int
+startupAsyncIO()
+{
+ if (!StartIOManager()) {
+ return 0;
+ }
+ OS_INIT_LOCK(&queue_lock);
+ /* Create a pair of events:
+ *
+ * - completed_req_event -- signals the deposit of request result;
+ * manual reset.
+ * - abandon_req_wait -- external OS thread tells current
+ * RTS/Scheduler thread to abandon wait
+ * for IO request completion.
+ * Auto reset.
+ */
+ completed_req_event = CreateEvent (NULL, TRUE, FALSE, NULL);
+ abandon_req_wait = CreateEvent (NULL, FALSE, FALSE, NULL);
+ wait_handles[0] = completed_req_event;
+ wait_handles[1] = abandon_req_wait;
+ completed_hw = 0;
+ if ( !(completed_table_sema = CreateSemaphore(NULL, MAX_REQUESTS,
+ MAX_REQUESTS, NULL)) ) {
+ DWORD rc = GetLastError();
+ fprintf(stderr, "startupAsyncIO: CreateSemaphore failed 0x%x\n",
+ (int)rc);
+ fflush(stderr);
+ }
+
+ return ( completed_req_event != INVALID_HANDLE_VALUE &&
+ abandon_req_wait != INVALID_HANDLE_VALUE &&
+ completed_table_sema != NULL );
+}
+
+void
+shutdownAsyncIO(bool wait_threads)
+{
+ ShutdownIOManager(wait_threads);
+ if (completed_req_event != INVALID_HANDLE_VALUE) {
+ CloseHandle(completed_req_event);
+ completed_req_event = INVALID_HANDLE_VALUE;
+ }
+ if (abandon_req_wait != INVALID_HANDLE_VALUE) {
+ CloseHandle(abandon_req_wait);
+ abandon_req_wait = INVALID_HANDLE_VALUE;
+ }
+ if (completed_table_sema != NULL) {
+ CloseHandle(completed_table_sema);
+ completed_table_sema = NULL;
+ }
+ OS_CLOSE_LOCK(&queue_lock);
+}
+
+/*
+ * Function: awaitRequests(wait)
+ *
+ * Check for the completion of external IO work requests. Worker
+ * threads signal completion of IO requests by depositing them
+ * in a table (completedTable). awaitRequests() matches up
+ * requests in that table with threads on the blocked_queue,
+ * making the threads whose IO requests have completed runnable
+ * again.
+ *
+ * awaitRequests() is called by the scheduler periodically _or_ if
+ * it is out of work, and need to wait for the completion of IO
+ * requests to make further progress. In the latter scenario,
+ * awaitRequests() will simply block waiting for worker threads
+ * to complete if the 'completedTable' is empty.
+ */
+int
+awaitRequests(bool wait)
+{
+#if !defined(THREADED_RTS)
+ // none of this is actually used in the threaded RTS
+
+start:
+#if 0
+ fprintf(stderr, "awaitRequests(): %d %d %d\n",
+ issued_reqs, completed_hw, wait);
+ fflush(stderr);
+#endif
+ OS_ACQUIRE_LOCK(&queue_lock);
+ // Nothing immediately available & we won't wait
+ if ((!wait && completed_hw == 0)
+#if 0
+ // If we just return when wait==false, we'll go into a busy
+ // wait loop, so I disabled this condition --SDM 18/12/2003
+ (issued_reqs == 0 && completed_hw == 0)
+#endif
+ ) {
+ OS_RELEASE_LOCK(&queue_lock);
+ return 0;
+ }
+ if (completed_hw == 0) {
+ // empty table, drop lock and wait
+ OS_RELEASE_LOCK(&queue_lock);
+ if ( wait && sched_state == SCHED_RUNNING ) {
+ DWORD dwRes = WaitForMultipleObjects(2, wait_handles,
+ FALSE, INFINITE);
+ switch (dwRes) {
+ case WAIT_OBJECT_0:
+ // a request was completed
+ break;
+ case WAIT_OBJECT_0 + 1:
+ case WAIT_TIMEOUT:
+ // timeout (unlikely) or told to abandon waiting
+ return 0;
+ case WAIT_FAILED: {
+ DWORD dw = GetLastError();
+ fprintf(stderr, "awaitRequests: wait failed -- "
+ "error code: %lu\n", dw); fflush(stderr);
+ return 0;
+ }
+ default:
+ fprintf(stderr, "awaitRequests: unexpected wait return "
+ "code %lu\n", dwRes); fflush(stderr);
+ return 0;
+ }
+ } else {
+ return 0;
+ }
+ goto start;
+ } else {
+ int i;
+ StgTSO *tso, *prev;
+
+ for (i=0; i < completed_hw; i++) {
+ /* For each of the completed requests, match up their Ids
+ * with those of the threads on the blocked_queue. If the
+ * thread that made the IO request has been subsequently
+ * killed (and removed from blocked_queue), no match will
+ * be found for that request Id.
+ *
+ * i.e., killing a Haskell thread doesn't attempt to cancel
+ * the IO request it is blocked on.
+ *
+ */
+ unsigned int rID = completedTable[i].reqID;
+
+ prev = NULL;
+ for(tso = blocked_queue_hd; tso != END_TSO_QUEUE;
+ tso = tso->_link) {
+
+ switch(tso->why_blocked) {
+ case BlockedOnRead:
+ case BlockedOnWrite:
+ case BlockedOnDoProc:
+ if (tso->block_info.async_result->reqID == rID) {
+ // Found the thread blocked waiting on request;
+ // stodgily fill
+ // in its result block.
+ tso->block_info.async_result->len =
+ completedTable[i].len;
+ tso->block_info.async_result->errCode =
+ completedTable[i].errCode;
+
+ // Drop the matched TSO from blocked_queue
+ if (prev) {
+ setTSOLink(&MainCapability, prev, tso->_link);
+ } else {
+ blocked_queue_hd = tso->_link;
+ }
+ if (blocked_queue_tl == tso) {
+ blocked_queue_tl = prev ? prev : END_TSO_QUEUE;
+ }
+
+ // Terminates the run queue + this inner for-loop.
+ tso->_link = END_TSO_QUEUE;
+ tso->why_blocked = NotBlocked;
+ // save the StgAsyncIOResult in the
+ // stg_block_async_info stack frame, because
+ // the block_info field will be overwritten by
+ // pushOnRunQueue().
+ tso->stackobj->sp[1] = (W_)tso->block_info.async_result;
+ pushOnRunQueue(&MainCapability, tso);
+ break;
+ }
+ break;
+ default:
+ if (tso->why_blocked != NotBlocked) {
+ barf("awaitRequests: odd thread state");
+ }
+ break;
+ }
+
+ prev = tso;
+ }
+ /* Signal that there's completed table slots available */
+ if ( !ReleaseSemaphore(completed_table_sema, 1, NULL) ) {
+ DWORD dw = GetLastError();
+ fprintf(stderr, "awaitRequests: failed to signal semaphore "
+ "(error code=0x%x)\n", (int)dw);
+ fflush(stderr);
+ }
+ }
+ completed_hw = 0;
+ ResetEvent(completed_req_event);
+ OS_RELEASE_LOCK(&queue_lock);
+ return 1;
+ }
+#endif /* !THREADED_RTS */
+}
+
+/*
+ * Function: abandonRequestWait()
+ *
+ * Wake up a thread that's blocked waiting for new IO requests
+ * to complete (via awaitRequests().)
+ */
+void
+abandonRequestWait( void )
+{
+ /* the event is auto-reset, but in case there's no thread
+ * already waiting on the event, we want to return it to
+ * a non-signalled state.
+ *
+ * Careful! There is no synchronisation between
+ * abandonRequestWait and awaitRequest, which means that
+ * abandonRequestWait might be called just before a thread
+ * goes into a wait, and we miss the abandon signal. So we
+ * must SetEvent() here rather than PulseEvent() to ensure
+ * that the event isn't lost. We can re-optimise by resetting
+ * the event somewhere safe if we know the event has been
+ * properly serviced (see resetAbandon() below). --SDM 18/12/2003
+ */
+ SetEvent(abandon_req_wait);
+ interruptIOManagerEvent ();
+}
+
+void
+resetAbandonRequestWait( void )
+{
+ ResetEvent(abandon_req_wait);
+}
+
+#endif /* !defined(THREADED_RTS) */
diff --git a/rts/win32/AsyncMIO.h b/rts/win32/AsyncMIO.h
new file mode 100644
index 0000000000..63d8f34827
--- /dev/null
+++ b/rts/win32/AsyncMIO.h
@@ -0,0 +1,29 @@
+/* AsyncIO.h
+ *
+ * Integrating Win32 asynchronous I/O with the GHC RTS.
+ *
+ * (c) sof, 2002-2003.
+ *
+ * NOTE: This is the MIO manager, only used for --io-manager=posix.
+ * For the WINIO manager see AsyncWinIO.h.
+ */
+
+#pragma once
+
+#include "Rts.h"
+
+extern unsigned int
+addIORequest(int fd,
+ bool forWriting,
+ bool isSock,
+ HsInt len,
+ char* buf);
+extern unsigned int addDelayRequest(HsInt usecs);
+extern unsigned int addDoProcRequest(void* proc, void* param);
+extern int startupAsyncIO(void);
+extern void shutdownAsyncIO(bool wait_threads);
+
+extern int awaitRequests(bool wait);
+
+extern void abandonRequestWait(void);
+extern void resetAbandonRequestWait(void);
diff --git a/rts/win32/AsyncWinIO.c b/rts/win32/AsyncWinIO.c
new file mode 100644
index 0000000000..00c2fe2913
--- /dev/null
+++ b/rts/win32/AsyncWinIO.c
@@ -0,0 +1,402 @@
+/* AsyncIO.h
+ *
+ * Integrating Win32 asynchronous IOCP with the GHC RTS.
+ *
+ * (c) Tamar Christina, 2018 - 2019
+ *
+ * NOTE: This is the WinIO manager, only used for --io-manager=native.
+ * For the MIO manager see AsyncIO.h.
+ */
+
+#include "Rts.h"
+#include <rts/IOManager.h>
+#include "AsyncWinIO.h"
+#include "Prelude.h"
+#include "Capability.h"
+#include "Schedule.h"
+
+#include <stdbool.h>
+#include <windows.h>
+#include <stdint.h>
+#include <stdio.h>
+
+/* Note [Non-Threaded WINIO design]
+ Compared to Async MIO, Async WINIO does all of the heavy processing at the
+ Haskell side of things. The same code as the threaded WINIO is re-used for
+ the Non-threaded version. Of course since we are in a non-threaded rts we
+ can't block on foreign calls without hanging the application.
+
+ This file thus serves as a back-end service that continuously reads pending
+ evens from the given I/O completion port and notified the Haskell I/O manager
+ of work that has been completed. This does incur a slight cost in that the
+ rts has to actually schedule the Haskell thread to do the work, however this
+ shouldn't be a problem for performance.
+
+ It is however a problem for the workload buffer we use as we are not allowed
+ to service new requests until the old ones have actually been read and
+ processes by the Haskell I/O side.
+
+ To account for this the I/O manager works in two stages.
+
+ 1) Like the threaded version, any long wait we do, we prefer to do it in an
+ alterable state so that we can respond immediately to new requests. Note
+ that once we know which completion port handle we are bound to we no longer
+ need the Haskell side to tell us of new work. We can simply handle any new
+ work pre-emptively.
+
+ 2) We block in a non-alertable state whenever
+ a) The Completion port handle is yet unknown.
+ b) The RTS requested the I/O manager be shutdown via an event
+ c) We are waiting on the Haskell I/O manager to service a previous
+ request as to allow us to re-use the buffer.
+
+ We would ideally like to spend as little time as possible in 2).
+
+ The workflow for this I/O manager is as follows:
+
+ +------------------------+
+ | Worker thread creation |
+ +-----------+------------+
+ |
+ |
+ +-------------v---------------+
+ +------> Block in unalertable wait +-----+
+ | +-------------+---------------+ |
+ | | |
+ | | |
+ | +-----------v------------+ |
+ | |Init by Haskell I/O call| | If init already
+ wait for I/O | +-----------+------------+ |
+ processing in | | |
+ Haskell side | | |
+ | +--------v---------+ |
+ Also process | | alertable wait <-----------+
+ events like | +--------+---------+
+ shutdown | |
+ | |
+ | +-------v--------+
+ +------------+process response|
+ +----------------+
+
+
+ As a design decision to keep this side as light as possible no bookkeeping
+ is done here to track requests. That is, this file has no wait of knowing
+ of the remaining outstanding I/O requests, how many it actually completed
+ in the last call as that list may contain spurious events.
+
+ It works around this by having the Haskell side tell it how much work it
+ still has left to do.
+
+ Unlike the Threaded version we use a single worker thread to handle
+ completions and so it won't scale as well. But if high scalability is needed
+ then use the threaded runtime. This could would have to become threadsafe
+ in order to use multiple threads, but this is non-trivial as the non-threaded
+ rts has no locks around any of the key parts.
+
+ See also Note [WINIO Manager design]. */
+
+/* The IOCP Handle all I/O requests are associated with for this RTS. */
+static HANDLE completionPortHandle = INVALID_HANDLE_VALUE;
+/* Number of currently still outstanding I/O requests. */
+uint64_t outstanding_requests = 0;
+/* Boolean controlling if the I/O manager is/should still be running. */
+bool running = false;
+/* Boolean to indicate whether we have outstanding I/O requests that still need
+ to be processed by the I/O manager on the Haskell side. */
+bool outstanding_service_requests = false;
+/* Indicates wether we have hit one case where we serviced as much requests as
+ we could because the buffer got full. In such cases for the next requests
+ we expand the buffers so we have room to process requests in bigger
+ batches. */
+bool queue_full = false;
+
+/* Timeout to use for the next alertable wait. INFINITE means never timeout.
+ Also see note [WINIO Timer management]. */
+DWORD timeout = INFINITE;
+DWORD WINAPI runner (LPVOID lpParam);
+HANDLE workerThread = NULL;
+DWORD workerThreadId = 0;
+
+/* Synchronization mutex for modifying the above state variables in a thread
+ safe way. */
+SRWLOCK lock;
+/* Conditional variable to wake the I/O manager up from a non-alertable waiting
+ state. */
+CONDITION_VARIABLE wakeEvent;
+/* Conditional variable to force the system thread to wait for a request to
+ complete. */
+CONDITION_VARIABLE threadIOWait;
+/* The last event that was sent to the I/O manager. */
+HsWord32 lastEvent = 0;
+
+/* Number of callbacks to reserve slots for in ENTRIES. This is also the
+ total number of concurrent I/O requests we can handle in one go. */
+uint32_t num_callbacks = 32;
+/* Buffer for I/O request information. */
+OVERLAPPED_ENTRY *entries;
+/* Number of I/O calls verified to have completed in the last round by the
+ Haskell I/O Manager. */
+uint32_t num_last_completed;
+
+static void notifyRtsOfFinishedCall (uint32_t num);
+
+/* Create and initialize the non-threaded I/O manager. */
+bool startupAsyncWinIO(void)
+{
+ running = true;
+ outstanding_service_requests = false;
+ completionPortHandle = INVALID_HANDLE_VALUE;
+ outstanding_requests = 0;
+
+ InitializeSRWLock (&lock);
+ InitializeConditionVariable (&wakeEvent);
+ InitializeConditionVariable (&threadIOWait);
+
+ entries = calloc (sizeof (OVERLAPPED_ENTRY), num_callbacks);
+
+ /* Start the I/O manager before creating the worker thread to prevent a busy
+ wait or spin-lock, this will call registerNewIOCPHandle allowing us to
+ skip the initial un-alertable wait. */
+ ioManagerStart ();
+
+ workerThread = CreateThread (NULL, 0, runner, NULL, 0, &workerThreadId);
+ if (!workerThread)
+ {
+ barf ("could not create I/O manager thread.");
+ return false;
+ }
+
+ return true;
+}
+
+/* Terminate the I/O manager, if WAIT_THREADS then the call will block until
+ all helper threads are finished. */
+void shutdownAsyncWinIO(bool wait_threads)
+{
+ if (workerThread != NULL)
+ {
+ if (wait_threads)
+ {
+ AcquireSRWLockExclusive (&lock);
+
+ running = false;
+ ioManagerWakeup ();
+ PostQueuedCompletionStatus (completionPortHandle, 0, 0, NULL);
+ WakeConditionVariable (&wakeEvent);
+ WakeConditionVariable (&threadIOWait);
+
+ ReleaseSRWLockExclusive (&lock);
+
+ /* Now wait for the thread to actually finish. */
+ WaitForSingleObject (workerThread, INFINITE);
+ }
+ completionPortHandle = INVALID_HANDLE_VALUE;
+ workerThread = NULL;
+ workerThreadId = 0;
+ free (entries);
+ entries = NULL;
+ }
+
+ /* Call back into the Haskell side to terminate things there too. */
+ ioManagerDie ();
+}
+
+/* Register the I/O completetion port handle PORT that the I/O manager will be
+ monitoring. All handles are expected to be associated with this handle. */
+void registerNewIOCPHandle (HANDLE port)
+{
+ AcquireSRWLockExclusive (&lock);
+
+ completionPortHandle = port;
+
+ ReleaseSRWLockExclusive (&lock);
+}
+
+/* Callback hook so the Haskell part of the I/O manager can notify this manager
+ that a request someone is waiting on was completed synchronously. This means
+ we need to wake up the scheduler as there is work to be done. */
+
+void completeSynchronousRequest (void)
+{
+ AcquireSRWLockExclusive (&lock);
+
+ WakeConditionVariable (&threadIOWait);
+
+ ReleaseSRWLockExclusive (&lock);
+}
+
+
+/* Register a new I/O request that the I/O manager should handle. PORT is the
+ completion port handle that the request is associated with, MSSEC is the
+ maximum amount of time in milliseconds that an alertable wait should be done
+ for before the RTS requested to be notified of progress and NUM_REQ is the
+ total overall number of outstanding I/O requests. */
+
+void registerAlertableWait (HANDLE port, DWORD mssec, uint64_t num_req)
+{
+ bool interrupt = false;
+ bool wakeup = false;
+ AcquireSRWLockExclusive (&lock);
+
+ /* Decide if we may have to wake up the I/O manager. */
+ wakeup = outstanding_requests == 0 && num_req > 0;
+
+ outstanding_requests = num_req;
+ /* If the new timeout is earlier than the old one we have to reschedule the
+ wait. Do this by interrupting the current operation and setting the new
+ timeout, since it must be the shortest one in the queue. */
+ if (timeout > mssec)
+ {
+ timeout = mssec;
+ interrupt = true;
+ }
+
+ ReleaseSRWLockExclusive (&lock);
+
+ if (wakeup)
+ WakeConditionVariable (&wakeEvent);
+ else if (interrupt)
+ PostQueuedCompletionStatus (port, 0, 0, NULL);
+}
+
+/* Exported callback function that will be called by the RTS to collect the
+ finished overlapped entried belonging to the completed I/O requests. The
+ number of read entries will be returned in NUM.
+
+ NOTE: This function isn't thread safe, but is intended to be called only
+ when requested to by the I/O manager via notifyRtsOfFinishedCall. In
+ that context it is thread safe as we're guaranteeing that the I/O
+ manager is blocked waiting for the read to happen followed by a
+ servicedIOEntries call. */
+OVERLAPPED_ENTRY* getOverlappedEntries (uint32_t *num)
+{
+ *num = num_last_completed;
+ return entries;
+}
+
+/* Internal function to notify the RTS of NUM completed I/O requests. */
+static void notifyRtsOfFinishedCall (uint32_t num)
+{
+ num_last_completed = num;
+#if !defined(THREADED_RTS)
+ Capability *cap = &MainCapability;
+ StgTSO * tso = createStrictIOThread (cap, RtsFlags.GcFlags.initialStkSize,
+ processRemoteCompletion_closure);
+ AcquireSRWLockExclusive (&lock);
+ if (num > 0)
+ outstanding_service_requests = true;
+
+ scheduleThread (cap, tso);
+
+ WakeConditionVariable (&threadIOWait);
+ ReleaseSRWLockExclusive (&lock);
+#endif
+}
+
+/* Called by the scheduler when we have ran out of work to do and we have at
+ least one thread blocked on an I/O Port. When WAIT then if this function
+ returns you will have at least one action to service, though this may be a
+ wake-up action. */
+
+void awaitAsyncRequests (bool wait)
+{
+ AcquireSRWLockExclusive (&lock);
+ /* We don't deal with spurious requests here, that's left up to AwaitEvent.c
+ because in principle we need to check if the capability work queue is now
+ not empty but we can't do that here. Also these locks don't guarantee
+ fairness, as such a request may have completed without us seeing a
+ timeslice in between. */
+ if (wait && !outstanding_service_requests)
+ SleepConditionVariableSRW (&threadIOWait, &lock, INFINITE, 0);
+ ReleaseSRWLockExclusive (&lock);
+}
+
+
+/* Exported function that will be called by the RTS in order to indicate that
+ the RTS has successfully finished servicing I/O request returned with
+ getOverlappedEntries. Some of the overlapped requests may not have been
+ an I/O request so the RTS also returns the amount of REMAINING overlapped
+ entried it still expects to be processed. */
+
+void servicedIOEntries (uint64_t remaining)
+{
+ AcquireSRWLockExclusive (&lock);
+
+ outstanding_requests = remaining;
+ if (outstanding_requests <= 0)
+ timeout = INFINITE;
+ outstanding_service_requests = false;
+
+ if (queue_full)
+ {
+ num_callbacks *= 2;
+ OVERLAPPED_ENTRY *new
+ = realloc (entries,
+ sizeof (OVERLAPPED_ENTRY) * num_callbacks);
+ if (new)
+ entries = new;
+ }
+
+ ReleaseSRWLockExclusive (&lock);
+
+ WakeConditionVariable (&wakeEvent);
+}
+/* Main thread runner for the non-threaded I/O Manager. */
+
+DWORD WINAPI runner (LPVOID lpParam STG_UNUSED)
+{
+ while (running)
+ {
+ AcquireSRWLockExclusive (&lock);
+
+ lastEvent = readIOManagerEvent ();
+ /* Non-alertable wait. While here we can't server any I/O requests so we
+ would ideally like to spent as little time here as possible. As such
+ there are only 3 reasons to enter this state:
+
+ 1) I/O manager hasn't been fully initialized yet.
+ 2) I/O manager was told to shutdown, instead of doing that we just
+ block indefinitely so we don't have to recreate the thread to start
+ back up.
+ 3) We are waiting for the RTS to service the last round of requests. */
+ while (completionPortHandle == INVALID_HANDLE_VALUE
+ || lastEvent == IO_MANAGER_DIE
+ || outstanding_service_requests)
+ {
+ SleepConditionVariableSRW (&wakeEvent, &lock, INFINITE, 0);
+ HsWord32 nextEvent = readIOManagerEvent ();
+ lastEvent = nextEvent ? nextEvent : lastEvent;
+ }
+
+ ReleaseSRWLockExclusive (&lock);
+
+ ULONG num_removed = -1;
+ ZeroMemory (entries, sizeof (entries[0]) * num_callbacks);
+ if (GetQueuedCompletionStatusEx (completionPortHandle, entries,
+ num_callbacks, &num_removed, timeout,
+ false))
+ {
+ if (outstanding_requests == 0)
+ num_removed = 0;
+
+ if (num_removed > 0)
+ {
+ queue_full = num_removed == num_callbacks;
+ notifyRtsOfFinishedCall (num_removed);
+ }
+ }
+ else if (WAIT_TIMEOUT == GetLastError ())
+ {
+ num_removed = 0;
+ notifyRtsOfFinishedCall (num_removed);
+ }
+
+ AcquireSRWLockExclusive (&lock);
+
+ if (!running)
+ ExitThread (0);
+
+ ReleaseSRWLockExclusive (&lock);
+ }
+ return 0;
+}
diff --git a/rts/win32/AsyncWinIO.h b/rts/win32/AsyncWinIO.h
new file mode 100644
index 0000000000..34a0f502de
--- /dev/null
+++ b/rts/win32/AsyncWinIO.h
@@ -0,0 +1,26 @@
+/* AsyncIO.h
+ *
+ * Integrating Win32 asynchronous IOCP with the GHC RTS.
+ *
+ * (c) Tamar Christina, 2018
+ *
+ * NOTE: This is the WinIO manager, only used for --io-manager=native.
+ * For the MIO manager see AsyncIO.h.
+ */
+
+#pragma once
+
+#include "Rts.h"
+#include <stdbool.h>
+#include <windows.h>
+
+extern bool startupAsyncWinIO(void);
+extern void shutdownAsyncWinIO(bool wait_threads);
+extern void awaitAsyncRequests(bool wait);
+extern void registerNewIOCPHandle (HANDLE port);
+extern void registerAlertableWait (HANDLE port, DWORD mssec, uint64_t num_req);
+
+extern OVERLAPPED_ENTRY* getOverlappedEntries (uint32_t *num);
+extern void servicedIOEntries (uint64_t remaining);
+extern void completeSynchronousRequest (void);
+