diff options
author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-09-14 11:54:21 +0100 |
---|---|---|
committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-09-14 11:54:21 +0100 |
commit | cd0ded1247c2b87825bb7418acdb8045d9bf004e (patch) | |
tree | 4741ad514c02f7c510dd1de14b0c38fadec2000b | |
parent | fafb014bb2251d210ed972ceda796bf425ae72ab (diff) | |
parent | de8ebece1efd57e697ffe0a444c2c98ea64331cb (diff) | |
download | rabbitmq-server-cd0ded1247c2b87825bb7418acdb8045d9bf004e.tar.gz |
merge default
In particular, enforce the limit introduced in bug 24867 with per-message TTL
too.
-rw-r--r-- | src/rabbit_amqqueue.erl | 16 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 24 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 23 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 11 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 10 |
5 files changed, 65 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d566ac87..090337b8 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -40,8 +40,6 @@ -define(INTEGER_ARG_TYPES, [byte, short, signedint, long]). --define(MAX_EXPIRY_TIMER, 4294967295). - -define(MORE_CONSUMER_CREDIT_AFTER, 50). -define(FAILOVER_WAIT_MILLIS, 100). @@ -399,18 +397,16 @@ check_int_arg({Type, _}, _) -> check_positive_int_arg({Type, Val}, Args) -> case check_int_arg({Type, Val}, Args) of - ok when Val > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, Val}}; - ok when Val > 0 -> ok; - ok -> {error, {value_zero_or_less, Val}}; - Error -> Error + ok when Val > 0 -> rabbit_misc:check_expiry_size(Val); + ok -> {error, {value_zero_or_less, Val}}; + Error -> Error end. check_non_neg_int_arg({Type, Val}, Args) -> case check_int_arg({Type, Val}, Args) of - ok when Val > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, Val}}; - ok when Val >= 0 -> ok; - ok -> {error, {value_less_than_zero, Val}}; - Error -> Error + ok when Val >= 0 -> rabbit_misc:check_expiry_size(Val); + ok -> {error, {value_less_than_zero, Val}}; + Error -> Error end. check_dlxrk_arg({longstr, _}, Args) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 20ba4574..dc8ae711 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -540,7 +540,7 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm, {false, BQS1} -> deliver_msgs_to_consumers( fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) -> - Props = message_properties(Confirm, State1), + Props = message_properties(Message, Confirm, State1), {AckTag, BQS3} = BQ:publish_delivered( AckRequired, Message, Props, SenderPid, BQS2), @@ -575,7 +575,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, {false, State1} -> State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = maybe_record_confirm_message(Confirm, State1), - Props = message_properties(Confirm, State2), + Props = message_properties(Message, Confirm, State2), BQS1 = BQ:publish(Message, Props, SenderPid, BQS), ensure_ttl_timer(Props#message_properties.expiry, State2#q{backing_queue_state = BQS1}) @@ -705,12 +705,24 @@ discard_delivery(#delivery{sender = SenderPid, backing_queue_state = BQS}) -> State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}. -message_properties(Confirm, #q{ttl = TTL}) -> - #message_properties{expiry = calculate_msg_expiry(TTL), +message_properties(Message, Confirm, #q{ttl = TTL}) -> + #message_properties{expiry = calculate_msg_expiry(Message, TTL), needs_confirming = needs_confirming(Confirm)}. -calculate_msg_expiry(undefined) -> undefined; -calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000). +calculate_msg_expiry(#basic_message{content = Content}, TTL) -> + #content{properties = Props} = + rabbit_binary_parser:ensure_content_decoded(Content), + %% We assert that the expiration must be valid - we check in che channel. + Milli = case {rabbit_basic:parse_expiration(Props), TTL} of + {{ok, undefined}, _ } -> TTL; + {{ok, N }, undefined} -> N; + {{ok, N }, M } when N < M -> N; + {{ok, N }, M } when M < N -> M + end, + case Milli of + undefined -> undefined; + _ -> now_micros() + Milli * 1000 + end. drop_expired_messages(State = #q{ttl = undefined}) -> State; diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 734456d3..106cbbfa 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -20,7 +20,8 @@ -export([publish/4, publish/6, publish/1, message/3, message/4, properties/1, append_table_header/3, - extract_headers/1, map_headers/2, delivery/4, header_routes/1]). + extract_headers/1, map_headers/2, delivery/4, header_routes/1, + parse_expiration/1]). -export([build_content/2, from_content/1]). %%---------------------------------------------------------------------------- @@ -72,6 +73,9 @@ binary() | [binary()]) -> rabbit_types:content()). -spec(from_content/1 :: (rabbit_types:content()) -> {rabbit_framing:amqp_property_record(), binary()}). +-spec(parse_expiration/1 :: + (rabbit_framing:amqp_property_record()) + -> rabbit_types:ok_or_error2(non_neg_integer(), any())). -endif. @@ -226,3 +230,20 @@ header_routes(HeadersTable) -> {Type, _Val} -> throw({error, {unacceptable_type_in_header, binary_to_list(HeaderKey), Type}}) end || HeaderKey <- ?ROUTING_HEADERS]). + +parse_expiration(#'P_basic'{expiration = Expiration}) -> + case Expiration of + undefined -> {ok, undefined}; + B -> case string:to_integer(binary_to_list(B)) of + {error, no_integer} = E -> + E; + {N, ""} -> + case rabbit_misc:check_expiry_size(N) of + ok -> {ok, N}; + E = {error, _} -> E + end; + {_, S} -> + {error, {leftover_string, S}} + end + end. + diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 69fe0edc..78f5d3b8 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -608,8 +608,15 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, check_internal_exchange(Exchange), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. - DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), - check_user_id_header(DecodedContent#content.properties, State), + DecodedContent = #content {properties = Props} = + rabbit_binary_parser:ensure_content_decoded(Content), + check_user_id_header(Props, State), + case rabbit_basic:parse_expiration(Props) of + {ok, _} -> ok; + {error, E} -> rabbit_misc:protocol_error( + invalid_expiration, "cannot parse expiration '~p': ~p", + [Props#'P_basic'.expiration, E]) + end, {MsgSeqNo, State1} = case {TxStatus, ConfirmEnabled} of {none, false} -> {undefined, State}; diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 20f541e5..7d3fb0ad 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -61,12 +61,16 @@ -export([os_cmd/1]). -export([gb_sets_difference/2]). -export([json_encode/1, json_decode/1, json_to_term/1, term_to_json/1]). +-export([check_expiry_size/1]). %% Horrible macro to use in guards -define(IS_BENIGN_EXIT(R), R =:= noproc; R =:= noconnection; R =:= nodedown; R =:= normal; R =:= shutdown). +%% This is dictated by `erlang:send_after' on which we depend to implement TTL. +-define(MAX_EXPIRY_TIMER, 4294967295). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -222,6 +226,7 @@ -spec(json_decode/1 :: (string()) -> {'ok', any()} | 'error'). -spec(json_to_term/1 :: (any()) -> any()). -spec(term_to_json/1 :: (any()) -> any()). +-spec(check_expiry_size/1 :: (integer()) -> rabbit_types:ok_or_error(any())). -endif. @@ -974,3 +979,8 @@ term_to_json(L) when is_list(L) -> term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse V =:= true orelse V =:= false -> V. + +check_expiry_size(N) when N > ?MAX_EXPIRY_TIMER -> + {error, {value_too_big, N}}; +check_expiry_size(N) -> + ok. |