summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-09-14 11:54:21 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-09-14 11:54:21 +0100
commitcd0ded1247c2b87825bb7418acdb8045d9bf004e (patch)
tree4741ad514c02f7c510dd1de14b0c38fadec2000b
parentfafb014bb2251d210ed972ceda796bf425ae72ab (diff)
parentde8ebece1efd57e697ffe0a444c2c98ea64331cb (diff)
downloadrabbitmq-server-cd0ded1247c2b87825bb7418acdb8045d9bf004e.tar.gz
merge default
In particular, enforce the limit introduced in bug 24867 with per-message TTL too.
-rw-r--r--src/rabbit_amqqueue.erl16
-rw-r--r--src/rabbit_amqqueue_process.erl24
-rw-r--r--src/rabbit_basic.erl23
-rw-r--r--src/rabbit_channel.erl11
-rw-r--r--src/rabbit_misc.erl10
5 files changed, 65 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index d566ac87..090337b8 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -40,8 +40,6 @@
-define(INTEGER_ARG_TYPES, [byte, short, signedint, long]).
--define(MAX_EXPIRY_TIMER, 4294967295).
-
-define(MORE_CONSUMER_CREDIT_AFTER, 50).
-define(FAILOVER_WAIT_MILLIS, 100).
@@ -399,18 +397,16 @@ check_int_arg({Type, _}, _) ->
check_positive_int_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
- ok when Val > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, Val}};
- ok when Val > 0 -> ok;
- ok -> {error, {value_zero_or_less, Val}};
- Error -> Error
+ ok when Val > 0 -> rabbit_misc:check_expiry_size(Val);
+ ok -> {error, {value_zero_or_less, Val}};
+ Error -> Error
end.
check_non_neg_int_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
- ok when Val > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, Val}};
- ok when Val >= 0 -> ok;
- ok -> {error, {value_less_than_zero, Val}};
- Error -> Error
+ ok when Val >= 0 -> rabbit_misc:check_expiry_size(Val);
+ ok -> {error, {value_less_than_zero, Val}};
+ Error -> Error
end.
check_dlxrk_arg({longstr, _}, Args) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 20ba4574..dc8ae711 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -540,7 +540,7 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
{false, BQS1} ->
deliver_msgs_to_consumers(
fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
- Props = message_properties(Confirm, State1),
+ Props = message_properties(Message, Confirm, State1),
{AckTag, BQS3} = BQ:publish_delivered(
AckRequired, Message, Props,
SenderPid, BQS2),
@@ -575,7 +575,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
{false, State1} ->
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
maybe_record_confirm_message(Confirm, State1),
- Props = message_properties(Confirm, State2),
+ Props = message_properties(Message, Confirm, State2),
BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
ensure_ttl_timer(Props#message_properties.expiry,
State2#q{backing_queue_state = BQS1})
@@ -705,12 +705,24 @@ discard_delivery(#delivery{sender = SenderPid,
backing_queue_state = BQS}) ->
State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}.
-message_properties(Confirm, #q{ttl = TTL}) ->
- #message_properties{expiry = calculate_msg_expiry(TTL),
+message_properties(Message, Confirm, #q{ttl = TTL}) ->
+ #message_properties{expiry = calculate_msg_expiry(Message, TTL),
needs_confirming = needs_confirming(Confirm)}.
-calculate_msg_expiry(undefined) -> undefined;
-calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
+calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
+ #content{properties = Props} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ %% We assert that the expiration must be valid - we check in che channel.
+ Milli = case {rabbit_basic:parse_expiration(Props), TTL} of
+ {{ok, undefined}, _ } -> TTL;
+ {{ok, N }, undefined} -> N;
+ {{ok, N }, M } when N < M -> N;
+ {{ok, N }, M } when M < N -> M
+ end,
+ case Milli of
+ undefined -> undefined;
+ _ -> now_micros() + Milli * 1000
+ end.
drop_expired_messages(State = #q{ttl = undefined}) ->
State;
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 734456d3..106cbbfa 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -20,7 +20,8 @@
-export([publish/4, publish/6, publish/1,
message/3, message/4, properties/1, append_table_header/3,
- extract_headers/1, map_headers/2, delivery/4, header_routes/1]).
+ extract_headers/1, map_headers/2, delivery/4, header_routes/1,
+ parse_expiration/1]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -72,6 +73,9 @@
binary() | [binary()]) -> rabbit_types:content()).
-spec(from_content/1 :: (rabbit_types:content()) ->
{rabbit_framing:amqp_property_record(), binary()}).
+-spec(parse_expiration/1 ::
+ (rabbit_framing:amqp_property_record())
+ -> rabbit_types:ok_or_error2(non_neg_integer(), any())).
-endif.
@@ -226,3 +230,20 @@ header_routes(HeadersTable) ->
{Type, _Val} -> throw({error, {unacceptable_type_in_header,
binary_to_list(HeaderKey), Type}})
end || HeaderKey <- ?ROUTING_HEADERS]).
+
+parse_expiration(#'P_basic'{expiration = Expiration}) ->
+ case Expiration of
+ undefined -> {ok, undefined};
+ B -> case string:to_integer(binary_to_list(B)) of
+ {error, no_integer} = E ->
+ E;
+ {N, ""} ->
+ case rabbit_misc:check_expiry_size(N) of
+ ok -> {ok, N};
+ E = {error, _} -> E
+ end;
+ {_, S} ->
+ {error, {leftover_string, S}}
+ end
+ end.
+
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 69fe0edc..78f5d3b8 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -608,8 +608,15 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
check_internal_exchange(Exchange),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
- DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
- check_user_id_header(DecodedContent#content.properties, State),
+ DecodedContent = #content {properties = Props} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ check_user_id_header(Props, State),
+ case rabbit_basic:parse_expiration(Props) of
+ {ok, _} -> ok;
+ {error, E} -> rabbit_misc:protocol_error(
+ invalid_expiration, "cannot parse expiration '~p': ~p",
+ [Props#'P_basic'.expiration, E])
+ end,
{MsgSeqNo, State1} =
case {TxStatus, ConfirmEnabled} of
{none, false} -> {undefined, State};
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 20f541e5..7d3fb0ad 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -61,12 +61,16 @@
-export([os_cmd/1]).
-export([gb_sets_difference/2]).
-export([json_encode/1, json_decode/1, json_to_term/1, term_to_json/1]).
+-export([check_expiry_size/1]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
R =:= noproc; R =:= noconnection; R =:= nodedown; R =:= normal;
R =:= shutdown).
+%% This is dictated by `erlang:send_after' on which we depend to implement TTL.
+-define(MAX_EXPIRY_TIMER, 4294967295).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -222,6 +226,7 @@
-spec(json_decode/1 :: (string()) -> {'ok', any()} | 'error').
-spec(json_to_term/1 :: (any()) -> any()).
-spec(term_to_json/1 :: (any()) -> any()).
+-spec(check_expiry_size/1 :: (integer()) -> rabbit_types:ok_or_error(any())).
-endif.
@@ -974,3 +979,8 @@ term_to_json(L) when is_list(L) ->
term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse
V =:= true orelse V =:= false ->
V.
+
+check_expiry_size(N) when N > ?MAX_EXPIRY_TIMER ->
+ {error, {value_too_big, N}};
+check_expiry_size(N) ->
+ ok.