summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-09 15:36:18 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-09 15:36:18 +0000
commit570e697c41496458d87c261c294dd29c24580856 (patch)
tree509543429c62126240c6a40251dbd83dee204dea
parent626142a346e5d7cf25f1b249c518b2e723c8d83a (diff)
parent48b9402395b9358442bcd6222900f9f89262879f (diff)
downloadrabbitmq-server-570e697c41496458d87c261c294dd29c24580856.tar.gz
merge default into bug19375
-rw-r--r--src/rabbit_amqqueue.erl10
-rw-r--r--src/rabbit_amqqueue_process.erl38
2 files changed, 44 insertions, 4 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2477b891..6a31b24d 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -407,7 +407,8 @@ args() ->
[{<<"x-expires">>, fun check_expires_arg/2},
{<<"x-message-ttl">>, fun check_message_ttl_arg/2},
{<<"x-dead-letter-exchange">>, fun check_string_arg/2},
- {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}].
+ {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
+ {<<"x-max-length">>, fun check_max_length_arg/2}].
check_string_arg({longstr, _}, _Args) -> ok;
check_string_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}.
@@ -418,6 +419,13 @@ check_int_arg({Type, _}, _) ->
false -> {error, {unacceptable_type, Type}}
end.
+check_max_length_arg({Type, Val}, Args) ->
+ case check_int_arg({Type, Val}, Args) of
+ ok when Val >= 0 -> ok;
+ ok -> {error, {value_negative, Val}};
+ Error -> Error
+ end.
+
check_expires_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
ok when Val == 0 -> {error, {value_zero, Val}};
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e76bf6ea..904eb6d0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -55,6 +55,7 @@
queue_monitors,
dlx,
dlx_routing_key,
+ max_length,
status
}).
@@ -243,7 +244,8 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
[{<<"x-expires">>, fun init_expires/2},
{<<"x-dead-letter-exchange">>, fun init_dlx/2},
{<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2},
- {<<"x-message-ttl">>, fun init_ttl/2}]).
+ {<<"x-message-ttl">>, fun init_ttl/2},
+ {<<"x-max-length">>, fun init_max_length/2}]).
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
@@ -255,6 +257,8 @@ init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) ->
init_dlx_routing_key(RoutingKey, State) ->
State#q{dlx_routing_key = RoutingKey}.
+init_max_length(MaxLen, State) -> State#q{max_length = MaxLen}.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue_state = BQS} =
stop_sync_timer(stop_rate_timer(State)),
@@ -551,10 +555,32 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
%% The next one is an optimisation
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
discard(Delivery, State2);
- {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
+ {false, State2} ->
+ State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ maybe_drop_head(State2),
BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
ensure_ttl_timer(Props#message_properties.expiry,
- State2#q{backing_queue_state = BQS1})
+ State3#q{backing_queue_state = BQS1})
+ end.
+
+maybe_drop_head(State = #q{max_length = undefined}) ->
+ State;
+maybe_drop_head(State = #q{max_length = MaxLen,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ case BQ:len(BQS) >= MaxLen of
+ true ->
+ with_dlx(State#q.dlx,
+ fun (X) ->
+ {ok, State1} = dead_letter_maxlen_msgs(X, State),
+ State1
+ end,
+ fun () ->
+ {_, BQS1} = BQ:drop(false, BQS),
+ State#q{backing_queue_state = BQS1}
+ end);
+ false ->
+ State
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
@@ -744,6 +770,12 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) ->
end, rejected, X, State),
State1.
+dead_letter_maxlen_msgs(X, State = #q{backing_queue = BQ}) ->
+ dead_letter_msgs(fun (DLFun, Acc, BQS1) ->
+ {{Msg, _, AckTag}, BQS2} = BQ:fetch(true, BQS1),
+ {ok, DLFun(Msg, AckTag, Acc), BQS2}
+ end, maxlen, X, State).
+
dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
publish_seqno = SeqNo0,
unconfirmed = UC0,