summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-19 09:46:49 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-19 09:46:49 +0200
commite8b343b3e6ef5e95532117cbad97f21931e328c8 (patch)
tree5ffd029a3e29cc0c187f1239b6de355190ad3d6a
parent8d8228ba96ef4f84b346a94d04f669a2a5242933 (diff)
downloadrabbitmq-server-git-e8b343b3e6ef5e95532117cbad97f21931e328c8.tar.gz
Use stream queue name hash to choose leader
-rw-r--r--src/rabbit_stream_coordinator.erl19
-rw-r--r--test/rabbit_stream_queue_SUITE.erl18
2 files changed, 22 insertions, 15 deletions
diff --git a/src/rabbit_stream_coordinator.erl b/src/rabbit_stream_coordinator.erl
index 085acda897..7b48d778d0 100644
--- a/src/rabbit_stream_coordinator.erl
+++ b/src/rabbit_stream_coordinator.erl
@@ -13,6 +13,7 @@
%% Copyright (c) 2012-2020 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(rabbit_stream_coordinator).
+-include("rabbit.hrl").
-behaviour(ra_machine).
@@ -191,11 +192,9 @@ apply(#{from := From}, {policy_changed, #{stream_id := StreamId}} = Cmd,
{State#?MODULE{streams = Streams}, '$ra_no_reply', []}
end;
-apply(#{from := From,
- index := RaftIdx}, {start_cluster, #{queue := Q}}, #?MODULE{streams = Streams} = State) ->
- #{name := StreamId,
- leader_locator_strategy := LeaderLocatorStrategy} = Conf0 = amqqueue:get_type_state(Q),
- Conf = apply_leader_locator_strategy(Conf0, RaftIdx, Streams),
+apply(#{from := From}, {start_cluster, #{queue := Q}}, #?MODULE{streams = Streams} = State) ->
+ #{name := StreamId} = Conf0 = amqqueue:get_type_state(Q),
+ Conf = apply_leader_locator_strategy(Conf0, Streams),
case maps:is_key(StreamId, Streams) of
true ->
{State, '$ra_no_reply', wrap_reply(From, {error, already_started})};
@@ -908,14 +907,16 @@ add_unique(Node, Nodes) ->
delete_replica_pid(Node, ReplicaPids) ->
lists:partition(fun(P) -> node(P) =/= Node end, ReplicaPids).
-apply_leader_locator_strategy(#{leader_locator_strategy := <<"client-local">>} = Conf, _, _) ->
+apply_leader_locator_strategy(#{leader_locator_strategy := <<"client-local">>} = Conf, _) ->
Conf;
apply_leader_locator_strategy(#{leader_node := Leader,
replica_nodes := Replicas0,
- leader_locator_strategy := <<"random">>} = Conf, Idx, _) ->
+ leader_locator_strategy := <<"random">>,
+ reference := #resource{name = Name}} = Conf, _) ->
Replicas = [Leader | Replicas0],
ClusterSize = length(Replicas),
- Pos = (Idx rem ClusterSize) + 1,
+ Hash = erlang:phash2(Name),
+ Pos = (Hash rem ClusterSize) + 1,
NewLeader = lists:nth(Pos, Replicas),
NewReplicas = lists:delete(NewLeader, Replicas),
Conf#{leader_node => NewLeader,
@@ -923,7 +924,7 @@ apply_leader_locator_strategy(#{leader_node := Leader,
apply_leader_locator_strategy(#{leader_node := Leader,
replica_nodes := Replicas0,
leader_locator_strategy := <<"least-leaders">>} = Conf,
- _, Streams) ->
+ Streams) ->
Replicas = [Leader | Replicas0],
Counters0 = maps:from_list([{R, 0} || R <- Replicas]),
Counters = maps:to_list(maps:fold(fun(_Key, #{conf := #{leader_node := L}}, Acc) ->
diff --git a/test/rabbit_stream_queue_SUITE.erl b/test/rabbit_stream_queue_SUITE.erl
index ff4b58b035..cb7de46268 100644
--- a/test/rabbit_stream_queue_SUITE.erl
+++ b/test/rabbit_stream_queue_SUITE.erl
@@ -1262,22 +1262,28 @@ leader_locator_random(Config) ->
info_all, [<<"/">>, [name, leader]])),
Leader = proplists:get_value(leader, Info),
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
+
repeat_until(
fun() ->
- ?assertMatch(#'queue.delete_ok'{},
- amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
+ QName = base64:encode(crypto:strong_rand_bytes(20)),
- ?assertEqual({'queue.declare_ok', Q, 0, 0},
- declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
- {<<"x-queue-leader-locator">>, longstr, <<"random">>}])),
+ ?assertEqual({'queue.declare_ok', QName, 0, 0},
+ declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-queue-leader-locator">>, longstr, <<"random">>}])),
[Info2] = lists:filter(
fun(Props) ->
- lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props)
+ lists:member({name, rabbit_misc:r(<<"/">>, queue, QName)}, Props)
end,
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
info_all, [<<"/">>, [name, leader]])),
Leader2 = proplists:get_value(leader, Info2),
+
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch, #'queue.delete'{queue = QName})),
+
Leader =/= Leader2
end, 10).