summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-03-21 10:41:42 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-03-21 10:41:42 +0000
commit30965c881b81a27880c1a81fcc735f79dd1f8982 (patch)
tree6c24c1366aea5bf6135fdc0fe22175c2d7204e87
parentb15b247899318309bc75840f2b37dde3f548d255 (diff)
downloadrabbitmq-server-30965c881b81a27880c1a81fcc735f79dd1f8982.tar.gz
Merge the two can_sends and tidy up.
-rw-r--r--src/rabbit_amqqueue_process.erl11
-rw-r--r--src/rabbit_limiter.erl56
2 files changed, 25 insertions, 42 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8f325e5c..22bbbd00 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -447,16 +447,13 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
false -> case rabbit_limiter:can_send(C#cr.limiter,
Consumer#consumer.ack_required,
Consumer#consumer.tag) of
- consumer_blocked ->
- block_consumer(C, E),
+ {suspend, Limiter} ->
+ block_consumer(C#cr{limiter = Limiter}, E),
{false, State};
- channel_blocked ->
- block_consumer(C, E),
- {false, State};
- Limiter2 ->
+ {continue, Limiter} ->
AC1 = queue:in(E, State#q.active_consumers),
deliver_msg_to_consumer(
- DeliverFun, Consumer, C#cr{limiter = Limiter2},
+ DeliverFun, Consumer, C#cr{limiter = Limiter},
State#q{active_consumers = AC1})
end
end.
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 71ed2e73..6825622d 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -111,14 +111,13 @@
is_prefetch_limited/1, is_blocked/1, is_active/1,
get_prefetch_limit/1, ack/2, pid/1]).
%% queue API
--export([client/1, activate/1, can_send/2, resume/1, deactivate/1,
- is_suspended/1]).
+-export([client/1, activate/1, can_send/3, resume/1, deactivate/1,
+ is_suspended/1, is_consumer_blocked/2, credit/4, drained/1,
+ forget_consumer/2]).
%% callbacks
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, prioritise_call/3]).
--export([is_consumer_blocked/2, credit/4, drained/1, forget_consumer/2]).
-
-import(rabbit_misc, [serial_add/2, serial_diff/2]).
%%----------------------------------------------------------------------------
@@ -141,8 +140,6 @@
-> lstate()).
-spec(unlimit_prefetch/1 :: (lstate()) -> lstate()).
-spec(block/1 :: (lstate()) -> lstate()).
--spec(can_send/4 :: (qstate(), pid(), boolean(), rabbit_types:ctag())
- -> qstate() | 'consumer_blocked' | 'channel_blocked').
-spec(unblock/1 :: (lstate()) -> lstate()).
-spec(is_prefetch_limited/1 :: (lstate()) -> boolean()).
-spec(is_blocked/1 :: (lstate()) -> boolean()).
@@ -153,7 +150,7 @@
-spec(client/1 :: (pid()) -> qstate()).
-spec(activate/1 :: (qstate()) -> qstate()).
--spec(can_send/2 :: (qstate(), boolean()) ->
+-spec(can_send/3 :: (qstate(), boolean(), rabbit_types:ctag()) ->
{'continue' | 'suspend', qstate()}).
-spec(resume/1 :: (qstate()) -> qstate()).
-spec(deactivate/1 :: (qstate()) -> qstate()).
@@ -210,24 +207,6 @@ unblock(L) ->
is_prefetch_limited(#lstate{prefetch_limited = Limited}) -> Limited.
-can_send(Token = #qstate{pid = Pid, state = State, credits = Credits},
- QPid, AckReq, CTag) ->
- case is_consumer_blocked(Token, CTag) of
- false -> case State =/= active orelse call_can_send(Pid, QPid, AckReq) of
- true -> Token#qstate{
- credits = record_send_q(CTag, Credits)};
- false -> channel_blocked
- end;
- true -> consumer_blocked
- end.
-
-call_can_send(Pid, QPid, AckRequired) ->
- rabbit_misc:with_exit_handler(
- fun () -> true end,
- fun () ->
- gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity)
- end).
-
is_blocked(#lstate{blocked = Blocked}) -> Blocked.
is_active(L) -> is_prefetch_limited(L) orelse is_blocked(L).
@@ -247,16 +226,24 @@ activate(L = #qstate{state = dormant}) ->
L#qstate{state = active};
activate(L) -> L.
-can_send(L = #qstate{state = active}, AckRequired) ->
+can_send(L = #qstate{pid = Pid, state = State, credits = Credits},
+ AckReq, CTag) ->
+ case is_consumer_blocked(L, CTag) of
+ false -> case State =/= active orelse call_can_send(
+ Pid, self(), AckReq) of
+ true -> {continue, L#qstate{
+ credits = record_send_q(CTag, Credits)}};
+ false -> {suspend, L#qstate{state = suspended}}
+ end;
+ true -> {suspend, L#qstate{state = suspended}}
+ end.
+
+call_can_send(Pid, QPid, AckRequired) ->
rabbit_misc:with_exit_handler(
- fun () -> {continue, L} end,
- fun () -> Msg = {can_send, self(), AckRequired},
- case gen_server2:call(L#qstate.pid, Msg, infinity) of
- true -> {continue, L};
- false -> {suspend, L#qstate{state = suspended}}
- end
- end);
-can_send(L, _AckRequired) -> {continue, L}.
+ fun () -> true end,
+ fun () ->
+ gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity)
+ end).
resume(L) -> L#qstate{state = active}.
@@ -275,7 +262,6 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) ->
none -> false
end.
-
credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain) ->
Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}.