diff options
author | José Valim <jose.valim@plataformatec.com.br> | 2017-02-16 13:47:56 +0100 |
---|---|---|
committer | José Valim <jose.valim@plataformatec.com.br> | 2017-02-16 13:49:22 +0100 |
commit | 251045480551cf25d7c231eb87e0aa60d343a311 (patch) | |
tree | eac9d9019f9a1ab2cb7073a12a4d10cf906fce62 | |
parent | a1ccc4cca5d64f939e064b742e6685a601f58709 (diff) | |
download | elixir-251045480551cf25d7c231eb87e0aa60d343a311.tar.gz |
Allow consuming multiple items from suspended enumerable in Stream.transform/3
Closes #5763.
Closes #5772.
-rw-r--r-- | lib/elixir/lib/stream.ex | 36 | ||||
-rw-r--r-- | lib/elixir/test/elixir/stream_test.exs | 8 |
2 files changed, 27 insertions, 17 deletions
diff --git a/lib/elixir/lib/stream.ex b/lib/elixir/lib/stream.ex index 61105ab64..bb44e2c20 100644 --- a/lib/elixir/lib/stream.ex +++ b/lib/elixir/lib/stream.ex @@ -791,16 +791,18 @@ defmodule Stream do do_after(after_fun, user_acc) :erlang.raise(kind, reason, stacktrace) else - {:suspended, [val], next} -> - do_transform_user(val, user_acc, user, fun, :cont, next, inner_acc, inner, after_fun) - {_, [val]} -> - do_transform_user(val, user_acc, user, fun, :halt, next, inner_acc, inner, after_fun) - {_, []} -> - do_transform(user_acc, user, fun, :halt, next, inner_acc, inner, after_fun) + {:suspended, vals, next} -> + do_transform_user(:lists.reverse(vals), user_acc, user, fun, :cont, next, inner_acc, inner, after_fun) + {_, vals} -> + do_transform_user(:lists.reverse(vals), user_acc, user, fun, :halt, next, inner_acc, inner, after_fun) end end - defp do_transform_user(val, user_acc, user, fun, next_op, next, inner_acc, inner, after_fun) do + defp do_transform_user([], user_acc, user, fun, next_op, next, inner_acc, inner, after_fun) do + do_transform(user_acc, user, fun, next_op, next, inner_acc, inner, after_fun) + end + + defp do_transform_user([val | vals], user_acc, user, fun, next_op, next, inner_acc, inner, after_fun) do user.(val, user_acc) catch kind, reason -> @@ -810,20 +812,20 @@ defmodule Stream do :erlang.raise(kind, reason, stacktrace) else {[], user_acc} -> - do_transform(user_acc, user, fun, next_op, next, inner_acc, inner, after_fun) + do_transform_user(vals, user_acc, user, fun, next_op, next, inner_acc, inner, after_fun) {list, user_acc} when is_list(list) -> - do_list_transform(user_acc, user, fun, next_op, next, inner_acc, inner, + do_list_transform(vals, user_acc, user, fun, next_op, next, inner_acc, inner, &Enumerable.List.reduce(list, &1, fun), after_fun) {:halt, user_acc} -> next.({:halt, []}) do_after(after_fun, user_acc) {:halted, elem(inner_acc, 1)} {other, user_acc} -> - do_enum_transform(user_acc, user, fun, next_op, next, inner_acc, inner, + do_enum_transform(vals, user_acc, user, fun, next_op, next, inner_acc, inner, &Enumerable.reduce(other, &1, inner), after_fun) end - defp do_list_transform(user_acc, user, fun, next_op, next, inner_acc, inner, reduce, after_fun) do + defp do_list_transform(vals, user_acc, user, fun, next_op, next, inner_acc, inner, reduce, after_fun) do try do reduce.(inner_acc) catch @@ -834,17 +836,17 @@ defmodule Stream do :erlang.raise(kind, reason, stacktrace) else {:done, acc} -> - do_transform(user_acc, user, fun, next_op, next, {:cont, acc}, inner, after_fun) + do_transform_user(vals, user_acc, user, fun, next_op, next, {:cont, acc}, inner, after_fun) {:halted, acc} -> next.({:halt, []}) do_after(after_fun, user_acc) {:halted, acc} {:suspended, acc, c} -> - {:suspended, acc, &do_list_transform(user_acc, user, fun, next_op, next, &1, inner, c, after_fun)} + {:suspended, acc, &do_list_transform(vals, user_acc, user, fun, next_op, next, &1, inner, c, after_fun)} end end - defp do_enum_transform(user_acc, user, fun, next_op, next, {op, inner_acc}, inner, reduce, after_fun) do + defp do_enum_transform(vals, user_acc, user, fun, next_op, next, {op, inner_acc}, inner, reduce, after_fun) do try do reduce.({op, [:outer | inner_acc]}) catch @@ -857,15 +859,15 @@ defmodule Stream do # Only take into account outer halts when the op is not halt itself. # Otherwise, we were the ones wishing to halt, so we should just stop. {:halted, [:outer | acc]} when op != :halt -> - do_transform(user_acc, user, fun, next_op, next, {:cont, acc}, inner, after_fun) + do_transform_user(vals, user_acc, user, fun, next_op, next, {:cont, acc}, inner, after_fun) {:halted, [_ | acc]} -> next.({:halt, []}) do_after(after_fun, user_acc) {:halted, acc} {:done, [_ | acc]} -> - do_transform(user_acc, user, fun, next_op, next, {:cont, acc}, inner, after_fun) + do_transform_user(vals, user_acc, user, fun, next_op, next, {:cont, acc}, inner, after_fun) {:suspended, [_ | acc], c} -> - {:suspended, acc, &do_enum_transform(user_acc, user, fun, next_op, next, &1, inner, c, after_fun)} + {:suspended, acc, &do_enum_transform(vals, user_acc, user, fun, next_op, next, &1, inner, c, after_fun)} end end diff --git a/lib/elixir/test/elixir/stream_test.exs b/lib/elixir/test/elixir/stream_test.exs index 619efe24f..8af1deb53 100644 --- a/lib/elixir/test/elixir/stream_test.exs +++ b/lib/elixir/test/elixir/stream_test.exs @@ -481,6 +481,14 @@ defmodule StreamTest do assert Process.get(:stream_transform) end + test "transform/3 (via flat_map) handles multiple returns from suspension" do + assert [false] + |> Stream.take(1) + |> Stream.concat([true]) + |> Stream.flat_map(&[&1]) + |> Enum.to_list() == [false, true] + end + test "iterate/2" do stream = Stream.iterate(0, &(&1+2)) assert Enum.take(stream, 5) == [0, 2, 4, 6, 8] |