diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-03-13 17:24:19 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-03-13 17:24:19 +0000 |
commit | 5e4738ff018d27d9af2b23b974205c2dd4d4ad2b (patch) | |
tree | b4ad733b3dcf694751f6748eb1fb27b21ce254cf | |
parent | 3bd41da26d35ac05d6408496bcdd5a3da54997ce (diff) | |
parent | 9e125a71712752f84550490e357e867206e000a3 (diff) | |
download | rabbitmq-server-5e4738ff018d27d9af2b23b974205c2dd4d4ad2b.tar.gz |
merge bug19375 into default
-rw-r--r-- | src/rabbit_amqqueue.erl | 10 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 65 |
2 files changed, 61 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ae7fe5c5..82ac74fa 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -407,7 +407,8 @@ args() -> [{<<"x-expires">>, fun check_expires_arg/2}, {<<"x-message-ttl">>, fun check_message_ttl_arg/2}, {<<"x-dead-letter-exchange">>, fun check_string_arg/2}, - {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}]. + {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, + {<<"x-max-length">>, fun check_max_length_arg/2}]. check_string_arg({longstr, _}, _Args) -> ok; check_string_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. @@ -418,6 +419,13 @@ check_int_arg({Type, _}, _) -> false -> {error, {unacceptable_type, Type}} end. +check_max_length_arg({Type, Val}, Args) -> + case check_int_arg({Type, Val}, Args) of + ok when Val >= 0 -> ok; + ok -> {error, {value_negative, Val}}; + Error -> Error + end. + check_expires_arg({Type, Val}, Args) -> case check_int_arg({Type, Val}, Args) of ok when Val == 0 -> {error, {value_zero, Val}}; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fba58d38..18b641d4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -55,6 +55,7 @@ queue_monitors, dlx, dlx_routing_key, + max_length, status }). @@ -242,7 +243,8 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> [{<<"x-expires">>, fun init_expires/2}, {<<"x-dead-letter-exchange">>, fun init_dlx/2}, {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}, - {<<"x-message-ttl">>, fun init_ttl/2}]). + {<<"x-message-ttl">>, fun init_ttl/2}, + {<<"x-max-length">>, fun init_max_length/2}]). init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). @@ -254,6 +256,8 @@ init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) -> init_dlx_routing_key(RoutingKey, State) -> State#q{dlx_routing_key = RoutingKey}. +init_max_length(MaxLen, State) -> State#q{max_length = MaxLen}. + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue_state = BQS} = lists:foldl(fun (F, S) -> F(S) end, State, @@ -557,27 +561,50 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, {false, State2 = #q{ttl = 0, dlx = undefined}} -> discard(Delivery, State2); {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> - IsEmpty = BQ:is_empty(BQS), BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS), - State3 = State2#q{backing_queue_state = BQS1}, + {Dropped, State3 = #q{backing_queue_state = BQS2}} = + maybe_drop_head(State2#q{backing_queue_state = BQS1}), + QLen = BQ:len(BQS2), %% optimisation: it would be perfectly safe to always %% invoke drop_expired_msgs here, but that is expensive so - %% we only do that IFF the new message ends up at the head - %% of the queue (because the queue was empty) and has an - %% expiry. Only then may it need expiring straight away, - %% or, if expiry is not due yet, the expiry timer may need - %% (re)scheduling. - case {IsEmpty, Props#message_properties.expiry} of - {false, _} -> State3; - {true, undefined} -> State3; - {true, _} -> drop_expired_msgs(State3) + %% we only do that if a new message that might have an + %% expiry ends up at the head of the queue. If the head + %% remains unchanged, or if the newly published message + %% has no expiry and becomes the head of the queue then + %% the call is unnecessary. + case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of + {false, false, _} -> State3; + {true, true, undefined} -> State3; + {_, _, _} -> drop_expired_msgs(State3) end end. +maybe_drop_head(State = #q{max_length = undefined}) -> + {0, State}; +maybe_drop_head(State = #q{max_length = MaxLen, + backing_queue = BQ, + backing_queue_state = BQS}) -> + case BQ:len(BQS) - MaxLen of + Excess when Excess > 0 -> + {Excess, + with_dlx( + State#q.dlx, + fun (X) -> dead_letter_maxlen_msgs(X, Excess, State) end, + fun () -> + {_, BQS1} = lists:foldl(fun (_, {_, BQS0}) -> + BQ:drop(false, BQS0) + end, {ok, BQS}, + lists:seq(1, Excess)), + State#q{backing_queue_state = BQS1} + end)}; + _ -> {0, State} + end. + requeue_and_run(AckTags, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), - run_message_queue(drop_expired_msgs(State#q{backing_queue_state = BQS1})). + {_Dropped, State1} = maybe_drop_head(State#q{backing_queue_state = BQS1}), + run_message_queue(drop_expired_msgs(State1)). fetch(AckRequired, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -761,6 +788,18 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) -> end, rejected, X, State), State1. +dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) -> + {ok, State1} = + dead_letter_msgs( + fun (DLFun, Acc, BQS) -> + lists:foldl(fun (_, {ok, Acc0, BQS0}) -> + {{Msg, _, AckTag}, BQS1} = + BQ:fetch(true, BQS0), + {ok, DLFun(Msg, AckTag, Acc0), BQS1} + end, {ok, Acc, BQS}, lists:seq(1, Excess)) + end, maxlen, X, State), + State1. + dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK, publish_seqno = SeqNo0, unconfirmed = UC0, |