summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-19 10:58:14 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-19 10:58:14 +0200
commitdb5d3f8fa03ea6ed5c40c3d69f415cd875b7a89e (patch)
treecff0c2b015c70fea81e310a1968bbb61552b4f49
parente8b343b3e6ef5e95532117cbad97f21931e328c8 (diff)
downloadrabbitmq-server-git-db5d3f8fa03ea6ed5c40c3d69f415cd875b7a89e.tar.gz
Use stream ID hash to pick "random" leaderstream-queue-leader-locator-with-hash
-rw-r--r--src/rabbit_stream_coordinator.erl5
-rw-r--r--test/rabbit_stream_queue_SUITE.erl12
2 files changed, 7 insertions, 10 deletions
diff --git a/src/rabbit_stream_coordinator.erl b/src/rabbit_stream_coordinator.erl
index 7b48d778d0..8a1205b9af 100644
--- a/src/rabbit_stream_coordinator.erl
+++ b/src/rabbit_stream_coordinator.erl
@@ -13,7 +13,6 @@
%% Copyright (c) 2012-2020 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(rabbit_stream_coordinator).
--include("rabbit.hrl").
-behaviour(ra_machine).
@@ -912,10 +911,10 @@ apply_leader_locator_strategy(#{leader_locator_strategy := <<"client-local">>} =
apply_leader_locator_strategy(#{leader_node := Leader,
replica_nodes := Replicas0,
leader_locator_strategy := <<"random">>,
- reference := #resource{name = Name}} = Conf, _) ->
+ name := StreamId} = Conf, _) ->
Replicas = [Leader | Replicas0],
ClusterSize = length(Replicas),
- Hash = erlang:phash2(Name),
+ Hash = erlang:phash2(StreamId),
Pos = (Hash rem ClusterSize) + 1,
NewLeader = lists:nth(Pos, Replicas),
NewReplicas = lists:delete(NewLeader, Replicas),
diff --git a/test/rabbit_stream_queue_SUITE.erl b/test/rabbit_stream_queue_SUITE.erl
index cb7de46268..d6275eae17 100644
--- a/test/rabbit_stream_queue_SUITE.erl
+++ b/test/rabbit_stream_queue_SUITE.erl
@@ -1267,23 +1267,21 @@ leader_locator_random(Config) ->
repeat_until(
fun() ->
- QName = base64:encode(crypto:strong_rand_bytes(20)),
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
- ?assertEqual({'queue.declare_ok', QName, 0, 0},
- declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-queue-leader-locator">>, longstr, <<"random">>}])),
[Info2] = lists:filter(
fun(Props) ->
- lists:member({name, rabbit_misc:r(<<"/">>, queue, QName)}, Props)
+ lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props)
end,
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
info_all, [<<"/">>, [name, leader]])),
Leader2 = proplists:get_value(leader, Info2),
- ?assertMatch(#'queue.delete_ok'{},
- amqp_channel:call(Ch, #'queue.delete'{queue = QName})),
-
Leader =/= Leader2
end, 10).