diff options
author | Emile Joubert <emile@rabbitmq.com> | 2011-02-02 13:41:24 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2011-02-02 13:41:24 +0000 |
commit | 8304d8f8a8618b6e3aae73c18b4b2594d62fd67a (patch) | |
tree | 8a01b35b7691bfd83e0174aad7c850c744e722a9 | |
parent | d887a84c64321582266051b9a26ac9a9f1d1f6f7 (diff) | |
download | rabbitmq-server-8304d8f8a8618b6e3aae73c18b4b2594d62fd67a.tar.gz |
Refactored sender-supplied routing keys
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 69 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 18 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 60 | ||||
-rw-r--r-- | src/rabbit_exchange_type_direct.erl | 45 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 8 | ||||
-rw-r--r-- | src/rabbit_router.erl | 59 |
7 files changed, 93 insertions, 168 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 5c5fad76..a8b326be 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -72,7 +72,7 @@ -record(listener, {node, protocol, host, ip_address, port}). -record(basic_message, {exchange_name, routing_key, content, guid, - is_persistent}). + is_persistent, route_list = []}). -record(ssl_socket, {tcp, ssl}). -record(delivery, {mandatory, immediate, txn, sender, message, diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 1ac39b65..c9d4808c 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -33,10 +33,9 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/1, message/4, properties/1, delivery/5]). +-export([publish/1, message/3, message/4, properties/1, delivery/5]). -export([publish/4, publish/7]). -export([build_content/2, from_content/1]). --export([is_message_persistent/1]). %%---------------------------------------------------------------------------- @@ -56,8 +55,10 @@ rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), - properties_input(), binary()) -> - (rabbit_types:message() | rabbit_types:error(any()))). + properties_input(), binary()) -> rabbit_types:message()). +-spec(message/3 :: + (rabbit_exchange:name(), rabbit_router:routing_key(), + rabbit_types:decoded_content()) -> rabbit_types:message()). -spec(properties/1 :: (properties_input()) -> rabbit_framing:amqp_property_record()). -spec(publish/4 :: @@ -71,9 +72,6 @@ rabbit_types:content()). -spec(from_content/1 :: (rabbit_types:content()) -> {rabbit_framing:amqp_property_record(), binary()}). --spec(is_message_persistent/1 :: (rabbit_types:decoded_content()) -> - (boolean() | - {'invalid', non_neg_integer()})). -endif. @@ -113,19 +111,33 @@ from_content(Content) -> rabbit_framing_amqp_0_9_1:method_id('basic.publish'), {Props, list_to_binary(lists:reverse(FragmentsRev))}. +%% This breaks the spec rule forbidding message modification +strip_header(#content{properties = Props = #'P_basic'{headers = Headers}} = DecodedContent, + Key) when Headers =/= undefined -> + case lists:keyfind(Key, 1, Headers) of + false -> DecodedContent; + Tuple -> Headers0 = lists:delete(Tuple, Headers), + DecodedContent#content{ + properties_bin = none, + properties = Props#'P_basic'{headers = Headers0}} + end; +strip_header(DecodedContent, _Key) -> + DecodedContent. + +message(ExchangeName, RoutingKey, + #content{properties = Props} = DecodedContent) -> + #basic_message{ + exchange_name = ExchangeName, + routing_key = RoutingKey, + content = strip_header(DecodedContent, ?DELETED_HEADER), + guid = rabbit_guid:guid(), + is_persistent = is_message_persistent(DecodedContent), + route_list = [RoutingKey | header_routes(Props#'P_basic'.headers)]}. + message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) -> Properties = properties(RawProperties), Content = build_content(Properties, BodyBin), - case is_message_persistent(Content) of - {invalid, Other} -> - {error, {invalid_delivery_mode, Other}}; - IsPersistent when is_boolean(IsPersistent) -> - #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKeyBin, - content = Content, - guid = rabbit_guid:guid(), - is_persistent = IsPersistent} - end. + message(ExchangeName, RoutingKeyBin, Content). properties(P = #'P_basic'{}) -> P; @@ -167,5 +179,26 @@ is_message_persistent(#content{properties = #'P_basic'{ 1 -> false; 2 -> true; undefined -> false; - Other -> {invalid, Other} + Other -> rabbit_log:warning("Unknown delivery mode ~p - " + "treating as 1, non-persistent~n", + [Other]), + false end. + +% Extract CC routes from headers +header_routes(undefined) -> + []; +header_routes(HeadersTable) -> + lists:flatten([case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of + {longstr, Route} -> Route; + {array, Routes} -> rkeys(Routes, []); + _ -> [] + end || HeaderKey <- ?ROUTING_HEADERS]). + +rkeys([{longstr, Route} | Rest], RKeys) -> + rkeys(Rest, [Route | RKeys]); +rkeys([_ | Rest], RKeys) -> + rkeys(Rest, RKeys); +rkeys(_, RKeys) -> + RKeys. + diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 5c900b0b..e818dd54 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -527,18 +527,13 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), check_user_id_header(DecodedContent#content.properties, State), - IsPersistent = is_message_persistent(DecodedContent), {MsgSeqNo, State1} = case ConfirmEnabled of false -> {undefined, State}; true -> SeqNo = State#ch.publish_seqno, {SeqNo, State#ch{publish_seqno = SeqNo + 1}} end, - Message = #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, - content = DecodedContent, - guid = rabbit_guid:guid(), - is_persistent = IsPersistent}, + Message = rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent), {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, @@ -1200,17 +1195,6 @@ notify_limiter(LimiterPid, Acked) -> Count -> rabbit_limiter:ack(LimiterPid, Count) end. -is_message_persistent(Content) -> - case rabbit_basic:is_message_persistent(Content) of - {invalid, Other} -> - rabbit_log:warning("Unknown delivery mode ~p - " - "treating as 1, non-persistent~n", - [Other]), - false; - IsPersistent when is_boolean(IsPersistent) -> - IsPersistent - end. - process_routing_result(unroutable, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_route), send_confirms([MsgSeqNo], State); diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index a94e57f8..92259195 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -1,32 +1,17 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. %% -%% The Original Code is RabbitMQ. +%% The Original Code is RabbitMQ. %% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. %% -module(rabbit_exchange). @@ -36,7 +21,6 @@ -export([recover/0, declare/6, lookup/1, lookup_or_die/1, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]). -export([callback/3]). --export([header_routes/1]). %% this must be run inside a mnesia tx -export([maybe_auto_delete/1]). -export([assert_equivalence/6, assert_args_equivalence/2, check_type/1]). @@ -89,7 +73,7 @@ (rabbit_types:exchange()) -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). -spec(callback/3:: (rabbit_types:exchange(), atom(), [any()]) -> 'ok'). --spec(header_routes/1 :: (rabbit_framing:amqp_table()) -> [binary()]). + -endif. %%---------------------------------------------------------------------------- @@ -324,23 +308,3 @@ unconditional_delete(X = #exchange{name = XName}) -> ok = mnesia:delete({rabbit_exchange, XName}), Bindings = rabbit_binding:remove_for_source(XName), {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}. - -header_routes(undefined) -> - []; -header_routes(Headers) -> - lists:flatten([routing_keys(Headers, Header) || Header <- ?ROUTING_HEADERS]). - -routing_keys(HeadersTable, Key) -> - case rabbit_misc:table_lookup(HeadersTable, Key) of - {longstr, Route} -> [Route]; - {array, Routes} -> rkeys(Routes, []); - _ -> [] - end. - -rkeys([{longstr, BinVal} | Rest], RKeys) -> - rkeys(Rest, [BinVal | RKeys]); -rkeys([{_, _} | Rest], RKeys) -> - rkeys(Rest, RKeys); -rkeys(_, RKeys) -> - RKeys. - diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index 97988381..0baac1f8 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -1,37 +1,21 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. %% -%% The Original Code is RabbitMQ. +%% The Original Code is RabbitMQ. %% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. %% -module(rabbit_exchange_type_direct). -include("rabbit.hrl"). --include("rabbit_framing.hrl"). -behaviour(rabbit_exchange_type). @@ -52,12 +36,9 @@ description() -> {description, <<"AMQP direct exchange, as per the AMQP specification">>}]. route(#exchange{name = Name}, - #delivery{message = #basic_message{routing_key = RoutingKey, - content = Content}}) -> - HeaderKeys = rabbit_exchange:header_routes( - (Content#content.properties)#'P_basic'.headers), + #delivery{message = #basic_message{route_list = Routes}}) -> lists:flatten([rabbit_router:match_routing_key(Name, RKey) || - RKey <- [RoutingKey | HeaderKeys]]). + RKey <- Routes]). validate(_X) -> ok. create(_Tx, _X) -> ok. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 8f3c0550..97cf8ecf 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -30,7 +30,6 @@ %% -module(rabbit_exchange_type_topic). --include("rabbit_framing.hrl"). -include("rabbit.hrl"). -behaviour(rabbit_exchange_type). @@ -60,15 +59,12 @@ description() -> {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. route(#exchange{name = Name}, - #delivery{message = #basic_message{routing_key = RoutingKey, - content = Content}}) -> - HeaderKeys = rabbit_exchange:header_routes( - (Content#content.properties)#'P_basic'.headers), + #delivery{message = #basic_message{route_list = Routes}}) -> lists:flatten([rabbit_router:match_bindings( Name, fun (#binding{key = BindingKey}) -> topic_matches(BindingKey, RKey) - end) || RKey <- [RoutingKey | HeaderKeys]]). + end) || RKey <- Routes]). split_topic_key(Key) -> string:tokens(binary_to_list(Key), "."). diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 7f9b823e..692d2473 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -1,38 +1,22 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. %% -%% The Original Code is RabbitMQ. +%% The Original Code is RabbitMQ. %% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. %% -module(rabbit_router). -include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). --include("rabbit_framing.hrl"). -export([deliver/2, match_bindings/2, match_routing_key/2]). @@ -69,39 +53,22 @@ deliver(QNames, Delivery = #delivery{mandatory = false, %% is preserved. This scales much better than the non-immediate %% case below. QPids = lookup_qpids(QNames), - ModifiedDelivery = strip_header(Delivery, ?DELETED_HEADER), delegate:invoke_no_result( - QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, ModifiedDelivery) end), + QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {routed, QPids}; deliver(QNames, Delivery = #delivery{mandatory = Mandatory, immediate = Immediate}) -> QPids = lookup_qpids(QNames), - ModifiedDelivery = strip_header(Delivery, ?DELETED_HEADER), {Success, _} = delegate:invoke(QPids, fun (Pid) -> - rabbit_amqqueue:deliver(Pid, ModifiedDelivery) + rabbit_amqqueue:deliver(Pid, Delivery) end), {Routed, Handled} = lists:foldl(fun fold_deliveries/2, {false, []}, Success), check_delivery(Mandatory, Immediate, {Routed, Handled}). -%% This breaks the spec rule forbidding message modification -strip_header(Delivery = #delivery{message = Message = #basic_message{ - content = Content = #content{ - properties = Props = #'P_basic'{headers = Headers}}}}, - Key) when Headers =/= undefined -> - case lists:keyfind(Key, 1, Headers) of - false -> Delivery; - Tuple -> Headers0 = lists:delete(Tuple, Headers), - Delivery#delivery{message = Message#basic_message{ - content = Content#content{ - properties_bin = none, - properties = Props#'P_basic'{headers = Headers0}}}} - end; -strip_header(Delivery, _Key) -> - Delivery. %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same source |