summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2013-10-21 14:58:26 +0100
committerTim Watson <tim@rabbitmq.com>2013-10-21 14:58:26 +0100
commitae19b3da5f9ab369505e2720491ba06592b3d0de (patch)
tree5dae6e0f0fa67025f93f2e3aa5f4d117d3ca5314
parente39a106a524d90add9db16811a0d22ab23d32108 (diff)
parent4778b08097c8291524140444c8e872968fa2485a (diff)
downloadrabbitmq-server-ae19b3da5f9ab369505e2720491ba06592b3d0de.tar.gz
merge bug25804 into default
-rw-r--r--src/rabbit_writer.erl22
1 files changed, 22 insertions, 0 deletions
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index bf6964d8..34dd3d3b 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -25,6 +25,7 @@
-export([send_command/2, send_command/3,
send_command_sync/2, send_command_sync/3,
send_command_and_notify/4, send_command_and_notify/5,
+ send_command_flow/2, send_command_flow/3,
flush/1]).
-export([internal_send_command/4, internal_send_command/6]).
@@ -78,6 +79,11 @@
(pid(), pid(), pid(), rabbit_framing:amqp_method_record(),
rabbit_types:content())
-> 'ok').
+-spec(send_command_flow/2 ::
+ (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
+-spec(send_command_flow/3 ::
+ (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content())
+ -> 'ok').
-spec(flush/1 :: (pid()) -> 'ok').
-spec(internal_send_command/4 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
@@ -165,6 +171,12 @@ handle_message({send_command, MethodRecord}, State) ->
internal_send_command_async(MethodRecord, State);
handle_message({send_command, MethodRecord, Content}, State) ->
internal_send_command_async(MethodRecord, Content, State);
+handle_message({send_command_flow, MethodRecord, Sender}, State) ->
+ credit_flow:ack(Sender),
+ internal_send_command_async(MethodRecord, State);
+handle_message({send_command_flow, MethodRecord, Content, Sender}, State) ->
+ credit_flow:ack(Sender),
+ internal_send_command_async(MethodRecord, Content, State);
handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) ->
State1 = internal_flush(
internal_send_command_async(MethodRecord, State)),
@@ -212,6 +224,16 @@ send_command(W, MethodRecord, Content) ->
W ! {send_command, MethodRecord, Content},
ok.
+send_command_flow(W, MethodRecord) ->
+ credit_flow:send(W),
+ W ! {send_command_flow, MethodRecord, self()},
+ ok.
+
+send_command_flow(W, MethodRecord, Content) ->
+ credit_flow:send(W),
+ W ! {send_command_flow, MethodRecord, Content, self()},
+ ok.
+
send_command_sync(W, MethodRecord) ->
call(W, {send_command_sync, MethodRecord}).