diff options
author | Tim Watson <tim@rabbitmq.com> | 2013-10-21 14:58:26 +0100 |
---|---|---|
committer | Tim Watson <tim@rabbitmq.com> | 2013-10-21 14:58:26 +0100 |
commit | ae19b3da5f9ab369505e2720491ba06592b3d0de (patch) | |
tree | 5dae6e0f0fa67025f93f2e3aa5f4d117d3ca5314 | |
parent | e39a106a524d90add9db16811a0d22ab23d32108 (diff) | |
parent | 4778b08097c8291524140444c8e872968fa2485a (diff) | |
download | rabbitmq-server-ae19b3da5f9ab369505e2720491ba06592b3d0de.tar.gz |
merge bug25804 into default
-rw-r--r-- | src/rabbit_writer.erl | 22 |
1 files changed, 22 insertions, 0 deletions
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index bf6964d8..34dd3d3b 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -25,6 +25,7 @@ -export([send_command/2, send_command/3, send_command_sync/2, send_command_sync/3, send_command_and_notify/4, send_command_and_notify/5, + send_command_flow/2, send_command_flow/3, flush/1]). -export([internal_send_command/4, internal_send_command/6]). @@ -78,6 +79,11 @@ (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) -> 'ok'). +-spec(send_command_flow/2 :: + (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). +-spec(send_command_flow/3 :: + (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) + -> 'ok'). -spec(flush/1 :: (pid()) -> 'ok'). -spec(internal_send_command/4 :: (rabbit_net:socket(), rabbit_channel:channel_number(), @@ -165,6 +171,12 @@ handle_message({send_command, MethodRecord}, State) -> internal_send_command_async(MethodRecord, State); handle_message({send_command, MethodRecord, Content}, State) -> internal_send_command_async(MethodRecord, Content, State); +handle_message({send_command_flow, MethodRecord, Sender}, State) -> + credit_flow:ack(Sender), + internal_send_command_async(MethodRecord, State); +handle_message({send_command_flow, MethodRecord, Content, Sender}, State) -> + credit_flow:ack(Sender), + internal_send_command_async(MethodRecord, Content, State); handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) -> State1 = internal_flush( internal_send_command_async(MethodRecord, State)), @@ -212,6 +224,16 @@ send_command(W, MethodRecord, Content) -> W ! {send_command, MethodRecord, Content}, ok. +send_command_flow(W, MethodRecord) -> + credit_flow:send(W), + W ! {send_command_flow, MethodRecord, self()}, + ok. + +send_command_flow(W, MethodRecord, Content) -> + credit_flow:send(W), + W ! {send_command_flow, MethodRecord, Content, self()}, + ok. + send_command_sync(W, MethodRecord) -> call(W, {send_command_sync, MethodRecord}). |