diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-09-23 17:00:18 +0200 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-10-11 16:50:01 +0200 |
commit | 147659093f53b97f146c872d9f0774801a2915bb (patch) | |
tree | 01710e1fdfb5bfb6313b0bf662de92abbf43bd50 | |
parent | 49a47586b075c19e5af521a3f1d333ed8fab4654 (diff) | |
download | rabbitmq-server-git-147659093f53b97f146c872d9f0774801a2915bb.tar.gz |
Add functions to create/delete super stream in manager
-rw-r--r-- | deps/rabbitmq_stream/BUILD.bazel | 4 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_manager.erl | 581 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 2 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_utils.erl | 10 | ||||
-rw-r--r-- | deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl | 149 |
5 files changed, 625 insertions, 121 deletions
diff --git a/deps/rabbitmq_stream/BUILD.bazel b/deps/rabbitmq_stream/BUILD.bazel index 91cf7a1e40..6f7cb0b034 100644 --- a/deps/rabbitmq_stream/BUILD.bazel +++ b/deps/rabbitmq_stream/BUILD.bazel @@ -86,6 +86,10 @@ suites = [ ), rabbitmq_integration_suite( PACKAGE, + name = "rabbit_stream_manager_SUITE", + ), + rabbitmq_integration_suite( + PACKAGE, name = "rabbit_stream_SUITE", deps = [ "//deps/rabbit:bazel_erlang_lib", diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 67f3be2214..a2b26848e8 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -19,6 +19,7 @@ -behaviour(gen_server). -include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit/include/amqqueue.hrl"). %% API -export([init/1, @@ -28,6 +29,8 @@ -export([start_link/1, create/4, delete/3, + create_super_stream/6, + delete_super_stream/3, lookup_leader/2, lookup_local_member/2, topology/2, @@ -56,6 +59,34 @@ create(VirtualHost, Reference, Arguments, Username) -> delete(VirtualHost, Reference, Username) -> gen_server:call(?MODULE, {delete, VirtualHost, Reference, Username}). +-spec create_super_stream(binary(), + binary(), + [binary()], + #{binary() => binary()}, + [binary()], + binary()) -> + ok | {error, term()}. +create_super_stream(VirtualHost, + Name, + Partitions, + Arguments, + RoutingKeys, + Username) -> + gen_server:call(?MODULE, + {create_super_stream, + VirtualHost, + Name, + Partitions, + Arguments, + RoutingKeys, + Username}). + +-spec delete_super_stream(binary(), binary(), binary()) -> + ok | {error, term()}. +delete_super_stream(VirtualHost, Name, Username) -> + gen_server:call(?MODULE, + {delete_super_stream, VirtualHost, Name, Username}). + -spec lookup_leader(binary(), binary()) -> pid() | cluster_not_found. lookup_leader(VirtualHost, Stream) -> gen_server:call(?MODULE, {lookup_leader, VirtualHost, Stream}). @@ -149,100 +180,107 @@ validate_stream_queue_arguments([_ | T]) -> handle_call({create, VirtualHost, Reference, Arguments, Username}, _From, State) -> - Name = - #resource{virtual_host = VirtualHost, - kind = queue, - name = Reference}, - StreamQueueArguments = stream_queue_arguments(Arguments), - case validate_stream_queue_arguments(StreamQueueArguments) of - ok -> - Q0 = amqqueue:new(Name, - none, - true, - false, - none, - StreamQueueArguments, - VirtualHost, - #{user => Username}, - rabbit_stream_queue), - try - QueueLookup = - rabbit_amqqueue:with(Name, - fun(Q) -> - ok = - rabbit_amqqueue:assert_equivalence(Q, - true, - false, - StreamQueueArguments, - none) - end), - - case QueueLookup of - ok -> - {reply, {error, reference_already_exists}, State}; - {error, not_found} -> - try - case rabbit_queue_type:declare(Q0, node()) of - {new, Q} -> - {reply, {ok, amqqueue:get_type_state(Q)}, - State}; - {existing, _} -> - {reply, {error, reference_already_exists}, - State}; - {error, Err} -> - rabbit_log:warning("Error while creating ~p stream, ~p", - [Reference, Err]), - {reply, {error, internal_error}, State} - end - catch - exit:Error -> - rabbit_log:error("Error while creating ~p stream, ~p", - [Reference, Error]), - {reply, {error, internal_error}, State} - end; - {error, {absent, _, Reason}} -> - rabbit_log:error("Error while creating ~p stream, ~p", - [Reference, Reason]), - {reply, {error, internal_error}, State} - end - catch - exit:ExitError -> - % likely to be a problem of inequivalent args on an existing stream - rabbit_log:error("Error while creating ~p stream: ~p", - [Reference, ExitError]), - {reply, {error, validation_failed}, State} - end; - error -> - {reply, {error, validation_failed}, State} - end; + {reply, create_stream(VirtualHost, Reference, Arguments, Username), + State}; handle_call({delete, VirtualHost, Reference, Username}, _From, State) -> - Name = - #resource{virtual_host = VirtualHost, - kind = queue, - name = Reference}, - rabbit_log:debug("Trying to delete stream ~p", [Reference]), - case rabbit_amqqueue:lookup(Name) of - {ok, Q} -> - rabbit_log:debug("Found queue record ~p, checking if it is a stream", - [Reference]), - case is_stream_queue(Q) of - true -> - rabbit_log:debug("Queue record ~p is a stream, trying to delete it", - [Reference]), - {ok, _} = - rabbit_stream_queue:delete(Q, false, false, Username), - rabbit_log:debug("Stream ~p deleted", [Reference]), - {reply, {ok, deleted}, State}; - _ -> - rabbit_log:debug("Queue record ~p is NOT a stream, returning error", - [Reference]), - {reply, {error, reference_not_found}, State} - end; - {error, not_found} -> - rabbit_log:debug("Stream ~p not found, cannot delete it", - [Reference]), - {reply, {error, reference_not_found}, State} + {reply, delete_stream(VirtualHost, Reference, Username), State}; +handle_call({create_super_stream, + VirtualHost, + Name, + Partitions, + Arguments, + RoutingKeys, + Username}, + _From, State) -> + case validate_super_stream_creation(VirtualHost, Name, Partitions) of + {error, Reason} -> + {reply, {error, Reason}, State}; + ok -> + case declare_super_stream_exchange(VirtualHost, Name, Username) of + ok -> + RollbackOperations = + [fun() -> + delete_super_stream_exchange(VirtualHost, Name, + Username) + end], + QueueCreationsResult = + lists:foldl(fun (Partition, {ok, RollbackOps}) -> + case create_stream(VirtualHost, + Partition, + Arguments, + Username) + of + {ok, _} -> + {ok, + [fun() -> + delete_stream(VirtualHost, + Partition, + Username) + end] + ++ RollbackOps}; + {error, Reason} -> + {{error, Reason}, + RollbackOps} + end; + (_, + {{error, _Reason}, _RollbackOps} = + Acc) -> + Acc + end, + {ok, RollbackOperations}, Partitions), + case QueueCreationsResult of + {ok, RollbackOps} -> + BindingsResult = + add_super_stream_bindings(VirtualHost, + Name, + Partitions, + RoutingKeys, + Username), + case BindingsResult of + ok -> + {reply, ok, State}; + Error -> + [Fun() || Fun <- RollbackOps], + {reply, Error, State} + end; + {{error, Reason}, RollbackOps} -> + [Fun() || Fun <- RollbackOps], + {reply, {error, Reason}, State} + end; + {error, Msg} -> + {reply, {error, Msg}, State} + end + end; +handle_call({delete_super_stream, VirtualHost, SuperStream, Username}, + _From, State) -> + case super_stream_partitions(VirtualHost, SuperStream) of + {ok, Partitions} -> + case delete_super_stream_exchange(VirtualHost, SuperStream, + Username) + of + ok -> + ok; + {error, Error} -> + rabbit_log:warning("Error while deleting super stream exchange ~p, ~p", + [SuperStream, Error]), + ok + end, + [begin + case delete_stream(VirtualHost, Stream, Username) of + {ok, deleted} -> + ok; + {error, Err} -> + rabbit_log:warning("Error while delete partition ~p of super stream " + "~p, ~p", + [Stream, SuperStream, Err]), + ok + end + end + || Stream <- Partitions], + {reply, ok, State}; + {error, Error} -> + {reply, {error, Error}, State} end; handle_call({lookup_leader, VirtualHost, Stream}, _From, State) -> Name = @@ -382,31 +420,7 @@ handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From, end, {reply, Res, State}; handle_call({partitions, VirtualHost, SuperStream}, _From, State) -> - ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream), - Res = try - rabbit_exchange:lookup_or_die(ExchangeName), - UnorderedBindings = - [Binding - || Binding = #binding{destination = D} - <- rabbit_binding:list_for_source(ExchangeName), - is_resource_stream_queue(D)], - OrderedBindings = - rabbit_stream_utils:sort_partitions(UnorderedBindings), - {ok, - lists:foldl(fun (#binding{destination = - #resource{kind = queue, name = Q}}, - Acc) -> - Acc ++ [Q]; - (_Binding, Acc) -> - Acc - end, - [], OrderedBindings)} - catch - exit:Error -> - rabbit_log:error("Error while looking up exchange ~p, ~p", - [ExchangeName, Error]), - {error, stream_not_found} - end, + Res = super_stream_partitions(VirtualHost, SuperStream), {reply, Res, State}; handle_call(which_children, _From, State) -> {reply, [], State}. @@ -418,6 +432,339 @@ handle_info(Info, State) -> rabbit_log:info("Received info ~p", [Info]), {noreply, State}. +create_stream(VirtualHost, Reference, Arguments, Username) -> + Name = + #resource{virtual_host = VirtualHost, + kind = queue, + name = Reference}, + StreamQueueArguments = stream_queue_arguments(Arguments), + case validate_stream_queue_arguments(StreamQueueArguments) of + ok -> + Q0 = amqqueue:new(Name, + none, + true, + false, + none, + StreamQueueArguments, + VirtualHost, + #{user => Username}, + rabbit_stream_queue), + try + QueueLookup = + rabbit_amqqueue:with(Name, + fun(Q) -> + ok = + rabbit_amqqueue:assert_equivalence(Q, + true, + false, + StreamQueueArguments, + none) + end), + + case QueueLookup of + ok -> + {error, reference_already_exists}; + {error, not_found} -> + try + case rabbit_queue_type:declare(Q0, node()) of + {new, Q} -> + {ok, amqqueue:get_type_state(Q)}; + {existing, _} -> + {error, reference_already_exists}; + {error, Err} -> + rabbit_log:warning("Error while creating ~p stream, ~p", + [Reference, Err]), + {error, internal_error} + end + catch + exit:Error -> + rabbit_log:error("Error while creating ~p stream, ~p", + [Reference, Error]), + {error, internal_error} + end; + {error, {absent, _, Reason}} -> + rabbit_log:error("Error while creating ~p stream, ~p", + [Reference, Reason]), + {error, internal_error} + end + catch + exit:ExitError -> + % likely to be a problem of inequivalent args on an existing stream + rabbit_log:error("Error while creating ~p stream: ~p", + [Reference, ExitError]), + {error, validation_failed} + end; + error -> + {error, validation_failed} + end. + +delete_stream(VirtualHost, Reference, Username) -> + Name = + #resource{virtual_host = VirtualHost, + kind = queue, + name = Reference}, + rabbit_log:debug("Trying to delete stream ~p", [Reference]), + case rabbit_amqqueue:lookup(Name) of + {ok, Q} -> + rabbit_log:debug("Found queue record ~p, checking if it is a stream", + [Reference]), + case is_stream_queue(Q) of + true -> + rabbit_log:debug("Queue record ~p is a stream, trying to delete it", + [Reference]), + {ok, _} = + rabbit_stream_queue:delete(Q, false, false, Username), + rabbit_log:debug("Stream ~p deleted", [Reference]), + {ok, deleted}; + _ -> + rabbit_log:debug("Queue record ~p is NOT a stream, returning error", + [Reference]), + {error, reference_not_found} + end; + {error, not_found} -> + rabbit_log:debug("Stream ~p not found, cannot delete it", + [Reference]), + {error, reference_not_found} + end. + +super_stream_partitions(VirtualHost, SuperStream) -> + ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream), + try + rabbit_exchange:lookup_or_die(ExchangeName), + UnorderedBindings = + [Binding + || Binding = #binding{destination = D} + <- rabbit_binding:list_for_source(ExchangeName), + is_resource_stream_queue(D)], + OrderedBindings = + rabbit_stream_utils:sort_partitions(UnorderedBindings), + {ok, + lists:foldl(fun (#binding{destination = + #resource{kind = queue, name = Q}}, + Acc) -> + Acc ++ [Q]; + (_Binding, Acc) -> + Acc + end, + [], OrderedBindings)} + catch + exit:Error -> + rabbit_log:error("Error while looking up exchange ~p, ~p", + [ExchangeName, Error]), + {error, stream_not_found} + end. + +validate_super_stream_creation(VirtualHost, Name, Partitions) -> + case exchange_exists(VirtualHost, Name) of + {error, validation_failed} -> + {error, + {validation_failed, + rabbit_misc:format("~p is not a correct name for a super stream", + [Name])}}; + {ok, true} -> + {error, + {reference_already_exists, + rabbit_misc:format("there is already an exchange named ~p", + [Name])}}; + {ok, false} -> + case check_already_existing_queue(VirtualHost, Partitions) of + {error, Reason} -> + {error, Reason}; + ok -> + ok + end + end. + +exchange_exists(VirtualHost, Name) -> + case rabbit_stream_utils:enforce_correct_name(Name) of + {ok, CorrectName} -> + ExchangeName = rabbit_misc:r(VirtualHost, exchange, CorrectName), + case rabbit_exchange:lookup(ExchangeName) of + {ok, _} -> + {ok, true}; + {error, not_found} -> + {ok, false} + end; + error -> + {error, validation_failed} + end. + +queue_exists(VirtualHost, Name) -> + case rabbit_stream_utils:enforce_correct_name(Name) of + {ok, CorrectName} -> + QueueName = rabbit_misc:r(VirtualHost, queue, CorrectName), + case rabbit_amqqueue:lookup(QueueName) of + {ok, _} -> + {ok, true}; + {error, not_found} -> + {ok, false} + end; + error -> + {error, validation_failed} + end. + +check_already_existing_queue(VirtualHost, Queues) -> + check_already_existing_queue0(VirtualHost, Queues, undefined). + +check_already_existing_queue0(_VirtualHost, [], undefined) -> + ok; +check_already_existing_queue0(VirtualHost, [Q | T], _Error) -> + case queue_exists(VirtualHost, Q) of + {ok, false} -> + check_already_existing_queue0(VirtualHost, T, undefined); + {ok, true} -> + {error, + {reference_already_exists, + rabbit_misc:format("there is already a queue named ~p", [Q])}}; + {error, validation_failed} -> + {error, + {validation_failed, + rabbit_misc:format("~p is not a correct name for a queue", [Q])}} + end. + +declare_super_stream_exchange(VirtualHost, Name, Username) -> + case rabbit_stream_utils:enforce_correct_name(Name) of + {ok, CorrectName} -> + Args = + rabbit_misc:set_table_value([], + <<"x-super-stream">>, + bool, + true), + CheckedType = rabbit_exchange:check_type(<<"direct">>), + ExchangeName = rabbit_misc:r(VirtualHost, exchange, CorrectName), + X = case rabbit_exchange:lookup(ExchangeName) of + {ok, FoundX} -> + FoundX; + {error, not_found} -> + rabbit_exchange:declare(ExchangeName, + CheckedType, + true, + false, + false, + Args, + Username) + end, + try + ok = + rabbit_exchange:assert_equivalence(X, + CheckedType, + true, + false, + false, + Args) + catch + exit:ExitError -> + % likely to be a problem of inequivalent args on an existing stream + rabbit_log:error("Error while creating ~p super stream exchange: ~p", + [Name, ExitError]), + {error, validation_failed} + end; + error -> + {error, validation_failed} + end. + +add_super_stream_bindings(VirtualHost, + Name, + Partitions, + RoutingKeys, + Username) -> + PartitionsRoutingKeys = lists:zip(Partitions, RoutingKeys), + BindingsResult = + lists:foldl(fun ({Partition, RoutingKey}, {ok, Order}) -> + case add_super_stream_binding(VirtualHost, + Name, + Partition, + RoutingKey, + Order, + Username) + of + ok -> + {ok, Order + 1}; + {error, Reason} -> + {{error, Reason}, 0} + end; + (_, {{error, _Reason}, _Order} = Acc) -> + Acc + end, + {ok, 0}, PartitionsRoutingKeys), + case BindingsResult of + {ok, _} -> + ok; + {{error, Reason}, _} -> + {error, Reason} + end. + +add_super_stream_binding(VirtualHost, + SuperStream, + Partition, + RoutingKey, + Order, + Username) -> + {ok, ExchangeNameBin} = + rabbit_stream_utils:enforce_correct_name(SuperStream), + {ok, QueueNameBin} = + rabbit_stream_utils:enforce_correct_name(Partition), + ExchangeName = rabbit_misc:r(VirtualHost, exchange, ExchangeNameBin), + QueueName = rabbit_misc:r(VirtualHost, queue, QueueNameBin), + Pid = self(), + Arguments = + rabbit_misc:set_table_value([], + <<"x-stream-partition-order">>, + long, + Order), + case rabbit_binding:add(#binding{source = ExchangeName, + destination = QueueName, + key = RoutingKey, + args = Arguments}, + fun (_X, Q) when ?is_amqqueue(Q) -> + try + rabbit_amqqueue:check_exclusive_access(Q, + Pid) + catch + exit:Reason -> + {error, Reason} + end; + (_X, #exchange{}) -> + ok + end, + Username) + of + {error, {resources_missing, [{not_found, Name} | _]}} -> + {error, + {stream_not_found, + rabbit_misc:format("stream ~s does not exists", [Name])}}; + {error, {resources_missing, [{absent, Q, _Reason} | _]}} -> + {error, + {stream_not_found, + rabbit_misc:format("stream ~p does not exists (absent", [Q])}}; + {error, binding_not_found} -> + {error, + {not_found, + rabbit_misc:format("no binding ~s between ~s and ~s", + [RoutingKey, rabbit_misc:rs(ExchangeName), + rabbit_misc:rs(QueueName)])}}; + {error, {binding_invalid, Fmt, Args}} -> + {error, {binding_invalid, rabbit_misc:format(Fmt, Args)}}; + {error, #amqp_error{} = Error} -> + {error, {internal_error, rabbit_misc:format("~p", [Error])}}; + ok -> + ok + end. + +delete_super_stream_exchange(VirtualHost, Name, Username) -> + case rabbit_stream_utils:enforce_correct_name(Name) of + {ok, CorrectName} -> + ExchangeName = rabbit_misc:r(VirtualHost, exchange, CorrectName), + case rabbit_exchange:delete(ExchangeName, false, Username) of + {error, not_found} -> + ok; + ok -> + ok + end; + error -> + {error, validation_failed} + end. + leader_from_members(Q) -> QState = amqqueue:get_type_state(Q), #{name := StreamName} = QState, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 91c0df8be0..ede34d95c7 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -2044,7 +2044,7 @@ handle_frame_post_auth(Transport, State, {request, CorrelationId, {create_stream, Stream, Arguments}}) -> - case rabbit_stream_utils:enforce_correct_stream_name(Stream) of + case rabbit_stream_utils:enforce_correct_name(Stream) of {ok, StreamName} -> case rabbit_stream_utils:check_configure_permitted(#resource{name = StreamName, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl index 92d0bff8af..37545424c8 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl @@ -17,7 +17,7 @@ -module(rabbit_stream_utils). %% API --export([enforce_correct_stream_name/1, +-export([enforce_correct_name/1, write_messages/4, parse_map/2, auth_mechanisms/1, @@ -26,13 +26,14 @@ check_write_permitted/3, check_read_permitted/3, extract_stream_list/2, - sort_partitions/1]). + sort_partitions/1, + strip_cr_lf/1]). -define(MAX_PERMISSION_CACHE_SIZE, 12). -include_lib("rabbit_common/include/rabbit.hrl"). -enforce_correct_stream_name(Name) -> +enforce_correct_name(Name) -> % from rabbit_channel StrippedName = binary:replace(Name, [<<"\n">>, <<"\r">>], <<"">>, [global]), @@ -236,3 +237,6 @@ sort_partitions(Partitions) -> end end, Partitions). + +strip_cr_lf(NameBin) -> + binary:replace(NameBin, [<<"\n">>, <<"\r">>], <<"">>, [global]). diff --git a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl new file mode 100644 index 0000000000..3c036fc1e2 --- /dev/null +++ b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl @@ -0,0 +1,149 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_stream_manager_SUITE). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-compile(export_all). + +all() -> + [{group, non_parallel_tests}]. + +groups() -> + [{non_parallel_tests, [], [manage_super_stream]}]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + Config. + +end_per_suite(Config) -> + Config. + +init_per_group(_, Config) -> + Config1 = + rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]), + Config2 = + rabbit_ct_helpers:set_config(Config1, + {rabbitmq_ct_tls_verify, verify_none}), + Config3 = + rabbit_ct_helpers:set_config(Config2, {rabbitmq_stream, verify_none}), + rabbit_ct_helpers:run_setup_steps(Config3, + [fun(StepConfig) -> + rabbit_ct_helpers:merge_app_env(StepConfig, + {rabbit, + [{core_metrics_gc_interval, + 1000}]}) + end, + fun(StepConfig) -> + rabbit_ct_helpers:merge_app_env(StepConfig, + {rabbitmq_stream, + [{connection_negotiation_step_timeout, + 500}]}) + end] + ++ rabbit_ct_broker_helpers:setup_steps()). + +end_per_group(_, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +manage_super_stream(Config) -> + % create super stream + ?assertEqual(ok, + create_super_stream(Config, + <<"invoices">>, + [<<"invoices-0">>, <<"invoices-1">>, + <<"invoices-2">>], + [<<"0">>, <<"1">>, <<"2">>])), + % get the correct partitions + ?assertEqual({ok, + [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]}, + partitions(Config, <<"invoices">>)), + + [?assertEqual({ok, [Partition]}, + route(Config, RoutingKey, <<"invoices">>)) + || {Partition, RoutingKey} + <- [{<<"invoices-0">>, <<"0">>}, {<<"invoices-1">>, <<"1">>}, + {<<"invoices-2">>, <<"2">>}]], + + % get an error if trying to re-create it + ?assertMatch({error, _}, + create_super_stream(Config, + <<"invoices">>, + [<<"invoices-0">>, <<"invoices-1">>, + <<"invoices-2">>], + [<<"0">>, <<"1">>, <<"2">>])), + + % can delete it + ?assertEqual(ok, delete_super_stream(Config, <<"invoices">>)), + + % create a stream with the same name as a potential partition + ?assertMatch({ok, _}, create_stream(Config, <<"invoices-1">>)), + + % cannot create the super stream because a partition already exists + ?assertMatch({error, _}, + create_super_stream(Config, + <<"invoices">>, + [<<"invoices-0">>, <<"invoices-1">>, + <<"invoices-2">>], + [<<"0">>, <<"1">>, <<"2">>])), + + ok. + +create_super_stream(Config, Name, Partitions, RKs) -> + rabbit_ct_broker_helpers:rpc(Config, + 0, + rabbit_stream_manager, + create_super_stream, + [<<"/">>, + Name, + Partitions, + [], + RKs, + <<"guest">>]). + +delete_super_stream(Config, Name) -> + rabbit_ct_broker_helpers:rpc(Config, + 0, + rabbit_stream_manager, + delete_super_stream, + [<<"/">>, Name, <<"guest">>]). + +create_stream(Config, Name) -> + rabbit_ct_broker_helpers:rpc(Config, + 0, + rabbit_stream_manager, + create, + [<<"/">>, Name, [], <<"guest">>]). + +partitions(Config, Name) -> + rabbit_ct_broker_helpers:rpc(Config, + 0, + rabbit_stream_manager, + partitions, + [<<"/">>, Name]). + +route(Config, RoutingKey, SuperStream) -> + rabbit_ct_broker_helpers:rpc(Config, + 0, + rabbit_stream_manager, + route, + [RoutingKey, <<"/">>, SuperStream]). |