summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-03-21 10:42:41 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-03-21 10:42:41 +0000
commite222e21a4a18a674a82a90f8e23acbfb42d2fc02 (patch)
tree4e29c1d677aa3819d606c94072c2d363b9872ddb /src/rabbit_amqqueue_process.erl
parent3a9022f793632b7ab779cc0dedfe1083f6b85cd9 (diff)
parent904c58e4fcce4938c6a8725f484e2b78813c6986 (diff)
downloadrabbitmq-server-e222e21a4a18a674a82a90f8e23acbfb42d2fc02.tar.gz
Merge in defaultbug23378
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl65
1 files changed, 52 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 1d332f67..0d5fd304 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -55,6 +55,7 @@
queue_monitors,
dlx,
dlx_routing_key,
+ max_length,
status
}).
@@ -244,7 +245,8 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
[{<<"x-expires">>, fun init_expires/2},
{<<"x-dead-letter-exchange">>, fun init_dlx/2},
{<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2},
- {<<"x-message-ttl">>, fun init_ttl/2}]).
+ {<<"x-message-ttl">>, fun init_ttl/2},
+ {<<"x-max-length">>, fun init_max_length/2}]).
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
@@ -256,6 +258,8 @@ init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) ->
init_dlx_routing_key(RoutingKey, State) ->
State#q{dlx_routing_key = RoutingKey}.
+init_max_length(MaxLen, State) -> State#q{max_length = MaxLen}.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue_state = BQS} =
lists:foldl(fun (F, S) -> F(S) end, State,
@@ -559,27 +563,50 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
discard(Delivery, State2);
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
- IsEmpty = BQ:is_empty(BQS),
BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
- State3 = State2#q{backing_queue_state = BQS1},
+ {Dropped, State3 = #q{backing_queue_state = BQS2}} =
+ maybe_drop_head(State2#q{backing_queue_state = BQS1}),
+ QLen = BQ:len(BQS2),
%% optimisation: it would be perfectly safe to always
%% invoke drop_expired_msgs here, but that is expensive so
- %% we only do that IFF the new message ends up at the head
- %% of the queue (because the queue was empty) and has an
- %% expiry. Only then may it need expiring straight away,
- %% or, if expiry is not due yet, the expiry timer may need
- %% (re)scheduling.
- case {IsEmpty, Props#message_properties.expiry} of
- {false, _} -> State3;
- {true, undefined} -> State3;
- {true, _} -> drop_expired_msgs(State3)
+ %% we only do that if a new message that might have an
+ %% expiry ends up at the head of the queue. If the head
+ %% remains unchanged, or if the newly published message
+ %% has no expiry and becomes the head of the queue then
+ %% the call is unnecessary.
+ case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of
+ {false, false, _} -> State3;
+ {true, true, undefined} -> State3;
+ {_, _, _} -> drop_expired_msgs(State3)
end
end.
+maybe_drop_head(State = #q{max_length = undefined}) ->
+ {0, State};
+maybe_drop_head(State = #q{max_length = MaxLen,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ case BQ:len(BQS) - MaxLen of
+ Excess when Excess > 0 ->
+ {Excess,
+ with_dlx(
+ State#q.dlx,
+ fun (X) -> dead_letter_maxlen_msgs(X, Excess, State) end,
+ fun () ->
+ {_, BQS1} = lists:foldl(fun (_, {_, BQS0}) ->
+ BQ:drop(false, BQS0)
+ end, {ok, BQS},
+ lists:seq(1, Excess)),
+ State#q{backing_queue_state = BQS1}
+ end)};
+ _ -> {0, State}
+ end.
+
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- run_message_queue(drop_expired_msgs(State#q{backing_queue_state = BQS1})).
+ {_Dropped, State1} = maybe_drop_head(State#q{backing_queue_state = BQS1}),
+ run_message_queue(drop_expired_msgs(State1)).
fetch(AckRequired, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
@@ -763,6 +790,18 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) ->
end, rejected, X, State),
State1.
+dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) ->
+ {ok, State1} =
+ dead_letter_msgs(
+ fun (DLFun, Acc, BQS) ->
+ lists:foldl(fun (_, {ok, Acc0, BQS0}) ->
+ {{Msg, _, AckTag}, BQS1} =
+ BQ:fetch(true, BQS0),
+ {ok, DLFun(Msg, AckTag, Acc0), BQS1}
+ end, {ok, Acc, BQS}, lists:seq(1, Excess))
+ end, maxlen, X, State),
+ State1.
+
dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
publish_seqno = SeqNo0,
unconfirmed = UC0,