diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-04-24 15:45:03 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-04-24 15:45:03 +0100 |
commit | e4101968e294aacd0f673bc0118f4d64240cc09a (patch) | |
tree | 42fde8481e6625ae385416c2ed2bc561a685bef3 | |
parent | e870642153f73f5cdafb9188f4f7a1cad9b0db52 (diff) | |
parent | 9e38384a80b0f2fe481fb44f887762314fc785c2 (diff) | |
download | rabbitmq-server-e4101968e294aacd0f673bc0118f4d64240cc09a.tar.gz |
Merge bug25512
-rw-r--r-- | docs/rabbitmqctl.1.xml | 2 | ||||
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/gen_server2.erl | 23 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 130 | ||||
-rw-r--r-- | src/rabbit_autoheal.erl | 199 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 61 | ||||
-rw-r--r-- | src/rabbit_exchange_decorator.erl | 41 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 15 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 46 | ||||
-rw-r--r-- | src/rabbit_node_monitor.erl | 110 | ||||
-rw-r--r-- | src/rabbit_policy.erl | 14 | ||||
-rw-r--r-- | src/rabbit_ssl.erl | 29 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 11 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 16 |
14 files changed, 501 insertions, 198 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index bbd2fe5b..0f3c0faf 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -289,7 +289,7 @@ <variablelist> <varlistentry id="join_cluster"> - <term><cmdsynopsis><command>join_cluster</command> <arg choice="req"><replaceable>clusternode</replaceable></arg><arg choice="opt"><replaceable>--ram</replaceable></arg></cmdsynopsis></term> + <term><cmdsynopsis><command>join_cluster</command> <arg choice="req"><replaceable>clusternode</replaceable></arg> <arg choice="opt">--ram</arg></cmdsynopsis></term> <listitem> <variablelist> <varlistentry> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index eeee799e..4282755d 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -40,7 +40,7 @@ -record(resource, {virtual_host, kind, name}). -record(exchange, {name, type, durable, auto_delete, internal, arguments, - scratches, policy}). + scratches, policy, decorators}). -record(exchange_serial, {name, next}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 9109febd..507d1cda 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -75,6 +75,12 @@ %% format_message_queue/2 which is the equivalent of format_status/2 %% but where the second argument is specifically the priority_queue %% which contains the prioritised message_queue. +%% +%% 9) The function with_state/2 can be used to debug a process with +%% heavyweight state (without needing to copy the entire state out of +%% process as sys:get_status/1 would). Pass through a function which +%% can be invoked on the state, get back the result. The state is not +%% modified. %% All modifications are (C) 2009-2013 VMware, Inc. @@ -184,6 +190,7 @@ cast/2, reply/2, abcast/2, abcast/3, multi_call/2, multi_call/3, multi_call/4, + with_state/2, enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]). %% System exports @@ -382,6 +389,16 @@ multi_call(Nodes, Name, Req, Timeout) when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 -> do_multi_call(Nodes, Name, Req, Timeout). +%% ----------------------------------------------------------------- +%% Apply a function to a generic server's state. +%% ----------------------------------------------------------------- +with_state(Name, Fun) -> + case catch gen:call(Name, '$with_state', Fun, infinity) of + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, with_state, [Name, Fun]}}) + end. %%----------------------------------------------------------------- %% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_ @@ -645,6 +662,8 @@ in({'$gen_cast', Msg} = Input, in({'$gen_call', From, Msg} = Input, GS2State = #gs2_state { prioritisers = {F, _, _} }) -> in(Input, F(Msg, From, GS2State), GS2State); +in({'$with_state', _From, _Fun} = Input, GS2State) -> + in(Input, 0, GS2State); in({'EXIT', Parent, _R} = Input, GS2State = #gs2_state { parent = Parent }) -> in(Input, infinity, GS2State); in({system, _From, _Req} = Input, GS2State) -> @@ -663,6 +682,10 @@ process_msg({system, From, Req}, %% gen_server puts Hib on the end as the 7th arg, but that version %% of the fun seems not to be documented so leaving out for now. sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, GS2State); +process_msg({'$with_state', From, Fun}, + GS2State = #gs2_state{state = State}) -> + reply(From, catch Fun(State)), + loop(GS2State); process_msg({'EXIT', Parent, Reason} = Msg, GS2State = #gs2_state { parent = Parent }) -> terminate(Reason, Msg, GS2State); diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b016c4d2..3712a625 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -49,10 +49,6 @@ ttl_timer_ref, ttl_timer_expiry, senders, - publish_seqno, - unconfirmed, - delayed_stop, - queue_monitors, dlx, dlx_routing_key, max_length, @@ -151,9 +147,6 @@ init_state(Q) -> has_had_consumers = false, active_consumers = queue:new(), senders = pmon:new(), - publish_seqno = 1, - unconfirmed = dtree:empty(), - queue_monitors = pmon:new(), msg_id_to_channel = gb_trees:empty(), status = running}, rabbit_event:init_stats_timer(State, #q.stats_timer). @@ -820,80 +813,31 @@ dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) -> State1. dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK, - publish_seqno = SeqNo0, - unconfirmed = UC0, - queue_monitors = QMons0, backing_queue_state = BQS, backing_queue = BQ}) -> QName = qname(State), - {Res, {AckImm1, SeqNo1, UC1, QMons1}, BQS1} = - Fun(fun (Msg, AckTag, {AckImm, SeqNo, UC, QMons}) -> - case dead_letter_publish(Msg, Reason, - X, RK, SeqNo, QName) of - [] -> {[AckTag | AckImm], SeqNo, UC, QMons}; - QPids -> {AckImm, SeqNo + 1, - dtree:insert(SeqNo, QPids, AckTag, UC), - pmon:monitor_all(QPids, QMons)} - end - end, {[], SeqNo0, UC0, QMons0}, BQS), - {_Guids, BQS2} = BQ:ack(AckImm1, BQS1), - {Res, State#q{publish_seqno = SeqNo1, - unconfirmed = UC1, - queue_monitors = QMons1, - backing_queue_state = BQS2}}. - -dead_letter_publish(Msg, Reason, X, RK, MsgSeqNo, QName) -> + {Res, Acks1, BQS1} = + Fun(fun (Msg, AckTag, Acks) -> + dead_letter_publish(Msg, Reason, X, RK, QName), + [AckTag | Acks] + end, [], BQS), + {_Guids, BQS2} = BQ:ack(Acks1, BQS1), + {Res, State#q{backing_queue_state = BQS2}}. + +dead_letter_publish(Msg, Reason, X, RK, QName) -> DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName), - Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo), + Delivery = rabbit_basic:delivery(false, DLMsg, undefined), {Queues, Cycles} = detect_dead_letter_cycles( DLMsg, rabbit_exchange:route(X, Delivery)), lists:foreach(fun log_cycle_once/1, Cycles), - {_, DeliveredQPids} = rabbit_amqqueue:deliver( - rabbit_amqqueue:lookup(Queues), Delivery), - DeliveredQPids. - -handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, - unconfirmed = UC}) -> - case pmon:is_monitored(QPid, QMons) of - false -> noreply(State); - true -> case rabbit_misc:is_abnormal_exit(Reason) of - true -> {Lost, _UC1} = dtree:take_all(QPid, UC), - QNameS = rabbit_misc:rs(qname(State)), - rabbit_log:warning("DLQ ~p for ~s died with " - "~p unconfirmed messages~n", - [QPid, QNameS, length(Lost)]); - false -> ok - end, - {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC), - cleanup_after_confirm( - [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], - State#q{queue_monitors = pmon:erase(QPid, QMons), - unconfirmed = UC1}) - end. + rabbit_amqqueue:deliver( rabbit_amqqueue:lookup(Queues), Delivery), + ok. -stop(State) -> stop(undefined, noreply, State). +stop(State) -> stop(noreply, State). -stop(From, Reply, State = #q{unconfirmed = UC}) -> - case {dtree:is_empty(UC), Reply} of - {true, noreply} -> {stop, normal, State}; - {true, _} -> {stop, normal, Reply, State}; - {false, _} -> noreply(State#q{delayed_stop = {From, Reply}}) - end. +stop(noreply, State) -> {stop, normal, State}; +stop(Reply, State) -> {stop, normal, Reply, State}. -cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, - unconfirmed = UC, - backing_queue = BQ, - backing_queue_state = BQS}) -> - {_Guids, BQS1} = BQ:ack(AckTags, BQS), - State1 = State#q{backing_queue_state = BQS1}, - case dtree:is_empty(UC) andalso DS =/= undefined of - true -> case DS of - {_, noreply} -> ok; - {From, Reply} -> gen_server2:reply(From, Reply) - end, - {stop, normal, State1}; - false -> noreply(State1) - end. detect_dead_letter_cycles(#basic_message{content = Content}, Queues) -> #content{properties = #'P_basic'{headers = Headers}} = @@ -1073,9 +1017,6 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> _ -> 0 end. -handle_call(_, _, State = #q{delayed_stop = DS}) when DS =/= undefined -> - noreply(State); - handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> declare(Recover, From, State); @@ -1115,16 +1056,15 @@ handle_call({deliver, Delivery, Delivered}, From, State) -> gen_server2:reply(From, ok), noreply(deliver_or_enqueue(Delivery, Delivered, State)); -handle_call({notify_down, ChPid}, From, State) -> +handle_call({notify_down, ChPid}, _From, State) -> %% we want to do this synchronously, so that auto_deleted queues %% are no longer visible by the time we send a response to the %% client. The queue is ultimately deleted in terminate/2; if we %% return stop with a reply, terminate/2 will be called by - %% gen_server2 *before* the reply is sent. FIXME: in case of a - %% delayed stop the reply is sent earlier. + %% gen_server2 *before* the reply is sent. case handle_ch_down(ChPid, State) of {ok, State1} -> reply(ok, State1); - {stop, State1} -> stop(From, ok, State1) + {stop, State1} -> stop(ok, State1) end; handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, @@ -1186,7 +1126,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, reply(ok, run_message_queue(State1#q{active_consumers = AC1})) end; -handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, +handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, State = #q{exclusive_consumer = Holder}) -> ok = maybe_send_reply(ChPid, OkMsg), case lookup_ch(ChPid) of @@ -1215,7 +1155,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, State#q.active_consumers)}, case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); - true -> stop(From, ok, State1) + true -> stop(ok, State1) end end; @@ -1224,14 +1164,14 @@ handle_call(stat, _From, State) -> ensure_expiry_timer(State), reply({ok, BQ:len(BQS), consumer_count()}, State1); -handle_call({delete, IfUnused, IfEmpty}, From, +handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> IsEmpty = BQ:is_empty(BQS), IsUnused = is_unused(State), if IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State); IfUnused and not(IsUnused) -> reply({error, in_use}, State); - true -> stop(From, {ok, BQ:len(BQS)}, State) + true -> stop({ok, BQ:len(BQS)}, State) end; handle_call(purge, _From, State = #q{backing_queue = BQ, @@ -1286,19 +1226,6 @@ handle_call(force_event_refresh, _From, end, reply(ok, State). -handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) -> - {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC), - State1 = case dtree:is_defined(QPid, UC1) of - false -> QMons = State#q.queue_monitors, - State#q{queue_monitors = pmon:demonitor(QPid, QMons)}; - true -> State - end, - cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], - State1#q{unconfirmed = UC1}); - -handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> - noreply(State); - handle_cast({run_backing_queue, Mod, Fun}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}); @@ -1405,15 +1332,6 @@ handle_cast({credit, ChPid, CTag, Credit, Drain}, handle_cast(wake_up, State) -> noreply(State). -%% We need to not ignore this as we need to remove outstanding -%% confirms due to queue death. -handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, - State = #q{delayed_stop = DS}) when DS =/= undefined -> - handle_queue_down(DownPid, Reason, State); - -handle_info(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> - noreply(State); - handle_info(maybe_expire, State) -> case is_unused(State) of true -> stop(State); @@ -1442,9 +1360,9 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, %% unexpectedly. stop(State); -handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, State) -> +handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> case handle_ch_down(DownPid, State) of - {ok, State1} -> handle_queue_down(DownPid, Reason, State1); + {ok, State1} -> noreply(State1); {stop, State1} -> stop(State1) end; diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl new file mode 100644 index 00000000..c00c2dd6 --- /dev/null +++ b/src/rabbit_autoheal.erl @@ -0,0 +1,199 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. +%% + +-module(rabbit_autoheal). + +-export([init/0, maybe_start/1, node_down/2, handle_msg/3]). + +%% The named process we are running in. +-define(SERVER, rabbit_node_monitor). + +%%---------------------------------------------------------------------------- + +%% In order to autoheal we want to: +%% +%% * Find the winning partition +%% * Stop all nodes in other partitions +%% * Wait for them all to be stopped +%% * Start them again +%% +%% To keep things simple, we assume all nodes are up. We don't start +%% unless all nodes are up, and if a node goes down we abandon the +%% whole process. To further keep things simple we also defer the +%% decision as to the winning node to the "leader" - arbitrarily +%% selected as the first node in the cluster. +%% +%% To coordinate the restarting nodes we pick a special node from the +%% winning partition - the "winner". Restarting nodes then stop, tell +%% the winner they have done so, and wait for it to tell them it is +%% safe to start again. +%% +%% The winner and the leader are not necessarily the same node! Since +%% the leader may end up restarting, we also make sure that it does +%% not announce its decision (and thus cue other nodes to restart) +%% until it has seen a request from every node that has experienced a +%% partition. +%% +%% Possible states: +%% +%% not_healing +%% - the default +%% +%% {winner_waiting, OutstandingStops, Notify} +%% - we are the winner and are waiting for all losing nodes to stop +%% before telling them they can restart +%% +%% restarting +%% - we are restarting. Of course the node monitor immediately dies +%% then so this state does not last long. We therefore send the +%% autoheal_safe_to_start message to the rabbit_outside_app_process +%% instead. + +%%---------------------------------------------------------------------------- + +init() -> not_healing. + +maybe_start(not_healing) -> + case enabled() of + true -> [Leader | _] = lists:usort(rabbit_mnesia:cluster_nodes(all)), + send(Leader, {request_start, node()}), + rabbit_log:info("Autoheal request sent to ~p~n", [Leader]), + not_healing; + false -> not_healing + end; +maybe_start(State) -> + State. + +enabled() -> + {ok, autoheal} =:= application:get_env(rabbit, cluster_partition_handling). + +node_down(_Node, {winner_waiting, _Nodes, _Notify} = Autoheal) -> + Autoheal; +node_down(_Node, not_healing) -> + not_healing; +node_down(Node, _State) -> + rabbit_log:info("Autoheal: aborting - ~p went down~n", [Node]), + not_healing. + +%% By receiving this message we become the leader +%% TODO should we try to debounce this? +handle_msg({request_start, Node}, + not_healing, Partitions) -> + rabbit_log:info("Autoheal request received from ~p~n", [Node]), + case rabbit_node_monitor:all_rabbit_nodes_up() of + false -> not_healing; + true -> AllPartitions = all_partitions(Partitions), + {Winner, Losers} = make_decision(AllPartitions), + rabbit_log:info("Autoheal decision~n" + " * Partitions: ~p~n" + " * Winner: ~p~n" + " * Losers: ~p~n", + [AllPartitions, Winner, Losers]), + send(Winner, {become_winner, Losers}), + [send(L, {winner_is, Winner}) || L <- Losers], + not_healing + end; + +handle_msg({become_winner, Losers}, + not_healing, _Partitions) -> + rabbit_log:info("Autoheal: I am the winner, waiting for ~p to stop~n", + [Losers]), + {winner_waiting, Losers, Losers}; + +handle_msg({become_winner, Losers}, + {winner_waiting, WaitFor, Notify}, _Partitions) -> + rabbit_log:info("Autoheal: I am the winner, waiting additionally for " + "~p to stop~n", [Losers]), + {winner_waiting, lists:usort(Losers ++ WaitFor), + lists:usort(Losers ++ Notify)}; + +handle_msg({winner_is, Winner}, + not_healing, _Partitions) -> + rabbit_log:warning( + "Autoheal: we were selected to restart; winner is ~p~n", [Winner]), + rabbit_node_monitor:run_outside_applications( + fun () -> + MRef = erlang:monitor(process, {?SERVER, Winner}), + rabbit:stop(), + send(Winner, {node_stopped, node()}), + receive + {'DOWN', MRef, process, {?SERVER, Winner}, _Reason} -> ok; + autoheal_safe_to_start -> ok + end, + erlang:demonitor(MRef, [flush]), + rabbit:start() + end), + restarting; + +%% This is the winner receiving its last notification that a node has +%% stopped - all nodes can now start again +handle_msg({node_stopped, Node}, + {winner_waiting, [Node], Notify}, _Partitions) -> + rabbit_log:info("Autoheal: final node has stopped, starting...~n",[]), + [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify], + not_healing; + +handle_msg({node_stopped, Node}, + {winner_waiting, WaitFor, Notify}, _Partitions) -> + {winner_waiting, WaitFor -- [Node], Notify}; + +handle_msg(_, restarting, _Partitions) -> + %% ignore, we can contribute no further + restarting; + +handle_msg({node_stopped, _Node}, State, _Partitions) -> + %% ignore, we already cancelled the autoheal process + State. + +%%---------------------------------------------------------------------------- + +send(Node, Msg) -> {?SERVER, Node} ! {autoheal_msg, Msg}. + +make_decision(AllPartitions) -> + Sorted = lists:sort([{partition_value(P), P} || P <- AllPartitions]), + [[Winner | _] | Rest] = lists:reverse([P || {_, P} <- Sorted]), + {Winner, lists:append(Rest)}. + +partition_value(Partition) -> + Connections = [Res || Node <- Partition, + Res <- [rpc:call(Node, rabbit_networking, + connections_local, [])], + is_list(Res)], + {length(lists:append(Connections)), length(Partition)}. + +%% We have our local understanding of what partitions exist; but we +%% only know which nodes we have been partitioned from, not which +%% nodes are partitioned from each other. +all_partitions(PartitionedWith) -> + Nodes = rabbit_mnesia:cluster_nodes(all), + Partitions = [{node(), PartitionedWith} | + [rpc:call(Node, rabbit_node_monitor, partitions, []) + || Node <- Nodes -- [node()]]], + all_partitions(Partitions, [Nodes]). + +all_partitions([], Partitions) -> + Partitions; +all_partitions([{Node, CantSee} | Rest], Partitions) -> + {[Containing], Others} = + lists:partition(fun (Part) -> lists:member(Node, Part) end, Partitions), + A = Containing -- CantSee, + B = Containing -- A, + Partitions1 = case {A, B} of + {[], _} -> Partitions; + {_, []} -> Partitions; + _ -> [A, B | Others] + end, + all_partitions(Rest, Partitions1). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 9e98448d..b4bdd348 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -68,7 +68,8 @@ -spec(update_scratch/3 :: (name(), atom(), fun((any()) -> any())) -> 'ok'). -spec(update/2 :: (name(), - fun((rabbit_types:exchange()) -> rabbit_types:exchange())) -> 'ok'). + fun((rabbit_types:exchange()) -> rabbit_types:exchange())) + -> not_found | rabbit_types:exchange()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()). -spec(info/2 :: @@ -113,25 +114,39 @@ recover() -> callback(X, create, map_create_tx(Tx), [X]) end, rabbit_durable_exchange), + report_missing_decorators(Xs), [XName || #exchange{name = XName} <- Xs]. -callback(X = #exchange{type = XType}, Fun, Serial0, Args) -> +report_missing_decorators(Xs) -> + Mods = lists:usort(lists:append([rabbit_exchange_decorator:select(raw, D) || + #exchange{decorators = D} <- Xs])), + case [M || M <- Mods, code:which(M) =:= non_existing] of + [] -> ok; + M -> rabbit_log:warning("Missing exchange decorators: ~p~n", [M]) + end. + +callback(X = #exchange{type = XType, + decorators = Decorators}, Fun, Serial0, Args) -> Serial = if is_function(Serial0) -> Serial0; is_atom(Serial0) -> fun (_Bool) -> Serial0 end end, [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) || - M <- registry_lookup(exchange_decorator)], + M <- rabbit_exchange_decorator:select(all, Decorators)], Module = type_to_module(XType), apply(Module, Fun, [Serial(Module:serialise_events()) | Args]). -policy_changed(X = #exchange{type = XType}, X1) -> - [ok = M:policy_changed(X, X1) || - M <- [type_to_module(XType) | registry_lookup(exchange_decorator)]], +policy_changed(X = #exchange{type = XType, + decorators = Decorators}, + X1 = #exchange{decorators = Decorators1}) -> + D = rabbit_exchange_decorator:select(all, Decorators), + D1 = rabbit_exchange_decorator:select(all, Decorators1), + DAll = lists:usort(D ++ D1), + [ok = M:policy_changed(X, X1) || M <- [type_to_module(XType) | DAll]], ok. -serialise_events(X = #exchange{type = Type}) -> +serialise_events(X = #exchange{type = Type, decorators = Decorators}) -> lists:any(fun (M) -> M:serialise_events(X) end, - registry_lookup(exchange_decorator)) + rabbit_exchange_decorator:select(all, Decorators)) orelse (type_to_module(Type)):serialise_events(). serial(#exchange{name = XName} = X) -> @@ -143,16 +158,6 @@ serial(#exchange{name = XName} = X) -> (false) -> none end. -registry_lookup(exchange_decorator_route = Class) -> - case get(exchange_decorator_route_modules) of - undefined -> Mods = [M || {_, M} <- rabbit_registry:lookup_all(Class)], - put(exchange_decorator_route_modules, Mods), - Mods; - Mods -> Mods - end; -registry_lookup(Class) -> - [M || {_, M} <- rabbit_registry:lookup_all(Class)]. - declare(XName, Type, Durable, AutoDelete, Internal, Args) -> X = rabbit_policy:set(#exchange{name = XName, type = Type, @@ -273,7 +278,8 @@ update_scratch(Name, App, Fun) -> Scratches2 = orddict:store( App, Fun(Scratch), Scratches1), X#exchange{scratches = Scratches2} - end) + end), + ok end). update(Name, Fun) -> @@ -284,9 +290,10 @@ update(Name, Fun) -> case Durable of true -> ok = mnesia:write(rabbit_durable_exchange, X1, write); _ -> ok - end; + end, + X1; [] -> - ok + not_found end. info_keys() -> ?INFO_KEYS. @@ -318,15 +325,15 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). -route(#exchange{name = #resource{virtual_host = VHost, - name = RName} = XName} = X, +route(#exchange{name = #resource{virtual_host = VHost, name = RName} = XName, + decorators = Decorators} = X, #delivery{message = #basic_message{routing_keys = RKs}} = Delivery) -> - case {registry_lookup(exchange_decorator_route), RName == <<"">>} of - {[], true} -> + case {RName, rabbit_exchange_decorator:select(route, Decorators)} of + {<<"">>, []} -> %% Optimisation [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)]; - {Decorators, _} -> - lists:usort(route1(Delivery, Decorators, {[X], XName, []})) + {_, SelectedDecorators} -> + lists:usort(route1(Delivery, SelectedDecorators, {[X], XName, []})) end. route1(_, _, {[], _, QNames}) -> diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl index 040b55db..3abaa48c 100644 --- a/src/rabbit_exchange_decorator.erl +++ b/src/rabbit_exchange_decorator.erl @@ -16,6 +16,10 @@ -module(rabbit_exchange_decorator). +-include("rabbit.hrl"). + +-export([select/2, set/1]). + %% This is like an exchange type except that: %% %% 1) It applies to all exchanges as soon as it is installed, therefore @@ -57,10 +61,13 @@ -callback remove_bindings(serial(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'. -%% Decorators can optionally implement route/2 which allows additional -%% destinations to be added to the routing decision. -%% -callback route(rabbit_types:exchange(), rabbit_types:delivery()) -> -%% [rabbit_amqqueue:name() | rabbit_exchange:name()]. +%% Allows additional destinations to be added to the routing decision. +-callback route(rabbit_types:exchange(), rabbit_types:delivery()) -> + [rabbit_amqqueue:name() | rabbit_exchange:name()]. + +%% Whether the decorator wishes to receive callbacks for the exchange +%% none:no callbacks, noroute:all callbacks except route, all:all callbacks +-callback active_for(rabbit_types:exchange()) -> 'none' | 'noroute' | 'all'. -else. @@ -68,8 +75,32 @@ behaviour_info(callbacks) -> [{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3}, - {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}]; + {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}, + {route, 2}, {active_for, 1}]; behaviour_info(_Other) -> undefined. -endif. + +%%---------------------------------------------------------------------------- + +%% select a subset of active decorators +select(all, {Route, NoRoute}) -> filter(Route ++ NoRoute); +select(route, {Route, _NoRoute}) -> filter(Route); +select(raw, {Route, NoRoute}) -> Route ++ NoRoute. + +filter(Modules) -> + [M || M <- Modules, code:which(M) =/= non_existing]. + +set(X) -> + Decs = lists:foldl(fun (D, {Route, NoRoute}) -> + ActiveFor = D:active_for(X), + {cons_if_eq(all, ActiveFor, D, Route), + cons_if_eq(noroute, ActiveFor, D, NoRoute)} + end, {[], []}, list()), + X#exchange{decorators = Decs}. + +list() -> [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)]. + +cons_if_eq(Select, Select, Item, List) -> [Item | List]; +cons_if_eq(_Select, _Other, _Item, List) -> List. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 22edfcb6..964b0eb4 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -605,10 +605,13 @@ ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> State #state { known_senders = pmon:monitor(ChPid, KS) }. local_sender_death(ChPid, State = #state { known_senders = KS }) -> + %% The channel will be monitored iff we have received a delivery + %% from it but not heard about its death from the master. So if it + %% is monitored we need to point the death out to the master (see + %% essay). ok = case pmon:is_monitored(ChPid, KS) of false -> ok; - true -> credit_flow:peer_down(ChPid), - confirm_sender_death(ChPid) + true -> confirm_sender_death(ChPid) end, State. @@ -621,6 +624,10 @@ confirm_sender_death(Pid) -> fun (?MODULE, State = #state { known_senders = KS, gm = GM }) -> %% We're running still as a slave + %% + %% See comment in local_sender_death/2; we might have + %% received a sender_death in the meanwhile so check + %% again. ok = case pmon:is_monitored(Pid, KS) of false -> ok; true -> gm:broadcast(GM, {ensure_monitoring, [Pid]}), @@ -766,6 +773,9 @@ process_instruction({sender_death, ChPid}, State = #state { sender_queues = SQ, msg_id_status = MS, known_senders = KS }) -> + %% The channel will be monitored iff we have received a message + %% from it. In this case we just want to avoid doing work if we + %% never got any messages. {ok, case pmon:is_monitored(ChPid, KS) of false -> State; true -> MS1 = case dict:find(ChPid, SQ) of @@ -775,6 +785,7 @@ process_instruction({sender_death, ChPid}, lists:foldl(fun dict:erase/2, MS, sets:to_list(PendingCh)) end, + credit_flow:peer_down(ChPid), State #state { sender_queues = dict:erase(ChPid, SQ), msg_id_status = MS1, known_senders = pmon:demonitor(ChPid, KS) } diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index c39e898c..8cd976fa 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -404,7 +404,7 @@ cluster_status(WhichNodes) -> node_info() -> {erlang:system_info(otp_release), rabbit_misc:version(), - cluster_status_from_mnesia()}. + delegate_beam_hash(), cluster_status_from_mnesia()}. node_type() -> DiscNodes = cluster_nodes(disc), @@ -562,10 +562,13 @@ check_cluster_consistency(Node) -> case rpc:call(Node, rabbit_mnesia, node_info, []) of {badrpc, _Reason} -> {error, not_found}; - {_OTP, _Rabbit, {error, _}} -> + {_OTP, _Rabbit, _Hash, {error, _}} -> {error, not_found}; - {OTP, Rabbit, {ok, Status}} -> - case check_consistency(OTP, Rabbit, Node, Status) of + {_OTP, Rabbit, _Status} -> + %% pre-2013/04 format implies version mismatch + version_error("Rabbit", rabbit_misc:version(), Rabbit); + {OTP, Rabbit, Hash, {ok, Status}} -> + case check_consistency(OTP, Rabbit, Hash, Node, Status) of {error, _} = E -> E; {ok, Res} -> {ok, Res} end @@ -732,14 +735,17 @@ change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) -> Nodes end. -check_consistency(OTP, Rabbit) -> +check_consistency(OTP, Rabbit, Hash) -> rabbit_misc:sequence_error( - [check_otp_consistency(OTP), check_rabbit_consistency(Rabbit)]). + [check_otp_consistency(OTP), + check_rabbit_consistency(Rabbit), + check_beam_compatibility(Hash)]). -check_consistency(OTP, Rabbit, Node, Status) -> +check_consistency(OTP, Rabbit, Hash, Node, Status) -> rabbit_misc:sequence_error( [check_otp_consistency(OTP), check_rabbit_consistency(Rabbit), + check_beam_compatibility(Hash), check_nodes_consistency(Node, Status)]). check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) -> @@ -780,6 +786,21 @@ check_rabbit_consistency(Remote) -> rabbit_misc:version(), Remote, "Rabbit", fun rabbit_misc:version_minor_equivalent/2). +check_beam_compatibility(RemoteHash) -> + case RemoteHash == delegate_beam_hash() of + true -> ok; + false -> {error, {incompatible_bytecode, + "Incompatible Erlang bytecode found on nodes"}} + end. + +%% The delegate module sends functions across the cluster; if it is +%% out of sync (say due to mixed compilers), we will get badfun +%% exceptions when trying to do so. Let's detect that at startup. +delegate_beam_hash() -> + {delegate, Obj, _} = code:get_object_code(delegate), + {ok, {delegate, Hash}} = beam_lib:md5(Obj), + Hash. + %% This is fairly tricky. We want to know if the node is in the state %% that a `reset' would leave it in. We cannot simply check if the %% mnesia tables aren't there because restarted RAM nodes won't have @@ -805,11 +826,12 @@ find_good_node([]) -> none; find_good_node([Node | Nodes]) -> case rpc:call(Node, rabbit_mnesia, node_info, []) of - {badrpc, _Reason} -> find_good_node(Nodes); - {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of - {error, _} -> find_good_node(Nodes); - ok -> {ok, Node} - end + {badrpc, _Reason} -> find_good_node(Nodes); + {_OTP, _Rabbit, _} -> find_good_node(Nodes); + {OTP, Rabbit, Hash, _} -> case check_consistency(OTP, Rabbit, Hash) of + {error, _} -> find_good_node(Nodes); + ok -> {ok, Node} + end end. is_only_clustered_disc_node() -> diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index fb74d4a3..7d844c72 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -30,11 +30,14 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + %% Utils +-export([all_rabbit_nodes_up/0, run_outside_applications/1]). + -define(SERVER, ?MODULE). -define(RABBIT_UP_RPC_TIMEOUT, 2000). -define(RABBIT_DOWN_PING_INTERVAL, 1000). --record(state, {monitors, partitions, subscribers, down_ping_timer}). +-record(state, {monitors, partitions, subscribers, down_ping_timer, autoheal}). %%---------------------------------------------------------------------------- @@ -57,6 +60,9 @@ -spec(partitions/0 :: () -> {node(), [node()]}). -spec(subscribe/1 :: (pid()) -> 'ok'). +-spec(all_rabbit_nodes_up/0 :: () -> boolean()). +-spec(run_outside_applications/1 :: (fun (() -> any())) -> pid()). + -endif. %%---------------------------------------------------------------------------- @@ -194,10 +200,12 @@ init([]) -> %% writing out the cluster status files - bad things can then %% happen. process_flag(trap_exit, true), + net_kernel:monitor_nodes(true), {ok, _} = mnesia:subscribe(system), {ok, #state{monitors = pmon:new(), subscribers = pmon:new(), - partitions = []}}. + partitions = [], + autoheal = rabbit_autoheal:init()}}. handle_call(partitions, _From, State = #state{partitions = Partitions}) -> {reply, {node(), Partitions}, State}; @@ -251,16 +259,22 @@ handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, ok = handle_dead_rabbit(Node), [P ! {node_down, Node} || P <- pmon:monitored(Subscribers)], {noreply, handle_dead_rabbit_state( + Node, State#state{monitors = pmon:erase({rabbit, Node}, Monitors)})}; handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #state{subscribers = Subscribers}) -> {noreply, State#state{subscribers = pmon:erase(Pid, Subscribers)}}; +handle_info({nodedown, Node}, State) -> + ok = handle_dead_node(Node), + {noreply, State}; + handle_info({mnesia_system_event, {inconsistent_database, running_partitioned_network, Node}}, State = #state{partitions = Partitions, - monitors = Monitors}) -> + monitors = Monitors, + autoheal = AState}) -> %% We will not get a node_up from this node - yet we should treat it as %% up (mostly). State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of @@ -271,7 +285,13 @@ handle_info({mnesia_system_event, ok = handle_live_rabbit(Node), Partitions1 = ordsets:to_list( ordsets:add_element(Node, ordsets:from_list(Partitions))), - {noreply, State1#state{partitions = Partitions1}}; + {noreply, State1#state{partitions = Partitions1, + autoheal = rabbit_autoheal:maybe_start(AState)}}; + +handle_info({autoheal_msg, Msg}, State = #state{autoheal = AState, + partitions = Partitions}) -> + AState1 = rabbit_autoheal:handle_msg(Msg, AState, Partitions), + {noreply, State#state{autoheal = AState1}}; handle_info(ping_nodes, State) -> %% We ping nodes when some are down to ensure that we find out @@ -318,6 +338,18 @@ handle_dead_rabbit(Node) -> ok = rabbit_amqqueue:on_node_down(Node), ok = rabbit_alarm:on_node_down(Node), ok = rabbit_mnesia:on_node_down(Node), + ok. + +handle_dead_node(_Node) -> + %% In general in rabbit_node_monitor we care about whether the + %% rabbit application is up rather than the node; we do this so + %% that we can respond in the same way to "rabbitmqctl stop_app" + %% and "rabbitmqctl stop" as much as possible. + %% + %% However, for pause_minority mode we can't do this, since we + %% depend on looking at whether other nodes are up to decide + %% whether to come back up ourselves - if we decide that based on + %% the rabbit application we would go down and never come back. case application:get_env(rabbit, cluster_partition_handling) of {ok, pause_minority} -> case majority() of @@ -326,44 +358,32 @@ handle_dead_rabbit(Node) -> end; {ok, ignore} -> ok; + {ok, autoheal} -> + ok; {ok, Term} -> rabbit_log:warning("cluster_partition_handling ~p unrecognised, " "assuming 'ignore'~n", [Term]), ok - end, - ok. - -majority() -> - Nodes = rabbit_mnesia:cluster_nodes(all), - length(alive_nodes(Nodes)) / length(Nodes) > 0.5. - -all_nodes_up() -> - Nodes = rabbit_mnesia:cluster_nodes(all), - length(alive_nodes(Nodes)) =:= length(Nodes). - -%% mnesia:system_info(db_nodes) (and hence -%% rabbit_mnesia:cluster_nodes(running)) does not give reliable results -%% when partitioned. -alive_nodes() -> alive_nodes(rabbit_mnesia:cluster_nodes(all)). - -alive_nodes(Nodes) -> [N || N <- Nodes, pong =:= net_adm:ping(N)]. - -alive_rabbit_nodes() -> - [N || N <- alive_nodes(), rabbit_nodes:is_process_running(N, rabbit)]. + end. await_cluster_recovery() -> rabbit_log:warning("Cluster minority status detected - awaiting recovery~n", []), Nodes = rabbit_mnesia:cluster_nodes(all), + run_outside_applications(fun () -> + rabbit:stop(), + wait_for_cluster_recovery(Nodes) + end). + +run_outside_applications(Fun) -> spawn(fun () -> %% If our group leader is inside an application we are about %% to stop, application:stop/1 does not return. group_leader(whereis(init), self()), - %% Ensure only one restarting process at a time, will + %% Ensure only one such process at a time, will %% exit(badarg) (harmlessly) if one is already running - register(rabbit_restarting_process, self()), - rabbit:stop(), - wait_for_cluster_recovery(Nodes) + register(rabbit_outside_app_process, self()), + Fun() end). wait_for_cluster_recovery(Nodes) -> @@ -373,7 +393,8 @@ wait_for_cluster_recovery(Nodes) -> wait_for_cluster_recovery(Nodes) end. -handle_dead_rabbit_state(State = #state{partitions = Partitions}) -> +handle_dead_rabbit_state(Node, State = #state{partitions = Partitions, + autoheal = Autoheal}) -> %% If we have been partitioned, and we are now in the only remaining %% partition, we no longer care about partitions - forget them. Note %% that we do not attempt to deal with individual (other) partitions @@ -383,7 +404,9 @@ handle_dead_rabbit_state(State = #state{partitions = Partitions}) -> [] -> []; _ -> Partitions end, - ensure_ping_timer(State#state{partitions = Partitions1}). + ensure_ping_timer( + State#state{partitions = Partitions1, + autoheal = rabbit_autoheal:node_down(Node, Autoheal)}). ensure_ping_timer(State) -> rabbit_misc:ensure_timer( @@ -416,3 +439,30 @@ legacy_should_be_disc_node(DiscNodes) -> add_node(Node, Nodes) -> lists:usort([Node | Nodes]). del_node(Node, Nodes) -> Nodes -- [Node]. + +%%-------------------------------------------------------------------- + +%% mnesia:system_info(db_nodes) (and hence +%% rabbit_mnesia:cluster_nodes(running)) does not give reliable +%% results when partitioned. So we have a small set of replacement +%% functions here. "rabbit" in a function's name implies we test if +%% the rabbit application is up, not just the node. + +majority() -> + Nodes = rabbit_mnesia:cluster_nodes(all), + length(alive_nodes(Nodes)) / length(Nodes) > 0.5. + +all_nodes_up() -> + Nodes = rabbit_mnesia:cluster_nodes(all), + length(alive_nodes(Nodes)) =:= length(Nodes). + +all_rabbit_nodes_up() -> + Nodes = rabbit_mnesia:cluster_nodes(all), + length(alive_rabbit_nodes(Nodes)) =:= length(Nodes). + +alive_nodes(Nodes) -> [N || N <- Nodes, pong =:= net_adm:ping(N)]. + +alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_mnesia:cluster_nodes(all)). + +alive_rabbit_nodes(Nodes) -> + [N || N <- alive_nodes(Nodes), rabbit_nodes:is_process_running(N, rabbit)]. diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 7398cd2d..0990c662 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -46,7 +46,8 @@ name0(undefined) -> none; name0(Policy) -> pget(name, Policy). set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)}; -set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}. +set(X = #exchange{name = Name}) -> rabbit_exchange_decorator:set( + X#exchange{policy = set0(Name)}). set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)). @@ -170,9 +171,14 @@ update_policies(VHost) -> update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) -> case match(XName, Policies) of OldPolicy -> no_change; - NewPolicy -> rabbit_exchange:update( - XName, fun(X1) -> X1#exchange{policy = NewPolicy} end), - {X, X#exchange{policy = NewPolicy}} + NewPolicy -> case rabbit_exchange:update( + XName, fun (X0) -> + rabbit_exchange_decorator:set( + X0 #exchange{policy = NewPolicy}) + end) of + #exchange{} = X1 -> {X, X1}; + not_found -> {X, X } + end end. update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) -> diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl index b1238623..96277b68 100644 --- a/src/rabbit_ssl.erl +++ b/src/rabbit_ssl.erl @@ -162,15 +162,16 @@ format_rdn(#'AttributeTypeAndValue'{type = T, value = V}) -> {?'id-at-pseudonym' , "PSEUDONYM"}, {?'id-domainComponent' , "DC"}, {?'id-emailAddress' , "EMAILADDRESS"}, - {?'street-address' , "STREET"}], + {?'street-address' , "STREET"}, + {{0,9,2342,19200300,100,1,1} , "UID"}], %% Not in public_key.hrl case proplists:lookup(T, Fmts) of {_, Fmt} -> - io_lib:format(Fmt ++ "=~s", [FV]); + rabbit_misc:format(Fmt ++ "=~s", [FV]); none when is_tuple(T) -> - TypeL = [io_lib:format("~w", [X]) || X <- tuple_to_list(T)], - io_lib:format("~s:~s", [string:join(TypeL, "."), FV]); + TypeL = [rabbit_misc:format("~w", [X]) || X <- tuple_to_list(T)], + rabbit_misc:format("~s=~s", [string:join(TypeL, "."), FV]); none -> - io_lib:format("~p:~s", [T, FV]) + rabbit_misc:format("~p=~s", [T, FV]) end. %% Escape a string as per RFC4514. @@ -204,14 +205,26 @@ format_asn1_value({ST, S}) when ST =:= teletexString; ST =:= printableString; format_directory_string(ST, S); format_asn1_value({utcTime, [Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2, $Z]}) -> - io_lib:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ", - [Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]); + rabbit_misc:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ", + [Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]); %% We appear to get an untagged value back for an ia5string %% (e.g. domainComponent). format_asn1_value(V) when is_list(V) -> V; +format_asn1_value(V) when is_binary(V) -> + %% OTP does not decode some values when combined with an unknown + %% type. That's probably wrong, so as a last ditch effort let's + %% try manually decoding. 'DirectoryString' is semi-arbitrary - + %% but it is the type which covers the various string types we + %% handle below. + try + {ST, S} = public_key:der_decode('DirectoryString', V), + format_directory_string(ST, S) + catch _:_ -> + rabbit_misc:format("~p", [V]) + end; format_asn1_value(V) -> - io_lib:format("~p", [V]). + rabbit_misc:format("~p", [V]). %% DirectoryString { INTEGER : maxSize } ::= CHOICE { %% teletexString TeletexString (SIZE (1..maxSize)), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index e7b69879..163f6170 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -63,6 +63,7 @@ all_tests() -> passed = test_server_status(), passed = test_amqp_connection_refusal(), passed = test_confirms(), + passed = test_with_state(), passed = do_if_secondary_node( fun run_cluster_dependent_tests/1, @@ -563,8 +564,9 @@ test_topic_matching() -> XName = #resource{virtual_host = <<"/">>, kind = exchange, name = <<"test_exchange">>}, - X = #exchange{name = XName, type = topic, durable = false, - auto_delete = false, arguments = []}, + X0 = #exchange{name = XName, type = topic, durable = false, + auto_delete = false, arguments = []}, + X = rabbit_exchange_decorator:set(X0), %% create rabbit_exchange_type_topic:validate(X), exchange_op_callback(X, create, []), @@ -1298,6 +1300,11 @@ test_confirms() -> passed. +test_with_state() -> + fhc_state = gen_server2:with_state(file_handle_cache, + fun (S) -> element(1, S) end), + passed. + test_statistics_event_receiver(Pid) -> receive Foo -> Pid ! Foo, test_statistics_event_receiver(Pid) diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 457b1567..b7b1635b 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -43,6 +43,7 @@ -rabbit_upgrade({sync_slave_pids, mnesia, [policy]}). -rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}). -rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}). +-rabbit_upgrade({exchange_decorators, mnesia, [policy]}). %% ------------------------------------------------------------------- @@ -68,6 +69,7 @@ -spec(sync_slave_pids/0 :: () -> 'ok'). -spec(no_mirror_nodes/0 :: () -> 'ok'). -spec(gm_pids/0 :: () -> 'ok'). +-spec(exchange_decorators/0 :: () -> 'ok'). -endif. @@ -282,6 +284,20 @@ gm_pids() -> || T <- Tables], ok. +exchange_decorators() -> + ok = exchange_decorators(rabbit_exchange), + ok = exchange_decorators(rabbit_durable_exchange). + +exchange_decorators(Table) -> + transform( + Table, + fun ({exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches, + Policy}) -> + {exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches, Policy, + {[], []}} + end, + [name, type, durable, auto_delete, internal, arguments, scratches, policy, + decorators]). %%-------------------------------------------------------------------- |