summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-21 14:50:35 +0200
committerGitHub <noreply@github.com>2020-10-21 14:50:35 +0200
commit3147ee9a71ee37a63551fa93f91e279b650c834b (patch)
tree7b3a452e83e68b00803de82a80be24c619080dc6
parent81ec8cad274f2266d70f4164d361859504d174e3 (diff)
parent2c4ad90953b79fc6cc814b47cd7b0617cea10e8c (diff)
downloadrabbitmq-server-git-3147ee9a71ee37a63551fa93f91e279b650c834b.tar.gz
Merge pull request #2471 from rabbitmq/stream-queue-leader-locator
Stream queue leader locator
-rw-r--r--src/rabbit_amqqueue.erl13
-rw-r--r--src/rabbit_policies.erl10
-rw-r--r--src/rabbit_stream_coordinator.erl45
-rw-r--r--src/rabbit_stream_queue.erl14
-rw-r--r--test/rabbit_stream_queue_SUITE.erl188
5 files changed, 262 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 82b261045d..deead84971 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -783,7 +783,8 @@ declare_args() ->
{<<"x-quorum-initial-group-size">>, fun check_initial_cluster_size_arg/2},
{<<"x-max-age">>, fun check_max_age_arg/2},
{<<"x-max-segment-size">>, fun check_non_neg_int_arg/2},
- {<<"x-initial-cluster-size">>, fun check_initial_cluster_size_arg/2}].
+ {<<"x-initial-cluster-size">>, fun check_initial_cluster_size_arg/2},
+ {<<"x-queue-leader-locator">>, fun check_queue_leader_locator_arg/2}].
consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
{<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}].
@@ -902,6 +903,16 @@ check_overflow({longstr, Val}, _Args) ->
check_overflow({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
+check_queue_leader_locator_arg({longstr, Val}, _Args) ->
+ case lists:member(Val, [<<"client-local">>,
+ <<"random">>,
+ <<"least-leaders">>]) of
+ true -> ok;
+ false -> {error, invalid_queue_locator_arg}
+ end;
+check_queue_leader_locator_arg({Type, _}, _Args) ->
+ {error, {unacceptable_type, Type}}.
+
check_queue_mode({longstr, Val}, _Args) ->
case lists:member(Val, [<<"default">>, <<"lazy">>]) of
true -> ok;
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index 0dd56a8ccd..f2c44e2289 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -41,6 +41,7 @@ register() ->
{policy_validator, <<"delivery-limit">>},
{policy_validator, <<"max-age">>},
{policy_validator, <<"max-segment-size">>},
+ {policy_validator, <<"queue-leader-locator">>},
{operator_policy_validator, <<"expires">>},
{operator_policy_validator, <<"message-ttl">>},
{operator_policy_validator, <<"max-length">>},
@@ -147,6 +148,15 @@ validate_policy0(<<"max-age">>, Value) ->
ok
end;
+validate_policy0(<<"queue-leader-locator">>, <<"client-local">>) ->
+ ok;
+validate_policy0(<<"queue-leader-locator">>, <<"random">>) ->
+ ok;
+validate_policy0(<<"queue-leader-locator">>, <<"least-leaders">>) ->
+ ok;
+validate_policy0(<<"queue-leader-locator">>, Value) ->
+ {error, "~p is not a valid queue leader locator value", [Value]};
+
validate_policy0(<<"max-segment-size">>, Value)
when is_integer(Value), Value >= 0 ->
ok;
diff --git a/src/rabbit_stream_coordinator.erl b/src/rabbit_stream_coordinator.erl
index 472cf5d70f..8a1205b9af 100644
--- a/src/rabbit_stream_coordinator.erl
+++ b/src/rabbit_stream_coordinator.erl
@@ -192,13 +192,14 @@ apply(#{from := From}, {policy_changed, #{stream_id := StreamId}} = Cmd,
end;
apply(#{from := From}, {start_cluster, #{queue := Q}}, #?MODULE{streams = Streams} = State) ->
- #{name := StreamId} = Conf = amqqueue:get_type_state(Q),
+ #{name := StreamId} = Conf0 = amqqueue:get_type_state(Q),
+ Conf = apply_leader_locator_strategy(Conf0, Streams),
case maps:is_key(StreamId, Streams) of
true ->
{State, '$ra_no_reply', wrap_reply(From, {error, already_started})};
false ->
Phase = phase_start_cluster,
- PhaseArgs = [Q],
+ PhaseArgs = [amqqueue:set_type_state(Q, Conf)],
SState = #{state => start_cluster,
phase => Phase,
phase_args => PhaseArgs,
@@ -904,3 +905,43 @@ add_unique(Node, Nodes) ->
delete_replica_pid(Node, ReplicaPids) ->
lists:partition(fun(P) -> node(P) =/= Node end, ReplicaPids).
+
+apply_leader_locator_strategy(#{leader_locator_strategy := <<"client-local">>} = Conf, _) ->
+ Conf;
+apply_leader_locator_strategy(#{leader_node := Leader,
+ replica_nodes := Replicas0,
+ leader_locator_strategy := <<"random">>,
+ name := StreamId} = Conf, _) ->
+ Replicas = [Leader | Replicas0],
+ ClusterSize = length(Replicas),
+ Hash = erlang:phash2(StreamId),
+ Pos = (Hash rem ClusterSize) + 1,
+ NewLeader = lists:nth(Pos, Replicas),
+ NewReplicas = lists:delete(NewLeader, Replicas),
+ Conf#{leader_node => NewLeader,
+ replica_nodes => NewReplicas};
+apply_leader_locator_strategy(#{leader_node := Leader,
+ replica_nodes := Replicas0,
+ leader_locator_strategy := <<"least-leaders">>} = Conf,
+ Streams) ->
+ Replicas = [Leader | Replicas0],
+ Counters0 = maps:from_list([{R, 0} || R <- Replicas]),
+ Counters = maps:to_list(maps:fold(fun(_Key, #{conf := #{leader_node := L}}, Acc) ->
+ maps:update_with(L, fun(V) -> V + 1 end, 0, Acc)
+ end, Counters0, Streams)),
+ Ordered = lists:sort(fun({_, V1}, {_, V2}) ->
+ V1 =< V2
+ end, Counters),
+ %% We could have potentially introduced nodes that are not in the list of replicas if
+ %% initial cluster size is smaller than the cluster size. Let's select the first one
+ %% that is on the list of replicas
+ NewLeader = select_first_matching_node(Ordered, Replicas),
+ NewReplicas = lists:delete(NewLeader, Replicas),
+ Conf#{leader_node => NewLeader,
+ replica_nodes => NewReplicas}.
+
+select_first_matching_node([{N, _} | Rest], Replicas) ->
+ case lists:member(N, Replicas) of
+ true -> N;
+ false -> select_first_matching_node(Rest, Replicas)
+ end.
diff --git a/src/rabbit_stream_queue.erl b/src/rabbit_stream_queue.erl
index 7ad5076cd3..2a9575c117 100644
--- a/src/rabbit_stream_queue.erl
+++ b/src/rabbit_stream_queue.erl
@@ -496,6 +496,8 @@ make_stream_conf(Node, Q) ->
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
MaxAge = max_age(args_policy_lookup(<<"max-age">>, fun max_age/2, Q)),
MaxSegmentSize = args_policy_lookup(<<"max-segment-size">>, fun min/2, Q),
+ LeaderLocator = queue_leader_locator(args_policy_lookup(<<"queue-leader-locator">>,
+ fun res_arg/2, Q)),
Replicas0 = rabbit_mnesia:cluster_nodes(all) -- [Node],
Arguments = amqqueue:get_arguments(Q),
Replicas = select_stream_nodes(get_initial_cluster_size(Arguments) - 1, Replicas0),
@@ -507,6 +509,7 @@ make_stream_conf(Node, Q) ->
add_if_defined(max_segment_size, MaxSegmentSize, #{reference => QName,
name => Name,
retention => Retention,
+ leader_locator_strategy => LeaderLocator,
leader_node => Node,
replica_nodes => Replicas,
event_formatter => Formatter,
@@ -562,6 +565,12 @@ max_age(Age) ->
max_age(Age1, Age2) ->
min(rabbit_amqqueue:check_max_age(Age1), rabbit_amqqueue:check_max_age(Age2)).
+queue_leader_locator(undefined) -> <<"client-local">>;
+queue_leader_locator(Val) -> Val.
+
+res_arg(PolVal, undefined) -> PolVal;
+res_arg(_, ArgVal) -> ArgVal.
+
queue_name(#resource{virtual_host = VHost, name = Name}) ->
Timestamp = erlang:integer_to_binary(erlang:system_time()),
osiris_util:to_base64uri(erlang:binary_to_list(<<VHost/binary, "_", Name/binary, "_",
@@ -670,12 +679,13 @@ msg_to_iodata(#basic_message{exchange_name = #resource{name = Exchange},
rabbit_msg_record:to_iodata(R).
capabilities() ->
- #{policies => [<<"max-length-bytes">>, <<"max-age">>, <<"max-segment-size">>],
+ #{policies => [<<"max-length-bytes">>, <<"max-age">>, <<"max-segment-size">>,
+ <<"queue-leader-locator">>],
queue_arguments => [<<"x-dead-letter-exchange">>, <<"x-dead-letter-routing-key">>,
<<"x-max-length">>, <<"x-max-length-bytes">>,
<<"x-single-active-consumer">>, <<"x-queue-type">>,
<<"x-max-age">>, <<"x-max-segment-size">>,
- <<"x-initial-cluster-size">>],
+ <<"x-initial-cluster-size">>, <<"x-queue-leader-locator">>],
consumer_arguments => [<<"x-stream-offset">>],
server_named => false}.
diff --git a/test/rabbit_stream_queue_SUITE.erl b/test/rabbit_stream_queue_SUITE.erl
index 5bc4c2a5ad..5ae4de82f4 100644
--- a/test/rabbit_stream_queue_SUITE.erl
+++ b/test/rabbit_stream_queue_SUITE.erl
@@ -48,7 +48,11 @@ groups() ->
consume_from_replica,
leader_failover,
initial_cluster_size_one,
- initial_cluster_size_two]},
+ initial_cluster_size_two,
+ leader_locator_client_local,
+ leader_locator_random,
+ leader_locator_least_leaders,
+ leader_locator_policy]},
{unclustered_size_3_1, [], [add_replica]},
{unclustered_size_3_2, [], [consume_without_local_replica]},
{unclustered_size_3_3, [], [grow_coordinator_cluster]},
@@ -1105,7 +1109,7 @@ leader_failover(Config) ->
ok = rabbit_ct_broker_helpers:start_node(Config, Server1).
initial_cluster_size_one(Config) ->
- [Server1, _Server2, _Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
Q = ?config(queue_name, Config),
@@ -1119,7 +1123,7 @@ initial_cluster_size_one(Config) ->
amqp_channel:call(Ch, #'queue.delete'{queue = Q})).
initial_cluster_size_two(Config) ->
- [Server1, _Server2, _Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
Q = ?config(queue_name, Config),
@@ -1140,6 +1144,184 @@ initial_cluster_size_two(Config) ->
?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch, #'queue.delete'{queue = Q})).
+leader_locator_client_local(Config) ->
+ [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])),
+
+ [Info] = lists:filter(
+ fun(Props) ->
+ lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props)
+ end,
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [name, leader]])),
+ ?assertEqual(Server1, proplists:get_value(leader, Info)),
+
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
+
+ %% Try second node
+ Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch2, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])),
+
+ [Info2] = lists:filter(
+ fun(Props) ->
+ lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props)
+ end,
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [name, leader]])),
+ ?assertEqual(Server2, proplists:get_value(leader, Info2)),
+
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch2, #'queue.delete'{queue = Q})),
+
+ %% Try third node
+ Ch3 = rabbit_ct_client_helpers:open_channel(Config, Server3),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch3, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])),
+
+ [Info3] = lists:filter(
+ fun(Props) ->
+ lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props)
+ end,
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [name, leader]])),
+ ?assertEqual(Server3, proplists:get_value(leader, Info3)),
+
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch3, #'queue.delete'{queue = Q})).
+
+leader_locator_random(Config) ->
+ [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-queue-leader-locator">>, longstr, <<"random">>}])),
+
+ [Info] = lists:filter(
+ fun(Props) ->
+ lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props)
+ end,
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [name, leader]])),
+ Leader = proplists:get_value(leader, Info),
+
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
+
+ repeat_until(
+ fun() ->
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-queue-leader-locator">>, longstr, <<"random">>}])),
+
+ [Info2] = lists:filter(
+ fun(Props) ->
+ lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props)
+ end,
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [name, leader]])),
+ Leader2 = proplists:get_value(leader, Info2),
+
+ Leader =/= Leader2
+ end, 10).
+
+leader_locator_least_leaders(Config) ->
+ [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ Q = ?config(queue_name, Config),
+
+ Q1 = <<"q1">>,
+ Q2 = <<"q2">>,
+ ?assertEqual({'queue.declare_ok', Q1, 0, 0},
+ declare(Ch, Q1, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])),
+ ?assertEqual({'queue.declare_ok', Q2, 0, 0},
+ declare(Ch, Q2, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-queue-leader-locator">>, longstr, <<"least-leaders">>}])),
+
+ [Info] = lists:filter(
+ fun(Props) ->
+ lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props)
+ end,
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [name, leader]])),
+ Leader = proplists:get_value(leader, Info),
+
+ ?assert(lists:member(Leader, [Server2, Server3])).
+
+leader_locator_policy(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+
+ ok = rabbit_ct_broker_helpers:set_policy(
+ Config, 0, <<"leader-locator">>, <<"leader_locator_.*">>, <<"queues">>,
+ [{<<"queue-leader-locator">>, <<"random">>}]),
+
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [policy, operator_policy,
+ effective_policy_definition,
+ name, leader]]),
+
+ ?assertEqual(<<"leader-locator">>, proplists:get_value(policy, Info)),
+ ?assertEqual('', proplists:get_value(operator_policy, Info)),
+ ?assertEqual([{<<"queue-leader-locator">>, <<"random">>}],
+ proplists:get_value(effective_policy_definition, Info)),
+
+ Leader = proplists:get_value(leader, Info),
+
+ repeat_until(
+ fun() ->
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ [Info2] = lists:filter(
+ fun(Props) ->
+ lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props)
+ end,
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [name, leader]])),
+ Leader2 = proplists:get_value(leader, Info2),
+ Leader =/= Leader2
+ end, 10),
+
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"leader-locator">>).
+
+repeat_until(_, 0) ->
+ ct:fail("Condition did not materialize in the expected amount of attempts");
+repeat_until(Fun, N) ->
+ case Fun() of
+ true -> ok;
+ false -> repeat_until(Fun, N - 1)
+ end.
+
invalid_policy(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),