summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-06-10 07:00:40 +0100
committerMatthias Radestock <matthias@lshift.net>2009-06-10 07:00:40 +0100
commit852e92a158c069b9b7d9e117b1faaaf366963bda (patch)
treef6eac690eb8177b4ba6ae8f19994329c87c3f8c8
parent6a7d99ad7a86fbdd80090dc43916586f93d1ec32 (diff)
parent0f89004dd0b386db1b2444abd47c4946d05e3e33 (diff)
downloadrabbitmq-server-852e92a158c069b9b7d9e117b1faaaf366963bda.tar.gz
merge bug20940 into default
-rw-r--r--src/rabbit_amqqueue_process.erl3
-rw-r--r--src/rabbit_limiter.erl18
2 files changed, 12 insertions, 9 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6027c9c0..7ffb1c8f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -175,8 +175,7 @@ deliver_immediately(Message, Delivered,
C = #cr{limiter_pid = LimiterPid,
unsent_message_count = Count,
unacked_messages = UAM} = ch_record(ChPid),
- case not(AckRequired) orelse rabbit_limiter:can_send(
- LimiterPid, self()) of
+ case rabbit_limiter:can_send(LimiterPid, self(), AckRequired) of
true ->
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 3f9b6ebb..9f3dcbd0 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -36,7 +36,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
-export([start_link/1, shutdown/1]).
--export([limit/2, can_send/2, ack/2, register/2, unregister/2]).
+-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
%%----------------------------------------------------------------------------
@@ -47,7 +47,7 @@
-spec(start_link/1 :: (pid()) -> pid()).
-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
--spec(can_send/2 :: (maybe_pid(), pid()) -> bool()).
+-spec(can_send/3 :: (maybe_pid(), pid(), bool()) -> bool()).
-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
@@ -85,12 +85,13 @@ limit(LimiterPid, PrefetchCount) ->
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
-can_send(undefined, _QPid) ->
+can_send(undefined, _QPid, _AckRequired) ->
true;
-can_send(LimiterPid, QPid) ->
+can_send(LimiterPid, QPid, AckRequired) ->
rabbit_misc:with_exit_handler(
fun () -> true end,
- fun () -> gen_server2:call(LimiterPid, {can_send, QPid}, infinity) end).
+ fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired},
+ infinity) end).
%% Let the limiter know that the channel has received some acks from a
%% consumer
@@ -110,10 +111,13 @@ unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid})
init([ChPid]) ->
{ok, #lim{ch_pid = ChPid} }.
-handle_call({can_send, QPid}, _From, State = #lim{volume = Volume}) ->
+handle_call({can_send, QPid, AckRequired}, _From,
+ State = #lim{volume = Volume}) ->
case limit_reached(State) of
true -> {reply, false, limit_queue(QPid, State)};
- false -> {reply, true, State#lim{volume = Volume + 1}}
+ false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
+ true -> Volume
+ end}}
end.
handle_cast(shutdown, State) ->