diff options
author | Ian Young <ian@iangreenleaf.com> | 2020-01-19 06:14:03 -0600 |
---|---|---|
committer | José Valim <jose.valim@plataformatec.com.br> | 2020-01-19 13:14:03 +0100 |
commit | 86df142ed007e6024e005c52272a455c2d12e0d4 (patch) | |
tree | dc94dabd2d2cf5ae33500f621c89fb6ee54d20bb | |
parent | ed87925a6859d979bcda0f0ea961404a551799a3 (diff) | |
download | elixir-86df142ed007e6024e005c52272a455c2d12e0d4.tar.gz |
Add Task.await_many/2 (#9741)
-rw-r--r-- | lib/elixir/lib/task.ex | 93 | ||||
-rw-r--r-- | lib/elixir/test/elixir/task_test.exs | 129 |
2 files changed, 222 insertions, 0 deletions
diff --git a/lib/elixir/lib/task.ex b/lib/elixir/lib/task.ex index 6f4f9f924..808b89fbe 100644 --- a/lib/elixir/lib/task.ex +++ b/lib/elixir/lib/task.ex @@ -608,6 +608,99 @@ defmodule Task do end end + @doc """ + Awaits replies from multiple tasks and returns them. + + This function receives a list of tasks and waits for their replies in the + given time interval. It returns a list of the results, in the same order as + the tasks supplied in the `tasks` input argument. + + If any of the task processes dies, the current process will exit with the + same reason as that task. + + A timeout in milliseconds or `:infinity`, can be given with a default value + of `5000`. If the timeout is exceeded, then the current process will exit. + Any task processes that are linked to the current process (which is the case + when a task is started with `async`) will also exit. Any task processes that + are trapping exits or not linked to the current process will continue to run. + + This function assumes the tasks' monitors are still active or the monitors' + `:DOWN` message is in the message queue. If any tasks have been demonitored, + or the message already received, this function will wait for the duration of + the timeout. + + This function can only be called once for any given task. If you want to be + able to check multiple times if a long-running task has finished its + computation, use `yield_many/2` instead. + + ## Compatibility with OTP behaviours + + It is not recommended to `await` long-running tasks inside an OTP behaviour + such as `GenServer`. See `await/2` for more information. + + ## Examples + + iex> tasks = [ + ...> Task.async(fn -> 1 + 1 end), + ...> Task.async(fn -> 2 + 3 end) + ...> ] + iex> Task.await_many(tasks) + [2, 5] + + """ + @spec await_many([t], timeout) :: [term] + def await_many(tasks, timeout \\ 5000) when is_timeout(timeout) do + awaiting = + for task <- tasks, into: %{} do + %Task{ref: ref, owner: owner} = task + + if owner != self() do + raise ArgumentError, invalid_owner_error(task) + end + + {ref, true} + end + + timeout_ref = make_ref() + + timer_ref = + if timeout != :infinity do + Process.send_after(self(), timeout_ref, timeout) + end + + try do + await_many(tasks, timeout, awaiting, %{}, timeout_ref) + after + timer_ref && Process.cancel_timer(timer_ref) + receive do: (^timeout_ref -> :ok), after: (0 -> :ok) + end + end + + defp await_many(tasks, _timeout, map, replies, _timeout_ref) when map_size(map) == 0 do + for %{ref: ref} <- tasks, do: Map.fetch!(replies, ref) + end + + defp await_many(tasks, timeout, awaiting, replies, timeout_ref) do + receive do + ^timeout_ref -> + exit({:timeout, {__MODULE__, :await_many, [tasks, timeout]}}) + + {:DOWN, ref, _, proc, reason} when is_map_key(awaiting, ref) -> + exit({reason(reason, proc), {__MODULE__, :await_many, [tasks, timeout]}}) + + {ref, reply} when is_map_key(awaiting, ref) -> + Process.demonitor(ref, [:flush]) + + await_many( + tasks, + timeout, + Map.delete(awaiting, ref), + Map.put(replies, ref, reply), + timeout_ref + ) + end + end + @doc false @deprecated "Pattern match directly on the message instead" def find(tasks, {ref, reply}) when is_reference(ref) do diff --git a/lib/elixir/test/elixir/task_test.exs b/lib/elixir/test/elixir/task_test.exs index 5d0862c63..7f1e2f8e7 100644 --- a/lib/elixir/test/elixir/task_test.exs +++ b/lib/elixir/test/elixir/task_test.exs @@ -272,6 +272,135 @@ defmodule TaskTest do end end + describe "await_many/2" do + test "returns list of replies" do + tasks = for val <- [1, 3, 9], do: Task.async(fn -> val end) + assert Task.await_many(tasks) == [1, 3, 9] + end + + test "returns replies in input order ignoring response order" do + refs = [ref_1 = make_ref(), ref_2 = make_ref(), ref_3 = make_ref()] + tasks = Enum.map(refs, fn ref -> %Task{ref: ref, owner: self(), pid: nil} end) + send(self(), {ref_2, 3}) + send(self(), {ref_3, 9}) + send(self(), {ref_1, 1}) + assert Task.await_many(tasks) == [1, 3, 9] + end + + test "returns an empty list immediately" do + assert Task.await_many([]) == [] + end + + test "ignores messages from other processes" do + other_ref = make_ref() + tasks = for val <- [:a, :b], do: Task.async(fn -> val end) + send(self(), other_ref) + send(self(), {other_ref, :z}) + send(self(), {:DOWN, other_ref, :process, 1, :goodbye}) + assert Task.await_many(tasks) == [:a, :b] + assert_received other_ref + assert_received {other_ref, :z} + assert_received {:DOWN, other_ref, :process, 1, :goodbye} + end + + test "ignores additional messages after reply" do + refs = [ref_1 = make_ref(), ref_2 = make_ref()] + tasks = Enum.map(refs, fn ref -> %Task{ref: ref, owner: self(), pid: nil} end) + send(self(), {ref_2, :b}) + send(self(), {ref_2, :other}) + send(self(), {ref_1, :a}) + assert Task.await_many(tasks) == [:a, :b] + assert_received {ref_2, :other} + end + + test "exits on timeout" do + tasks = [Task.async(fn -> Process.sleep(:infinity) end)] + assert catch_exit(Task.await_many(tasks, 0)) == {:timeout, {Task, :await_many, [tasks, 0]}} + end + + test "exits with same reason when task exits" do + tasks = [Task.async(fn -> exit(:normal) end)] + assert catch_exit(Task.await_many(tasks)) == {:normal, {Task, :await_many, [tasks, 5000]}} + end + + test "exits immediately when any task exits" do + tasks = [ + Task.async(fn -> Process.sleep(:infinity) end), + Task.async(fn -> exit(:normal) end) + ] + + assert catch_exit(Task.await_many(tasks)) == {:normal, {Task, :await_many, [tasks, 5000]}} + end + + test "exits immediately when any task crashes" do + Process.flag(:trap_exit, true) + + tasks = [ + Task.async(fn -> Process.sleep(:infinity) end), + Task.async(fn -> exit(:unknown) end) + ] + + assert catch_exit(Task.await_many(tasks)) == {:unknown, {Task, :await_many, [tasks, 5000]}} + end + + test "exits immediately when any task throws" do + Process.flag(:trap_exit, true) + + tasks = [ + Task.async(fn -> Process.sleep(:infinity) end), + Task.async(fn -> throw(:unknown) end) + ] + + assert {{{:nocatch, :unknown}, _}, {Task, :await_many, [^tasks, 5000]}} = + catch_exit(Task.await_many(tasks)) + end + + test "exits immediately on any task error" do + Process.flag(:trap_exit, true) + + tasks = [ + Task.async(fn -> Process.sleep(:infinity) end), + Task.async(fn -> raise "oops" end) + ] + + assert {{%RuntimeError{}, _}, {Task, :await_many, [^tasks, 5000]}} = + catch_exit(Task.await_many(tasks)) + end + + test "exits immediately on :noconnection" do + tasks = [ + Task.async(fn -> Process.sleep(:infinity) end), + %Task{ref: ref = make_ref(), owner: self(), pid: self()} + ] + + send(self(), {:DOWN, ref, :process, self(), :noconnection}) + assert catch_exit(Task.await_many(tasks)) |> elem(0) == {:nodedown, :nonode@nohost} + end + + test "exits immediately on :noconnection from named monitor" do + tasks = [ + Task.async(fn -> Process.sleep(:infinity) end), + %Task{ref: ref = make_ref(), owner: self(), pid: nil} + ] + + send(self(), {:DOWN, ref, :process, {:name, :node}, :noconnection}) + assert catch_exit(Task.await_many(tasks)) |> elem(0) == {:nodedown, :node} + end + + test "raises when invoked from a non-owner process" do + tasks = [ + Task.async(fn -> Process.sleep(:infinity) end), + bad_task = create_task_in_other_process() + ] + + message = + "task #{inspect(bad_task)} must be queried from the owner " <> + "but was queried from #{inspect(self())}" + + assert_raise ArgumentError, message, fn -> Task.await_many(tasks, 1) end + end + end + describe "yield/2" do test "returns {:ok, result} when reply and :DOWN in message queue" do task = %Task{ref: make_ref(), owner: self(), pid: nil} |