diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-04-18 17:33:09 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-04-18 17:33:09 +0100 |
commit | 831e26f3c6b2f4ce762f696db0f1bbba03bf3747 (patch) | |
tree | 00819116d338e3c3013818f88e2cedb590516e22 | |
parent | 10557c85d7635e923511f5be47058637a5693a32 (diff) | |
parent | e4f0f5bbd1d66af7f3edfcac894202e4c9cb9aba (diff) | |
download | rabbitmq-server-831e26f3c6b2f4ce762f696db0f1bbba03bf3747.tar.gz |
Merge bug 25517.
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/control | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 130 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 61 | ||||
-rw-r--r-- | src/rabbit_exchange_decorator.erl | 41 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 11 | ||||
-rw-r--r-- | src/rabbit_policy.erl | 14 | ||||
-rw-r--r-- | src/rabbit_ssl.erl | 29 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 5 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 16 |
10 files changed, 154 insertions, 157 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index eeee799e..4282755d 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -40,7 +40,7 @@ -record(resource, {virtual_host, kind, name}). -record(exchange, {name, type, durable, auto_delete, internal, arguments, - scratches, policy}). + scratches, policy, decorators}). -record(exchange_serial, {name, next}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index d4526d87..3a15c4b6 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -9,7 +9,7 @@ Standards-Version: 3.9.2 Package: rabbitmq-server Architecture: all -Depends: erlang-nox (>= 1:12.b.3), adduser, logrotate, ${misc:Depends} +Depends: erlang-nox (>= 1:12.b.3) | esl-erlang, adduser, logrotate, ${misc:Depends} Description: AMQP server written in Erlang RabbitMQ is an implementation of AMQP, the emerging standard for high performance enterprise messaging. The RabbitMQ server is a robust and diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b016c4d2..3712a625 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -49,10 +49,6 @@ ttl_timer_ref, ttl_timer_expiry, senders, - publish_seqno, - unconfirmed, - delayed_stop, - queue_monitors, dlx, dlx_routing_key, max_length, @@ -151,9 +147,6 @@ init_state(Q) -> has_had_consumers = false, active_consumers = queue:new(), senders = pmon:new(), - publish_seqno = 1, - unconfirmed = dtree:empty(), - queue_monitors = pmon:new(), msg_id_to_channel = gb_trees:empty(), status = running}, rabbit_event:init_stats_timer(State, #q.stats_timer). @@ -820,80 +813,31 @@ dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) -> State1. dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK, - publish_seqno = SeqNo0, - unconfirmed = UC0, - queue_monitors = QMons0, backing_queue_state = BQS, backing_queue = BQ}) -> QName = qname(State), - {Res, {AckImm1, SeqNo1, UC1, QMons1}, BQS1} = - Fun(fun (Msg, AckTag, {AckImm, SeqNo, UC, QMons}) -> - case dead_letter_publish(Msg, Reason, - X, RK, SeqNo, QName) of - [] -> {[AckTag | AckImm], SeqNo, UC, QMons}; - QPids -> {AckImm, SeqNo + 1, - dtree:insert(SeqNo, QPids, AckTag, UC), - pmon:monitor_all(QPids, QMons)} - end - end, {[], SeqNo0, UC0, QMons0}, BQS), - {_Guids, BQS2} = BQ:ack(AckImm1, BQS1), - {Res, State#q{publish_seqno = SeqNo1, - unconfirmed = UC1, - queue_monitors = QMons1, - backing_queue_state = BQS2}}. - -dead_letter_publish(Msg, Reason, X, RK, MsgSeqNo, QName) -> + {Res, Acks1, BQS1} = + Fun(fun (Msg, AckTag, Acks) -> + dead_letter_publish(Msg, Reason, X, RK, QName), + [AckTag | Acks] + end, [], BQS), + {_Guids, BQS2} = BQ:ack(Acks1, BQS1), + {Res, State#q{backing_queue_state = BQS2}}. + +dead_letter_publish(Msg, Reason, X, RK, QName) -> DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName), - Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo), + Delivery = rabbit_basic:delivery(false, DLMsg, undefined), {Queues, Cycles} = detect_dead_letter_cycles( DLMsg, rabbit_exchange:route(X, Delivery)), lists:foreach(fun log_cycle_once/1, Cycles), - {_, DeliveredQPids} = rabbit_amqqueue:deliver( - rabbit_amqqueue:lookup(Queues), Delivery), - DeliveredQPids. - -handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, - unconfirmed = UC}) -> - case pmon:is_monitored(QPid, QMons) of - false -> noreply(State); - true -> case rabbit_misc:is_abnormal_exit(Reason) of - true -> {Lost, _UC1} = dtree:take_all(QPid, UC), - QNameS = rabbit_misc:rs(qname(State)), - rabbit_log:warning("DLQ ~p for ~s died with " - "~p unconfirmed messages~n", - [QPid, QNameS, length(Lost)]); - false -> ok - end, - {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC), - cleanup_after_confirm( - [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], - State#q{queue_monitors = pmon:erase(QPid, QMons), - unconfirmed = UC1}) - end. + rabbit_amqqueue:deliver( rabbit_amqqueue:lookup(Queues), Delivery), + ok. -stop(State) -> stop(undefined, noreply, State). +stop(State) -> stop(noreply, State). -stop(From, Reply, State = #q{unconfirmed = UC}) -> - case {dtree:is_empty(UC), Reply} of - {true, noreply} -> {stop, normal, State}; - {true, _} -> {stop, normal, Reply, State}; - {false, _} -> noreply(State#q{delayed_stop = {From, Reply}}) - end. +stop(noreply, State) -> {stop, normal, State}; +stop(Reply, State) -> {stop, normal, Reply, State}. -cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, - unconfirmed = UC, - backing_queue = BQ, - backing_queue_state = BQS}) -> - {_Guids, BQS1} = BQ:ack(AckTags, BQS), - State1 = State#q{backing_queue_state = BQS1}, - case dtree:is_empty(UC) andalso DS =/= undefined of - true -> case DS of - {_, noreply} -> ok; - {From, Reply} -> gen_server2:reply(From, Reply) - end, - {stop, normal, State1}; - false -> noreply(State1) - end. detect_dead_letter_cycles(#basic_message{content = Content}, Queues) -> #content{properties = #'P_basic'{headers = Headers}} = @@ -1073,9 +1017,6 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> _ -> 0 end. -handle_call(_, _, State = #q{delayed_stop = DS}) when DS =/= undefined -> - noreply(State); - handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> declare(Recover, From, State); @@ -1115,16 +1056,15 @@ handle_call({deliver, Delivery, Delivered}, From, State) -> gen_server2:reply(From, ok), noreply(deliver_or_enqueue(Delivery, Delivered, State)); -handle_call({notify_down, ChPid}, From, State) -> +handle_call({notify_down, ChPid}, _From, State) -> %% we want to do this synchronously, so that auto_deleted queues %% are no longer visible by the time we send a response to the %% client. The queue is ultimately deleted in terminate/2; if we %% return stop with a reply, terminate/2 will be called by - %% gen_server2 *before* the reply is sent. FIXME: in case of a - %% delayed stop the reply is sent earlier. + %% gen_server2 *before* the reply is sent. case handle_ch_down(ChPid, State) of {ok, State1} -> reply(ok, State1); - {stop, State1} -> stop(From, ok, State1) + {stop, State1} -> stop(ok, State1) end; handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, @@ -1186,7 +1126,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, reply(ok, run_message_queue(State1#q{active_consumers = AC1})) end; -handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, +handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, State = #q{exclusive_consumer = Holder}) -> ok = maybe_send_reply(ChPid, OkMsg), case lookup_ch(ChPid) of @@ -1215,7 +1155,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, State#q.active_consumers)}, case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); - true -> stop(From, ok, State1) + true -> stop(ok, State1) end end; @@ -1224,14 +1164,14 @@ handle_call(stat, _From, State) -> ensure_expiry_timer(State), reply({ok, BQ:len(BQS), consumer_count()}, State1); -handle_call({delete, IfUnused, IfEmpty}, From, +handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> IsEmpty = BQ:is_empty(BQS), IsUnused = is_unused(State), if IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State); IfUnused and not(IsUnused) -> reply({error, in_use}, State); - true -> stop(From, {ok, BQ:len(BQS)}, State) + true -> stop({ok, BQ:len(BQS)}, State) end; handle_call(purge, _From, State = #q{backing_queue = BQ, @@ -1286,19 +1226,6 @@ handle_call(force_event_refresh, _From, end, reply(ok, State). -handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) -> - {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC), - State1 = case dtree:is_defined(QPid, UC1) of - false -> QMons = State#q.queue_monitors, - State#q{queue_monitors = pmon:demonitor(QPid, QMons)}; - true -> State - end, - cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], - State1#q{unconfirmed = UC1}); - -handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> - noreply(State); - handle_cast({run_backing_queue, Mod, Fun}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}); @@ -1405,15 +1332,6 @@ handle_cast({credit, ChPid, CTag, Credit, Drain}, handle_cast(wake_up, State) -> noreply(State). -%% We need to not ignore this as we need to remove outstanding -%% confirms due to queue death. -handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, - State = #q{delayed_stop = DS}) when DS =/= undefined -> - handle_queue_down(DownPid, Reason, State); - -handle_info(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> - noreply(State); - handle_info(maybe_expire, State) -> case is_unused(State) of true -> stop(State); @@ -1442,9 +1360,9 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, %% unexpectedly. stop(State); -handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, State) -> +handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> case handle_ch_down(DownPid, State) of - {ok, State1} -> handle_queue_down(DownPid, Reason, State1); + {ok, State1} -> noreply(State1); {stop, State1} -> stop(State1) end; diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 9e98448d..b4bdd348 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -68,7 +68,8 @@ -spec(update_scratch/3 :: (name(), atom(), fun((any()) -> any())) -> 'ok'). -spec(update/2 :: (name(), - fun((rabbit_types:exchange()) -> rabbit_types:exchange())) -> 'ok'). + fun((rabbit_types:exchange()) -> rabbit_types:exchange())) + -> not_found | rabbit_types:exchange()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()). -spec(info/2 :: @@ -113,25 +114,39 @@ recover() -> callback(X, create, map_create_tx(Tx), [X]) end, rabbit_durable_exchange), + report_missing_decorators(Xs), [XName || #exchange{name = XName} <- Xs]. -callback(X = #exchange{type = XType}, Fun, Serial0, Args) -> +report_missing_decorators(Xs) -> + Mods = lists:usort(lists:append([rabbit_exchange_decorator:select(raw, D) || + #exchange{decorators = D} <- Xs])), + case [M || M <- Mods, code:which(M) =:= non_existing] of + [] -> ok; + M -> rabbit_log:warning("Missing exchange decorators: ~p~n", [M]) + end. + +callback(X = #exchange{type = XType, + decorators = Decorators}, Fun, Serial0, Args) -> Serial = if is_function(Serial0) -> Serial0; is_atom(Serial0) -> fun (_Bool) -> Serial0 end end, [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) || - M <- registry_lookup(exchange_decorator)], + M <- rabbit_exchange_decorator:select(all, Decorators)], Module = type_to_module(XType), apply(Module, Fun, [Serial(Module:serialise_events()) | Args]). -policy_changed(X = #exchange{type = XType}, X1) -> - [ok = M:policy_changed(X, X1) || - M <- [type_to_module(XType) | registry_lookup(exchange_decorator)]], +policy_changed(X = #exchange{type = XType, + decorators = Decorators}, + X1 = #exchange{decorators = Decorators1}) -> + D = rabbit_exchange_decorator:select(all, Decorators), + D1 = rabbit_exchange_decorator:select(all, Decorators1), + DAll = lists:usort(D ++ D1), + [ok = M:policy_changed(X, X1) || M <- [type_to_module(XType) | DAll]], ok. -serialise_events(X = #exchange{type = Type}) -> +serialise_events(X = #exchange{type = Type, decorators = Decorators}) -> lists:any(fun (M) -> M:serialise_events(X) end, - registry_lookup(exchange_decorator)) + rabbit_exchange_decorator:select(all, Decorators)) orelse (type_to_module(Type)):serialise_events(). serial(#exchange{name = XName} = X) -> @@ -143,16 +158,6 @@ serial(#exchange{name = XName} = X) -> (false) -> none end. -registry_lookup(exchange_decorator_route = Class) -> - case get(exchange_decorator_route_modules) of - undefined -> Mods = [M || {_, M} <- rabbit_registry:lookup_all(Class)], - put(exchange_decorator_route_modules, Mods), - Mods; - Mods -> Mods - end; -registry_lookup(Class) -> - [M || {_, M} <- rabbit_registry:lookup_all(Class)]. - declare(XName, Type, Durable, AutoDelete, Internal, Args) -> X = rabbit_policy:set(#exchange{name = XName, type = Type, @@ -273,7 +278,8 @@ update_scratch(Name, App, Fun) -> Scratches2 = orddict:store( App, Fun(Scratch), Scratches1), X#exchange{scratches = Scratches2} - end) + end), + ok end). update(Name, Fun) -> @@ -284,9 +290,10 @@ update(Name, Fun) -> case Durable of true -> ok = mnesia:write(rabbit_durable_exchange, X1, write); _ -> ok - end; + end, + X1; [] -> - ok + not_found end. info_keys() -> ?INFO_KEYS. @@ -318,15 +325,15 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). -route(#exchange{name = #resource{virtual_host = VHost, - name = RName} = XName} = X, +route(#exchange{name = #resource{virtual_host = VHost, name = RName} = XName, + decorators = Decorators} = X, #delivery{message = #basic_message{routing_keys = RKs}} = Delivery) -> - case {registry_lookup(exchange_decorator_route), RName == <<"">>} of - {[], true} -> + case {RName, rabbit_exchange_decorator:select(route, Decorators)} of + {<<"">>, []} -> %% Optimisation [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)]; - {Decorators, _} -> - lists:usort(route1(Delivery, Decorators, {[X], XName, []})) + {_, SelectedDecorators} -> + lists:usort(route1(Delivery, SelectedDecorators, {[X], XName, []})) end. route1(_, _, {[], _, QNames}) -> diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl index 040b55db..3abaa48c 100644 --- a/src/rabbit_exchange_decorator.erl +++ b/src/rabbit_exchange_decorator.erl @@ -16,6 +16,10 @@ -module(rabbit_exchange_decorator). +-include("rabbit.hrl"). + +-export([select/2, set/1]). + %% This is like an exchange type except that: %% %% 1) It applies to all exchanges as soon as it is installed, therefore @@ -57,10 +61,13 @@ -callback remove_bindings(serial(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'. -%% Decorators can optionally implement route/2 which allows additional -%% destinations to be added to the routing decision. -%% -callback route(rabbit_types:exchange(), rabbit_types:delivery()) -> -%% [rabbit_amqqueue:name() | rabbit_exchange:name()]. +%% Allows additional destinations to be added to the routing decision. +-callback route(rabbit_types:exchange(), rabbit_types:delivery()) -> + [rabbit_amqqueue:name() | rabbit_exchange:name()]. + +%% Whether the decorator wishes to receive callbacks for the exchange +%% none:no callbacks, noroute:all callbacks except route, all:all callbacks +-callback active_for(rabbit_types:exchange()) -> 'none' | 'noroute' | 'all'. -else. @@ -68,8 +75,32 @@ behaviour_info(callbacks) -> [{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3}, - {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}]; + {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}, + {route, 2}, {active_for, 1}]; behaviour_info(_Other) -> undefined. -endif. + +%%---------------------------------------------------------------------------- + +%% select a subset of active decorators +select(all, {Route, NoRoute}) -> filter(Route ++ NoRoute); +select(route, {Route, _NoRoute}) -> filter(Route); +select(raw, {Route, NoRoute}) -> Route ++ NoRoute. + +filter(Modules) -> + [M || M <- Modules, code:which(M) =/= non_existing]. + +set(X) -> + Decs = lists:foldl(fun (D, {Route, NoRoute}) -> + ActiveFor = D:active_for(X), + {cons_if_eq(all, ActiveFor, D, Route), + cons_if_eq(noroute, ActiveFor, D, NoRoute)} + end, {[], []}, list()), + X#exchange{decorators = Decs}. + +list() -> [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)]. + +cons_if_eq(Select, Select, Item, List) -> [Item | List]; +cons_if_eq(_Select, _Other, _Item, List) -> List. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 2344b1b2..c63321b5 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -51,6 +51,9 @@ -define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB + %% i.e. two pairs, so GC does not go idle when busy +-define(MAXIMUM_SIMULTANEOUS_GC_FILES, 4). + %%---------------------------------------------------------------------------- -record(msstate, @@ -1728,10 +1731,12 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION -> %% TODO: the algorithm here is sub-optimal - it may result in a %% complete traversal of FileSummaryEts. - case ets:first(FileSummaryEts) of - '$end_of_table' -> + First = ets:first(FileSummaryEts), + case First =:= '$end_of_table' orelse + orddict:size(Pending) >= ?MAXIMUM_SIMULTANEOUS_GC_FILES of + true -> State; - First -> + false -> case find_files_to_combine(FileSummaryEts, FileSizeLimit, ets:lookup(FileSummaryEts, First)) of not_found -> diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 7398cd2d..0990c662 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -46,7 +46,8 @@ name0(undefined) -> none; name0(Policy) -> pget(name, Policy). set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)}; -set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}. +set(X = #exchange{name = Name}) -> rabbit_exchange_decorator:set( + X#exchange{policy = set0(Name)}). set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)). @@ -170,9 +171,14 @@ update_policies(VHost) -> update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) -> case match(XName, Policies) of OldPolicy -> no_change; - NewPolicy -> rabbit_exchange:update( - XName, fun(X1) -> X1#exchange{policy = NewPolicy} end), - {X, X#exchange{policy = NewPolicy}} + NewPolicy -> case rabbit_exchange:update( + XName, fun (X0) -> + rabbit_exchange_decorator:set( + X0 #exchange{policy = NewPolicy}) + end) of + #exchange{} = X1 -> {X, X1}; + not_found -> {X, X } + end end. update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) -> diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl index b1238623..96277b68 100644 --- a/src/rabbit_ssl.erl +++ b/src/rabbit_ssl.erl @@ -162,15 +162,16 @@ format_rdn(#'AttributeTypeAndValue'{type = T, value = V}) -> {?'id-at-pseudonym' , "PSEUDONYM"}, {?'id-domainComponent' , "DC"}, {?'id-emailAddress' , "EMAILADDRESS"}, - {?'street-address' , "STREET"}], + {?'street-address' , "STREET"}, + {{0,9,2342,19200300,100,1,1} , "UID"}], %% Not in public_key.hrl case proplists:lookup(T, Fmts) of {_, Fmt} -> - io_lib:format(Fmt ++ "=~s", [FV]); + rabbit_misc:format(Fmt ++ "=~s", [FV]); none when is_tuple(T) -> - TypeL = [io_lib:format("~w", [X]) || X <- tuple_to_list(T)], - io_lib:format("~s:~s", [string:join(TypeL, "."), FV]); + TypeL = [rabbit_misc:format("~w", [X]) || X <- tuple_to_list(T)], + rabbit_misc:format("~s=~s", [string:join(TypeL, "."), FV]); none -> - io_lib:format("~p:~s", [T, FV]) + rabbit_misc:format("~p=~s", [T, FV]) end. %% Escape a string as per RFC4514. @@ -204,14 +205,26 @@ format_asn1_value({ST, S}) when ST =:= teletexString; ST =:= printableString; format_directory_string(ST, S); format_asn1_value({utcTime, [Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2, $Z]}) -> - io_lib:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ", - [Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]); + rabbit_misc:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ", + [Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]); %% We appear to get an untagged value back for an ia5string %% (e.g. domainComponent). format_asn1_value(V) when is_list(V) -> V; +format_asn1_value(V) when is_binary(V) -> + %% OTP does not decode some values when combined with an unknown + %% type. That's probably wrong, so as a last ditch effort let's + %% try manually decoding. 'DirectoryString' is semi-arbitrary - + %% but it is the type which covers the various string types we + %% handle below. + try + {ST, S} = public_key:der_decode('DirectoryString', V), + format_directory_string(ST, S) + catch _:_ -> + rabbit_misc:format("~p", [V]) + end; format_asn1_value(V) -> - io_lib:format("~p", [V]). + rabbit_misc:format("~p", [V]). %% DirectoryString { INTEGER : maxSize } ::= CHOICE { %% teletexString TeletexString (SIZE (1..maxSize)), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index e7b69879..27b588d1 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -563,8 +563,9 @@ test_topic_matching() -> XName = #resource{virtual_host = <<"/">>, kind = exchange, name = <<"test_exchange">>}, - X = #exchange{name = XName, type = topic, durable = false, - auto_delete = false, arguments = []}, + X0 = #exchange{name = XName, type = topic, durable = false, + auto_delete = false, arguments = []}, + X = rabbit_exchange_decorator:set(X0), %% create rabbit_exchange_type_topic:validate(X), exchange_op_callback(X, create, []), diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 457b1567..b7b1635b 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -43,6 +43,7 @@ -rabbit_upgrade({sync_slave_pids, mnesia, [policy]}). -rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}). -rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}). +-rabbit_upgrade({exchange_decorators, mnesia, [policy]}). %% ------------------------------------------------------------------- @@ -68,6 +69,7 @@ -spec(sync_slave_pids/0 :: () -> 'ok'). -spec(no_mirror_nodes/0 :: () -> 'ok'). -spec(gm_pids/0 :: () -> 'ok'). +-spec(exchange_decorators/0 :: () -> 'ok'). -endif. @@ -282,6 +284,20 @@ gm_pids() -> || T <- Tables], ok. +exchange_decorators() -> + ok = exchange_decorators(rabbit_exchange), + ok = exchange_decorators(rabbit_durable_exchange). + +exchange_decorators(Table) -> + transform( + Table, + fun ({exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches, + Policy}) -> + {exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches, Policy, + {[], []}} + end, + [name, type, durable, auto_delete, internal, arguments, scratches, policy, + decorators]). %%-------------------------------------------------------------------- |