summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2021-04-13 16:32:48 +0100
committerGitHub <noreply@github.com>2021-04-13 16:32:48 +0100
commitbfa1e4ec06cd33e271328b0f189ed0dbe7bab8ba (patch)
tree8ff9d1771b9009af75819e6b3b8f7b4fde24badd
parent9bd02b351014492844b38e2691dbabb57f1863b1 (diff)
parent4ff23ba6998cd6a5e29e6cee5ea9622979f93833 (diff)
downloadrabbitmq-server-git-bfa1e4ec06cd33e271328b0f189ed0dbe7bab8ba.tar.gz
Merge pull request #2929 from rabbitmq/stream-status-command
CLI command for stream details
-rw-r--r--deps/rabbit/src/rabbit_stream_queue.erl48
-rw-r--r--deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/stream_status_command.ex71
-rw-r--r--deps/rabbitmq_cli/test/queues/stream_status_command_test.exs45
3 files changed, 164 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index 12d62c70a5..f360d04edc 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -46,6 +46,9 @@
-export([format_osiris_event/2]).
-export([update_stream_conf/2]).
+-export([status/2,
+ tracking_status/2]).
+
-include_lib("rabbit_common/include/rabbit.hrl").
-include("amqqueue.hrl").
@@ -491,6 +494,51 @@ i(type, _) ->
stream;
i(_, _) ->
''.
+-spec status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) ->
+ [[{binary(), term()}]] | {error, term()}.
+status(Vhost, QueueName) ->
+ %% Handle not found queues
+ QName = #resource{virtual_host = Vhost, name = QueueName, kind = queue},
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, Q} when ?amqqueue_is_classic(Q) ->
+ {error, classic_queue_not_supported};
+ {ok, Q} when ?amqqueue_is_quorum(Q) ->
+ {error, quorum_queue_not_supported};
+ {ok, Q} when ?amqqueue_is_stream(Q) ->
+ Data = osiris_counters:overview(),
+ case maps:get({osiris_writer, QName}, Data, undefined) of
+ undefined ->
+ [];
+ #{} = Cnt0 ->
+ Cnt = maps:without([chunks], Cnt0),
+ Conf = amqqueue:get_type_state(Q),
+ Max = maps:get(max_segment_size, Conf, osiris_log:get_default_max_segment_size()),
+ [maps:to_list(Cnt#{max_segment_size => Max})]
+ end;
+ {error, not_found} = E->
+ E
+ end.
+
+-spec tracking_status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) ->
+ [[{atom(), term()}]] | {error, term()}.
+tracking_status(Vhost, QueueName) ->
+ %% Handle not found queues
+ QName = #resource{virtual_host = Vhost, name = QueueName, kind = queue},
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, Q} when ?amqqueue_is_classic(Q) ->
+ {error, classic_queue_not_supported};
+ {ok, Q} when ?amqqueue_is_quorum(Q) ->
+ {error, quorum_queue_not_supported};
+ {ok, Q} when ?amqqueue_is_stream(Q) ->
+ Leader = amqqueue:get_pid(Q),
+ Map = osiris:read_tracking(Leader),
+ maps:fold(fun(K, V, Acc) ->
+ [[{reference, K},
+ {offset, V}] | Acc]
+ end, [], Map);
+ {error, not_found} = E->
+ E
+ end.
init(Q) when ?is_amqqueue(Q) ->
Leader = amqqueue:get_pid(Q),
diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/stream_status_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/stream_status_command.ex
new file mode 100644
index 0000000000..981ac03176
--- /dev/null
+++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/stream_status_command.ex
@@ -0,0 +1,71 @@
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
+
+defmodule RabbitMQ.CLI.Queues.Commands.StreamStatusCommand do
+ alias RabbitMQ.CLI.Core.DocGuide
+
+ @behaviour RabbitMQ.CLI.CommandBehaviour
+ def scopes(), do: [:diagnostics, :queues]
+
+ def merge_defaults(args, opts), do: {args, Map.merge(%{tracking: false, vhost: "/"}, opts)}
+
+ def switches(), do: [tracking: :boolean]
+
+ use RabbitMQ.CLI.Core.AcceptsOnePositionalArgument
+ use RabbitMQ.CLI.Core.RequiresRabbitAppRunning
+
+ def run([name] = _args, %{node: node_name, vhost: vhost, tracking: :false}) do
+ case :rabbit_misc.rpc_call(node_name, :rabbit_stream_queue, :status, [vhost, name]) do
+ {:error, :classic_queue_not_supported} ->
+ {:error, "Cannot get stream status of a classic queue"}
+
+ {:error, :quorum_queue_not_supported} ->
+ {:error, "Cannot get stream status of a quorum queue"}
+
+ other ->
+ other
+ end
+ end
+ def run([name] = _args, %{node: node_name, vhost: vhost, tracking: :true}) do
+ case :rabbit_misc.rpc_call(node_name, :rabbit_stream_queue, :tracking_status, [vhost, name]) do
+ {:error, :classic_queue_not_supported} ->
+ {:error, "Cannot get stream status of a classic queue"}
+
+ {:error, :quorum_queue_not_supported} ->
+ {:error, "Cannot get stream status of a quorum queue"}
+
+ other ->
+ other
+ end
+ end
+
+ use RabbitMQ.CLI.DefaultOutput
+
+ def formatter(), do: RabbitMQ.CLI.Formatters.PrettyTable
+
+ def usage() do
+ "stream_status [--vhost <vhost>] [--tracking] <queue>"
+ end
+
+ def usage_additional do
+ [
+ ["<queue>", "Name of the queue"]
+ ]
+ end
+
+ def usage_doc_guides() do
+ [
+ DocGuide.stream_queues()
+ ]
+ end
+
+ def help_section(), do: :observability_and_health_checks
+
+ def description(), do: "Displays the status of a stream queue"
+
+ def banner([name], %{node: node_name}),
+ do: "Status of stream queue #{name} on node #{node_name} ..."
+end
diff --git a/deps/rabbitmq_cli/test/queues/stream_status_command_test.exs b/deps/rabbitmq_cli/test/queues/stream_status_command_test.exs
new file mode 100644
index 0000000000..b7b98a30e6
--- /dev/null
+++ b/deps/rabbitmq_cli/test/queues/stream_status_command_test.exs
@@ -0,0 +1,45 @@
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+
+defmodule RabbitMQ.CLI.Queues.Commands.StreamStatusCommandTest do
+ use ExUnit.Case, async: false
+ import TestHelper
+
+ @command RabbitMQ.CLI.Queues.Commands.StreamStatusCommand
+
+ setup_all do
+ RabbitMQ.CLI.Core.Distribution.start()
+
+ :ok
+ end
+
+ setup context do
+ {:ok, opts: %{
+ node: get_rabbit_hostname(),
+ timeout: context[:test_timeout] || 30000
+ }}
+ end
+
+
+ test "validate: treats no arguments as a failure" do
+ assert @command.validate([], %{}) == {:validation_failure, :not_enough_args}
+ end
+
+ test "validate: accepts a single positional argument" do
+ assert @command.validate(["stream-queue-a"], %{}) == :ok
+ end
+
+ test "validate: when two or more arguments are provided, returns a failure" do
+ assert @command.validate(["stream-queue-a", "one-extra-arg"], %{}) == {:validation_failure, :too_many_args}
+ assert @command.validate(["stream-queue-a", "extra-arg", "another-extra-arg"], %{}) == {:validation_failure, :too_many_args}
+ end
+
+ @tag test_timeout: 3000
+ test "run: targeting an unreachable node throws a badrpc" do
+ assert match?({:badrpc, _}, @command.run(["stream-queue-a"],
+ %{node: :jake@thedog, vhost: "/", timeout: 200, tracking: false}))
+ end
+end