diff options
author | Michael Klishin <mklishin@pivotal.io> | 2020-03-07 21:31:01 +0300 |
---|---|---|
committer | Michael Klishin <michael@clojurewerkz.org> | 2020-03-07 21:32:13 +0300 |
commit | 486f3adb6bab6ec19c312828c2746ffd69c07a4f (patch) | |
tree | c94e5733814d497753b6c6eaf4672ab88843cb12 | |
parent | d3ca25bfec0e307b19167bac5f27824b95cbbbbc (diff) | |
download | rabbitmq-server-git-486f3adb6bab6ec19c312828c2746ffd69c07a4f.tar.gz |
Merge pull request #2268 from rabbitmq/rabbitmq-management-782 (tag: v3.8.3)
Allow only one rebalance operation to happen at a time
(cherry picked from commit f05114e8651ac1aebe5841d2d71bd64744391119)
-rw-r--r-- | src/rabbit_amqqueue.erl | 36 | ||||
-rw-r--r-- | test/dynamic_ha_SUITE.erl | 25 |
2 files changed, 55 insertions, 6 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 864bce83a0..f81c36829b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -514,9 +514,29 @@ not_found_or_absent_dirty(Name) -> {ok, Q} -> {absent, Q, nodedown} end. +-spec get_rebalance_lock(pid()) -> + {true, {rebalance_queues, pid()}} | false. +get_rebalance_lock(Pid) when is_pid(Pid) -> + Id = {rebalance_queues, Pid}, + Nodes = [node()|nodes()], + %% Note that we're not re-trying. We want to immediately know + %% if a re-balance is taking place and stop accordingly. + case global:set_lock(Id, Nodes, 0) of + true -> + {true, Id}; + false -> + false + end. + -spec rebalance('all' | 'quorum' | 'classic', binary(), binary()) -> - {ok, [{node(), pos_integer()}]}. + {ok, [{node(), pos_integer()}]} | {error, term()}. rebalance(Type, VhostSpec, QueueSpec) -> + %% We have not yet acquired the rebalance_queues global lock. + maybe_rebalance(get_rebalance_lock(self()), Type, VhostSpec, QueueSpec). + +maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) -> + rabbit_log:info("Starting queue rebalance operation: '~s' for vhosts matching '~s' and queues matching '~s'", + [Type, VhostSpec, QueueSpec]), Running = rabbit_mnesia:cluster_nodes(running), NumRunning = length(Running), ToRebalance = [Q || Q <- rabbit_amqqueue:list(), @@ -527,11 +547,17 @@ rebalance(Type, VhostSpec, QueueSpec) -> NumToRebalance = length(ToRebalance), ByNode = group_by_node(ToRebalance), Rem = case (NumToRebalance rem NumRunning) of - 0 -> 0; - _ -> 1 - end, + 0 -> 0; + _ -> 1 + end, MaxQueuesDesired = (NumToRebalance div NumRunning) + Rem, - iterative_rebalance(ByNode, MaxQueuesDesired). + Result = iterative_rebalance(ByNode, MaxQueuesDesired), + global:del_lock(Id), + rabbit_log:info("Finished queue rebalance operation"), + Result; +maybe_rebalance(false, _Type, _VhostSpec, _QueueSpec) -> + rabbit_log:warning("Queue rebalance operation is in progress, please wait."), + {error, rebalance_in_progress}. 
filter_per_type(all, _) -> true; diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl index 594aae2911..e7a9b41642 100644 --- a/test/dynamic_ha_SUITE.erl +++ b/test/dynamic_ha_SUITE.erl @@ -74,7 +74,8 @@ groups() -> queue_survive_adding_dead_vhost_mirror, rebalance_all, rebalance_exactly, - rebalance_nodes + rebalance_nodes, + rebalance_multiple_blocked % FIXME: Re-enable those tests when the know issues are % fixed. % failing_random_policies, @@ -666,6 +667,28 @@ rebalance_nodes(Config) -> ok. +rebalance_multiple_blocked(Config) -> + [A, _, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + Q1 = <<"q1">>, + Q2 = <<"q2">>, + Q3 = <<"q3">>, + Q4 = <<"q4">>, + Q5 = <<"q5">>, + amqp_channel:call(ACh, #'queue.declare'{queue = Q1}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q2}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q3}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q4}), + amqp_channel:call(ACh, #'queue.declare'{queue = Q5}), + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q1, A)))), + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q2, A)))), + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q3, A)))), + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q4, A)))), + ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q5, A)))), + true = rpc:cast(A, rabbit_amqqueue, rebalance, [classic, ".*", ".*"]), + {error, rebalance_in_progress} = rpc:call(A, rabbit_amqqueue, rebalance, [classic, ".*", ".*"]), + ok. + %%---------------------------------------------------------------------------- assert_slaves(RPCNode, QName, Exp) -> |