path: root/testsuite/tests
diff options
authorMatthew Pickering <>2021-06-03 23:04:13 +0100
committerMarge Bot <>2021-06-16 20:19:45 -0400
commit01fd26178f2c7ccbddbd66387e21f339cf9cda96 (patch)
tree5afd5f08cbbaf1eaf3996c12854e94864f8c3a35 /testsuite/tests
parenta2e4cb80db1b63ea2c5e0ab501acec7fb1b116e3 (diff)
profiling: Look in RHS of rules for cost centre ticks
There are some obscure situations where the RHS of a rule can contain a tick which is not mentioned anywhere else in the program. If this happens you end up with an obscure linker error. The solution is quite simple, traverse the RHS of rules to also look for ticks. It turned out to be easier to implement if the traversal was moved into CoreTidy rather than at the start of code generation because there we still had easy access to the rules. ./StreamD.o(.text+0x1b9f2): error: undefined reference to 'StreamK_mkStreamFromStream_HPC_cc' ./MArray.o(.text+0xbe83): error: undefined reference to 'StreamK_mkStreamFromStream_HPC_cc' Main.o(.text+0x6fdb): error: undefined reference to 'StreamK_mkStreamFromStream_HPC_cc'
Diffstat (limited to 'testsuite/tests')
14 files changed, 3773 insertions, 0 deletions
diff --git a/testsuite/tests/profiling/should_compile/T19894/Array.hs b/testsuite/tests/profiling/should_compile/T19894/Array.hs
new file mode 100644
index 0000000000..75cad1fcef
--- /dev/null
+++ b/testsuite/tests/profiling/should_compile/T19894/Array.hs
@@ -0,0 +1,74 @@
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE RecordWildCards #-}
+module Array
+ (
+ Array(..)
+ , fromList
+ , read
+ , length
+ , writeNUnsafe
+ , MA.unsafeInlineIO
+ , MA.memcmp
+ , unsafeFreezeWithShrink
+ , foldl'
+ , unsafeIndexIO
+ )
+import Control.Monad.IO.Class (MonadIO(..))
+import Foreign.Storable (Storable(..))
+import GHC.ForeignPtr (ForeignPtr(..))
+import GHC.IO (unsafePerformIO)
+import GHC.Ptr (Ptr(..))
+import Unfold (Unfold(..))
+import Fold (Fold(..))
+import qualified MArray as MA
+import qualified Unfold as UF
+import Prelude hiding (length, read)
+data Array a =
+ Array
+ { aStart :: {-# UNPACK #-} !(ForeignPtr a) -- first address
+ , aEnd :: {-# UNPACK #-} !(Ptr a) -- first unused addres
+ }
+{-# INLINE unsafeFreeze #-}
+unsafeFreeze :: MA.Array a -> Array a
+unsafeFreeze (MA.Array as ae _) = Array as ae
+{-# INLINABLE fromList #-}
+fromList :: Storable a => [a] -> Array a
+fromList xs = unsafeFreeze $ MA.fromList xs
+{-# INLINE [1] writeNUnsafe #-}
+writeNUnsafe :: forall m a. (MonadIO m, Storable a)
+ => Int -> Fold m a (Array a)
+writeNUnsafe n = unsafeFreeze <$> MA.writeNUnsafe n
+{-# INLINE unsafeThaw #-}
+unsafeThaw :: Array a -> MA.Array a
+unsafeThaw (Array as ae) = MA.Array as ae ae
+{-# INLINE length #-}
+length :: forall a. Storable a => Array a -> Int
+length arr = MA.length (unsafeThaw arr)
+{-# INLINE [1] read #-}
+read :: forall m a. (Monad m, Storable a) => Unfold m (Array a) a
+read = UF.lmap unsafeThaw
+{-# INLINE unsafeFreezeWithShrink #-}
+unsafeFreezeWithShrink :: Storable a => MA.Array a -> Array a
+unsafeFreezeWithShrink arr = unsafePerformIO $ do
+ MA.Array as ae _ <- MA.shrinkToFit arr
+ return $ Array as ae
+{-# INLINE [1] foldl' #-}
+foldl' :: forall a b. Storable a => (b -> a -> b) -> b -> Array a -> b
+foldl' f z arr = MA.foldl' f z (unsafeThaw arr)
+{-# INLINE [1] unsafeIndexIO #-}
+unsafeIndexIO :: forall a. Storable a => Array a -> Int -> IO a
+unsafeIndexIO arr = MA.unsafeIndexIO (unsafeThaw arr)
diff --git a/testsuite/tests/profiling/should_compile/T19894/Fold.hs b/testsuite/tests/profiling/should_compile/T19894/Fold.hs
new file mode 100644
index 0000000000..7683b064f4
--- /dev/null
+++ b/testsuite/tests/profiling/should_compile/T19894/Fold.hs
@@ -0,0 +1,277 @@
+{-# LANGUAGE ExistentialQuantification #-}
+module Fold
+ (
+ -- * Types
+ Step(..)
+ , Fold (..)
+ , sum
+ , chunksOf
+ , drain
+ , drainBy
+ )
+import Data.Bifunctor (Bifunctor(..))
+#if defined(FUSION_PLUGIN)
+import Fusion.Plugin.Types (Fuse(..))
+import Prelude hiding (sum, take)
+-- Step of a fold
+-- The Step functor around b allows expressing early termination like a right
+-- fold. Traditional list right folds use function composition and laziness to
+-- terminate early whereas we use data constructors. It allows stream fusion in
+-- contrast to the foldr/build fusion when composing with functions.
+-- | Represents the result of the @step@ of a 'Fold'. 'Partial' returns an
+-- intermediate state of the fold, the fold step can be called again with the
+-- state or the driver can use @extract@ on the state to get the result out.
+-- 'Done' returns the final result and the fold cannot be driven further.
+-- /Pre-release/
+#if defined(FUSION_PLUGIN)
+{-# ANN type Step Fuse #-}
+data Step s b
+ = Partial !s
+ | Done !b
+-- | 'first' maps over 'Partial' and 'second' maps over 'Done'.
+instance Bifunctor Step where
+ {-# INLINE bimap #-}
+ bimap f _ (Partial a) = Partial (f a)
+ bimap _ g (Done b) = Done (g b)
+ {-# INLINE first #-}
+ first f (Partial a) = Partial (f a)
+ first _ (Done x) = Done x
+ {-# INLINE second #-}
+ second _ (Partial x) = Partial x
+ second f (Done a) = Done (f a)
+-- | 'fmap' maps over 'Done'.
+-- @
+-- fmap = 'second'
+-- @
+instance Functor (Step s) where
+ {-# INLINE fmap #-}
+ fmap = second
+-- | Map a monadic function over the result @b@ in @Step s b@.
+-- /Internal/
+{-# INLINE mapMStep #-}
+mapMStep :: Applicative m => (a -> m b) -> Step s a -> m (Step s b)
+mapMStep f res =
+ case res of
+ Partial s -> pure $ Partial s
+ Done b -> Done <$> f b
+-- The Fold type
+-- | The type @Fold m a b@ having constructor @Fold step initial extract@
+-- represents a fold over an input stream of values of type @a@ to a final
+-- value of type @b@ in 'Monad' @m@.
+-- The fold uses an intermediate state @s@ as accumulator, the type @s@ is
+-- internal to the specific fold definition. The initial value of the fold
+-- state @s@ is returned by @initial@. The @step@ function consumes an input
+-- and either returns the final result @b@ if the fold is done or the next
+-- intermediate state (see 'Step'). At any point the fold driver can extract
+-- the result from the intermediate state using the @extract@ function.
+-- NOTE: The constructor is not yet exposed via exposed modules, smart
+-- constructors are provided to create folds. If you think you need the
+-- constructor of this type please consider using the smart constructors in
+-- "Streamly.Internal.Data.Fold' instead.
+-- /since 0.8.0 (type changed)/
+-- @since 0.7.0
+data Fold m a b =
+ -- | @Fold @ @ step @ @ initial @ @ extract@
+ forall s. Fold (s -> a -> m (Step s b)) (m (Step s b)) (s -> m b)
+instance Functor m => Functor (Fold m a) where
+ {-# INLINE fmap #-}
+ fmap f (Fold step1 initial1 extract) = Fold step initial (fmap2 f extract)
+ where
+ initial = fmap2 f initial1
+ step s b = fmap2 f (step1 s b)
+ fmap2 g = fmap (fmap g)
+{-# INLINABLE lmapM #-}
+lmapM :: Monad m => (a -> m b) -> Fold m b r -> Fold m a r
+lmapM f (Fold step begin done) = Fold step' begin done
+ where
+ step' x a = f a >>= step x
+-- | Make a fold from a left fold style pure step function and initial value of
+-- the accumulator.
+-- If your 'Fold' returns only 'Partial' (i.e. never returns a 'Done') then you
+-- can use @foldl'*@ constructors.
+-- A fold with an extract function can be expressed using fmap:
+-- @
+-- mkfoldlx :: Monad m => (s -> a -> s) -> s -> (s -> b) -> Fold m a b
+-- mkfoldlx step initial extract = fmap extract (foldl' step initial)
+-- @
+-- See also: "Streamly.Prelude.foldl'"
+-- @since 0.8.0
+{-# INLINE foldl' #-}
+foldl' :: Monad m => (b -> a -> b) -> b -> Fold m a b
+foldl' step initial =
+ Fold
+ (\s a -> return $ Partial $ step s a)
+ (return (Partial initial))
+ return
+-- | Determine the sum of all elements of a stream of numbers. Returns additive
+-- identity (@0@) when the stream is empty. Note that this is not numerically
+-- stable for floating point numbers.
+-- > sum = fmap getSum $ Fold.foldMap Sum
+-- @since 0.7.0
+{-# INLINE sum #-}
+sum :: (Monad m, Num a) => Fold m a a
+sum = foldl' (+) 0
+data Tuple' a b = Tuple' !a !b deriving Show
+{-# INLINE take #-}
+take :: Monad m => Int -> Fold m a b -> Fold m a b
+take n (Fold fstep finitial fextract) = Fold step initial extract
+ where
+ initial = do
+ res <- finitial
+ case res of
+ Partial s ->
+ if n > 0
+ then return $ Partial $ Tuple' 0 s
+ else Done <$> fextract s
+ Done b -> return $ Done b
+ step (Tuple' i r) a = do
+ res <- fstep r a
+ case res of
+ Partial sres -> do
+ let i1 = i + 1
+ s1 = Tuple' i1 sres
+ if i1 < n
+ then return $ Partial s1
+ else Done <$> fextract sres
+ Done bres -> return $ Done bres
+ extract (Tuple' _ r) = fextract r
+#if defined(FUSION_PLUGIN)
+{-# ANN type ManyState Fuse #-}
+data ManyState s1 s2
+ = ManyFirst !s1 !s2
+ | ManyLoop !s1 !s2
+-- | Collect zero or more applications of a fold. @many split collect@ applies
+-- the @split@ fold repeatedly on the input stream and accumulates zero or more
+-- fold results using @collect@.
+-- >>> two = Fold.take 2 Fold.toList
+-- >>> twos = Fold.many two Fold.toList
+-- >>> Stream.fold twos $ Stream.fromList [1..10]
+-- [[1,2],[3,4],[5,6],[7,8],[9,10]]
+-- Stops when @collect@ stops.
+-- See also: "Streamly.Prelude.concatMap", "Streamly.Prelude.foldMany"
+-- @since 0.8.0
+{-# INLINE many #-}
+many :: Monad m => Fold m a b -> Fold m b c -> Fold m a c
+many (Fold sstep sinitial sextract) (Fold cstep cinitial cextract) =
+ Fold step initial extract
+ where
+ -- cs = collect state
+ -- ss = split state
+ -- cres = collect state result
+ -- sres = split state result
+ -- cb = collect done
+ -- sb = split done
+ -- Caution! There is mutual recursion here, inlining the right functions is
+ -- important.
+ {-# INLINE handleSplitStep #-}
+ handleSplitStep branch cs sres =
+ case sres of
+ Partial ss1 -> return $ Partial $ branch ss1 cs
+ Done sb -> runCollector ManyFirst cs sb
+ {-# INLINE handleCollectStep #-}
+ handleCollectStep branch cres =
+ case cres of
+ Partial cs -> do
+ sres <- sinitial
+ handleSplitStep branch cs sres
+ Done cb -> return $ Done cb
+ -- Do not inline this
+ runCollector branch cs sb = cstep cs sb >>= handleCollectStep branch
+ initial = cinitial >>= handleCollectStep ManyFirst
+ {-# INLINE step_ #-}
+ step_ ss cs a = do
+ sres <- sstep ss a
+ handleSplitStep ManyLoop cs sres
+ {-# INLINE step #-}
+ step (ManyFirst ss cs) a = step_ ss cs a
+ step (ManyLoop ss cs) a = step_ ss cs a
+ extract (ManyFirst _ cs) = cextract cs
+ extract (ManyLoop ss cs) = do
+ sb <- sextract ss
+ cres <- cstep cs sb
+ case cres of
+ Partial s -> cextract s
+ Done b -> return b
+{-# INLINE chunksOf #-}
+chunksOf :: Monad m => Int -> Fold m a b -> Fold m b c -> Fold m a c
+chunksOf n split = many (take n split)
+{-# INLINABLE drain #-}
+drain :: Monad m => Fold m a ()
+drain = foldl' (\_ _ -> ()) ()
+{-# INLINABLE drainBy #-}
+drainBy :: Monad m => (a -> m b) -> Fold m a ()
+drainBy f = lmapM f drain
diff --git a/testsuite/tests/profiling/should_compile/T19894/Handle.hs b/testsuite/tests/profiling/should_compile/T19894/Handle.hs
new file mode 100644
index 0000000000..d1222312fd
--- /dev/null
+++ b/testsuite/tests/profiling/should_compile/T19894/Handle.hs
@@ -0,0 +1,80 @@
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE RecordWildCards #-}
+module Handle
+ (
+ write
+ , read
+ )
+import Control.Monad.IO.Class (MonadIO(..))
+import Data.Word (Word8)
+import Foreign.Storable (Storable(..))
+import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
+import Foreign.ForeignPtr (withForeignPtr)
+import Foreign.Ptr (plusPtr, minusPtr)
+import GHC.ForeignPtr (mallocPlainForeignPtrBytes)
+import System.IO (Handle, hGetBufSome, hPutBuf)
+import Unfold (Unfold(..))
+import Fold (Fold(..))
+import Array (Array(..))
+import qualified MArray as MA
+import qualified Fold as FL
+import qualified Unfold as UF
+import qualified StreamD as D
+import qualified Array as A
+import Prelude hiding (length, read)
+{-# INLINABLE writeArray #-}
+writeArray :: Storable a => Handle -> Array a -> IO ()
+writeArray _ arr | A.length arr == 0 = return ()
+writeArray h Array{..} = withForeignPtr aStart $ \p -> hPutBuf h p aLen
+ where
+ aLen =
+ let p = unsafeForeignPtrToPtr aStart
+ in aEnd `minusPtr` p
+{-# INLINE writeChunks #-}
+writeChunks :: (MonadIO m, Storable a) => Handle -> Fold m (Array a) ()
+writeChunks h = FL.drainBy (liftIO . writeArray h)
+{-# INLINE writeWithBufferOf #-}
+writeWithBufferOf :: MonadIO m => Int -> Handle -> Fold m Word8 ()
+writeWithBufferOf n h = FL.chunksOf n (A.writeNUnsafe n) (writeChunks h)
+{-# INLINE write #-}
+write :: MonadIO m => Handle -> Fold m Word8 ()
+write = writeWithBufferOf MA.defaultChunkSize
+{-# INLINABLE readArrayUpto #-}
+readArrayUpto :: Int -> Handle -> IO (Array Word8)
+readArrayUpto size h = do
+ ptr <- mallocPlainForeignPtrBytes size
+ -- ptr <- mallocPlainForeignPtrAlignedBytes size (alignment (undefined :: Word8))
+ withForeignPtr ptr $ \p -> do
+ n <- hGetBufSome h p size
+ -- XXX shrink only if the diff is significant
+ return $
+ A.unsafeFreezeWithShrink $
+ MA.mutableArray ptr (p `plusPtr` n) (p `plusPtr` size)
+{-# INLINE [1] readChunksWithBufferOf #-}
+readChunksWithBufferOf :: MonadIO m => Unfold m (Int, Handle) (Array Word8)
+readChunksWithBufferOf = Unfold step return
+ where
+ {-# INLINE [0] step #-}
+ step (size, h) = do
+ arr <- liftIO $ readArrayUpto size h
+ return $
+ case A.length arr of
+ 0 -> D.Stop
+ _ -> D.Yield arr (size, h)
+{-# INLINE readWithBufferOf #-}
+readWithBufferOf :: MonadIO m => Unfold m (Int, Handle) Word8
+readWithBufferOf = UF.many readChunksWithBufferOf
+{-# INLINE read #-}
+read :: MonadIO m => Unfold m Handle Word8
+read = UF.supplyFirst MA.defaultChunkSize readWithBufferOf
diff --git a/testsuite/tests/profiling/should_compile/T19894/MArray.hs b/testsuite/tests/profiling/should_compile/T19894/MArray.hs
new file mode 100644
index 0000000000..f307bb344d
--- /dev/null
+++ b/testsuite/tests/profiling/should_compile/T19894/MArray.hs
@@ -0,0 +1,372 @@
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE MagicHash #-}
+{-# LANGUAGE UnboxedTuples #-}
+{-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE ExistentialQuantification #-}
+module MArray
+ (
+ Array (..)
+ , writeNUnsafe
+ , length
+ , fromList
+ , fromListN
+ , defaultChunkSize
+ , read
+ , shrinkToFit
+ , mutableArray
+ , unsafeInlineIO
+ , memcmp
+ , foldl'
+ , unsafeIndexIO
+ )
+import Control.Exception (assert)
+import Control.Monad (when, void)
+import Data.Functor.Identity (runIdentity)
+import Data.Word (Word8)
+import Foreign.C.Types (CSize(..), CInt(..))
+import Fold (Fold(..))
+import Control.Monad.IO.Class (MonadIO(..))
+import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
+import Foreign.ForeignPtr (withForeignPtr, touchForeignPtr)
+import Foreign.Ptr (plusPtr, minusPtr, castPtr)
+import Foreign.Storable (Storable(..))
+import GHC.Base (realWorld#)
+import GHC.ForeignPtr (ForeignPtr(..))
+import GHC.IO (IO(IO), unsafePerformIO)
+import GHC.Ptr (Ptr(..))
+import Step (Step(..))
+import Unfold (Unfold(..))
+import qualified GHC.ForeignPtr as GHC
+import qualified Fold as FL
+import qualified StreamD as D
+import qualified StreamK as K
+import Prelude hiding (length, read)
+data Array a =
+ Array
+ { aStart :: {-# UNPACK #-} !(ForeignPtr a) -- ^ first address
+ , aEnd :: {-# UNPACK #-} !(Ptr a) -- ^ first unused address
+ , aBound :: {-# UNPACK #-} !(Ptr a) -- ^ first address beyond allocated memory
+ }
+{-# INLINE mutableArray #-}
+mutableArray ::
+ ForeignPtr a -> Ptr a -> Ptr a -> Array a
+mutableArray = Array
+data ArrayUnsafe a = ArrayUnsafe
+ {-# UNPACK #-} !(ForeignPtr a) -- first address
+ {-# UNPACK #-} !(Ptr a) -- first unused address
+-- | allocate a new array using the provided allocator function.
+{-# INLINE newArrayAlignedAllocWith #-}
+newArrayAlignedAllocWith :: forall a. Storable a
+ => (Int -> Int -> IO (ForeignPtr a)) -> Int -> Int -> IO (Array a)
+newArrayAlignedAllocWith alloc alignSize count = do
+ let size = count * sizeOf (undefined :: a)
+ fptr <- alloc size alignSize
+ let p = unsafeForeignPtrToPtr fptr
+ return $ Array
+ { aStart = fptr
+ , aEnd = p
+ , aBound = p `plusPtr` size
+ }
+{-# INLINE mallocForeignPtrAlignedBytes #-}
+mallocForeignPtrAlignedBytes :: Int -> Int -> IO (GHC.ForeignPtr a)
+mallocForeignPtrAlignedBytes =
+ GHC.mallocPlainForeignPtrAlignedBytes
+{-# INLINE newArrayAligned #-}
+newArrayAligned :: forall a. Storable a => Int -> Int -> IO (Array a)
+newArrayAligned = newArrayAlignedAllocWith mallocForeignPtrAlignedBytes
+{-# INLINE newArray #-}
+newArray :: forall a. Storable a => Int -> IO (Array a)
+newArray = newArrayAligned (alignment (undefined :: a))
+-- | Like 'writeN' but does not check the array bounds when writing. The fold
+-- driver must not call the step function more than 'n' times otherwise it will
+-- corrupt the memory and crash. This function exists mainly because any
+-- conditional in the step function blocks fusion causing 10x performance
+-- slowdown.
+-- @since 0.7.0
+{-# INLINE [1] writeNUnsafe #-}
+writeNUnsafe :: forall m a. (MonadIO m, Storable a)
+ => Int -> Fold m a (Array a)
+writeNUnsafe n = Fold step initial extract
+ where
+ initial = do
+ (Array start end _) <- liftIO $ newArray (max n 0)
+ return $ FL.Partial $ ArrayUnsafe start end
+ step (ArrayUnsafe start end) x = do
+ liftIO $ poke end x
+ return
+ $ FL.Partial
+ $ ArrayUnsafe start (end `plusPtr` sizeOf (undefined :: a))
+ extract (ArrayUnsafe start end) = return $ Array start end end -- liftIO . shrinkToFit
+{-# INLINE byteLength #-}
+byteLength :: Array a -> Int
+byteLength Array{..} =
+ let p = unsafeForeignPtrToPtr aStart
+ len = aEnd `minusPtr` p
+ in assert (len >= 0) len
+-- | /O(1)/ Get the length of the array i.e. the number of elements in the
+-- array.
+-- @since 0.7.0
+{-# INLINE length #-}
+length :: forall a. Storable a => Array a -> Int
+length arr = byteLength arr `div` sizeOf (undefined :: a)
+{-# INLINE [1] fromStreamDN #-}
+fromStreamDN :: forall m a. (MonadIO m, Storable a)
+ => Int -> D.Stream m a -> m (Array a)
+fromStreamDN limit str = do
+ arr <- liftIO $ newArray limit
+ end <- D.foldlM' fwrite (return $ aEnd arr) $ D.take limit str
+ return $ arr {aEnd = end}
+ where
+ fwrite ptr x = do
+ liftIO $ poke ptr x
+ return $ ptr `plusPtr` sizeOf (undefined :: a)
+{-# INLINABLE fromListN #-}
+fromListN :: Storable a => Int -> [a] -> Array a
+fromListN n xs = unsafePerformIO $ fromStreamDN n $ D.fromList xs
+data GroupState s start end bound
+ = GroupStart s
+ | GroupBuffer s start end bound
+ | GroupYield start end bound (GroupState s start end bound)
+ | GroupFinish
+-- | @arraysOf n stream@ groups the input stream into a stream of
+-- arrays of size n.
+-- @arraysOf n = StreamD.foldMany (Array.writeN n)@
+-- /Pre-release/
+{-# INLINE [1] arraysOf #-}
+arraysOf :: forall m a. (MonadIO m, Storable a)
+ => Int -> D.Stream m a -> D.Stream m (Array a)
+-- XXX the idiomatic implementation leads to large regression in the D.reverse'
+-- benchmark. It seems it has difficulty producing optimized code when
+-- converting to StreamK. Investigate GHC optimizations.
+-- arraysOf n = D.foldMany (writeN n)
+arraysOf n (D.Stream step state) =
+ D.Stream step' (GroupStart state)
+ where
+ {-# INLINE [0] step' #-}
+ step' _ (GroupStart st) = do
+ when (n <= 0) $
+ -- XXX we can pass the module string from the higher level API
+ error $ "Streamly.Internal.Data.Array.Foreign.Mut.Type.fromStreamDArraysOf: the size of "
+ ++ "arrays [" ++ show n ++ "] must be a natural number"
+ Array start end bound <- liftIO $ newArray n
+ return $ D.Skip (GroupBuffer st start end bound)
+ step' gst (GroupBuffer st start end bound) = do
+ r <- step (K.adaptState gst) st
+ case r of
+ D.Yield x s -> do
+ liftIO $ poke end x
+ let end' = end `plusPtr` sizeOf (undefined :: a)
+ return $
+ if end' >= bound
+ then D.Skip (GroupYield start end' bound (GroupStart s))
+ else D.Skip (GroupBuffer s start end' bound)
+ D.Skip s -> return $ D.Skip (GroupBuffer s start end bound)
+ D.Stop -> return $ D.Skip (GroupYield start end bound GroupFinish)
+ step' _ (GroupYield start end bound next) =
+ return $ D.Yield (Array start end bound) next
+ step' _ GroupFinish = return D.Stop
+allocOverhead :: Int
+allocOverhead = 2 * sizeOf (undefined :: Int)
+mkChunkSize :: Int -> Int
+mkChunkSize n = let size = n - allocOverhead in max size 0
+mkChunkSizeKB :: Int -> Int
+mkChunkSizeKB n = mkChunkSize (n * k)
+ where k = 1024
+defaultChunkSize :: Int
+defaultChunkSize = mkChunkSizeKB 32
+{-# INLINE bufferChunks #-}
+bufferChunks :: (MonadIO m, Storable a) =>
+ D.Stream m a -> m (K.Stream m (Array a))
+bufferChunks m = D.foldr K.cons K.nil $ arraysOf defaultChunkSize m
+data Producer m a b =
+ -- | @Producer step inject extract@
+ forall s. Producer (s -> m (Step s b)) (a -> m s) (s -> m a)
+{-# INLINE unsafeInlineIO #-}
+unsafeInlineIO :: IO a -> a
+unsafeInlineIO (IO m) = case m realWorld# of (# _, r #) -> r
+data ReadUState a = ReadUState
+ {-# UNPACK #-} !(ForeignPtr a) -- foreign ptr with end of array pointer
+ {-# UNPACK #-} !(Ptr a) -- current pointer
+-- | Resumable unfold of an array.
+{-# INLINE [1] producer #-}
+producer :: forall m a. (Monad m, Storable a) => Producer m (Array a) a
+producer = Producer step inject extract
+ where
+ inject (Array (ForeignPtr start contents) (Ptr end) _) =
+ return $ ReadUState (ForeignPtr end contents) (Ptr start)
+ {-# INLINE [0] step #-}
+ step (ReadUState fp@(ForeignPtr end _) p) | p == Ptr end =
+ let x = unsafeInlineIO $ touchForeignPtr fp
+ in x `seq` return D.Stop
+ step (ReadUState fp p) = do
+ -- unsafeInlineIO allows us to run this in Identity monad for pure
+ -- toList/foldr case which makes them much faster due to not
+ -- accumulating the list and fusing better with the pure consumers.
+ --
+ -- This should be safe as the array contents are guaranteed to be
+ -- evaluated/written to before we peek at them.
+ let !x = unsafeInlineIO $ peek p
+ return $ D.Yield x
+ (ReadUState fp (p `plusPtr` sizeOf (undefined :: a)))
+ extract (ReadUState (ForeignPtr end contents) (Ptr p)) =
+ return $ Array (ForeignPtr p contents) (Ptr end) (Ptr end)
+{-# INLINE simplify #-}
+simplify :: Producer m a b -> Unfold m a b
+simplify (Producer step inject _) = Unfold step inject
+-- | Unfold an array into a stream.
+-- @since 0.7.0
+{-# INLINE [1] read #-}
+read :: forall m a. (Monad m, Storable a) => Unfold m (Array a) a
+read = simplify producer
+{-# INLINE fromStreamD #-}
+fromStreamD :: (MonadIO m, Storable a) => D.Stream m a -> m (Array a)
+fromStreamD m = do
+ buffered <- bufferChunks m
+ len <- K.foldl' (+) 0 ( length buffered)
+ fromStreamDN len $ D.unfoldMany read $ D.fromStreamK buffered
+-- | Create an 'Array' from a list. The list must be of finite size.
+-- @since 0.7.0
+{-# INLINABLE fromList #-}
+fromList :: Storable a => [a] -> Array a
+fromList xs = unsafePerformIO $ fromStreamD $ D.fromList xs
+foreign import ccall unsafe "string.h memcpy" c_memcpy
+ :: Ptr Word8 -> Ptr Word8 -> CSize -> IO (Ptr Word8)
+-- XXX we are converting Int to CSize
+memcpy :: Ptr Word8 -> Ptr Word8 -> Int -> IO ()
+memcpy dst src len = void (c_memcpy dst src (fromIntegral len))
+foreign import ccall unsafe "string.h memcmp" c_memcmp
+ :: Ptr Word8 -> Ptr Word8 -> CSize -> IO CInt
+-- XXX we are converting Int to CSize
+-- return True if the memory locations have identical contents
+{-# INLINE memcmp #-}
+memcmp :: Ptr Word8 -> Ptr Word8 -> Int -> IO Bool
+memcmp p1 p2 len = do
+ r <- c_memcmp p1 p2 (fromIntegral len)
+ return $ r == 0
+{-# NOINLINE reallocAligned #-}
+reallocAligned :: Int -> Int -> Array a -> IO (Array a)
+reallocAligned alignSize newSize Array{..} = do
+ assert (aEnd <= aBound) (return ())
+ let oldStart = unsafeForeignPtrToPtr aStart
+ let size = aEnd `minusPtr` oldStart
+ newPtr <- mallocForeignPtrAlignedBytes newSize alignSize
+ withForeignPtr newPtr $ \pNew -> do
+ memcpy (castPtr pNew) (castPtr oldStart) size
+ touchForeignPtr aStart
+ return $ Array
+ { aStart = newPtr
+ , aEnd = pNew `plusPtr` size
+ , aBound = pNew `plusPtr` newSize
+ }
+-- XXX can unaligned allocation be more efficient when alignment is not needed?
+{-# INLINABLE realloc #-}
+realloc :: forall a. Storable a => Int -> Array a -> IO (Array a)
+realloc = reallocAligned (alignment (undefined :: a))
+shrinkToFit :: forall a. Storable a => Array a -> IO (Array a)
+shrinkToFit arr@Array{..} = do
+ assert (aEnd <= aBound) (return ())
+ let start = unsafeForeignPtrToPtr aStart
+ let used = aEnd `minusPtr` start
+ waste = aBound `minusPtr` aEnd
+ -- if used == waste == 0 then do not realloc
+ -- if the wastage is more than 25% of the array then realloc
+ if used < 3 * waste
+ then realloc used arr
+ else return arr
+{-# INLINE [1] toStreamD #-}
+toStreamD :: forall m a. (Monad m, Storable a) => Array a -> D.Stream m a
+toStreamD Array{..} =
+ let p = unsafeForeignPtrToPtr aStart
+ in D.Stream step p
+ where
+ {-# INLINE [0] step #-}
+ step _ p | p == aEnd = return D.Stop
+ step _ p = do
+ -- unsafeInlineIO allows us to run this in Identity monad for pure
+ -- toList/foldr case which makes them much faster due to not
+ -- accumulating the list and fusing better with the pure consumers.
+ --
+ -- This should be safe as the array contents are guaranteed to be
+ -- evaluated/written to before we peek at them.
+ let !x = unsafeInlineIO $ do
+ r <- peek p
+ touchForeignPtr aStart
+ return r
+ return $ D.Yield x (p `plusPtr` sizeOf (undefined :: a))
+{-# INLINE [1] foldl' #-}
+foldl' :: forall a b. Storable a => (b -> a -> b) -> b -> Array a -> b
+foldl' f z arr = runIdentity $ D.foldl' f z $ toStreamD arr
+{-# INLINE [1] unsafeIndexIO #-}
+unsafeIndexIO :: forall a. Storable a => Array a -> Int -> IO a
+unsafeIndexIO Array {..} i =
+ withForeignPtr aStart $ \p -> do
+ let elemSize = sizeOf (undefined :: a)
+ elemOff = p `plusPtr` (elemSize * i)
+ assert (i >= 0 && elemOff `plusPtr` elemSize <= aEnd)
+ (return ())
+ peek elemOff
diff --git a/testsuite/tests/profiling/should_compile/T19894/Main.hs b/testsuite/tests/profiling/should_compile/T19894/Main.hs
new file mode 100644
index 0000000000..0195d14407
--- /dev/null
+++ b/testsuite/tests/profiling/should_compile/T19894/Main.hs
@@ -0,0 +1,26 @@
+module Main (main) where
+import Data.Char (ord)
+import Data.Word (Word8)
+import System.IO (openFile, IOMode(..), Handle)
+import StreamK (IsStream, MonadAsync)
+import qualified Operations
+import qualified StreamK
+import qualified Fold
+import qualified Handle
+import qualified Array
+import Array (Array)
+toarr :: String -> Array Word8
+toarr = Array.fromList . map (fromIntegral . ord)
+-- | Split on a word8 sequence.
+splitOnSeq :: String -> Handle -> IO ()
+splitOnSeq str inh =
+ Operations.drain $ Operations.splitOnSeq (toarr str) Fold.drain
+ $ Operations.unfold inh
+main :: IO ()
+main = do
+ inh <- openFile "input.txt" ReadMode
+ splitOnSeq "aa" inh
diff --git a/testsuite/tests/profiling/should_compile/T19894/Operations.hs b/testsuite/tests/profiling/should_compile/T19894/Operations.hs
new file mode 100644
index 0000000000..b17298558a
--- /dev/null
+++ b/testsuite/tests/profiling/should_compile/T19894/Operations.hs
@@ -0,0 +1,221 @@
+{-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+module Operations (unfoldrM, drain, postscan, after_, replicate, fold, unfold,
+ splitOnSeq) where
+import Control.Monad.IO.Class (MonadIO(..))
+import Data.Bits (shiftR, shiftL, (.|.), (.&.))
+import Data.Word (Word, Word32)
+import Fold (Fold(..))
+import Foreign.Storable (Storable(..))
+import GHC.Types (SPEC(..))
+import Array (Array)
+import StreamK (IsStream, MonadAsync, adaptState)
+import Step (Step(..))
+import Unfold (Unfold)
+#if defined(FUSION_PLUGIN)
+import Fusion.Plugin.Types (Fuse(..))
+import qualified StreamK as K
+import qualified StreamD as D
+import qualified Serial
+import qualified Fold as FL
+import qualified Array as A
+import qualified Ring as RB
+import Prelude hiding (replicate)
+{-# INLINE [2] unfoldrM #-}
+unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a
+unfoldrM = K.unfoldrM
+{-# RULES "unfoldrM serial" unfoldrM = unfoldrMSerial #-}
+{-# INLINE [2] unfoldrMSerial #-}
+unfoldrMSerial :: MonadAsync m => (b -> m (Maybe (a, b))) -> b -> K.Stream m a
+unfoldrMSerial = Serial.unfoldrM
+{-# INLINE [2] drain #-}
+drain :: (Monad m) => K.Stream m a -> m ()
+drain m = D.drain $ D.fromStreamK (K.toStream m)
+{-# RULES "drain fallback to CPS" [1]
+ forall a. D.drain (D.fromStreamK a) = K.drain a #-}
+{-# INLINE [1] postscan #-}
+postscan :: (IsStream t, Monad m)
+ => Fold m a b -> t m a -> t m b
+postscan fld m =
+ D.fromStreamD $ D.postscanOnce fld $ D.toStreamD m
+{-# INLINE [1] replicate #-}
+replicate :: (IsStream t, Monad m) => Int -> a -> t m a
+replicate n = D.fromStreamD . D.replicate n
+{-# INLINE fold_ #-}
+fold_ :: Monad m => Fold m a b -> K.Stream m a -> m (b, K.Stream m a)
+fold_ fl strm = do
+ (b, str) <- D.fold_ fl $ D.toStreamD strm
+ return $! (b, D.fromStreamD str)
+{-# INLINE fold #-}
+fold :: Monad m => Fold m a b -> K.Stream m a -> m b
+fold fl strm = do
+ (b, _) <- fold_ fl strm
+ return $! b
+{-# INLINE after_ #-}
+after_ :: (IsStream t, Monad m) => m b -> t m a -> t m a
+after_ action xs = D.fromStreamD $ D.after_ action $ D.toStreamD xs
+{-# INLINE unfold #-}
+unfold :: (IsStream t, Monad m) => Unfold m a b -> a -> t m b
+unfold unf x = D.fromStreamD $ D.unfold unf x
+#if defined(FUSION_PLUGIN)
+{-# ANN type SplitOnSeqState Fuse #-}
+data SplitOnSeqState rb rh ck w fs s b x =
+ SplitOnSeqInit
+ | SplitOnSeqYield b (SplitOnSeqState rb rh ck w fs s b x)
+ | SplitOnSeqDone
+ | SplitOnSeqWordInit !fs s
+ | SplitOnSeqWordLoop !w s !fs
+ | SplitOnSeqWordDone Int !fs !w
+ | SplitOnSeqReinit (fs -> SplitOnSeqState rb rh ck w fs s b x)
+{-# INLINE [1] splitOnSeqD #-}
+ :: forall m a b. (MonadIO m, Storable a, Enum a, Eq a)
+ => Array a
+ -> Fold m a b
+ -> D.Stream m a
+ -> D.Stream m b
+splitOnSeqD patArr (Fold fstep initial done) (D.Stream step state) =
+ D.Stream stepOuter SplitOnSeqInit
+ where
+ patLen = A.length patArr
+ maxIndex = patLen - 1
+ elemBits = sizeOf (undefined :: a) * 8
+ -- For word pattern case
+ wordMask :: Word
+ wordMask = (1 `shiftL` (elemBits * patLen)) - 1
+ elemMask :: Word
+ elemMask = (1 `shiftL` elemBits) - 1
+ wordPat :: Word
+ wordPat = wordMask .&. A.foldl' addToWord 0 patArr
+ addToWord wd a = (wd `shiftL` elemBits) .|. fromIntegral (fromEnum a)
+ skip = return . Skip
+ nextAfterInit nextGen stepRes =
+ case stepRes of
+ FL.Partial s -> nextGen s
+ FL.Done b -> SplitOnSeqYield b (SplitOnSeqReinit nextGen)
+ {-# INLINE yieldProceed #-}
+ yieldProceed nextGen fs =
+ initial >>= skip . SplitOnSeqYield fs . nextAfterInit nextGen
+ {-# INLINE [0] stepOuter #-}
+ stepOuter _ SplitOnSeqInit = do
+ res <- initial
+ case res of
+ FL.Partial acc -> return $ Skip $ SplitOnSeqWordInit acc state
+ FL.Done b -> skip $ SplitOnSeqYield b SplitOnSeqInit
+ stepOuter _ (SplitOnSeqYield x next) = return $ Yield x next
+ ---------------------------
+ -- Checkpoint
+ ---------------------------
+ stepOuter _ (SplitOnSeqReinit nextGen) =
+ initial >>= skip . nextAfterInit nextGen
+ -----------------
+ -- Done
+ -----------------
+ stepOuter _ SplitOnSeqDone = return Stop
+ ---------------------------
+ -- Short Pattern - Shift Or
+ ---------------------------
+ stepOuter _ (SplitOnSeqWordDone 0 fs _) = do
+ r <- done fs
+ skip $ SplitOnSeqYield r SplitOnSeqDone
+ stepOuter _ (SplitOnSeqWordDone n fs wrd) = do
+ let old = elemMask .&. (wrd `shiftR` (elemBits * (n - 1)))
+ r <- fstep fs (toEnum $ fromIntegral old)
+ case r of
+ FL.Partial fs1 -> skip $ SplitOnSeqWordDone (n - 1) fs1 wrd
+ FL.Done b -> do
+ let jump c = SplitOnSeqWordDone (n - 1) c wrd
+ yieldProceed jump b
+ stepOuter gst (SplitOnSeqWordInit fs st0) =
+ go SPEC 0 0 st0
+ where
+ {-# INLINE go #-}
+ go !_ !idx !wrd !st = do
+ res <- step (adaptState gst) st
+ case res of
+ Yield x s -> do
+ let wrd1 = addToWord wrd x
+ if idx == maxIndex
+ then do
+ if wrd1 .&. wordMask == wordPat
+ then do
+ let jump c = SplitOnSeqWordInit c s
+ done fs >>= yieldProceed jump
+ else skip $ SplitOnSeqWordLoop wrd1 s fs
+ else go SPEC (idx + 1) wrd1 s
+ Skip s -> go SPEC idx wrd s
+ Stop -> do
+ if idx /= 0
+ then skip $ SplitOnSeqWordDone idx fs wrd
+ else do
+ r <- done fs
+ skip $ SplitOnSeqYield r SplitOnSeqDone
+ stepOuter gst (SplitOnSeqWordLoop wrd0 st0 fs0) =
+ go SPEC wrd0 st0 fs0
+ where
+ {-# INLINE go #-}
+ go !_ !wrd !st !fs = do
+ res <- step (adaptState gst) st
+ case res of
+ Yield x s -> do
+ let jump c = SplitOnSeqWordInit c s
+ wrd1 = addToWord wrd x
+ old = (wordMask .&. wrd)
+ `shiftR` (elemBits * (patLen - 1))
+ r <- fstep fs (toEnum $ fromIntegral old)
+ case r of
+ FL.Partial fs1 -> do
+ if wrd1 .&. wordMask == wordPat
+ then done fs1 >>= yieldProceed jump
+ else go SPEC wrd1 s fs1
+ FL.Done b -> yieldProceed jump b
+ Skip s -> go SPEC wrd s fs
+ Stop -> skip $ SplitOnSeqWordDone patLen fs wrd
+{-# INLINE splitOnSeq #-}
+ :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a)
+ => Array a -> Fold m a b -> t m a -> t m b
+splitOnSeq patt f m = D.fromStreamD $ splitOnSeqD patt f (D.toStreamD m)
diff --git a/testsuite/tests/profiling/should_compile/T19894/Ring.hs b/testsuite/tests/profiling/should_compile/T19894/Ring.hs
new file mode 100644
index 0000000000..715103055f
--- /dev/null
+++ b/testsuite/tests/profiling/should_compile/T19894/Ring.hs
@@ -0,0 +1,254 @@
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+-- |
+-- Module : Streamly.Internal.Ring.Foreign
+-- Copyright : (c) 2019 Composewell Technologies
+-- License : BSD3
+-- Maintainer :
+-- Stability : experimental
+-- Portability : GHC
+module Ring
+ ( Ring(..)
+ -- * Construction
+ , new
+ , advance
+ , moveBy
+ , startOf
+ -- * Modification
+ , unsafeInsert
+ -- * Folds
+ , unsafeFoldRing
+ , unsafeFoldRingM
+ , unsafeFoldRingFullM
+ , unsafeFoldRingNM
+ -- * Fast Byte Comparisons
+ , unsafeEqArray
+ , unsafeEqArrayN
+ ) where
+import Control.Exception (assert)
+import Foreign.ForeignPtr (ForeignPtr, withForeignPtr, touchForeignPtr)
+import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
+import Foreign.Ptr (plusPtr, minusPtr, castPtr)
+import Foreign.Storable (Storable(..))
+import GHC.ForeignPtr (mallocPlainForeignPtrAlignedBytes)
+import GHC.Ptr (Ptr(..))
+import Prelude hiding (length, concat)
+import Control.Monad.IO.Class (MonadIO(..))
+import qualified Array as A
+-- | A ring buffer is a mutable array of fixed size. Initially the array is
+-- empty, with ringStart pointing at the start of allocated memory. We call the
+-- next location to be written in the ring as ringHead. Initially ringHead ==
+-- ringStart. When the first item is added, ringHead points to ringStart +
+-- sizeof item. When the buffer becomes full ringHead would wrap around to
+-- ringStart. When the buffer is full, ringHead always points at the oldest
+-- item in the ring and the newest item added always overwrites the oldest
+-- item.
+-- When using it we should keep in mind that a ringBuffer is a mutable data
+-- structure. We should not leak out references to it for immutable use.
+data Ring a = Ring
+ { ringStart :: {-# UNPACK #-} !(ForeignPtr a) -- first address
+ , ringBound :: {-# UNPACK #-} !(Ptr a) -- first address beyond allocated memory
+ }
+-- | Get the first address of the ring as a pointer.
+startOf :: Ring a -> Ptr a
+startOf = unsafeForeignPtrToPtr . ringStart
+-- | Create a new ringbuffer and return the ring buffer and the ringHead.
+-- Returns the ring and the ringHead, the ringHead is same as ringStart.
+{-# INLINE new #-}
+new :: forall a. Storable a => Int -> IO (Ring a, Ptr a)
+new count = do
+ let size = count * sizeOf (undefined :: a)
+ fptr <- mallocPlainForeignPtrAlignedBytes size (alignment (undefined :: a))
+ let p = unsafeForeignPtrToPtr fptr
+ return (Ring
+ { ringStart = fptr
+ , ringBound = p `plusPtr` size
+ }, p)
+-- | Advance the ringHead by 1 item, wrap around if we hit the end of the
+-- array.
+{-# INLINE advance #-}
+advance :: forall a. Storable a => Ring a -> Ptr a -> Ptr a
+advance Ring{..} ringHead =
+ let ptr = ringHead `plusPtr` sizeOf (undefined :: a)
+ in if ptr < ringBound
+ then ptr
+ else unsafeForeignPtrToPtr ringStart
+-- | Move the ringHead by n items. The direction depends on the sign on whether
+-- n is positive or negative. Wrap around if we hit the beginning or end of the
+-- array.
+{-# INLINE moveBy #-}
+moveBy :: forall a. Storable a => Int -> Ring a -> Ptr a -> Ptr a
+moveBy by Ring {..} ringHead = ringStartPtr `plusPtr` advanceFromHead
+ where
+ elemSize = sizeOf (undefined :: a)
+ ringStartPtr = unsafeForeignPtrToPtr ringStart
+ lenInBytes = ringBound `minusPtr` ringStartPtr
+ offInBytes = ringHead `minusPtr` ringStartPtr
+ len = assert (lenInBytes `mod` elemSize == 0) $ lenInBytes `div` elemSize
+ off = assert (offInBytes `mod` elemSize == 0) $ offInBytes `div` elemSize
+ advanceFromHead = (off + by `mod` len) * elemSize
+-- | Insert an item at the head of the ring, when the ring is full this
+-- replaces the oldest item in the ring with the new item. This is unsafe
+-- beause ringHead supplied is not verified to be within the Ring. Also,
+-- the ringStart foreignPtr must be guaranteed to be alive by the caller.
+{-# INLINE unsafeInsert #-}
+unsafeInsert :: Storable a => Ring a -> Ptr a -> a -> IO (Ptr a)
+unsafeInsert rb ringHead newVal = do
+ poke ringHead newVal
+ -- touchForeignPtr (ringStart rb)
+ return $ advance rb ringHead
+-- XXX remove all usage of A.unsafeInlineIO
+-- | Like 'unsafeEqArray' but compares only N bytes instead of entire length of
+-- the ring buffer. This is unsafe because the ringHead Ptr is not checked to
+-- be in range.
+{-# INLINE unsafeEqArrayN #-}
+unsafeEqArrayN :: Ring a -> Ptr a -> A.Array a -> Int -> Bool
+unsafeEqArrayN Ring{..} rh A.Array{..} n =
+ let !res = A.unsafeInlineIO $ do
+ let rs = unsafeForeignPtrToPtr ringStart
+ as = unsafeForeignPtrToPtr aStart
+ assert (aEnd `minusPtr` as >= ringBound `minusPtr` rs) (return ())
+ let len = ringBound `minusPtr` rh
+ r1 <- A.memcmp (castPtr rh) (castPtr as) (min len n)
+ r2 <- if n > len
+ then A.memcmp (castPtr rs) (castPtr (as `plusPtr` len))
+ (min (rh `minusPtr` rs) (n - len))
+ else return True
+ -- XXX enable these, check perf impact
+ -- touchForeignPtr ringStart
+ -- touchForeignPtr aStart
+ return (r1 && r2)
+ in res
+-- | Byte compare the entire length of ringBuffer with the given array,
+-- starting at the supplied ringHead pointer. Returns true if the Array and
+-- the ringBuffer have identical contents.
+-- This is unsafe because the ringHead Ptr is not checked to be in range. The
+-- supplied array must be equal to or bigger than the ringBuffer, ARRAY BOUNDS
+{-# INLINE unsafeEqArray #-}
+unsafeEqArray :: Ring a -> Ptr a -> A.Array a -> Bool
+unsafeEqArray Ring{..} rh A.Array{..} =
+ let !res = A.unsafeInlineIO $ do
+ let rs = unsafeForeignPtrToPtr ringStart
+ let as = unsafeForeignPtrToPtr aStart
+ assert (aEnd `minusPtr` as >= ringBound `minusPtr` rs)
+ (return ())
+ let len = ringBound `minusPtr` rh
+ r1 <- A.memcmp (castPtr rh) (castPtr as) len
+ r2 <- A.memcmp (castPtr rs) (castPtr (as `plusPtr` len))
+ (rh `minusPtr` rs)
+ -- XXX enable these, check perf impact
+ -- touchForeignPtr ringStart
+ -- touchForeignPtr aStart
+ return (r1 && r2)
+ in res
+-- XXX use MonadIO
+-- | Fold the buffer starting from ringStart up to the given 'Ptr' using a pure
+-- step function. This is useful to fold the items in the ring when the ring is
+-- not full. The supplied pointer is usually the end of the ring.
+-- Unsafe because the supplied Ptr is not checked to be in range.
+{-# INLINE unsafeFoldRing #-}
+unsafeFoldRing :: forall a b. Storable a
+ => Ptr a -> (b -> a -> b) -> b -> Ring a -> b
+unsafeFoldRing ptr f z Ring{..} =
+ let !res = A.unsafeInlineIO $ withForeignPtr ringStart $ \p ->
+ go z p ptr
+ in res
+ where
+ go !acc !p !q
+ | p == q = return acc
+ | otherwise = do
+ x <- peek p
+ go (f acc x) (p `plusPtr` sizeOf (undefined :: a)) q
+-- XXX Can we remove MonadIO here?
+withForeignPtrM :: MonadIO m => ForeignPtr a -> (Ptr a -> m b) -> m b
+withForeignPtrM fp fn = do
+ r <- fn $ unsafeForeignPtrToPtr fp
+ liftIO $ touchForeignPtr fp
+ return r
+-- | Like unsafeFoldRing but with a monadic step function.
+{-# INLINE unsafeFoldRingM #-}
+unsafeFoldRingM :: forall m a b. (MonadIO m, Storable a)
+ => Ptr a -> (b -> a -> m b) -> b -> Ring a -> m b
+unsafeFoldRingM ptr f z Ring {..} =
+ withForeignPtrM ringStart $ \x -> go z x ptr
+ where
+ go !acc !start !end
+ | start == end = return acc
+ | otherwise = do
+ let !x = A.unsafeInlineIO $ peek start
+ acc' <- f acc x
+ go acc' (start `plusPtr` sizeOf (undefined :: a)) end
+-- | Fold the entire length of a ring buffer starting at the supplied ringHead
+-- pointer. Assuming the supplied ringHead pointer points to the oldest item,
+-- this would fold the ring starting from the oldest item to the newest item in
+-- the ring.
+-- Note, this will crash on ring of 0 size.
+{-# INLINE unsafeFoldRingFullM #-}
+unsafeFoldRingFullM :: forall m a b. (MonadIO m, Storable a)
+ => Ptr a -> (b -> a -> m b) -> b -> Ring a -> m b
+unsafeFoldRingFullM rh f z rb@Ring {..} =
+ withForeignPtrM ringStart $ \_ -> go z rh
+ where
+ go !acc !start = do
+ let !x = A.unsafeInlineIO $ peek start
+ acc' <- f acc x
+ let ptr = advance rb start
+ if ptr == rh
+ then return acc'
+ else go acc' ptr
+-- | Fold @Int@ items in the ring starting at @Ptr a@. Won't fold more
+-- than the length of the ring.
+-- Note, this will crash on ring of 0 size.
+{-# INLINE unsafeFoldRingNM #-}
+unsafeFoldRingNM :: forall m a b. (MonadIO m, Storable a)
+ => Int -> Ptr a -> (b -> a -> m b) -> b -> Ring a -> m b
+unsafeFoldRingNM count rh f z rb@Ring {..} =
+ withForeignPtrM ringStart $ \_ -> go count z rh
+ where
+ go 0 acc _ = return acc
+ go !n !acc !start = do
+ let !x = A.unsafeInlineIO $ peek start
+ acc' <- f acc x
+ let ptr = advance rb start
+ if ptr == rh || n == 0
+ then return acc'
+ else go (n - 1) acc' ptr
diff --git a/testsuite/tests/profiling/should_compile/T19894/Serial.hs b/testsuite/tests/profiling/should_compile/T19894/Serial.hs
new file mode 100644
index 0000000000..d8c9cabcfe
--- /dev/null
+++ b/testsuite/tests/profiling/should_compile/T19894/Serial.hs
@@ -0,0 +1,8 @@
+module Serial (unfoldrM) where
+import StreamK (IsStream)
+import qualified StreamD as D
+{-# INLINE unfoldrM #-}
+unfoldrM :: (IsStream t, Monad m) => (b -> m (Maybe (a, b))) -> b -> t m a
+unfoldrM step seed = D.fromStreamD (D.unfoldrM step seed)
diff --git a/testsuite/tests/profiling/should_compile/T19894/Step.hs b/testsuite/tests/profiling/should_compile/T19894/Step.hs
new file mode 100644
index 0000000000..72b134c84b
--- /dev/null
+++ b/testsuite/tests/profiling/should_compile/T19894/Step.hs
@@ -0,0 +1,44 @@
+-- |
+-- Module : Streamly.Internal.Data.Stream.StreamD.Step
+-- Copyright : (c) 2018 Composewell Technologies
+-- License : BSD-3-Clause
+-- Maintainer :
+-- Stability : experimental
+-- Portability : GHC
+module Step
+ (
+ -- * The stream type
+ Step (..)
+ )
+#if defined(FUSION_PLUGIN)
+import Fusion.Plugin.Types (Fuse(..))
+-- | A stream is a succession of 'Step's. A 'Yield' produces a single value and
+-- the next state of the stream. 'Stop' indicates there are no more values in
+-- the stream.
+#if defined(FUSION_PLUGIN)
+{-# ANN type Step Fuse #-}
+data Step s a = Yield a s | Skip s | Stop
+instance Functor (Step s) where
+ {-# INLINE fmap #-}
+ fmap f (Yield x s) = Yield (f x) s
+ fmap _ (Skip s) = Skip s
+ fmap _ Stop = Stop
+yield :: Monad m => a -> s -> m (Step s a)
+yield a = return . Yield a
+skip :: Monad m => s -> m (Step s a)
+skip = return . Skip
+stop :: Monad m => m (Step s a)
+stop = return Stop
diff --git a/testsuite/tests/profiling/should_compile/T19894/StreamD.hs b/testsuite/tests/profiling/should_compile/T19894/StreamD.hs
new file mode 100644
index 0000000000..265780188a
--- /dev/null
+++ b/testsuite/tests/profiling/should_compile/T19894/StreamD.hs
@@ -0,0 +1,1079 @@
+{-# LANGUAGE PatternSynonyms #-}
+{-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE ViewPatterns #-}
+{-# LANGUAGE ExistentialQuantification #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+#include "inline.hs"
+-- |
+-- Module : Streamly.Internal.Data.Stream.StreamD.Type
+-- Copyright : (c) 2018 Composewell Technologies
+-- (c) Roman Leshchinskiy 2008-2010
+-- License : BSD-3-Clause
+-- Maintainer :
+-- Stability : experimental
+-- Portability : GHC
+-- The stream type is inspired by the vector package. A few functions in this
+-- module have been originally adapted from the vector package (c) Roman
+-- Leshchinskiy. See the notes in specific functions.
+module StreamD
+ (
+ -- * The stream type
+ Step (..)
+ -- XXX UnStream is exported to avoid a performance issue in concatMap if we
+ -- use the pattern synonym "Stream".
+ , Stream (Stream, UnStream)
+ -- * Primitives
+ , nilM
+ , consM
+ , uncons
+ -- * From Unfold
+ , unfold
+ , unfoldrM
+ , drain
+ -- * From Values
+ , yield
+ , yieldM
+ , replicate
+ -- * From Containers
+ , fromList
+ -- * Conversions From/To
+ , fromStreamK
+ , toStreamK
+ , toStreamD
+ , fromStreamD
+ -- * Running a 'Fold'
+ , fold
+ , fold_
+ -- * Right Folds
+ -- , foldrT
+ , foldrM
+ , foldrMx
+ , foldr
+ -- , foldrS
+ -- * Left Folds
+ , foldl'
+ , foldlM'
+ , foldlx'
+ , foldlMx'
+ -- * To Containers
+ , toList
+ -- * Multi-stream folds
+ , eqBy
+ , cmpBy
+ -- * Transformations
+ , map
+ , mapM
+ , take
+ , takeWhile
+ , takeWhileM
+ , postscanOnce
+ , after_
+ -- * Nesting
+ -- , ConcatMapUState (..)
+ , unfoldMany
+ {-
+ , concatMap
+ , concatMapM
+ , FoldMany (..) -- for inspection testing
+ , FoldManyPost (..)
+ , foldMany
+ , foldManyPost
+ , groupsOf2
+ , chunksOf
+ -}
+ )
+import Control.Applicative (liftA2)
+-- import Control.Monad (when)
+-- import Control.Monad.Trans.Class (lift, MonadTrans)
+import Data.Functor.Identity (Identity(..))
+-- import Fusion.Plugin.Types (Fuse(..))
+import GHC.Base (build)
+import GHC.Types (SPEC(..))
+import Prelude hiding (map, mapM, foldr, take, concatMap, takeWhile, replicate)
+import Unfold (Unfold(..))
+import Fold (Fold(..))
+import Step (Step (..))
+import StreamK (State, adaptState, defState)
+-- import Streamly.Internal.Data.SVar (State, adaptState, defState)
+-- import Streamly.Internal.Data.Unfold.Type (Unfold(..))
+-- ort qualified Streamly.Internal.Data.Fold.Type as FL
+import qualified StreamK as K
+import qualified Fold as FL
+-- The direct style stream type
+-- gst = global state
+-- | A stream consists of a step function that generates the next step given a
+-- current state, and the current state.
+data Stream m a =
+ forall s. UnStream (State K.Stream m a -> s -> m (Step s a)) s
+unShare :: Stream m a -> Stream m a
+unShare (UnStream step state) = UnStream step' state
+ where step' gst = step (adaptState gst)
+pattern Stream :: (State K.Stream m a -> s -> m (Step s a)) -> s -> Stream m a
+pattern Stream step state <- (unShare -> UnStream step state)
+ where Stream = UnStream
+#if __GLASGOW_HASKELL__ >= 802
+{-# COMPLETE Stream #-}
+-- Primitives
+-- | An empty 'Stream' with a side effect.
+{-# INLINE_NORMAL nilM #-}
+nilM :: Monad m => m b -> Stream m a
+nilM m = Stream (\_ _ -> m >> return Stop) ()
+{-# INLINE_NORMAL consM #-}
+consM :: Monad m => m a -> Stream m a -> Stream m a
+consM m (Stream step state) = Stream step1 Nothing
+ where
+ {-# INLINE_LATE step1 #-}
+ step1 _ Nothing = m >>= \x -> return $ Yield x (Just state)
+ step1 gst (Just st) = do
+ r <- step gst st
+ return $
+ case r of
+ Yield a s -> Yield a (Just s)
+ Skip s -> Skip (Just s)
+ Stop -> Stop
+-- | Does not fuse, has the same performance as the StreamK version.
+{-# INLINE_NORMAL uncons #-}
+uncons :: Monad m => Stream m a -> m (Maybe (a, Stream m a))
+uncons (UnStream step state) = go state
+ where
+ go st = do
+ r <- step defState st
+ case r of
+ Yield x s -> return $ Just (x, Stream step s)
+ Skip s -> go s
+ Stop -> return Nothing
+-- From 'Unfold'
+data UnfoldState s = UnfoldNothing | UnfoldJust s
+-- | Convert an 'Unfold' into a 'Stream' by supplying it a seed.
+{-# INLINE_NORMAL unfold #-}
+unfold :: Monad m => Unfold m a b -> a -> Stream m b
+unfold (Unfold ustep inject) seed = Stream step UnfoldNothing
+ where
+ {-# INLINE_LATE step #-}
+ step _ UnfoldNothing = inject seed >>= return . Skip . UnfoldJust
+ step _ (UnfoldJust st) = do
+ r <- ustep st
+ return $ case r of
+ Yield x s -> Yield x (UnfoldJust s)
+ Skip s -> Skip (UnfoldJust s)
+ Stop -> Stop
+-- From Values
+-- | Create a singleton 'Stream' from a pure value.
+{-# INLINE_NORMAL yield #-}
+yield :: Applicative m => a -> Stream m a
+yield x = Stream (\_ s -> pure $ step undefined s) True
+ where
+ {-# INLINE_LATE step #-}
+ step _ True = Yield x False
+ step _ False = Stop
+-- | Create a singleton 'Stream' from a monadic action.
+{-# INLINE_NORMAL yieldM #-}
+yieldM :: Monad m => m a -> Stream m a
+yieldM m = Stream step True
+ where
+ {-# INLINE_LATE step #-}
+ step _ True = m >>= \x -> return $ Yield x False
+ step _ False = return Stop
+-- From Containers
+-- Adapted from the vector package.
+-- | Convert a list of pure values to a 'Stream'
+{-# INLINE_LATE fromList #-}
+fromList :: Applicative m => [a] -> Stream m a
+fromList = Stream step
+ where
+ {-# INLINE_LATE step #-}
+ step _ (x:xs) = pure $ Yield x xs
+ step _ [] = pure Stop
+-- Conversions From/To
+-- | Convert a CPS encoded StreamK to direct style step encoded StreamD
+{-# INLINE_LATE fromStreamK #-}
+fromStreamK :: Monad m => K.Stream m a -> Stream m a
+fromStreamK = Stream step
+ where
+ step gst m1 =
+ let stop = return Stop
+ single a = return $ Yield a K.nil
+ yieldk a r = return $ Yield a r
+ in K.foldStreamShared gst yieldk single stop m1
+-- | Convert a direct style step encoded StreamD to a CPS encoded StreamK
+{-# INLINE_LATE toStreamK #-}
+toStreamK :: Monad m => Stream m a -> K.Stream m a
+toStreamK (Stream step state) = go state
+ where
+ go st = K.mkStream $ \gst yld _ stp ->
+ let go' ss = do
+ r <- step gst ss
+ case r of
+ Yield x s -> yld x (go s)
+ Skip s -> go' s
+ Stop -> stp
+ in go' st
+#if !defined(DISABLE_FUSION)
+{-# RULES "fromStreamK/toStreamK fusion"
+ forall s. toStreamK (fromStreamK s) = s #-}
+{-# RULES "toStreamK/fromStreamK fusion"
+ forall s. fromStreamK (toStreamK s) = s #-}
+-- XXX Rename to toStream or move to some IsStream common module
+{-# INLINE fromStreamD #-}
+fromStreamD :: (K.IsStream t, Monad m) => Stream m a -> t m a
+fromStreamD = K.fromStream . toStreamK
+-- XXX Rename to toStream or move to some IsStream common module
+{-# INLINE toStreamD #-}
+toStreamD :: (K.IsStream t, Monad m) => t m a -> Stream m a
+toStreamD = fromStreamK . K.toStream
+-- Running a 'Fold'
+{-# INLINE_NORMAL fold #-}
+fold :: (Monad m) => Fold m a b -> Stream m a -> m b
+fold fld strm = do
+ (b, _) <- fold_ fld strm
+ return b
+{-# INLINE_NORMAL fold_ #-}
+fold_ :: Monad m => Fold m a b -> Stream m a -> m (b, Stream m a)
+fold_ (Fold fstep begin done) (Stream step state) = do
+ res <- begin
+ case res of
+ FL.Partial fs -> go SPEC fs state
+ FL.Done fb -> return $! (fb, Stream step state)
+ where
+ {-# INLINE go #-}
+ go !_ !fs st = do
+ r <- step defState st
+ case r of
+ Yield x s -> do
+ res <- fstep fs x
+ case res of
+ FL.Done b -> return $! (b, Stream step s)
+ FL.Partial fs1 -> go SPEC fs1 s
+ Skip s -> go SPEC fs s
+ Stop -> do
+ b <- done fs
+ return $! (b, Stream (\ _ _ -> return Stop) ())
+-- Right Folds
+-- Adapted from the vector package.
+-- XXX Use of SPEC constructor in folds causes 2x performance degradation in
+-- one shot operations, but helps immensely in operations composed of multiple
+-- combinators or the same combinator many times. There seems to be an
+-- opportunity to optimize here, can we get both, better perf for single ops
+-- as well as composed ops? Without SPEC, all single operation benchmarks
+-- become 2x faster.
+-- The way we want a left fold to be strict, dually we want the right fold to
+-- be lazy. The correct signature of the fold function to keep it lazy must be
+-- (a -> m b -> m b) instead of (a -> b -> m b). We were using the latter
+-- earlier, which is incorrect. In the latter signature we have to feed the
+-- value to the fold function after evaluating the monadic action, depending on
+-- the bind behavior of the monad, the action may get evaluated immediately
+-- introducing unnecessary strictness to the fold. If the implementation is
+-- lazy the following example, must work:
+-- S.foldrM (\x t -> if x then return t else return False) (return True)
+-- (S.fromList [False,undefined] :: SerialT IO Bool)
+{-# INLINE_NORMAL foldrM #-}
+foldrM :: Monad m => (a -> m b -> m b) -> m b -> Stream m a -> m b
+foldrM f z (Stream step state) = go SPEC state
+ where
+ {-# INLINE_LATE go #-}
+ go !_ st = do
+ r <- step defState st
+ case r of
+ Yield x s -> f x (go SPEC s)
+ Skip s -> go SPEC s
+ Stop -> z
+{-# INLINE_NORMAL foldrMx #-}
+foldrMx :: Monad m
+ => (a -> m x -> m x) -> m x -> (m x -> m b) -> Stream m a -> m b
+foldrMx fstep final convert (Stream step state) = convert $ go SPEC state
+ where
+ {-# INLINE_LATE go #-}
+ go !_ st = do
+ r <- step defState st
+ case r of
+ Yield x s -> fstep x (go SPEC s)
+ Skip s -> go SPEC s
+ Stop -> final
+-- Note that foldr works on pure values, therefore it becomes necessarily
+-- strict when the monad m is strict. In that case it cannot terminate early,
+-- it would evaluate all of its input. Though, this should work fine with lazy
+-- monads. For example, if "any" is implemented using "foldr" instead of
+-- "foldrM" it performs the same with Identity monad but performs 1000x slower
+-- with IO monad.
+{-# INLINE_NORMAL foldr #-}
+foldr :: Monad m => (a -> b -> b) -> b -> Stream m a -> m b
+foldr f z = foldrM (\a b -> liftA2 f (return a) b) (return z)
+-- this performs horribly, should not be used
+{-# INLINE_NORMAL foldrS #-}
+ :: Monad m
+ => (a -> Stream m b -> Stream m b)
+ -> Stream m b
+ -> Stream m a
+ -> Stream m b
+foldrS f final (Stream step state) = go SPEC state
+ where
+ {-# INLINE_LATE go #-}
+ go !_ st = do
+ -- defState??
+ r <- yieldM $ step defState st
+ case r of
+ Yield x s -> f x (go SPEC s)
+ Skip s -> go SPEC s
+ Stop -> final
+ -}
+-- Right fold to some transformer (T) monad. This can be useful to implement
+-- stateless combinators like map, filtering, insertions, takeWhile, dropWhile.
+{-# INLINE_NORMAL foldrT #-}
+foldrT :: (Monad m, Monad (t m), MonadTrans t)
+ => (a -> t m b -> t m b) -> t m b -> Stream m a -> t m b
+foldrT f final (Stream step state) = go SPEC state
+ where
+ {-# INLINE_LATE go #-}
+ go !_ st = do
+ r <- lift $ step defState st
+ case r of
+ Yield x s -> f x (go SPEC s)
+ Skip s -> go SPEC s
+ Stop -> final
+-- Left Folds
+-- XXX run begin action only if the stream is not empty.
+{-# INLINE_NORMAL foldlMx' #-}
+foldlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> m b
+foldlMx' fstep begin done (Stream step state) =
+ begin >>= \x -> go SPEC x state
+ where
+ -- XXX !acc?
+ {-# INLINE_LATE go #-}
+ go !_ acc st = acc `seq` do
+ r <- step defState st
+ case r of
+ Yield x s -> do
+ acc' <- fstep acc x
+ go SPEC acc' s
+ Skip s -> go SPEC acc s
+ Stop -> done acc
+{-# INLINE foldlx' #-}
+foldlx' :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> m b
+foldlx' fstep begin done m =
+ foldlMx' (\b a -> return (fstep b a)) (return begin) (return . done) m
+-- Adapted from the vector package.
+-- XXX implement in terms of foldlMx'?
+{-# INLINE_NORMAL foldlM' #-}
+foldlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> m b
+foldlM' fstep mbegin (Stream step state) = do
+ begin <- mbegin
+ go SPEC begin state
+ where
+ {-# INLINE_LATE go #-}
+ go !_ acc st = acc `seq` do
+ r <- step defState st
+ case r of
+ Yield x s -> do
+ acc' <- fstep acc x
+ go SPEC acc' s
+ Skip s -> go SPEC acc s
+ Stop -> return acc
+{-# INLINE foldl' #-}
+foldl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> m b
+foldl' fstep begin = foldlM' (\b a -> return (fstep b a)) (return begin)
+-- To Containers
+{-# INLINE_NORMAL toList #-}
+toList :: Monad m => Stream m a -> m [a]
+toList = foldr (:) []
+-- Use foldr/build fusion to fuse with list consumers
+-- This can be useful when using the IsList instance
+{-# INLINE_LATE toListFB #-}
+toListFB :: (a -> b -> b) -> b -> Stream Identity a -> b
+toListFB c n (Stream step state) = go state
+ where
+ go st = case runIdentity (step defState st) of
+ Yield x s -> x `c` go s
+ Skip s -> go s
+ Stop -> n
+{-# RULES "toList Identity" toList = toListId #-}
+{-# INLINE_EARLY toListId #-}
+toListId :: Stream Identity a -> Identity [a]
+toListId s = Identity $ build (\c n -> toListFB c n s)
+-- Multi-stream folds
+-- Adapted from the vector package.
+{-# INLINE_NORMAL eqBy #-}
+eqBy :: Monad m => (a -> b -> Bool) -> Stream m a -> Stream m b -> m Bool
+eqBy eq (Stream step1 t1) (Stream step2 t2) = eq_loop0 SPEC t1 t2
+ where
+ eq_loop0 !_ s1 s2 = do
+ r <- step1 defState s1
+ case r of
+ Yield x s1' -> eq_loop1 SPEC x s1' s2
+ Skip s1' -> eq_loop0 SPEC s1' s2
+ Stop -> eq_null s2
+ eq_loop1 !_ x s1 s2 = do
+ r <- step2 defState s2
+ case r of
+ Yield y s2'
+ | eq x y -> eq_loop0 SPEC s1 s2'
+ | otherwise -> return False
+ Skip s2' -> eq_loop1 SPEC x s1 s2'
+ Stop -> return False
+ eq_null s2 = do
+ r <- step2 defState s2
+ case r of
+ Yield _ _ -> return False
+ Skip s2' -> eq_null s2'
+ Stop -> return True
+-- Adapted from the vector package.
+-- | Compare two streams lexicographically
+{-# INLINE_NORMAL cmpBy #-}
+ :: Monad m
+ => (a -> b -> Ordering) -> Stream m a -> Stream m b -> m Ordering
+cmpBy cmp (Stream step1 t1) (Stream step2 t2) = cmp_loop0 SPEC t1 t2
+ where
+ cmp_loop0 !_ s1 s2 = do
+ r <- step1 defState s1
+ case r of
+ Yield x s1' -> cmp_loop1 SPEC x s1' s2
+ Skip s1' -> cmp_loop0 SPEC s1' s2
+ Stop -> cmp_null s2
+ cmp_loop1 !_ x s1 s2 = do
+ r <- step2 defState s2
+ case r of
+ Yield y s2' -> case x `cmp` y of
+ EQ -> cmp_loop0 SPEC s1 s2'
+ c -> return c
+ Skip s2' -> cmp_loop1 SPEC x s1 s2'
+ Stop -> return GT
+ cmp_null s2 = do
+ r <- step2 defState s2
+ case r of
+ Yield _ _ -> return LT
+ Skip s2' -> cmp_null s2'
+ Stop -> return EQ
+-- Transformations
+-- Adapted from the vector package.
+-- | Map a monadic function over a 'Stream'
+{-# INLINE_NORMAL mapM #-}
+mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b
+mapM f (Stream step state) = Stream step' state
+ where
+ {-# INLINE_LATE step' #-}
+ step' gst st = do
+ r <- step (adaptState gst) st
+ case r of
+ Yield x s -> f x >>= \a -> return $ Yield a s
+ Skip s -> return $ Skip s
+ Stop -> return Stop
+{-# INLINE map #-}
+map :: Monad m => (a -> b) -> Stream m a -> Stream m b
+map f = mapM (return . f)
+instance Functor m => Functor (Stream m) where
+ {-# INLINE fmap #-}
+ fmap f (Stream step state) = Stream step' state
+ where
+ {-# INLINE_LATE step' #-}
+ step' gst st = fmap (fmap f) (step (adaptState gst) st)
+ {-# INLINE (<$) #-}
+ (<$) = fmap . const
+-- Filtering
+-- Adapted from the vector package.
+{-# INLINE_NORMAL take #-}
+take :: Monad m => Int -> Stream m a -> Stream m a
+take n (Stream step state) = n `seq` Stream step' (state, 0)
+ where
+ {-# INLINE_LATE step' #-}
+ step' gst (st, i) | i < n = do
+ r <- step gst st
+ return $ case r of
+ Yield x s -> Yield x (s, i + 1)
+ Skip s -> Skip (s, i)
+ Stop -> Stop
+ step' _ (_, _) = return Stop
+-- Adapted from the vector package.
+{-# INLINE_NORMAL takeWhileM #-}
+takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
+takeWhileM f (Stream step state) = Stream step' state
+ where
+ {-# INLINE_LATE step' #-}
+ step' gst st = do
+ r <- step gst st
+ case r of
+ Yield x s -> do
+ b <- f x
+ return $ if b then Yield x s else Stop
+ Skip s -> return $ Skip s
+ Stop -> return Stop
+{-# INLINE takeWhile #-}
+takeWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
+takeWhile f = takeWhileM (return . f)
+-- Combine N Streams - concatAp
+{-# INLINE_NORMAL concatAp #-}
+concatAp :: Functor f => Stream f (a -> b) -> Stream f a -> Stream f b
+concatAp (Stream stepa statea) (Stream stepb stateb) = Stream step' (Left statea)
+ where
+ {-# INLINE_LATE step' #-}
+ step' gst (Left st) = fmap
+ (\r -> case r of
+ Yield f s -> Skip (Right (f, s, stateb))
+ Skip s -> Skip (Left s)
+ Stop -> Stop)
+ (stepa (adaptState gst) st)
+ step' gst (Right (f, os, st)) = fmap
+ (\r -> case r of
+ Yield a s -> Yield (f a) (Right (f, os, s))
+ Skip s -> Skip (Right (f,os, s))
+ Stop -> Skip (Left os))
+ (stepb (adaptState gst) st)
+{-# INLINE_NORMAL apSequence #-}
+apSequence :: Functor f => Stream f a -> Stream f b -> Stream f b
+apSequence (Stream stepa statea) (Stream stepb stateb) =
+ Stream step (Left statea)
+ where
+ {-# INLINE_LATE step #-}
+ step gst (Left st) =
+ fmap
+ (\r ->
+ case r of
+ Yield _ s -> Skip (Right (s, stateb))
+ Skip s -> Skip (Left s)
+ Stop -> Stop)
+ (stepa (adaptState gst) st)
+ step gst (Right (ostate, st)) =
+ fmap
+ (\r ->
+ case r of
+ Yield b s -> Yield b (Right (ostate, s))
+ Skip s -> Skip (Right (ostate, s))
+ Stop -> Skip (Left ostate))
+ (stepb gst st)
+{-# INLINE_NORMAL apDiscardSnd #-}
+apDiscardSnd :: Functor f => Stream f a -> Stream f b -> Stream f a
+apDiscardSnd (Stream stepa statea) (Stream stepb stateb) =
+ Stream step (Left statea)
+ where
+ {-# INLINE_LATE step #-}
+ step gst (Left st) =
+ fmap
+ (\r ->
+ case r of
+ Yield b s -> Skip (Right (s, stateb, b))
+ Skip s -> Skip (Left s)
+ Stop -> Stop)
+ (stepa gst st)
+ step gst (Right (ostate, st, b)) =
+ fmap
+ (\r ->
+ case r of
+ Yield _ s -> Yield b (Right (ostate, s, b))
+ Skip s -> Skip (Right (ostate, s, b))
+ Stop -> Skip (Left ostate))
+ (stepb (adaptState gst) st)
+instance Applicative f => Applicative (Stream f) where
+ {-# INLINE pure #-}
+ pure = yield
+ {-# INLINE (<*>) #-}
+ (<*>) = concatAp
+#if MIN_VERSION_base(4,10,0)
+ {-# INLINE liftA2 #-}
+ liftA2 f x = (<*>) (fmap f x)
+ {-# INLINE (*>) #-}
+ (*>) = apSequence
+ {-# INLINE (<*) #-}
+ (<*) = apDiscardSnd
+-- Combine N Streams - unfoldMany
+-- Define a unique structure to use in inspection testing
+data ConcatMapUState o i =
+ ConcatMapUOuter o
+ | ConcatMapUInner o i
+-- | @unfoldMany unfold stream@ uses @unfold@ to map the input stream elements
+-- to streams and then flattens the generated streams into a single output
+-- stream.
+-- This is like 'concatMap' but uses an unfold with an explicit state to
+-- generate the stream instead of a 'Stream' type generator. This allows better
+-- optimization via fusion. This can be many times more efficient than
+-- 'concatMap'.
+{-# INLINE_NORMAL unfoldMany #-}
+unfoldMany :: Monad m => Unfold m a b -> Stream m a -> Stream m b
+unfoldMany (Unfold istep inject) (Stream ostep ost) =
+ Stream step (ConcatMapUOuter ost)
+ where
+ {-# INLINE_LATE step #-}
+ step gst (ConcatMapUOuter o) = do
+ r <- ostep (adaptState gst) o
+ case r of
+ Yield a o' -> do
+ i <- inject a
+ i `seq` return (Skip (ConcatMapUInner o' i))
+ Skip o' -> return $ Skip (ConcatMapUOuter o')
+ Stop -> return $ Stop
+ step _ (ConcatMapUInner o i) = do
+ r <- istep i
+ return $ case r of
+ Yield x i' -> Yield x (ConcatMapUInner o i')
+ Skip i' -> Skip (ConcatMapUInner o i')
+ Stop -> Skip (ConcatMapUOuter o)
+-- Combine N Streams - concatMap
+-- Adapted from the vector package.
+{-# INLINE_NORMAL concatMapM #-}
+concatMapM :: Monad m => (a -> m (Stream m b)) -> Stream m a -> Stream m b
+concatMapM f (Stream step state) = Stream step' (Left state)
+ where
+ {-# INLINE_LATE step' #-}
+ step' gst (Left st) = do
+ r <- step (adaptState gst) st
+ case r of
+ Yield a s -> do
+ b_stream <- f a
+ return $ Skip (Right (b_stream, s))
+ Skip s -> return $ Skip (Left s)
+ Stop -> return Stop
+ -- XXX flattenArrays is 5x faster than "concatMap fromArray". if somehow we
+ -- can get inner_step to inline and fuse here we can perhaps get the same
+ -- performance using "concatMap fromArray".
+ --
+ -- XXX using the pattern synonym "Stream" causes a major performance issue
+ -- here even if the synonym does not include an adaptState call. Need to
+ -- find out why. Is that something to be fixed in GHC?
+ step' gst (Right (UnStream inner_step inner_st, st)) = do
+ r <- inner_step (adaptState gst) inner_st
+ case r of
+ Yield b inner_s ->
+ return $ Yield b (Right (Stream inner_step inner_s, st))
+ Skip inner_s ->
+ return $ Skip (Right (Stream inner_step inner_s, st))
+ Stop -> return $ Skip (Left st)
+{-# INLINE concatMap #-}
+concatMap :: Monad m => (a -> Stream m b) -> Stream m a -> Stream m b
+concatMap f = concatMapM (return . f)
+-- XXX The idea behind this rule is to rewrite any calls to "concatMap
+-- fromArray" automatically to flattenArrays which is much faster. However, we
+-- need an INLINE_EARLY on concatMap for this rule to fire. But if we use
+-- INLINE_EARLY on concatMap or fromArray then direct uses of
+-- "concatMap fromArray" (without the RULE) become much slower, this means
+-- "concatMap f" in general would become slower. Need to find a solution to
+-- this.
+-- {-# RULES "concatMap Array.toStreamD"
+-- concatMap Array.toStreamD = Array.flattenArray #-}
+-- NOTE: even though concatMap for StreamD is 4x faster compared to StreamK,
+-- the monad instance does not seem to be significantly faster.
+instance Monad m => Monad (Stream m) where
+ {-# INLINE return #-}
+ return = pure
+ {-# INLINE (>>=) #-}
+ (>>=) = flip concatMap
+ {-# INLINE (>>) #-}
+ (>>) = (*>)
+-- Grouping/Splitting
+-- s = stream state, fs = fold state
+-- {-# ANN type FoldManyPost Fuse #-}
+data FoldManyPost s fs b a
+ = FoldManyPostStart s
+ | FoldManyPostLoop s fs
+ | FoldManyPostYield b (FoldManyPost s fs b a)
+ | FoldManyPostDone
+-- | Like foldMany but with the following differences:
+-- * If the stream is empty the default value of the fold would still be
+-- emitted in the output.
+-- * At the end of the stream if the last application of the fold did not
+-- receive any input it would still yield the default fold accumulator as the
+-- last value.
+{-# INLINE_NORMAL foldManyPost #-}
+foldManyPost :: Monad m => Fold m a b -> Stream m a -> Stream m b
+foldManyPost (Fold fstep initial extract) (Stream step state) =
+ Stream step' (FoldManyPostStart state)
+ where
+ {-# INLINE consume #-}
+ consume x s fs = do
+ res <- fstep fs x
+ return
+ $ Skip
+ $ case res of
+ FL.Done b -> FoldManyPostYield b (FoldManyPostStart s)
+ FL.Partial ps -> FoldManyPostLoop s ps
+ {-# INLINE_LATE step' #-}
+ step' _ (FoldManyPostStart st) = do
+ r <- initial
+ return
+ $ Skip
+ $ case r of
+ FL.Done b -> FoldManyPostYield b (FoldManyPostStart st)
+ FL.Partial fs -> FoldManyPostLoop st fs
+ step' gst (FoldManyPostLoop st fs) = do
+ r <- step (adaptState gst) st
+ case r of
+ Yield x s -> consume x s fs
+ Skip s -> return $ Skip (FoldManyPostLoop s fs)
+ Stop -> do
+ b <- extract fs
+ return $ Skip (FoldManyPostYield b FoldManyPostDone)
+ step' _ (FoldManyPostYield b next) = return $ Yield b next
+ step' _ FoldManyPostDone = return Stop
+-- {-# ANN type FoldMany Fuse #-}
+data FoldMany s fs b a
+ = FoldManyStart s
+ | FoldManyFirst fs s
+ | FoldManyLoop s fs
+ | FoldManyYield b (FoldMany s fs b a)
+ | FoldManyDone
+-- | Apply a fold multiple times until the stream ends. If the stream is empty
+-- the output would be empty.
+-- @foldMany f = parseMany (fromFold f)@
+-- A terminating fold may terminate even without accepting a single input. So
+-- we run the fold's initial action before evaluating the stream. However, this
+-- means that if later the stream does not yield anything we have to discard
+-- the fold's initial result which could have generated an effect.
+{-# INLINE_NORMAL foldMany #-}
+foldMany :: Monad m => Fold m a b -> Stream m a -> Stream m b
+foldMany (Fold fstep initial extract) (Stream step state) =
+ Stream step' (FoldManyStart state)
+ where
+ {-# INLINE consume #-}
+ consume x s fs = do
+ res <- fstep fs x
+ return
+ $ Skip
+ $ case res of
+ FL.Done b -> FoldManyYield b (FoldManyStart s)
+ FL.Partial ps -> FoldManyLoop s ps
+ {-# INLINE_LATE step' #-}
+ step' _ (FoldManyStart st) = do
+ r <- initial
+ return
+ $ Skip
+ $ case r of
+ FL.Done b -> FoldManyYield b (FoldManyStart st)
+ FL.Partial fs -> FoldManyFirst fs st
+ step' gst (FoldManyFirst fs st) = do
+ r <- step (adaptState gst) st
+ case r of
+ Yield x s -> consume x s fs
+ Skip s -> return $ Skip (FoldManyFirst fs s)
+ Stop -> return Stop
+ step' gst (FoldManyLoop st fs) = do
+ r <- step (adaptState gst) st
+ case r of
+ Yield x s -> consume x s fs
+ Skip s -> return $ Skip (FoldManyLoop s fs)
+ Stop -> do
+ b <- extract fs
+ return $ Skip (FoldManyYield b FoldManyDone)
+ step' _ (FoldManyYield b next) = return $ Yield b next
+ step' _ FoldManyDone = return Stop
+{-# INLINE chunksOf #-}
+chunksOf :: Monad m => Int -> Fold m a b -> Stream m a -> Stream m b
+chunksOf n f = foldMany (FL.take n f)
+data GroupState2 s fs
+ = GroupStart2 s
+ | GroupBuffer2 s fs Int
+ | GroupYield2 fs (GroupState2 s fs)
+ | GroupFinish2
+{-# INLINE_NORMAL groupsOf2 #-}
+ :: Monad m
+ => Int
+ -> m c
+ -> Fold2 m c a b
+ -> Stream m a
+ -> Stream m b
+groupsOf2 n input (Fold2 fstep inject extract) (Stream step state) =
+ n `seq` Stream step' (GroupStart2 state)
+ where
+ {-# INLINE_LATE step' #-}
+ step' _ (GroupStart2 st) = do
+ -- XXX shall we use the Natural type instead? Need to check performance
+ -- implications.
+ when (n <= 0) $
+ -- XXX we can pass the module string from the higher level API
+ error $ "Streamly.Internal.Data.Stream.StreamD.Type.groupsOf: the size of "
+ ++ "groups [" ++ show n ++ "] must be a natural number"
+ -- fs = fold state
+ fs <- input >>= inject
+ return $ Skip (GroupBuffer2 st fs 0)
+ step' gst (GroupBuffer2 st fs i) = do
+ r <- step (adaptState gst) st
+ case r of
+ Yield x s -> do
+ !fs' <- fstep fs x
+ let i' = i + 1
+ return $
+ if i' >= n
+ then Skip (GroupYield2 fs' (GroupStart2 s))
+ else Skip (GroupBuffer2 s fs' i')
+ Skip s -> return $ Skip (GroupBuffer2 s fs i)
+ Stop -> return $ Skip (GroupYield2 fs GroupFinish2)
+ step' _ (GroupYield2 fs next) = do
+ r <- extract fs
+ return $ Yield r next
+ step' _ GroupFinish2 = return Stop
+-- Other instances
+instance MonadTrans Stream where
+ {-# INLINE lift #-}
+ lift = yieldM
+instance (MonadThrow m) => MonadThrow (Stream m) where
+ throwM = lift . throwM
+ -}
+{-# INLINE_NORMAL unfoldrM #-}
+unfoldrM :: Monad m => (s -> m (Maybe (a, s))) -> s -> Stream m a
+unfoldrM next state = Stream step state
+ where
+ {-# INLINE_LATE step #-}
+ step _ st = do
+ r <- next st
+ return $ case r of
+ Just (x, s) -> Yield x s
+ Nothing -> Stop
+{-# INLINE_LATE drain #-}
+drain :: Monad m => Stream m a -> m ()
+-- drain = foldrM (\_ xs -> xs) (return ())
+drain (Stream step state) = go SPEC state
+ where
+ go !_ st = do
+ r <- step defState st
+ case r of
+ Yield _ s -> go SPEC s
+ Skip s -> go SPEC s
+ Stop -> return ()
+-- Scanning with a Fold
+data ScanState s f = ScanInit s | ScanDo s !f | ScanDone
+{-# INLINE [1] postscanOnce #-}
+postscanOnce :: Monad m => FL.Fold m a b -> Stream m a -> Stream m b
+postscanOnce (FL.Fold fstep initial extract) (Stream sstep state) =
+ Stream step (ScanInit state)
+ where
+ {-# INLINE_LATE step #-}
+ step _ (ScanInit st) = do
+ res <- initial
+ return
+ $ case res of
+ FL.Partial fs -> Skip $ ScanDo st fs
+ FL.Done b -> Yield b ScanDone
+ step gst (ScanDo st fs) = do
+ res <- sstep (adaptState gst) st
+ case res of
+ Yield x s -> do
+ r <- fstep fs x
+ case r of
+ FL.Partial fs1 -> do
+ !b <- extract fs1
+ return $ Yield b $ ScanDo s fs1
+ FL.Done b -> return $ Yield b ScanDone
+ Skip s -> return $ Skip $ ScanDo s fs
+ Stop -> return Stop
+ step _ ScanDone = return Stop
+{-# INLINE [1] replicateM #-}
+replicateM :: forall m a. Monad m => Int -> m a -> Stream m a
+replicateM n p = Stream step n
+ where
+ {-# INLINE_LATE step #-}
+ step _ (i :: Int)
+ | i <= 0 = return Stop
+ | otherwise = do
+ x <- p
+ return $ Yield x (i - 1)
+{-# INLINE [1] replicate #-}
+replicate :: Monad m => Int -> a -> Stream m a
+replicate n x = replicateM n (return x)
+{-# INLINE [1] after_ #-}
+after_ :: Monad m => m b -> Stream m a -> Stream m a
+after_ action (Stream step state) = Stream step' state
+ where
+ {-# INLINE_LATE step' #-}
+ step' gst st = do
+ res <- step gst st
+ case res of
+ Yield x s -> return $ Yield x s
+ Skip s -> return $ Skip s
+ Stop -> action >> return Stop
+{-# LANGUAGE UndecidableInstances #-}
+{-# LANGUAGE KindSignatures #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE QuantifiedConstraints #-}
+{-# LANGUAGE ConstraintKinds #-}
+{-# LANGUAGE InstanceSigs #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+-- |
+-- Module : Streamly.Internal.Data.Stream.StreamK.Type
+-- Copyright : (c) 2017 Composewell Technologies
+-- License : BSD3
+-- Maintainer :
+-- Stability : experimental
+-- Portability : GHC
+-- Continuation passing style (CPS) stream implementation. The symbol 'K' below
+-- denotes a function as well as a Kontinuation.
+module StreamK
+ (
+ -- * A class for streams
+ IsStream (..)
+ , adapt
+ , State
+ , adaptState
+ , defState
+ , MonadAsync
+ , unfoldrM
+ , drain
+ -- * The stream type
+ , Stream (..)
+ -- * Construction
+ , mkStream
+ , fromStopK
+ , fromYieldK
+ , consK
+ -- * Elimination
+ , foldStream
+ , foldStreamShared
+ , foldl'
+ , foldlx'
+ -- * foldr/build
+ , foldrM
+ , foldrS
+ , foldrSShared
+ , foldrSM
+ , build
+ , buildS
+ , buildM
+ , buildSM
+ , sharedM
+ , augmentS
+ , augmentSM
+ -- instances
+ , cons
+ , (.:)
+ , consMStream
+ , consMBy
+ , yieldM
+ , yield
+ , nil
+ , nilM
+ , conjoin
+ , serial
+ , map
+ , mapM
+ , mapMSerial
+ , unShare
+ , concatMapBy
+ , concatMap
+ , bindWith
+ , concatPairsWith
+ , apWith
+ , apSerial
+ , apSerialDiscardFst
+ , apSerialDiscardSnd
+ , Streaming -- deprecated
+ )
+#include "inline.hs"
+import Control.Monad (ap, (>=>))
+-- import Control.Monad.Trans.Class (MonadTrans(lift))
+import Data.Kind (Type)
+-- import Control.Monad.Catch (MonadThrow)
+import Control.Monad.IO.Class (MonadIO(..))
+#if __GLASGOW_HASKELL__ < 808
+import Data.Semigroup (Semigroup(..))
+import Prelude hiding (map, mapM, concatMap, foldr)
+-- import Streamly.Internal.Data.SVar
+-- Basic stream type
+data State (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a = State
+ {
+ _inspectMode :: Bool
+ }
+adaptState :: State t m a -> State t n b
+adaptState st = st
+ {
+ _inspectMode = False
+ }
+defState :: State t m a
+defState = State
+ {
+ _inspectMode = False
+ }
+-- | The type @Stream m a@ represents a monadic stream of values of type 'a'
+-- constructed using actions in monad 'm'. It uses stop, singleton and yield
+-- continuations equivalent to the following direct style type:
+-- @
+-- data Stream m a = Stop | Singleton a | Yield a (Stream m a)
+-- @
+-- To facilitate parallel composition we maintain a local state in an 'SVar'
+-- that is shared across and is used for synchronization of the streams being
+-- composed.
+-- The singleton case can be expressed in terms of stop and yield but we have
+-- it as a separate case to optimize composition operations for streams with
+-- single element. We build singleton streams in the implementation of 'pure'
+-- for Applicative and Monad, and in 'lift' for MonadTrans.
+-- XXX remove the Stream type parameter from State as it is always constant.
+-- We can remove it from SVar as well
+newtype Stream (m :: Type -> Type) a =
+ MkStream (forall r.
+ State Stream m a -- state
+ -> (a -> Stream m a -> m r) -- yield
+ -> (a -> m r) -- singleton
+ -> m r -- stop
+ -> m r
+ )
+-- Types that can behave as a Stream
+infixr 5 `consM`
+infixr 5 |:
+type MonadAsync m = (MonadIO m)
+-- XXX Use a different SVar based on the stream type. But we need to make sure
+-- that we do not lose performance due to polymorphism.
+-- | Class of types that can represent a stream of elements of some type 'a' in
+-- some monad 'm'.
+-- /Since: 0.2.0 ("Streamly")/
+-- @since 0.8.0
+#if __GLASGOW_HASKELL__ >= 806
+ ( forall m a. MonadAsync m => Semigroup (t m a)
+ , forall m a. MonadAsync m => Monoid (t m a)
+ , forall m. Monad m => Functor (t m)
+ , forall m. MonadAsync m => Applicative (t m)
+ ) =>
+ IsStream t where
+ toStream :: t m a -> Stream m a
+ fromStream :: Stream m a -> t m a
+ -- | Constructs a stream by adding a monadic action at the head of an
+ -- existing stream. For example:
+ --
+ -- @
+ -- > toList $ getLine \`consM` getLine \`consM` nil
+ -- hello
+ -- world
+ -- ["hello","world"]
+ -- @
+ --
+ -- /Concurrent (do not use 'fromParallel' to construct infinite streams)/
+ --
+ -- @since 0.2.0
+ consM :: MonadAsync m => m a -> t m a -> t m a
+ -- | Operator equivalent of 'consM'. We can read it as "@parallel colon@"
+ -- to remember that @|@ comes before ':'.
+ --
+ -- @
+ -- > toList $ getLine |: getLine |: nil
+ -- hello
+ -- world
+ -- ["hello","world"]
+ -- @
+ --
+ -- @
+ -- let delay = threadDelay 1000000 >> print 1
+ -- drain $ fromSerial $ delay |: delay |: delay |: nil
+ -- drain $ fromParallel $ delay |: delay |: delay |: nil
+ -- @
+ --
+ -- /Concurrent (do not use 'fromParallel' to construct infinite streams)/
+ --
+ -- @since 0.2.0
+ (|:) :: MonadAsync m => m a -> t m a -> t m a
+ -- We can define (|:) just as 'consM' but it is defined explicitly for each
+ -- type because we want to use SPECIALIZE pragma on the definition.
+-- | Same as 'IsStream'.
+-- @since 0.1.0
+{-# DEPRECATED Streaming "Please use IsStream instead." #-}
+type Streaming = IsStream
+-- Type adapting combinators
+-- XXX Move/reset the State here by reconstructing the stream with cleared
+-- state. Can we make sure we do not do that when t1 = t2? If we do this then
+-- we do not need to do that explicitly using svarStyle. It would act as
+-- unShare when the stream type is the same.
+-- | Adapt any specific stream type to any other specific stream type.
+-- /Since: 0.1.0 ("Streamly")/
+-- @since 0.8.0
+adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a
+adapt = fromStream . toStream
+-- Building a stream
+-- XXX The State is always parameterized by "Stream" which means State is not
+-- different for different stream types. So we have to manually make sure that
+-- when converting from one stream to another we migrate the state correctly.
+-- This can be fixed if we use a different SVar type for different streams.
+-- Currently we always use "SVar Stream" and therefore a different State type
+-- parameterized by that stream.
+-- XXX Since t is coercible we should be able to coerce k
+-- mkStream k = fromStream $ MkStream $ coerce k
+-- | Build a stream from an 'SVar', a stop continuation, a singleton stream
+-- continuation and a yield continuation.
+{-# INLINE_EARLY mkStream #-}
+mkStream :: IsStream t
+ => (forall r. State Stream m a
+ -> (a -> t m a -> m r)
+ -> (a -> m r)
+ -> m r
+ -> m r)
+ -> t m a
+mkStream k = fromStream $ MkStream $ \st yld sng stp ->
+ let yieldk a r = yld a (toStream r)
+ in k st yieldk sng stp
+{-# RULES "mkStream from stream" mkStream = mkStreamFromStream #-}
+mkStreamFromStream :: IsStream t
+ => (forall r. State Stream m a
+ -> (a -> Stream m a -> m r)
+ -> (a -> m r)
+ -> m r
+ -> m r)
+ -> t m a
+mkStreamFromStream k = fromStream $ MkStream k
+{-# RULES "mkStream stream" mkStream = mkStreamStream #-}
+ :: (forall r. State Stream m a
+ -> (a -> Stream m a -> m r)
+ -> (a -> m r)
+ -> m r
+ -> m r)
+ -> Stream m a
+mkStreamStream = MkStream
+-- | A terminal function that has no continuation to follow.
+type StopK m = forall r. m r -> m r
+-- | A monadic continuation, it is a function that yields a value of type "a"
+-- and calls the argument (a -> m r) as a continuation with that value. We can
+-- also think of it as a callback with a handler (a -> m r). Category
+-- theorists call it a codensity type, a special type of right kan extension.
+type YieldK m a = forall r. (a -> m r) -> m r
+_wrapM :: Monad m => m a -> YieldK m a
+_wrapM m = \k -> m >>= k
+-- | Make an empty stream from a stop function.
+fromStopK :: IsStream t => StopK m -> t m a
+fromStopK k = mkStream $ \_ _ _ stp -> k stp
+-- | Make a singleton stream from a callback function. The callback function
+-- calls the one-shot yield continuation to yield an element.
+fromYieldK :: IsStream t => YieldK m a -> t m a
+fromYieldK k = mkStream $ \_ _ sng _ -> k sng
+-- | Add a yield function at the head of the stream.
+consK :: IsStream t => YieldK m a -> t m a -> t m a
+consK k r = mkStream $ \_ yld _ _ -> k (\x -> yld x r)
+-- XXX Build a stream from a repeating callback function.
+-- Construction
+infixr 5 `cons`
+-- faster than consM because there is no bind.
+-- | Construct a stream by adding a pure value at the head of an existing
+-- stream. For serial streams this is the same as @(return a) \`consM` r@ but
+-- more efficient. For concurrent streams this is not concurrent whereas
+-- 'consM' is concurrent. For example:
+-- @
+-- > toList $ 1 \`cons` 2 \`cons` 3 \`cons` nil
+-- [1,2,3]
+-- @
+-- @since 0.1.0
+{-# INLINE_NORMAL cons #-}
+cons :: IsStream t => a -> t m a -> t m a
+cons a r = mkStream $ \_ yld _ _ -> yld a r
+infixr 5 .:
+-- | Operator equivalent of 'cons'.
+-- @
+-- > toList $ 1 .: 2 .: 3 .: nil
+-- [1,2,3]
+-- @
+-- @since 0.1.1
+{-# INLINE (.:) #-}
+(.:) :: IsStream t => a -> t m a -> t m a
+(.:) = cons
+-- | An empty stream.
+-- @
+-- > toList nil
+-- []
+-- @
+-- @since 0.1.0
+{-# INLINE_NORMAL nil #-}
+nil :: IsStream t => t m a
+nil = mkStream $ \_ _ _ stp -> stp
+-- | An empty stream producing a side effect.
+-- @
+-- > toList (nilM (print "nil"))
+-- "nil"
+-- []
+-- @
+-- /Pre-release/
+{-# INLINE_NORMAL nilM #-}
+nilM :: (IsStream t, Monad m) => m b -> t m a
+nilM m = mkStream $ \_ _ _ stp -> m >> stp
+{-# INLINE_NORMAL yield #-}
+yield :: IsStream t => a -> t m a
+yield a = mkStream $ \_ _ single _ -> single a
+{-# INLINE_NORMAL yieldM #-}
+yieldM :: (Monad m, IsStream t) => m a -> t m a
+yieldM m = fromStream $ mkStream $ \_ _ single _ -> m >>= single
+-- XXX specialize to IO?
+{-# INLINE consMBy #-}
+consMBy :: (IsStream t, MonadAsync m) => (t m a -> t m a -> t m a)
+ -> m a -> t m a -> t m a
+consMBy f m r = (fromStream $ yieldM m) `f` r
+-- Folding a stream
+-- | Fold a stream by providing an SVar, a stop continuation, a singleton
+-- continuation and a yield continuation. The stream would share the current
+-- SVar passed via the State.
+{-# INLINE_EARLY foldStreamShared #-}
+ :: IsStream t
+ => State Stream m a
+ -> (a -> t m a -> m r)
+ -> (a -> m r)
+ -> m r
+ -> t m a
+ -> m r
+foldStreamShared st yld sng stp m =
+ let yieldk a x = yld a (fromStream x)
+ MkStream k = toStream m
+ in k st yieldk sng stp
+-- XXX write a similar rule for foldStream as well?
+{-# RULES "foldStreamShared from stream"
+ foldStreamShared = foldStreamSharedStream #-}
+ :: State Stream m a
+ -> (a -> Stream m a -> m r)
+ -> (a -> m r)
+ -> m r
+ -> Stream m a
+ -> m r
+foldStreamSharedStream st yld sng stp m =
+ let MkStream k = toStream m
+ in k st yld sng stp
+-- | Fold a stream by providing a State, stop continuation, a singleton
+-- continuation and a yield continuation. The stream will not use the SVar
+-- passed via State.
+{-# INLINE foldStream #-}
+ :: IsStream t
+ => State Stream m a
+ -> (a -> t m a -> m r)
+ -> (a -> m r)
+ -> m r
+ -> t m a
+ -> m r
+foldStream st yld sng stp m =
+ let yieldk a x = yld a (fromStream x)
+ MkStream k = toStream m
+ in k (adaptState st) yieldk sng stp
+-- Instances
+-- NOTE: specializing the function outside the instance definition seems to
+-- improve performance quite a bit at times, even if we have the same
+-- SPECIALIZE in the instance definition.
+{-# INLINE consMStream #-}
+{-# SPECIALIZE consMStream :: IO a -> Stream IO a -> Stream IO a #-}
+consMStream :: (Monad m) => m a -> Stream m a -> Stream m a
+consMStream m r = MkStream $ \_ yld _ _ -> m >>= \a -> yld a r
+-- IsStream Stream
+instance IsStream Stream where
+ toStream = id
+ fromStream = id
+ {-# INLINE consM #-}
+ {-# SPECIALIZE consM :: IO a -> Stream IO a -> Stream IO a #-}
+ consM :: Monad m => m a -> Stream m a -> Stream m a
+ consM = consMStream
+ {-# INLINE (|:) #-}
+ {-# SPECIALIZE (|:) :: IO a -> Stream IO a -> Stream IO a #-}
+ (|:) :: Monad m => m a -> Stream m a -> Stream m a
+ (|:) = consMStream
+-- foldr/build fusion
+-- XXX perhaps we can just use foldrSM/buildM everywhere as they are more
+-- general and cover foldrS/buildS as well.
+-- | The function 'f' decides how to reconstruct the stream. We could
+-- reconstruct using a shared state (SVar) or without sharing the state.
+{-# INLINE foldrSWith #-}
+foldrSWith :: IsStream t
+ => (forall r. State Stream m b
+ -> (b -> t m b -> m r)
+ -> (b -> m r)
+ -> m r
+ -> t m b
+ -> m r)
+ -> (a -> t m b -> t m b) -> t m b -> t m a -> t m b
+foldrSWith f step final m = go m
+ where
+ go m1 = mkStream $ \st yld sng stp ->
+ let run x = f st yld sng stp x
+ stop = run final
+ single a = run $ step a final
+ yieldk a r = run $ step a (go r)
+ -- XXX if type a and b are the same we do not need adaptState, can we
+ -- save some perf with that?
+ -- XXX since we are using adaptState anyway here we can use
+ -- foldStreamShared instead, will that save some perf?
+ in foldStream (adaptState st) yieldk single stop m1
+-- XXX we can use rewrite rules just for foldrSWith, if the function f is the
+-- same we can rewrite it.
+-- | Fold sharing the SVar state within the reconstructed stream
+{-# INLINE_NORMAL foldrSShared #-}
+foldrSShared :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b
+foldrSShared = foldrSWith foldStreamShared
+-- XXX consM is a typeclass method, therefore rewritten already. Instead maybe
+-- we can make consM polymorphic using rewrite rules.
+-- {-# RULES "foldrSShared/id" foldrSShared consM nil = \x -> x #-}
+{-# RULES "foldrSShared/nil"
+ forall k z. foldrSShared k z nil = z #-}
+{-# RULES "foldrSShared/single"
+ forall k z x. foldrSShared k z (yield x) = k x z #-}
+-- {-# RULES "foldrSShared/app" [1]
+-- forall ys. foldrSShared consM ys = \xs -> xs `conjoin` ys #-}
+-- | Lazy right associative fold to a stream.
+{-# INLINE_NORMAL foldrS #-}
+foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b
+foldrS = foldrSWith foldStream
+{-# RULES "foldrS/id" foldrS cons nil = \x -> x #-}
+{-# RULES "foldrS/nil" forall k z. foldrS k z nil = z #-}
+-- See notes in GHC.Base about this rule
+-- {-# RULES "foldr/cons"
+-- forall k z x xs. foldrS k z (x `cons` xs) = k x (foldrS k z xs) #-}
+{-# RULES "foldrS/single" forall k z x. foldrS k z (yield x) = k x z #-}
+-- {-# RULES "foldrS/app" [1]
+-- forall ys. foldrS cons ys = \xs -> xs `conjoin` ys #-}
+-- foldrS with monadic cons i.e. consM
+{-# INLINE foldrSMWith #-}
+foldrSMWith :: (IsStream t, Monad m)
+ => (forall r. State Stream m b
+ -> (b -> t m b -> m r)
+ -> (b -> m r)
+ -> m r
+ -> t m b
+ -> m r)
+ -> (m a -> t m b -> t m b) -> t m b -> t m a -> t m b
+foldrSMWith f step final m = go m
+ where
+ go m1 = mkStream $ \st yld sng stp ->
+ let run x = f st yld sng stp x
+ stop = run final
+ single a = run $ step (return a) final
+ yieldk a r = run $ step (return a) (go r)
+ in foldStream (adaptState st) yieldk single stop m1
+{-# INLINE_NORMAL foldrSM #-}
+foldrSM :: (IsStream t, Monad m)
+ => (m a -> t m b -> t m b) -> t m b -> t m a -> t m b
+foldrSM = foldrSMWith foldStream
+-- {-# RULES "foldrSM/id" foldrSM consM nil = \x -> x #-}
+{-# RULES "foldrSM/nil" forall k z. foldrSM k z nil = z #-}
+{-# RULES "foldrSM/single" forall k z x. foldrSM k z (yieldM x) = k x z #-}
+-- {-# RULES "foldrSM/app" [1]
+-- forall ys. foldrSM consM ys = \xs -> xs `conjoin` ys #-}
+-- Like foldrSM but sharing the SVar state within the recostructed stream.
+{-# INLINE_NORMAL foldrSMShared #-}
+foldrSMShared :: (IsStream t, Monad m)
+ => (m a -> t m b -> t m b) -> t m b -> t m a -> t m b
+foldrSMShared = foldrSMWith foldStreamShared
+-- {-# RULES "foldrSM/id" foldrSM consM nil = \x -> x #-}
+{-# RULES "foldrSMShared/nil"
+ forall k z. foldrSMShared k z nil = z #-}
+{-# RULES "foldrSMShared/single"
+ forall k z x. foldrSMShared k z (yieldM x) = k x z #-}
+-- {-# RULES "foldrSM/app" [1]
+-- forall ys. foldrSM consM ys = \xs -> xs `conjoin` ys #-}
+-- build
+{-# INLINE_NORMAL build #-}
+build :: IsStream t => forall a. (forall b. (a -> b -> b) -> b -> b) -> t m a
+build g = g cons nil
+{-# RULES "foldrM/build"
+ forall k z (g :: forall b. (a -> b -> b) -> b -> b).
+ foldrM k z (build g) = g k z #-}
+{-# RULES "foldrS/build"
+ forall k z (g :: forall b. (a -> b -> b) -> b -> b).
+ foldrS k z (build g) = g k z #-}
+{-# RULES "foldrS/cons/build"
+ forall k z x (g :: forall b. (a -> b -> b) -> b -> b).
+ foldrS k z (x `cons` build g) = k x (g k z) #-}
+{-# RULES "foldrSShared/build"
+ forall k z (g :: forall b. (a -> b -> b) -> b -> b).
+ foldrSShared k z (build g) = g k z #-}
+{-# RULES "foldrSShared/cons/build"
+ forall k z x (g :: forall b. (a -> b -> b) -> b -> b).
+ foldrSShared k z (x `cons` build g) = k x (g k z) #-}
+-- build a stream by applying cons and nil to a build function
+{-# INLINE_NORMAL buildS #-}
+buildS :: IsStream t => ((a -> t m a -> t m a) -> t m a -> t m a) -> t m a
+buildS g = g cons nil
+{-# RULES "foldrS/buildS"
+ forall k z (g :: (a -> t m a -> t m a) -> t m a -> t m a).
+ foldrS k z (buildS g) = g k z #-}
+{-# RULES "foldrS/cons/buildS"
+ forall k z x (g :: (a -> t m a -> t m a) -> t m a -> t m a).
+ foldrS k z (x `cons` buildS g) = k x (g k z) #-}
+{-# RULES "foldrSShared/buildS"
+ forall k z (g :: (a -> t m a -> t m a) -> t m a -> t m a).
+ foldrSShared k z (buildS g) = g k z #-}
+{-# RULES "foldrSShared/cons/buildS"
+ forall k z x (g :: (a -> t m a -> t m a) -> t m a -> t m a).
+ foldrSShared k z (x `cons` buildS g) = k x (g k z) #-}
+-- build a stream by applying consM and nil to a build function
+{-# INLINE_NORMAL buildSM #-}
+buildSM :: (IsStream t, MonadAsync m)
+ => ((m a -> t m a -> t m a) -> t m a -> t m a) -> t m a
+buildSM g = g consM nil
+{-# RULES "foldrSM/buildSM"
+ forall k z (g :: (m a -> t m a -> t m a) -> t m a -> t m a).
+ foldrSM k z (buildSM g) = g k z #-}
+{-# RULES "foldrSMShared/buildSM"
+ forall k z (g :: (m a -> t m a -> t m a) -> t m a -> t m a).
+ foldrSMShared k z (buildSM g) = g k z #-}
+-- Disabled because this may not fire as consM is a class Op
+{-# RULES "foldrS/consM/buildSM"
+ forall k z x (g :: (m a -> t m a -> t m a) -> t m a -> t m a)
+ . foldrSM k z (x `consM` buildSM g)
+ = k x (g k z)
+-- Build using monadic build functions (continuations) instead of
+-- reconstructing a stream.
+{-# INLINE_NORMAL buildM #-}
+buildM :: (IsStream t, MonadAsync m)
+ => (forall r. (a -> t m a -> m r)
+ -> (a -> m r)
+ -> m r
+ -> m r
+ )
+ -> t m a
+buildM g = mkStream $ \st yld sng stp ->
+ g (\a r -> foldStream st yld sng stp (return a `consM` r)) sng stp
+-- | Like 'buildM' but shares the SVar state across computations.
+{-# INLINE_NORMAL sharedM #-}
+sharedM :: (IsStream t, MonadAsync m)
+ => (forall r. (a -> t m a -> m r)
+ -> (a -> m r)
+ -> m r
+ -> m r
+ )
+ -> t m a
+sharedM g = mkStream $ \st yld sng stp ->
+ g (\a r -> foldStreamShared st yld sng stp (return a `consM` r)) sng stp
+-- augment
+{-# INLINE_NORMAL augmentS #-}
+augmentS :: IsStream t
+ => ((a -> t m a -> t m a) -> t m a -> t m a) -> t m a -> t m a
+augmentS g xs = g cons xs
+{-# RULES "augmentS/nil"
+ forall (g :: (a -> t m a -> t m a) -> t m a -> t m a).
+ augmentS g nil = buildS g
+ #-}
+{-# RULES "foldrS/augmentS"
+ forall k z xs (g :: (a -> t m a -> t m a) -> t m a -> t m a).
+ foldrS k z (augmentS g xs) = g k (foldrS k z xs)
+ #-}
+{-# RULES "augmentS/buildS"
+ forall (g :: (a -> t m a -> t m a) -> t m a -> t m a)
+ (h :: (a -> t m a -> t m a) -> t m a -> t m a).
+ augmentS g (buildS h) = buildS (\c n -> g c (h c n))
+ #-}
+{-# INLINE_NORMAL augmentSM #-}
+augmentSM :: (IsStream t, MonadAsync m)
+ => ((m a -> t m a -> t m a) -> t m a -> t m a) -> t m a -> t m a
+augmentSM g xs = g consM xs
+{-# RULES "augmentSM/nil"
+ forall (g :: (m a -> t m a -> t m a) -> t m a -> t m a).
+ augmentSM g nil = buildSM g
+ #-}
+{-# RULES "foldrSM/augmentSM"
+ forall k z xs (g :: (m a -> t m a -> t m a) -> t m a -> t m a).
+ foldrSM k z (augmentSM g xs) = g k (foldrSM k z xs)
+ #-}
+{-# RULES "augmentSM/buildSM"
+ forall (g :: (m a -> t m a -> t m a) -> t m a -> t m a)
+ (h :: (m a -> t m a -> t m a) -> t m a -> t m a).
+ augmentSM g (buildSM h) = buildSM (\c n -> g c (h c n))
+ #-}
+-- Experimental foldrM/buildM
+-- | Lazy right fold with a monadic step function.
+{-# INLINE_NORMAL foldrM #-}
+foldrM :: IsStream t => (a -> m b -> m b) -> m b -> t m a -> m b
+foldrM step acc m = go m
+ where
+ go m1 =
+ let stop = acc
+ single a = step a acc
+ yieldk a r = step a (go r)
+ in foldStream defState yieldk single stop m1
+{-# INLINE_NORMAL foldrMKWith #-}
+ :: (State Stream m a
+ -> (a -> t m a -> m b)
+ -> (a -> m b)
+ -> m b
+ -> t m a
+ -> m b)
+ -> (a -> m b -> m b)
+ -> m b
+ -> ((a -> t m a -> m b) -> (a -> m b) -> m b -> m b)
+ -> m b
+foldrMKWith f step acc g = go g
+ where
+ go k =
+ let stop = acc
+ single a = step a acc
+ yieldk a r = step a (go (\yld sng stp -> f defState yld sng stp r))
+ in k yieldk single stop
+{-# RULES "foldrM/buildS"
+ forall k z (g :: (a -> t m a -> t m a) -> t m a -> t m a)
+ . foldrM k z (buildS g)
+ = g k z
+-- XXX in which case will foldrM/buildM fusion be useful?
+{-# RULES "foldrM/buildM"
+ forall step acc (g :: (forall r.
+ (a -> t m a -> m r)
+ -> (a -> m r)
+ -> m r
+ -> m r
+ )).
+ foldrM step acc (buildM g) = foldrMKWith foldStream step acc g
+ #-}
+{-# RULES "foldrM/sharedM"
+ forall step acc (g :: (forall r.
+ (a -> t m a -> m r)
+ -> (a -> m r)
+ -> m r
+ -> m r
+ )).
+ foldrM step acc (sharedM g) = foldrMKWith foldStreamShared step acc g
+ #-}
+-- Left fold
+-- | Strict left fold with an extraction function. Like the standard strict
+-- left fold, but applies a user supplied extraction function (the third
+-- argument) to the folded value at the end. This is designed to work with the
+-- @foldl@ library. The suffix @x@ is a mnemonic for extraction.
+-- Note that the accumulator is always evaluated including the initial value.
+{-# INLINE foldlx' #-}
+foldlx' :: forall t m a b x. (IsStream t, Monad m)
+ => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b
+foldlx' step begin done m = get $ go m begin
+ where
+ {-# NOINLINE get #-}
+ get :: t m x -> m b
+ get m1 =
+ -- XXX we are not strictly evaluating the accumulator here. Is this
+ -- okay?
+ let single = return . done
+ -- XXX this is foldSingleton. why foldStreamShared?
+ in foldStreamShared undefined undefined single undefined m1
+ -- Note, this can be implemented by making a recursive call to "go",
+ -- however that is more expensive because of unnecessary recursion
+ -- that cannot be tail call optimized. Unfolding recursion explicitly via
+ -- continuations is much more efficient.
+ go :: t m a -> x -> t m x
+ go m1 !acc = mkStream $ \_ yld sng _ ->
+ let stop = sng acc
+ single a = sng $ step acc a
+ -- XXX this is foldNonEmptyStream
+ yieldk a r = foldStream defState yld sng undefined $
+ go r (step acc a)
+ in foldStream defState yieldk single stop m1
+-- | Strict left associative fold.
+{-# INLINE foldl' #-}
+foldl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> m b
+foldl' step begin = foldlx' step begin id
+-- Semigroup
+infixr 6 `serial`
+-- | Appends two streams sequentially, yielding all elements from the first
+-- stream, and then all elements from the second stream.
+-- >>> import Streamly.Prelude (serial)
+-- >>> stream1 = Stream.fromList [1,2]
+-- >>> stream2 = Stream.fromList [3,4]
+-- >>> Stream.toList $ stream1 `serial` stream2
+-- [1,2,3,4]
+-- This operation can be used to fold an infinite lazy container of streams.
+-- /Since: 0.2.0 ("Streamly")/
+-- @since 0.8.0
+{-# INLINE serial #-}
+serial :: IsStream t => t m a -> t m a -> t m a
+-- XXX This doubles the time of toNullAp benchmark, may not be fusing properly
+-- serial xs ys = augmentS (\c n -> foldrS c n xs) ys
+serial m1 m2 = go m1
+ where
+ go m = mkStream $ \st yld sng stp ->
+ let stop = foldStream st yld sng stp m2
+ single a = yld a m2
+ yieldk a r = yld a (go r)
+ in foldStream st yieldk single stop m
+-- join/merge/append streams depending on consM
+{-# INLINE conjoin #-}
+conjoin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
+conjoin xs ys = augmentSM (\c n -> foldrSM c n xs) ys
+instance Semigroup (Stream m a) where
+ (<>) = serial
+-- Monoid
+instance Monoid (Stream m a) where
+ mempty = nil
+ mappend = (<>)
+-- Functor
+-- Note eta expanded
+{-# INLINE_LATE mapFB #-}
+mapFB :: forall (t :: (Type -> Type) -> Type -> Type) b m a.
+ (b -> t m b -> t m b) -> (a -> b) -> a -> t m b -> t m b
+mapFB c f = \x ys -> c (f x) ys
+#undef Type
+{-# RULES
+"mapFB/mapFB" forall c f g. mapFB (mapFB c f) g = mapFB c (f . g)
+"mapFB/id" forall c. mapFB c (\x -> x) = c
+ #-}
+{-# INLINE map #-}
+map :: IsStream t => (a -> b) -> t m a -> t m b
+map f xs = buildS (\c n -> foldrS (mapFB c f) n xs)
+-- XXX This definition might potentially be more efficient, but the cost in the
+-- benchmark is dominated by unfoldrM cost so we cannot correctly determine
+-- differences in the mapping cost. We should perhaps deduct the cost of
+-- unfoldrM from the benchmarks and then compare.
+map f m = go m
+ where
+ go m1 =
+ mkStream $ \st yld sng stp ->
+ let single = sng . f
+ yieldk a r = yld (f a) (go r)
+ in foldStream (adaptState st) yieldk single stp m1
+{-# INLINE_LATE mapMFB #-}
+mapMFB :: Monad m => (m b -> t m b -> t m b) -> (a -> m b) -> m a -> t m b -> t m b
+mapMFB c f = \x ys -> c (x >>= f) ys
+{-# RULES
+ "mapMFB/mapMFB" forall c f g. mapMFB (mapMFB c f) g = mapMFB c (f >=> g)
+ #-}
+-- XXX These rules may never fire because pure/return type class rules will
+-- fire first.
+"mapMFB/pure" forall c. mapMFB c (\x -> pure x) = c
+"mapMFB/return" forall c. mapMFB c (\x -> return x) = c
+-- Be careful when modifying this, this uses a consM (|:) deliberately to allow
+-- other stream types to overload it.
+{-# INLINE mapM #-}
+mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b
+mapM f = foldrSShared (\x xs -> f x `consM` xs) nil
+-- See note under map definition above.
+mapM f m = go m
+ where
+ go m1 = mkStream $ \st yld sng stp ->
+ let single a = f a >>= sng
+ yieldk a r = foldStreamShared st yld sng stp $ f a |: go r
+ in foldStream (adaptState st) yieldk single stp m1
+ -}
+-- This is experimental serial version supporting fusion.
+-- XXX what if we do not want to fuse two concurrent mapMs?
+-- XXX we can combine two concurrent mapM only if the SVar is of the same type
+-- So for now we use it only for serial streams.
+-- XXX fusion would be easier for monomoprhic stream types.
+-- {-# RULES "mapM serial" mapM = mapMSerial #-}
+{-# INLINE mapMSerial #-}
+mapMSerial :: MonadAsync m => (a -> m b) -> Stream m a -> Stream m b
+mapMSerial f xs = buildSM (\c n -> foldrSMShared (mapMFB c f) n xs)
+-- XXX in fact use the Stream type everywhere and only use polymorphism in the
+-- high level modules/prelude.
+instance Monad m => Functor (Stream m) where
+ fmap = map
+-- Transformers
+instance MonadTrans Stream where
+ {-# INLINE lift #-}
+ lift = yieldM
+-- Nesting
+-- | Detach a stream from an SVar
+{-# INLINE unShare #-}
+unShare :: IsStream t => t m a -> t m a
+unShare x = mkStream $ \st yld sng stp ->
+ foldStream st yld sng stp x
+-- XXX the function stream and value stream can run in parallel
+{-# INLINE apWith #-}
+ :: IsStream t
+ => (t m b -> t m b -> t m b)
+ -> t m (a -> b)
+ -> t m a
+ -> t m b
+apWith par fstream stream = go1 fstream
+ where
+ go1 m =
+ mkStream $ \st yld sng stp ->
+ let foldShared = foldStreamShared st yld sng stp
+ single f = foldShared $ unShare (go2 f stream)
+ yieldk f r = foldShared $ unShare (go2 f stream) `par` go1 r
+ in foldStream (adaptState st) yieldk single stp m
+ go2 f m =
+ mkStream $ \st yld sng stp ->
+ let single a = sng (f a)
+ yieldk a r = yld (f a) (go2 f r)
+ in foldStream (adaptState st) yieldk single stp m
+{-# INLINE apSerial #-}
+ :: IsStream t
+ => t m (a -> b)
+ -> t m a
+ -> t m b
+apSerial fstream stream = go1 fstream
+ where
+ go1 m =
+ mkStream $ \st yld sng stp ->
+ let foldShared = foldStreamShared st yld sng stp
+ single f = foldShared $ go3 f stream
+ yieldk f r = foldShared $ go2 f r stream
+ in foldStream (adaptState st) yieldk single stp m
+ go2 f r1 m =
+ mkStream $ \st yld sng stp ->
+ let foldShared = foldStreamShared st yld sng stp
+ stop = foldShared $ go1 r1
+ single a = yld (f a) (go1 r1)
+ yieldk a r = yld (f a) (go2 f r1 r)
+ in foldStream (adaptState st) yieldk single stop m
+ go3 f m =
+ mkStream $ \st yld sng stp ->
+ let single a = sng (f a)
+ yieldk a r = yld (f a) (go3 f r)
+ in foldStream (adaptState st) yieldk single stp m
+{-# INLINE apSerialDiscardFst #-}
+ :: IsStream t
+ => t m a
+ -> t m b
+ -> t m b
+apSerialDiscardFst fstream stream = go1 fstream
+ where
+ go1 m =
+ mkStream $ \st yld sng stp ->
+ let foldShared = foldStreamShared st yld sng stp
+ single _ = foldShared $ stream
+ yieldk _ r = foldShared $ go2 r stream
+ in foldStream (adaptState st) yieldk single stp m
+ go2 r1 m =
+ mkStream $ \st yld sng stp ->
+ let foldShared = foldStreamShared st yld sng stp
+ stop = foldShared $ go1 r1
+ single a = yld a (go1 r1)
+ yieldk a r = yld a (go2 r1 r)
+ in foldStream st yieldk single stop m
+{-# INLINE apSerialDiscardSnd #-}
+ :: IsStream t
+ => t m a
+ -> t m b
+ -> t m a
+apSerialDiscardSnd fstream stream = go1 fstream
+ where
+ go1 m =
+ mkStream $ \st yld sng stp ->
+ let foldShared = foldStreamShared st yld sng stp
+ single f = foldShared $ go3 f stream
+ yieldk f r = foldShared $ go2 f r stream
+ in foldStream st yieldk single stp m
+ go2 f r1 m =
+ mkStream $ \st yld sng stp ->
+ let foldShared = foldStreamShared st yld sng stp
+ stop = foldShared $ go1 r1
+ single _ = yld f (go1 r1)
+ yieldk _ r = yld f (go2 f r1 r)
+ in foldStream (adaptState st) yieldk single stop m
+ go3 f m =
+ mkStream $ \st yld sng stp ->
+ let single _ = sng f
+ yieldk _ r = yld f (go3 f r)
+ in foldStream (adaptState st) yieldk single stp m
+-- XXX This is just concatMapBy with arguments flipped. We need to keep this
+-- instead of using a concatMap style definition because the bind
+-- implementation in Async and WAsync streams show significant perf degradation
+-- if the argument order is changed.
+{-# INLINE bindWith #-}
+ :: IsStream t
+ => (t m b -> t m b -> t m b)
+ -> t m a
+ -> (a -> t m b)
+ -> t m b
+bindWith par m1 f = go m1
+ where
+ go m =
+ mkStream $ \st yld sng stp ->
+ let foldShared = foldStreamShared st yld sng stp
+ single a = foldShared $ unShare (f a)
+ yieldk a r = foldShared $ unShare (f a) `par` go r
+ in foldStream (adaptState st) yieldk single stp m
+-- XXX express in terms of foldrS?
+-- XXX can we use a different stream type for the generated stream being
+-- falttened so that we can combine them differently and keep the resulting
+-- stream different?
+-- XXX do we need specialize to IO?
+-- XXX can we optimize when c and a are same, by removing the forall using
+-- rewrite rules with type applications?
+-- | Perform a 'concatMap' using a specified concat strategy. The first
+-- argument specifies a merge or concat function that is used to merge the
+-- streams generated by the map function. For example, the concat function
+-- could be 'serial', 'parallel', 'async', 'ahead' or any other zip or merge
+-- function.
+-- @since 0.7.0
+{-# INLINE concatMapBy #-}
+ :: IsStream t
+ => (t m b -> t m b -> t m b)
+ -> (a -> t m b)
+ -> t m a
+ -> t m b
+concatMapBy par f xs = bindWith par xs f
+{-# INLINE concatMap #-}
+concatMap :: IsStream t => (a -> t m b) -> t m a -> t m b
+concatMap f m = fromStream $
+ concatMapBy serial
+ (\a -> adapt $ toStream $ f a)
+ (adapt $ toStream m)
+-- Fused version.
+-- XXX This fuses but when the stream is nil this performs poorly.
+-- The filterAllOut benchmark degrades. Need to investigate and fix that.
+{-# INLINE concatMap #-}
+concatMap :: IsStream t => (a -> t m b) -> t m a -> t m b
+concatMap f xs = buildS
+ (\c n -> foldrS (\x b -> foldrS c b (f x)) n xs)
+-- Stream polymorphic concatMap implementation
+-- XXX need to use buildSM/foldrSMShared for parallel behavior
+-- XXX unShare seems to degrade the fused performance
+{-# INLINE_EARLY concatMap_ #-}
+concatMap_ :: IsStream t => (a -> t m b) -> t m a -> t m b
+concatMap_ f xs = buildS
+ (\c n -> foldrSShared (\x b -> foldrSShared c b (unShare $ f x)) n xs)
+-- | See 'Streamly.Internal.Data.Stream.IsStream.concatPairsWith' for
+-- documentation.
+{-# INLINE concatPairsWith #-}
+ :: IsStream t
+ => (t m b -> t m b -> t m b)
+ -> (a -> t m b)
+ -> t m a
+ -> t m b
+concatPairsWith combine f = go Nothing
+ where
+ go Nothing stream =
+ mkStream $ \st yld sng stp ->
+ let foldShared = foldStreamShared st yld sng stp
+ single a = foldShared $ unShare (f a)
+ yieldk a r = foldShared $ go (Just a) r
+ in foldStream (adaptState st) yieldk single stp stream
+ go (Just a1) stream =
+ mkStream $ \st yld sng stp ->
+ let foldShared = foldStreamShared st yld sng stp
+ stop = foldShared $ unShare (f a1)
+ single a = foldShared $ unShare (f a1) `combine` f a
+ yieldk a r =
+ foldShared
+ $ concatPairsWith combine
+ (\(x,y) -> combine (unShare x) y)
+ $ (f a1, f a) `cons` makePairs Nothing r
+ in foldStream (adaptState st) yieldk single stop stream
+ makePairs Nothing stream =
+ mkStream $ \st yld sng stp ->
+ let foldShared = foldStreamShared st yld sng stp
+ single a = sng (f a, nil)
+ yieldk a r = foldShared $ makePairs (Just a) r
+ in foldStream (adaptState st) yieldk single stp stream
+ makePairs (Just a1) stream =
+ mkStream $ \st yld sng _ ->
+ let stop = sng (f a1, nil)
+ single a = sng (f a1, f a)
+ yieldk a r = yld (f a1, f a) (makePairs Nothing r)
+ in foldStream (adaptState st) yieldk single stop stream
+instance Monad m => Applicative (Stream m) where
+ {-# INLINE pure #-}
+ pure = yield
+ {-# INLINE (<*>) #-}
+ (<*>) = ap
+-- NOTE: even though concatMap for StreamD is 3x faster compared to StreamK,
+-- the monad instance of StreamD is slower than StreamK after foldr/build
+-- fusion.
+instance Monad m => Monad (Stream m) where
+ {-# INLINE return #-}
+ return = pure
+ {-# INLINE (>>=) #-}
+ (>>=) = flip concatMap
+-- Like concatMap but generates stream using an unfold function. Similar to
+-- unfoldMany but for StreamK.
+concatUnfoldr :: IsStream t
+ => (b -> t m (Maybe (a, b))) -> t m b -> t m a
+concatUnfoldr = undefined
+{-# INLINE unfoldrM #-}
+unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a
+unfoldrM step = go
+ where
+ go s = sharedM $ \yld _ stp -> do
+ r <- step s
+ case r of
+ Just (a, b) -> yld a (go b)
+ Nothing -> stp
+{-# INLINE drain #-}
+drain :: (Monad m, IsStream t) => t m a -> m ()
+drain = foldrM (\_ xs -> xs) (return ())
@@ -0,0 +1,65 @@
+{-# LANGUAGE TupleSections #-}
+{-# LANGUAGE ExistentialQuantification #-}
+module Unfold
+ ( Unfold (..)
+ , supplyFirst
+ , many
+ , lmap
+ )
+import Step (Step(..))
+#if defined(FUSION_PLUGIN)
+import Fusion.Plugin.Types (Fuse(..))
+data Unfold m a b =
+ -- | @Unfold step inject@
+ forall s. Unfold (s -> m (Step s b)) (a -> m s)
+{-# INLINE [1] lmap #-}
+lmap :: (a -> c) -> Unfold m c b -> Unfold m a b
+lmap f (Unfold ustep uinject) = Unfold ustep (uinject Prelude.. f)
+{-# INLINE [1] supplyFirst #-}
+supplyFirst :: a -> Unfold m (a, b) c -> Unfold m b c
+supplyFirst a = lmap (a, )
+#if defined(FUSION_PLUGIN)
+{-# ANN type ConcatState Fuse #-}
+data ConcatState s1 s2 = ConcatOuter s1 | ConcatInner s1 s2
+-- | Apply the second unfold to each output element of the first unfold and
+-- flatten the output in a single stream.
+-- /Since: 0.8.0/
+{-# INLINE [1] many #-}
+many :: Monad m => Unfold m a b -> Unfold m b c -> Unfold m a c
+many (Unfold step1 inject1) (Unfold step2 inject2) = Unfold step inject
+ where
+ inject x = do
+ s <- inject1 x
+ return $ ConcatOuter s
+ {-# INLINE [0] step #-}
+ step (ConcatOuter st) = do
+ r <- step1 st
+ case r of
+ Yield x s -> do
+ innerSt <- inject2 x
+ return $ Skip (ConcatInner s innerSt)
+ Skip s -> return $ Skip (ConcatOuter s)
+ Stop -> return Stop
+ step (ConcatInner ost ist) = do
+ r <- step2 ist
+ return $ case r of
+ Yield x s -> Yield x (ConcatInner ost s)
+ Skip s -> Skip (ConcatInner ost s)
+ Stop -> Skip (ConcatOuter ost)
@@ -0,0 +1,27 @@
+-- We use fromStreamK/toStreamK to convert the direct style stream to CPS
+-- style. In the first phase we try fusing the fromStreamK/toStreamK using:
+-- {-# RULES "fromStreamK/toStreamK fusion"
+-- forall s. toStreamK (fromStreamK s) = s #-}
+-- If for some reason some of the operations could not be fused then we have
+-- fallback rules in the second phase. For example:
+-- {-# INLINE_EARLY unfoldr #-}
+-- unfoldr :: (Monad m, IsStream t) => (b -> Maybe (a, b)) -> b -> t m a
+-- unfoldr step seed = fromStreamS (S.unfoldr step seed)
+-- {-# RULES "unfoldr fallback to StreamK" [1]
+-- forall a b. S.toStreamK (S.unfoldr a b) = K.unfoldr a b #-}```
+-- Then, fromStreamK/toStreamK are inlined in the last phase:
+-- {-# INLINE_LATE toStreamK #-}
+-- toStreamK :: Monad m => Stream m a -> K.Stream m a```
+-- The fallback rules make sure that if we could not fuse the direct style
+-- operations then better use the CPS style operation, because unfused direct
+-- style would have worse performance than the CPS style ops.
@@ -10,3 +10,4 @@ test('T12790', [only_ways(['normal']), req_profiling], compile, ['-O -prof'])
test('T14931', [when(opsys('mingw32'), skip), only_ways(['normal']), req_profiling],
makefile_test, ['T14931'])
test('T15108', [only_ways(['normal']), req_profiling], compile, ['-O -prof -fprof-auto'])
+test('T19894', [only_ways(['normal']), req_profiling, extra_files(['T19894'])], multimod_compile, ['Main', '-v0 -O2 -prof -fprof-auto -iT19894'])