diff options
19 files changed, 3844 insertions, 52 deletions
diff --git a/compiler/GHC/CoreToStg/Prep.hs b/compiler/GHC/CoreToStg/Prep.hs index 6c86ef990a..4529bc7d1b 100644 --- a/compiler/GHC/CoreToStg/Prep.hs +++ b/compiler/GHC/CoreToStg/Prep.hs @@ -20,7 +20,6 @@ where import GHC.Prelude import GHC.Platform -import GHC.Platform.Ways import GHC.Driver.Session import GHC.Driver.Env @@ -74,14 +73,12 @@ import GHC.Types.SrcLoc ( SrcSpan(..), realSrcLocSpan, mkRealSrcLoc ) import GHC.Types.Literal import GHC.Types.Tickish import GHC.Types.TyThing -import GHC.Types.CostCentre ( CostCentre, ccFromThisModule ) import GHC.Types.Unique.Supply import GHC.Data.Pair import Data.List ( unfoldr ) import Data.Functor.Identity import Control.Monad -import qualified Data.Set as S {- -- --------------------------------------------------------------------------- @@ -241,20 +238,15 @@ type CpeRhs = CoreExpr -- Non-terminal 'rhs' -} corePrepPgm :: HscEnv -> Module -> ModLocation -> CoreProgram -> [TyCon] - -> IO (CoreProgram, S.Set CostCentre) + -> IO CoreProgram corePrepPgm hsc_env this_mod mod_loc binds data_tycons = withTiming logger (text "CorePrep"<+>brackets (ppr this_mod)) - (\(a,b) -> a `seqList` b `seq` ()) $ do + (\a -> a `seqList` ()) $ do us <- mkSplitUniqSupply 's' initialCorePrepEnv <- mkInitialCorePrepEnv hsc_env - let cost_centres - | WayProf `S.member` ways dflags - = collectCostCentres this_mod binds - | otherwise - = S.empty - + let implicit_binds = mkDataConWorkers dflags mod_loc data_tycons -- NB: we must feed mkImplicitBinds through corePrep too -- so that they are suitably cloned and eta-expanded @@ -265,7 +257,7 @@ corePrepPgm hsc_env this_mod mod_loc binds data_tycons = return (deFloatTop (floats1 `appendFloats` floats2)) endPassIO hsc_env alwaysQualify CorePrep binds_out [] - return (binds_out, cost_centres) + return binds_out where dflags = hsc_dflags hsc_env logger = hsc_logger hsc_env @@ -2120,41 +2112,6 @@ wrapTicks (Floats flag floats0) expr = wrapBind t (NonRec binder rhs) = NonRec binder (mkTick t rhs) wrapBind 
t (Rec pairs) = Rec (mapSnd (mkTick t) pairs) ------------------------------------------------------------------------------- --- Collecting cost centres --- --------------------------------------------------------------------------- - --- | Collect cost centres defined in the current module, including those in --- unfoldings. -collectCostCentres :: Module -> CoreProgram -> S.Set CostCentre -collectCostCentres mod_name - = foldl' go_bind S.empty - where - go cs e = case e of - Var{} -> cs - Lit{} -> cs - App e1 e2 -> go (go cs e1) e2 - Lam _ e -> go cs e - Let b e -> go (go_bind cs b) e - Case scrt _ _ alts -> go_alts (go cs scrt) alts - Cast e _ -> go cs e - Tick (ProfNote cc _ _) e -> - go (if ccFromThisModule cc mod_name then S.insert cc cs else cs) e - Tick _ e -> go cs e - Type{} -> cs - Coercion{} -> cs - - go_alts = foldl' (\cs (Alt _con _bndrs e) -> go cs e) - - go_bind :: S.Set CostCentre -> CoreBind -> S.Set CostCentre - go_bind cs (NonRec b e) = - go (maybe cs (go cs) (get_unf b)) e - go_bind cs (Rec bs) = - foldl' (\cs' (b, e) -> go (maybe cs' (go cs') (get_unf b)) e) cs bs - - -- Unfoldings may have cost centres that in the original definion are - -- optimized away, see #5889. - get_unf = maybeUnfoldingTemplate . realIdUnfolding ------------------------------------------------------------------------------ diff --git a/compiler/GHC/Driver/Main.hs b/compiler/GHC/Driver/Main.hs index 2f40d7a00b..9a55807c0d 100644 --- a/compiler/GHC/Driver/Main.hs +++ b/compiler/GHC/Driver/Main.hs @@ -1565,6 +1565,7 @@ hscGenHardCode hsc_env cgguts location output_filename = do -- From now on, we just use the bits we need. 
cg_module = this_mod, cg_binds = core_binds, + cg_ccs = local_ccs, cg_tycons = tycons, cg_foreign = foreign_stubs0, cg_foreign_files = foreign_files, @@ -1582,7 +1583,7 @@ hscGenHardCode hsc_env cgguts location output_filename = do ------------------- -- PREPARE FOR CODE GENERATION -- Do saturation and convert to A-normal form - (prepd_binds, local_ccs) <- {-# SCC "CorePrep" #-} + (prepd_binds) <- {-# SCC "CorePrep" #-} corePrepPgm hsc_env this_mod location core_binds data_tycons @@ -1595,7 +1596,7 @@ hscGenHardCode hsc_env cgguts location output_filename = do (myCoreToStg logger dflags (hsc_IC hsc_env) this_mod location prepd_binds) let cost_centre_info = - (S.toList local_ccs ++ caf_ccs, caf_cc_stacks) + (local_ccs ++ caf_ccs, caf_cc_stacks) platform = targetPlatform dflags prof_init | sccProfilingEnabled dflags = profilingInitCode platform this_mod cost_centre_info @@ -1661,7 +1662,7 @@ hscInteractive hsc_env cgguts location = do ------------------- -- PREPARE FOR CODE GENERATION -- Do saturation and convert to A-normal form - (prepd_binds, _) <- {-# SCC "CorePrep" #-} + prepd_binds <- {-# SCC "CorePrep" #-} corePrepPgm hsc_env this_mod location core_binds data_tycons (stg_binds, _infotable_prov, _caf_ccs__caf_cc_stacks) @@ -1978,7 +1979,7 @@ hscParsedDecls hsc_env decls = runInteractiveHsc hsc_env $ do {- Prepare For Code Generation -} -- Do saturation and convert to A-normal form - (prepd_binds, _) <- {-# SCC "CorePrep" #-} + prepd_binds <- {-# SCC "CorePrep" #-} liftIO $ corePrepPgm hsc_env this_mod iNTERACTIVELoc core_binds data_tycons (stg_binds, _infotable_prov, _caf_ccs__caf_cc_stacks) diff --git a/compiler/GHC/HsToCore.hs b/compiler/GHC/HsToCore.hs index 1c11c17ac1..73c6accff4 100644 --- a/compiler/GHC/HsToCore.hs +++ b/compiler/GHC/HsToCore.hs @@ -45,13 +45,13 @@ import GHC.Core.FVs ( exprsSomeFreeVarsList ) import GHC.Core.SimpleOpt ( simpleOptPgm, simpleOptExpr ) import GHC.Core.Utils import GHC.Core.Unfold.Make -import GHC.Core.Ppr import 
GHC.Core.Coercion import GHC.Core.DataCon ( dataConWrapId ) import GHC.Core.Make import GHC.Core.Rules import GHC.Core.Opt.Monad ( CoreToDo(..) ) import GHC.Core.Lint ( endPassIO ) +import GHC.Core.Ppr import GHC.Builtin.Names import GHC.Builtin.Types.Prim diff --git a/compiler/GHC/Iface/Tidy.hs b/compiler/GHC/Iface/Tidy.hs index 101d470bdc..3285fb88e5 100644 --- a/compiler/GHC/Iface/Tidy.hs +++ b/compiler/GHC/Iface/Tidy.hs @@ -2,6 +2,7 @@ {-# LANGUAGE DeriveFunctor #-} {-# OPTIONS_GHC -Wno-incomplete-uni-patterns #-} +{-# LANGUAGE NamedFieldPuns #-} {- (c) The GRASP/AQUA Project, Glasgow University, 1992-1998 @@ -77,6 +78,9 @@ import GHC.Data.Maybe import Control.Monad import Data.Function import Data.List ( sortBy, mapAccumL ) +import qualified Data.Set as S +import GHC.Platform.Ways +import GHC.Types.CostCentre {- Constructing the TypeEnv, Instances, Rules from which the @@ -429,6 +433,13 @@ tidyProgram hsc_env (ModGuts { mg_module = mod -- (c) Constructors even if they are not exported (the -- tidied TypeEnv has trimmed these away) ; alg_tycons = filter isAlgTyCon tcs + + + ; local_ccs + | WayProf `S.member` ways dflags + = collectCostCentres mod all_tidy_binds tidy_rules + | otherwise + = S.empty } ; endPassIO hsc_env print_unqual CoreTidy all_tidy_binds tidy_rules @@ -454,6 +465,7 @@ tidyProgram hsc_env (ModGuts { mg_module = mod ; return (CgGuts { cg_module = mod, cg_tycons = alg_tycons, cg_binds = all_tidy_binds, + cg_ccs = S.toList local_ccs, cg_foreign = add_spt_init_code foreign_stubs, cg_foreign_files = foreign_files, cg_dep_pkgs = dep_direct_pkgs deps, @@ -474,6 +486,53 @@ tidyProgram hsc_env (ModGuts { mg_module = mod dflags = hsc_dflags hsc_env logger = hsc_logger hsc_env + +------------------------------------------------------------------------------ +-- Collecting cost centres +-- --------------------------------------------------------------------------- + +-- | Collect cost centres defined in the current module, including those in +-- 
unfoldings. +collectCostCentres :: Module -> CoreProgram -> [CoreRule] -> S.Set CostCentre +collectCostCentres mod_name binds rules + = foldl' go_bind (go_rules S.empty) binds + where + go cs e = case e of + Var{} -> cs + Lit{} -> cs + App e1 e2 -> go (go cs e1) e2 + Lam _ e -> go cs e + Let b e -> go (go_bind cs b) e + Case scrt _ _ alts -> go_alts (go cs scrt) alts + Cast e _ -> go cs e + Tick (ProfNote cc _ _) e -> + go (if ccFromThisModule cc mod_name then S.insert cc cs else cs) e + Tick _ e -> go cs e + Type{} -> cs + Coercion{} -> cs + + go_alts = foldl' (\cs (Alt _con _bndrs e) -> go cs e) + + go_bind :: S.Set CostCentre -> CoreBind -> S.Set CostCentre + go_bind cs (NonRec b e) = + go (do_binder cs b) e + go_bind cs (Rec bs) = + foldl' (\cs' (b, e) -> go (do_binder cs' b) e) cs bs + + do_binder cs b = maybe cs (go cs) (get_unf b) + + + -- Unfoldings may have cost centres that in the original definition are + -- optimized away, see #5889. + get_unf = maybeUnfoldingTemplate . realIdUnfolding + + -- Have to look at the RHS of rules as well, as these may contain ticks which + -- don't appear anywhere else. 
See #19894 + go_rules cs = foldl' go cs (mapMaybe get_rhs rules) + + get_rhs Rule { ru_rhs } = Just ru_rhs + get_rhs BuiltinRule {} = Nothing + -------------------------- trimId :: Bool -> Id -> Id -- With -O0 we now trim off the arity, one-shot-ness, strictness diff --git a/compiler/GHC/Unit/Module/ModGuts.hs b/compiler/GHC/Unit/Module/ModGuts.hs index b40c980744..e799ebf2a1 100644 --- a/compiler/GHC/Unit/Module/ModGuts.hs +++ b/compiler/GHC/Unit/Module/ModGuts.hs @@ -34,6 +34,7 @@ import GHC.Types.Name.Reader import GHC.Types.SafeHaskell import GHC.Types.SourceFile ( HscSource(..), hscSourceToIsBoot ) import GHC.Types.SrcLoc +import GHC.Types.CostCentre -- | A ModGuts is carried through the compiler, accumulating stuff as it goes @@ -131,6 +132,7 @@ data CgGuts -- data constructor workers; reason: we regard them -- as part of the code-gen of tycons + cg_ccs :: [CostCentre], -- ^ List of cost centres used in bindings and rules cg_foreign :: !ForeignStubs, -- ^ Foreign export stubs cg_foreign_files :: ![(ForeignSrcLang, FilePath)], cg_dep_pkgs :: ![UnitId], -- ^ Dependent packages, used to diff --git a/testsuite/tests/profiling/should_compile/T19894/Array.hs b/testsuite/tests/profiling/should_compile/T19894/Array.hs new file mode 100644 index 0000000000..75cad1fcef --- /dev/null +++ b/testsuite/tests/profiling/should_compile/T19894/Array.hs @@ -0,0 +1,74 @@ +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE RecordWildCards #-} +module Array + ( + Array(..) 
+ , fromList + , read + , length + , writeNUnsafe + , MA.unsafeInlineIO + , MA.memcmp + , unsafeFreezeWithShrink + , foldl' + , unsafeIndexIO + ) + +where + +import Control.Monad.IO.Class (MonadIO(..)) +import Foreign.Storable (Storable(..)) +import GHC.ForeignPtr (ForeignPtr(..)) +import GHC.IO (unsafePerformIO) +import GHC.Ptr (Ptr(..)) +import Unfold (Unfold(..)) +import Fold (Fold(..)) +import qualified MArray as MA +import qualified Unfold as UF +import Prelude hiding (length, read) + +data Array a = + Array + { aStart :: {-# UNPACK #-} !(ForeignPtr a) -- first address + , aEnd :: {-# UNPACK #-} !(Ptr a) -- first unused address + } + +{-# INLINE unsafeFreeze #-} +unsafeFreeze :: MA.Array a -> Array a +unsafeFreeze (MA.Array as ae _) = Array as ae + +{-# INLINABLE fromList #-} +fromList :: Storable a => [a] -> Array a +fromList xs = unsafeFreeze $ MA.fromList xs + +{-# INLINE [1] writeNUnsafe #-} +writeNUnsafe :: forall m a. (MonadIO m, Storable a) + => Int -> Fold m a (Array a) +writeNUnsafe n = unsafeFreeze <$> MA.writeNUnsafe n + +{-# INLINE unsafeThaw #-} +unsafeThaw :: Array a -> MA.Array a +unsafeThaw (Array as ae) = MA.Array as ae ae + +{-# INLINE length #-} +length :: forall a. Storable a => Array a -> Int +length arr = MA.length (unsafeThaw arr) + +{-# INLINE [1] read #-} +read :: forall m a. (Monad m, Storable a) => Unfold m (Array a) a +read = UF.lmap unsafeThaw MA.read + +{-# INLINE unsafeFreezeWithShrink #-} +unsafeFreezeWithShrink :: Storable a => MA.Array a -> Array a +unsafeFreezeWithShrink arr = unsafePerformIO $ do + MA.Array as ae _ <- MA.shrinkToFit arr + return $ Array as ae + +{-# INLINE [1] foldl' #-} +foldl' :: forall a b. Storable a => (b -> a -> b) -> b -> Array a -> b +foldl' f z arr = MA.foldl' f z (unsafeThaw arr) + +{-# INLINE [1] unsafeIndexIO #-} +unsafeIndexIO :: forall a. 
Storable a => Array a -> Int -> IO a +unsafeIndexIO arr = MA.unsafeIndexIO (unsafeThaw arr) + diff --git a/testsuite/tests/profiling/should_compile/T19894/Fold.hs b/testsuite/tests/profiling/should_compile/T19894/Fold.hs new file mode 100644 index 0000000000..7683b064f4 --- /dev/null +++ b/testsuite/tests/profiling/should_compile/T19894/Fold.hs @@ -0,0 +1,277 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE ExistentialQuantification #-} + +module Fold + ( + -- * Types + Step(..) + , Fold (..) + , sum + , chunksOf + , drain + , drainBy + ) +where + +import Data.Bifunctor (Bifunctor(..)) +#if defined(FUSION_PLUGIN) +import Fusion.Plugin.Types (Fuse(..)) +#endif +import Prelude hiding (sum, take) + +------------------------------------------------------------------------------ +-- Step of a fold +------------------------------------------------------------------------------ + +-- The Step functor around b allows expressing early termination like a right +-- fold. Traditional list right folds use function composition and laziness to +-- terminate early whereas we use data constructors. It allows stream fusion in +-- contrast to the foldr/build fusion when composing with functions. + +-- | Represents the result of the @step@ of a 'Fold'. 'Partial' returns an +-- intermediate state of the fold, the fold step can be called again with the +-- state or the driver can use @extract@ on the state to get the result out. +-- 'Done' returns the final result and the fold cannot be driven further. +-- +-- /Pre-release/ +-- +#if defined(FUSION_PLUGIN) +{-# ANN type Step Fuse #-} +#endif +data Step s b + = Partial !s + | Done !b + +-- | 'first' maps over 'Partial' and 'second' maps over 'Done'. 
+-- +instance Bifunctor Step where + {-# INLINE bimap #-} + bimap f _ (Partial a) = Partial (f a) + bimap _ g (Done b) = Done (g b) + + {-# INLINE first #-} + first f (Partial a) = Partial (f a) + first _ (Done x) = Done x + + {-# INLINE second #-} + second _ (Partial x) = Partial x + second f (Done a) = Done (f a) + +-- | 'fmap' maps over 'Done'. +-- +-- @ +-- fmap = 'second' +-- @ +-- +instance Functor (Step s) where + {-# INLINE fmap #-} + fmap = second + +{- +-- | Map a monadic function over the result @b@ in @Step s b@. +-- +-- /Internal/ +{-# INLINE mapMStep #-} +mapMStep :: Applicative m => (a -> m b) -> Step s a -> m (Step s b) +mapMStep f res = + case res of + Partial s -> pure $ Partial s + Done b -> Done <$> f b +-} + +------------------------------------------------------------------------------ +-- The Fold type +------------------------------------------------------------------------------ + +-- | The type @Fold m a b@ having constructor @Fold step initial extract@ +-- represents a fold over an input stream of values of type @a@ to a final +-- value of type @b@ in 'Monad' @m@. +-- +-- The fold uses an intermediate state @s@ as accumulator, the type @s@ is +-- internal to the specific fold definition. The initial value of the fold +-- state @s@ is returned by @initial@. The @step@ function consumes an input +-- and either returns the final result @b@ if the fold is done or the next +-- intermediate state (see 'Step'). At any point the fold driver can extract +-- the result from the intermediate state using the @extract@ function. +-- +-- NOTE: The constructor is not yet exposed via exposed modules, smart +-- constructors are provided to create folds. If you think you need the +-- constructor of this type please consider using the smart constructors in +-- "Streamly.Internal.Data.Fold" instead. +-- +-- /since 0.8.0 (type changed)/ +-- +-- @since 0.7.0 + +data Fold m a b = + -- | @Fold @ @ step @ @ initial @ @ extract@ + forall s. 
Fold (s -> a -> m (Step s b)) (m (Step s b)) (s -> m b) + +instance Functor m => Functor (Fold m a) where + {-# INLINE fmap #-} + fmap f (Fold step1 initial1 extract) = Fold step initial (fmap2 f extract) + + where + + initial = fmap2 f initial1 + step s b = fmap2 f (step1 s b) + fmap2 g = fmap (fmap g) + +{-# INLINABLE lmapM #-} +lmapM :: Monad m => (a -> m b) -> Fold m b r -> Fold m a r +lmapM f (Fold step begin done) = Fold step' begin done + where + step' x a = f a >>= step x + +-- | Make a fold from a left fold style pure step function and initial value of +-- the accumulator. +-- +-- If your 'Fold' returns only 'Partial' (i.e. never returns a 'Done') then you +-- can use @foldl'*@ constructors. +-- +-- A fold with an extract function can be expressed using fmap: +-- +-- @ +-- mkfoldlx :: Monad m => (s -> a -> s) -> s -> (s -> b) -> Fold m a b +-- mkfoldlx step initial extract = fmap extract (foldl' step initial) +-- @ +-- +-- See also: "Streamly.Prelude.foldl'" +-- +-- @since 0.8.0 +-- +{-# INLINE foldl' #-} +foldl' :: Monad m => (b -> a -> b) -> b -> Fold m a b +foldl' step initial = + Fold + (\s a -> return $ Partial $ step s a) + (return (Partial initial)) + return + +-- | Determine the sum of all elements of a stream of numbers. Returns additive +-- identity (@0@) when the stream is empty. Note that this is not numerically +-- stable for floating point numbers. 
+-- +-- > sum = fmap getSum $ Fold.foldMap Sum +-- +-- @since 0.7.0 +{-# INLINE sum #-} +sum :: (Monad m, Num a) => Fold m a a +sum = foldl' (+) 0 + +data Tuple' a b = Tuple' !a !b deriving Show + +{-# INLINE take #-} +take :: Monad m => Int -> Fold m a b -> Fold m a b +take n (Fold fstep finitial fextract) = Fold step initial extract + + where + + initial = do + res <- finitial + case res of + Partial s -> + if n > 0 + then return $ Partial $ Tuple' 0 s + else Done <$> fextract s + Done b -> return $ Done b + + step (Tuple' i r) a = do + res <- fstep r a + case res of + Partial sres -> do + let i1 = i + 1 + s1 = Tuple' i1 sres + if i1 < n + then return $ Partial s1 + else Done <$> fextract sres + Done bres -> return $ Done bres + + extract (Tuple' _ r) = fextract r + +#if defined(FUSION_PLUGIN) +{-# ANN type ManyState Fuse #-} +#endif +data ManyState s1 s2 + = ManyFirst !s1 !s2 + | ManyLoop !s1 !s2 + +-- | Collect zero or more applications of a fold. @many split collect@ applies +-- the @split@ fold repeatedly on the input stream and accumulates zero or more +-- fold results using @collect@. +-- +-- >>> two = Fold.take 2 Fold.toList +-- >>> twos = Fold.many two Fold.toList +-- >>> Stream.fold twos $ Stream.fromList [1..10] +-- [[1,2],[3,4],[5,6],[7,8],[9,10]] +-- +-- Stops when @collect@ stops. +-- +-- See also: "Streamly.Prelude.concatMap", "Streamly.Prelude.foldMany" +-- +-- @since 0.8.0 +-- +{-# INLINE many #-} +many :: Monad m => Fold m a b -> Fold m b c -> Fold m a c +many (Fold sstep sinitial sextract) (Fold cstep cinitial cextract) = + Fold step initial extract + + where + + -- cs = collect state + -- ss = split state + -- cres = collect state result + -- sres = split state result + -- cb = collect done + -- sb = split done + + -- Caution! There is mutual recursion here, inlining the right functions is + -- important. 
+ + {-# INLINE handleSplitStep #-} + handleSplitStep branch cs sres = + case sres of + Partial ss1 -> return $ Partial $ branch ss1 cs + Done sb -> runCollector ManyFirst cs sb + + {-# INLINE handleCollectStep #-} + handleCollectStep branch cres = + case cres of + Partial cs -> do + sres <- sinitial + handleSplitStep branch cs sres + Done cb -> return $ Done cb + + -- Do not inline this + runCollector branch cs sb = cstep cs sb >>= handleCollectStep branch + + initial = cinitial >>= handleCollectStep ManyFirst + + {-# INLINE step_ #-} + step_ ss cs a = do + sres <- sstep ss a + handleSplitStep ManyLoop cs sres + + {-# INLINE step #-} + step (ManyFirst ss cs) a = step_ ss cs a + step (ManyLoop ss cs) a = step_ ss cs a + + extract (ManyFirst _ cs) = cextract cs + extract (ManyLoop ss cs) = do + sb <- sextract ss + cres <- cstep cs sb + case cres of + Partial s -> cextract s + Done b -> return b + +{-# INLINE chunksOf #-} +chunksOf :: Monad m => Int -> Fold m a b -> Fold m b c -> Fold m a c +chunksOf n split = many (take n split) + +{-# INLINABLE drain #-} +drain :: Monad m => Fold m a () +drain = foldl' (\_ _ -> ()) () + +{-# INLINABLE drainBy #-} +drainBy :: Monad m => (a -> m b) -> Fold m a () +drainBy f = lmapM f drain diff --git a/testsuite/tests/profiling/should_compile/T19894/Handle.hs b/testsuite/tests/profiling/should_compile/T19894/Handle.hs new file mode 100644 index 0000000000..d1222312fd --- /dev/null +++ b/testsuite/tests/profiling/should_compile/T19894/Handle.hs @@ -0,0 +1,80 @@ +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE RecordWildCards #-} +module Handle + ( + write + , read + ) + +where + +import Control.Monad.IO.Class (MonadIO(..)) +import Data.Word (Word8) +import Foreign.Storable (Storable(..)) +import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr) +import Foreign.ForeignPtr (withForeignPtr) +import Foreign.Ptr (plusPtr, minusPtr) +import GHC.ForeignPtr (mallocPlainForeignPtrBytes) +import System.IO (Handle, hGetBufSome, hPutBuf) +import Unfold 
(Unfold(..)) +import Fold (Fold(..)) +import Array (Array(..)) +import qualified MArray as MA +import qualified Fold as FL +import qualified Unfold as UF +import qualified StreamD as D +import qualified Array as A +import Prelude hiding (length, read) + +{-# INLINABLE writeArray #-} +writeArray :: Storable a => Handle -> Array a -> IO () +writeArray _ arr | A.length arr == 0 = return () +writeArray h Array{..} = withForeignPtr aStart $ \p -> hPutBuf h p aLen + where + aLen = + let p = unsafeForeignPtrToPtr aStart + in aEnd `minusPtr` p + +{-# INLINE writeChunks #-} +writeChunks :: (MonadIO m, Storable a) => Handle -> Fold m (Array a) () +writeChunks h = FL.drainBy (liftIO . writeArray h) + +{-# INLINE writeWithBufferOf #-} +writeWithBufferOf :: MonadIO m => Int -> Handle -> Fold m Word8 () +writeWithBufferOf n h = FL.chunksOf n (A.writeNUnsafe n) (writeChunks h) + +{-# INLINE write #-} +write :: MonadIO m => Handle -> Fold m Word8 () +write = writeWithBufferOf MA.defaultChunkSize + +{-# INLINABLE readArrayUpto #-} +readArrayUpto :: Int -> Handle -> IO (Array Word8) +readArrayUpto size h = do + ptr <- mallocPlainForeignPtrBytes size + -- ptr <- mallocPlainForeignPtrAlignedBytes size (alignment (undefined :: Word8)) + withForeignPtr ptr $ \p -> do + n <- hGetBufSome h p size + -- XXX shrink only if the diff is significant + return $ + A.unsafeFreezeWithShrink $ + MA.mutableArray ptr (p `plusPtr` n) (p `plusPtr` size) + +{-# INLINE [1] readChunksWithBufferOf #-} +readChunksWithBufferOf :: MonadIO m => Unfold m (Int, Handle) (Array Word8) +readChunksWithBufferOf = Unfold step return + where + {-# INLINE [0] step #-} + step (size, h) = do + arr <- liftIO $ readArrayUpto size h + return $ + case A.length arr of + 0 -> D.Stop + _ -> D.Yield arr (size, h) + +{-# INLINE readWithBufferOf #-} +readWithBufferOf :: MonadIO m => Unfold m (Int, Handle) Word8 +readWithBufferOf = UF.many readChunksWithBufferOf A.read + +{-# INLINE read #-} +read :: MonadIO m => Unfold m Handle 
Word8 +read = UF.supplyFirst MA.defaultChunkSize readWithBufferOf diff --git a/testsuite/tests/profiling/should_compile/T19894/MArray.hs b/testsuite/tests/profiling/should_compile/T19894/MArray.hs new file mode 100644 index 0000000000..f307bb344d --- /dev/null +++ b/testsuite/tests/profiling/should_compile/T19894/MArray.hs @@ -0,0 +1,372 @@ +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE MagicHash #-} +{-# LANGUAGE UnboxedTuples #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE ExistentialQuantification #-} +module MArray + ( + Array (..) + , writeNUnsafe + , length + , fromList + , fromListN + , defaultChunkSize + , read + , shrinkToFit + , mutableArray + , unsafeInlineIO + , memcmp + , foldl' + , unsafeIndexIO + ) + +where + +import Control.Exception (assert) +import Control.Monad (when, void) +import Data.Functor.Identity (runIdentity) +import Data.Word (Word8) +import Foreign.C.Types (CSize(..), CInt(..)) +import Fold (Fold(..)) +import Control.Monad.IO.Class (MonadIO(..)) +import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr) +import Foreign.ForeignPtr (withForeignPtr, touchForeignPtr) +import Foreign.Ptr (plusPtr, minusPtr, castPtr) +import Foreign.Storable (Storable(..)) +import GHC.Base (realWorld#) +import GHC.ForeignPtr (ForeignPtr(..)) +import GHC.IO (IO(IO), unsafePerformIO) +import GHC.Ptr (Ptr(..)) +import Step (Step(..)) +import Unfold (Unfold(..)) +import qualified GHC.ForeignPtr as GHC +import qualified Fold as FL +import qualified StreamD as D +import qualified StreamK as K +import Prelude hiding (length, read) + +data Array a = + Array + { aStart :: {-# UNPACK #-} !(ForeignPtr a) -- ^ first address + , aEnd :: {-# UNPACK #-} !(Ptr a) -- ^ first unused address + , aBound :: {-# UNPACK #-} !(Ptr a) -- ^ first address beyond allocated memory + } + +{-# INLINE mutableArray #-} +mutableArray :: + ForeignPtr a -> Ptr a -> Ptr a -> Array a +mutableArray = Array + +data ArrayUnsafe a = 
ArrayUnsafe + {-# UNPACK #-} !(ForeignPtr a) -- first address + {-# UNPACK #-} !(Ptr a) -- first unused address + +-- | allocate a new array using the provided allocator function. +{-# INLINE newArrayAlignedAllocWith #-} +newArrayAlignedAllocWith :: forall a. Storable a + => (Int -> Int -> IO (ForeignPtr a)) -> Int -> Int -> IO (Array a) +newArrayAlignedAllocWith alloc alignSize count = do + let size = count * sizeOf (undefined :: a) + fptr <- alloc size alignSize + let p = unsafeForeignPtrToPtr fptr + return $ Array + { aStart = fptr + , aEnd = p + , aBound = p `plusPtr` size + } + +{-# INLINE mallocForeignPtrAlignedBytes #-} +mallocForeignPtrAlignedBytes :: Int -> Int -> IO (GHC.ForeignPtr a) +mallocForeignPtrAlignedBytes = + GHC.mallocPlainForeignPtrAlignedBytes + +{-# INLINE newArrayAligned #-} +newArrayAligned :: forall a. Storable a => Int -> Int -> IO (Array a) +newArrayAligned = newArrayAlignedAllocWith mallocForeignPtrAlignedBytes + +{-# INLINE newArray #-} +newArray :: forall a. Storable a => Int -> IO (Array a) +newArray = newArrayAligned (alignment (undefined :: a)) + +-- | Like 'writeN' but does not check the array bounds when writing. The fold +-- driver must not call the step function more than 'n' times otherwise it will +-- corrupt the memory and crash. This function exists mainly because any +-- conditional in the step function blocks fusion causing 10x performance +-- slowdown. +-- +-- @since 0.7.0 +{-# INLINE [1] writeNUnsafe #-} +writeNUnsafe :: forall m a. (MonadIO m, Storable a) + => Int -> Fold m a (Array a) +writeNUnsafe n = Fold step initial extract + + where + + initial = do + (Array start end _) <- liftIO $ newArray (max n 0) + return $ FL.Partial $ ArrayUnsafe start end + + step (ArrayUnsafe start end) x = do + liftIO $ poke end x + return + $ FL.Partial + $ ArrayUnsafe start (end `plusPtr` sizeOf (undefined :: a)) + + extract (ArrayUnsafe start end) = return $ Array start end end -- liftIO . 
shrinkToFit + +{-# INLINE byteLength #-} +byteLength :: Array a -> Int +byteLength Array{..} = + let p = unsafeForeignPtrToPtr aStart + len = aEnd `minusPtr` p + in assert (len >= 0) len + +-- | /O(1)/ Get the length of the array i.e. the number of elements in the +-- array. +-- +-- @since 0.7.0 +{-# INLINE length #-} +length :: forall a. Storable a => Array a -> Int +length arr = byteLength arr `div` sizeOf (undefined :: a) + +{-# INLINE [1] fromStreamDN #-} +fromStreamDN :: forall m a. (MonadIO m, Storable a) + => Int -> D.Stream m a -> m (Array a) +fromStreamDN limit str = do + arr <- liftIO $ newArray limit + end <- D.foldlM' fwrite (return $ aEnd arr) $ D.take limit str + return $ arr {aEnd = end} + + where + + fwrite ptr x = do + liftIO $ poke ptr x + return $ ptr `plusPtr` sizeOf (undefined :: a) + +{-# INLINABLE fromListN #-} +fromListN :: Storable a => Int -> [a] -> Array a +fromListN n xs = unsafePerformIO $ fromStreamDN n $ D.fromList xs + +data GroupState s start end bound + = GroupStart s + | GroupBuffer s start end bound + | GroupYield start end bound (GroupState s start end bound) + | GroupFinish + +-- | @arraysOf n stream@ groups the input stream into a stream of +-- arrays of size n. +-- +-- @arraysOf n = StreamD.foldMany (Array.writeN n)@ +-- +-- /Pre-release/ +{-# INLINE [1] arraysOf #-} +arraysOf :: forall m a. (MonadIO m, Storable a) + => Int -> D.Stream m a -> D.Stream m (Array a) +-- XXX the idiomatic implementation leads to large regression in the D.reverse' +-- benchmark. It seems it has difficulty producing optimized code when +-- converting to StreamK. Investigate GHC optimizations. 
+-- arraysOf n = D.foldMany (writeN n) +arraysOf n (D.Stream step state) = + D.Stream step' (GroupStart state) + + where + + {-# INLINE [0] step' #-} + step' _ (GroupStart st) = do + when (n <= 0) $ + -- XXX we can pass the module string from the higher level API + error $ "Streamly.Internal.Data.Array.Foreign.Mut.Type.fromStreamDArraysOf: the size of " + ++ "arrays [" ++ show n ++ "] must be a natural number" + Array start end bound <- liftIO $ newArray n + return $ D.Skip (GroupBuffer st start end bound) + + step' gst (GroupBuffer st start end bound) = do + r <- step (K.adaptState gst) st + case r of + D.Yield x s -> do + liftIO $ poke end x + let end' = end `plusPtr` sizeOf (undefined :: a) + return $ + if end' >= bound + then D.Skip (GroupYield start end' bound (GroupStart s)) + else D.Skip (GroupBuffer s start end' bound) + D.Skip s -> return $ D.Skip (GroupBuffer s start end bound) + D.Stop -> return $ D.Skip (GroupYield start end bound GroupFinish) + + step' _ (GroupYield start end bound next) = + return $ D.Yield (Array start end bound) next + + step' _ GroupFinish = return D.Stop + +allocOverhead :: Int +allocOverhead = 2 * sizeOf (undefined :: Int) + +mkChunkSize :: Int -> Int +mkChunkSize n = let size = n - allocOverhead in max size 0 + +mkChunkSizeKB :: Int -> Int +mkChunkSizeKB n = mkChunkSize (n * k) + where k = 1024 + +defaultChunkSize :: Int +defaultChunkSize = mkChunkSizeKB 32 + +{-# INLINE bufferChunks #-} +bufferChunks :: (MonadIO m, Storable a) => + D.Stream m a -> m (K.Stream m (Array a)) +bufferChunks m = D.foldr K.cons K.nil $ arraysOf defaultChunkSize m + +data Producer m a b = + -- | @Producer step inject extract@ + forall s. 
Producer (s -> m (Step s b)) (a -> m s) (s -> m a) + +{-# INLINE unsafeInlineIO #-} +unsafeInlineIO :: IO a -> a +unsafeInlineIO (IO m) = case m realWorld# of (# _, r #) -> r + +data ReadUState a = ReadUState + {-# UNPACK #-} !(ForeignPtr a) -- foreign ptr with end of array pointer + {-# UNPACK #-} !(Ptr a) -- current pointer + +-- | Resumable unfold of an array. +-- +{-# INLINE [1] producer #-} +producer :: forall m a. (Monad m, Storable a) => Producer m (Array a) a +producer = Producer step inject extract + where + + inject (Array (ForeignPtr start contents) (Ptr end) _) = + return $ ReadUState (ForeignPtr end contents) (Ptr start) + + {-# INLINE [0] step #-} + step (ReadUState fp@(ForeignPtr end _) p) | p == Ptr end = + let x = unsafeInlineIO $ touchForeignPtr fp + in x `seq` return D.Stop + step (ReadUState fp p) = do + -- unsafeInlineIO allows us to run this in Identity monad for pure + -- toList/foldr case which makes them much faster due to not + -- accumulating the list and fusing better with the pure consumers. + -- + -- This should be safe as the array contents are guaranteed to be + -- evaluated/written to before we peek at them. + let !x = unsafeInlineIO $ peek p + return $ D.Yield x + (ReadUState fp (p `plusPtr` sizeOf (undefined :: a))) + + extract (ReadUState (ForeignPtr end contents) (Ptr p)) = + return $ Array (ForeignPtr p contents) (Ptr end) (Ptr end) + +{-# INLINE simplify #-} +simplify :: Producer m a b -> Unfold m a b +simplify (Producer step inject _) = Unfold step inject + +-- | Unfold an array into a stream. +-- +-- @since 0.7.0 +{-# INLINE [1] read #-} +read :: forall m a. (Monad m, Storable a) => Unfold m (Array a) a +read = simplify producer + +{-# INLINE fromStreamD #-} +fromStreamD :: (MonadIO m, Storable a) => D.Stream m a -> m (Array a) +fromStreamD m = do + buffered <- bufferChunks m + len <- K.foldl' (+) 0 (K.map length buffered) + fromStreamDN len $ D.unfoldMany read $ D.fromStreamK buffered + +-- | Create an 'Array' from a list. 
The list must be of finite size. +-- +-- @since 0.7.0 +{-# INLINABLE fromList #-} +fromList :: Storable a => [a] -> Array a +fromList xs = unsafePerformIO $ fromStreamD $ D.fromList xs + +foreign import ccall unsafe "string.h memcpy" c_memcpy + :: Ptr Word8 -> Ptr Word8 -> CSize -> IO (Ptr Word8) + +-- XXX we are converting Int to CSize +memcpy :: Ptr Word8 -> Ptr Word8 -> Int -> IO () +memcpy dst src len = void (c_memcpy dst src (fromIntegral len)) + +foreign import ccall unsafe "string.h memcmp" c_memcmp + :: Ptr Word8 -> Ptr Word8 -> CSize -> IO CInt + +-- XXX we are converting Int to CSize +-- return True if the memory locations have identical contents +{-# INLINE memcmp #-} +memcmp :: Ptr Word8 -> Ptr Word8 -> Int -> IO Bool +memcmp p1 p2 len = do + r <- c_memcmp p1 p2 (fromIntegral len) + return $ r == 0 + +{-# NOINLINE reallocAligned #-} +reallocAligned :: Int -> Int -> Array a -> IO (Array a) +reallocAligned alignSize newSize Array{..} = do + assert (aEnd <= aBound) (return ()) + let oldStart = unsafeForeignPtrToPtr aStart + let size = aEnd `minusPtr` oldStart + newPtr <- mallocForeignPtrAlignedBytes newSize alignSize + withForeignPtr newPtr $ \pNew -> do + memcpy (castPtr pNew) (castPtr oldStart) size + touchForeignPtr aStart + return $ Array + { aStart = newPtr + , aEnd = pNew `plusPtr` size + , aBound = pNew `plusPtr` newSize + } + +-- XXX can unaligned allocation be more efficient when alignment is not needed? +{-# INLINABLE realloc #-} +realloc :: forall a. Storable a => Int -> Array a -> IO (Array a) +realloc = reallocAligned (alignment (undefined :: a)) + +shrinkToFit :: forall a. 
Storable a => Array a -> IO (Array a) +shrinkToFit arr@Array{..} = do + assert (aEnd <= aBound) (return ()) + let start = unsafeForeignPtrToPtr aStart + let used = aEnd `minusPtr` start + waste = aBound `minusPtr` aEnd + -- if used == waste == 0 then do not realloc + -- if the wastage is more than 25% of the array then realloc + if used < 3 * waste + then realloc used arr + else return arr + +{-# INLINE [1] toStreamD #-} +toStreamD :: forall m a. (Monad m, Storable a) => Array a -> D.Stream m a +toStreamD Array{..} = + let p = unsafeForeignPtrToPtr aStart + in D.Stream step p + + where + + {-# INLINE [0] step #-} + step _ p | p == aEnd = return D.Stop + step _ p = do + -- unsafeInlineIO allows us to run this in Identity monad for pure + -- toList/foldr case which makes them much faster due to not + -- accumulating the list and fusing better with the pure consumers. + -- + -- This should be safe as the array contents are guaranteed to be + -- evaluated/written to before we peek at them. + let !x = unsafeInlineIO $ do + r <- peek p + touchForeignPtr aStart + return r + return $ D.Yield x (p `plusPtr` sizeOf (undefined :: a)) + +{-# INLINE [1] foldl' #-} +foldl' :: forall a b. Storable a => (b -> a -> b) -> b -> Array a -> b +foldl' f z arr = runIdentity $ D.foldl' f z $ toStreamD arr + +{-# INLINE [1] unsafeIndexIO #-} +unsafeIndexIO :: forall a. 
Storable a => Array a -> Int -> IO a +unsafeIndexIO Array {..} i = + withForeignPtr aStart $ \p -> do + let elemSize = sizeOf (undefined :: a) + elemOff = p `plusPtr` (elemSize * i) + assert (i >= 0 && elemOff `plusPtr` elemSize <= aEnd) + (return ()) + peek elemOff diff --git a/testsuite/tests/profiling/should_compile/T19894/Main.hs b/testsuite/tests/profiling/should_compile/T19894/Main.hs new file mode 100644 index 0000000000..0195d14407 --- /dev/null +++ b/testsuite/tests/profiling/should_compile/T19894/Main.hs @@ -0,0 +1,26 @@ +module Main (main) where + +import Data.Char (ord) +import Data.Word (Word8) +import System.IO (openFile, IOMode(..), Handle) +import StreamK (IsStream, MonadAsync) +import qualified Operations +import qualified StreamK +import qualified Fold +import qualified Handle +import qualified Array +import Array (Array) + +toarr :: String -> Array Word8 +toarr = Array.fromList . map (fromIntegral . ord) + +-- | Split on a word8 sequence. +splitOnSeq :: String -> Handle -> IO () +splitOnSeq str inh = + Operations.drain $ Operations.splitOnSeq (toarr str) Fold.drain + $ Operations.unfold Handle.read inh + +main :: IO () +main = do + inh <- openFile "input.txt" ReadMode + splitOnSeq "aa" inh diff --git a/testsuite/tests/profiling/should_compile/T19894/Operations.hs b/testsuite/tests/profiling/should_compile/T19894/Operations.hs new file mode 100644 index 0000000000..b17298558a --- /dev/null +++ b/testsuite/tests/profiling/should_compile/T19894/Operations.hs @@ -0,0 +1,221 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Operations (unfoldrM, drain, postscan, after_, replicate, fold, unfold, + splitOnSeq) where + +import Control.Monad.IO.Class (MonadIO(..)) +import Data.Bits (shiftR, shiftL, (.|.), (.&.)) +import Data.Word (Word, Word32) +import Fold (Fold(..)) +import Foreign.Storable (Storable(..)) +import GHC.Types (SPEC(..)) +import Array (Array) +import StreamK 
(IsStream, MonadAsync, adaptState) +import Step (Step(..)) +import Unfold (Unfold) +#if defined(FUSION_PLUGIN) +import Fusion.Plugin.Types (Fuse(..)) +#endif +import qualified StreamK as K +import qualified StreamD as D +import qualified Serial +import qualified Fold as FL +import qualified Array as A +import qualified Ring as RB +import Prelude hiding (replicate) + +{-# INLINE [2] unfoldrM #-} +unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a +unfoldrM = K.unfoldrM + +{-# RULES "unfoldrM serial" unfoldrM = unfoldrMSerial #-} +{-# INLINE [2] unfoldrMSerial #-} +unfoldrMSerial :: MonadAsync m => (b -> m (Maybe (a, b))) -> b -> K.Stream m a +unfoldrMSerial = Serial.unfoldrM + +{-# INLINE [2] drain #-} +drain :: (Monad m) => K.Stream m a -> m () +drain m = D.drain $ D.fromStreamK (K.toStream m) +{-# RULES "drain fallback to CPS" [1] + forall a. D.drain (D.fromStreamK a) = K.drain a #-} + +{-# INLINE [1] postscan #-} +postscan :: (IsStream t, Monad m) + => Fold m a b -> t m a -> t m b +postscan fld m = + D.fromStreamD $ D.postscanOnce fld $ D.toStreamD m + +{-# INLINE [1] replicate #-} +replicate :: (IsStream t, Monad m) => Int -> a -> t m a +replicate n = D.fromStreamD . D.replicate n + +{-# INLINE fold_ #-} +fold_ :: Monad m => Fold m a b -> K.Stream m a -> m (b, K.Stream m a) +fold_ fl strm = do + (b, str) <- D.fold_ fl $ D.toStreamD strm + return $! (b, D.fromStreamD str) + +{-# INLINE fold #-} +fold :: Monad m => Fold m a b -> K.Stream m a -> m b +fold fl strm = do + (b, _) <- fold_ fl strm + return $! 
b + +{-# INLINE after_ #-} +after_ :: (IsStream t, Monad m) => m b -> t m a -> t m a +after_ action xs = D.fromStreamD $ D.after_ action $ D.toStreamD xs + +{-# INLINE unfold #-} +unfold :: (IsStream t, Monad m) => Unfold m a b -> a -> t m b +unfold unf x = D.fromStreamD $ D.unfold unf x + +#if defined(FUSION_PLUGIN) +{-# ANN type SplitOnSeqState Fuse #-} +#endif +data SplitOnSeqState rb rh ck w fs s b x = + SplitOnSeqInit + | SplitOnSeqYield b (SplitOnSeqState rb rh ck w fs s b x) + | SplitOnSeqDone + + | SplitOnSeqWordInit !fs s + | SplitOnSeqWordLoop !w s !fs + | SplitOnSeqWordDone Int !fs !w + + | SplitOnSeqReinit (fs -> SplitOnSeqState rb rh ck w fs s b x) + +{-# INLINE [1] splitOnSeqD #-} +splitOnSeqD + :: forall m a b. (MonadIO m, Storable a, Enum a, Eq a) + => Array a + -> Fold m a b + -> D.Stream m a + -> D.Stream m b +splitOnSeqD patArr (Fold fstep initial done) (D.Stream step state) = + D.Stream stepOuter SplitOnSeqInit + + where + + patLen = A.length patArr + maxIndex = patLen - 1 + elemBits = sizeOf (undefined :: a) * 8 + + -- For word pattern case + wordMask :: Word + wordMask = (1 `shiftL` (elemBits * patLen)) - 1 + + elemMask :: Word + elemMask = (1 `shiftL` elemBits) - 1 + + wordPat :: Word + wordPat = wordMask .&. A.foldl' addToWord 0 patArr + + addToWord wd a = (wd `shiftL` elemBits) .|. fromIntegral (fromEnum a) + + skip = return . Skip + + nextAfterInit nextGen stepRes = + case stepRes of + FL.Partial s -> nextGen s + FL.Done b -> SplitOnSeqYield b (SplitOnSeqReinit nextGen) + + {-# INLINE yieldProceed #-} + yieldProceed nextGen fs = + initial >>= skip . SplitOnSeqYield fs . 
nextAfterInit nextGen + + {-# INLINE [0] stepOuter #-} + stepOuter _ SplitOnSeqInit = do + res <- initial + case res of + FL.Partial acc -> return $ Skip $ SplitOnSeqWordInit acc state + FL.Done b -> skip $ SplitOnSeqYield b SplitOnSeqInit + + stepOuter _ (SplitOnSeqYield x next) = return $ Yield x next + + --------------------------- + -- Checkpoint + --------------------------- + + stepOuter _ (SplitOnSeqReinit nextGen) = + initial >>= skip . nextAfterInit nextGen + + ----------------- + -- Done + ----------------- + + stepOuter _ SplitOnSeqDone = return Stop + + --------------------------- + -- Short Pattern - Shift Or + --------------------------- + + stepOuter _ (SplitOnSeqWordDone 0 fs _) = do + r <- done fs + skip $ SplitOnSeqYield r SplitOnSeqDone + stepOuter _ (SplitOnSeqWordDone n fs wrd) = do + let old = elemMask .&. (wrd `shiftR` (elemBits * (n - 1))) + r <- fstep fs (toEnum $ fromIntegral old) + case r of + FL.Partial fs1 -> skip $ SplitOnSeqWordDone (n - 1) fs1 wrd + FL.Done b -> do + let jump c = SplitOnSeqWordDone (n - 1) c wrd + yieldProceed jump b + + stepOuter gst (SplitOnSeqWordInit fs st0) = + go SPEC 0 0 st0 + + where + + {-# INLINE go #-} + go !_ !idx !wrd !st = do + res <- step (adaptState gst) st + case res of + Yield x s -> do + let wrd1 = addToWord wrd x + if idx == maxIndex + then do + if wrd1 .&. wordMask == wordPat + then do + let jump c = SplitOnSeqWordInit c s + done fs >>= yieldProceed jump + else skip $ SplitOnSeqWordLoop wrd1 s fs + else go SPEC (idx + 1) wrd1 s + Skip s -> go SPEC idx wrd s + Stop -> do + if idx /= 0 + then skip $ SplitOnSeqWordDone idx fs wrd + else do + r <- done fs + skip $ SplitOnSeqYield r SplitOnSeqDone + + stepOuter gst (SplitOnSeqWordLoop wrd0 st0 fs0) = + go SPEC wrd0 st0 fs0 + + where + + {-# INLINE go #-} + go !_ !wrd !st !fs = do + res <- step (adaptState gst) st + case res of + Yield x s -> do + let jump c = SplitOnSeqWordInit c s + wrd1 = addToWord wrd x + old = (wordMask .&. 
wrd) + `shiftR` (elemBits * (patLen - 1)) + r <- fstep fs (toEnum $ fromIntegral old) + case r of + FL.Partial fs1 -> do + if wrd1 .&. wordMask == wordPat + then done fs1 >>= yieldProceed jump + else go SPEC wrd1 s fs1 + FL.Done b -> yieldProceed jump b + Skip s -> go SPEC wrd s fs + Stop -> skip $ SplitOnSeqWordDone patLen fs wrd + +{-# INLINE splitOnSeq #-} +splitOnSeq + :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) + => Array a -> Fold m a b -> t m a -> t m b +splitOnSeq patt f m = D.fromStreamD $ splitOnSeqD patt f (D.toStreamD m) diff --git a/testsuite/tests/profiling/should_compile/T19894/Ring.hs b/testsuite/tests/profiling/should_compile/T19894/Ring.hs new file mode 100644 index 0000000000..715103055f --- /dev/null +++ b/testsuite/tests/profiling/should_compile/T19894/Ring.hs @@ -0,0 +1,254 @@ +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE ScopedTypeVariables #-} +-- | +-- Module : Streamly.Internal.Ring.Foreign +-- Copyright : (c) 2019 Composewell Technologies +-- License : BSD3 +-- Maintainer : streamly@composewell.com +-- Stability : experimental +-- Portability : GHC +-- + +module Ring + ( Ring(..) + + -- * Construction + , new + , advance + , moveBy + , startOf + + -- * Modification + , unsafeInsert + + -- * Folds + , unsafeFoldRing + , unsafeFoldRingM + , unsafeFoldRingFullM + , unsafeFoldRingNM + + -- * Fast Byte Comparisons + , unsafeEqArray + , unsafeEqArrayN + ) where + +import Control.Exception (assert) +import Foreign.ForeignPtr (ForeignPtr, withForeignPtr, touchForeignPtr) +import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr) +import Foreign.Ptr (plusPtr, minusPtr, castPtr) +import Foreign.Storable (Storable(..)) +import GHC.ForeignPtr (mallocPlainForeignPtrAlignedBytes) +import GHC.Ptr (Ptr(..)) +import Prelude hiding (length, concat) + +import Control.Monad.IO.Class (MonadIO(..)) + +import qualified Array as A + +-- | A ring buffer is a mutable array of fixed size. 
-- Initially the array is
-- empty, with ringStart pointing at the start of allocated memory. We call the
-- next location to be written in the ring as ringHead. Initially ringHead ==
-- ringStart. When the first item is added, ringHead points to ringStart +
-- sizeof item. When the buffer becomes full ringHead would wrap around to
-- ringStart. When the buffer is full, ringHead always points at the oldest
-- item in the ring and the newest item added always overwrites the oldest
-- item.
--
-- When using it we should keep in mind that a ringBuffer is a mutable data
-- structure. We should not leak out references to it for immutable use.
--
data Ring a = Ring
    { ringStart :: {-# UNPACK #-} !(ForeignPtr a) -- first address
    , ringBound :: {-# UNPACK #-} !(Ptr a)        -- first address beyond allocated memory
    }

-- | Get the first address of the ring as a raw pointer. The pointer is
-- obtained with 'unsafeForeignPtrToPtr', so it does not keep the underlying
-- 'ForeignPtr' alive by itself.
startOf :: Ring a -> Ptr a
startOf = unsafeForeignPtrToPtr . ringStart

-- | Allocate a new ring buffer with room for @count@ items and return it
-- together with the initial ringHead, which is the same address as ringStart.
{-# INLINE new #-}
new :: forall a. Storable a => Int -> IO (Ring a, Ptr a)
new count = do
    let size = count * sizeOf (undefined :: a)
    fptr <- mallocPlainForeignPtrAlignedBytes size (alignment (undefined :: a))
    let p = unsafeForeignPtrToPtr fptr
    return (Ring
        { ringStart = fptr
        , ringBound = p `plusPtr` size
        }, p)

-- | Advance the ringHead by 1 item, wrap around if we hit the end of the
-- array.
{-# INLINE advance #-}
advance :: forall a. Storable a => Ring a -> Ptr a -> Ptr a
advance Ring{..} ringHead =
    let ptr = ringHead `plusPtr` sizeOf (undefined :: a)
    in if ptr < ringBound
       then ptr
       else unsafeForeignPtrToPtr ringStart

-- | Move the ringHead by n items. The direction depends on the sign of n:
-- positive moves forward, negative moves backward. Wrap around if we hit the
-- beginning or end of the array.
--
-- The resulting element offset is always reduced modulo the ring length, so
-- the returned pointer lies in the range @[ringStart, ringBound)@ whenever
-- the supplied ringHead does. A ring of zero length makes the modulus zero,
-- which is a divide-by-zero error.
{-# INLINE moveBy #-}
moveBy :: forall a. Storable a => Int -> Ring a -> Ptr a -> Ptr a
moveBy by Ring {..} ringHead = ringStartPtr `plusPtr` advanceFromHead

    where

    elemSize = sizeOf (undefined :: a)
    ringStartPtr = unsafeForeignPtrToPtr ringStart
    lenInBytes = ringBound `minusPtr` ringStartPtr
    offInBytes = ringHead `minusPtr` ringStartPtr
    len = assert (lenInBytes `mod` elemSize == 0) $ lenInBytes `div` elemSize
    off = assert (offInBytes `mod` elemSize == 0) $ offInBytes `div` elemSize
    -- `mod` binds tighter than (+), so the sum must be parenthesised for the
    -- wrap-around to apply to the new position rather than only to 'by'.
    advanceFromHead = ((off + by) `mod` len) * elemSize

-- | Insert an item at the head of the ring, when the ring is full this
-- replaces the oldest item in the ring with the new item. This is unsafe
-- because ringHead supplied is not verified to be within the Ring. Also,
-- the ringStart foreignPtr must be guaranteed to be alive by the caller.
--
-- Returns the ringHead advanced by one item.
{-# INLINE unsafeInsert #-}
unsafeInsert :: Storable a => Ring a -> Ptr a -> a -> IO (Ptr a)
unsafeInsert rb ringHead newVal = do
    poke ringHead newVal
    -- touchForeignPtr (ringStart rb)
    return $ advance rb ringHead

-- XXX remove all usage of A.unsafeInlineIO
--
-- | Like 'unsafeEqArray' but compares only N bytes instead of entire length of
-- the ring buffer. This is unsafe because the ringHead Ptr is not checked to
-- be in range.
{-# INLINE unsafeEqArrayN #-}
unsafeEqArrayN :: Ring a -> Ptr a -> A.Array a -> Int -> Bool
unsafeEqArrayN Ring{..} rh A.Array{..} n =
    let !same = A.unsafeInlineIO compareN
    in same

    where

    ringBase = unsafeForeignPtrToPtr ringStart
    arrBase = unsafeForeignPtrToPtr aStart
    -- Number of bytes between the ring head and the end of the ring memory.
    tailLen = ringBound `minusPtr` rh

    compareN = do
        assert (aEnd `minusPtr` arrBase >= ringBound `minusPtr` ringBase)
            (return ())
        -- First segment: from the ring head towards the end of the ring,
        -- capped at n bytes.
        headEq <- A.memcmp (castPtr rh) (castPtr arrBase) (min tailLen n)
        -- Second segment: only needed when n reaches past the end of the
        -- ring memory, in which case comparison resumes at the ring start.
        wrapEq <-
            if n <= tailLen
            then return True
            else A.memcmp
                    (castPtr ringBase)
                    (castPtr (arrBase `plusPtr` tailLen))
                    (min (rh `minusPtr` ringBase) (n - tailLen))
        -- XXX enable these, check perf impact
        -- touchForeignPtr ringStart
        -- touchForeignPtr aStart
        return (headEq && wrapEq)

-- | Byte compare the entire length of ringBuffer with the given array,
-- starting at the supplied ringHead pointer.  Returns true if the Array and
-- the ringBuffer have identical contents.
--
-- This is unsafe because the ringHead Ptr is not checked to be in range. The
-- supplied array must be equal to or bigger than the ringBuffer, ARRAY BOUNDS
-- ARE NOT CHECKED.
{-# INLINE unsafeEqArray #-}
unsafeEqArray :: Ring a -> Ptr a -> A.Array a -> Bool
unsafeEqArray Ring{..} rh A.Array{..} =
    let !same = A.unsafeInlineIO compareAll
    in same

    where

    ringBase = unsafeForeignPtrToPtr ringStart
    arrBase = unsafeForeignPtrToPtr aStart
    -- Number of bytes between the ring head and the end of the ring memory.
    tailLen = ringBound `minusPtr` rh

    compareAll = do
        assert (aEnd `minusPtr` arrBase >= ringBound `minusPtr` ringBase)
            (return ())
        -- Ring head up to the end of the ring against the array prefix.
        headEq <- A.memcmp (castPtr rh) (castPtr arrBase) tailLen
        -- Ring start up to the ring head against the rest of the array.
        wrapEq <-
            A.memcmp
                (castPtr ringBase)
                (castPtr (arrBase `plusPtr` tailLen))
                (rh `minusPtr` ringBase)
        -- XXX enable these, check perf impact
        -- touchForeignPtr ringStart
        -- touchForeignPtr aStart
        return (headEq && wrapEq)

-- XXX use MonadIO
--
-- | Fold the buffer starting from ringStart up to the given 'Ptr' using a pure
-- step function. This is useful to fold the items in the ring when the ring is
-- not full. The supplied pointer is usually the end of the ring.
--
-- Unsafe because the supplied Ptr is not checked to be in range.
{-# INLINE unsafeFoldRing #-}
unsafeFoldRing :: forall a b. Storable a
    => Ptr a -> (b -> a -> b) -> b -> Ring a -> b
unsafeFoldRing ptr f z Ring{..} =
    let !folded = A.unsafeInlineIO $ withForeignPtr ringStart (loop z)
    in folded

    where

    -- Visit one element at a time until the cursor reaches the limit
    -- pointer supplied by the caller; the accumulator is kept evaluated.
    loop !acc !cur
        | cur == ptr = return acc
        | otherwise = do
            x <- peek cur
            loop (f acc x) (cur `plusPtr` sizeOf (undefined :: a))

-- XXX Can we remove MonadIO here?
--
-- Run an action on the raw pointer of a 'ForeignPtr' and touch the
-- 'ForeignPtr' once the action has completed, then return the action's
-- result.
withForeignPtrM :: MonadIO m => ForeignPtr a -> (Ptr a -> m b) -> m b
withForeignPtrM fp fn =
    fn (unsafeForeignPtrToPtr fp) >>= \r ->
        liftIO (touchForeignPtr fp) >> return r

-- | Like unsafeFoldRing but with a monadic step function.
{-# INLINE unsafeFoldRingM #-}
unsafeFoldRingM :: forall m a b. (MonadIO m, Storable a)
    => Ptr a -> (b -> a -> m b) -> b -> Ring a -> m b
unsafeFoldRingM ptr f z Ring {..} = withForeignPtrM ringStart (loop z)

    where

    loop !acc !cur
        | cur == ptr = return acc
        | otherwise = do
            let !x = A.unsafeInlineIO (peek cur)
            f acc x >>= \acc' ->
                loop acc' (cur `plusPtr` sizeOf (undefined :: a))

-- | Fold the entire length of a ring buffer starting at the supplied ringHead
-- pointer.  Assuming the supplied ringHead pointer points to the oldest item,
-- this would fold the ring starting from the oldest item to the newest item in
-- the ring.
--
-- Note, this will crash on ring of 0 size.
--
{-# INLINE unsafeFoldRingFullM #-}
unsafeFoldRingFullM :: forall m a b. (MonadIO m, Storable a)
    => Ptr a -> (b -> a -> m b) -> b -> Ring a -> m b
unsafeFoldRingFullM rh f z rb@Ring {..} =
    withForeignPtrM ringStart (\_ -> loop z rh)

    where

    -- The element under the cursor is always consumed before the stop
    -- condition is checked, so the loop ends when advancing comes back
    -- around to the starting ringHead.
    loop !acc !cur = do
        let !x = A.unsafeInlineIO (peek cur)
        acc' <- f acc x
        case advance rb cur of
            next
                | next == rh -> return acc'
                | otherwise -> loop acc' next

-- | Fold @Int@ items in the ring starting at @Ptr a@.  Won't fold more
-- than the length of the ring.
--
-- Note, this will crash on ring of 0 size.
+-- +{-# INLINE unsafeFoldRingNM #-} +unsafeFoldRingNM :: forall m a b. (MonadIO m, Storable a) + => Int -> Ptr a -> (b -> a -> m b) -> b -> Ring a -> m b +unsafeFoldRingNM count rh f z rb@Ring {..} = + withForeignPtrM ringStart $ \_ -> go count z rh + + where + + go 0 acc _ = return acc + go !n !acc !start = do + let !x = A.unsafeInlineIO $ peek start + acc' <- f acc x + let ptr = advance rb start + if ptr == rh || n == 0 + then return acc' + else go (n - 1) acc' ptr diff --git a/testsuite/tests/profiling/should_compile/T19894/Serial.hs b/testsuite/tests/profiling/should_compile/T19894/Serial.hs new file mode 100644 index 0000000000..d8c9cabcfe --- /dev/null +++ b/testsuite/tests/profiling/should_compile/T19894/Serial.hs @@ -0,0 +1,8 @@ +module Serial (unfoldrM) where + +import StreamK (IsStream) +import qualified StreamD as D + +{-# INLINE unfoldrM #-} +unfoldrM :: (IsStream t, Monad m) => (b -> m (Maybe (a, b))) -> b -> t m a +unfoldrM step seed = D.fromStreamD (D.unfoldrM step seed) diff --git a/testsuite/tests/profiling/should_compile/T19894/Step.hs b/testsuite/tests/profiling/should_compile/T19894/Step.hs new file mode 100644 index 0000000000..72b134c84b --- /dev/null +++ b/testsuite/tests/profiling/should_compile/T19894/Step.hs @@ -0,0 +1,44 @@ +{-# LANGUAGE CPP #-} +-- | +-- Module : Streamly.Internal.Data.Stream.StreamD.Step +-- Copyright : (c) 2018 Composewell Technologies +-- License : BSD-3-Clause +-- Maintainer : streamly@composewell.com +-- Stability : experimental +-- Portability : GHC + +module Step + ( + -- * The stream type + Step (..) + ) +where + +#if defined(FUSION_PLUGIN) +import Fusion.Plugin.Types (Fuse(..)) +#endif + +-- | A stream is a succession of 'Step's. A 'Yield' produces a single value and +-- the next state of the stream. 'Stop' indicates there are no more values in +-- the stream. 
+#if defined(FUSION_PLUGIN) +{-# ANN type Step Fuse #-} +#endif +data Step s a = Yield a s | Skip s | Stop + +instance Functor (Step s) where + {-# INLINE fmap #-} + fmap f (Yield x s) = Yield (f x) s + fmap _ (Skip s) = Skip s + fmap _ Stop = Stop + +{- +yield :: Monad m => a -> s -> m (Step s a) +yield a = return . Yield a + +skip :: Monad m => s -> m (Step s a) +skip = return . Skip + +stop :: Monad m => m (Step s a) +stop = return Stop +-} diff --git a/testsuite/tests/profiling/should_compile/T19894/StreamD.hs b/testsuite/tests/profiling/should_compile/T19894/StreamD.hs new file mode 100644 index 0000000000..265780188a --- /dev/null +++ b/testsuite/tests/profiling/should_compile/T19894/StreamD.hs @@ -0,0 +1,1079 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE PatternSynonyms #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE ViewPatterns #-} +{-# LANGUAGE ExistentialQuantification #-} +{-# LANGUAGE ScopedTypeVariables #-} + +#include "inline.hs" + +-- | +-- Module : Streamly.Internal.Data.Stream.StreamD.Type +-- Copyright : (c) 2018 Composewell Technologies +-- (c) Roman Leshchinskiy 2008-2010 +-- License : BSD-3-Clause +-- Maintainer : streamly@composewell.com +-- Stability : experimental +-- Portability : GHC + +-- The stream type is inspired by the vector package. A few functions in this +-- module have been originally adapted from the vector package (c) Roman +-- Leshchinskiy. See the notes in specific functions. + +module StreamD + ( + -- * The stream type + Step (..) + -- XXX UnStream is exported to avoid a performance issue in concatMap if we + -- use the pattern synonym "Stream". 
+ , Stream (Stream, UnStream) + + -- * Primitives + , nilM + , consM + , uncons + + -- * From Unfold + , unfold + , unfoldrM + , drain + + -- * From Values + , yield + , yieldM + , replicate + + -- * From Containers + , fromList + + -- * Conversions From/To + , fromStreamK + , toStreamK + , toStreamD + , fromStreamD + + -- * Running a 'Fold' + , fold + , fold_ + + -- * Right Folds + -- , foldrT + , foldrM + , foldrMx + , foldr + -- , foldrS + + -- * Left Folds + , foldl' + , foldlM' + , foldlx' + , foldlMx' + + -- * To Containers + , toList + + -- * Multi-stream folds + , eqBy + , cmpBy + + -- * Transformations + , map + , mapM + , take + , takeWhile + , takeWhileM + , postscanOnce + , after_ + + -- * Nesting + -- , ConcatMapUState (..) + , unfoldMany + {- + , concatMap + , concatMapM + , FoldMany (..) -- for inspection testing + , FoldManyPost (..) + , foldMany + , foldManyPost + , groupsOf2 + , chunksOf + -} + ) +where + +import Control.Applicative (liftA2) +-- import Control.Monad (when) +-- import Control.Monad.Trans.Class (lift, MonadTrans) +import Data.Functor.Identity (Identity(..)) +-- import Fusion.Plugin.Types (Fuse(..)) +import GHC.Base (build) +import GHC.Types (SPEC(..)) +import Prelude hiding (map, mapM, foldr, take, concatMap, takeWhile, replicate) + +import Unfold (Unfold(..)) +import Fold (Fold(..)) +import Step (Step (..)) +import StreamK (State, adaptState, defState) +-- import Streamly.Internal.Data.SVar (State, adaptState, defState) +-- import Streamly.Internal.Data.Unfold.Type (Unfold(..)) + +-- ort qualified Streamly.Internal.Data.Fold.Type as FL +import qualified StreamK as K +import qualified Fold as FL + +------------------------------------------------------------------------------ +-- The direct style stream type +------------------------------------------------------------------------------ + +-- gst = global state +-- | A stream consists of a step function that generates the next step given a +-- current state, and the current state. 
data Stream m a =
    forall s. UnStream (State K.Stream m a -> s -> m (Step s a)) s

-- Rebuild a stream so that its step function receives the global state
-- passed through 'adaptState' instead of the caller's state as is.
unShare :: Stream m a -> Stream m a
unShare (UnStream step state) =
    UnStream (\gst -> step (adaptState gst)) state

-- Matching on 'Stream' goes through 'unShare'; constructing with 'Stream' is
-- the same as constructing with 'UnStream'.
pattern Stream :: (State K.Stream m a -> s -> m (Step s a)) -> s -> Stream m a
pattern Stream step state <- (unShare -> UnStream step state)
    where Stream = UnStream

#if __GLASGOW_HASKELL__ >= 802
{-# COMPLETE Stream #-}
#endif

------------------------------------------------------------------------------
-- Primitives
------------------------------------------------------------------------------

-- | An empty 'Stream' with a side effect: the supplied action is run, its
-- result is discarded, and the stream stops.
{-# INLINE_NORMAL nilM #-}
nilM :: Monad m => m b -> Stream m a
nilM m = Stream stop ()
    where
    stop _ _ = m >> return Stop

-- | Prepend the result of a monadic action to a stream. The state is
-- 'Nothing' until the action has been run and @Just s@ while the original
-- stream is being consumed.
{-# INLINE_NORMAL consM #-}
consM :: Monad m => m a -> Stream m a -> Stream m a
consM m (Stream step state) = Stream step1 Nothing
    where
    -- Lift a step of the original stream into the 'Maybe'-wrapped state.
    wrap (Yield a s) = Yield a (Just s)
    wrap (Skip s) = Skip (Just s)
    wrap Stop = Stop

    {-# INLINE_LATE step1 #-}
    step1 _ Nothing = m >>= \x -> return $ Yield x (Just state)
    step1 gst (Just st) = step gst st >>= \r -> return (wrap r)

-- | Does not fuse, has the same performance as the StreamK version.
{-# INLINE_NORMAL uncons #-}
uncons :: Monad m => Stream m a -> m (Maybe (a, Stream m a))
uncons (UnStream step state) = loop state
  where
    -- Skips are consumed silently until the stream yields or stops.
    loop st =
        step defState st >>= \r ->
            case r of
                Stop -> return Nothing
                Skip s -> loop s
                Yield x s -> return $ Just (x, Stream step s)

------------------------------------------------------------------------------
-- From 'Unfold'
------------------------------------------------------------------------------

-- State used by 'unfold': the seed has either not been injected yet or the
-- unfold is running with state @s@.
data UnfoldState s = UnfoldNothing | UnfoldJust s

-- | Convert an 'Unfold' into a 'Stream' by supplying it a seed.
--
{-# INLINE_NORMAL unfold #-}
unfold :: Monad m => Unfold m a b -> a -> Stream m b
unfold (Unfold ustep inject) seed = Stream step UnfoldNothing
  where
    -- Carry a step of the unfold over to the 'UnfoldState'-wrapped state.
    retag (Yield x s) = Yield x (UnfoldJust s)
    retag (Skip s) = Skip (UnfoldJust s)
    retag Stop = Stop

    -- The first step only injects the seed; all later steps run the unfold.
    {-# INLINE_LATE step #-}
    step _ UnfoldNothing = inject seed >>= \s -> return (Skip (UnfoldJust s))
    step _ (UnfoldJust st) = ustep st >>= \r -> return (retag r)

------------------------------------------------------------------------------
-- From Values
------------------------------------------------------------------------------

-- | Create a singleton 'Stream' from a pure value.
{-# INLINE_NORMAL yield #-}
yield :: Applicative m => a -> Stream m a
yield x = Stream (\_ pending -> pure (step pending)) True
  where
    -- 'True' means the value has not been emitted yet.
    {-# INLINE_LATE step #-}
    step True = Yield x False
    step False = Stop

-- | Create a singleton 'Stream' from a monadic action.
{-# INLINE_NORMAL yieldM #-}
yieldM :: Monad m => m a -> Stream m a
yieldM m = Stream step True
  where
    -- 'True' means the action has not been run yet.
    {-# INLINE_LATE step #-}
    step _ pending
        | pending = m >>= \x -> return (Yield x False)
        | otherwise = return Stop

------------------------------------------------------------------------------
-- From Containers
------------------------------------------------------------------------------

-- Adapted from the vector package.
-- | Convert a list of pure values to a 'Stream'
{-# INLINE_LATE fromList #-}
fromList :: Applicative m => [a] -> Stream m a
fromList = Stream step
  where
    -- The remaining list is the stream state.
    {-# INLINE_LATE step #-}
    step _ remaining =
        case remaining of
            [] -> pure Stop
            (y : ys) -> pure (Yield y ys)

------------------------------------------------------------------------------
-- Conversions From/To
------------------------------------------------------------------------------

-- | Convert a CPS encoded StreamK to direct style step encoded StreamD
{-# INLINE_LATE fromStreamK #-}
fromStreamK :: Monad m => K.Stream m a -> Stream m a
fromStreamK = Stream step
    where
    -- The unconsumed StreamK is the state; each step runs it with three
    -- continuations that translate its outcome into a 'Step'.
    step gst m1 = K.foldStreamShared gst yieldk single stop m1
        where
        stop = return Stop
        single a = return $ Yield a K.nil
        yieldk a r = return $ Yield a r

-- | Convert a direct style step encoded StreamD to a CPS encoded StreamK
{-# INLINE_LATE toStreamK #-}
toStreamK :: Monad m => Stream m a -> K.Stream m a
toStreamK (Stream step state) = go state
    where
    go st = K.mkStream $ \gst yld _ stp ->
        -- Skips are consumed in place; a yield hands the rest of the stream
        -- to the yield continuation.
        let loop ss =
                step gst ss >>= \r ->
                    case r of
                        Stop -> stp
                        Skip s -> loop s
                        Yield x s -> yld x (go s)
        in loop st

#if !defined(DISABLE_FUSION)
{-# RULES "fromStreamK/toStreamK fusion"
    forall s. toStreamK (fromStreamK s) = s #-}
{-# RULES "toStreamK/fromStreamK fusion"
    forall s. fromStreamK (toStreamK s) = s #-}
#endif

-- XXX Rename to toStream or move to some IsStream common module
{-# INLINE fromStreamD #-}
fromStreamD :: (K.IsStream t, Monad m) => Stream m a -> t m a
fromStreamD = K.fromStream . toStreamK

-- XXX Rename to toStream or move to some IsStream common module
{-# INLINE toStreamD #-}
toStreamD :: (K.IsStream t, Monad m) => t m a -> Stream m a
toStreamD = fromStreamK . K.toStream

------------------------------------------------------------------------------
-- Running a 'Fold'
------------------------------------------------------------------------------

-- Run a fold over a stream and return only the fold result, dropping the
-- leftover stream reported by 'fold_'.
{-# INLINE_NORMAL fold #-}
fold :: (Monad m) => Fold m a b -> Stream m a -> m b
fold fld strm = fold_ fld strm >>= \(b, _) -> return b

-- Run a fold over a stream, returning the fold result together with the
-- part of the stream that was not consumed.
{-# INLINE_NORMAL fold_ #-}
fold_ :: Monad m => Fold m a b -> Stream m a -> m (b, Stream m a)
fold_ (Fold fstep begin done) (Stream step state) = begin >>= start

    where

    -- A fold that is already done after initialization consumes nothing.
    start (FL.Partial fs) = go SPEC fs state
    start (FL.Done fb) = return $! (fb, Stream step state)

    {-# INLINE go #-}
    go !_ !fs st =
        step defState st >>= \r ->
            case r of
                Stop ->
                    -- Input exhausted: finalize the fold and hand back an
                    -- empty stream.
                    done fs >>= \b ->
                        return $! (b, Stream (\ _ _ -> return Stop) ())
                Skip s -> go SPEC fs s
                Yield x s ->
                    fstep fs x >>= \res ->
                        case res of
                            FL.Partial fs1 -> go SPEC fs1 s
                            FL.Done b -> return $! (b, Stream step s)

------------------------------------------------------------------------------
-- Right Folds
------------------------------------------------------------------------------

-- Adapted from the vector package.
--
-- XXX Use of SPEC constructor in folds causes 2x performance degradation in
-- one shot operations, but helps immensely in operations composed of multiple
-- combinators or the same combinator many times. There seems to be an
-- opportunity to optimize here, can we get both, better perf for single ops
-- as well as composed ops? Without SPEC, all single operation benchmarks
-- become 2x faster.

-- The way we want a left fold to be strict, dually we want the right fold to
-- be lazy.  The correct signature of the fold function to keep it lazy must be
-- (a -> m b -> m b) instead of (a -> b -> m b). We were using the latter
-- earlier, which is incorrect.
In the latter signature we have to feed the +-- value to the fold function after evaluating the monadic action, depending on +-- the bind behavior of the monad, the action may get evaluated immediately +-- introducing unnecessary strictness to the fold. If the implementation is +-- lazy the following example, must work: +-- +-- S.foldrM (\x t -> if x then return t else return False) (return True) +-- (S.fromList [False,undefined] :: SerialT IO Bool) +-- +{-# INLINE_NORMAL foldrM #-} +foldrM :: Monad m => (a -> m b -> m b) -> m b -> Stream m a -> m b +foldrM f z (Stream step state) = go SPEC state + where + {-# INLINE_LATE go #-} + go !_ st = do + r <- step defState st + case r of + Yield x s -> f x (go SPEC s) + Skip s -> go SPEC s + Stop -> z + +{-# INLINE_NORMAL foldrMx #-} +foldrMx :: Monad m + => (a -> m x -> m x) -> m x -> (m x -> m b) -> Stream m a -> m b +foldrMx fstep final convert (Stream step state) = convert $ go SPEC state + where + {-# INLINE_LATE go #-} + go !_ st = do + r <- step defState st + case r of + Yield x s -> fstep x (go SPEC s) + Skip s -> go SPEC s + Stop -> final + +-- Note that foldr works on pure values, therefore it becomes necessarily +-- strict when the monad m is strict. In that case it cannot terminate early, +-- it would evaluate all of its input. Though, this should work fine with lazy +-- monads. For example, if "any" is implemented using "foldr" instead of +-- "foldrM" it performs the same with Identity monad but performs 1000x slower +-- with IO monad. +-- +{-# INLINE_NORMAL foldr #-} +foldr :: Monad m => (a -> b -> b) -> b -> Stream m a -> m b +foldr f z = foldrM (\a b -> liftA2 f (return a) b) (return z) + +{- +-- this performs horribly, should not be used +{-# INLINE_NORMAL foldrS #-} +foldrS + :: Monad m + => (a -> Stream m b -> Stream m b) + -> Stream m b + -> Stream m a + -> Stream m b +foldrS f final (Stream step state) = go SPEC state + where + {-# INLINE_LATE go #-} + go !_ st = do + -- defState?? 
+        r <- yieldM $ step defState st
+        case r of
+          Yield x s -> f x (go SPEC s)
+          Skip s -> go SPEC s
+          Stop -> final
+ -}
+
+{-
+-- Right fold to some transformer (T) monad. This can be useful to implement
+-- stateless combinators like map, filtering, insertions, takeWhile, dropWhile.
+--
+{-# INLINE_NORMAL foldrT #-}
+foldrT :: (Monad m, Monad (t m), MonadTrans t)
+    => (a -> t m b -> t m b) -> t m b -> Stream m a -> t m b
+foldrT f final (Stream step state) = go SPEC state
+  where
+    {-# INLINE_LATE go #-}
+    go !_ st = do
+          r <- lift $ step defState st
+          case r of
+            Yield x s -> f x (go SPEC s)
+            Skip s -> go SPEC s
+            Stop -> final
+-}
+
+------------------------------------------------------------------------------
+-- Left Folds
+------------------------------------------------------------------------------
+
+-- XXX run begin action only if the stream is not empty.
+--
+-- Strict left fold with a monadic step function @fstep@, an effectful initial
+-- accumulator @begin@ and a final extraction action @done@ that is applied to
+-- the accumulator when the stream stops. @begin@ is always run, even when the
+-- stream turns out to be empty.
+{-# INLINE_NORMAL foldlMx' #-}
+foldlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> m b
+foldlMx' fstep begin done (Stream step state) =
+    begin >>= \x -> go SPEC x state
+  where
+    -- XXX !acc?
+    -- The accumulator is forced with 'seq' before every pull from the stream.
+    {-# INLINE_LATE go #-}
+    go !_ acc st = acc `seq` do
+        r <- step defState st
+        case r of
+            Yield x s -> do
+                acc' <- fstep acc x
+                go SPEC acc' s
+            Skip s -> go SPEC acc s
+            Stop -> done acc
+
+-- | Pure counterpart of 'foldlMx'': the step function, the initial value and
+-- the extraction function are lifted with 'return' and passed to 'foldlMx''.
+{-# INLINE foldlx' #-}
+foldlx' :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> m b
+foldlx' fstep begin done m =
+    foldlMx' (\b a -> return (fstep b a)) (return begin) (return . done) m
+
+-- Adapted from the vector package.
+-- XXX implement in terms of foldlMx'?
+-- Strict monadic left fold. The initial accumulator is obtained by running
+-- @mbegin@; the accumulator reached when the stream stops is the result.
+{-# INLINE_NORMAL foldlM' #-}
+foldlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> m b
+foldlM' fstep mbegin (Stream step state) = do
+    begin <- mbegin
+    go SPEC begin state
+  where
+    -- The accumulator is forced with 'seq' before every pull from the stream.
+    {-# INLINE_LATE go #-}
+    go !_ acc st = acc `seq` do
+        r <- step defState st
+        case r of
+            Yield x s -> do
+                acc' <- fstep acc x
+                go SPEC acc' s
+            Skip s -> go SPEC acc s
+            Stop -> return acc
+
+-- | Strict left fold with a pure step function, expressed through 'foldlM''.
+{-# INLINE foldl' #-}
+foldl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> m b
+foldl' fstep begin = foldlM' (\b a -> return (fstep b a)) (return begin)
+
+------------------------------------------------------------------------------
+-- To Containers
+------------------------------------------------------------------------------
+
+-- | Collect the elements of a stream into a list, using the right fold
+-- 'foldr' with @(:)@ and @[]@.
+{-# INLINE_NORMAL toList #-}
+toList :: Monad m => Stream m a -> m [a]
+toList = foldr (:) []
+
+-- Use foldr/build fusion to fuse with list consumers
+-- This can be useful when using the IsList instance
+--
+-- @c@ and @n@ are the abstract "cons" and "nil" supplied by 'build'; the
+-- 'Identity' stream is stepped purely via 'runIdentity'.
+{-# INLINE_LATE toListFB #-}
+toListFB :: (a -> b -> b) -> b -> Stream Identity a -> b
+toListFB c n (Stream step state) = go state
+  where
+    go st = case runIdentity (step defState st) of
+             Yield x s -> x `c` go s
+             Skip s -> go s
+             Stop -> n
+
+-- 'toListId' is the 'Identity' version of 'toList', written with 'build' and
+-- 'toListFB'. The rewrite rule below replaces 'toList' with it where the
+-- types match.
+{-# RULES "toList Identity" toList = toListId #-}
+{-# INLINE_EARLY toListId #-}
+toListId :: Stream Identity a -> Identity [a]
+toListId s = Identity $ build (\c n -> toListFB c n s)
+
+------------------------------------------------------------------------------
+-- Multi-stream folds
+------------------------------------------------------------------------------
+
+-- Adapted from the vector package.
+-- Compare two streams element by element with the supplied equality test.
+-- The result is 'False' as soon as a pair of elements differs or one stream
+-- ends before the other, and 'True' only when both streams end together.
+{-# INLINE_NORMAL eqBy #-}
+eqBy :: Monad m => (a -> b -> Bool) -> Stream m a -> Stream m b -> m Bool
+eqBy eq (Stream step1 t1) (Stream step2 t2) = eq_loop0 SPEC t1 t2
+  where
+    -- Pull the next element from the first stream.
+    eq_loop0 !_ s1 s2 = do
+      r <- step1 defState s1
+      case r of
+        Yield x s1' -> eq_loop1 SPEC x s1' s2
+        Skip s1' -> eq_loop0 SPEC s1' s2
+        Stop -> eq_null s2
+
+    -- Holding @x@ from the first stream, pull its partner from the second.
+    eq_loop1 !_ x s1 s2 = do
+      r <- step2 defState s2
+      case r of
+        Yield y s2'
+          | eq x y -> eq_loop0 SPEC s1 s2'
+          | otherwise -> return False
+        Skip s2' -> eq_loop1 SPEC x s1 s2'
+        Stop -> return False
+
+    -- The first stream has ended: the second must not yield anything more.
+    eq_null s2 = do
+      r <- step2 defState s2
+      case r of
+        Yield _ _ -> return False
+        Skip s2' -> eq_null s2'
+        Stop -> return True
+
+-- Adapted from the vector package.
+-- | Compare two streams lexicographically
+--
+-- The first pair of elements that does not compare 'EQ' decides the result.
+-- If the first stream ends while the second still has an element the result
+-- is 'LT'; if the second ends first it is 'GT'; if both end together it is
+-- 'EQ'.
+{-# INLINE_NORMAL cmpBy #-}
+cmpBy
+    :: Monad m
+    => (a -> b -> Ordering) -> Stream m a -> Stream m b -> m Ordering
+cmpBy cmp (Stream step1 t1) (Stream step2 t2) = cmp_loop0 SPEC t1 t2
+  where
+    -- Pull the next element from the first stream.
+    cmp_loop0 !_ s1 s2 = do
+      r <- step1 defState s1
+      case r of
+        Yield x s1' -> cmp_loop1 SPEC x s1' s2
+        Skip s1' -> cmp_loop0 SPEC s1' s2
+        Stop -> cmp_null s2
+
+    -- Holding @x@ from the first stream, pull its partner from the second.
+    cmp_loop1 !_ x s1 s2 = do
+      r <- step2 defState s2
+      case r of
+        Yield y s2' -> case x `cmp` y of
+                            EQ -> cmp_loop0 SPEC s1 s2'
+                            c -> return c
+        Skip s2' -> cmp_loop1 SPEC x s1 s2'
+        Stop -> return GT
+
+    -- The first stream has ended: check whether the second has ended too.
+    cmp_null s2 = do
+      r <- step2 defState s2
+      case r of
+        Yield _ _ -> return LT
+        Skip s2' -> cmp_null s2'
+        Stop -> return EQ
+
+------------------------------------------------------------------------------
+-- Transformations
+------------------------------------------------------------------------------
+
+-- Adapted from the vector package.
+-- | Map a monadic function over a 'Stream'
+{-# INLINE_NORMAL mapM #-}
+mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b
+mapM f (Stream step state) = Stream step' state
+  where
+    -- The element type changes, so the state handed to the inner step is
+    -- converted with 'adaptState'.
+    {-# INLINE_LATE step' #-}
+    step' gst st = do
+        r <- step (adaptState gst) st
+        case r of
+            Yield x s -> f x >>= \a -> return $ Yield a s
+            Skip s -> return $ Skip s
+            Stop -> return Stop
+
+-- | Map a pure function over a 'Stream'; defined through 'mapM'.
+{-# INLINE map #-}
+map :: Monad m => (a -> b) -> Stream m a -> Stream m b
+map f = mapM (return . f)
+
+-- 'fmap' rewrites every step result with the step type's own 'fmap', so only
+-- a 'Functor' constraint on the underlying type is required.
+instance Functor m => Functor (Stream m) where
+    {-# INLINE fmap #-}
+    fmap f (Stream step state) = Stream step' state
+      where
+        {-# INLINE_LATE step' #-}
+        step' gst st = fmap (fmap f) (step (adaptState gst) st)
+
+    {-# INLINE (<$) #-}
+    (<$) = fmap . const
+
+-------------------------------------------------------------------------------
+-- Filtering
+-------------------------------------------------------------------------------
+
+-- Adapted from the vector package.
+--
+-- Take at most @n@ elements. The stream state is paired with the number of
+-- elements yielded so far; a 'Skip' does not advance that counter.
+{-# INLINE_NORMAL take #-}
+take :: Monad m => Int -> Stream m a -> Stream m a
+take n (Stream step state) = n `seq` Stream step' (state, 0)
+  where
+    {-# INLINE_LATE step' #-}
+    step' gst (st, i) | i < n = do
+        r <- step gst st
+        return $ case r of
+            Yield x s -> Yield x (s, i + 1)
+            Skip s -> Skip (s, i)
+            Stop -> Stop
+    -- Counter has reached @n@ (or @n@ was not positive): stop without
+    -- stepping the inner stream.
+    step' _ (_, _) = return Stop
+
+-- Adapted from the vector package.
+--
+-- Yield elements for as long as the monadic predicate returns 'True'. The
+-- first element for which it returns 'False' ends the stream and is not
+-- yielded.
+{-# INLINE_NORMAL takeWhileM #-}
+takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
+takeWhileM f (Stream step state) = Stream step' state
+  where
+    {-# INLINE_LATE step' #-}
+    step' gst st = do
+        r <- step gst st
+        case r of
+            Yield x s -> do
+                b <- f x
+                return $ if b then Yield x s else Stop
+            Skip s -> return $ Skip s
+            Stop -> return Stop
+
+-- | 'takeWhileM' with a pure predicate.
+{-# INLINE takeWhile #-}
+takeWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
+takeWhile f = takeWhileM (return . f)
+
+------------------------------------------------------------------------------
+-- Combine N Streams - concatAp
+------------------------------------------------------------------------------
+
+-- | Applicative-style combination: for every function yielded by the first
+-- stream, the second stream is run from its initial state and the function is
+-- applied to each of its elements.
+--
+-- State: @Left@ is pulling the next function from the first stream;
+-- @Right (f, os, st)@ is running the second stream (state @st@) with the
+-- current function @f@, remembering the first stream's state @os@.
+{-# INLINE_NORMAL concatAp #-}
+concatAp :: Functor f => Stream f (a -> b) -> Stream f a -> Stream f b
+concatAp (Stream stepa statea) (Stream stepb stateb) = Stream step' (Left statea)
+  where
+    {-# INLINE_LATE step' #-}
+    step' gst (Left st) = fmap
+        (\r -> case r of
+            Yield f s -> Skip (Right (f, s, stateb))
+            Skip s -> Skip (Left s)
+            Stop -> Stop)
+        (stepa (adaptState gst) st)
+    step' gst (Right (f, os, st)) = fmap
+        (\r -> case r of
+            Yield a s -> Yield (f a) (Right (f, os, s))
+            Skip s -> Skip (Right (f,os, s))
+            Stop -> Skip (Left os))
+        (stepb (adaptState gst) st)
+
+-- | Implementation of '*>': for every element of the first stream (its value
+-- is ignored) the second stream is run from its initial state and all of its
+-- elements are yielded.
+{-# INLINE_NORMAL apSequence #-}
+apSequence :: Functor f => Stream f a -> Stream f b -> Stream f b
+apSequence (Stream stepa statea) (Stream stepb stateb) =
+    Stream step (Left statea)
+
+  where
+
+    {-# INLINE_LATE step #-}
+    step gst (Left st) =
+        fmap
+            (\r ->
+                 case r of
+                     Yield _ s -> Skip (Right (s, stateb))
+                     Skip s -> Skip (Left s)
+                     Stop -> Stop)
+            (stepa (adaptState gst) st)
+    step gst (Right (ostate, st)) =
+        fmap
+            (\r ->
+                 case r of
+                     Yield b s -> Yield b (Right (ostate, s))
+                     Skip s -> Skip (Right (ostate, s))
+                     Stop -> Skip (Left ostate))
+            (stepb gst st)
+
+-- | Implementation of '<*': each element @b@ of the first stream is yielded
+-- once for every element produced by a fresh run of the second stream (whose
+-- values are ignored).
+{-# INLINE_NORMAL apDiscardSnd #-}
+apDiscardSnd :: Functor f => Stream f a -> Stream f b -> Stream f a
+apDiscardSnd (Stream stepa statea) (Stream stepb stateb) =
+    Stream step (Left statea)
+
+  where
+
+    {-# INLINE_LATE step #-}
+    step gst (Left st) =
+        fmap
+            (\r ->
+                 case r of
+                     Yield b s -> Skip (Right (s, stateb, b))
+                     Skip s -> Skip (Left s)
+                     Stop -> Stop)
+            (stepa gst st)
+    step gst (Right (ostate, st, b)) =
+        fmap
+            (\r ->
+                 case r of
+                     Yield _ s -> Yield b (Right (ostate, s, b))
+                     Skip s -> Skip (Right (ostate, s, b))
+                     Stop -> Skip (Left ostate))
+            (stepb (adaptState gst) st)
+
+-- The 'Applicative' methods are wired to the stream combinators defined
+-- above; 'liftA2' is only defined when base provides it as a class method.
+instance Applicative f => Applicative (Stream f) where
+    {-# INLINE pure #-}
+    pure = yield
+
+    {-# INLINE (<*>) #-}
+    (<*>) = concatAp
+
+#if MIN_VERSION_base(4,10,0)
+    {-# INLINE liftA2 #-}
+    liftA2 f x = (<*>) (fmap f x)
+#endif
+
+    {-# INLINE (*>) #-}
+    (*>) = apSequence
+
+    {-# INLINE (<*) #-}
+    (<*) = apDiscardSnd
+
+------------------------------------------------------------------------------
+-- Combine N Streams - unfoldMany
+------------------------------------------------------------------------------
+
+-- Define a unique structure to use in inspection testing
+data ConcatMapUState o i =
+      ConcatMapUOuter o   -- pulling the next element from the outer stream
+    | ConcatMapUInner o i -- running the unfold (state i), outer state o saved
+
+-- | @unfoldMany unfold stream@ uses @unfold@ to map the input stream elements
+-- to streams and then flattens the generated streams into a single output
+-- stream.
+
+-- This is like 'concatMap' but uses an unfold with an explicit state to
+-- generate the stream instead of a 'Stream' type generator. This allows better
+-- optimization via fusion. This can be many times more efficient than
+-- 'concatMap'.
+
+{-# INLINE_NORMAL unfoldMany #-}
+unfoldMany :: Monad m => Unfold m a b -> Stream m a -> Stream m b
+unfoldMany (Unfold istep inject) (Stream ostep ost) =
+    Stream step (ConcatMapUOuter ost)
+  where
+    {-# INLINE_LATE step #-}
+    step gst (ConcatMapUOuter o) = do
+        r <- ostep (adaptState gst) o
+        case r of
+            Yield a o' -> do
+                -- Seed the unfold from the outer element; the seed state is
+                -- forced before switching to the inner loop.
+                i <- inject a
+                i `seq` return (Skip (ConcatMapUInner o' i))
+            Skip o' -> return $ Skip (ConcatMapUOuter o')
+            Stop -> return $ Stop
+
+    step _ (ConcatMapUInner o i) = do
+        r <- istep i
+        return $ case r of
+            Yield x i' -> Yield x (ConcatMapUInner o i')
+            Skip i' -> Skip (ConcatMapUInner o i')
+            -- Inner unfold finished: go back to the outer stream.
+            Stop -> Skip (ConcatMapUOuter o)
+
+{-
+------------------------------------------------------------------------------
+-- Combine N Streams - concatMap
+------------------------------------------------------------------------------
+
+-- Adapted from the vector package.
+{-# INLINE_NORMAL concatMapM #-} +concatMapM :: Monad m => (a -> m (Stream m b)) -> Stream m a -> Stream m b +concatMapM f (Stream step state) = Stream step' (Left state) + where + {-# INLINE_LATE step' #-} + step' gst (Left st) = do + r <- step (adaptState gst) st + case r of + Yield a s -> do + b_stream <- f a + return $ Skip (Right (b_stream, s)) + Skip s -> return $ Skip (Left s) + Stop -> return Stop + + -- XXX flattenArrays is 5x faster than "concatMap fromArray". if somehow we + -- can get inner_step to inline and fuse here we can perhaps get the same + -- performance using "concatMap fromArray". + -- + -- XXX using the pattern synonym "Stream" causes a major performance issue + -- here even if the synonym does not include an adaptState call. Need to + -- find out why. Is that something to be fixed in GHC? + step' gst (Right (UnStream inner_step inner_st, st)) = do + r <- inner_step (adaptState gst) inner_st + case r of + Yield b inner_s -> + return $ Yield b (Right (Stream inner_step inner_s, st)) + Skip inner_s -> + return $ Skip (Right (Stream inner_step inner_s, st)) + Stop -> return $ Skip (Left st) + +{-# INLINE concatMap #-} +concatMap :: Monad m => (a -> Stream m b) -> Stream m a -> Stream m b +concatMap f = concatMapM (return . f) + +-- XXX The idea behind this rule is to rewrite any calls to "concatMap +-- fromArray" automatically to flattenArrays which is much faster. However, we +-- need an INLINE_EARLY on concatMap for this rule to fire. But if we use +-- INLINE_EARLY on concatMap or fromArray then direct uses of +-- "concatMap fromArray" (without the RULE) become much slower, this means +-- "concatMap f" in general would become slower. Need to find a solution to +-- this. +-- +-- {-# RULES "concatMap Array.toStreamD" +-- concatMap Array.toStreamD = Array.flattenArray #-} + +-- NOTE: even though concatMap for StreamD is 4x faster compared to StreamK, +-- the monad instance does not seem to be significantly faster. 
+instance Monad m => Monad (Stream m) where + {-# INLINE return #-} + return = pure + + {-# INLINE (>>=) #-} + (>>=) = flip concatMap + + {-# INLINE (>>) #-} + (>>) = (*>) + +------------------------------------------------------------------------------ +-- Grouping/Splitting +------------------------------------------------------------------------------ + +-- s = stream state, fs = fold state +-- {-# ANN type FoldManyPost Fuse #-} +data FoldManyPost s fs b a + = FoldManyPostStart s + | FoldManyPostLoop s fs + | FoldManyPostYield b (FoldManyPost s fs b a) + | FoldManyPostDone + +-- | Like foldMany but with the following differences: +-- +-- * If the stream is empty the default value of the fold would still be +-- emitted in the output. +-- * At the end of the stream if the last application of the fold did not +-- receive any input it would still yield the default fold accumulator as the +-- last value. +-- +{-# INLINE_NORMAL foldManyPost #-} +foldManyPost :: Monad m => Fold m a b -> Stream m a -> Stream m b +foldManyPost (Fold fstep initial extract) (Stream step state) = + Stream step' (FoldManyPostStart state) + + where + + {-# INLINE consume #-} + consume x s fs = do + res <- fstep fs x + return + $ Skip + $ case res of + FL.Done b -> FoldManyPostYield b (FoldManyPostStart s) + FL.Partial ps -> FoldManyPostLoop s ps + + {-# INLINE_LATE step' #-} + step' _ (FoldManyPostStart st) = do + r <- initial + return + $ Skip + $ case r of + FL.Done b -> FoldManyPostYield b (FoldManyPostStart st) + FL.Partial fs -> FoldManyPostLoop st fs + step' gst (FoldManyPostLoop st fs) = do + r <- step (adaptState gst) st + case r of + Yield x s -> consume x s fs + Skip s -> return $ Skip (FoldManyPostLoop s fs) + Stop -> do + b <- extract fs + return $ Skip (FoldManyPostYield b FoldManyPostDone) + step' _ (FoldManyPostYield b next) = return $ Yield b next + step' _ FoldManyPostDone = return Stop + +-- {-# ANN type FoldMany Fuse #-} +data FoldMany s fs b a + = FoldManyStart s + | 
FoldManyFirst fs s + | FoldManyLoop s fs + | FoldManyYield b (FoldMany s fs b a) + | FoldManyDone + +-- | Apply a fold multiple times until the stream ends. If the stream is empty +-- the output would be empty. +-- +-- @foldMany f = parseMany (fromFold f)@ +-- +-- A terminating fold may terminate even without accepting a single input. So +-- we run the fold's initial action before evaluating the stream. However, this +-- means that if later the stream does not yield anything we have to discard +-- the fold's initial result which could have generated an effect. +-- +{-# INLINE_NORMAL foldMany #-} +foldMany :: Monad m => Fold m a b -> Stream m a -> Stream m b +foldMany (Fold fstep initial extract) (Stream step state) = + Stream step' (FoldManyStart state) + + where + + {-# INLINE consume #-} + consume x s fs = do + res <- fstep fs x + return + $ Skip + $ case res of + FL.Done b -> FoldManyYield b (FoldManyStart s) + FL.Partial ps -> FoldManyLoop s ps + + {-# INLINE_LATE step' #-} + step' _ (FoldManyStart st) = do + r <- initial + return + $ Skip + $ case r of + FL.Done b -> FoldManyYield b (FoldManyStart st) + FL.Partial fs -> FoldManyFirst fs st + step' gst (FoldManyFirst fs st) = do + r <- step (adaptState gst) st + case r of + Yield x s -> consume x s fs + Skip s -> return $ Skip (FoldManyFirst fs s) + Stop -> return Stop + step' gst (FoldManyLoop st fs) = do + r <- step (adaptState gst) st + case r of + Yield x s -> consume x s fs + Skip s -> return $ Skip (FoldManyLoop s fs) + Stop -> do + b <- extract fs + return $ Skip (FoldManyYield b FoldManyDone) + step' _ (FoldManyYield b next) = return $ Yield b next + step' _ FoldManyDone = return Stop + +{-# INLINE chunksOf #-} +chunksOf :: Monad m => Int -> Fold m a b -> Stream m a -> Stream m b +chunksOf n f = foldMany (FL.take n f) + +data GroupState2 s fs + = GroupStart2 s + | GroupBuffer2 s fs Int + | GroupYield2 fs (GroupState2 s fs) + | GroupFinish2 + +{-# INLINE_NORMAL groupsOf2 #-} +groupsOf2 + :: Monad m + => 
Int + -> m c + -> Fold2 m c a b + -> Stream m a + -> Stream m b +groupsOf2 n input (Fold2 fstep inject extract) (Stream step state) = + n `seq` Stream step' (GroupStart2 state) + + where + + {-# INLINE_LATE step' #-} + step' _ (GroupStart2 st) = do + -- XXX shall we use the Natural type instead? Need to check performance + -- implications. + when (n <= 0) $ + -- XXX we can pass the module string from the higher level API + error $ "Streamly.Internal.Data.Stream.StreamD.Type.groupsOf: the size of " + ++ "groups [" ++ show n ++ "] must be a natural number" + -- fs = fold state + fs <- input >>= inject + return $ Skip (GroupBuffer2 st fs 0) + + step' gst (GroupBuffer2 st fs i) = do + r <- step (adaptState gst) st + case r of + Yield x s -> do + !fs' <- fstep fs x + let i' = i + 1 + return $ + if i' >= n + then Skip (GroupYield2 fs' (GroupStart2 s)) + else Skip (GroupBuffer2 s fs' i') + Skip s -> return $ Skip (GroupBuffer2 s fs i) + Stop -> return $ Skip (GroupYield2 fs GroupFinish2) + + step' _ (GroupYield2 fs next) = do + r <- extract fs + return $ Yield r next + + step' _ GroupFinish2 = return Stop + +------------------------------------------------------------------------------ +-- Other instances +------------------------------------------------------------------------------ + +instance MonadTrans Stream where + {-# INLINE lift #-} + lift = yieldM + +instance (MonadThrow m) => MonadThrow (Stream m) where + throwM = lift . 
throwM + -} + +{-# INLINE_NORMAL unfoldrM #-} +unfoldrM :: Monad m => (s -> m (Maybe (a, s))) -> s -> Stream m a +unfoldrM next state = Stream step state + where + {-# INLINE_LATE step #-} + step _ st = do + r <- next st + return $ case r of + Just (x, s) -> Yield x s + Nothing -> Stop + +{-# INLINE_LATE drain #-} +drain :: Monad m => Stream m a -> m () +-- drain = foldrM (\_ xs -> xs) (return ()) +drain (Stream step state) = go SPEC state + where + go !_ st = do + r <- step defState st + case r of + Yield _ s -> go SPEC s + Skip s -> go SPEC s + Stop -> return () + +------------------------------------------------------------------------------ +-- Scanning with a Fold +------------------------------------------------------------------------------ + +data ScanState s f = ScanInit s | ScanDo s !f | ScanDone + +{-# INLINE [1] postscanOnce #-} +postscanOnce :: Monad m => FL.Fold m a b -> Stream m a -> Stream m b +postscanOnce (FL.Fold fstep initial extract) (Stream sstep state) = + Stream step (ScanInit state) + + where + + {-# INLINE_LATE step #-} + step _ (ScanInit st) = do + res <- initial + return + $ case res of + FL.Partial fs -> Skip $ ScanDo st fs + FL.Done b -> Yield b ScanDone + step gst (ScanDo st fs) = do + res <- sstep (adaptState gst) st + case res of + Yield x s -> do + r <- fstep fs x + case r of + FL.Partial fs1 -> do + !b <- extract fs1 + return $ Yield b $ ScanDo s fs1 + FL.Done b -> return $ Yield b ScanDone + Skip s -> return $ Skip $ ScanDo s fs + Stop -> return Stop + step _ ScanDone = return Stop + +{-# INLINE [1] replicateM #-} +replicateM :: forall m a. 
Monad m => Int -> m a -> Stream m a +replicateM n p = Stream step n + where + {-# INLINE_LATE step #-} + step _ (i :: Int) + | i <= 0 = return Stop + | otherwise = do + x <- p + return $ Yield x (i - 1) + +{-# INLINE [1] replicate #-} +replicate :: Monad m => Int -> a -> Stream m a +replicate n x = replicateM n (return x) + +{-# INLINE [1] after_ #-} +after_ :: Monad m => m b -> Stream m a -> Stream m a +after_ action (Stream step state) = Stream step' state + + where + + {-# INLINE_LATE step' #-} + step' gst st = do + res <- step gst st + case res of + Yield x s -> return $ Yield x s + Skip s -> return $ Skip s + Stop -> action >> return Stop diff --git a/testsuite/tests/profiling/should_compile/T19894/StreamK.hs b/testsuite/tests/profiling/should_compile/T19894/StreamK.hs new file mode 100644 index 0000000000..d24559f4dc --- /dev/null +++ b/testsuite/tests/profiling/should_compile/T19894/StreamK.hs @@ -0,0 +1,1245 @@ +{-# LANGUAGE UndecidableInstances #-} +{-# LANGUAGE KindSignatures #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE QuantifiedConstraints #-} +{-# LANGUAGE ConstraintKinds #-} +{-# LANGUAGE InstanceSigs #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE CPP #-} + +-- | +-- Module : Streamly.Internal.Data.Stream.StreamK.Type +-- Copyright : (c) 2017 Composewell Technologies +-- +-- License : BSD3 +-- Maintainer : streamly@composewell.com +-- Stability : experimental +-- Portability : GHC +-- +-- +-- Continuation passing style (CPS) stream implementation. The symbol 'K' below +-- denotes a function as well as a Kontinuation. +-- +module StreamK + ( + -- * A class for streams + IsStream (..) + , adapt + , State + , adaptState + , defState + , MonadAsync + , unfoldrM + , drain + + -- * The stream type + , Stream (..) 
+ + -- * Construction + , mkStream + , fromStopK + , fromYieldK + , consK + + -- * Elimination + , foldStream + , foldStreamShared + , foldl' + , foldlx' + + -- * foldr/build + , foldrM + , foldrS + , foldrSShared + , foldrSM + , build + , buildS + , buildM + , buildSM + , sharedM + , augmentS + , augmentSM + + -- instances + , cons + , (.:) + , consMStream + , consMBy + , yieldM + , yield + + , nil + , nilM + , conjoin + , serial + , map + , mapM + , mapMSerial + , unShare + , concatMapBy + , concatMap + , bindWith + , concatPairsWith + , apWith + , apSerial + , apSerialDiscardFst + , apSerialDiscardSnd + + , Streaming -- deprecated + ) +where + +#include "inline.hs" + +import Control.Monad (ap, (>=>)) +-- import Control.Monad.Trans.Class (MonadTrans(lift)) +import Data.Kind (Type) +-- import Control.Monad.Catch (MonadThrow) +import Control.Monad.IO.Class (MonadIO(..)) + +#if __GLASGOW_HASKELL__ < 808 +import Data.Semigroup (Semigroup(..)) +#endif +import Prelude hiding (map, mapM, concatMap, foldr) + +-- import Streamly.Internal.Data.SVar + +------------------------------------------------------------------------------ +-- Basic stream type +------------------------------------------------------------------------------ + +data State (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a = State + { + _inspectMode :: Bool + } + +adaptState :: State t m a -> State t n b +adaptState st = st + { + _inspectMode = False + } + +defState :: State t m a +defState = State + { + _inspectMode = False + } + +-- | The type @Stream m a@ represents a monadic stream of values of type 'a' +-- constructed using actions in monad 'm'. 
It uses stop, singleton and yield +-- continuations equivalent to the following direct style type: +-- +-- @ +-- data Stream m a = Stop | Singleton a | Yield a (Stream m a) +-- @ +-- +-- To facilitate parallel composition we maintain a local state in an 'SVar' +-- that is shared across and is used for synchronization of the streams being +-- composed. +-- +-- The singleton case can be expressed in terms of stop and yield but we have +-- it as a separate case to optimize composition operations for streams with +-- single element. We build singleton streams in the implementation of 'pure' +-- for Applicative and Monad, and in 'lift' for MonadTrans. + +-- XXX remove the Stream type parameter from State as it is always constant. +-- We can remove it from SVar as well + +newtype Stream (m :: Type -> Type) a = + MkStream (forall r. + State Stream m a -- state + -> (a -> Stream m a -> m r) -- yield + -> (a -> m r) -- singleton + -> m r -- stop + -> m r + ) + +------------------------------------------------------------------------------ +-- Types that can behave as a Stream +------------------------------------------------------------------------------ + +infixr 5 `consM` +infixr 5 |: + +type MonadAsync m = (MonadIO m) + +-- XXX Use a different SVar based on the stream type. But we need to make sure +-- that we do not lose performance due to polymorphism. +-- +-- | Class of types that can represent a stream of elements of some type 'a' in +-- some monad 'm'. +-- +-- /Since: 0.2.0 ("Streamly")/ +-- +-- @since 0.8.0 +class +#if __GLASGOW_HASKELL__ >= 806 + ( forall m a. MonadAsync m => Semigroup (t m a) + , forall m a. MonadAsync m => Monoid (t m a) + , forall m. Monad m => Functor (t m) + , forall m. MonadAsync m => Applicative (t m) + ) => +#endif + IsStream t where + toStream :: t m a -> Stream m a + fromStream :: Stream m a -> t m a + -- | Constructs a stream by adding a monadic action at the head of an + -- existing stream. 
For example: + -- + -- @ + -- > toList $ getLine \`consM` getLine \`consM` nil + -- hello + -- world + -- ["hello","world"] + -- @ + -- + -- /Concurrent (do not use 'fromParallel' to construct infinite streams)/ + -- + -- @since 0.2.0 + consM :: MonadAsync m => m a -> t m a -> t m a + -- | Operator equivalent of 'consM'. We can read it as "@parallel colon@" + -- to remember that @|@ comes before ':'. + -- + -- @ + -- > toList $ getLine |: getLine |: nil + -- hello + -- world + -- ["hello","world"] + -- @ + -- + -- @ + -- let delay = threadDelay 1000000 >> print 1 + -- drain $ fromSerial $ delay |: delay |: delay |: nil + -- drain $ fromParallel $ delay |: delay |: delay |: nil + -- @ + -- + -- /Concurrent (do not use 'fromParallel' to construct infinite streams)/ + -- + -- @since 0.2.0 + (|:) :: MonadAsync m => m a -> t m a -> t m a + -- We can define (|:) just as 'consM' but it is defined explicitly for each + -- type because we want to use SPECIALIZE pragma on the definition. + +-- | Same as 'IsStream'. +-- +-- @since 0.1.0 +{-# DEPRECATED Streaming "Please use IsStream instead." #-} +type Streaming = IsStream + +------------------------------------------------------------------------------- +-- Type adapting combinators +------------------------------------------------------------------------------- + +-- XXX Move/reset the State here by reconstructing the stream with cleared +-- state. Can we make sure we do not do that when t1 = t2? If we do this then +-- we do not need to do that explicitly using svarStyle. It would act as +-- unShare when the stream type is the same. +-- +-- | Adapt any specific stream type to any other specific stream type. +-- +-- /Since: 0.1.0 ("Streamly")/ +-- +-- @since 0.8.0 +adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a +adapt = fromStream . 
toStream + +------------------------------------------------------------------------------ +-- Building a stream +------------------------------------------------------------------------------ + +-- XXX The State is always parameterized by "Stream" which means State is not +-- different for different stream types. So we have to manually make sure that +-- when converting from one stream to another we migrate the state correctly. +-- This can be fixed if we use a different SVar type for different streams. +-- Currently we always use "SVar Stream" and therefore a different State type +-- parameterized by that stream. +-- +-- XXX Since t is coercible we should be able to coerce k +-- mkStream k = fromStream $ MkStream $ coerce k +-- +-- | Build a stream from an 'SVar', a stop continuation, a singleton stream +-- continuation and a yield continuation. +{-# INLINE_EARLY mkStream #-} +mkStream :: IsStream t + => (forall r. State Stream m a + -> (a -> t m a -> m r) + -> (a -> m r) + -> m r + -> m r) + -> t m a +mkStream k = fromStream $ MkStream $ \st yld sng stp -> + let yieldk a r = yld a (toStream r) + in k st yieldk sng stp + +{-# RULES "mkStream from stream" mkStream = mkStreamFromStream #-} +mkStreamFromStream :: IsStream t + => (forall r. State Stream m a + -> (a -> Stream m a -> m r) + -> (a -> m r) + -> m r + -> m r) + -> t m a +mkStreamFromStream k = fromStream $ MkStream k + +{-# RULES "mkStream stream" mkStream = mkStreamStream #-} +mkStreamStream + :: (forall r. State Stream m a + -> (a -> Stream m a -> m r) + -> (a -> m r) + -> m r + -> m r) + -> Stream m a +mkStreamStream = MkStream + +-- | A terminal function that has no continuation to follow. +type StopK m = forall r. m r -> m r + +-- | A monadic continuation, it is a function that yields a value of type "a" +-- and calls the argument (a -> m r) as a continuation with that value. We can +-- also think of it as a callback with a handler (a -> m r). 
Category +-- theorists call it a codensity type, a special type of right kan extension. +type YieldK m a = forall r. (a -> m r) -> m r + +_wrapM :: Monad m => m a -> YieldK m a +_wrapM m = \k -> m >>= k + +-- | Make an empty stream from a stop function. +fromStopK :: IsStream t => StopK m -> t m a +fromStopK k = mkStream $ \_ _ _ stp -> k stp + +-- | Make a singleton stream from a callback function. The callback function +-- calls the one-shot yield continuation to yield an element. +fromYieldK :: IsStream t => YieldK m a -> t m a +fromYieldK k = mkStream $ \_ _ sng _ -> k sng + +-- | Add a yield function at the head of the stream. +consK :: IsStream t => YieldK m a -> t m a -> t m a +consK k r = mkStream $ \_ yld _ _ -> k (\x -> yld x r) + +-- XXX Build a stream from a repeating callback function. + +------------------------------------------------------------------------------ +-- Construction +------------------------------------------------------------------------------ + +infixr 5 `cons` + +-- faster than consM because there is no bind. +-- | Construct a stream by adding a pure value at the head of an existing +-- stream. For serial streams this is the same as @(return a) \`consM` r@ but +-- more efficient. For concurrent streams this is not concurrent whereas +-- 'consM' is concurrent. For example: +-- +-- @ +-- > toList $ 1 \`cons` 2 \`cons` 3 \`cons` nil +-- [1,2,3] +-- @ +-- +-- @since 0.1.0 +{-# INLINE_NORMAL cons #-} +cons :: IsStream t => a -> t m a -> t m a +cons a r = mkStream $ \_ yld _ _ -> yld a r + +infixr 5 .: + +-- | Operator equivalent of 'cons'. +-- +-- @ +-- > toList $ 1 .: 2 .: 3 .: nil +-- [1,2,3] +-- @ +-- +-- @since 0.1.1 +{-# INLINE (.:) #-} +(.:) :: IsStream t => a -> t m a -> t m a +(.:) = cons + +-- | An empty stream. +-- +-- @ +-- > toList nil +-- [] +-- @ +-- +-- @since 0.1.0 +{-# INLINE_NORMAL nil #-} +nil :: IsStream t => t m a +nil = mkStream $ \_ _ _ stp -> stp + +-- | An empty stream producing a side effect. 
+--
+-- @
+-- > toList (nilM (print "nil"))
+-- "nil"
+-- []
+-- @
+--
+-- /Pre-release/
+{-# INLINE_NORMAL nilM #-}
+nilM :: (IsStream t, Monad m) => m b -> t m a
+nilM m = mkStream $ \_ _ _ stp -> m >> stp
+
+-- | A stream of exactly one pure value: the value is passed straight to the
+-- singleton continuation.
+{-# INLINE_NORMAL yield #-}
+yield :: IsStream t => a -> t m a
+yield a = mkStream $ \_ _ single _ -> single a
+
+-- | A stream of exactly one value produced by a monadic action: the action is
+-- run and its result is passed to the singleton continuation.
+{-# INLINE_NORMAL yieldM #-}
+yieldM :: (Monad m, IsStream t) => m a -> t m a
+yieldM m = fromStream $ mkStream $ \_ _ single _ -> m >>= single
+
+-- XXX specialize to IO?
+--
+-- Prepend the singleton stream built from action @m@ to stream @r@, joining
+-- the two with the supplied combining function @f@.
+{-# INLINE consMBy #-}
+consMBy :: (IsStream t, MonadAsync m) => (t m a -> t m a -> t m a)
+    -> m a -> t m a -> t m a
+consMBy f m r = (fromStream $ yieldM m) `f` r
+
+------------------------------------------------------------------------------
+-- Folding a stream
+------------------------------------------------------------------------------
+
+-- | Fold a stream by providing an SVar, a stop continuation, a singleton
+-- continuation and a yield continuation. The stream would share the current
+-- SVar passed via the State.
+--
+-- Note the argument order: state, yield continuation, singleton
+-- continuation, stop continuation, stream. The state is handed to the stream
+-- unchanged.
+{-# INLINE_EARLY foldStreamShared #-}
+foldStreamShared
+    :: IsStream t
+    => State Stream m a
+    -> (a -> t m a -> m r)
+    -> (a -> m r)
+    -> m r
+    -> t m a
+    -> m r
+foldStreamShared st yld sng stp m =
+    -- The yield continuation receives the rest of the stream as a 'Stream';
+    -- convert it back to the caller's stream type before calling @yld@.
+    let yieldk a x = yld a (fromStream x)
+        MkStream k = toStream m
+     in k st yieldk sng stp
+
+-- XXX write a similar rule for foldStream as well?
+--
+-- 'foldStreamSharedStream' is 'foldStreamShared' at the concrete 'Stream'
+-- type: the yield continuation is passed through without any 'fromStream'
+-- wrapping. The rule below substitutes it where the types match.
+{-# RULES "foldStreamShared from stream"
+   foldStreamShared = foldStreamSharedStream #-}
+foldStreamSharedStream
+    :: State Stream m a
+    -> (a -> Stream m a -> m r)
+    -> (a -> m r)
+    -> m r
+    -> Stream m a
+    -> m r
+foldStreamSharedStream st yld sng stp m =
+    let MkStream k = toStream m
+     in k st yld sng stp
+
+-- | Fold a stream by providing a State, stop continuation, a singleton
+-- continuation and a yield continuation. The stream will not use the SVar
+-- passed via State.
+{-# INLINE foldStream #-} +foldStream + :: IsStream t + => State Stream m a + -> (a -> t m a -> m r) + -> (a -> m r) + -> m r + -> t m a + -> m r +foldStream st yld sng stp m = + let yieldk a x = yld a (fromStream x) + MkStream k = toStream m + in k (adaptState st) yieldk sng stp + +------------------------------------------------------------------------------- +-- Instances +------------------------------------------------------------------------------- + +-- NOTE: specializing the function outside the instance definition seems to +-- improve performance quite a bit at times, even if we have the same +-- SPECIALIZE in the instance definition. +{-# INLINE consMStream #-} +{-# SPECIALIZE consMStream :: IO a -> Stream IO a -> Stream IO a #-} +consMStream :: (Monad m) => m a -> Stream m a -> Stream m a +consMStream m r = MkStream $ \_ yld _ _ -> m >>= \a -> yld a r + +------------------------------------------------------------------------------- +-- IsStream Stream +------------------------------------------------------------------------------- + +instance IsStream Stream where + toStream = id + fromStream = id + + {-# INLINE consM #-} + {-# SPECIALIZE consM :: IO a -> Stream IO a -> Stream IO a #-} + consM :: Monad m => m a -> Stream m a -> Stream m a + consM = consMStream + + {-# INLINE (|:) #-} + {-# SPECIALIZE (|:) :: IO a -> Stream IO a -> Stream IO a #-} + (|:) :: Monad m => m a -> Stream m a -> Stream m a + (|:) = consMStream + +------------------------------------------------------------------------------- +-- foldr/build fusion +------------------------------------------------------------------------------- + +-- XXX perhaps we can just use foldrSM/buildM everywhere as they are more +-- general and cover foldrS/buildS as well. + +-- | The function 'f' decides how to reconstruct the stream. We could +-- reconstruct using a shared state (SVar) or without sharing the state. +-- +{-# INLINE foldrSWith #-} +foldrSWith :: IsStream t + => (forall r. 
State Stream m b + -> (b -> t m b -> m r) + -> (b -> m r) + -> m r + -> t m b + -> m r) + -> (a -> t m b -> t m b) -> t m b -> t m a -> t m b +foldrSWith f step final m = go m + where + go m1 = mkStream $ \st yld sng stp -> + let run x = f st yld sng stp x + stop = run final + single a = run $ step a final + yieldk a r = run $ step a (go r) + -- XXX if type a and b are the same we do not need adaptState, can we + -- save some perf with that? + -- XXX since we are using adaptState anyway here we can use + -- foldStreamShared instead, will that save some perf? + in foldStream (adaptState st) yieldk single stop m1 + +-- XXX we can use rewrite rules just for foldrSWith, if the function f is the +-- same we can rewrite it. + +-- | Fold sharing the SVar state within the reconstructed stream +{-# INLINE_NORMAL foldrSShared #-} +foldrSShared :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b +foldrSShared = foldrSWith foldStreamShared + +-- XXX consM is a typeclass method, therefore rewritten already. Instead maybe +-- we can make consM polymorphic using rewrite rules. +-- {-# RULES "foldrSShared/id" foldrSShared consM nil = \x -> x #-} +{-# RULES "foldrSShared/nil" + forall k z. foldrSShared k z nil = z #-} +{-# RULES "foldrSShared/single" + forall k z x. foldrSShared k z (yield x) = k x z #-} +-- {-# RULES "foldrSShared/app" [1] +-- forall ys. foldrSShared consM ys = \xs -> xs `conjoin` ys #-} + +-- | Lazy right associative fold to a stream. +{-# INLINE_NORMAL foldrS #-} +foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b +foldrS = foldrSWith foldStream + +{-# RULES "foldrS/id" foldrS cons nil = \x -> x #-} +{-# RULES "foldrS/nil" forall k z. foldrS k z nil = z #-} +-- See notes in GHC.Base about this rule +-- {-# RULES "foldr/cons" +-- forall k z x xs. foldrS k z (x `cons` xs) = k x (foldrS k z xs) #-} +{-# RULES "foldrS/single" forall k z x. foldrS k z (yield x) = k x z #-} +-- {-# RULES "foldrS/app" [1] +-- forall ys. 
foldrS cons ys = \xs -> xs `conjoin` ys #-} + +------------------------------------------------------------------------------- +-- foldrS with monadic cons i.e. consM +------------------------------------------------------------------------------- + +{-# INLINE foldrSMWith #-} +foldrSMWith :: (IsStream t, Monad m) + => (forall r. State Stream m b + -> (b -> t m b -> m r) + -> (b -> m r) + -> m r + -> t m b + -> m r) + -> (m a -> t m b -> t m b) -> t m b -> t m a -> t m b +foldrSMWith f step final m = go m + where + go m1 = mkStream $ \st yld sng stp -> + let run x = f st yld sng stp x + stop = run final + single a = run $ step (return a) final + yieldk a r = run $ step (return a) (go r) + in foldStream (adaptState st) yieldk single stop m1 + +{-# INLINE_NORMAL foldrSM #-} +foldrSM :: (IsStream t, Monad m) + => (m a -> t m b -> t m b) -> t m b -> t m a -> t m b +foldrSM = foldrSMWith foldStream + +-- {-# RULES "foldrSM/id" foldrSM consM nil = \x -> x #-} +{-# RULES "foldrSM/nil" forall k z. foldrSM k z nil = z #-} +{-# RULES "foldrSM/single" forall k z x. foldrSM k z (yieldM x) = k x z #-} +-- {-# RULES "foldrSM/app" [1] +-- forall ys. foldrSM consM ys = \xs -> xs `conjoin` ys #-} + +-- Like foldrSM but sharing the SVar state within the reconstructed stream. +{-# INLINE_NORMAL foldrSMShared #-} +foldrSMShared :: (IsStream t, Monad m) + => (m a -> t m b -> t m b) -> t m b -> t m a -> t m b +foldrSMShared = foldrSMWith foldStreamShared + +-- {-# RULES "foldrSM/id" foldrSM consM nil = \x -> x #-} +{-# RULES "foldrSMShared/nil" + forall k z. foldrSMShared k z nil = z #-} +{-# RULES "foldrSMShared/single" + forall k z x. foldrSMShared k z (yieldM x) = k x z #-} +-- {-# RULES "foldrSM/app" [1] +-- forall ys. 
foldrSM consM ys = \xs -> xs `conjoin` ys #-} + +------------------------------------------------------------------------------- +-- build +------------------------------------------------------------------------------- + +{-# INLINE_NORMAL build #-} +build :: IsStream t => forall a. (forall b. (a -> b -> b) -> b -> b) -> t m a +build g = g cons nil + +{-# RULES "foldrM/build" + forall k z (g :: forall b. (a -> b -> b) -> b -> b). + foldrM k z (build g) = g k z #-} + +{-# RULES "foldrS/build" + forall k z (g :: forall b. (a -> b -> b) -> b -> b). + foldrS k z (build g) = g k z #-} + +{-# RULES "foldrS/cons/build" + forall k z x (g :: forall b. (a -> b -> b) -> b -> b). + foldrS k z (x `cons` build g) = k x (g k z) #-} + +{-# RULES "foldrSShared/build" + forall k z (g :: forall b. (a -> b -> b) -> b -> b). + foldrSShared k z (build g) = g k z #-} + +{-# RULES "foldrSShared/cons/build" + forall k z x (g :: forall b. (a -> b -> b) -> b -> b). + foldrSShared k z (x `cons` build g) = k x (g k z) #-} + +-- build a stream by applying cons and nil to a build function +{-# INLINE_NORMAL buildS #-} +buildS :: IsStream t => ((a -> t m a -> t m a) -> t m a -> t m a) -> t m a +buildS g = g cons nil + +{-# RULES "foldrS/buildS" + forall k z (g :: (a -> t m a -> t m a) -> t m a -> t m a). + foldrS k z (buildS g) = g k z #-} + +{-# RULES "foldrS/cons/buildS" + forall k z x (g :: (a -> t m a -> t m a) -> t m a -> t m a). + foldrS k z (x `cons` buildS g) = k x (g k z) #-} + +{-# RULES "foldrSShared/buildS" + forall k z (g :: (a -> t m a -> t m a) -> t m a -> t m a). + foldrSShared k z (buildS g) = g k z #-} + +{-# RULES "foldrSShared/cons/buildS" + forall k z x (g :: (a -> t m a -> t m a) -> t m a -> t m a). 
+ foldrSShared k z (x `cons` buildS g) = k x (g k z) #-} + +-- build a stream by applying consM and nil to a build function +{-# INLINE_NORMAL buildSM #-} +buildSM :: (IsStream t, MonadAsync m) + => ((m a -> t m a -> t m a) -> t m a -> t m a) -> t m a +buildSM g = g consM nil + +{-# RULES "foldrSM/buildSM" + forall k z (g :: (m a -> t m a -> t m a) -> t m a -> t m a). + foldrSM k z (buildSM g) = g k z #-} + +{-# RULES "foldrSMShared/buildSM" + forall k z (g :: (m a -> t m a -> t m a) -> t m a -> t m a). + foldrSMShared k z (buildSM g) = g k z #-} + +-- Disabled because this may not fire as consM is a class Op +{- +{-# RULES "foldrS/consM/buildSM" + forall k z x (g :: (m a -> t m a -> t m a) -> t m a -> t m a) + . foldrSM k z (x `consM` buildSM g) + = k x (g k z) +#-} +-} + +-- Build using monadic build functions (continuations) instead of +-- reconstructing a stream. +{-# INLINE_NORMAL buildM #-} +buildM :: (IsStream t, MonadAsync m) + => (forall r. (a -> t m a -> m r) + -> (a -> m r) + -> m r + -> m r + ) + -> t m a +buildM g = mkStream $ \st yld sng stp -> + g (\a r -> foldStream st yld sng stp (return a `consM` r)) sng stp + +-- | Like 'buildM' but shares the SVar state across computations. +{-# INLINE_NORMAL sharedM #-} +sharedM :: (IsStream t, MonadAsync m) + => (forall r. (a -> t m a -> m r) + -> (a -> m r) + -> m r + -> m r + ) + -> t m a +sharedM g = mkStream $ \st yld sng stp -> + g (\a r -> foldStreamShared st yld sng stp (return a `consM` r)) sng stp + +------------------------------------------------------------------------------- +-- augment +------------------------------------------------------------------------------- + +{-# INLINE_NORMAL augmentS #-} +augmentS :: IsStream t + => ((a -> t m a -> t m a) -> t m a -> t m a) -> t m a -> t m a +augmentS g xs = g cons xs + +{-# RULES "augmentS/nil" + forall (g :: (a -> t m a -> t m a) -> t m a -> t m a). 
+ augmentS g nil = buildS g + #-} + +{-# RULES "foldrS/augmentS" + forall k z xs (g :: (a -> t m a -> t m a) -> t m a -> t m a). + foldrS k z (augmentS g xs) = g k (foldrS k z xs) + #-} + +{-# RULES "augmentS/buildS" + forall (g :: (a -> t m a -> t m a) -> t m a -> t m a) + (h :: (a -> t m a -> t m a) -> t m a -> t m a). + augmentS g (buildS h) = buildS (\c n -> g c (h c n)) + #-} + +{-# INLINE_NORMAL augmentSM #-} +augmentSM :: (IsStream t, MonadAsync m) + => ((m a -> t m a -> t m a) -> t m a -> t m a) -> t m a -> t m a +augmentSM g xs = g consM xs + +{-# RULES "augmentSM/nil" + forall (g :: (m a -> t m a -> t m a) -> t m a -> t m a). + augmentSM g nil = buildSM g + #-} + +{-# RULES "foldrSM/augmentSM" + forall k z xs (g :: (m a -> t m a -> t m a) -> t m a -> t m a). + foldrSM k z (augmentSM g xs) = g k (foldrSM k z xs) + #-} + +{-# RULES "augmentSM/buildSM" + forall (g :: (m a -> t m a -> t m a) -> t m a -> t m a) + (h :: (m a -> t m a -> t m a) -> t m a -> t m a). + augmentSM g (buildSM h) = buildSM (\c n -> g c (h c n)) + #-} + +------------------------------------------------------------------------------- +-- Experimental foldrM/buildM +------------------------------------------------------------------------------- + +-- | Lazy right fold with a monadic step function. 
+{-# INLINE_NORMAL foldrM #-} +foldrM :: IsStream t => (a -> m b -> m b) -> m b -> t m a -> m b +foldrM step acc m = go m + where + go m1 = + let stop = acc + single a = step a acc + yieldk a r = step a (go r) + in foldStream defState yieldk single stop m1 + +{-# INLINE_NORMAL foldrMKWith #-} +foldrMKWith + :: (State Stream m a + -> (a -> t m a -> m b) + -> (a -> m b) + -> m b + -> t m a + -> m b) + -> (a -> m b -> m b) + -> m b + -> ((a -> t m a -> m b) -> (a -> m b) -> m b -> m b) + -> m b +foldrMKWith f step acc g = go g + where + go k = + let stop = acc + single a = step a acc + yieldk a r = step a (go (\yld sng stp -> f defState yld sng stp r)) + in k yieldk single stop + +{- +{-# RULES "foldrM/buildS" + forall k z (g :: (a -> t m a -> t m a) -> t m a -> t m a) + . foldrM k z (buildS g) + = g k z +#-} +-} +-- XXX in which case will foldrM/buildM fusion be useful? +{-# RULES "foldrM/buildM" + forall step acc (g :: (forall r. + (a -> t m a -> m r) + -> (a -> m r) + -> m r + -> m r + )). + foldrM step acc (buildM g) = foldrMKWith foldStream step acc g + #-} + +{-# RULES "foldrM/sharedM" + forall step acc (g :: (forall r. + (a -> t m a -> m r) + -> (a -> m r) + -> m r + -> m r + )). + foldrM step acc (sharedM g) = foldrMKWith foldStreamShared step acc g + #-} + +------------------------------------------------------------------------------ +-- Left fold +------------------------------------------------------------------------------ + +-- | Strict left fold with an extraction function. Like the standard strict +-- left fold, but applies a user supplied extraction function (the third +-- argument) to the folded value at the end. This is designed to work with the +-- @foldl@ library. The suffix @x@ is a mnemonic for extraction. +-- +-- Note that the accumulator is always evaluated including the initial value. +{-# INLINE foldlx' #-} +foldlx' :: forall t m a b x. 
(IsStream t, Monad m) + => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b +foldlx' step begin done m = get $ go m begin + where + {-# NOINLINE get #-} + get :: t m x -> m b + get m1 = + -- XXX we are not strictly evaluating the accumulator here. Is this + -- okay? + let single = return . done + -- XXX this is foldSingleton. why foldStreamShared? + in foldStreamShared undefined undefined single undefined m1 + + -- Note, this can be implemented by making a recursive call to "go", + -- however that is more expensive because of unnecessary recursion + -- that cannot be tail call optimized. Unfolding recursion explicitly via + -- continuations is much more efficient. + go :: t m a -> x -> t m x + go m1 !acc = mkStream $ \_ yld sng _ -> + let stop = sng acc + single a = sng $ step acc a + -- XXX this is foldNonEmptyStream + yieldk a r = foldStream defState yld sng undefined $ + go r (step acc a) + in foldStream defState yieldk single stop m1 + +-- | Strict left associative fold. +{-# INLINE foldl' #-} +foldl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> m b +foldl' step begin = foldlx' step begin id + +------------------------------------------------------------------------------ +-- Semigroup +------------------------------------------------------------------------------ + +infixr 6 `serial` + +-- | Appends two streams sequentially, yielding all elements from the first +-- stream, and then all elements from the second stream. +-- +-- >>> import Streamly.Prelude (serial) +-- >>> stream1 = Stream.fromList [1,2] +-- >>> stream2 = Stream.fromList [3,4] +-- >>> Stream.toList $ stream1 `serial` stream2 +-- [1,2,3,4] +-- +-- This operation can be used to fold an infinite lazy container of streams. 
+-- +-- /Since: 0.2.0 ("Streamly")/ +-- +-- @since 0.8.0 +{-# INLINE serial #-} +serial :: IsStream t => t m a -> t m a -> t m a +-- XXX This doubles the time of toNullAp benchmark, may not be fusing properly +-- serial xs ys = augmentS (\c n -> foldrS c n xs) ys +serial m1 m2 = go m1 + where + go m = mkStream $ \st yld sng stp -> + let stop = foldStream st yld sng stp m2 + single a = yld a m2 + yieldk a r = yld a (go r) + in foldStream st yieldk single stop m + +-- join/merge/append streams depending on consM +{-# INLINE conjoin #-} +conjoin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a +conjoin xs ys = augmentSM (\c n -> foldrSM c n xs) ys + +instance Semigroup (Stream m a) where + (<>) = serial + +------------------------------------------------------------------------------ +-- Monoid +------------------------------------------------------------------------------ + +instance Monoid (Stream m a) where + mempty = nil + mappend = (<>) + +------------------------------------------------------------------------------- +-- Functor +------------------------------------------------------------------------------- + +-- Note eta expanded +{-# INLINE_LATE mapFB #-} +mapFB :: forall (t :: (Type -> Type) -> Type -> Type) b m a. + (b -> t m b -> t m b) -> (a -> b) -> a -> t m b -> t m b +mapFB c f = \x ys -> c (f x) ys +#undef Type + +{-# RULES +"mapFB/mapFB" forall c f g. mapFB (mapFB c f) g = mapFB c (f . g) +"mapFB/id" forall c. mapFB c (\x -> x) = c + #-} + +{-# INLINE map #-} +map :: IsStream t => (a -> b) -> t m a -> t m b +map f xs = buildS (\c n -> foldrS (mapFB c f) n xs) + +-- XXX This definition might potentially be more efficient, but the cost in the +-- benchmark is dominated by unfoldrM cost so we cannot correctly determine +-- differences in the mapping cost. We should perhaps deduct the cost of +-- unfoldrM from the benchmarks and then compare. +{- +map f m = go m + where + go m1 = + mkStream $ \st yld sng stp -> + let single = sng . 
f + yieldk a r = yld (f a) (go r) + in foldStream (adaptState st) yieldk single stp m1 +-} + +{-# INLINE_LATE mapMFB #-} +mapMFB :: Monad m => (m b -> t m b -> t m b) -> (a -> m b) -> m a -> t m b -> t m b +mapMFB c f = \x ys -> c (x >>= f) ys + +-- Fusing nested mapMFB calls: in @mapMFB (mapMFB c f) g@ the action is bound +-- to g first and the result is then bound to f, so the fused function is +-- (g >=> f), mirroring (f . g) in the pure "mapFB/mapFB" rule. +{-# RULES + "mapMFB/mapMFB" forall c f g. mapMFB (mapMFB c f) g = mapMFB c (g >=> f) + #-} +-- XXX These rules may never fire because pure/return type class rules will +-- fire first. +{- +"mapMFB/pure" forall c. mapMFB c (\x -> pure x) = c +"mapMFB/return" forall c. mapMFB c (\x -> return x) = c +-} + +-- Be careful when modifying this, this uses a consM (|:) deliberately to allow +-- other stream types to overload it. +{-# INLINE mapM #-} +mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b +mapM f = foldrSShared (\x xs -> f x `consM` xs) nil +-- See note under map definition above. +{- +mapM f m = go m + where + go m1 = mkStream $ \st yld sng stp -> + let single a = f a >>= sng + yieldk a r = foldStreamShared st yld sng stp $ f a |: go r + in foldStream (adaptState st) yieldk single stp m1 + -} + +-- This is experimental serial version supporting fusion. +-- +-- XXX what if we do not want to fuse two concurrent mapMs? +-- XXX we can combine two concurrent mapM only if the SVar is of the same type +-- So for now we use it only for serial streams. +-- XXX fusion would be easier for monomorphic stream types. +-- {-# RULES "mapM serial" mapM = mapMSerial #-} +{-# INLINE mapMSerial #-} +mapMSerial :: MonadAsync m => (a -> m b) -> Stream m a -> Stream m b +mapMSerial f xs = buildSM (\c n -> foldrSMShared (mapMFB c f) n xs) + +-- XXX in fact use the Stream type everywhere and only use polymorphism in the +-- high level modules/prelude. 
+instance Monad m => Functor (Stream m) where + fmap = map + +------------------------------------------------------------------------------- +-- Transformers +------------------------------------------------------------------------------- + +{- +instance MonadTrans Stream where + {-# INLINE lift #-} + lift = yieldM +-} + +------------------------------------------------------------------------------- +-- Nesting +------------------------------------------------------------------------------- + +-- | Detach a stream from an SVar +{-# INLINE unShare #-} +unShare :: IsStream t => t m a -> t m a +unShare x = mkStream $ \st yld sng stp -> + foldStream st yld sng stp x + +-- XXX the function stream and value stream can run in parallel +{-# INLINE apWith #-} +apWith + :: IsStream t + => (t m b -> t m b -> t m b) + -> t m (a -> b) + -> t m a + -> t m b +apWith par fstream stream = go1 fstream + + where + + go1 m = + mkStream $ \st yld sng stp -> + let foldShared = foldStreamShared st yld sng stp + single f = foldShared $ unShare (go2 f stream) + yieldk f r = foldShared $ unShare (go2 f stream) `par` go1 r + in foldStream (adaptState st) yieldk single stp m + + go2 f m = + mkStream $ \st yld sng stp -> + let single a = sng (f a) + yieldk a r = yld (f a) (go2 f r) + in foldStream (adaptState st) yieldk single stp m + +{-# INLINE apSerial #-} +apSerial + :: IsStream t + => t m (a -> b) + -> t m a + -> t m b +apSerial fstream stream = go1 fstream + + where + + go1 m = + mkStream $ \st yld sng stp -> + let foldShared = foldStreamShared st yld sng stp + single f = foldShared $ go3 f stream + yieldk f r = foldShared $ go2 f r stream + in foldStream (adaptState st) yieldk single stp m + + go2 f r1 m = + mkStream $ \st yld sng stp -> + let foldShared = foldStreamShared st yld sng stp + stop = foldShared $ go1 r1 + single a = yld (f a) (go1 r1) + yieldk a r = yld (f a) (go2 f r1 r) + in foldStream (adaptState st) yieldk single stop m + + go3 f m = + mkStream $ \st yld sng stp -> + 
let single a = sng (f a) + yieldk a r = yld (f a) (go3 f r) + in foldStream (adaptState st) yieldk single stp m + +{-# INLINE apSerialDiscardFst #-} +apSerialDiscardFst + :: IsStream t + => t m a + -> t m b + -> t m b +apSerialDiscardFst fstream stream = go1 fstream + + where + + go1 m = + mkStream $ \st yld sng stp -> + let foldShared = foldStreamShared st yld sng stp + single _ = foldShared $ stream + yieldk _ r = foldShared $ go2 r stream + in foldStream (adaptState st) yieldk single stp m + + go2 r1 m = + mkStream $ \st yld sng stp -> + let foldShared = foldStreamShared st yld sng stp + stop = foldShared $ go1 r1 + single a = yld a (go1 r1) + yieldk a r = yld a (go2 r1 r) + in foldStream st yieldk single stop m + +{-# INLINE apSerialDiscardSnd #-} +apSerialDiscardSnd + :: IsStream t + => t m a + -> t m b + -> t m a +apSerialDiscardSnd fstream stream = go1 fstream + + where + + go1 m = + mkStream $ \st yld sng stp -> + let foldShared = foldStreamShared st yld sng stp + single f = foldShared $ go3 f stream + yieldk f r = foldShared $ go2 f r stream + in foldStream st yieldk single stp m + + go2 f r1 m = + mkStream $ \st yld sng stp -> + let foldShared = foldStreamShared st yld sng stp + stop = foldShared $ go1 r1 + single _ = yld f (go1 r1) + yieldk _ r = yld f (go2 f r1 r) + in foldStream (adaptState st) yieldk single stop m + + go3 f m = + mkStream $ \st yld sng stp -> + let single _ = sng f + yieldk _ r = yld f (go3 f r) + in foldStream (adaptState st) yieldk single stp m + +-- XXX This is just concatMapBy with arguments flipped. We need to keep this +-- instead of using a concatMap style definition because the bind +-- implementation in Async and WAsync streams show significant perf degradation +-- if the argument order is changed. 
+{-# INLINE bindWith #-} +bindWith + :: IsStream t + => (t m b -> t m b -> t m b) + -> t m a + -> (a -> t m b) + -> t m b +bindWith par m1 f = go m1 + where + go m = + mkStream $ \st yld sng stp -> + let foldShared = foldStreamShared st yld sng stp + single a = foldShared $ unShare (f a) + yieldk a r = foldShared $ unShare (f a) `par` go r + in foldStream (adaptState st) yieldk single stp m + +-- XXX express in terms of foldrS? +-- XXX can we use a different stream type for the generated stream being +-- flattened so that we can combine them differently and keep the resulting +-- stream different? +-- XXX do we need specialize to IO? +-- XXX can we optimize when c and a are same, by removing the forall using +-- rewrite rules with type applications? + +-- | Perform a 'concatMap' using a specified concat strategy. The first +-- argument specifies a merge or concat function that is used to merge the +-- streams generated by the map function. For example, the concat function +-- could be 'serial', 'parallel', 'async', 'ahead' or any other zip or merge +-- function. +-- +-- @since 0.7.0 +{-# INLINE concatMapBy #-} +concatMapBy + :: IsStream t + => (t m b -> t m b -> t m b) + -> (a -> t m b) + -> t m a + -> t m b +concatMapBy par f xs = bindWith par xs f + +{-# INLINE concatMap #-} +concatMap :: IsStream t => (a -> t m b) -> t m a -> t m b +concatMap f m = fromStream $ + concatMapBy serial + (\a -> adapt $ toStream $ f a) + (adapt $ toStream m) + +{- +-- Fused version. +-- XXX This fuses but when the stream is nil this performs poorly. +-- The filterAllOut benchmark degrades. Need to investigate and fix that. 
+{-# INLINE concatMap #-} +concatMap :: IsStream t => (a -> t m b) -> t m a -> t m b +concatMap f xs = buildS + (\c n -> foldrS (\x b -> foldrS c b (f x)) n xs) + +-- Stream polymorphic concatMap implementation +-- XXX need to use buildSM/foldrSMShared for parallel behavior +-- XXX unShare seems to degrade the fused performance +{-# INLINE_EARLY concatMap_ #-} +concatMap_ :: IsStream t => (a -> t m b) -> t m a -> t m b +concatMap_ f xs = buildS + (\c n -> foldrSShared (\x b -> foldrSShared c b (unShare $ f x)) n xs) +-} + +-- | See 'Streamly.Internal.Data.Stream.IsStream.concatPairsWith' for +-- documentation. +-- +{-# INLINE concatPairsWith #-} +concatPairsWith + :: IsStream t + => (t m b -> t m b -> t m b) + -> (a -> t m b) + -> t m a + -> t m b +concatPairsWith combine f = go Nothing + + where + + go Nothing stream = + mkStream $ \st yld sng stp -> + let foldShared = foldStreamShared st yld sng stp + single a = foldShared $ unShare (f a) + yieldk a r = foldShared $ go (Just a) r + in foldStream (adaptState st) yieldk single stp stream + go (Just a1) stream = + mkStream $ \st yld sng stp -> + let foldShared = foldStreamShared st yld sng stp + stop = foldShared $ unShare (f a1) + single a = foldShared $ unShare (f a1) `combine` f a + yieldk a r = + foldShared + $ concatPairsWith combine + (\(x,y) -> combine (unShare x) y) + $ (f a1, f a) `cons` makePairs Nothing r + in foldStream (adaptState st) yieldk single stop stream + + makePairs Nothing stream = + mkStream $ \st yld sng stp -> + let foldShared = foldStreamShared st yld sng stp + single a = sng (f a, nil) + yieldk a r = foldShared $ makePairs (Just a) r + in foldStream (adaptState st) yieldk single stp stream + makePairs (Just a1) stream = + mkStream $ \st yld sng _ -> + let stop = sng (f a1, nil) + single a = sng (f a1, f a) + yieldk a r = yld (f a1, f a) (makePairs Nothing r) + in foldStream (adaptState st) yieldk single stop stream + +instance Monad m => Applicative (Stream m) where + {-# INLINE pure #-} + 
pure = yield + {-# INLINE (<*>) #-} + (<*>) = ap + +-- NOTE: even though concatMap for StreamD is 3x faster compared to StreamK, +-- the monad instance of StreamD is slower than StreamK after foldr/build +-- fusion. +instance Monad m => Monad (Stream m) where + {-# INLINE return #-} + return = pure + {-# INLINE (>>=) #-} + (>>=) = flip concatMap + +{- +-- Like concatMap but generates stream using an unfold function. Similar to +-- unfoldMany but for StreamK. +concatUnfoldr :: IsStream t + => (b -> t m (Maybe (a, b))) -> t m b -> t m a +concatUnfoldr = undefined +-} +{-# INLINE unfoldrM #-} +unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a +unfoldrM step = go + where + go s = sharedM $ \yld _ stp -> do + r <- step s + case r of + Just (a, b) -> yld a (go b) + Nothing -> stp + +{-# INLINE drain #-} +drain :: (Monad m, IsStream t) => t m a -> m () +drain = foldrM (\_ xs -> xs) (return ()) diff --git a/testsuite/tests/profiling/should_compile/T19894/Unfold.hs b/testsuite/tests/profiling/should_compile/T19894/Unfold.hs new file mode 100644 index 0000000000..838c8bf344 --- /dev/null +++ b/testsuite/tests/profiling/should_compile/T19894/Unfold.hs @@ -0,0 +1,65 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE TupleSections #-} +{-# LANGUAGE ExistentialQuantification #-} +module Unfold + ( Unfold (..) + , supplyFirst + , many + , lmap + ) + +where + +import Step (Step(..)) +#if defined(FUSION_PLUGIN) +import Fusion.Plugin.Types (Fuse(..)) +#endif + +data Unfold m a b = + -- | @Unfold step inject@ + forall s. Unfold (s -> m (Step s b)) (a -> m s) + +{-# INLINE [1] lmap #-} +lmap :: (a -> c) -> Unfold m c b -> Unfold m a b +lmap f (Unfold ustep uinject) = Unfold ustep (uinject Prelude.. 
f) + +{-# INLINE [1] supplyFirst #-} +supplyFirst :: a -> Unfold m (a, b) c -> Unfold m b c +supplyFirst a = lmap (a, ) + +#if defined(FUSION_PLUGIN) +{-# ANN type ConcatState Fuse #-} +#endif +data ConcatState s1 s2 = ConcatOuter s1 | ConcatInner s1 s2 + +-- | Apply the second unfold to each output element of the first unfold and +-- flatten the output in a single stream. +-- +-- /Since: 0.8.0/ +-- +{-# INLINE [1] many #-} +many :: Monad m => Unfold m a b -> Unfold m b c -> Unfold m a c +many (Unfold step1 inject1) (Unfold step2 inject2) = Unfold step inject + + where + + inject x = do + s <- inject1 x + return $ ConcatOuter s + + {-# INLINE [0] step #-} + step (ConcatOuter st) = do + r <- step1 st + case r of + Yield x s -> do + innerSt <- inject2 x + return $ Skip (ConcatInner s innerSt) + Skip s -> return $ Skip (ConcatOuter s) + Stop -> return Stop + + step (ConcatInner ost ist) = do + r <- step2 ist + return $ case r of + Yield x s -> Yield x (ConcatInner ost s) + Skip s -> Skip (ConcatInner ost s) + Stop -> Skip (ConcatOuter ost) diff --git a/testsuite/tests/profiling/should_compile/T19894/inline.hs b/testsuite/tests/profiling/should_compile/T19894/inline.hs new file mode 100644 index 0000000000..daa7ec659e --- /dev/null +++ b/testsuite/tests/profiling/should_compile/T19894/inline.hs @@ -0,0 +1,27 @@ +-- We use fromStreamK/toStreamK to convert the direct style stream to CPS +-- style. In the first phase we try fusing the fromStreamK/toStreamK using: +-- +-- {-# RULES "fromStreamK/toStreamK fusion" +-- forall s. toStreamK (fromStreamK s) = s #-} +-- +-- If for some reason some of the operations could not be fused then we have +-- fallback rules in the second phase. For example: +-- +-- {-# INLINE_EARLY unfoldr #-} +-- unfoldr :: (Monad m, IsStream t) => (b -> Maybe (a, b)) -> b -> t m a +-- unfoldr step seed = fromStreamS (S.unfoldr step seed) +-- {-# RULES "unfoldr fallback to StreamK" [1] +-- forall a b. 
S.toStreamK (S.unfoldr a b) = K.unfoldr a b #-} +-- +-- Then, fromStreamK/toStreamK are inlined in the last phase: +-- +-- {-# INLINE_LATE toStreamK #-} +-- toStreamK :: Monad m => Stream m a -> K.Stream m a +-- +-- The fallback rules make sure that if we could not fuse the direct style +-- operations then better use the CPS style operation, because unfused direct +-- style would have worse performance than the CPS style ops. + +#define INLINE_EARLY INLINE [2] +#define INLINE_NORMAL INLINE [1] +#define INLINE_LATE INLINE [0] diff --git a/testsuite/tests/profiling/should_compile/all.T b/testsuite/tests/profiling/should_compile/all.T index 6f41f6b4a4..6d16aeff6c 100644 --- a/testsuite/tests/profiling/should_compile/all.T +++ b/testsuite/tests/profiling/should_compile/all.T @@ -10,3 +10,4 @@ test('T12790', [only_ways(['normal']), req_profiling], compile, ['-O -prof']) test('T14931', [when(opsys('mingw32'), skip), only_ways(['normal']), req_profiling], makefile_test, ['T14931']) test('T15108', [only_ways(['normal']), req_profiling], compile, ['-O -prof -fprof-auto']) +test('T19894', [only_ways(['normal']), req_profiling, extra_files(['T19894'])], multimod_compile, ['Main', '-v0 -O2 -prof -fprof-auto -iT19894']) |