diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-09-12 16:49:20 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-09-12 16:49:20 +0100 |
commit | 9ffaa55446fbe90d145dd23cf48ae640f360bf84 (patch) | |
tree | df3c384fd8f885a065a20754e64ccad54fc7837e | |
parent | 23a13253ad858dd17804e34766bd17d05a0213fa (diff) | |
download | rabbitmq-server-9ffaa55446fbe90d145dd23cf48ae640f360bf84.tar.gz |
exchange_name => source; consistent naming of Src{Name} and Dst{Name}, except in rabbit_exchange when we clearly are talking only about exchanges; improvements to rabbit_control so that list_bindings returns the type of the endpoints as well as the endpoint names
-rw-r--r-- | docs/rabbitmqctl.1.xml | 14 | ||||
-rw-r--r-- | include/rabbit.hrl | 4 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 8 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 213 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 12 | ||||
-rw-r--r-- | src/rabbit_control.erl | 23 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 36 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 6 | ||||
-rw-r--r-- | src/rabbit_router.erl | 14 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 4 | ||||
-rw-r--r-- | src/rabbit_types.erl | 8 |
11 files changed, 177 insertions, 165 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 607838ff..ab16a532 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -894,16 +894,18 @@ </para> <variablelist> <varlistentry> - <term>exchange_name</term> - <listitem><para>The name of the exchange to which the - binding is attached. with non-ASCII characters - escaped as in C.</para></listitem> + <term>source</term> + <listitem><para>The name and type of the source of + messages to which the binding is attached. With + non-ASCII characters escaped as in + C.</para></listitem> </varlistentry> <varlistentry> <term>destination</term> <listitem><para>The type and name of the destination - to which the binding is attached. with non-ASCII - characters escaped as in C.</para></listitem> + of messages to which the binding is attached. With + non-ASCII characters escaped as in + C.</para></listitem> </varlistentry> <varlistentry> <term>routing_key</term> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 6bf6c87e..bce9dfa3 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -60,8 +60,8 @@ -record(route, {binding, value = const}). -record(reverse_route, {reverse_binding, value = const}). --record(binding, {exchange_name, key, destination, args = []}). --record(reverse_binding, {destination, key, exchange_name, args = []}). +-record(binding, {source, key, destination, args = []}). +-record(reverse_binding, {destination, key, source, args = []}). -record(listener, {node, protocol, host, port}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index f4256c8f..90804e49 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -251,10 +251,10 @@ start_queue_process(Q) -> add_default_binding(#amqqueue{name = QueueName}) -> ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), RoutingKey = QueueName#resource.name, - rabbit_binding:add(#binding{exchange_name = ExchangeName, - destination = QueueName, - key = RoutingKey, - args = []}). + rabbit_binding:add(#binding{source = ExchangeName, + destination = QueueName, + key = RoutingKey, + args = []}). lookup(Name) -> rabbit_misc:dirty_read({rabbit_queue, Name}). diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 0a93d1a4..eff93baf 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -33,8 +33,8 @@ -include("rabbit.hrl"). -export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]). --export([list_for_exchange/1, list_for_destination/1, - list_for_exchange_and_destination/2]). +-export([list_for_source/1, list_for_destination/1, + list_for_source_and_destination/2]). -export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). %% these must all be run inside a mnesia tx -export([has_for_exchange/1, remove_for_exchange/1, @@ -48,9 +48,9 @@ -type(key() :: binary()). --type(bind_errors() :: rabbit_types:error('queue_not_found' | - 'exchange_not_found' | - 'exchange_and_queue_not_found')). +-type(bind_errors() :: rabbit_types:error('source_not_found' | + 'destination_not_found' | + 'source_and_destination_not_found')). -type(bind_res() :: 'ok' | bind_errors()). -type(inner_fun() :: fun((rabbit_types:exchange(), queue()) -> @@ -66,10 +66,10 @@ -spec(remove/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res() | rabbit_types:error('binding_not_found')). -spec(list/1 :: (rabbit_types:vhost()) -> bindings()). --spec(list_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). +-spec(list_for_source/1 :: (rabbit_exchange:name()) -> bindings()). -spec(list_for_destination/1 :: (rabbit_amqqueue:name()|rabbit_exchange:name()) -> bindings()). --spec(list_for_exchange_and_destination/2 :: +-spec(list_for_source_and_destination/2 :: (rabbit_exchange:name(), rabbit_amqqueue:name() | rabbit_exchange:name()) -> bindings()). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). @@ -90,7 +90,7 @@ %%---------------------------------------------------------------------------- --define(INFO_KEYS, [exchange_name, destination, routing_key, arguments]). +-define(INFO_KEYS, [source, destination, routing_key, arguments]). recover() -> rabbit_misc:table_fold( @@ -104,34 +104,34 @@ recover() -> exists(Binding) -> binding_action( Binding, - fun (_X, _D, B) -> mnesia:read({rabbit_route, B}) /= [] end). + fun (_Src, _Dst, B) -> mnesia:read({rabbit_route, B}) /= [] end). -add(Binding) -> add(Binding, fun (_X, _D) -> ok end). +add(Binding) -> add(Binding, fun (_Src, _Dst) -> ok end). -remove(Binding) -> remove(Binding, fun (_X, _D) -> ok end). +remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end). add(Binding, InnerFun) -> case binding_action( Binding, - fun (X, D, B) -> + fun (Src, Dst, B) -> %% this argument is used to check queue exclusivity; %% in general, we want to fail on that in preference to %% anything else - case InnerFun(X, D) of + case InnerFun(Src, Dst) of ok -> case mnesia:read({rabbit_route, B}) of [] -> ok = sync_binding( - B, are_endpoints_durable(X, D), + B, are_endpoints_durable(Src, Dst), fun mnesia:write/3), - {new, X, B}; - [_] -> {existing, X, B} + {new, Src, B}; + [_] -> {existing, Src, B} end; {error, _} = E -> E end end) of - {new, X = #exchange{ type = Type }, B} -> - ok = (type_to_module(Type)):add_binding(X, B), + {new, Src = #exchange{ type = Type }, B} -> + ok = (type_to_module(Type)):add_binding(Src, B), rabbit_event:notify(binding_created, info(B)); {existing, _, _} -> ok; @@ -142,30 +142,32 @@ add(Binding, InnerFun) -> remove(Binding, InnerFun) -> case binding_action( Binding, - fun (X, D, B) -> + fun (Src, Dst, B) -> case mnesia:match_object(rabbit_route, #route{binding = B}, write) of - [] -> {error, binding_not_found}; - [_] -> case InnerFun(X, D) of - ok -> - ok = sync_binding( - B, are_endpoints_durable(X, D), - fun mnesia:delete_object/3), - Deleted = - rabbit_exchange:maybe_auto_delete(X), - {{Deleted, X}, B}; - {error, _} = E -> - E - end + [] -> + {error, binding_not_found}; + [_] -> + case InnerFun(Src, Dst) of + ok -> + ok = sync_binding( + B, are_endpoints_durable(Src, Dst), + fun mnesia:delete_object/3), + Deleted = + rabbit_exchange:maybe_auto_delete(Src), + {{Deleted, Src}, B}; + {error, _} = E -> + E + end end end) of {error, _} = Err -> Err; - {{IsDeleted, X = #exchange{ type = Type }}, B} -> + {{IsDeleted, Src = #exchange{ type = Type }}, B} -> Module = type_to_module(Type), case IsDeleted of - auto_deleted -> ok = Module:delete(X, [B]); - not_deleted -> ok = Module:remove_bindings(X, [B]) + auto_deleted -> ok = Module:delete(Src, [B]); + not_deleted -> ok = Module:remove_bindings(Src, [B]) end, rabbit_event:notify(binding_deleted, info(B)), ok @@ -173,28 +175,28 @@ remove(Binding, InnerFun) -> list(VHostPath) -> Route = #route{binding = #binding{ - exchange_name = rabbit_misc:r(VHostPath, exchange), - destination = rabbit_misc:r(VHostPath, '_'), - _ = '_'}, + source = rabbit_misc:r(VHostPath, exchange), + destination = rabbit_misc:r(VHostPath, '_'), + _ = '_'}, _ = '_'}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. -list_for_exchange(XName) -> - Route = #route{binding = #binding{exchange_name = XName, _ = '_'}}, +list_for_source(SrcName) -> + Route = #route{binding = #binding{source = SrcName, _ = '_'}}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. -list_for_destination(DestinationName) -> - Route = #route{binding = #binding{destination = DestinationName, _ = '_'}}, +list_for_destination(DstName) -> + Route = #route{binding = #binding{destination = DstName, _ = '_'}}, [reverse_binding(B) || #reverse_route{reverse_binding = B} <- mnesia:dirty_match_object(rabbit_reverse_route, reverse_route(Route))]. -list_for_exchange_and_destination(XName, DestinationName) -> - Route = #route{binding = #binding{exchange_name = XName, - destination = DestinationName, - _ = '_'}}, +list_for_source_and_destination(SrcName, DstName) -> + Route = #route{binding = #binding{source = SrcName, + destination = DstName, + _ = '_'}}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. @@ -207,10 +209,10 @@ map(VHostPath, F) -> infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items]. -i(exchange_name, #binding{exchange_name = XName}) -> XName; -i(destination, #binding{destination = DestinationName}) -> DestinationName; -i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; -i(arguments, #binding{args = Arguments}) -> Arguments; +i(source, #binding{source = SrcName}) -> SrcName; +i(destination, #binding{destination = DstName}) -> DstName; +i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; +i(arguments, #binding{args = Arguments}) -> Arguments; i(Item, _) -> throw({bad_argument, Item}). info(B = #binding{}) -> infos(?INFO_KEYS, B). @@ -222,7 +224,7 @@ info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end). info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end). has_for_exchange(XName) -> - Match = #route{binding = #binding{exchange_name = XName, _ = '_'}}, + Match = #route{binding = #binding{source = XName, _ = '_'}}, %% we need to check for durable routes here too in case a bunch of %% routes to durable queues have been removed temporarily as a %% result of a node failure @@ -236,16 +238,15 @@ remove_for_exchange(XName) -> Route#route.binding end || Route <- mnesia:match_object( rabbit_route, - #route{binding = #binding{exchange_name = XName, - _ = '_'}}, + #route{binding = #binding{source = XName, + _ = '_'}}, write)]. -remove_for_destination(DestinationName) -> - remove_for_destination(DestinationName, fun delete_forward_routes/1). +remove_for_destination(DstName) -> + remove_for_destination(DstName, fun delete_forward_routes/1). -remove_transient_for_destination(DestinationName) -> - remove_for_destination(DestinationName, - fun delete_transient_forward_routes/1). +remove_transient_for_destination(DstName) -> + remove_for_destination(DstName, fun delete_transient_forward_routes/1). %%---------------------------------------------------------------------------- @@ -254,14 +255,14 @@ are_endpoints_durable(#exchange{durable = A}, #amqqueue{durable = B}) -> are_endpoints_durable(#exchange{durable = A}, #exchange{durable = B}) -> A andalso B. -binding_action(Binding = #binding{exchange_name = XName, - destination = DestinationName, - args = Arguments}, Fun) -> - call_with_exchange_and_destination( - XName, DestinationName, - fun (X, D) -> +binding_action(Binding = #binding{source = SrcName, + destination = DstName, + args = Arguments}, Fun) -> + call_with_source_and_destination( + SrcName, DstName, + fun (Src, Dst) -> SortedArgs = rabbit_misc:sort_field_table(Arguments), - Fun(X, D, Binding#binding{args = SortedArgs}) + Fun(Src, Dst, Binding#binding{args = SortedArgs}) end). sync_binding(Binding, Durable, Fun) -> @@ -275,18 +276,18 @@ sync_binding(Binding, Durable, Fun) -> ok = Fun(rabbit_reverse_route, ReverseRoute, write), ok. -call_with_exchange_and_destination(XName, DestinationName, Fun) -> - DestTable = case DestinationName#resource.kind of - queue -> rabbit_queue; - exchange -> rabbit_exchange - end, +call_with_source_and_destination(SrcName, DstName, Fun) -> + DstTable = case DstName#resource.kind of + queue -> rabbit_queue; + exchange -> rabbit_exchange + end, rabbit_misc:execute_mnesia_transaction( - fun () -> case {mnesia:read({rabbit_exchange, XName}), - mnesia:read({DestTable, DestinationName})} of - {[X], [D]} -> Fun(X, D); - {[ ], [_]} -> {error, exchange_not_found}; - {[_], [ ]} -> {error, destination_not_found}; - {[ ], [ ]} -> {error, exchange_and_destination_not_found} + fun () -> case {mnesia:read({rabbit_exchange, SrcName}), + mnesia:read({DstTable, DstName})} of + {[Src], [Dst]} -> Fun(Src, Dst); + {[], [_] } -> {error, source_not_found}; + {[_], [] } -> {error, destination_not_found}; + {[], [] } -> {error, source_and_destination_not_found} end end). @@ -302,7 +303,7 @@ continue('$end_of_table') -> false; continue({[_|_], _}) -> true; continue({[], Continuation}) -> continue(mnesia:select(Continuation)). -remove_for_destination(DestinationName, FwdDeleteFun) -> +remove_for_destination(DstName, FwdDeleteFun) -> DeletedBindings = [begin Route = reverse_route(ReverseRoute), @@ -315,18 +316,18 @@ remove_for_destination(DestinationName, FwdDeleteFun) -> rabbit_reverse_route, reverse_route(#route{ binding = #binding{ - destination = DestinationName, + destination = DstName, _ = '_'}}), write)], Grouped = group_bindings_and_auto_delete( - lists:keysort(#binding.exchange_name, DeletedBindings), []), + lists:keysort(#binding.source, DeletedBindings), []), fun () -> lists:foreach( - fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) -> + fun ({{IsDeleted, Src = #exchange{ type = Type }}, Bs}) -> Module = type_to_module(Type), case IsDeleted of - auto_deleted -> Module:delete(X, Bs); - not_deleted -> Module:remove_bindings(X, Bs) + auto_deleted -> Module:delete(Src, Bs); + not_deleted -> Module:remove_bindings(Src, Bs) end end, Grouped) end. @@ -337,16 +338,16 @@ remove_for_destination(DestinationName, FwdDeleteFun) -> group_bindings_and_auto_delete([], Acc) -> Acc; group_bindings_and_auto_delete( - [B = #binding{exchange_name = XName} | Bs], Acc) -> - group_bindings_and_auto_delete(XName, Bs, [B], Acc). + [B = #binding{source = SrcName} | Bs], Acc) -> + group_bindings_and_auto_delete(SrcName, Bs, [B], Acc). group_bindings_and_auto_delete( - XName, [B = #binding{exchange_name = XName} | Bs], Bindings, Acc) -> - group_bindings_and_auto_delete(XName, Bs, [B | Bindings], Acc); -group_bindings_and_auto_delete(XName, Removed, Bindings, Acc) -> - %% either Removed is [], or its head has a non-matching XName - [X] = mnesia:read({rabbit_exchange, XName}), - NewAcc = [{{rabbit_exchange:maybe_auto_delete(X), X}, Bindings} | Acc], + SrcName, [B = #binding{source = SrcName} | Bs], Bindings, Acc) -> + group_bindings_and_auto_delete(SrcName, Bs, [B | Bindings], Acc); +group_bindings_and_auto_delete(SrcName, Removed, Bindings, Acc) -> + %% either Removed is [], or its head has a non-matching SrcName + [Src] = mnesia:read({rabbit_exchange, SrcName}), + NewAcc = [{{rabbit_exchange:maybe_auto_delete(Src), Src}, Bindings} | Acc], group_bindings_and_auto_delete(Removed, NewAcc). delete_forward_routes(Route) -> @@ -368,20 +369,20 @@ reverse_route(#route{binding = Binding}) -> reverse_route(#reverse_route{reverse_binding = Binding}) -> #route{binding = reverse_binding(Binding)}. -reverse_binding(#reverse_binding{exchange_name = XName, - destination = DestinationName, - key = Key, - args = Args}) -> - #binding{exchange_name = XName, - destination = DestinationName, - key = Key, - args = Args}; - -reverse_binding(#binding{exchange_name = XName, - destination = DestinationName, - key = Key, - args = Args}) -> - #reverse_binding{exchange_name = XName, - destination = DestinationName, - key = Key, - args = Args}. +reverse_binding(#reverse_binding{source = SrcName, + destination = DstName, + key = Key, + args = Args}) -> + #binding{source = SrcName, + destination = DstName, + key = Key, + args = Args}; + +reverse_binding(#binding{source = SrcName, + destination = DstName, + key = Key, + args = Args}) -> + #reverse_binding{source = SrcName, + destination = DstName, + key = Key, + args = Args}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 924f1bbe..0613422c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -914,10 +914,10 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, expand_routing_key_shortcut(DestinationNameBin, RoutingKey, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), - case Fun(#binding{exchange_name = ExchangeName, - destination = DestinationName, - key = ActualRoutingKey, - args = Arguments}, + case Fun(#binding{source = ExchangeName, + destination = DestinationName, + key = ActualRoutingKey, + args = Arguments}, fun (_X, Q = #amqqueue{}) -> try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid) catch exit:Reason -> {error, Reason} @@ -925,11 +925,11 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, (_X, #exchange{}) -> ok end) of - {error, exchange_not_found} -> + {error, source_not_found} -> rabbit_misc:not_found(ExchangeName); {error, destination_not_found} -> rabbit_misc:not_found(DestinationName); - {error, exchange_and_destination_not_found} -> + {error, source_and_destination_not_found} -> rabbit_misc:protocol_error( not_found, "no ~s and no ~s", [rabbit_misc:rs(ExchangeName), rabbit_misc:rs(DestinationName)]); diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 59d3c889..e7050ef0 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -257,11 +257,16 @@ action(list_exchanges, Node, Args, Opts, Inform) -> action(list_bindings, Node, Args, Opts, Inform) -> Inform("Listing bindings", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - ArgAtoms = default_if_empty(Args, [exchange_name, destination, + FormatFun = fun (#resource{name = Name, kind = Kind}) -> + "{" ++ format_info_item(Kind) ++ ": " ++ + format_info_item(Name) ++ "}" + end, + ArgAtoms = default_if_empty(Args, [source, destination, routing_key, arguments]), display_info_list(rpc_call(Node, rabbit_binding, info_all, [VHostArg, ArgAtoms]), - ArgAtoms); + ArgAtoms, [{source, FormatFun}, + {destination, FormatFun}]); action(list_connections, Node, Args, _Opts, Inform) -> Inform("Listing connections", []), @@ -311,14 +316,18 @@ default_if_empty(List, Default) when is_list(List) -> [list_to_atom(X) || X <- List] end. -display_info_list(Results, InfoItemKeys) when is_list(Results) -> +display_info_list(Results, InfoItemKeys) -> + display_info_list(Results, InfoItemKeys, []). + +display_info_list(Results, InfoItemKeys, FormatFuns) when is_list(Results) -> lists:foreach( - fun (Result) -> display_row( - [format_info_item(proplists:get_value(X, Result)) || - X <- InfoItemKeys]) + fun (Result) -> + display_row( + [(proplists:get_value(X, FormatFuns, fun format_info_item/1))( + proplists:get_value(X, Result)) || X <- InfoItemKeys]) end, Results), ok; -display_info_list(Other, _) -> +display_info_list(Other, _, _) -> Other. display_row(Row) -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 9481301a..9f85f4cc 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -99,11 +99,11 @@ recover() -> end, [], rabbit_durable_exchange), Bs = rabbit_binding:recover(), recover_with_bindings( - lists:keysort(#binding.exchange_name, Bs), + lists:keysort(#binding.source, Bs), lists:keysort(#exchange.name, Xs), []). -recover_with_bindings([B = #binding{exchange_name = Name} | Rest], - Xs = [#exchange{name = Name} | _], +recover_with_bindings([B = #binding{source = XName} | Rest], + Xs = [#exchange{name = XName} | _], Bindings) -> recover_with_bindings(Rest, Xs, [B | Bindings]); recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) -> @@ -226,33 +226,33 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). publish(X = #exchange{name = XName}, Delivery) -> - QueueNames = find_queues(Delivery, queue:from_list([X]), [XName], []), - QueuePids = lookup_qpids(QueueNames), - rabbit_router:deliver(QueuePids, Delivery). + QNames = find_qnames(Delivery, queue:from_list([X]), [XName], []), + QPids = lookup_qpids(QNames), + rabbit_router:deliver(QPids, Delivery). -find_queues(Delivery, WorkList, SeenExchanges, QueueNames) -> +find_qnames(Delivery, WorkList, SeenXs, QNames) -> case queue:out(WorkList) of {empty, _WorkList} -> - lists:usort(lists:flatten(QueueNames)); + lists:usort(lists:flatten(QNames)); {{value, X = #exchange{type = Type}}, WorkList1} -> - {NewQueueNames, NewExchangeNames} = + {NewQNames, NewXNames} = process_alternate( X, ((type_to_module(Type)):publish(X, Delivery))), - {WorkList2, SeenExchanges1} = + {WorkList2, SeenXs1} = lists:foldl( - fun (XName, {WorkListN, SeenExchangesN} = Acc) -> - case lists:member(XName, SeenExchangesN) of + fun (XName, {WorkListN, SeenXsN} = Acc) -> + case lists:member(XName, SeenXsN) of true -> Acc; false -> {case lookup(XName) of {ok, X1} -> queue:in(X1, WorkListN); {error, not_found} -> WorkListN - end, [XName | SeenExchangesN]} + end, [XName | SeenXsN]} end - end, {WorkList1, SeenExchanges}, NewExchangeNames), - find_queues(Delivery, WorkList2, SeenExchanges1, - [NewQueueNames | QueueNames]) + end, {WorkList1, SeenXs}, NewXNames), + find_qnames(Delivery, WorkList2, SeenXs1, + [NewQNames | QNames]) end. process_alternate(#exchange{name = XName, arguments = Args}, {[], []}) -> @@ -265,14 +265,14 @@ process_alternate(#exchange{name = XName, arguments = Args}, {[], []}) -> process_alternate(_X, Results) -> Results. -lookup_qpids(QueueNames) -> +lookup_qpids(QNames) -> lists:foldl( fun (Key, Acc) -> case mnesia:dirty_read({rabbit_queue, Key}) of [#amqqueue{pid = QPid}] -> [QPid | Acc]; [] -> Acc end - end, [], QueueNames). + end, [], QNames). call_with_exchange(XName, Fun) -> rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index e9ef61a2..33d13978 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -212,12 +212,12 @@ table_definitions() -> {match, #amqqueue{name = queue_name_match(), _='_'}}]}]. binding_match() -> - #binding{destination = binding_destination_match(), - exchange_name = exchange_name_match(), + #binding{source = exchange_name_match(), + destination = binding_destination_match(), _='_'}. reverse_binding_match() -> #reverse_binding{destination = binding_destination_match(), - exchange_name = exchange_name_match(), + source = exchange_name_match(), _='_'}. binding_destination_match() -> resource_match('_'). diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 89bafb8a..c5a1c440 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -83,21 +83,21 @@ deliver(QPids, Delivery) -> %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same exchange -match_bindings(XName, Match) -> +match_bindings(SrcName, Match) -> Query = qlc:q([DestinationName || #route{binding = Binding = #binding{ - exchange_name = XName1, + source = SrcName1, destination = DestinationName}} <- mnesia:table(rabbit_route), - XName == XName1, + SrcName == SrcName1, Match(Binding)]), partition_destinations(mnesia:async_dirty(fun qlc:e/1, [Query])). -match_routing_key(XName, RoutingKey) -> - MatchHead = #route{binding = #binding{exchange_name = XName, +match_routing_key(SrcName, RoutingKey) -> + MatchHead = #route{binding = #binding{source = SrcName, destination = '$1', - key = RoutingKey, - _ = '_'}}, + key = RoutingKey, + _ = '_'}}, partition_destinations( mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index eb08087a..ea46357e 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1041,11 +1041,11 @@ test_server_status() -> %% list bindings ok = info_action(list_bindings, rabbit_binding:info_keys(), true), %% misc binding listing APIs - [_|_] = rabbit_binding:list_for_exchange( + [_|_] = rabbit_binding:list_for_source( rabbit_misc:r(<<"/">>, exchange, <<"">>)), [_] = rabbit_binding:list_for_destination( rabbit_misc:r(<<"/">>, queue, <<"foo">>)), - [_] = rabbit_binding:list_for_exchange_and_destination( + [_] = rabbit_binding:list_for_source_and_destination( rabbit_misc:r(<<"/">>, exchange, <<"">>), rabbit_misc:r(<<"/">>, queue, <<"foo">>)), diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 03fbe55a..603c45bd 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -118,10 +118,10 @@ rabbit_amqqueue:name() | rabbit_exchange:name()). -type(binding() :: - #binding{exchange_name :: rabbit_exchange:name(), - destination :: binding_destination(), - key :: rabbit_binding:key(), - args :: rabbit_framing:amqp_table()}). + #binding{source :: rabbit_exchange:name(), + destination :: binding_destination(), + key :: rabbit_binding:key(), + args :: rabbit_framing:amqp_table()}). -type(amqqueue() :: #amqqueue{name :: rabbit_amqqueue:name(), |