diff options
author | Emile Joubert <emile@rabbitmq.com> | 2011-01-06 19:04:33 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2011-01-06 19:04:33 +0000 |
commit | d39b09caeb77f61ead9d1621bf808b6d5272d9bb (patch) | |
tree | 6d0ca0abf304bcdd089f587f6cbf809a7dc9a3b6 | |
parent | 1ab4da3c0a0b3d5515f3fb7fdcea802961eba2f1 (diff) | |
download | rabbitmq-server-d39b09caeb77f61ead9d1621bf808b6d5272d9bb.tar.gz |
Sender-specified distribution
First attempt for direct exchanges only
-rw-r--r-- | src/rabbit_exchange_type_direct.erl | 16 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 11 | ||||
-rw-r--r-- | src/rabbit_router.erl | 21 |
3 files changed, 42 insertions, 6 deletions
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index d49d0199..ab688853 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -31,6 +31,7 @@ -module(rabbit_exchange_type_direct). -include("rabbit.hrl"). +-include("rabbit_framing.hrl"). -behaviour(rabbit_exchange_type). @@ -50,9 +51,18 @@ description() -> [{name, <<"direct">>}, {description, <<"AMQP direct exchange, as per the AMQP specification">>}]. -route(#exchange{name = Name}, - #delivery{message = #basic_message{routing_key = RoutingKey}}) -> - rabbit_router:match_routing_key(Name, RoutingKey). +route(#exchange{name = #resource{virtual_host = VHost} = Name}, + #delivery{message = #basic_message{routing_key = RoutingKey, + content = Content}}) -> + BindingRoutes = rabbit_router:match_routing_key(Name, RoutingKey), + HeaderRKeys = + case (Content#content.properties)#'P_basic'.headers of + undefined -> []; + Headers -> rabbit_misc:table_lookup(Headers, <<"CC">>, <<0>>) ++ + rabbit_misc:table_lookup(Headers, <<"BCC">>, <<0>>) + end, + HeaderRoutes = [rabbit_misc:r(VHost, queue, RKey) || RKey <- HeaderRKeys], + lists:usort(BindingRoutes ++ HeaderRoutes). validate(_X) -> ok. create(_X) -> ok. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 15ba787a..604346ed 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -40,7 +40,7 @@ protocol_error/3, protocol_error/4, protocol_error/1]). -export([not_found/1, assert_args_equivalence/4]). -export([dirty_read/1]). --export([table_lookup/2]). +-export([table_lookup/3, table_lookup/2]). -export([r/3, r/2, r_arg/4, rs/1]). -export([enable_cover/0, report_cover/0]). -export([enable_cover/1, report_cover/1]). @@ -112,6 +112,8 @@ 'ok' | rabbit_types:connection_exit()). -spec(dirty_read/1 :: ({atom(), any()}) -> rabbit_types:ok_or_error2(any(), 'not_found')). +-spec(table_lookup/3 :: + (rabbit_framing:amqp_table(), binary(), binary()) -> [binary()]). -spec(table_lookup/2 :: (rabbit_framing:amqp_table(), binary()) -> 'undefined' | {rabbit_framing:amqp_field_type(), any()}). @@ -253,6 +255,13 @@ dirty_read(ReadSpec) -> [] -> {error, not_found} end. +table_lookup(Table, Key, Separator) -> + case table_lookup(Table, Key) of + undefined -> []; + {longstr, BinVal} -> binary:split(BinVal, Separator, [global]); + _ -> [] + end. + table_lookup(Table, Key) -> case lists:keysearch(Key, 1, Table) of {value, {_, TypeBin, ValueBin}} -> {TypeBin, ValueBin}; diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index d49c072c..2f556df7 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -32,6 +32,7 @@ -module(rabbit_router). -include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). +-include("rabbit_framing.hrl"). -export([deliver/2, match_bindings/2, match_routing_key/2]). @@ -68,22 +69,38 @@ deliver(QNames, Delivery = #delivery{mandatory = false, %% is preserved. This scales much better than the non-immediate %% case below. QPids = lookup_qpids(QNames), + ModifiedDelivery = strip_header(Delivery, <<"BCC">>), delegate:invoke_no_result( - QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), + QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, ModifiedDelivery) end), {routed, QPids}; deliver(QNames, Delivery = #delivery{mandatory = Mandatory, immediate = Immediate}) -> QPids = lookup_qpids(QNames), + ModifiedDelivery = strip_header(Delivery, <<"BCC">>), {Success, _} = delegate:invoke(QPids, fun (Pid) -> - rabbit_amqqueue:deliver(Pid, Delivery) + rabbit_amqqueue:deliver(Pid, ModifiedDelivery) end), {Routed, Handled} = lists:foldl(fun fold_deliveries/2, {false, []}, Success), check_delivery(Mandatory, Immediate, {Routed, Handled}). +strip_header(Delivery = #delivery{message = Message = #basic_message{ + content = Content = #content{ + properties = Props = #'P_basic'{headers = Headers}}}}, + Key) when Headers =/= undefined -> + case lists:keyfind(Key, 1, Headers) of + false -> Delivery; + Tuple -> Headers0 = lists:delete(Tuple, Headers), + Delivery#delivery{message = Message#basic_message{ + content = Content#content{ + properties_bin = none, + properties = Props#'P_basic'{headers = Headers0}}}} + end; +strip_header(Delivery, _Key) -> + Delivery. %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same source |