summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-10-18 16:10:10 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-10-18 16:10:10 +0100
commitdd6702a9dffd892921b8260c346097485fd671f2 (patch)
tree169f49cecbaa6012eba1a56a189cb6894819311d
parent28f49c60d863c5a8a146b5b7a476d2919d558442 (diff)
parent8c21276b4c97d4be3060f2195e75d80364fa93ea (diff)
downloadrabbitmq-server-bug23319.tar.gz
merge default into bug23319bug23319
-rw-r--r--docs/rabbitmqctl.1.xml31
-rw-r--r--include/rabbit.hrl4
-rw-r--r--include/rabbit_exchange_type_spec.hrl4
-rw-r--r--src/rabbit_amqqueue.erl45
-rw-r--r--src/rabbit_amqqueue_process.erl15
-rw-r--r--src/rabbit_binding.erl346
-rw-r--r--src/rabbit_channel.erl90
-rw-r--r--src/rabbit_control.erl21
-rw-r--r--src/rabbit_exchange.erl95
-rw-r--r--src/rabbit_exchange_type.erl2
-rw-r--r--src/rabbit_exchange_type_direct.erl9
-rw-r--r--src/rabbit_exchange_type_fanout.erl6
-rw-r--r--src/rabbit_exchange_type_headers.erl15
-rw-r--r--src/rabbit_exchange_type_topic.erl13
-rw-r--r--src/rabbit_mnesia.erl10
-rw-r--r--src/rabbit_msg_store.erl109
-rw-r--r--src/rabbit_multi.erl12
-rw-r--r--src/rabbit_plugin_activator.erl20
-rw-r--r--src/rabbit_queue_collector.erl41
-rw-r--r--src/rabbit_router.erl58
-rw-r--r--src/rabbit_tests.erl6
-rw-r--r--src/rabbit_types.erl19
22 files changed, 550 insertions, 421 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index f926d94f..acb99bc8 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -884,16 +884,31 @@
</para>
<variablelist>
<varlistentry>
- <term>exchange_name</term>
- <listitem><para>The name of the exchange to which the
- binding is attached. with non-ASCII characters
- escaped as in C.</para></listitem>
+ <term>source_name</term>
+ <listitem><para>The name of the source of messages to
+ which the binding is attached. With non-ASCII
+ characters escaped as in C.</para></listitem>
</varlistentry>
<varlistentry>
- <term>queue_name</term>
- <listitem><para>The name of the queue to which the
- binding is attached. with non-ASCII characters
- escaped as in C.</para></listitem>
+ <term>source_kind</term>
+ <listitem><para>The kind of the source of messages to
+ which the binding is attached. Currently always
+ queue. With non-ASCII characters escaped as in
+ C.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>destination_name</term>
+ <listitem><para>The name of the destination of
+ messages to which the binding is attached. With
+ non-ASCII characters escaped as in
+ C.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>destination_kind</term>
+ <listitem><para>The kind of the destination of
+ messages to which the binding is attached. With
+ non-ASCII characters escaped as in
+ C.</para></listitem>
</varlistentry>
<varlistentry>
<term>routing_key</term>
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 7661e34c..28e3925d 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -60,8 +60,8 @@
-record(route, {binding, value = const}).
-record(reverse_route, {reverse_binding, value = const}).
--record(binding, {exchange_name, key, queue_name, args = []}).
--record(reverse_binding, {queue_name, key, exchange_name, args = []}).
+-record(binding, {source, key, destination, args = []}).
+-record(reverse_binding, {destination, key, source, args = []}).
-record(listener, {node, protocol, host, port}).
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl
index cecd666b..ae326a87 100644
--- a/include/rabbit_exchange_type_spec.hrl
+++ b/include/rabbit_exchange_type_spec.hrl
@@ -31,8 +31,8 @@
-ifdef(use_specs).
-spec(description/0 :: () -> [{atom(), any()}]).
--spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
- -> {rabbit_router:routing_result(), [pid()]}).
+-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
+ -> rabbit_router:match_result()).
-spec(validate/1 :: (rabbit_types:exchange()) -> 'ok').
-spec(create/1 :: (rabbit_types:exchange()) -> 'ok').
-spec(recover/2 :: (rabbit_types:exchange(),
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 24320f51..2389ec86 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,7 +31,7 @@
-module(rabbit_amqqueue).
--export([start/0, stop/0, declare/5, delete_exclusive/1, delete/3, purge/1]).
+-export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1,
maybe_run_queue_via_backing_queue/2,
update_ram_duration/1, set_ram_duration_target/2,
@@ -115,9 +115,7 @@
(rabbit_types:amqqueue())
-> {'ok', non_neg_integer(), non_neg_integer()}).
-spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok').
--spec(delete_exclusive/1 :: (rabbit_types:amqqueue())
- -> rabbit_types:ok_or_error2(qlen(),
- 'not_exclusive')).
+-spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(delete/3 ::
(rabbit_types:amqqueue(), 'false', 'false')
-> qlen();
@@ -254,10 +252,10 @@ start_queue_process(Q) ->
add_default_binding(#amqqueue{name = QueueName}) ->
ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>),
RoutingKey = QueueName#resource.name,
- rabbit_binding:add(#binding{exchange_name = ExchangeName,
- queue_name = QueueName,
- key = RoutingKey,
- args = []}).
+ rabbit_binding:add(#binding{source = ExchangeName,
+ destination = QueueName,
+ key = RoutingKey,
+ args = []}).
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_queue, Name}).
@@ -362,8 +360,8 @@ stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
emit_stats(#amqqueue{pid = QPid}) ->
delegate_cast(QPid, emit_stats).
-delete_exclusive(#amqqueue{ pid = QPid }) ->
- gen_server2:call(QPid, delete_exclusive, infinity).
+delete_immediately(#amqqueue{ pid = QPid }) ->
+ gen_server2:cast(QPid, delete_immediately).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
@@ -439,7 +437,7 @@ internal_delete1(QueueName) ->
ok = mnesia:delete({rabbit_durable_queue, QueueName}),
%% we want to execute some things, as decided by rabbit_exchange,
%% after the transaction.
- rabbit_binding:remove_for_queue(QueueName).
+ rabbit_binding:remove_for_destination(QueueName).
internal_delete(QueueName) ->
case rabbit_misc:execute_mnesia_transaction(
@@ -450,8 +448,7 @@ internal_delete(QueueName) ->
end
end) of
{error, _} = Err -> Err;
- PostHook -> PostHook(),
- ok
+ Deletions -> ok = rabbit_binding:process_deletions(Deletions)
end.
maybe_run_queue_via_backing_queue(QPid, Fun) ->
@@ -470,19 +467,20 @@ maybe_expire(QPid) ->
gen_server2:cast(QPid, maybe_expire).
on_node_down(Node) ->
- [Hook() ||
- Hook <- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- qlc:e(qlc:q([delete_queue(QueueName) ||
- #amqqueue{name = QueueName, pid = Pid}
- <- mnesia:table(rabbit_queue),
- node(Pid) == Node]))
- end)],
- ok.
+ rabbit_binding:process_deletions(
+ lists:foldl(
+ fun rabbit_binding:combine_deletions/2,
+ rabbit_binding:new_deletions(),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> qlc:e(qlc:q([delete_queue(QueueName) ||
+ #amqqueue{name = QueueName, pid = Pid}
+ <- mnesia:table(rabbit_queue),
+ node(Pid) == Node]))
+ end))).
delete_queue(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
- rabbit_binding:remove_transient_for_queue(QueueName).
+ rabbit_binding:remove_transient_for_destination(QueueName).
pseudo_queue(QueueName, Pid) ->
#amqqueue{name = QueueName,
@@ -508,4 +506,3 @@ delegate_call(Pid, Msg, Timeout) ->
delegate_cast(Pid, Msg) ->
delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
-
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 61204deb..19db731a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -594,7 +594,6 @@ prioritise_call(Msg, _From, _State) ->
info -> 9;
{info, _Items} -> 9;
consumers -> 9;
- delete_exclusive -> 8;
{maybe_run_queue_via_backing_queue, _Fun} -> 6;
_ -> 0
end.
@@ -602,6 +601,7 @@ prioritise_call(Msg, _From, _State) ->
prioritise_cast(Msg, _State) ->
case Msg of
update_ram_duration -> 8;
+ delete_immediately -> 8;
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
maybe_expire -> 8;
@@ -787,16 +787,6 @@ handle_call(stat, _From, State = #q{backing_queue = BQ,
reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)},
ensure_expiry_timer(State));
-handle_call(delete_exclusive, _From,
- State = #q{ backing_queue_state = BQS,
- backing_queue = BQ,
- q = #amqqueue{exclusive_owner = Owner}
- }) when Owner =/= none ->
- {stop, normal, {ok, BQ:len(BQS)}, State};
-
-handle_call(delete_exclusive, _From, State) ->
- reply({error, not_exclusive}, State);
-
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
IsEmpty = BQ:is_empty(BQS),
@@ -868,6 +858,9 @@ handle_cast({reject, AckTags, Requeue, ChPid},
handle_cast({rollback, Txn, ChPid}, State) ->
noreply(rollback_transaction(Txn, ChPid, State));
+handle_cast(delete_immediately, State) ->
+ {stop, normal, State};
+
handle_cast({unblock, ChPid}, State) ->
noreply(
possibly_unblock(State, ChPid,
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 19150fa9..1af213c4 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -33,29 +33,35 @@
-include("rabbit.hrl").
-export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]).
--export([list_for_exchange/1, list_for_queue/1, list_for_exchange_and_queue/2]).
+-export([list_for_source/1, list_for_destination/1,
+ list_for_source_and_destination/2]).
+-export([new_deletions/0, combine_deletions/2, add_deletion/3,
+ process_deletions/1]).
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]).
%% these must all be run inside a mnesia tx
--export([has_for_exchange/1, remove_for_exchange/1,
- remove_for_queue/1, remove_transient_for_queue/1]).
+-export([has_for_source/1, remove_for_source/1,
+ remove_for_destination/1, remove_transient_for_destination/1]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--export_type([key/0]).
+-export_type([key/0, deletions/0]).
-type(key() :: binary()).
--type(bind_errors() :: rabbit_types:error('queue_not_found' |
- 'exchange_not_found' |
- 'exchange_and_queue_not_found')).
+-type(bind_errors() :: rabbit_types:error('source_not_found' |
+ 'destination_not_found' |
+ 'source_and_destination_not_found')).
-type(bind_res() :: 'ok' | bind_errors()).
-type(inner_fun() ::
- fun((rabbit_types:exchange(), queue()) ->
+ fun((rabbit_types:exchange(),
+ rabbit_types:exchange() | rabbit_types:amqqueue()) ->
rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
-type(bindings() :: [rabbit_types:binding()]).
+-opaque(deletions() :: dict:dictionary()).
+
-spec(recover/0 :: () -> [rabbit_types:binding()]).
-spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()).
-spec(add/1 :: (rabbit_types:binding()) -> bind_res()).
@@ -65,10 +71,13 @@
-spec(remove/2 :: (rabbit_types:binding(), inner_fun()) ->
bind_res() | rabbit_types:error('binding_not_found')).
-spec(list/1 :: (rabbit_types:vhost()) -> bindings()).
--spec(list_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()).
--spec(list_for_queue/1 :: (rabbit_amqqueue:name()) -> bindings()).
--spec(list_for_exchange_and_queue/2 ::
- (rabbit_exchange:name(), rabbit_amqqueue:name()) -> bindings()).
+-spec(list_for_source/1 ::
+ (rabbit_types:binding_source()) -> bindings()).
+-spec(list_for_destination/1 ::
+ (rabbit_types:binding_destination()) -> bindings()).
+-spec(list_for_source_and_destination/2 ::
+ (rabbit_types:binding_source(), rabbit_types:binding_destination()) ->
+ bindings()).
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]).
-spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) ->
@@ -76,18 +85,27 @@
-spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]).
-spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()])
-> [[rabbit_types:info()]]).
--spec(has_for_exchange/1 :: (rabbit_exchange:name()) -> boolean()).
--spec(remove_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()).
--spec(remove_for_queue/1 ::
- (rabbit_amqqueue:name()) -> fun (() -> any())).
--spec(remove_transient_for_queue/1 ::
- (rabbit_amqqueue:name()) -> fun (() -> any())).
+-spec(has_for_source/1 :: (rabbit_types:binding_source()) -> boolean()).
+-spec(remove_for_source/1 :: (rabbit_types:binding_source()) -> bindings()).
+-spec(remove_for_destination/1 ::
+ (rabbit_types:binding_destination()) -> deletions()).
+-spec(remove_transient_for_destination/1 ::
+ (rabbit_types:binding_destination()) -> deletions()).
+-spec(process_deletions/1 :: (deletions()) -> 'ok').
+-spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()).
+-spec(add_deletion/3 :: (rabbit_exchange:name(),
+ {'undefined' | rabbit_types:binding_source(),
+ 'deleted' | 'not_deleted',
+ deletions()}, deletions()) -> deletions()).
+-spec(new_deletions/0 :: () -> deletions()).
-endif.
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [exchange_name, queue_name, routing_key, arguments]).
+-define(INFO_KEYS, [source_name, source_kind,
+ destination_name, destination_kind,
+ routing_key, arguments]).
recover() ->
rabbit_misc:table_fold(
@@ -101,36 +119,34 @@ recover() ->
exists(Binding) ->
binding_action(
Binding,
- fun (_X, _Q, B) -> mnesia:read({rabbit_route, B}) /= [] end).
+ fun (_Src, _Dst, B) -> mnesia:read({rabbit_route, B}) /= [] end).
-add(Binding) -> add(Binding, fun (_X, _Q) -> ok end).
+add(Binding) -> add(Binding, fun (_Src, _Dst) -> ok end).
-remove(Binding) -> remove(Binding, fun (_X, _Q) -> ok end).
+remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end).
add(Binding, InnerFun) ->
case binding_action(
Binding,
- fun (X, Q, B) ->
+ fun (Src, Dst, B) ->
%% this argument is used to check queue exclusivity;
%% in general, we want to fail on that in preference to
%% anything else
- case InnerFun(X, Q) of
+ case InnerFun(Src, Dst) of
ok ->
case mnesia:read({rabbit_route, B}) of
- [] -> Durable = (X#exchange.durable andalso
- Q#amqqueue.durable),
- ok = sync_binding(
- B, Durable,
+ [] -> ok = sync_binding(
+ B, all_durable([Src, Dst]),
fun mnesia:write/3),
- {new, X, B};
- [_] -> {existing, X, B}
+ {new, Src, B};
+ [_] -> {existing, Src, B}
end;
{error, _} = E ->
E
end
end) of
- {new, X = #exchange{ type = Type }, B} ->
- ok = (type_to_module(Type)):add_binding(X, B),
+ {new, Src = #exchange{ type = Type }, B} ->
+ ok = (type_to_module(Type)):add_binding(Src, B),
rabbit_event:notify(binding_created, info(B));
{existing, _, _} ->
ok;
@@ -141,61 +157,55 @@ add(Binding, InnerFun) ->
remove(Binding, InnerFun) ->
case binding_action(
Binding,
- fun (X, Q, B) ->
+ fun (Src, Dst, B) ->
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
- [] -> {error, binding_not_found};
- [_] -> case InnerFun(X, Q) of
- ok ->
- Durable = (X#exchange.durable andalso
- Q#amqqueue.durable),
- ok = sync_binding(
- B, Durable,
- fun mnesia:delete_object/3),
- Deleted =
- rabbit_exchange:maybe_auto_delete(X),
- {{Deleted, X}, B};
- {error, _} = E ->
- E
- end
+ [] ->
+ {error, binding_not_found};
+ [_] ->
+ case InnerFun(Src, Dst) of
+ ok ->
+ ok = sync_binding(
+ B, all_durable([Src, Dst]),
+ fun mnesia:delete_object/3),
+ {ok,
+ maybe_auto_delete(B#binding.source,
+ [B], new_deletions())};
+ {error, _} = E ->
+ E
+ end
end
end) of
{error, _} = Err ->
Err;
- {{IsDeleted, X = #exchange{ type = Type }}, B} ->
- Module = type_to_module(Type),
- case IsDeleted of
- auto_deleted -> ok = Module:delete(X, [B]);
- not_deleted -> ok = Module:remove_bindings(X, [B])
- end,
- rabbit_event:notify(binding_deleted, info(B)),
- ok
+ {ok, Deletions} ->
+ ok = process_deletions(Deletions)
end.
list(VHostPath) ->
- Route = #route{binding = #binding{
- exchange_name = rabbit_misc:r(VHostPath, exchange),
- queue_name = rabbit_misc:r(VHostPath, queue),
- _ = '_'},
+ VHostResource = rabbit_misc:r(VHostPath, '_'),
+ Route = #route{binding = #binding{source = VHostResource,
+ destination = VHostResource,
+ _ = '_'},
_ = '_'},
[B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
Route)].
-list_for_exchange(XName) ->
- Route = #route{binding = #binding{exchange_name = XName, _ = '_'}},
+list_for_source(SrcName) ->
+ Route = #route{binding = #binding{source = SrcName, _ = '_'}},
[B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
Route)].
-list_for_queue(QueueName) ->
- Route = #route{binding = #binding{queue_name = QueueName, _ = '_'}},
+list_for_destination(DstName) ->
+ Route = #route{binding = #binding{destination = DstName, _ = '_'}},
[reverse_binding(B) || #reverse_route{reverse_binding = B} <-
mnesia:dirty_match_object(rabbit_reverse_route,
reverse_route(Route))].
-list_for_exchange_and_queue(XName, QueueName) ->
- Route = #route{binding = #binding{exchange_name = XName,
- queue_name = QueueName,
- _ = '_'}},
+list_for_source_and_destination(SrcName, DstName) ->
+ Route = #route{binding = #binding{source = SrcName,
+ destination = DstName,
+ _ = '_'}},
[B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
Route)].
@@ -208,10 +218,12 @@ map(VHostPath, F) ->
infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items].
-i(exchange_name, #binding{exchange_name = XName}) -> XName;
-i(queue_name, #binding{queue_name = QName}) -> QName;
-i(routing_key, #binding{key = RoutingKey}) -> RoutingKey;
-i(arguments, #binding{args = Arguments}) -> Arguments;
+i(source_name, #binding{source = SrcName}) -> SrcName#resource.name;
+i(source_kind, #binding{source = SrcName}) -> SrcName#resource.kind;
+i(destination_name, #binding{destination = DstName}) -> DstName#resource.name;
+i(destination_kind, #binding{destination = DstName}) -> DstName#resource.kind;
+i(routing_key, #binding{key = RoutingKey}) -> RoutingKey;
+i(arguments, #binding{args = Arguments}) -> Arguments;
i(Item, _) -> throw({bad_argument, Item}).
info(B = #binding{}) -> infos(?INFO_KEYS, B).
@@ -222,14 +234,14 @@ info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end).
-has_for_exchange(XName) ->
- Match = #route{binding = #binding{exchange_name = XName, _ = '_'}},
+has_for_source(SrcName) ->
+ Match = #route{binding = #binding{source = SrcName, _ = '_'}},
%% we need to check for durable routes here too in case a bunch of
%% routes to durable queues have been removed temporarily as a
%% result of a node failure
contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match).
-remove_for_exchange(XName) ->
+remove_for_source(SrcName) ->
[begin
ok = mnesia:delete_object(rabbit_reverse_route,
reverse_route(Route), write),
@@ -237,26 +249,31 @@ remove_for_exchange(XName) ->
Route#route.binding
end || Route <- mnesia:match_object(
rabbit_route,
- #route{binding = #binding{exchange_name = XName,
- _ = '_'}},
+ #route{binding = #binding{source = SrcName,
+ _ = '_'}},
write)].
-remove_for_queue(QueueName) ->
- remove_for_queue(QueueName, fun delete_forward_routes/1).
+remove_for_destination(DstName) ->
+ remove_for_destination(DstName, fun delete_forward_routes/1).
-remove_transient_for_queue(QueueName) ->
- remove_for_queue(QueueName, fun delete_transient_forward_routes/1).
+remove_transient_for_destination(DstName) ->
+ remove_for_destination(DstName, fun delete_transient_forward_routes/1).
%%----------------------------------------------------------------------------
-binding_action(Binding = #binding{exchange_name = XName,
- queue_name = QueueName,
- args = Arguments}, Fun) ->
- call_with_exchange_and_queue(
- XName, QueueName,
- fun (X, Q) ->
+all_durable(Resources) ->
+ lists:all(fun (#exchange{durable = D}) -> D;
+ (#amqqueue{durable = D}) -> D
+ end, Resources).
+
+binding_action(Binding = #binding{source = SrcName,
+ destination = DstName,
+ args = Arguments}, Fun) ->
+ call_with_source_and_destination(
+ SrcName, DstName,
+ fun (Src, Dst) ->
SortedArgs = rabbit_misc:sort_field_table(Arguments),
- Fun(X, Q, Binding#binding{args = SortedArgs})
+ Fun(Src, Dst, Binding#binding{args = SortedArgs})
end).
sync_binding(Binding, Durable, Fun) ->
@@ -270,17 +287,22 @@ sync_binding(Binding, Durable, Fun) ->
ok = Fun(rabbit_reverse_route, ReverseRoute, write),
ok.
-call_with_exchange_and_queue(XName, QueueName, Fun) ->
+call_with_source_and_destination(SrcName, DstName, Fun) ->
+ SrcTable = table_for_resource(SrcName),
+ DstTable = table_for_resource(DstName),
rabbit_misc:execute_mnesia_transaction(
- fun () -> case {mnesia:read({rabbit_exchange, XName}),
- mnesia:read({rabbit_queue, QueueName})} of
- {[X], [Q]} -> Fun(X, Q);
- {[ ], [_]} -> {error, exchange_not_found};
- {[_], [ ]} -> {error, queue_not_found};
- {[ ], [ ]} -> {error, exchange_and_queue_not_found}
- end
+ fun () -> case {mnesia:read({SrcTable, SrcName}),
+ mnesia:read({DstTable, DstName})} of
+ {[Src], [Dst]} -> Fun(Src, Dst);
+ {[], [_] } -> {error, source_not_found};
+ {[_], [] } -> {error, destination_not_found};
+ {[], [] } -> {error, source_and_destination_not_found}
+ end
end).
+table_for_resource(#resource{kind = exchange}) -> rabbit_exchange;
+table_for_resource(#resource{kind = queue}) -> rabbit_queue.
+
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
{ok, Module} = rabbit_exchange_type_registry:lookup_module(T),
@@ -293,8 +315,8 @@ continue('$end_of_table') -> false;
continue({[_|_], _}) -> true;
continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
-remove_for_queue(QueueName, FwdDeleteFun) ->
- DeletedBindings =
+remove_for_destination(DstName, FwdDeleteFun) ->
+ Bindings =
[begin
Route = reverse_route(ReverseRoute),
ok = FwdDeleteFun(Route),
@@ -304,40 +326,41 @@ remove_for_queue(QueueName, FwdDeleteFun) ->
end || ReverseRoute
<- mnesia:match_object(
rabbit_reverse_route,
- reverse_route(#route{binding = #binding{
- queue_name = QueueName,
- _ = '_'}}),
+ reverse_route(#route{
+ binding = #binding{
+ destination = DstName,
+ _ = '_'}}),
write)],
- Grouped = group_bindings_and_auto_delete(
- lists:keysort(#binding.exchange_name, DeletedBindings), []),
- fun () ->
- lists:foreach(
- fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) ->
- Module = type_to_module(Type),
- case IsDeleted of
- auto_deleted -> Module:delete(X, Bs);
- not_deleted -> Module:remove_bindings(X, Bs)
- end
- end, Grouped)
- end.
+ group_bindings_fold(fun maybe_auto_delete/3, new_deletions(),
+ lists:keysort(#binding.source, Bindings)).
%% Requires that its input binding list is sorted in exchange-name
%% order, so that the grouping of bindings (for passing to
%% group_bindings_and_auto_delete1) works properly.
-group_bindings_and_auto_delete([], Acc) ->
+group_bindings_fold(_Fun, Acc, []) ->
Acc;
-group_bindings_and_auto_delete(
- [B = #binding{exchange_name = XName} | Bs], Acc) ->
- group_bindings_and_auto_delete(XName, Bs, [B], Acc).
-
-group_bindings_and_auto_delete(
- XName, [B = #binding{exchange_name = XName} | Bs], Bindings, Acc) ->
- group_bindings_and_auto_delete(XName, Bs, [B | Bindings], Acc);
-group_bindings_and_auto_delete(XName, Removed, Bindings, Acc) ->
- %% either Removed is [], or its head has a non-matching XName
- [X] = mnesia:read({rabbit_exchange, XName}),
- NewAcc = [{{rabbit_exchange:maybe_auto_delete(X), X}, Bindings} | Acc],
- group_bindings_and_auto_delete(Removed, NewAcc).
+group_bindings_fold(Fun, Acc, [B = #binding{source = SrcName} | Bs]) ->
+ group_bindings_fold(Fun, SrcName, Acc, Bs, [B]).
+
+group_bindings_fold(
+ Fun, SrcName, Acc, [B = #binding{source = SrcName} | Bs], Bindings) ->
+ group_bindings_fold(Fun, SrcName, Acc, Bs, [B | Bindings]);
+group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) ->
+ %% Either Removed is [], or its head has a non-matching SrcName.
+ group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed).
+
+maybe_auto_delete(XName, Bindings, Deletions) ->
+ case rabbit_exchange:lookup(XName) of
+ {error, not_found} ->
+ add_deletion(XName, {undefined, not_deleted, Bindings}, Deletions);
+ {ok, X} ->
+ add_deletion(XName, {X, not_deleted, Bindings},
+ case rabbit_exchange:maybe_auto_delete(X) of
+ not_deleted -> Deletions;
+ {deleted, Deletions1} -> combine_deletions(
+ Deletions, Deletions1)
+ end)
+ end.
delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
@@ -358,20 +381,59 @@ reverse_route(#route{binding = Binding}) ->
reverse_route(#reverse_route{reverse_binding = Binding}) ->
#route{binding = reverse_binding(Binding)}.
-reverse_binding(#reverse_binding{exchange_name = XName,
- queue_name = QueueName,
- key = Key,
- args = Args}) ->
- #binding{exchange_name = XName,
- queue_name = QueueName,
- key = Key,
- args = Args};
-
-reverse_binding(#binding{exchange_name = XName,
- queue_name = QueueName,
- key = Key,
- args = Args}) ->
- #reverse_binding{exchange_name = XName,
- queue_name = QueueName,
- key = Key,
- args = Args}.
+reverse_binding(#reverse_binding{source = SrcName,
+ destination = DstName,
+ key = Key,
+ args = Args}) ->
+ #binding{source = SrcName,
+ destination = DstName,
+ key = Key,
+ args = Args};
+
+reverse_binding(#binding{source = SrcName,
+ destination = DstName,
+ key = Key,
+ args = Args}) ->
+ #reverse_binding{source = SrcName,
+ destination = DstName,
+ key = Key,
+ args = Args}.
+
+%% ----------------------------------------------------------------------------
+%% Binding / exchange deletion abstraction API
+%% ----------------------------------------------------------------------------
+
+anything_but( NotThis, NotThis, NotThis) -> NotThis;
+anything_but( NotThis, NotThis, This) -> This;
+anything_but( NotThis, This, NotThis) -> This;
+anything_but(_NotThis, This, This) -> This.
+
+new_deletions() -> dict:new().
+
+add_deletion(XName, Entry, Deletions) ->
+ dict:update(XName, fun (Entry1) -> merge_entry(Entry1, Entry) end,
+ Entry, Deletions).
+
+combine_deletions(Deletions1, Deletions2) ->
+ dict:merge(fun (_XName, Entry1, Entry2) -> merge_entry(Entry1, Entry2) end,
+ Deletions1, Deletions2).
+
+merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
+ {anything_but(undefined, X1, X2),
+ anything_but(not_deleted, Deleted1, Deleted2),
+ [Bindings1 | Bindings2]}.
+
+process_deletions(Deletions) ->
+ dict:fold(
+ fun (_XName, {X = #exchange{ type = Type }, Deleted, Bindings}, ok) ->
+ FlatBindings = lists:flatten(Bindings),
+ [rabbit_event:notify(binding_deleted, info(B)) ||
+ B <- FlatBindings],
+ TypeModule = type_to_module(Type),
+ case Deleted of
+ not_deleted -> TypeModule:remove_bindings(X, FlatBindings);
+ deleted -> rabbit_event:notify(exchange_deleted,
+ [{name, X#exchange.name}]),
+ TypeModule:delete(X, FlatBindings)
+ end
+ end, ok, Deletions).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index fe36cef9..58c8e341 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -368,6 +368,19 @@ expand_routing_key_shortcut(<<>>, <<>>,
expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) ->
RoutingKey.
+expand_binding(queue, DestinationNameBin, RoutingKey, State) ->
+ {expand_queue_name_shortcut(DestinationNameBin, State),
+ expand_routing_key_shortcut(DestinationNameBin, RoutingKey, State)};
+expand_binding(exchange, DestinationNameBin, RoutingKey, State) ->
+ {rabbit_misc:r(State#ch.virtual_host, exchange, DestinationNameBin),
+ RoutingKey}.
+
+check_not_default_exchange(#resource{kind = exchange, name = <<"">>}) ->
+ rabbit_misc:protocol_error(
+ access_refused, "operation not permitted on the default exchange", []);
+check_not_default_exchange(_) ->
+ ok.
+
%% check that an exchange/queue name does not contain the reserved
%% "amq." prefix.
%%
@@ -681,6 +694,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
_, State = #ch{virtual_host = VHostPath}) ->
CheckedType = rabbit_exchange:check_type(TypeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_not_default_exchange(ExchangeName),
check_configure_permitted(ExchangeName, State),
X = case rabbit_exchange:lookup(ExchangeName) of
{ok, FoundX} -> FoundX;
@@ -709,6 +723,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
_, State = #ch{virtual_host = VHostPath}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_configure_permitted(ExchangeName, State),
+ check_not_default_exchange(ExchangeName),
_ = rabbit_exchange:lookup_or_die(ExchangeName),
return_ok(State, NoWait, #'exchange.declare_ok'{});
@@ -717,6 +732,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
nowait = NoWait},
_, State = #ch{virtual_host = VHostPath}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_not_default_exchange(ExchangeName),
check_configure_permitted(ExchangeName, State),
case rabbit_exchange:delete(ExchangeName, IfUnused) of
{error, not_found} ->
@@ -728,6 +744,24 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
return_ok(State, NoWait, #'exchange.delete_ok'{})
end;
+handle_method(#'exchange.bind'{destination = DestinationNameBin,
+ source = SourceNameBin,
+ routing_key = RoutingKey,
+ nowait = NoWait,
+ arguments = Arguments}, _, State) ->
+ binding_action(fun rabbit_binding:add/2,
+ SourceNameBin, exchange, DestinationNameBin, RoutingKey,
+ Arguments, #'exchange.bind_ok'{}, NoWait, State);
+
+handle_method(#'exchange.unbind'{destination = DestinationNameBin,
+ source = SourceNameBin,
+ routing_key = RoutingKey,
+ nowait = NoWait,
+ arguments = Arguments}, _, State) ->
+ binding_action(fun rabbit_binding:remove/2,
+ SourceNameBin, exchange, DestinationNameBin, RoutingKey,
+ Arguments, #'exchange.unbind_ok'{}, NoWait, State);
+
handle_method(#'queue.declare'{queue = QueueNameBin,
passive = false,
durable = Durable,
@@ -819,7 +853,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
nowait = NoWait,
arguments = Arguments}, _, State) ->
binding_action(fun rabbit_binding:add/2,
- ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
+ ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments,
#'queue.bind_ok'{}, NoWait, State);
handle_method(#'queue.unbind'{queue = QueueNameBin,
@@ -827,7 +861,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
routing_key = RoutingKey,
arguments = Arguments}, _, State) ->
binding_action(fun rabbit_binding:remove/2,
- ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
+ ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments,
#'queue.unbind_ok'{}, false, State);
handle_method(#'queue.purge'{queue = QueueNameBin,
@@ -877,14 +911,14 @@ handle_method(#'channel.flow'{active = false}, _,
undefined -> start_limiter(State);
Other -> Other
end,
+ State1 = State#ch{limiter_pid = LimiterPid1},
ok = rabbit_limiter:block(LimiterPid1),
- QPids = consumer_queues(Consumers),
- Queues = [{QPid, erlang:monitor(process, QPid)} || QPid <- QPids],
- ok = rabbit_amqqueue:flush_all(QPids, self()),
- case Queues of
- [] -> {reply, #'channel.flow_ok'{active = false}, State};
- _ -> {noreply, State#ch{limiter_pid = LimiterPid1,
- blocking = dict:from_list(Queues)}}
+ case consumer_queues(Consumers) of
+ [] -> {reply, #'channel.flow_ok'{active = false}, State1};
+ QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} ||
+ QPid <- QPids],
+ ok = rabbit_amqqueue:flush_all(QPids, self()),
+ {noreply, State1#ch{blocking = dict:from_list(Queues)}}
end;
handle_method(_MethodRecord, _Content, _State) ->
@@ -893,42 +927,44 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
-binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
- ReturnMethod, NoWait,
+binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
+ RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
reader_pid = ReaderPid}) ->
%% FIXME: connection exception (!) on failure??
%% (see rule named "failure" in spec-XML)
%% FIXME: don't allow binding to internal exchanges -
%% including the one named "" !
- QueueName = expand_queue_name_shortcut(QueueNameBin, State),
- check_write_permitted(QueueName, State),
- ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey,
- State),
+ {DestinationName, ActualRoutingKey} =
+ expand_binding(DestinationType, DestinationNameBin, RoutingKey, State),
+ check_write_permitted(DestinationName, State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ [check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]],
check_read_permitted(ExchangeName, State),
- case Fun(#binding{exchange_name = ExchangeName,
- queue_name = QueueName,
- key = ActualRoutingKey,
- args = Arguments},
- fun (_X, Q) ->
+ case Fun(#binding{source = ExchangeName,
+ destination = DestinationName,
+ key = ActualRoutingKey,
+ args = Arguments},
+ fun (_X, Q = #amqqueue{}) ->
try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid)
catch exit:Reason -> {error, Reason}
- end
+ end;
+ (_X, #exchange{}) ->
+ ok
end) of
- {error, exchange_not_found} ->
+ {error, source_not_found} ->
rabbit_misc:not_found(ExchangeName);
- {error, queue_not_found} ->
- rabbit_misc:not_found(QueueName);
- {error, exchange_and_queue_not_found} ->
+ {error, destination_not_found} ->
+ rabbit_misc:not_found(DestinationName);
+ {error, source_and_destination_not_found} ->
rabbit_misc:protocol_error(
not_found, "no ~s and no ~s", [rabbit_misc:rs(ExchangeName),
- rabbit_misc:rs(QueueName)]);
+ rabbit_misc:rs(DestinationName)]);
{error, binding_not_found} ->
rabbit_misc:protocol_error(
not_found, "no binding ~s between ~s and ~s",
[RoutingKey, rabbit_misc:rs(ExchangeName),
- rabbit_misc:rs(QueueName)]);
+ rabbit_misc:rs(DestinationName)]);
{error, #amqp_error{} = Error} ->
rabbit_misc:protocol_error(Error);
ok -> return_ok(State, NoWait, ReturnMethod)
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 01a893f6..6b212745 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -93,29 +93,29 @@ start() ->
end,
halt();
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
- error("invalid command '~s'",
- [lists:flatten(
- rabbit_misc:intersperse(
- " ", [atom_to_list(Command) | Args]))]),
+ print_error("invalid command '~s'",
+ [lists:flatten(
+ rabbit_misc:intersperse(
+ " ", [atom_to_list(Command) | Args]))]),
usage();
{error, Reason} ->
- error("~p", [Reason]),
+ print_error("~p", [Reason]),
halt(2);
{badrpc, {'EXIT', Reason}} ->
- error("~p", [Reason]),
+ print_error("~p", [Reason]),
halt(2);
{badrpc, Reason} ->
- error("unable to connect to node ~w: ~w", [Node, Reason]),
+ print_error("unable to connect to node ~w: ~w", [Node, Reason]),
print_badrpc_diagnostics(Node),
halt(2);
Other ->
- error("~p", [Other]),
+ print_error("~p", [Other]),
halt(2)
end.
fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args).
-error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args).
+print_error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args).
print_badrpc_diagnostics(Node) ->
fmt_stderr("diagnostics:", []),
@@ -256,7 +256,8 @@ action(list_exchanges, Node, Args, Opts, Inform) ->
action(list_bindings, Node, Args, Opts, Inform) ->
Inform("Listing bindings", []),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- ArgAtoms = default_if_empty(Args, [exchange_name, queue_name,
+ ArgAtoms = default_if_empty(Args, [source_name, source_kind,
+ destination_name, destination_kind,
routing_key, arguments]),
display_info_list(rpc_call(Node, rabbit_binding, info_all,
[VHostArg, ArgAtoms]),
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 2a19d5b1..46564233 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -82,8 +82,9 @@
(name(), boolean())-> 'ok' |
rabbit_types:error('not_found') |
rabbit_types:error('in_use')).
--spec(maybe_auto_delete/1:: (rabbit_types:exchange()) ->
- 'not_deleted' | 'auto_deleted').
+-spec(maybe_auto_delete/1::
+ (rabbit_types:exchange())
+ -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
-endif.
@@ -99,11 +100,11 @@ recover() ->
end, [], rabbit_durable_exchange),
Bs = rabbit_binding:recover(),
recover_with_bindings(
- lists:keysort(#binding.exchange_name, Bs),
+ lists:keysort(#binding.source, Bs),
lists:keysort(#exchange.name, Xs), []).
-recover_with_bindings([B = #binding{exchange_name = Name} | Rest],
- Xs = [#exchange{name = Name} | _],
+recover_with_bindings([B = #binding{source = XName} | Rest],
+ Xs = [#exchange{name = XName} | _],
Bindings) ->
recover_with_bindings(Rest, Xs, [B | Bindings]);
recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) ->
@@ -225,38 +226,44 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
-publish(X, Delivery) ->
- publish(X, [], Delivery).
-
-publish(X = #exchange{type = Type}, Seen, Delivery) ->
- case (type_to_module(Type)):publish(X, Delivery) of
- {_, []} = R ->
- #exchange{name = XName, arguments = Args} = X,
- case rabbit_misc:r_arg(XName, exchange, Args,
- <<"alternate-exchange">>) of
- undefined ->
- R;
- AName ->
- NewSeen = [XName | Seen],
- case lists:member(AName, NewSeen) of
- true -> R;
- false -> case lookup(AName) of
- {ok, AX} ->
- publish(AX, NewSeen, Delivery);
- {error, not_found} ->
- rabbit_log:warning(
- "alternate exchange for ~s "
- "does not exist: ~s",
- [rabbit_misc:rs(XName),
- rabbit_misc:rs(AName)]),
- R
- end
- end
- end;
- R ->
- R
+publish(X = #exchange{name = XName}, Delivery) ->
+ rabbit_router:deliver(
+ route(Delivery, {queue:from_list([X]), sets:from_list([XName]), []}),
+ Delivery).
+
+route(Delivery, {WorkList, SeenXs, QNames}) ->
+ case queue:out(WorkList) of
+ {empty, _WorkList} ->
+ lists:usort(QNames);
+ {{value, X = #exchange{type = Type}}, WorkList1} ->
+ DstNames = process_alternate(
+ X, ((type_to_module(Type)):route(X, Delivery))),
+ route(Delivery,
+ lists:foldl(fun process_route/2, {WorkList1, SeenXs, QNames},
+ DstNames))
end.
+process_alternate(#exchange{name = XName, arguments = Args}, []) ->
+ case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of
+ undefined -> [];
+ AName -> [AName]
+ end;
+process_alternate(_X, Results) ->
+ Results.
+
+process_route(#resource{kind = exchange} = XName,
+ {WorkList, SeenXs, QNames} = Acc) ->
+ case sets:is_element(XName, SeenXs) of
+ true -> Acc;
+ false -> {case lookup(XName) of
+ {ok, X} -> queue:in(X, WorkList);
+ {error, not_found} -> WorkList
+ end, sets:add_element(XName, SeenXs), QNames}
+ end;
+process_route(#resource{kind = queue} = QName,
+ {WorkList, SeenXs, QNames}) ->
+ {WorkList, SeenXs, [QName | QNames]}.
+
call_with_exchange(XName, Fun) ->
rabbit_misc:execute_mnesia_transaction(
fun () -> case mnesia:read({rabbit_exchange, XName}) of
@@ -271,9 +278,10 @@ delete(XName, IfUnused) ->
false -> fun unconditional_delete/1
end,
case call_with_exchange(XName, Fun) of
- {deleted, X = #exchange{type = Type}, Bs} ->
- (type_to_module(Type)):delete(X, Bs),
- ok;
+ {deleted, X, Bs, Deletions} ->
+ ok = rabbit_binding:process_deletions(
+ rabbit_binding:add_deletion(
+ XName, {X, deleted, Bs}, Deletions));
Error = {error, _InUseOrNotFound} ->
Error
end.
@@ -282,19 +290,18 @@ maybe_auto_delete(#exchange{auto_delete = false}) ->
not_deleted;
maybe_auto_delete(#exchange{auto_delete = true} = X) ->
case conditional_delete(X) of
- {error, in_use} -> not_deleted;
- {deleted, X, []} -> auto_deleted
+ {error, in_use} -> not_deleted;
+ {deleted, X, [], Deletions} -> {deleted, Deletions}
end.
conditional_delete(X = #exchange{name = XName}) ->
- case rabbit_binding:has_for_exchange(XName) of
+ case rabbit_binding:has_for_source(XName) of
false -> unconditional_delete(X);
true -> {error, in_use}
end.
unconditional_delete(X = #exchange{name = XName}) ->
- Bindings = rabbit_binding:remove_for_exchange(XName),
ok = mnesia:delete({rabbit_durable_exchange, XName}),
ok = mnesia:delete({rabbit_exchange, XName}),
- rabbit_event:notify(exchange_deleted, [{name, XName}]),
- {deleted, X, Bindings}.
+ Bindings = rabbit_binding:remove_for_source(XName),
+ {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}.
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index 85760edc..742944dc 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -36,7 +36,7 @@
behaviour_info(callbacks) ->
[
{description, 0},
- {publish, 2},
+ {route, 2},
%% called BEFORE declaration, to check args etc; may exit with #amqp_error{}
{validate, 1},
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index 4f6eb851..d934a497 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -34,7 +34,7 @@
-behaviour(rabbit_exchange_type).
--export([description/0, publish/2]).
+-export([description/0, route/2]).
-export([validate/1, create/1, recover/2, delete/2,
add_binding/2, remove_bindings/2, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -50,10 +50,9 @@ description() ->
[{name, <<"direct">>},
{description, <<"AMQP direct exchange, as per the AMQP specification">>}].
-publish(#exchange{name = Name}, Delivery =
- #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
- rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey),
- Delivery).
+route(#exchange{name = Name},
+ #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
+ rabbit_router:match_routing_key(Name, RoutingKey).
validate(_X) -> ok.
create(_X) -> ok.
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 94798c78..77ca9686 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -34,7 +34,7 @@
-behaviour(rabbit_exchange_type).
--export([description/0, publish/2]).
+-export([description/0, route/2]).
-export([validate/1, create/1, recover/2, delete/2, add_binding/2,
remove_bindings/2, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -50,8 +50,8 @@ description() ->
[{name, <<"fanout">>},
{description, <<"AMQP fanout exchange, as per the AMQP specification">>}].
-publish(#exchange{name = Name}, Delivery) ->
- rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery).
+route(#exchange{name = Name}, _Delivery) ->
+ rabbit_router:match_routing_key(Name, '_').
validate(_X) -> ok.
create(_X) -> ok.
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 0a59a175..ec9e7ba4 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -35,7 +35,7 @@
-behaviour(rabbit_exchange_type).
--export([description/0, publish/2]).
+-export([description/0, route/2]).
-export([validate/1, create/1, recover/2, delete/2, add_binding/2,
remove_bindings/2, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -56,17 +56,14 @@ description() ->
[{name, <<"headers">>},
{description, <<"AMQP headers exchange, as per the AMQP specification">>}].
-publish(#exchange{name = Name},
- Delivery = #delivery{message = #basic_message{content = Content}}) ->
+route(#exchange{name = Name},
+ #delivery{message = #basic_message{content = Content}}) ->
Headers = case (Content#content.properties)#'P_basic'.headers of
undefined -> [];
H -> rabbit_misc:sort_field_table(H)
end,
- rabbit_router:deliver(rabbit_router:match_bindings(
- Name, fun (#binding{args = Spec}) ->
- headers_match(Spec, Headers)
- end),
- Delivery).
+ rabbit_router:match_bindings(
+ Name, fun (#binding{args = Spec}) -> headers_match(Spec, Headers) end).
default_headers_match_kind() -> all.
@@ -79,7 +76,7 @@ parse_x_match(Other) ->
%% Horrendous matching algorithm. Depends for its merge-like
%% (linear-time) behaviour on the lists:keysort
-%% (rabbit_misc:sort_field_table) that publish/1 and
+%% (rabbit_misc:sort_field_table) that route/1 and
%% rabbit_binding:{add,remove}/2 do.
%%
%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index e796acf3..d3ecdd4d 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -34,7 +34,7 @@
-behaviour(rabbit_exchange_type).
--export([description/0, publish/2]).
+-export([description/0, route/2]).
-export([validate/1, create/1, recover/2, delete/2, add_binding/2,
remove_bindings/2, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -58,13 +58,12 @@ description() ->
[{name, <<"topic">>},
{description, <<"AMQP topic exchange, as per the AMQP specification">>}].
-publish(#exchange{name = Name}, Delivery =
+route(#exchange{name = Name},
#delivery{message = #basic_message{routing_key = RoutingKey}}) ->
- rabbit_router:deliver(rabbit_router:match_bindings(
- Name, fun (#binding{key = BindingKey}) ->
- topic_matches(BindingKey, RoutingKey)
- end),
- Delivery).
+ rabbit_router:match_bindings(Name,
+ fun (#binding{key = BindingKey}) ->
+ topic_matches(BindingKey, RoutingKey)
+ end).
split_topic_key(Key) ->
string:tokens(binary_to_list(Key), ".").
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index d35adf16..577d206d 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -218,13 +218,15 @@ table_definitions() ->
{match, #amqqueue{name = queue_name_match(), _='_'}}]}].
binding_match() ->
- #binding{queue_name = queue_name_match(),
- exchange_name = exchange_name_match(),
+ #binding{source = exchange_name_match(),
+ destination = binding_destination_match(),
_='_'}.
reverse_binding_match() ->
- #reverse_binding{queue_name = queue_name_match(),
- exchange_name = exchange_name_match(),
+ #reverse_binding{destination = binding_destination_match(),
+ source = exchange_name_match(),
_='_'}.
+binding_destination_match() ->
+ resource_match('_').
exchange_name_match() ->
resource_match(exchange).
queue_name_match() ->
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 81d3c501..66cc06cf 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -33,9 +33,9 @@
-behaviour(gen_server2).
--export([start_link/4, write/4, read/3, contains/2, remove/2, release/2,
- sync/3, client_init/2, client_terminate/2,
- client_delete_and_terminate/3, successfully_recovered_state/1]).
+-export([start_link/4, successfully_recovered_state/1,
+ client_init/2, client_terminate/2, client_delete_and_terminate/3,
+ write/4, read/3, contains/2, remove/2, release/2, sync/3]).
-export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal
@@ -123,6 +123,11 @@
-spec(start_link/4 ::
(atom(), file:filename(), [binary()] | 'undefined',
startup_fun_state()) -> rabbit_types:ok_pid_or_error()).
+-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
+-spec(client_init/2 :: (server(), binary()) -> client_msstate()).
+-spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok').
+-spec(client_delete_and_terminate/3 ::
+ (client_msstate(), server(), binary()) -> 'ok').
-spec(write/4 :: (server(), rabbit_guid:guid(), msg(), client_msstate()) ->
rabbit_types:ok(client_msstate())).
-spec(read/3 :: (server(), rabbit_guid:guid(), client_msstate()) ->
@@ -131,15 +136,11 @@
-spec(remove/2 :: (server(), [rabbit_guid:guid()]) -> 'ok').
-spec(release/2 :: (server(), [rabbit_guid:guid()]) -> 'ok').
-spec(sync/3 :: (server(), [rabbit_guid:guid()], fun (() -> any())) -> 'ok').
+
+-spec(sync/1 :: (server()) -> 'ok').
-spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) ->
'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
--spec(client_init/2 :: (server(), binary()) -> client_msstate()).
--spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok').
--spec(client_delete_and_terminate/3 ::
- (client_msstate(), server(), binary()) -> 'ok').
--spec(successfully_recovered_state/1 :: (server()) -> boolean()).
-
-spec(gc/3 :: (non_neg_integer(), non_neg_integer(),
{ets:tid(), file:filename(), atom(), any()}) ->
'concurrent_readers' | non_neg_integer()).
@@ -308,6 +309,31 @@ start_link(Server, Dir, ClientRefs, StartupFunState) ->
[Server, Dir, ClientRefs, StartupFunState],
[{timeout, infinity}]).
+successfully_recovered_state(Server) ->
+ gen_server2:call(Server, successfully_recovered_state, infinity).
+
+client_init(Server, Ref) ->
+ {IState, IModule, Dir, GCPid,
+ FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
+ gen_server2:call(Server, {new_client_state, Ref}, infinity),
+ #client_msstate { file_handle_cache = dict:new(),
+ index_state = IState,
+ index_module = IModule,
+ dir = Dir,
+ gc_pid = GCPid,
+ file_handles_ets = FileHandlesEts,
+ file_summary_ets = FileSummaryEts,
+ dedup_cache_ets = DedupCacheEts,
+ cur_file_cache_ets = CurFileCacheEts }.
+
+client_terminate(CState, Server) ->
+ close_all_handles(CState),
+ ok = gen_server2:call(Server, client_terminate, infinity).
+
+client_delete_and_terminate(CState, Server, Ref) ->
+ close_all_handles(CState),
+ ok = gen_server2:cast(Server, {client_delete, Ref}).
+
write(Server, Guid, Msg,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
ok = update_msg_cache(CurFileCacheEts, Guid, Msg),
@@ -345,7 +371,9 @@ remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}).
release(_Server, []) -> ok;
release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}).
sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}).
-sync(Server) -> gen_server2:cast(Server, sync). %% internal
+
+sync(Server) ->
+ gen_server2:cast(Server, sync).
gc_done(Server, Reclaimed, Source, Destination) ->
gen_server2:cast(Server, {gc_done, Reclaimed, Source, Destination}).
@@ -353,31 +381,6 @@ gc_done(Server, Reclaimed, Source, Destination) ->
set_maximum_since_use(Server, Age) ->
gen_server2:cast(Server, {set_maximum_since_use, Age}).
-client_init(Server, Ref) ->
- {IState, IModule, Dir, GCPid,
- FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
- gen_server2:call(Server, {new_client_state, Ref}, infinity),
- #client_msstate { file_handle_cache = dict:new(),
- index_state = IState,
- index_module = IModule,
- dir = Dir,
- gc_pid = GCPid,
- file_handles_ets = FileHandlesEts,
- file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
- cur_file_cache_ets = CurFileCacheEts }.
-
-client_terminate(CState, Server) ->
- close_all_handles(CState),
- ok = gen_server2:call(Server, client_terminate, infinity).
-
-client_delete_and_terminate(CState, Server, Ref) ->
- close_all_handles(CState),
- ok = gen_server2:cast(Server, {client_delete, Ref}).
-
-successfully_recovered_state(Server) ->
- gen_server2:call(Server, successfully_recovered_state, infinity).
-
%%----------------------------------------------------------------------------
%% Client-side-only helpers
%%----------------------------------------------------------------------------
@@ -577,8 +580,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
prioritise_call(Msg, _From, _State) ->
case Msg of
- {new_client_state, _Ref} -> 7;
successfully_recovered_state -> 7;
+ {new_client_state, _Ref} -> 7;
{read, _Guid} -> 2;
_ -> 0
end.
@@ -591,13 +594,8 @@ prioritise_cast(Msg, _State) ->
_ -> 0
end.
-handle_call({read, Guid}, From, State) ->
- State1 = read_message(Guid, From, State),
- noreply(State1);
-
-handle_call({contains, Guid}, From, State) ->
- State1 = contains_message(Guid, From, State),
- noreply(State1);
+handle_call(successfully_recovered_state, _From, State) ->
+ reply(State #msstate.successfully_recovered, State);
handle_call({new_client_state, CRef}, _From,
State = #msstate { dir = Dir,
@@ -613,11 +611,21 @@ handle_call({new_client_state, CRef}, _From,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts},
State #msstate { client_refs = sets:add_element(CRef, ClientRefs) });
-handle_call(successfully_recovered_state, _From, State) ->
- reply(State #msstate.successfully_recovered, State);
-
handle_call(client_terminate, _From, State) ->
- reply(ok, State).
+ reply(ok, State);
+
+handle_call({read, Guid}, From, State) ->
+ State1 = read_message(Guid, From, State),
+ noreply(State1);
+
+handle_call({contains, Guid}, From, State) ->
+ State1 = contains_message(Guid, From, State),
+ noreply(State1).
+
+handle_cast({client_delete, CRef},
+ State = #msstate { client_refs = ClientRefs }) ->
+ noreply(
+ State #msstate { client_refs = sets:del_element(CRef, ClientRefs) });
handle_cast({write, Guid},
State = #msstate { sum_valid_data = SumValid,
@@ -712,12 +720,7 @@ handle_cast({gc_done, Reclaimed, Src, Dst},
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
- noreply(State);
-
-handle_cast({client_delete, CRef},
- State = #msstate { client_refs = ClientRefs }) ->
- noreply(
- State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }).
+ noreply(State).
handle_info(timeout, State) ->
noreply(internal_sync(State));
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index 5cfd6a5c..b48d0aa3 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -64,20 +64,20 @@ start() ->
io:format("done.~n"),
halt();
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
- error("invalid command '~s'",
- [lists:flatten(
- rabbit_misc:intersperse(" ", FullCommand))]),
+ print_error("invalid command '~s'",
+ [lists:flatten(
+ rabbit_misc:intersperse(" ", FullCommand))]),
usage();
timeout ->
- error("timeout starting some nodes.", []),
+ print_error("timeout starting some nodes.", []),
halt(1);
Other ->
- error("~p", [Other]),
+ print_error("~p", [Other]),
halt(2)
end
end.
-error(Format, Args) ->
+print_error(Format, Args) ->
rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args).
parse_args([Command | Args]) ->
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index c38ef8d2..88300ab4 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -72,7 +72,8 @@ start() ->
%% applications along the way
AllApps = case catch sets:to_list(expand_dependencies(RequiredApps)) of
{failed_to_load_app, App, Err} ->
- error("failed to load application ~s:~n~p", [App, Err]);
+ terminate("failed to load application ~s:~n~p",
+ [App, Err]);
AppList ->
AppList
end,
@@ -116,19 +117,20 @@ start() ->
end,
ok;
{error, Module, Error} ->
- error("generation of boot script file ~s failed:~n~s",
- [ScriptFile, Module:format_error(Error)])
+ terminate("generation of boot script file ~s failed:~n~s",
+ [ScriptFile, Module:format_error(Error)])
end,
case post_process_script(ScriptFile) of
ok -> ok;
{error, Reason} ->
- error("post processing of boot script file ~s failed:~n~w",
- [ScriptFile, Reason])
+ terminate("post processing of boot script file ~s failed:~n~w",
+ [ScriptFile, Reason])
end,
case systools:script2boot(RootName) of
ok -> ok;
- error -> error("failed to compile boot script file ~s", [ScriptFile])
+ error -> terminate("failed to compile boot script file ~s",
+ [ScriptFile])
end,
io:format("~w plugins activated:~n", [length(PluginApps)]),
[io:format("* ~s-~s~n", [App, proplists:get_value(App, AppVersions)])
@@ -190,11 +192,11 @@ unpack_ez_plugins(SrcDir, DestDir) ->
%% Eliminate the contents of the destination directory
case delete_recursively(DestDir) of
ok -> ok;
- {error, E} -> error("Could not delete dir ~s (~p)", [DestDir, E])
+ {error, E} -> terminate("Could not delete dir ~s (~p)", [DestDir, E])
end,
case filelib:ensure_dir(DestDir ++ "/") of
ok -> ok;
- {error, E2} -> error("Could not create dir ~s (~p)", [DestDir, E2])
+ {error, E2} -> terminate("Could not create dir ~s (~p)", [DestDir, E2])
end,
[unpack_ez_plugin(PluginName, DestDir) ||
PluginName <- filelib:wildcard(SrcDir ++ "/*.ez")].
@@ -261,6 +263,6 @@ process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) ->
process_entry(Entry) ->
[Entry].
-error(Fmt, Args) ->
+terminate(Fmt, Args) ->
io:format("ERROR: " ++ Fmt ++ "~n", Args),
halt(1).
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index 0b8efc8f..6ac402c8 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -38,7 +38,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--record(state, {queues}).
+-record(state, {queues, delete_from}).
-include("rabbit.hrl").
@@ -66,32 +66,39 @@ delete_all(CollectorPid) ->
%%----------------------------------------------------------------------------
init([]) ->
- {ok, #state{queues = dict:new()}}.
+ {ok, #state{queues = dict:new(), delete_from = undefined}}.
%%--------------------------------------------------------------------------
handle_call({register, Q}, _From,
- State = #state{queues = Queues}) ->
+ State = #state{queues = Queues, delete_from = Deleting}) ->
MonitorRef = erlang:monitor(process, Q#amqqueue.pid),
- {reply, ok,
- State#state{queues = dict:store(MonitorRef, Q, Queues)}};
-
-handle_call(delete_all, _From, State = #state{queues = Queues}) ->
- [rabbit_misc:with_exit_handler(
- fun () -> ok end,
- fun () ->
- erlang:demonitor(MonitorRef),
- rabbit_amqqueue:delete_exclusive(Q)
- end)
- || {MonitorRef, Q} <- dict:to_list(Queues)],
- {reply, ok, State}.
+ case Deleting of
+ undefined -> ok;
+ _ -> rabbit_amqqueue:delete_immediately(Q)
+ end,
+ {reply, ok, State#state{queues = dict:store(MonitorRef, Q, Queues)}};
+
+handle_call(delete_all, From, State = #state{queues = Queues,
+ delete_from = undefined}) ->
+ case dict:size(Queues) of
+ 0 -> {reply, ok, State#state{delete_from = From}};
+ _ -> [rabbit_amqqueue:delete_immediately(Q)
+ || {_MRef, Q} <- dict:to_list(Queues)],
+ {noreply, State#state{delete_from = From}}
+ end.
handle_cast(Msg, State) ->
{stop, {unhandled_cast, Msg}, State}.
handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason},
- State = #state{queues = Queues}) ->
- {noreply, State#state{queues = dict:erase(MonitorRef, Queues)}}.
+ State = #state{queues = Queues, delete_from = Deleting}) ->
+ Queues1 = dict:erase(MonitorRef, Queues),
+ case Deleting =/= undefined andalso dict:size(Queues1) =:= 0 of
+ true -> gen_server:reply(Deleting, ok);
+ false -> ok
+ end,
+ {noreply, State#state{queues = Queues1}}.
terminate(_Reason, _State) ->
ok.
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 39eac072..00df1ce1 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -39,26 +39,27 @@
-ifdef(use_specs).
--export_type([routing_key/0, routing_result/0]).
+-export_type([routing_key/0, routing_result/0, match_result/0]).
-type(routing_key() :: binary()).
-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
-type(qpids() :: [pid()]).
+-type(match_result() :: [rabbit_types:binding_destination()]).
--spec(deliver/2 ::
- (qpids(), rabbit_types:delivery()) -> {routing_result(), qpids()}).
--spec(match_bindings/2 :: (rabbit_exchange:name(),
+-spec(deliver/2 :: ([rabbit_amqqueue:name()], rabbit_types:delivery()) ->
+ {routing_result(), qpids()}).
+-spec(match_bindings/2 :: (rabbit_types:binding_source(),
fun ((rabbit_types:binding()) -> boolean())) ->
- qpids()).
--spec(match_routing_key/2 :: (rabbit_exchange:name(), routing_key() | '_') ->
- qpids()).
+ match_result()).
+-spec(match_routing_key/2 :: (rabbit_types:binding_source(),
+ routing_key() | '_') -> match_result()).
-endif.
%%----------------------------------------------------------------------------
-deliver(QPids, Delivery = #delivery{mandatory = false,
- immediate = false}) ->
+deliver(QNames, Delivery = #delivery{mandatory = false,
+ immediate = false}) ->
%% optimisation: when Mandatory = false and Immediate = false,
%% rabbit_amqqueue:deliver will deliver the message to the queue
%% process asynchronously, and return true, which means all the
@@ -66,11 +67,13 @@ deliver(QPids, Delivery = #delivery{mandatory = false,
%% fire-and-forget cast here and return the QPids - the semantics
%% is preserved. This scales much better than the non-immediate
%% case below.
+ QPids = lookup_qpids(QNames),
delegate:invoke_no_result(
QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
{routed, QPids};
-deliver(QPids, Delivery) ->
+deliver(QNames, Delivery) ->
+ QPids = lookup_qpids(QNames),
{Success, _} =
delegate:invoke(QPids,
fun (Pid) ->
@@ -82,22 +85,23 @@ deliver(QPids, Delivery) ->
{Routed, Handled}).
%% TODO: Maybe this should be handled by a cursor instead.
-%% TODO: This causes a full scan for each entry with the same exchange
-match_bindings(Name, Match) ->
- Query = qlc:q([QName || #route{binding = Binding = #binding{
- exchange_name = XName,
- queue_name = QName}} <-
- mnesia:table(rabbit_route),
- XName == Name,
- Match(Binding)]),
- lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query])).
-
-match_routing_key(Name, RoutingKey) ->
- MatchHead = #route{binding = #binding{exchange_name = Name,
- queue_name = '$1',
- key = RoutingKey,
- _ = '_'}},
- lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
+%% TODO: This causes a full scan for each entry with the same source
+match_bindings(SrcName, Match) ->
+ Query = qlc:q([DestinationName ||
+ #route{binding = Binding = #binding{
+ source = SrcName1,
+ destination = DestinationName}} <-
+ mnesia:table(rabbit_route),
+ SrcName == SrcName1,
+ Match(Binding)]),
+ mnesia:async_dirty(fun qlc:e/1, [Query]).
+
+match_routing_key(SrcName, RoutingKey) ->
+ MatchHead = #route{binding = #binding{source = SrcName,
+ destination = '$1',
+ key = RoutingKey,
+ _ = '_'}},
+ mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}]).
%%--------------------------------------------------------------------
@@ -115,4 +119,4 @@ lookup_qpids(QNames) ->
[#amqqueue{pid = QPid}] -> [QPid | QPids];
[] -> QPids
end
- end, [], lists:usort(QNames)).
+ end, [], QNames).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 9de17022..ba46d95d 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1035,11 +1035,11 @@ test_server_status() ->
%% list bindings
ok = info_action(list_bindings, rabbit_binding:info_keys(), true),
%% misc binding listing APIs
- [_|_] = rabbit_binding:list_for_exchange(
+ [_|_] = rabbit_binding:list_for_source(
rabbit_misc:r(<<"/">>, exchange, <<"">>)),
- [_] = rabbit_binding:list_for_queue(
+ [_] = rabbit_binding:list_for_destination(
rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
- [_] = rabbit_binding:list_for_exchange_and_queue(
+ [_] = rabbit_binding:list_for_source_and_destination(
rabbit_misc:r(<<"/">>, exchange, <<"">>),
rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 0b6a15ec..b971a63f 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -39,9 +39,11 @@
delivery/0, content/0, decoded_content/0, undecoded_content/0,
unencoded_content/0, encoded_content/0, vhost/0, ctag/0,
amqp_error/0, r/1, r2/2, r3/3, listener/0,
- binding/0, amqqueue/0, exchange/0, connection/0, protocol/0,
- user/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2,
- ok_pid_or_error/0, channel_exit/0, connection_exit/0]).
+ binding/0, binding_source/0, binding_destination/0,
+ amqqueue/0, exchange/0,
+ connection/0, protocol/0, user/0, ok/1, error/1, ok_or_error/1,
+ ok_or_error2/2, ok_pid_or_error/0, channel_exit/0,
+ connection_exit/0]).
-type(channel_exit() :: no_return()).
-type(connection_exit() :: no_return()).
@@ -113,11 +115,14 @@
host :: rabbit_networking:hostname(),
port :: rabbit_networking:ip_port()}).
+-type(binding_source() :: rabbit_exchange:name()).
+-type(binding_destination() :: rabbit_amqqueue:name() | rabbit_exchange:name()).
+
-type(binding() ::
- #binding{exchange_name :: rabbit_exchange:name(),
- queue_name :: rabbit_amqqueue:name(),
- key :: rabbit_binding:key(),
- args :: rabbit_framing:amqp_table()}).
+ #binding{source :: rabbit_exchange:name(),
+ destination :: binding_destination(),
+ key :: rabbit_binding:key(),
+ args :: rabbit_framing:amqp_table()}).
-type(amqqueue() ::
#amqqueue{name :: rabbit_amqqueue:name(),