summaryrefslogtreecommitdiff
path: root/libraries/base/Control/Concurrent/QSemN.hs
diff options
context:
space:
mode:
authorDavid Feuer <david.feuer@gmail.com>2019-01-20 19:32:49 -0500
committerMarge Bot <ben+marge-bot@smart-cactus.org>2019-10-23 05:58:37 -0400
commit96c5411ad8859eb5aef274165de2859b7f127c8f (patch)
tree4635840d573c8913a1d712c4fe584eb4a4f959f7 /libraries/base/Control/Concurrent/QSemN.hs
parent2d2cc76ffb781d01c800608cd8be05cca67ac4c0 (diff)
downloadhaskell-96c5411ad8859eb5aef274165de2859b7f127c8f.tar.gz
Use an IORef for QSemN
Replace the outer `MVar` in `QSemN` with an `IORef`. This should probably be lighter, and it removes the need for `uninterruptibleMask`. Previously Differential Revision https://phabricator.haskell.org/D4896
Diffstat (limited to 'libraries/base/Control/Concurrent/QSemN.hs')
-rw-r--r--libraries/base/Control/Concurrent/QSemN.hs90
1 files changed, 51 insertions, 39 deletions
diff --git a/libraries/base/Control/Concurrent/QSemN.hs b/libraries/base/Control/Concurrent/QSemN.hs
index 8b3ce5526e..41d2b0ab73 100644
--- a/libraries/base/Control/Concurrent/QSemN.hs
+++ b/libraries/base/Control/Concurrent/QSemN.hs
@@ -1,4 +1,6 @@
-{-# LANGUAGE Safe #-}
+{-# LANGUAGE Trustworthy #-}
+{-# LANGUAGE BangPatterns #-}
+{-# OPTIONS_GHC -funbox-strict-fields #-}
-----------------------------------------------------------------------------
-- |
@@ -23,11 +25,12 @@ module Control.Concurrent.QSemN
signalQSemN -- :: QSemN -> Int -> IO ()
) where
-import Control.Concurrent.MVar ( MVar, newEmptyMVar, takeMVar, tryTakeMVar
- , putMVar, newMVar
+import Control.Concurrent.MVar ( MVar, newEmptyMVar, takeMVar
, tryPutMVar, isEmptyMVar)
import Control.Exception
-import Data.Maybe
+import Control.Monad (when)
+import Data.IORef (IORef, newIORef, atomicModifyIORef)
+import System.IO.Unsafe (unsafePerformIO)
-- | 'QSemN' is a quantity semaphore in which the resource is acquired
-- and released in units of one. It provides guaranteed FIFO ordering
@@ -39,7 +42,7 @@ import Data.Maybe
--
-- is safe; it never loses any of the resource.
--
-newtype QSemN = QSemN (MVar (Int, [(Int, MVar ())], [(Int, MVar ())]))
+data QSemN = QSemN !(IORef (Int, [(Int, MVar ())], [(Int, MVar ())]))
-- The semaphore state (i, xs, ys):
--
@@ -55,9 +58,7 @@ newtype QSemN = QSemN (MVar (Int, [(Int, MVar ())], [(Int, MVar ())]))
-- A thread can dequeue itself by also putting () into the MVar, which
-- it must do if it receives an exception while blocked in waitQSemN.
-- This means that when unblocking a thread in signalQSemN we must
--- first check whether the MVar is already full; the MVar lock on the
--- semaphore itself resolves race conditions between signalQSemN and a
--- thread attempting to dequeue itself.
+-- first check whether the MVar is already full.
-- |Build a new 'QSemN' with a supplied initial quantity.
-- The initial quantity must be at least 0.
@@ -65,54 +66,65 @@ newQSemN :: Int -> IO QSemN
newQSemN initial
| initial < 0 = fail "newQSemN: Initial quantity must be non-negative"
| otherwise = do
- sem <- newMVar (initial, [], [])
+ sem <- newIORef (initial, [], [])
return (QSemN sem)
+-- An unboxed version of Maybe (MVar a)
+data MaybeMV a = JustMV !(MVar a) | NothingMV
+
-- |Wait for the specified quantity to become available
waitQSemN :: QSemN -> Int -> IO ()
-waitQSemN (QSemN m) sz =
- mask_ $ do
- (i,b1,b2) <- takeMVar m
+-- We need to mask here. Once we've enqueued our MVar, we need
+-- to be sure to wait for it. Otherwise, we could lose our
+-- allocated resource.
+waitQSemN qs@(QSemN m) sz = mask_ $ do
+ -- unsafePerformIO and not unsafeDupablePerformIO. We must
+ -- be sure to wait on the same MVar that gets enqueued.
+ mmvar <- atomicModifyIORef m $ \ (i,b1,b2) -> unsafePerformIO $ do
let z = i-sz
if z < 0
- then do
- b <- newEmptyMVar
- putMVar m (i, b1, (sz,b):b2)
- wait b
- else do
- putMVar m (z, b1, b2)
- return ()
+ then do
+ b <- newEmptyMVar
+ return ((i, b1, (sz,b):b2), JustMV b)
+ else return ((z, b1, b2), NothingMV)
+
+ -- Note: this case match actually allocates the MVar if necessary.
+ case mmvar of
+ NothingMV -> return ()
+ JustMV b -> wait b
where
+ wait :: MVar () -> IO ()
wait b = do
- takeMVar b `onException`
- (uninterruptibleMask_ $ do -- Note [signal uninterruptible]
- (i,b1,b2) <- takeMVar m
- r <- tryTakeMVar b
- r' <- if isJust r
- then signal sz (i,b1,b2)
- else do putMVar b (); return (i,b1,b2)
- putMVar m r')
+ takeMVar b `onException` do
+ already_filled <- not <$> tryPutMVar b ()
+ when already_filled $ signalQSemN qs sz
-- |Signal that a given quantity is now available from the 'QSemN'.
signalQSemN :: QSemN -> Int -> IO ()
-signalQSemN (QSemN m) sz = uninterruptibleMask_ $ do
- r <- takeMVar m
- r' <- signal sz r
- putMVar m r'
-
-signal :: Int
- -> (Int,[(Int,MVar ())],[(Int,MVar ())])
- -> IO (Int,[(Int,MVar ())],[(Int,MVar ())])
+-- We don't need to mask here because we should *already* be masked
+-- here (e.g., by bracket). Indeed, if we're not already masked,
+-- it's too late to do so.
+--
+-- What if the unsafePerformIO thunk is forced in another thread,
+-- and receives an asynchronous exception? That shouldn't be a
+-- problem: when we force it ourselves, presumably masked, we
+-- will resume its execution.
+signalQSemN (QSemN m) sz0 = do
+ -- unsafePerformIO and not unsafeDupablePerformIO. We must not
+ -- wake up more threads than we're supposed to.
+ unit <- atomicModifyIORef m $ \(i,a1,a2) ->
+ unsafePerformIO (loop (sz0 + i) a1 a2)
-signal sz0 (i,a1,a2) = loop (sz0 + i) a1 a2
+ -- Forcing this will actually wake the necessary threads.
+ evaluate unit
where
- loop 0 bs b2 = return (0, bs, b2)
- loop sz [] [] = return (sz, [], [])
+ loop 0 bs b2 = return ((0, bs, b2), ())
+ loop sz [] [] = return ((sz, [], []), ())
loop sz [] b2 = loop sz (reverse b2) []
loop sz ((j,b):bs) b2
| j > sz = do
r <- isEmptyMVar b
- if r then return (sz, (j,b):bs, b2)
+ if r then return ((sz, (j,b):bs, b2), ())
else loop sz bs b2
| otherwise = do
r <- tryPutMVar b ()