diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-10-04 14:43:15 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-10-04 14:43:15 +0100 |
commit | 4778b08097c8291524140444c8e872968fa2485a (patch) | |
tree | f1e3ea3b14cd2136f40b993d4e80660c89863da0 | |
parent | b725d82cd5be0dbb7f8fe8c7f516d3853ecb79ff (diff) | |
download | rabbitmq-server-4778b08097c8291524140444c8e872968fa2485a.tar.gz |
Introduce flow controlled versions of send_command() for the Erlang client to use.bug25804
-rw-r--r-- | src/rabbit_writer.erl | 22 |
1 files changed, 22 insertions, 0 deletions
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index bf6964d8..34dd3d3b 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -25,6 +25,7 @@ -export([send_command/2, send_command/3, send_command_sync/2, send_command_sync/3, send_command_and_notify/4, send_command_and_notify/5, + send_command_flow/2, send_command_flow/3, flush/1]). -export([internal_send_command/4, internal_send_command/6]). @@ -78,6 +79,11 @@ (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) -> 'ok'). +-spec(send_command_flow/2 :: + (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). +-spec(send_command_flow/3 :: + (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) + -> 'ok'). -spec(flush/1 :: (pid()) -> 'ok'). -spec(internal_send_command/4 :: (rabbit_net:socket(), rabbit_channel:channel_number(), @@ -165,6 +171,12 @@ handle_message({send_command, MethodRecord}, State) -> internal_send_command_async(MethodRecord, State); handle_message({send_command, MethodRecord, Content}, State) -> internal_send_command_async(MethodRecord, Content, State); +handle_message({send_command_flow, MethodRecord, Sender}, State) -> + credit_flow:ack(Sender), + internal_send_command_async(MethodRecord, State); +handle_message({send_command_flow, MethodRecord, Content, Sender}, State) -> + credit_flow:ack(Sender), + internal_send_command_async(MethodRecord, Content, State); handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) -> State1 = internal_flush( internal_send_command_async(MethodRecord, State)), @@ -212,6 +224,16 @@ send_command(W, MethodRecord, Content) -> W ! {send_command, MethodRecord, Content}, ok. +send_command_flow(W, MethodRecord) -> + credit_flow:send(W), + W ! {send_command_flow, MethodRecord, self()}, + ok. + +send_command_flow(W, MethodRecord, Content) -> + credit_flow:send(W), + W ! {send_command_flow, MethodRecord, Content, self()}, + ok. + send_command_sync(W, MethodRecord) -> call(W, {send_command_sync, MethodRecord}). |