summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-09-23 17:00:18 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-10-11 16:50:01 +0200
commit147659093f53b97f146c872d9f0774801a2915bb (patch)
tree01710e1fdfb5bfb6313b0bf662de92abbf43bd50
parent49a47586b075c19e5af521a3f1d333ed8fab4654 (diff)
downloadrabbitmq-server-git-147659093f53b97f146c872d9f0774801a2915bb.tar.gz
Add functions to create/delete super stream in manager
-rw-r--r--deps/rabbitmq_stream/BUILD.bazel4
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_manager.erl581
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl2
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_utils.erl10
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl149
5 files changed, 625 insertions, 121 deletions
diff --git a/deps/rabbitmq_stream/BUILD.bazel b/deps/rabbitmq_stream/BUILD.bazel
index 91cf7a1e40..6f7cb0b034 100644
--- a/deps/rabbitmq_stream/BUILD.bazel
+++ b/deps/rabbitmq_stream/BUILD.bazel
@@ -86,6 +86,10 @@ suites = [
),
rabbitmq_integration_suite(
PACKAGE,
+ name = "rabbit_stream_manager_SUITE",
+ ),
+ rabbitmq_integration_suite(
+ PACKAGE,
name = "rabbit_stream_SUITE",
deps = [
"//deps/rabbit:bazel_erlang_lib",
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
index 67f3be2214..a2b26848e8 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -19,6 +19,7 @@
-behaviour(gen_server).
-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbit/include/amqqueue.hrl").
%% API
-export([init/1,
@@ -28,6 +29,8 @@
-export([start_link/1,
create/4,
delete/3,
+ create_super_stream/6,
+ delete_super_stream/3,
lookup_leader/2,
lookup_local_member/2,
topology/2,
@@ -56,6 +59,34 @@ create(VirtualHost, Reference, Arguments, Username) ->
delete(VirtualHost, Reference, Username) ->
gen_server:call(?MODULE, {delete, VirtualHost, Reference, Username}).
+-spec create_super_stream(binary(),
+ binary(),
+ [binary()],
+ #{binary() => binary()},
+ [binary()],
+ binary()) ->
+ ok | {error, term()}.
+create_super_stream(VirtualHost,
+ Name,
+ Partitions,
+ Arguments,
+ RoutingKeys,
+ Username) ->
+ gen_server:call(?MODULE,
+ {create_super_stream,
+ VirtualHost,
+ Name,
+ Partitions,
+ Arguments,
+ RoutingKeys,
+ Username}).
+
+-spec delete_super_stream(binary(), binary(), binary()) ->
+ ok | {error, term()}.
+delete_super_stream(VirtualHost, Name, Username) ->
+ gen_server:call(?MODULE,
+ {delete_super_stream, VirtualHost, Name, Username}).
+
-spec lookup_leader(binary(), binary()) -> pid() | cluster_not_found.
lookup_leader(VirtualHost, Stream) ->
gen_server:call(?MODULE, {lookup_leader, VirtualHost, Stream}).
@@ -149,100 +180,107 @@ validate_stream_queue_arguments([_ | T]) ->
handle_call({create, VirtualHost, Reference, Arguments, Username},
_From, State) ->
- Name =
- #resource{virtual_host = VirtualHost,
- kind = queue,
- name = Reference},
- StreamQueueArguments = stream_queue_arguments(Arguments),
- case validate_stream_queue_arguments(StreamQueueArguments) of
- ok ->
- Q0 = amqqueue:new(Name,
- none,
- true,
- false,
- none,
- StreamQueueArguments,
- VirtualHost,
- #{user => Username},
- rabbit_stream_queue),
- try
- QueueLookup =
- rabbit_amqqueue:with(Name,
- fun(Q) ->
- ok =
- rabbit_amqqueue:assert_equivalence(Q,
- true,
- false,
- StreamQueueArguments,
- none)
- end),
-
- case QueueLookup of
- ok ->
- {reply, {error, reference_already_exists}, State};
- {error, not_found} ->
- try
- case rabbit_queue_type:declare(Q0, node()) of
- {new, Q} ->
- {reply, {ok, amqqueue:get_type_state(Q)},
- State};
- {existing, _} ->
- {reply, {error, reference_already_exists},
- State};
- {error, Err} ->
- rabbit_log:warning("Error while creating ~p stream, ~p",
- [Reference, Err]),
- {reply, {error, internal_error}, State}
- end
- catch
- exit:Error ->
- rabbit_log:error("Error while creating ~p stream, ~p",
- [Reference, Error]),
- {reply, {error, internal_error}, State}
- end;
- {error, {absent, _, Reason}} ->
- rabbit_log:error("Error while creating ~p stream, ~p",
- [Reference, Reason]),
- {reply, {error, internal_error}, State}
- end
- catch
- exit:ExitError ->
- % likely to be a problem of inequivalent args on an existing stream
- rabbit_log:error("Error while creating ~p stream: ~p",
- [Reference, ExitError]),
- {reply, {error, validation_failed}, State}
- end;
- error ->
- {reply, {error, validation_failed}, State}
- end;
+ {reply, create_stream(VirtualHost, Reference, Arguments, Username),
+ State};
handle_call({delete, VirtualHost, Reference, Username}, _From,
State) ->
- Name =
- #resource{virtual_host = VirtualHost,
- kind = queue,
- name = Reference},
- rabbit_log:debug("Trying to delete stream ~p", [Reference]),
- case rabbit_amqqueue:lookup(Name) of
- {ok, Q} ->
- rabbit_log:debug("Found queue record ~p, checking if it is a stream",
- [Reference]),
- case is_stream_queue(Q) of
- true ->
- rabbit_log:debug("Queue record ~p is a stream, trying to delete it",
- [Reference]),
- {ok, _} =
- rabbit_stream_queue:delete(Q, false, false, Username),
- rabbit_log:debug("Stream ~p deleted", [Reference]),
- {reply, {ok, deleted}, State};
- _ ->
- rabbit_log:debug("Queue record ~p is NOT a stream, returning error",
- [Reference]),
- {reply, {error, reference_not_found}, State}
- end;
- {error, not_found} ->
- rabbit_log:debug("Stream ~p not found, cannot delete it",
- [Reference]),
- {reply, {error, reference_not_found}, State}
+ {reply, delete_stream(VirtualHost, Reference, Username), State};
+handle_call({create_super_stream,
+ VirtualHost,
+ Name,
+ Partitions,
+ Arguments,
+ RoutingKeys,
+ Username},
+ _From, State) ->
+ case validate_super_stream_creation(VirtualHost, Name, Partitions) of
+ {error, Reason} ->
+ {reply, {error, Reason}, State};
+ ok ->
+ case declare_super_stream_exchange(VirtualHost, Name, Username) of
+ ok ->
+ RollbackOperations =
+ [fun() ->
+ delete_super_stream_exchange(VirtualHost, Name,
+ Username)
+ end],
+ QueueCreationsResult =
+ lists:foldl(fun (Partition, {ok, RollbackOps}) ->
+ case create_stream(VirtualHost,
+ Partition,
+ Arguments,
+ Username)
+ of
+ {ok, _} ->
+ {ok,
+ [fun() ->
+ delete_stream(VirtualHost,
+ Partition,
+ Username)
+ end]
+ ++ RollbackOps};
+ {error, Reason} ->
+ {{error, Reason},
+ RollbackOps}
+ end;
+ (_,
+ {{error, _Reason}, _RollbackOps} =
+ Acc) ->
+ Acc
+ end,
+ {ok, RollbackOperations}, Partitions),
+ case QueueCreationsResult of
+ {ok, RollbackOps} ->
+ BindingsResult =
+ add_super_stream_bindings(VirtualHost,
+ Name,
+ Partitions,
+ RoutingKeys,
+ Username),
+ case BindingsResult of
+ ok ->
+ {reply, ok, State};
+ Error ->
+ [Fun() || Fun <- RollbackOps],
+ {reply, Error, State}
+ end;
+ {{error, Reason}, RollbackOps} ->
+ [Fun() || Fun <- RollbackOps],
+ {reply, {error, Reason}, State}
+ end;
+ {error, Msg} ->
+ {reply, {error, Msg}, State}
+ end
+ end;
+handle_call({delete_super_stream, VirtualHost, SuperStream, Username},
+ _From, State) ->
+ case super_stream_partitions(VirtualHost, SuperStream) of
+ {ok, Partitions} ->
+ case delete_super_stream_exchange(VirtualHost, SuperStream,
+ Username)
+ of
+ ok ->
+ ok;
+ {error, Error} ->
+ rabbit_log:warning("Error while deleting super stream exchange ~p, ~p",
+ [SuperStream, Error]),
+ ok
+ end,
+ [begin
+ case delete_stream(VirtualHost, Stream, Username) of
+ {ok, deleted} ->
+ ok;
+ {error, Err} ->
+ rabbit_log:warning("Error while delete partition ~p of super stream "
+ "~p, ~p",
+ [Stream, SuperStream, Err]),
+ ok
+ end
+ end
+ || Stream <- Partitions],
+ {reply, ok, State};
+ {error, Error} ->
+ {reply, {error, Error}, State}
end;
handle_call({lookup_leader, VirtualHost, Stream}, _From, State) ->
Name =
@@ -382,31 +420,7 @@ handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From,
end,
{reply, Res, State};
handle_call({partitions, VirtualHost, SuperStream}, _From, State) ->
- ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream),
- Res = try
- rabbit_exchange:lookup_or_die(ExchangeName),
- UnorderedBindings =
- [Binding
- || Binding = #binding{destination = D}
- <- rabbit_binding:list_for_source(ExchangeName),
- is_resource_stream_queue(D)],
- OrderedBindings =
- rabbit_stream_utils:sort_partitions(UnorderedBindings),
- {ok,
- lists:foldl(fun (#binding{destination =
- #resource{kind = queue, name = Q}},
- Acc) ->
- Acc ++ [Q];
- (_Binding, Acc) ->
- Acc
- end,
- [], OrderedBindings)}
- catch
- exit:Error ->
- rabbit_log:error("Error while looking up exchange ~p, ~p",
- [ExchangeName, Error]),
- {error, stream_not_found}
- end,
+ Res = super_stream_partitions(VirtualHost, SuperStream),
{reply, Res, State};
handle_call(which_children, _From, State) ->
{reply, [], State}.
@@ -418,6 +432,339 @@ handle_info(Info, State) ->
rabbit_log:info("Received info ~p", [Info]),
{noreply, State}.
+create_stream(VirtualHost, Reference, Arguments, Username) ->
+ Name =
+ #resource{virtual_host = VirtualHost,
+ kind = queue,
+ name = Reference},
+ StreamQueueArguments = stream_queue_arguments(Arguments),
+ case validate_stream_queue_arguments(StreamQueueArguments) of
+ ok ->
+ Q0 = amqqueue:new(Name,
+ none,
+ true,
+ false,
+ none,
+ StreamQueueArguments,
+ VirtualHost,
+ #{user => Username},
+ rabbit_stream_queue),
+ try
+ QueueLookup =
+ rabbit_amqqueue:with(Name,
+ fun(Q) ->
+ ok =
+ rabbit_amqqueue:assert_equivalence(Q,
+ true,
+ false,
+ StreamQueueArguments,
+ none)
+ end),
+
+ case QueueLookup of
+ ok ->
+ {error, reference_already_exists};
+ {error, not_found} ->
+ try
+ case rabbit_queue_type:declare(Q0, node()) of
+ {new, Q} ->
+ {ok, amqqueue:get_type_state(Q)};
+ {existing, _} ->
+ {error, reference_already_exists};
+ {error, Err} ->
+ rabbit_log:warning("Error while creating ~p stream, ~p",
+ [Reference, Err]),
+ {error, internal_error}
+ end
+ catch
+ exit:Error ->
+ rabbit_log:error("Error while creating ~p stream, ~p",
+ [Reference, Error]),
+ {error, internal_error}
+ end;
+ {error, {absent, _, Reason}} ->
+ rabbit_log:error("Error while creating ~p stream, ~p",
+ [Reference, Reason]),
+ {error, internal_error}
+ end
+ catch
+ exit:ExitError ->
+ % likely to be a problem of inequivalent args on an existing stream
+ rabbit_log:error("Error while creating ~p stream: ~p",
+ [Reference, ExitError]),
+ {error, validation_failed}
+ end;
+ error ->
+ {error, validation_failed}
+ end.
+
+delete_stream(VirtualHost, Reference, Username) ->
+ Name =
+ #resource{virtual_host = VirtualHost,
+ kind = queue,
+ name = Reference},
+ rabbit_log:debug("Trying to delete stream ~p", [Reference]),
+ case rabbit_amqqueue:lookup(Name) of
+ {ok, Q} ->
+ rabbit_log:debug("Found queue record ~p, checking if it is a stream",
+ [Reference]),
+ case is_stream_queue(Q) of
+ true ->
+ rabbit_log:debug("Queue record ~p is a stream, trying to delete it",
+ [Reference]),
+ {ok, _} =
+ rabbit_stream_queue:delete(Q, false, false, Username),
+ rabbit_log:debug("Stream ~p deleted", [Reference]),
+ {ok, deleted};
+ _ ->
+ rabbit_log:debug("Queue record ~p is NOT a stream, returning error",
+ [Reference]),
+ {error, reference_not_found}
+ end;
+ {error, not_found} ->
+ rabbit_log:debug("Stream ~p not found, cannot delete it",
+ [Reference]),
+ {error, reference_not_found}
+ end.
+
+super_stream_partitions(VirtualHost, SuperStream) ->
+ ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream),
+ try
+ rabbit_exchange:lookup_or_die(ExchangeName),
+ UnorderedBindings =
+ [Binding
+ || Binding = #binding{destination = D}
+ <- rabbit_binding:list_for_source(ExchangeName),
+ is_resource_stream_queue(D)],
+ OrderedBindings =
+ rabbit_stream_utils:sort_partitions(UnorderedBindings),
+ {ok,
+ lists:foldl(fun (#binding{destination =
+ #resource{kind = queue, name = Q}},
+ Acc) ->
+ Acc ++ [Q];
+ (_Binding, Acc) ->
+ Acc
+ end,
+ [], OrderedBindings)}
+ catch
+ exit:Error ->
+ rabbit_log:error("Error while looking up exchange ~p, ~p",
+ [ExchangeName, Error]),
+ {error, stream_not_found}
+ end.
+
+validate_super_stream_creation(VirtualHost, Name, Partitions) ->
+ case exchange_exists(VirtualHost, Name) of
+ {error, validation_failed} ->
+ {error,
+ {validation_failed,
+ rabbit_misc:format("~p is not a correct name for a super stream",
+ [Name])}};
+ {ok, true} ->
+ {error,
+ {reference_already_exists,
+ rabbit_misc:format("there is already an exchange named ~p",
+ [Name])}};
+ {ok, false} ->
+ case check_already_existing_queue(VirtualHost, Partitions) of
+ {error, Reason} ->
+ {error, Reason};
+ ok ->
+ ok
+ end
+ end.
+
+exchange_exists(VirtualHost, Name) ->
+ case rabbit_stream_utils:enforce_correct_name(Name) of
+ {ok, CorrectName} ->
+ ExchangeName = rabbit_misc:r(VirtualHost, exchange, CorrectName),
+ case rabbit_exchange:lookup(ExchangeName) of
+ {ok, _} ->
+ {ok, true};
+ {error, not_found} ->
+ {ok, false}
+ end;
+ error ->
+ {error, validation_failed}
+ end.
+
+queue_exists(VirtualHost, Name) ->
+ case rabbit_stream_utils:enforce_correct_name(Name) of
+ {ok, CorrectName} ->
+ QueueName = rabbit_misc:r(VirtualHost, queue, CorrectName),
+ case rabbit_amqqueue:lookup(QueueName) of
+ {ok, _} ->
+ {ok, true};
+ {error, not_found} ->
+ {ok, false}
+ end;
+ error ->
+ {error, validation_failed}
+ end.
+
+check_already_existing_queue(VirtualHost, Queues) ->
+ check_already_existing_queue0(VirtualHost, Queues, undefined).
+
+check_already_existing_queue0(_VirtualHost, [], undefined) ->
+ ok;
+check_already_existing_queue0(VirtualHost, [Q | T], _Error) ->
+ case queue_exists(VirtualHost, Q) of
+ {ok, false} ->
+ check_already_existing_queue0(VirtualHost, T, undefined);
+ {ok, true} ->
+ {error,
+ {reference_already_exists,
+ rabbit_misc:format("there is already a queue named ~p", [Q])}};
+ {error, validation_failed} ->
+ {error,
+ {validation_failed,
+ rabbit_misc:format("~p is not a correct name for a queue", [Q])}}
+ end.
+
+declare_super_stream_exchange(VirtualHost, Name, Username) ->
+ case rabbit_stream_utils:enforce_correct_name(Name) of
+ {ok, CorrectName} ->
+ Args =
+ rabbit_misc:set_table_value([],
+ <<"x-super-stream">>,
+ bool,
+ true),
+ CheckedType = rabbit_exchange:check_type(<<"direct">>),
+ ExchangeName = rabbit_misc:r(VirtualHost, exchange, CorrectName),
+ X = case rabbit_exchange:lookup(ExchangeName) of
+ {ok, FoundX} ->
+ FoundX;
+ {error, not_found} ->
+ rabbit_exchange:declare(ExchangeName,
+ CheckedType,
+ true,
+ false,
+ false,
+ Args,
+ Username)
+ end,
+ try
+ ok =
+ rabbit_exchange:assert_equivalence(X,
+ CheckedType,
+ true,
+ false,
+ false,
+ Args)
+ catch
+ exit:ExitError ->
+ % likely to be a problem of inequivalent args on an existing stream
+ rabbit_log:error("Error while creating ~p super stream exchange: ~p",
+ [Name, ExitError]),
+ {error, validation_failed}
+ end;
+ error ->
+ {error, validation_failed}
+ end.
+
+add_super_stream_bindings(VirtualHost,
+ Name,
+ Partitions,
+ RoutingKeys,
+ Username) ->
+ PartitionsRoutingKeys = lists:zip(Partitions, RoutingKeys),
+ BindingsResult =
+ lists:foldl(fun ({Partition, RoutingKey}, {ok, Order}) ->
+ case add_super_stream_binding(VirtualHost,
+ Name,
+ Partition,
+ RoutingKey,
+ Order,
+ Username)
+ of
+ ok ->
+ {ok, Order + 1};
+ {error, Reason} ->
+ {{error, Reason}, 0}
+ end;
+ (_, {{error, _Reason}, _Order} = Acc) ->
+ Acc
+ end,
+ {ok, 0}, PartitionsRoutingKeys),
+ case BindingsResult of
+ {ok, _} ->
+ ok;
+ {{error, Reason}, _} ->
+ {error, Reason}
+ end.
+
+add_super_stream_binding(VirtualHost,
+ SuperStream,
+ Partition,
+ RoutingKey,
+ Order,
+ Username) ->
+ {ok, ExchangeNameBin} =
+ rabbit_stream_utils:enforce_correct_name(SuperStream),
+ {ok, QueueNameBin} =
+ rabbit_stream_utils:enforce_correct_name(Partition),
+ ExchangeName = rabbit_misc:r(VirtualHost, exchange, ExchangeNameBin),
+ QueueName = rabbit_misc:r(VirtualHost, queue, QueueNameBin),
+ Pid = self(),
+ Arguments =
+ rabbit_misc:set_table_value([],
+ <<"x-stream-partition-order">>,
+ long,
+ Order),
+ case rabbit_binding:add(#binding{source = ExchangeName,
+ destination = QueueName,
+ key = RoutingKey,
+ args = Arguments},
+ fun (_X, Q) when ?is_amqqueue(Q) ->
+ try
+ rabbit_amqqueue:check_exclusive_access(Q,
+ Pid)
+ catch
+ exit:Reason ->
+ {error, Reason}
+ end;
+ (_X, #exchange{}) ->
+ ok
+ end,
+ Username)
+ of
+ {error, {resources_missing, [{not_found, Name} | _]}} ->
+ {error,
+ {stream_not_found,
+ rabbit_misc:format("stream ~s does not exists", [Name])}};
+ {error, {resources_missing, [{absent, Q, _Reason} | _]}} ->
+ {error,
+ {stream_not_found,
+ rabbit_misc:format("stream ~p does not exists (absent", [Q])}};
+ {error, binding_not_found} ->
+ {error,
+ {not_found,
+ rabbit_misc:format("no binding ~s between ~s and ~s",
+ [RoutingKey, rabbit_misc:rs(ExchangeName),
+ rabbit_misc:rs(QueueName)])}};
+ {error, {binding_invalid, Fmt, Args}} ->
+ {error, {binding_invalid, rabbit_misc:format(Fmt, Args)}};
+ {error, #amqp_error{} = Error} ->
+ {error, {internal_error, rabbit_misc:format("~p", [Error])}};
+ ok ->
+ ok
+ end.
+
+delete_super_stream_exchange(VirtualHost, Name, Username) ->
+ case rabbit_stream_utils:enforce_correct_name(Name) of
+ {ok, CorrectName} ->
+ ExchangeName = rabbit_misc:r(VirtualHost, exchange, CorrectName),
+ case rabbit_exchange:delete(ExchangeName, false, Username) of
+ {error, not_found} ->
+ ok;
+ ok ->
+ ok
+ end;
+ error ->
+ {error, validation_failed}
+ end.
+
leader_from_members(Q) ->
QState = amqqueue:get_type_state(Q),
#{name := StreamName} = QState,
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 91c0df8be0..ede34d95c7 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -2044,7 +2044,7 @@ handle_frame_post_auth(Transport,
State,
{request, CorrelationId,
{create_stream, Stream, Arguments}}) ->
- case rabbit_stream_utils:enforce_correct_stream_name(Stream) of
+ case rabbit_stream_utils:enforce_correct_name(Stream) of
{ok, StreamName} ->
case rabbit_stream_utils:check_configure_permitted(#resource{name =
StreamName,
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
index 92d0bff8af..37545424c8 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
@@ -17,7 +17,7 @@
-module(rabbit_stream_utils).
%% API
--export([enforce_correct_stream_name/1,
+-export([enforce_correct_name/1,
write_messages/4,
parse_map/2,
auth_mechanisms/1,
@@ -26,13 +26,14 @@
check_write_permitted/3,
check_read_permitted/3,
extract_stream_list/2,
- sort_partitions/1]).
+ sort_partitions/1,
+ strip_cr_lf/1]).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
-include_lib("rabbit_common/include/rabbit.hrl").
-enforce_correct_stream_name(Name) ->
+enforce_correct_name(Name) ->
% from rabbit_channel
StrippedName =
binary:replace(Name, [<<"\n">>, <<"\r">>], <<"">>, [global]),
@@ -236,3 +237,6 @@ sort_partitions(Partitions) ->
end
end,
Partitions).
+
+strip_cr_lf(NameBin) ->
+ binary:replace(NameBin, [<<"\n">>, <<"\r">>], <<"">>, [global]).
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl
new file mode 100644
index 0000000000..3c036fc1e2
--- /dev/null
+++ b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl
@@ -0,0 +1,149 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_stream_manager_SUITE).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-compile(export_all).
+
+all() ->
+ [{group, non_parallel_tests}].
+
+groups() ->
+ [{non_parallel_tests, [], [manage_super_stream]}].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ Config.
+
+end_per_suite(Config) ->
+ Config.
+
+init_per_group(_, Config) ->
+ Config1 =
+ rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]),
+ Config2 =
+ rabbit_ct_helpers:set_config(Config1,
+ {rabbitmq_ct_tls_verify, verify_none}),
+ Config3 =
+ rabbit_ct_helpers:set_config(Config2, {rabbitmq_stream, verify_none}),
+ rabbit_ct_helpers:run_setup_steps(Config3,
+ [fun(StepConfig) ->
+ rabbit_ct_helpers:merge_app_env(StepConfig,
+ {rabbit,
+ [{core_metrics_gc_interval,
+ 1000}]})
+ end,
+ fun(StepConfig) ->
+ rabbit_ct_helpers:merge_app_env(StepConfig,
+ {rabbitmq_stream,
+ [{connection_negotiation_step_timeout,
+ 500}]})
+ end]
+ ++ rabbit_ct_broker_helpers:setup_steps()).
+
+end_per_group(_, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+manage_super_stream(Config) ->
+ % create super stream
+ ?assertEqual(ok,
+ create_super_stream(Config,
+ <<"invoices">>,
+ [<<"invoices-0">>, <<"invoices-1">>,
+ <<"invoices-2">>],
+ [<<"0">>, <<"1">>, <<"2">>])),
+ % get the correct partitions
+ ?assertEqual({ok,
+ [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]},
+ partitions(Config, <<"invoices">>)),
+
+ [?assertEqual({ok, [Partition]},
+ route(Config, RoutingKey, <<"invoices">>))
+ || {Partition, RoutingKey}
+ <- [{<<"invoices-0">>, <<"0">>}, {<<"invoices-1">>, <<"1">>},
+ {<<"invoices-2">>, <<"2">>}]],
+
+ % get an error if trying to re-create it
+ ?assertMatch({error, _},
+ create_super_stream(Config,
+ <<"invoices">>,
+ [<<"invoices-0">>, <<"invoices-1">>,
+ <<"invoices-2">>],
+ [<<"0">>, <<"1">>, <<"2">>])),
+
+ % can delete it
+ ?assertEqual(ok, delete_super_stream(Config, <<"invoices">>)),
+
+ % create a stream with the same name as a potential partition
+ ?assertMatch({ok, _}, create_stream(Config, <<"invoices-1">>)),
+
+ % cannot create the super stream because a partition already exists
+ ?assertMatch({error, _},
+ create_super_stream(Config,
+ <<"invoices">>,
+ [<<"invoices-0">>, <<"invoices-1">>,
+ <<"invoices-2">>],
+ [<<"0">>, <<"1">>, <<"2">>])),
+
+ ok.
+
+create_super_stream(Config, Name, Partitions, RKs) ->
+ rabbit_ct_broker_helpers:rpc(Config,
+ 0,
+ rabbit_stream_manager,
+ create_super_stream,
+ [<<"/">>,
+ Name,
+ Partitions,
+ [],
+ RKs,
+ <<"guest">>]).
+
+delete_super_stream(Config, Name) ->
+ rabbit_ct_broker_helpers:rpc(Config,
+ 0,
+ rabbit_stream_manager,
+ delete_super_stream,
+ [<<"/">>, Name, <<"guest">>]).
+
+create_stream(Config, Name) ->
+ rabbit_ct_broker_helpers:rpc(Config,
+ 0,
+ rabbit_stream_manager,
+ create,
+ [<<"/">>, Name, [], <<"guest">>]).
+
+partitions(Config, Name) ->
+ rabbit_ct_broker_helpers:rpc(Config,
+ 0,
+ rabbit_stream_manager,
+ partitions,
+ [<<"/">>, Name]).
+
+route(Config, RoutingKey, SuperStream) ->
+ rabbit_ct_broker_helpers:rpc(Config,
+ 0,
+ rabbit_stream_manager,
+ route,
+ [RoutingKey, <<"/">>, SuperStream]).