summaryrefslogtreecommitdiff
path: root/libraries/base/Control/Concurrent/QSem.hs
blob: 9f53eef826884eac51cf66c179b6ce1a1f51bc72 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
{-# LANGUAGE Safe #-}
{-# LANGUAGE BangPatterns #-}

-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Concurrent.QSem
-- Copyright   :  (c) The University of Glasgow 2001
-- License     :  BSD-style (see the file libraries/base/LICENSE)
-- 
-- Maintainer  :  libraries@haskell.org
-- Stability   :  experimental
-- Portability :  non-portable (concurrency)
--
-- Simple quantity semaphores.
--
-----------------------------------------------------------------------------

module Control.Concurrent.QSem
        ( -- * Simple Quantity Semaphores
          QSem,         -- abstract
          newQSem,      -- :: Int  -> IO QSem
          waitQSem,     -- :: QSem -> IO ()
          signalQSem    -- :: QSem -> IO ()
        ) where

import Control.Concurrent.MVar ( MVar, newEmptyMVar, takeMVar, tryTakeMVar
                          , putMVar, newMVar, tryPutMVar)
import Control.Exception
import Data.Maybe

-- | 'QSem' is a quantity semaphore in which the resource is acquired
-- and released in units of one. It provides guaranteed FIFO ordering
-- for satisfying blocked 'waitQSem' calls.
--
-- A 'QSem' is created with 'newQSem'; a unit is acquired with
-- 'waitQSem' and given back with 'signalQSem'.
--
-- The pattern
--
-- >   bracket_ (waitQSem q) (signalQSem q) (...)
--
-- is safe; it never loses a unit of the resource.
--
newtype QSem = QSem (MVar (Int, [MVar ()], [MVar ()]))

-- The semaphore state (i, xs, ys):
--
--   i is the current resource value
--
--   (xs,ys) is the queue of blocked threads, where the queue is
--           given by xs ++ reverse ys.  We can enqueue new blocked threads
--           by consing onto ys, and dequeue by removing from the head of xs.
--
-- A blocked thread is represented by an empty (MVar ()).  To unblock
-- the thread, we put () into the MVar.
--
-- A thread can dequeue itself by also putting () into the MVar, which
-- it must do if it receives an exception while blocked in waitQSem.
-- This means that when unblocking a thread in signalQSem we must
-- first check whether the MVar is already full; the MVar lock on the
-- semaphore itself resolves race conditions between signalQSem and a
-- thread attempting to dequeue itself.

-- | Create a fresh 'QSem' holding the supplied number of units.
--
-- The initial quantity must not be negative: for a negative argument
-- the action fails (via 'fail' in 'IO') and no semaphore is created.
newQSem :: Int -> IO QSem
newQSem initial =
  if initial >= 0
    -- No thread can be waiting yet, so both halves of the queue are empty.
    then fmap QSem (newMVar (initial, [], []))
    else fail "newQSem: Initial quantity must be non-negative"

-- | Wait for a unit to become available.
--
-- If the current quantity is non-zero it is decremented and the call
-- returns straight away.  Otherwise the calling thread appends a fresh,
-- empty 'MVar' to the queue of blocked threads and blocks on it until
-- that 'MVar' is filled.
--
-- If an exception is raised while the thread is blocked, the handler
-- either marks the queue entry as abandoned or, when a unit had already
-- been handed to this thread, passes that unit on via @signal@, so the
-- unit is not lost.
waitQSem :: QSem -> IO ()
waitQSem (QSem m) =
  -- The whole operation runs with asynchronous exceptions masked.
  mask_ $ do
    (i,b1,b2) <- takeMVar m
    if i == 0
       then do
         -- Nothing available: enqueue ourselves by consing onto the back
         -- half of the queue, release the state, and only then block.
         b <- newEmptyMVar
         putMVar m (i, b1, b:b2)
         wait b
       else do
         -- Take one unit; the bang forces the new count before it is
         -- stored back into the MVar.
         let !z = i-1
         putMVar m (z, b1, b2)
         return ()
  where
    -- Block until our MVar is filled.  If an exception arrives while
    -- blocked, re-take the semaphore state and inspect our own MVar.
    wait b = takeMVar b `onException`
                (uninterruptibleMask_ $ do -- Note [signal uninterruptible]
                   (i,b1,b2) <- takeMVar m
                   r <- tryTakeMVar b
                   -- Full: a unit had already been handed to us, so pass
                   -- it on with 'signal'.  Empty: fill it ourselves, so
                   -- that a later 'signal' finds it full (its tryPutMVar
                   -- fails) and skips this entry.
                   r' <- if isJust r
                            then signal (i,b1,b2)
                            else do putMVar b (); return (i,b1,b2)
                   putMVar m r')

-- | Signal that a unit of the 'QSem' is available.
--
-- The semaphore state is taken, updated by @signal@ (which either hands
-- the unit to the first blocked thread that is still waiting or
-- increments the available quantity) and then put back.
signalQSem :: QSem -> IO ()
signalQSem (QSem m) =
  -- Note [signal uninterruptible]
  uninterruptibleMask_ (takeMVar m >>= signal >>= putMVar m)

-- Note [signal uninterruptible]
--
--   If we have
--
--      bracket_ (waitQSem q) (signalQSem q) (...)
--
--   and an exception arrives at the signalQSem, then we must not lose
--   the resource.  The signalQSem is masked by bracket, but taking
--   the MVar might block, and so it would be interruptible.  Hence we
--   need an uninterruptibleMask here.
--
--   This isn't ideal: during high contention, some threads won't be
--   interruptible.  The QSemSTM implementation has better behaviour
--   here, but it performs much worse than this one in some
--   benchmarks.

-- Release one unit into a semaphore state and return the updated state.
--
-- With a non-zero count the count is simply incremented.  With a zero
-- count the queue of blocked threads (front list, then the reversed
-- back list) is scanned in order: the first waiter whose MVar can be
-- filled is woken and the count stays 0.  Entries whose MVar is already
-- full are dropped.  If no waiter can be woken the count becomes 1.
signal :: (Int,[MVar ()],[MVar ()]) -> IO (Int,[MVar ()],[MVar ()])
signal (i,a1,a2)
  | i /= 0    = let !z = i+1 in return (z, a1, a2)
  | otherwise = wake a1 a2
  where
    wake front back = case front of
      w:rest -> do
        woken <- tryPutMVar w ()
        if woken
          then return (0, rest, back)
          else wake rest back      -- already full: skip this entry
      [] | null back -> return (1, [], [])
         -- Front exhausted: continue with the back half in arrival order.
         | otherwise -> wake (reverse back) []