summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2011-02-09 12:01:21 +0000
committerEmile Joubert <emile@rabbitmq.com>2011-02-09 12:01:21 +0000
commitf315a8348de87819dc3b1fbd1987f94c176e8e01 (patch)
treee38dba2cdd461a8d1ea66f58035c14c28cba9571
parent9131553ed2edac17050f1a0bddd8e6b589fa2251 (diff)
downloadrabbitmq-server-f315a8348de87819dc3b1fbd1987f94c176e8e01.tar.gz
Sender-selected destinations - qa feedback
-rw-r--r--include/rabbit.hrl4
-rw-r--r--src/rabbit_basic.erl38
-rw-r--r--src/rabbit_channel.erl6
-rw-r--r--src/rabbit_exchange_type_direct.erl6
-rw-r--r--src/rabbit_exchange_type_topic.erl12
-rw-r--r--src/rabbit_types.erl2
6 files changed, 28 insertions, 40 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 0b6280d1..7bcf021e 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -56,8 +56,8 @@
-record(listener, {node, protocol, host, ip_address, port}).
--record(basic_message, {exchange_name, routing_key, content, guid,
- is_persistent, route_list = []}).
+-record(basic_message, {exchange_name, routing_keys = [], content, guid,
+ is_persistent}).
-record(ssl_socket, {tcp, ssl}).
-record(delivery, {mandatory, immediate, txn, sender, message,
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index a144124f..f1348d33 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -97,15 +97,15 @@ from_content(Content) ->
{Props, list_to_binary(lists:reverse(FragmentsRev))}.
%% This breaks the spec rule forbidding message modification
-strip_header(#content{properties = Props = #'P_basic'{headers = Headers}} = DecodedContent,
- Key) when Headers =/= undefined ->
- case lists:keyfind(Key, 1, Headers) of
- false -> DecodedContent;
- Tuple -> Headers0 = lists:delete(Tuple, Headers),
+strip_header(#content{properties = Props = #'P_basic'{headers = Headers}}
+ = DecodedContent, Key) when Headers =/= undefined ->
+ rabbit_binary_generator:clear_encoded_content(
+ case lists:keyfind(Key, 1, Headers) of
+ false -> DecodedContent;
+ Tuple -> Headers0 = lists:delete(Tuple, Headers),
DecodedContent#content{
- properties_bin = none,
properties = Props#'P_basic'{headers = Headers0}}
- end;
+ end);
strip_header(DecodedContent, _Key) ->
DecodedContent.
@@ -113,11 +113,10 @@ message(ExchangeName, RoutingKey,
#content{properties = Props} = DecodedContent) ->
#basic_message{
exchange_name = ExchangeName,
- routing_key = RoutingKey,
content = strip_header(DecodedContent, ?DELETED_HEADER),
guid = rabbit_guid:guid(),
is_persistent = is_message_persistent(DecodedContent),
- route_list = [RoutingKey | header_routes(Props#'P_basic'.headers)]}.
+ routing_keys = [RoutingKey | header_routes(Props#'P_basic'.headers)]}.
message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) ->
Properties = properties(RawProperties),
@@ -164,26 +163,15 @@ is_message_persistent(#content{properties = #'P_basic'{
1 -> false;
2 -> true;
undefined -> false;
- Other -> rabbit_log:warning("Unknown delivery mode ~p - "
- "treating as 1, non-persistent~n",
- [Other]),
- false
+ _ -> false
end.
% Extract CC routes from headers
header_routes(undefined) ->
[];
header_routes(HeadersTable) ->
- lists:flatten([case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of
- {longstr, Route} -> Route;
- {array, Routes} -> rkeys(Routes, []);
- _ -> []
- end || HeaderKey <- ?ROUTING_HEADERS]).
-
-rkeys([{longstr, Route} | Rest], RKeys) ->
- rkeys(Rest, [Route | RKeys]);
-rkeys([_ | Rest], RKeys) ->
- rkeys(Rest, RKeys);
-rkeys(_, RKeys) ->
- RKeys.
+ lists:append([case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of
+ {array, Routes} -> [Route || {longstr, Route} <- Routes];
+ _ -> []
+ end || HeaderKey <- ?ROUTING_HEADERS]).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index be232bd2..16a3911d 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -243,7 +243,7 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
handle_cast({deliver, ConsumerTag, AckRequired,
Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
+ routing_keys = [RoutingKey | _CcRoutes],
content = Content}}},
State = #ch{writer_pid = WriterPid,
next_tag = DeliveryTag}) ->
@@ -609,7 +609,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
{ok, MessageCount,
Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
+ routing_keys = [RoutingKey | _CcRoutes],
content = Content}}} ->
State1 = lock_message(not(NoAck),
ack_record(DeliveryTag, none, Msg),
@@ -1074,7 +1074,7 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
end.
basic_return(#basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
+ routing_keys = [RoutingKey | _CcRoutes],
content = Content},
WriterPid, Reason) ->
{_Close, ReplyCode, ReplyText} =
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index 0baac1f8..82776c4a 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -36,9 +36,9 @@ description() ->
{description, <<"AMQP direct exchange, as per the AMQP specification">>}].
route(#exchange{name = Name},
- #delivery{message = #basic_message{route_list = Routes}}) ->
- lists:flatten([rabbit_router:match_routing_key(Name, RKey) ||
- RKey <- Routes]).
+ #delivery{message = #basic_message{routing_keys = Routes}}) ->
+ lists:append([rabbit_router:match_routing_key(Name, RKey) ||
+ RKey <- Routes]).
validate(_X) -> ok.
create(_Tx, _X) -> ok.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index beee4974..27251d12 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -44,12 +44,12 @@ description() ->
{description, <<"AMQP topic exchange, as per the AMQP specification">>}].
route(#exchange{name = Name},
- #delivery{message = #basic_message{route_list = Routes}}) ->
- lists:flatten([rabbit_router:match_bindings(
- Name,
- fun (#binding{key = BindingKey}) ->
- topic_matches(BindingKey, RKey)
- end) || RKey <- Routes]).
+ #delivery{message = #basic_message{routing_keys = Routes}}) ->
+ lists:append([rabbit_router:match_bindings(
+ Name,
+ fun (#binding{key = BindingKey}) ->
+ topic_matches(BindingKey, RKey)
+ end) || RKey <- Routes]).
split_topic_key(Key) ->
string:tokens(binary_to_list(Key), ".").
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 3dbe740f..ab2300c0 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -64,7 +64,7 @@
-type(content() :: undecoded_content() | decoded_content()).
-type(basic_message() ::
#basic_message{exchange_name :: rabbit_exchange:name(),
- routing_key :: rabbit_router:routing_key(),
+ routing_keys :: [rabbit_router:routing_key()],
content :: content(),
guid :: rabbit_guid:guid(),
is_persistent :: boolean()}).