summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-08-12 12:48:28 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-08-12 12:48:28 +0100
commitd6530fd55ad5e0850a7f0fb63930c2070187a09c (patch)
treeedd91261e0008fdc197f7bf1c751973c4dcc9d73
parent959a65ae92e47ba2fae416278fefb1cec330c61b (diff)
downloadrabbitmq-server-d6530fd55ad5e0850a7f0fb63930c2070187a09c.tar.gz
Max-bytes argument / policy.
-rw-r--r--src/rabbit_amqqueue.erl3
-rw-r--r--src/rabbit_amqqueue_process.erl64
-rw-r--r--src/rabbit_policies.erl10
3 files changed, 47 insertions, 30 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 4e23dbd2..29ce3f03 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -461,7 +461,8 @@ declare_args() ->
[{<<"x-expires">>, fun check_expires_arg/2},
{<<"x-message-ttl">>, fun check_message_ttl_arg/2},
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
- {<<"x-max-length">>, fun check_non_neg_int_arg/2}].
+ {<<"x-max-length">>, fun check_non_neg_int_arg/2},
+ {<<"x-max-bytes">>, fun check_non_neg_int_arg/2}].
consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
{<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}].
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ba1517af..78bd367c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -52,6 +52,7 @@
dlx,
dlx_routing_key,
max_length,
+ max_bytes,
args_policy_version,
status
}).
@@ -265,7 +266,8 @@ process_args_policy(State = #q{q = Q,
{<<"dead-letter-exchange">>, fun res_arg/2, fun init_dlx/2},
{<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2},
{<<"message-ttl">>, fun res_min/2, fun init_ttl/2},
- {<<"max-length">>, fun res_min/2, fun init_max_length/2}],
+ {<<"max-length">>, fun res_min/2, fun init_max_length/2},
+ {<<"max-bytes">>, fun res_min/2, fun init_max_bytes/2}],
drop_expired_msgs(
lists:foldl(fun({Name, Resolve, Fun}, StateN) ->
Fun(args_policy_lookup(Name, Resolve, Q), StateN)
@@ -304,6 +306,10 @@ init_max_length(MaxLen, State) ->
{_Dropped, State1} = maybe_drop_head(State#q{max_length = MaxLen}),
State1.
+init_max_bytes(MaxBytes, State) ->
+ {_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}),
+ State1.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue_state = BQS, consumers = Consumers} =
lists:foldl(fun (F, S) -> F(S) end, State,
@@ -543,34 +549,41 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
%% remains unchanged, or if the newly published message
%% has no expiry and becomes the head of the queue then
%% the call is unnecessary.
- case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of
+ case {Dropped, QLen =:= 1, Props#message_properties.expiry} of
{false, false, _} -> State4;
{true, true, undefined} -> State4;
{_, _, _} -> drop_expired_msgs(State4)
end
end.
-maybe_drop_head(State = #q{max_length = undefined}) ->
- {0, State};
-maybe_drop_head(State = #q{max_length = MaxLen,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
- case BQ:len(BQS) - MaxLen of
- Excess when Excess > 0 ->
- {Excess,
- with_dlx(
- State#q.dlx,
- fun (X) -> dead_letter_maxlen_msgs(X, Excess, State) end,
- fun () ->
- {_, BQS1} = lists:foldl(fun (_, {_, BQS0}) ->
- BQ:drop(false, BQS0)
- end, {ok, BQS},
- lists:seq(1, Excess)),
- State#q{backing_queue_state = BQS1}
- end)};
- _ -> {0, State}
+maybe_drop_head(State = #q{max_length = undefined,
+ max_bytes = undefined}) ->
+ {false, State};
+maybe_drop_head(State) ->
+ maybe_drop_head(false, State).
+
+maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ case over_max_length(State) of
+ true ->
+ maybe_drop_head(true,
+ with_dlx(
+ State#q.dlx,
+ fun (X) -> dead_letter_maxlen_msg(X, State) end,
+ fun () ->
+ {_, BQS1} = BQ:drop(false, BQS),
+ State#q{backing_queue_state = BQS1}
+ end));
+ false ->
+ {AlreadyDropped, State}
end.
+over_max_length(#q{max_length = MaxLen,
+ max_bytes = MaxBytes,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQ:len(BQS) > MaxLen orelse BQ:info(message_bytes, BQS) > MaxBytes.
+
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
WasEmpty = BQ:is_empty(BQS),
@@ -726,15 +739,12 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) ->
end, rejected, X, State),
State1.
-dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) ->
+dead_letter_maxlen_msg(X, State = #q{backing_queue = BQ}) ->
{ok, State1} =
dead_letter_msgs(
fun (DLFun, Acc, BQS) ->
- lists:foldl(fun (_, {ok, Acc0, BQS0}) ->
- {{Msg, _, AckTag}, BQS1} =
- BQ:fetch(true, BQS0),
- {ok, DLFun(Msg, AckTag, Acc0), BQS1}
- end, {ok, Acc, BQS}, lists:seq(1, Excess))
+ {{Msg, _, AckTag}, BQS1} = BQ:fetch(true, BQS),
+ {ok, DLFun(Msg, AckTag, Acc), BQS1}
end, maxlen, X, State),
State1.
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index 3558cf98..6903c42e 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -34,7 +34,8 @@ register() ->
{policy_validator, <<"dead-letter-routing-key">>},
{policy_validator, <<"message-ttl">>},
{policy_validator, <<"expires">>},
- {policy_validator, <<"max-length">>}]],
+ {policy_validator, <<"max-length">>},
+ {policy_validator, <<"max-bytes">>}]],
ok.
validate_policy(Terms) ->
@@ -76,6 +77,11 @@ validate_policy0(<<"max-length">>, Value)
when is_integer(Value), Value >= 0 ->
ok;
validate_policy0(<<"max-length">>, Value) ->
- {error, "~p is not a valid maximum length", [Value]}.
+ {error, "~p is not a valid maximum length", [Value]};
+validate_policy0(<<"max-bytes">>, Value)
+ when is_integer(Value), Value >= 0 ->
+ ok;
+validate_policy0(<<"max-bytes">>, Value) ->
+ {error, "~p is not a valid maximum number of bytes", [Value]}.