1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
{-# LANGUAGE Safe #-}
{-# LANGUAGE BangPatterns #-}
-----------------------------------------------------------------------------
-- |
-- Module : Control.Concurrent.QSem
-- Copyright : (c) The University of Glasgow 2001
-- License : BSD-style (see the file libraries/base/LICENSE)
--
-- Maintainer : libraries@haskell.org
-- Stability : experimental
-- Portability : non-portable (concurrency)
--
-- Simple quantity semaphores.
--
-----------------------------------------------------------------------------
module Control.Concurrent.QSem
( -- * Simple Quantity Semaphores
QSem, -- abstract
newQSem, -- :: Int -> IO QSem
waitQSem, -- :: QSem -> IO ()
signalQSem -- :: QSem -> IO ()
) where
import Control.Concurrent.MVar ( MVar, newEmptyMVar, takeMVar, tryTakeMVar
, putMVar, newMVar, tryPutMVar)
import Control.Exception
import Data.Maybe
-- | 'QSem' is a quantity semaphore in which the resource is acquired
-- and released in units of one. It provides guaranteed FIFO ordering
-- for satisfying blocked 'waitQSem' calls.
--
-- The pattern
--
-- > bracket_ (waitQSem q) (signalQSem q) (...)
--
-- is safe; it never loses a unit of the resource.
--
newtype QSem = QSem (MVar (Int, [MVar ()], [MVar ()]))
-- The semaphore state (i, xs, ys):
--
-- i is the current resource value
--
-- (xs,ys) is the queue of blocked threads, where the queue is
-- given by xs ++ reverse ys. We can enqueue new blocked threads
-- by consing onto ys, and dequeue by removing from the head of xs.
--
-- A blocked thread is represented by an empty (MVar ()). To unblock
-- the thread, we put () into the MVar.
--
-- A thread can dequeue itself by also putting () into the MVar, which
-- it must do if it receives an exception while blocked in waitQSem.
-- This means that when unblocking a thread in signalQSem we must
-- first check whether the MVar is already full; the MVar lock on the
-- semaphore itself resolves race conditions between signalQSem and a
-- thread attempting to dequeue itself.
-- | Create a 'QSem' that starts out with the given number of available
-- units and an empty queue of waiters.
--
-- The initial quantity must not be negative; for a negative argument the
-- action fails (via 'fail') instead of producing a semaphore.
newQSem :: Int -> IO QSem
newQSem initial =
  if initial >= 0
    then fmap QSem (newMVar (initial, [], []))
    else fail "newQSem: Initial quantity must be non-negative"
-- | Acquire one unit of the 'QSem', blocking until one is available.
--
-- When the count is zero the caller joins the back of the waiter queue and
-- sleeps until a signaller hands it a unit.  If an exception reaches the
-- caller while it sleeps, the unit is not lost: a unit that had already
-- been handed over is passed on to the next waiter, otherwise the caller's
-- queue entry is filled in so that later signals skip over it.
waitQSem :: QSem -> IO ()
waitQSem (QSem sem) = mask_ (takeMVar sem >>= acquire)
  where
    -- No units left: enqueue a fresh empty cell and sleep on it.
    acquire (0, front, back) = do
      cell <- newEmptyMVar
      putMVar sem (0, front, cell : back)
      sleepOn cell
    -- A unit is free: take it by storing the decremented count.
    acquire (avail, front, back) = do
      let !remaining = avail - 1
      putMVar sem (remaining, front, back)

    sleepOn cell =
      takeMVar cell `onException` uninterruptibleMask_ (abandon cell)

    -- Cleanup after an exception; see Note [signal uninterruptible].
    abandon cell = do
      (avail, front, back) <- takeMVar sem
      handed <- tryTakeMVar cell
      next <-
        if isJust handed
          -- A signaller already filled the cell: forward that unit.
          then signal (avail, front, back)
          -- Still queued: fill the cell ourselves so it gets skipped later.
          else putMVar cell () >> return (avail, front, back)
      putMVar sem next
-- | Release one unit of the 'QSem'.  If threads are blocked in 'waitQSem',
-- the one at the head of the queue receives the unit; otherwise the
-- available count goes up by one.
signalQSem :: QSem -> IO ()
signalQSem (QSem sem) =
  -- Taking the state lock may block, so run uninterruptibly to avoid
  -- dropping the unit; see Note [signal uninterruptible].
  uninterruptibleMask_ (takeMVar sem >>= signal >>= putMVar sem)
-- Note [signal uninterruptible]
--
-- If we have
--
-- bracket waitQSem signalQSem (...)
--
-- and an exception arrives at the signalQSem, then we must not lose
-- the resource. The signalQSem is masked by bracket, but taking
-- the MVar might block, and so it would be interruptible. Hence we
-- need an uninterruptibleMask here.
--
-- This isn't ideal: during high contention, some threads won't be
-- interruptible. The QSemSTM implementation has better behaviour
-- here, but it performs much worse than this one in some
-- benchmarks.
-- Compute the semaphore state that results from releasing one unit.
--
-- With a non-zero count the count is simply incremented.  With a zero
-- count the waiter queue (front list, then the reversed back list) is
-- scanned for the first cell that is still empty; that cell is filled and
-- the count stays at zero.  Cells that are already full are dropped from
-- the queue.  If no empty cell is found the result is a count of one with
-- an empty queue.
signal :: (Int,[MVar ()],[MVar ()]) -> IO (Int,[MVar ()],[MVar ()])
signal (count, front, back)
  | count /= 0 = let !bumped = count + 1 in return (bumped, front, back)
  | otherwise  = wake front back
  where
    -- Try the next waiter; move on if its cell was already filled.
    wake (w:ws) rest = do
      woken <- tryPutMVar w ()
      if woken
        then return (0, ws, rest)
        else wake ws rest
    -- Queue exhausted: the unit stays in the semaphore.
    wake [] [] = return (1, [], [])
    -- Front list exhausted: continue with the back list in arrival order.
    wake [] rest = wake (reverse rest) []
|