diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-10-02 11:36:31 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-10-02 11:36:31 +0100 |
commit | 62fd5fc3ee8563ffba6cd735358dbef53401752d (patch) | |
tree | 77d8f349b44171e8b69485b76b7895f9eb91757a | |
parent | 8494b83766c2d310e631d95fc15916fb18e60ad2 (diff) | |
parent | 995def5af7bc95d652fa38b092195f63375e60b5 (diff) | |
download | rabbitmq-server-bug24908.tar.gz |
Merge defaultbug24908
-rw-r--r-- | include/rabbit.hrl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 83 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 46 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 3 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 45 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 217 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 33 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 36 | ||||
-rw-r--r-- | src/rabbit_types.erl | 3 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 14 |
10 files changed, 326 insertions, 157 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index f2389587..41cce0a3 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -47,8 +47,7 @@ -record(exchange_serial, {name, next}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, - arguments, pid, slave_pids, sync_slave_pids, mirror_nodes, - policy}). + arguments, pid, slave_pids, sync_slave_pids, policy}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4a20a1bc..8fc103e4 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -29,7 +29,7 @@ -export([notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). - +-export([start_mirroring/1, stop_mirroring/1]). %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, @@ -165,6 +165,8 @@ -spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(policy_changed/2 :: (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). +-spec(start_mirroring/1 :: (pid()) -> 'ok'). +-spec(stop_mirroring/1 :: (pid()) -> 'ok'). -endif. @@ -210,19 +212,19 @@ recover_durable_queues(DurableQueues) -> declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), - {Node, MNodes} = determine_queue_nodes(Args), - Q = start_queue_process(Node, #amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, - exclusive_owner = Owner, - pid = none, - slave_pids = [], - sync_slave_pids = [], - mirror_nodes = MNodes}), - case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of + Q0 = rabbit_policy:set(#amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, + exclusive_owner = Owner, + pid = none, + slave_pids = [], + sync_slave_pids = []}), + {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0), + Q1 = start_queue_process(Node, Q0), + case gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity) of not_found -> rabbit_misc:not_found(QueueName); - Q1 -> Q1 + Q2 -> Q2 end. internal_declare(Q, true) -> @@ -271,24 +273,8 @@ store_queue(Q = #amqqueue{durable = false}) -> ok = mnesia:write(rabbit_queue, Q, write), ok. -policy_changed(_Q1, _Q2) -> - ok. - -determine_queue_nodes(Args) -> - Policy = rabbit_misc:table_lookup(Args, <<"x-ha-policy">>), - PolicyParams = rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>), - case {Policy, PolicyParams} of - {{_Type, <<"nodes">>}, {array, Nodes}} -> - case [list_to_atom(binary_to_list(Node)) || - {longstr, Node} <- Nodes] of - [Node] -> {Node, undefined}; - [First | Rest] -> {First, [First | Rest]} - end; - {{_Type, <<"all">>}, _} -> - {node(), all}; - _ -> - {node(), undefined} - end. +policy_changed(Q1, Q2) -> + rabbit_mirror_queue_misc:update_mirrors(Q1, Q2). start_queue_process(Node, Q) -> {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]), @@ -311,8 +297,6 @@ lookup(Name) -> with(Name, F, E) -> case lookup(Name) of - {ok, Q = #amqqueue{slave_pids = []}} -> - rabbit_misc:with_exit_handler(E, fun () -> F(Q) end); {ok, Q = #amqqueue{pid = QPid}} -> %% We check is_process_alive(QPid) in case we receive a %% nodedown (for example) in F() that has nothing to do @@ -364,13 +348,11 @@ with_exclusive_access_or_die(Name, ReaderPid, F) -> assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, RequiredArgs) -> rabbit_misc:assert_args_equivalence( - Args, RequiredArgs, QueueName, - [<<"x-expires">>, <<"x-message-ttl">>, <<"x-ha-policy">>]). + Args, RequiredArgs, QueueName, [<<"x-expires">>, <<"x-message-ttl">>]). check_declare_arguments(QueueName, Args) -> Checks = [{<<"x-expires">>, fun check_positive_int_arg/2}, {<<"x-message-ttl">>, fun check_non_neg_int_arg/2}, - {<<"x-ha-policy">>, fun check_ha_policy_arg/2}, {<<"x-dead-letter-exchange">>, fun check_string_arg/2}, {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}], [case rabbit_misc:table_lookup(Args, Key) of @@ -421,29 +403,6 @@ check_dlxrk_arg({longstr, _}, Args) -> check_dlxrk_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. -check_ha_policy_arg({longstr, <<"all">>}, _Args) -> - ok; -check_ha_policy_arg({longstr, <<"nodes">>}, Args) -> - case rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>) of - undefined -> - {error, {require, 'x-ha-policy-params'}}; - {array, []} -> - {error, {require_non_empty_list_of_nodes_for_ha}}; - {array, Ary} -> - case lists:all(fun ({longstr, _Node}) -> true; - (_ ) -> false - end, Ary) of - true -> ok; - false -> {error, {require_node_list_as_longstrs_for_ha, Ary}} - end; - {Type, _} -> - {error, {ha_nodes_policy_params_not_array_of_longstr, Type}} - end; -check_ha_policy_arg({longstr, Policy}, _Args) -> - {error, {invalid_ha_policy, Policy}}; -check_ha_policy_arg({Type, _}, _Args) -> - {error, {unacceptable_type, Type}}. - list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}). @@ -613,6 +572,9 @@ set_ram_duration_target(QPid, Duration) -> set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). +start_mirroring(QPid) -> ok = delegate_call(QPid, start_mirroring). +stop_mirroring(QPid) -> ok = delegate_call(QPid, stop_mirroring). + on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = @@ -647,8 +609,7 @@ pseudo_queue(QueueName, Pid) -> auto_delete = false, arguments = [], pid = Pid, - slave_pids = [], - mirror_nodes = undefined}. + slave_pids = []}. deliver([], #delivery{mandatory = false}, _Flow) -> %% /dev/null optimisation diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a0e74b42..e8d8fa5e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -229,8 +229,7 @@ matches(false, Q1, Q2) -> Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso - Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids andalso - Q1#amqqueue.mirror_nodes =:= Q2#amqqueue.mirror_nodes. + Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids. bq_init(BQ, Q, Recover) -> Self = self(), @@ -295,11 +294,11 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> timed -> {ensure_sync_timer(State1), 0 } end. -backing_queue_module(#amqqueue{arguments = Args}) -> - case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of - undefined -> {ok, BQM} = application:get_env(backing_queue_module), - BQM; - _Policy -> rabbit_mirror_queue_master +backing_queue_module(Q) -> + case rabbit_mirror_queue_misc:is_mirrored(Q) of + false -> {ok, BQM} = application:get_env(backing_queue_module), + BQM; + true -> rabbit_mirror_queue_master end. ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> @@ -917,14 +916,18 @@ i(memory, _) -> {memory, M} = process_info(self(), memory), M; i(slave_pids, #q{q = #amqqueue{name = Name}}) -> - case rabbit_amqqueue:lookup(Name) of - {ok, #amqqueue{mirror_nodes = undefined}} -> ''; - {ok, #amqqueue{slave_pids = SPids}} -> SPids + {ok, Q = #amqqueue{slave_pids = SPids}} = + rabbit_amqqueue:lookup(Name), + case rabbit_mirror_queue_misc:is_mirrored(Q) of + false -> ''; + true -> SPids end; i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) -> - case rabbit_amqqueue:lookup(Name) of - {ok, #amqqueue{mirror_nodes = undefined}} -> ''; - {ok, #amqqueue{sync_slave_pids = SSPids}} -> SSPids + {ok, Q = #amqqueue{sync_slave_pids = SSPids}} = + rabbit_amqqueue:lookup(Name), + case rabbit_mirror_queue_misc:is_mirrored(Q) of + false -> ''; + true -> SSPids end; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); @@ -1148,6 +1151,23 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> ChPid, AckTags, State, fun (State1) -> requeue_and_run(AckTags, State1) end)); +handle_call(start_mirroring, _From, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + %% lookup again to get policy for init_with_existing_bq + {ok, Q} = rabbit_amqqueue:lookup(qname(State)), + true = BQ =/= rabbit_mirror_queue_master, %% assertion + BQ1 = rabbit_mirror_queue_master, + BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS), + reply(ok, State#q{backing_queue = BQ1, + backing_queue_state = BQS1}); + +handle_call(stop_mirroring, _From, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + BQ = rabbit_mirror_queue_master, %% assertion + {BQ1, BQS1} = BQ:stop_mirroring(BQS), + reply(ok, State#q{backing_queue = BQ1, + backing_queue_state = BQS1}); + handle_call(force_event_refresh, _From, State = #q{exclusive_consumer = Exclusive}) -> rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 4455b441..5284000b 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -344,9 +344,10 @@ handle_cast({gm_deaths, Deaths}, State = #state { q = #amqqueue { name = QueueName, pid = MPid } }) when node(MPid) =:= node() -> case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of - {ok, MPid, DeadPids} -> + {ok, MPid, DeadPids, ExtraNodes} -> rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName, DeadPids), + rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes), noreply(State); {error, not_found} -> {stop, normal, State} diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 48650206..cfef98b7 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -27,6 +27,8 @@ -export([promote_backing_queue_state/6, sender_death_fun/0, length_fun/0]). +-export([init_with_existing_bq/3, stop_mirroring/1]). + -behaviour(rabbit_backing_queue). -include("rabbit.hrl"). @@ -63,6 +65,9 @@ (pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()). -spec(sender_death_fun/0 :: () -> death_fun()). -spec(length_fun/0 :: () -> length_fun()). +-spec(init_with_existing_bq/3 :: (rabbit_types:amqqueue(), atom(), any()) -> + master_state()). +-spec(stop_mirroring/1 :: (master_state()) -> {atom(), any()}). -endif. @@ -82,19 +87,17 @@ stop() -> %% Same as start/1. exit({not_valid_for_generic_backing_queue, ?MODULE}). -init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, - AsyncCallback) -> +init(Q, Recover, AsyncCallback) -> + {ok, BQ} = application:get_env(backing_queue_module), + BQS = BQ:init(Q, Recover, AsyncCallback), + init_with_existing_bq(Q, BQ, BQS). + +init_with_existing_bq(#amqqueue { name = QName } = Q, BQ, BQS) -> {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( Q, undefined, sender_death_fun(), length_fun()), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), - MNodes1 = (case MNodes of - all -> rabbit_mnesia:cluster_nodes(all); - undefined -> []; - _ -> MNodes - end) -- [node()], - [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], - {ok, BQ} = application:get_env(backing_queue_module), - BQS = BQ:init(Q, Recover, AsyncCallback), + {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), + rabbit_mirror_queue_misc:add_mirrors(QName, SNodes), ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), #state { gm = GM, coordinator = CPid, @@ -106,8 +109,16 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, ack_msg_id = dict:new(), known_senders = sets:new() }. +stop_mirroring(State = #state { coordinator = CPid, + backing_queue = BQ, + backing_queue_state = BQS }) -> + unlink(CPid), + stop_all_slaves(shutdown, State), + {BQ, BQS}. + terminate({shutdown, dropped} = Reason, - State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> %% Backing queue termination - this node has been explicitly %% dropped. Normally, non-durable queues would be tidied up on %% startup, but there's a possibility that we will be added back @@ -123,18 +134,20 @@ terminate(Reason, %% node. Thus just let some other slave take over. State #state { backing_queue_state = BQ:terminate(Reason, BQS) }. -delete_and_terminate(Reason, State = #state { gm = GM, - backing_queue = BQ, +delete_and_terminate(Reason, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + stop_all_slaves(Reason, State), + State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), + set_delivered = 0 }. + +stop_all_slaves(Reason, #state{gm = GM}) -> Info = gm:info(GM), Slaves = [Pid || Pid <- proplists:get_value(group_members, Info), node(Pid) =/= node()], MRefs = [erlang:monitor(process, S) || S <- Slaves], ok = gm:broadcast(GM, {delete_and_terminate, Reason}), [receive {'DOWN', MRef, process, _Pid, _Info} -> ok end || MRef <- MRefs], - ok = gm:forget_group(proplists:get_value(group_name, Info)), - State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), - set_delivered = 0 }. + ok = gm:forget_group(proplists:get_value(group_name, Info)). purge(State = #state { gm = GM, backing_queue = BQ, diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 89e334dd..453f2f2c 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -16,9 +16,12 @@ -module(rabbit_mirror_queue_misc). --export([remove_from_queue/2, on_node_up/0, - drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3, - report_deaths/4, store_updated_slaves/1]). +-export([remove_from_queue/2, on_node_up/0, add_mirrors/2, add_mirror/2, + report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1, + is_mirrored/1, update_mirrors/2]). + +%% for testing only +-export([suggested_queue_nodes/4]). -include("rabbit.hrl"). @@ -28,17 +31,18 @@ -spec(remove_from_queue/2 :: (rabbit_amqqueue:name(), [pid()]) - -> {'ok', pid(), [pid()]} | {'error', 'not_found'}). + -> {'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}). -spec(on_node_up/0 :: () -> 'ok'). --spec(drop_mirror/2 :: - (rabbit_amqqueue:name(), node()) -> rabbit_types:ok_or_error(any())). +-spec(add_mirrors/2 :: (rabbit_amqqueue:name(), [node()]) -> 'ok'). -spec(add_mirror/2 :: (rabbit_amqqueue:name(), node()) -> rabbit_types:ok_or_error(any())). --spec(add_mirror/3 :: - (rabbit_types:vhost(), binary(), atom()) - -> rabbit_types:ok_or_error(any())). -spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) -> rabbit_types:amqqueue()). +-spec(suggested_queue_nodes/1 :: (rabbit_types:amqqueue()) -> + {node(), [node()]}). +-spec(is_mirrored/1 :: (rabbit_types:amqqueue()) -> boolean()). +-spec(update_mirrors/2 :: + (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). -endif. @@ -52,8 +56,10 @@ %% slave (now master) receives messages it's not ready for (for %% example, new consumers). %% Returns {ok, NewMPid, DeadPids} -remove_from_queue(QueueName, DeadPids) -> - DeadNodes = [node(DeadPid) || DeadPid <- DeadPids], + +remove_from_queue(QueueName, DeadGMPids) -> + DeadNodes = [node(DeadGMPid) || DeadGMPid <- DeadGMPids], + ClusterNodes = rabbit_mnesia:cluster_nodes(running) -- DeadNodes, rabbit_misc:execute_mnesia_transaction( fun () -> %% Someone else could have deleted the queue before we @@ -62,57 +68,63 @@ remove_from_queue(QueueName, DeadPids) -> [] -> {error, not_found}; [Q = #amqqueue { pid = QPid, slave_pids = SPids }] -> - [QPid1 | SPids1] = Alive = - [Pid || Pid <- [QPid | SPids], + Alive = [Pid || Pid <- [QPid | SPids], not lists:member(node(Pid), DeadNodes)], + {QPid1, SPids1} = promote_slave(Alive), case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> - {ok, QPid1, []}; + {ok, QPid1, [], []}; _ when QPid =:= QPid1 orelse node(QPid1) =:= node() -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have %% become the master. - store_updated_slaves( - Q #amqqueue { pid = QPid1, - slave_pids = SPids1 }), - {ok, QPid1, [QPid | SPids] -- Alive}; + Q1 = store_updated_slaves( + Q #amqqueue { pid = QPid1, + slave_pids = SPids1 }), + %% Sometimes a slave dying means we need + %% to start more on other nodes - + %% "exactly" mode can cause this to + %% happen. + {_, OldNodes} = actual_queue_nodes(Q1), + {_, NewNodes} = suggested_queue_nodes( + Q1, ClusterNodes), + {ok, QPid1, [QPid | SPids] -- Alive, + NewNodes -- OldNodes}; _ -> %% Master has changed, and we're not it, %% so leave alone to allow the promoted %% slave to find it and make its %% promotion atomic. - {ok, QPid1, []} + {ok, QPid1, [], []} end end end). on_node_up() -> - Qs = + ClusterNodes = rabbit_mnesia:cluster_nodes(running), + QNames = rabbit_misc:execute_mnesia_transaction( fun () -> mnesia:foldl( - fun (#amqqueue { mirror_nodes = undefined }, QsN) -> - QsN; - (#amqqueue { name = QName, - mirror_nodes = all }, QsN) -> - [QName | QsN]; - (#amqqueue { name = QName, - mirror_nodes = MNodes }, QsN) -> - case lists:member(node(), MNodes) of - true -> [QName | QsN]; - false -> QsN + fun (Q = #amqqueue{name = QName}, QNames0) -> + {_MNode, SNodes} = suggested_queue_nodes( + Q, ClusterNodes), + case lists:member(node(), SNodes) of + true -> [QName | QNames0]; + false -> QNames0 end end, [], rabbit_queue) end), - [add_mirror(Q, node()) || Q <- Qs], + [ok = add_mirror(QName, node()) || QName <- QNames], ok. -drop_mirror(VHostPath, QueueName, MirrorNode) -> - drop_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). +drop_mirrors(QName, Nodes) -> + [ok = drop_mirror(QName, Node) || Node <- Nodes], + ok. -drop_mirror(Queue, MirrorNode) -> +drop_mirror(QName, MirrorNode) -> if_mirrored_queue( - Queue, + QName, fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids }) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of [] -> @@ -128,12 +140,13 @@ drop_mirror(Queue, MirrorNode) -> end end). -add_mirror(VHostPath, QueueName, MirrorNode) -> - add_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). +add_mirrors(QName, Nodes) -> + [ok = add_mirror(QName, Node) || Node <- Nodes], + ok. -add_mirror(Queue, MirrorNode) -> +add_mirror(QName, MirrorNode) -> if_mirrored_queue( - Queue, + QName, fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of [] -> @@ -150,11 +163,19 @@ add_mirror(Queue, MirrorNode) -> end). start_child(Name, MirrorNode, Q) -> - case rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) of + case rabbit_misc:with_exit_handler( + rabbit_misc:const({ok, down}), + fun () -> + rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) + end) of {ok, undefined} -> %% this means the mirror process was %% already running on the given node. ok; + {ok, down} -> + %% Node went down between us deciding to start a mirror + %% and actually starting it. Which is fine. + ok; {ok, SPid} -> rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", [rabbit_misc:rs(Name), MirrorNode, SPid]), @@ -165,19 +186,18 @@ start_child(Name, MirrorNode, Q) -> [rabbit_misc:rs(Name), MirrorNode, StalePid]), ok; {error, {{duplicate_live_master, _}=Err, _}} -> - throw(Err); + Err; Other -> Other end. -if_mirrored_queue(Queue, Fun) -> - rabbit_amqqueue:with( - Queue, fun (#amqqueue { arguments = Args } = Q) -> - case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of - undefined -> ok; - _ -> Fun(Q) - end - end). +if_mirrored_queue(QName, Fun) -> + rabbit_amqqueue:with(QName, fun (Q) -> + case is_mirrored(Q) of + false -> ok; + true -> Fun(Q) + end + end). report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> ok; @@ -201,3 +221,102 @@ store_updated_slaves(Q = #amqqueue{slave_pids = SPids, %% Wake it up so that we emit a stats event rabbit_amqqueue:wake_up(Q1), Q1. + +%%---------------------------------------------------------------------------- + +promote_slave([SPid | SPids]) -> + %% The slave pids are maintained in descending order of age, so + %% the one to promote is the oldest. + {SPid, SPids}. + +suggested_queue_nodes(Q) -> + suggested_queue_nodes(Q, rabbit_mnesia:cluster_nodes(running)). + +%% This variant exists so we can pull a call to +%% rabbit_mnesia:cluster_nodes(running) out of a loop or +%% transaction or both. +suggested_queue_nodes(Q, ClusterNodes) -> + {MNode0, SNodes} = actual_queue_nodes(Q), + MNode = case MNode0 of + none -> node(); + _ -> MNode0 + end, + suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q), + {MNode, SNodes}, ClusterNodes). + +policy(Policy, Q) -> + case rabbit_policy:get(Policy, Q) of + {ok, P} -> P; + _ -> none + end. + +suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, All) -> + {MNode, All -- [MNode]}; +suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, All) -> + Nodes = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0], + Unavailable = Nodes -- All, + Available = Nodes -- Unavailable, + case Available of + [] -> %% We have never heard of anything? Not much we can do but + %% keep the master alive. + {MNode, []}; + _ -> case lists:member(MNode, Available) of + true -> {MNode, Available -- [MNode]}; + false -> promote_slave(Available) + end + end; +suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, All) -> + SCount = Count - 1, + {MNode, case SCount > length(SNodes) of + true -> Cand = (All -- [MNode]) -- SNodes, + SNodes ++ lists:sublist(Cand, SCount - length(SNodes)); + false -> lists:sublist(SNodes, SCount) + end}; +suggested_queue_nodes(_, _, {MNode, _}, _) -> + {MNode, []}. + +actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) -> + {case MPid of + none -> none; + _ -> node(MPid) + end, [node(Pid) || Pid <- SPids]}. + +is_mirrored(Q) -> + case policy(<<"ha-mode">>, Q) of + <<"all">> -> true; + <<"nodes">> -> true; + <<"exactly">> -> true; + _ -> false + end. + + +%% [1] - rabbit_amqqueue:start_mirroring/1 will turn unmirrored to +%% master and start any needed slaves. However, if node(QPid) is not +%% in the nodes for the policy, it won't switch it. So this is for the +%% case where we kill the existing queue and restart elsewhere. TODO: +%% is this TRTTD? All alternatives seem ugly. +update_mirrors(OldQ = #amqqueue{pid = QPid}, + NewQ = #amqqueue{pid = QPid}) -> + case {is_mirrored(OldQ), is_mirrored(NewQ)} of + {false, false} -> ok; + {true, false} -> rabbit_amqqueue:stop_mirroring(QPid); + {false, true} -> rabbit_amqqueue:start_mirroring(QPid), + update_mirrors0(OldQ, NewQ); %% [1] + {true, true} -> update_mirrors0(OldQ, NewQ) + end. + +update_mirrors0(OldQ = #amqqueue{name = QName}, + NewQ = #amqqueue{name = QName}) -> + All = fun ({A,B}) -> [A|B] end, + OldNodes = All(actual_queue_nodes(OldQ)), + NewNodes = All(suggested_queue_nodes(NewQ)), + %% When a mirror dies, remove_from_queue/2 might have to add new + %% slaves (in "exactly" mode). It will check mnesia to see which + %% slaves there currently are. If drop_mirror/2 is invoked first + %% then when we end up in remove_from_queue/2 it will not see the + %% slaves that add_mirror/2 will add, and also want to add them + %% (even though we are not responding to the death of a + %% mirror). Breakage ensues. + add_mirrors(QName, NewNodes -- OldNodes), + drop_mirrors(QName, OldNodes -- NewNodes), + ok. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 8e541db1..b4b0d4d3 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -150,9 +150,7 @@ init_it(Self, Node, QueueName) -> [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = mnesia:read({rabbit_queue, QueueName}), case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of - [] -> MPids1 = MPids ++ [Self], - rabbit_mirror_queue_misc:store_updated_slaves( - Q1#amqqueue{slave_pids = MPids1}), + [] -> add_slave(Q1, Self, MPids), {new, QPid}; [QPid] -> case rabbit_misc:is_process_alive(QPid) of true -> duplicate_live_master; @@ -160,13 +158,16 @@ init_it(Self, Node, QueueName) -> end; [SPid] -> case rabbit_misc:is_process_alive(SPid) of true -> existing; - false -> MPids1 = (MPids -- [SPid]) ++ [Self], - rabbit_mirror_queue_misc:store_updated_slaves( - Q1#amqqueue{slave_pids = MPids1}), + false -> add_slave(Q1, Self, MPids -- [SPid]), {new, QPid} end end. +%% Add to the end, so they are in descending order of age, see +%% rabbit_mirror_queue_misc:promote_slave/1 +add_slave(Q, New, MPids) -> rabbit_mirror_queue_misc:store_updated_slaves( + Q#amqqueue{slave_pids = MPids ++ [New]}). + handle_call({deliver, Delivery}, From, State) -> %% Synchronous, "mandatory" deliver mode. gen_server2:reply(From, ok), @@ -182,18 +183,25 @@ handle_call({gm_deaths, Deaths}, From, {error, not_found} -> gen_server2:reply(From, ok), {stop, normal, State}; - {ok, Pid, DeadPids} -> + {ok, Pid, DeadPids, ExtraNodes} -> rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName, DeadPids), if node(Pid) =:= node(MPid) -> %% master hasn't changed - reply(ok, State); + gen_server2:reply(From, ok), + rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes), + noreply(State); node(Pid) =:= node() -> %% we've become master - promote_me(From, State); + QueueState = promote_me(From, State), + rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes), + {become, rabbit_amqqueue_process, QueueState, hibernate}; true -> %% master has changed to not us. gen_server2:reply(From, ok), + %% assertion, we don't need to add_mirrors/2 in this + %% branch, see last clause in remove_from_queue/2 + [] = ExtraNodes, erlang:monitor(process, Pid), %% GM is lazy. So we know of the death of the %% slave since it is a neighbour of ours, but @@ -533,10 +541,9 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)], Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), Delivery <- queue:to_list(PubQ)], - QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( - Q1, rabbit_mirror_queue_master, MasterState, RateTRef, - AckTags, Deliveries, KS, MTC), - {become, rabbit_amqqueue_process, QueueState, hibernate}. + rabbit_amqqueue_process:init_with_backing_queue_state( + Q1, rabbit_mirror_queue_master, MasterState, RateTRef, AckTags, + Deliveries, KS, MTC). noreply(State) -> {NewState, Timeout} = next_state(State), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index df0ee721..11f280bb 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -54,6 +54,7 @@ all_tests() -> passed = test_log_management_during_startup(), passed = test_statistics(), passed = test_arguments_parser(), + passed = test_dynamic_mirroring(), passed = test_user_management(), passed = test_runtime_parameters(), passed = test_server_status(), @@ -882,6 +883,41 @@ test_arguments_parser() -> passed. +test_dynamic_mirroring() -> + %% Just unit tests of the node selection logic, see multi node + %% tests for the rest... + Test = fun ({NewM, NewSs}, Policy, Params, {OldM, OldSs}, All) -> + {NewM, NewSs0} = + rabbit_mirror_queue_misc:suggested_queue_nodes( + Policy, Params, {OldM, OldSs}, All), + NewSs = lists:sort(NewSs0) + end, + + Test({a,[b,c]},<<"all">>,'_',{a,[]}, [a,b,c]), + Test({a,[b,c]},<<"all">>,'_',{a,[b,c]},[a,b,c]), + Test({a,[b,c]},<<"all">>,'_',{a,[d]}, [a,b,c]), + + %% Add a node + Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[b]},[a,b,c,d]), + Test({b,[a,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]), + %% Add two nodes and drop one + Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]), + %% Promote slave to master by policy + Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{d,[a]},[a,b,c,d]), + %% Don't try to include nodes that are not running + Test({a,[b]}, <<"nodes">>,[<<"a">>,<<"b">>,<<"f">>],{a,[b]},[a,b,c,d]), + %% If we can't find any of the nodes listed then just keep the master + Test({a,[]}, <<"nodes">>,[<<"f">>,<<"g">>,<<"h">>],{a,[b]},[a,b,c,d]), + + Test({a,[b]}, <<"exactly">>,2,{a,[]}, [a,b,c,d]), + Test({a,[b,c]},<<"exactly">>,3,{a,[]}, [a,b,c,d]), + Test({a,[c]}, <<"exactly">>,2,{a,[c]}, [a,b,c,d]), + Test({a,[b,c]},<<"exactly">>,3,{a,[c]}, [a,b,c,d]), + Test({a,[c]}, <<"exactly">>,2,{a,[c,d]},[a,b,c,d]), + Test({a,[c,d]},<<"exactly">>,3,{a,[c,d]},[a,b,c,d]), + + passed. + test_user_management() -> %% lots if stuff that should fail diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index f488afb4..5bc3d9f5 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -117,8 +117,7 @@ exclusive_owner :: rabbit_types:maybe(pid()), arguments :: rabbit_framing:amqp_table(), pid :: rabbit_types:maybe(pid()), - slave_pids :: [pid()], - mirror_nodes :: [node()] | 'undefined' | 'all'}). + slave_pids :: [pid()]}). -type(exchange() :: #exchange{name :: rabbit_exchange:name(), diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 7c054993..ddc9c565 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -41,6 +41,7 @@ -rabbit_upgrade({policy, mnesia, [exchange_scratches, ha_mirrors]}). -rabbit_upgrade({sync_slave_pids, mnesia, [policy]}). +-rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}). %% ------------------------------------------------------------------- @@ -64,6 +65,7 @@ -spec(runtime_parameters/0 :: () -> 'ok'). -spec(policy/0 :: () -> 'ok'). -spec(sync_slave_pids/0 :: () -> 'ok'). +-spec(no_mirror_nodes/0 :: () -> 'ok'). -endif. @@ -254,6 +256,18 @@ sync_slave_pids() -> || T <- Tables], ok. +no_mirror_nodes() -> + Tables = [rabbit_queue, rabbit_durable_queue], + RemoveMirrorNodesFun = + fun ({amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, _MNodes, Pol}) -> + {amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol} + end, + [ok = transform(T, RemoveMirrorNodesFun, + [name, durable, auto_delete, exclusive_owner, arguments, + pid, slave_pids, sync_slave_pids, policy]) + || T <- Tables], + ok. + %%-------------------------------------------------------------------- |