summaryrefslogtreecommitdiff
path: root/testsuite/tests/concurrent/should_run/T13916_Bracket.hs
blob: b09adfc860a399667d1e9534631abcbe5963f84d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{- |
Module      : T13916_Bracket
Description : Handling multiple environments with bracket-like apis
Maintainer  : robertkennedy@clearwateranalytics.com
Stability   : stable

This module is meant for, e.g., SQL or MongoDB connections, where you may wish to keep some number of
easy-to-grab environments. In particular, it assumes your connection has initialization and release functions.

This module exhibits bugs when compiled with any optimizations enabled. The bugs do not occur if the
program using it is defined in the same module.
-}
module T13916_Bracket (
    -- * Data Types
    Spawner(..), Limit(..), Cache,
    -- * Usage
    withEnvCache, withEnv
    ) where

import Control.Concurrent.STM
import Control.Concurrent.STM.TSem
import Control.Exception hiding (handle)
import Control.Monad
import Data.Vector (Vector)
import qualified Data.Vector as Vector

-- * Data Types
-- | Tells the program how many environments it is allowed to spawn.
-- Only a 'Hard' limit is defined in this module: it will not spawn any more than
-- the given number of connections simultaneously. 'getLimit' is that number.
--
-- NOTE(review): this comment used to describe a @Lax@ limit as well, but no such
-- constructor exists in this module.
--
-- @since 0.3.7
data Limit = Hard {getLimit :: {-# unpack #-} !Int}

-- | The callbacks this module uses to manage the lifecycle of an environment of type @env@.
data Spawner env = Spawner
    { maker  :: IO env
      -- ^ Creates a fresh environment; run when no cached environment can be reused.
    , killer :: env -> IO ()
      -- ^ Disposes of an environment; run on environments found dead, on environments that
      -- cannot be put back into the cache, and on whatever is left in the cache at release time.
    , isDead :: env -> IO Bool
      -- ^ Health check run on an environment taken from the cache; 'True' means the
      -- environment is killed and replaced instead of being handed out.
    }

-- | The slots of a cache. Each 'TMVar' is either empty or holds one environment that is
-- currently not in use.
type VCache env = Vector (TMVar env)
-- | A pool of reusable environments.
--
-- Both constructors carry the 'Spawner' callbacks ('spawner') and the slot vector ('vcache').
-- 'Limited' additionally carries 'envsem', the semaphore whose permits gate the creation of
-- new environments. 'withEnv' dispatches on the constructor.
--
-- Within this module only 'Limited' is ever constructed (by 'withEnvCache').
data Cache env = Unlimited { spawner :: Spawner env
                           , vcache :: !(VCache env)
                           }
               | Limited   { spawner :: Spawner env
                           , vcache :: !(VCache env)
                           , envsem :: TSem
                           }

-- ** Initialization
-- | Build a 'Cache', run the given action with it, and afterwards release every environment
-- still sitting in the cache. The release step also runs when the action throws, because the
-- whole thing is wrapped in 'bracket'.
--
-- For @'Hard' n@ the cache starts with @n@ empty slots and a semaphore holding @n@ creation
-- permits; no environment is created up front.
--
-- NOTE(review): for @n < 1@ the cache has no slots and the semaphore has no permits, so
-- 'withEnv' looks like it would block indefinitely -- confirm whether callers may pass such a limit.
withEnvCache :: Limit -> Spawner env -> (Cache env -> IO a) -> IO a
withEnvCache limit envSpawner = bracket starter releaseCache
    where starter = case limit of
            -- 'fromIntegral' keeps this compiling whether 'newTSem' takes an 'Int' (older stm
            -- releases) or an 'Integer' (newer stm releases).
            Hard n -> Limited envSpawner <$> initializeEmptyCache n
                                         <*> atomically (newTSem (fromIntegral n))

-- ** Using a single value
-- | Run an action with an environment borrowed from the cache, choosing the borrowing
-- strategy that matches the cache's constructor.
withEnv :: Cache env -> (env -> IO a) -> IO a
withEnv (Unlimited envSpawner slots)       = withEnvUnlimited envSpawner slots
withEnv (Limited envSpawner slots permits) = withEnvLimited   envSpawner slots permits

-- *** Unlimited
-- | Borrow an env for the duration of the given function and hand it back afterwards.
-- When no env can be taken from the cache a new one is made; an env taken from the cache that
-- turns out to be dead is killed and replaced by a new one.
-- Once the function finishes, the env is offered back to the cache. If the cache does not
-- accept it, the env is killed. Note this can lead to many concurrent connections.
--
-- @since 0.3.5
withEnvUnlimited :: Spawner env -> VCache env -> (env -> IO a) -> IO a
withEnvUnlimited (Spawner create destroy checkDead) slots = bracket acquire release
  where
    -- Reuse a cached env when there is one, otherwise create a new one.
    acquire = atomically (tryTakeEnv slots) >>= maybe create reuse

    -- A cached env is only handed out when it is still alive.
    reuse env = do
        dead <- checkDead env
        if dead
            then destroy env >> create
            else return env

    -- Offer the env back to the cache; kill it when no slot accepts it.
    release env = do
        stored <- atomically (tryPutEnv slots env)
        unless stored (destroy env)

-- *** Limited
-- | Borrow an env for the duration of the given function and put it back into the cache
-- afterwards. Acquisition is delegated to 'limitMakeEnv', which waits when no env and no
-- creation permit is available, so the number of environments should stay constant.
--
-- @since 0.3.6
withEnvLimited :: Spawner env -> VCache env -> TSem -> (env -> IO a) -> IO a
withEnvLimited envSpawner slots permits =
    bracket (limitMakeEnv envSpawner slots permits) giveBack
  where
    -- Waits (via STM retry) until a slot is free to hold the env.
    giveBack = atomically . putEnv slots

-- | Obtain an environment under a hard limit.
--
-- In a single transaction this either takes a cached env or, when every slot is empty,
-- takes a creation permit from the semaphore; it blocks while neither is available.
--
-- * With a permit, a new env is created with 'maker'.
-- * With a cached env, the env is returned when 'isDead' reports it alive. Otherwise it is
--   killed, its permit is handed back to the semaphore, and the procedure starts over.
--
-- A permit is always returned to the semaphore when 'maker' or 'killer' throws, so a failing
-- callback cannot permanently shrink the number of environments the cache may hold.
limitMakeEnv :: Spawner env -> VCache env -> TSem -> IO env
limitMakeEnv Spawner{..} vcache envsem = go
  where
    -- Hand one creation permit back to the semaphore.
    returnPermit = atomically $ signalTSem envsem

    go = do
        eenvpermission <- atomically $ ( Left  <$> takeEnv  vcache )
                              `orElse` ( Right <$> waitTSem envsem )
        case eenvpermission of
            -- We hold a permit but no env exists yet: if 'maker' throws, give the permit back,
            -- otherwise it would be lost for good.
            Right () -> maker `onException` returnPermit
            Left env -> do
                -- Given our env, we check if it's dead. If it's not, we are done and return it.
                -- If it is dead, we release it, signal that a new env can be created, and then recurse
                -- NOTE(review): if 'isDead' itself throws, the env is neither killed nor put back.
                isdead <- isDead env
                if not isdead then return env
                    else do
                         -- 'finally' frees the permit even when 'killer' throws.
                         killer env `finally` returnPermit
                         go

-- * Low level
-- | Allocate the slot vector of a cache: @n@ fresh, empty 'TMVar's, or no slots at all
-- when @n@ is smaller than one.
initializeEmptyCache :: Int -> IO (VCache env)
initializeEmptyCache n =
    if n >= 1
        then Vector.replicateM n newEmptyTMVarIO
        else return Vector.empty

-- | Take the env out of the first non-empty slot, scanning the slots from left to right.
-- Retries (i.e. blocks inside 'atomically') while every slot is empty.
takeEnv :: VCache env -> STM env
takeEnv = Vector.foldr firstFilled retry
    where firstFilled slot rest = takeTMVar slot `orElse` rest

-- | Non-blocking variant of 'takeEnv': 'Nothing' when no slot currently holds an env.
tryTakeEnv :: VCache env -> STM (Maybe env)
tryTakeEnv slots = fmap Just (takeEnv slots) `orElse` return Nothing

-- | Store the env in the first empty slot, scanning the slots from left to right.
-- Retries (i.e. blocks inside 'atomically') while no slot is empty.
putEnv :: VCache env -> env -> STM ()
putEnv slots env = Vector.foldr firstEmpty retry slots
    where firstEmpty slot rest = putTMVar slot env `orElse` rest

-- | Non-blocking variant of 'putEnv': 'True' when the env was stored, 'False' when no
-- slot was free to take it.
tryPutEnv :: VCache env -> env -> STM Bool
tryPutEnv slots env = (True <$ putEnv slots env) `orElse` return False

-- | Empty every slot of the cache and kill each env found there, using the cache's own
-- 'killer'. Slots are visited in order; an exception thrown by 'killer' ends the traversal.
releaseCache :: Cache env -> IO ()
releaseCache cache = Vector.forM_ (vcache cache) $ \slot -> do
    leftover <- atomically (tryTakeTMVar slot)
    case leftover of
        Nothing  -> return ()
        Just env -> destroy env
  where
    destroy = killer (spawner cache)