summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIan Young <ian@iangreenleaf.com>2020-01-19 06:14:03 -0600
committerJosé Valim <jose.valim@plataformatec.com.br>2020-01-19 13:14:03 +0100
commit86df142ed007e6024e005c52272a455c2d12e0d4 (patch)
treedc94dabd2d2cf5ae33500f621c89fb6ee54d20bb
parented87925a6859d979bcda0f0ea961404a551799a3 (diff)
downloadelixir-86df142ed007e6024e005c52272a455c2d12e0d4.tar.gz
Add Task.await_many/2 (#9741)
-rw-r--r--lib/elixir/lib/task.ex93
-rw-r--r--lib/elixir/test/elixir/task_test.exs129
2 files changed, 222 insertions, 0 deletions
diff --git a/lib/elixir/lib/task.ex b/lib/elixir/lib/task.ex
index 6f4f9f924..808b89fbe 100644
--- a/lib/elixir/lib/task.ex
+++ b/lib/elixir/lib/task.ex
@@ -608,6 +608,99 @@ defmodule Task do
end
end
+ @doc """
+ Awaits replies from multiple tasks and returns them.
+
+ This function receives a list of tasks and waits for their replies in the
+ given time interval. It returns a list of the results, in the same order as
+ the tasks supplied in the `tasks` input argument.
+
+ If any of the task processes dies, the current process will exit with the
+ same reason as that task.
+
+ A timeout in milliseconds or `:infinity`, can be given with a default value
+ of `5000`. If the timeout is exceeded, then the current process will exit.
+ Any task processes that are linked to the current process (which is the case
+ when a task is started with `async`) will also exit. Any task processes that
+ are trapping exits or not linked to the current process will continue to run.
+
+ This function assumes the tasks' monitors are still active or the monitors'
+ `:DOWN` message is in the message queue. If any tasks have been demonitored,
+ or the message already received, this function will wait for the duration of
+ the timeout.
+
+ This function can only be called once for any given task. If you want to be
+ able to check multiple times if a long-running task has finished its
+ computation, use `yield_many/2` instead.
+
+ ## Compatibility with OTP behaviours
+
+ It is not recommended to `await` long-running tasks inside an OTP behaviour
+ such as `GenServer`. See `await/2` for more information.
+
+ ## Examples
+
+ iex> tasks = [
+ ...> Task.async(fn -> 1 + 1 end),
+ ...> Task.async(fn -> 2 + 3 end)
+ ...> ]
+ iex> Task.await_many(tasks)
+ [2, 5]
+
+ """
+ @spec await_many([t], timeout) :: [term]
+ def await_many(tasks, timeout \\ 5000) when is_timeout(timeout) do
+ awaiting =
+ for task <- tasks, into: %{} do
+ %Task{ref: ref, owner: owner} = task
+
+ if owner != self() do
+ raise ArgumentError, invalid_owner_error(task)
+ end
+
+ {ref, true}
+ end
+
+ timeout_ref = make_ref()
+
+ timer_ref =
+ if timeout != :infinity do
+ Process.send_after(self(), timeout_ref, timeout)
+ end
+
+ try do
+ await_many(tasks, timeout, awaiting, %{}, timeout_ref)
+ after
+ timer_ref && Process.cancel_timer(timer_ref)
+ receive do: (^timeout_ref -> :ok), after: (0 -> :ok)
+ end
+ end
+
+ defp await_many(tasks, _timeout, map, replies, _timeout_ref) when map_size(map) == 0 do
+ for %{ref: ref} <- tasks, do: Map.fetch!(replies, ref)
+ end
+
+ defp await_many(tasks, timeout, awaiting, replies, timeout_ref) do
+ receive do
+ ^timeout_ref ->
+ exit({:timeout, {__MODULE__, :await_many, [tasks, timeout]}})
+
+ {:DOWN, ref, _, proc, reason} when is_map_key(awaiting, ref) ->
+ exit({reason(reason, proc), {__MODULE__, :await_many, [tasks, timeout]}})
+
+ {ref, reply} when is_map_key(awaiting, ref) ->
+ Process.demonitor(ref, [:flush])
+
+ await_many(
+ tasks,
+ timeout,
+ Map.delete(awaiting, ref),
+ Map.put(replies, ref, reply),
+ timeout_ref
+ )
+ end
+ end
+
@doc false
@deprecated "Pattern match directly on the message instead"
def find(tasks, {ref, reply}) when is_reference(ref) do
diff --git a/lib/elixir/test/elixir/task_test.exs b/lib/elixir/test/elixir/task_test.exs
index 5d0862c63..7f1e2f8e7 100644
--- a/lib/elixir/test/elixir/task_test.exs
+++ b/lib/elixir/test/elixir/task_test.exs
@@ -272,6 +272,135 @@ defmodule TaskTest do
end
end
+ describe "await_many/2" do
+ test "returns list of replies" do
+ tasks = for val <- [1, 3, 9], do: Task.async(fn -> val end)
+ assert Task.await_many(tasks) == [1, 3, 9]
+ end
+
+ test "returns replies in input order ignoring response order" do
+ refs = [ref_1 = make_ref(), ref_2 = make_ref(), ref_3 = make_ref()]
+ tasks = Enum.map(refs, fn ref -> %Task{ref: ref, owner: self(), pid: nil} end)
+ send(self(), {ref_2, 3})
+ send(self(), {ref_3, 9})
+ send(self(), {ref_1, 1})
+ assert Task.await_many(tasks) == [1, 3, 9]
+ end
+
+ test "returns an empty list immediately" do
+ assert Task.await_many([]) == []
+ end
+
+ test "ignores messages from other processes" do
+ other_ref = make_ref()
+ tasks = for val <- [:a, :b], do: Task.async(fn -> val end)
+ send(self(), other_ref)
+ send(self(), {other_ref, :z})
+ send(self(), {:DOWN, other_ref, :process, 1, :goodbye})
+ assert Task.await_many(tasks) == [:a, :b]
+ assert_received other_ref
+ assert_received {other_ref, :z}
+ assert_received {:DOWN, other_ref, :process, 1, :goodbye}
+ end
+
+ test "ignores additional messages after reply" do
+ refs = [ref_1 = make_ref(), ref_2 = make_ref()]
+ tasks = Enum.map(refs, fn ref -> %Task{ref: ref, owner: self(), pid: nil} end)
+ send(self(), {ref_2, :b})
+ send(self(), {ref_2, :other})
+ send(self(), {ref_1, :a})
+ assert Task.await_many(tasks) == [:a, :b]
+ assert_received {ref_2, :other}
+ end
+
+ test "exits on timeout" do
+ tasks = [Task.async(fn -> Process.sleep(:infinity) end)]
+ assert catch_exit(Task.await_many(tasks, 0)) == {:timeout, {Task, :await_many, [tasks, 0]}}
+ end
+
+ test "exits with same reason when task exits" do
+ tasks = [Task.async(fn -> exit(:normal) end)]
+ assert catch_exit(Task.await_many(tasks)) == {:normal, {Task, :await_many, [tasks, 5000]}}
+ end
+
+ test "exits immediately when any task exits" do
+ tasks = [
+ Task.async(fn -> Process.sleep(:infinity) end),
+ Task.async(fn -> exit(:normal) end)
+ ]
+
+ assert catch_exit(Task.await_many(tasks)) == {:normal, {Task, :await_many, [tasks, 5000]}}
+ end
+
+ test "exits immediately when any task crashes" do
+ Process.flag(:trap_exit, true)
+
+ tasks = [
+ Task.async(fn -> Process.sleep(:infinity) end),
+ Task.async(fn -> exit(:unknown) end)
+ ]
+
+ assert catch_exit(Task.await_many(tasks)) == {:unknown, {Task, :await_many, [tasks, 5000]}}
+ end
+
+ test "exits immediately when any task throws" do
+ Process.flag(:trap_exit, true)
+
+ tasks = [
+ Task.async(fn -> Process.sleep(:infinity) end),
+ Task.async(fn -> throw(:unknown) end)
+ ]
+
+ assert {{{:nocatch, :unknown}, _}, {Task, :await_many, [^tasks, 5000]}} =
+ catch_exit(Task.await_many(tasks))
+ end
+
+ test "exits immediately on any task error" do
+ Process.flag(:trap_exit, true)
+
+ tasks = [
+ Task.async(fn -> Process.sleep(:infinity) end),
+ Task.async(fn -> raise "oops" end)
+ ]
+
+ assert {{%RuntimeError{}, _}, {Task, :await_many, [^tasks, 5000]}} =
+ catch_exit(Task.await_many(tasks))
+ end
+
+ test "exits immediately on :noconnection" do
+ tasks = [
+ Task.async(fn -> Process.sleep(:infinity) end),
+ %Task{ref: ref = make_ref(), owner: self(), pid: self()}
+ ]
+
+ send(self(), {:DOWN, ref, :process, self(), :noconnection})
+ assert catch_exit(Task.await_many(tasks)) |> elem(0) == {:nodedown, :nonode@nohost}
+ end
+
+ test "exits immediately on :noconnection from named monitor" do
+ tasks = [
+ Task.async(fn -> Process.sleep(:infinity) end),
+ %Task{ref: ref = make_ref(), owner: self(), pid: nil}
+ ]
+
+ send(self(), {:DOWN, ref, :process, {:name, :node}, :noconnection})
+ assert catch_exit(Task.await_many(tasks)) |> elem(0) == {:nodedown, :node}
+ end
+
+ test "raises when invoked from a non-owner process" do
+ tasks = [
+ Task.async(fn -> Process.sleep(:infinity) end),
+ bad_task = create_task_in_other_process()
+ ]
+
+ message =
+ "task #{inspect(bad_task)} must be queried from the owner " <>
+ "but was queried from #{inspect(self())}"
+
+ assert_raise ArgumentError, message, fn -> Task.await_many(tasks, 1) end
+ end
+ end
+
describe "yield/2" do
test "returns {:ok, result} when reply and :DOWN in message queue" do
task = %Task{ref: make_ref(), owner: self(), pid: nil}