diff options
Diffstat (limited to 'src/rabbit_writer.erl')
-rw-r--r-- | src/rabbit_writer.erl | 187 |
1 files changed, 106 insertions, 81 deletions
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 3d10dc12..aa986e54 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -33,14 +33,14 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/3, start_link/3, shutdown/1, mainloop/1]). --export([send_command/2, send_command/3, send_command_and_signal_back/3, - send_command_and_signal_back/4, send_command_and_notify/5]). --export([internal_send_command/3, internal_send_command/5]). +-export([start/5, start_link/5, mainloop/2, mainloop1/2]). +-export([send_command/2, send_command/3, send_command_sync/2, + send_command_sync/3, send_command_and_notify/5]). +-export([internal_send_command/4, internal_send_command/6]). -import(gen_tcp). --record(wstate, {sock, channel, frame_max}). +-record(wstate, {sock, channel, frame_max, protocol}). -define(HIBERNATE_AFTER, 5000). @@ -48,80 +48,96 @@ -ifdef(use_specs). --spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). --spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). --spec(send_command/2 :: (pid(), amqp_method_record()) -> 'ok'). --spec(send_command/3 :: (pid(), amqp_method_record(), content()) -> 'ok'). --spec(send_command_and_signal_back/3 :: (pid(), amqp_method(), pid()) -> 'ok'). --spec(send_command_and_signal_back/4 :: - (pid(), amqp_method(), content(), pid()) -> 'ok'). +-spec(start/5 :: + (rabbit_net:socket(), rabbit_channel:channel_number(), + non_neg_integer(), rabbit_types:protocol(), pid()) + -> rabbit_types:ok(pid())). +-spec(start_link/5 :: + (rabbit_net:socket(), rabbit_channel:channel_number(), + non_neg_integer(), rabbit_types:protocol(), pid()) + -> rabbit_types:ok(pid())). +-spec(send_command/2 :: + (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). +-spec(send_command/3 :: + (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) + -> 'ok'). +-spec(send_command_sync/2 :: + (pid(), rabbit_framing:amqp_method()) -> 'ok'). +-spec(send_command_sync/3 :: + (pid(), rabbit_framing:amqp_method(), rabbit_types:content()) -> 'ok'). -spec(send_command_and_notify/5 :: - (pid(), pid(), pid(), amqp_method_record(), content()) -> 'ok'). --spec(internal_send_command/3 :: - (socket(), channel_number(), amqp_method_record()) -> 'ok'). --spec(internal_send_command/5 :: - (socket(), channel_number(), amqp_method_record(), - content(), non_neg_integer()) -> 'ok'). + (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), + rabbit_types:content()) + -> 'ok'). +-spec(internal_send_command/4 :: + (rabbit_net:socket(), rabbit_channel:channel_number(), + rabbit_framing:amqp_method_record(), rabbit_types:protocol()) + -> 'ok'). +-spec(internal_send_command/6 :: + (rabbit_net:socket(), rabbit_channel:channel_number(), + rabbit_framing:amqp_method_record(), rabbit_types:content(), + non_neg_integer(), rabbit_types:protocol()) + -> 'ok'). -endif. %%---------------------------------------------------------------------------- -start(Sock, Channel, FrameMax) -> - spawn(?MODULE, mainloop, [#wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax}]). - -start_link(Sock, Channel, FrameMax) -> - spawn_link(?MODULE, mainloop, [#wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax}]). - -mainloop(State) -> +start(Sock, Channel, FrameMax, Protocol, ReaderPid) -> + {ok, + proc_lib:spawn(?MODULE, mainloop, [ReaderPid, + #wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax, + protocol = Protocol}])}. + +start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) -> + {ok, + proc_lib:spawn_link(?MODULE, mainloop, [ReaderPid, + #wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax, + protocol = Protocol}])}. + +mainloop(ReaderPid, State) -> + try + mainloop1(ReaderPid, State) + catch + exit:Error -> ReaderPid ! {channel_exit, #wstate.channel, Error} + end, + done. + +mainloop1(ReaderPid, State) -> receive - Message -> ?MODULE:mainloop(handle_message(Message, State)) + Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State)) after ?HIBERNATE_AFTER -> - erlang:hibernate(?MODULE, mainloop, [State]) + erlang:hibernate(?MODULE, mainloop, [ReaderPid, State]) end. -handle_message({send_command, MethodRecord}, - State = #wstate{sock = Sock, channel = Channel}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord), +handle_message({send_command, MethodRecord}, State) -> + ok = internal_send_command_async(MethodRecord, State), State; -handle_message({send_command, MethodRecord, Content}, - State = #wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax), +handle_message({send_command, MethodRecord, Content}, State) -> + ok = internal_send_command_async(MethodRecord, Content, State), State; -handle_message({send_command_and_signal_back, MethodRecord, Parent}, - State = #wstate{sock = Sock, channel = Channel}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord), - Parent ! rabbit_writer_send_command_signal, +handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) -> + ok = internal_send_command_async(MethodRecord, State), + gen_server:reply(From, ok), State; -handle_message({send_command_and_signal_back, MethodRecord, Content, Parent}, - State = #wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax), - Parent ! rabbit_writer_send_command_signal, +handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}}, + State) -> + ok = internal_send_command_async(MethodRecord, Content, State), + gen_server:reply(From, ok), State; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, - State = #wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax), + State) -> + ok = internal_send_command_async(MethodRecord, Content, State), rabbit_amqqueue:notify_sent(QPid, ChPid), State; handle_message({inet_reply, _, ok}, State) -> State; handle_message({inet_reply, _, Status}, _State) -> exit({writer, send_failed, Status}); -handle_message(shutdown, _State) -> - exit(normal); handle_message(Message, _State) -> exit({writer, message_not_understood, Message}). @@ -135,48 +151,50 @@ send_command(W, MethodRecord, Content) -> W ! {send_command, MethodRecord, Content}, ok. -send_command_and_signal_back(W, MethodRecord, Parent) -> - W ! {send_command_and_signal_back, MethodRecord, Parent}, - ok. +send_command_sync(W, MethodRecord) -> + call(W, {send_command_sync, MethodRecord}). -send_command_and_signal_back(W, MethodRecord, Content, Parent) -> - W ! {send_command_and_signal_back, MethodRecord, Content, Parent}, - ok. +send_command_sync(W, MethodRecord, Content) -> + call(W, {send_command_sync, MethodRecord, Content}). send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content}, ok. -shutdown(W) -> - W ! shutdown, - ok. +%--------------------------------------------------------------------------- + +call(Pid, Msg) -> + {ok, Res} = gen:call(Pid, '$gen_call', Msg, infinity), + Res. %--------------------------------------------------------------------------- -assemble_frames(Channel, MethodRecord) -> +assemble_frames(Channel, MethodRecord, Protocol) -> ?LOGMESSAGE(out, Channel, MethodRecord, none), - rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord). + rabbit_binary_generator:build_simple_method_frame( + Channel, MethodRecord, Protocol). -assemble_frames(Channel, MethodRecord, Content, FrameMax) -> +assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) -> ?LOGMESSAGE(out, Channel, MethodRecord, Content), MethodName = rabbit_misc:method_record_type(MethodRecord), - true = rabbit_framing:method_has_content(MethodName), % assertion + true = Protocol:method_has_content(MethodName), % assertion MethodFrame = rabbit_binary_generator:build_simple_method_frame( - Channel, MethodRecord), + Channel, MethodRecord, Protocol), ContentFrames = rabbit_binary_generator:build_simple_content_frames( - Channel, Content, FrameMax), + Channel, Content, FrameMax, Protocol), [MethodFrame | ContentFrames]. tcp_send(Sock, Data) -> rabbit_misc:throw_on_error(inet_error, fun () -> rabbit_net:send(Sock, Data) end). -internal_send_command(Sock, Channel, MethodRecord) -> - ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)). +internal_send_command(Sock, Channel, MethodRecord, Protocol) -> + ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, Protocol)). -internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) -> +internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax, + Protocol) -> ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax)). + Content, FrameMax, Protocol)). %% gen_tcp:send/2 does a selective receive of {inet_reply, Sock, %% Status} to obtain the result. That is bad when it is called from @@ -196,13 +214,20 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) -> %% Also note that the port has bounded buffers and port_command blocks %% when these are full. So the fact that we process the result %% asynchronously does not impact flow control. -internal_send_command_async(Sock, Channel, MethodRecord) -> - true = port_cmd(Sock, assemble_frames(Channel, MethodRecord)), +internal_send_command_async(MethodRecord, + #wstate{sock = Sock, + channel = Channel, + protocol = Protocol}) -> + true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Protocol)), ok. -internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) -> +internal_send_command_async(MethodRecord, Content, + #wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax, + protocol = Protocol}) -> true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax)), + Content, FrameMax, Protocol)), ok. port_cmd(Sock, Data) -> |