diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-04 20:02:36 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-04 20:02:36 +0000 |
commit | ddf6b31dd5cdf75815056df9aae9be9c63068f38 (patch) | |
tree | 419db1aa25122c75877917d2f134af88a8fa1a6c | |
parent | feddbf3ec5fdb0a112cbd770bed20ea9cd3e02fc (diff) | |
parent | dc87918e5f1160d7f7c72b55ab484720072b26af (diff) | |
download | rabbitmq-server-ddf6b31dd5cdf75815056df9aae9be9c63068f38.tar.gz |
merge default into bug19375
-rw-r--r-- | src/rabbit_amqqueue.erl | 10 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 27 |
2 files changed, 32 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index fbe146e8..a9f2f390 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -391,7 +391,8 @@ check_declare_arguments(QueueName, Args) -> Checks = [{<<"x-expires">>, fun check_expires_arg/2}, {<<"x-message-ttl">>, fun check_message_ttl_arg/2}, {<<"x-dead-letter-exchange">>, fun check_string_arg/2}, - {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}], + {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, + {<<"x-max-length">>, fun check_max_length_arg/2}], [case rabbit_misc:table_lookup(Args, Key) of undefined -> ok; TypeVal -> case Fun(TypeVal, Args) of @@ -414,6 +415,13 @@ check_int_arg({Type, _}, _) -> false -> {error, {unacceptable_type, Type}} end. +check_max_length_arg({Type, Val}, Args) -> + case check_int_arg({Type, Val}, Args) of + ok when Val >= 0 -> ok; + ok -> {error, {value_negative, Val}}; + Error -> Error + end. + check_expires_arg({Type, Val}, Args) -> case check_int_arg({Type, Val}, Args) of ok when Val == 0 -> {error, {value_zero, Val}}; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 35f0b816..004180db 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -54,7 +54,8 @@ delayed_stop, queue_monitors, dlx, - dlx_routing_key + dlx_routing_key, + max_length }). -record(consumer, {tag, ack_required}). @@ -240,7 +241,8 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> [{<<"x-expires">>, fun init_expires/2}, {<<"x-dead-letter-exchange">>, fun init_dlx/2}, {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}, - {<<"x-message-ttl">>, fun init_ttl/2}]). + {<<"x-message-ttl">>, fun init_ttl/2}, + {<<"x-max-length">>, fun init_max_length/2}]). init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). @@ -252,6 +254,9 @@ init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) -> init_dlx_routing_key(RoutingKey, State) -> State#q{dlx_routing_key = RoutingKey}. +init_max_length(MaxLen, State) -> + State#q{max_length = MaxLen}. + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue_state = BQS} = stop_sync_timer(stop_rate_timer(State)), @@ -545,10 +550,24 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, %% The next one is an optimisation {false, State2 = #q{ttl = 0, dlx = undefined}} -> discard(Delivery, State2); - {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> + {false, State2} -> + State3 = #q{backing_queue = BQ, backing_queue_state = BQS} = + maybe_drop_head(State2), BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS), ensure_ttl_timer(Props#message_properties.expiry, - State2#q{backing_queue_state = BQS1}) + State3#q{backing_queue_state = BQS1}) + end. + +maybe_drop_head(State = #q{max_length = undefined}) -> + State; +maybe_drop_head(State = #q{max_length = MaxLen, + backing_queue = BQ, + backing_queue_state = BQS}) -> + case BQ:len(BQS) >= MaxLen of + true -> {{Msg, _IsDelivered, AckTag}, BQS1} = BQ:fetch(true, BQS), + (dead_letter_fun(maxlen))([{Msg, AckTag}]), + State#q{backing_queue_state = BQS1}; + false -> State end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, |