summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2011-02-02 13:41:24 +0000
committerEmile Joubert <emile@rabbitmq.com>2011-02-02 13:41:24 +0000
commit8304d8f8a8618b6e3aae73c18b4b2594d62fd67a (patch)
tree8a01b35b7691bfd83e0174aad7c850c744e722a9
parentd887a84c64321582266051b9a26ac9a9f1d1f6f7 (diff)
downloadrabbitmq-server-8304d8f8a8618b6e3aae73c18b4b2594d62fd67a.tar.gz
Refactored sender-supplied routing keys
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_basic.erl69
-rw-r--r--src/rabbit_channel.erl18
-rw-r--r--src/rabbit_exchange.erl60
-rw-r--r--src/rabbit_exchange_type_direct.erl45
-rw-r--r--src/rabbit_exchange_type_topic.erl8
-rw-r--r--src/rabbit_router.erl59
7 files changed, 93 insertions, 168 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 5c5fad76..a8b326be 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -72,7 +72,7 @@
-record(listener, {node, protocol, host, ip_address, port}).
-record(basic_message, {exchange_name, routing_key, content, guid,
- is_persistent}).
+ is_persistent, route_list = []}).
-record(ssl_socket, {tcp, ssl}).
-record(delivery, {mandatory, immediate, txn, sender, message,
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 1ac39b65..c9d4808c 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -33,10 +33,9 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([publish/1, message/4, properties/1, delivery/5]).
+-export([publish/1, message/3, message/4, properties/1, delivery/5]).
-export([publish/4, publish/7]).
-export([build_content/2, from_content/1]).
--export([is_message_persistent/1]).
%%----------------------------------------------------------------------------
@@ -56,8 +55,10 @@
rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
- properties_input(), binary()) ->
- (rabbit_types:message() | rabbit_types:error(any()))).
+ properties_input(), binary()) -> rabbit_types:message()).
+-spec(message/3 ::
+ (rabbit_exchange:name(), rabbit_router:routing_key(),
+ rabbit_types:decoded_content()) -> rabbit_types:message()).
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
-spec(publish/4 ::
@@ -71,9 +72,6 @@
rabbit_types:content()).
-spec(from_content/1 :: (rabbit_types:content()) ->
{rabbit_framing:amqp_property_record(), binary()}).
--spec(is_message_persistent/1 :: (rabbit_types:decoded_content()) ->
- (boolean() |
- {'invalid', non_neg_integer()})).
-endif.
@@ -113,19 +111,33 @@ from_content(Content) ->
rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
{Props, list_to_binary(lists:reverse(FragmentsRev))}.
+%% This breaks the spec rule forbidding message modification
+strip_header(#content{properties = Props = #'P_basic'{headers = Headers}} = DecodedContent,
+ Key) when Headers =/= undefined ->
+ case lists:keyfind(Key, 1, Headers) of
+ false -> DecodedContent;
+ Tuple -> Headers0 = lists:delete(Tuple, Headers),
+ DecodedContent#content{
+ properties_bin = none,
+ properties = Props#'P_basic'{headers = Headers0}}
+ end;
+strip_header(DecodedContent, _Key) ->
+ DecodedContent.
+
+message(ExchangeName, RoutingKey,
+ #content{properties = Props} = DecodedContent) ->
+ #basic_message{
+ exchange_name = ExchangeName,
+ routing_key = RoutingKey,
+ content = strip_header(DecodedContent, ?DELETED_HEADER),
+ guid = rabbit_guid:guid(),
+ is_persistent = is_message_persistent(DecodedContent),
+ route_list = [RoutingKey | header_routes(Props#'P_basic'.headers)]}.
+
message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) ->
Properties = properties(RawProperties),
Content = build_content(Properties, BodyBin),
- case is_message_persistent(Content) of
- {invalid, Other} ->
- {error, {invalid_delivery_mode, Other}};
- IsPersistent when is_boolean(IsPersistent) ->
- #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKeyBin,
- content = Content,
- guid = rabbit_guid:guid(),
- is_persistent = IsPersistent}
- end.
+ message(ExchangeName, RoutingKeyBin, Content).
properties(P = #'P_basic'{}) ->
P;
@@ -167,5 +179,26 @@ is_message_persistent(#content{properties = #'P_basic'{
1 -> false;
2 -> true;
undefined -> false;
- Other -> {invalid, Other}
+ Other -> rabbit_log:warning("Unknown delivery mode ~p - "
+ "treating as 1, non-persistent~n",
+ [Other]),
+ false
end.
+
+% Extract CC routes from headers
+header_routes(undefined) ->
+ [];
+header_routes(HeadersTable) ->
+ lists:flatten([case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of
+ {longstr, Route} -> Route;
+ {array, Routes} -> rkeys(Routes, []);
+ _ -> []
+ end || HeaderKey <- ?ROUTING_HEADERS]).
+
+rkeys([{longstr, Route} | Rest], RKeys) ->
+ rkeys(Rest, [Route | RKeys]);
+rkeys([_ | Rest], RKeys) ->
+ rkeys(Rest, RKeys);
+rkeys(_, RKeys) ->
+ RKeys.
+
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5c900b0b..e818dd54 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -527,18 +527,13 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
check_user_id_header(DecodedContent#content.properties, State),
- IsPersistent = is_message_persistent(DecodedContent),
{MsgSeqNo, State1} =
case ConfirmEnabled of
false -> {undefined, State};
true -> SeqNo = State#ch.publish_seqno,
{SeqNo, State#ch{publish_seqno = SeqNo + 1}}
end,
- Message = #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- content = DecodedContent,
- guid = rabbit_guid:guid(),
- is_persistent = IsPersistent},
+ Message = rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent),
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
@@ -1200,17 +1195,6 @@ notify_limiter(LimiterPid, Acked) ->
Count -> rabbit_limiter:ack(LimiterPid, Count)
end.
-is_message_persistent(Content) ->
- case rabbit_basic:is_message_persistent(Content) of
- {invalid, Other} ->
- rabbit_log:warning("Unknown delivery mode ~p - "
- "treating as 1, non-persistent~n",
- [Other]),
- false;
- IsPersistent when is_boolean(IsPersistent) ->
- IsPersistent
- end.
-
process_routing_result(unroutable, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_route),
send_confirms([MsgSeqNo], State);
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index a94e57f8..92259195 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -1,32 +1,17 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
%%
-%% The Original Code is RabbitMQ.
+%% The Original Code is RabbitMQ.
%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange).
@@ -36,7 +21,6 @@
-export([recover/0, declare/6, lookup/1, lookup_or_die/1, list/1, info_keys/0,
info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]).
-export([callback/3]).
--export([header_routes/1]).
%% this must be run inside a mnesia tx
-export([maybe_auto_delete/1]).
-export([assert_equivalence/6, assert_args_equivalence/2, check_type/1]).
@@ -89,7 +73,7 @@
(rabbit_types:exchange())
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
-spec(callback/3:: (rabbit_types:exchange(), atom(), [any()]) -> 'ok').
--spec(header_routes/1 :: (rabbit_framing:amqp_table()) -> [binary()]).
+
-endif.
%%----------------------------------------------------------------------------
@@ -324,23 +308,3 @@ unconditional_delete(X = #exchange{name = XName}) ->
ok = mnesia:delete({rabbit_exchange, XName}),
Bindings = rabbit_binding:remove_for_source(XName),
{deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}.
-
-header_routes(undefined) ->
- [];
-header_routes(Headers) ->
- lists:flatten([routing_keys(Headers, Header) || Header <- ?ROUTING_HEADERS]).
-
-routing_keys(HeadersTable, Key) ->
- case rabbit_misc:table_lookup(HeadersTable, Key) of
- {longstr, Route} -> [Route];
- {array, Routes} -> rkeys(Routes, []);
- _ -> []
- end.
-
-rkeys([{longstr, BinVal} | Rest], RKeys) ->
- rkeys(Rest, [BinVal | RKeys]);
-rkeys([{_, _} | Rest], RKeys) ->
- rkeys(Rest, RKeys);
-rkeys(_, RKeys) ->
- RKeys.
-
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index 97988381..0baac1f8 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -1,37 +1,21 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
%%
-%% The Original Code is RabbitMQ.
+%% The Original Code is RabbitMQ.
%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_direct).
-include("rabbit.hrl").
--include("rabbit_framing.hrl").
-behaviour(rabbit_exchange_type).
@@ -52,12 +36,9 @@ description() ->
{description, <<"AMQP direct exchange, as per the AMQP specification">>}].
route(#exchange{name = Name},
- #delivery{message = #basic_message{routing_key = RoutingKey,
- content = Content}}) ->
- HeaderKeys = rabbit_exchange:header_routes(
- (Content#content.properties)#'P_basic'.headers),
+ #delivery{message = #basic_message{route_list = Routes}}) ->
lists:flatten([rabbit_router:match_routing_key(Name, RKey) ||
- RKey <- [RoutingKey | HeaderKeys]]).
+ RKey <- Routes]).
validate(_X) -> ok.
create(_Tx, _X) -> ok.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 8f3c0550..97cf8ecf 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -30,7 +30,6 @@
%%
-module(rabbit_exchange_type_topic).
--include("rabbit_framing.hrl").
-include("rabbit.hrl").
-behaviour(rabbit_exchange_type).
@@ -60,15 +59,12 @@ description() ->
{description, <<"AMQP topic exchange, as per the AMQP specification">>}].
route(#exchange{name = Name},
- #delivery{message = #basic_message{routing_key = RoutingKey,
- content = Content}}) ->
- HeaderKeys = rabbit_exchange:header_routes(
- (Content#content.properties)#'P_basic'.headers),
+ #delivery{message = #basic_message{route_list = Routes}}) ->
lists:flatten([rabbit_router:match_bindings(
Name,
fun (#binding{key = BindingKey}) ->
topic_matches(BindingKey, RKey)
- end) || RKey <- [RoutingKey | HeaderKeys]]).
+ end) || RKey <- Routes]).
split_topic_key(Key) ->
string:tokens(binary_to_list(Key), ".").
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 7f9b823e..692d2473 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -1,38 +1,22 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
%%
-%% The Original Code is RabbitMQ.
+%% The Original Code is RabbitMQ.
%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
-module(rabbit_router).
-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
--include("rabbit_framing.hrl").
-export([deliver/2, match_bindings/2, match_routing_key/2]).
@@ -69,39 +53,22 @@ deliver(QNames, Delivery = #delivery{mandatory = false,
%% is preserved. This scales much better than the non-immediate
%% case below.
QPids = lookup_qpids(QNames),
- ModifiedDelivery = strip_header(Delivery, ?DELETED_HEADER),
delegate:invoke_no_result(
- QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, ModifiedDelivery) end),
+ QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
{routed, QPids};
deliver(QNames, Delivery = #delivery{mandatory = Mandatory,
immediate = Immediate}) ->
QPids = lookup_qpids(QNames),
- ModifiedDelivery = strip_header(Delivery, ?DELETED_HEADER),
{Success, _} =
delegate:invoke(QPids,
fun (Pid) ->
- rabbit_amqqueue:deliver(Pid, ModifiedDelivery)
+ rabbit_amqqueue:deliver(Pid, Delivery)
end),
{Routed, Handled} =
lists:foldl(fun fold_deliveries/2, {false, []}, Success),
check_delivery(Mandatory, Immediate, {Routed, Handled}).
-%% This breaks the spec rule forbidding message modification
-strip_header(Delivery = #delivery{message = Message = #basic_message{
- content = Content = #content{
- properties = Props = #'P_basic'{headers = Headers}}}},
- Key) when Headers =/= undefined ->
- case lists:keyfind(Key, 1, Headers) of
- false -> Delivery;
- Tuple -> Headers0 = lists:delete(Tuple, Headers),
- Delivery#delivery{message = Message#basic_message{
- content = Content#content{
- properties_bin = none,
- properties = Props#'P_basic'{headers = Headers0}}}}
- end;
-strip_header(Delivery, _Key) ->
- Delivery.
%% TODO: Maybe this should be handled by a cursor instead.
%% TODO: This causes a full scan for each entry with the same source