diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-01-16 22:11:32 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-01-16 22:11:32 +0000 |
commit | b0d278f1ca08e4fd315c1fc3344952e0173354dc (patch) | |
tree | 6a5e04f5d063079eb9ca21534c0b319a221a22e7 | |
parent | e84f054a179fc996946835e54944c2b6810d8af1 (diff) | |
download | rabbitmq-server-b0d278f1ca08e4fd315c1fc3344952e0173354dc.tar.gz |
make flow control explicit in the channel API
just as we did for the queue
As a result the existing call sites of rabbit_channel:do, e.g. in
rabbit_test and the erlang client, can be left alone.
-rw-r--r-- | src/rabbit_channel.erl | 18 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 5 |
2 files changed, 14 insertions, 9 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 680e486d..0f6d2517 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -20,7 +20,7 @@ -behaviour(gen_server2). --export([start_link/10, do/2, do/3, flush/1, shutdown/1]). +-export([start_link/10, do/2, do/3, do_flow/3, flush/1, shutdown/1]). -export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_local/0, ready_for_close/1]). @@ -78,6 +78,8 @@ -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). +-spec(do_flow/3 :: (pid(), rabbit_framing:amqp_method_record(), + rabbit_types:maybe(rabbit_types:content())) -> 'ok'). -spec(flush/1 :: (pid()) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). @@ -111,7 +113,11 @@ do(Pid, Method) -> do(Pid, Method, none). do(Pid, Method, Content) -> - gen_server2:cast(Pid, {method, Method, Content}). + gen_server2:cast(Pid, {method, Method, Content, noflow}). + +do_flow(Pid, Method, Content) -> + credit_flow:send(Pid), + gen_server2:cast(Pid, {method, Method, Content, flow}). flush(Pid) -> gen_server2:call(Pid, flush, infinity). @@ -244,10 +250,10 @@ handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) -> handle_call(_Request, _From, State) -> noreply(State). -handle_cast({method, Method, Content}, State = #ch{conn_pid = Conn}) -> - case Content of - none -> ok; - _ -> credit_flow:ack(Conn) +handle_cast({method, Method, Content, Flow}, State = #ch{conn_pid = Conn}) -> + case Flow of + flow -> credit_flow:ack(Conn); + noflow -> ok end, try handle_method(Method, Content, State) of {reply, Reply, NewState} -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index c3b73f36..07a446e9 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -919,9 +919,8 @@ process_channel_frame(Frame, Channel, ChPid, AState) -> {ok, NewAState} -> NewAState; {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method), NewAState; - {ok, Method, Content, NewAState} -> credit_flow:send(ChPid), - rabbit_channel:do(ChPid, Method, - Content), + {ok, Method, Content, NewAState} -> rabbit_channel:do_flow( + ChPid, Method, Content), NewAState; {error, Reason} -> self() ! {channel_exit, Channel, Reason}, |