summaryrefslogtreecommitdiff
path: root/libraries
diff options
context:
space:
mode:
authorAndreas Voellmy <andreas.voellmy@gmail.com>2012-12-22 20:56:02 -0500
committerJohan Tibell <johan.tibell@gmail.com>2013-02-11 21:38:06 -0800
commit12f3fef5ec52c1ec0958b674adcd981f48048428 (patch)
tree0848a1a1ddb84b0fb5d0224f8c8bb97c5c587a46 /libraries
parentadebaa42dbe3b09e74c864981b6fc8684b6d5086 (diff)
downloadhaskell-12f3fef5ec52c1ec0958b674adcd981f48048428.tar.gz
Parallel IO manager supports increasing and decreasing number of capabilities.
We never deallocate the backend files (e.g. epoll instance, eventfd files) when decreasing number of capabilities. Nor do we exit the poll loop for that instance. However, that thread will naturally empty its queue over time and eventually stay out in a foreign call indefinitely. There is a remote possibility that a Haskell thread got a reference to an IO manager just before the number of capabilities was decreased and then this thread finally registers a callback at some time far in the future. This scenario is the motivation for leaving the backend instance and thread servicing that instance alive. The main change is now in adding new capabilities. Since those capabilites may have been active in the past, we may already have backend files available for use. We signal to the old thread servicing that backend to release the backend and exit. Upon exiting it fills an MVar. We start a new thread bound to the new capability and it waits to enter the poll loop until the MVar is full.
Diffstat (limited to 'libraries')
-rw-r--r--libraries/base/GHC/Event/Manager.hs43
-rw-r--r--libraries/base/GHC/Event/Thread.hs84
2 files changed, 96 insertions, 31 deletions
diff --git a/libraries/base/GHC/Event/Manager.hs b/libraries/base/GHC/Event/Manager.hs
index cb0ab02531..267d09fd20 100644
--- a/libraries/base/GHC/Event/Manager.hs
+++ b/libraries/base/GHC/Event/Manager.hs
@@ -7,7 +7,6 @@
, TypeSynonymInstances
, FlexibleInstances
#-}
-
module GHC.Event.Manager
( -- * Types
EventManager
@@ -22,6 +21,7 @@ module GHC.Event.Manager
, loop
, step
, shutdown
+ , release
, cleanup
, wakeManager
@@ -48,8 +48,9 @@ module GHC.Event.Manager
------------------------------------------------------------------------
-- Imports
-import Control.Concurrent.MVar (MVar, modifyMVar, newMVar, readMVar)
-import Control.Exception (finally)
+import Control.Concurrent.MVar (MVar, modifyMVar, newMVar, readMVar, putMVar,
+ tryPutMVar, takeMVar)
+import Control.Exception (onException)
import Control.Monad ((=<<), forM_, liftM, sequence_, when, replicateM, void)
import Data.IORef (IORef, atomicModifyIORef, mkWeakIORef, newIORef, readIORef,
writeIORef)
@@ -104,6 +105,7 @@ type IOCallback = FdKey -> Event -> IO ()
data State = Created
| Running
| Dying
+ | Releasing
| Finished
deriving (Eq, Show)
@@ -115,6 +117,7 @@ data EventManager = EventManager
, emUniqueSource :: {-# UNPACK #-} !UniqueSource
, emControl :: {-# UNPACK #-} !Control
, emOneShot :: !Bool
+ , emLock :: MVar ()
}
callbackArraySize :: Int
@@ -165,12 +168,14 @@ newWith oneShot be = do
when (st /= Finished) $ do
I.delete be
closeControl ctrl
+ lockVar <- newMVar ()
let mgr = EventManager { emBackend = be
, emFds = iofds
, emState = state
, emUniqueSource = us
, emControl = ctrl
, emOneShot = oneShot
+ , emLock = lockVar
}
registerControlFd mgr (controlReadFd ctrl) evtRead
registerControlFd mgr (wakeupReadFd ctrl) evtRead
@@ -185,12 +190,20 @@ shutdown mgr = do
state <- atomicModifyIORef (emState mgr) $ \s -> (Dying, s)
when (state == Running) $ sendDie (emControl mgr)
+-- | Asynchronously tell the thread executing the event
+-- manager loop to exit.
+release :: EventManager -> IO ()
+release EventManager{..} = do
+ state <- atomicModifyIORef emState $ \s -> (Releasing, s)
+ when (state == Running) $ sendWakeup emControl
+
finished :: EventManager -> IO Bool
finished mgr = (== Finished) `liftM` readIORef (emState mgr)
cleanup :: EventManager -> IO ()
cleanup EventManager{..} = do
writeIORef emState Finished
+ void $ tryPutMVar emLock ()
I.delete emBackend
closeControl emControl
@@ -204,24 +217,30 @@ cleanup EventManager{..} = do
-- closes all of its control resources when it finishes.
loop :: EventManager -> IO ()
loop mgr@EventManager{..} = do
+ void $ takeMVar emLock
state <- atomicModifyIORef emState $ \s -> case s of
Created -> (Running, s)
+ Releasing -> (Running, s)
_ -> (s, s)
case state of
- Created -> go `finally` cleanup mgr
- Dying -> cleanup mgr
- _ -> do cleanup mgr
- error $ "GHC.Event.Manager.loop: state is already " ++
- show state
+ Created -> go `onException` cleanup mgr
+ Releasing -> go `onException` cleanup mgr
+ Dying -> cleanup mgr
+ _ -> do cleanup mgr
+ error $ "GHC.Event.Manager.loop: state is already " ++
+ show state
where
- go = do running <- step mgr
- when running (yield >> go)
+ go = do state <- step mgr
+ case state of
+ Running -> yield >> go
+ Releasing -> putMVar emLock ()
+ _ -> cleanup mgr
-step :: EventManager -> IO Bool
+step :: EventManager -> IO State
step mgr@EventManager{..} = do
waitForIO
state <- readIORef emState
- state `seq` return (state == Running)
+ state `seq` return state
where
waitForIO = do
n1 <- I.poll emBackend Nothing (onFdEvent mgr)
diff --git a/libraries/base/GHC/Event/Thread.hs b/libraries/base/GHC/Event/Thread.hs
index 85a037230b..b2aef8ab7e 100644
--- a/libraries/base/GHC/Event/Thread.hs
+++ b/libraries/base/GHC/Event/Thread.hs
@@ -1,6 +1,5 @@
{-# LANGUAGE Trustworthy #-}
{-# LANGUAGE BangPatterns, ForeignFunctionInterface, NoImplicitPrelude #-}
-
module GHC.Event.Thread
( getSystemEventManager
, getSystemTimerManager
@@ -16,7 +15,7 @@ module GHC.Event.Thread
) where
import Control.Exception (finally)
-import Control.Monad (forM, forM_, zipWithM_)
+import Control.Monad (forM, forM_, zipWithM_, when)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Maybe (Maybe(..))
import Data.Tuple (snd)
@@ -38,7 +37,7 @@ import GHC.Event.Manager (Event, EventManager, evtRead, evtWrite, loop,
import qualified GHC.Event.IntMap as IM
import qualified GHC.Event.Manager as M
import qualified GHC.Event.TimerManager as TM
-import GHC.Num ((-))
+import GHC.Num ((-), (+))
import System.IO.Unsafe (unsafePerformIO)
import System.Posix.Types (Fd)
@@ -181,9 +180,15 @@ eventManager = unsafePerformIO $ do
sharedCAF em getOrSetSystemEventThreadEventManagerStore
{-# NOINLINE eventManager #-}
+numEnabledEventManagers :: IORef Int
+numEnabledEventManagers = unsafePerformIO $ do
+ newIORef 0
+
foreign import ccall unsafe "getOrSetSystemEventThreadIOManagerThreadStore"
getOrSetSystemEventThreadIOManagerThreadStore :: Ptr a -> IO (Ptr a)
+-- | The ioManagerLock protects the 'eventManager' value:
+-- Only one thread at a time can start or shutdown event managers.
{-# NOINLINE ioManagerLock #-}
ioManagerLock :: MVar ()
ioManagerLock = unsafePerformIO $ do
@@ -221,12 +226,20 @@ ensureIOManagerIsRunning
startTimerManagerThread
startIOManagerThreads :: IO ()
-startIOManagerThreads = do
- eventManagerArray <- readIORef eventManager
- let (low, high) = boundsIOArray eventManagerArray
- withMVar ioManagerLock $ \_ ->
- forM_ [low..high] (startIOManagerThread eventManagerArray)
-
+startIOManagerThreads =
+ withMVar ioManagerLock $ \_ -> do
+ eventManagerArray <- readIORef eventManager
+ let (_, high) = boundsIOArray eventManagerArray
+ forM_ [0..high] (startIOManagerThread eventManagerArray)
+ writeIORef numEnabledEventManagers (high+1)
+
+restartPollLoop :: EventManager -> Int -> IO ThreadId
+restartPollLoop mgr i = do
+ M.release mgr
+ !t <- forkOn i $ loop mgr
+ labelThread t "IOManager"
+ return t
+
startIOManagerThread :: IOArray Int (Maybe (ThreadId, EventManager))
-> Int
-> IO ()
@@ -238,7 +251,7 @@ startIOManagerThread eventManagerArray i = do
writeIOArray eventManagerArray i (Just (t,mgr))
old <- readIOArray eventManagerArray i
case old of
- Nothing -> create
+ Nothing -> create
Just (t,em) -> do
s <- threadStatus t
case s of
@@ -251,7 +264,11 @@ startIOManagerThread eventManagerArray i = do
-- See #4449
M.cleanup em
create
- _other -> return ()
+ _other -> do
+ -- Another thread is currently servicing the manager loop.
+ -- Tell it to exit and start a new thread to work on the loop.
+ M.release em
+ create
startTimerManagerThread :: IO ()
startTimerManagerThread = modifyMVar_ timerManagerThreadVar $ \old -> do
@@ -282,15 +299,44 @@ startTimerManagerThread = modifyMVar_ timerManagerThreadVar $ \old -> do
shutdownManagers :: IO ()
shutdownManagers =
- do eventManagerArray <- readIORef eventManager
- let (low, high) = boundsIOArray eventManagerArray
- forM_ [low..high] $ \i -> do
- mmgr <- readIOArray eventManagerArray i
- case mmgr of
- Nothing -> return ()
- Just (_,mgr) -> M.shutdown mgr
+ withMVar ioManagerLock $ \_ -> do
+ eventManagerArray <- readIORef eventManager
+ let (_, high) = boundsIOArray eventManagerArray
+ forM_ [0..high] $ \i -> do
+ mmgr <- readIOArray eventManagerArray i
+ case mmgr of
+ Nothing -> return ()
+ Just (_,mgr) -> M.shutdown mgr
foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
ioManagerCapabilitiesChanged :: Int -> IO ()
-ioManagerCapabilitiesChanged n = return ()
+ioManagerCapabilitiesChanged new_n_caps = do
+ withMVar ioManagerLock $ \_ -> do
+ numEnabled <- readIORef numEnabledEventManagers
+ writeIORef numEnabledEventManagers new_n_caps
+ eventManagerArray <- readIORef eventManager
+ let (_, high) = boundsIOArray eventManagerArray
+ let old_n_caps = high + 1
+ if new_n_caps > old_n_caps
+ then do new_eventManagerArray <- newIOArray (0, new_n_caps - 1) Nothing
+
+ -- copy the existing values into the new array:
+ forM_ [0..high] $ \i -> do
+ Just (tid,mgr) <- readIOArray eventManagerArray i
+ if i < numEnabled - 1
+ then writeIOArray new_eventManagerArray i (Just (tid,mgr))
+ else do tid' <- restartPollLoop mgr i
+ writeIOArray new_eventManagerArray i (Just (tid',mgr))
+
+ -- create new IO managers for the new caps:
+ forM_ [old_n_caps..new_n_caps-1] $
+ startIOManagerThread eventManagerArray
+
+ -- update the event manager array reference:
+ writeIORef eventManager new_eventManagerArray
+ else when (new_n_caps > numEnabled) $
+ forM_ [numEnabled..new_n_caps-1] $ \i -> do
+ Just (_,mgr) <- readIOArray eventManagerArray i
+ tid <- restartPollLoop mgr i
+ writeIOArray eventManagerArray i (Just (tid,mgr))