diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-21 14:45:51 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-21 14:45:51 +0000 |
commit | 62c4b3d9963ea78a5c36a3981958c8e6988f0756 (patch) | |
tree | 73022aec30253dddbd856a551594073ea685ea3b /src/rabbit_writer.erl | |
parent | 5e538690e65b7afbc97ab27816f2881b440cdd5b (diff) | |
download | rabbitmq-server-62c4b3d9963ea78a5c36a3981958c8e6988f0756.tar.gz |
get th channel to flush the writer
when the former is asked to terminate by the reader
Diffstat (limited to 'src/rabbit_writer.erl')
-rw-r--r-- | src/rabbit_writer.erl | 24 |
1 files changed, 17 insertions, 7 deletions
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index a7ea3d99..059d3839 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -21,7 +21,8 @@ -export([start/5, start_link/5, start/6, start_link/6]). -export([send_command/2, send_command/3, send_command_sync/2, send_command_sync/3, - send_command_and_notify/4, send_command_and_notify/5]). + send_command_and_notify/4, send_command_and_notify/5, + flush/1]). -export([internal_send_command/4, internal_send_command/6]). %% internal @@ -69,6 +70,7 @@ (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) -> 'ok'). +-spec(flush/1 :: (pid()) -> 'ok'). -spec(internal_send_command/4 :: (rabbit_net:socket(), rabbit_channel:channel_number(), rabbit_framing:amqp_method_record(), rabbit_types:protocol()) @@ -130,7 +132,7 @@ mainloop1(State) -> receive Message -> ?MODULE:mainloop1(handle_message(Message, State)) after 0 -> - ?MODULE:mainloop1(flush(State)) + ?MODULE:mainloop1(internal_flush(State)) end. handle_message({send_command, MethodRecord}, State) -> @@ -138,12 +140,18 @@ handle_message({send_command, MethodRecord}, State) -> handle_message({send_command, MethodRecord, Content}, State) -> internal_send_command_async(MethodRecord, Content, State); handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) -> - State1 = flush(internal_send_command_async(MethodRecord, State)), + State1 = internal_flush( + internal_send_command_async(MethodRecord, State)), gen_server:reply(From, ok), State1; handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}}, State) -> - State1 = flush(internal_send_command_async(MethodRecord, Content, State)), + State1 = internal_flush( + internal_send_command_async(MethodRecord, Content, State)), + gen_server:reply(From, ok), + State1; +handle_message({'$gen_call', From, flush}, State) -> + State1 = internal_flush(State), gen_server:reply(From, ok), State1; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) -> @@ -192,6 +200,8 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content}, ok. +flush(W) -> call(W, flush). + %%--------------------------------------------------------------------------- call(Pid, Msg) -> @@ -251,13 +261,13 @@ internal_send_command_async(MethodRecord, Content, maybe_flush(State = #wstate{pending = Pending}) -> case iolist_size(Pending) >= ?FLUSH_THRESHOLD of - true -> flush(State); + true -> internal_flush(State); false -> State end. -flush(State = #wstate{pending = []}) -> +internal_flush(State = #wstate{pending = []}) -> State; -flush(State = #wstate{sock = Sock, pending = Pending}) -> +internal_flush(State = #wstate{sock = Sock, pending = Pending}) -> ok = port_cmd(Sock, lists:reverse(Pending)), State#wstate{pending = []}. |