diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-07-05 16:00:28 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-07-05 16:00:28 +0100 |
commit | f4e20306ddd516c40e26f3c8faf1e2316d1cc604 (patch) | |
tree | f930b063fc9c466f3857591e6654e2f4b1052b6d | |
parent | a47b7dba6201972a293dd3563466ad29f834e799 (diff) | |
download | rabbitmq-server-f4e20306ddd516c40e26f3c8faf1e2316d1cc604.tar.gz |
Rough sketch of dynamic queue HA-ness:
* Remove #amqqueue.mirror_nodes, we will always derive this from policy
* Remove everything to do with x-ha-* arguments
* Abstract a bit more stuff into rabbit_mirror_queue_misc
* Add a new "at-least" mode
This works! Sort of. Changing policies on the fly, changing HAness altogether, having "at-least" set up a new mirror when one disappears, and probably some other things do not work.
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 54 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 29 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 10 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 69 | ||||
-rw-r--r-- | src/rabbit_types.erl | 3 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 14 |
7 files changed, 89 insertions, 92 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index e8b4a623..7be82aef 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -47,7 +47,7 @@ -record(exchange_serial, {name, next}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, - arguments, pid, slave_pids, mirror_nodes, policy}). + arguments, pid, slave_pids, policy}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index afbaea65..0565b1a5 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -207,15 +207,15 @@ recover_durable_queues(DurableQueues) -> declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), - {Node, MNodes} = determine_queue_nodes(Args), - Q = start_queue_process(Node, #amqqueue{name = QueueName, + Node = node(), %% TODO utter rubbish + Q = start_queue_process( + Node, rabbit_policy:set(#amqqueue{name = QueueName, durable = Durable, auto_delete = AutoDelete, arguments = Args, exclusive_owner = Owner, pid = none, - slave_pids = [], - mirror_nodes = MNodes}), + slave_pids = []})), case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of not_found -> rabbit_misc:not_found(QueueName); Q1 -> Q1 @@ -270,22 +270,6 @@ store_queue(Q = #amqqueue{durable = false}) -> policy_changed(_Q1, _Q2) -> ok. -determine_queue_nodes(Args) -> - Policy = rabbit_misc:table_lookup(Args, <<"x-ha-policy">>), - PolicyParams = rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>), - case {Policy, PolicyParams} of - {{_Type, <<"nodes">>}, {array, Nodes}} -> - case [list_to_atom(binary_to_list(Node)) || - {longstr, Node} <- Nodes] of - [Node] -> {Node, undefined}; - [First | Rest] -> {First, [First | Rest]} - end; - {{_Type, <<"all">>}, _} -> - {node(), all}; - _ -> - {node(), undefined} - end. - start_queue_process(Node, Q) -> {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]), Q#amqqueue{pid = Pid}. @@ -351,13 +335,11 @@ with_exclusive_access_or_die(Name, ReaderPid, F) -> assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, RequiredArgs) -> rabbit_misc:assert_args_equivalence( - Args, RequiredArgs, QueueName, - [<<"x-expires">>, <<"x-message-ttl">>, <<"x-ha-policy">>]). + Args, RequiredArgs, QueueName, [<<"x-expires">>, <<"x-message-ttl">>]). check_declare_arguments(QueueName, Args) -> Checks = [{<<"x-expires">>, fun check_positive_int_arg/2}, {<<"x-message-ttl">>, fun check_non_neg_int_arg/2}, - {<<"x-ha-policy">>, fun check_ha_policy_arg/2}, {<<"x-dead-letter-exchange">>, fun check_string_arg/2}, {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}], [case rabbit_misc:table_lookup(Args, Key) of @@ -406,29 +388,6 @@ check_dlxrk_arg({longstr, _}, Args) -> check_dlxrk_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. -check_ha_policy_arg({longstr, <<"all">>}, _Args) -> - ok; -check_ha_policy_arg({longstr, <<"nodes">>}, Args) -> - case rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>) of - undefined -> - {error, {require, 'x-ha-policy-params'}}; - {array, []} -> - {error, {require_non_empty_list_of_nodes_for_ha}}; - {array, Ary} -> - case lists:all(fun ({longstr, _Node}) -> true; - (_ ) -> false - end, Ary) of - true -> ok; - false -> {error, {require_node_list_as_longstrs_for_ha, Ary}} - end; - {Type, _} -> - {error, {ha_nodes_policy_params_not_array_of_longstr, Type}} - end; -check_ha_policy_arg({longstr, Policy}, _Args) -> - {error, {invalid_ha_policy, Policy}}; -check_ha_policy_arg({Type, _}, _Args) -> - {error, {unacceptable_type, Type}}. - list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}). @@ -625,8 +584,7 @@ pseudo_queue(QueueName, Pid) -> auto_delete = false, arguments = [], pid = Pid, - slave_pids = [], - mirror_nodes = undefined}. + slave_pids = []}. deliver([], #delivery{mandatory = false, immediate = false}, _Flow) -> %% /dev/null optimisation diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 8933de87..71f8aacd 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -230,8 +230,7 @@ matches(false, Q1, Q2) -> Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso - Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids andalso - Q1#amqqueue.mirror_nodes =:= Q2#amqqueue.mirror_nodes. + Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids. bq_init(BQ, Q, Recover) -> Self = self(), @@ -296,11 +295,11 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> timed -> {ensure_sync_timer(State1), 0 } end. -backing_queue_module(#amqqueue{arguments = Args}) -> - case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of - undefined -> {ok, BQM} = application:get_env(backing_queue_module), - BQM; - _Policy -> rabbit_mirror_queue_master +backing_queue_module(Q) -> + case rabbit_mirror_queue_misc:is_mirrored(Q) of + false -> {ok, BQM} = application:get_env(backing_queue_module), + BQM; + true -> rabbit_mirror_queue_master end. ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> @@ -906,11 +905,11 @@ infos(Items, State) -> Prefix ++ [{Item, i(Item, State)} || Item <- (Items1 -- [synchronised_slave_pids])]. -slaves_status(#q{q = #amqqueue{name = Name}}) -> - case rabbit_amqqueue:lookup(Name) of - {ok, #amqqueue{mirror_nodes = undefined}} -> +slaves_status(#q{q = Q}) -> + case rabbit_mirror_queue_misc:slave_pids(Q) of + not_mirrored -> %% TODO do we need this branch? [{slave_pids, ''}, {synchronised_slave_pids, ''}]; - {ok, #amqqueue{slave_pids = SPids}} -> + SPids -> {Results, _Bad} = delegate:invoke(SPids, fun rabbit_mirror_queue_slave:info/1), {SPids1, SSPids} = @@ -955,10 +954,10 @@ i(consumers, _) -> i(memory, _) -> {memory, M} = process_info(self(), memory), M; -i(slave_pids, #q{q = #amqqueue{name = Name}}) -> - case rabbit_amqqueue:lookup(Name) of - {ok, #amqqueue{mirror_nodes = undefined}} -> []; - {ok, #amqqueue{slave_pids = SPids}} -> SPids +i(slave_pids, #q{q = Q}) -> + case rabbit_mirror_queue_misc:slave_pids(Q) of + not_mirrored -> []; + SPids -> SPids end; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 750bcd56..899c31b7 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -82,17 +82,13 @@ stop() -> %% Same as start/1. exit({not_valid_for_generic_backing_queue, ?MODULE}). -init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, +init(#amqqueue { name = QName } = Q, Recover, AsyncCallback) -> {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( Q, undefined, sender_death_fun(), length_fun()), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), - MNodes1 = - (case MNodes of - all -> rabbit_mnesia:all_clustered_nodes(); - undefined -> []; - _ -> MNodes - end) -- [node()], + {_MNode, MNodes} = rabbit_mirror_queue_misc:determine_queue_nodes(Q), + MNodes1 = MNodes -- [node()], [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], {ok, BQ} = application:get_env(backing_queue_module), BQS = BQ:init(Q, Recover, AsyncCallback), diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 180677fe..876257b8 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -20,6 +20,10 @@ drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3, report_deaths/4]). +%% temp +-export([determine_queue_nodes/1, is_mirrored/1, slave_pids/1]). + + -include("rabbit.hrl"). %%---------------------------------------------------------------------------- @@ -85,24 +89,19 @@ remove_from_queue(QueueName, DeadPids) -> end). on_node_up() -> - Qs = + QNames = rabbit_misc:execute_mnesia_transaction( fun () -> mnesia:foldl( - fun (#amqqueue { mirror_nodes = undefined }, QsN) -> - QsN; - (#amqqueue { name = QName, - mirror_nodes = all }, QsN) -> - [QName | QsN]; - (#amqqueue { name = QName, - mirror_nodes = MNodes }, QsN) -> + fun (Q = #amqqueue{name = QName}, QNames0) -> + {_MNode, MNodes} = determine_queue_nodes(Q), case lists:member(node(), MNodes) of - true -> [QName | QsN]; - false -> QsN + true -> [QName | QNames0]; + false -> QNames0 end end, [], rabbit_queue) end), - [add_mirror(Q, node()) || Q <- Qs], + [add_mirror(QName, node()) || QName <- QNames], ok. drop_mirror(VHostPath, QueueName, MirrorNode) -> @@ -150,14 +149,13 @@ add_mirror(Queue, MirrorNode) -> end end). -if_mirrored_queue(Queue, Fun) -> - rabbit_amqqueue:with( - Queue, fun (#amqqueue { arguments = Args } = Q) -> - case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of - undefined -> ok; - _ -> Fun(Q) - end - end). +if_mirrored_queue(QName, Fun) -> + rabbit_amqqueue:with(QName, fun (Q) -> + case is_mirrored(Q) of + false -> ok; + true -> Fun(Q) + end + end). report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> ok; @@ -172,3 +170,36 @@ report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) -> end, rabbit_misc:pid_to_string(MirrorPid), [[rabbit_misc:pid_to_string(P), $ ] || P <- DeadPids]]). + +%%---------------------------------------------------------------------------- + +determine_queue_nodes(Q) -> + case [rabbit_policy:get(P, Q) || P <- [<<"ha-mode">>, <<"ha-params">>]] of + [{ok, <<"all">>}, _] -> + {node(), rabbit_mnesia:all_clustered_nodes()}; + [{ok, <<"nodes">>}, {ok, Nodes}] -> + case [list_to_atom(binary_to_list(Node)) || Node <- Nodes] of + [Node] -> {Node, []}; + [First | Rest] -> {First, [First | Rest]} + end; + [{ok, <<"at-least">>}, {ok, Count}] -> + {node(), lists:sublist(rabbit_mnesia:all_clustered_nodes(), Count)}; + _ -> + {node(), []} + end. + +is_mirrored(Q) -> + case rabbit_policy:get(<<"ha-mode">>, Q) of + {ok, <<"all">>} -> true; + {ok, <<"nodes">>} -> true; + {ok, <<"at-least">>} -> true; + _ -> false + end. + +slave_pids(Q = #amqqueue{name = Name}) -> + case is_mirrored(Q) of + false -> not_mirrored; + true -> {ok, #amqqueue{slave_pids = SPids}} = + rabbit_amqqueue:lookup(Name), + SPids + end. diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 732c29b6..adfb19a0 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -118,8 +118,7 @@ exclusive_owner :: rabbit_types:maybe(pid()), arguments :: rabbit_framing:amqp_table(), pid :: rabbit_types:maybe(pid()), - slave_pids :: [pid()], - mirror_nodes :: [node()] | 'undefined' | 'all'}). + slave_pids :: [pid()]}). -type(exchange() :: #exchange{name :: rabbit_exchange:name(), diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 18704807..d024ff63 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -40,6 +40,7 @@ -rabbit_upgrade({exchange_scratches, mnesia, [exchange_scratch]}). -rabbit_upgrade({policy, mnesia, [exchange_scratches, ha_mirrors]}). +-rabbit_upgrade({no_mirror_nodes, mnesia, [policy]}). %% ------------------------------------------------------------------- @@ -62,6 +63,7 @@ -spec(topic_trie_node/0 :: () -> 'ok'). -spec(runtime_parameters/0 :: () -> 'ok'). -spec(policy/0 :: () -> 'ok'). +-spec(no_mirror_nodes/0 :: () -> 'ok'). -endif. @@ -240,6 +242,18 @@ queue_policy(Table) -> [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, mirror_nodes, policy]). +no_mirror_nodes() -> + Tables = [rabbit_queue, rabbit_durable_queue], + RemoveMirrorNodesFun = + fun ({amqqueue, Name, D, AD, O, Args, Pid, SPids, _MNodes, Policy}) -> + {amqqueue, Name, D, AD, O, Args, Pid, SPids, Policy} + end, + [ok = transform(T, RemoveMirrorNodesFun, + [name, durable, auto_delete, exclusive_owner, arguments, + pid, slave_pids, policy]) + || T <- Tables], + ok. + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> |