summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-12-12 17:48:56 +0000
committerEmile Joubert <emile@rabbitmq.com>2012-12-12 17:48:56 +0000
commita381c38a2965c0888589635c233afd7298838be9 (patch)
tree28d5da256ca97847ffaf75e30b33422bb3beee53
parentefdcf7500e22c1cd5508e6cfde5eab5936b326f0 (diff)
downloadrabbitmq-server-a381c38a2965c0888589635c233afd7298838be9.tar.gz
Limit queue depth
Without dead-lettering yet
-rw-r--r--src/rabbit_amqqueue.erl9
-rw-r--r--src/rabbit_amqqueue_process.erl39
2 files changed, 43 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 1b6cc223..9c089973 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -387,7 +387,8 @@ check_declare_arguments(QueueName, Args) ->
Checks = [{<<"x-expires">>, fun check_expires_arg/2},
{<<"x-message-ttl">>, fun check_message_ttl_arg/2},
{<<"x-dead-letter-exchange">>, fun check_string_arg/2},
- {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}],
+ {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
+ {<<"x-maxdepth">>, fun check_maxdepth_arg/2}],
[case rabbit_misc:table_lookup(Args, Key) of
undefined -> ok;
TypeVal -> case Fun(TypeVal, Args) of
@@ -410,6 +411,12 @@ check_int_arg({Type, _}, _) ->
false -> {error, {unacceptable_type, Type}}
end.
+check_maxdepth_arg({Type, Val}, Args) ->
+ case check_int_arg({Type, Val}, Args) of
+ ok when Val =< 0 -> {error, {value_non_positive, Val}};
+ X -> X
+ end.
+
check_expires_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
ok when Val == 0 -> {error, {value_zero, Val}};
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2ffa2a1a..0de9b4e4 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -54,7 +54,8 @@
delayed_stop,
queue_monitors,
dlx,
- dlx_routing_key
+ dlx_routing_key,
+ max_depth
}).
-record(consumer, {tag, ack_required}).
@@ -134,6 +135,7 @@ init(Q) ->
senders = pmon:new(),
dlx = undefined,
dlx_routing_key = undefined,
+ max_depth = undefined,
publish_seqno = 1,
unconfirmed = dtree:empty(),
delayed_stop = undefined,
@@ -159,6 +161,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
rate_timer_ref = RateTRef,
expiry_timer_ref = undefined,
ttl = undefined,
+ max_depth = undefined,
senders = Senders,
publish_seqno = 1,
unconfirmed = dtree:empty(),
@@ -258,7 +261,8 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
[{<<"x-expires">>, fun init_expires/2},
{<<"x-dead-letter-exchange">>, fun init_dlx/2},
{<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2},
- {<<"x-message-ttl">>, fun init_ttl/2}]).
+ {<<"x-message-ttl">>, fun init_ttl/2},
+ {<<"x-maxdepth">>, fun init_maxdepth/2}]).
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
@@ -270,6 +274,9 @@ init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) ->
init_dlx_routing_key(RoutingKey, State) ->
State#q{dlx_routing_key = RoutingKey}.
+init_maxdepth(MaxDepth, State) ->
+ State#q{max_depth = MaxDepth}.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue_state = BQS} =
stop_sync_timer(stop_rate_timer(State)),
@@ -571,12 +578,36 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
%% The next one is an optimisation
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
discard(Delivery, State2);
- {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
- BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
+ {false, State2} ->
+ BQS1 = publish_max(Message, Props, Delivered, SenderPid, State2),
ensure_ttl_timer(Props#message_properties.expiry,
State2#q{backing_queue_state = BQS1})
end.
+publish_max(Message, Props, Delivered, SenderPid,
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ max_depth = undefined }) ->
+ BQ:publish(Message, Props, Delivered, SenderPid, BQS);
+publish_max(Message, Props, Delivered, SenderPid,
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ max_depth = MaxDepth }) ->
+ Depth = BQ:depth(BQS),
+ case Depth >= MaxDepth of
+ true ->
+ Length = BQ:len(BQS),
+ case Length >= MaxDepth of
+ false ->
+ BQS;
+ true ->
+ {M, BQS1} = BQ:fetch(false, BQS),
+ BQ:publish(Message, Props, Delivered, SenderPid, BQS1)
+ end;
+ false->
+ BQ:publish(Message, Props, Delivered, SenderPid, BQS)
+ end.
+
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),