diff options
author | Emile Joubert <emile@rabbitmq.com> | 2012-12-12 17:48:56 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2012-12-12 17:48:56 +0000 |
commit | a381c38a2965c0888589635c233afd7298838be9 (patch) | |
tree | 28d5da256ca97847ffaf75e30b33422bb3beee53 | |
parent | efdcf7500e22c1cd5508e6cfde5eab5936b326f0 (diff) | |
download | rabbitmq-server-a381c38a2965c0888589635c233afd7298838be9.tar.gz |
Limit queue depth
Without dead-lettering yet
-rw-r--r-- | src/rabbit_amqqueue.erl | 9 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 39 |
2 files changed, 43 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1b6cc223..9c089973 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -387,7 +387,8 @@ check_declare_arguments(QueueName, Args) -> Checks = [{<<"x-expires">>, fun check_expires_arg/2}, {<<"x-message-ttl">>, fun check_message_ttl_arg/2}, {<<"x-dead-letter-exchange">>, fun check_string_arg/2}, - {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}], + {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, + {<<"x-maxdepth">>, fun check_maxdepth_arg/2}], [case rabbit_misc:table_lookup(Args, Key) of undefined -> ok; TypeVal -> case Fun(TypeVal, Args) of @@ -410,6 +411,12 @@ check_int_arg({Type, _}, _) -> false -> {error, {unacceptable_type, Type}} end. +check_maxdepth_arg({Type, Val}, Args) -> + case check_int_arg({Type, Val}, Args) of + ok when Val =< 0 -> {error, {value_non_positive, Val}}; + X -> X + end. + check_expires_arg({Type, Val}, Args) -> case check_int_arg({Type, Val}, Args) of ok when Val == 0 -> {error, {value_zero, Val}}; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2ffa2a1a..0de9b4e4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -54,7 +54,8 @@ delayed_stop, queue_monitors, dlx, - dlx_routing_key + dlx_routing_key, + max_depth }). -record(consumer, {tag, ack_required}). @@ -134,6 +135,7 @@ init(Q) -> senders = pmon:new(), dlx = undefined, dlx_routing_key = undefined, + max_depth = undefined, publish_seqno = 1, unconfirmed = dtree:empty(), delayed_stop = undefined, @@ -159,6 +161,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, rate_timer_ref = RateTRef, expiry_timer_ref = undefined, ttl = undefined, + max_depth = undefined, senders = Senders, publish_seqno = 1, unconfirmed = dtree:empty(), @@ -258,7 +261,8 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> [{<<"x-expires">>, fun init_expires/2}, {<<"x-dead-letter-exchange">>, fun init_dlx/2}, {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}, - {<<"x-message-ttl">>, fun init_ttl/2}]). + {<<"x-message-ttl">>, fun init_ttl/2}, + {<<"x-maxdepth">>, fun init_maxdepth/2}]). init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). @@ -270,6 +274,9 @@ init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) -> init_dlx_routing_key(RoutingKey, State) -> State#q{dlx_routing_key = RoutingKey}. +init_maxdepth(MaxDepth, State) -> + State#q{max_depth = MaxDepth}. + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue_state = BQS} = stop_sync_timer(stop_rate_timer(State)), @@ -571,12 +578,36 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, %% The next one is an optimisation {false, State2 = #q{ttl = 0, dlx = undefined}} -> discard(Delivery, State2); - {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> - BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS), + {false, State2} -> + BQS1 = publish_max(Message, Props, Delivered, SenderPid, State2), ensure_ttl_timer(Props#message_properties.expiry, State2#q{backing_queue_state = BQS1}) end. +publish_max(Message, Props, Delivered, SenderPid, + State = #q{backing_queue = BQ, + backing_queue_state = BQS, + max_depth = undefined }) -> + BQ:publish(Message, Props, Delivered, SenderPid, BQS); +publish_max(Message, Props, Delivered, SenderPid, + State = #q{backing_queue = BQ, + backing_queue_state = BQS, + max_depth = MaxDepth }) -> + Depth = BQ:depth(BQS), + case Depth >= MaxDepth of + true -> + Length = BQ:len(BQS), + case Length >= MaxDepth of + false -> + BQS; + true -> + {M, BQS1} = BQ:fetch(false, BQS), + BQ:publish(Message, Props, Delivered, SenderPid, BQS1) + end; + false-> + BQ:publish(Message, Props, Delivered, SenderPid, BQS) + end. + requeue_and_run(AckTags, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), |