summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--libraries/base/GHC/Event/Thread.hs94
-rw-r--r--testsuite/tests/concurrent/should_run/all.T1
2 files changed, 75 insertions, 20 deletions
diff --git a/libraries/base/GHC/Event/Thread.hs b/libraries/base/GHC/Event/Thread.hs
index a330225622..cf9a769766 100644
--- a/libraries/base/GHC/Event/Thread.hs
+++ b/libraries/base/GHC/Event/Thread.hs
@@ -18,7 +18,7 @@ module GHC.Event.Thread
-- TODO: Use new Windows I/O manager
import Control.Exception (finally, SomeException, toException)
import Data.Foldable (forM_, mapM_, sequence_)
-import Data.IORef (IORef, newIORef, readIORef, writeIORef)
+import Data.IORef (IORef, newIORef, readIORef, writeIORef, atomicWriteIORef)
import Data.Maybe (fromMaybe)
import Data.Tuple (snd)
import Foreign.C.Error (eBADF, errnoToIOError)
@@ -29,7 +29,8 @@ import GHC.List (zipWith, zipWith3)
import GHC.Conc.Sync (TVar, ThreadId, ThreadStatus(..), atomically, forkIO,
labelThread, modifyMVar_, withMVar, newTVar, sharedCAF,
getNumCapabilities, threadCapability, myThreadId, forkOn,
- threadStatus, writeTVar, newTVarIO, readTVar, retry,throwSTM,STM)
+ threadStatus, writeTVar, newTVarIO, readTVar, retry,
+ throwSTM, STM, yield)
import GHC.IO (mask_, uninterruptibleMask_, onException)
import GHC.IO.Exception (ioError)
import GHC.IOArray (IOArray, newIOArray, readIOArray, writeIOArray,
@@ -41,6 +42,7 @@ import GHC.Event.Manager (Event, EventManager, evtRead, evtWrite, loop,
new, registerFd, unregisterFd_)
import qualified GHC.Event.Manager as M
import qualified GHC.Event.TimerManager as TM
+import GHC.Ix (inRange)
import GHC.Num ((-), (+))
import GHC.Real (fromIntegral)
import GHC.Show (showSignedInt)
@@ -98,22 +100,44 @@ threadWaitWrite = threadWait evtWrite
closeFdWith :: (Fd -> IO ()) -- ^ Action that performs the close.
-> Fd -- ^ File descriptor to close.
-> IO ()
-closeFdWith close fd = do
- eventManagerArray <- readIORef eventManager
- let (low, high) = boundsIOArray eventManagerArray
- mgrs <- flip mapM [low..high] $ \i -> do
- Just (_,!mgr) <- readIOArray eventManagerArray i
- return mgr
- -- 'takeMVar', and 'M.closeFd_' might block, although for a very short time.
- -- To make 'closeFdWith' safe in presence of asynchronous exceptions we have
- -- to use uninterruptible mask.
- uninterruptibleMask_ $ do
- tables <- flip mapM mgrs $ \mgr -> takeMVar $ M.callbackTableVar mgr fd
- cbApps <- zipWithM (\mgr table -> M.closeFd_ mgr table fd) mgrs tables
- close fd `finally` sequence_ (zipWith3 finish mgrs tables cbApps)
+closeFdWith close fd = close_loop
where
finish mgr table cbApp = putMVar (M.callbackTableVar mgr fd) table >> cbApp
zipWithM f xs ys = sequence (zipWith f xs ys)
+ -- The array inside 'eventManager' can be swapped out at any time, see
+ -- 'ioManagerCapabilitiesChanged'. See #21651. We detect this case by
+ -- checking the array bounds before and after. When such a swap has
+ -- happened we cleanup and try again
+ close_loop = do
+ eventManagerArray <- readIORef eventManager
+ let ema_bounds@(low, high) = boundsIOArray eventManagerArray
+ mgrs <- flip mapM [low..high] $ \i -> do
+ Just (_,!mgr) <- readIOArray eventManagerArray i
+ return mgr
+
+ -- 'takeMVar', and 'M.closeFd_' might block, although for a very short time.
+ -- To make 'closeFdWith' safe in presence of asynchronous exceptions we have
+ -- to use uninterruptible mask.
+ join $ uninterruptibleMask_ $ do
+ tables <- flip mapM mgrs $ \mgr -> takeMVar $ M.callbackTableVar mgr fd
+ new_ema_bounds <- boundsIOArray `fmap` readIORef eventManager
+ -- Here we exploit Note [The eventManager Array]
+ if new_ema_bounds /= ema_bounds
+ then do
+ -- the array has been modified.
+ -- mgrs still holds the right EventManagers, by the Note.
+ -- new_ema_bounds must be larger than ema_bounds, by the note.
+ -- return the MVars we took and try again
+ sequence_ $ zipWith (\mgr table -> finish mgr table (pure ())) mgrs tables
+ pure close_loop
+ else do
+ -- We surely have taken all the appropriate MVars. Even if the array
+ -- has been swapped, our mgrs is still correct.
+ -- Remove the Fd from all callback tables, close the Fd, and run all
+ -- callbacks.
+ cbApps <- zipWithM (\mgr table -> M.closeFd_ mgr table fd) mgrs tables
+ close fd `finally` sequence_ (zipWith3 finish mgrs tables cbApps)
+ pure (pure ())
threadWait :: Event -> Fd -> IO ()
threadWait evt fd = mask_ $ do
@@ -177,10 +201,24 @@ threadWaitWriteSTM = threadWaitSTM evtWrite
getSystemEventManager :: IO (Maybe EventManager)
getSystemEventManager = do
t <- myThreadId
- (cap, _) <- threadCapability t
eventManagerArray <- readIORef eventManager
- mmgr <- readIOArray eventManagerArray cap
- return $ fmap snd mmgr
+ let r = boundsIOArray eventManagerArray
+ (cap, _) <- threadCapability t
+ -- It is possible that we've just increased the number of capabilities and the
+ -- new EventManager has not yet been constructed by
+ -- 'ioManagerCapabilitiesChanged'. We expect this to happen very rarely.
+ -- T21561 exercises this.
+ -- Two options to proceed:
+ -- 1) return the EventManager for capability 0. This is guaranteed to exist,
+ -- and "shouldn't" cause any correctness issues.
+ -- 2) Busy wait, with or without a call to 'yield'. This can't deadlock,
+ -- because we must be on a brand capability and there must be a call to
+ -- 'ioManagerCapabilitiesChanged' pending.
+ --
+ -- We take the second option, with the yield, judging it the most robust.
+ if not (inRange r cap)
+ then yield >> getSystemEventManager
+ else fmap snd `fmap` readIOArray eventManagerArray cap
getSystemEventManager_ :: IO EventManager
getSystemEventManager_ = do
@@ -191,6 +229,22 @@ getSystemEventManager_ = do
foreign import ccall unsafe "getOrSetSystemEventThreadEventManagerStore"
getOrSetSystemEventThreadEventManagerStore :: Ptr a -> IO (Ptr a)
+-- Note [The eventManager Array]
+-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+-- A mutable array holding the current EventManager for each capability
+-- An entry is Nothing only while the eventmanagers are initialised, see
+-- 'startIOManagerThread' and 'ioManagerCapabilitiesChanged'.
+-- The 'ThreadId' at array position 'cap' will have been 'forkOn'ed capabality
+-- 'cap'.
+-- The array will be swapped with newer arrays when the number of capabilities
+-- changes(via 'setNumCapabilities'). However:
+-- * the size of the arrays will never decrease; and
+-- * The 'EventManager's in the array are not replaced with other
+-- 'EventManager' constructors.
+--
+-- This is a similar strategy as the rts uses for it's
+-- capabilities array (n_capabilities is the size of the array,
+-- enabled_capabilities' is the number of active capabilities).
eventManager :: IORef (IOArray Int (Maybe (ThreadId, EventManager)))
eventManager = unsafePerformIO $ do
numCaps <- getNumCapabilities
@@ -351,7 +405,9 @@ ioManagerCapabilitiesChanged =
startIOManagerThread new_eventManagerArray
-- update the event manager array reference:
- writeIORef eventManager new_eventManagerArray
+ atomicWriteIORef eventManager new_eventManagerArray
+ -- We need an atomic write here because 'eventManager' is accessed
+ -- unsynchronized in 'getSystemEventManager' and 'closeFdWith'
else when (new_n_caps > numEnabled) $
forM_ [numEnabled..new_n_caps-1] $ \i -> do
Just (_,mgr) <- readIOArray eventManagerArray i
diff --git a/testsuite/tests/concurrent/should_run/all.T b/testsuite/tests/concurrent/should_run/all.T
index 11cecb3d14..6bfeab4410 100644
--- a/testsuite/tests/concurrent/should_run/all.T
+++ b/testsuite/tests/concurrent/should_run/all.T
@@ -229,7 +229,6 @@ test('T21651',
when(opsys('mingw32'),skip), # uses POSIX pipes
when(opsys('darwin'),extra_run_opts('8 12 2000 100')),
unless(opsys('darwin'),extra_run_opts('8 12 2000 200')), # darwin runners complain of too many open files
- expect_broken('21651'),
req_smp ],
compile_and_run, [''])