summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosé Valim <jose.valim@dashbit.co>2022-01-05 12:42:31 +0100
committerJosé Valim <jose.valim@dashbit.co>2022-01-05 12:48:16 +0100
commit4090e990b7e1d53f7bffbda93a62800df9f3f327 (patch)
tree704519c25e261a44b1899bc1414b8c617b07243b
parent578e59e34cf01f9a7294c1c93b0a9826282125be (diff)
downloadelixir-4090e990b7e1d53f7bffbda93a62800df9f3f327.tar.gz
Add Stream.transform/5
This function allows the transformation accumulator to be processed in order to emit the last elements of the collection.
-rw-r--r--lib/elixir/lib/stream.ex134
-rw-r--r--lib/elixir/test/elixir/stream_test.exs46
2 files changed, 139 insertions, 41 deletions
diff --git a/lib/elixir/lib/stream.ex b/lib/elixir/lib/stream.ex
index afe356916..f84d8971a 100644
--- a/lib/elixir/lib/stream.ex
+++ b/lib/elixir/lib/stream.ex
@@ -856,44 +856,72 @@ defmodule Stream do
iex> Enum.to_list(stream)
[1001, 1002, 1003]
+ `Stream.transform/5` further generalizes this function to allow wrapping
+ around resources.
"""
@spec transform(Enumerable.t(), acc, fun) :: Enumerable.t()
when fun: (element, acc -> {Enumerable.t(), acc} | {:halt, acc}),
acc: any
def transform(enum, acc, reducer) when is_function(reducer, 2) do
- &do_transform(enum, fn -> acc end, reducer, &1, &2, nil)
+ &do_transform(enum, fn -> acc end, reducer, &1, &2, nil, fn acc -> acc end)
end
@doc """
- Transforms an existing stream with function-based start and finish.
-
- The accumulator is only calculated when transformation starts. It also
- allows an after function to be given which is invoked when the stream
- halts or completes.
+ Similar to `Stream.transform/5`, except `last_fun` is not supplied.
This function can be seen as a combination of `Stream.resource/3` with
`Stream.transform/3`.
"""
- @spec transform(Enumerable.t(), (() -> acc), fun, (acc -> term)) :: Enumerable.t()
- when fun: (element, acc -> {Enumerable.t(), acc} | {:halt, acc}),
- acc: any
+ @spec transform(Enumerable.t(), start_fun, reducer, after_fun) :: Enumerable.t()
+ when start_fun: (() -> acc),
+ reducer: (element, acc -> {Enumerable.t(), acc} | {:halt, acc}),
+ after_fun: (acc -> term),
+ acc: any
def transform(enum, start_fun, reducer, after_fun)
when is_function(start_fun, 0) and is_function(reducer, 2) and is_function(after_fun, 1) do
- &do_transform(enum, start_fun, reducer, &1, &2, after_fun)
+ &do_transform(enum, start_fun, reducer, &1, &2, nil, after_fun)
+ end
+
+ @doc """
+ Transforms an existing stream with function-based start, last, and after
+ callbacks.
+
+ Once transformation starts, `start_fun` is invoked to compute the initial
+ accumulator. Then, for each element in the enumerable, the `reducer` function
+ is invoked with the element and the accumulator, returning new elements and a
+ new accumulator, as in `transform/3`.
+
+ Once the collection is done, `last_fun` is invoked with the accumulator to
+ emit any remaining items. Then `after_fun` is invoked, to close any resource,
+ but not emitting any new items. `last_fun` is only invoked if the given
+ enumerable terminates successfully (either because it is done or it halted
+ itself). `after_fun` is always invoked, therefore `after_fun` must be the
+ one used for closing resources.
+ """
+ @spec transform(Enumerable.t(), start_fun, reducer, last_fun, after_fun) :: Enumerable.t()
+ when start_fun: (() -> acc),
+ reducer: (element, acc -> {Enumerable.t(), acc} | {:halt, acc}),
+ last_fun: (acc -> {Enumerable.t(), acc} | {:halt, acc}),
+ after_fun: (acc -> term),
+ acc: any
+ def transform(enum, start_fun, reducer, last_fun, after_fun)
+ when is_function(start_fun, 0) and is_function(reducer, 2) and is_function(last_fun, 1) and
+ is_function(after_fun, 1) do
+ &do_transform(enum, start_fun, reducer, &1, &2, last_fun, after_fun)
end
- defp do_transform(enumerables, user_acc, user, inner_acc, fun, after_fun) do
+ defp do_transform(enumerables, user_acc, user, inner_acc, fun, last_fun, after_fun) do
inner = &do_transform_each(&1, &2, fun)
step = &do_transform_step(&1, &2)
next = &Enumerable.reduce(enumerables, &1, step)
- funs = {user, fun, inner, after_fun}
+ funs = {user, fun, inner, last_fun, after_fun}
do_transform(user_acc.(), :cont, next, inner_acc, funs)
end
defp do_transform(user_acc, _next_op, next, {:halt, inner_acc}, funs) do
- {_, _, _, after_fun} = funs
+ {_, _, _, _, after_fun} = funs
next.({:halt, []})
- do_after(after_fun, user_acc)
+ after_fun.(user_acc)
{:halted, inner_acc}
end
@@ -901,72 +929,99 @@ defmodule Stream do
{:suspended, inner_acc, &do_transform(user_acc, next_op, next, &1, funs)}
end
- defp do_transform(user_acc, :halt, _next, {_, inner_acc}, funs) do
- {_, _, _, after_fun} = funs
- do_after(after_fun, user_acc)
- {:halted, inner_acc}
- end
-
defp do_transform(user_acc, :cont, next, inner_acc, funs) do
- {_, _, _, after_fun} = funs
+ {_, _, _, _, after_fun} = funs
try do
next.({:cont, []})
catch
kind, reason ->
- do_after(after_fun, user_acc)
+ after_fun.(user_acc)
:erlang.raise(kind, reason, __STACKTRACE__)
else
{:suspended, vals, next} ->
do_transform_user(:lists.reverse(vals), user_acc, :cont, next, inner_acc, funs)
{_, vals} ->
- do_transform_user(:lists.reverse(vals), user_acc, :halt, next, inner_acc, funs)
+ do_transform_user(:lists.reverse(vals), user_acc, :last, next, inner_acc, funs)
+ end
+ end
+
+ defp do_transform(user_acc, :last, next, inner_acc, funs) do
+ {_, _, _, last_fun, after_fun} = funs
+
+ if last_fun do
+ try do
+ last_fun.(user_acc)
+ catch
+ kind, reason ->
+ next.({:halt, []})
+ after_fun.(user_acc)
+ :erlang.raise(kind, reason, __STACKTRACE__)
+ else
+ result -> do_transform_result(result, [], :halt, next, inner_acc, funs)
+ end
+ else
+ do_transform(user_acc, :halt, next, inner_acc, funs)
end
end
+ defp do_transform(user_acc, :halt, _next, inner_acc, funs) do
+ {_, _, _, _, after_fun} = funs
+ after_fun.(user_acc)
+ {:halted, elem(inner_acc, 1)}
+ end
+
defp do_transform_user([], user_acc, next_op, next, inner_acc, funs) do
do_transform(user_acc, next_op, next, inner_acc, funs)
end
defp do_transform_user([val | vals], user_acc, next_op, next, inner_acc, funs) do
- {user, fun, inner, after_fun} = funs
+ {user, _, _, _, after_fun} = funs
try do
user.(val, user_acc)
catch
kind, reason ->
next.({:halt, []})
- do_after(after_fun, user_acc)
+ after_fun.(user_acc)
:erlang.raise(kind, reason, __STACKTRACE__)
else
+ result -> do_transform_result(result, vals, next_op, next, inner_acc, funs)
+ end
+ end
+
+ defp do_transform_result(result, vals, next_op, next, inner_acc, funs) do
+ {_, fun, inner, _, after_fun} = funs
+
+ case result do
{[], user_acc} ->
do_transform_user(vals, user_acc, next_op, next, inner_acc, funs)
{list, user_acc} when is_list(list) ->
reduce = &Enumerable.List.reduce(list, &1, fun)
- do_list_transform(vals, user_acc, next_op, next, inner_acc, reduce, funs)
+ do_transform_inner_list(vals, user_acc, next_op, next, inner_acc, reduce, funs)
{:halt, user_acc} ->
next.({:halt, []})
- do_after(after_fun, user_acc)
+ after_fun.(user_acc)
{:halted, elem(inner_acc, 1)}
{other, user_acc} ->
reduce = &Enumerable.reduce(other, &1, inner)
- do_enum_transform(vals, user_acc, next_op, next, inner_acc, reduce, funs)
+ do_transform_inner_enum(vals, user_acc, next_op, next, inner_acc, reduce, funs)
end
end
- defp do_list_transform(vals, user_acc, next_op, next, inner_acc, reduce, funs) do
- {_, _, _, after_fun} = funs
+ defp do_transform_inner_list(vals, user_acc, next_op, next, inner_acc, reduce, funs) do
+ {_, _, _, _, after_fun} = funs
try do
reduce.(inner_acc)
catch
kind, reason ->
next.({:halt, []})
- do_after(after_fun, user_acc)
+ after_fun.(user_acc)
:erlang.raise(kind, reason, __STACKTRACE__)
else
{:done, acc} ->
@@ -974,24 +1029,24 @@ defmodule Stream do
{:halted, acc} ->
next.({:halt, []})
- do_after(after_fun, user_acc)
+ after_fun.(user_acc)
{:halted, acc}
{:suspended, acc, continuation} ->
- resume = &do_list_transform(vals, user_acc, next_op, next, &1, continuation, funs)
+ resume = &do_transform_inner_list(vals, user_acc, next_op, next, &1, continuation, funs)
{:suspended, acc, resume}
end
end
- defp do_enum_transform(vals, user_acc, next_op, next, {op, inner_acc}, reduce, funs) do
- {_, _, _, after_fun} = funs
+ defp do_transform_inner_enum(vals, user_acc, next_op, next, {op, inner_acc}, reduce, funs) do
+ {_, _, _, _, after_fun} = funs
try do
reduce.({op, [:outer | inner_acc]})
catch
kind, reason ->
next.({:halt, []})
- do_after(after_fun, user_acc)
+ after_fun.(user_acc)
:erlang.raise(kind, reason, __STACKTRACE__)
else
# Only take into account outer halts when the op is not halt itself.
@@ -1001,21 +1056,18 @@ defmodule Stream do
{:halted, [_ | acc]} ->
next.({:halt, []})
- do_after(after_fun, user_acc)
+ after_fun.(user_acc)
{:halted, acc}
{:done, [_ | acc]} ->
do_transform_user(vals, user_acc, next_op, next, {:cont, acc}, funs)
{:suspended, [_ | acc], continuation} ->
- resume = &do_enum_transform(vals, user_acc, next_op, next, &1, continuation, funs)
+ resume = &do_transform_inner_enum(vals, user_acc, next_op, next, &1, continuation, funs)
{:suspended, acc, resume}
end
end
- defp do_after(nil, _user_acc), do: :ok
- defp do_after(fun, user_acc), do: fun.(user_acc)
-
defp do_transform_each(x, [:outer | acc], f) do
case f.(x, acc) do
{:halt, res} -> {:halt, [:inner | res]}
diff --git a/lib/elixir/test/elixir/stream_test.exs b/lib/elixir/test/elixir/stream_test.exs
index 71c5187e6..5023dcb7b 100644
--- a/lib/elixir/test/elixir/stream_test.exs
+++ b/lib/elixir/test/elixir/stream_test.exs
@@ -1000,6 +1000,52 @@ defmodule StreamTest do
assert Process.get(:stream_transform)
end
+ test "transform/5 emits last elements on done" do
+ stream =
+ Stream.transform(
+ 1..5//2,
+ fn -> 0 end,
+ fn i, _acc -> {i..(i + 1), i+1} end,
+ fn 6 -> {7..10, 10} end,
+ fn i when is_integer(i) -> Process.put(__MODULE__, i) end
+ )
+
+ assert Enum.to_list(stream) == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
+ assert Process.get(__MODULE__) == 10
+
+ assert Enum.take(stream, 3) == [1, 2, 3]
+ assert Process.get(__MODULE__) == 4
+
+ assert Enum.take(stream, 4) == [1, 2, 3, 4]
+ assert Process.get(__MODULE__) == 4
+
+ assert Enum.take(stream, 7) == [1, 2, 3, 4, 5, 6, 7]
+ assert Process.get(__MODULE__) == 10
+ end
+
+ test "transform/5 emits last elements on inner halt done" do
+ stream =
+ Stream.transform(
+ Stream.take(1..15//2, 3),
+ fn -> 0 end,
+ fn i, _acc -> {i..(i + 1), i+1} end,
+ fn 6 -> {7..10, 10} end,
+ fn i when is_integer(i) -> Process.put(__MODULE__, i) end
+ )
+
+ assert Enum.to_list(stream) == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
+ assert Process.get(__MODULE__) == 10
+
+ assert Enum.take(stream, 3) == [1, 2, 3]
+ assert Process.get(__MODULE__) == 4
+
+ assert Enum.take(stream, 4) == [1, 2, 3, 4]
+ assert Process.get(__MODULE__) == 4
+
+ assert Enum.take(stream, 7) == [1, 2, 3, 4, 5, 6, 7]
+ assert Process.get(__MODULE__) == 10
+ end
+
test "scan/2" do
stream = Stream.scan(1..5, &(&1 + &2))
assert lazy?(stream)