diff options
author | José Valim <jose.valim@dashbit.co> | 2022-01-05 12:42:31 +0100 |
---|---|---|
committer | José Valim <jose.valim@dashbit.co> | 2022-01-05 12:48:16 +0100 |
commit | 4090e990b7e1d53f7bffbda93a62800df9f3f327 (patch) | |
tree | 704519c25e261a44b1899bc1414b8c617b07243b | |
parent | 578e59e34cf01f9a7294c1c93b0a9826282125be (diff) | |
download | elixir-4090e990b7e1d53f7bffbda93a62800df9f3f327.tar.gz |
Add Stream.transform/5
This function allows the transformation accumulator to
be processed in order to emit the last elements of the
collection.
-rw-r--r-- | lib/elixir/lib/stream.ex | 134 | ||||
-rw-r--r-- | lib/elixir/test/elixir/stream_test.exs | 46 |
2 files changed, 139 insertions, 41 deletions
diff --git a/lib/elixir/lib/stream.ex b/lib/elixir/lib/stream.ex index afe356916..f84d8971a 100644 --- a/lib/elixir/lib/stream.ex +++ b/lib/elixir/lib/stream.ex @@ -856,44 +856,72 @@ defmodule Stream do iex> Enum.to_list(stream) [1001, 1002, 1003] + `Stream.transform/5` further generalizes this function to allow wrapping + around resources. """ @spec transform(Enumerable.t(), acc, fun) :: Enumerable.t() when fun: (element, acc -> {Enumerable.t(), acc} | {:halt, acc}), acc: any def transform(enum, acc, reducer) when is_function(reducer, 2) do - &do_transform(enum, fn -> acc end, reducer, &1, &2, nil) + &do_transform(enum, fn -> acc end, reducer, &1, &2, nil, fn acc -> acc end) end @doc """ - Transforms an existing stream with function-based start and finish. - - The accumulator is only calculated when transformation starts. It also - allows an after function to be given which is invoked when the stream - halts or completes. + Similar to `Stream.transform/5`, except `last_fun` is not supplied. This function can be seen as a combination of `Stream.resource/3` with `Stream.transform/3`. """ - @spec transform(Enumerable.t(), (() -> acc), fun, (acc -> term)) :: Enumerable.t() - when fun: (element, acc -> {Enumerable.t(), acc} | {:halt, acc}), - acc: any + @spec transform(Enumerable.t(), start_fun, reducer, after_fun) :: Enumerable.t() + when start_fun: (() -> acc), + reducer: (element, acc -> {Enumerable.t(), acc} | {:halt, acc}), + after_fun: (acc -> term), + acc: any def transform(enum, start_fun, reducer, after_fun) when is_function(start_fun, 0) and is_function(reducer, 2) and is_function(after_fun, 1) do - &do_transform(enum, start_fun, reducer, &1, &2, after_fun) + &do_transform(enum, start_fun, reducer, &1, &2, nil, after_fun) + end + + @doc """ + Transforms an existing stream with function-based start, last, and after + callbacks. + + Once transformation starts, `start_fun` is invoked to compute the initial + accumulator. Then, for each element in the enumerable, the `reducer` function + is invoked with the element and the accumulator, returning new elements and a + new accumulator, as in `transform/3`. + + Once the collection is done, `last_fun` is invoked with the accumulator to + emit any remaining items. Then `after_fun` is invoked, to close any resource, + but not emitting any new items. `last_fun` is only invoked if the given + enumerable terminates successfully (either because it is done or it halted + itself). `after_fun` is always invoked, therefore `after_fun` must be the + one used for closing resources. + """ + @spec transform(Enumerable.t(), start_fun, reducer, last_fun, after_fun) :: Enumerable.t() + when start_fun: (() -> acc), + reducer: (element, acc -> {Enumerable.t(), acc} | {:halt, acc}), + last_fun: (acc -> {Enumerable.t(), acc} | {:halt, acc}), + after_fun: (acc -> term), + acc: any + def transform(enum, start_fun, reducer, last_fun, after_fun) + when is_function(start_fun, 0) and is_function(reducer, 2) and is_function(last_fun, 1) and + is_function(after_fun, 1) do + &do_transform(enum, start_fun, reducer, &1, &2, last_fun, after_fun) end - defp do_transform(enumerables, user_acc, user, inner_acc, fun, after_fun) do + defp do_transform(enumerables, user_acc, user, inner_acc, fun, last_fun, after_fun) do inner = &do_transform_each(&1, &2, fun) step = &do_transform_step(&1, &2) next = &Enumerable.reduce(enumerables, &1, step) - funs = {user, fun, inner, after_fun} + funs = {user, fun, inner, last_fun, after_fun} do_transform(user_acc.(), :cont, next, inner_acc, funs) end defp do_transform(user_acc, _next_op, next, {:halt, inner_acc}, funs) do - {_, _, _, after_fun} = funs + {_, _, _, _, after_fun} = funs next.({:halt, []}) - do_after(after_fun, user_acc) + after_fun.(user_acc) {:halted, inner_acc} end @@ -901,72 +929,99 @@ defmodule Stream do {:suspended, inner_acc, &do_transform(user_acc, next_op, next, &1, funs)} end - defp do_transform(user_acc, :halt, _next, {_, inner_acc}, funs) do - {_, _, _, after_fun} = funs - do_after(after_fun, user_acc) - {:halted, inner_acc} - end - defp do_transform(user_acc, :cont, next, inner_acc, funs) do - {_, _, _, after_fun} = funs + {_, _, _, _, after_fun} = funs try do next.({:cont, []}) catch kind, reason -> - do_after(after_fun, user_acc) + after_fun.(user_acc) :erlang.raise(kind, reason, __STACKTRACE__) else {:suspended, vals, next} -> do_transform_user(:lists.reverse(vals), user_acc, :cont, next, inner_acc, funs) {_, vals} -> - do_transform_user(:lists.reverse(vals), user_acc, :halt, next, inner_acc, funs) + do_transform_user(:lists.reverse(vals), user_acc, :last, next, inner_acc, funs) + end + end + + defp do_transform(user_acc, :last, next, inner_acc, funs) do + {_, _, _, last_fun, after_fun} = funs + + if last_fun do + try do + last_fun.(user_acc) + catch + kind, reason -> + next.({:halt, []}) + after_fun.(user_acc) + :erlang.raise(kind, reason, __STACKTRACE__) + else + result -> do_transform_result(result, [], :halt, next, inner_acc, funs) + end + else + do_transform(user_acc, :halt, next, inner_acc, funs) end end + defp do_transform(user_acc, :halt, _next, inner_acc, funs) do + {_, _, _, _, after_fun} = funs + after_fun.(user_acc) + {:halted, elem(inner_acc, 1)} + end + defp do_transform_user([], user_acc, next_op, next, inner_acc, funs) do do_transform(user_acc, next_op, next, inner_acc, funs) end defp do_transform_user([val | vals], user_acc, next_op, next, inner_acc, funs) do - {user, fun, inner, after_fun} = funs + {user, _, _, _, after_fun} = funs try do user.(val, user_acc) catch kind, reason -> next.({:halt, []}) - do_after(after_fun, user_acc) + after_fun.(user_acc) :erlang.raise(kind, reason, __STACKTRACE__) else + result -> do_transform_result(result, vals, next_op, next, inner_acc, funs) + end + end + + defp do_transform_result(result, vals, next_op, next, inner_acc, funs) do + {_, fun, inner, _, after_fun} = funs + + case result do {[], user_acc} -> do_transform_user(vals, user_acc, next_op, next, inner_acc, funs) {list, user_acc} when is_list(list) -> reduce = &Enumerable.List.reduce(list, &1, fun) - do_list_transform(vals, user_acc, next_op, next, inner_acc, reduce, funs) + do_transform_inner_list(vals, user_acc, next_op, next, inner_acc, reduce, funs) {:halt, user_acc} -> next.({:halt, []}) - do_after(after_fun, user_acc) + after_fun.(user_acc) {:halted, elem(inner_acc, 1)} {other, user_acc} -> reduce = &Enumerable.reduce(other, &1, inner) - do_enum_transform(vals, user_acc, next_op, next, inner_acc, reduce, funs) + do_transform_inner_enum(vals, user_acc, next_op, next, inner_acc, reduce, funs) end end - defp do_list_transform(vals, user_acc, next_op, next, inner_acc, reduce, funs) do - {_, _, _, after_fun} = funs + defp do_transform_inner_list(vals, user_acc, next_op, next, inner_acc, reduce, funs) do + {_, _, _, _, after_fun} = funs try do reduce.(inner_acc) catch kind, reason -> next.({:halt, []}) - do_after(after_fun, user_acc) + after_fun.(user_acc) :erlang.raise(kind, reason, __STACKTRACE__) else {:done, acc} -> @@ -974,24 +1029,24 @@ defmodule Stream do {:halted, acc} -> next.({:halt, []}) - do_after(after_fun, user_acc) + after_fun.(user_acc) {:halted, acc} {:suspended, acc, continuation} -> - resume = &do_list_transform(vals, user_acc, next_op, next, &1, continuation, funs) + resume = &do_transform_inner_list(vals, user_acc, next_op, next, &1, continuation, funs) {:suspended, acc, resume} end end - defp do_enum_transform(vals, user_acc, next_op, next, {op, inner_acc}, reduce, funs) do - {_, _, _, after_fun} = funs + defp do_transform_inner_enum(vals, user_acc, next_op, next, {op, inner_acc}, reduce, funs) do + {_, _, _, _, after_fun} = funs try do reduce.({op, [:outer | inner_acc]}) catch kind, reason -> next.({:halt, []}) - do_after(after_fun, user_acc) + after_fun.(user_acc) :erlang.raise(kind, reason, __STACKTRACE__) else # Only take into account outer halts when the op is not halt itself. @@ -1001,21 +1056,18 @@ defmodule Stream do {:halted, [_ | acc]} -> next.({:halt, []}) - do_after(after_fun, user_acc) + after_fun.(user_acc) {:halted, acc} {:done, [_ | acc]} -> do_transform_user(vals, user_acc, next_op, next, {:cont, acc}, funs) {:suspended, [_ | acc], continuation} -> - resume = &do_enum_transform(vals, user_acc, next_op, next, &1, continuation, funs) + resume = &do_transform_inner_enum(vals, user_acc, next_op, next, &1, continuation, funs) {:suspended, acc, resume} end end - defp do_after(nil, _user_acc), do: :ok - defp do_after(fun, user_acc), do: fun.(user_acc) - defp do_transform_each(x, [:outer | acc], f) do case f.(x, acc) do {:halt, res} -> {:halt, [:inner | res]} diff --git a/lib/elixir/test/elixir/stream_test.exs b/lib/elixir/test/elixir/stream_test.exs index 71c5187e6..5023dcb7b 100644 --- a/lib/elixir/test/elixir/stream_test.exs +++ b/lib/elixir/test/elixir/stream_test.exs @@ -1000,6 +1000,52 @@ defmodule StreamTest do assert Process.get(:stream_transform) end + test "transform/5 emits last elements on done" do + stream = + Stream.transform( + 1..5//2, + fn -> 0 end, + fn i, _acc -> {i..(i + 1), i+1} end, + fn 6 -> {7..10, 10} end, + fn i when is_integer(i) -> Process.put(__MODULE__, i) end + ) + + assert Enum.to_list(stream) == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + assert Process.get(__MODULE__) == 10 + + assert Enum.take(stream, 3) == [1, 2, 3] + assert Process.get(__MODULE__) == 4 + + assert Enum.take(stream, 4) == [1, 2, 3, 4] + assert Process.get(__MODULE__) == 4 + + assert Enum.take(stream, 7) == [1, 2, 3, 4, 5, 6, 7] + assert Process.get(__MODULE__) == 10 + end + + test "transform/5 emits last elements on inner halt done" do + stream = + Stream.transform( + Stream.take(1..15//2, 3), + fn -> 0 end, + fn i, _acc -> {i..(i + 1), i+1} end, + fn 6 -> {7..10, 10} end, + fn i when is_integer(i) -> Process.put(__MODULE__, i) end + ) + + assert Enum.to_list(stream) == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + assert Process.get(__MODULE__) == 10 + + assert Enum.take(stream, 3) == [1, 2, 3] + assert Process.get(__MODULE__) == 4 + + assert Enum.take(stream, 4) == [1, 2, 3, 4] + assert Process.get(__MODULE__) == 4 + + assert Enum.take(stream, 7) == [1, 2, 3, 4, 5, 6, 7] + assert Process.get(__MODULE__) == 10 + end + test "scan/2" do stream = Stream.scan(1..5, &(&1 + &2)) assert lazy?(stream) |