diff options
authorAndreas Klebinger <>2020-05-06 13:39:47 +0200
committerBen Gamari <>2020-07-15 16:41:03 -0400
commit06542b033116bfc4b47c80bdeab44ed1d99005bb (patch)
parentc24c9a1f2a10e044a31b7d89586f4a19ff61e137 (diff)
winio: Refactor non-threaded runner thread and scheduler interface.
Only use a single communication point (registerAlertableWait) to inform the C side aobut both timeouts to use as well as outstanding requests. Also queue a haskell processing thread after each return from alertable waits. This way there is no risk of us missing a timer event.
4 files changed, 228 insertions, 141 deletions
diff --git a/libraries/base/GHC/Event/Windows.hsc b/libraries/base/GHC/Event/Windows.hsc
index 5d0cfe44f4..ccdc6e1144 100644
--- a/libraries/base/GHC/Event/Windows.hsc
+++ b/libraries/base/GHC/Event/Windows.hsc
@@ -68,6 +68,8 @@ module GHC.Event.Windows (
-- define DEBUG 1
+-- #define DEBUG_TRACE 1
##include "windows_cconv.h"
#include <windows.h>
#include <ntstatus.h>
@@ -270,14 +272,12 @@ foreign import ccall safe "registerNewIOCPHandle"
registerNewIOCPHandle :: FFI.IOCP -> IO ()
foreign import ccall safe "registerAlertableWait"
- registerAlertableWait :: FFI.IOCP -> DWORD -> Word64 -> IO ()
+-- (bool has_timeout, DWORD mssec, uint64_t num_req, bool pending_service);
+ c_registerAlertableWait :: Bool -> DWORD -> Word64 -> Bool -> IO ()
foreign import ccall safe "getOverlappedEntries"
getOverlappedEntries :: Ptr DWORD -> IO (Ptr OVERLAPPED_ENTRY)
-foreign import ccall safe "servicedIOEntries"
- servicedIOEntries :: Word64 -> IO ()
foreign import ccall safe "completeSynchronousRequest"
completeSynchronousRequest :: IO ()
@@ -385,28 +385,33 @@ newManager = do
{-# INLINE startIOManagerThread #-}
-- | Starts a new I/O manager thread.
-- For the threaded runtime it creates a pool of OS threads which stays alive
--- until they are instructed to die. For the non-threaded runtime we have a
--- single worker thread in the C runtime.
+-- until they are instructed to die.
+-- For the non-threaded runtime we have a single worker thread in
+-- the C runtime which we force to wake up instead.
startIOManagerThread :: IO () -> IO ()
-startIOManagerThread loop = do
- modifyMVar_ ioManagerThread $ \old -> do
- let create = do debugIO "spawning worker threads.."
- t <- if threadedIOMgr
- then forkOS loop
- else forkIO loop
- debugIO $ "created io-manager threads."
- labelThread t "IOManagerThread"
- return (Just t)
- debugIO $ "startIOManagerThread old=" ++ show old
- case old of
- Nothing -> create
- Just t -> do
- s <- threadStatus t
- case s of
- ThreadFinished -> create
- ThreadDied -> create
- _other -> do interruptSystemManager
- return (Just t)
+startIOManagerThread loop
+ | not threadedIOMgr
+ = debugIO "startIOManagerThread:NonThreaded" >>
+ interruptSystemManager
+ | otherwise = do
+ modifyMVar_ ioManagerThread $ \old -> do
+ let create = do debugIO "spawning worker threads.."
+ t <- if threadedIOMgr
+ then forkOS loop
+ else forkIO loop
+ debugIO $ "created io-manager threads."
+ labelThread t "IOManagerThread"
+ return (Just t)
+ debugIO $ "startIOManagerThread old=" ++ show old
+ case old of
+ Nothing -> create
+ Just t -> do
+ s <- threadStatus t
+ case s of
+ ThreadFinished -> create
+ ThreadDied -> create
+ _other -> do interruptSystemManager
+ return (Just t)
requests :: MVar Word64
requests = unsafePerformIO $ newMVar 0
@@ -450,6 +455,9 @@ callbackArraySize = 32
secondsToNanoSeconds :: Seconds -> Q.Prio
secondsToNanoSeconds s = ceiling $ s * 1000000000
+secondsToMilliSeconds :: Seconds -> Word32
+secondsToMilliSeconds s = ceiling $ s * 1000
nanoSecondsToSeconds :: Q.Prio -> Seconds
nanoSecondsToSeconds n = fromIntegral n / 1000000000.0
@@ -638,8 +646,11 @@ withOverlappedEx mgr fname h offset startCB completionCB = do
completionCB' status 0
when (not threadedIOMgr) $
do num_remaining <- outstandingRequests
- servicedIOEntries num_remaining
- completeSynchronousRequest
+ -- Run timeouts. This way if we canceled the last
+ -- IO Request and have no timer events waiting we
+ -- can go into an unbounded alertable wait.
+ delay <- runExpiredTimeouts mgr
+ registerAlertableWait delay num_remaining True
return $ IOFailed Nothing
let runner = do debugIO $ (dbg ":: waiting ") ++ " | " ++ show lpol
res <- readIOPort signal `catch` cancel
@@ -786,6 +797,9 @@ updateTimeout :: Manager -> TimeoutKey -> Seconds -> IO ()
updateTimeout mgr (TK key) relTime = do
now <- getTime (mgrClock mgr)
let !expTime = secondsToNanoSeconds $ now + relTime
+ -- Note: editTimeouts unconditionally wakes the IO Manager
+ -- but that is not required if the new time is after
+ -- the current time.
editTimeouts mgr (Q.adjust (const expTime) key)
-- | Unregister an active timeout. This is a harmless no-op if the timeout is
@@ -856,6 +870,8 @@ fromTimeout (Just sec) | sec > 120 = 120000
-- the queued I/O requests and timers. If the I/O manager was given a command
-- to block, shutdown or suspend than that request is honored at the end of the
-- loop.
+-- This function can be safely executed multiple times in parallel
step :: Bool -> Manager -> IO (Bool, Maybe Seconds)
step maxDelay mgr@Manager{..} = do
-- Determine how long to wait the next time we block in an alertable state.
@@ -872,7 +888,7 @@ step maxDelay mgr@Manager{..} = do
-- entering a kernel mode wait and this is free to be used. If non-threaded
-- then this is a no-op.
notifyWaiting mgrThreadPool
- n <- if threadedIOMgr
-- To quote Matt Godbolts:
-- There are some unusual edge cases you need to deal with. The
-- GetQueuedCompletionStatus function blocks a thread until there's
@@ -915,10 +931,7 @@ step maxDelay mgr@Manager{..} = do
-- The getQueuedCompletionStatusEx call will remove entries queued by the OS
-- and returns the finished ones in mgrOverlappedEntries and the number of
-- entries removed.
- then FFI.getQueuedCompletionStatusEx mgrIOCP mgrOverlappedEntries timer
- else do num_req <- outstandingRequests
- registerAlertableWait mgrIOCP timer num_req
- return 0
+ n <- FFI.getQueuedCompletionStatusEx mgrIOCP mgrOverlappedEntries timer
debugIO "WinIORunning"
-- If threaded this call informs the threadpool manager that a thread is
-- busy. If all threads are busy and we have not reached the maximum amount
@@ -1004,6 +1017,10 @@ processCompletion Manager{..} n delay = do
-- completed completions. It is mostly a wrapper around processCompletion.
processRemoteCompletion :: IO ()
processRemoteCompletion = do
+#if defined(DEBUG) || defined(DEBUG_TRACE)
+ tid <- myThreadId
+ labelThread tid $ "IOManagerThread-PRC" ++ show tid
alloca $ \ptr_n -> do
debugIO "processRemoteCompletion :: start ()"
-- First figure out how much work we have to do.
@@ -1015,11 +1032,20 @@ processRemoteCompletion = do
mngr <- getSystemManager
let arr = mgrOverlappedEntries mngr
A.unsafeSplat arr entries n
- _ <- processCompletion mngr n Nothing
+ -- Process timeouts
+ delay <- runExpiredTimeouts mngr :: IO (Maybe Seconds)
+ -- Process available completions
+ _ <- processCompletion mngr n delay
num_left <- outstandingRequests
+ -- Update and potentially wake up IO Manager
-- This call will unblock the non-threaded I/O manager. After this it is no
-- longer safe to use `entries` nor `completed`.
- servicedIOEntries num_left
+ registerAlertableWait delay num_left False
debugIO "WinIOBlocked"
-- We may have been woken up due to a timer timeout. So check for any
-- expired timeouts. If we have processed any completions only check
@@ -1028,21 +1054,30 @@ processRemoteCompletion = do
-- When not the threaded runtime we would not have reset the timer events
-- below. Because of this when the request is done we have an additional
- -- step here to reset the wait timers so the I/O manager doesn't keep
+ -- `step` here to reset the wait timers so the I/O manager doesn't keep
-- polling at the temporary high frequency we entered.
- if (n == 0)
- then step True mngr >> return ()
- else runExpiredTimeouts mngr >> return ()
debugIO "processRemoteCompletion :: done ()"
return ()
+registerAlertableWait :: Maybe Seconds -> Word64 -> Bool -> IO ()
+registerAlertableWait Nothing num_reqs pending_service =
+ c_registerAlertableWait False 0 num_reqs pending_service
+registerAlertableWait (Just delay) num_reqs pending_service =
+ c_registerAlertableWait True (secondsToMilliSeconds delay)
+ num_reqs pending_service
-- | Event loop for the Threaded I/O manager. The one for the non-threaded
-- I/O manager is in AsyncWinIO.c in the rts.
io_mngr_loop :: HANDLE -> Manager -> IO ()
+io_mngr_loop _event _mgr
+ | not threadedIOMgr
+ = do debugIO "io_mngr_loop:no-op:called in non-threaded case"
+ return ()
io_mngr_loop _event mgr = go False
go maxDelay =
do debugIO "io_mngr_loop:WinIORunning"
+ traceEventIO "io_mngr_loop:WinIORunning"
(more, delay) <- step maxDelay mgr
let !use_max_delay = not (isJust delay || more)
debugIO "I/O manager stepping."
@@ -1052,7 +1087,7 @@ io_mngr_loop _event mgr = go False
_ | event_id == io_MANAGER_WAKEUP -> return False
_ | event_id == io_MANAGER_DIE -> return True
0 -> return False -- spurious wakeup
- _ -> do debugIO $ "handling console event: " ++ show (event_id `shiftR` 1)
+ _ -> do traceEventIO $ "handling console event: " ++ show (event_id `shiftR` 1)
start_console_handler (event_id `shiftR` 1)
return False
@@ -1061,7 +1096,6 @@ io_mngr_loop _event mgr = go False
-- manager. It will be woken up again when there is more to do.
case () of
_ | exit -> debugIO "I/O manager shutting down."
- _ | not threadedIOMgr -> debugIO "I/O manager single threaded halt."
_ -> go use_max_delay
@@ -1157,8 +1191,9 @@ c_DEBUG_DUMP = return True -- scheduler `fmap` getDebugFlags
debugIO :: String -> IO ()
--- debugIO s = traceEventIO ( "debugIO :: " ++ s)
-#if defined(DEBUG)
+#if defined(DEBUG_TRACE)
+debugIO s = traceEventIO ( "winIO :: " ++ s)
+#elif defined(DEBUG)
debugIO s
= do debug <- c_DEBUG_DUMP
if debug
diff --git a/rts/RtsSymbols.c b/rts/RtsSymbols.c
index ff430d0137..9de31d78f3 100644
--- a/rts/RtsSymbols.c
+++ b/rts/RtsSymbols.c
@@ -345,7 +345,6 @@
SymI_HasProto(registerNewIOCPHandle) \
SymI_HasProto(getOverlappedEntries) \
- SymI_HasProto(servicedIOEntries) \
SymI_HasProto(completeSynchronousRequest) \
SymI_HasProto(registerAlertableWait) \
SymI_HasProto(ioManagerWakeup) \
diff --git a/rts/win32/AsyncWinIO.c b/rts/win32/AsyncWinIO.c
index 4c0d458819..ac7d68c033 100644
--- a/rts/win32/AsyncWinIO.c
+++ b/rts/win32/AsyncWinIO.c
@@ -14,6 +14,8 @@
#include "Prelude.h"
#include "Capability.h"
#include "Schedule.h"
+#include "Rts.h"
+#include "ThreadLabels.h"
#include <stdbool.h>
#include <windows.h>
@@ -99,7 +101,8 @@
`processRemoteCompletion` will process IO results invoking call backs and
processing timer events. Once done it resets `outstanding_service_requests`
and wakes up the IOManager thread. Which at this point becomes unblocked
- and reenters the altertable wait state.
+ and reenters the altertable wait state. This is done by calling into
+ registerAlterableWait.
As a design decision to keep this side as light as possible no bookkeeping
is done here to track requests. That is, this file has no way of knowing
@@ -122,29 +125,48 @@
Note [Notifying the RTS/Haskell of completed events]
- notifyRtsOfFinishedCall can't directly create a haskell thread.
+ The C side runner can't directly create a haskell thread.
With the current API of the haskell runtime this would be terrible
unsound. In particular the GC assumes no heap objects are generated,
and no heap memory is requested while it is running.
To work around this the scheduler invokes queueIOThread which checks
- if a new thread should be created. Since we only use this code path
- in the non-threaded runtime this is safe. The scheduler is never
- running concurrently with the GC or Mutator.
+ if a (haskell) thread should be created to process IO requests.
+ Since we only use this code path in the non-threaded runtime this
+ ensures there is only one OS thread at a time making use of the haskell
+ heap.
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ Note [Non-Threaded IO Manager startup sequence]
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ Under the new IO Manager we run a bit of initialization under
+ hs_init(). The first call into actual IO manager code is a
+ invocation of startupAsyncWinIO();
+ There we initialize IO manager locale variables and.
+ * call ioManagerStart()
+ * Creat a thread to execute "runner"
+ We never truely shut down the IO Manager. While this means we
+ might block forever on the IOPort if the IO Manager is no longer
+ needed we consider this cheap compared to the complexity of
+ properly handling pausing and resuming of the manager.
/* The IOCP Handle all I/O requests are associated with for this RTS. */
static HANDLE completionPortHandle = INVALID_HANDLE_VALUE;
/* Number of currently still outstanding I/O requests. */
-uint64_t outstanding_requests = 0;
+static volatile uint64_t outstanding_requests = 0;
/* Boolean controlling if the I/O manager is/should still be running. */
-bool running = false;
+static bool running = false;
/* Boolean to indicate whether we have outstanding I/O requests that still need
to be processed by the I/O manager on the Haskell side.
Set by:
- notifyRtsOfFinishedCall (true)
- servicedIOEntries (false)
+ notifyScheduler (true)
+ registerAlertableWait (false)
Read by:
@@ -156,38 +178,39 @@ volatile bool outstanding_service_requests = false;
Set by:
Read by:
- servicedIOEntries
+ registerAlertableWait
-bool queue_full = false;
+static bool queue_full = false;
/* Timeout to use for the next alertable wait. INFINITE means never timeout.
Also see note [WINIO Timer management]. */
-DWORD timeout = INFINITE;
-DWORD WINAPI runner (LPVOID lpParam);
-HANDLE workerThread = NULL;
-DWORD workerThreadId = 0;
+static DWORD timeout = INFINITE;
+static HANDLE workerThread = NULL;
+static DWORD workerThreadId = 0;
/* Synchronization mutex for modifying the above state variables in a thread
safe way. */
-SRWLOCK lock;
+static SRWLOCK wio_runner_lock;
/* Conditional variable to wake the I/O manager up from a non-alertable waiting
state. */
-/* Conditional variable to force the system thread to wait for a request to
+static CONDITION_VARIABLE wakeEvent;
+/* Conditional variable to force the system (haskell) thread to wait for a request to
complete. */
+static CONDITION_VARIABLE threadIOWait;
/* Number of callbacks to reserve slots for in ENTRIES. This is also the
total number of concurrent I/O requests we can handle in one go. */
-uint32_t num_callbacks = 32;
+static uint32_t num_callbacks = 32;
/* Buffer for I/O request information. */
+static OVERLAPPED_ENTRY *entries;
/* Number of I/O calls verified to have completed in the last round by the
Haskell I/O Manager. */
-uint32_t num_last_completed;
+static uint32_t num_last_completed;
/* Notify the Haskell side of this many new finished requests */
-uint32_t num_notify;
+static uint32_t num_notify;
/* Indicates to the scheduler that new work is available for processing.
Set by:
@@ -200,17 +223,20 @@ static volatile bool canQueueIOThread;
static void notifyScheduler(uint32_t num);
-// static void notifyRtsOfFinishedCall (uint32_t num);
+static DWORD WINAPI runner (LPVOID lpParam);
-/* Create and initialize the non-threaded I/O manager. */
+/* Create and initialize the non-threaded I/O manager.
+ Called just once from hs_init. */
bool startupAsyncWinIO(void)
+ ASSERT(!running);
running = true;
outstanding_service_requests = false;
completionPortHandle = INVALID_HANDLE_VALUE;
outstanding_requests = 0;
- InitializeSRWLock (&lock);
+ InitializeSRWLock (&wio_runner_lock);
InitializeConditionVariable (&wakeEvent);
InitializeConditionVariable (&threadIOWait);
@@ -239,7 +265,7 @@ void shutdownAsyncWinIO(bool wait_threads)
if (wait_threads)
- AcquireSRWLockExclusive (&lock);
+ AcquireSRWLockExclusive (&wio_runner_lock);
running = false;
ioManagerWakeup ();
@@ -247,7 +273,7 @@ void shutdownAsyncWinIO(bool wait_threads)
WakeConditionVariable (&wakeEvent);
WakeConditionVariable (&threadIOWait);
- ReleaseSRWLockExclusive (&lock);
+ ReleaseSRWLockExclusive (&wio_runner_lock);
/* Now wait for the thread to actually finish. */
WaitForSingleObject (workerThread, INFINITE);
@@ -267,11 +293,11 @@ void shutdownAsyncWinIO(bool wait_threads)
monitoring. All handles are expected to be associated with this handle. */
void registerNewIOCPHandle (HANDLE port)
- AcquireSRWLockExclusive (&lock);
+ AcquireSRWLockExclusive (&wio_runner_lock);
completionPortHandle = port;
- ReleaseSRWLockExclusive (&lock);
+ ReleaseSRWLockExclusive (&wio_runner_lock);
/* Callback hook so the Haskell part of the I/O manager can notify this manager
@@ -280,28 +306,65 @@ void registerNewIOCPHandle (HANDLE port)
void completeSynchronousRequest (void)
- AcquireSRWLockExclusive (&lock);
+ AcquireSRWLockExclusive (&wio_runner_lock);
WakeConditionVariable (&threadIOWait);
- ReleaseSRWLockExclusive (&lock);
+ ReleaseSRWLockExclusive (&wio_runner_lock);
-/* Register a new I/O request that the I/O manager should handle. PORT is the
- completion port handle that the request is associated with, MSSEC is the
- maximum amount of time in milliseconds that an alertable wait should be done
- for before the RTS requested to be notified of progress and NUM_REQ is the
- total overall number of outstanding I/O requests. */
+/* Register outstanding I/O requests that the I/O manager should handle.
+ This function will unblock the runner if it has been blocked in an
+ non-alertable wait. It might end an alertable wait as well but this
+ depends on the exact parameters provided.
+ The haskell side will call this to inform the runner either about new
+ I/O requests or to update the number of outstanding requests after
+ processing a bundle.
+ * has_timeout tells us if the mssec parameter is valid.
+ * MSSEC is the maximum amount of time in milliseconds that an alertable wait
+ should be done for before the haskell side requested to be notified of progress.
+ * NUM_REQ is the total overall number of outstanding I/O requests.
+ * outstanding_service indicates that there might be still a outstanding service
+ request queued and therefore we shouldn't unblock the runner quite yet.
+ `outstanding_service` is needed in case we cancel an IO operation. We don't want this
+ to result in two processRemoteCompletion threads being queued. As this is both harder
+ to reason about and bad for performance. So we only reset outstanding_service_requests
+ if no service is pending.
-void registerAlertableWait (HANDLE port, DWORD mssec, uint64_t num_req)
+ */
+void registerAlertableWait (bool has_timeout, DWORD mssec, uint64_t num_req, bool pending_service)
+ ASSERT(completionPortHandle != INVALID_HANDLE_VALUE);
+ AcquireSRWLockExclusive (&wio_runner_lock);
bool interrupt = false;
- bool wakeup = false;
- AcquireSRWLockExclusive (&lock);
+ outstanding_requests = num_req;
- /* Decide if we may have to wake up the I/O manager. */
- wakeup = outstanding_requests == 0 && num_req > 0;
+ if (outstanding_requests <= 0 && !has_timeout) {
+ timeout = INFINITE;
+ }
+ else if(has_timeout) {
+ timeout = mssec;
+ }
+ outstanding_service_requests = pending_service;
+ //Resize queue if required
+ if (queue_full)
+ {
+ num_callbacks *= 2;
+ = realloc (entries,
+ sizeof (OVERLAPPED_ENTRY) * num_callbacks);
+ if (new)
+ entries = new;
+ queue_full = false;
+ }
outstanding_requests = num_req;
/* If the new timeout is earlier than the old one we have to reschedule the
@@ -313,12 +376,16 @@ void registerAlertableWait (HANDLE port, DWORD mssec, uint64_t num_req)
interrupt = true;
- ReleaseSRWLockExclusive (&lock);
+ ReleaseSRWLockExclusive (&wio_runner_lock);
- if (wakeup)
- WakeConditionVariable (&wakeEvent);
- else if (interrupt)
- PostQueuedCompletionStatus (port, 0, 0, NULL);
+ // Since we call registerAlertableWait only after
+ // processing I/O requests it's always desireable to wake
+ // up the runner here.
+ WakeConditionVariable (&wakeEvent);
+ if (interrupt) {
+ PostQueuedCompletionStatus (completionPortHandle, 0, 0, NULL);
+ }
/* Exported callback function that will be called by the RTS to collect the
@@ -326,10 +393,10 @@ void registerAlertableWait (HANDLE port, DWORD mssec, uint64_t num_req)
number of read entries will be returned in NUM.
NOTE: This function isn't thread safe, but is intended to be called only
- when requested to by the I/O manager via notifyRtsOfFinishedCall. In
+ when requested by the I/O manager via notifyScheduler. In
that context it is thread safe as we're guaranteeing that the I/O
manager is blocked waiting for the read to happen followed by a
- servicedIOEntries call. */
+ registerAlertableWait call. */
OVERLAPPED_ENTRY* getOverlappedEntries (uint32_t *num)
*num = num_last_completed;
@@ -346,76 +413,48 @@ void awaitAsyncRequests (bool wait)
if(queueIOThread()) {
- AcquireSRWLockExclusive (&lock);
+ AcquireSRWLockExclusive (&wio_runner_lock);
/* We don't deal with spurious requests here, that's left up to AwaitEvent.c
because in principle we need to check if the capability work queue is now
not empty but we can't do that here. Also these locks don't guarantee
fairness, as such a request may have completed without us seeing a
timeslice in between. */
if (wait && outstanding_service_requests)
- SleepConditionVariableSRW (&threadIOWait, &lock, INFINITE, 0);
+ SleepConditionVariableSRW (&threadIOWait, &wio_runner_lock, INFINITE, 0);
- ReleaseSRWLockExclusive (&lock);
+ ReleaseSRWLockExclusive (&wio_runner_lock);
-/* Exported function that will be called by the RTS in order to indicate that
- the RTS has successfully finished servicing I/O request returned with
- getOverlappedEntries. Some of the overlapped requests may not have been
- an I/O request so the RTS also returns the amount of REMAINING overlapped
- entried it still expects to be processed. */
-void servicedIOEntries (uint64_t remaining)
- AcquireSRWLockExclusive (&lock);
- outstanding_requests = remaining;
- if (outstanding_requests <= 0)
- timeout = INFINITE;
- outstanding_service_requests = false;
- if (queue_full)
- {
- num_callbacks *= 2;
- = realloc (entries,
- sizeof (OVERLAPPED_ENTRY) * num_callbacks);
- if (new)
- entries = new;
- }
- ReleaseSRWLockExclusive (&lock);
- WakeConditionVariable (&wakeEvent);
/* Sets `canQueueIOThread` to indicate to the scheduler that it should
queue a new haskell thread to process IO events. */
static void notifyScheduler(uint32_t num) {
- AcquireSRWLockExclusive (&lock);
+ AcquireSRWLockExclusive (&wio_runner_lock);
canQueueIOThread = true;
num_notify = num;
- ReleaseSRWLockExclusive (&lock);
+ ReleaseSRWLockExclusive (&wio_runner_lock);
/* Queues a new haskell thread to process IO events
if there is work to do.
- Return true if work was queued.
+ Returns true if a thread/work was queued.
Not already waiting on service requests.
outstanding_service_requests = true
processRemoteCompletion queued.
- IOThread blocked until processRemoteCompletion has run.
+ IO runner thread blocked until processRemoteCompletion has run.
bool queueIOThread()
bool result = false;
#if !defined(THREADED_RTS)
- AcquireSRWLockExclusive (&lock);
+ AcquireSRWLockExclusive (&wio_runner_lock);
@@ -425,24 +464,26 @@ bool queueIOThread()
Capability *cap = &MainCapability;
StgTSO * tso = createStrictIOThread (cap, RtsFlags.GcFlags.initialStkSize,
+ labelThread(cap, tso, "ProcessIOThread");
scheduleThreadNow (cap, tso);
result = true;
- ReleaseSRWLockExclusive (&lock);
+ ReleaseSRWLockExclusive (&wio_runner_lock);
return result;
/* Main thread runner for the non-threaded I/O Manager. */
+static DWORD WINAPI runner (LPVOID lpParam STG_UNUSED)
/* The last event that was sent to the I/O manager. */
HsWord32 lastEvent = 0;
while (running)
- AcquireSRWLockExclusive (&lock);
+ AcquireSRWLockExclusive (&wio_runner_lock);
lastEvent = readIOManagerEvent ();
/* Non-alertable wait. While here we can't server any I/O requests so we
@@ -459,12 +500,15 @@ DWORD WINAPI runner (LPVOID lpParam STG_UNUSED)
|| outstanding_service_requests
|| canQueueIOThread)
- SleepConditionVariableSRW (&wakeEvent, &lock, INFINITE, 0);
+ // fprintf(stderr, "NonAlert sleep:(%x, %i, %i)\n",
+ // lastEvent, outstanding_service_requests, canQueueIOThread);
+ // fflush(stderr);
+ SleepConditionVariableSRW (&wakeEvent, &wio_runner_lock, INFINITE, 0);
HsWord32 nextEvent = readIOManagerEvent ();
lastEvent = nextEvent ? nextEvent : lastEvent;
- ReleaseSRWLockExclusive (&lock);
+ ReleaseSRWLockExclusive (&wio_runner_lock);
ULONG num_removed = -1;
ZeroMemory (entries, sizeof (entries[0]) * num_callbacks);
@@ -472,27 +516,37 @@ DWORD WINAPI runner (LPVOID lpParam STG_UNUSED)
num_callbacks, &num_removed, timeout,
+ //No completions outstanding on the haskell side.
+ //This means all returned bundles must have been
+ // processed already and hence can be ignored.
if (outstanding_requests == 0)
num_removed = 0;
if (num_removed > 0)
queue_full = num_removed == num_callbacks;
- notifyScheduler (num_removed);
else if (WAIT_TIMEOUT == GetLastError ())
num_removed = 0;
- notifyScheduler (num_removed);
- AcquireSRWLockExclusive (&lock);
+ // We always queue a haskell thread upon returning from GetQueuedCompletionStatusEx.
+ // We only wake up if:
+ // * IO was processed, in which case we need to process the events.
+ // * A timer event was registered/timed out. We need the process expired timers
+ // and update the timeout.
+ // * We woke up spuriously, which is quite rare.
+ // This simplifies the logic in exchange for a slim chance of redundant
+ // haskell threads if we wake up spuriously.
+ notifyScheduler (num_removed);
+ AcquireSRWLockExclusive (&wio_runner_lock);
if (!running)
ExitThread (0);
- ReleaseSRWLockExclusive (&lock);
+ ReleaseSRWLockExclusive (&wio_runner_lock);
return 0;
diff --git a/rts/win32/AsyncWinIO.h b/rts/win32/AsyncWinIO.h
index ac5413f8aa..8eefee5ab6 100644
--- a/rts/win32/AsyncWinIO.h
+++ b/rts/win32/AsyncWinIO.h
@@ -18,9 +18,8 @@ extern bool startupAsyncWinIO(void);
extern void shutdownAsyncWinIO(bool wait_threads);
extern void awaitAsyncRequests(bool wait);
extern void registerNewIOCPHandle (HANDLE port);
-extern void registerAlertableWait (HANDLE port, DWORD mssec, uint64_t num_req);
+extern void registerAlertableWait (bool has_timeout, DWORD mssec, uint64_t num_req, bool service_pending);
extern OVERLAPPED_ENTRY* getOverlappedEntries (uint32_t *num);
-extern void servicedIOEntries (uint64_t remaining);
extern void completeSynchronousRequest (void);
extern bool queueIOThread(void);