diff options
author | David Feuer <david.feuer@gmail.com> | 2019-01-20 19:32:49 -0500 |
---|---|---|
committer | Marge Bot <ben+marge-bot@smart-cactus.org> | 2019-10-23 05:58:37 -0400 |
commit | 96c5411ad8859eb5aef274165de2859b7f127c8f (patch) | |
tree | 4635840d573c8913a1d712c4fe584eb4a4f959f7 /libraries/base/Control/Concurrent/QSemN.hs | |
parent | 2d2cc76ffb781d01c800608cd8be05cca67ac4c0 (diff) | |
download | haskell-96c5411ad8859eb5aef274165de2859b7f127c8f.tar.gz |
Use an IORef for QSemN
Replace the outer `MVar` in `QSemN` with an `IORef`. This should
probably be lighter, and it removes the need for `uninterruptibleMask`.
Previously Differential Revision https://phabricator.haskell.org/D4896
Diffstat (limited to 'libraries/base/Control/Concurrent/QSemN.hs')
-rw-r--r-- | libraries/base/Control/Concurrent/QSemN.hs | 90 |
1 files changed, 51 insertions, 39 deletions
diff --git a/libraries/base/Control/Concurrent/QSemN.hs b/libraries/base/Control/Concurrent/QSemN.hs index 8b3ce5526e..41d2b0ab73 100644 --- a/libraries/base/Control/Concurrent/QSemN.hs +++ b/libraries/base/Control/Concurrent/QSemN.hs @@ -1,4 +1,6 @@ -{-# LANGUAGE Safe #-} +{-# LANGUAGE Trustworthy #-} +{-# LANGUAGE BangPatterns #-} +{-# OPTIONS_GHC -funbox-strict-fields #-} ----------------------------------------------------------------------------- -- | @@ -23,11 +25,12 @@ module Control.Concurrent.QSemN signalQSemN -- :: QSemN -> Int -> IO () ) where -import Control.Concurrent.MVar ( MVar, newEmptyMVar, takeMVar, tryTakeMVar - , putMVar, newMVar +import Control.Concurrent.MVar ( MVar, newEmptyMVar, takeMVar , tryPutMVar, isEmptyMVar) import Control.Exception -import Data.Maybe +import Control.Monad (when) +import Data.IORef (IORef, newIORef, atomicModifyIORef) +import System.IO.Unsafe (unsafePerformIO) -- | 'QSemN' is a quantity semaphore in which the resource is acquired -- and released in units of one. It provides guaranteed FIFO ordering @@ -39,7 +42,7 @@ import Data.Maybe -- -- is safe; it never loses any of the resource. -- -newtype QSemN = QSemN (MVar (Int, [(Int, MVar ())], [(Int, MVar ())])) +data QSemN = QSemN !(IORef (Int, [(Int, MVar ())], [(Int, MVar ())])) -- The semaphore state (i, xs, ys): -- @@ -55,9 +58,7 @@ newtype QSemN = QSemN (MVar (Int, [(Int, MVar ())], [(Int, MVar ())])) -- A thread can dequeue itself by also putting () into the MVar, which -- it must do if it receives an exception while blocked in waitQSemN. -- This means that when unblocking a thread in signalQSemN we must --- first check whether the MVar is already full; the MVar lock on the --- semaphore itself resolves race conditions between signalQSemN and a --- thread attempting to dequeue itself. +-- first check whether the MVar is already full. -- |Build a new 'QSemN' with a supplied initial quantity. -- The initial quantity must be at least 0. @@ -65,54 +66,65 @@ newQSemN :: Int -> IO QSemN newQSemN initial | initial < 0 = fail "newQSemN: Initial quantity must be non-negative" | otherwise = do - sem <- newMVar (initial, [], []) + sem <- newIORef (initial, [], []) return (QSemN sem) +-- An unboxed version of Maybe (MVar a) +data MaybeMV a = JustMV !(MVar a) | NothingMV + -- |Wait for the specified quantity to become available waitQSemN :: QSemN -> Int -> IO () -waitQSemN (QSemN m) sz = - mask_ $ do - (i,b1,b2) <- takeMVar m +-- We need to mask here. Once we've enqueued our MVar, we need +-- to be sure to wait for it. Otherwise, we could lose our +-- allocated resource. +waitQSemN qs@(QSemN m) sz = mask_ $ do + -- unsafePerformIO and not unsafeDupablePerformIO. We must + -- be sure to wait on the same MVar that gets enqueued. + mmvar <- atomicModifyIORef m $ \ (i,b1,b2) -> unsafePerformIO $ do let z = i-sz if z < 0 - then do - b <- newEmptyMVar - putMVar m (i, b1, (sz,b):b2) - wait b - else do - putMVar m (z, b1, b2) - return () + then do + b <- newEmptyMVar + return ((i, b1, (sz,b):b2), JustMV b) + else return ((z, b1, b2), NothingMV) + + -- Note: this case match actually allocates the MVar if necessary. + case mmvar of + NothingMV -> return () + JustMV b -> wait b where + wait :: MVar () -> IO () wait b = do - takeMVar b `onException` - (uninterruptibleMask_ $ do -- Note [signal uninterruptible] - (i,b1,b2) <- takeMVar m - r <- tryTakeMVar b - r' <- if isJust r - then signal sz (i,b1,b2) - else do putMVar b (); return (i,b1,b2) - putMVar m r') + takeMVar b `onException` do + already_filled <- not <$> tryPutMVar b () + when already_filled $ signalQSemN qs sz -- |Signal that a given quantity is now available from the 'QSemN'. signalQSemN :: QSemN -> Int -> IO () -signalQSemN (QSemN m) sz = uninterruptibleMask_ $ do - r <- takeMVar m - r' <- signal sz r - putMVar m r' - -signal :: Int - -> (Int,[(Int,MVar ())],[(Int,MVar ())]) - -> IO (Int,[(Int,MVar ())],[(Int,MVar ())]) +-- We don't need to mask here because we should *already* be masked +-- here (e.g., by bracket). Indeed, if we're not already masked, +-- it's too late to do so. +-- +-- What if the unsafePerformIO thunk is forced in another thread, +-- and receives an asynchronous exception? That shouldn't be a +-- problem: when we force it ourselves, presumably masked, we +-- will resume its execution. +signalQSemN (QSemN m) sz0 = do + -- unsafePerformIO and not unsafeDupablePerformIO. We must not + -- wake up more threads than we're supposed to. + unit <- atomicModifyIORef m $ \(i,a1,a2) -> + unsafePerformIO (loop (sz0 + i) a1 a2) -signal sz0 (i,a1,a2) = loop (sz0 + i) a1 a2 + -- Forcing this will actually wake the necessary threads. + evaluate unit where - loop 0 bs b2 = return (0, bs, b2) - loop sz [] [] = return (sz, [], []) + loop 0 bs b2 = return ((0, bs, b2), ()) + loop sz [] [] = return ((sz, [], []), ()) loop sz [] b2 = loop sz (reverse b2) [] loop sz ((j,b):bs) b2 | j > sz = do r <- isEmptyMVar b - if r then return (sz, (j,b):bs, b2) + if r then return ((sz, (j,b):bs, b2), ()) else loop sz bs b2 | otherwise = do r <- tryPutMVar b () |