summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2013-01-09 12:27:09 +0000
committerEmile Joubert <emile@rabbitmq.com>2013-01-09 12:27:09 +0000
commit48b9402395b9358442bcd6222900f9f89262879f (patch)
treeb6f08416af89c65dedab77446a1b268098f30a2c
parent06ed05a9ab5b603aa20b022c4463f89f8b76c582 (diff)
parent253bc8f774078ef98bfc55f2373c948c08d7124d (diff)
downloadrabbitmq-server-48b9402395b9358442bcd6222900f9f89262879f.tar.gz
Merged default into bug19375
-rw-r--r--src/rabbit_amqqueue.erl10
-rw-r--r--src/rabbit_amqqueue_process.erl40
2 files changed, 45 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 94150f1c..c974e528 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -405,7 +405,8 @@ args() ->
[{<<"x-expires">>, fun check_expires_arg/2},
{<<"x-message-ttl">>, fun check_message_ttl_arg/2},
{<<"x-dead-letter-exchange">>, fun check_string_arg/2},
- {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}].
+ {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
+ {<<"x-max-length">>, fun check_max_length_arg/2}].
check_string_arg({longstr, _}, _Args) -> ok;
check_string_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}.
@@ -416,6 +417,13 @@ check_int_arg({Type, _}, _) ->
false -> {error, {unacceptable_type, Type}}
end.
+check_max_length_arg({Type, Val}, Args) ->
+ case check_int_arg({Type, Val}, Args) of
+ ok when Val >= 0 -> ok;
+ ok -> {error, {value_negative, Val}};
+ Error -> Error
+ end.
+
check_expires_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
ok when Val == 0 -> {error, {value_zero, Val}};
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 536ed48a..0753d98c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -54,7 +54,8 @@
delayed_stop,
queue_monitors,
dlx,
- dlx_routing_key
+ dlx_routing_key,
+ max_length
}).
-record(consumer, {tag, ack_required}).
@@ -240,7 +241,8 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
[{<<"x-expires">>, fun init_expires/2},
{<<"x-dead-letter-exchange">>, fun init_dlx/2},
{<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2},
- {<<"x-message-ttl">>, fun init_ttl/2}]).
+ {<<"x-message-ttl">>, fun init_ttl/2},
+ {<<"x-max-length">>, fun init_max_length/2}]).
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
@@ -252,6 +254,8 @@ init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) ->
init_dlx_routing_key(RoutingKey, State) ->
State#q{dlx_routing_key = RoutingKey}.
+init_max_length(MaxLen, State) -> State#q{max_length = MaxLen}.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue_state = BQS} =
stop_sync_timer(stop_rate_timer(State)),
@@ -548,10 +552,32 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
%% The next one is an optimisation
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
discard(Delivery, State2);
- {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
+ {false, State2} ->
+ State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ maybe_drop_head(State2),
BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
ensure_ttl_timer(Props#message_properties.expiry,
- State2#q{backing_queue_state = BQS1})
+ State3#q{backing_queue_state = BQS1})
+ end.
+
+maybe_drop_head(State = #q{max_length = undefined}) ->
+ State;
+maybe_drop_head(State = #q{max_length = MaxLen,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ case BQ:len(BQS) >= MaxLen of
+ true ->
+ with_dlx(State#q.dlx,
+ fun (X) ->
+ {ok, State1} = dead_letter_maxlen_msgs(X, State),
+ State1
+ end,
+ fun () ->
+ {_, BQS1} = BQ:drop(false, BQS),
+ State#q{backing_queue_state = BQS1}
+ end);
+ false ->
+ State
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
@@ -741,6 +767,12 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) ->
end, rejected, X, State),
State1.
+dead_letter_maxlen_msgs(X, State = #q{backing_queue = BQ}) ->
+ dead_letter_msgs(fun (DLFun, Acc, BQS1) ->
+ {{Msg, _, AckTag}, BQS2} = BQ:fetch(true, BQS1),
+ {ok, DLFun(Msg, AckTag, Acc), BQS2}
+ end, maxlen, X, State).
+
dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
publish_seqno = SeqNo0,
unconfirmed = UC0,