summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2020-10-12 14:43:37 +0100
committerdcorbacho <dparracorbacho@piotal.io>2020-10-12 14:43:37 +0100
commitf3ab0a736685f3bac767bb40001bfb85cce81a9f (patch)
treea91687556f34ae16f360c1c20c06e1b2f5aaed75
parent6ebdbf4191e43a96bf5e3a6382985918ac211744 (diff)
downloadrabbitmq-server-git-f3ab0a736685f3bac767bb40001bfb85cce81a9f.tar.gz
Select initial cluster size on stream queue declarationstream-initial-cluster-size
-rw-r--r--src/rabbit_amqqueue.erl7
-rw-r--r--src/rabbit_stream_queue.erl30
-rw-r--r--test/rabbit_stream_queue_SUITE.erl40
3 files changed, 71 insertions, 6 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index fd7682fdeb..82b261045d 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -780,9 +780,10 @@ declare_args() ->
{<<"x-queue-mode">>, fun check_queue_mode/2},
{<<"x-single-active-consumer">>, fun check_single_active_consumer_arg/2},
{<<"x-queue-type">>, fun check_queue_type/2},
- {<<"x-quorum-initial-group-size">>, fun check_default_quorum_initial_group_size_arg/2},
+ {<<"x-quorum-initial-group-size">>, fun check_initial_cluster_size_arg/2},
{<<"x-max-age">>, fun check_max_age_arg/2},
- {<<"x-max-segment-size">>, fun check_non_neg_int_arg/2}].
+ {<<"x-max-segment-size">>, fun check_non_neg_int_arg/2},
+ {<<"x-initial-cluster-size">>, fun check_initial_cluster_size_arg/2}].
consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
{<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}].
@@ -829,7 +830,7 @@ check_single_active_consumer_arg({Type, Val}, Args) ->
Error -> Error
end.
-check_default_quorum_initial_group_size_arg({Type, Val}, Args) ->
+check_initial_cluster_size_arg({Type, Val}, Args) ->
case check_non_neg_int_arg({Type, Val}, Args) of
ok when Val == 0 -> {error, {value_zero, Val}};
ok -> ok;
diff --git a/src/rabbit_stream_queue.erl b/src/rabbit_stream_queue.erl
index ed44f1258a..7dd9ac27f6 100644
--- a/src/rabbit_stream_queue.erl
+++ b/src/rabbit_stream_queue.erl
@@ -495,7 +495,9 @@ make_stream_conf(Node, Q) ->
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
MaxAge = max_age(args_policy_lookup(<<"max-age">>, fun max_age/2, Q)),
MaxSegmentSize = args_policy_lookup(<<"max-segment-size">>, fun min/2, Q),
- Replicas = rabbit_mnesia:cluster_nodes(all) -- [Node],
+ Replicas0 = rabbit_mnesia:cluster_nodes(all) -- [Node],
+ Arguments = amqqueue:get_arguments(Q),
+ Replicas = select_stream_nodes(get_initial_cluster_size(Arguments) - 1, Replicas0),
Formatter = {?MODULE, format_osiris_event, [QName]},
Retention = lists:filter(fun({_, R}) ->
R =/= undefined
@@ -509,6 +511,23 @@ make_stream_conf(Node, Q) ->
event_formatter => Formatter,
epoch => 1}).
+select_stream_nodes(Size, All) when length(All) =< Size ->
+ All;
+select_stream_nodes(Size, All) ->
+ Node = node(),
+ case lists:member(Node, All) of
+ true ->
+ select_stream_nodes(Size - 1, lists:delete(Node, All), [Node]);
+ false ->
+ select_stream_nodes(Size, All, [])
+ end.
+
+select_stream_nodes(0, _, Selected) ->
+ Selected;
+select_stream_nodes(Size, Rest, Selected) ->
+ S = lists:nth(rand:uniform(length(Rest)), Rest),
+ select_stream_nodes(Size - 1, lists:delete(S, Rest), [S | Selected]).
+
update_stream_conf(#{reference := QName} = Conf) ->
case rabbit_amqqueue:lookup(QName) of
{ok, Q} ->
@@ -654,6 +673,13 @@ capabilities() ->
queue_arguments => [<<"x-dead-letter-exchange">>, <<"x-dead-letter-routing-key">>,
<<"x-max-length">>, <<"x-max-length-bytes">>,
<<"x-single-active-consumer">>, <<"x-queue-type">>,
- <<"x-max-age">>, <<"x-max-segment-size">>],
+ <<"x-max-age">>, <<"x-max-segment-size">>,
+ <<"x-initial-cluster-size">>],
consumer_arguments => [<<"x-stream-offset">>],
server_named => false}.
+
+get_initial_cluster_size(Arguments) ->
+ case rabbit_misc:table_lookup(Arguments, <<"x-initial-cluster-size">>) of
+ undefined -> length(rabbit_mnesia:cluster_nodes(running));
+ {_Type, Val} -> Val
+ end.
diff --git a/test/rabbit_stream_queue_SUITE.erl b/test/rabbit_stream_queue_SUITE.erl
index cb38e15ea7..8b8966b049 100644
--- a/test/rabbit_stream_queue_SUITE.erl
+++ b/test/rabbit_stream_queue_SUITE.erl
@@ -46,7 +46,9 @@ groups() ->
delete_classic_replica,
delete_quorum_replica,
consume_from_replica,
- leader_failover]},
+ leader_failover,
+ initial_cluster_size_one,
+ initial_cluster_size_two]},
{unclustered_size_3_1, [], [add_replica]},
{unclustered_size_3_2, [], [consume_without_local_replica]},
{unclustered_size_3_3, [], [grow_coordinator_cluster]},
@@ -1141,6 +1143,42 @@ leader_failover(Config) ->
?assert(NewLeader =/= Server1),
ok = rabbit_ct_broker_helpers:start_node(Config, Server1).
+initial_cluster_size_one(Config) ->
+ [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-initial-cluster-size">>, long, 1}])),
+ check_leader_and_replicas(Config, Q, Server1, []),
+
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch, #'queue.delete'{queue = Q})).
+
+initial_cluster_size_two(Config) ->
+ [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-initial-cluster-size">>, long, 2}])),
+
+ [Info] = lists:filter(
+ fun(Props) ->
+ lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props)
+ end,
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [name, leader, members]])),
+ ?assertEqual(Server1, proplists:get_value(leader, Info)),
+ ?assertEqual(1, length(proplists:get_value(members, Info))),
+
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch, #'queue.delete'{queue = Q})).
+
invalid_policy(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),