summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_quorum_queue.erl109
-rw-r--r--test/quorum_queue_SUITE.erl44
2 files changed, 152 insertions, 1 deletions
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index b4118b96ac..4fe9c4cbf3 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -39,6 +39,7 @@
-export([cleanup_data_dir/0]).
-export([shrink_all/1,
grow/4]).
+-export([rebalance/2]).
%%-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit.hrl").
@@ -847,6 +848,114 @@ grow(Node, VhostSpec, QueueSpec, Strategy) ->
is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ].
+-spec rebalance(binary(), binary()) -> {ok, [{node(), pos_integer()}]}.
+rebalance(VhostSpec, QueueSpec) ->
+ Running = rabbit_mnesia:cluster_nodes(running),
+ NumRunning = length(Running),
+ ToRebalance = [Q || Q <- rabbit_amqqueue:list(),
+ amqqueue:get_type(Q) == ?MODULE,
+ is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
+ is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)],
+ NumToRebalance = length(ToRebalance),
+ ByNode = group_by_node(ToRebalance),
+ Rem = case (NumToRebalance rem NumRunning) of
+ 0 -> 0;
+ _ -> 1
+ end,
+ MaxQueuesDesired = (NumToRebalance div NumRunning) + Rem,
+ iterative_rebalance(ByNode, MaxQueuesDesired).
+
+iterative_rebalance(ByNode, MaxQueuesDesired) ->
+ case maybe_migrate(ByNode, MaxQueuesDesired) of
+ {ok, Summary} ->
+ rabbit_log:warning("Nothing to do, all balanced"),
+ {ok, Summary};
+ {migrated, Other} ->
+ iterative_rebalance(Other, MaxQueuesDesired);
+ {not_migrated, Other} ->
+ iterative_rebalance(Other, MaxQueuesDesired)
+ end.
+
+maybe_migrate(ByNode, MaxQueuesDesired) ->
+ maybe_migrate(ByNode, MaxQueuesDesired, maps:keys(ByNode)).
+
+maybe_migrate(ByNode, _, []) ->
+ {ok, maps:fold(fun(K, V, Acc) ->
+ [{K, length(V)} | Acc]
+ end, [], ByNode)};
+maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) ->
+ case maps:get(N, ByNode, []) of
+ [{_, Q, false} = Queue | Queues] = All when length(All) > MaxQueuesDesired ->
+ {RaName, _} = Pid = amqqueue:get_pid(Q),
+ Name = amqqueue:get_name(Q),
+ Members = get_nodes(Q) -- [N],
+ case Members of
+ [] ->
+ {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)};
+ _ ->
+ [{Length, Destination} | _] = sort_by_number_of_queues(Members, ByNode),
+ rabbit_log:warning("Migrating quorum queue ~p from node ~p with ~p queues to node ~p with ~p queues",
+ [Name, N, length(All), Destination, Length]),
+ case ra:transfer_leadership(Pid, {RaName, Destination}) of
+ ok ->
+ {_, _, {_, NewNode}} = ra:members(Pid),
+ rabbit_log:warning("Quorum queue ~p migrated to ~p", [Name, NewNode]),
+ {migrated, update_migrated_queue(NewNode, N, Queue, Queues, ByNode)};
+ already_leader ->
+ rabbit_log:warning("Quorum queue ~p in ~p is already a leader",
+ [Name, Destination]),
+ {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)};
+ {error, Reason} ->
+ rabbit_log:warning("Error migrating quorum queue ~p: ~p", [Name, Reason]),
+ {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)};
+ {timeout, _} ->
+ %% TODO should we retry once?
+ rabbit_log:warning("Timeout migrating quorum queue ~p: ~p", [Name]),
+ {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)}
+ end
+ end;
+ [{_, _, true} | _] = All when length(All) > MaxQueuesDesired ->
+ rabbit_log:warning("Node ~p contains ~p queues, but all have already migrated. "
+ "Do nothing", [N, length(All)]),
+ maybe_migrate(ByNode, MaxQueuesDesired, Nodes);
+ All ->
+ rabbit_log:warning("Node ~p only contains ~p queues, do nothing",
+ [N, length(All)]),
+ maybe_migrate(ByNode, MaxQueuesDesired, Nodes)
+ end.
+
+update_not_migrated_queue(N, {Entries, Q, _}, Queues, ByNode) ->
+ maps:update(N, Queues ++ [{Entries, Q, true}], ByNode).
+
+update_migrated_queue(NewNode, OldNode, {Entries, Q, _}, Queues, ByNode) ->
+ maps:update_with(NewNode,
+ fun(L) -> L ++ [{Entries, Q, true}] end,
+ [{Entries, Q, true}], maps:update(OldNode, Queues, ByNode)).
+
+sort_by_number_of_queues(Nodes, ByNode) ->
+ lists:keysort(1,
+ lists:map(fun(Node) ->
+ {num_queues(Node, ByNode), Node}
+ end, Nodes)).
+
+num_queues(Node, ByNode) ->
+ length(maps:get(Node, ByNode, [])).
+
+group_by_node(Queues) ->
+ ByNode = lists:foldl(fun(Q, Acc) ->
+ maps:update_with(amqqueue:qnode(Q),
+ fun(L) -> [{log_entries(Q), Q, false} | L] end,
+ [{log_entries(Q), Q, false}], Acc)
+ end, #{}, Queues),
+ maps:map(fun(_K, V) -> lists:keysort(1, V) end, ByNode).
+
+log_entries(Q) ->
+ Name = amqqueue:get_name(Q),
+ case ets:lookup(ra_metrics, Name) of
+ [] -> 0;
+ [{_, _, SnapIdx, _, _, LastIdx, _}] -> LastIdx - SnapIdx
+ end.
+
get_resource_name(#resource{name = Name}) ->
Name.
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 7ef38895eb..f0bcc9668b 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -71,7 +71,8 @@ groups() ->
metrics_cleanup_on_leadership_takeover,
metrics_cleanup_on_leader_crash,
consume_in_minority,
- shrink_all
+ shrink_all,
+ rebalance
]},
{cluster_size_5, [], [start_queue,
start_queue_concurrent,
@@ -667,7 +668,48 @@ shrink_all(Config) ->
{_, {error, 1, last_node}}], Result2),
ok.
+rebalance(Config) ->
+ [Server0, Server1, Server2] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ Q1 = <<"q1">>,
+ Q2 = <<"q2">>,
+ Q3 = <<"q3">>,
+ Q4 = <<"q4">>,
+ Q5 = <<"q5">>,
+
+ ?assertEqual({'queue.declare_ok', Q1, 0, 0},
+ declare(Ch, Q1, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ {ok, _, {_, Leader1}} = ra:members({ra_name(Q1), Server0}),
+ publish(Ch, Q1),
+ publish(Ch, Q1),
+ publish(Ch, Q1),
+
+ ?assertEqual({'queue.declare_ok', Q2, 0, 0},
+ declare(Ch, Q2, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ {ok, _, {_, Leader2}} = ra:members({ra_name(Q2), Server0}),
+ publish(Ch, Q2),
+ publish(Ch, Q2),
+
+ ?assertEqual({'queue.declare_ok', Q3, 0, 0},
+ declare(Ch, Q3, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ ?assertEqual({'queue.declare_ok', Q4, 0, 0},
+ declare(Ch, Q4, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ ?assertEqual({'queue.declare_ok', Q5, 0, 0},
+ declare(Ch, Q5, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ timer:sleep(500),
+ {ok, Summary} = rpc:call(Server0, rabbit_quorum_queue, rebalance, [".*", ".*"]),
+
+ %% Q1 and Q2 should not have moved leader, as these are the queues with more
+ %% log entries and we allow up to two queues per node (3 nodes, 5 queues)
+ ?assertMatch({ok, _, {_, Leader1}}, ra:members({ra_name(Q1), Server0})),
+ ?assertMatch({ok, _, {_, Leader2}}, ra:members({ra_name(Q2), Server0})),
+
+ %% Check that we have at most 2 queues per node
+ ?assert(lists:all(fun({_, V}) -> V =< 2 end, Summary)),
+ ok.
subscribe_should_fail_when_global_qos_true(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),