diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-19 09:46:49 +0200 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-19 09:46:49 +0200 |
commit | e8b343b3e6ef5e95532117cbad97f21931e328c8 (patch) | |
tree | 5ffd029a3e29cc0c187f1239b6de355190ad3d6a | |
parent | 8d8228ba96ef4f84b346a94d04f669a2a5242933 (diff) | |
download | rabbitmq-server-git-e8b343b3e6ef5e95532117cbad97f21931e328c8.tar.gz |
Use stream queue name hash to choose leader
-rw-r--r-- | src/rabbit_stream_coordinator.erl | 19 | ||||
-rw-r--r-- | test/rabbit_stream_queue_SUITE.erl | 18 |
2 files changed, 22 insertions, 15 deletions
diff --git a/src/rabbit_stream_coordinator.erl b/src/rabbit_stream_coordinator.erl index 085acda897..7b48d778d0 100644 --- a/src/rabbit_stream_coordinator.erl +++ b/src/rabbit_stream_coordinator.erl @@ -13,6 +13,7 @@ %% Copyright (c) 2012-2020 VMware, Inc. or its affiliates. All rights reserved. %% -module(rabbit_stream_coordinator). +-include("rabbit.hrl"). -behaviour(ra_machine). @@ -191,11 +192,9 @@ apply(#{from := From}, {policy_changed, #{stream_id := StreamId}} = Cmd, {State#?MODULE{streams = Streams}, '$ra_no_reply', []} end; -apply(#{from := From, - index := RaftIdx}, {start_cluster, #{queue := Q}}, #?MODULE{streams = Streams} = State) -> - #{name := StreamId, - leader_locator_strategy := LeaderLocatorStrategy} = Conf0 = amqqueue:get_type_state(Q), - Conf = apply_leader_locator_strategy(Conf0, RaftIdx, Streams), +apply(#{from := From}, {start_cluster, #{queue := Q}}, #?MODULE{streams = Streams} = State) -> + #{name := StreamId} = Conf0 = amqqueue:get_type_state(Q), + Conf = apply_leader_locator_strategy(Conf0, Streams), case maps:is_key(StreamId, Streams) of true -> {State, '$ra_no_reply', wrap_reply(From, {error, already_started})}; @@ -908,14 +907,16 @@ add_unique(Node, Nodes) -> delete_replica_pid(Node, ReplicaPids) -> lists:partition(fun(P) -> node(P) =/= Node end, ReplicaPids). -apply_leader_locator_strategy(#{leader_locator_strategy := <<"client-local">>} = Conf, _, _) -> +apply_leader_locator_strategy(#{leader_locator_strategy := <<"client-local">>} = Conf, _) -> Conf; apply_leader_locator_strategy(#{leader_node := Leader, replica_nodes := Replicas0, - leader_locator_strategy := <<"random">>} = Conf, Idx, _) -> + leader_locator_strategy := <<"random">>, + reference := #resource{name = Name}} = Conf, _) -> Replicas = [Leader | Replicas0], ClusterSize = length(Replicas), - Pos = (Idx rem ClusterSize) + 1, + Hash = erlang:phash2(Name), + Pos = (Hash rem ClusterSize) + 1, NewLeader = lists:nth(Pos, Replicas), NewReplicas = lists:delete(NewLeader, Replicas), Conf#{leader_node => NewLeader, @@ -923,7 +924,7 @@ apply_leader_locator_strategy(#{leader_node := Leader, apply_leader_locator_strategy(#{leader_node := Leader, replica_nodes := Replicas0, leader_locator_strategy := <<"least-leaders">>} = Conf, - _, Streams) -> + Streams) -> Replicas = [Leader | Replicas0], Counters0 = maps:from_list([{R, 0} || R <- Replicas]), Counters = maps:to_list(maps:fold(fun(_Key, #{conf := #{leader_node := L}}, Acc) -> diff --git a/test/rabbit_stream_queue_SUITE.erl b/test/rabbit_stream_queue_SUITE.erl index ff4b58b035..cb7de46268 100644 --- a/test/rabbit_stream_queue_SUITE.erl +++ b/test/rabbit_stream_queue_SUITE.erl @@ -1262,22 +1262,28 @@ leader_locator_random(Config) -> info_all, [<<"/">>, [name, leader]])), Leader = proplists:get_value(leader, Info), + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})), + repeat_until( fun() -> - ?assertMatch(#'queue.delete_ok'{}, - amqp_channel:call(Ch, #'queue.delete'{queue = Q})), + QName = base64:encode(crypto:strong_rand_bytes(20)), - ?assertEqual({'queue.declare_ok', Q, 0, 0}, - declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-queue-leader-locator">>, longstr, <<"random">>}])), + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"random">>}])), [Info2] = lists:filter( fun(Props) -> - lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props) + lists:member({name, rabbit_misc:r(<<"/">>, queue, QName)}, Props) end, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, info_all, [<<"/">>, [name, leader]])), Leader2 = proplists:get_value(leader, Info2), + + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch, #'queue.delete'{queue = QName})), + Leader =/= Leader2 end, 10). |