summaryrefslogtreecommitdiff
path: root/src/rabbit_writer.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_writer.erl')
-rw-r--r--src/rabbit_writer.erl187
1 files changed, 106 insertions, 81 deletions
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 3d10dc12..aa986e54 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -33,14 +33,14 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([start/3, start_link/3, shutdown/1, mainloop/1]).
--export([send_command/2, send_command/3, send_command_and_signal_back/3,
- send_command_and_signal_back/4, send_command_and_notify/5]).
--export([internal_send_command/3, internal_send_command/5]).
+-export([start/5, start_link/5, mainloop/2, mainloop1/2]).
+-export([send_command/2, send_command/3, send_command_sync/2,
+ send_command_sync/3, send_command_and_notify/5]).
+-export([internal_send_command/4, internal_send_command/6]).
-import(gen_tcp).
--record(wstate, {sock, channel, frame_max}).
+-record(wstate, {sock, channel, frame_max, protocol}).
-define(HIBERNATE_AFTER, 5000).
@@ -48,80 +48,96 @@
-ifdef(use_specs).
--spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()).
--spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()).
--spec(send_command/2 :: (pid(), amqp_method_record()) -> 'ok').
--spec(send_command/3 :: (pid(), amqp_method_record(), content()) -> 'ok').
--spec(send_command_and_signal_back/3 :: (pid(), amqp_method(), pid()) -> 'ok').
--spec(send_command_and_signal_back/4 ::
- (pid(), amqp_method(), content(), pid()) -> 'ok').
+-spec(start/5 ::
+ (rabbit_net:socket(), rabbit_channel:channel_number(),
+ non_neg_integer(), rabbit_types:protocol(), pid())
+ -> rabbit_types:ok(pid())).
+-spec(start_link/5 ::
+ (rabbit_net:socket(), rabbit_channel:channel_number(),
+ non_neg_integer(), rabbit_types:protocol(), pid())
+ -> rabbit_types:ok(pid())).
+-spec(send_command/2 ::
+ (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
+-spec(send_command/3 ::
+ (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content())
+ -> 'ok').
+-spec(send_command_sync/2 ::
+ (pid(), rabbit_framing:amqp_method()) -> 'ok').
+-spec(send_command_sync/3 ::
+ (pid(), rabbit_framing:amqp_method(), rabbit_types:content()) -> 'ok').
-spec(send_command_and_notify/5 ::
- (pid(), pid(), pid(), amqp_method_record(), content()) -> 'ok').
--spec(internal_send_command/3 ::
- (socket(), channel_number(), amqp_method_record()) -> 'ok').
--spec(internal_send_command/5 ::
- (socket(), channel_number(), amqp_method_record(),
- content(), non_neg_integer()) -> 'ok').
+ (pid(), pid(), pid(), rabbit_framing:amqp_method_record(),
+ rabbit_types:content())
+ -> 'ok').
+-spec(internal_send_command/4 ::
+ (rabbit_net:socket(), rabbit_channel:channel_number(),
+ rabbit_framing:amqp_method_record(), rabbit_types:protocol())
+ -> 'ok').
+-spec(internal_send_command/6 ::
+ (rabbit_net:socket(), rabbit_channel:channel_number(),
+ rabbit_framing:amqp_method_record(), rabbit_types:content(),
+ non_neg_integer(), rabbit_types:protocol())
+ -> 'ok').
-endif.
%%----------------------------------------------------------------------------
-start(Sock, Channel, FrameMax) ->
- spawn(?MODULE, mainloop, [#wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax}]).
-
-start_link(Sock, Channel, FrameMax) ->
- spawn_link(?MODULE, mainloop, [#wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax}]).
-
-mainloop(State) ->
+start(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
+ {ok,
+ proc_lib:spawn(?MODULE, mainloop, [ReaderPid,
+ #wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax,
+ protocol = Protocol}])}.
+
+start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
+ {ok,
+ proc_lib:spawn_link(?MODULE, mainloop, [ReaderPid,
+ #wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax,
+ protocol = Protocol}])}.
+
+mainloop(ReaderPid, State) ->
+ try
+ mainloop1(ReaderPid, State)
+ catch
+ exit:Error -> ReaderPid ! {channel_exit, #wstate.channel, Error}
+ end,
+ done.
+
+mainloop1(ReaderPid, State) ->
receive
- Message -> ?MODULE:mainloop(handle_message(Message, State))
+ Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State))
after ?HIBERNATE_AFTER ->
- erlang:hibernate(?MODULE, mainloop, [State])
+ erlang:hibernate(?MODULE, mainloop, [ReaderPid, State])
end.
-handle_message({send_command, MethodRecord},
- State = #wstate{sock = Sock, channel = Channel}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord),
+handle_message({send_command, MethodRecord}, State) ->
+ ok = internal_send_command_async(MethodRecord, State),
State;
-handle_message({send_command, MethodRecord, Content},
- State = #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax),
+handle_message({send_command, MethodRecord, Content}, State) ->
+ ok = internal_send_command_async(MethodRecord, Content, State),
State;
-handle_message({send_command_and_signal_back, MethodRecord, Parent},
- State = #wstate{sock = Sock, channel = Channel}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord),
- Parent ! rabbit_writer_send_command_signal,
+handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) ->
+ ok = internal_send_command_async(MethodRecord, State),
+ gen_server:reply(From, ok),
State;
-handle_message({send_command_and_signal_back, MethodRecord, Content, Parent},
- State = #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax),
- Parent ! rabbit_writer_send_command_signal,
+handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}},
+ State) ->
+ ok = internal_send_command_async(MethodRecord, Content, State),
+ gen_server:reply(From, ok),
State;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
- State = #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax),
+ State) ->
+ ok = internal_send_command_async(MethodRecord, Content, State),
rabbit_amqqueue:notify_sent(QPid, ChPid),
State;
handle_message({inet_reply, _, ok}, State) ->
State;
handle_message({inet_reply, _, Status}, _State) ->
exit({writer, send_failed, Status});
-handle_message(shutdown, _State) ->
- exit(normal);
handle_message(Message, _State) ->
exit({writer, message_not_understood, Message}).
@@ -135,48 +151,50 @@ send_command(W, MethodRecord, Content) ->
W ! {send_command, MethodRecord, Content},
ok.
-send_command_and_signal_back(W, MethodRecord, Parent) ->
- W ! {send_command_and_signal_back, MethodRecord, Parent},
- ok.
+send_command_sync(W, MethodRecord) ->
+ call(W, {send_command_sync, MethodRecord}).
-send_command_and_signal_back(W, MethodRecord, Content, Parent) ->
- W ! {send_command_and_signal_back, MethodRecord, Content, Parent},
- ok.
+send_command_sync(W, MethodRecord, Content) ->
+ call(W, {send_command_sync, MethodRecord, Content}).
send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content},
ok.
-shutdown(W) ->
- W ! shutdown,
- ok.
+%---------------------------------------------------------------------------
+
+call(Pid, Msg) ->
+ {ok, Res} = gen:call(Pid, '$gen_call', Msg, infinity),
+ Res.
%---------------------------------------------------------------------------
-assemble_frames(Channel, MethodRecord) ->
+assemble_frames(Channel, MethodRecord, Protocol) ->
?LOGMESSAGE(out, Channel, MethodRecord, none),
- rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord).
+ rabbit_binary_generator:build_simple_method_frame(
+ Channel, MethodRecord, Protocol).
-assemble_frames(Channel, MethodRecord, Content, FrameMax) ->
+assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
?LOGMESSAGE(out, Channel, MethodRecord, Content),
MethodName = rabbit_misc:method_record_type(MethodRecord),
- true = rabbit_framing:method_has_content(MethodName), % assertion
+ true = Protocol:method_has_content(MethodName), % assertion
MethodFrame = rabbit_binary_generator:build_simple_method_frame(
- Channel, MethodRecord),
+ Channel, MethodRecord, Protocol),
ContentFrames = rabbit_binary_generator:build_simple_content_frames(
- Channel, Content, FrameMax),
+ Channel, Content, FrameMax, Protocol),
[MethodFrame | ContentFrames].
tcp_send(Sock, Data) ->
rabbit_misc:throw_on_error(inet_error,
fun () -> rabbit_net:send(Sock, Data) end).
-internal_send_command(Sock, Channel, MethodRecord) ->
- ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)).
+internal_send_command(Sock, Channel, MethodRecord, Protocol) ->
+ ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, Protocol)).
-internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) ->
+internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax,
+ Protocol) ->
ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord,
- Content, FrameMax)).
+ Content, FrameMax, Protocol)).
%% gen_tcp:send/2 does a selective receive of {inet_reply, Sock,
%% Status} to obtain the result. That is bad when it is called from
@@ -196,13 +214,20 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) ->
%% Also note that the port has bounded buffers and port_command blocks
%% when these are full. So the fact that we process the result
%% asynchronously does not impact flow control.
-internal_send_command_async(Sock, Channel, MethodRecord) ->
- true = port_cmd(Sock, assemble_frames(Channel, MethodRecord)),
+internal_send_command_async(MethodRecord,
+ #wstate{sock = Sock,
+ channel = Channel,
+ protocol = Protocol}) ->
+ true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Protocol)),
ok.
-internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) ->
+internal_send_command_async(MethodRecord, Content,
+ #wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax,
+ protocol = Protocol}) ->
true = port_cmd(Sock, assemble_frames(Channel, MethodRecord,
- Content, FrameMax)),
+ Content, FrameMax, Protocol)),
ok.
port_cmd(Sock, Data) ->