diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-06-10 07:00:40 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-06-10 07:00:40 +0100 |
commit | 852e92a158c069b9b7d9e117b1faaaf366963bda (patch) | |
tree | f6eac690eb8177b4ba6ae8f19994329c87c3f8c8 | |
parent | 6a7d99ad7a86fbdd80090dc43916586f93d1ec32 (diff) | |
parent | 0f89004dd0b386db1b2444abd47c4946d05e3e33 (diff) | |
download | rabbitmq-server-852e92a158c069b9b7d9e117b1faaaf366963bda.tar.gz |
merge bug20940 into default
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 3 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 18 |
2 files changed, 12 insertions, 9 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6027c9c0..7ffb1c8f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -175,8 +175,7 @@ deliver_immediately(Message, Delivered, C = #cr{limiter_pid = LimiterPid, unsent_message_count = Count, unacked_messages = UAM} = ch_record(ChPid), - case not(AckRequired) orelse rabbit_limiter:can_send( - LimiterPid, self()) of + case rabbit_limiter:can_send(LimiterPid, self(), AckRequired) of true -> rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 3f9b6ebb..9f3dcbd0 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -36,7 +36,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). -export([start_link/1, shutdown/1]). --export([limit/2, can_send/2, ack/2, register/2, unregister/2]). +-export([limit/2, can_send/3, ack/2, register/2, unregister/2]). %%---------------------------------------------------------------------------- @@ -47,7 +47,7 @@ -spec(start_link/1 :: (pid()) -> pid()). -spec(shutdown/1 :: (maybe_pid()) -> 'ok'). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). --spec(can_send/2 :: (maybe_pid(), pid()) -> bool()). +-spec(can_send/3 :: (maybe_pid(), pid(), bool()) -> bool()). -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). @@ -85,12 +85,13 @@ limit(LimiterPid, PrefetchCount) -> %% Ask the limiter whether the queue can deliver a message without %% breaching a limit -can_send(undefined, _QPid) -> +can_send(undefined, _QPid, _AckRequired) -> true; -can_send(LimiterPid, QPid) -> +can_send(LimiterPid, QPid, AckRequired) -> rabbit_misc:with_exit_handler( fun () -> true end, - fun () -> gen_server2:call(LimiterPid, {can_send, QPid}, infinity) end). + fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired}, + infinity) end). %% Let the limiter know that the channel has received some acks from a %% consumer @@ -110,10 +111,13 @@ unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}) init([ChPid]) -> {ok, #lim{ch_pid = ChPid} }. -handle_call({can_send, QPid}, _From, State = #lim{volume = Volume}) -> +handle_call({can_send, QPid, AckRequired}, _From, + State = #lim{volume = Volume}) -> case limit_reached(State) of true -> {reply, false, limit_queue(QPid, State)}; - false -> {reply, true, State#lim{volume = Volume + 1}} + false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; + true -> Volume + end}} end. handle_cast(shutdown, State) -> |