summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-04-18 17:33:09 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-04-18 17:33:09 +0100
commit831e26f3c6b2f4ce762f696db0f1bbba03bf3747 (patch)
tree00819116d338e3c3013818f88e2cedb590516e22
parent10557c85d7635e923511f5be47058637a5693a32 (diff)
parente4f0f5bbd1d66af7f3edfcac894202e4c9cb9aba (diff)
downloadrabbitmq-server-831e26f3c6b2f4ce762f696db0f1bbba03bf3747.tar.gz
Merge bug 25517.
-rw-r--r--include/rabbit.hrl2
-rw-r--r--packaging/debs/Debian/debian/control2
-rw-r--r--src/rabbit_amqqueue_process.erl130
-rw-r--r--src/rabbit_exchange.erl61
-rw-r--r--src/rabbit_exchange_decorator.erl41
-rw-r--r--src/rabbit_msg_store.erl11
-rw-r--r--src/rabbit_policy.erl14
-rw-r--r--src/rabbit_ssl.erl29
-rw-r--r--src/rabbit_tests.erl5
-rw-r--r--src/rabbit_upgrade_functions.erl16
10 files changed, 154 insertions, 157 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index eeee799e..4282755d 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -40,7 +40,7 @@
-record(resource, {virtual_host, kind, name}).
-record(exchange, {name, type, durable, auto_delete, internal, arguments,
- scratches, policy}).
+ scratches, policy, decorators}).
-record(exchange_serial, {name, next}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index d4526d87..3a15c4b6 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -9,7 +9,7 @@ Standards-Version: 3.9.2
Package: rabbitmq-server
Architecture: all
-Depends: erlang-nox (>= 1:12.b.3), adduser, logrotate, ${misc:Depends}
+Depends: erlang-nox (>= 1:12.b.3) | esl-erlang, adduser, logrotate, ${misc:Depends}
Description: AMQP server written in Erlang
RabbitMQ is an implementation of AMQP, the emerging standard for high
performance enterprise messaging. The RabbitMQ server is a robust and
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b016c4d2..3712a625 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -49,10 +49,6 @@
ttl_timer_ref,
ttl_timer_expiry,
senders,
- publish_seqno,
- unconfirmed,
- delayed_stop,
- queue_monitors,
dlx,
dlx_routing_key,
max_length,
@@ -151,9 +147,6 @@ init_state(Q) ->
has_had_consumers = false,
active_consumers = queue:new(),
senders = pmon:new(),
- publish_seqno = 1,
- unconfirmed = dtree:empty(),
- queue_monitors = pmon:new(),
msg_id_to_channel = gb_trees:empty(),
status = running},
rabbit_event:init_stats_timer(State, #q.stats_timer).
@@ -820,80 +813,31 @@ dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) ->
State1.
dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
- publish_seqno = SeqNo0,
- unconfirmed = UC0,
- queue_monitors = QMons0,
backing_queue_state = BQS,
backing_queue = BQ}) ->
QName = qname(State),
- {Res, {AckImm1, SeqNo1, UC1, QMons1}, BQS1} =
- Fun(fun (Msg, AckTag, {AckImm, SeqNo, UC, QMons}) ->
- case dead_letter_publish(Msg, Reason,
- X, RK, SeqNo, QName) of
- [] -> {[AckTag | AckImm], SeqNo, UC, QMons};
- QPids -> {AckImm, SeqNo + 1,
- dtree:insert(SeqNo, QPids, AckTag, UC),
- pmon:monitor_all(QPids, QMons)}
- end
- end, {[], SeqNo0, UC0, QMons0}, BQS),
- {_Guids, BQS2} = BQ:ack(AckImm1, BQS1),
- {Res, State#q{publish_seqno = SeqNo1,
- unconfirmed = UC1,
- queue_monitors = QMons1,
- backing_queue_state = BQS2}}.
-
-dead_letter_publish(Msg, Reason, X, RK, MsgSeqNo, QName) ->
+ {Res, Acks1, BQS1} =
+ Fun(fun (Msg, AckTag, Acks) ->
+ dead_letter_publish(Msg, Reason, X, RK, QName),
+ [AckTag | Acks]
+ end, [], BQS),
+ {_Guids, BQS2} = BQ:ack(Acks1, BQS1),
+ {Res, State#q{backing_queue_state = BQS2}}.
+
+dead_letter_publish(Msg, Reason, X, RK, QName) ->
DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName),
- Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo),
+ Delivery = rabbit_basic:delivery(false, DLMsg, undefined),
{Queues, Cycles} = detect_dead_letter_cycles(
DLMsg, rabbit_exchange:route(X, Delivery)),
lists:foreach(fun log_cycle_once/1, Cycles),
- {_, DeliveredQPids} = rabbit_amqqueue:deliver(
- rabbit_amqqueue:lookup(Queues), Delivery),
- DeliveredQPids.
-
-handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
- unconfirmed = UC}) ->
- case pmon:is_monitored(QPid, QMons) of
- false -> noreply(State);
- true -> case rabbit_misc:is_abnormal_exit(Reason) of
- true -> {Lost, _UC1} = dtree:take_all(QPid, UC),
- QNameS = rabbit_misc:rs(qname(State)),
- rabbit_log:warning("DLQ ~p for ~s died with "
- "~p unconfirmed messages~n",
- [QPid, QNameS, length(Lost)]);
- false -> ok
- end,
- {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC),
- cleanup_after_confirm(
- [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
- State#q{queue_monitors = pmon:erase(QPid, QMons),
- unconfirmed = UC1})
- end.
+ rabbit_amqqueue:deliver( rabbit_amqqueue:lookup(Queues), Delivery),
+ ok.
-stop(State) -> stop(undefined, noreply, State).
+stop(State) -> stop(noreply, State).
-stop(From, Reply, State = #q{unconfirmed = UC}) ->
- case {dtree:is_empty(UC), Reply} of
- {true, noreply} -> {stop, normal, State};
- {true, _} -> {stop, normal, Reply, State};
- {false, _} -> noreply(State#q{delayed_stop = {From, Reply}})
- end.
+stop(noreply, State) -> {stop, normal, State};
+stop(Reply, State) -> {stop, normal, Reply, State}.
-cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
- unconfirmed = UC,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {_Guids, BQS1} = BQ:ack(AckTags, BQS),
- State1 = State#q{backing_queue_state = BQS1},
- case dtree:is_empty(UC) andalso DS =/= undefined of
- true -> case DS of
- {_, noreply} -> ok;
- {From, Reply} -> gen_server2:reply(From, Reply)
- end,
- {stop, normal, State1};
- false -> noreply(State1)
- end.
detect_dead_letter_cycles(#basic_message{content = Content}, Queues) ->
#content{properties = #'P_basic'{headers = Headers}} =
@@ -1073,9 +1017,6 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
_ -> 0
end.
-handle_call(_, _, State = #q{delayed_stop = DS}) when DS =/= undefined ->
- noreply(State);
-
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = none}}) ->
declare(Recover, From, State);
@@ -1115,16 +1056,15 @@ handle_call({deliver, Delivery, Delivered}, From, State) ->
gen_server2:reply(From, ok),
noreply(deliver_or_enqueue(Delivery, Delivered, State));
-handle_call({notify_down, ChPid}, From, State) ->
+handle_call({notify_down, ChPid}, _From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
%% are no longer visible by the time we send a response to the
%% client. The queue is ultimately deleted in terminate/2; if we
%% return stop with a reply, terminate/2 will be called by
- %% gen_server2 *before* the reply is sent. FIXME: in case of a
- %% delayed stop the reply is sent earlier.
+ %% gen_server2 *before* the reply is sent.
case handle_ch_down(ChPid, State) of
{ok, State1} -> reply(ok, State1);
- {stop, State1} -> stop(From, ok, State1)
+ {stop, State1} -> stop(ok, State1)
end;
handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
@@ -1186,7 +1126,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
reply(ok, run_message_queue(State1#q{active_consumers = AC1}))
end;
-handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
+handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
State = #q{exclusive_consumer = Holder}) ->
ok = maybe_send_reply(ChPid, OkMsg),
case lookup_ch(ChPid) of
@@ -1215,7 +1155,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
State#q.active_consumers)},
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
- true -> stop(From, ok, State1)
+ true -> stop(ok, State1)
end
end;
@@ -1224,14 +1164,14 @@ handle_call(stat, _From, State) ->
ensure_expiry_timer(State),
reply({ok, BQ:len(BQS), consumer_count()}, State1);
-handle_call({delete, IfUnused, IfEmpty}, From,
+handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
IsEmpty = BQ:is_empty(BQS),
IsUnused = is_unused(State),
if
IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
IfUnused and not(IsUnused) -> reply({error, in_use}, State);
- true -> stop(From, {ok, BQ:len(BQS)}, State)
+ true -> stop({ok, BQ:len(BQS)}, State)
end;
handle_call(purge, _From, State = #q{backing_queue = BQ,
@@ -1286,19 +1226,6 @@ handle_call(force_event_refresh, _From,
end,
reply(ok, State).
-handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) ->
- {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC),
- State1 = case dtree:is_defined(QPid, UC1) of
- false -> QMons = State#q.queue_monitors,
- State#q{queue_monitors = pmon:demonitor(QPid, QMons)};
- true -> State
- end,
- cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
- State1#q{unconfirmed = UC1});
-
-handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
- noreply(State);
-
handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)});
@@ -1405,15 +1332,6 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
handle_cast(wake_up, State) ->
noreply(State).
-%% We need to not ignore this as we need to remove outstanding
-%% confirms due to queue death.
-handle_info({'DOWN', _MonitorRef, process, DownPid, Reason},
- State = #q{delayed_stop = DS}) when DS =/= undefined ->
- handle_queue_down(DownPid, Reason, State);
-
-handle_info(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
- noreply(State);
-
handle_info(maybe_expire, State) ->
case is_unused(State) of
true -> stop(State);
@@ -1442,9 +1360,9 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
%% unexpectedly.
stop(State);
-handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, State) ->
+handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
case handle_ch_down(DownPid, State) of
- {ok, State1} -> handle_queue_down(DownPid, Reason, State1);
+ {ok, State1} -> noreply(State1);
{stop, State1} -> stop(State1)
end;
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 9e98448d..b4bdd348 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -68,7 +68,8 @@
-spec(update_scratch/3 :: (name(), atom(), fun((any()) -> any())) -> 'ok').
-spec(update/2 ::
(name(),
- fun((rabbit_types:exchange()) -> rabbit_types:exchange())) -> 'ok').
+ fun((rabbit_types:exchange()) -> rabbit_types:exchange()))
+ -> not_found | rabbit_types:exchange()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()).
-spec(info/2 ::
@@ -113,25 +114,39 @@ recover() ->
callback(X, create, map_create_tx(Tx), [X])
end,
rabbit_durable_exchange),
+ report_missing_decorators(Xs),
[XName || #exchange{name = XName} <- Xs].
-callback(X = #exchange{type = XType}, Fun, Serial0, Args) ->
+report_missing_decorators(Xs) ->
+ Mods = lists:usort(lists:append([rabbit_exchange_decorator:select(raw, D) ||
+ #exchange{decorators = D} <- Xs])),
+ case [M || M <- Mods, code:which(M) =:= non_existing] of
+ [] -> ok;
+ M -> rabbit_log:warning("Missing exchange decorators: ~p~n", [M])
+ end.
+
+callback(X = #exchange{type = XType,
+ decorators = Decorators}, Fun, Serial0, Args) ->
Serial = if is_function(Serial0) -> Serial0;
is_atom(Serial0) -> fun (_Bool) -> Serial0 end
end,
[ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) ||
- M <- registry_lookup(exchange_decorator)],
+ M <- rabbit_exchange_decorator:select(all, Decorators)],
Module = type_to_module(XType),
apply(Module, Fun, [Serial(Module:serialise_events()) | Args]).
-policy_changed(X = #exchange{type = XType}, X1) ->
- [ok = M:policy_changed(X, X1) ||
- M <- [type_to_module(XType) | registry_lookup(exchange_decorator)]],
+policy_changed(X = #exchange{type = XType,
+ decorators = Decorators},
+ X1 = #exchange{decorators = Decorators1}) ->
+ D = rabbit_exchange_decorator:select(all, Decorators),
+ D1 = rabbit_exchange_decorator:select(all, Decorators1),
+ DAll = lists:usort(D ++ D1),
+ [ok = M:policy_changed(X, X1) || M <- [type_to_module(XType) | DAll]],
ok.
-serialise_events(X = #exchange{type = Type}) ->
+serialise_events(X = #exchange{type = Type, decorators = Decorators}) ->
lists:any(fun (M) -> M:serialise_events(X) end,
- registry_lookup(exchange_decorator))
+ rabbit_exchange_decorator:select(all, Decorators))
orelse (type_to_module(Type)):serialise_events().
serial(#exchange{name = XName} = X) ->
@@ -143,16 +158,6 @@ serial(#exchange{name = XName} = X) ->
(false) -> none
end.
-registry_lookup(exchange_decorator_route = Class) ->
- case get(exchange_decorator_route_modules) of
- undefined -> Mods = [M || {_, M} <- rabbit_registry:lookup_all(Class)],
- put(exchange_decorator_route_modules, Mods),
- Mods;
- Mods -> Mods
- end;
-registry_lookup(Class) ->
- [M || {_, M} <- rabbit_registry:lookup_all(Class)].
-
declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
X = rabbit_policy:set(#exchange{name = XName,
type = Type,
@@ -273,7 +278,8 @@ update_scratch(Name, App, Fun) ->
Scratches2 = orddict:store(
App, Fun(Scratch), Scratches1),
X#exchange{scratches = Scratches2}
- end)
+ end),
+ ok
end).
update(Name, Fun) ->
@@ -284,9 +290,10 @@ update(Name, Fun) ->
case Durable of
true -> ok = mnesia:write(rabbit_durable_exchange, X1, write);
_ -> ok
- end;
+ end,
+ X1;
[] ->
- ok
+ not_found
end.
info_keys() -> ?INFO_KEYS.
@@ -318,15 +325,15 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
-route(#exchange{name = #resource{virtual_host = VHost,
- name = RName} = XName} = X,
+route(#exchange{name = #resource{virtual_host = VHost, name = RName} = XName,
+ decorators = Decorators} = X,
#delivery{message = #basic_message{routing_keys = RKs}} = Delivery) ->
- case {registry_lookup(exchange_decorator_route), RName == <<"">>} of
- {[], true} ->
+ case {RName, rabbit_exchange_decorator:select(route, Decorators)} of
+ {<<"">>, []} ->
%% Optimisation
[rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)];
- {Decorators, _} ->
- lists:usort(route1(Delivery, Decorators, {[X], XName, []}))
+ {_, SelectedDecorators} ->
+ lists:usort(route1(Delivery, SelectedDecorators, {[X], XName, []}))
end.
route1(_, _, {[], _, QNames}) ->
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
index 040b55db..3abaa48c 100644
--- a/src/rabbit_exchange_decorator.erl
+++ b/src/rabbit_exchange_decorator.erl
@@ -16,6 +16,10 @@
-module(rabbit_exchange_decorator).
+-include("rabbit.hrl").
+
+-export([select/2, set/1]).
+
%% This is like an exchange type except that:
%%
%% 1) It applies to all exchanges as soon as it is installed, therefore
@@ -57,10 +61,13 @@
-callback remove_bindings(serial(), rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok'.
-%% Decorators can optionally implement route/2 which allows additional
-%% destinations to be added to the routing decision.
-%% -callback route(rabbit_types:exchange(), rabbit_types:delivery()) ->
-%% [rabbit_amqqueue:name() | rabbit_exchange:name()].
+%% Allows additional destinations to be added to the routing decision.
+-callback route(rabbit_types:exchange(), rabbit_types:delivery()) ->
+ [rabbit_amqqueue:name() | rabbit_exchange:name()].
+
+%% Whether the decorator wishes to receive callbacks for the exchange
+%% none:no callbacks, noroute:all callbacks except route, all:all callbacks
+-callback active_for(rabbit_types:exchange()) -> 'none' | 'noroute' | 'all'.
-else.
@@ -68,8 +75,32 @@
behaviour_info(callbacks) ->
[{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3},
- {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}];
+ {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3},
+ {route, 2}, {active_for, 1}];
behaviour_info(_Other) ->
undefined.
-endif.
+
+%%----------------------------------------------------------------------------
+
+%% select a subset of active decorators
+select(all, {Route, NoRoute}) -> filter(Route ++ NoRoute);
+select(route, {Route, _NoRoute}) -> filter(Route);
+select(raw, {Route, NoRoute}) -> Route ++ NoRoute.
+
+filter(Modules) ->
+ [M || M <- Modules, code:which(M) =/= non_existing].
+
+set(X) ->
+ Decs = lists:foldl(fun (D, {Route, NoRoute}) ->
+ ActiveFor = D:active_for(X),
+ {cons_if_eq(all, ActiveFor, D, Route),
+ cons_if_eq(noroute, ActiveFor, D, NoRoute)}
+ end, {[], []}, list()),
+ X#exchange{decorators = Decs}.
+
+list() -> [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)].
+
+cons_if_eq(Select, Select, Item, List) -> [Item | List];
+cons_if_eq(_Select, _Other, _Item, List) -> List.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 2344b1b2..c63321b5 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -51,6 +51,9 @@
-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB
+ %% i.e. two pairs, so GC does not go idle when busy
+-define(MAXIMUM_SIMULTANEOUS_GC_FILES, 4).
+
%%----------------------------------------------------------------------------
-record(msstate,
@@ -1728,10 +1731,12 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
(SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION ->
%% TODO: the algorithm here is sub-optimal - it may result in a
%% complete traversal of FileSummaryEts.
- case ets:first(FileSummaryEts) of
- '$end_of_table' ->
+ First = ets:first(FileSummaryEts),
+ case First =:= '$end_of_table' orelse
+ orddict:size(Pending) >= ?MAXIMUM_SIMULTANEOUS_GC_FILES of
+ true ->
State;
- First ->
+ false ->
case find_files_to_combine(FileSummaryEts, FileSizeLimit,
ets:lookup(FileSummaryEts, First)) of
not_found ->
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 7398cd2d..0990c662 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -46,7 +46,8 @@ name0(undefined) -> none;
name0(Policy) -> pget(name, Policy).
set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)};
-set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}.
+set(X = #exchange{name = Name}) -> rabbit_exchange_decorator:set(
+ X#exchange{policy = set0(Name)}).
set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)).
@@ -170,9 +171,14 @@ update_policies(VHost) ->
update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) ->
case match(XName, Policies) of
OldPolicy -> no_change;
- NewPolicy -> rabbit_exchange:update(
- XName, fun(X1) -> X1#exchange{policy = NewPolicy} end),
- {X, X#exchange{policy = NewPolicy}}
+ NewPolicy -> case rabbit_exchange:update(
+ XName, fun (X0) ->
+ rabbit_exchange_decorator:set(
+ X0 #exchange{policy = NewPolicy})
+ end) of
+ #exchange{} = X1 -> {X, X1};
+ not_found -> {X, X }
+ end
end.
update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) ->
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index b1238623..96277b68 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -162,15 +162,16 @@ format_rdn(#'AttributeTypeAndValue'{type = T, value = V}) ->
{?'id-at-pseudonym' , "PSEUDONYM"},
{?'id-domainComponent' , "DC"},
{?'id-emailAddress' , "EMAILADDRESS"},
- {?'street-address' , "STREET"}],
+ {?'street-address' , "STREET"},
+ {{0,9,2342,19200300,100,1,1} , "UID"}], %% Not in public_key.hrl
case proplists:lookup(T, Fmts) of
{_, Fmt} ->
- io_lib:format(Fmt ++ "=~s", [FV]);
+ rabbit_misc:format(Fmt ++ "=~s", [FV]);
none when is_tuple(T) ->
- TypeL = [io_lib:format("~w", [X]) || X <- tuple_to_list(T)],
- io_lib:format("~s:~s", [string:join(TypeL, "."), FV]);
+ TypeL = [rabbit_misc:format("~w", [X]) || X <- tuple_to_list(T)],
+ rabbit_misc:format("~s=~s", [string:join(TypeL, "."), FV]);
none ->
- io_lib:format("~p:~s", [T, FV])
+ rabbit_misc:format("~p=~s", [T, FV])
end.
%% Escape a string as per RFC4514.
@@ -204,14 +205,26 @@ format_asn1_value({ST, S}) when ST =:= teletexString; ST =:= printableString;
format_directory_string(ST, S);
format_asn1_value({utcTime, [Y1, Y2, M1, M2, D1, D2, H1, H2,
Min1, Min2, S1, S2, $Z]}) ->
- io_lib:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ",
- [Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]);
+ rabbit_misc:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ",
+ [Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]);
%% We appear to get an untagged value back for an ia5string
%% (e.g. domainComponent).
format_asn1_value(V) when is_list(V) ->
V;
+format_asn1_value(V) when is_binary(V) ->
+ %% OTP does not decode some values when combined with an unknown
+ %% type. That's probably wrong, so as a last ditch effort let's
+ %% try manually decoding. 'DirectoryString' is semi-arbitrary -
+ %% but it is the type which covers the various string types we
+ %% handle below.
+ try
+ {ST, S} = public_key:der_decode('DirectoryString', V),
+ format_directory_string(ST, S)
+ catch _:_ ->
+ rabbit_misc:format("~p", [V])
+ end;
format_asn1_value(V) ->
- io_lib:format("~p", [V]).
+ rabbit_misc:format("~p", [V]).
%% DirectoryString { INTEGER : maxSize } ::= CHOICE {
%% teletexString TeletexString (SIZE (1..maxSize)),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index e7b69879..27b588d1 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -563,8 +563,9 @@ test_topic_matching() ->
XName = #resource{virtual_host = <<"/">>,
kind = exchange,
name = <<"test_exchange">>},
- X = #exchange{name = XName, type = topic, durable = false,
- auto_delete = false, arguments = []},
+ X0 = #exchange{name = XName, type = topic, durable = false,
+ auto_delete = false, arguments = []},
+ X = rabbit_exchange_decorator:set(X0),
%% create
rabbit_exchange_type_topic:validate(X),
exchange_op_callback(X, create, []),
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 457b1567..b7b1635b 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -43,6 +43,7 @@
-rabbit_upgrade({sync_slave_pids, mnesia, [policy]}).
-rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}).
-rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}).
+-rabbit_upgrade({exchange_decorators, mnesia, [policy]}).
%% -------------------------------------------------------------------
@@ -68,6 +69,7 @@
-spec(sync_slave_pids/0 :: () -> 'ok').
-spec(no_mirror_nodes/0 :: () -> 'ok').
-spec(gm_pids/0 :: () -> 'ok').
+-spec(exchange_decorators/0 :: () -> 'ok').
-endif.
@@ -282,6 +284,20 @@ gm_pids() ->
|| T <- Tables],
ok.
+exchange_decorators() ->
+ ok = exchange_decorators(rabbit_exchange),
+ ok = exchange_decorators(rabbit_durable_exchange).
+
+exchange_decorators(Table) ->
+ transform(
+ Table,
+ fun ({exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches,
+ Policy}) ->
+ {exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches, Policy,
+ {[], []}}
+ end,
+ [name, type, durable, auto_delete, internal, arguments, scratches, policy,
+ decorators]).
%%--------------------------------------------------------------------