summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-09-24 16:27:47 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-10-11 16:50:02 +0200
commitb0bd5f8a00c33b8f8ccd0a8f3bfdf6835cf6c1cd (patch)
tree22fb1ac7859b7389775627dc3172355cbdc8722a
parenta73b1a3d0db709c67bd609f278d194f352c0e984 (diff)
downloadrabbitmq-server-git-b0bd5f8a00c33b8f8ccd0a8f3bfdf6835cf6c1cd.tar.gz
Add delete_super_stream CLI command
-rw-r--r--deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl22
-rw-r--r--deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand.erl97
-rw-r--r--deps/rabbitmq_stream/test/commands_SUITE.erl59
3 files changed, 170 insertions, 8 deletions
diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl
index ec161257f3..7843ed4895 100644
--- a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl
+++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl
@@ -99,6 +99,28 @@ run([SuperStream],
VHost,
SuperStream,
Streams,
+ RoutingKeys);
+run([SuperStream],
+ #{node := NodeName,
+ vhost := VHost,
+ timeout := Timeout,
+ routing_keys := RoutingKeysStr}) ->
+ RoutingKeys =
+ [rabbit_data_coercion:to_binary(
+ string:strip(K))
+ || K
+ <- string:tokens(
+ rabbit_data_coercion:to_list(RoutingKeysStr), ",")],
+ Streams =
+ [list_to_binary(binary_to_list(SuperStream)
+ ++ "-"
+ ++ binary_to_list(K))
+ || K <- RoutingKeys],
+ create_super_stream(NodeName,
+ Timeout,
+ VHost,
+ SuperStream,
+ Streams,
RoutingKeys).
create_super_stream(NodeName,
diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand.erl
new file mode 100644
index 0000000000..90bf2db414
--- /dev/null
+++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand.erl
@@ -0,0 +1,97 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 2.0 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
+
+-module('Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand').
+
+-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
+
+-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
+
+-ignore_xref([{'Elixir.RabbitMQ.CLI.DefaultOutput', output, 1},
+ {'Elixir.RabbitMQ.CLI.Core.Helpers', cli_acting_user, 0},
+ {'Elixir.RabbitMQ.CLI.Core.ExitCodes', exit_software, 0}]).
+
+-export([scopes/0,
+ usage/0,
+ usage_additional/0,
+ usage_doc_guides/0,
+ banner/2,
+ validate/2,
+ merge_defaults/2,
+ run/2,
+ output/2,
+ description/0,
+ help_section/0]).
+
+scopes() ->
+ [ctl, streams].
+
+description() ->
+ <<"Delete a super stream (experimental feature)">>.
+
+help_section() ->
+ {plugin, stream}.
+
+validate([], _Opts) ->
+ {validation_failure, not_enough_args};
+validate([_Name], _Opts) ->
+ ok;
+validate(_, _Opts) ->
+ {validation_failure, too_many_args}.
+
+merge_defaults(_Args, Opts) ->
+ {_Args, maps:merge(#{vhost => <<"/">>}, Opts)}.
+
+usage() ->
+ <<"delete_super_stream <name> [--vhost <vhost>]">>.
+
+usage_additional() ->
+ [["<name>", "The name of the super stream to delete."],
+ ["--vhost <vhost>", "The virtual host of the super stream."]].
+
+usage_doc_guides() ->
+ [?STREAM_GUIDE_URL].
+
+run([SuperStream],
+ #{node := NodeName,
+ vhost := VHost,
+ timeout := Timeout}) ->
+ delete_super_stream(NodeName, Timeout, VHost, SuperStream).
+
+delete_super_stream(NodeName, Timeout, VHost, SuperStream) ->
+ case rabbit_misc:rpc_call(NodeName,
+ rabbit_stream_manager,
+ delete_super_stream,
+ [VHost, SuperStream, cli_acting_user()],
+ Timeout)
+ of
+ ok ->
+ {ok,
+ rabbit_misc:format("Super stream ~s has been deleted",
+ [SuperStream])};
+ Error ->
+ Error
+ end.
+
+banner(_, _) ->
+ <<"Deleting a super stream ...">>.
+
+output({error, Msg}, _Opts) ->
+ {error, 'Elixir.RabbitMQ.CLI.Core.ExitCodes':exit_software(), Msg};
+output({ok, Msg}, _Opts) ->
+ {ok, Msg}.
+
+cli_acting_user() ->
+ 'Elixir.RabbitMQ.CLI.Core.Helpers':cli_acting_user().
diff --git a/deps/rabbitmq_stream/test/commands_SUITE.erl b/deps/rabbitmq_stream/test/commands_SUITE.erl
index 0c9f10e9db..bee44c4a29 100644
--- a/deps/rabbitmq_stream/test/commands_SUITE.erl
+++ b/deps/rabbitmq_stream/test/commands_SUITE.erl
@@ -25,6 +25,8 @@
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand').
-define(COMMAND_ADD_SUPER_STREAM,
'Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand').
+-define(COMMAND_DELETE_SUPER_STREAM,
+ 'Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand').
all() ->
[{group, list_connections},
@@ -41,8 +43,11 @@ groups() ->
{list_publishers, [],
[list_publishers_merge_defaults, list_publishers_run]},
{super_streams, [],
- [add_super_stream_merge_defaults, add_super_stream_validate,
- add_super_stream_run]}].
+ [add_super_stream_merge_defaults,
+ add_super_stream_validate,
+ delete_super_stream_merge_defaults,
+ delete_super_stream_validate,
+ add_delete_super_stream_run]}].
init_per_suite(Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
@@ -360,20 +365,58 @@ add_super_stream_validate(_Config) ->
<<"a,b,c">>})),
ok.
-add_super_stream_run(Config) ->
+delete_super_stream_merge_defaults(_Config) ->
+ ?assertMatch({[<<"super-stream">>], #{vhost := <<"/">>}},
+ ?COMMAND_DELETE_SUPER_STREAM:merge_defaults([<<"super-stream">>],
+ #{})),
+ ok.
+
+delete_super_stream_validate(_Config) ->
+ ?assertMatch({validation_failure, not_enough_args},
+ ?COMMAND_DELETE_SUPER_STREAM:validate([], #{})),
+ ?assertMatch({validation_failure, too_many_args},
+ ?COMMAND_DELETE_SUPER_STREAM:validate([<<"a">>, <<"b">>],
+ #{})),
+ ?assertEqual(ok, ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>], #{})),
+ ok.
+
+add_delete_super_stream_run(Config) ->
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Opts =
#{node => Node,
timeout => 10000,
- vhost => <<"/">>,
- partitions => 3},
+ vhost => <<"/">>},
?assertMatch({ok, _},
- ?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>], Opts)),
+ ?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>],
+ maps:merge(#{partitions => 3},
+ Opts))),
?assertEqual({ok,
[<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]},
- rabbit_stream_manager_SUITE:partitions(Config,
- <<"invoices">>)).
+ partitions(Config, <<"invoices">>)),
+ ?assertMatch({ok, _},
+ ?COMMAND_DELETE_SUPER_STREAM:run([<<"invoices">>], Opts)),
+ ?assertEqual({error, stream_not_found},
+ partitions(Config, <<"invoices">>)),
+
+ ?assertMatch({ok, _},
+ ?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>],
+ maps:merge(#{routing_keys =>
+ <<" amer,emea , apac">>},
+ Opts))),
+ ?assertEqual({ok,
+ [<<"invoices-amer">>, <<"invoices-emea">>,
+ <<"invoices-apac">>]},
+ partitions(Config, <<"invoices">>)),
+ ?assertMatch({ok, _},
+ ?COMMAND_DELETE_SUPER_STREAM:run([<<"invoices">>], Opts)),
+ ?assertEqual({error, stream_not_found},
+ partitions(Config, <<"invoices">>)),
+
+ ok.
+
+partitions(Config, SuperStream) ->
+ rabbit_stream_manager_SUITE:partitions(Config, SuperStream).
create_stream(S, Stream, C0) ->
rabbit_stream_SUITE:test_create_stream(gen_tcp, S, Stream, C0).