diff options
author | Emile Joubert <emile@rabbitmq.com> | 2011-02-09 12:01:21 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2011-02-09 12:01:21 +0000 |
commit | f315a8348de87819dc3b1fbd1987f94c176e8e01 (patch) | |
tree | e38dba2cdd461a8d1ea66f58035c14c28cba9571 | |
parent | 9131553ed2edac17050f1a0bddd8e6b589fa2251 (diff) | |
download | rabbitmq-server-f315a8348de87819dc3b1fbd1987f94c176e8e01.tar.gz |
Sender-selected destinations - qa feedback
-rw-r--r-- | include/rabbit.hrl | 4 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 38 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 6 | ||||
-rw-r--r-- | src/rabbit_exchange_type_direct.erl | 6 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 12 | ||||
-rw-r--r-- | src/rabbit_types.erl | 2 |
6 files changed, 28 insertions, 40 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 0b6280d1..7bcf021e 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -56,8 +56,8 @@ -record(listener, {node, protocol, host, ip_address, port}). --record(basic_message, {exchange_name, routing_key, content, guid, - is_persistent, route_list = []}). +-record(basic_message, {exchange_name, routing_keys = [], content, guid, + is_persistent}). -record(ssl_socket, {tcp, ssl}). -record(delivery, {mandatory, immediate, txn, sender, message, diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index a144124f..f1348d33 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -97,15 +97,15 @@ from_content(Content) -> {Props, list_to_binary(lists:reverse(FragmentsRev))}. %% This breaks the spec rule forbidding message modification -strip_header(#content{properties = Props = #'P_basic'{headers = Headers}} = DecodedContent, - Key) when Headers =/= undefined -> - case lists:keyfind(Key, 1, Headers) of - false -> DecodedContent; - Tuple -> Headers0 = lists:delete(Tuple, Headers), +strip_header(#content{properties = Props = #'P_basic'{headers = Headers}} + = DecodedContent, Key) when Headers =/= undefined -> + rabbit_binary_generator:clear_encoded_content( + case lists:keyfind(Key, 1, Headers) of + false -> DecodedContent; + Tuple -> Headers0 = lists:delete(Tuple, Headers), DecodedContent#content{ - properties_bin = none, properties = Props#'P_basic'{headers = Headers0}} - end; + end); strip_header(DecodedContent, _Key) -> DecodedContent. @@ -113,11 +113,10 @@ message(ExchangeName, RoutingKey, #content{properties = Props} = DecodedContent) -> #basic_message{ exchange_name = ExchangeName, - routing_key = RoutingKey, content = strip_header(DecodedContent, ?DELETED_HEADER), guid = rabbit_guid:guid(), is_persistent = is_message_persistent(DecodedContent), - route_list = [RoutingKey | header_routes(Props#'P_basic'.headers)]}. + routing_keys = [RoutingKey | header_routes(Props#'P_basic'.headers)]}. message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) -> Properties = properties(RawProperties), @@ -164,26 +163,15 @@ is_message_persistent(#content{properties = #'P_basic'{ 1 -> false; 2 -> true; undefined -> false; - Other -> rabbit_log:warning("Unknown delivery mode ~p - " - "treating as 1, non-persistent~n", - [Other]), - false + _ -> false end. % Extract CC routes from headers header_routes(undefined) -> []; header_routes(HeadersTable) -> - lists:flatten([case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of - {longstr, Route} -> Route; - {array, Routes} -> rkeys(Routes, []); - _ -> [] - end || HeaderKey <- ?ROUTING_HEADERS]). - -rkeys([{longstr, Route} | Rest], RKeys) -> - rkeys(Rest, [Route | RKeys]); -rkeys([_ | Rest], RKeys) -> - rkeys(Rest, RKeys); -rkeys(_, RKeys) -> - RKeys. + lists:append([case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of + {array, Routes} -> [Route || {longstr, Route} <- Routes]; + _ -> [] + end || HeaderKey <- ?ROUTING_HEADERS]). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index be232bd2..16a3911d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -243,7 +243,7 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> handle_cast({deliver, ConsumerTag, AckRequired, Msg = {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, + routing_keys = [RoutingKey | _CcRoutes], content = Content}}}, State = #ch{writer_pid = WriterPid, next_tag = DeliveryTag}) -> @@ -609,7 +609,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, {ok, MessageCount, Msg = {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, + routing_keys = [RoutingKey | _CcRoutes], content = Content}}} -> State1 = lock_message(not(NoAck), ack_record(DeliveryTag, none, Msg), @@ -1074,7 +1074,7 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, end. basic_return(#basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, + routing_keys = [RoutingKey | _CcRoutes], content = Content}, WriterPid, Reason) -> {_Close, ReplyCode, ReplyText} = diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index 0baac1f8..82776c4a 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -36,9 +36,9 @@ description() -> {description, <<"AMQP direct exchange, as per the AMQP specification">>}]. route(#exchange{name = Name}, - #delivery{message = #basic_message{route_list = Routes}}) -> - lists:flatten([rabbit_router:match_routing_key(Name, RKey) || - RKey <- Routes]). + #delivery{message = #basic_message{routing_keys = Routes}}) -> + lists:append([rabbit_router:match_routing_key(Name, RKey) || + RKey <- Routes]). validate(_X) -> ok. create(_Tx, _X) -> ok. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index beee4974..27251d12 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -44,12 +44,12 @@ description() -> {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. route(#exchange{name = Name}, - #delivery{message = #basic_message{route_list = Routes}}) -> - lists:flatten([rabbit_router:match_bindings( - Name, - fun (#binding{key = BindingKey}) -> - topic_matches(BindingKey, RKey) - end) || RKey <- Routes]). + #delivery{message = #basic_message{routing_keys = Routes}}) -> + lists:append([rabbit_router:match_bindings( + Name, + fun (#binding{key = BindingKey}) -> + topic_matches(BindingKey, RKey) + end) || RKey <- Routes]). split_topic_key(Key) -> string:tokens(binary_to_list(Key), "."). diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 3dbe740f..ab2300c0 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -64,7 +64,7 @@ -type(content() :: undecoded_content() | decoded_content()). -type(basic_message() :: #basic_message{exchange_name :: rabbit_exchange:name(), - routing_key :: rabbit_router:routing_key(), + routing_keys :: [rabbit_router:routing_key()], content :: content(), guid :: rabbit_guid:guid(), is_persistent :: boolean()}). |