summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosé Valim <jose.valim@plataformatec.com.br>2017-02-16 13:47:56 +0100
committerJosé Valim <jose.valim@plataformatec.com.br>2017-02-16 13:49:22 +0100
commit251045480551cf25d7c231eb87e0aa60d343a311 (patch)
treeeac9d9019f9a1ab2cb7073a12a4d10cf906fce62
parenta1ccc4cca5d64f939e064b742e6685a601f58709 (diff)
downloadelixir-251045480551cf25d7c231eb87e0aa60d343a311.tar.gz
Allow consuming multiple items from suspended enumerable in Stream.transform/3
Closes #5763. Closes #5772.
-rw-r--r--lib/elixir/lib/stream.ex36
-rw-r--r--lib/elixir/test/elixir/stream_test.exs8
2 files changed, 27 insertions, 17 deletions
diff --git a/lib/elixir/lib/stream.ex b/lib/elixir/lib/stream.ex
index 61105ab64..bb44e2c20 100644
--- a/lib/elixir/lib/stream.ex
+++ b/lib/elixir/lib/stream.ex
@@ -791,16 +791,18 @@ defmodule Stream do
do_after(after_fun, user_acc)
:erlang.raise(kind, reason, stacktrace)
else
- {:suspended, [val], next} ->
- do_transform_user(val, user_acc, user, fun, :cont, next, inner_acc, inner, after_fun)
- {_, [val]} ->
- do_transform_user(val, user_acc, user, fun, :halt, next, inner_acc, inner, after_fun)
- {_, []} ->
- do_transform(user_acc, user, fun, :halt, next, inner_acc, inner, after_fun)
+ {:suspended, vals, next} ->
+ do_transform_user(:lists.reverse(vals), user_acc, user, fun, :cont, next, inner_acc, inner, after_fun)
+ {_, vals} ->
+ do_transform_user(:lists.reverse(vals), user_acc, user, fun, :halt, next, inner_acc, inner, after_fun)
end
end
- defp do_transform_user(val, user_acc, user, fun, next_op, next, inner_acc, inner, after_fun) do
+ defp do_transform_user([], user_acc, user, fun, next_op, next, inner_acc, inner, after_fun) do
+ do_transform(user_acc, user, fun, next_op, next, inner_acc, inner, after_fun)
+ end
+
+ defp do_transform_user([val | vals], user_acc, user, fun, next_op, next, inner_acc, inner, after_fun) do
user.(val, user_acc)
catch
kind, reason ->
@@ -810,20 +812,20 @@ defmodule Stream do
:erlang.raise(kind, reason, stacktrace)
else
{[], user_acc} ->
- do_transform(user_acc, user, fun, next_op, next, inner_acc, inner, after_fun)
+ do_transform_user(vals, user_acc, user, fun, next_op, next, inner_acc, inner, after_fun)
{list, user_acc} when is_list(list) ->
- do_list_transform(user_acc, user, fun, next_op, next, inner_acc, inner,
+ do_list_transform(vals, user_acc, user, fun, next_op, next, inner_acc, inner,
&Enumerable.List.reduce(list, &1, fun), after_fun)
{:halt, user_acc} ->
next.({:halt, []})
do_after(after_fun, user_acc)
{:halted, elem(inner_acc, 1)}
{other, user_acc} ->
- do_enum_transform(user_acc, user, fun, next_op, next, inner_acc, inner,
+ do_enum_transform(vals, user_acc, user, fun, next_op, next, inner_acc, inner,
&Enumerable.reduce(other, &1, inner), after_fun)
end
- defp do_list_transform(user_acc, user, fun, next_op, next, inner_acc, inner, reduce, after_fun) do
+ defp do_list_transform(vals, user_acc, user, fun, next_op, next, inner_acc, inner, reduce, after_fun) do
try do
reduce.(inner_acc)
catch
@@ -834,17 +836,17 @@ defmodule Stream do
:erlang.raise(kind, reason, stacktrace)
else
{:done, acc} ->
- do_transform(user_acc, user, fun, next_op, next, {:cont, acc}, inner, after_fun)
+ do_transform_user(vals, user_acc, user, fun, next_op, next, {:cont, acc}, inner, after_fun)
{:halted, acc} ->
next.({:halt, []})
do_after(after_fun, user_acc)
{:halted, acc}
{:suspended, acc, c} ->
- {:suspended, acc, &do_list_transform(user_acc, user, fun, next_op, next, &1, inner, c, after_fun)}
+ {:suspended, acc, &do_list_transform(vals, user_acc, user, fun, next_op, next, &1, inner, c, after_fun)}
end
end
- defp do_enum_transform(user_acc, user, fun, next_op, next, {op, inner_acc}, inner, reduce, after_fun) do
+ defp do_enum_transform(vals, user_acc, user, fun, next_op, next, {op, inner_acc}, inner, reduce, after_fun) do
try do
reduce.({op, [:outer | inner_acc]})
catch
@@ -857,15 +859,15 @@ defmodule Stream do
# Only take into account outer halts when the op is not halt itself.
# Otherwise, we were the ones wishing to halt, so we should just stop.
{:halted, [:outer | acc]} when op != :halt ->
- do_transform(user_acc, user, fun, next_op, next, {:cont, acc}, inner, after_fun)
+ do_transform_user(vals, user_acc, user, fun, next_op, next, {:cont, acc}, inner, after_fun)
{:halted, [_ | acc]} ->
next.({:halt, []})
do_after(after_fun, user_acc)
{:halted, acc}
{:done, [_ | acc]} ->
- do_transform(user_acc, user, fun, next_op, next, {:cont, acc}, inner, after_fun)
+ do_transform_user(vals, user_acc, user, fun, next_op, next, {:cont, acc}, inner, after_fun)
{:suspended, [_ | acc], c} ->
- {:suspended, acc, &do_enum_transform(user_acc, user, fun, next_op, next, &1, inner, c, after_fun)}
+ {:suspended, acc, &do_enum_transform(vals, user_acc, user, fun, next_op, next, &1, inner, c, after_fun)}
end
end
diff --git a/lib/elixir/test/elixir/stream_test.exs b/lib/elixir/test/elixir/stream_test.exs
index 619efe24f..8af1deb53 100644
--- a/lib/elixir/test/elixir/stream_test.exs
+++ b/lib/elixir/test/elixir/stream_test.exs
@@ -481,6 +481,14 @@ defmodule StreamTest do
assert Process.get(:stream_transform)
end
+ test "transform/3 (via flat_map) handles multiple returns from suspension" do
+ assert [false]
+ |> Stream.take(1)
+ |> Stream.concat([true])
+ |> Stream.flat_map(&[&1])
+ |> Enum.to_list() == [false, true]
+ end
+
test "iterate/2" do
stream = Stream.iterate(0, &(&1+2))
assert Enum.take(stream, 5) == [0, 2, 4, 6, 8]