summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-03-27 15:47:30 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-03-27 15:47:30 +0000
commitdfdd5dca117e1514c4263183de5739b6ce4fc4ec (patch)
tree61d30b86bc9a38bd1e04e20ee5346eb9b03de44d
parent5ef6c3c22b9507928cd48a9c1df109c35451dc08 (diff)
downloadrabbitmq-server-bug26082.tar.gz
Allow specification of the node to use for the queue, all else being equal.bug26082
-rw-r--r--src/rabbit_amqqueue.erl16
-rw-r--r--src/rabbit_mirror_queue_misc.erl24
2 files changed, 30 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 757f18ac..d38f8191 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -16,7 +16,7 @@
-module(rabbit_amqqueue).
--export([recover/0, stop/0, start/1, declare/5,
+-export([recover/0, stop/0, start/1, declare/5, declare/6,
delete_immediately/1, delete/3, purge/1, forget_all_durable/1]).
-export([pseudo_queue/2]).
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
@@ -73,6 +73,11 @@
rabbit_framing:amqp_table(), rabbit_types:maybe(pid()))
-> {'new' | 'existing' | 'absent' | 'owner_died',
rabbit_types:amqqueue()} | rabbit_types:channel_exit()).
+-spec(declare/6 ::
+ (name(), boolean(), boolean(),
+ rabbit_framing:amqp_table(), rabbit_types:maybe(pid()), node())
+ -> {'new' | 'existing' | 'absent' | 'owner_died',
+ rabbit_types:amqqueue()} | rabbit_types:channel_exit()).
-spec(internal_declare/2 ::
(rabbit_types:amqqueue(), boolean())
-> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())).
@@ -243,6 +248,13 @@ recover_durable_queues(QueuesAndRecoveryTerms) ->
[Q || {_, {new, Q}} <- Results].
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
+ declare(QueueName, Durable, AutoDelete, Args, Owner, node()).
+
+
+%% The Node argument suggests where the queue (master if mirrored)
+%% should be. Note that in some cases (e.g. with "nodes" policy in
+%% effect) this might not be possible to satisfy.
+declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
ok = check_declare_arguments(QueueName, Args),
Q = rabbit_policy:set(#amqqueue{name = QueueName,
durable = Durable,
@@ -253,7 +265,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
slave_pids = [],
sync_slave_pids = [],
gm_pids = []}),
- {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
+ Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node),
gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity).
internal_declare(Q, true) ->
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index f1740d14..a2f4eec5 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -18,7 +18,8 @@
-behaviour(rabbit_policy_validator).
-export([remove_from_queue/3, on_node_up/0, add_mirrors/3,
- report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1,
+ report_deaths/4, store_updated_slaves/1,
+ initial_queue_node/2, suggested_queue_nodes/1,
is_mirrored/1, update_mirrors/2, validate_policy/1,
maybe_auto_sync/1, log_info/3, log_warning/3]).
@@ -50,6 +51,7 @@
-> 'ok').
-spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) ->
rabbit_types:amqqueue()).
+-spec(initial_queue_node/2 :: (rabbit_types:amqqueue(), node()) -> node()).
-spec(suggested_queue_nodes/1 :: (rabbit_types:amqqueue()) ->
{node(), [node()]}).
-spec(is_mirrored/1 :: (rabbit_types:amqqueue()) -> boolean()).
@@ -234,16 +236,20 @@ promote_slave([SPid | SPids]) ->
%% the one to promote is the oldest.
{SPid, SPids}.
-suggested_queue_nodes(Q) ->
- suggested_queue_nodes(Q, rabbit_mnesia:cluster_nodes(running)).
+initial_queue_node(Q, DefNode) ->
+ {MNode, _SNodes} = suggested_queue_nodes(Q, DefNode, all_nodes()),
+ MNode.
-%% This variant exists so we can pull a call to
-%% rabbit_mnesia:cluster_nodes(running) out of a loop or
-%% transaction or both.
-suggested_queue_nodes(Q = #amqqueue{exclusive_owner = Owner}, All) ->
+suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, all_nodes()).
+suggested_queue_nodes(Q, All) -> suggested_queue_nodes(Q, node(), All).
+
+%% The third argument exists so we can pull a call to
+%% rabbit_mnesia:cluster_nodes(running) out of a loop or transaction
+%% or both.
+suggested_queue_nodes(Q = #amqqueue{exclusive_owner = Owner}, DefNode, All) ->
{MNode0, SNodes, SSNodes} = actual_queue_nodes(Q),
MNode = case MNode0 of
- none -> node();
+ none -> DefNode;
_ -> MNode0
end,
case Owner of
@@ -256,6 +262,8 @@ suggested_queue_nodes(Q = #amqqueue{exclusive_owner = Owner}, All) ->
_ -> {MNode, []}
end.
+all_nodes() -> rabbit_mnesia:cluster_nodes(running).
+
policy(Policy, Q) ->
case rabbit_policy:get(Policy, Q) of
undefined -> none;