diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-10-14 11:01:58 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-14 11:01:58 +0200 |
commit | 75b2a53f524cd65a3c04c86717649a7f8a89379c (patch) | |
tree | 713e4956877dcf8c3d4e73ca1bfc59d8e6abecaf | |
parent | c935f9c7faf14eb37662375b7ec409fb3862caea (diff) | |
parent | 6b9589bae42b0bec11d75ae906810260da14ca76 (diff) | |
download | rabbitmq-server-git-75b2a53f524cd65a3c04c86717649a7f8a89379c.tar.gz |
Merge pull request #3503 from rabbitmq/super-stream-cli
Add functions to create/delete super stream in manager
10 files changed, 1283 insertions, 125 deletions
diff --git a/deps/rabbit_common/src/rabbit_date_time.erl b/deps/rabbit_common/src/rabbit_date_time.erl new file mode 100644 index 0000000000..e4a56ad783 --- /dev/null +++ b/deps/rabbit_common/src/rabbit_date_time.erl @@ -0,0 +1,48 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_date_time). + +-export([parse_duration/1]). + +-type datetime_plist() :: list({atom(), integer()}). + +% from https://github.com/erlsci/iso8601/blob/main/src/iso8601.erl +-spec gi(string()) -> integer(). +gi(DS) -> + {Int, _Rest} = string:to_integer(DS), + case Int of + error -> + 0; + _ -> + Int + end. + +-spec parse_duration(string()) -> datetime_plist(). +parse_duration(Bin) + when is_binary(Bin) -> %TODO extended format + parse_duration(binary_to_list(Bin)); +parse_duration(Str) -> + case re:run(Str, + "^(?<sign>-|\\+)?P(?:(?<years>[0-9]+)Y)?(?:(?<months>[0" + "-9]+)M)?(?:(?<days>[0-9]+)D)?(T(?:(?<hours>[0-9]+)H)?(" + "?:(?<minutes>[0-9]+)M)?(?:(?<seconds>[0-9]+(?:\\.[0-9]" + "+)?)S)?)?$", + [{capture, [sign, years, months, days, hours, minutes, seconds], + list}]) + of + {match, [Sign, Years, Months, Days, Hours, Minutes, Seconds]} -> + {ok, [{sign, Sign}, + {years, gi(Years)}, + {months, gi(Months)}, + {days, gi(Days)}, + {hours, gi(Hours)}, + {minutes, gi(Minutes)}, + {seconds, gi(Seconds)}]}; + nomatch -> + error + end. diff --git a/deps/rabbit_common/test/unit_SUITE.erl b/deps/rabbit_common/test/unit_SUITE.erl index 105488bed0..f00df8787b 100644 --- a/deps/rabbit_common/test/unit_SUITE.erl +++ b/deps/rabbit_common/test/unit_SUITE.erl @@ -44,7 +44,8 @@ groups() -> frame_encoding_does_not_fail_with_empty_binary_payload, amqp_table_conversion, name_type, - get_erl_path + get_erl_path, + date_time_parse_duration ]}, {parse_mem_limit, [parallel], [ parse_mem_limit_relative_exactly_max, @@ -460,3 +461,23 @@ get_erl_path(_) -> ?assertNotMatch(nomatch, string:find(Exe, "erl")) end, ok. + +date_time_parse_duration(_) -> + ?assertEqual( + {ok, [{sign, "+"}, {years, 6}, {months, 3}, {days, 1}, {hours, 1}, {minutes, 1}, {seconds, 1}]}, + rabbit_date_time:parse_duration("+P6Y3M1DT1H1M1.1S") + ), + ?assertEqual( + {ok, [{sign, []}, {years, 0}, {months, 0}, {days, 0}, {hours, 0}, {minutes, 6}, {seconds, 0}]}, + rabbit_date_time:parse_duration("PT6M") + ), + ?assertEqual( + {ok, [{sign, []}, {years, 0}, {months, 0}, {days, 0}, {hours, 0}, {minutes, 10}, {seconds, 30}]}, + rabbit_date_time:parse_duration("PT10M30S") + ), + ?assertEqual( + {ok, [{sign, []}, {years, 0}, {months, 0}, {days, 5}, {hours, 8}, {minutes, 0}, {seconds, 0}]}, + rabbit_date_time:parse_duration("P5DT8H") + ), + ?assertEqual(error, rabbit_date_time:parse_duration("foo")), + ok.
\ No newline at end of file diff --git a/deps/rabbitmq_stream/BUILD.bazel b/deps/rabbitmq_stream/BUILD.bazel index 91cf7a1e40..6f7cb0b034 100644 --- a/deps/rabbitmq_stream/BUILD.bazel +++ b/deps/rabbitmq_stream/BUILD.bazel @@ -86,6 +86,10 @@ suites = [ ), rabbitmq_integration_suite( PACKAGE, + name = "rabbit_stream_manager_SUITE", + ), + rabbitmq_integration_suite( + PACKAGE, name = "rabbit_stream_SUITE", deps = [ "//deps/rabbit:bazel_erlang_lib", diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl new file mode 100644 index 0000000000..4f2093a020 --- /dev/null +++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl @@ -0,0 +1,300 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 2.0 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved. + +-module('Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand'). + +-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl"). + +-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour'). + +-ignore_xref([{'Elixir.RabbitMQ.CLI.DefaultOutput', output, 1}, + {'Elixir.RabbitMQ.CLI.Core.Helpers', cli_acting_user, 0}, + {'Elixir.RabbitMQ.CLI.Core.ExitCodes', exit_software, 0}]). + +-export([scopes/0, + usage/0, + usage_additional/0, + usage_doc_guides/0, + switches/0, + banner/2, + validate/2, + merge_defaults/2, + run/2, + output/2, + description/0, + help_section/0]). + +scopes() -> + [ctl, streams]. + +description() -> + <<"Add a super stream (experimental feature)">>. + +switches() -> + [{partitions, integer}, + {routing_keys, string}, + {max_length_bytes, string}, + {max_age, string}, + {stream_max_segment_size_bytes, string}, + {leader_locator, string}, + {initial_cluster_size, integer}]. + +help_section() -> + {plugin, stream}. + +validate([], _Opts) -> + {validation_failure, not_enough_args}; +validate([_Name], #{partitions := _, routing_keys := _}) -> + {validation_failure, + "Specify --partitions or routing-keys, not both."}; +validate([_Name], #{partitions := Partitions}) when Partitions < 1 -> + {validation_failure, "The partition number must be greater than 0"}; +validate([_Name], Opts) -> + validate_stream_arguments(Opts); +validate(_, _Opts) -> + {validation_failure, too_many_args}. + +validate_stream_arguments(#{max_length_bytes := Value} = Opts) -> + case parse_information_unit(Value) of + error -> + {validation_failure, + "Invalid value for --max-length-bytes, valid example " + "values: 100gb, 50mb"}; + _ -> + validate_stream_arguments(maps:remove(max_length_bytes, Opts)) + end; +validate_stream_arguments(#{max_age := Value} = Opts) -> + case rabbit_date_time:parse_duration(Value) of + {ok, _} -> + validate_stream_arguments(maps:remove(max_age, Opts)); + error -> + {validation_failure, + "Invalid value for --max-age, the value must a " + "ISO 8601 duration, e.g. e.g. PT10M30S for 10 " + "minutes 30 seconds, P5DT8H for 5 days 8 hours."} + end; +validate_stream_arguments(#{stream_max_segment_size_bytes := Value} = + Opts) -> + case parse_information_unit(Value) of + error -> + {validation_failure, + "Invalid value for --stream-max-segment-size-bytes, " + "valid example values: 100gb, 50mb"}; + _ -> + validate_stream_arguments(maps:remove(stream_max_segment_size_bytes, + Opts)) + end; +validate_stream_arguments(#{leader_locator := <<"client-local">>} = + Opts) -> + validate_stream_arguments(maps:remove(leader_locator, Opts)); +validate_stream_arguments(#{leader_locator := <<"random">>} = Opts) -> + validate_stream_arguments(maps:remove(leader_locator, Opts)); +validate_stream_arguments(#{leader_locator := <<"least-leaders">>} = + Opts) -> + validate_stream_arguments(maps:remove(leader_locator, Opts)); +validate_stream_arguments(#{leader_locator := _}) -> + {validation_failure, + "Invalid value for --leader-locator, valid values " + "are client-local, random, least-leaders."}; +validate_stream_arguments(#{initial_cluster_size := Value} = Opts) -> + try + case rabbit_data_coercion:to_integer(Value) of + S when S > 0 -> + validate_stream_arguments(maps:remove(initial_cluster_size, + Opts)); + _ -> + {validation_failure, + "Invalid value for --initial-cluster-size, the " + "value must be positive."} + end + catch + error:_ -> + {validation_failure, + "Invalid value for --initial-cluster-size, the " + "value must be a positive integer."} + end; +validate_stream_arguments(_) -> + ok. + +merge_defaults(_Args, #{routing_keys := _V} = Opts) -> + {_Args, maps:merge(#{vhost => <<"/">>}, Opts)}; +merge_defaults(_Args, Opts) -> + {_Args, maps:merge(#{partitions => 3, vhost => <<"/">>}, Opts)}. + +usage() -> + <<"add_super_stream <name> [--vhost <vhost>] [--partition" + "s <partitions>] [--routing-keys <routing-keys>]">>. + +usage_additional() -> + [["<name>", "The name of the super stream."], + ["--vhost <vhost>", "The virtual host the super stream is added to."], + ["--partitions <partitions>", + "The number of partitions, default is 3. Mutually " + "exclusive with --routing-keys."], + ["--routing-keys <routing-keys>", + "Comma-separated list of routing keys. Mutually " + "exclusive with --partitions."], + ["--max-length-bytes <max-length-bytes>", + "The maximum size of partition streams, example " + "values: 20gb, 500mb."], + ["--max-age <max-age>", + "The maximum age of partition stream segments, " + "using the ISO 8601 duration format, e.g. PT10M30S " + "for 10 minutes 30 seconds, P5DT8H for 5 days " + "8 hours."], + ["--stream-max-segment-size-bytes <stream-max-segment-si" + "ze-bytes>", + "The maximum size of partition stream segments, " + "example values: 500mb, 1gb."], + ["--leader-locator <leader-locator>", + "Leader locator strategy for partition streams, " + "possible values are client-local, least-leaders, " + "random."], + ["--initial-cluster-size <initial-cluster-size>", + "The initial cluster size of partition streams."]]. + +usage_doc_guides() -> + [?STREAM_GUIDE_URL]. + +run([SuperStream], + #{node := NodeName, + vhost := VHost, + timeout := Timeout, + partitions := Partitions} = + Opts) -> + Streams = + [list_to_binary(binary_to_list(SuperStream) + ++ "-" + ++ integer_to_list(K)) + || K <- lists:seq(0, Partitions - 1)], + RoutingKeys = + [integer_to_binary(K) || K <- lists:seq(0, Partitions - 1)], + create_super_stream(NodeName, + Timeout, + VHost, + SuperStream, + Streams, + stream_arguments(Opts), + RoutingKeys); +run([SuperStream], + #{node := NodeName, + vhost := VHost, + timeout := Timeout, + routing_keys := RoutingKeysStr} = + Opts) -> + RoutingKeys = + [rabbit_data_coercion:to_binary( + string:strip(K)) + || K + <- string:tokens( + rabbit_data_coercion:to_list(RoutingKeysStr), ",")], + Streams = + [list_to_binary(binary_to_list(SuperStream) + ++ "-" + ++ binary_to_list(K)) + || K <- RoutingKeys], + create_super_stream(NodeName, + Timeout, + VHost, + SuperStream, + Streams, + stream_arguments(Opts), + RoutingKeys). + +stream_arguments(Opts) -> + stream_arguments(#{}, Opts). + +stream_arguments(Acc, Arguments) when map_size(Arguments) =:= 0 -> + Acc; +stream_arguments(Acc, #{max_length_bytes := Value} = Arguments) -> + stream_arguments(maps:put(<<"max-length-bytes">>, + parse_information_unit(Value), Acc), + maps:remove(max_length_bytes, Arguments)); +stream_arguments(Acc, #{max_age := Value} = Arguments) -> + {ok, Duration} = rabbit_date_time:parse_duration(Value), + DurationInSeconds = duration_to_seconds(Duration), + stream_arguments(maps:put(<<"max-age">>, + list_to_binary(integer_to_list(DurationInSeconds) + ++ "s"), + Acc), + maps:remove(max_age, Arguments)); +stream_arguments(Acc, + #{stream_max_segment_size_bytes := Value} = Arguments) -> + stream_arguments(maps:put(<<"stream-max-segment-size-bytes">>, + parse_information_unit(Value), Acc), + maps:remove(stream_max_segment_size_bytes, Arguments)); +stream_arguments(Acc, #{initial_cluster_size := Value} = Arguments) -> + stream_arguments(maps:put(<<"initial-cluster-size">>, + rabbit_data_coercion:to_binary(Value), Acc), + maps:remove(initial_cluster_size, Arguments)); +stream_arguments(Acc, #{leader_locator := Value} = Arguments) -> + stream_arguments(maps:put(<<"queue-leader-locator">>, Value, Acc), + maps:remove(leader_locator, Arguments)); +stream_arguments(ArgumentsAcc, _Arguments) -> + ArgumentsAcc. + +duration_to_seconds([{sign, _}, + {years, Y}, + {months, M}, + {days, D}, + {hours, H}, + {minutes, Mn}, + {seconds, S}]) -> + Y * 365 * 86400 + M * 30 * 86400 + D * 86400 + H * 3600 + Mn * 60 + S. + +create_super_stream(NodeName, + Timeout, + VHost, + SuperStream, + Streams, + Arguments, + RoutingKeys) -> + case rabbit_misc:rpc_call(NodeName, + rabbit_stream_manager, + create_super_stream, + [VHost, + SuperStream, + Streams, + Arguments, + RoutingKeys, + cli_acting_user()], + Timeout) + of + ok -> + {ok, + rabbit_misc:format("Super stream ~s has been created", + [SuperStream])}; + Error -> + Error + end. + +banner(_, _) -> + <<"Adding a super stream (experimental feature)...">>. + +output({error, Msg}, _Opts) -> + {error, 'Elixir.RabbitMQ.CLI.Core.ExitCodes':exit_software(), Msg}; +output({ok, Msg}, _Opts) -> + {ok, Msg}. + +cli_acting_user() -> + 'Elixir.RabbitMQ.CLI.Core.Helpers':cli_acting_user(). + +parse_information_unit(Value) -> + case rabbit_resource_monitor_misc:parse_information_unit(Value) of + {ok, R} -> + integer_to_binary(R); + {error, _} -> + error + end. diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand.erl new file mode 100644 index 0000000000..0a2f0f785e --- /dev/null +++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand.erl @@ -0,0 +1,97 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 2.0 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved. + +-module('Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand'). + +-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl"). + +-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour'). + +-ignore_xref([{'Elixir.RabbitMQ.CLI.DefaultOutput', output, 1}, + {'Elixir.RabbitMQ.CLI.Core.Helpers', cli_acting_user, 0}, + {'Elixir.RabbitMQ.CLI.Core.ExitCodes', exit_software, 0}]). + +-export([scopes/0, + usage/0, + usage_additional/0, + usage_doc_guides/0, + banner/2, + validate/2, + merge_defaults/2, + run/2, + output/2, + description/0, + help_section/0]). + +scopes() -> + [ctl, streams]. + +description() -> + <<"Delete a super stream (experimental feature)">>. + +help_section() -> + {plugin, stream}. + +validate([], _Opts) -> + {validation_failure, not_enough_args}; +validate([_Name], _Opts) -> + ok; +validate(_, _Opts) -> + {validation_failure, too_many_args}. + +merge_defaults(_Args, Opts) -> + {_Args, maps:merge(#{vhost => <<"/">>}, Opts)}. + +usage() -> + <<"delete_super_stream <name> [--vhost <vhost>]">>. + +usage_additional() -> + [["<name>", "The name of the super stream to delete."], + ["--vhost <vhost>", "The virtual host of the super stream."]]. + +usage_doc_guides() -> + [?STREAM_GUIDE_URL]. + +run([SuperStream], + #{node := NodeName, + vhost := VHost, + timeout := Timeout}) -> + delete_super_stream(NodeName, Timeout, VHost, SuperStream). + +delete_super_stream(NodeName, Timeout, VHost, SuperStream) -> + case rabbit_misc:rpc_call(NodeName, + rabbit_stream_manager, + delete_super_stream, + [VHost, SuperStream, cli_acting_user()], + Timeout) + of + ok -> + {ok, + rabbit_misc:format("Super stream ~s has been deleted", + [SuperStream])}; + Error -> + Error + end. + +banner(_, _) -> + <<"Deleting a super stream (experimental feature)...">>. + +output({error, Msg}, _Opts) -> + {error, 'Elixir.RabbitMQ.CLI.Core.ExitCodes':exit_software(), Msg}; +output({ok, Msg}, _Opts) -> + {ok, Msg}. + +cli_acting_user() -> + 'Elixir.RabbitMQ.CLI.Core.Helpers':cli_acting_user(). diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 67f3be2214..3ff764a808 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -19,6 +19,7 @@ -behaviour(gen_server). -include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit/include/amqqueue.hrl"). %% API -export([init/1, @@ -28,6 +29,8 @@ -export([start_link/1, create/4, delete/3, + create_super_stream/6, + delete_super_stream/3, lookup_leader/2, lookup_local_member/2, topology/2, @@ -56,6 +59,34 @@ create(VirtualHost, Reference, Arguments, Username) -> delete(VirtualHost, Reference, Username) -> gen_server:call(?MODULE, {delete, VirtualHost, Reference, Username}). +-spec create_super_stream(binary(), + binary(), + [binary()], + #{binary() => binary()}, + [binary()], + binary()) -> + ok | {error, term()}. +create_super_stream(VirtualHost, + Name, + Partitions, + Arguments, + RoutingKeys, + Username) -> + gen_server:call(?MODULE, + {create_super_stream, + VirtualHost, + Name, + Partitions, + Arguments, + RoutingKeys, + Username}). + +-spec delete_super_stream(binary(), binary(), binary()) -> + ok | {error, term()}. +delete_super_stream(VirtualHost, Name, Username) -> + gen_server:call(?MODULE, + {delete_super_stream, VirtualHost, Name, Username}). + -spec lookup_leader(binary(), binary()) -> pid() | cluster_not_found. lookup_leader(VirtualHost, Stream) -> gen_server:call(?MODULE, {lookup_leader, VirtualHost, Stream}). @@ -149,100 +180,107 @@ validate_stream_queue_arguments([_ | T]) -> handle_call({create, VirtualHost, Reference, Arguments, Username}, _From, State) -> - Name = - #resource{virtual_host = VirtualHost, - kind = queue, - name = Reference}, - StreamQueueArguments = stream_queue_arguments(Arguments), - case validate_stream_queue_arguments(StreamQueueArguments) of - ok -> - Q0 = amqqueue:new(Name, - none, - true, - false, - none, - StreamQueueArguments, - VirtualHost, - #{user => Username}, - rabbit_stream_queue), - try - QueueLookup = - rabbit_amqqueue:with(Name, - fun(Q) -> - ok = - rabbit_amqqueue:assert_equivalence(Q, - true, - false, - StreamQueueArguments, - none) - end), - - case QueueLookup of - ok -> - {reply, {error, reference_already_exists}, State}; - {error, not_found} -> - try - case rabbit_queue_type:declare(Q0, node()) of - {new, Q} -> - {reply, {ok, amqqueue:get_type_state(Q)}, - State}; - {existing, _} -> - {reply, {error, reference_already_exists}, - State}; - {error, Err} -> - rabbit_log:warning("Error while creating ~p stream, ~p", - [Reference, Err]), - {reply, {error, internal_error}, State} - end - catch - exit:Error -> - rabbit_log:error("Error while creating ~p stream, ~p", - [Reference, Error]), - {reply, {error, internal_error}, State} - end; - {error, {absent, _, Reason}} -> - rabbit_log:error("Error while creating ~p stream, ~p", - [Reference, Reason]), - {reply, {error, internal_error}, State} - end - catch - exit:ExitError -> - % likely to be a problem of inequivalent args on an existing stream - rabbit_log:error("Error while creating ~p stream: ~p", - [Reference, ExitError]), - {reply, {error, validation_failed}, State} - end; - error -> - {reply, {error, validation_failed}, State} - end; + {reply, create_stream(VirtualHost, Reference, Arguments, Username), + State}; handle_call({delete, VirtualHost, Reference, Username}, _From, State) -> - Name = - #resource{virtual_host = VirtualHost, - kind = queue, - name = Reference}, - rabbit_log:debug("Trying to delete stream ~p", [Reference]), - case rabbit_amqqueue:lookup(Name) of - {ok, Q} -> - rabbit_log:debug("Found queue record ~p, checking if it is a stream", - [Reference]), - case is_stream_queue(Q) of - true -> - rabbit_log:debug("Queue record ~p is a stream, trying to delete it", - [Reference]), - {ok, _} = - rabbit_stream_queue:delete(Q, false, false, Username), - rabbit_log:debug("Stream ~p deleted", [Reference]), - {reply, {ok, deleted}, State}; - _ -> - rabbit_log:debug("Queue record ~p is NOT a stream, returning error", - [Reference]), - {reply, {error, reference_not_found}, State} - end; - {error, not_found} -> - rabbit_log:debug("Stream ~p not found, cannot delete it", - [Reference]), - {reply, {error, reference_not_found}, State} + {reply, delete_stream(VirtualHost, Reference, Username), State}; +handle_call({create_super_stream, + VirtualHost, + Name, + Partitions, + Arguments, + RoutingKeys, + Username}, + _From, State) -> + case validate_super_stream_creation(VirtualHost, Name, Partitions) of + {error, Reason} -> + {reply, {error, Reason}, State}; + ok -> + case declare_super_stream_exchange(VirtualHost, Name, Username) of + ok -> + RollbackOperations = + [fun() -> + delete_super_stream_exchange(VirtualHost, Name, + Username) + end], + QueueCreationsResult = + lists:foldl(fun (Partition, {ok, RollbackOps}) -> + case create_stream(VirtualHost, + Partition, + Arguments, + Username) + of + {ok, _} -> + {ok, + [fun() -> + delete_stream(VirtualHost, + Partition, + Username) + end] + ++ RollbackOps}; + {error, Reason} -> + {{error, Reason}, + RollbackOps} + end; + (_, + {{error, _Reason}, _RollbackOps} = + Acc) -> + Acc + end, + {ok, RollbackOperations}, Partitions), + case QueueCreationsResult of + {ok, RollbackOps} -> + BindingsResult = + add_super_stream_bindings(VirtualHost, + Name, + Partitions, + RoutingKeys, + Username), + case BindingsResult of + ok -> + {reply, ok, State}; + Error -> + [Fun() || Fun <- RollbackOps], + {reply, Error, State} + end; + {{error, Reason}, RollbackOps} -> + [Fun() || Fun <- RollbackOps], + {reply, {error, Reason}, State} + end; + {error, Msg} -> + {reply, {error, Msg}, State} + end + end; +handle_call({delete_super_stream, VirtualHost, SuperStream, Username}, + _From, State) -> + case super_stream_partitions(VirtualHost, SuperStream) of + {ok, Partitions} -> + case delete_super_stream_exchange(VirtualHost, SuperStream, + Username) + of + ok -> + ok; + {error, Error} -> + rabbit_log:warning("Error while deleting super stream exchange ~p, ~p", + [SuperStream, Error]), + ok + end, + [begin + case delete_stream(VirtualHost, Stream, Username) of + {ok, deleted} -> + ok; + {error, Err} -> + rabbit_log:warning("Error while delete partition ~p of super stream " + "~p, ~p", + [Stream, SuperStream, Err]), + ok + end + end + || Stream <- Partitions], + {reply, ok, State}; + {error, Error} -> + {reply, {error, Error}, State} end; handle_call({lookup_leader, VirtualHost, Stream}, _From, State) -> Name = @@ -382,31 +420,7 @@ handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From, end, {reply, Res, State}; handle_call({partitions, VirtualHost, SuperStream}, _From, State) -> - ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream), - Res = try - rabbit_exchange:lookup_or_die(ExchangeName), - UnorderedBindings = - [Binding - || Binding = #binding{destination = D} - <- rabbit_binding:list_for_source(ExchangeName), - is_resource_stream_queue(D)], - OrderedBindings = - rabbit_stream_utils:sort_partitions(UnorderedBindings), - {ok, - lists:foldl(fun (#binding{destination = - #resource{kind = queue, name = Q}}, - Acc) -> - Acc ++ [Q]; - (_Binding, Acc) -> - Acc - end, - [], OrderedBindings)} - catch - exit:Error -> - rabbit_log:error("Error while looking up exchange ~p, ~p", - [ExchangeName, Error]), - {error, stream_not_found} - end, + Res = super_stream_partitions(VirtualHost, SuperStream), {reply, Res, State}; handle_call(which_children, _From, State) -> {reply, [], State}. @@ -418,6 +432,339 @@ handle_info(Info, State) -> rabbit_log:info("Received info ~p", [Info]), {noreply, State}. +create_stream(VirtualHost, Reference, Arguments, Username) -> + Name = + #resource{virtual_host = VirtualHost, + kind = queue, + name = Reference}, + StreamQueueArguments = stream_queue_arguments(Arguments), + case validate_stream_queue_arguments(StreamQueueArguments) of + ok -> + Q0 = amqqueue:new(Name, + none, + true, + false, + none, + StreamQueueArguments, + VirtualHost, + #{user => Username}, + rabbit_stream_queue), + try + QueueLookup = + rabbit_amqqueue:with(Name, + fun(Q) -> + ok = + rabbit_amqqueue:assert_equivalence(Q, + true, + false, + StreamQueueArguments, + none) + end), + + case QueueLookup of + ok -> + {error, reference_already_exists}; + {error, not_found} -> + try + case rabbit_queue_type:declare(Q0, node()) of + {new, Q} -> + {ok, amqqueue:get_type_state(Q)}; + {existing, _} -> + {error, reference_already_exists}; + {error, Err} -> + rabbit_log:warning("Error while creating ~p stream, ~p", + [Reference, Err]), + {error, internal_error} + end + catch + exit:Error -> + rabbit_log:error("Error while creating ~p stream, ~p", + [Reference, Error]), + {error, internal_error} + end; + {error, {absent, _, Reason}} -> + rabbit_log:error("Error while creating ~p stream, ~p", + [Reference, Reason]), + {error, internal_error} + end + catch + exit:ExitError -> + % likely to be a problem of inequivalent args on an existing stream + rabbit_log:error("Error while creating ~p stream: ~p", + [Reference, ExitError]), + {error, validation_failed} + end; + error -> + {error, validation_failed} + end. + +delete_stream(VirtualHost, Reference, Username) -> + Name = + #resource{virtual_host = VirtualHost, + kind = queue, + name = Reference}, + rabbit_log:debug("Trying to delete stream ~p", [Reference]), + case rabbit_amqqueue:lookup(Name) of + {ok, Q} -> + rabbit_log:debug("Found queue record ~p, checking if it is a stream", + [Reference]), + case is_stream_queue(Q) of + true -> + rabbit_log:debug("Queue record ~p is a stream, trying to delete it", + [Reference]), + {ok, _} = + rabbit_stream_queue:delete(Q, false, false, Username), + rabbit_log:debug("Stream ~p deleted", [Reference]), + {ok, deleted}; + _ -> + rabbit_log:debug("Queue record ~p is NOT a stream, returning error", + [Reference]), + {error, reference_not_found} + end; + {error, not_found} -> + rabbit_log:debug("Stream ~p not found, cannot delete it", + [Reference]), + {error, reference_not_found} + end. + +super_stream_partitions(VirtualHost, SuperStream) -> + ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream), + try + rabbit_exchange:lookup_or_die(ExchangeName), + UnorderedBindings = + [Binding + || Binding = #binding{destination = D} + <- rabbit_binding:list_for_source(ExchangeName), + is_resource_stream_queue(D)], + OrderedBindings = + rabbit_stream_utils:sort_partitions(UnorderedBindings), + {ok, + lists:foldl(fun (#binding{destination = + #resource{kind = queue, name = Q}}, + Acc) -> + Acc ++ [Q]; + (_Binding, Acc) -> + Acc + end, + [], OrderedBindings)} + catch + exit:Error -> + rabbit_log:error("Error while looking up exchange ~p, ~p", + [ExchangeName, Error]), + {error, stream_not_found} + end. + +validate_super_stream_creation(VirtualHost, Name, Partitions) -> + case exchange_exists(VirtualHost, Name) of + {error, validation_failed} -> + {error, + {validation_failed, + rabbit_misc:format("~s is not a correct name for a super stream", + [Name])}}; + {ok, true} -> + {error, + {reference_already_exists, + rabbit_misc:format("there is already an exchange named ~s", + [Name])}}; + {ok, false} -> + case check_already_existing_queue(VirtualHost, Partitions) of + {error, Reason} -> + {error, Reason}; + ok -> + ok + end + end. + +exchange_exists(VirtualHost, Name) -> + case rabbit_stream_utils:enforce_correct_name(Name) of + {ok, CorrectName} -> + ExchangeName = rabbit_misc:r(VirtualHost, exchange, CorrectName), + case rabbit_exchange:lookup(ExchangeName) of + {ok, _} -> + {ok, true}; + {error, not_found} -> + {ok, false} + end; + error -> + {error, validation_failed} + end. + +queue_exists(VirtualHost, Name) -> + case rabbit_stream_utils:enforce_correct_name(Name) of + {ok, CorrectName} -> + QueueName = rabbit_misc:r(VirtualHost, queue, CorrectName), + case rabbit_amqqueue:lookup(QueueName) of + {ok, _} -> + {ok, true}; + {error, not_found} -> + {ok, false} + end; + error -> + {error, validation_failed} + end. + +check_already_existing_queue(VirtualHost, Queues) -> + check_already_existing_queue0(VirtualHost, Queues, undefined). + +check_already_existing_queue0(_VirtualHost, [], undefined) -> + ok; +check_already_existing_queue0(VirtualHost, [Q | T], _Error) -> + case queue_exists(VirtualHost, Q) of + {ok, false} -> + check_already_existing_queue0(VirtualHost, T, undefined); + {ok, true} -> + {error, + {reference_already_exists, + rabbit_misc:format("there is already a queue named ~s", [Q])}}; + {error, validation_failed} -> + {error, + {validation_failed, + rabbit_misc:format("~s is not a correct name for a queue", [Q])}} + end. + +declare_super_stream_exchange(VirtualHost, Name, Username) -> + case rabbit_stream_utils:enforce_correct_name(Name) of + {ok, CorrectName} -> + Args = + rabbit_misc:set_table_value([], + <<"x-super-stream">>, + bool, + true), + CheckedType = rabbit_exchange:check_type(<<"direct">>), + ExchangeName = rabbit_misc:r(VirtualHost, exchange, CorrectName), + X = case rabbit_exchange:lookup(ExchangeName) of + {ok, FoundX} -> + FoundX; + {error, not_found} -> + rabbit_exchange:declare(ExchangeName, + CheckedType, + true, + false, + false, + Args, + Username) + end, + try + ok = + rabbit_exchange:assert_equivalence(X, + CheckedType, + true, + false, + false, + Args) + catch + exit:ExitError -> + % likely to be a problem of inequivalent args on an existing stream + rabbit_log:error("Error while creating ~p super stream exchange: ~p", + [Name, ExitError]), + {error, validation_failed} + end; + error -> + {error, validation_failed} + end. + +add_super_stream_bindings(VirtualHost, + Name, + Partitions, + RoutingKeys, + Username) -> + PartitionsRoutingKeys = lists:zip(Partitions, RoutingKeys), + BindingsResult = + lists:foldl(fun ({Partition, RoutingKey}, {ok, Order}) -> + case add_super_stream_binding(VirtualHost, + Name, + Partition, + RoutingKey, + Order, + Username) + of + ok -> + {ok, Order + 1}; + {error, Reason} -> + {{error, Reason}, 0} + end; + (_, {{error, _Reason}, _Order} = Acc) -> + Acc + end, + {ok, 0}, PartitionsRoutingKeys), + case BindingsResult of + {ok, _} -> + ok; + {{error, Reason}, _} -> + {error, Reason} + end. + +add_super_stream_binding(VirtualHost, + SuperStream, + Partition, + RoutingKey, + Order, + Username) -> + {ok, ExchangeNameBin} = + rabbit_stream_utils:enforce_correct_name(SuperStream), + {ok, QueueNameBin} = + rabbit_stream_utils:enforce_correct_name(Partition), + ExchangeName = rabbit_misc:r(VirtualHost, exchange, ExchangeNameBin), + QueueName = rabbit_misc:r(VirtualHost, queue, QueueNameBin), + Pid = self(), + Arguments = + rabbit_misc:set_table_value([], + <<"x-stream-partition-order">>, + long, + Order), + case rabbit_binding:add(#binding{source = ExchangeName, + destination = QueueName, + key = RoutingKey, + args = Arguments}, + fun (_X, Q) when ?is_amqqueue(Q) -> + try + rabbit_amqqueue:check_exclusive_access(Q, + Pid) + catch + exit:Reason -> + {error, Reason} + end; + (_X, #exchange{}) -> + ok + end, + Username) + of + {error, {resources_missing, [{not_found, Name} | _]}} -> + {error, + {stream_not_found, + rabbit_misc:format("stream ~s does not exists", [Name])}}; + {error, {resources_missing, [{absent, Q, _Reason} | _]}} -> + {error, + {stream_not_found, + rabbit_misc:format("stream ~s does not exists (absent)", [Q])}}; + {error, binding_not_found} -> + {error, + {not_found, + rabbit_misc:format("no binding ~s between ~s and ~s", + [RoutingKey, rabbit_misc:rs(ExchangeName), + rabbit_misc:rs(QueueName)])}}; + {error, {binding_invalid, Fmt, Args}} -> + {error, {binding_invalid, rabbit_misc:format(Fmt, Args)}}; + {error, #amqp_error{} = Error} -> + {error, {internal_error, rabbit_misc:format("~p", [Error])}}; + ok -> + ok + end. + +delete_super_stream_exchange(VirtualHost, Name, Username) -> + case rabbit_stream_utils:enforce_correct_name(Name) of + {ok, CorrectName} -> + ExchangeName = rabbit_misc:r(VirtualHost, exchange, CorrectName), + case rabbit_exchange:delete(ExchangeName, false, Username) of + {error, not_found} -> + ok; + ok -> + ok + end; + error -> + {error, validation_failed} + end. + leader_from_members(Q) -> QState = amqqueue:get_type_state(Q), #{name := StreamName} = QState, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 91c0df8be0..ede34d95c7 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -2044,7 +2044,7 @@ handle_frame_post_auth(Transport, State, {request, CorrelationId, {create_stream, Stream, Arguments}}) -> - case rabbit_stream_utils:enforce_correct_stream_name(Stream) of + case rabbit_stream_utils:enforce_correct_name(Stream) of {ok, StreamName} -> case rabbit_stream_utils:check_configure_permitted(#resource{name = StreamName, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl index 92d0bff8af..37545424c8 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl @@ -17,7 +17,7 @@ -module(rabbit_stream_utils). %% API --export([enforce_correct_stream_name/1, +-export([enforce_correct_name/1, write_messages/4, parse_map/2, auth_mechanisms/1, @@ -26,13 +26,14 @@ check_write_permitted/3, check_read_permitted/3, extract_stream_list/2, - sort_partitions/1]). + sort_partitions/1, + strip_cr_lf/1]). -define(MAX_PERMISSION_CACHE_SIZE, 12). -include_lib("rabbit_common/include/rabbit.hrl"). -enforce_correct_stream_name(Name) -> +enforce_correct_name(Name) -> % from rabbit_channel StrippedName = binary:replace(Name, [<<"\n">>, <<"\r">>], <<"">>, [global]), @@ -236,3 +237,6 @@ sort_partitions(Partitions) -> end end, Partitions). + +strip_cr_lf(NameBin) -> + binary:replace(NameBin, [<<"\n">>, <<"\r">>], <<"">>, [global]). diff --git a/deps/rabbitmq_stream/test/commands_SUITE.erl b/deps/rabbitmq_stream/test/commands_SUITE.erl index c2652b4ad0..902bfee3c8 100644 --- a/deps/rabbitmq_stream/test/commands_SUITE.erl +++ b/deps/rabbitmq_stream/test/commands_SUITE.erl @@ -23,10 +23,16 @@ 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumersCommand'). -define(COMMAND_LIST_PUBLISHERS, 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand'). +-define(COMMAND_ADD_SUPER_STREAM, + 'Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand'). +-define(COMMAND_DELETE_SUPER_STREAM, + 'Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand'). all() -> - [{group, list_connections}, {group, list_consumers}, - {group, list_publishers}]. + [{group, list_connections}, + {group, list_consumers}, + {group, list_publishers}, + {group, super_streams}]. groups() -> [{list_connections, [], @@ -35,7 +41,13 @@ groups() -> {list_consumers, [], [list_consumers_merge_defaults, list_consumers_run]}, {list_publishers, [], - [list_publishers_merge_defaults, list_publishers_run]}]. + [list_publishers_merge_defaults, list_publishers_run]}, + {super_streams, [], + [add_super_stream_merge_defaults, + add_super_stream_validate, + delete_super_stream_merge_defaults, + delete_super_stream_validate, + add_delete_super_stream_run]}]. init_per_suite(Config) -> case rabbit_ct_helpers:is_mixed_versions() of @@ -309,6 +321,174 @@ list_publishers_run(Config) -> ?awaitMatch(0, publisher_count(Config), ?WAIT), ok. +add_super_stream_merge_defaults(_Config) -> + ?assertMatch({[<<"super-stream">>], + #{partitions := 3, vhost := <<"/">>}}, + ?COMMAND_ADD_SUPER_STREAM:merge_defaults([<<"super-stream">>], + #{})), + + ?assertMatch({[<<"super-stream">>], + #{partitions := 5, vhost := <<"/">>}}, + ?COMMAND_ADD_SUPER_STREAM:merge_defaults([<<"super-stream">>], + #{partitions => 5})), + + DefaultWithRoutingKeys = + ?COMMAND_ADD_SUPER_STREAM:merge_defaults([<<"super-stream">>], + #{routing_keys => + <<"amer,emea,apac">>}), + ?assertMatch({[<<"super-stream">>], + #{routing_keys := <<"amer,emea,apac">>, vhost := <<"/">>}}, + DefaultWithRoutingKeys), + + {_, Opts} = DefaultWithRoutingKeys, + ?assertEqual(false, maps:is_key(partitions, Opts)). + +add_super_stream_validate(_Config) -> + ?assertMatch({validation_failure, not_enough_args}, + ?COMMAND_ADD_SUPER_STREAM:validate([], #{})), + ?assertMatch({validation_failure, too_many_args}, + ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>, <<"b">>], #{})), + ?assertMatch({validation_failure, _}, + ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>], + #{partitions => 1, + routing_keys => + <<"a,b,c">>})), + ?assertMatch({validation_failure, _}, + ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>], + #{partitions => 0})), + ?assertEqual(ok, + ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>], + #{partitions => 5})), + ?assertEqual(ok, + ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>], + #{routing_keys => + <<"a,b,c">>})), + + [case Expected of + ok -> + ?assertEqual(ok, + ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>], Opts)); + error -> + ?assertMatch({validation_failure, _}, + ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>], Opts)) + end + || {Opts, Expected} + <- [{#{max_length_bytes => 1000}, ok}, + {#{max_length_bytes => <<"1000">>}, ok}, + {#{max_length_bytes => <<"100gb">>}, ok}, + {#{max_length_bytes => <<"50mb">>}, ok}, + {#{max_length_bytes => <<"50bm">>}, error}, + {#{max_age => <<"PT10M">>}, ok}, + {#{max_age => <<"P5DT8H">>}, ok}, + {#{max_age => <<"foo">>}, error}, + {#{stream_max_segment_size_bytes => 1000}, ok}, + {#{stream_max_segment_size_bytes => <<"1000">>}, ok}, + {#{stream_max_segment_size_bytes => <<"100gb">>}, ok}, + {#{stream_max_segment_size_bytes => <<"50mb">>}, ok}, + {#{stream_max_segment_size_bytes => <<"50bm">>}, error}, + {#{leader_locator => <<"client-local">>}, ok}, + {#{leader_locator => <<"least-leaders">>}, ok}, + {#{leader_locator => <<"random">>}, ok}, + {#{leader_locator => <<"foo">>}, error}, + {#{initial_cluster_size => <<"1">>}, ok}, + {#{initial_cluster_size => <<"2">>}, ok}, + {#{initial_cluster_size => <<"3">>}, ok}, + {#{initial_cluster_size => <<"0">>}, error}, + {#{initial_cluster_size => <<"-1">>}, error}, + {#{initial_cluster_size => <<"foo">>}, error}]], + ok. + +delete_super_stream_merge_defaults(_Config) -> + ?assertMatch({[<<"super-stream">>], #{vhost := <<"/">>}}, + ?COMMAND_DELETE_SUPER_STREAM:merge_defaults([<<"super-stream">>], + #{})), + ok. + +delete_super_stream_validate(_Config) -> + ?assertMatch({validation_failure, not_enough_args}, + ?COMMAND_DELETE_SUPER_STREAM:validate([], #{})), + ?assertMatch({validation_failure, too_many_args}, + ?COMMAND_DELETE_SUPER_STREAM:validate([<<"a">>, <<"b">>], + #{})), + ?assertEqual(ok, ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>], #{})), + ok. + +add_delete_super_stream_run(Config) -> + Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Opts = + #{node => Node, + timeout => 10000, + vhost => <<"/">>}, + + % with number of partitions + ?assertMatch({ok, _}, + ?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>], + maps:merge(#{partitions => 3}, + Opts))), + ?assertEqual({ok, + [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]}, + partitions(Config, <<"invoices">>)), + ?assertMatch({ok, _}, + ?COMMAND_DELETE_SUPER_STREAM:run([<<"invoices">>], Opts)), + ?assertEqual({error, stream_not_found}, + partitions(Config, <<"invoices">>)), + + % with routing keys + ?assertMatch({ok, _}, + ?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>], + maps:merge(#{routing_keys => + <<" amer,emea , apac">>}, + Opts))), + ?assertEqual({ok, + [<<"invoices-amer">>, <<"invoices-emea">>, + <<"invoices-apac">>]}, + partitions(Config, <<"invoices">>)), + ?assertMatch({ok, _}, + ?COMMAND_DELETE_SUPER_STREAM:run([<<"invoices">>], Opts)), + ?assertEqual({error, stream_not_found}, + partitions(Config, <<"invoices">>)), + + % with arguments + ExtraOptions = + #{partitions => 3, + max_length_bytes => <<"50mb">>, + max_age => <<"PT10M">>, + stream_max_segment_size_bytes => <<"1mb">>, + leader_locator => <<"random">>, + initial_cluster_size => <<"1">>}, + + ?assertMatch({ok, _}, + ?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>], + maps:merge(ExtraOptions, Opts))), + + {ok, Q} = queue_lookup(Config, <<"invoices-0">>), + Args = amqqueue:get_arguments(Q), + ?assertMatch({_, <<"random">>}, + rabbit_misc:table_lookup(Args, <<"x-queue-leader-locator">>)), + ?assertMatch({_, 1}, + rabbit_misc:table_lookup(Args, <<"x-initial-cluster-size">>)), + ?assertMatch({_, 1000000}, + rabbit_misc:table_lookup(Args, + <<"x-stream-max-segment-size-bytes">>)), + ?assertMatch({_, <<"600s">>}, + rabbit_misc:table_lookup(Args, <<"x-max-age">>)), + ?assertMatch({_, 50000000}, + rabbit_misc:table_lookup(Args, <<"x-max-length-bytes">>)), + ?assertMatch({_, <<"stream">>}, + rabbit_misc:table_lookup(Args, <<"x-queue-type">>)), + + ?assertMatch({ok, _}, + ?COMMAND_DELETE_SUPER_STREAM:run([<<"invoices">>], Opts)), + + ok. + +partitions(Config, Name) -> + rabbit_ct_broker_helpers:rpc(Config, + 0, + rabbit_stream_manager, + partitions, + [<<"/">>, Name]). + create_stream(S, Stream, C0) -> rabbit_stream_SUITE:test_create_stream(gen_tcp, S, Stream, C0). @@ -384,3 +564,11 @@ amqp_params(network, _, Port) -> #amqp_params_network{port = Port}; amqp_params(direct, Node, _) -> #amqp_params_direct{node = Node}. + +queue_lookup(Config, Q) -> + QueueName = rabbit_misc:r(<<"/">>, queue, Q), + rabbit_ct_broker_helpers:rpc(Config, + 0, + rabbit_amqqueue, + lookup, + [QueueName]). diff --git a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl new file mode 100644 index 0000000000..b47a954a95 --- /dev/null +++ b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl @@ -0,0 +1,149 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_stream_manager_SUITE). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-compile(export_all). + +all() -> + [{group, non_parallel_tests}]. + +groups() -> + [{non_parallel_tests, [], [manage_super_stream]}]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + Config. + +end_per_suite(Config) -> + Config. + +init_per_group(_, Config) -> + Config1 = + rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]), + Config2 = + rabbit_ct_helpers:set_config(Config1, + {rabbitmq_ct_tls_verify, verify_none}), + Config3 = + rabbit_ct_helpers:set_config(Config2, {rabbitmq_stream, verify_none}), + rabbit_ct_helpers:run_setup_steps(Config3, + [fun(StepConfig) -> + rabbit_ct_helpers:merge_app_env(StepConfig, + {rabbit, + [{core_metrics_gc_interval, + 1000}]}) + end, + fun(StepConfig) -> + rabbit_ct_helpers:merge_app_env(StepConfig, + {rabbitmq_stream, + [{connection_negotiation_step_timeout, + 500}]}) + end] + ++ rabbit_ct_broker_helpers:setup_steps()). + +end_per_group(_, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +manage_super_stream(Config) -> + % create super stream + ?assertEqual(ok, + create_super_stream(Config, + <<"invoices">>, + [<<"invoices-0">>, <<"invoices-1">>, + <<"invoices-2">>], + [<<"0">>, <<"1">>, <<"2">>])), + % get the correct partitions + ?assertEqual({ok, + [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]}, + partitions(Config, <<"invoices">>)), + + [?assertEqual({ok, [Partition]}, + route(Config, RoutingKey, <<"invoices">>)) + || {Partition, RoutingKey} + <- [{<<"invoices-0">>, <<"0">>}, {<<"invoices-1">>, <<"1">>}, + {<<"invoices-2">>, <<"2">>}]], + + % get an error if trying to re-create it + ?assertMatch({error, _}, + create_super_stream(Config, + <<"invoices">>, + [<<"invoices-0">>, <<"invoices-1">>, + <<"invoices-2">>], + [<<"0">>, <<"1">>, <<"2">>])), + + % can delete it + ?assertEqual(ok, delete_super_stream(Config, <<"invoices">>)), + + % create a stream with the same name as a potential partition + ?assertMatch({ok, _}, create_stream(Config, <<"invoices-1">>)), + + % cannot create the super stream because a partition already exists + ?assertMatch({error, _}, + create_super_stream(Config, + <<"invoices">>, + [<<"invoices-0">>, <<"invoices-1">>, + <<"invoices-2">>], + [<<"0">>, <<"1">>, <<"2">>])), + + ok. + +create_super_stream(Config, Name, Partitions, RKs) -> + rabbit_ct_broker_helpers:rpc(Config, + 0, + rabbit_stream_manager, + create_super_stream, + [<<"/">>, + Name, + Partitions, + #{}, + RKs, + <<"guest">>]). + +delete_super_stream(Config, Name) -> + rabbit_ct_broker_helpers:rpc(Config, + 0, + rabbit_stream_manager, + delete_super_stream, + [<<"/">>, Name, <<"guest">>]). + +create_stream(Config, Name) -> + rabbit_ct_broker_helpers:rpc(Config, + 0, + rabbit_stream_manager, + create, + [<<"/">>, Name, [], <<"guest">>]). + +partitions(Config, Name) -> + rabbit_ct_broker_helpers:rpc(Config, + 0, + rabbit_stream_manager, + partitions, + [<<"/">>, Name]). + +route(Config, RoutingKey, SuperStream) -> + rabbit_ct_broker_helpers:rpc(Config, + 0, + rabbit_stream_manager, + route, + [RoutingKey, <<"/">>, SuperStream]). |