diff options
author | Andreas Voellmy <andreas.voellmy@gmail.com> | 2012-12-22 20:56:02 -0500 |
---|---|---|
committer | Johan Tibell <johan.tibell@gmail.com> | 2013-02-11 21:38:06 -0800 |
commit | 12f3fef5ec52c1ec0958b674adcd981f48048428 (patch) | |
tree | 0848a1a1ddb84b0fb5d0224f8c8bb97c5c587a46 /libraries | |
parent | adebaa42dbe3b09e74c864981b6fc8684b6d5086 (diff) | |
download | haskell-12f3fef5ec52c1ec0958b674adcd981f48048428.tar.gz |
Parallel IO manager supports increasing and decreasing number of capabilities.
We never deallocate the backend files (e.g. epoll instance, eventfd files) when decreasing number of capabilities. Nor do we exit the poll loop for that instance. However, that thread will naturally empty its queue over time and eventually stay out in a foreign call indefinitely. There is a remote possibility that a Haskell thread got a reference to an IO manager just before the number of capabilities was decreased and then this thread finally registers a callback at some time far in the future. This scenario is the motivation for leaving the backend instance and thread servicing that instance alive.
The main change is now in adding new capabilities. Since those capabilites may have been active in the past, we may already have backend files available for use. We signal to the old thread servicing that backend to release the backend and exit. Upon exiting it fills an MVar. We start a new thread bound to the new capability and it waits to enter the poll loop until the MVar is full.
Diffstat (limited to 'libraries')
-rw-r--r-- | libraries/base/GHC/Event/Manager.hs | 43 | ||||
-rw-r--r-- | libraries/base/GHC/Event/Thread.hs | 84 |
2 files changed, 96 insertions, 31 deletions
diff --git a/libraries/base/GHC/Event/Manager.hs b/libraries/base/GHC/Event/Manager.hs index cb0ab02531..267d09fd20 100644 --- a/libraries/base/GHC/Event/Manager.hs +++ b/libraries/base/GHC/Event/Manager.hs @@ -7,7 +7,6 @@ , TypeSynonymInstances , FlexibleInstances #-} - module GHC.Event.Manager ( -- * Types EventManager @@ -22,6 +21,7 @@ module GHC.Event.Manager , loop , step , shutdown + , release , cleanup , wakeManager @@ -48,8 +48,9 @@ module GHC.Event.Manager ------------------------------------------------------------------------ -- Imports -import Control.Concurrent.MVar (MVar, modifyMVar, newMVar, readMVar) -import Control.Exception (finally) +import Control.Concurrent.MVar (MVar, modifyMVar, newMVar, readMVar, putMVar, + tryPutMVar, takeMVar) +import Control.Exception (onException) import Control.Monad ((=<<), forM_, liftM, sequence_, when, replicateM, void) import Data.IORef (IORef, atomicModifyIORef, mkWeakIORef, newIORef, readIORef, writeIORef) @@ -104,6 +105,7 @@ type IOCallback = FdKey -> Event -> IO () data State = Created | Running | Dying + | Releasing | Finished deriving (Eq, Show) @@ -115,6 +117,7 @@ data EventManager = EventManager , emUniqueSource :: {-# UNPACK #-} !UniqueSource , emControl :: {-# UNPACK #-} !Control , emOneShot :: !Bool + , emLock :: MVar () } callbackArraySize :: Int @@ -165,12 +168,14 @@ newWith oneShot be = do when (st /= Finished) $ do I.delete be closeControl ctrl + lockVar <- newMVar () let mgr = EventManager { emBackend = be , emFds = iofds , emState = state , emUniqueSource = us , emControl = ctrl , emOneShot = oneShot + , emLock = lockVar } registerControlFd mgr (controlReadFd ctrl) evtRead registerControlFd mgr (wakeupReadFd ctrl) evtRead @@ -185,12 +190,20 @@ shutdown mgr = do state <- atomicModifyIORef (emState mgr) $ \s -> (Dying, s) when (state == Running) $ sendDie (emControl mgr) +-- | Asynchronously tell the thread executing the event +-- manager loop to exit. +release :: EventManager -> IO () +release EventManager{..} = do + state <- atomicModifyIORef emState $ \s -> (Releasing, s) + when (state == Running) $ sendWakeup emControl + finished :: EventManager -> IO Bool finished mgr = (== Finished) `liftM` readIORef (emState mgr) cleanup :: EventManager -> IO () cleanup EventManager{..} = do writeIORef emState Finished + void $ tryPutMVar emLock () I.delete emBackend closeControl emControl @@ -204,24 +217,30 @@ cleanup EventManager{..} = do -- closes all of its control resources when it finishes. loop :: EventManager -> IO () loop mgr@EventManager{..} = do + void $ takeMVar emLock state <- atomicModifyIORef emState $ \s -> case s of Created -> (Running, s) + Releasing -> (Running, s) _ -> (s, s) case state of - Created -> go `finally` cleanup mgr - Dying -> cleanup mgr - _ -> do cleanup mgr - error $ "GHC.Event.Manager.loop: state is already " ++ - show state + Created -> go `onException` cleanup mgr + Releasing -> go `onException` cleanup mgr + Dying -> cleanup mgr + _ -> do cleanup mgr + error $ "GHC.Event.Manager.loop: state is already " ++ + show state where - go = do running <- step mgr - when running (yield >> go) + go = do state <- step mgr + case state of + Running -> yield >> go + Releasing -> putMVar emLock () + _ -> cleanup mgr -step :: EventManager -> IO Bool +step :: EventManager -> IO State step mgr@EventManager{..} = do waitForIO state <- readIORef emState - state `seq` return (state == Running) + state `seq` return state where waitForIO = do n1 <- I.poll emBackend Nothing (onFdEvent mgr) diff --git a/libraries/base/GHC/Event/Thread.hs b/libraries/base/GHC/Event/Thread.hs index 85a037230b..b2aef8ab7e 100644 --- a/libraries/base/GHC/Event/Thread.hs +++ b/libraries/base/GHC/Event/Thread.hs @@ -1,6 +1,5 @@ {-# LANGUAGE Trustworthy #-} {-# LANGUAGE BangPatterns, ForeignFunctionInterface, NoImplicitPrelude #-} - module GHC.Event.Thread ( getSystemEventManager , getSystemTimerManager @@ -16,7 +15,7 @@ module GHC.Event.Thread ) where import Control.Exception (finally) -import Control.Monad (forM, forM_, zipWithM_) +import Control.Monad (forM, forM_, zipWithM_, when) import Data.IORef (IORef, newIORef, readIORef, writeIORef) import Data.Maybe (Maybe(..)) import Data.Tuple (snd) @@ -38,7 +37,7 @@ import GHC.Event.Manager (Event, EventManager, evtRead, evtWrite, loop, import qualified GHC.Event.IntMap as IM import qualified GHC.Event.Manager as M import qualified GHC.Event.TimerManager as TM -import GHC.Num ((-)) +import GHC.Num ((-), (+)) import System.IO.Unsafe (unsafePerformIO) import System.Posix.Types (Fd) @@ -181,9 +180,15 @@ eventManager = unsafePerformIO $ do sharedCAF em getOrSetSystemEventThreadEventManagerStore {-# NOINLINE eventManager #-} +numEnabledEventManagers :: IORef Int +numEnabledEventManagers = unsafePerformIO $ do + newIORef 0 + foreign import ccall unsafe "getOrSetSystemEventThreadIOManagerThreadStore" getOrSetSystemEventThreadIOManagerThreadStore :: Ptr a -> IO (Ptr a) +-- | The ioManagerLock protects the 'eventManager' value: +-- Only one thread at a time can start or shutdown event managers. {-# NOINLINE ioManagerLock #-} ioManagerLock :: MVar () ioManagerLock = unsafePerformIO $ do @@ -221,12 +226,20 @@ ensureIOManagerIsRunning startTimerManagerThread startIOManagerThreads :: IO () -startIOManagerThreads = do - eventManagerArray <- readIORef eventManager - let (low, high) = boundsIOArray eventManagerArray - withMVar ioManagerLock $ \_ -> - forM_ [low..high] (startIOManagerThread eventManagerArray) - +startIOManagerThreads = + withMVar ioManagerLock $ \_ -> do + eventManagerArray <- readIORef eventManager + let (_, high) = boundsIOArray eventManagerArray + forM_ [0..high] (startIOManagerThread eventManagerArray) + writeIORef numEnabledEventManagers (high+1) + +restartPollLoop :: EventManager -> Int -> IO ThreadId +restartPollLoop mgr i = do + M.release mgr + !t <- forkOn i $ loop mgr + labelThread t "IOManager" + return t + startIOManagerThread :: IOArray Int (Maybe (ThreadId, EventManager)) -> Int -> IO () @@ -238,7 +251,7 @@ startIOManagerThread eventManagerArray i = do writeIOArray eventManagerArray i (Just (t,mgr)) old <- readIOArray eventManagerArray i case old of - Nothing -> create + Nothing -> create Just (t,em) -> do s <- threadStatus t case s of @@ -251,7 +264,11 @@ startIOManagerThread eventManagerArray i = do -- See #4449 M.cleanup em create - _other -> return () + _other -> do + -- Another thread is currently servicing the manager loop. + -- Tell it to exit and start a new thread to work on the loop. + M.release em + create startTimerManagerThread :: IO () startTimerManagerThread = modifyMVar_ timerManagerThreadVar $ \old -> do @@ -282,15 +299,44 @@ startTimerManagerThread = modifyMVar_ timerManagerThreadVar $ \old -> do shutdownManagers :: IO () shutdownManagers = - do eventManagerArray <- readIORef eventManager - let (low, high) = boundsIOArray eventManagerArray - forM_ [low..high] $ \i -> do - mmgr <- readIOArray eventManagerArray i - case mmgr of - Nothing -> return () - Just (_,mgr) -> M.shutdown mgr + withMVar ioManagerLock $ \_ -> do + eventManagerArray <- readIORef eventManager + let (_, high) = boundsIOArray eventManagerArray + forM_ [0..high] $ \i -> do + mmgr <- readIOArray eventManagerArray i + case mmgr of + Nothing -> return () + Just (_,mgr) -> M.shutdown mgr foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool ioManagerCapabilitiesChanged :: Int -> IO () -ioManagerCapabilitiesChanged n = return () +ioManagerCapabilitiesChanged new_n_caps = do + withMVar ioManagerLock $ \_ -> do + numEnabled <- readIORef numEnabledEventManagers + writeIORef numEnabledEventManagers new_n_caps + eventManagerArray <- readIORef eventManager + let (_, high) = boundsIOArray eventManagerArray + let old_n_caps = high + 1 + if new_n_caps > old_n_caps + then do new_eventManagerArray <- newIOArray (0, new_n_caps - 1) Nothing + + -- copy the existing values into the new array: + forM_ [0..high] $ \i -> do + Just (tid,mgr) <- readIOArray eventManagerArray i + if i < numEnabled - 1 + then writeIOArray new_eventManagerArray i (Just (tid,mgr)) + else do tid' <- restartPollLoop mgr i + writeIOArray new_eventManagerArray i (Just (tid',mgr)) + + -- create new IO managers for the new caps: + forM_ [old_n_caps..new_n_caps-1] $ + startIOManagerThread eventManagerArray + + -- update the event manager array reference: + writeIORef eventManager new_eventManagerArray + else when (new_n_caps > numEnabled) $ + forM_ [numEnabled..new_n_caps-1] $ \i -> do + Just (_,mgr) <- readIOArray eventManagerArray i + tid <- restartPollLoop mgr i + writeIOArray eventManagerArray i (Just (tid,mgr)) |