diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-09-12 15:19:52 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-09-12 15:19:52 +0100 |
commit | 23a13253ad858dd17804e34766bd17d05a0213fa (patch) | |
tree | b999c1b0547eefe85807ef09283dbd43b8356762 | |
parent | 12c8c622775e7d96638296ecabe6082d4e4d8b96 (diff) | |
parent | f182b6135b803e0d77887046ad5fb6bb7adddea5 (diff) | |
download | rabbitmq-server-23a13253ad858dd17804e34766bd17d05a0213fa.tar.gz |
Merging default into bug 21377 and more consistent naming of Destination and DestinationName
-rw-r--r-- | docs/rabbitmqctl.1.xml | 8 | ||||
-rw-r--r-- | include/rabbit.hrl | 4 | ||||
-rw-r--r-- | include/rabbit_exchange_type_spec.hrl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 124 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 55 | ||||
-rw-r--r-- | src/rabbit_control.erl | 2 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 77 | ||||
-rw-r--r-- | src/rabbit_exchange_type_direct.erl | 5 | ||||
-rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 4 | ||||
-rw-r--r-- | src/rabbit_exchange_type_headers.erl | 9 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 11 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 6 | ||||
-rw-r--r-- | src/rabbit_router.erl | 52 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 4 | ||||
-rw-r--r-- | src/rabbit_types.erl | 12 |
16 files changed, 215 insertions, 166 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 5179eb25..607838ff 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -900,10 +900,10 @@ escaped as in C.</para></listitem> </varlistentry> <varlistentry> - <term>queue_name</term> - <listitem><para>The name of the queue to which the - binding is attached. with non-ASCII characters - escaped as in C.</para></listitem> + <term>destination</term> + <listitem><para>The type and name of the destination + to which the binding is attached. with non-ASCII + characters escaped as in C.</para></listitem> </varlistentry> <varlistentry> <term>routing_key</term> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 24aa8d98..6bf6c87e 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -60,8 +60,8 @@ -record(route, {binding, value = const}). -record(reverse_route, {reverse_binding, value = const}). --record(binding, {exchange_name, key, queue_name, args = []}). --record(reverse_binding, {queue_name, key, exchange_name, args = []}). +-record(binding, {exchange_name, key, destination, args = []}). +-record(reverse_binding, {destination, key, exchange_name, args = []}). -record(listener, {node, protocol, host, port}). diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl index cecd666b..014b0736 100644 --- a/include/rabbit_exchange_type_spec.hrl +++ b/include/rabbit_exchange_type_spec.hrl @@ -32,7 +32,7 @@ -spec(description/0 :: () -> [{atom(), any()}]). -spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) - -> {rabbit_router:routing_result(), [pid()]}). + -> rabbit_router:match_result()). -spec(validate/1 :: (rabbit_types:exchange()) -> 'ok'). -spec(create/1 :: (rabbit_types:exchange()) -> 'ok'). -spec(recover/2 :: (rabbit_types:exchange(), diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7116653c..f4256c8f 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -252,7 +252,7 @@ add_default_binding(#amqqueue{name = QueueName}) -> ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), RoutingKey = QueueName#resource.name, rabbit_binding:add(#binding{exchange_name = ExchangeName, - queue_name = QueueName, + destination = QueueName, key = RoutingKey, args = []}). @@ -434,7 +434,7 @@ internal_delete1(QueueName) -> %% we want to execute some things, as %% decided by rabbit_exchange, after the %% transaction. - rabbit_binding:remove_for_queue(QueueName). + rabbit_binding:remove_for_destination(QueueName). internal_delete(QueueName) -> case @@ -479,7 +479,7 @@ on_node_down(Node) -> ok. delete_queue(QueueName) -> - Post = rabbit_binding:remove_transient_for_queue(QueueName), + Post = rabbit_binding:remove_transient_for_destination(QueueName), ok = mnesia:delete({rabbit_queue, QueueName}), Post. diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 19150fa9..0a93d1a4 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -33,11 +33,12 @@ -include("rabbit.hrl"). -export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]). --export([list_for_exchange/1, list_for_queue/1, list_for_exchange_and_queue/2]). +-export([list_for_exchange/1, list_for_destination/1, + list_for_exchange_and_destination/2]). -export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). %% these must all be run inside a mnesia tx -export([has_for_exchange/1, remove_for_exchange/1, - remove_for_queue/1, remove_transient_for_queue/1]). + remove_for_destination/1, remove_transient_for_destination/1]). %%---------------------------------------------------------------------------- @@ -66,9 +67,11 @@ bind_res() | rabbit_types:error('binding_not_found')). -spec(list/1 :: (rabbit_types:vhost()) -> bindings()). -spec(list_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). --spec(list_for_queue/1 :: (rabbit_amqqueue:name()) -> bindings()). --spec(list_for_exchange_and_queue/2 :: - (rabbit_exchange:name(), rabbit_amqqueue:name()) -> bindings()). +-spec(list_for_destination/1 :: + (rabbit_amqqueue:name()|rabbit_exchange:name()) -> bindings()). +-spec(list_for_exchange_and_destination/2 :: + (rabbit_exchange:name(), + rabbit_amqqueue:name() | rabbit_exchange:name()) -> bindings()). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]). -spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) -> @@ -78,16 +81,16 @@ -> [[rabbit_types:info()]]). -spec(has_for_exchange/1 :: (rabbit_exchange:name()) -> boolean()). -spec(remove_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). --spec(remove_for_queue/1 :: - (rabbit_amqqueue:name()) -> fun (() -> any())). --spec(remove_transient_for_queue/1 :: - (rabbit_amqqueue:name()) -> fun (() -> any())). +-spec(remove_for_destination/1 :: + (rabbit_amqqueue:name() | rabbit_exchange:name()) -> fun (() -> any())). +-spec(remove_transient_for_destination/1 :: + (rabbit_amqqueue:name() | rabbit_exchange:name()) -> fun (() -> any())). -endif. %%---------------------------------------------------------------------------- --define(INFO_KEYS, [exchange_name, queue_name, routing_key, arguments]). +-define(INFO_KEYS, [exchange_name, destination, routing_key, arguments]). recover() -> rabbit_misc:table_fold( @@ -101,26 +104,24 @@ recover() -> exists(Binding) -> binding_action( Binding, - fun (_X, _Q, B) -> mnesia:read({rabbit_route, B}) /= [] end). + fun (_X, _D, B) -> mnesia:read({rabbit_route, B}) /= [] end). -add(Binding) -> add(Binding, fun (_X, _Q) -> ok end). +add(Binding) -> add(Binding, fun (_X, _D) -> ok end). -remove(Binding) -> remove(Binding, fun (_X, _Q) -> ok end). +remove(Binding) -> remove(Binding, fun (_X, _D) -> ok end). add(Binding, InnerFun) -> case binding_action( Binding, - fun (X, Q, B) -> + fun (X, D, B) -> %% this argument is used to check queue exclusivity; %% in general, we want to fail on that in preference to %% anything else - case InnerFun(X, Q) of + case InnerFun(X, D) of ok -> case mnesia:read({rabbit_route, B}) of - [] -> Durable = (X#exchange.durable andalso - Q#amqqueue.durable), - ok = sync_binding( - B, Durable, + [] -> ok = sync_binding( + B, are_endpoints_durable(X, D), fun mnesia:write/3), {new, X, B}; [_] -> {existing, X, B} @@ -141,16 +142,14 @@ add(Binding, InnerFun) -> remove(Binding, InnerFun) -> case binding_action( Binding, - fun (X, Q, B) -> + fun (X, D, B) -> case mnesia:match_object(rabbit_route, #route{binding = B}, write) of [] -> {error, binding_not_found}; - [_] -> case InnerFun(X, Q) of + [_] -> case InnerFun(X, D) of ok -> - Durable = (X#exchange.durable andalso - Q#amqqueue.durable), ok = sync_binding( - B, Durable, + B, are_endpoints_durable(X, D), fun mnesia:delete_object/3), Deleted = rabbit_exchange:maybe_auto_delete(X), @@ -175,7 +174,7 @@ remove(Binding, InnerFun) -> list(VHostPath) -> Route = #route{binding = #binding{ exchange_name = rabbit_misc:r(VHostPath, exchange), - queue_name = rabbit_misc:r(VHostPath, queue), + destination = rabbit_misc:r(VHostPath, '_'), _ = '_'}, _ = '_'}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, @@ -186,15 +185,15 @@ list_for_exchange(XName) -> [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. -list_for_queue(QueueName) -> - Route = #route{binding = #binding{queue_name = QueueName, _ = '_'}}, +list_for_destination(DestinationName) -> + Route = #route{binding = #binding{destination = DestinationName, _ = '_'}}, [reverse_binding(B) || #reverse_route{reverse_binding = B} <- mnesia:dirty_match_object(rabbit_reverse_route, reverse_route(Route))]. -list_for_exchange_and_queue(XName, QueueName) -> +list_for_exchange_and_destination(XName, DestinationName) -> Route = #route{binding = #binding{exchange_name = XName, - queue_name = QueueName, + destination = DestinationName, _ = '_'}}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. @@ -208,10 +207,10 @@ map(VHostPath, F) -> infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items]. -i(exchange_name, #binding{exchange_name = XName}) -> XName; -i(queue_name, #binding{queue_name = QName}) -> QName; -i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; -i(arguments, #binding{args = Arguments}) -> Arguments; +i(exchange_name, #binding{exchange_name = XName}) -> XName; +i(destination, #binding{destination = DestinationName}) -> DestinationName; +i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; +i(arguments, #binding{args = Arguments}) -> Arguments; i(Item, _) -> throw({bad_argument, Item}). info(B = #binding{}) -> infos(?INFO_KEYS, B). @@ -241,22 +240,28 @@ remove_for_exchange(XName) -> _ = '_'}}, write)]. -remove_for_queue(QueueName) -> - remove_for_queue(QueueName, fun delete_forward_routes/1). +remove_for_destination(DestinationName) -> + remove_for_destination(DestinationName, fun delete_forward_routes/1). -remove_transient_for_queue(QueueName) -> - remove_for_queue(QueueName, fun delete_transient_forward_routes/1). +remove_transient_for_destination(DestinationName) -> + remove_for_destination(DestinationName, + fun delete_transient_forward_routes/1). %%---------------------------------------------------------------------------- +are_endpoints_durable(#exchange{durable = A}, #amqqueue{durable = B}) -> + A andalso B; +are_endpoints_durable(#exchange{durable = A}, #exchange{durable = B}) -> + A andalso B. + binding_action(Binding = #binding{exchange_name = XName, - queue_name = QueueName, + destination = DestinationName, args = Arguments}, Fun) -> - call_with_exchange_and_queue( - XName, QueueName, - fun (X, Q) -> + call_with_exchange_and_destination( + XName, DestinationName, + fun (X, D) -> SortedArgs = rabbit_misc:sort_field_table(Arguments), - Fun(X, Q, Binding#binding{args = SortedArgs}) + Fun(X, D, Binding#binding{args = SortedArgs}) end). sync_binding(Binding, Durable, Fun) -> @@ -270,15 +275,19 @@ sync_binding(Binding, Durable, Fun) -> ok = Fun(rabbit_reverse_route, ReverseRoute, write), ok. -call_with_exchange_and_queue(XName, QueueName, Fun) -> +call_with_exchange_and_destination(XName, DestinationName, Fun) -> + DestTable = case DestinationName#resource.kind of + queue -> rabbit_queue; + exchange -> rabbit_exchange + end, rabbit_misc:execute_mnesia_transaction( fun () -> case {mnesia:read({rabbit_exchange, XName}), - mnesia:read({rabbit_queue, QueueName})} of - {[X], [Q]} -> Fun(X, Q); - {[ ], [_]} -> {error, exchange_not_found}; - {[_], [ ]} -> {error, queue_not_found}; - {[ ], [ ]} -> {error, exchange_and_queue_not_found} - end + mnesia:read({DestTable, DestinationName})} of + {[X], [D]} -> Fun(X, D); + {[ ], [_]} -> {error, exchange_not_found}; + {[_], [ ]} -> {error, destination_not_found}; + {[ ], [ ]} -> {error, exchange_and_destination_not_found} + end end). %% Used with atoms from records; e.g., the type is expected to exist. @@ -293,7 +302,7 @@ continue('$end_of_table') -> false; continue({[_|_], _}) -> true; continue({[], Continuation}) -> continue(mnesia:select(Continuation)). -remove_for_queue(QueueName, FwdDeleteFun) -> +remove_for_destination(DestinationName, FwdDeleteFun) -> DeletedBindings = [begin Route = reverse_route(ReverseRoute), @@ -304,9 +313,10 @@ remove_for_queue(QueueName, FwdDeleteFun) -> end || ReverseRoute <- mnesia:match_object( rabbit_reverse_route, - reverse_route(#route{binding = #binding{ - queue_name = QueueName, - _ = '_'}}), + reverse_route(#route{ + binding = #binding{ + destination = DestinationName, + _ = '_'}}), write)], Grouped = group_bindings_and_auto_delete( lists:keysort(#binding.exchange_name, DeletedBindings), []), @@ -359,19 +369,19 @@ reverse_route(#reverse_route{reverse_binding = Binding}) -> #route{binding = reverse_binding(Binding)}. reverse_binding(#reverse_binding{exchange_name = XName, - queue_name = QueueName, + destination = DestinationName, key = Key, args = Args}) -> #binding{exchange_name = XName, - queue_name = QueueName, + destination = DestinationName, key = Key, args = Args}; reverse_binding(#binding{exchange_name = XName, - queue_name = QueueName, + destination = DestinationName, key = Key, args = Args}) -> #reverse_binding{exchange_name = XName, - queue_name = QueueName, + destination = DestinationName, key = Key, args = Args}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 174eab40..924f1bbe 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -715,6 +715,23 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, return_ok(State, NoWait, #'exchange.delete_ok'{}) end; +handle_method(#'exchange.bind'{destination = DestinationNameBin, + source = SourceNameBin, + routing_key = RoutingKey, + nowait = NoWait, + arguments = Arguments}, _, State) -> + binding_action(fun rabbit_binding:add/2, + SourceNameBin, exchange, DestinationNameBin, RoutingKey, + Arguments, #'exchange.bind_ok'{}, NoWait, State); + +handle_method(#'exchange.unbind'{destination = DestinationNameBin, + source = SourceNameBin, + routing_key = RoutingKey, + arguments = Arguments}, _, State) -> + binding_action(fun rabbit_binding:remove/2, + SourceNameBin, exchange, DestinationNameBin, RoutingKey, + Arguments, #'exchange.unbind_ok'{}, false, State); + handle_method(#'queue.declare'{queue = QueueNameBin, passive = false, durable = Durable, @@ -806,7 +823,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin, nowait = NoWait, arguments = Arguments}, _, State) -> binding_action(fun rabbit_binding:add/2, - ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, + ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, NoWait, State); handle_method(#'queue.unbind'{queue = QueueNameBin, @@ -814,7 +831,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, routing_key = RoutingKey, arguments = Arguments}, _, State) -> binding_action(fun rabbit_binding:remove/2, - ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, + ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, false, State); handle_method(#'queue.purge'{queue = QueueNameBin, @@ -879,42 +896,48 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- -binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, - ReturnMethod, NoWait, +binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, + RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, reader_pid = ReaderPid}) -> %% FIXME: connection exception (!) on failure?? %% (see rule named "failure" in spec-XML) %% FIXME: don't allow binding to internal exchanges - %% including the one named "" ! - QueueName = expand_queue_name_shortcut(QueueNameBin, State), - check_write_permitted(QueueName, State), - ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, - State), + DestinationName = + case DestinationType of + queue -> expand_queue_name_shortcut(DestinationNameBin, State); + exchange -> rabbit_misc:r(VHostPath, exchange, DestinationNameBin) + end, + check_write_permitted(DestinationName, State), + ActualRoutingKey = + expand_routing_key_shortcut(DestinationNameBin, RoutingKey, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), case Fun(#binding{exchange_name = ExchangeName, - queue_name = QueueName, + destination = DestinationName, key = ActualRoutingKey, args = Arguments}, - fun (_X, Q) -> + fun (_X, Q = #amqqueue{}) -> try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid) catch exit:Reason -> {error, Reason} - end + end; + (_X, #exchange{}) -> + ok end) of {error, exchange_not_found} -> rabbit_misc:not_found(ExchangeName); - {error, queue_not_found} -> - rabbit_misc:not_found(QueueName); - {error, exchange_and_queue_not_found} -> + {error, destination_not_found} -> + rabbit_misc:not_found(DestinationName); + {error, exchange_and_destination_not_found} -> rabbit_misc:protocol_error( not_found, "no ~s and no ~s", [rabbit_misc:rs(ExchangeName), - rabbit_misc:rs(QueueName)]); + rabbit_misc:rs(DestinationName)]); {error, binding_not_found} -> rabbit_misc:protocol_error( not_found, "no binding ~s between ~s and ~s", [RoutingKey, rabbit_misc:rs(ExchangeName), - rabbit_misc:rs(QueueName)]); + rabbit_misc:rs(DestinationName)]); {error, #amqp_error{} = Error} -> rabbit_misc:protocol_error(Error); ok -> return_ok(State, NoWait, ReturnMethod) diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index a3b6f369..59d3c889 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -257,7 +257,7 @@ action(list_exchanges, Node, Args, Opts, Inform) -> action(list_bindings, Node, Args, Opts, Inform) -> Inform("Listing bindings", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - ArgAtoms = default_if_empty(Args, [exchange_name, queue_name, + ArgAtoms = default_if_empty(Args, [exchange_name, destination, routing_key, arguments]), display_info_list(rpc_call(Node, rabbit_binding, info_all, [VHostArg, ArgAtoms]), diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 2a19d5b1..9481301a 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -225,38 +225,55 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). -publish(X, Delivery) -> - publish(X, [], Delivery). - -publish(X = #exchange{type = Type}, Seen, Delivery) -> - case (type_to_module(Type)):publish(X, Delivery) of - {_, []} = R -> - #exchange{name = XName, arguments = Args} = X, - case rabbit_misc:r_arg(XName, exchange, Args, - <<"alternate-exchange">>) of - undefined -> - R; - AName -> - NewSeen = [XName | Seen], - case lists:member(AName, NewSeen) of - true -> R; - false -> case lookup(AName) of - {ok, AX} -> - publish(AX, NewSeen, Delivery); - {error, not_found} -> - rabbit_log:warning( - "alternate exchange for ~s " - "does not exist: ~s", - [rabbit_misc:rs(XName), - rabbit_misc:rs(AName)]), - R - end - end - end; - R -> - R +publish(X = #exchange{name = XName}, Delivery) -> + QueueNames = find_queues(Delivery, queue:from_list([X]), [XName], []), + QueuePids = lookup_qpids(QueueNames), + rabbit_router:deliver(QueuePids, Delivery). + +find_queues(Delivery, WorkList, SeenExchanges, QueueNames) -> + case queue:out(WorkList) of + {empty, _WorkList} -> + lists:usort(lists:flatten(QueueNames)); + {{value, X = #exchange{type = Type}}, WorkList1} -> + {NewQueueNames, NewExchangeNames} = + process_alternate( + X, ((type_to_module(Type)):publish(X, Delivery))), + {WorkList2, SeenExchanges1} = + lists:foldl( + fun (XName, {WorkListN, SeenExchangesN} = Acc) -> + case lists:member(XName, SeenExchangesN) of + true -> Acc; + false -> {case lookup(XName) of + {ok, X1} -> + queue:in(X1, WorkListN); + {error, not_found} -> + WorkListN + end, [XName | SeenExchangesN]} + end + end, {WorkList1, SeenExchanges}, NewExchangeNames), + find_queues(Delivery, WorkList2, SeenExchanges1, + [NewQueueNames | QueueNames]) end. +process_alternate(#exchange{name = XName, arguments = Args}, {[], []}) -> + case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of + undefined -> + {[], []}; + AName -> + {[], [AName]} + end; +process_alternate(_X, Results) -> + Results. + +lookup_qpids(QueueNames) -> + lists:foldl( + fun (Key, Acc) -> + case mnesia:dirty_read({rabbit_queue, Key}) of + [#amqqueue{pid = QPid}] -> [QPid | Acc]; + [] -> Acc + end + end, [], QueueNames). + call_with_exchange(XName, Fun) -> rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:read({rabbit_exchange, XName}) of diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index 4f6eb851..bc740d4b 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -50,10 +50,9 @@ description() -> [{name, <<"direct">>}, {description, <<"AMQP direct exchange, as per the AMQP specification">>}]. -publish(#exchange{name = Name}, Delivery = +publish(#exchange{name = Name}, #delivery{message = #basic_message{routing_key = RoutingKey}}) -> - rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey), - Delivery). + rabbit_router:match_routing_key(Name, RoutingKey). validate(_X) -> ok. create(_X) -> ok. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 94798c78..4dad9cdd 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -50,8 +50,8 @@ description() -> [{name, <<"fanout">>}, {description, <<"AMQP fanout exchange, as per the AMQP specification">>}]. -publish(#exchange{name = Name}, Delivery) -> - rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery). +publish(#exchange{name = Name}, _Delivery) -> + rabbit_router:match_routing_key(Name, '_'). validate(_X) -> ok. create(_X) -> ok. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 0a59a175..7edc6f7b 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -57,16 +57,13 @@ description() -> {description, <<"AMQP headers exchange, as per the AMQP specification">>}]. publish(#exchange{name = Name}, - Delivery = #delivery{message = #basic_message{content = Content}}) -> + #delivery{message = #basic_message{content = Content}}) -> Headers = case (Content#content.properties)#'P_basic'.headers of undefined -> []; H -> rabbit_misc:sort_field_table(H) end, - rabbit_router:deliver(rabbit_router:match_bindings( - Name, fun (#binding{args = Spec}) -> - headers_match(Spec, Headers) - end), - Delivery). + rabbit_router:match_bindings( + Name, fun (#binding{args = Spec}) -> headers_match(Spec, Headers) end). default_headers_match_kind() -> all. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index e796acf3..f4a9c904 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -58,13 +58,12 @@ description() -> [{name, <<"topic">>}, {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. -publish(#exchange{name = Name}, Delivery = +publish(#exchange{name = Name}, #delivery{message = #basic_message{routing_key = RoutingKey}}) -> - rabbit_router:deliver(rabbit_router:match_bindings( - Name, fun (#binding{key = BindingKey}) -> - topic_matches(BindingKey, RoutingKey) - end), - Delivery). + rabbit_router:match_bindings(Name, + fun (#binding{key = BindingKey}) -> + topic_matches(BindingKey, RoutingKey) + end). split_topic_key(Key) -> string:tokens(binary_to_list(Key), "."). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index a3214888..e9ef61a2 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -212,13 +212,15 @@ table_definitions() -> {match, #amqqueue{name = queue_name_match(), _='_'}}]}]. binding_match() -> - #binding{queue_name = queue_name_match(), + #binding{destination = binding_destination_match(), exchange_name = exchange_name_match(), _='_'}. reverse_binding_match() -> - #reverse_binding{queue_name = queue_name_match(), + #reverse_binding{destination = binding_destination_match(), exchange_name = exchange_name_match(), _='_'}. +binding_destination_match() -> + resource_match('_'). exchange_name_match() -> resource_match(exchange). queue_name_match() -> diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index bd57f737..89bafb8a 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -39,19 +39,19 @@ -ifdef(use_specs). --export_type([routing_key/0, routing_result/0]). +-export_type([routing_key/0, routing_result/0, match_result/0]). -type(routing_key() :: binary()). -type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). --type(qpids() :: [pid()]). +-type(match_result() :: {[rabbit_amqqueue:name()], [rabbit_exchange:name()]}). -spec(deliver/2 :: - (qpids(), rabbit_types:delivery()) -> {routing_result(), qpids()}). + ([pid()], rabbit_types:delivery()) -> {routing_result(), [pid()]}). -spec(match_bindings/2 :: (rabbit_exchange:name(), fun ((rabbit_types:binding()) -> boolean())) -> - qpids()). + match_result()). -spec(match_routing_key/2 :: (rabbit_exchange:name(), routing_key() | '_') -> - qpids()). + match_result()). -endif. @@ -83,30 +83,28 @@ deliver(QPids, Delivery) -> %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same exchange -match_bindings(Name, Match) -> - Query = qlc:q([QName || #route{binding = Binding = #binding{ - exchange_name = XName, - queue_name = QName}} <- - mnesia:table(rabbit_route), - XName == Name, - Match(Binding)]), - lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query])). - -match_routing_key(Name, RoutingKey) -> - MatchHead = #route{binding = #binding{exchange_name = Name, - queue_name = '$1', +match_bindings(XName, Match) -> + Query = qlc:q([DestinationName || + #route{binding = Binding = #binding{ + exchange_name = XName1, + destination = DestinationName}} <- + mnesia:table(rabbit_route), + XName == XName1, + Match(Binding)]), + partition_destinations(mnesia:async_dirty(fun qlc:e/1, [Query])). + +match_routing_key(XName, RoutingKey) -> + MatchHead = #route{binding = #binding{exchange_name = XName, + destination = '$1', key = RoutingKey, _ = '_'}}, - lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). - -lookup_qpids(Queues) -> - lists:foldl( - fun (Key, Acc) -> - case mnesia:dirty_read({rabbit_queue, Key}) of - [#amqqueue{pid = QPid}] -> [QPid | Acc]; - [] -> Acc - end - end, [], lists:usort(Queues)). + partition_destinations( + mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). + +partition_destinations(Destinations) -> + lists:partition( + fun (DestinationName) -> DestinationName#resource.kind =:= queue end, + Destinations). %%-------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index a72656b7..eb08087a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1043,9 +1043,9 @@ test_server_status() -> %% misc binding listing APIs [_|_] = rabbit_binding:list_for_exchange( rabbit_misc:r(<<"/">>, exchange, <<"">>)), - [_] = rabbit_binding:list_for_queue( + [_] = rabbit_binding:list_for_destination( rabbit_misc:r(<<"/">>, queue, <<"foo">>)), - [_] = rabbit_binding:list_for_exchange_and_queue( + [_] = rabbit_binding:list_for_exchange_and_destination( rabbit_misc:r(<<"/">>, exchange, <<"">>), rabbit_misc:r(<<"/">>, queue, <<"foo">>)), diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 0b6a15ec..03fbe55a 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -39,9 +39,10 @@ delivery/0, content/0, decoded_content/0, undecoded_content/0, unencoded_content/0, encoded_content/0, vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, listener/0, - binding/0, amqqueue/0, exchange/0, connection/0, protocol/0, - user/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2, - ok_pid_or_error/0, channel_exit/0, connection_exit/0]). + binding/0, binding_destination/0, amqqueue/0, exchange/0, + connection/0, protocol/0, user/0, ok/1, error/1, ok_or_error/1, + ok_or_error2/2, ok_pid_or_error/0, channel_exit/0, + connection_exit/0]). -type(channel_exit() :: no_return()). -type(connection_exit() :: no_return()). @@ -113,9 +114,12 @@ host :: rabbit_networking:hostname(), port :: rabbit_networking:ip_port()}). +-type(binding_destination() :: + rabbit_amqqueue:name() | rabbit_exchange:name()). + -type(binding() :: #binding{exchange_name :: rabbit_exchange:name(), - queue_name :: rabbit_amqqueue:name(), + destination :: binding_destination(), key :: rabbit_binding:key(), args :: rabbit_framing:amqp_table()}). |