diff options
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r-- | src/rabbit_amqqueue.erl | 106 |
1 files changed, 81 insertions, 25 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c8703740..bacb1d21 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -27,6 +27,8 @@ -export([notify_sent/2, unblock/2, flush_all/2]). -export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). +-export([store_queue/1]). + %% internal -export([internal_declare/2, internal_delete/1, @@ -191,18 +193,21 @@ find_durable_queues() -> end). recover_durable_queues(DurableQueues) -> - Qs = [start_queue_process(Q) || Q <- DurableQueues], + Qs = [start_queue_process(node(), Q) || Q <- DurableQueues], [QName || Q = #amqqueue{name = QName, pid = Pid} <- Qs, gen_server2:call(Pid, {init, true}, infinity) == {new, Q}]. declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), - Q = start_queue_process(#amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, - exclusive_owner = Owner, - pid = none}), + {Node, MNodes} = determine_queue_nodes(Args), + Q = start_queue_process(Node, #amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, + exclusive_owner = Owner, + pid = none, + slave_pids = [], + mirror_nodes = MNodes}), case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of not_found -> rabbit_misc:not_found(QueueName); Q1 -> Q1 @@ -240,8 +245,24 @@ store_queue(Q = #amqqueue{durable = false}) -> ok = mnesia:write(rabbit_queue, Q, write), ok. -start_queue_process(Q) -> - {ok, Pid} = rabbit_amqqueue_sup:start_child([Q]), +determine_queue_nodes(Args) -> + Policy = rabbit_misc:table_lookup(Args, <<"x-ha-policy">>), + PolicyParams = rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>), + case {Policy, PolicyParams} of + {{_Type, <<"nodes">>}, {array, Nodes}} -> + case [list_to_atom(binary_to_list(Node)) || + {longstr, Node} <- Nodes] of + [Node] -> {Node, undefined}; + [First | Rest] -> {First, Rest} + end; + {{_Type, <<"all">>}, _} -> + {node(), all}; + _ -> + {node(), undefined} + end. + +start_queue_process(Node, Q) -> + {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]), Q#amqqueue{pid = Pid}. add_default_binding(#amqqueue{name = QueueName}) -> @@ -257,8 +278,13 @@ lookup(Name) -> with(Name, F, E) -> case lookup(Name) of - {ok, Q} -> rabbit_misc:with_exit_handler(E, fun () -> F(Q) end); - {error, not_found} -> E() + {ok, Q = #amqqueue{slave_pids = []}} -> + rabbit_misc:with_exit_handler(E, fun () -> F(Q) end); + {ok, Q} -> + E1 = fun () -> timer:sleep(25), with(Name, F, E) end, + rabbit_misc:with_exit_handler(E1, fun () -> F(Q) end); + {error, not_found} -> + E() end. with(Name, F) -> @@ -295,31 +321,58 @@ with_exclusive_access_or_die(Name, ReaderPid, F) -> assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, RequiredArgs) -> - rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName, - [<<"x-expires">>, <<"x-message-ttl">>]). + rabbit_misc:assert_args_equivalence( + Args, RequiredArgs, QueueName, + [<<"x-expires">>, <<"x-message-ttl">>, <<"x-ha-policy">>]). check_declare_arguments(QueueName, Args) -> - [case Fun(rabbit_misc:table_lookup(Args, Key)) of + [case Fun(rabbit_misc:table_lookup(Args, Key), Args) of ok -> ok; {error, Error} -> rabbit_misc:protocol_error( precondition_failed, "invalid arg '~s' for ~s: ~w", [Key, rabbit_misc:rs(QueueName), Error]) end || {Key, Fun} <- - [{<<"x-expires">>, fun check_integer_argument/1}, - {<<"x-message-ttl">>, fun check_integer_argument/1}]], + [{<<"x-expires">>, fun check_integer_argument/2}, + {<<"x-message-ttl">>, fun check_integer_argument/2}, + {<<"x-ha-policy">>, fun check_ha_policy_argument/2}]], ok. -check_integer_argument(undefined) -> +check_integer_argument(undefined, _Args) -> ok; -check_integer_argument({Type, Val}) when Val > 0 -> +check_integer_argument({Type, Val}, _Args) when Val > 0 -> case lists:member(Type, ?INTEGER_ARG_TYPES) of true -> ok; false -> {error, {unacceptable_type, Type}} end; -check_integer_argument({_Type, Val}) -> +check_integer_argument({_Type, Val}, _Args) -> {error, {value_zero_or_less, Val}}. +check_ha_policy_argument(undefined, _Args) -> + ok; +check_ha_policy_argument({longstr, <<"all">>}, _Args) -> + ok; +check_ha_policy_argument({longstr, <<"nodes">>}, Args) -> + case rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>) of + undefined -> + {error, {require, 'x-ha-policy-params'}}; + {array, []} -> + {error, {require_non_empty_list_of_nodes_for_ha}}; + {array, Ary} -> + case lists:all(fun ({longstr, _Node}) -> true; + (_ ) -> false + end, Ary) of + true -> ok; + false -> {error, {require_node_list_as_longstrs_for_ha, Ary}} + end; + {Type, _} -> + {error, {ha_nodes_policy_params_not_array_of_longstr, Type}} + end; +check_ha_policy_argument({longstr, Policy}, _Args) -> + {error, {invalid_ha_policy, Policy}}; +check_ha_policy_argument({Type, _}, _Args) -> + {error, {unacceptable_type, Type}}. + list(VHostPath) -> mnesia:dirty_match_object( rabbit_queue, @@ -474,7 +527,8 @@ drop_expired(QPid) -> on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> Dels = qlc:e(qlc:q([delete_queue(QueueName) || - #amqqueue{name = QueueName, pid = Pid} + #amqqueue{name = QueueName, pid = Pid, + slave_pids = []} <- mnesia:table(rabbit_queue), node(Pid) == Node])), rabbit_binding:process_deletions( @@ -487,11 +541,13 @@ delete_queue(QueueName) -> rabbit_binding:remove_transient_for_destination(QueueName). pseudo_queue(QueueName, Pid) -> - #amqqueue{name = QueueName, - durable = false, - auto_delete = false, - arguments = [], - pid = Pid}. + #amqqueue{name = QueueName, + durable = false, + auto_delete = false, + arguments = [], + pid = Pid, + slave_pids = [], + mirror_nodes = undefined}. safe_delegate_call_ok(F, Pids) -> case delegate:invoke(Pids, fun (Pid) -> |