diff options
author | Emile Joubert <emile@rabbitmq.com> | 2013-02-21 18:05:35 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2013-02-21 18:05:35 +0000 |
commit | 9a1acf8cb9c863c5f5e46309abec07009224252d (patch) | |
tree | 94e1ee2f09dbcbc180f93dcd89e44b077d1dd5f5 | |
parent | 97dada17a2770d5af602755a38bd6fbb812b8934 (diff) | |
download | rabbitmq-server-9a1acf8cb9c863c5f5e46309abec07009224252d.tar.gz |
Enforce queue limit with requeue
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 66 |
1 files changed, 37 insertions, 29 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 72d6ab43..08d68e4c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -560,46 +560,51 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, %% The next one is an optimisation {false, State2 = #q{ttl = 0, dlx = undefined}} -> discard(Delivery, State2); - {false, State2} -> - State3 = #q{backing_queue = BQ, backing_queue_state = BQS} = - maybe_drop_head(State2), - IsEmpty = BQ:is_empty(BQS), + {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS), - State4 = State3#q{backing_queue_state = BQS1}, + {Dropped, State3} = + maybe_drop_head(State2#q{backing_queue_state = BQS1}), + QLen = BQ:len(BQS1), %% optimisation: it would be perfectly safe to always %% invoke drop_expired_msgs here, but that is expensive so - %% we only do that IFF the new message ends up at the head - %% of the queue (because the queue was empty) and has an - %% expiry. Only then may it need expiring straight away, - %% or, if expiry is not due yet, the expiry timer may need - %% (re)scheduling. - case {IsEmpty, Props#message_properties.expiry} of - {false, _} -> State4; - {true, undefined} -> State4; - {true, _} -> drop_expired_msgs(State4) + %% we only do that if a new message that might have an + %% expiry ends up at the head of the queue. If the head + %% remains unchanged, or if the newly published message + %% has no expiry and becomes the head of the queue then + %% the call is unnecessary. + case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of + {false, false, _} -> State3; + {true, true, undefined} -> State3; + {_, _, _} -> drop_expired_msgs(State3) end end. maybe_drop_head(State = #q{max_length = undefined}) -> - State; + {0, State}; maybe_drop_head(State = #q{max_length = MaxLen, backing_queue = BQ, backing_queue_state = BQS}) -> - case BQ:len(BQS) >= MaxLen of - true -> with_dlx( - State#q.dlx, - fun (X) -> dead_letter_maxlen_msgs(X, State) end, - fun () -> - {_, BQS1} = BQ:drop(false, BQS), - State#q{backing_queue_state = BQS1} - end); - false -> State + case BQ:len(BQS) - MaxLen of + Excess when Excess > 0 -> + {Excess, + with_dlx( + State#q.dlx, + fun (X) -> dead_letter_maxlen_msgs(X, Excess, State) end, + fun () -> + {_, BQS1} = lists:foldl(fun (_, {_, BQS0}) -> + BQ:drop(false, BQS0) + end, {ok, BQS}, + lists:seq(1, Excess)), + State#q{backing_queue_state = BQS1} + end)}; + _ -> {0, State} end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), - run_message_queue(drop_expired_msgs(State#q{backing_queue_state = BQS1})). + {_Dropped, State1} = maybe_drop_head(State#q{backing_queue_state = BQS1}), + run_message_queue(drop_expired_msgs(State1)). fetch(AckRequired, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -783,12 +788,15 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) -> end, rejected, X, State), State1. -dead_letter_maxlen_msgs(X, State = #q{backing_queue = BQ}) -> +dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) -> {ok, State1} = dead_letter_msgs( - fun (DLFun, Acc, BQS1) -> - {{Msg, _, AckTag}, BQS2} = BQ:fetch(true, BQS1), - {ok, DLFun(Msg, AckTag, Acc), BQS2} + fun (DLFun, Acc, BQS) -> + lists:foldl(fun (_, {ok, Acc0, BQS0}) -> + {{Msg, _, AckTag}, BQS1} = + BQ:fetch(true, BQS0), + {ok, DLFun(Msg, AckTag, Acc0), BQS1} + end, {ok, Acc, BQS}, lists:seq(1, Excess)) end, maxlen, X, State), State1. |