summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2013-02-21 18:05:35 +0000
committerEmile Joubert <emile@rabbitmq.com>2013-02-21 18:05:35 +0000
commit9a1acf8cb9c863c5f5e46309abec07009224252d (patch)
tree94e1ee2f09dbcbc180f93dcd89e44b077d1dd5f5
parent97dada17a2770d5af602755a38bd6fbb812b8934 (diff)
downloadrabbitmq-server-9a1acf8cb9c863c5f5e46309abec07009224252d.tar.gz
Enforce queue limit with requeue
-rw-r--r--src/rabbit_amqqueue_process.erl66
1 files changed, 37 insertions, 29 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 72d6ab43..08d68e4c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -560,46 +560,51 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
%% The next one is an optimisation
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
discard(Delivery, State2);
- {false, State2} ->
- State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- maybe_drop_head(State2),
- IsEmpty = BQ:is_empty(BQS),
+ {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
- State4 = State3#q{backing_queue_state = BQS1},
+ {Dropped, State3} =
+ maybe_drop_head(State2#q{backing_queue_state = BQS1}),
+ QLen = BQ:len(BQS1),
%% optimisation: it would be perfectly safe to always
%% invoke drop_expired_msgs here, but that is expensive so
- %% we only do that IFF the new message ends up at the head
- %% of the queue (because the queue was empty) and has an
- %% expiry. Only then may it need expiring straight away,
- %% or, if expiry is not due yet, the expiry timer may need
- %% (re)scheduling.
- case {IsEmpty, Props#message_properties.expiry} of
- {false, _} -> State4;
- {true, undefined} -> State4;
- {true, _} -> drop_expired_msgs(State4)
+ %% we only do that if a new message that might have an
+ %% expiry ends up at the head of the queue. If the head
+ %% remains unchanged, or if the newly published message
+ %% has no expiry and becomes the head of the queue then
+ %% the call is unnecessary.
+ case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of
+ {false, false, _} -> State3;
+ {true, true, undefined} -> State3;
+ {_, _, _} -> drop_expired_msgs(State3)
end
end.
maybe_drop_head(State = #q{max_length = undefined}) ->
- State;
+ {0, State};
maybe_drop_head(State = #q{max_length = MaxLen,
backing_queue = BQ,
backing_queue_state = BQS}) ->
- case BQ:len(BQS) >= MaxLen of
- true -> with_dlx(
- State#q.dlx,
- fun (X) -> dead_letter_maxlen_msgs(X, State) end,
- fun () ->
- {_, BQS1} = BQ:drop(false, BQS),
- State#q{backing_queue_state = BQS1}
- end);
- false -> State
+ case BQ:len(BQS) - MaxLen of
+ Excess when Excess > 0 ->
+ {Excess,
+ with_dlx(
+ State#q.dlx,
+ fun (X) -> dead_letter_maxlen_msgs(X, Excess, State) end,
+ fun () ->
+ {_, BQS1} = lists:foldl(fun (_, {_, BQS0}) ->
+ BQ:drop(false, BQS0)
+ end, {ok, BQS},
+ lists:seq(1, Excess)),
+ State#q{backing_queue_state = BQS1}
+ end)};
+ _ -> {0, State}
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- run_message_queue(drop_expired_msgs(State#q{backing_queue_state = BQS1})).
+ {_Dropped, State1} = maybe_drop_head(State#q{backing_queue_state = BQS1}),
+ run_message_queue(drop_expired_msgs(State1)).
fetch(AckRequired, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
@@ -783,12 +788,15 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) ->
end, rejected, X, State),
State1.
-dead_letter_maxlen_msgs(X, State = #q{backing_queue = BQ}) ->
+dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) ->
{ok, State1} =
dead_letter_msgs(
- fun (DLFun, Acc, BQS1) ->
- {{Msg, _, AckTag}, BQS2} = BQ:fetch(true, BQS1),
- {ok, DLFun(Msg, AckTag, Acc), BQS2}
+ fun (DLFun, Acc, BQS) ->
+ lists:foldl(fun (_, {ok, Acc0, BQS0}) ->
+ {{Msg, _, AckTag}, BQS1} =
+ BQ:fetch(true, BQS0),
+ {ok, DLFun(Msg, AckTag, Acc0), BQS1}
+ end, {ok, Acc, BQS}, lists:seq(1, Excess))
end, maxlen, X, State),
State1.