diff options
author | dcorbacho <dparracorbacho@piotal.io> | 2020-10-15 15:16:00 +0100 |
---|---|---|
committer | dcorbacho <dparracorbacho@piotal.io> | 2020-10-15 15:16:00 +0100 |
commit | 8d8228ba96ef4f84b346a94d04f669a2a5242933 (patch) | |
tree | 0b3748e70c72ec2b735dc32c5806928cf8fbb7ac | |
parent | f3ab0a736685f3bac767bb40001bfb85cce81a9f (diff) | |
download | rabbitmq-server-git-8d8228ba96ef4f84b346a94d04f669a2a5242933.tar.gz |
Stream queue-leader-locator argument and policy
Options: client-local, random, least-leaders
-rw-r--r-- | src/rabbit_amqqueue.erl | 13 | ||||
-rw-r--r-- | src/rabbit_policies.erl | 10 | ||||
-rw-r--r-- | src/rabbit_stream_coordinator.erl | 47 | ||||
-rw-r--r-- | src/rabbit_stream_queue.erl | 14 | ||||
-rw-r--r-- | test/rabbit_stream_queue_SUITE.erl | 192 |
5 files changed, 266 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 82b261045d..deead84971 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -783,7 +783,8 @@ declare_args() -> {<<"x-quorum-initial-group-size">>, fun check_initial_cluster_size_arg/2}, {<<"x-max-age">>, fun check_max_age_arg/2}, {<<"x-max-segment-size">>, fun check_non_neg_int_arg/2}, - {<<"x-initial-cluster-size">>, fun check_initial_cluster_size_arg/2}]. + {<<"x-initial-cluster-size">>, fun check_initial_cluster_size_arg/2}, + {<<"x-queue-leader-locator">>, fun check_queue_leader_locator_arg/2}]. consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}, {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}]. @@ -902,6 +903,16 @@ check_overflow({longstr, Val}, _Args) -> check_overflow({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. +check_queue_leader_locator_arg({longstr, Val}, _Args) -> + case lists:member(Val, [<<"client-local">>, + <<"random">>, + <<"least-leaders">>]) of + true -> ok; + false -> {error, invalid_queue_locator_arg} + end; +check_queue_leader_locator_arg({Type, _}, _Args) -> + {error, {unacceptable_type, Type}}. + check_queue_mode({longstr, Val}, _Args) -> case lists:member(Val, [<<"default">>, <<"lazy">>]) of true -> ok; diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index 0dd56a8ccd..f2c44e2289 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -41,6 +41,7 @@ register() -> {policy_validator, <<"delivery-limit">>}, {policy_validator, <<"max-age">>}, {policy_validator, <<"max-segment-size">>}, + {policy_validator, <<"queue-leader-locator">>}, {operator_policy_validator, <<"expires">>}, {operator_policy_validator, <<"message-ttl">>}, {operator_policy_validator, <<"max-length">>}, @@ -147,6 +148,15 @@ validate_policy0(<<"max-age">>, Value) -> ok end; +validate_policy0(<<"queue-leader-locator">>, <<"client-local">>) -> + ok; +validate_policy0(<<"queue-leader-locator">>, <<"random">>) -> + ok; +validate_policy0(<<"queue-leader-locator">>, <<"least-leaders">>) -> + ok; +validate_policy0(<<"queue-leader-locator">>, Value) -> + {error, "~p is not a valid queue leader locator value", [Value]}; + validate_policy0(<<"max-segment-size">>, Value) when is_integer(Value), Value >= 0 -> ok; diff --git a/src/rabbit_stream_coordinator.erl b/src/rabbit_stream_coordinator.erl index 472cf5d70f..085acda897 100644 --- a/src/rabbit_stream_coordinator.erl +++ b/src/rabbit_stream_coordinator.erl @@ -191,14 +191,17 @@ apply(#{from := From}, {policy_changed, #{stream_id := StreamId}} = Cmd, {State#?MODULE{streams = Streams}, '$ra_no_reply', []} end; -apply(#{from := From}, {start_cluster, #{queue := Q}}, #?MODULE{streams = Streams} = State) -> - #{name := StreamId} = Conf = amqqueue:get_type_state(Q), +apply(#{from := From, + index := RaftIdx}, {start_cluster, #{queue := Q}}, #?MODULE{streams = Streams} = State) -> + #{name := StreamId, + leader_locator_strategy := LeaderLocatorStrategy} = Conf0 = amqqueue:get_type_state(Q), + Conf = apply_leader_locator_strategy(Conf0, RaftIdx, Streams), case maps:is_key(StreamId, Streams) of true -> {State, '$ra_no_reply', wrap_reply(From, {error, already_started})}; false -> Phase = phase_start_cluster, - PhaseArgs = [Q], + PhaseArgs = [amqqueue:set_type_state(Q, Conf)], SState = #{state => start_cluster, phase => Phase, phase_args => PhaseArgs, @@ -904,3 +907,41 @@ add_unique(Node, Nodes) -> delete_replica_pid(Node, ReplicaPids) -> lists:partition(fun(P) -> node(P) =/= Node end, ReplicaPids). + +apply_leader_locator_strategy(#{leader_locator_strategy := <<"client-local">>} = Conf, _, _) -> + Conf; +apply_leader_locator_strategy(#{leader_node := Leader, + replica_nodes := Replicas0, + leader_locator_strategy := <<"random">>} = Conf, Idx, _) -> + Replicas = [Leader | Replicas0], + ClusterSize = length(Replicas), + Pos = (Idx rem ClusterSize) + 1, + NewLeader = lists:nth(Pos, Replicas), + NewReplicas = lists:delete(NewLeader, Replicas), + Conf#{leader_node => NewLeader, + replica_nodes => NewReplicas}; +apply_leader_locator_strategy(#{leader_node := Leader, + replica_nodes := Replicas0, + leader_locator_strategy := <<"least-leaders">>} = Conf, + _, Streams) -> + Replicas = [Leader | Replicas0], + Counters0 = maps:from_list([{R, 0} || R <- Replicas]), + Counters = maps:to_list(maps:fold(fun(_Key, #{conf := #{leader_node := L}}, Acc) -> + maps:update_with(L, fun(V) -> V + 1 end, 0, Acc) + end, Counters0, Streams)), + Ordered = lists:sort(fun({_, V1}, {_, V2}) -> + V1 =< V2 + end, Counters), + %% We could have potentially introduced nodes that are not in the list of replicas if + %% initial cluster size is smaller than the cluster size. Let's select the first one + %% that is on the list of replicas + NewLeader = select_first_matching_node(Ordered, Replicas), + NewReplicas = lists:delete(NewLeader, Replicas), + Conf#{leader_node => NewLeader, + replica_nodes => NewReplicas}. + +select_first_matching_node([{N, _} | Rest], Replicas) -> + case lists:member(N, Replicas) of + true -> N; + false -> select_first_matching_node(Rest, Replicas) + end. diff --git a/src/rabbit_stream_queue.erl b/src/rabbit_stream_queue.erl index 7dd9ac27f6..3af56da9eb 100644 --- a/src/rabbit_stream_queue.erl +++ b/src/rabbit_stream_queue.erl @@ -495,6 +495,8 @@ make_stream_conf(Node, Q) -> MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q), MaxAge = max_age(args_policy_lookup(<<"max-age">>, fun max_age/2, Q)), MaxSegmentSize = args_policy_lookup(<<"max-segment-size">>, fun min/2, Q), + LeaderLocator = queue_leader_locator(args_policy_lookup(<<"queue-leader-locator">>, + fun res_arg/2, Q)), Replicas0 = rabbit_mnesia:cluster_nodes(all) -- [Node], Arguments = amqqueue:get_arguments(Q), Replicas = select_stream_nodes(get_initial_cluster_size(Arguments) - 1, Replicas0), @@ -506,6 +508,7 @@ make_stream_conf(Node, Q) -> add_if_defined(max_segment_size, MaxSegmentSize, #{reference => QName, name => Name, retention => Retention, + leader_locator_strategy => LeaderLocator, leader_node => Node, replica_nodes => Replicas, event_formatter => Formatter, @@ -561,6 +564,12 @@ max_age(Age) -> max_age(Age1, Age2) -> min(rabbit_amqqueue:check_max_age(Age1), rabbit_amqqueue:check_max_age(Age2)). +queue_leader_locator(undefined) -> <<"client-local">>; +queue_leader_locator(Val) -> Val. + +res_arg(PolVal, undefined) -> PolVal; +res_arg(_, ArgVal) -> ArgVal. + queue_name(#resource{virtual_host = VHost, name = Name}) -> Timestamp = erlang:integer_to_binary(erlang:system_time()), osiris_util:to_base64uri(erlang:binary_to_list(<<VHost/binary, "_", Name/binary, "_", @@ -669,12 +678,13 @@ msg_to_iodata(#basic_message{exchange_name = #resource{name = Exchange}, rabbit_msg_record:to_iodata(R). capabilities() -> - #{policies => [<<"max-length-bytes">>, <<"max-age">>, <<"max-segment-size">>], + #{policies => [<<"max-length-bytes">>, <<"max-age">>, <<"max-segment-size">>, + <<"queue-leader-locator">>], queue_arguments => [<<"x-dead-letter-exchange">>, <<"x-dead-letter-routing-key">>, <<"x-max-length">>, <<"x-max-length-bytes">>, <<"x-single-active-consumer">>, <<"x-queue-type">>, <<"x-max-age">>, <<"x-max-segment-size">>, - <<"x-initial-cluster-size">>], + <<"x-initial-cluster-size">>, <<"x-queue-leader-locator">>], consumer_arguments => [<<"x-stream-offset">>], server_named => false}. diff --git a/test/rabbit_stream_queue_SUITE.erl b/test/rabbit_stream_queue_SUITE.erl index 8b8966b049..ff4b58b035 100644 --- a/test/rabbit_stream_queue_SUITE.erl +++ b/test/rabbit_stream_queue_SUITE.erl @@ -48,7 +48,11 @@ groups() -> consume_from_replica, leader_failover, initial_cluster_size_one, - initial_cluster_size_two]}, + initial_cluster_size_two, + leader_locator_client_local, + leader_locator_random, + leader_locator_least_leaders, + leader_locator_policy]}, {unclustered_size_3_1, [], [add_replica]}, {unclustered_size_3_2, [], [consume_without_local_replica]}, {unclustered_size_3_3, [], [grow_coordinator_cluster]}, @@ -264,7 +268,13 @@ declare_invalid_args(Config) -> {{shutdown, {server_initiated_close, 406, _}}, _}, declare(rabbit_ct_client_helpers:open_channel(Config, Server), Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-quorum-initial-group-size">>, longstr, <<"hop">>}])). + {<<"x-quorum-initial-group-size">>, longstr, <<"hop">>}])), + + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"hop">>}])). declare_server_named(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), @@ -1144,7 +1154,7 @@ leader_failover(Config) -> ok = rabbit_ct_broker_helpers:start_node(Config, Server1). initial_cluster_size_one(Config) -> - [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), Q = ?config(queue_name, Config), @@ -1158,7 +1168,7 @@ initial_cluster_size_one(Config) -> amqp_channel:call(Ch, #'queue.delete'{queue = Q})). initial_cluster_size_two(Config) -> - [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), Q = ?config(queue_name, Config), @@ -1179,6 +1189,180 @@ initial_cluster_size_two(Config) -> ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch, #'queue.delete'{queue = Q})). +leader_locator_client_local(Config) -> + [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])), + + [Info] = lists:filter( + fun(Props) -> + lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props) + end, + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, + info_all, [<<"/">>, [name, leader]])), + ?assertEqual(Server1, proplists:get_value(leader, Info)), + + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})), + + %% Try second node + Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch2, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])), + + [Info2] = lists:filter( + fun(Props) -> + lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props) + end, + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, + info_all, [<<"/">>, [name, leader]])), + ?assertEqual(Server2, proplists:get_value(leader, Info2)), + + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch2, #'queue.delete'{queue = Q})), + + %% Try third node + Ch3 = rabbit_ct_client_helpers:open_channel(Config, Server3), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch3, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])), + + [Info3] = lists:filter( + fun(Props) -> + lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props) + end, + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, + info_all, [<<"/">>, [name, leader]])), + ?assertEqual(Server3, proplists:get_value(leader, Info3)), + + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch3, #'queue.delete'{queue = Q})). + +leader_locator_random(Config) -> + [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"random">>}])), + + [Info] = lists:filter( + fun(Props) -> + lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props) + end, + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, + info_all, [<<"/">>, [name, leader]])), + Leader = proplists:get_value(leader, Info), + + repeat_until( + fun() -> + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"random">>}])), + + [Info2] = lists:filter( + fun(Props) -> + lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props) + end, + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, + info_all, [<<"/">>, [name, leader]])), + Leader2 = proplists:get_value(leader, Info2), + Leader =/= Leader2 + end, 10). + +leader_locator_least_leaders(Config) -> + [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), + Q = ?config(queue_name, Config), + + Q1 = <<"q1">>, + Q2 = <<"q2">>, + ?assertEqual({'queue.declare_ok', Q1, 0, 0}, + declare(Ch, Q1, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])), + ?assertEqual({'queue.declare_ok', Q2, 0, 0}, + declare(Ch, Q2, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"least-leaders">>}])), + + [Info] = lists:filter( + fun(Props) -> + lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props) + end, + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, + info_all, [<<"/">>, [name, leader]])), + Leader = proplists:get_value(leader, Info), + + ?assert(lists:member(Leader, [Server2, Server3])). + +leader_locator_policy(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + + ok = rabbit_ct_broker_helpers:set_policy( + Config, 0, <<"leader-locator">>, <<"leader_locator_.*">>, <<"queues">>, + [{<<"queue-leader-locator">>, <<"random">>}]), + + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, + info_all, [<<"/">>, [policy, operator_policy, + effective_policy_definition, + name, leader]]), + + ?assertEqual(<<"leader-locator">>, proplists:get_value(policy, Info)), + ?assertEqual('', proplists:get_value(operator_policy, Info)), + ?assertEqual([{<<"queue-leader-locator">>, <<"random">>}], + proplists:get_value(effective_policy_definition, Info)), + + Leader = proplists:get_value(leader, Info), + + repeat_until( + fun() -> + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + [Info2] = lists:filter( + fun(Props) -> + lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props) + end, + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, + info_all, [<<"/">>, [name, leader]])), + Leader2 = proplists:get_value(leader, Info2), + Leader =/= Leader2 + end, 10), + + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"leader-locator">>). + +repeat_until(_, 0) -> + ct:fail("Condition did not materialize in the expected amount of attempts"); +repeat_until(Fun, N) -> + case Fun() of + true -> ok; + false -> repeat_until(Fun, N - 1) + end. + invalid_policy(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |