From f4e20306ddd516c40e26f3c8faf1e2316d1cc604 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 5 Jul 2012 16:00:28 +0100 Subject: Rough sketch of dynamic queue HA-ness: * Remove #amqqueue.mirror_nodes, we will always derive this from policy * Remove everything to do with x-ha-* arguments * Abstract a bit more stuff into rabbit_mirror_queue_misc * Add a new "at-least" mode This works! Sort of. Changing policies on the fly, changing HAness altogether, having "at-least" set up a new mirror when one disappears, and probably some other things do not work. --- include/rabbit.hrl | 2 +- src/rabbit_amqqueue.erl | 54 ++++------------------------- src/rabbit_amqqueue_process.erl | 29 ++++++++-------- src/rabbit_mirror_queue_master.erl | 10 ++---- src/rabbit_mirror_queue_misc.erl | 69 +++++++++++++++++++++++++++----------- src/rabbit_types.erl | 3 +- src/rabbit_upgrade_functions.erl | 14 ++++++++ 7 files changed, 89 insertions(+), 92 deletions(-) diff --git a/include/rabbit.hrl b/include/rabbit.hrl index e8b4a623..7be82aef 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -47,7 +47,7 @@ -record(exchange_serial, {name, next}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, - arguments, pid, slave_pids, mirror_nodes, policy}). + arguments, pid, slave_pids, policy}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index afbaea65..0565b1a5 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -207,15 +207,15 @@ recover_durable_queues(DurableQueues) -> declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), - {Node, MNodes} = determine_queue_nodes(Args), - Q = start_queue_process(Node, #amqqueue{name = QueueName, + Node = node(), %% TODO utter rubbish + Q = start_queue_process( + Node, rabbit_policy:set(#amqqueue{name = QueueName, durable = Durable, auto_delete = AutoDelete, arguments = Args, exclusive_owner = Owner, pid = none, - slave_pids = [], - mirror_nodes = MNodes}), + slave_pids = []})), case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of not_found -> rabbit_misc:not_found(QueueName); Q1 -> Q1 @@ -270,22 +270,6 @@ store_queue(Q = #amqqueue{durable = false}) -> policy_changed(_Q1, _Q2) -> ok. -determine_queue_nodes(Args) -> - Policy = rabbit_misc:table_lookup(Args, <<"x-ha-policy">>), - PolicyParams = rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>), - case {Policy, PolicyParams} of - {{_Type, <<"nodes">>}, {array, Nodes}} -> - case [list_to_atom(binary_to_list(Node)) || - {longstr, Node} <- Nodes] of - [Node] -> {Node, undefined}; - [First | Rest] -> {First, [First | Rest]} - end; - {{_Type, <<"all">>}, _} -> - {node(), all}; - _ -> - {node(), undefined} - end. - start_queue_process(Node, Q) -> {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]), Q#amqqueue{pid = Pid}. @@ -351,13 +335,11 @@ with_exclusive_access_or_die(Name, ReaderPid, F) -> assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, RequiredArgs) -> rabbit_misc:assert_args_equivalence( - Args, RequiredArgs, QueueName, - [<<"x-expires">>, <<"x-message-ttl">>, <<"x-ha-policy">>]). + Args, RequiredArgs, QueueName, [<<"x-expires">>, <<"x-message-ttl">>]). check_declare_arguments(QueueName, Args) -> Checks = [{<<"x-expires">>, fun check_positive_int_arg/2}, {<<"x-message-ttl">>, fun check_non_neg_int_arg/2}, - {<<"x-ha-policy">>, fun check_ha_policy_arg/2}, {<<"x-dead-letter-exchange">>, fun check_string_arg/2}, {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}], [case rabbit_misc:table_lookup(Args, Key) of @@ -406,29 +388,6 @@ check_dlxrk_arg({longstr, _}, Args) -> check_dlxrk_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. -check_ha_policy_arg({longstr, <<"all">>}, _Args) -> - ok; -check_ha_policy_arg({longstr, <<"nodes">>}, Args) -> - case rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>) of - undefined -> - {error, {require, 'x-ha-policy-params'}}; - {array, []} -> - {error, {require_non_empty_list_of_nodes_for_ha}}; - {array, Ary} -> - case lists:all(fun ({longstr, _Node}) -> true; - (_ ) -> false - end, Ary) of - true -> ok; - false -> {error, {require_node_list_as_longstrs_for_ha, Ary}} - end; - {Type, _} -> - {error, {ha_nodes_policy_params_not_array_of_longstr, Type}} - end; -check_ha_policy_arg({longstr, Policy}, _Args) -> - {error, {invalid_ha_policy, Policy}}; -check_ha_policy_arg({Type, _}, _Args) -> - {error, {unacceptable_type, Type}}. - list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}). @@ -625,8 +584,7 @@ pseudo_queue(QueueName, Pid) -> auto_delete = false, arguments = [], pid = Pid, - slave_pids = [], - mirror_nodes = undefined}. + slave_pids = []}. deliver([], #delivery{mandatory = false, immediate = false}, _Flow) -> %% /dev/null optimisation diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 8933de87..71f8aacd 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -230,8 +230,7 @@ matches(false, Q1, Q2) -> Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso - Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids andalso - Q1#amqqueue.mirror_nodes =:= Q2#amqqueue.mirror_nodes. + Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids. bq_init(BQ, Q, Recover) -> Self = self(), @@ -296,11 +295,11 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> timed -> {ensure_sync_timer(State1), 0 } end. -backing_queue_module(#amqqueue{arguments = Args}) -> - case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of - undefined -> {ok, BQM} = application:get_env(backing_queue_module), - BQM; - _Policy -> rabbit_mirror_queue_master +backing_queue_module(Q) -> + case rabbit_mirror_queue_misc:is_mirrored(Q) of + false -> {ok, BQM} = application:get_env(backing_queue_module), + BQM; + true -> rabbit_mirror_queue_master end. ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> @@ -906,11 +905,11 @@ infos(Items, State) -> Prefix ++ [{Item, i(Item, State)} || Item <- (Items1 -- [synchronised_slave_pids])]. -slaves_status(#q{q = #amqqueue{name = Name}}) -> - case rabbit_amqqueue:lookup(Name) of - {ok, #amqqueue{mirror_nodes = undefined}} -> +slaves_status(#q{q = Q}) -> + case rabbit_mirror_queue_misc:slave_pids(Q) of + not_mirrored -> %% TODO do we need this branch? [{slave_pids, ''}, {synchronised_slave_pids, ''}]; - {ok, #amqqueue{slave_pids = SPids}} -> + SPids -> {Results, _Bad} = delegate:invoke(SPids, fun rabbit_mirror_queue_slave:info/1), {SPids1, SSPids} = @@ -955,10 +954,10 @@ i(consumers, _) -> i(memory, _) -> {memory, M} = process_info(self(), memory), M; -i(slave_pids, #q{q = #amqqueue{name = Name}}) -> - case rabbit_amqqueue:lookup(Name) of - {ok, #amqqueue{mirror_nodes = undefined}} -> []; - {ok, #amqqueue{slave_pids = SPids}} -> SPids +i(slave_pids, #q{q = Q}) -> + case rabbit_mirror_queue_misc:slave_pids(Q) of + not_mirrored -> []; + SPids -> SPids end; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 750bcd56..899c31b7 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -82,17 +82,13 @@ stop() -> %% Same as start/1. exit({not_valid_for_generic_backing_queue, ?MODULE}). -init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, +init(#amqqueue { name = QName } = Q, Recover, AsyncCallback) -> {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( Q, undefined, sender_death_fun(), length_fun()), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), - MNodes1 = - (case MNodes of - all -> rabbit_mnesia:all_clustered_nodes(); - undefined -> []; - _ -> MNodes - end) -- [node()], + {_MNode, MNodes} = rabbit_mirror_queue_misc:determine_queue_nodes(Q), + MNodes1 = MNodes -- [node()], [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], {ok, BQ} = application:get_env(backing_queue_module), BQS = BQ:init(Q, Recover, AsyncCallback), diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 180677fe..876257b8 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -20,6 +20,10 @@ drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3, report_deaths/4]). +%% temp +-export([determine_queue_nodes/1, is_mirrored/1, slave_pids/1]). + + -include("rabbit.hrl"). %%---------------------------------------------------------------------------- @@ -85,24 +89,19 @@ remove_from_queue(QueueName, DeadPids) -> end). on_node_up() -> - Qs = + QNames = rabbit_misc:execute_mnesia_transaction( fun () -> mnesia:foldl( - fun (#amqqueue { mirror_nodes = undefined }, QsN) -> - QsN; - (#amqqueue { name = QName, - mirror_nodes = all }, QsN) -> - [QName | QsN]; - (#amqqueue { name = QName, - mirror_nodes = MNodes }, QsN) -> + fun (Q = #amqqueue{name = QName}, QNames0) -> + {_MNode, MNodes} = determine_queue_nodes(Q), case lists:member(node(), MNodes) of - true -> [QName | QsN]; - false -> QsN + true -> [QName | QNames0]; + false -> QNames0 end end, [], rabbit_queue) end), - [add_mirror(Q, node()) || Q <- Qs], + [add_mirror(QName, node()) || QName <- QNames], ok. drop_mirror(VHostPath, QueueName, MirrorNode) -> @@ -150,14 +149,13 @@ add_mirror(Queue, MirrorNode) -> end end). -if_mirrored_queue(Queue, Fun) -> - rabbit_amqqueue:with( - Queue, fun (#amqqueue { arguments = Args } = Q) -> - case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of - undefined -> ok; - _ -> Fun(Q) - end - end). +if_mirrored_queue(QName, Fun) -> + rabbit_amqqueue:with(QName, fun (Q) -> + case is_mirrored(Q) of + false -> ok; + true -> Fun(Q) + end + end). report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> ok; @@ -172,3 +170,36 @@ report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) -> end, rabbit_misc:pid_to_string(MirrorPid), [[rabbit_misc:pid_to_string(P), $ ] || P <- DeadPids]]). + +%%---------------------------------------------------------------------------- + +determine_queue_nodes(Q) -> + case [rabbit_policy:get(P, Q) || P <- [<<"ha-mode">>, <<"ha-params">>]] of + [{ok, <<"all">>}, _] -> + {node(), rabbit_mnesia:all_clustered_nodes()}; + [{ok, <<"nodes">>}, {ok, Nodes}] -> + case [list_to_atom(binary_to_list(Node)) || Node <- Nodes] of + [Node] -> {Node, []}; + [First | Rest] -> {First, [First | Rest]} + end; + [{ok, <<"at-least">>}, {ok, Count}] -> + {node(), lists:sublist(rabbit_mnesia:all_clustered_nodes(), Count)}; + _ -> + {node(), []} + end. + +is_mirrored(Q) -> + case rabbit_policy:get(<<"ha-mode">>, Q) of + {ok, <<"all">>} -> true; + {ok, <<"nodes">>} -> true; + {ok, <<"at-least">>} -> true; + _ -> false + end. + +slave_pids(Q = #amqqueue{name = Name}) -> + case is_mirrored(Q) of + false -> not_mirrored; + true -> {ok, #amqqueue{slave_pids = SPids}} = + rabbit_amqqueue:lookup(Name), + SPids + end. diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 732c29b6..adfb19a0 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -118,8 +118,7 @@ exclusive_owner :: rabbit_types:maybe(pid()), arguments :: rabbit_framing:amqp_table(), pid :: rabbit_types:maybe(pid()), - slave_pids :: [pid()], - mirror_nodes :: [node()] | 'undefined' | 'all'}). + slave_pids :: [pid()]}). -type(exchange() :: #exchange{name :: rabbit_exchange:name(), diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 18704807..d024ff63 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -40,6 +40,7 @@ -rabbit_upgrade({exchange_scratches, mnesia, [exchange_scratch]}). -rabbit_upgrade({policy, mnesia, [exchange_scratches, ha_mirrors]}). +-rabbit_upgrade({no_mirror_nodes, mnesia, [policy]}). %% ------------------------------------------------------------------- @@ -62,6 +63,7 @@ -spec(topic_trie_node/0 :: () -> 'ok'). -spec(runtime_parameters/0 :: () -> 'ok'). -spec(policy/0 :: () -> 'ok'). +-spec(no_mirror_nodes/0 :: () -> 'ok'). -endif. @@ -240,6 +242,18 @@ queue_policy(Table) -> [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, mirror_nodes, policy]). +no_mirror_nodes() -> + Tables = [rabbit_queue, rabbit_durable_queue], + RemoveMirrorNodesFun = + fun ({amqqueue, Name, D, AD, O, Args, Pid, SPids, _MNodes, Policy}) -> + {amqqueue, Name, D, AD, O, Args, Pid, SPids, Policy} + end, + [ok = transform(T, RemoveMirrorNodesFun, + [name, durable, auto_delete, exclusive_owner, arguments, + pid, slave_pids, policy]) + || T <- Tables], + ok. + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> -- cgit v1.2.1 From a5542de131517694ae09b57e46804670c5050a71 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 6 Jul 2012 13:27:50 +0100 Subject: Rename function, remove utter rubbish. --- src/rabbit_amqqueue.erl | 22 +++++++++++----------- src/rabbit_mirror_queue_master.erl | 2 +- src/rabbit_mirror_queue_misc.erl | 6 +++--- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 0565b1a5..b32aefdb 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -207,18 +207,18 @@ recover_durable_queues(DurableQueues) -> declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), - Node = node(), %% TODO utter rubbish - Q = start_queue_process( - Node, rabbit_policy:set(#amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, - exclusive_owner = Owner, - pid = none, - slave_pids = []})), - case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of + Q0 = rabbit_policy:set(#amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, + exclusive_owner = Owner, + pid = none, + slave_pids = []}), + {Node, _MNodes} = rabbit_mirror_queue_misc:queue_nodes(Q0), + Q1 = start_queue_process(Node, Q0), + case gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity) of not_found -> rabbit_misc:not_found(QueueName); - Q1 -> Q1 + Q2 -> Q2 end. internal_declare(Q, true) -> diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 899c31b7..50349204 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -87,7 +87,7 @@ init(#amqqueue { name = QName } = Q, Recover, {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( Q, undefined, sender_death_fun(), length_fun()), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), - {_MNode, MNodes} = rabbit_mirror_queue_misc:determine_queue_nodes(Q), + {_MNode, MNodes} = rabbit_mirror_queue_misc:queue_nodes(Q), MNodes1 = MNodes -- [node()], [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], {ok, BQ} = application:get_env(backing_queue_module), diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 876257b8..a84623f6 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -21,7 +21,7 @@ report_deaths/4]). %% temp --export([determine_queue_nodes/1, is_mirrored/1, slave_pids/1]). +-export([queue_nodes/1, is_mirrored/1, slave_pids/1]). -include("rabbit.hrl"). @@ -94,7 +94,7 @@ on_node_up() -> fun () -> mnesia:foldl( fun (Q = #amqqueue{name = QName}, QNames0) -> - {_MNode, MNodes} = determine_queue_nodes(Q), + {_MNode, MNodes} = queue_nodes(Q), case lists:member(node(), MNodes) of true -> [QName | QNames0]; false -> QNames0 @@ -173,7 +173,7 @@ report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) -> %%---------------------------------------------------------------------------- -determine_queue_nodes(Q) -> +queue_nodes(Q) -> case [rabbit_policy:get(P, Q) || P <- [<<"ha-mode">>, <<"ha-params">>]] of [{ok, <<"all">>}, _] -> {node(), rabbit_mnesia:all_clustered_nodes()}; -- cgit v1.2.1 From 40053cb586938328d6c0fcb05de3b0a4da4dd693 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 6 Jul 2012 16:55:54 +0100 Subject: Dynamic change of HA policy. Promotion from non-HA to master, and demotion from master to non-HA as appropriate. There will still be problems if the policy changes such that the master needs to change - I'm not sure we should even allow this. --- src/rabbit_amqqueue.erl | 11 +++++-- src/rabbit_amqqueue_process.erl | 25 +++++++++++++++ src/rabbit_mirror_queue_master.erl | 42 ++++++++++++++++++-------- src/rabbit_mirror_queue_misc.erl | 62 +++++++++++++++++++++++++++++--------- src/rabbit_mirror_queue_slave.erl | 4 ++- 5 files changed, 112 insertions(+), 32 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b32aefdb..be6613a0 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -30,6 +30,8 @@ -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). +%% temp +-export([start_mirroring/1, stop_mirroring/1]). %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, @@ -214,7 +216,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> exclusive_owner = Owner, pid = none, slave_pids = []}), - {Node, _MNodes} = rabbit_mirror_queue_misc:queue_nodes(Q0), + {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0), Q1 = start_queue_process(Node, Q0), case gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity) of not_found -> rabbit_misc:not_found(QueueName); @@ -267,8 +269,8 @@ store_queue(Q = #amqqueue{durable = false}) -> ok = mnesia:write(rabbit_queue, Q, write), ok. -policy_changed(_Q1, _Q2) -> - ok. +policy_changed(Q1, Q2) -> + rabbit_mirror_queue_misc:update_mirrors(Q1, Q2). start_queue_process(Node, Q) -> {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]), @@ -550,6 +552,9 @@ set_ram_duration_target(QPid, Duration) -> set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). +start_mirroring(QPid) -> ok = delegate_call(QPid, start_mirroring). +stop_mirroring(QPid) -> ok = delegate_call(QPid, stop_mirroring). + on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 71f8aacd..e4a61cb4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1199,6 +1199,31 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> ChPid, AckTags, State, fun (State1) -> requeue_and_run(AckTags, State1) end)); +handle_call(start_mirroring, _From, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + %% lookup again to get policy for init_with_existing_bq + {ok, Q} = rabbit_amqqueue:lookup(qname(State)), + true = BQ =/= rabbit_mirror_queue_master, %% assertion + BQ1 = rabbit_mirror_queue_master, + BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS), + reply(ok, State#q{backing_queue = BQ1, + backing_queue_state = BQS1}); + +handle_call(stop_mirroring, _From, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + BQ = rabbit_mirror_queue_master, %% assertion + {BQ1, BQS1} = BQ:stop_mirroring(BQS), + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read({rabbit_queue, qname(State)}) of + [] -> ok; + [Q] -> rabbit_amqqueue:store_queue( + Q#amqqueue{slave_pids = undefined}) + end + end), + reply(ok, State#q{backing_queue = BQ1, + backing_queue_state = BQS1}); + handle_call(force_event_refresh, _From, State = #q{exclusive_consumer = Exclusive}) -> rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 50349204..e5ca085d 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -27,6 +27,9 @@ -export([promote_backing_queue_state/6, sender_death_fun/0, length_fun/0]). +%% temp +-export([init_with_existing_bq/3, stop_mirroring/1]). + -behaviour(rabbit_backing_queue). -include("rabbit.hrl"). @@ -82,16 +85,17 @@ stop() -> %% Same as start/1. exit({not_valid_for_generic_backing_queue, ?MODULE}). -init(#amqqueue { name = QName } = Q, Recover, - AsyncCallback) -> +init(Q, Recover, AsyncCallback) -> + {ok, BQ} = application:get_env(backing_queue_module), + BQS = BQ:init(Q, Recover, AsyncCallback), + init_with_existing_bq(Q, BQ, BQS). + +init_with_existing_bq(#amqqueue { name = QName } = Q, BQ, BQS) -> {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( Q, undefined, sender_death_fun(), length_fun()), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), - {_MNode, MNodes} = rabbit_mirror_queue_misc:queue_nodes(Q), - MNodes1 = MNodes -- [node()], - [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], - {ok, BQ} = application:get_env(backing_queue_module), - BQS = BQ:init(Q, Recover, AsyncCallback), + {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), + [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- SNodes], ok = gm:broadcast(GM, {length, BQ:len(BQS)}), #state { gm = GM, coordinator = CPid, @@ -103,14 +107,24 @@ init(#amqqueue { name = QName } = Q, Recover, ack_msg_id = dict:new(), known_senders = sets:new() }. +stop_mirroring(State = #state { coordinator = CPid, + backing_queue = BQ, + backing_queue_state = BQS }) -> + unlink(CPid), + stop_all_slaves(unmirroring, State), + {BQ, BQS}. + terminate({shutdown, dropped} = Reason, - State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> %% Backing queue termination - this node has been explicitly %% dropped. Normally, non-durable queues would be tidied up on %% startup, but there's a possibility that we will be added back %% in without this node being restarted. Thus we must do the full %% blown delete_and_terminate now, but only locally: we do not %% broadcast delete_and_terminate. + ok = gm:leave(GM), %% TODO presumably we need this? State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), set_delivered = 0 }; terminate(Reason, @@ -120,15 +134,17 @@ terminate(Reason, %% node. Thus just let some other slave take over. State #state { backing_queue_state = BQ:terminate(Reason, BQS) }. -delete_and_terminate(Reason, State = #state { gm = GM, - backing_queue = BQ, +delete_and_terminate(Reason, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + stop_all_slaves(Reason, State), + State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), + set_delivered = 0 }. + +stop_all_slaves(Reason, #state{gm = GM}) -> Slaves = [Pid || Pid <- gm:group_members(GM), node(Pid) =/= node()], MRefs = [erlang:monitor(process, S) || S <- Slaves], ok = gm:broadcast(GM, {delete_and_terminate, Reason}), - monitor_wait(MRefs), - State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), - set_delivered = 0 }. + monitor_wait(MRefs). monitor_wait([]) -> ok; diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index a84623f6..de507afe 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -21,7 +21,8 @@ report_deaths/4]). %% temp --export([queue_nodes/1, is_mirrored/1, slave_pids/1]). +-export([suggested_queue_nodes/1, is_mirrored/1, slave_pids/1, + update_mirrors/2]). -include("rabbit.hrl"). @@ -94,8 +95,8 @@ on_node_up() -> fun () -> mnesia:foldl( fun (Q = #amqqueue{name = QName}, QNames0) -> - {_MNode, MNodes} = queue_nodes(Q), - case lists:member(node(), MNodes) of + {_MNode, SNodes} = suggested_queue_nodes(Q), + case lists:member(node(), SNodes) of true -> [QName | QNames0]; false -> QNames0 end @@ -107,9 +108,9 @@ on_node_up() -> drop_mirror(VHostPath, QueueName, MirrorNode) -> drop_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). -drop_mirror(Queue, MirrorNode) -> +drop_mirror(QName, MirrorNode) -> if_mirrored_queue( - Queue, + QName, fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids }) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of [] -> @@ -128,9 +129,9 @@ drop_mirror(Queue, MirrorNode) -> add_mirror(VHostPath, QueueName, MirrorNode) -> add_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). -add_mirror(Queue, MirrorNode) -> +add_mirror(QName, MirrorNode) -> if_mirrored_queue( - Queue, + QName, fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of [] -> case rabbit_mirror_queue_slave_sup:start_child( @@ -173,21 +174,35 @@ report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) -> %%---------------------------------------------------------------------------- -queue_nodes(Q) -> +%% TODO this should take account of current nodes so we don't throw +%% away mirrors or change the master needlessly +suggested_queue_nodes(Q) -> case [rabbit_policy:get(P, Q) || P <- [<<"ha-mode">>, <<"ha-params">>]] of [{ok, <<"all">>}, _] -> - {node(), rabbit_mnesia:all_clustered_nodes()}; + {node(), rabbit_mnesia:all_clustered_nodes() -- [node()]}; [{ok, <<"nodes">>}, {ok, Nodes}] -> case [list_to_atom(binary_to_list(Node)) || Node <- Nodes] of [Node] -> {Node, []}; - [First | Rest] -> {First, [First | Rest]} + [First | Rest] -> {First, Rest} end; [{ok, <<"at-least">>}, {ok, Count}] -> - {node(), lists:sublist(rabbit_mnesia:all_clustered_nodes(), Count)}; + {node(), lists:sublist( + rabbit_mnesia:all_clustered_nodes(), Count) -- [node()]}; _ -> {node(), []} end. +actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) -> + MNode = case MPid of + undefined -> undefined; + _ -> node(MPid) + end, + SNodes = case SPids of + undefined -> undefined; + _ -> [node(Pid) || Pid <- SPids] + end, + {MNode, SNodes}. + is_mirrored(Q) -> case rabbit_policy:get(<<"ha-mode">>, Q) of {ok, <<"all">>} -> true; @@ -196,10 +211,27 @@ is_mirrored(Q) -> _ -> false end. -slave_pids(Q = #amqqueue{name = Name}) -> +slave_pids(#amqqueue{name = Name}) -> + {ok, Q = #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(Name), case is_mirrored(Q) of false -> not_mirrored; - true -> {ok, #amqqueue{slave_pids = SPids}} = - rabbit_amqqueue:lookup(Name), - SPids + true -> SPids + end. + +update_mirrors(OldQ = #amqqueue{name = QName, pid = QPid}, + NewQ = #amqqueue{name = QName, pid = QPid}) -> + case {is_mirrored(OldQ), is_mirrored(NewQ)} of + {false, false} -> ok; + {true, false} -> rabbit_amqqueue:stop_mirroring(QPid); + {false, true} -> rabbit_amqqueue:start_mirroring(QPid); + {true, true} -> {OldMNode, OldSNodes} = actual_queue_nodes(OldQ), + {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ), + case OldMNode of + NewMNode -> ok; + _ -> io:format("TODO: master needs to change for ~p~n", [NewQ]) + end, + Add = NewSNodes -- OldSNodes, + Remove = OldSNodes -- NewSNodes, + [ok = drop_mirror(QName, SNode) || SNode <- Remove], + [ok = add_mirror(QName, SNode) || SNode <- Add] end. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 03fafc3e..8f57e695 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -275,9 +275,11 @@ terminate(_Reason, #state { backing_queue_state = undefined }) -> %% We've received a delete_and_terminate from gm, thus nothing to %% do here. ok; -terminate({shutdown, dropped} = R, #state { backing_queue = BQ, +terminate({shutdown, dropped} = R, #state { gm = GM, + backing_queue = BQ, backing_queue_state = BQS }) -> %% See rabbit_mirror_queue_master:terminate/2 + ok = gm:leave(GM), %% TODO presumably we need this? BQ:delete_and_terminate(R, BQS); terminate(Reason, #state { q = Q, gm = GM, -- cgit v1.2.1 From ab1386ea69e768e0d1829cf800237d7082aac450 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 9 Jul 2012 18:30:52 +0100 Subject: Various fixes. --- src/rabbit_amqqueue_process.erl | 24 ++++++++---------------- src/rabbit_mirror_queue_master.erl | 2 +- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5a3361ab..9404c53f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -923,19 +923,19 @@ i(consumers, _) -> i(memory, _) -> {memory, M} = process_info(self(), memory), M; -i(slave_pids, #q{q = Q = #amqqueue{name = Name}}) -> +i(slave_pids, #q{q = #amqqueue{name = Name}}) -> + {ok, Q = #amqqueue{slave_pids = SPids}} = + rabbit_amqqueue:lookup(Name), case rabbit_mirror_queue_misc:is_mirrored(Q) of false -> ''; - true -> {ok, #amqqueue{slave_pids = SPids}} = - rabbit_amqqueue:lookup(Name), - SPids + true -> SPids end; -i(synchronised_slave_pids, #q{q = Q = #amqqueue{name = Name}}) -> +i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) -> + {ok, Q = #amqqueue{sync_slave_pids = SSPids}} = + rabbit_amqqueue:lookup(Name), case rabbit_mirror_queue_misc:is_mirrored(Q) of false -> ''; - true -> {ok, #amqqueue{sync_slave_pids = SSPids}} = - rabbit_amqqueue:lookup(Name), - SSPids + true -> SSPids end; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); @@ -1191,14 +1191,6 @@ handle_call(stop_mirroring, _From, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ = rabbit_mirror_queue_master, %% assertion {BQ1, BQS1} = BQ:stop_mirroring(BQS), - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:read({rabbit_queue, qname(State)}) of - [] -> ok; - [Q] -> rabbit_amqqueue:store_queue( - Q#amqqueue{slave_pids = undefined}) - end - end), reply(ok, State#q{backing_queue = BQ1, backing_queue_state = BQS1}); diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index e5ca085d..2a04d872 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -111,7 +111,7 @@ stop_mirroring(State = #state { coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS }) -> unlink(CPid), - stop_all_slaves(unmirroring, State), + stop_all_slaves(shutdown, State), {BQ, BQS}. terminate({shutdown, dropped} = Reason, -- cgit v1.2.1 From 8aeaf3f89ffff3da206f3a8c7f9a8ef2fc22c8d7 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 7 Aug 2012 12:51:36 +0100 Subject: Take account of the current nodes when selecting new nodes, and some unit tests for the node selection logic. --- src/rabbit_mirror_queue_misc.erl | 70 +++++++++++++++++++++++----------------- src/rabbit_tests.erl | 29 +++++++++++++++++ 2 files changed, 70 insertions(+), 29 deletions(-) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index fe7c0442..0469f5f2 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -22,6 +22,8 @@ %% temp -export([suggested_queue_nodes/1, is_mirrored/1, update_mirrors/2]). +%% for testing +-export([suggested_queue_nodes/4]). -include("rabbit.hrl"). @@ -204,41 +206,51 @@ store_updated_slaves(Q = #amqqueue{slave_pids = SPids, %%---------------------------------------------------------------------------- -%% TODO this should take account of current nodes so we don't throw -%% away mirrors or change the master needlessly suggested_queue_nodes(Q) -> - case [rabbit_policy:get(P, Q) || P <- [<<"ha-mode">>, <<"ha-params">>]] of - [{ok, <<"all">>}, _] -> - {node(), rabbit_mnesia:all_clustered_nodes() -- [node()]}; - [{ok, <<"nodes">>}, {ok, Nodes}] -> - case [list_to_atom(binary_to_list(Node)) || Node <- Nodes] of - [Node] -> {Node, []}; - [First | Rest] -> {First, Rest} - end; - [{ok, <<"at-least">>}, {ok, Count}] -> - {node(), lists:sublist( - rabbit_mnesia:all_clustered_nodes(), Count) -- [node()]}; - _ -> - {node(), []} + {MNode0, SNodes} = actual_queue_nodes(Q), + MNode = case MNode0 of + none -> node(); + _ -> MNode0 + end, + suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q), + {MNode, SNodes}, rabbit_mnesia:all_clustered_nodes()). + +policy(Policy, Q) -> + case rabbit_policy:get(Policy, Q) of + {ok, P} -> P; + _ -> none end. +suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, All) -> + {MNode, All -- [MNode]}; +suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, _All) -> + Nodes = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0], + case lists:member(MNode, Nodes) of + true -> {MNode, Nodes -- [MNode]}; + false -> {hd(Nodes), tl(Nodes)} + end; +suggested_queue_nodes(<<"at-least">>, Count, {MNode, SNodes}, All) -> + SCount = Count - 1, + {MNode, case SCount > length(SNodes) of + true -> Cand = (All -- [MNode]) -- SNodes, + SNodes ++ lists:sublist(Cand, SCount - length(SNodes)); + false -> lists:sublist(SNodes, SCount) + end}; +suggested_queue_nodes(_, _, {MNode, _}, _) -> + {MNode, []}. + actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) -> - MNode = case MPid of - undefined -> undefined; - _ -> node(MPid) - end, - SNodes = case SPids of - undefined -> undefined; - _ -> [node(Pid) || Pid <- SPids] - end, - {MNode, SNodes}. + {case MPid of + none -> none; + _ -> node(MPid) + end, [node(Pid) || Pid <- SPids]}. is_mirrored(Q) -> - case rabbit_policy:get(<<"ha-mode">>, Q) of - {ok, <<"all">>} -> true; - {ok, <<"nodes">>} -> true; - {ok, <<"at-least">>} -> true; - _ -> false + case policy(<<"ha-mode">>, Q) of + <<"all">> -> true; + <<"nodes">> -> true; + <<"at-least">> -> true; + _ -> false end. update_mirrors(OldQ = #amqqueue{name = QName, pid = QPid}, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index bb60bd12..bc30fb4c 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -52,6 +52,7 @@ all_tests() -> passed = test_log_management_during_startup(), passed = test_statistics(), passed = test_arguments_parser(), + passed = test_dynamic_mirroring(), passed = test_cluster_management(), passed = test_user_management(), passed = test_runtime_parameters(), @@ -856,6 +857,34 @@ test_arguments_parser() -> passed. +test_dynamic_mirroring() -> + %% Just unit tests of the node selection logic, see multi node + %% tests for the rest... + Test = fun ({NewM, NewSs}, Policy, Params, {OldM, OldSs}, All) -> + {NewM, NewSs0} = + rabbit_mirror_queue_misc:suggested_queue_nodes( + Policy, Params, {OldM, OldSs}, All), + NewSs = lists:sort(NewSs0) + end, + + Test({a,[b,c]},<<"all">>,'_',{a,[]}, [a,b,c]), + Test({a,[b,c]},<<"all">>,'_',{a,[b,c]},[a,b,c]), + Test({a,[b,c]},<<"all">>,'_',{a,[d]}, [a,b,c]), + + Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]), + Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[b]},[a,b,c,d]), + Test({b,[a,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]), + Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{d,[a]},[a,b,c,d]), + + Test({a,[b]}, <<"at-least">>,2,{a,[]}, [a,b,c,d]), + Test({a,[b,c]},<<"at-least">>,3,{a,[]}, [a,b,c,d]), + Test({a,[c]}, <<"at-least">>,2,{a,[c]}, [a,b,c,d]), + Test({a,[b,c]},<<"at-least">>,3,{a,[c]}, [a,b,c,d]), + Test({a,[c]}, <<"at-least">>,2,{a,[c,d]},[a,b,c,d]), + Test({a,[c,d]},<<"at-least">>,3,{a,[c,d]},[a,b,c,d]), + + passed. + test_cluster_management() -> %% 'cluster' and 'reset' should only work if the app is stopped {error, _} = control_action(cluster, []), -- cgit v1.2.1 From 1c5d42cc4f2a309ca4d5230e64ca65477cbb5fc6 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 7 Aug 2012 13:30:10 +0100 Subject: Support changing master (umm, which actually turned out to be a lot easier than expected, thanks Matthew). --- src/rabbit_mirror_queue_misc.erl | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 0469f5f2..87be4df7 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -259,14 +259,11 @@ update_mirrors(OldQ = #amqqueue{name = QName, pid = QPid}, {false, false} -> ok; {true, false} -> rabbit_amqqueue:stop_mirroring(QPid); {false, true} -> rabbit_amqqueue:start_mirroring(QPid); - {true, true} -> {OldMNode, OldSNodes} = actual_queue_nodes(OldQ), - {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ), - case OldMNode of - NewMNode -> ok; - _ -> io:format("TODO: master needs to change for ~p~n", [NewQ]) - end, - Add = NewSNodes -- OldSNodes, - Remove = OldSNodes -- NewSNodes, - [ok = drop_mirror(QName, SNode) || SNode <- Remove], - [ok = add_mirror(QName, SNode) || SNode <- Add] + {true, true} -> All = fun ({A,B}) -> [A|B] end, + OldNodes = All(actual_queue_nodes(OldQ)), + NewNodes = All(suggested_queue_nodes(NewQ)), + Add = NewNodes -- OldNodes, + Remove = OldNodes -- NewNodes, + [ok = drop_mirror(QName, Node) || Node <- Remove], + [ok = add_mirror(QName, Node) || Node <- Add] end. -- cgit v1.2.1 From 28cfa44999c3dce3bde80f8b6e35b3178a3795f1 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 7 Aug 2012 13:40:10 +0100 Subject: Reify something that was previously a little bit magic. --- src/rabbit_mirror_queue_misc.erl | 17 +++++++++++------ src/rabbit_mirror_queue_slave.erl | 2 ++ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 87be4df7..5f36a19f 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -68,11 +68,11 @@ remove_from_queue(QueueName, DeadPids) -> [] -> {error, not_found}; [Q = #amqqueue { pid = QPid, slave_pids = SPids }] -> - [QPid1 | SPids1] = Alive = - [Pid || Pid <- [QPid | SPids], - not lists:member(node(Pid), - DeadNodes) orelse - rabbit_misc:is_process_alive(Pid)], + Alive = [Pid || Pid <- [QPid | SPids], + not lists:member(node(Pid), + DeadNodes) orelse + rabbit_misc:is_process_alive(Pid)], + {QPid1, SPids1} = promote_slave(Alive), case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> {ok, QPid1, []}; @@ -206,6 +206,11 @@ store_updated_slaves(Q = #amqqueue{slave_pids = SPids, %%---------------------------------------------------------------------------- +promote_slave([SPid | SPids]) -> + %% The slave pids are maintained in descending order of age, so + %% the one to promote is the oldest. + {SPid, SPids}. + suggested_queue_nodes(Q) -> {MNode0, SNodes} = actual_queue_nodes(Q), MNode = case MNode0 of @@ -227,7 +232,7 @@ suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, _All) -> Nodes = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0], case lists:member(MNode, Nodes) of true -> {MNode, Nodes -- [MNode]}; - false -> {hd(Nodes), tl(Nodes)} + false -> promote_slave(Nodes) end; suggested_queue_nodes(<<"at-least">>, Count, {MNode, SNodes}, All) -> SCount = Count - 1, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index f8f83d45..3fc33f72 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -154,6 +154,8 @@ init_it(Self, Node, QueueName) -> mnesia:read({rabbit_queue, QueueName}), case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of [] -> + %% Add to the end, so they are in descending order of age, see + %% rabbit_mirror_queue_misc:promote_slave/1 MPids1 = MPids ++ [Self], rabbit_mirror_queue_misc:store_updated_slaves( Q1#amqqueue{slave_pids = MPids1}), -- cgit v1.2.1 From 1ab8e0cbc66e711c3182d7ae734c704d5b0f183b Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 17 Aug 2012 12:16:05 +0100 Subject: at-least mode can imply that we need to start slaves in response to slaves dying elsewhere. So do that. --- src/rabbit_mirror_queue_misc.erl | 40 ++++++++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 5f36a19f..52846f58 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -58,38 +58,58 @@ %% slave (now master) receives messages it's not ready for (for %% example, new consumers). %% Returns {ok, NewMPid, DeadPids} -remove_from_queue(QueueName, DeadPids) -> - DeadNodes = [node(DeadPid) || DeadPid <- DeadPids], + +remove_from_queue(QueueName, DeadGMPids) -> + case remove_from_queue0(QueueName, DeadGMPids) of + {ok, NewMPid, DeadQPids, ExtraNodes} -> + [ok = add_mirror(QueueName, Node) || Node <- ExtraNodes], + {ok, NewMPid, DeadQPids}; + Other -> + Other + end. + +remove_from_queue0(QueueName, DeadGMPids) -> + DeadNodes = [node(DeadGMPid) || DeadGMPid <- DeadGMPids], rabbit_misc:execute_mnesia_transaction( fun () -> %% Someone else could have deleted the queue before we %% get here. case mnesia:read({rabbit_queue, QueueName}) of [] -> {error, not_found}; - [Q = #amqqueue { pid = QPid, + [Q = #amqqueue { name = QName, + pid = QPid, slave_pids = SPids }] -> Alive = [Pid || Pid <- [QPid | SPids], not lists:member(node(Pid), DeadNodes) orelse - rabbit_misc:is_process_alive(Pid)], + %% TODO when bug 25104 hits default do whatever it does. + false], + %% rabbit_misc:is_process_alive(Pid)], {QPid1, SPids1} = promote_slave(Alive), case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> - {ok, QPid1, []}; + {ok, QPid1, [], []}; _ when QPid =:= QPid1 orelse node(QPid1) =:= node() -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have %% become the master. - store_updated_slaves( - Q #amqqueue { pid = QPid1, - slave_pids = SPids1 }), - {ok, QPid1, [QPid | SPids] -- Alive}; + Q1 = store_updated_slaves( + Q #amqqueue { pid = QPid1, + slave_pids = SPids1 }), + %% Sometimes a slave dying means we need + %% to start more on other nodes - + %% "at-least" mode can cause this to + %% happen. + {_, OldNodes} = actual_queue_nodes(Q1), + {_, NewNodes} = suggested_queue_nodes(Q1), + {ok, QPid1, [QPid | SPids] -- Alive, + NewNodes -- OldNodes}; _ -> %% Master has changed, and we're not it, %% so leave alone to allow the promoted %% slave to find it and make its %% promotion atomic. - {ok, QPid1, []} + {ok, QPid1, [], []} end end end). -- cgit v1.2.1 From 37267bda71d777c8eb3919085f356819ee678617 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 22 Aug 2012 16:00:38 +0100 Subject: s/at-least/exactly/g --- src/rabbit_mirror_queue_misc.erl | 12 ++++++------ src/rabbit_tests.erl | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 0383a15b..203ad651 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -93,7 +93,7 @@ remove_from_queue0(QueueName, DeadGMPids) -> slave_pids = SPids1 }), %% Sometimes a slave dying means we need %% to start more on other nodes - - %% "at-least" mode can cause this to + %% "exactly" mode can cause this to %% happen. {_, OldNodes} = actual_queue_nodes(Q1), {_, NewNodes} = suggested_queue_nodes(Q1), @@ -249,7 +249,7 @@ suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, _All) -> true -> {MNode, Nodes -- [MNode]}; false -> promote_slave(Nodes) end; -suggested_queue_nodes(<<"at-least">>, Count, {MNode, SNodes}, All) -> +suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, All) -> SCount = Count - 1, {MNode, case SCount > length(SNodes) of true -> Cand = (All -- [MNode]) -- SNodes, @@ -267,10 +267,10 @@ actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) -> is_mirrored(Q) -> case policy(<<"ha-mode">>, Q) of - <<"all">> -> true; - <<"nodes">> -> true; - <<"at-least">> -> true; - _ -> false + <<"all">> -> true; + <<"nodes">> -> true; + <<"exactly">> -> true; + _ -> false end. update_mirrors(OldQ = #amqqueue{name = QName, pid = QPid}, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 84d7e96a..6e185751 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -874,12 +874,12 @@ test_dynamic_mirroring() -> Test({b,[a,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]), Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{d,[a]},[a,b,c,d]), - Test({a,[b]}, <<"at-least">>,2,{a,[]}, [a,b,c,d]), - Test({a,[b,c]},<<"at-least">>,3,{a,[]}, [a,b,c,d]), - Test({a,[c]}, <<"at-least">>,2,{a,[c]}, [a,b,c,d]), - Test({a,[b,c]},<<"at-least">>,3,{a,[c]}, [a,b,c,d]), - Test({a,[c]}, <<"at-least">>,2,{a,[c,d]},[a,b,c,d]), - Test({a,[c,d]},<<"at-least">>,3,{a,[c,d]},[a,b,c,d]), + Test({a,[b]}, <<"exactly">>,2,{a,[]}, [a,b,c,d]), + Test({a,[b,c]},<<"exactly">>,3,{a,[]}, [a,b,c,d]), + Test({a,[c]}, <<"exactly">>,2,{a,[c]}, [a,b,c,d]), + Test({a,[b,c]},<<"exactly">>,3,{a,[c]}, [a,b,c,d]), + Test({a,[c]}, <<"exactly">>,2,{a,[c,d]},[a,b,c,d]), + Test({a,[c,d]},<<"exactly">>,3,{a,[c,d]},[a,b,c,d]), passed. -- cgit v1.2.1 From 6bc19d6d7e6ccd7305389335f88736719be3eb02 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 23 Aug 2012 11:36:06 +0100 Subject: Fix broken merge --- src/rabbit_mirror_queue_misc.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 203ad651..e4591cca 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -76,11 +76,11 @@ remove_from_queue0(QueueName, DeadGMPids) -> %% get here. case mnesia:read({rabbit_queue, QueueName}) of [] -> {error, not_found}; - [Q = #amqqueue { name = QName, - pid = QPid, + [Q = #amqqueue { pid = QPid, slave_pids = SPids }] -> Alive = [Pid || Pid <- [QPid | SPids], not lists:member(node(Pid), DeadNodes)], + {QPid1, SPids1} = promote_slave(Alive), case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> {ok, QPid1, [], []}; -- cgit v1.2.1 From bf31e4e01ff5036720bb2445f057674012300c43 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 23 Aug 2012 15:24:37 +0100 Subject: Suggest queue nodes based on running nodes, otherwise (at the very least) we can immediately try to start a mirror on a node that has just gone down. --- src/rabbit_mirror_queue_misc.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index e4591cca..7caa96b5 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -233,7 +233,8 @@ suggested_queue_nodes(Q) -> _ -> MNode0 end, suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q), - {MNode, SNodes}, rabbit_mnesia:all_clustered_nodes()). + {MNode, SNodes}, + rabbit_mnesia:running_clustered_nodes()). policy(Policy, Q) -> case rabbit_policy:get(Policy, Q) of -- cgit v1.2.1 From 491b5eccd1ee6d9e851e79d7d8991af2a21e4713 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 4 Sep 2012 17:55:11 +0100 Subject: Reify a touch more. --- src/rabbit_mirror_queue_slave.erl | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 3fc33f72..792adc95 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -154,11 +154,8 @@ init_it(Self, Node, QueueName) -> mnesia:read({rabbit_queue, QueueName}), case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of [] -> - %% Add to the end, so they are in descending order of age, see - %% rabbit_mirror_queue_misc:promote_slave/1 - MPids1 = MPids ++ [Self], rabbit_mirror_queue_misc:store_updated_slaves( - Q1#amqqueue{slave_pids = MPids1}), + Q1#amqqueue{slave_pids = add_slave(Self, MPids)}), {new, QPid}; [QPid] -> case rabbit_misc:is_process_alive(QPid) of @@ -168,13 +165,17 @@ init_it(Self, Node, QueueName) -> [SPid] -> case rabbit_misc:is_process_alive(SPid) of true -> existing; - false -> MPids1 = (MPids -- [SPid]) ++ [Self], + false -> MPids1 = add_slave(Self, MPids -- [SPid]), rabbit_mirror_queue_misc:store_updated_slaves( Q1#amqqueue{slave_pids = MPids1}), {new, QPid} end end. +%% Add to the end, so they are in descending order of age, see +%% rabbit_mirror_queue_misc:promote_slave/1 +add_slave(New, MPids) -> MPids ++ [New]. + handle_call({deliver, Delivery = #delivery { immediate = true }}, From, State) -> %% It is safe to reply 'false' here even if a) we've not seen the -- cgit v1.2.1 From fe3106415e65713a034ff72bbf9d2842d7e7e889 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 4 Sep 2012 18:13:46 +0100 Subject: more reification, plus cosmetics --- src/rabbit_mirror_queue_slave.erl | 31 +++++++++++++------------------ 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 792adc95..61bd54b9 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -153,28 +153,23 @@ init_it(Self, Node, QueueName) -> [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = mnesia:read({rabbit_queue, QueueName}), case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of - [] -> - rabbit_mirror_queue_misc:store_updated_slaves( - Q1#amqqueue{slave_pids = add_slave(Self, MPids)}), - {new, QPid}; - [QPid] -> - case rabbit_misc:is_process_alive(QPid) of - true -> duplicate_live_master; - false -> {stale, QPid} - end; - [SPid] -> - case rabbit_misc:is_process_alive(SPid) of - true -> existing; - false -> MPids1 = add_slave(Self, MPids -- [SPid]), - rabbit_mirror_queue_misc:store_updated_slaves( - Q1#amqqueue{slave_pids = MPids1}), - {new, QPid} - end + [] -> add_slave(Q1, Self, MPids), + {new, QPid}; + [QPid] -> case rabbit_misc:is_process_alive(QPid) of + true -> duplicate_live_master; + false -> {stale, QPid} + end; + [SPid] -> case rabbit_misc:is_process_alive(SPid) of + true -> existing; + false -> add_slave(Q1, Self, MPids -- [SPid]), + {new, QPid} + end end. %% Add to the end, so they are in descending order of age, see %% rabbit_mirror_queue_misc:promote_slave/1 -add_slave(New, MPids) -> MPids ++ [New]. +add_slave(Q, New, MPids) -> rabbit_mirror_queue_misc:store_updated_slaves( + Q#amqqueue{slave_pids = MPids ++ [New]}). handle_call({deliver, Delivery = #delivery { immediate = true }}, From, State) -> -- cgit v1.2.1 From 4d81ce03d196bf97ce3fda8969095e00e895d6ee Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 5 Sep 2012 10:08:51 +0100 Subject: Reduce difference to default --- src/rabbit_mirror_queue_slave.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 30f13f10..5ed5f063 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -927,9 +927,9 @@ set_synchronised(true, State = #state { q = #amqqueue { name = QName }, case mnesia:read({rabbit_queue, QName}) of [] -> ok; - [Q = #amqqueue{sync_slave_pids = SSPids}] -> - Q1 = Q#amqqueue{sync_slave_pids = [Self | SSPids]}, - rabbit_mirror_queue_misc:store_updated_slaves(Q1) + [Q1 = #amqqueue{sync_slave_pids = SSPids}] -> + Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]}, + rabbit_mirror_queue_misc:store_updated_slaves(Q2) end end), State #state { synchronised = true }; -- cgit v1.2.1 From 6578389b1df1937fb08bb7a1621b50f09cb7416e Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 5 Sep 2012 10:27:28 +0100 Subject: Add specs, remove misleading "%% temp" comment. --- src/rabbit_amqqueue.erl | 4 ++-- src/rabbit_mirror_queue_master.erl | 4 +++- src/rabbit_mirror_queue_misc.erl | 16 ++++++++++------ 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 96cf226e..65d3001a 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -29,8 +29,6 @@ -export([notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). - -%% temp -export([start_mirroring/1, stop_mirroring/1]). %% internal @@ -165,6 +163,8 @@ -spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(policy_changed/2 :: (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). +-spec(start_mirroring/1 :: (pid()) -> 'ok'). +-spec(stop_mirroring/1 :: (pid()) -> 'ok'). -endif. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 9f2305ba..e35b0808 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -27,7 +27,6 @@ -export([promote_backing_queue_state/6, sender_death_fun/0, length_fun/0]). -%% temp -export([init_with_existing_bq/3, stop_mirroring/1]). -behaviour(rabbit_backing_queue). @@ -66,6 +65,9 @@ (pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()). -spec(sender_death_fun/0 :: () -> death_fun()). -spec(length_fun/0 :: () -> length_fun()). +-spec(init_with_existing_bq/3 :: (rabbit_types:amqqueue(), atom(), any()) -> + master_state()). +-spec(stop_mirroring/1 :: (master_state()) -> {atom(), any()}). -endif. diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 7caa96b5..5ad04ff7 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -18,14 +18,12 @@ -export([remove_from_queue/2, on_node_up/0, drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3, - report_deaths/4, store_updated_slaves/1]). + report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1, + is_mirrored/1, update_mirrors/2]). -%% temp --export([suggested_queue_nodes/1, is_mirrored/1, update_mirrors/2]). -%% for testing +%% for testing only -export([suggested_queue_nodes/4]). - -include("rabbit.hrl"). %%---------------------------------------------------------------------------- @@ -45,6 +43,11 @@ -> rabbit_types:ok_or_error(any())). -spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) -> rabbit_types:amqqueue()). +-spec(suggested_queue_nodes/1 :: (rabbit_types:amqqueue()) -> + {node(), [node()]}). +-spec(is_mirrored/1 :: (rabbit_types:amqqueue()) -> boolean()). +-spec(update_mirrors/2 :: + (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). -endif. @@ -286,5 +289,6 @@ update_mirrors(OldQ = #amqqueue{name = QName, pid = QPid}, Add = NewNodes -- OldNodes, Remove = OldNodes -- NewNodes, [ok = drop_mirror(QName, Node) || Node <- Remove], - [ok = add_mirror(QName, Node) || Node <- Add] + [ok = add_mirror(QName, Node) || Node <- Add], + ok end. -- cgit v1.2.1 From 7d58dbfd96c6a5b87e7a5a3520106dacbf365254 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 5 Sep 2012 10:39:46 +0100 Subject: No we don't. The point is that if a mirror is dropping out of the queue, does it need to explicitly leave the GM group? The answer is no: the master is linked to the coordinator is linked to the GM / the slave is linked to the GM. --- src/rabbit_mirror_queue_master.erl | 1 - src/rabbit_mirror_queue_slave.erl | 1 - 2 files changed, 2 deletions(-) diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index e35b0808..473d9671 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -126,7 +126,6 @@ terminate({shutdown, dropped} = Reason, %% in without this node being restarted. Thus we must do the full %% blown delete_and_terminate now, but only locally: we do not %% broadcast delete_and_terminate. - ok = gm:leave(GM), %% TODO presumably we need this? State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), set_delivered = 0 }; terminate(Reason, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 5ed5f063..f245f913 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -297,7 +297,6 @@ terminate({shutdown, dropped} = R, #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> %% See rabbit_mirror_queue_master:terminate/2 - ok = gm:leave(GM), %% TODO presumably we need this? BQ:delete_and_terminate(R, BQS); terminate(Reason, #state { q = Q, gm = GM, -- cgit v1.2.1 From 1b0b6a904b2071582e4745c6cd621cda8427763f Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 5 Sep 2012 10:41:13 +0100 Subject: Gah --- src/rabbit_mirror_queue_master.erl | 3 +-- src/rabbit_mirror_queue_slave.erl | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 473d9671..8b71060c 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -117,8 +117,7 @@ stop_mirroring(State = #state { coordinator = CPid, {BQ, BQS}. terminate({shutdown, dropped} = Reason, - State = #state { gm = GM, - backing_queue = BQ, + State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> %% Backing queue termination - this node has been explicitly %% dropped. Normally, non-durable queues would be tidied up on diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index f245f913..9a4d5cbe 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -293,8 +293,7 @@ terminate(_Reason, #state { backing_queue_state = undefined }) -> %% We've received a delete_and_terminate from gm, thus nothing to %% do here. ok; -terminate({shutdown, dropped} = R, #state { gm = GM, - backing_queue = BQ, +terminate({shutdown, dropped} = R, #state { backing_queue = BQ, backing_queue_state = BQS }) -> %% See rabbit_mirror_queue_master:terminate/2 BQ:delete_and_terminate(R, BQS); -- cgit v1.2.1 From 798fd411d873ad2b5f0fbee58ad2011717fa8529 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 5 Sep 2012 13:25:03 +0100 Subject: Call add_mirror/2 before drop_mirror/2, and explain why. --- src/rabbit_mirror_queue_misc.erl | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 5ad04ff7..9fb18457 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -288,7 +288,17 @@ update_mirrors(OldQ = #amqqueue{name = QName, pid = QPid}, NewNodes = All(suggested_queue_nodes(NewQ)), Add = NewNodes -- OldNodes, Remove = OldNodes -- NewNodes, - [ok = drop_mirror(QName, Node) || Node <- Remove], + %% When a mirror dies, remove_from_queue/2 + %% might have to add new slaves (in + %% "exactly" mode). It will check mnesia to + %% see which slaves there currently are. If + %% drop_mirror/2 is invoked first then when + %% we end up in remove_from_queue/2 it will + %% not see the slaves that add_mirror/2 will + %% add, and also want to add them (even + %% though we are not responding to the death + %% of a mirror). Breakage ensues. [ok = add_mirror(QName, Node) || Node <- Add], + [ok = drop_mirror(QName, Node) || Node <- Remove], ok end. -- cgit v1.2.1 From 1222f40fffaacc43a1741757f084952e260e6d0c Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 5 Sep 2012 13:28:04 +0100 Subject: Cosmetic: give that comment room to breath. --- src/rabbit_mirror_queue_misc.erl | 41 ++++++++++++++++++++-------------------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 9fb18457..5217e276 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -277,28 +277,29 @@ is_mirrored(Q) -> _ -> false end. -update_mirrors(OldQ = #amqqueue{name = QName, pid = QPid}, - NewQ = #amqqueue{name = QName, pid = QPid}) -> +update_mirrors(OldQ = #amqqueue{pid = QPid}, + NewQ = #amqqueue{pid = QPid}) -> case {is_mirrored(OldQ), is_mirrored(NewQ)} of {false, false} -> ok; {true, false} -> rabbit_amqqueue:stop_mirroring(QPid); {false, true} -> rabbit_amqqueue:start_mirroring(QPid); - {true, true} -> All = fun ({A,B}) -> [A|B] end, - OldNodes = All(actual_queue_nodes(OldQ)), - NewNodes = All(suggested_queue_nodes(NewQ)), - Add = NewNodes -- OldNodes, - Remove = OldNodes -- NewNodes, - %% When a mirror dies, remove_from_queue/2 - %% might have to add new slaves (in - %% "exactly" mode). It will check mnesia to - %% see which slaves there currently are. If - %% drop_mirror/2 is invoked first then when - %% we end up in remove_from_queue/2 it will - %% not see the slaves that add_mirror/2 will - %% add, and also want to add them (even - %% though we are not responding to the death - %% of a mirror). Breakage ensues. - [ok = add_mirror(QName, Node) || Node <- Add], - [ok = drop_mirror(QName, Node) || Node <- Remove], - ok + {true, true} -> update_mirrors0(OldQ, NewQ) end. + +update_mirrors0(OldQ = #amqqueue{name = QName}, + NewQ = #amqqueue{name = QName}) -> + All = fun ({A,B}) -> [A|B] end, + OldNodes = All(actual_queue_nodes(OldQ)), + NewNodes = All(suggested_queue_nodes(NewQ)), + Add = NewNodes -- OldNodes, + Remove = OldNodes -- NewNodes, + %% When a mirror dies, remove_from_queue/2 might have to add new + %% slaves (in "exactly" mode). It will check mnesia to see which + %% slaves there currently are. If drop_mirror/2 is invoked first + %% then when we end up in remove_from_queue/2 it will not see the + %% slaves that add_mirror/2 will add, and also want to add them + %% (even though we are not responding to the death of a + %% mirror). Breakage ensues. + [ok = add_mirror(QName, Node) || Node <- Add], + [ok = drop_mirror(QName, Node) || Node <- Remove], + ok. -- cgit v1.2.1 From dc63b6550d0fca9ec60c1fb66f5530923f66fc6d Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 5 Sep 2012 14:54:17 +0100 Subject: Also exclude DeadNodes from the list of new nodes to start mirrors on; we have checked for running_clustered_nodes() but that could be out of date. --- src/rabbit_mirror_queue_misc.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 5217e276..ad9dfa0d 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -101,7 +101,7 @@ remove_from_queue0(QueueName, DeadGMPids) -> {_, OldNodes} = actual_queue_nodes(Q1), {_, NewNodes} = suggested_queue_nodes(Q1), {ok, QPid1, [QPid | SPids] -- Alive, - NewNodes -- OldNodes}; + (NewNodes -- OldNodes) -- DeadNodes}; _ -> %% Master has changed, and we're not it, %% so leave alone to allow the promoted -- cgit v1.2.1 From df6fc31afc8c28655a9a079f9998f7ef022c4406 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Wed, 5 Sep 2012 15:18:15 +0100 Subject: cosmetic --- src/rabbit_mirror_queue_misc.erl | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index ad9dfa0d..ebaae995 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -291,8 +291,6 @@ update_mirrors0(OldQ = #amqqueue{name = QName}, All = fun ({A,B}) -> [A|B] end, OldNodes = All(actual_queue_nodes(OldQ)), NewNodes = All(suggested_queue_nodes(NewQ)), - Add = NewNodes -- OldNodes, - Remove = OldNodes -- NewNodes, %% When a mirror dies, remove_from_queue/2 might have to add new %% slaves (in "exactly" mode). It will check mnesia to see which %% slaves there currently are. If drop_mirror/2 is invoked first @@ -300,6 +298,6 @@ update_mirrors0(OldQ = #amqqueue{name = QName}, %% slaves that add_mirror/2 will add, and also want to add them %% (even though we are not responding to the death of a %% mirror). Breakage ensues. - [ok = add_mirror(QName, Node) || Node <- Add], - [ok = drop_mirror(QName, Node) || Node <- Remove], + [ok = add_mirror(QName, Node) || Node <- NewNodes -- OldNodes], + [ok = drop_mirror(QName, Node) || Node <- OldNodes -- NewNodes], ok. -- cgit v1.2.1 From 1fd784d838b780bc282bdf726f6a74a27928ca66 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 17 Sep 2012 13:27:16 +0100 Subject: Cope with the fact that rabbit_mnesia:running_clustered_nodes/0 now does not include node() during boot. --- src/rabbit_mirror_queue_misc.erl | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index ebaae995..5325f1be 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -236,8 +236,17 @@ suggested_queue_nodes(Q) -> _ -> MNode0 end, suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q), - {MNode, SNodes}, - rabbit_mnesia:running_clustered_nodes()). + {MNode, SNodes}, clusterable_nodes()). + +clusterable_nodes() -> + %% We may end up here via on_node_up/0, in which case we are still + %% booting - rabbit_mnesia:running_clustered_nodes/0 will report + %% us as not running. + Nodes = rabbit_mnesia:running_clustered_nodes(), + case lists:member(node(), Nodes) of + true -> Nodes; + false -> [node() | Nodes] + end. policy(Policy, Q) -> case rabbit_policy:get(Policy, Q) of -- cgit v1.2.1 From 39cc7a0a0185de0f4868354344013648f8228a18 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 17 Sep 2012 13:36:48 +0100 Subject: Simplify --- src/rabbit_mirror_queue_misc.erl | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 5325f1be..f53319c0 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -242,11 +242,7 @@ clusterable_nodes() -> %% We may end up here via on_node_up/0, in which case we are still %% booting - rabbit_mnesia:running_clustered_nodes/0 will report %% us as not running. - Nodes = rabbit_mnesia:running_clustered_nodes(), - case lists:member(node(), Nodes) of - true -> Nodes; - false -> [node() | Nodes] - end. + lists:usort([node() | rabbit_mnesia:running_clustered_nodes()]). policy(Policy, Q) -> case rabbit_policy:get(Policy, Q) of -- cgit v1.2.1 From ae391b362ac1f447e5b4b975b64c71a0b592b088 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 18 Sep 2012 11:42:55 +0100 Subject: Pull our attempts to list all nodes out of transactions and loops. --- src/rabbit_mirror_queue_misc.erl | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index f53319c0..603a490b 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -73,6 +73,7 @@ remove_from_queue(QueueName, DeadGMPids) -> remove_from_queue0(QueueName, DeadGMPids) -> DeadNodes = [node(DeadGMPid) || DeadGMPid <- DeadGMPids], + ClusterNodes = clusterable_nodes() -- DeadNodes, rabbit_misc:execute_mnesia_transaction( fun () -> %% Someone else could have deleted the queue before we @@ -99,9 +100,10 @@ remove_from_queue0(QueueName, DeadGMPids) -> %% "exactly" mode can cause this to %% happen. {_, OldNodes} = actual_queue_nodes(Q1), - {_, NewNodes} = suggested_queue_nodes(Q1), + {_, NewNodes} = suggested_queue_nodes( + Q1, ClusterNodes), {ok, QPid1, [QPid | SPids] -- Alive, - (NewNodes -- OldNodes) -- DeadNodes}; + NewNodes -- OldNodes}; _ -> %% Master has changed, and we're not it, %% so leave alone to allow the promoted @@ -113,12 +115,14 @@ remove_from_queue0(QueueName, DeadGMPids) -> end). on_node_up() -> + ClusterNodes = clusterable_nodes(), QNames = rabbit_misc:execute_mnesia_transaction( fun () -> mnesia:foldl( fun (Q = #amqqueue{name = QName}, QNames0) -> - {_MNode, SNodes} = suggested_queue_nodes(Q), + {_MNode, SNodes} = suggested_queue_nodes( + Q, ClusterNodes), case lists:member(node(), SNodes) of true -> [QName | QNames0]; false -> QNames0 @@ -229,15 +233,21 @@ promote_slave([SPid | SPids]) -> %% the one to promote is the oldest. {SPid, SPids}. -suggested_queue_nodes(Q) -> +suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, clusterable_nodes()). + +%% This variant exists so we can pull a call to clusterable_nodes() +%% out of a loop or transaction or both. +suggested_queue_nodes(Q, ClusterNodes) -> {MNode0, SNodes} = actual_queue_nodes(Q), MNode = case MNode0 of none -> node(); _ -> MNode0 end, suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q), - {MNode, SNodes}, clusterable_nodes()). + {MNode, SNodes}, ClusterNodes). +%% TODO we should probably just redefine +%% rabbit_mnesia:running_clustered_nodes/0? Waiting on Francesco. clusterable_nodes() -> %% We may end up here via on_node_up/0, in which case we are still %% booting - rabbit_mnesia:running_clustered_nodes/0 will report -- cgit v1.2.1 From 7e46234ac9183856ddc1fc063cef780190f6c086 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 19 Sep 2012 14:34:59 +0100 Subject: That hack should not be needed any more. --- src/rabbit_mirror_queue_misc.erl | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 603a490b..295f12da 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -73,7 +73,7 @@ remove_from_queue(QueueName, DeadGMPids) -> remove_from_queue0(QueueName, DeadGMPids) -> DeadNodes = [node(DeadGMPid) || DeadGMPid <- DeadGMPids], - ClusterNodes = clusterable_nodes() -- DeadNodes, + ClusterNodes = rabbit_mnesia:running_clustered_nodes() -- DeadNodes, rabbit_misc:execute_mnesia_transaction( fun () -> %% Someone else could have deleted the queue before we @@ -115,7 +115,7 @@ remove_from_queue0(QueueName, DeadGMPids) -> end). on_node_up() -> - ClusterNodes = clusterable_nodes(), + ClusterNodes = rabbit_mnesia:running_clustered_nodes(), QNames = rabbit_misc:execute_mnesia_transaction( fun () -> @@ -233,10 +233,12 @@ promote_slave([SPid | SPids]) -> %% the one to promote is the oldest. {SPid, SPids}. -suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, clusterable_nodes()). +suggested_queue_nodes(Q) -> + suggested_queue_nodes(Q, rabbit_mnesia:running_clustered_nodes()). -%% This variant exists so we can pull a call to clusterable_nodes() -%% out of a loop or transaction or both. +%% This variant exists so we can pull a call to +%% rabbit_mnesia:running_clustered_nodes() out of a loop or +%% transaction or both. suggested_queue_nodes(Q, ClusterNodes) -> {MNode0, SNodes} = actual_queue_nodes(Q), MNode = case MNode0 of @@ -246,14 +248,6 @@ suggested_queue_nodes(Q, ClusterNodes) -> suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q), {MNode, SNodes}, ClusterNodes). -%% TODO we should probably just redefine -%% rabbit_mnesia:running_clustered_nodes/0? Waiting on Francesco. -clusterable_nodes() -> - %% We may end up here via on_node_up/0, in which case we are still - %% booting - rabbit_mnesia:running_clustered_nodes/0 will report - %% us as not running. - lists:usort([node() | rabbit_mnesia:running_clustered_nodes()]). - policy(Policy, Q) -> case rabbit_policy:get(Policy, Q) of {ok, P} -> P; -- cgit v1.2.1 From 2965eaf2eba0db8dc59dd7a73a33f278fc1723e6 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 20 Sep 2012 12:05:07 +0100 Subject: Start new slaves in response to gm deaths a bit later, to prevent deadlock. Also remove {add,drop}_mirror/3 as not used, and don't export drop_mirror/2 as not used outside the module. --- src/rabbit_mirror_queue_coordinator.erl | 3 ++- src/rabbit_mirror_queue_master.erl | 2 +- src/rabbit_mirror_queue_misc.erl | 34 +++++++++++---------------------- src/rabbit_mirror_queue_slave.erl | 20 ++++++++++++------- 4 files changed, 27 insertions(+), 32 deletions(-) diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 4455b441..5284000b 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -344,9 +344,10 @@ handle_cast({gm_deaths, Deaths}, State = #state { q = #amqqueue { name = QueueName, pid = MPid } }) when node(MPid) =:= node() -> case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of - {ok, MPid, DeadPids} -> + {ok, MPid, DeadPids, ExtraNodes} -> rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName, DeadPids), + rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes), noreply(State); {error, not_found} -> {stop, normal, State} diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 4c6f9bbd..b7db04fe 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -97,7 +97,7 @@ init_with_existing_bq(#amqqueue { name = QName } = Q, BQ, BQS) -> Q, undefined, sender_death_fun(), length_fun()), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), - [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- SNodes], + rabbit_mirror_queue_misc:add_mirrors(QName, SNodes), ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), #state { gm = GM, coordinator = CPid, diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 295f12da..090948e6 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -16,8 +16,7 @@ -module(rabbit_mirror_queue_misc). --export([remove_from_queue/2, on_node_up/0, - drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3, +-export([remove_from_queue/2, on_node_up/0, add_mirrors/2, add_mirror/2, report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1, is_mirrored/1, update_mirrors/2]). @@ -32,15 +31,11 @@ -spec(remove_from_queue/2 :: (rabbit_amqqueue:name(), [pid()]) - -> {'ok', pid(), [pid()]} | {'error', 'not_found'}). + -> {'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}). -spec(on_node_up/0 :: () -> 'ok'). --spec(drop_mirror/2 :: - (rabbit_amqqueue:name(), node()) -> rabbit_types:ok_or_error(any())). +-spec(add_mirrors/2 :: (rabbit_amqqueue:name(), [node()]) -> 'ok'). -spec(add_mirror/2 :: (rabbit_amqqueue:name(), node()) -> rabbit_types:ok_or_error(any())). --spec(add_mirror/3 :: - (rabbit_types:vhost(), binary(), atom()) - -> rabbit_types:ok_or_error(any())). -spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) -> rabbit_types:amqqueue()). -spec(suggested_queue_nodes/1 :: (rabbit_types:amqqueue()) -> @@ -63,15 +58,6 @@ %% Returns {ok, NewMPid, DeadPids} remove_from_queue(QueueName, DeadGMPids) -> - case remove_from_queue0(QueueName, DeadGMPids) of - {ok, NewMPid, DeadQPids, ExtraNodes} -> - [ok = add_mirror(QueueName, Node) || Node <- ExtraNodes], - {ok, NewMPid, DeadQPids}; - Other -> - Other - end. - -remove_from_queue0(QueueName, DeadGMPids) -> DeadNodes = [node(DeadGMPid) || DeadGMPid <- DeadGMPids], ClusterNodes = rabbit_mnesia:running_clustered_nodes() -- DeadNodes, rabbit_misc:execute_mnesia_transaction( @@ -132,8 +118,9 @@ on_node_up() -> [add_mirror(QName, node()) || QName <- QNames], ok. -drop_mirror(VHostPath, QueueName, MirrorNode) -> - drop_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). +drop_mirrors(QName, Nodes) -> + [ok = drop_mirror(QName, Node) || Node <- Nodes], + ok. drop_mirror(QName, MirrorNode) -> if_mirrored_queue( @@ -153,8 +140,9 @@ drop_mirror(QName, MirrorNode) -> end end). -add_mirror(VHostPath, QueueName, MirrorNode) -> - add_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). +add_mirrors(QName, Nodes) -> + [ok = add_mirror(QName, Node) || Node <- Nodes], + ok. add_mirror(QName, MirrorNode) -> if_mirrored_queue( @@ -307,6 +295,6 @@ update_mirrors0(OldQ = #amqqueue{name = QName}, %% slaves that add_mirror/2 will add, and also want to add them %% (even though we are not responding to the death of a %% mirror). Breakage ensues. - [ok = add_mirror(QName, Node) || Node <- NewNodes -- OldNodes], - [ok = drop_mirror(QName, Node) || Node <- OldNodes -- NewNodes], + add_mirrors(QName, NewNodes -- OldNodes), + drop_mirrors(QName, OldNodes -- NewNodes), ok. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 12e3058a..bdbf972c 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -200,18 +200,25 @@ handle_call({gm_deaths, Deaths}, From, {error, not_found} -> gen_server2:reply(From, ok), {stop, normal, State}; - {ok, Pid, DeadPids} -> + {ok, Pid, DeadPids, ExtraNodes} -> rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName, DeadPids), if node(Pid) =:= node(MPid) -> %% master hasn't changed - reply(ok, State); + gen_server2:reply(From, ok), + rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes), + noreply(State); node(Pid) =:= node() -> %% we've become master - promote_me(From, State); + QueueState = promote_me(From, State), + rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes), + {become, rabbit_amqqueue_process, QueueState, hibernate}; true -> %% master has changed to not us. gen_server2:reply(From, ok), + %% assertion, we don't need to add_mirrors/2 in this + %% branch, see last clause in remove_from_queue/2 + [] = ExtraNodes, erlang:monitor(process, Pid), %% GM is lazy. So we know of the death of the %% slave since it is a neighbour of ours, but @@ -556,10 +563,9 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)], Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), {Delivery, true} <- queue:to_list(PubQ)], - QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( - Q1, rabbit_mirror_queue_master, MasterState, RateTRef, - AckTags, Deliveries, KS, MTC), - {become, rabbit_amqqueue_process, QueueState, hibernate}. + rabbit_amqqueue_process:init_with_backing_queue_state( + Q1, rabbit_mirror_queue_master, MasterState, RateTRef, AckTags, + Deliveries, KS, MTC). noreply(State) -> {NewState, Timeout} = next_state(State), -- cgit v1.2.1 From 352e716468d82e536ce5e50d8bc34487509f5763 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 20 Sep 2012 13:32:22 +0100 Subject: That case was originally written to cover the unmirrored case - but actually the queue might be mirrored but with no slaves, or about to become mirrored, or whatever. And our is_process_alive() check serves the same purpose. --- src/rabbit_amqqueue.erl | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 452b11b3..9d09742d 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -297,8 +297,6 @@ lookup(Name) -> with(Name, F, E) -> case lookup(Name) of - {ok, Q = #amqqueue{slave_pids = []}} -> - rabbit_misc:with_exit_handler(E, fun () -> F(Q) end); {ok, Q = #amqqueue{pid = QPid}} -> %% We check is_process_alive(QPid) in case we receive a %% nodedown (for example) in F() that has nothing to do -- cgit v1.2.1 From e7552bbd97c63d89b045fd34eece3900ccca7034 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 20 Sep 2012 13:47:33 +0100 Subject: "nodes" policy should not suggest nodes which are not running. Also if nodes policy is completely unconnected with reality, just keep the master alive rather than blow up. --- src/rabbit_mirror_queue_misc.erl | 15 +++++++++++---- src/rabbit_tests.erl | 9 ++++++++- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 090948e6..3b25df6a 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -244,11 +244,18 @@ policy(Policy, Q) -> suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, All) -> {MNode, All -- [MNode]}; -suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, _All) -> +suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, All) -> Nodes = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0], - case lists:member(MNode, Nodes) of - true -> {MNode, Nodes -- [MNode]}; - false -> promote_slave(Nodes) + Unavailable = Nodes -- All, + Available = Nodes -- Unavailable, + case Available of + [] -> %% We have never heard of anything? Not much we can do but + %% keep the master alive. + {MNode, []}; + _ -> case lists:member(MNode, Available) of + true -> {MNode, Available -- [MNode]}; + false -> promote_slave(Available) + end end; suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, All) -> SCount = Count - 1, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 6cb34b09..b1d549fb 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -898,10 +898,17 @@ test_dynamic_mirroring() -> Test({a,[b,c]},<<"all">>,'_',{a,[b,c]},[a,b,c]), Test({a,[b,c]},<<"all">>,'_',{a,[d]}, [a,b,c]), - Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]), + %% Add a node Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[b]},[a,b,c,d]), Test({b,[a,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]), + %% Add two nodes and drop one + Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]), + %% Promote slave to master by policy Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{d,[a]},[a,b,c,d]), + %% Don't try to include nodes that are not running + Test({a,[b]}, <<"nodes">>,[<<"a">>,<<"b">>,<<"f">>],{a,[b]},[a,b,c,d]), + %% If we can't find any of the nodes listed then just keep the master + Test({a,[]}, <<"nodes">>,[<<"f">>,<<"g">>,<<"h">>],{a,[b]},[a,b,c,d]), Test({a,[b]}, <<"exactly">>,2,{a,[]}, [a,b,c,d]), Test({a,[b,c]},<<"exactly">>,3,{a,[]}, [a,b,c,d]), -- cgit v1.2.1 From 4590a8ea2504eb60253cddf2c9f2b4ebca172423 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 20 Sep 2012 14:47:52 +0100 Subject: Handle the case where we go from unmirrored(A) to nodes(B, C). --- src/rabbit_mirror_queue_misc.erl | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 3b25df6a..011f0663 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -281,12 +281,19 @@ is_mirrored(Q) -> _ -> false end. + +%% [1] - rabbit_amqqueue:start_mirroring/1 will turn unmirrored to +%% master and start any needed slaves. However, if node(QPid) is not +%% in the nodes for the policy, it won't switch it. So this is for the +%% case where we kill the existing queue and restart elsewhere. TODO: +%% is this TRTTD? All alternatives seem ugly. update_mirrors(OldQ = #amqqueue{pid = QPid}, NewQ = #amqqueue{pid = QPid}) -> case {is_mirrored(OldQ), is_mirrored(NewQ)} of {false, false} -> ok; {true, false} -> rabbit_amqqueue:stop_mirroring(QPid); - {false, true} -> rabbit_amqqueue:start_mirroring(QPid); + {false, true} -> rabbit_amqqueue:start_mirroring(QPid), + update_mirrors0(OldQ, NewQ); %% [1] {true, true} -> update_mirrors0(OldQ, NewQ) end. -- cgit v1.2.1 From 94511c36e377a52bebdd8b84f297e1b89ed39d54 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 28 Sep 2012 16:29:53 +0100 Subject: We want to catch here, if sup:start_child exits the rabbit_amqqueue:with we are in will convert any exit to not_found. Which is unlikely to be helpful --- src/rabbit_mirror_queue_misc.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 2e26ebfc..8aa860a7 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -163,7 +163,7 @@ add_mirror(QName, MirrorNode) -> end). start_child(Name, MirrorNode, Q) -> - case rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) of + case catch rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) of {ok, undefined} -> %% this means the mirror process was %% already running on the given node. -- cgit v1.2.1 From d93e42d597cd93f9f469634f2ad4469bb0f7c41e Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 28 Sep 2012 16:52:53 +0100 Subject: Assert a bit here too. --- src/rabbit_mirror_queue_misc.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 8aa860a7..8c75ab02 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -115,7 +115,7 @@ on_node_up() -> end end, [], rabbit_queue) end), - [add_mirror(QName, node()) || QName <- QNames], + [ok = add_mirror(QName, node()) || QName <- QNames], ok. drop_mirrors(QName, Nodes) -> -- cgit v1.2.1 From a695a4d2e3f711fe7b2407cb8bac356b0c7d87a9 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 28 Sep 2012 16:53:25 +0100 Subject: Ignore if the node is down. --- src/rabbit_mirror_queue_misc.erl | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 8c75ab02..a0b03166 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -163,11 +163,19 @@ add_mirror(QName, MirrorNode) -> end). start_child(Name, MirrorNode, Q) -> - case catch rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) of + case rabbit_misc:with_exit_handler( + rabbit_misc:const({ok, down}), + fun () -> + rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) + end) of {ok, undefined} -> %% this means the mirror process was %% already running on the given node. ok; + {ok, down} -> + %% Node went down between us deciding to start a mirror + %% and actually starting it. Which is fine. + ok; {ok, SPid} -> rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", [rabbit_misc:rs(Name), MirrorNode, SPid]), -- cgit v1.2.1 From 995def5af7bc95d652fa38b092195f63375e60b5 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 28 Sep 2012 16:58:40 +0100 Subject: Don't throw, rabbit_amqqueue:with/2 will eat it. --- src/rabbit_mirror_queue_misc.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index a0b03166..453f2f2c 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -186,7 +186,7 @@ start_child(Name, MirrorNode, Q) -> [rabbit_misc:rs(Name), MirrorNode, StalePid]), ok; {error, {{duplicate_live_master, _}=Err, _}} -> - throw(Err); + Err; Other -> Other end. -- cgit v1.2.1