diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-07-09 17:57:50 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-07-09 17:57:50 +0100 |
commit | 79bb2b059719fef123b23e4166e93a46376f382b (patch) | |
tree | 8d79fc414e0e04ec76def6d590cd1149b0e9e8c7 | |
parent | 3432b6db0947dbdaab853f980e12aa470154368e (diff) | |
parent | 40053cb586938328d6c0fcb05de3b0a4da4dd693 (diff) | |
download | rabbitmq-server-79bb2b059719fef123b23e4166e93a46376f382b.tar.gz |
Merge bug25048
-rw-r--r-- | include/rabbit.hrl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 79 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 58 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 46 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 103 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 12 | ||||
-rw-r--r-- | src/rabbit_types.erl | 3 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 14 |
8 files changed, 195 insertions, 123 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index d6fac46d..a6d7f399 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -47,8 +47,7 @@ -record(exchange_serial, {name, next}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, - arguments, pid, slave_pids, sync_slave_pids, mirror_nodes, - policy}). + arguments, pid, slave_pids, sync_slave_pids, policy}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 05aad4bd..16ab1edb 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -30,6 +30,8 @@ -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). +%% temp +-export([start_mirroring/1, stop_mirroring/1]). %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, @@ -208,19 +210,19 @@ recover_durable_queues(DurableQueues) -> declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), - {Node, MNodes} = determine_queue_nodes(Args), - Q = start_queue_process(Node, #amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, - exclusive_owner = Owner, - pid = none, - slave_pids = [], - sync_slave_pids = [], - mirror_nodes = MNodes}), - case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of + Q0 = rabbit_policy:set(#amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, + exclusive_owner = Owner, + pid = none, + slave_pids = [], + sync_slave_pids = []}), + {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0), + Q1 = start_queue_process(Node, Q0), + case gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity) of not_found -> rabbit_misc:not_found(QueueName); - Q1 -> Q1 + Q2 -> Q2 end. internal_declare(Q, true) -> @@ -269,24 +271,8 @@ store_queue(Q = #amqqueue{durable = false}) -> ok = mnesia:write(rabbit_queue, Q, write), ok. -policy_changed(_Q1, _Q2) -> - ok. - -determine_queue_nodes(Args) -> - Policy = rabbit_misc:table_lookup(Args, <<"x-ha-policy">>), - PolicyParams = rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>), - case {Policy, PolicyParams} of - {{_Type, <<"nodes">>}, {array, Nodes}} -> - case [list_to_atom(binary_to_list(Node)) || - {longstr, Node} <- Nodes] of - [Node] -> {Node, undefined}; - [First | Rest] -> {First, [First | Rest]} - end; - {{_Type, <<"all">>}, _} -> - {node(), all}; - _ -> - {node(), undefined} - end. +policy_changed(Q1, Q2) -> + rabbit_mirror_queue_misc:update_mirrors(Q1, Q2). start_queue_process(Node, Q) -> {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]), @@ -353,13 +339,11 @@ with_exclusive_access_or_die(Name, ReaderPid, F) -> assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, RequiredArgs) -> rabbit_misc:assert_args_equivalence( - Args, RequiredArgs, QueueName, - [<<"x-expires">>, <<"x-message-ttl">>, <<"x-ha-policy">>]). + Args, RequiredArgs, QueueName, [<<"x-expires">>, <<"x-message-ttl">>]). check_declare_arguments(QueueName, Args) -> Checks = [{<<"x-expires">>, fun check_positive_int_arg/2}, {<<"x-message-ttl">>, fun check_non_neg_int_arg/2}, - {<<"x-ha-policy">>, fun check_ha_policy_arg/2}, {<<"x-dead-letter-exchange">>, fun check_string_arg/2}, {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}], [case rabbit_misc:table_lookup(Args, Key) of @@ -408,29 +392,6 @@ check_dlxrk_arg({longstr, _}, Args) -> check_dlxrk_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. -check_ha_policy_arg({longstr, <<"all">>}, _Args) -> - ok; -check_ha_policy_arg({longstr, <<"nodes">>}, Args) -> - case rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>) of - undefined -> - {error, {require, 'x-ha-policy-params'}}; - {array, []} -> - {error, {require_non_empty_list_of_nodes_for_ha}}; - {array, Ary} -> - case lists:all(fun ({longstr, _Node}) -> true; - (_ ) -> false - end, Ary) of - true -> ok; - false -> {error, {require_node_list_as_longstrs_for_ha, Ary}} - end; - {Type, _} -> - {error, {ha_nodes_policy_params_not_array_of_longstr, Type}} - end; -check_ha_policy_arg({longstr, Policy}, _Args) -> - {error, {invalid_ha_policy, Policy}}; -check_ha_policy_arg({Type, _}, _Args) -> - {error, {unacceptable_type, Type}}. - list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}). @@ -595,6 +556,9 @@ set_ram_duration_target(QPid, Duration) -> set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). +start_mirroring(QPid) -> ok = delegate_call(QPid, start_mirroring). +stop_mirroring(QPid) -> ok = delegate_call(QPid, stop_mirroring). + on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = @@ -629,8 +593,7 @@ pseudo_queue(QueueName, Pid) -> auto_delete = false, arguments = [], pid = Pid, - slave_pids = [], - mirror_nodes = undefined}. + slave_pids = []}. deliver([], #delivery{mandatory = false, immediate = false}, _Flow) -> %% /dev/null optimisation diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 1bf25bb2..5a3361ab 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -229,8 +229,7 @@ matches(false, Q1, Q2) -> Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso - Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids andalso - Q1#amqqueue.mirror_nodes =:= Q2#amqqueue.mirror_nodes. + Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids. bq_init(BQ, Q, Recover) -> Self = self(), @@ -295,11 +294,11 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> timed -> {ensure_sync_timer(State1), 0 } end. -backing_queue_module(#amqqueue{arguments = Args}) -> - case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of - undefined -> {ok, BQM} = application:get_env(backing_queue_module), - BQM; - _Policy -> rabbit_mirror_queue_master +backing_queue_module(Q) -> + case rabbit_mirror_queue_misc:is_mirrored(Q) of + false -> {ok, BQM} = application:get_env(backing_queue_module), + BQM; + true -> rabbit_mirror_queue_master end. ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> @@ -924,15 +923,19 @@ i(consumers, _) -> i(memory, _) -> {memory, M} = process_info(self(), memory), M; -i(slave_pids, #q{q = #amqqueue{name = Name}}) -> - case rabbit_amqqueue:lookup(Name) of - {ok, #amqqueue{mirror_nodes = undefined}} -> ''; - {ok, #amqqueue{slave_pids = SPids}} -> SPids +i(slave_pids, #q{q = Q = #amqqueue{name = Name}}) -> + case rabbit_mirror_queue_misc:is_mirrored(Q) of + false -> ''; + true -> {ok, #amqqueue{slave_pids = SPids}} = + rabbit_amqqueue:lookup(Name), + SPids end; -i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) -> - case rabbit_amqqueue:lookup(Name) of - {ok, #amqqueue{mirror_nodes = undefined}} -> ''; - {ok, #amqqueue{sync_slave_pids = SSPids}} -> SSPids +i(synchronised_slave_pids, #q{q = Q = #amqqueue{name = Name}}) -> + case rabbit_mirror_queue_misc:is_mirrored(Q) of + false -> ''; + true -> {ok, #amqqueue{sync_slave_pids = SSPids}} = + rabbit_amqqueue:lookup(Name), + SSPids end; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); @@ -1174,6 +1177,31 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> ChPid, AckTags, State, fun (State1) -> requeue_and_run(AckTags, State1) end)); +handle_call(start_mirroring, _From, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + %% lookup again to get policy for init_with_existing_bq + {ok, Q} = rabbit_amqqueue:lookup(qname(State)), + true = BQ =/= rabbit_mirror_queue_master, %% assertion + BQ1 = rabbit_mirror_queue_master, + BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS), + reply(ok, State#q{backing_queue = BQ1, + backing_queue_state = BQS1}); + +handle_call(stop_mirroring, _From, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + BQ = rabbit_mirror_queue_master, %% assertion + {BQ1, BQS1} = BQ:stop_mirroring(BQS), + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read({rabbit_queue, qname(State)}) of + [] -> ok; + [Q] -> rabbit_amqqueue:store_queue( + Q#amqqueue{slave_pids = undefined}) + end + end), + reply(ok, State#q{backing_queue = BQ1, + backing_queue_state = BQS1}); + handle_call(force_event_refresh, _From, State = #q{exclusive_consumer = Exclusive}) -> rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 750bcd56..e5ca085d 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -27,6 +27,9 @@ -export([promote_backing_queue_state/6, sender_death_fun/0, length_fun/0]). +%% temp +-export([init_with_existing_bq/3, stop_mirroring/1]). + -behaviour(rabbit_backing_queue). -include("rabbit.hrl"). @@ -82,20 +85,17 @@ stop() -> %% Same as start/1. exit({not_valid_for_generic_backing_queue, ?MODULE}). -init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, - AsyncCallback) -> +init(Q, Recover, AsyncCallback) -> + {ok, BQ} = application:get_env(backing_queue_module), + BQS = BQ:init(Q, Recover, AsyncCallback), + init_with_existing_bq(Q, BQ, BQS). + +init_with_existing_bq(#amqqueue { name = QName } = Q, BQ, BQS) -> {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( Q, undefined, sender_death_fun(), length_fun()), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), - MNodes1 = - (case MNodes of - all -> rabbit_mnesia:all_clustered_nodes(); - undefined -> []; - _ -> MNodes - end) -- [node()], - [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], - {ok, BQ} = application:get_env(backing_queue_module), - BQS = BQ:init(Q, Recover, AsyncCallback), + {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), + [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- SNodes], ok = gm:broadcast(GM, {length, BQ:len(BQS)}), #state { gm = GM, coordinator = CPid, @@ -107,14 +107,24 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, ack_msg_id = dict:new(), known_senders = sets:new() }. +stop_mirroring(State = #state { coordinator = CPid, + backing_queue = BQ, + backing_queue_state = BQS }) -> + unlink(CPid), + stop_all_slaves(unmirroring, State), + {BQ, BQS}. + terminate({shutdown, dropped} = Reason, - State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> %% Backing queue termination - this node has been explicitly %% dropped. Normally, non-durable queues would be tidied up on %% startup, but there's a possibility that we will be added back %% in without this node being restarted. Thus we must do the full %% blown delete_and_terminate now, but only locally: we do not %% broadcast delete_and_terminate. + ok = gm:leave(GM), %% TODO presumably we need this? State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), set_delivered = 0 }; terminate(Reason, @@ -124,15 +134,17 @@ terminate(Reason, %% node. Thus just let some other slave take over. State #state { backing_queue_state = BQ:terminate(Reason, BQS) }. -delete_and_terminate(Reason, State = #state { gm = GM, - backing_queue = BQ, +delete_and_terminate(Reason, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + stop_all_slaves(Reason, State), + State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), + set_delivered = 0 }. + +stop_all_slaves(Reason, #state{gm = GM}) -> Slaves = [Pid || Pid <- gm:group_members(GM), node(Pid) =/= node()], MRefs = [erlang:monitor(process, S) || S <- Slaves], ok = gm:broadcast(GM, {delete_and_terminate, Reason}), - monitor_wait(MRefs), - State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), - set_delivered = 0 }. + monitor_wait(MRefs). monitor_wait([]) -> ok; diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 83ecd4bf..8a5d4441 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -20,6 +20,10 @@ drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3, report_deaths/4, store_updated_slaves/1]). +%% temp +-export([suggested_queue_nodes/1, is_mirrored/1, update_mirrors/2]). + + -include("rabbit.hrl"). %%---------------------------------------------------------------------------- @@ -86,32 +90,27 @@ remove_from_queue(QueueName, DeadPids) -> end). on_node_up() -> - Qs = + QNames = rabbit_misc:execute_mnesia_transaction( fun () -> mnesia:foldl( - fun (#amqqueue { mirror_nodes = undefined }, QsN) -> - QsN; - (#amqqueue { name = QName, - mirror_nodes = all }, QsN) -> - [QName | QsN]; - (#amqqueue { name = QName, - mirror_nodes = MNodes }, QsN) -> - case lists:member(node(), MNodes) of - true -> [QName | QsN]; - false -> QsN + fun (Q = #amqqueue{name = QName}, QNames0) -> + {_MNode, SNodes} = suggested_queue_nodes(Q), + case lists:member(node(), SNodes) of + true -> [QName | QNames0]; + false -> QNames0 end end, [], rabbit_queue) end), - [add_mirror(Q, node()) || Q <- Qs], + [add_mirror(QName, node()) || QName <- QNames], ok. drop_mirror(VHostPath, QueueName, MirrorNode) -> drop_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). -drop_mirror(Queue, MirrorNode) -> +drop_mirror(QName, MirrorNode) -> if_mirrored_queue( - Queue, + QName, fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids }) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of [] -> @@ -130,9 +129,9 @@ drop_mirror(Queue, MirrorNode) -> add_mirror(VHostPath, QueueName, MirrorNode) -> add_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). -add_mirror(Queue, MirrorNode) -> +add_mirror(QName, MirrorNode) -> if_mirrored_queue( - Queue, + QName, fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of [] -> case rabbit_mirror_queue_slave_sup:start_child( @@ -151,14 +150,13 @@ add_mirror(Queue, MirrorNode) -> end end). -if_mirrored_queue(Queue, Fun) -> - rabbit_amqqueue:with( - Queue, fun (#amqqueue { arguments = Args } = Q) -> - case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of - undefined -> ok; - _ -> Fun(Q) - end - end). +if_mirrored_queue(QName, Fun) -> + rabbit_amqqueue:with(QName, fun (Q) -> + case is_mirrored(Q) of + false -> ok; + true -> Fun(Q) + end + end). report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> ok; @@ -182,3 +180,60 @@ store_updated_slaves(Q = #amqqueue{slave_pids = SPids, %% Wake it up so that we emit a stats event rabbit_amqqueue:wake_up(Q1), Q1. + +%%---------------------------------------------------------------------------- + +%% TODO this should take account of current nodes so we don't throw +%% away mirrors or change the master needlessly +suggested_queue_nodes(Q) -> + case [rabbit_policy:get(P, Q) || P <- [<<"ha-mode">>, <<"ha-params">>]] of + [{ok, <<"all">>}, _] -> + {node(), rabbit_mnesia:all_clustered_nodes() -- [node()]}; + [{ok, <<"nodes">>}, {ok, Nodes}] -> + case [list_to_atom(binary_to_list(Node)) || Node <- Nodes] of + [Node] -> {Node, []}; + [First | Rest] -> {First, Rest} + end; + [{ok, <<"at-least">>}, {ok, Count}] -> + {node(), lists:sublist( + rabbit_mnesia:all_clustered_nodes(), Count) -- [node()]}; + _ -> + {node(), []} + end. + +actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) -> + MNode = case MPid of + undefined -> undefined; + _ -> node(MPid) + end, + SNodes = case SPids of + undefined -> undefined; + _ -> [node(Pid) || Pid <- SPids] + end, + {MNode, SNodes}. + +is_mirrored(Q) -> + case rabbit_policy:get(<<"ha-mode">>, Q) of + {ok, <<"all">>} -> true; + {ok, <<"nodes">>} -> true; + {ok, <<"at-least">>} -> true; + _ -> false + end. + +update_mirrors(OldQ = #amqqueue{name = QName, pid = QPid}, + NewQ = #amqqueue{name = QName, pid = QPid}) -> + case {is_mirrored(OldQ), is_mirrored(NewQ)} of + {false, false} -> ok; + {true, false} -> rabbit_amqqueue:stop_mirroring(QPid); + {false, true} -> rabbit_amqqueue:start_mirroring(QPid); + {true, true} -> {OldMNode, OldSNodes} = actual_queue_nodes(OldQ), + {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ), + case OldMNode of + NewMNode -> ok; + _ -> io:format("TODO: master needs to change for ~p~n", [NewQ]) + end, + Add = NewSNodes -- OldSNodes, + Remove = OldSNodes -- NewSNodes, + [ok = drop_mirror(QName, SNode) || SNode <- Remove], + [ok = add_mirror(QName, SNode) || SNode <- Add] + end. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 7939b48c..15862f17 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -275,9 +275,11 @@ terminate(_Reason, #state { backing_queue_state = undefined }) -> %% We've received a delete_and_terminate from gm, thus nothing to %% do here. ok; -terminate({shutdown, dropped} = R, #state { backing_queue = BQ, +terminate({shutdown, dropped} = R, #state { gm = GM, + backing_queue = BQ, backing_queue_state = BQS }) -> %% See rabbit_mirror_queue_master:terminate/2 + ok = gm:leave(GM), %% TODO presumably we need this? BQ:delete_and_terminate(R, BQS); terminate(Reason, #state { q = Q, gm = GM, @@ -904,7 +906,7 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, %% We intentionally leave out the head where a slave becomes %% unsynchronised: we assert that can never happen. -set_synchronised(true, State = #state { q = Q = #amqqueue { name = QName }, +set_synchronised(true, State = #state { q = #amqqueue { name = QName }, synchronised = false }) -> Self = self(), rabbit_misc:execute_mnesia_transaction( @@ -912,9 +914,9 @@ set_synchronised(true, State = #state { q = Q = #amqqueue { name = QName }, case mnesia:read({rabbit_queue, QName}) of [] -> ok; - [Q1 = #amqqueue{sync_slave_pids = SSPids}] -> - Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]}, - rabbit_mirror_queue_misc:store_updated_slaves(Q2) + [Q = #amqqueue{sync_slave_pids = SSPids}] -> + Q1 = Q#amqqueue{sync_slave_pids = [Self | SSPids]}, + rabbit_mirror_queue_misc:store_updated_slaves(Q1) end end), State #state { synchronised = true }; diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 732c29b6..adfb19a0 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -118,8 +118,7 @@ exclusive_owner :: rabbit_types:maybe(pid()), arguments :: rabbit_framing:amqp_table(), pid :: rabbit_types:maybe(pid()), - slave_pids :: [pid()], - mirror_nodes :: [node()] | 'undefined' | 'all'}). + slave_pids :: [pid()]}). -type(exchange() :: #exchange{name :: rabbit_exchange:name(), diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 8a44e03a..388fdec1 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -41,6 +41,7 @@ -rabbit_upgrade({policy, mnesia, [exchange_scratches, ha_mirrors]}). -rabbit_upgrade({sync_slave_pids, mnesia, [policy]}). +-rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}). %% ------------------------------------------------------------------- @@ -64,6 +65,7 @@ -spec(runtime_parameters/0 :: () -> 'ok'). -spec(policy/0 :: () -> 'ok'). -spec(sync_slave_pids/0 :: () -> 'ok'). +-spec(no_mirror_nodes/0 :: () -> 'ok'). -endif. %%-------------------------------------------------------------------- @@ -253,6 +255,18 @@ sync_slave_pids(Table) -> [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, sync_slave_pids, mirror_nodes, policy]). +no_mirror_nodes() -> + Tables = [rabbit_queue, rabbit_durable_queue], + RemoveMirrorNodesFun = + fun ({amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, _MNodes, Pol}) -> + {amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol} + end, + [ok = transform(T, RemoveMirrorNodesFun, + [name, durable, auto_delete, exclusive_owner, arguments, + pid, slave_pids, sync_slave_pids, policy]) + || T <- Tables], + ok. + %%-------------------------------------------------------------------- |