diff options
-rw-r--r-- | libraries/base/GHC/Event/Thread.hs | 94 | ||||
-rw-r--r-- | testsuite/tests/concurrent/should_run/all.T | 1 |
2 files changed, 75 insertions, 20 deletions
diff --git a/libraries/base/GHC/Event/Thread.hs b/libraries/base/GHC/Event/Thread.hs index a330225622..cf9a769766 100644 --- a/libraries/base/GHC/Event/Thread.hs +++ b/libraries/base/GHC/Event/Thread.hs @@ -18,7 +18,7 @@ module GHC.Event.Thread -- TODO: Use new Windows I/O manager import Control.Exception (finally, SomeException, toException) import Data.Foldable (forM_, mapM_, sequence_) -import Data.IORef (IORef, newIORef, readIORef, writeIORef) +import Data.IORef (IORef, newIORef, readIORef, writeIORef, atomicWriteIORef) import Data.Maybe (fromMaybe) import Data.Tuple (snd) import Foreign.C.Error (eBADF, errnoToIOError) @@ -29,7 +29,8 @@ import GHC.List (zipWith, zipWith3) import GHC.Conc.Sync (TVar, ThreadId, ThreadStatus(..), atomically, forkIO, labelThread, modifyMVar_, withMVar, newTVar, sharedCAF, getNumCapabilities, threadCapability, myThreadId, forkOn, - threadStatus, writeTVar, newTVarIO, readTVar, retry,throwSTM,STM) + threadStatus, writeTVar, newTVarIO, readTVar, retry, + throwSTM, STM, yield) import GHC.IO (mask_, uninterruptibleMask_, onException) import GHC.IO.Exception (ioError) import GHC.IOArray (IOArray, newIOArray, readIOArray, writeIOArray, @@ -41,6 +42,7 @@ import GHC.Event.Manager (Event, EventManager, evtRead, evtWrite, loop, new, registerFd, unregisterFd_) import qualified GHC.Event.Manager as M import qualified GHC.Event.TimerManager as TM +import GHC.Ix (inRange) import GHC.Num ((-), (+)) import GHC.Real (fromIntegral) import GHC.Show (showSignedInt) @@ -98,22 +100,44 @@ threadWaitWrite = threadWait evtWrite closeFdWith :: (Fd -> IO ()) -- ^ Action that performs the close. -> Fd -- ^ File descriptor to close. -> IO () -closeFdWith close fd = do - eventManagerArray <- readIORef eventManager - let (low, high) = boundsIOArray eventManagerArray - mgrs <- flip mapM [low..high] $ \i -> do - Just (_,!mgr) <- readIOArray eventManagerArray i - return mgr - -- 'takeMVar', and 'M.closeFd_' might block, although for a very short time. - -- To make 'closeFdWith' safe in presence of asynchronous exceptions we have - -- to use uninterruptible mask. - uninterruptibleMask_ $ do - tables <- flip mapM mgrs $ \mgr -> takeMVar $ M.callbackTableVar mgr fd - cbApps <- zipWithM (\mgr table -> M.closeFd_ mgr table fd) mgrs tables - close fd `finally` sequence_ (zipWith3 finish mgrs tables cbApps) +closeFdWith close fd = close_loop where finish mgr table cbApp = putMVar (M.callbackTableVar mgr fd) table >> cbApp zipWithM f xs ys = sequence (zipWith f xs ys) + -- The array inside 'eventManager' can be swapped out at any time, see + -- 'ioManagerCapabilitiesChanged'. See #21651. We detect this case by + -- checking the array bounds before and after. When such a swap has + -- happened we cleanup and try again + close_loop = do + eventManagerArray <- readIORef eventManager + let ema_bounds@(low, high) = boundsIOArray eventManagerArray + mgrs <- flip mapM [low..high] $ \i -> do + Just (_,!mgr) <- readIOArray eventManagerArray i + return mgr + + -- 'takeMVar', and 'M.closeFd_' might block, although for a very short time. + -- To make 'closeFdWith' safe in presence of asynchronous exceptions we have + -- to use uninterruptible mask. + join $ uninterruptibleMask_ $ do + tables <- flip mapM mgrs $ \mgr -> takeMVar $ M.callbackTableVar mgr fd + new_ema_bounds <- boundsIOArray `fmap` readIORef eventManager + -- Here we exploit Note [The eventManager Array] + if new_ema_bounds /= ema_bounds + then do + -- the array has been modified. + -- mgrs still holds the right EventManagers, by the Note. + -- new_ema_bounds must be larger than ema_bounds, by the note. + -- return the MVars we took and try again + sequence_ $ zipWith (\mgr table -> finish mgr table (pure ())) mgrs tables + pure close_loop + else do + -- We surely have taken all the appropriate MVars. Even if the array + -- has been swapped, our mgrs is still correct. + -- Remove the Fd from all callback tables, close the Fd, and run all + -- callbacks. + cbApps <- zipWithM (\mgr table -> M.closeFd_ mgr table fd) mgrs tables + close fd `finally` sequence_ (zipWith3 finish mgrs tables cbApps) + pure (pure ()) threadWait :: Event -> Fd -> IO () threadWait evt fd = mask_ $ do @@ -177,10 +201,24 @@ threadWaitWriteSTM = threadWaitSTM evtWrite getSystemEventManager :: IO (Maybe EventManager) getSystemEventManager = do t <- myThreadId - (cap, _) <- threadCapability t eventManagerArray <- readIORef eventManager - mmgr <- readIOArray eventManagerArray cap - return $ fmap snd mmgr + let r = boundsIOArray eventManagerArray + (cap, _) <- threadCapability t + -- It is possible that we've just increased the number of capabilities and the + -- new EventManager has not yet been constructed by + -- 'ioManagerCapabilitiesChanged'. We expect this to happen very rarely. + -- T21561 exercises this. + -- Two options to proceed: + -- 1) return the EventManager for capability 0. This is guaranteed to exist, + -- and "shouldn't" cause any correctness issues. + -- 2) Busy wait, with or without a call to 'yield'. This can't deadlock, + -- because we must be on a brand capability and there must be a call to + -- 'ioManagerCapabilitiesChanged' pending. + -- + -- We take the second option, with the yield, judging it the most robust. + if not (inRange r cap) + then yield >> getSystemEventManager + else fmap snd `fmap` readIOArray eventManagerArray cap getSystemEventManager_ :: IO EventManager getSystemEventManager_ = do @@ -191,6 +229,22 @@ getSystemEventManager_ = do foreign import ccall unsafe "getOrSetSystemEventThreadEventManagerStore" getOrSetSystemEventThreadEventManagerStore :: Ptr a -> IO (Ptr a) +-- Note [The eventManager Array] +-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +-- A mutable array holding the current EventManager for each capability +-- An entry is Nothing only while the eventmanagers are initialised, see +-- 'startIOManagerThread' and 'ioManagerCapabilitiesChanged'. +-- The 'ThreadId' at array position 'cap' will have been 'forkOn'ed capabality +-- 'cap'. +-- The array will be swapped with newer arrays when the number of capabilities +-- changes(via 'setNumCapabilities'). However: +-- * the size of the arrays will never decrease; and +-- * The 'EventManager's in the array are not replaced with other +-- 'EventManager' constructors. +-- +-- This is a similar strategy as the rts uses for it's +-- capabilities array (n_capabilities is the size of the array, +-- enabled_capabilities' is the number of active capabilities). eventManager :: IORef (IOArray Int (Maybe (ThreadId, EventManager))) eventManager = unsafePerformIO $ do numCaps <- getNumCapabilities @@ -351,7 +405,9 @@ ioManagerCapabilitiesChanged = startIOManagerThread new_eventManagerArray -- update the event manager array reference: - writeIORef eventManager new_eventManagerArray + atomicWriteIORef eventManager new_eventManagerArray + -- We need an atomic write here because 'eventManager' is accessed + -- unsynchronized in 'getSystemEventManager' and 'closeFdWith' else when (new_n_caps > numEnabled) $ forM_ [numEnabled..new_n_caps-1] $ \i -> do Just (_,mgr) <- readIOArray eventManagerArray i diff --git a/testsuite/tests/concurrent/should_run/all.T b/testsuite/tests/concurrent/should_run/all.T index 11cecb3d14..6bfeab4410 100644 --- a/testsuite/tests/concurrent/should_run/all.T +++ b/testsuite/tests/concurrent/should_run/all.T @@ -229,7 +229,6 @@ test('T21651', when(opsys('mingw32'),skip), # uses POSIX pipes when(opsys('darwin'),extra_run_opts('8 12 2000 100')), unless(opsys('darwin'),extra_run_opts('8 12 2000 200')), # darwin runners complain of too many open files - expect_broken('21651'), req_smp ], compile_and_run, ['']) |