diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-06 13:06:42 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-06 13:06:42 +0000 |
commit | 25d7aaabd10557a5d90c26edcbd8db4ba489b1d4 (patch) | |
tree | f2696377c2fa676b2130d53c4fa0fdc04770bbb5 | |
parent | 6d2246365ddeb028503f70dce556bd6a41fe6de1 (diff) | |
parent | ae9b697c58b7155e4eec81be145072b373e3fe7e (diff) | |
download | rabbitmq-server-25d7aaabd10557a5d90c26edcbd8db4ba489b1d4.tar.gz |
merge bug25277 into default
-rw-r--r-- | src/rabbit_amqqueue.erl | 23 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 21 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 22 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 13 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 9 |
5 files changed, 62 insertions, 26 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 621da633..922951be 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -41,8 +41,6 @@ -define(INTEGER_ARG_TYPES, [byte, short, signedint, long]). --define(MAX_EXPIRY_TIMER, 4294967295). - -define(MORE_CONSUMER_CREDIT_AFTER, 50). -define(FAILOVER_WAIT_MILLIS, 100). @@ -373,8 +371,8 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, Args, RequiredArgs, QueueName, [<<"x-expires">>, <<"x-message-ttl">>]). check_declare_arguments(QueueName, Args) -> - Checks = [{<<"x-expires">>, fun check_positive_int_arg/2}, - {<<"x-message-ttl">>, fun check_non_neg_int_arg/2}, + Checks = [{<<"x-expires">>, fun check_expires_arg/2}, + {<<"x-message-ttl">>, fun check_message_ttl_arg/2}, {<<"x-dead-letter-exchange">>, fun check_string_arg/2}, {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}], [case rabbit_misc:table_lookup(Args, Key) of @@ -401,20 +399,17 @@ check_int_arg({Type, _}, _) -> false -> {error, {unacceptable_type, Type}} end. -check_positive_int_arg({Type, Val}, Args) -> +check_expires_arg({Type, Val}, Args) -> case check_int_arg({Type, Val}, Args) of - ok when Val > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, Val}}; - ok when Val > 0 -> ok; - ok -> {error, {value_zero_or_less, Val}}; - Error -> Error + ok when Val == 0 -> {error, {value_zero, Val}}; + ok -> rabbit_misc:check_expiry(Val); + Error -> Error end. -check_non_neg_int_arg({Type, Val}, Args) -> +check_message_ttl_arg({Type, Val}, Args) -> case check_int_arg({Type, Val}, Args) of - ok when Val > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, Val}}; - ok when Val >= 0 -> ok; - ok -> {error, {value_less_than_zero, Val}}; - Error -> Error + ok -> rabbit_misc:check_expiry(Val); + Error -> Error end. check_dlxrk_arg({longstr, _}, Args) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 1bd1a45a..43fe3578 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -553,7 +553,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, Delivered, State) -> {Confirm, State1} = send_or_record_confirm(Delivery, State), - Props = message_properties(Confirm, Delivered, State), + Props = message_properties(Message, Confirm, Delivered, State), case attempt_delivery(Delivery, Props, State1) of {true, State2} -> State2; @@ -680,16 +680,21 @@ subtract_acks(ChPid, AckTags, State, Fun) -> Fun(State) end. -message_properties(Confirm, Delivered, #q{ttl = TTL}) -> - #message_properties{expiry = calculate_msg_expiry(TTL), +message_properties(Message, Confirm, Delivered, #q{ttl = TTL}) -> + #message_properties{expiry = calculate_msg_expiry(Message, TTL), needs_confirming = Confirm == eventually, delivered = Delivered}. -calculate_msg_expiry(undefined) -> undefined; -calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000). +calculate_msg_expiry(#basic_message{content = Content}, TTL) -> + #content{properties = Props} = + rabbit_binary_parser:ensure_content_decoded(Content), + %% We assert that the expiration must be valid - we check in the channel. + {ok, MsgTTL} = rabbit_basic:parse_expiration(Props), + case lists:min([TTL, MsgTTL]) of + undefined -> undefined; + T -> now_micros() + T * 1000 + end. -drop_expired_messages(State = #q{ttl = undefined}) -> - State; drop_expired_messages(State = #q{backing_queue_state = BQS, backing_queue = BQ }) -> Now = now_micros(), @@ -711,8 +716,6 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, ensure_ttl_timer(undefined, State) -> State; -ensure_ttl_timer(_Expiry, State = #q{ttl = undefined}) -> - State; ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) -> After = (case Expiry - now_micros() of V when V > 0 -> V + 999; %% always fire later diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 9966c0df..9bd1fad9 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -20,7 +20,8 @@ -export([publish/4, publish/5, publish/1, message/3, message/4, properties/1, prepend_table_header/3, - extract_headers/1, map_headers/2, delivery/3, header_routes/1]). + extract_headers/1, map_headers/2, delivery/3, header_routes/1, + parse_expiration/1]). -export([build_content/2, from_content/1]). %%---------------------------------------------------------------------------- @@ -72,6 +73,9 @@ binary() | [binary()]) -> rabbit_types:content()). -spec(from_content/1 :: (rabbit_types:content()) -> {rabbit_framing:amqp_property_record(), binary()}). +-spec(parse_expiration/1 :: + (rabbit_framing:amqp_property_record()) + -> rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any())). -endif. @@ -254,3 +258,19 @@ header_routes(HeadersTable) -> {Type, _Val} -> throw({error, {unacceptable_type_in_header, binary_to_list(HeaderKey), Type}}) end || HeaderKey <- ?ROUTING_HEADERS]). + +parse_expiration(#'P_basic'{expiration = undefined}) -> + {ok, undefined}; +parse_expiration(#'P_basic'{expiration = Expiration}) -> + case string:to_integer(binary_to_list(Expiration)) of + {error, no_integer} = E -> + E; + {N, ""} -> + case rabbit_misc:check_expiry(N) of + ok -> {ok, N}; + E = {error, _} -> E + end; + {_, S} -> + {error, {leftover_string, S}} + end. + diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c6a33c72..a94d2ab5 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -474,6 +474,13 @@ check_user_id_header(#'P_basic'{user_id = Claimed}, "'~s'", [Claimed, Actual]) end. +check_expiration_header(Props) -> + case rabbit_basic:parse_expiration(Props) of + {ok, _} -> ok; + {error, E} -> precondition_failed("invalid expiration '~s': ~p", + [Props#'P_basic'.expiration, E]) + end. + check_internal_exchange(#exchange{name = Name, internal = true}) -> rabbit_misc:protocol_error(access_refused, "cannot publish to internal ~s", @@ -614,8 +621,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, check_internal_exchange(Exchange), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. - DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), - check_user_id_header(DecodedContent#content.properties, State), + DecodedContent = #content {properties = Props} = + rabbit_binary_parser:ensure_content_decoded(Content), + check_user_id_header(Props, State), + check_expiration_header(Props), {MsgSeqNo, State1} = case {TxStatus, ConfirmEnabled} of {none, false} -> {undefined, State}; diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 78f25175..7e3cc3d7 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -63,6 +63,7 @@ -export([version/0]). -export([sequence_error/1]). -export([json_encode/1, json_decode/1, json_to_term/1, term_to_json/1]). +-export([check_expiry/1]). -export([base64url/1]). %% Horrible macro to use in guards @@ -70,6 +71,9 @@ R =:= noproc; R =:= noconnection; R =:= nodedown; R =:= normal; R =:= shutdown). +%% This is dictated by `erlang:send_after' on which we depend to implement TTL. +-define(MAX_EXPIRY_TIMER, 4294967295). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -229,6 +233,7 @@ -spec(json_decode/1 :: (string()) -> {'ok', any()} | 'error'). -spec(json_to_term/1 :: (any()) -> any()). -spec(term_to_json/1 :: (any()) -> any()). +-spec(check_expiry/1 :: (integer()) -> rabbit_types:ok_or_error(any())). -spec(base64url/1 :: (binary()) -> string()). -endif. @@ -1000,6 +1005,10 @@ term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse V =:= true orelse V =:= false -> V. +check_expiry(N) when N > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, N}}; +check_expiry(N) when N < 0 -> {error, {value_negative, N}}; +check_expiry(_N) -> ok. + base64url(In) -> lists:reverse(lists:foldl(fun ($\+, Acc) -> [$\- | Acc]; ($\/, Acc) -> [$\_ | Acc]; |