diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-09-14 16:19:31 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-09-14 16:19:31 +0100 |
commit | 06652b964b1337584c3a26e5793cdfaf9036aafb (patch) | |
tree | 5be42311c7f08bcff8a4232be7418f7f395acf47 | |
parent | 36bb3df65be376e9236b7907a9951d9a36e681b5 (diff) | |
parent | df6fc31afc8c28655a9a079f9998f7ef022c4406 (diff) | |
download | rabbitmq-server-06652b964b1337584c3a26e5793cdfaf9036aafb.tar.gz |
Merge default
-rw-r--r-- | include/rabbit.hrl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 81 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 46 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 46 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 170 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 13 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 29 | ||||
-rw-r--r-- | src/rabbit_types.erl | 3 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 14 |
9 files changed, 271 insertions, 134 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index d6fac46d..a6d7f399 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -47,8 +47,7 @@ -record(exchange_serial, {name, next}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, - arguments, pid, slave_pids, sync_slave_pids, mirror_nodes, - policy}). + arguments, pid, slave_pids, sync_slave_pids, policy}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d566ac87..305f4c15 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -29,7 +29,7 @@ -export([notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). - +-export([start_mirroring/1, stop_mirroring/1]). %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, @@ -165,6 +165,8 @@ -spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(policy_changed/2 :: (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). +-spec(start_mirroring/1 :: (pid()) -> 'ok'). +-spec(stop_mirroring/1 :: (pid()) -> 'ok'). -endif. @@ -210,19 +212,19 @@ recover_durable_queues(DurableQueues) -> declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), - {Node, MNodes} = determine_queue_nodes(Args), - Q = start_queue_process(Node, #amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, - exclusive_owner = Owner, - pid = none, - slave_pids = [], - sync_slave_pids = [], - mirror_nodes = MNodes}), - case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of + Q0 = rabbit_policy:set(#amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, + exclusive_owner = Owner, + pid = none, + slave_pids = [], + sync_slave_pids = []}), + {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0), + Q1 = start_queue_process(Node, Q0), + case gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity) of not_found -> rabbit_misc:not_found(QueueName); - Q1 -> Q1 + Q2 -> Q2 end. internal_declare(Q, true) -> @@ -271,24 +273,8 @@ store_queue(Q = #amqqueue{durable = false}) -> ok = mnesia:write(rabbit_queue, Q, write), ok. -policy_changed(_Q1, _Q2) -> - ok. - -determine_queue_nodes(Args) -> - Policy = rabbit_misc:table_lookup(Args, <<"x-ha-policy">>), - PolicyParams = rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>), - case {Policy, PolicyParams} of - {{_Type, <<"nodes">>}, {array, Nodes}} -> - case [list_to_atom(binary_to_list(Node)) || - {longstr, Node} <- Nodes] of - [Node] -> {Node, undefined}; - [First | Rest] -> {First, [First | Rest]} - end; - {{_Type, <<"all">>}, _} -> - {node(), all}; - _ -> - {node(), undefined} - end. +policy_changed(Q1, Q2) -> + rabbit_mirror_queue_misc:update_mirrors(Q1, Q2). start_queue_process(Node, Q) -> {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]), @@ -364,13 +350,11 @@ with_exclusive_access_or_die(Name, ReaderPid, F) -> assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, RequiredArgs) -> rabbit_misc:assert_args_equivalence( - Args, RequiredArgs, QueueName, - [<<"x-expires">>, <<"x-message-ttl">>, <<"x-ha-policy">>]). + Args, RequiredArgs, QueueName, [<<"x-expires">>, <<"x-message-ttl">>]). check_declare_arguments(QueueName, Args) -> Checks = [{<<"x-expires">>, fun check_positive_int_arg/2}, {<<"x-message-ttl">>, fun check_non_neg_int_arg/2}, - {<<"x-ha-policy">>, fun check_ha_policy_arg/2}, {<<"x-dead-letter-exchange">>, fun check_string_arg/2}, {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}], [case rabbit_misc:table_lookup(Args, Key) of @@ -421,29 +405,6 @@ check_dlxrk_arg({longstr, _}, Args) -> check_dlxrk_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. -check_ha_policy_arg({longstr, <<"all">>}, _Args) -> - ok; -check_ha_policy_arg({longstr, <<"nodes">>}, Args) -> - case rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>) of - undefined -> - {error, {require, 'x-ha-policy-params'}}; - {array, []} -> - {error, {require_non_empty_list_of_nodes_for_ha}}; - {array, Ary} -> - case lists:all(fun ({longstr, _Node}) -> true; - (_ ) -> false - end, Ary) of - true -> ok; - false -> {error, {require_node_list_as_longstrs_for_ha, Ary}} - end; - {Type, _} -> - {error, {ha_nodes_policy_params_not_array_of_longstr, Type}} - end; -check_ha_policy_arg({longstr, Policy}, _Args) -> - {error, {invalid_ha_policy, Policy}}; -check_ha_policy_arg({Type, _}, _Args) -> - {error, {unacceptable_type, Type}}. - list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}). @@ -608,6 +569,9 @@ set_ram_duration_target(QPid, Duration) -> set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). +start_mirroring(QPid) -> ok = delegate_call(QPid, start_mirroring). +stop_mirroring(QPid) -> ok = delegate_call(QPid, stop_mirroring). + on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = @@ -642,8 +606,7 @@ pseudo_queue(QueueName, Pid) -> auto_delete = false, arguments = [], pid = Pid, - slave_pids = [], - mirror_nodes = undefined}. + slave_pids = []}. deliver([], #delivery{mandatory = false, immediate = false}, _Flow) -> %% /dev/null optimisation diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 20ba4574..eb6d3348 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -230,8 +230,7 @@ matches(false, Q1, Q2) -> Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso - Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids andalso - Q1#amqqueue.mirror_nodes =:= Q2#amqqueue.mirror_nodes. + Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids. bq_init(BQ, Q, Recover) -> Self = self(), @@ -296,11 +295,11 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> timed -> {ensure_sync_timer(State1), 0 } end. -backing_queue_module(#amqqueue{arguments = Args}) -> - case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of - undefined -> {ok, BQM} = application:get_env(backing_queue_module), - BQM; - _Policy -> rabbit_mirror_queue_master +backing_queue_module(Q) -> + case rabbit_mirror_queue_misc:is_mirrored(Q) of + false -> {ok, BQM} = application:get_env(backing_queue_module), + BQM; + true -> rabbit_mirror_queue_master end. ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> @@ -920,14 +919,18 @@ i(memory, _) -> {memory, M} = process_info(self(), memory), M; i(slave_pids, #q{q = #amqqueue{name = Name}}) -> - case rabbit_amqqueue:lookup(Name) of - {ok, #amqqueue{mirror_nodes = undefined}} -> ''; - {ok, #amqqueue{slave_pids = SPids}} -> SPids + {ok, Q = #amqqueue{slave_pids = SPids}} = + rabbit_amqqueue:lookup(Name), + case rabbit_mirror_queue_misc:is_mirrored(Q) of + false -> ''; + true -> SPids end; i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) -> - case rabbit_amqqueue:lookup(Name) of - {ok, #amqqueue{mirror_nodes = undefined}} -> ''; - {ok, #amqqueue{sync_slave_pids = SSPids}} -> SSPids + {ok, Q = #amqqueue{sync_slave_pids = SSPids}} = + rabbit_amqqueue:lookup(Name), + case rabbit_mirror_queue_misc:is_mirrored(Q) of + false -> ''; + true -> SSPids end; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); @@ -1169,6 +1172,23 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> ChPid, AckTags, State, fun (State1) -> requeue_and_run(AckTags, State1) end)); +handle_call(start_mirroring, _From, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + %% lookup again to get policy for init_with_existing_bq + {ok, Q} = rabbit_amqqueue:lookup(qname(State)), + true = BQ =/= rabbit_mirror_queue_master, %% assertion + BQ1 = rabbit_mirror_queue_master, + BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS), + reply(ok, State#q{backing_queue = BQ1, + backing_queue_state = BQS1}); + +handle_call(stop_mirroring, _From, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + BQ = rabbit_mirror_queue_master, %% assertion + {BQ1, BQS1} = BQ:stop_mirroring(BQS), + reply(ok, State#q{backing_queue = BQ1, + backing_queue_state = BQS1}); + handle_call(force_event_refresh, _From, State = #q{exclusive_consumer = Exclusive}) -> rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index fb9f7e34..92abe6ea 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -27,6 +27,8 @@ -export([promote_backing_queue_state/6, sender_death_fun/0, length_fun/0]). +-export([init_with_existing_bq/3, stop_mirroring/1]). + -behaviour(rabbit_backing_queue). -include("rabbit.hrl"). @@ -63,6 +65,9 @@ (pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()). -spec(sender_death_fun/0 :: () -> death_fun()). -spec(length_fun/0 :: () -> length_fun()). +-spec(init_with_existing_bq/3 :: (rabbit_types:amqqueue(), atom(), any()) -> + master_state()). +-spec(stop_mirroring/1 :: (master_state()) -> {atom(), any()}). -endif. @@ -82,20 +87,17 @@ stop() -> %% Same as start/1. exit({not_valid_for_generic_backing_queue, ?MODULE}). -init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, - AsyncCallback) -> +init(Q, Recover, AsyncCallback) -> + {ok, BQ} = application:get_env(backing_queue_module), + BQS = BQ:init(Q, Recover, AsyncCallback), + init_with_existing_bq(Q, BQ, BQS). + +init_with_existing_bq(#amqqueue { name = QName } = Q, BQ, BQS) -> {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( Q, undefined, sender_death_fun(), length_fun()), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), - MNodes1 = - (case MNodes of - all -> rabbit_mnesia:all_clustered_nodes(); - undefined -> []; - _ -> MNodes - end) -- [node()], - [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], - {ok, BQ} = application:get_env(backing_queue_module), - BQS = BQ:init(Q, Recover, AsyncCallback), + {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), + [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- SNodes], ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), #state { gm = GM, coordinator = CPid, @@ -107,8 +109,16 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, ack_msg_id = dict:new(), known_senders = sets:new() }. +stop_mirroring(State = #state { coordinator = CPid, + backing_queue = BQ, + backing_queue_state = BQS }) -> + unlink(CPid), + stop_all_slaves(shutdown, State), + {BQ, BQS}. + terminate({shutdown, dropped} = Reason, - State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> %% Backing queue termination - this node has been explicitly %% dropped. Normally, non-durable queues would be tidied up on %% startup, but there's a possibility that we will be added back @@ -124,15 +134,17 @@ terminate(Reason, %% node. Thus just let some other slave take over. State #state { backing_queue_state = BQ:terminate(Reason, BQS) }. -delete_and_terminate(Reason, State = #state { gm = GM, - backing_queue = BQ, +delete_and_terminate(Reason, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + stop_all_slaves(Reason, State), + State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), + set_delivered = 0 }. + +stop_all_slaves(Reason, #state{gm = GM}) -> Slaves = [Pid || Pid <- gm:group_members(GM), node(Pid) =/= node()], MRefs = [erlang:monitor(process, S) || S <- Slaves], ok = gm:broadcast(GM, {delete_and_terminate, Reason}), - monitor_wait(MRefs), - State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), - set_delivered = 0 }. + monitor_wait(MRefs). monitor_wait([]) -> ok; diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 89e334dd..ebaae995 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -18,7 +18,11 @@ -export([remove_from_queue/2, on_node_up/0, drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3, - report_deaths/4, store_updated_slaves/1]). + report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1, + is_mirrored/1, update_mirrors/2]). + +%% for testing only +-export([suggested_queue_nodes/4]). -include("rabbit.hrl"). @@ -39,6 +43,11 @@ -> rabbit_types:ok_or_error(any())). -spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) -> rabbit_types:amqqueue()). +-spec(suggested_queue_nodes/1 :: (rabbit_types:amqqueue()) -> + {node(), [node()]}). +-spec(is_mirrored/1 :: (rabbit_types:amqqueue()) -> boolean()). +-spec(update_mirrors/2 :: + (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). -endif. @@ -52,8 +61,18 @@ %% slave (now master) receives messages it's not ready for (for %% example, new consumers). %% Returns {ok, NewMPid, DeadPids} -remove_from_queue(QueueName, DeadPids) -> - DeadNodes = [node(DeadPid) || DeadPid <- DeadPids], + +remove_from_queue(QueueName, DeadGMPids) -> + case remove_from_queue0(QueueName, DeadGMPids) of + {ok, NewMPid, DeadQPids, ExtraNodes} -> + [ok = add_mirror(QueueName, Node) || Node <- ExtraNodes], + {ok, NewMPid, DeadQPids}; + Other -> + Other + end. + +remove_from_queue0(QueueName, DeadGMPids) -> + DeadNodes = [node(DeadGMPid) || DeadGMPid <- DeadGMPids], rabbit_misc:execute_mnesia_transaction( fun () -> %% Someone else could have deleted the queue before we @@ -62,57 +81,59 @@ remove_from_queue(QueueName, DeadPids) -> [] -> {error, not_found}; [Q = #amqqueue { pid = QPid, slave_pids = SPids }] -> - [QPid1 | SPids1] = Alive = - [Pid || Pid <- [QPid | SPids], + Alive = [Pid || Pid <- [QPid | SPids], not lists:member(node(Pid), DeadNodes)], + {QPid1, SPids1} = promote_slave(Alive), case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> - {ok, QPid1, []}; + {ok, QPid1, [], []}; _ when QPid =:= QPid1 orelse node(QPid1) =:= node() -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have %% become the master. - store_updated_slaves( - Q #amqqueue { pid = QPid1, - slave_pids = SPids1 }), - {ok, QPid1, [QPid | SPids] -- Alive}; + Q1 = store_updated_slaves( + Q #amqqueue { pid = QPid1, + slave_pids = SPids1 }), + %% Sometimes a slave dying means we need + %% to start more on other nodes - + %% "exactly" mode can cause this to + %% happen. + {_, OldNodes} = actual_queue_nodes(Q1), + {_, NewNodes} = suggested_queue_nodes(Q1), + {ok, QPid1, [QPid | SPids] -- Alive, + (NewNodes -- OldNodes) -- DeadNodes}; _ -> %% Master has changed, and we're not it, %% so leave alone to allow the promoted %% slave to find it and make its %% promotion atomic. - {ok, QPid1, []} + {ok, QPid1, [], []} end end end). on_node_up() -> - Qs = + QNames = rabbit_misc:execute_mnesia_transaction( fun () -> mnesia:foldl( - fun (#amqqueue { mirror_nodes = undefined }, QsN) -> - QsN; - (#amqqueue { name = QName, - mirror_nodes = all }, QsN) -> - [QName | QsN]; - (#amqqueue { name = QName, - mirror_nodes = MNodes }, QsN) -> - case lists:member(node(), MNodes) of - true -> [QName | QsN]; - false -> QsN + fun (Q = #amqqueue{name = QName}, QNames0) -> + {_MNode, SNodes} = suggested_queue_nodes(Q), + case lists:member(node(), SNodes) of + true -> [QName | QNames0]; + false -> QNames0 end end, [], rabbit_queue) end), - [add_mirror(Q, node()) || Q <- Qs], + [add_mirror(QName, node()) || QName <- QNames], ok. drop_mirror(VHostPath, QueueName, MirrorNode) -> drop_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). -drop_mirror(Queue, MirrorNode) -> +drop_mirror(QName, MirrorNode) -> if_mirrored_queue( - Queue, + QName, fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids }) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of [] -> @@ -131,9 +152,9 @@ drop_mirror(Queue, MirrorNode) -> add_mirror(VHostPath, QueueName, MirrorNode) -> add_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). -add_mirror(Queue, MirrorNode) -> +add_mirror(QName, MirrorNode) -> if_mirrored_queue( - Queue, + QName, fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of [] -> @@ -170,14 +191,13 @@ start_child(Name, MirrorNode, Q) -> Other end. -if_mirrored_queue(Queue, Fun) -> - rabbit_amqqueue:with( - Queue, fun (#amqqueue { arguments = Args } = Q) -> - case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of - undefined -> ok; - _ -> Fun(Q) - end - end). +if_mirrored_queue(QName, Fun) -> + rabbit_amqqueue:with(QName, fun (Q) -> + case is_mirrored(Q) of + false -> ok; + true -> Fun(Q) + end + end). report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> ok; @@ -201,3 +221,83 @@ store_updated_slaves(Q = #amqqueue{slave_pids = SPids, %% Wake it up so that we emit a stats event rabbit_amqqueue:wake_up(Q1), Q1. + +%%---------------------------------------------------------------------------- + +promote_slave([SPid | SPids]) -> + %% The slave pids are maintained in descending order of age, so + %% the one to promote is the oldest. + {SPid, SPids}. + +suggested_queue_nodes(Q) -> + {MNode0, SNodes} = actual_queue_nodes(Q), + MNode = case MNode0 of + none -> node(); + _ -> MNode0 + end, + suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q), + {MNode, SNodes}, + rabbit_mnesia:running_clustered_nodes()). + +policy(Policy, Q) -> + case rabbit_policy:get(Policy, Q) of + {ok, P} -> P; + _ -> none + end. + +suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, All) -> + {MNode, All -- [MNode]}; +suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, _All) -> + Nodes = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0], + case lists:member(MNode, Nodes) of + true -> {MNode, Nodes -- [MNode]}; + false -> promote_slave(Nodes) + end; +suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, All) -> + SCount = Count - 1, + {MNode, case SCount > length(SNodes) of + true -> Cand = (All -- [MNode]) -- SNodes, + SNodes ++ lists:sublist(Cand, SCount - length(SNodes)); + false -> lists:sublist(SNodes, SCount) + end}; +suggested_queue_nodes(_, _, {MNode, _}, _) -> + {MNode, []}. + +actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) -> + {case MPid of + none -> none; + _ -> node(MPid) + end, [node(Pid) || Pid <- SPids]}. + +is_mirrored(Q) -> + case policy(<<"ha-mode">>, Q) of + <<"all">> -> true; + <<"nodes">> -> true; + <<"exactly">> -> true; + _ -> false + end. + +update_mirrors(OldQ = #amqqueue{pid = QPid}, + NewQ = #amqqueue{pid = QPid}) -> + case {is_mirrored(OldQ), is_mirrored(NewQ)} of + {false, false} -> ok; + {true, false} -> rabbit_amqqueue:stop_mirroring(QPid); + {false, true} -> rabbit_amqqueue:start_mirroring(QPid); + {true, true} -> update_mirrors0(OldQ, NewQ) + end. + +update_mirrors0(OldQ = #amqqueue{name = QName}, + NewQ = #amqqueue{name = QName}) -> + All = fun ({A,B}) -> [A|B] end, + OldNodes = All(actual_queue_nodes(OldQ)), + NewNodes = All(suggested_queue_nodes(NewQ)), + %% When a mirror dies, remove_from_queue/2 might have to add new + %% slaves (in "exactly" mode). It will check mnesia to see which + %% slaves there currently are. If drop_mirror/2 is invoked first + %% then when we end up in remove_from_queue/2 it will not see the + %% slaves that add_mirror/2 will add, and also want to add them + %% (even though we are not responding to the death of a + %% mirror). Breakage ensues. + [ok = add_mirror(QName, Node) || Node <- NewNodes -- OldNodes], + [ok = drop_mirror(QName, Node) || Node <- OldNodes -- NewNodes], + ok. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 3e45f026..12e3058a 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -150,9 +150,7 @@ init_it(Self, Node, QueueName) -> [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = mnesia:read({rabbit_queue, QueueName}), case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of - [] -> MPids1 = MPids ++ [Self], - rabbit_mirror_queue_misc:store_updated_slaves( - Q1#amqqueue{slave_pids = MPids1}), + [] -> add_slave(Q1, Self, MPids), {new, QPid}; [QPid] -> case rabbit_misc:is_process_alive(QPid) of true -> duplicate_live_master; @@ -160,13 +158,16 @@ init_it(Self, Node, QueueName) -> end; [SPid] -> case rabbit_misc:is_process_alive(SPid) of true -> existing; - false -> MPids1 = (MPids -- [SPid]) ++ [Self], - rabbit_mirror_queue_misc:store_updated_slaves( - Q1#amqqueue{slave_pids = MPids1}), + false -> add_slave(Q1, Self, MPids -- [SPid]), {new, QPid} end end. +%% Add to the end, so they are in descending order of age, see +%% rabbit_mirror_queue_misc:promote_slave/1 +add_slave(Q, New, MPids) -> rabbit_mirror_queue_misc:store_updated_slaves( + Q#amqqueue{slave_pids = MPids ++ [New]}). + handle_call({deliver, Delivery = #delivery { immediate = true }}, From, State) -> %% It is safe to reply 'false' here even if a) we've not seen the diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 3cc0e5db..6cb34b09 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -54,6 +54,7 @@ all_tests() -> passed = test_log_management_during_startup(), passed = test_statistics(), passed = test_arguments_parser(), + passed = test_dynamic_mirroring(), passed = test_user_management(), passed = test_runtime_parameters(), passed = test_server_status(), @@ -883,6 +884,34 @@ test_arguments_parser() -> passed. +test_dynamic_mirroring() -> + %% Just unit tests of the node selection logic, see multi node + %% tests for the rest... + Test = fun ({NewM, NewSs}, Policy, Params, {OldM, OldSs}, All) -> + {NewM, NewSs0} = + rabbit_mirror_queue_misc:suggested_queue_nodes( + Policy, Params, {OldM, OldSs}, All), + NewSs = lists:sort(NewSs0) + end, + + Test({a,[b,c]},<<"all">>,'_',{a,[]}, [a,b,c]), + Test({a,[b,c]},<<"all">>,'_',{a,[b,c]},[a,b,c]), + Test({a,[b,c]},<<"all">>,'_',{a,[d]}, [a,b,c]), + + Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]), + Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[b]},[a,b,c,d]), + Test({b,[a,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]), + Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{d,[a]},[a,b,c,d]), + + Test({a,[b]}, <<"exactly">>,2,{a,[]}, [a,b,c,d]), + Test({a,[b,c]},<<"exactly">>,3,{a,[]}, [a,b,c,d]), + Test({a,[c]}, <<"exactly">>,2,{a,[c]}, [a,b,c,d]), + Test({a,[b,c]},<<"exactly">>,3,{a,[c]}, [a,b,c,d]), + Test({a,[c]}, <<"exactly">>,2,{a,[c,d]},[a,b,c,d]), + Test({a,[c,d]},<<"exactly">>,3,{a,[c,d]},[a,b,c,d]), + + passed. + test_user_management() -> %% lots if stuff that should fail diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 8966bcab..9d6c7ca3 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -118,8 +118,7 @@ exclusive_owner :: rabbit_types:maybe(pid()), arguments :: rabbit_framing:amqp_table(), pid :: rabbit_types:maybe(pid()), - slave_pids :: [pid()], - mirror_nodes :: [node()] | 'undefined' | 'all'}). + slave_pids :: [pid()]}). -type(exchange() :: #exchange{name :: rabbit_exchange:name(), diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 47b22b98..ac072c11 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -41,6 +41,7 @@ -rabbit_upgrade({policy, mnesia, [exchange_scratches, ha_mirrors]}). -rabbit_upgrade({sync_slave_pids, mnesia, [policy]}). +-rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}). %% ------------------------------------------------------------------- @@ -64,6 +65,7 @@ -spec(runtime_parameters/0 :: () -> 'ok'). -spec(policy/0 :: () -> 'ok'). -spec(sync_slave_pids/0 :: () -> 'ok'). +-spec(no_mirror_nodes/0 :: () -> 'ok'). -endif. @@ -254,6 +256,18 @@ sync_slave_pids() -> || T <- Tables], ok. +no_mirror_nodes() -> + Tables = [rabbit_queue, rabbit_durable_queue], + RemoveMirrorNodesFun = + fun ({amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, _MNodes, Pol}) -> + {amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol} + end, + [ok = transform(T, RemoveMirrorNodesFun, + [name, durable, auto_delete, exclusive_owner, arguments, + pid, slave_pids, sync_slave_pids, policy]) + || T <- Tables], + ok. + %%-------------------------------------------------------------------- |