diff options
author | Vlad Ionescu <vlad@lshift.net> | 2009-07-22 16:24:03 +0100 |
---|---|---|
committer | Vlad Ionescu <vlad@lshift.net> | 2009-07-22 16:24:03 +0100 |
commit | a38fe72dd70d133e0a5a86708224893227d7a26a (patch) | |
tree | 5c079e6f8e52a8b1225bb7320158eddba8ede0c5 | |
parent | e368db61320d50a3577723f3f7a1e13814a22c75 (diff) | |
download | rabbitmq-server-bug20539.tar.gz |
adding support for sync-ing the writer with the erlang client channelbug20539
-rw-r--r-- | src/rabbit_writer.erl | 36 |
1 files changed, 33 insertions, 3 deletions
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 9cf9f8ae..e338ddfe 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -33,9 +33,9 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/3, shutdown/1, mainloop/1]). --export([send_command/2, send_command/3, - send_command_and_notify/5]). +-export([start/3, start_link/3, shutdown/1, mainloop/1]). +-export([send_command/2, send_command/3, send_command_and_signal_back/3, + send_command_and_signal_back/4, send_command_and_notify/5]). -export([internal_send_command/3, internal_send_command/5]). -import(gen_tcp). @@ -49,8 +49,12 @@ -ifdef(use_specs). -spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). +-spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). -spec(send_command/3 :: (pid(), amqp_method(), content()) -> 'ok'). +-spec(send_command_and_signal_back/3 :: (pid(), amqp_method(), pid()) -> 'ok'). +-spec(send_command_and_signal_back/4 :: + (pid(), amqp_method(), content(), pid()) -> 'ok'). -spec(send_command_and_notify/5 :: (pid(), pid(), pid(), amqp_method(), content()) -> 'ok'). -spec(internal_send_command/3 :: @@ -68,6 +72,11 @@ start(Sock, Channel, FrameMax) -> channel = Channel, frame_max = FrameMax}]). +start_link(Sock, Channel, FrameMax) -> + spawn_link(?MODULE, mainloop, [#wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax}]). + mainloop(State) -> receive Message -> ?MODULE:mainloop(handle_message(Message, State)) @@ -86,6 +95,19 @@ handle_message({send_command, MethodRecord, Content}, ok = internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax), State; +handle_message({send_command_and_signal_back, MethodRecord, Parent}, + State = #wstate{sock = Sock, channel = Channel}) -> + ok = internal_send_command_async(Sock, Channel, MethodRecord), + Parent ! rabbit_writer_send_command_signal, + State; +handle_message({send_command_and_signal_back, MethodRecord, Content, Parent}, + State = #wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax}) -> + ok = internal_send_command_async(Sock, Channel, MethodRecord, + Content, FrameMax), + Parent ! rabbit_writer_send_command_signal, + State; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, State = #wstate{sock = Sock, channel = Channel, @@ -113,6 +135,14 @@ send_command(W, MethodRecord, Content) -> W ! {send_command, MethodRecord, Content}, ok. +send_command_and_signal_back(W, MethodRecord, Parent) -> + W ! {send_command_and_signal_back, MethodRecord, Parent}, + ok. + +send_command_and_signal_back(W, MethodRecord, Content, Parent) -> + W ! {send_command_and_signal_back, MethodRecord, Content, Parent}, + ok. + send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content}, ok. |