summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-11-06 13:06:42 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-11-06 13:06:42 +0000
commit25d7aaabd10557a5d90c26edcbd8db4ba489b1d4 (patch)
treef2696377c2fa676b2130d53c4fa0fdc04770bbb5
parent6d2246365ddeb028503f70dce556bd6a41fe6de1 (diff)
parentae9b697c58b7155e4eec81be145072b373e3fe7e (diff)
downloadrabbitmq-server-25d7aaabd10557a5d90c26edcbd8db4ba489b1d4.tar.gz
merge bug25277 into default
-rw-r--r--src/rabbit_amqqueue.erl23
-rw-r--r--src/rabbit_amqqueue_process.erl21
-rw-r--r--src/rabbit_basic.erl22
-rw-r--r--src/rabbit_channel.erl13
-rw-r--r--src/rabbit_misc.erl9
5 files changed, 62 insertions, 26 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 621da633..922951be 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -41,8 +41,6 @@
-define(INTEGER_ARG_TYPES, [byte, short, signedint, long]).
--define(MAX_EXPIRY_TIMER, 4294967295).
-
-define(MORE_CONSUMER_CREDIT_AFTER, 50).
-define(FAILOVER_WAIT_MILLIS, 100).
@@ -373,8 +371,8 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
Args, RequiredArgs, QueueName, [<<"x-expires">>, <<"x-message-ttl">>]).
check_declare_arguments(QueueName, Args) ->
- Checks = [{<<"x-expires">>, fun check_positive_int_arg/2},
- {<<"x-message-ttl">>, fun check_non_neg_int_arg/2},
+ Checks = [{<<"x-expires">>, fun check_expires_arg/2},
+ {<<"x-message-ttl">>, fun check_message_ttl_arg/2},
{<<"x-dead-letter-exchange">>, fun check_string_arg/2},
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}],
[case rabbit_misc:table_lookup(Args, Key) of
@@ -401,20 +399,17 @@ check_int_arg({Type, _}, _) ->
false -> {error, {unacceptable_type, Type}}
end.
-check_positive_int_arg({Type, Val}, Args) ->
+check_expires_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
- ok when Val > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, Val}};
- ok when Val > 0 -> ok;
- ok -> {error, {value_zero_or_less, Val}};
- Error -> Error
+ ok when Val == 0 -> {error, {value_zero, Val}};
+ ok -> rabbit_misc:check_expiry(Val);
+ Error -> Error
end.
-check_non_neg_int_arg({Type, Val}, Args) ->
+check_message_ttl_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
- ok when Val > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, Val}};
- ok when Val >= 0 -> ok;
- ok -> {error, {value_less_than_zero, Val}};
- Error -> Error
+ ok -> rabbit_misc:check_expiry(Val);
+ Error -> Error
end.
check_dlxrk_arg({longstr, _}, Args) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 1bd1a45a..43fe3578 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -553,7 +553,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
Delivered, State) ->
{Confirm, State1} = send_or_record_confirm(Delivery, State),
- Props = message_properties(Confirm, Delivered, State),
+ Props = message_properties(Message, Confirm, Delivered, State),
case attempt_delivery(Delivery, Props, State1) of
{true, State2} ->
State2;
@@ -680,16 +680,21 @@ subtract_acks(ChPid, AckTags, State, Fun) ->
Fun(State)
end.
-message_properties(Confirm, Delivered, #q{ttl = TTL}) ->
- #message_properties{expiry = calculate_msg_expiry(TTL),
+message_properties(Message, Confirm, Delivered, #q{ttl = TTL}) ->
+ #message_properties{expiry = calculate_msg_expiry(Message, TTL),
needs_confirming = Confirm == eventually,
delivered = Delivered}.
-calculate_msg_expiry(undefined) -> undefined;
-calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
+calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
+ #content{properties = Props} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ %% We assert that the expiration must be valid - we check in the channel.
+ {ok, MsgTTL} = rabbit_basic:parse_expiration(Props),
+ case lists:min([TTL, MsgTTL]) of
+ undefined -> undefined;
+ T -> now_micros() + T * 1000
+ end.
-drop_expired_messages(State = #q{ttl = undefined}) ->
- State;
drop_expired_messages(State = #q{backing_queue_state = BQS,
backing_queue = BQ }) ->
Now = now_micros(),
@@ -711,8 +716,6 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
ensure_ttl_timer(undefined, State) ->
State;
-ensure_ttl_timer(_Expiry, State = #q{ttl = undefined}) ->
- State;
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
After = (case Expiry - now_micros() of
V when V > 0 -> V + 999; %% always fire later
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 9966c0df..9bd1fad9 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -20,7 +20,8 @@
-export([publish/4, publish/5, publish/1,
message/3, message/4, properties/1, prepend_table_header/3,
- extract_headers/1, map_headers/2, delivery/3, header_routes/1]).
+ extract_headers/1, map_headers/2, delivery/3, header_routes/1,
+ parse_expiration/1]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -72,6 +73,9 @@
binary() | [binary()]) -> rabbit_types:content()).
-spec(from_content/1 :: (rabbit_types:content()) ->
{rabbit_framing:amqp_property_record(), binary()}).
+-spec(parse_expiration/1 ::
+ (rabbit_framing:amqp_property_record())
+ -> rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any())).
-endif.
@@ -254,3 +258,19 @@ header_routes(HeadersTable) ->
{Type, _Val} -> throw({error, {unacceptable_type_in_header,
binary_to_list(HeaderKey), Type}})
end || HeaderKey <- ?ROUTING_HEADERS]).
+
+parse_expiration(#'P_basic'{expiration = undefined}) ->
+ {ok, undefined};
+parse_expiration(#'P_basic'{expiration = Expiration}) ->
+ case string:to_integer(binary_to_list(Expiration)) of
+ {error, no_integer} = E ->
+ E;
+ {N, ""} ->
+ case rabbit_misc:check_expiry(N) of
+ ok -> {ok, N};
+ E = {error, _} -> E
+ end;
+ {_, S} ->
+ {error, {leftover_string, S}}
+ end.
+
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c6a33c72..a94d2ab5 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -474,6 +474,13 @@ check_user_id_header(#'P_basic'{user_id = Claimed},
"'~s'", [Claimed, Actual])
end.
+check_expiration_header(Props) ->
+ case rabbit_basic:parse_expiration(Props) of
+ {ok, _} -> ok;
+ {error, E} -> precondition_failed("invalid expiration '~s': ~p",
+ [Props#'P_basic'.expiration, E])
+ end.
+
check_internal_exchange(#exchange{name = Name, internal = true}) ->
rabbit_misc:protocol_error(access_refused,
"cannot publish to internal ~s",
@@ -614,8 +621,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
check_internal_exchange(Exchange),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
- DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
- check_user_id_header(DecodedContent#content.properties, State),
+ DecodedContent = #content {properties = Props} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ check_user_id_header(Props, State),
+ check_expiration_header(Props),
{MsgSeqNo, State1} =
case {TxStatus, ConfirmEnabled} of
{none, false} -> {undefined, State};
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 78f25175..7e3cc3d7 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -63,6 +63,7 @@
-export([version/0]).
-export([sequence_error/1]).
-export([json_encode/1, json_decode/1, json_to_term/1, term_to_json/1]).
+-export([check_expiry/1]).
-export([base64url/1]).
%% Horrible macro to use in guards
@@ -70,6 +71,9 @@
R =:= noproc; R =:= noconnection; R =:= nodedown; R =:= normal;
R =:= shutdown).
+%% This is dictated by `erlang:send_after' on which we depend to implement TTL.
+-define(MAX_EXPIRY_TIMER, 4294967295).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -229,6 +233,7 @@
-spec(json_decode/1 :: (string()) -> {'ok', any()} | 'error').
-spec(json_to_term/1 :: (any()) -> any()).
-spec(term_to_json/1 :: (any()) -> any()).
+-spec(check_expiry/1 :: (integer()) -> rabbit_types:ok_or_error(any())).
-spec(base64url/1 :: (binary()) -> string()).
-endif.
@@ -1000,6 +1005,10 @@ term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse
V =:= true orelse V =:= false ->
V.
+check_expiry(N) when N > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, N}};
+check_expiry(N) when N < 0 -> {error, {value_negative, N}};
+check_expiry(_N) -> ok.
+
base64url(In) ->
lists:reverse(lists:foldl(fun ($\+, Acc) -> [$\- | Acc];
($\/, Acc) -> [$\_ | Acc];