author     Arnaud Cogoluègnes <acogoluegnes@gmail.com>  2021-10-14 11:01:58 +0200
committer  GitHub <noreply@github.com>  2021-10-14 11:01:58 +0200
commit     75b2a53f524cd65a3c04c86717649a7f8a89379c (patch)
tree       713e4956877dcf8c3d4e73ca1bfc59d8e6abecaf
parent     c935f9c7faf14eb37662375b7ec409fb3862caea (diff)
parent     6b9589bae42b0bec11d75ae906810260da14ca76 (diff)
Merge pull request #3503 from rabbitmq/super-stream-cli
Add functions to create/delete super stream in manager
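
The change adds two CLI commands, add_super_stream and delete_super_stream, backed by new
create_super_stream/6 and delete_super_stream/3 functions in rabbit_stream_manager. As a
minimal usage sketch of the manager API (the functions run on the broker node; the CLI
commands reach them via rabbit_misc:rpc_call, and the stream name, partitions, routing keys
and user below mirror the values used in rabbit_stream_manager_SUITE and are illustrative only):

    %% create a super stream with three partitions bound by routing keys "0".."2"
    ok = rabbit_stream_manager:create_super_stream(<<"/">>,                     %% virtual host
                                                   <<"invoices">>,              %% super stream name
                                                   [<<"invoices-0">>,
                                                    <<"invoices-1">>,
                                                    <<"invoices-2">>],          %% partition streams
                                                   #{},                         %% stream arguments
                                                   [<<"0">>, <<"1">>, <<"2">>], %% routing keys
                                                   <<"guest">>),                %% acting user
    %% delete it again (removes the exchange and the partition streams)
    ok = rabbit_stream_manager:delete_super_stream(<<"/">>, <<"invoices">>, <<"guest">>).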
-rw-r--r--  deps/rabbit_common/src/rabbit_date_time.erl                                              48
-rw-r--r--  deps/rabbit_common/test/unit_SUITE.erl                                                   23
-rw-r--r--  deps/rabbitmq_stream/BUILD.bazel                                                          4
-rw-r--r--  deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl     300
-rw-r--r--  deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand.erl   97
-rw-r--r--  deps/rabbitmq_stream/src/rabbit_stream_manager.erl                                      581
-rw-r--r--  deps/rabbitmq_stream/src/rabbit_stream_reader.erl                                         2
-rw-r--r--  deps/rabbitmq_stream/src/rabbit_stream_utils.erl                                         10
-rw-r--r--  deps/rabbitmq_stream/test/commands_SUITE.erl                                            194
-rw-r--r--  deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl                               149
10 files changed, 1283 insertions(+), 125 deletions(-)
diff --git a/deps/rabbit_common/src/rabbit_date_time.erl b/deps/rabbit_common/src/rabbit_date_time.erl
new file mode 100644
index 0000000000..e4a56ad783
--- /dev/null
+++ b/deps/rabbit_common/src/rabbit_date_time.erl
@@ -0,0 +1,48 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_date_time).
+
+-export([parse_duration/1]).
+
+-type datetime_plist() :: list({atom(), integer()}).
+
+% from https://github.com/erlsci/iso8601/blob/main/src/iso8601.erl
+-spec gi(string()) -> integer().
+gi(DS) ->
+ {Int, _Rest} = string:to_integer(DS),
+ case Int of
+ error ->
+ 0;
+ _ ->
+ Int
+ end.
+
+-spec parse_duration(string()) -> datetime_plist().
+parse_duration(Bin)
+ when is_binary(Bin) -> %TODO extended format
+ parse_duration(binary_to_list(Bin));
+parse_duration(Str) ->
+ case re:run(Str,
+ "^(?<sign>-|\\+)?P(?:(?<years>[0-9]+)Y)?(?:(?<months>[0"
+ "-9]+)M)?(?:(?<days>[0-9]+)D)?(T(?:(?<hours>[0-9]+)H)?("
+ "?:(?<minutes>[0-9]+)M)?(?:(?<seconds>[0-9]+(?:\\.[0-9]"
+ "+)?)S)?)?$",
+ [{capture, [sign, years, months, days, hours, minutes, seconds],
+ list}])
+ of
+ {match, [Sign, Years, Months, Days, Hours, Minutes, Seconds]} ->
+ {ok, [{sign, Sign},
+ {years, gi(Years)},
+ {months, gi(Months)},
+ {days, gi(Days)},
+ {hours, gi(Hours)},
+ {minutes, gi(Minutes)},
+ {seconds, gi(Seconds)}]};
+ nomatch ->
+ error
+ end.
diff --git a/deps/rabbit_common/test/unit_SUITE.erl b/deps/rabbit_common/test/unit_SUITE.erl
index 105488bed0..f00df8787b 100644
--- a/deps/rabbit_common/test/unit_SUITE.erl
+++ b/deps/rabbit_common/test/unit_SUITE.erl
@@ -44,7 +44,8 @@ groups() ->
frame_encoding_does_not_fail_with_empty_binary_payload,
amqp_table_conversion,
name_type,
- get_erl_path
+ get_erl_path,
+ date_time_parse_duration
]},
{parse_mem_limit, [parallel], [
parse_mem_limit_relative_exactly_max,
@@ -460,3 +461,23 @@ get_erl_path(_) ->
?assertNotMatch(nomatch, string:find(Exe, "erl"))
end,
ok.
+
+date_time_parse_duration(_) ->
+ ?assertEqual(
+ {ok, [{sign, "+"}, {years, 6}, {months, 3}, {days, 1}, {hours, 1}, {minutes, 1}, {seconds, 1}]},
+ rabbit_date_time:parse_duration("+P6Y3M1DT1H1M1.1S")
+ ),
+ ?assertEqual(
+ {ok, [{sign, []}, {years, 0}, {months, 0}, {days, 0}, {hours, 0}, {minutes, 6}, {seconds, 0}]},
+ rabbit_date_time:parse_duration("PT6M")
+ ),
+ ?assertEqual(
+ {ok, [{sign, []}, {years, 0}, {months, 0}, {days, 0}, {hours, 0}, {minutes, 10}, {seconds, 30}]},
+ rabbit_date_time:parse_duration("PT10M30S")
+ ),
+ ?assertEqual(
+ {ok, [{sign, []}, {years, 0}, {months, 0}, {days, 5}, {hours, 8}, {minutes, 0}, {seconds, 0}]},
+ rabbit_date_time:parse_duration("P5DT8H")
+ ),
+ ?assertEqual(error, rabbit_date_time:parse_duration("foo")),
+ ok.
\ No newline at end of file
diff --git a/deps/rabbitmq_stream/BUILD.bazel b/deps/rabbitmq_stream/BUILD.bazel
index 91cf7a1e40..6f7cb0b034 100644
--- a/deps/rabbitmq_stream/BUILD.bazel
+++ b/deps/rabbitmq_stream/BUILD.bazel
@@ -86,6 +86,10 @@ suites = [
),
rabbitmq_integration_suite(
PACKAGE,
+ name = "rabbit_stream_manager_SUITE",
+ ),
+ rabbitmq_integration_suite(
+ PACKAGE,
name = "rabbit_stream_SUITE",
deps = [
"//deps/rabbit:bazel_erlang_lib",
diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl
new file mode 100644
index 0000000000..4f2093a020
--- /dev/null
+++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl
@@ -0,0 +1,300 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 2.0 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
+
+-module('Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand').
+
+-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
+
+-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
+
+-ignore_xref([{'Elixir.RabbitMQ.CLI.DefaultOutput', output, 1},
+ {'Elixir.RabbitMQ.CLI.Core.Helpers', cli_acting_user, 0},
+ {'Elixir.RabbitMQ.CLI.Core.ExitCodes', exit_software, 0}]).
+
+-export([scopes/0,
+ usage/0,
+ usage_additional/0,
+ usage_doc_guides/0,
+ switches/0,
+ banner/2,
+ validate/2,
+ merge_defaults/2,
+ run/2,
+ output/2,
+ description/0,
+ help_section/0]).
+
+scopes() ->
+ [ctl, streams].
+
+description() ->
+ <<"Add a super stream (experimental feature)">>.
+
+switches() ->
+ [{partitions, integer},
+ {routing_keys, string},
+ {max_length_bytes, string},
+ {max_age, string},
+ {stream_max_segment_size_bytes, string},
+ {leader_locator, string},
+ {initial_cluster_size, integer}].
+
+help_section() ->
+ {plugin, stream}.
+
+validate([], _Opts) ->
+ {validation_failure, not_enough_args};
+validate([_Name], #{partitions := _, routing_keys := _}) ->
+ {validation_failure,
+ "Specify --partitions or routing-keys, not both."};
+validate([_Name], #{partitions := Partitions}) when Partitions < 1 ->
+ {validation_failure, "The partition number must be greater than 0"};
+validate([_Name], Opts) ->
+ validate_stream_arguments(Opts);
+validate(_, _Opts) ->
+ {validation_failure, too_many_args}.
+
+validate_stream_arguments(#{max_length_bytes := Value} = Opts) ->
+ case parse_information_unit(Value) of
+ error ->
+ {validation_failure,
+ "Invalid value for --max-length-bytes, valid example "
+ "values: 100gb, 50mb"};
+ _ ->
+ validate_stream_arguments(maps:remove(max_length_bytes, Opts))
+ end;
+validate_stream_arguments(#{max_age := Value} = Opts) ->
+ case rabbit_date_time:parse_duration(Value) of
+ {ok, _} ->
+ validate_stream_arguments(maps:remove(max_age, Opts));
+ error ->
+ {validation_failure,
+ "Invalid value for --max-age, the value must a "
+ "ISO 8601 duration, e.g. e.g. PT10M30S for 10 "
+ "minutes 30 seconds, P5DT8H for 5 days 8 hours."}
+ end;
+validate_stream_arguments(#{stream_max_segment_size_bytes := Value} =
+ Opts) ->
+ case parse_information_unit(Value) of
+ error ->
+ {validation_failure,
+ "Invalid value for --stream-max-segment-size-bytes, "
+ "valid example values: 100gb, 50mb"};
+ _ ->
+ validate_stream_arguments(maps:remove(stream_max_segment_size_bytes,
+ Opts))
+ end;
+validate_stream_arguments(#{leader_locator := <<"client-local">>} =
+ Opts) ->
+ validate_stream_arguments(maps:remove(leader_locator, Opts));
+validate_stream_arguments(#{leader_locator := <<"random">>} = Opts) ->
+ validate_stream_arguments(maps:remove(leader_locator, Opts));
+validate_stream_arguments(#{leader_locator := <<"least-leaders">>} =
+ Opts) ->
+ validate_stream_arguments(maps:remove(leader_locator, Opts));
+validate_stream_arguments(#{leader_locator := _}) ->
+ {validation_failure,
+ "Invalid value for --leader-locator, valid values "
+ "are client-local, random, least-leaders."};
+validate_stream_arguments(#{initial_cluster_size := Value} = Opts) ->
+ try
+ case rabbit_data_coercion:to_integer(Value) of
+ S when S > 0 ->
+ validate_stream_arguments(maps:remove(initial_cluster_size,
+ Opts));
+ _ ->
+ {validation_failure,
+ "Invalid value for --initial-cluster-size, the "
+ "value must be positive."}
+ end
+ catch
+ error:_ ->
+ {validation_failure,
+ "Invalid value for --initial-cluster-size, the "
+ "value must be a positive integer."}
+ end;
+validate_stream_arguments(_) ->
+ ok.
+
+merge_defaults(_Args, #{routing_keys := _V} = Opts) ->
+ {_Args, maps:merge(#{vhost => <<"/">>}, Opts)};
+merge_defaults(_Args, Opts) ->
+ {_Args, maps:merge(#{partitions => 3, vhost => <<"/">>}, Opts)}.
+
+usage() ->
+ <<"add_super_stream <name> [--vhost <vhost>] [--partition"
+ "s <partitions>] [--routing-keys <routing-keys>]">>.
+
+usage_additional() ->
+ [["<name>", "The name of the super stream."],
+ ["--vhost <vhost>", "The virtual host the super stream is added to."],
+ ["--partitions <partitions>",
+ "The number of partitions, default is 3. Mutually "
+ "exclusive with --routing-keys."],
+ ["--routing-keys <routing-keys>",
+ "Comma-separated list of routing keys. Mutually "
+ "exclusive with --partitions."],
+ ["--max-length-bytes <max-length-bytes>",
+ "The maximum size of partition streams, example "
+ "values: 20gb, 500mb."],
+ ["--max-age <max-age>",
+ "The maximum age of partition stream segments, "
+ "using the ISO 8601 duration format, e.g. PT10M30S "
+ "for 10 minutes 30 seconds, P5DT8H for 5 days "
+ "8 hours."],
+ ["--stream-max-segment-size-bytes <stream-max-segment-si"
+ "ze-bytes>",
+ "The maximum size of partition stream segments, "
+ "example values: 500mb, 1gb."],
+ ["--leader-locator <leader-locator>",
+ "Leader locator strategy for partition streams, "
+ "possible values are client-local, least-leaders, "
+ "random."],
+ ["--initial-cluster-size <initial-cluster-size>",
+ "The initial cluster size of partition streams."]].
+
+usage_doc_guides() ->
+ [?STREAM_GUIDE_URL].
+
+run([SuperStream],
+ #{node := NodeName,
+ vhost := VHost,
+ timeout := Timeout,
+ partitions := Partitions} =
+ Opts) ->
+ Streams =
+ [list_to_binary(binary_to_list(SuperStream)
+ ++ "-"
+ ++ integer_to_list(K))
+ || K <- lists:seq(0, Partitions - 1)],
+ RoutingKeys =
+ [integer_to_binary(K) || K <- lists:seq(0, Partitions - 1)],
+ create_super_stream(NodeName,
+ Timeout,
+ VHost,
+ SuperStream,
+ Streams,
+ stream_arguments(Opts),
+ RoutingKeys);
+run([SuperStream],
+ #{node := NodeName,
+ vhost := VHost,
+ timeout := Timeout,
+ routing_keys := RoutingKeysStr} =
+ Opts) ->
+ RoutingKeys =
+ [rabbit_data_coercion:to_binary(
+ string:strip(K))
+ || K
+ <- string:tokens(
+ rabbit_data_coercion:to_list(RoutingKeysStr), ",")],
+ Streams =
+ [list_to_binary(binary_to_list(SuperStream)
+ ++ "-"
+ ++ binary_to_list(K))
+ || K <- RoutingKeys],
+ create_super_stream(NodeName,
+ Timeout,
+ VHost,
+ SuperStream,
+ Streams,
+ stream_arguments(Opts),
+ RoutingKeys).
+
+stream_arguments(Opts) ->
+ stream_arguments(#{}, Opts).
+
+stream_arguments(Acc, Arguments) when map_size(Arguments) =:= 0 ->
+ Acc;
+stream_arguments(Acc, #{max_length_bytes := Value} = Arguments) ->
+ stream_arguments(maps:put(<<"max-length-bytes">>,
+ parse_information_unit(Value), Acc),
+ maps:remove(max_length_bytes, Arguments));
+stream_arguments(Acc, #{max_age := Value} = Arguments) ->
+ {ok, Duration} = rabbit_date_time:parse_duration(Value),
+ DurationInSeconds = duration_to_seconds(Duration),
+ stream_arguments(maps:put(<<"max-age">>,
+ list_to_binary(integer_to_list(DurationInSeconds)
+ ++ "s"),
+ Acc),
+ maps:remove(max_age, Arguments));
+stream_arguments(Acc,
+ #{stream_max_segment_size_bytes := Value} = Arguments) ->
+ stream_arguments(maps:put(<<"stream-max-segment-size-bytes">>,
+ parse_information_unit(Value), Acc),
+ maps:remove(stream_max_segment_size_bytes, Arguments));
+stream_arguments(Acc, #{initial_cluster_size := Value} = Arguments) ->
+ stream_arguments(maps:put(<<"initial-cluster-size">>,
+ rabbit_data_coercion:to_binary(Value), Acc),
+ maps:remove(initial_cluster_size, Arguments));
+stream_arguments(Acc, #{leader_locator := Value} = Arguments) ->
+ stream_arguments(maps:put(<<"queue-leader-locator">>, Value, Acc),
+ maps:remove(leader_locator, Arguments));
+stream_arguments(ArgumentsAcc, _Arguments) ->
+ ArgumentsAcc.
+
+duration_to_seconds([{sign, _},
+ {years, Y},
+ {months, M},
+ {days, D},
+ {hours, H},
+ {minutes, Mn},
+ {seconds, S}]) ->
+ Y * 365 * 86400 + M * 30 * 86400 + D * 86400 + H * 3600 + Mn * 60 + S.
+
+create_super_stream(NodeName,
+ Timeout,
+ VHost,
+ SuperStream,
+ Streams,
+ Arguments,
+ RoutingKeys) ->
+ case rabbit_misc:rpc_call(NodeName,
+ rabbit_stream_manager,
+ create_super_stream,
+ [VHost,
+ SuperStream,
+ Streams,
+ Arguments,
+ RoutingKeys,
+ cli_acting_user()],
+ Timeout)
+ of
+ ok ->
+ {ok,
+ rabbit_misc:format("Super stream ~s has been created",
+ [SuperStream])};
+ Error ->
+ Error
+ end.
+
+banner(_, _) ->
+ <<"Adding a super stream (experimental feature)...">>.
+
+output({error, Msg}, _Opts) ->
+ {error, 'Elixir.RabbitMQ.CLI.Core.ExitCodes':exit_software(), Msg};
+output({ok, Msg}, _Opts) ->
+ {ok, Msg}.
+
+cli_acting_user() ->
+ 'Elixir.RabbitMQ.CLI.Core.Helpers':cli_acting_user().
+
+parse_information_unit(Value) ->
+ case rabbit_resource_monitor_misc:parse_information_unit(Value) of
+ {ok, R} ->
+ integer_to_binary(R);
+ {error, _} ->
+ error
+ end.
diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand.erl
new file mode 100644
index 0000000000..0a2f0f785e
--- /dev/null
+++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand.erl
@@ -0,0 +1,97 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 2.0 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
+
+-module('Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand').
+
+-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
+
+-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
+
+-ignore_xref([{'Elixir.RabbitMQ.CLI.DefaultOutput', output, 1},
+ {'Elixir.RabbitMQ.CLI.Core.Helpers', cli_acting_user, 0},
+ {'Elixir.RabbitMQ.CLI.Core.ExitCodes', exit_software, 0}]).
+
+-export([scopes/0,
+ usage/0,
+ usage_additional/0,
+ usage_doc_guides/0,
+ banner/2,
+ validate/2,
+ merge_defaults/2,
+ run/2,
+ output/2,
+ description/0,
+ help_section/0]).
+
+scopes() ->
+ [ctl, streams].
+
+description() ->
+ <<"Delete a super stream (experimental feature)">>.
+
+help_section() ->
+ {plugin, stream}.
+
+validate([], _Opts) ->
+ {validation_failure, not_enough_args};
+validate([_Name], _Opts) ->
+ ok;
+validate(_, _Opts) ->
+ {validation_failure, too_many_args}.
+
+merge_defaults(_Args, Opts) ->
+ {_Args, maps:merge(#{vhost => <<"/">>}, Opts)}.
+
+usage() ->
+ <<"delete_super_stream <name> [--vhost <vhost>]">>.
+
+usage_additional() ->
+ [["<name>", "The name of the super stream to delete."],
+ ["--vhost <vhost>", "The virtual host of the super stream."]].
+
+usage_doc_guides() ->
+ [?STREAM_GUIDE_URL].
+
+run([SuperStream],
+ #{node := NodeName,
+ vhost := VHost,
+ timeout := Timeout}) ->
+ delete_super_stream(NodeName, Timeout, VHost, SuperStream).
+
+delete_super_stream(NodeName, Timeout, VHost, SuperStream) ->
+ case rabbit_misc:rpc_call(NodeName,
+ rabbit_stream_manager,
+ delete_super_stream,
+ [VHost, SuperStream, cli_acting_user()],
+ Timeout)
+ of
+ ok ->
+ {ok,
+ rabbit_misc:format("Super stream ~s has been deleted",
+ [SuperStream])};
+ Error ->
+ Error
+ end.
+
+banner(_, _) ->
+ <<"Deleting a super stream (experimental feature)...">>.
+
+output({error, Msg}, _Opts) ->
+ {error, 'Elixir.RabbitMQ.CLI.Core.ExitCodes':exit_software(), Msg};
+output({ok, Msg}, _Opts) ->
+ {ok, Msg}.
+
+cli_acting_user() ->
+ 'Elixir.RabbitMQ.CLI.Core.Helpers':cli_acting_user().
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
index 67f3be2214..3ff764a808 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -19,6 +19,7 @@
-behaviour(gen_server).
-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbit/include/amqqueue.hrl").
%% API
-export([init/1,
@@ -28,6 +29,8 @@
-export([start_link/1,
create/4,
delete/3,
+ create_super_stream/6,
+ delete_super_stream/3,
lookup_leader/2,
lookup_local_member/2,
topology/2,
@@ -56,6 +59,34 @@ create(VirtualHost, Reference, Arguments, Username) ->
delete(VirtualHost, Reference, Username) ->
gen_server:call(?MODULE, {delete, VirtualHost, Reference, Username}).
+-spec create_super_stream(binary(),
+ binary(),
+ [binary()],
+ #{binary() => binary()},
+ [binary()],
+ binary()) ->
+ ok | {error, term()}.
+create_super_stream(VirtualHost,
+ Name,
+ Partitions,
+ Arguments,
+ RoutingKeys,
+ Username) ->
+ gen_server:call(?MODULE,
+ {create_super_stream,
+ VirtualHost,
+ Name,
+ Partitions,
+ Arguments,
+ RoutingKeys,
+ Username}).
+
+-spec delete_super_stream(binary(), binary(), binary()) ->
+ ok | {error, term()}.
+delete_super_stream(VirtualHost, Name, Username) ->
+ gen_server:call(?MODULE,
+ {delete_super_stream, VirtualHost, Name, Username}).
+
-spec lookup_leader(binary(), binary()) -> pid() | cluster_not_found.
lookup_leader(VirtualHost, Stream) ->
gen_server:call(?MODULE, {lookup_leader, VirtualHost, Stream}).
@@ -149,100 +180,107 @@ validate_stream_queue_arguments([_ | T]) ->
handle_call({create, VirtualHost, Reference, Arguments, Username},
_From, State) ->
- Name =
- #resource{virtual_host = VirtualHost,
- kind = queue,
- name = Reference},
- StreamQueueArguments = stream_queue_arguments(Arguments),
- case validate_stream_queue_arguments(StreamQueueArguments) of
- ok ->
- Q0 = amqqueue:new(Name,
- none,
- true,
- false,
- none,
- StreamQueueArguments,
- VirtualHost,
- #{user => Username},
- rabbit_stream_queue),
- try
- QueueLookup =
- rabbit_amqqueue:with(Name,
- fun(Q) ->
- ok =
- rabbit_amqqueue:assert_equivalence(Q,
- true,
- false,
- StreamQueueArguments,
- none)
- end),
-
- case QueueLookup of
- ok ->
- {reply, {error, reference_already_exists}, State};
- {error, not_found} ->
- try
- case rabbit_queue_type:declare(Q0, node()) of
- {new, Q} ->
- {reply, {ok, amqqueue:get_type_state(Q)},
- State};
- {existing, _} ->
- {reply, {error, reference_already_exists},
- State};
- {error, Err} ->
- rabbit_log:warning("Error while creating ~p stream, ~p",
- [Reference, Err]),
- {reply, {error, internal_error}, State}
- end
- catch
- exit:Error ->
- rabbit_log:error("Error while creating ~p stream, ~p",
- [Reference, Error]),
- {reply, {error, internal_error}, State}
- end;
- {error, {absent, _, Reason}} ->
- rabbit_log:error("Error while creating ~p stream, ~p",
- [Reference, Reason]),
- {reply, {error, internal_error}, State}
- end
- catch
- exit:ExitError ->
- % likely to be a problem of inequivalent args on an existing stream
- rabbit_log:error("Error while creating ~p stream: ~p",
- [Reference, ExitError]),
- {reply, {error, validation_failed}, State}
- end;
- error ->
- {reply, {error, validation_failed}, State}
- end;
+ {reply, create_stream(VirtualHost, Reference, Arguments, Username),
+ State};
handle_call({delete, VirtualHost, Reference, Username}, _From,
State) ->
- Name =
- #resource{virtual_host = VirtualHost,
- kind = queue,
- name = Reference},
- rabbit_log:debug("Trying to delete stream ~p", [Reference]),
- case rabbit_amqqueue:lookup(Name) of
- {ok, Q} ->
- rabbit_log:debug("Found queue record ~p, checking if it is a stream",
- [Reference]),
- case is_stream_queue(Q) of
- true ->
- rabbit_log:debug("Queue record ~p is a stream, trying to delete it",
- [Reference]),
- {ok, _} =
- rabbit_stream_queue:delete(Q, false, false, Username),
- rabbit_log:debug("Stream ~p deleted", [Reference]),
- {reply, {ok, deleted}, State};
- _ ->
- rabbit_log:debug("Queue record ~p is NOT a stream, returning error",
- [Reference]),
- {reply, {error, reference_not_found}, State}
- end;
- {error, not_found} ->
- rabbit_log:debug("Stream ~p not found, cannot delete it",
- [Reference]),
- {reply, {error, reference_not_found}, State}
+ {reply, delete_stream(VirtualHost, Reference, Username), State};
+handle_call({create_super_stream,
+ VirtualHost,
+ Name,
+ Partitions,
+ Arguments,
+ RoutingKeys,
+ Username},
+ _From, State) ->
+ case validate_super_stream_creation(VirtualHost, Name, Partitions) of
+ {error, Reason} ->
+ {reply, {error, Reason}, State};
+ ok ->
+ case declare_super_stream_exchange(VirtualHost, Name, Username) of
+ ok ->
+ RollbackOperations =
+ [fun() ->
+ delete_super_stream_exchange(VirtualHost, Name,
+ Username)
+ end],
+ QueueCreationsResult =
+ lists:foldl(fun (Partition, {ok, RollbackOps}) ->
+ case create_stream(VirtualHost,
+ Partition,
+ Arguments,
+ Username)
+ of
+ {ok, _} ->
+ {ok,
+ [fun() ->
+ delete_stream(VirtualHost,
+ Partition,
+ Username)
+ end]
+ ++ RollbackOps};
+ {error, Reason} ->
+ {{error, Reason},
+ RollbackOps}
+ end;
+ (_,
+ {{error, _Reason}, _RollbackOps} =
+ Acc) ->
+ Acc
+ end,
+ {ok, RollbackOperations}, Partitions),
+ case QueueCreationsResult of
+ {ok, RollbackOps} ->
+ BindingsResult =
+ add_super_stream_bindings(VirtualHost,
+ Name,
+ Partitions,
+ RoutingKeys,
+ Username),
+ case BindingsResult of
+ ok ->
+ {reply, ok, State};
+ Error ->
+ [Fun() || Fun <- RollbackOps],
+ {reply, Error, State}
+ end;
+ {{error, Reason}, RollbackOps} ->
+ [Fun() || Fun <- RollbackOps],
+ {reply, {error, Reason}, State}
+ end;
+ {error, Msg} ->
+ {reply, {error, Msg}, State}
+ end
+ end;
+handle_call({delete_super_stream, VirtualHost, SuperStream, Username},
+ _From, State) ->
+ case super_stream_partitions(VirtualHost, SuperStream) of
+ {ok, Partitions} ->
+ case delete_super_stream_exchange(VirtualHost, SuperStream,
+ Username)
+ of
+ ok ->
+ ok;
+ {error, Error} ->
+ rabbit_log:warning("Error while deleting super stream exchange ~p, ~p",
+ [SuperStream, Error]),
+ ok
+ end,
+ [begin
+ case delete_stream(VirtualHost, Stream, Username) of
+ {ok, deleted} ->
+ ok;
+ {error, Err} ->
+ rabbit_log:warning("Error while delete partition ~p of super stream "
+ "~p, ~p",
+ [Stream, SuperStream, Err]),
+ ok
+ end
+ end
+ || Stream <- Partitions],
+ {reply, ok, State};
+ {error, Error} ->
+ {reply, {error, Error}, State}
end;
handle_call({lookup_leader, VirtualHost, Stream}, _From, State) ->
Name =
@@ -382,31 +420,7 @@ handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From,
end,
{reply, Res, State};
handle_call({partitions, VirtualHost, SuperStream}, _From, State) ->
- ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream),
- Res = try
- rabbit_exchange:lookup_or_die(ExchangeName),
- UnorderedBindings =
- [Binding
- || Binding = #binding{destination = D}
- <- rabbit_binding:list_for_source(ExchangeName),
- is_resource_stream_queue(D)],
- OrderedBindings =
- rabbit_stream_utils:sort_partitions(UnorderedBindings),
- {ok,
- lists:foldl(fun (#binding{destination =
- #resource{kind = queue, name = Q}},
- Acc) ->
- Acc ++ [Q];
- (_Binding, Acc) ->
- Acc
- end,
- [], OrderedBindings)}
- catch
- exit:Error ->
- rabbit_log:error("Error while looking up exchange ~p, ~p",
- [ExchangeName, Error]),
- {error, stream_not_found}
- end,
+ Res = super_stream_partitions(VirtualHost, SuperStream),
{reply, Res, State};
handle_call(which_children, _From, State) ->
{reply, [], State}.
@@ -418,6 +432,339 @@ handle_info(Info, State) ->
rabbit_log:info("Received info ~p", [Info]),
{noreply, State}.
+create_stream(VirtualHost, Reference, Arguments, Username) ->
+ Name =
+ #resource{virtual_host = VirtualHost,
+ kind = queue,
+ name = Reference},
+ StreamQueueArguments = stream_queue_arguments(Arguments),
+ case validate_stream_queue_arguments(StreamQueueArguments) of
+ ok ->
+ Q0 = amqqueue:new(Name,
+ none,
+ true,
+ false,
+ none,
+ StreamQueueArguments,
+ VirtualHost,
+ #{user => Username},
+ rabbit_stream_queue),
+ try
+ QueueLookup =
+ rabbit_amqqueue:with(Name,
+ fun(Q) ->
+ ok =
+ rabbit_amqqueue:assert_equivalence(Q,
+ true,
+ false,
+ StreamQueueArguments,
+ none)
+ end),
+
+ case QueueLookup of
+ ok ->
+ {error, reference_already_exists};
+ {error, not_found} ->
+ try
+ case rabbit_queue_type:declare(Q0, node()) of
+ {new, Q} ->
+ {ok, amqqueue:get_type_state(Q)};
+ {existing, _} ->
+ {error, reference_already_exists};
+ {error, Err} ->
+ rabbit_log:warning("Error while creating ~p stream, ~p",
+ [Reference, Err]),
+ {error, internal_error}
+ end
+ catch
+ exit:Error ->
+ rabbit_log:error("Error while creating ~p stream, ~p",
+ [Reference, Error]),
+ {error, internal_error}
+ end;
+ {error, {absent, _, Reason}} ->
+ rabbit_log:error("Error while creating ~p stream, ~p",
+ [Reference, Reason]),
+ {error, internal_error}
+ end
+ catch
+ exit:ExitError ->
+ % likely to be a problem of inequivalent args on an existing stream
+ rabbit_log:error("Error while creating ~p stream: ~p",
+ [Reference, ExitError]),
+ {error, validation_failed}
+ end;
+ error ->
+ {error, validation_failed}
+ end.
+
+delete_stream(VirtualHost, Reference, Username) ->
+ Name =
+ #resource{virtual_host = VirtualHost,
+ kind = queue,
+ name = Reference},
+ rabbit_log:debug("Trying to delete stream ~p", [Reference]),
+ case rabbit_amqqueue:lookup(Name) of
+ {ok, Q} ->
+ rabbit_log:debug("Found queue record ~p, checking if it is a stream",
+ [Reference]),
+ case is_stream_queue(Q) of
+ true ->
+ rabbit_log:debug("Queue record ~p is a stream, trying to delete it",
+ [Reference]),
+ {ok, _} =
+ rabbit_stream_queue:delete(Q, false, false, Username),
+ rabbit_log:debug("Stream ~p deleted", [Reference]),
+ {ok, deleted};
+ _ ->
+ rabbit_log:debug("Queue record ~p is NOT a stream, returning error",
+ [Reference]),
+ {error, reference_not_found}
+ end;
+ {error, not_found} ->
+ rabbit_log:debug("Stream ~p not found, cannot delete it",
+ [Reference]),
+ {error, reference_not_found}
+ end.
+
+super_stream_partitions(VirtualHost, SuperStream) ->
+ ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream),
+ try
+ rabbit_exchange:lookup_or_die(ExchangeName),
+ UnorderedBindings =
+ [Binding
+ || Binding = #binding{destination = D}
+ <- rabbit_binding:list_for_source(ExchangeName),
+ is_resource_stream_queue(D)],
+ OrderedBindings =
+ rabbit_stream_utils:sort_partitions(UnorderedBindings),
+ {ok,
+ lists:foldl(fun (#binding{destination =
+ #resource{kind = queue, name = Q}},
+ Acc) ->
+ Acc ++ [Q];
+ (_Binding, Acc) ->
+ Acc
+ end,
+ [], OrderedBindings)}
+ catch
+ exit:Error ->
+ rabbit_log:error("Error while looking up exchange ~p, ~p",
+ [ExchangeName, Error]),
+ {error, stream_not_found}
+ end.
+
+validate_super_stream_creation(VirtualHost, Name, Partitions) ->
+ case exchange_exists(VirtualHost, Name) of
+ {error, validation_failed} ->
+ {error,
+ {validation_failed,
+ rabbit_misc:format("~s is not a correct name for a super stream",
+ [Name])}};
+ {ok, true} ->
+ {error,
+ {reference_already_exists,
+ rabbit_misc:format("there is already an exchange named ~s",
+ [Name])}};
+ {ok, false} ->
+ case check_already_existing_queue(VirtualHost, Partitions) of
+ {error, Reason} ->
+ {error, Reason};
+ ok ->
+ ok
+ end
+ end.
+
+exchange_exists(VirtualHost, Name) ->
+ case rabbit_stream_utils:enforce_correct_name(Name) of
+ {ok, CorrectName} ->
+ ExchangeName = rabbit_misc:r(VirtualHost, exchange, CorrectName),
+ case rabbit_exchange:lookup(ExchangeName) of
+ {ok, _} ->
+ {ok, true};
+ {error, not_found} ->
+ {ok, false}
+ end;
+ error ->
+ {error, validation_failed}
+ end.
+
+queue_exists(VirtualHost, Name) ->
+ case rabbit_stream_utils:enforce_correct_name(Name) of
+ {ok, CorrectName} ->
+ QueueName = rabbit_misc:r(VirtualHost, queue, CorrectName),
+ case rabbit_amqqueue:lookup(QueueName) of
+ {ok, _} ->
+ {ok, true};
+ {error, not_found} ->
+ {ok, false}
+ end;
+ error ->
+ {error, validation_failed}
+ end.
+
+check_already_existing_queue(VirtualHost, Queues) ->
+ check_already_existing_queue0(VirtualHost, Queues, undefined).
+
+check_already_existing_queue0(_VirtualHost, [], undefined) ->
+ ok;
+check_already_existing_queue0(VirtualHost, [Q | T], _Error) ->
+ case queue_exists(VirtualHost, Q) of
+ {ok, false} ->
+ check_already_existing_queue0(VirtualHost, T, undefined);
+ {ok, true} ->
+ {error,
+ {reference_already_exists,
+ rabbit_misc:format("there is already a queue named ~s", [Q])}};
+ {error, validation_failed} ->
+ {error,
+ {validation_failed,
+ rabbit_misc:format("~s is not a correct name for a queue", [Q])}}
+ end.
+
+declare_super_stream_exchange(VirtualHost, Name, Username) ->
+ case rabbit_stream_utils:enforce_correct_name(Name) of
+ {ok, CorrectName} ->
+ Args =
+ rabbit_misc:set_table_value([],
+ <<"x-super-stream">>,
+ bool,
+ true),
+ CheckedType = rabbit_exchange:check_type(<<"direct">>),
+ ExchangeName = rabbit_misc:r(VirtualHost, exchange, CorrectName),
+ X = case rabbit_exchange:lookup(ExchangeName) of
+ {ok, FoundX} ->
+ FoundX;
+ {error, not_found} ->
+ rabbit_exchange:declare(ExchangeName,
+ CheckedType,
+ true,
+ false,
+ false,
+ Args,
+ Username)
+ end,
+ try
+ ok =
+ rabbit_exchange:assert_equivalence(X,
+ CheckedType,
+ true,
+ false,
+ false,
+ Args)
+ catch
+ exit:ExitError ->
+ % likely to be a problem of inequivalent args on an existing exchange
+ rabbit_log:error("Error while creating ~p super stream exchange: ~p",
+ [Name, ExitError]),
+ {error, validation_failed}
+ end;
+ error ->
+ {error, validation_failed}
+ end.
+
+add_super_stream_bindings(VirtualHost,
+ Name,
+ Partitions,
+ RoutingKeys,
+ Username) ->
+ PartitionsRoutingKeys = lists:zip(Partitions, RoutingKeys),
+ BindingsResult =
+ lists:foldl(fun ({Partition, RoutingKey}, {ok, Order}) ->
+ case add_super_stream_binding(VirtualHost,
+ Name,
+ Partition,
+ RoutingKey,
+ Order,
+ Username)
+ of
+ ok ->
+ {ok, Order + 1};
+ {error, Reason} ->
+ {{error, Reason}, 0}
+ end;
+ (_, {{error, _Reason}, _Order} = Acc) ->
+ Acc
+ end,
+ {ok, 0}, PartitionsRoutingKeys),
+ case BindingsResult of
+ {ok, _} ->
+ ok;
+ {{error, Reason}, _} ->
+ {error, Reason}
+ end.
+
+add_super_stream_binding(VirtualHost,
+ SuperStream,
+ Partition,
+ RoutingKey,
+ Order,
+ Username) ->
+ {ok, ExchangeNameBin} =
+ rabbit_stream_utils:enforce_correct_name(SuperStream),
+ {ok, QueueNameBin} =
+ rabbit_stream_utils:enforce_correct_name(Partition),
+ ExchangeName = rabbit_misc:r(VirtualHost, exchange, ExchangeNameBin),
+ QueueName = rabbit_misc:r(VirtualHost, queue, QueueNameBin),
+ Pid = self(),
+ Arguments =
+ rabbit_misc:set_table_value([],
+ <<"x-stream-partition-order">>,
+ long,
+ Order),
+ case rabbit_binding:add(#binding{source = ExchangeName,
+ destination = QueueName,
+ key = RoutingKey,
+ args = Arguments},
+ fun (_X, Q) when ?is_amqqueue(Q) ->
+ try
+ rabbit_amqqueue:check_exclusive_access(Q,
+ Pid)
+ catch
+ exit:Reason ->
+ {error, Reason}
+ end;
+ (_X, #exchange{}) ->
+ ok
+ end,
+ Username)
+ of
+ {error, {resources_missing, [{not_found, Name} | _]}} ->
+ {error,
+ {stream_not_found,
+ rabbit_misc:format("stream ~s does not exists", [Name])}};
+ {error, {resources_missing, [{absent, Q, _Reason} | _]}} ->
+ {error,
+ {stream_not_found,
+ rabbit_misc:format("stream ~s does not exists (absent)", [Q])}};
+ {error, binding_not_found} ->
+ {error,
+ {not_found,
+ rabbit_misc:format("no binding ~s between ~s and ~s",
+ [RoutingKey, rabbit_misc:rs(ExchangeName),
+ rabbit_misc:rs(QueueName)])}};
+ {error, {binding_invalid, Fmt, Args}} ->
+ {error, {binding_invalid, rabbit_misc:format(Fmt, Args)}};
+ {error, #amqp_error{} = Error} ->
+ {error, {internal_error, rabbit_misc:format("~p", [Error])}};
+ ok ->
+ ok
+ end.
+
+delete_super_stream_exchange(VirtualHost, Name, Username) ->
+ case rabbit_stream_utils:enforce_correct_name(Name) of
+ {ok, CorrectName} ->
+ ExchangeName = rabbit_misc:r(VirtualHost, exchange, CorrectName),
+ case rabbit_exchange:delete(ExchangeName, false, Username) of
+ {error, not_found} ->
+ ok;
+ ok ->
+ ok
+ end;
+ error ->
+ {error, validation_failed}
+ end.
+
leader_from_members(Q) ->
QState = amqqueue:get_type_state(Q),
#{name := StreamName} = QState,
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 91c0df8be0..ede34d95c7 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -2044,7 +2044,7 @@ handle_frame_post_auth(Transport,
State,
{request, CorrelationId,
{create_stream, Stream, Arguments}}) ->
- case rabbit_stream_utils:enforce_correct_stream_name(Stream) of
+ case rabbit_stream_utils:enforce_correct_name(Stream) of
{ok, StreamName} ->
case rabbit_stream_utils:check_configure_permitted(#resource{name =
StreamName,
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
index 92d0bff8af..37545424c8 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
@@ -17,7 +17,7 @@
-module(rabbit_stream_utils).
%% API
--export([enforce_correct_stream_name/1,
+-export([enforce_correct_name/1,
write_messages/4,
parse_map/2,
auth_mechanisms/1,
@@ -26,13 +26,14 @@
check_write_permitted/3,
check_read_permitted/3,
extract_stream_list/2,
- sort_partitions/1]).
+ sort_partitions/1,
+ strip_cr_lf/1]).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
-include_lib("rabbit_common/include/rabbit.hrl").
-enforce_correct_stream_name(Name) ->
+enforce_correct_name(Name) ->
% from rabbit_channel
StrippedName =
binary:replace(Name, [<<"\n">>, <<"\r">>], <<"">>, [global]),
@@ -236,3 +237,6 @@ sort_partitions(Partitions) ->
end
end,
Partitions).
+
+strip_cr_lf(NameBin) ->
+ binary:replace(NameBin, [<<"\n">>, <<"\r">>], <<"">>, [global]).
diff --git a/deps/rabbitmq_stream/test/commands_SUITE.erl b/deps/rabbitmq_stream/test/commands_SUITE.erl
index c2652b4ad0..902bfee3c8 100644
--- a/deps/rabbitmq_stream/test/commands_SUITE.erl
+++ b/deps/rabbitmq_stream/test/commands_SUITE.erl
@@ -23,10 +23,16 @@
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumersCommand').
-define(COMMAND_LIST_PUBLISHERS,
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand').
+-define(COMMAND_ADD_SUPER_STREAM,
+ 'Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand').
+-define(COMMAND_DELETE_SUPER_STREAM,
+ 'Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand').
all() ->
- [{group, list_connections}, {group, list_consumers},
- {group, list_publishers}].
+ [{group, list_connections},
+ {group, list_consumers},
+ {group, list_publishers},
+ {group, super_streams}].
groups() ->
[{list_connections, [],
@@ -35,7 +41,13 @@ groups() ->
{list_consumers, [],
[list_consumers_merge_defaults, list_consumers_run]},
{list_publishers, [],
- [list_publishers_merge_defaults, list_publishers_run]}].
+ [list_publishers_merge_defaults, list_publishers_run]},
+ {super_streams, [],
+ [add_super_stream_merge_defaults,
+ add_super_stream_validate,
+ delete_super_stream_merge_defaults,
+ delete_super_stream_validate,
+ add_delete_super_stream_run]}].
init_per_suite(Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
@@ -309,6 +321,174 @@ list_publishers_run(Config) ->
?awaitMatch(0, publisher_count(Config), ?WAIT),
ok.
+add_super_stream_merge_defaults(_Config) ->
+ ?assertMatch({[<<"super-stream">>],
+ #{partitions := 3, vhost := <<"/">>}},
+ ?COMMAND_ADD_SUPER_STREAM:merge_defaults([<<"super-stream">>],
+ #{})),
+
+ ?assertMatch({[<<"super-stream">>],
+ #{partitions := 5, vhost := <<"/">>}},
+ ?COMMAND_ADD_SUPER_STREAM:merge_defaults([<<"super-stream">>],
+ #{partitions => 5})),
+
+ DefaultWithRoutingKeys =
+ ?COMMAND_ADD_SUPER_STREAM:merge_defaults([<<"super-stream">>],
+ #{routing_keys =>
+ <<"amer,emea,apac">>}),
+ ?assertMatch({[<<"super-stream">>],
+ #{routing_keys := <<"amer,emea,apac">>, vhost := <<"/">>}},
+ DefaultWithRoutingKeys),
+
+ {_, Opts} = DefaultWithRoutingKeys,
+ ?assertEqual(false, maps:is_key(partitions, Opts)).
+
+add_super_stream_validate(_Config) ->
+ ?assertMatch({validation_failure, not_enough_args},
+ ?COMMAND_ADD_SUPER_STREAM:validate([], #{})),
+ ?assertMatch({validation_failure, too_many_args},
+ ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>, <<"b">>], #{})),
+ ?assertMatch({validation_failure, _},
+ ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
+ #{partitions => 1,
+ routing_keys =>
+ <<"a,b,c">>})),
+ ?assertMatch({validation_failure, _},
+ ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
+ #{partitions => 0})),
+ ?assertEqual(ok,
+ ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
+ #{partitions => 5})),
+ ?assertEqual(ok,
+ ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
+ #{routing_keys =>
+ <<"a,b,c">>})),
+
+ [case Expected of
+ ok ->
+ ?assertEqual(ok,
+ ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>], Opts));
+ error ->
+ ?assertMatch({validation_failure, _},
+ ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>], Opts))
+ end
+ || {Opts, Expected}
+ <- [{#{max_length_bytes => 1000}, ok},
+ {#{max_length_bytes => <<"1000">>}, ok},
+ {#{max_length_bytes => <<"100gb">>}, ok},
+ {#{max_length_bytes => <<"50mb">>}, ok},
+ {#{max_length_bytes => <<"50bm">>}, error},
+ {#{max_age => <<"PT10M">>}, ok},
+ {#{max_age => <<"P5DT8H">>}, ok},
+ {#{max_age => <<"foo">>}, error},
+ {#{stream_max_segment_size_bytes => 1000}, ok},
+ {#{stream_max_segment_size_bytes => <<"1000">>}, ok},
+ {#{stream_max_segment_size_bytes => <<"100gb">>}, ok},
+ {#{stream_max_segment_size_bytes => <<"50mb">>}, ok},
+ {#{stream_max_segment_size_bytes => <<"50bm">>}, error},
+ {#{leader_locator => <<"client-local">>}, ok},
+ {#{leader_locator => <<"least-leaders">>}, ok},
+ {#{leader_locator => <<"random">>}, ok},
+ {#{leader_locator => <<"foo">>}, error},
+ {#{initial_cluster_size => <<"1">>}, ok},
+ {#{initial_cluster_size => <<"2">>}, ok},
+ {#{initial_cluster_size => <<"3">>}, ok},
+ {#{initial_cluster_size => <<"0">>}, error},
+ {#{initial_cluster_size => <<"-1">>}, error},
+ {#{initial_cluster_size => <<"foo">>}, error}]],
+ ok.
+
+delete_super_stream_merge_defaults(_Config) ->
+ ?assertMatch({[<<"super-stream">>], #{vhost := <<"/">>}},
+ ?COMMAND_DELETE_SUPER_STREAM:merge_defaults([<<"super-stream">>],
+ #{})),
+ ok.
+
+delete_super_stream_validate(_Config) ->
+ ?assertMatch({validation_failure, not_enough_args},
+ ?COMMAND_DELETE_SUPER_STREAM:validate([], #{})),
+ ?assertMatch({validation_failure, too_many_args},
+ ?COMMAND_DELETE_SUPER_STREAM:validate([<<"a">>, <<"b">>],
+ #{})),
+ ?assertEqual(ok, ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>], #{})),
+ ok.
+
+add_delete_super_stream_run(Config) ->
+ Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ Opts =
+ #{node => Node,
+ timeout => 10000,
+ vhost => <<"/">>},
+
+ % with number of partitions
+ ?assertMatch({ok, _},
+ ?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>],
+ maps:merge(#{partitions => 3},
+ Opts))),
+ ?assertEqual({ok,
+ [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]},
+ partitions(Config, <<"invoices">>)),
+ ?assertMatch({ok, _},
+ ?COMMAND_DELETE_SUPER_STREAM:run([<<"invoices">>], Opts)),
+ ?assertEqual({error, stream_not_found},
+ partitions(Config, <<"invoices">>)),
+
+ % with routing keys
+ ?assertMatch({ok, _},
+ ?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>],
+ maps:merge(#{routing_keys =>
+ <<" amer,emea , apac">>},
+ Opts))),
+ ?assertEqual({ok,
+ [<<"invoices-amer">>, <<"invoices-emea">>,
+ <<"invoices-apac">>]},
+ partitions(Config, <<"invoices">>)),
+ ?assertMatch({ok, _},
+ ?COMMAND_DELETE_SUPER_STREAM:run([<<"invoices">>], Opts)),
+ ?assertEqual({error, stream_not_found},
+ partitions(Config, <<"invoices">>)),
+
+ % with arguments
+ ExtraOptions =
+ #{partitions => 3,
+ max_length_bytes => <<"50mb">>,
+ max_age => <<"PT10M">>,
+ stream_max_segment_size_bytes => <<"1mb">>,
+ leader_locator => <<"random">>,
+ initial_cluster_size => <<"1">>},
+
+ ?assertMatch({ok, _},
+ ?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>],
+ maps:merge(ExtraOptions, Opts))),
+
+ {ok, Q} = queue_lookup(Config, <<"invoices-0">>),
+ Args = amqqueue:get_arguments(Q),
+ ?assertMatch({_, <<"random">>},
+ rabbit_misc:table_lookup(Args, <<"x-queue-leader-locator">>)),
+ ?assertMatch({_, 1},
+ rabbit_misc:table_lookup(Args, <<"x-initial-cluster-size">>)),
+ ?assertMatch({_, 1000000},
+ rabbit_misc:table_lookup(Args,
+ <<"x-stream-max-segment-size-bytes">>)),
+ ?assertMatch({_, <<"600s">>},
+ rabbit_misc:table_lookup(Args, <<"x-max-age">>)),
+ ?assertMatch({_, 50000000},
+ rabbit_misc:table_lookup(Args, <<"x-max-length-bytes">>)),
+ ?assertMatch({_, <<"stream">>},
+ rabbit_misc:table_lookup(Args, <<"x-queue-type">>)),
+
+ ?assertMatch({ok, _},
+ ?COMMAND_DELETE_SUPER_STREAM:run([<<"invoices">>], Opts)),
+
+ ok.
+
+partitions(Config, Name) ->
+ rabbit_ct_broker_helpers:rpc(Config,
+ 0,
+ rabbit_stream_manager,
+ partitions,
+ [<<"/">>, Name]).
+
create_stream(S, Stream, C0) ->
rabbit_stream_SUITE:test_create_stream(gen_tcp, S, Stream, C0).
@@ -384,3 +564,11 @@ amqp_params(network, _, Port) ->
#amqp_params_network{port = Port};
amqp_params(direct, Node, _) ->
#amqp_params_direct{node = Node}.
+
+queue_lookup(Config, Q) ->
+ QueueName = rabbit_misc:r(<<"/">>, queue, Q),
+ rabbit_ct_broker_helpers:rpc(Config,
+ 0,
+ rabbit_amqqueue,
+ lookup,
+ [QueueName]).
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl
new file mode 100644
index 0000000000..b47a954a95
--- /dev/null
+++ b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl
@@ -0,0 +1,149 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_stream_manager_SUITE).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-compile(export_all).
+
+all() ->
+ [{group, non_parallel_tests}].
+
+groups() ->
+ [{non_parallel_tests, [], [manage_super_stream]}].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ Config.
+
+end_per_suite(Config) ->
+ Config.
+
+init_per_group(_, Config) ->
+ Config1 =
+ rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]),
+ Config2 =
+ rabbit_ct_helpers:set_config(Config1,
+ {rabbitmq_ct_tls_verify, verify_none}),
+ Config3 =
+ rabbit_ct_helpers:set_config(Config2, {rabbitmq_stream, verify_none}),
+ rabbit_ct_helpers:run_setup_steps(Config3,
+ [fun(StepConfig) ->
+ rabbit_ct_helpers:merge_app_env(StepConfig,
+ {rabbit,
+ [{core_metrics_gc_interval,
+ 1000}]})
+ end,
+ fun(StepConfig) ->
+ rabbit_ct_helpers:merge_app_env(StepConfig,
+ {rabbitmq_stream,
+ [{connection_negotiation_step_timeout,
+ 500}]})
+ end]
+ ++ rabbit_ct_broker_helpers:setup_steps()).
+
+end_per_group(_, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+manage_super_stream(Config) ->
+ % create super stream
+ ?assertEqual(ok,
+ create_super_stream(Config,
+ <<"invoices">>,
+ [<<"invoices-0">>, <<"invoices-1">>,
+ <<"invoices-2">>],
+ [<<"0">>, <<"1">>, <<"2">>])),
+ % get the correct partitions
+ ?assertEqual({ok,
+ [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]},
+ partitions(Config, <<"invoices">>)),
+
+ [?assertEqual({ok, [Partition]},
+ route(Config, RoutingKey, <<"invoices">>))
+ || {Partition, RoutingKey}
+ <- [{<<"invoices-0">>, <<"0">>}, {<<"invoices-1">>, <<"1">>},
+ {<<"invoices-2">>, <<"2">>}]],
+
+ % get an error if trying to re-create it
+ ?assertMatch({error, _},
+ create_super_stream(Config,
+ <<"invoices">>,
+ [<<"invoices-0">>, <<"invoices-1">>,
+ <<"invoices-2">>],
+ [<<"0">>, <<"1">>, <<"2">>])),
+
+ % can delete it
+ ?assertEqual(ok, delete_super_stream(Config, <<"invoices">>)),
+
+ % create a stream with the same name as a potential partition
+ ?assertMatch({ok, _}, create_stream(Config, <<"invoices-1">>)),
+
+ % cannot create the super stream because a partition already exists
+ ?assertMatch({error, _},
+ create_super_stream(Config,
+ <<"invoices">>,
+ [<<"invoices-0">>, <<"invoices-1">>,
+ <<"invoices-2">>],
+ [<<"0">>, <<"1">>, <<"2">>])),
+
+ ok.
+
+create_super_stream(Config, Name, Partitions, RKs) ->
+ rabbit_ct_broker_helpers:rpc(Config,
+ 0,
+ rabbit_stream_manager,
+ create_super_stream,
+ [<<"/">>,
+ Name,
+ Partitions,
+ #{},
+ RKs,
+ <<"guest">>]).
+
+delete_super_stream(Config, Name) ->
+ rabbit_ct_broker_helpers:rpc(Config,
+ 0,
+ rabbit_stream_manager,
+ delete_super_stream,
+ [<<"/">>, Name, <<"guest">>]).
+
+create_stream(Config, Name) ->
+ rabbit_ct_broker_helpers:rpc(Config,
+ 0,
+ rabbit_stream_manager,
+ create,
+ [<<"/">>, Name, [], <<"guest">>]).
+
+partitions(Config, Name) ->
+ rabbit_ct_broker_helpers:rpc(Config,
+ 0,
+ rabbit_stream_manager,
+ partitions,
+ [<<"/">>, Name]).
+
+route(Config, RoutingKey, SuperStream) ->
+ rabbit_ct_broker_helpers:rpc(Config,
+ 0,
+ rabbit_stream_manager,
+ route,
+ [RoutingKey, <<"/">>, SuperStream]).