diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-03-21 10:41:42 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-03-21 10:41:42 +0000 |
commit | 30965c881b81a27880c1a81fcc735f79dd1f8982 (patch) | |
tree | 6c24c1366aea5bf6135fdc0fe22175c2d7204e87 /src/rabbit_limiter.erl | |
parent | b15b247899318309bc75840f2b37dde3f548d255 (diff) | |
download | rabbitmq-server-30965c881b81a27880c1a81fcc735f79dd1f8982.tar.gz |
Merge the two can_sends and tidy up.
Diffstat (limited to 'src/rabbit_limiter.erl')
-rw-r--r-- | src/rabbit_limiter.erl | 56 |
1 files changed, 21 insertions, 35 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 71ed2e73..6825622d 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -111,14 +111,13 @@ is_prefetch_limited/1, is_blocked/1, is_active/1, get_prefetch_limit/1, ack/2, pid/1]). %% queue API --export([client/1, activate/1, can_send/2, resume/1, deactivate/1, - is_suspended/1]). +-export([client/1, activate/1, can_send/3, resume/1, deactivate/1, + is_suspended/1, is_consumer_blocked/2, credit/4, drained/1, + forget_consumer/2]). %% callbacks -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, prioritise_call/3]). --export([is_consumer_blocked/2, credit/4, drained/1, forget_consumer/2]). - -import(rabbit_misc, [serial_add/2, serial_diff/2]). %%---------------------------------------------------------------------------- @@ -141,8 +140,6 @@ -> lstate()). -spec(unlimit_prefetch/1 :: (lstate()) -> lstate()). -spec(block/1 :: (lstate()) -> lstate()). --spec(can_send/4 :: (qstate(), pid(), boolean(), rabbit_types:ctag()) - -> qstate() | 'consumer_blocked' | 'channel_blocked'). -spec(unblock/1 :: (lstate()) -> lstate()). -spec(is_prefetch_limited/1 :: (lstate()) -> boolean()). -spec(is_blocked/1 :: (lstate()) -> boolean()). @@ -153,7 +150,7 @@ -spec(client/1 :: (pid()) -> qstate()). -spec(activate/1 :: (qstate()) -> qstate()). --spec(can_send/2 :: (qstate(), boolean()) -> +-spec(can_send/3 :: (qstate(), boolean(), rabbit_types:ctag()) -> {'continue' | 'suspend', qstate()}). -spec(resume/1 :: (qstate()) -> qstate()). -spec(deactivate/1 :: (qstate()) -> qstate()). @@ -210,24 +207,6 @@ unblock(L) -> is_prefetch_limited(#lstate{prefetch_limited = Limited}) -> Limited. -can_send(Token = #qstate{pid = Pid, state = State, credits = Credits}, - QPid, AckReq, CTag) -> - case is_consumer_blocked(Token, CTag) of - false -> case State =/= active orelse call_can_send(Pid, QPid, AckReq) of - true -> Token#qstate{ - credits = record_send_q(CTag, Credits)}; - false -> channel_blocked - end; - true -> consumer_blocked - end. - -call_can_send(Pid, QPid, AckRequired) -> - rabbit_misc:with_exit_handler( - fun () -> true end, - fun () -> - gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity) - end). - is_blocked(#lstate{blocked = Blocked}) -> Blocked. is_active(L) -> is_prefetch_limited(L) orelse is_blocked(L). @@ -247,16 +226,24 @@ activate(L = #qstate{state = dormant}) -> L#qstate{state = active}; activate(L) -> L. -can_send(L = #qstate{state = active}, AckRequired) -> +can_send(L = #qstate{pid = Pid, state = State, credits = Credits}, + AckReq, CTag) -> + case is_consumer_blocked(L, CTag) of + false -> case State =/= active orelse call_can_send( + Pid, self(), AckReq) of + true -> {continue, L#qstate{ + credits = record_send_q(CTag, Credits)}}; + false -> {suspend, L#qstate{state = suspended}} + end; + true -> {suspend, L#qstate{state = suspended}} + end. + +call_can_send(Pid, QPid, AckRequired) -> rabbit_misc:with_exit_handler( - fun () -> {continue, L} end, - fun () -> Msg = {can_send, self(), AckRequired}, - case gen_server2:call(L#qstate.pid, Msg, infinity) of - true -> {continue, L}; - false -> {suspend, L#qstate{state = suspended}} - end - end); -can_send(L, _AckRequired) -> {continue, L}. + fun () -> true end, + fun () -> + gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity) + end). resume(L) -> L#qstate{state = active}. @@ -275,7 +262,6 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) -> none -> false end. - credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain) -> Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}. |