summaryrefslogtreecommitdiff
path: root/compiler/GHC/Data/Stream.hs
blob: d41b64b2263a8ecce28d1b459d4c1a07a275151d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE RankNTypes #-}
-- -----------------------------------------------------------------------------
--
-- (c) The University of Glasgow 2012
--
-- -----------------------------------------------------------------------------

-- | Monadic streams
module GHC.Data.Stream (
    Stream(..), StreamS(..), runStream, yield, liftIO,
    collect,  consume, fromList,
    map, mapM, mapAccumL_
  ) where

import GHC.Prelude hiding (map,mapM)

import Control.Monad hiding (mapM)
import Control.Monad.IO.Class

-- |
-- @Stream m a b@ is a computation in some Monad @m@ that delivers a sequence
-- of elements of type @a@ followed by a result of type @b@.
--
-- More concretely, a value of type @Stream m a b@ can be run using @runStreamInternal@
-- in the Monad @m@, and it delivers either
--
--  * the final result: @Done b@, or
--  * @Yield a str@ where @a@ is the next element in the stream, and @str@
--     is the rest of the stream
--  * @Effect mstr@ where @mstr@ is some action running in @m@ which
--  generates the rest of the stream.
--
-- Stream is itself a Monad, and provides an operation 'yield' that
-- produces a new element of the stream.  This makes it convenient to turn
-- existing monadic computations into streams.
--
-- The idea is that Stream is useful for making a monadic computation
-- that produces values from time to time.  This can be used for
-- knitting together two complex monadic operations, so that the
-- producer does not have to produce all its values before the
-- consumer starts consuming them.  We make the producer into a
-- Stream, and the consumer pulls on the stream each time it wants a
-- new value.
--
-- 'Stream' is implemented in the "yoneda" style for efficiency. By
-- representing a stream in this manner 'fmap' and '>>=' operations are
-- accumulated in the function parameters before being applied once when
-- the stream is destroyed. In the old implementation each usage of 'mapM'
-- and '>>=' would traverse the entire stream in order to apply the
-- substitution at the leaves.
--
-- The >>= operation for 'Stream' was a hot-spot in the ticky profile for
-- the "ManyConstructors" test which called the 'cg' function many times in
-- @StgToCmm.hs@
--
newtype Stream m a b =
          Stream { runStreamInternal :: forall r' r .
                                        (a -> m r') -- For fusing calls to `map` and `mapM`
                                     -> (b -> StreamS m r' r)  -- For fusing `>>=`
                                     -> StreamS m r' r }

runStream :: Applicative m => Stream m r' r -> StreamS m r' r
runStream st = runStreamInternal st pure Done

data StreamS m a b = Yield a (StreamS m a b)
                   | Done b
                   | Effect (m (StreamS m a b))
  deriving (Functor)

instance Monad m => Applicative (StreamS m a) where
  pure = Done
  (<*>) = ap

instance Monad m => Monad (StreamS m a) where
  a >>= k = case a of
                      Done r -> k r
                      Yield a s -> Yield a (s >>= k)
                      Effect m -> Effect (fmap (>>= k) m)

instance Functor (Stream f a) where
  fmap = liftM

instance Applicative (Stream m a) where
  pure a = Stream $ \_f g -> g a
  (<*>) = ap

instance Monad (Stream m a) where
  Stream m >>= k = Stream $ \f h -> m f (\a -> runStreamInternal (k a) f h)

instance MonadIO m => MonadIO (Stream m b) where
  liftIO io = Stream $ \_f g -> Effect (g <$> liftIO io)

yield :: Monad m => a -> Stream m a ()
yield a = Stream $ \f rest -> Effect (flip Yield (rest ())  <$> f a)

-- | Turn a Stream into an ordinary list, by demanding all the elements.
collect :: Monad m => Stream m a () -> m [a]
collect str = go [] (runStream str)
 where
  go acc (Done ()) = return (reverse acc)
  go acc (Effect m) = m >>= go acc
  go acc (Yield a k) = go (a:acc) k

consume :: (Monad m, Monad n) => Stream m a b -> (forall a . m a -> n a) -> (a -> n ()) -> n b
consume str l f = go (runStream str)
  where
    go (Done r) = return r
    go (Yield a p) = f a >> go p
    go (Effect m)  = l m >>= go

-- | Turn a list into a 'Stream', by yielding each element in turn.
fromList :: Monad m => [a] -> Stream m a ()
fromList = mapM_ yield

-- | Apply a function to each element of a 'Stream', lazily
map :: Monad m => (a -> b) -> Stream m a x -> Stream m b x
map f str = Stream $ \g h -> runStreamInternal str (g . f) h

-- | Apply a monadic operation to each element of a 'Stream', lazily
mapM :: Monad m => (a -> m b) -> Stream m a x -> Stream m b x
mapM f str = Stream $ \g h -> runStreamInternal str (g <=< f) h

-- | Note this is not very efficient because it traverses the whole stream
-- before rebuilding it, avoid using it if you can. mapAccumL used to
-- implemented but it wasn't used anywhere in the compiler and has similar
-- efficiency problems.
mapAccumL_ :: forall m a b c r . Monad m => (c -> a -> m (c,b)) -> c -> Stream m a r
           -> Stream m b (c, r)
mapAccumL_ f c str = Stream $ \f h -> go c f h (runStream str)

  where
    go :: c
             -> (b -> m r')
             -> ((c, r) -> StreamS m r' r1)
             -> StreamS m a r
             -> StreamS m r' r1
    go c _f1 h1 (Done r) = h1 (c, r)
    go c f1 h1 (Yield a p) = Effect (f c a >>= (\(c', b) -> f1 b
                                           >>= \r' -> return $ Yield r' (go c' f1 h1 p)))
    go c f1 h1 (Effect m) = Effect (go c f1 h1 <$> m)