summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuke Bakken <lbakken@pivotal.io>2020-03-05 19:52:43 +0000
committerLuke Bakken <lbakken@pivotal.io>2020-03-06 16:44:06 +0000
commit93179acf580a69eb3e4fd6f6f7412a6a8d85caa6 (patch)
treed995fc7fcaceca396a47793640638c676f3dbafd
parent4406d8eab7334f07d5caef9a61ca6412a54a6e4c (diff)
downloadrabbitmq-server-git-93179acf580a69eb3e4fd6f6f7412a6a8d85caa6.tar.gz
Add test that should fail
Add code to block multiple queue rebalance operations, fix test Allow acquiring the rebalance lock prior to calling rabbit_amqqueue:rebalance Simplify queue rebalance code to always acquire the lock using the current process
-rw-r--r--src/rabbit_amqqueue.erl35
-rw-r--r--test/dynamic_ha_SUITE.erl25
2 files changed, 54 insertions, 6 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 864bce83a0..6a5fd98775 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -514,9 +514,29 @@ not_found_or_absent_dirty(Name) ->
{ok, Q} -> {absent, Q, nodedown}
end.
+-spec get_rebalance_lock(pid()) ->
+ {true, {rebalance_queues, pid()}} | false.
+get_rebalance_lock(Pid) when is_pid(Pid) ->
+ Id = {rebalance_queues, Pid},
+ Nodes = [node()|nodes()],
+ %% Note that we're not re-trying. We want to immediately know
+ %% if a re-balance is taking place and stop accordingly.
+ case global:set_lock(Id, Nodes, 0) of
+ true ->
+ {true, Id};
+ false ->
+ false
+ end.
+
-spec rebalance('all' | 'quorum' | 'classic', binary(), binary()) ->
- {ok, [{node(), pos_integer()}]}.
+ {ok, [{node(), pos_integer()}]} | {error, term()}.
rebalance(Type, VhostSpec, QueueSpec) ->
+ %% We have not yet acquired the rebalance_queues global lock.
+ maybe_rebalance(get_rebalance_lock(self()), Type, VhostSpec, QueueSpec).
+
+maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
+ rabbit_log:info("Starting queue rebalance operation: '~s' for vhosts matching '~s' and queues matching '~s'",
+ [Type, VhostSpec, QueueSpec]),
Running = rabbit_mnesia:cluster_nodes(running),
NumRunning = length(Running),
ToRebalance = [Q || Q <- rabbit_amqqueue:list(),
@@ -527,11 +547,16 @@ rebalance(Type, VhostSpec, QueueSpec) ->
NumToRebalance = length(ToRebalance),
ByNode = group_by_node(ToRebalance),
Rem = case (NumToRebalance rem NumRunning) of
- 0 -> 0;
- _ -> 1
- end,
+ 0 -> 0;
+ _ -> 1
+ end,
MaxQueuesDesired = (NumToRebalance div NumRunning) + Rem,
- iterative_rebalance(ByNode, MaxQueuesDesired).
+ Result = iterative_rebalance(ByNode, MaxQueuesDesired),
+ global:del_lock(Id),
+ Result;
+maybe_rebalance(false, _Type, _VhostSpec, _QueueSpec) ->
+ rabbit_log:warning("Queue rebalance operation is in progress, please wait."),
+ {error, rebalance_in_progress}.
filter_per_type(all, _) ->
true;
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl
index 16a6dbd59a..1fc19a98f5 100644
--- a/test/dynamic_ha_SUITE.erl
+++ b/test/dynamic_ha_SUITE.erl
@@ -74,7 +74,8 @@ groups() ->
queue_survive_adding_dead_vhost_mirror,
rebalance_all,
rebalance_exactly,
- rebalance_nodes
+ rebalance_nodes,
+ rebalance_multiple_blocked
% FIXME: Re-enable those tests when the know issues are
% fixed.
% failing_random_policies,
@@ -691,6 +692,28 @@ rebalance_nodes(Config) ->
ok.
+rebalance_multiple_blocked(Config) ->
+ [A, _, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ ACh = rabbit_ct_client_helpers:open_channel(Config, A),
+ Q1 = <<"q1">>,
+ Q2 = <<"q2">>,
+ Q3 = <<"q3">>,
+ Q4 = <<"q4">>,
+ Q5 = <<"q5">>,
+ amqp_channel:call(ACh, #'queue.declare'{queue = Q1}),
+ amqp_channel:call(ACh, #'queue.declare'{queue = Q2}),
+ amqp_channel:call(ACh, #'queue.declare'{queue = Q3}),
+ amqp_channel:call(ACh, #'queue.declare'{queue = Q4}),
+ amqp_channel:call(ACh, #'queue.declare'{queue = Q5}),
+ ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q1, A)))),
+ ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q2, A)))),
+ ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q3, A)))),
+ ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q4, A)))),
+ ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q5, A)))),
+ true = rpc:cast(A, rabbit_amqqueue, rebalance, [classic, ".*", ".*"]),
+ {error, rebalance_in_progress} = rpc:call(A, rabbit_amqqueue, rebalance, [classic, ".*", ".*"]),
+ ok.
+
%%----------------------------------------------------------------------------
assert_slaves(RPCNode, QName, Exp) ->