summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2011-01-24 17:40:26 +0000
committerEmile Joubert <emile@rabbitmq.com>2011-01-24 17:40:26 +0000
commitd887a84c64321582266051b9a26ac9a9f1d1f6f7 (patch)
tree2e7eb985f76143d2c21989609648ddfb2609cf9d
parent7068da5a734cb426d193aaf16a9ca410a4e9d454 (diff)
downloadrabbitmq-server-d887a84c64321582266051b9a26ac9a9f1d1f6f7.tar.gz
Treat sender-specified destinations as routing keys
rather than queue names
-rw-r--r--src/rabbit_exchange.erl13
-rw-r--r--src/rabbit_exchange_type_direct.erl10
-rw-r--r--src/rabbit_exchange_type_fanout.erl10
-rw-r--r--src/rabbit_exchange_type_topic.erl15
4 files changed, 22 insertions, 26 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 24079d22..a94e57f8 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -36,7 +36,7 @@
-export([recover/0, declare/6, lookup/1, lookup_or_die/1, list/1, info_keys/0,
info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]).
-export([callback/3]).
--export([header_routes/2]).
+-export([header_routes/1]).
%% this must be run inside a mnesia tx
-export([maybe_auto_delete/1]).
-export([assert_equivalence/6, assert_args_equivalence/2, check_type/1]).
@@ -89,8 +89,7 @@
(rabbit_types:exchange())
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
-spec(callback/3:: (rabbit_types:exchange(), atom(), [any()]) -> 'ok').
--spec(header_routes/2 :: (rabbit_framing:amqp_table(), rabbit_types:vhost()) ->
- [rabbit_types:r('queue')]).
+-spec(header_routes/1 :: (rabbit_framing:amqp_table()) -> [binary()]).
-endif.
%%----------------------------------------------------------------------------
@@ -326,12 +325,10 @@ unconditional_delete(X = #exchange{name = XName}) ->
Bindings = rabbit_binding:remove_for_source(XName),
{deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}.
-header_routes(undefined, _VHost) ->
+header_routes(undefined) ->
[];
-header_routes(Headers, VHost) ->
- [rabbit_misc:r(VHost, queue, RKey) ||
- RKey <- lists:flatten([routing_keys(Headers, Header) ||
- Header <- ?ROUTING_HEADERS])].
+header_routes(Headers) ->
+ lists:flatten([routing_keys(Headers, Header) || Header <- ?ROUTING_HEADERS]).
routing_keys(HeadersTable, Key) ->
case rabbit_misc:table_lookup(HeadersTable, Key) of
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index ade57451..97988381 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -51,13 +51,13 @@ description() ->
[{name, <<"direct">>},
{description, <<"AMQP direct exchange, as per the AMQP specification">>}].
-route(#exchange{name = #resource{virtual_host = VHost} = Name},
+route(#exchange{name = Name},
#delivery{message = #basic_message{routing_key = RoutingKey,
content = Content}}) ->
- BindingRoutes = rabbit_router:match_routing_key(Name, RoutingKey),
- HeaderRoutes = rabbit_exchange:header_routes(
- (Content#content.properties)#'P_basic'.headers, VHost),
- BindingRoutes ++ HeaderRoutes.
+ HeaderKeys = rabbit_exchange:header_routes(
+ (Content#content.properties)#'P_basic'.headers),
+ lists:flatten([rabbit_router:match_routing_key(Name, RKey) ||
+ RKey <- [RoutingKey | HeaderKeys]]).
validate(_X) -> ok.
create(_Tx, _X) -> ok.
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index f3716141..5266dd87 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -31,7 +31,6 @@
-module(rabbit_exchange_type_fanout).
-include("rabbit.hrl").
--include("rabbit_framing.hrl").
-behaviour(rabbit_exchange_type).
@@ -51,13 +50,8 @@ description() ->
[{name, <<"fanout">>},
{description, <<"AMQP fanout exchange, as per the AMQP specification">>}].
-route(#exchange{name = #resource{virtual_host = VHost} = Name},
- #delivery{message = #basic_message{content = Content}}) ->
- BindingRoutes = rabbit_router:match_routing_key(Name, '_'),
- HeaderRoutes = rabbit_exchange:header_routes(
- (Content#content.properties)#'P_basic'.headers, VHost),
- BindingRoutes ++ HeaderRoutes.
-
+route(#exchange{name = Name}, _Delivery) ->
+ rabbit_router:match_routing_key(Name, '_').
validate(_X) -> ok.
create(_Tx, _X) -> ok.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 2f0d47a7..8f3c0550 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -30,6 +30,7 @@
%%
-module(rabbit_exchange_type_topic).
+-include("rabbit_framing.hrl").
-include("rabbit.hrl").
-behaviour(rabbit_exchange_type).
@@ -59,11 +60,15 @@ description() ->
{description, <<"AMQP topic exchange, as per the AMQP specification">>}].
route(#exchange{name = Name},
- #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
- rabbit_router:match_bindings(Name,
- fun (#binding{key = BindingKey}) ->
- topic_matches(BindingKey, RoutingKey)
- end).
+ #delivery{message = #basic_message{routing_key = RoutingKey,
+ content = Content}}) ->
+ HeaderKeys = rabbit_exchange:header_routes(
+ (Content#content.properties)#'P_basic'.headers),
+ lists:flatten([rabbit_router:match_bindings(
+ Name,
+ fun (#binding{key = BindingKey}) ->
+ topic_matches(BindingKey, RKey)
+ end) || RKey <- [RoutingKey | HeaderKeys]]).
split_topic_key(Key) ->
string:tokens(binary_to_list(Key), ".").