diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-03-27 15:47:30 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-03-27 15:47:30 +0000 |
commit | dfdd5dca117e1514c4263183de5739b6ce4fc4ec (patch) | |
tree | 61d30b86bc9a38bd1e04e20ee5346eb9b03de44d | |
parent | 5ef6c3c22b9507928cd48a9c1df109c35451dc08 (diff) | |
download | rabbitmq-server-bug26082.tar.gz |
Allow specification of the node to use for the queue, all else being equal.bug26082
-rw-r--r-- | src/rabbit_amqqueue.erl | 16 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 24 |
2 files changed, 30 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 757f18ac..d38f8191 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -16,7 +16,7 @@ -module(rabbit_amqqueue). --export([recover/0, stop/0, start/1, declare/5, +-export([recover/0, stop/0, start/1, declare/5, declare/6, delete_immediately/1, delete/3, purge/1, forget_all_durable/1]). -export([pseudo_queue/2]). -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, @@ -73,6 +73,11 @@ rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) -> {'new' | 'existing' | 'absent' | 'owner_died', rabbit_types:amqqueue()} | rabbit_types:channel_exit()). +-spec(declare/6 :: + (name(), boolean(), boolean(), + rabbit_framing:amqp_table(), rabbit_types:maybe(pid()), node()) + -> {'new' | 'existing' | 'absent' | 'owner_died', + rabbit_types:amqqueue()} | rabbit_types:channel_exit()). -spec(internal_declare/2 :: (rabbit_types:amqqueue(), boolean()) -> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())). @@ -243,6 +248,13 @@ recover_durable_queues(QueuesAndRecoveryTerms) -> [Q || {_, {new, Q}} <- Results]. declare(QueueName, Durable, AutoDelete, Args, Owner) -> + declare(QueueName, Durable, AutoDelete, Args, Owner, node()). + + +%% The Node argument suggests where the queue (master if mirrored) +%% should be. Note that in some cases (e.g. with "nodes" policy in +%% effect) this might not be possible to satisfy. +declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> ok = check_declare_arguments(QueueName, Args), Q = rabbit_policy:set(#amqqueue{name = QueueName, durable = Durable, @@ -253,7 +265,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> slave_pids = [], sync_slave_pids = [], gm_pids = []}), - {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), + Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity). internal_declare(Q, true) -> diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index f1740d14..a2f4eec5 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -18,7 +18,8 @@ -behaviour(rabbit_policy_validator). -export([remove_from_queue/3, on_node_up/0, add_mirrors/3, - report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1, + report_deaths/4, store_updated_slaves/1, + initial_queue_node/2, suggested_queue_nodes/1, is_mirrored/1, update_mirrors/2, validate_policy/1, maybe_auto_sync/1, log_info/3, log_warning/3]). @@ -50,6 +51,7 @@ -> 'ok'). -spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) -> rabbit_types:amqqueue()). +-spec(initial_queue_node/2 :: (rabbit_types:amqqueue(), node()) -> node()). -spec(suggested_queue_nodes/1 :: (rabbit_types:amqqueue()) -> {node(), [node()]}). -spec(is_mirrored/1 :: (rabbit_types:amqqueue()) -> boolean()). @@ -234,16 +236,20 @@ promote_slave([SPid | SPids]) -> %% the one to promote is the oldest. {SPid, SPids}. -suggested_queue_nodes(Q) -> - suggested_queue_nodes(Q, rabbit_mnesia:cluster_nodes(running)). +initial_queue_node(Q, DefNode) -> + {MNode, _SNodes} = suggested_queue_nodes(Q, DefNode, all_nodes()), + MNode. -%% This variant exists so we can pull a call to -%% rabbit_mnesia:cluster_nodes(running) out of a loop or -%% transaction or both. -suggested_queue_nodes(Q = #amqqueue{exclusive_owner = Owner}, All) -> +suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, all_nodes()). +suggested_queue_nodes(Q, All) -> suggested_queue_nodes(Q, node(), All). + +%% The third argument exists so we can pull a call to +%% rabbit_mnesia:cluster_nodes(running) out of a loop or transaction +%% or both. +suggested_queue_nodes(Q = #amqqueue{exclusive_owner = Owner}, DefNode, All) -> {MNode0, SNodes, SSNodes} = actual_queue_nodes(Q), MNode = case MNode0 of - none -> node(); + none -> DefNode; _ -> MNode0 end, case Owner of @@ -256,6 +262,8 @@ suggested_queue_nodes(Q = #amqqueue{exclusive_owner = Owner}, All) -> _ -> {MNode, []} end. +all_nodes() -> rabbit_mnesia:cluster_nodes(running). + policy(Policy, Q) -> case rabbit_policy:get(Policy, Q) of undefined -> none; |