diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2010-10-13 16:36:33 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2010-10-13 16:36:33 +0100 |
commit | 5e3e05135c5640e124ea215c41454d77fb6407c2 (patch) | |
tree | 19978721c55f056ee3598e498ad2a5a23d666b4c | |
parent | 343c47663a44feaae25e3bd63e8d1f7cd3c6dc51 (diff) | |
parent | 088c1f341fd3c5fb4f8bbcff8ecf0fd7bf86b1be (diff) | |
download | rabbitmq-server-5e3e05135c5640e124ea215c41454d77fb6407c2.tar.gz |
Merge default into bug21377
-rw-r--r-- | docs/rabbitmqctl.1.xml | 31 | ||||
-rw-r--r-- | include/rabbit.hrl | 4 | ||||
-rw-r--r-- | include/rabbit_exchange_type_spec.hrl | 4 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 35 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 342 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 64 | ||||
-rw-r--r-- | src/rabbit_control.erl | 3 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 95 | ||||
-rw-r--r-- | src/rabbit_exchange_type.erl | 2 | ||||
-rw-r--r-- | src/rabbit_exchange_type_direct.erl | 9 | ||||
-rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 6 | ||||
-rw-r--r-- | src/rabbit_exchange_type_headers.erl | 15 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 13 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 10 | ||||
-rw-r--r-- | src/rabbit_router.erl | 58 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 6 | ||||
-rw-r--r-- | src/rabbit_types.erl | 19 |
17 files changed, 413 insertions, 303 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 73882861..3b7244c7 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -894,16 +894,31 @@ </para> <variablelist> <varlistentry> - <term>exchange_name</term> - <listitem><para>The name of the exchange to which the - binding is attached. with non-ASCII characters - escaped as in C.</para></listitem> + <term>source_name</term> + <listitem><para>The name of the source of messages to + which the binding is attached. With non-ASCII + characters escaped as in C.</para></listitem> </varlistentry> <varlistentry> - <term>queue_name</term> - <listitem><para>The name of the queue to which the - binding is attached. with non-ASCII characters - escaped as in C.</para></listitem> + <term>source_kind</term> + <listitem><para>The kind of the source of messages to + which the binding is attached. Currently always + queue. With non-ASCII characters escaped as in + C.</para></listitem> + </varlistentry> + <varlistentry> + <term>destination_name</term> + <listitem><para>The name of the destination of + messages to which the binding is attached. With + non-ASCII characters escaped as in + C.</para></listitem> + </varlistentry> + <varlistentry> + <term>destination_kind</term> + <listitem><para>The kind of the destination of + messages to which the binding is attached. With + non-ASCII characters escaped as in + C.</para></listitem> </varlistentry> <varlistentry> <term>routing_key</term> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 73a8ad97..af6e257a 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -60,8 +60,8 @@ -record(route, {binding, value = const}). -record(reverse_route, {reverse_binding, value = const}). --record(binding, {exchange_name, key, queue_name, args = []}). --record(reverse_binding, {queue_name, key, exchange_name, args = []}). +-record(binding, {source, key, destination, args = []}). +-record(reverse_binding, {destination, key, source, args = []}). -record(listener, {node, protocol, host, port}). diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl index cecd666b..ae326a87 100644 --- a/include/rabbit_exchange_type_spec.hrl +++ b/include/rabbit_exchange_type_spec.hrl @@ -31,8 +31,8 @@ -ifdef(use_specs). -spec(description/0 :: () -> [{atom(), any()}]). --spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) - -> {rabbit_router:routing_result(), [pid()]}). +-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) + -> rabbit_router:match_result()). -spec(validate/1 :: (rabbit_types:exchange()) -> 'ok'). -spec(create/1 :: (rabbit_types:exchange()) -> 'ok'). -spec(recover/2 :: (rabbit_types:exchange(), diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 24320f51..6dcd04d5 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -254,10 +254,10 @@ start_queue_process(Q) -> add_default_binding(#amqqueue{name = QueueName}) -> ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), RoutingKey = QueueName#resource.name, - rabbit_binding:add(#binding{exchange_name = ExchangeName, - queue_name = QueueName, - key = RoutingKey, - args = []}). + rabbit_binding:add(#binding{source = ExchangeName, + destination = QueueName, + key = RoutingKey, + args = []}). lookup(Name) -> rabbit_misc:dirty_read({rabbit_queue, Name}). @@ -439,7 +439,7 @@ internal_delete1(QueueName) -> ok = mnesia:delete({rabbit_durable_queue, QueueName}), %% we want to execute some things, as decided by rabbit_exchange, %% after the transaction. - rabbit_binding:remove_for_queue(QueueName). + rabbit_binding:remove_for_destination(QueueName). internal_delete(QueueName) -> case rabbit_misc:execute_mnesia_transaction( @@ -450,8 +450,7 @@ internal_delete(QueueName) -> end end) of {error, _} = Err -> Err; - PostHook -> PostHook(), - ok + Deletions -> ok = rabbit_binding:process_deletions(Deletions) end. maybe_run_queue_via_backing_queue(QPid, Fun) -> @@ -470,19 +469,20 @@ maybe_expire(QPid) -> gen_server2:cast(QPid, maybe_expire). on_node_down(Node) -> - [Hook() || - Hook <- rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:e(qlc:q([delete_queue(QueueName) || - #amqqueue{name = QueueName, pid = Pid} - <- mnesia:table(rabbit_queue), - node(Pid) == Node])) - end)], - ok. + rabbit_binding:process_deletions( + lists:foldl( + fun rabbit_binding:combine_deletions/2, + rabbit_binding:new_deletions(), + rabbit_misc:execute_mnesia_transaction( + fun () -> qlc:e(qlc:q([delete_queue(QueueName) || + #amqqueue{name = QueueName, pid = Pid} + <- mnesia:table(rabbit_queue), + node(Pid) == Node])) + end))). delete_queue(QueueName) -> ok = mnesia:delete({rabbit_queue, QueueName}), - rabbit_binding:remove_transient_for_queue(QueueName). + rabbit_binding:remove_transient_for_destination(QueueName). pseudo_queue(QueueName, Pid) -> #amqqueue{name = QueueName, @@ -508,4 +508,3 @@ delegate_call(Pid, Msg, Timeout) -> delegate_cast(Pid, Msg) -> delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end). - diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 19150fa9..53c9c663 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -33,29 +33,35 @@ -include("rabbit.hrl"). -export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]). --export([list_for_exchange/1, list_for_queue/1, list_for_exchange_and_queue/2]). +-export([list_for_source/1, list_for_destination/1, + list_for_source_and_destination/2]). +-export([new_deletions/0, combine_deletions/2, add_deletion/3, + process_deletions/1]). -export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). %% these must all be run inside a mnesia tx --export([has_for_exchange/1, remove_for_exchange/1, - remove_for_queue/1, remove_transient_for_queue/1]). +-export([has_for_source/1, remove_for_source/1, + remove_for_destination/1, remove_transient_for_destination/1]). %%---------------------------------------------------------------------------- -ifdef(use_specs). --export_type([key/0]). +-export_type([key/0, deletions/0]). -type(key() :: binary()). --type(bind_errors() :: rabbit_types:error('queue_not_found' | - 'exchange_not_found' | - 'exchange_and_queue_not_found')). +-type(bind_errors() :: rabbit_types:error('source_not_found' | + 'destination_not_found' | + 'source_and_destination_not_found')). -type(bind_res() :: 'ok' | bind_errors()). -type(inner_fun() :: - fun((rabbit_types:exchange(), queue()) -> + fun((rabbit_types:exchange(), + rabbit_types:exchange() | rabbit_types:amqqueue()) -> rabbit_types:ok_or_error(rabbit_types:amqp_error()))). -type(bindings() :: [rabbit_types:binding()]). +-opaque(deletions() :: dict:dictionary()). + -spec(recover/0 :: () -> [rabbit_types:binding()]). -spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()). -spec(add/1 :: (rabbit_types:binding()) -> bind_res()). @@ -65,10 +71,13 @@ -spec(remove/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res() | rabbit_types:error('binding_not_found')). -spec(list/1 :: (rabbit_types:vhost()) -> bindings()). --spec(list_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). --spec(list_for_queue/1 :: (rabbit_amqqueue:name()) -> bindings()). --spec(list_for_exchange_and_queue/2 :: - (rabbit_exchange:name(), rabbit_amqqueue:name()) -> bindings()). +-spec(list_for_source/1 :: + (rabbit_types:binding_source()) -> bindings()). +-spec(list_for_destination/1 :: + (rabbit_types:binding_destination()) -> bindings()). +-spec(list_for_source_and_destination/2 :: + (rabbit_types:binding_source(), rabbit_types:binding_destination()) -> + bindings()). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]). -spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) -> @@ -76,18 +85,27 @@ -spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]). -spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()]) -> [[rabbit_types:info()]]). --spec(has_for_exchange/1 :: (rabbit_exchange:name()) -> boolean()). --spec(remove_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). --spec(remove_for_queue/1 :: - (rabbit_amqqueue:name()) -> fun (() -> any())). --spec(remove_transient_for_queue/1 :: - (rabbit_amqqueue:name()) -> fun (() -> any())). +-spec(has_for_source/1 :: (rabbit_types:binding_source()) -> boolean()). +-spec(remove_for_source/1 :: (rabbit_types:binding_source()) -> bindings()). +-spec(remove_for_destination/1 :: + (rabbit_types:binding_destination()) -> deletions()). +-spec(remove_transient_for_destination/1 :: + (rabbit_types:binding_destination()) -> deletions()). +-spec(process_deletions/1 :: (deletions()) -> 'ok'). +-spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()). +-spec(add_deletion/3 :: (rabbit_exchange:name(), + {'undefined' | rabbit_types:binding_source(), + 'deleted' | 'not_deleted', + deletions()}, deletions()) -> deletions()). +-spec(new_deletions/0 :: () -> deletions()). -endif. %%---------------------------------------------------------------------------- --define(INFO_KEYS, [exchange_name, queue_name, routing_key, arguments]). +-define(INFO_KEYS, [source_name, source_kind, + destination_name, destination_kind, + routing_key, arguments]). recover() -> rabbit_misc:table_fold( @@ -101,36 +119,34 @@ recover() -> exists(Binding) -> binding_action( Binding, - fun (_X, _Q, B) -> mnesia:read({rabbit_route, B}) /= [] end). + fun (_Src, _Dst, B) -> mnesia:read({rabbit_route, B}) /= [] end). -add(Binding) -> add(Binding, fun (_X, _Q) -> ok end). +add(Binding) -> add(Binding, fun (_Src, _Dst) -> ok end). -remove(Binding) -> remove(Binding, fun (_X, _Q) -> ok end). +remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end). add(Binding, InnerFun) -> case binding_action( Binding, - fun (X, Q, B) -> + fun (Src, Dst, B) -> %% this argument is used to check queue exclusivity; %% in general, we want to fail on that in preference to %% anything else - case InnerFun(X, Q) of + case InnerFun(Src, Dst) of ok -> case mnesia:read({rabbit_route, B}) of - [] -> Durable = (X#exchange.durable andalso - Q#amqqueue.durable), - ok = sync_binding( - B, Durable, + [] -> ok = sync_binding( + B, all_durable([Src, Dst]), fun mnesia:write/3), - {new, X, B}; - [_] -> {existing, X, B} + {new, Src, B}; + [_] -> {existing, Src, B} end; {error, _} = E -> E end end) of - {new, X = #exchange{ type = Type }, B} -> - ok = (type_to_module(Type)):add_binding(X, B), + {new, Src = #exchange{ type = Type }, B} -> + ok = (type_to_module(Type)):add_binding(Src, B), rabbit_event:notify(binding_created, info(B)); {existing, _, _} -> ok; @@ -141,61 +157,55 @@ add(Binding, InnerFun) -> remove(Binding, InnerFun) -> case binding_action( Binding, - fun (X, Q, B) -> + fun (Src, Dst, B) -> case mnesia:match_object(rabbit_route, #route{binding = B}, write) of - [] -> {error, binding_not_found}; - [_] -> case InnerFun(X, Q) of - ok -> - Durable = (X#exchange.durable andalso - Q#amqqueue.durable), - ok = sync_binding( - B, Durable, - fun mnesia:delete_object/3), - Deleted = - rabbit_exchange:maybe_auto_delete(X), - {{Deleted, X}, B}; - {error, _} = E -> - E - end + [] -> + {error, binding_not_found}; + [_] -> + case InnerFun(Src, Dst) of + ok -> + ok = sync_binding( + B, all_durable([Src, Dst]), + fun mnesia:delete_object/3), + {ok, + maybe_auto_delete(B#binding.source, + [B], new_deletions())}; + {error, _} = E -> + E + end end end) of {error, _} = Err -> Err; - {{IsDeleted, X = #exchange{ type = Type }}, B} -> - Module = type_to_module(Type), - case IsDeleted of - auto_deleted -> ok = Module:delete(X, [B]); - not_deleted -> ok = Module:remove_bindings(X, [B]) - end, - rabbit_event:notify(binding_deleted, info(B)), - ok + {ok, Deletions} -> + ok = process_deletions(Deletions) end. list(VHostPath) -> - Route = #route{binding = #binding{ - exchange_name = rabbit_misc:r(VHostPath, exchange), - queue_name = rabbit_misc:r(VHostPath, queue), - _ = '_'}, + VHostResource = rabbit_misc:r(VHostPath, '_'), + Route = #route{binding = #binding{source = VHostResource, + destination = VHostResource, + _ = '_'}, _ = '_'}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. -list_for_exchange(XName) -> - Route = #route{binding = #binding{exchange_name = XName, _ = '_'}}, +list_for_source(SrcName) -> + Route = #route{binding = #binding{source = SrcName, _ = '_'}}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. -list_for_queue(QueueName) -> - Route = #route{binding = #binding{queue_name = QueueName, _ = '_'}}, +list_for_destination(DstName) -> + Route = #route{binding = #binding{destination = DstName, _ = '_'}}, [reverse_binding(B) || #reverse_route{reverse_binding = B} <- mnesia:dirty_match_object(rabbit_reverse_route, reverse_route(Route))]. -list_for_exchange_and_queue(XName, QueueName) -> - Route = #route{binding = #binding{exchange_name = XName, - queue_name = QueueName, - _ = '_'}}, +list_for_source_and_destination(SrcName, DstName) -> + Route = #route{binding = #binding{source = SrcName, + destination = DstName, + _ = '_'}}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. @@ -208,10 +218,12 @@ map(VHostPath, F) -> infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items]. -i(exchange_name, #binding{exchange_name = XName}) -> XName; -i(queue_name, #binding{queue_name = QName}) -> QName; -i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; -i(arguments, #binding{args = Arguments}) -> Arguments; +i(source_name, #binding{source = SrcName}) -> SrcName#resource.name; +i(source_kind, #binding{source = SrcName}) -> SrcName#resource.kind; +i(destination_name, #binding{destination = DstName}) -> DstName#resource.name; +i(destination_kind, #binding{destination = DstName}) -> DstName#resource.kind; +i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; +i(arguments, #binding{args = Arguments}) -> Arguments; i(Item, _) -> throw({bad_argument, Item}). info(B = #binding{}) -> infos(?INFO_KEYS, B). @@ -222,14 +234,14 @@ info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end). info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end). -has_for_exchange(XName) -> - Match = #route{binding = #binding{exchange_name = XName, _ = '_'}}, +has_for_source(SrcName) -> + Match = #route{binding = #binding{source = SrcName, _ = '_'}}, %% we need to check for durable routes here too in case a bunch of %% routes to durable queues have been removed temporarily as a %% result of a node failure contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match). -remove_for_exchange(XName) -> +remove_for_source(SrcName) -> [begin ok = mnesia:delete_object(rabbit_reverse_route, reverse_route(Route), write), @@ -237,26 +249,31 @@ remove_for_exchange(XName) -> Route#route.binding end || Route <- mnesia:match_object( rabbit_route, - #route{binding = #binding{exchange_name = XName, - _ = '_'}}, + #route{binding = #binding{source = SrcName, + _ = '_'}}, write)]. -remove_for_queue(QueueName) -> - remove_for_queue(QueueName, fun delete_forward_routes/1). +remove_for_destination(DstName) -> + remove_for_destination(DstName, fun delete_forward_routes/1). -remove_transient_for_queue(QueueName) -> - remove_for_queue(QueueName, fun delete_transient_forward_routes/1). +remove_transient_for_destination(DstName) -> + remove_for_destination(DstName, fun delete_transient_forward_routes/1). %%---------------------------------------------------------------------------- -binding_action(Binding = #binding{exchange_name = XName, - queue_name = QueueName, - args = Arguments}, Fun) -> - call_with_exchange_and_queue( - XName, QueueName, - fun (X, Q) -> +all_durable(Resources) -> + lists:all(fun (#exchange{durable = D}) -> D; + (#amqqueue{durable = D}) -> D + end, Resources). + +binding_action(Binding = #binding{source = SrcName, + destination = DstName, + args = Arguments}, Fun) -> + call_with_source_and_destination( + SrcName, DstName, + fun (Src, Dst) -> SortedArgs = rabbit_misc:sort_field_table(Arguments), - Fun(X, Q, Binding#binding{args = SortedArgs}) + Fun(Src, Dst, Binding#binding{args = SortedArgs}) end). sync_binding(Binding, Durable, Fun) -> @@ -270,17 +287,22 @@ sync_binding(Binding, Durable, Fun) -> ok = Fun(rabbit_reverse_route, ReverseRoute, write), ok. -call_with_exchange_and_queue(XName, QueueName, Fun) -> +call_with_source_and_destination(SrcName, DstName, Fun) -> + SrcTable = table_for_resource(SrcName), + DstTable = table_for_resource(DstName), rabbit_misc:execute_mnesia_transaction( - fun () -> case {mnesia:read({rabbit_exchange, XName}), - mnesia:read({rabbit_queue, QueueName})} of - {[X], [Q]} -> Fun(X, Q); - {[ ], [_]} -> {error, exchange_not_found}; - {[_], [ ]} -> {error, queue_not_found}; - {[ ], [ ]} -> {error, exchange_and_queue_not_found} - end + fun () -> case {mnesia:read({SrcTable, SrcName}), + mnesia:read({DstTable, DstName})} of + {[Src], [Dst]} -> Fun(Src, Dst); + {[], [_] } -> {error, source_not_found}; + {[_], [] } -> {error, destination_not_found}; + {[], [] } -> {error, source_and_destination_not_found} + end end). +table_for_resource(#resource{kind = exchange}) -> rabbit_exchange; +table_for_resource(#resource{kind = queue}) -> rabbit_queue. + %% Used with atoms from records; e.g., the type is expected to exist. type_to_module(T) -> {ok, Module} = rabbit_exchange_type_registry:lookup_module(T), @@ -293,8 +315,8 @@ continue('$end_of_table') -> false; continue({[_|_], _}) -> true; continue({[], Continuation}) -> continue(mnesia:select(Continuation)). -remove_for_queue(QueueName, FwdDeleteFun) -> - DeletedBindings = +remove_for_destination(DstName, FwdDeleteFun) -> + Bindings = [begin Route = reverse_route(ReverseRoute), ok = FwdDeleteFun(Route), @@ -304,40 +326,41 @@ remove_for_queue(QueueName, FwdDeleteFun) -> end || ReverseRoute <- mnesia:match_object( rabbit_reverse_route, - reverse_route(#route{binding = #binding{ - queue_name = QueueName, - _ = '_'}}), + reverse_route(#route{ + binding = #binding{ + destination = DstName, + _ = '_'}}), write)], - Grouped = group_bindings_and_auto_delete( - lists:keysort(#binding.exchange_name, DeletedBindings), []), - fun () -> - lists:foreach( - fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) -> - Module = type_to_module(Type), - case IsDeleted of - auto_deleted -> Module:delete(X, Bs); - not_deleted -> Module:remove_bindings(X, Bs) - end - end, Grouped) - end. + group_bindings_fold(fun maybe_auto_delete/3, new_deletions(), + lists:keysort(#binding.source, Bindings)). %% Requires that its input binding list is sorted in exchange-name %% order, so that the grouping of bindings (for passing to %% group_bindings_and_auto_delete1) works properly. -group_bindings_and_auto_delete([], Acc) -> +group_bindings_fold(_Fun, Acc, []) -> Acc; -group_bindings_and_auto_delete( - [B = #binding{exchange_name = XName} | Bs], Acc) -> - group_bindings_and_auto_delete(XName, Bs, [B], Acc). - -group_bindings_and_auto_delete( - XName, [B = #binding{exchange_name = XName} | Bs], Bindings, Acc) -> - group_bindings_and_auto_delete(XName, Bs, [B | Bindings], Acc); -group_bindings_and_auto_delete(XName, Removed, Bindings, Acc) -> - %% either Removed is [], or its head has a non-matching XName - [X] = mnesia:read({rabbit_exchange, XName}), - NewAcc = [{{rabbit_exchange:maybe_auto_delete(X), X}, Bindings} | Acc], - group_bindings_and_auto_delete(Removed, NewAcc). +group_bindings_fold(Fun, Acc, [B = #binding{source = SrcName} | Bs]) -> + group_bindings_fold(Fun, SrcName, Acc, Bs, [B]). + +group_bindings_fold( + Fun, SrcName, Acc, [B = #binding{source = SrcName} | Bs], Bindings) -> + group_bindings_fold(Fun, SrcName, Acc, Bs, [B | Bindings]); +group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) -> + %% Either Removed is [], or its head has a non-matching SrcName. + group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed). + +maybe_auto_delete(XName, Bindings, Deletions) -> + case rabbit_exchange:lookup(XName) of + {error, not_found} -> + add_deletion(XName, {undefined, not_deleted, Bindings}, Deletions); + {ok, X} -> + add_deletion(XName, {X, not_deleted, Bindings}, + case rabbit_exchange:maybe_auto_delete(X) of + not_deleted -> Deletions; + {deleted, Deletions1} -> combine_deletions( + Deletions, Deletions1) + end) + end. delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), @@ -358,20 +381,55 @@ reverse_route(#route{binding = Binding}) -> reverse_route(#reverse_route{reverse_binding = Binding}) -> #route{binding = reverse_binding(Binding)}. -reverse_binding(#reverse_binding{exchange_name = XName, - queue_name = QueueName, - key = Key, - args = Args}) -> - #binding{exchange_name = XName, - queue_name = QueueName, - key = Key, - args = Args}; - -reverse_binding(#binding{exchange_name = XName, - queue_name = QueueName, - key = Key, - args = Args}) -> - #reverse_binding{exchange_name = XName, - queue_name = QueueName, - key = Key, - args = Args}. +reverse_binding(#reverse_binding{source = SrcName, + destination = DstName, + key = Key, + args = Args}) -> + #binding{source = SrcName, + destination = DstName, + key = Key, + args = Args}; + +reverse_binding(#binding{source = SrcName, + destination = DstName, + key = Key, + args = Args}) -> + #reverse_binding{source = SrcName, + destination = DstName, + key = Key, + args = Args}. + +%% ---------------------------------------------------------------------------- +%% Binding / exchange deletion abstraction API +%% ---------------------------------------------------------------------------- + +anything_but( NotThis, NotThis, NotThis) -> NotThis; +anything_but( NotThis, NotThis, This) -> This; +anything_but( NotThis, This, NotThis) -> This; +anything_but(_NotThis, This, This) -> This. + +new_deletions() -> dict:new(). + +add_deletion(XName, Entry, Deletions) -> + dict:update(XName, fun (Entry1) -> merge_entry(Entry1, Entry) end, + Entry, Deletions). + +combine_deletions(Deletions1, Deletions2) -> + dict:merge(fun (_XName, Entry1, Entry2) -> merge_entry(Entry1, Entry2) end, + Deletions1, Deletions2). + +merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> + {anything_but(undefined, X1, X2), + anything_but(not_deleted, Deleted1, Deleted2), + [Bindings1 | Bindings2]}. + +process_deletions(Deletions) -> + dict:fold( + fun (_XName, {X = #exchange{ type = Type }, Deleted, Bindings}, ok) -> + TypeModule = type_to_module(Type), + FlatBindings = lists:flatten(Bindings), + case Deleted of + not_deleted -> TypeModule:remove_bindings(X, FlatBindings); + deleted -> TypeModule:delete(X, FlatBindings) + end + end, ok, Deletions). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index fe36cef9..f75707c3 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -728,6 +728,24 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, return_ok(State, NoWait, #'exchange.delete_ok'{}) end; +handle_method(#'exchange.bind'{destination = DestinationNameBin, + source = SourceNameBin, + routing_key = RoutingKey, + nowait = NoWait, + arguments = Arguments}, _, State) -> + binding_action(fun rabbit_binding:add/2, + SourceNameBin, exchange, DestinationNameBin, RoutingKey, + Arguments, #'exchange.bind_ok'{}, NoWait, State); + +handle_method(#'exchange.unbind'{destination = DestinationNameBin, + source = SourceNameBin, + routing_key = RoutingKey, + nowait = NoWait, + arguments = Arguments}, _, State) -> + binding_action(fun rabbit_binding:remove/2, + SourceNameBin, exchange, DestinationNameBin, RoutingKey, + Arguments, #'exchange.unbind_ok'{}, NoWait, State); + handle_method(#'queue.declare'{queue = QueueNameBin, passive = false, durable = Durable, @@ -819,7 +837,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin, nowait = NoWait, arguments = Arguments}, _, State) -> binding_action(fun rabbit_binding:add/2, - ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, + ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, NoWait, State); handle_method(#'queue.unbind'{queue = QueueNameBin, @@ -827,7 +845,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, routing_key = RoutingKey, arguments = Arguments}, _, State) -> binding_action(fun rabbit_binding:remove/2, - ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, + ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, false, State); handle_method(#'queue.purge'{queue = QueueNameBin, @@ -893,42 +911,48 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- -binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, - ReturnMethod, NoWait, +binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, + RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, reader_pid = ReaderPid}) -> %% FIXME: connection exception (!) on failure?? %% (see rule named "failure" in spec-XML) %% FIXME: don't allow binding to internal exchanges - %% including the one named "" ! - QueueName = expand_queue_name_shortcut(QueueNameBin, State), - check_write_permitted(QueueName, State), - ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, - State), + DestinationName = + case DestinationType of + queue -> expand_queue_name_shortcut(DestinationNameBin, State); + exchange -> rabbit_misc:r(VHostPath, exchange, DestinationNameBin) + end, + check_write_permitted(DestinationName, State), + ActualRoutingKey = + expand_routing_key_shortcut(DestinationNameBin, RoutingKey, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), - case Fun(#binding{exchange_name = ExchangeName, - queue_name = QueueName, - key = ActualRoutingKey, - args = Arguments}, - fun (_X, Q) -> + case Fun(#binding{source = ExchangeName, + destination = DestinationName, + key = ActualRoutingKey, + args = Arguments}, + fun (_X, Q = #amqqueue{}) -> try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid) catch exit:Reason -> {error, Reason} - end + end; + (_X, #exchange{}) -> + ok end) of - {error, exchange_not_found} -> + {error, source_not_found} -> rabbit_misc:not_found(ExchangeName); - {error, queue_not_found} -> - rabbit_misc:not_found(QueueName); - {error, exchange_and_queue_not_found} -> + {error, destination_not_found} -> + rabbit_misc:not_found(DestinationName); + {error, source_and_destination_not_found} -> rabbit_misc:protocol_error( not_found, "no ~s and no ~s", [rabbit_misc:rs(ExchangeName), - rabbit_misc:rs(QueueName)]); + rabbit_misc:rs(DestinationName)]); {error, binding_not_found} -> rabbit_misc:protocol_error( not_found, "no binding ~s between ~s and ~s", [RoutingKey, rabbit_misc:rs(ExchangeName), - rabbit_misc:rs(QueueName)]); + rabbit_misc:rs(DestinationName)]); {error, #amqp_error{} = Error} -> rabbit_misc:protocol_error(Error); ok -> return_ok(State, NoWait, ReturnMethod) diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 57efe7cc..58dedf68 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -257,7 +257,8 @@ action(list_exchanges, Node, Args, Opts, Inform) -> action(list_bindings, Node, Args, Opts, Inform) -> Inform("Listing bindings", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - ArgAtoms = default_if_empty(Args, [exchange_name, queue_name, + ArgAtoms = default_if_empty(Args, [source_name, source_kind, + destination_name, destination_kind, routing_key, arguments]), display_info_list(rpc_call(Node, rabbit_binding, info_all, [VHostArg, ArgAtoms]), diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 2a19d5b1..46564233 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -82,8 +82,9 @@ (name(), boolean())-> 'ok' | rabbit_types:error('not_found') | rabbit_types:error('in_use')). --spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> - 'not_deleted' | 'auto_deleted'). +-spec(maybe_auto_delete/1:: + (rabbit_types:exchange()) + -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). -endif. @@ -99,11 +100,11 @@ recover() -> end, [], rabbit_durable_exchange), Bs = rabbit_binding:recover(), recover_with_bindings( - lists:keysort(#binding.exchange_name, Bs), + lists:keysort(#binding.source, Bs), lists:keysort(#exchange.name, Xs), []). -recover_with_bindings([B = #binding{exchange_name = Name} | Rest], - Xs = [#exchange{name = Name} | _], +recover_with_bindings([B = #binding{source = XName} | Rest], + Xs = [#exchange{name = XName} | _], Bindings) -> recover_with_bindings(Rest, Xs, [B | Bindings]); recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) -> @@ -225,38 +226,44 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). -publish(X, Delivery) -> - publish(X, [], Delivery). - -publish(X = #exchange{type = Type}, Seen, Delivery) -> - case (type_to_module(Type)):publish(X, Delivery) of - {_, []} = R -> - #exchange{name = XName, arguments = Args} = X, - case rabbit_misc:r_arg(XName, exchange, Args, - <<"alternate-exchange">>) of - undefined -> - R; - AName -> - NewSeen = [XName | Seen], - case lists:member(AName, NewSeen) of - true -> R; - false -> case lookup(AName) of - {ok, AX} -> - publish(AX, NewSeen, Delivery); - {error, not_found} -> - rabbit_log:warning( - "alternate exchange for ~s " - "does not exist: ~s", - [rabbit_misc:rs(XName), - rabbit_misc:rs(AName)]), - R - end - end - end; - R -> - R +publish(X = #exchange{name = XName}, Delivery) -> + rabbit_router:deliver( + route(Delivery, {queue:from_list([X]), sets:from_list([XName]), []}), + Delivery). + +route(Delivery, {WorkList, SeenXs, QNames}) -> + case queue:out(WorkList) of + {empty, _WorkList} -> + lists:usort(QNames); + {{value, X = #exchange{type = Type}}, WorkList1} -> + DstNames = process_alternate( + X, ((type_to_module(Type)):route(X, Delivery))), + route(Delivery, + lists:foldl(fun process_route/2, {WorkList1, SeenXs, QNames}, + DstNames)) end. +process_alternate(#exchange{name = XName, arguments = Args}, []) -> + case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of + undefined -> []; + AName -> [AName] + end; +process_alternate(_X, Results) -> + Results. + +process_route(#resource{kind = exchange} = XName, + {WorkList, SeenXs, QNames} = Acc) -> + case sets:is_element(XName, SeenXs) of + true -> Acc; + false -> {case lookup(XName) of + {ok, X} -> queue:in(X, WorkList); + {error, not_found} -> WorkList + end, sets:add_element(XName, SeenXs), QNames} + end; +process_route(#resource{kind = queue} = QName, + {WorkList, SeenXs, QNames}) -> + {WorkList, SeenXs, [QName | QNames]}. + call_with_exchange(XName, Fun) -> rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:read({rabbit_exchange, XName}) of @@ -271,9 +278,10 @@ delete(XName, IfUnused) -> false -> fun unconditional_delete/1 end, case call_with_exchange(XName, Fun) of - {deleted, X = #exchange{type = Type}, Bs} -> - (type_to_module(Type)):delete(X, Bs), - ok; + {deleted, X, Bs, Deletions} -> + ok = rabbit_binding:process_deletions( + rabbit_binding:add_deletion( + XName, {X, deleted, Bs}, Deletions)); Error = {error, _InUseOrNotFound} -> Error end. @@ -282,19 +290,18 @@ maybe_auto_delete(#exchange{auto_delete = false}) -> not_deleted; maybe_auto_delete(#exchange{auto_delete = true} = X) -> case conditional_delete(X) of - {error, in_use} -> not_deleted; - {deleted, X, []} -> auto_deleted + {error, in_use} -> not_deleted; + {deleted, X, [], Deletions} -> {deleted, Deletions} end. conditional_delete(X = #exchange{name = XName}) -> - case rabbit_binding:has_for_exchange(XName) of + case rabbit_binding:has_for_source(XName) of false -> unconditional_delete(X); true -> {error, in_use} end. unconditional_delete(X = #exchange{name = XName}) -> - Bindings = rabbit_binding:remove_for_exchange(XName), ok = mnesia:delete({rabbit_durable_exchange, XName}), ok = mnesia:delete({rabbit_exchange, XName}), - rabbit_event:notify(exchange_deleted, [{name, XName}]), - {deleted, X, Bindings}. + Bindings = rabbit_binding:remove_for_source(XName), + {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}. diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index 85760edc..742944dc 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -36,7 +36,7 @@ behaviour_info(callbacks) -> [ {description, 0}, - {publish, 2}, + {route, 2}, %% called BEFORE declaration, to check args etc; may exit with #amqp_error{} {validate, 1}, diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index 4f6eb851..d934a497 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -34,7 +34,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, publish/2]). +-export([description/0, route/2]). -export([validate/1, create/1, recover/2, delete/2, add_binding/2, remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -50,10 +50,9 @@ description() -> [{name, <<"direct">>}, {description, <<"AMQP direct exchange, as per the AMQP specification">>}]. -publish(#exchange{name = Name}, Delivery = - #delivery{message = #basic_message{routing_key = RoutingKey}}) -> - rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey), - Delivery). +route(#exchange{name = Name}, + #delivery{message = #basic_message{routing_key = RoutingKey}}) -> + rabbit_router:match_routing_key(Name, RoutingKey). validate(_X) -> ok. create(_X) -> ok. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 94798c78..77ca9686 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -34,7 +34,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, publish/2]). +-export([description/0, route/2]). -export([validate/1, create/1, recover/2, delete/2, add_binding/2, remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -50,8 +50,8 @@ description() -> [{name, <<"fanout">>}, {description, <<"AMQP fanout exchange, as per the AMQP specification">>}]. -publish(#exchange{name = Name}, Delivery) -> - rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery). +route(#exchange{name = Name}, _Delivery) -> + rabbit_router:match_routing_key(Name, '_'). validate(_X) -> ok. create(_X) -> ok. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 0a59a175..ec9e7ba4 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -35,7 +35,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, publish/2]). +-export([description/0, route/2]). -export([validate/1, create/1, recover/2, delete/2, add_binding/2, remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -56,17 +56,14 @@ description() -> [{name, <<"headers">>}, {description, <<"AMQP headers exchange, as per the AMQP specification">>}]. -publish(#exchange{name = Name}, - Delivery = #delivery{message = #basic_message{content = Content}}) -> +route(#exchange{name = Name}, + #delivery{message = #basic_message{content = Content}}) -> Headers = case (Content#content.properties)#'P_basic'.headers of undefined -> []; H -> rabbit_misc:sort_field_table(H) end, - rabbit_router:deliver(rabbit_router:match_bindings( - Name, fun (#binding{args = Spec}) -> - headers_match(Spec, Headers) - end), - Delivery). + rabbit_router:match_bindings( + Name, fun (#binding{args = Spec}) -> headers_match(Spec, Headers) end). default_headers_match_kind() -> all. @@ -79,7 +76,7 @@ parse_x_match(Other) -> %% Horrendous matching algorithm. Depends for its merge-like %% (linear-time) behaviour on the lists:keysort -%% (rabbit_misc:sort_field_table) that publish/1 and +%% (rabbit_misc:sort_field_table) that route/1 and %% rabbit_binding:{add,remove}/2 do. %% %% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index e796acf3..d3ecdd4d 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -34,7 +34,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, publish/2]). +-export([description/0, route/2]). -export([validate/1, create/1, recover/2, delete/2, add_binding/2, remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -58,13 +58,12 @@ description() -> [{name, <<"topic">>}, {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. -publish(#exchange{name = Name}, Delivery = +route(#exchange{name = Name}, #delivery{message = #basic_message{routing_key = RoutingKey}}) -> - rabbit_router:deliver(rabbit_router:match_bindings( - Name, fun (#binding{key = BindingKey}) -> - topic_matches(BindingKey, RoutingKey) - end), - Delivery). + rabbit_router:match_bindings(Name, + fun (#binding{key = BindingKey}) -> + topic_matches(BindingKey, RoutingKey) + end). split_topic_key(Key) -> string:tokens(binary_to_list(Key), "."). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index d35adf16..577d206d 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -218,13 +218,15 @@ table_definitions() -> {match, #amqqueue{name = queue_name_match(), _='_'}}]}]. binding_match() -> - #binding{queue_name = queue_name_match(), - exchange_name = exchange_name_match(), + #binding{source = exchange_name_match(), + destination = binding_destination_match(), _='_'}. reverse_binding_match() -> - #reverse_binding{queue_name = queue_name_match(), - exchange_name = exchange_name_match(), + #reverse_binding{destination = binding_destination_match(), + source = exchange_name_match(), _='_'}. +binding_destination_match() -> + resource_match('_'). exchange_name_match() -> resource_match(exchange). queue_name_match() -> diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 39eac072..00df1ce1 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -39,26 +39,27 @@ -ifdef(use_specs). --export_type([routing_key/0, routing_result/0]). +-export_type([routing_key/0, routing_result/0, match_result/0]). -type(routing_key() :: binary()). -type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). -type(qpids() :: [pid()]). +-type(match_result() :: [rabbit_types:binding_destination()]). --spec(deliver/2 :: - (qpids(), rabbit_types:delivery()) -> {routing_result(), qpids()}). --spec(match_bindings/2 :: (rabbit_exchange:name(), +-spec(deliver/2 :: ([rabbit_amqqueue:name()], rabbit_types:delivery()) -> + {routing_result(), qpids()}). +-spec(match_bindings/2 :: (rabbit_types:binding_source(), fun ((rabbit_types:binding()) -> boolean())) -> - qpids()). --spec(match_routing_key/2 :: (rabbit_exchange:name(), routing_key() | '_') -> - qpids()). + match_result()). +-spec(match_routing_key/2 :: (rabbit_types:binding_source(), + routing_key() | '_') -> match_result()). -endif. %%---------------------------------------------------------------------------- -deliver(QPids, Delivery = #delivery{mandatory = false, - immediate = false}) -> +deliver(QNames, Delivery = #delivery{mandatory = false, + immediate = false}) -> %% optimisation: when Mandatory = false and Immediate = false, %% rabbit_amqqueue:deliver will deliver the message to the queue %% process asynchronously, and return true, which means all the @@ -66,11 +67,13 @@ deliver(QPids, Delivery = #delivery{mandatory = false, %% fire-and-forget cast here and return the QPids - the semantics %% is preserved. This scales much better than the non-immediate %% case below. + QPids = lookup_qpids(QNames), delegate:invoke_no_result( QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {routed, QPids}; -deliver(QPids, Delivery) -> +deliver(QNames, Delivery) -> + QPids = lookup_qpids(QNames), {Success, _} = delegate:invoke(QPids, fun (Pid) -> @@ -82,22 +85,23 @@ deliver(QPids, Delivery) -> {Routed, Handled}). %% TODO: Maybe this should be handled by a cursor instead. -%% TODO: This causes a full scan for each entry with the same exchange -match_bindings(Name, Match) -> - Query = qlc:q([QName || #route{binding = Binding = #binding{ - exchange_name = XName, - queue_name = QName}} <- - mnesia:table(rabbit_route), - XName == Name, - Match(Binding)]), - lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query])). - -match_routing_key(Name, RoutingKey) -> - MatchHead = #route{binding = #binding{exchange_name = Name, - queue_name = '$1', - key = RoutingKey, - _ = '_'}}, - lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). +%% TODO: This causes a full scan for each entry with the same source +match_bindings(SrcName, Match) -> + Query = qlc:q([DestinationName || + #route{binding = Binding = #binding{ + source = SrcName1, + destination = DestinationName}} <- + mnesia:table(rabbit_route), + SrcName == SrcName1, + Match(Binding)]), + mnesia:async_dirty(fun qlc:e/1, [Query]). + +match_routing_key(SrcName, RoutingKey) -> + MatchHead = #route{binding = #binding{source = SrcName, + destination = '$1', + key = RoutingKey, + _ = '_'}}, + mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}]). %%-------------------------------------------------------------------- @@ -115,4 +119,4 @@ lookup_qpids(QNames) -> [#amqqueue{pid = QPid}] -> [QPid | QPids]; [] -> QPids end - end, [], lists:usort(QNames)). + end, [], QNames). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index b36ee0be..1b47cdb7 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1040,11 +1040,11 @@ test_server_status() -> %% list bindings ok = info_action(list_bindings, rabbit_binding:info_keys(), true), %% misc binding listing APIs - [_|_] = rabbit_binding:list_for_exchange( + [_|_] = rabbit_binding:list_for_source( rabbit_misc:r(<<"/">>, exchange, <<"">>)), - [_] = rabbit_binding:list_for_queue( + [_] = rabbit_binding:list_for_destination( rabbit_misc:r(<<"/">>, queue, <<"foo">>)), - [_] = rabbit_binding:list_for_exchange_and_queue( + [_] = rabbit_binding:list_for_source_and_destination( rabbit_misc:r(<<"/">>, exchange, <<"">>), rabbit_misc:r(<<"/">>, queue, <<"foo">>)), diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 0b6a15ec..b971a63f 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -39,9 +39,11 @@ delivery/0, content/0, decoded_content/0, undecoded_content/0, unencoded_content/0, encoded_content/0, vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, listener/0, - binding/0, amqqueue/0, exchange/0, connection/0, protocol/0, - user/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2, - ok_pid_or_error/0, channel_exit/0, connection_exit/0]). + binding/0, binding_source/0, binding_destination/0, + amqqueue/0, exchange/0, + connection/0, protocol/0, user/0, ok/1, error/1, ok_or_error/1, + ok_or_error2/2, ok_pid_or_error/0, channel_exit/0, + connection_exit/0]). -type(channel_exit() :: no_return()). -type(connection_exit() :: no_return()). @@ -113,11 +115,14 @@ host :: rabbit_networking:hostname(), port :: rabbit_networking:ip_port()}). +-type(binding_source() :: rabbit_exchange:name()). +-type(binding_destination() :: rabbit_amqqueue:name() | rabbit_exchange:name()). + -type(binding() :: - #binding{exchange_name :: rabbit_exchange:name(), - queue_name :: rabbit_amqqueue:name(), - key :: rabbit_binding:key(), - args :: rabbit_framing:amqp_table()}). + #binding{source :: rabbit_exchange:name(), + destination :: binding_destination(), + key :: rabbit_binding:key(), + args :: rabbit_framing:amqp_table()}). -type(amqqueue() :: #amqqueue{name :: rabbit_amqqueue:name(), |