diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-09-24 16:27:47 +0200 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-10-11 16:50:02 +0200 |
commit | b0bd5f8a00c33b8f8ccd0a8f3bfdf6835cf6c1cd (patch) | |
tree | 22fb1ac7859b7389775627dc3172355cbdc8722a | |
parent | a73b1a3d0db709c67bd609f278d194f352c0e984 (diff) | |
download | rabbitmq-server-git-b0bd5f8a00c33b8f8ccd0a8f3bfdf6835cf6c1cd.tar.gz |
Add delete_super_stream CLI command
3 files changed, 170 insertions, 8 deletions
diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl index ec161257f3..7843ed4895 100644 --- a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl +++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl @@ -99,6 +99,28 @@ run([SuperStream], VHost, SuperStream, Streams, + RoutingKeys); +run([SuperStream], + #{node := NodeName, + vhost := VHost, + timeout := Timeout, + routing_keys := RoutingKeysStr}) -> + RoutingKeys = + [rabbit_data_coercion:to_binary( + string:strip(K)) + || K + <- string:tokens( + rabbit_data_coercion:to_list(RoutingKeysStr), ",")], + Streams = + [list_to_binary(binary_to_list(SuperStream) + ++ "-" + ++ binary_to_list(K)) + || K <- RoutingKeys], + create_super_stream(NodeName, + Timeout, + VHost, + SuperStream, + Streams, RoutingKeys). create_super_stream(NodeName, diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand.erl new file mode 100644 index 0000000000..90bf2db414 --- /dev/null +++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand.erl @@ -0,0 +1,97 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 2.0 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved. + +-module('Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand'). + +-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl"). + +-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour'). + +-ignore_xref([{'Elixir.RabbitMQ.CLI.DefaultOutput', output, 1}, + {'Elixir.RabbitMQ.CLI.Core.Helpers', cli_acting_user, 0}, + {'Elixir.RabbitMQ.CLI.Core.ExitCodes', exit_software, 0}]). + +-export([scopes/0, + usage/0, + usage_additional/0, + usage_doc_guides/0, + banner/2, + validate/2, + merge_defaults/2, + run/2, + output/2, + description/0, + help_section/0]). + +scopes() -> + [ctl, streams]. + +description() -> + <<"Delete a super stream (experimental feature)">>. + +help_section() -> + {plugin, stream}. + +validate([], _Opts) -> + {validation_failure, not_enough_args}; +validate([_Name], _Opts) -> + ok; +validate(_, _Opts) -> + {validation_failure, too_many_args}. + +merge_defaults(_Args, Opts) -> + {_Args, maps:merge(#{vhost => <<"/">>}, Opts)}. + +usage() -> + <<"delete_super_stream <name> [--vhost <vhost>]">>. + +usage_additional() -> + [["<name>", "The name of the super stream to delete."], + ["--vhost <vhost>", "The virtual host of the super stream."]]. + +usage_doc_guides() -> + [?STREAM_GUIDE_URL]. + +run([SuperStream], + #{node := NodeName, + vhost := VHost, + timeout := Timeout}) -> + delete_super_stream(NodeName, Timeout, VHost, SuperStream). + +delete_super_stream(NodeName, Timeout, VHost, SuperStream) -> + case rabbit_misc:rpc_call(NodeName, + rabbit_stream_manager, + delete_super_stream, + [VHost, SuperStream, cli_acting_user()], + Timeout) + of + ok -> + {ok, + rabbit_misc:format("Super stream ~s has been deleted", + [SuperStream])}; + Error -> + Error + end. + +banner(_, _) -> + <<"Deleting a super stream ...">>. + +output({error, Msg}, _Opts) -> + {error, 'Elixir.RabbitMQ.CLI.Core.ExitCodes':exit_software(), Msg}; +output({ok, Msg}, _Opts) -> + {ok, Msg}. + +cli_acting_user() -> + 'Elixir.RabbitMQ.CLI.Core.Helpers':cli_acting_user(). diff --git a/deps/rabbitmq_stream/test/commands_SUITE.erl b/deps/rabbitmq_stream/test/commands_SUITE.erl index 0c9f10e9db..bee44c4a29 100644 --- a/deps/rabbitmq_stream/test/commands_SUITE.erl +++ b/deps/rabbitmq_stream/test/commands_SUITE.erl @@ -25,6 +25,8 @@ 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand'). -define(COMMAND_ADD_SUPER_STREAM, 'Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand'). +-define(COMMAND_DELETE_SUPER_STREAM, + 'Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand'). all() -> [{group, list_connections}, @@ -41,8 +43,11 @@ groups() -> {list_publishers, [], [list_publishers_merge_defaults, list_publishers_run]}, {super_streams, [], - [add_super_stream_merge_defaults, add_super_stream_validate, - add_super_stream_run]}]. + [add_super_stream_merge_defaults, + add_super_stream_validate, + delete_super_stream_merge_defaults, + delete_super_stream_validate, + add_delete_super_stream_run]}]. init_per_suite(Config) -> case rabbit_ct_helpers:is_mixed_versions() of @@ -360,20 +365,58 @@ add_super_stream_validate(_Config) -> <<"a,b,c">>})), ok. -add_super_stream_run(Config) -> +delete_super_stream_merge_defaults(_Config) -> + ?assertMatch({[<<"super-stream">>], #{vhost := <<"/">>}}, + ?COMMAND_DELETE_SUPER_STREAM:merge_defaults([<<"super-stream">>], + #{})), + ok. + +delete_super_stream_validate(_Config) -> + ?assertMatch({validation_failure, not_enough_args}, + ?COMMAND_DELETE_SUPER_STREAM:validate([], #{})), + ?assertMatch({validation_failure, too_many_args}, + ?COMMAND_DELETE_SUPER_STREAM:validate([<<"a">>, <<"b">>], + #{})), + ?assertEqual(ok, ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>], #{})), + ok. + +add_delete_super_stream_run(Config) -> Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), Opts = #{node => Node, timeout => 10000, - vhost => <<"/">>, - partitions => 3}, + vhost => <<"/">>}, ?assertMatch({ok, _}, - ?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>], Opts)), + ?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>], + maps:merge(#{partitions => 3}, + Opts))), ?assertEqual({ok, [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]}, - rabbit_stream_manager_SUITE:partitions(Config, - <<"invoices">>)). + partitions(Config, <<"invoices">>)), + ?assertMatch({ok, _}, + ?COMMAND_DELETE_SUPER_STREAM:run([<<"invoices">>], Opts)), + ?assertEqual({error, stream_not_found}, + partitions(Config, <<"invoices">>)), + + ?assertMatch({ok, _}, + ?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>], + maps:merge(#{routing_keys => + <<" amer,emea , apac">>}, + Opts))), + ?assertEqual({ok, + [<<"invoices-amer">>, <<"invoices-emea">>, + <<"invoices-apac">>]}, + partitions(Config, <<"invoices">>)), + ?assertMatch({ok, _}, + ?COMMAND_DELETE_SUPER_STREAM:run([<<"invoices">>], Opts)), + ?assertEqual({error, stream_not_found}, + partitions(Config, <<"invoices">>)), + + ok. + +partitions(Config, SuperStream) -> + rabbit_stream_manager_SUITE:partitions(Config, SuperStream). create_stream(S, Stream, C0) -> rabbit_stream_SUITE:test_create_stream(gen_tcp, S, Stream, C0). |