diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-06-21 16:22:48 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-06-21 16:22:48 +0100 |
commit | 90d6060a8a108609d45163792412c09d7ce25c72 (patch) | |
tree | 6207a12d9b49d76dd5c103aa52d6af4dd289ea41 | |
parent | 30f4103decf18fd4163aedccf35760eb19c4718a (diff) | |
download | rabbitmq-server-90d6060a8a108609d45163792412c09d7ce25c72.tar.gz |
Sort out (everywhere except mirror modules) slave pids and mirror nodes and deal with non-local queue declaration, and introduce ha policy and ha policy params. (does not compile)
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 76 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 21 | ||||
-rw-r--r-- | src/rabbit_amqqueue_sup.erl | 6 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 24 | ||||
-rw-r--r-- | src/rabbit_router.erl | 4 | ||||
-rw-r--r-- | src/rabbit_types.erl | 3 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 11 |
8 files changed, 86 insertions, 61 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 0a202c5e..00b7e6e9 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -46,7 +46,7 @@ -record(exchange_serial, {name, next}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, - arguments, pid, mirror_pids}). + arguments, pid, slave_pids, mirror_nodes}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index e5c53620..0b25a4e0 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -193,19 +193,21 @@ find_durable_queues() -> end). recover_durable_queues(DurableQueues) -> - Qs = [start_queue_process(Q) || Q <- DurableQueues], + Qs = [start_queue_process(node(), Q) || Q <- DurableQueues], [QName || Q = #amqqueue{name = QName, pid = Pid} <- Qs, gen_server2:call(Pid, {init, true}, infinity) == {new, Q}]. declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), - Q = start_queue_process(#amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, - exclusive_owner = Owner, - pid = none, - mirror_pids = []}), + {Node, MNodes} = determine_queue_nodes(Args), + Q = start_queue_process(Node, #amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, + exclusive_owner = Owner, + pid = none, + slave_pids = [], + mirror_nodes = MNodes}), case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of not_found -> rabbit_misc:not_found(QueueName); Q1 -> Q1 @@ -243,8 +245,25 @@ store_queue(Q = #amqqueue{durable = false}) -> ok = mnesia:write(rabbit_queue, Q, write), ok. -start_queue_process(Q) -> - {ok, Pid} = rabbit_amqqueue_sup:start_child([Q]), +determine_queue_nodes(Args) -> + Policy = rabbit_misc:table_lookup(Args, <<"x-ha-policy">>), + PolicyParams = rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>), + case {Policy, PolicyParams} of + {{_Type, <<"nodes">>}, {array, Nodes}} -> + case [list_to_atom(binary_to_list(Node)) || + {longstr, Node} <- Nodes] of + [] -> {node(), undefined}; + [Node] -> {Node, undefined}; + [First | Rest] -> {First, Rest} + end; + {{_Type, <<"all">>}, _} -> + {node(), all}; + _ -> + {node(), undefined} + end. + +start_queue_process(Node, Q) -> + {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]), Q#amqqueue{pid = Pid}. add_default_binding(#amqqueue{name = QueueName}) -> @@ -260,7 +279,7 @@ lookup(Name) -> with(Name, F, E) -> case lookup(Name) of - {ok, Q = #amqqueue{mirror_pids = []}} -> + {ok, Q = #amqqueue{slave_pids = []}} -> rabbit_misc:with_exit_handler(E, fun () -> F(Q) end); {ok, Q} -> E1 = fun () -> timer:sleep(25), with(Name, F, E) end, @@ -305,7 +324,7 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, RequiredArgs) -> rabbit_misc:assert_args_equivalence( Args, RequiredArgs, QueueName, - [<<"x-expires">>, <<"x-message-ttl">>, <<"x-mirror">>]). + [<<"x-expires">>, <<"x-message-ttl">>, <<"x-ha-policy">>]). check_declare_arguments(QueueName, Args) -> [case Fun(rabbit_misc:table_lookup(Args, Key)) of @@ -317,7 +336,7 @@ check_declare_arguments(QueueName, Args) -> end || {Key, Fun} <- [{<<"x-expires">>, fun check_integer_argument/1}, {<<"x-message-ttl">>, fun check_integer_argument/1}, - {<<"x-mirror">>, fun check_array_of_longstr_argument/1}]], + {<<"x-ha-policy">>, fun check_ha_policy_argument/1}]], ok. check_integer_argument(undefined) -> @@ -330,16 +349,14 @@ check_integer_argument({Type, Val}) when Val > 0 -> check_integer_argument({_Type, Val}) -> {error, {value_zero_or_less, Val}}. -check_array_of_longstr_argument(undefined) -> +check_ha_policy_argument(undefined) -> ok; -check_array_of_longstr_argument({array, Array}) -> - case lists:all(fun ({longstr, _NodeName}) -> true; - (_) -> false - end, Array) of - true -> ok; - false -> {error, {array_contains_non_longstrs, Array}} - end; -check_array_of_longstr_argument({Type, _Val}) -> +check_ha_policy_argument({longstr, Policy}) + when Policy =:= <<"nodes">> orelse Policy =:= <<"all">> -> + ok; +check_ha_policy_argument({longstr, Policy}) -> + {error, {invalid_ha_policy, Policy}}; +check_ha_policy_argument({Type, _}) -> {error, {unacceptable_type, Type}}. list(VHostPath) -> @@ -497,7 +514,7 @@ on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> Dels = qlc:e(qlc:q([delete_queue(QueueName) || #amqqueue{name = QueueName, pid = Pid, - mirror_pids = []} + slave_pids = []} <- mnesia:table(rabbit_queue), node(Pid) == Node])), rabbit_binding:process_deletions( @@ -510,12 +527,13 @@ delete_queue(QueueName) -> rabbit_binding:remove_transient_for_destination(QueueName). pseudo_queue(QueueName, Pid) -> - #amqqueue{name = QueueName, - durable = false, - auto_delete = false, - arguments = [], - pid = Pid, - mirror_pids = []}. + #amqqueue{name = QueueName, + durable = false, + auto_delete = false, + arguments = [], + pid = Pid, + slave_pids = [], + mirror_nodes = undefined}. safe_delegate_call_ok(F, Pids) -> case delegate:invoke(Pids, fun (Pid) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e7cb67a2..6d0f7f25 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -75,7 +75,7 @@ consumers, memory, backing_queue_status, - mirror_pids + slave_pids ]). -define(CREATION_EVENT_KEYS, @@ -84,7 +84,8 @@ durable, auto_delete, arguments, - owner_pid + owner_pid, + mirror_nodes ]). -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). @@ -101,7 +102,8 @@ init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), - {ok, #q{q = Q#amqqueue{pid = self()}, + {ok, #q{q = Q#amqqueue{pid = self(), + mirror_nodes = MirrorNodes}, exclusive_consumer = none, has_had_consumers = false, backing_queue = backing_queue_module(Q), @@ -257,10 +259,10 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> end. backing_queue_module(#amqqueue{arguments = Args}) -> - case rabbit_misc:table_lookup(Args, <<"x-mirror">>) of + case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of undefined -> {ok, BQM} = application:get_env(backing_queue_module), BQM; - _Nodes -> rabbit_mirror_queue_master + _Policy -> rabbit_mirror_queue_master end. ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> @@ -803,9 +805,12 @@ i(memory, _) -> M; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); -i(mirror_pids, #q{q = #amqqueue{name = Name}}) -> - {ok, #amqqueue{mirror_pids = MPids}} = rabbit_amqqueue:lookup(Name), - MPids; +i(slave_pids, #q{q = #amqqueue{name = Name}}) -> + {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(Name), + SPids; +i(mirror_nodes, #q{q = #amqqueue{name = Name}}) -> + {ok, #amqqueue{mirror_nodes = MNodes}} = rabbit_amqqueue:lookup(Name), + MNodes; i(Item, _) -> throw({bad_argument, Item}). diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 1344956e..2c28adce 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -18,7 +18,7 @@ -behaviour(supervisor2). --export([start_link/0, start_child/1]). +-export([start_link/0, start_child/2]). -export([init/1]). @@ -29,8 +29,8 @@ start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []). -start_child(Args) -> - supervisor2:start_child(?SERVER, Args). +start_child(Node, Args) -> + supervisor2:start_child({?SERVER, Node}, Args). init([]) -> {ok, {{simple_one_for_one_terminate, 10, 10}, diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 046d3380..94402d28 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -34,20 +34,20 @@ remove_from_queue(QueueName, DeadPids) -> %% get here. case mnesia:read({rabbit_queue, QueueName}) of [] -> {error, not_found}; - [Q = #amqqueue { pid = QPid, - mirror_pids = MPids }] -> - [QPid1 | MPids1] = - [Pid || Pid <- [QPid | MPids], + [Q = #amqqueue { pid = QPid, + slave_pids = SPids }] -> + [QPid1 | SPids1] = + [Pid || Pid <- [QPid | SPids], not lists:member(node(Pid), DeadNodes)], - case {{QPid, MPids}, {QPid1, MPids1}} of + case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> ok; _ when QPid =:= QPid1 orelse node(QPid1) =:= node() -> %% Either master hasn't changed, so %% we're ok to update mnesia; or master %% has changed to become us! - Q1 = Q #amqqueue { pid = QPid1, - mirror_pids = MPids1 }, + Q1 = Q #amqqueue { pid = QPid1, + slave_pids = SPids1 }, ok = rabbit_amqqueue:store_queue(Q1); _ -> %% Master has changed, and we're not it, @@ -91,11 +91,11 @@ drop_slave(VHostPath, QueueName, MirrorNode) -> drop_slave(Queue, MirrorNode) -> if_mirrored_queue( Queue, - fun (#amqqueue { name = Name, pid = QPid, mirror_pids = MPids }) -> - case [Pid || Pid <- [QPid | MPids], node(Pid) =:= MirrorNode] of + fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids }) -> + case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of [] -> {error, {queue_not_mirrored_on_node, MirrorNode}}; - [QPid | MPids] -> + [QPid | SPids] -> {error, cannot_drop_only_mirror}; [Pid] -> rabbit_log:info("Dropping slave node on ~p for ~s~n", @@ -111,8 +111,8 @@ add_slave(VHostPath, QueueName, MirrorNode) -> add_slave(Queue, MirrorNode) -> if_mirrored_queue( Queue, - fun (#amqqueue { name = Name, pid = QPid, mirror_pids = MPids } = Q) -> - case [Pid || Pid <- [QPid | MPids], node(Pid) =:= MirrorNode] of + fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> + case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of [] -> Result = rabbit_mirror_queue_slave_sup:start_child( MirrorNode, [Q]), rabbit_log:info( diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index b1d940d2..26780676 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -110,8 +110,8 @@ check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}. lookup_qpids(QNames) -> lists:foldl(fun (QName, QPids) -> case mnesia:dirty_read({rabbit_queue, QName}) of - [#amqqueue{pid = QPid, mirror_pids = MPids}] -> - MPids ++ [QPid | QPids]; + [#amqqueue{pid = QPid, slave_pids = SPids}] -> + SPids ++ [QPid | QPids]; [] -> QPids end diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 22204100..03b2c9e8 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -125,7 +125,8 @@ exclusive_owner :: rabbit_types:maybe(pid()), arguments :: rabbit_framing:amqp_table(), pid :: rabbit_types:maybe(pid()), - mirror_pids :: [pid()]}). + slave_pids :: [pid()], + mirror_nodes :: [node()] | 'undefined' | 'all'}). -type(exchange() :: #exchange{name :: rabbit_exchange:name(), diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index b4ac3328..ac2c378c 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -30,7 +30,7 @@ -rabbit_upgrade({exchange_event_serial, mnesia, []}). -rabbit_upgrade({trace_exchanges, mnesia, []}). -rabbit_upgrade({user_admin_to_tags, mnesia, [user_to_internal_user]}). --rabbit_upgrade({mirror_pids, mnesia, []}). +-rabbit_upgrade({ha_mirrors, mnesia, []}). -rabbit_upgrade({gm, mnesia, []}). %% ------------------------------------------------------------------- @@ -47,7 +47,7 @@ -spec(exchange_event_serial/0 :: () -> 'ok'). -spec(trace_exchanges/0 :: () -> 'ok'). -spec(user_admin_to_tags/0 :: () -> 'ok'). --spec(mirror_pids/0 :: () -> 'ok'). +-spec(ha_mirrors/0 :: () -> 'ok'). -spec(gm/0 :: () -> 'ok'). -endif. @@ -137,16 +137,17 @@ user_admin_to_tags() -> end, [username, password_hash, tags], internal_user). -mirror_pids() -> +ha_mirrors() -> Tables = [rabbit_queue, rabbit_durable_queue], AddMirrorPidsFun = fun ({amqqueue, Name, Durable, AutoDelete, Owner, Arguments, Pid}) -> - {amqqueue, Name, Durable, AutoDelete, Owner, Arguments, Pid, []} + {amqqueue, Name, Durable, AutoDelete, Owner, Arguments, Pid, + [], undefined} end, [ ok = transform(T, AddMirrorPidsFun, [name, durable, auto_delete, exclusive_owner, arguments, - pid, mirror_pids]) + pid, slave_pids, mirror_nodes]) || T <- Tables ], ok. |