diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-08-12 12:48:28 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-08-12 12:48:28 +0100 |
commit | d6530fd55ad5e0850a7f0fb63930c2070187a09c (patch) | |
tree | edd91261e0008fdc197f7bf1c751973c4dcc9d73 | |
parent | 959a65ae92e47ba2fae416278fefb1cec330c61b (diff) | |
download | rabbitmq-server-d6530fd55ad5e0850a7f0fb63930c2070187a09c.tar.gz |
Max-bytes argument / policy.
-rw-r--r-- | src/rabbit_amqqueue.erl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 64 | ||||
-rw-r--r-- | src/rabbit_policies.erl | 10 |
3 files changed, 47 insertions, 30 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4e23dbd2..29ce3f03 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -461,7 +461,8 @@ declare_args() -> [{<<"x-expires">>, fun check_expires_arg/2}, {<<"x-message-ttl">>, fun check_message_ttl_arg/2}, {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, - {<<"x-max-length">>, fun check_non_neg_int_arg/2}]. + {<<"x-max-length">>, fun check_non_neg_int_arg/2}, + {<<"x-max-bytes">>, fun check_non_neg_int_arg/2}]. consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}, {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}]. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index ba1517af..78bd367c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -52,6 +52,7 @@ dlx, dlx_routing_key, max_length, + max_bytes, args_policy_version, status }). @@ -265,7 +266,8 @@ process_args_policy(State = #q{q = Q, {<<"dead-letter-exchange">>, fun res_arg/2, fun init_dlx/2}, {<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2}, {<<"message-ttl">>, fun res_min/2, fun init_ttl/2}, - {<<"max-length">>, fun res_min/2, fun init_max_length/2}], + {<<"max-length">>, fun res_min/2, fun init_max_length/2}, + {<<"max-bytes">>, fun res_min/2, fun init_max_bytes/2}], drop_expired_msgs( lists:foldl(fun({Name, Resolve, Fun}, StateN) -> Fun(args_policy_lookup(Name, Resolve, Q), StateN) @@ -304,6 +306,10 @@ init_max_length(MaxLen, State) -> {_Dropped, State1} = maybe_drop_head(State#q{max_length = MaxLen}), State1. +init_max_bytes(MaxBytes, State) -> + {_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}), + State1. + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue_state = BQS, consumers = Consumers} = lists:foldl(fun (F, S) -> F(S) end, State, @@ -543,34 +549,41 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, %% remains unchanged, or if the newly published message %% has no expiry and becomes the head of the queue then %% the call is unnecessary. - case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of + case {Dropped, QLen =:= 1, Props#message_properties.expiry} of {false, false, _} -> State4; {true, true, undefined} -> State4; {_, _, _} -> drop_expired_msgs(State4) end end. -maybe_drop_head(State = #q{max_length = undefined}) -> - {0, State}; -maybe_drop_head(State = #q{max_length = MaxLen, - backing_queue = BQ, - backing_queue_state = BQS}) -> - case BQ:len(BQS) - MaxLen of - Excess when Excess > 0 -> - {Excess, - with_dlx( - State#q.dlx, - fun (X) -> dead_letter_maxlen_msgs(X, Excess, State) end, - fun () -> - {_, BQS1} = lists:foldl(fun (_, {_, BQS0}) -> - BQ:drop(false, BQS0) - end, {ok, BQS}, - lists:seq(1, Excess)), - State#q{backing_queue_state = BQS1} - end)}; - _ -> {0, State} +maybe_drop_head(State = #q{max_length = undefined, + max_bytes = undefined}) -> + {false, State}; +maybe_drop_head(State) -> + maybe_drop_head(false, State). + +maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + case over_max_length(State) of + true -> + maybe_drop_head(true, + with_dlx( + State#q.dlx, + fun (X) -> dead_letter_maxlen_msg(X, State) end, + fun () -> + {_, BQS1} = BQ:drop(false, BQS), + State#q{backing_queue_state = BQS1} + end)); + false -> + {AlreadyDropped, State} end. +over_max_length(#q{max_length = MaxLen, + max_bytes = MaxBytes, + backing_queue = BQ, + backing_queue_state = BQS}) -> + BQ:len(BQS) > MaxLen orelse BQ:info(message_bytes, BQS) > MaxBytes. + requeue_and_run(AckTags, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> WasEmpty = BQ:is_empty(BQS), @@ -726,15 +739,12 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) -> end, rejected, X, State), State1. -dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) -> +dead_letter_maxlen_msg(X, State = #q{backing_queue = BQ}) -> {ok, State1} = dead_letter_msgs( fun (DLFun, Acc, BQS) -> - lists:foldl(fun (_, {ok, Acc0, BQS0}) -> - {{Msg, _, AckTag}, BQS1} = - BQ:fetch(true, BQS0), - {ok, DLFun(Msg, AckTag, Acc0), BQS1} - end, {ok, Acc, BQS}, lists:seq(1, Excess)) + {{Msg, _, AckTag}, BQS1} = BQ:fetch(true, BQS), + {ok, DLFun(Msg, AckTag, Acc), BQS1} end, maxlen, X, State), State1. diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index 3558cf98..6903c42e 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -34,7 +34,8 @@ register() -> {policy_validator, <<"dead-letter-routing-key">>}, {policy_validator, <<"message-ttl">>}, {policy_validator, <<"expires">>}, - {policy_validator, <<"max-length">>}]], + {policy_validator, <<"max-length">>}, + {policy_validator, <<"max-bytes">>}]], ok. validate_policy(Terms) -> @@ -76,6 +77,11 @@ validate_policy0(<<"max-length">>, Value) when is_integer(Value), Value >= 0 -> ok; validate_policy0(<<"max-length">>, Value) -> - {error, "~p is not a valid maximum length", [Value]}. + {error, "~p is not a valid maximum length", [Value]}; +validate_policy0(<<"max-bytes">>, Value) + when is_integer(Value), Value >= 0 -> + ok; +validate_policy0(<<"max-bytes">>, Value) -> + {error, "~p is not a valid maximum number of bytes", [Value]}. |