summaryrefslogtreecommitdiff
path: root/testsuite/tests/profiling
diff options
context:
space:
mode:
Diffstat (limited to 'testsuite/tests/profiling')
-rw-r--r--testsuite/tests/profiling/should_compile/T19894/Array.hs74
-rw-r--r--testsuite/tests/profiling/should_compile/T19894/Fold.hs277
-rw-r--r--testsuite/tests/profiling/should_compile/T19894/Handle.hs80
-rw-r--r--testsuite/tests/profiling/should_compile/T19894/MArray.hs372
-rw-r--r--testsuite/tests/profiling/should_compile/T19894/Main.hs26
-rw-r--r--testsuite/tests/profiling/should_compile/T19894/Operations.hs221
-rw-r--r--testsuite/tests/profiling/should_compile/T19894/Ring.hs254
-rw-r--r--testsuite/tests/profiling/should_compile/T19894/Serial.hs8
-rw-r--r--testsuite/tests/profiling/should_compile/T19894/Step.hs44
-rw-r--r--testsuite/tests/profiling/should_compile/T19894/StreamD.hs1079
-rw-r--r--testsuite/tests/profiling/should_compile/T19894/StreamK.hs1245
-rw-r--r--testsuite/tests/profiling/should_compile/T19894/Unfold.hs65
-rw-r--r--testsuite/tests/profiling/should_compile/T19894/inline.hs27
-rw-r--r--testsuite/tests/profiling/should_compile/all.T1
14 files changed, 3773 insertions, 0 deletions
diff --git a/testsuite/tests/profiling/should_compile/T19894/Array.hs b/testsuite/tests/profiling/should_compile/T19894/Array.hs
new file mode 100644
index 0000000000..75cad1fcef
--- /dev/null
+++ b/testsuite/tests/profiling/should_compile/T19894/Array.hs
@@ -0,0 +1,74 @@
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE RecordWildCards #-}
+module Array
+ (
+ Array(..)
+ , fromList
+ , read
+ , length
+ , writeNUnsafe
+ , MA.unsafeInlineIO
+ , MA.memcmp
+ , unsafeFreezeWithShrink
+ , foldl'
+ , unsafeIndexIO
+ )
+
+where
+
+import Control.Monad.IO.Class (MonadIO(..))
+import Foreign.Storable (Storable(..))
+import GHC.ForeignPtr (ForeignPtr(..))
+import GHC.IO (unsafePerformIO)
+import GHC.Ptr (Ptr(..))
+import Unfold (Unfold(..))
+import Fold (Fold(..))
+import qualified MArray as MA
+import qualified Unfold as UF
+import Prelude hiding (length, read)
+
+data Array a =
+ Array
+ { aStart :: {-# UNPACK #-} !(ForeignPtr a) -- first address
+ , aEnd :: {-# UNPACK #-} !(Ptr a) -- first unused addres
+ }
+
+{-# INLINE unsafeFreeze #-}
+unsafeFreeze :: MA.Array a -> Array a
+unsafeFreeze (MA.Array as ae _) = Array as ae
+
+{-# INLINABLE fromList #-}
+fromList :: Storable a => [a] -> Array a
+fromList xs = unsafeFreeze $ MA.fromList xs
+
+{-# INLINE [1] writeNUnsafe #-}
+writeNUnsafe :: forall m a. (MonadIO m, Storable a)
+ => Int -> Fold m a (Array a)
+writeNUnsafe n = unsafeFreeze <$> MA.writeNUnsafe n
+
+{-# INLINE unsafeThaw #-}
+unsafeThaw :: Array a -> MA.Array a
+unsafeThaw (Array as ae) = MA.Array as ae ae
+
+{-# INLINE length #-}
+length :: forall a. Storable a => Array a -> Int
+length arr = MA.length (unsafeThaw arr)
+
+{-# INLINE [1] read #-}
+read :: forall m a. (Monad m, Storable a) => Unfold m (Array a) a
+read = UF.lmap unsafeThaw MA.read
+
+{-# INLINE unsafeFreezeWithShrink #-}
+unsafeFreezeWithShrink :: Storable a => MA.Array a -> Array a
+unsafeFreezeWithShrink arr = unsafePerformIO $ do
+ MA.Array as ae _ <- MA.shrinkToFit arr
+ return $ Array as ae
+
+{-# INLINE [1] foldl' #-}
+foldl' :: forall a b. Storable a => (b -> a -> b) -> b -> Array a -> b
+foldl' f z arr = MA.foldl' f z (unsafeThaw arr)
+
+{-# INLINE [1] unsafeIndexIO #-}
+unsafeIndexIO :: forall a. Storable a => Array a -> Int -> IO a
+unsafeIndexIO arr = MA.unsafeIndexIO (unsafeThaw arr)
+
diff --git a/testsuite/tests/profiling/should_compile/T19894/Fold.hs b/testsuite/tests/profiling/should_compile/T19894/Fold.hs
new file mode 100644
index 0000000000..7683b064f4
--- /dev/null
+++ b/testsuite/tests/profiling/should_compile/T19894/Fold.hs
@@ -0,0 +1,277 @@
+{-# LANGUAGE CPP #-}
+{-# LANGUAGE ExistentialQuantification #-}
+
+module Fold
+ (
+ -- * Types
+ Step(..)
+ , Fold (..)
+ , sum
+ , chunksOf
+ , drain
+ , drainBy
+ )
+where
+
+import Data.Bifunctor (Bifunctor(..))
+#if defined(FUSION_PLUGIN)
+import Fusion.Plugin.Types (Fuse(..))
+#endif
+import Prelude hiding (sum, take)
+
+------------------------------------------------------------------------------
+-- Step of a fold
+------------------------------------------------------------------------------
+
+-- The Step functor around b allows expressing early termination like a right
+-- fold. Traditional list right folds use function composition and laziness to
+-- terminate early whereas we use data constructors. It allows stream fusion in
+-- contrast to the foldr/build fusion when composing with functions.
+
+-- | Represents the result of the @step@ of a 'Fold'. 'Partial' returns an
+-- intermediate state of the fold, the fold step can be called again with the
+-- state or the driver can use @extract@ on the state to get the result out.
+-- 'Done' returns the final result and the fold cannot be driven further.
+--
+-- /Pre-release/
+--
+#if defined(FUSION_PLUGIN)
+{-# ANN type Step Fuse #-}
+#endif
+data Step s b
+ = Partial !s
+ | Done !b
+
+-- | 'first' maps over 'Partial' and 'second' maps over 'Done'.
+--
+instance Bifunctor Step where
+ {-# INLINE bimap #-}
+ bimap f _ (Partial a) = Partial (f a)
+ bimap _ g (Done b) = Done (g b)
+
+ {-# INLINE first #-}
+ first f (Partial a) = Partial (f a)
+ first _ (Done x) = Done x
+
+ {-# INLINE second #-}
+ second _ (Partial x) = Partial x
+ second f (Done a) = Done (f a)
+
+-- | 'fmap' maps over 'Done'.
+--
+-- @
+-- fmap = 'second'
+-- @
+--
+instance Functor (Step s) where
+ {-# INLINE fmap #-}
+ fmap = second
+
+{-
+-- | Map a monadic function over the result @b@ in @Step s b@.
+--
+-- /Internal/
+{-# INLINE mapMStep #-}
+mapMStep :: Applicative m => (a -> m b) -> Step s a -> m (Step s b)
+mapMStep f res =
+ case res of
+ Partial s -> pure $ Partial s
+ Done b -> Done <$> f b
+-}
+
+------------------------------------------------------------------------------
+-- The Fold type
+------------------------------------------------------------------------------
+
+-- | The type @Fold m a b@ having constructor @Fold step initial extract@
+-- represents a fold over an input stream of values of type @a@ to a final
+-- value of type @b@ in 'Monad' @m@.
+--
+-- The fold uses an intermediate state @s@ as accumulator, the type @s@ is
+-- internal to the specific fold definition. The initial value of the fold
+-- state @s@ is returned by @initial@. The @step@ function consumes an input
+-- and either returns the final result @b@ if the fold is done or the next
+-- intermediate state (see 'Step'). At any point the fold driver can extract
+-- the result from the intermediate state using the @extract@ function.
+--
+-- NOTE: The constructor is not yet exposed via exposed modules, smart
+-- constructors are provided to create folds. If you think you need the
+-- constructor of this type please consider using the smart constructors in
+-- "Streamly.Internal.Data.Fold' instead.
+--
+-- /since 0.8.0 (type changed)/
+--
+-- @since 0.7.0
+
+data Fold m a b =
+ -- | @Fold @ @ step @ @ initial @ @ extract@
+ forall s. Fold (s -> a -> m (Step s b)) (m (Step s b)) (s -> m b)
+
+instance Functor m => Functor (Fold m a) where
+ {-# INLINE fmap #-}
+ fmap f (Fold step1 initial1 extract) = Fold step initial (fmap2 f extract)
+
+ where
+
+ initial = fmap2 f initial1
+ step s b = fmap2 f (step1 s b)
+ fmap2 g = fmap (fmap g)
+
+{-# INLINABLE lmapM #-}
+lmapM :: Monad m => (a -> m b) -> Fold m b r -> Fold m a r
+lmapM f (Fold step begin done) = Fold step' begin done
+ where
+ step' x a = f a >>= step x
+
+-- | Make a fold from a left fold style pure step function and initial value of
+-- the accumulator.
+--
+-- If your 'Fold' returns only 'Partial' (i.e. never returns a 'Done') then you
+-- can use @foldl'*@ constructors.
+--
+-- A fold with an extract function can be expressed using fmap:
+--
+-- @
+-- mkfoldlx :: Monad m => (s -> a -> s) -> s -> (s -> b) -> Fold m a b
+-- mkfoldlx step initial extract = fmap extract (foldl' step initial)
+-- @
+--
+-- See also: "Streamly.Prelude.foldl'"
+--
+-- @since 0.8.0
+--
+{-# INLINE foldl' #-}
+foldl' :: Monad m => (b -> a -> b) -> b -> Fold m a b
+foldl' step initial =
+ Fold
+ (\s a -> return $ Partial $ step s a)
+ (return (Partial initial))
+ return
+
+-- | Determine the sum of all elements of a stream of numbers. Returns additive
+-- identity (@0@) when the stream is empty. Note that this is not numerically
+-- stable for floating point numbers.
+--
+-- > sum = fmap getSum $ Fold.foldMap Sum
+--
+-- @since 0.7.0
+{-# INLINE sum #-}
+sum :: (Monad m, Num a) => Fold m a a
+sum = foldl' (+) 0
+
+data Tuple' a b = Tuple' !a !b deriving Show
+
+{-# INLINE take #-}
+take :: Monad m => Int -> Fold m a b -> Fold m a b
+take n (Fold fstep finitial fextract) = Fold step initial extract
+
+ where
+
+ initial = do
+ res <- finitial
+ case res of
+ Partial s ->
+ if n > 0
+ then return $ Partial $ Tuple' 0 s
+ else Done <$> fextract s
+ Done b -> return $ Done b
+
+ step (Tuple' i r) a = do
+ res <- fstep r a
+ case res of
+ Partial sres -> do
+ let i1 = i + 1
+ s1 = Tuple' i1 sres
+ if i1 < n
+ then return $ Partial s1
+ else Done <$> fextract sres
+ Done bres -> return $ Done bres
+
+ extract (Tuple' _ r) = fextract r
+
+#if defined(FUSION_PLUGIN)
+{-# ANN type ManyState Fuse #-}
+#endif
+data ManyState s1 s2
+ = ManyFirst !s1 !s2
+ | ManyLoop !s1 !s2
+
+-- | Collect zero or more applications of a fold. @many split collect@ applies
+-- the @split@ fold repeatedly on the input stream and accumulates zero or more
+-- fold results using @collect@.
+--
+-- >>> two = Fold.take 2 Fold.toList
+-- >>> twos = Fold.many two Fold.toList
+-- >>> Stream.fold twos $ Stream.fromList [1..10]
+-- [[1,2],[3,4],[5,6],[7,8],[9,10]]
+--
+-- Stops when @collect@ stops.
+--
+-- See also: "Streamly.Prelude.concatMap", "Streamly.Prelude.foldMany"
+--
+-- @since 0.8.0
+--
+{-# INLINE many #-}
+many :: Monad m => Fold m a b -> Fold m b c -> Fold m a c
+many (Fold sstep sinitial sextract) (Fold cstep cinitial cextract) =
+ Fold step initial extract
+
+ where
+
+ -- cs = collect state
+ -- ss = split state
+ -- cres = collect state result
+ -- sres = split state result
+ -- cb = collect done
+ -- sb = split done
+
+ -- Caution! There is mutual recursion here, inlining the right functions is
+ -- important.
+
+ {-# INLINE handleSplitStep #-}
+ handleSplitStep branch cs sres =
+ case sres of
+ Partial ss1 -> return $ Partial $ branch ss1 cs
+ Done sb -> runCollector ManyFirst cs sb
+
+ {-# INLINE handleCollectStep #-}
+ handleCollectStep branch cres =
+ case cres of
+ Partial cs -> do
+ sres <- sinitial
+ handleSplitStep branch cs sres
+ Done cb -> return $ Done cb
+
+ -- Do not inline this
+ runCollector branch cs sb = cstep cs sb >>= handleCollectStep branch
+
+ initial = cinitial >>= handleCollectStep ManyFirst
+
+ {-# INLINE step_ #-}
+ step_ ss cs a = do
+ sres <- sstep ss a
+ handleSplitStep ManyLoop cs sres
+
+ {-# INLINE step #-}
+ step (ManyFirst ss cs) a = step_ ss cs a
+ step (ManyLoop ss cs) a = step_ ss cs a
+
+ extract (ManyFirst _ cs) = cextract cs
+ extract (ManyLoop ss cs) = do
+ sb <- sextract ss
+ cres <- cstep cs sb
+ case cres of
+ Partial s -> cextract s
+ Done b -> return b
+
+{-# INLINE chunksOf #-}
+chunksOf :: Monad m => Int -> Fold m a b -> Fold m b c -> Fold m a c
+chunksOf n split = many (take n split)
+
+{-# INLINABLE drain #-}
+drain :: Monad m => Fold m a ()
+drain = foldl' (\_ _ -> ()) ()
+
+{-# INLINABLE drainBy #-}
+drainBy :: Monad m => (a -> m b) -> Fold m a ()
+drainBy f = lmapM f drain
diff --git a/testsuite/tests/profiling/should_compile/T19894/Handle.hs b/testsuite/tests/profiling/should_compile/T19894/Handle.hs
new file mode 100644
index 0000000000..d1222312fd
--- /dev/null
+++ b/testsuite/tests/profiling/should_compile/T19894/Handle.hs
@@ -0,0 +1,80 @@
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE RecordWildCards #-}
+module Handle
+ (
+ write
+ , read
+ )
+
+where
+
+import Control.Monad.IO.Class (MonadIO(..))
+import Data.Word (Word8)
+import Foreign.Storable (Storable(..))
+import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
+import Foreign.ForeignPtr (withForeignPtr)
+import Foreign.Ptr (plusPtr, minusPtr)
+import GHC.ForeignPtr (mallocPlainForeignPtrBytes)
+import System.IO (Handle, hGetBufSome, hPutBuf)
+import Unfold (Unfold(..))
+import Fold (Fold(..))
+import Array (Array(..))
+import qualified MArray as MA
+import qualified Fold as FL
+import qualified Unfold as UF
+import qualified StreamD as D
+import qualified Array as A
+import Prelude hiding (length, read)
+
+{-# INLINABLE writeArray #-}
+writeArray :: Storable a => Handle -> Array a -> IO ()
+writeArray _ arr | A.length arr == 0 = return ()
+writeArray h Array{..} = withForeignPtr aStart $ \p -> hPutBuf h p aLen
+ where
+ aLen =
+ let p = unsafeForeignPtrToPtr aStart
+ in aEnd `minusPtr` p
+
+{-# INLINE writeChunks #-}
+writeChunks :: (MonadIO m, Storable a) => Handle -> Fold m (Array a) ()
+writeChunks h = FL.drainBy (liftIO . writeArray h)
+
+{-# INLINE writeWithBufferOf #-}
+writeWithBufferOf :: MonadIO m => Int -> Handle -> Fold m Word8 ()
+writeWithBufferOf n h = FL.chunksOf n (A.writeNUnsafe n) (writeChunks h)
+
+{-# INLINE write #-}
+write :: MonadIO m => Handle -> Fold m Word8 ()
+write = writeWithBufferOf MA.defaultChunkSize
+
+{-# INLINABLE readArrayUpto #-}
+readArrayUpto :: Int -> Handle -> IO (Array Word8)
+readArrayUpto size h = do
+ ptr <- mallocPlainForeignPtrBytes size
+ -- ptr <- mallocPlainForeignPtrAlignedBytes size (alignment (undefined :: Word8))
+ withForeignPtr ptr $ \p -> do
+ n <- hGetBufSome h p size
+ -- XXX shrink only if the diff is significant
+ return $
+ A.unsafeFreezeWithShrink $
+ MA.mutableArray ptr (p `plusPtr` n) (p `plusPtr` size)
+
+{-# INLINE [1] readChunksWithBufferOf #-}
+readChunksWithBufferOf :: MonadIO m => Unfold m (Int, Handle) (Array Word8)
+readChunksWithBufferOf = Unfold step return
+ where
+ {-# INLINE [0] step #-}
+ step (size, h) = do
+ arr <- liftIO $ readArrayUpto size h
+ return $
+ case A.length arr of
+ 0 -> D.Stop
+ _ -> D.Yield arr (size, h)
+
+{-# INLINE readWithBufferOf #-}
+readWithBufferOf :: MonadIO m => Unfold m (Int, Handle) Word8
+readWithBufferOf = UF.many readChunksWithBufferOf A.read
+
+{-# INLINE read #-}
+read :: MonadIO m => Unfold m Handle Word8
+read = UF.supplyFirst MA.defaultChunkSize readWithBufferOf
diff --git a/testsuite/tests/profiling/should_compile/T19894/MArray.hs b/testsuite/tests/profiling/should_compile/T19894/MArray.hs
new file mode 100644
index 0000000000..f307bb344d
--- /dev/null
+++ b/testsuite/tests/profiling/should_compile/T19894/MArray.hs
@@ -0,0 +1,372 @@
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE MagicHash #-}
+{-# LANGUAGE UnboxedTuples #-}
+{-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE ExistentialQuantification #-}
+module MArray
+ (
+ Array (..)
+ , writeNUnsafe
+ , length
+ , fromList
+ , fromListN
+ , defaultChunkSize
+ , read
+ , shrinkToFit
+ , mutableArray
+ , unsafeInlineIO
+ , memcmp
+ , foldl'
+ , unsafeIndexIO
+ )
+
+where
+
+import Control.Exception (assert)
+import Control.Monad (when, void)
+import Data.Functor.Identity (runIdentity)
+import Data.Word (Word8)
+import Foreign.C.Types (CSize(..), CInt(..))
+import Fold (Fold(..))
+import Control.Monad.IO.Class (MonadIO(..))
+import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
+import Foreign.ForeignPtr (withForeignPtr, touchForeignPtr)
+import Foreign.Ptr (plusPtr, minusPtr, castPtr)
+import Foreign.Storable (Storable(..))
+import GHC.Base (realWorld#)
+import GHC.ForeignPtr (ForeignPtr(..))
+import GHC.IO (IO(IO), unsafePerformIO)
+import GHC.Ptr (Ptr(..))
+import Step (Step(..))
+import Unfold (Unfold(..))
+import qualified GHC.ForeignPtr as GHC
+import qualified Fold as FL
+import qualified StreamD as D
+import qualified StreamK as K
+import Prelude hiding (length, read)
+
+data Array a =
+ Array
+ { aStart :: {-# UNPACK #-} !(ForeignPtr a) -- ^ first address
+ , aEnd :: {-# UNPACK #-} !(Ptr a) -- ^ first unused address
+ , aBound :: {-# UNPACK #-} !(Ptr a) -- ^ first address beyond allocated memory
+ }
+
+{-# INLINE mutableArray #-}
+mutableArray ::
+ ForeignPtr a -> Ptr a -> Ptr a -> Array a
+mutableArray = Array
+
+data ArrayUnsafe a = ArrayUnsafe
+ {-# UNPACK #-} !(ForeignPtr a) -- first address
+ {-# UNPACK #-} !(Ptr a) -- first unused address
+
+-- | allocate a new array using the provided allocator function.
+{-# INLINE newArrayAlignedAllocWith #-}
+newArrayAlignedAllocWith :: forall a. Storable a
+ => (Int -> Int -> IO (ForeignPtr a)) -> Int -> Int -> IO (Array a)
+newArrayAlignedAllocWith alloc alignSize count = do
+ let size = count * sizeOf (undefined :: a)
+ fptr <- alloc size alignSize
+ let p = unsafeForeignPtrToPtr fptr
+ return $ Array
+ { aStart = fptr
+ , aEnd = p
+ , aBound = p `plusPtr` size
+ }
+
+{-# INLINE mallocForeignPtrAlignedBytes #-}
+mallocForeignPtrAlignedBytes :: Int -> Int -> IO (GHC.ForeignPtr a)
+mallocForeignPtrAlignedBytes =
+ GHC.mallocPlainForeignPtrAlignedBytes
+
+{-# INLINE newArrayAligned #-}
+newArrayAligned :: forall a. Storable a => Int -> Int -> IO (Array a)
+newArrayAligned = newArrayAlignedAllocWith mallocForeignPtrAlignedBytes
+
+{-# INLINE newArray #-}
+newArray :: forall a. Storable a => Int -> IO (Array a)
+newArray = newArrayAligned (alignment (undefined :: a))
+
+-- | Like 'writeN' but does not check the array bounds when writing. The fold
+-- driver must not call the step function more than 'n' times otherwise it will
+-- corrupt the memory and crash. This function exists mainly because any
+-- conditional in the step function blocks fusion causing 10x performance
+-- slowdown.
+--
+-- @since 0.7.0
+{-# INLINE [1] writeNUnsafe #-}
+writeNUnsafe :: forall m a. (MonadIO m, Storable a)
+ => Int -> Fold m a (Array a)
+writeNUnsafe n = Fold step initial extract
+
+ where
+
+ initial = do
+ (Array start end _) <- liftIO $ newArray (max n 0)
+ return $ FL.Partial $ ArrayUnsafe start end
+
+ step (ArrayUnsafe start end) x = do
+ liftIO $ poke end x
+ return
+ $ FL.Partial
+ $ ArrayUnsafe start (end `plusPtr` sizeOf (undefined :: a))
+
+ extract (ArrayUnsafe start end) = return $ Array start end end -- liftIO . shrinkToFit
+
+{-# INLINE byteLength #-}
+byteLength :: Array a -> Int
+byteLength Array{..} =
+ let p = unsafeForeignPtrToPtr aStart
+ len = aEnd `minusPtr` p
+ in assert (len >= 0) len
+
+-- | /O(1)/ Get the length of the array i.e. the number of elements in the
+-- array.
+--
+-- @since 0.7.0
+{-# INLINE length #-}
+length :: forall a. Storable a => Array a -> Int
+length arr = byteLength arr `div` sizeOf (undefined :: a)
+
+{-# INLINE [1] fromStreamDN #-}
+fromStreamDN :: forall m a. (MonadIO m, Storable a)
+ => Int -> D.Stream m a -> m (Array a)
+fromStreamDN limit str = do
+ arr <- liftIO $ newArray limit
+ end <- D.foldlM' fwrite (return $ aEnd arr) $ D.take limit str
+ return $ arr {aEnd = end}
+
+ where
+
+ fwrite ptr x = do
+ liftIO $ poke ptr x
+ return $ ptr `plusPtr` sizeOf (undefined :: a)
+
+{-# INLINABLE fromListN #-}
+fromListN :: Storable a => Int -> [a] -> Array a
+fromListN n xs = unsafePerformIO $ fromStreamDN n $ D.fromList xs
+
+data GroupState s start end bound
+ = GroupStart s
+ | GroupBuffer s start end bound
+ | GroupYield start end bound (GroupState s start end bound)
+ | GroupFinish
+
+-- | @arraysOf n stream@ groups the input stream into a stream of
+-- arrays of size n.
+--
+-- @arraysOf n = StreamD.foldMany (Array.writeN n)@
+--
+-- /Pre-release/
+{-# INLINE [1] arraysOf #-}
+arraysOf :: forall m a. (MonadIO m, Storable a)
+ => Int -> D.Stream m a -> D.Stream m (Array a)
+-- XXX the idiomatic implementation leads to large regression in the D.reverse'
+-- benchmark. It seems it has difficulty producing optimized code when
+-- converting to StreamK. Investigate GHC optimizations.
+-- arraysOf n = D.foldMany (writeN n)
+arraysOf n (D.Stream step state) =
+ D.Stream step' (GroupStart state)
+
+ where
+
+ {-# INLINE [0] step' #-}
+ step' _ (GroupStart st) = do
+ when (n <= 0) $
+ -- XXX we can pass the module string from the higher level API
+ error $ "Streamly.Internal.Data.Array.Foreign.Mut.Type.fromStreamDArraysOf: the size of "
+ ++ "arrays [" ++ show n ++ "] must be a natural number"
+ Array start end bound <- liftIO $ newArray n
+ return $ D.Skip (GroupBuffer st start end bound)
+
+ step' gst (GroupBuffer st start end bound) = do
+ r <- step (K.adaptState gst) st
+ case r of
+ D.Yield x s -> do
+ liftIO $ poke end x
+ let end' = end `plusPtr` sizeOf (undefined :: a)
+ return $
+ if end' >= bound
+ then D.Skip (GroupYield start end' bound (GroupStart s))
+ else D.Skip (GroupBuffer s start end' bound)
+ D.Skip s -> return $ D.Skip (GroupBuffer s start end bound)
+ D.Stop -> return $ D.Skip (GroupYield start end bound GroupFinish)
+
+ step' _ (GroupYield start end bound next) =
+ return $ D.Yield (Array start end bound) next
+
+ step' _ GroupFinish = return D.Stop
+
+allocOverhead :: Int
+allocOverhead = 2 * sizeOf (undefined :: Int)
+
+mkChunkSize :: Int -> Int
+mkChunkSize n = let size = n - allocOverhead in max size 0
+
+mkChunkSizeKB :: Int -> Int
+mkChunkSizeKB n = mkChunkSize (n * k)
+ where k = 1024
+
+defaultChunkSize :: Int
+defaultChunkSize = mkChunkSizeKB 32
+
+{-# INLINE bufferChunks #-}
+bufferChunks :: (MonadIO m, Storable a) =>
+ D.Stream m a -> m (K.Stream m (Array a))
+bufferChunks m = D.foldr K.cons K.nil $ arraysOf defaultChunkSize m
+
+data Producer m a b =
+ -- | @Producer step inject extract@
+ forall s. Producer (s -> m (Step s b)) (a -> m s) (s -> m a)
+
+{-# INLINE unsafeInlineIO #-}
+unsafeInlineIO :: IO a -> a
+unsafeInlineIO (IO m) = case m realWorld# of (# _, r #) -> r
+
+data ReadUState a = ReadUState
+ {-# UNPACK #-} !(ForeignPtr a) -- foreign ptr with end of array pointer
+ {-# UNPACK #-} !(Ptr a) -- current pointer
+
+-- | Resumable unfold of an array.
+--
+{-# INLINE [1] producer #-}
+producer :: forall m a. (Monad m, Storable a) => Producer m (Array a) a
+producer = Producer step inject extract
+ where
+
+ inject (Array (ForeignPtr start contents) (Ptr end) _) =
+ return $ ReadUState (ForeignPtr end contents) (Ptr start)
+
+ {-# INLINE [0] step #-}
+ step (ReadUState fp@(ForeignPtr end _) p) | p == Ptr end =
+ let x = unsafeInlineIO $ touchForeignPtr fp
+ in x `seq` return D.Stop
+ step (ReadUState fp p) = do
+ -- unsafeInlineIO allows us to run this in Identity monad for pure
+ -- toList/foldr case which makes them much faster due to not
+ -- accumulating the list and fusing better with the pure consumers.
+ --
+ -- This should be safe as the array contents are guaranteed to be
+ -- evaluated/written to before we peek at them.
+ let !x = unsafeInlineIO $ peek p
+ return $ D.Yield x
+ (ReadUState fp (p `plusPtr` sizeOf (undefined :: a)))
+
+ extract (ReadUState (ForeignPtr end contents) (Ptr p)) =
+ return $ Array (ForeignPtr p contents) (Ptr end) (Ptr end)
+
+{-# INLINE simplify #-}
+simplify :: Producer m a b -> Unfold m a b
+simplify (Producer step inject _) = Unfold step inject
+
+-- | Unfold an array into a stream.
+--
+-- @since 0.7.0
+{-# INLINE [1] read #-}
+read :: forall m a. (Monad m, Storable a) => Unfold m (Array a) a
+read = simplify producer
+
+{-# INLINE fromStreamD #-}
+fromStreamD :: (MonadIO m, Storable a) => D.Stream m a -> m (Array a)
+fromStreamD m = do
+ buffered <- bufferChunks m
+ len <- K.foldl' (+) 0 (K.map length buffered)
+ fromStreamDN len $ D.unfoldMany read $ D.fromStreamK buffered
+
+-- | Create an 'Array' from a list. The list must be of finite size.
+--
+-- @since 0.7.0
+{-# INLINABLE fromList #-}
+fromList :: Storable a => [a] -> Array a
+fromList xs = unsafePerformIO $ fromStreamD $ D.fromList xs
+
+foreign import ccall unsafe "string.h memcpy" c_memcpy
+ :: Ptr Word8 -> Ptr Word8 -> CSize -> IO (Ptr Word8)
+
+-- XXX we are converting Int to CSize
+memcpy :: Ptr Word8 -> Ptr Word8 -> Int -> IO ()
+memcpy dst src len = void (c_memcpy dst src (fromIntegral len))
+
+foreign import ccall unsafe "string.h memcmp" c_memcmp
+ :: Ptr Word8 -> Ptr Word8 -> CSize -> IO CInt
+
+-- XXX we are converting Int to CSize
+-- return True if the memory locations have identical contents
+{-# INLINE memcmp #-}
+memcmp :: Ptr Word8 -> Ptr Word8 -> Int -> IO Bool
+memcmp p1 p2 len = do
+ r <- c_memcmp p1 p2 (fromIntegral len)
+ return $ r == 0
+
+{-# NOINLINE reallocAligned #-}
+reallocAligned :: Int -> Int -> Array a -> IO (Array a)
+reallocAligned alignSize newSize Array{..} = do
+ assert (aEnd <= aBound) (return ())
+ let oldStart = unsafeForeignPtrToPtr aStart
+ let size = aEnd `minusPtr` oldStart
+ newPtr <- mallocForeignPtrAlignedBytes newSize alignSize
+ withForeignPtr newPtr $ \pNew -> do
+ memcpy (castPtr pNew) (castPtr oldStart) size
+ touchForeignPtr aStart
+ return $ Array
+ { aStart = newPtr
+ , aEnd = pNew `plusPtr` size
+ , aBound = pNew `plusPtr` newSize
+ }
+
+-- XXX can unaligned allocation be more efficient when alignment is not needed?
+{-# INLINABLE realloc #-}
+realloc :: forall a. Storable a => Int -> Array a -> IO (Array a)
+realloc = reallocAligned (alignment (undefined :: a))
+
+shrinkToFit :: forall a. Storable a => Array a -> IO (Array a)
+shrinkToFit arr@Array{..} = do
+ assert (aEnd <= aBound) (return ())
+ let start = unsafeForeignPtrToPtr aStart
+ let used = aEnd `minusPtr` start
+ waste = aBound `minusPtr` aEnd
+ -- if used == waste == 0 then do not realloc
+ -- if the wastage is more than 25% of the array then realloc
+ if used < 3 * waste
+ then realloc used arr
+ else return arr
+
+{-# INLINE [1] toStreamD #-}
+toStreamD :: forall m a. (Monad m, Storable a) => Array a -> D.Stream m a
+toStreamD Array{..} =
+ let p = unsafeForeignPtrToPtr aStart
+ in D.Stream step p
+
+ where
+
+ {-# INLINE [0] step #-}
+ step _ p | p == aEnd = return D.Stop
+ step _ p = do
+ -- unsafeInlineIO allows us to run this in Identity monad for pure
+ -- toList/foldr case which makes them much faster due to not
+ -- accumulating the list and fusing better with the pure consumers.
+ --
+ -- This should be safe as the array contents are guaranteed to be
+ -- evaluated/written to before we peek at them.
+ let !x = unsafeInlineIO $ do
+ r <- peek p
+ touchForeignPtr aStart
+ return r
+ return $ D.Yield x (p `plusPtr` sizeOf (undefined :: a))
+
+{-# INLINE [1] foldl' #-}
+foldl' :: forall a b. Storable a => (b -> a -> b) -> b -> Array a -> b
+foldl' f z arr = runIdentity $ D.foldl' f z $ toStreamD arr
+
+{-# INLINE [1] unsafeIndexIO #-}
+unsafeIndexIO :: forall a. Storable a => Array a -> Int -> IO a
+unsafeIndexIO Array {..} i =
+ withForeignPtr aStart $ \p -> do
+ let elemSize = sizeOf (undefined :: a)
+ elemOff = p `plusPtr` (elemSize * i)
+ assert (i >= 0 && elemOff `plusPtr` elemSize <= aEnd)
+ (return ())
+ peek elemOff
diff --git a/testsuite/tests/profiling/should_compile/T19894/Main.hs b/testsuite/tests/profiling/should_compile/T19894/Main.hs
new file mode 100644
index 0000000000..0195d14407
--- /dev/null
+++ b/testsuite/tests/profiling/should_compile/T19894/Main.hs
@@ -0,0 +1,26 @@
+module Main (main) where
+
+import Data.Char (ord)
+import Data.Word (Word8)
+import System.IO (openFile, IOMode(..), Handle)
+import StreamK (IsStream, MonadAsync)
+import qualified Operations
+import qualified StreamK
+import qualified Fold
+import qualified Handle
+import qualified Array
+import Array (Array)
+
+toarr :: String -> Array Word8
+toarr = Array.fromList . map (fromIntegral . ord)
+
+-- | Split on a word8 sequence.
+splitOnSeq :: String -> Handle -> IO ()
+splitOnSeq str inh =
+ Operations.drain $ Operations.splitOnSeq (toarr str) Fold.drain
+ $ Operations.unfold Handle.read inh
+
+main :: IO ()
+main = do
+ inh <- openFile "input.txt" ReadMode
+ splitOnSeq "aa" inh
diff --git a/testsuite/tests/profiling/should_compile/T19894/Operations.hs b/testsuite/tests/profiling/should_compile/T19894/Operations.hs
new file mode 100644
index 0000000000..b17298558a
--- /dev/null
+++ b/testsuite/tests/profiling/should_compile/T19894/Operations.hs
@@ -0,0 +1,221 @@
+{-# LANGUAGE CPP #-}
+{-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+
+module Operations (unfoldrM, drain, postscan, after_, replicate, fold, unfold,
+ splitOnSeq) where
+
+import Control.Monad.IO.Class (MonadIO(..))
+import Data.Bits (shiftR, shiftL, (.|.), (.&.))
+import Data.Word (Word, Word32)
+import Fold (Fold(..))
+import Foreign.Storable (Storable(..))
+import GHC.Types (SPEC(..))
+import Array (Array)
+import StreamK (IsStream, MonadAsync, adaptState)
+import Step (Step(..))
+import Unfold (Unfold)
+#if defined(FUSION_PLUGIN)
+import Fusion.Plugin.Types (Fuse(..))
+#endif
+import qualified StreamK as K
+import qualified StreamD as D
+import qualified Serial
+import qualified Fold as FL
+import qualified Array as A
+import qualified Ring as RB
+import Prelude hiding (replicate)
+
+{-# INLINE [2] unfoldrM #-}
+unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a
+unfoldrM = K.unfoldrM
+
+{-# RULES "unfoldrM serial" unfoldrM = unfoldrMSerial #-}
+{-# INLINE [2] unfoldrMSerial #-}
+unfoldrMSerial :: MonadAsync m => (b -> m (Maybe (a, b))) -> b -> K.Stream m a
+unfoldrMSerial = Serial.unfoldrM
+
+{-# INLINE [2] drain #-}
+drain :: (Monad m) => K.Stream m a -> m ()
+drain m = D.drain $ D.fromStreamK (K.toStream m)
+{-# RULES "drain fallback to CPS" [1]
+ forall a. D.drain (D.fromStreamK a) = K.drain a #-}
+
+{-# INLINE [1] postscan #-}
+postscan :: (IsStream t, Monad m)
+ => Fold m a b -> t m a -> t m b
+postscan fld m =
+ D.fromStreamD $ D.postscanOnce fld $ D.toStreamD m
+
+{-# INLINE [1] replicate #-}
+replicate :: (IsStream t, Monad m) => Int -> a -> t m a
+replicate n = D.fromStreamD . D.replicate n
+
+{-# INLINE fold_ #-}
+fold_ :: Monad m => Fold m a b -> K.Stream m a -> m (b, K.Stream m a)
+fold_ fl strm = do
+ (b, str) <- D.fold_ fl $ D.toStreamD strm
+ return $! (b, D.fromStreamD str)
+
+{-# INLINE fold #-}
+fold :: Monad m => Fold m a b -> K.Stream m a -> m b
+fold fl strm = do
+ (b, _) <- fold_ fl strm
+ return $! b
+
+{-# INLINE after_ #-}
+after_ :: (IsStream t, Monad m) => m b -> t m a -> t m a
+after_ action xs = D.fromStreamD $ D.after_ action $ D.toStreamD xs
+
+{-# INLINE unfold #-}
+unfold :: (IsStream t, Monad m) => Unfold m a b -> a -> t m b
+unfold unf x = D.fromStreamD $ D.unfold unf x
+
+#if defined(FUSION_PLUGIN)
+{-# ANN type SplitOnSeqState Fuse #-}
+#endif
+data SplitOnSeqState rb rh ck w fs s b x =
+ SplitOnSeqInit
+ | SplitOnSeqYield b (SplitOnSeqState rb rh ck w fs s b x)
+ | SplitOnSeqDone
+
+ | SplitOnSeqWordInit !fs s
+ | SplitOnSeqWordLoop !w s !fs
+ | SplitOnSeqWordDone Int !fs !w
+
+ | SplitOnSeqReinit (fs -> SplitOnSeqState rb rh ck w fs s b x)
+
+{-# INLINE [1] splitOnSeqD #-}
+splitOnSeqD
+ :: forall m a b. (MonadIO m, Storable a, Enum a, Eq a)
+ => Array a
+ -> Fold m a b
+ -> D.Stream m a
+ -> D.Stream m b
+splitOnSeqD patArr (Fold fstep initial done) (D.Stream step state) =
+ D.Stream stepOuter SplitOnSeqInit
+
+ where
+
+ patLen = A.length patArr
+ maxIndex = patLen - 1
+ elemBits = sizeOf (undefined :: a) * 8
+
+ -- For word pattern case
+ wordMask :: Word
+ wordMask = (1 `shiftL` (elemBits * patLen)) - 1
+
+ elemMask :: Word
+ elemMask = (1 `shiftL` elemBits) - 1
+
+ wordPat :: Word
+ wordPat = wordMask .&. A.foldl' addToWord 0 patArr
+
+ addToWord wd a = (wd `shiftL` elemBits) .|. fromIntegral (fromEnum a)
+
+ skip = return . Skip
+
+ nextAfterInit nextGen stepRes =
+ case stepRes of
+ FL.Partial s -> nextGen s
+ FL.Done b -> SplitOnSeqYield b (SplitOnSeqReinit nextGen)
+
+ {-# INLINE yieldProceed #-}
+ yieldProceed nextGen fs =
+ initial >>= skip . SplitOnSeqYield fs . nextAfterInit nextGen
+
+ {-# INLINE [0] stepOuter #-}
+ stepOuter _ SplitOnSeqInit = do
+ res <- initial
+ case res of
+ FL.Partial acc -> return $ Skip $ SplitOnSeqWordInit acc state
+ FL.Done b -> skip $ SplitOnSeqYield b SplitOnSeqInit
+
+ stepOuter _ (SplitOnSeqYield x next) = return $ Yield x next
+
+ ---------------------------
+ -- Checkpoint
+ ---------------------------
+
+ stepOuter _ (SplitOnSeqReinit nextGen) =
+ initial >>= skip . nextAfterInit nextGen
+
+ -----------------
+ -- Done
+ -----------------
+
+ stepOuter _ SplitOnSeqDone = return Stop
+
+ ---------------------------
+ -- Short Pattern - Shift Or
+ ---------------------------
+
+ stepOuter _ (SplitOnSeqWordDone 0 fs _) = do
+ r <- done fs
+ skip $ SplitOnSeqYield r SplitOnSeqDone
+ stepOuter _ (SplitOnSeqWordDone n fs wrd) = do
+ let old = elemMask .&. (wrd `shiftR` (elemBits * (n - 1)))
+ r <- fstep fs (toEnum $ fromIntegral old)
+ case r of
+ FL.Partial fs1 -> skip $ SplitOnSeqWordDone (n - 1) fs1 wrd
+ FL.Done b -> do
+ let jump c = SplitOnSeqWordDone (n - 1) c wrd
+ yieldProceed jump b
+
+ stepOuter gst (SplitOnSeqWordInit fs st0) =
+ go SPEC 0 0 st0
+
+ where
+
+ {-# INLINE go #-}
+ go !_ !idx !wrd !st = do
+ res <- step (adaptState gst) st
+ case res of
+ Yield x s -> do
+ let wrd1 = addToWord wrd x
+ if idx == maxIndex
+ then do
+ if wrd1 .&. wordMask == wordPat
+ then do
+ let jump c = SplitOnSeqWordInit c s
+ done fs >>= yieldProceed jump
+ else skip $ SplitOnSeqWordLoop wrd1 s fs
+ else go SPEC (idx + 1) wrd1 s
+ Skip s -> go SPEC idx wrd s
+ Stop -> do
+ if idx /= 0
+ then skip $ SplitOnSeqWordDone idx fs wrd
+ else do
+ r <- done fs
+ skip $ SplitOnSeqYield r SplitOnSeqDone
+
+ stepOuter gst (SplitOnSeqWordLoop wrd0 st0 fs0) =
+ go SPEC wrd0 st0 fs0
+
+ where
+
+ {-# INLINE go #-}
+ go !_ !wrd !st !fs = do
+ res <- step (adaptState gst) st
+ case res of
+ Yield x s -> do
+ let jump c = SplitOnSeqWordInit c s
+ wrd1 = addToWord wrd x
+ old = (wordMask .&. wrd)
+ `shiftR` (elemBits * (patLen - 1))
+ r <- fstep fs (toEnum $ fromIntegral old)
+ case r of
+ FL.Partial fs1 -> do
+ if wrd1 .&. wordMask == wordPat
+ then done fs1 >>= yieldProceed jump
+ else go SPEC wrd1 s fs1
+ FL.Done b -> yieldProceed jump b
+ Skip s -> go SPEC wrd s fs
+ Stop -> skip $ SplitOnSeqWordDone patLen fs wrd
+
+{-# INLINE splitOnSeq #-}
+splitOnSeq
+ :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a)
+ => Array a -> Fold m a b -> t m a -> t m b
+splitOnSeq patt f m = D.fromStreamD $ splitOnSeqD patt f (D.toStreamD m)
diff --git a/testsuite/tests/profiling/should_compile/T19894/Ring.hs b/testsuite/tests/profiling/should_compile/T19894/Ring.hs
new file mode 100644
index 0000000000..715103055f
--- /dev/null
+++ b/testsuite/tests/profiling/should_compile/T19894/Ring.hs
@@ -0,0 +1,254 @@
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+-- |
+-- Module : Streamly.Internal.Ring.Foreign
+-- Copyright : (c) 2019 Composewell Technologies
+-- License : BSD3
+-- Maintainer : streamly@composewell.com
+-- Stability : experimental
+-- Portability : GHC
+--
+
+module Ring
+ ( Ring(..)
+
+ -- * Construction
+ , new
+ , advance
+ , moveBy
+ , startOf
+
+ -- * Modification
+ , unsafeInsert
+
+ -- * Folds
+ , unsafeFoldRing
+ , unsafeFoldRingM
+ , unsafeFoldRingFullM
+ , unsafeFoldRingNM
+
+ -- * Fast Byte Comparisons
+ , unsafeEqArray
+ , unsafeEqArrayN
+ ) where
+
+import Control.Exception (assert)
+import Foreign.ForeignPtr (ForeignPtr, withForeignPtr, touchForeignPtr)
+import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
+import Foreign.Ptr (plusPtr, minusPtr, castPtr)
+import Foreign.Storable (Storable(..))
+import GHC.ForeignPtr (mallocPlainForeignPtrAlignedBytes)
+import GHC.Ptr (Ptr(..))
+import Prelude hiding (length, concat)
+
+import Control.Monad.IO.Class (MonadIO(..))
+
+import qualified Array as A
+
+-- | A ring buffer is a mutable array of fixed size. Initially the array is
+-- empty, with ringStart pointing at the start of allocated memory. We call the
+-- next location to be written in the ring as ringHead. Initially ringHead ==
+-- ringStart. When the first item is added, ringHead points to ringStart +
+-- sizeof item. When the buffer becomes full ringHead would wrap around to
+-- ringStart. When the buffer is full, ringHead always points at the oldest
+-- item in the ring and the newest item added always overwrites the oldest
+-- item.
+--
+-- When using it we should keep in mind that a ringBuffer is a mutable data
+-- structure. We should not leak out references to it for immutable use.
+--
+data Ring a = Ring
+ { ringStart :: {-# UNPACK #-} !(ForeignPtr a) -- first address
+ , ringBound :: {-# UNPACK #-} !(Ptr a) -- first address beyond allocated memory
+ }
+
+-- | Get the first address of the ring as a pointer.
+startOf :: Ring a -> Ptr a
+startOf = unsafeForeignPtrToPtr . ringStart
+
+-- | Create a new ringbuffer and return the ring buffer and the ringHead.
+-- Returns the ring and the ringHead, the ringHead is same as ringStart.
+{-# INLINE new #-}
+new :: forall a. Storable a => Int -> IO (Ring a, Ptr a)
+new count = do
+ let size = count * sizeOf (undefined :: a)
+ fptr <- mallocPlainForeignPtrAlignedBytes size (alignment (undefined :: a))
+ let p = unsafeForeignPtrToPtr fptr
+ return (Ring
+ { ringStart = fptr
+ , ringBound = p `plusPtr` size
+ }, p)
+
+-- | Advance the ringHead by 1 item, wrap around if we hit the end of the
+-- array.
+{-# INLINE advance #-}
+advance :: forall a. Storable a => Ring a -> Ptr a -> Ptr a
+advance Ring{..} ringHead =
+ let ptr = ringHead `plusPtr` sizeOf (undefined :: a)
+ in if ptr < ringBound
+ then ptr
+ else unsafeForeignPtrToPtr ringStart
+
+-- | Move the ringHead by n items. The direction depends on the sign on whether
+-- n is positive or negative. Wrap around if we hit the beginning or end of the
+-- array.
+{-# INLINE moveBy #-}
+moveBy :: forall a. Storable a => Int -> Ring a -> Ptr a -> Ptr a
+moveBy by Ring {..} ringHead = ringStartPtr `plusPtr` advanceFromHead
+
+ where
+
+ elemSize = sizeOf (undefined :: a)
+ ringStartPtr = unsafeForeignPtrToPtr ringStart
+ lenInBytes = ringBound `minusPtr` ringStartPtr
+ offInBytes = ringHead `minusPtr` ringStartPtr
+ len = assert (lenInBytes `mod` elemSize == 0) $ lenInBytes `div` elemSize
+ off = assert (offInBytes `mod` elemSize == 0) $ offInBytes `div` elemSize
+ advanceFromHead = (off + by `mod` len) * elemSize
+
+-- | Insert an item at the head of the ring, when the ring is full this
+-- replaces the oldest item in the ring with the new item. This is unsafe
+-- beause ringHead supplied is not verified to be within the Ring. Also,
+-- the ringStart foreignPtr must be guaranteed to be alive by the caller.
+{-# INLINE unsafeInsert #-}
+unsafeInsert :: Storable a => Ring a -> Ptr a -> a -> IO (Ptr a)
+unsafeInsert rb ringHead newVal = do
+ poke ringHead newVal
+ -- touchForeignPtr (ringStart rb)
+ return $ advance rb ringHead
+
+-- XXX remove all usage of A.unsafeInlineIO
+--
+-- | Like 'unsafeEqArray' but compares only N bytes instead of entire length of
+-- the ring buffer. This is unsafe because the ringHead Ptr is not checked to
+-- be in range.
+{-# INLINE unsafeEqArrayN #-}
+unsafeEqArrayN :: Ring a -> Ptr a -> A.Array a -> Int -> Bool
+unsafeEqArrayN Ring{..} rh A.Array{..} n =
+ let !res = A.unsafeInlineIO $ do
+ let rs = unsafeForeignPtrToPtr ringStart
+ as = unsafeForeignPtrToPtr aStart
+ assert (aEnd `minusPtr` as >= ringBound `minusPtr` rs) (return ())
+ let len = ringBound `minusPtr` rh
+ r1 <- A.memcmp (castPtr rh) (castPtr as) (min len n)
+ r2 <- if n > len
+ then A.memcmp (castPtr rs) (castPtr (as `plusPtr` len))
+ (min (rh `minusPtr` rs) (n - len))
+ else return True
+ -- XXX enable these, check perf impact
+ -- touchForeignPtr ringStart
+ -- touchForeignPtr aStart
+ return (r1 && r2)
+ in res
+
+-- | Byte compare the entire length of ringBuffer with the given array,
+-- starting at the supplied ringHead pointer. Returns true if the Array and
+-- the ringBuffer have identical contents.
+--
+-- This is unsafe because the ringHead Ptr is not checked to be in range. The
+-- supplied array must be equal to or bigger than the ringBuffer, ARRAY BOUNDS
+-- ARE NOT CHECKED.
+{-# INLINE unsafeEqArray #-}
+unsafeEqArray :: Ring a -> Ptr a -> A.Array a -> Bool
+unsafeEqArray Ring{..} rh A.Array{..} =
+ let !res = A.unsafeInlineIO $ do
+ let rs = unsafeForeignPtrToPtr ringStart
+ let as = unsafeForeignPtrToPtr aStart
+ assert (aEnd `minusPtr` as >= ringBound `minusPtr` rs)
+ (return ())
+ let len = ringBound `minusPtr` rh
+ r1 <- A.memcmp (castPtr rh) (castPtr as) len
+ r2 <- A.memcmp (castPtr rs) (castPtr (as `plusPtr` len))
+ (rh `minusPtr` rs)
+ -- XXX enable these, check perf impact
+ -- touchForeignPtr ringStart
+ -- touchForeignPtr aStart
+ return (r1 && r2)
+ in res
+
+-- XXX use MonadIO
+--
+-- | Fold the buffer starting from ringStart up to the given 'Ptr' using a pure
+-- step function. This is useful to fold the items in the ring when the ring is
+-- not full. The supplied pointer is usually the end of the ring.
+--
+-- Unsafe because the supplied Ptr is not checked to be in range.
+{-# INLINE unsafeFoldRing #-}
+unsafeFoldRing :: forall a b. Storable a
+ => Ptr a -> (b -> a -> b) -> b -> Ring a -> b
+unsafeFoldRing ptr f z Ring{..} =
+ let !res = A.unsafeInlineIO $ withForeignPtr ringStart $ \p ->
+ go z p ptr
+ in res
+ where
+ go !acc !p !q
+ | p == q = return acc
+ | otherwise = do
+ x <- peek p
+ go (f acc x) (p `plusPtr` sizeOf (undefined :: a)) q
+
+-- XXX Can we remove MonadIO here?
+withForeignPtrM :: MonadIO m => ForeignPtr a -> (Ptr a -> m b) -> m b
+withForeignPtrM fp fn = do
+ r <- fn $ unsafeForeignPtrToPtr fp
+ liftIO $ touchForeignPtr fp
+ return r
+
+-- | Like unsafeFoldRing but with a monadic step function.
+{-# INLINE unsafeFoldRingM #-}
+unsafeFoldRingM :: forall m a b. (MonadIO m, Storable a)
+ => Ptr a -> (b -> a -> m b) -> b -> Ring a -> m b
+unsafeFoldRingM ptr f z Ring {..} =
+ withForeignPtrM ringStart $ \x -> go z x ptr
+ where
+ go !acc !start !end
+ | start == end = return acc
+ | otherwise = do
+ let !x = A.unsafeInlineIO $ peek start
+ acc' <- f acc x
+ go acc' (start `plusPtr` sizeOf (undefined :: a)) end
+
+-- | Fold the entire length of a ring buffer starting at the supplied ringHead
+-- pointer. Assuming the supplied ringHead pointer points to the oldest item,
+-- this would fold the ring starting from the oldest item to the newest item in
+-- the ring.
+--
+-- Note, this will crash on ring of 0 size.
+--
+{-# INLINE unsafeFoldRingFullM #-}
+unsafeFoldRingFullM :: forall m a b. (MonadIO m, Storable a)
+ => Ptr a -> (b -> a -> m b) -> b -> Ring a -> m b
+unsafeFoldRingFullM rh f z rb@Ring {..} =
+ withForeignPtrM ringStart $ \_ -> go z rh
+ where
+ go !acc !start = do
+ let !x = A.unsafeInlineIO $ peek start
+ acc' <- f acc x
+ let ptr = advance rb start
+ if ptr == rh
+ then return acc'
+ else go acc' ptr
+
+-- | Fold @Int@ items in the ring starting at @Ptr a@. Won't fold more
+-- than the length of the ring.
+--
+-- Note, this will crash on ring of 0 size.
+--
+{-# INLINE unsafeFoldRingNM #-}
+unsafeFoldRingNM :: forall m a b. (MonadIO m, Storable a)
+ => Int -> Ptr a -> (b -> a -> m b) -> b -> Ring a -> m b
+unsafeFoldRingNM count rh f z rb@Ring {..} =
+ withForeignPtrM ringStart $ \_ -> go count z rh
+
+ where
+
+ go 0 acc _ = return acc
+ go !n !acc !start = do
+ let !x = A.unsafeInlineIO $ peek start
+ acc' <- f acc x
+ let ptr = advance rb start
+ if ptr == rh || n == 0
+ then return acc'
+ else go (n - 1) acc' ptr
diff --git a/testsuite/tests/profiling/should_compile/T19894/Serial.hs b/testsuite/tests/profiling/should_compile/T19894/Serial.hs
new file mode 100644
index 0000000000..d8c9cabcfe
--- /dev/null
+++ b/testsuite/tests/profiling/should_compile/T19894/Serial.hs
@@ -0,0 +1,8 @@
+module Serial (unfoldrM) where
+
+import StreamK (IsStream)
+import qualified StreamD as D
+
+{-# INLINE unfoldrM #-}
+unfoldrM :: (IsStream t, Monad m) => (b -> m (Maybe (a, b))) -> b -> t m a
+unfoldrM step seed = D.fromStreamD (D.unfoldrM step seed)
diff --git a/testsuite/tests/profiling/should_compile/T19894/Step.hs b/testsuite/tests/profiling/should_compile/T19894/Step.hs
new file mode 100644
index 0000000000..72b134c84b
--- /dev/null
+++ b/testsuite/tests/profiling/should_compile/T19894/Step.hs
@@ -0,0 +1,44 @@
+{-# LANGUAGE CPP #-}
+-- |
+-- Module : Streamly.Internal.Data.Stream.StreamD.Step
+-- Copyright : (c) 2018 Composewell Technologies
+-- License : BSD-3-Clause
+-- Maintainer : streamly@composewell.com
+-- Stability : experimental
+-- Portability : GHC
+
+module Step
+ (
+ -- * The stream type
+ Step (..)
+ )
+where
+
+#if defined(FUSION_PLUGIN)
+import Fusion.Plugin.Types (Fuse(..))
+#endif
+
+-- | A stream is a succession of 'Step's. A 'Yield' produces a single value and
+-- the next state of the stream. 'Stop' indicates there are no more values in
+-- the stream.
+#if defined(FUSION_PLUGIN)
+{-# ANN type Step Fuse #-}
+#endif
+data Step s a = Yield a s | Skip s | Stop
+
+instance Functor (Step s) where
+ {-# INLINE fmap #-}
+ fmap f (Yield x s) = Yield (f x) s
+ fmap _ (Skip s) = Skip s
+ fmap _ Stop = Stop
+
+{-
+yield :: Monad m => a -> s -> m (Step s a)
+yield a = return . Yield a
+
+skip :: Monad m => s -> m (Step s a)
+skip = return . Skip
+
+stop :: Monad m => m (Step s a)
+stop = return Stop
+-}
diff --git a/testsuite/tests/profiling/should_compile/T19894/StreamD.hs b/testsuite/tests/profiling/should_compile/T19894/StreamD.hs
new file mode 100644
index 0000000000..265780188a
--- /dev/null
+++ b/testsuite/tests/profiling/should_compile/T19894/StreamD.hs
@@ -0,0 +1,1079 @@
+{-# LANGUAGE CPP #-}
+{-# LANGUAGE PatternSynonyms #-}
+{-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE ViewPatterns #-}
+{-# LANGUAGE ExistentialQuantification #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+
+#include "inline.hs"
+
+-- |
+-- Module : Streamly.Internal.Data.Stream.StreamD.Type
+-- Copyright : (c) 2018 Composewell Technologies
+-- (c) Roman Leshchinskiy 2008-2010
+-- License : BSD-3-Clause
+-- Maintainer : streamly@composewell.com
+-- Stability : experimental
+-- Portability : GHC
+
+-- The stream type is inspired by the vector package. A few functions in this
+-- module have been originally adapted from the vector package (c) Roman
+-- Leshchinskiy. See the notes in specific functions.
+
+module StreamD
+ (
+ -- * The stream type
+ Step (..)
+ -- XXX UnStream is exported to avoid a performance issue in concatMap if we
+ -- use the pattern synonym "Stream".
+ , Stream (Stream, UnStream)
+
+ -- * Primitives
+ , nilM
+ , consM
+ , uncons
+
+ -- * From Unfold
+ , unfold
+ , unfoldrM
+ , drain
+
+ -- * From Values
+ , yield
+ , yieldM
+ , replicate
+
+ -- * From Containers
+ , fromList
+
+ -- * Conversions From/To
+ , fromStreamK
+ , toStreamK
+ , toStreamD
+ , fromStreamD
+
+ -- * Running a 'Fold'
+ , fold
+ , fold_
+
+ -- * Right Folds
+ -- , foldrT
+ , foldrM
+ , foldrMx
+ , foldr
+ -- , foldrS
+
+ -- * Left Folds
+ , foldl'
+ , foldlM'
+ , foldlx'
+ , foldlMx'
+
+ -- * To Containers
+ , toList
+
+ -- * Multi-stream folds
+ , eqBy
+ , cmpBy
+
+ -- * Transformations
+ , map
+ , mapM
+ , take
+ , takeWhile
+ , takeWhileM
+ , postscanOnce
+ , after_
+
+ -- * Nesting
+ -- , ConcatMapUState (..)
+ , unfoldMany
+ {-
+ , concatMap
+ , concatMapM
+ , FoldMany (..) -- for inspection testing
+ , FoldManyPost (..)
+ , foldMany
+ , foldManyPost
+ , groupsOf2
+ , chunksOf
+ -}
+ )
+where
+
+import Control.Applicative (liftA2)
+-- import Control.Monad (when)
+-- import Control.Monad.Trans.Class (lift, MonadTrans)
+import Data.Functor.Identity (Identity(..))
+-- import Fusion.Plugin.Types (Fuse(..))
+import GHC.Base (build)
+import GHC.Types (SPEC(..))
+import Prelude hiding (map, mapM, foldr, take, concatMap, takeWhile, replicate)
+
+import Unfold (Unfold(..))
+import Fold (Fold(..))
+import Step (Step (..))
+import StreamK (State, adaptState, defState)
+-- import Streamly.Internal.Data.SVar (State, adaptState, defState)
+-- import Streamly.Internal.Data.Unfold.Type (Unfold(..))
+
+-- ort qualified Streamly.Internal.Data.Fold.Type as FL
+import qualified StreamK as K
+import qualified Fold as FL
+
+------------------------------------------------------------------------------
+-- The direct style stream type
+------------------------------------------------------------------------------
+
+-- gst = global state
+-- | A stream consists of a step function that generates the next step given a
+-- current state, and the current state.
+data Stream m a =
+ forall s. UnStream (State K.Stream m a -> s -> m (Step s a)) s
+
+unShare :: Stream m a -> Stream m a
+unShare (UnStream step state) = UnStream step' state
+ where step' gst = step (adaptState gst)
+
+pattern Stream :: (State K.Stream m a -> s -> m (Step s a)) -> s -> Stream m a
+pattern Stream step state <- (unShare -> UnStream step state)
+ where Stream = UnStream
+
+#if __GLASGOW_HASKELL__ >= 802
+{-# COMPLETE Stream #-}
+#endif
+
+------------------------------------------------------------------------------
+-- Primitives
+------------------------------------------------------------------------------
+
+-- | An empty 'Stream' with a side effect.
+{-# INLINE_NORMAL nilM #-}
+nilM :: Monad m => m b -> Stream m a
+nilM m = Stream (\_ _ -> m >> return Stop) ()
+
+{-# INLINE_NORMAL consM #-}
+consM :: Monad m => m a -> Stream m a -> Stream m a
+consM m (Stream step state) = Stream step1 Nothing
+ where
+ {-# INLINE_LATE step1 #-}
+ step1 _ Nothing = m >>= \x -> return $ Yield x (Just state)
+ step1 gst (Just st) = do
+ r <- step gst st
+ return $
+ case r of
+ Yield a s -> Yield a (Just s)
+ Skip s -> Skip (Just s)
+ Stop -> Stop
+
+-- | Does not fuse, has the same performance as the StreamK version.
+{-# INLINE_NORMAL uncons #-}
+uncons :: Monad m => Stream m a -> m (Maybe (a, Stream m a))
+uncons (UnStream step state) = go state
+ where
+ go st = do
+ r <- step defState st
+ case r of
+ Yield x s -> return $ Just (x, Stream step s)
+ Skip s -> go s
+ Stop -> return Nothing
+
+------------------------------------------------------------------------------
+-- From 'Unfold'
+------------------------------------------------------------------------------
+
+data UnfoldState s = UnfoldNothing | UnfoldJust s
+
+-- | Convert an 'Unfold' into a 'Stream' by supplying it a seed.
+--
+{-# INLINE_NORMAL unfold #-}
+unfold :: Monad m => Unfold m a b -> a -> Stream m b
+unfold (Unfold ustep inject) seed = Stream step UnfoldNothing
+ where
+ {-# INLINE_LATE step #-}
+ step _ UnfoldNothing = inject seed >>= return . Skip . UnfoldJust
+ step _ (UnfoldJust st) = do
+ r <- ustep st
+ return $ case r of
+ Yield x s -> Yield x (UnfoldJust s)
+ Skip s -> Skip (UnfoldJust s)
+ Stop -> Stop
+
+------------------------------------------------------------------------------
+-- From Values
+------------------------------------------------------------------------------
+
+-- | Create a singleton 'Stream' from a pure value.
+{-# INLINE_NORMAL yield #-}
+yield :: Applicative m => a -> Stream m a
+yield x = Stream (\_ s -> pure $ step undefined s) True
+ where
+ {-# INLINE_LATE step #-}
+ step _ True = Yield x False
+ step _ False = Stop
+
+-- | Create a singleton 'Stream' from a monadic action.
+{-# INLINE_NORMAL yieldM #-}
+yieldM :: Monad m => m a -> Stream m a
+yieldM m = Stream step True
+ where
+ {-# INLINE_LATE step #-}
+ step _ True = m >>= \x -> return $ Yield x False
+ step _ False = return Stop
+
+------------------------------------------------------------------------------
+-- From Containers
+------------------------------------------------------------------------------
+
+-- Adapted from the vector package.
+-- | Convert a list of pure values to a 'Stream'
+{-# INLINE_LATE fromList #-}
+fromList :: Applicative m => [a] -> Stream m a
+fromList = Stream step
+ where
+ {-# INLINE_LATE step #-}
+ step _ (x:xs) = pure $ Yield x xs
+ step _ [] = pure Stop
+
+------------------------------------------------------------------------------
+-- Conversions From/To
+------------------------------------------------------------------------------
+
+-- | Convert a CPS encoded StreamK to direct style step encoded StreamD
+{-# INLINE_LATE fromStreamK #-}
+fromStreamK :: Monad m => K.Stream m a -> Stream m a
+fromStreamK = Stream step
+ where
+ step gst m1 =
+ let stop = return Stop
+ single a = return $ Yield a K.nil
+ yieldk a r = return $ Yield a r
+ in K.foldStreamShared gst yieldk single stop m1
+
+-- | Convert a direct style step encoded StreamD to a CPS encoded StreamK
+{-# INLINE_LATE toStreamK #-}
+toStreamK :: Monad m => Stream m a -> K.Stream m a
+toStreamK (Stream step state) = go state
+ where
+ go st = K.mkStream $ \gst yld _ stp ->
+ let go' ss = do
+ r <- step gst ss
+ case r of
+ Yield x s -> yld x (go s)
+ Skip s -> go' s
+ Stop -> stp
+ in go' st
+
+#if !defined(DISABLE_FUSION)
+{-# RULES "fromStreamK/toStreamK fusion"
+ forall s. toStreamK (fromStreamK s) = s #-}
+{-# RULES "toStreamK/fromStreamK fusion"
+ forall s. fromStreamK (toStreamK s) = s #-}
+#endif
+
+-- XXX Rename to toStream or move to some IsStream common module
+{-# INLINE fromStreamD #-}
+fromStreamD :: (K.IsStream t, Monad m) => Stream m a -> t m a
+fromStreamD = K.fromStream . toStreamK
+
+-- XXX Rename to toStream or move to some IsStream common module
+{-# INLINE toStreamD #-}
+toStreamD :: (K.IsStream t, Monad m) => t m a -> Stream m a
+toStreamD = fromStreamK . K.toStream
+
+------------------------------------------------------------------------------
+-- Running a 'Fold'
+------------------------------------------------------------------------------
+
+{-# INLINE_NORMAL fold #-}
+fold :: (Monad m) => Fold m a b -> Stream m a -> m b
+fold fld strm = do
+ (b, _) <- fold_ fld strm
+ return b
+
+{-# INLINE_NORMAL fold_ #-}
+fold_ :: Monad m => Fold m a b -> Stream m a -> m (b, Stream m a)
+fold_ (Fold fstep begin done) (Stream step state) = do
+ res <- begin
+ case res of
+ FL.Partial fs -> go SPEC fs state
+ FL.Done fb -> return $! (fb, Stream step state)
+
+ where
+
+ {-# INLINE go #-}
+ go !_ !fs st = do
+ r <- step defState st
+ case r of
+ Yield x s -> do
+ res <- fstep fs x
+ case res of
+ FL.Done b -> return $! (b, Stream step s)
+ FL.Partial fs1 -> go SPEC fs1 s
+ Skip s -> go SPEC fs s
+ Stop -> do
+ b <- done fs
+ return $! (b, Stream (\ _ _ -> return Stop) ())
+
+------------------------------------------------------------------------------
+-- Right Folds
+------------------------------------------------------------------------------
+
+-- Adapted from the vector package.
+--
+-- XXX Use of SPEC constructor in folds causes 2x performance degradation in
+-- one shot operations, but helps immensely in operations composed of multiple
+-- combinators or the same combinator many times. There seems to be an
+-- opportunity to optimize here, can we get both, better perf for single ops
+-- as well as composed ops? Without SPEC, all single operation benchmarks
+-- become 2x faster.
+
+-- The way we want a left fold to be strict, dually we want the right fold to
+-- be lazy. The correct signature of the fold function to keep it lazy must be
+-- (a -> m b -> m b) instead of (a -> b -> m b). We were using the latter
+-- earlier, which is incorrect. In the latter signature we have to feed the
+-- value to the fold function after evaluating the monadic action, depending on
+-- the bind behavior of the monad, the action may get evaluated immediately
+-- introducing unnecessary strictness to the fold. If the implementation is
+-- lazy the following example, must work:
+--
+-- S.foldrM (\x t -> if x then return t else return False) (return True)
+-- (S.fromList [False,undefined] :: SerialT IO Bool)
+--
+{-# INLINE_NORMAL foldrM #-}
+foldrM :: Monad m => (a -> m b -> m b) -> m b -> Stream m a -> m b
+foldrM f z (Stream step state) = go SPEC state
+ where
+ {-# INLINE_LATE go #-}
+ go !_ st = do
+ r <- step defState st
+ case r of
+ Yield x s -> f x (go SPEC s)
+ Skip s -> go SPEC s
+ Stop -> z
+
+{-# INLINE_NORMAL foldrMx #-}
+foldrMx :: Monad m
+ => (a -> m x -> m x) -> m x -> (m x -> m b) -> Stream m a -> m b
+foldrMx fstep final convert (Stream step state) = convert $ go SPEC state
+ where
+ {-# INLINE_LATE go #-}
+ go !_ st = do
+ r <- step defState st
+ case r of
+ Yield x s -> fstep x (go SPEC s)
+ Skip s -> go SPEC s
+ Stop -> final
+
+-- Note that foldr works on pure values, therefore it becomes necessarily
+-- strict when the monad m is strict. In that case it cannot terminate early,
+-- it would evaluate all of its input. Though, this should work fine with lazy
+-- monads. For example, if "any" is implemented using "foldr" instead of
+-- "foldrM" it performs the same with Identity monad but performs 1000x slower
+-- with IO monad.
+--
+{-# INLINE_NORMAL foldr #-}
+foldr :: Monad m => (a -> b -> b) -> b -> Stream m a -> m b
+foldr f z = foldrM (\a b -> liftA2 f (return a) b) (return z)
+
+{-
+-- this performs horribly, should not be used
+{-# INLINE_NORMAL foldrS #-}
+foldrS
+ :: Monad m
+ => (a -> Stream m b -> Stream m b)
+ -> Stream m b
+ -> Stream m a
+ -> Stream m b
+foldrS f final (Stream step state) = go SPEC state
+ where
+ {-# INLINE_LATE go #-}
+ go !_ st = do
+ -- defState??
+ r <- yieldM $ step defState st
+ case r of
+ Yield x s -> f x (go SPEC s)
+ Skip s -> go SPEC s
+ Stop -> final
+ -}
+
+{-
+-- Right fold to some transformer (T) monad. This can be useful to implement
+-- stateless combinators like map, filtering, insertions, takeWhile, dropWhile.
+--
+{-# INLINE_NORMAL foldrT #-}
+foldrT :: (Monad m, Monad (t m), MonadTrans t)
+ => (a -> t m b -> t m b) -> t m b -> Stream m a -> t m b
+foldrT f final (Stream step state) = go SPEC state
+ where
+ {-# INLINE_LATE go #-}
+ go !_ st = do
+ r <- lift $ step defState st
+ case r of
+ Yield x s -> f x (go SPEC s)
+ Skip s -> go SPEC s
+ Stop -> final
+-}
+
+------------------------------------------------------------------------------
+-- Left Folds
+------------------------------------------------------------------------------
+
+-- XXX run begin action only if the stream is not empty.
+{-# INLINE_NORMAL foldlMx' #-}
+foldlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> m b
+foldlMx' fstep begin done (Stream step state) =
+ begin >>= \x -> go SPEC x state
+ where
+ -- XXX !acc?
+ {-# INLINE_LATE go #-}
+ go !_ acc st = acc `seq` do
+ r <- step defState st
+ case r of
+ Yield x s -> do
+ acc' <- fstep acc x
+ go SPEC acc' s
+ Skip s -> go SPEC acc s
+ Stop -> done acc
+
+{-# INLINE foldlx' #-}
+foldlx' :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> m b
+foldlx' fstep begin done m =
+ foldlMx' (\b a -> return (fstep b a)) (return begin) (return . done) m
+
+-- Adapted from the vector package.
+-- XXX implement in terms of foldlMx'?
+{-# INLINE_NORMAL foldlM' #-}
+foldlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> m b
+foldlM' fstep mbegin (Stream step state) = do
+ begin <- mbegin
+ go SPEC begin state
+ where
+ {-# INLINE_LATE go #-}
+ go !_ acc st = acc `seq` do
+ r <- step defState st
+ case r of
+ Yield x s -> do
+ acc' <- fstep acc x
+ go SPEC acc' s
+ Skip s -> go SPEC acc s
+ Stop -> return acc
+
+{-# INLINE foldl' #-}
+foldl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> m b
+foldl' fstep begin = foldlM' (\b a -> return (fstep b a)) (return begin)
+
+------------------------------------------------------------------------------
+-- To Containers
+------------------------------------------------------------------------------
+
+{-# INLINE_NORMAL toList #-}
+toList :: Monad m => Stream m a -> m [a]
+toList = foldr (:) []
+
+-- Use foldr/build fusion to fuse with list consumers
+-- This can be useful when using the IsList instance
+{-# INLINE_LATE toListFB #-}
+toListFB :: (a -> b -> b) -> b -> Stream Identity a -> b
+toListFB c n (Stream step state) = go state
+ where
+ go st = case runIdentity (step defState st) of
+ Yield x s -> x `c` go s
+ Skip s -> go s
+ Stop -> n
+
+{-# RULES "toList Identity" toList = toListId #-}
+{-# INLINE_EARLY toListId #-}
+toListId :: Stream Identity a -> Identity [a]
+toListId s = Identity $ build (\c n -> toListFB c n s)
+
+------------------------------------------------------------------------------
+-- Multi-stream folds
+------------------------------------------------------------------------------
+
+-- Adapted from the vector package.
+{-# INLINE_NORMAL eqBy #-}
+eqBy :: Monad m => (a -> b -> Bool) -> Stream m a -> Stream m b -> m Bool
+eqBy eq (Stream step1 t1) (Stream step2 t2) = eq_loop0 SPEC t1 t2
+ where
+ eq_loop0 !_ s1 s2 = do
+ r <- step1 defState s1
+ case r of
+ Yield x s1' -> eq_loop1 SPEC x s1' s2
+ Skip s1' -> eq_loop0 SPEC s1' s2
+ Stop -> eq_null s2
+
+ eq_loop1 !_ x s1 s2 = do
+ r <- step2 defState s2
+ case r of
+ Yield y s2'
+ | eq x y -> eq_loop0 SPEC s1 s2'
+ | otherwise -> return False
+ Skip s2' -> eq_loop1 SPEC x s1 s2'
+ Stop -> return False
+
+ eq_null s2 = do
+ r <- step2 defState s2
+ case r of
+ Yield _ _ -> return False
+ Skip s2' -> eq_null s2'
+ Stop -> return True
+
+-- Adapted from the vector package.
+-- | Compare two streams lexicographically
+{-# INLINE_NORMAL cmpBy #-}
+cmpBy
+ :: Monad m
+ => (a -> b -> Ordering) -> Stream m a -> Stream m b -> m Ordering
+cmpBy cmp (Stream step1 t1) (Stream step2 t2) = cmp_loop0 SPEC t1 t2
+ where
+ cmp_loop0 !_ s1 s2 = do
+ r <- step1 defState s1
+ case r of
+ Yield x s1' -> cmp_loop1 SPEC x s1' s2
+ Skip s1' -> cmp_loop0 SPEC s1' s2
+ Stop -> cmp_null s2
+
+ cmp_loop1 !_ x s1 s2 = do
+ r <- step2 defState s2
+ case r of
+ Yield y s2' -> case x `cmp` y of
+ EQ -> cmp_loop0 SPEC s1 s2'
+ c -> return c
+ Skip s2' -> cmp_loop1 SPEC x s1 s2'
+ Stop -> return GT
+
+ cmp_null s2 = do
+ r <- step2 defState s2
+ case r of
+ Yield _ _ -> return LT
+ Skip s2' -> cmp_null s2'
+ Stop -> return EQ
+
+------------------------------------------------------------------------------
+-- Transformations
+------------------------------------------------------------------------------
+
+-- Adapted from the vector package.
+-- | Map a monadic function over a 'Stream'
+{-# INLINE_NORMAL mapM #-}
+mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b
+mapM f (Stream step state) = Stream step' state
+ where
+ {-# INLINE_LATE step' #-}
+ step' gst st = do
+ r <- step (adaptState gst) st
+ case r of
+ Yield x s -> f x >>= \a -> return $ Yield a s
+ Skip s -> return $ Skip s
+ Stop -> return Stop
+
+{-# INLINE map #-}
+map :: Monad m => (a -> b) -> Stream m a -> Stream m b
+map f = mapM (return . f)
+
+instance Functor m => Functor (Stream m) where
+ {-# INLINE fmap #-}
+ fmap f (Stream step state) = Stream step' state
+ where
+ {-# INLINE_LATE step' #-}
+ step' gst st = fmap (fmap f) (step (adaptState gst) st)
+
+ {-# INLINE (<$) #-}
+ (<$) = fmap . const
+
+-------------------------------------------------------------------------------
+-- Filtering
+-------------------------------------------------------------------------------
+
+-- Adapted from the vector package.
+{-# INLINE_NORMAL take #-}
+take :: Monad m => Int -> Stream m a -> Stream m a
+take n (Stream step state) = n `seq` Stream step' (state, 0)
+ where
+ {-# INLINE_LATE step' #-}
+ step' gst (st, i) | i < n = do
+ r <- step gst st
+ return $ case r of
+ Yield x s -> Yield x (s, i + 1)
+ Skip s -> Skip (s, i)
+ Stop -> Stop
+ step' _ (_, _) = return Stop
+
+-- Adapted from the vector package.
+{-# INLINE_NORMAL takeWhileM #-}
+takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
+takeWhileM f (Stream step state) = Stream step' state
+ where
+ {-# INLINE_LATE step' #-}
+ step' gst st = do
+ r <- step gst st
+ case r of
+ Yield x s -> do
+ b <- f x
+ return $ if b then Yield x s else Stop
+ Skip s -> return $ Skip s
+ Stop -> return Stop
+
+{-# INLINE takeWhile #-}
+takeWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
+takeWhile f = takeWhileM (return . f)
+
+------------------------------------------------------------------------------
+-- Combine N Streams - concatAp
+------------------------------------------------------------------------------
+
+{-# INLINE_NORMAL concatAp #-}
+concatAp :: Functor f => Stream f (a -> b) -> Stream f a -> Stream f b
+concatAp (Stream stepa statea) (Stream stepb stateb) = Stream step' (Left statea)
+ where
+ {-# INLINE_LATE step' #-}
+ step' gst (Left st) = fmap
+ (\r -> case r of
+ Yield f s -> Skip (Right (f, s, stateb))
+ Skip s -> Skip (Left s)
+ Stop -> Stop)
+ (stepa (adaptState gst) st)
+ step' gst (Right (f, os, st)) = fmap
+ (\r -> case r of
+ Yield a s -> Yield (f a) (Right (f, os, s))
+ Skip s -> Skip (Right (f,os, s))
+ Stop -> Skip (Left os))
+ (stepb (adaptState gst) st)
+
+{-# INLINE_NORMAL apSequence #-}
+apSequence :: Functor f => Stream f a -> Stream f b -> Stream f b
+apSequence (Stream stepa statea) (Stream stepb stateb) =
+ Stream step (Left statea)
+
+ where
+
+ {-# INLINE_LATE step #-}
+ step gst (Left st) =
+ fmap
+ (\r ->
+ case r of
+ Yield _ s -> Skip (Right (s, stateb))
+ Skip s -> Skip (Left s)
+ Stop -> Stop)
+ (stepa (adaptState gst) st)
+ step gst (Right (ostate, st)) =
+ fmap
+ (\r ->
+ case r of
+ Yield b s -> Yield b (Right (ostate, s))
+ Skip s -> Skip (Right (ostate, s))
+ Stop -> Skip (Left ostate))
+ (stepb gst st)
+
+{-# INLINE_NORMAL apDiscardSnd #-}
+apDiscardSnd :: Functor f => Stream f a -> Stream f b -> Stream f a
+apDiscardSnd (Stream stepa statea) (Stream stepb stateb) =
+ Stream step (Left statea)
+
+ where
+
+ {-# INLINE_LATE step #-}
+ step gst (Left st) =
+ fmap
+ (\r ->
+ case r of
+ Yield b s -> Skip (Right (s, stateb, b))
+ Skip s -> Skip (Left s)
+ Stop -> Stop)
+ (stepa gst st)
+ step gst (Right (ostate, st, b)) =
+ fmap
+ (\r ->
+ case r of
+ Yield _ s -> Yield b (Right (ostate, s, b))
+ Skip s -> Skip (Right (ostate, s, b))
+ Stop -> Skip (Left ostate))
+ (stepb (adaptState gst) st)
+
+instance Applicative f => Applicative (Stream f) where
+ {-# INLINE pure #-}
+ pure = yield
+
+ {-# INLINE (<*>) #-}
+ (<*>) = concatAp
+
+#if MIN_VERSION_base(4,10,0)
+ {-# INLINE liftA2 #-}
+ liftA2 f x = (<*>) (fmap f x)
+#endif
+
+ {-# INLINE (*>) #-}
+ (*>) = apSequence
+
+ {-# INLINE (<*) #-}
+ (<*) = apDiscardSnd
+
+------------------------------------------------------------------------------
+-- Combine N Streams - unfoldMany
+------------------------------------------------------------------------------
+
+-- Define a unique structure to use in inspection testing
+data ConcatMapUState o i =
+ ConcatMapUOuter o
+ | ConcatMapUInner o i
+
+-- | @unfoldMany unfold stream@ uses @unfold@ to map the input stream elements
+-- to streams and then flattens the generated streams into a single output
+-- stream.
+
+-- This is like 'concatMap' but uses an unfold with an explicit state to
+-- generate the stream instead of a 'Stream' type generator. This allows better
+-- optimization via fusion. This can be many times more efficient than
+-- 'concatMap'.
+
+{-# INLINE_NORMAL unfoldMany #-}
+unfoldMany :: Monad m => Unfold m a b -> Stream m a -> Stream m b
+unfoldMany (Unfold istep inject) (Stream ostep ost) =
+ Stream step (ConcatMapUOuter ost)
+ where
+ {-# INLINE_LATE step #-}
+ step gst (ConcatMapUOuter o) = do
+ r <- ostep (adaptState gst) o
+ case r of
+ Yield a o' -> do
+ i <- inject a
+ i `seq` return (Skip (ConcatMapUInner o' i))
+ Skip o' -> return $ Skip (ConcatMapUOuter o')
+ Stop -> return $ Stop
+
+ step _ (ConcatMapUInner o i) = do
+ r <- istep i
+ return $ case r of
+ Yield x i' -> Yield x (ConcatMapUInner o i')
+ Skip i' -> Skip (ConcatMapUInner o i')
+ Stop -> Skip (ConcatMapUOuter o)
+
+{-
+------------------------------------------------------------------------------
+-- Combine N Streams - concatMap
+------------------------------------------------------------------------------
+
+-- Adapted from the vector package.
+{-# INLINE_NORMAL concatMapM #-}
+concatMapM :: Monad m => (a -> m (Stream m b)) -> Stream m a -> Stream m b
+concatMapM f (Stream step state) = Stream step' (Left state)
+ where
+ {-# INLINE_LATE step' #-}
+ step' gst (Left st) = do
+ r <- step (adaptState gst) st
+ case r of
+ Yield a s -> do
+ b_stream <- f a
+ return $ Skip (Right (b_stream, s))
+ Skip s -> return $ Skip (Left s)
+ Stop -> return Stop
+
+ -- XXX flattenArrays is 5x faster than "concatMap fromArray". if somehow we
+ -- can get inner_step to inline and fuse here we can perhaps get the same
+ -- performance using "concatMap fromArray".
+ --
+ -- XXX using the pattern synonym "Stream" causes a major performance issue
+ -- here even if the synonym does not include an adaptState call. Need to
+ -- find out why. Is that something to be fixed in GHC?
+ step' gst (Right (UnStream inner_step inner_st, st)) = do
+ r <- inner_step (adaptState gst) inner_st
+ case r of
+ Yield b inner_s ->
+ return $ Yield b (Right (Stream inner_step inner_s, st))
+ Skip inner_s ->
+ return $ Skip (Right (Stream inner_step inner_s, st))
+ Stop -> return $ Skip (Left st)
+
+{-# INLINE concatMap #-}
+concatMap :: Monad m => (a -> Stream m b) -> Stream m a -> Stream m b
+concatMap f = concatMapM (return . f)
+
+-- XXX The idea behind this rule is to rewrite any calls to "concatMap
+-- fromArray" automatically to flattenArrays which is much faster. However, we
+-- need an INLINE_EARLY on concatMap for this rule to fire. But if we use
+-- INLINE_EARLY on concatMap or fromArray then direct uses of
+-- "concatMap fromArray" (without the RULE) become much slower, this means
+-- "concatMap f" in general would become slower. Need to find a solution to
+-- this.
+--
+-- {-# RULES "concatMap Array.toStreamD"
+-- concatMap Array.toStreamD = Array.flattenArray #-}
+
+-- NOTE: even though concatMap for StreamD is 4x faster compared to StreamK,
+-- the monad instance does not seem to be significantly faster.
+instance Monad m => Monad (Stream m) where
+ {-# INLINE return #-}
+ return = pure
+
+ {-# INLINE (>>=) #-}
+ (>>=) = flip concatMap
+
+ {-# INLINE (>>) #-}
+ (>>) = (*>)
+
+------------------------------------------------------------------------------
+-- Grouping/Splitting
+------------------------------------------------------------------------------
+
+-- s = stream state, fs = fold state
+-- {-# ANN type FoldManyPost Fuse #-}
+data FoldManyPost s fs b a
+ = FoldManyPostStart s
+ | FoldManyPostLoop s fs
+ | FoldManyPostYield b (FoldManyPost s fs b a)
+ | FoldManyPostDone
+
+-- | Like foldMany but with the following differences:
+--
+-- * If the stream is empty the default value of the fold would still be
+-- emitted in the output.
+-- * At the end of the stream if the last application of the fold did not
+-- receive any input it would still yield the default fold accumulator as the
+-- last value.
+--
+{-# INLINE_NORMAL foldManyPost #-}
+foldManyPost :: Monad m => Fold m a b -> Stream m a -> Stream m b
+foldManyPost (Fold fstep initial extract) (Stream step state) =
+ Stream step' (FoldManyPostStart state)
+
+ where
+
+ {-# INLINE consume #-}
+ consume x s fs = do
+ res <- fstep fs x
+ return
+ $ Skip
+ $ case res of
+ FL.Done b -> FoldManyPostYield b (FoldManyPostStart s)
+ FL.Partial ps -> FoldManyPostLoop s ps
+
+ {-# INLINE_LATE step' #-}
+ step' _ (FoldManyPostStart st) = do
+ r <- initial
+ return
+ $ Skip
+ $ case r of
+ FL.Done b -> FoldManyPostYield b (FoldManyPostStart st)
+ FL.Partial fs -> FoldManyPostLoop st fs
+ step' gst (FoldManyPostLoop st fs) = do
+ r <- step (adaptState gst) st
+ case r of
+ Yield x s -> consume x s fs
+ Skip s -> return $ Skip (FoldManyPostLoop s fs)
+ Stop -> do
+ b <- extract fs
+ return $ Skip (FoldManyPostYield b FoldManyPostDone)
+ step' _ (FoldManyPostYield b next) = return $ Yield b next
+ step' _ FoldManyPostDone = return Stop
+
+-- {-# ANN type FoldMany Fuse #-}
+data FoldMany s fs b a
+ = FoldManyStart s
+ | FoldManyFirst fs s
+ | FoldManyLoop s fs
+ | FoldManyYield b (FoldMany s fs b a)
+ | FoldManyDone
+
+-- | Apply a fold multiple times until the stream ends. If the stream is empty
+-- the output would be empty.
+--
+-- @foldMany f = parseMany (fromFold f)@
+--
+-- A terminating fold may terminate even without accepting a single input. So
+-- we run the fold's initial action before evaluating the stream. However, this
+-- means that if later the stream does not yield anything we have to discard
+-- the fold's initial result which could have generated an effect.
+--
+{-# INLINE_NORMAL foldMany #-}
+foldMany :: Monad m => Fold m a b -> Stream m a -> Stream m b
+foldMany (Fold fstep initial extract) (Stream step state) =
+ Stream step' (FoldManyStart state)
+
+ where
+
+ {-# INLINE consume #-}
+ consume x s fs = do
+ res <- fstep fs x
+ return
+ $ Skip
+ $ case res of
+ FL.Done b -> FoldManyYield b (FoldManyStart s)
+ FL.Partial ps -> FoldManyLoop s ps
+
+ {-# INLINE_LATE step' #-}
+ step' _ (FoldManyStart st) = do
+ r <- initial
+ return
+ $ Skip
+ $ case r of
+ FL.Done b -> FoldManyYield b (FoldManyStart st)
+ FL.Partial fs -> FoldManyFirst fs st
+ step' gst (FoldManyFirst fs st) = do
+ r <- step (adaptState gst) st
+ case r of
+ Yield x s -> consume x s fs
+ Skip s -> return $ Skip (FoldManyFirst fs s)
+ Stop -> return Stop
+ step' gst (FoldManyLoop st fs) = do
+ r <- step (adaptState gst) st
+ case r of
+ Yield x s -> consume x s fs
+ Skip s -> return $ Skip (FoldManyLoop s fs)
+ Stop -> do
+ b <- extract fs
+ return $ Skip (FoldManyYield b FoldManyDone)
+ step' _ (FoldManyYield b next) = return $ Yield b next
+ step' _ FoldManyDone = return Stop
+
+{-# INLINE chunksOf #-}
+chunksOf :: Monad m => Int -> Fold m a b -> Stream m a -> Stream m b
+chunksOf n f = foldMany (FL.take n f)
+
+data GroupState2 s fs
+ = GroupStart2 s
+ | GroupBuffer2 s fs Int
+ | GroupYield2 fs (GroupState2 s fs)
+ | GroupFinish2
+
+{-# INLINE_NORMAL groupsOf2 #-}
+groupsOf2
+ :: Monad m
+ => Int
+ -> m c
+ -> Fold2 m c a b
+ -> Stream m a
+ -> Stream m b
+groupsOf2 n input (Fold2 fstep inject extract) (Stream step state) =
+ n `seq` Stream step' (GroupStart2 state)
+
+ where
+
+ {-# INLINE_LATE step' #-}
+ step' _ (GroupStart2 st) = do
+ -- XXX shall we use the Natural type instead? Need to check performance
+ -- implications.
+ when (n <= 0) $
+ -- XXX we can pass the module string from the higher level API
+ error $ "Streamly.Internal.Data.Stream.StreamD.Type.groupsOf: the size of "
+ ++ "groups [" ++ show n ++ "] must be a natural number"
+ -- fs = fold state
+ fs <- input >>= inject
+ return $ Skip (GroupBuffer2 st fs 0)
+
+ step' gst (GroupBuffer2 st fs i) = do
+ r <- step (adaptState gst) st
+ case r of
+ Yield x s -> do
+ !fs' <- fstep fs x
+ let i' = i + 1
+ return $
+ if i' >= n
+ then Skip (GroupYield2 fs' (GroupStart2 s))
+ else Skip (GroupBuffer2 s fs' i')
+ Skip s -> return $ Skip (GroupBuffer2 s fs i)
+ Stop -> return $ Skip (GroupYield2 fs GroupFinish2)
+
+ step' _ (GroupYield2 fs next) = do
+ r <- extract fs
+ return $ Yield r next
+
+ step' _ GroupFinish2 = return Stop
+
+------------------------------------------------------------------------------
+-- Other instances
+------------------------------------------------------------------------------
+
+instance MonadTrans Stream where
+ {-# INLINE lift #-}
+ lift = yieldM
+
+instance (MonadThrow m) => MonadThrow (Stream m) where
+ throwM = lift . throwM
+ -}
+
+{-# INLINE_NORMAL unfoldrM #-}
+unfoldrM :: Monad m => (s -> m (Maybe (a, s))) -> s -> Stream m a
+unfoldrM next state = Stream step state
+ where
+ {-# INLINE_LATE step #-}
+ step _ st = do
+ r <- next st
+ return $ case r of
+ Just (x, s) -> Yield x s
+ Nothing -> Stop
+
+{-# INLINE_LATE drain #-}
+drain :: Monad m => Stream m a -> m ()
+-- drain = foldrM (\_ xs -> xs) (return ())
+drain (Stream step state) = go SPEC state
+ where
+ go !_ st = do
+ r <- step defState st
+ case r of
+ Yield _ s -> go SPEC s
+ Skip s -> go SPEC s
+ Stop -> return ()
+
+------------------------------------------------------------------------------
+-- Scanning with a Fold
+------------------------------------------------------------------------------
+
+data ScanState s f = ScanInit s | ScanDo s !f | ScanDone
+
+{-# INLINE [1] postscanOnce #-}
+postscanOnce :: Monad m => FL.Fold m a b -> Stream m a -> Stream m b
+postscanOnce (FL.Fold fstep initial extract) (Stream sstep state) =
+ Stream step (ScanInit state)
+
+ where
+
+ {-# INLINE_LATE step #-}
+ step _ (ScanInit st) = do
+ res <- initial
+ return
+ $ case res of
+ FL.Partial fs -> Skip $ ScanDo st fs
+ FL.Done b -> Yield b ScanDone
+ step gst (ScanDo st fs) = do
+ res <- sstep (adaptState gst) st
+ case res of
+ Yield x s -> do
+ r <- fstep fs x
+ case r of
+ FL.Partial fs1 -> do
+ !b <- extract fs1
+ return $ Yield b $ ScanDo s fs1
+ FL.Done b -> return $ Yield b ScanDone
+ Skip s -> return $ Skip $ ScanDo s fs
+ Stop -> return Stop
+ step _ ScanDone = return Stop
+
+{-# INLINE [1] replicateM #-}
+replicateM :: forall m a. Monad m => Int -> m a -> Stream m a
+replicateM n p = Stream step n
+ where
+ {-# INLINE_LATE step #-}
+ step _ (i :: Int)
+ | i <= 0 = return Stop
+ | otherwise = do
+ x <- p
+ return $ Yield x (i - 1)
+
+{-# INLINE [1] replicate #-}
+replicate :: Monad m => Int -> a -> Stream m a
+replicate n x = replicateM n (return x)
+
+{-# INLINE [1] after_ #-}
+after_ :: Monad m => m b -> Stream m a -> Stream m a
+after_ action (Stream step state) = Stream step' state
+
+ where
+
+ {-# INLINE_LATE step' #-}
+ step' gst st = do
+ res <- step gst st
+ case res of
+ Yield x s -> return $ Yield x s
+ Skip s -> return $ Skip s
+ Stop -> action >> return Stop
diff --git a/testsuite/tests/profiling/should_compile/T19894/StreamK.hs b/testsuite/tests/profiling/should_compile/T19894/StreamK.hs
new file mode 100644
index 0000000000..d24559f4dc
--- /dev/null
+++ b/testsuite/tests/profiling/should_compile/T19894/StreamK.hs
@@ -0,0 +1,1245 @@
+{-# LANGUAGE UndecidableInstances #-}
+{-# LANGUAGE KindSignatures #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE QuantifiedConstraints #-}
+{-# LANGUAGE ConstraintKinds #-}
+{-# LANGUAGE InstanceSigs #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE CPP #-}
+
+-- |
+-- Module : Streamly.Internal.Data.Stream.StreamK.Type
+-- Copyright : (c) 2017 Composewell Technologies
+--
+-- License : BSD3
+-- Maintainer : streamly@composewell.com
+-- Stability : experimental
+-- Portability : GHC
+--
+--
+-- Continuation passing style (CPS) stream implementation. The symbol 'K' below
+-- denotes a function as well as a Kontinuation.
+--
+module StreamK
+ (
+ -- * A class for streams
+ IsStream (..)
+ , adapt
+ , State
+ , adaptState
+ , defState
+ , MonadAsync
+ , unfoldrM
+ , drain
+
+ -- * The stream type
+ , Stream (..)
+
+ -- * Construction
+ , mkStream
+ , fromStopK
+ , fromYieldK
+ , consK
+
+ -- * Elimination
+ , foldStream
+ , foldStreamShared
+ , foldl'
+ , foldlx'
+
+ -- * foldr/build
+ , foldrM
+ , foldrS
+ , foldrSShared
+ , foldrSM
+ , build
+ , buildS
+ , buildM
+ , buildSM
+ , sharedM
+ , augmentS
+ , augmentSM
+
+ -- instances
+ , cons
+ , (.:)
+ , consMStream
+ , consMBy
+ , yieldM
+ , yield
+
+ , nil
+ , nilM
+ , conjoin
+ , serial
+ , map
+ , mapM
+ , mapMSerial
+ , unShare
+ , concatMapBy
+ , concatMap
+ , bindWith
+ , concatPairsWith
+ , apWith
+ , apSerial
+ , apSerialDiscardFst
+ , apSerialDiscardSnd
+
+ , Streaming -- deprecated
+ )
+where
+
+#include "inline.hs"
+
+import Control.Monad (ap, (>=>))
+-- import Control.Monad.Trans.Class (MonadTrans(lift))
+import Data.Kind (Type)
+-- import Control.Monad.Catch (MonadThrow)
+import Control.Monad.IO.Class (MonadIO(..))
+
+#if __GLASGOW_HASKELL__ < 808
+import Data.Semigroup (Semigroup(..))
+#endif
+import Prelude hiding (map, mapM, concatMap, foldr)
+
+-- import Streamly.Internal.Data.SVar
+
+------------------------------------------------------------------------------
+-- Basic stream type
+------------------------------------------------------------------------------
+
+data State (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a = State
+ {
+ _inspectMode :: Bool
+ }
+
+adaptState :: State t m a -> State t n b
+adaptState st = st
+ {
+ _inspectMode = False
+ }
+
+defState :: State t m a
+defState = State
+ {
+ _inspectMode = False
+ }
+
+-- | The type @Stream m a@ represents a monadic stream of values of type 'a'
+-- constructed using actions in monad 'm'. It uses stop, singleton and yield
+-- continuations equivalent to the following direct style type:
+--
+-- @
+-- data Stream m a = Stop | Singleton a | Yield a (Stream m a)
+-- @
+--
+-- To facilitate parallel composition we maintain a local state in an 'SVar'
+-- that is shared across and is used for synchronization of the streams being
+-- composed.
+--
+-- The singleton case can be expressed in terms of stop and yield but we have
+-- it as a separate case to optimize composition operations for streams with
+-- single element. We build singleton streams in the implementation of 'pure'
+-- for Applicative and Monad, and in 'lift' for MonadTrans.
+
+-- XXX remove the Stream type parameter from State as it is always constant.
+-- We can remove it from SVar as well
+
+newtype Stream (m :: Type -> Type) a =
+ MkStream (forall r.
+ State Stream m a -- state
+ -> (a -> Stream m a -> m r) -- yield
+ -> (a -> m r) -- singleton
+ -> m r -- stop
+ -> m r
+ )
+
+------------------------------------------------------------------------------
+-- Types that can behave as a Stream
+------------------------------------------------------------------------------
+
+infixr 5 `consM`
+infixr 5 |:
+
+type MonadAsync m = (MonadIO m)
+
+-- XXX Use a different SVar based on the stream type. But we need to make sure
+-- that we do not lose performance due to polymorphism.
+--
+-- | Class of types that can represent a stream of elements of some type 'a' in
+-- some monad 'm'.
+--
+-- /Since: 0.2.0 ("Streamly")/
+--
+-- @since 0.8.0
+class
+#if __GLASGOW_HASKELL__ >= 806
+ ( forall m a. MonadAsync m => Semigroup (t m a)
+ , forall m a. MonadAsync m => Monoid (t m a)
+ , forall m. Monad m => Functor (t m)
+ , forall m. MonadAsync m => Applicative (t m)
+ ) =>
+#endif
+ IsStream t where
+ toStream :: t m a -> Stream m a
+ fromStream :: Stream m a -> t m a
+ -- | Constructs a stream by adding a monadic action at the head of an
+ -- existing stream. For example:
+ --
+ -- @
+ -- > toList $ getLine \`consM` getLine \`consM` nil
+ -- hello
+ -- world
+ -- ["hello","world"]
+ -- @
+ --
+ -- /Concurrent (do not use 'fromParallel' to construct infinite streams)/
+ --
+ -- @since 0.2.0
+ consM :: MonadAsync m => m a -> t m a -> t m a
+ -- | Operator equivalent of 'consM'. We can read it as "@parallel colon@"
+ -- to remember that @|@ comes before ':'.
+ --
+ -- @
+ -- > toList $ getLine |: getLine |: nil
+ -- hello
+ -- world
+ -- ["hello","world"]
+ -- @
+ --
+ -- @
+ -- let delay = threadDelay 1000000 >> print 1
+ -- drain $ fromSerial $ delay |: delay |: delay |: nil
+ -- drain $ fromParallel $ delay |: delay |: delay |: nil
+ -- @
+ --
+ -- /Concurrent (do not use 'fromParallel' to construct infinite streams)/
+ --
+ -- @since 0.2.0
+ (|:) :: MonadAsync m => m a -> t m a -> t m a
+ -- We can define (|:) just as 'consM' but it is defined explicitly for each
+ -- type because we want to use SPECIALIZE pragma on the definition.
+
+-- | Same as 'IsStream'.
+--
+-- @since 0.1.0
+{-# DEPRECATED Streaming "Please use IsStream instead." #-}
+type Streaming = IsStream
+
+-------------------------------------------------------------------------------
+-- Type adapting combinators
+-------------------------------------------------------------------------------
+
+-- XXX Move/reset the State here by reconstructing the stream with cleared
+-- state. Can we make sure we do not do that when t1 = t2? If we do this then
+-- we do not need to do that explicitly using svarStyle. It would act as
+-- unShare when the stream type is the same.
+--
+-- | Adapt any specific stream type to any other specific stream type.
+--
+-- /Since: 0.1.0 ("Streamly")/
+--
+-- @since 0.8.0
+adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a
+adapt = fromStream . toStream
+
+------------------------------------------------------------------------------
+-- Building a stream
+------------------------------------------------------------------------------
+
+-- XXX The State is always parameterized by "Stream" which means State is not
+-- different for different stream types. So we have to manually make sure that
+-- when converting from one stream to another we migrate the state correctly.
+-- This can be fixed if we use a different SVar type for different streams.
+-- Currently we always use "SVar Stream" and therefore a different State type
+-- parameterized by that stream.
+--
+-- XXX Since t is coercible we should be able to coerce k
+-- mkStream k = fromStream $ MkStream $ coerce k
+--
+-- | Build a stream from an 'SVar', a stop continuation, a singleton stream
+-- continuation and a yield continuation.
+{-# INLINE_EARLY mkStream #-}
+mkStream :: IsStream t
+ => (forall r. State Stream m a
+ -> (a -> t m a -> m r)
+ -> (a -> m r)
+ -> m r
+ -> m r)
+ -> t m a
+mkStream k = fromStream $ MkStream $ \st yld sng stp ->
+ let yieldk a r = yld a (toStream r)
+ in k st yieldk sng stp
+
+{-# RULES "mkStream from stream" mkStream = mkStreamFromStream #-}
+mkStreamFromStream :: IsStream t
+ => (forall r. State Stream m a
+ -> (a -> Stream m a -> m r)
+ -> (a -> m r)
+ -> m r
+ -> m r)
+ -> t m a
+mkStreamFromStream k = fromStream $ MkStream k
+
+{-# RULES "mkStream stream" mkStream = mkStreamStream #-}
+mkStreamStream
+ :: (forall r. State Stream m a
+ -> (a -> Stream m a -> m r)
+ -> (a -> m r)
+ -> m r
+ -> m r)
+ -> Stream m a
+mkStreamStream = MkStream
+
+-- | A terminal function that has no continuation to follow.
+type StopK m = forall r. m r -> m r
+
+-- | A monadic continuation, it is a function that yields a value of type "a"
+-- and calls the argument (a -> m r) as a continuation with that value. We can
+-- also think of it as a callback with a handler (a -> m r). Category
+-- theorists call it a codensity type, a special type of right kan extension.
+type YieldK m a = forall r. (a -> m r) -> m r
+
+_wrapM :: Monad m => m a -> YieldK m a
+_wrapM m = \k -> m >>= k
+
+-- | Make an empty stream from a stop function.
+fromStopK :: IsStream t => StopK m -> t m a
+fromStopK k = mkStream $ \_ _ _ stp -> k stp
+
+-- | Make a singleton stream from a callback function. The callback function
+-- calls the one-shot yield continuation to yield an element.
+fromYieldK :: IsStream t => YieldK m a -> t m a
+fromYieldK k = mkStream $ \_ _ sng _ -> k sng
+
+-- | Add a yield function at the head of the stream.
+consK :: IsStream t => YieldK m a -> t m a -> t m a
+consK k r = mkStream $ \_ yld _ _ -> k (\x -> yld x r)
+
+-- XXX Build a stream from a repeating callback function.
+
+------------------------------------------------------------------------------
+-- Construction
+------------------------------------------------------------------------------
+
+infixr 5 `cons`
+
+-- faster than consM because there is no bind.
+-- | Construct a stream by adding a pure value at the head of an existing
+-- stream. For serial streams this is the same as @(return a) \`consM` r@ but
+-- more efficient. For concurrent streams this is not concurrent whereas
+-- 'consM' is concurrent. For example:
+--
+-- @
+-- > toList $ 1 \`cons` 2 \`cons` 3 \`cons` nil
+-- [1,2,3]
+-- @
+--
+-- @since 0.1.0
+{-# INLINE_NORMAL cons #-}
+cons :: IsStream t => a -> t m a -> t m a
+cons a r = mkStream $ \_ yld _ _ -> yld a r
+
+infixr 5 .:
+
+-- | Operator equivalent of 'cons'.
+--
+-- @
+-- > toList $ 1 .: 2 .: 3 .: nil
+-- [1,2,3]
+-- @
+--
+-- @since 0.1.1
+{-# INLINE (.:) #-}
+(.:) :: IsStream t => a -> t m a -> t m a
+(.:) = cons
+
+-- | An empty stream.
+--
+-- @
+-- > toList nil
+-- []
+-- @
+--
+-- @since 0.1.0
+{-# INLINE_NORMAL nil #-}
+nil :: IsStream t => t m a
+nil = mkStream $ \_ _ _ stp -> stp
+
+-- | An empty stream producing a side effect.
+--
+-- @
+-- > toList (nilM (print "nil"))
+-- "nil"
+-- []
+-- @
+--
+-- /Pre-release/
+{-# INLINE_NORMAL nilM #-}
+nilM :: (IsStream t, Monad m) => m b -> t m a
+nilM m = mkStream $ \_ _ _ stp -> m >> stp
+
+{-# INLINE_NORMAL yield #-}
+yield :: IsStream t => a -> t m a
+yield a = mkStream $ \_ _ single _ -> single a
+
+{-# INLINE_NORMAL yieldM #-}
+yieldM :: (Monad m, IsStream t) => m a -> t m a
+yieldM m = fromStream $ mkStream $ \_ _ single _ -> m >>= single
+
+-- XXX specialize to IO?
+{-# INLINE consMBy #-}
+consMBy :: (IsStream t, MonadAsync m) => (t m a -> t m a -> t m a)
+ -> m a -> t m a -> t m a
+consMBy f m r = (fromStream $ yieldM m) `f` r
+
+------------------------------------------------------------------------------
+-- Folding a stream
+------------------------------------------------------------------------------
+
+-- | Fold a stream by providing an SVar, a stop continuation, a singleton
+-- continuation and a yield continuation. The stream would share the current
+-- SVar passed via the State.
+{-# INLINE_EARLY foldStreamShared #-}
+foldStreamShared
+ :: IsStream t
+ => State Stream m a
+ -> (a -> t m a -> m r)
+ -> (a -> m r)
+ -> m r
+ -> t m a
+ -> m r
+foldStreamShared st yld sng stp m =
+ let yieldk a x = yld a (fromStream x)
+ MkStream k = toStream m
+ in k st yieldk sng stp
+
+-- XXX write a similar rule for foldStream as well?
+{-# RULES "foldStreamShared from stream"
+ foldStreamShared = foldStreamSharedStream #-}
+foldStreamSharedStream
+ :: State Stream m a
+ -> (a -> Stream m a -> m r)
+ -> (a -> m r)
+ -> m r
+ -> Stream m a
+ -> m r
+foldStreamSharedStream st yld sng stp m =
+ let MkStream k = toStream m
+ in k st yld sng stp
+
+-- | Fold a stream by providing a State, stop continuation, a singleton
+-- continuation and a yield continuation. The stream will not use the SVar
+-- passed via State.
+{-# INLINE foldStream #-}
+foldStream
+ :: IsStream t
+ => State Stream m a
+ -> (a -> t m a -> m r)
+ -> (a -> m r)
+ -> m r
+ -> t m a
+ -> m r
+foldStream st yld sng stp m =
+ let yieldk a x = yld a (fromStream x)
+ MkStream k = toStream m
+ in k (adaptState st) yieldk sng stp
+
+-------------------------------------------------------------------------------
+-- Instances
+-------------------------------------------------------------------------------
+
+-- NOTE: specializing the function outside the instance definition seems to
+-- improve performance quite a bit at times, even if we have the same
+-- SPECIALIZE in the instance definition.
+{-# INLINE consMStream #-}
+{-# SPECIALIZE consMStream :: IO a -> Stream IO a -> Stream IO a #-}
+consMStream :: (Monad m) => m a -> Stream m a -> Stream m a
+consMStream m r = MkStream $ \_ yld _ _ -> m >>= \a -> yld a r
+
+-------------------------------------------------------------------------------
+-- IsStream Stream
+-------------------------------------------------------------------------------
+
+instance IsStream Stream where
+ toStream = id
+ fromStream = id
+
+ {-# INLINE consM #-}
+ {-# SPECIALIZE consM :: IO a -> Stream IO a -> Stream IO a #-}
+ consM :: Monad m => m a -> Stream m a -> Stream m a
+ consM = consMStream
+
+ {-# INLINE (|:) #-}
+ {-# SPECIALIZE (|:) :: IO a -> Stream IO a -> Stream IO a #-}
+ (|:) :: Monad m => m a -> Stream m a -> Stream m a
+ (|:) = consMStream
+
+-------------------------------------------------------------------------------
+-- foldr/build fusion
+-------------------------------------------------------------------------------
+
+-- XXX perhaps we can just use foldrSM/buildM everywhere as they are more
+-- general and cover foldrS/buildS as well.
+
+-- | The function 'f' decides how to reconstruct the stream. We could
+-- reconstruct using a shared state (SVar) or without sharing the state.
+--
+{-# INLINE foldrSWith #-}
+foldrSWith :: IsStream t
+ => (forall r. State Stream m b
+ -> (b -> t m b -> m r)
+ -> (b -> m r)
+ -> m r
+ -> t m b
+ -> m r)
+ -> (a -> t m b -> t m b) -> t m b -> t m a -> t m b
+foldrSWith f step final m = go m
+ where
+ go m1 = mkStream $ \st yld sng stp ->
+ let run x = f st yld sng stp x
+ stop = run final
+ single a = run $ step a final
+ yieldk a r = run $ step a (go r)
+ -- XXX if type a and b are the same we do not need adaptState, can we
+ -- save some perf with that?
+ -- XXX since we are using adaptState anyway here we can use
+ -- foldStreamShared instead, will that save some perf?
+ in foldStream (adaptState st) yieldk single stop m1
+
+-- XXX we can use rewrite rules just for foldrSWith, if the function f is the
+-- same we can rewrite it.
+
+-- | Fold sharing the SVar state within the reconstructed stream
+{-# INLINE_NORMAL foldrSShared #-}
+foldrSShared :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b
+foldrSShared = foldrSWith foldStreamShared
+
+-- XXX consM is a typeclass method, therefore rewritten already. Instead maybe
+-- we can make consM polymorphic using rewrite rules.
+-- {-# RULES "foldrSShared/id" foldrSShared consM nil = \x -> x #-}
+{-# RULES "foldrSShared/nil"
+ forall k z. foldrSShared k z nil = z #-}
+{-# RULES "foldrSShared/single"
+ forall k z x. foldrSShared k z (yield x) = k x z #-}
+-- {-# RULES "foldrSShared/app" [1]
+-- forall ys. foldrSShared consM ys = \xs -> xs `conjoin` ys #-}
+
+-- | Lazy right associative fold to a stream.
+{-# INLINE_NORMAL foldrS #-}
+foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b
+foldrS = foldrSWith foldStream
+
+{-# RULES "foldrS/id" foldrS cons nil = \x -> x #-}
+{-# RULES "foldrS/nil" forall k z. foldrS k z nil = z #-}
+-- See notes in GHC.Base about this rule
+-- {-# RULES "foldr/cons"
+-- forall k z x xs. foldrS k z (x `cons` xs) = k x (foldrS k z xs) #-}
+{-# RULES "foldrS/single" forall k z x. foldrS k z (yield x) = k x z #-}
+-- {-# RULES "foldrS/app" [1]
+-- forall ys. foldrS cons ys = \xs -> xs `conjoin` ys #-}
+
+-------------------------------------------------------------------------------
+-- foldrS with monadic cons i.e. consM
+-------------------------------------------------------------------------------
+
+{-# INLINE foldrSMWith #-}
+foldrSMWith :: (IsStream t, Monad m)
+ => (forall r. State Stream m b
+ -> (b -> t m b -> m r)
+ -> (b -> m r)
+ -> m r
+ -> t m b
+ -> m r)
+ -> (m a -> t m b -> t m b) -> t m b -> t m a -> t m b
+foldrSMWith f step final m = go m
+ where
+ go m1 = mkStream $ \st yld sng stp ->
+ let run x = f st yld sng stp x
+ stop = run final
+ single a = run $ step (return a) final
+ yieldk a r = run $ step (return a) (go r)
+ in foldStream (adaptState st) yieldk single stop m1
+
+{-# INLINE_NORMAL foldrSM #-}
+foldrSM :: (IsStream t, Monad m)
+ => (m a -> t m b -> t m b) -> t m b -> t m a -> t m b
+foldrSM = foldrSMWith foldStream
+
+-- {-# RULES "foldrSM/id" foldrSM consM nil = \x -> x #-}
+{-# RULES "foldrSM/nil" forall k z. foldrSM k z nil = z #-}
+{-# RULES "foldrSM/single" forall k z x. foldrSM k z (yieldM x) = k x z #-}
+-- {-# RULES "foldrSM/app" [1]
+-- forall ys. foldrSM consM ys = \xs -> xs `conjoin` ys #-}
+
+-- Like foldrSM but sharing the SVar state within the recostructed stream.
+{-# INLINE_NORMAL foldrSMShared #-}
+foldrSMShared :: (IsStream t, Monad m)
+ => (m a -> t m b -> t m b) -> t m b -> t m a -> t m b
+foldrSMShared = foldrSMWith foldStreamShared
+
+-- {-# RULES "foldrSM/id" foldrSM consM nil = \x -> x #-}
+{-# RULES "foldrSMShared/nil"
+ forall k z. foldrSMShared k z nil = z #-}
+{-# RULES "foldrSMShared/single"
+ forall k z x. foldrSMShared k z (yieldM x) = k x z #-}
+-- {-# RULES "foldrSM/app" [1]
+-- forall ys. foldrSM consM ys = \xs -> xs `conjoin` ys #-}
+
+-------------------------------------------------------------------------------
+-- build
+-------------------------------------------------------------------------------
+
+{-# INLINE_NORMAL build #-}
+build :: IsStream t => forall a. (forall b. (a -> b -> b) -> b -> b) -> t m a
+build g = g cons nil
+
+{-# RULES "foldrM/build"
+ forall k z (g :: forall b. (a -> b -> b) -> b -> b).
+ foldrM k z (build g) = g k z #-}
+
+{-# RULES "foldrS/build"
+ forall k z (g :: forall b. (a -> b -> b) -> b -> b).
+ foldrS k z (build g) = g k z #-}
+
+{-# RULES "foldrS/cons/build"
+ forall k z x (g :: forall b. (a -> b -> b) -> b -> b).
+ foldrS k z (x `cons` build g) = k x (g k z) #-}
+
+{-# RULES "foldrSShared/build"
+ forall k z (g :: forall b. (a -> b -> b) -> b -> b).
+ foldrSShared k z (build g) = g k z #-}
+
+{-# RULES "foldrSShared/cons/build"
+ forall k z x (g :: forall b. (a -> b -> b) -> b -> b).
+ foldrSShared k z (x `cons` build g) = k x (g k z) #-}
+
+-- build a stream by applying cons and nil to a build function
+{-# INLINE_NORMAL buildS #-}
+buildS :: IsStream t => ((a -> t m a -> t m a) -> t m a -> t m a) -> t m a
+buildS g = g cons nil
+
+{-# RULES "foldrS/buildS"
+ forall k z (g :: (a -> t m a -> t m a) -> t m a -> t m a).
+ foldrS k z (buildS g) = g k z #-}
+
+{-# RULES "foldrS/cons/buildS"
+ forall k z x (g :: (a -> t m a -> t m a) -> t m a -> t m a).
+ foldrS k z (x `cons` buildS g) = k x (g k z) #-}
+
+{-# RULES "foldrSShared/buildS"
+ forall k z (g :: (a -> t m a -> t m a) -> t m a -> t m a).
+ foldrSShared k z (buildS g) = g k z #-}
+
+{-# RULES "foldrSShared/cons/buildS"
+ forall k z x (g :: (a -> t m a -> t m a) -> t m a -> t m a).
+ foldrSShared k z (x `cons` buildS g) = k x (g k z) #-}
+
+-- build a stream by applying consM and nil to a build function
+{-# INLINE_NORMAL buildSM #-}
+buildSM :: (IsStream t, MonadAsync m)
+ => ((m a -> t m a -> t m a) -> t m a -> t m a) -> t m a
+buildSM g = g consM nil
+
+{-# RULES "foldrSM/buildSM"
+ forall k z (g :: (m a -> t m a -> t m a) -> t m a -> t m a).
+ foldrSM k z (buildSM g) = g k z #-}
+
+{-# RULES "foldrSMShared/buildSM"
+ forall k z (g :: (m a -> t m a -> t m a) -> t m a -> t m a).
+ foldrSMShared k z (buildSM g) = g k z #-}
+
+-- Disabled because this may not fire as consM is a class Op
+{-
+{-# RULES "foldrS/consM/buildSM"
+ forall k z x (g :: (m a -> t m a -> t m a) -> t m a -> t m a)
+ . foldrSM k z (x `consM` buildSM g)
+ = k x (g k z)
+#-}
+-}
+
+-- Build using monadic build functions (continuations) instead of
+-- reconstructing a stream.
+{-# INLINE_NORMAL buildM #-}
+buildM :: (IsStream t, MonadAsync m)
+ => (forall r. (a -> t m a -> m r)
+ -> (a -> m r)
+ -> m r
+ -> m r
+ )
+ -> t m a
+buildM g = mkStream $ \st yld sng stp ->
+ g (\a r -> foldStream st yld sng stp (return a `consM` r)) sng stp
+
+-- | Like 'buildM' but shares the SVar state across computations.
+{-# INLINE_NORMAL sharedM #-}
+sharedM :: (IsStream t, MonadAsync m)
+ => (forall r. (a -> t m a -> m r)
+ -> (a -> m r)
+ -> m r
+ -> m r
+ )
+ -> t m a
+sharedM g = mkStream $ \st yld sng stp ->
+ g (\a r -> foldStreamShared st yld sng stp (return a `consM` r)) sng stp
+
+-------------------------------------------------------------------------------
+-- augment
+-------------------------------------------------------------------------------
+
+{-# INLINE_NORMAL augmentS #-}
+augmentS :: IsStream t
+ => ((a -> t m a -> t m a) -> t m a -> t m a) -> t m a -> t m a
+augmentS g xs = g cons xs
+
+{-# RULES "augmentS/nil"
+ forall (g :: (a -> t m a -> t m a) -> t m a -> t m a).
+ augmentS g nil = buildS g
+ #-}
+
+{-# RULES "foldrS/augmentS"
+ forall k z xs (g :: (a -> t m a -> t m a) -> t m a -> t m a).
+ foldrS k z (augmentS g xs) = g k (foldrS k z xs)
+ #-}
+
+{-# RULES "augmentS/buildS"
+ forall (g :: (a -> t m a -> t m a) -> t m a -> t m a)
+ (h :: (a -> t m a -> t m a) -> t m a -> t m a).
+ augmentS g (buildS h) = buildS (\c n -> g c (h c n))
+ #-}
+
+{-# INLINE_NORMAL augmentSM #-}
+augmentSM :: (IsStream t, MonadAsync m)
+ => ((m a -> t m a -> t m a) -> t m a -> t m a) -> t m a -> t m a
+augmentSM g xs = g consM xs
+
+{-# RULES "augmentSM/nil"
+ forall (g :: (m a -> t m a -> t m a) -> t m a -> t m a).
+ augmentSM g nil = buildSM g
+ #-}
+
+{-# RULES "foldrSM/augmentSM"
+ forall k z xs (g :: (m a -> t m a -> t m a) -> t m a -> t m a).
+ foldrSM k z (augmentSM g xs) = g k (foldrSM k z xs)
+ #-}
+
+{-# RULES "augmentSM/buildSM"
+ forall (g :: (m a -> t m a -> t m a) -> t m a -> t m a)
+ (h :: (m a -> t m a -> t m a) -> t m a -> t m a).
+ augmentSM g (buildSM h) = buildSM (\c n -> g c (h c n))
+ #-}
+
+-------------------------------------------------------------------------------
+-- Experimental foldrM/buildM
+-------------------------------------------------------------------------------
+
+-- | Lazy right fold with a monadic step function.
+{-# INLINE_NORMAL foldrM #-}
+foldrM :: IsStream t => (a -> m b -> m b) -> m b -> t m a -> m b
+foldrM step acc m = go m
+ where
+ go m1 =
+ let stop = acc
+ single a = step a acc
+ yieldk a r = step a (go r)
+ in foldStream defState yieldk single stop m1
+
+{-# INLINE_NORMAL foldrMKWith #-}
+foldrMKWith
+ :: (State Stream m a
+ -> (a -> t m a -> m b)
+ -> (a -> m b)
+ -> m b
+ -> t m a
+ -> m b)
+ -> (a -> m b -> m b)
+ -> m b
+ -> ((a -> t m a -> m b) -> (a -> m b) -> m b -> m b)
+ -> m b
+foldrMKWith f step acc g = go g
+ where
+ go k =
+ let stop = acc
+ single a = step a acc
+ yieldk a r = step a (go (\yld sng stp -> f defState yld sng stp r))
+ in k yieldk single stop
+
+{-
+{-# RULES "foldrM/buildS"
+ forall k z (g :: (a -> t m a -> t m a) -> t m a -> t m a)
+ . foldrM k z (buildS g)
+ = g k z
+#-}
+-}
+-- XXX in which case will foldrM/buildM fusion be useful?
+{-# RULES "foldrM/buildM"
+ forall step acc (g :: (forall r.
+ (a -> t m a -> m r)
+ -> (a -> m r)
+ -> m r
+ -> m r
+ )).
+ foldrM step acc (buildM g) = foldrMKWith foldStream step acc g
+ #-}
+
+{-# RULES "foldrM/sharedM"
+ forall step acc (g :: (forall r.
+ (a -> t m a -> m r)
+ -> (a -> m r)
+ -> m r
+ -> m r
+ )).
+ foldrM step acc (sharedM g) = foldrMKWith foldStreamShared step acc g
+ #-}
+
+------------------------------------------------------------------------------
+-- Left fold
+------------------------------------------------------------------------------
+
+-- | Strict left fold with an extraction function. Like the standard strict
+-- left fold, but applies a user supplied extraction function (the third
+-- argument) to the folded value at the end. This is designed to work with the
+-- @foldl@ library. The suffix @x@ is a mnemonic for extraction.
+--
+-- Note that the accumulator is always evaluated including the initial value.
+{-# INLINE foldlx' #-}
+foldlx' :: forall t m a b x. (IsStream t, Monad m)
+ => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b
+foldlx' step begin done m = get $ go m begin
+ where
+ {-# NOINLINE get #-}
+ get :: t m x -> m b
+ get m1 =
+ -- XXX we are not strictly evaluating the accumulator here. Is this
+ -- okay?
+ let single = return . done
+ -- XXX this is foldSingleton. why foldStreamShared?
+ in foldStreamShared undefined undefined single undefined m1
+
+ -- Note, this can be implemented by making a recursive call to "go",
+ -- however that is more expensive because of unnecessary recursion
+ -- that cannot be tail call optimized. Unfolding recursion explicitly via
+ -- continuations is much more efficient.
+ go :: t m a -> x -> t m x
+ go m1 !acc = mkStream $ \_ yld sng _ ->
+ let stop = sng acc
+ single a = sng $ step acc a
+ -- XXX this is foldNonEmptyStream
+ yieldk a r = foldStream defState yld sng undefined $
+ go r (step acc a)
+ in foldStream defState yieldk single stop m1
+
+-- | Strict left associative fold.
+{-# INLINE foldl' #-}
+foldl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> m b
+foldl' step begin = foldlx' step begin id
+
+------------------------------------------------------------------------------
+-- Semigroup
+------------------------------------------------------------------------------
+
+infixr 6 `serial`
+
+-- | Appends two streams sequentially, yielding all elements from the first
+-- stream, and then all elements from the second stream.
+--
+-- >>> import Streamly.Prelude (serial)
+-- >>> stream1 = Stream.fromList [1,2]
+-- >>> stream2 = Stream.fromList [3,4]
+-- >>> Stream.toList $ stream1 `serial` stream2
+-- [1,2,3,4]
+--
+-- This operation can be used to fold an infinite lazy container of streams.
+--
+-- /Since: 0.2.0 ("Streamly")/
+--
+-- @since 0.8.0
+{-# INLINE serial #-}
+serial :: IsStream t => t m a -> t m a -> t m a
+-- XXX This doubles the time of toNullAp benchmark, may not be fusing properly
+-- serial xs ys = augmentS (\c n -> foldrS c n xs) ys
+serial m1 m2 = go m1
+ where
+ go m = mkStream $ \st yld sng stp ->
+ let stop = foldStream st yld sng stp m2
+ single a = yld a m2
+ yieldk a r = yld a (go r)
+ in foldStream st yieldk single stop m
+
+-- join/merge/append streams depending on consM
+{-# INLINE conjoin #-}
+conjoin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
+conjoin xs ys = augmentSM (\c n -> foldrSM c n xs) ys
+
+instance Semigroup (Stream m a) where
+ (<>) = serial
+
+------------------------------------------------------------------------------
+-- Monoid
+------------------------------------------------------------------------------
+
+instance Monoid (Stream m a) where
+ mempty = nil
+ mappend = (<>)
+
+-------------------------------------------------------------------------------
+-- Functor
+-------------------------------------------------------------------------------
+
+-- Note eta expanded
+{-# INLINE_LATE mapFB #-}
+mapFB :: forall (t :: (Type -> Type) -> Type -> Type) b m a.
+ (b -> t m b -> t m b) -> (a -> b) -> a -> t m b -> t m b
+mapFB c f = \x ys -> c (f x) ys
+#undef Type
+
+{-# RULES
+"mapFB/mapFB" forall c f g. mapFB (mapFB c f) g = mapFB c (f . g)
+"mapFB/id" forall c. mapFB c (\x -> x) = c
+ #-}
+
+{-# INLINE map #-}
+map :: IsStream t => (a -> b) -> t m a -> t m b
+map f xs = buildS (\c n -> foldrS (mapFB c f) n xs)
+
+-- XXX This definition might potentially be more efficient, but the cost in the
+-- benchmark is dominated by unfoldrM cost so we cannot correctly determine
+-- differences in the mapping cost. We should perhaps deduct the cost of
+-- unfoldrM from the benchmarks and then compare.
+{-
+map f m = go m
+ where
+ go m1 =
+ mkStream $ \st yld sng stp ->
+ let single = sng . f
+ yieldk a r = yld (f a) (go r)
+ in foldStream (adaptState st) yieldk single stp m1
+-}
+
+{-# INLINE_LATE mapMFB #-}
+mapMFB :: Monad m => (m b -> t m b -> t m b) -> (a -> m b) -> m a -> t m b -> t m b
+mapMFB c f = \x ys -> c (x >>= f) ys
+
+{-# RULES
+ "mapMFB/mapMFB" forall c f g. mapMFB (mapMFB c f) g = mapMFB c (f >=> g)
+ #-}
+-- XXX These rules may never fire because pure/return type class rules will
+-- fire first.
+{-
+"mapMFB/pure" forall c. mapMFB c (\x -> pure x) = c
+"mapMFB/return" forall c. mapMFB c (\x -> return x) = c
+-}
+
+-- Be careful when modifying this, this uses a consM (|:) deliberately to allow
+-- other stream types to overload it.
+{-# INLINE mapM #-}
+mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b
+mapM f = foldrSShared (\x xs -> f x `consM` xs) nil
+-- See note under map definition above.
+{-
+mapM f m = go m
+ where
+ go m1 = mkStream $ \st yld sng stp ->
+ let single a = f a >>= sng
+ yieldk a r = foldStreamShared st yld sng stp $ f a |: go r
+ in foldStream (adaptState st) yieldk single stp m1
+ -}
+
+-- This is experimental serial version supporting fusion.
+--
+-- XXX what if we do not want to fuse two concurrent mapMs?
+-- XXX we can combine two concurrent mapM only if the SVar is of the same type
+-- So for now we use it only for serial streams.
+-- XXX fusion would be easier for monomoprhic stream types.
+-- {-# RULES "mapM serial" mapM = mapMSerial #-}
+{-# INLINE mapMSerial #-}
+mapMSerial :: MonadAsync m => (a -> m b) -> Stream m a -> Stream m b
+mapMSerial f xs = buildSM (\c n -> foldrSMShared (mapMFB c f) n xs)
+
+-- XXX in fact use the Stream type everywhere and only use polymorphism in the
+-- high level modules/prelude.
+instance Monad m => Functor (Stream m) where
+ fmap = map
+
+-------------------------------------------------------------------------------
+-- Transformers
+-------------------------------------------------------------------------------
+
+{-
+instance MonadTrans Stream where
+ {-# INLINE lift #-}
+ lift = yieldM
+-}
+
+-------------------------------------------------------------------------------
+-- Nesting
+-------------------------------------------------------------------------------
+
+-- | Detach a stream from an SVar
+{-# INLINE unShare #-}
+unShare :: IsStream t => t m a -> t m a
+unShare x = mkStream $ \st yld sng stp ->
+ foldStream st yld sng stp x
+
+-- XXX the function stream and value stream can run in parallel
+{-# INLINE apWith #-}
+apWith
+ :: IsStream t
+ => (t m b -> t m b -> t m b)
+ -> t m (a -> b)
+ -> t m a
+ -> t m b
+apWith par fstream stream = go1 fstream
+
+ where
+
+ go1 m =
+ mkStream $ \st yld sng stp ->
+ let foldShared = foldStreamShared st yld sng stp
+ single f = foldShared $ unShare (go2 f stream)
+ yieldk f r = foldShared $ unShare (go2 f stream) `par` go1 r
+ in foldStream (adaptState st) yieldk single stp m
+
+ go2 f m =
+ mkStream $ \st yld sng stp ->
+ let single a = sng (f a)
+ yieldk a r = yld (f a) (go2 f r)
+ in foldStream (adaptState st) yieldk single stp m
+
+{-# INLINE apSerial #-}
+apSerial
+ :: IsStream t
+ => t m (a -> b)
+ -> t m a
+ -> t m b
+apSerial fstream stream = go1 fstream
+
+ where
+
+ go1 m =
+ mkStream $ \st yld sng stp ->
+ let foldShared = foldStreamShared st yld sng stp
+ single f = foldShared $ go3 f stream
+ yieldk f r = foldShared $ go2 f r stream
+ in foldStream (adaptState st) yieldk single stp m
+
+ go2 f r1 m =
+ mkStream $ \st yld sng stp ->
+ let foldShared = foldStreamShared st yld sng stp
+ stop = foldShared $ go1 r1
+ single a = yld (f a) (go1 r1)
+ yieldk a r = yld (f a) (go2 f r1 r)
+ in foldStream (adaptState st) yieldk single stop m
+
+ go3 f m =
+ mkStream $ \st yld sng stp ->
+ let single a = sng (f a)
+ yieldk a r = yld (f a) (go3 f r)
+ in foldStream (adaptState st) yieldk single stp m
+
+{-# INLINE apSerialDiscardFst #-}
+apSerialDiscardFst
+ :: IsStream t
+ => t m a
+ -> t m b
+ -> t m b
+apSerialDiscardFst fstream stream = go1 fstream
+
+ where
+
+ go1 m =
+ mkStream $ \st yld sng stp ->
+ let foldShared = foldStreamShared st yld sng stp
+ single _ = foldShared $ stream
+ yieldk _ r = foldShared $ go2 r stream
+ in foldStream (adaptState st) yieldk single stp m
+
+ go2 r1 m =
+ mkStream $ \st yld sng stp ->
+ let foldShared = foldStreamShared st yld sng stp
+ stop = foldShared $ go1 r1
+ single a = yld a (go1 r1)
+ yieldk a r = yld a (go2 r1 r)
+ in foldStream st yieldk single stop m
+
+{-# INLINE apSerialDiscardSnd #-}
+apSerialDiscardSnd
+ :: IsStream t
+ => t m a
+ -> t m b
+ -> t m a
+apSerialDiscardSnd fstream stream = go1 fstream
+
+ where
+
+ go1 m =
+ mkStream $ \st yld sng stp ->
+ let foldShared = foldStreamShared st yld sng stp
+ single f = foldShared $ go3 f stream
+ yieldk f r = foldShared $ go2 f r stream
+ in foldStream st yieldk single stp m
+
+ go2 f r1 m =
+ mkStream $ \st yld sng stp ->
+ let foldShared = foldStreamShared st yld sng stp
+ stop = foldShared $ go1 r1
+ single _ = yld f (go1 r1)
+ yieldk _ r = yld f (go2 f r1 r)
+ in foldStream (adaptState st) yieldk single stop m
+
+ go3 f m =
+ mkStream $ \st yld sng stp ->
+ let single _ = sng f
+ yieldk _ r = yld f (go3 f r)
+ in foldStream (adaptState st) yieldk single stp m
+
+-- XXX This is just concatMapBy with arguments flipped. We need to keep this
+-- instead of using a concatMap style definition because the bind
+-- implementation in Async and WAsync streams show significant perf degradation
+-- if the argument order is changed.
+{-# INLINE bindWith #-}
+bindWith
+ :: IsStream t
+ => (t m b -> t m b -> t m b)
+ -> t m a
+ -> (a -> t m b)
+ -> t m b
+bindWith par m1 f = go m1
+ where
+ go m =
+ mkStream $ \st yld sng stp ->
+ let foldShared = foldStreamShared st yld sng stp
+ single a = foldShared $ unShare (f a)
+ yieldk a r = foldShared $ unShare (f a) `par` go r
+ in foldStream (adaptState st) yieldk single stp m
+
+-- XXX express in terms of foldrS?
+-- XXX can we use a different stream type for the generated stream being
+-- falttened so that we can combine them differently and keep the resulting
+-- stream different?
+-- XXX do we need specialize to IO?
+-- XXX can we optimize when c and a are same, by removing the forall using
+-- rewrite rules with type applications?
+
+-- | Perform a 'concatMap' using a specified concat strategy. The first
+-- argument specifies a merge or concat function that is used to merge the
+-- streams generated by the map function. For example, the concat function
+-- could be 'serial', 'parallel', 'async', 'ahead' or any other zip or merge
+-- function.
+--
+-- @since 0.7.0
+{-# INLINE concatMapBy #-}
+concatMapBy
+ :: IsStream t
+ => (t m b -> t m b -> t m b)
+ -> (a -> t m b)
+ -> t m a
+ -> t m b
+concatMapBy par f xs = bindWith par xs f
+
+{-# INLINE concatMap #-}
+concatMap :: IsStream t => (a -> t m b) -> t m a -> t m b
+concatMap f m = fromStream $
+ concatMapBy serial
+ (\a -> adapt $ toStream $ f a)
+ (adapt $ toStream m)
+
+{-
+-- Fused version.
+-- XXX This fuses but when the stream is nil this performs poorly.
+-- The filterAllOut benchmark degrades. Need to investigate and fix that.
+{-# INLINE concatMap #-}
+concatMap :: IsStream t => (a -> t m b) -> t m a -> t m b
+concatMap f xs = buildS
+ (\c n -> foldrS (\x b -> foldrS c b (f x)) n xs)
+
+-- Stream polymorphic concatMap implementation
+-- XXX need to use buildSM/foldrSMShared for parallel behavior
+-- XXX unShare seems to degrade the fused performance
+{-# INLINE_EARLY concatMap_ #-}
+concatMap_ :: IsStream t => (a -> t m b) -> t m a -> t m b
+concatMap_ f xs = buildS
+ (\c n -> foldrSShared (\x b -> foldrSShared c b (unShare $ f x)) n xs)
+-}
+
+-- | See 'Streamly.Internal.Data.Stream.IsStream.concatPairsWith' for
+-- documentation.
+--
+{-# INLINE concatPairsWith #-}
+concatPairsWith
+ :: IsStream t
+ => (t m b -> t m b -> t m b)
+ -> (a -> t m b)
+ -> t m a
+ -> t m b
+concatPairsWith combine f = go Nothing
+
+ where
+
+ go Nothing stream =
+ mkStream $ \st yld sng stp ->
+ let foldShared = foldStreamShared st yld sng stp
+ single a = foldShared $ unShare (f a)
+ yieldk a r = foldShared $ go (Just a) r
+ in foldStream (adaptState st) yieldk single stp stream
+ go (Just a1) stream =
+ mkStream $ \st yld sng stp ->
+ let foldShared = foldStreamShared st yld sng stp
+ stop = foldShared $ unShare (f a1)
+ single a = foldShared $ unShare (f a1) `combine` f a
+ yieldk a r =
+ foldShared
+ $ concatPairsWith combine
+ (\(x,y) -> combine (unShare x) y)
+ $ (f a1, f a) `cons` makePairs Nothing r
+ in foldStream (adaptState st) yieldk single stop stream
+
+ makePairs Nothing stream =
+ mkStream $ \st yld sng stp ->
+ let foldShared = foldStreamShared st yld sng stp
+ single a = sng (f a, nil)
+ yieldk a r = foldShared $ makePairs (Just a) r
+ in foldStream (adaptState st) yieldk single stp stream
+ makePairs (Just a1) stream =
+ mkStream $ \st yld sng _ ->
+ let stop = sng (f a1, nil)
+ single a = sng (f a1, f a)
+ yieldk a r = yld (f a1, f a) (makePairs Nothing r)
+ in foldStream (adaptState st) yieldk single stop stream
+
+instance Monad m => Applicative (Stream m) where
+ {-# INLINE pure #-}
+ pure = yield
+ {-# INLINE (<*>) #-}
+ (<*>) = ap
+
+-- NOTE: even though concatMap for StreamD is 3x faster compared to StreamK,
+-- the monad instance of StreamD is slower than StreamK after foldr/build
+-- fusion.
+instance Monad m => Monad (Stream m) where
+ {-# INLINE return #-}
+ return = pure
+ {-# INLINE (>>=) #-}
+ (>>=) = flip concatMap
+
+{-
+-- Like concatMap but generates stream using an unfold function. Similar to
+-- unfoldMany but for StreamK.
+concatUnfoldr :: IsStream t
+ => (b -> t m (Maybe (a, b))) -> t m b -> t m a
+concatUnfoldr = undefined
+-}
+{-# INLINE unfoldrM #-}
+unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a
+unfoldrM step = go
+ where
+ go s = sharedM $ \yld _ stp -> do
+ r <- step s
+ case r of
+ Just (a, b) -> yld a (go b)
+ Nothing -> stp
+
+{-# INLINE drain #-}
+drain :: (Monad m, IsStream t) => t m a -> m ()
+drain = foldrM (\_ xs -> xs) (return ())
diff --git a/testsuite/tests/profiling/should_compile/T19894/Unfold.hs b/testsuite/tests/profiling/should_compile/T19894/Unfold.hs
new file mode 100644
index 0000000000..838c8bf344
--- /dev/null
+++ b/testsuite/tests/profiling/should_compile/T19894/Unfold.hs
@@ -0,0 +1,65 @@
+{-# LANGUAGE CPP #-}
+{-# LANGUAGE TupleSections #-}
+{-# LANGUAGE ExistentialQuantification #-}
+module Unfold
+ ( Unfold (..)
+ , supplyFirst
+ , many
+ , lmap
+ )
+
+where
+
+import Step (Step(..))
+#if defined(FUSION_PLUGIN)
+import Fusion.Plugin.Types (Fuse(..))
+#endif
+
+data Unfold m a b =
+ -- | @Unfold step inject@
+ forall s. Unfold (s -> m (Step s b)) (a -> m s)
+
+{-# INLINE [1] lmap #-}
+lmap :: (a -> c) -> Unfold m c b -> Unfold m a b
+lmap f (Unfold ustep uinject) = Unfold ustep (uinject Prelude.. f)
+
+{-# INLINE [1] supplyFirst #-}
+supplyFirst :: a -> Unfold m (a, b) c -> Unfold m b c
+supplyFirst a = lmap (a, )
+
+#if defined(FUSION_PLUGIN)
+{-# ANN type ConcatState Fuse #-}
+#endif
+data ConcatState s1 s2 = ConcatOuter s1 | ConcatInner s1 s2
+
+-- | Apply the second unfold to each output element of the first unfold and
+-- flatten the output in a single stream.
+--
+-- /Since: 0.8.0/
+--
+{-# INLINE [1] many #-}
+many :: Monad m => Unfold m a b -> Unfold m b c -> Unfold m a c
+many (Unfold step1 inject1) (Unfold step2 inject2) = Unfold step inject
+
+ where
+
+ inject x = do
+ s <- inject1 x
+ return $ ConcatOuter s
+
+ {-# INLINE [0] step #-}
+ step (ConcatOuter st) = do
+ r <- step1 st
+ case r of
+ Yield x s -> do
+ innerSt <- inject2 x
+ return $ Skip (ConcatInner s innerSt)
+ Skip s -> return $ Skip (ConcatOuter s)
+ Stop -> return Stop
+
+ step (ConcatInner ost ist) = do
+ r <- step2 ist
+ return $ case r of
+ Yield x s -> Yield x (ConcatInner ost s)
+ Skip s -> Skip (ConcatInner ost s)
+ Stop -> Skip (ConcatOuter ost)
diff --git a/testsuite/tests/profiling/should_compile/T19894/inline.hs b/testsuite/tests/profiling/should_compile/T19894/inline.hs
new file mode 100644
index 0000000000..daa7ec659e
--- /dev/null
+++ b/testsuite/tests/profiling/should_compile/T19894/inline.hs
@@ -0,0 +1,27 @@
+-- We use fromStreamK/toStreamK to convert the direct style stream to CPS
+-- style. In the first phase we try fusing the fromStreamK/toStreamK using:
+--
+-- {-# RULES "fromStreamK/toStreamK fusion"
+-- forall s. toStreamK (fromStreamK s) = s #-}
+--
+-- If for some reason some of the operations could not be fused then we have
+-- fallback rules in the second phase. For example:
+--
+-- {-# INLINE_EARLY unfoldr #-}
+-- unfoldr :: (Monad m, IsStream t) => (b -> Maybe (a, b)) -> b -> t m a
+-- unfoldr step seed = fromStreamS (S.unfoldr step seed)
+-- {-# RULES "unfoldr fallback to StreamK" [1]
+-- forall a b. S.toStreamK (S.unfoldr a b) = K.unfoldr a b #-}```
+--
+-- Then, fromStreamK/toStreamK are inlined in the last phase:
+--
+-- {-# INLINE_LATE toStreamK #-}
+-- toStreamK :: Monad m => Stream m a -> K.Stream m a```
+--
+-- The fallback rules make sure that if we could not fuse the direct style
+-- operations then better use the CPS style operation, because unfused direct
+-- style would have worse performance than the CPS style ops.
+
+#define INLINE_EARLY INLINE [2]
+#define INLINE_NORMAL INLINE [1]
+#define INLINE_LATE INLINE [0]
diff --git a/testsuite/tests/profiling/should_compile/all.T b/testsuite/tests/profiling/should_compile/all.T
index 6f41f6b4a4..6d16aeff6c 100644
--- a/testsuite/tests/profiling/should_compile/all.T
+++ b/testsuite/tests/profiling/should_compile/all.T
@@ -10,3 +10,4 @@ test('T12790', [only_ways(['normal']), req_profiling], compile, ['-O -prof'])
test('T14931', [when(opsys('mingw32'), skip), only_ways(['normal']), req_profiling],
makefile_test, ['T14931'])
test('T15108', [only_ways(['normal']), req_profiling], compile, ['-O -prof -fprof-auto'])
+test('T19894', [only_ways(['normal']), req_profiling, extra_files(['T19894'])], multimod_compile, ['Main', '-v0 -O2 -prof -fprof-auto -iT19894'])