diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-04-06 16:41:37 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-04-06 16:41:37 +0100 |
commit | 4f7518de918f5d171d7ee20aa3f2a016f93dcb82 (patch) | |
tree | 945564bb389a3fa6a960a0af9697e315843eb11c /src/rabbit_basic.erl | |
parent | 4e9f0dc61d494aded3e6600304215b40a3c8f32d (diff) | |
parent | 22bdb4ffd079fb5aa842e1673de888be46029ab0 (diff) | |
download | rabbitmq-server-4f7518de918f5d171d7ee20aa3f2a016f93dcb82.tar.gz |
merge default into bug23559
Diffstat (limited to 'src/rabbit_basic.erl')
-rw-r--r-- | src/rabbit_basic.erl | 120 |
1 files changed, 70 insertions, 50 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 38412982..3cf73e80 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -1,42 +1,26 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ %% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. %% -%% The Original Code is RabbitMQ. +%% The Original Code is RabbitMQ. %% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. %% -module(rabbit_basic). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/1, message/4, properties/1, delivery/4]). +-export([publish/1, message/3, message/4, properties/1, delivery/5]). -export([publish/4, publish/7]). -export([build_content/2, from_content/1]). --export([is_message_persistent/1]). %%---------------------------------------------------------------------------- @@ -50,13 +34,17 @@ -spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()). --spec(delivery/4 :: +-spec(delivery/5 :: (boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()), - rabbit_types:message()) -> rabbit_types:delivery()). + rabbit_types:message(), undefined | integer()) -> + rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), - properties_input(), binary()) -> - (rabbit_types:message() | rabbit_types:error(any()))). + properties_input(), binary()) -> rabbit_types:message()). +-spec(message/3 :: + (rabbit_exchange:name(), rabbit_router:routing_key(), + rabbit_types:decoded_content()) -> + rabbit_types:ok_or_error2(rabbit_types:message(), any())). -spec(properties/1 :: (properties_input()) -> rabbit_framing:amqp_property_record()). -spec(publish/4 :: @@ -70,9 +58,6 @@ rabbit_types:content()). -spec(from_content/1 :: (rabbit_types:content()) -> {rabbit_framing:amqp_property_record(), binary()}). --spec(is_message_persistent/1 :: (rabbit_types:decoded_content()) -> - (boolean() | - {'invalid', non_neg_integer()})). -endif. @@ -88,9 +73,9 @@ publish(Delivery = #delivery{ Other end. -delivery(Mandatory, Immediate, Txn, Message) -> +delivery(Mandatory, Immediate, Txn, Message, MsgSeqNo) -> #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, - sender = self(), message = Message}. + sender = self(), message = Message, msg_seq_no = MsgSeqNo}. build_content(Properties, BodyBin) -> %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 @@ -112,19 +97,40 @@ from_content(Content) -> rabbit_framing_amqp_0_9_1:method_id('basic.publish'), {Props, list_to_binary(lists:reverse(FragmentsRev))}. -message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) -> +%% This breaks the spec rule forbidding message modification +strip_header(#content{properties = #'P_basic'{headers = undefined}} + = DecodedContent, _Key) -> + DecodedContent; +strip_header(#content{properties = Props = #'P_basic'{headers = Headers}} + = DecodedContent, Key) -> + case lists:keysearch(Key, 1, Headers) of + false -> DecodedContent; + {value, Found} -> Headers0 = lists:delete(Found, Headers), + rabbit_binary_generator:clear_encoded_content( + DecodedContent#content{ + properties = Props#'P_basic'{ + headers = Headers0}}) + end. + +message(ExchangeName, RoutingKey, + #content{properties = Props} = DecodedContent) -> + try + {ok, #basic_message{ + exchange_name = ExchangeName, + content = strip_header(DecodedContent, ?DELETED_HEADER), + id = rabbit_guid:guid(), + is_persistent = is_message_persistent(DecodedContent), + routing_keys = [RoutingKey | + header_routes(Props#'P_basic'.headers)]}} + catch + {error, _Reason} = Error -> Error + end. + +message(ExchangeName, RoutingKey, RawProperties, BodyBin) -> Properties = properties(RawProperties), Content = build_content(Properties, BodyBin), - case is_message_persistent(Content) of - {invalid, Other} -> - {error, {invalid_delivery_mode, Other}}; - IsPersistent when is_boolean(IsPersistent) -> - #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKeyBin, - content = Content, - guid = rabbit_guid:guid(), - is_persistent = IsPersistent} - end. + {ok, Msg} = message(ExchangeName, RoutingKey, Content), + Msg. properties(P = #'P_basic'{}) -> P; @@ -157,7 +163,8 @@ publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties, BodyBin) -> publish(delivery(Mandatory, Immediate, Txn, message(ExchangeName, RoutingKeyBin, - properties(Properties), BodyBin))). + properties(Properties), BodyBin), + undefined)). is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> @@ -165,5 +172,18 @@ is_message_persistent(#content{properties = #'P_basic'{ 1 -> false; 2 -> true; undefined -> false; - Other -> {invalid, Other} + Other -> throw({error, {delivery_mode_unknown, Other}}) end. + +%% Extract CC routes from headers +header_routes(undefined) -> + []; +header_routes(HeadersTable) -> + lists:append( + [case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of + {array, Routes} -> [Route || {longstr, Route} <- Routes]; + undefined -> []; + {Type, _Val} -> throw({error, {unacceptable_type_in_header, + Type, + binary_to_list(HeaderKey)}}) + end || HeaderKey <- ?ROUTING_HEADERS]). |