diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-01-22 12:59:04 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-01-22 12:59:04 +0000 |
commit | 82be213f82e7877c49f214bde6fcc7c4514e0734 (patch) | |
tree | c7fd745d831bae9be715113f865dad8b43a194f3 | |
parent | 60a3fa3c4498ca55e0ae6cae22340867ea6cfd2a (diff) | |
download | rabbitmq-server-82be213f82e7877c49f214bde6fcc7c4514e0734.tar.gz |
Remove knowledge of AMQP frames from the limiter. (attempt 2)
-rw-r--r-- | src/rabbit_channel.erl | 24 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 10 |
2 files changed, 25 insertions, 9 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 5ee030b1..9cb37c4f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -21,7 +21,8 @@ -behaviour(gen_server2). -export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]). --export([send_command/2, deliver/4, flushed/2]). +-export([send_command/2, deliver/4, send_credit_reply/2, send_drained/3, + flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_local/0, ready_for_close/1]). -export([force_event_refresh/0]). @@ -138,6 +139,12 @@ send_command(Pid, Msg) -> deliver(Pid, ConsumerTag, AckRequired, Msg) -> gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). +send_credit_reply(Pid, Len) -> + gen_server2:cast(Pid, {send_credit_reply, Len}). + +send_drained(Pid, ConsumerTag, Count) -> + gen_server2:cast(Pid, {send_drained, ConsumerTag, Count}). + flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). @@ -314,6 +321,21 @@ handle_cast({deliver, ConsumerTag, AckRequired, Content), noreply(record_sent(ConsumerTag, AckRequired, Msg, State)); +handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command( + WriterPid, #'basic.credit_ok'{available = Len}), + noreply(State); + +handle_cast({send_drained, ConsumerTag, Count}, + State = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command( + WriterPid, #'basic.credit_state'{consumer_tag = ConsumerTag, + credit = 0, + count = Count, + available = 0, + drain = true}), + noreply(State); + handle_cast(force_event_refresh, State) -> rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), noreply(State); diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 57fd0c26..401723a4 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -148,8 +148,7 @@ inform(Limiter = #token{q_state = Credits}, {Unblock, Credits2} = update_credit( CTag, Len, ChPid, Credit, Count, Drain, Credits), case Reply of - true -> rabbit_channel:send_command( - ChPid, #'basic.credit_ok'{available = Len}); + true -> rabbit_channel:send_credit_reply(ChPid, Len); false -> ok end, {Unblock, Limiter#token{q_state = Credits2}}. @@ -194,12 +193,7 @@ maybe_drain(_, _, _, _, Credit, Count) -> {Credit, Count}. send_drained(ChPid, CTag, Count) -> - rabbit_channel:send_command(ChPid, - #'basic.credit_state'{consumer_tag = CTag, - credit = 0, - count = Count, - available = 0, - drain = true}). + rabbit_channel:send_drained(ChPid, CTag, Count). update_credit(CTag, Len, ChPid, Credit, Count0, Drain, Credits) -> Count = case dict:find(CTag, Credits) of |