summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-01-16 22:11:32 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-01-16 22:11:32 +0000
commitb0d278f1ca08e4fd315c1fc3344952e0173354dc (patch)
tree6a5e04f5d063079eb9ca21534c0b319a221a22e7
parente84f054a179fc996946835e54944c2b6810d8af1 (diff)
downloadrabbitmq-server-b0d278f1ca08e4fd315c1fc3344952e0173354dc.tar.gz
make flow control explicit in the channel API
just as we did for the queue As a result the existing call sites of rabbit_channel:do, e.g. in rabbit_test and the erlang client, can be left alone.
-rw-r--r--src/rabbit_channel.erl18
-rw-r--r--src/rabbit_reader.erl5
2 files changed, 14 insertions, 9 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 680e486d..0f6d2517 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -20,7 +20,7 @@
-behaviour(gen_server2).
--export([start_link/10, do/2, do/3, flush/1, shutdown/1]).
+-export([start_link/10, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2, confirm/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([refresh_config_local/0, ready_for_close/1]).
@@ -78,6 +78,8 @@
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
+-spec(do_flow/3 :: (pid(), rabbit_framing:amqp_method_record(),
+ rabbit_types:maybe(rabbit_types:content())) -> 'ok').
-spec(flush/1 :: (pid()) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
@@ -111,7 +113,11 @@ do(Pid, Method) ->
do(Pid, Method, none).
do(Pid, Method, Content) ->
- gen_server2:cast(Pid, {method, Method, Content}).
+ gen_server2:cast(Pid, {method, Method, Content, noflow}).
+
+do_flow(Pid, Method, Content) ->
+ credit_flow:send(Pid),
+ gen_server2:cast(Pid, {method, Method, Content, flow}).
flush(Pid) ->
gen_server2:call(Pid, flush, infinity).
@@ -244,10 +250,10 @@ handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) ->
handle_call(_Request, _From, State) ->
noreply(State).
-handle_cast({method, Method, Content}, State = #ch{conn_pid = Conn}) ->
- case Content of
- none -> ok;
- _ -> credit_flow:ack(Conn)
+handle_cast({method, Method, Content, Flow}, State = #ch{conn_pid = Conn}) ->
+ case Flow of
+ flow -> credit_flow:ack(Conn);
+ noflow -> ok
end,
try handle_method(Method, Content, State) of
{reply, Reply, NewState} ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index c3b73f36..07a446e9 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -919,9 +919,8 @@ process_channel_frame(Frame, Channel, ChPid, AState) ->
{ok, NewAState} -> NewAState;
{ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
NewAState;
- {ok, Method, Content, NewAState} -> credit_flow:send(ChPid),
- rabbit_channel:do(ChPid, Method,
- Content),
+ {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
+ ChPid, Method, Content),
NewAState;
{error, Reason} -> self() ! {channel_exit, Channel,
Reason},