summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2011-01-06 19:04:33 +0000
committerEmile Joubert <emile@rabbitmq.com>2011-01-06 19:04:33 +0000
commitd39b09caeb77f61ead9d1621bf808b6d5272d9bb (patch)
tree6d0ca0abf304bcdd089f587f6cbf809a7dc9a3b6
parent1ab4da3c0a0b3d5515f3fb7fdcea802961eba2f1 (diff)
downloadrabbitmq-server-d39b09caeb77f61ead9d1621bf808b6d5272d9bb.tar.gz
Sender-specified distribution
First attempt for direct exchanges only
-rw-r--r--src/rabbit_exchange_type_direct.erl16
-rw-r--r--src/rabbit_misc.erl11
-rw-r--r--src/rabbit_router.erl21
3 files changed, 42 insertions, 6 deletions
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index d49d0199..ab688853 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -31,6 +31,7 @@
-module(rabbit_exchange_type_direct).
-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
-behaviour(rabbit_exchange_type).
@@ -50,9 +51,18 @@ description() ->
[{name, <<"direct">>},
{description, <<"AMQP direct exchange, as per the AMQP specification">>}].
-route(#exchange{name = Name},
- #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
- rabbit_router:match_routing_key(Name, RoutingKey).
+route(#exchange{name = #resource{virtual_host = VHost} = Name},
+ #delivery{message = #basic_message{routing_key = RoutingKey,
+ content = Content}}) ->
+ BindingRoutes = rabbit_router:match_routing_key(Name, RoutingKey),
+ HeaderRKeys =
+ case (Content#content.properties)#'P_basic'.headers of
+ undefined -> [];
+ Headers -> rabbit_misc:table_lookup(Headers, <<"CC">>, <<0>>) ++
+ rabbit_misc:table_lookup(Headers, <<"BCC">>, <<0>>)
+ end,
+ HeaderRoutes = [rabbit_misc:r(VHost, queue, RKey) || RKey <- HeaderRKeys],
+ lists:usort(BindingRoutes ++ HeaderRoutes).
validate(_X) -> ok.
create(_X) -> ok.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 15ba787a..604346ed 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -40,7 +40,7 @@
protocol_error/3, protocol_error/4, protocol_error/1]).
-export([not_found/1, assert_args_equivalence/4]).
-export([dirty_read/1]).
--export([table_lookup/2]).
+-export([table_lookup/3, table_lookup/2]).
-export([r/3, r/2, r_arg/4, rs/1]).
-export([enable_cover/0, report_cover/0]).
-export([enable_cover/1, report_cover/1]).
@@ -112,6 +112,8 @@
'ok' | rabbit_types:connection_exit()).
-spec(dirty_read/1 ::
({atom(), any()}) -> rabbit_types:ok_or_error2(any(), 'not_found')).
+-spec(table_lookup/3 ::
+ (rabbit_framing:amqp_table(), binary(), binary()) -> [binary()]).
-spec(table_lookup/2 ::
(rabbit_framing:amqp_table(), binary())
-> 'undefined' | {rabbit_framing:amqp_field_type(), any()}).
@@ -253,6 +255,13 @@ dirty_read(ReadSpec) ->
[] -> {error, not_found}
end.
+table_lookup(Table, Key, Separator) ->
+ case table_lookup(Table, Key) of
+ undefined -> [];
+ {longstr, BinVal} -> binary:split(BinVal, Separator, [global]);
+ _ -> []
+ end.
+
table_lookup(Table, Key) ->
case lists:keysearch(Key, 1, Table) of
{value, {_, TypeBin, ValueBin}} -> {TypeBin, ValueBin};
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index d49c072c..2f556df7 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -32,6 +32,7 @@
-module(rabbit_router).
-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
-export([deliver/2, match_bindings/2, match_routing_key/2]).
@@ -68,22 +69,38 @@ deliver(QNames, Delivery = #delivery{mandatory = false,
%% is preserved. This scales much better than the non-immediate
%% case below.
QPids = lookup_qpids(QNames),
+ ModifiedDelivery = strip_header(Delivery, <<"BCC">>),
delegate:invoke_no_result(
- QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
+ QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, ModifiedDelivery) end),
{routed, QPids};
deliver(QNames, Delivery = #delivery{mandatory = Mandatory,
immediate = Immediate}) ->
QPids = lookup_qpids(QNames),
+ ModifiedDelivery = strip_header(Delivery, <<"BCC">>),
{Success, _} =
delegate:invoke(QPids,
fun (Pid) ->
- rabbit_amqqueue:deliver(Pid, Delivery)
+ rabbit_amqqueue:deliver(Pid, ModifiedDelivery)
end),
{Routed, Handled} =
lists:foldl(fun fold_deliveries/2, {false, []}, Success),
check_delivery(Mandatory, Immediate, {Routed, Handled}).
+strip_header(Delivery = #delivery{message = Message = #basic_message{
+ content = Content = #content{
+ properties = Props = #'P_basic'{headers = Headers}}}},
+ Key) when Headers =/= undefined ->
+ case lists:keyfind(Key, 1, Headers) of
+ false -> Delivery;
+ Tuple -> Headers0 = lists:delete(Tuple, Headers),
+ Delivery#delivery{message = Message#basic_message{
+ content = Content#content{
+ properties_bin = none,
+ properties = Props#'P_basic'{headers = Headers0}}}}
+ end;
+strip_header(Delivery, _Key) ->
+ Delivery.
%% TODO: Maybe this should be handled by a cursor instead.
%% TODO: This causes a full scan for each entry with the same source