summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r--src/rabbit_amqqueue.erl106
1 files changed, 81 insertions, 25 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c8703740..bacb1d21 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -27,6 +27,8 @@
-export([notify_sent/2, unblock/2, flush_all/2]).
-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
+-export([store_queue/1]).
+
%% internal
-export([internal_declare/2, internal_delete/1,
@@ -191,18 +193,21 @@ find_durable_queues() ->
end).
recover_durable_queues(DurableQueues) ->
- Qs = [start_queue_process(Q) || Q <- DurableQueues],
+ Qs = [start_queue_process(node(), Q) || Q <- DurableQueues],
[QName || Q = #amqqueue{name = QName, pid = Pid} <- Qs,
gen_server2:call(Pid, {init, true}, infinity) == {new, Q}].
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
- Q = start_queue_process(#amqqueue{name = QueueName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args,
- exclusive_owner = Owner,
- pid = none}),
+ {Node, MNodes} = determine_queue_nodes(Args),
+ Q = start_queue_process(Node, #amqqueue{name = QueueName,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
+ exclusive_owner = Owner,
+ pid = none,
+ slave_pids = [],
+ mirror_nodes = MNodes}),
case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of
not_found -> rabbit_misc:not_found(QueueName);
Q1 -> Q1
@@ -240,8 +245,24 @@ store_queue(Q = #amqqueue{durable = false}) ->
ok = mnesia:write(rabbit_queue, Q, write),
ok.
-start_queue_process(Q) ->
- {ok, Pid} = rabbit_amqqueue_sup:start_child([Q]),
+determine_queue_nodes(Args) ->
+ Policy = rabbit_misc:table_lookup(Args, <<"x-ha-policy">>),
+ PolicyParams = rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>),
+ case {Policy, PolicyParams} of
+ {{_Type, <<"nodes">>}, {array, Nodes}} ->
+ case [list_to_atom(binary_to_list(Node)) ||
+ {longstr, Node} <- Nodes] of
+ [Node] -> {Node, undefined};
+ [First | Rest] -> {First, Rest}
+ end;
+ {{_Type, <<"all">>}, _} ->
+ {node(), all};
+ _ ->
+ {node(), undefined}
+ end.
+
+start_queue_process(Node, Q) ->
+ {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]),
Q#amqqueue{pid = Pid}.
add_default_binding(#amqqueue{name = QueueName}) ->
@@ -257,8 +278,13 @@ lookup(Name) ->
with(Name, F, E) ->
case lookup(Name) of
- {ok, Q} -> rabbit_misc:with_exit_handler(E, fun () -> F(Q) end);
- {error, not_found} -> E()
+ {ok, Q = #amqqueue{slave_pids = []}} ->
+ rabbit_misc:with_exit_handler(E, fun () -> F(Q) end);
+ {ok, Q} ->
+ E1 = fun () -> timer:sleep(25), with(Name, F, E) end,
+ rabbit_misc:with_exit_handler(E1, fun () -> F(Q) end);
+ {error, not_found} ->
+ E()
end.
with(Name, F) ->
@@ -295,31 +321,58 @@ with_exclusive_access_or_die(Name, ReaderPid, F) ->
assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
RequiredArgs) ->
- rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName,
- [<<"x-expires">>, <<"x-message-ttl">>]).
+ rabbit_misc:assert_args_equivalence(
+ Args, RequiredArgs, QueueName,
+ [<<"x-expires">>, <<"x-message-ttl">>, <<"x-ha-policy">>]).
check_declare_arguments(QueueName, Args) ->
- [case Fun(rabbit_misc:table_lookup(Args, Key)) of
+ [case Fun(rabbit_misc:table_lookup(Args, Key), Args) of
ok -> ok;
{error, Error} -> rabbit_misc:protocol_error(
precondition_failed,
"invalid arg '~s' for ~s: ~w",
[Key, rabbit_misc:rs(QueueName), Error])
end || {Key, Fun} <-
- [{<<"x-expires">>, fun check_integer_argument/1},
- {<<"x-message-ttl">>, fun check_integer_argument/1}]],
+ [{<<"x-expires">>, fun check_integer_argument/2},
+ {<<"x-message-ttl">>, fun check_integer_argument/2},
+ {<<"x-ha-policy">>, fun check_ha_policy_argument/2}]],
ok.
-check_integer_argument(undefined) ->
+check_integer_argument(undefined, _Args) ->
ok;
-check_integer_argument({Type, Val}) when Val > 0 ->
+check_integer_argument({Type, Val}, _Args) when Val > 0 ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
true -> ok;
false -> {error, {unacceptable_type, Type}}
end;
-check_integer_argument({_Type, Val}) ->
+check_integer_argument({_Type, Val}, _Args) ->
{error, {value_zero_or_less, Val}}.
+check_ha_policy_argument(undefined, _Args) ->
+ ok;
+check_ha_policy_argument({longstr, <<"all">>}, _Args) ->
+ ok;
+check_ha_policy_argument({longstr, <<"nodes">>}, Args) ->
+ case rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>) of
+ undefined ->
+ {error, {require, 'x-ha-policy-params'}};
+ {array, []} ->
+ {error, {require_non_empty_list_of_nodes_for_ha}};
+ {array, Ary} ->
+ case lists:all(fun ({longstr, _Node}) -> true;
+ (_ ) -> false
+ end, Ary) of
+ true -> ok;
+ false -> {error, {require_node_list_as_longstrs_for_ha, Ary}}
+ end;
+ {Type, _} ->
+ {error, {ha_nodes_policy_params_not_array_of_longstr, Type}}
+ end;
+check_ha_policy_argument({longstr, Policy}, _Args) ->
+ {error, {invalid_ha_policy, Policy}};
+check_ha_policy_argument({Type, _}, _Args) ->
+ {error, {unacceptable_type, Type}}.
+
list(VHostPath) ->
mnesia:dirty_match_object(
rabbit_queue,
@@ -474,7 +527,8 @@ drop_expired(QPid) ->
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> Dels = qlc:e(qlc:q([delete_queue(QueueName) ||
- #amqqueue{name = QueueName, pid = Pid}
+ #amqqueue{name = QueueName, pid = Pid,
+ slave_pids = []}
<- mnesia:table(rabbit_queue),
node(Pid) == Node])),
rabbit_binding:process_deletions(
@@ -487,11 +541,13 @@ delete_queue(QueueName) ->
rabbit_binding:remove_transient_for_destination(QueueName).
pseudo_queue(QueueName, Pid) ->
- #amqqueue{name = QueueName,
- durable = false,
- auto_delete = false,
- arguments = [],
- pid = Pid}.
+ #amqqueue{name = QueueName,
+ durable = false,
+ auto_delete = false,
+ arguments = [],
+ pid = Pid,
+ slave_pids = [],
+ mirror_nodes = undefined}.
safe_delegate_call_ok(F, Pids) ->
case delegate:invoke(Pids, fun (Pid) ->