summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/rpc_stream.ex
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/rpc_stream.ex')
-rw-r--r--deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/rpc_stream.ex124
1 files changed, 124 insertions, 0 deletions
diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/rpc_stream.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/rpc_stream.ex
new file mode 100644
index 0000000000..4b672a6d88
--- /dev/null
+++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/rpc_stream.ex
@@ -0,0 +1,124 @@
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+
+defmodule RabbitMQ.CLI.Ctl.RpcStream do
+ alias RabbitMQ.CLI.Ctl.InfoKeys
+
+ def receive_list_items(node, mod, fun, args, timeout, info_keys) do
+ receive_list_items(node, [{mod, fun, args}], timeout, info_keys, 1)
+ end
+
+ def receive_list_items(node, mod, fun, args, timeout, info_keys, chunks) do
+ receive_list_items(node, [{mod, fun, args}], timeout, info_keys, chunks)
+ end
+
+ def receive_list_items(_node, _mfas, _timeout, _info_keys, 0) do
+ nil
+ end
+
+ def receive_list_items(node, mfas, timeout, info_keys, chunks_init) do
+ receive_list_items_with_fun(node, mfas, timeout, info_keys, chunks_init, fn v -> v end)
+ end
+
+ def receive_list_items_with_fun(node, mfas, timeout, info_keys, chunks_init, response_fun) do
+ pid = Kernel.self()
+ ref = Kernel.make_ref()
+ for {m, f, a} <- mfas, do: init_items_stream(node, m, f, a, timeout, pid, ref)
+
+ Stream.unfold(
+ {chunks_init, :continue},
+ fn
+ :finished ->
+ response_fun.(nil)
+
+ {chunks, :continue} ->
+ received =
+ receive do
+ {^ref, :finished} when chunks === 1 ->
+ nil
+
+ {^ref, :finished} ->
+ {[], {chunks - 1, :continue}}
+
+ {^ref, {:timeout, t}} ->
+ {{:error, {:badrpc, {:timeout, t / 1000}}}, :finished}
+
+ {^ref, []} ->
+ {[], {chunks, :continue}}
+
+ {^ref, :error, {:badrpc, :timeout}} ->
+ {{:error, {:badrpc, {:timeout, timeout / 1000}}}, :finished}
+
+ {^ref, result, :continue} ->
+ {result, {chunks, :continue}}
+
+ {:error, _} = error ->
+ {error, :finished}
+
+ {^ref, :error, error} ->
+ {{:error, simplify_emission_error(error)}, :finished}
+
+ {:DOWN, _mref, :process, _pid, :normal} ->
+ {[], {chunks, :continue}}
+
+ {:DOWN, _mref, :process, _pid, reason} ->
+ {{:error, simplify_emission_error(reason)}, :finished}
+ end
+
+ response_fun.(received)
+ end
+ )
+ |> display_list_items(info_keys)
+ end
+
+ def simplify_emission_error({:badrpc, {:EXIT, {{:nocatch, error}, error_details}}}) do
+ {error, error_details}
+ end
+
+ def simplify_emission_error({{:nocatch, error}, error_details}) do
+ {error, error_details}
+ end
+
+ def simplify_emission_error(other) do
+ other
+ end
+
+ defp display_list_items(items, info_keys) do
+ items
+ |> Stream.filter(fn
+ [] -> false
+ _ -> true
+ end)
+ |> Stream.map(fn
+ {:error, error} ->
+ error
+
+ # here item is a list of keyword lists:
+ [[{_, _} | _] | _] = item ->
+ Enum.map(item, fn i -> InfoKeys.info_for_keys(i, info_keys) end)
+
+ item ->
+ InfoKeys.info_for_keys(item, info_keys)
+ end)
+ end
+
+ defp init_items_stream(_node, _mod, _fun, _args, 0, pid, ref) do
+ set_stream_timeout(pid, ref, 0)
+ end
+
+ defp init_items_stream(node, mod, fun, args, timeout, pid, ref) do
+ :rabbit_control_misc.spawn_emitter_caller(node, mod, fun, args, ref, pid, timeout)
+ set_stream_timeout(pid, ref, timeout)
+ end
+
+ defp set_stream_timeout(_, _, :infinity) do
+ :ok
+ end
+
+ defp set_stream_timeout(pid, ref, timeout) do
+ Process.send_after(pid, {ref, {:timeout, timeout}}, timeout)
+ end
+end