summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-06-21 16:22:48 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-06-21 16:22:48 +0100
commit90d6060a8a108609d45163792412c09d7ce25c72 (patch)
tree6207a12d9b49d76dd5c103aa52d6af4dd289ea41
parent30f4103decf18fd4163aedccf35760eb19c4718a (diff)
downloadrabbitmq-server-90d6060a8a108609d45163792412c09d7ce25c72.tar.gz
Sort out (everywhere except mirror modules) slave pids and mirror nodes and deal with non-local queue declaration, and introduce ha policy and ha policy params. (does not compile)
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_amqqueue.erl76
-rw-r--r--src/rabbit_amqqueue_process.erl21
-rw-r--r--src/rabbit_amqqueue_sup.erl6
-rw-r--r--src/rabbit_mirror_queue_misc.erl24
-rw-r--r--src/rabbit_router.erl4
-rw-r--r--src/rabbit_types.erl3
-rw-r--r--src/rabbit_upgrade_functions.erl11
8 files changed, 86 insertions, 61 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 0a202c5e..00b7e6e9 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -46,7 +46,7 @@
-record(exchange_serial, {name, next}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
- arguments, pid, mirror_pids}).
+ arguments, pid, slave_pids, mirror_nodes}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index e5c53620..0b25a4e0 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -193,19 +193,21 @@ find_durable_queues() ->
end).
recover_durable_queues(DurableQueues) ->
- Qs = [start_queue_process(Q) || Q <- DurableQueues],
+ Qs = [start_queue_process(node(), Q) || Q <- DurableQueues],
[QName || Q = #amqqueue{name = QName, pid = Pid} <- Qs,
gen_server2:call(Pid, {init, true}, infinity) == {new, Q}].
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
- Q = start_queue_process(#amqqueue{name = QueueName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args,
- exclusive_owner = Owner,
- pid = none,
- mirror_pids = []}),
+ {Node, MNodes} = determine_queue_nodes(Args),
+ Q = start_queue_process(Node, #amqqueue{name = QueueName,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
+ exclusive_owner = Owner,
+ pid = none,
+ slave_pids = [],
+ mirror_nodes = MNodes}),
case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of
not_found -> rabbit_misc:not_found(QueueName);
Q1 -> Q1
@@ -243,8 +245,25 @@ store_queue(Q = #amqqueue{durable = false}) ->
ok = mnesia:write(rabbit_queue, Q, write),
ok.
-start_queue_process(Q) ->
- {ok, Pid} = rabbit_amqqueue_sup:start_child([Q]),
+determine_queue_nodes(Args) ->
+ Policy = rabbit_misc:table_lookup(Args, <<"x-ha-policy">>),
+ PolicyParams = rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>),
+ case {Policy, PolicyParams} of
+ {{_Type, <<"nodes">>}, {array, Nodes}} ->
+ case [list_to_atom(binary_to_list(Node)) ||
+ {longstr, Node} <- Nodes] of
+ [] -> {node(), undefined};
+ [Node] -> {Node, undefined};
+ [First | Rest] -> {First, Rest}
+ end;
+ {{_Type, <<"all">>}, _} ->
+ {node(), all};
+ _ ->
+ {node(), undefined}
+ end.
+
+start_queue_process(Node, Q) ->
+ {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]),
Q#amqqueue{pid = Pid}.
add_default_binding(#amqqueue{name = QueueName}) ->
@@ -260,7 +279,7 @@ lookup(Name) ->
with(Name, F, E) ->
case lookup(Name) of
- {ok, Q = #amqqueue{mirror_pids = []}} ->
+ {ok, Q = #amqqueue{slave_pids = []}} ->
rabbit_misc:with_exit_handler(E, fun () -> F(Q) end);
{ok, Q} ->
E1 = fun () -> timer:sleep(25), with(Name, F, E) end,
@@ -305,7 +324,7 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
RequiredArgs) ->
rabbit_misc:assert_args_equivalence(
Args, RequiredArgs, QueueName,
- [<<"x-expires">>, <<"x-message-ttl">>, <<"x-mirror">>]).
+ [<<"x-expires">>, <<"x-message-ttl">>, <<"x-ha-policy">>]).
check_declare_arguments(QueueName, Args) ->
[case Fun(rabbit_misc:table_lookup(Args, Key)) of
@@ -317,7 +336,7 @@ check_declare_arguments(QueueName, Args) ->
end || {Key, Fun} <-
[{<<"x-expires">>, fun check_integer_argument/1},
{<<"x-message-ttl">>, fun check_integer_argument/1},
- {<<"x-mirror">>, fun check_array_of_longstr_argument/1}]],
+ {<<"x-ha-policy">>, fun check_ha_policy_argument/1}]],
ok.
check_integer_argument(undefined) ->
@@ -330,16 +349,14 @@ check_integer_argument({Type, Val}) when Val > 0 ->
check_integer_argument({_Type, Val}) ->
{error, {value_zero_or_less, Val}}.
-check_array_of_longstr_argument(undefined) ->
+check_ha_policy_argument(undefined) ->
ok;
-check_array_of_longstr_argument({array, Array}) ->
- case lists:all(fun ({longstr, _NodeName}) -> true;
- (_) -> false
- end, Array) of
- true -> ok;
- false -> {error, {array_contains_non_longstrs, Array}}
- end;
-check_array_of_longstr_argument({Type, _Val}) ->
+check_ha_policy_argument({longstr, Policy})
+ when Policy =:= <<"nodes">> orelse Policy =:= <<"all">> ->
+ ok;
+check_ha_policy_argument({longstr, Policy}) ->
+ {error, {invalid_ha_policy, Policy}};
+check_ha_policy_argument({Type, _}) ->
{error, {unacceptable_type, Type}}.
list(VHostPath) ->
@@ -497,7 +514,7 @@ on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> Dels = qlc:e(qlc:q([delete_queue(QueueName) ||
#amqqueue{name = QueueName, pid = Pid,
- mirror_pids = []}
+ slave_pids = []}
<- mnesia:table(rabbit_queue),
node(Pid) == Node])),
rabbit_binding:process_deletions(
@@ -510,12 +527,13 @@ delete_queue(QueueName) ->
rabbit_binding:remove_transient_for_destination(QueueName).
pseudo_queue(QueueName, Pid) ->
- #amqqueue{name = QueueName,
- durable = false,
- auto_delete = false,
- arguments = [],
- pid = Pid,
- mirror_pids = []}.
+ #amqqueue{name = QueueName,
+ durable = false,
+ auto_delete = false,
+ arguments = [],
+ pid = Pid,
+ slave_pids = [],
+ mirror_nodes = undefined}.
safe_delegate_call_ok(F, Pids) ->
case delegate:invoke(Pids, fun (Pid) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e7cb67a2..6d0f7f25 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -75,7 +75,7 @@
consumers,
memory,
backing_queue_status,
- mirror_pids
+ slave_pids
]).
-define(CREATION_EVENT_KEYS,
@@ -84,7 +84,8 @@
durable,
auto_delete,
arguments,
- owner_pid
+ owner_pid,
+ mirror_nodes
]).
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
@@ -101,7 +102,8 @@ init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
- {ok, #q{q = Q#amqqueue{pid = self()},
+ {ok, #q{q = Q#amqqueue{pid = self(),
+ mirror_nodes = MirrorNodes},
exclusive_consumer = none,
has_had_consumers = false,
backing_queue = backing_queue_module(Q),
@@ -257,10 +259,10 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
end.
backing_queue_module(#amqqueue{arguments = Args}) ->
- case rabbit_misc:table_lookup(Args, <<"x-mirror">>) of
+ case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of
undefined -> {ok, BQM} = application:get_env(backing_queue_module),
BQM;
- _Nodes -> rabbit_mirror_queue_master
+ _Policy -> rabbit_mirror_queue_master
end.
ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
@@ -803,9 +805,12 @@ i(memory, _) ->
M;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
-i(mirror_pids, #q{q = #amqqueue{name = Name}}) ->
- {ok, #amqqueue{mirror_pids = MPids}} = rabbit_amqqueue:lookup(Name),
- MPids;
+i(slave_pids, #q{q = #amqqueue{name = Name}}) ->
+ {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(Name),
+ SPids;
+i(mirror_nodes, #q{q = #amqqueue{name = Name}}) ->
+ {ok, #amqqueue{mirror_nodes = MNodes}} = rabbit_amqqueue:lookup(Name),
+ MNodes;
i(Item, _) ->
throw({bad_argument, Item}).
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index 1344956e..2c28adce 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -18,7 +18,7 @@
-behaviour(supervisor2).
--export([start_link/0, start_child/1]).
+-export([start_link/0, start_child/2]).
-export([init/1]).
@@ -29,8 +29,8 @@
start_link() ->
supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
-start_child(Args) ->
- supervisor2:start_child(?SERVER, Args).
+start_child(Node, Args) ->
+ supervisor2:start_child({?SERVER, Node}, Args).
init([]) ->
{ok, {{simple_one_for_one_terminate, 10, 10},
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 046d3380..94402d28 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -34,20 +34,20 @@ remove_from_queue(QueueName, DeadPids) ->
%% get here.
case mnesia:read({rabbit_queue, QueueName}) of
[] -> {error, not_found};
- [Q = #amqqueue { pid = QPid,
- mirror_pids = MPids }] ->
- [QPid1 | MPids1] =
- [Pid || Pid <- [QPid | MPids],
+ [Q = #amqqueue { pid = QPid,
+ slave_pids = SPids }] ->
+ [QPid1 | SPids1] =
+ [Pid || Pid <- [QPid | SPids],
not lists:member(node(Pid), DeadNodes)],
- case {{QPid, MPids}, {QPid1, MPids1}} of
+ case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
ok;
_ when QPid =:= QPid1 orelse node(QPid1) =:= node() ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or master
%% has changed to become us!
- Q1 = Q #amqqueue { pid = QPid1,
- mirror_pids = MPids1 },
+ Q1 = Q #amqqueue { pid = QPid1,
+ slave_pids = SPids1 },
ok = rabbit_amqqueue:store_queue(Q1);
_ ->
%% Master has changed, and we're not it,
@@ -91,11 +91,11 @@ drop_slave(VHostPath, QueueName, MirrorNode) ->
drop_slave(Queue, MirrorNode) ->
if_mirrored_queue(
Queue,
- fun (#amqqueue { name = Name, pid = QPid, mirror_pids = MPids }) ->
- case [Pid || Pid <- [QPid | MPids], node(Pid) =:= MirrorNode] of
+ fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids }) ->
+ case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
[] ->
{error, {queue_not_mirrored_on_node, MirrorNode}};
- [QPid | MPids] ->
+ [QPid | SPids] ->
{error, cannot_drop_only_mirror};
[Pid] ->
rabbit_log:info("Dropping slave node on ~p for ~s~n",
@@ -111,8 +111,8 @@ add_slave(VHostPath, QueueName, MirrorNode) ->
add_slave(Queue, MirrorNode) ->
if_mirrored_queue(
Queue,
- fun (#amqqueue { name = Name, pid = QPid, mirror_pids = MPids } = Q) ->
- case [Pid || Pid <- [QPid | MPids], node(Pid) =:= MirrorNode] of
+ fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) ->
+ case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
[] -> Result = rabbit_mirror_queue_slave_sup:start_child(
MirrorNode, [Q]),
rabbit_log:info(
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index b1d940d2..26780676 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -110,8 +110,8 @@ check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}.
lookup_qpids(QNames) ->
lists:foldl(fun (QName, QPids) ->
case mnesia:dirty_read({rabbit_queue, QName}) of
- [#amqqueue{pid = QPid, mirror_pids = MPids}] ->
- MPids ++ [QPid | QPids];
+ [#amqqueue{pid = QPid, slave_pids = SPids}] ->
+ SPids ++ [QPid | QPids];
[] ->
QPids
end
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 22204100..03b2c9e8 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -125,7 +125,8 @@
exclusive_owner :: rabbit_types:maybe(pid()),
arguments :: rabbit_framing:amqp_table(),
pid :: rabbit_types:maybe(pid()),
- mirror_pids :: [pid()]}).
+ slave_pids :: [pid()],
+ mirror_nodes :: [node()] | 'undefined' | 'all'}).
-type(exchange() ::
#exchange{name :: rabbit_exchange:name(),
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index b4ac3328..ac2c378c 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -30,7 +30,7 @@
-rabbit_upgrade({exchange_event_serial, mnesia, []}).
-rabbit_upgrade({trace_exchanges, mnesia, []}).
-rabbit_upgrade({user_admin_to_tags, mnesia, [user_to_internal_user]}).
--rabbit_upgrade({mirror_pids, mnesia, []}).
+-rabbit_upgrade({ha_mirrors, mnesia, []}).
-rabbit_upgrade({gm, mnesia, []}).
%% -------------------------------------------------------------------
@@ -47,7 +47,7 @@
-spec(exchange_event_serial/0 :: () -> 'ok').
-spec(trace_exchanges/0 :: () -> 'ok').
-spec(user_admin_to_tags/0 :: () -> 'ok').
--spec(mirror_pids/0 :: () -> 'ok').
+-spec(ha_mirrors/0 :: () -> 'ok').
-spec(gm/0 :: () -> 'ok').
-endif.
@@ -137,16 +137,17 @@ user_admin_to_tags() ->
end,
[username, password_hash, tags], internal_user).
-mirror_pids() ->
+ha_mirrors() ->
Tables = [rabbit_queue, rabbit_durable_queue],
AddMirrorPidsFun =
fun ({amqqueue, Name, Durable, AutoDelete, Owner, Arguments, Pid}) ->
- {amqqueue, Name, Durable, AutoDelete, Owner, Arguments, Pid, []}
+ {amqqueue, Name, Durable, AutoDelete, Owner, Arguments, Pid,
+ [], undefined}
end,
[ ok = transform(T,
AddMirrorPidsFun,
[name, durable, auto_delete, exclusive_owner, arguments,
- pid, mirror_pids])
+ pid, slave_pids, mirror_nodes])
|| T <- Tables ],
ok.